Rate Limiters
Challenge
Model provider APIs have strict rate limits. It's very easy to exceed these limits if you're making requests concurrently without throttling.
In this challenge, you will build a sliding window rate limiter for calling async APIs.
A sliding window rate limiter works by keeping a queue of the most recent N requests within a given time window.
Before processing a new request, the rate limiter removes any requests that fall outside the time window, then checks if the number of remaining requests is below the limit. If so, the new request is added to the queue and processed. Otherwise, the limiter waits until old requests have aged out of the window.
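The bookkeeping described above can be sketched independently of asyncio with a plain deque of timestamps (a minimal sketch; `allow_request` is an illustrative name, not part of the challenge skeleton):

```python
from collections import deque


def allow_request(window: deque, now: float, limit: int, interval: float) -> bool:
    """Return True if a request at time `now` fits under the rate limit."""
    # Drop timestamps that have aged out of the time window.
    while window and now - window[0] > interval:
        window.popleft()
    # If there is room, record the request and allow it.
    if len(window) < limit:
        window.append(now)
        return True
    return False
```

With limit=3 and interval=10, requests at t=0, 1, and 2 are allowed, a fourth at t=3 is rejected, and a request at t=12 is allowed again because the oldest timestamps have aged out of the window.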
Before you start
The following functions or classes are relevant for this chapter. It might be helpful to read their docs before you start:
- asyncio.gather() for waiting on running tasks.
- asyncio.get_running_loop().time() for getting the current time in a coroutine.
- asyncio.Lock() for protecting shared resources.
- asyncio.sleep() for waiting in a coroutine.
Step 0
To get started, get a Gemini API key from Google AI Studio. We use the Gemini API because it has a generous free tier, but any async model API will work.
export GEMINI_API_KEY="YOUR_API_KEY"
Step 1
In this step, your goal is to make concurrent requests to the Gemini API and hit the rate limits.
Create a new script (script.py) that makes 20 concurrent requests to the Gemini API, then run it. The solution to the LLM Responses chapter explains how to do this.
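The concurrency pattern looks roughly like this (a sketch with the API call replaced by a simulated delay; in your actual script you would await client.aio.models.generate_content(...) inside generate_content instead):

```python
import asyncio

_NUM_REQUESTS = 20


async def generate_content(index: int) -> str:
    # Placeholder for the real Gemini call, e.g.
    # await client.aio.models.generate_content(...).
    await asyncio.sleep(0.1)
    return f"response {index}"


async def main() -> list[str]:
    # Launch all requests concurrently and wait for them to finish.
    tasks = [generate_content(i) for i in range(_NUM_REQUESTS)]
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```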
Confirm you get a resource exhausted error:
python script.py
> google.genai.errors.ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429,
'message': 'You exceeded your current quota, please check your plan and billing details.
For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. ...
At the time of writing, Gemini 2.5 Flash had a limit of 10 requests per minute on the free tier.
Step 2
In this step, your goal is to implement the sliding window rate limiter.
Below is skeleton code for a RateLimiter class. You need to implement the acquire() method:
import asyncio
from collections import deque
from datetime import timedelta


class RateLimiter:
    def __init__(self, limit: int, interval: timedelta):
        self._limit = limit
        self._interval = interval.total_seconds()
        # Holds request timestamps.
        self._window = deque()

    async def acquire(self) -> None:
        """Wait until a new request can be made under the rate limit."""
        pass
Callers await acquire() until their request can be made under the rate limit:
async def generate_content(client, rate_limiter):
    # Waits until we are under the rate limit.
    await rate_limiter.acquire()
    response = await client.aio.models.generate_content(
        model="gemini-flash-latest", contents="Why do some birds migrate?"
    )
    return response
The acquire() method should:
- Acquire a lock to prevent race conditions when making changes to the window. You can create the lock using asyncio.Lock().
- Use asyncio.get_running_loop().time() to get the current time in seconds.
- Remove old requests from window to ensure it only has requests that were made within the past interval seconds.
- If the window has fewer than limit requests, the request is allowed. Add the current request time to the window and return.
- If the limit is reached, calculate how long to wait for the oldest request to age out of the window, then sleep with asyncio.sleep().
- Retry the above steps in a while loop.
Step 3
In this step, your goal is to test your rate limiter.
Update your concurrent code to call await limiter.acquire() before making requests.
Verify that the rate limiter delays requests to avoid hitting the Gemini API rate limits.
Going Further
- Try implementing other rate limiting algorithms like token bucket. This requires keeping track of "tokens" and replenishing them at a fixed rate.
- Implement a sliding window rate limiter that avoids busy-waiting and respects request order. The rate limiter should not use while loops or asyncio.sleep(). When the limit is reached, create a future with loop.create_future() and add it to a waiters queue, then await it. When a request is sent, use loop.call_later(interval, callback) to schedule a callback that will wake up the next waiter from the futures queue. Effectively, every allowed request reserves a slot that expires in interval seconds; when the callback fires, it unblocks the next waiter in line and lets the next request through.
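A token bucket along the lines of the first suggestion might look as follows (a sketch; TokenBucket and its parameter names are illustrative, not part of the challenge skeleton):

```python
import asyncio
from datetime import timedelta


class TokenBucket:
    def __init__(self, capacity: int, refill_interval: timedelta):
        self._capacity = capacity
        self._tokens = float(capacity)
        # Tokens replenished per second.
        self._rate = capacity / refill_interval.total_seconds()
        self._last_refill = None
        self._lock = asyncio.Lock()

    def _refill(self, now: float) -> None:
        """Adds tokens earned since the last refill, capped at capacity."""
        if self._last_refill is not None:
            elapsed = now - self._last_refill
            self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
        self._last_refill = now

    async def acquire(self) -> None:
        while True:
            async with self._lock:
                now = asyncio.get_running_loop().time()
                self._refill(now)
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Time until one full token is available.
                wait = (1 - self._tokens) / self._rate
            await asyncio.sleep(wait)
```

Unlike the sliding window, a token bucket smooths out bursts: after the initial burst of capacity requests, further requests trickle through at the refill rate rather than in window-sized batches.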
Now take some time to attempt the challenge before looking at the solution!
Solution
Below is a walkthrough of one possible solution. Your implementation may differ, and that's okay!
Step 1 - Solution
See the LLM Responses solution to make concurrent requests to the Gemini API. Increase _NUM_REQUESTS to 20 to trigger the rate limit error.
Step 2 - Solution
import asyncio
from collections import deque
from datetime import timedelta


class RateLimiter:
    def __init__(self, limit: int, interval: timedelta):
        self._limit = limit
        self._interval = interval.total_seconds()
        self._window = deque()
        self._lock = asyncio.Lock()

    def _prune_window(self, now: float) -> None:
        """Removes requests that have aged out of the time window."""
        while self._window and now - self._window[0] > self._interval:
            self._window.popleft()

    async def acquire(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            async with self._lock:
                now = loop.time()
                self._prune_window(now)
                if len(self._window) < self._limit:
                    # We have space in the sliding window to send a request.
                    self._window.append(now)
                    return
                # Wait for the oldest request to age out of the window.
                oldest_request_time = self._window[0]
                elapsed = now - oldest_request_time
                remaining = self._interval - elapsed
            await asyncio.sleep(remaining)
Note how:
- _lock prevents race conditions when multiple tasks call acquire() simultaneously.
- _prune_window() removes requests outside the sliding window.
- We release the lock before sleeping to allow other tasks to check the rate limit.
This solution suffers from the "thundering herd" problem: if multiple tasks are sleeping, all of them wake up at the same time and compete for the lock. Only one request will be allowed, and the remaining tasks will need to sleep again.
One way to avoid this problem is to implement the rate limiter using futures as described in the Going Further section.
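That futures-based design might be sketched like this (a sketch; FutureRateLimiter is an illustrative name, and cancellation handling is kept minimal for brevity):

```python
import asyncio
from collections import deque
from datetime import timedelta


class FutureRateLimiter:
    """Sliding-window limiter without polling: each allowed request
    reserves a slot that expires after `interval`, waking one waiter."""

    def __init__(self, limit: int, interval: timedelta):
        self._limit = limit
        self._interval = interval.total_seconds()
        self._in_flight = 0      # Slots reserved in the current window.
        self._waiters = deque()  # FIFO queue of futures for blocked tasks.

    async def acquire(self) -> None:
        loop = asyncio.get_running_loop()
        if self._in_flight < self._limit:
            self._reserve(loop)
            return
        # Limit reached: park on a future until a slot expires.
        fut = loop.create_future()
        self._waiters.append(fut)
        await fut
        self._reserve(loop)

    def _reserve(self, loop) -> None:
        self._in_flight += 1
        # Release this slot after `interval` seconds.
        loop.call_later(self._interval, self._release)

    def _release(self) -> None:
        self._in_flight -= 1
        # Wake the next waiter in FIFO order, skipping cancelled ones.
        while self._waiters:
            fut = self._waiters.popleft()
            if not fut.done():
                fut.set_result(None)
                break
```

Because the event loop is single-threaded and there is no await between checking and reserving a slot, no lock is needed; call_later replaces the sleep-and-retry loop, and the FIFO waiters queue preserves request order.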
Step 3 - Solution
Now let's integrate the rate limiter with our Gemini API calls:
import asyncio
from datetime import datetime, timedelta

from google import genai

_NUM_REQUESTS = 20


class RateLimiter:
    # Same as above.
    ...


async def generate_content(index, client, rate_limiter):
    await rate_limiter.acquire()
    print(f"Request {index} sent at {datetime.now().strftime('%H:%M:%S')}")
    response = await client.aio.models.generate_content(
        model="gemini-flash-latest", contents="Why do some birds migrate?"
    )
    return response


async def main():
    # Gemini Flash Latest has a rate limit of 10 requests per minute.
    limiter = RateLimiter(limit=10, interval=timedelta(minutes=1))
    client = genai.Client()
    tasks = [generate_content(i, client, limiter) for i in range(_NUM_REQUESTS)]
    results = await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
Now when we run this with 20 requests, it completes successfully without hitting rate limits:
time python script.py
> Request 0 sent at 22:31:10
> Request 1 sent at 22:31:10
> Request 2 sent at 22:31:10
> Request 3 sent at 22:31:10
> Request 4 sent at 22:31:10
> Request 5 sent at 22:31:10
> Request 6 sent at 22:31:10
> Request 7 sent at 22:31:10
> Request 8 sent at 22:31:10
> Request 9 sent at 22:31:10
# Note how we wait one minute before sending the 11th request to stay
# within the rate limit.
> Request 19 sent at 22:32:10
> Request 18 sent at 22:32:10
> Request 17 sent at 22:32:10
> Request 16 sent at 22:32:10
> Request 15 sent at 22:32:10
> Request 14 sent at 22:32:10
> Request 12 sent at 22:32:10
> Request 11 sent at 22:32:10
> Request 13 sent at 22:32:10
> Request 10 sent at 22:32:10
>
> real 1m9.061s
> user 0m2.072s
> sys 0m0.402s
The first 10 requests are allowed immediately, then the rate limiter automatically pauses until enough time has passed to send the next batch. All 20 requests succeed without any resource exhausted errors.