Introduction
If you've used the OpenAI or Gemini API, you've probably seen asyncio throughout the API docs. You might've copied async/await without understanding what it does. You might've wondered if it actually matters.
It matters.
asyncio is a library for writing concurrent code in Python. You can use asyncio to make concurrent model API calls, sending 100s of requests concurrently instead of one at a time.
Making API calls is a common use case of asyncio, but you can also use it to build way cooler things like rate limiters, producer-consumer pipelines, and web crawlers. This guide is about building those cool things.
How this guide is structured
This guide walks through six real-world applications of asyncio:
- LLM Responses — Call LLM APIs concurrently without blocking
- Rate Limiters — Control API request throughput to stay within rate limits
- Data Pipelines — Process large datasets with producer-consumer pipelines
- Request Batchers — Batch API requests for efficiency
- Web Crawlers — Efficiently crawl the web and parse web pages
- Function-Calling Agents — Build agents that execute tools concurrently
Each section follows a challenge-solution format inspired by John Crickett's Coding Challenges.
Try to solve each challenge yourself first. Research shows that productive failure leads to deeper learning, even if you struggle and can't come up with the solution.
About me
Hi! I'm Abdul. I build infrastructure for Gemini fine-tuning and batch inference at Google. I care about making AI development easier, faster, and more accessible.
Check out my blog at abdulsaleh.dev.
Source Code
You can find the Markdown source code for this guide on GitHub. It was built with mdBook, a Rust-based tool for creating books from Markdown.
The Basics
The best way to learn asyncio is by doing.
asyncio lets you write concurrent code in Python. When you make API calls or read files, your program usually waits. With asyncio, you can do other work while waiting. This makes your code faster.
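For a quick feel of the syntax, here is a minimal sketch (not part of any challenge) that runs two simulated I/O waits concurrently. The fetch coroutine is made up; asyncio.sleep() stands in for waiting on a real request:
import asyncio

async def fetch(name: str, delay: float) -> str:
    # Stand-in for an I/O-bound call, such as a network request.
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # Both "requests" wait at the same time, so this takes ~1 second, not ~2.
    results = await asyncio.gather(fetch("a", 1.0), fetch("b", 1.0))
    print(results)

asyncio.run(main())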
Each chapter has a "Before you start" section that lists the relevant asyncio functions and classes. You can learn about these as you go.
If you want to understand how asyncio works first, take a look at this tutorial or read the official conceptual overview.
For syntax and API reference, see the official asyncio documentation.
Async LLM Responses
Challenge
When working with LLMs, you often need to make API calls to generate responses for multiple prompts or user requests. Making these calls synchronously means waiting for each response before sending the next, which is slow and inefficient.
In this challenge, you'll use asyncio to send concurrent requests to the Gemini API. You'll start with a basic synchronous script, measure its performance, then use asyncio to write a faster async solution.
Before you start
The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:
- asyncio.gather() for waiting on running tasks.
Step 0
To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier, but any async model API will work.
export GEMINI_API_KEY="YOUR_API_KEY"
Step 1
In this step, your goal is to verify you can call the Gemini API.
Install the Google GenAI SDK and make your first request. Write your code to script.py.
pip install -q -U google-genai
from google import genai
client = genai.Client()
response = client.models.generate_content(
model="gemini-flash-latest", contents="Why do some birds migrate?"
)
print(response.text)
Step 2
In this step, your goal is to send multiple requests to the Gemini API and time your code.
Run generate_content() in a loop with five iterations. Time your script. How long does it take to make five requests?
time python script.py
Step 3
In this step, your goal is to make 5 concurrent requests using the async Gemini API:
response = await client.aio.models.generate_content(
model="gemini-flash-latest", contents="Why do some birds migrate?"
)
Use await asyncio.gather() to wait on the responses.
Now run and time your code. How long does it take?
Step 4
What happens if you increase the number of requests? At what point do you hit rate limits?
Going Further
- See the Rate Limiters chapter to build your own rate limiter and avoid hitting rate limits.
- Creating asyncio tasks does not scale indefinitely. If you create tens of thousands of tasks, you can run out of memory or see worse performance due to scheduling overhead. If you have a large number of requests, it's better to use a producer-consumer queue and a fixed number of consumer tasks. See the Data Pipelines chapter to learn how to do this.
Now take some time to attempt the challenge before looking at the solution!
Solution
Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay! The key concepts are using async/await with the Gemini API and asyncio.gather() for concurrency.
Step 2 - Solution
Let's call generate_content() in a loop:
from google import genai
from google.genai.types import GenerateContentResponse
_NUM_REQUESTS = 5
def generate_content(index: int, client: genai.Client) -> GenerateContentResponse:
response = client.models.generate_content(
model="gemini-flash-latest", contents="Why do some birds migrate?"
)
print(f"Request {index} completed")
return response
def main():
client = genai.Client()
results = [generate_content(i, client) for i in range(_NUM_REQUESTS)]
if __name__ == "__main__":
main()
This takes about 30 seconds to run, including the time to connect to the client.
time python script.py
> Request 0 completed
> Request 1 completed
> Request 2 completed
> Request 3 completed
> Request 4 completed
>
> real 0m32.134s
> user 0m1.733s
> sys 0m0.204s
Step 3 - Solution
Now let's define some tasks and use asyncio.gather() to run them concurrently:
import asyncio
from google import genai
from google.genai.types import GenerateContentResponse
_NUM_REQUESTS = 5
async def generate_content(index: int, client: genai.Client) -> GenerateContentResponse:
response = await client.aio.models.generate_content(
model="gemini-flash-latest", contents="Why do some birds migrate?"
)
print(f"Request {index} completed")
return response
async def main():
client = genai.Client()
tasks = [generate_content(i, client) for i in range(_NUM_REQUESTS)]
results = await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
We create the client only once, since we only need to establish one client connection.
This takes ~10 seconds to run, about three times faster than before. The requests run concurrently and don't return in the same order.
time python script.py
> Request 3 completed
> Request 4 completed
> Request 1 completed
> Request 0 completed
> Request 2 completed
>
> real 0m8.460s
> user 0m1.860s
> sys 0m0.240s
Step 4 - Solution
If we increase _NUM_REQUESTS to 20, we quickly hit the rate limit.
google.genai.errors.ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429,
'message': 'You exceeded your current quota, please check your plan and billing details....
See the Rate Limiters chapter to build your own rate limiter and avoid hitting rate limits.
Rate Limiters
Challenge
Model provider APIs have strict rate limits. It's very easy to exceed these limits if you're making requests concurrently without throttling.
In this challenge, you will build a sliding window rate limiter for calling async APIs.
A sliding window rate limiter works by keeping a queue of the most recent N requests within a given time window.
Before processing a new request, the rate limiter removes any requests that fall outside the time window, then checks if the number of remaining requests is below the limit. If so, the new request is added to the queue and processed. Otherwise, the limiter waits until old requests have aged out of the window.
Before you start
The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:
- asyncio.gather() for waiting on running tasks.
- asyncio.get_running_loop().time() for getting the current time in a coroutine.
- asyncio.Lock() for protecting shared resources.
- asyncio.sleep() for waiting in a coroutine.
Step 0
To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier, but any async model API will work.
export GEMINI_API_KEY="YOUR_API_KEY"
Step 1
In this step, your goal is to make concurrent requests to the Gemini API and hit the rate limits.
Create a new script (script.py) that makes 20 concurrent requests to the Gemini API then run your script. The solution to the LLM Responses chapter explains how to do this.
Confirm you get a resource exhausted error:
python script.py
> google.genai.errors.ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429,
'message': 'You exceeded your current quota, please check your plan and billing details.
For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. ...
At the time of writing, Gemini 2.5 Flash had a limit of 10 requests per minute on the free tier.
Step 2
In this step, your goal is to implement the sliding window rate limiter.
Below is skeleton code for a RateLimiter class. You need to implement the acquire() method:
import asyncio
from collections import deque
from datetime import timedelta
class RateLimiter:
def __init__(self, limit: int, interval: timedelta):
self._limit = limit
self._interval = interval.total_seconds()
# Holds request timestamps.
self._window = deque()
async def acquire(self) -> None:
"""Wait until a new request can be made under the rate limit."""
pass
acquire() will be awaited until requests can be made under the rate limit:
async def generate_content(client, rate_limiter):
# Waits until we are under the rate limit.
await rate_limiter.acquire()
response = await client.aio.models.generate_content(
model="gemini-flash-latest", contents="Why do some birds migrate?"
)
return response
The acquire() method should:
- Acquire a lock to prevent race conditions when making changes to the window. You can create the lock using asyncio.Lock().
- Use asyncio.get_running_loop().time() to get the current time in seconds.
- Remove old requests from the window to ensure it only has requests that were made within the past interval seconds.
- If the window has fewer than limit requests, the request is allowed. Add the current request time to the window and return.
- If the limit is reached, calculate how long to wait for the oldest request to age out of the window, then sleep with asyncio.sleep().
- Retry the above steps in a while loop.
Step 3
In this step, your goal is to test your rate limiter.
Update your concurrent code to call await limiter.acquire() before making requests.
Verify that the rate limiter delays requests to avoid hitting the Gemini API rate limits.
Going Further
- Try implementing other rate limiting algorithms like token bucket. This will require keeping track of "tokens" and replenishing them at a fixed rate (see the sketch after this list).
- Implement a sliding window rate limiter that avoids busy-waiting and respects request order. The rate limiter should not use while loops or asyncio.sleep(). When the limit is reached, create a future with loop.create_future() and add it to a waiters queue, then await it. When a request is sent, use loop.call_later(interval, callback) to schedule a callback that will wake up the next waiter from the futures queue. Effectively, every allowed request reserves a slot that expires in interval seconds; when the callback fires, it unblocks the next waiter in line and lets the next request through.
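If you want a starting point for the token bucket idea, here is a minimal sketch (the class name and numbers are illustrative, not part of the challenge):
import asyncio

class TokenBucket:
    """Sketch of a token bucket: `rate` tokens are added per second, up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self._rate = rate
        self._capacity = capacity
        self._tokens = float(capacity)
        self._last_refill = None
        self._lock = asyncio.Lock()

    def _refill(self, now: float) -> None:
        # Add tokens for the time elapsed since the last refill, capped at capacity.
        if self._last_refill is None:
            self._last_refill = now
        elapsed = now - self._last_refill
        self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
        self._last_refill = now

    async def acquire(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            async with self._lock:
                now = loop.time()
                self._refill(now)
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Not enough tokens: compute how long until the next one is available.
                wait = (1 - self._tokens) / self._rate
            await asyncio.sleep(wait)
Awaiting acquire() before each request then throttles calls to roughly rate requests per second.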
Now take some time to attempt the challenge before looking at the solution!
Solution
Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay!
Step 1 - Solution
See the LLM Responses solution to make concurrent requests to the Gemini API. Increase _NUM_REQUESTS = 20 to trigger the rate limit error.
Step 2 - Solution
import asyncio
from collections import deque
from datetime import timedelta
class RateLimiter:
def __init__(self, limit: int, interval: timedelta):
self._limit = limit
self._interval = interval.total_seconds()
self._window = deque()
self._lock = asyncio.Lock()
def _prune_window(self, now: float) -> None:
"""Removes requests that have aged out of the time window."""
while self._window and now - self._window[0] > self._interval:
self._window.popleft()
async def acquire(self) -> None:
loop = asyncio.get_running_loop()
while True:
async with self._lock:
now = loop.time()
self._prune_window(now)
if len(self._window) < self._limit:
# We have space in the sliding window to send a request.
self._window.append(now)
return
# Wait for the oldest request to age out of the window.
oldest_request_time = self._window[0]
elapsed = now - oldest_request_time
remaining = self._interval - elapsed
await asyncio.sleep(remaining)
Note how:
- _lock prevents race conditions when multiple tasks call acquire() simultaneously.
- _prune_window() removes requests outside the sliding window.
- We release the lock before sleeping to allow other tasks to check the rate limit.
This solution suffers from the "thundering herd" problem. If multiple tasks are sleeping, all of them will wake up at the same time to try to acquire the lock. Only one request will be allowed, and the remaining tasks will need to sleep again.
One way to avoid this problem is to implement the rate limiter using futures as described in the Going Further section.
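For reference, here is one possible shape of that future-based limiter. It is only a sketch (it ignores edge cases like cancelled waiters), not a drop-in replacement for the solution above:
import asyncio
from collections import deque
from datetime import timedelta

class FutureBasedRateLimiter:
    """Every allowed request reserves a slot that expires `interval` seconds later."""

    def __init__(self, limit: int, interval: timedelta):
        self._limit = limit
        self._interval = interval.total_seconds()
        self._slots_in_use = 0
        self._waiters = deque()  # Futures for requests waiting for a slot.

    async def acquire(self) -> None:
        loop = asyncio.get_running_loop()
        if self._slots_in_use < self._limit:
            self._slots_in_use += 1
        else:
            # All slots are taken: wait until an expiring slot wakes us up, in FIFO order.
            waiter = loop.create_future()
            self._waiters.append(waiter)
            await waiter
        # Schedule this slot to be released after the interval.
        loop.call_later(self._interval, self._release_slot)

    def _release_slot(self) -> None:
        if self._waiters:
            # Hand the slot directly to the next waiter; it schedules its own expiry in acquire().
            self._waiters.popleft().set_result(None)
        else:
            self._slots_in_use -= 1
There is no while loop and no sleeping: each waiter is woken exactly once, in order, as slots free up.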
Step 3 - Solution
Now let's integrate the rate limiter with our Gemini API calls:
import asyncio
from datetime import datetime, timedelta
from google import genai
_NUM_REQUESTS = 20
class RateLimiter:
# Same as above.
...
async def generate_content(index, client, rate_limiter):
await rate_limiter.acquire()
print(f"Request {index} sent at {datetime.now().strftime('%H:%M:%S')}")
response = await client.aio.models.generate_content(
model="gemini-flash-latest", contents="Why do some birds migrate?"
)
return response
async def main():
# Gemini Flash Latest has a rate limit of 10 requests per minute.
limiter = RateLimiter(limit=10, interval=timedelta(minutes=1))
client = genai.Client()
tasks = [generate_content(i, client, limiter) for i in range(_NUM_REQUESTS)]
results = await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
Now when we run this with 20 requests, it completes successfully without hitting rate limits:
time python script.py
> Request 0 sent at 22:31:10
> Request 1 sent at 22:31:10
> Request 2 sent at 22:31:10
> Request 3 sent at 22:31:10
> Request 4 sent at 22:31:10
> Request 5 sent at 22:31:10
> Request 6 sent at 22:31:10
> Request 7 sent at 22:31:10
> Request 8 sent at 22:31:10
> Request 9 sent at 22:31:10
# Note how we wait one minute before sending the 11th request to stay
# within the rate limit.
> Request 19 sent at 22:32:10
> Request 18 sent at 22:32:10
> Request 17 sent at 22:32:10
> Request 16 sent at 22:32:10
> Request 15 sent at 22:32:10
> Request 14 sent at 22:32:10
> Request 12 sent at 22:32:10
> Request 11 sent at 22:32:10
> Request 13 sent at 22:32:10
> Request 10 sent at 22:32:10
>
> real 1m9.061s
> user 0m2.072s
> sys 0m0.402s
The first 10 requests are allowed immediately, then the rate limiter automatically pauses until enough time has passed to send the next batch. All 20 requests succeed without any resource exhausted errors.
Data Pipelines
Challenge
In this challenge, you will build a concurrent data pipeline. The pipeline will read some prompts from disk, call the Gemini API, then write the model responses back to disk.
Reading/writing files and sending API requests are I/O operations so asyncio is a great fit for speeding up this pipeline. Instead of waiting on disk I/O or model responses, you can be doing work concurrently to save time.
You can also extend this pipeline to do CPU-bound work. See the Going Further section if you want to learn how.
Before you start
The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:
- asyncio.gather() for waiting on running tasks.
- asyncio.Queue() for creating async queues.
- queue.task_done() and queue.join() for tracking queue completion.
- task.cancel() for canceling running tasks.
- aiofiles.open() for async file operations.
Producer-Consumer Queues
You will be using a producer-consumer queue in this challenge, which is a key design pattern for concurrent code.
A queue allows different tasks to communicate with each other. A producer task can add items to a queue, and a consumer task can process these items as they are added.
Queues are useful when you have:
- Different pipeline stages: You can interleave reading/writing files while making API calls. You're not stuck waiting on any one stage.
- Streaming data: You can add data to the input queue and it will get processed as it is added.
- Multiple workers per stage: You can have multiple tasks working concurrently, which is faster (see the LLM Responses chapter).
- Backpressure control: Limiting the queue sizes prevents memory overflow if producers are faster than consumers.
Take a look at this example:
import asyncio
_NUM_ITEMS = 3
async def producer(queue: asyncio.Queue):
for i in range(_NUM_ITEMS):
print(f"Adding item {i}")
await queue.put(i)
# Pass control back to the event loop so the consumer can run.
await asyncio.sleep(0.1)
async def consumer(queue: asyncio.Queue):
try:
while True:
item = await queue.get()
print(f"Processing the item {item}")
queue.task_done()
except asyncio.CancelledError:
print("Shutting down consumer.")
async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
producer_task = asyncio.create_task(producer(queue))
# Wait until all the producers are done.
# This ensures all items have been added to the queue.
await producer_task
# Wait until the queue is empty.
# .put() and .task_done() must be called an equal number of times.
await queue.join()
# Cancel the consumers since the queue is empty.
consumer_task.cancel()
await consumer_task
if __name__ == "__main__":
asyncio.run(main())
The code above produces the following output:
> Adding item 0
> Processing the item 0
> Adding item 1
> Processing the item 1
> Adding item 2
> Processing the item 2
> Shutting down consumer.
Note how the producer and consumer steps are interleaved. Items are processed as they are added.
The consumer doesn't wait for all items to be produced. It starts processing immediately. With multiple consumers, you can process many items concurrently while the producer is still adding more.
You can create multiple consumer or producer tasks all using the same queue. This is analogous to creating more "workers" to achieve higher concurrency.
_NUM_PRODUCERS = 4
_NUM_CONSUMERS = 2
async def main():
queue = asyncio.Queue()
consumers = [asyncio.create_task(consumer(queue)) for _ in range(_NUM_CONSUMERS)]
producers = [asyncio.create_task(producer(queue)) for _ in range(_NUM_PRODUCERS)]
await asyncio.gather(*producers)
await queue.join()
for c in consumers:
c.cancel()
await asyncio.gather(*consumers)
The consumers loop forever waiting on new items. You need to cancel the consumers explicitly after the producers are done and the queue is empty.
Another option is to pass a "poison pill" or a None sentinel to notify the consumer to shut down instead of cancelling it explicitly. See the Request Batchers chapter for an example of using sentinel values to gracefully shut down pipeline stages.
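For example, a sentinel-based version of the consumer above could look like this (a sketch; with multiple consumers you would enqueue one sentinel per consumer):
import asyncio

async def producer(queue: asyncio.Queue, num_items: int):
    for i in range(num_items):
        await queue.put(i)
    # Signal the consumer that no more items are coming.
    await queue.put(None)

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel received: acknowledge it and exit without being cancelled.
            queue.task_done()
            print("Shutting down consumer.")
            return
        print(f"Processing the item {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    consumer_task = asyncio.create_task(consumer(queue))
    await producer(queue, num_items=3)
    # No queue.join() or cancel() needed; the sentinel shuts the consumer down.
    await consumer_task

asyncio.run(main())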
The data pipeline in this challenge will have:
- An input reader task which reads input prompts from disk then adds them to the input queue.
- Multiple content generation tasks which get prompts from the input queue, call the Gemini API, then add responses to the output queue.
- An output writer task which gets the responses from the outputs queue then writes them to disk.
The queue decouples stages so they don't block each other. The content generation workers also process API calls concurrently so we can have multiple API calls inflight at the same time.
Step 0
To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier, but any async model API will work.
export GEMINI_API_KEY="YOUR_API_KEY"
Step 1
In this step, your goal is to generate the input prompts.
Run the command below to create a new inputs/ directory. This directory will have 10 files, each with 3 prompts. This will simulate reading from sharded input files.
mkdir -p inputs
for shard in {0..9}; do for i in {0..2}; do echo "What is two times $((shard * 3 + i))?" >> inputs/shard_$shard.txt;
done; done
Verify that the input data was created:
ls -l inputs
>-rw-rw-r-- 1 user user 63 Nov 23 16:34 shard_0.txt
>-rw-rw-r-- 1 user user 63 Nov 23 16:34 shard_1.txt
>-rw-rw-r-- 1 user user 63 Nov 23 16:34 shard_2.txt
...
Also inspect the input files:
cat inputs/shard_4.txt
> What is two times 12?
> What is two times 13?
> What is two times 14?
Step 2
In this step, your goal is to implement the input reader coroutine.
The input reader coroutine takes the input directory, reads the files in that directory, then adds the file contents to the input queue. Each line in the file is a separate prompt or input.
You can use aiofiles for async file operations. The input directory in our case is inputs/.
import aiofiles
async def read_inputs(input_dir: pathlib.Path, input_queue: asyncio.Queue):
pass
Step 3
In this step, your goal is to implement the content generation coroutine.
This coroutine reads prompts from the input queue, makes requests to the Gemini API, then writes the response text to the output queue. The output queue should hold a (input, response) tuple.
See the solution to the LLM Responses chapter to see how to call the async Gemini API.
async def generate_content(
client: genai.Client,
input_queue: asyncio.Queue[str],
output_queue: asyncio.Queue[tuple[str, str]],
):
pass
Make sure to handle the API call failures. You can either skip failed inputs, retry them in the current task, or add them back to the input queue to be processed by another task.
To avoid hitting rate limits, you can sleep between requests or you can use the rate limiter we implemented in the Rate Limiters chapter.
Step 4
In this step, your goal is to implement the output writer coroutine.
This coroutine does the following:
- It creates an output shard file at the output directory.
- It gets responses from the output queue, then writes them to the output file until _SHARD_SIZE is reached.
- After the shard is full, we create a new shard and repeat.
You can use aiofiles again for file operations.
_SHARD_SIZE = 5
async def write_outputs(
output_dir: pathlib.Path, output_queue: asyncio.Queue[tuple[str, str]]
):
return
Make sure there is only one task writing to any given file at a time. Having multiple tasks write to the same file or shard can cause race conditions.
Step 5
In this step, your goal is to create the final consumer-producer pipeline, chaining all the coroutines you have implemented above.
You need to define two queues:
_QUEUE_SIZE = 5
async def main():
inputs_queue = asyncio.Queue(maxsize=_QUEUE_SIZE)
outputs_queue = asyncio.Queue(maxsize=_QUEUE_SIZE)
Set the maxsize parameter for these queues to implement backpressure. This prevents the reader from loading all prompts into memory at once and causing memory overflow. When a queue is full, await queue.put() will block until space is available.
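Here is a small standalone sketch of that backpressure behavior (the sleep stands in for slow downstream work):
import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(6):
        # With maxsize=2, put() blocks once two items are waiting,
        # so the producer can never run far ahead of the consumer.
        await queue.put(i)
        print(f"Produced {i}")

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.2)  # Simulate slow processing.
        print(f"Consumed {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=2)
    consumer_task = asyncio.create_task(consumer(queue))
    await producer(queue)
    await queue.join()
    consumer_task.cancel()

asyncio.run(main())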
You then need to create the tasks or workers:
- Create one input reader task.
- Create at least two content generation tasks.
- Create one output writer task.
Run your code and verify reading, generation, and writing are all interleaved. Verify that running the pipeline with multiple generate workers is faster than running it with one.
Going Further
- This pipeline only has I/O operations, so asyncio works well. But what if the pipeline has CPU-heavy operations? For example, what if you need to split and count the words in the model responses? You can try using loop.run_in_executor(executor, operation) and ProcessPoolExecutor to offload CPU-heavy work to a separate process and avoid blocking the event loop (see the sketch after this list).
- In this challenge, we created a single reader and writer task since disk I/O is fast and the API calls were the bottleneck. However, if our reads/writes were slower (e.g., to a remote database or cloud storage), we could extend this pattern to have multiple concurrent reader/writer tasks.
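As a starting point for the first exercise, here is a minimal sketch of offloading a made-up word-count function to a process pool (the response strings are placeholders):
import asyncio
from concurrent.futures import ProcessPoolExecutor

def count_words(text: str) -> int:
    # CPU-bound work runs in a separate process, so it doesn't block the event loop.
    return len(text.split())

async def main():
    loop = asyncio.get_running_loop()
    responses = ["a short response", "a slightly longer model response"]
    with ProcessPoolExecutor() as pool:
        counts = await asyncio.gather(
            *(loop.run_in_executor(pool, count_words, text) for text in responses)
        )
    print(counts)

if __name__ == "__main__":
    # The __main__ guard matters here: ProcessPoolExecutor spawns worker processes
    # that re-import this module.
    asyncio.run(main())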
Now take some time to attempt the challenge before looking at the solution!
Solution
Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay!
First let's define all the imports and constants:
import asyncio
import pathlib
import aiofiles
from google import genai
_INPUTS_PATH = "inputs/"
_OUTPUTS_PATH = "outputs/"
_GENERATE_TASKS = 5
_SHARD_SIZE = 5
_QUEUE_SIZE = 5
Step 2 - Solution
async def read_inputs(input_dir: pathlib.Path, input_queue: asyncio.Queue[str]):
for path in input_dir.iterdir():
async with aiofiles.open(path) as f:
async for prompt in f:
if not prompt:
continue
await input_queue.put(prompt)
print(f"Enqueued prompt {prompt.strip()} from {path}")
Step 3 - Solution
async def generate_with_retry(
client: genai.Client,
input_queue: asyncio.Queue[str],
output_queue: asyncio.Queue[tuple[str, str]],
):
prompt = await input_queue.get()
try:
response = await client.aio.models.generate_content(
model="gemini-flash-latest", contents=prompt
)
await output_queue.put((prompt, response.text))
print(f"Generated content for prompt: {prompt.strip()}")
except Exception as e:
print(f"Error generating content for prompt {prompt.strip()}: {e}")
# Re-queue the prompt for retry
await input_queue.put(prompt)
finally:
input_queue.task_done()
async def generate_content(
client: genai.Client,
input_queue: asyncio.Queue[str],
output_queue: asyncio.Queue[tuple[str, str]],
):
try:
while True:
# Optionally wait between requests to avoid rate limiting.
# await asyncio.sleep(0.15)
await generate_with_retry(client, input_queue, output_queue)
except asyncio.CancelledError:
print("Shutting down content generation task...")
Step 4 - Solution
async def write_outputs(
output_dir: pathlib.Path, output_queue: asyncio.Queue[tuple[str, str]]
):
try:
# Create parent directories if they don't exist
output_dir.mkdir(parents=True, exist_ok=True)
shard_index = 0
while True:
path = output_dir / f"shard_{shard_index}.txt"
async with aiofiles.open(path, "w") as f:
for _ in range(_SHARD_SIZE):
prompt, response = await output_queue.get()
await f.write(f"{prompt} - {response} \n")
print(f"Wrote {prompt.strip()} response to {path}")
output_queue.task_done()
shard_index += 1
except asyncio.CancelledError:
print("Shutting down writer...")
Step 5 - Solution
async def main():
client = genai.Client()
input_queue = asyncio.Queue(maxsize=_QUEUE_SIZE)
output_queue = asyncio.Queue(maxsize=_QUEUE_SIZE)
input_dir = pathlib.Path(_INPUTS_PATH)
output_dir = pathlib.Path(_OUTPUTS_PATH)
reader_task = asyncio.create_task(read_inputs(input_dir, input_queue))
generate_tasks = [
asyncio.create_task(generate_content(client, input_queue, output_queue))
for _ in range(_GENERATE_TASKS)
]
writer_task = asyncio.create_task(write_outputs(output_dir, output_queue))
# Wait until all the inputs are read and processed
await reader_task
await input_queue.join()
# Cancel the generate tasks since all inputs have been processed.
for task in generate_tasks:
task.cancel()
await asyncio.gather(*generate_tasks, return_exceptions=True)
# Wait until all outputs are written.
await output_queue.join()
writer_task.cancel()
await writer_task
asyncio.run(main())
Now let's run this and check the output:
Enqueued prompt What is two times 27? from inputs/shard_9.txt
Enqueued prompt What is two times 28? from inputs/shard_9.txt
Enqueued prompt What is two times 29? from inputs/shard_9.txt
Enqueued prompt What is two times 6? from inputs/shard_2.txt
Generated content for prompt: What is two times 28?
Enqueued prompt What is two times 20? from inputs/shard_6.txt
Wrote What is two times 28? response to outputs/shard_0.txt
Generated content for prompt: What is two times 27?
Enqueued prompt What is two times 24? from inputs/shard_8.txt
Wrote What is two times 27? response to outputs/shard_0.txt
...
Generated content for prompt: What is two times 0?
Wrote What is two times 0? response to outputs/shard_0.txt
Generated content for prompt: What is two times 13?
Wrote What is two times 13? response to outputs/shard_0.txt
Generated content for prompt: What is two times 1?
Wrote What is two times 1? response to outputs/shard_0.txt
Generated content for prompt: What is two times 2?
Shutting down content generation task...
Shutting down content generation task...
Shutting down content generation task...
Wrote What is two times 2? response to outputs/shard_0.txt
Shutting down writer...
Note how the file reads, API calls, and file writes are all interleaved. As the producers (file reader, content generator) add items to the queues, the consumers (content generator, writer) get items from the queues and process them concurrently.
While the API is processing prompt #1, we can be reading prompt #2 from disk and writing the response for prompt #0. Without concurrency, each stage would wait idle for the others to complete.
With 1 generator task this script takes ~20 seconds (sequential API calls), but with 5 generator tasks it takes ~6 seconds (concurrent API calls). Multiple workers can process the slow stage (API calls) concurrently, while the fast stages (file I/O) keep them fed with work.
Request Batchers
Challenge
In this challenge, you will build an async request batcher for the Gemini Embedding API.
Many applications require batching API calls for efficiency. Say you're building an application that requires generating embeddings for a stream of queries (e.g., a search engine).
You have two options:
- Generate embeddings one at a time - This is simple but each request to the embedding generation service incurs some network latency and overhead.
- Batch multiple requests together - Combine several inputs into a single request and minimize the network overhead.
In this challenge, you will batch requests based on two parameters:
- batch_size - Maximum number of requests in a batch
- timeout - Maximum time to wait to fill a batch
The batcher continuously reads from an input queue and creates a batch when the batch size is reached or when the timeout expires (resulting in a partial batch).
In real systems, the batch size and timeout are tuned to balance efficiency and latency. Large batches are generally more efficient but waiting to fill them can hurt end-to-end latency.
We will be using producer-consumer queues in the challenge. See the Data Pipelines chapter to learn more about this pattern.
In this challenge, you will implement four async functions or coroutines:
- An input enqueuer that adds inputs to the input queue.
- A batcher that reads from the input queue, creates batches, then adds them to the batch queue.
- An embedding generator that reads from the batch queue, calls the Gemini Embedding API, then adds the output to the output queue.
- An output logger that reads from the outputs queue and prints the outputs.
Before you start
The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:
- asyncio.Queue() for creating async queues.
- asyncio.wait_for() for implementing timeouts on async operations.
- asyncio.get_running_loop().time() for tracking elapsed time.
- queue.task_done() and queue.join() for tracking queue completion.
- task.cancel() for canceling running tasks.
Step 0
To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier.
export GEMINI_API_KEY="YOUR_API_KEY"
Step 1
In this step, your goal is to verify you can call the Gemini API to generate embeddings.
Install the Google GenAI SDK and make your first request. Write your code to script.py.
pip install -q -U google-genai
import asyncio
from google import genai
_NUM_REQUESTS = 10
async def main():
client = genai.Client()
contents = [f"Input: {i}" for i in range(_NUM_REQUESTS)]
result = await client.aio.models.embed_content(
model="gemini-embedding-001", contents=contents
)
if __name__ == "__main__":
asyncio.run(main())
Step 2
In this step, your goal is to implement the input enqueuer coroutine.
This coroutine simulates a stream of inputs arriving at variable times. It adds inputs to the input queue with random delays between each input. After all inputs are added, it sends a sentinel value (None) to signal the end of the input stream.
import random
_NUM_INPUTS = 100
async def enqueue_inputs(input_queue: asyncio.Queue[str | None]):
for i in range(_NUM_INPUTS):
await input_queue.put(f"input-{i}")
# Simulate variable input arrival times
await asyncio.sleep(random.uniform(0, 0.3))
# Signal end of inputs with sentinel value
await input_queue.put(None)
This simulates real-world scenarios where inputs don't arrive all at once but stream in over time. The sentinel value is a clean way to signal completion without needing to cancel tasks.
Step 3
In this step, your goal is to implement the Batcher class.
The batcher is the core of this challenge. It reads from the input queue and creates batches based on two conditions:
- Batch size reached - When we have collected batch_size inputs, send the batch immediately.
- Timeout expired - If the timeout expires before the batch fills, send a partial batch.
from datetime import timedelta
class Batcher:
def __init__(self, batch_size: int, timeout: timedelta):
self._batch_size = batch_size
self._timeout = timeout.total_seconds()
async def batch(
self, input_queue: asyncio.Queue[str | None], batch_queue: asyncio.Queue[list[str] | None]
):
pass
The batch() method should:
- Use a while True loop to continuously process inputs.
- Track the remaining timeout for the current batch. Use asyncio.get_running_loop().time() to get the current time.
- Use asyncio.wait_for(input_queue.get(), timeout) to wait for an input with a timeout. This will raise asyncio.TimeoutError if the timeout expires.
- If the batch size is reached or the timeout expires, add the batch to the batch queue.
- Check for the sentinel value (None). When received, send any remaining batch, then propagate the sentinel to the next queue and return.
Make sure to update the remaining timeout after each input. As time elapses, the remaining timeout decreases. This ensures the batch times out at the correct time.
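Here is a small standalone illustration of that bookkeeping, separate from the challenge code: collect items for at most one second in total, shrinking the timeout as items arrive. The slow_source coroutine is made up for the demo:
import asyncio

async def slow_source(queue: asyncio.Queue):
    for i in range(10):
        await asyncio.sleep(0.3)
        await queue.put(i)

async def collect_for_one_second(queue: asyncio.Queue) -> list[int]:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 1.0
    batch = []
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            return batch
        try:
            # The timeout shrinks as time passes, so the whole batch waits at most one second.
            item = await asyncio.wait_for(queue.get(), timeout=remaining)
        except asyncio.TimeoutError:
            return batch
        batch.append(item)

async def main():
    queue = asyncio.Queue()
    source_task = asyncio.create_task(slow_source(queue))
    print(await collect_for_one_second(queue))  # Roughly the first three items.
    source_task.cancel()

asyncio.run(main())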
Step 4
In this step, your goal is to implement the embedding generator coroutine.
This coroutine reads batches from the batch queue, calls the Gemini Embedding API, then adds individual (content, embedding) pairs to the output queue.
from google.genai import types
async def embed_content(
client: genai.Client,
batch_queue: asyncio.Queue[list[str] | None],
output_queue: asyncio.Queue[tuple[str, types.ContentEmbedding] | None],
):
pass
The Gemini API accepts multiple contents in a single call and returns embeddings in the same order. You need to zip the batch contents with the returned embeddings then add them to the output queue individually. When you receive the sentinel value, propagate it to the output queue and return.
Step 5
In this step, your goal is to implement the output logger coroutine.
This coroutine reads from the output queue and prints each (content, embedding) pair. We could update this to write the results to a file, but we will just print the output here for simplicity.
async def log_outputs(
output_queue: asyncio.Queue[tuple[str, types.ContentEmbedding] | None],
):
pass
Check for the sentinel value and return when received.
Step 6
In this step, your goal is to chain all the coroutines together in the main function.
You need to create three queues:
async def main():
input_queue = asyncio.Queue()
batch_queue = asyncio.Queue()
output_queue = asyncio.Queue()
Then create the tasks:
- Create the input enqueuer task.
- Create the batcher task with a batch size of 8 and timeout of 1 second.
- Create the embedding generator task.
- Create the output logger task.
After creating the tasks, you can wait for all of them to complete using asyncio.gather(). With sentinel values, tasks will shut down gracefully on their own without cancellation.
Run your code and verify that:
- Batches are created when they reach size 8.
- Partial batches are created when the timeout expires.
- All inputs are processed even if the final batch is partial.
Going Further
-
Try experimenting with different batch sizes and timeouts. How does this affect the total runtime? What's the tradeoff between efficiency and latency?
-
Implement a dynamic batching strategy that adjusts the batch size based on input arrival rate. If inputs are arriving slowly, use smaller batches with shorter timeouts. If inputs are arriving quickly, use larger batches.
Now take some time to attempt the challenge before looking at the solution!
Solution
Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay!
First let's define all the imports and type aliases:
import asyncio
import random
from datetime import timedelta
from typing import AsyncIterator, TypeAlias
from google import genai
from google.genai import types
_NUM_INPUTS = 100
_BATCH_SIZE = 8
_TIMEOUT_SECONDS = 1
_INPUT_DELAY_SECONDS = 0.3
Input: TypeAlias = str
InputBatch: TypeAlias = list[Input]
Step 2 - Solution
async def enqueue_inputs(input_queue: asyncio.Queue[Input | None]):
for i in range(_NUM_INPUTS):
await input_queue.put(f"input-{i}")
# Simulate variable input arrival times
await asyncio.sleep(random.uniform(0, _INPUT_DELAY_SECONDS))
# Signal end of inputs with sentinel value
await input_queue.put(None)
print("All inputs enqueued.")
This coroutine adds inputs to the queue with random delays. The random delays simulate real-world scenarios where inputs arrive at variable rates. After all inputs are enqueued, we send a sentinel value (None) to signal that no more inputs are coming. This allows downstream consumers to shut down gracefully without explicit cancellation.
Step 3 - Solution
class Batcher:
def __init__(self, batch_size: int, timeout: timedelta):
self._batch_size = batch_size
self._timeout = timeout.total_seconds()
async def _batches(
self, input_queue: asyncio.Queue[Input | None]
) -> AsyncIterator[InputBatch | None]:
"""Yields batches until sentinel received. Yields None as final value."""
loop = asyncio.get_running_loop()
while True:
batch = []
start = loop.time()
timeout = self._timeout
while True:
try:
item = await asyncio.wait_for(input_queue.get(), timeout)
except asyncio.TimeoutError:
elapsed = loop.time() - start
print(f"Batch timed out after {elapsed} seconds.")
break
if item is None:
# Received sentinel value.
input_queue.task_done()
if batch:
yield batch
yield None
return
batch.append(item)
elapsed = loop.time() - start
timeout = self._timeout - elapsed
if len(batch) == self._batch_size:
print("Batch is full.")
break
if batch:
yield batch
for _ in batch:
input_queue.task_done()
async def batch(
self,
input_queue: asyncio.Queue[Input | None],
batch_queue: asyncio.Queue[InputBatch | None],
):
async for batch in self._batches(input_queue):
if batch is None:
# Propagate sentinel to next stage
await batch_queue.put(None)
print("Shutting down batcher...")
return
print(f"Batch size is {len(batch)}")
await batch_queue.put(batch)
Some things to note:
- We track the elapsed time and update the remaining timeout after each input. This ensures batches time out at the correct time regardless of when inputs arrive.
- We use asyncio.wait_for() to implement the timeout. This raises asyncio.TimeoutError when the timeout expires.
- We check for the sentinel value (None). When received, we send any remaining batch, propagate the sentinel to the next queue, and return. This gracefully shuts down the batcher without needing task cancellation.
- We call input_queue.task_done() for each item in the batch to properly track queue completion.
- Empty batches are skipped to avoid sending empty API requests.
Step 4 - Solution
async def embed_content(
client: genai.Client,
batch_queue: asyncio.Queue[InputBatch | None],
output_queue: asyncio.Queue[tuple[Input, types.ContentEmbedding] | None],
):
while True:
batch = await batch_queue.get()
if batch is None:
# Received sentinel value.
break
result = await client.aio.models.embed_content(
model="gemini-embedding-001",
contents=batch,
config=types.EmbedContentConfig(output_dimensionality=3),
)
if result.embeddings is None:
raise ValueError("No embeddings returned from the API.")
for content, embedding in zip(batch, result.embeddings):
await output_queue.put((content, embedding))
batch_queue.task_done()
# Propagate sentinel to next stage
await output_queue.put(None)
batch_queue.task_done()
print("Shutting down embedding task...")
The Gemini API returns embeddings in the same order as the input contents. We use zip() to pair each content with its embedding, then add them individually to the output queue. When we receive the sentinel value, we propagate it to the output queue and return.
Step 5 - Solution
async def log_outputs(
output_queue: asyncio.Queue[tuple[Input, types.ContentEmbedding] | None],
):
while True:
output = await output_queue.get()
if output is None:
# Received sentinel value.
break
content, embedding = output
print(f"Content: {content} -> Embedding: {embedding.values}")
output_queue.task_done()
output_queue.task_done()
print("Shutting down writer...")
Step 6 - Solution
async def main():
input_queue = asyncio.Queue()
batch_queue = asyncio.Queue()
output_queue = asyncio.Queue()
client = genai.Client()
batcher = Batcher(
batch_size=_BATCH_SIZE, timeout=timedelta(seconds=_TIMEOUT_SECONDS)
)
enqueue_task = asyncio.create_task(enqueue_inputs(input_queue))
batcher_task = asyncio.create_task(batcher.batch(input_queue, batch_queue))
embed_task = asyncio.create_task(embed_content(client, batch_queue, output_queue))
log_task = asyncio.create_task(log_outputs(output_queue))
# Wait for all tasks to complete gracefully via sentinel values
await asyncio.gather(enqueue_task, batcher_task, embed_task, log_task)
asyncio.run(main())
This is how the shutdown sequence works using sentinel values:
- The input enqueuer finishes adding all inputs, then sends a sentinel value (None) to the input queue.
- The batcher receives the sentinel, sends any remaining batch, propagates the sentinel to the batch queue, and returns.
- The embedding task receives the sentinel, propagates it to the output queue, and returns.
- The logger receives the sentinel and returns.
- All tasks complete naturally, and asyncio.gather() returns.
This approach is cleaner than explicit cancellation because each stage knows when to shut down based on the sentinel value.
Now let's run this and check the output:
...
Batch timed out after 1.0009840049997365 seconds.
Batch size is 5
Content: input-49 -> Embedding: [-0.011579741, -0.010578066, 0.018000955]
Content: input-50 -> Embedding: [-0.004875405, -3.5398414e-05, 0.019322932]
Content: input-51 -> Embedding: [-0.014710601, -0.005792096, 0.0061859917]
Content: input-52 -> Embedding: [-0.01397081, -0.0063333134, 0.01519548]
Content: input-53 -> Embedding: [0.00022102315, -0.010211905, 0.012408208]
Batch is full.
Batch size is 8
Content: input-54 -> Embedding: [-0.005042672, -0.010983077, 0.011013798]
Content: input-55 -> Embedding: [-0.013474228, -0.0002503009, 0.021384422]
Content: input-56 -> Embedding: [-0.007262096, -0.012684701, 0.02125588]
Content: input-57 -> Embedding: [-0.007679552, -0.0016124722, 0.014986247]
Content: input-58 -> Embedding: [-0.0013965481, -0.016043654, 0.015830107]
Content: input-59 -> Embedding: [0.0011211497, 0.0062395, 0.022803845]
Content: input-60 -> Embedding: [-0.005757582, -0.019661691, 0.012294045]
Content: input-61 -> Embedding: [-0.009954969, -0.030515866, 0.0075060125]
....
Batch timed out after 1.0012599780002347 seconds.
Batch size is 4
Content: input-90 -> Embedding: [-0.0027523814, -0.015859226, 0.017288134]
Content: input-91 -> Embedding: [-0.0006344775, -0.019411083, 0.00752806]
Content: input-92 -> Embedding: [-0.010482627, -0.002565886, 0.028898504]
Content: input-93 -> Embedding: [0.0059372373, -0.015417972, 0.016691986]
All inputs enqueued.
Batch timed out after 1.0011447710003267 seconds.
Batch size is 6
Shutting down batcher...
Content: input-94 -> Embedding: [-0.010927118, -0.014871426, 0.01482102]
Content: input-95 -> Embedding: [-0.0030360888, -0.007296145, 0.009159293]
Content: input-96 -> Embedding: [-0.01636001, -0.0077388645, 0.015982967]
Content: input-97 -> Embedding: [-0.0011909363, -0.01127491, 0.013533601]
Content: input-98 -> Embedding: [-0.005317036, -0.024050364, 0.010049778]
Content: input-99 -> Embedding: [-0.010439553, 0.0002079097, 0.027198879]
Shutting down embedding task...
Shutting down writer...
Note how:
- Some batches reach size 8 and are sent immediately (marked as "Batch is full").
- Other batches timeout after 1 second and are sent as partial batches.
- Inputs, batching, API calls, and logging are all interleaved and happening concurrently.
- The batcher automatically adapts to the input arrival rate, creating full batches when inputs arrive quickly and partial batches when they arrive slowly.
Web Crawlers
Challenge
In this challenge, you will build a concurrent web crawler with asyncio. AI models often train on data scraped from the web, so efficient web crawlers are important for collecting training data at scale.
Instead of fetching pages one at a time, the crawler will process requests concurrently, which is more efficient.
The crawler will:
- Maintain a queue of URLs to visit
- Use multiple worker tasks to fetch and parse pages concurrently
- Track visited URLs to avoid duplicates, extract URLs from each page, and add them to the queue
- Output parsed pages to a separate queue for downstream processing
Before you start
The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:
- asyncio.Queue() for managing the crawl queue
- asyncio.Lock() for protecting shared state
- asyncio.create_task() for spawning worker tasks
- task.cancel() for gracefully shutting down workers
- queue.task_done() and queue.join() for tracking completion
- aiohttp.ClientSession() for making async HTTP requests
- BeautifulSoup from bs4 for parsing HTML
Step 0
To get started, install the required packages:
pip install aiohttp beautifulsoup4
We'll use web-scraping.dev (https://web-scraping.dev/products) since it's designed for web scraping practice. But you can also use this web crawler for scraping other sites (e.g. Wikipedia).
Step 1
In this step, your goal is to implement a coroutine that fetches and parses a web page.
First, create a ParsedPage dataclass to hold the results:
from dataclasses import dataclass
@dataclass
class ParsedPage:
"""Parsed page with html and child URLs found on that page."""
url: str
html: str
child_urls: set[str]
Then implement the fetch and parse function:
import aiohttp
async def fetch_and_parse(url: str, session: aiohttp.ClientSession) -> ParsedPage:
"""Fetch a page and extract links."""
pass
This function should:
- Fetch the page using
session.get(url). - Get the HTML text with
await response.text() - Extract the child urls that appear on that page.
- Return a
ParsedPagewith the URL, HTML, and child URLs.
Always reuse the same aiohttp.ClientSession across requests. Creating a new session for each request is slow and inefficient. Create the session once and pass it to all worker tasks.
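For example, a single session can be created once and shared by every fetch (a sketch using two pages from the practice site):
import asyncio

import aiohttp

async def fetch(url: str, session: aiohttp.ClientSession) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main():
    # One session for the whole program; it pools connections across requests.
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(
            fetch("https://web-scraping.dev/products", session),
            fetch("https://web-scraping.dev/testimonials", session),
        )
    print([len(page) for page in pages])

asyncio.run(main())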
You can use this helper function for extracting the child urls:
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
def extract_urls(html: str, url: str) -> set[str]:
"""Extracts URLs from the given page."""
soup = BeautifulSoup(html, "html.parser")
parsed_base = urlparse(url)
base_domain = parsed_base.netloc
child_urls = set()
for anchor in soup.find_all("a", href=True):
href = str(anchor["href"])
absolute_url = urljoin(url, href)
parsed = urlparse(absolute_url)
# Only keep URLs from the same domain
if parsed.netloc != base_domain:
continue
child_urls.add(absolute_url)
return child_urls
Step 3
In this step, your goal is to implement the WebCrawler class with its worker tasks.
The crawler uses breadth-first search, so it needs to maintain a queue of urls to visit, a set of visited urls, and a lock to protect the set:
import asyncio
class WebCrawler:
def __init__(self, seed_urls: list[str], max_pages: int):
self._max_pages = max_pages
self._crawl_queue = asyncio.Queue()
self._visited = set()
self._lock = asyncio.Lock()
# Add seed URLs to the queue and mark as visited
for url in seed_urls:
self._crawl_queue.put_nowait(url)
self._visited.add(url)
async def _crawl_task(
self,
parsed_queue: asyncio.Queue[ParsedPage | None],
session: aiohttp.ClientSession,
) -> None:
pass
async def crawl(
self, num_workers: int, parsed_queue: asyncio.Queue[ParsedPage | None]
) -> None:
pass
We initialize the crawl queue and visited set with the seed urls. The worker tasks can then start reading pages from the crawl queue.
Implementing the worker task
The _crawl_task() method is what actually executes the breadth-first search. Each crawl task:
- Gets a URL from the crawl queue
- Fetches and parses the page
- Adds the parsed result to the output parsed_queue
- Extracts child URLs and adds new ones to the crawl queue if not already visited.
The crawl task gets new urls from the crawl queue in a loop until the queue is empty. We should stop adding new pages after we've visited max_pages.
Implementing the main crawl method
The crawl() method should:
- Initialize the shared client session.
- Spawn multiple worker tasks
- Wait for the crawl queue to be empty
- Cancel all workers gracefully
Step 4
In this step, your goal is to implement the output logger.
The output logger reads parsed pages from the output queue and processes them. In a real system, you might save the HTML to disk or a database, but for this challenge, we can just print the results.
async def log_results(parsed_queue: asyncio.Queue[ParsedPage | None]):
pass
Step 5
Now tie everything together in the main function. Create and run the logger and crawler tasks.
async def main():
urls = ["https://web-scraping.dev/products"]
crawler = WebCrawler(seed_urls=urls, max_pages=1000)
parsed_queue = asyncio.Queue()
log_task = asyncio.create_task(log_results(parsed_queue))
await crawler.crawl(num_workers=10, parsed_queue=parsed_queue)
# Optionally pass sentinel value to shut down logger.
await parsed_queue.put(None)
await log_task
asyncio.run(main())
Run your crawler and verify that:
- Multiple pages are fetched concurrently
- Duplicate URLs are not visited
- The crawler stops after reaching the max page limit or visiting all links
- Using 1 worker is slower than using multiple.
Going Further
- Add a max_depth limit to control the depth of the search.
- Add request throttling per domain to be polite to servers. Use the rate limiter from the Rate Limiters chapter. For an added challenge, use separate rate limiters for each domain.
- The extract_urls call and BeautifulSoup parsing are not async, so they block the event loop. Try using loop.run_in_executor() to extract the URLs (see the sketch after this list). Does it affect performance? Why or why not?
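One way that last exercise could look, assuming the ParsedPage dataclass and extract_urls helper from Step 1 (a sketch, not a tuned implementation):
import asyncio

import aiohttp

async def fetch_and_parse(url: str, session: aiohttp.ClientSession) -> ParsedPage:
    async with session.get(url) as response:
        html = await response.text()
    loop = asyncio.get_running_loop()
    # Run the blocking BeautifulSoup parsing in the default thread pool executor
    # so the event loop can keep scheduling other fetches in the meantime.
    child_urls = await loop.run_in_executor(None, extract_urls, html, url)
    return ParsedPage(url=url, html=html, child_urls=child_urls)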
Now take some time to attempt the challenge before looking at the solution!
Solution
Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay!
First let's define all the imports and constants:
import asyncio
from dataclasses import dataclass
from urllib.parse import urljoin, urlparse
import aiohttp
from bs4 import BeautifulSoup
_NUM_WORKERS = 10
_MAX_PAGES = 1000
Step 1 - Solution
@dataclass
class ParsedPage:
"""Parsed page with html and child URLs found on that page."""
url: str
html: str
child_urls: set[str]
def extract_urls(html: str, url: str) -> set[str]:
"""Extracts URLs from the given page."""
soup = BeautifulSoup(html, "html.parser")
parsed_base = urlparse(url)
base_domain = parsed_base.netloc
child_urls = set()
for anchor in soup.find_all("a", href=True):
href = str(anchor["href"])
absolute_url = urljoin(url, href)
parsed = urlparse(absolute_url)
# Only keep URLs from the same domain
if parsed.netloc != base_domain:
continue
child_urls.add(absolute_url)
return child_urls
def parse_page(html: str, url: str) -> ParsedPage:
"""Extract links from HTML and filter to same-domain URLs."""
return ParsedPage(
url=url,
html=html,
child_urls=extract_urls(html, url),
)
async def fetch_and_parse(url: str, session: aiohttp.ClientSession) -> ParsedPage:
"""Fetch a page and extract links."""
async with session.get(url) as response:
html = await response.text()
parsed_page = parse_page(html, url)
return parsed_page
Step 3 - Solution
class WebCrawler:
def __init__(self, seed_urls: list[str], max_pages: int):
self._max_pages = max_pages
self._crawl_queue = asyncio.Queue()
self._visited = set()
self._lock = asyncio.Lock()
for url in seed_urls:
self._crawl_queue.put_nowait(url)
self._visited.add(url)
async def crawl(
self, num_workers: int, parsed_queue: asyncio.Queue[ParsedPage | None]
) -> None:
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(self._crawl_task(parsed_queue, session))
for _ in range(num_workers)
]
await self._wait_until_done(tasks)
async def _wait_until_done(self, crawl_tasks: list[asyncio.Task]) -> None:
"""Waits for crawling to complete and shuts down tasks."""
await self._crawl_queue.join()
for task in crawl_tasks:
task.cancel()
await asyncio.gather(*crawl_tasks, return_exceptions=True)
async def _crawl_task(
self,
parsed_queue: asyncio.Queue[ParsedPage | None],
session: aiohttp.ClientSession,
) -> None:
while True:
try:
url = await self._crawl_queue.get()
except asyncio.CancelledError:
print("Shutting down crawl task...")
return
# Parse the page and add it to the output queue.
try:
parsed_page = await fetch_and_parse(url, session)
await parsed_queue.put(parsed_page)
print(f"Parsed {parsed_page.url}.")
async with self._lock:
for url in parsed_page.child_urls:
if url in self._visited:
continue
if len(self._visited) >= self._max_pages:
break
self._visited.add(url)
await self._crawl_queue.put(url)
except Exception as e:
print(f"Error fetching {url}: {e}")
finally:
self._crawl_queue.task_done()
The crawl() method creates an aiohttp.ClientSession that's shared across all workers. This enables connection pooling and is more efficient than creating a new session per request.
The _crawl_task() method implements the breadth-first search. Note that:
- The while True loop runs forever until the task is cancelled.
- We catch asyncio.CancelledError when getting from the queue to handle graceful shutdown. See the Data Pipelines chapter for a similar example of producer-consumer queues.
- The lock protects the visited set. Multiple workers could try to add the same URL simultaneously without the lock.
- We check the max pages limit inside the lock to ensure we don't exceed it.
- The finally block ensures task_done() is called even if an error occurs, which is important so queue.join() can work correctly.
The _wait_until_done() method waits for the queue to be empty with queue.join(), then cancels all workers.
Step 4 - Solution
async def log_results(parsed_queue: asyncio.Queue[ParsedPage | None]) -> None:
while True:
parsed_page = await parsed_queue.get()
if parsed_page is None:
parsed_queue.task_done()
print("Shutting down writer...")
return
print(f"Logged {parsed_page.url} with {len(parsed_page.html)} chars.")
parsed_queue.task_done()
Step 5 - Solution
async def main():
urls = [
"https://web-scraping.dev/products",
]
crawler = WebCrawler(seed_urls=urls, max_pages=_MAX_PAGES)
parsed_queue = asyncio.Queue()
log_task = asyncio.create_task(log_results(parsed_queue))
await crawler.crawl(num_workers=_NUM_WORKERS, parsed_queue=parsed_queue)
# Stop writer after all crawlers complete.
await parsed_queue.put(None)
await log_task
asyncio.run(main())
Now let's run this and check the output:
python script.py
...
Parsed https://web-scraping.dev/products?category=household.
Parsed https://web-scraping.dev/products?category=consumables.
Parsed https://web-scraping.dev/cart.
Parsed https://web-scraping.dev/testimonials.
Logged https://web-scraping.dev/sitemap.xml with 6689 chars.
Logged https://web-scraping.dev/file-download with 19476 chars.
Logged https://web-scraping.dev/products?category=household with 12582 chars.
Logged https://web-scraping.dev/products?category=consumables with 17133 chars.
Logged https://web-scraping.dev/cart with 11856 chars.
Logged https://web-scraping.dev/testimonials with 29975 chars.
Parsed https://web-scraping.dev/products?page=5.
Parsed https://web-scraping.dev/login.
Parsed https://web-scraping.dev/products?page=2.
Parsed https://web-scraping.dev/product/3.
Parsed https://web-scraping.dev/product/5.
...
Logged https://web-scraping.dev/products?category=household&page=3 with 8848 chars.
Parsed https://web-scraping.dev/products?category=consumables&page=5.
Logged https://web-scraping.dev/products?category=consumables&page=5 with 8848 chars.
Shutting down crawl task...
Shutting down crawl task...
Shutting down crawl task...
Shutting down crawl task...
Shutting down crawl task...
Shutting down crawl task...
Shutting down crawl task...
Shutting down crawl task...
Shutting down crawl task...
Shutting down crawl task...
Shutting down writer...
Note how:
- Multiple pages are fetched and logged concurrently.
- All workers shut down gracefully when the queue is empty.
This script takes about 12s to run with 1 worker and 4s to run with 5 workers as pages are fetched concurrently.
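If you want to reproduce the timing comparison, one simple way is to wrap `main()` with `time.perf_counter()`. A minimal sketch, replacing the `asyncio.run(main())` call at the bottom of the script (it assumes the `main()` and `_MAX_PAGES` defined above):

```python
import asyncio
import time

async def timed_main():
    start = time.perf_counter()
    await main()  # the crawler main() defined above
    print(f"Crawled up to {_MAX_PAGES} pages in {time.perf_counter() - start:.1f}s")

asyncio.run(timed_main())
```

Rerun with different values of `_NUM_WORKERS` to see how concurrency affects the total time.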
Function-Calling Agents
Challenge
Function-calling allows you to connect models to external tools and APIs. The model can decide when and how to call specific functions to interact with the real world.
In this challenge, you will build an async function-calling agent. The agent will have access to the OpenAlex API to search for academic works and answer research questions.
The agent will:
- Stream model responses
- Execute non-blocking function calls as soon as they are detected in the stream
- Make multiple function calls concurrently
- Continue the conversation loop until the model provides a final answer
An async function-calling agent is more responsive and efficient because it can interleave API calls with the streamed model responses (not blocking on the full model response), and it can make multiple API calls concurrently (not blocking on any single API call).
Before you start
The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:
- `client.aio.models.generate_content_stream()` for streaming model responses
- Function calling API for understanding how to define tools and handle function calls
- `asyncio.create_task()` for spawning concurrent search tasks
- `asyncio.gather()` for waiting on the tasks
Step 0
To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier and function-calling support.
export GEMINI_API_KEY="YOUR_API_KEY"
Install the required packages:
pip install google-genai aiohttp
Step 1
In this step, your goal is to implement the OpenAlex search function.
The search function should query the OpenAlex API and return academic works matching the search query. OpenAlex returns abstracts in an inverted index format, so you'll need to parse them.
from typing import Any

import aiohttp
async def search_openalex(
search: str, session: aiohttp.ClientSession
) -> dict[str, Any]:
"""Searches OpenAlex for works matching the search query."""
pass
You can use these helper functions:
OPENALEX_API_URL = "https://api.openalex.org/works"
MAX_RESULTS_PER_PAGE = 10
async def fetch_openalex_works(
session: aiohttp.ClientSession, params: dict[str, Any]
) -> dict[str, Any]:
"""Fetches works from the OpenAlex API based on the given parameters."""
async with session.get(OPENALEX_API_URL, params=params) as response:
response.raise_for_status()
return await response.json()
def parse_abstract(abstract_inverted_index: dict[str, list[int]]) -> str:
"""Parses the abstract from the inverted index format."""
if not abstract_inverted_index:
return ""
index_to_word = {}
for word, positions in abstract_inverted_index.items():
for pos in positions:
index_to_word[pos] = word
abstract_words = [index_to_word.get(i, " ") for i in range(len(index_to_word))]
return " ".join(abstract_words)
def update_abstracts(data: dict[str, Any] | None) -> None:
"""Parses and updates abstracts in the OpenAlex response data."""
if not data or "results" not in data:
return
for result in data["results"]:
if "abstract_inverted_index" in result:
result["abstract"] = parse_abstract(result["abstract_inverted_index"])
del result["abstract_inverted_index"]
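To make the inverted index format concrete, here's a small, made-up example. OpenAlex stores each word of the abstract together with the positions where it occurs, and `parse_abstract()` rebuilds the original word order:

```python
# Hypothetical inverted index for a four-word abstract.
inverted_index = {
    "Gut": [0],
    "microbes": [1],
    "influence": [2],
    "mood": [3],
}

print(parse_abstract(inverted_index))
# Output: Gut microbes influence mood
```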
Your search_openalex() function should:
- Build the query parameters including the search term, sorting, and field selection
- Fetch the results from OpenAlex
- Parse the abstracts using `update_abstracts()`
- Return the data
The fields you should select are: id, title, display_name, publication_year, publication_date, cited_by_count, primary_location, abstract_inverted_index, and authorships.
Step 2
In this step, your goal is to define the function specification for the model.
The model needs to know what functions are available and how to call them. You'll define this as a function declaration:
search_openalex_spec = {
"name": "search_openalex",
"description": "Searches the OpenAlex API to retrieve academic works.",
"parameters": {
"type": "object",
"properties": {
"search": {
"type": "string",
"description": (
"Search term (1-10 words optimal). "
"Use AND, OR, NOT (UPPERCASE) for boolean searches. "
"Use quotes for exact phrases."
),
},
},
"required": ["search"],
},
}
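This declaration is a plain dict. Later (in Step 4) you'll hand it to the model by wrapping it in a `types.Tool` inside the generation config, which looks like this:

```python
from google.genai import types

config = types.GenerateContentConfig(
    tools=[types.Tool(function_declarations=[search_openalex_spec])],
)
```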
Step 3
In this step, your goal is to implement the main generation loop with streaming.
The key insight here is that you don't need to wait for the entire model response before executing function calls. You can start executing searches as soon as function calls are detected in the stream.
Implement the generate_content() function:
async def generate_content(
question: str,
client: genai.Client,
config: types.GenerateContentConfig,
session: aiohttp.ClientSession,
) -> None:
"""Generate content with streaming and parallel function execution."""
pass
This function should:
- Initialize the conversation history with the user's question
- Enter a loop that:
  - Streams the model response using `client.aio.models.generate_content_stream()`
  - Processes each chunk as it arrives
  - Collects function calls and spawns search tasks immediately
  - Prints text content as it streams
  - Waits for all search tasks to complete after the stream finishes
  - Adds function responses back to the conversation history
  - Continues the loop until no more function calls are made
You'll need to track both the model's response parts (for conversation history) and any search tasks that are spawned during streaming. When you detect a function call in a part, create a task immediately with asyncio.create_task() and add it to a list.
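As a rough sketch (not a complete implementation — it assumes `candidate`, `model_parts`, `search_tasks`, and `session` already exist inside your streaming loop), the per-chunk handling looks something like this:

```python
for part in candidate.content.parts:
    model_parts.append(part)  # keep every part for the conversation history
    if part.text:
        print(part.text, end="", flush=True)
    if part.function_call:
        # Spawn the search immediately, without waiting for the stream to finish.
        task = asyncio.create_task(
            search_openalex(**part.function_call.args, session=session)
        )
        search_tasks.append(task)
```

After the stream ends, `await asyncio.gather(*search_tasks)` collects all the results before the next turn of the conversation.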
Step 4
In this step, your goal is to create the system instruction and main function.
The system instruction guides the model on how to use the search function effectively. It should encourage the model to:
- Make multiple parallel searches (up to 10 at a time)
- Iterate through multiple rounds of searches
- Only provide a final answer after at least 3 rounds
You can use this system instruction:
_SYSTEM_INSTRUCTION = """
You are a research assistant that queries the OpenAlex academic database to
answer research questions.
Your task is to generate strategic search queries, analyze the abstracts,
then provide a concise cited answer.
You have access to the search_openalex function which searches academic
works in OpenAlex.
## Query Strategy
1. **Start broad** with general search terms
2. **Make up to 10 search function calls in parallel** to explore the topic
3. **Iterate with multiple rounds of searches** based on what you learn
4. **Boolean searches**: Use AND, OR, NOT (must be UPPERCASE)
5. **Exact phrases**: Use double quotes
## CRITICAL: Multi-Round Research Process
**YOU MUST COMPLETE AT LEAST 3 ROUNDS OF SEARCHES BEFORE PROVIDING YOUR FINAL ANSWER.**
Each round should:
- Make up to 10 parallel search function calls
- Analyze the abstracts returned
- Identify new keywords, related concepts, or gaps in coverage
- Use insights to formulate better queries for the next round
**DO NOT provide your final answer after just one round.**
## Important Constraints
- **You only have access to titles and abstracts** - not full papers
- Only cite information explicitly stated in abstracts
- Search terms use automatic stemming (e.g., "possums" matches "possum")
- Keep queries concise (1-10 words work best)
## Final Answer Format
**Only after completing at least 3 rounds of searches**, provide your
answer in this exact format:
**Final answer:** [1-2 paragraphs with hyperlinked citations]
### Requirements:
- Start with "Final answer:"
- **Strictly 1-2 paragraphs maximum**
- Include hyperlinked citations: `[Author et al., Year](https://openalex.org/W1234567890)`
- Use OpenAlex link format: `https://openalex.org/W1234567890` (NOT the API URL)
- Synthesize findings across papers
- Briefly acknowledge you're working from abstracts only
- No headers, bullet points, or extended formatting
### Example:
Final answer: Based on available abstracts, microplastics accumulate in marine organisms through multiple pathways [Thompson et al., 2020](https://openalex.org/W3012345678), with bioaccumulation rates varying by species. Filter feeders show particularly high concentrations [Garcia & Lee, 2021](https://openalex.org/W3123456789), and transfer through trophic levels has been documented [Chen et al., 2022](https://openalex.org/W4012345670), suggesting impacts on apex predators.
This analysis is based on article abstracts; detailed methodologies require consulting full papers. The literature indicates widespread microplastic contamination across marine environments [Martinez et al., 2023](https://openalex.org/W4123456781).
## Workflow
1. Understand the user's research question
2. **Round 1**: Generate up to 10 broad search function calls in parallel
3. Review abstracts and identify key papers, terminology, and knowledge gaps
4. **Round 2**: Generate up to 10 refined search queries based on Round 1
5. Analyze new abstracts and identify additional angles or specific subtopics
6. **Round 3+**: Continue iterating with targeted searches until comprehensive
7. **Only then** provide final answer in required format with citations
"""
Now implement the main function:
async def main():
client = genai.Client()
config = types.GenerateContentConfig(
tools=[types.Tool(function_declarations=[search_openalex_spec])],
system_instruction=_SYSTEM_INSTRUCTION,
)
question = "How does the gut microbiome influence mental health?"
async with aiohttp.ClientSession() as session:
await generate_content(question, client, config, session)
asyncio.run(main())
Run your agent and verify that:
- Model responses stream as they are generated
- Multiple searches run in parallel when the model makes multiple function calls
- The conversation continues for multiple rounds
- The model provides a final cited answer
Going Further
- Modify the system instruction to support different research workflows (e.g., finding papers by a specific author, or papers published in a specific year range).
- Add support for other function calls like fetching full paper metadata or citation graphs.
Now take some time to attempt the challenge before looking at the solution!
Solution
Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay! The key concepts are detecting function calls as they stream in and executing them in parallel.
First let's define all the imports and constants:
import asyncio
from typing import Any
import aiohttp
from google import genai
from google.genai import types
# API Configuration
OPENALEX_API_URL = "https://api.openalex.org/works"
MAX_RESULTS_PER_PAGE = 10
Step 1 - Solution
async def fetch_openalex_works(
session: aiohttp.ClientSession, params: dict[str, Any]
) -> dict[str, Any]:
"""Fetches works from the OpenAlex API based on the given parameters."""
async with session.get(OPENALEX_API_URL, params=params) as response:
response.raise_for_status()
return await response.json()
def parse_abstract(abstract_inverted_index: dict[str, list[int]]) -> str:
"""Parses the abstract from the inverted index format."""
if not abstract_inverted_index:
return ""
index_to_word = {}
for word, positions in abstract_inverted_index.items():
for pos in positions:
index_to_word[pos] = word
abstract_words = [index_to_word.get(i, " ") for i in range(len(index_to_word))]
return " ".join(abstract_words)
def update_abstracts(data: dict[str, Any] | None) -> None:
"""Parses and updates abstracts in the OpenAlex response data."""
if not data or "results" not in data:
return
for result in data["results"]:
if "abstract_inverted_index" in result:
result["abstract"] = parse_abstract(result["abstract_inverted_index"])
del result["abstract_inverted_index"]
async def search_openalex(
search: str, session: aiohttp.ClientSession
) -> dict[str, Any]:
"""Searches OpenAlex for works matching the search query."""
params = {
"search": search,
"sort": "relevance_score:desc",
"select": ",".join(
[
"id",
"title",
"display_name",
"publication_year",
"publication_date",
"cited_by_count",
"primary_location",
"abstract_inverted_index",
"authorships",
]
),
"per_page": MAX_RESULTS_PER_PAGE,
"mailto": "your@email.com",
}
data = await fetch_openalex_works(session, params)
update_abstracts(data)
return data
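You can sanity-check the search function on its own before wiring it into the agent. A quick throwaway test (the query string is arbitrary):

```python
async def _demo():
    async with aiohttp.ClientSession() as session:
        data = await search_openalex("gut microbiome mental health", session)
        for work in data.get("results", []):
            print(work.get("publication_year"), "-", work.get("display_name"))

asyncio.run(_demo())
```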
Step 2 - Solution
search_openalex_spec = {
"name": "search_openalex",
"description": "Searches the OpenAlex API to retrieve academic works.",
"parameters": {
"type": "object",
"properties": {
"search": {
"type": "string",
"description": (
"Search term (1-10 words optimal). "
"Use AND, OR, NOT (UPPERCASE) for boolean searches. "
"Use quotes for exact phrases."
),
},
},
"required": ["search"],
},
}
Step 3 - Solution
The key to this solution is executing function calls as they stream in, rather than waiting for the complete response.
async def generate_content(
question: str,
client: genai.Client,
config: types.GenerateContentConfig,
session: aiohttp.ClientSession,
) -> None:
"""Generate content with streaming and parallel function execution."""
# Initialize conversation history
contents = [question]
while True:
model_parts = []
search_tasks = []
print("\n--- Streaming response ---")
async for chunk in await client.aio.models.generate_content_stream(
model="gemini-flash-latest",
contents=contents,
config=config,
):
if not chunk.candidates:
continue
candidate = chunk.candidates[0]
if not candidate.content or not candidate.content.parts:
continue
for part in candidate.content.parts:
# Collect all parts for history
model_parts.append(part)
# Print any text content as it arrives
if part.text:
print(part.text, end="", flush=True)
# Execute function calls immediately as they are streamed
if part.function_call:
print(f"\n[Function call detected: {part.function_call.name}]")
print(f"[Arguments: {part.function_call.args}]")
# Start the search task immediately mid-stream
task = asyncio.create_task(
search_openalex(**part.function_call.args, session=session)
)
search_tasks.append(task)
# Add model's response to conversation history
if model_parts:
contents.append(types.Content(role="model", parts=model_parts))
if not search_tasks:
# No more function calls to execute
print("\n--- Conversation complete ---")
return
print(f"\n\n[Started {len(search_tasks)} search tasks]")
search_results = await asyncio.gather(*search_tasks)
print("\n\n[Search tasks complete]")
# Prepare function responses and add to conversation history
function_response_parts = [
types.Part.from_function_response(
name="search_openalex", response={"output": result}
)
for result in search_results
]
contents.append(types.Content(role="user", parts=function_response_parts))
The conversation loop continues until the model stops making function calls and provides its final answer.
Note how:
- Function calls are executed immediately as they appear in the stream using `asyncio.create_task()`.
- Multiple function calls in the same response run in parallel.
- Text content is printed as it streams for real-time feedback.
- The conversation history tracks all turns for context.
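One design note: `asyncio.gather(*search_tasks)` raises if any single search fails, which aborts the whole round. If you'd rather let the agent see and recover from individual failures, you could pass `return_exceptions=True` and turn errors into function responses — a sketch:

```python
# Sketch: tolerate individual search failures instead of aborting the round.
search_results = await asyncio.gather(*search_tasks, return_exceptions=True)

function_response_parts = [
    types.Part.from_function_response(
        name="search_openalex",
        response=(
            {"error": str(result)}
            if isinstance(result, BaseException)
            else {"output": result}
        ),
    )
    for result in search_results
]
```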
Step 4 - Solution
_SYSTEM_INSTRUCTION = """
You are a research assistant that queries the OpenAlex academic database to
answer research questions.
Your task is to generate strategic search queries, analyze the abstracts,
then provide a concise cited answer.
You have access to the search_openalex function which searches academic
works in OpenAlex.
## Query Strategy
1. **Start broad** with general search terms
2. **Make up to 10 search function calls in parallel** to explore the topic
3. **Iterate with multiple rounds of searches** based on what you learn
4. **Boolean searches**: Use AND, OR, NOT (must be UPPERCASE)
5. **Exact phrases**: Use double quotes
## CRITICAL: Multi-Round Research Process
**YOU MUST COMPLETE AT LEAST 3 ROUNDS OF SEARCHES BEFORE PROVIDING YOUR FINAL ANSWER.**
Each round should:
- Make up to 10 parallel search function calls
- Analyze the abstracts returned
- Identify new keywords, related concepts, or gaps in coverage
- Use insights to formulate better queries for the next round
**DO NOT provide your final answer after just one round.**
## Important Constraints
- **You only have access to titles and abstracts** - not full papers
- Only cite information explicitly stated in abstracts
- Search terms use automatic stemming (e.g., "possums" matches "possum")
- Keep queries concise (1-10 words work best)
## Final Answer Format
**Only after completing at least 3 rounds of searches**, provide your
answer in this exact format:
**Final answer:** [1-2 paragraphs with hyperlinked citations]
### Requirements:
- Start with "Final answer:"
- **Strictly 1-2 paragraphs maximum**
- Include hyperlinked citations: `[Author et al., Year](https://openalex.org/W1234567890)`
- Use OpenAlex link format: `https://openalex.org/W1234567890` (NOT the API URL)
- Synthesize findings across papers
- Briefly acknowledge you're working from abstracts only
- No headers, bullet points, or extended formatting
### Example:
Final answer: Based on available abstracts, microplastics accumulate in marine organisms through multiple pathways [Thompson et al., 2020](https://openalex.org/W3012345678), with bioaccumulation rates varying by species. Filter feeders show particularly high concentrations [Garcia & Lee, 2021](https://openalex.org/W3123456789), and transfer through trophic levels has been documented [Chen et al., 2022](https://openalex.org/W4012345670), suggesting impacts on apex predators.
This analysis is based on article abstracts; detailed methodologies require consulting full papers. The literature indicates widespread microplastic contamination across marine environments [Martinez et al., 2023](https://openalex.org/W4123456781).
## Workflow
1. Understand the user's research question
2. **Round 1**: Generate up to 10 broad search function calls in parallel
3. Review abstracts and identify key papers, terminology, and knowledge gaps
4. **Round 2**: Generate up to 10 refined search queries based on Round 1
5. Analyze new abstracts and identify additional angles or specific subtopics
6. **Round 3+**: Continue iterating with targeted searches until comprehensive
7. **Only then** provide final answer in required format with citations
"""
async def main():
client = genai.Client()
config = types.GenerateContentConfig(
tools=[types.Tool(function_declarations=[search_openalex_spec])],
system_instruction=_SYSTEM_INSTRUCTION,
)
question = "How does the gut microbiome influence mental health?"
async with aiohttp.ClientSession() as session:
await generate_content(question, client, config, session)
asyncio.run(main())
Now let's run this and observe the output:
python script.py
--- Streaming response ---
[Function call detected: search_openalex]
[Arguments: {'search': 'gut microbiome mental health'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'microbiota gut brain axis'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'probiotics anxiety depression'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'dysbiosis neurological disorders'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'gut bacteria neurotransmitters'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'microbiome stress response'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'psychobiotics'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'microbiome autism'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'gut brain communication'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'microbiome influence mood'}]
[Started 10 search tasks]
[Search tasks complete]
--- Streaming response ---
[Function call detected: search_openalex]
[Arguments: {'search': 'microbiome Tryptophan kynurenine serotonin'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'Short-chain fatty acids SCFA influence brain function'}]
[Function call detected: search_openalex]
[Arguments: {'search': 'Butyrate brain barrier'}]
[Started 3 search tasks]
[Search tasks complete]
--- Streaming response ---
The gut microbiome influences mental health through the **Microbiota-Gut-Brain (MGB) axis**, a complex, bidirectional communication network linking the gastrointestinal tract and the central nervous system through endocrine, immune, and neural signaling pathways [Cryan et al., 2019](https://openalex.org/W2970686316). This relationship means that alterations in the composition of the gut microbiota (dysbiosis) are associated with a range of neuropsychiatric and neurological disorders, including anxiety, depression, and autism spectrum disorder (ASD) [Socała et al., 2021](https://openalex.org/W3193421084).
The primary mechanisms by which gut microbes modulate brain function involve the production of chemical signals. First, microbial fermentation of dietary fiber produces **short-chain fatty acids (SCFAs)**, such as butyrate, propionate, and acetate. These metabolites are crucial for maintaining the integrity of the gut barrier and the blood-brain barrier (BBB), and they function as signaling molecules that regulate neuro-immunoendocrine pathways, potentially alleviating stress-induced brain alterations [Silva et al., 2020](https://openalex.org/W3003912482); [van de Wouw et al., 2018](https://openalex.org/W2891851874). Second, gut bacteria regulate the metabolism of the amino acid tryptophan, a precursor to the neurotransmitter **serotonin (5-HT)**, which plays a critical role in mood, appetite, and sleep [O’Mahony et al., 2014](https://openalex.org/W1972237932); [Roth et al., 2021](https://openalex.org/W3137164735). Neural communication occurs directly via the **Vagus Nerve**, which transmits signals from the gut to the brain; certain beneficial bacteria strains have demonstrated anxiolytic-like effects that require the integrity of this nerve to modulate central neural pathways [Bravo et al., 2011](https://openalex.org/W2164342861). Furthermore, the microbiota helps regulate the **Hypothalamic-Pituitary-Adrenal (HPA) axis**, the body’s main stress response system, and dysbiosis can exacerbate the inflammatory processes associated with depression and increased stress reactivity [Cryan et al., 2019](https://openalex.org/W2970686316); [Kiecolt-Glaser et al., 2015](https://openalex.org/W2159036260). This research has led to the development of "psychobiotics" (probiotics or prebiotics) aimed at manipulating the MGB axis to improve symptoms of conditions like depression and anxiety [Dinan et al., 2013](https://openalex.org/W1983814959).
***
*This answer is based solely on the analysis of available academic abstracts.*
--- Conversation complete ---
Note how:
- Multiple searches run concurrently within each round.
- Searches are triggered while the response is still streaming, which is faster than waiting for the full model response before making any API calls.
- The model iterates through multiple rounds before providing the final answer.