
Integration Patterns

REST API, GraphQL, WebSocket, event-driven, webhook, Slack/Discord bot, and background job patterns for embedding agents in applications.

For backend engineers integrating harnesses into larger applications.

Once you have built a harness and need to embed it in production systems, use this document as a guide to the main integration patterns: from library embedding to event-driven architectures, from REST APIs to GraphQL, and from third-party messaging platforms to database persistence.


Part 1: Harness as a Library

When to Use Library Integration

Best for:

  • A single monolithic application
  • Latency-critical paths (<500ms budget)
  • Complex state shared between the harness and the application
  • Easy debugging matters (shared memory, no network hop)

Not ideal for:

  • Distributed systems (agents need to scale independently)
  • Async/long-running jobs (blocks application thread)
  • When you need independent versioning

Installation & Setup

Assume your harness is a Python package (published to PyPI or internal registry):

# Install in your application
pip install my-harness-agent==1.0.0

Basic Usage Pattern

# app.py
import os

from my_harness import AgentHarness, AgentConfig

# Initialize once (reuse across requests)
config = AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"],
    system_prompt="You are a helpful assistant",
    tools=["file_read", "file_write", "web_search"]
)

harness = AgentHarness(config=config)

# Call from anywhere in your app
def process_user_request(user_input: str) -> dict:
    """Synchronous harness call."""
    result = harness.run(user_input)
    
    return {
        "status": "success",
        "result": result["output"],
        "cost": result["cost_usd"],
        "tokens_used": result["tokens_used"]
    }
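
The "initialize once" comment above can be made explicit with a cached factory. This is a minimal sketch, not the harness's own API: `StubHarness` and `get_harness` are hypothetical names, and the stub stands in for `AgentHarness` so the example runs without the package installed.

```python
from functools import lru_cache


class StubHarness:
    """Hypothetical stand-in for AgentHarness; counts how often it is built."""

    instances = 0

    def __init__(self) -> None:
        StubHarness.instances += 1

    def run(self, user_input: str) -> dict:
        # Echo the input so the example has something to return
        return {"output": f"echo: {user_input}", "cost_usd": 0.0}


@lru_cache(maxsize=1)
def get_harness() -> StubHarness:
    """Build the harness on first use and return the same object afterwards."""
    return StubHarness()


first = get_harness()
second = get_harness()
```

Request handlers would call `get_harness()` instead of touching a module-level global, which also makes the harness easy to swap out in tests.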

Error Handling

from my_harness.exceptions import (
    HarnessError,
    LoopLimitExceeded,
    ToolExecutionError,
    APIKeyError
)

def safe_harness_call(user_input: str) -> dict:
    """Call harness with comprehensive error handling."""
    try:
        result = harness.run(user_input, max_iterations=10)
        return {"status": "success", "result": result}
    
    except LoopLimitExceeded as e:
        # Agent hit iteration limit - partial results may be available
        return {
            "status": "partial",
            "result": e.partial_result,
            "error": f"Agent exceeded max iterations: {str(e)}",
            "iterations": e.iterations_completed
        }
    
    except ToolExecutionError as e:
        # A tool failed - agent may have recovered
        return {
            "status": "partial",
            "result": e.last_successful_result,
            "error": f"Tool failed: {e.tool_name}: {str(e)}",
            "failed_tool": e.tool_name
        }
    
    except APIKeyError as e:
        # API key invalid or expired
        return {
            "status": "error",
            "error": "API authentication failed",
            "code": "auth_error"
        }
    
    except HarnessError as e:
        # Generic harness error
        return {
            "status": "error",
            "error": str(e),
            "code": "harness_error"
        }
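
The order of the `except` clauses above matters only if the specific exceptions subclass `HarnessError`, which the source does not show. The sketch below assumes such a hierarchy (the class bodies here are illustrative, not the package's real definitions) and shows why the generic handler has to come last.

```python
class HarnessError(Exception):
    """Assumed base class for all harness failures."""


class LoopLimitExceeded(HarnessError):
    """Assumed subclass carrying partial progress."""

    def __init__(self, message, partial_result=None, iterations_completed=0):
        super().__init__(message)
        self.partial_result = partial_result
        self.iterations_completed = iterations_completed


def classify(exc: Exception) -> str:
    """Return the status a handler chain would assign to this exception."""
    try:
        raise exc
    except LoopLimitExceeded:
        return "partial"      # most specific clause first
    except HarnessError:
        return "error"        # generic harness failure
    except Exception:
        return "unexpected"   # anything outside the hierarchy
```

If `except HarnessError` were listed first, it would also catch `LoopLimitExceeded` and the partial result would never be returned.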

Passing Configuration

# Dynamic configuration per request
def process_with_custom_config(user_input: str, model: str = None) -> dict:
    """Run harness with request-specific configuration."""
    
    # Override defaults if provided. Use a new name here: assigning to
    # `config` would make it a local variable and raise UnboundLocalError.
    request_config = config.copy(update={
        "model": model or config.model,
        "temperature": 0.7,
        "max_tokens": 2000
    })
    
    harness_instance = AgentHarness(config=request_config)
    result = harness_instance.run(user_input)
    
    return {
        "result": result["output"],
        "model_used": request_config.model,  # the model actually used, never None
        "cost": result["cost_usd"]
    }
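
Per-request overrides are easiest to reason about when the base configuration is never mutated. The sketch below illustrates that idea with a frozen dataclass and `dataclasses.replace`; `SketchConfig` and `config_for_request` are hypothetical and do not describe how `AgentConfig` is implemented.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical immutable configuration object."""

    model: str
    temperature: float = 1.0
    max_tokens: int = 1024


BASE = SketchConfig(model="claude-3-5-sonnet")


def config_for_request(model: Optional[str] = None) -> SketchConfig:
    """Return a copy of BASE with request-specific overrides applied."""
    return replace(
        BASE,
        model=model or BASE.model,
        temperature=0.7,
        max_tokens=2000,
    )


custom = config_for_request("other-model")
default = config_for_request()
```

Because `BASE` is frozen, concurrent requests cannot leak overrides into each other.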

Shared Context with Application

from datetime import datetime

class HarnessWithAppContext:
    """Harness that shares application state."""
    
    def __init__(self, harness: AgentHarness, app_db):
        self.harness = harness
        self.app_db = app_db
    
    def run_with_app_context(self, user_input: str, user_id: str) -> dict:
        """Run harness with access to application database."""
        
        # Load user context
        user = self.app_db.users.get(user_id)
        
        # Inject into harness system prompt
        enhanced_prompt = f"""
        User: {user.name}
        Account type: {user.account_type}
        Previous interactions: {user.interaction_count}
        
        Use this context to provide personalized responses.
        """
        
        # Run with context
        result = self.harness.run(
            user_input,
            system_prompt=enhanced_prompt,
            context={"user_id": user_id, "account": user.account_type}
        )
        
        # Save interaction to app database
        self.app_db.interactions.insert({
            "user_id": user_id,
            "input": user_input,
            "output": result["output"],
            "cost": result["cost_usd"],
            "timestamp": datetime.utcnow()
        })
        
        return result

# Usage in Flask/FastAPI
harness_app = HarnessWithAppContext(harness, database)

@app.post("/ask")
def ask(request: AskRequest):
    result = harness_app.run_with_app_context(
        request.question,
        user_id=request.user_id
    )
    return {"answer": result["output"], "cost": result["cost_usd"]}

Return Values & Metadata

# Standard return structure
result = harness.run(user_input)

# Always includes:
{
    "output": "Final agent response",
    "status": "success",  # or "partial", "error"
    
    # Cost tracking
    "cost_usd": 0.0042,
    "tokens_used": {
        "input": 150,
        "output": 200,
        "total": 350
    },
    
    # Execution info
    "iterations": 3,
    "duration_seconds": 2.14,
    "tools_called": ["web_search", "file_read"],
    
    # Debug info (if enabled)
    "reasoning": "Thought process from model",
    "tool_calls": [
        {
            "tool": "web_search",
            "input": {"query": "..."},
            "output": "Search results...",
            "duration": 0.5
        }
    ]
}
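
The return structure above can be written down as types so that consuming code gets editor and type-checker support. This is a sketch based only on the fields listed above; `HarnessResult`, `TokenUsage`, and `summarize` are illustrative names, not part of the harness package.

```python
from typing import List, TypedDict


class TokenUsage(TypedDict):
    input: int
    output: int
    total: int


class HarnessResult(TypedDict, total=False):
    output: str
    status: str
    cost_usd: float
    tokens_used: TokenUsage
    iterations: int
    duration_seconds: float
    tools_called: List[str]


def summarize(results: List[HarnessResult]) -> dict:
    """Aggregate cost and token usage over several runs."""
    return {
        "runs": len(results),
        "total_cost_usd": sum(r.get("cost_usd", 0.0) for r in results),
        "total_tokens": sum(
            r.get("tokens_used", {}).get("total", 0) for r in results
        ),
    }


sample: HarnessResult = {
    "output": "Final agent response",
    "status": "success",
    "cost_usd": 0.0042,
    "tokens_used": {"input": 150, "output": 200, "total": 350},
}
summary = summarize([sample, sample])
```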

Part 2: Harness as a REST API Service

When to Use REST API Pattern

Best for:

  • Distributed systems (harness runs separately)
  • Multiple applications consuming the harness
  • Independent scaling (harness scales independently from app)
  • Asynchronous processing with callbacks
  • Rate limiting and quota management per consumer

Trade-offs:

  • Network latency (100-500ms overhead)
  • More complex deployment
  • Requires API authentication and versioning

FastAPI Implementation

# harness_api.py
from fastapi import FastAPI, HTTPException, Header, Depends
from pydantic import BaseModel, Field
from typing import Optional
import json
from datetime import datetime
import os

from my_harness import AgentHarness, AgentConfig

app = FastAPI(title="Harness Agent API", version="1.0.0")

# Initialize harness once
config = AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
)
harness = AgentHarness(config=config)

# ===== Request/Response Models =====

class QueryRequest(BaseModel):
    """User query."""
    query: str = Field(..., description="The user's question or task")
    context: Optional[dict] = Field(None, description="Optional context")
    max_iterations: int = Field(10, ge=1, le=50)
    temperature: Optional[float] = Field(None, ge=0, le=2)

class ToolCall(BaseModel):
    """Record of a tool execution."""
    tool_name: str
    input: dict
    output: str
    duration_ms: float
    status: str  # "success" or "error"

class QueryResponse(BaseModel):
    """Successful query response."""
    query_id: str
    status: str  # "success" or "partial"
    result: str
    iterations: int
    cost_usd: float
    tokens_used: int
    duration_seconds: float
    tools_called: list[ToolCall]
    timestamp: str

class ErrorResponse(BaseModel):
    """Error response."""
    error: str
    code: str
    details: Optional[dict] = None

# ===== Dependency: API Key Validation =====

async def verify_api_key(x_api_key: str = Header(...)) -> str:
    """Validate API key from header."""
    # In production, verify against database or secrets manager.
    # Drop empty entries so an unset VALID_API_KEYS never accepts an empty key.
    valid_keys = [
        key for key in os.environ.get("VALID_API_KEYS", "").split(",") if key
    ]
    
    if x_api_key not in valid_keys:
        raise HTTPException(status_code=401, detail="Invalid API key")
    
    return x_api_key

# ===== Endpoints =====

@app.post("/agent/query", response_model=QueryResponse)
async def agent_query(
    request: QueryRequest,
    api_key: str = Depends(verify_api_key)
) -> dict:
    """
    Query the agent harness.
    
    **Parameters**:
    - query: User question/task
    - context: Optional application context
    - max_iterations: Max loops agent can perform
    
    **Returns**:
    - Result from agent
    - Cost in USD
    - Tools called
    - Execution time
    """
    import uuid
    
    query_id = str(uuid.uuid4())
    start_time = datetime.utcnow()
    
    try:
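        # Run harness. This call is synchronous, so it blocks the event loop
        # for its full duration; offload it to a thread pool if runs are slow.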
        result = harness.run(
            request.query,
            context=request.context,
            max_iterations=request.max_iterations,
            temperature=request.temperature
        )
        
        duration = (datetime.utcnow() - start_time).total_seconds()
        
        return QueryResponse(
            query_id=query_id,
            status=result.get("status", "success"),
            result=result["output"],
            iterations=result.get("iterations", 0),
            cost_usd=result.get("cost_usd", 0),
            tokens_used=result.get("tokens_used", {}).get("total", 0),
            duration_seconds=duration,
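            tools_called=[
                # Map the harness's tool_call keys onto the ToolCall model
                # (assumes "duration" is reported in seconds)
                ToolCall(
                    tool_name=tool["tool"],
                    input=tool.get("input", {}),
                    output=str(tool.get("output", "")),
                    duration_ms=tool.get("duration", 0) * 1000,
                    status=tool.get("status", "success")
                )
                for tool in result.get("tool_calls", [])
            ],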
            timestamp=start_time.isoformat()
        )
    
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail={
                "error": str(e),
                "query_id": query_id,
                "code": "harness_error"
            }
        )

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat()
    }

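from fastapi.responses import JSONResponse

@app.get("/ready")
async def readiness_check():
    """Readiness probe (can accept requests?)."""
    try:
        # Quick test: can we call the model? Note that every probe is a
        # real model call, so keep the probe interval modest.
        harness.run("Acknowledge", max_iterations=1)
        
        return {
            "ready": True,
            "model_responsive": True,
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        # A (body, status) tuple does not set the status code in FastAPI;
        # return an explicit response instead.
        return JSONResponse(
            status_code=503,
            content={
                "ready": False,
                "model_responsive": False,
                "error": str(e)
            }
        )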

@app.get("/metrics")
async def metrics():
    """Metrics endpoint (JSON; convert to the Prometheus text format before scraping)."""
    # Collect from harness or database
    return {
        "requests_total": harness.metrics.get("total_requests", 0),
        "requests_success": harness.metrics.get("successful_requests", 0),
        "cost_total_usd": harness.metrics.get("total_cost_usd", 0),
        "tokens_used_total": harness.metrics.get("total_tokens", 0)
    }

# Run server
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
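
The key check in `verify_api_key` uses a plain membership test. A hedged alternative is to compare keys in constant time with the standard library's `hmac.compare_digest`, which avoids leaking how many leading characters matched. `parse_keys` and `is_valid_key` are hypothetical helpers, not part of the API above.

```python
import hmac
from typing import Iterable, List


def parse_keys(raw: str) -> List[str]:
    """Split a comma-separated key list, dropping blanks and whitespace."""
    return [key.strip() for key in raw.split(",") if key.strip()]


def is_valid_key(candidate: str, valid_keys: Iterable[str]) -> bool:
    """Check the candidate against every key using constant-time comparison."""
    candidate_bytes = candidate.encode("utf-8")
    matched = False
    for key in valid_keys:
        # Keep looping after a match so timing does not reveal its position
        if hmac.compare_digest(candidate_bytes, key.encode("utf-8")):
            matched = True
    return matched
```

With these helpers, the dependency would call `is_valid_key(x_api_key, parse_keys(os.environ.get("VALID_API_KEYS", "")))` and raise the 401 when it returns `False`.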

Client Library

# harness_client.py - For consuming applications
import os
from typing import Optional

import httpx

class HarnessClient:
    """Client for interacting with Harness API."""
    
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.client = httpx.Client(
            headers={"X-API-Key": api_key},
            timeout=30.0
        )
    
    def query(self, prompt: str, context: Optional[dict] = None) -> dict:
        """Send query to harness."""
        response = self.client.post(
            f"{self.base_url}/agent/query",
            json={
                "query": prompt,
                "context": context,
                "max_iterations": 10
            }
        )
        
        response.raise_for_status()
        return response.json()
    
    def health(self) -> bool:
        """Check if harness is healthy."""
        response = self.client.get(f"{self.base_url}/health")
        return response.status_code == 200
    
    def close(self):
        """Close HTTP connection."""
        self.client.close()

# Usage in consuming app
harness = HarnessClient(
    base_url="https://harness.internal:8000",
    api_key=os.environ["HARNESS_API_KEY"]
)

result = harness.query("Summarize my sales data")
print(f"Cost: ${result['cost_usd']}")
print(f"Result: {result['result']}")

Error Handling & HTTP Status Codes

from fastapi import HTTPException, status

# Standardized error codes
class HarnessErrorCode:
    LOOP_LIMIT_EXCEEDED = "loop_limit_exceeded"      # 400
    TOOL_EXECUTION_ERROR = "tool_error"              # 422
    INVALID_INPUT = "invalid_input"                  # 400
    API_KEY_INVALID = "auth_failed"                  # 401
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"      # 429
    INTERNAL_ERROR = "internal_error"                # 500
    SERVICE_UNAVAILABLE = "service_unavailable"      # 503

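from fastapi.responses import JSONResponse

@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """Global error handler."""
    # Exception handlers must return a Response object; a (dict, 500) tuple
    # would not set the status code.
    return JSONResponse(
        status_code=500,
        content={
            "error": str(exc),
            "code": HarnessErrorCode.INTERNAL_ERROR,
            "timestamp": datetime.utcnow().isoformat()
        }
    )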

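# Specific handlers for known errors.
# This variant replaces the earlier /agent/query route (FastAPI serves the
# first route registered for a path). For these clauses to fire, the wrapped
# agent_query must re-raise harness exceptions rather than convert every
# Exception into a generic 500.
from my_harness.exceptions import LoopLimitExceeded, ToolExecutionError

@app.post("/agent/query")
async def agent_query_with_error_handling(
    request: QueryRequest,
    api_key: str = Depends(verify_api_key)
):
    try:
        return await agent_query(request, api_key)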
    
    except LoopLimitExceeded:
        raise HTTPException(
            status_code=400,
            detail={
                "error": "Agent exceeded iteration limit",
                "code": HarnessErrorCode.LOOP_LIMIT_EXCEEDED
            }
        )
    
    except ToolExecutionError:
        raise HTTPException(
            status_code=422,
            detail={
                "error": "Tool execution failed",
                "code": HarnessErrorCode.TOOL_EXECUTION_ERROR
            }
        )
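
The code-to-status pairs listed in the comments of `HarnessErrorCode` can live in one lookup table, so handlers do not repeat status numbers. A minimal sketch, using the values from those comments; `STATUS_BY_CODE` and `error_response` are illustrative names.

```python
from typing import Tuple

# Error code -> HTTP status, taken from the HarnessErrorCode comments above
STATUS_BY_CODE = {
    "loop_limit_exceeded": 400,
    "tool_error": 422,
    "invalid_input": 400,
    "auth_failed": 401,
    "rate_limit_exceeded": 429,
    "internal_error": 500,
    "service_unavailable": 503,
}


def error_response(code: str, message: str) -> Tuple[int, dict]:
    """Return (status, body) for an error code; unknown codes map to 500."""
    status = STATUS_BY_CODE.get(code, 500)
    return status, {"error": message, "code": code}


status, body = error_response("tool_error", "Tool execution failed")
```

A handler could then build its response from `status` and `body` (for example via `JSONResponse(status_code=status, content=body)`).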

Part 3: Harness as Async/Background Job

When to Use Background Jobs

Best for:

  • Long-running agents (>10 seconds)
  • Batch processing
  • Fire-and-forget operations
  • When latency isn’t critical

Celery + Redis Pattern

# tasks.py - Celery tasks
from celery import Celery
from my_harness import AgentHarness, AgentConfig
import os

# Configure Celery
app = Celery(
    'harness_tasks',
    broker=os.environ["CELERY_BROKER_URL"],  # redis://localhost:6379
    backend=os.environ["CELERY_RESULT_BACKEND"]
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes hard limit
    task_soft_time_limit=25 * 60  # 25 minutes soft limit (gives grace period)
)

# Initialize harness once (reused by all tasks)
harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

@app.task(
    bind=True,
    name='harness.query',
    max_retries=3
)
def harness_query(self, query: str, user_id: str, context: dict = None):
    """
    Run harness query as background job.
    
    Args:
        query: User query
        user_id: User requesting the query
        context: Optional application context
    
    Returns:
        dict with result, cost, metadata
    """
    try:
        # Log job start
        print(f"[{self.request.id}] Starting harness query for user {user_id}")
        
        # Run harness
        result = harness.run(query, context=context, max_iterations=10)
        
        # Log completion
        print(f"[{self.request.id}] Completed in {result['duration_seconds']}s, cost: ${result['cost_usd']}")
        
        return {
            "status": "success",
            "query_id": self.request.id,
            "result": result["output"],
            "cost_usd": result["cost_usd"],
            "iterations": result["iterations"],
            "user_id": user_id
        }
    
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

@app.task(
    bind=True,
    name='harness.batch',
    max_retries=2
)
def harness_batch(self, queries: list, user_id: str):
    """Process multiple queries."""
    results = []
    total_cost = 0
    
    for i, query in enumerate(queries):
        try:
            result = harness.run(query, max_iterations=5)
            results.append({
                "query": query,
                "result": result["output"],
                "cost": result["cost_usd"]
            })
            total_cost += result["cost_usd"]
        
        except Exception as e:
            results.append({
                "query": query,
                "error": str(e)
            })
    
    return {
        "status": "completed",
        "user_id": user_id,
        "results": results,
        "total_cost_usd": total_cost,
        "completed_count": sum(1 for r in results if "result" in r)
    }

# Usage from FastAPI (separate module, e.g. api.py). The FastAPI instance is
# named `api` so it does not shadow the Celery `app` defined in tasks.py.
from typing import Optional

from celery.result import AsyncResult
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from tasks import app as celery_app, harness_query

api = FastAPI()

class AsyncQueryRequest(BaseModel):
    """Background query request; carries the user_id the task expects."""
    query: str
    user_id: str
    context: Optional[dict] = None

@api.post("/query/async")
def query_async(request: AsyncQueryRequest):
    """Submit harness query as background job."""
    
    # Submit task to Celery
    task = harness_query.delay(
        query=request.query,
        user_id=request.user_id,
        context=request.context
    )
    
    return {
        "task_id": task.id,
        "status": "queued",
        "check_url": f"/query/status/{task.id}"
    }

@api.get("/query/status/{task_id}")
def query_status(task_id: str):
    """Check status of background job."""
    result = AsyncResult(task_id, app=celery_app)
    
    if result.state == "PENDING":
        return {"status": "pending", "task_id": task_id}
    
    elif result.state == "SUCCESS":
        return {
            "status": "completed",
            "task_id": task_id,
            "result": result.result
        }
    
    elif result.state == "FAILURE":
        return {
            "status": "failed",
            "task_id": task_id,
            "error": str(result.info)
        }
    
    else:  # RETRY, STARTED, etc.
        return {"status": result.state, "task_id": task_id}

@api.get("/query/result/{task_id}")
def query_result(task_id: str):
    """Get result of completed job."""
    result = AsyncResult(task_id, app=celery_app)
    
    if result.ready():
        if result.successful():
            return result.result
        else:
            raise HTTPException(status_code=500, detail=str(result.info))
    
    # Not ready yet: 202 is a success status, so return it rather than raise
    return JSONResponse(
        status_code=202,
        content={"detail": "Task still processing"}
    )
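
The retry in `harness_query` waits `2 ** retries` seconds, which grows without bound and makes all failed tasks retry at the same moments. A common refinement is to cap the delay and add random jitter. The helper below is a sketch with hypothetical names and defaults; it is independent of Celery.

```python
import random
from typing import Optional


def backoff_seconds(
    retries: int,
    base: float = 2.0,
    cap: float = 300.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Exponential delay, capped at `cap`, plus up to `jitter` * delay extra."""
    delay = min(cap, base ** retries)
    if jitter:
        rng = rng or random.Random()
        delay += rng.uniform(0, jitter * delay)
    return delay


# Delays for the first four retries without jitter
schedule = [backoff_seconds(n) for n in range(4)]
```

Inside the task, the result would be passed as `countdown=backoff_seconds(self.request.retries, jitter=0.5)`.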

Webhook Callbacks (Job Completion)

# harness_tasks_with_webhooks.py
import httpx

@app.task(bind=True, name='harness.query_with_webhook')
def harness_query_with_webhook(self, query: str, webhook_url: str):
    """Run harness and POST result to webhook."""
    
    try:
        result = harness.run(query)
        
        # POST result to webhook (httpx.post opens and closes its own connection)
        response = httpx.post(
            webhook_url,
            json={
                "task_id": self.request.id,
                "status": "completed",
                "result": result["output"],
                "cost_usd": result["cost_usd"]
            },
            timeout=30
        )
        
        response.raise_for_status()
        
        return {
            "status": "success",
            "webhook_status": response.status_code
        }
    
    except Exception as e:
        # Best-effort failure notification; the original error is re-raised
        # either way so Celery marks the task as failed.
        try:
            httpx.post(
                webhook_url,
                json={
                    "task_id": self.request.id,
                    "status": "failed",
                    "error": str(e)
                },
                timeout=30
            )
        except Exception:
            pass  # Notification failure is ignored here; log it in production
        
        raise

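# Usage (in the FastAPI module, where `app` is the FastAPI instance rather than
# the Celery app; QueryWithCallbackRequest is assumed to be a Pydantic model
# with `query` and `callback_url` fields)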
@app.post("/query/with-callback")
def query_with_callback(request: QueryWithCallbackRequest):
    """Submit query with callback URL."""
    
    task = harness_query_with_webhook.delay(
        query=request.query,
        webhook_url=request.callback_url
    )
    
    return {
        "task_id": task.id,
        "message": "Query queued. Result will be POSTed to callback URL"
    }
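
A receiver of these callbacks cannot tell a genuine POST from a forged one unless the payload is signed. One common approach, sketched here under the assumption that both sides share a secret, is an HMAC-SHA256 signature over the exact request body. The helper names and the idea of sending the signature in a header such as `X-Harness-Signature` are hypothetical, not part of the tasks above.

```python
import hashlib
import hmac
import json
from typing import Tuple


def sign_payload(payload: dict, secret: bytes) -> Tuple[bytes, str]:
    """Serialize the payload deterministically and sign the resulting bytes."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return body, signature


def verify_signature(body: bytes, signature: str, secret: bytes) -> bool:
    """Recompute the signature on the receiver and compare in constant time."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)


body, signature = sign_payload(
    {"task_id": "t1", "status": "completed"}, secret=b"shared-secret"
)
```

The sender would POST `body` as the raw request content with the signature in a header; the receiver verifies against the raw bytes before parsing the JSON.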

Timeout Handling

from celery.exceptions import SoftTimeLimitExceeded

@app.task(bind=True, name='harness.query_with_timeout')
def harness_query_with_timeout(self, query: str):
    """Handle graceful timeout."""
    
    try:
        # This will raise SoftTimeLimitExceeded at 25 minutes
        result = harness.run(query)
        return {"status": "success", "result": result}
    
    except SoftTimeLimitExceeded:
        # Gracefully save partial progress
        partial_result = harness.get_partial_result()
        
        return {
            "status": "timeout",
            "partial_result": partial_result,
            "message": "Agent exceeded time limit. Partial results above."
        }

Part 4: Event-Driven Integration

When to Use Event-Driven

Best for:

  • Responding to real-world events (user messages, file uploads, API webhooks)
  • Streaming data (Kafka, Pub/Sub topics)
  • Decoupled microservices
  • Building event sourcing systems

Kafka Event Consumer

# kafka_consumer.py
from kafka import KafkaConsumer, KafkaProducer
from my_harness import AgentHarness, AgentConfig
import json
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

# Kafka consumer
consumer = KafkaConsumer(
    'user_queries',  # Topic name
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"].split(","),
    group_id='harness-consumer',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,  # Offsets commit in the background, independent of processing success
    max_poll_records=1  # Process one at a time
)

# Producer for results and dead-letter events
producer = KafkaProducer(
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"].split(","),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

logger.info("Kafka consumer started, listening on 'user_queries' topic")

for message in consumer:
    # Process each event
    event = message.value
    
    try:
        logger.info(f"Processing event: {event.get('id')}")
        
        # Run harness on event data
        result = harness.run(
            query=event["user_query"],
            context={
                "user_id": event.get("user_id"),
                "session_id": event.get("session_id"),
                "metadata": event.get("metadata", {})
            },
            max_iterations=5
        )
        
        # Produce result to output topic
        producer.send(
            "agent_responses",
            value={
                "event_id": event["id"],
                "user_id": event.get("user_id"),
                "result": result["output"],
                "cost_usd": result["cost_usd"],
                "iterations": result["iterations"]
            }
        )
        
        logger.info(f"Completed event {event['id']}")
    
    except Exception as e:
        logger.error(f"Failed processing event {event.get('id')}: {str(e)}")
        
        # Send to dead-letter topic
        # Use .get() here: a missing "id" may be what caused the failure
        producer.send(
            "agent_errors",
            value={
                "event_id": event.get("id"),
                "error": str(e),
                "original_event": event
            }
        )
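
Consumers like the one above can see the same event more than once (after a rebalance or restart, for example), and each duplicate costs a full harness run. A small bounded set of recently seen event ids is one way to skip repeats. This is an in-process sketch with a hypothetical `RecentEventIds` class; it does not survive restarts or coordinate across multiple consumers.

```python
from collections import OrderedDict


class RecentEventIds:
    """Remember the most recent event ids, up to a fixed capacity."""

    def __init__(self, capacity: int = 10_000) -> None:
        self.capacity = capacity
        self._ids: "OrderedDict[str, None]" = OrderedDict()

    def first_time(self, event_id: str) -> bool:
        """Return True the first time an id is seen, False on repeats."""
        if event_id in self._ids:
            self._ids.move_to_end(event_id)  # refresh its recency
            return False
        self._ids[event_id] = None
        if len(self._ids) > self.capacity:
            self._ids.popitem(last=False)  # evict the oldest id
        return True


seen = RecentEventIds(capacity=2)
decisions = [seen.first_time(i) for i in ["a", "b", "a", "c", "b"]]
```

In the consumer loop, an event would be processed only when `seen.first_time(event["id"])` returns `True`. Note that once an id is evicted (as `"b"` is here), a later duplicate is treated as new.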

Google Pub/Sub Event Handler

# pubsub_handler.py
from google.cloud import pubsub_v1
from my_harness import AgentHarness, AgentConfig
import json
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

def message_handler(message):
    """Handle incoming Pub/Sub message."""
    try:
        event = json.loads(message.data.decode('utf-8'))
        
        logger.info(f"Processing event: {event.get('id')}")
        
        # Run harness
        result = harness.run(
            query=event["user_query"],
            context=event.get("context", {}),
            max_iterations=10
        )
        
        # Publish result
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(
            os.environ["GCP_PROJECT"],
            "agent-responses"
        )
        
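        future = publisher.publish(
            topic_path,
            data=json.dumps({
                "event_id": event["id"],
                "result": result["output"],
                "cost_usd": result["cost_usd"]
            }).encode('utf-8')
        )
        # Wait for the publish to succeed before acknowledging the input,
        # so a failed publish leads to redelivery instead of a lost result
        future.result()
        
        # Acknowledge message
        message.ack()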
        
    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")
        # Negative acknowledgement will requeue the message
        message.nack()

# Subscribe and listen
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    os.environ["GCP_PROJECT"],
    "harness-subscription"
)

streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=message_handler,
    flow_control=pubsub_v1.types.FlowControl(max_messages=1)
)

logger.info("Pub/Sub listener started")

try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()

Stateful Event Processing (Multiple Events)

# stateful_event_processor.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json

from my_harness import AgentHarness

@dataclass
class EventSession:
    """Session state across multiple events."""
    session_id: str
    user_id: str
    events_processed: int = 0
    # default_factory gives each session its own dict
    accumulated_context: dict = field(default_factory=dict)
    total_cost: float = 0.0

class StatefulEventProcessor:
    """Process multiple related events with shared state."""
    
    def __init__(self, harness: AgentHarness, db):
        self.harness = harness
        self.db = db  # Database for session persistence
        self.sessions = {}  # In-memory cache
    
    def process_event(self, event: dict) -> dict:
        """Process event, maintaining session state."""
        
        session_id = event.get("session_id")
        
        # Load or create session
        session = self._get_or_create_session(session_id, event)
        
        # Accumulate context from previous events
        enhanced_context = {
            **session.accumulated_context,
            **event.get("context", {})
        }
        
        # Build prompt that includes session history
        prompt = f"""
        Session: {session_id}
        Events processed so far: {session.events_processed}
        Previous context: {json.dumps(session.accumulated_context)}
        
        New event: {event.get('user_query')}
        """
        
        # Run harness
        result = self.harness.run(prompt, context=enhanced_context)
        
        # Update session state
        session.events_processed += 1
        session.total_cost += result["cost_usd"]
        session.accumulated_context.update(enhanced_context)
        
        # Save session to database
        self._save_session(session)
        
        return {
            "event_id": event.get("id"),
            "session_id": session_id,
            "result": result["output"],
            "session_total_cost": session.total_cost,
            "events_in_session": session.events_processed
        }
    
    def _get_or_create_session(self, session_id: str, event: dict) -> EventSession:
        """Get session from cache or database."""
        
        if session_id in self.sessions:
            return self.sessions[session_id]
        
        # Try to load from database
        session_data = self.db.sessions.find_one({"session_id": session_id})
        
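        if session_data:
            # Keep only EventSession fields; stored documents also carry
            # extras such as the database id and updated_at
            fields = ("session_id", "user_id", "events_processed",
                      "accumulated_context", "total_cost")
            session = EventSession(
                **{k: session_data[k] for k in fields if k in session_data}
            )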
        else:
            session = EventSession(
                session_id=session_id,
                user_id=event.get("user_id")
            )
        
        self.sessions[session_id] = session
        return session
    
    def _save_session(self, session: EventSession):
        """Persist session to database."""
        self.db.sessions.update_one(
            {"session_id": session.session_id},
            {
                "$set": {
                    "user_id": session.user_id,
                    "events_processed": session.events_processed,
                    "accumulated_context": session.accumulated_context,
                    "total_cost": session.total_cost,
                    "updated_at": datetime.utcnow()
                }
            },
            upsert=True
        )
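
The `self.sessions` dict in `StatefulEventProcessor` grows with every new session and is never trimmed. Since sessions are also persisted to the database, the in-memory copy can be bounded with least-recently-used eviction. `SessionCache` below is a hypothetical sketch of that idea, not part of the processor above.

```python
from collections import OrderedDict
from typing import Any, Optional


class SessionCache:
    """Keep at most `max_size` sessions, evicting the least recently used."""

    def __init__(self, max_size: int = 1000) -> None:
        self.max_size = max_size
        self._items: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, session_id: str) -> Optional[Any]:
        """Return the cached session (marking it as recently used) or None."""
        if session_id not in self._items:
            return None
        self._items.move_to_end(session_id)
        return self._items[session_id]

    def put(self, session_id: str, session: Any) -> None:
        """Store a session and evict the oldest entries beyond max_size."""
        self._items[session_id] = session
        self._items.move_to_end(session_id)
        while len(self._items) > self.max_size:
            self._items.popitem(last=False)

    def __len__(self) -> int:
        return len(self._items)


cache = SessionCache(max_size=2)
cache.put("s1", {"n": 1})
cache.put("s2", {"n": 2})
cache.get("s1")            # s1 becomes the most recently used
cache.put("s3", {"n": 3})  # evicts s2, the least recently used
```

An evicted session is not lost: the processor would fall through to its database lookup on the next event for that session.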

Part 5: GraphQL Endpoint

When to Use GraphQL

Best for:

  • Complex, nested data queries
  • Multiple clients with different data needs
  • Frontend applications that want to query specific fields
  • Building internal APIs with strict schemas

GraphQL Schema & Resolver

# schema.py
import graphene
from graphene import ObjectType, String, Float, Int, List, Field
from my_harness import AgentHarness
import asyncio

# Result type
class AgentResult(ObjectType):
    """Result from agent execution."""
    query_id = String(required=True)
    output = String(required=True)
    cost_usd = Float(required=True)
    tokens_used = Int(required=True)
    iterations = Int(required=True)
    tools_called = List(String)

# Query type
class Query(ObjectType):
    """GraphQL queries for harness."""
    
    agent_query = Field(
        AgentResult,
        query=String(required=True),
        context=graphene.JSONString(required=False)
    )
    
    agent_batch = Field(
        List(AgentResult),
        queries=List(String, required=True)
    )
    
    def resolve_agent_query(self, info, query, context=None):
        """Resolve single agent query."""
        harness = info.context.harness
        
        result = harness.run(query, context=context)
        
        return AgentResult(
            query_id=info.context.request_id,
            output=result["output"],
            cost_usd=result["cost_usd"],
            tokens_used=result["tokens_used"].get("total", 0),
            iterations=result.get("iterations", 0),
            tools_called=result.get("tools_called", [])
        )
    
    def resolve_agent_batch(self, info, queries):
        """Resolve multiple queries."""
        harness = info.context.harness
        results = []
        
        for query in queries:
            result = harness.run(query)
            results.append(AgentResult(
                query_id=f"{info.context.request_id}:{len(results)}",
                output=result["output"],
                cost_usd=result["cost_usd"],
                tokens_used=result["tokens_used"].get("total", 0),
                iterations=result.get("iterations", 0),
                tools_called=result.get("tools_called", [])
            ))
        
        return results

# Mutations for stateful operations
class ExecuteAgent(graphene.Mutation):
    """Execute agent with side effects."""
    
    class Arguments:
        query = String(required=True)
        persist = graphene.Boolean(required=False, default_value=False)
    
    result = Field(AgentResult)
    success = graphene.Boolean()
    message = String()
    
    def mutate(self, info, query, persist=False):
        harness = info.context.harness
        
        result = harness.run(query)
        
        if persist:
            # Save to database
            info.context.db.executions.insert({
                "query": query,
                "result": result["output"],
                "cost": result["cost_usd"]
            })
        
        return ExecuteAgent(
            result=AgentResult(
                query_id=info.context.request_id,
                output=result["output"],
                cost_usd=result["cost_usd"],
                tokens_used=result["tokens_used"].get("total", 0),
                iterations=result.get("iterations", 0),
                tools_called=result.get("tools_called", [])
            ),
            success=True,
            message="Execution completed"
        )

class Mutation(ObjectType):
    execute_agent = ExecuteAgent.Field()

# Create schema
schema = graphene.Schema(query=Query, mutation=Mutation)

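# Usage with FastAPI
# Execute through graphene's own Schema.execute so the schema above is used as-is
import asyncio
import uuid
from types import SimpleNamespace

from fastapi import FastAPI
from starlette.requests import Request

app = FastAPI()

@app.post("/graphql")
async def graphql_endpoint(request: Request):
    """GraphQL endpoint."""
    body = await request.json()
    
    # Create context (resolvers read it by attribute: info.context.harness)
    context = SimpleNamespace(
        harness=harness,
        db=database,
        request_id=str(uuid.uuid4())
    )
    
    # Execute query in a worker thread, since harness.run blocks
    result = await asyncio.to_thread(
        schema.execute,
        body.get("query"),
        variable_values=body.get("variables"),
        context_value=context
    )
    
    return {
        "success": not result.errors,
        "data": result.data,
        "errors": [str(error) for error in result.errors] if result.errors else None
    }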
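
A client calls this endpoint by POSTing a JSON body with a `query` string and optional `variables`. The sketch below only builds such a body and does not send it. It assumes graphene's default camelCase conversion (so `agent_query` is exposed as `agentQuery` and `cost_usd` as `costUsd`); the helper name `build_agent_query_payload` and the operation name `RunAgent` are hypothetical.

```python
import json

# Assumes graphene's default auto-camelCase naming for fields and arguments.
AGENT_QUERY = """
query RunAgent($query: String!) {
  agentQuery(query: $query) {
    queryId
    output
    costUsd
    tokensUsed
    iterations
    toolsCalled
  }
}
"""

def build_agent_query_payload(prompt: str) -> str:
    """Serialize a GraphQL request body for the /graphql endpoint."""
    return json.dumps({"query": AGENT_QUERY, "variables": {"query": prompt}})

if __name__ == "__main__":
    print(build_agent_query_payload("Summarize the latest deploy logs"))
```

Passing the prompt as a variable rather than interpolating it into the query string avoids quoting problems when the prompt contains quotes or newlines.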

Part 6: WebSocket/Streaming

When to Use WebSocket

Best for:

  • Real-time interactions (live agent reasoning)
  • Streaming LLM token responses
  • Live tool execution progress
  • Long-running operations with progress updates
  • Web/mobile apps that want live feedback

FastAPI WebSocket Implementation

# websocket_server.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from my_harness import AgentHarness, AgentConfig
import json
import asyncio
import os
from datetime import datetime

app = FastAPI()

harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

@app.websocket("/ws/agent")
async def websocket_agent(websocket: WebSocket):
    """WebSocket endpoint for real-time agent interaction."""
    
    await websocket.accept()
    
    try:
        while True:
            # Receive message from client
            data = await websocket.receive_text()
            message = json.loads(data)
            
            if message.get("type") == "query":
                query = message.get("content")
                
                # Send acknowledgment
                await websocket.send_json({
                    "type": "status",
                    "status": "processing",
                    "timestamp": datetime.utcnow().isoformat()
                })
                
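                # Run agent with streaming callback.
                # The harness runs in a worker thread, so the callback must be a
                # plain function that schedules each send on the event loop
                # (assumes the harness invokes the callback synchronously).
                loop = asyncio.get_running_loop()
                
                def stream_callback(event):
                    """Called for each reasoning step."""
                    asyncio.run_coroutine_threadsafe(
                        websocket.send_json({
                            "type": "reasoning",
                            "content": event.get("thought"),
                            "timestamp": datetime.utcnow().isoformat()
                        }),
                        loop
                    )
                
                # Run harness
                result = await asyncio.to_thread(
                    harness.run_with_streaming,
                    query,
                    callback=stream_callback
                )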
                
                # Send final result
                await websocket.send_json({
                    "type": "result",
                    "content": result["output"],
                    "cost_usd": result["cost_usd"],
                    "iterations": result["iterations"],
                    "timestamp": datetime.utcnow().isoformat()
                })
            
            elif message.get("type") == "stop":
                # Stop agent
                await websocket.send_json({
                    "type": "status",
                    "status": "stopped"
                })
                break
    
    except WebSocketDisconnect:
        print("Client disconnected")
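
The server above tags every JSON message with a `type` field (`status`, `reasoning`, `result`) and expects `query` or `stop` from the client. One way to keep those envelopes consistent, and to reject malformed input before it reaches the harness, is a small set of builder and parser functions. This is a sketch; the helper names are hypothetical and not part of the harness.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

def _envelope(message_type: str, **fields: Any) -> Dict[str, Any]:
    """Build a server message with the shared `type` and `timestamp` keys."""
    return {
        "type": message_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        **fields,
    }

def status_message(status: str) -> Dict[str, Any]:
    return _envelope("status", status=status)

def reasoning_message(thought: Any) -> Dict[str, Any]:
    return _envelope("reasoning", content=thought)

def result_message(result: Dict[str, Any]) -> Dict[str, Any]:
    return _envelope(
        "result",
        content=result["output"],
        cost_usd=result["cost_usd"],
        iterations=result.get("iterations", 0),
    )

def parse_client_message(raw: str) -> Dict[str, Any]:
    """Validate an incoming client message; raise ValueError if malformed."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("message is not valid JSON") from exc
    if not isinstance(message, dict) or message.get("type") not in {"query", "stop"}:
        raise ValueError("unknown message type")
    if message["type"] == "query" and not isinstance(message.get("content"), str):
        raise ValueError("query messages need a string 'content'")
    return message
```

In the endpoint, a `ValueError` from `parse_client_message` could be answered with an error message over the socket instead of letting the exception close the connection.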

JavaScript Client

// harness-client.js
class HarnessWebSocketClient {
    constructor(wsUrl) {
        this.wsUrl = wsUrl;
        this.ws = null;
        this.messageHandlers = {};
    }
    
    connect() {
        return new Promise((resolve, reject) => {
            this.ws = new WebSocket(this.wsUrl);
            
            this.ws.onopen = () => {
                console.log("Connected to harness");
                resolve();
            };
            
            this.ws.onerror = (error) => {
                console.error("WebSocket error:", error);
                reject(error);
            };
            
            this.ws.onmessage = (event) => {
                const message = JSON.parse(event.data);
                this.handleMessage(message);
            };
        });
    }
    
    query(prompt) {
        this.ws.send(JSON.stringify({
            type: "query",
            content: prompt
        }));
    }
    
    onReasoning(callback) {
        this.messageHandlers.reasoning = callback;
    }
    
    onResult(callback) {
        this.messageHandlers.result = callback;
    }
    
    onStatus(callback) {
        this.messageHandlers.status = callback;
    }
    
    handleMessage(message) {
        const handler = this.messageHandlers[message.type];
        if (handler) {
            handler(message);
        }
    }
    
    disconnect() {
        if (this.ws) {
            this.ws.close();
        }
    }
}

// Usage
const client = new HarnessWebSocketClient('ws://localhost:8000/ws/agent');

await client.connect();

client.onStatus((msg) => {
    console.log("Status:", msg.status);
});

client.onReasoning((msg) => {
    console.log("Agent thinking:", msg.content);
    document.getElementById('thinking').textContent = msg.content;
});

client.onResult((msg) => {
    console.log("Result:", msg.content);
    document.getElementById('result').textContent = msg.content;
    console.log("Cost: $" + msg.cost_usd);
});

client.query("What's the weather like?");

Part 7: Third-Party API Integrations

Slack Bot

# slack_bot.py
from slack_bolt import App
from slack_bolt.adapter.fastapi import SlackRequestHandler
from fastapi import FastAPI
from my_harness import AgentHarness, AgentConfig
import os

# Initialize Slack app
slack_app = App(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)

harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

@slack_app.message(".*")
def handle_message(message, say, client):
    """Handle direct messages to bot."""
    
    user_id = message["user"]
    text = message["text"]
    
    # Send "thinking..." reaction
    client.reactions_add(
        channel=message["channel"],
        timestamp=message["ts"],
        name="thinking_face"  # reactions.add takes the emoji name as `name`
    )
    
    try:
        # Run harness
        result = harness.run(text)
        
        # Send result
        say(f"*Agent Response*\n{result['output']}\n\n_Cost: ${result['cost_usd']:.4f}_")
    
    except Exception as e:
        say(f"Error: {str(e)}")

@slack_app.command("/agent")
def handle_agent_command(ack, body, say):
    """Handle /agent slash command."""
    ack()
    
    query = body.get("text")
    user_id = body.get("user_id")
    
    try:
        result = harness.run(query)
        
        say(f"*Query:* {query}\n*Response:* {result['output']}\n*Cost:* ${result['cost_usd']:.4f}")
    
    except Exception as e:
        say(f"Error processing query: {str(e)}")

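# FastAPI app
from fastapi import Request

app = FastAPI()

# Register Slack handler
handler = SlackRequestHandler(slack_app)

@app.post("/slack/events")
async def slack_events(req: Request):
    # The Request annotation makes FastAPI pass the raw request
    # instead of treating `req` as a query parameter
    return await handler.handle(req)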

Discord Bot

# discord_bot.py
import asyncio
import discord
from discord.ext import commands
from my_harness import AgentHarness, AgentConfig
import os

# Initialize bot
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents)

harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

@bot.event
async def on_ready():
    print(f"Bot logged in as {bot.user}")

@bot.command(name="ask")
async def ask_command(ctx, *, question):
    """Ask the agent a question."""
    
    async with ctx.typing():  # Show typing indicator
        try:
            # Run the blocking harness call in a worker thread so the bot stays responsive
            result = await asyncio.to_thread(harness.run, question)
            
            # Truncate long responses (kept under Discord's 2000-character message limit)
            response = result["output"]
            if len(response) > 2000:
                response = response[:1997] + "..."
            
            embed = discord.Embed(
                title="Agent Response",
                description=response,
                color=discord.Color.blue()
            )
            embed.add_field(name="Cost", value=f"${result['cost_usd']:.4f}", inline=True)
            embed.add_field(name="Iterations", value=str(result["iterations"]), inline=True)
            
            await ctx.send(embed=embed)
        
        except Exception as e:
            await ctx.send(f"Error: {str(e)}")

@bot.command(name="status")
async def status_command(ctx):
    """Check agent status."""
    await ctx.send("Agent is online and ready!")

# Run bot
bot.run(os.environ["DISCORD_BOT_TOKEN"])
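
The `ask` command above truncates anything past 2000 characters. If losing the tail of a response is not acceptable, an alternative is to split the text into several messages. The helper below is a minimal sketch of that idea; `split_message` is a hypothetical name, and it prefers breaking at newlines so paragraphs stay intact where possible.

```python
from typing import List

DISCORD_MESSAGE_LIMIT = 2000  # Discord's cap on plain message content

def split_message(text: str, limit: int = DISCORD_MESSAGE_LIMIT) -> List[str]:
    """Split text into chunks of at most `limit` characters.

    Breaks at the last newline inside each window when there is one,
    otherwise cuts hard at the limit.
    """
    chunks = []
    remaining = text
    while len(remaining) > limit:
        cut = remaining.rfind("\n", 0, limit)
        if cut <= 0:
            cut = limit  # No usable newline: hard cut
        chunks.append(remaining[:cut])
        # Drop the newline(s) we broke on so chunks do not start blank
        remaining = remaining[cut:].lstrip("\n")
    if remaining:
        chunks.append(remaining)
    return chunks
```

Inside the command handler this could be used as `for chunk in split_message(result["output"]): await ctx.send(chunk)`, sending plain messages instead of a single embed.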

Telegram Bot

# telegram_bot.py
import asyncio
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from my_harness import AgentHarness, AgentConfig
import os

harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Start command."""
    await update.message.reply_text(
        "Hi! I'm an AI agent. Send me a message and I'll respond!"
    )

async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle user messages."""
    
    user_message = update.message.text
    
    # Send typing indicator
    await context.bot.send_chat_action(
        chat_id=update.effective_chat.id,
        action="typing"
    )
    
    try:
        # Run the blocking harness call in a worker thread so other updates keep flowing
        result = await asyncio.to_thread(harness.run, user_message)
        
        await update.message.reply_text(
            f"{result['output']}\n\n_Cost: ${result['cost_usd']:.4f}_",
            parse_mode="Markdown"
        )
    
    except Exception as e:
        await update.message.reply_text(f"Error: {str(e)}")

# Create application
application = Application.builder().token(
    os.environ["TELEGRAM_BOT_TOKEN"]
).build()

# Add handlers
application.add_handler(CommandHandler("start", start))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

# Start bot
application.run_polling()

Part 8: Database Integration

Reading State from Database

# database_integration.py
from sqlalchemy import create_engine, Column, String, DateTime, Text
from sqlalchemy.orm import declarative_base, sessionmaker  # SQLAlchemy 1.4+
from my_harness import AgentHarness
from datetime import datetime
import json

Base = declarative_base()

class UserProfile(Base):
    """User profile from application database."""
    __tablename__ = "users"
    
    id = Column(String, primary_key=True)
    name = Column(String)
    email = Column(String)
    preferences = Column(Text)  # JSON
    created_at = Column(DateTime)

class AgentContext(Base):
    """Context extracted from database."""
    __tablename__ = "agent_context"
    
    user_id = Column(String, primary_key=True)
    context_json = Column(Text)
    last_updated = Column(DateTime)

class HarnessWithDatabase:
    """Harness that loads state from database."""
    
    def __init__(self, harness: AgentHarness, db_url: str):
        self.harness = harness
        self.engine = create_engine(db_url)
        self.SessionLocal = sessionmaker(bind=self.engine)
    
    def run_for_user(self, query: str, user_id: str) -> dict:
        """Run harness with user context from database."""
        
        session = self.SessionLocal()
        
        try:
            # Load user profile
            user = session.query(UserProfile).filter_by(id=user_id).first()
            
            if not user:
                raise ValueError(f"User {user_id} not found")
            
            # Load cached context
            context_record = session.query(AgentContext).filter_by(
                user_id=user_id
            ).first()
            
            context = json.loads(context_record.context_json) if context_record else {}
            
            # Enhance with user profile
            context.update({
                "user_name": user.name,
                "user_email": user.email,
                "user_preferences": json.loads(user.preferences) if user.preferences else {}
            })
            
            # Run harness with context
            result = self.harness.run(query, context=context)
            
            # Save updated context
            if not context_record:
                context_record = AgentContext(
                    user_id=user_id,
                    context_json=json.dumps(context),
                    last_updated=datetime.utcnow()
                )
                session.add(context_record)
            else:
                context_record.context_json = json.dumps(context)
                context_record.last_updated = datetime.utcnow()
            
            session.commit()
            
            return result
        
        finally:
            session.close()

Writing Results to Database

import uuid
from sqlalchemy import Float, Integer

class AgentExecution(Base):
    """Record of agent execution."""
    __tablename__ = "agent_executions"
    
    id = Column(String, primary_key=True)
    user_id = Column(String)
    query = Column(Text)
    result = Column(Text)
    cost_usd = Column(Float)
    tokens_used = Column(Integer)
    duration_seconds = Column(Float)
    created_at = Column(DateTime, default=datetime.utcnow)

def save_execution(session, execution_record: AgentExecution):
    """Save harness execution to database."""
    session.add(execution_record)
    session.commit()

# Usage
result = harness.run(query)

execution = AgentExecution(
    id=str(uuid.uuid4()),
    user_id=user_id,
    query=query,
    result=result["output"],
    cost_usd=result["cost_usd"],
    tokens_used=result["tokens_used"].get("total", 0),
    duration_seconds=result.get("duration_seconds", 0)
)

save_execution(db_session, execution)

Transaction Management

import logging
import uuid

logger = logging.getLogger(__name__)

def transactional_harness_run(harness, query, user_id, db_session):
    """Run harness within database transaction.
    
    Assumes UserProfile also defines last_query_at, total_cost, and
    total_executions columns (not shown in the model above).
    """
    
    with db_session.begin():  # Commits on success, rolls back on exception
        try:
            # Update user state before harness.
            # Note: the row lock is held for the whole harness run.
            user = db_session.query(UserProfile).filter_by(id=user_id).with_for_update().first()
            if user is None:
                raise ValueError(f"User {user_id} not found")
            user.last_query_at = datetime.utcnow()
            
            # Run harness
            result = harness.run(query)
            
            # Save result
            execution = AgentExecution(
                id=str(uuid.uuid4()),
                user_id=user_id,
                query=query,
                result=result["output"],
                cost_usd=result["cost_usd"]
            )
            db_session.add(execution)
            
            # Update cost tracking
            user.total_cost += result["cost_usd"]
            user.total_executions += 1
            
            # All changes committed together
            return result
        
        except Exception as e:
            # Transaction automatically rolled back
            logger.error(f"Transaction failed: {str(e)}")
            raise

Part 9: File System Integration

Reading and Writing Files

# file_integration.py
import os
from pathlib import Path

def harness_with_file_context(harness, query: str, project_dir: str) -> dict:
    """Run harness with access to project files."""
    
    # Load relevant files
    context = {
        "project_structure": _get_directory_structure(project_dir),
        "readme_content": _read_file(os.path.join(project_dir, "README.md")),
        "requirements": _read_file(os.path.join(project_dir, "requirements.txt"))
    }
    
    # Run harness
    result = harness.run(query, context=context)
    
    # Agent may have written files - collect them
    written_files = _collect_written_files(project_dir)
    
    return {
        **result,
        "files_created": written_files
    }

def _get_directory_structure(root_dir: str, max_depth=3) -> str:
    """Get project directory structure."""
    def get_tree(path, prefix="", depth=0):
        if depth > max_depth:
            return []
        
        try:
            # Skip hidden entries
            entries = sorted(e for e in os.listdir(path) if not e.startswith("."))
        except (PermissionError, FileNotFoundError):
            return []
        
        lines = []
        for index, entry in enumerate(entries):
            is_last = index == len(entries) - 1
            full_path = os.path.join(path, entry)
            
            # The last entry in a directory gets the closing connector
            lines.append(f"{prefix}{'└── ' if is_last else '├── '}{entry}")
            
            if os.path.isdir(full_path):
                lines.extend(get_tree(
                    full_path,
                    prefix + ("    " if is_last else "│   "),
                    depth + 1
                ))
        
        return lines
    
    return "\n".join(get_tree(root_dir))

def _read_file(file_path: str, max_size: int = 5000) -> str:
    """Read file with size limit."""
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read(max_size)
        return content + "..." if len(content) == max_size else content
    except FileNotFoundError:
        return ""
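
`harness_with_file_context` calls `_collect_written_files`, which is not defined in this section. One possible way to detect files the agent wrote, sketched here under the assumption that comparing modification times is good enough for the use case, is to snapshot the project directory before and after the run and diff the two. The function names below are hypothetical.

```python
import os
from typing import Dict, List

def snapshot_mtimes(root_dir: str) -> Dict[str, int]:
    """Map each file path (relative to root_dir) to its mtime in nanoseconds."""
    snapshot = {}
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        for name in filenames:
            full_path = os.path.join(dirpath, name)
            try:
                mtime = os.stat(full_path).st_mtime_ns
            except OSError:
                continue  # File vanished between listing and stat
            snapshot[os.path.relpath(full_path, root_dir)] = mtime
    return snapshot

def changed_files(before: Dict[str, int], after: Dict[str, int]) -> List[str]:
    """Return paths that are new or modified in `after`, sorted."""
    return sorted(
        path for path, mtime in after.items()
        if before.get(path) != mtime
    )
```

A caller would take `before = snapshot_mtimes(project_dir)` ahead of `harness.run(...)`, take a second snapshot afterwards, and report `changed_files(before, after)`. Deleted files are not reported by this sketch.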

Object Storage (S3, GCS)

# object_storage_integration.py
import boto3
from google.cloud import storage as gcs

class HarnessWithS3:
    """Harness with S3 file access."""
    
    def __init__(self, harness, bucket_name: str):
        self.harness = harness
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
    
    def run_with_s3_files(self, query: str, file_keys: list) -> dict:
        """Run harness with files from S3."""
        
        # Download files
        files_content = {}
        for key in file_keys:
            try:
                response = self.s3.get_object(Bucket=self.bucket, Key=key)
                content = response['Body'].read().decode('utf-8')
                files_content[key] = content
            except Exception as e:
                files_content[key] = f"Error: {str(e)}"
        
        # Run harness with file content
        context = {"s3_files": files_content}
        result = self.harness.run(query, context=context)
        
        # If agent generated files, upload them
        if result.get("generated_files"):
            for filename, content in result["generated_files"].items():
                self.s3.put_object(
                    Bucket=self.bucket,
                    Key=f"generated/{filename}",
                    Body=content.encode('utf-8')
                )
        
        return result

class HarnessWithGCS:
    """Harness with Google Cloud Storage access."""
    
    def __init__(self, harness, bucket_name: str, project_id: str):
        self.harness = harness
        self.client = gcs.Client(project=project_id)
        self.bucket = self.client.bucket(bucket_name)
    
    def run_with_gcs_files(self, query: str, file_names: list) -> dict:
        """Run harness with files from GCS."""
        
        files_content = {}
        for name in file_names:
            try:
                blob = self.bucket.blob(name)
                # download_as_bytes replaces the deprecated download_as_string
                content = blob.download_as_bytes().decode('utf-8')
                files_content[name] = content
            except Exception as e:
                files_content[name] = f"Error: {str(e)}"
        
        context = {"gcs_files": files_content}
        result = self.harness.run(query, context=context)
        
        # Upload generated files
        if result.get("generated_files"):
            for filename, content in result["generated_files"].items():
                blob = self.bucket.blob(f"generated/{filename}")
                blob.upload_from_string(content.encode('utf-8'))
        
        return result

Part 10: Monitoring & Observability Integration

Sending Metrics to Prometheus

# prometheus_integration.py
from prometheus_client import Counter, Gauge, Histogram, generate_latest, CollectorRegistry
import time

class PrometheusHarnessMetrics:
    """Prometheus metrics for harness."""
    
    def __init__(self):
        self.registry = CollectorRegistry()
        
        # Counters
        self.requests_total = Counter(
            'harness_requests_total',
            'Total harness requests',
            registry=self.registry
        )
        
        self.errors_total = Counter(
            'harness_errors_total',
            'Total harness errors',
            ['error_type'],
            registry=self.registry
        )
        
        self.cost_total = Counter(
            'harness_cost_usd_total',
            'Total cost in USD',
            registry=self.registry
        )
        
        # Gauges
        self.active_requests = Gauge(
            'harness_requests_active',
            'Currently active requests',
            registry=self.registry
        )
        
        self.cost_daily = Gauge(
            'harness_cost_usd_daily',
            'Daily cost in USD',
            registry=self.registry
        )
        
        # Histograms
        self.request_duration = Histogram(
            'harness_request_duration_seconds',
            'Request duration in seconds',
            buckets=(0.5, 1.0, 2.5, 5.0, 10.0),
            registry=self.registry
        )
        
        self.iterations = Histogram(
            'harness_iterations',
            'Number of agent iterations',
            buckets=(1, 2, 3, 5, 10),
            registry=self.registry
        )

def harness_with_metrics(harness, metrics: PrometheusHarnessMetrics):
    """Wrap harness with Prometheus metrics."""
    
    def run_with_metrics(query: str, **kwargs) -> dict:
        metrics.requests_total.inc()
        metrics.active_requests.inc()
        
        start = time.time()
        
        try:
            result = harness.run(query, **kwargs)
            
            # Record metrics
            duration = time.time() - start
            metrics.request_duration.observe(duration)
            metrics.iterations.observe(result.get("iterations", 0))
            metrics.cost_total.inc(result.get("cost_usd", 0))
            metrics.cost_daily.inc(result.get("cost_usd", 0))
            
            return result
        
        except Exception as e:
            metrics.errors_total.labels(error_type=type(e).__name__).inc()
            raise
        
        finally:
            metrics.active_requests.dec()
    
    return run_with_metrics

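# FastAPI endpoint for Prometheus scraping
from fastapi import Response
from prometheus_client import CONTENT_TYPE_LATEST

harness_metrics = PrometheusHarnessMetrics()

@app.get("/metrics")
async def metrics_endpoint():
    # Serve the registry in the Prometheus text exposition format
    return Response(
        content=generate_latest(harness_metrics.registry),
        media_type=CONTENT_TYPE_LATEST
    )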

Sending Logs to Centralized Logging

# logging_integration.py
import logging
import json
from datetime import datetime
from pythonjsonlogger import jsonlogger

class HarnessJSONFormatter(jsonlogger.JsonFormatter):
    """JSON formatter for structured logging."""
    
    def add_fields(self, log_record, record, message_dict):
        super(HarnessJSONFormatter, self).add_fields(log_record, record, message_dict)
        
        log_record['timestamp'] = datetime.utcnow().isoformat()
        log_record['logger'] = record.name
        log_record['level'] = record.levelname
        
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id
        
        if hasattr(record, 'query_id'):
            log_record['query_id'] = record.query_id

# Configure logging
handler = logging.StreamHandler()
formatter = HarnessJSONFormatter('%(timestamp)s %(level)s %(message)s')
handler.setFormatter(formatter)

logger = logging.getLogger('harness')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Log harness execution
def log_harness_execution(logger, query_id, query, result):
    """Log harness execution with structure."""
    logger.info(
        "Harness execution completed",
        extra={
            "query_id": query_id,
            "query": query[:100],  # First 100 chars
            "result_length": len(result["output"]),
            "cost_usd": result["cost_usd"],
            "iterations": result.get("iterations"),
            "duration_seconds": result.get("duration_seconds"),
            "tools": result.get("tools_called", [])
        }
    )

Tracing Integration (Jaeger, Datadog)

# tracing_integration.py
import json
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Initialize Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

tracer = trace.get_tracer(__name__)

# Instrument FastAPI
FastAPIInstrumentor.instrument_app(app)
RequestsInstrumentor().instrument()

# Use tracing in harness
def harness_with_tracing(harness):
    """Wrap harness with distributed tracing."""
    
    def run_with_tracing(query: str, **kwargs) -> dict:
        with tracer.start_as_current_span("harness.query") as span:
            span.set_attribute("query", query)
            
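            # Trace tool calls. This wrapper only takes effect if it is
            # registered with the harness's tool dispatch; as written, nothing calls it.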
            def traced_tool_call(tool_name, input_data):
                with tracer.start_as_current_span(f"tool.{tool_name}") as tool_span:
                    tool_span.set_attribute("tool", tool_name)
                    tool_span.set_attribute("input", json.dumps(input_data))
                    
                    result = harness.call_tool(tool_name, input_data)
                    
                    tool_span.set_attribute("output_length", len(str(result)))
                    return result
            
            result = harness.run(query, **kwargs)
            
            span.set_attribute("cost_usd", result["cost_usd"])
            span.set_attribute("iterations", result.get("iterations", 0))
            span.set_attribute("status", result.get("status", "success"))
            
            return result
    
    return run_with_tracing

Part 11: Authentication & Authorization

API Key Authentication

# auth_integration.py
from fastapi import HTTPException, Header, Depends
from datetime import datetime
from functools import lru_cache
import hmac
import hashlib

class APIKeyManager:
    """Manage API keys for harness access."""
    
    def __init__(self, db):
        self.db = db
    
    def validate_api_key(self, api_key: str) -> str:
        """Validate API key and return user_id."""
        record = self.db.api_keys.find_one({"key": api_key})
        
        if not record or not record.get("active"):
            raise HTTPException(status_code=401, detail="Invalid API key")
        
        return record["user_id"]
    
    def get_key_limit(self, api_key: str) -> int:
        """Get rate limit for this key."""
        record = self.db.api_keys.find_one({"key": api_key})
        # Fall back to the default limit when the key has no record
        return record.get("rate_limit_per_minute", 10) if record else 10

# Dependency for FastAPI
api_key_manager = APIKeyManager(database)

async def verify_api_key(x_api_key: str = Header(...)) -> str:
    """FastAPI dependency to validate API key."""
    return api_key_manager.validate_api_key(x_api_key)

@app.post("/agent/query")
async def agent_query(
    request: QueryRequest,
    user_id: str = Depends(verify_api_key)
) -> dict:
    """Only accessible with valid API key."""
    result = harness.run(request.query)
    
    # Log for user
    database.executions.insert({
        "user_id": user_id,
        "query": request.query,
        "cost": result["cost_usd"],
        "timestamp": datetime.utcnow()
    })
    
    return result
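
`APIKeyManager` above looks keys up in plaintext. A common hardening step is to store only a digest of each key and compare digests in constant time, which is presumably what the `hmac` and `hashlib` imports are for. The following is a minimal sketch of that idea using only the standard library; the function names are hypothetical, and storing the digest under a field such as `key_hash` is an assumption about the schema.

```python
import hashlib
import hmac
import secrets
from typing import Tuple

def hash_api_key(api_key: str) -> str:
    """Return the SHA-256 hex digest of an API key."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()

def generate_api_key() -> Tuple[str, str]:
    """Create a new key; return (plaintext_key, digest).

    Show the plaintext to the user once and persist only the digest.
    """
    key = secrets.token_urlsafe(32)
    return key, hash_api_key(key)

def key_matches(presented_key: str, stored_digest: str) -> bool:
    """Compare a presented key against a stored digest in constant time."""
    return hmac.compare_digest(hash_api_key(presented_key), stored_digest)
```

With this scheme, `validate_api_key` would look records up by `hash_api_key(api_key)` rather than by the raw key, so a leaked database does not expose usable credentials.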

OAuth 2.0 Integration

# oauth_integration.py
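# Sketch against the fastapi-users v10+ API. `get_user_manager` is the
# user-manager dependency from the fastapi-users setup guide (not shown here),
# and the JWT_SECRET variable name is an assumption.
import os
import uuid

from fastapi import Depends
from fastapi_users import FastAPIUsers
from fastapi_users.authentication import (
    AuthenticationBackend,
    BearerTransport,
    JWTStrategy,
)
from fastapi_users.db import SQLAlchemyBaseUserTableUUID
from sqlalchemy import Column, Integer

# User model: the base table supplies id, email, hashed_password,
# is_active, is_superuser, and is_verified
class User(SQLAlchemyBaseUserTableUUID, Base):
    rate_limit = Column(Integer, default=10)  # Requests per minute

# OAuth2 password flow issuing JWT bearer tokens
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")

def get_jwt_strategy() -> JWTStrategy:
    # Load the signing secret from the environment; never hard-code it
    return JWTStrategy(secret=os.environ["JWT_SECRET"], lifetime_seconds=3600)

auth_backend = AuthenticationBackend(
    name="jwt",
    transport=bearer_transport,
    get_strategy=get_jwt_strategy,
)

fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])
current_active_user = fastapi_users.current_user(active=True)

@app.post("/agent/query")
async def agent_query_oauth(
    request: QueryRequest,
    user: User = Depends(current_active_user)
) -> dict:
    """OAuth-protected harness endpoint."""
    result = harness.run(request.query)
    
    # Log access
    database.executions.insert({
        "user_id": str(user.id),
        "query": request.query,
        "cost": result["cost_usd"]
    })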
    
    return result

Rate Limiting Per User

# rate_limiting.py
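from fastapi import Depends, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Per-user rate limiting
def rate_limit_key(request: Request) -> str:
    """Rate limit by API key instead of IP (falls back to IP when absent)."""
    return request.headers.get("x-api-key") or get_remote_address(request)

limiter = Limiter(key_func=rate_limit_key)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/agent/query")
@limiter.limit("10/minute")  # Default limit
async def agent_query_limited(
    request: Request,  # slowapi needs the raw request under this exact name
    body: QueryRequest,
    user_id: str = Depends(verify_api_key)
):
    """Rate-limited harness endpoint."""
    
    # Look up the per-key limit (informational here; the decorator enforces the default)
    limit = api_key_manager.get_key_limit(request.headers["x-api-key"])
    
    result = harness.run(body.query)
    return result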
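
The decorator above applies one fixed limit to every caller. To enforce the per-key value returned by `get_key_limit`, one option is a small sliding-window limiter checked inside the endpoint. This is a sketch under stated assumptions: it is in-memory, so it only works within a single process, and the class name `SlidingWindowLimiter` is hypothetical.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict

class SlidingWindowLimiter:
    """In-memory, per-user sliding-window rate limiter (single process only)."""

    def __init__(
        self,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.window_seconds = window_seconds
        self.clock = clock  # Injectable so tests can control time
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, user_id: str, limit: int) -> bool:
        """Record a request and return True if the user is under `limit`."""
        now = self.clock()
        hits = self._hits[user_id]
        # Drop timestamps that have aged out of the window
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= limit:
            return False
        hits.append(now)
        return True
```

In the endpoint, a call such as `limiter.allow(user_id, limit)` returning `False` would translate to an HTTP 429 response. A multi-worker deployment would need shared storage (for example Redis) instead of the in-process dictionary, and concurrent threads would need a lock around `allow`.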

Part 12: Complete Integration Example

REST API + Webhook Callback + Slack Bot

# complete_integration.py
"""
Complete integration example:
- FastAPI REST API for harness queries
- Webhook callbacks for async results
- Slack bot using the REST API
- Cost tracking across all channels
"""

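from fastapi import FastAPI, HTTPException, Header, Depends
from slack_bolt import App as SlackApp
from slack_bolt.adapter.fastapi import SlackRequestHandler
from sqlalchemy import create_engine, Column, String, Text, Float, Integer, Boolean, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
from my_harness import AgentHarness, AgentConfig
import httpx
import os
from datetime import datetime
from typing import Optional
import json

# Declarative base for the models below
Base = declarative_base()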

# ===== Initialize Components =====

app = FastAPI(title="Harness Integration Platform")

# Harness
harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

# Slack bot
slack_app = SlackApp(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)

# Database
db_engine = create_engine(os.environ["DATABASE_URL"])
SessionLocal = sessionmaker(bind=db_engine)

# ===== Database Models =====

class ExecutionRecord(Base):
    __tablename__ = "executions"
    
    id = Column(String, primary_key=True)
    user_id = Column(String)
    channel = Column(String)  # "rest_api", "slack", "webhook"
    query = Column(Text)
    result = Column(Text)
    cost_usd = Column(Float)
    tokens_used = Column(Integer)
    duration_seconds = Column(Float)
    created_at = Column(DateTime, default=datetime.utcnow)

class WebhookSubscription(Base):
    __tablename__ = "webhook_subscriptions"
    
    id = Column(String, primary_key=True)
    user_id = Column(String)
    webhook_url = Column(String)
    events = Column(String)  # JSON list of event types
    active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

# ===== REST API Endpoints =====

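async def verify_api_key(x_api_key: str = Header(...)) -> str:
    """Verify the X-API-Key header and return the caller's user ID."""
    import hmac

    expected = os.environ.get("API_KEY", "")
    # Constant-time comparison; reject every request if API_KEY is unset
    if not expected or not hmac.compare_digest(x_api_key.encode(), expected.encode()):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return "api_user"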

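# Keep references to in-flight tasks so they are not garbage-collected mid-run
_background_tasks = set()

@app.post("/query")
async def rest_query(
    query: str,
    user_id: str = Depends(verify_api_key),
    callback_url: Optional[str] = None
) -> dict:
    """
    Submit harness query via REST API.
    
    Returns immediately. If callback_url is provided, the result is POSTed
    to it when ready; otherwise poll GET /execution/{execution_id} for the
    saved result.
    """
    import asyncio
    import logging
    import uuid
    
    execution_id = str(uuid.uuid4())
    
    def run_and_save() -> dict:
        """Blocking part: run the harness and persist the result."""
        result = harness.run(query)
        
        # Save execution
        session = SessionLocal()
        try:
            session.add(ExecutionRecord(
                id=execution_id,
                user_id=user_id,
                channel="rest_api",
                query=query,
                result=result["output"],
                cost_usd=result["cost_usd"],
                tokens_used=result["tokens_used"].get("total", 0),
                duration_seconds=result.get("duration_seconds", 0)
            ))
            session.commit()
        finally:
            session.close()  # always release the connection
        return result
    
    async def process_query():
        """Process in background."""
        try:
            # harness.run() is synchronous; a worker thread keeps the event loop free
            result = await asyncio.to_thread(run_and_save)
            payload = {
                "execution_id": execution_id,
                "status": "completed",
                "result": result["output"],
                "cost_usd": result["cost_usd"]
            }
        except Exception as e:
            logging.getLogger(__name__).exception("Execution %s failed", execution_id)
            payload = {
                "execution_id": execution_id,
                "status": "error",
                "error": str(e)
            }
        
        # Send webhook if provided
        if callback_url:
            try:
                async with httpx.AsyncClient(timeout=10.0) as client:
                    await client.post(callback_url, json=payload)
            except httpx.HTTPError:
                # Log instead of silently dropping a failed callback
                logging.getLogger(__name__).exception(
                    "Callback to %s failed for %s", callback_url, execution_id
                )
    
    # Start background task
    task = asyncio.create_task(process_query())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    
    return {
        "execution_id": execution_id,
        "status": "queued",
        "callback_url": callback_url
    }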

@app.get("/execution/{execution_id}")
async def get_execution(execution_id: str):
    """Get execution result."""
    session = SessionLocal()
    execution = session.query(ExecutionRecord).filter_by(id=execution_id).first()
    session.close()
    
    if not execution:
        raise HTTPException(status_code=404, detail="Execution not found")
    
    return {
        "id": execution.id,
        "result": execution.result,
        "cost_usd": execution.cost_usd,
        "created_at": execution.created_at.isoformat()
    }

# ===== Slack Bot =====

@slack_app.message(".*")
def handle_slack_message(message, say, client):
    """Handle incoming Slack messages."""
    
    user_id = message["user"]
    text = message["text"]
    
    # Show a visual acknowledgement while the harness runs
    client.reactions_add(
        channel=message["channel"],
        timestamp=message["ts"],
        name="hourglass_flowing_sand"  # reactions.add takes `name`, not `emoji`
    )
    
    try:
        result = harness.run(text)
        
        # Save execution
        session = SessionLocal()
        try:
            execution = ExecutionRecord(
                id=f"slack-{user_id}-{int(datetime.now().timestamp())}",
                user_id=user_id,
                channel="slack",
                query=text,
                result=result["output"],
                cost_usd=result["cost_usd"],
                tokens_used=result["tokens_used"].get("total", 0)
            )
            session.add(execution)
            session.commit()
        finally:
            session.close()  # release the connection even if the commit fails
        
        say(f"*Response:*\n{result['output']}\n\n_Cost: ${result['cost_usd']:.4f}_")
    
    except Exception as e:
        say(f"Error: {str(e)}")

# Register Slack handler
handler = SlackRequestHandler(slack_app)

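from fastapi import Request  # needed for the annotation below

@app.post("/slack/events")
async def slack_events(req: Request):
    # Without the Request annotation FastAPI would treat `req` as a query parameter
    return await handler.handle(req)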

# ===== Cost Tracking Across All Channels =====

@app.get("/analytics/cost")
async def cost_analytics():
    """Get cost breakdown by channel."""
    session = SessionLocal()
    
    result = session.query(
        ExecutionRecord.channel,
        func.sum(ExecutionRecord.cost_usd).label("total_cost"),
        func.count(ExecutionRecord.id).label("count"),
        func.avg(ExecutionRecord.cost_usd).label("avg_cost")
    ).group_by(ExecutionRecord.channel).all()
    
    session.close()
    
    return {
        "by_channel": [
            {
                "channel": r[0],
                # SUM/AVG return NULL when every cost in the group is NULL
                "total_cost_usd": float(r[1] or 0),
                "request_count": r[2],
                "avg_cost_usd": float(r[3] or 0)
            }
            for r in result
        ]
    }

# ===== Main =====

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
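
The `/query` endpoint above POSTs one of two payload shapes to `callback_url`: a completed result or an error. As a minimal sketch of the receiving side, assuming only the fields shown in the example, a consumer might interpret the payload like this. `summarize_callback` is a hypothetical helper for illustration, not part of the harness.

```python
from typing import Any


def summarize_callback(payload: dict[str, Any]) -> str:
    """Turn a callback payload from POST /query into a one-line summary.

    Hypothetical helper: field names mirror the payloads built in
    process_query() above; nothing here is part of the harness API.
    """
    execution_id = payload.get("execution_id", "<unknown>")
    status = payload.get("status")

    if status == "completed":
        cost = float(payload.get("cost_usd", 0.0))
        return f"{execution_id}: completed (${cost:.4f}): {payload.get('result', '')}"
    if status == "error":
        return f"{execution_id}: failed: {payload.get('error', 'unknown error')}"
    # Anything else is unexpected; surface it rather than dropping it silently
    return f"{execution_id}: unrecognized status {status!r}"
```

A receiver would typically call this from its own webhook route and store or forward the summary; handling the unknown-status case explicitly keeps a payload format change from failing silently.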

Summary Table: Which Pattern to Use?

| Integration | Latency | Complexity | Scaling | Use When |
|---|---|---|---|---|
| Library | <100ms | Low | N/A | Single monolithic app |
| REST API | 100-500ms | Medium | Independent | Multiple apps, distributed system |
| Background Job (Celery) | 1-60s | Medium | Via queue | Long-running, batch processing |
| Event-Driven (Kafka) | 10-100ms | High | Excellent | Real-time, streaming architecture |
| GraphQL | 100-500ms | Medium-High | Independent | Complex query patterns, frontend apps |
| WebSocket | <50ms | Medium-High | Per-connection | Real-time interactions, live feedback |
| Slack/Discord/Telegram | 1-5s | Low | Via platform | User-facing chatbots |
| Database Integration | <100ms | Medium | Via DB | Shared state, multi-agent systems |

  • Architecture: See 06_harness_architecture.md for harness design patterns
  • Deployment: See 12_deployment_patterns.md for Docker, Kubernetes, CI/CD
  • Monitoring: See 09_operations_and_observability.md for logging, metrics, cost tracking
  • Security: See 10_security_and_safety.md for authentication, input validation, rate limiting
  • Testing: See 11_testing_and_qa.md for testing integrated harnesses

Validation Checklist

How do you know you got this right?

Performance Checks

  • Library integration: <100ms latency, no network overhead
  • REST API: <500ms p95 latency, 99%+ uptime
  • Async jobs: queue latency <1s, processing <60s for typical task
  • Event-driven: <100ms event-to-response latency
  • WebSocket: <50ms round-trip for interactive feedback
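
One way to make these targets checkable is to compute a percentile over recorded latency samples and compare it with the budget. The sketch below uses the nearest-rank percentile. The budget table copies the numbers from the list above, while `percentile`, `within_budget`, and the pattern keys are hypothetical names. Applying p95 to every pattern is an assumption, since the list specifies a percentile only for the REST API.

```python
import math


def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% of the data at or below it."""
    if not samples:
        raise ValueError("no samples")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


# Budgets in milliseconds, copied from the performance checks above
LATENCY_BUDGET_MS = {
    "library": 100,
    "rest_api": 500,
    "event_driven": 100,
    "websocket": 50,
}


def within_budget(pattern: str, samples_ms: list[float], pct: float = 95) -> bool:
    """True if the chosen percentile of the samples is under the pattern's budget."""
    return percentile(samples_ms, pct) < LATENCY_BUDGET_MS[pattern]
```

For example, `within_budget("rest_api", [120, 180, 240, 300, 450])` evaluates the p95 of five samples (450 ms) against the 500 ms budget. In practice the samples would come from your metrics system rather than an in-memory list.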

Implementation Checks

  • Integration pattern chosen based on latency/complexity requirements
  • Error handling implemented: circuit breaker, retry logic, fallback
  • Authentication working: correct credentials, tokens, API keys passed
  • Request/response formats correct: matching integration partner’s spec
  • Monitoring integrated: logging all requests, errors, latency metrics
  • Rate limiting respected: not exceeding partner API quotas
  • Tested against integration partner: real requests, not mocks
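
The error-handling item names a circuit breaker and a fallback. As a rough illustration of how the two fit together, here is a minimal single-threaded sketch. `CircuitBreaker`, `CircuitOpenError`, and the thresholds are hypothetical and not part of the harness; a production version would also need locking and metrics.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when the breaker is open and no fallback was supplied."""


class CircuitBreaker:
    """Opens after N consecutive failures, then allows one trial call after a cool-down."""

    def __init__(self, failure_threshold: int = 3, reset_after_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, fn: Callable[[], T], fallback: Optional[Callable[[], T]] = None) -> T:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_after_s:
                # Open: skip the integration entirely
                if fallback is not None:
                    return fallback()
                raise CircuitOpenError("circuit open")
            # Cool-down elapsed: half-open, let this one call through as a trial
        try:
            result = fn()
        except Exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open, or re-open after a failed trial
            if fallback is not None:
                return fallback()
            raise
        # Success closes the breaker and clears the failure count
        self._failures = 0
        self._opened_at = None
        return result
```

Wrapping a partner call as `breaker.call(lambda: client.fetch(), fallback=lambda: cached_value)` (both names illustrative) keeps a failing integration from being hammered while still returning something useful to the caller.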

Integration Checks

  • Harness components isolated: failure in one doesn’t cascade
  • State synchronization working: shared state consistent across integrations
  • Request tracing: can follow request from entry to exit
  • Cost tracking: know cost of each integration type and request
  • Graceful degradation: missing integration doesn’t break harness
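
For the request-tracing item, one common approach is to carry a trace ID in a context variable and stamp it onto every log line. The sketch below uses only the standard library. `trace_id_var`, `start_trace`, `TraceIdFilter`, and the logger name are hypothetical choices, not something the harness provides.

```python
import contextvars
import logging
import uuid
from typing import Optional

# Trace ID for the current request; each thread and asyncio task sees its own value
trace_id_var = contextvars.ContextVar("trace_id", default="-")


class TraceIdFilter(logging.Filter):
    """Copy the current trace ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id_var.get()
        return True


def start_trace(incoming_id: Optional[str] = None) -> str:
    """Reuse the caller's trace ID if one was sent, otherwise mint a new one."""
    trace_id = incoming_id or uuid.uuid4().hex
    trace_id_var.set(trace_id)
    return trace_id


def build_logger(name: str = "harness.integration") -> logging.Logger:
    """Return a logger whose output includes trace=<id> on every line."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s trace=%(trace_id)s %(message)s")
        )
        handler.addFilter(TraceIdFilter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger
```

Calling `start_trace(...)` at each entry point (REST handler, Slack listener, queue consumer) and passing the ID along in outgoing headers or payloads lets you follow one request across integrations. Context variables are copied into `asyncio` tasks and `asyncio.to_thread` calls, but not into threads you start manually.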

Common Failure Modes

  • Latency miscalculation: Library integration chosen for low latency, but the harness turns out to depend on remote APIs
  • Auth credentials hardcoded: Credentials committed in code; move them to environment variables or a secrets manager
  • No rate limit handling: API rate limits hit unexpectedly; implement backoff
  • State inconsistency: Harness updates the DB, but the REST client reads a stale copy
  • Error swallowed: Integration fails silently because failures are never logged
  • Timeout too short: A brief network hiccup is reported as a failure; increase the timeout
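
For the rate-limit failure mode, the usual remedy is retry with exponential backoff and jitter. A minimal sketch follows. `RateLimitedError` is a hypothetical stand-in for whatever your client raises on HTTP 429, and the delay defaults are illustrative rather than recommendations from any particular API.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitedError(Exception):
    """Hypothetical stand-in for the error a client raises on HTTP 429."""


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay_s: float = 0.5,
    max_delay_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    rng: Callable[[], float] = random.random,
) -> T:
    """Call fn, retrying on RateLimitedError with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitedError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the failure, do not swallow it
            # Delay doubles each attempt, is capped, then scaled by a random factor in [0, 1)
            delay = min(max_delay_s, base_delay_s * 2 ** attempt) * rng()
            sleep(delay)
    raise AssertionError("unreachable")
```

The `sleep` and `rng` parameters exist so tests can run instantly and deterministically. Only retry calls that are safe to repeat, and prefer the server's `Retry-After` value when the API supplies one.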

Sign-Off Criteria

  • Integration chosen and documented with reasoning (latency/complexity trade-off)
  • Tested end-to-end with real integration partner (not mocked)
  • Load tested: verified performance under expected usage
  • Error scenarios tested: what happens when integration fails
  • Monitoring in place: can see integration performance and failures
  • Documentation complete: how to use, configure, troubleshoot

See Also

  • Doc 06 (Harness Architecture): Integration patterns extend component 7 (Orchestration)
  • Doc 09 (Operations): Monitoring integrated harnesses in production
  • Doc 10 (Security & Safety): Authentication, authorization, credential management

Changelog

  • April 2026: Created comprehensive integration patterns guide
    • 12 integration patterns covering library to event-driven architectures
    • REST API, WebSocket, GraphQL implementations with code examples
    • Third-party platform integrations (Slack, Discord, Telegram)
    • Database and file system integration patterns
    • Monitoring, authentication, and complete end-to-end example
    • Decision matrix for choosing integration pattern