Rate Limits
Understanding and managing API rate limits on the Rustellar platform.
Overview
Rate limits control the number of API requests you can make within a specific time window. This ensures fair usage and platform stability for all users.
Rate Limit Tiers
Free Tier
| Limit Type | Value |
|---|---|
| Requests per minute (RPM) | 20 |
| Tokens per minute (TPM) | 40,000 |
| Requests per day (RPD) | 200 |
Standard Tier
| Limit Type | Value |
|---|---|
| Requests per minute (RPM) | 60 |
| Tokens per minute (TPM) | 150,000 |
| Requests per day (RPD) | 10,000 |
Pro Tier
| Limit Type | Value |
|---|---|
| Requests per minute (RPM) | 300 |
| Tokens per minute (TPM) | 500,000 |
| Requests per day (RPD) | 100,000 |
Enterprise Tier
Custom limits based on your requirements. Contact our sales team for details.
Rate Limit Headers
Every API response includes headers showing your current rate limit status:
X-RateLimit-Limit-Requests: 60
X-RateLimit-Limit-Tokens: 150000
X-RateLimit-Remaining-Requests: 45
X-RateLimit-Remaining-Tokens: 125000
X-RateLimit-Reset-Requests: 2024-12-14T12:35:00Z
X-RateLimit-Reset-Tokens: 2024-12-14T12:35:00Z
Header Descriptions
| Header | Description |
|---|---|
| X-RateLimit-Limit-Requests | Maximum requests allowed per minute |
| X-RateLimit-Limit-Tokens | Maximum tokens allowed per minute |
| X-RateLimit-Remaining-Requests | Remaining requests in current window |
| X-RateLimit-Remaining-Tokens | Remaining tokens in current window |
| X-RateLimit-Reset-Requests | When the request limit resets |
| X-RateLimit-Reset-Tokens | When the token limit resets |
Rate Limit Exceeded Response
When you exceed rate limits, you'll receive a 429 status code:
{
"error": {
"message": "Rate limit exceeded. Please try again in 30 seconds.",
"type": "rate_limit_error",
"code": "rate_limit_exceeded"
}
}
Handling Rate Limits
Basic Retry Logic
import requests
import time
def make_request_with_retry(url, headers, data, max_retries=3):
    """POST to the API, retrying on rate limits and transient failures.

    Args:
        url: endpoint URL to POST to.
        headers: HTTP headers (authorization, content type).
        data: JSON-serializable request body.
        max_retries: number of attempts before giving up.

    Returns:
        The parsed JSON response on success, or None if every attempt
        was consumed by 429 responses.
    """
    for attempt in range(max_retries):
        try:
            response = requests.post(url, headers=headers, json=data)
            # Rate limited: honor the server-suggested back-off (default 60 s)
            # and try again.
            if response.status_code == 429:
                retry_after = int(response.headers.get('Retry-After', 60))
                print(f"Rate limited. Waiting {retry_after} seconds...")
                time.sleep(retry_after)
                continue
            if response.status_code == 200:
                return response.json()
            # Any other status: surface it as an exception.
            response.raise_for_status()
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff
    return None
# Example usage
payload = {
    "model": "helix-v1",
    "messages": [{"role": "user", "content": "Hello!"}],
}
result = make_request_with_retry(
    "https://api.rustellar.com/v1/chat/completions",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    },
    data=payload,
)
Advanced Rate Limiting with Token Bucket
import time
from collections import deque
class RateLimiter:
    """Sliding-window request throttle (token-bucket style).

    Remembers a timestamp for every request made within the last minute
    and refuses new requests once the per-minute cap is reached.
    """

    def __init__(self, max_requests_per_minute):
        """
        Args:
            max_requests_per_minute: maximum number of requests per minute.
        """
        self.max_requests = max_requests_per_minute
        self.window = 60  # sliding-window length in seconds (one minute)
        self.requests = deque()  # timestamps of recent requests, oldest first

    def can_make_request(self):
        """Return True when one more request would stay under the cap."""
        cutoff = time.time() - self.window
        # Discard timestamps that have fallen out of the sliding window
        # (strictly older than one minute).
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
        return len(self.requests) < self.max_requests

    def wait_if_needed(self):
        """Block until a request is permitted under the rate limit."""
        while not self.can_make_request():
            # Sleep until the oldest recorded request ages out of the window.
            oldest = self.requests[0]
            pause = self.window - (time.time() - oldest)
            if pause > 0:
                print(f"Rate limit reached. Waiting {pause:.1f} seconds...")
                time.sleep(pause + 0.1)  # small safety margin

    def record_request(self):
        """Record the current time as a completed request."""
        self.requests.append(time.time())
# Example usage
limiter = RateLimiter(max_requests_per_minute=60)

def make_controlled_request(messages):
    """Send one chat request, waiting first if the rate limit is reached."""
    limiter.wait_if_needed()  # block until a slot is free in the window
    response = requests.post(
        "https://api.rustellar.com/v1/chat/completions",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={
            "model": "helix-v1",
            "messages": messages,
        },
    )
    limiter.record_request()  # count this call against the window
    return response.json()

# Execute a batch of requests under the limiter
for i in range(100):
    result = make_controlled_request([
        {"role": "user", "content": f"Question {i}"}
    ])
    print(f"Request {i}: {result['choices'][0]['message']['content'][:50]}...")
Monitoring Rate Limit Status
def check_rate_limit_status(response):
    """Inspect rate-limit headers on an API response and warn when low.

    Args:
        response: any object with a ``headers`` mapping (e.g. a
            ``requests.Response``).

    Returns:
        dict with ``remaining_requests``, ``remaining_tokens`` and
        ``reset_time`` (the raw ``X-RateLimit-Reset-Requests`` value,
        or None when absent).
    """
    headers = response.headers
    remaining_requests = int(headers.get('X-RateLimit-Remaining-Requests', 0))
    remaining_tokens = int(headers.get('X-RateLimit-Remaining-Tokens', 0))
    # Derive the 20% warning threshold from the limit headers when present,
    # instead of hard-coding the Standard-tier values; the fallbacks
    # (60 requests / 150000 tokens) preserve the original behavior.
    limit_requests = int(headers.get('X-RateLimit-Limit-Requests', 60))
    limit_tokens = int(headers.get('X-RateLimit-Limit-Tokens', 150000))
    # Warn when 20% or less of the window remains.
    if remaining_requests < limit_requests * 0.2:
        print(f"⚠️ Warning: Only {remaining_requests} requests remaining!")
    if remaining_tokens < limit_tokens * 0.2:
        print(f"⚠️ Warning: Only {remaining_tokens} tokens remaining!")
    return {
        "remaining_requests": remaining_requests,
        "remaining_tokens": remaining_tokens,
        "reset_time": headers.get('X-RateLimit-Reset-Requests')
    }
# Example usage
request_body = {
    "model": "helix-v1",
    "messages": [{"role": "user", "content": "Hello!"}],
}
response = requests.post(
    "https://api.rustellar.com/v1/chat/completions",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    },
    json=request_body,
)
status = check_rate_limit_status(response)
print(f"Rate limit status: {status}")
Best Practices
- Implement exponential backoff when retrying failed requests
- Monitor rate limit headers to avoid hitting limits
- Batch requests when possible to reduce API calls
- Cache responses for frequently requested content
- Upgrade your tier if you consistently hit rate limits
- Distribute load across different time windows
Increasing Your Limits
If you need higher rate limits:
- Upgrade your plan to Standard or Pro tier
- Contact sales for Enterprise custom limits
- Provide use case details for limit increase requests
- Demonstrate usage patterns to justify higher limits
Common Issues
Issue: Hitting Token Limits with Large Conversations
Solution: Implement conversation summarization or pruning for long chat histories.
def trim_conversation(messages, max_tokens=3000):
    """Trim a chat history so its estimated token count fits a budget.

    Uses a rough chars/4 heuristic (swap in a real tokenizer for
    production use). When over budget, keeps every system message plus
    the 10 most recent non-system messages.

    Args:
        messages: list of ``{"role": ..., "content": ...}`` dicts.
        max_tokens: approximate token budget for the whole history.

    Returns:
        The original list when under budget, otherwise a new trimmed list.
    """
    # Rough estimate: ~4 characters per token.
    estimated_tokens = sum(len(m['content']) // 4 for m in messages)
    if estimated_tokens <= max_tokens:
        return messages
    # Keep system prompts plus the most recent turns. Slicing the recent
    # window from *non-system* messages fixes a bug in the original: a
    # system message that fell within the last 10 entries was included
    # twice (once in each list).
    system_messages = [m for m in messages if m['role'] == 'system']
    recent_messages = [m for m in messages if m['role'] != 'system'][-10:]
    return system_messages + recent_messages
Issue: Concurrent Requests Exceeding Limits
Solution: Use a request queue with rate limiting.
import asyncio
from asyncio import Semaphore
class AsyncRateLimiter:
    """Rate limiter for concurrent async requests.

    Combines a semaphore (caps in-flight requests) with the
    sliding-window RateLimiter (caps requests per minute).
    """

    def __init__(self, max_concurrent=5, requests_per_minute=60):
        """
        Args:
            max_concurrent: maximum number of simultaneous requests.
            requests_per_minute: per-minute request cap.
        """
        self.semaphore = Semaphore(max_concurrent)
        self.limiter = RateLimiter(requests_per_minute)

    async def _wait_for_slot(self):
        """Wait until the per-minute limit allows another request.

        Bug fix: the original called the blocking
        ``RateLimiter.wait_if_needed()`` (which uses ``time.sleep``)
        from inside a coroutine, stalling the entire event loop. This
        version sleeps with ``asyncio.sleep`` so other tasks keep
        running while we wait.
        """
        while not self.limiter.can_make_request():
            # Sleep until the oldest recorded request ages out of the window.
            oldest = self.limiter.requests[0]
            wait_time = self.limiter.window - (time.time() - oldest)
            if wait_time > 0:
                await asyncio.sleep(wait_time + 0.1)  # small safety margin

    async def make_request(self, session, messages):
        """Rate-limited async request (session assumed aiohttp-like)."""
        async with self.semaphore:
            await self._wait_for_slot()
            async with session.post(
                "https://api.rustellar.com/v1/chat/completions",
                headers={
                    "Authorization": "Bearer YOUR_API_KEY",
                    "Content-Type": "application/json"
                },
                json={
                    "model": "helix-v1",
                    "messages": messages
                }
            ) as response:
                self.limiter.record_request()
                return await response.json()
Next Steps
- Learn about Input Formats
- Check Output Formats
- Review Pricing tiers