Architecture Explanations
Understand the fundamental architecture and design principles behind AgentHub’s distributed agent system.
Available Documentation
- Broker Architecture - Central broker design and communication patterns
This document explains the internal architecture of the AgentHub Event-Driven Architecture (EDA) broker, how it implements Agent2Agent (A2A) protocol-compliant communication patterns, and the design decisions behind its hybrid approach.
The AgentHub broker serves as a centralized Event-Driven Architecture hub that transports Agent2Agent (A2A) protocol-compliant messages between distributed agents. It combines the scalability benefits of EDA with the interoperability guarantees of the A2A protocol.
┌─────────────────────────────────────────────────────────────────┐
│                     AgentHub Broker                             │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐│
│  │   Task Router   │    │   Subscriber    │    │   Progress      ││
│  │                 │    │   Manager       │    │   Tracker       ││
│  │ • Route tasks   │    │                 │    │                 ││
│  │ • Apply filters │    │ • Manage agent  │    │ • Track task    ││
│  │ • Broadcast     │    │   subscriptions │    │   progress      ││
│  │ • Load balance  │    │ • Handle        │    │ • Update        ││
│  │                 │    │   disconnects   │    │   requesters    ││
│  └─────────────────┘    └─────────────────┘    └─────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│                        gRPC Interface                           │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐│
│  │ PublishTask     │    │SubscribeToTasks│    │SubscribeToTask  ││
│  │ PublishResult   │    │SubscribeToRes  │    │ Progress        ││
│  │ PublishProgress │    │                 │    │                 ││
│  └─────────────────┘    └─────────────────┘    └─────────────────┘│
└─────────────────────────────────────────────────────────────────┘
The main server implementation at broker/main.go:22 provides the central coordination point:
type eventBusServer struct {
    pb.UnimplementedEventBusServer
    // Subscription management
    taskSubscribers         map[string][]chan *pb.TaskMessage
    taskResultSubscribers   map[string][]chan *pb.TaskResult
    taskProgressSubscribers map[string][]chan *pb.TaskProgress
    taskMu                  sync.RWMutex
}
Key characteristics:
- sync.RWMutex to protect concurrent access to subscriber maps
The routing logic determines how tasks are delivered to agents:
When a task specifies a ResponderAgentId, it’s routed directly to that agent:
if responderID := req.GetTask().GetResponderAgentId(); responderID != "" {
    if subs, ok := s.taskSubscribers[responderID]; ok {
        targetChannels = subs
    }
}
When no specific responder is set, tasks are broadcast to all subscribed agents:
} else {
    // Broadcast to all task subscribers
    for _, subs := range s.taskSubscribers {
        targetChannels = append(targetChannels, subs...)
    }
}
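The two routing branches can be read as a single target-selection step. The following is a minimal sketch rather than the broker's actual code: `selectTargets` is a hypothetical name, and plain `string` channels stand in for `chan *pb.TaskMessage` so the example compiles without the generated protobuf package.

```go
package main

// selectTargets returns the channels a task should be delivered to.
// A non-empty responderID selects only that agent's channels (direct routing);
// an empty responderID selects every subscriber's channels (broadcast).
// Hypothetical helper: string channels stand in for chan *pb.TaskMessage.
func selectTargets(subscribers map[string][]chan string, responderID string) []chan string {
    if responderID != "" {
        // Direct routing: an unknown agent yields no targets.
        return append([]chan string(nil), subscribers[responderID]...)
    }
    // Broadcast: collect the channels of every subscribed agent.
    var targets []chan string
    for _, subs := range subscribers {
        targets = append(targets, subs...)
    }
    return targets
}
```

Returning a copy, rather than the slice stored in the map, keeps later changes to the subscriber list from affecting a delivery that is already in progress.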
The broker manages three types of subscriptions:
Agents subscribe to receive tasks assigned to them:
func (s *eventBusServer) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error
Publishers subscribe to receive results of tasks they requested:
func (s *eventBusServer) SubscribeToTaskResults(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskResultsServer) error
Publishers can track progress of long-running tasks:
func (s *eventBusServer) SubscribeToTaskProgress(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskProgressServer) error
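All three subscription handlers share the same bookkeeping: register a channel when the stream opens and remove it when the agent disconnects. A minimal sketch of that bookkeeping follows. The `registry` type, its method names, and the buffer size of 10 are assumptions made for illustration, and `string` channels stand in for the protobuf message channels.

```go
package main

import "sync"

// registry is a hypothetical stand-in for the broker's subscriber maps.
type registry struct {
    mu   sync.RWMutex
    subs map[string][]chan string
}

func newRegistry() *registry {
    return &registry{subs: make(map[string][]chan string)}
}

// subscribe registers a buffered channel for agentID and returns it.
// The buffer size is an assumed value, not taken from the broker.
func (r *registry) subscribe(agentID string) chan string {
    ch := make(chan string, 10)
    r.mu.Lock()
    r.subs[agentID] = append(r.subs[agentID], ch)
    r.mu.Unlock()
    return ch
}

// unsubscribe removes ch from agentID's list, for example when the stream's
// context is cancelled. Empty entries are deleted so the map does not grow.
func (r *registry) unsubscribe(agentID string, ch chan string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    current := r.subs[agentID]
    kept := make([]chan string, 0, len(current))
    for _, c := range current {
        if c != ch {
            kept = append(kept, c)
        }
    }
    if len(kept) == 0 {
        delete(r.subs, agentID)
        return
    }
    r.subs[agentID] = kept
}

// count reports how many channels are registered for agentID.
func (r *registry) count(agentID string) int {
    r.mu.RLock()
    defer r.mu.RUnlock()
    return len(r.subs[agentID])
}
```

In a streaming handler, `subscribe` would typically be called at the start and `unsubscribe` deferred, so cleanup runs however the stream ends.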
Decision: Store all subscription state in memory using Go maps and channels.
Benefits:
Trade-offs:
When this works well:
Decision: Use Go channels with timeout-based delivery.
Implementation:
go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
    select {
    case ch <- &task:
        // Message sent successfully
    case <-ctx.Done():
        log.Printf("Context cancelled while sending task %s", task.GetTaskId())
    case <-time.After(5 * time.Second):
        log.Printf("Timeout sending task %s. Dropping message.", task.GetTaskId())
    }
}(subChan, taskToSend)
Benefits:
Trade-offs:
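Decision: Use server-streaming gRPC for agent subscriptions. Each Subscribe* RPC takes a single request and returns a stream of messages, so the streams run from broker to agent rather than in both directions.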
Benefits:
Trade-offs:
Decision: Use read-write mutexes with channel-based message passing.
Implementation:
s.taskMu.RLock()
// Read subscriber information
var targetChannels []chan *pb.TaskMessage
for _, subs := range s.taskSubscribers {
    targetChannels = append(targetChannels, subs...)
}
s.taskMu.RUnlock()
// Send messages without holding locks
for _, subChan := range targetChannels {
    go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
        // Async delivery
    }(subChan, taskToSend)
}
Benefits:
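The pattern above, copying the subscriber list under a read lock and sending only after the lock is released, can be sketched end to end as follows. The `hub` type and its methods are hypothetical, and `string` channels replace the protobuf message channels. Unlike the fire-and-forget goroutines above, this sketch waits for all sends to finish so that its behavior is deterministic.

```go
package main

import "sync"

// hub is a hypothetical stand-in for the broker's server struct.
type hub struct {
    mu   sync.RWMutex
    subs map[string][]chan string
}

// snapshot copies the current subscriber channels while holding the read
// lock, so callers can send without blocking subscribe or unsubscribe calls.
func (h *hub) snapshot() []chan string {
    h.mu.RLock()
    defer h.mu.RUnlock()
    var targets []chan string
    for _, subs := range h.subs {
        targets = append(targets, subs...)
    }
    return targets
}

// broadcast sends msg to every subscriber from its own goroutine and waits
// for all sends to finish. It returns the number of targets attempted.
func (h *hub) broadcast(msg string) int {
    targets := h.snapshot()
    var wg sync.WaitGroup
    for _, ch := range targets {
        wg.Add(1)
        go func(ch chan string) {
            defer wg.Done()
            ch <- msg // a real broker would bound this send with a timeout
        }(ch)
    }
    wg.Wait()
    return len(targets)
}
```

Because the lock is released before any send starts, one slow subscriber cannot delay new subscriptions or disconnect handling.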
The broker provides comprehensive logging:
The AgentHub broker architecture provides a solid foundation for Agent2Agent communication while maintaining simplicity and performance. Its design supports the immediate needs of most agent systems while providing clear paths for future enhancement as requirements evolve.
This document explains how AgentHub implements hexagonal architecture principles with the Agent2Agent (A2A) protocol, gRPC communication, and event-driven design patterns.
AgentHub follows hexagonal architecture (also known as Ports and Adapters) to achieve:
graph TB
    subgraph "AgentHub Ecosystem"
        subgraph "External Agents"
            A["Agent A<br/>(Chat REPL)"]
            B["Agent B<br/>(Chat Responder)"]
            C["Agent C<br/>(Custom Agent)"]
        end
        subgraph "AgentHub Broker"
            subgraph "Adapters (Infrastructure)"
                GRPC["gRPC Server<br/>Adapter"]
                HEALTH["Health Check<br/>Adapter"]
                METRICS["Metrics<br/>Adapter"]
                TRACING["Tracing Adapter<br/>(OTLP/Jaeger)"]
            end
            subgraph "Ports (Interfaces)"
                SP["AgentHub<br/>Service Port"]
                PP["Message<br/>Publisher Port"]
                EP["Event<br/>Subscriber Port"]
                OP["Observability<br/>Port"]
            end
            subgraph "Domain (Core Logic)"
                A2A["A2A Protocol<br/>Engine"]
                ROUTER["Event Router<br/>& Broker"]
                VALIDATOR["Message<br/>Validator"]
                CONTEXT["Context<br/>Manager"]
                TASK["Task<br/>Lifecycle"]
            end
        end
        subgraph "External Systems"
            OTLP["OTLP Collector<br/>& Jaeger"]
            STORE["Event Store<br/>(Memory)"]
        end
    end
    %% External agent connections
    A -->|"gRPC calls<br/>(PublishMessage,<br/>SubscribeToMessages)"| GRPC
    B -->|"gRPC calls"| GRPC
    C -->|"gRPC calls"| GRPC
    %% Adapter to Port connections
    GRPC -->|"implements"| SP
    HEALTH -->|"implements"| OP
    METRICS -->|"implements"| OP
    TRACING -->|"implements"| OP
    %% Port to Domain connections
    SP -->|"delegates to"| A2A
    PP -->|"delegates to"| ROUTER
    EP -->|"delegates to"| ROUTER
    OP -->|"observes"| A2A
    %% Domain internal connections
    A2A -->|"uses"| VALIDATOR
    A2A -->|"uses"| CONTEXT
    A2A -->|"uses"| TASK
    ROUTER -->|"persists events"| STORE
    TRACING -->|"exports traces"| OTLP
    %% Styling
    classDef agents fill:#add8e6
    classDef adapters fill:#ffa500
    classDef ports fill:#e0ffff
    classDef domain fill:#ffb6c1
    classDef external fill:#dda0dd
    class A,B,C agents
    class GRPC,HEALTH,METRICS,TRACING adapters
    class SP,PP,EP,OP ports
    class A2A,ROUTER,VALIDATOR,CONTEXT,TASK domain
    class OTLP,STORE external
Architecture Notes:
sequenceDiagram
    participant REPL as Chat REPL<br/>Agent
    participant gRPC as gRPC<br/>Adapter
    participant A2A as A2A Protocol<br/>Engine
    participant Router as Event<br/>Router
    participant Responder as Chat Responder<br/>Agent
    rect rgb(240, 248, 255)
        Note over REPL, Router: A2A Message Publishing
        REPL->>+gRPC: PublishMessage(A2AMessage)
        gRPC->>+A2A: validateA2AMessage()
        A2A->>A2A: check MessageId, Role, Content
        A2A-->>-gRPC: validation result
        gRPC->>+Router: routeA2AEvent(messageEvent)
        Router->>Router: identify subscribers<br/>by agent_id/broadcast
        Router->>Router: create tracing span<br/>with A2A attributes
        Router-->>Responder: deliver message event
        Router-->>-gRPC: routing success
        gRPC-->>-REPL: PublishResponse(event_id)
    end
    rect rgb(255, 248, 240)
        Note over Responder, Router: A2A Message Processing
        Responder->>+gRPC: SubscribeToMessages(agent_id)
        gRPC->>Router: register subscriber
        Router-->>gRPC: subscription stream
        gRPC-->>-Responder: message stream
        Note over Responder: Process A2A message<br/>with tracing spans
        Responder->>+gRPC: PublishMessage(A2AResponse)
        gRPC->>A2A: validateA2AMessage()
        A2A->>A2A: check AGENT role,<br/>ContextId correlation
        gRPC->>Router: routeA2AEvent(responseEvent)
        Router-->>REPL: deliver response event
        gRPC-->>-Responder: PublishResponse
    end
    Note over REPL, Responder: A2A Protocol ensures:<br/>• Message structure compliance<br/>• Role semantics (USER/AGENT)<br/>• Context correlation<br/>• Event-driven routing
The heart of the system implementing A2A protocol specifications:
// Core domain logic - technology agnostic
type A2AProtocolEngine struct {
    messageValidator MessageValidator
    contextManager   ContextManager
    taskLifecycle    TaskLifecycle
}
// A2A message validation
func (e *A2AProtocolEngine) ValidateMessage(msg *Message) error {
    // A2A compliance checks
    if msg.MessageId == "" { return ErrMissingMessageId }
    if msg.Role == ROLE_UNSPECIFIED { return ErrInvalidRole }
    if len(msg.Content) == 0 { return ErrEmptyContent }
    return nil
}
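The validation rules above rely on types and sentinel errors that are not shown in this section. The sketch below fills them in so the checks can be exercised in isolation. The `Role`, `Part`, and `Message` definitions and the error messages are assumptions for illustration; the real types would come from the generated A2A protobuf code.

```go
package main

import "errors"

// Role is a hypothetical stand-in for the generated A2A role enum.
type Role int

const (
    ROLE_UNSPECIFIED Role = iota
    ROLE_USER
    ROLE_AGENT
)

// Part and Message are simplified stand-ins for the generated A2A types.
type Part struct{ Text string }

type Message struct {
    MessageId string
    Role      Role
    Content   []*Part
}

// Sentinel errors let callers distinguish failures by identity.
var (
    ErrMissingMessageId = errors.New("a2a: message_id is required")
    ErrInvalidRole      = errors.New("a2a: role must be USER or AGENT")
    ErrEmptyContent     = errors.New("a2a: content must contain at least one part")
)

// validateMessage applies the three checks in order and returns the first failure.
func validateMessage(msg *Message) error {
    if msg.MessageId == "" {
        return ErrMissingMessageId
    }
    if msg.Role == ROLE_UNSPECIFIED {
        return ErrInvalidRole
    }
    if len(msg.Content) == 0 {
        return ErrEmptyContent
    }
    return nil
}
```

Because the function depends on no transport or storage code, it can be tested without starting a gRPC server.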
Manages event-driven communication between agents:
type EventRouter struct {
    messageSubscribers map[string][]chan *AgentEvent
    taskSubscribers    map[string][]chan *AgentEvent
    eventSubscribers   map[string][]chan *AgentEvent
}
func (r *EventRouter) RouteEvent(event *AgentEvent) error {
    // Route based on A2A metadata
    routing := event.GetRouting()
    subscribers := r.getSubscribers(routing.ToAgentId, event.PayloadType)
    // Deliver with tracing
    for _, sub := range subscribers {
        go r.deliverWithTracing(sub, event)
    }
    // Delivery is asynchronous, so routing itself reports no error here
    return nil
}
Translates between gRPC and domain logic:
type GrpcAdapter struct {
    a2aEngine    A2AProtocolEngine
    eventRouter  EventRouter
    tracer       TracingAdapter
}
func (a *GrpcAdapter) PublishMessage(ctx context.Context, req *PublishMessageRequest) (*PublishResponse, error) {
    // Start tracing span
    ctx, span := a.tracer.StartA2AMessageSpan(ctx, "publish_message", req.Message.MessageId, req.Message.Role)
    defer span.End()
    // Validate using domain logic
    if err := a.a2aEngine.ValidateMessage(req.Message); err != nil {
        a.tracer.RecordError(span, err)
        return nil, err
    }
    // Route using domain logic
    event := a.createA2AEvent(req)
    if err := a.eventRouter.RouteEvent(event); err != nil {
        return nil, err
    }
    return &PublishResponse{Success: true, EventId: event.EventId}, nil
}
// Clean, testable interfaces
type MessagePublisher interface {
    PublishMessage(ctx context.Context, msg *Message) (*PublishResponse, error)
}
type EventSubscriber interface {
    SubscribeToMessages(ctx context.Context, agentId string) (MessageStream, error)
}
type ObservabilityPort interface {
    StartSpan(ctx context.Context, operation string) (context.Context, Span)
    RecordMetric(name string, value float64, labels map[string]string)
}
classDiagram
    class A2AMessage {
        +string MessageId
        +string ContextId
        +Role Role
        +Part[] Content
        +Metadata Metadata
        +string TaskId
    }
    class Part {
        +string Text
        +bytes Data
        +FileData File
    }
    class EventMetadata {
        +string FromAgentId
        +string ToAgentId
        +string EventType
        +Priority Priority
    }
    class Role {
        <<enumeration>>
        USER
        AGENT
    }
    class Metadata {
        +Fields map
    }
    A2AMessage "1" --> "0..*" Part : contains
    A2AMessage "1" --> "1" EventMetadata : routed_with
    A2AMessage "1" --> "1" Role : has
    A2AMessage "1" --> "0..1" Metadata : includes
The system implements pure event-driven architecture:
sequenceDiagram
    participant A as Agent A
    participant B as Broker
    participant AB as Agent B
    participant OTLP as OTLP Collector
    participant J as Jaeger
    A->>+B: PublishMessage<br/>[trace_id: 123]
    B->>B: Create A2A spans<br/>with structured attributes
    B->>+AB: RouteEvent<br/>[trace_id: 123]
    AB->>AB: Process with<br/>child spans
    AB->>-B: PublishResponse<br/>[trace_id: 123]
    B->>-A: Success<br/>[trace_id: 123]
    par Observability Export
        B->>OTLP: Export spans<br/>with A2A attributes
        OTLP->>J: Store traces
        J->>J: Build trace timeline<br/>with correlation
    end
    Note over A, J: End-to-end tracing<br/>with A2A protocol visibility
Each span includes A2A-specific attributes:
- a2a.message.id
- a2a.message.role
- a2a.context.id
- a2a.event.type
- a2a.routing.from_agent
- a2a.routing.to_agent
func TestA2AEngine_ValidateMessage(t *testing.T) {
    engine := NewA2AProtocolEngine()
    // Test A2A compliance
    msg := &Message{
        MessageId: "test_msg_123",
        Role: ROLE_USER,
        Content: []*Part{{Text: "hello"}},
    }
    err := engine.ValidateMessage(msg)
    assert.NoError(t, err)
}
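func TestGrpcAdapter_PublishMessage(t *testing.T) {
    // Mock domain dependencies
    mockEngine := &MockA2AEngine{}
    mockRouter := &MockEventRouter{}
    adapter := NewGrpcAdapter(mockEngine, mockRouter)
    // Build a request that satisfies the A2A validation rules
    ctx := context.Background()
    validRequest := &PublishMessageRequest{
        Message: &Message{
            MessageId: "test_msg_123",
            Role:      ROLE_USER,
            Content:   []*Part{{Text: "hello"}},
        },
    }
    // Test adapter behavior
    resp, err := adapter.PublishMessage(ctx, validRequest)
    assert.NoError(t, err)
    assert.True(t, resp.Success)
}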
AgentHub’s hexagonal architecture with A2A protocol provides:
This architecture ensures maintainable, scalable, and observable agent communication while maintaining strict A2A protocol compliance.