
9 - Beyond the Sandbox: Production AI Systems with Claude Code
Table of Contents
- Introduction
- Part 1: The Integration Maturity Model
- Part 2: Building Custom MCP Servers
- Part 3: Claude API & Programmatic Usage
- Part 4: Enterprise Security & Governance
- Part 5: Production Observability
- Part 6: Third-Party Integrations
- Part 7: Zimbra Production Deployment
- Part 8: Cost Optimization
- FAQ
- Conclusion
Introduction
From Desktop Tool to Production System
You've been using Claude Code on your laptop. Maybe you automated a few tasks. Perhaps your team started experimenting.
Now comes the hard question: "Can we put this in production?"
Here's what production means:
| Production Concern | The Reality | Impact |
|--------------------|-------------|--------|
| Security | 69% of enterprises cite AI data leaks as top concern | Data breaches, compliance violations |
| Compliance | 55% are unprepared for AI regulatory requirements | Fines, legal exposure |
| Governance | 64% lack visibility into AI risks | Uncontrolled usage, budget overruns |
| Scale | Traditional monitoring fails for probabilistic systems | Silent failures, quality degradation |
| Cost | Uncontrolled token usage blows budgets | Surprise bills, project shutdowns |
This isn't about installing Claude Code on more laptops. This is about building a Production AI System:
- Custom MCP servers connecting Claude to your data and tools
- Enterprise security with SSO, RBAC, audit logs, PII detection
- Production observability tracking latency, tokens, errors, and quality
- Programmatic automation via API, headless mode, CI/CD integration
- Third-party ecosystems (Slack, Discord, GitHub, monitoring)
- Governance frameworks meeting GDPR, HIPAA, SOC 2 requirements
The stakes: OpenAI, Google DeepMind, and Microsoft all adopted the Model Context Protocol in 2025. The ecosystem is consolidating around standards. Now is the time to build production-grade AI integrations.
By the end, you'll have:
- The Integration Maturity Model (5 levels from desktop to AI OS)
- Complete MCP server implementations (data sources, actions, workflows)
- Enterprise security framework (authentication, data protection, compliance)
- Production observability stack (metrics, logs, traces, alerts)
- Real deployment playbooks (Zimbra, Slack, GitHub, monitoring)
Let's build production AI systems.
Part 1: The Integration Maturity Model
The 5 Levels of AI System Integration
Most teams start at Level 1 and think they're done. Production systems operate at Level 4-5.
The Integration Maturity Model: Progression from individual desktop tool (Level 1) to strategic AI operating system (Level 5) with enterprise governance, observability, and ecosystem integration
Understanding the Maturity Levels
| Level | Name | Adoption | Key Indicator | Investment Required |
|-------|------|----------|---------------|---------------------|
| 1 | Basic Usage | 90% | Manual CLI conversations | Hours |
| 2 | API Integration | 8% | Automated workflows, CI/CD | Days |
| 3 | MCP Ecosystem | 1.5% | Custom servers, tool orchestration | Weeks |
| 4 | Enterprise Platform | 0.4% | SSO, RBAC, compliance | Months |
| 5 | AI Operating System | <0.1% | Full ecosystem, self-service | Quarters |
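To make the model concrete, a team can score itself against each level's key indicators. Below is a minimal self-assessment sketch; the capability names are invented for illustration, not an official rubric.

```python
# Illustrative self-assessment: map observed capabilities to a maturity level.
# Capability names are invented for this sketch, not an official rubric.
LEVEL_REQUIREMENTS = [
    (5, {"self_service_deploy", "policy_as_code", "realtime_observability"}),
    (4, {"sso", "rbac", "audit_logs"}),
    (3, {"custom_mcp_servers"}),
    (2, {"api_automation"}),
    (1, set()),  # everyone qualifies for Level 1
]

def maturity_level(capabilities: set[str]) -> int:
    """Return the highest level whose marker capabilities are all present."""
    for level, required in LEVEL_REQUIREMENTS:
        if required <= capabilities:  # subset test
            return level
    return 1

print(maturity_level({"api_automation"}))             # 2
print(maturity_level({"sso", "rbac", "audit_logs"}))  # 4
```

Note the sketch only checks each level's own markers; a stricter rubric would also require everything from the levels below.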
Level 1: Basic Usage (Desktop Tool)
Capabilities:
- Local CLI (claude)
- IDE extension (VS Code)
- Manual workflows
- Single user
Use Cases:
- Individual developers
- Learning and prototyping
- Side projects
Limitations:
- No automation
- No team coordination
- No governance
- No observability
Example:
$ claude
> "Help me debug this function"
[Manual conversation]
Who's Here: 90% of teams. You're experimenting, proving value.
Level 2: API Integration (Programmatic)
Capabilities:
- Claude API programmatic access
- Headless mode (CI/CD)
- Pre-commit hooks
- Automated workflows
Use Cases:
- Automated code reviews
- Build pipeline integration
- Scheduled tasks
Limitations:
- Point integrations (not ecosystem)
- Manual security management
- Limited observability
Example:
# Headless code review in CI
claude -p "Review PR #${PR_NUMBER}" \
--output-format stream-json \
> review.json
Who's Here: 8% of teams. You're automating specific workflows.
Level 3: MCP Ecosystem (Connected)
Capabilities:
- Custom MCP servers
- Tool integrations (GitHub, Slack, databases)
- Resource sharing (documents, APIs, knowledge bases)
- Prompt libraries
Use Cases:
- Connected workflows
- Data access across systems
- Tool orchestration
- Reusable templates
Limitations:
- Single-tenant architecture
- Limited governance controls
- Manual compliance
Example:
// Custom knowledge base MCP server
server.setRequestHandler(ListResourcesRequestSchema, async () => ({
  resources: [
    { uri: "kb://policies/security", name: "Security Policies" },
    { uri: "kb://docs/api", name: "API Documentation" }
  ]
}));
Who's Here: 1.5% of teams. You're building connected ecosystems.
Level 4: Enterprise Platform (Governed)
Capabilities:
- SSO integration (SAML/OAuth/OIDC)
- RBAC (role-based access control)
- Comprehensive audit logs
- PII/PCI detection
- Compliance frameworks (GDPR, HIPAA, SOC 2)
- Cost tracking per user/team
Use Cases:
- Team deployment at scale
- Regulated industries
- Enterprise security requirements
- Multi-tenant environments
Enterprise Security Table:
| Security Layer | Components | Compliance Coverage |
|----------------|------------|---------------------|
| Authentication | SSO, MFA, session management | GDPR, HIPAA, SOC 2 |
| Authorization | RBAC, attribute-based access | PCI DSS, SOC 2 |
| Data Protection | PII detection, encryption, residency | GDPR, HIPAA, CCPA |
| Audit & Compliance | Logs, reports, incident response | All frameworks |
Example:
# Enterprise policy enforcement
policies:
  - name: "PII Detection"
    scan_input: true
    scan_output: true
    block_on_match: true
    alert_security_team: true
  - name: "Production Access"
    resources: ["prod-db", "prod-api"]
    requires_approval: true
    approvers: ["security-team"]
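A minimal sketch of how a policy file like the one above could be enforced at request time. This is illustrative only: the policy list is hard-coded, and the PII check is a single SSN-shaped regex standing in for a real DLP engine.

```python
# Minimal policy-as-code evaluator for a policy schema like the YAML above.
# The PII "scanner" is a stand-in; a real deployment would use a proper DLP engine.
import re

POLICIES = [
    {"name": "PII Detection", "scan_input": True, "block_on_match": True},
]

# SSN-shaped strings only, for the sketch
PII_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")

def enforce(policies: list[dict], user_input: str) -> list[str]:
    """Return the names of policies that block this input."""
    blocked = []
    for policy in policies:
        if policy.get("scan_input") and PII_PATTERN.search(user_input):
            if policy.get("block_on_match"):
                blocked.append(policy["name"])
    return blocked

print(enforce(POLICIES, "My SSN is 123-45-6789"))   # ['PII Detection']
print(enforce(POLICIES, "Refactor this function"))  # []
```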
Who's Here: 0.4% of teams. You're running AI as enterprise infrastructure.
Level 5: AI Operating System (Strategic)
Capabilities:
- Full ecosystem integration (all tools, all data)
- Real-time observability (OpenTelemetry)
- Self-service deployment (platform engineering)
- Automated governance (policy as code)
- Multi-region deployment
- Cost optimization engine
- Quality gates (acceptance rates, security)
Use Cases:
- Organization-wide transformation
- AI-first development culture
- Strategic competitive advantage
Requirements:
- Platform engineering team
- Production SRE practices
- Continuous investment
Example:
# Self-service MCP server deployment
$ ai-platform deploy-mcp-server \
--name internal-kb \
--source ./kb-server \
--region us-east-1,eu-west-1 \
--auto-scale \
--compliance gdpr,soc2 \
--monitoring enabled

Deployed to 2 regions
Compliance checks passed
Monitoring dashboards created
Cost budgets configured
Who's Here: <0.1% of teams. You're building the future.
Your Journey Map
Where are you now? Level 1? Level 2?
Where do you need to be?
| Organization Type | Target Level | Timeline | Key Focus |
|-------------------|--------------|----------|-----------|
| Startup | Level 3 | 2-3 months | MCP ecosystem, velocity |
| Mid-Market | Level 4 | 3-6 months | Governance, security |
| Enterprise | Level 4-5 | 6-12 months | Full platform, compliance |
The gap: Let's bridge it.
Active Recall: Before reading further, ask yourself: Which maturity level capabilities would have the biggest impact on your team's productivity today?
Part 2: Building Custom MCP Servers
MCP Architecture Primer
Model Context Protocol (MCP): Open standard (Anthropic, November 2024) for connecting AI systems to external tools and data.
Adopted by:
- OpenAI (March 2025): ChatGPT, Agents SDK, Responses API
- Google DeepMind (April 2025): Gemini models
- Microsoft (2025): Copilot Studio
Components:
- Hosts: LLM applications (Claude Code, ChatGPT, Gemini)
- Clients: Connectors within hosts
- Servers: You build these! Provide resources, tools, prompts
Official SDKs: Python, TypeScript, C#, Java
Resources:
- Model Context Protocol Specification - Official MCP standard documentation
- MCP TypeScript SDK - Official TypeScript implementation
- MCP Python SDK - Official Python implementation
Model Context Protocol Architecture: How LLM applications (Hosts) connect through MCP clients to custom servers that provide access to data sources, APIs, and tools via JSON-RPC 2.0 protocol
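Under the hood, hosts and servers exchange JSON-RPC 2.0 messages over stdio or HTTP. Here is a sketch of a `tools/list` round trip; the method name follows the MCP specification, while the tool entry and id values are illustrative.

```python
import json

# Hypothetical JSON-RPC 2.0 round trip between an MCP client and server.
# "tools/list" is an MCP method; the tool entry itself is illustrative.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/list",
}
response = {
    "jsonrpc": "2.0",
    "id": 1,  # must match the request id
    "result": {
        "tools": [
            {"name": "search_kb", "description": "Search the knowledge base"}
        ]
    },
}

wire = json.dumps(request)  # what actually travels over stdio or HTTP
assert json.loads(wire)["method"] == "tools/list"
assert response["id"] == request["id"]
```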
The 4 MCP Server Patterns
| Pattern | Purpose | Use When | Complexity |
|---------|---------|----------|------------|
| Data Source | Expose internal data | Knowledge bases, databases | Low |
| Action Executor | Execute operations | Deployments, API calls | Medium |
| Prompt Library | Templated workflows | Code reviews, reports | Low |
| Observability | Inject monitoring | Tracing, metrics | Medium-High |
Pattern 1: Data Source Connector
Use Case: Expose internal data to Claude
Example: Company Knowledge Base Server
server.ts:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// Your internal knowledge base API
import { KnowledgeBaseAPI } from "./kb-api.js";

const kb = new KnowledgeBaseAPI({
  apiKey: process.env.KB_API_KEY,
  baseUrl: "https://kb.company.internal"
});

const server = new Server(
  {
    name: "company-kb-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      resources: {},
      tools: {},
    },
  }
);

// List available documents
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  const docs = await kb.listDocuments();
  return {
    resources: docs.map(doc => ({
      uri: `kb://docs/${doc.id}`,
      name: doc.title,
      description: doc.summary,
      mimeType: "text/plain",
    })),
  };
});

// Read document content
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const docId = request.params.uri.split("/").pop();
  const content = await kb.getDocument(docId);
  return {
    contents: [{
      uri: request.params.uri,
      mimeType: "text/plain",
      text: content.body,
    }],
  };
});

// Search functionality
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [{
    name: "search_kb",
    description: "Search company knowledge base for relevant documents",
    inputSchema: {
      type: "object",
      properties: {
        query: {
          type: "string",
          description: "Search query"
        },
        limit: {
          type: "number",
          description: "Max results (default 5)",
          default: 5
        }
      },
      required: ["query"]
    }
  }]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "search_kb") {
    const { query, limit = 5 } = request.params.arguments;
    const results = await kb.search(query, limit);
    return {
      content: [{
        type: "text",
        text: JSON.stringify(results, null, 2)
      }]
    };
  }
  throw new Error(`Unknown tool: ${request.params.name}`);
});

// Start server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Company KB MCP server running on stdio");
}

main().catch(console.error);
Configuration (.claude/mcp-servers.json):
{
  "mcpServers": {
    "company-kb": {
      "command": "node",
      "args": ["dist/server.js"],
      "env": {
        "KB_API_KEY": "${KB_API_KEY}"
      }
    }
  }
}
Usage:
$ claude
> "Search our knowledge base for API authentication best practices"
[Claude uses search_kb tool, retrieves results, summarizes findings]
Pattern 2: Action Executor
Use Case: Execute operations on behalf of Claude
Example: Deployment Automation Server
Security Checklist for Action Executors:
| Security Control | Implementation | Why Critical |
|------------------|----------------|--------------|
| Authorization | Check RBAC before execution | Prevent unauthorized deployments |
| Input Validation | Enum/pattern constraints | Prevent injection attacks |
| Timeouts | 5-minute max execution | Prevent resource exhaustion |
| Audit Logging | All attempts logged | Compliance & forensics |
| Notifications | Slack/email on completion | Team awareness |
| Rollback Support | Emergency abort capability | Disaster recovery |
deploy-server.ts:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { execSync } from "child_process";
// Assumed internal modules: your RBAC client and Slack notifier
import { rbac } from "./rbac.js";
import { slack } from "./slack.js";

const server = new Server(
  { name: "deploy-server", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

// Authorization check
async function checkDeployPermission(service: string, env: string): Promise<boolean> {
  // Query your RBAC system
  const user = process.env.CLAUDE_USER;
  const hasPermission = await rbac.check(user, `deploy:${service}:${env}`);
  return hasPermission;
}

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "deploy_staging",
      description: "Deploy service to staging environment",
      inputSchema: {
        type: "object",
        properties: {
          service: { type: "string", enum: ["api", "web", "worker"] },
          version: { type: "string", pattern: "^v\\d+\\.\\d+\\.\\d+$" }
        },
        required: ["service", "version"]
      }
    },
    {
      name: "rollback_staging",
      description: "Rollback service in staging to previous version",
      inputSchema: {
        type: "object",
        properties: {
          service: { type: "string", enum: ["api", "web", "worker"] }
        },
        required: ["service"]
      }
    }
  ]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "deploy_staging") {
    // Authorization check
    if (!await checkDeployPermission(args.service, "staging")) {
      return {
        content: [{
          type: "text",
          text: "Unauthorized: You don't have permission to deploy this service"
        }],
        isError: true
      };
    }

    // Execute deployment
    try {
      const result = execSync(
        `./scripts/deploy.sh ${args.service} ${args.version} staging`,
        { encoding: "utf-8", timeout: 300000 }
      );

      // Send notification
      await slack.notify({
        channel: "#deployments",
        text: `${args.service} ${args.version} deployed to staging by Claude`
      });

      return {
        content: [{
          type: "text",
          text: `Successfully deployed ${args.service} ${args.version} to staging\n\n${result}`
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: `Deployment failed: ${error.message}`
        }],
        isError: true
      };
    }
  }

  if (name === "rollback_staging") {
    // Similar implementation
  }

  throw new Error(`Unknown tool: ${name}`);
});
Safety Considerations:
- Authorization checks before execution
- Restricted service/environment enum
- Version validation (semantic versioning)
- Command timeout (5 minutes)
- Notification on completion
- Audit logging
Pattern 3: Prompt Library
Use Case: Provide templated workflows
Example: Code Review Templates
review-prompts-server.ts:
// Assumes a Server instance plus ListPromptsRequestSchema /
// GetPromptRequestSchema imports, as in the earlier examples
server.setRequestHandler(ListPromptsRequestSchema, async () => ({
  prompts: [
    {
      name: "code_review",
      description: "Comprehensive code review workflow",
      arguments: [
        {
          name: "pr_number",
          description: "Pull request number",
          required: true
        },
        {
          name: "focus",
          description: "Review focus area",
          required: false
        }
      ]
    },
    {
      name: "security_audit",
      description: "Security-focused code audit",
      arguments: [
        {
          name: "files",
          description: "Files to audit (glob pattern)",
          required: true
        }
      ]
    },
    {
      name: "refactor_assessment",
      description: "Assess refactoring opportunities",
      arguments: [
        {
          name: "directory",
          description: "Directory to analyze",
          required: true
        }
      ]
    }
  ]
}));

server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "code_review") {
    const pr = await github.getPR(args.pr_number);
    return {
      messages: [{
        role: "user",
        content: {
          type: "text",
          text: `Perform a comprehensive code review of PR #${args.pr_number}.

**PR Title**: ${pr.title}
**Author**: ${pr.author}
**Changed Files**: ${pr.files.length}

**Focus Areas**:
${args.focus || "- Code quality\n- Security issues\n- Performance concerns\n- Best practices"}

**Instructions**:
1. Analyze each changed file
2. Identify issues and suggest improvements
3. Highlight security vulnerabilities
4. Check for performance regressions
5. Verify test coverage

**Output Format**:
- Summary of findings
- File-by-file analysis
- Recommended actions
- Approval recommendation (Approve / Request Changes / Comment)

Begin the review:`
        }
      }]
    };
  }

  // Similar for other prompts
});
Usage:
$ claude
> "/prompts"
Available prompts:
- code_review: Comprehensive code review workflow
- security_audit: Security-focused code audit
- refactor_assessment: Assess refactoring opportunities
> "/prompt code_review pr_number=123 focus='security and performance'"
[Claude executes comprehensive review using template]
Pattern 4: Observability Bridge
Use Case: Inject monitoring into Claude workflows
Example: OpenTelemetry Integration
instrumented-server.ts:
import { trace, context, SpanStatusCode } from '@opentelemetry/api';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';

// Initialize tracing
const provider = new NodeTracerProvider();
const exporter = new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces'
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter));
provider.register();

const tracer = trace.getTracer('mcp-server', '1.0.0');

// Instrument tool calls (executeTool is your actual dispatch logic)
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const span = tracer.startSpan(`mcp.tool.${request.params.name}`, {
    attributes: {
      'mcp.tool.name': request.params.name,
      'mcp.user': process.env.CLAUDE_USER,
      'mcp.session': process.env.CLAUDE_SESSION_ID
    }
  });

  return context.with(trace.setSpan(context.active(), span), async () => {
    try {
      const startTime = Date.now();
      const result = await executeTool(request.params);

      span.setAttributes({
        'mcp.tool.success': true,
        'mcp.tool.duration_ms': Date.now() - startTime,
        'mcp.tool.result_size': JSON.stringify(result).length
      });
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setAttributes({
        'mcp.tool.success': false,
        'mcp.tool.error.type': error.constructor.name,
        'mcp.tool.error.message': error.message
      });
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: error.message
      });
      span.recordException(error);
      throw error;
    } finally {
      span.end();
    }
  });
});
Benefits:
- Distributed tracing across all Claude operations
- Tool execution metrics
- Error tracking
- Performance analysis
- User attribution
Packaging & Distribution
Docker Container (production best practice):
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
COPY dist/ ./dist/
# Run as non-root
USER node
CMD ["node", "dist/server.js"]
Deployment:
# Build
docker build -t company-kb-mcp:1.0.0 .
# Run
docker run -d \
  --name kb-mcp-server \
  -e KB_API_KEY="${KB_API_KEY}" \
  company-kb-mcp:1.0.0
Active Recall: Which MCP server pattern would solve your team's biggest integration challenge? Data Source, Action Executor, Prompt Library, or Observability?
Part 3: Claude API & Programmatic Usage
Resources:
- Claude API Documentation - Official API reference
- Claude on Amazon Bedrock - AWS integration guide
- Claude on Google Vertex AI - GCP integration guide
Authentication Methods Comparison
| Method | Best For | Data Residency | Enterprise Features |
|--------|----------|----------------|---------------------|
| Direct API | Quick start, prototyping | US | Basic |
| AWS Bedrock | Production, compliance | Configurable | SSO, VPC, audit logs |
| GCP Vertex AI | GCP-native apps | Configurable | IAM, VPC Service Controls |
Option 1: Direct API Key
import os
from anthropic import Anthropic

client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
Option 2: Amazon Bedrock
import os
from anthropic import AnthropicBedrock

client = AnthropicBedrock(
    aws_region="us-east-1",
    aws_access_key=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_key=os.environ["AWS_SECRET_ACCESS_KEY"]
)
Option 3: Google Cloud Vertex AI
from anthropic import AnthropicVertex

client = AnthropicVertex(
    region="us-central1",
    project_id="my-project"
)
Pattern 1: Headless Automation
Use Case: CI/CD pipelines, cron jobs
Example: Automated Code Review
.github/workflows/claude-review.yml:
name: Claude Code Review
on:
  pull_request:
    types: [opened, synchronize]
jobs:
  review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Claude Review
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
        run: |
          # 'json' emits one JSON object; 'stream-json' would emit newline-delimited events
          claude -p "Review PR #${{ github.event.pull_request.number }} for security issues" \
            --output-format json \
            --allowedTools read,grep,git \
            > review.json
      - name: Post Review
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const review = JSON.parse(fs.readFileSync('review.json', 'utf8'));
            await github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
              body: review.result
            });
Pattern 2: Multi-Instance Orchestration
Use Case: Parallel workflows, specialized agents
Example: Code + Tests + Review
parallel-development.sh:
#!/bin/bash
set -e
PR_BRANCH="feature/new-api"
BASE_DIR=$(pwd)
# Create separate worktrees
git worktree add /tmp/impl-${PR_BRANCH} ${PR_BRANCH}
git worktree add /tmp/tests-${PR_BRANCH} ${PR_BRANCH}
git worktree add /tmp/review-${PR_BRANCH} ${PR_BRANCH}
# Parallel execution
(
cd /tmp/impl-${PR_BRANCH}
claude -p "Implement REST API endpoint for user profile updates per SPEC.md" \
--output-format stream-json \
> impl-result.json
) &
IMPL_PID=$!
(
cd /tmp/tests-${PR_BRANCH}
claude -p "Write comprehensive tests for user profile API endpoint" \
--output-format stream-json \
> tests-result.json
) &
TESTS_PID=$!
# Wait for implementation and tests
wait $IMPL_PID $TESTS_PID
# Review after both complete
(
cd /tmp/review-${PR_BRANCH}
claude -p "Review implementation and tests for completeness and quality" \
--output-format stream-json \
> review-result.json
)
# Merge results
echo "Implementation complete"
echo "Tests complete"
echo "Review complete"
# Cleanup
git worktree remove /tmp/impl-${PR_BRANCH}
git worktree remove /tmp/tests-${PR_BRANCH}
git worktree remove /tmp/review-${PR_BRANCH}
Pattern 3: Programmatic SDK Usage
Example: Batch Code Review
batch-review.py:
#!/usr/bin/env python3
import os
import sys
from anthropic import Anthropic
from github import Github

client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
github = Github(os.environ["GITHUB_TOKEN"])

def review_file(file_path: str, content: str, context: str) -> str:
    """Review a single file using Claude"""
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": f"""Review this code file for security and quality issues.

**File**: {file_path}
**Context**: {context}

**Code**:
{content}

**Focus on**:
- Security vulnerabilities (SQL injection, XSS, etc.)
- Error handling
- Performance concerns
- Best practices violations

**Format**: Markdown with severity levels (Critical, Warning, Info)
"""
        }]
    )
    return response.content[0].text

def review_pr(repo_name: str, pr_number: int):
    """Review all files in a PR"""
    repo = github.get_repo(repo_name)
    pr = repo.get_pull(pr_number)
    print(f"Reviewing PR #{pr_number}: {pr.title}")
    reviews = []
    for file in pr.get_files():
        # Skip if too large or not code
        if file.changes > 500 or not file.filename.endswith(('.py', '.js', '.ts', '.java')):
            continue
        print(f"  Reviewing {file.filename}...")
        # Get file content
        content = repo.get_contents(file.filename, ref=pr.head.sha).decoded_content.decode()
        # Review with Claude
        review = review_file(
            file_path=file.filename,
            content=content,
            context=f"PR #{pr_number}: {pr.title}"
        )
        reviews.append({
            'file': file.filename,
            'review': review
        })
    # Post consolidated review
    review_body = "## Claude Code Review\n\n"
    for r in reviews:
        review_body += f"### {r['file']}\n\n{r['review']}\n\n---\n\n"
    pr.create_issue_comment(review_body)
    print(f"Review posted to PR #{pr_number}")

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: batch-review.py <repo> <pr_number>")
        sys.exit(1)
    review_pr(sys.argv[1], int(sys.argv[2]))
Usage:
$ python batch-review.py company/api-server 142
Reviewing PR #142: Add user authentication
  Reviewing src/auth/login.py...
  Reviewing src/auth/tokens.py...
  Reviewing tests/test_auth.py...
Review posted to PR #142
Pattern 4: Streaming Responses
Use Case: Real-time feedback for long operations
streaming-generation.py:
from anthropic import Anthropic

client = Anthropic()

def generate_with_progress(prompt: str):
    """Generate code with real-time streaming"""
    print("Generating...\n")
    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
    print("\n\nGeneration complete")

# Usage
generate_with_progress("""
Create a Python FastAPI endpoint for user registration with:
- Email validation
- Password hashing (bcrypt)
- Rate limiting
- Comprehensive error handling
- OpenAPI documentation
""")
Output:
Generating...

from fastapi import APIRouter, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, EmailStr
import bcrypt
from slowapi import Limiter
from slowapi.util import get_remote_address

router = APIRouter()
limiter = Limiter(key_func=get_remote_address)

class UserRegistration(BaseModel):
    email: EmailStr
    password: str

@router.post("/register", status_code=201)
@limiter.limit("5/minute")
async def register_user(user: UserRegistration):
    """Register a new user with email and password"""
    ...

Generation complete
Active Recall: Which programmatic pattern would accelerate your team's workflow most: Headless CI/CD, Multi-Instance Orchestration, Batch Processing, or Streaming?
Part 4: Enterprise Security & Governance
Resources:
- GDPR Compliance Guide - General Data Protection Regulation documentation
- HIPAA Security Rule - Health Insurance Portability and Accountability Act
- SOC 2 Framework - Service Organization Control 2 certification
- OWASP Top 10 for LLM Applications - Security risks for AI systems
The Security Challenge
The Data:
| Security Gap | Percentage | Risk Level |
|--------------|------------|------------|
| Lack visibility into AI risks | 64% | Critical |
| Cite AI data leaks as top concern | 69% | Critical |
| Have NO AI-specific security controls | 47% | Critical |
| Files with PII/PCI uploaded to GenAI | 40% | High |
| AI usage through unmanaged accounts | 67% | High |
The Stakes: One leaked API key, one exposed database, one PII breach = millions in fines, reputation damage, customer loss.
Enterprise Security Framework: Four comprehensive layers including authentication & authorization, data protection with PII/PCI detection, audit & compliance with GDPR/HIPAA/SOC2, and governance policies with automated incident response
The 4 Security Layers
| Layer | Purpose | Key Technologies | Compliance Impact |
|-------|---------|------------------|-------------------|
| Layer 1 | Authentication & Authorization | SSO, RBAC, MFA | Access control |
| Layer 2 | Data Protection | PII detection, encryption, residency | Privacy regulations |
| Layer 3 | Audit & Compliance | Logging, reporting, retention | Regulatory proof |
| Layer 4 | Governance | Policy-as-code, enforcement | Operational control |
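Layers 1 and 2 are detailed below. For a flavor of Layer 3, audit events are typically written as structured, append-only records. A minimal sketch with illustrative field names (not a compliance-mandated schema):

```python
import json
from datetime import datetime, timezone

def audit_event(actor: str, action: str, resource: str, allowed: bool) -> str:
    """Serialize one append-only audit record as a JSON line.

    Field names are illustrative, not a compliance-mandated schema.
    """
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "actor": actor,
        "action": action,
        "resource": resource,
        "allowed": allowed,
    }
    return json.dumps(record)

line = audit_event("dev@company.com", "deploy_staging", "api", False)
assert json.loads(line)["allowed"] is False
```

In production these lines would go to tamper-evident, retention-managed storage rather than a local file.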
Layer 1: Authentication & Authorization
SSO Integration
Example: SAML 2.0 with Okta
saml-auth.py:
import os
from flask import Flask, request, redirect, session
from onelogin.saml2.auth import OneLogin_Saml2_Auth

app = Flask(__name__)
app.secret_key = os.environ["SECRET_KEY"]

SAML_SETTINGS = {
    "sp": {
        "entityId": "https://claude.company.com",
        "assertionConsumerService": {
            "url": "https://claude.company.com/saml/acs",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
        }
    },
    "idp": {
        "entityId": "http://www.okta.com/exk123456",
        "singleSignOnService": {
            "url": "https://company.okta.com/app/exk123456/sso/saml",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
        },
        "x509cert": os.environ["OKTA_CERT"]
    }
}

@app.route("/saml/login")
def saml_login():
    # Simplified: python3-saml expects a prepared request dict, not the raw Flask request
    auth = OneLogin_Saml2_Auth(request, SAML_SETTINGS)
    return redirect(auth.login())

@app.route("/saml/acs", methods=["POST"])
def saml_acs():
    auth = OneLogin_Saml2_Auth(request, SAML_SETTINGS)
    auth.process_response()
    if not auth.is_authenticated():
        return "Authentication failed", 401
    # Store user info
    session["user"] = {
        "email": auth.get_nameid(),
        "attributes": auth.get_attributes()
    }
    # Get user roles from SAML attributes
    roles = auth.get_attributes().get("roles", [])
    session["roles"] = roles
    return redirect("/dashboard")
RBAC Implementation
Role Permission Matrix:
| Role | Permissions | Typical Users | Risk Level |
|------|-------------|---------------|------------|
| Developer | read_code, write_code, create_branch, create_pr | Engineers | Low |
| Reviewer | read_code, comment_pr, approve_pr | Tech Leads | Low |
| Admin | All permissions (*) | Platform Team | Critical |
| Security Auditor | read_code, read_logs, read_audit_trail | Security Team | Medium |
roles.yaml:
roles:
  developer:
    permissions:
      - read_code
      - write_code
      - create_branch
      - create_pr
  reviewer:
    permissions:
      - read_code
      - comment_pr
      - approve_pr
  admin:
    permissions:
      - "*"  # All permissions
  security_auditor:
    permissions:
      - read_code
      - read_logs
      - read_audit_trail
rbac.py:
import functools

import yaml
from flask import session

class RBAC:
    def __init__(self, roles_config):
        self.roles = roles_config["roles"]

    def check_permission(self, user_roles: list[str], permission: str) -> bool:
        """Check if user has required permission"""
        for role in user_roles:
            role_perms = self.roles.get(role, {}).get("permissions", [])
            # Wildcard permission
            if "*" in role_perms:
                return True
            # Exact match
            if permission in role_perms:
                return True
        return False

    def require_permission(self, permission: str):
        """Decorator to enforce permission"""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                user_roles = session.get("roles", [])
                if not self.check_permission(user_roles, permission):
                    raise PermissionError(
                        f"Permission denied: {permission} required"
                    )
                return func(*args, **kwargs)
            return wrapper
        return decorator

rbac = RBAC(yaml.safe_load(open("roles.yaml")))

@app.route("/api/code/write", methods=["POST"])
@rbac.require_permission("write_code")
def write_code():
    """Write code endpoint (requires write_code permission)"""
    # Implementation
Layer 2: Data Protection
PII/PCI Detection Patterns:
| Data Type | Pattern Example | Severity | Regulation |
|-----------|-----------------|----------|------------|
| SSN | 123-45-6789 | Critical | GDPR, HIPAA |
| Credit Card | 4532-1234-5678-9010 | Critical | PCI DSS |
| Email | user@example.com | Warning | GDPR, CCPA |
| API Key | api_key: sk_live_abc123... | Critical | All |
| AWS Key | AKIA123456789ABCDEFG | Critical | All |
| Private Key | -----BEGIN RSA PRIVATE KEY----- | Critical | All |
data-scanner.py:
import re
from typing import List, Tuple
class DataProtectionScanner:
PATTERNS = {
'ssn': {
'pattern': r'\b\d{3}-\d{2}-\d{4}\b',
'description': 'Social Security Number',
'severity': 'CRITICAL'
},
'credit_card': {
'pattern': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
'description': 'Credit Card Number',
'severity': 'CRITICAL'
},
'email': {
'pattern': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'description': 'Email Address',
'severity': 'WARNING'
},
'api_key': {
'pattern': r'(api[_-]?key|apikey|access[_-]?token)[\'"\s:=]+([a-zA-Z0-9]{32,})',
'description': 'API Key / Access Token',
'severity': 'CRITICAL'
},
'aws_key': {
'pattern': r'AKIA[0-9A-Z]{16}',
'description': 'AWS Access Key',
'severity': 'CRITICAL'
},
'private_key': {
'pattern': r'-----BEGIN (RSA|DSA|EC|OPENSSH) PRIVATE KEY-----',
'description': 'Private Key',
'severity': 'CRITICAL'
}
}
def scan(self, content: str) -> List[Tuple[str, str, str, str]]:
    """Scan content for sensitive data

    Returns: List of (type, description, severity, matched_text)
    """
findings = []
for data_type, config in self.PATTERNS.items():
matches = re.finditer(config['pattern'], content, re.IGNORECASE)
for match in matches:
findings.append((
data_type,
config['description'],
config['severity'],
match.group(0)[:50] # Truncate for logging
))
return findings
class DataProtectionError(Exception):
"""Raised when sensitive data detected"""
pass
scanner = DataProtectionScanner()
def protect_claude_input(content: str):
"""Scan input before sending to Claude"""
findings = scanner.scan(content)
critical = [f for f in findings if f[2] == 'CRITICAL']
if critical:
# Log security incident
security_log.warning(
"Sensitive data detected in Claude input",
user=session["user"]["email"],
data_types=[f[0] for f in critical]
)
# Block request
raise DataProtectionError(
f"Cannot process: {len(critical)} sensitive data patterns detected"
)
return True
# Use in API endpoint
@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
content = request.json["message"]
# Scan before processing
protect_claude_input(content)
# Safe to proceed
response = claude_client.messages.create(...)
return jsonify(response)
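A quick standalone sanity check of two of the scanner's patterns (the sample values are synthetic):

```python
import re

# Two of the detection patterns used by DataProtectionScanner above
SSN_PATTERN = r'\b\d{3}-\d{2}-\d{4}\b'
AWS_PATTERN = r'AKIA[0-9A-Z]{16}'

sample = "User note: my SSN is 123-45-6789 and key AKIA1234567890ABCDEF"
findings = []
for name, pattern in [("ssn", SSN_PATTERN), ("aws_key", AWS_PATTERN)]:
    for match in re.finditer(pattern, sample):
        findings.append((name, match.group(0)))

print(findings)  # [('ssn', '123-45-6789'), ('aws_key', 'AKIA1234567890ABCDEF')]
```

Regex scanning catches well-formed identifiers; it will not catch sensitive data that is reworded or partially redacted, which is why it is one layer among several.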
Data Residency Controls
Regional Configuration Table:
| Region | AWS Bedrock | GCP Vertex AI | Compliance |
|--------|-------------|---------------|------------|
| 🇺🇸 US | us-east-1 | us-central1 | CCPA, SOC 2 |
| 🇪🇺 EU | eu-west-1 | europe-west1 | GDPR |
| 🌏 APAC | ap-southeast-1 | asia-southeast1 | Local regulations |
claude-client.py:
class EnterpriseClaudeClient:
REGION_CONFIGS = {
'us': {
'bedrock_region': 'us-east-1',
'vertex_region': 'us-central1',
'data_residency': 'United States'
},
'eu': {
'bedrock_region': 'eu-west-1',
'vertex_region': 'europe-west1',
'data_residency': 'European Union'
},
'apac': {
'bedrock_region': 'ap-southeast-1',
'vertex_region': 'asia-southeast1',
'data_residency': 'Asia Pacific'
}
}
def __init__(self, data_residency: str):
if data_residency not in self.REGION_CONFIGS:
raise ValueError(f"Invalid data residency: {data_residency}")
config = self.REGION_CONFIGS[data_residency]
# Use AWS Bedrock for data residency
from anthropic import AnthropicBedrock
self.client = AnthropicBedrock(
aws_region=config['bedrock_region']
)
self.data_residency = config['data_residency']
def complete(self, **kwargs):
"""Ensure data stays in configured region"""
# Add metadata for audit
kwargs.setdefault('metadata', {})
kwargs['metadata']['data_residency'] = self.data_residency
return self.client.messages.create(**kwargs)
# Usage based on user location
user_country = session["user"]["country"]
if user_country in ["DE", "FR", "IT", "ES"]:
client = EnterpriseClaudeClient("eu")
elif user_country in ["US", "CA"]:
client = EnterpriseClaudeClient("us")
else:
client = EnterpriseClaudeClient("apac")
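The routing above can be factored into a small pure function, which is easier to test in isolation (the country lists are an illustrative subset, not a complete mapping):

```python
EU_COUNTRIES = {"DE", "FR", "IT", "ES"}  # illustrative subset, not exhaustive
NA_COUNTRIES = {"US", "CA"}

def select_residency(country_code: str) -> str:
    """Map a user's ISO country code to a data-residency region key."""
    if country_code in EU_COUNTRIES:
        return "eu"
    if country_code in NA_COUNTRIES:
        return "us"
    return "apac"

print(select_residency("DE"))  # eu
print(select_residency("US"))  # us
print(select_residency("JP"))  # apac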
Layer 3: Audit & Compliance 📋
Comprehensive Audit Logging
Audit Event Types:
| Event Type | Information Captured | Retention | Purpose |
|-----------|---------------------|-----------|---------|
| 🔐 Authentication | User, timestamp, IP, success/fail | 1 year | Security forensics |
| 💬 Chat Request | User, message length, sensitive data flags | 1 year | Compliance audit |
| 📊 Chat Response | Model, tokens, cost, latency | 1 year | Cost & performance |
| ⚠️ Security Incident | Type, severity, user, action taken | 7 years | Regulatory requirement |
| 🔧 Configuration Change | User, before/after, timestamp | 3 years | Change management |
audit-log.py:
import json
from datetime import datetime
from typing import Dict, Any
class AuditLogger:
def __init__(self, log_path: str = "/var/log/claude/audit.jsonl"):
self.log_path = log_path
def log(self, event_type: str, **kwargs):
"""Log audit event"""
event = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"event_type": event_type,
**kwargs
}
with open(self.log_path, "a") as f:
f.write(json.dumps(event) + "\n")
audit = AuditLogger()
# Log all Claude operations
@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
user = session["user"]
message = request.json["message"]
# Scan for sensitive data
findings = scanner.scan(message)
# Create audit event
audit.log(
event_type="claude_chat_request",
user_email=user["email"],
user_roles=user["roles"],
message_length=len(message),
sensitive_data_detected=len(findings) > 0,
sensitive_data_types=[f[0] for f in findings] if findings else [],
ip_address=request.remote_addr,
user_agent=request.headers.get("User-Agent")
)
    # Process request (time the call ourselves; the SDK response carries no timing fields)
    start_time = time.time()
    response = claude_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=[{"role": "user", "content": message}]
    )
    # Log response
    audit.log(
        event_type="claude_chat_response",
        user_email=user["email"],
        model=response.model,
        tokens_used=response.usage.input_tokens + response.usage.output_tokens,
        cost_usd=calculate_cost(response.usage),
        latency_ms=(time.time() - start_time) * 1000
    )
return jsonify(response)
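Several snippets in this chapter call a `calculate_cost` helper (with slightly varying signatures) that is never defined. A minimal sketch taking explicit token counts; the per-million-token prices below are illustrative placeholders, so check current Anthropic pricing before relying on them:

```python
# Illustrative prices in USD per million tokens -- placeholder values, NOT authoritative
PRICES = {
    "claude-sonnet-4-5-20250929": {"input": 3.00, "output": 15.00},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate request cost from token counts and a per-model price table."""
    price = PRICES[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000

cost = calculate_cost("claude-sonnet-4-5-20250929", 1000, 500)
print(round(cost, 6))  # 0.0105
```

Keeping the price table in one place makes it easy to update when model pricing changes.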
Compliance Report Generation
compliance-report.py:
def generate_compliance_report(start_date, end_date):
"""Generate compliance report for audit period"""
# Read audit logs
events = []
with open("/var/log/claude/audit.jsonl") as f:
for line in f:
event = json.loads(line)
if start_date <= event["timestamp"] <= end_date:
events.append(event)
# Calculate metrics
total_requests = len([e for e in events if e["event_type"] == "claude_chat_request"])
sensitive_data_blocked = len([e for e in events
if e.get("sensitive_data_detected")])
unique_users = len(set(e["user_email"] for e in events if "user_email" in e))
total_tokens = sum(e.get("tokens_used", 0) for e in events
if e["event_type"] == "claude_chat_response")
total_cost = sum(e.get("cost_usd", 0) for e in events
if e["event_type"] == "claude_chat_response")
report = {
"period": {
"start": start_date,
"end": end_date
},
"usage": {
"total_requests": total_requests,
"unique_users": unique_users,
"total_tokens": total_tokens,
"total_cost_usd": round(total_cost, 2)
},
"security": {
"sensitive_data_incidents": sensitive_data_blocked,
"incident_rate": round(sensitive_data_blocked / total_requests * 100, 2) if total_requests else 0.0
},
"compliance": {
"audit_logs_complete": True,
"data_residency_enforced": True,
"access_controls_enabled": True,
"encryption_at_rest": True,
"encryption_in_transit": True
}
}
return report
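The incident-rate arithmetic can be sanity-checked against a handful of synthetic audit events:

```python
# Synthetic audit events to exercise the report arithmetic
events = [
    {"event_type": "claude_chat_request", "sensitive_data_detected": False},
    {"event_type": "claude_chat_request", "sensitive_data_detected": True},
    {"event_type": "claude_chat_request", "sensitive_data_detected": False},
    {"event_type": "claude_chat_request", "sensitive_data_detected": True},
]

total_requests = len([e for e in events if e["event_type"] == "claude_chat_request"])
blocked = len([e for e in events if e.get("sensitive_data_detected")])
incident_rate = round(blocked / total_requests * 100, 2) if total_requests else 0.0

print(incident_rate)  # 50.0
```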
Layer 4: Governance Policies 📜
Policy-as-Code
.claude/policies/enterprise.yaml:
version: "1.0"
security:
pii_detection:
enabled: true
block_on_match: true
alert_security_team: true
patterns:
- ssn
- credit_card
- api_key
- private_key
data_residency:
enforce: true
allowed_regions:
- us-east-1 # US users
- eu-west-1 # EU users
access_control:
authentication:
required: true
methods:
- saml
- oauth2
authorization:
model: rbac
roles_file: roles.yaml
usage_limits:
per_user:
requests_per_hour: 100
tokens_per_day: 1000000
per_team:
requests_per_hour: 1000
cost_per_month_usd: 10000
content_filters:
block_patterns:
- pattern: "prod|production"
scope: "database_queries"
message: "Cannot access production databases"
- pattern: "DROP TABLE|DELETE FROM"
scope: "sql"
message: "Destructive SQL not allowed"
audit:
retention_days: 365
export_format: jsonl
compliance_frameworks:
- gdpr
- hipaa
- soc2
Policy Enforcement
Usage Limit Enforcement Table:
| Limit Type | Threshold | Action | Reset Period |
|-----------|----------|--------|--------------|
| ⏱️ Hourly Requests | 100/user | Block request | 1 hour rolling |
| 🪙 Daily Tokens | 1M/user | Block request | Midnight UTC |
| 💰 Monthly Cost | $10K/team | Alert + soft limit | Monthly |
| 🚨 Critical Pattern | 1 match | Block + alert security | N/A |
policy-engine.py:
import yaml
class PolicyEngine:
def __init__(self, policy_file: str):
with open(policy_file) as f:
self.policy = yaml.safe_load(f)
def enforce_usage_limits(self, user_email: str):
"""Check if user within usage limits"""
limits = self.policy["usage_limits"]["per_user"]
# Check hourly requests
recent_requests = audit.count_events(
event_type="claude_chat_request",
user_email=user_email,
since=datetime.now() - timedelta(hours=1)
)
if recent_requests >= limits["requests_per_hour"]:
raise RateLimitError(
f"Hourly limit exceeded: {recent_requests}/{limits['requests_per_hour']}"
)
# Check daily tokens
today_tokens = audit.sum_field(
event_type="claude_chat_response",
user_email=user_email,
field="tokens_used",
since=datetime.now().replace(hour=0, minute=0, second=0)
)
if today_tokens >= limits["tokens_per_day"]:
raise RateLimitError(
f"Daily token limit exceeded: {today_tokens}/{limits['tokens_per_day']}"
)
return True
def check_content_filters(self, content: str, scope: str):
"""Apply content filters"""
filters = self.policy.get("content_filters", {}).get("block_patterns", [])
for rule in filters:  # `rule` avoids shadowing the builtin `filter`
    if rule["scope"] != scope:
        continue
    if re.search(rule["pattern"], content, re.IGNORECASE):
        raise ContentFilterError(rule["message"])
return True
policy = PolicyEngine(".claude/policies/enterprise.yaml")
@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
user = session["user"]
message = request.json["message"]
# Enforce policies
policy.enforce_usage_limits(user["email"])
policy.check_content_filters(message, "general")
# Process request
...
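The content-filter check reduces to a regex scan over scoped rules. A self-contained sketch mirroring the `block_patterns` from enterprise.yaml (using `ValueError` in place of the custom `ContentFilterError`):

```python
import re

# Rules mirroring the block_patterns section of enterprise.yaml
BLOCK_PATTERNS = [
    {"pattern": r"DROP TABLE|DELETE FROM", "scope": "sql",
     "message": "Destructive SQL not allowed"},
]

def check_content_filters(content: str, scope: str) -> None:
    """Raise if content matches a blocked pattern for this scope."""
    for rule in BLOCK_PATTERNS:
        if rule["scope"] == scope and re.search(rule["pattern"], content, re.IGNORECASE):
            raise ValueError(rule["message"])

check_content_filters("SELECT * FROM users", "sql")  # passes silently
try:
    check_content_filters("drop table users;", "sql")
except ValueError as e:
    print(e)  # Destructive SQL not allowed
```

Scoping rules to a context (`sql`, `database_queries`, ...) keeps a pattern like `prod` from blocking unrelated conversation.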
💡 Active Recall: If you had to choose only ONE security layer to implement first, which would protect your organization most: Authentication, Data Protection, Audit Logging, or Governance Policies?
Part 5: Production Observability
Resources:
- OpenTelemetry Documentation - Observability framework standard
- Prometheus Monitoring - Metrics collection and alerting
- Grafana Dashboards - Visualization and analytics platform
- Jaeger Distributed Tracing - End-to-end distributed tracing
The Observability Challenge
Why AI is Different:
| Traditional Systems | AI Systems | Implication |
|--------------------|-----------|-------------|
| ✅ Deterministic outputs | ❌ Probabilistic outputs | Same input ≠ same output |
| ✅ Simple dependencies | ❌ Complex dependencies | Models + tools + data sources |
| ✅ Predictable latency | ❌ Variable latency | Token generation is non-deterministic |
| ✅ Uptime metrics | ❌ Quality metrics | Need acceptance rate, not just uptime |
Traditional monitoring is insufficient.
Production Observability Stack: Complete monitoring architecture from application instrumentation with OpenTelemetry through collection, storage (Prometheus/Loki/Jaeger), and visualization with Grafana dashboards and alerting
Essential Metrics to Track
Performance Metrics Hierarchy:
| Category | Metric | Why It Matters | SLA Target |
|----------|--------|---------------|-----------|
| 📊 Volume | claude_requests_total | Usage trends, capacity planning | N/A |
| ⏱️ Latency | claude_latency_seconds (p50/p95/p99) | User experience | p95 < 5s |
| 🪙 Tokens | claude_tokens_used_total | Cost driver | Track trends |
| 💰 Cost | claude_cost_usd_total | Budget management | < $50K/month |
| ✅ Quality | claude_acceptance_rate | Output usefulness | > 70% |
| ❌ Errors | claude_error_rate | Reliability | < 1% |
Implementation:
from prometheus_client import Counter, Histogram, Gauge
# Request metrics
claude_requests_total = Counter(
'claude_requests_total',
'Total Claude API requests',
['model', 'status', 'user']
)
claude_latency_seconds = Histogram(
'claude_latency_seconds',
'Claude API request latency',
['model'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
claude_tokens_used = Counter(
'claude_tokens_used_total',
'Total tokens used',
['model', 'user', 'team']
)
# Quality metrics
claude_acceptance_rate = Gauge(
'claude_acceptance_rate',
'Percentage of Claude suggestions accepted',
['feature']
)
claude_error_rate = Gauge(
'claude_error_rate',
'Percentage of requests resulting in errors',
['error_type']
)
# Cost metrics
claude_cost_usd = Counter(
'claude_cost_usd_total',
'Total cost in USD',
['model', 'team']
)
Usage:
@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
user = session["user"]
start_time = time.time()
try:
response = claude_client.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=4096,
messages=[{"role": "user", "content": request.json["message"]}]
)
# Record metrics
claude_requests_total.labels(
model=response.model,
status="success",
user=user["email"]
).inc()
latency = time.time() - start_time
claude_latency_seconds.labels(model=response.model).observe(latency)
claude_tokens_used.labels(
model=response.model,
user=user["email"],
team=user["team"]
).inc(response.usage.input_tokens + response.usage.output_tokens)
cost = calculate_cost(response.model, response.usage.input_tokens + response.usage.output_tokens)
claude_cost_usd.labels(
model=response.model,
team=user["team"]
).inc(cost)
return jsonify(response)
except Exception as error:
claude_requests_total.labels(
model="claude-sonnet-4-5-20250929",
status="error",
user=user["email"]
).inc()
raise
OpenTelemetry Integration
Complete Distributed Tracing
otel-setup.py:
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource
import os
# Configure resource
resource = Resource.create({
"service.name": "claude-integration",
"service.version": "1.0.0",
"deployment.environment": os.environ.get("ENV", "production")
})
# Tracing
trace_provider = TracerProvider(resource=resource)
trace_exporter = OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")
trace_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
trace.set_tracer_provider(trace_provider)
# Metrics
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint="http://localhost:4318/v1/metrics")
)
metric_provider = MeterProvider(
resource=resource,
metric_readers=[metric_reader]
)
metrics.set_meter_provider(metric_provider)
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
Instrumented Client:
class ObservableClaudeClient:
def __init__(self, client):
self.client = client
self.tracer = trace.get_tracer(__name__)
self.meter = metrics.get_meter(__name__)
# Metrics
self.request_counter = self.meter.create_counter(
"claude.requests",
description="Number of Claude API requests"
)
self.token_counter = self.meter.create_counter(
"claude.tokens",
description="Total tokens used"
)
self.latency_histogram = self.meter.create_histogram(
"claude.latency",
unit="ms",
description="Request latency in milliseconds"
)
def messages_create(self, **kwargs):
"""Instrumented messages.create"""
span = self.tracer.start_span(
"claude.messages.create",
attributes={
"claude.model": kwargs.get("model"),
"claude.max_tokens": kwargs.get("max_tokens"),
"claude.user": kwargs.get("metadata", {}).get("user_id")
}
)
start_time = time.time()
try:
response = self.client.messages.create(**kwargs)
# Record metrics
self.request_counter.add(1, {
"model": response.model,
"status": "success"
})
# The Anthropic Usage object exposes input_tokens and output_tokens; sum them for a total
self.token_counter.add(
    response.usage.input_tokens + response.usage.output_tokens,
    {"model": response.model, "type": "total"}
)
self.token_counter.add(response.usage.input_tokens, {
"model": response.model,
"type": "input"
})
self.token_counter.add(response.usage.output_tokens, {
"model": response.model,
"type": "output"
})
latency_ms = (time.time() - start_time) * 1000
self.latency_histogram.record(latency_ms, {
"model": response.model
})
# Add to span
span.set_attributes({
"claude.response.id": response.id,
"claude.response.model": response.model,
"claude.response.stop_reason": response.stop_reason,
"claude.usage.input_tokens": response.usage.input_tokens,
"claude.usage.output_tokens": response.usage.output_tokens,
"claude.usage.total_tokens": response.usage.input_tokens + response.usage.output_tokens,
"claude.latency_ms": latency_ms
})
span.set_status(trace.Status(trace.StatusCode.OK))
return response
except Exception as error:
self.request_counter.add(1, {
"model": kwargs.get("model"),
"status": "error"
})
span.record_exception(error)
span.set_status(trace.Status(
trace.StatusCode.ERROR,
str(error)
))
raise
finally:
span.end()
# Usage
observable_client = ObservableClaudeClient(claude_client)
response = observable_client.messages_create(...)
Alerting Rules 🚨
Alert Severity Levels:
| Severity | Condition | Response Time | Escalation |
|----------|-----------|--------------|------------|
| 🟢 Info | < 1% error rate | Review daily | None |
| 🟡 Warning | 5-20% error rate, p95 > 10s | Review within 1 hour | After 4 hours |
| 🔴 Critical | > 20% error rate, budget exceeded | Immediate (page) | After 15 minutes |
prometheus-alerts.yml:
groups:
- name: claude_integration
interval: 30s
rules:
# Error rate alerts
- alert: HighErrorRate
expr: |
rate(claude_requests_total{status="error"}[5m])
/ rate(claude_requests_total[5m]) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "Claude API error rate above 5%"
description: "{{ $value | humanizePercentage }} of requests failing"
- alert: CriticalErrorRate
expr: |
rate(claude_requests_total{status="error"}[5m])
/ rate(claude_requests_total[5m]) > 0.20
for: 2m
labels:
severity: critical
annotations:
summary: "Claude API error rate above 20%"
# Latency alerts
- alert: HighLatency
expr: |
histogram_quantile(0.95,
rate(claude_latency_seconds_bucket[5m])
) > 10
for: 10m
labels:
severity: warning
annotations:
summary: "Claude API p95 latency above 10 seconds"
- alert: CriticalLatency
expr: |
histogram_quantile(0.95,
rate(claude_latency_seconds_bucket[5m])
) > 30
for: 5m
labels:
severity: critical
annotations:
summary: "Claude API p95 latency above 30 seconds"
# Cost alerts
- alert: CostAnomaly
expr: |
increase(claude_cost_usd_total[1h]) > 1000
labels:
severity: warning
annotations:
summary: "Abnormal cost spike detected"
description: "${{ $value }} spent in last hour"
- alert: MonthlyBudgetExceeded
expr: |
sum(increase(claude_cost_usd_total[30d])) > 50000
labels:
severity: critical
annotations:
summary: "Monthly budget of $50,000 exceeded"
# Quality alerts
- alert: LowAcceptanceRate
expr: |
claude_acceptance_rate < 0.50
for: 1h
labels:
severity: warning
annotations:
summary: "Claude suggestion acceptance rate below 50%"
# Usage alerts
- alert: UnusualTokenUsage
expr: |
increase(claude_tokens_used_total[1h])
> 4 * avg_over_time(increase(claude_tokens_used_total[1h])[7d:1h])
labels:
severity: warning
annotations:
summary: "Token usage 4x higher than weekly average"
Dashboards 📊
Key Dashboard Panels:
| Panel | Metric | Visualization | Update Frequency |
|-------|--------|---------------|------------------|
| 📈 Request Rate | rate(claude_requests_total[5m]) | Time series | 5 seconds |
| ⏱️ Latency | histogram_quantile(0.95, ...) | Time series (p50/p95/p99) | 5 seconds |
| 🪙 Token Usage | sum by (team) (rate(tokens[1h])) | Stacked area chart | 1 minute |
| 💰 Cost | sum(increase(cost[30d])) | Gauge + trend | 5 minutes |
| ✅ Acceptance Rate | claude_acceptance_rate | Gauge by feature | 1 minute |
Grafana Dashboard (JSON export):
{
"title": "Claude Code Production",
"panels": [
{
"title": "Request Rate",
"targets": [{
"expr": "rate(claude_requests_total[5m])",
"legendFormat": "{{ status }}"
}]
},
{
"title": "Latency (p50, p95, p99)",
"targets": [
{
"expr": "histogram_quantile(0.50, rate(claude_latency_seconds_bucket[5m]))",
"legendFormat": "p50"
},
{
"expr": "histogram_quantile(0.95, rate(claude_latency_seconds_bucket[5m]))",
"legendFormat": "p95"
},
{
"expr": "histogram_quantile(0.99, rate(claude_latency_seconds_bucket[5m]))",
"legendFormat": "p99"
}
]
},
{
"title": "Token Usage by Team",
"targets": [{
"expr": "sum by (team) (rate(claude_tokens_used_total[1h]))",
"legendFormat": "{{ team }}"
}]
},
{
"title": "Cost (Last 30 Days)",
"targets": [{
"expr": "sum(increase(claude_cost_usd_total[30d]))",
"legendFormat": "Total"
}]
},
{
"title": "Acceptance Rate by Feature",
"targets": [{
"expr": "claude_acceptance_rate",
"legendFormat": "{{ feature }}"
}]
}
]
}
💡 Active Recall: If your Claude integration started failing right now, which metrics would you check FIRST to diagnose the issue?
Part 6: Third-Party Integrations
Slack Integration 💬
Pattern: Bot with Claude Backend
slack-bot.py:
from slack_bolt import App
from anthropic import Anthropic
import os
app = App(token=os.environ["SLACK_BOT_TOKEN"])
claude = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
@app.event("app_mention")
def handle_mention(event, say):
"""Respond to @mentions"""
# Extract question
question = event["text"].split(">", 1)[1].strip()
# Get Claude response
response = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"Answer this Slack question concisely:\n\n{question}"
}],
metadata={
"user_id": event["user"],
"channel_id": event["channel"]
}
)
# Reply in thread
say(
text=response.content[0].text,
thread_ts=event.get("thread_ts", event["ts"])
)
@app.command("/claude")
def handle_slash_command(ack, command, respond):
"""Handle /claude slash command"""
ack() # Acknowledge immediately
# Get Claude response
response = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{
"role": "user",
"content": command["text"]
}]
)
# Respond (only visible to user)
respond(response.content[0].text)
if __name__ == "__main__":
app.start(port=3000)
Pattern: Webhook Alerts
slack-alerts.py:
import os
import time

import requests
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]
def alert_security_team(incident: dict):
"""Alert security team of potential issue"""
requests.post(
SLACK_WEBHOOK_URL,
json={
"text": f"🚨 Security Alert: {incident['type']}",
"attachments": [{
"color": "danger",
"fields": [
{
"title": "User",
"value": incident["user_email"],
"short": True
},
{
"title": "Severity",
"value": incident["severity"],
"short": True
},
{
"title": "Details",
"value": incident["description"],
"short": False
},
{
"title": "Action Required",
"value": incident["action"],
"short": False
}
],
"footer": "Claude Security Monitor",
"ts": int(time.time())
}]
}
)
# Use in data protection scanner
def protect_claude_input(content: str):
findings = scanner.scan(content)
if findings:
alert_security_team({
"type": "Sensitive Data Detected",
"user_email": session["user"]["email"],
"severity": "HIGH",
"description": f"Detected {len(findings)} sensitive data patterns",
"action": "Request blocked. Review user activity."
})
raise DataProtectionError("Sensitive data detected")
GitHub Integration 🐙
Automated PR Review Bot
github-bot.py:
from github import Github
from anthropic import Anthropic
import hmac
import hashlib
import os
github_client = Github(os.environ["GITHUB_TOKEN"])
claude = Anthropic()
@app.route("/github/webhook", methods=["POST"])
def github_webhook():
"""Handle GitHub webhook events"""
# Verify signature
signature = request.headers.get("X-Hub-Signature-256")
if not verify_github_signature(request.data, signature):
return "Invalid signature", 401
event = request.headers.get("X-GitHub-Event")
payload = request.json
if event == "pull_request":
if payload["action"] in ["opened", "synchronize"]:
# New or updated PR
review_pr(payload["pull_request"])
return "OK", 200
def verify_github_signature(payload: bytes, signature: str) -> bool:
"""Verify GitHub webhook signature"""
secret = os.environ["GITHUB_WEBHOOK_SECRET"].encode()
expected = "sha256=" + hmac.new(secret, payload, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature)
def review_pr(pr_data: dict):
"""Review PR with Claude"""
repo = github_client.get_repo(pr_data["base"]["repo"]["full_name"])
pr = repo.get_pull(pr_data["number"])
# Get changed files
files = pr.get_files()
reviews = []
for file in files:
# Skip large files
if file.changes > 500:
continue
# Get file content
try:
content = repo.get_contents(
file.filename,
ref=pr.head.sha
).decoded_content.decode()
except Exception:
continue # File deleted or binary
# Review with Claude
review = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"""Review this code change for security and quality:
**File**: {file.filename}
**Changes**: +{file.additions} -{file.deletions}
{file.patch}
Focus on:
- Security vulnerabilities
- Error handling
- Performance issues
- Code quality
Format: Markdown list of issues with severity (๐ด/๐ก/๐ข)"""
}]
)
reviews.append({
"file": file.filename,
"review": review.content[0].text
})
# Post consolidated review
review_body = "## ๐ค Claude Code Review\n\n"
for r in reviews:
review_body += f"### {r['file']}\n\n{r['review']}\n\n"
review_body += "\n---\n*Automated review by Claude Code*"
pr.create_issue_comment(review_body)
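The webhook handler's HMAC check can be exercised end-to-end by computing a signature the same way GitHub does (the secret and payload below are arbitrary test values):

```python
import hmac
import hashlib

def sign(payload: bytes, secret: bytes) -> str:
    """Compute the X-Hub-Signature-256 header value GitHub would send."""
    return "sha256=" + hmac.new(secret, payload, hashlib.sha256).hexdigest()

def verify(payload: bytes, signature: str, secret: bytes) -> bool:
    """Constant-time comparison, as in verify_github_signature above."""
    return hmac.compare_digest(sign(payload, secret), signature)

secret = b"test-webhook-secret"
payload = b'{"action": "opened"}'
sig = sign(payload, secret)

print(verify(payload, sig, secret))                  # True
print(verify(b'{"action": "closed"}', sig, secret))  # False
```

`hmac.compare_digest` matters here: a naive `==` comparison leaks timing information an attacker can exploit.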
Discord Integration 🎮
discord-bot.py:
import discord
from anthropic import Anthropic
import os
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
claude = Anthropic()
@client.event
async def on_ready():
print(f"Logged in as {client.user}")
@client.event
async def on_message(message):
# Ignore own messages
if message.author == client.user:
return
# Respond to !claude command
if message.content.startswith("!claude"):
prompt = message.content[len("!claude"):].strip()
if not prompt:
await message.channel.send("Usage: `!claude <question>`")
return
# Show typing indicator
async with message.channel.typing():
response = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{
"role": "user",
"content": prompt
}],
metadata={
"user_id": str(message.author.id),
"channel_id": str(message.channel.id),
"guild_id": str(message.guild.id) if message.guild else None
}
)
# Split response if too long (Discord limit: 2000 chars)
content = response.content[0].text
if len(content) <= 2000:
await message.channel.send(content)
else:
# Split into chunks
chunks = [content[i:i+1900] for i in range(0, len(content), 1900)]
for chunk in chunks:
await message.channel.send(chunk)
client.run(os.environ["DISCORD_BOT_TOKEN"])
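The chunking logic above can be isolated and tested; a 1900-character chunk size leaves headroom under Discord's 2000-character message limit:

```python
def chunk_message(content: str, size: int = 1900) -> list:
    """Split a long message into Discord-safe chunks."""
    return [content[i:i + size] for i in range(0, len(content), size)]

long_text = "x" * 4000
chunks = chunk_message(long_text)
print([len(c) for c in chunks])  # [1900, 1900, 200]
```

Note that a naive character split can cut a word or a code fence in half; splitting on the last newline before the limit is a friendlier refinement.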
Integration Comparison Table:
| Platform | Best For | Complexity | User Experience |
|----------|---------|-----------|-----------------|
| 💬 Slack | Team collaboration, alerts | Low | Familiar, thread-based |
| 🐙 GitHub | Code review, PR automation | Medium | Integrated with workflow |
| 🎮 Discord | Developer communities | Low | Real-time, casual |
| 📧 Email | Reports, scheduled digests | Low | Universal reach |
Part 7: Zimbra Production Deployment
Three Deployment Patterns: Single-tenant (dedicated per customer), Multi-tenant Shared (row-level security), and Federated Hybrid (data residency control for GDPR/HIPAA compliance with on-premise, cloud, and edge gateways)
Deployment Pattern Selection
| Pattern | Use When | Data Isolation | Cost | Compliance |
|---------|----------|----------------|------|------------|
| 🏢 Single-Tenant | Regulated industries, large customers | ✅ Complete | 💰💰💰 High | ✅ Maximum |
| 🏭 Multi-Tenant | SaaS, startups, cost optimization | ⚠️ Row-level security | 💰 Low | ⚠️ Requires design |
| 🌐 Federated Hybrid | Global enterprises, data residency | ✅ Geographic | 💰💰 Medium | ✅ Flexible |
Use Case 1: Custom MCP Server for Zimbra Admin 🔧
Example: Zimbra Operations Server
zimbra-mcp-server.ts:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { execSync } from "child_process";
const server = new Server(
{ name: "zimbra-admin-server", version: "1.0.0" },
{ capabilities: { tools: {}, resources: {} } }
);
// List mailboxes resource
server.setRequestHandler(ListResourcesRequestSchema, async () => ({
resources: [
{
uri: "zimbra://mailboxes",
name: "All Mailboxes",
description: "List of all Zimbra mailboxes",
mimeType: "application/json"
},
{
uri: "zimbra://domains",
name: "All Domains",
description: "List of all Zimbra domains",
mimeType: "application/json"
}
]
}));
// Read mailbox list
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
if (request.params.uri === "zimbra://mailboxes") {
const output = execSync("zmprov -l gaa", { encoding: "utf-8" });
const mailboxes = output.trim().split("\n");
return {
contents: [{
uri: request.params.uri,
mimeType: "application/json",
text: JSON.stringify(mailboxes, null, 2)
}]
};
}
});
// Admin tools
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: "check_mailbox_health",
description: "Check health status of a mailbox",
inputSchema: {
type: "object",
properties: {
email: { type: "string", format: "email" }
},
required: ["email"]
}
},
{
name: "get_mailbox_quota",
description: "Get quota information for a mailbox",
inputSchema: {
type: "object",
properties: {
email: { type: "string", format: "email" }
},
required: ["email"]
}
},
{
name: "search_logs",
description: "Search Zimbra logs for patterns",
inputSchema: {
type: "object",
properties: {
pattern: { type: "string" },
log_file: {
type: "string",
enum: ["mailbox.log", "audit.log", "sync.log"]
},
lines: { type: "number", default: 100 }
},
required: ["pattern", "log_file"]
}
}
]
}));
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  // Validate inputs before interpolating them into shell commands (injection risk)
  if (typeof args.email === "string" && !/^[\w.+-]+@[\w.-]+$/.test(args.email)) {
    throw new Error(`Invalid email: ${args.email}`);
  }
if (name === "check_mailbox_health") {
try {
const output = execSync(
`zmmailbox -z -m ${args.email} gaf`,
{ encoding: "utf-8", timeout: 30000 }
);
return {
content: [{
type: "text",
          text: `✅ Mailbox ${args.email} is healthy\n\nFolders:\n${output}`
}]
};
} catch (error) {
return {
content: [{
type: "text",
          text: `❌ Health check failed: ${error.message}`
}],
isError: true
};
}
}
if (name === "get_mailbox_quota") {
const output = execSync(
`zmprov gmi ${args.email} zimbraMailQuota`,
{ encoding: "utf-8" }
);
const quota = parseInt(output.match(/zimbraMailQuota: (\d+)/)?.[1] || "0");
const used = execSync(
`zmmailbox -z -m ${args.email} gms`,
{ encoding: "utf-8" }
);
return {
content: [{
type: "text",
text: `Quota for ${args.email}:\n- Limit: ${(quota / 1024 / 1024).toFixed(2)} MB\n- Used: ${used}`
}]
};
}
if (name === "search_logs") {
const logPath = `/opt/zimbra/log/${args.log_file}`;
const output = execSync(
`grep -i "${args.pattern}" ${logPath} | tail -n ${args.lines}`,
{ encoding: "utf-8" }
);
return {
content: [{
type: "text",
text: output || `No matches found for "${args.pattern}"`
}]
};
}
throw new Error(`Unknown tool: ${name}`);
});
Use Case 2: Automated Zimbra Monitoring 📊
zimbra-monitor.py:
#!/usr/bin/env python3
import subprocess
import json
import os
import time

import requests
from anthropic import Anthropic
claude = Anthropic()
def check_zimbra_health() -> dict:
"""Collect Zimbra health metrics"""
health = {}
# Service status
try:
status_output = subprocess.check_output(
["zmcontrol", "status"],
encoding="utf-8"
)
health["services"] = parse_service_status(status_output)
except Exception as e:
health["services"] = {"error": str(e)}
# Mailbox store usage
try:
df_output = subprocess.check_output(
["df", "-h", "/opt/zimbra/store"],
encoding="utf-8"
)
health["storage"] = parse_storage_status(df_output)
except Exception as e:
health["storage"] = {"error": str(e)}
# Recent errors in log
try:
errors = subprocess.check_output(
["grep", "-i", "error", "/opt/zimbra/log/mailbox.log"],
encoding="utf-8"
).strip().split("\n")[-10:] # Last 10 errors
health["recent_errors"] = errors
except Exception:
health["recent_errors"] = []
return health
def analyze_with_claude(health_data: dict) -> dict:
"""Analyze health data with Claude"""
response = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"""Analyze this Zimbra server health data and provide recommendations:
```json
{json.dumps(health_data, indent=2)}
```

Provide:
- Overall health assessment (Healthy/Warning/Critical)
- Issues identified
- Recommended actions
- Priority (High/Medium/Low)

Format as JSON with keys: overall_health, issues, recommended_actions, priority. Output only the JSON object."""
        }]
    )
return json.loads(response.content[0].text)
def alert_if_needed(analysis: dict):
    """Send alert if issues detected"""
if analysis["overall_health"] in ["Warning", "Critical"]:
# Send to Slack
requests.post(
os.environ["SLACK_WEBHOOK_URL"],
json={
"text": f"โ ๏ธ Zimbra Health Alert: {analysis['overall_health']}",
"attachments": [{
"color": "warning" if analysis["overall_health"] == "Warning" else "danger",
"fields": [
{
"title": "Issues",
"value": "\n".join(analysis["issues"]),
"short": False
},
{
"title": "Recommended Actions",
"value": "\n".join(analysis["recommended_actions"]),
"short": False
}
]
}]
}
)
# Main monitoring loop
if __name__ == "__main__":
    while True:
        print("Checking Zimbra health...")
        health = check_zimbra_health()
        analysis = analyze_with_claude(health)
        alert_if_needed(analysis)
        print(f"Health: {analysis['overall_health']}")
        # Run every 5 minutes
        time.sleep(300)
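`parse_service_status` and `parse_storage_status` are referenced above but never defined. Minimal sketches might look like the following, assuming `zmcontrol status` prints `service Running|Stopped` lines and `df -h` prints its usual header plus one data line (both are assumptions about your environment):

```python
def parse_service_status(output: str) -> dict:
    """Map each `zmcontrol status` service line to its reported state."""
    services = {}
    for line in output.splitlines():
        parts = line.split()
        if len(parts) >= 2 and parts[-1] in ("Running", "Stopped"):
            services[" ".join(parts[:-1])] = parts[-1]
    return services

def parse_storage_status(output: str) -> dict:
    """Pull the usage fields from `df -h`'s data line."""
    lines = output.strip().splitlines()
    if len(lines) < 2:
        return {}
    fs, size, used, avail, pct = lines[1].split()[:5]
    return {"filesystem": fs, "size": size, "used": used,
            "available": avail, "use_percent": pct}
```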
Use Case 3: Slack Integration for Zimbra Alerts ๐ฌ
zimbra-slack-bot.py:
from slack_bolt import App
from anthropic import Anthropic
import subprocess
import os

app = App(token=os.environ["SLACK_BOT_TOKEN"])
claude = Anthropic()
@app.command("/zimbra")
def handle_zimbra_command(ack, command, respond):
"""Handle /zimbra slash command"""
ack() # Acknowledge immediately
action = command["text"].strip()
if action == "status":
# Get Zimbra status
status = subprocess.check_output(
["zmcontrol", "status"],
encoding="utf-8"
)
respond(f"```\n{status}\n```")
elif action.startswith("diagnose "):
email = action.split(" ", 1)[1]
# Diagnose with Claude
response = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"Diagnose issues for Zimbra mailbox {email}. Check logs and provide troubleshooting steps."
}]
)
respond(response.content[0].text)
else:
respond("Usage: `/zimbra status` or `/zimbra diagnose <email>`")
if __name__ == "__main__":
app.start(port=3000)
๐ก Active Recall: For a Zimbra production deployment, which monitoring approach would catch issues fastest: MCP server tools, automated health checks, or Slack bot commands?
Part 8: Cost Optimization
Cost Optimization Cycle: Continuous process from monitoring API calls and token usage, analyzing high-cost drivers, implementing optimization strategies (prompt engineering, model selection, caching, batching), validating results with A/B testing, and achieving 40-60% cost reduction
The Cost Challenge
Claude Sonnet 4.5 Pricing (as of 2025):
| Component | Cost per Million Tokens | Typical Ratio | |-----------|------------------------|---------------| | ๐ฅ Input | $3.00 | 60-70% of total | | ๐ค Output | $15.00 | 30-40% of total |
Example Monthly Costs:
- 100 users × 100K tokens/day = 10M tokens/day; depending on the model mix and input/output ratio, that can run roughly $450-2,250/day, or $13,500-67,500/month
Optimization is critical.
The 5 Cost Optimization Strategies
| Strategy | Savings Potential | Complexity | Implementation Time | |----------|------------------|------------|---------------------| | ๐พ Caching | Up to 90% on repeated context | Low | 1 day | | ๐ฏ Model Selection | 30-60% with smart routing | Medium | 3-5 days | | ๐ซ Token Budgets | Prevents overruns | Low | 1 day | | ๐ก Response Streaming | 10-20% early stopping | Medium | 2-3 days | | ๐ Monitoring & Alerts | Prevents waste | Low | 2 days |
Strategy 1: Caching ๐พ
Prompt Caching (Anthropic feature):
# Without caching
response1 = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"{large_context}\n\nQuestion 1"
}]
)
# Cost: Full input tokens
response2 = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"{large_context}\n\nQuestion 2"
}]
)
# Cost: Full input tokens again!
# With caching
response1 = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
system=[{
"type": "text",
"text": large_context,
"cache_control": {"type": "ephemeral"}
}],
messages=[{
"role": "user",
"content": "Question 1"
}]
)
# Cost: Full input tokens (first time)
response2 = claude.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
system=[{
"type": "text",
"text": large_context,
"cache_control": {"type": "ephemeral"}
}],
messages=[{
"role": "user",
"content": "Question 2"
}]
)
# Cost: Cached input tokens (90% cheaper!)
Savings: Up to 90% on repeated context
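To make that 90% figure concrete, here's a back-of-the-envelope model. The 1.25× cache-write and 0.1× cache-read multipliers reflect Anthropic's published prompt-caching pricing at the time of writing; treat them as assumptions to verify against current pricing:

```python
INPUT_PRICE = 3.00        # $/M input tokens (Sonnet)
CACHE_WRITE_MULT = 1.25   # cache writes cost ~25% more than plain input
CACHE_READ_MULT = 0.10    # cache reads cost ~10% of plain input

def cost_without_cache(context_tokens: int, calls: int) -> float:
    """Every call pays full price for the shared context."""
    return calls * context_tokens / 1e6 * INPUT_PRICE

def cost_with_cache(context_tokens: int, calls: int) -> float:
    """First call writes the cache; the remaining calls read it."""
    write = context_tokens / 1e6 * INPUT_PRICE * CACHE_WRITE_MULT
    reads = (calls - 1) * context_tokens / 1e6 * INPUT_PRICE * CACHE_READ_MULT
    return write + reads

# A 50K-token context reused across 100 calls:
print(f"${cost_without_cache(50_000, 100):.2f}")  # → $15.00
print(f"${cost_with_cache(50_000, 100):.2f}")     # → $1.67
```

That's roughly an 89% reduction on the context portion of input costs, which is where the "up to 90%" claim comes from.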
Strategy 2: Model Selection ๐ฏ
Model Comparison Table:
| Model | Cost (Input/Output) | Speed | Best For | |-------|-------------------|-------|----------| | ๐ Haiku | $0.25/$1.25 per M | โกโกโก Fast | Simple tasks, high volume | | โ๏ธ Sonnet | $3/$15 per M | โกโก Medium | Most tasks, balanced | | ๐ง Opus | $15/$75 per M | โก Slow | Complex reasoning, critical |
Choose the right model:
from anthropic import Anthropic

class SmartClaudeClient:
    def __init__(self):
        self.client = Anthropic()

    def route_request(self, prompt: str, complexity: str = "auto"):
        """Route to appropriate model based on complexity"""
        if complexity == "auto":
            # Heuristic: short, self-described simple prompts go to Haiku;
            # long or self-described complex prompts go to Opus
            if len(prompt) < 200 and "simple" in prompt.lower():
                model = "claude-haiku-4-20250409"
            elif "complex" in prompt.lower() or len(prompt) > 2000:
                model = "claude-opus-4-20250514"
            else:
                model = "claude-sonnet-4-5-20250929"
        else:
            model = {
                "simple": "claude-haiku-4-20250409",
                "standard": "claude-sonnet-4-5-20250929",
                "complex": "claude-opus-4-20250514"
            }[complexity]
        return self.client.messages.create(
            model=model,
            max_tokens=1024,  # required by the Messages API
            messages=[{"role": "user", "content": prompt}]
        )
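To see where the 30-60% savings figure comes from, a blended-cost back-of-the-envelope helps. The 60/40 traffic split is an assumption; prices are the input rates from the table above:

```python
# Input-token prices per million tokens, from the comparison table
SONNET_IN = 3.00
HAIKU_IN = 0.25

# Assume 60% of requests are simple enough to route to Haiku
sonnet_only = 1.0 * SONNET_IN
routed = 0.6 * HAIKU_IN + 0.4 * SONNET_IN

print(round((1 - routed / sonnet_only) * 100))  # → 55 (% saved on input costs)
```

The actual savings depend entirely on how much of your traffic is genuinely simple, which is why the range in the strategy table is wide.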
Strategy 3: Token Budgets ๐ซ
Per-User Limits:
from datetime import date

class BudgetExceededError(Exception):
    """Raised when a user exceeds their daily token budget"""

class TokenBudgetEnforcer:
    def __init__(self, redis_client):
        self.redis = redis_client

    def check_budget(self, user_id: str, tokens_requested: int) -> bool:
        """Check if user is within their daily token budget"""
        key = f"token_budget:{user_id}:{date.today()}"
        used = int(self.redis.get(key) or 0)
        limit = 100000  # 100K tokens/day
        if used + tokens_requested > limit:
            raise BudgetExceededError(
                f"Daily budget exceeded: {used}/{limit} tokens used"
            )
        return True

    def record_usage(self, user_id: str, tokens_used: int):
        """Record token usage"""
        key = f"token_budget:{user_id}:{date.today()}"
        self.redis.incr(key, tokens_used)
        self.redis.expire(key, 86400)  # Expire after 24h
enforcer = TokenBudgetEnforcer(redis_client)
@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
user_id = session["user"]["id"]
    # Check budget (estimated_tokens is a rough pre-call estimate)
    enforcer.check_budget(user_id, estimated_tokens)
    # Process request
    response = claude.messages.create(...)
    # Record actual usage (the API reports input and output tokens separately)
    enforcer.record_usage(
        user_id,
        response.usage.input_tokens + response.usage.output_tokens
    )
    return jsonify(response)
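The `estimated_tokens` value has to come from somewhere before the API call. A crude heuristic of roughly 4 characters per token for English prose (an approximation, not the real tokenizer) is often good enough for budget gating:

```python
def estimate_tokens(text: str, chars_per_token: float = 4.0) -> int:
    """Crude pre-call token estimate (~4 chars/token for English prose)."""
    return int(len(text) / chars_per_token) + 1  # +1 errs on the high side
```

For exact counts, count tokens after the fact from `response.usage` and reconcile the budget there.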
Strategy 4: Response Streaming ๐ก
Stop when enough:
def stream_with_early_stop(prompt: str, max_quality_tokens: int = 500):
    """Stream response, stopping once enough complete content has arrived"""
    with claude.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        chunks = 0
        accumulated = ""
        for text in stream.text_stream:
            accumulated += text
            chunks += 1  # streamed chunks are a rough proxy for tokens
            # Stop at a sentence boundary once we have enough content;
            # leaving the with-block closes the stream
            if chunks > max_quality_tokens and accumulated.endswith((".", "!", "?")):
                break
    return accumulated
Strategy 5: Monitoring & Alerts ๐
Cost Dashboard:
@app.route("/admin/costs")
def cost_dashboard():
"""Real-time cost dashboard"""
# Today's costs
today_tokens = sum_tokens_today()
today_cost = calculate_cost(today_tokens)
# This month
month_tokens = sum_tokens_month()
month_cost = calculate_cost(month_tokens)
# Projected monthly cost
days_elapsed = date.today().day
days_in_month = calendar.monthrange(date.today().year, date.today().month)[1]
projected_cost = (month_cost / days_elapsed) * days_in_month
# Top users
top_users = get_top_users_by_cost(limit=10)
return render_template("costs.html",
today_cost=today_cost,
month_cost=month_cost,
projected_cost=projected_cost,
budget=50000, # $50K/month
top_users=top_users
)
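The run-rate projection in the dashboard is simple enough to sanity-check by hand; a standalone version of the same arithmetic:

```python
import calendar
from datetime import date

def project_month_cost(month_to_date_cost: float, today: date) -> float:
    """Linear run-rate projection: spend so far, scaled to the full month."""
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    return month_to_date_cost / today.day * days_in_month

# $12,000 spent by June 10 of a 30-day month:
print(project_month_cost(12_000, date(2025, 6, 10)))  # → 36000.0
```

Note that a linear projection assumes steady daily usage; weekday-heavy traffic will overshoot early in a week and undershoot after a weekend.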
Alert when approaching budget:
import time

_last_budget_check = 0.0

@app.before_request
def check_monthly_budget():
    """Alert when monthly spend approaches budget (checked at most hourly)"""
    global _last_budget_check
    # flask.g resets on every request, so memoize with a module-level timestamp
    if time.time() - _last_budget_check > 3600:
        _last_budget_check = time.time()
        month_cost = calculate_month_cost()
        budget = 50000  # $50K
        if month_cost > budget * 0.90:
            alert_admin(
                f"⚠️ 90% of monthly budget used: ${month_cost:,.2f} / ${budget:,.2f}"
            )
Cost Optimization Checklist:
| Optimization | Before | After | Savings | Status | |--------------|--------|-------|---------|--------| | โ Prompt caching | $20K/month | $4K/month | 80% | Implemented | | โ Model routing | $15K/month | $8K/month | 47% | Implemented | | โ Token budgets | Uncontrolled | $50K cap | N/A | Enforced | | โฑ๏ธ Early stopping | Not used | Testing | TBD | In progress | | โ Cost alerts | None | Real-time | N/A | Active |
๐ก Active Recall: If you could only implement TWO cost optimization strategies today, which combination would yield the highest ROI: Caching + Model Selection, Token Budgets + Monitoring, or Streaming + Alerts?
FAQ
How do I choose between building a custom MCP server vs using the Claude API directly?
Decision Matrix:
| Factor | MCP Server | Claude API Direct | |--------|-----------|------------------| | ๐๏ธ Data access needs | โ Complex, multiple sources | โ ๏ธ Simple, single source | | ๐ฅ Team usage | โ Multi-user, shared resources | โ ๏ธ Single user, scripts | | ๐ Reusability | โ Across sessions, discoverable | โ Per-script basis | | โ๏ธ Tool orchestration | โ Multiple tools, workflows | โ ๏ธ Single purpose | | ๐ Time to value | โ ๏ธ Days to build | โ Hours to script |
Use MCP server when:
- You need to expose internal data/tools to Claude
- You want reusable resources across sessions
- You're building for team/multi-user access
- You want standardized, discoverable capabilities
Use API directly when:
- Simple automation scripts
- One-off tasks
- No need for persistent resources
- Programmatic control over every aspect
Often: Use both! API for orchestration, MCP for capabilities.
What's the minimum security setup for production?
Essential Security Stack (Priority Order):
| Priority | Control | Implementation Time | Risk if Missing | |----------|---------|-------------------|----------------| | ๐ด P0 | Authentication (SSO) | 1 week | ๐ด Critical | | ๐ด P0 | PII detection | 3 days | ๐ด Critical | | ๐ก P1 | RBAC | 1 week | ๐ก High | | ๐ก P1 | Audit logging | 3 days | ๐ก High | | ๐ข P2 | Data residency | 5 days | ๐ข Medium |
Essential:
- โ Authentication (SSO preferred)
- โ RBAC (role-based access control)
- โ PII detection (block sensitive data)
- โ Audit logging (comprehensive)
- โ Data residency (comply with regulations)
Start here, then add:
- Usage limits
- Cost tracking
- Observability
- Governance policies
How much should I budget for Claude Code in production?
Formula:
Monthly Cost = Users × Tokens/User/Day × 30 days × Token Price
Example (100 users averaging ~100K input and ~100K output tokens/day each, mostly Sonnet):
100 users × 100K tokens/day × 30 days = 300M tokens/month (per side)
300M input tokens × $3 / 1M = $900
300M output tokens × $15 / 1M = $4,500
Total: ~$5,400/month
Add a 30% buffer for spikes → ~$7,000/month
Budgeting by Team Size:
| Team Size | Tokens/Month | Estimated Cost | Per-User Cost | |-----------|-------------|---------------|--------------| | 10 users | 30M | $1,000-1,500 | $100-150 | | 50 users | 150M | $5,000-7,500 | $100-150 | | 100 users | 300M | $10,000-15,000 | $100-150 | | 500 users | 1.5B | $50,000-75,000 | $100-150 |
Rule of thumb: $50-150 per active user per month
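The formula and the 30% buffer can be wrapped in a small estimator. The prices default to the Sonnet figures from Part 8, and the function itself is illustrative, not part of any SDK:

```python
def estimate_monthly_cost(users: int,
                          input_tokens_per_day: int,
                          output_tokens_per_day: int,
                          input_price: float = 3.00,    # $/M tokens (Sonnet)
                          output_price: float = 15.00,  # $/M tokens (Sonnet)
                          buffer: float = 0.30) -> float:
    """Estimate monthly Claude spend, with a spike buffer on top."""
    month_in = users * input_tokens_per_day * 30 / 1e6    # M input tokens/month
    month_out = users * output_tokens_per_day * 30 / 1e6  # M output tokens/month
    base = month_in * input_price + month_out * output_price
    return base * (1 + buffer)

# The example above: 100 users at ~100K input + 100K output tokens/day each
print(round(estimate_monthly_cost(100, 100_000, 100_000)))  # → 7020
```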
How do I measure ROI?
ROI Calculation Framework:
Track:
- Time saved: Features delivered ร hours saved per feature
- Quality improvement: Bugs reduced, test coverage increased
- Cost: Claude API + engineering time
Example:
- 50 features/month × 4 hours saved = 200 hours saved
- 200 hours × $100/hour = $20,000 value
- Cost: $7,000 (Claude) + $5,000 (eng time) = $12,000
- ROI: 67% return
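The same arithmetic as a reusable function (the function name and parameters are illustrative):

```python
def roi(features_per_month: int, hours_saved_per_feature: float,
        hourly_rate: float, api_cost: float, eng_cost: float) -> float:
    """Net return on investment as a fraction: (value - cost) / cost."""
    value = features_per_month * hours_saved_per_feature * hourly_rate
    cost = api_cost + eng_cost
    return (value - cost) / cost

# The example above: 50 features, 4h saved each, $100/h, $7K API + $5K eng time
print(round(roi(50, 4, 100, 7000, 5000) * 100))  # → 67 (% return)
```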
ROI Metrics Table:
| Metric | Measurement Method | Target | |--------|-------------------|--------| | โฑ๏ธ Time Savings | Features/month ร hours saved | >100 hours/month | | ๐ Bug Reduction | Bugs filed pre/post deployment | -30% bugs | | โ Test Coverage | Lines covered by tests | +20% coverage | | ๐ Velocity | Story points/sprint | +25% velocity | | ๐ฐ Net Value | (Time saved ร rate) - costs | >50% ROI |
Can I use Claude Code for regulated industries (healthcare, finance)?
Yes, but you need:
Compliance Requirements by Industry:
| Industry | Key Requirements | Claude Setup | |----------|-----------------|-------------| | ๐ฅ Healthcare (HIPAA) | BAA, PHI protection, audit trails | AWS Bedrock + encryption | | ๐ณ Finance (PCI DSS) | PCI compliance, data segregation | VPC + dedicated instances | | ๐ฆ Banking (GLBA) | Financial privacy, audit logs | Private deployment + RBAC | | ๐ช๐บ EU Operations (GDPR) | Data residency, right to erasure | EU regions only |
- โ BAA (Business Associate Agreement) for HIPAA
- โ Data residency controls (EU, US, etc.)
- โ Audit trails (comprehensive logging)
- โ Access controls (RBAC, SOC 2)
- โ Encryption (at rest, in transit)
Best: Use AWS Bedrock or GCP Vertex AI for compliance features built-in.
How do I handle MCP server failures gracefully?
Pattern: Circuit breaker + fallback
class MCPServerWrapper {
private failureCount = 0;
private maxFailures = 3;
private cooldownMs = 60000;
private lastFailureTime = 0;
async callTool(name: string, args: any) {
// Check circuit breaker
if (this.failureCount >= this.maxFailures) {
const elapsed = Date.now() - this.lastFailureTime;
if (elapsed < this.cooldownMs) {
throw new Error("Circuit breaker open");
}
// Reset after cooldown
this.failureCount = 0;
}
try {
const result = await this.server.callTool(name, args);
this.failureCount = 0; // Reset on success
return result;
} catch (error) {
this.failureCount++;
this.lastFailureTime = Date.now();
// Log for monitoring
logger.error("MCP tool call failed", { name, error });
throw error;
}
}
}
Resilience Patterns:
| Pattern | Purpose | Tradeoff | |---------|---------|----------| | ๐ Circuit Breaker | Stop cascading failures | Temporary unavailability | | โฑ๏ธ Timeout | Prevent hanging | May cut off slow requests | | ๐ Retry with Backoff | Handle transient failures | Increased latency | | ๐ Graceful Degradation | Partial functionality | Reduced capability |
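The circuit breaker above is in TypeScript; the retry-with-backoff pattern from the table can be sketched just as compactly. This is a generic wrapper, not tied to any particular MCP SDK, and the delays are examples:

```python
import random
import time

def call_with_backoff(fn, *args, retries=3, base_delay=0.5, **kwargs):
    """Retry transient failures with exponential backoff plus jitter."""
    for attempt in range(retries + 1):
        try:
            return fn(*args, **kwargs)
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            # 0.5s, 1s, 2s, ... plus jitter to avoid thundering herds
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))
```

Combine it with the circuit breaker: retry inside the breaker so that repeated exhausted retries trip the circuit and stop hammering a down server.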
What's the best way to test MCP servers before production?
Three-stage testing:
Testing Pyramid:
| Stage | Coverage | Tools | Time Investment | |-------|----------|-------|-----------------| | ๐ฌ Unit Tests | Server logic, handlers | Jest, pytest | 2-3 days | | ๐ Integration Tests | With Claude CLI | Manual testing | 1-2 days | | ๐ Load Tests | Production simulation | k6, Locust | 2-3 days |
- Unit tests (server logic):
describe("Knowledge Base Server", () => {
it("should list resources", async () => {
const result = await server.handle(ListResourcesRequest);
expect(result.resources).toHaveLength(10);
});
it("should handle missing resources", async () => {
await expect(
server.handle(ReadResourceRequest, { uri: "kb://invalid" })
).rejects.toThrow("Resource not found");
});
});
- Integration tests (with Claude):
# Test via Claude CLI
$ claude
> "List resources from knowledge-base server"
[Verify output]
> "Search knowledge base for 'API authentication'"
[Verify results]
- Load tests (production simulation):
# Load test MCP server
import asyncio
async def load_test():
tasks = [
server.call_tool("search_kb", {"query": f"query {i}"})
for i in range(100)
]
await asyncio.gather(*tasks)
# Measure: latency, error rate, resource usage
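The comment above says to measure latency; one way to get percentiles out of the same asyncio pattern. The stub `fake_tool` stands in for a real MCP tool call:

```python
import asyncio
import statistics
import time

async def timed_call(coro_fn, *args):
    """Await the call and return its wall-clock latency in seconds."""
    start = time.perf_counter()
    await coro_fn(*args)
    return time.perf_counter() - start

async def load_test(call, n=100):
    """Fire n concurrent calls and summarize latency percentiles."""
    latencies = await asyncio.gather(*(timed_call(call, i) for i in range(n)))
    return {
        "p50": statistics.median(latencies),
        "p95": statistics.quantiles(latencies, n=20)[-1],  # ~95th percentile
    }

# Stub standing in for a real MCP tool call
async def fake_tool(i):
    await asyncio.sleep(0.01)

print(asyncio.run(load_test(fake_tool)))
```

Error rate falls out of the same loop by catching exceptions in `timed_call`; resource usage needs external tooling (e.g. container metrics).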
Conclusion
You've Built a Production AI System
Not a desktop tool. Not a side project. A production system.
โ The Integration Maturity Model: Progressed from Level 1 (desktop) to Level 4-5 (enterprise platform)
โ Custom MCP Servers: Built data connectors, action executors, prompt libraries, observability bridges
โ Enterprise Security: Implemented SSO, RBAC, PII detection, audit logging, compliance frameworks
โ Production Observability: Deployed OpenTelemetry, Prometheus metrics, Grafana dashboards, alerting
โ Third-Party Integrations: Connected Slack, Discord, GitHub with automated workflows
โ Zimbra Production: Real enterprise deployment with monitoring, alerts, and automation
โ Cost Optimization: Caching, model selection, token budgets, monitoring, alerting
But here's what matters most: You understand that AI in production isn't about using cooler models.
It's about:
- ๐ Security (protecting data, meeting compliance)
- ๐๏ธ Observability (knowing what's happening)
- ๐ Governance (controlling costs, enforcing policies)
- ๐ก๏ธ Reliability (handling failures gracefully)
- ๐ Scale (serving hundreds or thousands of users)
The Series Complete
This is Post 9โthe finale of our Claude Code journey:
| Post | Focus | Achievement | |------|-------|-------------| | 1 | Introduction | Discovered Claude Code | | 2 | Setup | Installed and configured | | 3 | Basics | First commands and workflows | | 4 | Advanced | Power user techniques | | 5 | Automation | Hooks, scripts, CI/CD | | 6 | MCP & Extensions | Plugins and integrations | | 7 | Troubleshooting | Resilience engineering | | 8 | Workflows | Team productivity | | 9 | Production | Enterprise deployment โ You are here |
From zero to production in 9 posts.
What's Next?
Journey Recommendations by Current Level:
| Your Current Level | Next Steps | Timeline | |-------------------|-----------|----------| | ๐ข Level 1-2 | Focus on automation (Post 5) and workflows (Post 8) | 1-2 months | | ๐ก Level 3 | Build first MCP server (this post), deploy observability (Part 5) | 2-3 months | | ๐ด Level 4 | Implement complete governance framework, achieve Level 5 | 3-6 months |
For everyone: The ecosystem is moving fast. OpenAI, Google, Microsoft all adopted MCP in 2025. The time to build production AI systems is now.
The Challenge
Your mission: Deploy Claude Code to production following this framework.
30-Day Production Deployment Plan:
| Week | Focus | Deliverables | Success Criteria | |------|-------|-------------|-----------------| | 1๏ธโฃ | MCP Foundation | Data connector server | Claude can access internal data | | 2๏ธโฃ | Security | SSO, RBAC, PII detection | All access authenticated + authorized | | 3๏ธโฃ | Observability | Metrics, logs, traces, alerts | Full visibility into operations | | 4๏ธโฃ | Integration & Optimization | Slack/GitHub, cost controls | Team workflows automated |
After 30 days: You'll have a production AI system serving your team with enterprise security, complete observability, and controlled costs.
Then ask: What can we build that wasn't possible before?
For resources and continued learning, visit Claude Code Docs, explore MCP Specification, and join the community on GitHub.
๐ Previous: Workflow Engineering
P.S. Remember: 64% of organizations lack visibility into AI risks. 47% have no AI-specific security controls. Don't be in that majority. Build production systems the right wayโsecure, observable, governed. Your future self (and your security team) will thank you.