Introduction

AI applications spend a lot of time waiting.

They wait on model responses, database queries, or even agent tool calls. Most AI applications are IO-bound. They wait on external services, not CPU cycles.

asyncio is a library for writing faster, more scalable IO-bound code in Python. It uses async/await syntax to run concurrent code, allowing you to send hundreds of model requests at once instead of one at a time.

Making concurrent API calls is the most common use case of asyncio, but you can also use it to build way cooler things like rate limiters, producer-consumer pipelines, or web crawlers. This is a guide about building those cool things.

How this guide is structured

This guide walks through six real-world applications of asyncio:

  • LLM Responses — Call LLM APIs concurrently without blocking
  • Rate Limiters — Control API request throughput to stay within rate limits
  • Data Pipelines — Process large datasets with producer-consumer pipelines
  • Request Batchers — Batch API requests for efficiency
  • Web Crawlers — Efficiently crawl the web and parse web pages
  • Tool-Calling Agents — Build agents that execute tools concurrently

Each section follows a challenge-solution format inspired by John Crickett's Coding Challenges.

Try to solve each challenge yourself first. Research shows that productive failure leads to deeper learning, even if you struggle and can't come up with the solution.

About me

Hi! I'm Abdul. I build infrastructure for Gemini fine-tuning and batch inference at Google. I care about making AI development easier, faster, and more accessible.

If you find this guide helpful, check out my blog at abdulsaleh.dev.

Source Code

You can find the Markdown source for this guide on GitHub. It is built with mdBook, a Rust tool for creating books from Markdown.

The Basics

The best way to learn asyncio is by doing.

asyncio lets you write concurrent code in Python. When you make API calls or read files, your program usually waits. With asyncio, you can do other work while waiting, which can make IO-bound code much faster.
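To see the effect without an API key, here is a minimal sketch that uses asyncio.sleep() as a stand-in for a network call. The names fake_io_call, run_sequentially, and run_concurrently are made up for illustration:

```python
import asyncio
import time


async def fake_io_call(delay: float) -> float:
    # asyncio.sleep() stands in for a network call: while this coroutine
    # waits, the event loop is free to run other coroutines.
    await asyncio.sleep(delay)
    return delay


async def run_sequentially(delays: list[float]) -> float:
    start = time.perf_counter()
    for delay in delays:
        await fake_io_call(delay)  # each call starts after the previous one ends
    return time.perf_counter() - start


async def run_concurrently(delays: list[float]) -> float:
    start = time.perf_counter()
    # All calls are in flight at once, so the total is roughly the longest delay.
    await asyncio.gather(*(fake_io_call(delay) for delay in delays))
    return time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.2] * 5
    print(f"sequential: {asyncio.run(run_sequentially(delays)):.2f}s")  # about 1 second
    print(f"concurrent: {asyncio.run(run_concurrently(delays)):.2f}s")  # about 0.2 seconds
```

The total time of the concurrent version is bounded by the slowest call rather than the sum of all calls, which is exactly the speedup you get with real API requests.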

Each chapter has a "Before you start" section that lists the relevant asyncio functions and classes. You can learn about these as you go.

If you want to understand how asyncio works first, take a look at this tutorial or read the official conceptual overview.

For syntax and API reference, see the official asyncio documentation.

Async LLM Responses

Challenge

When working with LLMs, you often need to make API calls to generate responses for multiple prompts or user requests. Making these calls synchronously means waiting for each response before sending the next, which is slow and inefficient.

In this challenge, you'll use asyncio to send concurrent requests to the Gemini API. You'll start with a basic synchronous script, measure its performance, then use asyncio to write a faster async solution.

Before you start

The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:

  • asyncio.gather() for waiting on running tasks.

Step 0

To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier, but any async model API will work.

export GEMINI_API_KEY="YOUR_API_KEY"

Step 1

In this step, your goal is to verify you can call the Gemini API.

Install the Google GenAI SDK and make your first request. Write your code to script.py.

pip install -q -U google-genai

from google import genai

client = genai.Client()

response = client.models.generate_content(
    model="gemini-flash-latest", contents="Why do some birds migrate?"
)
print(response.text)

Step 2

In this step, your goal is to send multiple requests to the Gemini API and time your code.

Run generate_content() in a loop with five iterations. Time your script. How long does it take to make five requests?

time python script.py

Step 3

In this step, your goal is to make 5 concurrent requests using the async Gemini API:

response = await client.aio.models.generate_content(
    model="gemini-flash-latest", contents="Why do some birds migrate?"
)

Use await asyncio.gather() to wait on the responses.

Now run and time your code. How long does it take?

Step 4

What happens if you increase the number of requests? At what point do you hit rate limits?

Going Further

  • See the Rate Limiters chapter to build your own rate limiter and avoid hitting rate limits.
  • Creating asyncio tasks does not scale indefinitely. If you create tens of thousands of tasks, you can run out of memory or see worse performance due to scheduling overhead. If you have a large number of requests, it's better to use a producer-consumer queue and a fixed number of consumer tasks. See the Data Pipelines chapter to learn how to do this.
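A minimal sketch of the fixed-worker idea: a small pool of worker tasks drains a shared queue, so the number of tasks stays constant no matter how many requests there are. fake_request and _NUM_WORKERS are hypothetical placeholders for a real async API call and your chosen concurrency:

```python
import asyncio

_NUM_WORKERS = 3  # hypothetical cap on concurrent requests


async def fake_request(i: int) -> int:
    # Stand-in for an API call such as client.aio.models.generate_content().
    await asyncio.sleep(0.01)
    return i * 2


async def worker(queue: asyncio.Queue, results: dict) -> None:
    # Each worker handles one request at a time until it is cancelled.
    while True:
        i = await queue.get()
        try:
            results[i] = await fake_request(i)
        finally:
            queue.task_done()


async def process_all(num_requests: int) -> dict:
    queue = asyncio.Queue()
    for i in range(num_requests):
        queue.put_nowait(i)
    results = {}
    # Only _NUM_WORKERS tasks exist, however many requests are queued.
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(_NUM_WORKERS)]
    await queue.join()  # returns once every request has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(len(asyncio.run(process_all(100))))  # 100
```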

Now take some time to attempt the challenge before looking at the solution!


Solution

Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay! The key concepts are using async/await with the Gemini API and asyncio.gather() for concurrency.

Step 2 - Solution

Let's call generate_content() in a loop:

from google import genai
from google.genai.types import GenerateContentResponse

_NUM_REQUESTS = 5


def generate_content(index: int, client: genai.Client) -> GenerateContentResponse:
    response = client.models.generate_content(
        model="gemini-flash-latest", contents="Why do some birds migrate?"
    )
    print(f"Request {index} completed")
    return response


def main():
    client = genai.Client()
    results = [generate_content(i, client) for i in range(_NUM_REQUESTS)]


if __name__ == "__main__":
    main()

This takes about 30 seconds to run, including the time to connect to the client.

time python script.py
> Request 0 completed
> Request 1 completed
> Request 2 completed
> Request 3 completed
> Request 4 completed
> 
> real    0m32.134s
> user    0m1.733s
> sys     0m0.204s

Step 3 - Solution

Now let's define some tasks and use asyncio.gather() to run them concurrently:

import asyncio
from google import genai
from google.genai.types import GenerateContentResponse


_NUM_REQUESTS = 5


async def generate_content(index: int, client: genai.Client) -> GenerateContentResponse:
    response = await client.aio.models.generate_content(
        model="gemini-flash-latest", contents="Why do some birds migrate?"
    )
    print(f"Request {index} completed")
    return response


async def main():
    client = genai.Client()
    tasks = [generate_content(i, client) for i in range(_NUM_REQUESTS)]
    results = await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

We create the client only once and reuse it for every request instead of creating a new client per call.

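This takes about 8 seconds to run, roughly four times faster than before. Because the requests run concurrently, they can complete in any order, although asyncio.gather() still returns the results in the same order as the tasks you passed in.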

time python script.py
> Request 3 completed
> Request 4 completed
> Request 1 completed
> Request 0 completed
> Request 2 completed
> 
> real    0m8.460s
> user    0m1.860s
> sys     0m0.240s
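Even though requests finish out of order, asyncio.gather() returns results in the same order as its arguments. A small sketch with random delays standing in for API latency (fake_request is a hypothetical name) makes this visible:

```python
import asyncio
import random

completion_order = []


async def fake_request(i: int) -> int:
    # Random delays make the requests finish in an unpredictable order.
    await asyncio.sleep(random.uniform(0, 0.05))
    completion_order.append(i)
    return i


async def main() -> list:
    return await asyncio.gather(*(fake_request(i) for i in range(5)))


if __name__ == "__main__":
    results = asyncio.run(main())
    print("completed:", completion_order)  # order varies between runs
    print("results:  ", results)           # always [0, 1, 2, 3, 4]
```

This means you can safely zip the results with the original prompts without tracking indices yourself.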

Step 4 - Solution

If we increase _NUM_REQUESTS to 20, we quickly hit the rate limit.

google.genai.errors.ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 
 'message': 'You exceeded your current quota, please check your plan and billing details....

See the Rate Limiters chapter to build your own rate limiter and avoid hitting rate limits.

Rate Limiters

Challenge

Model provider APIs have strict rate limits. It's very easy to exceed these limits if you're making requests concurrently without throttling.

In this challenge, you will build a sliding window rate limiter for calling async APIs.

A sliding window rate limiter allows at most N requests within any time window of a given length (for example, 10 requests per minute). It works by keeping a queue of the timestamps of the most recent requests.

Before processing a new request, the rate limiter removes any requests that fall outside the time window, then checks if the number of remaining requests is below the limit. If so, the new request is added to the queue and processed. Otherwise, the limiter waits until old requests have aged out of the window.

Before you start

The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:

  • asyncio.gather() for waiting on running tasks.
  • asyncio.get_running_loop().time() for getting the current time in a coroutine.
  • asyncio.Lock() for protecting shared resources.
  • asyncio.sleep() for waiting in a coroutine.

Step 0

To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier, but any async model API will work.

export GEMINI_API_KEY="YOUR_API_KEY"

Step 1

In this step, your goal is to make concurrent requests to the Gemini API and hit the rate limits.

Create a new script (script.py) that makes 20 concurrent requests to the Gemini API, then run it. The solution to the LLM Responses chapter explains how to do this.

Confirm you get a resource exhausted error:

python script.py
> google.genai.errors.ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429,
 'message': 'You exceeded your current quota, please check your plan and billing details.
  For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. ...

At the time of writing, Gemini 2.5 Flash had a limit of 10 requests per minute on the free tier.

Step 2

In this step, your goal is to implement the sliding window rate limiter.

Below is skeleton code for a RateLimiter class. You need to implement the acquire() method:

import asyncio
from collections import deque
from datetime import timedelta


class RateLimiter:
    def __init__(self, limit: int, interval: timedelta):
        self._limit = limit
        self._interval = interval.total_seconds()
        # Holds request timestamps.
        self._window = deque()

    async def acquire(self) -> None:
        """Wait until a new request can be made under the rate limit."""
        pass

Callers await acquire() before each request. It returns once the request can be made without exceeding the rate limit:

async def generate_content(client, rate_limiter):
    # Waits until we are under the rate limit.
    await rate_limiter.acquire()
    response = await client.aio.models.generate_content(
        model="gemini-flash-latest", contents="Why do some birds migrate?"
    )
    return response

The acquire() method should:

  1. Acquire a lock to prevent race conditions when making changes to the window. You can create the lock using asyncio.Lock().
  2. Use asyncio.get_running_loop().time() to get the current time in seconds.
  3. Remove old requests from the window so that it only contains requests made within the past interval seconds.
  4. If the window has fewer than limit requests, the request is allowed. Add the current request time to the window and return.
  5. If the limit is reached, calculate how long to wait for the oldest request to age out of the window, then sleep with asyncio.sleep().
  6. Retry the above steps in a while loop.

Step 3

In this step, your goal is to test your rate limiter.

Update your concurrent code to call await limiter.acquire() before making requests.

Verify that the rate limiter delays requests to avoid hitting the Gemini API rate limits.

Going Further

  • Try implementing other rate limiting algorithms like token bucket. This will require keeping track of available "tokens" and replenishing them at a fixed rate over time.
  • Implement a sliding window rate limiter that avoids repeated sleep-and-retry polling and serves requests in order. The rate limiter should not use while loops or asyncio.sleep(). When the limit is reached, create a future with loop.create_future(), add it to a waiters queue, then await it. When a request is sent, use loop.call_later(interval, callback) to schedule a callback that wakes up the next waiter in the queue. Effectively, every allowed request reserves a slot that expires after interval seconds; when it expires, the callback unblocks the next waiter in line and lets its request through.
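For the token bucket idea, here is a minimal sketch assuming a single event loop; TokenBucket, rate, and capacity are illustrative names, not part of asyncio. Rather than topping up tokens on a timer, it computes the tokens earned since the last call on each acquire(), which has the same effect as refilling at a fixed rate:

```python
import asyncio


class TokenBucket:
    """Token bucket sketch: allows bursts of up to `capacity` requests,
    then refills at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int):
        self._rate = rate
        self._capacity = capacity
        self._tokens = float(capacity)  # start with a full bucket
        self._last_refill = None  # set on the first acquire()
        self._lock = asyncio.Lock()

    def _refill(self, now: float) -> None:
        # Add the tokens earned since the last refill, capped at capacity.
        if self._last_refill is not None:
            earned = (now - self._last_refill) * self._rate
            self._tokens = min(self._capacity, self._tokens + earned)
        self._last_refill = now

    async def acquire(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            async with self._lock:
                self._refill(loop.time())
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Sleep until at least one whole token should be available.
                wait = (1 - self._tokens) / self._rate
            await asyncio.sleep(wait)
```

Note that a token bucket can admit up to capacity + rate × T requests in any window of T seconds, so matching a strict per-minute quota requires choosing capacity and rate conservatively.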

Now take some time to attempt the challenge before looking at the solution!


Solution

Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay!

Step 1 - Solution

See the LLM Responses solution to make concurrent requests to the Gemini API. Set _NUM_REQUESTS = 20 to trigger the rate limit error.

Step 2 - Solution

import asyncio
from collections import deque
from datetime import timedelta


class RateLimiter:
    def __init__(self, limit: int, interval: timedelta):
        self._limit = limit
        self._interval = interval.total_seconds()
        self._window = deque()
        self._lock = asyncio.Lock()

    def _prune_window(self, now: float) -> None:
        """Removes requests that have aged out of the time window."""
        while self._window and now - self._window[0] >= self._interval:
            self._window.popleft()

    async def acquire(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            async with self._lock:
                now = loop.time()
                self._prune_window(now)

                if len(self._window) < self._limit:
                    # We have space in the sliding window to send a request.
                    self._window.append(now)
                    return

                # Wait for the oldest request to age out of the window.
                oldest_request_time = self._window[0]
                elapsed = now - oldest_request_time
                remaining = self._interval - elapsed

            await asyncio.sleep(remaining)

Note how:

  • _lock prevents race conditions when multiple tasks call acquire() simultaneously
  • _prune_window() removes requests outside the sliding window
  • We release the lock before sleeping to allow other tasks to check the rate limit

This solution suffers from the "thundering herd" problem. If multiple tasks are sleeping, they all wake up at about the same time and compete for the lock. Typically only one of them gets the freed slot, and the rest have to go back to sleep.

One way to avoid this problem is to implement the rate limiter using futures as described in the Going Further section.
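Here is one possible futures-based sketch, assuming all callers share one event loop. FutureRateLimiter, _reserve_slot, and _release_slot are illustrative names. Each allowed request reserves a slot that loop.call_later() releases after interval seconds, and a released slot is handed directly to the oldest waiter, so tasks never poll and are served in arrival order:

```python
import asyncio
from collections import deque
from datetime import timedelta


class FutureRateLimiter:
    """Sliding window limiter sketch without polling; waiters are served FIFO."""

    def __init__(self, limit: int, interval: timedelta):
        self._limit = limit
        self._interval = interval.total_seconds()
        self._in_use = 0  # slots reserved by requests in the current window
        self._waiters = deque()  # futures of tasks waiting for a slot

    def _reserve_slot(self, loop: asyncio.AbstractEventLoop) -> None:
        # Each allowed request holds a slot that expires after `interval` seconds.
        self._in_use += 1
        loop.call_later(self._interval, self._release_slot, loop)

    def _release_slot(self, loop: asyncio.AbstractEventLoop) -> None:
        self._in_use -= 1
        # Hand the freed slot to the oldest waiter whose task is still waiting.
        # This loop only skips cancelled waiters; it never sleeps or polls.
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                self._reserve_slot(loop)
                waiter.set_result(None)
                return

    async def acquire(self) -> None:
        loop = asyncio.get_running_loop()
        # Take a free slot directly only if nobody is queued ahead of us.
        if self._in_use < self._limit and not self._waiters:
            self._reserve_slot(loop)
            return
        waiter = loop.create_future()
        self._waiters.append(waiter)
        # _release_slot() reserves a slot for us before resolving the future.
        await waiter
```

Because only one waiter is woken per expired slot, there is no stampede on a lock.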

Step 3 - Solution

Now let's integrate the rate limiter with our Gemini API calls:

import asyncio
from datetime import datetime, timedelta

from google import genai

_NUM_REQUESTS = 20


class RateLimiter:
    # Same as above.
    ...


async def generate_content(index, client, rate_limiter):
    await rate_limiter.acquire()
    print(f"Request {index} sent at {datetime.now().strftime('%H:%M:%S')}")
    response = await client.aio.models.generate_content(
        model="gemini-flash-latest", contents="Why do some birds migrate?"
    )
    return response


async def main():
    # At the time of writing, Gemini Flash had a free-tier limit of 10 requests per minute.
    limiter = RateLimiter(limit=10, interval=timedelta(minutes=1))

    client = genai.Client()
    tasks = [generate_content(i, client, limiter) for i in range(_NUM_REQUESTS)]
    results = await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

Now when we run this with 20 requests, it completes successfully without hitting rate limits:

time python script.py
> Request 0 sent at 22:31:10
> Request 1 sent at 22:31:10
> Request 2 sent at 22:31:10
> Request 3 sent at 22:31:10
> Request 4 sent at 22:31:10
> Request 5 sent at 22:31:10
> Request 6 sent at 22:31:10
> Request 7 sent at 22:31:10
> Request 8 sent at 22:31:10
> Request 9 sent at 22:31:10
    # Note how we wait one minute before sending the 11th request to stay
    # within the rate limit.
> Request 19 sent at 22:32:10
> Request 18 sent at 22:32:10
> Request 17 sent at 22:32:10
> Request 16 sent at 22:32:10
> Request 15 sent at 22:32:10
> Request 14 sent at 22:32:10
> Request 12 sent at 22:32:10
> Request 11 sent at 22:32:10
> Request 13 sent at 22:32:10
> Request 10 sent at 22:32:10
> 
> real    1m9.061s
> user    0m2.072s
> sys     0m0.402s

The first 10 requests are allowed immediately, then the rate limiter automatically pauses until enough time has passed to send the next batch. All 20 requests succeed without any resource exhausted errors.

Data Pipelines

Challenge

In this challenge, you will build a concurrent data pipeline. The pipeline will read some prompts from disk, call the Gemini API, then write the model responses back to disk.

Reading/writing files and sending API requests are I/O operations so asyncio is a great fit for speeding up this pipeline. Instead of waiting on disk I/O or model responses, you can be doing work concurrently to save time.

You can also extend this pipeline to do CPU-bound work. See the Going Further section if you want to learn how.

Before you start

The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:

  • asyncio.gather() for waiting on running tasks.
  • asyncio.Queue() for creating async queues.
  • queue.task_done() and queue.join() for tracking queue completion.
  • task.cancel() for canceling running tasks.
  • aiofiles.open() for async file operations.

Producer-Consumer Queues

You will be using a producer-consumer queue in this challenge, which is a key design pattern for concurrent code.

A queue allows different tasks to communicate with each other. A producer task can add items to a queue, and a consumer task can process these items as they are added.

Queues are useful when you have:

  1. Different pipeline stages: You can interleave reading/writing files while making API calls. You're not stuck waiting on any one stage.
  2. Streaming data: You can add data to the input queue and it will get processed as it is added.
  3. Multiple workers per stage: You can have multiple tasks working concurrently which is faster (see the LLM Responses chapter.)
  4. Backpressure control: Limiting the queue sizes prevents memory overflow if producers are faster than consumers.

Take a look at this example:

import asyncio

_NUM_ITEMS = 3


async def producer(queue: asyncio.Queue):
    for i in range(_NUM_ITEMS):
        print(f"Adding item {i}")
        await queue.put(i)
        # Pass control back to the event loop so the consumer can run.
        await asyncio.sleep(0.1)


async def consumer(queue: asyncio.Queue):
    try:
        while True:
            item = await queue.get()
            print(f"Processing the item {item}")
            queue.task_done()

    except asyncio.CancelledError:
        print("Shutting down consumer.")


async def main():
    queue = asyncio.Queue()

    consumer_task = asyncio.create_task(consumer(queue))
    producer_task = asyncio.create_task(producer(queue))

    # Wait until all the producers are done.
    # This ensures all items have been added to the queue.
    await producer_task

    # Wait until the queue is empty.
    # .put() and .task_done() must be called an equal number of times.
    await queue.join()

    # Cancel the consumers since the queue is empty.
    consumer_task.cancel()
    await consumer_task


if __name__ == "__main__":
    asyncio.run(main())

The code above produces the following output:

> Adding item 0
> Processing the item 0
> Adding item 1
> Processing the item 1
> Adding item 2
> Processing the item 2
> Shutting down consumer.

Note how the producer and consumer steps are interleaved. Items are processed as they are added.

The consumer doesn't wait for all items to be produced. It starts processing immediately. With multiple consumers, you can process many items concurrently while the producer is still adding more.

You can create multiple consumer or producer tasks all using the same queue. This is analogous to creating more "workers" to achieve higher concurrency.

_NUM_PRODUCERS = 4
_NUM_CONSUMERS = 2

async def main():
    queue = asyncio.Queue()

    consumers = [asyncio.create_task(consumer(queue)) for _ in range(_NUM_CONSUMERS)]
    producers = [asyncio.create_task(producer(queue)) for _ in range(_NUM_PRODUCERS)]

    await asyncio.gather(*producers)
    await queue.join()

    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers)

Important

The consumers loop forever waiting on new items. You need to cancel the consumers explicitly after the producers are done and the queue is empty.

Another option is to pass a "poison pill" or a None sentinel to notify the consumer to shut down instead of cancelling it explicitly. See the Request Batchers chapter for an example of using sentinel values to gracefully shut down pipeline stages.
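A minimal standalone sketch of the sentinel approach, assuming the number of consumers is known up front (all names are illustrative). The producer enqueues one sentinel per consumer after its real items, and each consumer returns when it sees one, so asyncio.gather() can simply wait for everything to finish:

```python
import asyncio

_SENTINEL = object()  # unique "poison pill"; None also works if None is never a real item


async def producer(queue: asyncio.Queue, items: list, num_consumers: int) -> None:
    for item in items:
        await queue.put(item)
    # One sentinel per consumer, so that every consumer shuts down.
    for _ in range(num_consumers):
        await queue.put(_SENTINEL)


async def consumer(name: str, queue: asyncio.Queue, processed: list) -> None:
    while True:
        item = await queue.get()
        if item is _SENTINEL:
            return  # exit cleanly; no cancel() needed
        print(f"{name} processing {item}")
        processed.append(item)


async def main() -> list:
    queue = asyncio.Queue()
    processed = []
    consumers = [consumer(f"consumer-{i}", queue, processed) for i in range(2)]
    # gather() returns once the producer and every consumer have finished.
    await asyncio.gather(producer(queue, ["a", "b", "c"], len(consumers)), *consumers)
    return processed


if __name__ == "__main__":
    asyncio.run(main())
```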



The data pipeline in this challenge will have:

  1. An input reader task which reads input prompts from disk then adds them to the input queue.
  2. Multiple content generation tasks which get prompts from the input queue, call the Gemini API, then add responses to the output queue.
  3. An output writer task which gets the responses from the output queue, then writes them to disk.

The queues decouple the stages so they don't block each other. The content generation workers also process API calls concurrently, so multiple API calls can be in flight at the same time.

Step 0

To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier, but any async model API will work.

export GEMINI_API_KEY="YOUR_API_KEY"

Step 1

In this step, your goal is to generate the input prompts.

Run the command below to create a new inputs/ directory. This directory will have 10 files, each with 3 prompts. This will simulate reading from sharded input files.

mkdir -p inputs

for shard in {0..9}; do
  for i in {0..2}; do
    echo "What is two times $((shard * 3 + i))?" >> inputs/shard_$shard.txt
  done
done

Verify that the input data was created:

ls -l inputs

> -rw-rw-r-- 1 user user 63 Nov 23 16:34 shard_0.txt
> -rw-rw-r-- 1 user user 63 Nov 23 16:34 shard_1.txt
> -rw-rw-r-- 1 user user 63 Nov 23 16:34 shard_2.txt
...

Also inspect the input files:

cat inputs/shard_4.txt 

> What is two times 12?
> What is two times 13?
> What is two times 14?

Step 2

In this step, your goal is to implement the input reader coroutine.

The input reader coroutine takes the input directory, reads the files in that directory, then adds the file contents to the input queue. Each line in the file is a separate prompt or input.

You can use aiofiles for async file operations. The input directory in our case is inputs/.

import asyncio
import pathlib

import aiofiles

async def read_inputs(input_dir: pathlib.Path, input_queue: asyncio.Queue):
    pass

Step 3

In this step, your goal is to implement the content generation coroutine.

This coroutine gets prompts from the input queue, sends requests to the Gemini API, then puts the response text on the output queue. Each item in the output queue should be an (input, response) tuple.

See the LLM Responses solution for how to call the async Gemini API.

async def generate_content(
    client: genai.Client,
    input_queue: asyncio.Queue[str],
    output_queue: asyncio.Queue[tuple[str, str]],
):
    pass

Make sure to handle the API call failures. You can either skip failed inputs, retry them in the current task, or add them back to the input queue to be processed by another task.
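If you choose to retry in the current task, one common approach is exponential backoff with jitter. The sketch below is generic; retry_with_backoff and its parameters are hypothetical, and it retries on any Exception, whereas a real pipeline would likely retry only transient errors such as 429s:

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    make_call: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 1.0,
) -> T:
    """Awaits make_call(), retrying failures with exponential backoff and jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return await make_call()
        # asyncio.CancelledError is not an Exception subclass (Python 3.8+),
        # so cancelling the task still propagates immediately.
        except Exception as e:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller skip or re-queue the input
            # Wait base_delay, 2x, 4x, ... plus random jitter so that many
            # failing tasks don't all retry at the same moment.
            delay = base_delay * 2**attempt + random.uniform(0, base_delay)
            print(f"Attempt {attempt + 1} failed ({e}); retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
    raise AssertionError("unreachable: the loop either returns or re-raises")
```

Pass a zero-argument function rather than a coroutine object, because a coroutine can only be awaited once, for example await retry_with_backoff(lambda: client.aio.models.generate_content(model="gemini-flash-latest", contents=prompt)).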

Tip

To avoid hitting rate limits, you can sleep between requests or you can use the rate limiter we implemented in the Rate Limiters chapter.

Step 4

In this step, your goal is to implement the output writer coroutine.

This coroutine does the following:

  1. It creates an output shard file in the output directory.
  2. It gets responses from the output queue, then writes them to the output file until _SHARD_SIZE is reached.
  3. After the shard is full, we create a new shard and repeat.

You can use aiofiles again for file operations.

_SHARD_SIZE = 5

async def write_outputs(
    output_dir: pathlib.Path, output_queue: asyncio.Queue[tuple[str, str]]
):
    pass

Important

Make sure there is only one task writing to any given file at a time. Having multiple tasks write to the same file or shard can cause race conditions.

Step 5

In this step, your goal is to create the final producer-consumer pipeline, chaining together all the coroutines you implemented above.

You need to define two queues:

_QUEUE_SIZE = 5

async def main():
    inputs_queue = asyncio.Queue(maxsize=_QUEUE_SIZE)
    outputs_queue = asyncio.Queue(maxsize=_QUEUE_SIZE)

Set the maxsize parameter for these queues to implement backpressure. This prevents the reader from loading all prompts into memory at once and causing memory overflow. When a queue is full, await queue.put() will block until space is available.
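To make backpressure concrete, here is a small sketch with a bounded queue (maxsize=2); fast_producer and slow_consumer are toy stand-ins for the reader and the generation workers. The producer is suspended in put() whenever the queue is full, so the queue never holds more than two items, no matter how fast the producer is:

```python
import asyncio


async def fast_producer(queue: asyncio.Queue, num_items: int) -> int:
    peak = 0
    for i in range(num_items):
        await queue.put(i)  # suspends here while the queue is full
        peak = max(peak, queue.qsize())
    return peak  # the largest queue size the producer ever observed


async def slow_consumer(queue: asyncio.Queue) -> None:
    while True:
        await queue.get()
        await asyncio.sleep(0.01)  # simulate slow work, e.g. an API call
        queue.task_done()


async def main() -> int:
    queue = asyncio.Queue(maxsize=2)
    consumer = asyncio.create_task(slow_consumer(queue))
    peak = await fast_producer(queue, 10)
    await queue.join()
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return peak


if __name__ == "__main__":
    print("peak queue size:", asyncio.run(main()))  # never more than 2
```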

You then need to create the tasks or workers:

  1. Create one input reader task.
  2. Create at least two content generation tasks.
  3. Create one output writer task.

Run your code and verify reading, generation, and writing are all interleaved. Verify that running the pipeline with multiple generate workers is faster than running it with one.

Going Further

  • This pipeline only has I/O operations so asyncio works well. But what if the pipeline has CPU-heavy operations? For example, what if you need to split and count the words in the model responses? You can try using loop.run_in_executor(executor, operation) and ProcessPoolExecutor to offload CPU heavy work to a separate process and avoid blocking the event loop.

  • In this challenge, we created a single reader and writer task since disk I/O is fast and the API calls were the bottleneck. However, if our reads/writes were slower (e.g., to a remote database or cloud storage), we could extend this pattern to have multiple concurrent reader/writer tasks.
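A minimal sketch of offloading the word counting from the first bullet; count_words and count_words_concurrently are hypothetical names. loop.run_in_executor() submits the function to the pool and returns a future you can await like any other awaitable:

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional


def count_words(text: str) -> int:
    # CPU-bound work. It must be a top-level function so that a process
    # pool can pickle it and send it to a worker process.
    return len(text.split())


async def count_words_concurrently(texts: list, executor: Optional[Executor]) -> list:
    loop = asyncio.get_running_loop()
    # Each call returns an asyncio future; the event loop stays free to run
    # other tasks (API calls, file I/O) while the executor does the work.
    futures = [loop.run_in_executor(executor, count_words, text) for text in texts]
    return list(await asyncio.gather(*futures))


async def main() -> None:
    responses = ["Two times three is six.", "Birds migrate to find food."]
    # Note: leaving this block waits for the pool to shut down.
    with ProcessPoolExecutor() as pool:
        print(await count_words_concurrently(responses, pool))  # [5, 5]


if __name__ == "__main__":
    # The __main__ guard is needed for process pools on platforms that
    # start workers with "spawn" (Windows and macOS by default).
    asyncio.run(main())
```

Passing None as the executor uses the loop's default thread pool, which avoids blocking the loop but, because of the GIL, will not speed up pure-Python CPU work the way a process pool can.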

Now take some time to attempt the challenge before looking at the solution!


Solution

Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay!

First let's define all the imports and constants:

import asyncio
import pathlib

import aiofiles
from google import genai

_INPUTS_PATH = "inputs/"
_OUTPUTS_PATH = "outputs/"
_GENERATE_TASKS = 5
_SHARD_SIZE = 5
_QUEUE_SIZE = 5

Step 2 - Solution

async def read_inputs(input_dir: pathlib.Path, input_queue: asyncio.Queue[str]):
    for path in input_dir.iterdir():
        async with aiofiles.open(path) as f:
            async for line in f:
                # Strip the trailing newline and skip blank lines.
                prompt = line.strip()
                if not prompt:
                    continue
                await input_queue.put(prompt)
                print(f"Enqueued prompt {prompt} from {path}")

Step 3 - Solution

async def generate_with_retry(
    client: genai.Client,
    input_queue: asyncio.Queue[str],
    output_queue: asyncio.Queue[tuple[str, str]],
):
    prompt = await input_queue.get()
    try:
        response = await client.aio.models.generate_content(
            model="gemini-flash-latest", contents=prompt
        )
        await output_queue.put((prompt, response.text))
        print(f"Generated content for prompt: {prompt.strip()}")
    except Exception as e:
        print(f"Error generating content for prompt {prompt.strip()}: {e}")
        # Re-queue the prompt for retry
        await input_queue.put(prompt)
    finally:
        input_queue.task_done()


async def generate_content(
    client: genai.Client,
    input_queue: asyncio.Queue[str],
    output_queue: asyncio.Queue[tuple[str, str]],
):
    try:
        while True:
            # Optionally wait between requests to avoid rate limiting.
            # await asyncio.sleep(0.15)
            await generate_with_retry(client, input_queue, output_queue)
    except asyncio.CancelledError:
        print("Shutting down content generation task...")

Step 4 - Solution

async def write_outputs(
    output_dir: pathlib.Path, output_queue: asyncio.Queue[tuple[str, str]]
):
    try:
        # Create parent directories if they don't exist
        output_dir.mkdir(parents=True, exist_ok=True)
        shard_index = 0
        while True:
            path = output_dir / f"shard_{shard_index}.txt"
            async with aiofiles.open(path, "w") as f:
                for _ in range(_SHARD_SIZE):
                    prompt, response = await output_queue.get()
                    await f.write(f"{prompt} - {response} \n")
                    print(f"Wrote {prompt.strip()} response to {path}")
                    output_queue.task_done()
            shard_index += 1
    except asyncio.CancelledError:
        print("Shutting down writer...")

Step 5 - Solution

async def main():
    client = genai.Client()
    input_queue = asyncio.Queue(maxsize=_QUEUE_SIZE)
    output_queue = asyncio.Queue(maxsize=_QUEUE_SIZE)

    input_dir = pathlib.Path(_INPUTS_PATH)
    output_dir = pathlib.Path(_OUTPUTS_PATH)

    reader_task = asyncio.create_task(read_inputs(input_dir, input_queue))
    generate_tasks = [
        asyncio.create_task(generate_content(client, input_queue, output_queue))
        for _ in range(_GENERATE_TASKS)
    ]
    writer_task = asyncio.create_task(write_outputs(output_dir, output_queue))

    # Wait until all the inputs are read and processed
    await reader_task
    await input_queue.join()

    # Cancel the generate tasks since all inputs have been processed.
    for task in generate_tasks:
        task.cancel()
    await asyncio.gather(*generate_tasks, return_exceptions=True)

    # Wait until all outputs are written.
    await output_queue.join()
    writer_task.cancel()
    await writer_task


if __name__ == "__main__":
    asyncio.run(main())

Now let's run this and check the output:

Enqueued prompt What is two times 27? from inputs/shard_9.txt
Enqueued prompt What is two times 28? from inputs/shard_9.txt
Enqueued prompt What is two times 29? from inputs/shard_9.txt
Enqueued prompt What is two times 6? from inputs/shard_2.txt
Generated content for prompt: What is two times 28?
Enqueued prompt What is two times 20? from inputs/shard_6.txt
Wrote What is two times 28? response to outputs/shard_0.txt
Generated content for prompt: What is two times 27?
Enqueued prompt What is two times 24? from inputs/shard_8.txt
Wrote What is two times 27? response to outputs/shard_0.txt
...
Generated content for prompt: What is two times 0?
Wrote What is two times 0? response to outputs/shard_0.txt
Generated content for prompt: What is two times 13?
Wrote What is two times 13? response to outputs/shard_0.txt
Generated content for prompt: What is two times 1?
Wrote What is two times 1? response to outputs/shard_0.txt
Generated content for prompt: What is two times 2?
Shutting down content generation task...
Shutting down content generation task...
Shutting down content generation task...
Wrote What is two times 2? response to outputs/shard_0.txt
Shutting down writer...

Note how the file reads, API calls, and file writes are all interleaved. As the producers (file reader, content generator) add items to the queues, the consumers (content generator, writer) get items from the queues and process them concurrently.

While the API is processing prompt #1, we can be reading prompt #2 from disk and writing the response for prompt #0. Without concurrency, each stage would wait idle for the others to complete.

With 1 generator task this script takes ~20 seconds (sequential API calls), but with 5 generator tasks it takes ~6 seconds (concurrent API calls). Multiple workers can process the slow stage (API calls) concurrently, while the fast stages (file I/O) keep them fed with work.

Request Batchers

Challenge

In this challenge, you will build an async request batcher for the Gemini Embedding API.

Many applications require batching API calls for efficiency. Say you're building an application that requires generating embeddings for a stream of queries (e.g., a search engine).

You have two options:

  1. Generate embeddings one at a time - This is simple but each request to the embedding generation service incurs some network latency and overhead.
  2. Batch multiple requests together - Combine several inputs into a single request and minimize the network overhead.

In this challenge, you will batch requests based on two parameters:

  • batch_size - Maximum number of requests in a batch
  • timeout - Maximum time to wait to fill a batch

The batcher continuously reads from an input queue and creates a batch when the batch size is reached or when the timeout expires (resulting in a partial batch).
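The batching rule itself does not depend on asyncio. As a sketch, here it is as a pure function over a sorted list of input arrival times in seconds. It assumes the timeout clock restarts whenever the previous batch closes (including after an empty timeout), and it ignores the end-of-stream sentinel, so a trailing partial batch closes at its deadline. `simulate_batches` is a hypothetical helper, not part of the challenge.

```python
def simulate_batches(
    arrivals: list[float], batch_size: int, timeout: float
) -> list[tuple[float, list[int]]]:
    """Returns (close_time, input_indices) for each non-empty batch."""
    batches: list[tuple[float, list[int]]] = []
    start = 0.0  # when the current batch's timeout clock started
    i = 0
    while i < len(arrivals):
        deadline = start + timeout
        batch: list[int] = []
        # Take inputs that arrive before the deadline, up to batch_size of them.
        while i < len(arrivals) and arrivals[i] <= deadline and len(batch) < batch_size:
            batch.append(i)
            i += 1
        if len(batch) == batch_size:
            close = arrivals[batch[-1]]  # full: sent as soon as the last input arrives
        else:
            close = deadline  # partial or empty: sent when the timeout fires
        if batch:  # empty batches are skipped
            batches.append((close, batch))
        start = close
    return batches


print(simulate_batches([0.25, 0.5, 0.75, 2.5, 2.75, 5.5], batch_size=3, timeout=1.0))
# [(0.75, [0, 1, 2]), (2.75, [3, 4]), (5.75, [5])]
```

The first three inputs fill a batch immediately, the next two arrive slowly and are sent as a partial batch when the timeout fires, and the last input waits out a full timeout on its own.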

Info

In real systems, the batch size and timeout are tuned to balance efficiency and latency. Large batches are generally more efficient but waiting to fill them can hurt end-to-end latency.

We will be using producer-consumer queues in this challenge. See the Data Pipelines chapter to learn more about this pattern.

In this challenge, you will implement four async functions or coroutines:

  1. An input enqueuer that adds inputs to the input queue.
  2. A batcher that reads from the input queue, creates batches, then adds them to the batch queue.
  3. An embedding generator that reads from the batch queue, calls the Gemini Embedding API, then adds the output to the output queue.
  4. An output logger that reads from the output queue and prints the outputs.

Before you start

The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:

  • asyncio.Queue() for creating async queues.
  • asyncio.wait_for() for implementing timeouts on async operations.
  • asyncio.get_running_loop().time() for tracking elapsed time.
  • queue.task_done() and queue.join() for tracking queue completion.
  • task.cancel() for canceling running tasks.
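Here is a quick, self-contained tour of those calls, independent of the Gemini API. The 0.2-second timeout and the "hello" item are arbitrary values for illustration.

```python
import asyncio


async def main() -> list[str]:
    events: list[str] = []
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[str] = asyncio.Queue()

    # wait_for() cancels the pending queue.get() and raises TimeoutError if it takes too long.
    start = loop.time()  # the event loop's monotonic clock, in seconds
    try:
        await asyncio.wait_for(queue.get(), timeout=0.2)
    except asyncio.TimeoutError:
        print(f"get() timed out after {loop.time() - start:.1f}s")
        events.append("timed out")

    # Each get() is matched by a task_done(); join() waits until no unfinished items remain.
    queue.put_nowait("hello")
    item = await queue.get()
    queue.task_done()
    await queue.join()  # returns right away: the only item is already marked done
    events.append(f"processed {item!r}")

    # cancel() asks a task to stop; awaiting it afterwards raises CancelledError.
    sleeper = asyncio.create_task(asyncio.sleep(10))
    sleeper.cancel()
    try:
        await sleeper
    except asyncio.CancelledError:
        events.append("cancelled")

    return events


print(asyncio.run(main()))
```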

Step 0

To get started, create a Gemini API key in Google AI Studio. We use the Gemini API because it has a generous free tier.

export GEMINI_API_KEY="YOUR_API_KEY"

Step 1

In this step, your goal is to verify you can call the Gemini API to generate embeddings.

Install the Google GenAI SDK and make your first request. Write your code to script.py.

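pip install -q -U google-genai

import asyncio

from google import genai

_NUM_REQUESTS = 10


async def main():
    # genai.Client() picks up the API key from the GEMINI_API_KEY environment variable.
    client = genai.Client()
    contents = [f"Input: {i}" for i in range(_NUM_REQUESTS)]
    result = await client.aio.models.embed_content(
        model="gemini-embedding-001", contents=contents
    )
    # Print a summary so you can confirm the call worked.
    embeddings = result.embeddings or []
    print(f"Received {len(embeddings)} embeddings.")


if __name__ == "__main__":
    asyncio.run(main())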

Step 2

In this step, your goal is to implement the input enqueuer coroutine.

This coroutine simulates a stream of inputs arriving at variable times. It adds inputs to the input queue with random delays between each input. After all inputs are added, it sends a sentinel value (None) to signal the end of the input stream.

import random

_NUM_INPUTS = 100

async def enqueue_inputs(input_queue: asyncio.Queue[str | None]):
    for i in range(_NUM_INPUTS):
        await input_queue.put(f"input-{i}")
        # Simulate variable input arrival times
        await asyncio.sleep(random.uniform(0, 0.3))
    # Signal end of inputs with sentinel value
    await input_queue.put(None)

This simulates real-world scenarios where inputs don't arrive all at once but stream in over time. The sentinel value is a clean way to signal completion without needing to cancel tasks.

Step 3

In this step, your goal is to implement the Batcher class.

The batcher is the core of this challenge. It reads from the input queue and creates batches based on two conditions:

  1. Batch size reached - When we have collected batch_size inputs, send the batch immediately.
  2. Timeout expired - If the timeout expires before the batch fills, send a partial batch.
from datetime import timedelta

class Batcher:
    def __init__(self, batch_size: int, timeout: timedelta):
        self._batch_size = batch_size
        self._timeout = timeout.total_seconds()

    async def batch(
        self, input_queue: asyncio.Queue[str | None], batch_queue: asyncio.Queue[list[str] | None]
    ):
        pass

The batch() method should:

  1. Use a while True loop to continuously process inputs.
  2. Track the remaining timeout for the current batch. Use asyncio.get_running_loop().time() to get the current time.
  3. Use asyncio.wait_for(input_queue.get(), timeout) to wait for an input with a timeout. This will raise asyncio.TimeoutError if the timeout expires.
  4. If the batch size is reached or the timeout expires, add the batch to the batch queue.
  5. Check for the sentinel value (None). When received, send any remaining batch, then propagate the sentinel to the next queue and return.

Important

Make sure to update the remaining timeout after each input. As time elapses, the remaining timeout decreases. This ensures the batch times out at the correct time.
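One way to keep this arithmetic straight is to compute an absolute deadline once per batch and derive each remaining timeout from it. In this sketch, `remaining_timeout` and `collect` are hypothetical helpers (not the full `Batcher`, and without sentinel handling); the remaining time is clamped at zero for clarity.

```python
import asyncio


def remaining_timeout(deadline: float, now: float) -> float:
    """Seconds left until deadline, never negative."""
    return max(0.0, deadline - now)


async def collect(queue: asyncio.Queue[str], batch_size: int, timeout: float) -> list[str]:
    """Collects up to batch_size items, giving up once timeout seconds have elapsed."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout  # fixed for the whole batch
    batch: list[str] = []
    while len(batch) < batch_size:
        try:
            item = await asyncio.wait_for(
                queue.get(), remaining_timeout(deadline, loop.time())
            )
        except asyncio.TimeoutError:
            break  # deadline passed: return a partial batch
        batch.append(item)
    return batch


async def main() -> None:
    queue: asyncio.Queue[str] = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(f"input-{i}")
    # Only 3 of 8 items are available, so this returns a partial batch after ~0.2s.
    print(await collect(queue, batch_size=8, timeout=0.2))


asyncio.run(main())
```

Because the deadline is fixed, a slow trickle of inputs cannot keep extending the batch: each `wait_for()` call only gets whatever time is left, which is equivalent to recomputing `timeout - elapsed` after every input.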

Step 4

In this step, your goal is to implement the embedding generator coroutine.

This coroutine reads batches from the batch queue, calls the Gemini Embedding API, then adds individual (content, embedding) pairs to the output queue.

from google.genai import types

async def embed_content(
    client: genai.Client,
    batch_queue: asyncio.Queue[list[str] | None],
    output_queue: asyncio.Queue[tuple[str, types.ContentEmbedding] | None],
):
    pass

The Gemini API accepts multiple contents in a single call and returns embeddings in the same order. You need to zip the batch contents with the returned embeddings then add them to the output queue individually. When you receive the sentinel value, propagate it to the output queue and return.

Step 5

In this step, your goal is to implement the output logger coroutine.

This coroutine reads from the output queue and prints each (content, embedding) pair. We could update this to write the results to a file, but we will just print the output here for simplicity.

async def log_outputs(
    output_queue: asyncio.Queue[tuple[str, types.ContentEmbedding] | None],
):
    pass

Check for the sentinel value and return when received.

Step 6

In this step, your goal is to chain all the coroutines together in the main function.

You need to create three queues:

async def main():
    input_queue = asyncio.Queue()
    batch_queue = asyncio.Queue()
    output_queue = asyncio.Queue()

Then create the tasks:

  1. Create the input enqueuer task.
  2. Create the batcher task with a batch size of 8 and timeout of 1 second.
  3. Create the embedding generator task.
  4. Create the output logger task.

After creating the tasks, you can wait for all of them to complete using asyncio.gather(). With sentinel values, tasks will shut down gracefully on their own without cancellation.

Run your code and verify that:

  • Batches are created when they reach size 8.
  • Partial batches are created when the timeout expires.
  • All inputs are processed even if the final batch is partial.

Going Further

  • Try experimenting with different batch sizes and timeouts. How does this affect the total runtime? What's the tradeoff between efficiency and latency?

  • Implement a dynamic batching strategy that adjusts the batch size based on input arrival rate. If inputs are arriving slowly, use smaller batches with shorter timeouts. If inputs are arriving quickly, use larger batches.

Now take some time to attempt the challenge before looking at the solution!


Solution

Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay!

First, let's define the imports, constants, and type aliases:

import asyncio
import random
from datetime import timedelta
from typing import AsyncIterator, TypeAlias

from google import genai
from google.genai import types

_NUM_INPUTS = 100
_BATCH_SIZE = 8
_TIMEOUT_SECONDS = 1
_INPUT_DELAY_SECONDS = 0.3


Input: TypeAlias = str
InputBatch: TypeAlias = list[Input]

Step 2 - Solution

async def enqueue_inputs(input_queue: asyncio.Queue[Input | None]):
    for i in range(_NUM_INPUTS):
        await input_queue.put(f"input-{i}")
        # Simulate variable input arrival times
        await asyncio.sleep(random.uniform(0, _INPUT_DELAY_SECONDS))
    # Signal end of inputs with sentinel value
    await input_queue.put(None)
    print("All inputs enqueued.")

This coroutine adds inputs to the queue with random delays. The random delays simulate real-world scenarios where inputs arrive at variable rates. After all inputs are enqueued, we send a sentinel value (None) to signal that no more inputs are coming. This allows downstream consumers to shut down gracefully without explicit cancellation.

Step 3 - Solution

class Batcher:
    def __init__(self, batch_size: int, timeout: timedelta):
        self._batch_size = batch_size
        self._timeout = timeout.total_seconds()

    async def _batches(
        self, input_queue: asyncio.Queue[Input | None]
    ) -> AsyncIterator[InputBatch | None]:
        """Yields batches until sentinel received. Yields None as final value."""
        loop = asyncio.get_running_loop()

        while True:
            batch = []
            start = loop.time()
            timeout = self._timeout

            while True:
                try:
                    item = await asyncio.wait_for(input_queue.get(), timeout)
                except asyncio.TimeoutError:
                    elapsed = loop.time() - start
                    print(f"Batch timed out after {elapsed} seconds.")
                    break

                if item is None:
                    # Received sentinel value.
                    input_queue.task_done()
                    if batch:
                        yield batch
                    yield None
                    return

                batch.append(item)
                elapsed = loop.time() - start
                timeout = self._timeout - elapsed

                if len(batch) == self._batch_size:
                    print("Batch is full.")
                    break

            if batch:
                yield batch
                for _ in batch:
                    input_queue.task_done()

    async def batch(
        self,
        input_queue: asyncio.Queue[Input | None],
        batch_queue: asyncio.Queue[InputBatch | None],
    ):
        async for batch in self._batches(input_queue):
            if batch is None:
                # Propagate sentinel to next stage
                await batch_queue.put(None)
                print("Shutting down batcher...")
                return
            print(f"Batch size is {len(batch)}")
            await batch_queue.put(batch)

Some things to note:

  • We track the elapsed time and update the remaining timeout after each input. This ensures batches time out at the correct time regardless of when inputs arrive.
  • We use asyncio.wait_for() to implement the timeout. This raises asyncio.TimeoutError when the timeout expires.
  • We check for the sentinel value (None). When received, we send any remaining batch, propagate the sentinel to the next queue, and return. This gracefully shuts down the batcher without needing task cancellation.
  • We call input_queue.task_done() for each item in the batch to properly track queue completion.
  • Empty batches are skipped to avoid sending empty API requests.

Step 4 - Solution

async def embed_content(
    client: genai.Client,
    batch_queue: asyncio.Queue[InputBatch | None],
    output_queue: asyncio.Queue[tuple[Input, types.ContentEmbedding] | None],
):
    while True:
        batch = await batch_queue.get()

        if batch is None:
            # Received sentinel value.
            break

        result = await client.aio.models.embed_content(
            model="gemini-embedding-001",
            contents=batch,
            config=types.EmbedContentConfig(output_dimensionality=3),
        )
        if result.embeddings is None:
            raise ValueError("No embeddings returned from the API.")

        for content, embedding in zip(batch, result.embeddings):
            await output_queue.put((content, embedding))

        batch_queue.task_done()

    # Propagate sentinel to next stage
    await output_queue.put(None)
    batch_queue.task_done()
    print("Shutting down embedding task...")

The Gemini API returns embeddings in the same order as the input contents. We use zip() to pair each content with its embedding, then add them individually to the output queue. When we receive the sentinel value, we propagate it to the output queue and return.

Step 5 - Solution

async def log_outputs(
    output_queue: asyncio.Queue[tuple[Input, types.ContentEmbedding] | None],
):
    while True:
        output = await output_queue.get()

        if output is None:
            # Received sentinel value.
            break

        content, embedding = output
        print(f"Content: {content} -> Embedding: {embedding.values}")
        output_queue.task_done()

    output_queue.task_done()
    print("Shutting down writer...")

Step 6 - Solution

async def main():
    input_queue = asyncio.Queue()
    batch_queue = asyncio.Queue()
    output_queue = asyncio.Queue()

    client = genai.Client()
    batcher = Batcher(
        batch_size=_BATCH_SIZE, timeout=timedelta(seconds=_TIMEOUT_SECONDS)
    )

    enqueue_task = asyncio.create_task(enqueue_inputs(input_queue))
    batcher_task = asyncio.create_task(batcher.batch(input_queue, batch_queue))
    embed_task = asyncio.create_task(embed_content(client, batch_queue, output_queue))
    log_task = asyncio.create_task(log_outputs(output_queue))

    # Wait for all tasks to complete gracefully via sentinel values
    await asyncio.gather(enqueue_task, batcher_task, embed_task, log_task)


asyncio.run(main())

This is how the shutdown sequence works using sentinel values:

  1. The input enqueuer finishes adding all inputs, then sends a sentinel value (None) to the input queue.
  2. The batcher receives the sentinel, sends any remaining batch, propagates the sentinel to the batch queue, and returns.
  3. The embedding task receives the sentinel, propagates it to the output queue, and returns.
  4. The logger receives the sentinel and returns.
  5. All tasks complete naturally, and asyncio.gather() returns.

This approach is cleaner than explicit cancellation because each stage knows when to shut down based on the sentinel value.
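For comparison, here is a sketch of the cancellation-based shutdown that sentinels avoid, using the `queue.join()` and `task.cancel()` calls listed under "Before you start". The doubling `worker` is a hypothetical stand-in that loops forever; `main()` waits for the queue to drain, then cancels it.

```python
import asyncio


async def worker(queue: asyncio.Queue[int], results: list[int]) -> None:
    while True:  # never returns on its own
        item = await queue.get()
        results.append(item * 2)
        queue.task_done()  # lets queue.join() know this item is finished


async def main() -> list[int]:
    queue: asyncio.Queue[int] = asyncio.Queue()
    results: list[int] = []
    for i in range(5):
        queue.put_nowait(i)

    task = asyncio.create_task(worker(queue, results))
    await queue.join()  # wait until every item has been marked done
    task.cancel()  # the worker is now blocked on queue.get(), so cancel it
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: we requested this cancellation
    return results


print(asyncio.run(main()))  # [0, 2, 4, 6, 8]
```

This works, but the caller has to coordinate `join()` and `cancel()` for every stage, and every `get()` must be paired with a `task_done()`. With sentinels, each stage decides for itself when it is finished.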

Now let's run this and check the output:

...
Batch timed out after 1.0009840049997365 seconds.
Batch size is 5
Content: input-49 -> Embedding: [-0.011579741, -0.010578066, 0.018000955]
Content: input-50 -> Embedding: [-0.004875405, -3.5398414e-05, 0.019322932]
Content: input-51 -> Embedding: [-0.014710601, -0.005792096, 0.0061859917]
Content: input-52 -> Embedding: [-0.01397081, -0.0063333134, 0.01519548]
Content: input-53 -> Embedding: [0.00022102315, -0.010211905, 0.012408208]
Batch is full.
Batch size is 8
Content: input-54 -> Embedding: [-0.005042672, -0.010983077, 0.011013798]
Content: input-55 -> Embedding: [-0.013474228, -0.0002503009, 0.021384422]
Content: input-56 -> Embedding: [-0.007262096, -0.012684701, 0.02125588]
Content: input-57 -> Embedding: [-0.007679552, -0.0016124722, 0.014986247]
Content: input-58 -> Embedding: [-0.0013965481, -0.016043654, 0.015830107]
Content: input-59 -> Embedding: [0.0011211497, 0.0062395, 0.022803845]
Content: input-60 -> Embedding: [-0.005757582, -0.019661691, 0.012294045]
Content: input-61 -> Embedding: [-0.009954969, -0.030515866, 0.0075060125]
....
Batch timed out after 1.0012599780002347 seconds.
Batch size is 4
Content: input-90 -> Embedding: [-0.0027523814, -0.015859226, 0.017288134]
Content: input-91 -> Embedding: [-0.0006344775, -0.019411083, 0.00752806]
Content: input-92 -> Embedding: [-0.010482627, -0.002565886, 0.028898504]
Content: input-93 -> Embedding: [0.0059372373, -0.015417972, 0.016691986]
All inputs enqueued.
Batch timed out after 1.0011447710003267 seconds.
Batch size is 6
Shutting down batcher...
Content: input-94 -> Embedding: [-0.010927118, -0.014871426, 0.01482102]
Content: input-95 -> Embedding: [-0.0030360888, -0.007296145, 0.009159293]
Content: input-96 -> Embedding: [-0.01636001, -0.0077388645, 0.015982967]
Content: input-97 -> Embedding: [-0.0011909363, -0.01127491, 0.013533601]
Content: input-98 -> Embedding: [-0.005317036, -0.024050364, 0.010049778]
Content: input-99 -> Embedding: [-0.010439553, 0.0002079097, 0.027198879]
Shutting down embedding task...
Shutting down writer...

Note how:

  • Some batches reach size 8 and are sent immediately (marked as "Batch is full").
  • Other batches time out after 1 second and are sent as partial batches.
  • Inputs, batching, API calls, and logging are all interleaved and happening concurrently.
  • The batcher automatically adapts to the input arrival rate, creating full batches when inputs arrive quickly and partial batches when they arrive slowly.