Best Practices #
Recommended patterns for building reliable applications with the Nellie API.
Security #
API Key Management #
✅ DO #
# Use environment variables
import os
api_key = os.environ.get("NELLIE_API_KEY")
# Or use a secrets manager
from aws_secretsmanager import get_secret
api_key = get_secret("nellie/api-key")
❌ DON’T #
# Never hardcode API keys
api_key = "nel_abc123..." # BAD!
# Never commit to version control
# .env files should be in .gitignore
Key Rotation #
Rotate API keys regularly, especially:
- When team members leave
- After potential exposure
- As part of regular security hygiene
# Zero-downtime key rotation
# 1. Create new key in API dashboard
# 2. Update application config
new_key = os.environ.get("NELLIE_API_KEY_NEW")
client = Nellie(api_key=new_key)
# 3. Deploy and verify
# 4. Revoke old key in API dashboard
Webhook Security #
Always verify webhook signatures:
from nellie_api import Webhook, WebhookSignatureError
@app.route("/webhook", methods=["POST"])
def webhook():
    """Verify the Nellie signature before processing a webhook delivery.

    Returns 400 (and logs the source address) on a bad signature so
    forged requests are never processed.
    """
    try:
        event = Webhook.construct_event(
            request.data,
            request.headers.get("X-Nellie-Signature"),
            WEBHOOK_SECRET,
        )
    except WebhookSignatureError:
        # Log the attempt for security monitoring (lazy %-args, not f-string)
        logger.warning("Invalid webhook signature from %s", request.remote_addr)
        return "Invalid signature", 400
    # Process verified event
    handle_event(event)
    return "OK", 200
Server-Side Only #
Never expose your API key in client-side code:
// ❌ BAD - Client-side JavaScript
const response = await fetch('https://api.nelliewriter.com/v1/book', {
headers: { 'X-API-Key': 'nel_abc123...' } // Exposed to users!
});
// ✅ GOOD - Call your own backend
const response = await fetch('/api/generate-book', {
method: 'POST',
body: JSON.stringify({ prompt: 'A mystery novel' })
});
Your backend then calls Nellie:
# Backend API route
@app.route("/api/generate-book", methods=["POST"])
def generate_story():
    """Server-side proxy: call Nellie here so the API key never reaches the browser."""
    client = Nellie(api_key=os.environ["NELLIE_API_KEY"])
    book = client.books.create(prompt=request.json["prompt"])
    return jsonify({"requestId": book.request_id})
Error Handling #
Implement Retry Logic #
from nellie_api import Nellie, RateLimitError, APIError
import time
from functools import wraps
def with_retry(max_attempts=3, backoff_factor=2):
    """Decorator adding retry logic with exponential backoff.

    Retries RateLimitError (30s-based backoff) and 5xx APIError
    (10s-based backoff); 4xx client errors are re-raised immediately.
    After max_attempts failures the last exception is re-raised.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except RateLimitError as e:
                    last_exception = e
                    wait_time = backoff_factor ** attempt * 30
                    print(f"Rate limited. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                except APIError as e:
                    if e.status_code >= 500:
                        # Server errors are transient; back off and retry
                        last_exception = e
                        wait_time = backoff_factor ** attempt * 10
                        time.sleep(wait_time)
                    else:
                        raise  # Don't retry client errors
            raise last_exception
        return wrapper
    return decorator
@with_retry(max_attempts=3)
def create_book(prompt: str):
    """Create a book, retrying transient failures via with_retry."""
    client = Nellie()
    return client.books.create(prompt=prompt)
Handle All Error Types #
from nellie_api import (
NellieError,
AuthenticationError,
RateLimitError,
APIError
)
def safe_generate(prompt: str) -> dict:
    """Generate with comprehensive error handling.

    Never raises a Nellie exception: returns a dict with "success"
    plus either the request ID or machine-readable error details.
    """
    try:
        client = Nellie()
        book = client.books.create(prompt=prompt)
        return {"success": True, "request_id": book.request_id}
    except AuthenticationError:
        return {
            "success": False,
            "error": "auth_error",
            "message": "Invalid API key. Check your configuration."
        }
    except RateLimitError:
        return {
            "success": False,
            "error": "rate_limit",
            "message": "Too many requests. Please try again later."
        }
    except APIError as e:
        return {
            "success": False,
            "error": "api_error",
            "message": str(e),
            "status_code": e.status_code
        }
    except NellieError as e:
        # Catch-all for any other SDK error type
        return {
            "success": False,
            "error": "unknown",
            "message": str(e)
        }
Graceful Degradation #
def generate_content(prompt: str) -> str:
    """Generate content with fallback behavior.

    Falls back to a user notification when generation fails, and to a
    retry queue when the API itself errors out.
    """
    try:
        client = Nellie()
        book = client.books.create(prompt=prompt)
        result = client.books.wait_for_completion(book.request_id)
        if result.is_successful():
            return result.result_url
        else:
            # Fallback: notify user of failure
            return notify_user_of_delay(book.request_id)
    except NellieError:
        # Fallback: queue for later processing
        return queue_for_retry(prompt)
Performance #
Use Webhooks Over Polling #
Webhooks are more efficient and provide faster notifications:
# ✅ Preferred: Webhook-based flow
book = client.books.create(
prompt="A mystery novel",
webhook_url="https://myapp.com/webhooks/nellie"
)
# Your server receives a POST when complete
# ❌ Less efficient: Polling
book = client.books.create(prompt="A mystery novel")
while True:
status = client.books.retrieve(book.request_id)
if status.is_complete():
break
time.sleep(120) # Wasted API calls
Respect Rate Limits #
import time
from collections import deque
from threading import Lock
class RateLimiter:
    """Simple sliding-window rate limiter for API calls."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls  # calls allowed per window
        self.period = period        # window length in seconds
        self.calls = deque()        # timestamps of recent calls
        self.lock = Lock()          # guards self.calls

    def acquire(self):
        """Block until a call slot is free, then record the call.

        Uses a loop rather than recursion: threading.Lock is NOT
        reentrant, so re-entering acquire() while holding the lock
        would deadlock. Note that we sleep while holding the lock,
        which serializes waiting callers (acceptable for a simple
        client-side limiter).
        """
        with self.lock:
            while True:
                now = time.time()
                # Drop timestamps that have aged out of the window
                while self.calls and self.calls[0] < now - self.period:
                    self.calls.popleft()
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # At the limit: sleep until the oldest call leaves the window
                time.sleep(self.calls[0] + self.period - now)
# Usage: allow 1 request per 6 seconds
limiter = RateLimiter(max_calls=1, period=6)


def create_book_limited(prompt: str):
    """Create a book, first waiting for a rate-limit slot."""
    limiter.acquire()
    return client.books.create(prompt=prompt)
Cache Configuration Data #
from functools import lru_cache
from datetime import datetime, timedelta
@lru_cache(maxsize=1)
def get_cached_config():
    """Fetch and cache the Nellie configuration.

    NOTE(review): lru_cache entries never expire — this caches for the
    lifetime of the process, not for a fixed interval. Use a TTL cache
    if the configuration must be refreshed periodically.
    """
    return client.get_configuration()
# Or with time-based expiration
# Or with time-based expiration
class ConfigCache:
    """TTL cache for Nellie configuration data (default: 1 hour)."""

    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self.config = None      # cached configuration object
        self.expires_at = None  # datetime after which config is stale

    def get(self):
        """Return the cached config, refetching once the TTL has elapsed."""
        now = datetime.now()
        if self.config is None or now > self.expires_at:
            self.config = client.get_configuration()
            self.expires_at = now + timedelta(seconds=self.ttl)
        return self.config


config_cache = ConfigCache()
styles = config_cache.get().styles
Reliability #
Idempotent Webhook Handlers #
Handle duplicate webhook deliveries gracefully:
from functools import lru_cache
# Track processed events (in-memory; see the database version for production)
processed_events = set()


@app.route("/webhook", methods=["POST"])
def webhook():
    """Idempotent webhook handler: duplicate deliveries are acknowledged, not reprocessed."""
    event = Webhook.construct_event(...)
    # Check if already processed
    if event.request_id in processed_events:
        return "Already processed", 200
    # Process the event
    process_book(event)
    # Mark as processed only after success
    processed_events.add(event.request_id)
    return "OK", 200
For production, use a database:
def handle_webhook(event):
    """Idempotently process a webhook, using a DB unique constraint as the dedupe guard."""
    try:
        # INSERT fails with UniqueViolation on a duplicate request_id
        db.execute(
            "INSERT INTO processed_webhooks (request_id) VALUES (%s)",
            (event.request_id,),
        )
    except UniqueViolation:
        return  # Already processed
    # Process the event
    process_book(event)
Timeout Handling #
from nellie_api import Nellie
client = Nellie(
api_key="nel_...",
timeout=60 # Request timeout
)
# For long-running operations
try:
result = client.books.wait_for_completion(
request_id,
timeout=7200, # 2 hour max wait
poll_interval=120
)
except TimeoutError:
# Handle timeout gracefully
logger.warning(f"Book {request_id} timed out")
notify_user_of_delay(request_id)
Health Checks #
Monitor API availability:
import requests
def check_nellie_health() -> bool:
    """Return True if the Nellie API answers a lightweight GET within 10s."""
    try:
        response = requests.get(
            "https://api.nelliewriter.com/v1/configuration",
            timeout=10,
        )
        return response.status_code == 200
    except requests.RequestException:
        # Any transport-level failure (DNS, timeout, refused) counts as down
        return False
# Use in a health check endpoint
@app.route("/health")
def health():
    """Aggregate dependency checks; respond 503 if any dependency is unhealthy."""
    checks = {
        "database": check_database(),
        "nellie_api": check_nellie_health(),
    }
    all_healthy = all(checks.values())
    return jsonify(checks), 200 if all_healthy else 503
Architecture #
Asynchronous Processing #
Don’t block web requests on book generation:
# ✅ Good: Return immediately, process async
@app.route("/generate", methods=["POST"])
def generate():
    """Queue the generation job and answer 202 immediately instead of blocking the request."""
    prompt = request.json["prompt"]
    # Queue the job
    job_id = queue_generation_job(prompt)
    return jsonify({
        "status": "queued",
        "job_id": job_id,
        "message": "Generation started."
    }), 202
# Background worker processes the queue
def worker():
    """Consume queued jobs forever, handing each to Nellie with a webhook callback."""
    while True:
        job = get_next_job()
        client = Nellie()
        book = client.books.create(
            prompt=job.prompt,
            webhook_url=f"{BASE_URL}/internal/webhook/{job.id}",
        )
        update_job_status(job.id, "processing", book.request_id)
Separate Concerns #
# services/nellie_service.py
class NellieService:
    """Encapsulate all Nellie API interactions behind one service class."""

    def __init__(self):
        self.client = Nellie()

    def start_generation(self, prompt: str, **kwargs) -> str:
        """Start a book generation and return the request ID."""
        book = self.client.books.create(prompt=prompt, **kwargs)
        return book.request_id

    def get_status(self, request_id: str) -> dict:
        """Get generation status as a plain dict (decoupled from SDK objects)."""
        status = self.client.books.retrieve(request_id)
        return {
            "status": status.status,
            "progress": status.progress,
            "result_url": status.result_url,
            "error": status.error
        }

    def get_available_credits(self) -> int:
        """Calculate credits still available under CREDIT_LIMIT."""
        usage = self.client.get_usage()
        return CREDIT_LIMIT - usage.total_credits_used
# Use in routes
nellie = NellieService()


@app.route("/books", methods=["POST"])
def create_book():
    """Thin route: all API logic is delegated to the service layer."""
    request_id = nellie.start_generation(request.json["prompt"])
    return jsonify({"request_id": request_id}), 202
Monitoring #
Log Important Events #
import logging
logger = logging.getLogger("nellie")
def generate_book(prompt: str):
    """Generate a book, logging each lifecycle event with structured `extra` fields."""
    # Plain strings (no pointless f-prefix); structured data goes in extra
    logger.info("Starting generation", extra={"prompt_length": len(prompt)})
    try:
        book = client.books.create(prompt=prompt)
        logger.info("Generation started", extra={
            "request_id": book.request_id
        })
        result = client.books.wait_for_completion(book.request_id)
        if result.is_successful():
            logger.info("Generation completed", extra={
                "request_id": result.request_id,
                "credits_used": result.credits_used
            })
        else:
            logger.error("Generation failed", extra={
                "request_id": result.request_id,
                "error": result.error
            })
    except Exception:
        # logger.exception records the traceback; re-raise for the caller
        logger.exception("Unexpected error during generation")
        raise
Track Metrics #
from prometheus_client import Counter, Histogram
# Define metrics
stories_created = Counter(
    'nellie_stories_created_total',
    'Total stories created',
    ['status']
)
generation_duration = Histogram(
    'nellie_generation_duration_seconds',
    'Time to complete generation'
)
credits_used = Counter(
    'nellie_credits_used_total',
    'Total credits consumed'
)


# Use in code
def generate_with_metrics(prompt: str):
    """Generate a book while recording Prometheus metrics."""
    # Time the create + wait span (the actual generation). NOTE(review):
    # the flattened source did not preserve nesting; incrementing the
    # counters inside or outside the timer yields the same metric values.
    with generation_duration.time():
        book = client.books.create(prompt=prompt)
        result = client.books.wait_for_completion(book.request_id)
    stories_created.labels(status=result.status).inc()
    if result.is_successful():
        # Only successful generations consume credits
        credits_used.inc(result.credits_used)
    return result
Alert on Issues #
def handle_webhook(event):
    """Alert on each failed generation and escalate when the failure rate is high."""
    if event.status == "failed":
        # Send alert for the individual failure
        send_alert(
            level="warning",
            title="Book Generation Failed",
            message=f"Request {event.request_id} failed: {event.error}"
        )
        # Track failure rate; escalate past the threshold.
        # NOTE(review): flattened source is ambiguous — this check is
        # assumed to run only for failed events; confirm intended nesting.
        if should_alert_on_failure_rate():
            send_alert(
                level="critical",
                title="High Failure Rate Detected",
                message="Book generation failure rate exceeded threshold"
            )
Cost Management #
Monitor Credit Usage #
def check_credit_budget():
    """Alert if credit usage exceeds 80% of the monthly budget."""
    usage = client.get_usage()
    MONTHLY_BUDGET = 50000
    ALERT_THRESHOLD = 0.8  # warn at 80% of budget
    if usage.total_credits_used > MONTHLY_BUDGET * ALERT_THRESHOLD:
        send_alert(
            level="warning",
            title="Credit Budget Alert",
            message=f"Used {usage.total_credits_used}/{MONTHLY_BUDGET} credits"
        )
Pre-flight Credit Check #
def can_afford_generation(model: str = "2.0") -> bool:
    """Check if we have enough credits for one generation plus a reserve.

    Unknown model names are costed at 500 credits (the most expensive tier).
    """
    COSTS = {"2.0": 250, "3.0": 500}  # credits per generation, by model
    BUFFER = 100  # Keep some reserve
    usage = client.get_usage()
    available = CREDIT_LIMIT - usage.total_credits_used
    return available >= COSTS.get(model, 500) + BUFFER
Summary Checklist #
Before Going to Production #
- [ ] API key stored in environment variable or secrets manager
- [ ] Webhook signatures are verified
- [ ] Error handling covers all exception types
- [ ] Retry logic implemented with exponential backoff
- [ ] Rate limits respected
- [ ] Webhook handlers are idempotent
- [ ] Logging captures important events
- [ ] Monitoring and alerts configured
- [ ] Credit usage tracking in place
- [ ] Health checks implemented
- Authentication — API key management
- Errors — Error handling reference
- Rate Limits — Understanding limits
- Examples — Code examples
- SDK Reference — Python SDK documentation