Skip to main content

API Best Practices

Follow these guidelines for optimal CTWiseAPI integration.

Authentication​

Secure Key Storage​

# Good - Environment variable
import os
api_key = os.getenv("CTWISE_API_KEY")

# Good - AWS Secrets Manager
import boto3
secrets = boto3.client('secretsmanager')
api_key = secrets.get_secret_value(SecretId='ctwise/api-key')['SecretString']

# Bad - Hardcoded (NEVER do this)
api_key = "ctwise_sk_live_xxx" # DON'T DO THIS

Key Rotation​

  1. Create a new API key in the dashboard
  2. Update your application to use the new key
  3. Verify the new key works in production
  4. Revoke the old key
# Graceful key rotation
import os

PRIMARY_KEY = os.getenv("CTWISE_API_KEY_PRIMARY")
FALLBACK_KEY = os.getenv("CTWISE_API_KEY_FALLBACK")

def get_client():
try:
client = CTWiseClient(api_key=PRIMARY_KEY)
# Test the key
client.health_check()
return client
except AuthenticationError:
return CTWiseClient(api_key=FALLBACK_KEY)

Error Handling​

Comprehensive Error Handling​

from ctwise import CTWiseClient
from ctwise.exceptions import (
CTWiseError,
AuthenticationError,
RateLimitError,
ValidationError,
ServerError,
)
import time
import logging

logger = logging.getLogger(__name__)

def search_with_retry(client, criteria, max_retries=3):
"""Search with automatic retry on transient errors."""
for attempt in range(max_retries):
try:
return client.requirements.search(**criteria)

except AuthenticationError:
logger.error("Authentication failed - check API key")
raise # Don't retry auth errors

except RateLimitError as e:
if attempt < max_retries - 1:
wait_time = e.retry_after or (2 ** attempt)
logger.warning(f"Rate limited, waiting {wait_time}s")
time.sleep(wait_time)
else:
raise

except ServerError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
logger.warning(f"Server error, retrying in {wait_time}s")
time.sleep(wait_time)
else:
raise

except ValidationError as e:
logger.error(f"Invalid request: {e.message}")
raise # Don't retry validation errors

raise CTWiseError("Max retries exceeded")

JavaScript Error Handling​

import { CTWiseClient } from '@ctwise/sdk';
import {
RateLimitError,
AuthenticationError,
ServerError,
} from '@ctwise/sdk/errors';

async function searchWithRetry(
client: CTWiseClient,
criteria: SearchCriteria,
maxRetries = 3
): Promise<SearchResponse> {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await client.requirements.search(criteria);
} catch (error) {
if (error instanceof AuthenticationError) {
throw error; // Don't retry
}
if (error instanceof RateLimitError) {
if (attempt < maxRetries - 1) {
await delay(error.retryAfter * 1000);
continue;
}
}
if (error instanceof ServerError && attempt < maxRetries - 1) {
await delay(Math.pow(2, attempt) * 1000);
continue;
}
throw error;
}
}
throw new Error('Max retries exceeded');
}

Rate Limiting​

Understand Your Limits​

TierMonthly CallsPer-Second Rate
Free1,0002
Starter10,00010
Pro50,00050
EnterpriseUnlimited100

Implement Client-Side Rate Limiting​

import time
from collections import deque
from threading import Lock

class RateLimiter:
def __init__(self, calls_per_second):
self.calls_per_second = calls_per_second
self.calls = deque()
self.lock = Lock()

def wait(self):
with self.lock:
now = time.time()
# Remove calls older than 1 second
while self.calls and now - self.calls[0] > 1:
self.calls.popleft()

if len(self.calls) >= self.calls_per_second:
sleep_time = 1 - (now - self.calls[0])
if sleep_time > 0:
time.sleep(sleep_time)

self.calls.append(time.time())

# Usage
limiter = RateLimiter(calls_per_second=10)

def rate_limited_search(client, criteria):
limiter.wait()
return client.requirements.search(**criteria)

Monitor Usage​

class UsageTracker:
def __init__(self, monthly_limit):
self.monthly_limit = monthly_limit
self.current_usage = 0

def track(self):
self.current_usage += 1
usage_percent = (self.current_usage / self.monthly_limit) * 100

if usage_percent >= 90:
logging.warning(f"Usage at {usage_percent:.1f}%")
elif usage_percent >= 80:
logging.info(f"Usage at {usage_percent:.1f}%")

def can_make_request(self):
return self.current_usage < self.monthly_limit

Caching​

Implement Response Caching​

from functools import lru_cache
import hashlib
import json

@lru_cache(maxsize=100)
def search_cached(criteria_hash):
"""Cache search results by criteria hash."""
criteria = json.loads(criteria_hash)
return client.requirements.search(**criteria)

def search_with_cache(criteria):
"""Convert criteria to hashable form for caching."""
criteria_hash = hashlib.md5(
json.dumps(criteria, sort_keys=True).encode()
).hexdigest()
return search_cached(criteria_hash)

Redis Caching for Distributed Systems​

import redis
import json
from datetime import timedelta

redis_client = redis.Redis(host='localhost', port=6379)

def search_with_redis_cache(client, criteria, ttl_hours=24):
"""Cache results in Redis."""
cache_key = f"ctwise:search:{hash(json.dumps(criteria, sort_keys=True))}"

# Check cache
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)

# Make API call
results = client.requirements.search(**criteria)

# Cache results
redis_client.setex(
cache_key,
timedelta(hours=ttl_hours),
json.dumps(results.dict())
)

return results

Pagination​

Handle Large Result Sets​

def get_all_requirements(client, criteria):
"""Fetch all results with pagination."""
all_results = []
offset = 0
page_size = 100

while True:
results = client.requirements.search(
**criteria,
max_results=page_size,
offset=offset
)

all_results.extend(results.requirements)

if len(results.requirements) < page_size:
break # Last page

offset += page_size

return all_results

Use Async for Parallel Fetching​

import asyncio
from ctwise import AsyncCTWiseClient

async def fetch_multiple_areas(areas):
"""Fetch requirements for multiple areas in parallel."""
async with AsyncCTWiseClient() as client:
tasks = [
client.requirements.search(therapeutic_area=area)
for area in areas
]
results = await asyncio.gather(*tasks)
return dict(zip(areas, results))

# Usage
areas = ["Oncology", "Cardiovascular", "Neurology"]
results = asyncio.run(fetch_multiple_areas(areas))

Logging​

Structured Logging​

import logging
import json

class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": self.formatTime(record),
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
}
if hasattr(record, 'api_call'):
log_data['api_call'] = record.api_call
return json.dumps(log_data)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("ctwise")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage
def logged_search(client, criteria):
start = time.time()
results = client.requirements.search(**criteria)
duration = time.time() - start

logger.info(
"API call completed",
extra={
"api_call": {
"endpoint": "requirements/search",
"duration_ms": duration * 1000,
"result_count": results.total_count
}
}
)
return results

Testing​

Mock API Responses​

import pytest
from unittest.mock import Mock, patch
from ctwise import CTWiseClient

@pytest.fixture
def mock_client():
with patch('ctwise.CTWiseClient') as MockClient:
client = MockClient.return_value
client.requirements.search.return_value = Mock(
total_count=2,
requirements=[
Mock(
requirement_id="REQ-001",
title="Test Requirement",
authority="FDA"
)
]
)
yield client

def test_search_returns_requirements(mock_client):
results = mock_client.requirements.search(
therapeutic_area="Oncology"
)
assert results.total_count == 2
assert results.requirements[0].authority == "FDA"

Integration Tests​

import os
import pytest

@pytest.mark.integration
def test_real_api_connection():
"""Test actual API connectivity (requires API key)."""
if not os.getenv("CTWISE_API_KEY"):
pytest.skip("No API key configured")

client = CTWiseClient()
results = client.requirements.search(
therapeutic_area="Oncology",
max_results=1
)

assert results.total_count > 0
assert len(results.requirements) == 1

Production Checklist​

Before going to production:

  • API keys stored securely (not in code)
  • Error handling covers all error types
  • Retry logic with exponential backoff
  • Rate limiting on client side
  • Caching for repeated queries
  • Logging for debugging and monitoring
  • Timeouts configured appropriately
  • Health checks verify API connectivity
  • Usage alerts configured in dashboard
  • Upgrade path planned for tier limits