Development Guide
AI API Integration Guide
A comprehensive, step-by-step guide to integrating AI APIs into your applications, with best practices for production use
Overview
AI API integration enables developers to incorporate artificial intelligence capabilities into their applications without building models from scratch. This guide covers the complete process from selecting the right API to implementing robust, production-ready integrations.
Multi-Provider Strategy
Learn to integrate and switch between different AI providers
Error Handling
Implement robust error handling and fallback mechanisms
Cost Optimization
Strategies for managing API costs and usage limits
Integration Planning
API Selection Criteria
- Functionality: Does the API provide the specific AI capabilities you need?
- Pricing: Cost per request and monthly limits
- Rate Limits: Requests per minute/second and concurrent limits
- Latency: Response time requirements for your application
- Reliability: Uptime guarantees and support availability
- Data Privacy: Data handling and retention policies
Common Integration Patterns
- Direct Integration: Single API provider integration
- Multi-Provider: Fallback across multiple providers
- Hybrid Approach: Combine local and cloud AI models
- Batch Processing: Process multiple requests together
- Streaming: Real-time response streaming
Basic Integration Framework
OpenAI Integration
- SDK: Official Python/JavaScript libraries
- Authentication: API key in headers
- Endpoints: Chat, completions, embeddings
- Rate Limits: Based on requests per minute (RPM) and tokens per minute (TPM)
Anthropic Claude
- SDK: Official Anthropic SDK
- Authentication: API key in headers
- Endpoints: Messages API
- Rate Limits: Requests per minute
Google Gemini
- SDK: Google Generative AI
- Authentication: API key or service account
- Endpoints: Generate content, chat
- Rate Limits: Queries per minute
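For orientation, the snippets below sketch minimal single-provider calls with each official SDK before the unified client is built; model names are illustrative, and the code assumes the packages are installed and API keys are set in the environment.
# Minimal direct-integration sketches (model names are illustrative)
import os
import openai
from anthropic import Anthropic
import google.generativeai as genai
# OpenAI
oai = openai.OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
print(oai.chat.completions.create(
    model='gpt-3.5-turbo',
    messages=[{"role": "user", "content": "Hello"}]
).choices[0].message.content)
# Anthropic
claude = Anthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))
print(claude.messages.create(
    model='claude-3-sonnet-20240229',
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello"}]
).content[0].text)
# Google Gemini
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
print(genai.GenerativeModel('gemini-pro').generate_content("Hello").text)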
Unified API Client
Python Implementation
# Unified AI client supporting multiple providers
import os
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
import openai
from anthropic import Anthropic
import google.generativeai as genai
class AIProvider(ABC):
@abstractmethod
def chat_completion(self, messages: List[Dict], **kwargs) -> str:
pass
@abstractmethod
def get_usage_cost(self) -> float:
pass
class OpenAIClient(AIProvider):
def __init__(self, api_key: str):
self.client = openai.OpenAI(api_key=api_key)
self.total_tokens = 0
def chat_completion(self, messages: List[Dict], **kwargs) -> str:
model = kwargs.get('model', 'gpt-3.5-turbo')
response = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=kwargs.get('temperature', 0.7),
max_tokens=kwargs.get('max_tokens', 1000)
)
self.total_tokens += response.usage.total_tokens
return response.choices[0].message.content
def get_usage_cost(self) -> float:
# Approximate cost calculation
return self.total_tokens * 0.002 / 1000 # $0.002 per 1K tokens
class AnthropicClient(AIProvider):
def __init__(self, api_key: str):
self.client = Anthropic(api_key=api_key)
self.total_tokens = 0
def chat_completion(self, messages: List[Dict], **kwargs) -> str:
        # Convert to Anthropic format: the system prompt is passed separately,
        # while user and assistant turns stay in the messages list
        system_message = next((m['content'] for m in messages if m['role'] == 'system'), "")
        chat_messages = [m for m in messages if m['role'] != 'system']
response = self.client.messages.create(
model=kwargs.get('model', 'claude-3-sonnet-20240229'),
system=system_message,
            messages=chat_messages,
temperature=kwargs.get('temperature', 0.7),
max_tokens=kwargs.get('max_tokens', 1000)
)
self.total_tokens += response.usage.input_tokens + response.usage.output_tokens
return response.content[0].text
def get_usage_cost(self) -> float:
return self.total_tokens * 0.008 / 1000 # $0.008 per 1K tokens
class GeminiClient(AIProvider):
def __init__(self, api_key: str):
genai.configure(api_key=api_key)
self.model = genai.GenerativeModel('gemini-pro')
self.total_chars = 0
def chat_completion(self, messages: List[Dict], **kwargs) -> str:
        # Simplified conversion: only the latest user message is sent (multi-turn history is dropped)
last_user_message = next((m for m in reversed(messages) if m['role'] == 'user'), None)
if not last_user_message:
raise ValueError("No user message found")
response = self.model.generate_content(
last_user_message['content'],
generation_config=genai.types.GenerationConfig(
temperature=kwargs.get('temperature', 0.7),
max_output_tokens=kwargs.get('max_tokens', 1000)
)
)
self.total_chars += len(last_user_message['content']) + len(response.text)
return response.text
def get_usage_cost(self) -> float:
return self.total_chars * 0.00025 / 1000 # $0.00025 per 1K characters
Unified Client Manager
class AIClientManager:
    def __init__(self):
        self.providers = {}
        self.priorities = {}
        self.fallback_order = []
    def add_provider(self, name: str, provider: AIProvider, priority: int = 1):
        self.providers[name] = provider
        self.priorities[name] = priority
        # Higher-priority providers are tried first during fallback
        self.fallback_order = sorted(
            self.providers.keys(),
            key=lambda name: self.priorities[name],
            reverse=True
        )
def chat_completion(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
errors = []
for provider_name in self.fallback_order:
try:
provider = self.providers[provider_name]
response = provider.chat_completion(messages, **kwargs)
return {
'success': True,
'response': response,
'provider': provider_name,
'cost': provider.get_usage_cost()
}
except Exception as e:
errors.append(f"{provider_name}: {str(e)}")
continue
return {
'success': False,
'error': 'All providers failed',
'errors': errors
}
def get_total_cost(self) -> float:
return sum(provider.get_usage_cost() for provider in self.providers.values())
# Usage example
def setup_ai_manager():
manager = AIClientManager()
# Add providers with priorities
manager.add_provider(
'openai',
OpenAIClient(os.getenv('OPENAI_API_KEY')),
priority=3 # Highest priority
)
manager.add_provider(
'anthropic',
AnthropicClient(os.getenv('ANTHROPIC_API_KEY')),
priority=2
)
manager.add_provider(
'gemini',
GeminiClient(os.getenv('GEMINI_API_KEY')),
priority=1 # Lowest priority
)
return manager
# Example usage
manager = setup_ai_manager()
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain machine learning in simple terms"}
]
# The model kwarg is omitted so each fallback provider uses its own default model
result = manager.chat_completion(messages)
if result['success']:
print(f"Response from {result['provider']}: {result['response']}")
print(f"Cost: ${result['cost']:.6f}")
else:
print("All providers failed:", result['errors'])
Advanced Integration Patterns
Retry Logic with Exponential Backoff
import time
import random
from functools import wraps
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True
):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while retries <= max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
retries += 1
if retries > max_retries:
raise e
# Calculate delay with exponential backoff
delay = min(max_delay, base_delay * (2 ** (retries - 1)))
# Add jitter to avoid thundering herd
if jitter:
delay = random.uniform(0.5 * delay, 1.5 * delay)
print(f"Retry {retries}/{max_retries} after {delay:.2f}s: {str(e)}")
time.sleep(delay)
raise Exception(f"Max retries ({max_retries}) exceeded")
return wrapper
return decorator
class RobustAIClient:
def __init__(self, manager: AIClientManager):
self.manager = manager
@retry_with_backoff(max_retries=3, base_delay=1.0)
def robust_chat_completion(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
result = self.manager.chat_completion(messages, **kwargs)
if not result['success']:
raise Exception(f"AI request failed: {result['error']}")
return result
def batch_process(self, prompts: List[str], **kwargs) -> List[Dict[str, Any]]:
results = []
batch_size = kwargs.get('batch_size', 5)
delay_between_batches = kwargs.get('delay_between_batches', 0.1)
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i + batch_size]
batch_results = []
for prompt in batch:
try:
messages = [{"role": "user", "content": prompt}]
result = self.robust_chat_completion(messages, **kwargs)
batch_results.append({
'prompt': prompt,
'success': True,
'response': result['response'],
'provider': result['provider']
})
except Exception as e:
batch_results.append({
'prompt': prompt,
'success': False,
'error': str(e)
})
results.extend(batch_results)
# Small delay to avoid rate limits
if i + batch_size < len(prompts):
time.sleep(delay_between_batches)
return results
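A brief usage sketch for the retry and batch helpers, reusing the manager from the earlier setup example; the prompts are placeholders.
# Hypothetical usage of RobustAIClient.batch_process
robust = RobustAIClient(setup_ai_manager())
results = robust.batch_process(
    ["Summarize photosynthesis", "Explain HTTP caching"],
    batch_size=2
)
for r in results:
    print(r['prompt'], '->', r['response'] if r['success'] else r['error'])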
Streaming Response Handling
class StreamingAIClient:
def __init__(self, openai_client: OpenAIClient):
self.client = openai_client
def stream_chat_completion(self, messages: List[Dict], **kwargs):
model = kwargs.get('model', 'gpt-3.5-turbo')
stream = self.client.client.chat.completions.create(
model=model,
messages=messages,
temperature=kwargs.get('temperature', 0.7),
max_tokens=kwargs.get('max_tokens', 1000),
stream=True
)
full_response = ""
for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
full_response += content
yield {
'type': 'content',
'content': content,
'complete': False
}
yield {
'type': 'complete',
'content': full_response,
'complete': True
}
    async def async_stream_chat_completion(self, messages: List[Dict], **kwargs):
        # The synchronous OpenAI client cannot be awaited; use AsyncOpenAI for async streaming
        async_client = openai.AsyncOpenAI(api_key=self.client.client.api_key)
        model = kwargs.get('model', 'gpt-3.5-turbo')
        stream = await async_client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000),
            stream=True
        )
full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
full_response += content
yield {
'type': 'content',
'content': content,
'complete': False
}
yield {
'type': 'complete',
'content': full_response,
'complete': True
}
# WebSocket integration example
from fastapi import FastAPI, WebSocket
import json
app = FastAPI()
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
client = StreamingAIClient(OpenAIClient(os.getenv('OPENAI_API_KEY')))
try:
while True:
# Receive message from client
data = await websocket.receive_text()
message_data = json.loads(data)
messages = message_data.get('messages', [])
# Stream response back to client
async for chunk in client.async_stream_chat_completion(messages):
await websocket.send_text(json.dumps(chunk))
except Exception as e:
print(f"WebSocket error: {e}")
finally:
await websocket.close()
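A minimal client-side sketch for the endpoint above, assuming the third-party `websockets` package and a server running locally on port 8000; the payload shape mirrors what the handler expects.
# Hypothetical client for the /ws/chat endpoint (host and port are assumptions)
import asyncio
import json
import websockets

async def chat_once():
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(json.dumps({
            "messages": [{"role": "user", "content": "Explain vector databases briefly"}]
        }))
        while True:
            chunk = json.loads(await ws.recv())
            if chunk['complete']:
                break
            print(chunk['content'], end='', flush=True)

asyncio.run(chat_once())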
Security and Best Practices
API Key Management
# Secure API key management
import os
from cryptography.fernet import Fernet
import keyring
class SecureKeyManager:
def __init__(self, service_name: str):
self.service_name = service_name
self.cipher_suite = Fernet(self._get_encryption_key())
def _get_encryption_key(self) -> bytes:
# Get or create encryption key from secure storage
key = keyring.get_password("ai_api_manager", "encryption_key")
if not key:
key = Fernet.generate_key().decode()
keyring.set_password("ai_api_manager", "encryption_key", key)
return key.encode()
def store_api_key(self, provider: str, api_key: str):
encrypted_key = self.cipher_suite.encrypt(api_key.encode())
keyring.set_password(self.service_name, provider, encrypted_key.decode())
def get_api_key(self, provider: str) -> str:
encrypted_key = keyring.get_password(self.service_name, provider)
if not encrypted_key:
raise ValueError(f"No API key found for {provider}")
decrypted_key = self.cipher_suite.decrypt(encrypted_key.encode())
return decrypted_key.decode()
def delete_api_key(self, provider: str):
keyring.delete_password(self.service_name, provider)
# Environment-based configuration
class Config:
def __init__(self):
self.required_keys = [
'OPENAI_API_KEY',
'ANTHROPIC_API_KEY',
'GEMINI_API_KEY'
]
self._validate_config()
def _validate_config(self):
missing = [key for key in self.required_keys if not os.getenv(key)]
if missing:
raise EnvironmentError(f"Missing environment variables: {', '.join(missing)}")
def get_openai_key(self) -> str:
return os.getenv('OPENAI_API_KEY')
def get_anthropic_key(self) -> str:
return os.getenv('ANTHROPIC_API_KEY')
def get_gemini_key(self) -> str:
return os.getenv('GEMINI_API_KEY')
# Usage
config = Config()
key_manager = SecureKeyManager("my_ai_app")
# Store keys securely (run once)
key_manager.store_api_key("openai", config.get_openai_key())
key_manager.store_api_key("anthropic", config.get_anthropic_key())
key_manager.store_api_key("gemini", config.get_gemini_key())
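One way to tie the pieces together is to build the provider manager from keys retrieved through SecureKeyManager instead of raw environment variables; the wiring below is a sketch, not a prescribed layout.
# Hypothetical wiring: construct providers from securely stored keys
def setup_manager_from_keyring(key_manager: SecureKeyManager) -> AIClientManager:
    manager = AIClientManager()
    manager.add_provider('openai', OpenAIClient(key_manager.get_api_key('openai')), priority=3)
    manager.add_provider('anthropic', AnthropicClient(key_manager.get_api_key('anthropic')), priority=2)
    manager.add_provider('gemini', GeminiClient(key_manager.get_api_key('gemini')), priority=1)
    return manager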
Rate Limit Management
import time
from threading import Lock
from collections import defaultdict
class RateLimiter:
def __init__(self):
self.locks = defaultdict(Lock)
self.request_times = defaultdict(list)
def wait_if_needed(self, provider: str, max_requests: int, time_window: int):
with self.locks[provider]:
current_time = time.time()
# Remove old requests outside the time window
self.request_times[provider] = [
t for t in self.request_times[provider]
if current_time - t < time_window
]
# Check if we're at the limit
if len(self.request_times[provider]) >= max_requests:
# Calculate wait time
oldest_request = self.request_times[provider][0]
wait_time = time_window - (current_time - oldest_request)
                if wait_time > 0:
                    time.sleep(wait_time)
                    # Refresh the timestamp and prune again after waiting
                    current_time = time.time()
                    self.request_times[provider] = [
                        t for t in self.request_times[provider]
                        if current_time - t < time_window
                    ]
# Add current request
self.request_times[provider].append(current_time)
class RateLimitedAIClient:
def __init__(self, manager: AIClientManager):
self.manager = manager
self.rate_limiter = RateLimiter()
        self.rate_limits = {
            # Illustrative per-provider limits (max requests, window in seconds);
            # actual quotas vary by account tier and model
            'openai': (3500, 60),
            'anthropic': (1000, 60),
            'gemini': (60, 60)
        }
    def chat_completion_with_rate_limit(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        # The provider is only known once the manager's fallback logic resolves,
        # so the request is recorded afterwards to throttle subsequent calls to that provider
        result = self.manager.chat_completion(messages, **kwargs)
        if result['success']:
            provider = result['provider']
            max_requests, time_window = self.rate_limits.get(provider, (60, 60))
            self.rate_limiter.wait_if_needed(provider, max_requests, time_window)
        return result
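A short usage sketch; replace the illustrative limits above with the quotas that apply to your own account.
# Hypothetical usage of the rate-limited wrapper
rate_limited = RateLimitedAIClient(setup_ai_manager())
result = rate_limited.chat_completion_with_rate_limit(
    [{"role": "user", "content": "Give three prompt-writing tips"}]
)
print(result['response'] if result['success'] else result['errors'])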
Monitoring and Analytics
Performance Tracking
import time
from dataclasses import dataclass
from typing import Any, Dict, List
from statistics import mean, median
@dataclass
class APIRequestMetrics:
provider: str
success: bool
response_time: float
tokens_used: int
cost: float
timestamp: float
class APIMonitor:
def __init__(self):
self.metrics: List[APIRequestMetrics] = []
def record_request(
self,
provider: str,
success: bool,
response_time: float,
tokens_used: int = 0,
cost: float = 0.0
):
metric = APIRequestMetrics(
provider=provider,
success=success,
response_time=response_time,
tokens_used=tokens_used,
cost=cost,
timestamp=time.time()
)
self.metrics.append(metric)
def get_provider_stats(self, provider: str, hours: int = 24) -> Dict[str, float]:
cutoff_time = time.time() - (hours * 3600)
provider_metrics = [
m for m in self.metrics
if m.provider == provider and m.timestamp > cutoff_time
]
if not provider_metrics:
return {}
success_rate = sum(1 for m in provider_metrics if m.success) / len(provider_metrics)
response_times = [m.response_time for m in provider_metrics if m.success]
total_cost = sum(m.cost for m in provider_metrics)
total_tokens = sum(m.tokens_used for m in provider_metrics)
return {
'total_requests': len(provider_metrics),
'success_rate': success_rate,
'avg_response_time': mean(response_times) if response_times else 0,
'median_response_time': median(response_times) if response_times else 0,
'total_cost': total_cost,
'total_tokens': total_tokens,
'cost_per_request': total_cost / len(provider_metrics) if provider_metrics else 0
}
    def get_overall_stats(self, hours: int = 24) -> Dict[str, Any]:
cutoff_time = time.time() - (hours * 3600)
recent_metrics = [m for m in self.metrics if m.timestamp > cutoff_time]
if not recent_metrics:
return {}
providers = set(m.provider for m in recent_metrics)
provider_stats = {p: self.get_provider_stats(p, hours) for p in providers}
return {
'total_requests': len(recent_metrics),
'success_rate': sum(1 for m in recent_metrics if m.success) / len(recent_metrics),
'total_cost': sum(m.cost for m in recent_metrics),
'providers': provider_stats
}
# Decorator for automatic monitoring
def monitor_ai_requests(monitor: APIMonitor):
def decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
response_time = time.time() - start_time
monitor.record_request(
provider=result.get('provider', 'unknown'),
success=result.get('success', False),
response_time=response_time,
tokens_used=result.get('tokens_used', 0),
cost=result.get('cost', 0.0)
)
return result
except Exception as e:
response_time = time.time() - start_time
monitor.record_request(
provider='unknown',
success=False,
response_time=response_time
)
raise e
return wrapper
return decorator
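To connect the decorator to the unified client, one option (a sketch, assuming the manager from the earlier setup example) is to wrap a thin helper function and read the aggregated stats afterwards.
# Hypothetical wiring of the monitoring decorator around the unified client
monitor = APIMonitor()
manager = setup_ai_manager()

@monitor_ai_requests(monitor)
def tracked_chat_completion(messages, **kwargs):
    return manager.chat_completion(messages, **kwargs)

tracked_chat_completion([{"role": "user", "content": "Name two uses of embeddings"}])
print(monitor.get_overall_stats(hours=1))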
Deployment Considerations
| Environment | Considerations | Recommended Setup | Cost Impact |
|---|---|---|---|
| Development | Rapid iteration, testing | Single provider, generous limits | Low - test usage only |
| Staging | Production-like testing | Multi-provider, realistic limits | Medium - limited testing |
| Production | Reliability, scalability | Multi-provider with fallbacks | High - actual usage |
Production Checklist
- Implement proper error handling and retry logic
- Set up rate limiting and usage monitoring
- Use secure API key management
- Implement cost tracking and alerts (see the sketch after this checklist)
- Set up logging and monitoring
- Create fallback mechanisms for provider failures
- Test with realistic load and failure scenarios
- Document integration patterns and procedures
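As a starting point for the cost-alert item above, a minimal check built on APIMonitor might look like this; the budget threshold and alert channel are assumptions to adapt to your environment.
# Hypothetical daily cost-alert check built on APIMonitor
DAILY_BUDGET_USD = 25.0  # assumed budget; tune for your workload

def check_cost_alert(monitor: APIMonitor) -> None:
    stats = monitor.get_overall_stats(hours=24)
    if stats and stats['total_cost'] > DAILY_BUDGET_USD:
        # Swap print for your real alerting channel (email, Slack, pager, ...)
        print(f"ALERT: AI spend ${stats['total_cost']:.2f} exceeded budget ${DAILY_BUDGET_USD:.2f}")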