Reference
Comprehensive technical documentation and API specifications
Reference Documentation
This section contains comprehensive technical documentation for all AgentHub components, APIs, and configuration options. Use this as your authoritative source for implementation details.
📚 Documentation Sections
- Configuration - Complete configuration options and settings
- API Reference - gRPC APIs, unified abstractions, and tracing interfaces
- Observability - Metrics, health endpoints, and monitoring
- Tasks - Task message specifications and data structures
🎯 How to Use This Reference
- Accuracy: All information is kept up-to-date with the latest version
- Completeness: Every public API and configuration option is documented
- Examples: Code examples illustrate usage where helpful
- Structure: Information is organized by component and function
Note
This reference documentation describes the current stable version. For experimental features, check the explanation section or source code.
1 - Configuration
Configuration reference and settings documentation
Configuration Reference
This section provides comprehensive documentation for all AgentHub configuration options, environment variables, and settings.
Available Documentation
1.1 - Environment Variables Reference
Complete reference for all environment variables used by AgentHub’s unified abstractions for configuration and observability.
Environment Variables Reference
This reference documents all environment variables used by AgentHub’s unified abstraction system. All components automatically load these variables for configuration.
Core Configuration
Broker Connection
| Variable | Default | Description | Used By | 
|---|---|---|---|
| AGENTHUB_BROKER_ADDR | localhost | Broker server hostname or IP address | Agents | 
| AGENTHUB_BROKER_PORT | 50051 | Broker gRPC port number | Agents | 
| AGENTHUB_GRPC_PORT | :50051 | Server listen address (for broker) | Broker | 
Example:
export AGENTHUB_BROKER_ADDR="production-broker.example.com"
export AGENTHUB_BROKER_PORT="50051"
export AGENTHUB_GRPC_PORT=":50051"
Health Monitoring
| Variable | Default | Description | Used By | 
|---|---|---|---|
| BROKER_HEALTH_PORT | 8080 | Broker health check endpoint port | Broker | 
| PUBLISHER_HEALTH_PORT | 8081 | Publisher health check endpoint port | Publishers | 
| SUBSCRIBER_HEALTH_PORT | 8082 | Subscriber health check endpoint port | Subscribers | 
Health Endpoints Available:
- http://localhost:8080/health - Health check
- http://localhost:8080/metrics - Prometheus metrics
- http://localhost:8080/ready - Readiness check
Example:
export BROKER_HEALTH_PORT="8080"
export PUBLISHER_HEALTH_PORT="8081"
export SUBSCRIBER_HEALTH_PORT="8082"
Observability Configuration
Distributed Tracing
| Variable | Default | Description | Used By | 
|---|---|---|---|
| JAEGER_ENDPOINT | 127.0.0.1:4317 | Jaeger OTLP endpoint for traces | All components | 
| SERVICE_NAME | agenthub-service | Service name for tracing | All components | 
| SERVICE_VERSION | 1.0.0 | Service version for telemetry | All components | 
Example:
export JAEGER_ENDPOINT="http://jaeger.example.com:14268/api/traces"
export SERVICE_NAME="my-agenthub-app"
export SERVICE_VERSION="2.1.0"
Jaeger Integration:
- When JAEGER_ENDPOINT is set: Automatic tracing enabled (see the sketch below)
- When empty or unset: Tracing disabled (minimal overhead)
- Supports both gRPC (4317) and HTTP (14268) endpoints
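For illustration, a minimal sketch of endpoint-conditional tracing initialization using the OpenTelemetry Go SDK directly (an assumption for this example; the unified abstractions perform the equivalent setup for you, and initTracing is a hypothetical helper name):
package main
import (
    "context"
    "os"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// initTracing enables tracing only when JAEGER_ENDPOINT is set.
func initTracing(ctx context.Context) (func(context.Context) error, error) {
    endpoint := os.Getenv("JAEGER_ENDPOINT")
    if endpoint == "" {
        // Tracing disabled: keep the default no-op tracer provider.
        return func(context.Context) error { return nil }, nil
    }
    exporter, err := otlptracegrpc.New(ctx,
        otlptracegrpc.WithEndpoint(endpoint), // e.g. "127.0.0.1:4317"
        otlptracegrpc.WithInsecure(),
    )
    if err != nil {
        return nil, err
    }
    tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
    otel.SetTracerProvider(tp)
    return tp.Shutdown, nil
}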
Metrics Collection
| Variable | Default | Description | Used By | 
|---|---|---|---|
| PROMETHEUS_PORT | 9090 | Prometheus server port | Observability stack | 
| GRAFANA_PORT | 3333 | Grafana dashboard port | Observability stack | 
| ALERTMANAGER_PORT | 9093 | AlertManager port | Observability stack | 
Example:
export PROMETHEUS_PORT="9090"
export GRAFANA_PORT="3333"
export ALERTMANAGER_PORT="9093"
OpenTelemetry Collector
| Variable | Default | Description | Used By | 
|---|---|---|---|
| OTLP_GRPC_PORT | 4320 | OTLP Collector gRPC port | Observability stack | 
| OTLP_HTTP_PORT | 4321 | OTLP Collector HTTP port | Observability stack | 
Example:
export OTLP_GRPC_PORT="4320"
export OTLP_HTTP_PORT="4321"
Service Configuration
General Settings
| Variable | Default | Description | Used By | 
|---|---|---|---|
| ENVIRONMENT | development | Deployment environment | All components | 
| LOG_LEVEL | INFO | Logging level (DEBUG, INFO, WARN, ERROR) | All components | 
Example:
export ENVIRONMENT="production"
export LOG_LEVEL="WARN"
Environment-Specific Configurations
Development Environment
# .envrc for development
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"
export AGENTHUB_GRPC_PORT=":50051"
# Health ports
export BROKER_HEALTH_PORT="8080"
export PUBLISHER_HEALTH_PORT="8081"
export SUBSCRIBER_HEALTH_PORT="8082"
# Observability (local stack)
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export PROMETHEUS_PORT="9090"
export GRAFANA_PORT="3333"
# Service metadata
export SERVICE_NAME="agenthub-dev"
export SERVICE_VERSION="dev"
export ENVIRONMENT="development"
export LOG_LEVEL="DEBUG"
Staging Environment
# .envrc for staging
export AGENTHUB_BROKER_ADDR="staging-broker.example.com"
export AGENTHUB_BROKER_PORT="50051"
# Health ports (non-conflicting)
export BROKER_HEALTH_PORT="8080"
export PUBLISHER_HEALTH_PORT="8081"
export SUBSCRIBER_HEALTH_PORT="8082"
# Observability (staging stack)
export JAEGER_ENDPOINT="http://staging-jaeger.example.com:14268/api/traces"
export PROMETHEUS_PORT="9090"
export GRAFANA_PORT="3333"
# Service metadata
export SERVICE_NAME="agenthub-staging"
export SERVICE_VERSION="1.2.0-rc1"
export ENVIRONMENT="staging"
export LOG_LEVEL="INFO"
Production Environment
# .envrc for production
export AGENTHUB_BROKER_ADDR="prod-broker.example.com"
export AGENTHUB_BROKER_PORT="50051"
# Health ports
export BROKER_HEALTH_PORT="8080"
export PUBLISHER_HEALTH_PORT="8081"
export SUBSCRIBER_HEALTH_PORT="8082"
# Observability (production stack)
export JAEGER_ENDPOINT="http://jaeger.prod.example.com:14268/api/traces"
export PROMETHEUS_PORT="9090"
export GRAFANA_PORT="3333"
export ALERTMANAGER_PORT="9093"
# Service metadata
export SERVICE_NAME="agenthub-prod"
export SERVICE_VERSION="1.2.0"
export ENVIRONMENT="production"
export LOG_LEVEL="WARN"
Configuration Loading
Automatic Loading by Unified Abstractions
The unified abstractions automatically load environment variables:
// Automatic configuration loading
config := agenthub.NewGRPCConfig("my-component")
// Results in:
// config.BrokerAddr = "localhost:50051" (AGENTHUB_BROKER_ADDR + AGENTHUB_BROKER_PORT)
// config.ServerAddr = ":50051" (AGENTHUB_GRPC_PORT)
// config.HealthPort = "8080" (BROKER_HEALTH_PORT)
// config.ComponentName = "my-component" (from parameter)
Using direnv (Recommended)
- Install direnv: https://direnv.net/docs/installation.html 
- Create a .envrc file in the project root:
cat > .envrc << 'EOF'
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export SERVICE_NAME="my-agenthub-app"
EOF
- Allow direnv to load it:
direnv allow
- Automatic loading: Variables load automatically when entering the directory
Manual Loading
# Source variables manually
source .envrc
# Or set individually
export AGENTHUB_BROKER_ADDR="localhost"
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
Configuration Validation
Required Variables
Minimal configuration (all have defaults):
- No variables are strictly required
- Defaults work for local development
Production recommendations:
- Set JAEGER_ENDPOINT for tracing
- Set SERVICE_NAME for identification
- Set ENVIRONMENT to “production”
- Configure unique health ports if running multiple services
Configuration Verification
Check loaded configuration:
config := agenthub.NewGRPCConfig("test")
fmt.Printf("Broker: %s\n", config.BrokerAddr)
fmt.Printf("Health: %s\n", config.HealthPort)
fmt.Printf("Component: %s\n", config.ComponentName)
Verify health endpoints:
# Check if configuration is working
curl http://localhost:8080/health
curl http://localhost:8081/health  # Publisher
curl http://localhost:8082/health  # Subscriber
Verify tracing:
- Open Jaeger UI: http://localhost:16686
- Look for traces from your service name
- Check spans are being created
Common Patterns
Docker Compose
# docker-compose.yml
version: '3.8'
services:
  broker:
    build: .
    command: go run broker/main.go
    environment:
      - AGENTHUB_GRPC_PORT=:50051
      - BROKER_HEALTH_PORT=8080
      - JAEGER_ENDPOINT=http://jaeger:14268/api/traces
      - SERVICE_NAME=agenthub-broker
    ports:
      - "50051:50051"
      - "8080:8080"
  publisher:
    build: .
    command: go run agents/publisher/main.go
    environment:
      - AGENTHUB_BROKER_ADDR=broker
      - AGENTHUB_BROKER_PORT=50051
      - PUBLISHER_HEALTH_PORT=8081
      - JAEGER_ENDPOINT=http://jaeger:14268/api/traces
      - SERVICE_NAME=agenthub-publisher
    ports:
      - "8081:8081"
Kubernetes ConfigMap
# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: agenthub-config
data:
  AGENTHUB_BROKER_ADDR: "agenthub-broker.default.svc.cluster.local"
  AGENTHUB_BROKER_PORT: "50051"
  JAEGER_ENDPOINT: "http://jaeger.observability.svc.cluster.local:14268/api/traces"
  SERVICE_NAME: "agenthub-k8s"
  SERVICE_VERSION: "1.0.0"
  ENVIRONMENT: "production"
  LOG_LEVEL: "INFO"
---
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agenthub-publisher
spec:
  template:
    spec:
      containers:
      - name: publisher
        image: agenthub:latest
        envFrom:
        - configMapRef:
            name: agenthub-config
        env:
        - name: PUBLISHER_HEALTH_PORT
          value: "8080"
Troubleshooting
Common Issues
| Problem | Solution | 
|---|---|
| Agent can’t connect to broker | Check AGENTHUB_BROKER_ADDR and AGENTHUB_BROKER_PORT |
| Health endpoint not accessible | Verify *_HEALTH_PORT variables and port availability |
| No traces in Jaeger | Set JAEGER_ENDPOINT and ensure Jaeger is running |
| Port conflicts | Use different ports for each component’s health endpoints |
| Configuration not loading | Ensure variables are exported; check with printenv |
Debug Configuration
Check environment variables:
# List all AgentHub variables
printenv | grep AGENTHUB
# List all observability variables
printenv | grep -E "(JAEGER|SERVICE|PROMETHEUS|GRAFANA)"
# Check specific variable
echo $AGENTHUB_BROKER_ADDR
Test configuration:
# Quick test with temporary override
AGENTHUB_BROKER_ADDR=test-broker go run agents/publisher/main.go
# Verify health endpoint responds
curl -f http://localhost:8080/health || echo "Health check failed"
Configuration Precedence
- Environment variables (highest priority)
- Default values (from code)
Example: If AGENTHUB_BROKER_ADDR is set, it overrides the default “localhost”.
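This is the same lookup pattern as the getEnv helper shown in the agent configuration examples later in this reference; a minimal sketch:
// Environment variable wins; otherwise the coded default applies.
func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}
// getEnv("AGENTHUB_BROKER_ADDR", "localhost") returns the exported value if set,
// and "localhost" otherwise.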
This environment variable reference provides comprehensive documentation for configuring AgentHub using the unified abstraction system. For practical usage examples, see the Installation and Setup Tutorial and Configuration Reference.
1.2 - Configuration Reference
Comprehensive reference for configuring AgentHub components using the unified abstraction library with environment-based configuration.
Configuration Reference
This document provides comprehensive reference for configuring AgentHub components using the unified abstraction library with environment-based configuration.
Unified Abstraction Configuration
AgentHub uses environment variables for all configuration with the unified abstraction library providing automatic configuration setup.
Core Environment Variables
gRPC Connection Configuration
| Variable | Default | Description | 
|---|---|---|
| AGENTHUB_BROKER_ADDR | localhost | Broker server hostname or IP address | 
| AGENTHUB_BROKER_PORT | 50051 | Broker gRPC port number | 
| AGENTHUB_GRPC_PORT | :50051 | Server listen address (for broker) | 
Note: The unified abstraction automatically combines AGENTHUB_BROKER_ADDR and AGENTHUB_BROKER_PORT into a complete broker address (e.g., localhost:50051).
Health Monitoring Configuration
| Variable | Default | Description | 
|---|---|---|
| BROKER_HEALTH_PORT | 8080 | Broker health check endpoint port | 
| PUBLISHER_HEALTH_PORT | 8081 | Publisher health check endpoint port | 
| SUBSCRIBER_HEALTH_PORT | 8082 | Subscriber health check endpoint port | 
Observability Configuration
| Variable | Default | Description | 
|---|---|---|
| JAEGER_ENDPOINT | 127.0.0.1:4317 | Jaeger OTLP endpoint for distributed tracing | 
| PROMETHEUS_PORT | 9090 | Prometheus metrics collection port | 
| GRAFANA_PORT | 3333 | Grafana dashboard web interface port | 
| ALERTMANAGER_PORT | 9093 | AlertManager web interface port | 
| OTLP_GRPC_PORT | 4320 | OpenTelemetry Collector gRPC port | 
| OTLP_HTTP_PORT | 4321 | OpenTelemetry Collector HTTP port | 
Service Metadata
| Variable | Default | Description |
|---|---|---|
| SERVICE_VERSION | 1.0.0 | Service version for telemetry and observability | 
| ENVIRONMENT | development | Deployment environment (development, staging, production) | 
A2A Protocol Configuration
| Variable | Default | Description | 
|---|---|---|
| AGENTHUB_MESSAGE_BUFFER_SIZE | 100 | Buffer size for A2A message processing | 
| AGENTHUB_TASK_UPDATE_INTERVAL | 1s | Interval for publishing task status updates | 
| AGENTHUB_ARTIFACT_MAX_SIZE | 10MB | Maximum size for task artifacts | 
| AGENTHUB_CONTEXT_TIMEOUT | 30s | Timeout for A2A message context | 
| AGENTHUB_A2A_PROTOCOL_VERSION | 1.0 | A2A protocol version for compatibility | 
| AGENTHUB_MESSAGE_HISTORY_LIMIT | 50 | Maximum message history per task | 
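A sketch of loading these values, reusing the getEnv, getInt, and getDuration helpers defined in the agent configuration examples below. The parseSize helper (for values like 10MB) is hypothetical, not part of the library:
messageBufferSize := getInt("AGENTHUB_MESSAGE_BUFFER_SIZE", 100)
taskUpdateInterval := getDuration("AGENTHUB_TASK_UPDATE_INTERVAL", "1s")
contextTimeout := getDuration("AGENTHUB_CONTEXT_TIMEOUT", "30s")
messageHistoryLimit := getInt("AGENTHUB_MESSAGE_HISTORY_LIMIT", 50)
artifactMaxSize := parseSize(getEnv("AGENTHUB_ARTIFACT_MAX_SIZE", "10MB")) // parseSize: hypothetical size parser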
Unified Abstraction Usage
Using Configuration with the Unified Abstraction
The unified abstraction library automatically loads configuration from environment variables:
// Create configuration from environment variables
config := agenthub.NewGRPCConfig("my-component")
// Configuration is automatically populated:
// - config.BrokerAddr: "localhost:50051" (combined from AGENTHUB_BROKER_ADDR + AGENTHUB_BROKER_PORT)
// - config.ServerAddr: ":50051" (from AGENTHUB_GRPC_PORT)
// - config.HealthPort: "8080" (from BROKER_HEALTH_PORT)
// - config.ComponentName: "my-component" (from parameter)
Environment Variable Loading
The recommended way to load environment variables:
Option 1: Using direnv (recommended)
# Place variables in .envrc file
direnv allow
Option 2: Source manually
source .envrc
Option 3: Set individual variables
export AGENTHUB_BROKER_ADDR=localhost
export AGENTHUB_BROKER_PORT=50051
export JAEGER_ENDPOINT=127.0.0.1:4317
Configuration Override Examples
You can override defaults by setting environment variables before running:
# Use different broker address
export AGENTHUB_BROKER_ADDR=remote-broker.example.com
export AGENTHUB_BROKER_PORT=9090
go run broker/main.go
# Use different health ports to avoid conflicts
export BROKER_HEALTH_PORT=8083
export PUBLISHER_HEALTH_PORT=8084
export SUBSCRIBER_HEALTH_PORT=8085
go run agents/publisher/main.go
# Use custom observability endpoints
export JAEGER_ENDPOINT=jaeger.example.com:4317
export PROMETHEUS_PORT=9091
go run broker/main.go
Configuration Best Practices
- Use .envrc for Development: Keep all environment variables in .envrc for a consistent development experience
- Override Selectively: Only override specific variables when needed, use defaults otherwise
- Environment-Specific Configs: Use different variable values for development, staging, and production
- Health Port Management: Use different health ports for each component to avoid conflicts
- Observability Integration: Always configure observability endpoints for production deployments
Legacy Configuration Migration
If migrating from previous versions of AgentHub:
Old Configuration Pattern:
// Manual server setup (deprecated)
lis, err := net.Listen("tcp", ":50051")
server := grpc.NewServer()
// ... extensive setup code
New Unified Abstraction Pattern:
// Automatic configuration from environment
config := agenthub.NewGRPCConfig("broker")
server, err := agenthub.NewAgentHubServer(config)
service := agenthub.NewAgentHubService(server)
pb.RegisterAgentHubServer(server.Server, service)
server.Start(ctx)
Command-Line Usage
Basic Commands
The unified abstraction provides simplified command execution:
agenthub-server [OPTIONS]
Options:
  -port int
        Server port (default 50051)
  -host string
        Server host (default "0.0.0.0")
  -config string
        Configuration file path
  -log-level string
        Log level: debug, info, warn, error (default "info")
  -log-file string
        Log file path (default: stdout)
  -max-connections int
        Maximum concurrent connections (default 1000)
  -channel-buffer-size int
        Channel buffer size (default 10)
  -help
        Show help message
  -version
        Show version information
Configuration File
The broker can also be configured using a YAML configuration file:
# agenthub.yaml
server:
  host: "0.0.0.0"
  port: 50051
  max_connections: 1000
  timeout: "30s"
logging:
  level: "info"
  format: "json"
  file: "/var/log/agenthub/broker.log"
performance:
  channel_buffer_size: 10
  max_message_size: "4MB"
  keepalive_time: "30s"
  keepalive_timeout: "5s"
limits:
  max_agents: 10000
  max_tasks_per_agent: 100
  memory_limit: "1GB"
security:
  tls_enabled: false
  cert_file: ""
  key_file: ""
  ca_file: ""
Loading Configuration:
agenthub-server -config /path/to/agenthub.yaml
Agent Configuration
Environment Variables
Agents can be configured using environment variables:
Connection Configuration
| Variable | Default | Description | 
|---|---|---|
| AGENTHUB_BROKER_ADDRESS | localhost:50051 | Broker server address | 
| AGENTHUB_AGENT_ID | Generated | Unique agent identifier | 
| AGENTHUB_CONNECTION_TIMEOUT | 10s | Connection timeout | 
| AGENTHUB_RETRY_ATTEMPTS | 3 | Connection retry attempts | 
| AGENTHUB_RETRY_DELAY | 1s | Delay between retries | 
Task Processing Configuration
| Variable | Default | Description | 
|---|---|---|
| AGENTHUB_MAX_CONCURRENT_TASKS | 5 | Maximum concurrent task processing | 
| AGENTHUB_TASK_TIMEOUT | 300s | Default task timeout | 
| AGENTHUB_PROGRESS_INTERVAL | 5s | Progress reporting interval | 
| AGENTHUB_TASK_TYPES | "" | Comma-separated list of supported task types | 
Logging Configuration
| Variable | Default | Description | 
|---|---|---|
| AGENTHUB_AGENT_LOG_LEVEL | info | Agent logging level | 
| AGENTHUB_AGENT_LOG_FORMAT | text | Agent log format | 
| AGENTHUB_AGENT_LOG_FILE | "" | Agent log file path | 
Agent Configuration Examples
Publisher Configuration
package main
import (
    "os"
    "strconv"
    "time"
)
type PublisherConfig struct {
    BrokerAddress    string
    AgentID          string
    ConnectionTimeout time.Duration
    RetryAttempts    int
    RetryDelay       time.Duration
    LogLevel         string
}
func LoadPublisherConfig() *PublisherConfig {
    config := &PublisherConfig{
        BrokerAddress:    getEnv("AGENTHUB_BROKER_ADDRESS", "localhost:50051"),
        AgentID:          getEnv("AGENTHUB_AGENT_ID", generateAgentID()),
        ConnectionTimeout: getDuration("AGENTHUB_CONNECTION_TIMEOUT", "10s"),
        RetryAttempts:    getInt("AGENTHUB_RETRY_ATTEMPTS", 3),
        RetryDelay:       getDuration("AGENTHUB_RETRY_DELAY", "1s"),
        LogLevel:         getEnv("AGENTHUB_AGENT_LOG_LEVEL", "info"),
    }
    return config
}
func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}
func getInt(key string, defaultValue int) int {
    if value := os.Getenv(key); value != "" {
        if i, err := strconv.Atoi(value); err == nil {
            return i
        }
    }
    return defaultValue
}
func getDuration(key string, defaultValue string) time.Duration {
    if value := os.Getenv(key); value != "" {
        if d, err := time.ParseDuration(value); err == nil {
            return d
        }
    }
    d, _ := time.ParseDuration(defaultValue)
    return d
}
Subscriber Configuration
type SubscriberConfig struct {
    BrokerAddress      string
    AgentID            string
    MaxConcurrentTasks int
    TaskTimeout        time.Duration
    ProgressInterval   time.Duration
    SupportedTaskTypes []string
    LogLevel           string
}
func LoadSubscriberConfig() *SubscriberConfig {
    taskTypesStr := getEnv("AGENTHUB_TASK_TYPES", "")
    var taskTypes []string
    if taskTypesStr != "" {
        taskTypes = strings.Split(taskTypesStr, ",")
        for i, taskType := range taskTypes {
            taskTypes[i] = strings.TrimSpace(taskType)
        }
    }
    config := &SubscriberConfig{
        BrokerAddress:      getEnv("AGENTHUB_BROKER_ADDRESS", "localhost:50051"),
        AgentID:            getEnv("AGENTHUB_AGENT_ID", generateAgentID()),
        MaxConcurrentTasks: getInt("AGENTHUB_MAX_CONCURRENT_TASKS", 5),
        TaskTimeout:        getDuration("AGENTHUB_TASK_TIMEOUT", "300s"),
        ProgressInterval:   getDuration("AGENTHUB_PROGRESS_INTERVAL", "5s"),
        SupportedTaskTypes: taskTypes,
        LogLevel:           getEnv("AGENTHUB_AGENT_LOG_LEVEL", "info"),
    }
    return config
}
Agent Configuration File
Agents can also use configuration files:
# agent.yaml
agent:
  id: "data_processor_001"
  broker_address: "broker.example.com:50051"
  connection_timeout: "10s"
  retry_attempts: 3
  retry_delay: "1s"
task_processing:
  max_concurrent_tasks: 5
  task_timeout: "300s"
  progress_interval: "5s"
  supported_task_types:
    - "data_analysis"
    - "data_transformation"
    - "data_validation"
logging:
  level: "info"
  format: "json"
  file: "/var/log/agenthub/agent.log"
health:
  port: 8080
  endpoint: "/health"
  check_interval: "30s"
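A sketch of loading such a file, assuming gopkg.in/yaml.v3 (any YAML library works); the struct mirrors the file above and is illustrative, not a library type:
package main
import (
    "os"
    "gopkg.in/yaml.v3"
)
// AgentFileConfig mirrors the agent.yaml layout shown above (illustrative).
type AgentFileConfig struct {
    Agent struct {
        ID            string `yaml:"id"`
        BrokerAddress string `yaml:"broker_address"`
    } `yaml:"agent"`
    TaskProcessing struct {
        MaxConcurrentTasks int      `yaml:"max_concurrent_tasks"`
        SupportedTaskTypes []string `yaml:"supported_task_types"`
    } `yaml:"task_processing"`
}
func loadAgentConfig(path string) (*AgentFileConfig, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    var cfg AgentFileConfig
    if err := yaml.Unmarshal(data, &cfg); err != nil {
        return nil, err
    }
    return &cfg, nil
}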
Security Configuration
TLS Configuration
Broker TLS Setup
# broker configuration
security:
  tls_enabled: true
  cert_file: "/etc/agenthub/certs/server.crt"
  key_file: "/etc/agenthub/certs/server.key"
  ca_file: "/etc/agenthub/certs/ca.crt"
  client_auth: "require_and_verify"
Agent TLS Setup
// Agent TLS connection
func createTLSConnection(address string) (*grpc.ClientConn, error) {
    config := &tls.Config{
        ServerName: "agenthub-broker",
        // Load client certificates if needed
    }
    creds := credentials.NewTLS(config)
    conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
    if err != nil {
        return nil, fmt.Errorf("failed to connect with TLS: %v", err)
    }
    return conn, nil
}
Authentication Configuration
JWT Authentication
# broker configuration
security:
  auth_enabled: true
  auth_method: "jwt"
  jwt_secret: "your-secret-key"
  jwt_issuer: "agenthub-broker"
  jwt_expiry: "24h"
// Agent authentication
type AuthenticatedAgent struct {
    client   pb.AgentHubClient
    token    string
    agentID  string
}
func (a *AuthenticatedAgent) authenticate() error {
    // Add authentication token to context
    ctx := metadata.AppendToOutgoingContext(context.Background(),
        "authorization", "Bearer "+a.token)
    // Use authenticated context for A2A requests
    _, err := a.client.PublishMessage(ctx, request)
    return err
}
Production Configuration Examples
Production Broker Configuration
# production-broker.yaml
server:
  host: "0.0.0.0"
  port: 50051
  max_connections: 5000
  timeout: "60s"
performance:
  channel_buffer_size: 50
  max_message_size: "16MB"
  keepalive_time: "10s"
  keepalive_timeout: "3s"
limits:
  max_agents: 50000
  max_tasks_per_agent: 500
  memory_limit: "8GB"
logging:
  level: "warn"
  format: "json"
  file: "/var/log/agenthub/broker.log"
security:
  tls_enabled: true
  cert_file: "/etc/ssl/certs/agenthub.crt"
  key_file: "/etc/ssl/private/agenthub.key"
Cluster Agent Configuration
# cluster-agent.yaml
agent:
  id: "${HOSTNAME}_${POD_ID}"
  broker_address: "agenthub-broker.agenthub.svc.cluster.local:50051"
  connection_timeout: "15s"
  retry_attempts: 5
  retry_delay: "2s"
task_processing:
  max_concurrent_tasks: 10
  task_timeout: "1800s"  # 30 minutes
  progress_interval: "10s"
logging:
  level: "info"
  format: "json"
  file: "stdout"
health:
  port: 8080
  endpoint: "/health"
  check_interval: "30s"
metrics:
  enabled: true
  port: 9090
  endpoint: "/metrics"
Environment-Specific Configurations
Development Environment
# .env.development
AGENTHUB_PORT=50051
AGENTHUB_LOG_LEVEL=debug
AGENTHUB_LOG_FORMAT=text
AGENTHUB_MAX_CONNECTIONS=100
AGENTHUB_CHANNEL_BUFFER_SIZE=5
# Agent settings
AGENTHUB_BROKER_ADDRESS=localhost:50051
AGENTHUB_MAX_CONCURRENT_TASKS=2
AGENTHUB_TASK_TIMEOUT=60s
AGENTHUB_AGENT_LOG_LEVEL=debug
Staging Environment
# .env.staging
AGENTHUB_PORT=50051
AGENTHUB_LOG_LEVEL=info
AGENTHUB_LOG_FORMAT=json
AGENTHUB_MAX_CONNECTIONS=1000
AGENTHUB_CHANNEL_BUFFER_SIZE=20
# Security
AGENTHUB_TLS_ENABLED=true
AGENTHUB_CERT_FILE=/etc/certs/staging.crt
AGENTHUB_KEY_FILE=/etc/certs/staging.key
# Agent settings
AGENTHUB_BROKER_ADDRESS=staging-broker.example.com:50051
AGENTHUB_MAX_CONCURRENT_TASKS=5
AGENTHUB_TASK_TIMEOUT=300s
Production Environment
# .env.production
AGENTHUB_PORT=50051
AGENTHUB_LOG_LEVEL=warn
AGENTHUB_LOG_FORMAT=json
AGENTHUB_LOG_FILE=/var/log/agenthub/broker.log
AGENTHUB_MAX_CONNECTIONS=5000
AGENTHUB_CHANNEL_BUFFER_SIZE=50
# Security
AGENTHUB_TLS_ENABLED=true
AGENTHUB_CERT_FILE=/etc/ssl/certs/agenthub.crt
AGENTHUB_KEY_FILE=/etc/ssl/private/agenthub.key
AGENTHUB_CA_FILE=/etc/ssl/certs/ca.crt
# Performance
AGENTHUB_MAX_MESSAGE_SIZE=16MB
AGENTHUB_KEEPALIVE_TIME=10s
AGENTHUB_MEMORY_LIMIT=8GB
# Agent settings
AGENTHUB_BROKER_ADDRESS=agenthub-prod.example.com:50051
AGENTHUB_MAX_CONCURRENT_TASKS=10
AGENTHUB_TASK_TIMEOUT=1800s
AGENTHUB_CONNECTION_TIMEOUT=15s
AGENTHUB_RETRY_ATTEMPTS=5
Configuration Validation
Broker Configuration Validation
type BrokerConfig struct {
    Port             int           `yaml:"port" validate:"min=1,max=65535"`
    Host             string        `yaml:"host" validate:"required"`
    MaxConnections   int           `yaml:"max_connections" validate:"min=1"`
    Timeout          time.Duration `yaml:"timeout" validate:"min=1s"`
    ChannelBufferSize int          `yaml:"channel_buffer_size" validate:"min=1"`
}
func (c *BrokerConfig) Validate() error {
    validate := validator.New()
    return validate.Struct(c)
}
Agent Configuration Validation
type AgentConfig struct {
    BrokerAddress      string        `yaml:"broker_address" validate:"required"`
    AgentID            string        `yaml:"agent_id" validate:"required,min=1,max=64"`
    MaxConcurrentTasks int           `yaml:"max_concurrent_tasks" validate:"min=1,max=100"`
    TaskTimeout        time.Duration `yaml:"task_timeout" validate:"min=1s"`
}
func (c *AgentConfig) Validate() error {
    validate := validator.New()
    if err := validate.Struct(c); err != nil {
        return err
    }
    // Custom validation
    if !strings.Contains(c.BrokerAddress, ":") {
        return errors.New("broker_address must include port")
    }
    return nil
}
This comprehensive configuration reference covers all aspects of configuring AgentHub for different environments and use cases.
2 - API Reference
Complete API documentation and specifications
API Reference Documentation
This section contains comprehensive API documentation for all AgentHub interfaces, including gRPC APIs, unified abstractions, and tracing interfaces.
Available Documentation
2.1 - A2A-Compliant AgentHub API Reference
Complete technical reference for the A2A-compliant AgentHub API, including all gRPC services, message types, and operational details.
A2A-Compliant AgentHub API Reference
This document provides complete technical reference for the Agent2Agent (A2A) protocol-compliant AgentHub API, including all gRPC services, message types, and operational details.
gRPC Service Definition
The AgentHub broker implements the AgentHub service as defined in proto/eventbus.proto:
service AgentHub {
  // ===== A2A Message Publishing (EDA style) =====
  // PublishMessage submits an A2A message for delivery through the broker
  rpc PublishMessage(PublishMessageRequest) returns (PublishResponse);
  // PublishTaskUpdate notifies subscribers about A2A task state changes
  rpc PublishTaskUpdate(PublishTaskUpdateRequest) returns (PublishResponse);
  // PublishTaskArtifact delivers A2A task output artifacts to subscribers
  rpc PublishTaskArtifact(PublishTaskArtifactRequest) returns (PublishResponse);
  // ===== A2A Event Subscriptions (EDA style) =====
  // SubscribeToMessages creates a stream of A2A message events for an agent
  rpc SubscribeToMessages(SubscribeToMessagesRequest) returns (stream AgentEvent);
  // SubscribeToTasks creates a stream of A2A task events for an agent
  rpc SubscribeToTasks(SubscribeToTasksRequest) returns (stream AgentEvent);
  // SubscribeToAgentEvents creates a unified stream of all events for an agent
  rpc SubscribeToAgentEvents(SubscribeToAgentEventsRequest) returns (stream AgentEvent);
  // ===== A2A Task Management (compatible with A2A spec) =====
  // GetTask retrieves the current state of an A2A task by ID
  rpc GetTask(GetTaskRequest) returns (a2a.Task);
  // CancelTask cancels an active A2A task and notifies subscribers
  rpc CancelTask(CancelTaskRequest) returns (a2a.Task);
  // ListTasks returns A2A tasks matching the specified criteria
  rpc ListTasks(ListTasksRequest) returns (ListTasksResponse);
  // ===== Agent Discovery (A2A compatible) =====
  // GetAgentCard returns the broker's A2A agent card for discovery
  rpc GetAgentCard(google.protobuf.Empty) returns (a2a.AgentCard);
  // RegisterAgent registers an agent with the broker for event routing
  rpc RegisterAgent(RegisterAgentRequest) returns (RegisterAgentResponse);
}
A2A Message Types
Core A2A Types
A2A Message
Represents an A2A-compliant message for agent communication.
message Message {
  string message_id = 1;       // Required: Unique message identifier
  string context_id = 2;       // Optional: Conversation context
  string task_id = 3;          // Optional: Associated task
  Role role = 4;               // Required: USER or AGENT
  repeated Part content = 5;   // Required: Message content parts
  google.protobuf.Struct metadata = 6; // Optional: Additional metadata
  repeated string extensions = 7;       // Optional: Protocol extensions
}
Field Details:
- message_id: Must be unique across all messages. Generated automatically if not provided
- context_id: Groups related messages in a conversation or workflow
- task_id: Links message to a specific A2A task
- role: Indicates whether message is from USER (requesting agent) or AGENT (responding agent)
- content: Array of A2A Part structures containing the actual message content
- metadata: Additional context for routing, processing, or debugging
- extensions: Protocol extension identifiers for future compatibility
A2A Part
Represents content within an A2A message.
message Part {
  oneof part {
    string text = 1;           // Text content
    DataPart data = 2;         // Structured data
    FilePart file = 3;         // File reference
  }
}
message DataPart {
  google.protobuf.Struct data = 1;    // Structured data content
  string description = 2;             // Optional data description
}
message FilePart {
  string file_id = 1;                 // File identifier or URI
  string filename = 2;                // Original filename
  string mime_type = 3;               // MIME type
  int64 size_bytes = 4;               // File size in bytes
  google.protobuf.Struct metadata = 5; // Additional file metadata
}
A2A Task
Represents an A2A-compliant task with lifecycle management.
message Task {
  string id = 1;                    // Required: Task identifier
  string context_id = 2;            // Optional: Conversation context
  TaskStatus status = 3;            // Required: Current task status
  repeated Message history = 4;     // Message history for this task
  repeated Artifact artifacts = 5;  // Task output artifacts
  google.protobuf.Struct metadata = 6; // Task metadata
}
message TaskStatus {
  TaskState state = 1;              // Current task state
  Message update = 2;               // Status update message
  google.protobuf.Timestamp timestamp = 3; // Status timestamp
}
enum TaskState {
  TASK_STATE_SUBMITTED = 0;    // Task created and submitted
  TASK_STATE_WORKING = 1;      // Task in progress
  TASK_STATE_COMPLETED = 2;    // Task completed successfully
  TASK_STATE_FAILED = 3;       // Task failed with error
  TASK_STATE_CANCELLED = 4;    // Task cancelled
}
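Agents are expected to move tasks only through valid lifecycle transitions (see the best practices at the end of this document). A sketch of a transition guard; the helper is illustrative, not part of the generated API:
// validTransition reports whether a task may move from one state to another.
func validTransition(from, to pb.TaskState) bool {
    switch from {
    case pb.TaskState_TASK_STATE_SUBMITTED:
        return to == pb.TaskState_TASK_STATE_WORKING ||
            to == pb.TaskState_TASK_STATE_CANCELLED
    case pb.TaskState_TASK_STATE_WORKING:
        return to == pb.TaskState_TASK_STATE_COMPLETED ||
            to == pb.TaskState_TASK_STATE_FAILED ||
            to == pb.TaskState_TASK_STATE_CANCELLED
    default:
        return false // completed, failed, and cancelled are terminal
    }
}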
A2A Artifact
Represents structured output from completed tasks.
message Artifact {
  string artifact_id = 1;           // Required: Artifact identifier
  string name = 2;                  // Human-readable name
  string description = 3;           // Artifact description
  repeated Part parts = 4;          // Artifact content parts
  google.protobuf.Struct metadata = 5; // Artifact metadata
}
EDA Event Wrapper Types
AgentEvent
Wraps A2A messages for Event-Driven Architecture transport.
message AgentEvent {
  string event_id = 1;                     // Unique event identifier
  google.protobuf.Timestamp timestamp = 2; // Event timestamp
  // A2A-compliant payload
  oneof payload {
    a2a.Message message = 10;              // A2A Message
    a2a.Task task = 11;                    // A2A Task
    TaskStatusUpdateEvent status_update = 12; // Task status change
    TaskArtifactUpdateEvent artifact_update = 13; // Artifact update
  }
  // EDA routing metadata
  AgentEventMetadata routing = 20;
  // Observability context
  string trace_id = 30;
  string span_id = 31;
}
AgentEventMetadata
Provides routing and delivery information for events.
message AgentEventMetadata {
  string from_agent_id = 1;               // Source agent identifier
  string to_agent_id = 2;                 // Target agent ID (empty = broadcast)
  string event_type = 3;                  // Event classification
  repeated string subscriptions = 4;      // Topic-based routing tags
  Priority priority = 5;                  // Delivery priority
}
Request/Response Messages
PublishMessageRequest
message PublishMessageRequest {
  a2a.Message message = 1;                // A2A message to publish
  AgentEventMetadata routing = 2;         // EDA routing info
}
SubscribeToTasksRequest
message SubscribeToTasksRequest {
  string agent_id = 1;                    // Agent ID for subscription
  repeated string task_types = 2;         // Optional task type filter
  repeated a2a.TaskState states = 3;      // Optional state filter
}
GetTaskRequest
message GetTaskRequest {
  string task_id = 1;                     // Task identifier
  int32 history_length = 2;               // History limit (optional)
}
API Operations
Publishing A2A Messages
PublishMessage
Publishes an A2A message for delivery through the EDA broker.
Go Example:
// Create A2A message content
content := []*pb.Part{
    {
        Part: &pb.Part_Text{
            Text: "Hello! Please process this request.",
        },
    },
    {
        Part: &pb.Part_Data{
            Data: &pb.DataPart{
                Data: &structpb.Struct{
                    Fields: map[string]*structpb.Value{
                        "operation": structpb.NewStringValue("process_data"),
                        "dataset_id": structpb.NewStringValue("dataset_123"),
                    },
                },
            },
        },
    },
}
// Create A2A message
message := &pb.Message{
    MessageId: "msg_12345",
    ContextId: "conversation_abc",
    TaskId:    "task_67890",
    Role:      pb.Role_ROLE_USER,
    Content:   content,
    Metadata: &structpb.Struct{
        Fields: map[string]*structpb.Value{
            "priority": structpb.NewStringValue("high"),
        },
    },
}
// Publish through AgentHub
response, err := client.PublishMessage(ctx, &pb.PublishMessageRequest{
    Message: message,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "requester_agent",
        ToAgentId:   "processor_agent",
        EventType:   "task_message",
        Priority:    pb.Priority_PRIORITY_HIGH,
    },
})
Subscribing to A2A Events
SubscribeToTasks
Creates a stream of A2A task events for an agent.
Go Example:
req := &pb.SubscribeToTasksRequest{
    AgentId: "processor_agent",
    TaskTypes: []string{"data_processing", "image_analysis"}, // Optional filter
}
stream, err := client.SubscribeToTasks(ctx, req)
if err != nil {
    return err
}
for {
    event, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }
    // Process different event types
    switch payload := event.GetPayload().(type) {
    case *pb.AgentEvent_Task:
        task := payload.Task
        log.Printf("Received A2A task: %s", task.GetId())
        // Process task using A2A handler
        artifact, status, errorMsg := processA2ATask(ctx, task)
        // Publish completion
        publishTaskCompletion(ctx, client, task, artifact, status, errorMsg)
    case *pb.AgentEvent_StatusUpdate:
        update := payload.StatusUpdate
        log.Printf("Task %s status: %s", update.GetTaskId(), update.GetStatus().GetState())
    case *pb.AgentEvent_ArtifactUpdate:
        artifact := payload.ArtifactUpdate
        log.Printf("Received artifact for task %s", artifact.GetTaskId())
    }
}
A2A Task Management
GetTask
Retrieves the current state of an A2A task.
Go Example:
req := &pb.GetTaskRequest{
    TaskId: "task_67890",
    HistoryLength: 10, // Optional: limit message history
}
task, err := client.GetTask(ctx, req)
if err != nil {
    return err
}
log.Printf("Task %s status: %s", task.GetId(), task.GetStatus().GetState())
log.Printf("Message history: %d messages", len(task.GetHistory()))
log.Printf("Artifacts: %d artifacts", len(task.GetArtifacts()))
CancelTask
Cancels an active A2A task.
Go Example:
req := &pb.CancelTaskRequest{
    TaskId: "task_67890",
    Reason: "User requested cancellation",
}
task, err := client.CancelTask(ctx, req)
if err != nil {
    return err
}
log.Printf("Task %s cancelled", task.GetId())
Agent Discovery
GetAgentCard
Returns the broker’s A2A agent card for discovery.
Go Example:
card, err := client.GetAgentCard(ctx, &emptypb.Empty{})
if err != nil {
    return err
}
log.Printf("AgentHub broker: %s v%s", card.GetName(), card.GetVersion())
log.Printf("Protocol version: %s", card.GetProtocolVersion())
log.Printf("Capabilities: streaming=%v", card.GetCapabilities().GetStreaming())
for _, skill := range card.GetSkills() {
    log.Printf("Skill: %s - %s", skill.GetName(), skill.GetDescription())
}
RegisterAgent
Registers an agent with the broker.
Go Example:
agentCard := &pb.AgentCard{
    ProtocolVersion: "0.2.9",
    Name:           "my-processor-agent",
    Description:    "Data processing agent with A2A compliance",
    Version:        "1.0.0",
    Capabilities: &pb.AgentCapabilities{
        Streaming: true,
    },
    Skills: []*pb.AgentSkill{
        {
            Id:          "data_processing",
            Name:        "Data Processing",
            Description: "Process structured datasets",
            Tags:        []string{"data", "analysis"},
        },
    },
}
response, err := client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
    AgentCard: agentCard,
    Subscriptions: []string{"data_processing", "analytics"},
})
if response.GetSuccess() {
    log.Printf("Agent registered with ID: %s", response.GetAgentId())
} else {
    log.Printf("Registration failed: %s", response.GetError())
}
High-Level A2A Client Abstractions
A2ATaskPublisher
Simplified interface for publishing A2A tasks.
taskPublisher := &agenthub.A2ATaskPublisher{
    Client:         client,
    TraceManager:   traceManager,
    MetricsManager: metricsManager,
    Logger:         logger,
    ComponentName:  "my-publisher",
    AgentID:        "my-agent-id",
}
task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
    TaskType:         "data_analysis",
    Content:          contentParts,
    RequesterAgentID: "my-agent-id",
    ResponderAgentID: "data-processor",
    Priority:         pb.Priority_PRIORITY_MEDIUM,
    ContextID:        "analysis-session-123",
})
A2ATaskSubscriber
Simplified interface for processing A2A tasks.
taskSubscriber := agenthub.NewA2ATaskSubscriber(client, "my-agent-id")
// Register task handlers
taskSubscriber.RegisterTaskHandler("data_analysis", func(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
    // Process the A2A task
    result := processDataAnalysis(task, message)
    // Return A2A artifact
    artifact := &pb.Artifact{
        ArtifactId:  fmt.Sprintf("result_%s", task.GetId()),
        Name:        "analysis_result",
        Description: "Data analysis results",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data: result,
                    },
                },
            },
        },
    }
    return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
})
// Start processing A2A tasks
err := taskSubscriber.SubscribeToTasks(ctx)
Error Handling
gRPC Status Codes
AgentHub uses standard gRPC status codes (a client-side handling sketch follows the list):
InvalidArgument (Code: 3)
- Missing required fields (message_id, role, content)
- Invalid A2A message structure
- Malformed Part content
NotFound (Code: 5)
- Task ID not found in GetTask/CancelTask
- Agent not registered
Internal (Code: 13)
- Server-side processing errors
- Message routing failures
- A2A validation errors
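A sketch of client-side handling with the standard google.golang.org/grpc/status and codes packages:
_, err := client.PublishMessage(ctx, req)
switch status.Code(err) { // status.Code(nil) returns codes.OK
case codes.OK:
    // success
case codes.InvalidArgument:
    log.Printf("invalid request, do not retry: %v", err) // fix the A2A message instead
case codes.NotFound:
    log.Printf("unknown task or agent: %v", err)
default:
    log.Printf("possibly transient error: %v", err) // candidate for retry with backoff
}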
Retry Patterns
func publishWithRetry(ctx context.Context, client pb.AgentHubClient, req *pb.PublishMessageRequest) error {
    for attempt := 0; attempt < 3; attempt++ {
        _, err := client.PublishMessage(ctx, req)
        if err == nil {
            return nil
        }
        // Check if error is retryable
        if status.Code(err) == codes.InvalidArgument {
            return err // Don't retry validation errors
        }
        // Exponential backoff
        time.Sleep(time.Duration(1<<attempt) * time.Second)
    }
    return fmt.Errorf("max retries exceeded")
}
Message Size Limits
- Maximum message size: 4MB (gRPC default)
- Recommended size: <100KB for optimal A2A compliance
- Large content: Use FilePart references for large data (see the sketch below)
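A sketch of referencing large content through a FilePart instead of embedding it inline; the file_id URI scheme shown is illustrative:
part := &pb.Part{
    Part: &pb.Part_File{
        File: &pb.FilePart{
            FileId:    "s3://example-bucket/datasets/dataset_123.parquet", // illustrative URI
            Filename:  "dataset_123.parquet",
            MimeType:  "application/vnd.apache.parquet",
            SizeBytes: 250 * 1024 * 1024,
        },
    },
}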
A2A Best Practices
- Use structured Parts: Prefer DataPart for structured data over text
- Context management: Group related messages with context_id
- Artifact structure: Return well-formed Artifact objects
- Task lifecycle: Properly manage TaskState transitions
- Connection reuse: Maintain persistent gRPC connections
This completes the comprehensive A2A-compliant API reference for AgentHub, covering all message types, operations, and integration patterns with practical examples.
2.2 - AgentHub Tracing API Reference
Complete API documentation for AgentHub’s OpenTelemetry tracing integration, span management, context propagation, and instrumentation patterns.
🔍 AgentHub Tracing API Reference
Technical reference: Complete API documentation for AgentHub’s OpenTelemetry tracing integration, span management, context propagation, and instrumentation patterns.
Core Components
TraceManager
The TraceManager provides high-level tracing operations for AgentHub events.
Constructor
func NewTraceManager(serviceName string) *TraceManager
Parameters:
- serviceName - Name of the service creating spans
Returns: Configured TraceManager instance
Methods
StartPublishSpan
func (tm *TraceManager) StartPublishSpan(ctx context.Context, responderAgentID, eventType string) (context.Context, trace.Span)
Purpose: Creates a span for event publishing operations
Parameters:
- ctx - Parent context (may contain existing trace)
- responderAgentID - Target agent for the event
- eventType - Type of event being published
Returns:
- context.Context - New context with active span
- trace.Span - The created span
Attributes Set:
- event.type - Event type being published
- responder.agent - Target agent ID
- operation.type - “publish”
Usage:
ctx, span := tm.StartPublishSpan(ctx, "agent_subscriber", "greeting")
defer span.End()
// ... publishing logic
StartEventProcessingSpan
func (tm *TraceManager) StartEventProcessingSpan(ctx context.Context, eventID, eventType, requesterAgentID, responderAgentID string) (context.Context, trace.Span)
Purpose: Creates a span for event processing operations
Parameters:
- ctx - Context with extracted trace information
- eventID - Unique identifier for the event
- eventType - Type of event being processed
- requesterAgentID - Agent that requested processing
- responderAgentID - Agent performing processing
Returns:
- context.Context - Context with processing span
- trace.Span - The processing span
Attributes Set:
- event.id - Event identifier
- event.type - Event type
- requester.agent - Requesting agent ID
- responder.agent - Processing agent ID
- operation.type - “process”
StartBrokerSpan
func (tm *TraceManager) StartBrokerSpan(ctx context.Context, operation, eventType string) (context.Context, trace.Span)
Purpose: Creates spans for broker operations
Parameters:
- ctx - Request context
- operation - Broker operation (route, subscribe, unsubscribe)
- eventType - Event type being handled
Returns:
- context.Context - Context with broker span
- trace.Span - The broker span
Attributes Set:
- operation.type - Broker operation type
- event.type - Event type being handled
- component - “broker”
InjectTraceContext
func (tm *TraceManager) InjectTraceContext(ctx context.Context, headers map[string]string)
Purpose: Injects trace context into headers for propagation
Parameters:
- ctx - Context containing trace information
- headers - Map to inject headers into
Headers Injected:
- traceparent - W3C trace context header
- tracestate - W3C trace state header (if present)
Usage:
headers := make(map[string]string)
tm.InjectTraceContext(ctx, headers)
// headers now contain trace context for propagation
ExtractTraceContext
func (tm *TraceManager) ExtractTraceContext(ctx context.Context, headers map[string]string) context.Context
Purpose: Extracts trace context from headers
Parameters:
- ctx - Base context
- headers - Headers containing trace context
Returns: Context with extracted trace information
Usage:
// Extract from event metadata
if metadata := event.GetMetadata(); metadata != nil {
    if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
        headers := structFieldsToStringMap(traceHeaders.GetStructValue().Fields)
        ctx = tm.ExtractTraceContext(ctx, headers)
    }
}
RecordError
func (tm *TraceManager) RecordError(span trace.Span, err error)
Purpose: Records an error on a span with proper formatting
Parameters:
- span - Span to record error on
- err - Error to record
Effects:
- Sets span status to error
- Records error as span event
- Adds error type attribute
SetSpanSuccess
func (tm *TraceManager) SetSpanSuccess(span trace.Span)
Purpose: Marks a span as successful
Parameters:
- span - Span to mark as successful
Effects:
- Sets span status to OK
- Ensures span is properly completed
Context Propagation
W3C Trace Context Standards
AgentHub uses the W3C Trace Context specification for interoperability.
traceparent
Format: 00-{trace-id}-{span-id}-{trace-flags}
- 00 - Version (currently always 00)
- trace-id - 32-character hex string
- span-id - 16-character hex string
- trace-flags - 2-character hex flags
Example: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
tracestate
Format: Vendor-specific key-value pairs
Example: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE
Propagation Implementation
Manual Injection
// Create headers map
headers := make(map[string]string)
// Inject trace context
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))
// Headers now contain trace context
// Convert to protobuf metadata if needed
metadataStruct, err := structpb.NewStruct(map[string]interface{}{
    "trace_headers": headers,
    "timestamp": time.Now().Format(time.RFC3339),
})
Manual Extraction
// Extract from protobuf metadata
if metadata := task.GetMetadata(); metadata != nil {
    if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
        headers := make(map[string]string)
        for k, v := range traceHeaders.GetStructValue().Fields {
            headers[k] = v.GetStringValue()
        }
        ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(headers))
    }
}
Span Lifecycle Management
Creating Spans
Basic Span Creation
tracer := otel.Tracer("my-service")
ctx, span := tracer.Start(ctx, "operation_name")
defer span.End()
Span with Attributes
ctx, span := tracer.Start(ctx, "operation_name", trace.WithAttributes(
    attribute.String("operation.type", "publish"),
    attribute.String("event.type", "greeting"),
    attribute.Int("event.priority", 1),
))
defer span.End()
Child Span Creation
// Parent span
ctx, parentSpan := tracer.Start(ctx, "parent_operation")
defer parentSpan.End()
// Child span (automatically linked)
ctx, childSpan := tracer.Start(ctx, "child_operation")
defer childSpan.End()
Span Attributes
Standard Attributes
AgentHub uses consistent attribute naming:
// Event attributes
attribute.String("event.id", taskID)
attribute.String("event.type", taskType)
attribute.Int("event.priority", priority)
// Agent attributes
attribute.String("agent.id", agentID)
attribute.String("agent.type", agentType)
attribute.String("requester.agent", requesterID)
attribute.String("responder.agent", responderID)
// Operation attributes
attribute.String("operation.type", "publish|process|route")
attribute.String("component", "broker|publisher|subscriber")
// Result attributes
attribute.Bool("success", true)
attribute.String("error.type", "validation|timeout|network")
Custom Attributes
span.SetAttributes(
    attribute.String("business.unit", "sales"),
    attribute.String("user.tenant", "acme-corp"),
    attribute.Int("batch.size", len(items)),
    attribute.Duration("timeout", 30*time.Second),
)
Span Events
Adding Events
// Simple event
span.AddEvent("validation.started")
// Event with attributes
span.AddEvent("cache.miss", trace.WithAttributes(
    attribute.String("cache.key", key),
    attribute.String("cache.type", "redis"),
))
// Event with timestamp
span.AddEvent("external.api.call", trace.WithAttributes(
    attribute.String("api.endpoint", "/v1/users"),
    attribute.Int("api.status_code", 200),
), trace.WithTimestamp(time.Now()))
Common Event Patterns
// Processing milestones
span.AddEvent("processing.started")
span.AddEvent("validation.completed")
span.AddEvent("business.logic.completed")
span.AddEvent("result.published")
// Error events
span.AddEvent("error.occurred", trace.WithAttributes(
    attribute.String("error.message", err.Error()),
    attribute.String("error.stack", debug.Stack()),
))
Span Status
Setting Status
// Success
span.SetStatus(codes.Ok, "")
// Error with message
span.SetStatus(codes.Error, "validation failed")
// Error without message
span.SetStatus(codes.Error, "")
Status Code Mapping
// gRPC codes to OpenTelemetry codes
statusCode := codes.Ok
if err != nil {
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        statusCode = codes.DeadlineExceeded
    case errors.Is(err, context.Canceled):
        statusCode = codes.Cancelled
    default:
        statusCode = codes.Error
    }
}
span.SetStatus(statusCode, err.Error())
Advanced Instrumentation
Baggage Propagation
Setting Baggage
// Add baggage to context
ctx = baggage.ContextWithValues(ctx,
    baggage.String("user.id", userID),
    baggage.String("tenant.id", tenantID),
    baggage.String("request.id", requestID),
)
Reading Baggage
// Read baggage anywhere in the trace
if member := baggage.FromContext(ctx).Member("user.id"); member.Value() != "" {
    userID := member.Value()
    // Use user ID for business logic
}
Span Links
Creating Links
// Link to related span
linkedSpanContext := trace.SpanContextFromContext(relatedCtx)
ctx, span := tracer.Start(ctx, "operation", trace.WithLinks(
    trace.Link{
        SpanContext: linkedSpanContext,
        Attributes: []attribute.KeyValue{
            attribute.String("link.type", "related_operation"),
        },
    },
))
Sampling Control
Conditional Sampling
// Force sampling for important operations
ctx, span := tracer.Start(ctx, "critical_operation",
    trace.WithNewRoot(), // Start new trace
    trace.WithSpanKind(trace.SpanKindServer),
)
// Add sampling priority
span.SetAttributes(
    attribute.String("sampling.priority", "high"),
)
Integration Patterns
gRPC Integration
Server Interceptor
func TracingUnaryInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        ctx, span := tracer.Start(ctx, info.FullMethod)
        defer span.End()
        resp, err := handler(ctx, req)
        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes.Error, err.Error())
        }
        return resp, err
    }
}
Client Interceptor
func TracingUnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        ctx, span := tracer.Start(ctx, method)
        defer span.End()
        err := invoker(ctx, method, req, reply, cc, opts...)
        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes.Error, err.Error())
        }
        return err
    }
}
HTTP Integration
HTTP Handler Wrapper
func TracingHandler(tracer trace.Tracer, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
        ctx, span := tracer.Start(ctx, r.Method+" "+r.URL.Path)
        defer span.End()
        span.SetAttributes(
            attribute.String("http.method", r.Method),
            attribute.String("http.url", r.URL.String()),
            attribute.String("http.user_agent", r.UserAgent()),
        )
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}
Error Handling
Error Recording Best Practices
Complete Error Recording
if err != nil {
    // Record error on span
    span.RecordError(err)
    span.SetStatus(codes.Error, err.Error())
    // Add error context
    span.SetAttributes(
        attribute.String("error.type", classifyError(err)),
        attribute.Bool("error.retryable", isRetryable(err)),
    )
    // Log with context
    logger.ErrorContext(ctx, "Operation failed",
        slog.Any("error", err),
        slog.String("operation", "event_processing"),
    )
    return err
}
Error Classification
func classifyError(err error) string {
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        return "timeout"
    case errors.Is(err, context.Canceled):
        return "cancelled"
    case strings.Contains(err.Error(), "connection"):
        return "network"
    case strings.Contains(err.Error(), "validation"):
        return "validation"
    default:
        return "unknown"
    }
}
Performance Considerations
Span Creation Overhead
- Span creation: ~1-2μs per span
- Attribute setting: ~100ns per attribute
- Event recording: ~200ns per event
- Context propagation: ~500ns per injection/extraction
Memory Usage
- Active span: ~500 bytes
- Completed span buffer: ~1KB per span
- Context overhead: ~100 bytes per context
Best Practices
- Limit span attributes to essential information
- Use batch exporters to reduce network overhead (see the sketch after this list)
- Sample appropriately for high-throughput services
- Pool span contexts where possible
- Avoid deep span nesting (>10 levels)
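As an illustration of the batching and sampling recommendations, a sketch with the OpenTelemetry Go SDK; the 10% ratio is an example value, not a project default:
tp := sdktrace.NewTracerProvider(
    sdktrace.WithBatcher(exporter), // batch spans before export to cut network overhead
    sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.10))), // sample ~10% of new traces
)
otel.SetTracerProvider(tp)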
Troubleshooting
Missing Spans Checklist
- ✅ OpenTelemetry properly initialized
- ✅ Tracer retrieved from global provider
- ✅ Context propagated correctly
- ✅ Spans properly ended
- ✅ Exporter configured and accessible
Common Issues
Broken Trace Chains
// ❌ Wrong - creates new root trace
ctx, span := tracer.Start(context.Background(), "operation")
// ✅ Correct - continues existing trace
ctx, span := tracer.Start(ctx, "operation")
Missing Context Propagation
// ❌ Wrong - context not propagated
go func() {
    ctx, span := tracer.Start(context.Background(), "async_work")
    // work...
}()
// ✅ Correct - context properly propagated
go func(ctx context.Context) {
    ctx, span := tracer.Start(ctx, "async_work")
    // work...
}(ctx)
🎯 Next Steps:
Implementation: Add Observability to Your Agent
Debugging: Debug with Distributed Tracing
Metrics: Observability Metrics Reference
2.3 - Unified Abstraction Library API Reference
The AgentHub unified abstraction library provides simplified APIs for building gRPC-based agent communication systems with built-in observability, automatic configuration, and correlation tracking.
Unified Abstraction Library API Reference
The AgentHub unified abstraction library provides simplified APIs for building gRPC-based agent communication systems with built-in observability, automatic configuration, and correlation tracking.
Package: internal/agenthub
The internal/agenthub package contains the core unified abstraction components that dramatically simplify AgentHub development by providing high-level APIs with automatic observability integration.
Overview
The unified abstraction library reduces agent implementation complexity from 380+ lines to ~29 lines by providing:
- Automatic gRPC Setup: One-line server and client creation
- Built-in Observability: Integrated OpenTelemetry tracing and metrics
- Environment-Based Configuration: Automatic configuration from environment variables
- Correlation Tracking: Automatic correlation ID generation and propagation
- Pluggable Architecture: Simple task handler registration
Core Components
GRPCConfig
Configuration structure for gRPC servers and clients with environment-based initialization.
type GRPCConfig struct {
    ServerAddr    string // gRPC server listen address (e.g., ":50051")
    BrokerAddr    string // Broker connection address (e.g., "localhost:50051")
    HealthPort    string // Health check endpoint port
    ComponentName string // Component identifier for observability
}
Constructor
func NewGRPCConfig(componentName string) *GRPCConfig
Creates a new gRPC configuration with environment variable defaults:
| Environment Variable | Default | Description | 
|---|---|---|
| AGENTHUB_BROKER_ADDR | localhost | Broker server host | 
| AGENTHUB_BROKER_PORT | 50051 | Broker gRPC port | 
| AGENTHUB_GRPC_PORT | :50051 | Server listen port | 
| BROKER_HEALTH_PORT | 8080 | Health endpoint port | 
Example:
config := agenthub.NewGRPCConfig("my-agent")
// Results in BrokerAddr: "localhost:50051" (automatically combined)
AgentHubServer
High-level gRPC server wrapper with integrated observability.
type AgentHubServer struct {
    Server         *grpc.Server                    // Underlying gRPC server
    Listener       net.Listener                    // Network listener
    Observability  *observability.Observability    // OpenTelemetry integration
    TraceManager   *observability.TraceManager     // Distributed tracing
    MetricsManager *observability.MetricsManager   // Metrics collection
    HealthServer   *observability.HealthServer     // Health monitoring
    Logger         *slog.Logger                    // Structured logging
    Config         *GRPCConfig                     // Configuration
}
Constructor
func NewAgentHubServer(config *GRPCConfig) (*AgentHubServer, error)
Creates a complete gRPC server with:
- OpenTelemetry instrumentation
- Health check endpoints
- Metrics collection
- Structured logging with trace correlation
Methods
func (s *AgentHubServer) Start(ctx context.Context) error
Starts the server with automatic:
- Health endpoint setup (/health, /ready, /metrics)
- Metrics collection goroutine
- gRPC server with observability
func (s *AgentHubServer) Shutdown(ctx context.Context) error
Gracefully shuts down all components:
- gRPC server graceful stop
- Health server shutdown
- Observability cleanup
Example:
config := agenthub.NewGRPCConfig("broker")
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
    log.Fatal(err)
}
// Register services
eventBusService := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, eventBusService)
// Start server
if err := server.Start(ctx); err != nil {
    log.Fatal(err)
}
AgentHubClient
High-level gRPC client wrapper with integrated observability.
type AgentHubClient struct {
    Client         pb.EventBusClient               // gRPC client
    Connection     *grpc.ClientConn                // Connection
    Observability  *observability.Observability    // OpenTelemetry integration
    TraceManager   *observability.TraceManager     // Distributed tracing
    MetricsManager *observability.MetricsManager   // Metrics collection
    HealthServer   *observability.HealthServer     // Health monitoring
    Logger         *slog.Logger                    // Structured logging
    Config         *GRPCConfig                     // Configuration
}
Constructor
func NewAgentHubClient(config *GRPCConfig) (*AgentHubClient, error)
Creates a complete gRPC client with:
- OpenTelemetry instrumentation
- Connection health monitoring
- Metrics collection
- Automatic retry and timeout handling
Methods
func (c *AgentHubClient) Start(ctx context.Context) error
Initializes client with health monitoring and metrics collection.
func (c *AgentHubClient) Shutdown(ctx context.Context) error
Gracefully closes connection and cleans up resources.
Example:
config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
    log.Fatal(err)
}
err = client.Start(ctx)
if err != nil {
    log.Fatal(err)
}
// Use client.Client for gRPC calls
Service Abstractions
EventBusService
Broker service implementation with built-in observability and correlation tracking.
type EventBusService struct {
    Server          *AgentHubServer
    subscriptions   map[string][]Subscription
    resultSubs      map[string][]ResultSubscription
    progressSubs    map[string][]ProgressSubscription
    mu              sync.RWMutex
}
Constructor
func NewEventBusService(server *AgentHubServer) *EventBusService
Creates an EventBus service with automatic:
- Subscription management
- Task routing and correlation
- Observability integration
Key Methods
func (s *EventBusService) PublishTask(ctx context.Context, req *pb.PublishTaskRequest) (*pb.PublishResponse, error)
Publishes tasks with automatic:
- Input validation
- Correlation ID generation
- Distributed tracing
- Metrics collection
func (s *EventBusService) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error
Manages task subscriptions with:
- Automatic subscription lifecycle
- Context cancellation handling
- Error recovery
SubscriberAgent
High-level subscriber implementation with pluggable task handlers.
type SubscriberAgent struct {
    client      *AgentHubClient
    agentID     string
    handlers    map[string]TaskHandler
    ctx         context.Context
    cancel      context.CancelFunc
}
Constructor
func NewSubscriberAgent(client *AgentHubClient, agentID string) *SubscriberAgent
Task Handler Interface
type TaskHandler interface {
    Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error)
}
Methods
func (s *SubscriberAgent) RegisterHandler(taskType string, handler TaskHandler)
Registers handlers for specific task types with automatic:
- Task routing
- Error handling
- Result publishing
func (s *SubscriberAgent) Start(ctx context.Context) error
Starts the subscriber with automatic:
- Task subscription
- Handler dispatch
- Observability integration
Example:
type GreetingHandler struct{}
func (h *GreetingHandler) Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error) {
    // Process the greeting task and build its result.
    result := &pb.TaskResult{} // populate with the task's output
    return result, nil
}
// Register handler
subscriber.RegisterHandler("greeting", &GreetingHandler{})
Utility Functions
func ExtractCorrelationID(ctx context.Context) string
func InjectCorrelationID(ctx context.Context, correlationID string) context.Context
func GenerateCorrelationID() string
Automatic correlation ID management for distributed tracing.
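A minimal usage sketch (helper signatures as above; the logging call is illustrative):
// Ensure a correlation ID exists on the context before publishing.
correlationID := agenthub.ExtractCorrelationID(ctx)
if correlationID == "" {
    correlationID = agenthub.GenerateCorrelationID()
    ctx = agenthub.InjectCorrelationID(ctx, correlationID)
}
// Handlers invoked with this ctx share the same correlation ID,
// so their logs and spans can be joined during debugging.
logger.InfoContext(ctx, "publishing task", slog.String("correlation_id", correlationID))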
Metrics Helpers
func NewMetricsTicker(ctx context.Context, manager *observability.MetricsManager) *MetricsTicker
Automatic metrics collection with configurable intervals.
Configuration Reference
Environment Variables
The unified abstraction library uses environment-based configuration:
| Variable | Type | Default | Description | 
|---|---|---|---|
| AGENTHUB_BROKER_ADDR | string | localhost | Broker server hostname | 
| AGENTHUB_BROKER_PORT | string | 50051 | Broker gRPC port | 
| AGENTHUB_GRPC_PORT | string | :50051 | Server listen address | 
| BROKER_HEALTH_PORT | string | 8080 | Health endpoint port | 
| SERVICE_VERSION | string | 1.0.0 | Service version for observability | 
| ENVIRONMENT | string | development | Deployment environment | 
Observability Integration
The unified abstraction automatically configures:
- OpenTelemetry Tracing: Automatic span creation and context propagation
- Prometheus Metrics: 47+ built-in metrics for performance monitoring
- Health Checks: Comprehensive health endpoints for service monitoring
- Structured Logging: Correlated logging with trace context
Performance comparison with standard gRPC:
| Metric | Standard gRPC | Unified Abstraction | Overhead |
|---|---|---|---|
| Setup Complexity | 380+ lines | ~29 lines | -92% code | 
| Throughput | 10,000+ tasks/sec | 9,500+ tasks/sec | -5% | 
| Latency | Baseline | +10ms for tracing | +10ms | 
| Memory | Baseline | +50MB per agent | +50MB | 
| CPU | Baseline | +5% for observability | +5% | 
Migration Guide
From Standard gRPC
Before (Standard gRPC):
// 380+ lines of boilerplate code
lis, err := net.Listen("tcp", ":50051")
server := grpc.NewServer()
// ... extensive setup code
After (Unified Abstraction):
// 29 lines total
config := agenthub.NewGRPCConfig("my-service")
server, err := agenthub.NewAgentHubServer(config)
service := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, service)
server.Start(ctx)
Observability Benefits
The unified abstraction provides automatic:
- Distributed Tracing: Every request automatically traced
- Metrics Collection: 47+ metrics without configuration
- Health Monitoring: Built-in health and readiness endpoints
- Error Correlation: Automatic error tracking across services
- Performance Monitoring: Latency, throughput, and error rates
Error Handling
The unified abstraction provides comprehensive error handling:
- Automatic Retries: Built-in retry logic for transient failures
- Circuit Breaking: Protection against cascading failures
- Graceful Degradation: Service continues operating during partial failures
- Error Correlation: Distributed error tracking across service boundaries
Best Practices
1. Configuration Management
// Use environment-based configuration
config := agenthub.NewGRPCConfig("my-service")
// Override specific values if needed
config.HealthPort = "8083"
2. Handler Registration
// Register handlers before starting
subscriber.RegisterHandler("task-type", handler)
subscriber.Start(ctx)
3. Graceful Shutdown
// Always implement proper shutdown
defer func() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    server.Shutdown(ctx)
}()
4. Error Handling
// Use context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
result, err := client.Client.PublishTask(ctx, request)
if err != nil {
    // Error is automatically traced and logged
    return fmt.Errorf("failed to publish task: %w", err)
}
3 - Observability
Monitoring, metrics, and observability reference
Observability Reference
This section provides reference documentation for all observability features, including metrics, health endpoints, and monitoring capabilities.
Available Documentation
3.1 - AgentHub Health Endpoints Reference
Complete documentation for AgentHub’s health monitoring APIs, endpoint specifications, status codes, and integration patterns.
AgentHub Health Endpoints Reference
Technical reference: Complete documentation for AgentHub’s health monitoring APIs, endpoint specifications, status codes, and integration patterns.
Overview
Every observable AgentHub service exposes standardized health endpoints for monitoring, load balancing, and operational management.
Standard Endpoints
Health Check Endpoint
/health
Purpose: Comprehensive service health status
Method: GET
Port: Service-specific (8080-8083)
Response Format:
{
  "status": "healthy|degraded|unhealthy",
  "timestamp": "2025-09-28T21:00:00.000Z",
  "service": "agenthub-broker",
  "version": "1.0.0",
  "uptime": "2h34m12s",
  "checks": [
    {
      "name": "self",
      "status": "healthy",
      "message": "Service is running normally",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "1.2ms"
    },
    {
      "name": "database_connection",
      "status": "healthy",
      "message": "Database connection is active",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "15.6ms"
    }
  ]
}
Status Codes:
- 200 OK - All checks healthy
- 503 Service Unavailable - One or more checks unhealthy
- 500 Internal Server Error - Health check system failure
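For example, an external monitor can poll the endpoint and inspect the status field (a minimal sketch using only the Go standard library; the URL is illustrative):
import (
    "encoding/json"
    "fmt"
    "net/http"
)

type healthResponse struct {
    Status  string `json:"status"`
    Service string `json:"service"`
}

func checkHealth(url string) error {
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    var h healthResponse
    if err := json.NewDecoder(resp.Body).Decode(&h); err != nil {
        return err
    }
    // A 503 means one or more checks are unhealthy; the body still
    // carries the per-check details.
    if resp.StatusCode != http.StatusOK || h.Status != "healthy" {
        return fmt.Errorf("%s is %s (HTTP %d)", h.Service, h.Status, resp.StatusCode)
    }
    return nil
}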
Readiness Endpoint
/ready
Purpose: Service readiness for traffic acceptance
Method: GET
Response Format:
{
  "ready": true,
  "timestamp": "2025-09-28T21:00:00.000Z",
  "service": "agenthub-broker",
  "dependencies": [
    {
      "name": "grpc_server",
      "ready": true,
      "message": "gRPC server listening on :50051"
    },
    {
      "name": "observability",
      "ready": true,
      "message": "OpenTelemetry initialized"
    }
  ]
}
Status Codes:
- 200 OK - Service ready for traffic
- 503 Service Unavailable - Service not ready
Metrics Endpoint
/metrics
Purpose: Prometheus metrics exposure
Method: GET
Content-Type: text/plain
Response Format:
# HELP events_processed_total Total number of events processed
# TYPE events_processed_total counter
events_processed_total{service="agenthub-broker",event_type="greeting",success="true"} 1234
# HELP system_cpu_usage_percent CPU usage percentage
# TYPE system_cpu_usage_percent gauge
system_cpu_usage_percent{service="agenthub-broker"} 23.4
Status Codes:
- 200 OK - Metrics available
- 500 Internal Server Error - Metrics collection failure
Service-Specific Configurations
Broker (Port 8080)
Health Checks:
- self - Basic service health
- grpc_server - gRPC server status
- observability - OpenTelemetry health
Example URLs:
- Health: http://localhost:8080/health
- Ready: http://localhost:8080/ready
- Metrics: http://localhost:8080/metrics
Publisher (Port 8081)
Health Checks:
- self - Basic service health
- broker_connection - Connection to AgentHub broker
- observability - Tracing and metrics health
Example URLs:
- Health: http://localhost:8081/health
- Ready: http://localhost:8081/ready
- Metrics: http://localhost:8081/metrics
Subscriber (Port 8082)
Health Checks:
- self - Basic service health
- broker_connection - Connection to AgentHub broker
- task_processor - Task processing capability
- observability - Observability stack health
Example URLs:
- Health: http://localhost:8082/health
- Ready: http://localhost:8082/ready
- Metrics: http://localhost:8082/metrics
Custom Agents (Port 8083+)
Configurable Health Checks:
- Custom business logic checks
- External dependency checks
- Resource availability checks
Health Check Types
BasicHealthChecker
Purpose: Simple function-based health checks
Implementation:
checker := observability.NewBasicHealthChecker("database", func(ctx context.Context) error {
    return db.Ping()
})
healthServer.AddChecker("database", checker)
Use Cases:
- Database connectivity
- File system access
- Configuration validation
- Memory/disk space checks
GRPCHealthChecker
Purpose: gRPC connection health verification
Implementation:
checker := observability.NewGRPCHealthChecker("broker_connection", "localhost:50051")
healthServer.AddChecker("broker_connection", checker)
Use Cases:
- AgentHub broker connectivity
- External gRPC service dependencies
- Service mesh health
HTTPHealthChecker
Purpose: HTTP endpoint health verification
Implementation:
checker := observability.NewHTTPHealthChecker("api_gateway", "http://gateway:8080/health")
healthServer.AddChecker("api_gateway", checker)
Use Cases:
- REST API dependencies
- Web service health
- Load balancer backends
Custom Health Checkers
Interface:
type HealthChecker interface {
    Check(ctx context.Context) error
    Name() string
}
Custom Implementation Example:
type BusinessLogicChecker struct {
    name string
    validator func() error
}
func (c *BusinessLogicChecker) Check(ctx context.Context) error {
    return c.validator()
}
func (c *BusinessLogicChecker) Name() string {
    return c.name
}
// Usage
checker := &BusinessLogicChecker{
    name: "license_validation",
    validator: func() error {
        if time.Now().After(licenseExpiry) {
            return errors.New("license expired")
        }
        return nil
    },
}
Health Check Configuration
Check Intervals
Default Intervals:
- Active checks: Every 30 seconds
- On-demand checks: Per request
- Startup checks: During service initialization
Configurable Timing:
config := observability.HealthConfig{
    CheckInterval: 15 * time.Second,
    Timeout:       5 * time.Second,
    RetryCount:    3,
    RetryDelay:    1 * time.Second,
}
Timeout Configuration
Per-Check Timeouts:
checker := observability.NewBasicHealthChecker("slow_service",
    func(ctx context.Context) error {
        // This check will timeout after 10 seconds
        return slowOperation(ctx)
    }).WithTimeout(10 * time.Second)
Global Timeout:
healthServer := observability.NewHealthServer("8080", "my-service", "1.0.0")
healthServer.SetGlobalTimeout(30 * time.Second)
Integration Patterns
Kubernetes Integration
Liveness Probe
livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
  failureThreshold: 3
Readiness Probe
readinessProbe:
  httpGet:
    path: /ready
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5
  timeoutSeconds: 3
  failureThreshold: 2
Startup Probe
startupProbe:
  httpGet:
    path: /ready
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 5
  timeoutSeconds: 3
  failureThreshold: 30
Load Balancer Integration
HAProxy Configuration
backend agentHub_brokers
    balance roundrobin
    option httpchk GET /health
    server broker1 broker1:8080 check
    server broker2 broker2:8080 check
NGINX Configuration
upstream agenthub_backend {
    server broker1:8080;
    server broker2:8080;
}
location /health_check {
    proxy_pass http://agenthub_backend/health;
    proxy_set_header Host $host;
}
Prometheus Integration
Service Discovery
- job_name: 'agenthub-health'
  static_configs:
    - targets:
      - 'broker:8080'
      - 'publisher:8081'
      - 'subscriber:8082'
  metrics_path: '/metrics'
  scrape_interval: 10s
  scrape_timeout: 5s
Health Check Metrics
# Health check status (1=healthy, 0=unhealthy)
health_check_status{service="agenthub-broker",check="database"}
# Health check duration
health_check_duration_seconds{service="agenthub-broker",check="database"}
# Service uptime
service_uptime_seconds{service="agenthub-broker"}
Status Definitions
Service Status Levels
Healthy
Definition: All health checks passing
HTTP Status: 200 OK
Criteria:
- All registered checks return no error
- Service is fully operational
- All dependencies available
Degraded
Definition: Service operational but with limitations
HTTP Status: 200 OK (with warning indicators)
Criteria:
- Critical checks passing
- Non-critical checks may be failing
- Service can handle requests with reduced functionality
Unhealthy
Definition: Service cannot handle requests properly
HTTP Status: 503 Service Unavailable
Criteria:
- One or more critical checks failing
- Service should not receive new requests
- Requires intervention or automatic recovery
Check-Level Status
Passing
- Check completed successfully
- No errors detected
- Within acceptable parameters
Warning
- Check completed with minor issues
- Service functional but attention needed
- May indicate future problems
Critical
- Check failed
- Service functionality compromised
- Immediate attention required
Monitoring and Alerting
Critical Alerts
# Service down alert
- alert: ServiceHealthCheckFailing
  expr: health_check_status == 0
  for: 1m
  labels:
    severity: critical
  annotations:
    summary: "Service health check failing"
    description: "{{ $labels.service }} health check {{ $labels.check }} is failing"
# Service not ready alert
- alert: ServiceNotReady
  expr: up{job=~"agenthub-.*"} == 0
  for: 30s
  labels:
    severity: critical
  annotations:
    summary: "Service not responding"
    description: "{{ $labels.instance }} is not responding to health checks"
Warning Alerts
# Slow health checks
- alert: SlowHealthChecks
  expr: health_check_duration_seconds > 5
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Health checks taking too long"
    description: "{{ $labels.service }} health check {{ $labels.check }} taking {{ $value }}s"
# Service degraded
- alert: ServiceDegraded
  expr: service_status == 1  # degraded status
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Service running in degraded mode"
    description: "{{ $labels.service }} is degraded but still operational"
API Response Examples
Healthy Service Response
curl http://localhost:8080/health
{
  "status": "healthy",
  "timestamp": "2025-09-28T21:00:00.000Z",
  "service": "agenthub-broker",
  "version": "1.0.0",
  "uptime": "2h34m12s",
  "checks": [
    {
      "name": "self",
      "status": "healthy",
      "message": "Service is running normally",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "1.2ms"
    },
    {
      "name": "grpc_server",
      "status": "healthy",
      "message": "gRPC server listening on :50051",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "0.8ms"
    },
    {
      "name": "observability",
      "status": "healthy",
      "message": "OpenTelemetry exporter connected",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "12.4ms"
    }
  ]
}
Unhealthy Service Response
curl http://localhost:8080/health
{
  "status": "unhealthy",
  "timestamp": "2025-09-28T21:00:00.000Z",
  "service": "agenthub-broker",
  "version": "1.0.0",
  "uptime": "2h34m12s",
  "checks": [
    {
      "name": "self",
      "status": "healthy",
      "message": "Service is running normally",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "1.2ms"
    },
    {
      "name": "grpc_server",
      "status": "unhealthy",
      "message": "Failed to bind to port :50051: address already in use",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "0.1ms"
    },
    {
      "name": "observability",
      "status": "healthy",
      "message": "OpenTelemetry exporter connected",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "12.4ms"
    }
  ]
}
Best Practices
Health Check Design
- Fast Execution: Keep checks under 5 seconds
- Meaningful Tests: Test actual functionality, not just process existence
- Idempotent Operations: Checks should not modify system state
- Appropriate Timeouts: Set reasonable timeouts for external dependencies
- Clear Messages: Provide actionable error messages
Dependency Management
- Critical vs Non-Critical: Distinguish between essential and optional dependencies
- Cascade Prevention: Avoid cascading failures through dependency chains
- Circuit Breakers: Implement circuit breakers for flaky dependencies
- Graceful Degradation: Continue operating when non-critical dependencies fail
Operational Considerations
- Monitoring: Set up alerts for health check failures
- Documentation: Document what each health check validates
- Testing: Test health checks in development and staging
- Versioning: Version health check APIs for compatibility
🎯 Next Steps:
Implementation: Add Observability to Your Agent
Monitoring: Use Grafana Dashboards
Metrics: Observability Metrics Reference
3.2 - AgentHub Observability Metrics Reference
Complete catalog of all metrics exposed by AgentHub’s observability system, their meanings, usage patterns, and query examples.
AgentHub Observability Metrics Reference
Technical reference: Complete catalog of all metrics exposed by AgentHub’s observability system, their meanings, usage patterns, and query examples.
Overview
AgentHub automatically collects 47+ distinct metrics across all observable services, providing comprehensive visibility into event processing, system health, and performance characteristics.
Metric Categories
A2A Message Processing Metrics
a2a_messages_processed_total
Type: Counter
Description: Total number of A2A messages processed by service
Labels:
- service - Service name (agenthub, publisher, subscriber)
- message_type - Type of A2A message (task_update, message, artifact)
- success - Processing success (true/false)
- context_id - A2A conversation context (for workflow tracking)
Usage:
# A2A message processing rate per service
rate(a2a_messages_processed_total[5m])
# Success rate by A2A message type
rate(a2a_messages_processed_total{success="true"}[5m]) / rate(a2a_messages_processed_total[5m]) * 100
# Error rate across all A2A services
rate(a2a_messages_processed_total{success="false"}[5m]) / rate(a2a_messages_processed_total[5m]) * 100
# Workflow processing rate by context
rate(a2a_messages_processed_total[5m]) by (context_id)
a2a_messages_published_total
Type: Counter
Description: Total number of A2A messages published by agents
Labels:
- message_type - Type of A2A message published
- from_agent_id - Publishing agent identifier
- to_agent_id - Target agent identifier (empty for broadcast)
Usage:
# A2A publishing rate by message type
rate(a2a_messages_published_total[5m]) by (message_type)
# Most active A2A publishers
topk(5, rate(a2a_messages_published_total[5m]) by (from_agent_id))
# Broadcast vs direct messaging ratio
rate(a2a_messages_published_total{to_agent_id=""}[5m]) / rate(a2a_messages_published_total[5m])
a2a_message_processing_duration_seconds
Type: Histogram
Description: Time taken to process A2A messages
Labels:
- service - Service processing the message
- message_type - Type of A2A message being processed
- task_state - Current A2A task state (for task-related messages)
Buckets: 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10
Usage:
# p95 A2A message processing latency
histogram_quantile(0.95, rate(a2a_message_processing_duration_seconds_bucket[5m]))
# p99 latency by service
histogram_quantile(0.99, rate(a2a_message_processing_duration_seconds_bucket[5m])) by (service)
# Average A2A processing time by task state
rate(a2a_message_processing_duration_seconds_sum[5m]) / rate(a2a_message_processing_duration_seconds_count[5m]) by (task_state)
a2a_message_errors_total
Type: Counter
Description: Total number of A2A message processing errors
Labels:
- service - Service where error occurred
- message_type - Type of A2A message that failed
- error_type - Category of error (grpc_error, validation_error, protocol_error, etc.)
- a2a_version - A2A protocol version for compatibility tracking
Usage:
# A2A error rate by error type
rate(a2a_message_errors_total[5m]) by (error_type)
# Services with highest A2A error rates
topk(3, rate(a2a_message_errors_total[5m]) by (service))
# A2A protocol version compatibility issues
rate(a2a_message_errors_total{error_type="protocol_error"}[5m]) by (a2a_version)
AgentHub Broker Metrics
agenthub_connections_total
Type: Gauge
Description: Number of active agent connections to AgentHub broker
Labels:
- connection_type - Type of connection (a2a_publisher, a2a_subscriber, unified)
- agent_type - Classification of connected agent
Usage:
# Current AgentHub connection count
agenthub_connections_total
# A2A connection growth over time
increase(agenthub_connections_total[1h])
# Connection distribution by type
agenthub_connections_total by (connection_type)
agenthub_subscriptions_total
Type: Gauge
Description: Number of active A2A message subscriptions
Labels:
- agent_id - Subscriber agent identifier
- subscription_type - Type of A2A subscription (tasks, messages, agent_events)
- filter_criteria - Applied subscription filters (task_types, states, etc.)
Usage:
# Total active A2A subscriptions
sum(agenthub_subscriptions_total)
# A2A subscriptions by agent
sum(agenthub_subscriptions_total) by (agent_id)
# Most popular A2A subscription types
sum(agenthub_subscriptions_total) by (subscription_type)
# Filtered vs unfiltered subscriptions
sum(agenthub_subscriptions_total{filter_criteria!=""}) / sum(agenthub_subscriptions_total)
agenthub_message_routing_duration_seconds
Type: Histogram
Description: Time taken to route A2A messages through AgentHub broker
Labels:
- routing_type - Type of routing (direct, broadcast, filtered)
- message_size_bucket - Message size classification (small, medium, large)
Buckets: 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1
Usage:
# AgentHub A2A routing latency percentiles
histogram_quantile(0.95, rate(agenthub_message_routing_duration_seconds_bucket[5m]))
# A2A routing performance by type
rate(agenthub_message_routing_duration_seconds_sum[5m]) / rate(agenthub_message_routing_duration_seconds_count[5m]) by (routing_type)
# Message size impact on routing
histogram_quantile(0.95, rate(agenthub_message_routing_duration_seconds_bucket[5m])) by (message_size_bucket)
agenthub_queue_size
Type: Gauge
Description: Number of A2A messages queued awaiting routing
Labels:
- queue_type - Type of queue (incoming, outgoing, dead_letter, retry)
- priority - Message priority level
- context_active - Whether messages belong to active A2A contexts
Usage:
# Current A2A queue sizes
agenthub_queue_size by (queue_type)
# A2A queue growth rate
rate(agenthub_queue_size[5m])
# Priority queue distribution
agenthub_queue_size by (priority)
# Active context message backlog
agenthub_queue_size{context_active="true"}
System Health Metrics
system_cpu_usage_percent
Type: Gauge
Description: CPU utilization percentage
Labels:
- service - Service name
Usage:
# Current CPU usage
system_cpu_usage_percent
# High CPU services
system_cpu_usage_percent > 80
# Average CPU over time
avg_over_time(system_cpu_usage_percent[1h])
system_memory_usage_bytes
Type: Gauge
Description: Memory usage in bytes
Labels:
- service - Service name
- type - Memory type (heap, stack, total)
Usage:
# Memory usage in MB
system_memory_usage_bytes / 1024 / 1024
# Memory growth rate
rate(system_memory_usage_bytes[10m])
# Memory usage by type
system_memory_usage_bytes by (type)
system_goroutines_total
Type: Gauge
Description: Number of active goroutines
Labels:
- service - Service name
Usage:
# Current goroutine count
system_goroutines_total
# Goroutine leaks detection
increase(system_goroutines_total[1h]) > 1000
# Goroutine efficiency
system_goroutines_total / system_cpu_usage_percent
system_file_descriptors_used
Type: Gauge
Description: Number of open file descriptors
Labels:
- service - Service name
Usage:
# Current FD usage
system_file_descriptors_used
# FD growth rate
rate(system_file_descriptors_used[5m])
A2A Task-Specific Metrics
a2a_tasks_created_total
Type: Counter
Description: Total number of A2A tasks created
Labels:
- task_type - Type classification of the task
- context_id - A2A conversation context
- priority - Task priority level
Usage:
# A2A task creation rate
rate(a2a_tasks_created_total[5m])
# Task creation by type
rate(a2a_tasks_created_total[5m]) by (task_type)
# High priority task rate
rate(a2a_tasks_created_total{priority="PRIORITY_HIGH"}[5m])
a2a_task_state_transitions_total
Type: Counter
Description: Total number of A2A task state transitions
Labels:
- from_state - Previous task state
- to_state - New task state
- task_type - Type of task transitioning
Usage:
# Task completion rate
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_COMPLETED"}[5m])
# Task failure rate
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_FAILED"}[5m])
# Task state transition patterns
rate(a2a_task_state_transitions_total[5m]) by (from_state, to_state)
a2a_task_duration_seconds
Type: Histogram
Description: Duration of A2A task execution from submission to completion
Labels:
- task_type - Type of task
- final_state - Final task state (COMPLETED, FAILED, CANCELLED)
Buckets: 0.1, 0.5, 1, 5, 10, 30, 60, 300, 600, 1800
Usage:
# A2A task completion time percentiles
histogram_quantile(0.95, rate(a2a_task_duration_seconds_bucket{final_state="TASK_STATE_COMPLETED"}[5m]))
# Task duration by type
histogram_quantile(0.50, rate(a2a_task_duration_seconds_bucket[5m])) by (task_type)
# Failed vs successful task duration comparison
histogram_quantile(0.95, rate(a2a_task_duration_seconds_bucket[5m])) by (final_state)
a2a_artifacts_produced_total
Type: Counter
Description: Total number of A2A artifacts produced by completed tasks
Labels:
- artifact_type - Type of artifact (data, file, text)
- task_type - Type of task that produced the artifact
- artifact_size_bucket - Size classification of artifact
Usage:
# Artifact production rate
rate(a2a_artifacts_produced_total[5m])
# Artifacts by type
rate(a2a_artifacts_produced_total[5m]) by (artifact_type)
# Large artifact production rate
rate(a2a_artifacts_produced_total{artifact_size_bucket="large"}[5m])
gRPC Metrics
grpc_server_started_total
Type: Counter
Description: Total number of RPCs started on the AgentHub server
Labels:
- grpc_method - gRPC method name (PublishMessage, SubscribeToTasks, etc.)
- grpc_service - gRPC service name (AgentHub)
Usage:
# AgentHub RPC request rate
rate(grpc_server_started_total[5m])
# Most called A2A methods
topk(5, rate(grpc_server_started_total[5m]) by (grpc_method))
# A2A vs EDA method usage
rate(grpc_server_started_total{grpc_method=~".*Message.*|.*Task.*"}[5m])
grpc_server_handled_total
Type: Counter
Description: Total number of RPCs completed on the AgentHub server
Labels:
- grpc_method - gRPC method name
- grpc_service - gRPC service name (AgentHub)
- grpc_code - gRPC status code
- a2a_operation - A2A operation type (publish, subscribe, get, cancel)
Usage:
# AgentHub RPC success rate
rate(grpc_server_handled_total{grpc_code="OK"}[5m]) / rate(grpc_server_handled_total[5m]) * 100
# A2A operation error rate
rate(grpc_server_handled_total{grpc_code!="OK"}[5m]) by (a2a_operation)
# A2A method-specific success rates
rate(grpc_server_handled_total{grpc_code="OK"}[5m]) / rate(grpc_server_handled_total[5m]) by (grpc_method)
grpc_server_handling_seconds
Type: Histogram
Description: Histogram of response latency of AgentHub RPCs
Labels:
- grpc_method - gRPC method name
- grpc_service - gRPC service name (AgentHub)
- a2a_operation - A2A operation type
Usage:
# AgentHub gRPC latency percentiles
histogram_quantile(0.95, rate(grpc_server_handling_seconds_bucket[5m]))
# Slow A2A operations
histogram_quantile(0.95, rate(grpc_server_handling_seconds_bucket[5m])) by (a2a_operation) > 0.1
# A2A method performance comparison
histogram_quantile(0.95, rate(grpc_server_handling_seconds_bucket[5m])) by (grpc_method)
Health Check Metrics
health_check_status
Type: Gauge
Description: Health check status (1=healthy, 0=unhealthy)
Labels:
- service - Service name
- check_name - Name of the health check
- endpoint - Health check endpoint
Usage:
# Unhealthy services
health_check_status == 0
# Health check success rate
avg_over_time(health_check_status[5m])
health_check_duration_seconds
Type: Histogram
Description: Time taken to execute health checks
Labels:
- service - Service name
- check_name - Name of the health check
Usage:
# Health check latency
histogram_quantile(0.95, rate(health_check_duration_seconds_bucket[5m]))
# Slow health checks
histogram_quantile(0.95, rate(health_check_duration_seconds_bucket[5m])) by (check_name) > 0.5
OpenTelemetry Metrics
otelcol_processor_batch_batch_send_size_count
Type: Counter
Description: Number of batches sent by OTEL collector
Labels: None
otelcol_exporter_sent_spans
Type: Counter
Description: Number of spans sent to tracing backend
Labels:
- exporter - Exporter name (jaeger, otlp)
Usage:
# Span export rate
rate(otelcol_exporter_sent_spans[5m])
# Export success by backend
rate(otelcol_exporter_sent_spans[5m]) by (exporter)
Common Query Patterns
# Top 5 slowest A2A message types
topk(5,
  histogram_quantile(0.95,
    rate(a2a_message_processing_duration_seconds_bucket[5m])
  ) by (message_type)
)
# A2A task completion time analysis
histogram_quantile(0.95,
  rate(a2a_task_duration_seconds_bucket{final_state="TASK_STATE_COMPLETED"}[5m])
) by (task_type)
# Services exceeding A2A latency SLA (>500ms p95)
histogram_quantile(0.95,
  rate(a2a_message_processing_duration_seconds_bucket[5m])
) by (service) > 0.5
# A2A throughput efficiency (messages per CPU percent)
rate(a2a_messages_processed_total[5m]) / system_cpu_usage_percent
# Task success rate by type
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_COMPLETED"}[5m]) /
rate(a2a_tasks_created_total[5m]) by (task_type)
A2A Error Analysis
# A2A message error rate by service over time
rate(a2a_message_errors_total[5m]) / rate(a2a_messages_processed_total[5m]) * 100
# A2A task failure rate
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_FAILED"}[5m]) /
rate(a2a_tasks_created_total[5m]) * 100
# Most common A2A error types
topk(5, rate(a2a_message_errors_total[5m]) by (error_type))
# A2A protocol compatibility issues
rate(a2a_message_errors_total{error_type="protocol_error"}[5m]) by (a2a_version)
# Services with increasing A2A error rates
increase(a2a_message_errors_total[1h]) by (service) > 10
A2A Capacity Planning
# Peak hourly A2A message throughput
max_over_time(
  rate(a2a_messages_processed_total[5m])[1h:]
) * 3600
# Peak A2A task creation rate
max_over_time(
  rate(a2a_tasks_created_total[5m])[1h:]
) * 3600
# Resource utilization during peak A2A load
(
  max_over_time(system_cpu_usage_percent[1h:]) +
  max_over_time(system_memory_usage_bytes[1h:] / 1024 / 1024 / 1024)
) by (service)
# AgentHub connection scaling needs
max_over_time(agenthub_connections_total[24h:])
# A2A queue depth trends
max_over_time(agenthub_queue_size[24h:]) by (queue_type)
A2A System Health
# Overall A2A system health score (0-1)
avg(health_check_status)
# A2A services with degraded performance
(
  system_cpu_usage_percent > 70 or
  system_memory_usage_bytes > 1e9 or
  rate(a2a_message_errors_total[5m]) / rate(a2a_messages_processed_total[5m]) > 0.05
)
# A2A task backlog health
agenthub_queue_size{queue_type="incoming"} > 1000
# A2A protocol health indicators
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_FAILED"}[5m]) /
rate(a2a_tasks_created_total[5m]) > 0.1
# Resource leak detection
increase(system_goroutines_total[1h]) > 1000 or
increase(system_file_descriptors_used[1h]) > 100
Alert Rule Examples
Critical A2A Alerts
# High A2A message processing error rate alert
- alert: HighA2AMessageProcessingErrorRate
  expr: |
    (
      rate(a2a_message_errors_total[5m]) /
      rate(a2a_messages_processed_total[5m])
    ) * 100 > 10    
  for: 2m
  annotations:
    summary: "High A2A message processing error rate"
    description: "{{ $labels.service }} has {{ $value }}% A2A error rate"
# High A2A task failure rate alert
- alert: HighA2ATaskFailureRate
  expr: |
    (
      rate(a2a_task_state_transitions_total{to_state="TASK_STATE_FAILED"}[5m]) /
      rate(a2a_tasks_created_total[5m])
    ) * 100 > 15    
  for: 3m
  annotations:
    summary: "High A2A task failure rate"
    description: "{{ $value }}% of A2A tasks are failing for task type {{ $labels.task_type }}"
# AgentHub service down alert
- alert: AgentHubServiceDown
  expr: health_check_status == 0
  for: 1m
  annotations:
    summary: "AgentHub service health check failing"
    description: "{{ $labels.service }} health check {{ $labels.check_name }} is failing"
# A2A queue backlog alert
- alert: A2AQueueBacklog
  expr: agenthub_queue_size{queue_type="incoming"} > 1000
  for: 5m
  annotations:
    summary: "A2A message queue backlog"
    description: "AgentHub has {{ $value }} messages queued"
A2A Warning Alerts
# High A2A message processing latency warning
- alert: HighA2AMessageProcessingLatency
  expr: |
    histogram_quantile(0.95,
      rate(a2a_message_processing_duration_seconds_bucket[5m])
    ) > 0.5    
  for: 5m
  annotations:
    summary: "High A2A message processing latency"
    description: "{{ $labels.service }} A2A p95 latency is {{ $value }}s"
# Slow A2A task completion warning
- alert: SlowA2ATaskCompletion
  expr: |
    histogram_quantile(0.95,
      rate(a2a_task_duration_seconds_bucket{final_state="TASK_STATE_COMPLETED"}[5m])
    ) > 300    
  for: 10m
  annotations:
    summary: "Slow A2A task completion"
    description: "A2A tasks of type {{ $labels.task_type }} taking {{ $value }}s to complete"
# High CPU usage warning
- alert: HighCPUUsage
  expr: system_cpu_usage_percent > 80
  for: 5m
  annotations:
    summary: "High CPU usage"
    description: "{{ $labels.service }} CPU usage is {{ $value }}%"
# A2A protocol version compatibility warning
- alert: A2AProtocolVersionMismatch
  expr: |
    rate(a2a_message_errors_total{error_type="protocol_error"}[5m]) > 0.1    
  for: 3m
  annotations:
    summary: "A2A protocol version compatibility issues"
    description: "A2A protocol errors detected for version {{ $labels.a2a_version }}"
Metric Retention and Storage
Retention Policies
- Raw metrics: 15 days at 15-second resolution
- 5m averages: 60 days
- 1h averages: 1 year
- 1d averages: 5 years
Storage Requirements
- Per service: ~2MB/day for all metrics
- Complete system: ~10MB/day for 5 services
- 1 year retention: ~3.6GB total
Prometheus Configuration
- Scrape interval: 10 seconds (configurable)
- Evaluation interval: 15 seconds for alerts
- Query timeout: 30 seconds
- Max samples: 50M per query
Integration Examples
Grafana Dashboard Variables
{
  "service": {
    "query": "label_values(a2a_messages_processed_total, service)",
    "refresh": "on_time_range_changed"
  },
  "message_type": {
    "query": "label_values(a2a_messages_processed_total{service=\"$service\"}, message_type)",
    "refresh": "on_dashboard_load"
  },
  "task_type": {
    "query": "label_values(a2a_tasks_created_total, task_type)",
    "refresh": "on_dashboard_load"
  },
  "context_id": {
    "query": "label_values(a2a_messages_processed_total{service=\"$service\"}, context_id)",
    "refresh": "on_dashboard_load"
  }
}
Custom A2A Application Metrics
// Register custom A2A counter
a2aCustomCounter, err := meter.Int64Counter(
    "a2a_custom_business_metric_total",
    metric.WithDescription("Custom A2A business metric"),
)
// Increment with A2A context and labels
a2aCustomCounter.Add(ctx, 1, metric.WithAttributes(
    attribute.String("task_type", "custom_analysis"),
    attribute.String("context_id", contextID),
    attribute.String("agent_type", "analytics_agent"),
    attribute.String("a2a_version", "1.0"),
))
// Register A2A task-specific histogram
a2aTaskHistogram, err := meter.Float64Histogram(
    "a2a_custom_task_processing_seconds",
    metric.WithDescription("Custom A2A task processing time"),
    metric.WithUnit("s"),
)
// Record A2A task timing
start := time.Now()
// ... process A2A task ...
duration := time.Since(start).Seconds()
a2aTaskHistogram.Record(ctx, duration, metric.WithAttributes(
    attribute.String("task_type", taskType),
    attribute.String("task_state", "TASK_STATE_COMPLETED"),
))
Troubleshooting Metrics
Missing Metrics Checklist
- ✅ Service built with -tags observability
- ✅ Prometheus can reach metrics endpoint
- ✅ Correct port in Prometheus config
- ✅ Service is actually processing events
- ✅ OpenTelemetry exporter configured correctly
High Cardinality Warning
Avoid metrics with unbounded label values:
- ❌ User IDs as labels (millions of values)
- ❌ Timestamps as labels
- ❌ Request IDs as labels
- ✅ Event types (limited set)
- ✅ Service names (limited set)
- ✅ Status codes (limited set)
🎯 Next Steps:
Implementation: Add Observability to Your Agent
Monitoring: Use Grafana Dashboards
Understanding: Distributed Tracing Explained
4 - Tasks
Task message specifications and reference
Tasks Reference
This section provides detailed reference documentation for task messages, specifications, and data structures used throughout AgentHub.
Available Documentation
- Task Reference - Detailed task message specifications and data structures
4.1 - A2A Task Reference
Comprehensive reference for all task-related message types and operations in the Agent2Agent protocol implementation.
A2A Task Reference
This document provides a comprehensive reference for all task-related message types and operations in the Agent2Agent (A2A) protocol implementation within AgentHub’s hybrid Event-Driven Architecture.
Core A2A Task Types
A2A Task
The primary message type for managing work requests between agents in the Agent2Agent protocol.
message Task {
  string id = 1;                    // Required: Task identifier
  string context_id = 2;            // Optional: Conversation context
  TaskStatus status = 3;            // Required: Current task status
  repeated Message history = 4;     // Message history for this task
  repeated Artifact artifacts = 5;  // Task output artifacts
  google.protobuf.Struct metadata = 6; // Task metadata
}
Field Reference
| Field | Type | Required | Description | 
|---|---|---|---|
| id | string | Yes | Globally unique identifier for the task | 
| context_id | string | No | Groups related tasks in a workflow or conversation | 
| status | TaskStatus | Yes | Current execution state and last update | 
| history | Message[] | No | Complete message history for this task | 
| artifacts | Artifact[] | No | Output artifacts produced by the task | 
| metadata | Struct | No | Additional context information | 
Task IDs should be globally unique and meaningful for debugging:
// Recommended formats:
taskID := fmt.Sprintf("task_%s_%d", taskType, time.Now().Unix())
taskID := fmt.Sprintf("task_%s_%s", taskType, uuid.New().String())
taskID := fmt.Sprintf("%s_%s_%d", requesterID, taskType, sequence)
A2A TaskStatus
Represents the current state and latest update for a task.
message TaskStatus {
  TaskState state = 1;              // Current task state
  Message update = 2;               // Status update message
  google.protobuf.Timestamp timestamp = 3; // Status timestamp
}
Field Reference
| Field | Type | Required | Description | 
|---|---|---|---|
| state | TaskState | Yes | Current execution state | 
| update | Message | No | Latest status message from the executing agent | 
| timestamp | Timestamp | Yes | When this status was last updated | 
A2A Message
Agent-to-agent communication within task context.
message Message {
  string message_id = 1;       // Required: Unique message identifier
  string context_id = 2;       // Optional: Conversation context
  string task_id = 3;          // Optional: Associated task
  Role role = 4;               // Required: USER or AGENT
  repeated Part content = 5;   // Required: Message content parts
  google.protobuf.Struct metadata = 6; // Optional: Additional metadata
  repeated string extensions = 7;       // Optional: Protocol extensions
}
Message Content Parts
Messages contain structured content using A2A Part definitions:
message Part {
  oneof part {
    string text = 1;           // Text content
    DataPart data = 2;         // Structured data
    FilePart file = 3;         // File reference
  }
}
message DataPart {
  google.protobuf.Struct data = 1;    // Structured data content
  string description = 2;             // Optional data description
}
message FilePart {
  string file_id = 1;                 // File identifier or URI
  string filename = 2;                // Original filename
  string mime_type = 3;               // MIME type
  int64 size_bytes = 4;               // File size in bytes
  google.protobuf.Struct metadata = 5; // Additional file metadata
}
A2A Artifact
Structured output produced by completed tasks.
message Artifact {
  string artifact_id = 1;           // Required: Artifact identifier
  string name = 2;                  // Human-readable name
  string description = 3;           // Artifact description
  repeated Part parts = 4;          // Artifact content parts
  google.protobuf.Struct metadata = 5; // Artifact metadata
}
Field Reference
| Field | Type | Required | Description | 
|---|---|---|---|
| artifact_id | string | Yes | Unique identifier for this artifact | 
| name | string | No | Human-readable artifact name | 
| description | string | No | Description of the artifact contents | 
| parts | Part[] | Yes | Structured content using A2A Part format | 
| metadata | Struct | No | Additional artifact information | 
Enumerations
TaskState
Current state of A2A task execution.
enum TaskState {
  TASK_STATE_SUBMITTED = 0;    // Task created and submitted
  TASK_STATE_WORKING = 1;      // Task in progress
  TASK_STATE_COMPLETED = 2;    // Task completed successfully
  TASK_STATE_FAILED = 3;       // Task failed with error
  TASK_STATE_CANCELLED = 4;    // Task cancelled
}
State Transition Rules
Valid state transitions:
TASK_STATE_SUBMITTED → TASK_STATE_WORKING → TASK_STATE_COMPLETED
TASK_STATE_SUBMITTED → TASK_STATE_WORKING → TASK_STATE_FAILED
TASK_STATE_SUBMITTED → TASK_STATE_WORKING → TASK_STATE_CANCELLED
TASK_STATE_SUBMITTED → TASK_STATE_CANCELLED (before execution starts)
Invalid transitions:
- Any state → TASK_STATE_SUBMITTED
- TASK_STATE_COMPLETED → any other state
- TASK_STATE_FAILED → any other state (except for retry scenarios)
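These rules can be enforced with a small guard before publishing a status update (an illustrative sketch, not part of the AgentHub API):
// validTransitions encodes the allowed A2A task state machine.
var validTransitions = map[a2a.TaskState][]a2a.TaskState{
    a2a.TaskState_TASK_STATE_SUBMITTED: {
        a2a.TaskState_TASK_STATE_WORKING,
        a2a.TaskState_TASK_STATE_CANCELLED, // cancel before execution starts
    },
    a2a.TaskState_TASK_STATE_WORKING: {
        a2a.TaskState_TASK_STATE_COMPLETED,
        a2a.TaskState_TASK_STATE_FAILED,
        a2a.TaskState_TASK_STATE_CANCELLED,
    },
    // COMPLETED, FAILED, and CANCELLED are terminal states;
    // retry flows should create a new task rather than reuse the ID.
}

func isValidTransition(from, to a2a.TaskState) bool {
    for _, next := range validTransitions[from] {
        if next == to {
            return true
        }
    }
    return false
}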
Role
Identifies the role of the message sender in A2A communication.
enum Role {
  USER = 0;    // Message from requesting agent
  AGENT = 1;   // Message from responding agent
}
Priority
Task priority levels for scheduling and resource allocation.
enum Priority {
  PRIORITY_UNSPECIFIED = 0;  // Default value, treated as MEDIUM
  PRIORITY_LOW = 1;          // Low priority, can be delayed
  PRIORITY_MEDIUM = 2;       // Normal priority
  PRIORITY_HIGH = 3;         // High priority, expedited processing
  PRIORITY_CRITICAL = 4;     // Critical priority, immediate processing
}
Priority Usage Guidelines
| Priority | Use Cases | SLA Expectations | 
|---|---|---|
| LOW | Background jobs, cleanup tasks, analytics | Hours to days | 
| MEDIUM | Standard user requests, routine processing | Minutes to hours | 
| HIGH | User-visible operations, time-sensitive tasks | Seconds to minutes | 
| CRITICAL | Emergency operations, system health tasks | Immediate | 
AgentHub EDA Request/Response Messages
Task Publishing
PublishTaskUpdateRequest
Request to publish a task status update through the EDA broker.
message PublishTaskUpdateRequest {
  a2a.Task task = 1;                      // Updated A2A task
  AgentEventMetadata routing = 2;         // EDA routing metadata
}
PublishTaskArtifactRequest
Request to publish a task artifact through the EDA broker.
message PublishTaskArtifactRequest {
  string task_id = 1;                     // Associated task ID
  a2a.Artifact artifact = 2;              // A2A artifact
  AgentEventMetadata routing = 3;         // EDA routing metadata
}
Task Subscription
SubscribeToTasksRequest
Request to subscribe to A2A task events through the EDA broker.
message SubscribeToTasksRequest {
  string agent_id = 1;                    // Agent ID for subscription
  repeated string task_types = 2;         // Optional task type filter
  repeated a2a.TaskState states = 3;      // Optional state filter
}
Usage Examples
// Subscribe to all tasks for this agent
req := &pb.SubscribeToTasksRequest{
    AgentId: "data_processor_01",
}
// Subscribe only to working and completed tasks
req := &pb.SubscribeToTasksRequest{
    AgentId: "workflow_orchestrator",
    States: []a2a.TaskState{
        a2a.TaskState_TASK_STATE_WORKING,
        a2a.TaskState_TASK_STATE_COMPLETED,
    },
}
Task Management
GetTaskRequest
Request to retrieve the current state of an A2A task.
message GetTaskRequest {
  string task_id = 1;                     // Task identifier
  int32 history_length = 2;               // History limit (optional)
}
CancelTaskRequest
Request to cancel an active A2A task.
message CancelTaskRequest {
  string task_id = 1;                     // Task to cancel
  string reason = 2;                      // Cancellation reason
}
ListTasksRequest
Request to list A2A tasks matching criteria.
message ListTasksRequest {
  string agent_id = 1;                    // Filter by agent
  repeated a2a.TaskState states = 2;      // Filter by states
  google.protobuf.Timestamp since = 3;    // Filter by timestamp
  int32 limit = 4;                        // Results limit
}
gRPC Service Methods
Task Publishing Methods
PublishTaskUpdate
Publishes a task status update to the EDA broker.
rpc PublishTaskUpdate (PublishTaskUpdateRequest) returns (PublishResponse);
Example:
// Create updated task status
status := &a2a.TaskStatus{
    State: a2a.TaskState_TASK_STATE_WORKING,
    Update: &a2a.Message{
        MessageId: "msg_" + uuid.New().String(),
        TaskId:    taskID,
        Role:      a2a.Role_AGENT,
        Content: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: "Processing data analysis...",
                },
            },
        },
    },
    Timestamp: timestamppb.Now(),
}
task := &a2a.Task{
    Id:     taskID,
    Status: status,
}
req := &pb.PublishTaskUpdateRequest{
    Task: task,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "processor_01",
        EventType:   "task.status_update",
    },
}
res, err := client.PublishTaskUpdate(ctx, req)
PublishTaskArtifact
Publishes a task artifact to the EDA broker.
rpc PublishTaskArtifact (PublishTaskArtifactRequest) returns (PublishResponse);
Example:
// Create artifact with results
artifact := &a2a.Artifact{
    ArtifactId:  "artifact_" + uuid.New().String(),
    Name:        "Analysis Results",
    Description: "Statistical analysis of sales data",
    Parts: []*a2a.Part{
        {
            Part: &a2a.Part_Data{
                Data: &a2a.DataPart{
                    Data: structData, // Contains analysis results
                    Description: "Sales analysis summary statistics",
                },
            },
        },
        {
            Part: &a2a.Part_File{
                File: &a2a.FilePart{
                    FileId:   "file_123",
                    Filename: "analysis_report.pdf",
                    MimeType: "application/pdf",
                    SizeBytes: 1024576,
                },
            },
        },
    },
}
req := &pb.PublishTaskArtifactRequest{
    TaskId:   taskID,
    Artifact: artifact,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "processor_01",
        EventType:   "task.artifact",
    },
}
res, err := client.PublishTaskArtifact(ctx, req)
Task Subscription Methods
SubscribeToTasks
Subscribes to receive A2A task events through the EDA broker.
rpc SubscribeToTasks (SubscribeToTasksRequest) returns (stream AgentEvent);
Returns: Stream of AgentEvent objects containing A2A task updates
Example:
req := &pb.SubscribeToTasksRequest{
    AgentId: "processor_01",
    States: []a2a.TaskState{a2a.TaskState_TASK_STATE_SUBMITTED},
}
stream, err := client.SubscribeToTasks(ctx, req)
for {
    event, err := stream.Recv()
    if err != nil {
        break
    }
    // Extract A2A task from event
    if task := event.GetTask(); task != nil {
        go processA2ATask(task)
    }
}
Task Management Methods
GetTask
Retrieves the current state of an A2A task by ID.
rpc GetTask (GetTaskRequest) returns (a2a.Task);
CancelTask
Cancels an active A2A task and notifies subscribers.
rpc CancelTask (CancelTaskRequest) returns (a2a.Task);
ListTasks
Returns A2A tasks matching the specified criteria.
rpc ListTasks (ListTasksRequest) returns (ListTasksResponse);
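Client-side usage of the management methods might look like this (a sketch; field names follow the request messages above):
// Fetch the current task state, limiting history to the last 10 messages.
task, err := client.GetTask(ctx, &pb.GetTaskRequest{
    TaskId:        "task_analysis_123",
    HistoryLength: 10,
})
if err != nil {
    log.Fatal(err)
}
// Cancel the task if it is still running.
if task.GetStatus().GetState() == a2a.TaskState_TASK_STATE_WORKING {
    task, err = client.CancelTask(ctx, &pb.CancelTaskRequest{
        TaskId: "task_analysis_123",
        Reason: "superseded by a newer request",
    })
}
// List recently failed tasks for this agent.
resp, err := client.ListTasks(ctx, &pb.ListTasksRequest{
    AgentId: "processor_01",
    States:  []a2a.TaskState{a2a.TaskState_TASK_STATE_FAILED},
    Limit:   20,
})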
A2A Task Workflow Patterns
Simple Request-Response
// 1. Agent A creates and publishes task request
task := &a2a.Task{
    Id:        "task_analysis_123",
    ContextId: "workflow_456",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            MessageId: "msg_" + uuid.New().String(),
            TaskId:    "task_analysis_123",
            Role:      a2a.Role_USER,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Please analyze the Q4 sales data",
                    },
                },
                {
                    Part: &a2a.Part_Data{
                        Data: &a2a.DataPart{
                            Data: dataStruct, // Contains parameters
                        },
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}
// 2. Agent B receives task and updates status to WORKING
// 3. Agent B publishes progress updates during execution
// 4. Agent B publishes final artifacts and COMPLETED status
Multi-Step Workflow
// 1. Orchestrator creates main task
mainTask := &a2a.Task{
    Id:        "workflow_main_789",
    ContextId: "workflow_context_789",
    // ... initial message
}
// 2. Create subtasks with same context_id
subtask1 := &a2a.Task{
    Id:        "subtask_data_prep_790",
    ContextId: "workflow_context_789", // Same context
    // ... data preparation request
}
subtask2 := &a2a.Task{
    Id:        "subtask_analysis_791",
    ContextId: "workflow_context_789", // Same context
    // ... analysis request (depends on subtask1)
}
// 3. Tasks linked by context_id for workflow tracking
Error Handling Reference
A2A Task Error Patterns
Parameter Validation Errors
// Task fails with validation error
failedTask := &a2a.Task{
    Id: taskID,
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_FAILED,
        Update: &a2a.Message{
            Role: a2a.Role_AGENT,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Task failed: Required parameter 'dataset_path' is missing",
                    },
                },
                {
                    Part: &a2a.Part_Data{
                        Data: &a2a.DataPart{
                            Data: errorDetails, // Structured error info
                            Description: "Validation error details",
                        },
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}
Resource Errors
// Task fails due to resource unavailability
failedTask := &a2a.Task{
    Id: taskID,
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_FAILED,
        Update: &a2a.Message{
            Role: a2a.Role_AGENT,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Cannot access dataset file: /data/sales_2024.csv",
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}
Error Handling Best Practices
- Use structured error messages in A2A format for programmatic handling
- Include actionable error descriptions in text parts for human operators
- Add detailed error data in data parts for debugging and retry logic
- Maintain task history to preserve error context
- Consider partial results using artifacts for partially successful operations
Migration from Legacy EventBus
Message Type Mappings
| Legacy EventBus | A2A Equivalent | Notes | 
|---|---|---|
| TaskMessage | a2a.Task with initial Message | Task creation with request message |
| TaskResult | a2a.Task with final Artifact | Task completion with result artifacts |
| TaskProgress | a2a.Task with status Message | Progress updates via status messages |
| TaskStatus enum | a2a.TaskState enum | State names updated (e.g., IN_PROGRESS → TASK_STATE_WORKING) |
API Method Mappings
| Legacy EventBus | A2A Equivalent | Notes | 
|---|---|---|
| PublishTask | PublishTaskUpdate | Now publishes A2A task objects | 
| PublishTaskResult | PublishTaskArtifact | Results published as artifacts | 
| PublishTaskProgress | PublishTaskUpdate | Progress via task status updates | 
| SubscribeToTasks | SubscribeToTasks | Now returns A2A task events | 
| SubscribeToTaskResults | SubscribeToTasks (filtered) | Filter by COMPLETED state |
This reference provides the complete specification for A2A task-related messages and operations in the AgentHub Event-Driven Architecture, enabling robust distributed task coordination with full Agent2Agent protocol compliance.