Reference
Integration Patterns
REST API, GraphQL, WebSocket, event-driven, webhook, Slack/Discord bot, and background job patterns for embedding agents in applications.
For backend engineers integrating harnesses into larger applications.
When you’ve built a harness and need to embed it into production systems, this document covers the main integration patterns: from library embedding to event-driven architectures, from REST APIs to GraphQL, and from third-party messaging platforms to database persistence.
Part 1: Harness as a Library
When to Use Library Integration
Best for:
- Single monolithic application
- Latency-critical requests (<500 ms)
- Complex state shared between harness and application
- Debugging ease important (shared memory, no network)
Not ideal for:
- Distributed systems (agents need to scale independently)
- Async/long-running jobs (blocks application thread)
- When you need independent versioning
Installation & Setup
Assume your harness is a Python package (published to PyPI or internal registry):
# Install in your application
pip install my-harness-agent==1.0.0
Basic Usage Pattern
# app.py
import os

from my_harness import AgentHarness, AgentConfig

# Initialize once (reuse across requests)
config = AgentConfig(
model="claude-3-5-sonnet",
api_key=os.environ["CLAUDE_API_KEY"],
system_prompt="You are a helpful assistant",
tools=["file_read", "file_write", "web_search"]
)
harness = AgentHarness(config=config)
# Call from anywhere in your app
def process_user_request(user_input: str) -> dict:
"""Synchronous harness call."""
result = harness.run(user_input)
return {
"status": "success",
"result": result["output"],
"cost": result["cost_usd"],
"tokens_used": result["tokens_used"]
}
Error Handling
from my_harness.exceptions import (
HarnessError,
LoopLimitExceeded,
ToolExecutionError,
APIKeyError
)
def safe_harness_call(user_input: str) -> dict:
"""Call harness with comprehensive error handling."""
try:
result = harness.run(user_input, max_iterations=10)
return {"status": "success", "result": result}
except LoopLimitExceeded as e:
# Agent hit iteration limit - partial results may be available
return {
"status": "partial",
"result": e.partial_result,
"error": f"Agent exceeded max iterations: {str(e)}",
"iterations": e.iterations_completed
}
except ToolExecutionError as e:
# A tool failed - agent may have recovered
return {
"status": "partial",
"result": e.last_successful_result,
"error": f"Tool failed: {e.tool_name}: {str(e)}",
"failed_tool": e.tool_name
}
except APIKeyError as e:
# API key invalid or expired
return {
"status": "error",
"error": "API authentication failed",
"code": "auth_error"
}
except HarnessError as e:
# Generic harness error
return {
"status": "error",
"error": str(e),
"code": "harness_error"
}
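Callers can then branch on the `status` field rather than re-raising. A tiny dispatch sketch over the return shapes above; `render_response` is illustrative, not part of the harness API:

```python
def render_response(outcome: dict) -> str:
    """Turn a safe_harness_call() result into user-facing text."""
    status = outcome.get("status")
    if status == "success":
        return str(outcome["result"])
    if status == "partial":
        # Surface whatever the agent produced, plus why it's incomplete
        return f"{outcome.get('result') or ''}\n(note: {outcome['error']})"
    return f"Sorry, something went wrong ({outcome.get('code', 'unknown')})"

print(render_response({"status": "success", "result": "42"}))  # 42
```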
Passing Configuration
# Dynamic configuration per request
def process_with_custom_config(user_input: str, model: str = None) -> dict:
    """Run harness with request-specific configuration."""
    # Copy the module-level config rather than rebinding it (rebinding
    # `config` inside the function would raise UnboundLocalError)
    request_config = config.copy(update={
        "model": model or config.model,
        "temperature": 0.7,
        "max_tokens": 2000
    })
    harness_instance = AgentHarness(config=request_config)
    result = harness_instance.run(user_input)
    return {
        "result": result["output"],
        "model_used": request_config.model,
        "cost": result["cost_usd"]
    }
Shared Context with Application
from datetime import datetime

class HarnessWithAppContext:
    """Harness that shares application state."""
def __init__(self, harness: AgentHarness, app_db):
self.harness = harness
self.app_db = app_db
def run_with_app_context(self, user_input: str, user_id: str) -> dict:
"""Run harness with access to application database."""
# Load user context
user = self.app_db.users.get(user_id)
# Inject into harness system prompt
enhanced_prompt = f"""
User: {user.name}
Account type: {user.account_type}
Previous interactions: {user.interaction_count}
Use this context to provide personalized responses.
"""
# Run with context
result = self.harness.run(
user_input,
system_prompt=enhanced_prompt,
context={"user_id": user_id, "account": user.account_type}
)
# Save interaction to app database
self.app_db.interactions.insert({
"user_id": user_id,
"input": user_input,
"output": result["output"],
"cost": result["cost_usd"],
"timestamp": datetime.utcnow()
})
return result
# Usage in Flask/FastAPI
harness_app = HarnessWithAppContext(harness, database)
@app.post("/ask")
def ask(request: AskRequest):
result = harness_app.run_with_app_context(
request.question,
user_id=request.user_id
)
return {"answer": result["output"], "cost": result["cost_usd"]}
Return Values & Metadata
# Standard return structure
result = harness.run(user_input)
# Always includes:
{
"output": "Final agent response",
"status": "success", # or "partial", "error"
# Cost tracking
"cost_usd": 0.0042,
"tokens_used": {
"input": 150,
"output": 200,
"total": 350
},
# Execution info
"iterations": 3,
"duration_seconds": 2.14,
"tools_called": ["web_search", "file_read"],
# Debug info (if enabled)
"reasoning": "Thought process from model",
"tool_calls": [
{
"tool": "web_search",
"input": {"query": "..."},
"output": "Search results...",
"duration": 0.5
}
]
}
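Several of these fields are optional (the debug info only appears when enabled), so callers should read them defensively. A minimal budget-check sketch built on this structure; `within_budget` and the 0.05 USD ceiling are illustrative, not part of the harness API:

```python
def within_budget(result: dict, max_cost_usd: float = 0.05) -> bool:
    """True if the run stayed under budget; missing cost data counts as over."""
    cost = result.get("cost_usd")
    return cost is not None and cost <= max_cost_usd

# Defensive reads for optional fields
result = {"output": "Final agent response", "cost_usd": 0.0042, "iterations": 3}
tool_calls = result.get("tool_calls", [])   # empty unless debug info is enabled
ok = within_budget(result)                  # 0.0042 <= 0.05
```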
Part 2: Harness as a REST API Service
When to Use REST API Pattern
Best for:
- Distributed systems (harness runs separately)
- Multiple applications consuming the harness
- Independent scaling (harness scales independently from app)
- Asynchronous processing with callbacks
- Rate limiting and quota management per consumer
Trade-offs:
- Network latency (100-500ms overhead)
- More complex deployment
- Requires API authentication and versioning
FastAPI Implementation
# harness_api.py
from fastapi import FastAPI, HTTPException, Header, Depends
from pydantic import BaseModel, Field
from typing import Optional
import json
from datetime import datetime
import os
from my_harness import AgentHarness, AgentConfig
app = FastAPI(title="Harness Agent API", version="1.0.0")
# Initialize harness once
config = AgentConfig(
model="claude-3-5-sonnet",
api_key=os.environ["CLAUDE_API_KEY"]
)
harness = AgentHarness(config=config)
# ===== Request/Response Models =====
class QueryRequest(BaseModel):
"""User query."""
query: str = Field(..., description="The user's question or task")
context: Optional[dict] = Field(None, description="Optional context")
max_iterations: int = Field(10, ge=1, le=50)
temperature: Optional[float] = Field(None, ge=0, le=2)
class ToolCall(BaseModel):
"""Record of a tool execution."""
tool_name: str
input: dict
output: str
duration_ms: float
status: str # "success" or "error"
class QueryResponse(BaseModel):
"""Successful query response."""
query_id: str
status: str # "success" or "partial"
result: str
iterations: int
cost_usd: float
tokens_used: int
duration_seconds: float
tools_called: list[ToolCall]
timestamp: str
class ErrorResponse(BaseModel):
"""Error response."""
error: str
code: str
details: Optional[dict] = None
# ===== Dependency: API Key Validation =====
import secrets

async def verify_api_key(x_api_key: str = Header(...)) -> str:
    """Validate API key from header."""
    # In production, verify against a database or secrets manager;
    # compare_digest avoids leaking key prefixes via timing
    valid_keys = os.environ.get("VALID_API_KEYS", "").split(",")
    if not any(secrets.compare_digest(x_api_key, key) for key in valid_keys):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key
# ===== Endpoints =====
@app.post("/agent/query", response_model=QueryResponse)
async def agent_query(
request: QueryRequest,
api_key: str = Depends(verify_api_key)
) -> dict:
"""
Query the agent harness.
**Parameters**:
- query: User question/task
- context: Optional application context
- max_iterations: Max loops agent can perform
**Returns**:
- Result from agent
- Cost in USD
- Tools called
- Execution time
"""
import uuid
query_id = str(uuid.uuid4())
start_time = datetime.utcnow()
try:
# Run harness
result = harness.run(
request.query,
context=request.context,
max_iterations=request.max_iterations,
temperature=request.temperature
)
duration = (datetime.utcnow() - start_time).total_seconds()
return QueryResponse(
query_id=query_id,
status=result.get("status", "success"),
result=result["output"],
iterations=result.get("iterations", 0),
cost_usd=result.get("cost_usd", 0),
tokens_used=result.get("tokens_used", {}).get("total", 0),
duration_seconds=duration,
tools_called=[
ToolCall(**tool) for tool in result.get("tool_calls", [])
],
timestamp=start_time.isoformat()
)
except Exception as e:
raise HTTPException(
status_code=500,
detail={
"error": str(e),
"query_id": query_id,
"code": "harness_error"
}
)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat()
}
@app.get("/ready")
async def readiness_check():
    """Readiness probe: can we accept requests?"""
    try:
        # Quick test: can we call the model?
        harness.run("Acknowledge", max_iterations=1)
        return {
            "ready": True,
            "model_responsive": True,
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        # Returning a bare tuple doesn't set the status code in FastAPI;
        # raise a 503 so the orchestrator takes this instance out of rotation
        raise HTTPException(
            status_code=503,
            detail={
                "ready": False,
                "model_responsive": False,
                "error": str(e)
            }
        )
@app.get("/metrics")
async def metrics():
"""Prometheus-compatible metrics endpoint."""
# Collect from harness or database
return {
"requests_total": harness.metrics.get("total_requests", 0),
"requests_success": harness.metrics.get("successful_requests", 0),
"cost_total_usd": harness.metrics.get("total_cost_usd", 0),
"tokens_used_total": harness.metrics.get("total_tokens", 0)
}
# Run server
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Client Library
# harness_client.py - For consuming applications
import httpx
import json
from typing import Optional
class HarnessClient:
"""Client for interacting with Harness API."""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.client = httpx.Client(
headers={"X-API-Key": api_key},
timeout=30.0
)
def query(self, prompt: str, context: Optional[dict] = None) -> dict:
"""Send query to harness."""
response = self.client.post(
f"{self.base_url}/agent/query",
json={
"query": prompt,
"context": context,
"max_iterations": 10
}
)
response.raise_for_status()
return response.json()
def health(self) -> bool:
"""Check if harness is healthy."""
response = self.client.get(f"{self.base_url}/health")
return response.status_code == 200
def close(self):
"""Close HTTP connection."""
self.client.close()
# Usage in consuming app
harness = HarnessClient(
base_url="https://harness.internal:8000",
api_key=os.environ["HARNESS_API_KEY"]
)
result = harness.query("Summarize my sales data")
print(f"Cost: ${result['cost_usd']}")
print(f"Result: {result['result']}")
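Network calls to the harness service can fail transiently, so consuming apps typically wrap `HarnessClient.query` in retries with backoff. A transport-agnostic sketch; with the httpx client above the retryable tuple would be `httpx.TransportError`, but plain built-in exceptions are used here so the example stands alone:

```python
import random
import time

def with_retries(fn, max_attempts=4, base_delay=0.5,
                 retryable=(ConnectionError, TimeoutError)):
    """Call fn(), retrying transient failures with exponential backoff + jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts:
                raise
            # 0.5 s, 1 s, 2 s, ... plus up to 25% jitter to avoid thundering herds
            delay = base_delay * (2 ** (attempt - 1))
            time.sleep(delay * (1 + random.random() * 0.25))

# Example: a flaky callable that succeeds on the third try
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

result_value = with_retries(flaky, base_delay=0.01)
```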
Error Handling & HTTP Status Codes
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from my_harness.exceptions import LoopLimitExceeded, ToolExecutionError

# Standardized error codes
class HarnessErrorCode:
    LOOP_LIMIT_EXCEEDED = "loop_limit_exceeded"    # 400
    TOOL_EXECUTION_ERROR = "tool_error"            # 422
    INVALID_INPUT = "invalid_input"                # 400
    API_KEY_INVALID = "auth_failed"                # 401
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"    # 429
    INTERNAL_ERROR = "internal_error"              # 500
    SERVICE_UNAVAILABLE = "service_unavailable"    # 503

@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """Global fallback: exception handlers must return a Response object."""
    return JSONResponse(
        status_code=500,
        content={
            "error": str(exc),
            "code": HarnessErrorCode.INTERNAL_ERROR,
            "timestamp": datetime.utcnow().isoformat()
        }
    )

# Specific handlers for known harness errors. (Registering a second
# /agent/query route would conflict with the one above, so map harness
# exceptions to HTTP responses with dedicated handlers instead.)
@app.exception_handler(LoopLimitExceeded)
async def loop_limit_handler(request, exc):
    return JSONResponse(
        status_code=400,
        content={
            "error": "Agent exceeded iteration limit",
            "code": HarnessErrorCode.LOOP_LIMIT_EXCEEDED
        }
    )

@app.exception_handler(ToolExecutionError)
async def tool_error_handler(request, exc):
    return JSONResponse(
        status_code=422,
        content={
            "error": "Tool execution failed",
            "code": HarnessErrorCode.TOOL_EXECUTION_ERROR
        }
    )
Part 3: Harness as Async/Background Job
When to Use Background Jobs
Best for:
- Long-running agents (>10 seconds)
- Batch processing
- Fire-and-forget operations
- When latency isn’t critical
Celery + Redis Pattern
# tasks.py - Celery tasks
from celery import Celery
from my_harness import AgentHarness, AgentConfig
import os
# Configure Celery
app = Celery(
'harness_tasks',
broker=os.environ["CELERY_BROKER_URL"], # redis://localhost:6379
backend=os.environ["CELERY_RESULT_BACKEND"]
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_time_limit=30 * 60, # 30 minutes hard limit
task_soft_time_limit=25 * 60 # 25 minutes soft limit (gives grace period)
)
# Initialize harness once (reused by all tasks)
harness = AgentHarness(config=AgentConfig(
model="claude-3-5-sonnet",
api_key=os.environ["CLAUDE_API_KEY"]
))
@app.task(
bind=True,
name='harness.query',
max_retries=3
)
def harness_query(self, query: str, user_id: str, context: dict = None):
"""
Run harness query as background job.
Args:
query: User query
user_id: User requesting the query
context: Optional application context
Returns:
dict with result, cost, metadata
"""
try:
# Log job start
print(f"[{self.request.id}] Starting harness query for user {user_id}")
# Run harness
result = harness.run(query, context=context, max_iterations=10)
# Log completion
print(f"[{self.request.id}] Completed in {result['duration_seconds']}s, cost: ${result['cost_usd']}")
return {
"status": "success",
"query_id": self.request.id,
"result": result["output"],
"cost_usd": result["cost_usd"],
"iterations": result["iterations"],
"user_id": user_id
}
except Exception as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task(
bind=True,
name='harness.batch',
max_retries=2
)
def harness_batch(self, queries: list, user_id: str):
"""Process multiple queries."""
results = []
total_cost = 0
for i, query in enumerate(queries):
try:
result = harness.run(query, max_iterations=5)
results.append({
"query": query,
"result": result["output"],
"cost": result["cost_usd"]
})
total_cost += result["cost_usd"]
except Exception as e:
results.append({
"query": query,
"error": str(e)
})
return {
"status": "completed",
"user_id": user_id,
"results": results,
"total_cost_usd": total_cost,
"completed_count": sum(1 for r in results if "result" in r)
}
# Usage from Flask/FastAPI (a separate web process that imports tasks.py;
# `api` is the web app, `app` remains the Celery app defined above)
from fastapi import FastAPI, HTTPException
from celery.result import AsyncResult

api = FastAPI()

@api.post("/query/async")
def query_async(request: QueryRequest):
    """Submit harness query as background job."""
    # Submit task to Celery
    task = harness_query.delay(
        query=request.query,
        user_id=request.user_id,
        context=request.context
    )
    return {
        "task_id": task.id,
        "status": "queued",
        "check_url": f"/query/status/{task.id}"
    }

@api.get("/query/status/{task_id}")
def query_status(task_id: str):
    """Check status of background job."""
    result = AsyncResult(task_id, app=app)
    if result.state == "PENDING":
        return {"status": "pending", "task_id": task_id}
    elif result.state == "SUCCESS":
        return {
            "status": "completed",
            "task_id": task_id,
            "result": result.result
        }
    elif result.state == "FAILURE":
        return {
            "status": "failed",
            "task_id": task_id,
            "error": str(result.info)
        }
    else:  # RETRY, STARTED, etc.
        return {"status": result.state, "task_id": task_id}

@api.get("/query/result/{task_id}")
def query_result(task_id: str):
    """Get result of completed job."""
    result = AsyncResult(task_id, app=app)
    if result.ready():
        if result.successful():
            return result.result
        raise HTTPException(status_code=500, detail=str(result.info))
    raise HTTPException(status_code=202, detail="Task still processing")
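Clients of these endpoints typically poll `/query/status/{task_id}` until the job settles. A transport-agnostic polling sketch; `fetch_status` stands in for the HTTP GET (e.g. via an httpx call), which is assumed rather than shown:

```python
import time

def poll_until_done(fetch_status, interval=1.0, timeout=60.0):
    """Poll a status callable until the job completes or fails.

    fetch_status() should return a dict shaped like the /query/status/{id}
    responses above, e.g. {"status": "completed", "result": {...}}.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()
        if status["status"] in ("completed", "failed"):
            return status
        time.sleep(interval)
    raise TimeoutError("job did not finish in time")

# Simulated status sequence: pending twice, then completed
responses = iter([
    {"status": "pending"},
    {"status": "pending"},
    {"status": "completed", "result": {"output": "done"}},
])
final = poll_until_done(lambda: next(responses), interval=0.01)
```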
Webhook Callbacks (Job Completion)
# harness_tasks_with_webhooks.py
import httpx
@app.task(bind=True, name='harness.query_with_webhook')
def harness_query_with_webhook(self, query: str, webhook_url: str):
"""Run harness and POST result to webhook."""
try:
result = harness.run(query)
        # POST result to webhook
        response = httpx.post(
            webhook_url,
            json={
                "task_id": self.request.id,
                "status": "completed",
                "result": result["output"],
                "cost_usd": result["cost_usd"]
            },
            timeout=30
        )
        response.raise_for_status()
        return {
            "status": "success",
            "webhook_status": response.status_code
        }
    except Exception as e:
        # Best-effort failure notification; webhook delivery failures are
        # logged but not retried indefinitely
        try:
            httpx.post(
                webhook_url,
                json={
                    "task_id": self.request.id,
                    "status": "failed",
                    "error": str(e)
                },
                timeout=10
            )
        except httpx.HTTPError:
            pass
        raise
# Usage (from the web API process; `app` here is the web framework app)
@app.post("/query/with-callback")
def query_with_callback(request: QueryWithCallbackRequest):
"""Submit query with callback URL."""
task = harness_query_with_webhook.delay(
query=request.query,
webhook_url=request.callback_url
)
return {
"task_id": task.id,
"message": "Query queued. Result will be POSTed to callback URL"
}
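The callback above is unauthenticated: anyone who discovers the URL could forge a "completed" POST. A common fix is an HMAC signature header the receiver verifies before trusting the payload. A sketch of both sides; the header name and shared secret are illustrative assumptions, not something the tasks above implement:

```python
import hashlib
import hmac
import json

SECRET = b"shared-webhook-secret"  # assumption: provisioned out of band

def sign_payload(body: bytes) -> str:
    """Sender side: HMAC-SHA256 of the raw body, sent as e.g. X-Harness-Signature."""
    return hmac.new(SECRET, body, hashlib.sha256).hexdigest()

def verify_payload(body: bytes, signature: str) -> bool:
    """Receiver side: constant-time comparison against the recomputed MAC."""
    expected = hmac.new(SECRET, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)

body = json.dumps({"task_id": "abc", "status": "completed"}).encode()
sig = sign_payload(body)
```

The receiver must verify against the raw request bytes, not a re-serialized copy, since key ordering differences would change the digest.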
Timeout Handling
from celery.exceptions import SoftTimeLimitExceeded
@app.task(bind=True, name='harness.query_with_timeout')
def harness_query_with_timeout(self, query: str):
"""Handle graceful timeout."""
try:
# This will raise SoftTimeLimitExceeded at 25 minutes
result = harness.run(query)
return {"status": "success", "result": result}
except SoftTimeLimitExceeded:
# Gracefully save partial progress
partial_result = harness.get_partial_result()
return {
"status": "timeout",
"partial_result": partial_result,
"message": "Agent exceeded time limit. Partial results above."
}
Part 4: Event-Driven Integration
When to Use Event-Driven
Best for:
- Responding to real-world events (user messages, file uploads, API webhooks)
- Streaming data (Kafka, Pub/Sub topics)
- Decoupled microservices
- Building event sourcing systems
Kafka Event Consumer
# kafka_consumer.py
from kafka import KafkaConsumer, KafkaProducer
from my_harness import AgentHarness, AgentConfig
import json
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))

# Kafka consumer
consumer = KafkaConsumer(
    'user_queries',  # Topic name
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"].split(","),
    group_id='harness-consumer',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    max_poll_records=1  # Process one at a time
)

# Producer for results and dead-letter events
producer = KafkaProducer(
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"].split(","),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

logger.info("Kafka consumer started, listening on 'user_queries' topic")
# Main consume loop: process one event at a time
for message in consumer:
    event = message.value
try:
logger.info(f"Processing event: {event.get('id')}")
# Run harness on event data
result = harness.run(
query=event["user_query"],
context={
"user_id": event.get("user_id"),
"session_id": event.get("session_id"),
"metadata": event.get("metadata", {})
},
max_iterations=5
)
# Produce result to output topic
producer.send(
"agent_responses",
value={
"event_id": event["id"],
"user_id": event.get("user_id"),
"result": result["output"],
"cost_usd": result["cost_usd"],
"iterations": result["iterations"]
}
)
logger.info(f"Completed event {event['id']}")
except Exception as e:
logger.error(f"Failed processing event {event.get('id')}: {str(e)}")
# Send to dead-letter topic
producer.send(
"agent_errors",
value={
"event_id": event["id"],
"error": str(e),
"original_event": event
}
)
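The dead-letter event above carries only the error string; replaying failures later is easier if the record also includes an attempt count and timestamp. A small helper sketch (the field names are suggestions, not a fixed schema):

```python
import time

def to_dead_letter(event: dict, error: Exception, attempt: int) -> dict:
    """Wrap a failed event for the dead-letter topic with replay metadata."""
    return {
        "event_id": event.get("id"),
        "error": f"{type(error).__name__}: {error}",
        "attempt": attempt,
        "failed_at": time.time(),
        "original_event": event,
    }

record = to_dead_letter({"id": "e1", "user_query": "hi"}, ValueError("bad input"), attempt=1)
```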
Google Pub/Sub Event Handler
# pubsub_handler.py
from google.cloud import pubsub_v1
from my_harness import AgentHarness, AgentConfig
import json
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))
def message_handler(message):
"""Handle incoming Pub/Sub message."""
try:
event = json.loads(message.data.decode('utf-8'))
logger.info(f"Processing event: {event.get('id')}")
# Run harness
result = harness.run(
query=event["user_query"],
context=event.get("context", {}),
max_iterations=10
)
# Publish result
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(
os.environ["GCP_PROJECT"],
"agent-responses"
)
publisher.publish(
topic_path,
data=json.dumps({
"event_id": event["id"],
"result": result["output"],
"cost_usd": result["cost_usd"]
}).encode('utf-8')
)
# Acknowledge message
message.ack()
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
# Negative acknowledgement will requeue the message
message.nack()
# Subscribe and listen
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
os.environ["GCP_PROJECT"],
"harness-subscription"
)
streaming_pull_future = subscriber.subscribe(
subscription_path,
callback=message_handler,
flow_control=pubsub_v1.types.FlowControl(max_messages=1)
)
logger.info("Pub/Sub listener started")
try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()
Stateful Event Processing (Multiple Events)
# stateful_event_processor.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json

from my_harness import AgentHarness
@dataclass
class EventSession:
"""Session state across multiple events."""
session_id: str
user_id: str
events_processed: int = 0
    accumulated_context: Optional[dict] = None
    total_cost: float = 0.0
def __post_init__(self):
if self.accumulated_context is None:
self.accumulated_context = {}
class StatefulEventProcessor:
"""Process multiple related events with shared state."""
def __init__(self, harness: AgentHarness, db):
self.harness = harness
self.db = db # Database for session persistence
self.sessions = {} # In-memory cache
def process_event(self, event: dict) -> dict:
"""Process event, maintaining session state."""
session_id = event.get("session_id")
# Load or create session
session = self._get_or_create_session(session_id, event)
# Accumulate context from previous events
enhanced_context = {
**session.accumulated_context,
**event.get("context", {})
}
# Build prompt that includes session history
prompt = f"""
Session: {session_id}
Events processed so far: {session.events_processed}
Previous context: {json.dumps(session.accumulated_context)}
New event: {event.get('user_query')}
"""
# Run harness
result = self.harness.run(prompt, context=enhanced_context)
# Update session state
session.events_processed += 1
session.total_cost += result["cost_usd"]
session.accumulated_context.update(enhanced_context)
# Save session to database
self._save_session(session)
return {
"event_id": event.get("id"),
"session_id": session_id,
"result": result["output"],
"session_total_cost": session.total_cost,
"events_in_session": session.events_processed
}
def _get_or_create_session(self, session_id: str, event: dict) -> EventSession:
"""Get session from cache or database."""
if session_id in self.sessions:
return self.sessions[session_id]
# Try to load from database
session_data = self.db.sessions.find_one({"session_id": session_id})
if session_data:
session = EventSession(**session_data)
else:
session = EventSession(
session_id=session_id,
user_id=event.get("user_id")
)
self.sessions[session_id] = session
return session
def _save_session(self, session: EventSession):
"""Persist session to database."""
self.db.sessions.update_one(
{"session_id": session.session_id},
{
"$set": {
"user_id": session.user_id,
"events_processed": session.events_processed,
"accumulated_context": session.accumulated_context,
"total_cost": session.total_cost,
"updated_at": datetime.utcnow()
}
},
upsert=True
)
Part 5: GraphQL Endpoint
When to Use GraphQL
Best for:
- Complex, nested data queries
- Multiple clients with different data needs
- Frontend applications that want to query specific fields
- Building internal APIs with strict schemas
GraphQL Schema & Resolver
# schema.py
import graphene
from graphene import ObjectType, String, Float, Int, List, Field
from my_harness import AgentHarness
# Result type
class AgentResult(ObjectType):
"""Result from agent execution."""
query_id = String(required=True)
output = String(required=True)
cost_usd = Float(required=True)
tokens_used = Int(required=True)
iterations = Int(required=True)
tools_called = List(String)
# Query type
class Query(ObjectType):
"""GraphQL queries for harness."""
agent_query = Field(
AgentResult,
query=String(required=True),
context=graphene.JSONString(required=False)
)
agent_batch = Field(
List(AgentResult),
queries=List(String, required=True)
)
def resolve_agent_query(self, info, query, context=None):
"""Resolve single agent query."""
harness = info.context.harness
result = harness.run(query, context=context)
return AgentResult(
query_id=info.context.request_id,
output=result["output"],
cost_usd=result["cost_usd"],
tokens_used=result["tokens_used"].get("total", 0),
iterations=result.get("iterations", 0),
tools_called=result.get("tools_called", [])
)
def resolve_agent_batch(self, info, queries):
"""Resolve multiple queries."""
harness = info.context.harness
results = []
for query in queries:
result = harness.run(query)
results.append(AgentResult(
query_id=f"{info.context.request_id}:{len(results)}",
output=result["output"],
cost_usd=result["cost_usd"],
tokens_used=result["tokens_used"].get("total", 0),
iterations=result.get("iterations", 0),
tools_called=result.get("tools_called", [])
))
return results
# Mutations for stateful operations
class ExecuteAgent(graphene.Mutation):
"""Execute agent with side effects."""
class Arguments:
query = String(required=True)
persist = graphene.Boolean(required=False, default_value=False)
result = Field(AgentResult)
success = graphene.Boolean()
message = String()
def mutate(self, info, query, persist=False):
harness = info.context.harness
result = harness.run(query)
if persist:
# Save to database
info.context.db.executions.insert({
"query": query,
"result": result["output"],
"cost": result["cost_usd"]
})
return ExecuteAgent(
result=AgentResult(
query_id=info.context.request_id,
output=result["output"],
cost_usd=result["cost_usd"],
tokens_used=result["tokens_used"].get("total", 0),
iterations=result.get("iterations", 0),
tools_called=result.get("tools_called", [])
),
success=True,
message="Execution completed"
)
class Mutation(ObjectType):
execute_agent = ExecuteAgent.Field()
# Create schema
schema = graphene.Schema(query=Query, mutation=Mutation)
# Usage with FastAPI
from fastapi import FastAPI
from starlette.requests import Request
from types import SimpleNamespace
import uuid

app = FastAPI()

@app.post("/graphql")
async def graphql_endpoint(request: Request):
    """GraphQL endpoint."""
    body = await request.json()
    # Resolvers read info.context via attribute access, so build an object
    context = SimpleNamespace(
        harness=harness,
        db=database,
        request_id=str(uuid.uuid4())
    )
    # Execute directly against the graphene schema
    result = schema.execute(
        body.get("query"),
        variables=body.get("variables"),
        context=context
    )
    return {
        "data": result.data,
        "errors": [str(e) for e in result.errors] if result.errors else None
    }
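A client would POST a body shaped like the one below. Note that graphene converts snake_case names to camelCase by default, so `resolve_agent_query` is queried as `agentQuery` and `cost_usd` becomes `costUsd`; the variable name `$q` is arbitrary:

```python
import json

# Request body for the /graphql endpoint above
payload = {
    "query": """
        query Ask($q: String!) {
          agentQuery(query: $q) {
            queryId
            output
            costUsd
            toolsCalled
          }
        }
    """,
    "variables": {"q": "Summarize my sales data"},
}
body = json.dumps(payload)
```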
Part 6: WebSocket/Streaming
When to Use WebSocket
Best for:
- Real-time interactions (live agent reasoning)
- Streaming LLM token responses
- Live tool execution progress
- Long-running operations with progress updates
- Web/mobile apps that want live feedback
FastAPI WebSocket Implementation
# websocket_server.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from my_harness import AgentHarness, AgentConfig
import json
import asyncio
import os
from datetime import datetime
app = FastAPI()
harness = AgentHarness(config=AgentConfig(
model="claude-3-5-sonnet",
api_key=os.environ["CLAUDE_API_KEY"]
))
@app.websocket("/ws/agent")
async def websocket_agent(websocket: WebSocket):
"""WebSocket endpoint for real-time agent interaction."""
await websocket.accept()
try:
while True:
# Receive message from client
data = await websocket.receive_text()
message = json.loads(data)
if message.get("type") == "query":
query = message.get("content")
# Send acknowledgment
await websocket.send_json({
"type": "status",
"status": "processing",
"timestamp": datetime.utcnow().isoformat()
})
                # Run agent with a streaming callback. The harness runs in a
                # worker thread, so the callback must schedule sends on the
                # event loop rather than await them directly.
                loop = asyncio.get_running_loop()

                def stream_callback(event):
                    """Called (from the worker thread) for each reasoning step."""
                    asyncio.run_coroutine_threadsafe(
                        websocket.send_json({
                            "type": "reasoning",
                            "content": event.get("thought"),
                            "timestamp": datetime.utcnow().isoformat()
                        }),
                        loop
                    )

                # Run harness without blocking the event loop
                result = await asyncio.to_thread(
                    harness.run_with_streaming,
                    query,
                    callback=stream_callback
                )
# Send final result
await websocket.send_json({
"type": "result",
"content": result["output"],
"cost_usd": result["cost_usd"],
"iterations": result["iterations"],
"timestamp": datetime.utcnow().isoformat()
})
elif message.get("type") == "stop":
# Stop agent
await websocket.send_json({
"type": "status",
"status": "stopped"
})
break
except WebSocketDisconnect:
print("Client disconnected")
JavaScript Client
// harness-client.js
class HarnessWebSocketClient {
constructor(wsUrl) {
this.wsUrl = wsUrl;
this.ws = null;
this.messageHandlers = {};
}
connect() {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.wsUrl);
this.ws.onopen = () => {
console.log("Connected to harness");
resolve();
};
this.ws.onerror = (error) => {
console.error("WebSocket error:", error);
reject(error);
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
});
}
query(prompt) {
this.ws.send(JSON.stringify({
type: "query",
content: prompt
}));
}
onReasoning(callback) {
this.messageHandlers.reasoning = callback;
}
onResult(callback) {
this.messageHandlers.result = callback;
}
onStatus(callback) {
this.messageHandlers.status = callback;
}
handleMessage(message) {
const handler = this.messageHandlers[message.type];
if (handler) {
handler(message);
}
}
disconnect() {
if (this.ws) {
this.ws.close();
}
}
}
// Usage
const client = new HarnessWebSocketClient('ws://localhost:8000/ws/agent');
await client.connect();
client.onStatus((msg) => {
console.log("Status:", msg.status);
});
client.onReasoning((msg) => {
console.log("Agent thinking:", msg.content);
document.getElementById('thinking').textContent = msg.content;
});
client.onResult((msg) => {
console.log("Result:", msg.content);
document.getElementById('result').textContent = msg.content;
console.log("Cost: $" + msg.cost_usd);
});
client.query("What's the weather like?");
Part 7: Third-Party API Integrations
Slack Bot
# slack_bot.py
from slack_bolt import App
from slack_bolt.adapter.fastapi import SlackRequestHandler
from fastapi import FastAPI
from my_harness import AgentHarness, AgentConfig
import os
# Initialize Slack app
slack_app = App(
token=os.environ["SLACK_BOT_TOKEN"],
signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)
harness = AgentHarness(config=AgentConfig(
model="claude-3-5-sonnet",
api_key=os.environ["CLAUDE_API_KEY"]
))
@slack_app.message(".*")
def handle_message(message, say, client):
"""Handle direct messages to bot."""
user_id = message["user"]
text = message["text"]
# Send "thinking..." reaction
    client.reactions_add(
        channel=message["channel"],
        timestamp=message["ts"],
        name="thinking_face"  # slack_sdk uses `name`, not `emoji`
    )
try:
# Run harness
result = harness.run(text)
# Send result
say(f"*Agent Response*\n{result['output']}\n\n_Cost: ${result['cost_usd']:.4f}_")
except Exception as e:
say(f"Error: {str(e)}")
@slack_app.command("/agent")
def handle_agent_command(ack, body, say):
"""Handle /agent slash command."""
ack()
query = body.get("text")
user_id = body.get("user_id")
try:
result = harness.run(query)
say(f"*Query:* {query}\n*Response:* {result['output']}\n*Cost:* ${result['cost_usd']:.4f}")
except Exception as e:
say(f"Error processing query: {str(e)}")
# FastAPI app
app = FastAPI()
# Register Slack handler
handler = SlackRequestHandler(slack_app)
from starlette.requests import Request

@app.post("/slack/events")
async def slack_events(req: Request):
    return await handler.handle(req)
Discord Bot
# discord_bot.py
import discord
from discord.ext import commands
from my_harness import AgentHarness, AgentConfig
import os
# Initialize bot
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents)
harness = AgentHarness(config=AgentConfig(
    model="claude-3-5-sonnet",
    api_key=os.environ["CLAUDE_API_KEY"]
))
@bot.event
async def on_ready():
print(f"Bot logged in as {bot.user}")
@bot.command(name="ask")
async def ask_command(ctx, *, question):
"""Ask the agent a question."""
async with ctx.typing(): # Show typing indicator
try:
result = harness.run(question)
# Split long responses (Discord has 2000 char limit)
response = result["output"]
if len(response) > 2000:
response = response[:1997] + "..."
embed = discord.Embed(
title="Agent Response",
description=response,
color=discord.Color.blue()
)
embed.add_field(name="Cost", value=f"${result['cost_usd']:.4f}", inline=True)
embed.add_field(name="Iterations", value=str(result["iterations"]), inline=True)
await ctx.send(embed=embed)
except Exception as e:
await ctx.send(f"Error: {str(e)}")
@bot.command(name="status")
async def status_command(ctx):
"""Check agent status."""
await ctx.send("Agent is online and ready!")
# Run bot
bot.run(os.environ["DISCORD_BOT_TOKEN"])
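Truncating at 2000 characters (as `ask_command` does above) silently drops content. A minimal chunking helper, splitting on newline boundaries where possible, lets the bot send the full response across multiple messages instead; `chunk_message` is a hypothetical helper, not part of discord.py:

```python
def chunk_message(text: str, limit: int = 2000) -> list[str]:
    """Split text into chunks under Discord's per-message limit,
    preferring newline boundaries so code blocks stay readable."""
    chunks = []
    while len(text) > limit:
        # Break at the last newline before the limit when possible
        cut = text.rfind("\n", 0, limit)
        if cut <= 0:
            cut = limit
        chunks.append(text[:cut])
        text = text[cut:].lstrip("\n")
    if text:
        chunks.append(text)
    return chunks
```

In the command handler this would replace the truncation: `for part in chunk_message(result["output"]): await ctx.send(part)`.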
Telegram Bot
# telegram_bot.py
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from my_harness import AgentHarness, AgentConfig
import os
harness = AgentHarness(config=AgentConfig(
model="claude-3-5-sonnet",
api_key=os.environ["CLAUDE_API_KEY"]
))
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start command."""
await update.message.reply_text(
"Hi! I'm an AI agent. Send me a message and I'll respond!"
)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle user messages."""
user_message = update.message.text
# Send typing indicator
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action="typing"
)
try:
result = harness.run(user_message)
await update.message.reply_text(
f"{result['output']}\n\n_Cost: ${result['cost_usd']:.4f}_",
parse_mode="Markdown"
)
except Exception as e:
await update.message.reply_text(f"Error: {str(e)}")
# Create application
application = Application.builder().token(
os.environ["TELEGRAM_BOT_TOKEN"]
).build()
# Add handlers
application.add_handler(CommandHandler("start", start))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
# Start bot
application.run_polling()
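The Telegram handler above is stateless: each message is a fresh harness run. A minimal per-chat memory sketch is shown below, assuming your harness accepts prior turns via a `history`-style argument (an assumption; adapt to your harness API). An in-process dict is fine for one worker; use Redis or a database so memory survives restarts:

```python
from collections import defaultdict, deque

# Keep the last N exchanges per chat (in-memory; single-process only)
MAX_TURNS = 10
_histories: dict[int, deque] = defaultdict(lambda: deque(maxlen=MAX_TURNS))

def remember(chat_id: int, user_msg: str, agent_msg: str) -> None:
    """Record one exchange; oldest turns are evicted automatically."""
    _histories[chat_id].append({"user": user_msg, "agent": agent_msg})

def history_for(chat_id: int) -> list[dict]:
    """Return prior turns to pass to the harness."""
    return list(_histories[chat_id])
```

In `handle_message` you would call `history_for(update.effective_chat.id)` before the run and `remember(...)` after it.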
Part 8: Database Integration
Reading State from Database
# database_integration.py
from sqlalchemy import create_engine, Column, String, DateTime, Text, Float, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
Base = declarative_base()
class UserProfile(Base):
"""User profile from application database."""
__tablename__ = "users"
id = Column(String, primary_key=True)
name = Column(String)
email = Column(String)
preferences = Column(Text) # JSON
created_at = Column(DateTime)
class AgentContext(Base):
"""Context extracted from database."""
__tablename__ = "agent_context"
user_id = Column(String, primary_key=True)
context_json = Column(Text)
last_updated = Column(DateTime)
class HarnessWithDatabase:
"""Harness that loads state from database."""
def __init__(self, harness: AgentHarness, db_url: str):
self.harness = harness
self.engine = create_engine(db_url)
self.SessionLocal = sessionmaker(bind=self.engine)
def run_for_user(self, query: str, user_id: str) -> dict:
"""Run harness with user context from database."""
session = self.SessionLocal()
try:
# Load user profile
user = session.query(UserProfile).filter_by(id=user_id).first()
if not user:
raise ValueError(f"User {user_id} not found")
# Load cached context
context_record = session.query(AgentContext).filter_by(
user_id=user_id
).first()
context = json.loads(context_record.context_json) if context_record else {}
# Enhance with user profile
context.update({
"user_name": user.name,
"user_email": user.email,
"user_preferences": json.loads(user.preferences) if user.preferences else {}
})
# Run harness with context
result = self.harness.run(query, context=context)
# Save updated context
if not context_record:
context_record = AgentContext(
user_id=user_id,
context_json=json.dumps(context),
last_updated=datetime.utcnow()
)
session.add(context_record)
else:
context_record.context_json = json.dumps(context)
context_record.last_updated = datetime.utcnow()
session.commit()
return result
finally:
session.close()
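One caveat with the pattern above: the cached context grows without bound as runs accumulate. A sketch of a size cap is shown below, assuming the context carries a `history` list of prior turns (a hypothetical shape; adapt the eviction rule to whatever your context actually stores):

```python
import json

MAX_CONTEXT_BYTES = 16_000  # rough budget before the context JSON column bloats

def cap_context(context: dict, max_bytes: int = MAX_CONTEXT_BYTES) -> dict:
    """Drop oldest history entries until the serialized context fits the budget."""
    history = list(context.get("history", []))
    while history and len(json.dumps({**context, "history": history})) > max_bytes:
        history.pop(0)  # evict oldest turn first
    return {**context, "history": history}
```

Calling `cap_context(context)` just before `json.dumps(context)` in `run_for_user` keeps the `agent_context` row bounded.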
Writing Results to Database
class AgentExecution(Base):
"""Record of agent execution."""
__tablename__ = "agent_executions"
id = Column(String, primary_key=True)
user_id = Column(String)
query = Column(Text)
result = Column(Text)
cost_usd = Column(Float)
tokens_used = Column(Integer)
duration_seconds = Column(Float)
created_at = Column(DateTime, default=datetime.utcnow)
def save_execution(session, execution_record: AgentExecution):
"""Save harness execution to database."""
session.add(execution_record)
session.commit()
# Usage
import uuid
result = harness.run(query)
execution = AgentExecution(
id=str(uuid.uuid4()),
user_id=user_id,
query=query,
result=result["output"],
cost_usd=result["cost_usd"],
tokens_used=result["tokens_used"].get("total", 0),
duration_seconds=result.get("duration_seconds", 0)
)
save_execution(db_session, execution)
Transaction Management
def transactional_harness_run(harness, query, user_id, db_session):
"""Run harness within a database transaction.
Assumes UserProfile also defines last_query_at, total_cost,
and total_executions columns."""
with db_session.begin():  # Session.begin() rolls back automatically on exception (SQLAlchemy 1.4+)
try:
# Update user state before harness
user = db_session.query(UserProfile).filter_by(id=user_id).with_for_update().first()
user.last_query_at = datetime.utcnow()
# Run harness
result = harness.run(query)
# Save result
execution = AgentExecution(
user_id=user_id,
query=query,
result=result["output"],
cost_usd=result["cost_usd"]
)
db_session.add(execution)
# Update cost tracking
user.total_cost += result["cost_usd"]
user.total_executions += 1
# All changes committed together
return result
except Exception as e:
# Transaction automatically rolled back
logger.error(f"Transaction failed: {str(e)}")
raise
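A rolled-back transaction usually triggers a retry, and a retried harness call bills the LLM API twice. One mitigation is an idempotency key: derive a stable key from the request and return the cached result on a repeat. The sketch below uses an in-memory dict standing in for a database table; `run_once` and `_completed` are illustrative names, not part of any library:

```python
import hashlib

# idempotency_key -> cached result (use a DB table with a unique index in production)
_completed: dict[str, dict] = {}

def idempotency_key(user_id: str, query: str) -> str:
    """Derive a stable key so retries of the same request are deduplicated."""
    return hashlib.sha256(f"{user_id}:{query}".encode()).hexdigest()

def run_once(harness_run, user_id: str, query: str) -> dict:
    """Run the harness at most once per (user, query); retries return the cached result."""
    key = idempotency_key(user_id, query)
    if key in _completed:
        return _completed[key]  # retry: no second API call, no second charge
    result = harness_run(query)
    _completed[key] = result
    return result
```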
Part 9: File System Integration
Reading and Writing Files
# file_integration.py
import os
from pathlib import Path
def harness_with_file_context(harness, query: str, project_dir: str) -> dict:
"""Run harness with access to project files."""
# Load relevant files
context = {
"project_structure": _get_directory_structure(project_dir),
"readme_content": _read_file(os.path.join(project_dir, "README.md")),
"requirements": _read_file(os.path.join(project_dir, "requirements.txt"))
}
# Run harness
result = harness.run(query, context=context)
# Agent may have written files - collect them
written_files = _collect_written_files(project_dir)
return {
**result,
"files_created": written_files
}
def _get_directory_structure(root_dir: str, max_depth=3) -> str:
"""Get project directory structure."""
def get_tree(path, prefix="", depth=0):
if depth > max_depth:
return ""
try:
entries = sorted(os.listdir(path))
except PermissionError:
return ""
lines = []
visible = [e for e in entries if not e.startswith(".")]
for i, entry in enumerate(visible):
full_path = os.path.join(path, entry)
is_dir = os.path.isdir(full_path)
is_last = (i == len(visible) - 1)
# Connector depends on position in the listing, not on file vs directory
lines.append(f"{prefix}{'└── ' if is_last else '├── '}{entry}")
if is_dir:
lines.append(get_tree(
full_path,
prefix + ("    " if is_last else "│   "),
depth + 1
))
return "\n".join(lines)
return get_tree(root_dir)
def _read_file(file_path: str, max_size: int = 5000) -> str:
"""Read file with size limit."""
try:
with open(file_path, 'r') as f:
content = f.read(max_size)
return content + "..." if len(content) == max_size else content
except FileNotFoundError:
return ""
Object Storage (S3, GCS)
# object_storage_integration.py
import boto3
from google.cloud import storage as gcs
class HarnessWithS3:
"""Harness with S3 file access."""
def __init__(self, harness, bucket_name: str):
self.harness = harness
self.s3 = boto3.client('s3')
self.bucket = bucket_name
def run_with_s3_files(self, query: str, file_keys: list) -> dict:
"""Run harness with files from S3."""
# Download files
files_content = {}
for key in file_keys:
try:
response = self.s3.get_object(Bucket=self.bucket, Key=key)
content = response['Body'].read().decode('utf-8')
files_content[key] = content
except Exception as e:
files_content[key] = f"Error: {str(e)}"
# Run harness with file content
context = {"s3_files": files_content}
result = self.harness.run(query, context=context)
# If agent generated files, upload them
if result.get("generated_files"):
for filename, content in result["generated_files"].items():
self.s3.put_object(
Bucket=self.bucket,
Key=f"generated/{filename}",
Body=content.encode('utf-8')
)
return result
class HarnessWithGCS:
"""Harness with Google Cloud Storage access."""
def __init__(self, harness, bucket_name: str, project_id: str):
self.harness = harness
self.client = gcs.Client(project=project_id)
self.bucket = self.client.bucket(bucket_name)
def run_with_gcs_files(self, query: str, file_names: list) -> dict:
"""Run harness with files from GCS."""
files_content = {}
for name in file_names:
try:
blob = self.bucket.blob(name)
content = blob.download_as_string().decode('utf-8')
files_content[name] = content
except Exception as e:
files_content[name] = f"Error: {str(e)}"
context = {"gcs_files": files_content}
result = self.harness.run(query, context=context)
# Upload generated files
if result.get("generated_files"):
for filename, content in result["generated_files"].items():
blob = self.bucket.blob(f"generated/{filename}")
blob.upload_from_string(content.encode('utf-8'))
return result
Part 10: Monitoring & Observability Integration
Sending Metrics to Prometheus
# prometheus_integration.py
from prometheus_client import Counter, Gauge, Histogram, generate_latest, CollectorRegistry
import time
class PrometheusHarnessMetrics:
"""Prometheus metrics for harness."""
def __init__(self):
self.registry = CollectorRegistry()
# Counters
self.requests_total = Counter(
'harness_requests_total',
'Total harness requests',
registry=self.registry
)
self.errors_total = Counter(
'harness_errors_total',
'Total harness errors',
['error_type'],
registry=self.registry
)
self.cost_total = Counter(
'harness_cost_usd_total',
'Total cost in USD',
registry=self.registry
)
# Gauges
self.active_requests = Gauge(
'harness_requests_active',
'Currently active requests',
registry=self.registry
)
self.cost_daily = Gauge(
'harness_cost_usd_daily',
'Daily cost in USD',
registry=self.registry
)
# Histograms
self.request_duration = Histogram(
'harness_request_duration_seconds',
'Request duration in seconds',
buckets=(0.5, 1.0, 2.5, 5.0, 10.0),
registry=self.registry
)
self.iterations = Histogram(
'harness_iterations',
'Number of agent iterations',
buckets=(1, 2, 3, 5, 10),
registry=self.registry
)
def harness_with_metrics(harness, metrics: PrometheusHarnessMetrics):
"""Wrap harness with Prometheus metrics."""
def run_with_metrics(query: str, **kwargs) -> dict:
metrics.requests_total.inc()
metrics.active_requests.inc()
start = time.time()
try:
result = harness.run(query, **kwargs)
# Record metrics
duration = time.time() - start
metrics.request_duration.observe(duration)
metrics.iterations.observe(result.get("iterations", 0))
metrics.cost_total.inc(result.get("cost_usd", 0))
metrics.cost_daily.inc(result.get("cost_usd", 0))
return result
except Exception as e:
metrics.errors_total.labels(error_type=type(e).__name__).inc()
raise
finally:
metrics.active_requests.dec()
return run_with_metrics
# FastAPI endpoint for Prometheus scraping
from fastapi import Response
metrics_obj = PrometheusHarnessMetrics()
@app.get("/metrics")
async def metrics():
return Response(generate_latest(metrics_obj.registry), media_type="text/plain; version=0.0.4")
Sending Logs to Centralized Logging
# logging_integration.py
import logging
import json
from datetime import datetime
from pythonjsonlogger import jsonlogger
class HarnessJSONFormatter(jsonlogger.JsonFormatter):
"""JSON formatter for structured logging."""
def add_fields(self, log_record, record, message_dict):
super(HarnessJSONFormatter, self).add_fields(log_record, record, message_dict)
log_record['timestamp'] = datetime.utcnow().isoformat()
log_record['logger'] = record.name
log_record['level'] = record.levelname
if hasattr(record, 'user_id'):
log_record['user_id'] = record.user_id
if hasattr(record, 'query_id'):
log_record['query_id'] = record.query_id
# Configure logging
handler = logging.StreamHandler()
formatter = HarnessJSONFormatter('%(timestamp)s %(level)s %(message)s')
handler.setFormatter(formatter)
logger = logging.getLogger('harness')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Log harness execution
def log_harness_execution(logger, query_id, query, result):
"""Log harness execution with structure."""
logger.info(
"Harness execution completed",
extra={
"query_id": query_id,
"query": query[:100], # First 100 chars
"result_length": len(result["output"]),
"cost_usd": result["cost_usd"],
"iterations": result.get("iterations"),
"duration_seconds": result.get("duration_seconds"),
"tools": result.get("tools_called", [])
}
)
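The formatter above only emits `user_id` and `query_id` when they are set on the log record, which means passing `extra={...}` on every call. A `logging.LoggerAdapter` binds them once per request instead; `ContextAdapter` is an illustrative name, the mechanism is standard-library:

```python
import logging

class ContextAdapter(logging.LoggerAdapter):
    """Bind fields (e.g. user_id, query_id) to every record this adapter emits."""
    def process(self, msg, kwargs):
        extra = kwargs.setdefault("extra", {})
        for key, value in self.extra.items():
            extra.setdefault(key, value)  # caller-supplied extra wins over bound defaults
        return msg, kwargs
```

Usage: `log = ContextAdapter(logging.getLogger('harness'), {"user_id": "u-123", "query_id": qid})`, then plain `log.info(...)` calls carry both fields.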
Tracing Integration (Jaeger, Datadog)
# tracing_integration.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
# Initialize Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
tracer = trace.get_tracer(__name__)
# Instrument FastAPI
FastAPIInstrumentor.instrument_app(app)
RequestsInstrumentor().instrument()
# Use tracing in harness
def harness_with_tracing(harness):
"""Wrap harness with distributed tracing."""
def run_with_tracing(query: str, **kwargs) -> dict:
with tracer.start_as_current_span("harness.query") as span:
span.set_attribute("query", query)
# Optional: if your harness exposes a tool-call hook, wrap each call
# in a child span like this (otherwise only the top-level span is recorded)
def traced_tool_call(tool_name, input_data):
with tracer.start_as_current_span(f"tool.{tool_name}") as tool_span:
tool_span.set_attribute("tool", tool_name)
tool_span.set_attribute("input", json.dumps(input_data))
result = harness.call_tool(tool_name, input_data)
tool_span.set_attribute("output_length", len(str(result)))
return result
result = harness.run(query, **kwargs)
span.set_attribute("cost_usd", result["cost_usd"])
span.set_attribute("iterations", result.get("iterations", 0))
span.set_attribute("status", result.get("status", "success"))
return result
return run_with_tracing
Part 11: Authentication & Authorization
API Key Authentication
# auth_integration.py
from fastapi import HTTPException, Header, Depends
from functools import lru_cache
import hmac
import hashlib
class APIKeyManager:
"""Manage API keys for harness access."""
def __init__(self, db):
self.db = db
def validate_api_key(self, api_key: str) -> str:
"""Validate API key and return user_id."""
record = self.db.api_keys.find_one({"key": api_key})
if not record or not record.get("active"):
raise HTTPException(status_code=401, detail="Invalid API key")
return record["user_id"]
def get_key_limit(self, api_key: str) -> int:
"""Get rate limit for this key."""
record = self.db.api_keys.find_one({"key": api_key})
return record.get("rate_limit_per_minute", 10)
# Dependency for FastAPI
api_key_manager = APIKeyManager(database)
async def verify_api_key(x_api_key: str = Header(...)) -> str:
"""FastAPI dependency to validate API key."""
return api_key_manager.validate_api_key(x_api_key)
@app.post("/agent/query")
async def agent_query(
request: QueryRequest,
user_id: str = Depends(verify_api_key)
) -> dict:
"""Only accessible with valid API key."""
result = harness.run(request.query)
# Log for user
database.executions.insert({
"user_id": user_id,
"query": request.query,
"cost": result["cost_usd"],
"timestamp": datetime.utcnow()
})
return result
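The imports above pull in `hmac` and `hashlib` but the lookup never uses them: it stores raw keys and compares with `==`. Storing only a digest and comparing in constant time closes both gaps. A minimal sketch, with `hash_api_key`/`key_matches` as illustrative helper names:

```python
import hashlib
import hmac

def hash_api_key(api_key: str) -> str:
    """Store only this digest in the database, never the raw key."""
    return hashlib.sha256(api_key.encode()).hexdigest()

def key_matches(presented_key: str, stored_digest: str) -> bool:
    """Constant-time comparison to avoid timing side channels."""
    return hmac.compare_digest(hash_api_key(presented_key), stored_digest)
```

`validate_api_key` would then look up by digest (`self.db.api_keys.find_one({"key": hash_api_key(api_key)})`) rather than by the raw key.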
OAuth 2.0 Integration
# oauth_integration.py
from fastapi_users import FastAPIUsers
from fastapi_users.authentication import JWTAuthentication
from fastapi_users.db import SQLAlchemyUserDatabase
# User model
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
rate_limit = Column(Integer, default=10) # Requests per minute
# OAuth setup (simplified; see the fastapi-users docs for the full model/schema wiring)
oauth2_scheme = FastAPIUsers(
SQLAlchemyUserDatabase(database, User),
[JWTAuthentication(secret="your-secret", lifetime_seconds=3600)],
User,
)
@app.post("/agent/query")
async def agent_query_oauth(
request: QueryRequest,
user: User = Depends(oauth2_scheme.current_user())
) -> dict:
"""OAuth-protected harness endpoint."""
result = harness.run(request.query)
# Log access
database.executions.insert({
"user_id": user.id,
"query": request.query,
"cost": result["cost_usd"]
})
return result
Rate Limiting Per User
# rate_limiting.py
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
# Per-user rate limiting
async def rate_limit_key(user_id: str) -> str:
"""Rate limit by user ID instead of IP."""
return user_id
from fastapi import Request
@app.post("/agent/query")
@limiter.limit("10/minute")  # Default limit
async def agent_query_limited(
request: Request,  # slowapi requires the raw Request in the endpoint signature
query_request: QueryRequest,
user_id: str = Depends(verify_api_key)
):
"""Rate-limited harness endpoint."""
# Check user-specific limit (pass a custom key_func to enforce it dynamically)
limit = api_key_manager.get_key_limit(user_id)
result = harness.run(query_request.query)
return result
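slowapi keys on the remote address by default, and its decorator limits are static. To enforce the per-key limit that `APIKeyManager.get_key_limit` returns, one option is a small per-user token bucket. The sketch below is in-memory (a single worker); a multi-worker deployment would keep the bucket state in Redis instead:

```python
import time

class TokenBucket:
    """Allow `rate` requests per `per` seconds, with bursts up to `rate`."""
    def __init__(self, rate: int, per: float = 60.0):
        self.rate = rate
        self.per = per
        self.tokens = float(rate)
        self.updated = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at the burst size
        self.tokens = min(self.rate, self.tokens + (now - self.updated) * self.rate / self.per)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

Per user: keep a `dict[str, TokenBucket]`, create the bucket with `get_key_limit(user_id)` on first sight, and return HTTP 429 when `allow()` is False.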
Part 12: Complete Integration Example
REST API + Webhook Callback + Slack Bot
# complete_integration.py
"""
Complete integration example:
- FastAPI REST API for harness queries
- Webhook callbacks for async results
- Slack bot using the REST API
- Cost tracking across all channels
"""
from fastapi import FastAPI, HTTPException, Header, Depends, Request
from slack_bolt import App as SlackApp
from slack_bolt.adapter.fastapi import SlackRequestHandler
from sqlalchemy import create_engine, func, Column, String, Text, Float, Integer, Boolean, DateTime
from sqlalchemy.orm import sessionmaker, declarative_base
from my_harness import AgentHarness, AgentConfig
import httpx
import os
import uuid
import asyncio
from datetime import datetime
from typing import Optional
import json
Base = declarative_base()
# ===== Initialize Components =====
app = FastAPI(title="Harness Integration Platform")
# Harness
harness = AgentHarness(config=AgentConfig(
model="claude-3-5-sonnet",
api_key=os.environ["CLAUDE_API_KEY"]
))
# Slack bot
slack_app = SlackApp(
token=os.environ["SLACK_BOT_TOKEN"],
signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)
# Database
db_engine = create_engine(os.environ["DATABASE_URL"])
SessionLocal = sessionmaker(bind=db_engine)
# ===== Database Models =====
class ExecutionRecord(Base):
__tablename__ = "executions"
id = Column(String, primary_key=True)
user_id = Column(String)
channel = Column(String) # "rest_api", "slack", "webhook"
query = Column(Text)
result = Column(Text)
cost_usd = Column(Float)
tokens_used = Column(Integer)
duration_seconds = Column(Float)
created_at = Column(DateTime, default=datetime.utcnow)
class WebhookSubscription(Base):
__tablename__ = "webhook_subscriptions"
id = Column(String, primary_key=True)
user_id = Column(String)
webhook_url = Column(String)
events = Column(String) # JSON list of event types
active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
# ===== REST API Endpoints =====
async def verify_api_key(x_api_key: str = Header(...)) -> str:
"""Verify API key."""
if x_api_key != os.environ.get("API_KEY"):
raise HTTPException(status_code=401, detail="Invalid API key")
return "api_user"
@app.post("/query")
async def rest_query(
query: str,
user_id: str = Depends(verify_api_key),
callback_url: Optional[str] = None
) -> dict:
"""
Submit harness query via REST API.
If callback_url provided, result will be POSTed when ready.
"""
import uuid
execution_id = str(uuid.uuid4())
async def process_query():
"""Process in background."""
try:
# harness.run is synchronous; run it in a thread so it doesn't block the event loop
result = await asyncio.to_thread(harness.run, query)
# Save execution
session = SessionLocal()
execution = ExecutionRecord(
id=execution_id,
user_id=user_id,
channel="rest_api",
query=query,
result=result["output"],
cost_usd=result["cost_usd"],
tokens_used=result["tokens_used"].get("total", 0),
duration_seconds=result.get("duration_seconds", 0)
)
session.add(execution)
session.commit()
# Send webhook if provided
if callback_url:
async with httpx.AsyncClient() as client:
await client.post(
callback_url,
json={
"execution_id": execution_id,
"status": "completed",
"result": result["output"],
"cost_usd": result["cost_usd"]
}
)
except Exception as e:
# Send error webhook
if callback_url:
async with httpx.AsyncClient() as client:
await client.post(
callback_url,
json={
"execution_id": execution_id,
"status": "error",
"error": str(e)
}
)
# Start background task
import asyncio
asyncio.create_task(process_query())
return {
"execution_id": execution_id,
"status": "queued",
"callback_url": callback_url
}
@app.get("/execution/{execution_id}")
async def get_execution(execution_id: str):
"""Get execution result."""
session = SessionLocal()
execution = session.query(ExecutionRecord).filter_by(id=execution_id).first()
session.close()
if not execution:
raise HTTPException(status_code=404, detail="Execution not found")
return {
"id": execution.id,
"result": execution.result,
"cost_usd": execution.cost_usd,
"created_at": execution.created_at.isoformat()
}
# ===== Slack Bot =====
@slack_app.message(".*")
def handle_slack_message(message, say, client):
"""Handle Slack DMs."""
user_id = message["user"]
text = message["text"]
client.reactions_add(
channel=message["channel"],
timestamp=message["ts"],
name="hourglass_flowing_sand"  # reactions.add takes `name`, not `emoji`
)
try:
result = harness.run(text)
# Save execution
session = SessionLocal()
execution = ExecutionRecord(
id=f"slack-{user_id}-{int(datetime.now().timestamp())}",
user_id=user_id,
channel="slack",
query=text,
result=result["output"],
cost_usd=result["cost_usd"],
tokens_used=result["tokens_used"].get("total", 0)
)
session.add(execution)
session.commit()
session.close()
say(f"*Response:*\n{result['output']}\n\n_Cost: ${result['cost_usd']:.4f}_")
except Exception as e:
say(f"Error: {str(e)}")
# Register Slack handler
handler = SlackRequestHandler(slack_app)
@app.post("/slack/events")
async def slack_events(req: Request):
return await handler.handle(req)
# ===== Cost Tracking Across All Channels =====
@app.get("/analytics/cost")
async def cost_analytics():
"""Get cost breakdown by channel."""
session = SessionLocal()
result = session.query(
ExecutionRecord.channel,
func.sum(ExecutionRecord.cost_usd).label("total_cost"),
func.count(ExecutionRecord.id).label("count"),
func.avg(ExecutionRecord.cost_usd).label("avg_cost")
).group_by(ExecutionRecord.channel).all()
session.close()
return {
"by_channel": [
{
"channel": r[0],
"total_cost_usd": float(r[1]),
"request_count": r[2],
"avg_cost_usd": float(r[3])
}
for r in result
]
}
# ===== Main =====
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Summary Table: Which Pattern to Use?
| Integration | Latency | Complexity | Scaling | Use When |
|---|---|---|---|---|
| Library | <100ms | Low | N/A | Single monolithic app |
| REST API | 100-500ms | Medium | Independent | Multiple apps, distributed system |
| Background Job (Celery) | 1-60s | Medium | Via queue | Long-running, batch processing |
| Event-Driven (Kafka) | 10-100ms | High | Excellent | Real-time, streaming architecture |
| GraphQL | 100-500ms | Medium-High | Independent | Complex query patterns, frontend apps |
| WebSocket | <50ms | Medium-High | Per-connection | Real-time interactions, live feedback |
| Slack/Discord/Telegram | 1-5s | Low | Via platform | User-facing chatbots |
| Database Integration | <100ms | Medium | Via DB | Shared state, multi-agent systems |
Related Documentation
- Architecture: See 06_harness_architecture.md for harness design patterns
- Deployment: See 12_deployment_patterns.md for Docker, Kubernetes, CI/CD
- Monitoring: See 09_operations_and_observability.md for logging, metrics, cost tracking
- Security: See 10_security_and_safety.md for authentication, input validation, rate limiting
- Testing: See 11_testing_and_qa.md for testing integrated harnesses
Validation Checklist
How do you know you got this right?
Performance Checks
- Library integration: <100ms latency, no network overhead
- REST API: <500ms p95 latency, 99%+ uptime
- Async jobs: queue latency <1s, processing <60s for typical task
- Event-driven: <100ms event-to-response latency
- WebSocket: <50ms round-trip for interactive feedback
Implementation Checks
- Integration pattern chosen based on latency/complexity requirements
- Error handling implemented: circuit breaker, retry logic, fallback
- Authentication working: correct credentials, tokens, API keys passed
- Request/response formats correct: matching integration partner’s spec
- Monitoring integrated: logging all requests, errors, latency metrics
- Rate limiting respected: not exceeding partner API quotas
- Tested against integration partner: real requests, not mocks
Integration Checks
- Harness components isolated: failure in one doesn’t cascade
- State synchronization working: shared state consistent across integrations
- Request tracing: can follow request from entry to exit
- Cost tracking: know cost of each integration type and request
- Graceful degradation: missing integration doesn’t break harness
Common Failure Modes
- Latency miscalculation: Library chosen for low-latency, turns out needs remote APIs
- Auth credentials hardcoded: Credentials in code; move to env vars/secrets
- No rate limit handling: API rate limits hit unexpectedly; implement backoff
- State inconsistency: Harness updates DB, but REST client reads stale copy
- Error swallowed: Integration fails silently; no logging of failures
- Timeout too short: Network hiccup causes false failure; increase timeout
Sign-Off Criteria
- Integration chosen and documented with reasoning (latency/complexity trade-off)
- Tested end-to-end with real integration partner (not mocked)
- Load tested: verified performance under expected usage
- Error scenarios tested: what happens when integration fails
- Monitoring in place: can see integration performance and failures
- Documentation complete: how to use, configure, troubleshoot
See Also
- Doc 06 (Harness Architecture): Integration patterns extend component 7 (Orchestration)
- Doc 09 (Operations): Monitoring integrated harnesses in production
- Doc 10 (Security & Safety): Authentication, authorization, credential management
Changelog
- April 2026: Created comprehensive integration patterns guide
- 12 integration patterns covering library to event-driven architectures
- REST API, WebSocket, GraphQL implementations with code examples
- Third-party platform integrations (Slack, Discord, Telegram)
- Database and file system integration patterns
- Monitoring, authentication, and complete end-to-end example
- Decision matrix for choosing integration pattern