Advanced Patterns
43 sophisticated patterns — tool composition, state machines, streaming, adaptive learning, multi-model orchestration, confidence scoring, and caching.
Phase 3: Optional but valuable patterns for scaling and complex systems. These patterns build on the foundational concepts in earlier docs and enable sophisticated use cases like multi-step workflows, adaptive learning, and high-performance systems.
1. Tool Composition & Pipelines
Concept
Tool composition chains the output of one tool into the input of another, enabling complex workflows without explicit multi-turn prompting. Pipelines enable reusable, testable, composable tool sequences. This is essential for multi-step processes like research, data processing, and decision workflows.
When to Use
- Multi-step data transformations (extract → parse → validate → store)
- ETL workflows where each stage is a distinct tool
- Error recovery: fallback chains when a tool fails
- Conditional branching based on tool results
- Parallel execution when steps are independent
- Complex tool chaining (5+ steps) with intermediate validation
- Reproducible workflows that need audit trails
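The pipeline classes below all assume a `Tool` object with a `name` and an async `invoke` method; the source never defines it, so here is one minimal sketch of that assumed interface (the `double` example is hypothetical):

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class Tool:
    """Minimal tool wrapper assumed by the pipeline examples below."""
    name: str
    func: Callable[[Any], Any]
    description: str = ""

    async def invoke(self, data: Any) -> Any:
        # Await coroutine functions; call plain functions directly
        result = self.func(data)
        if inspect.isawaitable(result):
            result = await result
        return result

# Usage
double = Tool(name="double", func=lambda x: x * 2)
print(asyncio.run(double.invoke(21)))  # 42
```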
Patterns
Sequential Pipeline: Tool A → Tool B → Tool C
import asyncio
from datetime import datetime
from typing import Any

class ToolPipeline:
    def __init__(self, tools: list[Tool]):
        self.tools = tools
        self.execution_log = []

    async def execute(self, initial_input: Any) -> Any:
        """Execute tools in sequence, passing output to next input."""
        result = initial_input
        for tool in self.tools:
            tool_input = result  # Capture this tool's actual input for the log
            result = await tool.invoke(tool_input)
            self.execution_log.append({
                "tool": tool.name,
                "input_size": len(str(tool_input)),
                "output_size": len(str(result)),
                "timestamp": datetime.now().isoformat()
            })
        return result
Conditional Pipeline: Branch based on tool output
class ConditionalPipeline:
    def __init__(self, condition_tool: Tool, true_pipeline: ToolPipeline,
                 false_pipeline: ToolPipeline):
        self.condition_tool = condition_tool
        self.true_pipeline = true_pipeline
        self.false_pipeline = false_pipeline

    async def execute(self, data: Any) -> Any:
        """Execute condition tool; use result to branch."""
        condition_result = await self.condition_tool.invoke(data)
        # Assume condition_tool returns a dict with a boolean "passes_condition" key
        if condition_result.get("passes_condition"):
            return await self.true_pipeline.execute(data)
        else:
            return await self.false_pipeline.execute(data)
Parallel Pipeline: Execute independent tools concurrently
class ParallelPipeline:
    def __init__(self, tools: list[Tool]):
        self.tools = tools

    async def execute(self, data: Any) -> dict[str, Any]:
        """Execute all tools in parallel, return dict of results."""
        tasks = [tool.invoke(data) for tool in self.tools]
        results = await asyncio.gather(*tasks)
        return {tool.name: result for tool, result in zip(self.tools, results)}
Fallback Chain: Try tools in order until one succeeds
class FallbackPipeline:
    def __init__(self, tools: list[Tool]):
        self.tools = tools

    async def execute(self, data: Any) -> Any:
        """Try each tool; return first success, raise if all fail."""
        last_error = None
        for tool in self.tools:
            try:
                return await tool.invoke(data)
            except Exception as e:
                last_error = e
                continue
        raise RuntimeError(f"All fallback tools failed: {last_error}")
Complex Tool Chaining: 5-Step Research Agent
class ResearchAgentPipeline:
    """Multi-step research workflow: query -> search -> retrieve -> analyze -> summarize."""

    def __init__(self, client, search_tool, retriever_tool, analyzer_tool):
        self.client = client
        self.search_tool = search_tool
        self.retriever_tool = retriever_tool
        self.analyzer_tool = analyzer_tool

    async def research(self, topic: str) -> dict:
        """Execute 5-step research pipeline."""
        # Step 1: Expand query with LLM
        expanded_query = await self._expand_query(topic)
        # Step 2: Search for sources
        sources = await self.search_tool.invoke(expanded_query)
        # Step 3: Retrieve and deduplicate documents
        documents = await self._retrieve_and_deduplicate(sources)
        # Step 4: Analyze each document
        analyses = await self._analyze_documents(documents)
        # Step 5: Synthesize findings
        summary = await self._synthesize(analyses)
        return {
            "topic": topic,
            "expanded_query": expanded_query,
            "sources_found": len(sources),
            "documents_analyzed": len(documents),
            "key_findings": summary
        }

    async def _expand_query(self, topic: str) -> str:
        """Use LLM to expand user query with related terms."""
        response = self.client.messages.create(
            model="claude-sonnet-4",
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": f"Expand this research query with related synonyms and keywords:\n{topic}\nReturn as space-separated terms."
            }]
        )
        return response.content[0].text

    async def _retrieve_and_deduplicate(self, sources: list) -> list:
        """Retrieve documents and remove duplicates."""
        documents = []
        seen_urls = set()
        for source in sources:
            if source["url"] not in seen_urls:
                doc = await self.retriever_tool.invoke(source["url"])
                documents.append(doc)
                seen_urls.add(source["url"])
        return documents

    async def _analyze_documents(self, documents: list) -> list:
        """Analyze each document for relevance and key points."""
        analyses = []
        for doc in documents:
            analysis = await self.analyzer_tool.invoke({
                "document": doc,
                "task": "extract_key_points"
            })
            analyses.append(analysis)
        return analyses

    async def _synthesize(self, analyses: list) -> str:
        """Synthesize findings into summary."""
        synthesis_prompt = "Synthesize these research findings into key insights:\n\n"
        for i, analysis in enumerate(analyses, 1):
            synthesis_prompt += f"{i}. {analysis}\n"
        response = self.client.messages.create(
            model="claude-sonnet-4",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": synthesis_prompt
            }]
        )
        return response.content[0].text
Conditional Tool Selection
class IntelligentToolSelector:
    """Choose tools dynamically based on input."""

    def __init__(self, client, tools: dict[str, Tool]):
        self.client = client
        self.tools = tools  # "extract_pdf", "parse_html", "parse_json", etc.

    async def select_and_execute(self, data: Any) -> Any:
        """Analyze input and select appropriate tool."""
        # Step 1: Classify input
        input_type = await self._classify_input(data)
        # Step 2: Select tools based on classification
        selected_tools = self._get_tools_for_type(input_type)
        # Step 3: Execute selected pipeline
        return await ToolPipeline(selected_tools).execute(data)

    async def _classify_input(self, data: Any) -> str:
        """Use LLM to classify input type."""
        response = self.client.messages.create(
            model="claude-haiku-4",  # Fast classifier
            max_tokens=50,
            messages=[{
                "role": "user",
                "content": f"Classify this input as one of: pdf, html, json, xml, plain_text, image\nInput: {str(data)[:200]}\nReply with just the type."
            }]
        )
        return response.content[0].text.strip().lower()

    def _get_tools_for_type(self, input_type: str) -> list[Tool]:
        """Return appropriate tools for input type."""
        tool_chains = {
            "pdf": [self.tools["extract_pdf"], self.tools["parse"], self.tools["validate"]],
            "html": [self.tools["parse_html"], self.tools["extract_content"], self.tools["validate"]],
            "json": [self.tools["parse_json"], self.tools["validate"], self.tools["enrich"]],
            "xml": [self.tools["parse_xml"], self.tools["transform"], self.tools["validate"]],
            "plain_text": [self.tools["tokenize"], self.tools["extract_entities"], self.tools["enrich"]],
        }
        return tool_chains.get(input_type, [self.tools["parse"]])
Parallel Tool Execution with Result Aggregation
import json

class ParallelAnalysisAgent:
    """Execute multiple analysis tools in parallel, aggregate results."""

    def __init__(self, client):
        self.client = client
        self.sentiment_tool = Tool(name="sentiment", func=self._analyze_sentiment)
        self.entity_tool = Tool(name="entities", func=self._extract_entities)
        self.keyword_tool = Tool(name="keywords", func=self._extract_keywords)
        self.summary_tool = Tool(name="summary", func=self._generate_summary)

    async def analyze_text(self, text: str) -> dict:
        """Run multiple analyses in parallel."""
        # Execute all tools concurrently
        results = await asyncio.gather(
            self.sentiment_tool.invoke(text),
            self.entity_tool.invoke(text),
            self.keyword_tool.invoke(text),
            self.summary_tool.invoke(text)
        )
        # Aggregate
        return self._aggregate_results(results)

    async def _analyze_sentiment(self, text: str) -> dict:
        response = self.client.messages.create(
            model="claude-haiku-4",
            max_tokens=100,
            messages=[{
                "role": "user",
                "content": f"Analyze sentiment. Reply JSON: {{'sentiment': 'positive/negative/neutral', 'score': 0.0-1.0}}\nText: {text}"
            }]
        )
        return json.loads(response.content[0].text)

    async def _extract_entities(self, text: str) -> dict:
        response = self.client.messages.create(
            model="claude-haiku-4",
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": f"Extract entities (people, places, orgs). Reply JSON: {{'entities': [...]}}\nText: {text}"
            }]
        )
        return json.loads(response.content[0].text)

    async def _extract_keywords(self, text: str) -> dict:
        response = self.client.messages.create(
            model="claude-haiku-4",
            max_tokens=100,
            messages=[{
                "role": "user",
                "content": f"Extract top keywords. Reply JSON: {{'keywords': [...]}}\nText: {text}"
            }]
        )
        return json.loads(response.content[0].text)

    async def _generate_summary(self, text: str) -> dict:
        response = self.client.messages.create(
            model="claude-sonnet-4",
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": f"Summarize in 2-3 sentences:\n{text}"
            }]
        )
        return {"summary": response.content[0].text}

    def _aggregate_results(self, results: list) -> dict:
        """Combine tool outputs into unified analysis."""
        return {
            "sentiment": results[0],
            "entities": results[1],
            "keywords": results[2],
            "summary": results[3],
            "combined_confidence": self._calculate_confidence(results)
        }

    def _calculate_confidence(self, results: list) -> float:
        """Estimate confidence based on agreement across tools."""
        # Simple heuristic: if multiple tools found content, higher confidence
        scores = [
            len(results[1].get("entities", [])) / 10.0,   # Entity count
            len(results[2].get("keywords", [])) / 10.0,   # Keyword count
            abs(results[0].get("score", 0.5) - 0.5) * 2   # Sentiment clarity
        ]
        return min(1.0, sum(scores) / len(scores))
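The aggregation heuristic above is easy to sanity-check in isolation. Here is a standalone version of `_calculate_confidence` with illustrative (made-up) counts and scores:

```python
def calculate_confidence(entity_count: int, keyword_count: int,
                         sentiment_score: float) -> float:
    """Agreement heuristic: more extracted content and a clearer sentiment
    (further from the 0.5 midpoint) yield higher confidence, capped at 1.0."""
    scores = [
        entity_count / 10.0,             # entity count, 10+ entities saturates
        keyword_count / 10.0,            # keyword count
        abs(sentiment_score - 0.5) * 2,  # sentiment clarity: 0 at 0.5, 1 at 0 or 1
    ]
    return min(1.0, sum(scores) / len(scores))

print(calculate_confidence(5, 8, 0.9))   # (0.5 + 0.8 + 0.8) / 3, roughly 0.7
print(calculate_confidence(0, 0, 0.5))   # nothing found, neutral sentiment: 0.0
```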
Example: Document Processing Pipeline
# Define individual tools
extract_text_tool = Tool(
    name="extract_text",
    func=extract_text_from_pdf,
    description="Extract raw text from PDF"
)
parse_json_tool = Tool(
    name="parse_json",
    func=parse_json_from_text,
    description="Parse JSON blocks from text"
)
validate_schema_tool = Tool(
    name="validate_schema",
    func=validate_against_schema,
    description="Validate parsed data against JSON schema"
)
store_tool = Tool(
    name="store",
    func=store_in_database,
    description="Store validated data in database"
)

# Compose into pipeline
doc_pipeline = ToolPipeline([
    extract_text_tool,
    parse_json_tool,
    validate_schema_tool,
    store_tool
])

# Execute
result = await doc_pipeline.execute(pdf_path)
Error Handling in Pipelines
class RobustPipeline:
    def __init__(self, tools: list[Tool], error_handlers: dict = None,
                 max_retries: int = 2, backoff_factor: float = 2.0):
        self.tools = tools
        self.error_handlers = error_handlers or {}
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    async def execute(self, data: Any) -> Any:
        """Execute with per-tool error handling and retry."""
        result = data
        for tool in self.tools:
            result = await self._execute_with_retry(tool, result)
        return result

    async def _execute_with_retry(self, tool: Tool, data: Any) -> Any:
        """Execute tool with exponential backoff retry."""
        for attempt in range(self.max_retries):
            try:
                return await tool.invoke(data)
            except Exception as e:
                # A registered handler short-circuits retries for that tool
                if tool.name in self.error_handlers:
                    handler = self.error_handlers[tool.name]
                    return await handler(e, data)
                if attempt < self.max_retries - 1:
                    wait_time = self.backoff_factor ** attempt
                    await asyncio.sleep(wait_time)
                else:
                    raise

# Usage
async def handle_parse_error(error: Exception, data: Any) -> Any:
    """Fallback: return data as raw string."""
    return {"raw": data, "error": str(error), "fallback": True}

pipeline = RobustPipeline(
    [extract_tool, parse_tool, validate_tool],
    error_handlers={"parse_json": handle_parse_error},
    max_retries=3
)
2. State Machines for Complex Workflows
Concept
State machines model complex workflows with discrete states, explicit transitions, and entry/exit actions. They’re more structured than ReAct loops and ideal for multi-step processes with strict sequencing (approval workflows, onboarding, order fulfillment).
When to Use
- Multi-step workflows with defined states (pending → approved → executing → complete)
- Workflows that must not skip steps or allow invalid transitions
- State persistence across sessions (e.g., workflow resumes where it left off)
- Workflows with entry/exit side effects (e.g., sending notifications on state change)
- Reduced cognitive load vs. ReAct: state and valid transitions are explicit
Implementation
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable

class WorkflowState(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    EXECUTING = "executing"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class StateTransition:
    from_state: WorkflowState
    to_state: WorkflowState
    condition: Callable[[Any], bool] | None = None  # Optional guard condition
    action: Callable[[Any], Any] | None = None      # Action on transition

class StateMachine:
    def __init__(self, initial_state: WorkflowState,
                 transitions: list[StateTransition]):
        self.current_state = initial_state
        self.transitions = transitions
        self.state_memory = {}  # Persistent state data
        self.history = [initial_state]

    async def transition(self, trigger: str, context: Any) -> bool:
        """Attempt to transition based on trigger and context.

        Note: `trigger` is informational here; selection is driven by the
        current state and each transition's guard condition.
        """
        valid_transitions = [
            t for t in self.transitions
            if t.from_state == self.current_state
        ]
        for transition in valid_transitions:
            # Check condition if present
            if transition.condition and not transition.condition(context):
                continue
            # Execute action and transition
            if transition.action:
                await transition.action(context)
            self.current_state = transition.to_state
            self.history.append(self.current_state)
            return True
        return False  # No valid transition

    def get_valid_transitions(self) -> list[str]:
        """Return list of valid next states from current state."""
        return [
            t.to_state.value for t in self.transitions
            if t.from_state == self.current_state
        ]

    def save_state(self) -> dict:
        """Serialize for persistence."""
        return {
            "current_state": self.current_state.value,
            "history": [s.value for s in self.history],
            "state_memory": self.state_memory
        }

    @classmethod
    def load_state(cls, data: dict, transitions: list[StateTransition]) -> "StateMachine":
        """Deserialize from persistence."""
        sm = cls(WorkflowState(data["current_state"]), transitions)
        sm.history = [WorkflowState(s) for s in data["history"]]
        sm.state_memory = data["state_memory"]
        return sm
Example: Multi-Step Approval Workflow
async def send_approval_request(context: dict) -> None:
    """Send email to approver."""
    recipient = context.get("approver_email")
    await email_service.send(recipient, f"Approval needed for {context['request_id']}")

async def execute_request(context: dict) -> None:
    """Execute the approved request."""
    request_id = context["request_id"]
    await execute_service.run(request_id)

async def notify_completion(context: dict) -> None:
    """Notify stakeholders of completion."""
    await email_service.send_all(context["stakeholders"],
                                 f"Request {context['request_id']} completed")

async def log_failure(context: dict) -> None:
    """Log failure for audit."""
    context["failure_reason"] = "Approval denied or timed out"

# Define transitions with entry/exit actions
transitions = [
    StateTransition(
        from_state=WorkflowState.PENDING,
        to_state=WorkflowState.APPROVED,
        condition=lambda ctx: ctx.get("is_approved") == True,
        action=send_approval_request
    ),
    StateTransition(
        from_state=WorkflowState.APPROVED,
        to_state=WorkflowState.EXECUTING,
        action=execute_request
    ),
    StateTransition(
        from_state=WorkflowState.EXECUTING,
        to_state=WorkflowState.COMPLETE,
        action=notify_completion
    ),
    StateTransition(
        from_state=WorkflowState.PENDING,
        to_state=WorkflowState.FAILED,
        condition=lambda ctx: ctx.get("is_approved") == False,
        action=log_failure
    ),
    StateTransition(
        from_state=WorkflowState.APPROVED,
        to_state=WorkflowState.FAILED,
        condition=lambda ctx: ctx.get("error") is not None,
        action=log_failure
    ),
]

# Create and use
approval_workflow = StateMachine(WorkflowState.PENDING, transitions)

# Check what's next
assert approval_workflow.get_valid_transitions() == ["approved", "failed"]

# Transition with context
context = {"request_id": "REQ-123", "approver_email": "[email protected]",
           "is_approved": True}
success = await approval_workflow.transition("approve", context)

# Persist
saved = approval_workflow.save_state()
# ... later, restore:
restored = StateMachine.load_state(saved, transitions)
3. Streaming Responses & Progressive Reasoning
Concept
Instead of waiting for the full LLM response, stream tokens as they arrive to give users real-time feedback. For RAG, retrieve documents incrementally while generating, rather than retrieving everything upfront and creating a bottleneck.
When to Use
- Long-form generation (articles, code, analysis) where users want to see progress
- Interactive debugging where user can interrupt mid-generation
- Cost-sensitive scenarios where you can stop early if answer is good enough
- RAG with many potential sources: retrieve and rank as you generate
- Progressive reasoning where intermediate steps feed back into reasoning
Token Streaming
import asyncio
from typing import AsyncGenerator

import anthropic

async def stream_response(prompt: str) -> AsyncGenerator[str, None]:
    """Stream LLM response token by token."""
    client = anthropic.AsyncAnthropic()  # Async client so the event loop isn't blocked
    async with client.messages.stream(
        model="claude-sonnet-4",
        max_tokens=2000,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            yield text

# Usage: display as tokens arrive
async for token in stream_response("Explain quantum computing"):
    print(token, end="", flush=True)
Streaming with Tool Use
async def stream_with_tools(prompt: str, tools: list[dict]) -> None:
    """Stream response while handling tools."""
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4",
        max_tokens=2000,
        tools=tools,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for event in stream:
            if event.type == "content_block_start":
                if event.content_block.type == "tool_use":
                    print(f"\n[Calling {event.content_block.name}...]")
            elif event.type == "content_block_delta":
                if hasattr(event.delta, "text"):
                    print(event.delta.text, end="", flush=True)
Progressive Retrieval in RAG
async def progressive_rag(query: str, retriever) -> AsyncGenerator[str, None]:
    """RAG that retrieves documents progressively as it generates."""
    client = anthropic.AsyncAnthropic()  # Async client so the retrieval task can run concurrently
    # Start retrieval in background
    retrieval_task = asyncio.create_task(
        retriever.retrieve_top_k_async(query, k=10)
    )
    # Build system prompt with retrieved docs as they arrive
    system_prompt = "You are a helpful assistant with access to documents."
    # Stream initial response (may reference unconfirmed facts)
    async with client.messages.stream(
        model="claude-sonnet-4",
        max_tokens=2000,
        system=system_prompt,
        messages=[{"role": "user", "content": query}]
    ) as stream:
        buffer = ""
        async for text in stream.text_stream:
            buffer += text
            yield text
            # Check if retrieval is complete
            if retrieval_task.done():
                # Could refine answer with full context
                pass

# Usage
async for chunk in progressive_rag("What's new in AI?", retriever):
    print(chunk, end="", flush=True)
Streamed Tool Results as Context
async def stream_with_tool_feedback(query: str, tool_executor) -> None:
    """Stream response where tool results feed back as context."""
    client = anthropic.AsyncAnthropic()
    messages = [{"role": "user", "content": query}]
    while True:
        async with client.messages.stream(
            model="claude-sonnet-4",
            max_tokens=1000,
            tools=tool_executor.get_tool_definitions(),
            messages=messages
        ) as stream:
            response = await stream.get_final_message()
        # Check for tool use
        tool_uses = [b for b in response.content if b.type == "tool_use"]
        if not tool_uses:
            # No more tools; print final response
            for block in response.content:
                if hasattr(block, "text"):
                    print(block.text)
            break
        # Append the assistant turn once, then one result block per tool call
        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        for tool_use in tool_uses:
            result = await tool_executor.execute(tool_use.name, tool_use.input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tool_use.id,
                "content": str(result)
            })
            print(f"\n[{tool_use.name} result: {result}]\n")
        messages.append({"role": "user", "content": tool_results})
Real-Time Feedback Loop
from typing import Callable

class ProgressiveReasoner:
    """Streams reasoning and allows user interruption."""

    def __init__(self, client, model: str = "claude-sonnet-4"):
        self.client = client
        self.model = model
        self.thinking = ""
        self.answer = ""

    async def think_and_answer(self, question: str,
                               on_thinking: Callable[[str], None] = None,
                               on_answer: Callable[[str], None] = None) -> str:
        """Stream extended thinking and answer."""
        with self.client.messages.stream(
            model=self.model,
            max_tokens=4000,
            thinking={
                "type": "enabled",
                "budget_tokens": 3000
            },
            messages=[{"role": "user", "content": question}]
        ) as stream:
            for event in stream:
                if event.type == "content_block_start":
                    if event.content_block.type == "thinking":
                        print("[Thinking...]")
                elif event.type == "content_block_delta":
                    if event.delta.type == "thinking_delta":
                        chunk = event.delta.thinking
                        self.thinking += chunk
                        if on_thinking:
                            on_thinking(chunk)
                    elif event.delta.type == "text_delta":
                        chunk = event.delta.text
                        self.answer += chunk
                        if on_answer:
                            on_answer(chunk)
                        print(chunk, end="", flush=True)
        return self.answer
4. Adaptive Learning & User Feedback Integration
Concept
Agents improve over time by learning from interaction history. Extract successful patterns, update few-shot examples, and detect when domain drift requires retraining. Adaptive learning means the agent actively uses feedback to refine its behavior, rather than just passively logging interactions.
When to Use
- Long-lived agents (chatbots, co-pilots) that interact with same user repeatedly
- Domain-specific tasks where agent learns user preferences and style
- Cost optimization: agent becomes more efficient over time
- Quality improvement: success rate increases with experience
- Drift detection: retraining signals when performance degrades
- User-driven refinement: agent adapts to individual user workflows
- Active learning: agent requests clarification on unclear feedback
Learning from User Feedback: Comprehensive Framework
import json
from collections import deque
from datetime import datetime

import anthropic

class AdaptiveAgentWithFeedback:
    """Learn directly from user corrections and preferences."""

    def __init__(self, base_model: str = "claude-sonnet-4"):
        self.client = anthropic.Anthropic()
        self.model = base_model
        self.feedback_history: list[dict] = []
        self.learned_rules: list[str] = []
        self.style_preferences: dict = {}
        self.failure_patterns: list[dict] = []
        self.success_rate_window = deque(maxlen=50)

    async def interact_with_learning(self, user_message: str,
                                     system_context: str = "") -> str:
        """Interact and prepare to learn from feedback."""
        # Build system prompt with learned rules
        enriched_system = system_context
        if self.learned_rules:
            enriched_system += "\n\nLearned rules from user feedback:\n"
            for rule in self.learned_rules[-5:]:  # Last 5 rules
                enriched_system += f"- {rule}\n"
        if self.style_preferences:
            enriched_system += "\nUser preferences:\n"
            for pref, value in list(self.style_preferences.items())[-5:]:
                enriched_system += f"- {pref}: {value}\n"
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1000,
            system=enriched_system,
            messages=[{"role": "user", "content": user_message}]
        )
        answer = response.content[0].text
        # Store interaction for learning
        interaction = {
            "user_input": user_message,
            "assistant_output": answer,
            "timestamp": datetime.now().isoformat(),
            "user_feedback": None,
            "explicit_correction": None
        }
        self.feedback_history.append(interaction)
        return answer

    async def learn_from_correction(self, feedback: str, correction: str = None) -> None:
        """Learn when user provides explicit correction."""
        if not self.feedback_history:
            return
        last_interaction = self.feedback_history[-1]
        last_interaction["user_feedback"] = feedback
        if correction:
            last_interaction["explicit_correction"] = correction
            # Extract what went wrong
            await self._learn_from_error(
                last_interaction["user_input"],
                last_interaction["assistant_output"],
                correction,
                feedback
            )
        # Extract general preferences
        prefs = await self._extract_preferences(feedback)
        self.style_preferences.update(prefs)

    async def learn_from_error(self, error_description: str, correct_behavior: str) -> None:
        """Learn from explicit error description."""
        failure_pattern = {
            "error": error_description,
            "correct": correct_behavior,
            "learned_at": datetime.now().isoformat(),
            "applications": 0
        }
        self.failure_patterns.append(failure_pattern)
        # Extract rule to prevent future errors
        rule = await self._extract_preventive_rule(error_description, correct_behavior)
        if rule:
            self.learned_rules.append(rule)

    async def learn_from_success(self, positive_feedback: str) -> None:
        """Reinforce successful patterns."""
        if not self.feedback_history:
            return
        last_interaction = self.feedback_history[-1]
        last_interaction["success"] = True
        self.success_rate_window.append(True)
        # Extract what made this successful
        success_pattern = await self._extract_success_pattern(
            last_interaction["user_input"],
            last_interaction["assistant_output"],
            positive_feedback
        )
        if success_pattern:
            self.learned_rules.append(f"SUCCESS: {success_pattern}")

    async def _learn_from_error(self, question: str, wrong_answer: str,
                                correct: str, feedback: str) -> None:
        """Extract learning from correction."""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": f"""Analyze this correction and extract the mistake:
User question: {question}
Wrong answer: {wrong_answer}
Correct answer: {correct}
Feedback: {feedback}
What was the core mistake? Reply in one sentence."""
            }]
        )
        analysis = response.content[0].text
        self.failure_patterns.append({
            "question": question,
            "mistake": analysis,
            "learned_at": datetime.now().isoformat(),
            "times_corrected": 1
        })

    async def _extract_preventive_rule(self, error: str, correct: str) -> str:
        """Extract a rule to prevent this error."""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=100,
            messages=[{
                "role": "user",
                "content": f"""Create a brief rule to prevent this error:
Error: {error}
Correct: {correct}
Reply with a rule in form: 'When X, always Y'"""
            }]
        )
        return response.content[0].text

    async def _extract_success_pattern(self, question: str, answer: str,
                                       feedback: str) -> str:
        """Extract pattern that made response successful."""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=100,
            messages=[{
                "role": "user",
                "content": f"""Extract what made this response successful:
Question: {question}
Answer: {answer}
Feedback: {feedback}
Reply with pattern in form: 'When X, use approach Y'"""
            }]
        )
        return response.content[0].text

    async def _extract_preferences(self, feedback: str) -> dict:
        """Extract user preferences from feedback."""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=100,
            messages=[{
                "role": "user",
                "content": f"""Extract user preferences from this feedback:
{feedback}
Reply as JSON: {{'preference_key': 'value'}} or {{}}"""
            }]
        )
        try:
            return json.loads(response.content[0].text)
        except json.JSONDecodeError:
            return {}

    def get_learning_summary(self) -> dict:
        """Report on agent's learning progress."""
        total = len(self.feedback_history)
        successes = sum(1 for f in self.feedback_history if f.get("success"))
        return {
            "total_interactions": total,
            "success_rate": successes / total if total > 0 else 0,
            "rules_learned": len(self.learned_rules),
            "failure_patterns_discovered": len(self.failure_patterns),
            "preferences_learned": len(self.style_preferences),
            "recent_success_rate": sum(self.success_rate_window) / len(self.success_rate_window) if self.success_rate_window else 0
        }
Active Learning: Requesting Clarification
class ActiveLearningAgent:
    """Ask for clarification when uncertain."""

    def __init__(self, client, confidence_threshold: float = 0.6):
        self.client = client
        self.threshold = confidence_threshold
        self.clarifications_requested = []

    async def query_with_clarification(self, user_message: str) -> dict:
        """Generate response but request clarification if uncertain."""
        # Generate response
        response_text = await self._generate_response(user_message)
        # Estimate confidence
        confidence = await self._estimate_confidence(user_message, response_text)
        if confidence < self.threshold:
            # Request clarification
            clarification_q = await self._generate_clarification(user_message)
            self.clarifications_requested.append(clarification_q)
            return {
                "response": response_text,
                "confidence": confidence,
                "needs_clarification": True,
                "clarification_question": clarification_q
            }
        return {
            "response": response_text,
            "confidence": confidence,
            "needs_clarification": False
        }

    async def _generate_response(self, message: str) -> str:
        response = self.client.messages.create(
            model="claude-sonnet-4",
            max_tokens=500,
            messages=[{"role": "user", "content": message}]
        )
        return response.content[0].text

    async def _estimate_confidence(self, message: str, response: str) -> float:
        """Estimate confidence in response."""
        response_obj = self.client.messages.create(
            model="claude-haiku-4",
            max_tokens=50,
            messages=[{
                "role": "user",
                "content": f"""Rate confidence (0-1) in this response.
User: {message}
Response: {response}
Reply with just a float 0-1"""
            }]
        )
        try:
            return float(response_obj.content[0].text.strip())
        except ValueError:
            return 0.5

    async def _generate_clarification(self, message: str) -> str:
        """Ask clarifying question if uncertain."""
        response = self.client.messages.create(
            model="claude-haiku-4",
            max_tokens=100,
            messages=[{
                "role": "user",
                "content": f"""Generate one clarifying question to better understand this request:
{message}
Ask a specific, actionable question."""
            }]
        )
        return response.content[0].text
Memory Update Pattern
class AdaptiveAgent:
"""Agent that learns from interactions."""
def __init__(self, base_model: str = "claude-sonnet-4"):
self.model = base_model
self.interaction_history: list[dict] = []
self.successful_patterns: list[str] = []
self.user_preferences: dict = {}
self.performance_metrics: dict = {
"total_interactions": 0,
"successful": 0,
"failed": 0,
}
async def interact(self, user_message: str,
feedback: str = None) -> str:
"""Interact and optionally learn from feedback."""
# Call LLM with historical context
system_prompt = self._build_system_prompt()
response = self.client.messages.create(
model=self.model,
max_tokens=1000,
system=system_prompt,
messages=[
{"role": "user", "content": user_message}
]
)
answer = response.content[0].text
# Log interaction
self.interaction_history.append({
"user": user_message,
"assistant": answer,
"feedback": feedback,
"timestamp": datetime.now().isoformat()
})
# Update on feedback
if feedback:
await self._learn_from_feedback(user_message, answer, feedback)
return answer
def _build_system_prompt(self) -> str:
"""Build system prompt with learned patterns."""
base = "You are a helpful assistant."
if self.successful_patterns:
base += "\n\nYou have learned these successful approaches:\n"
for pattern in self.successful_patterns[-5:]: # Last 5
base += f"- {pattern}\n"
if self.user_preferences:
base += "\n\nUser preferences:\n"
for pref, value in self.user_preferences.items():
base += f"- {pref}: {value}\n"
return base
async def _learn_from_feedback(self, question: str, answer: str,
feedback: str) -> None:
"""Extract patterns from feedback."""
if "good" in feedback.lower() or feedback.startswith("+"):
self.performance_metrics["successful"] += 1
# Extract what made this work
summary = await self._summarize_success(question, answer, feedback)
self.successful_patterns.append(summary)
else:
self.performance_metrics["failed"] += 1
self.performance_metrics["total_interactions"] += 1
# Extract preference
preference = await self._extract_preference(question, feedback)
if preference:
self.user_preferences.update(preference)
async def _summarize_success(self, question: str, answer: str,
feedback: str) -> str:
"""Summarize why this interaction succeeded."""
response = self.client.messages.create(
model=self.model,
max_tokens=100,
messages=[{
"role": "user",
"content": f"Summarize in one sentence what made this interaction successful:\n"
f"Q: {question}\nA: {answer}\nFeedback: {feedback}"
}]
)
return response.content[0].text
async def _extract_preference(self, question: str, feedback: str) -> dict:
"""Extract user preference from feedback."""
response = self.client.messages.create(
model=self.model,
max_tokens=100,
messages=[{
"role": "user",
"content": f"Extract any stated user preferences from this feedback:\n"
f"Question: {question}\nFeedback: {feedback}\n"
f"Reply with JSON: {{'preference_name': 'value'}} or {{}}"
}]
)
try:
return json.loads(response.content[0].text)
except json.JSONDecodeError:
return {}
def get_learning_report(self) -> dict:
"""Report on agent's learning."""
total = self.performance_metrics["total_interactions"]
if total == 0:
return {"status": "no interactions yet"}
success_rate = self.performance_metrics["successful"] / total
return {
"total_interactions": total,
"success_rate": success_rate,
"patterns_learned": len(self.successful_patterns),
"preferences_discovered": self.user_preferences,
"last_5_patterns": self.successful_patterns[-5:]
}
Drift Detection
class DriftDetector:
"""Monitor for distribution shift (when to retrain)."""
def __init__(self, window_size: int = 50, threshold: float = 0.15):
self.window_size = window_size
self.threshold = threshold # If success rate drops 15%, retrain
self.recent_results: list[bool] = []
self.baseline_success_rate: float | None = None
def record_result(self, success: bool) -> bool:
"""Record result, return True if drift detected."""
self.recent_results.append(success)
# Keep window size
if len(self.recent_results) > self.window_size:
self.recent_results.pop(0)
# Establish baseline on first run
if self.baseline_success_rate is None:
if len(self.recent_results) == self.window_size:
self.baseline_success_rate = sum(self.recent_results) / len(self.recent_results)
# Check for drift
if self.baseline_success_rate is not None and len(self.recent_results) >= 10:
current_rate = sum(self.recent_results) / len(self.recent_results)
drift = self.baseline_success_rate - current_rate
if drift > self.threshold:
return True # Retrain signal
return False
def reset_baseline(self):
"""Reset after retraining."""
self.baseline_success_rate = None
self.recent_results = []
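The windowed check above is easy to sanity-test without any model in the loop. A compact, self-contained sketch of the same idea (the streams below are made-up data):

```python
# Minimal sketch of the windowed drift check: establish a baseline success
# rate, then flag when the rolling rate drops more than `threshold` below it.
def check_drift(results, window_size=20, threshold=0.15):
    """Return True if the last `window_size` results drifted below baseline."""
    if len(results) < 2 * window_size:
        return False  # not enough data for a baseline plus a current window
    baseline = sum(results[:window_size]) / window_size
    current = sum(results[-window_size:]) / window_size
    return (baseline - current) > threshold

# Healthy stream: ~90% success throughout
healthy = [True] * 18 + [False] * 2 + [True] * 18 + [False] * 2
# Degraded stream: 90% baseline, then 50% success
degraded = [True] * 18 + [False] * 2 + [True, False] * 10

print(check_drift(healthy))   # False: no drop from baseline
print(check_drift(degraded))  # True: 0.9 - 0.5 = 0.4 > 0.15
```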
Few-Shot Accumulation
class FewShotMemory:
"""Accumulate high-quality examples for in-context learning."""
def __init__(self, max_examples: int = 10):
self.examples: list[dict] = []
self.max_examples = max_examples
def add_example(self, input_text: str, output_text: str,
quality_score: float = 1.0) -> None:
"""Add example with quality weighting."""
self.examples.append({
"input": input_text,
"output": output_text,
"quality": quality_score,
"added": datetime.now().isoformat()
})
# Keep high-quality examples, remove low-quality old ones
self.examples.sort(key=lambda x: x["quality"], reverse=True)
if len(self.examples) > self.max_examples:
self.examples = self.examples[:self.max_examples]
def get_examples_for_prompt(self) -> str:
"""Format examples for system prompt."""
if not self.examples:
return ""
prompt = "\n\nHere are examples of good responses:\n"
for ex in self.examples[:3]: # Top 3
prompt += f"\nInput: {ex['input']}\nOutput: {ex['output']}\n"
return prompt
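The pruning policy in `add_example` is just a sort-and-slice; shown standalone here with made-up example data:

```python
# Self-contained sketch of the quality-pruning policy above: keep only the
# top-N examples by quality score so the prompt stays short and high-signal.
def prune_examples(examples, max_examples=3):
    """Sort by quality (desc) and keep the best `max_examples`."""
    return sorted(examples, key=lambda e: e["quality"], reverse=True)[:max_examples]

examples = [
    {"input": "q1", "output": "a1", "quality": 0.9},
    {"input": "q2", "output": "a2", "quality": 0.4},
    {"input": "q3", "output": "a3", "quality": 0.7},
    {"input": "q4", "output": "a4", "quality": 0.95},
]
kept = prune_examples(examples, max_examples=2)
print([e["input"] for e in kept])  # ['q4', 'q1']
```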
4.5. Multi-Model Orchestration
Concept
Route queries between different models strategically: fast models for simple tasks, powerful models for complex reasoning, specialized models for domain tasks. This enables cost reduction while maintaining quality.
When to Use
- Cost optimization: 80% of tasks can be handled by cheaper models
- Latency requirements: simple tasks go to fast models first
- Specialized models: use domain-specific models (code, math, reasoning)
- Cascading fallback: if simple model uncertain, escalate to powerful model
- A/B testing: compare outputs across models
- Multi-language support: different models for different languages
Router: Intelligent Model Selection
class ModelRouter:
"""Route queries to appropriate models based on complexity."""
def __init__(self):
self.models = {
"haiku": anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")),
"sonnet": anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")),
"opus": anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
}
self.routing_stats = {"haiku": 0, "sonnet": 0, "opus": 0}
async def route_and_query(self, user_message: str) -> dict:
"""Analyze message complexity and route to appropriate model."""
# Classify complexity
complexity = await self._classify_complexity(user_message)
# Select model based on complexity
model = self._select_model(complexity)
# Query
response = self.models[model].messages.create(
model=self._get_model_id(model),
max_tokens=1000,
messages=[{"role": "user", "content": user_message}]
)
self.routing_stats[model] += 1
return {
"response": response.content[0].text,
"model_used": model,
"complexity": complexity,
"cost_estimate": self._estimate_cost(model, response.usage)
}
async def _classify_complexity(self, message: str) -> str:
"""Classify message as simple/medium/complex."""
# Use heuristics first (fast)
if len(message) < 50 and not any(word in message.lower()
for word in ["explain", "analyze", "solve", "code", "write"]):
return "simple"
# Use fast model for precise classification
response = self.models["haiku"].messages.create(
model="claude-haiku-4",
max_tokens=50,
messages=[{
"role": "user",
"content": f"""Classify as simple/medium/complex:
{message}
Reply with just one word."""
}]
)
return response.content[0].text.strip().lower()
def _select_model(self, complexity: str) -> str:
"""Select model based on complexity."""
if complexity == "simple":
return "haiku"
elif complexity == "medium":
return "sonnet"
else:
return "opus"
def _get_model_id(self, model: str) -> str:
model_ids = {
"haiku": "claude-haiku-4",
"sonnet": "claude-sonnet-4",
"opus": "claude-3-opus-20250219"
}
return model_ids[model]
def _estimate_cost(self, model: str, usage) -> float:
"""Estimate query cost in USD (rates below are USD per 1K tokens)."""
pricing = {
"haiku": {"input": 0.00080, "output": 0.0024},
"sonnet": {"input": 0.003, "output": 0.015},
"opus": {"input": 0.015, "output": 0.075}
}
rates = pricing[model]
return (usage.input_tokens * rates["input"] +
usage.output_tokens * rates["output"]) / 1_000
def get_routing_stats(self) -> dict:
"""Get routing statistics."""
total = sum(self.routing_stats.values())
return {
"total_queries": total,
"distribution": {
k: v / total if total > 0 else 0
for k, v in self.routing_stats.items()
}
}
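Why routing matters is easy to see with back-of-envelope arithmetic. The sketch below uses illustrative per-million-token prices consistent with the pricing table above; the 70/25/5 distribution and token counts are assumptions, not measurements:

```python
# Rough savings estimate for complexity-based routing.
# Prices are illustrative, in USD per million tokens.
PRICE_PER_MTOK = {
    "haiku": {"input": 0.80, "output": 2.40},
    "sonnet": {"input": 3.00, "output": 15.00},
    "opus": {"input": 15.00, "output": 75.00},
}

def monthly_cost(distribution, queries, in_tok=500, out_tok=400):
    """Cost of `queries` calls split across models by `distribution` shares."""
    total = 0.0
    for model, share in distribution.items():
        p = PRICE_PER_MTOK[model]
        total += queries * share * (in_tok * p["input"] + out_tok * p["output"]) / 1_000_000
    return total

routed = monthly_cost({"haiku": 0.7, "sonnet": 0.25, "opus": 0.05}, queries=100_000)
all_opus = monthly_cost({"opus": 1.0}, queries=100_000)
print(f"routed: ${routed:.2f} vs all-opus: ${all_opus:.2f}")
```

At these assumed prices, routing most traffic to the cheap tier cuts spend by roughly an order of magnitude versus sending everything to the largest model.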
Cascading Fallback: Simple → Complex
class CascadingModelEnsemble:
"""Try fast model first, escalate to powerful model if uncertain."""
def __init__(self):
self.client_haiku = anthropic.Anthropic()
self.client_sonnet = anthropic.Anthropic()
self.escalation_count = 0
async def query_with_cascade(self, message: str,
confidence_threshold: float = 0.7) -> dict:
"""Query with cascade: haiku -> sonnet if needed."""
# Try fast model first
haiku_response = await self._query_model(
self.client_haiku,
"claude-haiku-4",
message
)
# Estimate confidence in haiku's response
confidence = await self._estimate_confidence(
message,
haiku_response,
use_fast_model=True
)
if confidence >= confidence_threshold:
return {
"response": haiku_response,
"model": "haiku",
"confidence": confidence,
"escalated": False,
"cost_optimized": True
}
# Escalate to powerful model
self.escalation_count += 1
sonnet_response = await self._query_model(
self.client_sonnet,
"claude-sonnet-4",
message
)
return {
"response": sonnet_response,
"model": "sonnet",
"confidence": 1.0, # Trust powerful model
"escalated": True,
"escalation_reason": f"haiku confidence was {confidence:.2f}",
"cost_premium": "Used more expensive model due to escalation"
}
async def _query_model(self, client, model: str, message: str) -> str:
response = client.messages.create(
model=model,
max_tokens=800,
messages=[{"role": "user", "content": message}]
)
return response.content[0].text
async def _estimate_confidence(self, message: str, response: str,
use_fast_model: bool = False) -> float:
"""Estimate confidence in response."""
model = "claude-haiku-4" if use_fast_model else "claude-sonnet-4"
client = self.client_haiku if use_fast_model else self.client_sonnet
response_obj = client.messages.create(
model=model,
max_tokens=50,
messages=[{
"role": "user",
"content": f"Rate confidence 0-1: '{response}' answers '{message}'\nReply: float"
}]
)
try:
return float(response_obj.content[0].text.strip())
except ValueError:
return 0.5
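A subtlety of cascading: an escalated query pays for both calls, so the cascade only saves money when the cheap model's answer is accepted often enough. A minimal sketch of that expected-cost arithmetic (the per-query prices are illustrative assumptions):

```python
# Expected cost per query under a cascade:
#   p_accept * cheap + (1 - p_accept) * (cheap + expensive)
# An escalated query pays for BOTH calls.
def cascade_expected_cost(cheap, expensive, p_accept):
    return p_accept * cheap + (1 - p_accept) * (cheap + expensive)

cheap, expensive = 0.0014, 0.0075  # USD per query, illustrative
for p in (0.9, 0.5, 0.1):
    print(p, round(cascade_expected_cost(cheap, expensive, p), 5))

# Cascading beats always-expensive whenever
#   cheap + (1 - p_accept) * expensive < expensive
# i.e. whenever p_accept > cheap / expensive.
break_even = cheap / expensive
print(f"break-even acceptance rate: {break_even:.2%}")
```

If the fast model's answers are accepted less often than the break-even rate, the cascade costs more than just calling the powerful model directly.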
Hybrid Routing with Cost/Quality Trade-off
class HybridModelRouter:
"""Intelligent routing balancing cost and quality."""
def __init__(self, budget_cents: float = 100.0):
self.client = anthropic.Anthropic()
self.budget_remaining = budget_cents
self.usage_log = []
async def query_with_budget(self, message: str,
quality_requirement: str = "medium") -> dict:
"""Query respecting budget and quality requirements."""
# Select model based on budget and quality
model = self._select_model_for_budget(quality_requirement)
# Estimate cost in dollars; the budget is tracked in cents
estimated_tokens = len(message) // 4 + 400 # Rough estimate
estimated_cost = self._estimate_cost(model, estimated_tokens)
if estimated_cost * 100 > self.budget_remaining:
# Switch to cheaper model
model = "claude-haiku-4"
# Query
response = self.client.messages.create(
model=model,
max_tokens=1000,
messages=[{"role": "user", "content": message}]
)
actual_cost = self._estimate_cost(model, response.usage.input_tokens + response.usage.output_tokens)
self.budget_remaining -= actual_cost * 100 # Convert to cents
return {
"response": response.content[0].text,
"model": model,
"cost_cents": actual_cost * 100,
"budget_remaining_cents": self.budget_remaining,
"quality_delivered": self._estimate_quality(model)
}
def _select_model_for_budget(self, quality: str) -> str:
if quality == "high":
return "claude-sonnet-4"
elif quality == "medium":
return "claude-sonnet-4"
else:
return "claude-haiku-4"
def _estimate_cost(self, model: str, tokens: int) -> float:
rates = {
"claude-haiku-4": {"input": 0.00080, "output": 0.0024},
"claude-sonnet-4": {"input": 0.003, "output": 0.015}
}
rate = rates.get(model, rates["claude-haiku-4"])
# Assume 50/50 input/output
return (tokens * 0.5 * rate["input"] + tokens * 0.5 * rate["output"]) / 1000
def _estimate_quality(self, model: str) -> str:
if "haiku" in model:
return "fast"
elif "sonnet" in model:
return "balanced"
else:
return "high"
Multi-Model Consensus for High-Stakes Decisions
class ConsensusOracle:
"""Query multiple models, return consensus on critical decisions."""
def __init__(self):
self.client = anthropic.Anthropic()
self.models = [
"claude-haiku-4",
"claude-sonnet-4",
]
async def get_consensus(self, question: str,
required_agreement: float = 0.7) -> dict:
"""Get consensus across models."""
responses = await asyncio.gather(*[
self._query_model(model, question)
for model in self.models
])
# Calculate agreement (using embeddings)
agreement = await self._calculate_agreement(responses)
return {
"responses": responses,
"consensus_confidence": agreement,
"consensus_reached": agreement >= required_agreement,
"recommendation": self._select_best(responses) if agreement >= required_agreement else "unclear"
}
async def _query_model(self, model: str, question: str) -> str:
response = self.client.messages.create(
model=model,
max_tokens=500,
messages=[{"role": "user", "content": question}]
)
return response.content[0].text
async def _calculate_agreement(self, responses: list) -> float:
"""Calculate agreement between responses using semantic similarity."""
if len(responses) < 2:
return 1.0
# Simple implementation: check if responses are similar
# In production, use embedding similarity
return 0.8 # Placeholder
def _select_best(self, responses: list) -> str:
"""Heuristic: prefer the shortest response (concise answers tend to agree)."""
return min(responses, key=len)
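The `_calculate_agreement` placeholder above can be replaced with a simple lexical measure: mean pairwise Jaccard overlap of word sets. This is a cheap stand-in; a production system would use embedding cosine similarity instead:

```python
# Drop-in lexical agreement: mean pairwise Jaccard word overlap, in [0, 1].
from itertools import combinations

def lexical_agreement(responses):
    """Mean pairwise Jaccard similarity over lowercased word sets."""
    if len(responses) < 2:
        return 1.0
    def jaccard(a, b):
        wa, wb = set(a.lower().split()), set(b.lower().split())
        union = wa | wb
        return len(wa & wb) / len(union) if union else 1.0
    pairs = list(combinations(responses, 2))
    return sum(jaccard(a, b) for a, b in pairs) / len(pairs)

print(lexical_agreement(["approve the loan", "approve the loan"]))      # 1.0
print(lexical_agreement(["approve the loan", "reject this request"]))   # 0.0
```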
5. Tool Composition for Code Generation
Concept
Code generation benefits from multiple passes: generate → test → fix → refine. Feedback from testing guides refinement, avoiding single-shot code that breaks.
When to Use
- Code generation where correctness must be verified
- Incremental code building (start with skeleton, add features)
- Refactoring where existing tests validate changes
- Multi-language generation with language-specific validators
Generate → Test → Fix Loop
class CodeGenerationLoop:
"""Multi-pass code generation with testing."""
def __init__(self, client, model: str = "claude-sonnet-4"):
self.client = client
self.model = model
self.max_iterations = 3
async def generate_and_refine(self, spec: str,
test_function: Callable,
language: str = "python") -> str:
"""Generate code, test it, refine based on failures."""
code = await self._generate(spec, language)
for iteration in range(self.max_iterations):
# Test current code
test_result = await test_function(code)
if test_result["passed"]:
return code
# Refine based on error
code = await self._refine(
spec, code, test_result["error"], language, iteration
)
# Return best effort
return code
async def _generate(self, spec: str, language: str) -> str:
"""Generate initial code."""
response = self.client.messages.create(
model=self.model,
max_tokens=2000,
messages=[{
"role": "user",
"content": f"Write {language} code for: {spec}\n\nReturn only the code, no explanation."
}]
)
return response.content[0].text
async def _refine(self, spec: str, code: str, error: str,
language: str, iteration: int) -> str:
"""Refine code based on test failure."""
response = self.client.messages.create(
model=self.model,
max_tokens=2000,
messages=[{
"role": "user",
"content": f"""Fix this {language} code.
Spec: {spec}
Current code:
{code}
Error from testing:
{error}
Iteration {iteration + 1}/{self.max_iterations}. Return only the corrected code."""
}]
)
return response.content[0].text
# Usage
code_gen = CodeGenerationLoop(client)
async def test_python_code(code: str) -> dict:
"""Test generated Python code."""
try:
# WARNING: exec() runs model-generated code in-process with full
# privileges; sandbox this in production
exec_globals = {}
exec(code, exec_globals)
# Assume generated code defines a function `solve`
if "solve" in exec_globals:
result = exec_globals["solve"]()
return {"passed": True, "result": result}
except Exception as e:
return {"passed": False, "error": str(e)}
return {"passed": False, "error": "No solve function defined"}
final_code = await code_gen.generate_and_refine(
"Write a function that returns the nth Fibonacci number",
test_python_code,
language="python"
)
Incremental Code Building
class IncrementalCodeBuilder:
"""Build code piece by piece."""
def __init__(self, client, model: str = "claude-sonnet-4"):
self.client = client
self.model = model
self.code_sections: dict[str, str] = {}
async def build_section(self, name: str, spec: str,
context: str = "") -> str:
"""Add one section of code."""
existing = "\n\n".join(self.code_sections.values())
response = self.client.messages.create(
model=self.model,
max_tokens=1500,
messages=[{
"role": "user",
"content": f"""Build the '{name}' section for this project.
Previous sections:
{existing}
Context: {context}
Spec for this section: {spec}
Return only the code for this section."""
}]
)
section_code = response.content[0].text
self.code_sections[name] = section_code
return section_code
def get_full_code(self) -> str:
"""Get all sections combined."""
return "\n\n".join(self.code_sections.values())
# Usage
builder = IncrementalCodeBuilder(client)
await builder.build_section("imports", "All imports needed")
await builder.build_section("models", "Pydantic models for data validation")
await builder.build_section("api_routes", "FastAPI route handlers")
await builder.build_section("main", "Main app setup")
full_code = builder.get_full_code()
6. Knowledge Distillation
Concept
Use a small, fast model to learn patterns from a large model. Useful for reducing latency and cost while preserving quality for routine tasks.
When to Use
- Build a small model to handle 80% of routine queries (save large model for edge cases)
- Offline: distill knowledge to a model that runs locally
- Fine-tuning with synthetic data: large model generates examples to train small model
- Cascading: try small model first, escalate to large model if uncertain
Synthetic Data Generation for Distillation
class DistillationTrainer:
"""Generate synthetic training data from large model."""
def __init__(self, large_client, small_model: str = "claude-haiku-4"):
self.large_client = large_client
self.small_model = small_model
self.training_data: list[dict] = []
async def generate_training_data(self, task_description: str,
num_examples: int = 50) -> list[dict]:
"""Use large model to generate training examples."""
for i in range(num_examples):
# Large model generates diverse examples
response = self.large_client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""Generate an example for training a small model on this task:
Task: {task_description}
Example {i+1}/{num_examples}. Return JSON: {{"input": "...", "output": "..."}}"""
}]
)
try:
example = json.loads(response.content[0].text)
self.training_data.append(example)
except json.JSONDecodeError:
pass
return self.training_data
async def evaluate_small_model(self) -> dict:
"""Evaluate small model vs large model on test cases."""
# Use 10% of data as test
test_data = self.training_data[:len(self.training_data) // 10]
matches = 0
for example in test_data:
# Reuse the same API client; only the model differs
small_response = self.large_client.messages.create(
model=self.small_model,
max_tokens=200,
messages=[{"role": "user", "content": example["input"]}]
)
small_output = small_response.content[0].text
# Compare (simple string match; could use embedding similarity)
if small_output.strip() == example["output"].strip():
matches += 1
accuracy = matches / len(test_data) if test_data else 0
return {
"test_cases": len(test_data),
"matches": matches,
"accuracy": accuracy
}
# Usage
distiller = DistillationTrainer(large_client)
await distiller.generate_training_data(
"Classify sentiment in customer reviews",
num_examples=100
)
report = await distiller.evaluate_small_model()
print(f"Small model accuracy: {report['accuracy']:.2%}")
Temperature-Based Distillation
class TemperatureDistillation:
"""Use higher temperature to generate diverse examples."""
@staticmethod
async def generate_with_temperature(client, prompt: str,
temperature: float = 1.0) -> str:
"""Generate with specified temperature for diversity."""
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
temperature=temperature, # Higher = more creative/diverse
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
@staticmethod
async def ensemble_and_select(client, prompt: str,
num_samples: int = 5) -> list[str]:
"""Generate multiple responses at high temperature, select best."""
samples = []
# Generate diverse samples
for _ in range(num_samples):
sample = await TemperatureDistillation.generate_with_temperature(
client, prompt, temperature=0.8
)
samples.append(sample)
# Score and select best (could use embedding similarity, BERTScore, etc.)
# Here: use length and content heuristics
scored = sorted(
samples,
key=lambda x: (len(x), x.count("."), x.count(",")),
reverse=True
)
return scored[:max(1, num_samples // 2)] # Return top half
6.5. Confidence Scoring & Uncertainty Quantification
Concept
Agents should know what they don’t know. Confidence scoring estimates how likely an answer is correct, enabling systems to escalate uncertain decisions, request clarification, or combine with other agents.
When to Use
- User-facing systems: show confidence to users
- Escalation workflows: hand off uncertain cases to human review
- Ensemble voting: weight votes by model confidence
- Cascading models: escalate when confidence is low
- Decision quality: track correlation between confidence and accuracy
Bayesian Confidence Estimation
class ConfidenceScorer:
"""Estimate confidence in LLM responses."""
def __init__(self, client):
self.client = client
self.calibration_data: list[dict] = []
async def score_confidence(self, question: str, response: str,
method: str = "ensemble") -> dict:
"""Estimate confidence in response using multiple methods."""
if method == "ensemble":
return await self._ensemble_score(question, response)
elif method == "bayesian":
return await self._bayesian_score(question, response)
else:
return await self._heuristic_score(question, response)
async def _ensemble_score(self, question: str, response: str) -> dict:
"""Combine multiple scoring methods."""
# Method 1: Direct confidence from LLM
direct = await self._direct_confidence(question, response)
# Method 2: Check for hedging language
hedging = self._analyze_hedging(response)
# Method 3: Cross-check with follow-up query
consistency = await self._consistency_check(question, response)
# Combine scores
final_score = (direct * 0.4 + (1 - hedging) * 0.3 + consistency * 0.3)
return {
"overall_confidence": final_score,
"direct_confidence": direct,
"hedging_score": hedging,
"consistency": consistency,
"method": "ensemble"
}
async def _direct_confidence(self, question: str, response: str) -> float:
"""Ask LLM directly: how confident are you?"""
confidence_response = self.client.messages.create(
model="claude-haiku-4",
max_tokens=50,
messages=[{
"role": "user",
"content": f"""On a scale of 0-1, how confident are you in this answer?
Question: {question}
Your answer: {response}
Respond with just a number 0-1"""
}]
)
try:
return float(confidence_response.content[0].text.strip())
except ValueError:
return 0.5
def _analyze_hedging(self, response: str) -> float:
"""Detect hedging language ('maybe', 'probably', 'could')."""
hedging_words = [
'maybe', 'possibly', 'might', 'could', 'perhaps',
'probably', 'somewhat', 'relatively', 'fairly', 'rather'
]
response_lower = response.lower()
hedging_count = sum(1 for word in hedging_words if f" {word} " in f" {response_lower} ")
# Normalize to 0-1
hedging_score = min(1.0, hedging_count / 3.0)
return hedging_score
async def _consistency_check(self, question: str, response: str) -> float:
"""Ask same question again, compare responses."""
# Generate follow-up response
follow_up = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": question}]
)
follow_up_text = follow_up.content[0].text
# Compare using simple similarity
response_words = set(response.lower().split())
follow_up_words = set(follow_up_text.lower().split())
intersection = len(response_words & follow_up_words)
union = len(response_words | follow_up_words)
similarity = intersection / union if union > 0 else 0
return similarity
async def _bayesian_score(self, question: str, response: str) -> dict:
"""Bayesian belief update on response quality."""
# Prior: assume 60% confidence (neutral)
prior = 0.6
# Likelihood factors
factors = {
"direct": await self._direct_confidence(question, response),
"low_hedging": 1 - self._analyze_hedging(response),
"length": min(1.0, len(response) / 500.0), # Longer often better
"cites_sources": 1.0 if any(word in response.lower() for word in ["according", "research", "study"]) else 0.5,
"specific": 1.0 if any(char.isdigit() for char in response) else 0.7
}
# Combine likelihood
likelihood = sum(factors.values()) / len(factors)
# Bayesian update (simplified)
posterior = (likelihood * prior) / (likelihood * prior + (1 - likelihood) * (1 - prior))
return {
"overall_confidence": posterior,
"factors": factors,
"method": "bayesian"
}
async def _heuristic_score(self, question: str, response: str) -> dict:
"""Simple heuristic confidence."""
score = 0.5
# Longer responses often more confident
score += min(0.2, len(response) / 1000.0)
# Less hedging = more confident
hedging = self._analyze_hedging(response)
score += (1 - hedging) * 0.2
# Specific details boost confidence
if any(char.isdigit() for char in response):
score += 0.1
return {
"overall_confidence": min(1.0, score),
"method": "heuristic"
}
def calibrate(self, question: str, response: str, actual_quality: bool) -> None:
"""Learn calibration: record when confidence matches accuracy."""
self.calibration_data.append({
"question": question,
"response": response,
"was_correct": actual_quality,
"timestamp": datetime.now().isoformat()
})
def get_calibration_report(self) -> dict:
"""Report on confidence calibration."""
if not self.calibration_data:
return {"status": "no calibration data"}
correct = sum(1 for d in self.calibration_data if d["was_correct"])
total = len(self.calibration_data)
return {
"total_samples": total,
"accuracy": correct / total,
# Crude heuristic against a fixed 0.75; real calibration compares
# stated confidence to observed accuracy
"calibration_quality": "good" if abs(correct / total - 0.75) < 0.1 else "poor"
}
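True calibration compares *predicted confidence* to *observed accuracy* per confidence bucket (a reliability diagram). A minimal binning sketch, assuming each calibration record also stores the confidence that was predicted at the time (a field the class above would need to add):

```python
# Bucket records by predicted confidence and compute per-bucket accuracy.
def reliability_bins(records, n_bins=4):
    """records: [{'confidence': float in [0, 1], 'was_correct': bool}, ...]"""
    bins = [{"count": 0, "correct": 0} for _ in range(n_bins)]
    for r in records:
        idx = min(int(r["confidence"] * n_bins), n_bins - 1)
        bins[idx]["count"] += 1
        bins[idx]["correct"] += r["was_correct"]  # bool adds as 0/1
    return [
        {"bucket": f"{i/n_bins:.2f}-{(i+1)/n_bins:.2f}",
         "count": b["count"],
         "accuracy": b["correct"] / b["count"] if b["count"] else None}
        for i, b in enumerate(bins)
    ]

records = [
    {"confidence": 0.9, "was_correct": True},
    {"confidence": 0.95, "was_correct": True},
    {"confidence": 0.9, "was_correct": False},
    {"confidence": 0.3, "was_correct": False},
]
rows = reliability_bins(records)
for row in rows:
    print(row)
```

A well-calibrated scorer shows per-bucket accuracy close to the bucket's confidence range; large gaps mean the confidence numbers should not be trusted as probabilities.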
7. Reasoning Frameworks & Multi-Model Reasoning
Concept
Different reasoning tasks benefit from different frameworks: ReAct for sequential reasoning, Chain-of-Thought for step-by-step logic, Tree-of-Thought for exploring alternatives. Advanced agents dynamically select the best framework based on problem complexity and type.
When to Use
- Complex problem solving: use Tree-of-Thought to explore alternatives
- Step-by-step reasoning: Chain-of-Thought for transparency and accuracy
- Interactive workflows: ReAct with tool use for multi-step tasks
- Verification: Reflexion framework to self-correct and improve
- Mathematical/logical problems: specialized reasoning frameworks
- Unknown problem types: dynamic framework selection system
- High-stakes decisions: verification loop with multiple frameworks
Dynamic Framework Selection
class DynamicReasoningAgent:
"""Select and combine reasoning frameworks based on task."""
def __init__(self, client):
self.client = client
self.framework_success_rates = {
"chain_of_thought": 0.75,
"tree_of_thought": 0.82,
"react": 0.70,
"step_by_step": 0.78
}
async def solve_with_best_framework(self, problem: str) -> dict:
"""Classify problem and apply best reasoning framework."""
# Step 1: Classify problem type
problem_type = await self._classify_problem(problem)
# Step 2: Select framework based on type and historical success
framework = self._select_framework(problem_type)
# Step 3: Apply framework
result = await framework(problem)
return {
"problem_type": problem_type,
"framework": framework.__name__,
"solution": result["answer"],
"reasoning_steps": result.get("steps", []),
"confidence": result.get("confidence", 0.7)
}
async def _classify_problem(self, problem: str) -> str:
"""Classify as: math, logic, creative, code, analysis, or planning."""
response = self.client.messages.create(
model="claude-haiku-4",
max_tokens=50,
messages=[{
"role": "user",
"content": f"""Classify problem as one of: math, logic, code, creative, analysis, planning
Problem: {problem[:200]}
Reply with just one word."""
}]
)
return response.content[0].text.strip().lower()
def _select_framework(self, problem_type: str):
"""Select reasoning framework for problem type."""
frameworks = {
"math": self._chain_of_thought,
"logic": self._tree_of_thought,
"code": self._step_by_step,
"creative": self._multi_path_exploration,
"analysis": self._react_framework,
"planning": self._tree_of_thought
}
return frameworks.get(problem_type, self._react_framework)
async def _chain_of_thought(self, problem: str) -> dict:
"""Linear step-by-step reasoning."""
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=2000,
messages=[{
"role": "user",
"content": f"""Solve by reasoning step-by-step:
{problem}
Format:
Step 1: [reason]
Step 2: [reason]
...
Final Answer: [answer]"""
}]
)
answer = response.content[0].text
steps = [s.strip() for s in answer.split('\n') if s.strip()]
return {
"answer": answer,
"steps": steps,
"confidence": 0.75
}
async def _tree_of_thought(self, problem: str) -> dict:
"""Explore multiple solution paths, evaluate and select best."""
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=2500,
messages=[{
"role": "user",
"content": f"""Solve by exploring multiple approaches:
{problem}
For 3 different approaches:
Approach 1: [description]
Pros: [list pros]
Cons: [list cons]
Score: [0-10]
Then recommend the best approach with final answer."""
}]
)
return {
"answer": response.content[0].text,
"confidence": 0.82
}
async def _step_by_step(self, problem: str) -> dict:
"""Structured approach for technical/code problems."""
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=2000,
messages=[{
"role": "user",
"content": f"""Solve structurally:
{problem}
1. Understand the problem
2. Design approach
3. Implement solution
4. Test solution
5. Summary and recommendations"""
}]
)
return {
"answer": response.content[0].text,
"confidence": 0.78
}
async def _multi_path_exploration(self, problem: str) -> dict:
"""Generate multiple creative solutions."""
solutions = []
for i in range(3):
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=600,
temperature=min(1.0, 0.7 + i * 0.2), # Increase diversity; API max is 1.0
messages=[{
"role": "user",
"content": f"Solution path {i+1} for: {problem}"
}]
)
solutions.append(response.content[0].text)
return {
"answer": f"Generated {len(solutions)} creative approaches",
"solutions": solutions,
"confidence": 0.65
}
async def _react_framework(self, problem: str) -> dict:
"""Reasoning + Action loop for interactive tasks."""
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=2000,
messages=[{
"role": "user",
"content": f"""Solve using Reason-Act loop:
{problem}
Format:
Thought: [what to do]
Action: [tool to use or action]
Observation: [result]
...repeat...
Final Answer: [answer]"""
}]
)
return {
"answer": response.content[0].text,
"confidence": 0.70
}
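The `framework_success_rates` table above is static; one way to actually use it is to treat framework selection as a bandit problem: usually exploit the framework with the best observed success rate, but explore occasionally so the rates can update. A minimal epsilon-greedy sketch (the rates are the illustrative numbers from the class above):

```python
# Epsilon-greedy framework selection over tracked success rates.
import random

def pick_framework(success_rates, epsilon=0.1, rng=random):
    """Exploit the best-known framework; explore with probability epsilon."""
    if rng.random() < epsilon:
        return rng.choice(list(success_rates))  # uniform exploration
    return max(success_rates, key=success_rates.get)

rates = {"chain_of_thought": 0.75, "tree_of_thought": 0.82, "react": 0.70}
random.seed(0)  # deterministic for the demo
picks = [pick_framework(rates) for _ in range(100)]
print(picks.count("tree_of_thought"))  # mostly the current best framework
```

After each solve, updating the chosen framework's rate from the verified outcome closes the loop; without exploration, an initially unlucky framework can never recover.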
Combining Frameworks: Verification Loop
class VerificationLoop:
"""Generate answer with one framework, verify with another."""
def __init__(self, client):
self.client = client
async def solve_and_verify(self, problem: str) -> dict:
"""Solve with framework A, verify with framework B."""
# Generate solution using primary framework
primary_framework = DynamicReasoningAgent(self.client)
solution = await primary_framework._chain_of_thought(problem)
# Verify using independent reasoning
verification = await self._verify_solution(problem, solution["answer"])
if not verification["is_correct"]:
# Retry with alternative framework
alternative = await primary_framework._tree_of_thought(problem)
return {
"primary_solution": solution["answer"],
"verification": verification,
"alternative_solution": alternative["answer"],
"recommendation": "Use alternative (primary failed verification)",
"final_confidence": 0.5
}
return {
"solution": solution["answer"],
"verification": verification,
"confidence": verification.get("confidence", 0.8),
"verified": True
}
async def _verify_solution(self, problem: str, solution: str) -> dict:
"""Check if solution is logically sound."""
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=300,
messages=[{
"role": "user",
"content": f"""Verify this solution:
Problem: {problem}
Proposed solution: {solution}
Check:
1. Is it logically sound?
2. Are there errors?
3. Is it complete?
Reply with valid JSON (double quotes): {{"is_correct": true, "issues": [], "confidence": 0.0}}"""
}]
)
try:
return json.loads(response.content[0].text)
except json.JSONDecodeError:
return {
"is_correct": True,
"issues": [],
"confidence": 0.6
}
8. Ensemble Methods
Concept
Multiple agents vote on the answer. Ensembles reduce errors through diversity: different agents make different mistakes, but when they agree, confidence is high.
When to Use
- High-stakes decisions where accuracy matters (medical, legal, financial)
- Reducing hallucinations: ensemble requires agreement
- Diverse perspectives: different agents specialized in different domains
- Calibrated confidence: agreement rate = confidence
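When the answers are categorical (labels, yes/no, multiple choice), the embedding clustering shown below is overkill: exact-string majority voting with `collections.Counter` is enough, and the agreement rate falls out for free:

```python
# Majority vote over normalized categorical answers.
from collections import Counter

def majority_vote(answers):
    """Return the most common answer with its vote count and agreement rate."""
    counts = Counter(a.strip().lower() for a in answers)
    answer, votes = counts.most_common(1)[0]
    return {"answer": answer, "votes": votes, "agreement": votes / len(answers)}

result = majority_vote(["Approve", "approve ", "reject", "APPROVE"])
print(result)  # {'answer': 'approve', 'votes': 3, 'agreement': 0.75}
```

Reserve the clustering approach for free-form text, where no two responses are ever byte-identical.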
Voting Ensemble
class VotingEnsemble:
"""Multiple agents vote on best answer."""
def __init__(self, clients: list, models: list[str],
voting_method: str = "majority"):
self.clients = clients
self.models = models
self.voting_method = voting_method
assert len(clients) == len(models)
async def query(self, question: str) -> dict:
"""Query all agents and aggregate votes."""
# Parallel queries
tasks = [
self._query_agent(client, model, question)
for client, model in zip(self.clients, self.models)
]
responses = await asyncio.gather(*tasks)
# Aggregate
if self.voting_method == "majority":
return self._majority_vote(responses)
elif self.voting_method == "weighted":
return self._weighted_vote(responses)
else:
raise ValueError(f"Unknown voting method: {self.voting_method}")
async def _query_agent(self, client, model: str, question: str) -> str:
"""Query one agent."""
response = client.messages.create(
model=model,
max_tokens=500,
messages=[{"role": "user", "content": question}]
)
return response.content[0].text
def _majority_vote(self, responses: list[str]) -> dict:
"""Select response that's most common."""
# For text, use embedding similarity to cluster
embeddings = [self._embed(r) for r in responses]
clusters = self._cluster_embeddings(embeddings)
# Pick largest cluster
largest_cluster = max(clusters, key=len)
representative = responses[largest_cluster[0]]
agreement = len(largest_cluster) / len(responses)
return {
"answer": representative,
"agreement": agreement,
"votes": len(largest_cluster),
"total_agents": len(responses)
}
def _weighted_vote(self, responses: list[str]) -> dict:
"""Weight votes by model capability."""
# Position-based placeholder weights (later models weighted higher);
# in practice, derive weights from measured per-model accuracy
weights = {i: i + 1 for i in range(len(self.models))}
# Cluster and weight
embeddings = [self._embed(r) for r in responses]
clusters = self._cluster_embeddings(embeddings)
weighted_clusters = {
i: sum(weights.get(idx, 1) for idx in cluster)
for i, cluster in enumerate(clusters)
}
best_cluster = max(weighted_clusters, key=weighted_clusters.get)
representative = responses[clusters[best_cluster][0]]
confidence = weighted_clusters[best_cluster] / sum(weights.values())
return {
"answer": representative,
"confidence": confidence,
"weighted_votes": weighted_clusters[best_cluster],
"total_weight": sum(weights.values())
}
def _embed(self, text: str) -> list[float]:
"""Get an embedding via sentence-transformers (local) or a hosted embeddings API."""
# Production implementation options:
# 1. sentence-transformers (local): from sentence_transformers import SentenceTransformer
# 2. OpenAI API: text-embedding-3-small via the openai package
# 3. A hosted provider such as Voyage AI (Anthropic's recommended embeddings partner)
try:
from sentence_transformers import SentenceTransformer
# Load the model once and reuse it; re-loading per call is prohibitively slow
if not hasattr(self, "_embed_model"):
self._embed_model = SentenceTransformer('all-MiniLM-L6-v2')  # Fast, 384-dim
embedding = self._embed_model.encode(text, convert_to_tensor=False)
return embedding.tolist()
except ImportError:
# Fallback: OpenAI embeddings (requires API key)
# from openai import OpenAI
# client = OpenAI()
# response = client.embeddings.create(
#     model="text-embedding-3-small",
#     input=text
# )
# return response.data[0].embedding
# If no embedding library is available, fail loudly
raise ImportError(
"Please install sentence-transformers: pip install sentence-transformers\n"
"Or configure an embeddings API as sketched above"
)
def _cluster_embeddings(self, embeddings: list, threshold: float = 0.5) -> list[list[int]]:
"""Cluster similar embeddings."""
clusters = []
for i, emb_i in enumerate(embeddings):
found_cluster = False
for cluster in clusters:
# Compare to first element of cluster
emb_ref = embeddings[cluster[0]]
distance = self._euclidean_distance(emb_i, emb_ref)
if distance < threshold:
cluster.append(i)
found_cluster = True
break
if not found_cluster:
clusters.append([i])
return clusters
@staticmethod
def _euclidean_distance(v1: list[float], v2: list[float]) -> float:
"""Compute Euclidean distance."""
return (sum((a - b) ** 2 for a, b in zip(v1, v2))) ** 0.5
# Usage
clients = [anthropic.Anthropic() for _ in range(3)]
models = [
"claude-sonnet-4",
"claude-sonnet-4",
"claude-haiku-4"
]
ensemble = VotingEnsemble(clients, models, voting_method="weighted")
result = await ensemble.query("What are the main causes of climate change?")
print(f"Answer: {result['answer']}")
print(f"Confidence: {result['confidence']:.2%}")
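The greedy clustering behind both voting methods can be checked in isolation. The sketch below (standalone helper names, not part of the class) reproduces the `_cluster_embeddings` and `_euclidean_distance` logic on toy 2-D vectors:

```python
def euclidean(v1: list[float], v2: list[float]) -> float:
    # Straight-line distance between two equal-length vectors
    return sum((a - b) ** 2 for a, b in zip(v1, v2)) ** 0.5

def greedy_cluster(embeddings: list[list[float]], threshold: float = 0.5) -> list[list[int]]:
    # Assign each vector to the first cluster whose representative
    # (its first member) is within `threshold`; otherwise start a new cluster
    clusters: list[list[int]] = []
    for i, emb in enumerate(embeddings):
        for cluster in clusters:
            if euclidean(emb, embeddings[cluster[0]]) < threshold:
                cluster.append(i)
                break
        else:
            clusters.append([i])
    return clusters

# Three near-identical "answers" and one outlier
embs = [[0.0, 0.0], [0.1, 0.0], [0.0, 0.1], [5.0, 5.0]]
clusters = greedy_cluster(embs)
largest = max(clusters, key=len)
agreement = len(largest) / len(embs)  # 3 of 4 agents agree
```

Note the threshold is distance-scale dependent: 0.5 works for normalized embeddings but should be tuned per embedding model.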
9. Caching & Memoization
Concept
Cache tool results and LLM responses to avoid redundant computation, reduce cost, and improve latency.
When to Use
- Repeated queries (same question asked multiple times)
- Expensive tools: cache their results
- RAG: cache retrieved documents across queries
- Session-based: cache knowledge within a conversation
- TTL-based: invalidate old caches periodically
Tool Result Memoization
import hashlib
import json
from datetime import datetime
class MemoizedToolExecutor:
"""Cache tool results with TTL and invalidation."""
def __init__(self, ttl_seconds: int = 3600):
self.cache: dict[str, dict] = {}
self.ttl_seconds = ttl_seconds
async def execute(self, tool_name: str, tool_input: Any) -> Any:
"""Execute tool, returning cached result if available."""
cache_key = self._make_key(tool_name, tool_input)
if cache_key in self.cache:
cached = self.cache[cache_key]
age = (datetime.now() - cached["timestamp"]).total_seconds()
if age < self.ttl_seconds:
return cached["result"]
else:
del self.cache[cache_key] # Expired
# Execute tool
result = await self._execute_tool(tool_name, tool_input)
# Cache result
self.cache[cache_key] = {
"result": result,
"timestamp": datetime.now()
}
return result
async def _execute_tool(self, tool_name: str, tool_input: Any) -> Any:
"""Placeholder: actual tool execution."""
# Implement based on your tool registry
pass
def _make_key(self, tool_name: str, tool_input: Any) -> str:
"""Create cache key from tool name and input."""
input_str = json.dumps(tool_input, sort_keys=True)
input_hash = hashlib.md5(input_str.encode()).hexdigest()
return f"{tool_name}:{input_hash}"
def invalidate(self, tool_name: str = None) -> int:
"""Invalidate cache entries."""
if tool_name is None:
count = len(self.cache)
self.cache.clear()
return count
keys_to_delete = [k for k in self.cache if k.startswith(f"{tool_name}:")]
for key in keys_to_delete:
del self.cache[key]
return len(keys_to_delete)
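The key derivation and expiry logic above can be exercised without real tools. This condensed synchronous sketch (a hypothetical `TTLCache` with an injectable clock, so expiry is testable without sleeping) mirrors `_make_key` and the TTL check:

```python
import hashlib
import json

class TTLCache:
    """Minimal TTL cache mirroring MemoizedToolExecutor's key and expiry logic."""
    def __init__(self, ttl_seconds: int, clock):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable time source returning seconds
        self.store: dict[str, dict] = {}

    def make_key(self, tool_name: str, tool_input) -> str:
        # Deterministic key: sorted JSON so dict ordering can't change the hash
        input_str = json.dumps(tool_input, sort_keys=True)
        return f"{tool_name}:{hashlib.md5(input_str.encode()).hexdigest()}"

    def get(self, tool_name: str, tool_input):
        key = self.make_key(tool_name, tool_input)
        entry = self.store.get(key)
        if entry is None:
            return None
        if self.clock() - entry["t"] >= self.ttl_seconds:
            del self.store[key]  # expired
            return None
        return entry["result"]

    def put(self, tool_name: str, tool_input, result) -> None:
        key = self.make_key(tool_name, tool_input)
        self.store[key] = {"result": result, "t": self.clock()}

now = [0.0]
cache = TTLCache(ttl_seconds=10, clock=lambda: now[0])
cache.put("search", {"q": "ai"}, "result-1")
hit = cache.get("search", {"q": "ai"})   # fresh entry: hit
now[0] = 11.0
miss = cache.get("search", {"q": "ai"})  # past TTL: miss
```

The injectable clock is the design point: TTL behavior becomes a plain unit test rather than a flaky sleep-based one.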
Prompt Caching (Repeated Context)
class PromptCache:
"""Cache prompts with repeated context (documents, system prompts)."""
def __init__(self, client):
self.client = client
self.cached_contexts: dict[str, str] = {}
async def query_with_context(self, question: str, context_id: str,
context: str) -> str:
"""Query with cached context to reduce tokens."""
# Track which contexts have been sent; the actual token caching happens
# server-side via the cache_control block below
if context_id not in self.cached_contexts:
self.cached_contexts[context_id] = context
# Build messages with cache_control
messages = [
{
"role": "user",
"content": [
{
"type": "text",
"text": f"Context:\n{context}",
"cache_control": {"type": "ephemeral"} # Cache this
},
{
"type": "text",
"text": f"Question: {question}"
}
]
}
]
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=messages
)
return response.content[0].text
# Usage
cache = PromptCache(client)
# First query: context is cached
result1 = await cache.query_with_context(
"What is mentioned?",
context_id="doc_123",
context="Large document content..."
)
# Second query: reuses cached context, fewer tokens
result2 = await cache.query_with_context(
"What about X?",
context_id="doc_123",
context="Large document content..."
)
Smart Cache Invalidation
class SmartCache:
"""Cache with dependency tracking and smart invalidation."""
def __init__(self):
self.cache: dict[str, Any] = {}
self.dependencies: dict[str, set[str]] = {} # key -> set of dependent keys
def put(self, key: str, value: Any, depends_on: list[str] = None) -> None:
"""Cache value with dependencies."""
self.cache[key] = value
if depends_on:
self.dependencies[key] = set(depends_on)
def get(self, key: str) -> Any:
"""Get cached value."""
return self.cache.get(key)
def invalidate(self, key: str, cascade: bool = True) -> int:
"""Invalidate key and optionally cascade to dependents."""
if key not in self.cache:
return 0
del self.cache[key]
invalidated = 1
if cascade:
# Find all keys that depend on this one
dependents = [k for k, deps in self.dependencies.items() if key in deps]
for dependent in dependents:
invalidated += self.invalidate(dependent, cascade=True)
return invalidated
# Usage
cache = SmartCache()
cache.put("user_123_raw", user_data)
cache.put("user_123_processed", process(user_data), depends_on=["user_123_raw"])
cache.put("report_summary", summarize(user_data), depends_on=["user_123_processed"])
# Invalidate user data: cascades to processed and report
cache.invalidate("user_123_raw", cascade=True) # Removes 3 entries
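To confirm the cascade claim in the usage comment, here is a self-contained rerun of the same dependency logic with plain strings standing in for real user data:

```python
class SmartCacheDemo:
    """Condensed copy of SmartCache's dependency-tracking invalidation."""
    def __init__(self):
        self.cache: dict = {}
        self.dependencies: dict[str, set[str]] = {}  # key -> keys it depends on

    def put(self, key, value, depends_on=None):
        self.cache[key] = value
        if depends_on:
            self.dependencies[key] = set(depends_on)

    def invalidate(self, key, cascade=True) -> int:
        if key not in self.cache:
            return 0
        del self.cache[key]
        invalidated = 1
        if cascade:
            # Recurse into every key that declared a dependency on this one
            dependents = [k for k, deps in self.dependencies.items() if key in deps]
            for dependent in dependents:
                invalidated += self.invalidate(dependent, cascade=True)
        return invalidated

c = SmartCacheDemo()
c.put("user_123_raw", "raw")
c.put("user_123_processed", "processed", depends_on=["user_123_raw"])
c.put("report_summary", "summary", depends_on=["user_123_processed"])
removed = c.invalidate("user_123_raw")  # raw -> processed -> summary
```

The chain raw → processed → summary means one invalidation removes all three entries, leaving the cache empty.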
10. Advanced Memory Patterns
Concept
Memory goes beyond conversation history. Structure it into episodic (specific events), semantic (facts), and preference memory (what users like).
When to Use
- Long-lived agents that interact with same user over weeks/months
- Search and recall: “remind me about when we discussed X”
- Preference learning: agent recalls user’s style and preferences
- Debugging: episodic memory helps reproduce issues
Multi-Tier Memory System
from datetime import datetime
from typing import Any
class EpisodicMemory:
"""Store specific interactions and events."""
def __init__(self):
self.episodes: list[dict] = []
def record(self, event: str, context: dict, outcome: str) -> None:
"""Record an episode."""
self.episodes.append({
"timestamp": datetime.now().isoformat(),
"event": event,
"context": context,
"outcome": outcome
})
def recall(self, query: str, limit: int = 5) -> list[dict]:
"""Search episodic memory."""
# Simple keyword search; could use embeddings
results = []
for ep in self.episodes:
if query.lower() in ep["event"].lower():
results.append(ep)
return sorted(results, key=lambda x: x["timestamp"], reverse=True)[:limit]
class SemanticMemory:
"""Store extracted facts and knowledge."""
def __init__(self):
self.facts: dict[str, list[str]] = {} # topic -> facts
def add_fact(self, topic: str, fact: str) -> None:
"""Add a fact."""
if topic not in self.facts:
self.facts[topic] = []
self.facts[topic].append(fact)
def get_facts(self, topic: str) -> list[str]:
"""Retrieve facts on a topic."""
return self.facts.get(topic, [])
class PreferenceMemory:
"""Store user preferences and style."""
def __init__(self):
self.preferences: dict[str, Any] = {}
def set_preference(self, key: str, value: Any) -> None:
"""Record a preference."""
self.preferences[key] = value
def get_preference(self, key: str, default: Any = None) -> Any:
"""Retrieve preference."""
return self.preferences.get(key, default)
class IntegratedMemory:
"""Unified memory system."""
def __init__(self, client):
self.client = client
self.episodic = EpisodicMemory()
self.semantic = SemanticMemory()
self.preferences = PreferenceMemory()
async def process_interaction(self, question: str, answer: str,
feedback: str = None) -> None:
"""Process interaction and update all memory tiers."""
# Record episode
self.episodic.record(
event=question,
context={"timestamp": datetime.now().isoformat()},  # keep JSON-serializable for persistence
outcome=answer
)
# Extract semantic facts
facts = await self._extract_facts(question, answer)
for topic, fact in facts:
self.semantic.add_fact(topic, fact)
# Extract preferences
if feedback:
prefs = await self._extract_preferences(feedback)
for pref, value in prefs.items():
self.preferences.set_preference(pref, value)
async def _extract_facts(self, question: str, answer: str) -> list[tuple[str, str]]:
"""Extract key facts from interaction."""
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=200,
messages=[{
"role": "user",
"content": f"""Extract key facts as (topic, fact) pairs from this interaction:
Q: {question}
A: {answer}
Return as JSON: [["topic", "fact"], ...]"""
}]
)
try:
return json.loads(response.content[0].text)
except json.JSONDecodeError:
return []
async def _extract_preferences(self, feedback: str) -> dict:
"""Extract user preferences from feedback."""
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=100,
messages=[{
"role": "user",
"content": f"""Extract preferences from this feedback:
{feedback}
Return as JSON: {{"preference_key": "value", ...}} or {{}}"""
}]
)
try:
return json.loads(response.content[0].text)
except json.JSONDecodeError:
return {}
def build_memory_context(self) -> str:
"""Build context from memory for next interaction."""
context = "You have the following context about our interaction:\n\n"
# Recent episodes
recent = self.episodic.episodes[-3:] if self.episodic.episodes else []
if recent:
context += "Recent topics:\n"
for ep in recent:
context += f"- {ep['event']}\n"
# Known facts
if self.semantic.facts:
context += "\nKnown facts:\n"
for topic, facts in list(self.semantic.facts.items())[-3:]:
context += f"- {topic}: {', '.join(facts)}\n"
# Preferences
if self.preferences.preferences:
context += "\nYour preferences:\n"
for pref, value in self.preferences.preferences.items():
context += f"- {pref}: {value}\n"
return context
Memory Serialization
class MemoryManager:
"""Persist and load memory."""
def __init__(self, memory: IntegratedMemory, filepath: str):
self.memory = memory
self.filepath = filepath
def save(self) -> None:
"""Serialize memory to disk."""
data = {
"episodes": self.memory.episodic.episodes,
"facts": self.memory.semantic.facts,
"preferences": self.memory.preferences.preferences
}
with open(self.filepath, "w") as f:
json.dump(data, f, indent=2)
def load(self) -> None:
"""Deserialize memory from disk."""
with open(self.filepath, "r") as f:
data = json.load(f)
self.memory.episodic.episodes = data.get("episodes", [])
self.memory.semantic.facts = data.get("facts", {})
self.memory.preferences.preferences = data.get("preferences", {})
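A minimal round-trip of the save/load flow above, using plain dicts in place of the memory classes, shows the on-disk shape — and why everything recorded into memory must be JSON-serializable:

```python
import json
import os
import tempfile

# The three memory tiers, as MemoryManager would serialize them
data = {
    "episodes": [{"timestamp": "2024-01-01T00:00:00", "event": "q1",
                  "context": {}, "outcome": "a1"}],
    "facts": {"python": ["dynamically typed"]},
    "preferences": {"tone": "concise"},
}

path = os.path.join(tempfile.mkdtemp(), "memory.json")
with open(path, "w") as f:
    json.dump(data, f, indent=2)

with open(path) as f:
    loaded = json.load(f)  # identical structure back
```

If an episode's context held a raw `datetime` object instead of an ISO string, `json.dump` would raise `TypeError` — hence the `.isoformat()` calls when recording episodes.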
11. Performance Optimization
Concept
Optimize for speed, cost, and accuracy: parallelize, batch, terminate early, compute lazily.
When to Use
- Latency-critical paths: batch requests, parallelize tools
- Cost-sensitive: early termination, lazy evaluation
- Throughput: batch similar queries
- Quality: parallel validation, ensemble fallback
Parallelization Opportunities
class ParallelQueryExecutor:
"""Execute multiple related queries in parallel."""
def __init__(self, client, max_concurrent: int = 5):
self.client = client
self.semaphore = asyncio.Semaphore(max_concurrent)
async def execute_batch(self, queries: list[str]) -> list[str]:
"""Execute multiple queries concurrently."""
tasks = [self._bounded_query(q) for q in queries]
return await asyncio.gather(*tasks)
async def _bounded_query(self, query: str) -> str:
"""Execute single query with concurrency limit."""
async with self.semaphore:
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=300,
messages=[{"role": "user", "content": query}]
)
return response.content[0].text
# Usage
executor = ParallelQueryExecutor(client, max_concurrent=5)
queries = ["What is X?", "What is Y?", "What is Z?"]
results = await executor.execute_batch(queries)
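The semaphore bound can be verified without an API by swapping the model call for a stub that tracks in-flight tasks (a sketch; `run_batch` is a hypothetical standalone version of `execute_batch`):

```python
import asyncio

async def run_batch(queries: list[str], max_concurrent: int = 2):
    sem = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def bounded(q: str) -> str:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)       # record observed concurrency
            await asyncio.sleep(0.01)         # stand-in for the API call
            in_flight -= 1
            return f"answer:{q}"

    # gather preserves input order regardless of completion order
    results = await asyncio.gather(*(bounded(q) for q in queries))
    return results, peak

results, peak = asyncio.run(run_batch(["a", "b", "c", "d", "e"]))
```

With five tasks and a limit of two, the observed peak concurrency is exactly the semaphore bound, and results come back in input order.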
Batching & Optimization
class BatchProcessor:
"""Batch similar items for efficient processing."""
def __init__(self, client, batch_size: int = 10):
self.client = client
self.batch_size = batch_size
async def process_batch(self, items: list[str]) -> list[str]:
"""Process items in batches for efficiency."""
results = []
for i in range(0, len(items), self.batch_size):
batch = items[i : i + self.batch_size]
batch_results = await self._process_batch(batch)
results.extend(batch_results)
return results
async def _process_batch(self, batch: list[str]) -> list[str]:
"""Process one batch."""
# Send all items in one prompt for context efficiency
prompt = "Process these items:\n\n"
for i, item in enumerate(batch, 1):
prompt += f"{i}. {item}\n"
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=2000,
messages=[{"role": "user", "content": prompt}]
)
# Parse results (depends on output format)
output = response.content[0].text
return output.split("\n") # Simplified
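The trailing `output.split("\n")` is marked simplified; since the prompt numbers the items, a slightly more careful parser can strip that numbering back out (still a sketch — real outputs need validation against the expected count):

```python
import re

def parse_numbered(output: str, expected: int) -> list[str]:
    """Extract '1. item' / '2) item' lines back into a list; pad if the model dropped items."""
    items = []
    for line in output.splitlines():
        m = re.match(r"\s*(\d+)[.)]\s+(.*)", line)
        if m:
            items.append(m.group(2).strip())
    # Pad so callers can zip results against inputs positionally
    while len(items) < expected:
        items.append("")
    return items[:expected]

parsed = parse_numbered("1. alpha\n2) beta\nsome stray text\n3. gamma", expected=3)
```

Padding to `expected` keeps positional alignment with the input batch even when the model skips an item, which a bare `split` silently breaks.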
Early Termination
class EarlyTerminationExecutor:
"""Stop execution early if quality threshold is met."""
def __init__(self, client, quality_threshold: float = 0.95):
self.client = client
self.threshold = quality_threshold
async def query_until_confident(self, question: str) -> dict:
"""Query multiple times until confident."""
results = []
for attempt in range(5):
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": question}]
)
answer = response.content[0].text
confidence = await self._estimate_confidence(question, answer)
results.append({
"answer": answer,
"confidence": confidence,
"attempt": attempt + 1
})
# Early termination
if confidence >= self.threshold:
return results[-1]
# Return best result if threshold not met
return max(results, key=lambda x: x["confidence"])
async def _estimate_confidence(self, question: str, answer: str) -> float:
"""Estimate confidence in answer."""
response = self.client.messages.create(
model="claude-haiku-4", # Fast model
max_tokens=50,
messages=[{
"role": "user",
"content": f"Rate confidence (0-1) in this answer. Q: {question}\nA: {answer}\nReply with just a number."
}]
)
try:
return float(response.content[0].text.strip())
except ValueError:
return 0.5
Lazy Evaluation
class LazyEvaluator:
"""Compute only when needed."""
def __init__(self, client):
self.client = client
self.lazy_results: dict[str, Any] = {}
def lazy_query(self, query_id: str, question: str):
"""Register a lazy query."""
self.lazy_results[query_id] = {
"question": question,
"computed": False,
"result": None
}
async def get_result(self, query_id: str) -> str:
"""Compute on demand."""
if not self.lazy_results[query_id]["computed"]:
question = self.lazy_results[query_id]["question"]
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": question}]
)
self.lazy_results[query_id]["result"] = response.content[0].text
self.lazy_results[query_id]["computed"] = True
return self.lazy_results[query_id]["result"]
# Usage
lazy = LazyEvaluator(client)
# Register multiple lazy queries
lazy.lazy_query("q1", "What is AI?")
lazy.lazy_query("q2", "What is ML?")
# Only compute when needed
result_q1 = await lazy.get_result("q1") # Computes now
# result_q2 is not computed yet
Cross-References & Integration
- State Machines integrate with Tool Pipelines: pipelines as transition actions
- Memory Patterns integrate with Continual Learning: memory drives learning
- Ensemble Methods integrate with Caching: vote results are expensive, cache them
- Streaming integrates with Early Termination: stop streaming if confident
- Code Generation integrates with Tool Composition: test as a tool in pipeline
- Performance Optimization is orthogonal: apply to any pattern for speedup
Practical Integration Example: A Complex Agent
class SophisticatedAgent:
"""Integrate all patterns for a robust agent."""
def __init__(self, client):
self.client = client
self.pipeline = ToolPipeline([...]) # Tool composition
self.state_machine = StateMachine(...) # State management
self.memory = IntegratedMemory(client) # Multi-tier memory
self.drift_detector = DriftDetector() # Learning
self.cache = MemoizedToolExecutor() # Caching
async def interact(self, user_input: str, feedback: str = None):
"""One interaction with all patterns."""
# Check cache (assumes a simple get/put interface layered over
# MemoizedToolExecutor)
cached = self.cache.get(user_input)
if cached:
yield cached
return
# Update memory
await self.memory.process_interaction(user_input, "pending")
# State transition
await self.state_machine.transition("query", {"input": user_input})
# Execute pipeline (extract -> parse -> validate -> generate)
result = await self.pipeline.execute(user_input)
# Stream result
async for chunk in self._stream_result(result):
yield chunk
# Learn from feedback
if feedback:
await self.memory.process_interaction(user_input, result, feedback)
drift = self.drift_detector.record_result(
"good" in feedback.lower()
)
if drift:
print("[Learning: drift detected, consider retraining]")
# Cache result
self.cache.put(user_input, result)
12. Validation Checklist: Did You Implement This Correctly?
Tool Composition & Pipelines
- Each tool has clear input/output contracts (type hints)
- Error handlers exist for each tool that could fail
- Pipeline execution is logged (for debugging, not production spam)
- You tested the pipeline with real data, not just mock data
- Tool order is optimal (cheap/fast tools before expensive ones)
- Fallback chains have sensible ordering (try more likely options first)
- Parallel execution only used for truly independent operations
- You’ve profiled: which tools take longest? Consider optimization
- Data flowing through pipeline doesn’t balloon in size
- You handle partial failures gracefully (not all-or-nothing)
State Machines
- All valid transitions are defined (no missing edges)
- Invalid transitions are explicitly forbidden (state validation)
- Entry/exit actions are idempotent (safe to retry if partial failure)
- State history is immutable (for audit/replay)
- You can resume from any state (serialization/deserialization works)
- Transition conditions are testable in isolation
- You’ve documented state diagram visually or in text
- Dead states are handled (states with no outgoing transitions)
- Timeout transitions exist (don’t get stuck in states)
- You’ve tested edge cases: rapid transitions, invalid inputs
Streaming & Progressive Reasoning
- You handle stream interruption gracefully (partial output is valid)
- Buffer management: you don’t blow up memory with large streams
- Token counting is accurate (for billing/quota management)
- Streaming used only where latency matters (not for small responses)
- You tested on slow/unreliable networks
- Progressive retrieval: documents arrive incrementally, not all at once
- You’ve tested user interruption mid-stream
Adaptive Learning
- Feedback is explicitly requested, not assumed
- You don’t overfit to individual user interactions
- Memory has bounded size (don’t grow infinitely)
- Learned rules are human-readable (for debugging)
- You periodically validate that learned rules help (A/B test)
- Preference extraction doesn’t hallucinate preferences from noise
- Drift detection threshold is tuned to your domain
- You log all learning events (for analysis later)
- Feedback loop is closed: agent acts on feedback in future interactions
- You disable learning during testing (or it corrupts results)
Multi-Model Orchestration
- Model costs are tracked per interaction
- Router decision is logged (which model? why?)
- Fallback models are strictly cheaper (not more expensive)
- You’ve tested budget exhaustion gracefully
- Cascading escalations don’t infinite-loop
- Model routing is testable in isolation
- You document which model for which task (no mystery routing)
- Cost estimates are within 10% of actual
- You’ve A/B tested router vs. fixed model
- Routing decision respects user preferences
Confidence Scoring
- Confidence values are calibrated (validation set: do they match actual accuracy?)
- Confidence is not overconfident (50% confidence should yield 50% accuracy)
- You tested confidence on out-of-distribution examples
- Confidence score is interpretable to users
- Escalation thresholds are tuned (not arbitrary)
- Confidence doesn’t degrade linearly with response length
- You log confidence alongside responses (for analysis)
- Confidence thresholds differ by task (not universal)
Caching & Memoization
- Cache keys are deterministic (same input = same key, always)
- TTL is appropriate for your domain (not too short, not too long)
- Cache invalidation is tested thoroughly
- Memory overhead of caching is acceptable
- You measure cache hit rate (should be >50% to be worth it)
- Cached values don’t include sensitive data
- Cache is thread-safe if concurrent access occurs
- You’ve tested cache size limits
- Stale data isn’t silently returned
- Cache performance is faster than recompute (actual timings)
Advanced Memory Patterns
- Episodic memory doesn’t grow unbounded (trim old episodes)
- Semantic memory is deduplicated (no duplicate facts)
- Preference memory is updateable (not write-once)
- Memory retrieval is fast enough (indexed, not linear scan)
- Memory context doesn’t overwhelm LLM context window
- Serialized memory is encrypted (if contains sensitive info)
- Memory is versioned (for backwards compatibility)
- You’ve tested memory search (can find what you stored?)
- Memory size monitored (don’t exceed available storage)
13. Common Mistakes: What NOT to Do
Tool Composition Mistakes
Mistake: Composing tools without error handling
# DON'T:
result = await tool_a.invoke(data)
result = await tool_b.invoke(result) # If A failed, B gets bad input
Fix: Wrap each tool with error handling and validation
Mistake: Tool output bloats during pipeline
# DON'T:
# Extract text (200 chars) -> parse JSON -> duplicate data 3x
# Total bloat: 200 -> 1KB -> 3KB
Fix: Validate output size after each step, compress intermediate results
Mistake: Parallel tools when they have dependencies
# DON'T:
results = await gather(tool_a(data), tool_b(data)) # If B depends on A
Fix: Determine true dependencies, only parallelize independent operations
State Machine Mistakes
Mistake: Transitions without guards become invalid states
# DON'T:
# Transition from PENDING to APPROVED without checking approval status
await state_machine.transition("approve") # Always succeeds, no guard
Fix: Add condition that must be true for transition
Mistake: Entry actions with side effects that aren’t idempotent
# DON'T:
async def on_approved(ctx):
await send_email(ctx["user"]) # Called twice? Email sent twice!
Fix: Make side effects idempotent or guard with “has already run” check
Streaming Mistakes
Mistake: Streaming when you need full context first
# DON'T:
async for chunk in stream_response(question):
print(chunk) # Stream before any validation
Fix: Collect full response, validate, then stream to user
Mistake: Not handling stream interruption
# DON'T:
async for chunk in stream:
process(chunk) # If user closes connection, orphan process
Fix: Wrap in try/except, clean up on interruption
Learning Mistakes
Mistake: Learning from single user, overfitting to their quirks
# DON'T:
for interaction in user_history:
if positive_feedback:
learned_rules.append(extract_pattern(interaction))
# Rules optimized for 1 user, fail for others
Fix: Validate learned rules against broader dataset, don’t overfit
Mistake: Drift detection with too-small window
# DON'T:
drift_detector = DriftDetector(window_size=5)
# 5 failures in a row triggers retraining, too noisy
Fix: Use window_size >= 20-50, depending on traffic
Mistake: Never resetting learned state
# DON'T:
agent = Agent()
agent.interact(msg1)
agent.interact(msg2)
# Two unrelated messages, but agent thinks they're related
Fix: Clear learning state between user sessions or domains
Routing Mistakes
Mistake: Always escalating to expensive model
# DON'T:
simple_response = await haiku.query(q)
if confidence < 0.99: # Threshold too high
response = await sonnet.query(q) # Always escalates
Fix: Calibrate thresholds so ~70% don’t escalate
Mistake: Router itself slower than direct query
# DON'T:
route_decision_time = 0.500  # seconds: classifying the query takes longer than the cheap model!
query_time = 0.200  # seconds
# Router added latency, didn't save time
Fix: Measure end-to-end latency, routing should be <5% of query time
Confidence Scoring Mistakes
Mistake: Confidence always close to 0.5 or always close to 1.0
# DON'T:
# Confidence is uniformly 0.7 regardless of response quality
Fix: Calibrate on validation set, adjust scoring method
Mistake: Confidence based only on response length
# DON'T:
confidence = min(1.0, len(response) / 1000.0)
# Long hallucinations get high confidence!
Fix: Use multiple factors (hedging, specificity, consistency)
Caching Mistakes
Mistake: Cache invalidation never happens
# DON'T:
cache.put(key, value) # Forever
# User updates data, but cache is stale
Fix: Set TTL or explicit invalidation triggers
Mistake: Cache key includes timestamps or random data
# DON'T:
cache_key = f"{question}:{time.time()}" # Different key every second!
# Cache always misses
Fix: Deterministic keys based only on content
Mistake: Caching without measuring hit rate
# DON'T:
# Assume cache is helping, but hit rate is 5%
# Cache overhead > savings
Fix: Monitor hit rate, disable if <50%
Memory Mistakes
Mistake: Memory grows unbounded
# DON'T:
self.episodes.append(episode) # Never trim
# After 1 year, 1M episodes, search is O(n) slow
Fix: Implement retention policy (keep last 1000, or 30 days)
Mistake: Memory context > available LLM context window
# DON'T:
memory_context = build_memory_context() # 500K tokens
# LLM has 200K context total
# Memory + message doesn't fit!
Fix: Measure memory context size, cap it at <50% of LLM context window
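The fix above can be sketched with a simple character budget (a rough proxy — production code should count tokens with the model's tokenizer). The hypothetical `cap_memory_context` keeps the most recent sections that fit:

```python
def cap_memory_context(sections: list[str], max_chars: int) -> str:
    """Keep the most recent memory sections that fit within the budget."""
    kept: list[str] = []
    used = 0
    # Walk newest-first so recent memory survives trimming
    for section in reversed(sections):
        if used + len(section) > max_chars:
            break
        kept.append(section)
        used += len(section)
    return "\n".join(reversed(kept))

sections = ["old " * 50, "mid " * 10, "new " * 5]  # oldest to newest
context = cap_memory_context(sections, max_chars=80)
```

Here the 200-character oldest section is dropped while the two recent sections survive; in practice, set `max_chars` from the model's context window (e.g. under 50% of it, per the checklist above).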
14. Cross-References & Integration
How Advanced Patterns Integrate
- Tool Composition + State Machines: Pipelines as state transition actions
- Memory Patterns + Adaptive Learning: Memory drives learning; feedback updates memory
- Ensemble Methods + Confidence Scoring: Weight votes by model confidence
- Streaming + Early Termination: Stop streaming if confident answer achieved
- Code Generation + Tool Composition: Test-feedback loop as pipeline
- Multi-Model Routing + Confidence Scoring: Route to expensive model when confidence low
- Performance Optimization: Apply to any pattern for speedup (batch, parallelize, cache)
- Reasoning Frameworks (Doc 05) + Dynamic Selection: Pick ReAct/ToT/CoT based on problem type
Cross-Reference Quick Links
- See Doc 05 (AI Agents) for: ReAct, Chain-of-Thought, Tree-of-Thought frameworks
- See Doc 06 (Architecture) for: Where these patterns fit in 7-component system
- See Doc 08 (Implementation) for: Python code patterns and harness examples
- See Doc 11 (Testing) for: How to test advanced patterns (mock tools, state validation)
- See Doc 04 (Memory) for: Detailed memory system (complements the Advanced Memory Patterns section here)
- See Doc 19 (Knowledge Management) for: State management at scale, persistence
15. Real-World Integration: Sophisticated Multi-Pattern Agent
class ProductionAgent:
"""Real-world agent combining all advanced patterns."""
def __init__(self, client, config: dict):
self.client = client
self.router = ModelRouter() # Intelligent routing
self.pipeline = ToolPipeline([]) # Tool composition
self.state_machine = StateMachine(...) # State management
self.memory = IntegratedMemory(client) # Multi-tier memory
self.scorer = ConfidenceScorer(client) # Confidence estimation
self.cache = MemoizedToolExecutor() # Caching
self.drift_detector = DriftDetector() # Learning monitoring
async def process_request(self, user_input: str,
user_id: str,
feedback: str = None) -> dict:
"""One complete request with all patterns."""
# Load the user's memory context (a production system would key
# memory stores by user_id; the IntegratedMemory sketch above is single-user)
memory_context = self.memory.build_memory_context()
# Check cache (TTL/expiry is handled inside the cache, as in
# MemoizedToolExecutor above)
cached = self.cache.get(user_input)
if cached:
return {"response": cached, "source": "cache"}
# Route to appropriate model
routed = await self.router.route_and_query(
user_input,
context=memory_context
)
# Execute tool pipeline if needed
result = await self.pipeline.execute(routed["response"])
# Score confidence in result
confidence = await self.scorer.score_confidence(
user_input,
result
)
# Escalate if low confidence
if confidence["overall_confidence"] < 0.6:
result = await self._escalate(user_input, result)
# Update state machine
await self.state_machine.transition("process", {"input": user_input})
# Cache result
self.cache.put(user_input, result)
# Learn from feedback if provided
if feedback:
await self.memory.process_interaction(user_input, result, feedback)
is_success = "good" in feedback.lower()
drift = self.drift_detector.record_result(is_success)
if drift:
print("[Drift detected - consider retraining]")
return {
"response": result,
"model_used": routed["model_used"],
"confidence": confidence["overall_confidence"],
"escalated": confidence["overall_confidence"] < 0.6,
"cached": False
}
async def _escalate(self, question: str, initial_response: str) -> str:
"""Escalate to powerful model when uncertain."""
response = self.client.messages.create(
model="claude-sonnet-4",
max_tokens=1000,
messages=[{
"role": "user",
"content": f"""Revisit this question with fresh perspective:
{question}
Initial attempt: {initial_response}
Provide improved answer."""
}]
)
return response.content[0].text
Summary
These advanced patterns enable sophisticated systems:
- Composition (pipelines, state machines) structures complex workflows
- Learning (adaptation, drift detection, few-shot) improves systems over time
- Quality (ensemble, streaming, early termination) delivers better results faster
- Performance (caching, parallelization, batching) scales efficiently
- Memory (episodic, semantic, preference) enables truly personalized agents
- Reasoning (dynamic framework selection, verification) improves solution quality
- Routing (model selection, confidence-based escalation) optimizes the cost/quality trade-off
Use these patterns when complexity justifies them. For simple systems (single model, single user), most can be skipped. For complex, long-lived, high-stakes systems, combine multiple patterns for robustness.
The key principle: Each pattern solves a specific problem. Don’t add a pattern unless you have that problem.
Pre-annotation: Python Computes, LLM Validates
The Problem
LLMs are probabilistic. When you ask one “is 1871 within 2 years of 1887?”, it might say yes. Date arithmetic, geographic distance, string matching — these are deterministic computations that LLMs handle unreliably. Yet many real-world tasks mix these computations with genuine reasoning that LLMs excel at.
The Pattern
Python pre-computes analysis (date arithmetic, geographic matching, temporal filtering, string comparison) and annotates raw data before the LLM sees it. The LLM then validates annotations rather than computing from scratch.
How It Works
- Raw data comes in — records, documents, search results
- Python annotates — adds computed facts as inline annotations
- LLM reads annotated data — validates and reasons about the annotations
- LLM never computes — it confirms or flags, it does not calculate
Example
Instead of asking the LLM “is 1871 within 2 years of 1887?”, Python annotates:
>>> LIKELY DIFFERENT PERSON: born 1871 = 16 years before subject (born 1887)
The LLM reads this annotation and confirms: “Yes, 16 years apart — different person.”
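The annotation step above can be sketched in a few lines. This is a minimal illustration, not code from the source; the `annotate_candidate` helper, the record fields, and the annotation wording are assumptions:

```python
def annotate_candidate(record: dict, subject_birth_year: int,
                       tolerance: int = 2) -> str:
    """Python does the date arithmetic; the LLM only validates the result."""
    gap = abs(record["birth_year"] - subject_birth_year)
    verdict = "POSSIBLE MATCH" if gap <= tolerance else "LIKELY DIFFERENT PERSON"
    # Inline annotation prepended to the raw record before the LLM sees it
    return (f">>> {verdict}: born {record['birth_year']} = "
            f"{gap} years from subject (born {subject_birth_year})")

annotation = annotate_candidate({"name": "J. Smith", "birth_year": 1871}, 1887)
# The LLM receives the record plus this annotation and confirms or flags it;
# it is never asked to perform the subtraction itself.
```

Because the verdict is computed deterministically, a wrong annotation is a bug you can unit-test, not a probabilistic failure you can only sample for.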
Benefits
- Dramatically reduces LLM errors on computational tasks
- Reduces token count — annotations are shorter than asking the LLM to work through arithmetic
- Makes smaller models viable — a 7B model can handle annotated data that would require a 70B model to compute from scratch
- Auditable — annotations show exactly what Python computed
When to Use
- Any task mixing computation with reasoning
- Record matching, deduplication, entity resolution
- Date/time analysis, geographic filtering
- Data validation workflows
This is the “move intelligence into the architecture” principle: do not ask the LLM to do what Python does better. Reserve the LLM for what it does best — reading context, understanding nuance, and making judgement calls.
The Research Companion: LLM for Strategy, Python for Execution
The Problem
Full agent patterns (LLM decides everything, executes everything, records everything) are risky when errors compound. If the agent records a wrong fact, all subsequent decisions build on that error. For tasks like genealogical research, legal analysis, or investigative work, this is unacceptable.
The Architecture
- LLM reads context and generates non-obvious research strategies — “have you checked the maiden name variant?”, “try searching the neighbouring parish”
- Python executes searches and matches records — deterministic, reliable, auditable
- Human reviews and decides — the final authority on what is true
Key Insight
Apply probabilistic creativity to questions (safe — a wrong question wastes one search) rather than answers (dangerous — a wrong answer corrupts data).
The LLM is the spice, not the main course. It generates lateral thinking and suggests avenues a human might not consider. But it never touches data matching and never writes facts.
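One way to wire up this division of labour is sketched below. The function names and the shape of the suggestion dict are assumptions for illustration, not an API from the source:

```python
def research_step(llm_suggest, search_index, context: str) -> list[dict]:
    """LLM proposes queries; Python executes them; a human decides.

    llm_suggest(context) -> list of query strings (probabilistic: safe to be wrong)
    search_index(query)  -> list of matching records (deterministic, auditable)
    """
    proposals = llm_suggest(context)  # e.g. ["maiden name variant", ...]
    results = []
    for query in proposals:
        hits = search_index(query)  # Python does the matching, not the LLM
        results.append({"query": query, "hits": hits, "approved": None})
    # Nothing is written to the record store here: the human reviews each
    # suggestion and decides what, if anything, becomes a recorded fact.
    return results
```

Note that the LLM's output never reaches the data layer directly; the worst a bad suggestion can do is produce an empty or irrelevant `hits` list.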
When to Use
- Tasks where the bottleneck is knowing what to look for, not doing the looking
- Research workflows where errors compound (genealogy, legal discovery, medical research)
- Any domain where data integrity matters more than speed
Contrast with Full Agent Pattern
| | Full Agent | Research Companion |
|---|---|---|
| LLM role | Decides, executes, records | Reads, thinks, suggests |
| Risk | Wrong fact corrupts everything downstream | Wrong suggestion wastes one search |
| Human role | Reviews final output | Reviews each suggestion, decides |
| Data integrity | LLM-dependent | Human-guaranteed |
| Speed | Faster (autonomous) | Slower (human in loop) |
| Best for | Low-stakes, high-volume tasks | High-stakes, accuracy-critical tasks |
See Also
- Doc 05 (AI Agents) — Foundational agentic loop; advanced patterns extend and enhance this framework
- Doc 06 (Harness Architecture) — Seven core components form the base; advanced patterns enhance specific components
- Doc 08 (Claw-Code Python) — See pattern implementations in production code; reference for how patterns work
- Doc 11 (Testing & QA) — Test advanced patterns thoroughly; non-determinism compounds with composition and learning