Explanations

Understanding-oriented discussions that provide context and background

This section provides in-depth discussions and explanations to help you understand the concepts, design decisions, and architecture behind AgentHub. These materials are designed to broaden your understanding beyond just how to use the system.

πŸ“š Explanation Categories

  • Architecture - System design and architectural principles
  • Core Concepts - Fundamental concepts and mental models
  • Features - Deep dives into specific features and capabilities

🎯 How to Use These Explanations

These documents are designed to:

  • Provide context for why things work the way they do
  • Explain trade-offs and design decisions
  • Offer multiple perspectives on the same concepts
  • Help you make informed decisions about using AgentHub

πŸ“– Reading Path

For Understanding Core Concepts

  1. Core Concepts - Start with fundamental principles
  2. Architecture - Understand system design
  3. Features - Explore specific capabilities

For System Design

  1. Architecture - System design patterns
  2. Features - Performance and scaling considerations

For Implementation Details

  1. Features - Technical implementation deep dives

πŸ’‘ Discussion Topics

These explanations discuss:

  • Design philosophy and principles
  • Architectural decisions and their rationale
  • Performance considerations and trade-offs
  • Future directions and possibilities
  • Alternative approaches and their pros/cons

1 - Architecture

Deep dive into AgentHub’s system architecture and design

Architecture Explanations

Understand the fundamental architecture and design principles behind AgentHub’s distributed agent system.

Available Documentation

1.1 - A2A-Compliant EDA Broker Architecture

Deep dive into the internal architecture of the AgentHub EDA broker, how it implements Agent2Agent (A2A) protocol-compliant communication patterns while maintaining Event-Driven Architecture benefits.

AgentHub A2A-Compliant EDA Broker Architecture

This document explains the internal architecture of the AgentHub Event-Driven Architecture (EDA) broker, how it implements Agent2Agent (A2A) protocol-compliant communication patterns, and the design decisions behind its hybrid approach.

Architectural Overview

The AgentHub broker serves as a centralized Event-Driven Architecture hub that transports Agent2Agent (A2A) protocol-compliant messages between distributed agents. It combines the scalability benefits of EDA with the interoperability guarantees of the A2A protocol.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     AgentHub Broker                             β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
β”‚  β”‚   Task Router   β”‚    β”‚   Subscriber    β”‚    β”‚   Progress      β”‚β”‚
β”‚  β”‚                 β”‚    β”‚   Manager       β”‚    β”‚   Tracker       β”‚β”‚
β”‚  β”‚ β€’ Route tasks   β”‚    β”‚                 β”‚    β”‚                 β”‚β”‚
β”‚  β”‚ β€’ Apply filters β”‚    β”‚ β€’ Manage agent  β”‚    β”‚ β€’ Track task    β”‚β”‚
β”‚  β”‚ β€’ Broadcast     β”‚    β”‚   subscriptions β”‚    β”‚   progress      β”‚β”‚
β”‚  β”‚ β€’ Load balance  β”‚    β”‚ β€’ Handle        β”‚    β”‚ β€’ Update        β”‚β”‚
β”‚  β”‚                 β”‚    β”‚   disconnects   β”‚    β”‚   requesters    β”‚β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                        gRPC Interface                           β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
β”‚  β”‚ PublishTask     β”‚    β”‚SubscribeToTasksβ”‚    β”‚SubscribeToTask  β”‚β”‚
β”‚  β”‚ PublishResult   β”‚    β”‚SubscribeToRes  β”‚    β”‚ Progress        β”‚β”‚
β”‚  β”‚ PublishProgress β”‚    β”‚                 β”‚    β”‚                 β”‚β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Core Components

1. Event Bus Server

The main server implementation at broker/main.go:22 provides the central coordination point:

type eventBusServer struct {
    pb.UnimplementedEventBusServer

    // Subscription management
    taskSubscribers         map[string][]chan *pb.TaskMessage
    taskResultSubscribers   map[string][]chan *pb.TaskResult
    taskProgressSubscribers map[string][]chan *pb.TaskProgress
    taskMu                  sync.RWMutex
}

Key characteristics:

  • Thread-safe: Uses sync.RWMutex to protect concurrent access to subscriber maps
  • Channel-based: Uses Go channels for efficient message passing
  • Non-blocking: Implements timeouts to prevent blocking on slow consumers
  • Stateless: No persistent storage - all state is in-memory

2. Task Routing Engine

The routing logic determines how tasks are delivered to agents:

Direct Routing

When a task specifies a ResponderAgentId, it’s routed directly to that agent:

if responderID := req.GetTask().GetResponderAgentId(); responderID != "" {
    if subs, ok := s.taskSubscribers[responderID]; ok {
        targetChannels = subs
    }
}

Broadcast Routing

When no specific responder is set, tasks are broadcast to all subscribed agents:

} else {
    // Broadcast to all task subscribers
    for _, subs := range s.taskSubscribers {
        targetChannels = append(targetChannels, subs...)
    }
}

Routing Features

  • Immediate delivery: Tasks are routed immediately upon receipt
  • Multiple subscribers: Single agent can have multiple subscription channels
  • Timeout protection: 5-second timeout prevents blocking on unresponsive agents
  • Error isolation: Failed delivery to one agent doesn’t affect others

3. Subscription Management

The broker manages three types of subscriptions:

Task Subscriptions

Agents subscribe to receive tasks assigned to them:

func (s *eventBusServer) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error
  • Agent-specific: Tasks are delivered based on agent ID
  • Type filtering: Optional filtering by task types
  • Long-lived streams: Connections persist until agent disconnects
  • Automatic cleanup: Subscriptions are removed when connections close

Result Subscriptions

Publishers subscribe to receive results of tasks they requested:

func (s *eventBusServer) SubscribeToTaskResults(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskResultsServer) error

Progress Subscriptions

Publishers can track progress of long-running tasks:

func (s *eventBusServer) SubscribeToTaskProgress(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskProgressServer) error

4. Message Flow Architecture

Task Publication Flow

  1. Validation: Incoming tasks are validated for required fields
  2. Routing: Tasks are routed to appropriate subscribers
  3. Delivery: Messages are sent via Go channels with timeout protection
  4. Response: Publisher receives acknowledgment of successful publication
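
A minimal sketch of this flow on the broker side, assuming request/response type names from the generated proto and a hypothetical routeTask helper:

func (s *eventBusServer) PublishTask(ctx context.Context, req *pb.PublishTaskRequest) (*pb.PublishTaskResponse, error) {
    task := req.GetTask()
    // 1. Validation
    if task.GetTaskId() == "" {
        return nil, status.Error(codes.InvalidArgument, "task_id is required")
    }
    // 2-3. Routing and delivery with timeout protection (see Task Routing Engine)
    s.routeTask(ctx, task)
    // 4. Acknowledge receipt to the publisher
    return &pb.PublishTaskResponse{Success: true}, nil
}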

Result Flow

  1. Receipt: Agents publish task results back to the broker
  2. Broadcasting: Results are broadcast to all result subscribers
  3. Filtering: Subscribers receive results for their requested tasks
  4. Delivery: Results are streamed back to requesting agents

Progress Flow

  1. Updates: Executing agents send periodic progress updates
  2. Distribution: Progress updates are sent to interested subscribers
  3. Real-time delivery: Updates are streamed immediately upon receipt

Design Decisions and Trade-offs

In-Memory State Management

Decision: Store all subscription state in memory using Go maps and channels.

Benefits:

  • High performance: No database overhead for message routing
  • Low latency: Sub-millisecond message routing
  • Simplicity: Easier to develop, test, and maintain
  • Concurrent efficiency: Go’s garbage collector handles channel cleanup

Trade-offs:

  • No persistence: Broker restart loses all subscription state
  • Memory usage: Large numbers of agents increase memory requirements
  • Single point of failure: No built-in redundancy

When this works well:

  • Development and testing environments
  • Small to medium-scale deployments
  • Scenarios where agents can re-establish subscriptions on broker restart

Asynchronous Message Delivery

Decision: Use Go channels with timeout-based delivery.

Implementation:

go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
    select {
    case ch <- &task:
        // Message sent successfully
    case <-ctx.Done():
        log.Printf("Context cancelled while sending task %s", task.GetTaskId())
    case <-time.After(5 * time.Second):
        log.Printf("Timeout sending task %s. Dropping message.", task.GetTaskId())
    }
}(subChan, taskToSend)

Benefits:

  • Non-blocking: Slow agents don’t block the entire system
  • Fault tolerance: Timeouts prevent resource leaks
  • Scalability: Concurrent delivery to multiple agents
  • Resource protection: Prevents unbounded queue growth

Trade-offs:

  • Message loss: Timed-out messages are dropped
  • Complexity: Requires careful timeout tuning
  • No delivery guarantees: No acknowledgment of successful processing

gRPC Streaming for Subscriptions

Decision: Use bidirectional gRPC streams for agent subscriptions.

Benefits:

  • Real-time delivery: Messages are pushed immediately
  • Connection awareness: Broker knows when agents disconnect
  • Flow control: gRPC handles backpressure automatically
  • Type safety: Protocol Buffer messages ensure data consistency

Trade-offs:

  • Connection overhead: Each agent maintains persistent connections
  • Resource usage: Streams consume memory and file descriptors
  • Network sensitivity: Transient network issues can break connections

Concurrent Access Patterns

Decision: Use read-write mutexes with channel-based message passing.

Implementation:

s.taskMu.RLock()
// Read subscriber information
var targetChannels []chan *pb.TaskMessage
for _, subs := range s.taskSubscribers {
    targetChannels = append(targetChannels, subs...)
}
s.taskMu.RUnlock()

// Send messages without holding locks
for _, subChan := range targetChannels {
    go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
        // Async delivery
    }(subChan, taskToSend)
}

Benefits:

  • High concurrency: Multiple readers can access subscriptions simultaneously
  • Lock-free delivery: Message sending doesn’t hold locks
  • Deadlock prevention: Clear lock ordering and minimal critical sections
  • Performance: Read operations are optimized for the common case

Scalability Characteristics

Throughput

  • Task routing: ~10,000+ tasks/second on modern hardware
  • Concurrent connections: Limited by file descriptor limits (typically ~1,000s)
  • Memory usage: ~1KB per active subscription

Latency

  • Task routing: <1ms for local network delivery
  • End-to-end: <10ms for simple task processing cycles
  • Progress updates: Real-time streaming with minimal buffering

Resource Usage

  • CPU: Low CPU usage, primarily network I/O bound
  • Memory: Linear growth with number of active subscriptions
  • Network: Efficient binary Protocol Buffer encoding

Error Handling and Resilience

Connection Failures

  • Automatic cleanup: Subscriptions are removed when connections close
  • Graceful degradation: Failed agents don’t affect others
  • Reconnection support: Agents can re-establish subscriptions

Message Delivery Failures

  • Timeout handling: Messages that can’t be delivered are dropped
  • Logging: All failures are logged for debugging
  • Isolation: Per-agent timeouts prevent cascading failures

Resource Protection

  • Channel buffering: Limited buffer sizes prevent memory exhaustion
  • Timeout mechanisms: Prevent resource leaks from stuck operations
  • Graceful shutdown: Proper cleanup during server shutdown

Monitoring and Observability

Built-in Logging

The broker provides comprehensive logging:

  • Task routing decisions
  • Subscription lifecycle events
  • Error conditions and recovery
  • Performance metrics

Integration Points

  • Health checks: HTTP endpoints for monitoring
  • Metrics export: Prometheus/metrics integration points
  • Distributed tracing: Context propagation support

Future Enhancements

Persistence Layer

  • Database backend: Store subscription state for broker restarts
  • Message queuing: Durable task queues for reliability
  • Transaction support: Atomic message delivery guarantees

Clustering Support

  • Horizontal scaling: Multiple broker instances
  • Load balancing: Distribute agents across brokers
  • Consensus protocols: Consistent state across brokers

Advanced Routing

  • Capability-based routing: Route tasks based on agent capabilities
  • Load-aware routing: Consider agent load in routing decisions
  • Geographic routing: Route based on agent location

Security Enhancements

  • Authentication: Agent identity verification
  • Authorization: Task-level access controls
  • Encryption: TLS for all communications

The AgentHub broker architecture provides a solid foundation for Agent2Agent communication while maintaining simplicity and performance. Its design supports the immediate needs of most agent systems while providing clear paths for future enhancement as requirements evolve.

1.2 - Hexagonal Architecture & A2A Protocol Implementation

Understanding AgentHub’s hexagonal architecture with A2A protocol, gRPC communication, and event-driven design

Hexagonal Architecture & A2A Protocol Implementation

This document explains how AgentHub implements hexagonal architecture principles with the Agent2Agent (A2A) protocol, gRPC communication, and event-driven design patterns.

Overview

AgentHub follows hexagonal architecture (also known as Ports and Adapters) to achieve:

  • Domain isolation: Core A2A protocol logic separated from infrastructure
  • Testability: Clean interfaces enable comprehensive testing
  • Flexibility: Multiple adapters for different communication protocols
  • Maintainability: Clear separation of concerns and dependencies

System Architecture

graph TB
    subgraph "AgentHub Ecosystem"
        subgraph "External Agents"
            A["Agent A<br/>(Chat REPL)"]
            B["Agent B<br/>(Chat Responder)"]
            C["Agent C<br/>(Custom Agent)"]
        end

        subgraph "AgentHub Broker"
            subgraph "Adapters (Infrastructure)"
                GRPC["gRPC Server<br/>Adapter"]
                HEALTH["Health Check<br/>Adapter"]
                METRICS["Metrics<br/>Adapter"]
                TRACING["Tracing Adapter<br/>(OTLP/Jaeger)"]
            end

            subgraph "Ports (Interfaces)"
                SP["AgentHub<br/>Service Port"]
                PP["Message<br/>Publisher Port"]
                EP["Event<br/>Subscriber Port"]
                OP["Observability<br/>Port"]
            end

            subgraph "Domain (Core Logic)"
                A2A["A2A Protocol<br/>Engine"]
                ROUTER["Event Router<br/>& Broker"]
                VALIDATOR["Message<br/>Validator"]
                CONTEXT["Context<br/>Manager"]
                TASK["Task<br/>Lifecycle"]
            end
        end

        subgraph "External Systems"
            OTLP["OTLP Collector<br/>& Jaeger"]
            STORE["Event Store<br/>(Memory)"]
        end
    end

    %% External agent connections
    A -->|"gRPC calls<br/>(PublishMessage,<br/>SubscribeToMessages)"| GRPC
    B -->|"gRPC calls"| GRPC
    C -->|"gRPC calls"| GRPC

    %% Adapter to Port connections
    GRPC -->|"implements"| SP
    HEALTH -->|"implements"| OP
    METRICS -->|"implements"| OP
    TRACING -->|"implements"| OP

    %% Port to Domain connections
    SP -->|"delegates to"| A2A
    PP -->|"delegates to"| ROUTER
    EP -->|"delegates to"| ROUTER
    OP -->|"observes"| A2A

    %% Domain internal connections
    A2A -->|"uses"| VALIDATOR
    A2A -->|"uses"| CONTEXT
    A2A -->|"uses"| TASK
    ROUTER -->|"persists events"| STORE
    TRACING -->|"exports traces"| OTLP

    %% Styling
    classDef agents fill:#add8e6
    classDef adapters fill:#ffa500
    classDef ports fill:#e0ffff
    classDef domain fill:#ffb6c1
    classDef external fill:#dda0dd

    class A,B,C agents
    class GRPC,HEALTH,METRICS,TRACING adapters
    class SP,PP,EP,OP ports
    class A2A,ROUTER,VALIDATOR,CONTEXT,TASK domain
    class OTLP,STORE external

Architecture Notes:

  • Domain Core: Pure A2A protocol logic with message validation, event routing, context correlation, and task state management
  • Ports: Clean, technology-agnostic interfaces providing testable contracts and dependency inversion
  • Adapters: Infrastructure concerns including gRPC communication, observability exports, and protocol adaptations

A2A Message Flow

sequenceDiagram
    participant REPL as Chat REPL<br/>Agent
    participant gRPC as gRPC<br/>Adapter
    participant A2A as A2A Protocol<br/>Engine
    participant Router as Event<br/>Router
    participant Responder as Chat Responder<br/>Agent

    rect rgb(240, 248, 255)
        Note over REPL, Router: A2A Message Publishing
        REPL->>+gRPC: PublishMessage(A2AMessage)
        gRPC->>+A2A: validateA2AMessage()
        A2A->>A2A: check MessageId, Role, Content
        A2A-->>-gRPC: validation result
        gRPC->>+Router: routeA2AEvent(messageEvent)
        Router->>Router: identify subscribers<br/>by agent_id/broadcast
        Router->>Router: create tracing span<br/>with A2A attributes
        Router-->>Responder: deliver message event
        Router-->>-gRPC: routing success
        gRPC-->>-REPL: PublishResponse(event_id)
    end

    rect rgb(255, 248, 240)
        Note over Responder, Router: A2A Message Processing
        Responder->>+gRPC: SubscribeToMessages(agent_id)
        gRPC->>Router: register subscriber
        Router-->>gRPC: subscription stream
        gRPC-->>-Responder: message stream
        Note over Responder: Process A2A message<br/>with tracing spans
        Responder->>+gRPC: PublishMessage(A2AResponse)
        gRPC->>A2A: validateA2AMessage()
        A2A->>A2A: check AGENT role,<br/>ContextId correlation
        gRPC->>Router: routeA2AEvent(responseEvent)
        Router-->>REPL: deliver response event
        gRPC-->>-Responder: PublishResponse
    end

    Note over REPL, Responder: A2A Protocol ensures:<br/>β€’ Message structure compliance<br/>β€’ Role semantics (USER/AGENT)<br/>β€’ Context correlation<br/>β€’ Event-driven routing

Core Components

1. A2A Protocol Engine (Domain Core)

The heart of the system implementing A2A protocol specifications:

// Core domain logic - technology agnostic
type A2AProtocolEngine struct {
    messageValidator MessageValidator
    contextManager   ContextManager
    taskLifecycle    TaskLifecycle
}

// A2A message validation
func (e *A2AProtocolEngine) ValidateMessage(msg *Message) error {
    // A2A compliance checks
    if msg.MessageId == "" { return ErrMissingMessageId }
    if msg.Role == ROLE_UNSPECIFIED { return ErrInvalidRole }
    if len(msg.Content) == 0 { return ErrEmptyContent }
    return nil
}

2. Event Router (Domain Core)

Manages event-driven communication between agents:

type EventRouter struct {
    messageSubscribers map[string][]chan *AgentEvent
    taskSubscribers    map[string][]chan *AgentEvent
    eventSubscribers   map[string][]chan *AgentEvent
}

func (r *EventRouter) RouteEvent(event *AgentEvent) error {
    // Route based on A2A metadata
    routing := event.GetRouting()
    subscribers := r.getSubscribers(routing.ToAgentId, event.PayloadType)

    // Deliver with tracing
    for _, sub := range subscribers {
        go r.deliverWithTracing(sub, event)
    }
    return nil
}

3. gRPC Adapter (Infrastructure)

Translates between gRPC and domain logic:

type GrpcAdapter struct {
    a2aEngine    A2AProtocolEngine
    eventRouter  EventRouter
    tracer       TracingAdapter
}

func (a *GrpcAdapter) PublishMessage(ctx context.Context, req *PublishMessageRequest) (*PublishResponse, error) {
    // Start tracing span
    ctx, span := a.tracer.StartA2AMessageSpan(ctx, "publish_message", req.Message.MessageId, req.Message.Role)
    defer span.End()

    // Validate using domain logic
    if err := a.a2aEngine.ValidateMessage(req.Message); err != nil {
        a.tracer.RecordError(span, err)
        return nil, err
    }

    // Route using domain logic
    event := a.createA2AEvent(req)
    if err := a.eventRouter.RouteEvent(event); err != nil {
        return nil, err
    }

    return &PublishResponse{Success: true, EventId: event.EventId}, nil
}

Hexagonal Architecture Benefits

1. Domain Isolation

  • A2A protocol logic is pure, testable business logic
  • No infrastructure dependencies in the core domain
  • Technology-agnostic implementation

2. Adapter Pattern

  • gRPC Adapter: Handles Protocol Buffer serialization/deserialization
  • Tracing Adapter: OTLP/Jaeger integration without domain coupling
  • Health Adapter: Service health monitoring
  • Metrics Adapter: Prometheus metrics collection

3. Port Interfaces

// Clean, testable interfaces
type MessagePublisher interface {
    PublishMessage(ctx context.Context, msg *Message) (*PublishResponse, error)
}

type EventSubscriber interface {
    SubscribeToMessages(ctx context.Context, agentId string) (MessageStream, error)
}

type ObservabilityPort interface {
    StartSpan(ctx context.Context, operation string) (context.Context, Span)
    RecordMetric(name string, value float64, labels map[string]string)
}

4. Dependency Inversion

  • Domain depends on abstractions (ports), not concrete implementations
  • Adapters depend on domain through well-defined interfaces
  • Easy testing with mock implementations

A2A Protocol Integration

Message Structure Compliance

classDiagram
    class A2AMessage {
        +string MessageId
        +string ContextId
        +Role Role
        +Part Content
        +Metadata Metadata
        +string TaskId
    }

    class Part {
        +string Text
        +bytes Data
        +FileData File
    }

    class EventMetadata {
        +string FromAgentId
        +string ToAgentId
        +string EventType
        +Priority Priority
    }

    class Role {
        <<enumeration>>
        USER
        AGENT
    }

    class Metadata {
        +Fields map
    }

    A2AMessage "1" --> "0..*" Part : contains
    A2AMessage "1" --> "1" EventMetadata : routed_with
    A2AMessage "1" --> "1" Role : has
    A2AMessage "1" --> "0..1" Metadata : includes

Event-Driven Architecture

The system implements pure event-driven architecture:

  1. Publishers emit A2A-compliant events
  2. Broker routes events based on metadata
  3. Subscribers receive relevant events
  4. Correlation through ContextId maintains conversation flow
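
A minimal sketch of context correlation using this section's domain types (enum value names follow the examples in this document):

request := &Message{
    MessageId: "msg_req_001",
    ContextId: "conversation_42",
    Role:      ROLE_USER,
    Content:   []*Part{{Text: "Analyze this dataset"}},
}

response := &Message{
    MessageId: "msg_resp_001",
    ContextId: request.ContextId, // same ContextId ties the reply to the conversation
    Role:      ROLE_AGENT,
    Content:   []*Part{{Text: "Analysis complete"}},
}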

Observability Integration

Distributed Tracing

sequenceDiagram
    participant A as Agent A
    participant B as Broker
    participant AB as Agent B
    participant OTLP as OTLP Collector
    participant J as Jaeger

    A->>+B: PublishMessage<br/>[trace_id: 123]
    B->>B: Create A2A spans<br/>with structured attributes
    B->>+AB: RouteEvent<br/>[trace_id: 123]
    AB->>AB: Process with<br/>child spans
    AB->>-B: PublishResponse<br/>[trace_id: 123]
    B->>-A: Success<br/>[trace_id: 123]

    par Observability Export
        B->>OTLP: Export spans<br/>with A2A attributes
        OTLP->>J: Store traces
        J->>J: Build trace timeline<br/>with correlation
    end

    Note over A, J: End-to-end tracing<br/>with A2A protocol visibility

Structured Attributes

Each span includes A2A-specific attributes:

  • a2a.message.id
  • a2a.message.role
  • a2a.context.id
  • a2a.event.type
  • a2a.routing.from_agent
  • a2a.routing.to_agent

Testing Strategy

Unit Testing (Domain Core)

func TestA2AEngine_ValidateMessage(t *testing.T) {
    engine := NewA2AProtocolEngine()

    // Test A2A compliance
    msg := &Message{
        MessageId: "test_msg_123",
        Role: ROLE_USER,
        Content: []*Part{{Text: "hello"}},
    }

    err := engine.ValidateMessage(msg)
    assert.NoError(t, err)
}

Integration Testing (Adapters)

func TestGrpcAdapter_PublishMessage(t *testing.T) {
    // Mock domain dependencies
    mockEngine := &MockA2AEngine{}
    mockRouter := &MockEventRouter{}

    adapter := NewGrpcAdapter(mockEngine, mockRouter)

    // Test adapter behavior
    resp, err := adapter.PublishMessage(ctx, validRequest)
    assert.NoError(t, err)
    assert.True(t, resp.Success)
}

Conclusion

AgentHub’s hexagonal architecture with A2A protocol provides:

  1. Clean Architecture: Separation of concerns with domain-driven design
  2. A2A Compliance: Full protocol implementation with validation
  3. Event-Driven Design: Scalable, loosely-coupled communication
  4. Rich Observability: Comprehensive tracing and metrics
  5. Testability: Clean interfaces enable thorough testing
  6. Flexibility: Easy to extend with new adapters and protocols

This architecture ensures maintainable, scalable, and observable agent communication while maintaining strict A2A protocol compliance.

2 - Core Concepts

Fundamental concepts and principles of AgentHub

Core Concepts

Explore the fundamental concepts, principles, and mental models that underpin AgentHub’s agent-to-agent communication system.

Available Documentation

2.1 - The Agent2Agent Principle

Deep dive into the philosophy and design principles behind Agent2Agent communication and how AgentHub implements this pattern

The Agent2Agent Protocol and AgentHub Implementation

This document explores the core principles of Google’s Agent2Agent protocol and how AgentHub implements a communication broker based on these concepts. We distinguish between the Agent2Agent protocol specification (task structures and communication patterns) and our custom AgentHub broker implementation.

Agent2Agent vs AgentHub: What’s What

Agent2Agent Protocol (Google)

The Agent2Agent protocol defines:

  • A2A Message Structures: Message, Task, Artifact with structured content parts
  • Task State Management: TaskState enums (SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED)
  • Communication Patterns: Asynchronous task delegation with context-aware message handling

AgentHub Implementation (This Project)

AgentHub provides:

  • Hybrid EDA+A2A Broker: Centralized gRPC service implementing A2A protocol within Event-Driven Architecture
  • A2A-Compliant Pub/Sub: Publisher-subscriber pattern using native A2A message structures
  • A2A Subscription Mechanisms: SubscribeToTasks, SubscribeToMessages, SubscribeToAgentEvents methods
  • A2A Agent Implementations: Sample agents using A2ATaskPublisher and A2ATaskSubscriber abstractions

Philosophy and Core Concepts

Beyond Simple Request-Response

Traditional software architectures rely heavily on synchronous request-response patterns where a client requests a service and waits for an immediate response. While effective for simple operations, this pattern has limitations when dealing with:

  • Complex, multi-step processes that require coordination between multiple specialized services
  • Long-running operations that may take minutes or hours to complete
  • Dynamic workload distribution where the best processor for a task may vary over time
  • Autonomous decision-making where agents need to collaborate without central coordination

The Agent2Agent protocol addresses these limitations by defining task structures and communication patterns for autonomous agents. AgentHub implements a broker-based system that enables agents to communicate using Agent2Agent-inspired task structures:

  1. Delegating work to other agents based on their capabilities
  2. Accepting and processing tasks according to their specializations
  3. Reporting progress during long-running operations
  4. Making collaborative decisions about task distribution and execution

Autonomous Collaboration

In an Agent2Agent system, each agent operates with a degree of autonomy, making decisions about:

  • Which tasks to accept based on current capacity and capabilities
  • How to prioritize work when multiple tasks are pending
  • When to delegate subtasks to other specialized agents
  • How to report progress and handle failures

This autonomy enables the system to be more resilient, scalable, and adaptive compared to centrally-controlled architectures.

Key Design Principles

1. Asynchronous Communication

Agent2Agent communication is fundamentally asynchronous. When Agent A requests work from Agent B:

  • Agent A doesn’t block waiting for completion
  • Agent B can process the task when resources are available
  • Progress updates provide visibility into long-running operations
  • Results are delivered when the work is complete

This asynchronicity enables:

  • Better resource utilization as agents aren’t blocked waiting
  • Improved scalability as systems can handle more concurrent operations
  • Enhanced resilience as temporary agent unavailability doesn’t block the entire system

2. Rich A2A Task Semantics

The Agent2Agent protocol defines rich task structures with flexible message content that AgentHub implements:

message Task {
  string id = 1;                         // Unique task identifier
  string context_id = 2;                 // Conversation/workflow context
  TaskStatus status = 3;                 // Current status with latest message
  repeated Message history = 4;          // Complete message history
  repeated Artifact artifacts = 5;       // Task output artifacts
  google.protobuf.Struct metadata = 6;   // Additional context
}

message Message {
  string message_id = 1;                 // Unique message identifier
  string context_id = 2;                 // Conversation context
  string task_id = 3;                    // Associated task
  Role role = 4;                         // USER or AGENT
  repeated Part content = 5;             // Structured content parts
  google.protobuf.Struct metadata = 6;   // Message metadata
}

message TaskStatus {
  TaskState state = 1;                   // SUBMITTED, WORKING, COMPLETED, etc.
  Message update = 2;                    // Latest status message
  google.protobuf.Timestamp timestamp = 3; // Status timestamp
}

This rich A2A structure enables:

  • Context-aware routing based on conversation context and message content
  • Flexible content handling through structured Part types (text, data, files)
  • Workflow coordination via shared context IDs across related tasks
  • Complete communication history for debugging and audit trails
  • Structured artifact delivery for rich result types

3. A2A Status Updates and Progress Tracking

Long-running tasks benefit from A2A status updates through the message history:

// Progress updates are A2A messages within the task
message TaskStatus {
  TaskState state = 1;                   // Current execution state
  Message update = 2;                    // Latest status message from agent
  google.protobuf.Timestamp timestamp = 3; // When this status was set
}

// Progress information is conveyed through message content
message Message {
  // ... other fields
  repeated Part content = 5;             // Can include progress details
}

// Example: building a progress message part in Go
progressData, _ := structpb.NewStruct(map[string]interface{}{
    "progress_percentage": 65,
    "phase":               "data_analysis",
    "estimated_remaining": "2m30s",
})

progressPart := &a2a.Part{
    Part: &a2a.Part_Data{
        Data: &a2a.DataPart{
            Data:        progressData,
            Description: "Processing progress update",
        },
    },
}

This A2A approach enables:

  • Rich progress communication through structured message content
  • Complete audit trails via message history preservation
  • Context-aware status updates linking progress to specific workflows
  • Flexible progress formats supporting text, data, and file-based updates
  • Multi-agent coordination through shared context and message threading

4. A2A EDA Routing Flexibility

AgentHub’s A2A implementation supports multiple routing patterns through EDA metadata:

message AgentEventMetadata {
  string from_agent_id = 1;              // Source agent
  string to_agent_id = 2;                // Target agent (empty = broadcast)
  string event_type = 3;                 // Event classification
  repeated string subscriptions = 4;      // Topic-based routing
  Priority priority = 5;                 // Delivery priority
}
  • Direct A2A addressing: Tasks sent to specific agents via to_agent_id
  • Broadcast A2A addressing: Tasks sent to all subscribed agents (empty to_agent_id)
  • Topic-based A2A routing: Tasks routed via subscription filters and event types
  • Context-aware routing: Tasks routed based on A2A context and conversation state

This hybrid EDA+A2A approach enables sophisticated routing patterns while maintaining A2A protocol compliance.

Architectural Patterns

Microservices Enhancement

In a microservices architecture, Agent2Agent can enhance service communication by:

  • Replacing synchronous HTTP calls with asynchronous task delegation
  • Adding progress visibility to long-running service operations
  • Enabling service composition through task chaining
  • Improving resilience through task retry and timeout mechanisms

Event-Driven Architecture with A2A Protocol

AgentHub integrates A2A protocol within Event-Driven Architecture by:

  • Wrapping A2A messages in EDA event envelopes for routing and delivery
  • Preserving A2A semantics while leveraging EDA scalability and reliability
  • Enabling A2A conversation contexts within event-driven message flows
  • Supporting A2A task coordination alongside traditional event broadcasting
  • Providing A2A-compliant APIs that internally use EDA for transport

// A2A message wrapped in an EDA event envelope (protobuf sketch; field numbers illustrative)
message AgentEvent {
  string event_id = 1;
  google.protobuf.Timestamp timestamp = 2;

  // A2A-compliant payload
  oneof payload {
    a2a.Message message = 10;
    a2a.Task task = 11;
    TaskStatusUpdateEvent status_update = 12;
    TaskArtifactUpdateEvent artifact_update = 13;
  }

  // EDA routing metadata
  AgentEventMetadata routing = 20;
}

Workflow Orchestration

Complex business processes can be modeled as Agent2Agent workflows:

  1. Process Initiation: A workflow agent receives a high-level business request
  2. Task Decomposition: The request is broken down into specific tasks
  3. Agent Coordination: Tasks are distributed to specialized agents
  4. Progress Aggregation: Individual task progress is combined into overall workflow status
  5. Result Assembly: Task results are combined into a final business outcome
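
A sketch of steps 2 and 3, where decomposed tasks share a single context ID so progress and results can later be aggregated (publishTask is a hypothetical wrapper around the AgentHub publish call):

contextID := "order_workflow_" + uuid.New().String()

for _, step := range []string{"inventory_check", "payment", "fulfillment"} {
    task := &a2a.Task{
        Id:        "task_" + step + "_" + uuid.New().String(),
        ContextId: contextID, // correlates every step of this workflow
        Status: &a2a.TaskStatus{
            State:     a2a.TaskState_TASK_STATE_SUBMITTED,
            Timestamp: timestamppb.Now(),
        },
    }
    publishTask(task)
}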

Benefits and Trade-offs

Benefits

Scalability: Asynchronous operation and agent autonomy enable horizontal scaling without central bottlenecks.

Resilience: Agent failures don’t cascade as easily since tasks can be retried or redistributed.

Flexibility: New agent types can be added without modifying existing agents.

Observability: Rich task semantics and progress reporting provide excellent visibility into system operations.

Modularity: Agents can be developed, deployed, and scaled independently.

Trade-offs

Complexity: The system requires more sophisticated error handling and state management compared to simple request-response patterns.

Latency: For simple operations, the overhead of task creation and routing may add latency compared to direct calls.

Debugging: Distributed, asynchronous operations can be more challenging to debug than synchronous call chains.

Consistency: Managing data consistency across asynchronous agent operations requires careful design.

When to Use Agent2Agent

Agent2Agent is particularly well-suited for:

Complex Processing Pipelines

When work involves multiple steps that can be performed by different specialized agents:

  • Data ingestion β†’ validation β†’ transformation β†’ analysis β†’ reporting
  • Image upload β†’ virus scan β†’ thumbnail generation β†’ metadata extraction
  • Order processing β†’ inventory check β†’ payment processing β†’ fulfillment

Long-Running Operations

When operations take significant time and users need progress feedback:

  • Large file processing
  • Machine learning model training
  • Complex data analysis
  • Batch job processing

Dynamic Load Distribution

When workload characteristics vary and different agents may be better suited for different tasks:

  • Multi-tenant systems with varying customer requirements
  • Resource-intensive operations that need specialized hardware
  • Geographic distribution where local processing is preferred

System Integration

When connecting heterogeneous systems that need to coordinate:

  • Third-party service coordination
  • Cross-platform workflows

A2A Protocol Comparison with Other Patterns

vs. Message Queues

Traditional message queues provide asynchronous communication but lack:

  • A2A structured message parts (text, data, files)
  • A2A conversation context and task threading
  • A2A bidirectional artifact delivery
  • A2A complete message history preservation
  • A2A flexible content types and metadata

vs. RPC/HTTP APIs

RPC and HTTP APIs provide structured communication but are typically:

  • Synchronous (blocking) vs A2A asynchronous task delegation
  • Lacking A2A-style progress tracking through message history
  • Point-to-point rather than A2A context-aware routing
  • Without A2A structured content parts and artifact handling
  • Missing A2A conversation threading and workflow coordination

vs. Event Sourcing

Event sourcing provides audit trails and state reconstruction but:

  • Focuses on state changes rather than A2A work coordination
  • Lacks A2A structured task status and message threading
  • Doesn’t provide A2A artifact-based result delivery
  • Requires more complex patterns vs A2A’s built-in conversation context
  • Missing A2A’s multi-modal content handling (text, data, files)

A2A Protocol Future Evolution

The A2A protocol and AgentHub implementation opens possibilities for:

Intelligent A2A Agent Networks

Agents that learn from A2A conversation contexts and message patterns to make better delegation decisions based on historical performance and capability matching.

Self-Organizing A2A Systems

Agent networks that automatically reconfigure based on A2A workflow patterns, context relationships, and agent availability, using A2A metadata for intelligent routing decisions.

Cross-Organization A2A Collaboration

Extending A2A protocols across organizational boundaries for B2B workflow automation, leveraging A2A’s structured content parts and artifact handling for secure inter-org communication.

AI Agent A2A Integration

Natural integration points for AI agents that can:

  • Parse A2A message content parts for semantic understanding
  • Generate appropriate A2A responses with structured artifacts
  • Maintain A2A conversation context across complex multi-turn interactions
  • Make autonomous decisions about A2A task acceptance based on content analysis

Enhanced A2A Features

  • A2A Protocol Extensions: Custom Part types for domain-specific content
  • Advanced A2A Routing: ML-based routing decisions using conversation context
  • A2A Federation: Cross-cluster A2A communication with context preservation
  • A2A Analytics: Deep insights from conversation patterns and artifact flows

The A2A protocol represents a foundational shift toward more intelligent, context-aware, and collaborative software systems that can handle complex distributed workflows while maintaining strong semantics, complete audit trails, and rich inter-agent communication patterns.

2.2 - Agent2Agent (A2A) Protocol Migration

Understanding the migration to Agent2Agent protocol compliance while maintaining Event-Driven Architecture benefits.

Agent2Agent (A2A) Protocol Migration

This document explains the migration of AgentHub to full Agent2Agent (A2A) protocol compliance while maintaining the essential Event-Driven Architecture (EDA) patterns that make the system scalable and resilient.

What is the Agent2Agent Protocol?

The Agent2Agent (A2A) protocol is a standardized specification for communication between AI agents. It defines:

  • Standardized Message Formats: Using Message, Part, Task, and Artifact structures
  • Task Lifecycle Management: Clear states (SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED)
  • Agent Discovery: Using AgentCard for capability advertisement
  • Interoperability: Ensuring agents can communicate across different platforms

Why Migrate to A2A?

Benefits of A2A Compliance

  1. Interoperability: AgentHub can now communicate with any A2A-compliant agent or system
  2. Standardization: Clear, well-defined message formats reduce integration complexity
  3. Ecosystem Compatibility: Join the growing ecosystem of A2A-compatible tools
  4. Future-Proofing: Built on industry standards rather than custom protocols

Maintained EDA Benefits

  • Scalability: Event-driven routing scales to thousands of agents
  • Resilience: Asynchronous communication handles network partitions gracefully
  • Flexibility: Topic-based routing and priority queues enable sophisticated workflows
  • Observability: Built-in tracing and metrics for production deployments

Hybrid Architecture

AgentHub implements a hybrid approach that combines the best of both worlds:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                   A2A Protocol Layer                           β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
β”‚  β”‚ A2A Message β”‚  β”‚  A2A Task   β”‚  β”‚ A2A Artifactβ”‚  β”‚A2A Agentβ”‚β”‚
β”‚  β”‚  (standard) β”‚  β”‚ (standard)  β”‚  β”‚ (standard)  β”‚  β”‚  Card   β”‚β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                    EDA Transport Layer                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
β”‚  β”‚ AgentEvent  β”‚  β”‚Event Router β”‚  β”‚ Subscribers β”‚  β”‚Priority β”‚β”‚
β”‚  β”‚  Wrapper    β”‚  β”‚             β”‚  β”‚  Manager    β”‚  β”‚ Queues  β”‚β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                      gRPC Infrastructure                       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

How It Works

  1. A2A Messages are created using standard A2A structures (Message, Task, etc.)
  2. EDA Wrapper wraps A2A messages in AgentEvent for transport
  3. Event Routing uses EDA patterns (pub/sub, priority, topics) for delivery
  4. A2A Compliance ensures messages follow A2A protocol semantics
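
As a sketch, wrapping an A2A message for EDA transport might look like this (the generated oneof wrapper name AgentEvent_Message is an assumption):

event := &pb.AgentEvent{
    EventId:   "event_" + uuid.New().String(),
    Timestamp: timestamppb.Now(),
    Payload:   &pb.AgentEvent_Message{Message: a2aMessage}, // A2A-compliant payload
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "my_agent",
        EventType:   "message.sent",
    },
}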

API Changes

Before (Legacy API)

// Legacy TaskMessage (deprecated)
taskPublisher.PublishTask(ctx, &agenthub.PublishTaskRequest{
    TaskType: "greeting",
    Parameters: map[string]interface{}{
        "name": "Claude",
    },
    RequesterAgentID: "my_agent",
    ResponderAgentID: "target_agent",
})

After (A2A-Compliant API)

// A2A-compliant task publishing
content := []*pb.Part{
    {
        Part: &pb.Part_Text{
            Text: "Hello! Please provide a greeting for Claude.",
        },
    },
}

task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
    TaskType:         "greeting",
    Content:          content,
    RequesterAgentID: "my_agent",
    ResponderAgentID: "target_agent",
    Priority:         pb.Priority_PRIORITY_MEDIUM,
    ContextID:        "conversation_123",
})

Message Structure Changes

A2A Message Format

message Message {
  string message_id = 1;       // Unique message identifier
  string context_id = 2;       // Conversation context
  string task_id = 3;          // Associated task (optional)
  Role role = 4;               // USER or AGENT
  repeated Part content = 5;   // Message content parts
  google.protobuf.Struct metadata = 6; // Additional metadata
}

message Part {
  oneof part {
    string text = 1;           // Text content
    DataPart data = 2;         // Structured data
    FilePart file = 3;         // File reference
  }
}

A2A Task Format

message Task {
  string id = 1;                    // Task identifier
  string context_id = 2;            // Conversation context
  TaskStatus status = 3;            // Current status
  repeated Message history = 4;     // Message history
  repeated Artifact artifacts = 5;  // Task outputs
  google.protobuf.Struct metadata = 6; // Task metadata
}

enum TaskState {
  TASK_STATE_SUBMITTED = 0;    // Task created
  TASK_STATE_WORKING = 1;      // Task in progress
  TASK_STATE_COMPLETED = 2;    // Task completed successfully
  TASK_STATE_FAILED = 3;       // Task failed
  TASK_STATE_CANCELLED = 4;    // Task cancelled
}

Migration Guide

For Publishers

  1. Replace TaskPublisher with A2ATaskPublisher
  2. Use A2APublishTaskRequest with A2A Part structures
  3. Handle returned A2A Task objects

For Subscribers

  1. Replace TaskSubscriber with A2ATaskSubscriber
  2. Update handlers to process A2A Task and Message objects
  3. Return A2A Artifact objects instead of custom results

For Custom Integrations

  1. Update protobuf imports to use events/a2a package
  2. Replace custom message structures with A2A equivalents
  3. Use AgentHub service instead of EventBus

Backward Compatibility

The migration maintains wire-level compatibility through:

  • Deprecated Types: Legacy message types marked as deprecated but still supported
  • Automatic Conversion: EDA broker converts between legacy and A2A formats when needed
  • Graceful Migration: Existing agents can migrate incrementally

Testing A2A Compliance

Run the demo to verify A2A compliance:

# Terminal 1: Start A2A broker
make run-server

# Terminal 2: Start A2A subscriber
make run-subscriber

# Terminal 3: Start A2A publisher
make run-publisher

Expected output shows successful A2A task processing:

  • Publisher: “Published A2A task”
  • Subscriber: “Task processing completed”
  • Artifacts generated in A2A format

Best Practices

  1. Use A2A Types: Always use A2A message structures for new code
  2. Context Management: Use context_id to group related messages
  3. Proper Parts: Structure content using appropriate Part types
  4. Artifact Returns: Return structured Artifact objects from tasks
  5. Status Updates: Properly manage task lifecycle states
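
Putting these practices together, a sketch of a well-formed request and its artifact (types follow the proto definitions above; enum value names are assumptions):

request := &pb.Message{
    MessageId: "msg_001",
    ContextId: "conversation_123", // groups related messages (practice 2)
    Role:      pb.Role_USER,
    Content: []*pb.Part{
        {Part: &pb.Part_Text{Text: "Summarize Q4 sales"}}, // typed part (practice 3)
    },
}

artifact := &pb.Artifact{ // structured task output (practice 4)
    ArtifactId: "artifact_001",
    Name:       "Q4 Summary",
    Parts: []*pb.Part{
        {Part: &pb.Part_Text{Text: "Sales grew quarter over quarter."}},
    },
}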

The A2A migration ensures AgentHub remains both standards-compliant and highly scalable through its hybrid EDA+A2A architecture.

2.3 - Understanding Tasks in Agent2Agent Communication

Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. Deep dive into task semantics, lifecycle, and design patterns.

Understanding Tasks in Agent2Agent Communication

Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. This document provides a deep dive into task semantics, lifecycle, and design patterns.

Task Anatomy

Core Components

Every task in the Agent2Agent system consists of several key components that define its identity, purpose, and execution context:

A2A Task Identity

string id = 1;                         // Unique task identifier
string context_id = 2;                 // Optional conversation context

The id serves as a unique identifier that allows all participants to track the task throughout its lifecycle. It should be globally unique and meaningful for debugging purposes.

The context_id groups related tasks in a conversation or workflow context, enabling sophisticated multi-task coordination patterns.

Task classification in A2A is handled through the initial Message content rather than a separate task_type field, providing more flexibility for complex task descriptions.

A2A Task Status and History

TaskStatus status = 3;                 // Current task status
repeated Message history = 4;          // Message history for this task
repeated Artifact artifacts = 5;       // Task output artifacts
google.protobuf.Struct metadata = 6;   // Task metadata

In A2A, task data is contained within Message content using the structured Part format:

// A2A task request message
message Message {
  string message_id = 1;
  string context_id = 2;
  string task_id = 3;
  Role role = 4;                    // USER (requester) or AGENT (responder)
  repeated Part content = 5;        // Structured task content
}

message Part {
  oneof part {
    string text = 1;               // Text description
    DataPart data = 2;             // Structured data
    FilePart file = 3;             // File references
  }
}

// Example: A2A data analysis task
taskMessage := &a2a.Message{
    MessageId: "msg_" + uuid.New().String(),
    ContextId: "analysis_workflow_123",
    TaskId:    "task_analysis_456",
    Role:      a2a.Role_USER,
    Content: []*a2a.Part{
        {
            Part: &a2a.Part_Text{
                Text: "Please perform trend analysis on Q4 sales data",
            },
        },
        {
            Part: &a2a.Part_Data{
                Data: &a2a.DataPart{
                    Data: analysisParams, // Structured parameters
                    Description: "Analysis configuration",
                },
            },
        },
    },
}

Metadata in A2A tasks provides additional context for execution, auditing, or debugging:

// A2A task metadata
taskMetadata, _ := structpb.NewStruct(map[string]interface{}{
    "workflow_id":     "workflow_abc123",
    "user_id":         "user_456",
    "request_source":  "web_ui",
    "correlation_id":  "trace_789",
    "priority":        "high",
    "expected_duration": "5m",
})

task := &a2a.Task{
    Id:        "task_analysis_456",
    ContextId: "analysis_workflow_123",
    Metadata:  taskMetadata,
}

A2A Agent Coordination

In A2A, agent coordination is handled through the EDA routing metadata:

message AgentEventMetadata {
  string from_agent_id = 1;           // Source agent identifier
  string to_agent_id = 2;             // Target agent ID (empty = broadcast)
  string event_type = 3;              // Event classification
  repeated string subscriptions = 4;   // Topic-based routing tags
  Priority priority = 5;              // Delivery priority
}

This enables flexible routing patterns:

  • from_agent_id identifies the requesting agent
  • to_agent_id can specify a target agent or be empty for broadcast
  • subscriptions enable topic-based routing for specialized agents
  • priority ensures urgent tasks get precedence

A2A Execution Context

A2A handles execution context through the TaskStatus structure:

message TaskStatus {
  TaskState state = 1;                   // SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED
  Message update = 2;                    // Latest status message
  google.protobuf.Timestamp timestamp = 3; // Status timestamp
}

enum TaskState {
  TASK_STATE_SUBMITTED = 0;
  TASK_STATE_WORKING = 1;
  TASK_STATE_COMPLETED = 2;
  TASK_STATE_FAILED = 3;
  TASK_STATE_CANCELLED = 4;
}

This status structure helps agents make intelligent scheduling decisions:

  • state tracks where the task is in its lifecycle
  • timestamp enables age-based scheduling policies
  • priority (carried in the EDA routing metadata) provides explicit urgency ranking

Task Lifecycle

1. A2A Task Creation and Publishing

A2A tasks begin their lifecycle when a requesting agent creates a task with an initial message:

// Create A2A task with an initial request message.
// Generate the ID once so the task and its message reference the same value.
taskID := "task_analysis_" + uuid.New().String()
task := &a2a.Task{
    Id:        taskID,
    ContextId: "workflow_orchestration_123",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            MessageId: "msg_" + uuid.New().String(),
            TaskId:    taskID,
            Role:      a2a.Role_USER,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Please analyze the quarterly sales data for trends",
                    },
                },
                {
                    Part: &a2a.Part_Data{
                        Data: &a2a.DataPart{
                            Data: analysisParams,
                            Description: "Analysis configuration",
                        },
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}

// Publish to AgentHub broker
client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
    Task: task,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "data_orchestrator",
        ToAgentId:   "data_processor_01", // Optional: specific agent
        EventType:   "task.submitted",
        Priority:    pb.Priority_PRIORITY_HIGH,
    },
})

2. A2A Task Discovery and Acceptance

Agents subscribe to A2A task events and evaluate whether to accept them:

// Agent receives A2A task event
func (a *Agent) evaluateA2ATask(event *pb.AgentEvent) bool {
    task := event.GetTask()
    if task == nil || task.Status.State != a2a.TaskState_TASK_STATE_SUBMITTED {
        return false
    }

    // Analyze task content to understand requirements
    requestMessage := task.Status.Update
    taskDescription := a.extractTaskDescription(requestMessage)

    // Check if agent can handle this task type
    if !a.canHandleTaskType(taskDescription) {
        return false
    }

    // Check capacity constraints
    if a.getCurrentLoad() > a.maxCapacity {
        return false
    }

    // Estimate duration from task content and metadata
    estimatedDuration := a.estimateA2ATaskDuration(task)
    if estimatedDuration > a.maxTaskDuration {
        return false
    }

    return true
}

func (a *Agent) extractTaskDescription(msg *a2a.Message) string {
    for _, part := range msg.Content {
        if textPart := part.GetText(); textPart != "" {
            return textPart
        }
    }
    return ""
}

3. A2A Task Execution with Progress Reporting

Accepted A2A tasks enter the execution phase with regular status updates:

func (a *Agent) executeA2ATask(task *a2a.Task) {
    // Update task to WORKING state
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Task started")

    // Phase 1: Preparation
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Preparing data analysis")
    prepareResult := a.prepareA2AExecution(task)

    // Phase 2: Main processing
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Processing data - 50% complete")
    processResult := a.processA2AData(prepareResult)

    // Phase 3: Finalization
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Finalizing results - 75% complete")
    finalResult := a.finalizeA2AResults(processResult)

    // Completion with artifacts
    a.completeTaskWithArtifacts(task, finalResult)
}

func (a *Agent) updateTaskStatus(task *a2a.Task, state a2a.TaskState, message string) {
    statusUpdate := &a2a.Message{
        MessageId: "msg_" + uuid.New().String(),
        TaskId:    task.Id,
        Role:      a2a.Role_AGENT,
        Content: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: message,
                },
            },
        },
    }

    task.Status = &a2a.TaskStatus{
        State:     state,
        Update:    statusUpdate,
        Timestamp: timestamppb.Now(),
    }

    // Publish task update
    a.client.PublishTaskUpdate(context.Background(), &pb.PublishTaskUpdateRequest{
        Task: task,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: a.agentId,
            EventType:   "task.status_update",
        },
    })
}

4. A2A Result Delivery

A2A task completion delivers results through structured artifacts:

func (a *Agent) completeTaskWithArtifacts(task *a2a.Task, resultData interface{}) {
    // Create completion message
    completionMessage := &a2a.Message{
        MessageId: "msg_" + uuid.New().String(),
        TaskId:    task.Id,
        Role:      a2a.Role_AGENT,
        Content: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: "Analysis completed successfully",
                },
            },
        },
    }

    // Create result artifact
    resultArtifact := &a2a.Artifact{
        ArtifactId:  "artifact_" + uuid.New().String(),
        Name:        "Analysis Results",
        Description: "Quarterly sales trend analysis",
        Parts: []*a2a.Part{
            {
                Part: &a2a.Part_Data{
                    Data: &a2a.DataPart{
                        Data:        resultData.(*structpb.Struct),
                        Description: "Analysis results and metrics",
                    },
                },
            },
        },
    }

    // Update task to completed
    task.Status = &a2a.TaskStatus{
        State:     a2a.TaskState_TASK_STATE_COMPLETED,
        Update:    completionMessage,
        Timestamp: timestamppb.Now(),
    }
    task.Artifacts = append(task.Artifacts, resultArtifact)

    // Publish final task update
    a.client.PublishTaskUpdate(context.Background(), &pb.PublishTaskUpdateRequest{
        Task: task,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: a.agentId,
            EventType:   "task.completed",
        },
    })

    // Publish artifact separately
    a.client.PublishTaskArtifact(context.Background(), &pb.PublishTaskArtifactRequest{
        TaskId:   task.Id,
        Artifact: resultArtifact,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: a.agentId,
            EventType:   "task.artifact",
        },
    })
}

A2A Task Design Patterns

1. Simple A2A Request-Response

The most basic pattern where one agent requests work from another using A2A messages:

Agent A ──[A2A Task]──> AgentHub ──[TaskEvent]──> Agent B
Agent A <─[Artifact]─── AgentHub <─[TaskUpdate]── Agent B

A2A Implementation:

// Agent A creates task
task := &a2a.Task{
    Id: "simple_task_123",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Convert CSV to JSON"}}},
        },
    },
}

// Agent B responds with artifact
artifact := &a2a.Artifact{
    Name: "Converted Data",
    Parts: []*a2a.Part{{Part: &a2a.Part_File{File: &a2a.FilePart{FileId: "converted.json"}}}},
}

Use cases:

  • File format conversion
  • Simple calculations
  • Data validation
  • Content generation

2. A2A Broadcast Processing

One agent broadcasts a task to multiple potential processors using A2A context-aware routing:

Agent A ──[A2A Task]──> AgentHub ──[TaskEvent]──> Agent B₁
                                β”œβ”€[TaskEvent]──> Agent Bβ‚‚
                                └─[TaskEvent]──> Agent B₃

A2A Implementation:

// Broadcast task with shared context
task := &a2a.Task{
    Id:        "broadcast_task_456",
    ContextId: "parallel_processing_context",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{
                {Part: &a2a.Part_Text{Text: "Process data chunk"}},
                {Part: &a2a.Part_Data{Data: &a2a.DataPart{Data: chunkData}}},
            },
        },
    },
}

// Publish without specific target (broadcast)
client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
    Task: task,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "orchestrator",
        // No ToAgentId = broadcast
        EventType: "task.broadcast",
    },
})

Use cases:

  • Distributed computation
  • Load testing
  • Content distribution
  • Parallel processing

3. A2A Pipeline Processing

Tasks flow through a series of specialized agents using shared A2A context:

Agent A ──[A2A Task₁]──> Agent B ──[A2A Taskβ‚‚]──> Agent C ──[A2A Task₃]──> Agent D
       <──[Final Artifact]β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

A2A Implementation:

// Shared context for pipeline
pipelineContext := "data_pipeline_" + uuid.New().String()

// Stage 1: Data extraction
task1 := &a2a.Task{
    Id:        "extract_" + uuid.New().String(),
    ContextId: pipelineContext,
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Extract data from source"}}},
        },
    },
}

// Stage 2: Data transformation (triggered by Stage 1 completion)
task2 := &a2a.Task{
    Id:        "transform_" + uuid.New().String(),
    ContextId: pipelineContext, // Same context
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Transform extracted data"}}},
        },
    },
}

// Context linking enables pipeline coordination
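
How stage 2 gets triggered is left to the orchestrator; a minimal sketch, assuming the orchestrator watches completion events for the shared context and a publishNextStage helper exists:

func (o *Orchestrator) onTaskCompleted(ctx context.Context, event *pb.AgentEvent) {
    task := event.GetTask()
    if task == nil || task.ContextId != pipelineContext {
        return // not part of this pipeline
    }
    if task.Status.State == a2a.TaskState_TASK_STATE_COMPLETED {
        o.publishNextStage(ctx, task) // e.g. extraction done -> publish transform
    }
}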

Use cases:

  • Data processing pipelines
  • Image processing workflows
  • Document processing chains
  • ETL operations

4. A2A Hierarchical Decomposition

Complex tasks are broken down into subtasks using A2A context hierarchy:

Agent A ──[A2A ComplexTask]──> Coordinator
                                  β”œβ”€β”€[A2A SubTask₁]──> Specialist₁
                                  β”œβ”€β”€[A2A SubTaskβ‚‚]──> Specialistβ‚‚
                                  └──[A2A SubTask₃]──> Specialist₃

A2A Implementation:

// Parent task
parentTask := &a2a.Task{
    Id:        "complex_analysis_789",
    ContextId: "business_workflow_123",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Perform comprehensive business analysis"}}},
        },
    },
}

// Coordinator creates subtasks with hierarchical context
// (Metadata is a *structpb.Struct; NewStruct returns a value and an error)
subtask1Meta, _ := structpb.NewStruct(map[string]interface{}{
    "parent_task_id": "complex_analysis_789",
    "subtask_type":   "financial",
})
subtask1 := &a2a.Task{
    Id:        "financial_analysis_790",
    ContextId: "business_workflow_123", // Same parent context
    Metadata:  subtask1Meta,
}

subtask2Meta, _ := structpb.NewStruct(map[string]interface{}{
    "parent_task_id": "complex_analysis_789",
    "subtask_type":   "market",
})
subtask2 := &a2a.Task{
    Id:        "market_analysis_791",
    ContextId: "business_workflow_123", // Same parent context
    Metadata:  subtask2Meta,
}

// Context enables coordination and result aggregation

Use cases:

  • Complex business workflows
  • Multi-step analysis
  • Orchestrated services
  • Batch job coordination

5. Competitive Processing

Multiple agents compete to handle the same task (first-come-first-served):

Agent A ──[Task]──> Broker ──[Task]──> Agent B₁ (accepts)
                           β”œβ”€[Task]──> Agent Bβ‚‚ (rejects)
                           └─[Task]──> Agent B₃ (rejects)
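
A sketch of the acceptance logic, assuming agents race to move the task out of SUBMITTED and reuse the updateTaskStatus helper shown earlier:

func (a *Agent) tryClaimTask(task *a2a.Task) bool {
    if task.Status.State != a2a.TaskState_TASK_STATE_SUBMITTED {
        return false // another agent claimed the task first
    }
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Claimed by "+a.agentId)
    return true
}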

Use cases:

  • Resource-constrained environments
  • Load balancing
  • Fault tolerance
  • Performance optimization

A2A Task Content and Semantics

A2A Message-Based Classification

In A2A, task classification is handled through message content rather than rigid type fields, providing more flexibility:

Content-Based Classification

// Data processing task
message := &a2a.Message{
    Content: []*a2a.Part{
        {Part: &a2a.Part_Text{Text: "Analyze quarterly sales data for trends"}},
        {Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Analysis parameters"}}},
    },
}

// Image processing task
message := &a2a.Message{
    Content: []*a2a.Part{
        {Part: &a2a.Part_Text{Text: "Generate product image with specifications"}},
        {Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Image requirements"}}},
    },
}

// Notification task
message := &a2a.Message{
    Content: []*a2a.Part{
        {Part: &a2a.Part_Text{Text: "Send completion notification to user"}},
        {Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Notification details"}}},
    },
}

Operation-Based Classification

create.*        - Creation operations
update.*        - Modification operations
delete.*        - Removal operations
analyze.*       - Analysis operations
transform.*     - Transformation operations

Complexity-Based Classification

simple.*        - Quick, low-resource tasks
standard.*      - Normal processing tasks
complex.*       - Resource-intensive tasks
background.*    - Long-running batch tasks
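
These labels can drive handler selection on the subscriber side; a minimal prefix-routing sketch (label extraction and the handler methods are assumptions):

func (a *Agent) routeByClassification(ctx context.Context, label string, task *a2a.Task) error {
    switch {
    case strings.HasPrefix(label, "create."):
        return a.handleCreate(ctx, task)
    case strings.HasPrefix(label, "analyze."):
        return a.handleAnalyze(ctx, task)
    case strings.HasPrefix(label, "background."):
        return a.enqueueBackground(ctx, task)
    default:
        return fmt.Errorf("unsupported classification: %s", label)
    }
}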

A2A Content Design Guidelines

Be Explicit: Include all information needed for execution in structured Parts

// Good: Explicit A2A content
// (structpb.NewStruct returns a value and an error)
conversionParams, _ := structpb.NewStruct(map[string]interface{}{
    "source_format":   "csv",
    "target_format":   "json",
    "include_headers": true,
    "delimiter":       ",",
    "encoding":        "utf-8",
})

content := []*a2a.Part{
    {
        Part: &a2a.Part_Text{
            Text: "Convert CSV file to JSON format with specific options",
        },
    },
    {
        Part: &a2a.Part_Data{
            Data: &a2a.DataPart{
                Data:        conversionParams,
                Description: "Conversion parameters",
            },
        },
    },
    {
        Part: &a2a.Part_File{
            File: &a2a.FilePart{
                FileId:   "source_data.csv",
                Filename: "data.csv",
                MimeType: "text/csv",
            },
        },
    },
}

// Poor: Ambiguous A2A content
content := []*a2a.Part{
    {
        Part: &a2a.Part_Text{
            Text: "Convert file", // Too vague
        },
    },
}

Use Standard Data Types: Leverage common formats for interoperability

// Good: Standard formats
{
  "timestamp": "2024-01-15T10:30:00Z",      // ISO 8601
  "amount": "123.45",                        // String for precision
  "coordinates": {"lat": 40.7128, "lng": -74.0060}
}

Include Validation Information: Help agents validate inputs

{
  "email": "user@example.com",
  "email_format": "rfc5322",
  "max_length": 254,
  "required": true
}

A2A Error Handling and Edge Cases

A2A Task Rejection

Agents should provide meaningful rejection reasons using A2A message format:

func (a *Agent) rejectA2ATask(task *a2a.Task, reason string) {
    // Structured rejection details (structpb.NewStruct returns a value and an error)
    details, _ := structpb.NewStruct(map[string]interface{}{
        "rejection_reason": reason,
        "agent_id":         a.agentId,
        "timestamp":        time.Now().Unix(),
    })

    // Create rejection message
    rejectionMessage := &a2a.Message{
        MessageId: "msg_" + uuid.New().String(),
        TaskId:    task.Id,
        Role:      a2a.Role_AGENT,
        Content: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: "Task rejected: " + reason,
                },
            },
            {
                Part: &a2a.Part_Data{
                    Data: &a2a.DataPart{
                        Data:        details,
                        Description: "Rejection details",
                    },
                },
            },
        },
    }

    // Update task status to failed
    task.Status = &a2a.TaskStatus{
        State:     a2a.TaskState_TASK_STATE_FAILED,
        Update:    rejectionMessage,
        Timestamp: timestamppb.Now(),
    }

    a.publishTaskUpdate(task)
}

Common rejection reasons:

  • UNSUPPORTED_TASK_TYPE: Agent doesn’t handle this task type
  • CAPACITY_EXCEEDED: Agent is at maximum capacity
  • DEADLINE_IMPOSSIBLE: Cannot complete within deadline
  • INVALID_PARAMETERS: Task parameters are malformed
  • RESOURCE_UNAVAILABLE: Required external resources unavailable
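
Centralizing these codes as shared constants keeps agents and requesters consistent (names are illustrative):

const (
    RejectUnsupportedTaskType = "UNSUPPORTED_TASK_TYPE"
    RejectCapacityExceeded    = "CAPACITY_EXCEEDED"
    RejectDeadlineImpossible  = "DEADLINE_IMPOSSIBLE"
    RejectInvalidParameters   = "INVALID_PARAMETERS"
    RejectResourceUnavailable = "RESOURCE_UNAVAILABLE"
)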

Timeout Handling

Both requesters and processors should handle timeouts gracefully:

// Requester timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

select {
case result := <-resultChannel:
    // Process result
case <-ctx.Done():
    // Handle timeout - possibly retry or fail
}

// Processor timeout
func (a *Agent) executeWithTimeout(task *pb.TaskMessage) {
    deadline := task.GetDeadline().AsTime()
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()

    select {
    case result := <-a.processTask(ctx, task):
        a.publishResult(task, result, pb.TaskStatus_TASK_STATUS_COMPLETED, "") // no error detail on success
    case <-ctx.Done():
        a.publishResult(task, nil, pb.TaskStatus_TASK_STATUS_FAILED, "Deadline exceeded")
    }
}

Partial Results

For long-running tasks, consider supporting partial results:

type PartialResult struct {
    TaskId          string
    CompletedPortion float64    // 0.0 to 1.0
    IntermediateData interface{}
    CanResume       bool
    ResumeToken     string
}
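
One way to surface partial progress is as a WORKING status update, reusing the updateTaskStatus helper shown earlier (the message format is an assumption):

func (a *Agent) reportPartialResult(task *a2a.Task, pr PartialResult) {
    msg := fmt.Sprintf("Partial result: %.0f%% complete (resumable=%v)",
        pr.CompletedPortion*100, pr.CanResume)
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, msg)
}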

Best Practices

Task Design

  1. Make task types granular but not too fine-grained
  2. Design for idempotency when possible (see the sketch after this list)
  3. Include retry information in metadata
  4. Use consistent parameter naming across similar task types
  5. Version your task schemas to enable evolution
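
A minimal idempotency sketch for point 2, assuming a deduplication store keyed by A2A task ID (the processedStore interface is an assumption):

func (a *Agent) processOnce(ctx context.Context, task *a2a.Task) error {
    if a.processedStore.Seen(task.Id) {
        return nil // duplicate delivery; safe to skip without reprocessing
    }
    if err := a.process(ctx, task); err != nil {
        return err
    }
    return a.processedStore.MarkSeen(task.Id)
}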

Performance Considerations

  1. Batch related tasks when appropriate
  2. Use appropriate priority levels to avoid starvation
  3. Set realistic deadlines based on historical performance
  4. Include resource hints to help with scheduling
  5. Monitor task completion rates to identify bottlenecks

Security Considerations

  1. Validate all task parameters before processing (see the sketch after this list)
  2. Sanitize user-provided data in task parameters
  3. Include authorization context in metadata
  4. Log task execution for audit trails
  5. Encrypt sensitive parameters when necessary
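
A minimal validation sketch for point 1, reusing the CSV conversion example (the required fields are illustrative):

func validateConversionParams(params *structpb.Struct) error {
    if params == nil {
        return fmt.Errorf("missing parameters")
    }
    for _, required := range []string{"source_format", "target_format"} {
        if _, ok := params.Fields[required]; !ok {
            return fmt.Errorf("%s is required", required)
        }
    }
    return nil
}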

A2A tasks form the foundation of Agent2Agent communication, enabling sophisticated distributed processing patterns through structured messages, artifacts, and context-aware coordination. The A2A protocol’s flexible message format and EDA integration provide robust, scalable agent networks with clear semantics and strong observability. Proper A2A task design leverages the protocol’s strengths for building maintainable, interoperable agent systems.

3 - Features

Deep explanations of AgentHub’s key features and capabilities

Feature Explanations

Detailed explanations of AgentHub’s advanced features, their design rationale, and implementation details.

Available Documentation

3.1 - Distributed Tracing & OpenTelemetry

Deep dive into distributed tracing concepts, OpenTelemetry architecture, and how AgentHub implements comprehensive observability for event-driven systems.

πŸ” Distributed Tracing & OpenTelemetry

Understanding-oriented: Deep dive into distributed tracing concepts, OpenTelemetry architecture, and how AgentHub implements comprehensive observability for event-driven systems.

The Problem: Observing Distributed Systems

Traditional monolithic applications are relatively easy to debugβ€”everything happens in one process, on one machine, with one log file. But modern event-driven architectures like AgentHub present unique challenges:

The Complexity of Event-Driven Systems

Request Flow in AgentHub:
User β†’ Publisher Agent β†’ AgentHub Broker β†’ Subscriber Agent β†’ Result β†’ Publisher Agent

Each step involves:

  • Different processes (potentially on different machines)
  • Asynchronous communication (events, not direct calls)
  • Multiple protocol layers (gRPC, HTTP, network)
  • Independent failure modes (network partitions, service crashes)
  • Varying performance characteristics (CPU, memory, I/O)

Traditional Debugging Challenges

Without distributed tracing:

Publisher logs:   "Published task task_123 at 10:00:01"
Broker logs:     "Received task from agent_pub at 10:00:01"
                 "Routed task to agent_sub at 10:00:01"
Subscriber logs: "Processing task task_456 at 10:00:02"
                 "Completed task task_789 at 10:00:03"

Questions you can’t answer:

  • Which subscriber processed task_123?
  • How long did task_123 take end-to-end?
  • Where did task_123 fail?
  • What was the complete flow for a specific request?

The Solution: Distributed Tracing

Distributed tracing solves these problems by creating a unified view of requests as they flow through multiple services.

Core Concepts

Trace

A trace represents a complete request journey through the system. In AgentHub, a trace might represent:

  • Publishing a task
  • Processing the task
  • Publishing the result
  • Receiving the result
Trace ID: a1b2c3d4e5f67890
Duration: 150ms
Services: 3 (publisher, broker, subscriber)
Spans: 5
Status: Success

Span

A span represents a single operation within a trace. Each span has:

  • Name: What operation it represents
  • Start/End time: When it happened
  • Tags: Metadata about the operation
  • Logs: Events that occurred during the operation
  • Status: Success, error, or timeout
Span: "publish_event"
  Service: agenthub-publisher
  Duration: 25ms
  Tags:
    event.type: "greeting"
    event.id: "task_123"
    responder.agent: "agent_demo_subscriber"
  Status: OK

Span Context

The glue that connects spans across service boundaries. Contains:

  • Trace ID: Unique identifier for the entire request
  • Span ID: Unique identifier for the current operation
  • Trace Flags: Sampling decisions, debug mode, etc.
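
All three fields can be read from any context carrying an active span (go.opentelemetry.io/otel/trace):

sc := trace.SpanFromContext(ctx).SpanContext()
fmt.Println(sc.TraceID(), sc.SpanID(), sc.IsSampled())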

How Tracing Works in AgentHub

1. Trace Initiation

When a publisher creates a task, it starts a new trace:

// Publisher starts a trace
ctx, span := tracer.Start(ctx, "publish_event")
defer span.End()

// Add metadata
span.SetAttributes(
    attribute.String("event.type", "greeting"),
    attribute.String("event.id", taskID),
)

2. Context Propagation

The trace context is injected into the task metadata:

// Inject trace context into task headers
headers := make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))

// Embed headers in task metadata
task.Metadata = &structpb.Struct{
    Fields: map[string]*structpb.Value{
        "trace_headers": structpb.NewStructValue(&structpb.Struct{
            Fields: stringMapToStructFields(headers),
        }),
    },
}

3. Context Extraction

The broker and subscriber extract the trace context:

// Extract trace context from task metadata
if metadata := task.GetMetadata(); metadata != nil {
    if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
        headers := structFieldsToStringMap(traceHeaders.GetStructValue().Fields)
        ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(headers))
    }
}

// Continue the trace
ctx, span := tracer.Start(ctx, "process_event")
defer span.End()
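
The conversion helpers referenced in these snippets translate between plain string maps and structpb fields; minimal sketches:

func stringMapToStructFields(m map[string]string) map[string]*structpb.Value {
    fields := make(map[string]*structpb.Value, len(m))
    for k, v := range m {
        fields[k] = structpb.NewStringValue(v)
    }
    return fields
}

func structFieldsToStringMap(fields map[string]*structpb.Value) map[string]string {
    m := make(map[string]string, len(fields))
    for k, v := range fields {
        m[k] = v.GetStringValue()
    }
    return m
}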

4. Complete Request Flow

The result is a complete trace showing the entire request journey:

Trace: a1b2c3d4e5f67890
β”œβ”€β”€ publish_event (agenthub-publisher) [25ms]
β”‚   β”œβ”€β”€ event.type: greeting
β”‚   └── event.id: task_123
β”œβ”€β”€ route_task (agenthub-broker) [2ms]
β”‚   β”œβ”€β”€ source.agent: agent_demo_publisher
β”‚   └── target.agent: agent_demo_subscriber
β”œβ”€β”€ consume_event (agenthub-subscriber) [5ms]
β”‚   └── messaging.operation: receive
β”œβ”€β”€ process_task (agenthub-subscriber) [98ms]
β”‚   β”œβ”€β”€ task.type: greeting
β”‚   β”œβ”€β”€ task.parameter.name: Claude
β”‚   └── processing.status: completed
└── publish_result (agenthub-subscriber) [20ms]
    └── result.status: success

OpenTelemetry Architecture

OpenTelemetry is the observability framework that powers AgentHub’s tracing implementation.

The OpenTelemetry Stack

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Applications                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚  Publisher  β”‚ β”‚   Broker    β”‚ β”‚ Subscriber  β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜
                  β”‚               β”‚               β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”
β”‚              OpenTelemetry SDK                        β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚   Tracer    β”‚ β”‚    Meter    β”‚ β”‚   Logger    β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                  β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚            OpenTelemetry Collector                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚  Receivers  β”‚ β”‚ Processors  β”‚ β”‚  Exporters  β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜
                  β”‚               β”‚               β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”
β”‚      Jaeger           β”‚ β”‚  Prometheus   β”‚ β”‚   Logs    β”‚
β”‚   (Tracing)           β”‚ β”‚  (Metrics)    β”‚ β”‚(Logging)  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Core Components

Tracer

Creates and manages spans:

tracer := otel.Tracer("agenthub-publisher")
ctx, span := tracer.Start(ctx, "publish_event")
defer span.End()

Meter

Creates and manages metrics:

meter := otel.Meter("agenthub-publisher")
counter, _ := meter.Int64Counter("events_published_total")
counter.Add(ctx, 1)

Propagators

Handle context propagation across service boundaries:

// Inject context
otel.GetTextMapPropagator().Inject(ctx, carrier)

// Extract context
ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)

Exporters

Send telemetry data to backend systems:

  • OTLP Exporter: Sends to OpenTelemetry Collector
  • Jaeger Exporter: Sends directly to Jaeger
  • Prometheus Exporter: Exposes metrics for Prometheus

AgentHub’s OpenTelemetry Implementation

Configuration

func NewObservability(config Config) (*Observability, error) {
    ctx := context.Background()

    // Create resource (service identification)
    res, err := resource.New(ctx,
        resource.WithAttributes(
            semconv.ServiceName(config.ServiceName),
            semconv.ServiceVersion(config.ServiceVersion),
        ),
    )

    // Setup tracing
    traceExporter, err := otlptracegrpc.New(ctx,
        otlptracegrpc.WithEndpoint(config.JaegerEndpoint),
        otlptracegrpc.WithInsecure(),
    )

    tracerProvider := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(traceExporter),
        sdktrace.WithResource(res),
        sdktrace.WithSampler(sdktrace.AlwaysSample()),
    )

    otel.SetTracerProvider(tracerProvider)

    // Setup metrics (promExporter construction via the Prometheus exporter is elided)
    meterProvider := sdkmetric.NewMeterProvider(
        sdkmetric.WithResource(res),
        sdkmetric.WithReader(promExporter),
    )

    otel.SetMeterProvider(meterProvider)
}

Custom slog Handler Integration

AgentHub’s custom logging handler automatically correlates logs with traces:

func (h *ObservabilityHandler) Handle(ctx context.Context, r slog.Record) error {
    // Structured log output with base fields
    logData := map[string]interface{}{
        "time":    r.Time.Format(time.RFC3339),
        "level":   r.Level.String(),
        "msg":     r.Message,
        "service": h.serviceName,
    }

    // Extract trace context for log/trace correlation
    if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
        spanCtx := span.SpanContext()
        logData["trace_id"] = spanCtx.TraceID().String()
        logData["span_id"] = spanCtx.SpanID().String()
    }

    // ... encode and emit logData
    return nil
}

Observability Patterns in Event-Driven Systems

Pattern 1: Event Correlation

Challenge: Correlating events across async boundaries
Solution: Inject trace context into event metadata

// Publisher injects context
headers := make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))
event.Metadata["trace_headers"] = headers

// Consumer extracts context
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(event.Metadata["trace_headers"]))

Pattern 2: Async Operation Tracking

Challenge: Tracking operations that complete asynchronously
Solution: Create child spans that can outlive their parents

// Start async operation
ctx, span := tracer.Start(ctx, "async_operation")

go func() {
    defer span.End()
    // Long-running async work
    processTask()
    span.SetStatus(codes.Ok, "") // Success (go.opentelemetry.io/otel/codes)
}()

// Parent can continue/return immediately

Pattern 3: Error Propagation

Challenge: Understanding how errors flow through the system
Solution: Record errors at each span and propagate error status

if err != nil {
    span.RecordError(err)
    span.SetStatus(codes.Error, err.Error()) // Error status

    // Optionally add error details
    span.SetAttributes(
        attribute.String("error.type", "validation_error"),
        attribute.String("error.message", err.Error()),
    )
}

Pattern 4: Performance Attribution

Challenge: Understanding where time is spent in complex flows
Solution: Detailed span hierarchy with timing

// High-level operation
ctx, span := tracer.Start(ctx, "process_task")
defer span.End()

// Sub-operations
ctx, validateSpan := tracer.Start(ctx, "validate_input")
// ... validation logic
validateSpan.End()

ctx, computeSpan := tracer.Start(ctx, "compute_result")
// ... computation logic
computeSpan.End()

ctx, persistSpan := tracer.Start(ctx, "persist_result")
// ... persistence logic
persistSpan.End()

Benefits of AgentHub’s Observability Implementation

1. Complete Request Visibility

  • See every step of event processing
  • Understand inter-service dependencies
  • Track request flows across async boundaries

2. Performance Analysis

  • Identify bottlenecks in event processing
  • Understand where time is spent
  • Optimize critical paths

3. Error Diagnosis

  • Pinpoint exactly where failures occur
  • Understand error propagation patterns
  • Correlate errors with system state

4. Capacity Planning

  • Understand system throughput characteristics
  • Identify scaling bottlenecks
  • Plan resource allocation

5. Troubleshooting

  • Correlate logs, metrics, and traces
  • Understand system behavior under load
  • Debug complex distributed issues

Advanced Tracing Concepts

Sampling

Not every request needs to be traced. Sampling reduces overhead:

// Probability sampling (trace 10% of requests, honoring the parent's decision)
sdktrace.WithSampler(sdktrace.ParentBased(
    sdktrace.TraceIDRatioBased(0.1),
))

// The Go SDK ships no built-in rate-limiting sampler; enforce per-second
// limits in the OpenTelemetry Collector (e.g. tail sampling) or with a
// custom Sampler implementation.

Custom Attributes

Add business context to spans:

span.SetAttributes(
    attribute.String("user.id", userID),
    attribute.String("tenant.id", tenantID),
    attribute.Int("batch.size", len(items)),
    attribute.String("workflow.type", workflowType),
)

Span Events

Add timestamped events within spans:

span.AddEvent("validation.started")
// ... validation logic
span.AddEvent("validation.completed", trace.WithAttributes(
    attribute.Int("validation.rules.evaluated", ruleCount),
))

Baggage

Propagate key-value pairs across the entire trace:

// Set baggage (go.opentelemetry.io/otel/baggage)
tier, _ := baggage.NewMember("user.tier", "premium")
flag, _ := baggage.NewMember("feature.flag", "new_algorithm")
bag, _ := baggage.New(tier, flag)
ctx = baggage.ContextWithBaggage(ctx, bag)

// Read baggage in any service
if member := baggage.FromContext(ctx).Member("user.tier"); member.Value() == "premium" {
    // Use premium algorithm
}

Performance Considerations

Overhead Analysis

AgentHub’s observability adds:

  • CPU: ~5% overhead for tracing
  • Memory: ~50MB per service for buffers and metadata
  • Network: Minimal (async batched export)
  • Latency: ~10ms additional end-to-end latency

Optimization Strategies

  1. Sampling: Reduce trace volume for high-throughput systems
  2. Batching: Export spans in batches to reduce network overhead (sketched after this list)
  3. Async Processing: Never block business logic for observability
  4. Resource Limits: Use memory limiters in the collector
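
Strategies 1 and 2 are both configured on the tracer provider; a sketch using the Go SDK's batch span processor options (values are illustrative):

tp := sdktrace.NewTracerProvider(
    sdktrace.WithBatcher(traceExporter,
        sdktrace.WithMaxExportBatchSize(512),
        sdktrace.WithBatchTimeout(5*time.Second),
    ),
    sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.1))),
)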

Production Recommendations

  • Enable sampling for high-volume systems
  • Monitor collector performance and scale horizontally if needed
  • Set retention policies for traces and metrics
  • Use dedicated infrastructure for observability stack

Troubleshooting Common Issues

Missing Traces

Symptoms: No traces appear in Jaeger
Causes:

  • Context not propagated correctly
  • Exporter configuration issues
  • Collector connectivity problems

Debugging:

# Check if spans are being created
curl http://localhost:8080/metrics | grep trace

# Check collector logs
docker-compose logs otel-collector

# Verify Jaeger connectivity
curl http://localhost:16686/api/traces

Broken Trace Chains

Symptoms: Spans appear disconnected
Causes:

  • Context not extracted properly
  • New context created instead of continuing existing

Debugging:

// Always check if context contains active span
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
    fmt.Printf("Active trace: %s\n", span.SpanContext().TraceID())
} else {
    fmt.Println("No active trace context")
}

High Memory Usage

Symptoms: Observability causing OOM errors
Causes:

  • Too many spans in memory
  • Large span attributes
  • Export failures causing backlog

Solutions:

// Configure span limits to bound per-span memory
tp := sdktrace.NewTracerProvider(
    sdktrace.WithSpanLimits(sdktrace.SpanLimits{
        AttributeCountLimit: 128,
        EventCountLimit:     128,
        LinkCountLimit:      128,
    }),
)

The Future of Observability

  1. eBPF-based Observability: Automatic instrumentation without code changes
  2. AI-Powered Analysis: Automatic anomaly detection and root cause analysis
  3. Unified Observability: Single pane of glass for metrics, traces, logs, and profiles
  4. Real-time Alerting: Faster detection and response to issues

OpenTelemetry Roadmap

  • Profiling: Continuous profiling integration
  • Client-side Observability: Browser and mobile app tracing
  • Database Instrumentation: Automatic query tracing
  • Infrastructure Correlation: Link application traces to infrastructure metrics

Conclusion

Distributed tracing transforms debugging from guesswork into precise investigation. AgentHub’s implementation with OpenTelemetry provides:

  • Complete visibility into event-driven workflows
  • Performance insights for optimization
  • Error correlation for faster resolution
  • Business context through custom attributes

The investment in observability pays dividends in:

  • Reduced MTTR (Mean Time To Resolution)
  • Improved performance through data-driven optimization
  • Better user experience through proactive monitoring
  • Team productivity through better tooling

🎯 Ready to Implement?

Hands-on: Observability Demo Tutorial

Production: Add Observability to Your Agent

Deep Dive: Observability Architecture

3.2 - Architecture Evolution: From Build Tags to Unified Abstractions

Understanding AgentHub’s evolution from build tag-based conditional compilation to unified abstractions with built-in observability.

πŸ”„ Architecture Evolution: From Build Tags to Unified Abstractions

Understanding-oriented: Learn how AgentHub evolved from build tag-based conditional compilation to a unified abstraction approach that dramatically simplifies development while providing comprehensive observability.

The Journey: Why AgentHub Moved Away from Build Tags

Legacy Approach: Build Tags for Conditional Features

AgentHub originally used Go build tags to handle different deployment scenarios:

  • Development: Fast builds with minimal features (go build)
  • Production: Full observability builds (go build -tags observability)
  • Testing: Lightweight versions for testing environments

Problems with Build Tags:

  • Maintenance overhead: Separate code paths for different builds
  • Testing complexity: Hard to ensure feature parity across variants
  • Developer experience: Multiple build commands and configurations
  • Binary complexity: Different feature sets in different binaries

Modern Solution: Unified Abstractions

AgentHub now uses a unified abstraction layer (internal/agenthub/) that provides:

  • Single codebase: No more separate files for different builds
  • Built-in observability: Always available, configured via environment
  • Simplified development: One build command, one binary
  • Runtime configuration: Features controlled by environment variables

The New Architecture

Core Components

The unified abstraction provides these key components:

1. AgentHubServer

// Single server implementation with built-in observability
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
    return err
}

// Automatic OpenTelemetry, metrics, health checks
err = server.Start(ctx)

2. AgentHubClient

// Single client implementation with built-in observability
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
    return err
}

// Automatic tracing, metrics, structured logging
err = client.Start(ctx)

3. TaskPublisher & TaskSubscriber

// High-level abstractions with automatic correlation
publisher := &agenthub.TaskPublisher{
    Client: client.Client,
    TraceManager: client.TraceManager,
    // Built-in observability
}

subscriber := agenthub.NewTaskSubscriber(client, agentID)
// Automatic task processing with tracing

Before vs After Comparison

Old Build Tag Approach

File Structure (Legacy):

agents/publisher/
β”œβ”€β”€ main.go                 # Basic version (~200 lines)
β”œβ”€β”€ main_observability.go   # Observable version (~380 lines)
β”œβ”€β”€ shared.go              # Common code
└── config.go              # Configuration

broker/
β”œβ”€β”€ main.go                 # Basic broker (~150 lines)
β”œβ”€β”€ main_observability.go   # Observable broker (~300 lines)
└── server.go              # Core logic

Build Commands (Legacy):

# Basic build
go build -o bin/publisher agents/publisher/

# Observable build
go build -tags observability -o bin/publisher-obs agents/publisher/

# Testing observable features
go test -tags observability ./...

New Unified Approach

File Structure (Current):

agents/publisher/
└── main.go                 # Single implementation (~50 lines)

agents/subscriber/
└── main.go                 # Single implementation (~60 lines)

broker/
└── main.go                 # Single implementation (~30 lines)

internal/agenthub/          # Unified abstraction layer
β”œβ”€β”€ grpc.go                # Client/server with observability
β”œβ”€β”€ subscriber.go          # Task processing abstractions
β”œβ”€β”€ broker.go             # Event bus implementation
└── metadata.go           # Correlation and metadata

Build Commands (Current):

# Single build for all use cases
go build -o bin/publisher agents/publisher/
go build -o bin/subscriber agents/subscriber/
go build -o bin/broker broker/

# Testing (no special tags needed)
go test ./...

Configuration Evolution

Environment-Based Configuration

Instead of build tags, features are now controlled via environment variables:

# Observability configuration
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export OTEL_SERVICE_NAME="agenthub"
export OTEL_SERVICE_VERSION="1.0.0"

# Health and metrics ports
export BROKER_HEALTH_PORT="8080"

# Broker connection
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"

Automatic Feature Detection

The unified abstractions automatically configure features based on environment:

// Observability is automatically configured
config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)

// If JAEGER_ENDPOINT is set β†’ tracing enabled
// If BROKER_HEALTH_PORT is set β†’ health server enabled
// Always includes structured logging and basic metrics

Benefits of the New Architecture

1. Developer Experience

  • Single build command: No more tag confusion
  • Consistent behavior: Same binary for all environments
  • Easier testing: No need for multiple test runs
  • Simplified CI/CD: One build pipeline

2. Maintenance Reduction

  • 90% less code: From 380+ lines to 29 lines for broker
  • Single code path: No more duplicate implementations
  • Unified testing: Test once, works everywhere
  • Automatic features: Observability included by default

3. Operational Benefits

  • Runtime configuration: Change behavior without rebuilding
  • Consistent deployment: Same binary across environments
  • Better observability: Always available when needed
  • Easier debugging: Full context always present

Migration Guide

For users migrating from the old build tag approach:

Old Commands β†’ New Commands

# OLD: Basic builds
go build -o bin/publisher agents/publisher/
# NEW: Same command (unchanged)
go build -o bin/publisher agents/publisher/

# OLD: Observable builds
go build -tags observability -o bin/publisher-obs agents/publisher/
# NEW: Same binary, configure via environment
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
go build -o bin/publisher agents/publisher/

# OLD: Testing with tags
go test -tags observability ./...
# NEW: Standard testing
go test ./...

Configuration Migration

# OLD: Feature controlled by build tags
go build -tags observability

# NEW: Feature controlled by environment
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export OTEL_SERVICE_NAME="my-service"

Architecture Philosophy

From Compile-Time to Runtime

The move from build tags to unified abstractions represents a fundamental shift:

Build Tags Philosophy (Old):

  • “Choose features at compile time”
  • “Different binaries for different needs”
  • “Minimize what’s included”

Unified Abstractions Philosophy (New):

  • “Include everything, configure at runtime”
  • “One binary, many configurations”
  • “Maximize developer experience”

Why This Change?

  1. Cloud-Native Reality: Modern deployments use containers with environment-based config
  2. Developer Productivity: Unified approach eliminates confusion and errors
  3. Testing Simplicity: One code path means reliable testing
  4. Operational Excellence: Runtime configuration enables better operations

Performance Considerations

Resource Impact

The unified approach has minimal overhead:

Binary Size:
- Old basic: ~8MB
- Old observable: ~15MB
- New unified: ~12MB

Memory Usage:
- Baseline: ~10MB
- With observability: ~15MB (when enabled)
- Without observability: ~10MB (minimal overhead)

Startup Time:
- With observability enabled: ~150ms
- With observability disabled: ~50ms

Optimization Strategy

The abstractions use lazy initialization:

// Observability components only initialize if configured
if config.JaegerEndpoint != "" {
    // Initialize tracing
}

if config.HealthPort != "" {
    // Start health server
}

// Always minimal logging and basic metrics

Future Evolution

Planned Enhancements

  1. Plugin Architecture: Dynamic feature loading
  2. Configuration Profiles: Predefined environment sets
  3. Feature Flags: Runtime feature toggling
  4. Auto-Configuration: Intelligent environment detection

Compatibility Promise

The unified abstractions maintain backward compatibility:

  • Old environment variables still work
  • Gradual migration path available
  • No breaking changes in core APIs

This architectural evolution demonstrates how AgentHub prioritizes developer experience and operational simplicity while maintaining full observability capabilities. The move from build tags to unified abstractions represents a maturation of the platform toward cloud-native best practices.

3.3 - Performance and Scaling Considerations

Explore the performance characteristics of AgentHub, scaling patterns, and optimization strategies for different deployment scenarios.

Performance and Scaling Considerations

This document explores the performance characteristics of AgentHub, scaling patterns, and optimization strategies for different deployment scenarios.

Performance Characteristics

Baseline Performance Metrics

Test Environment:

  • 4-core Intel i7 processor
  • 16GB RAM
  • Local network (localhost)
  • Go 1.24

Measured Performance:

  • Task throughput: 8,000-12,000 tasks/second
  • Task routing latency: 0.1-0.5ms average
  • End-to-end latency: 2-10ms (including processing)
  • Memory per agent: ~1KB active subscription state
  • Concurrent agents: 1,000+ agents per broker instance

Performance Factors

1. Task Routing Performance

Task routing is the core performance bottleneck in AgentHub:

// Fast path: Direct agent routing
if responderID := req.GetTask().GetResponderAgentId(); responderID != "" {
    if subs, ok := s.taskSubscribers[responderID]; ok {
        targetChannels = subs  // O(1) lookup
    }
}

Optimization factors:

  • Direct routing: O(1) lookup time for targeted tasks
  • Broadcast routing: O(n) where n = number of subscribed agents
  • Channel delivery: Concurrent delivery via goroutines
  • Lock contention: Read locks allow concurrent routing

2. Message Serialization

Protocol Buffers provide efficient serialization:

  • Binary encoding: ~60% smaller than JSON
  • Zero-copy operations: Direct memory mapping where possible
  • Schema evolution: Backward/forward compatibility
  • Type safety: Compile-time validation

3. Memory Usage Patterns

// Memory usage breakdown per agent:
type agentMemoryFootprint struct {
    SubscriptionState    int // ~200 bytes (map entry + channel)
    ChannelBuffer       int // ~800 bytes (10 message buffer * 80 bytes avg)
    ConnectionOverhead  int // ~2KB (gRPC stream state)
    // Total: ~3KB per active agent
}

Memory optimization strategies:

  • Bounded channels: Prevent unbounded growth
  • Connection pooling: Reuse gRPC connections
  • Garbage collection: Go’s GC handles cleanup automatically

Scaling Patterns

Vertical Scaling (Scale Up)

Increasing resources on a single broker instance:

CPU Scaling

  • Multi-core utilization: Go’s runtime leverages multiple cores
  • Goroutine efficiency: Lightweight concurrency (2KB stack)
  • CPU-bound operations: Message serialization, routing logic
// Configure for CPU optimization
export GOMAXPROCS=8  // Match available CPU cores

Memory Scaling

  • Linear growth: Memory usage scales with number of agents
  • Buffer tuning: Adjust channel buffer sizes based on throughput
// Memory-optimized configuration
subChan := make(chan *pb.TaskMessage, 5)  // Smaller buffers for memory-constrained environments
// vs
subChan := make(chan *pb.TaskMessage, 50) // Larger buffers for high-throughput environments

Network Scaling

  • Connection limits: OS file descriptor limits (ulimit -n)
  • Bandwidth utilization: Protocol Buffers minimize bandwidth usage
  • Connection keepalive: Efficient connection reuse

Horizontal Scaling (Scale Out)

Distributing load across multiple broker instances:

1. Agent Partitioning

Static Partitioning:

Agent Groups:
β”œβ”€β”€ Broker 1: agents_1-1000
β”œβ”€β”€ Broker 2: agents_1001-2000
└── Broker 3: agents_2001-3000

Hash-based Partitioning:

func selectBroker(agentID string) string {
    hash := fnv.New32a()
    hash.Write([]byte(agentID))
    brokerIndex := hash.Sum32() % uint32(len(brokers))
    return brokers[brokerIndex]
}

2. Task Type Partitioning

Specialized Brokers:

Task Routing:
β”œβ”€β”€ Broker 1: data_processing, analytics
β”œβ”€β”€ Broker 2: image_processing, ml_inference
└── Broker 3: notifications, logging

3. Geographic Partitioning

Regional Distribution:

Geographic Deployment:
β”œβ”€β”€ US-East: Broker cluster for East Coast agents
β”œβ”€β”€ US-West: Broker cluster for West Coast agents
└── EU: Broker cluster for European agents

Load Balancing Strategies

1. Round-Robin Agent Distribution

type LoadBalancer struct {
    brokers []string
    current int
    mu      sync.Mutex
}

func (lb *LoadBalancer) NextBroker() string {
    lb.mu.Lock()
    defer lb.mu.Unlock()

    broker := lb.brokers[lb.current]
    lb.current = (lb.current + 1) % len(lb.brokers)
    return broker
}

2. Capacity-Based Routing

type BrokerMetrics struct {
    ActiveAgents int
    TasksPerSec  float64
    CPUUsage     float64
    MemoryUsage  float64
}

func selectBestBroker(brokers []BrokerMetrics) int {
    // Select broker with lowest load score
    bestIndex := 0
    bestScore := calculateLoadScore(brokers[0])

    for i, broker := range brokers[1:] {
        score := calculateLoadScore(broker)
        if score < bestScore {
            bestScore = score
            bestIndex = i + 1
        }
    }
    return bestIndex
}

Performance Optimization Strategies

1. Message Batching

For high-throughput scenarios, implement message batching:

type BatchProcessor struct {
    tasks     []*pb.TaskMessage
    batchSize int
    timeout   time.Duration
    ticker    *time.Ticker
}

func (bp *BatchProcessor) processBatch() {
    batch := make([]*pb.TaskMessage, len(bp.tasks))
    copy(batch, bp.tasks)
    bp.tasks = bp.tasks[:0] // Clear slice

    // Process entire batch
    go bp.routeBatch(batch)
}
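
The sketch above omits enqueueing; a possible Add method that flushes when the batch is full (synchronization is elided for brevity):

func (bp *BatchProcessor) Add(task *pb.TaskMessage) {
    bp.tasks = append(bp.tasks, task)
    if len(bp.tasks) >= bp.batchSize {
        bp.processBatch()
    }
}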

2. Connection Pooling

Optimize gRPC connections for better resource utilization:

type ConnectionPool struct {
    connections map[string]*grpc.ClientConn
    maxConns    int
    mu          sync.RWMutex
}

func (cp *ConnectionPool) GetConnection(addr string) (*grpc.ClientConn, error) {
    cp.mu.RLock()
    if conn, exists := cp.connections[addr]; exists {
        cp.mu.RUnlock()
        return conn, nil
    }
    cp.mu.RUnlock()

    // Create new connection
    return cp.createConnection(addr)
}

3. Adaptive Channel Sizing

Dynamically adjust channel buffer sizes based on load:

func calculateOptimalBufferSize(avgTaskRate float64, processingTime time.Duration) int {
    // Buffer size = rate * processing time + safety margin
    bufferSize := int(avgTaskRate * processingTime.Seconds()) + 10

    // Clamp to reasonable bounds
    if bufferSize < 5 {
        return 5
    }
    if bufferSize > 100 {
        return 100
    }
    return bufferSize
}
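
For example, the computed size can feed directly into channel construction:

// 120 tasks/sec with ~50ms average processing time -> buffer of 16
bufferSize := calculateOptimalBufferSize(120.0, 50*time.Millisecond)
subChan := make(chan *pb.TaskMessage, bufferSize)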

4. Memory Optimization

Reduce memory allocations in hot paths:

// Use sync.Pool for frequent allocations
var taskPool = sync.Pool{
    New: func() interface{} {
        return &pb.TaskMessage{}
    },
}

func processTaskOptimized(task *pb.TaskMessage) {
    // Reuse task objects from the pool
    pooledTask := taskPool.Get().(*pb.TaskMessage)
    defer taskPool.Put(pooledTask)

    // Deep-copy via the proto API; a plain struct copy (*pooledTask = *task)
    // would copy internal message state and fail go vet's copylocks check
    proto.Reset(pooledTask)
    proto.Merge(pooledTask, task)
    // ... processing logic
}

Monitoring and Metrics

Key Performance Indicators (KPIs)

Throughput Metrics

type ThroughputMetrics struct {
    TasksPerSecond     float64
    ResultsPerSecond   float64
    ProgressPerSecond  float64
    MessagesPerSecond  float64
}

Latency Metrics

type LatencyMetrics struct {
    RoutingLatency     time.Duration // Broker routing time
    ProcessingLatency  time.Duration // Agent processing time
    EndToEndLatency    time.Duration // Total task completion time
    P50, P95, P99      time.Duration // Percentile latencies
}

Resource Metrics

type ResourceMetrics struct {
    ActiveAgents       int
    ActiveTasks        int
    MemoryUsage        int64
    CPUUsage           float64
    GoroutineCount     int
    OpenConnections    int
}

Monitoring Implementation

import "github.com/prometheus/client_golang/prometheus"

var (
    taskCounter = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "agenthub_tasks_total",
            Help: "Total number of tasks processed",
        },
        []string{"task_type", "status"},
    )

    latencyHistogram = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "agenthub_task_duration_seconds",
            Help:    "Task processing duration",
            Buckets: prometheus.DefBuckets,
        },
        []string{"task_type"},
    )
)

Scaling Recommendations

Small Deployments (1-100 agents)

  • Single broker instance: Sufficient for most small deployments
  • Vertical scaling: Add CPU/memory as needed
  • Simple monitoring: Basic logging and health checks

Medium Deployments (100-1,000 agents)

  • Load balancing: Implement agent distribution
  • Resource monitoring: Track CPU, memory, and throughput
  • Optimization: Tune channel buffer sizes and timeouts

Large Deployments (1,000+ agents)

  • Horizontal scaling: Multiple broker instances
  • Partitioning strategy: Implement agent or task type partitioning
  • Advanced monitoring: Full metrics and alerting
  • Performance testing: Regular load testing and optimization

High-Throughput Scenarios (10,000+ tasks/second)

  • Message batching: Implement batch processing
  • Connection optimization: Use connection pooling
  • Hardware optimization: SSD storage, high-speed networking
  • Profiling: Regular performance profiling and optimization

Troubleshooting Performance Issues

Common Performance Problems

1. High Latency

Symptoms: Slow task processing times
Causes: Network latency, overloaded agents, inefficient routing
Solutions: Optimize routing, add caching, scale horizontally

2. Memory Leaks

Symptoms: Increasing memory usage over time
Causes: Unclosed channels, goroutine leaks, connection leaks
Solutions: Proper cleanup, monitoring, garbage collection tuning

3. Connection Limits

Symptoms: New agents can’t connect
Causes: OS file descriptor limits, broker resource limits
Solutions: Increase limits, implement connection pooling

4. Message Loss

Symptoms: Tasks not reaching agents or results not returned
Causes: Timeout issues, network problems, buffer overflows
Solutions: Increase timeouts, improve error handling, adjust buffer sizes

Performance Testing

Load Testing Script

func loadTest() {
    // Create multiple publishers
    publishers := make([]Publisher, 10)
    for i := range publishers {
        publishers[i] = NewPublisher(fmt.Sprintf("publisher_%d", i))
    }

    // Send tasks concurrently
    taskRate := 1000 // tasks per second
    duration := 60 * time.Second

    ticker := time.NewTicker(time.Second / time.Duration(taskRate))
    defer ticker.Stop()
    timeout := time.After(duration)

    for {
        select {
        case <-ticker.C:
            publisher := publishers[rand.Intn(len(publishers))]
            go publisher.PublishTask(generateRandomTask())
        case <-timeout:
            return
        }
    }
}

The AgentHub architecture provides solid performance for most use cases and clear scaling paths for growing deployments. Regular monitoring and optimization ensure continued performance as your agent ecosystem evolves.

3.4 - The Unified Abstraction Library

The AgentHub Unified Abstraction Library dramatically simplifies the development of agents and brokers while providing built-in observability, environment-based configuration, and automatic correlation tracking.

The A2A-Compliant Unified Abstraction Library

Overview

The AgentHub Unified Abstraction Library (internal/agenthub/) is a comprehensive set of A2A protocol-compliant abstractions that dramatically simplifies the development of A2A agents and brokers while providing built-in observability, environment-based configuration, and automatic correlation tracking.

Key Benefits

Before and After Comparison

Before (Legacy approach):

  • broker/main_observability.go: 380+ lines of boilerplate
  • Manual OpenTelemetry setup in every component
  • Duplicate configuration handling across components
  • Manual correlation ID management
  • Separate observability and non-observability variants

After (Unified abstractions):

  • broker/main.go: 29 lines using abstractions
  • Automatic OpenTelemetry integration
  • Environment-based configuration
  • Automatic correlation ID generation and propagation
  • Single implementation with built-in observability

Core Components

1. gRPC Abstractions (grpc.go)

AgentHubServer

Provides a complete gRPC server abstraction with:

  • Automatic OpenTelemetry instrumentation
  • Environment-based configuration
  • Built-in health checks
  • Metrics collection
  • Graceful shutdown
// Create and start a broker in one line
func StartBroker(ctx context.Context) error {
    config := NewGRPCConfig("broker")
    server, err := NewAgentHubServer(config)
    if err != nil {
        return err
    }
    return server.Start(ctx)
}

AgentHubClient

Provides a complete gRPC client abstraction with:

  • Automatic connection management
  • Built-in observability
  • Environment-based server discovery
  • Health monitoring
// Create a client with built-in observability
config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)

2. A2A Task Management Abstractions (a2a.go)

A2ATaskPublisher

Simplifies A2A task publishing with:

  • Automatic A2A message generation
  • Built-in observability tracing
  • A2A context management
  • Structured error handling
  • A2A-compliant message formatting
a2aPublisher := &agenthub.A2ATaskPublisher{
    Client:         client.Client,
    TraceManager:   client.TraceManager,
    MetricsManager: client.MetricsManager,
    Logger:         client.Logger,
    ComponentName:  "a2a_publisher",
}

// Create A2A task with structured message content
task := &a2a.Task{
    Id:        "task_greeting_" + uuid.New().String(),
    ContextId: "conversation_123",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            MessageId: "msg_" + uuid.New().String(),
            Role:      a2a.Role_USER,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Please process greeting task",
                    },
                },
                {
                    Part: &a2a.Part_Data{
                        Data: &a2a.DataPart{
                            Data:        greetingParams,
                            Description: "Greeting parameters",
                        },
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}

err := a2aPublisher.PublishA2ATask(ctx, task, &pb.AgentEventMetadata{
    FromAgentId: "publisher_id",
    ToAgentId:   "subscriber_id",
    EventType:   "task.submitted",
    Priority:    pb.Priority_PRIORITY_MEDIUM,
})

A2ATaskProcessor

Provides full observability for A2A task processing:

  • Automatic A2A trace propagation
  • Rich A2A span annotations with context and message details
  • A2A message processing metrics
  • A2A conversation context tracking
  • Error tracking with A2A-compliant error messages
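
The tracing API itself is not reproduced in this guide, so the sketch below is hypothetical: StartSpan and the attribute keys are assumptions that merely illustrate what the processor records around each task:

// Hypothetical sketch of the per-task instrumentation the processor performs.
// Assumes an OpenTelemetry-style TraceManager API; not verbatim library code.
func processWithObservability(ctx context.Context, tm *agenthub.TraceManager, task *a2a.Task) error {
    ctx, span := tm.StartSpan(ctx, "a2a.task.process") // assumed method name
    defer span.End()

    // Annotate the span with task identity, conversation context, and state.
    span.SetAttributes(
        attribute.String("a2a.task.id", task.GetId()),
        attribute.String("a2a.context.id", task.GetContextId()),
        attribute.String("a2a.task.state", task.GetStatus().GetState().String()),
    )

    // ... actual task handling, state transitions, artifact publishing ...
    return nil
}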

3. A2A Subscriber Abstractions (a2a_subscriber.go)

A2ATaskSubscriber

Complete A2A subscriber implementation with:

  • A2A-compliant task handler system
  • Built-in A2A message processors
  • Automatic A2A artifact publishing
  • Full A2A observability integration
  • A2A conversation context awareness
a2aSubscriber := agenthub.NewA2ATaskSubscriber(client, agentID)
a2aSubscriber.RegisterDefaultA2AHandlers()

// Custom A2A task handlers
a2aSubscriber.RegisterA2ATaskHandler("greeting", func(ctx context.Context, event *pb.AgentEvent) error {
    task := event.GetTask()
    if task == nil {
        return fmt.Errorf("no task in event")
    }

    // Process A2A task content
    requestMessage := task.Status.Update
    response := a2aSubscriber.ProcessA2AMessage(ctx, requestMessage)

    // Create completion artifact
    artifact := &a2a.Artifact{
        ArtifactId:  "artifact_" + uuid.New().String(),
        Name:        "Greeting Response",
        Description: "Processed greeting task result",
        Parts: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: response,
                },
            },
        },
    }

    // Complete task with artifact
    return a2aSubscriber.CompleteA2ATaskWithArtifact(ctx, task, artifact)
})

go a2aSubscriber.SubscribeToA2ATasks(ctx)
go a2aSubscriber.SubscribeToA2AMessages(ctx)
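
Running the task and message subscriptions as separate goroutines keeps the two event streams independent, so a long-running task handler does not block delivery of conversational messages.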

4. A2A Broker Service (a2a_broker.go)

Complete A2A-compliant AgentHub service implementation that handles:

  • A2A message routing and delivery
  • A2A subscription management with context filtering
  • A2A artifact distribution
  • A2A task state management
  • EDA+A2A hybrid routing
  • Full A2A observability
// A2A broker service with unified abstractions
type A2ABrokerService struct {
    // A2A-specific components
    MessageRouter    *A2AMessageRouter
    TaskManager      *A2ATaskManager
    ContextManager   *A2AContextManager
    ArtifactManager  *A2AArtifactManager

    // EDA integration
    EventBus         *EDAEventBus
    SubscriptionMgr  *A2ASubscriptionManager

    // Observability
    TraceManager     *TraceManager
    MetricsManager   *A2AMetricsManager
}
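
The field layout mirrors the hybrid design: the A2A-specific managers own protocol semantics (task state, conversation contexts, artifacts), while the EDA components own event delivery and subscription fan-out.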

A2A Environment-Based Configuration

The library uses environment variables for zero-configuration A2A setup:

# Core AgentHub A2A Settings
export AGENTHUB_BROKER_ADDR=localhost
export AGENTHUB_BROKER_PORT=50051

# A2A Protocol Configuration
export AGENTHUB_A2A_PROTOCOL_VERSION=1.0
export AGENTHUB_MESSAGE_BUFFER_SIZE=100
export AGENTHUB_CONTEXT_TIMEOUT=30s
export AGENTHUB_ARTIFACT_MAX_SIZE=10MB

# Observability Endpoints
export JAEGER_ENDPOINT=127.0.0.1:4317
export OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4317

# A2A Health Check Ports
export AGENTHUB_HEALTH_PORT=8080
export A2A_PUBLISHER_HEALTH_PORT=8081
export A2A_SUBSCRIBER_HEALTH_PORT=8082
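
A rough sketch of how the library consumes these variables when building a config; this illustrates the environment-driven pattern only, and the real NewGRPCConfig may read more settings:

// Illustrative sketch (not the library's actual implementation): resolve the
// broker address from the environment with sensible defaults.
func brokerAddrFromEnv() string {
    addr := os.Getenv("AGENTHUB_BROKER_ADDR")
    if addr == "" {
        addr = "localhost" // default when unset
    }
    port := os.Getenv("AGENTHUB_BROKER_PORT")
    if port == "" {
        port = "50051"
    }
    return net.JoinHostPort(addr, port) // e.g. "localhost:50051"
}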

A2A Automatic Observability

A2A Distributed Tracing

  • Automatic A2A instrumentation: OpenTelemetry gRPC interceptors handle A2A trace propagation
  • A2A service naming: Unified “agenthub” service with A2A component differentiation
  • Rich A2A annotations: Message content, conversation context, task state transitions, and artifact details
  • A2A context tracking: Complete conversation thread visibility across multiple agents

A2A Metrics Collection

  • A2A message metrics: Message processing rates, A2A error rates, latencies by message type
  • A2A task metrics: Task completion rates, state transition times, artifact production metrics
  • A2A context metrics: Conversation context tracking, multi-agent coordination patterns
  • A2A system metrics: Health checks, A2A connection status, protocol version compatibility
  • A2A component metrics: Per-agent A2A performance, broker routing efficiency

Health Monitoring

  • Automatic endpoints: /health, /ready, /metrics
  • Component tracking: Individual health per service
  • Graceful shutdown: Proper cleanup and connection management
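
Using the default health port from the configuration section above, the automatic endpoints can be probed directly:

# Probe the automatic endpoints (AGENTHUB_HEALTH_PORT=8080 from the example above)
curl http://localhost:8080/health    # liveness
curl http://localhost:8080/ready     # readiness
curl http://localhost:8080/metrics   # metrics scrape endpoint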

A2A Correlation and Context Tracking

Automatic A2A Correlation IDs

// A2A task ID generation
taskID := fmt.Sprintf("task_%s_%s", taskDescription, uuid.New().String())

// A2A message ID generation
messageID := fmt.Sprintf("msg_%d_%s", time.Now().Unix(), uuid.New().String())

// A2A context ID for conversation threading
contextID := fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())

A2A Context Propagation

  • A2A conversation threading: Context IDs link related tasks across agents
  • A2A message history: Complete audit trail of all messages in a conversation
  • A2A workflow tracking: End-to-end visibility of multi-agent workflows
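
In practice, conversation threading is just reuse of the same context ID; a minimal sketch of a follow-up task joining the conversation created above:

// A follow-up task joins an existing conversation by reusing its ContextId.
followUp := &a2a.Task{
    Id:        "task_followup_" + uuid.New().String(),
    ContextId: contextID, // the contextID generated in the correlation example
    Status: &a2a.TaskStatus{
        State:     a2a.TaskState_TASK_STATE_SUBMITTED,
        Timestamp: timestamppb.Now(),
    },
}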

Trace Propagation

  • W3C Trace Context: Standard distributed tracing headers
  • Automatic propagation: gRPC interceptors handle context passing
  • End-to-end visibility: Publisher β†’ Broker β†’ Subscriber traces
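
A traceparent header carries the whole context in one value; the sample IDs below are the ones used in the W3C specification:

traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
# format: <version>-<trace-id, 32 hex>-<parent span id, 16 hex>-<flags, 01 = sampled>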

A2A Migration Guide

From Legacy EventBus to A2A Abstractions

Before (Legacy EventBus):

// 50+ lines of observability setup
obs, err := observability.New(ctx, observability.Config{...})
server := grpc.NewServer(grpc.UnaryInterceptor(...))
pb.RegisterEventBusServer(server, &eventBusService{...})

// Manual task message creation
task := &pb.TaskMessage{
    TaskId:   "task_123",
    TaskType: "greeting",
    // ... manual field population
}

After (A2A Abstractions):

// One line A2A broker startup
err := agenthub.StartA2ABroker(ctx)

// A2A task creation with abstractions
task := a2aPublisher.CreateA2ATask("greeting", greetingContent, "conversation_123")
err := a2aPublisher.PublishA2ATask(ctx, task, routingMetadata)

Best Practices

1. Use Environment Configuration

Let the library handle configuration automatically:

source .envrc  # Load all environment variables
go run broker/main.go

2. Register Custom A2A Handlers

Extend functionality with custom A2A task handlers:

a2aSubscriber.RegisterA2ATaskHandler("my_task", myCustomA2AHandler)

// A2A handler signature with event and context
func myCustomA2AHandler(ctx context.Context, event *pb.AgentEvent) error {
    task := event.GetTask()
    // Process A2A message content
    return a2aSubscriber.CompleteA2ATaskWithArtifact(ctx, task, resultArtifact)
}

3. Leverage Built-in Observability

The library provides comprehensive observability by default; no additional setup is required.

4. Use A2A Structured Logging

The library provides structured loggers with A2A trace correlation:

// A2A-aware logging with context
client.Logger.InfoContext(ctx, "Processing A2A task",
    "task_id", task.GetId(),
    "context_id", task.GetContextId(),
    "message_count", len(task.GetHistory()),
    "current_state", task.GetStatus().GetState().String(),
)

A2A Architecture Benefits

Code Reduction with A2A Abstractions

  • A2A Broker: 380+ lines β†’ 29 lines (92% reduction)
  • A2A Publisher: 150+ lines β†’ 45 lines (70% reduction)
  • A2A Subscriber: 200+ lines β†’ 55 lines (72% reduction)
  • A2A Message Handling: Complex manual parsing β†’ automatic Part processing
  • A2A Context Management: Manual tracking β†’ automatic conversation threading

A2A Maintainability

  • A2A protocol compliance: Centralized A2A message handling ensures protocol adherence
  • Consistent A2A patterns: Same abstractions across all A2A components
  • A2A-aware configuration: Environment variables tuned for A2A performance
  • A2A context preservation: Automatic conversation context management

A2A Developer Experience

  • Zero A2A boilerplate: Built-in A2A message parsing and artifact handling
  • A2A-native architecture: Easy to extend with custom A2A message processors
  • Automatic A2A setup: One-line A2A service creation with protocol compliance
  • A2A debugging: Rich conversation context and message history for troubleshooting

A2A Future Extensibility

The A2A abstraction library is designed for A2A protocol extension:

  • Custom A2A Part types: Easy to add new content types (text, data, files, custom)
  • Custom A2A observability: Extend A2A metrics and conversation tracing
  • A2A configuration: Override A2A protocol defaults with environment variables
  • A2A transport options: Extend beyond gRPC while maintaining A2A compliance
  • A2A protocol evolution: Built-in version compatibility and migration support

A2A Protocol Extension Points

// Custom A2A Part type
type CustomPart struct {
    CustomData interface{} `json:"custom_data"`
    Format     string      `json:"format"`
}

// Custom A2A artifact processor
type CustomArtifactProcessor struct {
    SupportedTypes []string
    ProcessFunc    func(ctx context.Context, artifact *a2a.Artifact) error
}

// Custom A2A context manager
type CustomContextManager struct {
    ContextRules map[string]ContextRule
    RouteFunc    func(contextId string, message *a2a.Message) []string
}
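
A hypothetical use of the artifact-processor extension point sketched above; only the struct itself appears in the library sketch, so any registration call with the broker is an assumption and is left out:

// Hypothetical wiring of a custom artifact processor for image payloads.
imageProcessor := &CustomArtifactProcessor{
    SupportedTypes: []string{"image/png", "image/jpeg"},
    ProcessFunc: func(ctx context.Context, artifact *a2a.Artifact) error {
        // Inspect the artifact's parts and handle image payloads here.
        log.Printf("processing artifact %s (%q)", artifact.GetArtifactId(), artifact.GetName())
        return nil
    },
}
_ = imageProcessor // how the processor is registered is library-specific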

This A2A-compliant unified approach provides a solid foundation for building complex multi-agent systems with full Agent2Agent protocol support while maintaining simplicity, comprehensive observability, and rich conversation capabilities.