Architecture

Deep dive into AgentHub’s system architecture and design

Architecture Explanations

Understand the fundamental architecture and design principles behind AgentHub’s distributed agent system.

Available Documentation

1 - A2A-Compliant EDA Broker Architecture

Deep dive into the internal architecture of the AgentHub EDA broker and how it implements Agent2Agent (A2A) protocol-compliant communication patterns while maintaining Event-Driven Architecture benefits.

AgentHub A2A-Compliant EDA Broker Architecture

This document explains the internal architecture of the AgentHub Event-Driven Architecture (EDA) broker, how it implements Agent2Agent (A2A) protocol-compliant communication patterns, and the design decisions behind its hybrid approach.

Architectural Overview

The AgentHub broker serves as a centralized Event-Driven Architecture hub that transports Agent2Agent (A2A) protocol-compliant messages between distributed agents. It combines the scalability benefits of EDA with the interoperability guarantees of the A2A protocol.

┌─────────────────────────────────────────────────────────────────┐
│                     AgentHub Broker                             │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐│
│  │   Task Router   │    │   Subscriber    │    │   Progress      ││
│  │                 │    │   Manager       │    │   Tracker       ││
│  │ • Route tasks   │    │                 │    │                 ││
│  │ • Apply filters │    │ • Manage agent  │    │ • Track task    ││
│  │ • Broadcast     │    │   subscriptions │    │   progress      ││
│  │ • Load balance  │    │ • Handle        │    │ • Update        ││
│  │                 │    │   disconnects   │    │   requesters    ││
│  └─────────────────┘    └─────────────────┘    └─────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│                        gRPC Interface                           │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐│
│  │ PublishTask     │    │SubscribeToTasks│    │SubscribeToTask  ││
│  │ PublishResult   │    │SubscribeToRes  │    │ Progress        ││
│  │ PublishProgress │    │                 │    │                 ││
│  └─────────────────┘    └─────────────────┘    └─────────────────┘│
└─────────────────────────────────────────────────────────────────┘

Core Components

1. Event Bus Server

The main server implementation at broker/main.go:22 provides the central coordination point:

type eventBusServer struct {
    pb.UnimplementedEventBusServer

    // Subscription management
    taskSubscribers         map[string][]chan *pb.TaskMessage
    taskResultSubscribers   map[string][]chan *pb.TaskResult
    taskProgressSubscribers map[string][]chan *pb.TaskProgress
    taskMu                  sync.RWMutex
}

Key characteristics:

  • Thread-safe: Uses sync.RWMutex to protect concurrent access to subscriber maps
  • Channel-based: Uses Go channels for efficient message passing
  • Non-blocking: Implements timeouts to prevent blocking on slow consumers
  • Stateless: No persistent storage - all state is in-memory
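
Before handling any requests, these subscriber maps need to be initialized. A minimal constructor sketch is shown below (the constructor name itself is an assumption, not taken from the broker source):

// newEventBusServer is a hypothetical constructor illustrating how the
// subscriber maps above could be initialized. Maps are keyed by agent ID;
// channels are added when agents subscribe.
func newEventBusServer() *eventBusServer {
    return &eventBusServer{
        taskSubscribers:         make(map[string][]chan *pb.TaskMessage),
        taskResultSubscribers:   make(map[string][]chan *pb.TaskResult),
        taskProgressSubscribers: make(map[string][]chan *pb.TaskProgress),
    }
}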

2. Task Routing Engine

The routing logic determines how tasks are delivered to agents:

Direct Routing

When a task specifies a ResponderAgentId, it’s routed directly to that agent:

if responderID := req.GetTask().GetResponderAgentId(); responderID != "" {
    if subs, ok := s.taskSubscribers[responderID]; ok {
        targetChannels = subs
    }
}

Broadcast Routing

When no specific responder is set, tasks are broadcast to all subscribed agents:

} else {
    // Broadcast to all task subscribers
    for _, subs := range s.taskSubscribers {
        targetChannels = append(targetChannels, subs...)
    }
}

Routing Features

  • Immediate delivery: Tasks are routed immediately upon receipt
  • Multiple subscribers: Single agent can have multiple subscription channels
  • Timeout protection: 5-second timeout prevents blocking on unresponsive agents
  • Error isolation: Failed delivery to one agent doesn’t affect others

3. Subscription Management

The broker manages three types of subscriptions:

Task Subscriptions

Agents subscribe to receive tasks assigned to them:

func (s *eventBusServer) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error

  • Agent-specific: Tasks are delivered based on agent ID
  • Type filtering: Optional filtering by task types
  • Long-lived streams: Connections persist until agent disconnects
  • Automatic cleanup: Subscriptions are removed when connections close
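
A simplified sketch of such a subscription handler is shown below: it registers a per-agent channel, streams tasks until the client disconnects, and then removes the channel. The request field accessor and channel buffer size are assumptions for illustration:

// Register a buffered channel for the agent, stream tasks from it, and
// clean up the subscription when the stream ends.
func (s *eventBusServer) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error {
    agentID := req.GetAgentId() // field name assumed

    taskChan := make(chan *pb.TaskMessage, 10)
    s.taskMu.Lock()
    s.taskSubscribers[agentID] = append(s.taskSubscribers[agentID], taskChan)
    s.taskMu.Unlock()

    // Automatic cleanup: remove this channel when the connection closes.
    defer func() {
        s.taskMu.Lock()
        defer s.taskMu.Unlock()
        subs := s.taskSubscribers[agentID]
        for i, ch := range subs {
            if ch == taskChan {
                s.taskSubscribers[agentID] = append(subs[:i], subs[i+1:]...)
                break
            }
        }
    }()

    // Long-lived stream: push tasks until the agent disconnects.
    for {
        select {
        case task := <-taskChan:
            if err := stream.Send(task); err != nil {
                return err
            }
        case <-stream.Context().Done():
            return stream.Context().Err()
        }
    }
}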

Result Subscriptions

Publishers subscribe to receive results of tasks they requested:

func (s *eventBusServer) SubscribeToTaskResults(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskResultsServer) error

Progress Subscriptions

Publishers can track progress of long-running tasks:

func (s *eventBusServer) SubscribeToTaskProgress(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskProgressServer) error

4. Message Flow Architecture

Task Publication Flow

  1. Validation: Incoming tasks are validated for required fields
  2. Routing: Tasks are routed to appropriate subscribers
  3. Delivery: Messages are sent via Go channels with timeout protection
  4. Response: Publisher receives acknowledgment of successful publication
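
Putting these four steps together, a condensed sketch of the publication handler might look like the following (the request and response type names, and the Success field, are assumptions based on the excerpts in this document):

// Validate the incoming task, route it to direct or broadcast subscribers,
// deliver asynchronously with timeout protection, then acknowledge.
func (s *eventBusServer) PublishTask(ctx context.Context, req *pb.PublishTaskRequest) (*pb.PublishResponse, error) {
    task := req.GetTask()

    // 1. Validation
    if task.GetTaskId() == "" {
        return nil, fmt.Errorf("task_id is required")
    }

    // 2. Routing: direct if a responder is named, otherwise broadcast.
    s.taskMu.RLock()
    var targetChannels []chan *pb.TaskMessage
    if responderID := task.GetResponderAgentId(); responderID != "" {
        targetChannels = s.taskSubscribers[responderID]
    } else {
        for _, subs := range s.taskSubscribers {
            targetChannels = append(targetChannels, subs...)
        }
    }
    s.taskMu.RUnlock()

    // 3. Delivery: send via channels with timeout protection.
    for _, subChan := range targetChannels {
        go func(ch chan *pb.TaskMessage, t *pb.TaskMessage) {
            select {
            case ch <- t:
            case <-time.After(5 * time.Second):
                log.Printf("Timeout sending task %s. Dropping message.", t.GetTaskId())
            }
        }(subChan, task)
    }

    // 4. Response: acknowledge successful publication.
    return &pb.PublishResponse{Success: true}, nil
}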

Result Flow

  1. Receipt: Agents publish task results back to the broker
  2. Broadcasting: Results are broadcast to all result subscribers
  3. Filtering: Subscribers receive results for their requested tasks
  4. Delivery: Results are streamed back to requesting agents

Progress Flow

  1. Updates: Executing agents send periodic progress updates
  2. Distribution: Progress updates are sent to interested subscribers
  3. Real-time delivery: Updates are streamed immediately upon receipt

Design Decisions and Trade-offs

In-Memory State Management

Decision: Store all subscription state in memory using Go maps and channels.

Benefits:

  • High performance: No database overhead for message routing
  • Low latency: Sub-millisecond message routing
  • Simplicity: Easier to develop, test, and maintain
  • Concurrent efficiency: Go’s garbage collector handles channel cleanup

Trade-offs:

  • No persistence: Broker restart loses all subscription state
  • Memory usage: Large numbers of agents increase memory requirements
  • Single point of failure: No built-in redundancy

When this works well:

  • Development and testing environments
  • Small to medium-scale deployments
  • Scenarios where agents can re-establish subscriptions on broker restart

Asynchronous Message Delivery

Decision: Use Go channels with timeout-based delivery.

Implementation:

go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
    select {
    case ch <- &task:
        // Message sent successfully
    case <-ctx.Done():
        log.Printf("Context cancelled while sending task %s", task.GetTaskId())
    case <-time.After(5 * time.Second):
        log.Printf("Timeout sending task %s. Dropping message.", task.GetTaskId())
    }
}(subChan, taskToSend)

Benefits:

  • Non-blocking: Slow agents don’t block the entire system
  • Fault tolerance: Timeouts prevent resource leaks
  • Scalability: Concurrent delivery to multiple agents
  • Resource protection: Prevents unbounded queue growth

Trade-offs:

  • Message loss: Timed-out messages are dropped
  • Complexity: Requires careful timeout tuning
  • No delivery guarantees: No acknowledgment of successful processing

gRPC Streaming for Subscriptions

Decision: Use bidirectional gRPC streams for agent subscriptions.

Benefits:

  • Real-time delivery: Messages are pushed immediately
  • Connection awareness: Broker knows when agents disconnect
  • Flow control: gRPC handles backpressure automatically
  • Type safety: Protocol Buffer messages ensure data consistency

Trade-offs:

  • Connection overhead: Each agent maintains persistent connections
  • Resource usage: Streams consume memory and file descriptors
  • Network sensitivity: Transient network issues can break connections

Concurrent Access Patterns

Decision: Use read-write mutexes with channel-based message passing.

Implementation:

s.taskMu.RLock()
// Read subscriber information
var targetChannels []chan *pb.TaskMessage
for _, subs := range s.taskSubscribers {
    targetChannels = append(targetChannels, subs...)
}
s.taskMu.RUnlock()

// Send messages without holding locks
for _, subChan := range targetChannels {
    go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
        // Async delivery
    }(subChan, taskToSend)
}

Benefits:

  • High concurrency: Multiple readers can access subscriptions simultaneously
  • Lock-free delivery: Message sending doesn’t hold locks
  • Deadlock prevention: Clear lock ordering and minimal critical sections
  • Performance: Read operations are optimized for the common case

Scalability Characteristics

Throughput

  • Task routing: ~10,000+ tasks/second on modern hardware
  • Concurrent connections: Limited by file descriptor limits (typically ~1,000s)
  • Memory usage: ~1KB per active subscription

Latency

  • Task routing: <1ms for local network delivery
  • End-to-end: <10ms for simple task processing cycles
  • Progress updates: Real-time streaming with minimal buffering

Resource Usage

  • CPU: Low CPU usage, primarily network I/O bound
  • Memory: Linear growth with number of active subscriptions
  • Network: Efficient binary Protocol Buffer encoding

Error Handling and Resilience

Connection Failures

  • Automatic cleanup: Subscriptions are removed when connections close
  • Graceful degradation: Failed agents don’t affect others
  • Reconnection support: Agents can re-establish subscriptions

Message Delivery Failures

  • Timeout handling: Messages that can’t be delivered are dropped
  • Logging: All failures are logged for debugging
  • Isolation: Per-agent timeouts prevent cascading failures

Resource Protection

  • Channel buffering: Limited buffer sizes prevent memory exhaustion
  • Timeout mechanisms: Prevent resource leaks from stuck operations
  • Graceful shutdown: Proper cleanup during server shutdown

Monitoring and Observability

Built-in Logging

The broker provides comprehensive logging:

  • Task routing decisions
  • Subscription lifecycle events
  • Error conditions and recovery
  • Performance metrics

Integration Points

  • Health checks: HTTP endpoints for monitoring
  • Metrics export: Prometheus/metrics integration points
  • Distributed tracing: Context propagation support

Future Enhancements

Persistence Layer

  • Database backend: Store subscription state for broker restarts
  • Message queuing: Durable task queues for reliability
  • Transaction support: Atomic message delivery guarantees

Clustering Support

  • Horizontal scaling: Multiple broker instances
  • Load balancing: Distribute agents across brokers
  • Consensus protocols: Consistent state across brokers

Advanced Routing

  • Capability-based routing: Route tasks based on agent capabilities
  • Load-aware routing: Consider agent load in routing decisions
  • Geographic routing: Route based on agent location

Security Enhancements

  • Authentication: Agent identity verification
  • Authorization: Task-level access controls
  • Encryption: TLS for all communications

The AgentHub broker architecture provides a solid foundation for Agent2Agent communication while maintaining simplicity and performance. Its design supports the immediate needs of most agent systems while providing clear paths for future enhancement as requirements evolve.

2 - Hexagonal Architecture & A2A Protocol Implementation

Understanding AgentHub’s hexagonal architecture with A2A protocol, gRPC communication, and event-driven design

Hexagonal Architecture & A2A Protocol Implementation

This document explains how AgentHub implements hexagonal architecture principles with the Agent2Agent (A2A) protocol, gRPC communication, and event-driven design patterns.

Overview

AgentHub follows hexagonal architecture (also known as Ports and Adapters) to achieve:

  • Domain isolation: Core A2A protocol logic separated from infrastructure
  • Testability: Clean interfaces enable comprehensive testing
  • Flexibility: Multiple adapters for different communication protocols
  • Maintainability: Clear separation of concerns and dependencies

System Architecture

graph TB
    subgraph "AgentHub Ecosystem"
        subgraph "External Agents"
            A["Agent A<br/>(Chat REPL)"]
            B["Agent B<br/>(Chat Responder)"]
            C["Agent C<br/>(Custom Agent)"]
        end

        subgraph "AgentHub Broker"
            subgraph "Adapters (Infrastructure)"
                GRPC["gRPC Server<br/>Adapter"]
                HEALTH["Health Check<br/>Adapter"]
                METRICS["Metrics<br/>Adapter"]
                TRACING["Tracing Adapter<br/>(OTLP/Jaeger)"]
            end

            subgraph "Ports (Interfaces)"
                SP["AgentHub<br/>Service Port"]
                PP["Message<br/>Publisher Port"]
                EP["Event<br/>Subscriber Port"]
                OP["Observability<br/>Port"]
            end

            subgraph "Domain (Core Logic)"
                A2A["A2A Protocol<br/>Engine"]
                ROUTER["Event Router<br/>& Broker"]
                VALIDATOR["Message<br/>Validator"]
                CONTEXT["Context<br/>Manager"]
                TASK["Task<br/>Lifecycle"]
            end
        end

        subgraph "External Systems"
            OTLP["OTLP Collector<br/>& Jaeger"]
            STORE["Event Store<br/>(Memory)"]
        end
    end

    %% External agent connections
    A -->|"gRPC calls<br/>(PublishMessage,<br/>SubscribeToMessages)"| GRPC
    B -->|"gRPC calls"| GRPC
    C -->|"gRPC calls"| GRPC

    %% Adapter to Port connections
    GRPC -->|"implements"| SP
    HEALTH -->|"implements"| OP
    METRICS -->|"implements"| OP
    TRACING -->|"implements"| OP

    %% Port to Domain connections
    SP -->|"delegates to"| A2A
    PP -->|"delegates to"| ROUTER
    EP -->|"delegates to"| ROUTER
    OP -->|"observes"| A2A

    %% Domain internal connections
    A2A -->|"uses"| VALIDATOR
    A2A -->|"uses"| CONTEXT
    A2A -->|"uses"| TASK
    ROUTER -->|"persists events"| STORE
    TRACING -->|"exports traces"| OTLP

    %% Styling
    classDef agents fill:#add8e6
    classDef adapters fill:#ffa500
    classDef ports fill:#e0ffff
    classDef domain fill:#ffb6c1
    classDef external fill:#dda0dd

    class A,B,C agents
    class GRPC,HEALTH,METRICS,TRACING adapters
    class SP,PP,EP,OP ports
    class A2A,ROUTER,VALIDATOR,CONTEXT,TASK domain
    class OTLP,STORE external

Architecture Notes:

  • Domain Core: Pure A2A protocol logic with message validation, event routing, context correlation, and task state management
  • Ports: Clean, technology-agnostic interfaces providing testable contracts and dependency inversion
  • Adapters: Infrastructure concerns including gRPC communication, observability exports, and protocol adaptations

A2A Message Flow

sequenceDiagram
    participant REPL as Chat REPL<br/>Agent
    participant gRPC as gRPC<br/>Adapter
    participant A2A as A2A Protocol<br/>Engine
    participant Router as Event<br/>Router
    participant Responder as Chat Responder<br/>Agent

    rect rgb(240, 248, 255)
        Note over REPL, Router: A2A Message Publishing
        REPL->>+gRPC: PublishMessage(A2AMessage)
        gRPC->>+A2A: validateA2AMessage()
        A2A->>A2A: check MessageId, Role, Content
        A2A-->>-gRPC: validation result
        gRPC->>+Router: routeA2AEvent(messageEvent)
        Router->>Router: identify subscribers<br/>by agent_id/broadcast
        Router->>Router: create tracing span<br/>with A2A attributes
        Router-->>Responder: deliver message event
        Router-->>-gRPC: routing success
        gRPC-->>-REPL: PublishResponse(event_id)
    end

    rect rgb(255, 248, 240)
        Note over Responder, Router: A2A Message Processing
        Responder->>+gRPC: SubscribeToMessages(agent_id)
        gRPC->>Router: register subscriber
        Router-->>gRPC: subscription stream
        gRPC-->>-Responder: message stream
        Note over Responder: Process A2A message<br/>with tracing spans
        Responder->>+gRPC: PublishMessage(A2AResponse)
        gRPC->>A2A: validateA2AMessage()
        A2A->>A2A: check AGENT role,<br/>ContextId correlation
        gRPC->>Router: routeA2AEvent(responseEvent)
        Router-->>REPL: deliver response event
        gRPC-->>-Responder: PublishResponse
    end

    Note over REPL, Responder: A2A Protocol ensures:<br/>• Message structure compliance<br/>• Role semantics (USER/AGENT)<br/>• Context correlation<br/>• Event-driven routing

Core Components

1. A2A Protocol Engine (Domain Core)

The heart of the system implementing A2A protocol specifications:

// Core domain logic - technology agnostic
type A2AProtocolEngine struct {
    messageValidator MessageValidator
    contextManager   ContextManager
    taskLifecycle    TaskLifecycle
}

// A2A message validation
func (e *A2AProtocolEngine) ValidateMessage(msg *Message) error {
    // A2A compliance checks
    if msg.MessageId == "" { return ErrMissingMessageId }
    if msg.Role == ROLE_UNSPECIFIED { return ErrInvalidRole }
    if len(msg.Content) == 0 { return ErrEmptyContent }
    return nil
}

2. Event Router (Domain Core)

Manages event-driven communication between agents:

type EventRouter struct {
    messageSubscribers map[string][]chan *AgentEvent
    taskSubscribers    map[string][]chan *AgentEvent
    eventSubscribers   map[string][]chan *AgentEvent
}

func (r *EventRouter) RouteEvent(event *AgentEvent) error {
    // Route based on A2A metadata
    routing := event.GetRouting()
    subscribers := r.getSubscribers(routing.ToAgentId, event.PayloadType)

    // Deliver with tracing
    for _, sub := range subscribers {
        go r.deliverWithTracing(sub, event)
    }

    return nil
}

3. gRPC Adapter (Infrastructure)

Translates between gRPC and domain logic:

type GrpcAdapter struct {
    a2aEngine    A2AProtocolEngine
    eventRouter  EventRouter
    tracer       TracingAdapter
}

func (a *GrpcAdapter) PublishMessage(ctx context.Context, req *PublishMessageRequest) (*PublishResponse, error) {
    // Start tracing span
    ctx, span := a.tracer.StartA2AMessageSpan(ctx, "publish_message", req.Message.MessageId, req.Message.Role)
    defer span.End()

    // Validate using domain logic
    if err := a.a2aEngine.ValidateMessage(req.Message); err != nil {
        a.tracer.RecordError(span, err)
        return nil, err
    }

    // Route using domain logic
    event := a.createA2AEvent(req)
    if err := a.eventRouter.RouteEvent(event); err != nil {
        return nil, err
    }

    return &PublishResponse{Success: true, EventId: event.EventId}, nil
}

Hexagonal Architecture Benefits

1. Domain Isolation

  • A2A protocol logic is pure, testable business logic
  • No infrastructure dependencies in the core domain
  • Technology-agnostic implementation

2. Adapter Pattern

  • gRPC Adapter: Handles Protocol Buffer serialization/deserialization
  • Tracing Adapter: OTLP/Jaeger integration without domain coupling
  • Health Adapter: Service health monitoring
  • Metrics Adapter: Prometheus metrics collection

3. Port Interfaces

// Clean, testable interfaces
type MessagePublisher interface {
    PublishMessage(ctx context.Context, msg *Message) (*PublishResponse, error)
}

type EventSubscriber interface {
    SubscribeToMessages(ctx context.Context, agentId string) (MessageStream, error)
}

type ObservabilityPort interface {
    StartSpan(ctx context.Context, operation string) (context.Context, Span)
    RecordMetric(name string, value float64, labels map[string]string)
}

4. Dependency Inversion

  • Domain depends on abstractions (ports), not concrete implementations
  • Adapters depend on domain through well-defined interfaces
  • Easy testing with mock implementations
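
For example, a test double can satisfy the MessagePublisher port without any gRPC machinery (a minimal sketch against the interface defined above):

// mockPublisher is a hypothetical in-memory implementation of the
// MessagePublisher port, useful for exercising domain logic in unit tests.
type mockPublisher struct {
    published []*Message
}

func (m *mockPublisher) PublishMessage(ctx context.Context, msg *Message) (*PublishResponse, error) {
    m.published = append(m.published, msg) // record the message instead of sending it over the wire
    return &PublishResponse{Success: true}, nil
}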

A2A Protocol Integration

Message Structure Compliance

classDiagram
    class A2AMessage {
        +string MessageId
        +string ContextId
        +Role Role
        +Part Content
        +Metadata Metadata
        +string TaskId
    }

    class Part {
        +string Text
        +bytes Data
        +FileData File
    }

    class EventMetadata {
        +string FromAgentId
        +string ToAgentId
        +string EventType
        +Priority Priority
    }

    class Role {
        <<enumeration>>
        USER
        AGENT
    }

    class Metadata {
        +Fields map
    }

    A2AMessage "1" --> "0..*" Part : contains
    A2AMessage "1" --> "1" EventMetadata : routed_with
    A2AMessage "1" --> "1" Role : has
    A2AMessage "1" --> "0..1" Metadata : includes

Event-Driven Architecture

The system implements pure event-driven architecture:

  1. Publishers emit A2A-compliant events
  2. Broker routes events based on metadata
  3. Subscribers receive relevant events
  4. Correlation through ContextId maintains conversation flow
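
As a sketch of point 4, a responding agent can reuse the incoming ContextId in its reply so the exchange stays correlated (field names follow the message structure above; the uuid helper is an assumption):

// buildReply creates an AGENT-role response that carries the incoming
// message's ContextId forward, preserving conversation correlation.
func buildReply(incoming *Message, text string) *Message {
    return &Message{
        MessageId: uuid.NewString(),   // fresh ID for the reply (uuid package assumed)
        ContextId: incoming.ContextId, // same context links request and response
        Role:      ROLE_AGENT,
        Content:   []*Part{{Text: text}},
    }
}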

Observability Integration

Distributed Tracing

sequenceDiagram
    participant A as Agent A
    participant B as Broker
    participant AB as Agent B
    participant OTLP as OTLP Collector
    participant J as Jaeger

    A->>+B: PublishMessage<br/>[trace_id: 123]
    B->>B: Create A2A spans<br/>with structured attributes
    B->>+AB: RouteEvent<br/>[trace_id: 123]
    AB->>AB: Process with<br/>child spans
    AB->>-B: PublishResponse<br/>[trace_id: 123]
    B->>-A: Success<br/>[trace_id: 123]

    par Observability Export
        B->>OTLP: Export spans<br/>with A2A attributes
        OTLP->>J: Store traces
        J->>J: Build trace timeline<br/>with correlation
    end

    Note over A, J: End-to-end tracing<br/>with A2A protocol visibility

Structured Attributes

Each span includes A2A-specific attributes:

  • a2a.message.id
  • a2a.message.role
  • a2a.context.id
  • a2a.event.type
  • a2a.routing.from_agent
  • a2a.routing.to_agent
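
With the standard OpenTelemetry Go SDK, attaching these attributes to a span could look roughly like this (the attribute keys follow the list above; the function and variable names are illustrative):

import (
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

// annotateA2ASpan attaches A2A-specific attributes to a span so traces can
// be filtered and correlated by message, context, and routing metadata.
func annotateA2ASpan(span trace.Span, msgID, role, contextID, eventType, fromAgent, toAgent string) {
    span.SetAttributes(
        attribute.String("a2a.message.id", msgID),
        attribute.String("a2a.message.role", role),
        attribute.String("a2a.context.id", contextID),
        attribute.String("a2a.event.type", eventType),
        attribute.String("a2a.routing.from_agent", fromAgent),
        attribute.String("a2a.routing.to_agent", toAgent),
    )
}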

Testing Strategy

Unit Testing (Domain Core)

func TestA2AEngine_ValidateMessage(t *testing.T) {
    engine := NewA2AProtocolEngine()

    // Test A2A compliance
    msg := &Message{
        MessageId: "test_msg_123",
        Role: ROLE_USER,
        Content: []*Part{{Text: "hello"}},
    }

    err := engine.ValidateMessage(msg)
    assert.NoError(t, err)
}

Integration Testing (Adapters)

func TestGrpcAdapter_PublishMessage(t *testing.T) {
    // Mock domain dependencies
    mockEngine := &MockA2AEngine{}
    mockRouter := &MockEventRouter{}

    adapter := NewGrpcAdapter(mockEngine, mockRouter)

    // Test adapter behavior
    resp, err := adapter.PublishMessage(ctx, validRequest)
    assert.NoError(t, err)
    assert.True(t, resp.Success)
}

Conclusion

AgentHub’s hexagonal architecture with A2A protocol provides:

  1. Clean Architecture: Separation of concerns with domain-driven design
  2. A2A Compliance: Full protocol implementation with validation
  3. Event-Driven Design: Scalable, loosely-coupled communication
  4. Rich Observability: Comprehensive tracing and metrics
  5. Testability: Clean interfaces enable thorough testing
  6. Flexibility: Easy to extend with new adapters and protocols

This architecture ensures maintainable, scalable, and observable agent communication while maintaining strict A2A protocol compliance.