# API Best Practices

Follow these guidelines for a reliable, production-ready CTWiseAPI integration.
## Authentication

### Secure Key Storage
```python
# Good - environment variable
import os

api_key = os.getenv("CTWISE_API_KEY")

# Good - AWS Secrets Manager
import boto3

secrets = boto3.client('secretsmanager')
api_key = secrets.get_secret_value(SecretId='ctwise/api-key')['SecretString']

# Bad - hardcoded (NEVER do this)
api_key = "ctwise_sk_live_xxx"  # DON'T DO THIS
```
### Key Rotation
1. Create a new API key in the dashboard.
2. Update your application to use the new key.
3. Verify the new key works in production.
4. Revoke the old key.
```python
# Graceful key rotation
import os

from ctwise import CTWiseClient
from ctwise.exceptions import AuthenticationError

PRIMARY_KEY = os.getenv("CTWISE_API_KEY_PRIMARY")
FALLBACK_KEY = os.getenv("CTWISE_API_KEY_FALLBACK")

def get_client():
    try:
        client = CTWiseClient(api_key=PRIMARY_KEY)
        # Test the key before handing the client back
        client.health_check()
        return client
    except AuthenticationError:
        # Primary key was revoked or is invalid - fall back
        return CTWiseClient(api_key=FALLBACK_KEY)
```
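This pattern assumes both keys are live during the rotation window: keep the old key as the fallback until the new primary is verified in production, then revoke it (step 4 above).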
## Error Handling

### Comprehensive Error Handling
```python
from ctwise import CTWiseClient
from ctwise.exceptions import (
    CTWiseError,
    AuthenticationError,
    RateLimitError,
    ValidationError,
    ServerError,
)
import time
import logging

logger = logging.getLogger(__name__)

def search_with_retry(client, criteria, max_retries=3):
    """Search with automatic retry on transient errors."""
    for attempt in range(max_retries):
        try:
            return client.requirements.search(**criteria)
        except AuthenticationError:
            logger.error("Authentication failed - check API key")
            raise  # Don't retry auth errors
        except RateLimitError as e:
            if attempt < max_retries - 1:
                wait_time = e.retry_after or (2 ** attempt)
                logger.warning(f"Rate limited, waiting {wait_time}s")
                time.sleep(wait_time)
            else:
                raise
        except ServerError:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                logger.warning(f"Server error, retrying in {wait_time}s")
                time.sleep(wait_time)
            else:
                raise
        except ValidationError as e:
            logger.error(f"Invalid request: {e.message}")
            raise  # Don't retry validation errors
    raise CTWiseError("Max retries exceeded")
```
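For busy deployments, consider adding random jitter to the backoff (for example, `wait_time = 2 ** attempt + random.random()`) so that many clients hitting the same outage do not all retry at the same instant.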
### JavaScript Error Handling
```typescript
// SearchCriteria and SearchResponse are assumed to be exported by the SDK
import { CTWiseClient, SearchCriteria, SearchResponse } from '@ctwise/sdk';
import {
  RateLimitError,
  AuthenticationError,
  ServerError,
} from '@ctwise/sdk/errors';

// Simple promise-based sleep helper
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

async function searchWithRetry(
  client: CTWiseClient,
  criteria: SearchCriteria,
  maxRetries = 3
): Promise<SearchResponse> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await client.requirements.search(criteria);
    } catch (error) {
      if (error instanceof AuthenticationError) {
        throw error; // Don't retry auth errors
      }
      if (error instanceof RateLimitError && attempt < maxRetries - 1) {
        // Fall back to exponential backoff if no retry-after was given
        await delay((error.retryAfter ?? Math.pow(2, attempt)) * 1000);
        continue;
      }
      if (error instanceof ServerError && attempt < maxRetries - 1) {
        await delay(Math.pow(2, attempt) * 1000);
        continue;
      }
      throw error;
    }
  }
  throw new Error('Max retries exceeded');
}
```
## Rate Limiting

### Understand Your Limits
| Tier | Monthly Calls | Per-Second Rate |
|---|---|---|
| Free | 1,000 | 2 |
| Starter | 10,000 | 10 |
| Pro | 50,000 | 50 |
| Enterprise | Unlimited | 100 |
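Whatever your tier, keep any client-side throttle slightly below the listed per-second rate so retries and short bursts don't trip the server-side limiter.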
### Implement Client-Side Rate Limiting
```python
import time
from collections import deque
from threading import Lock

class RateLimiter:
    def __init__(self, calls_per_second):
        self.calls_per_second = calls_per_second
        self.calls = deque()
        self.lock = Lock()

    def wait(self):
        with self.lock:
            now = time.time()
            # Drop timestamps older than the 1-second window
            while self.calls and now - self.calls[0] > 1:
                self.calls.popleft()
            if len(self.calls) >= self.calls_per_second:
                # Sleep until the oldest call leaves the window
                sleep_time = 1 - (now - self.calls[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)
                self.calls.popleft()
            self.calls.append(time.time())

# Usage
limiter = RateLimiter(calls_per_second=10)

def rate_limited_search(client, criteria):
    limiter.wait()
    return client.requirements.search(**criteria)
```
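This limiter is thread-safe but per-process: in a multi-worker deployment, divide your per-second budget across workers (or coordinate through a shared store such as Redis) so the combined rate stays within your tier.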
### Monitor Usage
```python
import logging

class UsageTracker:
    def __init__(self, monthly_limit):
        self.monthly_limit = monthly_limit
        self.current_usage = 0

    def track(self):
        self.current_usage += 1
        usage_percent = (self.current_usage / self.monthly_limit) * 100
        if usage_percent >= 90:
            logging.warning(f"Usage at {usage_percent:.1f}%")
        elif usage_percent >= 80:
            logging.info(f"Usage at {usage_percent:.1f}%")

    def can_make_request(self):
        return self.current_usage < self.monthly_limit
```
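A minimal wiring sketch (`tracked_search` is illustrative; the counter is in-memory, so it resets on restart and must be re-synced with your billing period out of band):

```python
tracker = UsageTracker(monthly_limit=10_000)

def tracked_search(client, criteria):
    # Refuse the call locally once the monthly quota is spent
    if not tracker.can_make_request():
        raise RuntimeError("Monthly API quota exhausted")
    tracker.track()
    return client.requirements.search(**criteria)
```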
## Caching

### Implement Response Caching
```python
from functools import lru_cache
import json

@lru_cache(maxsize=100)
def search_cached(criteria_json):
    """Cache search results keyed by the serialized criteria."""
    criteria = json.loads(criteria_json)
    return client.requirements.search(**criteria)  # assumes a module-level client

def search_with_cache(criteria):
    """Serialize criteria to a canonical, hashable form for caching."""
    # lru_cache needs a hashable key; sort_keys makes the JSON canonical
    criteria_json = json.dumps(criteria, sort_keys=True)
    return search_cached(criteria_json)
```
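Note that `lru_cache` is per-process and has no expiry: entries persist until evicted by the size limit, so stale results can be served indefinitely. For multi-server deployments or time-limited freshness, use the Redis approach below.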
### Redis Caching for Distributed Systems
```python
import hashlib
import json
from datetime import timedelta

import redis

redis_client = redis.Redis(host='localhost', port=6379)

def search_with_redis_cache(client, criteria, ttl_hours=24):
    """Cache results in Redis."""
    # Use a stable digest - Python's built-in hash() varies between processes
    criteria_digest = hashlib.sha256(
        json.dumps(criteria, sort_keys=True).encode()
    ).hexdigest()
    cache_key = f"ctwise:search:{criteria_digest}"

    # Check cache (note: cache hits return a plain dict, not a response object)
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Make API call
    results = client.requirements.search(**criteria)

    # Cache results
    redis_client.setex(
        cache_key,
        timedelta(hours=ttl_hours),
        json.dumps(results.dict())
    )
    return results
```
## Pagination

### Handle Large Result Sets
```python
def get_all_requirements(client, criteria):
    """Fetch all results with pagination."""
    all_results = []
    offset = 0
    page_size = 100
    while True:
        results = client.requirements.search(
            **criteria,
            max_results=page_size,
            offset=offset,
        )
        all_results.extend(results.requirements)
        if len(results.requirements) < page_size:
            break  # Last page
        offset += page_size
    return all_results
```
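Each page costs one call against your monthly quota, so pass the narrowest criteria you can rather than fetching everything and filtering locally.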
### Use Async for Parallel Fetching
```python
import asyncio

from ctwise import AsyncCTWiseClient

async def fetch_multiple_areas(areas):
    """Fetch requirements for multiple areas in parallel."""
    async with AsyncCTWiseClient() as client:
        tasks = [
            client.requirements.search(therapeutic_area=area)
            for area in areas
        ]
        results = await asyncio.gather(*tasks)
        return dict(zip(areas, results))

# Usage
areas = ["Oncology", "Cardiovascular", "Neurology"]
results = asyncio.run(fetch_multiple_areas(areas))
```
## Logging

### Structured Logging
```python
import json
import logging
import time

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        }
        if hasattr(record, 'api_call'):
            log_data['api_call'] = record.api_call
        return json.dumps(log_data)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("ctwise")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage
def logged_search(client, criteria):
    start = time.time()
    results = client.requirements.search(**criteria)
    duration = time.time() - start
    logger.info(
        "API call completed",
        extra={
            "api_call": {
                "endpoint": "requirements/search",
                "duration_ms": duration * 1000,
                "result_count": results.total_count,
            }
        },
    )
    return results
```
## Testing

### Mock API Responses
```python
import pytest
from unittest.mock import Mock, patch

from ctwise import CTWiseClient

@pytest.fixture
def mock_client():
    with patch('ctwise.CTWiseClient') as MockClient:
        client = MockClient.return_value
        client.requirements.search.return_value = Mock(
            total_count=2,
            requirements=[
                Mock(
                    requirement_id="REQ-001",
                    title="Test Requirement",
                    authority="FDA",
                ),
                Mock(
                    requirement_id="REQ-002",
                    title="Another Test Requirement",
                    authority="FDA",
                ),
            ],
        )
        yield client

def test_search_returns_requirements(mock_client):
    results = mock_client.requirements.search(
        therapeutic_area="Oncology"
    )
    assert results.total_count == 2
    assert results.requirements[0].authority == "FDA"
```
### Integration Tests
```python
import os

import pytest

from ctwise import CTWiseClient

@pytest.mark.integration
def test_real_api_connection():
    """Test actual API connectivity (requires API key)."""
    if not os.getenv("CTWISE_API_KEY"):
        pytest.skip("No API key configured")
    client = CTWiseClient()
    results = client.requirements.search(
        therapeutic_area="Oncology",
        max_results=1,
    )
    assert results.total_count > 0
    assert len(results.requirements) == 1
```
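Run these separately from your unit tests, e.g. `pytest -m integration`, and register the `integration` marker in your pytest configuration to avoid unknown-marker warnings.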
## Production Checklist
Before going to production:
- API keys stored securely (not in code)
- Error handling covers all error types
- Retry logic with exponential backoff
- Rate limiting on client side
- Caching for repeated queries
- Logging for debugging and monitoring
- Timeouts configured appropriately (see the sketch after this list)
- Health checks verify API connectivity
- Usage alerts configured in dashboard
- Upgrade path planned for tier limits
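For the timeout and health-check items, a minimal startup sketch (assuming the client constructor accepts a `timeout` parameter, whose exact name may differ in your SDK version; `health_check()` is the method used in the key-rotation example above):

```python
import os

from ctwise import CTWiseClient

def build_production_client():
    # Fail fast if configuration is missing
    api_key = os.getenv("CTWISE_API_KEY")
    if not api_key:
        raise RuntimeError("CTWISE_API_KEY is not set")

    # `timeout` is an assumed parameter - check your SDK version
    client = CTWiseClient(api_key=api_key, timeout=10)

    # Verify connectivity before serving traffic
    client.health_check()
    return client
```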