
Rate Limits

Understanding and managing API rate limits on the Rustellar platform.

Overview

Rate limits control the number of API requests you can make within a specific time window. This ensures fair usage and platform stability for all users.

Rate Limit Tiers

Free Tier

Limit Type                   Value
Requests per minute (RPM)    20
Tokens per minute (TPM)      40,000
Requests per day (RPD)       200

Standard Tier

Limit Type                   Value
Requests per minute (RPM)    60
Tokens per minute (TPM)      150,000
Requests per day (RPD)       10,000

Pro Tier

Limit Type                   Value
Requests per minute (RPM)    300
Tokens per minute (TPM)      500,000
Requests per day (RPD)       100,000

Enterprise Tier

Custom limits based on your requirements. Contact our sales team for details.

Rate Limit Headers

Every API response includes headers showing your current rate limit status:

X-RateLimit-Limit-Requests: 60
X-RateLimit-Limit-Tokens: 150000
X-RateLimit-Remaining-Requests: 45
X-RateLimit-Remaining-Tokens: 125000
X-RateLimit-Reset-Requests: 2024-12-14T12:35:00Z
X-RateLimit-Reset-Tokens: 2024-12-14T12:35:00Z

Header Descriptions

Header                            Description
X-RateLimit-Limit-Requests        Maximum requests allowed per minute
X-RateLimit-Limit-Tokens          Maximum tokens allowed per minute
X-RateLimit-Remaining-Requests    Remaining requests in the current window
X-RateLimit-Remaining-Tokens      Remaining tokens in the current window
X-RateLimit-Reset-Requests        When the request limit resets
X-RateLimit-Reset-Tokens          When the token limit resets
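The reset headers are timestamps, so to pause until the window rolls over you first convert them into a wait duration. A minimal sketch, assuming the ISO 8601 UTC format shown in the example above (the helper name is illustrative, not part of the API):

```python
from datetime import datetime, timezone

def seconds_until_reset(reset_header):
    """Return seconds until the rate limit window resets.

    Assumes an ISO 8601 UTC timestamp such as "2024-12-14T12:35:00Z".
    """
    # fromisoformat() on older Python versions does not accept the
    # trailing "Z", so normalize it to an explicit UTC offset first
    reset_at = datetime.fromisoformat(reset_header.replace("Z", "+00:00"))
    now = datetime.now(timezone.utc)
    # Clamp at zero in case the window has already reset
    return max(0.0, (reset_at - now).total_seconds())
```

You could pass the result straight to `time.sleep()` before retrying.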

Rate Limit Exceeded Response

When you exceed a rate limit, you'll receive a 429 (Too Many Requests) status code:

{
  "error": {
    "message": "Rate limit exceeded. Please try again in 30 seconds.",
    "type": "rate_limit_error",
    "code": "rate_limit_exceeded"
  }
}

Handling Rate Limits

Basic Retry Logic

import requests
import time

def make_request_with_retry(url, headers, data, max_retries=3):
    """Make an API request with retry logic."""
    for attempt in range(max_retries):
        try:
            response = requests.post(url, headers=headers, json=data)

            # On success, return the parsed response
            if response.status_code == 200:
                return response.json()

            # Rate limit error
            if response.status_code == 429:
                # Read the wait time from the Retry-After header
                retry_after = int(response.headers.get('Retry-After', 60))
                print(f"Rate limited. Waiting {retry_after} seconds...")
                time.sleep(retry_after)
                continue

            # Any other error
            response.raise_for_status()

        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
            else:
                raise

    return None

# Usage example
result = make_request_with_retry(
    "https://api.rustellar.com/v1/chat/completions",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    },
    data={
        "model": "helix-v1",
        "messages": [{"role": "user", "content": "Hello!"}]
    }
)

Advanced Rate Limiting with a Sliding Window

import time
from collections import deque

import requests

class RateLimiter:
    """Sliding-window rate limiter that tracks recent request timestamps."""

    def __init__(self, max_requests_per_minute):
        """
        Args:
            max_requests_per_minute: maximum number of requests per minute
        """
        self.max_requests = max_requests_per_minute
        self.window = 60  # one minute, in seconds
        self.requests = deque()

    def can_make_request(self):
        """Check whether a request can be made right now."""
        now = time.time()

        # Drop records older than the window (more than a minute old)
        while self.requests and now - self.requests[0] > self.window:
            self.requests.popleft()

        # Allow the request if we are still under the limit
        return len(self.requests) < self.max_requests

    def wait_if_needed(self):
        """Sleep until a request slot opens up, if necessary."""
        while not self.can_make_request():
            # Wait until the oldest recorded request leaves the window
            oldest_request = self.requests[0]
            wait_time = self.window - (time.time() - oldest_request)
            if wait_time > 0:
                print(f"Rate limit reached. Waiting {wait_time:.1f} seconds...")
                time.sleep(wait_time + 0.1)  # small safety margin

    def record_request(self):
        """Record the timestamp of a request."""
        self.requests.append(time.time())

# Usage example
limiter = RateLimiter(max_requests_per_minute=60)

def make_controlled_request(messages):
    """Make a request while respecting the rate limit."""
    # Wait if necessary
    limiter.wait_if_needed()

    # Send the request
    response = requests.post(
        "https://api.rustellar.com/v1/chat/completions",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json"
        },
        json={
            "model": "helix-v1",
            "messages": messages
        }
    )

    # Record the request
    limiter.record_request()

    return response.json()

# Run multiple requests
for i in range(100):
    result = make_controlled_request([
        {"role": "user", "content": f"Question {i}"}
    ])
    print(f"Request {i}: {result['choices'][0]['message']['content'][:50]}...")

Monitoring Rate Limit Status

def check_rate_limit_status(response):
    """Inspect the rate limit headers on a response."""
    headers = response.headers

    remaining_requests = int(headers.get('X-RateLimit-Remaining-Requests', 0))
    remaining_tokens = int(headers.get('X-RateLimit-Remaining-Tokens', 0))

    # Warn when 20% or less of the quota remains
    if remaining_requests < 12:  # 60 * 0.2 = 12
        print(f"⚠️ Warning: Only {remaining_requests} requests remaining!")

    if remaining_tokens < 30000:  # 150000 * 0.2 = 30000
        print(f"⚠️ Warning: Only {remaining_tokens} tokens remaining!")

    return {
        "remaining_requests": remaining_requests,
        "remaining_tokens": remaining_tokens,
        "reset_time": headers.get('X-RateLimit-Reset-Requests')
    }

# Usage example
response = requests.post(
    "https://api.rustellar.com/v1/chat/completions",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    },
    json={
        "model": "helix-v1",
        "messages": [{"role": "user", "content": "Hello!"}]
    }
)

status = check_rate_limit_status(response)
print(f"Rate limit status: {status}")

Best Practices

  1. Implement exponential backoff when retrying failed requests
  2. Monitor rate limit headers to avoid hitting limits
  3. Batch requests when possible to reduce API calls
  4. Cache responses for frequently requested content
  5. Upgrade your tier if you consistently hit rate limits
  6. Distribute load across different time windows
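Caching (practice 4) can be as simple as keying stored responses by the request payload. A minimal in-memory sketch, for illustration only; a production cache would also need TTLs and a size bound:

```python
import hashlib
import json

class ResponseCache:
    """In-memory cache keyed by the request payload."""

    def __init__(self):
        self._store = {}

    def _key(self, payload):
        # Serialize deterministically so identical payloads share a key,
        # regardless of dict key order
        return hashlib.sha256(
            json.dumps(payload, sort_keys=True).encode()
        ).hexdigest()

    def get(self, payload):
        """Return the cached response, or None on a miss."""
        return self._store.get(self._key(payload))

    def set(self, payload, response):
        """Store a response under the payload's key."""
        self._store[self._key(payload)] = response
```

Check the cache before calling the API and store the response afterwards; repeated identical requests then cost no quota at all.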

Increasing Your Limits

If you need higher rate limits:

  1. Upgrade your plan to Standard or Pro tier
  2. Contact sales for Enterprise custom limits
  3. Provide use case details for limit increase requests
  4. Demonstrate usage patterns to justify higher limits

Common Issues

Issue: Hitting Token Limits with Large Conversations

Solution: Implement conversation summarization or pruning for long chat histories.

def trim_conversation(messages, max_tokens=3000):
    """Trim conversation history based on an estimated token count."""
    # Rough estimate (use a real tokenizer in practice)
    estimated_tokens = sum(len(m['content']) // 4 for m in messages)

    if estimated_tokens > max_tokens:
        # Keep all system messages plus the most recent messages,
        # excluding system messages from the tail to avoid duplicates
        system_messages = [m for m in messages if m['role'] == 'system']
        recent_messages = [m for m in messages[-10:] if m['role'] != 'system']

        return system_messages + recent_messages

    return messages

Issue: Concurrent Requests Exceeding Limits

Solution: Use a request queue with rate limiting.

import asyncio
from asyncio import Semaphore

class AsyncRateLimiter:
    """Rate limiting for asynchronous requests."""

    def __init__(self, max_concurrent=5, requests_per_minute=60):
        self.semaphore = Semaphore(max_concurrent)
        self.limiter = RateLimiter(requests_per_minute)

    async def make_request(self, session, messages):
        """Make an async request with rate limiting."""
        async with self.semaphore:
            # Run the blocking wait in a worker thread so it
            # does not stall the event loop
            await asyncio.to_thread(self.limiter.wait_if_needed)

            # The actual request (assumes an aiohttp session)
            async with session.post(
                "https://api.rustellar.com/v1/chat/completions",
                headers={
                    "Authorization": "Bearer YOUR_API_KEY",
                    "Content-Type": "application/json"
                },
                json={
                    "model": "helix-v1",
                    "messages": messages
                }
            ) as response:
                self.limiter.record_request()
                return await response.json()

Next Steps