Tutorials
Learning-oriented guides that take you through practical exercises to master AgentHub
Tutorials
These hands-on tutorials will guide you through learning AgentHub by doing. Each tutorial is designed to be followed step-by-step and will help you build practical experience with the system.
Tutorial Categories
🎯 Learning Path
1. Start Here
Begin with Getting Started tutorials to install and run your first examples
2. Build Systems
Progress to Workflows to create sophisticated agent interactions
3. Monitor & Observe
Master Observability to monitor and troubleshoot your deployments
Prerequisites
Before starting these tutorials, make sure you have:
- Go 1.24 or later installed
- Basic understanding of command-line tools
- Familiarity with distributed systems concepts (helpful but not required)
💡 Tutorial Tips
- Follow tutorials in order for the best learning experience
- Each tutorial builds on concepts from previous ones
- Code examples are tested and should work as written
- Don’t hesitate to experiment beyond the tutorial steps
Note
All tutorials are designed to work on Linux, macOS, and Windows. Platform-specific instructions are provided where needed.
1 - Getting Started
Essential tutorials to get you up and running with AgentHub
Getting Started Tutorials
Step-by-step tutorials to help you get AgentHub installed, configured, and running your first examples.
Available Tutorials
1.1 - Installation and Setup Tutorial
Guide for installing AgentHub and setting up your development environment from scratch. Get a working A2A-compliant AgentHub installation ready for building agent systems.
Installation and Setup Tutorial
This tutorial will guide you through installing AgentHub and setting up your development environment from scratch. By the end, you’ll have a working A2A-compliant AgentHub installation ready for building Agent2Agent protocol systems.
Prerequisites Check
Before we begin, let’s verify you have the required software installed.
Step 1: Verify Go Installation
Check if Go 1.24+ is installed:
go version
You should see output like:
go version go1.24.0 darwin/amd64
If Go is not installed or the version is older than 1.24:
macOS (using Homebrew):
Linux (using package manager):
# Ubuntu/Debian
sudo apt update && sudo apt install golang-go
# CentOS/RHEL
sudo yum install golang
# Arch Linux
sudo pacman -S go
Windows:
Download from https://golang.org/dl/ and run the installer.
Step 2: Verify Protocol Buffers Compiler
Check if protoc is installed:
protoc --version
You should see output like:
libprotoc 3.21.12
If protoc is not installed:
macOS (using Homebrew):
Linux:
# Ubuntu/Debian
sudo apt update && sudo apt install protobuf-compiler
# CentOS/RHEL
sudo yum install protobuf-compiler
# Arch Linux
sudo pacman -S protobuf
Windows:
Download from Protocol Buffers releases and add to PATH.
Step 3: Install Go Protocol Buffer Plugins
Install the required Go plugins for Protocol Buffers:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Verify the plugins are in your PATH:
which protoc-gen-go
which protoc-gen-go-grpc
Both commands should return paths to the installed plugins.
Installing AgentHub
Step 4: Clone the Repository
Clone the AgentHub repository:
git clone https://github.com/owulveryck/agenthub.git
cd agenthub
Step 5: Verify Project Structure
Let’s explore what we have:
ls -l
You should see:
drwxr-xr-x agents/           # Sample A2A agent implementations
drwxr-xr-x broker/           # A2A-compliant AgentHub broker server
drwxr-xr-x documentation/    # Complete A2A documentation
drwxr-xr-x events/           # Generated A2A protocol code
drwxr-xr-x internal/         # Internal packages and abstractions
-rw-r--r-- go.mod            # Go module definition
-rw-r--r-- Makefile         # Build automation
drwxr-xr-x proto/           # A2A protocol definitions
-rw-r--r-- README.md        # Project overview
Step 6: Initialize Go Module
Ensure Go modules are properly initialized:
go mod download
This downloads all required dependencies. You should see output about downloading packages.
Step 7: Generate Protocol Buffer Code
Generate the Go code from Protocol Buffer definitions:
make proto
You should see:
Generating protobuf code for A2A protocol definitions...
Generating proto/eventbus.proto...
Generating proto/a2a.proto...
Protobuf code generated successfully.
Verify the generated files exist:
ls events/
You should see:
a2a/          # A2A protocol definitions
eventbus/     # AgentHub broker definitions
ls events/a2a/
You should see:
a2a.pb.go
a2a_grpc.pb.go
ls events/eventbus/
You should see:
eventbus.pb.go
eventbus_grpc.pb.go
Step 8: Build All Components
Build the AgentHub components:
make build
You should see:
Building A2A-compliant server binary...
Building A2A publisher binary...
Building A2A subscriber binary...
Build complete. A2A-compliant binaries are in the 'bin/' directory.
Verify the binaries were created:
ls bin/
You should see:
agenthub-server  # A2A-compliant AgentHub broker
publisher        # A2A message publisher
subscriber       # A2A message subscriber
Verification Test
Let’s verify everything works by running a quick test.
Step 9: Test the Installation
Start the A2A-compliant broker server in the background:
./bin/agenthub-server &
You should see:
2025/09/28 10:00:00 A2A-compliant AgentHub broker gRPC server listening on [::]:50051
2025/09/28 10:00:00 AgentHub service ready for A2A protocol communication
Start an A2A subscriber agent:
./bin/subscriber
You should see:
A2A Agent started. Listening for A2A events and tasks. Press Enter to stop.
2025/09/28 10:00:05 A2A Agent agent_demo_subscriber subscribing to A2A tasks...
2025/09/28 10:00:05 Successfully subscribed to A2A tasks for agent agent_demo_subscriber. Waiting for A2A tasks...
Run the A2A publisher to send test tasks:
./bin/publisher
You should see A2A tasks being published and processed with conversation context and structured artifacts.
Clean up the test processes:
pkill -f agenthub-server
pkill -f subscriber
Development Environment Setup
Step 10: Configure Your Editor (Optional)
For VS Code users:
Install the Go extension:
- Open VS Code
- Go to Extensions (Ctrl+Shift+X)
- Search for “Go” and install the official Go extension
- Open the AgentHub project folder
For other editors:
Ensure your editor has Go language support and Protocol Buffer syntax highlighting.
Step 11: Set Up Environment Variables (Recommended)
AgentHub uses environment variables for configuration. Create a .envrc file for local development:
cat > .envrc << EOF
# Core A2A AgentHub Configuration
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"
export AGENTHUB_GRPC_PORT=":50051"
# A2A Protocol Configuration
export AGENTHUB_A2A_PROTOCOL_VERSION="1.0"
export AGENTHUB_MESSAGE_BUFFER_SIZE="100"
export AGENTHUB_CONTEXT_TIMEOUT="30s"
export AGENTHUB_ARTIFACT_MAX_SIZE="10MB"
# Health Check Ports
export AGENTHUB_HEALTH_PORT="8080"
export A2A_PUBLISHER_HEALTH_PORT="8081"
export A2A_SUBSCRIBER_HEALTH_PORT="8082"
# Observability (optional for development)
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export SERVICE_NAME="agenthub-dev"
export SERVICE_VERSION="dev"
export ENVIRONMENT="development"
export LOG_LEVEL="DEBUG"
EOF
Install direnv for automatic loading (recommended):
# macOS
brew install direnv
# Ubuntu/Debian
sudo apt install direnv
# After installation, add to your shell
echo 'eval "$(direnv hook bash)"' >> ~/.bashrc  # For bash
echo 'eval "$(direnv hook zsh)"' >> ~/.zshrc    # For zsh
Allow the environment file:
direnv allow
Alternative: Manual loading
source .envrc
For the complete environment variable reference, see the Environment Variables Reference.
Step 12: Verify Make Targets
Test all available make targets:
make help
You should see all available commands:
Makefile for gRPC Event Bus
Usage:
  make <target>
Targets:
  all              Builds all binaries (default).
  proto            Generates Go code from .proto files.
  build            Builds the server, publisher, and subscriber binaries.
  run-server       Runs the event bus gRPC server.
  run-publisher    Runs the publisher client.
  run-subscriber   Runs the subscriber client.
  clean            Removes generated Go files and build artifacts.
  help             Displays this help message.
Common Issues and Solutions
Issue: “protoc-gen-go: program not found”
Solution: Ensure Go bin directory is in your PATH:
export PATH=$PATH:$(go env GOPATH)/bin
echo 'export PATH=$PATH:$(go env GOPATH)/bin' >> ~/.bashrc
source ~/.bashrc
Issue: “go.mod not found”
Solution: Ensure you’re in the AgentHub project directory:
pwd  # Should show .../agenthub
ls go.mod  # Should exist
Issue: Port 50051 already in use
Solution: Kill existing processes or change the port:
lsof -ti:50051 | xargs kill -9
Issue: Permission denied on binaries
Solution: Make binaries executable:
chmod +x bin/*
Next Steps
Now that you have AgentHub installed and verified:
- Learn the basics: Follow the Running the Demo tutorial
- Build your first agent: Try Create a Subscriber
- Understand the concepts: Read The Agent2Agent Principle
Getting Help
If you encounter issues:
- Check the troubleshooting section above
- Review the complete documentation
- Open an issue on the GitHub repository
Congratulations! You now have a fully functional AgentHub development environment ready for building autonomous agent systems.
1.2 - Running the A2A-Compliant AgentHub Demo
Walk through setting up and running the complete A2A-compliant AgentHub EDA broker system. Learn how agents communicate using Agent2Agent protocol messages through the Event-Driven Architecture broker.
Running the A2A-Compliant AgentHub Demo
This tutorial will walk you through setting up and running the complete Agent2Agent (A2A) protocol-compliant AgentHub Event-Driven Architecture (EDA) broker system. By the end of this tutorial, you’ll have agents communicating using standardized A2A messages through the scalable EDA broker.
Prerequisites
- Go 1.24 or later installed
- Protocol Buffers compiler (protoc) installed
- Basic understanding of gRPC and message brokers
Step 1: Build the A2A-Compliant Components
First, let’s build all the A2A-compliant components using the Makefile:
# Build all A2A-compliant binaries (generates protobuf files first)
make build
This will:
- Generate A2A protocol files from proto/a2a_core.proto and proto/eventbus.proto
- Build the A2A-compliant broker, publisher, and subscriber binaries
- Place all binaries in the bin/ directory
You should see output like:
Building A2A-compliant server binary...
Building A2A-compliant publisher binary...
Building A2A-compliant subscriber binary...
Build complete. A2A-compliant binaries are in the 'bin/' directory.
Step 2: Start the AgentHub Broker Server
Open a terminal and start the AgentHub broker server:
./bin/agenthub-server
You should see output like:
time=2025-09-29T11:51:26.612+02:00 level=INFO msg="Starting health server" port=8080
time=2025-09-29T11:51:26.611+02:00 level=INFO msg="AgentHub gRPC server with observability listening" address=[::]:50051 health_endpoint=http://localhost:8080/health metrics_endpoint=http://localhost:8080/metrics component=broker
Keep this terminal open - the AgentHub broker needs to run continuously.
Step 3: Start an Agent (Subscriber)
Open a second terminal and start an agent that can receive and process tasks:
./bin/subscriber
You should see output indicating the agent has started:
time=2025-09-29T11:52:04.727+02:00 level=INFO msg="AgentHub client started with observability" broker_addr=localhost:50051 component=subscriber
time=2025-09-29T11:52:04.727+02:00 level=INFO msg="Starting health server" port=8082
time=2025-09-29T11:52:04.728+02:00 level=INFO msg="Agent started with observability. Listening for events and tasks."
time=2025-09-29T11:52:04.728+02:00 level=INFO msg="Subscribing to task results" agent_id=agent_demo_subscriber
time=2025-09-29T11:52:04.728+02:00 level=INFO msg="Subscribing to tasks" agent_id=agent_demo_subscriber
This agent can process several types of tasks:
- greeting: Simple greeting messages
- math_calculation: Basic arithmetic operations
- random_number: Random number generation
- Any unknown task type will be rejected
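To give a feel for what the subscriber does with these, here is a minimal sketch of a dispatcher with the same four behaviors. The function name, parameters, and return values are illustrative — the real agent works on A2A Message/Part structures, not plain strings:

```go
package main

import (
	"fmt"
	"math/rand"
)

// handleTask dispatches a task by type, mirroring the task types listed
// above. Everything except the task type names is illustrative.
func handleTask(taskType string, params map[string]float64) (string, error) {
	switch taskType {
	case "greeting":
		return "Hello from the demo agent!", nil
	case "math_calculation":
		return fmt.Sprintf("result: %v", params["a"]+params["b"]), nil
	case "random_number":
		return fmt.Sprintf("random: %d", rand.Intn(100)), nil
	default:
		// Unknown task types are rejected, as in the demo subscriber.
		return "", fmt.Errorf("unsupported task type: %s", taskType)
	}
}

func main() {
	out, err := handleTask("math_calculation", map[string]float64{"a": 2, "b": 3})
	fmt.Println(out, err)
	_, err = handleTask("unknown_task", nil)
	fmt.Println("rejected:", err != nil)
}
```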
Step 4: Send A2A-Compliant Tasks
Open a third terminal and run the publisher to send A2A protocol-compliant task messages:
./bin/publisher
You’ll see the publisher send various A2A-compliant task messages through the AgentHub EDA broker:
time=2025-09-29T14:41:11.237+02:00 level=INFO msg="Starting publisher demo"
time=2025-09-29T14:41:11.237+02:00 level=INFO msg="Testing Agent2Agent Task Publishing via AgentHub with observability"
time=2025-09-29T14:41:11.237+02:00 level=INFO msg="Publishing A2A task" task_id=task_greeting_1759149671 task_type=greeting responder_agent_id=agent_demo_subscriber context_id=ctx_greeting_1759149671
time=2025-09-29T14:41:11.242+02:00 level=INFO msg="A2A task published successfully" task_id=task_greeting_1759149671 task_type=greeting event_id=evt_msg_greeting_1759149671_1759149671
time=2025-09-29T14:41:11.242+02:00 level=INFO msg="Published greeting task" task_id=task_greeting_1759149671
time=2025-09-29T14:41:14.243+02:00 level=INFO msg="Publishing A2A task" task_id=task_math_calculation_1759149674 task_type=math_calculation responder_agent_id=agent_demo_subscriber context_id=ctx_math_calculation_1759149674
time=2025-09-29T14:41:14.247+02:00 level=INFO msg="A2A task published successfully" task_id=task_math_calculation_1759149674 task_type=math_calculation event_id=evt_msg_math_calculation_1759149674_1759149674
time=2025-09-29T14:41:16.248+02:00 level=INFO msg="Publishing A2A task" task_id=task_random_number_1759149676 task_type=random_number responder_agent_id=agent_demo_subscriber context_id=ctx_random_number_1759149676
time=2025-09-29T14:41:16.249+02:00 level=INFO msg="Published random number task" task_id=task_random_number_1759149676
Notice how the A2A implementation includes:
- Context IDs: Each task is grouped in a conversation context (ctx_greeting_...)
- Event IDs: EDA wrapper events have unique identifiers for tracing
- A2A Task Structure: Tasks use A2A-compliant Message and Part formats
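To make that structure concrete, here is an illustrative sketch using plain Go structs. These stand in for the generated protobuf types (the real ones are generated from the proto definitions and differ in detail):

```go
package main

import "fmt"

// Part and Message are illustrative stand-ins for the A2A concepts named
// above — not the generated protobuf types.
type Part struct {
	Text string
}

type Message struct {
	ContextID string // conversation context, e.g. "ctx_greeting_..."
	TaskID    string // associated task, e.g. "task_greeting_..."
	Role      string // "USER" (requester) or "AGENT" (responder)
	Parts     []Part
}

func main() {
	msg := Message{
		ContextID: "ctx_greeting_1759149671",
		TaskID:    "task_greeting_1759149671",
		Role:      "USER",
		Parts:     []Part{{Text: "Say hello to the demo subscriber"}},
	}
	fmt.Printf("%s message in %s with %d part(s)\n", msg.Role, msg.ContextID, len(msg.Parts))
}
```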
Step 5: Observe A2A Task Processing
Switch back to the subscriber terminal to see the agent processing A2A tasks in real-time:
time=2025-09-29T14:41:11.243+02:00 level=INFO msg="Task processing completed" task_id=task_greeting_1759149671 status=TASK_STATE_COMPLETED has_artifact=true
time=2025-09-29T14:41:14.253+02:00 level=INFO msg="Task processing completed" task_id=task_math_calculation_1759149674 status=TASK_STATE_COMPLETED has_artifact=true
time=2025-09-29T14:41:16.249+02:00 level=INFO msg="Task processing completed" task_id=task_random_number_1759149676 status=TASK_STATE_COMPLETED has_artifact=true
Notice the A2A-compliant processing:
- Task States: Using A2A standard states (TASK_STATE_COMPLETED)
- Artifacts: Each completed task generates A2A artifacts (has_artifact=true)
- Structured Processing: Tasks are processed using A2A Message and Part handlers
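The states in these logs follow the A2A task lifecycle (SUBMITTED → WORKING → COMPLETED, with a failure path). A minimal sketch of validating such transitions — the state names mirror the TASK_STATE_* convention from the logs, while the transition table and helper are illustrative:

```go
package main

import "fmt"

// transitions lists the allowed next states for each A2A-style task state.
// The table is illustrative; AgentHub's broker enforces this differently.
var transitions = map[string][]string{
	"TASK_STATE_SUBMITTED": {"TASK_STATE_WORKING"},
	"TASK_STATE_WORKING":   {"TASK_STATE_COMPLETED", "TASK_STATE_FAILED"},
}

// canTransition reports whether moving between two task states is allowed.
func canTransition(from, to string) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("TASK_STATE_SUBMITTED", "TASK_STATE_WORKING"))
	// Completed is terminal: no further transitions are allowed.
	fmt.Println(canTransition("TASK_STATE_COMPLETED", "TASK_STATE_WORKING"))
}
```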
Step 6: Check the Broker Logs
In the first terminal (broker server), you’ll see logs showing message routing:
2025/09/27 16:34:33 Received task request: task_greeting_1758983673 (type: greeting) from agent: agent_demo_publisher
2025/09/27 16:34:35 Received task result for task: task_greeting_1758983673 from agent: agent_demo_subscriber
2025/09/27 16:34:35 Received task progress for task: task_greeting_1758983673 (100%) from agent: agent_demo_subscriber
Understanding What Happened
1. A2A Message Creation: The publisher created A2A-compliant messages with:
   - Message Structure: Using A2A Message format with Part content
   - Context Grouping: Each task belongs to a conversation context
   - Task Association: Messages are linked to specific A2A tasks
   - Role Definition: Messages specify USER (requester) or AGENT (responder) roles
2. EDA Event Routing: The AgentHub EDA broker:
   - Wrapped A2A Messages: A2A messages wrapped in AgentEvent for EDA transport
   - Event-Driven Routing: Used EDA patterns for scalable message delivery
   - Task Storage: Stored A2A tasks with full message history and artifacts
   - Status Tracking: Managed A2A task lifecycle (SUBMITTED → WORKING → COMPLETED)
3. A2A Task Processing: The subscriber agent:
   - A2A Task Reception: Received A2A tasks via EDA event streams
   - Message Processing: Processed A2A Message content using Part handlers
   - Artifact Generation: Generated structured A2A artifacts as task output
   - Status Updates: Published A2A-compliant status updates through EDA events
4. Hybrid Architecture Benefits:
   - A2A Compliance: Full interoperability with other A2A-compliant systems
   - EDA Scalability: Event-driven patterns for high-throughput scenarios
   - Standards-Based: Using industry-standard Agent2Agent protocol
   - Observable: Built-in tracing and metrics for production deployment
Next Steps
Now that you have the basic system working, you can:
- Create Multiple Agents: Run multiple subscriber instances with different agent IDs to see task distribution
- Add Custom Task Types: Modify the subscriber to handle new types of tasks
- Build a Request-Response Flow: Create an agent that both requests and processes tasks
- Monitor Task Progress: Build a dashboard that subscribes to task progress updates
Troubleshooting
Port Already in Use: If you see “bind: address already in use”, kill any existing processes:
lsof -ti:50051 | xargs kill -9
Agent Not Receiving Tasks: Ensure the agent ID in the publisher matches the subscriber’s agent ID (agent_demo_subscriber).
Build Errors: Regenerate A2A protocol buffer files and ensure all imports are correct:
# Clean old protobuf files
make clean
# Regenerate A2A protobuf files
make proto
# Rebuild everything
make build
A2A Compliance Issues: Verify A2A protocol structures are correctly generated:
# Check A2A core types
ls events/a2a/
# Should show: a2a_core.pb.go eventbus.pb.go eventbus_grpc.pb.go
You now have a working A2A-compliant AgentHub EDA broker system! The agents can exchange standardized A2A messages, maintain conversation contexts, generate structured artifacts, and track task lifecycles - all through your scalable Event-Driven Architecture broker with full Agent2Agent protocol compliance.
2 - Observability
Tutorials for monitoring and observing AgentHub systems
Observability Tutorials
Learn how to monitor, trace, and observe your AgentHub deployments with comprehensive observability features.
Available Tutorials
2.1 - Interactive Dashboard Tour
Take a guided tour through AgentHub’s Grafana dashboards while the system is running, learning to interpret metrics, identify issues, and understand system behavior in real-time.
Interactive Dashboard Tour
Learn by doing: Take a guided tour through AgentHub’s Grafana dashboards while the system is running, learning to interpret metrics, identify issues, and understand system behavior in real-time.
Prerequisites
- Observability stack running (from the Observability Demo)
- Observable agents running (broker, publisher, subscriber)
- Grafana open at http://localhost:3333
- 10-15 minutes for the complete tour
Quick Setup Reminder
If you haven’t completed the observability demo yet:
# Start observability stack
cd agenthub/observability
docker-compose up -d
# Run observable agents (3 terminals)
go run broker/main.go
go run agents/subscriber/main.go
go run agents/publisher/main.go
Dashboard Navigation
Accessing the Main Dashboard
- Open Grafana: http://localhost:3333
- Login: admin / admin (skip password change for demo)
- Navigate: Dashboards → Browse → AgentHub → “AgentHub EDA System Observatory”
- Bookmark: Save this URL for quick access: http://localhost:3333/d/agenthub-eda-dashboard
Dashboard Layout Overview
The dashboard is organized in 4 main rows:
Row 1: Event Processing Overview
├── Event Processing Rate (events/sec)
└── Event Processing Error Rate (%)
Row 2: Event Analysis
├── Event Types Distribution (pie chart)
└── Event Processing Latency (p50, p95, p99)
Row 3: Distributed Tracing
└── Jaeger Integration Panel
Row 4: System Health
├── Service CPU Usage (%)
├── Service Memory Usage (MB)
├── Go Goroutines Count
└── Service Health Status
Interactive Tour
Tour 1: Understanding Event Flow (3 minutes)
Step 1: Watch the Event Processing Rate
Location: Top-left panel
What to observe: Real-time lines showing events per second
- Identify the services:
  - Green line: agenthub-broker (should be highest - processes all events)
  - Blue line: agenthub-publisher (events being created)
  - Orange line: agenthub-subscriber (events being processed)
- Watch the pattern:
  - Publisher creates bursts of events
  - Broker immediately processes them (routing)
  - Subscriber processes them shortly after
- Understand the flow:
  Publisher (creates) → Broker (routes) → Subscriber (processes)
       50/sec       →       150/sec     →       145/sec

💡 Tour Insight: The broker rate is higher because it processes both incoming tasks AND outgoing results.
Step 2: Monitor Error Rates
Location: Top-right panel (gauge)
What to observe: Error percentage gauge
- Healthy system: Should show 0-2% (green zone)
- If you see higher errors:
  - Check if all services are running
  - Look for red traces in Jaeger (we’ll do this next)
- Error rate calculation:
  Error Rate = (Failed Events / Total Events) × 100

🎯 Action: Note your current error rate - we’ll compare it later.
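The gauge's formula is simple to express in code. A small sketch (the function name is illustrative, not part of AgentHub):

```go
package main

import "fmt"

// errorRate implements the gauge's formula: failed events as a
// percentage of total events.
func errorRate(failed, total int) float64 {
	if total == 0 {
		return 0 // no traffic means nothing to report
	}
	return float64(failed) / float64(total) * 100
}

func main() {
	// 3 failures out of 150 events stays well inside the green zone.
	fmt.Printf("error rate: %.1f%%\n", errorRate(3, 150))
}
```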
Tour 2: Event Analysis Deep Dive (3 minutes)
Step 3: Explore Event Types
Location: Middle-left panel (pie chart)
What to observe: Distribution of different event types
- Identify event types:
  - greeting: Most common (usually 40-50%)
  - math_calculation: Compute-heavy tasks (30-40%)
  - random_number: Quick tasks (15-25%)
  - unknown_task: Error-generating tasks (2-5%)
- Business insights:
  - Larger slices = more frequent tasks
  - Small red slice = intentional error tasks for testing

💡 Tour Insight: The publisher randomly generates different task types to simulate real-world workload diversity.
Step 4: Analyze Processing Latency
Location: Middle-right panel
What to observe: Three latency lines (p50, p95, p99)
- Understand percentiles:
  - p50 (blue): 50% of events process faster than this
  - p95 (green): 95% of events process faster than this
  - p99 (red): 99% of events process faster than this
- Healthy ranges:
  - p50: < 50ms (very responsive)
  - p95: < 200ms (good performance)
  - p99: < 500ms (acceptable outliers)
- Pattern recognition:
  - Spiky p99 = occasional slow tasks (normal)
  - Rising p50 = systemic slowdown (investigate)
  - Flat lines = no activity or measurement issues

🎯 Action: Hover over the lines to see exact values at different times.
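Percentiles like these come from sorting a latency sample and picking the value at the matching rank. A nearest-rank sketch — illustrative, not the dashboard's exact method (Prometheus estimates quantiles from histogram buckets):

```go
package main

import (
	"fmt"
	"sort"
)

// percentile returns the nearest-rank percentile of a latency sample.
func percentile(latenciesMs []float64, p float64) float64 {
	sorted := append([]float64(nil), latenciesMs...)
	sort.Float64s(sorted)
	// Nearest-rank: ceil(p/100 * n), converted to a zero-based index.
	rank := int(p/100*float64(len(sorted))+0.5) - 1
	if rank < 0 {
		rank = 0
	}
	if rank >= len(sorted) {
		rank = len(sorted) - 1
	}
	return sorted[rank]
}

func main() {
	// A sample with two slow outliers, like an occasional spiky p99.
	sample := []float64{12, 15, 11, 240, 18, 14, 16, 13, 490, 17}
	fmt.Printf("p50=%.0fms p95=%.0fms p99=%.0fms\n",
		percentile(sample, 50), percentile(sample, 95), percentile(sample, 99))
}
```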
Tour 3: Distributed Tracing Exploration (4 minutes)
Step 5: Jump into Jaeger
Location: Middle section - “Distributed Traces” panel
Action: Click the “Explore” button
This opens Jaeger in a new tab. Let’s explore:
- In Jaeger UI:
  - Service dropdown: Select “agenthub-broker”
  - Operation: Leave as “All”
  - Click “Find Traces”
- Pick a trace to examine:
  - Look for traces that show multiple spans
  - Click on any trace line to open details
- Understand the trace structure - Timeline View:
  agenthub-publisher: publish_event [2ms]
  └── agenthub-broker: process_event [1ms]
      └── agenthub-subscriber: consume_event [3ms]
          └── agenthub-subscriber: process_task [15ms]
              └── agenthub-subscriber: publish_result [2ms]
- Explore span details - Click individual spans to see:
  - Tags: event_type, event_id, agent names
  - Process: Which service handled the span
  - Duration: Exact timing information

💡 Tour Insight: Each event creates a complete “trace” showing its journey from creation to completion.
Step 6: Find and Analyze an Error
- Search for error traces:
  - In Jaeger, add the tag filter: error=true
  - Or look for traces with red spans
- Examine the error trace:
  - Red spans indicate errors
  - Error tags show the error type and message
  - Stack traces help with debugging
- Follow the error propagation:
  - See how errors affect child spans
  - Notice error context in span attributes

🎯 Action: Find a trace with “unknown_task” event type - these are designed to fail for demonstration.
Tour 4: System Health Monitoring (3 minutes)
Step 7: Monitor Resource Usage
Location: Bottom row panels
What to observe: System resource consumption
- CPU Usage Panel (Bottom-left):
  - Normal range: 10-50% for demo workload
  - Watch for: Sustained high CPU (>70%)
  - Services comparison: See which service uses most CPU
- Memory Usage Panel (Bottom-center-left):
  - Normal range: 30-80MB per service for demo
  - Watch for: Continuously growing memory (memory leaks)
  - Pattern: Sawtooth = normal GC, steady growth = potential leak
- Goroutines Panel (Bottom-center-right):
  - Normal range: 10-50 goroutines per service
  - Watch for: Continuously growing count (goroutine leaks)
  - Pattern: Stable baseline with activity spikes
Step 8: Verify Service Health
Location: Bottom-right panel
What to observe: Service up/down status
- Health indicators:
  - Green: Service healthy and responding
  - Red: Service down or health check failing
  - Yellow: Service degraded but operational
- Health check details:
  - Each service exposes a /health endpoint
  - Prometheus monitors these endpoints
  - Dashboard shows aggregated status

🎯 Action: Open http://localhost:8080/health in a new tab to see raw health data.
Tour 5: Time-based Analysis (2 minutes)
Step 9: Change Time Ranges
Location: Top-right of dashboard (time picker)
Current: Likely showing “Last 5 minutes”
- Try different ranges:
  - Last 15 minutes: See longer trends
  - Last 1 hour: See full demo session
  - Custom range: Pick a specific time period
- Observe pattern changes:
  - Longer ranges show trends and patterns
  - Shorter ranges show real-time detail
  - Custom ranges zoom into specific incidents
Step 10: Use Dashboard Filters
Location: Top of dashboard - variable dropdowns
- Service Filter:
  - Select “All” to see everything
  - Pick a specific service to focus analysis
  - Useful for isolating service-specific issues
- Event Type Filter:
  - Filter to specific event types
  - Compare performance across task types
  - Identify problematic event categories

💡 Tour Insight: Filtering helps you drill down from the system-wide view to specific components or workloads.
Hands-on Experiments
Experiment 1: Create a Service Outage
Goal: See how the dashboard shows service failures
- Stop the subscriber:
  # In the subscriber terminal, press Ctrl+C
- Watch the dashboard changes:
  - Error rate increases (top-right gauge turns red)
  - Subscriber metrics disappear from bottom panels
  - Service health shows the subscriber as down
- Check Jaeger for failed traces:
  - Look for traces that don’t complete
  - See where the chain breaks
- Restart the subscriber:
  go run agents/subscriber/main.go

🎯 Learning: The dashboard immediately shows the impact of service failures.
Experiment 2: Generate High Load
Goal: See system behavior under stress
- Modify the publisher to generate more events:
  # Edit agents/publisher/main.go
  # Change: time.Sleep(5 * time.Second)
  # To:     time.Sleep(1 * time.Second)
- Watch the dashboard changes:
  - Processing rate increases
  - Latency may increase
  - CPU/memory usage grows
- Observe scaling behavior:
  - How does the system handle increased load?
  - Do error rates increase?
  - Where are the bottlenecks?

🎯 Learning: The dashboard shows the system’s performance characteristics under load.
Dashboard Interpretation Guide
What Good Looks Like
✅ Event Processing Rate: Steady activity matching workload
✅ Error Rate: < 5% (green zone)
✅ Event Types: Expected distribution
✅ Latency: p95 < 200ms, p99 < 500ms
✅ CPU Usage: < 50% sustained
✅ Memory: Stable or slow growth with GC cycles
✅ Goroutines: Stable baseline with activity spikes
✅ Service Health: All services green/up
Warning Signs
⚠️ Error Rate: 5-10% (yellow zone)
⚠️ Latency: p95 > 200ms or rising trend
⚠️ CPU: Sustained > 70%
⚠️ Memory: Continuous growth without GC
⚠️ Missing data: Gaps in metrics (service issues)
Critical Issues
🚨 Error Rate: > 10% (red zone)
🚨 Latency: p95 > 500ms
🚨 CPU: Sustained > 90%
🚨 Memory: Rapid growth or OOM
🚨 Service Health: Any service showing red/down
🚨 Traces: Missing or broken trace chains
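These bands can be read as a simple severity ladder. An illustrative sketch encoding the error-rate thresholds above (the function name and labels are not part of AgentHub):

```go
package main

import "fmt"

// classifyErrorRate maps an error-rate percentage onto the dashboard's
// green/yellow/red zones using the thresholds listed above.
func classifyErrorRate(pct float64) string {
	switch {
	case pct > 10:
		return "critical" // red zone
	case pct >= 5:
		return "warning" // yellow zone
	default:
		return "ok" // green zone
	}
}

func main() {
	for _, r := range []float64{1.5, 7.0, 12.0} {
		fmt.Printf("%.1f%% -> %s\n", r, classifyErrorRate(r))
	}
}
```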
Next Steps After the Tour
For Daily Operations:
- Bookmark: Save dashboard URL for quick access
- Set up alerts: Configure notifications for critical metrics
- Create views: Use filters to create focused views for your team
For Development:
For Deep Understanding:
Troubleshooting Tour Issues
| Issue | Solution | 
|---|---|
| Dashboard shows no data | Verify observability environment variables are set | 
| Grafana won’t load | Check docker-compose psin observability/ | 
| Metrics missing | Verify Prometheus targets at http://localhost:9090/targets | 
| Jaeger empty | Ensure trace context propagation is working | 
Congratulations! You’ve completed the interactive dashboard tour and learned to read AgentHub’s observability signals like a pro!
🎯 Ready for More?
Master the Tools: Use Grafana Dashboards - Advanced dashboard usage
Troubleshoot Issues: Debug with Distributed Tracing - Use Jaeger effectively
2.2 - AgentHub Observability Demo Tutorial
Experience the complete observability stack with distributed tracing, real-time metrics, and intelligent alerting in under 10 minutes through hands-on learning.
AgentHub Observability Demo Tutorial
Learn by doing: Experience the complete observability stack with distributed tracing, real-time metrics, and intelligent alerting in under 10 minutes.
What You’ll Learn
By the end of this tutorial, you’ll have:
- ✅ Seen distributed traces flowing across multiple agents
- ✅ Monitored real-time metrics in beautiful Grafana dashboards
- ✅ Understood event correlation through trace IDs
- ✅ Experienced intelligent alerting when things go wrong
- ✅ Explored the complete observability stack components
Prerequisites
- Go 1.24+ installed
- Docker and Docker Compose installed
- Environment variables configured (see Installation and Setup)
- 10 minutes of your time
- Basic terminal knowledge
💡 Environment Note: AgentHub agents automatically enable observability when JAEGER_ENDPOINT is configured. See Environment Variables Reference for all configuration options.
Step 1: Clone and Setup (1 minute)
# Clone the repository
git clone https://github.com/owulveryck/agenthub.git
cd agenthub
# Verify you have the observability files
ls observability/
# You should see: docker-compose.yml, grafana/, prometheus/, etc.
Step 2: Start the Observability Stack (2 minutes)
# Navigate to observability directory
cd observability
# Start all monitoring services
docker-compose up -d
# Verify services are running
docker-compose ps
Expected Output:
NAME                    COMMAND                  SERVICE             STATUS
agenthub-grafana        "/run.sh"                grafana             running
agenthub-jaeger         "/go/bin/all-in-one"     jaeger              running
agenthub-prometheus     "/bin/prometheus --c…"   prometheus          running
agenthub-otel-collector "/otelcol-contrib --…"   otel-collector      running
🎯 Checkpoint 1: All services should be “running”. If not, check Docker logs: docker-compose logs <service-name>
Step 3: Access the Dashboards (1 minute)
Open these URLs in your browser (keep them open in tabs):
| Service | URL | Purpose | 
|---|---|---|
| Grafana | http://localhost:3333 | Main observability dashboard | 
| Jaeger | http://localhost:16686 | Distributed tracing | 
| Prometheus | http://localhost:9090 | Raw metrics and alerts | 
Grafana Login: admin / admin (skip password change for demo)
🎯 Checkpoint 2: You should see Grafana’s welcome page and Jaeger’s empty trace list.
Step 4: Start the Observable Broker (1 minute)
Open a new terminal and navigate back to the project root:
# From agenthub root directory
go run broker/main.go
Expected Output:
time=2025-09-28T21:00:00.000Z level=INFO msg="Starting health server on port 8080"
time=2025-09-28T21:00:00.000Z level=INFO msg="AgentHub broker gRPC server with observability listening" address="[::]:50051" health_endpoint="http://localhost:8080/health" metrics_endpoint="http://localhost:8080/metrics"
🎯 Checkpoint 3:
- Broker is listening on port 50051
- Health endpoint available at http://localhost:8080/health
- Metrics endpoint available at http://localhost:8080/metrics
Step 5: Start the Observable Subscriber (1 minute)
Open another terminal:
go run agents/subscriber/main.go
Expected Output:
time=2025-09-28T21:00:01.000Z level=INFO msg="Starting health server on port 8082"
time=2025-09-28T21:00:01.000Z level=INFO msg="Starting observable subscriber"
time=2025-09-28T21:00:01.000Z level=INFO msg="Agent started with observability. Listening for events and tasks."
π― Checkpoint 4:
- Subscriber is connected and listening
- Health available at http://localhost:8082/health
Step 6: Generate Events with the Publisher (2 minutes)
Open a third terminal:
go run agents/publisher/main.go
Expected Output:
time=2025-09-28T21:00:02.000Z level=INFO msg="Starting health server on port 8081"
time=2025-09-28T21:00:02.000Z level=INFO msg="Starting observable publisher demo"
time=2025-09-28T21:00:02.000Z level=INFO msg="Publishing task" task_id=task_greeting_1727557202 task_type=greeting responder_agent_id=agent_demo_subscriber
time=2025-09-28T21:00:02.000Z level=INFO msg="Task published successfully" task_id=task_greeting_1727557202 task_type=greeting
π― Checkpoint 5: You should see:
- Publisher creating and sending tasks
- Subscriber receiving and processing tasks
- Broker routing messages between them
Step 7: Explore Real-time Metrics in Grafana (2 minutes)
- Go to Grafana: http://localhost:3333
- Navigate to Dashboards → Browse → AgentHub → “AgentHub EDA System Observatory”
- Observe the real-time data:
What You’ll See:
Event Processing Rate (Top Left)
- Lines showing events/second for each service
- Should show activity spikes when publisher runs
Error Rate (Top Right)
- Gauge showing error percentage
- Should be green (< 5% errors)
Event Types Distribution (Middle Left)
- Pie chart showing task types: greeting, math_calculation, random_number
- Different colors for each task type
Processing Latency (Middle Right)
- Three lines: p50, p95, p99 latencies
- Should show sub-second processing times
System Health (Bottom)
- CPU usage, memory usage, goroutines
- Service health status (all should be UP)
π― Checkpoint 6: Dashboard should show live metrics with recent activity.
Step 8: Explore Distributed Traces in Jaeger (2 minutes)
- Go to Jaeger: http://localhost:16686
- Select Service: Choose “agenthub-broker” from dropdown
- Click “Find Traces”
- Click on any trace to see details
What You’ll See:
Complete Event Journey:
agenthub-publisher: publish_event (2ms)
  └── agenthub-broker: process_event (1ms)
      └── agenthub-subscriber: consume_event (5ms)
          └── agenthub-subscriber: process_task (15ms)
              └── agenthub-subscriber: publish_result (2ms)
Trace Details:
- Span Tags: event_id, event_type, service names
- Timing Information: Exact start/end times and durations
- Log Correlation: Each span linked to structured logs
Error Detection:
- Look for red spans indicating errors
- Trace the “unknown_task” type to see how errors propagate
π― Checkpoint 7: You should see complete traces showing the full event lifecycle.
Step 9: Correlate Logs with Traces (1 minute)
- Copy a trace ID from Jaeger (the long hex string) 
- Check broker logs for that trace ID. In your broker terminal, look for lines like:
time=2025-09-28T21:00:02.000Z level=INFO msg="Received task request" task_id=task_greeting_1727557202 trace_id=a1b2c3d4e5f6...
- Check subscriber logs for the same trace ID 
π― Checkpoint 8: You should find the same trace_id in logs across multiple services.
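If you capture each component's output to a file (the `tee` redirection and the `broker.log`/`subscriber.log` names below are just one convention, not something the repository sets up for you), cross-service correlation becomes a one-line grep:

```shell
# Works if you started each component with e.g.:
#   go run broker/main.go 2>&1 | tee broker.log
TRACE_ID=a1b2c3d4e5f6   # paste the trace ID you copied from Jaeger
grep -h "trace_id=${TRACE_ID}" broker.log subscriber.log 2>/dev/null || true
```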
Step 10: Experience Intelligent Alerting (Optional)
To see alerting in action:
- Simulate errors by stopping the subscriber (press Ctrl+C in its terminal)
- Keep the publisher running (its tasks will go unprocessed)
- Check Prometheus alerts at http://localhost:9090/alerts
- After ~5 minutes, you should see “HighEventProcessingErrorRate” firing
- Restart the subscriber to clear the alert
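You can also query the same data over Prometheus's HTTP API instead of the UI; `/api/v1/alerts` and `/api/v1/query` are part of the standard Prometheus API (the `up` metric is built in and shows which scrape targets are healthy):

```shell
# List currently firing alerts
curl -s http://localhost:9090/api/v1/alerts || echo "Prometheus is not reachable"
# Which scrape targets are up? (1 = up, 0 = down)
curl -s 'http://localhost:9090/api/v1/query?query=up' || echo "Prometheus is not reachable"
```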
π Congratulations!
You’ve successfully experienced the complete AgentHub observability stack!
Summary: What You Accomplished
- ✅ Deployed a complete observability stack with Docker Compose
- ✅ Ran observable agents with automatic instrumentation
- ✅ Monitored real-time metrics in Grafana dashboards
- ✅ Traced event flows across multiple services with Jaeger
- ✅ Correlated logs with traces using trace IDs
- ✅ Experienced intelligent alerting with Prometheus
- ✅ Understood the complete event lifecycle from publisher to subscriber
Key Observability Concepts You Learned
Distributed Tracing
- Events get unique trace IDs that follow them everywhere
- Each processing step creates a “span” with timing information
- Complete request flows are visible across service boundaries
Metrics Collection
- 47+ different metrics automatically collected
- Real-time visualization of system health and performance
- Historical data for trend analysis
Structured Logging
- All logs include trace context for correlation
- Consistent format across all services
- Easy debugging and troubleshooting
Intelligent Alerting
- Proactive monitoring for error rates and performance
- Automatic notifications when thresholds are exceeded
- Helps prevent issues before they impact users
Next Steps
For Development:
For Operations:
For Understanding:
Troubleshooting
| Issue | Solution | 
|---|---|
| Services won’t start | Run docker-compose down && docker-compose up -d | 
| No metrics in Grafana | Check Prometheus targets: http://localhost:9090/targets | 
| No traces in Jaeger | Verify JAEGER_ENDPOINT environment variable is set correctly | 
| Permission errors | Ensure Docker has proper permissions | 
Clean Up
When you’re done exploring:
# Stop the observability stack
cd observability
docker-compose down
# Stop the Go applications
# Press Ctrl+C in each terminal running the agents
π― Ready for More?
Production Usage: Add Observability to Your Agent
Deep Understanding: Distributed Tracing Explained
3 - Workflows
Tutorials for building complex multi-agent workflows
Workflow Tutorials
Learn to design and implement sophisticated multi-agent workflows and orchestration patterns.
Available Tutorials
3.1 - Building Multi-Agent Workflows
Learn to create complex workflows involving multiple specialized agents working together to accomplish sophisticated tasks. Build a real document processing pipeline with multiple agents handling different stages.
Building Multi-Agent Workflows
This advanced tutorial teaches you to create complex workflows involving multiple specialized agents working together to accomplish sophisticated tasks. You’ll build a real document processing pipeline with multiple agents handling different stages.
What You’ll Build
By the end of this tutorial, you’ll have an A2A-compliant multi-agent system that:
- Ingests documents through an A2A Document Intake Agent
- Validates content using an A2A Validation Agent
- Extracts metadata with an A2A Metadata Extraction Agent
- Processes text through an A2A Text Processing Agent
- Generates summaries using an A2A Summary Agent
- Orchestrates the workflow with an A2A Workflow Coordinator Agent
This demonstrates real-world A2A agent collaboration patterns with conversation context, structured message content, and artifact-based results used in production systems.
Prerequisites
Architecture Overview
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  A2A Workflow   │    │   AgentHub      │    │ A2A Specialized │
│  Coordinator    │    │  A2A Broker     │    │     Agents      │
│                 │    │                 │    │                 │
│ • A2A context   │───►│ • Routes A2A    │───►│ • Document      │
│   management    │    │   messages      │    │   Intake        │
│ • Conversation  │    │ • Tracks A2A    │    │ • Validation    │
│   threading     │    │   conversations │    │ • Metadata      │
│ • Artifact      │    │ • Manages A2A   │    │ • Text Proc     │
│   aggregation   │    │   state         │    │ • Summary       │
└─────────────────┘    └─────────────────┘    └─────────────────┘
Step 1: Create the Workflow Coordinator
First, let’s create the main coordinator that manages the document processing pipeline.
Create the coordinator agent:
mkdir -p agents/coordinator
Create agents/coordinator/main.go:
package main
import (
	"context"
	"fmt"
	"log"
	"time"
	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	a2a "github.com/owulveryck/agenthub/events/a2a"
	pb "github.com/owulveryck/agenthub/events/eventbus"
)
const (
	agentHubAddr = "localhost:50051"
	agentID      = "a2a_workflow_coordinator"
)
type A2ADocumentWorkflow struct {
	DocumentID    string
	ContextID     string                 // A2A conversation context
	Status        string
	CurrentStage  string
	TaskHistory   []*a2a.Task           // Complete A2A task history
	Artifacts     []*a2a.Artifact       // Collected artifacts from stages
	StartTime     time.Time
	client        pb.AgentHubClient      // A2A-compliant client
}
func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewAgentHubClient(conn)
	coordinator := &A2AWorkflowCoordinator{
		client:        client,
		workflows:     make(map[string]*A2ADocumentWorkflow),
	}
	ctx := context.Background()
	// Start listening for A2A task events
	go coordinator.subscribeToA2AEvents(ctx)
	// Start processing documents with A2A workflow
	coordinator.startA2ADocumentProcessing(ctx)
	// Keep running
	select {}
}
type A2AWorkflowCoordinator struct {
	client    pb.AgentHubClient
	workflows map[string]*A2ADocumentWorkflow
}
func (wc *A2AWorkflowCoordinator) startA2ADocumentProcessing(ctx context.Context) {
	// Simulate document arrival with A2A structured content
	documents := []map[string]interface{}{
		{
			"document_id": "doc_001",
			"content":     "This is a sample business document about quarterly results.",
			"filename":    "q3_results.txt",
			"source":      "email_attachment",
			"doc_type":    "business_report",
		},
		{
			"document_id": "doc_002",
			"content":     "Technical specification for the new API endpoints and authentication mechanisms.",
			"filename":    "api_spec.txt",
			"source":      "file_upload",
			"doc_type":    "technical_spec",
		},
	}
	for _, doc := range documents {
		wc.processA2ADocument(ctx, doc)
		time.Sleep(5 * time.Second)
	}
}
func (wc *A2AWorkflowCoordinator) processA2ADocument(ctx context.Context, document map[string]interface{}) {
	documentID := document["document_id"].(string)
	contextID := fmt.Sprintf("doc_workflow_%s_%s", documentID, uuid.New().String())
	workflow := &A2ADocumentWorkflow{
		DocumentID:   documentID,
		ContextID:    contextID,
		Status:       "started",
		CurrentStage: "intake",
		TaskHistory:  make([]*a2a.Task, 0),
		Artifacts:    make([]*a2a.Artifact, 0),
		StartTime:    time.Now(),
		client:       wc.client,
	}
	wc.workflows[documentID] = workflow
	log.Printf("Starting A2A document processing workflow for %s with context %s", documentID, contextID)
	// Stage 1: A2A Document Intake (target must match the intake agent's agentID)
	wc.publishA2ATask(ctx, "document_intake", document, "document_intake_agent", workflow)
}
func (wc *A2AWorkflowCoordinator) publishA2ATask(ctx context.Context, taskDescription string, params map[string]interface{}, targetAgent string, workflow *A2ADocumentWorkflow) {
	taskID := fmt.Sprintf("task_%s_%s", taskDescription, uuid.New().String())
	messageID := fmt.Sprintf("msg_%d_%s", time.Now().Unix(), uuid.New().String())
	// Create A2A structured content
	paramsData, err := structpb.NewStruct(params)
	if err != nil {
		log.Printf("Error creating parameters: %v", err)
		return
	}
	// Create A2A message with structured parts
	requestMessage := &a2a.Message{
		MessageId: messageID,
		ContextId: workflow.ContextID,
		TaskId:    taskID,
		Role:      a2a.Role_USER,
		Content: []*a2a.Part{
			{
				Part: &a2a.Part_Text{
					Text: fmt.Sprintf("Please process %s for document %s", taskDescription, workflow.DocumentID),
				},
			},
			{
				Part: &a2a.Part_Data{
					Data: &a2a.DataPart{
						Data:        paramsData,
						Description: fmt.Sprintf("%s parameters", taskDescription),
					},
				},
			},
		},
	}
	// Create A2A task
	task := &a2a.Task{
		Id:        taskID,
		ContextId: workflow.ContextID,
		Status: &a2a.TaskStatus{
			State:     a2a.TaskState_TASK_STATE_SUBMITTED,
			Update:    requestMessage,
			Timestamp: timestamppb.Now(),
		},
		History: []*a2a.Message{requestMessage},
		Metadata: paramsData,
	}
	// Store in workflow history
	workflow.TaskHistory = append(workflow.TaskHistory, task)
	// Publish A2A task update
	req := &pb.PublishTaskUpdateRequest{
		Task: task,
		Routing: &pb.AgentEventMetadata{
			FromAgentId: agentID,
			ToAgentId:   targetAgent,
			EventType:   "task.submitted",
			Priority:    pb.Priority_PRIORITY_MEDIUM,
		},
	}
	log.Printf("Publishing A2A %s task for workflow %s in context %s", taskDescription, workflow.DocumentID, workflow.ContextID)
	_, err = wc.client.PublishTaskUpdate(ctx, req)
	if err != nil {
		log.Printf("Error publishing A2A task: %v", err)
	}
}
func (wc *A2AWorkflowCoordinator) subscribeToA2AEvents(ctx context.Context) {
	req := &pb.SubscribeToTaskResultsRequest{
		RequesterAgentId: agentID,
	}
	stream, err := wc.client.SubscribeToTaskResults(ctx, req)
	if err != nil {
		log.Printf("Error subscribing to results: %v", err)
		return
	}
	for {
		result, err := stream.Recv()
		if err != nil {
			log.Printf("Error receiving result: %v", err)
			return
		}
		wc.handleTaskResult(ctx, result)
	}
}
func (wc *A2AWorkflowCoordinator) handleTaskResult(ctx context.Context, result *pb.TaskResult) {
	params := result.GetResult().AsMap()
	// Agents echo back the document_id we sent; use safe assertions so a
	// malformed result cannot panic the coordinator.
	documentID, _ := params["document_id"].(string)
	stage, _ := params["stage"].(string)
	workflow, exists := wc.workflows[documentID]
	if !exists {
		log.Printf("Unknown document ID in result: %s", documentID)
		return
	}
	log.Printf("Received result for workflow %s, stage %s: %s",
		documentID, stage, result.GetStatus().String())
	if result.GetStatus() == pb.TaskStatus_TASK_STATUS_FAILED {
		workflow.Status = "failed"
		log.Printf("Workflow %s failed at stage %s: %s",
			documentID, stage, result.GetErrorMessage())
		return
	}
	// Advance to the next stage, forwarding this stage's output
	wc.advanceWorkflow(ctx, workflow, stage, params)
}
func (wc *A2AWorkflowCoordinator) advanceWorkflow(ctx context.Context, workflow *A2ADocumentWorkflow, completedStage string, data map[string]interface{}) {
	switch completedStage {
	case "document_intake":
		// Move to validation
		workflow.CurrentStage = "validation"
		wc.publishA2ATask(ctx, "document_validation", data, "validation_agent", workflow)
	case "document_validation":
		// Move to metadata extraction
		workflow.CurrentStage = "metadata_extraction"
		wc.publishA2ATask(ctx, "metadata_extraction", data, "metadata_agent", workflow)
	case "metadata_extraction":
		// Move to text processing
		workflow.CurrentStage = "text_processing"
		wc.publishA2ATask(ctx, "text_processing", data, "text_processor_agent", workflow)
	case "text_processing":
		// Move to summary generation
		workflow.CurrentStage = "summary_generation"
		wc.publishA2ATask(ctx, "summary_generation", data, "summary_agent", workflow)
	case "summary_generation":
		// Workflow complete
		workflow.Status = "completed"
		workflow.CurrentStage = "finished"
		duration := time.Since(workflow.StartTime)
		log.Printf("Workflow %s completed successfully in %v", workflow.DocumentID, duration)
		wc.printWorkflowSummary(workflow)
	}
}
func (wc *A2AWorkflowCoordinator) printWorkflowSummary(workflow *A2ADocumentWorkflow) {
	fmt.Printf("\n=== WORKFLOW SUMMARY ===\n")
	fmt.Printf("Document ID: %s\n", workflow.DocumentID)
	fmt.Printf("Status: %s\n", workflow.Status)
	fmt.Printf("Duration: %v\n", time.Since(workflow.StartTime))
	fmt.Printf("Tasks dispatched:\n")
	for _, task := range workflow.TaskHistory {
		fmt.Printf("  - %s (%s)\n", task.GetId(), task.GetStatus().GetState().String())
	}
	fmt.Printf("=======================\n\n")
}
Step 2: Create Specialized Agents
Now let’s create each specialized agent that handles specific stages of the pipeline.
Document Intake Agent
Create agents/document_intake/main.go:
package main
import (
	"context"
	"crypto/md5"
	"fmt"
	"io"
	"log"
	"strings"
	"time"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	pb "github.com/owulveryck/agenthub/events/a2a"
)
const (
	agentHubAddr = "localhost:50051"
	agentID      = "document_intake_agent"
)
func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewEventBusClient(conn)
	agent := &DocumentIntakeAgent{client: client}
	ctx := context.Background()
	agent.start(ctx)
}
type DocumentIntakeAgent struct {
	client pb.EventBusClient
}
func (dia *DocumentIntakeAgent) start(ctx context.Context) {
	log.Printf("Document Intake Agent %s starting...", agentID)
	req := &pb.SubscribeToTasksRequest{
		AgentId:   agentID,
		TaskTypes: []string{"document_intake"},
	}
	stream, err := dia.client.SubscribeToTasks(ctx, req)
	if err != nil {
		log.Fatalf("Error subscribing: %v", err)
	}
	log.Printf("Subscribed to document intake tasks")
	for {
		task, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Printf("Error receiving task: %v", err)
			return
		}
		go dia.processTask(ctx, task)
	}
}
func (dia *DocumentIntakeAgent) processTask(ctx context.Context, task *pb.TaskMessage) {
	log.Printf("Processing document intake task: %s", task.GetTaskId())
	params := task.GetParameters().AsMap()
	// Simulate document intake processing
	time.Sleep(2 * time.Second)
	// Generate document hash
	content := params["content"].(string)
	hash := fmt.Sprintf("%x", md5.Sum([]byte(content)))
	// Extract basic metadata
	wordCount := len(strings.Fields(content))
	charCount := len(content)
	result := map[string]interface{}{
		"document_id":   params["document_id"],
		"workflow_id":   params["workflow_id"],
		"stage":         "document_intake",
		"content":       content,
		"filename":      params["filename"],
		"source":        params["source"],
		"document_hash": hash,
		"word_count":    wordCount,
		"char_count":    charCount,
		"intake_timestamp": time.Now().Format(time.RFC3339),
		"status":        "intake_complete",
	}
	dia.publishResult(ctx, task, result, pb.TaskStatus_TASK_STATUS_COMPLETED, "")
}
func (dia *DocumentIntakeAgent) publishResult(ctx context.Context, originalTask *pb.TaskMessage, result map[string]interface{}, status pb.TaskStatus, errorMsg string) {
	resultStruct, err := structpb.NewStruct(result)
	if err != nil {
		log.Printf("Error creating result struct: %v", err)
		return
	}
	taskResult := &pb.TaskResult{
		TaskId:          originalTask.GetTaskId(),
		Status:          status,
		Result:          resultStruct,
		ErrorMessage:    errorMsg,
		ExecutorAgentId: agentID,
		CompletedAt:     timestamppb.Now(),
	}
	req := &pb.PublishTaskResultRequest{Result: taskResult}
	_, err = dia.client.PublishTaskResult(ctx, req)
	if err != nil {
		log.Printf("Error publishing result: %v", err)
	} else {
		log.Printf("Published result for task %s", originalTask.GetTaskId())
	}
}
Validation Agent
Create agents/validation/main.go:
package main
import (
	"context"
	"io"
	"log"
	"strings"
	"time"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	pb "github.com/owulveryck/agenthub/events/a2a"
)
const (
	agentHubAddr = "localhost:50051"
	agentID      = "validation_agent"
)
func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewEventBusClient(conn)
	agent := &ValidationAgent{client: client}
	ctx := context.Background()
	agent.start(ctx)
}
type ValidationAgent struct {
	client pb.EventBusClient
}
func (va *ValidationAgent) start(ctx context.Context) {
	log.Printf("Validation Agent %s starting...", agentID)
	req := &pb.SubscribeToTasksRequest{
		AgentId:   agentID,
		TaskTypes: []string{"document_validation"},
	}
	stream, err := va.client.SubscribeToTasks(ctx, req)
	if err != nil {
		log.Fatalf("Error subscribing: %v", err)
	}
	log.Printf("Subscribed to document validation tasks")
	for {
		task, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Printf("Error receiving task: %v", err)
			return
		}
		go va.processTask(ctx, task)
	}
}
func (va *ValidationAgent) processTask(ctx context.Context, task *pb.TaskMessage) {
	log.Printf("Processing validation task: %s", task.GetTaskId())
	params := task.GetParameters().AsMap()
	// Simulate validation processing
	time.Sleep(1500 * time.Millisecond)
	content := params["content"].(string)
	// Perform validation checks
	validationResults := va.validateDocument(content)
	result := map[string]interface{}{
		"document_id":       params["document_id"],
		"workflow_id":       params["workflow_id"],
		"stage":             "document_validation",
		"content":           content,
		"filename":          params["filename"],
		"source":            params["source"],
		"document_hash":     params["document_hash"],
		"word_count":        params["word_count"],
		"char_count":        params["char_count"],
		"intake_timestamp":  params["intake_timestamp"],
		"validation_results": validationResults,
		"validation_timestamp": time.Now().Format(time.RFC3339),
		"status":            "validation_complete",
	}
	var status pb.TaskStatus
	var errorMsg string
	if validationResults["is_valid"].(bool) {
		status = pb.TaskStatus_TASK_STATUS_COMPLETED
	} else {
		status = pb.TaskStatus_TASK_STATUS_FAILED
		errorMsg = "Document validation failed: " + validationResults["errors"].(string)
	}
	va.publishResult(ctx, task, result, status, errorMsg)
}
func (va *ValidationAgent) validateDocument(content string) map[string]interface{} {
	// Simple validation rules
	isValid := true
	var errors []string
	// Check minimum length
	if len(content) < 10 {
		isValid = false
		errors = append(errors, "content too short")
	}
	// Check for suspicious content
	suspiciousTerms := []string{"malware", "virus", "hack"}
	for _, term := range suspiciousTerms {
		if strings.Contains(strings.ToLower(content), term) {
			isValid = false
			errors = append(errors, "suspicious content detected")
			break
		}
	}
	// Check language (simple heuristic)
	isEnglish := va.isEnglishContent(content)
	return map[string]interface{}{
		"is_valid":    isValid,
		"is_english":  isEnglish,
		"errors":      strings.Join(errors, "; "),
		"length_ok":   len(content) >= 10,
		"safe_content": !strings.Contains(strings.ToLower(content), "malware"),
	}
}
func (va *ValidationAgent) isEnglishContent(content string) bool {
	// Simple heuristic: check for common English words
	commonWords := []string{"the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"}
	lowerContent := strings.ToLower(content)
	matches := 0
	for _, word := range commonWords {
		if strings.Contains(lowerContent, " "+word+" ") {
			matches++
		}
	}
	return matches >= 2
}
func (va *ValidationAgent) publishResult(ctx context.Context, originalTask *pb.TaskMessage, result map[string]interface{}, status pb.TaskStatus, errorMsg string) {
	resultStruct, err := structpb.NewStruct(result)
	if err != nil {
		log.Printf("Error creating result struct: %v", err)
		return
	}
	taskResult := &pb.TaskResult{
		TaskId:          originalTask.GetTaskId(),
		Status:          status,
		Result:          resultStruct,
		ErrorMessage:    errorMsg,
		ExecutorAgentId: agentID,
		CompletedAt:     timestamppb.Now(),
	}
	req := &pb.PublishTaskResultRequest{Result: taskResult}
	_, err = va.client.PublishTaskResult(ctx, req)
	if err != nil {
		log.Printf("Error publishing result: %v", err)
	} else {
		log.Printf("Published result for task %s", originalTask.GetTaskId())
	}
}
Step 3: Build and Test the Multi-Agent System
Update the Makefile to include the new agents:
# Add to Makefile build target
build: proto
	@echo "Building server binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(SERVER_BINARY) broker/main.go
	@echo "Building coordinator binary..."
	go build $(GO_BUILD_FLAGS) -o bin/coordinator agents/coordinator/main.go
	@echo "Building document intake agent..."
	go build $(GO_BUILD_FLAGS) -o bin/document-intake agents/document_intake/main.go
	@echo "Building validation agent..."
	go build $(GO_BUILD_FLAGS) -o bin/validation agents/validation/main.go
	@echo "Building publisher binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(PUBLISHER_BINARY) agents/publisher/main.go
	@echo "Building subscriber binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(SUBSCRIBER_BINARY) agents/subscriber/main.go
	@echo "Build complete. Binaries are in the 'bin/' directory."
Build all components:
make build
Step 4: Run the Multi-Agent Workflow
Now let’s run the complete multi-agent system:
Terminal 1 - Start the broker:
go run broker/main.go
Terminal 2 - Start the document intake agent:
go run agents/document_intake/main.go
Terminal 3 - Start the validation agent:
go run agents/validation/main.go
Terminal 4 - Start the workflow coordinator:
go run agents/coordinator/main.go
Step 5: Observe the Workflow
You’ll see the workflow coordinator processing documents through multiple stages:
- Document Intake: Receives and processes raw documents
- Validation: Checks content for safety and validity
- Metadata Extraction: Extracts structured metadata
- Text Processing: Processes and analyzes text content
- Summary Generation: Creates document summaries
Each agent processes its stage and passes results to the next stage via the AgentHub broker. Note that this tutorial builds only the intake and validation agents; the workflow will pause after validation until you add the remaining agents (see Next Steps).
Understanding the Multi-Agent Pattern
This tutorial demonstrates several key patterns:
1. Workflow Orchestration
The coordinator agent manages the overall workflow, determining which stage comes next and handling failures.
2. Specialized Agents
Each agent has a specific responsibility and can be developed, deployed, and scaled independently.
3. Asynchronous Processing
Agents work asynchronously, allowing for better resource utilization and scalability.
4. Error Handling
The system handles failures gracefully, with the coordinator managing workflow state.
5. Data Flow
Structured data flows between agents, with each stage adding value to the processing pipeline.
Next Steps
Now that you understand multi-agent workflows:
- Add more agents: Create metadata extraction, text processing, and summary agents
- Implement error recovery: Add retry mechanisms and failure handling
- Add monitoring: Create a dashboard agent that tracks workflow progress
- Scale the system: Run multiple instances of each agent type
- Add persistence: Store workflow state in a database for recovery
This pattern scales to handle complex business processes, data pipelines, and automated workflows in production systems.
Common Patterns and Best Practices
Workflow State Management
- Store workflow state persistently for recovery
- Use unique workflow IDs for tracking
- Implement timeouts for stuck workflows
Agent Communication
- Use structured messages with clear schemas
- Include metadata for routing and tracking
- Implement progress reporting for long-running tasks
Error Handling
- Design for partial failures
- Implement retry mechanisms with backoff
- Provide clear error messages and recovery paths
Monitoring and Observability
- Log all state transitions
- Track workflow performance metrics
- Implement health checks for agents
You now have the foundation for building sophisticated multi-agent systems that can handle complex, real-world workflows!