Reference

Comprehensive technical documentation and API specifications

Reference Documentation

This section contains comprehensive technical documentation for all AgentHub components, APIs, and configuration options. Use this as your authoritative source for implementation details.

📚 Documentation Sections

  • Configuration - Complete configuration options and settings
  • API Reference - gRPC APIs, unified abstractions, and tracing interfaces
  • Observability - Metrics, health endpoints, and monitoring
  • Tasks - Task message specifications and data structures

🎯 How to Use This Reference

  • Accuracy: All information is kept up-to-date with the latest version
  • Completeness: Every public API and configuration option is documented
  • Examples: Code examples illustrate usage where helpful
  • Structure: Information is organized by component and function


1 - Configuration

Configuration reference and settings documentation

Configuration Reference

This section provides comprehensive documentation for all AgentHub configuration options, environment variables, and settings.

Available Documentation

1.1 - Environment Variables Reference

Complete reference for all environment variables used by AgentHub’s unified abstractions for configuration and observability.

Environment Variables Reference

This reference documents all environment variables used by AgentHub’s unified abstraction system. All components automatically load these variables for configuration.

Core Configuration

Broker Connection

| Variable | Default | Description | Used By |
|----------|---------|-------------|---------|
| AGENTHUB_BROKER_ADDR | localhost | Broker server hostname or IP address | Agents |
| AGENTHUB_BROKER_PORT | 50051 | Broker gRPC port number | Agents |
| AGENTHUB_GRPC_PORT | :50051 | Server listen address (for broker) | Broker |

Example:

export AGENTHUB_BROKER_ADDR="production-broker.example.com"
export AGENTHUB_BROKER_PORT="50051"
export AGENTHUB_GRPC_PORT=":50051"

Health Monitoring

| Variable | Default | Description | Used By |
|----------|---------|-------------|---------|
| BROKER_HEALTH_PORT | 8080 | Broker health check endpoint port | Broker |
| PUBLISHER_HEALTH_PORT | 8081 | Publisher health check endpoint port | Publishers |
| SUBSCRIBER_HEALTH_PORT | 8082 | Subscriber health check endpoint port | Subscribers |

Health Endpoints Available:

  • http://localhost:8080/health - Health check
  • http://localhost:8080/metrics - Prometheus metrics
  • http://localhost:8080/ready - Readiness check

Example:

export BROKER_HEALTH_PORT="8080"
export PUBLISHER_HEALTH_PORT="8081"
export SUBSCRIBER_HEALTH_PORT="8082"

Observability Configuration

Distributed Tracing

| Variable | Default | Description | Used By |
|----------|---------|-------------|---------|
| JAEGER_ENDPOINT | 127.0.0.1:4317 | Jaeger OTLP endpoint for traces | All components |
| SERVICE_NAME | agenthub-service | Service name for tracing | All components |
| SERVICE_VERSION | 1.0.0 | Service version for telemetry | All components |

Example:

export JAEGER_ENDPOINT="http://jaeger.example.com:14268/api/traces"
export SERVICE_NAME="my-agenthub-app"
export SERVICE_VERSION="2.1.0"

Jaeger Integration:

  • When JAEGER_ENDPOINT is set: Automatic tracing enabled (see the sketch below)
  • When empty or unset: Tracing disabled (minimal overhead)
  • Supports both gRPC (4317) and HTTP (14268) endpoints
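A minimal sketch of this endpoint-gated setup, assuming the standard OpenTelemetry Go SDK with an OTLP gRPC exporter (the unified abstraction’s actual wiring may differ):

package main

import (
    "context"
    "os"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// initTracing enables tracing only when JAEGER_ENDPOINT is set.
func initTracing(ctx context.Context) (*sdktrace.TracerProvider, error) {
    endpoint := os.Getenv("JAEGER_ENDPOINT")
    if endpoint == "" {
        return nil, nil // unset: tracing stays disabled, minimal overhead
    }
    exporter, err := otlptracegrpc.New(ctx,
        otlptracegrpc.WithEndpoint(endpoint), // host:port form, e.g. 127.0.0.1:4317
        otlptracegrpc.WithInsecure(),
    )
    if err != nil {
        return nil, err
    }
    tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
    otel.SetTracerProvider(tp)
    return tp, nil
}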

Metrics Collection

| Variable | Default | Description | Used By |
|----------|---------|-------------|---------|
| PROMETHEUS_PORT | 9090 | Prometheus server port | Observability stack |
| GRAFANA_PORT | 3333 | Grafana dashboard port | Observability stack |
| ALERTMANAGER_PORT | 9093 | AlertManager port | Observability stack |

Example:

export PROMETHEUS_PORT="9090"
export GRAFANA_PORT="3333"
export ALERTMANAGER_PORT="9093"

OpenTelemetry Collector

| Variable | Default | Description | Used By |
|----------|---------|-------------|---------|
| OTLP_GRPC_PORT | 4320 | OTLP Collector gRPC port | Observability stack |
| OTLP_HTTP_PORT | 4321 | OTLP Collector HTTP port | Observability stack |

Example:

export OTLP_GRPC_PORT="4320"
export OTLP_HTTP_PORT="4321"

Service Configuration

General Settings

| Variable | Default | Description | Used By |
|----------|---------|-------------|---------|
| ENVIRONMENT | development | Deployment environment | All components |
| LOG_LEVEL | INFO | Logging level (DEBUG, INFO, WARN, ERROR) | All components |

Example:

export ENVIRONMENT="production"
export LOG_LEVEL="WARN"

Environment-Specific Configurations

Development Environment

# .envrc for development
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"
export AGENTHUB_GRPC_PORT=":50051"

# Health ports
export BROKER_HEALTH_PORT="8080"
export PUBLISHER_HEALTH_PORT="8081"
export SUBSCRIBER_HEALTH_PORT="8082"

# Observability (local stack)
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export PROMETHEUS_PORT="9090"
export GRAFANA_PORT="3333"

# Service metadata
export SERVICE_NAME="agenthub-dev"
export SERVICE_VERSION="dev"
export ENVIRONMENT="development"
export LOG_LEVEL="DEBUG"

Staging Environment

# .envrc for staging
export AGENTHUB_BROKER_ADDR="staging-broker.example.com"
export AGENTHUB_BROKER_PORT="50051"

# Health ports (non-conflicting)
export BROKER_HEALTH_PORT="8080"
export PUBLISHER_HEALTH_PORT="8081"
export SUBSCRIBER_HEALTH_PORT="8082"

# Observability (staging stack)
export JAEGER_ENDPOINT="http://staging-jaeger.example.com:14268/api/traces"
export PROMETHEUS_PORT="9090"
export GRAFANA_PORT="3333"

# Service metadata
export SERVICE_NAME="agenthub-staging"
export SERVICE_VERSION="1.2.0-rc1"
export ENVIRONMENT="staging"
export LOG_LEVEL="INFO"

Production Environment

# .envrc for production
export AGENTHUB_BROKER_ADDR="prod-broker.example.com"
export AGENTHUB_BROKER_PORT="50051"

# Health ports
export BROKER_HEALTH_PORT="8080"
export PUBLISHER_HEALTH_PORT="8081"
export SUBSCRIBER_HEALTH_PORT="8082"

# Observability (production stack)
export JAEGER_ENDPOINT="http://jaeger.prod.example.com:14268/api/traces"
export PROMETHEUS_PORT="9090"
export GRAFANA_PORT="3333"
export ALERTMANAGER_PORT="9093"

# Service metadata
export SERVICE_NAME="agenthub-prod"
export SERVICE_VERSION="1.2.0"
export ENVIRONMENT="production"
export LOG_LEVEL="WARN"

Configuration Loading

Automatic Loading by Unified Abstractions

The unified abstractions automatically load environment variables:

// Automatic configuration loading
config := agenthub.NewGRPCConfig("my-component")

// Results in:
// config.BrokerAddr = "localhost:50051" (AGENTHUB_BROKER_ADDR + AGENTHUB_BROKER_PORT)
// config.ServerAddr = ":50051" (AGENTHUB_GRPC_PORT)
// config.HealthPort = "8080" (BROKER_HEALTH_PORT)
// config.ComponentName = "my-component" (from parameter)

Loading with direnv (Recommended)

  1. Install direnv: https://direnv.net/docs/installation.html

  2. Create .envrc file:

    # Create .envrc in project root
    cat > .envrc << 'EOF'
    export AGENTHUB_BROKER_ADDR="localhost"
    export AGENTHUB_BROKER_PORT="50051"
    export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
    export SERVICE_NAME="my-agenthub-app"
    EOF
    
  3. Allow direnv:

    direnv allow
    
  4. Automatic loading: Variables load automatically when entering directory

Manual Loading

# Source variables manually
source .envrc

# Or set individually
export AGENTHUB_BROKER_ADDR="localhost"
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"

Configuration Validation

Required Variables

Minimal configuration (all have defaults):

  • No variables are strictly required
  • Defaults work for local development

Production recommendations:

  • Set JAEGER_ENDPOINT for tracing
  • Set SERVICE_NAME for identification
  • Set ENVIRONMENT to “production”
  • Configure unique health ports if running multiple services

Configuration Verification

Check loaded configuration:

config := agenthub.NewGRPCConfig("test")
fmt.Printf("Broker: %s\n", config.BrokerAddr)
fmt.Printf("Health: %s\n", config.HealthPort)
fmt.Printf("Component: %s\n", config.ComponentName)

Verify health endpoints:

# Check if configuration is working
curl http://localhost:8080/health
curl http://localhost:8081/health  # Publisher
curl http://localhost:8082/health  # Subscriber

Verify tracing:

  • Open Jaeger UI: http://localhost:16686
  • Look for traces from your service name
  • Check spans are being created

Common Patterns

Docker Compose

# docker-compose.yml
version: '3.8'
services:
  broker:
    build: .
    command: go run broker/main.go
    environment:
      - AGENTHUB_GRPC_PORT=:50051
      - BROKER_HEALTH_PORT=8080
      - JAEGER_ENDPOINT=http://jaeger:14268/api/traces
      - SERVICE_NAME=agenthub-broker
    ports:
      - "50051:50051"
      - "8080:8080"

  publisher:
    build: .
    command: go run agents/publisher/main.go
    environment:
      - AGENTHUB_BROKER_ADDR=broker
      - AGENTHUB_BROKER_PORT=50051
      - PUBLISHER_HEALTH_PORT=8081
      - JAEGER_ENDPOINT=http://jaeger:14268/api/traces
      - SERVICE_NAME=agenthub-publisher
    ports:
      - "8081:8081"

Kubernetes ConfigMap

# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: agenthub-config
data:
  AGENTHUB_BROKER_ADDR: "agenthub-broker.default.svc.cluster.local"
  AGENTHUB_BROKER_PORT: "50051"
  JAEGER_ENDPOINT: "http://jaeger.observability.svc.cluster.local:14268/api/traces"
  SERVICE_NAME: "agenthub-k8s"
  SERVICE_VERSION: "1.0.0"
  ENVIRONMENT: "production"
  LOG_LEVEL: "INFO"

---
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agenthub-publisher
spec:
  template:
    spec:
      containers:
      - name: publisher
        image: agenthub:latest
        envFrom:
        - configMapRef:
            name: agenthub-config
        env:
        - name: PUBLISHER_HEALTH_PORT
          value: "8080"

Troubleshooting

Common Issues

| Problem | Solution |
|---------|----------|
| Agent can’t connect to broker | Check AGENTHUB_BROKER_ADDR and AGENTHUB_BROKER_PORT |
| Health endpoint not accessible | Verify *_HEALTH_PORT variables and port availability |
| No traces in Jaeger | Set JAEGER_ENDPOINT and ensure Jaeger is running |
| Port conflicts | Use different ports for each component’s health endpoints |
| Configuration not loading | Ensure variables are exported, check with printenv |

Debug Configuration

Check environment variables:

# List all AgentHub variables
printenv | grep AGENTHUB

# List all observability variables
printenv | grep -E "(JAEGER|SERVICE|PROMETHEUS|GRAFANA)"

# Check specific variable
echo $AGENTHUB_BROKER_ADDR

Test configuration:

# Quick test with temporary override
AGENTHUB_BROKER_ADDR=test-broker go run agents/publisher/main.go

# Verify health endpoint responds
curl -f http://localhost:8080/health || echo "Health check failed"

Configuration Precedence

  1. Environment variables (highest priority)
  2. Default values (from code)

Example: If AGENTHUB_BROKER_ADDR is set, it overrides the default “localhost”
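In code, this precedence is just an environment lookup with a fallback; an illustrative fragment:

// Environment variable wins when set; otherwise the compiled-in default applies
addr := os.Getenv("AGENTHUB_BROKER_ADDR")
if addr == "" {
    addr = "localhost" // default applies only when the variable is unset
}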


This environment variable reference provides comprehensive documentation for configuring AgentHub using the unified abstraction system. For practical usage examples, see the Installation and Setup Tutorial and Configuration Reference.

1.2 - Configuration Reference

Comprehensive reference for configuring AgentHub components using the unified abstraction library with environment-based configuration.

Configuration Reference

This document provides comprehensive reference for configuring AgentHub components using the unified abstraction library with environment-based configuration.

Unified Abstraction Configuration

AgentHub uses environment variables for all configuration with the unified abstraction library providing automatic configuration setup.

Core Environment Variables

gRPC Connection Configuration

| Variable | Default | Description |
|----------|---------|-------------|
| AGENTHUB_BROKER_ADDR | localhost | Broker server hostname or IP address |
| AGENTHUB_BROKER_PORT | 50051 | Broker gRPC port number |
| AGENTHUB_GRPC_PORT | :50051 | Server listen address (for broker) |

Note: The unified abstraction automatically combines AGENTHUB_BROKER_ADDR and AGENTHUB_BROKER_PORT into a complete broker address (e.g., localhost:50051).
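As a rough sketch, the combination behaves like the following (assumed behavior; getEnvOrDefault is a hypothetical helper, not the library’s API):

host := getEnvOrDefault("AGENTHUB_BROKER_ADDR", "localhost")
port := getEnvOrDefault("AGENTHUB_BROKER_PORT", "50051")
brokerAddr := net.JoinHostPort(host, port) // yields "localhost:50051"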

Health Monitoring Configuration

| Variable | Default | Description |
|----------|---------|-------------|
| BROKER_HEALTH_PORT | 8080 | Broker health check endpoint port |
| PUBLISHER_HEALTH_PORT | 8081 | Publisher health check endpoint port |
| SUBSCRIBER_HEALTH_PORT | 8082 | Subscriber health check endpoint port |

Observability Configuration

| Variable | Default | Description |
|----------|---------|-------------|
| JAEGER_ENDPOINT | 127.0.0.1:4317 | Jaeger OTLP endpoint for distributed tracing |
| PROMETHEUS_PORT | 9090 | Prometheus metrics collection port |
| GRAFANA_PORT | 3333 | Grafana dashboard web interface port |
| ALERTMANAGER_PORT | 9093 | AlertManager web interface port |
| OTLP_GRPC_PORT | 4320 | OpenTelemetry Collector gRPC port |
| OTLP_HTTP_PORT | 4321 | OpenTelemetry Collector HTTP port |

Service Metadata

| Variable | Default | Description |
|----------|---------|-------------|
| SERVICE_VERSION | 1.0.0 | Service version for telemetry and observability |
| ENVIRONMENT | development | Deployment environment (development, staging, production) |

A2A Protocol Configuration

| Variable | Default | Description |
|----------|---------|-------------|
| AGENTHUB_MESSAGE_BUFFER_SIZE | 100 | Buffer size for A2A message processing |
| AGENTHUB_TASK_UPDATE_INTERVAL | 1s | Interval for publishing task status updates |
| AGENTHUB_ARTIFACT_MAX_SIZE | 10MB | Maximum size for task artifacts |
| AGENTHUB_CONTEXT_TIMEOUT | 30s | Timeout for A2A message context |
| AGENTHUB_A2A_PROTOCOL_VERSION | 1.0 | A2A protocol version for compatibility |
| AGENTHUB_MESSAGE_HISTORY_LIMIT | 50 | Maximum message history per task |
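These values arrive as plain strings, so a consumer must parse them into typed values. A hedged sketch using the getInt and getDuration helpers shown later in this document (the variable names come from the table above; the parsing approach is an assumption, not the library’s internals):

bufferSize := getInt("AGENTHUB_MESSAGE_BUFFER_SIZE", 100)        // messages
updateInterval := getDuration("AGENTHUB_TASK_UPDATE_INTERVAL", "1s")
historyLimit := getInt("AGENTHUB_MESSAGE_HISTORY_LIMIT", 50)     // messages per task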

Unified Abstraction Usage

Using Configuration with the Unified Abstraction

The unified abstraction library automatically loads configuration from environment variables:

// Create configuration from environment variables
config := agenthub.NewGRPCConfig("my-component")

// Configuration is automatically populated:
// - config.BrokerAddr: "localhost:50051" (combined from AGENTHUB_BROKER_ADDR + AGENTHUB_BROKER_PORT)
// - config.ServerAddr: ":50051" (from AGENTHUB_GRPC_PORT)
// - config.HealthPort: "8080" (from BROKER_HEALTH_PORT)
// - config.ComponentName: "my-component" (from parameter)

Environment Variable Loading

The recommended way to load environment variables:

Option 1: Using direnv (recommended)

# Place variables in .envrc file
direnv allow

Option 2: Source manually

source .envrc

Option 3: Set individual variables

export AGENTHUB_BROKER_ADDR=localhost
export AGENTHUB_BROKER_PORT=50051
export JAEGER_ENDPOINT=127.0.0.1:4317

Configuration Override Examples

You can override defaults by setting environment variables before running:

# Use different broker address
export AGENTHUB_BROKER_ADDR=remote-broker.example.com
export AGENTHUB_BROKER_PORT=9090
go run broker/main.go

# Use different health ports to avoid conflicts
export BROKER_HEALTH_PORT=8083
export PUBLISHER_HEALTH_PORT=8084
export SUBSCRIBER_HEALTH_PORT=8085
go run agents/publisher/main.go

# Use custom observability endpoints
export JAEGER_ENDPOINT=jaeger.example.com:4317
export PROMETHEUS_PORT=9091
go run broker/main.go

Configuration Best Practices

  1. Use .envrc for Development: Keep all environment variables in .envrc for consistent development experience
  2. Override Selectively: Only override specific variables when needed, use defaults otherwise
  3. Environment-Specific Configs: Use different variable values for development, staging, and production
  4. Health Port Management: Use different health ports for each component to avoid conflicts
  5. Observability Integration: Always configure observability endpoints for production deployments

Legacy Configuration Migration

If migrating from previous versions of AgentHub:

Old Configuration Pattern:

// Manual server setup (deprecated)
lis, err := net.Listen("tcp", ":50051")
server := grpc.NewServer()
// ... extensive setup code

New Unified Abstraction Pattern:

// Automatic configuration from environment
config := agenthub.NewGRPCConfig("broker")
server, err := agenthub.NewAgentHubServer(config)
service := agenthub.NewAgentHubService(server)
pb.RegisterAgentHubServer(server.Server, service)
server.Start(ctx)

Command-Line Usage

Basic Commands

The unified abstraction provides simplified command execution:

agenthub-server [OPTIONS]

Options:
  -port int
        Server port (default 50051)
  -host string
        Server host (default "0.0.0.0")
  -config string
        Configuration file path
  -log-level string
        Log level: debug, info, warn, error (default "info")
  -log-file string
        Log file path (default: stdout)
  -max-connections int
        Maximum concurrent connections (default 1000)
  -channel-buffer-size int
        Channel buffer size (default 10)
  -help
        Show help message
  -version
        Show version information

Configuration File

The broker can also be configured using a YAML configuration file:

# agenthub.yaml
server:
  host: "0.0.0.0"
  port: 50051
  max_connections: 1000
  timeout: "30s"

logging:
  level: "info"
  format: "json"
  file: "/var/log/agenthub/broker.log"

performance:
  channel_buffer_size: 10
  max_message_size: "4MB"
  keepalive_time: "30s"
  keepalive_timeout: "5s"

limits:
  max_agents: 10000
  max_tasks_per_agent: 100
  memory_limit: "1GB"

security:
  tls_enabled: false
  cert_file: ""
  key_file: ""
  ca_file: ""

Loading Configuration:

agenthub-server -config /path/to/agenthub.yaml

Agent Configuration

Environment Variables

Agents can be configured using environment variables:

Connection Configuration

| Variable | Default | Description |
|----------|---------|-------------|
| AGENTHUB_BROKER_ADDRESS | localhost:50051 | Broker server address |
| AGENTHUB_AGENT_ID | Generated | Unique agent identifier |
| AGENTHUB_CONNECTION_TIMEOUT | 10s | Connection timeout |
| AGENTHUB_RETRY_ATTEMPTS | 3 | Connection retry attempts |
| AGENTHUB_RETRY_DELAY | 1s | Delay between retries |

Task Processing Configuration

| Variable | Default | Description |
|----------|---------|-------------|
| AGENTHUB_MAX_CONCURRENT_TASKS | 5 | Maximum concurrent task processing |
| AGENTHUB_TASK_TIMEOUT | 300s | Default task timeout |
| AGENTHUB_PROGRESS_INTERVAL | 5s | Progress reporting interval |
| AGENTHUB_TASK_TYPES | "" | Comma-separated list of supported task types |

Logging Configuration

| Variable | Default | Description |
|----------|---------|-------------|
| AGENTHUB_AGENT_LOG_LEVEL | info | Agent logging level |
| AGENTHUB_AGENT_LOG_FORMAT | text | Agent log format |
| AGENTHUB_AGENT_LOG_FILE | "" | Agent log file path |

Agent Configuration Examples

Publisher Configuration

package main

import (
    "fmt"
    "os"
    "strconv"
    "time"
)

type PublisherConfig struct {
    BrokerAddress    string
    AgentID          string
    ConnectionTimeout time.Duration
    RetryAttempts    int
    RetryDelay       time.Duration
    LogLevel         string
}

func LoadPublisherConfig() *PublisherConfig {
    config := &PublisherConfig{
        BrokerAddress:    getEnv("AGENTHUB_BROKER_ADDRESS", "localhost:50051"),
        AgentID:          getEnv("AGENTHUB_AGENT_ID", generateAgentID()),
        ConnectionTimeout: getDuration("AGENTHUB_CONNECTION_TIMEOUT", "10s"),
        RetryAttempts:    getInt("AGENTHUB_RETRY_ATTEMPTS", 3),
        RetryDelay:       getDuration("AGENTHUB_RETRY_DELAY", "1s"),
        LogLevel:         getEnv("AGENTHUB_AGENT_LOG_LEVEL", "info"),
    }

    return config
}

func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}

func getInt(key string, defaultValue int) int {
    if value := os.Getenv(key); value != "" {
        if i, err := strconv.Atoi(value); err == nil {
            return i
        }
    }
    return defaultValue
}

func getDuration(key string, defaultValue string) time.Duration {
    if value := os.Getenv(key); value != "" {
        if d, err := time.ParseDuration(value); err == nil {
            return d
        }
    }
    d, _ := time.ParseDuration(defaultValue)
    return d
}

// generateAgentID provides a fallback ID when AGENTHUB_AGENT_ID is unset.
// (Illustrative implementation; the real ID scheme may differ.)
func generateAgentID() string {
    return fmt.Sprintf("agent_%d", time.Now().UnixNano())
}

Subscriber Configuration

type SubscriberConfig struct {
    BrokerAddress      string
    AgentID            string
    MaxConcurrentTasks int
    TaskTimeout        time.Duration
    ProgressInterval   time.Duration
    SupportedTaskTypes []string
    LogLevel           string
}

func LoadSubscriberConfig() *SubscriberConfig {
    taskTypesStr := getEnv("AGENTHUB_TASK_TYPES", "")
    var taskTypes []string
    if taskTypesStr != "" {
        taskTypes = strings.Split(taskTypesStr, ",")
        for i, taskType := range taskTypes {
            taskTypes[i] = strings.TrimSpace(taskType)
        }
    }

    config := &SubscriberConfig{
        BrokerAddress:      getEnv("AGENTHUB_BROKER_ADDRESS", "localhost:50051"),
        AgentID:            getEnv("AGENTHUB_AGENT_ID", generateAgentID()),
        MaxConcurrentTasks: getInt("AGENTHUB_MAX_CONCURRENT_TASKS", 5),
        TaskTimeout:        getDuration("AGENTHUB_TASK_TIMEOUT", "300s"),
        ProgressInterval:   getDuration("AGENTHUB_PROGRESS_INTERVAL", "5s"),
        SupportedTaskTypes: taskTypes,
        LogLevel:           getEnv("AGENTHUB_AGENT_LOG_LEVEL", "info"),
    }

    return config
}

Agent Configuration File

Agents can also use configuration files:

# agent.yaml
agent:
  id: "data_processor_001"
  broker_address: "broker.example.com:50051"
  connection_timeout: "10s"
  retry_attempts: 3
  retry_delay: "1s"

task_processing:
  max_concurrent_tasks: 5
  task_timeout: "300s"
  progress_interval: "5s"
  supported_task_types:
    - "data_analysis"
    - "data_transformation"
    - "data_validation"

logging:
  level: "info"
  format: "json"
  file: "/var/log/agenthub/agent.log"

health:
  port: 8080
  endpoint: "/health"
  check_interval: "30s"

Security Configuration

TLS Configuration

Broker TLS Setup

# broker configuration
security:
  tls_enabled: true
  cert_file: "/etc/agenthub/certs/server.crt"
  key_file: "/etc/agenthub/certs/server.key"
  ca_file: "/etc/agenthub/certs/ca.crt"
  client_auth: "require_and_verify"

Agent TLS Setup

// Agent TLS connection
func createTLSConnection(address string) (*grpc.ClientConn, error) {
    config := &tls.Config{
        ServerName: "agenthub-broker",
        // Load client certificates if needed
    }

    creds := credentials.NewTLS(config)

    conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
    if err != nil {
        return nil, fmt.Errorf("failed to connect with TLS: %v", err)
    }

    return conn, nil
}

Authentication Configuration

JWT Authentication

# broker configuration
security:
  auth_enabled: true
  auth_method: "jwt"
  jwt_secret: "your-secret-key"
  jwt_issuer: "agenthub-broker"
  jwt_expiry: "24h"

// Agent authentication
type AuthenticatedAgent struct {
    client   pb.AgentHubClient
    token    string
    agentID  string
}

func (a *AuthenticatedAgent) authenticate() error {
    // Add authentication token to context
    ctx := metadata.AppendToOutgoingContext(context.Background(),
        "authorization", "Bearer "+a.token)

    // Use authenticated context for A2A requests
    _, err := a.client.PublishMessage(ctx, request)
    return err
}

Production Configuration Examples

High-Performance Broker Configuration

# production-broker.yaml
server:
  host: "0.0.0.0"
  port: 50051
  max_connections: 5000
  timeout: "60s"

performance:
  channel_buffer_size: 50
  max_message_size: "16MB"
  keepalive_time: "10s"
  keepalive_timeout: "3s"

limits:
  max_agents: 50000
  max_tasks_per_agent: 500
  memory_limit: "8GB"

logging:
  level: "warn"
  format: "json"
  file: "/var/log/agenthub/broker.log"

security:
  tls_enabled: true
  cert_file: "/etc/ssl/certs/agenthub.crt"
  key_file: "/etc/ssl/private/agenthub.key"

Cluster Agent Configuration

# cluster-agent.yaml
agent:
  id: "${HOSTNAME}_${POD_ID}"
  broker_address: "agenthub-broker.agenthub.svc.cluster.local:50051"
  connection_timeout: "15s"
  retry_attempts: 5
  retry_delay: "2s"

task_processing:
  max_concurrent_tasks: 10
  task_timeout: "1800s"  # 30 minutes
  progress_interval: "10s"

logging:
  level: "info"
  format: "json"
  file: "stdout"

health:
  port: 8080
  endpoint: "/health"
  check_interval: "30s"

metrics:
  enabled: true
  port: 9090
  endpoint: "/metrics"

Environment-Specific Configurations

Development Environment

# .env.development
AGENTHUB_PORT=50051
AGENTHUB_LOG_LEVEL=debug
AGENTHUB_LOG_FORMAT=text
AGENTHUB_MAX_CONNECTIONS=100
AGENTHUB_CHANNEL_BUFFER_SIZE=5

# Agent settings
AGENTHUB_BROKER_ADDRESS=localhost:50051
AGENTHUB_MAX_CONCURRENT_TASKS=2
AGENTHUB_TASK_TIMEOUT=60s
AGENTHUB_AGENT_LOG_LEVEL=debug

Staging Environment

# .env.staging
AGENTHUB_PORT=50051
AGENTHUB_LOG_LEVEL=info
AGENTHUB_LOG_FORMAT=json
AGENTHUB_MAX_CONNECTIONS=1000
AGENTHUB_CHANNEL_BUFFER_SIZE=20

# Security
AGENTHUB_TLS_ENABLED=true
AGENTHUB_CERT_FILE=/etc/certs/staging.crt
AGENTHUB_KEY_FILE=/etc/certs/staging.key

# Agent settings
AGENTHUB_BROKER_ADDRESS=staging-broker.example.com:50051
AGENTHUB_MAX_CONCURRENT_TASKS=5
AGENTHUB_TASK_TIMEOUT=300s

Production Environment

# .env.production
AGENTHUB_PORT=50051
AGENTHUB_LOG_LEVEL=warn
AGENTHUB_LOG_FORMAT=json
AGENTHUB_LOG_FILE=/var/log/agenthub/broker.log
AGENTHUB_MAX_CONNECTIONS=5000
AGENTHUB_CHANNEL_BUFFER_SIZE=50

# Security
AGENTHUB_TLS_ENABLED=true
AGENTHUB_CERT_FILE=/etc/ssl/certs/agenthub.crt
AGENTHUB_KEY_FILE=/etc/ssl/private/agenthub.key
AGENTHUB_CA_FILE=/etc/ssl/certs/ca.crt

# Performance
AGENTHUB_MAX_MESSAGE_SIZE=16MB
AGENTHUB_KEEPALIVE_TIME=10s
AGENTHUB_MEMORY_LIMIT=8GB

# Agent settings
AGENTHUB_BROKER_ADDRESS=agenthub-prod.example.com:50051
AGENTHUB_MAX_CONCURRENT_TASKS=10
AGENTHUB_TASK_TIMEOUT=1800s
AGENTHUB_CONNECTION_TIMEOUT=15s
AGENTHUB_RETRY_ATTEMPTS=5

Configuration Validation

Broker Configuration Validation

type BrokerConfig struct {
    Port             int           `yaml:"port" validate:"min=1,max=65535"`
    Host             string        `yaml:"host" validate:"required"`
    MaxConnections   int           `yaml:"max_connections" validate:"min=1"`
    Timeout          time.Duration `yaml:"timeout" validate:"min=1s"`
    ChannelBufferSize int          `yaml:"channel_buffer_size" validate:"min=1"`
}

func (c *BrokerConfig) Validate() error {
    validate := validator.New()
    return validate.Struct(c)
}
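A hedged usage sketch tying the YAML file and validation together, assuming gopkg.in/yaml.v3 and github.com/go-playground/validator/v10 (both package choices are assumptions; only BrokerConfig comes from the code above):

data, err := os.ReadFile("agenthub.yaml")
if err != nil {
    log.Fatal(err)
}

// agenthub.yaml nests broker settings under a "server" key
var file struct {
    Server BrokerConfig `yaml:"server"`
}
if err := yaml.Unmarshal(data, &file); err != nil {
    log.Fatal(err)
}

if err := file.Server.Validate(); err != nil {
    log.Fatalf("invalid broker configuration: %v", err)
}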

Agent Configuration Validation

type AgentConfig struct {
    BrokerAddress      string        `yaml:"broker_address" validate:"required"`
    AgentID            string        `yaml:"agent_id" validate:"required,min=1,max=64"`
    MaxConcurrentTasks int           `yaml:"max_concurrent_tasks" validate:"min=1,max=100"`
    TaskTimeout        time.Duration `yaml:"task_timeout" validate:"min=1s"`
}

func (c *AgentConfig) Validate() error {
    validate := validator.New()
    if err := validate.Struct(c); err != nil {
        return err
    }

    // Custom validation
    if !strings.Contains(c.BrokerAddress, ":") {
        return errors.New("broker_address must include port")
    }

    return nil
}

This comprehensive configuration reference covers all aspects of configuring AgentHub for different environments and use cases.

2 - API Reference

Complete API documentation and specifications

API Reference Documentation

This section contains comprehensive API documentation for all AgentHub interfaces, including gRPC APIs, unified abstractions, and tracing interfaces.

Available Documentation

2.1 - A2A-Compliant AgentHub API Reference

Complete technical reference for the A2A-compliant AgentHub API, including all gRPC services, message types, and operational details.

A2A-Compliant AgentHub API Reference

This document provides complete technical reference for the Agent2Agent (A2A) protocol-compliant AgentHub API, including all gRPC services, message types, and operational details.

gRPC Service Definition

The AgentHub broker implements the AgentHub service as defined in proto/eventbus.proto:

service AgentHub {
  // ===== A2A Message Publishing (EDA style) =====

  // PublishMessage submits an A2A message for delivery through the broker
  rpc PublishMessage(PublishMessageRequest) returns (PublishResponse);

  // PublishTaskUpdate notifies subscribers about A2A task state changes
  rpc PublishTaskUpdate(PublishTaskUpdateRequest) returns (PublishResponse);

  // PublishTaskArtifact delivers A2A task output artifacts to subscribers
  rpc PublishTaskArtifact(PublishTaskArtifactRequest) returns (PublishResponse);

  // ===== A2A Event Subscriptions (EDA style) =====

  // SubscribeToMessages creates a stream of A2A message events for an agent
  rpc SubscribeToMessages(SubscribeToMessagesRequest) returns (stream AgentEvent);

  // SubscribeToTasks creates a stream of A2A task events for an agent
  rpc SubscribeToTasks(SubscribeToTasksRequest) returns (stream AgentEvent);

  // SubscribeToAgentEvents creates a unified stream of all events for an agent
  rpc SubscribeToAgentEvents(SubscribeToAgentEventsRequest) returns (stream AgentEvent);

  // ===== A2A Task Management (compatible with A2A spec) =====

  // GetTask retrieves the current state of an A2A task by ID
  rpc GetTask(GetTaskRequest) returns (a2a.Task);

  // CancelTask cancels an active A2A task and notifies subscribers
  rpc CancelTask(CancelTaskRequest) returns (a2a.Task);

  // ListTasks returns A2A tasks matching the specified criteria
  rpc ListTasks(ListTasksRequest) returns (ListTasksResponse);

  // ===== Agent Discovery (A2A compatible) =====

  // GetAgentCard returns the broker's A2A agent card for discovery
  rpc GetAgentCard(google.protobuf.Empty) returns (a2a.AgentCard);

  // RegisterAgent registers an agent with the broker for event routing
  rpc RegisterAgent(RegisterAgentRequest) returns (RegisterAgentResponse);
}

A2A Message Types

Core A2A Types

A2A Message

Represents an A2A-compliant message for agent communication.

message Message {
  string message_id = 1;       // Required: Unique message identifier
  string context_id = 2;       // Optional: Conversation context
  string task_id = 3;          // Optional: Associated task
  Role role = 4;               // Required: USER or AGENT
  repeated Part content = 5;   // Required: Message content parts
  google.protobuf.Struct metadata = 6; // Optional: Additional metadata
  repeated string extensions = 7;       // Optional: Protocol extensions
}

Field Details:

  • message_id: Must be unique across all messages. Generated automatically if not provided
  • context_id: Groups related messages in a conversation or workflow
  • task_id: Links message to a specific A2A task
  • role: Indicates whether message is from USER (requesting agent) or AGENT (responding agent)
  • content: Array of A2A Part structures containing the actual message content
  • metadata: Additional context for routing, processing, or debugging
  • extensions: Protocol extension identifiers for future compatibility

A2A Part

Represents content within an A2A message.

message Part {
  oneof part {
    string text = 1;           // Text content
    DataPart data = 2;         // Structured data
    FilePart file = 3;         // File reference
  }
}

message DataPart {
  google.protobuf.Struct data = 1;    // Structured data content
  string description = 2;             // Optional data description
}

message FilePart {
  string file_id = 1;                 // File identifier or URI
  string filename = 2;                // Original filename
  string mime_type = 3;               // MIME type
  int64 size_bytes = 4;               // File size in bytes
  google.protobuf.Struct metadata = 5; // Additional file metadata
}
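For illustration, constructing a file-reference Part in Go might look like this (assuming the standard protoc-generated oneof wrappers; field names follow the definitions above):

part := &pb.Part{
    Part: &pb.Part_File{
        File: &pb.FilePart{
            FileId:    "file://datasets/dataset_123.csv", // identifier or URI
            Filename:  "dataset_123.csv",
            MimeType:  "text/csv",
            SizeBytes: 1048576,
        },
    },
}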

A2A Task

Represents an A2A-compliant task with lifecycle management.

message Task {
  string id = 1;                    // Required: Task identifier
  string context_id = 2;            // Optional: Conversation context
  TaskStatus status = 3;            // Required: Current task status
  repeated Message history = 4;     // Message history for this task
  repeated Artifact artifacts = 5;  // Task output artifacts
  google.protobuf.Struct metadata = 6; // Task metadata
}

message TaskStatus {
  TaskState state = 1;              // Current task state
  Message update = 2;               // Status update message
  google.protobuf.Timestamp timestamp = 3; // Status timestamp
}

enum TaskState {
  TASK_STATE_SUBMITTED = 0;    // Task created and submitted
  TASK_STATE_WORKING = 1;      // Task in progress
  TASK_STATE_COMPLETED = 2;    // Task completed successfully
  TASK_STATE_FAILED = 3;       // Task failed with error
  TASK_STATE_CANCELLED = 4;    // Task cancelled
}
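Since SUBMITTED and WORKING are the only non-final states, a small helper can decide when to stop watching a task; a sketch:

// isTerminal reports whether a task has reached a final state.
func isTerminal(state pb.TaskState) bool {
    switch state {
    case pb.TaskState_TASK_STATE_COMPLETED,
        pb.TaskState_TASK_STATE_FAILED,
        pb.TaskState_TASK_STATE_CANCELLED:
        return true
    default:
        return false
    }
}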

A2A Artifact

Represents structured output from completed tasks.

message Artifact {
  string artifact_id = 1;           // Required: Artifact identifier
  string name = 2;                  // Human-readable name
  string description = 3;           // Artifact description
  repeated Part parts = 4;          // Artifact content parts
  google.protobuf.Struct metadata = 5; // Artifact metadata
}

EDA Event Wrapper Types

AgentEvent

Wraps A2A messages for Event-Driven Architecture transport.

message AgentEvent {
  string event_id = 1;                     // Unique event identifier
  google.protobuf.Timestamp timestamp = 2; // Event timestamp

  // A2A-compliant payload
  oneof payload {
    a2a.Message message = 10;              // A2A Message
    a2a.Task task = 11;                    // A2A Task
    TaskStatusUpdateEvent status_update = 12; // Task status change
    TaskArtifactUpdateEvent artifact_update = 13; // Artifact update
  }

  // EDA routing metadata
  AgentEventMetadata routing = 20;

  // Observability context
  string trace_id = 30;
  string span_id = 31;
}

AgentEventMetadata

Provides routing and delivery information for events.

message AgentEventMetadata {
  string from_agent_id = 1;               // Source agent identifier
  string to_agent_id = 2;                 // Target agent ID (empty = broadcast)
  string event_type = 3;                  // Event classification
  repeated string subscriptions = 4;      // Topic-based routing tags
  Priority priority = 5;                  // Delivery priority
}
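The to_agent_id field drives the direct-versus-broadcast decision. An illustrative routing fragment (deliverTo and subscribers are hypothetical names, not broker internals):

if target := routing.GetToAgentId(); target != "" {
    deliverTo(target, event) // direct delivery to a single agent
} else {
    for agentID := range subscribers {
        deliverTo(agentID, event) // empty target means broadcast
    }
}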

Request/Response Messages

PublishMessageRequest

message PublishMessageRequest {
  a2a.Message message = 1;                // A2A message to publish
  AgentEventMetadata routing = 2;         // EDA routing info
}

SubscribeToTasksRequest

message SubscribeToTasksRequest {
  string agent_id = 1;                    // Agent ID for subscription
  repeated string task_types = 2;         // Optional task type filter
  repeated a2a.TaskState states = 3;      // Optional state filter
}

GetTaskRequest

message GetTaskRequest {
  string task_id = 1;                     // Task identifier
  int32 history_length = 2;               // History limit (optional)
}

API Operations

Publishing A2A Messages

PublishMessage

Publishes an A2A message for delivery through the EDA broker.

Go Example:

// Create A2A message content
content := []*pb.Part{
    {
        Part: &pb.Part_Text{
            Text: "Hello! Please process this request.",
        },
    },
    {
        Part: &pb.Part_Data{
            Data: &pb.DataPart{
                Data: &structpb.Struct{
                    Fields: map[string]*structpb.Value{
                        "operation": structpb.NewStringValue("process_data"),
                        "dataset_id": structpb.NewStringValue("dataset_123"),
                    },
                },
            },
        },
    },
}

// Create A2A message
message := &pb.Message{
    MessageId: "msg_12345",
    ContextId: "conversation_abc",
    TaskId:    "task_67890",
    Role:      pb.Role_ROLE_USER,
    Content:   content,
    Metadata: &structpb.Struct{
        Fields: map[string]*structpb.Value{
            "priority": structpb.NewStringValue("high"),
        },
    },
}

// Publish through AgentHub
response, err := client.PublishMessage(ctx, &pb.PublishMessageRequest{
    Message: message,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "requester_agent",
        ToAgentId:   "processor_agent",
        EventType:   "task_message",
        Priority:    pb.Priority_PRIORITY_HIGH,
    },
})

Subscribing to A2A Events

SubscribeToTasks

Creates a stream of A2A task events for an agent.

Go Example:

req := &pb.SubscribeToTasksRequest{
    AgentId: "processor_agent",
    TaskTypes: []string{"data_processing", "image_analysis"}, // Optional filter
}

stream, err := client.SubscribeToTasks(ctx, req)
if err != nil {
    return err
}

for {
    event, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }

    // Process different event types
    switch payload := event.GetPayload().(type) {
    case *pb.AgentEvent_Task:
        task := payload.Task
        log.Printf("Received A2A task: %s", task.GetId())

        // Process task using A2A handler
        artifact, status, errorMsg := processA2ATask(ctx, task)

        // Publish completion
        publishTaskCompletion(ctx, client, task, artifact, status, errorMsg)

    case *pb.AgentEvent_StatusUpdate:
        update := payload.StatusUpdate
        log.Printf("Task %s status: %s", update.GetTaskId(), update.GetStatus().GetState())

    case *pb.AgentEvent_ArtifactUpdate:
        artifact := payload.ArtifactUpdate
        log.Printf("Received artifact for task %s", artifact.GetTaskId())
    }
}

A2A Task Management

GetTask

Retrieves the current state of an A2A task.

Go Example:

req := &pb.GetTaskRequest{
    TaskId: "task_67890",
    HistoryLength: 10, // Optional: limit message history
}

task, err := client.GetTask(ctx, req)
if err != nil {
    return err
}

log.Printf("Task %s status: %s", task.GetId(), task.GetStatus().GetState())
log.Printf("Message history: %d messages", len(task.GetHistory()))
log.Printf("Artifacts: %d artifacts", len(task.GetArtifacts()))

CancelTask

Cancels an active A2A task.

Go Example:

req := &pb.CancelTaskRequest{
    TaskId: "task_67890",
    Reason: "User requested cancellation",
}

task, err := client.CancelTask(ctx, req)
if err != nil {
    return err
}

log.Printf("Task %s cancelled", task.GetId())

Agent Discovery

GetAgentCard

Returns the broker’s A2A agent card for discovery.

Go Example:

card, err := client.GetAgentCard(ctx, &emptypb.Empty{})
if err != nil {
    return err
}

log.Printf("AgentHub broker: %s v%s", card.GetName(), card.GetVersion())
log.Printf("Protocol version: %s", card.GetProtocolVersion())
log.Printf("Capabilities: streaming=%v", card.GetCapabilities().GetStreaming())

for _, skill := range card.GetSkills() {
    log.Printf("Skill: %s - %s", skill.GetName(), skill.GetDescription())
}

RegisterAgent

Registers an agent with the broker.

Go Example:

agentCard := &pb.AgentCard{
    ProtocolVersion: "0.2.9",
    Name:           "my-processor-agent",
    Description:    "Data processing agent with A2A compliance",
    Version:        "1.0.0",
    Capabilities: &pb.AgentCapabilities{
        Streaming: true,
    },
    Skills: []*pb.AgentSkill{
        {
            Id:          "data_processing",
            Name:        "Data Processing",
            Description: "Process structured datasets",
            Tags:        []string{"data", "analysis"},
        },
    },
}

response, err := client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
    AgentCard: agentCard,
    Subscriptions: []string{"data_processing", "analytics"},
})

if response.GetSuccess() {
    log.Printf("Agent registered with ID: %s", response.GetAgentId())
} else {
    log.Printf("Registration failed: %s", response.GetError())
}

High-Level A2A Client Abstractions

A2ATaskPublisher

Simplified interface for publishing A2A tasks.

taskPublisher := &agenthub.A2ATaskPublisher{
    Client:         client,
    TraceManager:   traceManager,
    MetricsManager: metricsManager,
    Logger:         logger,
    ComponentName:  "my-publisher",
    AgentID:        "my-agent-id",
}

task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
    TaskType:         "data_analysis",
    Content:          contentParts,
    RequesterAgentID: "my-agent-id",
    ResponderAgentID: "data-processor",
    Priority:         pb.Priority_PRIORITY_MEDIUM,
    ContextID:        "analysis-session-123",
})

A2ATaskSubscriber

Simplified interface for processing A2A tasks.

taskSubscriber := agenthub.NewA2ATaskSubscriber(client, "my-agent-id")

// Register task handlers
taskSubscriber.RegisterTaskHandler("data_analysis", func(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
    // Process the A2A task
    result := processDataAnalysis(task, message)

    // Return A2A artifact
    artifact := &pb.Artifact{
        ArtifactId:  fmt.Sprintf("result_%s", task.GetId()),
        Name:        "analysis_result",
        Description: "Data analysis results",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data: result,
                    },
                },
            },
        },
    }

    return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
})

// Start processing A2A tasks
err := taskSubscriber.SubscribeToTasks(ctx)

Error Handling

gRPC Status Codes

AgentHub uses standard gRPC status codes:

InvalidArgument (Code: 3)

  • Missing required fields (message_id, role, content)
  • Invalid A2A message structure
  • Malformed Part content

NotFound (Code: 5)

  • Task ID not found in GetTask/CancelTask
  • Agent not registered

Internal (Code: 13)

  • Server-side processing errors
  • Message routing failures
  • A2A validation errors

Retry Patterns

func publishWithRetry(ctx context.Context, client pb.AgentHubClient, req *pb.PublishMessageRequest) error {
    for attempt := 0; attempt < 3; attempt++ {
        _, err := client.PublishMessage(ctx, req)
        if err == nil {
            return nil
        }

        // Check if error is retryable
        if status.Code(err) == codes.InvalidArgument {
            return err // Don't retry validation errors
        }

        // Exponential backoff
        time.Sleep(time.Duration(1<<attempt) * time.Second)
    }
    return fmt.Errorf("max retries exceeded")
}

Performance Considerations

Message Size Limits

  • Maximum message size: 4MB (gRPC default)
  • Recommended size: <100KB for optimal A2A compliance
  • Large content: Use FilePart references for large data

A2A Best Practices

  1. Use structured Parts: Prefer DataPart for structured data over text
  2. Context management: Group related messages with context_id
  3. Artifact structure: Return well-formed Artifact objects
  4. Task lifecycle: Properly manage TaskState transitions
  5. Connection reuse: Maintain persistent gRPC connections

This completes the comprehensive A2A-compliant API reference for AgentHub, covering all message types, operations, and integration patterns with practical examples.

2.2 - AgentHub Tracing API Reference

Complete API documentation for AgentHub’s OpenTelemetry tracing integration, span management, context propagation, and instrumentation patterns.

🔍 AgentHub Tracing API Reference

Technical reference: Complete API documentation for AgentHub’s OpenTelemetry tracing integration, span management, context propagation, and instrumentation patterns.

Core Components

TraceManager

The TraceManager provides high-level tracing operations for AgentHub events.

Constructor

func NewTraceManager(serviceName string) *TraceManager

Parameters:

  • serviceName - Name of the service creating spans

Returns: Configured TraceManager instance

Methods

StartPublishSpan
func (tm *TraceManager) StartPublishSpan(ctx context.Context, responderAgentID, eventType string) (context.Context, trace.Span)

Purpose: Creates a span for event publishing operations

Parameters:

  • ctx - Parent context (may contain existing trace)
  • responderAgentID - Target agent for the event
  • eventType - Type of event being published

Returns:

  • context.Context - New context with active span
  • trace.Span - The created span

Attributes Set:

  • event.type - Event type being published
  • responder.agent - Target agent ID
  • operation.type - “publish”

Usage:

ctx, span := tm.StartPublishSpan(ctx, "agent_subscriber", "greeting")
defer span.End()
// ... publishing logic

StartEventProcessingSpan
func (tm *TraceManager) StartEventProcessingSpan(ctx context.Context, eventID, eventType, requesterAgentID, responderAgentID string) (context.Context, trace.Span)

Purpose: Creates a span for event processing operations

Parameters:

  • ctx - Context with extracted trace information
  • eventID - Unique identifier for the event
  • eventType - Type of event being processed
  • requesterAgentID - Agent that requested processing
  • responderAgentID - Agent performing processing

Returns:

  • context.Context - Context with processing span
  • trace.Span - The processing span

Attributes Set:

  • event.id - Event identifier
  • event.type - Event type
  • requester.agent - Requesting agent ID
  • responder.agent - Processing agent ID
  • operation.type - “process”

StartBrokerSpan
func (tm *TraceManager) StartBrokerSpan(ctx context.Context, operation, eventType string) (context.Context, trace.Span)

Purpose: Creates spans for broker operations

Parameters:

  • ctx - Request context
  • operation - Broker operation (route, subscribe, unsubscribe)
  • eventType - Event type being handled

Returns:

  • context.Context - Context with broker span
  • trace.Span - The broker span

Attributes Set:

  • operation.type - Broker operation type
  • event.type - Event type being handled
  • component - “broker”

InjectTraceContext
func (tm *TraceManager) InjectTraceContext(ctx context.Context, headers map[string]string)

Purpose: Injects trace context into headers for propagation

Parameters:

  • ctx - Context containing trace information
  • headers - Map to inject headers into

Headers Injected:

  • traceparent - W3C trace context header
  • tracestate - W3C trace state header (if present)

Usage:

headers := make(map[string]string)
tm.InjectTraceContext(ctx, headers)
// headers now contain trace context for propagation

ExtractTraceContext
func (tm *TraceManager) ExtractTraceContext(ctx context.Context, headers map[string]string) context.Context

Purpose: Extracts trace context from headers

Parameters:

  • ctx - Base context
  • headers - Headers containing trace context

Returns: Context with extracted trace information

Usage:

// Extract from event metadata
if metadata := event.GetMetadata(); metadata != nil {
    if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
        headers := structFieldsToStringMap(traceHeaders.GetStructValue().Fields)
        ctx = tm.ExtractTraceContext(ctx, headers)
    }
}
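The structFieldsToStringMap helper used above is not defined in this document; a minimal version might look like:

// Converts structpb fields into a plain string map for the propagator.
func structFieldsToStringMap(fields map[string]*structpb.Value) map[string]string {
    headers := make(map[string]string, len(fields))
    for key, value := range fields {
        headers[key] = value.GetStringValue()
    }
    return headers
}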
RecordError
func (tm *TraceManager) RecordError(span trace.Span, err error)

Purpose: Records an error on a span with proper formatting

Parameters:

  • span - Span to record error on
  • err - Error to record

Effects:

  • Sets span status to error
  • Records error as span event
  • Adds error type attribute

SetSpanSuccess
func (tm *TraceManager) SetSpanSuccess(span trace.Span)

Purpose: Marks a span as successful

Parameters:

  • span - Span to mark as successful

Effects:

  • Sets span status to OK
  • Ensures span is properly completed
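A typical call site pairs the two helpers (illustrative fragment; handleEvent is a placeholder for your own processing function):

if err := handleEvent(ctx, event); err != nil {
    tm.RecordError(span, err) // mark span failed with error details
    return err
}
tm.SetSpanSuccess(span) // mark span OK
return nil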

Context Propagation

W3C Trace Context Standards

AgentHub uses the W3C Trace Context specification for interoperability.

Trace Context Headers

traceparent

Format: 00-{trace-id}-{span-id}-{trace-flags}

  • 00 - Version (currently always 00)
  • trace-id - 32-character hex string
  • span-id - 16-character hex string
  • trace-flags - 2-character hex flags

Example: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01

tracestate

Format: Vendor-specific key-value pairs Example: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE

Propagation Implementation

Manual Injection

// Create headers map
headers := make(map[string]string)

// Inject trace context
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))

// Headers now contain trace context
// Convert to protobuf metadata if needed
// structpb.NewStruct only accepts map[string]interface{} values,
// so convert the string map before embedding it
headerValues := make(map[string]interface{}, len(headers))
for k, v := range headers {
    headerValues[k] = v
}
metadataStruct, err := structpb.NewStruct(map[string]interface{}{
    "trace_headers": headerValues,
    "timestamp":     time.Now().Format(time.RFC3339),
})

Manual Extraction

// Extract from protobuf metadata
if metadata := task.GetMetadata(); metadata != nil {
    if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
        headers := make(map[string]string)
        for k, v := range traceHeaders.GetStructValue().Fields {
            headers[k] = v.GetStringValue()
        }
        ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(headers))
    }
}

Span Lifecycle Management

Creating Spans

Basic Span Creation

tracer := otel.Tracer("my-service")
ctx, span := tracer.Start(ctx, "operation_name")
defer span.End()

Span with Attributes

ctx, span := tracer.Start(ctx, "operation_name", trace.WithAttributes(
    attribute.String("operation.type", "publish"),
    attribute.String("event.type", "greeting"),
    attribute.Int("event.priority", 1),
))
defer span.End()

Child Span Creation

// Parent span
ctx, parentSpan := tracer.Start(ctx, "parent_operation")
defer parentSpan.End()

// Child span (automatically linked)
ctx, childSpan := tracer.Start(ctx, "child_operation")
defer childSpan.End()

Span Attributes

Standard Attributes

AgentHub uses consistent attribute naming:

// Event attributes
attribute.String("event.id", taskID)
attribute.String("event.type", taskType)
attribute.Int("event.priority", priority)

// Agent attributes
attribute.String("agent.id", agentID)
attribute.String("agent.type", agentType)
attribute.String("requester.agent", requesterID)
attribute.String("responder.agent", responderID)

// Operation attributes
attribute.String("operation.type", "publish|process|route")
attribute.String("component", "broker|publisher|subscriber")

// Result attributes
attribute.Bool("success", true)
attribute.String("error.type", "validation|timeout|network")

Custom Attributes

span.SetAttributes(
    attribute.String("business.unit", "sales"),
    attribute.String("user.tenant", "acme-corp"),
    attribute.Int("batch.size", len(items)),
    attribute.Duration("timeout", 30*time.Second),
)

Span Events

Adding Events

// Simple event
span.AddEvent("validation.started")

// Event with attributes
span.AddEvent("cache.miss", trace.WithAttributes(
    attribute.String("cache.key", key),
    attribute.String("cache.type", "redis"),
))

// Event with timestamp
span.AddEvent("external.api.call", trace.WithAttributes(
    attribute.String("api.endpoint", "/v1/users"),
    attribute.Int("api.status_code", 200),
), trace.WithTimestamp(time.Now()))

Common Event Patterns

// Processing milestones
span.AddEvent("processing.started")
span.AddEvent("validation.completed")
span.AddEvent("business.logic.completed")
span.AddEvent("result.published")

// Error events
span.AddEvent("error.occurred", trace.WithAttributes(
    attribute.String("error.message", err.Error()),
    attribute.String("error.stack", debug.Stack()),
))

Span Status

Setting Status

// Success
span.SetStatus(codes.Ok, "")

// Error with message
span.SetStatus(codes.Error, "validation failed")

// Error without message
span.SetStatus(codes.Error, "")

Status Code Mapping

// OpenTelemetry status codes are limited to Ok, Error, and Unset,
// so the error class goes into the status description instead
if err != nil {
    desc := err.Error()
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        desc = "timeout: " + desc
    case errors.Is(err, context.Canceled):
        desc = "cancelled: " + desc
    }
    span.SetStatus(codes.Error, desc)
} else {
    span.SetStatus(codes.Ok, "")
}

Advanced Instrumentation

Baggage Propagation

Setting Baggage

// Add baggage to context
// The current otel baggage API builds members explicitly
userMember, _ := baggage.NewMember("user.id", userID)
tenantMember, _ := baggage.NewMember("tenant.id", tenantID)
requestMember, _ := baggage.NewMember("request.id", requestID)
bag, _ := baggage.New(userMember, tenantMember, requestMember)
ctx = baggage.ContextWithBaggage(ctx, bag)

Reading Baggage

// Read baggage anywhere in the trace
if member := baggage.FromContext(ctx).Member("user.id"); member.Value() != "" {
    userID := member.Value()
    // Use user ID for business logic
}

Span Links

// Link to related span
linkedSpanContext := trace.SpanContextFromContext(relatedCtx)
ctx, span := tracer.Start(ctx, "operation", trace.WithLinks(
    trace.Link{
        SpanContext: linkedSpanContext,
        Attributes: []attribute.KeyValue{
            attribute.String("link.type", "related_operation"),
        },
    },
))

Sampling Control

Conditional Sampling

// Force sampling for important operations
ctx, span := tracer.Start(ctx, "critical_operation",
    trace.WithNewRoot(), // Start new trace
    trace.WithSpanKind(trace.SpanKindServer),
)

// Add sampling priority
span.SetAttributes(
    attribute.String("sampling.priority", "high"),
)

Integration Patterns

gRPC Integration

Server Interceptor

func TracingUnaryInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        ctx, span := tracer.Start(ctx, info.FullMethod)
        defer span.End()

        resp, err := handler(ctx, req)
        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes.Error, err.Error())
        }
        return resp, err
    }
}

Client Interceptor

func TracingUnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        ctx, span := tracer.Start(ctx, method)
        defer span.End()

        err := invoker(ctx, method, req, reply, cc, opts...)
        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes.Error, err.Error())
        }
        return err
    }
}

HTTP Integration

HTTP Handler Wrapper

func TracingHandler(tracer trace.Tracer, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
        ctx, span := tracer.Start(ctx, r.Method+" "+r.URL.Path)
        defer span.End()

        span.SetAttributes(
            attribute.String("http.method", r.Method),
            attribute.String("http.url", r.URL.String()),
            attribute.String("http.user_agent", r.UserAgent()),
        )

        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

Error Handling

Error Recording Best Practices

Complete Error Recording

if err != nil {
    // Record error on span
    span.RecordError(err)
    span.SetStatus(codes.Error, err.Error())

    // Add error context
    span.SetAttributes(
        attribute.String("error.type", classifyError(err)),
        attribute.Bool("error.retryable", isRetryable(err)),
    )

    // Log with context
    logger.ErrorContext(ctx, "Operation failed",
        slog.Any("error", err),
        slog.String("operation", "event_processing"),
    )

    return err
}

Error Classification

func classifyError(err error) string {
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        return "timeout"
    case errors.Is(err, context.Canceled):
        return "cancelled"
    case strings.Contains(err.Error(), "connection"):
        return "network"
    case strings.Contains(err.Error(), "validation"):
        return "validation"
    default:
        return "unknown"
    }
}

Performance Considerations

Span Creation Overhead

  • Span creation: ~1-2μs per span
  • Attribute setting: ~100ns per attribute
  • Event recording: ~200ns per event
  • Context propagation: ~500ns per injection/extraction

Memory Usage

  • Active span: ~500 bytes
  • Completed span buffer: ~1KB per span
  • Context overhead: ~100 bytes per context

Best Practices

  1. Limit span attributes to essential information
  2. Use batch exporters to reduce network overhead
  3. Sample appropriately for high-throughput services
  4. Pool span contexts where possible
  5. Avoid deep span nesting (>10 levels)

Troubleshooting

Missing Spans Checklist

  1. ✅ OpenTelemetry properly initialized
  2. ✅ Tracer retrieved from global provider
  3. ✅ Context propagated correctly
  4. ✅ Spans properly ended
  5. ✅ Exporter configured and accessible

Common Issues

Broken Trace Chains

// ❌ Wrong - creates new root trace
ctx, span := tracer.Start(context.Background(), "operation")

// ✅ Correct - continues existing trace
ctx, span := tracer.Start(ctx, "operation")

Missing Context Propagation

// ❌ Wrong - context not propagated
go func() {
    ctx, span := tracer.Start(context.Background(), "async_work")
    // work...
}()

// ✅ Correct - context properly propagated
go func(ctx context.Context) {
    ctx, span := tracer.Start(ctx, "async_work")
    defer span.End()
    // work...
}(ctx)

🎯 Next Steps:

Implementation: Add Observability to Your Agent

Debugging: Debug with Distributed Tracing

Metrics: Observability Metrics Reference

2.3 - Unified Abstraction Library API Reference

The AgentHub unified abstraction library provides simplified APIs for building gRPC-based agent communication systems with built-in observability, automatic configuration, and correlation tracking.

Unified Abstraction Library API Reference

The AgentHub unified abstraction library provides simplified APIs for building gRPC-based agent communication systems with built-in observability, automatic configuration, and correlation tracking.

Package: internal/agenthub

The internal/agenthub package contains the core unified abstraction components that dramatically simplify AgentHub development by providing high-level APIs with automatic observability integration.

Overview

The unified abstraction library reduces agent implementation complexity from 380+ lines to ~29 lines by providing:

  • Automatic gRPC Setup: One-line server and client creation
  • Built-in Observability: Integrated OpenTelemetry tracing and metrics
  • Environment-Based Configuration: Automatic configuration from environment variables
  • Correlation Tracking: Automatic correlation ID generation and propagation
  • Pluggable Architecture: Simple task handler registration

Core Components

GRPCConfig

Configuration structure for gRPC servers and clients with environment-based initialization.

type GRPCConfig struct {
    ServerAddr    string // gRPC server listen address (e.g., ":50051")
    BrokerAddr    string // Broker connection address (e.g., "localhost:50051")
    HealthPort    string // Health check endpoint port
    ComponentName string // Component identifier for observability
}

Constructor

func NewGRPCConfig(componentName string) *GRPCConfig

Creates a new gRPC configuration with environment variable defaults:

| Environment Variable | Default | Description |
|----------------------|---------|-------------|
| AGENTHUB_BROKER_ADDR | localhost | Broker server host |
| AGENTHUB_BROKER_PORT | 50051 | Broker gRPC port |
| AGENTHUB_GRPC_PORT | :50051 | Server listen port |
| BROKER_HEALTH_PORT | 8080 | Health endpoint port |

Example:

config := agenthub.NewGRPCConfig("my-agent")
// Results in BrokerAddr: "localhost:50051" (automatically combined)

AgentHubServer

High-level gRPC server wrapper with integrated observability.

type AgentHubServer struct {
    Server         *grpc.Server                    // Underlying gRPC server
    Listener       net.Listener                    // Network listener
    Observability  *observability.Observability    // OpenTelemetry integration
    TraceManager   *observability.TraceManager     // Distributed tracing
    MetricsManager *observability.MetricsManager   // Metrics collection
    HealthServer   *observability.HealthServer     // Health monitoring
    Logger         *slog.Logger                    // Structured logging
    Config         *GRPCConfig                     // Configuration
}

Constructor

func NewAgentHubServer(config *GRPCConfig) (*AgentHubServer, error)

Creates a complete gRPC server with:

  • OpenTelemetry instrumentation
  • Health check endpoints
  • Metrics collection
  • Structured logging with trace correlation

Methods

func (s *AgentHubServer) Start(ctx context.Context) error

Starts the server with automatic:

  • Health endpoint setup (/health, /ready, /metrics)
  • Metrics collection goroutine
  • gRPC server with observability
func (s *AgentHubServer) Shutdown(ctx context.Context) error

Gracefully shuts down all components:

  • gRPC server graceful stop
  • Health server shutdown
  • Observability cleanup

Example:

config := agenthub.NewGRPCConfig("broker")
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
    log.Fatal(err)
}

// Register services
eventBusService := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, eventBusService)

// Start server
if err := server.Start(ctx); err != nil {
    log.Fatal(err)
}

AgentHubClient

High-level gRPC client wrapper with integrated observability.

type AgentHubClient struct {
    Client         pb.EventBusClient               // gRPC client
    Connection     *grpc.ClientConn                // Connection
    Observability  *observability.Observability    // OpenTelemetry integration
    TraceManager   *observability.TraceManager     // Distributed tracing
    MetricsManager *observability.MetricsManager   // Metrics collection
    HealthServer   *observability.HealthServer     // Health monitoring
    Logger         *slog.Logger                    // Structured logging
    Config         *GRPCConfig                     // Configuration
}

Constructor

func NewAgentHubClient(config *GRPCConfig) (*AgentHubClient, error)

Creates a complete gRPC client with:

  • OpenTelemetry instrumentation
  • Connection health monitoring
  • Metrics collection
  • Automatic retry and timeout handling

Methods

func (c *AgentHubClient) Start(ctx context.Context) error

Initializes client with health monitoring and metrics collection.

func (c *AgentHubClient) Shutdown(ctx context.Context) error

Gracefully closes connection and cleans up resources.

Example:

config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
    log.Fatal(err)
}

err = client.Start(ctx)
if err != nil {
    log.Fatal(err)
}

// Use client.Client for gRPC calls

Service Abstractions

EventBusService

Broker service implementation with built-in observability and correlation tracking.

type EventBusService struct {
    Server          *AgentHubServer
    subscriptions   map[string][]Subscription
    resultSubs      map[string][]ResultSubscription
    progressSubs    map[string][]ProgressSubscription
    mu              sync.RWMutex
}

Constructor

func NewEventBusService(server *AgentHubServer) *EventBusService

Creates an EventBus service with automatic:

  • Subscription management
  • Task routing and correlation
  • Observability integration

Key Methods

func (s *EventBusService) PublishTask(ctx context.Context, req *pb.PublishTaskRequest) (*pb.PublishResponse, error)

Publishes tasks with automatic:

  • Input validation
  • Correlation ID generation
  • Distributed tracing
  • Metrics collection
func (s *EventBusService) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error

Manages task subscriptions with:

  • Automatic subscription lifecycle
  • Context cancellation handling
  • Error recovery

SubscriberAgent

High-level subscriber implementation with pluggable task handlers.

type SubscriberAgent struct {
    client      *AgentHubClient
    agentID     string
    handlers    map[string]TaskHandler
    ctx         context.Context
    cancel      context.CancelFunc
}

Constructor

func NewSubscriberAgent(client *AgentHubClient, agentID string) *SubscriberAgent

Task Handler Interface

type TaskHandler interface {
    Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error)
}

Methods

func (s *SubscriberAgent) RegisterHandler(taskType string, handler TaskHandler)

Registers handlers for specific task types with automatic:

  • Task routing
  • Error handling
  • Result publishing
func (s *SubscriberAgent) Start(ctx context.Context) error

Starts the subscriber with automatic:

  • Task subscription
  • Handler dispatch
  • Observability integration

Example:

type GreetingHandler struct{}

func (h *GreetingHandler) Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error) {
    // Process greeting task
    return result, nil
}

// Register handler
subscriber.RegisterHandler("greeting", &GreetingHandler{})

Utility Functions

Metadata Operations

func ExtractCorrelationID(ctx context.Context) string
func InjectCorrelationID(ctx context.Context, correlationID string) context.Context
func GenerateCorrelationID() string

Automatic correlation ID management for distributed tracing.
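
A short usage sketch, assuming ExtractCorrelationID returns an empty string when the context carries no ID and a slog.Logger is in scope:

correlationID := agenthub.ExtractCorrelationID(ctx)
if correlationID == "" {
    correlationID = agenthub.GenerateCorrelationID()
    ctx = agenthub.InjectCorrelationID(ctx, correlationID)
}

// Downstream calls and log lines now share the same correlation ID.
logger.InfoContext(ctx, "publishing task", slog.String("correlation_id", correlationID))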

Metrics Helpers

func NewMetricsTicker(ctx context.Context, manager *observability.MetricsManager) *MetricsTicker

Automatic metrics collection with configurable intervals.

Configuration Reference

Environment Variables

The unified abstraction library uses environment-based configuration:

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| AGENTHUB_BROKER_ADDR | string | localhost | Broker server hostname |
| AGENTHUB_BROKER_PORT | string | 50051 | Broker gRPC port |
| AGENTHUB_GRPC_PORT | string | :50051 | Server listen address |
| BROKER_HEALTH_PORT | string | 8080 | Health endpoint port |
| SERVICE_VERSION | string | 1.0.0 | Service version for observability |
| ENVIRONMENT | string | development | Deployment environment |

Observability Integration

The unified abstraction automatically configures:

  • OpenTelemetry Tracing: Automatic span creation and context propagation
  • Prometheus Metrics: 47+ built-in metrics for performance monitoring
  • Health Checks: Comprehensive health endpoints for service monitoring
  • Structured Logging: Correlated logging with trace context

Performance Characteristics

| Metric | Standard gRPC | Unified Abstraction | Overhead |
|--------|---------------|---------------------|----------|
| Setup Complexity | 380+ lines | ~29 lines | -92% code |
| Throughput | 10,000+ tasks/sec | 9,500+ tasks/sec | -5% |
| Latency | Baseline | +10ms for tracing | +10ms |
| Memory | Baseline | +50MB per agent | +50MB |
| CPU | Baseline | +5% for observability | +5% |

Migration Guide

From Standard gRPC

Before (Standard gRPC):

// 380+ lines of boilerplate code
lis, err := net.Listen("tcp", ":50051")
server := grpc.NewServer()
// ... extensive setup code

After (Unified Abstraction):

// 29 lines total
config := agenthub.NewGRPCConfig("my-service")
server, err := agenthub.NewAgentHubServer(config)
service := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, service)
server.Start(ctx)

Observability Benefits

The unified abstraction provides automatic:

  1. Distributed Tracing: Every request automatically traced
  2. Metrics Collection: 47+ metrics without configuration
  3. Health Monitoring: Built-in health and readiness endpoints
  4. Error Correlation: Automatic error tracking across services
  5. Performance Monitoring: Latency, throughput, and error rates

Error Handling

The unified abstraction provides comprehensive error handling:

  • Automatic Retries: Built-in retry logic for transient failures
  • Circuit Breaking: Protection against cascading failures
  • Graceful Degradation: Service continues operating during partial failures
  • Error Correlation: Distributed error tracking across service boundaries

Best Practices

1. Configuration Management

// Use environment-based configuration
config := agenthub.NewGRPCConfig("my-service")

// Override specific values if needed
config.HealthPort = "8083"

2. Handler Registration

// Register handlers before starting
subscriber.RegisterHandler("task-type", handler)
subscriber.Start(ctx)

3. Graceful Shutdown

// Always implement proper shutdown
defer func() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    server.Shutdown(ctx)
}()

4. Error Handling

// Use context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

result, err := client.Client.PublishTask(ctx, request)
if err != nil {
    // Error is automatically traced and logged
    return fmt.Errorf("failed to publish task: %w", err)
}


3 - Observability

Monitoring, metrics, and observability reference

Observability Reference

This section provides reference documentation for all observability features, including metrics, health endpoints, and monitoring capabilities.

Available Documentation

3.1 - AgentHub Health Endpoints Reference

Complete documentation for AgentHub’s health monitoring APIs, endpoint specifications, status codes, and integration patterns.

AgentHub Health Endpoints Reference


Overview

Every observable AgentHub service exposes standardized health endpoints for monitoring, load balancing, and operational management.

Standard Endpoints

Health Check Endpoint

/health

Purpose: Comprehensive service health status
Method: GET
Port: Service-specific (8080-8083)

Response Format:

{
  "status": "healthy|degraded|unhealthy",
  "timestamp": "2025-09-28T21:00:00.000Z",
  "service": "agenthub-broker",
  "version": "1.0.0",
  "uptime": "2h34m12s",
  "checks": [
    {
      "name": "self",
      "status": "healthy",
      "message": "Service is running normally",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "1.2ms"
    },
    {
      "name": "database_connection",
      "status": "healthy",
      "message": "Database connection is active",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "15.6ms"
    }
  ]
}

Status Codes:

  • 200 OK - All checks healthy
  • 503 Service Unavailable - One or more checks unhealthy
  • 500 Internal Server Error - Health check system failure

Readiness Endpoint

/ready

Purpose: Service readiness for traffic acceptance
Method: GET

Response Format:

{
  "ready": true,
  "timestamp": "2025-09-28T21:00:00.000Z",
  "service": "agenthub-broker",
  "dependencies": [
    {
      "name": "grpc_server",
      "ready": true,
      "message": "gRPC server listening on :50051"
    },
    {
      "name": "observability",
      "ready": true,
      "message": "OpenTelemetry initialized"
    }
  ]
}

Status Codes:

  • 200 OK - Service ready for traffic
  • 503 Service Unavailable - Service not ready

Metrics Endpoint

/metrics

Purpose: Prometheus metrics exposure
Method: GET
Content-Type: text/plain

Response Format:

# HELP events_processed_total Total number of events processed
# TYPE events_processed_total counter
events_processed_total{service="agenthub-broker",event_type="greeting",success="true"} 1234

# HELP system_cpu_usage_percent CPU usage percentage
# TYPE system_cpu_usage_percent gauge
system_cpu_usage_percent{service="agenthub-broker"} 23.4

Status Codes:

  • 200 OK - Metrics available
  • 500 Internal Server Error - Metrics collection failure

Service-Specific Configurations

Broker (Port 8080)

Health Checks:

  • self - Basic service health
  • grpc_server - gRPC server status
  • observability - OpenTelemetry health

Example URLs:

  • Health: http://localhost:8080/health
  • Ready: http://localhost:8080/ready
  • Metrics: http://localhost:8080/metrics

Publisher (Port 8081)

Health Checks:

  • self - Basic service health
  • broker_connection - Connection to AgentHub broker
  • observability - Tracing and metrics health

Example URLs:

  • Health: http://localhost:8081/health
  • Ready: http://localhost:8081/ready
  • Metrics: http://localhost:8081/metrics

Subscriber (Port 8082)

Health Checks:

  • self - Basic service health
  • broker_connection - Connection to AgentHub broker
  • task_processor - Task processing capability
  • observability - Observability stack health

Example URLs:

  • Health: http://localhost:8082/health
  • Ready: http://localhost:8082/ready
  • Metrics: http://localhost:8082/metrics

Custom Agents (Port 8083+)

Configurable Health Checks:

  • Custom business logic checks
  • External dependency checks
  • Resource availability checks
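
A sketch of wiring such checks for a custom agent on port 8083, using the checker constructors documented below (the backend URL and paths are illustrative; start/stop plumbing omitted):

healthServer := observability.NewHealthServer("8083", "my-custom-agent", "1.0.0")

// External dependency check against a hypothetical model backend.
healthServer.AddChecker("model_backend",
    observability.NewHTTPHealthChecker("model_backend", "http://model-backend:9000/health"))

// Resource availability check: fail if the scratch directory is missing.
healthServer.AddChecker("scratch_dir",
    observability.NewBasicHealthChecker("scratch_dir", func(ctx context.Context) error {
        _, err := os.Stat("/var/lib/agent/scratch")
        return err
    }))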

Health Check Types

BasicHealthChecker

Purpose: Simple function-based health checks

Implementation:

checker := observability.NewBasicHealthChecker("database", func(ctx context.Context) error {
    return db.Ping()
})
healthServer.AddChecker("database", checker)

Use Cases:

  • Database connectivity
  • File system access
  • Configuration validation
  • Memory/disk space checks

GRPCHealthChecker

Purpose: gRPC connection health verification

Implementation:

checker := observability.NewGRPCHealthChecker("broker_connection", "localhost:50051")
healthServer.AddChecker("broker_connection", checker)

Use Cases:

  • AgentHub broker connectivity
  • External gRPC service dependencies
  • Service mesh health

HTTPHealthChecker

Purpose: HTTP endpoint health verification

Implementation:

checker := observability.NewHTTPHealthChecker("api_gateway", "http://gateway:8080/health")
healthServer.AddChecker("api_gateway", checker)

Use Cases:

  • REST API dependencies
  • Web service health
  • Load balancer backends

Custom Health Checkers

Interface:

type HealthChecker interface {
    Check(ctx context.Context) error
    Name() string
}

Custom Implementation Example:

type BusinessLogicChecker struct {
    name string
    validator func() error
}

func (c *BusinessLogicChecker) Check(ctx context.Context) error {
    return c.validator()
}

func (c *BusinessLogicChecker) Name() string {
    return c.name
}

// Usage
checker := &BusinessLogicChecker{
    name: "license_validation",
    validator: func() error {
        if time.Now().After(licenseExpiry) {
            return errors.New("license expired")
        }
        return nil
    },
}

// Register it like any other checker.
healthServer.AddChecker("license_validation", checker)

Health Check Configuration

Check Intervals

Default Intervals:

  • Active checks: Every 30 seconds
  • On-demand checks: Per request
  • Startup checks: During service initialization

Configurable Timing:

config := observability.HealthConfig{
    CheckInterval: 15 * time.Second,
    Timeout:       5 * time.Second,
    RetryCount:    3,
    RetryDelay:    1 * time.Second,
}

Timeout Configuration

Per-Check Timeouts:

checker := observability.NewBasicHealthChecker("slow_service",
    func(ctx context.Context) error {
        // This check will timeout after 10 seconds
        return slowOperation(ctx)
    }).WithTimeout(10 * time.Second)

Global Timeout:

healthServer := observability.NewHealthServer("8080", "my-service", "1.0.0")
healthServer.SetGlobalTimeout(30 * time.Second)

Integration Patterns

Kubernetes Integration

Liveness Probe

livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
  failureThreshold: 3

Readiness Probe

readinessProbe:
  httpGet:
    path: /ready
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5
  timeoutSeconds: 3
  failureThreshold: 2

Startup Probe

startupProbe:
  httpGet:
    path: /ready
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 5
  timeoutSeconds: 3
  failureThreshold: 30

Load Balancer Integration

HAProxy Configuration

backend agentHub_brokers
    balance roundrobin
    option httpchk GET /health
    server broker1 broker1:8080 check
    server broker2 broker2:8080 check

NGINX Configuration

upstream agenthub_backend {
    server broker1:8080;
    server broker2:8080;
}

location /health_check {
    proxy_pass http://agenthub_backend/health;
    proxy_set_header Host $host;
}

Prometheus Integration

Service Discovery

- job_name: 'agenthub-health'
  static_configs:
    - targets:
      - 'broker:8080'
      - 'publisher:8081'
      - 'subscriber:8082'
  metrics_path: '/metrics'
  scrape_interval: 10s
  scrape_timeout: 5s

Health Check Metrics

# Health check status (1=healthy, 0=unhealthy)
health_check_status{service="agenthub-broker",check="database"}

# Health check duration
health_check_duration_seconds{service="agenthub-broker",check="database"}

# Service uptime
service_uptime_seconds{service="agenthub-broker"}

Status Definitions

Service Status Levels

Healthy

Definition: All health checks passing
HTTP Status: 200 OK
Criteria:

  • All registered checks return no error
  • Service is fully operational
  • All dependencies available

Degraded

Definition: Service operational but with limitations
HTTP Status: 200 OK (with warning indicators)
Criteria:

  • Critical checks passing
  • Non-critical checks may be failing
  • Service can handle requests with reduced functionality

Unhealthy

Definition: Service cannot handle requests properly
HTTP Status: 503 Service Unavailable
Criteria:

  • One or more critical checks failing
  • Service should not receive new requests
  • Requires intervention or automatic recovery

Check-Level Status

Passing

  • Check completed successfully
  • No errors detected
  • Within acceptable parameters

Warning

  • Check completed with minor issues
  • Service functional but attention needed
  • May indicate future problems

Critical

  • Check failed
  • Service functionality compromised
  • Immediate attention required

Monitoring and Alerting

Critical Alerts

# Service down alert
- alert: ServiceHealthCheckFailing
  expr: health_check_status == 0
  for: 1m
  labels:
    severity: critical
  annotations:
    summary: "Service health check failing"
    description: "{{ $labels.service }} health check {{ $labels.check }} is failing"

# Service not ready alert
- alert: ServiceNotReady
  expr: up{job=~"agenthub-.*"} == 0
  for: 30s
  labels:
    severity: critical
  annotations:
    summary: "Service not responding"
    description: "{{ $labels.instance }} is not responding to health checks"

Warning Alerts

# Slow health checks
- alert: SlowHealthChecks
  expr: health_check_duration_seconds > 5
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Health checks taking too long"
    description: "{{ $labels.service }} health check {{ $labels.check }} taking {{ $value }}s"

# Service degraded
- alert: ServiceDegraded
  expr: service_status == 1  # degraded status
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Service running in degraded mode"
    description: "{{ $labels.service }} is degraded but still operational"

API Response Examples

Healthy Service Response

curl http://localhost:8080/health
{
  "status": "healthy",
  "timestamp": "2025-09-28T21:00:00.000Z",
  "service": "agenthub-broker",
  "version": "1.0.0",
  "uptime": "2h34m12s",
  "checks": [
    {
      "name": "self",
      "status": "healthy",
      "message": "Service is running normally",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "1.2ms"
    },
    {
      "name": "grpc_server",
      "status": "healthy",
      "message": "gRPC server listening on :50051",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "0.8ms"
    },
    {
      "name": "observability",
      "status": "healthy",
      "message": "OpenTelemetry exporter connected",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "12.4ms"
    }
  ]
}

Unhealthy Service Response

curl http://localhost:8080/health
{
  "status": "unhealthy",
  "timestamp": "2025-09-28T21:00:00.000Z",
  "service": "agenthub-broker",
  "version": "1.0.0",
  "uptime": "2h34m12s",
  "checks": [
    {
      "name": "self",
      "status": "healthy",
      "message": "Service is running normally",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "1.2ms"
    },
    {
      "name": "grpc_server",
      "status": "unhealthy",
      "message": "Failed to bind to port :50051: address already in use",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "0.1ms"
    },
    {
      "name": "observability",
      "status": "healthy",
      "message": "OpenTelemetry exporter connected",
      "last_checked": "2025-09-28T21:00:00.000Z",
      "duration": "12.4ms"
    }
  ]
}

Best Practices

Health Check Design

  1. Fast Execution: Keep checks under 5 seconds
  2. Meaningful Tests: Test actual functionality, not just process existence
  3. Idempotent Operations: Checks should not modify system state
  4. Appropriate Timeouts: Set reasonable timeouts for external dependencies
  5. Clear Messages: Provide actionable error messages

Dependency Management

  1. Critical vs Non-Critical: Distinguish between essential and optional dependencies
  2. Cascade Prevention: Avoid cascading failures through dependency chains
  3. Circuit Breakers: Implement circuit breakers for flaky dependencies
  4. Graceful Degradation: Continue operating when non-critical dependencies fail

Operational Considerations

  1. Monitoring: Set up alerts for health check failures
  2. Documentation: Document what each health check validates
  3. Testing: Test health checks in development and staging
  4. Versioning: Version health check APIs for compatibility

🎯 Next Steps:

Implementation: Add Observability to Your Agent

Monitoring: Use Grafana Dashboards

Metrics: Observability Metrics Reference

3.2 - AgentHub Observability Metrics Reference

Complete catalog of all metrics exposed by AgentHub’s observability system, their meanings, usage patterns, and query examples.

AgentHub Observability Metrics Reference


Overview

AgentHub automatically collects 47+ distinct metrics across all observable services, providing comprehensive visibility into event processing, system health, and performance characteristics.

Metric Categories

A2A Message Processing Metrics

a2a_messages_processed_total

Type: Counter
Description: Total number of A2A messages processed by service
Labels:

  • service - Service name (agenthub, publisher, subscriber)
  • message_type - Type of A2A message (task_update, message, artifact)
  • success - Processing success (true/false)
  • context_id - A2A conversation context (for workflow tracking)

Usage:

# A2A message processing rate per service
rate(a2a_messages_processed_total[5m])

# Success rate by A2A message type
rate(a2a_messages_processed_total{success="true"}[5m]) / rate(a2a_messages_processed_total[5m]) * 100

# Error rate across all A2A services
rate(a2a_messages_processed_total{success="false"}[5m]) / rate(a2a_messages_processed_total[5m]) * 100

# Workflow processing rate by context
rate(a2a_messages_processed_total[5m]) by (context_id)

a2a_messages_published_total

Type: Counter
Description: Total number of A2A messages published by agents
Labels:

  • message_type - Type of A2A message published
  • from_agent_id - Publishing agent identifier
  • to_agent_id - Target agent identifier (empty for broadcast)

Usage:

# A2A publishing rate by message type
rate(a2a_messages_published_total[5m]) by (message_type)

# Most active A2A publishers
topk(5, rate(a2a_messages_published_total[5m]) by (from_agent_id))

# Broadcast vs direct messaging ratio
rate(a2a_messages_published_total{to_agent_id=""}[5m]) / rate(a2a_messages_published_total[5m])

a2a_message_processing_duration_seconds

Type: Histogram
Description: Time taken to process A2A messages
Labels:

  • service - Service processing the message
  • message_type - Type of A2A message being processed
  • task_state - Current A2A task state (for task-related messages)

Buckets: 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10

Usage:

# p95 A2A message processing latency
histogram_quantile(0.95, rate(a2a_message_processing_duration_seconds_bucket[5m]))

# p99 latency by service
histogram_quantile(0.99, rate(a2a_message_processing_duration_seconds_bucket[5m])) by (service)

# Average A2A processing time by task state
rate(a2a_message_processing_duration_seconds_sum[5m]) / rate(a2a_message_processing_duration_seconds_count[5m]) by (task_state)

a2a_message_errors_total

Type: Counter
Description: Total number of A2A message processing errors
Labels:

  • service - Service where error occurred
  • message_type - Type of A2A message that failed
  • error_type - Category of error (grpc_error, validation_error, protocol_error, etc.)
  • a2a_version - A2A protocol version for compatibility tracking

Usage:

# A2A error rate by error type
rate(a2a_message_errors_total[5m]) by (error_type)

# Services with highest A2A error rates
topk(3, rate(a2a_message_errors_total[5m]) by (service))

# A2A protocol version compatibility issues
rate(a2a_message_errors_total{error_type="protocol_error"}[5m]) by (a2a_version)

AgentHub Broker Metrics

agenthub_connections_total

Type: Gauge
Description: Number of active agent connections to AgentHub broker
Labels:

  • connection_type - Type of connection (a2a_publisher, a2a_subscriber, unified)
  • agent_type - Classification of connected agent

Usage:

# Current AgentHub connection count
agenthub_connections_total

# A2A connection growth over time
increase(agenthub_connections_total[1h])

# Connection distribution by type
agenthub_connections_total by (connection_type)

agenthub_subscriptions_total

Type: Gauge
Description: Number of active A2A message subscriptions
Labels:

  • agent_id - Subscriber agent identifier
  • subscription_type - Type of A2A subscription (tasks, messages, agent_events)
  • filter_criteria - Applied subscription filters (task_types, states, etc.)

Usage:

# Total active A2A subscriptions
sum(agenthub_subscriptions_total)

# A2A subscriptions by agent
sum(agenthub_subscriptions_total) by (agent_id)

# Most popular A2A subscription types
sum(agenthub_subscriptions_total) by (subscription_type)

# Filtered vs unfiltered subscriptions
sum(agenthub_subscriptions_total{filter_criteria!=""}) / sum(agenthub_subscriptions_total)

agenthub_message_routing_duration_seconds

Type: Histogram
Description: Time taken to route A2A messages through AgentHub broker
Labels:

  • routing_type - Type of routing (direct, broadcast, filtered)
  • message_size_bucket - Message size classification (small, medium, large)

Buckets: 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1

Usage:

# AgentHub A2A routing latency percentiles
histogram_quantile(0.95, rate(agenthub_message_routing_duration_seconds_bucket[5m]))

# A2A routing performance by type
rate(agenthub_message_routing_duration_seconds_sum[5m]) / rate(agenthub_message_routing_duration_seconds_count[5m]) by (routing_type)

# Message size impact on routing
histogram_quantile(0.95, rate(agenthub_message_routing_duration_seconds_bucket[5m])) by (message_size_bucket)

agenthub_queue_size

Type: Gauge
Description: Number of A2A messages queued awaiting routing
Labels:

  • queue_type - Type of queue (incoming, outgoing, dead_letter, retry)
  • priority - Message priority level
  • context_active - Whether messages belong to active A2A contexts

Usage:

# Current A2A queue sizes
agenthub_queue_size by (queue_type)

# A2A queue growth rate
rate(agenthub_queue_size[5m])

# Priority queue distribution
agenthub_queue_size by (priority)

# Active context message backlog
agenthub_queue_size{context_active="true"}

System Health Metrics

system_cpu_usage_percent

Type: Gauge
Description: CPU utilization percentage
Labels:

  • service - Service name

Usage:

# Current CPU usage
system_cpu_usage_percent

# High CPU services
system_cpu_usage_percent > 80

# Average CPU over time
avg_over_time(system_cpu_usage_percent[1h])

system_memory_usage_bytes

Type: Gauge
Description: Memory usage in bytes
Labels:

  • service - Service name
  • type - Memory type (heap, stack, total)

Usage:

# Memory usage in MB
system_memory_usage_bytes / 1024 / 1024

# Memory growth rate
rate(system_memory_usage_bytes[10m])

# Memory usage by type
system_memory_usage_bytes by (type)

system_goroutines_total

Type: Gauge
Description: Number of active goroutines
Labels:

  • service - Service name

Usage:

# Current goroutine count
system_goroutines_total

# Goroutine leaks detection
increase(system_goroutines_total[1h]) > 1000

# Goroutine efficiency
system_goroutines_total / system_cpu_usage_percent

system_file_descriptors_used

Type: Gauge
Description: Number of open file descriptors
Labels:

  • service - Service name

Usage:

# Current FD usage
system_file_descriptors_used

# FD growth rate
rate(system_file_descriptors_used[5m])

A2A Task-Specific Metrics

a2a_tasks_created_total

Type: Counter
Description: Total number of A2A tasks created
Labels:

  • task_type - Type classification of the task
  • context_id - A2A conversation context
  • priority - Task priority level

Usage:

# A2A task creation rate
rate(a2a_tasks_created_total[5m])

# Task creation by type
rate(a2a_tasks_created_total[5m]) by (task_type)

# High priority task rate
rate(a2a_tasks_created_total{priority="PRIORITY_HIGH"}[5m])

a2a_task_state_transitions_total

Type: Counter
Description: Total number of A2A task state transitions
Labels:

  • from_state - Previous task state
  • to_state - New task state
  • task_type - Type of task transitioning

Usage:

# Task completion rate
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_COMPLETED"}[5m])

# Task failure rate
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_FAILED"}[5m])

# Task state transition patterns
rate(a2a_task_state_transitions_total[5m]) by (from_state, to_state)

a2a_task_duration_seconds

Type: Histogram
Description: Duration of A2A task execution from submission to completion
Labels:

  • task_type - Type of task
  • final_state - Final task state (COMPLETED, FAILED, CANCELLED)

Buckets: 0.1, 0.5, 1, 5, 10, 30, 60, 300, 600, 1800

Usage:

# A2A task completion time percentiles
histogram_quantile(0.95, rate(a2a_task_duration_seconds_bucket{final_state="TASK_STATE_COMPLETED"}[5m]))

# Task duration by type
histogram_quantile(0.50, rate(a2a_task_duration_seconds_bucket[5m])) by (task_type)

# Failed vs successful task duration comparison
histogram_quantile(0.95, rate(a2a_task_duration_seconds_bucket[5m])) by (final_state)

a2a_artifacts_produced_total

Type: Counter
Description: Total number of A2A artifacts produced by completed tasks
Labels:

  • artifact_type - Type of artifact (data, file, text)
  • task_type - Type of task that produced the artifact
  • artifact_size_bucket - Size classification of artifact

Usage:

# Artifact production rate
rate(a2a_artifacts_produced_total[5m])

# Artifacts by type
rate(a2a_artifacts_produced_total[5m]) by (artifact_type)

# Large artifact production rate
rate(a2a_artifacts_produced_total{artifact_size_bucket="large"}[5m])

gRPC Metrics

grpc_server_started_total

Type: Counter
Description: Total number of RPCs started on the AgentHub server
Labels:

  • grpc_method - gRPC method name (PublishMessage, SubscribeToTasks, etc.)
  • grpc_service - gRPC service name (AgentHub)

Usage:

# AgentHub RPC request rate
rate(grpc_server_started_total[5m])

# Most called A2A methods
topk(5, rate(grpc_server_started_total[5m]) by (grpc_method))

# A2A vs EDA method usage
rate(grpc_server_started_total{grpc_method=~".*Message.*|.*Task.*"}[5m])

grpc_server_handled_total

Type: Counter
Description: Total number of RPCs completed on the AgentHub server
Labels:

  • grpc_method - gRPC method name
  • grpc_service - gRPC service name (AgentHub)
  • grpc_code - gRPC status code
  • a2a_operation - A2A operation type (publish, subscribe, get, cancel)

Usage:

# AgentHub RPC success rate
rate(grpc_server_handled_total{grpc_code="OK"}[5m]) / rate(grpc_server_handled_total[5m]) * 100

# A2A operation error rate
rate(grpc_server_handled_total{grpc_code!="OK"}[5m]) by (a2a_operation)

# A2A method-specific success rates
rate(grpc_server_handled_total{grpc_code="OK"}[5m]) / rate(grpc_server_handled_total[5m]) by (grpc_method)

grpc_server_handling_seconds

Type: Histogram
Description: Histogram of response latency of AgentHub RPCs
Labels:

  • grpc_method - gRPC method name
  • grpc_service - gRPC service name (AgentHub)
  • a2a_operation - A2A operation type

Usage:

# AgentHub gRPC latency percentiles
histogram_quantile(0.95, rate(grpc_server_handling_seconds_bucket[5m]))

# Slow A2A operations
histogram_quantile(0.95, rate(grpc_server_handling_seconds_bucket[5m])) by (a2a_operation) > 0.1

# A2A method performance comparison
histogram_quantile(0.95, rate(grpc_server_handling_seconds_bucket[5m])) by (grpc_method)

Health Check Metrics

health_check_status

Type: Gauge
Description: Health check status (1=healthy, 0=unhealthy)
Labels:

  • service - Service name
  • check_name - Name of the health check
  • endpoint - Health check endpoint

Usage:

# Unhealthy services
health_check_status == 0

# Health check success rate
avg_over_time(health_check_status[5m])

health_check_duration_seconds

Type: Histogram
Description: Time taken to execute health checks
Labels:

  • service - Service name
  • check_name - Name of the health check

Usage:

# Health check latency
histogram_quantile(0.95, rate(health_check_duration_seconds_bucket[5m]))

# Slow health checks
histogram_quantile(0.95, rate(health_check_duration_seconds_bucket[5m])) by (check_name) > 0.5

OpenTelemetry Metrics

otelcol_processor_batch_batch_send_size_count

Type: Counter
Description: Number of batches sent by OTEL collector
Labels: None

otelcol_exporter_sent_spans

Type: Counter
Description: Number of spans sent to tracing backend
Labels:

  • exporter - Exporter name (jaeger, otlp)

Usage:

# Span export rate
rate(otelcol_exporter_sent_spans[5m])

# Export success by backend
rate(otelcol_exporter_sent_spans[5m]) by (exporter)

Common Query Patterns

A2A Performance Analysis

# Top 5 slowest A2A message types
topk(5,
  histogram_quantile(0.95,
    rate(a2a_message_processing_duration_seconds_bucket[5m])
  ) by (message_type)
)

# A2A task completion time analysis
histogram_quantile(0.95,
  rate(a2a_task_duration_seconds_bucket{final_state="TASK_STATE_COMPLETED"}[5m])
) by (task_type)

# Services exceeding A2A latency SLA (>500ms p95)
histogram_quantile(0.95,
  rate(a2a_message_processing_duration_seconds_bucket[5m])
) by (service) > 0.5

# A2A throughput efficiency (messages per CPU percent)
rate(a2a_messages_processed_total[5m]) / system_cpu_usage_percent

# Task success rate by type
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_COMPLETED"}[5m]) /
rate(a2a_tasks_created_total[5m]) by (task_type)

A2A Error Analysis

# A2A message error rate by service over time
rate(a2a_message_errors_total[5m]) / rate(a2a_messages_processed_total[5m]) * 100

# A2A task failure rate
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_FAILED"}[5m]) /
rate(a2a_tasks_created_total[5m]) * 100

# Most common A2A error types
topk(5, rate(a2a_message_errors_total[5m]) by (error_type))

# A2A protocol compatibility issues
rate(a2a_message_errors_total{error_type="protocol_error"}[5m]) by (a2a_version)

# Services with increasing A2A error rates
increase(a2a_message_errors_total[1h]) by (service) > 10

A2A Capacity Planning

# Peak hourly A2A message throughput
max_over_time(
  rate(a2a_messages_processed_total[5m])[1h:]
) * 3600

# Peak A2A task creation rate
max_over_time(
  rate(a2a_tasks_created_total[5m])[1h:]
) * 3600

# Resource utilization during peak A2A load
(
  max_over_time(system_cpu_usage_percent[1h:]) +
  max_over_time(system_memory_usage_bytes[1h:] / 1024 / 1024 / 1024)
) by (service)

# AgentHub connection scaling needs
max_over_time(agenthub_connections_total[24h:])

# A2A queue depth trends
max_over_time(agenthub_queue_size[24h:]) by (queue_type)

A2A System Health

# Overall A2A system health score (0-1)
avg(health_check_status)

# A2A services with degraded performance
(
  system_cpu_usage_percent > 70 or
  system_memory_usage_bytes > 1e9 or
  rate(a2a_message_errors_total[5m]) / rate(a2a_messages_processed_total[5m]) > 0.05
)

# A2A task backlog health
agenthub_queue_size{queue_type="incoming"} > 1000

# A2A protocol health indicators
rate(a2a_task_state_transitions_total{to_state="TASK_STATE_FAILED"}[5m]) /
rate(a2a_tasks_created_total[5m]) > 0.1

# Resource leak detection
increase(system_goroutines_total[1h]) > 1000 or
increase(system_file_descriptors_used[1h]) > 100

Alert Rule Examples

Critical A2A Alerts

# High A2A message processing error rate alert
- alert: HighA2AMessageProcessingErrorRate
  expr: |
    (
      rate(a2a_message_errors_total[5m]) /
      rate(a2a_messages_processed_total[5m])
    ) * 100 > 10    
  for: 2m
  annotations:
    summary: "High A2A message processing error rate"
    description: "{{ $labels.service }} has {{ $value }}% A2A error rate"

# High A2A task failure rate alert
- alert: HighA2ATaskFailureRate
  expr: |
    (
      rate(a2a_task_state_transitions_total{to_state="TASK_STATE_FAILED"}[5m]) /
      rate(a2a_tasks_created_total[5m])
    ) * 100 > 15    
  for: 3m
  annotations:
    summary: "High A2A task failure rate"
    description: "{{ $value }}% of A2A tasks are failing for task type {{ $labels.task_type }}"

# AgentHub service down alert
- alert: AgentHubServiceDown
  expr: health_check_status == 0
  for: 1m
  annotations:
    summary: "AgentHub service health check failing"
    description: "{{ $labels.service }} health check {{ $labels.check_name }} is failing"

# A2A queue backlog alert
- alert: A2AQueueBacklog
  expr: agenthub_queue_size{queue_type="incoming"} > 1000
  for: 5m
  annotations:
    summary: "A2A message queue backlog"
    description: "AgentHub has {{ $value }} messages queued"

A2A Warning Alerts

# High A2A message processing latency warning
- alert: HighA2AMessageProcessingLatency
  expr: |
    histogram_quantile(0.95,
      rate(a2a_message_processing_duration_seconds_bucket[5m])
    ) > 0.5    
  for: 5m
  annotations:
    summary: "High A2A message processing latency"
    description: "{{ $labels.service }} A2A p95 latency is {{ $value }}s"

# Slow A2A task completion warning
- alert: SlowA2ATaskCompletion
  expr: |
    histogram_quantile(0.95,
      rate(a2a_task_duration_seconds_bucket{final_state="TASK_STATE_COMPLETED"}[5m])
    ) > 300    
  for: 10m
  annotations:
    summary: "Slow A2A task completion"
    description: "A2A tasks of type {{ $labels.task_type }} taking {{ $value }}s to complete"

# High CPU usage warning
- alert: HighCPUUsage
  expr: system_cpu_usage_percent > 80
  for: 5m
  annotations:
    summary: "High CPU usage"
    description: "{{ $labels.service }} CPU usage is {{ $value }}%"

# A2A protocol version compatibility warning
- alert: A2AProtocolVersionMismatch
  expr: |
    rate(a2a_message_errors_total{error_type="protocol_error"}[5m]) > 0.1    
  for: 3m
  annotations:
    summary: "A2A protocol version compatibility issues"
    description: "A2A protocol errors detected for version {{ $labels.a2a_version }}"

Metric Retention and Storage

Retention Policies

  • Raw metrics: 15 days at 15-second resolution
  • 5m averages: 60 days
  • 1h averages: 1 year
  • 1d averages: 5 years

Storage Requirements

  • Per service: ~2MB/day for all metrics
  • Complete system: ~10MB/day for 5 services
  • 1 year retention: ~3.6GB total

Performance Considerations

  • Scrape interval: 10 seconds (configurable)
  • Evaluation interval: 15 seconds for alerts
  • Query timeout: 30 seconds
  • Max samples: 50M per query

Integration Examples

Grafana Dashboard Variables

{
  "service": {
    "query": "label_values(a2a_messages_processed_total, service)",
    "refresh": "on_time_range_changed"
  },
  "message_type": {
    "query": "label_values(a2a_messages_processed_total{service=\"$service\"}, message_type)",
    "refresh": "on_dashboard_load"
  },
  "task_type": {
    "query": "label_values(a2a_tasks_created_total, task_type)",
    "refresh": "on_dashboard_load"
  },
  "context_id": {
    "query": "label_values(a2a_messages_processed_total{service=\"$service\"}, context_id)",
    "refresh": "on_dashboard_load"
  }
}

Custom A2A Application Metrics

// Register custom A2A counter
a2aCustomCounter, err := meter.Int64Counter(
    "a2a_custom_business_metric_total",
    metric.WithDescription("Custom A2A business metric"),
)

// Increment with A2A context and labels
a2aCustomCounter.Add(ctx, 1, metric.WithAttributes(
    attribute.String("task_type", "custom_analysis"),
    attribute.String("context_id", contextID),
    attribute.String("agent_type", "analytics_agent"),
    attribute.String("a2a_version", "1.0"),
))

// Register A2A task-specific histogram
a2aTaskHistogram, err := meter.Float64Histogram(
    "a2a_custom_task_processing_seconds",
    metric.WithDescription("Custom A2A task processing time"),
    metric.WithUnit("s"),
)

// Record A2A task timing
start := time.Now()
// ... process A2A task ...
duration := time.Since(start).Seconds()
a2aTaskHistogram.Record(ctx, duration, metric.WithAttributes(
    attribute.String("task_type", taskType),
    attribute.String("task_state", "TASK_STATE_COMPLETED"),
))

Troubleshooting Metrics

Missing Metrics Checklist

  1. ✅ Service built with -tags observability
  2. ✅ Prometheus can reach metrics endpoint
  3. ✅ Correct port in Prometheus config
  4. ✅ Service is actually processing events
  5. ✅ OpenTelemetry exporter configured correctly

High Cardinality Warning

Avoid metrics with unbounded label values:

  • ❌ User IDs as labels (millions of values)
  • ❌ Timestamps as labels
  • ❌ Request IDs as labels
  • ✅ Event types (limited set)
  • ✅ Service names (limited set)
  • ✅ Status codes (limited set)
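
For example, record bounded classifications instead of raw values (a sketch; counter is an already-registered instrument and sizeBucket a hypothetical helper that maps byte counts to small/medium/large):

// ❌ attribute.String("request_id", reqID) (unbounded cardinality)
// ✅ bounded label sets only:
counter.Add(ctx, 1, metric.WithAttributes(
    attribute.String("event_type", eventType),      // limited set
    attribute.String("size_bucket", sizeBucket(n)), // small | medium | large
))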

🎯 Next Steps:

Implementation: Add Observability to Your Agent

Monitoring: Use Grafana Dashboards

Understanding: Distributed Tracing Explained

4 - Tasks

Task message specifications and reference

Tasks Reference

This section provides detailed reference documentation for task messages, specifications, and data structures used throughout AgentHub.

Available Documentation

  • Task Reference - Detailed task message specifications and data structures

4.1 - A2A Task Reference

Comprehensive reference for all task-related message types and operations in the Agent2Agent protocol implementation.

A2A Task Reference

This document provides a comprehensive reference for all task-related message types and operations in the Agent2Agent (A2A) protocol implementation within AgentHub’s hybrid Event-Driven Architecture.

Core A2A Task Types

A2A Task

The primary message type for managing work requests between agents in the Agent2Agent protocol.

message Task {
  string id = 1;                    // Required: Task identifier
  string context_id = 2;            // Optional: Conversation context
  TaskStatus status = 3;            // Required: Current task status
  repeated Message history = 4;     // Message history for this task
  repeated Artifact artifacts = 5;  // Task output artifacts
  google.protobuf.Struct metadata = 6; // Task metadata
}

Field Reference

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| id | string | Yes | Globally unique identifier for the task |
| context_id | string | No | Groups related tasks in a workflow or conversation |
| status | TaskStatus | Yes | Current execution state and last update |
| history | Message[] | No | Complete message history for this task |
| artifacts | Artifact[] | No | Output artifacts produced by the task |
| metadata | Struct | No | Additional context information |

Task ID Format

Task IDs should be globally unique and meaningful for debugging:

// Recommended formats:
taskID := fmt.Sprintf("task_%s_%d", taskType, time.Now().Unix())
taskID := fmt.Sprintf("task_%s_%s", taskType, uuid.New().String())
taskID := fmt.Sprintf("%s_%s_%d", requesterID, taskType, sequence)

A2A TaskStatus

Represents the current state and latest update for a task.

message TaskStatus {
  TaskState state = 1;              // Current task state
  Message update = 2;               // Status update message
  google.protobuf.Timestamp timestamp = 3; // Status timestamp
}

Field Reference

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| state | TaskState | Yes | Current execution state |
| update | Message | No | Latest status message from the executing agent |
| timestamp | Timestamp | Yes | When this status was last updated |
A2A Message

Agent-to-agent communication within task context.

message Message {
  string message_id = 1;       // Required: Unique message identifier
  string context_id = 2;       // Optional: Conversation context
  string task_id = 3;          // Optional: Associated task
  Role role = 4;               // Required: USER or AGENT
  repeated Part content = 5;   // Required: Message content parts
  google.protobuf.Struct metadata = 6; // Optional: Additional metadata
  repeated string extensions = 7;       // Optional: Protocol extensions
}

Message Content Parts

Messages contain structured content using A2A Part definitions:

message Part {
  oneof part {
    string text = 1;           // Text content
    DataPart data = 2;         // Structured data
    FilePart file = 3;         // File reference
  }
}

message DataPart {
  google.protobuf.Struct data = 1;    // Structured data content
  string description = 2;             // Optional data description
}

message FilePart {
  string file_id = 1;                 // File identifier or URI
  string filename = 2;                // Original filename
  string mime_type = 3;               // MIME type
  int64 size_bytes = 4;               // File size in bytes
  google.protobuf.Struct metadata = 5; // Additional file metadata
}

A2A Artifact

Structured output produced by completed tasks.

message Artifact {
  string artifact_id = 1;           // Required: Artifact identifier
  string name = 2;                  // Human-readable name
  string description = 3;           // Artifact description
  repeated Part parts = 4;          // Artifact content parts
  google.protobuf.Struct metadata = 5; // Artifact metadata
}

Field Reference

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| artifact_id | string | Yes | Unique identifier for this artifact |
| name | string | No | Human-readable artifact name |
| description | string | No | Description of the artifact contents |
| parts | Part[] | Yes | Structured content using A2A Part format |
| metadata | Struct | No | Additional artifact information |

Enumerations

TaskState

Current state of A2A task execution.

enum TaskState {
  TASK_STATE_SUBMITTED = 0;    // Task created and submitted
  TASK_STATE_WORKING = 1;      // Task in progress
  TASK_STATE_COMPLETED = 2;    // Task completed successfully
  TASK_STATE_FAILED = 3;       // Task failed with error
  TASK_STATE_CANCELLED = 4;    // Task cancelled
}

State Transition Rules

Valid state transitions:

TASK_STATE_SUBMITTED → TASK_STATE_WORKING → TASK_STATE_COMPLETED
TASK_STATE_SUBMITTED → TASK_STATE_WORKING → TASK_STATE_FAILED
TASK_STATE_SUBMITTED → TASK_STATE_WORKING → TASK_STATE_CANCELLED
TASK_STATE_SUBMITTED → TASK_STATE_CANCELLED (before execution starts)

Invalid transitions:

  • Any state → TASK_STATE_SUBMITTED
  • TASK_STATE_COMPLETED → any other state
  • TASK_STATE_FAILED → any other state (except for retry scenarios)
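
These rules can be enforced mechanically before publishing a status update; a minimal sketch, assuming the generated a2a Go types:

// validTransitions encodes the state machine above; terminal states have no entries.
var validTransitions = map[a2a.TaskState][]a2a.TaskState{
    a2a.TaskState_TASK_STATE_SUBMITTED: {
        a2a.TaskState_TASK_STATE_WORKING,
        a2a.TaskState_TASK_STATE_CANCELLED, // cancellation before execution starts
    },
    a2a.TaskState_TASK_STATE_WORKING: {
        a2a.TaskState_TASK_STATE_COMPLETED,
        a2a.TaskState_TASK_STATE_FAILED,
        a2a.TaskState_TASK_STATE_CANCELLED,
    },
}

func isValidTransition(from, to a2a.TaskState) bool {
    for _, next := range validTransitions[from] {
        if next == to {
            return true
        }
    }
    return false
}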

Role

Identifies the role of the message sender in A2A communication.

enum Role {
  USER = 0;    // Message from requesting agent
  AGENT = 1;   // Message from responding agent
}

Priority

Task priority levels for scheduling and resource allocation.

enum Priority {
  PRIORITY_UNSPECIFIED = 0;  // Default value, treated as MEDIUM
  PRIORITY_LOW = 1;          // Low priority, can be delayed
  PRIORITY_MEDIUM = 2;       // Normal priority
  PRIORITY_HIGH = 3;         // High priority, expedited processing
  PRIORITY_CRITICAL = 4;     // Critical priority, immediate processing
}

Priority Usage Guidelines

| Priority | Use Cases | SLA Expectations |
|----------|-----------|------------------|
| LOW | Background jobs, cleanup tasks, analytics | Hours to days |
| MEDIUM | Standard user requests, routine processing | Minutes to hours |
| HIGH | User-visible operations, time-sensitive tasks | Seconds to minutes |
| CRITICAL | Emergency operations, system health tasks | Immediate |

AgentHub EDA Request/Response Messages

Task Publishing

PublishTaskUpdateRequest

Request to publish a task status update through the EDA broker.

message PublishTaskUpdateRequest {
  a2a.Task task = 1;                      // Updated A2A task
  AgentEventMetadata routing = 2;         // EDA routing metadata
}

PublishTaskArtifactRequest

Request to publish a task artifact through the EDA broker.

message PublishTaskArtifactRequest {
  string task_id = 1;                     // Associated task ID
  a2a.Artifact artifact = 2;              // A2A artifact
  AgentEventMetadata routing = 3;         // EDA routing metadata
}

Task Subscription

SubscribeToTasksRequest

Request to subscribe to A2A task events through the EDA broker.

message SubscribeToTasksRequest {
  string agent_id = 1;                    // Agent ID for subscription
  repeated string task_types = 2;         // Optional task type filter
  repeated a2a.TaskState states = 3;      // Optional state filter
}

Usage Examples

// Subscribe to all tasks for this agent
req := &pb.SubscribeToTasksRequest{
    AgentId: "data_processor_01",
}

// Subscribe only to working and completed tasks
req := &pb.SubscribeToTasksRequest{
    AgentId: "workflow_orchestrator",
    States: []a2a.TaskState{
        a2a.TaskState_TASK_STATE_WORKING,
        a2a.TaskState_TASK_STATE_COMPLETED,
    },
}

Task Management

GetTaskRequest

Request to retrieve the current state of an A2A task.

message GetTaskRequest {
  string task_id = 1;                     // Task identifier
  int32 history_length = 2;               // History limit (optional)
}

CancelTaskRequest

Request to cancel an active A2A task.

message CancelTaskRequest {
  string task_id = 1;                     // Task to cancel
  string reason = 2;                      // Cancellation reason
}

ListTasksRequest

Request to list A2A tasks matching criteria.

message ListTasksRequest {
  string agent_id = 1;                    // Filter by agent
  repeated a2a.TaskState states = 2;      // Filter by states
  google.protobuf.Timestamp since = 3;    // Filter by timestamp
  int32 limit = 4;                        // Results limit
}

gRPC Service Methods

Task Publishing Methods

PublishTaskUpdate

Publishes a task status update to the EDA broker.

rpc PublishTaskUpdate (PublishTaskUpdateRequest) returns (PublishResponse);

Example:

// Create updated task status
status := &a2a.TaskStatus{
    State: a2a.TaskState_TASK_STATE_WORKING,
    Update: &a2a.Message{
        MessageId: "msg_" + uuid.New().String(),
        TaskId:    taskID,
        Role:      a2a.Role_AGENT,
        Content: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: "Processing data analysis...",
                },
            },
        },
    },
    Timestamp: timestamppb.Now(),
}

task := &a2a.Task{
    Id:     taskID,
    Status: status,
}

req := &pb.PublishTaskUpdateRequest{
    Task: task,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "processor_01",
        EventType:   "task.status_update",
    },
}
res, err := client.PublishTaskUpdate(ctx, req)

PublishTaskArtifact

Publishes a task artifact to the EDA broker.

rpc PublishTaskArtifact (PublishTaskArtifactRequest) returns (PublishResponse);

Example:

// Create artifact with results
artifact := &a2a.Artifact{
    ArtifactId:  "artifact_" + uuid.New().String(),
    Name:        "Analysis Results",
    Description: "Statistical analysis of sales data",
    Parts: []*a2a.Part{
        {
            Part: &a2a.Part_Data{
                Data: &a2a.DataPart{
                    Data: structData, // Contains analysis results
                    Description: "Sales analysis summary statistics",
                },
            },
        },
        {
            Part: &a2a.Part_File{
                File: &a2a.FilePart{
                    FileId:   "file_123",
                    Filename: "analysis_report.pdf",
                    MimeType: "application/pdf",
                    SizeBytes: 1024576,
                },
            },
        },
    },
}

req := &pb.PublishTaskArtifactRequest{
    TaskId:   taskID,
    Artifact: artifact,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "processor_01",
        EventType:   "task.artifact",
    },
}
res, err := client.PublishTaskArtifact(ctx, req)

Task Subscription Methods

SubscribeToTasks

Subscribes to receive A2A task events through the EDA broker.

rpc SubscribeToTasks (SubscribeToTasksRequest) returns (stream AgentEvent);

Returns: Stream of AgentEvent objects containing A2A task updates

Example:

req := &pb.SubscribeToTasksRequest{
    AgentId: "processor_01",
    States: []a2a.TaskState{a2a.TaskState_TASK_STATE_SUBMITTED},
}
stream, err := client.SubscribeToTasks(ctx, req)

for {
    event, err := stream.Recv()
    if err != nil {
        break
    }

    // Extract A2A task from event
    if task := event.GetTask(); task != nil {
        go processA2ATask(task)
    }
}

Task Management Methods

GetTask

Retrieves the current state of an A2A task by ID.

rpc GetTask (GetTaskRequest) returns (a2a.Task);

CancelTask

Cancels an active A2A task and notifies subscribers.

rpc CancelTask (CancelTaskRequest) returns (a2a.Task);

ListTasks

Returns A2A tasks matching the specified criteria.

rpc ListTasks (ListTasksRequest) returns (ListTasksResponse);
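
A combined usage sketch for the three management methods (field names follow the request messages above; full error handling elided):

// Fetch current task state with the last 10 history messages.
task, err := client.GetTask(ctx, &pb.GetTaskRequest{
    TaskId:        "task_analysis_123",
    HistoryLength: 10,
})

// Cancel it if it is still running.
if err == nil && task.Status.State == a2a.TaskState_TASK_STATE_WORKING {
    task, err = client.CancelTask(ctx, &pb.CancelTaskRequest{
        TaskId: "task_analysis_123",
        Reason: "superseded by newer request",
    })
}

// List this agent's recently failed tasks.
resp, err := client.ListTasks(ctx, &pb.ListTasksRequest{
    AgentId: "processor_01",
    States:  []a2a.TaskState{a2a.TaskState_TASK_STATE_FAILED},
    Limit:   20,
})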

A2A Task Workflow Patterns

Simple Request-Response

// 1. Agent A creates and publishes task request
task := &a2a.Task{
    Id:        "task_analysis_123",
    ContextId: "workflow_456",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            MessageId: "msg_" + uuid.New().String(),
            TaskId:    "task_analysis_123",
            Role:      a2a.Role_USER,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Please analyze the Q4 sales data",
                    },
                },
                {
                    Part: &a2a.Part_Data{
                        Data: &a2a.DataPart{
                            Data: dataStruct, // Contains parameters
                        },
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}

// 2. Agent B receives task and updates status to WORKING
// 3. Agent B publishes progress updates during execution
// 4. Agent B publishes final artifacts and COMPLETED status

Multi-Step Workflow

// 1. Orchestrator creates main task
mainTask := &a2a.Task{
    Id:        "workflow_main_789",
    ContextId: "workflow_context_789",
    // ... initial message
}

// 2. Create subtasks with same context_id
subtask1 := &a2a.Task{
    Id:        "subtask_data_prep_790",
    ContextId: "workflow_context_789", // Same context
    // ... data preparation request
}

subtask2 := &a2a.Task{
    Id:        "subtask_analysis_791",
    ContextId: "workflow_context_789", // Same context
    // ... analysis request (depends on subtask1)
}

// 3. Tasks linked by context_id for workflow tracking

Error Handling Reference

A2A Task Error Patterns

Parameter Validation Errors

// Task fails with validation error
failedTask := &a2a.Task{
    Id: taskID,
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_FAILED,
        Update: &a2a.Message{
            Role: a2a.Role_AGENT,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Task failed: Required parameter 'dataset_path' is missing",
                    },
                },
                {
                    Part: &a2a.Part_Data{
                        Data: &a2a.DataPart{
                            Data: errorDetails, // Structured error info
                            Description: "Validation error details",
                        },
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}

Resource Errors

// Task fails due to resource unavailability
failedTask := &a2a.Task{
    Id: taskID,
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_FAILED,
        Update: &a2a.Message{
            Role: a2a.Role_AGENT,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Cannot access dataset file: /data/sales_2024.csv",
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}

Error Handling Best Practices

  1. Use structured error messages in A2A format for programmatic handling
  2. Include actionable error descriptions in text parts for human operators
  3. Add detailed error data in data parts for debugging and retry logic
  4. Maintain task history to preserve error context
  5. Consider partial results using artifacts for partially successful operations

Migration from Legacy EventBus

Message Type Mappings

| Legacy EventBus | A2A Equivalent | Notes |
|-----------------|----------------|-------|
| TaskMessage | a2a.Task with initial Message | Task creation with request message |
| TaskResult | a2a.Task with final Artifact | Task completion with result artifacts |
| TaskProgress | a2a.Task with status Message | Progress updates via status messages |
| TaskStatus enum | a2a.TaskState enum | State names updated (e.g., IN_PROGRESS → TASK_STATE_WORKING) |

API Method Mappings

| Legacy EventBus | A2A Equivalent | Notes |
|-----------------|----------------|-------|
| PublishTask | PublishTaskUpdate | Now publishes A2A task objects |
| PublishTaskResult | PublishTaskArtifact | Results published as artifacts |
| PublishTaskProgress | PublishTaskUpdate | Progress via task status updates |
| SubscribeToTasks | SubscribeToTasks | Now returns A2A task events |
| SubscribeToTaskResults | SubscribeToTasks (filtered) | Filter by COMPLETED state |
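
For example, a legacy progress publication maps onto a status update (a sketch; the legacy call is illustrative, not an exact old signature):

// Legacy EventBus (illustrative):
// client.PublishTaskProgress(ctx, &pb.TaskProgress{TaskId: id, Percent: 50})

// A2A equivalent: publish the task with a WORKING status message.
task.Status = &a2a.TaskStatus{
    State:     a2a.TaskState_TASK_STATE_WORKING,
    Update:    progressMessage, // an a2a.Message describing "50% complete"
    Timestamp: timestamppb.Now(),
}
_, err := client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{Task: task})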

This reference provides the complete specification for A2A task-related messages and operations in the AgentHub Event-Driven Architecture, enabling robust distributed task coordination with full Agent2Agent protocol compliance.