I was cobbling together a long-running Go script to send webhook messages to a system when some events occur. The initial script would continuously poll a Kafka topic for events and spawn new goroutines to make HTTP requests to the destination. This had two problems:
- It could create unlimited goroutines if many events arrived quickly
- It might overload the destination system by making many concurrent requests
In Python, I’d just use asyncio.Semaphore to limit concurrency. I’ve previously written about this here[1]. Turns out, in Go, you can do the same with a buffered channel. First, here’s how the naive version without any limit looks:
package main

import (
	"fmt"
	"sync"
)

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	// ... Send http post request
	fmt.Println("Sending webhook request")
}

func main() {
	var wg sync.WaitGroup
	nWorkers := 10
	for i := 1; i <= nWorkers; i++ {
		wg.Add(1)
		go worker(i, &wg)
	}
	wg.Wait()
	fmt.Println("All workers have completed")
}
We’re sending the webhook request in the worker function. It takes an integer ID for bookkeeping and a pointer to a WaitGroup instance for synchronization. Once it finishes making the request, it signals the WaitGroup with wg.Done(). In the main function, we spawn 10 workers as goroutines and wait for all of them to finish work with wg.Wait(). Without the wait-group synchronization, the main goroutine would bail before all the background workers finish their work.
In the above scenario, all the requests are made concurrently. How can we limit the system to allow only n concurrent requests? Sure, you can choose to spin up n goroutines and no more. But how do you do that from inside an infinite loop that’s also polling a queue continuously?
In this case, I want to throttle the script so that it’ll send 2 requests in parallel and then wait until those are done. Then it’ll wait for a bit before firing up the next batch of 2 goroutines and continuously repeat the same process. Buffered channels allow us to do exactly that. Observe:
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, sem chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	// Acquire semaphore; blocks while maxConcurrency workers hold a slot
	fmt.Printf("Worker %d: Waiting to acquire semaphore\n", id)
	sem <- struct{}{}

	// Do work
	fmt.Printf("Worker %d: Semaphore acquired, running\n", id)
	time.Sleep(10 * time.Millisecond)

	// Release semaphore
	<-sem
	fmt.Printf("Worker %d: Semaphore released\n", id)
}

func main() {
	nWorkers := 10                         // Total number of goroutines
	maxConcurrency := 2                    // Allowed to run at the same time
	batchInterval := 50 * time.Millisecond // Delay between each batch of 2 goroutines

	// Create a buffered channel with a capacity of maxConcurrency
	sem := make(chan struct{}, maxConcurrency)
	var wg sync.WaitGroup

	// We start 10 goroutines but only 2 of them will do work at the same time
	for i := 1; i <= nWorkers; i++ {
		wg.Add(1)
		go worker(i, sem, &wg)

		// Introduce a delay after each batch of workers
		if i%maxConcurrency == 0 && i != nWorkers {
			fmt.Printf("Waiting for batch interval...\n")
			time.Sleep(batchInterval)
		}
	}

	wg.Wait()
	close(sem) // Optional here; no goroutine uses sem after this point
	fmt.Println("All workers have completed")
}
The clever bit here is the buffered channel named sem, which acts as a semaphore to limit concurrency. We set its capacity to the max number of goroutines we want running at once, in this case 2. Before making the request, each worker goroutine tries to acquire the semaphore by sending a value into the channel via sem <- struct{}{}. The value itself doesn’t matter, so we’re just sending an empty struct, which takes up no memory.
Sending data to the channel will block if it’s already full, essentially meaning all permits are taken. Once the send succeeds, the goroutine has acquired the semaphore and is free to proceed with its work. When finished, it releases the semaphore by reading from the channel with <-sem. This frees up a slot in the channel for another goroutine to acquire. By using this semaphore channel to guard the section that does the work, we can precisely control how many goroutines do that work at the same time.
A WaitGroup alone only tells us when all the goroutines have finished. Combining it with a buffered channel provides fine-grained control over simultaneous goroutine execution. The buffer size of the channel determines the allowed parallelism, 2 here. We’ve also thrown in an extra bit of delay after launching each batch of workers with:
// Introduce additional delay after each batch of workers
if i%maxConcurrency == 0 && i != nWorkers {
	fmt.Printf("Waiting for batch interval...\n")
	time.Sleep(batchInterval)
}
Running the script will show that although we’ve started 10 goroutines in the main function, only 2 of them run at once. Also, there’s a delay of 50 milliseconds (the batchInterval) between each batch. We can tune it according to our needs to go easy on the destination system.
Waiting for batch interval...
Worker 2: Waiting to acquire semaphore
Worker 2: Semaphore acquired, running
Worker 1: Waiting to acquire semaphore
Worker 1: Semaphore acquired, running
Worker 1: Semaphore released
Worker 2: Semaphore released
Waiting for batch interval...
Worker 4: Waiting to acquire semaphore
Worker 4: Semaphore acquired, running
Worker 3: Waiting to acquire semaphore
Worker 3: Semaphore acquired, running
Worker 3: Semaphore released
Worker 4: Semaphore released
Waiting for batch interval...
...
Now, you might want to add extra abstractions over the core behavior to make it more ergonomic. Here’s a pointer[2] on how to do so. Effective Go also mentions[3] this pattern briefly.
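As a rough idea of what such an abstraction could look like, here is a minimal sketch that wraps the buffered channel in a small type. The Semaphore type and its NewSemaphore, Acquire, Release, and TryAcquire names are hypothetical; this is not necessarily the approach the linked pointer describes.

```go
package main

import "fmt"

// Semaphore is a hypothetical wrapper around a buffered channel.
type Semaphore chan struct{}

// NewSemaphore returns a semaphore with n permits.
func NewSemaphore(n int) Semaphore { return make(Semaphore, n) }

// Acquire blocks until a permit is free.
func (s Semaphore) Acquire() { s <- struct{}{} }

// Release hands a permit back.
func (s Semaphore) Release() { <-s }

// TryAcquire takes a permit only if one is free right now.
func (s Semaphore) TryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	sem := NewSemaphore(2)
	sem.Acquire()
	sem.Acquire()
	fmt.Println(sem.TryAcquire()) // false: both permits are taken
	sem.Release()
	fmt.Println(sem.TryAcquire()) // true: one permit was freed
}
```

The non-blocking TryAcquire is the kind of extra that a named type makes easy to add; a worker could use it to skip or requeue an event instead of waiting.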