The current title of this post is probably incorrect and may even be misleading; I had a hard time coming up with a suitable name for it. But the idea goes like this: sometimes you might find yourself in a situation where you need to iterate through a generator more than once. Sure, you can collect it into an iterable like a tuple or list to allow multiple passes, but if the number of elements is large, that'll cause an OOM error. On the other hand, once you've consumed a generator, it's exhausted; you can't rewind it, you have to recreate it if you want to go through it again. This behavior is common in pretty much every programming language that supports the generator construct.
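One way to allow repeated passes without materializing everything is to wrap the generator expression in a small class whose __iter__ builds a fresh generator on every call. A minimal sketch (the Squares class is made up for illustration):

    # Instead of a one-shot generator, expose an iterable object whose
    # __iter__ returns a brand-new generator each time it's called.
    class Squares:
        def __init__(self, n: int) -> None:
            self.n = n

        def __iter__(self):
            # A fresh generator per pass; nothing is cached in memory.
            return (i * i for i in range(self.n))

    squares = Squares(4)
    print(list(squares))  # [0, 1, 4, 9]
    print(list(squares))  # [0, 1, 4, 9] again; no exhaustion, no OOM

Since only one element lives in memory at a time per pass, this trades a bit of recomputation for a flat memory profile.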
Escaping the template pattern hellscape in Python
Over the years, I’ve used the template pattern across multiple OO languages with varying degrees of success. It was one of the first patterns I learned in the primordial hours of my software engineering career, and for some reason, it just feels like the natural way to tackle many real-world code-sharing problems. Yet, even before I jumped on board with the composition over inheritance camp, I couldn’t help but notice how using this particular inheritance technique spawns all sorts of design and maintenance headaches as the codebase starts to grow.
Python dependency management redux
One major drawback of Python's huge ecosystem is how widely workflows vary among people trying to accomplish different things. This holds true for dependency management as well. Depending on what you're doing with Python, whether it's building reusable libraries, writing web apps, or diving into data science and machine learning, your workflow can look completely different from someone else's. That being said, my usual approach to any development process is to pick a method and give it a shot to see if it works for my specific needs. Once a process works, I usually automate it and rarely revisit it unless something breaks.
Implementing a simple traceroute clone in Python
I was watching the Storytelling with traceroute lightning talk by Karla Burnett and wanted to understand how traceroute works in Unix. Traceroute is a tool that shows the route of a network packet from your computer to another computer on the internet. It also tells you how long it takes for the packet to reach each stop along the way.
It’s useful when you want to know more about how your computer connects to other computers on the internet. For example, if you want to visit a website, your computer sends a request to the website’s server, which is another computer that hosts the website. But the request doesn’t go directly from your computer to the server. It has to pass through several other devices, such as routers, that help direct the traffic on the internet. These devices are called hops. Traceroute shows you the list of hops that your request goes through, and how long it takes for each hop to respond. This can help you troubleshoot network problems, such as slow connections or unreachable websites.
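A single step of that process can be sketched with the classic Unix approach: send a UDP datagram with a capped TTL and read the ICMP "time exceeded" reply from whichever router dropped it. The probe function below is my naming, not the post's; running it requires root privileges because of the raw ICMP socket:

    import socket

    def probe(dest: str, ttl: int, port: int = 33434, timeout: float = 2.0) -> str | None:
        """Send one UDP probe with the given TTL; return the replying hop's IP."""
        # Raw socket to receive the ICMP "time exceeded" reply (needs root).
        recv = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        # Plain UDP socket to send the probe, with its TTL clamped down.
        send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        send.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
        recv.settimeout(timeout)
        try:
            send.sendto(b"", (dest, port))
            _, (addr, _) = recv.recvfrom(512)
            return addr
        except socket.timeout:
            return None  # This hop didn't answer; traceroute prints "*".
        finally:
            send.close()
            recv.close()

    if __name__ == "__main__":
        # Walk TTLs 1..30 until the destination itself answers.
        dest = socket.gethostbyname("example.com")
        for ttl in range(1, 31):
            hop = probe(dest, ttl)
            print(ttl, hop or "*")
            if hop == dest:
                break

Incrementing the TTL by one per probe is exactly what makes each successive router reveal itself.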
Sorting a Django queryset by a custom sequence of an attribute
I needed a way to sort a Django queryset based on a custom sequence of an attribute. Typically, Django allows sorting a queryset by any attribute on the model or related to it in either ascending or descending order. However, what if you need to sort the queryset following a custom sequence of attribute values?
Suppose you're working with a model called Product where you want to sort the rows of the table based on a list of product ids that are already sorted in a particular order. Here's how it might look:
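One common approach (my assumption of where this is headed, not necessarily the post's exact code) is to annotate each row with its position in the custom sequence and order by that annotation, which Django expresses with Case/When. Here's the idea sketched in plain Python with made-up data:

    # The custom sequence of product ids the rows should follow.
    custom_order = [7, 3, 5]

    # Stand-ins for Product rows; in Django these would come from a queryset.
    products = [{"id": 3}, {"id": 5}, {"id": 7}]

    # Map each id to its position in the custom sequence, then sort by it.
    # The Django ORM equivalent would be roughly:
    #   ordering = Case(*[When(id=pk, then=pos) for pos, pk in enumerate(custom_order)])
    #   Product.objects.filter(id__in=custom_order).order_by(ordering)
    position = {pk: pos for pos, pk in enumerate(custom_order)}
    products.sort(key=lambda p: position[p["id"]])
    print([p["id"] for p in products])  # [7, 3, 5]

The annotation approach keeps the sorting in the database rather than pulling every row into Python.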
Deduplicating iterables while preserving order in Python
Whenever I need to deduplicate the items of an iterable in Python, my usual approach is to create a set from the iterable and then convert it back into a list or tuple. However, this approach doesn’t preserve the original order of the items, which can be a problem if you need to keep the order unscathed. Here’s a naive approach that works:
from __future__ import annotations

from collections.abc import Iterable  # Python 3.9+

def dedup(it: Iterable) -> list:
    seen = set()
    result = []
    for item in it:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result

it = (2, 1, 3, 4, 66, 0, 1, 1, 1)
deduped_it = dedup(it)  # Gives you [2, 1, 3, 4, 66, 0]
This code snippet defines a function dedup that takes an iterable it as input and
returns a new list containing the unique items of the input iterable in their original
order. The function uses a set seen to keep track of the items that have already been
seen, and a list result to store the unique items.
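Worth noting: since dicts preserve insertion order (guaranteed from Python 3.7 onward), dict.fromkeys gets you the same result in one line for hashable items:

    it = (2, 1, 3, 4, 66, 0, 1, 1, 1)

    # Dict keys are unique and, since Python 3.7, ordered by insertion,
    # so this deduplicates while preserving the original order.
    deduped_it = list(dict.fromkeys(it))
    print(deduped_it)  # [2, 1, 3, 4, 66, 0]

The explicit loop still wins when items aren't hashable or when you need custom equality.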
Pushing real-time updates to clients with Server-Sent Events (SSEs)
In multi-page web applications, a common workflow is where a user:
- Loads a specific page or clicks on some button that triggers a long-running task.
- On the server side, a background worker picks up the task and starts processing it asynchronously.
- The page shouldn’t reload while the task is running.
- The backend then communicates the status of the long-running task in real-time.
- Once the task is finished, the client needs to display a success or an error message depending on the task's final status.
The de facto tool for situations that need real-time bidirectional communication is WebSocket. However, in the case above, the communication is mostly unidirectional: the client initiates some action on the server, and then the server continuously pushes data to the client during the lifespan of the background job.
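SSE fits that shape well, since events only flow from server to client. The wire format is simple: each message is an optional event: line plus one or more data: lines, terminated by a blank line, served with the text/event-stream content type. A stdlib-only sketch (the handler and helper names are mine):

    import time
    from http.server import BaseHTTPRequestHandler, HTTPServer

    def format_sse(data: str, event: str | None = None) -> str:
        # An SSE message is optional "event:" metadata plus "data:" lines,
        # terminated by a blank line.
        msg = f"data: {data}\n\n"
        if event is not None:
            msg = f"event: {event}\n" + msg
        return msg

    class SSEHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            self.send_response(200)
            self.send_header("Content-Type", "text/event-stream")
            self.send_header("Cache-Control", "no-cache")
            self.end_headers()
            # Pretend a background task is progressing; push its status.
            for pct in (25, 50, 75, 100):
                self.wfile.write(format_sse(f"{pct}% complete").encode())
                self.wfile.flush()
                time.sleep(1)

    if __name__ == "__main__":
        HTTPServer(("127.0.0.1", 8000), SSEHandler).serve_forever()

On the browser side, new EventSource("/") with an onmessage handler is enough to consume this; a real app would serve the stream from its web framework rather than http.server.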
Tinkering with Unix domain sockets
I've always had a vague idea about what Unix domain sockets are from my experience working with Docker for the past couple of years. However, lately I've been spending more time in embedded edge environments and had to explore Unix domain sockets in a bit more detail. This is a rough documentation of what I've explored to gain some insights.
The dry definition
Unix domain sockets (UDS) are similar to TCP sockets in that they allow two processes to communicate with each other, but there are some core differences. While TCP sockets are used for communication over a network, Unix domain sockets are used for communication between processes running on the same machine.
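Here's a tiny round-trip to make that concrete: a server bound to a filesystem path and a client talking to it, with two threads of one script standing in for two processes (the socket path and names are made up):

    import os
    import socket
    import tempfile
    import threading

    # A Unix domain socket lives at a filesystem path instead of an IP:port.
    sock_path = os.path.join(tempfile.mkdtemp(), "demo.sock")

    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(sock_path)
    server.listen(1)

    def echo_upper() -> None:
        # Accept one connection and shout the payload back.
        conn, _ = server.accept()
        with conn:
            conn.sendall(conn.recv(1024).upper())

    t = threading.Thread(target=echo_upper)
    t.start()

    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    client.connect(sock_path)
    client.sendall(b"hello over uds")
    reply = client.recv(1024)
    print(reply)  # b'HELLO OVER UDS'

    t.join()
    client.close()
    server.close()
    os.unlink(sock_path)

Note the cleanup at the end: unlike TCP ports, the socket file persists on disk after the process exits and must be unlinked explicitly.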
Signal handling in a multithreaded socket server
While working on a multithreaded socket server in an embedded environment, I realized that
the default behavior of Python’s socketserver.ThreadingTCPServer requires some extra work
if you want to shut down the server gracefully in the presence of an interruption signal.
The intended behavior here is that whenever any of SIGHUP, SIGINT, SIGTERM, or
SIGQUIT signals are sent to the server, it should:
- Acknowledge the signal and log a message to the output console of the server.
- Notify all the connected clients that the server is going offline.
- Give the clients enough time (specified by a timeout parameter) to close their requests.
- Close all the client requests and then shut down the server after the timeout exceeds.
Here's a quick implementation of a multithreaded echo server; let's see what happens when you send SIGINT to shut down the server:
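As a baseline, here's a hedged sketch of the pieces involved (EchoHandler and serve are my names, and this only covers the acknowledge-and-shutdown part; the client notification and timeout steps from the list above would go in the handler). The key wrinkle is that socketserver's shutdown() must be called from a different thread than the one running serve_forever(), so the signal handler delegates:

    import signal
    import socketserver
    import threading

    class EchoHandler(socketserver.StreamRequestHandler):
        def handle(self) -> None:
            # Echo each incoming line back to the client.
            for line in self.rfile:
                self.wfile.write(line)

    def serve(host: str = "127.0.0.1", port: int = 0) -> None:
        server = socketserver.ThreadingTCPServer((host, port), EchoHandler)
        server.daemon_threads = True  # Don't let client threads block exit.

        def on_signal(signum, frame):
            print(f"Received {signal.Signals(signum).name}, shutting down...")
            # shutdown() blocks until serve_forever() returns, so it must
            # run on a thread other than the one serving requests.
            threading.Thread(target=server.shutdown).start()

        for sig in (signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
            signal.signal(sig, on_signal)

        server.serve_forever()
        server.server_close()

Calling server.shutdown() directly inside the handler would deadlock, since the handler runs on the main thread that serve_forever() occupies.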
Switching between multiple data streams in a single thread
I was working on a project where I needed to poll multiple data sources and consume the incoming data points in a single thread. In this particular case, the two data streams were coming from two different Redis lists. The correct way to consume them would be to write two separate consumers and spin them up as different processes.
However, in this scenario, I needed a simple way to poll and consume data from one data source, wait for a bit, then poll and consume from another data source, and keep doing this indefinitely. That way I could get away with doing the whole workflow in a single thread without the overhead of managing multiple processes.
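The round-robin loop can be sketched like this, with plain lists standing in for the Redis lists (in the real version, source.pop(0) would be something like redis_client.lpop(key), and the loop would run forever instead of stopping when drained):

    import itertools
    import time

    def round_robin(sources: list[list], wait: float = 0.0):
        # Visit each source in turn, consume one item if available, then
        # move on to the next; stop once every source is drained.
        for source in itertools.cycle(sources):
            if source:
                yield source.pop(0)
            if all(not s for s in sources):
                break
            time.sleep(wait)

    stream_a = [1, 2]
    stream_b = [10, 20]
    print(list(round_robin([stream_a, stream_b])))  # [1, 10, 2, 20]

itertools.cycle does the stream switching; the optional wait keeps the loop from hammering the data sources when they're empty.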