Bulk operations in Django with process pool

· 4 min

I’ve rarely been able to take advantage of Django’s bulk_create / bulk_update APIs in production applications, especially when I need to create or update multiple complex objects with a script. Often, these complex objects trigger a chain of signals or need non-trivial setup before any operations can be performed on them.

The issue is that bulk_create / bulk_update don’t trigger these signals or expose any hooks to run setup code. The Django docs cover the bulk_create caveats in detail. Here are a few of them:
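
The most consequential caveat is that the model’s save() method won’t be called, and the pre_save / post_save signals won’t be sent. Here’s a minimal sketch of what that means in practice; the Author model and signal receiver below are hypothetical, not taken from the post:

# sketch.py (hypothetical example, not from the post)
from django.db import models
from django.db.models.signals import post_save
from django.dispatch import receiver


class Author(models.Model):
    name = models.CharField(max_length=100)


@receiver(post_save, sender=Author)
def set_up_author(sender, instance, created, **kwargs):
    # Imagine the non-trivial per-object setup living here.
    print(f"post_save fired for {instance.name}")


# Author.objects.create(name="a")             # runs save(); the signal fires
# Author.objects.bulk_create(
#     [Author(name="b"), Author(name="c")]
# )                                           # single INSERT; the signal never fires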

Limit concurrency with semaphore in Python asyncio

· 5 min

I was working with a rate-limited API endpoint where I continuously needed to send short-polling GET requests without hitting an HTTP 429 error. Perusing the API docs, I found that the endpoint allows a maximum of 100 requests per second. So my goal was to find a way to send the maximum number of requests without triggering the too-many-requests error.

I picked up Python’s asyncio and the amazing HTTPx library by Tom Christie to make the requests. This is the naive version that I wrote in the beginning; it quickly hits the HTTP 429 error:
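
Roughly, the unbounded version looks like this (the URL and request count below are placeholders, not the original snippet): every request coroutine gets scheduled at once, so nothing keeps the number of in-flight requests under the 100-per-second budget.

# naive.py (a sketch with a placeholder URL, not the original code)
import asyncio

import httpx

URL = "https://example.com/rate-limited-endpoint"  # placeholder


async def fetch(client: httpx.AsyncClient) -> int:
    resp = await client.get(URL)
    return resp.status_code


async def main() -> None:
    async with httpx.AsyncClient() as client:
        # All 1,000 requests are scheduled at once; nothing limits how many
        # are in flight, so the server quickly starts returning 429s.
        statuses = await asyncio.gather(*(fetch(client) for _ in range(1000)))
        print(statuses)


if __name__ == "__main__":
    asyncio.run(main())

The fix the post builds up to is to gate each request with an asyncio.Semaphore so that only a bounded number of them run concurrently.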

Running tqdm with Python multiprocessing

· 2 min

Making tqdm play nice with multiprocessing requires some additional work. Getting it right isn’t always obvious, and I don’t want to add another third-party dependency just for this purpose.

The following example makes tqdm work with multiprocessing.Pool.imap_unordered. However, the same approach should also work with similar mapping methods like Pool.map, Pool.imap, Pool.starmap, etc.

"""
Run `pip install tqdm` before running the script.

The function `foo` is going to be executed 100 times across
`MAX_WORKERS=5` processes. In a single pass, each process will
get an iterable of size `CHUNK_SIZE=5`. So 5 processes each consuming
5 elements of an iterable will require (100 / (5*5)) 4 passes to finish
consuming the entire iterable of 100 elements.

Tqdm progress bar will update every `MAX_WORKERS*CHUNK_SIZE` iterations.
"""

# src.py


from __future__ import annotations

import multiprocessing as mp
import random
import time
from dataclasses import dataclass

from tqdm import tqdm

MAX_WORKERS = 5
CHUNK_SIZE = 5


@dataclass
class StartEnd:
    start: int
    end: int


def foo(start_end: StartEnd) -> int:
    time.sleep(0.2)
    return random.randint(start_end.start, start_end.end)


def main() -> None:
    inputs = [
        StartEnd(start, end)
        for start, end in zip(
            range(0, 100),
            range(100, 200),
        )
    ]

    with mp.Pool(processes=MAX_WORKERS) as pool:
        results = tqdm(
            pool.imap_unordered(foo, inputs, chunksize=CHUNK_SIZE),
            total=len(inputs),
        )  # 'total' is needed here since imap_unordered returns an
        # iterator without a length, so tqdm can't infer it on its own

        for result in results:
            print(result)


if __name__ == "__main__":
    main()

This will print:

Use daemon threads to test infinite while loops in Python

· 1 min

Python’s daemon threads are cool. A Python program exits once all non-daemon threads are done; it doesn’t wait around for daemon threads that are still running. To test a simple hello function that runs indefinitely, you can do the following:

# test_hello.py
from __future__ import annotations

import asyncio
import threading
from functools import partial
from unittest.mock import patch

import pytest


async def hello() -> None:
    while True:
        await asyncio.sleep(1)
        print("hello")


@pytest.mark.asyncio
@patch("asyncio.sleep", autospec=True)
async def test_hello(mock_asyncio_sleep, capsys):
    run = partial(asyncio.run, hello())
    t = threading.Thread(target=run, daemon=True)
    t.start()
    t.join(timeout=0.1)

    out, err = capsys.readouterr()
    assert err == ""
    assert "hello" in out
    mock_asyncio_sleep.assert_awaited()

To execute the test, make sure you have your virtual environment activated. You’ll also need to install pytest and pytest-asyncio. Then run:
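
Something along these lines should do it (assuming the test lives in test_hello.py):

pip install pytest pytest-asyncio
pytest -v test_hello.py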

Effortless concurrency with Python's concurrent.futures

· 17 min

Writing concurrent code in Python can be tricky. Before you even start, you have to worry about all this icky stuff: whether the task at hand is I/O- or CPU-bound, and whether putting in the extra effort to achieve concurrency will even give you the boost you need. The presence of the Global Interpreter Lock (GIL) foists further limitations on writing truly concurrent code. But for the sake of sanity, you can oversimplify it like this without being blatantly incorrect:
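
Roughly: reach for threads when the work is I/O-bound and spends its time waiting on the network or disk, and for processes when it’s CPU-bound and needs to sidestep the GIL. A minimal sketch with concurrent.futures (the workloads below are made up for illustration, not from the post):

# sketch.py (hypothetical workloads, for illustration only)
import hashlib
import urllib.request
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

URLS = ["https://example.com"] * 5  # placeholder URLs


def download(url: str) -> int:
    # I/O-bound: the thread mostly waits on the network, releasing the GIL,
    # so a thread pool gives real concurrency here.
    with urllib.request.urlopen(url) as resp:
        return len(resp.read())


def crunch(data: bytes) -> str:
    # CPU-bound: pure computation holds the GIL, so separate processes
    # are needed for an actual speedup.
    for _ in range(100_000):
        data = hashlib.sha256(data).digest()
    return data.hex()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as tpe:
        print(list(tpe.map(download, URLS)))

    with ProcessPoolExecutor() as ppe:
        print(list(ppe.map(crunch, [b"a", b"b", b"c"])))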