1 - A2A-Compliant EDA Broker Architecture
A deep dive into the internal architecture of the AgentHub EDA broker and how it implements Agent2Agent (A2A) protocol-compliant communication patterns while preserving the benefits of Event-Driven Architecture.
AgentHub A2A-Compliant EDA Broker Architecture
This document explains the internal architecture of the AgentHub Event-Driven Architecture (EDA) broker, how it implements Agent2Agent (A2A) protocol-compliant communication patterns, and the design decisions behind its hybrid approach.
Architectural Overview
The AgentHub broker serves as a centralized Event-Driven Architecture hub that transports Agent2Agent (A2A) protocol-compliant messages between distributed agents. It combines the scalability benefits of EDA with the interoperability guarantees of the A2A protocol.
┌─────────────────────────────────────────────────────────────────┐
│ AgentHub Broker │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│
│ │ Task Router │ │ Subscriber │ │ Progress ││
│ │ │ │ Manager │ │ Tracker ││
│ │ • Route tasks │ │ │ │ ││
│ │ • Apply filters │ │ • Manage agent │ │ • Track task ││
│ │ • Broadcast │ │ subscriptions │ │ progress ││
│ │ • Load balance │ │ • Handle │ │ • Update ││
│ │ │ │ disconnects │ │ requesters ││
│ └─────────────────┘ └─────────────────┘ └─────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│ gRPC Interface │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│
│ │ PublishTask │ │SubscribeToTasks│ │SubscribeToTask ││
│ │ PublishResult │ │SubscribeToRes │ │ Progress ││
│ │ PublishProgress │ │ │ │ ││
│ └─────────────────┘ └─────────────────┘ └─────────────────┘│
└─────────────────────────────────────────────────────────────────┘
Core Components
1. Event Bus Server
The main server implementation at broker/main.go:22 provides the central coordination point:
type eventBusServer struct {
pb.UnimplementedEventBusServer
// Subscription management
taskSubscribers map[string][]chan *pb.TaskMessage
taskResultSubscribers map[string][]chan *pb.TaskResult
taskProgressSubscribers map[string][]chan *pb.TaskProgress
taskMu sync.RWMutex
}
Key characteristics:
- Thread-safe: Uses sync.RWMutex to protect concurrent access to subscriber maps
- Channel-based: Uses Go channels for efficient message passing
- Non-blocking: Implements timeouts to prevent blocking on slow consumers
- Stateless: No persistent storage - all state is in-memory
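As a concrete illustration of these characteristics, the sketch below shows how a subscriber channel could be registered under the write lock. The helper name addTaskSubscriber and the buffer size are assumptions for illustration, not the exact code in broker/main.go.
func (s *eventBusServer) addTaskSubscriber(agentID string) chan *pb.TaskMessage {
	// A small buffer absorbs short bursts without blocking the publisher.
	ch := make(chan *pb.TaskMessage, 10)
	s.taskMu.Lock()
	s.taskSubscribers[agentID] = append(s.taskSubscribers[agentID], ch)
	s.taskMu.Unlock()
	return ch
}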
2. Task Routing Engine
The routing logic determines how tasks are delivered to agents:
Direct Routing
When a task specifies a ResponderAgentId, it’s routed directly to that agent:
if responderID := req.GetTask().GetResponderAgentId(); responderID != "" {
if subs, ok := s.taskSubscribers[responderID]; ok {
targetChannels = subs
}
}
Broadcast Routing
When no specific responder is set, tasks are broadcast to all subscribed agents:
} else {
// Broadcast to all task subscribers
for _, subs := range s.taskSubscribers {
targetChannels = append(targetChannels, subs...)
}
}
Routing Features
- Immediate delivery: Tasks are routed immediately upon receipt
- Multiple subscribers: Single agent can have multiple subscription channels
- Timeout protection: 5-second timeout prevents blocking on unresponsive agents
- Error isolation: Failed delivery to one agent doesn’t affect others
3. Subscription Management
The broker manages three types of subscriptions:
Task Subscriptions
Agents subscribe to receive tasks assigned to them:
func (s *eventBusServer) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error
- Agent-specific: Tasks are delivered based on agent ID
- Type filtering: Optional filtering by task types
- Long-lived streams: Connections persist until agent disconnects
- Automatic cleanup: Subscriptions are removed when connections close
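A minimal sketch of such a subscription handler is shown below. It assumes the addTaskSubscriber helper sketched earlier plus a matching removeTaskSubscriber, and the request accessors are assumptions; the actual handler in broker/main.go also applies task-type filtering.
func (s *eventBusServer) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error {
	agentID := req.GetAgentId()
	ch := s.addTaskSubscriber(agentID)
	defer s.removeTaskSubscriber(agentID, ch) // automatic cleanup on disconnect

	for {
		select {
		case task := <-ch:
			if err := stream.Send(task); err != nil {
				return err // stream broken; cleanup runs via defer
			}
		case <-stream.Context().Done():
			return nil // agent disconnected
		}
	}
}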
Result Subscriptions
Publishers subscribe to receive results of tasks they requested:
func (s *eventBusServer) SubscribeToTaskResults(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskResultsServer) error
Progress Subscriptions
Publishers can track progress of long-running tasks:
func (s *eventBusServer) SubscribeToTaskProgress(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskProgressServer) error
4. Message Flow Architecture
Task Publication Flow
- Validation: Incoming tasks are validated for required fields
- Routing: Tasks are routed to appropriate subscribers
- Delivery: Messages are sent via Go channels with timeout protection
- Response: Publisher receives acknowledgment of successful publication
Result Flow
- Receipt: Agents publish task results back to the broker
- Broadcasting: Results are broadcast to all result subscribers
- Filtering: Subscribers receive results for their requested tasks
- Delivery: Results are streamed back to requesting agents
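One plausible shape for the filtering step is sketched below; it assumes a task result carries the requesting agent's ID (the RequesterAgentId accessor is a guess for illustration), so only that agent's result channels receive the message.
s.taskMu.RLock()
resultSubs := s.taskResultSubscribers[result.GetRequesterAgentId()]
s.taskMu.RUnlock()

for _, ch := range resultSubs {
	go func(c chan *pb.TaskResult) {
		select {
		case c <- result:
			// delivered
		case <-time.After(5 * time.Second):
			log.Printf("Timeout sending result %s. Dropping message.", result.GetTaskId())
		}
	}(ch)
}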
Progress Flow
- Updates: Executing agents send periodic progress updates
- Distribution: Progress updates are sent to interested subscribers
- Real-time delivery: Updates are streamed immediately upon receipt
Design Decisions and Trade-offs
In-Memory State Management
Decision: Store all subscription state in memory using Go maps and channels.
Benefits:
- High performance: No database overhead for message routing
- Low latency: Sub-millisecond message routing
- Simplicity: Easier to develop, test, and maintain
- Concurrent efficiency: Go’s garbage collector handles channel cleanup
Trade-offs:
- No persistence: Broker restart loses all subscription state
- Memory usage: Large numbers of agents increase memory requirements
- Single point of failure: No built-in redundancy
When this works well:
- Development and testing environments
- Small to medium-scale deployments
- Scenarios where agents can re-establish subscriptions on broker restart
Asynchronous Message Delivery
Decision: Use Go channels with timeout-based delivery.
Implementation:
go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
select {
case ch <- &task:
// Message sent successfully
case <-ctx.Done():
log.Printf("Context cancelled while sending task %s", task.GetTaskId())
case <-time.After(5 * time.Second):
log.Printf("Timeout sending task %s. Dropping message.", task.GetTaskId())
}
}(subChan, taskToSend)
Benefits:
- Non-blocking: Slow agents don’t block the entire system
- Fault tolerance: Timeouts prevent resource leaks
- Scalability: Concurrent delivery to multiple agents
- Resource protection: Prevents unbounded queue growth
Trade-offs:
- Message loss: Timed-out messages are dropped
- Complexity: Requires careful timeout tuning
- No delivery guarantees: No acknowledgment of successful processing
gRPC Streaming for Subscriptions
Decision: Use long-lived, server-streaming gRPC streams for agent subscriptions.
Benefits:
- Real-time delivery: Messages are pushed immediately
- Connection awareness: Broker knows when agents disconnect
- Flow control: gRPC handles backpressure automatically
- Type safety: Protocol Buffer messages ensure data consistency
Trade-offs:
- Connection overhead: Each agent maintains persistent connections
- Resource usage: Streams consume memory and file descriptors
- Network sensitivity: Transient network issues can break connections
Concurrent Access Patterns
Decision: Use read-write mutexes with channel-based message passing.
Implementation:
s.taskMu.RLock()
// Read subscriber information
var targetChannels []chan *pb.TaskMessage
for _, subs := range s.taskSubscribers {
targetChannels = append(targetChannels, subs...)
}
s.taskMu.RUnlock()
// Send messages without holding locks
for _, subChan := range targetChannels {
go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
// Async delivery
}(subChan, taskToSend)
}
Benefits:
- High concurrency: Multiple readers can access subscriptions simultaneously
- Lock-free delivery: Message sending doesn’t hold locks
- Deadlock prevention: Clear lock ordering and minimal critical sections
- Performance: Read operations are optimized for the common case
Scalability Characteristics
Throughput
- Task routing: ~10,000+ tasks/second on modern hardware
- Concurrent connections: Limited by file descriptor limits (typically ~1,000s)
- Memory usage: ~1KB per active subscription
Latency
- Task routing: <1ms for local network delivery
- End-to-end: <10ms for simple task processing cycles
- Progress updates: Real-time streaming with minimal buffering
Resource Usage
- CPU: Low CPU usage, primarily network I/O bound
- Memory: Linear growth with number of active subscriptions
- Network: Efficient binary Protocol Buffer encoding
Error Handling and Resilience
Connection Failures
- Automatic cleanup: Subscriptions are removed when connections close
- Graceful degradation: Failed agents don’t affect others
- Reconnection support: Agents can re-establish subscriptions
Message Delivery Failures
- Timeout handling: Messages that can’t be delivered are dropped
- Logging: All failures are logged for debugging
- Isolation: Per-agent timeouts prevent cascading failures
Resource Protection
- Channel buffering: Limited buffer sizes prevent memory exhaustion
- Timeout mechanisms: Prevent resource leaks from stuck operations
- Graceful shutdown: Proper cleanup during server shutdown
Monitoring and Observability
Built-in Logging
The broker provides comprehensive logging:
- Task routing decisions
- Subscription lifecycle events
- Error conditions and recovery
- Performance metrics
Integration Points
- Health checks: HTTP endpoints for monitoring
- Metrics export: Prometheus/metrics integration points
- Distributed tracing: Context propagation support
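For example, a liveness endpoint can be exposed with the standard library alone; this is a minimal, self-contained sketch, and the path and port are illustrative rather than AgentHub's actual configuration.
package main

import "net/http"

func main() {
	// Respond 200 OK so orchestrators and load balancers can probe liveness.
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})
	_ = http.ListenAndServe(":8080", nil)
}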
Future Enhancements
Persistence Layer
- Database backend: Store subscription state for broker restarts
- Message queuing: Durable task queues for reliability
- Transaction support: Atomic message delivery guarantees
Clustering Support
- Horizontal scaling: Multiple broker instances
- Load balancing: Distribute agents across brokers
- Consensus protocols: Consistent state across brokers
Advanced Routing
- Capability-based routing: Route tasks based on agent capabilities
- Load-aware routing: Consider agent load in routing decisions
- Geographic routing: Route based on agent location
Security Enhancements
- Authentication: Agent identity verification
- Authorization: Task-level access controls
- Encryption: TLS for all communications
The AgentHub broker architecture provides a solid foundation for Agent2Agent communication while maintaining simplicity and performance. Its design supports the immediate needs of most agent systems while providing clear paths for future enhancement as requirements evolve.
2 - Cortex Architecture
Understanding the Cortex asynchronous AI orchestration engine
Cortex Architecture
Cortex is an asynchronous, event-driven AI orchestration engine that serves as the “brain” of multi-agent systems. It manages conversations, coordinates tasks across specialized agents, and uses LLM-based decision-making to route work intelligently.
Overview
Traditional chatbots block on long-running operations. Cortex enables non-blocking conversations where users can interact while background tasks execute asynchronously.
Key Innovation
Traditional: User → Request → [BLOCKED] → Response
Cortex: User → Request → Immediate Ack → [Async Work] → Final Response
Users receive instant acknowledgments and can continue conversing while agents process tasks in the background.
Architecture Diagram
┌─────────────────┐ ┌────────────────┐ ┌─────────────┐
│ Chat CLI │─────>│ Event Bus │<─────│ Cortex │
│ (User I/O) │ │ (Broker) │ │ Orchestrator│
└─────────────────┘ └────────────────┘ └─────────────┘
▲ ▲ │
│ chat.response │ task.result │ task.request
│ │ │
│ ┌─────────────┐ │
└──────────────────│ Agent(s) │◄─────────────┘
│ (Workers) │
└─────────────┘
Core Components
1. Cortex Orchestrator
The central decision-making engine that:
- Maintains conversation state - Full history per session
- Registers agents dynamically - Discovers capabilities via Agent Cards
- Decides actions via LLM - Uses AI to route work intelligently
- Coordinates tasks - Tracks pending work and correlates results
File: agents/cortex/cortex.go
2. State Manager
Manages conversational state with thread-safe operations:
type ConversationState struct {
SessionID string
Messages []*pb.Message
PendingTasks map[string]*TaskContext
RegisteredAgents map[string]*pb.AgentCard
}
Key Features:
- Per-session locking (no global bottleneck)
- Interface-based (swappable implementations)
- Currently in-memory (POC); a production deployment would use Redis/PostgreSQL
Files: agents/cortex/state/
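A sketch of what that interface could look like is shown below. Only WithLock is attested by the concurrency test later in this document; the Get accessor is an assumption added to round out the example.
type StateManager interface {
	// WithLock runs fn while holding the per-session lock, then persists the state.
	WithLock(sessionID string, fn func(*ConversationState) error) error
	// Get returns the current state for a session (assumed accessor).
	Get(sessionID string) (*ConversationState, error)
}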
3. LLM Client
Abstraction for AI-powered decision-making:
type Client interface {
Decide(
ctx context.Context,
conversationHistory []*pb.Message,
availableAgents []*pb.AgentCard,
newEvent *pb.Message,
) (*Decision, error)
}
The LLM analyzes:
- Conversation history
- Available agent capabilities
- New incoming messages
And returns decisions:
- chat.response - Reply to user
- task.request - Dispatch work to agent
Files: agents/cortex/llm/
IntelligentDecider: Context-Aware Orchestration
The IntelligentDecider is a mock LLM implementation that demonstrates intelligent, intent-based task orchestration. Unlike simple dispatchers that route every message to agents, it analyzes user intent before deciding whether to orchestrate with specialized agents or respond directly.
Key Characteristics:
Intent Detection: Analyzes message content for keywords indicating specific needs
- Echo requests: “echo”, “repeat”, “say back”
- Future: “translate”, “summarize”, “transcribe”, etc.
Conditional Orchestration: Only dispatches to agents when user explicitly requests functionality
- User: “echo hello” → Dispatches to echo_agent
- User: “hello” → Responds directly (no agent needed)
Transparent Reasoning: Always explains decision-making process
- All decisions include detailed reasoning visible in observability traces
- Users understand why Cortex chose specific actions
Example Flow:
// User message: "echo hello world"
decision := IntelligentDecider()(ctx, history, agents, userMsg)
// Returns:
Decision{
Reasoning: "User message 'echo hello world' contains an explicit echo request (detected keywords: echo/repeat/say back). I'm dispatching this to the echo_agent which specializes in repeating messages back.",
Actions: [
{
Type: "chat.response",
ResponseText: "I detected you want me to echo something. I'm asking the echo agent to handle this for you.",
},
{
Type: "task.request",
TaskType: "echo",
TargetAgent: "agent_echo",
TaskPayload: {"input": "echo hello world"},
},
],
}
Comparison to Simple Dispatchers:
| Approach | Every Message | Intent Detection | Explains Reasoning | Responds Directly |
|---|---|---|---|---|
| TaskDispatcherDecider (deprecated) | Dispatches to agent | No | Minimal | No |
| IntelligentDecider | Analyzes first | Yes | Detailed | Yes |
Design Benefits:
- Reduced Latency: Simple queries get immediate responses without agent roundtrip
- Resource Efficiency: Agents only invoked when their specialized capabilities are needed
- Better UX: Users understand what Cortex is doing and why
- Debuggability: Reasoning in traces makes orchestration logic transparent
- Extensibility: Easy to add new intent patterns for new agent types
Future Evolution:
In production, the IntelligentDecider pattern should be replaced with a real LLM that performs function calling:
// Production LLM receives tools/functions
tools := convertAgentCardsToTools(availableAgents)
decision := realLLM.Decide(history, tools, newMsg)
// LLM naturally decides:
// - "hello" → No function call, direct response
// - "echo hello" → Calls echo_agent function
// - "translate this to French" → Calls translation_agent function
The IntelligentDecider serves as a working example of the decision patterns a real LLM would follow.
4. Message Publisher
Interface for publishing messages to the Event Bus:
type MessagePublisher interface {
PublishMessage(
ctx context.Context,
msg *pb.Message,
routing *pb.AgentEventMetadata,
) error
}
Adapts AgentHub client to Cortex’s needs.
Message Flow
Simple Chat Request
1. User types "Hello" in CLI
↓
2. CLI publishes A2A Message (role=USER, context_id=session-1)
↓
3. Event Bus routes to Cortex
↓
4. Cortex retrieves conversation state
↓
5. Cortex calls LLM.Decide(history, agents, newMsg)
↓
6. LLM returns Decision: [chat.response: "Hello! How can I help?"]
↓
7. Cortex publishes A2A Message (role=AGENT, response text)
↓
8. Event Bus routes to CLI
↓
9. CLI displays response
↓
10. Cortex updates state with both messages
Asynchronous Task Execution
1. User: "Transcribe this audio file"
↓
2. Cortex LLM decides: [chat.response + task.request]
↓
3. Cortex publishes:
- Message to user: "I'll start transcription, this may take a few minutes"
- Task request to transcription agent
↓
4. User sees immediate acknowledgment ✅
User can continue chatting!
↓
5. Transcription agent processes (background, may take minutes)
↓
6. Agent publishes task.result with transcribed text
↓
7. Cortex receives result, calls LLM.Decide()
↓
8. LLM decides: [chat.response: "Transcription complete: <text>"]
↓
9. Cortex publishes final response to user
↓
10. User sees final result
Design Patterns
1. Interface Segregation
All major components are interfaces:
- StateManager - Easy to swap (in-memory → Redis)
- LLM Client - Easy to test (mock → real AI)
- MessagePublisher - Decoupled from transport
Benefits:
- Testability (use mocks)
- Flexibility (swap implementations)
- Clear contracts
2. Session-Level Concurrency
Each session has its own lock:
// NOT this (global bottleneck):
globalMutex.Lock()
updateState()
globalMutex.Unlock()
// But this (per-session):
sessionLock := getSessionLock(sessionID)
sessionLock.Lock()
updateState()
sessionLock.Unlock()
Benefits:
- Multiple sessions can update concurrently
- No contention across sessions
- Scales horizontally
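A self-contained sketch of per-session locking is shown below; the type and method names are illustrative rather than the actual agents/cortex/state implementation.
type sessionLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

// get returns the lock for a session, creating it on first use.
func (s *sessionLocks) get(sessionID string) *sync.Mutex {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.locks == nil {
		s.locks = make(map[string]*sync.Mutex)
	}
	if _, ok := s.locks[sessionID]; !ok {
		s.locks[sessionID] = &sync.Mutex{}
	}
	return s.locks[sessionID]
}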
3. LLM as Control Plane
Instead of hard-coded if/else routing:
// Old way:
if strings.Contains(input, "transcribe") {
dispatchToTranscriber()
} else if strings.Contains(input, "translate") {
dispatchToTranslator()
}
// Cortex way:
decision := llm.Decide(history, agents, input)
executeActions(decision.Actions)
Benefits:
- Flexible - LLM adapts to context
- Extensible - Add agents, LLM discovers them
- Natural - Mimics human reasoning
Implementation: The IntelligentDecider (see LLM Client section above) demonstrates this pattern by analyzing user intent and making intelligent routing decisions with transparent reasoning.
4. Message Self-Containment
Every message is fully self-describing:
message Message {
string message_id = 1; // Unique ID
string context_id = 2; // Session/conversation ID
string task_id = 3; // Task correlation (if applicable)
Role role = 4; // USER or AGENT
repeated Part content = 5;
Struct metadata = 6;
}
Benefits:
- Agents are stateless (all context in message)
- Easy correlation (context_id, task_id)
- Traceable (message_id)
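For illustration, constructing such a self-contained message in Go might look like the sketch below; the Go field and enum names are assumed from the proto definition above and may not match the generated pb package exactly.
msg := &pb.Message{
	MessageId: "msg-123",    // unique, traceable ID
	ContextId: "session-1",  // session/conversation correlation
	TaskId:    "",           // set when the message relates to a task
	Role:      pb.Role_USER, // who is speaking: USER or AGENT
	Content:   []*pb.Part{{Text: "Transcribe this audio file"}},
}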
State Management
ConversationState Structure
type ConversationState struct {
SessionID string
Messages []*pb.Message // Full history
PendingTasks map[string]*TaskContext
RegisteredAgents map[string]*pb.AgentCard
}
TaskContext Tracking
type TaskContext struct {
TaskID string
TaskType string
RequestedAt int64
OriginalInput *pb.Message
UserNotified bool // Did we acknowledge?
}
Cortex tracks:
- Which tasks are in-flight
- What the user originally requested
- Whether we’ve acknowledged the request
State Lifecycle
1. Get session state (or create new)
2. Lock session for updates
3. Add new message to history
4. Call LLM to decide actions
5. Execute actions (publish messages)
6. Update pending tasks
7. Save state
8. Release lock
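Expressed with the WithLock pattern, this lifecycle could be sketched roughly as follows; the surrounding helpers (llmClient, executeActions, trackPendingTasks) are illustrative names, not the exact functions in cortex.go.
err := stateManager.WithLock(sessionID, func(state *ConversationState) error {
	state.Messages = append(state.Messages, newMsg) // 3. add message to history

	decision, err := llmClient.Decide(ctx, state.Messages, availableAgents, newMsg) // 4. decide
	if err != nil {
		return err
	}
	if err := executeActions(ctx, decision.Actions); err != nil { // 5. publish messages/tasks
		return err
	}
	trackPendingTasks(state, decision.Actions) // 6. record in-flight tasks
	return nil                                 // 7-8. save and unlock handled by WithLock
})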
Agent Discovery
Agent Card Registration
Agents publish capabilities on startup:
type AgentCard struct {
Name string
Description string
Skills []*AgentSkill
}
Cortex maintains a registry:
registeredAgents map[string]*pb.AgentCard
When making LLM decisions, Cortex provides available agents:
decision := llm.Decide(
ctx,
conversationHistory,
cortex.GetAvailableAgents(), // ← Dynamic list
newEvent,
)
The LLM sees which tools are available and chooses appropriately.
Concurrency Model
- Lock Granularity: Per-session (not global)
- State Access: O(1) lookups via map
- Message Processing: Asynchronous (non-blocking)
Horizontal Scaling
Future: Partition sessions across multiple Cortex instances:
Cortex-1: handles sessions A-M
Cortex-2: handles sessions N-Z
Event Bus routes messages to correct instance based on context_id.
- State Get: O(1) with read lock
- State Set: O(1) with write lock
- Concurrent Sessions: No contention (per-session locks)
Tested: 100 goroutines updating same session → zero lost updates ✅
Error Handling
Agent Failures
When an agent fails:
- Agent publishes task.result with status=“failed”
- Cortex receives result
- LLM decides how to handle (inform user, retry, try alternative)
- Cortex publishes response
LLM Failures
If LLM client errors:
decision, err := llm.Decide(...)
if err != nil {
// Fallback: publish generic error response
publishErrorResponse(ctx, session, err)
return err
}
State Corruption
Protected by:
- Transaction-like WithLock pattern
- Copy-on-read to prevent external mutations
- Validation on state load/save
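A hedged sketch of the copy-on-read idea is shown below; the InMemoryStateManager internals are assumptions for illustration, not the actual memory.go implementation.
func (m *InMemoryStateManager) Get(sessionID string) (*ConversationState, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	orig, ok := m.sessions[sessionID]
	if !ok {
		return nil, fmt.Errorf("unknown session %q", sessionID)
	}

	// Return a copy so callers cannot mutate the stored state directly.
	copied := *orig
	copied.Messages = append([]*pb.Message(nil), orig.Messages...)
	return &copied, nil
}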
Implementation Status
✅ Implemented (POC)
- Core orchestrator logic
- In-memory state management
- Mock LLM client with IntelligentDecider (intent-based routing)
- Agent registration
- Message routing
- Task correlation
- CLI interface
- Echo agent (demo)
- Distributed tracing with OpenTelemetry
🚧 Future Work
- Persistent state (Redis, PostgreSQL)
- Real LLM integration (Vertex AI, OpenAI)
- Agent health monitoring
- Web UI with WebSockets
- Retry logic & timeouts
- Advanced error recovery
Code Organization
agents/cortex/
├── cortex.go # Core orchestrator with full observability
├── cortex_test.go # Core tests (4 tests)
├── state/
│ ├── interface.go # StateManager interface
│ ├── memory.go # In-memory implementation
│ └── memory_test.go # State tests (5 tests)
├── llm/
│ ├── interface.go # LLM Client interface
│ ├── mock.go # Mock implementations
│ │ # - IntelligentDecider (intent-based)
│ │ # - TaskDispatcherDecider (deprecated)
│ │ # - SimpleEchoDecider
│ └── mock_test.go # LLM tests (4 tests)
└── cmd/
└── main.go # Service entry point
Total: ~1,200 lines of production code + 500 lines of tests
Testing Strategy
Unit Tests
- State Manager: CRUD, concurrency, locking (5 tests)
- LLM Client: Mock behavior, decision functions (4 tests)
- Cortex Core: Registration, chat, tasks (4 tests)
All tests use interfaces and mocks (no external dependencies).
Concurrency Testing
func TestInMemoryStateManager_Concurrency(t *testing.T) {
	// Launch 100 goroutines updating the same session
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sm.WithLock(sessionID, func(state *ConversationState) error {
				state.Messages = append(state.Messages, msg)
				return nil
			})
		}()
	}
	wg.Wait() // wait for all writers before asserting
	// Assert: Exactly 100 messages (no lost updates)
}
Integration Testing
Demo script (demo_cortex.sh) tests:
- Broker startup
- Cortex initialization
- Agent registration
- End-to-end message flow
Configuration
Environment Variables
# LLM Model (future)
CORTEX_LLM_MODEL=vertex-ai://gemini-2.0-flash
# AgentHub connection
AGENTHUB_GRPC_PORT=127.0.0.1:50051
AGENTHUB_BROKER_ADDR=127.0.0.1
# Health check
CORTEX_HEALTH_PORT=8086
Programmatic Configuration
cortex := cortex.NewCortex(
state.NewInMemoryStateManager(), // or Redis/Postgres
llm.NewVertexAIClient(model), // or Mock for testing
messagePublisher,
)
Observability
Logging
Structured logging with context:
client.Logger.InfoContext(ctx, "Cortex received message",
"message_id", message.GetMessageId(),
"context_id", message.GetContextId(),
"role", message.GetRole().String(),
)
Tracing
OpenTelemetry spans already in AgentHub client:
- Trace ID propagation
- Span relationships (parent → child)
- Error recording
Metrics (Future)
- Messages processed per session
- LLM decision latency
- Task completion rates
- Error rates by type
Security Considerations
Current (POC)
- No authentication (all agents trusted)
- No authorization (all agents can do anything)
- No message validation (trusts well-formed protobufs)
Future
- Agent authentication via mTLS
- Message signing & verification
- Rate limiting per agent
- Input sanitization for LLM prompts
Best Practices
For Cortex Operators
- Monitor state size - Large conversation histories impact memory
- Configure LLM timeouts - Prevent hanging on slow AI responses
- Use persistent state - In-memory is POC only
- Enable tracing - Essential for debugging async flows
For Agent Developers
- Publish clear Agent Cards - Cortex needs good descriptions
- Handle errors gracefully - Publish failed task results, don’t crash
- Use correlation IDs - Essential for Cortex to track work
- Be stateless - All context should be in messages
Comparison to Alternatives
| Approach | Blocking | State Management | Extensibility |
|---|---|---|---|
| Traditional Chatbot | Yes ✗ | Simple | Hard-coded |
| Function Calling | Yes ✗ | Per-request | Config files |
| Cortex | No ✓ | Persistent | Dynamic |
Cortex enables truly asynchronous, extensible AI systems.
Next Steps
- Read the Cortex Tutorial to build your first orchestrator
- See How to Create Agents for agent development
- Check Cortex API Reference for detailed interface documentation
3 - Hexagonal Architecture & A2A Protocol Implementation
Understanding AgentHub’s hexagonal architecture with A2A protocol, gRPC communication, and event-driven design
Hexagonal Architecture & A2A Protocol Implementation
This document explains how AgentHub implements hexagonal architecture principles with the Agent2Agent (A2A) protocol, gRPC communication, and event-driven design patterns.
Overview
AgentHub follows hexagonal architecture (also known as Ports and Adapters) to achieve:
- Domain isolation: Core A2A protocol logic separated from infrastructure
- Testability: Clean interfaces enable comprehensive testing
- Flexibility: Multiple adapters for different communication protocols
- Maintainability: Clear separation of concerns and dependencies
System Architecture
graph TB
subgraph "AgentHub Ecosystem"
subgraph "External Agents"
A["Agent A<br/>(Chat REPL)"]
B["Agent B<br/>(Chat Responder)"]
C["Agent C<br/>(Custom Agent)"]
end
subgraph "AgentHub Broker"
subgraph "Adapters (Infrastructure)"
GRPC["gRPC Server<br/>Adapter"]
HEALTH["Health Check<br/>Adapter"]
METRICS["Metrics<br/>Adapter"]
TRACING["Tracing Adapter<br/>(OTLP/Jaeger)"]
end
subgraph "Ports (Interfaces)"
SP["AgentHub<br/>Service Port"]
PP["Message<br/>Publisher Port"]
EP["Event<br/>Subscriber Port"]
OP["Observability<br/>Port"]
end
subgraph "Domain (Core Logic)"
A2A["A2A Protocol<br/>Engine"]
ROUTER["Event Router<br/>& Broker"]
VALIDATOR["Message<br/>Validator"]
CONTEXT["Context<br/>Manager"]
TASK["Task<br/>Lifecycle"]
end
end
subgraph "External Systems"
OTLP["OTLP Collector<br/>& Jaeger"]
STORE["Event Store<br/>(Memory)"]
end
end
%% External agent connections
A -->|"gRPC calls<br/>(PublishMessage,<br/>SubscribeToMessages)"| GRPC
B -->|"gRPC calls"| GRPC
C -->|"gRPC calls"| GRPC
%% Adapter to Port connections
GRPC -->|"implements"| SP
HEALTH -->|"implements"| OP
METRICS -->|"implements"| OP
TRACING -->|"implements"| OP
%% Port to Domain connections
SP -->|"delegates to"| A2A
PP -->|"delegates to"| ROUTER
EP -->|"delegates to"| ROUTER
OP -->|"observes"| A2A
%% Domain internal connections
A2A -->|"uses"| VALIDATOR
A2A -->|"uses"| CONTEXT
A2A -->|"uses"| TASK
ROUTER -->|"persists events"| STORE
TRACING -->|"exports traces"| OTLP
%% Styling
classDef agents fill:#add8e6
classDef adapters fill:#ffa500
classDef ports fill:#e0ffff
classDef domain fill:#ffb6c1
classDef external fill:#dda0dd
class A,B,C agents
class GRPC,HEALTH,METRICS,TRACING adapters
class SP,PP,EP,OP ports
class A2A,ROUTER,VALIDATOR,CONTEXT,TASK domain
class OTLP,STORE external
Architecture Notes:
- Domain Core: Pure A2A protocol logic with message validation, event routing, context correlation, and task state management
- Ports: Clean, technology-agnostic interfaces providing testable contracts and dependency inversion
- Adapters: Infrastructure concerns including gRPC communication, observability exports, and protocol adaptations
A2A Message Flow
sequenceDiagram
participant REPL as Chat REPL<br/>Agent
participant gRPC as gRPC<br/>Adapter
participant A2A as A2A Protocol<br/>Engine
participant Router as Event<br/>Router
participant Responder as Chat Responder<br/>Agent
rect rgb(240, 248, 255)
Note over REPL, Router: A2A Message Publishing
REPL->>+gRPC: PublishMessage(A2AMessage)
gRPC->>+A2A: validateA2AMessage()
A2A->>A2A: check MessageId, Role, Content
A2A-->>-gRPC: validation result
gRPC->>+Router: routeA2AEvent(messageEvent)
Router->>Router: identify subscribers<br/>by agent_id/broadcast
Router->>Router: create tracing span<br/>with A2A attributes
Router-->>Responder: deliver message event
Router-->>-gRPC: routing success
gRPC-->>-REPL: PublishResponse(event_id)
end
rect rgb(255, 248, 240)
Note over Responder, Router: A2A Message Processing
Responder->>+gRPC: SubscribeToMessages(agent_id)
gRPC->>Router: register subscriber
Router-->>gRPC: subscription stream
gRPC-->>-Responder: message stream
Note over Responder: Process A2A message<br/>with tracing spans
Responder->>+gRPC: PublishMessage(A2AResponse)
gRPC->>A2A: validateA2AMessage()
A2A->>A2A: check AGENT role,<br/>ContextId correlation
gRPC->>Router: routeA2AEvent(responseEvent)
Router-->>REPL: deliver response event
gRPC-->>-Responder: PublishResponse
end
Note over REPL, Responder: A2A Protocol ensures:<br/>• Message structure compliance<br/>• Role semantics (USER/AGENT)<br/>• Context correlation<br/>• Event-driven routing
Core Components
1. A2A Protocol Engine (Domain Core)
The heart of the system implementing A2A protocol specifications:
// Core domain logic - technology agnostic
type A2AProtocolEngine struct {
messageValidator MessageValidator
contextManager ContextManager
taskLifecycle TaskLifecycle
}
// A2A message validation
func (e *A2AProtocolEngine) ValidateMessage(msg *Message) error {
// A2A compliance checks
if msg.MessageId == "" { return ErrMissingMessageId }
if msg.Role == ROLE_UNSPECIFIED { return ErrInvalidRole }
if len(msg.Content) == 0 { return ErrEmptyContent }
return nil
}
2. Event Router (Domain Core)
Manages event-driven communication between agents:
type EventRouter struct {
messageSubscribers map[string][]chan *AgentEvent
taskSubscribers map[string][]chan *AgentEvent
eventSubscribers map[string][]chan *AgentEvent
}
func (r *EventRouter) RouteEvent(event *AgentEvent) error {
// Route based on A2A metadata
routing := event.GetRouting()
subscribers := r.getSubscribers(routing.ToAgentId, event.PayloadType)
// Deliver with tracing
for _, sub := range subscribers {
go r.deliverWithTracing(sub, event)
}
return nil
}
3. gRPC Adapter (Infrastructure)
Translates between gRPC and domain logic:
type GrpcAdapter struct {
a2aEngine A2AProtocolEngine
eventRouter EventRouter
tracer TracingAdapter
}
func (a *GrpcAdapter) PublishMessage(ctx context.Context, req *PublishMessageRequest) (*PublishResponse, error) {
// Start tracing span
ctx, span := a.tracer.StartA2AMessageSpan(ctx, "publish_message", req.Message.MessageId, req.Message.Role)
defer span.End()
// Validate using domain logic
if err := a.a2aEngine.ValidateMessage(req.Message); err != nil {
a.tracer.RecordError(span, err)
return nil, err
}
// Route using domain logic
event := a.createA2AEvent(req)
if err := a.eventRouter.RouteEvent(event); err != nil {
return nil, err
}
return &PublishResponse{Success: true, EventId: event.EventId}, nil
}
Hexagonal Architecture Benefits
1. Domain Isolation
- A2A protocol logic is pure, testable business logic
- No infrastructure dependencies in the core domain
- Technology-agnostic implementation
2. Adapter Pattern
- gRPC Adapter: Handles Protocol Buffer serialization/deserialization
- Tracing Adapter: OTLP/Jaeger integration without domain coupling
- Health Adapter: Service health monitoring
- Metrics Adapter: Prometheus metrics collection
3. Port Interfaces
// Clean, testable interfaces
type MessagePublisher interface {
PublishMessage(ctx context.Context, msg *Message) (*PublishResponse, error)
}
type EventSubscriber interface {
SubscribeToMessages(ctx context.Context, agentId string) (MessageStream, error)
}
type ObservabilityPort interface {
StartSpan(ctx context.Context, operation string) (context.Context, Span)
RecordMetric(name string, value float64, labels map[string]string)
}
4. Dependency Inversion
- Domain depends on abstractions (ports), not concrete implementations
- Adapters depend on domain through well-defined interfaces
- Easy testing with mock implementations
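As a sketch of this inversion, the adapter can be wired up through constructor injection, matching the mock-based test shown later; the signature is simplified and the tracer wiring is omitted for brevity.
// NewGrpcAdapter accepts the domain dependencies, so tests can pass mocks
// and production code can pass the real engine and router.
func NewGrpcAdapter(engine A2AProtocolEngine, router EventRouter) *GrpcAdapter {
	return &GrpcAdapter{
		a2aEngine:   engine,
		eventRouter: router,
	}
}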
A2A Protocol Integration
Message Structure Compliance
classDiagram
class A2AMessage {
+string MessageId
+string ContextId
+Role Role
+Part Content
+Metadata Metadata
+string TaskId
}
class Part {
+string Text
+bytes Data
+FileData File
}
class EventMetadata {
+string FromAgentId
+string ToAgentId
+string EventType
+Priority Priority
}
class Role {
<<enumeration>>
USER
AGENT
}
class Metadata {
+Fields map
}
A2AMessage "1" --> "0..*" Part : contains
A2AMessage "1" --> "1" EventMetadata : routed_with
A2AMessage "1" --> "1" Role : has
A2AMessage "1" --> "0..1" Metadata : includesEvent-Driven Architecture
The system implements pure event-driven architecture:
- Publishers emit A2A-compliant events
- Broker routes events based on metadata
- Subscribers receive relevant events
- Correlation through ContextId maintains conversation flow
Observability Integration
Distributed Tracing
sequenceDiagram
participant A as Agent A
participant B as Broker
participant AB as Agent B
participant OTLP as OTLP Collector
participant J as Jaeger
A->>+B: PublishMessage<br/>[trace_id: 123]
B->>B: Create A2A spans<br/>with structured attributes
B->>+AB: RouteEvent<br/>[trace_id: 123]
AB->>AB: Process with<br/>child spans
AB->>-B: PublishResponse<br/>[trace_id: 123]
B->>-A: Success<br/>[trace_id: 123]
par Observability Export
B->>OTLP: Export spans<br/>with A2A attributes
OTLP->>J: Store traces
J->>J: Build trace timeline<br/>with correlation
end
Note over A, J: End-to-end tracing<br/>with A2A protocol visibility
Structured Attributes
Each span includes A2A-specific attributes:
- a2a.message.id
- a2a.message.role
- a2a.context.id
- a2a.event.type
- a2a.routing.from_agent
- a2a.routing.to_agent
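In Go with OpenTelemetry, attaching these attributes to a span could look like the sketch below; the helper function and parameter names are illustrative, while attribute.String and Span.SetAttributes are standard OpenTelemetry APIs.
import (
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// annotateA2ASpan records the A2A-specific attributes listed above on a span.
func annotateA2ASpan(span trace.Span, msg *pb.Message, eventType, fromAgent, toAgent string) {
	span.SetAttributes(
		attribute.String("a2a.message.id", msg.GetMessageId()),
		attribute.String("a2a.message.role", msg.GetRole().String()),
		attribute.String("a2a.context.id", msg.GetContextId()),
		attribute.String("a2a.event.type", eventType),
		attribute.String("a2a.routing.from_agent", fromAgent),
		attribute.String("a2a.routing.to_agent", toAgent),
	)
}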
Testing Strategy
Unit Testing (Domain Core)
func TestA2AEngine_ValidateMessage(t *testing.T) {
engine := NewA2AProtocolEngine()
// Test A2A compliance
msg := &Message{
MessageId: "test_msg_123",
Role: ROLE_USER,
Content: []*Part{{Text: "hello"}},
}
err := engine.ValidateMessage(msg)
assert.NoError(t, err)
}
Integration Testing (Adapters)
func TestGrpcAdapter_PublishMessage(t *testing.T) {
// Mock domain dependencies
mockEngine := &MockA2AEngine{}
mockRouter := &MockEventRouter{}
adapter := NewGrpcAdapter(mockEngine, mockRouter)
// Test adapter behavior
resp, err := adapter.PublishMessage(ctx, validRequest)
assert.NoError(t, err)
assert.True(t, resp.Success)
}
Conclusion
AgentHub’s hexagonal architecture with A2A protocol provides:
- Clean Architecture: Separation of concerns with domain-driven design
- A2A Compliance: Full protocol implementation with validation
- Event-Driven Design: Scalable, loosely-coupled communication
- Rich Observability: Comprehensive tracing and metrics
- Testability: Clean interfaces enable thorough testing
- Flexibility: Easy to extend with new adapters and protocols
This architecture ensures maintainable, scalable, and observable agent communication while maintaining strict A2A protocol compliance.