Sorting a Django queryset by a custom sequence of an attribute

· 7 min

I needed a way to sort a Django queryset based on a custom sequence of an attribute. Typically, Django allows sorting a queryset by any attribute on the model or related to it in either ascending or descending order. However, what if you need to sort the queryset following a custom sequence of attribute values?

Suppose you’re working with a model called Product where you want to sort the rows of the table based on a list of product ids that are already sorted in a particular order. Here’s how it might look:
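The core idea is to map each id to its position in the custom sequence and sort on that rank instead of the id itself. Here's a minimal plain-Python sketch of that idea, with the `Product` rows faked as dicts (a labeled assumption, since this runs outside the ORM):

```python
# Hypothetical rows standing in for Product objects.
products = [
    {"id": 1, "name": "keyboard"},
    {"id": 2, "name": "mouse"},
    {"id": 3, "name": "monitor"},
]

# The custom sequence of product ids, already sorted the way we want.
custom_order = [3, 1, 2]

# Map each id to its position in the custom sequence...
rank = {pk: pos for pos, pk in enumerate(custom_order)}

# ...and sort on that rank instead of on the id itself.
sorted_products = sorted(products, key=lambda p: rank[p["id"]])
# The ids now come out as [3, 1, 2].
```

In the ORM, one common way to express the same rank is with conditional expressions, e.g. `Case(*[When(id=pk, then=pos) for pos, pk in enumerate(ids)])` passed to `order_by`, so the sorting happens in the database.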

Deduplicating iterables while preserving order in Python

· 5 min

Whenever I need to deduplicate the items of an iterable in Python, my usual approach is to create a set from the iterable and then convert it back into a list or tuple. However, this approach doesn’t preserve the original order of the items, which can be a problem if you need to keep the order unscathed. Here’s a naive approach that works:

from __future__ import annotations

from collections.abc import Iterable  # Python >= 3.9


def dedup(it: Iterable) -> list:
    seen = set()
    result = []
    for item in it:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result


it = (2, 1, 3, 4, 66, 0, 1, 1, 1)
deduped_it = dedup(it)  # Gives you [2, 1, 3, 4, 66, 0]

This code snippet defines a function dedup that takes an iterable it as input and returns a new list containing the unique items of the input iterable in their original order. The function uses a set seen to keep track of the items that have already been seen, and a list result to store the unique items.
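For hashable items there's also a shorter route: since regular dicts preserve insertion order (a language guarantee from Python 3.7 onward), dict.fromkeys gives you the same result in one line:

```python
it = (2, 1, 3, 4, 66, 0, 1, 1, 1)

# Dict keys are unique and keep insertion order, so this dedupes
# while preserving the first occurrence of each item.
deduped_it = list(dict.fromkeys(it))  # [2, 1, 3, 4, 66, 0]
```

Like the set-based dedup, this only works when the items are hashable.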

Pushing real-time updates to clients with Server-Sent Events (SSEs)

· 14 min

In multi-page web applications, a common workflow is where a user:

  • Loads a specific page or clicks on some button that triggers a long-running task.
  • On the server side, a background worker picks up the task and starts processing it asynchronously.
  • The page shouldn’t reload while the task is running.
  • The backend then communicates the status of the long-running task in real-time.
  • Once the task is finished, the client needs to display a success or an error message depending on the final status of the finished task.

The de facto tool for handling situations where real-time bidirectional communication is necessary is WebSocket. However, in the case above, you can see that the communication is mostly unidirectional where the client initiates some action in the server and then the server continuously pushes data to the client during the lifespan of the background job.

Signal handling in a multithreaded socket server

· 9 min

While working on a multithreaded socket server in an embedded environment, I realized that the default behavior of Python’s socketserver.ThreadingTCPServer requires some extra work if you want to shut down the server gracefully in the presence of an interruption signal. The intended behavior here is that whenever any of SIGHUP, SIGINT, SIGTERM, or SIGQUIT signals are sent to the server, it should:

  • Acknowledge the signal and log a message to the output console of the server.
  • Notify all the connected clients that the server is going offline.
  • Give the clients enough time (specified by a timeout parameter) to close the requests.
  • Close all the client requests and then shut down the server after the timeout exceeds.
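The first step above hinges on catching all four signals in one place. A sketch of that piece: install a single handler that logs the signal and records the shutdown request on a threading.Event, which the serving loop and client threads can then poll (the handler body is a stand-in for the real notify-and-drain logic):

```python
import signal
import threading

shutdown_event = threading.Event()


def handle_signal(signum: int, frame) -> None:
    # Acknowledge the signal on the console; the serving loop checks
    # the event and starts notifying clients / draining requests.
    print(f"Received {signal.Signals(signum).name}, shutting down...")
    shutdown_event.set()


for sig in (signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
    signal.signal(sig, handle_signal)
```

Note that signal handlers only run in the main thread, which is why the handler merely flips an event instead of tearing the server down directly.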

Here’s a quick implementation of a multithreaded echo server; let’s see what happens when you send SIGINT to shut it down:

Switching between multiple data streams in a single thread

· 4 min

I was working on a project where I needed to poll multiple data sources and consume the incoming data points in a single thread. In this particular case, the two data streams were coming from two different Redis lists. The correct way to consume them would be to write two separate consumers and spin them up as different processes.

However, in this scenario, I needed a simple way to poll and consume data from one data source, wait for a bit, then poll and consume from another data source, and keep doing this indefinitely. That way I could get away with doing the whole workflow in a single thread without the overhead of managing multiple processes.

Skipping the first part of an iterable in Python

· 3 min

Consider this iterable:

it = (1, 2, 3, 0, 4, 5, 6, 7)

Let’s say you want to build another iterable that includes only the numbers that appear starting from the element 0. Usually, I’d do this:

# This returns (0, 4, 5, 6, 7).
from_zero = tuple(elem for idx, elem in enumerate(it) if idx >= it.index(0))

While this is quite terse and does the job, it won’t work with a generator. There’s an even terser, more generic way to do the same thing with the itertools.dropwhile function. Here’s how to do it:
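dropwhile skips elements for as long as the predicate holds and then yields everything from the first failure onward, which is exactly the cut we want:

```python
from itertools import dropwhile

it = (1, 2, 3, 0, 4, 5, 6, 7)

# Drop elements while they aren't 0; keep everything from 0 onward.
from_zero = tuple(dropwhile(lambda x: x != 0, it))
# (0, 4, 5, 6, 7)
```

Unlike the enumerate-and-index version, this works on any iterable, generators included, and only walks the data once.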

Pausing and resuming a socket server in Python

· 5 min

I needed to write a socket server in Python that would allow me to intermittently pause the server loop for a while, run something else, then get back to the previous request-handling phase; repeating this iteration until the heat death of the universe. Initially, I opted for the low-level socket module to write something quick and dirty. However, the implementation got hairy pretty quickly. While the socket module gives you plenty of control over how you can tune the server’s behavior, writing a server with robust signal and error handling can be quite a bit of boilerplate work.
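With socketserver, the pause/resume dance gets much shorter: serve_forever blocks until shutdown is called, so running it in a thread lets the main thread alternate between serving and doing other work. A rough sketch, with a trivial echo handler standing in for the real request logic:

```python
import socket
import socketserver
import threading


class EchoHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request.recv(1024)
        self.request.sendall(data)


server = socketserver.ThreadingTCPServer(("127.0.0.1", 0), EchoHandler)
host, port = server.server_address


def resume():
    # serve_forever() blocks until server.shutdown() is called, so running
    # it in a thread lets the main thread pause and resume the server.
    threading.Thread(target=server.serve_forever, daemon=True).start()


def echo(payload: bytes) -> bytes:
    with socket.create_connection((host, port), timeout=5) as conn:
        conn.sendall(payload)
        return conn.recv(1024)


resume()
first = echo(b"ping")
server.shutdown()  # pause the accept loop, run something else...
resume()           # ...then pick up serving where we left off
second = echo(b"pong")
server.shutdown()
server.server_close()
```

Because the listening socket stays open across the pause, connection attempts made in the gap simply queue in the backlog until the loop resumes.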

Debugging a containerized Django application in Jupyter Notebook

· 5 min

Back in the days when I was working as a data analyst, I used to spend hours inside Jupyter notebooks exploring, wrangling, and plotting data to gain insights. However, as I shifted my career gear towards backend software development, my usage of interactive exploratory tools dwindled.

Nowadays, I spend the majority of my time working on a fairly large Django monolith accompanied by a fleet of microservices. Although I love my text editor and terminal emulators, I miss the ability to just start a Jupyter Notebook server and run code snippets interactively. While Django allows you to open up a shell environment and run code snippets interactively, it still isn’t as flexible as a notebook.

Manipulating text with query expressions in Django

· 4 min

I was working with a table that had a structure similar to this (simplified):

|               uuid               |         file_path         |
|----------------------------------|---------------------------|
| b8658dfc3e80446c92f7303edf31dcbd | media/private/file_1.pdf  |
| 3d750874a9df47388569a23c559a4561 | media/private/file_2.csv  |
| d177b7f7d8b046768ab65857451a0354 | media/private/file_3.txt  |
| df45742175d7451dad59761f15653d9d | media/private/image_1.png |
| a542966fc193470dab84351c15523042 | media/private/image_2.jpg |

Let’s say the above table is represented by the following Django model:

import uuid

from django.db import models


class FileCabinet(models.Model):
    uuid = models.UUIDField(
        primary_key=True, default=uuid.uuid4, editable=False
    )
    file_path = models.FileField(upload_to="files/")

I needed to extract the file names with their extensions from the file_path column and create new paths by adding the prefix dir/ before each file name. This would involve stripping everything before the file name from a file path and adding the prefix, resulting in a list of new file paths like this: ['dir/file_1.pdf', ..., 'dir/image_2.jpg'].
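Before reaching for the ORM, the target transformation is easy to pin down in plain Python; the query-expression version then has to express the same strip-and-prefix steps so the database does the work. A sketch of the transformation itself:

```python
paths = [
    "media/private/file_1.pdf",
    "media/private/file_2.csv",
    "media/private/image_2.jpg",
]

# Strip everything up to the last "/" and prepend the new prefix.
new_paths = [f"dir/{path.rpartition('/')[-1]}" for path in paths]
# ['dir/file_1.pdf', 'dir/file_2.csv', 'dir/image_2.jpg']
```

In Django, the equivalent annotation can be built from database functions in django.db.models.functions (Concat and friends), keeping the string manipulation in the database instead of pulling every row into Python.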

Using tqdm with concurrent.futures in Python

· 3 min

At my workplace, I was writing a script to download multiple files from different S3 buckets. The script relied on Django ORM, so I couldn’t use Python’s async paradigm to speed up the process. Instead, I opted for boto3 to download the files and concurrent.futures.ThreadPoolExecutor to spin up multiple threads and make the requests concurrently.

However, since the script was expected to be long-running, I needed to display progress bars to show the state of execution. It’s quite easy to do with tqdm when you’re just looping over a list of file paths and downloading the contents synchronously:
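A sketch of that synchronous loop, plus the concurrent variant where tqdm wraps `as_completed` with an explicit `total`. The `download` function here is a made-up stand-in for the boto3 call, and the import falls back to a no-op so the snippet runs even without tqdm installed:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    from tqdm import tqdm
except ImportError:  # no-op fallback so the sketch runs without tqdm
    def tqdm(iterable, **kwargs):
        yield from iterable


def download(path: str) -> str:
    # Hypothetical stand-in for the boto3 S3 download call.
    return f"downloaded {path}"


paths = ["a.csv", "b.csv", "c.csv"]

# Synchronous: wrap the iterable and tqdm renders the progress bar.
sync_results = [download(p) for p in tqdm(paths)]

# Concurrent: wrap as_completed instead; total must be passed explicitly
# because as_completed yields futures without a known length.
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(download, p) for p in paths]
    results = [f.result() for f in tqdm(as_completed(futures), total=len(futures))]
```

Since `as_completed` yields futures in completion order rather than submission order, the concurrent results may come back shuffled; sort or key them afterward if order matters.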