9 - Beyond the Sandbox: Production AI Systems with Claude Code

November 06, 2025

Introduction

From Desktop Tool to Production System

You've been using Claude Code on your laptop. Maybe you automated a few tasks. Perhaps your team started experimenting.

Now comes the hard question: "Can we put this in production?"

Here's what production means:

| Production Concern | The Reality | Impact |
|--------------------|-------------|--------|
| 🔐 Security | 69% of enterprises cite AI data leaks as top concern | Data breaches, compliance violations |
| 📋 Compliance | 55% are unprepared for AI regulatory requirements | Fines, legal exposure |
| 👁️ Governance | 64% lack visibility into AI risks | Uncontrolled usage, budget overruns |
| 📊 Scale | Traditional monitoring fails for probabilistic systems | Silent failures, quality degradation |
| 💰 Cost | Uncontrolled token usage blows budgets | Surprise bills, project shutdowns |

This isn't about installing Claude Code on more laptops. This is about building a Production AI System:

  • ⚙️ Custom MCP servers connecting Claude to your data and tools
  • 🔒 Enterprise security with SSO, RBAC, audit logs, PII detection
  • 📈 Production observability tracking latency, tokens, errors, and quality
  • 🤖 Programmatic automation via API, headless mode, CI/CD integration
  • 🔗 Third-party ecosystems (Slack, Discord, GitHub, monitoring)
  • 📜 Governance frameworks meeting GDPR, HIPAA, SOC 2 requirements

The stakes: OpenAI, Google DeepMind, and Microsoft all adopted the Model Context Protocol in 2025. The ecosystem is consolidating around standards. Now is the time to build production-grade AI integrations.

By the end, you'll have:

  • 📊 The Integration Maturity Model (5 levels from desktop to AI OS)
  • 🏗️ Complete MCP server implementations (data sources, actions, workflows)
  • 🛡️ Enterprise security framework (authentication, data protection, compliance)
  • 🔍 Production observability stack (metrics, logs, traces, alerts)
  • 🚀 Real deployment playbooks (Zimbra, Slack, GitHub, monitoring)

Let's build production AI systems.


Part 1: The Integration Maturity Model

The 5 Levels of AI System Integration

Most teams start at Level 1 and think they're done. Production systems operate at Level 4-5.

*Figure: The Integration Maturity Model — progression from individual desktop tool (Level 1) to strategic AI operating system (Level 5) with enterprise governance, observability, and ecosystem integration.*

Understanding the Maturity Levels

| Level | Name | Adoption | Key Indicator | Investment Required |
|-------|------|----------|---------------|---------------------|
| 1️⃣ | Basic Usage | 90% | Manual CLI conversations | Hours |
| 2️⃣ | API Integration | 8% | Automated workflows, CI/CD | Days |
| 3️⃣ | MCP Ecosystem | 1.5% | Custom servers, tool orchestration | Weeks |
| 4️⃣ | Enterprise Platform | 0.4% | SSO, RBAC, compliance | Months |
| 5️⃣ | AI Operating System | <0.1% | Full ecosystem, self-service | Quarters |

Level 1: Basic Usage (Desktop Tool) 🖥️

Capabilities:

  • Local CLI (claude)
  • IDE extension (VS Code)
  • Manual workflows
  • Single user

Use Cases:

  • Individual developers
  • Learning and prototyping
  • Side projects

Limitations:

  • โŒ No automation
  • โŒ No team coordination
  • โŒ No governance
  • โŒ No observability

Example:

$ claude
> "Help me debug this function"
[Manual conversation]

Who's Here: 90% of teams. You're experimenting, proving value.

Level 2: API Integration (Programmatic) 🔌

Capabilities:

  • Claude API programmatic access
  • Headless mode (CI/CD)
  • Pre-commit hooks
  • Automated workflows

Use Cases:

  • Automated code reviews
  • Build pipeline integration
  • Scheduled tasks

Limitations:

  • โŒ Point integrations (not ecosystem)
  • โŒ Manual security management
  • โŒ Limited observability

Example:

# Headless code review in CI
claude -p "Review PR #${PR_NUMBER}" \
  --output-format stream-json \
  > review.json

Who's Here: 8% of teams. You're automating specific workflows.

Level 3: MCP Ecosystem (Connected) 🔗

Capabilities:

  • Custom MCP servers
  • Tool integrations (GitHub, Slack, databases)
  • Resource sharing (documents, APIs, knowledge bases)
  • Prompt libraries

Use Cases:

  • Connected workflows
  • Data access across systems
  • Tool orchestration
  • Reusable templates

Limitations:

  • โŒ Single-tenant architecture
  • โŒ Limited governance controls
  • โŒ Manual compliance

Example:

// Custom knowledge base MCP server
server.setRequestHandler(ListResourcesRequestSchema, async () => ({
  resources: [
    { uri: "kb://policies/security", name: "Security Policies" },
    { uri: "kb://docs/api", name: "API Documentation" }
  ]
}));

Who's Here: 1.5% of teams. You're building connected ecosystems.

Level 4: Enterprise Platform (Governed) 🏢

Capabilities:

  • SSO integration (SAML/OAuth/OIDC)
  • RBAC (role-based access control)
  • Comprehensive audit logs
  • PII/PCI detection
  • Compliance frameworks (GDPR, HIPAA, SOC 2)
  • Cost tracking per user/team

Use Cases:

  • Team deployment at scale
  • Regulated industries
  • Enterprise security requirements
  • Multi-tenant environments

Enterprise Security Table:

| Security Layer | Components | Compliance Coverage |
|----------------|------------|---------------------|
| Authentication | SSO, MFA, session management | GDPR, HIPAA, SOC 2 |
| Authorization | RBAC, attribute-based access | PCI DSS, SOC 2 |
| Data Protection | PII detection, encryption, residency | GDPR, HIPAA, CCPA |
| Audit & Compliance | Logs, reports, incident response | All frameworks |

Example:

# Enterprise policy enforcement
policies:
  - name: "PII Detection"
    scan_input: true
    scan_output: true
    block_on_match: true
    alert_security_team: true

  - name: "Production Access"
    resources: ["prod-db", "prod-api"]
    requires_approval: true
    approvers: ["security-team"]

Who's Here: 0.4% of teams. You're running AI as enterprise infrastructure.
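One Level 4 capability is easy to prototype ahead of the rest: per-user cost tracking. A minimal sketch (the per-million-token rates below are placeholders, not published pricing; substitute your provider's actual rates):

```python
from collections import defaultdict

# Placeholder per-million-token rates (USD); replace with real pricing
RATES = {"input": 3.00, "output": 15.00}

class CostTracker:
    """Accumulate estimated spend per user from token counts."""

    def __init__(self):
        self.spend = defaultdict(float)

    def record(self, user: str, input_tokens: int, output_tokens: int) -> float:
        # Convert token counts to dollars using per-million rates
        cost = (input_tokens * RATES["input"] + output_tokens * RATES["output"]) / 1_000_000
        self.spend[user] += cost
        return cost

    def over_budget(self, user: str, budget_usd: float) -> bool:
        return self.spend[user] > budget_usd

tracker = CostTracker()
tracker.record("alice", input_tokens=120_000, output_tokens=8_000)
print(f"alice: ${tracker.spend['alice']:.2f}")  # alice: $0.48
```

The same accumulator can feed a budget alert or a per-team report; the hard part in practice is attributing token counts to users, which is where the SSO identity from Level 4 comes in.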

Level 5: AI Operating System (Strategic) 🌐

Capabilities:

  • Full ecosystem integration (all tools, all data)
  • Real-time observability (OpenTelemetry)
  • Self-service deployment (platform engineering)
  • Automated governance (policy as code)
  • Multi-region deployment
  • Cost optimization engine
  • Quality gates (acceptance rates, security)

Use Cases:

  • Organization-wide transformation
  • AI-first development culture
  • Strategic competitive advantage

Requirements:

  • Platform engineering team
  • Production SRE practices
  • Continuous investment

Example:

# Self-service MCP server deployment
$ ai-platform deploy-mcp-server \
    --name internal-kb \
    --source ./kb-server \
    --region us-east-1,eu-west-1 \
    --auto-scale \
    --compliance gdpr,soc2 \
    --monitoring enabled

✓ Deployed to 2 regions
✓ Compliance checks passed
✓ Monitoring dashboards created
✓ Cost budgets configured

Who's Here: <0.1% of teams. You're building the future.

Your Journey Map

Where are you now? Level 1? Level 2?

Where do you need to be?

| Organization Type | Target Level | Timeline | Key Focus |
|-------------------|--------------|----------|-----------|
| 🚀 Startup | Level 3 | 2-3 months | MCP ecosystem, velocity |
| 🏭 Mid-Market | Level 4 | 3-6 months | Governance, security |
| 🌐 Enterprise | Level 4-5 | 6-12 months | Full platform, compliance |

The gap: Let's bridge it.

💡 Active Recall: Before reading further, ask yourself: Which maturity level capabilities would have the biggest impact on your team's productivity today?


Part 2: Building Custom MCP Servers

MCP Architecture Primer

Model Context Protocol (MCP): Open standard (Anthropic, November 2024) for connecting AI systems to external tools and data.

Adopted by:

  • ✅ OpenAI (March 2025): ChatGPT, Agents SDK, Responses API
  • ✅ Google DeepMind (April 2025): Gemini models
  • ✅ Microsoft (2025): Copilot Studio

Components:

  • Hosts: LLM applications (Claude Code, ChatGPT, Gemini)
  • Clients: Connectors within hosts
  • Servers: You build these! Provide resources, tools, prompts

Official SDKs: Python, TypeScript, C#, Java

Resources:

*Figure: Model Context Protocol architecture — LLM applications (hosts) connect through MCP clients to custom servers that expose data sources, APIs, and tools over the JSON-RPC 2.0 protocol.*
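Every host-to-server exchange on that protocol layer is a plain JSON-RPC 2.0 message. As a rough sketch, a `tools/call` request (the `search_kb` tool name here is illustrative) serializes like this:

```python
import json

def make_tool_call(request_id: int, tool: str, arguments: dict) -> str:
    """Build an MCP tools/call request as a JSON-RPC 2.0 message string."""
    return json.dumps({
        "jsonrpc": "2.0",          # fixed protocol version marker
        "id": request_id,          # correlates the response to this request
        "method": "tools/call",    # MCP method for invoking a server tool
        "params": {"name": tool, "arguments": arguments},
    })

print(make_tool_call(1, "search_kb", {"query": "API auth", "limit": 5}))
```

The SDKs construct and route these messages for you; seeing the wire format mainly helps when debugging a misbehaving server with raw stdio logs.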

The 4 MCP Server Patterns

| Pattern | Purpose | Use When | Complexity |
|---------|---------|----------|------------|
| 🗄️ Data Source | Expose internal data | Knowledge bases, databases | Low |
| ⚡ Action Executor | Execute operations | Deployments, API calls | Medium |
| 📝 Prompt Library | Templated workflows | Code reviews, reports | Low |
| 📊 Observability | Inject monitoring | Tracing, metrics | Medium-High |

Pattern 1: Data Source Connector 🗄️

Use Case: Expose internal data to Claude

Example: Company Knowledge Base Server

server.ts:

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// Your internal knowledge base API
import { KnowledgeBaseAPI } from "./kb-api.js";

const kb = new KnowledgeBaseAPI({
  apiKey: process.env.KB_API_KEY,
  baseUrl: "https://kb.company.internal"
});

const server = new Server(
  {
    name: "company-kb-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      resources: {},
      tools: {},
    },
  }
);

// List available documents
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  const docs = await kb.listDocuments();

  return {
    resources: docs.map(doc => ({
      uri: `kb://docs/${doc.id}`,
      name: doc.title,
      description: doc.summary,
      mimeType: "text/plain",
    })),
  };
});

// Read document content
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const docId = request.params.uri.split("/").pop();
  const content = await kb.getDocument(docId);

  return {
    contents: [{
      uri: request.params.uri,
      mimeType: "text/plain",
      text: content.body,
    }],
  };
});

// Search functionality
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [{
    name: "search_kb",
    description: "Search company knowledge base for relevant documents",
    inputSchema: {
      type: "object",
      properties: {
        query: {
          type: "string",
          description: "Search query"
        },
        limit: {
          type: "number",
          description: "Max results (default 5)",
          default: 5
        }
      },
      required: ["query"]
    }
  }]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "search_kb") {
    const { query, limit = 5 } = request.params.arguments;
    const results = await kb.search(query, limit);

    return {
      content: [{
        type: "text",
        text: JSON.stringify(results, null, 2)
      }]
    };
  }

  throw new Error(`Unknown tool: ${request.params.name}`);
});

// Start server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Company KB MCP server running on stdio");
}

main().catch(console.error);

Configuration (.claude/mcp-servers.json):

{
  "mcpServers": {
    "company-kb": {
      "command": "node",
      "args": ["dist/server.js"],
      "env": {
        "KB_API_KEY": "${KB_API_KEY}"
      }
    }
  }
}

Usage:

$ claude
> "Search our knowledge base for API authentication best practices"

[Claude uses search_kb tool, retrieves results, summarizes findings]

Pattern 2: Action Executor ⚡

Use Case: Execute operations on behalf of Claude

Example: Deployment Automation Server

Security Checklist for Action Executors:

| Security Control | Implementation | Why Critical |
|------------------|----------------|--------------|
| ✅ Authorization | Check RBAC before execution | Prevent unauthorized deployments |
| ✅ Input Validation | Enum/pattern constraints | Prevent injection attacks |
| ✅ Timeouts | 5-minute max execution | Prevent resource exhaustion |
| ✅ Audit Logging | All attempts logged | Compliance & forensics |
| ✅ Notifications | Slack/email on completion | Team awareness |
| ✅ Rollback Support | Emergency abort capability | Disaster recovery |

deploy-server.ts:

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { execSync } from "child_process";

// Assumed internal clients (not part of the MCP SDK): an RBAC client and a
// Slack notifier used later in this file
import { rbac } from "./rbac-client.js";
import { slack } from "./slack-client.js";

const server = new Server(
  { name: "deploy-server", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

// Authorization check
async function checkDeployPermission(service: string, env: string): Promise<boolean> {
  // Query your RBAC system
  const user = process.env.CLAUDE_USER;
  const hasPermission = await rbac.check(user, `deploy:${service}:${env}`);
  return hasPermission;
}

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "deploy_staging",
      description: "Deploy service to staging environment",
      inputSchema: {
        type: "object",
        properties: {
          service: { type: "string", enum: ["api", "web", "worker"] },
          version: { type: "string", pattern: "^v\\d+\\.\\d+\\.\\d+$" }
        },
        required: ["service", "version"]
      }
    },
    {
      name: "rollback_staging",
      description: "Rollback service in staging to previous version",
      inputSchema: {
        type: "object",
        properties: {
          service: { type: "string", enum: ["api", "web", "worker"] }
        },
        required: ["service"]
      }
    }
  ]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "deploy_staging") {
    // Authorization check
    if (!await checkDeployPermission(args.service, "staging")) {
      return {
        content: [{
          type: "text",
          text: "โŒ Unauthorized: You don't have permission to deploy this service"
        }],
        isError: true
      };
    }

    // Execute deployment
    try {
      const result = execSync(
        `./scripts/deploy.sh ${args.service} ${args.version} staging`,
        { encoding: "utf-8", timeout: 300000 }
      );

      // Send notification
      await slack.notify({
        channel: "#deployments",
        text: `✅ ${args.service} ${args.version} deployed to staging by Claude`
      });

      return {
        content: [{
          type: "text",
          text: `✅ Successfully deployed ${args.service} ${args.version} to staging\n\n${result}`
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: `โŒ Deployment failed: ${error.message}`
        }],
        isError: true
      };
    }
  }

  if (name === "rollback_staging") {
    // Similar implementation
  }

  throw new Error(`Unknown tool: ${name}`);
});

Safety Considerations:

  • ✅ Authorization checks before execution
  • ✅ Restricted service/environment enum
  • ✅ Version validation (semantic versioning)
  • ✅ Command timeout (5 minutes)
  • ✅ Notification on completion
  • ✅ Audit logging
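The same enum and version constraints can also be enforced before a request ever reaches the server, e.g. in a wrapper script that pre-validates arguments. A stdlib sketch reusing the schema's regex (`validate_deploy_args` is a hypothetical helper, not part of the MCP SDK):

```python
import re

# Mirrors the deploy_staging input schema: restricted services, semver tags
VERSION_RE = re.compile(r"^v\d+\.\d+\.\d+$")
ALLOWED_SERVICES = {"api", "web", "worker"}

def validate_deploy_args(service: str, version: str) -> list:
    """Return a list of validation errors; an empty list means safe to proceed."""
    errors = []
    if service not in ALLOWED_SERVICES:
        errors.append(f"unknown service: {service!r}")
    if not VERSION_RE.match(version):
        errors.append(f"version must look like v1.2.3, got {version!r}")
    return errors

print(validate_deploy_args("api", "v1.4.0"))  # []
print(validate_deploy_args("db", "1.4"))      # two errors
```

Validating at both layers is deliberate: the schema stops malformed tool calls, and the wrapper gives humans a fast failure before any model is invoked.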

Pattern 3: Prompt Library 📝

Use Case: Provide templated workflows

Example: Code Review Templates

review-prompts-server.ts:

// Assumes the same Server setup as the previous examples, with
// ListPromptsRequestSchema and GetPromptRequestSchema imported from the SDK types
server.setRequestHandler(ListPromptsRequestSchema, async () => ({
  prompts: [
    {
      name: "code_review",
      description: "Comprehensive code review workflow",
      arguments: [
        {
          name: "pr_number",
          description: "Pull request number",
          required: true
        },
        {
          name: "focus",
          description: "Review focus area",
          required: false
        }
      ]
    },
    {
      name: "security_audit",
      description: "Security-focused code audit",
      arguments: [
        {
          name: "files",
          description: "Files to audit (glob pattern)",
          required: true
        }
      ]
    },
    {
      name: "refactor_assessment",
      description: "Assess refactoring opportunities",
      arguments: [
        {
          name: "directory",
          description: "Directory to analyze",
          required: true
        }
      ]
    }
  ]
}));

server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "code_review") {
    const pr = await github.getPR(args.pr_number);

    return {
      messages: [{
        role: "user",
        content: {
          type: "text",
          text: `Perform a comprehensive code review of PR #${args.pr_number}.

**PR Title**: ${pr.title}
**Author**: ${pr.author}
**Changed Files**: ${pr.files.length}

**Focus Areas**:
${args.focus || "- Code quality\n- Security issues\n- Performance concerns\n- Best practices"}

**Instructions**:
1. Analyze each changed file
2. Identify issues and suggest improvements
3. Highlight security vulnerabilities
4. Check for performance regressions
5. Verify test coverage

**Output Format**:
- Summary of findings
- File-by-file analysis
- Recommended actions
- Approval recommendation (Approve / Request Changes / Comment)

Begin the review:`
        }
      }]
    };
  }

  // Similar for other prompts
});

Usage:

$ claude
> "/prompts"

Available prompts:
- code_review: Comprehensive code review workflow
- security_audit: Security-focused code audit
- refactor_assessment: Assess refactoring opportunities

> "/prompt code_review pr_number=123 focus='security and performance'"

[Claude executes comprehensive review using template]

Pattern 4: Observability Bridge 📊

Use Case: Inject monitoring into Claude workflows

Example: OpenTelemetry Integration

instrumented-server.ts:

import { trace, context, SpanStatusCode } from '@opentelemetry/api';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';

// Initialize tracing
const provider = new NodeTracerProvider();
const exporter = new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces'
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter));
provider.register();

const tracer = trace.getTracer('mcp-server', '1.0.0');

// Instrument tool calls (assumes the `server` instance and an `executeTool`
// helper from the earlier MCP examples)
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const span = tracer.startSpan(`mcp.tool.${request.params.name}`, {
    attributes: {
      'mcp.tool.name': request.params.name,
      'mcp.user': process.env.CLAUDE_USER,
      'mcp.session': process.env.CLAUDE_SESSION_ID
    }
  });

  return context.with(trace.setSpan(context.active(), span), async () => {
    try {
      const startTime = Date.now();
      const result = await executeTool(request.params);

      span.setAttributes({
        'mcp.tool.success': true,
        'mcp.tool.duration_ms': Date.now() - startTime,
        'mcp.tool.result_size': JSON.stringify(result).length
      });

      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setAttributes({
        'mcp.tool.success': false,
        'mcp.tool.error.type': error.constructor.name,
        'mcp.tool.error.message': error.message
      });

      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: error.message
      });

      span.recordException(error);
      throw error;
    } finally {
      span.end();
    }
  });
});

Benefits:

  • ✅ Distributed tracing across all Claude operations
  • ✅ Tool execution metrics
  • ✅ Error tracking
  • ✅ Performance analysis
  • ✅ User attribution
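The `mcp.tool.duration_ms` attribute recorded above is exactly what latency dashboards aggregate. A stdlib sketch of the percentile math such a panel runs over exported durations (the sample values are made up):

```python
import statistics

def latency_summary(durations_ms):
    """Summarize tool-call latencies the way a dashboard panel would."""
    qs = statistics.quantiles(durations_ms, n=100)  # 99 percentile cut points
    return {
        "count": len(durations_ms),
        "p50_ms": qs[49],   # median
        "p95_ms": qs[94],   # tail latency, the usual SLO target
        "max_ms": max(durations_ms),
    }

samples = [120, 95, 130, 110, 480, 105, 98, 115, 102, 125]
print(latency_summary(samples))
```

Note how one slow outlier (480 ms) barely moves the median but dominates the tail; that is why tool-call SLOs are usually set on p95 rather than the mean.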

Packaging & Distribution 📦

Docker Container (production best practice):

FROM node:20-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --only=production

COPY dist/ ./dist/

# Run as non-root
USER node

CMD ["node", "dist/server.js"]

Deployment:

# Build
docker build -t company-kb-mcp:1.0.0 .

# Run
docker run -d \
  --name kb-mcp-server \
  -e KB_API_KEY="${KB_API_KEY}" \
  company-kb-mcp:1.0.0

💡 Active Recall: Which MCP server pattern would solve your team's biggest integration challenge? Data Source, Action Executor, Prompt Library, or Observability?


Part 3: Claude API & Programmatic Usage

Authentication Methods Comparison

| Method | Best For | Data Residency | Enterprise Features |
|--------|----------|----------------|---------------------|
| 🔑 Direct API | Quick start, prototyping | US | Basic |
| ☁️ AWS Bedrock | Production, compliance | Configurable | SSO, VPC, audit logs |
| 🌐 GCP Vertex AI | GCP-native apps | Configurable | IAM, VPC Service Controls |

Option 1: Direct API Key

import os

from anthropic import Anthropic

client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

Option 2: Amazon Bedrock

import os

from anthropic import AnthropicBedrock

client = AnthropicBedrock(
    aws_region="us-east-1",
    aws_access_key=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_key=os.environ["AWS_SECRET_ACCESS_KEY"]
)

Option 3: Google Cloud Vertex AI

from anthropic import AnthropicVertex

client = AnthropicVertex(
    region="us-central1",
    project_id="my-project"
)

Pattern 1: Headless Automation 🤖

Use Case: CI/CD pipelines, cron jobs

Example: Automated Code Review

.github/workflows/claude-review.yml:

name: Claude Code Review

on:
  pull_request:
    types: [opened, synchronize]

jobs:
  review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Claude Review
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
        run: |
          claude -p "Review PR #${{ github.event.pull_request.number }} for security issues" \
            --output-format json \
            --allowedTools read,grep,git \
            > review.json

      - name: Post Review
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            // --output-format json emits a single JSON object with the
            // final response in its `result` field
            const review = JSON.parse(fs.readFileSync('review.json', 'utf8'));

            await github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
              body: review.result
            });
            });

Pattern 2: Multi-Instance Orchestration 🎭

Use Case: Parallel workflows, specialized agents

Example: Code + Tests + Review

parallel-development.sh:

#!/bin/bash
set -e

PR_BRANCH="feature/new-api"

# A branch can be checked out in only one worktree at a time,
# so create detached checkouts of the branch tip
git worktree add --detach /tmp/impl-${PR_BRANCH} ${PR_BRANCH}
git worktree add --detach /tmp/tests-${PR_BRANCH} ${PR_BRANCH}
git worktree add --detach /tmp/review-${PR_BRANCH} ${PR_BRANCH}

# Parallel execution
(
  cd /tmp/impl-${PR_BRANCH}
  claude -p "Implement REST API endpoint for user profile updates per SPEC.md" \
    --output-format stream-json \
    > impl-result.json
) &
IMPL_PID=$!

(
  cd /tmp/tests-${PR_BRANCH}
  claude -p "Write comprehensive tests for user profile API endpoint" \
    --output-format stream-json \
    > tests-result.json
) &
TESTS_PID=$!

# Wait for implementation and tests
wait $IMPL_PID $TESTS_PID

# Review after both complete
(
  cd /tmp/review-${PR_BRANCH}
  claude -p "Review implementation and tests for completeness and quality" \
    --output-format stream-json \
    > review-result.json
)

# Merge results
echo "✅ Implementation complete"
echo "✅ Tests complete"
echo "✅ Review complete"

# Cleanup (--force: the worktrees contain uncommitted changes)
git worktree remove --force /tmp/impl-${PR_BRANCH}
git worktree remove --force /tmp/tests-${PR_BRANCH}
git worktree remove --force /tmp/review-${PR_BRANCH}

Pattern 3: Programmatic SDK Usage 💻

Example: Batch Code Review

batch-review.py:

#!/usr/bin/env python3
import os
import sys
from anthropic import Anthropic
from github import Github

client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
github = Github(os.environ["GITHUB_TOKEN"])

def review_file(file_path: str, content: str, context: str) -> str:
    """Review a single file using Claude"""

    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": f"""Review this code file for security and quality issues.

**File**: {file_path}
**Context**: {context}

**Code**:

{content}


**Focus on**:
- Security vulnerabilities (SQL injection, XSS, etc.)
- Error handling
- Performance concerns
- Best practices violations

**Format**: Markdown with severity levels (🔴 Critical, 🟡 Warning, 🟢 Info)
"""
        }]
    )

    return response.content[0].text

def review_pr(repo_name: str, pr_number: int):
    """Review all files in a PR"""

    repo = github.get_repo(repo_name)
    pr = repo.get_pull(pr_number)

    print(f"Reviewing PR #{pr_number}: {pr.title}")

    reviews = []
    for file in pr.get_files():
        # Skip if too large or not code
        if file.changes > 500 or not file.filename.endswith(('.py', '.js', '.ts', '.java')):
            continue

        print(f"  Reviewing {file.filename}...")

        # Get file content
        content = repo.get_contents(file.filename, ref=pr.head.sha).decoded_content.decode()

        # Review with Claude
        review = review_file(
            file_path=file.filename,
            content=content,
            context=f"PR #{pr_number}: {pr.title}"
        )

        reviews.append({
            'file': file.filename,
            'review': review
        })

    # Post consolidated review
    review_body = "## Claude Code Review\n\n"
    for r in reviews:
        review_body += f"### {r['file']}\n\n{r['review']}\n\n---\n\n"

    pr.create_issue_comment(review_body)

    print(f"✅ Review posted to PR #{pr_number}")

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: batch-review.py <repo> <pr_number>")
        sys.exit(1)

    review_pr(sys.argv[1], int(sys.argv[2]))

Usage:

$ python batch-review.py company/api-server 142

Reviewing PR #142: Add user authentication
  Reviewing src/auth/login.py...
  Reviewing src/auth/tokens.py...
  Reviewing tests/test_auth.py...
✅ Review posted to PR #142

Pattern 4: Streaming Responses 📡

Use Case: Real-time feedback for long operations

streaming-generation.py:

from anthropic import Anthropic

client = Anthropic()

def generate_with_progress(prompt: str):
    """Generate code with real-time streaming"""

    print("Generating...\n")

    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)

    print("\n\n✅ Generation complete")

# Usage
generate_with_progress("""
Create a Python FastAPI endpoint for user registration with:
- Email validation
- Password hashing (bcrypt)
- Rate limiting
- Comprehensive error handling
- OpenAPI documentation
""")

Output:

Generating...

from fastapi import APIRouter, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, EmailStr
import bcrypt
from slowapi import Limiter
from slowapi.util import get_remote_address

router = APIRouter()
limiter = Limiter(key_func=get_remote_address)

class UserRegistration(BaseModel):
    email: EmailStr
    password: str

@router.post("/register", status_code=201)
@limiter.limit("5/minute")
async def register_user(user: UserRegistration):
    """Register a new user with email and password"""
    ...

✅ Generation complete

💡 Active Recall: Which programmatic pattern would accelerate your team's workflow most: Headless CI/CD, Multi-Instance Orchestration, Batch Processing, or Streaming?


Part 4: Enterprise Security & Governance

The Security Challenge

The Data:

| Security Gap | Percentage | Risk Level |
|--------------|------------|------------|
| 👁️ Lack visibility into AI risks | 64% | 🔴 Critical |
| 🚨 Cite AI data leaks as top concern | 69% | 🔴 Critical |
| 🛡️ Have NO AI-specific security controls | 47% | 🔴 Critical |
| 📄 Files with PII/PCI uploaded to GenAI | 40% | 🟡 High |
| 👤 AI usage through unmanaged accounts | 67% | 🟡 High |

The Stakes: One leaked API key, one exposed database, or one PII breach can mean millions in fines, reputation damage, and lost customers.

*Figure: Enterprise Security Framework — four layers covering authentication & authorization, data protection with PII/PCI detection, audit & compliance (GDPR/HIPAA/SOC 2), and governance policies with automated incident response.*

The 4 Security Layers

| Layer | Purpose | Key Technologies | Compliance Impact |
|-------|---------|------------------|-------------------|
| 🔐 Layer 1 | Authentication & Authorization | SSO, RBAC, MFA | Access control |
| 🛡️ Layer 2 | Data Protection | PII detection, encryption, residency | Privacy regulations |
| 📋 Layer 3 | Audit & Compliance | Logging, reporting, retention | Regulatory proof |
| 📜 Layer 4 | Governance | Policy-as-code, enforcement | Operational control |
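Layer 3 does not require exotic tooling to get started: append-only structured records satisfy most audit requirements. A minimal stdlib sketch (the field names are illustrative, not a compliance schema):

```python
import json
from datetime import datetime, timezone

def audit_record(user: str, action: str, resource: str, allowed: bool) -> str:
    """Serialize one audit event as a JSON line for an append-only log."""
    return json.dumps({
        "ts": datetime.now(timezone.utc).isoformat(),  # UTC timestamp
        "user": user,
        "action": action,
        "resource": resource,
        "allowed": allowed,
    })

# One line per event; ship the file to your log pipeline for retention
print(audit_record("alice@company.com", "deploy_staging", "api", True))
```

JSON-lines output slots directly into whatever log shipper you already run, and recording denied actions (`allowed: false`) is as important for forensics as recording approvals.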

Layer 1: Authentication & Authorization 🔐

SSO Integration

Example: SAML 2.0 with Okta

saml-auth.py:

import os

from flask import Flask, request, redirect, session
from onelogin.saml2.auth import OneLogin_Saml2_Auth

app = Flask(__name__)
app.secret_key = os.environ["SECRET_KEY"]

SAML_SETTINGS = {
    "sp": {
        "entityId": "https://claude.company.com",
        "assertionConsumerService": {
            "url": "https://claude.company.com/saml/acs",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
        }
    },
    "idp": {
        "entityId": "http://www.okta.com/exk123456",
        "singleSignOnService": {
            "url": "https://company.okta.com/app/exk123456/sso/saml",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
        },
        "x509cert": os.environ["OKTA_CERT"]
    }
}

@app.route("/saml/login")
def saml_login():
    # Note: python3-saml expects a prepared request dict rather than the raw
    # Flask request object; the conversion helper is omitted here for brevity
    auth = OneLogin_Saml2_Auth(request, SAML_SETTINGS)
    return redirect(auth.login())

@app.route("/saml/acs", methods=["POST"])
def saml_acs():
    auth = OneLogin_Saml2_Auth(request, SAML_SETTINGS)
    auth.process_response()

    if not auth.is_authenticated():
        return "Authentication failed", 401

    # Store user info
    session["user"] = {
        "email": auth.get_nameid(),
        "attributes": auth.get_attributes()
    }

    # Get user roles from SAML attributes
    roles = auth.get_attributes().get("roles", [])
    session["roles"] = roles

    return redirect("/dashboard")

RBAC Implementation

Role Permission Matrix:

| Role | Permissions | Typical Users | Risk Level |
|------|-------------|---------------|------------|
| 👨‍💻 Developer | read_code, write_code, create_branch, create_pr | Engineers | 🟢 Low |
| 👀 Reviewer | read_code, comment_pr, approve_pr | Tech Leads | 🟢 Low |
| 🔧 Admin | All permissions (*) | Platform Team | 🔴 Critical |
| 🔍 Security Auditor | read_code, read_logs, read_audit_trail | Security Team | 🟡 Medium |

roles.yaml:

roles:
  developer:
    permissions:
      - read_code
      - write_code
      - create_branch
      - create_pr

  reviewer:
    permissions:
      - read_code
      - comment_pr
      - approve_pr

  admin:
    permissions:
      - "*"  # All permissions

  security_auditor:
    permissions:
      - read_code
      - read_logs
      - read_audit_trail

rbac.py:

import functools

import yaml
from flask import session

class RBAC:
    def __init__(self, roles_config):
        self.roles = roles_config["roles"]

    def check_permission(self, user_roles: list[str], permission: str) -> bool:
        """Check if user has required permission"""

        for role in user_roles:
            role_perms = self.roles.get(role, {}).get("permissions", [])

            # Wildcard permission
            if "*" in role_perms:
                return True

            # Exact match
            if permission in role_perms:
                return True

        return False

    def require_permission(self, permission: str):
        """Decorator to enforce a permission on a Flask view"""

        def decorator(func):
            @functools.wraps(func)  # preserve the view name so Flask routing works
            def wrapper(*args, **kwargs):
                user_roles = session.get("roles", [])

                if not self.check_permission(user_roles, permission):
                    raise PermissionError(
                        f"Permission denied: {permission} required"
                    )

                return func(*args, **kwargs)

            return wrapper
        return decorator

with open("roles.yaml") as f:
    rbac = RBAC(yaml.safe_load(f))

@app.route("/api/code/write", methods=["POST"])
@rbac.require_permission("write_code")
def write_code():
    """Write code endpoint (requires write_code permission)"""
    # Implementation

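A quick sanity check of the permission logic makes the wildcard and unknown-role behavior concrete. This is a standalone sketch of the check from rbac.py above, with roles inlined instead of loaded from roles.yaml:

```python
# Standalone sketch of the permission check from rbac.py above,
# exercised against an inline role config (no YAML, no Flask).
class RBAC:
    def __init__(self, roles_config):
        self.roles = roles_config["roles"]

    def check_permission(self, user_roles, permission):
        for role in user_roles:
            perms = self.roles.get(role, {}).get("permissions", [])
            if "*" in perms or permission in perms:
                return True
        return False

rbac = RBAC({"roles": {
    "developer": {"permissions": ["read_code", "write_code"]},
    "admin": {"permissions": ["*"]},
}})

assert rbac.check_permission(["developer"], "write_code")
assert not rbac.check_permission(["developer"], "read_audit_trail")
assert rbac.check_permission(["admin"], "read_audit_trail")   # wildcard
assert not rbac.check_permission(["ghost"], "read_code")      # unknown role
```

Note that unknown roles fail closed: a role missing from the config grants no permissions at all.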
Layer 2: Data Protection ๐Ÿ›ก๏ธ

PII/PCI Detection Patterns:

| Data Type | Pattern Example | Severity | Regulation |
|-----------|----------------|----------|------------|
| 🔢 SSN | 123-45-6789 | 🔴 Critical | GDPR, HIPAA |
| 💳 Credit Card | 4532-1234-5678-9010 | 🔴 Critical | PCI DSS |
| 📧 Email | user@example.com | 🟡 Warning | GDPR, CCPA |
| 🔑 API Key | api_key: sk_live_abc123... | 🔴 Critical | All |
| ☁️ AWS Key | AKIA123456789ABCDEFG | 🔴 Critical | All |
| 🔐 Private Key | -----BEGIN RSA PRIVATE KEY----- | 🔴 Critical | All |

data-scanner.py:

import re
from typing import List, Tuple

class DataProtectionScanner:
    PATTERNS = {
        'ssn': {
            'pattern': r'\b\d{3}-\d{2}-\d{4}\b',
            'description': 'Social Security Number',
            'severity': 'CRITICAL'
        },
        'credit_card': {
            'pattern': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
            'description': 'Credit Card Number',
            'severity': 'CRITICAL'
        },
        'email': {
            'pattern': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'description': 'Email Address',
            'severity': 'WARNING'
        },
        'api_key': {
            'pattern': r'(api[_-]?key|apikey|access[_-]?token)[\'"\s:=]+([a-zA-Z0-9]{32,})',
            'description': 'API Key / Access Token',
            'severity': 'CRITICAL'
        },
        'aws_key': {
            'pattern': r'AKIA[0-9A-Z]{16}',
            'description': 'AWS Access Key',
            'severity': 'CRITICAL'
        },
        'private_key': {
            'pattern': r'-----BEGIN (RSA|DSA|EC|OPENSSH) PRIVATE KEY-----',
            'description': 'Private Key',
            'severity': 'CRITICAL'
        }
    }

    def scan(self, content: str) -> List[Tuple[str, str, str, str]]:
        """Scan content for sensitive data

        Returns: List of (type, description, severity, matched_text)
        """
        findings = []

        for data_type, config in self.PATTERNS.items():
            matches = re.finditer(config['pattern'], content, re.IGNORECASE)

            for match in matches:
                findings.append((
                    data_type,
                    config['description'],
                    config['severity'],
                    match.group(0)[:50]  # Truncate for logging
                ))

        return findings

class DataProtectionError(Exception):
    """Raised when sensitive data detected"""
    pass

scanner = DataProtectionScanner()

def protect_claude_input(content: str):
    """Scan input before sending to Claude"""

    findings = scanner.scan(content)

    critical = [f for f in findings if f[2] == 'CRITICAL']

    if critical:
        # Log security incident
        security_log.warning(
            "Sensitive data detected in Claude input",
            user=session["user"]["email"],
            data_types=[f[0] for f in critical]
        )

        # Block request
        raise DataProtectionError(
            f"Cannot process: {len(critical)} sensitive data patterns detected"
        )

    return True

# Use in API endpoint
@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
    content = request.json["message"]

    # Scan before processing
    protect_claude_input(content)

    # Safe to proceed
    response = claude_client.messages.create(...)
    return jsonify(response)
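One caveat worth knowing: the credit_card regex above matches any run of 16 digits, so order IDs and timestamps can trigger false positives. A common tightening (an addition here, not part of the scanner above) is to validate regex hits with the Luhn checksum before raising an incident:

```python
# Optional tightening (assumption: not part of DataProtectionScanner above):
# validate credit-card regex hits with the Luhn checksum to cut false
# positives from arbitrary 16-digit strings.
def luhn_valid(number: str) -> bool:
    """Return True if the digit string passes the Luhn checksum."""
    digits = [int(d) for d in number if d.isdigit()]
    if len(digits) < 13:
        return False
    total = 0
    # Double every second digit from the right; subtract 9 if it exceeds 9
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d = d * 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0

assert luhn_valid("4532015112830366")      # passes the checksum
assert not luhn_valid("1234567890123456")  # 16 digits, but fails the checksum
```

In the scanner, this would run only on `credit_card` matches; hits that fail the checksum can be downgraded to a warning instead of blocking the request.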

Data Residency Controls

Regional Configuration Table:

| Region | AWS Bedrock | GCP Vertex AI | Compliance |
|--------|------------|---------------|------------|
| 🇺🇸 US | us-east-1 | us-central1 | CCPA, SOC 2 |
| 🇪🇺 EU | eu-west-1 | europe-west1 | GDPR |
| 🌏 APAC | ap-southeast-1 | asia-southeast1 | Local regulations |

claude-client.py:

class EnterpriseClaudeClient:
    REGION_CONFIGS = {
        'us': {
            'bedrock_region': 'us-east-1',
            'vertex_region': 'us-central1',
            'data_residency': 'United States'
        },
        'eu': {
            'bedrock_region': 'eu-west-1',
            'vertex_region': 'europe-west1',
            'data_residency': 'European Union'
        },
        'apac': {
            'bedrock_region': 'ap-southeast-1',
            'vertex_region': 'asia-southeast1',
            'data_residency': 'Asia Pacific'
        }
    }

    def __init__(self, data_residency: str):
        if data_residency not in self.REGION_CONFIGS:
            raise ValueError(f"Invalid data residency: {data_residency}")

        config = self.REGION_CONFIGS[data_residency]

        # Use AWS Bedrock for data residency
        from anthropic import AnthropicBedrock

        self.client = AnthropicBedrock(
            aws_region=config['bedrock_region']
        )

        self.data_residency = config['data_residency']

    def complete(self, **kwargs):
        """Ensure data stays in configured region"""

        # Add metadata for audit
        kwargs.setdefault('metadata', {})
        kwargs['metadata']['data_residency'] = self.data_residency

        return self.client.messages.create(**kwargs)

# Usage based on user location
user_country = session["user"]["country"]

if user_country in ["DE", "FR", "IT", "ES"]:
    client = EnterpriseClaudeClient("eu")
elif user_country in ["US", "CA"]:
    client = EnterpriseClaudeClient("us")
else:
    client = EnterpriseClaudeClient("apac")
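The routing branch above is easier to test when factored into a small helper. A minimal sketch, with the caveat that the country lists here are illustrative, not a complete mapping:

```python
# Sketch: factor the residency routing above into a testable helper.
# The country-to-region mapping is illustrative, not exhaustive.
EU_COUNTRIES = {"DE", "FR", "IT", "ES"}
US_COUNTRIES = {"US", "CA"}

def select_residency(country_code: str) -> str:
    """Map an ISO country code to a data-residency region key."""
    if country_code in EU_COUNTRIES:
        return "eu"
    if country_code in US_COUNTRIES:
        return "us"
    return "apac"  # default bucket for everything else

assert select_residency("DE") == "eu"
assert select_residency("US") == "us"
assert select_residency("JP") == "apac"
```

With this in place, client construction becomes `client = EnterpriseClaudeClient(select_residency(user_country))`, and the mapping can be covered by unit tests instead of living inside a request handler.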

Layer 3: Audit & Compliance ๐Ÿ“‹

Comprehensive Audit Logging

Audit Event Types:

| Event Type | Information Captured | Retention | Purpose |
|-----------|---------------------|-----------|---------|
| 🔐 Authentication | User, timestamp, IP, success/fail | 1 year | Security forensics |
| 💬 Chat Request | User, message length, sensitive data flags | 1 year | Compliance audit |
| 📊 Chat Response | Model, tokens, cost, latency | 1 year | Cost & performance |
| ⚠️ Security Incident | Type, severity, user, action taken | 7 years | Regulatory requirement |
| 🔧 Configuration Change | User, before/after, timestamp | 3 years | Change management |

audit-log.py:

import json
from datetime import datetime, timezone

class AuditLogger:
    def __init__(self, log_path: str = "/var/log/claude/audit.jsonl"):
        self.log_path = log_path

    def log(self, event_type: str, **kwargs):
        """Append one audit event as a JSON line (JSONL)"""

        event = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": event_type,
            **kwargs
        }

        with open(self.log_path, "a") as f:
            f.write(json.dumps(event) + "\n")

audit = AuditLogger()
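Append-only JSONL keeps the log greppable and machine-parseable. A quick round-trip check of the pattern (standalone sketch, using a temp file in place of /var/log):

```python
# Standalone sketch: the append-only JSONL pattern above round-trips cleanly.
import json
import tempfile
from datetime import datetime, timezone

def log_event(path, event_type, **kwargs):
    event = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event_type": event_type,
        **kwargs,
    }
    with open(path, "a") as f:
        f.write(json.dumps(event) + "\n")

with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as tmp:
    path = tmp.name

log_event(path, "claude_chat_request", user_email="a@example.com", message_length=42)
log_event(path, "claude_chat_response", user_email="a@example.com", tokens_used=120)

with open(path) as f:
    events = [json.loads(line) for line in f]

assert len(events) == 2
assert events[0]["event_type"] == "claude_chat_request"
assert events[1]["tokens_used"] == 120
```

Because each line is an independent JSON object, a partially written final line (e.g. after a crash) corrupts at most one event, and downstream tools can stream the file without loading it whole.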

# Log all Claude operations
@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
    user = session["user"]
    message = request.json["message"]

    # Scan for sensitive data
    findings = scanner.scan(message)

    # Create audit event
    audit.log(
        event_type="claude_chat_request",
        user_email=user["email"],
        user_roles=session.get("roles", []),
        message_length=len(message),
        sensitive_data_detected=len(findings) > 0,
        sensitive_data_types=[f[0] for f in findings],
        ip_address=request.remote_addr,
        user_agent=request.headers.get("User-Agent")
    )

    # Process request, measuring latency around the API call
    start_time = time.time()
    response = claude_client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=4096,
        messages=[{"role": "user", "content": message}]
    )
    latency_ms = (time.time() - start_time) * 1000

    # Anthropic usage reports input and output tokens separately
    total_tokens = response.usage.input_tokens + response.usage.output_tokens

    # Log response
    audit.log(
        event_type="claude_chat_response",
        user_email=user["email"],
        model=response.model,
        tokens_used=total_tokens,
        cost_usd=calculate_cost(response.model, total_tokens),
        latency_ms=latency_ms
    )

    return jsonify(response)

Compliance Report Generation

compliance-report.py:

def generate_compliance_report(start_date, end_date):
    """Generate compliance report for audit period"""

    # Read audit logs
    events = []
    with open("/var/log/claude/audit.jsonl") as f:
        for line in f:
            event = json.loads(line)
            if start_date <= event["timestamp"] <= end_date:
                events.append(event)

    # Calculate metrics
    total_requests = len([e for e in events if e["event_type"] == "claude_chat_request"])
    sensitive_data_blocked = len([e for e in events
                                   if e.get("sensitive_data_detected")])
    unique_users = len(set(e["user_email"] for e in events))
    total_tokens = sum(e.get("tokens_used", 0) for e in events
                       if e["event_type"] == "claude_chat_response")
    total_cost = sum(e.get("cost_usd", 0) for e in events
                     if e["event_type"] == "claude_chat_response")

    report = {
        "period": {
            "start": start_date,
            "end": end_date
        },
        "usage": {
            "total_requests": total_requests,
            "unique_users": unique_users,
            "total_tokens": total_tokens,
            "total_cost_usd": round(total_cost, 2)
        },
        "security": {
            "sensitive_data_incidents": sensitive_data_blocked,
            "incident_rate": round(
                sensitive_data_blocked / total_requests * 100, 2
            ) if total_requests else 0.0
        },
        "compliance": {
            "audit_logs_complete": True,
            "data_residency_enforced": True,
            "access_controls_enabled": True,
            "encryption_at_rest": True,
            "encryption_in_transit": True
        }
    }

    return report

Layer 4: Governance Policies ๐Ÿ“œ

Policy-as-Code

.claude/policies/enterprise.yaml:

version: "1.0"

security:
  pii_detection:
    enabled: true
    block_on_match: true
    alert_security_team: true
    patterns:
      - ssn
      - credit_card
      - api_key
      - private_key

  data_residency:
    enforce: true
    allowed_regions:
      - us-east-1  # US users
      - eu-west-1  # EU users

access_control:
  authentication:
    required: true
    methods:
      - saml
      - oauth2

  authorization:
    model: rbac
    roles_file: roles.yaml

usage_limits:
  per_user:
    requests_per_hour: 100
    tokens_per_day: 1000000

  per_team:
    requests_per_hour: 1000
    cost_per_month_usd: 10000

content_filters:
  block_patterns:
    - pattern: "prod|production"
      scope: "database_queries"
      message: "Cannot access production databases"

    - pattern: "DROP TABLE|DELETE FROM"
      scope: "sql"
      message: "Destructive SQL not allowed"

audit:
  retention_days: 365
  export_format: jsonl
  compliance_frameworks:
    - gdpr
    - hipaa
    - soc2

Policy Enforcement

Usage Limit Enforcement Table:

| Limit Type | Threshold | Action | Reset Period |
|-----------|----------|--------|--------------|
| ⏱️ Hourly Requests | 100/user | Block request | 1 hour rolling |
| 📊 Daily Tokens | 1M/user | Block request | Midnight UTC |
| 💰 Monthly Cost | $10K/team | Alert + soft limit | Monthly |
| 🚨 Critical Pattern | 1 match | Block + alert security | N/A |

policy-engine.py:

import re
from datetime import datetime, timedelta

import yaml

class PolicyEngine:
    def __init__(self, policy_file: str):
        with open(policy_file) as f:
            self.policy = yaml.safe_load(f)

    def enforce_usage_limits(self, user_email: str):
        """Check if user within usage limits"""

        limits = self.policy["usage_limits"]["per_user"]

        # Check hourly requests
        recent_requests = audit.count_events(
            event_type="claude_chat_request",
            user_email=user_email,
            since=datetime.now() - timedelta(hours=1)
        )

        if recent_requests >= limits["requests_per_hour"]:
            raise RateLimitError(
                f"Hourly limit exceeded: {recent_requests}/{limits['requests_per_hour']}"
            )

        # Check daily tokens (since midnight local time)
        today_tokens = audit.sum_field(
            event_type="claude_chat_response",
            user_email=user_email,
            field="tokens_used",
            since=datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        )

        if today_tokens >= limits["tokens_per_day"]:
            raise RateLimitError(
                f"Daily token limit exceeded: {today_tokens}/{limits['tokens_per_day']}"
            )

        return True

    def check_content_filters(self, content: str, scope: str):
        """Apply content filters"""

        rules = self.policy.get("content_filters", {}).get("block_patterns", [])

        for rule in rules:  # `rule` avoids shadowing the builtin `filter`
            if rule["scope"] != scope:
                continue

            if re.search(rule["pattern"], content, re.IGNORECASE):
                raise ContentFilterError(rule["message"])

        return True

policy = PolicyEngine(".claude/policies/enterprise.yaml")

@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
    user = session["user"]
    message = request.json["message"]

    # Enforce policies
    policy.enforce_usage_limits(user["email"])
    policy.check_content_filters(message, "general")

    # Process request
    ...
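The filter logic above can be exercised without Flask or the full policy file. A standalone sketch with the destructive-SQL rule from enterprise.yaml inlined (ContentFilterError is assumed to be a plain Exception subclass):

```python
# Standalone sketch of the content-filter check, with the SQL rule from
# enterprise.yaml inlined. ContentFilterError is assumed to be a simple
# Exception subclass.
import re

class ContentFilterError(Exception):
    pass

BLOCK_PATTERNS = [
    {"pattern": r"DROP TABLE|DELETE FROM", "scope": "sql",
     "message": "Destructive SQL not allowed"},
]

def check_content_filters(content, scope):
    for rule in BLOCK_PATTERNS:
        if rule["scope"] != scope:
            continue
        if re.search(rule["pattern"], content, re.IGNORECASE):
            raise ContentFilterError(rule["message"])
    return True

assert check_content_filters("SELECT * FROM users", "sql")  # allowed

try:
    check_content_filters("drop table users;", "sql")  # case-insensitive match
    assert False, "should have been blocked"
except ContentFilterError:
    pass
```

Note the scope check runs before the pattern match, so a rule scoped to "sql" never fires on general chat content, which keeps false positives down.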

๐Ÿ’ก Active Recall: If you had to choose only ONE security layer to implement first, which would protect your organization most: Authentication, Data Protection, Audit Logging, or Governance Policies?


Part 5: Production Observability


The Observability Challenge

Why AI is Different:

| Traditional Systems | AI Systems | Implication |
|--------------------|-----------|-------------|
| ✅ Deterministic outputs | ❌ Probabilistic outputs | Same input ≠ same output |
| ✅ Simple dependencies | ❌ Complex dependencies | Models + tools + data sources |
| ✅ Predictable latency | ❌ Variable latency | Token generation is non-deterministic |
| ✅ Uptime metrics | ❌ Quality metrics | Need acceptance rate, not just uptime |

Traditional monitoring is insufficient.

Production Observability Stack (diagram): application instrumentation with OpenTelemetry → collection → storage (Prometheus/Loki/Jaeger) → visualization and alerting with Grafana dashboards.

Essential Metrics to Track

Performance Metrics Hierarchy:

| Category | Metric | Why It Matters | SLA Target |
|----------|--------|---------------|-----------|
| 📊 Volume | claude_requests_total | Usage trends, capacity planning | N/A |
| ⏱️ Latency | claude_latency_seconds (p50/p95/p99) | User experience | p95 < 5s |
| 🪙 Tokens | claude_tokens_used_total | Cost driver | Track trends |
| 💰 Cost | claude_cost_usd_total | Budget management | <$50K/month |
| ✅ Quality | claude_acceptance_rate | Output usefulness | >70% |
| ❌ Errors | claude_error_rate | Reliability | <1% |

Implementation:

from prometheus_client import Counter, Histogram, Gauge

# Request metrics
claude_requests_total = Counter(
    'claude_requests_total',
    'Total Claude API requests',
    ['model', 'status', 'user']
)

claude_latency_seconds = Histogram(
    'claude_latency_seconds',
    'Claude API request latency',
    ['model'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

claude_tokens_used = Counter(
    'claude_tokens_used_total',
    'Total tokens used',
    ['model', 'user', 'team']
)

# Quality metrics
claude_acceptance_rate = Gauge(
    'claude_acceptance_rate',
    'Percentage of Claude suggestions accepted',
    ['feature']
)

claude_error_rate = Gauge(
    'claude_error_rate',
    'Percentage of requests resulting in errors',
    ['error_type']
)

# Cost metrics
claude_cost_usd = Counter(
    'claude_cost_usd_total',
    'Total cost in USD',
    ['model', 'team']
)

Usage:

@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
    user = session["user"]
    start_time = time.time()

    try:
        response = claude_client.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=4096,
            messages=[{"role": "user", "content": request.json["message"]}]
        )

        # Record metrics
        claude_requests_total.labels(
            model=response.model,
            status="success",
            user=user["email"]
        ).inc()

        latency = time.time() - start_time
        claude_latency_seconds.labels(model=response.model).observe(latency)

        # Anthropic usage reports input and output tokens separately
        total_tokens = response.usage.input_tokens + response.usage.output_tokens

        claude_tokens_used.labels(
            model=response.model,
            user=user["email"],
            team=user["team"]
        ).inc(total_tokens)

        cost = calculate_cost(response.model, total_tokens)
        claude_cost_usd.labels(
            model=response.model,
            team=user["team"]
        ).inc(cost)

        return jsonify(response)

    except Exception as error:
        claude_requests_total.labels(
            model="claude-sonnet-4-5-20250929",
            status="error",
            user=user["email"]
        ).inc()

        raise
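The endpoints above call calculate_cost but never define it. A minimal sketch of its shape, with the caveat that the rate used here is an assumption for illustration: real pricing differs for input vs output tokens (so a production version should price the two counts separately) and must be read from Anthropic's current price list.

```python
# Sketch of the calculate_cost helper referenced above. The blended rate
# is an ASSUMPTION for illustration; real input and output tokens are
# priced differently, and actual prices come from Anthropic's price list.
BLENDED_PRICE_PER_MTOK_USD = {
    "claude-sonnet": 6.00,  # illustrative blended $/million tokens
}

def calculate_cost(model: str, total_tokens: int) -> float:
    """Estimate USD cost for one request from its total token count."""
    for prefix, price in BLENDED_PRICE_PER_MTOK_USD.items():
        if model.startswith(prefix):
            return total_tokens * price / 1_000_000
    raise ValueError(f"No pricing configured for model: {model}")

assert calculate_cost("claude-sonnet-4-5-20250929", 1_000_000) == 6.00
```

Raising on an unknown model prefix is deliberate: silently returning 0 would make cost dashboards under-report whenever a new model version ships.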

OpenTelemetry Integration ๐Ÿ”

Complete Distributed Tracing

otel-setup.py:

import os

from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource

# Configure resource
resource = Resource.create({
    "service.name": "claude-integration",
    "service.version": "1.0.0",
    "deployment.environment": os.environ.get("ENV", "production")
})

# Tracing
trace_provider = TracerProvider(resource=resource)
trace_exporter = OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")
trace_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
trace.set_tracer_provider(trace_provider)

# Metrics
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://localhost:4318/v1/metrics")
)
metric_provider = MeterProvider(
    resource=resource,
    metric_readers=[metric_reader]
)
metrics.set_meter_provider(metric_provider)

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

Instrumented Client:

import time

class ObservableClaudeClient:
    def __init__(self, client):
        self.client = client
        self.tracer = trace.get_tracer(__name__)
        self.meter = metrics.get_meter(__name__)

        # Metrics
        self.request_counter = self.meter.create_counter(
            "claude.requests",
            description="Number of Claude API requests"
        )

        self.token_counter = self.meter.create_counter(
            "claude.tokens",
            description="Total tokens used"
        )

        self.latency_histogram = self.meter.create_histogram(
            "claude.latency",
            unit="ms",
            description="Request latency in milliseconds"
        )

    def messages_create(self, **kwargs):
        """Instrumented messages.create"""

        span = self.tracer.start_span(
            "claude.messages.create",
            attributes={
                "claude.model": kwargs.get("model"),
                "claude.max_tokens": kwargs.get("max_tokens"),
                "claude.user": kwargs.get("metadata", {}).get("user_id")
            }
        )

        start_time = time.time()

        try:
            response = self.client.messages.create(**kwargs)

            # Record metrics
            self.request_counter.add(1, {
                "model": response.model,
                "status": "success"
            })

            # Anthropic usage reports input and output tokens separately
            total_tokens = (response.usage.input_tokens
                            + response.usage.output_tokens)

            self.token_counter.add(total_tokens, {
                "model": response.model,
                "type": "total"
            })

            self.token_counter.add(response.usage.input_tokens, {
                "model": response.model,
                "type": "input"
            })

            self.token_counter.add(response.usage.output_tokens, {
                "model": response.model,
                "type": "output"
            })

            latency_ms = (time.time() - start_time) * 1000
            self.latency_histogram.record(latency_ms, {
                "model": response.model
            })

            # Add to span
            span.set_attributes({
                "claude.response.id": response.id,
                "claude.response.model": response.model,
                "claude.response.stop_reason": response.stop_reason,
                "claude.usage.input_tokens": response.usage.input_tokens,
                "claude.usage.output_tokens": response.usage.output_tokens,
                "claude.usage.total_tokens": total_tokens,
                "claude.latency_ms": latency_ms
            })

            span.set_status(trace.Status(trace.StatusCode.OK))

            return response

        except Exception as error:
            self.request_counter.add(1, {
                "model": kwargs.get("model"),
                "status": "error"
            })

            span.record_exception(error)
            span.set_status(trace.Status(
                trace.StatusCode.ERROR,
                str(error)
            ))

            raise

        finally:
            span.end()

# Usage
observable_client = ObservableClaudeClient(claude_client)
response = observable_client.messages_create(...)

Alerting Rules ๐Ÿšจ

Alert Severity Levels:

| Severity | Condition | Response Time | Escalation |
|----------|-----------|--------------|------------|
| 🟢 Info | < 1% error rate | Review daily | None |
| 🟡 Warning | 5-20% error rate, p95 > 10s | Review within 1 hour | After 4 hours |
| 🔴 Critical | > 20% error rate, budget exceeded | Immediate (page) | After 15 minutes |

prometheus-alerts.yml:

groups:
  - name: claude_integration
    interval: 30s
    rules:
      # Error rate alerts
      - alert: HighErrorRate
        expr: |
          rate(claude_requests_total{status="error"}[5m])
          / rate(claude_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Claude API error rate above 5%"
          description: "{{ $value | humanizePercentage }} of requests failing"

      - alert: CriticalErrorRate
        expr: |
          rate(claude_requests_total{status="error"}[5m])
          / rate(claude_requests_total[5m]) > 0.20
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Claude API error rate above 20%"

      # Latency alerts
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            rate(claude_latency_seconds_bucket[5m])
          ) > 10
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Claude API p95 latency above 10 seconds"

      - alert: CriticalLatency
        expr: |
          histogram_quantile(0.95,
            rate(claude_latency_seconds_bucket[5m])
          ) > 30
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Claude API p95 latency above 30 seconds"

      # Cost alerts
      - alert: CostAnomaly
        expr: |
          increase(claude_cost_usd_total[1h]) > 1000
        labels:
          severity: warning
        annotations:
          summary: "Abnormal cost spike detected"
          description: "${{ $value }} spent in last hour"

      - alert: MonthlyBudgetExceeded
        expr: |
          sum(increase(claude_cost_usd_total[30d])) > 50000
        labels:
          severity: critical
        annotations:
          summary: "Monthly budget of $50,000 exceeded"

      # Quality alerts
      - alert: LowAcceptanceRate
        expr: |
          claude_acceptance_rate < 0.50
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "Claude suggestion acceptance rate below 50%"

      # Usage alerts
      - alert: UnusualTokenUsage
        expr: |
          increase(claude_tokens_used_total[1h])
          > 4 * avg_over_time(increase(claude_tokens_used_total[1h])[7d:1h])
        labels:
          severity: warning
        annotations:
          summary: "Token usage 4x higher than weekly average"

Dashboards ๐Ÿ“Š

Key Dashboard Panels:

| Panel | Metric | Visualization | Update Frequency |
|-------|--------|---------------|------------------|
| 📈 Request Rate | rate(claude_requests_total[5m]) | Time series | 5 seconds |
| ⏱️ Latency | histogram_quantile(0.95, ...) | Time series (p50/p95/p99) | 5 seconds |
| 🪙 Token Usage | sum by (team) (rate(tokens[1h])) | Stacked area chart | 1 minute |
| 💰 Cost | sum(increase(cost[30d])) | Gauge + trend | 5 minutes |
| ✅ Acceptance Rate | claude_acceptance_rate | Gauge by feature | 1 minute |

Grafana Dashboard (JSON export):

{
  "title": "Claude Code Production",
  "panels": [
    {
      "title": "Request Rate",
      "targets": [{
        "expr": "rate(claude_requests_total[5m])",
        "legendFormat": "{{ status }}"
      }]
    },
    {
      "title": "Latency (p50, p95, p99)",
      "targets": [
        {
          "expr": "histogram_quantile(0.50, rate(claude_latency_seconds_bucket[5m]))",
          "legendFormat": "p50"
        },
        {
          "expr": "histogram_quantile(0.95, rate(claude_latency_seconds_bucket[5m]))",
          "legendFormat": "p95"
        },
        {
          "expr": "histogram_quantile(0.99, rate(claude_latency_seconds_bucket[5m]))",
          "legendFormat": "p99"
        }
      ]
    },
    {
      "title": "Token Usage by Team",
      "targets": [{
        "expr": "sum by (team) (rate(claude_tokens_used_total[1h]))",
        "legendFormat": "{{ team }}"
      }]
    },
    {
      "title": "Cost (Last 30 Days)",
      "targets": [{
        "expr": "sum(increase(claude_cost_usd_total[30d]))",
        "legendFormat": "Total"
      }]
    },
    {
      "title": "Acceptance Rate by Feature",
      "targets": [{
        "expr": "claude_acceptance_rate",
        "legendFormat": "{{ feature }}"
      }]
    }
  ]
}

๐Ÿ’ก Active Recall: If your Claude integration started failing right now, which metrics would you check FIRST to diagnose the issue?


Part 6: Third-Party Integrations

Slack Integration ๐Ÿ’ฌ

Pattern: Bot with Claude Backend

slack-bot.py:

import os

from slack_bolt import App
from anthropic import Anthropic

app = App(token=os.environ["SLACK_BOT_TOKEN"])
claude = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

@app.event("app_mention")
def handle_mention(event, say):
    """Respond to @mentions"""

    # Extract question (drop the leading <@BOT_ID> mention, if present)
    question = event["text"].split(">", 1)[-1].strip()

    # Get Claude response
    response = claude.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Answer this Slack question concisely:\n\n{question}"
        }],
        metadata={
            "user_id": event["user"],
            "channel_id": event["channel"]
        }
    )

    # Reply in thread
    say(
        text=response.content[0].text,
        thread_ts=event.get("thread_ts", event["ts"])
    )

@app.command("/claude")
def handle_slash_command(ack, command, respond):
    """Handle /claude slash command"""

    ack()  # Acknowledge immediately

    # Get Claude response
    response = claude.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": command["text"]
        }]
    )

    # Respond (only visible to user)
    respond(response.content[0].text)

if __name__ == "__main__":
    app.start(port=3000)

Pattern: Webhook Alerts

slack-alerts.py:

import os
import time

import requests

SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]

def alert_security_team(incident: dict):
    """Alert security team of potential issue"""

    requests.post(
        SLACK_WEBHOOK_URL,
        json={
            "text": f"๐Ÿšจ Security Alert: {incident['type']}",
            "attachments": [{
                "color": "danger",
                "fields": [
                    {
                        "title": "User",
                        "value": incident["user_email"],
                        "short": True
                    },
                    {
                        "title": "Severity",
                        "value": incident["severity"],
                        "short": True
                    },
                    {
                        "title": "Details",
                        "value": incident["description"],
                        "short": False
                    },
                    {
                        "title": "Action Required",
                        "value": incident["action"],
                        "short": False
                    }
                ],
                "footer": "Claude Security Monitor",
                "ts": int(time.time())
            }]
        }
    )

# Use in data protection scanner
def protect_claude_input(content: str):
    findings = scanner.scan(content)

    if findings:
        alert_security_team({
            "type": "Sensitive Data Detected",
            "user_email": session["user"]["email"],
            "severity": "HIGH",
            "description": f"Detected {len(findings)} sensitive data patterns",
            "action": "Request blocked. Review user activity."
        })

        raise DataProtectionError("Sensitive data detected")

GitHub Integration ๐Ÿ™

Automated PR Review Bot

github-bot.py:

import hashlib
import hmac
import os

from github import Github
from anthropic import Anthropic

github_client = Github(os.environ["GITHUB_TOKEN"])
claude = Anthropic()

@app.route("/github/webhook", methods=["POST"])
def github_webhook():
    """Handle GitHub webhook events"""

    # Verify signature
    signature = request.headers.get("X-Hub-Signature-256")
    if not verify_github_signature(request.data, signature):
        return "Invalid signature", 401

    event = request.headers.get("X-GitHub-Event")
    payload = request.json

    if event == "pull_request":
        if payload["action"] in ["opened", "synchronize"]:
            # New or updated PR
            review_pr(payload["pull_request"])

    return "OK", 200

def verify_github_signature(payload: bytes, signature: str) -> bool:
    """Verify GitHub webhook signature"""

    secret = os.environ["GITHUB_WEBHOOK_SECRET"].encode()
    expected = "sha256=" + hmac.new(secret, payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)

def review_pr(pr_data: dict):
    """Review PR with Claude"""

    repo = github_client.get_repo(pr_data["base"]["repo"]["full_name"])
    pr = repo.get_pull(pr_data["number"])

    # Get changed files
    files = pr.get_files()

    reviews = []
    for file in files:
        # Skip large files
        if file.changes > 500:
            continue

        # Get file content
        try:
            content = repo.get_contents(
                file.filename,
                ref=pr.head.sha
            ).decoded_content.decode()
        except Exception:
            continue  # File deleted or binary

        # Review with Claude
        review = claude.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"""Review this code change for security and quality:

**File**: {file.filename}
**Changes**: +{file.additions} -{file.deletions}

{file.patch}


Focus on:
- Security vulnerabilities
- Error handling
- Performance issues
- Code quality

Format: Markdown list of issues with severity (๐Ÿ”ด/๐ŸŸก/๐ŸŸข)"""
            }]
        )

        reviews.append({
            "file": file.filename,
            "review": review.content[0].text
        })

    # Post consolidated review
    review_body = "## ๐Ÿค– Claude Code Review\n\n"
    for r in reviews:
        review_body += f"### {r['file']}\n\n{r['review']}\n\n"

    review_body += "\n---\n*Automated review by Claude Code*"

    pr.create_issue_comment(review_body)

Discord Integration ๐ŸŽฎ

discord-bot.py:

import os

import discord
from anthropic import Anthropic

intents = discord.Intents.default()
intents.message_content = True

client = discord.Client(intents=intents)
claude = Anthropic()

@client.event
async def on_ready():
    print(f"Logged in as {client.user}")

@client.event
async def on_message(message):
    # Ignore own messages
    if message.author == client.user:
        return

    # Respond to !claude command
    if message.content.startswith("!claude"):
        prompt = message.content[8:].strip()

        if not prompt:
            await message.channel.send("Usage: `!claude <question>`")
            return

        # Show typing indicator
        async with message.channel.typing():
            response = claude.messages.create(  # blocking call; consider anthropic.AsyncAnthropic in production
                model="claude-sonnet-4-5-20250929",
                max_tokens=1024,
                messages=[{
                    "role": "user",
                    "content": prompt
                }],
                # The Messages API metadata field accepts only user_id
                metadata={"user_id": str(message.author.id)}
            )

        # Split response if too long (Discord limit: 2000 chars)
        content = response.content[0].text

        if len(content) <= 2000:
            await message.channel.send(content)
        else:
            # Split into chunks
            chunks = [content[i:i+1900] for i in range(0, len(content), 1900)]
            for chunk in chunks:
                await message.channel.send(chunk)

client.run(os.environ["DISCORD_BOT_TOKEN"])

Integration Comparison Table:

| Platform | Best For | Complexity | User Experience |
|----------|----------|------------|-----------------|
| 💬 Slack | Team collaboration, alerts | Low | Familiar, thread-based |
| 🐙 GitHub | Code review, PR automation | Medium | Integrated with workflow |
| 🎮 Discord | Developer communities | Low | Real-time, casual |
| 📧 Email | Reports, scheduled digests | Low | Universal reach |
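Slack, GitHub, and Discord are covered above; the email row needs nothing beyond the standard library. A minimal sketch that packages a Claude-generated summary as a digest (the sender address and `SMTP_HOST` environment variable are placeholders; obtain `summary` with `claude.messages.create` as in the other bots):

```python
import os
import smtplib
from email.message import EmailMessage

def build_digest_email(summary: str, recipients: list[str]) -> EmailMessage:
    """Package a Claude-generated summary as a digest email."""
    msg = EmailMessage()
    msg["Subject"] = "Daily Claude Digest"
    msg["From"] = "claude-digest@example.com"   # placeholder sender
    msg["To"] = ", ".join(recipients)
    msg.set_content(summary)
    return msg

def send_digest(summary: str, recipients: list[str]) -> None:
    """Send the digest via SMTP (host taken from env; placeholder var name)."""
    with smtplib.SMTP(os.environ["SMTP_HOST"]) as smtp:
        smtp.send_message(build_digest_email(summary, recipients))
```

Wire it to a cron job or CI schedule and email becomes the lowest-effort integration of the four.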


Part 7: Zimbra Production Deployment

Deployment Architecture Patterns: Single-tenant (dedicated per customer), Multi-tenant Shared (row-level security), and Federated Hybrid (data residency control for GDPR/HIPAA compliance with on-premise, cloud, and edge gateways).

Deployment Pattern Selection

| Pattern | Use When | Data Isolation | Cost | Compliance |
|---------|----------|----------------|------|------------|
| 🏢 Single-Tenant | Regulated industries, large customers | ✅ Complete | 💰💰💰 High | ✅ Maximum |
| 🏭 Multi-Tenant | SaaS, startups, cost optimization | ⚠️ Row-level security | 💰 Low | ⚠️ Requires design |
| 🌐 Federated Hybrid | Global enterprises, data residency | ✅ Geographic | 💰💰 Medium | ✅ Flexible |

Use Case 1: Custom MCP Server for Zimbra Admin ๐Ÿ”ง

Example: Zimbra Operations Server

zimbra-mcp-server.ts:

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
  ListToolsRequestSchema,
  CallToolRequestSchema
} from "@modelcontextprotocol/sdk/types.js";
import { execSync } from "child_process";

const server = new Server(
  { name: "zimbra-admin-server", version: "1.0.0" },
  { capabilities: { tools: {}, resources: {} } }
);

// List mailboxes resource
server.setRequestHandler(ListResourcesRequestSchema, async () => ({
  resources: [
    {
      uri: "zimbra://mailboxes",
      name: "All Mailboxes",
      description: "List of all Zimbra mailboxes",
      mimeType: "application/json"
    },
    {
      uri: "zimbra://domains",
      name: "All Domains",
      description: "List of all Zimbra domains",
      mimeType: "application/json"
    }
  ]
}));

// Read resources
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  if (request.params.uri === "zimbra://mailboxes") {
    const output = execSync("zmprov -l gaa", { encoding: "utf-8" });
    const mailboxes = output.trim().split("\n");

    return {
      contents: [{
        uri: request.params.uri,
        mimeType: "application/json",
        text: JSON.stringify(mailboxes, null, 2)
      }]
    };
  }

  if (request.params.uri === "zimbra://domains") {
    // gad = get all domains
    const output = execSync("zmprov gad", { encoding: "utf-8" });
    return {
      contents: [{
        uri: request.params.uri,
        mimeType: "application/json",
        text: JSON.stringify(output.trim().split("\n"), null, 2)
      }]
    };
  }

  throw new Error(`Unknown resource: ${request.params.uri}`);
});

// Admin tools
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "check_mailbox_health",
      description: "Check health status of a mailbox",
      inputSchema: {
        type: "object",
        properties: {
          email: { type: "string", format: "email" }
        },
        required: ["email"]
      }
    },
    {
      name: "get_mailbox_quota",
      description: "Get quota information for a mailbox",
      inputSchema: {
        type: "object",
        properties: {
          email: { type: "string", format: "email" }
        },
        required: ["email"]
      }
    },
    {
      name: "search_logs",
      description: "Search Zimbra logs for patterns",
      inputSchema: {
        type: "object",
        properties: {
          pattern: { type: "string" },
          log_file: {
            type: "string",
            enum: ["mailbox.log", "audit.log", "sync.log"]
          },
          lines: { type: "number", default: 100 }
        },
        required: ["pattern", "log_file"]
      }
    }
  ]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "check_mailbox_health") {
    // Validate before interpolating into a shell command (injection risk)
    if (!/^[\w.+-]+@[\w.-]+$/.test(String(args.email))) {
      throw new Error("Invalid email address");
    }

    try {
      const output = execSync(
        `zmmailbox -z -m ${args.email} gaf`,
        { encoding: "utf-8", timeout: 30000 }
      );

      return {
        content: [{
          type: "text",
          text: `โœ… Mailbox ${args.email} is healthy\n\nFolders:\n${output}`
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: `โŒ Health check failed: ${error.message}`
        }],
        isError: true
      };
    }
  }

  if (name === "get_mailbox_quota") {
    const output = execSync(
      `zmprov gmi ${args.email} zimbraMailQuota`,
      { encoding: "utf-8" }
    );

    const quota = parseInt(output.match(/zimbraMailQuota: (\d+)/)?.[1] || "0");
    const used = execSync(
      `zmmailbox -z -m ${args.email} gms`,
      { encoding: "utf-8" }
    );

    return {
      content: [{
        type: "text",
        text: `Quota for ${args.email}:\n- Limit: ${(quota / 1024 / 1024).toFixed(2)} MB\n- Used: ${used}`
      }]
    };
  }

  if (name === "search_logs") {
    const logPath = `/opt/zimbra/log/${args.log_file}`;  // log_file is constrained by the enum
    const lines = args.lines ?? 100;  // JSON Schema defaults are not applied automatically
    // Note: escape args.pattern (or use execFile with argv) in production
    const output = execSync(
      `grep -i "${args.pattern}" ${logPath} | tail -n ${lines}`,
      { encoding: "utf-8" }
    );

    return {
      content: [{
        type: "text",
        text: output || `No matches found for "${args.pattern}"`
      }]
    };
  }

  throw new Error(`Unknown tool: ${name}`);
});

Use Case 2: Automated Zimbra Monitoring ๐Ÿ“Š

zimbra-monitor.py:

#!/usr/bin/env python3
import subprocess
import json
import os
import time

import requests
from anthropic import Anthropic

claude = Anthropic()

def check_zimbra_health() -> dict:
    """Collect Zimbra health metrics"""

    health = {}

    # Service status
    try:
        status_output = subprocess.check_output(
            ["zmcontrol", "status"],
            encoding="utf-8"
        )
        health["services"] = parse_service_status(status_output)
    except Exception as e:
        health["services"] = {"error": str(e)}

    # Mailbox store usage
    try:
        df_output = subprocess.check_output(
            ["df", "-h", "/opt/zimbra/store"],
            encoding="utf-8"
        )
        health["storage"] = parse_storage_status(df_output)
    except Exception as e:
        health["storage"] = {"error": str(e)}

    # Recent errors in log
    try:
        errors = subprocess.check_output(
            ["grep", "-i", "error", "/opt/zimbra/log/mailbox.log"],
            encoding="utf-8"
        ).strip().split("\n")[-10:]  # Last 10 errors
        health["recent_errors"] = errors
    except Exception:
        health["recent_errors"] = []

    return health

def analyze_with_claude(health_data: dict) -> dict:
    """Analyze health data with Claude"""

    response = claude.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"""Analyze this Zimbra server health data and provide recommendations:

```json
{json.dumps(health_data, indent=2)}
```

Provide:
1. Overall health assessment (Healthy/Warning/Critical)
2. Issues identified
3. Recommended actions
4. Priority (High/Medium/Low)

Format as JSON."""
        }]
    )

    return json.loads(response.content[0].text)

def alert_if_needed(analysis: dict):
    """Send alert if issues detected"""

    if analysis["overall_health"] in ["Warning", "Critical"]:
        # Send to Slack
        requests.post(
            os.environ["SLACK_WEBHOOK_URL"],
            json={
                "text": f"⚠️ Zimbra Health Alert: {analysis['overall_health']}",
                "attachments": [{
                    "color": "warning" if analysis["overall_health"] == "Warning" else "danger",
                    "fields": [
                        {
                            "title": "Issues",
                            "value": "\n".join(analysis["issues"]),
                            "short": False
                        },
                        {
                            "title": "Recommended Actions",
                            "value": "\n".join(analysis["recommended_actions"]),
                            "short": False
                        }
                    ]
                }]
            }
        )

# Main monitoring loop
if __name__ == "__main__":
    while True:
        print("Checking Zimbra health...")

        health = check_zimbra_health()
        analysis = analyze_with_claude(health)
        alert_if_needed(analysis)

        print(f"Health: {analysis['overall_health']}")

        # Run every 5 minutes
        time.sleep(300)

Use Case 3: Slack Integration for Zimbra Alerts 💬

zimbra-slack-bot.py:
import os
import subprocess

from slack_bolt import App
from anthropic import Anthropic

app = App(token=os.environ["SLACK_BOT_TOKEN"])
claude = Anthropic()

@app.command("/zimbra")
def handle_zimbra_command(ack, command, respond):
    """Handle /zimbra slash command"""

    ack()  # Acknowledge immediately

    action = command["text"].strip()

    if action == "status":
        # Get Zimbra status
        status = subprocess.check_output(
            ["zmcontrol", "status"],
            encoding="utf-8"
        )

        respond(f"```\n{status}\n```")

    elif action.startswith("diagnose "):
        email = action.split(" ", 1)[1]

        # Diagnose with Claude
        response = claude.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"Diagnose issues for Zimbra mailbox {email}. Check logs and provide troubleshooting steps."
            }]
        )

        respond(response.content[0].text)

    else:
        respond("Usage: `/zimbra status` or `/zimbra diagnose <email>`")

if __name__ == "__main__":
    app.start(port=3000)

๐Ÿ’ก Active Recall: For a Zimbra production deployment, which monitoring approach would catch issues fastest: MCP server tools, automated health checks, or Slack bot commands?


Part 8: Cost Optimization

Cost Optimization Cycle: a continuous loop of monitoring API calls and token usage, analyzing high-cost drivers, implementing optimizations (prompt engineering, model selection, caching, batching), and validating results with A/B testing, targeting 40-60% cost reduction.

The Cost Challenge

Claude Sonnet 4.5 Pricing (as of 2025):

| Component | Cost per Million Tokens | Typical Ratio |
|-----------|------------------------|---------------|
| 📥 Input | $3.00 | 60-70% of total |
| 📤 Output | $15.00 | 30-40% of total |

Example Monthly Costs:

  • 100 users × 100K tokens/day = 10M tokens/day = 300M tokens/month
  • At a 50/50 input/output split: 150M × $3 + 150M × $15 ≈ $450 + $2,250 ≈ $2,700/month, before any optimization — and it scales linearly with users and usage

Optimization is critical.
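That arithmetic is worth scripting so you can rerun it with real telemetry. A minimal estimator (the per-million-token prices are the Sonnet figures above; the 50/50 input/output split is an assumption to replace with your measured ratio):

```python
# Hedged sketch: estimate monthly Claude API spend from usage assumptions.
def estimate_monthly_cost(
    users: int,
    tokens_per_user_per_day: int,
    input_price_per_m: float = 3.00,    # $/M input tokens (Sonnet)
    output_price_per_m: float = 15.00,  # $/M output tokens (Sonnet)
    input_ratio: float = 0.5,           # assumed 50/50 split -- measure yours
    days: int = 30,
) -> float:
    total = users * tokens_per_user_per_day * days
    input_tokens = total * input_ratio
    output_tokens = total - input_tokens
    return (input_tokens / 1_000_000) * input_price_per_m + \
           (output_tokens / 1_000_000) * output_price_per_m

# 100 users at 100K tokens/day -> 300M tokens/month
print(f"${estimate_monthly_cost(100, 100_000):,.2f}")  # -> $2,700.00
```

Shifting `input_ratio` between 0 and 1 spans roughly $900 to $4,500/month for the same volume, which is why knowing your real split matters.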

The 5 Cost Optimization Strategies

| Strategy | Savings Potential | Complexity | Implementation Time |
|----------|-------------------|------------|---------------------|
| 💾 Caching | Up to 90% on repeated context | Low | 1 day |
| 🎯 Model Selection | 30-60% with smart routing | Medium | 3-5 days |
| 🎫 Token Budgets | Prevents overruns | Low | 1 day |
| 📡 Response Streaming | 10-20% early stopping | Medium | 2-3 days |
| 📊 Monitoring & Alerts | Prevents waste | Low | 2 days |

Strategy 1: Caching ๐Ÿ’พ

Prompt Caching (Anthropic feature):

# Without caching
response1 = claude.messages.create(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": f"{large_context}\n\nQuestion 1"
    }]
)
# Cost: Full input tokens

response2 = claude.messages.create(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": f"{large_context}\n\nQuestion 2"
    }]
)
# Cost: Full input tokens again!

# With caching
response1 = claude.messages.create(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    system=[{
        "type": "text",
        "text": large_context,
        "cache_control": {"type": "ephemeral"}
    }],
    messages=[{
        "role": "user",
        "content": "Question 1"
    }]
)
# Cost: Full input tokens (first time)

response2 = claude.messages.create(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    system=[{
        "type": "text",
        "text": large_context,
        "cache_control": {"type": "ephemeral"}
    }],
    messages=[{
        "role": "user",
        "content": "Question 2"
    }]
)
# Cost: Cached input tokens (90% cheaper!)

Savings: Up to 90% on repeated context
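Whether caching pays off depends on the cache-write premium and how often the context is reused. A back-of-envelope comparison (the 1.25x write and 0.1x read multipliers reflect Anthropic's published cache pricing at the time of writing; verify current rates before relying on them):

```python
# Hedged sketch: cached vs uncached input cost for a context reused N times.
def cache_comparison(context_tokens: int, n_requests: int,
                     price_per_m: float = 3.00,
                     write_mult: float = 1.25,   # cache write premium (assumption)
                     read_mult: float = 0.10):   # cache read discount (assumption)
    base = context_tokens / 1_000_000 * price_per_m  # cost of one uncached pass
    uncached = n_requests * base
    cached = base * write_mult + (n_requests - 1) * base * read_mult
    return {"uncached": uncached, "cached": cached,
            "savings_pct": 100 * (1 - cached / uncached)}

# A 50K-token context reused across 20 requests
print(cache_comparison(50_000, 20))
```

Savings approach 90% as reuse grows; a context used only once actually costs slightly more with caching than without, because of the write premium.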

Strategy 2: Model Selection ๐ŸŽฏ

Model Comparison Table:

| Model | Cost (Input/Output) | Speed | Best For |
|-------|---------------------|-------|----------|
| 🏃 Haiku | $0.25/$1.25 per M | ⚡⚡⚡ Fast | Simple tasks, high volume |
| ⚖️ Sonnet | $3/$15 per M | ⚡⚡ Medium | Most tasks, balanced |
| 🧠 Opus | $15/$75 per M | ⚡ Slow | Complex reasoning, critical |

Choose the right model:

class SmartClaudeClient:
    def __init__(self, client):
        self.client = client  # an anthropic.Anthropic instance

    def route_request(self, prompt: str, complexity: str = "auto"):
        """Route to appropriate model based on complexity"""

        if complexity == "auto":
            # Naive keyword/length heuristic for illustration;
            # use a real classifier in production
            if len(prompt) < 200 and "simple" in prompt.lower():
                model = "claude-haiku-4-20250409"
            elif "complex" in prompt.lower() or len(prompt) > 2000:
                model = "claude-opus-4-20250514"
            else:
                model = "claude-sonnet-4-5-20250929"
        else:
            model = {
                "simple": "claude-haiku-4-20250409",
                "standard": "claude-sonnet-4-5-20250929",
                "complex": "claude-opus-4-20250514"
            }[complexity]

        return self.client.messages.create(
            model=model,
            max_tokens=1024,  # max_tokens is required by the Messages API
            messages=[{"role": "user", "content": prompt}]
        )

Strategy 3: Token Budgets ๐ŸŽซ

Per-User Limits:

from datetime import date

class TokenBudgetEnforcer:
    def __init__(self, redis_client):
        self.redis = redis_client

    def check_budget(self, user_id: str, tokens_requested: int) -> bool:
        """Check if user within daily token budget"""

        key = f"token_budget:{user_id}:{date.today()}"
        used = int(self.redis.get(key) or 0)
        limit = 100000  # 100K tokens/day

        if used + tokens_requested > limit:
            raise BudgetExceededError(
                f"Daily budget exceeded: {used}/{limit} tokens used"
            )

        return True

    def record_usage(self, user_id: str, tokens_used: int):
        """Record token usage"""

        key = f"token_budget:{user_id}:{date.today()}"
        self.redis.incr(key, tokens_used)
        self.redis.expire(key, 86400)  # Expire after 24h

enforcer = TokenBudgetEnforcer(redis_client)

@app.route("/api/claude/chat", methods=["POST"])
def claude_chat():
    user_id = session["user"]["id"]
    prompt = request.json["prompt"]

    # Rough pre-call estimate: ~4 characters per token
    estimated_tokens = len(prompt) // 4

    # Check budget
    enforcer.check_budget(user_id, estimated_tokens)

    # Process request
    response = claude.messages.create(...)

    # Record actual usage (the API reports input and output separately)
    enforcer.record_usage(
        user_id,
        response.usage.input_tokens + response.usage.output_tokens
    )

    return jsonify({"text": response.content[0].text})

Strategy 4: Response Streaming ๐Ÿ“ก

Stop when enough:

def stream_with_early_stop(prompt: str, max_quality_tokens: int = 500):
    """Stream response; stop early once enough content has arrived"""

    with claude.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        chunks = 0
        accumulated = ""

        for text in stream.text_stream:
            accumulated += text
            chunks += 1  # counts stream chunks, a rough proxy for tokens

            # Stop at a sentence boundary once we have enough content;
            # exiting the with-block closes the stream and halts token spend
            if chunks > max_quality_tokens and accumulated.endswith((".", "!", "?")):
                break

        return accumulated

Strategy 5: Monitoring & Alerts ๐Ÿ“Š

Cost Dashboard:

@app.route("/admin/costs")
def cost_dashboard():
    """Real-time cost dashboard"""

    # Today's costs
    today_tokens = sum_tokens_today()
    today_cost = calculate_cost(today_tokens)

    # This month
    month_tokens = sum_tokens_month()
    month_cost = calculate_cost(month_tokens)

    # Projected monthly cost
    days_elapsed = date.today().day
    days_in_month = calendar.monthrange(date.today().year, date.today().month)[1]
    projected_cost = (month_cost / days_elapsed) * days_in_month

    # Top users
    top_users = get_top_users_by_cost(limit=10)

    return render_template("costs.html",
        today_cost=today_cost,
        month_cost=month_cost,
        projected_cost=projected_cost,
        budget=50000,  # $50K/month
        top_users=top_users
    )

Alert when approaching budget:

# Runs on every request; g is request-scoped, so the check fires once per request
@app.before_request
def check_monthly_budget():
    if not g.get("budget_checked"):
        month_cost = calculate_month_cost()
        budget = 50000  # $50K

        if month_cost > budget * 0.90:
            alert_admin(
                f"โš ๏ธ 90% of monthly budget used: ${month_cost:,.2f} / ${budget:,.2f}"
            )

        g.budget_checked = True

Cost Optimization Checklist:

| Optimization | Before | After | Savings | Status |
|--------------|--------|-------|---------|--------|
| ✅ Prompt caching | $20K/month | $4K/month | 80% | Implemented |
| ✅ Model routing | $15K/month | $8K/month | 47% | Implemented |
| ✅ Token budgets | Uncontrolled | $50K cap | N/A | Enforced |
| ⏱️ Early stopping | Not used | Testing | TBD | In progress |
| ✅ Cost alerts | None | Real-time | N/A | Active |

๐Ÿ’ก Active Recall: If you could only implement TWO cost optimization strategies today, which combination would yield the highest ROI: Caching + Model Selection, Token Budgets + Monitoring, or Streaming + Alerts?


FAQ

How do I choose between building a custom MCP server vs using the Claude API directly?

Decision Matrix:

| Factor | MCP Server | Claude API Direct |
|--------|------------|-------------------|
| 🗄️ Data access needs | ✅ Complex, multiple sources | ⚠️ Simple, single source |
| 👥 Team usage | ✅ Multi-user, shared resources | ⚠️ Single user, scripts |
| 🔄 Reusability | ✅ Across sessions, discoverable | ❌ Per-script basis |
| ⚙️ Tool orchestration | ✅ Multiple tools, workflows | ⚠️ Single purpose |
| 🚀 Time to value | ⚠️ Days to build | ✅ Hours to script |

Use MCP server when:

  • You need to expose internal data/tools to Claude
  • You want reusable resources across sessions
  • You're building for team/multi-user access
  • You want standardized, discoverable capabilities

Use API directly when:

  • Simple automation scripts
  • One-off tasks
  • No need for persistent resources
  • Programmatic control over every aspect

Often: Use both! API for orchestration, MCP for capabilities.

What's the minimum security setup for production?

Essential Security Stack (Priority Order):

| Priority | Control | Implementation Time | Risk if Missing |
|----------|---------|---------------------|-----------------|
| 🔴 P0 | Authentication (SSO) | 1 week | 🔴 Critical |
| 🔴 P0 | PII detection | 3 days | 🔴 Critical |
| 🟡 P1 | RBAC | 1 week | 🟡 High |
| 🟡 P1 | Audit logging | 3 days | 🟡 High |
| 🟢 P2 | Data residency | 5 days | 🟢 Medium |

Essential:

  1. โœ… Authentication (SSO preferred)
  2. โœ… RBAC (role-based access control)
  3. โœ… PII detection (block sensitive data)
  4. โœ… Audit logging (comprehensive)
  5. โœ… Data residency (comply with regulations)

Start here, then add:

  • Usage limits
  • Cost tracking
  • Observability
  • Governance policies

How much should I budget for Claude Code in production?

Formula:

Monthly Cost = Users ร— Tokens/User/Day ร— 30 days ร— Token Price

Example (100 users, ~100K tokens each of input and output per day, mostly Sonnet):

100 users × 100K tokens × 30 days = 300M tokens/month per direction
300M × $3 / 1M = $900 (input)
300M × $15 / 1M = $4,500 (output)
Total: ~$5,400/month

Add 30% buffer for spikes → ~$7,000/month

Budgeting by Team Size:

| Team Size | Tokens/Month | Estimated Cost | Per-User Cost |
|-----------|--------------|----------------|---------------|
| 10 users | 30M | $1,000-1,500 | $100-150 |
| 50 users | 150M | $5,000-7,500 | $100-150 |
| 100 users | 300M | $10,000-15,000 | $100-150 |
| 500 users | 1.5B | $50,000-75,000 | $100-150 |

Rule of thumb: $50-150 per active user per month

How do I measure ROI?

ROI Calculation Framework:

Track:

  1. Time saved: Features delivered ร— hours saved per feature
  2. Quality improvement: Bugs reduced, test coverage increased
  3. Cost: Claude API + engineering time

Example:

  • 50 features/month ร— 4 hours saved = 200 hours saved
  • 200 hours ร— $100/hour = $20,000 value
  • Cost: $7,000 (Claude) + $5,000 (eng time) = $12,000
  • ROI: 67% return
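The worked example above can be captured as a small helper so the calculation is repeatable month over month (the rates and costs are the placeholder figures from the example):

```python
# Sketch: ROI from time saved vs Claude API + engineering spend.
def claude_roi(features_per_month: int, hours_saved_per_feature: float,
               hourly_rate: float, api_cost: float, eng_cost: float) -> dict:
    value = features_per_month * hours_saved_per_feature * hourly_rate
    cost = api_cost + eng_cost
    return {"value": value, "cost": cost,
            "roi_pct": 100 * (value - cost) / cost}

# Numbers from the example above: 50 features, 4 hours saved each, $100/hour
print(claude_roi(50, 4, 100, 7_000, 5_000))  # roi_pct rounds to 67
```

Feed it real numbers from your sprint metrics rather than estimates once you have a month of data.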

ROI Metrics Table:

| Metric | Measurement Method | Target |
|--------|--------------------|--------|
| ⏱️ Time Savings | Features/month × hours saved | >100 hours/month |
| 🐛 Bug Reduction | Bugs filed pre/post deployment | -30% bugs |
| ✅ Test Coverage | Lines covered by tests | +20% coverage |
| 🚀 Velocity | Story points/sprint | +25% velocity |
| 💰 Net Value | (Time saved × rate) - costs | >50% ROI |

Can I use Claude Code for regulated industries (healthcare, finance)?

Yes, but you need:

Compliance Requirements by Industry:

| Industry | Key Requirements | Claude Setup |
|----------|------------------|--------------|
| 🏥 Healthcare (HIPAA) | BAA, PHI protection, audit trails | AWS Bedrock + encryption |
| 💳 Finance (PCI DSS) | PCI compliance, data segregation | VPC + dedicated instances |
| 🏦 Banking (GLBA) | Financial privacy, audit logs | Private deployment + RBAC |
| 🇪🇺 EU Operations (GDPR) | Data residency, right to erasure | EU regions only |

  1. โœ… BAA (Business Associate Agreement) for HIPAA
  2. โœ… Data residency controls (EU, US, etc.)
  3. โœ… Audit trails (comprehensive logging)
  4. โœ… Access controls (RBAC, SOC 2)
  5. โœ… Encryption (at rest, in transit)

Best: Use AWS Bedrock or GCP Vertex AI for compliance features built-in.

How do I handle MCP server failures gracefully?

Pattern: Circuit breaker + fallback

class MCPServerWrapper {
  private failureCount = 0;
  private maxFailures = 3;
  private cooldownMs = 60000;
  private lastFailureTime = 0;

  async callTool(name: string, args: any) {
    // Check circuit breaker
    if (this.failureCount >= this.maxFailures) {
      const elapsed = Date.now() - this.lastFailureTime;
      if (elapsed < this.cooldownMs) {
        throw new Error("Circuit breaker open");
      }
      // Reset after cooldown
      this.failureCount = 0;
    }

    try {
      const result = await this.server.callTool(name, args);
      this.failureCount = 0;  // Reset on success
      return result;
    } catch (error) {
      this.failureCount++;
      this.lastFailureTime = Date.now();

      // Log for monitoring
      logger.error("MCP tool call failed", { name, error });

      throw error;
    }
  }
}

Resilience Patterns:

| Pattern | Purpose | Tradeoff |
|---------|---------|----------|
| 🔄 Circuit Breaker | Stop cascading failures | Temporary unavailability |
| ⏱️ Timeout | Prevent hanging | May cut off slow requests |
| 🔁 Retry with Backoff | Handle transient failures | Increased latency |
| 📉 Graceful Degradation | Partial functionality | Reduced capability |
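The circuit breaker above is TypeScript; the retry-with-backoff pattern from the same table looks like this in Python (a generic sketch — `fn` is any flaky call, such as an MCP tool invocation):

```python
import random
import time

def retry_with_backoff(fn, max_attempts: int = 4,
                       base_delay: float = 0.5, max_delay: float = 10.0):
    """Call fn(); on exception, retry with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts -- surface the failure
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(delay * random.uniform(0.5, 1.0))  # jitter avoids thundering herd
```

Usage would be something like `retry_with_backoff(lambda: client.call_tool("search_kb", {"query": q}))`, where `client` is whatever MCP client wrapper you use (hypothetical name). Pair it with the circuit breaker: retries absorb transient blips, the breaker stops you from hammering a server that is genuinely down.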

What's the best way to test MCP servers before production?

Three-stage testing:

Testing Pyramid:

| Stage | Coverage | Tools | Time Investment |
|-------|----------|-------|-----------------|
| 🔬 Unit Tests | Server logic, handlers | Jest, pytest | 2-3 days |
| 🔗 Integration Tests | With Claude CLI | Manual testing | 1-2 days |
| 🚀 Load Tests | Production simulation | k6, Locust | 2-3 days |

  1. Unit tests (server logic):
describe("Knowledge Base Server", () => {
  it("should list resources", async () => {
    const result = await server.handle(ListResourcesRequest);
    expect(result.resources).toHaveLength(10);
  });

  it("should handle missing resources", async () => {
    await expect(
      server.handle(ReadResourceRequest, { uri: "kb://invalid" })
    ).rejects.toThrow("Resource not found");
  });
});
  2. Integration tests (with Claude):
# Test via Claude CLI
$ claude
> "List resources from knowledge-base server"
[Verify output]

> "Search knowledge base for 'API authentication'"
[Verify results]
  3. Load tests (production simulation):
# Load test MCP server
import asyncio

async def load_test():
    tasks = [
        server.call_tool("search_kb", {"query": f"query {i}"})
        for i in range(100)
    ]
    await asyncio.gather(*tasks)

# Measure: latency, error rate, resource usage

Conclusion

You've Built a Production AI System

Not a desktop tool. Not a side project. A production system.

โœ… The Integration Maturity Model: Progressed from Level 1 (desktop) to Level 4-5 (enterprise platform)

โœ… Custom MCP Servers: Built data connectors, action executors, prompt libraries, observability bridges

โœ… Enterprise Security: Implemented SSO, RBAC, PII detection, audit logging, compliance frameworks

โœ… Production Observability: Deployed OpenTelemetry, Prometheus metrics, Grafana dashboards, alerting

โœ… Third-Party Integrations: Connected Slack, Discord, GitHub with automated workflows

โœ… Zimbra Production: Real enterprise deployment with monitoring, alerts, and automation

โœ… Cost Optimization: Caching, model selection, token budgets, monitoring, alerting

But here's what matters most: You understand that AI in production isn't about using cooler models.

It's about:

  • ๐Ÿ”’ Security (protecting data, meeting compliance)
  • ๐Ÿ‘๏ธ Observability (knowing what's happening)
  • ๐Ÿ“œ Governance (controlling costs, enforcing policies)
  • ๐Ÿ›ก๏ธ Reliability (handling failures gracefully)
  • ๐Ÿ“ˆ Scale (serving hundreds or thousands of users)

The Series Complete

This is Post 9โ€”the finale of our Claude Code journey:

| Post | Focus | Achievement |
|------|-------|-------------|
| 1 | Introduction | Discovered Claude Code |
| 2 | Setup | Installed and configured |
| 3 | Basics | First commands and workflows |
| 4 | Advanced | Power user techniques |
| 5 | Automation | Hooks, scripts, CI/CD |
| 6 | MCP & Extensions | Plugins and integrations |
| 7 | Troubleshooting | Resilience engineering |
| 8 | Workflows | Team productivity |
| 9 | Production | Enterprise deployment ← You are here |

From zero to production in 9 posts.

What's Next?

Journey Recommendations by Current Level:

| Your Current Level | Next Steps | Timeline |
|--------------------|------------|----------|
| 🟢 Level 1-2 | Focus on automation (Post 5) and workflows (Post 8) | 1-2 months |
| 🟡 Level 3 | Build first MCP server (this post), deploy observability (Part 5) | 2-3 months |
| 🔴 Level 4 | Implement complete governance framework, achieve Level 5 (AI Operating System) | 3-6 months |


For everyone: The ecosystem is moving fast. OpenAI, Google, Microsoft all adopted MCP in 2025. The time to build production AI systems is now.

The Challenge

Your mission: Deploy Claude Code to production following this framework.

30-Day Production Deployment Plan:

| Week | Focus | Deliverables | Success Criteria |
|------|-------|--------------|------------------|
| 1️⃣ | MCP Foundation | Data connector server | Claude can access internal data |
| 2️⃣ | Security | SSO, RBAC, PII detection | All access authenticated + authorized |
| 3️⃣ | Observability | Metrics, logs, traces, alerts | Full visibility into operations |
| 4️⃣ | Integration & Optimization | Slack/GitHub, cost controls | Team workflows automated |


After 30 days: You'll have a production AI system serving your team with enterprise security, complete observability, and controlled costs.

Then ask: What can we build that wasn't possible before?

For resources and continued learning, visit Claude Code Docs, explore MCP Specification, and join the community on GitHub.


๐Ÿ‘ˆ Previous: Workflow Engineering


P.S. Remember: 64% of organizations lack visibility into AI risks. 47% have no AI-specific security controls. Don't be in that majority. Build production systems the right wayโ€”secure, observable, governed. Your future self (and your security team) will thank you.