Tutorials

Learning-oriented guides that take you through practical exercises to master AgentHub

These hands-on tutorials will guide you through learning AgentHub by doing. Each tutorial is designed to be followed step-by-step and will help you build practical experience with the system.

πŸ“š Tutorial Categories

🎯 Learning Path

1. Start Here

Begin with Getting Started tutorials to install and run your first examples

2. Build Systems

Progress to Workflows to create sophisticated agent interactions

3. Monitor & Observe

Master Observability to monitor and troubleshoot your deployments

πŸ“‹ Prerequisites

Before starting these tutorials, make sure you have:

  • Go 1.24 or later installed
  • Basic understanding of command-line tools
  • Familiarity with distributed systems concepts (helpful but not required)

πŸ’‘ Tutorial Tips

  • Follow tutorials in order for the best learning experience
  • Each tutorial builds on concepts from previous ones
  • Code examples are tested and should work as written
  • Don’t hesitate to experiment beyond the tutorial steps

1 - Getting Started

Essential tutorials to get you up and running with AgentHub

Getting Started Tutorials

Step-by-step tutorials to help you get AgentHub installed, configured, and running your first examples.

Available Tutorials

1.1 - Installation and Setup Tutorial

Guide for installing AgentHub and setting up your development environment from scratch. Get a working A2A-compliant AgentHub installation ready for building agent systems.

Installation and Setup Tutorial

This tutorial will guide you through installing AgentHub and setting up your development environment from scratch. By the end, you’ll have a working A2A-compliant AgentHub installation ready for building Agent2Agent protocol systems.

Prerequisites Check

Before we begin, let’s verify you have the required software installed.

Step 1: Verify Go Installation

Check if Go 1.24+ is installed:

go version

You should see output like:

go version go1.24.0 darwin/amd64

If Go is not installed or the version is older than 1.24:

macOS (using Homebrew):

brew install go

Linux (using package manager):

# Ubuntu/Debian
sudo apt update && sudo apt install golang-go

# CentOS/RHEL
sudo yum install golang

# Arch Linux
sudo pacman -S go

Windows: Download from https://golang.org/dl/ and run the installer.

Step 2: Verify Protocol Buffers Compiler

Check if protoc is installed:

protoc --version

You should see output like:

libprotoc 3.21.12

If protoc is not installed:

macOS (using Homebrew):

brew install protobuf

Linux:

# Ubuntu/Debian
sudo apt update && sudo apt install protobuf-compiler

# CentOS/RHEL
sudo yum install protobuf-compiler

# Arch Linux
sudo pacman -S protobuf

Windows: Download from Protocol Buffers releases and add to PATH.

Step 3: Install Go Protocol Buffer Plugins

Install the required Go plugins for Protocol Buffers:

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Verify the plugins are in your PATH:

which protoc-gen-go
which protoc-gen-go-grpc

Both commands should return paths to the installed plugins.

Installing AgentHub

Step 4: Clone the Repository

Clone the AgentHub repository:

git clone https://github.com/owulveryck/agenthub.git
cd agenthub

Step 5: Verify Project Structure

Let’s explore what we have:

ls -la

You should see:

drwxr-xr-x agents/           # Sample A2A agent implementations
drwxr-xr-x broker/           # A2A-compliant AgentHub broker server
drwxr-xr-x documentation/    # Complete A2A documentation
drwxr-xr-x events/           # Generated A2A protocol code
drwxr-xr-x internal/         # Internal packages and abstractions
-rw-r--r-- go.mod            # Go module definition
-rw-r--r-- Makefile         # Build automation
drwxr-xr-x proto/           # A2A protocol definitions
-rw-r--r-- README.md        # Project overview

Step 6: Initialize Go Module

Ensure Go modules are properly initialized:

go mod tidy

This downloads all required dependencies. You should see output about downloading packages.

Step 7: Generate Protocol Buffer Code

Generate the Go code from Protocol Buffer definitions:

make proto

You should see:

Generating protobuf code for A2A protocol definitions...
Generating proto/eventbus.proto...
Generating proto/a2a.proto...
Protobuf code generated successfully.

Verify the generated files exist:

ls events/

You should see:

a2a/          # A2A protocol definitions
eventbus/     # AgentHub broker definitions

ls events/a2a/

You should see:

a2a.pb.go
a2a_grpc.pb.go

ls events/eventbus/

You should see:

eventbus.pb.go
eventbus_grpc.pb.go

Step 8: Build All Components

Build the AgentHub components:

make build

You should see:

Building A2A-compliant server binary...
Building A2A publisher binary...
Building A2A subscriber binary...
Build complete. A2A-compliant binaries are in the 'bin/' directory.

Verify the binaries were created:

ls bin/

You should see:

agenthub-server  # A2A-compliant AgentHub broker
publisher        # A2A message publisher
subscriber       # A2A message subscriber

Verification Test

Let’s verify everything works by running a quick test.

Step 9: Test the Installation

Start the A2A-compliant broker server in the background:

./bin/agenthub-server &

You should see:

2025/09/28 10:00:00 A2A-compliant AgentHub broker gRPC server listening on [::]:50051
2025/09/28 10:00:00 AgentHub service ready for A2A protocol communication

Start an A2A subscriber agent:

./bin/subscriber &

You should see:

A2A Agent started. Listening for A2A events and tasks. Press Enter to stop.
2025/09/28 10:00:05 A2A Agent agent_demo_subscriber subscribing to A2A tasks...
2025/09/28 10:00:05 Successfully subscribed to A2A tasks for agent agent_demo_subscriber. Waiting for A2A tasks...

Run the A2A publisher to send test tasks:

./bin/publisher

You should see A2A tasks being published and processed with conversation context and structured artifacts.

Clean up the test processes:

pkill -f agenthub-server
pkill -f subscriber

Development Environment Setup

Step 10: Configure Your Editor

For VS Code users:

Install the Go extension:

  1. Open VS Code
  2. Go to Extensions (Ctrl+Shift+X)
  3. Search for “Go” and install the official Go extension
  4. Open the AgentHub project folder

For other editors:

Ensure your editor has Go language support and Protocol Buffer syntax highlighting.

Step 11: Configure Environment Variables

AgentHub uses environment variables for configuration. Create a .envrc file for local development:

cat > .envrc << EOF
# Core A2A AgentHub Configuration
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"
export AGENTHUB_GRPC_PORT=":50051"

# A2A Protocol Configuration
export AGENTHUB_A2A_PROTOCOL_VERSION="1.0"
export AGENTHUB_MESSAGE_BUFFER_SIZE="100"
export AGENTHUB_CONTEXT_TIMEOUT="30s"
export AGENTHUB_ARTIFACT_MAX_SIZE="10MB"

# Health Check Ports
export AGENTHUB_HEALTH_PORT="8080"
export A2A_PUBLISHER_HEALTH_PORT="8081"
export A2A_SUBSCRIBER_HEALTH_PORT="8082"

# Observability (optional for development)
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export SERVICE_NAME="agenthub-dev"
export SERVICE_VERSION="dev"
export ENVIRONMENT="development"
export LOG_LEVEL="DEBUG"
EOF

Install direnv for automatic loading (recommended):

# macOS
brew install direnv

# Ubuntu/Debian
sudo apt install direnv

# After installation, add to your shell
echo 'eval "$(direnv hook bash)"' >> ~/.bashrc  # For bash
echo 'eval "$(direnv hook zsh)"' >> ~/.zshrc    # For zsh

Allow the environment file:

direnv allow

Alternative: Manual loading

source .envrc

πŸ“– For complete environment variable reference, see Environment Variables Reference
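If you want to confirm that these variables are visible to your agents, here is a minimal, hypothetical sketch using os.Getenv. The variable names come from the .envrc above; the fallback defaults and the loader itself are illustrative, and the actual configuration code under internal/ may expose a richer API.

package main

import (
	"fmt"
	"os"
)

// getEnv returns the value of an environment variable, or a fallback default.
func getEnv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	// Variable names match the .envrc above; defaults are illustrative.
	addr := getEnv("AGENTHUB_BROKER_ADDR", "localhost")
	port := getEnv("AGENTHUB_BROKER_PORT", "50051")
	fmt.Printf("Agents will connect to the broker at %s:%s\n", addr, port)
}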

Step 12: Verify Make Targets

Test all available make targets:

make help

You should see all available commands:

Makefile for gRPC Event Bus

Usage:
  make <target>

Targets:
  all              Builds all binaries (default).
  proto            Generates Go code from .proto files.
  build            Builds the server, publisher, and subscriber binaries.
  run-server       Runs the event bus gRPC server.
  run-publisher    Runs the publisher client.
  run-subscriber   Runs the subscriber client.
  clean            Removes generated Go files and build artifacts.
  help             Displays this help message.

Common Issues and Solutions

Issue: “protoc-gen-go: program not found”

Solution: Ensure Go bin directory is in your PATH:

export PATH=$PATH:$(go env GOPATH)/bin
echo 'export PATH=$PATH:$(go env GOPATH)/bin' >> ~/.bashrc
source ~/.bashrc

Issue: “go.mod not found”

Solution: Ensure you’re in the AgentHub project directory:

pwd  # Should show .../agenthub
ls go.mod  # Should exist

Issue: Port 50051 already in use

Solution: Kill existing processes or change the port:

lsof -ti:50051 | xargs kill -9

Issue: Permission denied on binaries

Solution: Make binaries executable:

chmod +x bin/*

Next Steps

Now that you have AgentHub installed and verified:

  1. Learn the basics: Follow the Running the Demo tutorial
  2. Build your first agent: Try Create a Subscriber
  3. Understand the concepts: Read The Agent2Agent Principle

Getting Help

If you encounter issues:

  1. Check the troubleshooting section above
  2. Review the complete documentation
  3. Open an issue on the GitHub repository

Congratulations! You now have a fully functional AgentHub development environment ready for building autonomous agent systems.

1.2 - Running the A2A-Compliant AgentHub Demo

Walk through setting up and running the complete A2A-compliant AgentHub EDA broker system. Learn how agents communicate using Agent2Agent protocol messages through the Event-Driven Architecture broker.

Running the A2A-Compliant AgentHub Demo

This tutorial will walk you through setting up and running the complete Agent2Agent (A2A) protocol-compliant AgentHub Event-Driven Architecture (EDA) broker system. By the end of this tutorial, you’ll have agents communicating using standardized A2A messages through the scalable EDA broker.

Prerequisites

  • Go 1.24 or later installed
  • Protocol Buffers compiler (protoc) installed
  • Basic understanding of gRPC and message brokers

Step 1: Build the A2A-Compliant Components

First, let’s build all the A2A-compliant components using the Makefile:

# Build all A2A-compliant binaries (generates protobuf files first)
make build

This will:

  1. Generate A2A protocol files from proto/a2a_core.proto and proto/eventbus.proto
  2. Build the A2A-compliant broker, publisher, and subscriber binaries
  3. Place all binaries in the bin/ directory

You should see output like:

Building A2A-compliant server binary...
Building A2A-compliant publisher binary...
Building A2A-compliant subscriber binary...
Build complete. A2A-compliant binaries are in the 'bin/' directory.

Step 2: Start the AgentHub Broker Server

Open a terminal and start the AgentHub broker server:

./bin/broker

You should see output like:

time=2025-09-29T11:51:26.612+02:00 level=INFO msg="Starting health server" port=8080
time=2025-09-29T11:51:26.611+02:00 level=INFO msg="AgentHub gRPC server with observability listening" address=[::]:50051 health_endpoint=http://localhost:8080/health metrics_endpoint=http://localhost:8080/metrics component=broker

Keep this terminal open - the AgentHub broker needs to run continuously.

Step 3: Start an Agent (Subscriber)

Open a second terminal and start an agent that can receive and process tasks:

./bin/subscriber

You should see output indicating the agent has started:

time=2025-09-29T11:52:04.727+02:00 level=INFO msg="AgentHub client started with observability" broker_addr=localhost:50051 component=subscriber
time=2025-09-29T11:52:04.727+02:00 level=INFO msg="Starting health server" port=8082
time=2025-09-29T11:52:04.728+02:00 level=INFO msg="Agent started with observability. Listening for events and tasks."
time=2025-09-29T11:52:04.728+02:00 level=INFO msg="Subscribing to task results" agent_id=agent_demo_subscriber
time=2025-09-29T11:52:04.728+02:00 level=INFO msg="Subscribing to tasks" agent_id=agent_demo_subscriber

This agent can process several types of tasks:

  • greeting: Simple greeting messages
  • math_calculation: Basic arithmetic operations
  • random_number: Random number generation
  • Any unknown task type will be rejected
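The real subscriber processes A2A Message and Part content, but the core idea of dispatching on task type can be sketched as follows. This is illustrative only: the helper and the parameter names ("a", "b") are assumptions, not the actual subscriber code.

package main

import (
	"fmt"
	"math/rand"
)

// handleTask dispatches on the task type, mirroring the list above.
func handleTask(taskType string, params map[string]interface{}) (map[string]interface{}, error) {
	switch taskType {
	case "greeting":
		return map[string]interface{}{"greeting": "Hello from the subscriber!"}, nil
	case "math_calculation":
		a, _ := params["a"].(float64) // assumed parameter names
		b, _ := params["b"].(float64)
		return map[string]interface{}{"result": a + b}, nil
	case "random_number":
		return map[string]interface{}{"value": rand.Intn(100)}, nil
	default:
		// Unknown task types are rejected, as noted above.
		return nil, fmt.Errorf("unknown task type: %s", taskType)
	}
}

func main() {
	out, err := handleTask("math_calculation", map[string]interface{}{"a": 2.0, "b": 3.0})
	fmt.Println(out, err)
}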

Step 4: Send A2A-Compliant Tasks

Open a third terminal and run the publisher to send A2A protocol-compliant task messages:

./bin/publisher

You’ll see the publisher send various A2A-compliant task messages through the AgentHub EDA broker:

time=2025-09-29T14:41:11.237+02:00 level=INFO msg="Starting publisher demo"
time=2025-09-29T14:41:11.237+02:00 level=INFO msg="Testing Agent2Agent Task Publishing via AgentHub with observability"
time=2025-09-29T14:41:11.237+02:00 level=INFO msg="Publishing A2A task" task_id=task_greeting_1759149671 task_type=greeting responder_agent_id=agent_demo_subscriber context_id=ctx_greeting_1759149671
time=2025-09-29T14:41:11.242+02:00 level=INFO msg="A2A task published successfully" task_id=task_greeting_1759149671 task_type=greeting event_id=evt_msg_greeting_1759149671_1759149671
time=2025-09-29T14:41:11.242+02:00 level=INFO msg="Published greeting task" task_id=task_greeting_1759149671
time=2025-09-29T14:41:14.243+02:00 level=INFO msg="Publishing A2A task" task_id=task_math_calculation_1759149674 task_type=math_calculation responder_agent_id=agent_demo_subscriber context_id=ctx_math_calculation_1759149674
time=2025-09-29T14:41:14.247+02:00 level=INFO msg="A2A task published successfully" task_id=task_math_calculation_1759149674 task_type=math_calculation event_id=evt_msg_math_calculation_1759149674_1759149674
time=2025-09-29T14:41:16.248+02:00 level=INFO msg="Publishing A2A task" task_id=task_random_number_1759149676 task_type=random_number responder_agent_id=agent_demo_subscriber context_id=ctx_random_number_1759149676
time=2025-09-29T14:41:16.249+02:00 level=INFO msg="Published random number task" task_id=task_random_number_1759149676

Notice how the A2A implementation includes:

  • Context IDs: Each task is grouped in a conversation context (ctx_greeting_...)
  • Event IDs: EDA wrapper events have unique identifiers for tracing
  • A2A Task Structure: Tasks use A2A-compliant Message and Part formats
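The Building Multi-Agent Workflows tutorial constructs these structures in full. As a condensed, illustrative sketch (the IDs and text are made up), an A2A message carrying a task looks roughly like this:

package main

import (
	"fmt"

	a2a "github.com/owulveryck/agenthub/events/a2a"
)

func main() {
	// Condensed sketch of the A2A message shape; the IDs are illustrative.
	msg := &a2a.Message{
		MessageId: "msg_greeting_001",
		ContextId: "ctx_greeting_001", // groups related tasks into one conversation
		TaskId:    "task_greeting_001",
		Role:      a2a.Role_USER,
		Content: []*a2a.Part{
			{Part: &a2a.Part_Text{Text: "Please say hello to the world"}},
		},
	}
	fmt.Println(msg.GetContextId(), msg.GetTaskId())
}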

Step 5: Observe A2A Task Processing

Switch back to the subscriber terminal to see the agent processing A2A tasks in real-time:

time=2025-09-29T14:41:11.243+02:00 level=INFO msg="Task processing completed" task_id=task_greeting_1759149671 status=TASK_STATE_COMPLETED has_artifact=true
time=2025-09-29T14:41:14.253+02:00 level=INFO msg="Task processing completed" task_id=task_math_calculation_1759149674 status=TASK_STATE_COMPLETED has_artifact=true
time=2025-09-29T14:41:16.249+02:00 level=INFO msg="Task processing completed" task_id=task_random_number_1759149676 status=TASK_STATE_COMPLETED has_artifact=true

Notice the A2A-compliant processing:

  • Task States: Using A2A standard states (TASK_STATE_COMPLETED)
  • Artifacts: Each completed task generates A2A artifacts (has_artifact=true)
  • Structured Processing: Tasks are processed using A2A Message and Part handlers

Step 6: Check the Broker Logs

In the first terminal (broker server), you’ll see logs showing message routing:

2025/09/27 16:34:33 Received task request: task_greeting_1758983673 (type: greeting) from agent: agent_demo_publisher
2025/09/27 16:34:35 Received task result for task: task_greeting_1758983673 from agent: agent_demo_subscriber
2025/09/27 16:34:35 Received task progress for task: task_greeting_1758983673 (100%) from agent: agent_demo_subscriber

Understanding What Happened

  1. A2A Message Creation: The publisher created A2A-compliant messages with:

    • Message Structure: Using A2A Message format with Part content
    • Context Grouping: Each task belongs to a conversation context
    • Task Association: Messages are linked to specific A2A tasks
    • Role Definition: Messages specify USER (requester) or AGENT (responder) roles
  2. EDA Event Routing: The AgentHub EDA broker:

    • Wrapped A2A Messages: A2A messages wrapped in AgentEvent for EDA transport
    • Event-Driven Routing: Used EDA patterns for scalable message delivery
    • Task Storage: Stored A2A tasks with full message history and artifacts
    • Status Tracking: Managed A2A task lifecycle (SUBMITTED β†’ WORKING β†’ COMPLETED)
  3. A2A Task Processing: The subscriber agent:

    • A2A Task Reception: Received A2A tasks via EDA event streams
    • Message Processing: Processed A2A Message content using Part handlers
    • Artifact Generation: Generated structured A2A artifacts as task output
    • Status Updates: Published A2A-compliant status updates through EDA events
  4. Hybrid Architecture Benefits:

    • A2A Compliance: Full interoperability with other A2A-compliant systems
    • EDA Scalability: Event-driven patterns for high-throughput scenarios
    • Standards-Based: Using industry-standard Agent2Agent protocol
    • Observable: Built-in tracing and metrics for production deployment
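As a hedged sketch of the lifecycle above: TASK_STATE_SUBMITTED and TASK_STATE_COMPLETED appear in the logs, while the WORKING constant name is inferred from the SUBMITTED β†’ WORKING β†’ COMPLETED description and may differ in the generated code.

package main

import (
	"fmt"

	a2a "github.com/owulveryck/agenthub/events/a2a"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	// The requester submits the task...
	status := &a2a.TaskStatus{State: a2a.TaskState_TASK_STATE_SUBMITTED, Timestamp: timestamppb.Now()}

	// ...the subscriber picks it up (constant name assumed)...
	status.State = a2a.TaskState_TASK_STATE_WORKING

	// ...and completes it, attaching artifacts as seen in the subscriber logs.
	status.State = a2a.TaskState_TASK_STATE_COMPLETED
	fmt.Println(status.GetState())
}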

Next Steps

Now that you have the basic system working, you can:

  1. Create Multiple Agents: Run multiple subscriber instances with different agent IDs to see task distribution
  2. Add Custom Task Types: Modify the subscriber to handle new types of tasks
  3. Build a Request-Response Flow: Create an agent that both requests and processes tasks
  4. Monitor Task Progress: Build a dashboard that subscribes to task progress updates

Troubleshooting

Port Already in Use: If you see “bind: address already in use”, kill any existing processes:

lsof -ti:50051 | xargs kill -9

Agent Not Receiving Tasks: Ensure the agent ID in the publisher matches the subscriber’s agent ID (agent_demo_subscriber).

Build Errors: Regenerate A2A protocol buffer files and ensure all imports are correct:

# Clean old protobuf files
make clean

# Regenerate A2A protobuf files
make proto

# Rebuild everything
make build

A2A Compliance Issues: Verify A2A protocol structures are correctly generated:

# Check A2A core types
ls events/a2a/

# Should show: a2a_core.pb.go eventbus.pb.go eventbus_grpc.pb.go

You now have a working A2A-compliant AgentHub EDA broker system! The agents can exchange standardized A2A messages, maintain conversation contexts, generate structured artifacts, and track task lifecycles - all through your scalable Event-Driven Architecture broker with full Agent2Agent protocol compliance.

2 - Observability

Tutorials for monitoring and observing AgentHub systems

Observability Tutorials

Learn how to monitor, trace, and observe your AgentHub deployments with comprehensive observability features.

Available Tutorials

2.1 - Interactive Dashboard Tour

Take a guided tour through AgentHub’s Grafana dashboards while the system is running, learning to interpret metrics, identify issues, and understand system behavior in real-time.

Interactive Dashboard Tour

Learn by doing: Take a guided tour through AgentHub’s Grafana dashboards while the system is running, learning to interpret metrics, identify issues, and understand system behavior in real-time.

Prerequisites

  • Observability stack running (from the Observability Demo)
  • Observable agents running (broker, publisher, subscriber)
  • Grafana open at http://localhost:3333
  • 10-15 minutes for the complete tour

Quick Setup Reminder

If you haven’t completed the observability demo yet:

# Start observability stack
cd agenthub/observability
docker-compose up -d

# Run observable agents (3 terminals)
go run broker/main.go
go run agents/subscriber/main.go
go run agents/publisher/main.go

Dashboard Navigation

Accessing the Main Dashboard

  1. Open Grafana: http://localhost:3333
  2. Login: admin / admin (skip password change for demo)
  3. Navigate: Dashboards β†’ Browse β†’ AgentHub β†’ “AgentHub EDA System Observatory”
  4. Bookmark: Save this URL for quick access: http://localhost:3333/d/agenthub-eda-dashboard

Dashboard Layout Overview

The dashboard is organized in 4 main rows:

🎯 Row 1: Event Processing Overview
β”œβ”€β”€ Event Processing Rate (events/sec)
└── Event Processing Error Rate (%)

πŸ“Š Row 2: Event Analysis
β”œβ”€β”€ Event Types Distribution (pie chart)
└── Event Processing Latency (p50, p95, p99)

πŸ” Row 3: Distributed Tracing
└── Jaeger Integration Panel

πŸ’» Row 4: System Health
β”œβ”€β”€ Service CPU Usage (%)
β”œβ”€β”€ Service Memory Usage (MB)
β”œβ”€β”€ Go Goroutines Count
└── Service Health Status

Interactive Tour

Tour 1: Understanding Event Flow (3 minutes)

Step 1: Watch the Event Processing Rate

Location: Top-left panel What to observe: Real-time lines showing events per second

  1. Identify the services:

    • Green line: agenthub-broker (should be highest - processes all events)
    • Blue line: agenthub-publisher (events being created)
    • Orange line: agenthub-subscriber (events being processed)
  2. Watch the pattern:

    • Publisher creates bursts of events
    • Broker immediately processes them (routing)
    • Subscriber processes them shortly after
  3. Understand the flow:

    Publisher (creates) β†’ Broker (routes) β†’ Subscriber (processes)
         50/sec      β†’      150/sec     β†’      145/sec
    

πŸ’‘ Tour Insight: The broker rate is higher because it processes both incoming tasks AND outgoing results.

Step 2: Monitor Error Rates

Location: Top-right panel (gauge) What to observe: Error percentage gauge

  1. Healthy system: Should show 0-2% (green zone)

  2. If you see higher errors:

    • Check if all services are running
    • Look for red traces in Jaeger (we’ll do this next)
  3. Error rate calculation:

    Error Rate = (Failed Events / Total Events) Γ— 100
    

🎯 Action: Note your current error rate - we’ll compare it later.

Tour 2: Event Analysis Deep Dive (3 minutes)

Step 3: Explore Event Types

Location: Middle-left panel (pie chart) What to observe: Distribution of different event types

  1. Identify event types:

    • greeting: Most common (usually 40-50%)
    • math_calculation: Compute-heavy tasks (30-40%)
    • random_number: Quick tasks (15-25%)
    • unknown_task: Error-generating tasks (2-5%)
  2. Business insights:

    • Larger slices = more frequent tasks
    • Small red slice = intentional error tasks for testing

πŸ’‘ Tour Insight: The publisher randomly generates different task types to simulate real-world workload diversity.

Step 4: Analyze Processing Latency

Location: Middle-right panel What to observe: Three latency lines (p50, p95, p99)

  1. Understand percentiles:

    • p50 (blue): 50% of events process faster than this
    • p95 (green): 95% of events process faster than this
    • p99 (red): 99% of events process faster than this
  2. Healthy ranges:

    • p50: < 50ms (very responsive)
    • p95: < 200ms (good performance)
    • p99: < 500ms (acceptable outliers)
  3. Pattern recognition:

    • Spiky p99 = occasional slow tasks (normal)
    • Rising p50 = systemic slowdown (investigate)
    • Flat lines = no activity or measurement issues

🎯 Action: Hover over the lines to see exact values at different times.
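To make the percentile definitions concrete, here is a small, illustrative Go sketch that computes p50/p95/p99 from a handful of sample latencies. It shows the concept only; Prometheus derives these values from histogram buckets rather than raw samples.

package main

import (
	"fmt"
	"sort"
	"time"
)

// percentile returns the nearest-rank percentile of a sample of latencies.
func percentile(d []time.Duration, p float64) time.Duration {
	sort.Slice(d, func(i, j int) bool { return d[i] < d[j] })
	idx := int(float64(len(d)-1) * p)
	return d[idx]
}

func main() {
	samples := []time.Duration{
		12 * time.Millisecond, 18 * time.Millisecond, 25 * time.Millisecond,
		40 * time.Millisecond, 300 * time.Millisecond, // one slow outlier
	}
	fmt.Println("p50:", percentile(samples, 0.50))
	fmt.Println("p95:", percentile(samples, 0.95))
	fmt.Println("p99:", percentile(samples, 0.99))
}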

Tour 3: Distributed Tracing Exploration (4 minutes)

Step 5: Jump into Jaeger

Location: Middle section - “Distributed Traces” panel Action: Click the “Explore” button

This opens Jaeger in a new tab. Let’s explore:

  1. In Jaeger UI:

    • Service dropdown: Select “agenthub-broker”
    • Operation: Leave as “All”
    • Click “Find Traces”
  2. Pick a trace to examine:

    • Look for traces that show multiple spans
    • Click on any trace line to open details
  3. Understand the trace structure:

    Timeline View:
    agenthub-publisher: publish_event [2ms]
      └── agenthub-broker: process_event [1ms]
          └── agenthub-subscriber: consume_event [3ms]
              └── agenthub-subscriber: process_task [15ms]
                  └── agenthub-subscriber: publish_result [2ms]
    
  4. Explore span details:

    • Click individual spans to see:
      • Tags: event_type, event_id, agent names
      • Process: Which service handled the span
      • Duration: Exact timing information

πŸ’‘ Tour Insight: Each event creates a complete “trace” showing its journey from creation to completion.

Step 6: Find and Analyze an Error

  1. Search for error traces:

    • In Jaeger, add tag filter: error=true
    • Or look for traces with red spans
  2. Examine the error trace:

    • Red spans indicate errors
    • Error tags show the error type and message
    • Stack traces help with debugging
  3. Follow the error propagation:

    • See how errors affect child spans
    • Notice error context in span attributes

🎯 Action: Find a trace with “unknown_task” event type - these are designed to fail for demonstration.

Tour 4: System Health Monitoring (3 minutes)

Step 7: Monitor Resource Usage

Location: Bottom row panels What to observe: System resource consumption

  1. CPU Usage Panel (Bottom-left):

    • Normal range: 10-50% for demo workload
    • Watch for: Sustained high CPU (>70%)
    • Services comparison: See which service uses most CPU
  2. Memory Usage Panel (Bottom-center-left):

    • Normal range: 30-80MB per service for demo
    • Watch for: Continuously growing memory (memory leaks)
    • Pattern: Sawtooth = normal GC, steady growth = potential leak
  3. Goroutines Panel (Bottom-center-right):

    • Normal range: 10-50 goroutines per service
    • Watch for: Continuously growing count (goroutine leaks)
    • Pattern: Stable baseline with activity spikes

Step 8: Verify Service Health

Location: Bottom-right panel What to observe: Service up/down status

  1. Health indicators:

    • Green: Service healthy and responding
    • Red: Service down or health check failing
    • Yellow: Service degraded but operational
  2. Health check details:

    • Each service exposes /health endpoint
    • Prometheus monitors these endpoints
    • Dashboard shows aggregated status

🎯 Action: Open http://localhost:8080/health in a new tab to see raw health data.
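The exact payload returned by the AgentHub health server isn't prescribed here; as a minimal sketch (field names assumed), a /health endpoint of the kind Prometheus checks could look like this:

package main

import (
	"encoding/json"
	"net/http"
)

func main() {
	// Minimal illustrative /health handler; the real AgentHub health server
	// may return different fields.
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(map[string]string{"status": "healthy"})
	})
	http.ListenAndServe(":8080", nil)
}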

Tour 5: Time-based Analysis (2 minutes)

Step 9: Change Time Ranges

Location: Top-right of dashboard (time picker) Current: Likely showing “Last 5 minutes”

  1. Try different ranges:

    • Last 15 minutes: See longer trends
    • Last 1 hour: See full demo session
    • Custom range: Pick specific time period
  2. Observe pattern changes:

    • Longer ranges: Show trends and patterns
    • Shorter ranges: Show real-time detail
    • Custom ranges: Zoom into specific incidents

Step 10: Use Dashboard Filters

Location: Top of dashboard - variable dropdowns

  1. Service Filter:

    • Select “All” to see everything
    • Pick specific service to focus analysis
    • Useful for isolating service-specific issues
  2. Event Type Filter:

    • Filter to specific event types
    • Compare performance across task types
    • Identify problematic event categories

πŸ’‘ Tour Insight: Filtering helps you drill down from system-wide view to specific components or workloads.

Hands-on Experiments

Experiment 1: Create a Service Outage

Goal: See how the dashboard shows service failures

  1. Stop the subscriber:

    # In subscriber terminal, press Ctrl+C
    
  2. Watch the dashboard changes:

    • Error rate increases (top-right gauge turns red)
    • Subscriber metrics disappear from bottom panels
    • Service health shows subscriber as down
  3. Check Jaeger for failed traces:

    • Look for traces that don’t complete
    • See where the chain breaks
  4. Restart subscriber:

    go run agents/subscriber/main.go
    

🎯 Learning: Dashboard immediately shows impact of service failures.

Experiment 2: Generate High Load

Goal: See system behavior under stress

  1. Modify publisher to generate more events:

    # Edit agents/publisher/main.go
    # Change: time.Sleep(5 * time.Second)
    # To:     time.Sleep(1 * time.Second)
    
  2. Watch dashboard changes:

    • Processing rate increases
    • Latency may increase
    • CPU/memory usage grows
  3. Observe scaling behavior:

    • How does the system handle increased load?
    • Do error rates increase?
    • Where are the bottlenecks?

🎯 Learning: Dashboard shows system performance characteristics under load.

Dashboard Interpretation Guide

What Good Looks Like

βœ… Event Processing Rate: Steady activity matching workload
βœ… Error Rate: < 5% (green zone)
βœ… Event Types: Expected distribution
βœ… Latency: p95 < 200ms, p99 < 500ms
βœ… CPU Usage: < 50% sustained
βœ… Memory: Stable or slow growth with GC cycles
βœ… Goroutines: Stable baseline with activity spikes
βœ… Service Health: All services green/up

Warning Signs

⚠️ Error Rate: 5-10% (yellow zone)
⚠️ Latency: p95 > 200ms or rising trend
⚠️ CPU: Sustained > 70%
⚠️ Memory: Continuous growth without GC
⚠️ Missing data: Gaps in metrics (service issues)

Critical Issues

🚨 Error Rate: > 10% (red zone)
🚨 Latency: p95 > 500ms
🚨 CPU: Sustained > 90%
🚨 Memory: Rapid growth or OOM
🚨 Service Health: Any service showing red/down
🚨 Traces: Missing or broken trace chains

Next Steps After the Tour

For Daily Operations:

  • Bookmark: Save dashboard URL for quick access
  • Set up alerts: Configure notifications for critical metrics
  • Create views: Use filters to create focused views for your team

For Development:

For Deep Understanding:

Troubleshooting Tour Issues

  • Dashboard shows no data: Verify observability environment variables are set
  • Grafana won’t load: Check docker-compose ps in observability/
  • Metrics missing: Verify Prometheus targets at http://localhost:9090/targets
  • Jaeger empty: Ensure trace context propagation is working

πŸŽ‰ Congratulations! You’ve completed the interactive dashboard tour and learned to read AgentHub’s observability signals like a pro!

🎯 Ready for More?

Master the Tools: Use Grafana Dashboards - Advanced dashboard usage

Troubleshoot Issues: Debug with Distributed Tracing - Use Jaeger effectively

2.2 - AgentHub Observability Demo Tutorial

Experience the complete observability stack with distributed tracing, real-time metrics, and intelligent alerting in under 10 minutes through hands-on learning.

AgentHub Observability Demo Tutorial

Learn by doing: Experience the complete observability stack with distributed tracing, real-time metrics, and intelligent alerting in under 10 minutes.

What You’ll Learn

By the end of this tutorial, you’ll have:

  • βœ… Seen distributed traces flowing across multiple agents
  • βœ… Monitored real-time metrics in beautiful Grafana dashboards
  • βœ… Understood event correlation through trace IDs
  • βœ… Experienced intelligent alerting when things go wrong
  • βœ… Explored the complete observability stack components

Prerequisites

  • Go 1.24+ installed
  • Docker and Docker Compose installed
  • Environment variables configured (see Installation and Setup)
  • 10 minutes of your time
  • Basic terminal knowledge

πŸ’‘ Environment Note: AgentHub agents automatically enable observability when JAEGER_ENDPOINT is configured. See Environment Variables Reference for all configuration options.

Step 1: Clone and Setup (1 minute)

# Clone the repository
git clone https://github.com/owulveryck/agenthub.git
cd agenthub

# Verify you have the observability files
ls observability/
# You should see: docker-compose.yml, grafana/, prometheus/, etc.

Step 2: Start the Observability Stack (2 minutes)

# Navigate to observability directory
cd observability

# Start all monitoring services
docker-compose up -d

# Verify services are running
docker-compose ps

Expected Output:

NAME                    COMMAND                  SERVICE             STATUS
agenthub-grafana        "/run.sh"                grafana             running
agenthub-jaeger         "/go/bin/all-in-one"     jaeger              running
agenthub-prometheus     "/bin/prometheus --c…"   prometheus          running
agenthub-otel-collector "/otelcol-contrib --…"   otel-collector      running

🎯 Checkpoint 1: All services should be “running”. If not, check Docker logs: docker-compose logs <service-name>

Step 3: Access the Dashboards (1 minute)

Open these URLs in your browser (keep them open in tabs):

  • Grafana: http://localhost:3333 (main observability dashboard)
  • Jaeger: http://localhost:16686 (distributed tracing)
  • Prometheus: http://localhost:9090 (raw metrics and alerts)

Grafana Login: admin / admin (skip password change for demo)

🎯 Checkpoint 2: You should see Grafana’s welcome page and Jaeger’s empty trace list.

Step 4: Start the Observable Broker (1 minute)

Open a new terminal and navigate back to the project root:

# From agenthub root directory
go run broker/main.go

Expected Output:

time=2025-09-28T21:00:00.000Z level=INFO msg="Starting health server on port 8080"
time=2025-09-28T21:00:00.000Z level=INFO msg="AgentHub broker gRPC server with observability listening" address="[::]:50051" health_endpoint="http://localhost:8080/health" metrics_endpoint="http://localhost:8080/metrics"

🎯 Checkpoint 3:

  • Broker is listening on port 50051
  • Health endpoint available at http://localhost:8080/health
  • Metrics endpoint available at http://localhost:8080/metrics

Step 5: Start the Observable Subscriber (1 minute)

Open another terminal:

go run agents/subscriber/main.go

Expected Output:

time=2025-09-28T21:00:01.000Z level=INFO msg="Starting health server on port 8082"
time=2025-09-28T21:00:01.000Z level=INFO msg="Starting observable subscriber"
time=2025-09-28T21:00:01.000Z level=INFO msg="Agent started with observability. Listening for events and tasks."

🎯 Checkpoint 4:

  • Subscriber is connected and listening
  • Health available at http://localhost:8082/health

Step 6: Generate Events with the Publisher (2 minutes)

Open a third terminal:

go run agents/publisher/main.go

Expected Output:

time=2025-09-28T21:00:02.000Z level=INFO msg="Starting health server on port 8081"
time=2025-09-28T21:00:02.000Z level=INFO msg="Starting observable publisher demo"
time=2025-09-28T21:00:02.000Z level=INFO msg="Publishing task" task_id=task_greeting_1727557202 task_type=greeting responder_agent_id=agent_demo_subscriber
time=2025-09-28T21:00:02.000Z level=INFO msg="Task published successfully" task_id=task_greeting_1727557202 task_type=greeting

🎯 Checkpoint 5: You should see:

  • Publisher creating and sending tasks
  • Subscriber receiving and processing tasks
  • Broker routing messages between them

Step 7: Explore Real-time Metrics in Grafana (2 minutes)

  1. Go to Grafana: http://localhost:3333
  2. Navigate to Dashboards β†’ Browse β†’ AgentHub β†’ “AgentHub EDA System Observatory”
  3. Observe the real-time data:

What You’ll See:

Event Processing Rate (Top Left)

  • Lines showing events/second for each service
  • Should show activity spikes when publisher runs

Error Rate (Top Right)

  • Gauge showing error percentage
  • Should be green (< 5% errors)

Event Types Distribution (Middle Left)

  • Pie chart showing task types: greeting, math_calculation, random_number
  • Different colors for each task type

Processing Latency (Middle Right)

  • Three lines: p50, p95, p99 latencies
  • Should show sub-second processing times

System Health (Bottom)

  • CPU usage, memory usage, goroutines
  • Service health status (all should be UP)

🎯 Checkpoint 6: Dashboard should show live metrics with recent activity.

Step 8: Explore Distributed Traces in Jaeger (2 minutes)

  1. Go to Jaeger: http://localhost:16686
  2. Select Service: Choose “agenthub-broker” from dropdown
  3. Click “Find Traces”
  4. Click on any trace to see details

What You’ll See:

Complete Event Journey:

agenthub-publisher: publish_event (2ms)
  └── agenthub-broker: process_event (1ms)
      └── agenthub-subscriber: consume_event (5ms)
          └── agenthub-subscriber: process_task (15ms)
              └── agenthub-subscriber: publish_result (2ms)

Trace Details:

  • Span Tags: event_id, event_type, service names
  • Timing Information: Exact start/end times and durations
  • Log Correlation: Each span linked to structured logs

Error Detection:

  • Look for red spans indicating errors
  • Trace the “unknown_task” type to see how errors propagate

🎯 Checkpoint 7: You should see complete traces showing the full event lifecycle.

Step 9: Correlate Logs with Traces (1 minute)

  1. Copy a trace ID from Jaeger (the long hex string)

  2. Check broker logs for that trace ID:

    # In your broker terminal, look for lines like:
    time=2025-09-28T21:00:02.000Z level=INFO msg="Received task request" task_id=task_greeting_1727557202 trace_id=a1b2c3d4e5f6...
    
  3. Check subscriber logs for the same trace ID

🎯 Checkpoint 8: You should find the same trace_id in logs across multiple services.

Step 10: Experience Intelligent Alerting (Optional)

To see alerting in action:

  1. Simulate errors by stopping the subscriber:

    # In subscriber terminal, press Ctrl+C
    
  2. Keep publisher running (it will fail to process tasks)

  3. Check Prometheus alerts:

    • Go to http://localhost:9090/alerts
    • After ~5 minutes, you should see “HighEventProcessingErrorRate” firing
  4. Restart subscriber to clear the alert

πŸŽ‰ Congratulations!

You’ve successfully experienced the complete AgentHub observability stack!

Summary: What You Accomplished

  • βœ… Deployed a complete observability stack with Docker Compose
  • βœ… Ran observable agents with automatic instrumentation
  • βœ… Monitored real-time metrics in Grafana dashboards
  • βœ… Traced event flows across multiple services with Jaeger
  • βœ… Correlated logs with traces using trace IDs
  • βœ… Experienced intelligent alerting with Prometheus
  • βœ… Understood the complete event lifecycle from publisher to subscriber

Key Observability Concepts You Learned

Distributed Tracing

  • Events get unique trace IDs that follow them everywhere
  • Each processing step creates a “span” with timing information
  • Complete request flows are visible across service boundaries
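Under the hood each span is created with OpenTelemetry. A minimal sketch, assuming the standard Go SDK (go.opentelemetry.io/otel) and illustrative span and attribute names, looks like this:

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

func main() {
	ctx := context.Background()

	// Create a span for one processing step; with no SDK configured this is a no-op tracer.
	tracer := otel.Tracer("agenthub-publisher")
	ctx, span := tracer.Start(ctx, "publish_event")
	defer span.End()

	// Attributes such as the event type show up as span tags in Jaeger.
	span.SetAttributes(attribute.String("event_type", "greeting"))

	_ = ctx // pass ctx onward so child spans join the same trace
}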

Metrics Collection

  • 47+ different metrics automatically collected
  • Real-time visualization of system health and performance
  • Historical data for trend analysis
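Metrics of this kind are exposed through the Prometheus Go client. A hedged sketch follows; the metric and label names are illustrative, not necessarily the ones AgentHub registers.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Illustrative counter; the real AgentHub metric names may differ.
var eventsProcessed = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "events_processed_total",
		Help: "Number of events processed, by event type.",
	},
	[]string{"event_type"},
)

func main() {
	eventsProcessed.WithLabelValues("greeting").Inc()

	// The /metrics endpoint is what Prometheus scrapes (port is illustrative).
	http.Handle("/metrics", promhttp.Handler())
	http.ListenAndServe(":8080", nil)
}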

Structured Logging

  • All logs include trace context for correlation
  • Consistent format across all services
  • Easy debugging and troubleshooting
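With Go's log/slog, attaching the trace ID as a structured field is what makes logs searchable by trace, similar to the broker log lines shown earlier. The field values below are illustrative.

package main

import (
	"log/slog"
	"os"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

	// Including trace_id lets you correlate this line with the Jaeger trace.
	logger.Info("Received task request",
		"task_id", "task_greeting_1727557202",
		"trace_id", "a1b2c3d4e5f6",
	)
}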

Intelligent Alerting

  • Proactive monitoring for error rates and performance
  • Automatic notifications when thresholds are exceeded
  • Helps prevent issues before they impact users

Next Steps

For Development:

For Operations:

For Understanding:

Troubleshooting

  • Services won’t start: Run docker-compose down && docker-compose up -d
  • No metrics in Grafana: Check Prometheus targets at http://localhost:9090/targets
  • No traces in Jaeger: Verify the JAEGER_ENDPOINT environment variable is set correctly
  • Permission errors: Ensure Docker has proper permissions

Clean Up

When you’re done exploring:

# Stop the observability stack
cd observability
docker-compose down

# Stop the Go applications
# Press Ctrl+C in each terminal running the agents

🎯 Ready for More?

Production Usage: Add Observability to Your Agent

Deep Understanding: Distributed Tracing Explained

3 - Workflows

Tutorials for building complex multi-agent workflows

Workflow Tutorials

Learn to design and implement sophisticated multi-agent workflows and orchestration patterns.

Available Tutorials

3.1 - Building Multi-Agent Workflows

Learn to create complex workflows involving multiple specialized agents working together to accomplish sophisticated tasks. Build a real document processing pipeline with multiple agents handling different stages.

Building Multi-Agent Workflows

This advanced tutorial teaches you to create complex workflows involving multiple specialized agents working together to accomplish sophisticated tasks. You’ll build a real document processing pipeline with multiple agents handling different stages.

What You’ll Build

By the end of this tutorial, you’ll have an A2A-compliant multi-agent system that:

  1. Ingests documents through an A2A Document Intake Agent
  2. Validates content using an A2A Validation Agent
  3. Extracts metadata with an A2A Metadata Extraction Agent
  4. Processes text through an A2A Text Processing Agent
  5. Generates summaries using an A2A Summary Agent
  6. Orchestrates the workflow with an A2A Workflow Coordinator Agent

This demonstrates real-world A2A agent collaboration patterns with conversation context, structured message content, and artifact-based results used in production systems.

Prerequisites

Architecture Overview

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  A2A Workflow   β”‚    β”‚   AgentHub      β”‚    β”‚  A2A Specializedβ”‚
β”‚  Coordinator    β”‚    β”‚  A2A Broker     β”‚    β”‚    Agents       β”‚
β”‚                 β”‚    β”‚                 β”‚    β”‚                 β”‚
β”‚ β€’ A2A context   │◄──►│ β€’ Routes A2A    │◄──►│ β€’ Document      β”‚
β”‚   management    β”‚    β”‚   messages      β”‚    β”‚   Intake        β”‚
β”‚ β€’ Conversation  β”‚    β”‚ β€’ Tracks A2A    β”‚    β”‚ β€’ Validation    β”‚
β”‚   threading     β”‚    β”‚   conversations β”‚    β”‚ β€’ Metadata      β”‚
β”‚ β€’ Artifact      β”‚    β”‚ β€’ Manages A2A   β”‚    β”‚ β€’ Text Proc     β”‚
β”‚   aggregation   β”‚    β”‚   state         β”‚    β”‚ β€’ Summary       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Step 1: Create the Workflow Coordinator

First, let’s create the main coordinator that manages the document processing pipeline.

Create the coordinator agent:

mkdir -p agents/coordinator

Create agents/coordinator/main.go:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"

	a2a "github.com/owulveryck/agenthub/events/a2a"
	pb "github.com/owulveryck/agenthub/events/eventbus"
)

const (
	agentHubAddr = "localhost:50051"
	agentID      = "a2a_workflow_coordinator"
)

type A2ADocumentWorkflow struct {
	DocumentID   string
	ContextID    string                 // A2A conversation context
	Status       string
	CurrentStage string
	TaskHistory  []*a2a.Task            // Complete A2A task history
	Artifacts    []*a2a.Artifact        // Collected artifacts from stages
	Results      map[string]interface{} // Per-stage results used to drive the next stage
	StartTime    time.Time
	client       pb.AgentHubClient      // A2A-compliant client
}

func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewAgentHubClient(conn)
	coordinator := &A2AWorkflowCoordinator{
		client:        client,
		workflows:     make(map[string]*A2ADocumentWorkflow),
	}

	ctx := context.Background()

	// Start listening for A2A task events
	go coordinator.subscribeToA2AEvents(ctx)

	// Start processing documents with A2A workflow
	coordinator.startA2ADocumentProcessing(ctx)

	// Keep running
	select {}
}

type A2AWorkflowCoordinator struct {
	client    pb.AgentHubClient
	workflows map[string]*A2ADocumentWorkflow
}

func (wc *A2AWorkflowCoordinator) startA2ADocumentProcessing(ctx context.Context) {
	// Simulate document arrival with A2A structured content
	documents := []map[string]interface{}{
		{
			"document_id": "doc_001",
			"content":     "This is a sample business document about quarterly results.",
			"filename":    "q3_results.txt",
			"source":      "email_attachment",
			"doc_type":    "business_report",
		},
		{
			"document_id": "doc_002",
			"content":     "Technical specification for the new API endpoints and authentication mechanisms.",
			"filename":    "api_spec.txt",
			"source":      "file_upload",
			"doc_type":    "technical_spec",
		},
	}

	for _, doc := range documents {
		wc.processA2ADocument(ctx, doc)
		time.Sleep(5 * time.Second)
	}
}

func (wc *A2AWorkflowCoordinator) processA2ADocument(ctx context.Context, document map[string]interface{}) {
	documentID := document["document_id"].(string)
	contextID := fmt.Sprintf("doc_workflow_%s_%s", documentID, uuid.New().String())

	workflow := &A2ADocumentWorkflow{
		DocumentID:   documentID,
		ContextID:    contextID,
		Status:       "started",
		CurrentStage: "intake",
		TaskHistory:  make([]*a2a.Task, 0),
		Artifacts:    make([]*a2a.Artifact, 0),
		Results:      make(map[string]interface{}),
		StartTime:    time.Now(),
		client:       wc.client,
	}

	wc.workflows[documentID] = workflow

	log.Printf("Starting A2A document processing workflow for %s with context %s", documentID, contextID)

	// Stage 1: A2A Document Intake
	wc.publishA2ATask(ctx, "document_intake", document, "document_intake_agent", workflow)
}

func (wc *A2AWorkflowCoordinator) publishA2ATask(ctx context.Context, taskDescription string, params map[string]interface{}, targetAgent string, workflow *A2ADocumentWorkflow) {
	taskID := fmt.Sprintf("task_%s_%s", taskDescription, uuid.New().String())
	messageID := fmt.Sprintf("msg_%d_%s", time.Now().Unix(), uuid.New().String())

	// Tag the parameters with the workflow ID so result handlers can correlate stages
	params["workflow_id"] = workflow.DocumentID

	// Create A2A structured content
	paramsData, err := structpb.NewStruct(params)
	if err != nil {
		log.Printf("Error creating parameters: %v", err)
		return
	}

	// Create A2A message with structured parts
	requestMessage := &a2a.Message{
		MessageId: messageID,
		ContextId: workflow.ContextID,
		TaskId:    taskID,
		Role:      a2a.Role_USER,
		Content: []*a2a.Part{
			{
				Part: &a2a.Part_Text{
					Text: fmt.Sprintf("Please process %s for document %s", taskDescription, workflow.DocumentID),
				},
			},
			{
				Part: &a2a.Part_Data{
					Data: &a2a.DataPart{
						Data:        paramsData,
						Description: fmt.Sprintf("%s parameters", taskDescription),
					},
				},
			},
		},
	}

	// Create A2A task
	task := &a2a.Task{
		Id:        taskID,
		ContextId: workflow.ContextID,
		Status: &a2a.TaskStatus{
			State:     a2a.TaskState_TASK_STATE_SUBMITTED,
			Update:    requestMessage,
			Timestamp: timestamppb.Now(),
		},
		History: []*a2a.Message{requestMessage},
		Metadata: paramsData,
	}

	// Store in workflow history
	workflow.TaskHistory = append(workflow.TaskHistory, task)

	// Publish A2A task update
	req := &pb.PublishTaskUpdateRequest{
		Task: task,
		Routing: &pb.AgentEventMetadata{
			FromAgentId: agentID,
			ToAgentId:   targetAgent,
			EventType:   "task.submitted",
			Priority:    pb.Priority_PRIORITY_MEDIUM,
		},
	}

	log.Printf("Publishing A2A %s task for workflow %s in context %s", taskDescription, workflow.DocumentID, workflow.ContextID)
	_, err = wc.client.PublishTaskUpdate(ctx, req)
	if err != nil {
		log.Printf("Error publishing A2A task: %v", err)
	}
}

func (wc *A2AWorkflowCoordinator) subscribeToA2AEvents(ctx context.Context) {
	req := &pb.SubscribeToTaskResultsRequest{
		RequesterAgentId: agentID,
	}

	stream, err := wc.client.SubscribeToTaskResults(ctx, req)
	if err != nil {
		log.Printf("Error subscribing to results: %v", err)
		return
	}

	for {
		result, err := stream.Recv()
		if err != nil {
			log.Printf("Error receiving result: %v", err)
			return
		}

		wc.handleTaskResult(ctx, result)
	}
}

func (wc *A2AWorkflowCoordinator) handleTaskResult(ctx context.Context, result *pb.TaskResult) {
	params := result.GetResult().AsMap()
	workflowID := params["workflow_id"].(string)
	stage := params["stage"].(string)

	workflow, exists := wc.workflows[workflowID]
	if !exists {
		log.Printf("Unknown workflow ID: %s", workflowID)
		return
	}

	log.Printf("Received result for workflow %s, stage %s: %s",
		workflowID, stage, result.GetStatus().String())

	if result.GetStatus() == pb.TaskStatus_TASK_STATUS_FAILED {
		workflow.Status = "failed"
		log.Printf("Workflow %s failed at stage %s: %s",
			workflowID, stage, result.GetErrorMessage())
		return
	}

	// Store stage results
	workflow.Results[stage] = params

	// Advance to next stage
	wc.advanceWorkflow(ctx, workflow, stage)
}

func (wc *A2AWorkflowCoordinator) advanceWorkflow(ctx context.Context, workflow *A2ADocumentWorkflow, completedStage string) {
	switch completedStage {
	case "document_intake":
		// Move to validation
		workflow.CurrentStage = "validation"
		data := workflow.Results["document_intake"]
		wc.publishA2ATask(ctx, "document_validation", data.(map[string]interface{}), "validation_agent", workflow)

	case "document_validation":
		// Move to metadata extraction
		workflow.CurrentStage = "metadata_extraction"
		data := workflow.Results["document_validation"]
		wc.publishA2ATask(ctx, "metadata_extraction", data.(map[string]interface{}), "metadata_agent", workflow)

	case "metadata_extraction":
		// Move to text processing
		workflow.CurrentStage = "text_processing"
		data := workflow.Results["metadata_extraction"]
		wc.publishA2ATask(ctx, "text_processing", data.(map[string]interface{}), "text_processor_agent", workflow)

	case "text_processing":
		// Move to summary generation
		workflow.CurrentStage = "summary_generation"
		data := workflow.Results["text_processing"]
		wc.publishA2ATask(ctx, "summary_generation", data.(map[string]interface{}), "summary_agent", workflow)

	case "summary_generation":
		// Workflow complete
		workflow.Status = "completed"
		workflow.CurrentStage = "finished"
		duration := time.Since(workflow.StartTime)

		log.Printf("Workflow %s completed successfully in %v", workflow.DocumentID, duration)
		wc.printWorkflowSummary(workflow)
	}
}

func (wc *A2AWorkflowCoordinator) printWorkflowSummary(workflow *A2ADocumentWorkflow) {
	fmt.Printf("\n=== WORKFLOW SUMMARY ===\n")
	fmt.Printf("Document ID: %s\n", workflow.DocumentID)
	fmt.Printf("Status: %s\n", workflow.Status)
	fmt.Printf("Duration: %v\n", time.Since(workflow.StartTime))
	fmt.Printf("Stages completed:\n")

	for stage, result := range workflow.Results {
		fmt.Printf("  - %s: %v\n", stage, result)
	}
	fmt.Printf("=======================\n\n")
}

Step 2: Create Specialized Agents

Now let’s create each specialized agent that handles specific stages of the pipeline.

Document Intake Agent

Create agents/document_intake/main.go:

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io"
	"log"
	"strings"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"

	pb "github.com/owulveryck/agenthub/events/a2a"
)

const (
	agentHubAddr = "localhost:50051"
	agentID      = "document_intake_agent"
)

func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewEventBusClient(conn)
	agent := &DocumentIntakeAgent{client: client}

	ctx := context.Background()
	agent.start(ctx)
}

type DocumentIntakeAgent struct {
	client pb.EventBusClient
}

func (dia *DocumentIntakeAgent) start(ctx context.Context) {
	log.Printf("Document Intake Agent %s starting...", agentID)

	req := &pb.SubscribeToTasksRequest{
		AgentId:   agentID,
		TaskTypes: []string{"document_intake"},
	}

	stream, err := dia.client.SubscribeToTasks(ctx, req)
	if err != nil {
		log.Fatalf("Error subscribing: %v", err)
	}

	log.Printf("Subscribed to document intake tasks")

	for {
		task, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Printf("Error receiving task: %v", err)
			return
		}

		go dia.processTask(ctx, task)
	}
}

func (dia *DocumentIntakeAgent) processTask(ctx context.Context, task *pb.TaskMessage) {
	log.Printf("Processing document intake task: %s", task.GetTaskId())

	params := task.GetParameters().AsMap()

	// Simulate document intake processing
	time.Sleep(2 * time.Second)

	// Generate document hash
	content := params["content"].(string)
	hash := fmt.Sprintf("%x", md5.Sum([]byte(content)))

	// Extract basic metadata
	wordCount := len(strings.Fields(content))
	charCount := len(content)

	result := map[string]interface{}{
		"document_id":   params["document_id"],
		"workflow_id":   params["workflow_id"],
		"stage":         "document_intake",
		"content":       content,
		"filename":      params["filename"],
		"source":        params["source"],
		"document_hash": hash,
		"word_count":    wordCount,
		"char_count":    charCount,
		"intake_timestamp": time.Now().Format(time.RFC3339),
		"status":        "intake_complete",
	}

	dia.publishResult(ctx, task, result, pb.TaskStatus_TASK_STATUS_COMPLETED, "")
}

func (dia *DocumentIntakeAgent) publishResult(ctx context.Context, originalTask *pb.TaskMessage, result map[string]interface{}, status pb.TaskStatus, errorMsg string) {
	resultStruct, err := structpb.NewStruct(result)
	if err != nil {
		log.Printf("Error creating result struct: %v", err)
		return
	}

	taskResult := &pb.TaskResult{
		TaskId:          originalTask.GetTaskId(),
		Status:          status,
		Result:          resultStruct,
		ErrorMessage:    errorMsg,
		ExecutorAgentId: agentID,
		CompletedAt:     timestamppb.Now(),
	}

	req := &pb.PublishTaskResultRequest{Result: taskResult}

	_, err = dia.client.PublishTaskResult(ctx, req)
	if err != nil {
		log.Printf("Error publishing result: %v", err)
	} else {
		log.Printf("Published result for task %s", originalTask.GetTaskId())
	}
}

Validation Agent

Create agents/validation/main.go:

package main

import (
	"context"
	"io"
	"log"
	"strings"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"

	pb "github.com/owulveryck/agenthub/events/a2a"
)

const (
	agentHubAddr = "localhost:50051"
	agentID      = "validation_agent"
)

func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewEventBusClient(conn)
	agent := &ValidationAgent{client: client}

	ctx := context.Background()
	agent.start(ctx)
}

type ValidationAgent struct {
	client pb.EventBusClient
}

func (va *ValidationAgent) start(ctx context.Context) {
	log.Printf("Validation Agent %s starting...", agentID)

	req := &pb.SubscribeToTasksRequest{
		AgentId:   agentID,
		TaskTypes: []string{"document_validation"},
	}

	stream, err := va.client.SubscribeToTasks(ctx, req)
	if err != nil {
		log.Fatalf("Error subscribing: %v", err)
	}

	log.Printf("Subscribed to document validation tasks")

	for {
		task, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Printf("Error receiving task: %v", err)
			return
		}

		go va.processTask(ctx, task)
	}
}

func (va *ValidationAgent) processTask(ctx context.Context, task *pb.TaskMessage) {
	log.Printf("Processing validation task: %s", task.GetTaskId())

	params := task.GetParameters().AsMap()

	// Simulate validation processing
	time.Sleep(1500 * time.Millisecond)

	content := params["content"].(string)

	// Perform validation checks
	validationResults := va.validateDocument(content)

	result := map[string]interface{}{
		"document_id":       params["document_id"],
		"workflow_id":       params["workflow_id"],
		"stage":             "document_validation",
		"content":           content,
		"filename":          params["filename"],
		"source":            params["source"],
		"document_hash":     params["document_hash"],
		"word_count":        params["word_count"],
		"char_count":        params["char_count"],
		"intake_timestamp":  params["intake_timestamp"],
		"validation_results": validationResults,
		"validation_timestamp": time.Now().Format(time.RFC3339),
		"status":            "validation_complete",
	}

	var status pb.TaskStatus
	var errorMsg string

	if validationResults["is_valid"].(bool) {
		status = pb.TaskStatus_TASK_STATUS_COMPLETED
	} else {
		status = pb.TaskStatus_TASK_STATUS_FAILED
		errorMsg = "Document validation failed: " + validationResults["errors"].(string)
	}

	va.publishResult(ctx, task, result, status, errorMsg)
}

func (va *ValidationAgent) validateDocument(content string) map[string]interface{} {
	// Simple validation rules
	isValid := true
	var errors []string

	// Check minimum length
	if len(content) < 10 {
		isValid = false
		errors = append(errors, "content too short")
	}

	// Check for suspicious content
	suspiciousTerms := []string{"malware", "virus", "hack"}
	for _, term := range suspiciousTerms {
		if strings.Contains(strings.ToLower(content), term) {
			isValid = false
			errors = append(errors, "suspicious content detected")
			break
		}
	}

	// Check language (simple heuristic)
	isEnglish := va.isEnglishContent(content)

	return map[string]interface{}{
		"is_valid":    isValid,
		"is_english":  isEnglish,
		"errors":      strings.Join(errors, "; "),
		"length_ok":   len(content) >= 10,
		"safe_content": !strings.Contains(strings.ToLower(content), "malware"),
	}
}

func (va *ValidationAgent) isEnglishContent(content string) bool {
	// Simple heuristic: check for common English words
	commonWords := []string{"the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"}
	lowerContent := strings.ToLower(content)

	matches := 0
	for _, word := range commonWords {
		if strings.Contains(lowerContent, " "+word+" ") {
			matches++
		}
	}

	return matches >= 2
}

func (va *ValidationAgent) publishResult(ctx context.Context, originalTask *pb.TaskMessage, result map[string]interface{}, status pb.TaskStatus, errorMsg string) {
	resultStruct, err := structpb.NewStruct(result)
	if err != nil {
		log.Printf("Error creating result struct: %v", err)
		return
	}

	taskResult := &pb.TaskResult{
		TaskId:          originalTask.GetTaskId(),
		Status:          status,
		Result:          resultStruct,
		ErrorMessage:    errorMsg,
		ExecutorAgentId: agentID,
		CompletedAt:     timestamppb.Now(),
	}

	req := &pb.PublishTaskResultRequest{Result: taskResult}

	_, err = va.client.PublishTaskResult(ctx, req)
	if err != nil {
		log.Printf("Error publishing result: %v", err)
	} else {
		log.Printf("Published result for task %s", originalTask.GetTaskId())
	}
}

Step 3: Build and Test the Multi-Agent System

Update the Makefile to include the new agents:

# Add to Makefile build target
build: proto
	@echo "Building server binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(SERVER_BINARY) broker/main.go

	@echo "Building coordinator binary..."
	go build $(GO_BUILD_FLAGS) -o bin/coordinator agents/coordinator/main.go

	@echo "Building document intake agent..."
	go build $(GO_BUILD_FLAGS) -o bin/document-intake agents/document_intake/main.go

	@echo "Building validation agent..."
	go build $(GO_BUILD_FLAGS) -o bin/validation agents/validation/main.go

	@echo "Building publisher binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(PUBLISHER_BINARY) agents/publisher/main.go

	@echo "Building subscriber binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(SUBSCRIBER_BINARY) agents/subscriber/main.go

	@echo "Build complete. Binaries are in the 'bin/' directory."

Build all components:

make build

Step 4: Run the Multi-Agent Workflow

Now let’s run the complete multi-agent system:

Terminal 1 - Start the broker:

make run-server

Terminal 2 - Start the document intake agent:

./bin/document-intake

Terminal 3 - Start the validation agent:

./bin/validation

Terminal 4 - Start the workflow coordinator:

./bin/coordinator

Step 5: Observe the Workflow

You’ll see the workflow coordinator processing documents through multiple stages:

  1. Document Intake: Receives and processes raw documents
  2. Validation: Checks content for safety and validity
  3. Metadata Extraction: Extracts structured metadata
  4. Text Processing: Processes and analyzes text content
  5. Summary Generation: Creates document summaries

Each agent processes its stage and passes results to the next stage via the AgentHub broker.

Understanding the Multi-Agent Pattern

This tutorial demonstrates several key patterns:

1. Workflow Orchestration

The coordinator agent manages the overall workflow, determining which stage comes next and handling failures.

2. Specialized Agents

Each agent has a specific responsibility and can be developed, deployed, and scaled independently.

3. Asynchronous Processing

Agents work asynchronously, allowing for better resource utilization and scalability.

4. Error Handling

The system handles failures gracefully, with the coordinator managing workflow state.

5. Data Flow

Structured data flows between agents, with each stage adding value to the processing pipeline.

Next Steps

Now that you understand multi-agent workflows:

  1. Add more agents: Create metadata extraction, text processing, and summary agents
  2. Implement error recovery: Add retry mechanisms and failure handling
  3. Add monitoring: Create a dashboard agent that tracks workflow progress
  4. Scale the system: Run multiple instances of each agent type
  5. Add persistence: Store workflow state in a database for recovery

This pattern scales to handle complex business processes, data pipelines, and automated workflows in production systems.

Common Patterns and Best Practices

Workflow State Management

  • Store workflow state persistently for recovery
  • Use unique workflow IDs for tracking
  • Implement timeouts for stuck workflows

Agent Communication

  • Use structured messages with clear schemas
  • Include metadata for routing and tracking
  • Implement progress reporting for long-running tasks

Error Handling

  • Design for partial failures
  • Implement retry mechanisms with backoff
  • Provide clear error messages and recovery paths
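As a sketch of the retry-with-backoff idea (the helper below is illustrative and not part of AgentHub):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff retries fn up to maxAttempts times, doubling the delay after each failure.
func retryWithBackoff(maxAttempts int, initial time.Duration, fn func() error) error {
	delay := initial
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		fmt.Printf("attempt %d failed: %v; retrying in %v\n", attempt, err, delay)
		time.Sleep(delay)
		delay *= 2
	}
	return fmt.Errorf("all %d attempts failed: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := retryWithBackoff(4, 100*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("stage temporarily unavailable")
		}
		return nil
	})
	fmt.Println("final result:", err)
}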

Monitoring and Observability

  • Log all state transitions
  • Track workflow performance metrics
  • Implement health checks for agents

You now have the foundation for building sophisticated multi-agent systems that can handle complex, real-world workflows!