Development Guide

AI API Integration Guide

A step-by-step guide to integrating AI APIs into your applications, with best practices for production use.

Overview

AI API integration enables developers to incorporate artificial intelligence capabilities into their applications without building models from scratch. This guide covers the complete process from selecting the right API to implementing robust, production-ready integrations.

Multi-Provider Strategy

Learn to integrate and switch between different AI providers

Error Handling

Implement robust error handling and fallback mechanisms

Cost Optimization

Strategies for managing API costs and usage limits

Integration Planning

API Selection Criteria

  1. Functionality: Does the API provide the specific AI capabilities you need?
  2. Pricing: Cost per token or per request, plus any monthly spend limits
  3. Rate Limits: Requests per minute/second and concurrent limits
  4. Latency: Response time requirements for your application
  5. Reliability: Uptime guarantees and support availability
  6. Data Privacy: Data handling and retention policies
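
One way to make these criteria comparable is a weighted score per candidate. The sketch below is only an illustration: the weights, the 1 to 5 scoring scale, and the provider names and scores are all made-up assumptions, not measurements of any real API.

```python
from dataclasses import dataclass
from typing import Dict, List

# Hypothetical weights for the six criteria above (they sum to 1.0).
WEIGHTS: Dict[str, float] = {
    "functionality": 0.30,
    "pricing": 0.20,
    "rate_limits": 0.10,
    "latency": 0.15,
    "reliability": 0.15,
    "data_privacy": 0.10,
}

@dataclass
class Candidate:
    name: str
    scores: Dict[str, int]  # criterion -> score from 1 (poor) to 5 (excellent)

def weighted_score(candidate: Candidate) -> float:
    """Return the weighted sum of a candidate's criterion scores."""
    missing = set(WEIGHTS) - set(candidate.scores)
    if missing:
        raise ValueError(f"{candidate.name} is missing scores for: {sorted(missing)}")
    return sum(WEIGHTS[c] * candidate.scores[c] for c in WEIGHTS)

def rank(candidates: List[Candidate]) -> List[Candidate]:
    """Sort candidates from best to worst weighted score."""
    return sorted(candidates, key=weighted_score, reverse=True)

# Placeholder candidates with invented scores
provider_a = Candidate("provider_a", {
    "functionality": 5, "pricing": 2, "rate_limits": 4,
    "latency": 3, "reliability": 4, "data_privacy": 3,
})
provider_b = Candidate("provider_b", {
    "functionality": 4, "pricing": 4, "rate_limits": 3,
    "latency": 4, "reliability": 4, "data_privacy": 4,
})

for c in rank([provider_a, provider_b]):
    print(f"{c.name}: {weighted_score(c):.2f}")
```

Adjusting the weights is the point of the exercise: a latency-sensitive chat product and an overnight batch job will usually rank the same providers differently.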

Common Integration Patterns

  • Direct Integration: Single API provider integration
  • Multi-Provider: Fallback across multiple providers
  • Hybrid Approach: Combine local and cloud AI models
  • Batch Processing: Process multiple requests together
  • Streaming: Real-time response streaming

Basic Integration Framework

OpenAI Integration

  • SDK: Official Python/JavaScript libraries
  • Authentication: API key sent as a Bearer token in the Authorization header
  • Endpoints: Chat, completions, embeddings
  • Rate Limits: RPM and TPM based

Anthropic Claude

  • SDK: Official Anthropic SDK
  • Authentication: API key sent in the x-api-key header
  • Endpoints: Messages API
  • Rate Limits: Requests per minute and tokens per minute

Google Gemini

  • SDK: Google Generative AI
  • Authentication: API key or service account
  • Endpoints: Generate content, chat
  • Rate Limits: Queries per minute
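
To make "API key in headers" concrete, the sketch below builds (but does not send) raw HTTP requests for the OpenAI chat completions endpoint and the Anthropic Messages endpoint using only the standard library. The model ID and key values are placeholders, and in practice the official SDKs set these headers for you. Gemini is left out here because its authentication options differ.

```python
import json
import urllib.request

def build_openai_request(api_key: str, payload: dict) -> urllib.request.Request:
    """Build a chat completions request; OpenAI expects a Bearer token."""
    return urllib.request.Request(
        "https://api.openai.com/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

def build_anthropic_request(api_key: str, payload: dict) -> urllib.request.Request:
    """Build a Messages API request; Anthropic expects x-api-key plus a version header."""
    return urllib.request.Request(
        "https://api.anthropic.com/v1/messages",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        },
        method="POST",
    )

# Placeholder payload: substitute a model ID that your account can access.
payload = {
    "model": "your-model-id",
    "max_tokens": 100,
    "messages": [{"role": "user", "content": "Hello"}],
}

openai_req = build_openai_request("placeholder-openai-key", payload)
anthropic_req = build_anthropic_request("placeholder-anthropic-key", payload)
print(openai_req.get_method(), openai_req.full_url)
print(anthropic_req.get_method(), anthropic_req.full_url)
# To actually send a request: urllib.request.urlopen(openai_req, timeout=30)
```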

Unified API Client

Python Implementation

# Unified AI client supporting multiple providers
import os
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
import openai
from anthropic import Anthropic
import google.generativeai as genai

class AIProvider(ABC):
    @abstractmethod
    def chat_completion(self, messages: List[Dict], **kwargs) -> str:
        pass
    
    @abstractmethod
    def get_usage_cost(self) -> float:
        pass

class OpenAIClient(AIProvider):
    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)
        self.total_tokens = 0
        
    def chat_completion(self, messages: List[Dict], **kwargs) -> str:
        model = kwargs.get('model', 'gpt-3.5-turbo')
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000)
        )
        
        self.total_tokens += response.usage.total_tokens
        return response.choices[0].message.content
    
    def get_usage_cost(self) -> float:
        # Rough estimate using one flat illustrative rate; real pricing varies
        # by model and differs for input and output tokens
        return self.total_tokens * 0.002 / 1000  # $0.002 per 1K tokens

class AnthropicClient(AIProvider):
    def __init__(self, api_key: str):
        self.client = Anthropic(api_key=api_key)
        self.total_tokens = 0
        
    def chat_completion(self, messages: List[Dict], **kwargs) -> str:
        # Convert to Anthropic format: the system prompt is a separate parameter,
        # and all other turns (user and assistant) stay in the message list
        system_message = next((m['content'] for m in messages if m['role'] == 'system'), "")
        conversation = [m for m in messages if m['role'] != 'system']
        
        response = self.client.messages.create(
            model=kwargs.get('model', 'claude-3-sonnet-20240229'),
            system=system_message,
            messages=conversation,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000)
        )
        
        self.total_tokens += response.usage.input_tokens + response.usage.output_tokens
        return response.content[0].text
    
    def get_usage_cost(self) -> float:
        return self.total_tokens * 0.008 / 1000  # $0.008 per 1K tokens

class GeminiClient(AIProvider):
    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-pro')
        self.total_chars = 0
        
    def chat_completion(self, messages: List[Dict], **kwargs) -> str:
        # Simplified conversion: only the most recent user message is sent,
        # so the system prompt and earlier turns are dropped
        last_user_message = next((m for m in reversed(messages) if m['role'] == 'user'), None)
        if not last_user_message:
            raise ValueError("No user message found")
            
        response = self.model.generate_content(
            last_user_message['content'],
            generation_config=genai.types.GenerationConfig(
                temperature=kwargs.get('temperature', 0.7),
                max_output_tokens=kwargs.get('max_tokens', 1000)
            )
        )
        
        self.total_chars += len(last_user_message['content']) + len(response.text)
        return response.text
    
    def get_usage_cost(self) -> float:
        return self.total_chars * 0.00025 / 1000  # $0.00025 per 1K characters

Unified Client Manager

class AIClientManager:
    def __init__(self):
        self.providers = {}
        self.priorities = {}
        self.fallback_order = []
        
    def add_provider(self, name: str, provider: AIProvider, priority: int = 1):
        self.providers[name] = provider
        self.priorities[name] = priority
        # Try higher-priority providers first
        self.fallback_order = sorted(
            self.providers.keys(),
            key=lambda n: self.priorities[n],
            reverse=True
        )
    
    def chat_completion(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        errors = []
        
        for provider_name in self.fallback_order:
            try:
                provider = self.providers[provider_name]
                response = provider.chat_completion(messages, **kwargs)
                
                return {
                    'success': True,
                    'response': response,
                    'provider': provider_name,
                    # Cumulative estimated cost for this provider, not just this request
                    'cost': provider.get_usage_cost()
                }
                
            except Exception as e:
                errors.append(f"{provider_name}: {str(e)}")
                continue
        
        return {
            'success': False,
            'error': 'All providers failed',
            'errors': errors
        }
    
    def get_total_cost(self) -> float:
        return sum(provider.get_usage_cost() for provider in self.providers.values())

# Usage example
def setup_ai_manager():
    manager = AIClientManager()
    
    # Add providers with priorities
    manager.add_provider(
        'openai', 
        OpenAIClient(os.getenv('OPENAI_API_KEY')),
        priority=3  # Highest priority
    )
    manager.add_provider(
        'anthropic',
        AnthropicClient(os.getenv('ANTHROPIC_API_KEY')),
        priority=2
    )
    manager.add_provider(
        'gemini',
        GeminiClient(os.getenv('GEMINI_API_KEY')),
        priority=1  # Lowest priority
    )
    
    return manager

# Example usage
manager = setup_ai_manager()
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain machine learning in simple terms"}
]

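# No model is passed here: kwargs are forwarded to every provider during
# fallback, and a provider-specific model name would fail on the others
result = manager.chat_completion(messages)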
if result['success']:
    print(f"Response from {result['provider']}: {result['response']}")
    print(f"Cost: ${result['cost']:.6f}")
else:
    print("All providers failed:", result['errors'])
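
The fallback behavior can be tried without API keys or network access by substituting stub callables for real providers. The sketch below is a simplified, self-contained stand-in for the manager's loop; `complete_with_fallback`, `failing_stub`, and `echo_stub` are hypothetical names introduced only for this illustration.

```python
from typing import Any, Callable, Dict, List, Tuple

Message = Dict[str, str]
ProviderFn = Callable[[List[Message]], str]

def complete_with_fallback(
    providers: List[Tuple[str, int, ProviderFn]],
    messages: List[Message],
) -> Dict[str, Any]:
    """Try providers from highest to lowest priority and return the first success."""
    errors = []
    for name, _priority, call in sorted(providers, key=lambda p: p[1], reverse=True):
        try:
            return {"success": True, "provider": name, "response": call(messages)}
        except Exception as exc:
            # Record the failure and move on to the next provider
            errors.append(f"{name}: {exc}")
    return {"success": False, "error": "All providers failed", "errors": errors}

def failing_stub(messages: List[Message]) -> str:
    """Stub that simulates a provider outage."""
    raise TimeoutError("simulated outage")

def echo_stub(messages: List[Message]) -> str:
    """Stub that echoes the last message back."""
    return f"echo: {messages[-1]['content']}"

demo_result = complete_with_fallback(
    [("backup", 1, echo_stub), ("primary", 2, failing_stub)],
    [{"role": "user", "content": "ping"}],
)
print(demo_result)
```

Here "primary" is tried first because of its higher priority, fails, and the request falls through to "backup".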

Advanced Integration Patterns

Retry Logic with Exponential Backoff

import time
import random
from functools import wraps

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries > max_retries:
                        # Out of retries: re-raise with the original traceback
                        raise
                    
                    # Calculate delay with exponential backoff
                    delay = min(max_delay, base_delay * (2 ** (retries - 1)))
                    
                    # Add jitter to avoid thundering herd
                    if jitter:
                        delay = random.uniform(0.5 * delay, 1.5 * delay)
                    
                    print(f"Retry {retries}/{max_retries} after {delay:.2f}s: {str(e)}")
                    time.sleep(delay)
        return wrapper
    return decorator

class RobustAIClient:
    def __init__(self, manager: AIClientManager):
        self.manager = manager
    
    @retry_with_backoff(max_retries=3, base_delay=1.0)
    def robust_chat_completion(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        result = self.manager.chat_completion(messages, **kwargs)
        
        if not result['success']:
            raise Exception(f"AI request failed: {result['error']}")
        
        return result
    
    def batch_process(self, prompts: List[str], **kwargs) -> List[Dict[str, Any]]:
        results = []
        # Pop the batching options so they are not forwarded to the providers
        batch_size = kwargs.pop('batch_size', 5)
        delay_between_batches = kwargs.pop('delay_between_batches', 0.1)
        
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i + batch_size]
            batch_results = []
            
            for prompt in batch:
                try:
                    messages = [{"role": "user", "content": prompt}]
                    result = self.robust_chat_completion(messages, **kwargs)
                    batch_results.append({
                        'prompt': prompt,
                        'success': True,
                        'response': result['response'],
                        'provider': result['provider']
                    })
                except Exception as e:
                    batch_results.append({
                        'prompt': prompt,
                        'success': False,
                        'error': str(e)
                    })
            
            results.extend(batch_results)
            
            # Small delay to avoid rate limits
            if i + batch_size < len(prompts):
                time.sleep(delay_between_batches)
        
        return results
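
Retrying every exception also retries errors that will never succeed, such as an invalid request. A possible refinement is to retry only errors considered transient. The sketch below assumes a hypothetical `TransientAPIError` that your own code would raise for timeouts, rate-limit responses, and server errors; the injectable `sleep` parameter is also an addition, included so the backoff can be tested without waiting.

```python
import random
import time
from functools import wraps

class TransientAPIError(Exception):
    """Hypothetical error type standing in for timeouts, 429s, and 5xx responses."""

def retry_transient(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retry_on: tuple = (TransientAPIError,),
    sleep=time.sleep,
):
    """Retry only the exception types in retry_on, with jittered exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries:
                        raise  # retries exhausted
                    delay = min(max_delay, base_delay * 2 ** attempt)
                    sleep(random.uniform(0.5 * delay, 1.5 * delay))
        return wrapper
    return decorator

# Usage with a recorded (not real) sleep so the example runs instantly
recorded_delays = []
attempts = {"count": 0}

@retry_transient(max_retries=3, sleep=recorded_delays.append)
def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TransientAPIError("temporarily unavailable")
    return "ok"

print(flaky_call(), "after", attempts["count"], "attempts")
```

Any exception outside `retry_on`, for example a `ValueError` from request validation, propagates immediately on the first attempt.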

Streaming Response Handling

class StreamingAIClient:
    def __init__(self, openai_client: OpenAIClient):
        self.client = openai_client
    
    def stream_chat_completion(self, messages: List[Dict], **kwargs):
        model = kwargs.get('model', 'gpt-3.5-turbo')
        
        stream = self.client.client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000),
            stream=True
        )
        
        full_response = ""
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                full_response += content
                yield {
                    'type': 'content',
                    'content': content,
                    'complete': False
                }
        
        yield {
            'type': 'complete',
            'content': full_response,
            'complete': True
        }
    
    async def async_stream_chat_completion(self, messages: List[Dict], **kwargs):
        model = kwargs.get('model', 'gpt-3.5-turbo')
        
        # The synchronous client cannot be awaited, so use the SDK's async client
        async_client = openai.AsyncOpenAI(api_key=self.client.client.api_key)
        stream = await async_client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=kwargs.get('temperature', 0.7),
            max_tokens=kwargs.get('max_tokens', 1000),
            stream=True
        )
        
        full_response = ""
        async for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                full_response += content
                yield {
                    'type': 'content',
                    'content': content,
                    'complete': False
                }
        
        yield {
            'type': 'complete',
            'content': full_response,
            'complete': True
        }

# WebSocket integration example
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json

app = FastAPI()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    client = StreamingAIClient(OpenAIClient(os.getenv('OPENAI_API_KEY')))
    
    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()
            message_data = json.loads(data)
            
            messages = message_data.get('messages', [])
            
            # Stream response back to client
            async for chunk in client.async_stream_chat_completion(messages):
                await websocket.send_text(json.dumps(chunk))
                
    except WebSocketDisconnect:
        # The client closed the connection, so there is nothing left to close
        return
    except Exception as e:
        print(f"WebSocket error: {e}")
        # 1011 signals an internal server error to the client
        await websocket.close(code=1011)

Security and Best Practices

API Key Management

# Secure API key management
import os
from cryptography.fernet import Fernet
import keyring

class SecureKeyManager:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.cipher_suite = Fernet(self._get_encryption_key())
    
    def _get_encryption_key(self) -> bytes:
        # Get or create encryption key from secure storage
        key = keyring.get_password("ai_api_manager", "encryption_key")
        if not key:
            key = Fernet.generate_key().decode()
            keyring.set_password("ai_api_manager", "encryption_key", key)
        return key.encode()
    
    def store_api_key(self, provider: str, api_key: str):
        encrypted_key = self.cipher_suite.encrypt(api_key.encode())
        keyring.set_password(self.service_name, provider, encrypted_key.decode())
    
    def get_api_key(self, provider: str) -> str:
        encrypted_key = keyring.get_password(self.service_name, provider)
        if not encrypted_key:
            raise ValueError(f"No API key found for {provider}")
        
        decrypted_key = self.cipher_suite.decrypt(encrypted_key.encode())
        return decrypted_key.decode()
    
    def delete_api_key(self, provider: str):
        keyring.delete_password(self.service_name, provider)

# Environment-based configuration
class Config:
    def __init__(self):
        self.required_keys = [
            'OPENAI_API_KEY',
            'ANTHROPIC_API_KEY', 
            'GEMINI_API_KEY'
        ]
        self._validate_config()
    
    def _validate_config(self):
        missing = [key for key in self.required_keys if not os.getenv(key)]
        if missing:
            raise EnvironmentError(f"Missing environment variables: {', '.join(missing)}")
    
    def get_openai_key(self) -> str:
        return os.getenv('OPENAI_API_KEY')
    
    def get_anthropic_key(self) -> str:
        return os.getenv('ANTHROPIC_API_KEY')
    
    def get_gemini_key(self) -> str:
        return os.getenv('GEMINI_API_KEY')

# Usage
config = Config()
key_manager = SecureKeyManager("my_ai_app")

# Store keys securely (run once)
key_manager.store_api_key("openai", config.get_openai_key())
key_manager.store_api_key("anthropic", config.get_anthropic_key())
key_manager.store_api_key("gemini", config.get_gemini_key())

Rate Limit Management

import time
from threading import Lock
from collections import defaultdict

class RateLimiter:
    def __init__(self):
        self.locks = defaultdict(Lock)
        self.request_times = defaultdict(list)
    
    def wait_if_needed(self, provider: str, max_requests: int, time_window: int):
        with self.locks[provider]:
            current_time = time.time()
            
            # Remove old requests outside the time window
            self.request_times[provider] = [
                t for t in self.request_times[provider] 
                if current_time - t < time_window
            ]
            
            # Check if we're at the limit
            if len(self.request_times[provider]) >= max_requests:
                # Calculate wait time
                oldest_request = self.request_times[provider][0]
                wait_time = time_window - (current_time - oldest_request)
                if wait_time > 0:
                    time.sleep(wait_time)
                
                # Refresh the clock after waiting, then prune expired requests
                current_time = time.time()
                self.request_times[provider] = [
                    t for t in self.request_times[provider] 
                    if current_time - t < time_window
                ]
            
            # Add current request
            self.request_times[provider].append(current_time)

class RateLimitedAIClient:
    def __init__(self, manager: AIClientManager):
        self.manager = manager
        self.rate_limiter = RateLimiter()
        # Illustrative limits only: real values depend on provider, model, and account tier
        self.rate_limits = {
            'openai': (3500, 60),  # 3500 requests per minute
            'anthropic': (1000, 60),  # 1000 requests per minute
            'gemini': (60, 60)  # 60 requests per minute
        }
    
    def chat_completion_with_rate_limit(self, messages: List[Dict], **kwargs) -> Dict[str, Any]:
        # Throttle before sending, using the limit of the provider that is tried first.
        # Fallback providers are not throttled here.
        if self.manager.fallback_order:
            provider = self.manager.fallback_order[0]
            max_requests, time_window = self.rate_limits.get(provider, (60, 60))
            self.rate_limiter.wait_if_needed(provider, max_requests, time_window)
        
        return self.manager.chat_completion(messages, **kwargs)
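
The sliding-window logic is easier to verify when the clock is passed in rather than read from `time.time()`. The sketch below is a non-blocking variant under that assumption: instead of sleeping, it reports how long the caller would need to wait. `SlidingWindowCounter` and its method names are hypothetical and not part of the classes above.

```python
from collections import deque

class SlidingWindowCounter:
    """Tracks request timestamps and reports the wait needed to stay under a limit."""

    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.timestamps = deque()

    def seconds_until_allowed(self, now: float) -> float:
        """Return 0.0 if a request may be sent at `now`, else the seconds to wait."""
        # Drop timestamps that have fallen out of the window
        while self.timestamps and now - self.timestamps[0] >= self.time_window:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max_requests:
            return 0.0
        # The window frees up when the oldest remaining request expires
        return self.time_window - (now - self.timestamps[0])

    def record(self, now: float) -> None:
        """Register that a request was sent at `now`."""
        self.timestamps.append(now)

# Simulated timeline: 2 requests allowed per 60 seconds
limiter = SlidingWindowCounter(max_requests=2, time_window=60)
limiter.record(0.0)
limiter.record(10.0)
print(limiter.seconds_until_allowed(20.0))  # third request must wait
print(limiter.seconds_until_allowed(60.0))  # first request has expired
```

In real use, `now` would come from `time.monotonic()`, which is not affected by system clock adjustments.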

Monitoring and Analytics

Performance Tracking

import time
from dataclasses import dataclass
from typing import Any, Dict, List
from statistics import mean, median

@dataclass
class APIRequestMetrics:
    provider: str
    success: bool
    response_time: float
    tokens_used: int
    cost: float
    timestamp: float

class APIMonitor:
    def __init__(self):
        self.metrics: List[APIRequestMetrics] = []
    
    def record_request(
        self, 
        provider: str, 
        success: bool, 
        response_time: float,
        tokens_used: int = 0,
        cost: float = 0.0
    ):
        metric = APIRequestMetrics(
            provider=provider,
            success=success,
            response_time=response_time,
            tokens_used=tokens_used,
            cost=cost,
            timestamp=time.time()
        )
        self.metrics.append(metric)
    
    def get_provider_stats(self, provider: str, hours: int = 24) -> Dict[str, float]:
        cutoff_time = time.time() - (hours * 3600)
        provider_metrics = [
            m for m in self.metrics 
            if m.provider == provider and m.timestamp > cutoff_time
        ]
        
        if not provider_metrics:
            return {}
        
        success_rate = sum(1 for m in provider_metrics if m.success) / len(provider_metrics)
        response_times = [m.response_time for m in provider_metrics if m.success]
        total_cost = sum(m.cost for m in provider_metrics)
        total_tokens = sum(m.tokens_used for m in provider_metrics)
        
        return {
            'total_requests': len(provider_metrics),
            'success_rate': success_rate,
            'avg_response_time': mean(response_times) if response_times else 0,
            'median_response_time': median(response_times) if response_times else 0,
            'total_cost': total_cost,
            'total_tokens': total_tokens,
            'cost_per_request': total_cost / len(provider_metrics) if provider_metrics else 0
        }
    
    def get_overall_stats(self, hours: int = 24) -> Dict[str, Any]:
        cutoff_time = time.time() - (hours * 3600)
        recent_metrics = [m for m in self.metrics if m.timestamp > cutoff_time]
        
        if not recent_metrics:
            return {}
        
        providers = set(m.provider for m in recent_metrics)
        provider_stats = {p: self.get_provider_stats(p, hours) for p in providers}
        
        return {
            'total_requests': len(recent_metrics),
            'success_rate': sum(1 for m in recent_metrics if m.success) / len(recent_metrics),
            'total_cost': sum(m.cost for m in recent_metrics),
            'providers': provider_stats
        }

# Decorator for automatic monitoring
from functools import wraps

def monitor_ai_requests(monitor: APIMonitor):
    def decorator(func):
        @wraps(func)  # preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                response_time = time.time() - start_time
                
                monitor.record_request(
                    provider=result.get('provider', 'unknown'),
                    success=result.get('success', False),
                    response_time=response_time,
                    tokens_used=result.get('tokens_used', 0),
                    cost=result.get('cost', 0.0)
                )
                
                return result
            except Exception as e:
                response_time = time.time() - start_time
                monitor.record_request(
                    provider='unknown',
                    success=False,
                    response_time=response_time
                )
                raise e
        return wrapper
    return decorator

Deployment Considerations

Environment | Considerations | Recommended Setup | Cost Impact
Development | Rapid iteration, testing | Single provider, generous limits | Low (test usage only)
Staging | Production-like testing | Multi-provider, realistic limits | Medium (limited testing)
Production | Reliability, scalability | Multi-provider with fallbacks | High (actual usage)
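
The per-environment setups in the table could be captured in a small configuration module, for example as below. This is a sketch under assumptions: the `APP_ENV` variable name, the retry counts, and the budget figures are invented for illustration and should be replaced with your own values.

```python
import os
from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass(frozen=True)
class EnvironmentConfig:
    providers: Tuple[str, ...]  # in fallback order
    max_retries: int
    daily_budget_usd: float

# Hypothetical values mirroring the table: one provider in development,
# multiple providers with fallbacks in staging and production.
ENVIRONMENTS = {
    "development": EnvironmentConfig(("openai",), max_retries=1, daily_budget_usd=5.0),
    "staging": EnvironmentConfig(("openai", "anthropic"), max_retries=2, daily_budget_usd=25.0),
    "production": EnvironmentConfig(
        ("openai", "anthropic", "gemini"), max_retries=3, daily_budget_usd=500.0
    ),
}

def load_config(env_name: Optional[str] = None) -> EnvironmentConfig:
    """Look up the config for env_name, falling back to the APP_ENV variable."""
    name = (env_name or os.getenv("APP_ENV", "development")).lower()
    try:
        return ENVIRONMENTS[name]
    except KeyError:
        raise ValueError(f"Unknown environment: {name!r}") from None

config_for_prod = load_config("production")
print(config_for_prod.providers, config_for_prod.max_retries)
```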

Production Checklist

  • Implement proper error handling and retry logic
  • Set up rate limiting and usage monitoring
  • Use secure API key management
  • Implement cost tracking and alerts
  • Set up logging and monitoring
  • Create fallback mechanisms for provider failures
  • Test with realistic load and failure scenarios
  • Document integration patterns and procedures
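
For the cost tracking and alerts item, one simple approach is to accumulate estimated spend and report each budget threshold once, when it is first crossed. The sketch below assumes a hypothetical `BudgetTracker` class with made-up thresholds; how costs are estimated and how alerts are delivered (email, chat, paging) is left to the application.

```python
from typing import List, Sequence

class BudgetTracker:
    """Accumulates spend and reports which alert thresholds were newly crossed."""

    def __init__(
        self,
        monthly_budget_usd: float,
        thresholds: Sequence[float] = (0.5, 0.8, 1.0),
    ):
        if monthly_budget_usd <= 0:
            raise ValueError("monthly_budget_usd must be positive")
        self.monthly_budget_usd = monthly_budget_usd
        self.thresholds = sorted(thresholds)
        self.spent_usd = 0.0
        self._alerted = set()

    def add_cost(self, cost_usd: float) -> List[float]:
        """Add a cost and return thresholds crossed for the first time."""
        self.spent_usd += cost_usd
        used = self.spent_usd / self.monthly_budget_usd
        crossed = [t for t in self.thresholds if used >= t and t not in self._alerted]
        self._alerted.update(crossed)
        return crossed

tracker = BudgetTracker(monthly_budget_usd=100.0)
for cost in (40.0, 15.0, 50.0):
    for threshold in tracker.add_cost(cost):
        print(f"Alert: {threshold:.0%} of budget used (${tracker.spent_usd:.2f} spent)")
```

Because each threshold fires only once, a burst of requests near a limit produces a single alert rather than one per request.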