A2A-Compliant EDA Broker Architecture
AgentHub A2A-Compliant EDA Broker Architecture
This document explains the internal architecture of the AgentHub Event-Driven Architecture (EDA) broker, how it implements Agent2Agent (A2A) protocol-compliant communication patterns, and the design decisions behind its hybrid approach.
Architectural Overview
The AgentHub broker serves as a centralized Event-Driven Architecture hub that transports Agent2Agent (A2A) protocol-compliant messages between distributed agents. It combines the scalability benefits of EDA with the interoperability guarantees of the A2A protocol.
┌─────────────────────────────────────────────────────────────────┐
│                     AgentHub Broker                             │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐│
│  │   Task Router   │    │   Subscriber    │    │   Progress      ││
│  │                 │    │   Manager       │    │   Tracker       ││
│  │ • Route tasks   │    │                 │    │                 ││
│  │ • Apply filters │    │ • Manage agent  │    │ • Track task    ││
│  │ • Broadcast     │    │   subscriptions │    │   progress      ││
│  │ • Load balance  │    │ • Handle        │    │ • Update        ││
│  │                 │    │   disconnects   │    │   requesters    ││
│  └─────────────────┘    └─────────────────┘    └─────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│                        gRPC Interface                           │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐│
│  │ PublishTask     │    │SubscribeToTasks│    │SubscribeToTask  ││
│  │ PublishResult   │    │SubscribeToRes  │    │ Progress        ││
│  │ PublishProgress │    │                 │    │                 ││
│  └─────────────────┘    └─────────────────┘    └─────────────────┘│
└─────────────────────────────────────────────────────────────────┘
Core Components
1. Event Bus Server
The main server implementation at broker/main.go:22 provides the central coordination point:
type eventBusServer struct {
    pb.UnimplementedEventBusServer
    // Subscription management
    taskSubscribers         map[string][]chan *pb.TaskMessage
    taskResultSubscribers   map[string][]chan *pb.TaskResult
    taskProgressSubscribers map[string][]chan *pb.TaskProgress
    taskMu                  sync.RWMutex
}
Key characteristics:
- Thread-safe: Uses sync.RWMutexto protect concurrent access to subscriber maps
- Channel-based: Uses Go channels for efficient message passing
- Non-blocking: Implements timeouts to prevent blocking on slow consumers
- Stateless: No persistent storage - all state is in-memory
2. Task Routing Engine
The routing logic determines how tasks are delivered to agents:
Direct Routing
When a task specifies a ResponderAgentId, it’s routed directly to that agent:
if responderID := req.GetTask().GetResponderAgentId(); responderID != "" {
    if subs, ok := s.taskSubscribers[responderID]; ok {
        targetChannels = subs
    }
}
Broadcast Routing
When no specific responder is set, tasks are broadcast to all subscribed agents:
} else {
    // Broadcast to all task subscribers
    for _, subs := range s.taskSubscribers {
        targetChannels = append(targetChannels, subs...)
    }
}
Routing Features
- Immediate delivery: Tasks are routed immediately upon receipt
- Multiple subscribers: Single agent can have multiple subscription channels
- Timeout protection: 5-second timeout prevents blocking on unresponsive agents
- Error isolation: Failed delivery to one agent doesn’t affect others
3. Subscription Management
The broker manages three types of subscriptions:
Task Subscriptions
Agents subscribe to receive tasks assigned to them:
func (s *eventBusServer) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error
- Agent-specific: Tasks are delivered based on agent ID
- Type filtering: Optional filtering by task types
- Long-lived streams: Connections persist until agent disconnects
- Automatic cleanup: Subscriptions are removed when connections close
Result Subscriptions
Publishers subscribe to receive results of tasks they requested:
func (s *eventBusServer) SubscribeToTaskResults(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskResultsServer) error
Progress Subscriptions
Publishers can track progress of long-running tasks:
func (s *eventBusServer) SubscribeToTaskProgress(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskProgressServer) error
4. Message Flow Architecture
Task Publication Flow
- Validation: Incoming tasks are validated for required fields
- Routing: Tasks are routed to appropriate subscribers
- Delivery: Messages are sent via Go channels with timeout protection
- Response: Publisher receives acknowledgment of successful publication
Result Flow
- Receipt: Agents publish task results back to the broker
- Broadcasting: Results are broadcast to all result subscribers
- Filtering: Subscribers receive results for their requested tasks
- Delivery: Results are streamed back to requesting agents
Progress Flow
- Updates: Executing agents send periodic progress updates
- Distribution: Progress updates are sent to interested subscribers
- Real-time delivery: Updates are streamed immediately upon receipt
Design Decisions and Trade-offs
In-Memory State Management
Decision: Store all subscription state in memory using Go maps and channels.
Benefits:
- High performance: No database overhead for message routing
- Low latency: Sub-millisecond message routing
- Simplicity: Easier to develop, test, and maintain
- Concurrent efficiency: Go’s garbage collector handles channel cleanup
Trade-offs:
- No persistence: Broker restart loses all subscription state
- Memory usage: Large numbers of agents increase memory requirements
- Single point of failure: No built-in redundancy
When this works well:
- Development and testing environments
- Small to medium-scale deployments
- Scenarios where agents can re-establish subscriptions on broker restart
Asynchronous Message Delivery
Decision: Use Go channels with timeout-based delivery.
Implementation:
go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
    select {
    case ch <- &task:
        // Message sent successfully
    case <-ctx.Done():
        log.Printf("Context cancelled while sending task %s", task.GetTaskId())
    case <-time.After(5 * time.Second):
        log.Printf("Timeout sending task %s. Dropping message.", task.GetTaskId())
    }
}(subChan, taskToSend)
Benefits:
- Non-blocking: Slow agents don’t block the entire system
- Fault tolerance: Timeouts prevent resource leaks
- Scalability: Concurrent delivery to multiple agents
- Resource protection: Prevents unbounded queue growth
Trade-offs:
- Message loss: Timed-out messages are dropped
- Complexity: Requires careful timeout tuning
- No delivery guarantees: No acknowledgment of successful processing
gRPC Streaming for Subscriptions
Decision: Use bidirectional gRPC streams for agent subscriptions.
Benefits:
- Real-time delivery: Messages are pushed immediately
- Connection awareness: Broker knows when agents disconnect
- Flow control: gRPC handles backpressure automatically
- Type safety: Protocol Buffer messages ensure data consistency
Trade-offs:
- Connection overhead: Each agent maintains persistent connections
- Resource usage: Streams consume memory and file descriptors
- Network sensitivity: Transient network issues can break connections
Concurrent Access Patterns
Decision: Use read-write mutexes with channel-based message passing.
Implementation:
s.taskMu.RLock()
// Read subscriber information
var targetChannels []chan *pb.TaskMessage
for _, subs := range s.taskSubscribers {
    targetChannels = append(targetChannels, subs...)
}
s.taskMu.RUnlock()
// Send messages without holding locks
for _, subChan := range targetChannels {
    go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
        // Async delivery
    }(subChan, taskToSend)
}
Benefits:
- High concurrency: Multiple readers can access subscriptions simultaneously
- Lock-free delivery: Message sending doesn’t hold locks
- Deadlock prevention: Clear lock ordering and minimal critical sections
- Performance: Read operations are optimized for the common case
Scalability Characteristics
Throughput
- Task routing: ~10,000+ tasks/second on modern hardware
- Concurrent connections: Limited by file descriptor limits (typically ~1,000s)
- Memory usage: ~1KB per active subscription
Latency
- Task routing: <1ms for local network delivery
- End-to-end: <10ms for simple task processing cycles
- Progress updates: Real-time streaming with minimal buffering
Resource Usage
- CPU: Low CPU usage, primarily network I/O bound
- Memory: Linear growth with number of active subscriptions
- Network: Efficient binary Protocol Buffer encoding
Error Handling and Resilience
Connection Failures
- Automatic cleanup: Subscriptions are removed when connections close
- Graceful degradation: Failed agents don’t affect others
- Reconnection support: Agents can re-establish subscriptions
Message Delivery Failures
- Timeout handling: Messages that can’t be delivered are dropped
- Logging: All failures are logged for debugging
- Isolation: Per-agent timeouts prevent cascading failures
Resource Protection
- Channel buffering: Limited buffer sizes prevent memory exhaustion
- Timeout mechanisms: Prevent resource leaks from stuck operations
- Graceful shutdown: Proper cleanup during server shutdown
Monitoring and Observability
Built-in Logging
The broker provides comprehensive logging:
- Task routing decisions
- Subscription lifecycle events
- Error conditions and recovery
- Performance metrics
Integration Points
- Health checks: HTTP endpoints for monitoring
- Metrics export: Prometheus/metrics integration points
- Distributed tracing: Context propagation support
Future Enhancements
Persistence Layer
- Database backend: Store subscription state for broker restarts
- Message queuing: Durable task queues for reliability
- Transaction support: Atomic message delivery guarantees
Clustering Support
- Horizontal scaling: Multiple broker instances
- Load balancing: Distribute agents across brokers
- Consensus protocols: Consistent state across brokers
Advanced Routing
- Capability-based routing: Route tasks based on agent capabilities
- Load-aware routing: Consider agent load in routing decisions
- Geographic routing: Route based on agent location
Security Enhancements
- Authentication: Agent identity verification
- Authorization: Task-level access controls
- Encryption: TLS for all communications
The AgentHub broker architecture provides a solid foundation for Agent2Agent communication while maintaining simplicity and performance. Its design supports the immediate needs of most agent systems while providing clear paths for future enhancement as requirements evolve.
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.