Explanations
Understanding-oriented discussions that provide context and background
Explanations
This section provides in-depth discussions and explanations to help you understand the concepts, design decisions, and architecture behind AgentHub. These materials are designed to broaden your understanding beyond just how to use the system.
Explanation Categories
- Architecture - System design and architectural principles
- Core Concepts - Fundamental concepts and mental models
- Features - Deep dives into specific features and capabilities
How to Use These Explanations
These documents are designed to:
- Provide context for why things work the way they do
- Explain trade-offs and design decisions
- Offer multiple perspectives on the same concepts
- Help you make informed decisions about using AgentHub
Reading Path
For Understanding Core Concepts
- Core Concepts - Start with fundamental principles
- Architecture - Understand system design
- Features - Explore specific capabilities
For System Design
- Architecture - System design patterns
- Features - Performance and scaling considerations
For Implementation Details
- Features - Technical implementation deep dives
Discussion Topics
These explanations discuss:
- Design philosophy and principles
- Architectural decisions and their rationale
- Performance considerations and trade-offs
- Future directions and possibilities
- Alternative approaches and their pros/cons
Note
Explanations focus on understanding rather than implementation. For practical guidance, see the tutorials and how-to guides.
1 - Architecture
Deep dive into AgentHub’s system architecture and design
Architecture Explanations
Understand the fundamental architecture and design principles behind AgentHub’s distributed agent system.
Available Documentation
1.1 - A2A-Compliant EDA Broker Architecture
Deep dive into the internal architecture of the AgentHub EDA broker, how it implements Agent2Agent (A2A) protocol-compliant communication patterns while maintaining Event-Driven Architecture benefits.
AgentHub A2A-Compliant EDA Broker Architecture
This document explains the internal architecture of the AgentHub Event-Driven Architecture (EDA) broker, how it implements Agent2Agent (A2A) protocol-compliant communication patterns, and the design decisions behind its hybrid approach.
Architectural Overview
The AgentHub broker serves as a centralized Event-Driven Architecture hub that transports Agent2Agent (A2A) protocol-compliant messages between distributed agents. It combines the scalability benefits of EDA with the interoperability guarantees of the A2A protocol.
┌───────────────────────────────────────────────────────────────┐
│                        AgentHub Broker                        │
├───────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐  │
│  │   Task Router   │ │   Subscriber    │ │    Progress     │  │
│  │                 │ │     Manager     │ │     Tracker     │  │
│  │ • Route tasks   │ │                 │ │                 │  │
│  │ • Apply filters │ │ • Manage agent  │ │ • Track task    │  │
│  │ • Broadcast     │ │   subscriptions │ │   progress      │  │
│  │ • Load balance  │ │ • Handle        │ │ • Update        │  │
│  │                 │ │   disconnects   │ │   requesters    │  │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘  │
├───────────────────────────────────────────────────────────────┤
│                        gRPC Interface                         │
├───────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐  │
│  │ PublishTask     │ │SubscribeToTasks │ │ SubscribeToTask │  │
│  │ PublishResult   │ │SubscribeToRes   │ │    Progress     │  │
│  │ PublishProgress │ │                 │ │                 │  │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘  │
└───────────────────────────────────────────────────────────────┘
Core Components
1. Event Bus Server
The main server implementation at broker/main.go:22 provides the central coordination point:
type eventBusServer struct {
pb.UnimplementedEventBusServer
// Subscription management
taskSubscribers map[string][]chan *pb.TaskMessage
taskResultSubscribers map[string][]chan *pb.TaskResult
taskProgressSubscribers map[string][]chan *pb.TaskProgress
taskMu sync.RWMutex
}
Key characteristics:
- Thread-safe: Uses sync.RWMutex to protect concurrent access to subscriber maps
- Channel-based: Uses Go channels for efficient message passing
- Non-blocking: Implements timeouts to prevent blocking on slow consumers
- Stateless: No persistent storage; all state is in-memory
2. Task Routing Engine
The routing logic determines how tasks are delivered to agents:
Direct Routing
When a task specifies a ResponderAgentId, it’s routed directly to that agent:
if responderID := req.GetTask().GetResponderAgentId(); responderID != "" {
if subs, ok := s.taskSubscribers[responderID]; ok {
targetChannels = subs
}
}
Broadcast Routing
When no specific responder is set, tasks are broadcast to all subscribed agents:
} else {
// Broadcast to all task subscribers
for _, subs := range s.taskSubscribers {
targetChannels = append(targetChannels, subs...)
}
}
Routing Features
- Immediate delivery: Tasks are routed immediately upon receipt
- Multiple subscribers: Single agent can have multiple subscription channels
- Timeout protection: 5-second timeout prevents blocking on unresponsive agents
- Error isolation: Failed delivery to one agent doesn’t affect others
3. Subscription Management
The broker manages three types of subscriptions:
Task Subscriptions
Agents subscribe to receive tasks assigned to them:
func (s *eventBusServer) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error
- Agent-specific: Tasks are delivered based on agent ID
- Type filtering: Optional filtering by task types
- Long-lived streams: Connections persist until agent disconnects
- Automatic cleanup: Subscriptions are removed when connections close
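As a concrete sketch of the consumer side, an agent's subscription loop might look like the following. This is a minimal illustration assuming the generated pb.EventBusClient and a hypothetical handleTask helper; exact request field names may differ from the repository's proto:
// Minimal agent-side subscription loop (sketch).
stream, err := client.SubscribeToTasks(ctx, &pb.SubscribeToTasksRequest{
	AgentId: "my_agent",
})
if err != nil {
	log.Fatalf("subscribe failed: %v", err)
}
for {
	task, err := stream.Recv()
	if err != nil {
		// Stream closed: the broker removes this subscription automatically.
		log.Printf("task stream ended: %v", err)
		break
	}
	go handleTask(task) // hypothetical handler; keeps the receive loop free
}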
Result Subscriptions
Publishers subscribe to receive results of tasks they requested:
func (s *eventBusServer) SubscribeToTaskResults(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskResultsServer) error
Progress Subscriptions
Publishers can track progress of long-running tasks:
func (s *eventBusServer) SubscribeToTaskProgress(req *pb.SubscribeToTaskResultsRequest, stream pb.EventBus_SubscribeToTaskProgressServer) error
4. Message Flow Architecture
Task Publication Flow
- Validation: Incoming tasks are validated for required fields
- Routing: Tasks are routed to appropriate subscribers
- Delivery: Messages are sent via Go channels with timeout protection
- Response: Publisher receives acknowledgment of successful publication
Result Flow
- Receipt: Agents publish task results back to the broker
- Broadcasting: Results are broadcast to all result subscribers
- Filtering: Subscribers receive results for their requested tasks
- Delivery: Results are streamed back to requesting agents
Progress Flow
- Updates: Executing agents send periodic progress updates
- Distribution: Progress updates are sent to interested subscribers
- Real-time delivery: Updates are streamed immediately upon receipt
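To illustrate the update side of this flow, a sketch of an executing agent publishing progress is shown below. The RPC and field names (PublishTaskProgress, ProgressMessage) are assumptions inferred from the interface diagram above, not verified signatures:
// Hypothetical progress update; message and field names are assumptions.
_, err := client.PublishTaskProgress(ctx, &pb.PublishTaskProgressRequest{
	Progress: &pb.TaskProgress{
		TaskId:          taskID,
		ProgressMessage: "halfway through processing",
	},
})
if err != nil {
	log.Printf("failed to publish progress for task %s: %v", taskID, err)
}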
Design Decisions and Trade-offs
In-Memory State Management
Decision: Store all subscription state in memory using Go maps and channels.
Benefits:
- High performance: No database overhead for message routing
- Low latency: Sub-millisecond message routing
- Simplicity: Easier to develop, test, and maintain
- Concurrent efficiency: Go’s garbage collector handles channel cleanup
Trade-offs:
- No persistence: Broker restart loses all subscription state
- Memory usage: Large numbers of agents increase memory requirements
- Single point of failure: No built-in redundancy
When this works well:
- Development and testing environments
- Small to medium-scale deployments
- Scenarios where agents can re-establish subscriptions on broker restart
Asynchronous Message Delivery
Decision: Use Go channels with timeout-based delivery.
Implementation:
go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
select {
case ch <- &task:
// Message sent successfully
case <-ctx.Done():
log.Printf("Context cancelled while sending task %s", task.GetTaskId())
case <-time.After(5 * time.Second):
log.Printf("Timeout sending task %s. Dropping message.", task.GetTaskId())
}
}(subChan, taskToSend)
Benefits:
- Non-blocking: Slow agents don’t block the entire system
- Fault tolerance: Timeouts prevent resource leaks
- Scalability: Concurrent delivery to multiple agents
- Resource protection: Prevents unbounded queue growth
Trade-offs:
- Message loss: Timed-out messages are dropped
- Complexity: Requires careful timeout tuning
- No delivery guarantees: No acknowledgment of successful processing
gRPC Streaming for Subscriptions
Decision: Use bidirectional gRPC streams for agent subscriptions.
Benefits:
- Real-time delivery: Messages are pushed immediately
- Connection awareness: Broker knows when agents disconnect
- Flow control: gRPC handles backpressure automatically
- Type safety: Protocol Buffer messages ensure data consistency
Trade-offs:
- Connection overhead: Each agent maintains persistent connections
- Resource usage: Streams consume memory and file descriptors
- Network sensitivity: Transient network issues can break connections
Concurrent Access Patterns
Decision: Use read-write mutexes with channel-based message passing.
Implementation:
s.taskMu.RLock()
// Read subscriber information
var targetChannels []chan *pb.TaskMessage
for _, subs := range s.taskSubscribers {
targetChannels = append(targetChannels, subs...)
}
s.taskMu.RUnlock()
// Send messages without holding locks
for _, subChan := range targetChannels {
go func(ch chan *pb.TaskMessage, task pb.TaskMessage) {
// Async delivery
}(subChan, taskToSend)
}
Benefits:
- High concurrency: Multiple readers can access subscriptions simultaneously
- Lock-free delivery: Message sending doesn’t hold locks
- Deadlock prevention: Clear lock ordering and minimal critical sections
- Performance: Read operations are optimized for the common case
Scalability Characteristics
Throughput
- Task routing: ~10,000+ tasks/second on modern hardware
- Concurrent connections: Limited by file descriptor limits (typically ~1,000s)
- Memory usage: ~1KB per active subscription
Latency
- Task routing: <1ms for local network delivery
- End-to-end: <10ms for simple task processing cycles
- Progress updates: Real-time streaming with minimal buffering
Resource Usage
- CPU: Low CPU usage, primarily network I/O bound
- Memory: Linear growth with number of active subscriptions
- Network: Efficient binary Protocol Buffer encoding
Error Handling and Resilience
Connection Failures
- Automatic cleanup: Subscriptions are removed when connections close
- Graceful degradation: Failed agents don’t affect others
- Reconnection support: Agents can re-establish subscriptions
Message Delivery Failures
- Timeout handling: Messages that can’t be delivered are dropped
- Logging: All failures are logged for debugging
- Isolation: Per-agent timeouts prevent cascading failures
Resource Protection
- Channel buffering: Limited buffer sizes prevent memory exhaustion
- Timeout mechanisms: Prevent resource leaks from stuck operations
- Graceful shutdown: Proper cleanup during server shutdown
Monitoring and Observability
Built-in Logging
The broker provides comprehensive logging:
- Task routing decisions
- Subscription lifecycle events
- Error conditions and recovery
- Performance metrics
Integration Points
- Health checks: HTTP endpoints for monitoring
- Metrics export: Prometheus/metrics integration points
- Distributed tracing: Context propagation support
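As an illustration of the first integration point, a minimal health endpoint could be exposed like this (path and port are assumptions, not the repository's actual values):
// Minimal liveness endpoint using the standard library.
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("ok"))
})
go func() { _ = http.ListenAndServe(":8080", nil) }()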
Future Enhancements
Persistence Layer
- Database backend: Store subscription state for broker restarts
- Message queuing: Durable task queues for reliability
- Transaction support: Atomic message delivery guarantees
Clustering Support
- Horizontal scaling: Multiple broker instances
- Load balancing: Distribute agents across brokers
- Consensus protocols: Consistent state across brokers
Advanced Routing
- Capability-based routing: Route tasks based on agent capabilities
- Load-aware routing: Consider agent load in routing decisions
- Geographic routing: Route based on agent location
Security Enhancements
- Authentication: Agent identity verification
- Authorization: Task-level access controls
- Encryption: TLS for all communications
The AgentHub broker architecture provides a solid foundation for Agent2Agent communication while maintaining simplicity and performance. Its design supports the immediate needs of most agent systems while providing clear paths for future enhancement as requirements evolve.
1.2 - Cortex Architecture
Understanding the Cortex asynchronous AI orchestration engine
Cortex Architecture
Cortex is an asynchronous, event-driven AI orchestration engine that serves as the “brain” of multi-agent systems. It manages conversations, coordinates tasks across specialized agents, and uses LLM-based decision-making to route work intelligently.
Overview
Traditional chatbots block on long-running operations. Cortex enables non-blocking conversations where users can interact while background tasks execute asynchronously.
Key Innovation
Traditional: User → Request → [BLOCKED] → Response
Cortex: User → Request → Immediate Ack → [Async Work] → Final Response
Users receive instant acknowledgments and can continue conversing while agents process tasks in the background.
Architecture Diagram
┌─────────────────┐      ┌────────────────┐      ┌───────────────┐
│    Chat CLI     │─────>│   Event Bus    │<─────│    Cortex     │
│   (User I/O)    │      │    (Broker)    │      │ Orchestrator  │
└─────────────────┘      └────────────────┘      └───────────────┘
        ▲                        ▲                       │
        │ chat.response          │ task.result           │ task.request
        │                        │                       ▼
        │                ┌───────────────┐               │
        └────────────────│   Agent(s)    │<──────────────┘
                         │   (Workers)   │
                         └───────────────┘
Core Components
1. Cortex Orchestrator
The central decision-making engine that:
- Maintains conversation state - Full history per session
- Registers agents dynamically - Discovers capabilities via Agent Cards
- Decides actions via LLM - Uses AI to route work intelligently
- Coordinates tasks - Tracks pending work and correlates results
File: agents/cortex/cortex.go
2. State Manager
Manages conversational state with thread-safe operations:
type ConversationState struct {
SessionID string
Messages []*pb.Message
PendingTasks map[string]*TaskContext
RegisteredAgents map[string]*pb.AgentCard
}
Key Features:
- Per-session locking (no global bottleneck)
- Interface-based (swappable implementations)
- Currently in-memory (POC), production uses Redis/PostgreSQL
Files: agents/cortex/state/
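As a sketch, the contract implied by the usage elsewhere on this page might look like the following; the actual method set in agents/cortex/state/interface.go may differ:
// Sketch of the StateManager contract; signatures are assumptions.
type StateManager interface {
	Get(ctx context.Context, sessionID string) (*ConversationState, error)
	Set(ctx context.Context, sessionID string, state *ConversationState) error
	// WithLock runs fn while holding the per-session lock, then persists state.
	WithLock(sessionID string, fn func(state *ConversationState) error) error
}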
3. LLM Client
Abstraction for AI-powered decision-making:
type Client interface {
Decide(
ctx context.Context,
conversationHistory []*pb.Message,
availableAgents []*pb.AgentCard,
newEvent *pb.Message,
) (*Decision, error)
}
The LLM analyzes:
- Conversation history
- Available agent capabilities
- New incoming messages
And returns decisions:
- chat.response - Reply to user
- task.request - Dispatch work to agent
Files: agents/cortex/llm/
IntelligentDecider: Context-Aware Orchestration
The IntelligentDecider is a mock LLM implementation that demonstrates intelligent, intent-based task orchestration. Unlike simple dispatchers that route every message to agents, it analyzes user intent before deciding whether to orchestrate with specialized agents or respond directly.
Key Characteristics:
Intent Detection: Analyzes message content for keywords indicating specific needs
- Echo requests: “echo”, “repeat”, “say back”
- Future: “translate”, “summarize”, “transcribe”, etc.
Conditional Orchestration: Only dispatches to agents when user explicitly requests functionality
- User: “echo hello” β Dispatches to echo_agent
- User: “hello” β Responds directly (no agent needed)
Transparent Reasoning: Always explains decision-making process
- All decisions include detailed reasoning visible in observability traces
- Users understand why Cortex chose specific actions
Example Flow:
// User message: "echo hello world"
decision := IntelligentDecider()(ctx, history, agents, userMsg)
// Returns:
Decision{
Reasoning: "User message 'echo hello world' contains an explicit echo request (detected keywords: echo/repeat/say back). I'm dispatching this to the echo_agent which specializes in repeating messages back.",
Actions: [
{
Type: "chat.response",
ResponseText: "I detected you want me to echo something. I'm asking the echo agent to handle this for you.",
},
{
Type: "task.request",
TaskType: "echo",
TargetAgent: "agent_echo",
TaskPayload: {"input": "echo hello world"},
},
],
}
Comparison to Simple Dispatchers:
| Approach | Every Message | Intent Detection | Explains Reasoning | Responds Directly |
|---|---|---|---|---|
| TaskDispatcherDecider (deprecated) | Dispatches to agent | No | Minimal | No |
| IntelligentDecider | Analyzes first | Yes | Detailed | Yes |
Design Benefits:
- Reduced Latency: Simple queries get immediate responses without agent roundtrip
- Resource Efficiency: Agents only invoked when their specialized capabilities are needed
- Better UX: Users understand what Cortex is doing and why
- Debuggability: Reasoning in traces makes orchestration logic transparent
- Extensibility: Easy to add new intent patterns for new agent types
Future Evolution:
In production, the IntelligentDecider pattern should be replaced with a real LLM that performs function calling:
// Production LLM receives tools/functions
tools := convertAgentCardsToTools(availableAgents)
decision := realLLM.Decide(history, tools, newMsg)
// LLM naturally decides:
// - "hello" β No function call, direct response
// - "echo hello" β Calls echo_agent function
// - "translate this to French" β Calls translation_agent function
The IntelligentDecider serves as a working example of the decision patterns a real LLM would follow.
4. Message Publisher
Interface for publishing messages to the Event Bus:
type MessagePublisher interface {
PublishMessage(
ctx context.Context,
msg *pb.Message,
routing *pb.AgentEventMetadata,
) error
}
Adapts AgentHub client to Cortex’s needs.
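A sketch of such an adapter, assuming the generated pb.AgentHubClient with the PublishMessage RPC described later on this page (names are illustrative):
// agentHubPublisher adapts the AgentHub gRPC client to MessagePublisher.
type agentHubPublisher struct {
	client pb.AgentHubClient // assumed generated gRPC client
}

func (p *agentHubPublisher) PublishMessage(ctx context.Context, msg *pb.Message, routing *pb.AgentEventMetadata) error {
	_, err := p.client.PublishMessage(ctx, &pb.PublishMessageRequest{
		Message: msg,
		Routing: routing,
	})
	return err
}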
Message Flow
Simple Chat Request
1. User types "Hello" in CLI
   ↓
2. CLI publishes A2A Message (role=USER, context_id=session-1)
   ↓
3. Event Bus routes to Cortex
   ↓
4. Cortex retrieves conversation state
   ↓
5. Cortex calls LLM.Decide(history, agents, newMsg)
   ↓
6. LLM returns Decision: [chat.response: "Hello! How can I help?"]
   ↓
7. Cortex publishes A2A Message (role=AGENT, response text)
   ↓
8. Event Bus routes to CLI
   ↓
9. CLI displays response
   ↓
10. Cortex updates state with both messages
Asynchronous Task Execution
1. User: "Transcribe this audio file"
   ↓
2. Cortex LLM decides: [chat.response + task.request]
   ↓
3. Cortex publishes:
   - Message to user: "I'll start transcription, this may take a few minutes"
   - Task request to transcription agent
   ↓
4. User sees immediate acknowledgment ✅
   User can continue chatting!
   ↓
5. Transcription agent processes (background, may take minutes)
   ↓
6. Agent publishes task.result with transcribed text
   ↓
7. Cortex receives result, calls LLM.Decide()
   ↓
8. LLM decides: [chat.response: "Transcription complete: <text>"]
   ↓
9. Cortex publishes final response to user
   ↓
10. User sees final result
Design Patterns
1. Interface Segregation
All major components are interfaces:
- StateManager - Easy to swap (in-memory → Redis)
- LLM Client - Easy to test (mock → real AI)
- MessagePublisher - Decoupled from transport
Benefits:
- Testability (use mocks)
- Flexibility (swap implementations)
- Clear contracts
2. Session-Level Concurrency
Each session has its own lock:
// NOT this (global bottleneck):
globalMutex.Lock()
updateState()
globalMutex.Unlock()
// But this (per-session):
sessionLock := getSessionLock(sessionID)
sessionLock.Lock()
updateState()
sessionLock.Unlock()
Benefits:
- Multiple sessions can update concurrently
- No contention across sessions
- Scales horizontally
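A minimal sketch of the getSessionLock helper implied above, using a sync.Map so that lock creation is itself race-free:
var sessionLocks sync.Map // sessionID -> *sync.Mutex

func getSessionLock(sessionID string) *sync.Mutex {
	lock, _ := sessionLocks.LoadOrStore(sessionID, &sync.Mutex{})
	return lock.(*sync.Mutex)
}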
3. LLM as Control Plane
Instead of hard-coded if/else routing:
// Old way:
if strings.Contains(input, "transcribe") {
dispatchToTranscriber()
} else if strings.Contains(input, "translate") {
dispatchToTranslator()
}
// Cortex way:
decision := llm.Decide(history, agents, input)
executeActions(decision.Actions)
Benefits:
- Flexible - LLM adapts to context
- Extensible - Add agents, LLM discovers them
- Natural - Mimics human reasoning
Implementation: The IntelligentDecider (see LLM Client section above) demonstrates this pattern by analyzing user intent and making intelligent routing decisions with transparent reasoning.
4. Message Self-Containment
Every message is fully self-describing:
message Message {
string message_id = 1; // Unique ID
string context_id = 2; // Session/conversation ID
string task_id = 3; // Task correlation (if applicable)
Role role = 4; // USER or AGENT
repeated Part content = 5;
Struct metadata = 6;
}
Benefits:
- Agents are stateless (all context in message)
- Easy correlation (context_id, task_id)
- Traceable (message_id)
State Management
ConversationState Structure
type ConversationState struct {
SessionID string
Messages []*pb.Message // Full history
PendingTasks map[string]*TaskContext
RegisteredAgents map[string]*pb.AgentCard
}
TaskContext Tracking
type TaskContext struct {
TaskID string
TaskType string
RequestedAt int64
OriginalInput *pb.Message
UserNotified bool // Did we acknowledge?
}
Cortex tracks:
- Which tasks are in-flight
- What the user originally requested
- Whether we’ve acknowledged the request
State Lifecycle
1. Get session state (or create new)
2. Lock session for updates
3. Add new message to history
4. Call LLM to decide actions
5. Execute actions (publish messages)
6. Update pending tasks
7. Save state
8. Release lock
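Putting the steps together, one processing turn might look like the following sketch; executeActions and trackPendingTasks are hypothetical helpers standing in for steps 5 and 6:
err := stateManager.WithLock(sessionID, func(state *ConversationState) error {
	// Step 3: append the incoming message to the history.
	state.Messages = append(state.Messages, incoming)

	// Step 4: ask the LLM what to do next.
	decision, err := llmClient.Decide(ctx, state.Messages, cortex.GetAvailableAgents(), incoming)
	if err != nil {
		return err
	}

	// Steps 5-6: publish messages/tasks and record in-flight work.
	executeActions(ctx, decision.Actions)
	trackPendingTasks(state, decision.Actions)

	// Steps 7-8: state is saved and the lock released when fn returns.
	return nil
})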
Agent Discovery
Agent Card Registration
Agents publish capabilities on startup:
type AgentCard struct {
Name string
Description string
Skills []*AgentSkill
}
Cortex maintains a registry:
registeredAgents map[string]*pb.AgentCard
When making LLM decisions, Cortex provides available agents:
decision := llm.Decide(
ctx,
conversationHistory,
cortex.GetAvailableAgents(), // β Dynamic list
newEvent,
)
The LLM sees which tools are available and chooses appropriately.
Concurrency Model
- Lock Granularity: Per-session (not global)
- State Access: O(1) lookups via map
- Message Processing: Asynchronous (non-blocking)
Horizontal Scaling
Future: Partition sessions across multiple Cortex instances:
Cortex-1: handles sessions A-M
Cortex-2: handles sessions N-Z
Event Bus routes messages to correct instance based on context_id.
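A sketch of how such partitioning could work, hashing context_id to select an instance (illustrative only; this is future work, not current behavior):
// import "hash/fnv"
// cortexInstanceFor maps a session to one of N Cortex instances.
func cortexInstanceFor(contextID string, instances []string) string {
	h := fnv.New32a()
	h.Write([]byte(contextID)) // hashing never fails; error intentionally ignored
	return instances[h.Sum32()%uint32(len(instances))]
}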
State performance:
- State Get: O(1) with read lock
- State Set: O(1) with write lock
- Concurrent Sessions: No contention (per-session locks)
Tested: 100 goroutines updating same session → zero lost updates ✅
Error Handling
Agent Failures
When an agent fails:
- Agent publishes task.result with status=“failed”
- Cortex receives result
- LLM decides how to handle (inform user, retry, try alternative)
- Cortex publishes response
LLM Failures
If LLM client errors:
decision, err := llm.Decide(...)
if err != nil {
// Fallback: publish generic error response
publishErrorResponse(ctx, session, err)
return err
}
State Corruption
Protected by:
- Transaction-like WithLock pattern
- Copy-on-read to prevent external mutations
- Validation on state load/save
Implementation Status
✅ Implemented (POC)
- Core orchestrator logic
- In-memory state management
- Mock LLM client with IntelligentDecider (intent-based routing)
- Agent registration
- Message routing
- Task correlation
- CLI interface
- Echo agent (demo)
- Distributed tracing with OpenTelemetry
🚧 Future Work
- Persistent state (Redis, PostgreSQL)
- Real LLM integration (Vertex AI, OpenAI)
- Agent health monitoring
- Web UI with WebSockets
- Retry logic & timeouts
- Advanced error recovery
Code Organization
agents/cortex/
├── cortex.go            # Core orchestrator with full observability
├── cortex_test.go       # Core tests (4 tests)
├── state/
│   ├── interface.go     # StateManager interface
│   ├── memory.go        # In-memory implementation
│   └── memory_test.go   # State tests (5 tests)
├── llm/
│   ├── interface.go     # LLM Client interface
│   ├── mock.go          # Mock implementations
│   │                    #  - IntelligentDecider (intent-based)
│   │                    #  - TaskDispatcherDecider (deprecated)
│   │                    #  - SimpleEchoDecider
│   └── mock_test.go     # LLM tests (4 tests)
└── cmd/
    └── main.go          # Service entry point
Total: ~1,200 lines of production code + 500 lines of tests
Testing Strategy
Unit Tests
- State Manager: CRUD, concurrency, locking (5 tests)
- LLM Client: Mock behavior, decision functions (4 tests)
- Cortex Core: Registration, chat, tasks (4 tests)
All tests use interfaces and mocks (no external dependencies).
Concurrency Testing
func TestInMemoryStateManager_Concurrency(t *testing.T) {
// Launch 100 goroutines updating same session
for i := 0; i < 100; i++ {
go func() {
sm.WithLock(sessionID, func(state *ConversationState) error {
state.Messages = append(state.Messages, msg)
return nil
})
}()
}
// Assert: Exactly 100 messages (no lost updates)
}
Integration Testing
Demo script (demo_cortex.sh) tests:
- Broker startup
- Cortex initialization
- Agent registration
- End-to-end message flow
Configuration
Environment Variables
# LLM Model (future)
CORTEX_LLM_MODEL=vertex-ai://gemini-2.0-flash
# AgentHub connection
AGENTHUB_GRPC_PORT=127.0.0.1:50051
AGENTHUB_BROKER_ADDR=127.0.0.1
# Health check
CORTEX_HEALTH_PORT=8086
Programmatic Configuration
cortex := cortex.NewCortex(
state.NewInMemoryStateManager(), // or Redis/Postgres
llm.NewVertexAIClient(model), // or Mock for testing
messagePublisher,
)
Observability
Logging
Structured logging with context:
client.Logger.InfoContext(ctx, "Cortex received message",
"message_id", message.GetMessageId(),
"context_id", message.GetContextId(),
"role", message.GetRole().String(),
)
Tracing
OpenTelemetry spans already in AgentHub client:
- Trace ID propagation
- Span relationships (parent → child)
- Error recording
Metrics (Future)
- Messages processed per session
- LLM decision latency
- Task completion rates
- Error rates by type
Security Considerations
Current (POC)
- No authentication (all agents trusted)
- No authorization (all agents can do anything)
- No message validation (trusts well-formed protobufs)
Future
- Agent authentication via mTLS
- Message signing & verification
- Rate limiting per agent
- Input sanitization for LLM prompts
Best Practices
For Cortex Operators
- Monitor state size - Large conversation histories impact memory
- Configure LLM timeouts - Prevent hanging on slow AI responses
- Use persistent state - In-memory is POC only
- Enable tracing - Essential for debugging async flows
For Agent Developers
- Publish clear Agent Cards - Cortex needs good descriptions
- Handle errors gracefully - Publish failed task results, don’t crash
- Use correlation IDs - Essential for Cortex to track work
- Be stateless - All context should be in messages
Comparison to Alternatives
| Approach | Blocking | State Management | Extensibility |
|---|---|---|---|
| Traditional Chatbot | Yes ❌ | Simple | Hard-coded |
| Function Calling | Yes ❌ | Per-request | Config files |
| Cortex | No ✅ | Persistent | Dynamic |
Cortex enables truly asynchronous, extensible AI systems.
Resources
Next Steps
- Read the Cortex Tutorial to build your first orchestrator
- See How to Create Agents for agent development
- Check Cortex API Reference for detailed interface documentation
1.3 - Hexagonal Architecture & A2A Protocol Implementation
Understanding AgentHub’s hexagonal architecture with A2A protocol, gRPC communication, and event-driven design
Hexagonal Architecture & A2A Protocol Implementation
This document explains how AgentHub implements hexagonal architecture principles with the Agent2Agent (A2A) protocol, gRPC communication, and event-driven design patterns.
Overview
AgentHub follows hexagonal architecture (also known as Ports and Adapters) to achieve:
- Domain isolation: Core A2A protocol logic separated from infrastructure
- Testability: Clean interfaces enable comprehensive testing
- Flexibility: Multiple adapters for different communication protocols
- Maintainability: Clear separation of concerns and dependencies
System Architecture
graph TB
subgraph "AgentHub Ecosystem"
subgraph "External Agents"
A["Agent A<br/>(Chat REPL)"]
B["Agent B<br/>(Chat Responder)"]
C["Agent C<br/>(Custom Agent)"]
end
subgraph "AgentHub Broker"
subgraph "Adapters (Infrastructure)"
GRPC["gRPC Server<br/>Adapter"]
HEALTH["Health Check<br/>Adapter"]
METRICS["Metrics<br/>Adapter"]
TRACING["Tracing Adapter<br/>(OTLP/Jaeger)"]
end
subgraph "Ports (Interfaces)"
SP["AgentHub<br/>Service Port"]
PP["Message<br/>Publisher Port"]
EP["Event<br/>Subscriber Port"]
OP["Observability<br/>Port"]
end
subgraph "Domain (Core Logic)"
A2A["A2A Protocol<br/>Engine"]
ROUTER["Event Router<br/>& Broker"]
VALIDATOR["Message<br/>Validator"]
CONTEXT["Context<br/>Manager"]
TASK["Task<br/>Lifecycle"]
end
end
subgraph "External Systems"
OTLP["OTLP Collector<br/>& Jaeger"]
STORE["Event Store<br/>(Memory)"]
end
end
%% External agent connections
A -->|"gRPC calls<br/>(PublishMessage,<br/>SubscribeToMessages)"| GRPC
B -->|"gRPC calls"| GRPC
C -->|"gRPC calls"| GRPC
%% Adapter to Port connections
GRPC -->|"implements"| SP
HEALTH -->|"implements"| OP
METRICS -->|"implements"| OP
TRACING -->|"implements"| OP
%% Port to Domain connections
SP -->|"delegates to"| A2A
PP -->|"delegates to"| ROUTER
EP -->|"delegates to"| ROUTER
OP -->|"observes"| A2A
%% Domain internal connections
A2A -->|"uses"| VALIDATOR
A2A -->|"uses"| CONTEXT
A2A -->|"uses"| TASK
ROUTER -->|"persists events"| STORE
TRACING -->|"exports traces"| OTLP
%% Styling
classDef agents fill:#add8e6
classDef adapters fill:#ffa500
classDef ports fill:#e0ffff
classDef domain fill:#ffb6c1
classDef external fill:#dda0dd
class A,B,C agents
class GRPC,HEALTH,METRICS,TRACING adapters
class SP,PP,EP,OP ports
class A2A,ROUTER,VALIDATOR,CONTEXT,TASK domain
class OTLP,STORE external
Architecture Notes:
- Domain Core: Pure A2A protocol logic with message validation, event routing, context correlation, and task state management
- Ports: Clean, technology-agnostic interfaces providing testable contracts and dependency inversion
- Adapters: Infrastructure concerns including gRPC communication, observability exports, and protocol adaptations
A2A Message Flow
sequenceDiagram
participant REPL as Chat REPL<br/>Agent
participant gRPC as gRPC<br/>Adapter
participant A2A as A2A Protocol<br/>Engine
participant Router as Event<br/>Router
participant Responder as Chat Responder<br/>Agent
rect rgb(240, 248, 255)
Note over REPL, Router: A2A Message Publishing
REPL->>+gRPC: PublishMessage(A2AMessage)
gRPC->>+A2A: validateA2AMessage()
A2A->>A2A: check MessageId, Role, Content
A2A-->>-gRPC: validation result
gRPC->>+Router: routeA2AEvent(messageEvent)
Router->>Router: identify subscribers<br/>by agent_id/broadcast
Router->>Router: create tracing span<br/>with A2A attributes
Router-->>Responder: deliver message event
Router-->>-gRPC: routing success
gRPC-->>-REPL: PublishResponse(event_id)
end
rect rgb(255, 248, 240)
Note over Responder, Router: A2A Message Processing
Responder->>+gRPC: SubscribeToMessages(agent_id)
gRPC->>Router: register subscriber
Router-->>gRPC: subscription stream
gRPC-->>-Responder: message stream
Note over Responder: Process A2A message<br/>with tracing spans
Responder->>+gRPC: PublishMessage(A2AResponse)
gRPC->>A2A: validateA2AMessage()
A2A->>A2A: check AGENT role,<br/>ContextId correlation
gRPC->>Router: routeA2AEvent(responseEvent)
Router-->>REPL: deliver response event
gRPC-->>-Responder: PublishResponse
end
Note over REPL, Responder: A2A Protocol ensures:<br/>• Message structure compliance<br/>• Role semantics (USER/AGENT)<br/>• Context correlation<br/>• Event-driven routing
Core Components
1. A2A Protocol Engine (Domain Core)
The heart of the system implementing A2A protocol specifications:
// Core domain logic - technology agnostic
type A2AProtocolEngine struct {
messageValidator MessageValidator
contextManager ContextManager
taskLifecycle TaskLifecycle
}
// A2A message validation
func (e *A2AProtocolEngine) ValidateMessage(msg *Message) error {
// A2A compliance checks
if msg.MessageId == "" { return ErrMissingMessageId }
if msg.Role == ROLE_UNSPECIFIED { return ErrInvalidRole }
if len(msg.Content) == 0 { return ErrEmptyContent }
return nil
}
2. Event Router (Domain Core)
Manages event-driven communication between agents:
type EventRouter struct {
messageSubscribers map[string][]chan *AgentEvent
taskSubscribers map[string][]chan *AgentEvent
eventSubscribers map[string][]chan *AgentEvent
}
func (r *EventRouter) RouteEvent(event *AgentEvent) error {
// Route based on A2A metadata
routing := event.GetRouting()
subscribers := r.getSubscribers(routing.ToAgentId, event.PayloadType)
// Deliver with tracing
for _, sub := range subscribers {
go r.deliverWithTracing(sub, event)
}
}
3. gRPC Adapter (Infrastructure)
Translates between gRPC and domain logic:
type GrpcAdapter struct {
a2aEngine A2AProtocolEngine
eventRouter EventRouter
tracer TracingAdapter
}
func (a *GrpcAdapter) PublishMessage(ctx context.Context, req *PublishMessageRequest) (*PublishResponse, error) {
// Start tracing span
ctx, span := a.tracer.StartA2AMessageSpan(ctx, "publish_message", req.Message.MessageId, req.Message.Role)
defer span.End()
// Validate using domain logic
if err := a.a2aEngine.ValidateMessage(req.Message); err != nil {
a.tracer.RecordError(span, err)
return nil, err
}
// Route using domain logic
event := a.createA2AEvent(req)
if err := a.eventRouter.RouteEvent(event); err != nil {
return nil, err
}
return &PublishResponse{Success: true, EventId: event.EventId}, nil
}
Hexagonal Architecture Benefits
1. Domain Isolation
- A2A protocol logic is pure, testable business logic
- No infrastructure dependencies in the core domain
- Technology-agnostic implementation
2. Adapter Pattern
- gRPC Adapter: Handles Protocol Buffer serialization/deserialization
- Tracing Adapter: OTLP/Jaeger integration without domain coupling
- Health Adapter: Service health monitoring
- Metrics Adapter: Prometheus metrics collection
3. Port Interfaces
// Clean, testable interfaces
type MessagePublisher interface {
PublishMessage(ctx context.Context, msg *Message) (*PublishResponse, error)
}
type EventSubscriber interface {
SubscribeToMessages(ctx context.Context, agentId string) (MessageStream, error)
}
type ObservabilityPort interface {
StartSpan(ctx context.Context, operation string) (context.Context, Span)
RecordMetric(name string, value float64, labels map[string]string)
}
4. Dependency Inversion
- Domain depends on abstractions (ports), not concrete implementations
- Adapters depend on domain through well-defined interfaces
- Easy testing with mock implementations
A2A Protocol Integration
Message Structure Compliance
classDiagram
class A2AMessage {
+string MessageId
+string ContextId
+Role Role
+Part Content
+Metadata Metadata
+string TaskId
}
class Part {
+string Text
+bytes Data
+FileData File
}
class EventMetadata {
+string FromAgentId
+string ToAgentId
+string EventType
+Priority Priority
}
class Role {
<<enumeration>>
USER
AGENT
}
class Metadata {
+Fields map
}
A2AMessage "1" --> "0..*" Part : contains
A2AMessage "1" --> "1" EventMetadata : routed_with
A2AMessage "1" --> "1" Role : has
A2AMessage "1" --> "0..1" Metadata : includesEvent-Driven Architecture
The system implements pure event-driven architecture:
- Publishers emit A2A-compliant events
- Broker routes events based on metadata
- Subscribers receive relevant events
- Correlation through ContextId maintains conversation flow
Observability Integration
Distributed Tracing
sequenceDiagram
participant A as Agent A
participant B as Broker
participant AB as Agent B
participant OTLP as OTLP Collector
participant J as Jaeger
A->>+B: PublishMessage<br/>[trace_id: 123]
B->>B: Create A2A spans<br/>with structured attributes
B->>+AB: RouteEvent<br/>[trace_id: 123]
AB->>AB: Process with<br/>child spans
AB->>-B: PublishResponse<br/>[trace_id: 123]
B->>-A: Success<br/>[trace_id: 123]
par Observability Export
B->>OTLP: Export spans<br/>with A2A attributes
OTLP->>J: Store traces
J->>J: Build trace timeline<br/>with correlation
end
Note over A, J: End-to-end tracing<br/>with A2A protocol visibility
Structured Attributes
Each span includes A2A-specific attributes:
- a2a.message.id
- a2a.message.role
- a2a.context.id
- a2a.event.type
- a2a.routing.from_agent
- a2a.routing.to_agent
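A sketch of attaching these attributes with OpenTelemetry's Go API; the keys mirror the list above, while msg, routing, and eventType stand in for the values at hand:
// import "go.opentelemetry.io/otel/attribute"
span.SetAttributes(
	attribute.String("a2a.message.id", msg.GetMessageId()),
	attribute.String("a2a.message.role", msg.GetRole().String()),
	attribute.String("a2a.context.id", msg.GetContextId()),
	attribute.String("a2a.event.type", eventType),
	attribute.String("a2a.routing.from_agent", routing.GetFromAgentId()),
	attribute.String("a2a.routing.to_agent", routing.GetToAgentId()),
)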
Testing Strategy
Unit Testing (Domain Core)
func TestA2AEngine_ValidateMessage(t *testing.T) {
engine := NewA2AProtocolEngine()
// Test A2A compliance
msg := &Message{
MessageId: "test_msg_123",
Role: ROLE_USER,
Content: []*Part{{Text: "hello"}},
}
err := engine.ValidateMessage(msg)
assert.NoError(t, err)
}
Integration Testing (Adapters)
func TestGrpcAdapter_PublishMessage(t *testing.T) {
// Mock domain dependencies
mockEngine := &MockA2AEngine{}
mockRouter := &MockEventRouter{}
adapter := NewGrpcAdapter(mockEngine, mockRouter)
// Test adapter behavior
resp, err := adapter.PublishMessage(ctx, validRequest)
assert.NoError(t, err)
assert.True(t, resp.Success)
}
Conclusion
AgentHub’s hexagonal architecture with A2A protocol provides:
- Clean Architecture: Separation of concerns with domain-driven design
- A2A Compliance: Full protocol implementation with validation
- Event-Driven Design: Scalable, loosely-coupled communication
- Rich Observability: Comprehensive tracing and metrics
- Testability: Clean interfaces enable thorough testing
- Flexibility: Easy to extend with new adapters and protocols
This architecture ensures maintainable, scalable, and observable agent communication while maintaining strict A2A protocol compliance.
2 - Core Concepts
Fundamental concepts and principles of AgentHub
Core Concepts
Explore the fundamental concepts, principles, and mental models that underpin AgentHub’s agent-to-agent communication system.
Available Documentation
2.1 - Agent2Agent (A2A) Protocol Migration
Understanding the migration to Agent2Agent protocol compliance while maintaining Event-Driven Architecture benefits.
Agent2Agent (A2A) Protocol Migration
This document explains the migration of AgentHub to full Agent2Agent (A2A) protocol compliance while maintaining the essential Event-Driven Architecture (EDA) patterns that make the system scalable and resilient.
What is the Agent2Agent Protocol?
The Agent2Agent (A2A) protocol is a standardized specification for communication between AI agents. It defines:
- Standardized Message Formats: Using Message, Part, Task, and Artifact structures
- Task Lifecycle Management: Clear states (SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED)
- Agent Discovery: Using AgentCard for capability advertisement
- Interoperability: Ensuring agents can communicate across different platforms
Why Migrate to A2A?
Benefits of A2A Compliance
- Interoperability: AgentHub can now communicate with any A2A-compliant agent or system
- Standardization: Clear, well-defined message formats reduce integration complexity
- Ecosystem Compatibility: Join the growing ecosystem of A2A-compatible tools
- Future-Proofing: Built on industry standards rather than custom protocols
Maintained EDA Benefits
- Scalability: Event-driven routing scales to thousands of agents
- Resilience: Asynchronous communication handles network partitions gracefully
- Flexibility: Topic-based routing and priority queues enable sophisticated workflows
- Observability: Built-in tracing and metrics for production deployments
Hybrid Architecture
AgentHub implements a hybrid approach that combines the best of both worlds:
┌───────────────────────────────────────────────────────────┐
│                    A2A Protocol Layer                     │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌──────────┐ │
│ │ A2A Message│ │  A2A Task  │ │A2A Artifact│ │A2A Agent │ │
│ │ (standard) │ │ (standard) │ │ (standard) │ │   Card   │ │
│ └────────────┘ └────────────┘ └────────────┘ └──────────┘ │
├───────────────────────────────────────────────────────────┤
│                    EDA Transport Layer                    │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌──────────┐ │
│ │ AgentEvent │ │Event Router│ │ Subscribers│ │ Priority │ │
│ │  Wrapper   │ │            │ │  Manager   │ │  Queues  │ │
│ └────────────┘ └────────────┘ └────────────┘ └──────────┘ │
├───────────────────────────────────────────────────────────┤
│                    gRPC Infrastructure                    │
└───────────────────────────────────────────────────────────┘
How It Works
- A2A Messages are created using standard A2A structures (Message, Task, etc.)
- EDA Wrapper wraps A2A messages in AgentEvent for transport
- Event Routing uses EDA patterns (pub/sub, priority, topics) for delivery
- A2A Compliance ensures messages follow A2A protocol semantics
API Changes
Before (Legacy API)
// Legacy TaskMessage (deprecated)
taskPublisher.PublishTask(ctx, &agenthub.PublishTaskRequest{
TaskType: "greeting",
Parameters: map[string]interface{}{
"name": "Claude",
},
RequesterAgentID: "my_agent",
ResponderAgentID: "target_agent",
})
After (A2A-Compliant API)
// A2A-compliant task publishing
content := []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Hello! Please provide a greeting for Claude.",
},
},
}
task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
TaskType: "greeting",
Content: content,
RequesterAgentID: "my_agent",
ResponderAgentID: "target_agent",
Priority: pb.Priority_PRIORITY_MEDIUM,
ContextID: "conversation_123",
})
Message Structure Changes
message Message {
string message_id = 1; // Unique message identifier
string context_id = 2; // Conversation context
string task_id = 3; // Associated task (optional)
Role role = 4; // USER or AGENT
repeated Part content = 5; // Message content parts
google.protobuf.Struct metadata = 6; // Additional metadata
}
message Part {
oneof part {
string text = 1; // Text content
DataPart data = 2; // Structured data
FilePart file = 3; // File reference
}
}
message Task {
string id = 1; // Task identifier
string context_id = 2; // Conversation context
TaskStatus status = 3; // Current status
repeated Message history = 4; // Message history
repeated Artifact artifacts = 5; // Task outputs
google.protobuf.Struct metadata = 6; // Task metadata
}
enum TaskState {
TASK_STATE_SUBMITTED = 0; // Task created
TASK_STATE_WORKING = 1; // Task in progress
TASK_STATE_COMPLETED = 2; // Task completed successfully
TASK_STATE_FAILED = 3; // Task failed
TASK_STATE_CANCELLED = 4; // Task cancelled
}
Migration Guide
For Publishers
- Replace TaskPublisher with A2ATaskPublisher
- Use A2APublishTaskRequest with A2A Part structures
- Handle returned A2A Task objects
For Subscribers
- Replace TaskSubscriber with A2ATaskSubscriber
- Update handlers to process A2A Task and Message objects
- Return A2A Artifact objects instead of custom results
For Custom Integrations
- Update protobuf imports to use the events/a2a package
- Replace custom message structures with A2A equivalents
- Use the AgentHub service instead of EventBus
Backward Compatibility
The migration maintains wire-level compatibility through:
- Deprecated Types: Legacy message types marked as deprecated but still supported
- Automatic Conversion: EDA broker converts between legacy and A2A formats when needed
- Graceful Migration: Existing agents can migrate incrementally
Testing A2A Compliance
Run the demo to verify A2A compliance:
# Terminal 1: Start A2A broker
make run-server
# Terminal 2: Start A2A subscriber
make run-subscriber
# Terminal 3: Start A2A publisher
make run-publisher
Expected output shows successful A2A task processing:
- Publisher: “Published A2A task”
- Subscriber: “Task processing completed”
- Artifacts generated in A2A format
Best Practices
- Use A2A Types: Always use A2A message structures for new code
- Context Management: Use context_id to group related messages
- Proper Parts: Structure content using appropriate Part types
- Artifact Returns: Return structured Artifact objects from tasks
- Status Updates: Properly manage task lifecycle states
The A2A migration ensures AgentHub remains both standards-compliant and highly scalable through its hybrid EDA+A2A architecture.
2.2 - Understanding Tasks in Agent2Agent Communication
Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. Deep dive into task semantics, lifecycle, and design patterns.
Understanding Tasks in Agent2Agent Communication
Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. This document provides a deep dive into task semantics, lifecycle, and design patterns.
Task Anatomy
Core Components
Every task in the Agent2Agent system consists of several key components that define its identity, purpose, and execution context:
A2A Task Identity
string id = 1; // Unique task identifier
string context_id = 2; // Optional conversation context
The id serves as a unique identifier that allows all participants to track the task throughout its lifecycle. It should be globally unique and meaningful for debugging purposes.
The context_id groups related tasks in a conversation or workflow context, enabling sophisticated multi-task coordination patterns.
Task classification in A2A is handled through the initial Message content rather than a separate task_type field, providing more flexibility for complex task descriptions.
A2A Task Status and History
TaskStatus status = 3; // Current task status
repeated Message history = 4; // Message history for this task
repeated Artifact artifacts = 5; // Task output artifacts
google.protobuf.Struct metadata = 6; // Task metadata
In A2A, task data is contained within Message content using the structured Part format:
// A2A task request message
message Message {
string message_id = 1;
string context_id = 2;
string task_id = 3;
Role role = 4; // USER (requester) or AGENT (responder)
repeated Part content = 5; // Structured task content
}
message Part {
oneof part {
string text = 1; // Text description
DataPart data = 2; // Structured data
FilePart file = 3; // File references
}
}
// Example: A2A data analysis task
taskMessage := &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
ContextId: "analysis_workflow_123",
TaskId: "task_analysis_456",
Role: a2a.Role_USER,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Please perform trend analysis on Q4 sales data",
},
},
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: analysisParams, // Structured parameters
Description: "Analysis configuration",
},
},
},
},
}
Metadata in A2A tasks provides additional context for execution, auditing, or debugging:
// A2A task metadata
taskMetadata, _ := structpb.NewStruct(map[string]interface{}{
"workflow_id": "workflow_abc123",
"user_id": "user_456",
"request_source": "web_ui",
"correlation_id": "trace_789",
"priority": "high",
"expected_duration": "5m",
})
task := &a2a.Task{
Id: "task_analysis_456",
ContextId: "analysis_workflow_123",
Metadata: taskMetadata,
}
A2A Agent Coordination
In A2A, agent coordination is handled through the EDA routing metadata:
message AgentEventMetadata {
string from_agent_id = 1; // Source agent identifier
string to_agent_id = 2; // Target agent ID (empty = broadcast)
string event_type = 3; // Event classification
repeated string subscriptions = 4; // Topic-based routing tags
Priority priority = 5; // Delivery priority
}
This enables flexible routing patterns:
- from_agent_id identifies the requesting agent
- to_agent_id can specify a target agent or be empty for broadcast
- subscriptions enable topic-based routing for specialized agents
- priority ensures urgent tasks get precedence
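For illustration, targeted and broadcast delivery differ only in this metadata (field values here are examples):
// Direct delivery to one agent.
targeted := &pb.AgentEventMetadata{
	FromAgentId: "orchestrator",
	ToAgentId:   "data_processor_01",
	EventType:   "task.submitted",
	Priority:    pb.Priority_PRIORITY_HIGH,
}

// Broadcast: empty to_agent_id, optionally narrowed by subscriptions.
broadcast := &pb.AgentEventMetadata{
	FromAgentId:   "orchestrator",
	EventType:     "task.submitted",
	Subscriptions: []string{"data_processing"},
}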
A2A Execution Context
A2A handles execution context through the TaskStatus structure:
message TaskStatus {
TaskState state = 1; // SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED
Message update = 2; // Latest status message
google.protobuf.Timestamp timestamp = 3; // Status timestamp
}
enum TaskState {
TASK_STATE_SUBMITTED = 0;
TASK_STATE_WORKING = 1;
TASK_STATE_COMPLETED = 2;
TASK_STATE_FAILED = 3;
TASK_STATE_CANCELLED = 4;
}
This status context helps agents make intelligent scheduling decisions:
- state signals where the task is in its lifecycle
- timestamp enables age-based scheduling policies
- priority (carried in the EDA routing metadata above) provides explicit urgency ranking
Task Lifecycle
1. A2A Task Creation and Publishing
A2A tasks begin their lifecycle when a requesting agent creates a task with an initial message:
// Create A2A task with initial request message
taskID := "task_analysis_" + uuid.New().String()
task := &a2a.Task{
	Id:        taskID,
	ContextId: "workflow_orchestration_123",
	Status: &a2a.TaskStatus{
		State: a2a.TaskState_TASK_STATE_SUBMITTED,
		Update: &a2a.Message{
			MessageId: "msg_" + uuid.New().String(),
			TaskId:    taskID, // correlate the message with this task
			Role:      a2a.Role_USER,
			Content: []*a2a.Part{
				{
					Part: &a2a.Part_Text{
						Text: "Please analyze the quarterly sales data for trends",
					},
				},
				{
					Part: &a2a.Part_Data{
						Data: &a2a.DataPart{
							Data:        analysisParams,
							Description: "Analysis configuration",
						},
					},
				},
			},
		},
		Timestamp: timestamppb.Now(),
	},
}
// Publish to AgentHub broker
client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
Task: task,
Routing: &pb.AgentEventMetadata{
FromAgentId: "data_orchestrator",
ToAgentId: "data_processor_01", // Optional: specific agent
EventType: "task.submitted",
Priority: pb.Priority_PRIORITY_HIGH,
},
})
2. A2A Task Discovery and Acceptance
Agents subscribe to A2A task events and evaluate whether to accept them:
// Agent receives A2A task event
func (a *Agent) evaluateA2ATask(event *pb.AgentEvent) bool {
task := event.GetTask()
if task == nil || task.Status.State != a2a.TaskState_TASK_STATE_SUBMITTED {
return false
}
// Analyze task content to understand requirements
requestMessage := task.Status.Update
taskDescription := a.extractTaskDescription(requestMessage)
// Check if agent can handle this task type
if !a.canHandleTaskType(taskDescription) {
return false
}
// Check capacity constraints
if a.getCurrentLoad() > a.maxCapacity {
return false
}
// Estimate duration from task content and metadata
estimatedDuration := a.estimateA2ATaskDuration(task)
if estimatedDuration > a.maxTaskDuration {
return false
}
return true
}
func (a *Agent) extractTaskDescription(msg *a2a.Message) string {
for _, part := range msg.Content {
if textPart := part.GetText(); textPart != "" {
return textPart
}
}
return ""
}
3. A2A Task Execution with Progress Reporting
Accepted A2A tasks enter the execution phase with regular status updates:
func (a *Agent) executeA2ATask(task *a2a.Task) {
// Update task to WORKING state
a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Task started")
// Phase 1: Preparation
a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Preparing data analysis")
prepareResult := a.prepareA2AExecution(task)
// Phase 2: Main processing
a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Processing data - 50% complete")
processResult := a.processA2AData(prepareResult)
// Phase 3: Finalization
a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Finalizing results - 75% complete")
finalResult := a.finalizeA2AResults(processResult)
// Completion with artifacts
a.completeTaskWithArtifacts(task, finalResult)
}
func (a *Agent) updateTaskStatus(task *a2a.Task, state a2a.TaskState, message string) {
statusUpdate := &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
TaskId: task.Id,
Role: a2a.Role_AGENT,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: message,
},
},
},
}
task.Status = &a2a.TaskStatus{
State: state,
Update: statusUpdate,
Timestamp: timestamppb.Now(),
}
// Publish task update
a.client.PublishTaskUpdate(context.Background(), &pb.PublishTaskUpdateRequest{
Task: task,
Routing: &pb.AgentEventMetadata{
FromAgentId: a.agentId,
EventType: "task.status_update",
},
})
}
4. A2A Result Delivery
A2A task completion delivers results through structured artifacts:
func (a *Agent) completeTaskWithArtifacts(task *a2a.Task, resultData interface{}) {
// Create completion message
completionMessage := &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
TaskId: task.Id,
Role: a2a.Role_AGENT,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Analysis completed successfully",
},
},
},
}
// Create result artifact
resultArtifact := &a2a.Artifact{
ArtifactId: "artifact_" + uuid.New().String(),
Name: "Analysis Results",
Description: "Quarterly sales trend analysis",
Parts: []*a2a.Part{
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: resultData.(*structpb.Struct), // DataPart.Data is a *structpb.Struct
Description: "Analysis results and metrics",
},
},
},
},
}
// Update task to completed
task.Status = &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_COMPLETED,
Update: completionMessage,
Timestamp: timestamppb.Now(),
}
task.Artifacts = append(task.Artifacts, resultArtifact)
// Publish final task update
a.client.PublishTaskUpdate(context.Background(), &pb.PublishTaskUpdateRequest{
Task: task,
Routing: &pb.AgentEventMetadata{
FromAgentId: a.agentId,
EventType: "task.completed",
},
})
// Publish artifact separately
a.client.PublishTaskArtifact(context.Background(), &pb.PublishTaskArtifactRequest{
TaskId: task.Id,
Artifact: resultArtifact,
Routing: &pb.AgentEventMetadata{
FromAgentId: a.agentId,
EventType: "task.artifact",
},
})
}
A2A Task Design Patterns
1. Simple A2A Request-Response
The most basic pattern where one agent requests work from another using A2A messages:
Agent A ──[A2A Task]──> AgentHub ──[TaskEvent]──> Agent B
Agent A <──[Artifact]── AgentHub <──[TaskUpdate]── Agent B
A2A Implementation:
// Agent A creates task
task := &a2a.Task{
Id: "simple_task_123",
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Convert CSV to JSON"}}},
},
},
}
// Agent B responds with artifact
artifact := &a2a.Artifact{
Name: "Converted Data",
Parts: []*a2a.Part{{Part: &a2a.Part_File{File: &a2a.FilePart{FileId: "converted.json"}}}},
}
Use cases:
- File format conversion
- Simple calculations
- Data validation
- Content generation
2. A2A Broadcast Processing
One agent broadcasts a task to multiple potential processors using A2A context-aware routing:
Agent A ──[A2A Task]──> AgentHub ──[TaskEvent]──> Agent B₁
                                 ──[TaskEvent]──> Agent B₂
                                 ──[TaskEvent]──> Agent B₃
A2A Implementation:
// Broadcast task with shared context
task := &a2a.Task{
Id: "broadcast_task_456",
ContextId: "parallel_processing_context",
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{
{Part: &a2a.Part_Text{Text: "Process data chunk"}},
{Part: &a2a.Part_Data{Data: &a2a.DataPart{Data: chunkData}}},
},
},
},
}
// Publish without specific target (broadcast)
client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
Task: task,
Routing: &pb.AgentEventMetadata{
FromAgentId: "orchestrator",
// No ToAgentId = broadcast
EventType: "task.broadcast",
},
})
Use cases:
- Distributed computation
- Load testing
- Content distribution
- Parallel processing
3. A2A Pipeline Processing
Tasks flow through a series of specialized agents using shared A2A context:
Agent A ──[A2A Task₁]──> Agent B ──[A2A Task₂]──> Agent C ──[A2A Task₃]──> Agent D
        <──[Final Artifact]─────────────────────────────────────────────────
A2A Implementation:
// Shared context for pipeline
pipelineContext := "data_pipeline_" + uuid.New().String()
// Stage 1: Data extraction
task1 := &a2a.Task{
Id: "extract_" + uuid.New().String(),
ContextId: pipelineContext,
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Extract data from source"}}},
},
},
}
// Stage 2: Data transformation (triggered by Stage 1 completion)
task2 := &a2a.Task{
Id: "transform_" + uuid.New().String(),
ContextId: pipelineContext, // Same context
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Transform extracted data"}}},
},
},
}
// Context linking enables pipeline coordination
Use cases:
- Data processing pipelines
- Image processing workflows
- Document processing chains
- ETL operations
4. A2A Hierarchical Decomposition
Complex tasks are broken down into subtasks using A2A context hierarchy:
Agent A ──[A2A ComplexTask]──> Coordinator
                               ├──[A2A SubTask₁]──> Specialist₁
                               ├──[A2A SubTask₂]──> Specialist₂
                               └──[A2A SubTask₃]──> Specialist₃
A2A Implementation:
// Parent task
parentTask := &a2a.Task{
Id: "complex_analysis_789",
ContextId: "business_workflow_123",
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Perform comprehensive business analysis"}}},
},
},
}
// Coordinator creates subtasks with hierarchical context
subtask1Meta, _ := structpb.NewStruct(map[string]interface{}{
	"parent_task_id": "complex_analysis_789",
	"subtask_type":   "financial",
})
subtask1 := &a2a.Task{
	Id:        "financial_analysis_790",
	ContextId: "business_workflow_123", // Same parent context
	Metadata:  subtask1Meta,            // Task.Metadata is a *structpb.Struct
}

subtask2Meta, _ := structpb.NewStruct(map[string]interface{}{
	"parent_task_id": "complex_analysis_789",
	"subtask_type":   "market",
})
subtask2 := &a2a.Task{
	Id:        "market_analysis_791",
	ContextId: "business_workflow_123", // Same parent context
	Metadata:  subtask2Meta,
}
// Context enables coordination and result aggregation
Use cases:
- Complex business workflows
- Multi-step analysis
- Orchestrated services
- Batch job coordination
5. Competitive Processing
Multiple agents compete to handle the same task (first-come-first-served):
Agent A ──[Task]──> Broker ──[Task]──> Agent B1 (accepts)
                           ──[Task]──> Agent B2 (rejects)
                           ──[Task]──> Agent B3 (rejects)
Use cases:
- Resource-constrained environments
- Load balancing
- Fault tolerance
- Performance optimization
A2A Task Content and Semantics
A2A Message-Based Classification
In A2A, task classification is handled through message content rather than rigid type fields, providing more flexibility:
Content-Based Classification
// Data processing task
message := &a2a.Message{
Content: []*a2a.Part{
{Part: &a2a.Part_Text{Text: "Analyze quarterly sales data for trends"}},
{Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Analysis parameters"}}},
},
}
// Image processing task
message := &a2a.Message{
Content: []*a2a.Part{
{Part: &a2a.Part_Text{Text: "Generate product image with specifications"}},
{Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Image requirements"}}},
},
}
// Notification task
message := &a2a.Message{
Content: []*a2a.Part{
{Part: &a2a.Part_Text{Text: "Send completion notification to user"}},
{Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Notification details"}}},
},
}
Operation-Based Classification
create.* - Creation operations
update.* - Modification operations
delete.* - Removal operations
analyze.* - Analysis operations
transform.* - Transformation operations
Complexity-Based Classification
simple.* - Quick, low-resource tasks
standard.* - Normal processing tasks
complex.* - Resource-intensive tasks
background.* - Long-running batch tasks
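These conventions are straightforward to act on in code. Below is a minimal, self-contained sketch; the routeByConvention helper and the pool names are illustrative, not part of the AgentHub API:

package main

import (
	"fmt"
	"strings"
)

// routeByConvention maps a task-type string onto a worker pool
// using the operation- and complexity-prefix conventions above.
func routeByConvention(taskType string) string {
	switch {
	case strings.HasPrefix(taskType, "simple."):
		return "fast-lane pool"
	case strings.HasPrefix(taskType, "complex."):
		return "high-memory pool"
	case strings.HasPrefix(taskType, "background."):
		return "batch queue"
	default:
		return "standard pool"
	}
}

func main() {
	fmt.Println(routeByConvention("complex.analyze.sales")) // high-memory pool
}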
A2A Content Design Guidelines
Be Explicit: Include all information needed for execution in structured Parts
// Good: Explicit A2A content
content := []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Convert CSV file to JSON format with specific options",
},
},
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: structpb.NewStruct(map[string]interface{}{
"source_format": "csv",
"target_format": "json",
"include_headers": true,
"delimiter": ",",
"encoding": "utf-8",
}),
Description: "Conversion parameters",
},
},
},
{
Part: &a2a.Part_File{
File: &a2a.FilePart{
FileId: "source_data.csv",
Filename: "data.csv",
MimeType: "text/csv",
},
},
},
}
// Poor: Ambiguous A2A content
content := []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Convert file", // Too vague
},
},
}
Use Standard Data Types: Leverage common formats for interoperability
// Good: Standard formats
{
"timestamp": "2024-01-15T10:30:00Z", // ISO 8601
"amount": "123.45", // String for precision
"coordinates": {"lat": 40.7128, "lng": -74.0060}
}
Include Validation Information: Help agents validate inputs
{
"email": "user@example.com",
"email_format": "rfc5322",
"max_length": 254,
"required": true
}
A2A Error Handling and Edge Cases
A2A Task Rejection
Agents should provide meaningful rejection reasons using A2A message format:
func (a *Agent) rejectA2ATask(task *a2a.Task, reason string) {
// Create rejection message
rejectionMessage := &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
TaskId: task.Id,
Role: a2a.Role_AGENT,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Task rejected: " + reason,
},
},
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: structpb.NewStruct(map[string]interface{}{
"rejection_reason": reason,
"agent_id": a.agentId,
"timestamp": time.Now().Unix(),
}),
Description: "Rejection details",
},
},
},
},
}
// Update task status to failed
task.Status = &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_FAILED,
Update: rejectionMessage,
Timestamp: timestamppb.Now(),
}
a.publishTaskUpdate(task)
}
Common rejection reasons:
- UNSUPPORTED_TASK_TYPE: Agent doesn’t handle this task type
- CAPACITY_EXCEEDED: Agent is at maximum capacity
- DEADLINE_IMPOSSIBLE: Cannot complete within deadline
- INVALID_PARAMETERS: Task parameters are malformed
- RESOURCE_UNAVAILABLE: Required external resources unavailable
Timeout Handling
Both requesters and processors should handle timeouts gracefully:
// Requester timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
select {
case result := <-resultChannel:
// Process result
case <-ctx.Done():
// Handle timeout - possibly retry or fail
}
// Processor timeout
func (a *Agent) executeWithTimeout(task *pb.TaskMessage) {
deadline := task.GetDeadline().AsTime()
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
select {
case result := <-a.processTask(ctx, task):
a.publishResult(task, result, pb.TaskStatus_TASK_STATUS_COMPLETED, "")
case <-ctx.Done():
a.publishResult(task, nil, pb.TaskStatus_TASK_STATUS_FAILED, "Deadline exceeded")
}
}
Partial Results
For long-running tasks, consider supporting partial results:
type PartialResult struct {
TaskId string
CompletedPortion float64 // 0.0 to 1.0
IntermediateData interface{}
CanResume bool
ResumeToken string
}
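A processor might emit such a partial result at a checkpoint. A hedged sketch: publishPartial, taskID, firstHalfSummary, and checkpointID are assumptions standing in for the agent's own transport and state:

// Hypothetical checkpoint halfway through a long-running task.
partial := &PartialResult{
	TaskId:           taskID,
	CompletedPortion: 0.5,              // half of the work is done
	IntermediateData: firstHalfSummary, // whatever downstream consumers can use
	CanResume:        true,
	ResumeToken:      checkpointID, // opaque token to resume from
}
publishPartial(ctx, partial) // assumed helper, not an AgentHub API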
Best Practices
Task Design
- Make task types granular but not too fine-grained
- Design for idempotency when possible
- Include retry information in metadata
- Use consistent parameter naming across similar task types
- Version your task schemas to enable evolution
- Batch related tasks when appropriate
- Use appropriate priority levels to avoid starvation
- Set realistic deadlines based on historical performance
- Include resource hints to help with scheduling
- Monitor task completion rates to identify bottlenecks
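To make the retry, versioning, and resource-hint advice above concrete, task metadata might carry those hints as a structpb.Struct. A sketch: the key names are examples rather than a prescribed schema, and Metadata is assumed to be a structpb.Struct as in the broker examples elsewhere in this document:

// Illustrative retry and scheduling hints.
metadata, err := structpb.NewStruct(map[string]interface{}{
	"schema_version": "v2",          // version task schemas to enable evolution
	"max_retries":    3,             // retry information for the processor
	"retry_backoff":  "exponential",
	"resource_hint":  "cpu-light",   // help the scheduler place the task
})
if err != nil {
	return err
}
task.Metadata = metadata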
Security Considerations
- Validate all task parameters before processing
- Sanitize user-provided data in task parameters
- Include authorization context in metadata
- Log task execution for audit trails
- Encrypt sensitive parameters when necessary
A2A tasks form the foundation of Agent2Agent communication, enabling sophisticated distributed processing patterns through structured messages, artifacts, and context-aware coordination. The A2A protocol’s flexible message format and EDA integration provide robust, scalable agent networks with clear semantics and strong observability. Proper A2A task design leverages the protocol’s strengths for building maintainable, interoperable agent systems.
2.3 - Agent Discovery Workflow Explained
This document explains how the agent discovery workflow operates in AgentHub, enabling dynamic registration and LLM-based orchestration.
Overview
Agent discovery is the process by which agents dynamically register their capabilities with the Cortex orchestrator, making themselves available for intelligent task delegation via an LLM (Large Language Model).
The Problem This Solves
Traditional multi-agent systems require:
- Hard-coded agent configurations
- Static routing rules
- Manual updates when adding new agents
- No intelligence in task routing
Agent discovery with Cortex provides:
- Dynamic registration: Agents announce themselves when they start
- Intelligent routing: LLM decides which agent to use based on capabilities
- Zero configuration: No central registry to update
- Scalable: Add or remove agents without system changes
How It Works
The Five-Step Flow
┌───────────────┐
│  1. Agent     │  Agent starts and creates an AgentCard
│     Startup   │  describing its capabilities
└───────┬───────┘
        │
        ▼
┌───────────────┐
│  2. Register  │  Agent calls RegisterAgent RPC
│     with      │  sending the AgentCard to broker
│     Broker    │
└───────┬───────┘
        │
        ▼
┌───────────────┐
│  3. Event     │  Broker publishes AgentCardEvent
│   Publishing  │  broadcasting to all subscribers
└───────┬───────┘
        │
        ▼
┌───────────────┐
│  4. Cortex    │  Cortex receives event and stores
│   Discovery   │  agent in its registry
└───────┬───────┘
        │
        ▼
┌───────────────┐
│  5. LLM       │  Agent is now available in LLM
│  Integration  │  prompts for intelligent delegation
└───────────────┘
Step 1: Agent Startup
When an agent starts, it creates an AgentCard that describes:
agentCard := &pb.AgentCard{
Name: "agent_translator",
Description: "Language translation service",
Version: "1.0.0",
Skills: []*pb.AgentSkill{
{
Name: "Text Translation",
Description: "Translates text between languages",
Examples: []string{
"Translate this to Spanish",
"Convert to French",
},
},
},
}
Key Components:
- Name: Unique identifier (used for routing)
- Description: What the agent does (helps LLM understand)
- Skills: Specific capabilities with examples (used for matching)
Step 2: Registration with Broker
The agent registers by calling the broker’s RegisterAgent RPC:
client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
AgentCard: agentCard,
Subscriptions: []string{"translation_request"},
})
What happens:
- Broker validates the AgentCard
- Stores agent in its registry: registeredAgents[agentID] = card
- Returns success response
Step 3: Event Publishing
The broker immediately publishes an AgentCardEvent:
event := &pb.AgentEvent{
EventId: "agent_registered_translator_...",
Timestamp: now(),
Payload: &pb.AgentEvent_AgentCard{
AgentCard: &pb.AgentCardEvent{
AgentId: "agent_translator",
AgentCard: agentCard,
EventType: "registered",
},
},
Routing: &pb.AgentEventMetadata{
FromAgentId: "agent_translator",
ToAgentId: "", // Broadcast
EventType: "agent.registered",
Priority: PRIORITY_HIGH,
},
}
Routing characteristics:
- Broadcast to all subscribers (empty ToAgentId)
- High priority (processed immediately)
- Event type clearly marked as “agent.registered”
Step 4: Cortex Discovery
Cortex subscribes to agent events:
stream, _ := client.SubscribeToAgentEvents(ctx, &pb.SubscribeToAgentEventsRequest{
AgentId: "cortex",
EventTypes: []string{"agent.registered", "agent.updated"},
})
When receiving an agent card event, Cortex:
func handleAgentCardEvent(event *pb.AgentCardEvent) {
agentID := event.GetAgentId()
agentCard := event.GetAgentCard()
// Store agent
cortex.RegisterAgent(agentID, agentCard)
// Log skills for visibility
log.Info("Agent registered",
"agent_id", agentID,
"skills", extractSkillNames(agentCard.Skills))
}
Result: Agent is now in Cortex’s registeredAgents map.
Step 5: LLM Integration
When a user sends a request, Cortex queries the LLM:
decision, _ := llm.Decide(
conversationHistory,
availableAgents, // Includes our new agent!
newUserMessage,
)
The LLM sees:
Available agents:
- agent_translator: Language translation service
Skills:
* Text Translation: Translates text between languages
Examples: "Translate this to Spanish", "Convert to French"
Decision making:
- User asks: “Can you translate this to Spanish?”
- LLM sees “agent_translator” with matching examples
- LLM decides: Delegate to agent_translator
- Cortex sends task to agent_translator
- Agent processes and responds
- Cortex synthesizes final response
Message Flow Diagram
sequenceDiagram
participant A as Translation Agent
participant B as Broker
participant C as Cortex
participant L as LLM (VertexAI)
participant U as User
Note over A: Step 1: Startup
A->>A: Create AgentCard
Note over A,B: Step 2: Registration
A->>B: RegisterAgent(card)
B->>B: Store in registry
Note over B: Step 3: Event Publishing
B->>C: AgentCardEvent (broadcast)
Note over C: Step 4: Discovery
C->>C: RegisterAgent(id, card)
C->>C: total_agents++
Note over U,L: Step 5: LLM Integration
U->>C: "Translate to Spanish"
C->>L: Decide(availableAgents)
Note over L: Sees translator agent<br/>with matching examples
L-->>C: {delegate: agent_translator}
C->>A: Task message
A->>A: Process translation
A->>C: Result
C->>L: Synthesize
L-->>C: Final response
C->>U: "Here’s the Spanish: ..."
Technical Implementation Details
Thread Safety
Agent registration is thread-safe:
type AgentHubService struct {
registeredAgents map[string]*pb.AgentCard
agentsMu sync.RWMutex
}
func (s *AgentHubService) RegisterAgent(...) {
s.agentsMu.Lock()
s.registeredAgents[agentID] = card
s.agentsMu.Unlock()
}
Multiple agents can register concurrently without conflicts.
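The matching read path (sketched below; the method name is illustrative) takes the shared read lock, so concurrent lookups never block each other and only wait briefly during registration writes:

func (s *AgentHubService) lookupAgent(agentID string) (*pb.AgentCard, bool) {
	s.agentsMu.RLock()
	defer s.agentsMu.RUnlock()
	card, ok := s.registeredAgents[agentID]
	return card, ok
}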
Event Delivery
Events are delivered asynchronously:
for _, subChan := range targetChannels {
go func(ch chan *pb.AgentEvent) {
select {
case ch <- event:
// Delivered
case <-time.After(5 * time.Second):
// Timeout
}
}(subChan)
}
Benefits:
- Non-blocking: Broker doesn’t wait for all deliveries
- Resilient: Timeout prevents hanging
- Concurrent: Multiple subscribers receive events in parallel
LLM Prompt Generation
Cortex builds prompts dynamically:
func buildOrchestrationPrompt(availableAgents []*pb.AgentCard) string {
prompt := "Available agents:\n"
for _, agent := range availableAgents {
prompt += fmt.Sprintf("- %s: %s\n",
agent.Name, agent.Description)
for _, skill := range agent.Skills {
prompt += fmt.Sprintf(" Skills:\n")
prompt += fmt.Sprintf(" * %s: %s\n",
skill.Name, skill.Description)
}
}
return prompt
}
Updated automatically when new agents register.
Typical timings for agent discovery:
Agent startup: 100-200ms
RegisterAgent RPC: < 10ms
Event publishing: < 5ms
Event delivery: < 50ms
Cortex processing: < 10ms
Total discovery time: < 300ms
Fast enough that agents are available for routing within milliseconds of starting.
Error Handling
Registration Failures
If registration fails:
_, err := client.RegisterAgent(ctx, req)
if err != nil {
log.Error("Registration failed", "error", err)
// Agent should retry or exit
panic(err)
}
Common causes:
- Broker not running
- Network issues
- Invalid AgentCard (empty name)
Event Delivery Failures
If event delivery fails:
if err := s.routeEvent(ctx, event); err != nil {
log.Warn("Event routing failed", "error", err)
// Continue anyway - registration still succeeded
}
Graceful degradation: Registration succeeds even if event routing fails.
Cortex Not Subscribed
If Cortex isn’t subscribed yet:
- Events are still published
- Cortex can query the GetAgentCard() RPC later
- Or agents can re-register when Cortex starts
Resilient: System handles various startup orders.
Observability
Broker Logs
level=INFO msg="Agent registered" agent_id=agent_translator
level=DEBUG msg="Routing event to subscribers"
event_type=agent.registered subscriber_count=2
level=DEBUG msg="Event delivered to subscriber"
Cortex Logs
level=INFO msg="Received agent card event"
agent_id=agent_translator event_type=registered
level=INFO msg="Agent skills registered"
skills="[Text Translation: Translates...]"
level=INFO msg="Agent registered with Cortex orchestrator"
total_agents=3
Distributed Tracing
Agent registration creates trace spans:
agent_registered_translator
└── broker.route_event
    ├── deliver_to_cortex
    └── deliver_to_monitor
Visibility into the entire discovery flow.
Lifecycle Management
Agent Startup Sequence
1. Create AgentHub client
2. Connect to broker
3. Create AgentCard
4. Call RegisterAgent
5. Subscribe to messages
6. Enter processing loop
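Condensed into code, the sequence looks roughly like this (a sketch based on the registration examples above; mustConnect, handleMessage, and the subscription request fields are assumptions):

ctx := context.Background()

// Steps 1-2: create the client and connect to the broker.
client := mustConnect("localhost:50051") // assumed helper wrapping grpc.Dial

// Steps 3-4: create the AgentCard and register.
card := &pb.AgentCard{
	Name:        "agent_translator",
	Description: "Language translation service",
}
if _, err := client.RegisterAgent(ctx, &pb.RegisterAgentRequest{AgentCard: card}); err != nil {
	log.Fatal(err)
}

// Steps 5-6: subscribe and enter the processing loop.
stream, err := client.SubscribeToMessages(ctx, &pb.SubscribeToMessagesRequest{AgentId: card.Name})
if err != nil {
	log.Fatal(err)
}
for {
	msg, err := stream.Recv()
	if err != nil {
		break
	}
	go handleMessage(ctx, msg) // assumed handler
}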
Agent Shutdown
Currently agents don’t explicitly unregister. For graceful shutdown:
// In future enhancement:
defer client.UnregisterAgent(ctx, &pb.UnregisterAgentRequest{
AgentId: myAgentID,
})
This would trigger an “agent.unregistered” event.
Agent Updates
To update capabilities:
// Modify AgentCard
agentCard.Skills = append(agentCard.Skills, newSkill)
// Re-register
client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
AgentCard: agentCard,
})
// Triggers "agent.updated" event
Cortex receives update and refreshes its registry.
Comparison with Other Patterns
vs. Service Discovery (Consul, etcd)
Agent Discovery:
- Includes capability metadata (skills)
- Optimized for LLM consumption
- Event-driven notification
- Rich semantic information
Service Discovery:
- Network location only
- Health checks
- Static metadata
- Pull-based queries
vs. API Gateway
Agent Discovery:
- Dynamic routing based on content
- LLM makes intelligent decisions
- Supports complex multi-step workflows
API Gateway:
- Path-based routing
- Static configuration
- Single request-response
vs. Message Queues
Agent Discovery:
- Agents know their capabilities
- Centralized intelligence (Cortex)
- Rich metadata for decisions
Message Queues:
- Topic-based routing
- No central intelligence
- Minimal metadata
Design Decisions
Why Broadcast Events?
Decision: Publish agent cards to all subscribers
Alternatives considered:
- Point-to-point to Cortex only
- Store-and-query model
Rationale:
- Multiple orchestrators can coexist
- Monitoring agents can track all agents
- Extensible for future use cases
- Low overhead (events are small)
Why High Priority?
Decision: Agent registration events use PRIORITY_HIGH
Rationale:
- New agents should be available quickly
- User requests may come immediately
- Discovery is time-sensitive
- Low volume (not many registrations)
Why Skills with Examples?
Decision: Include example user requests in skills
Rationale:
- LLMs learn by example
- Natural language is ambiguous
- Examples disambiguate capabilities
- Improves matching accuracy
Future Enhancements
See AGENT_DECIDE.md for planned improvements:
- Agent Health Monitoring: Track agent availability
- Agent Deregistration: Explicit removal from registry
- Agent Versioning: Support multiple versions simultaneously
- Capability Queries: Search agents by capability
- Load Balancing: Distribute work among multiple instances
Conclusion
The agent discovery workflow enables:
- Zero-configuration agent deployment
- Intelligent routing via LLM
- Dynamic scaling of agent pools
- Automatic orchestration based on capabilities
- Flexible, extensible multi-agent systems
This architecture supports truly autonomous, self-organizing agent networks that can adapt to changing requirements without manual intervention.
2.4 - The Agent2Agent Protocol and AgentHub Implementation
This document explores the core principles of Google’s Agent2Agent protocol and how AgentHub implements a communication broker based on these concepts. We distinguish between the Agent2Agent protocol specification (task structures and communication patterns) and our custom AgentHub broker implementation.
Agent2Agent vs AgentHub: What’s What
Agent2Agent Protocol (Google)
The Agent2Agent protocol defines:
- Task Message Structures: TaskMessage, TaskResult, TaskProgress with their fields and semantics
- Task Status and Priority Enums: Standardized task lifecycle and priority levels
- Communication Patterns: Asynchronous task delegation and result reporting concepts
AgentHub Implementation (This Project)
AgentHub provides:
- Event Bus Broker: Centralized gRPC service that routes tasks between agents
- Pub/Sub Architecture: Publisher-subscriber pattern for task distribution
- Subscription Mechanisms: SubscribeToTasks, SubscribeToTaskResults, SubscribeToTaskProgress methods
- Agent Implementations: Sample publisher and subscriber agents demonstrating the protocol
Philosophy and Core Concepts
Beyond Simple Request-Response
Traditional software architectures rely heavily on synchronous request-response patterns where a client requests a service and waits for an immediate response. While effective for simple operations, this pattern has limitations when dealing with:
- Complex, multi-step processes that require coordination between multiple specialized services
- Long-running operations that may take minutes or hours to complete
- Dynamic workload distribution where the best processor for a task may vary over time
- Autonomous decision-making where agents need to collaborate without central coordination
The Agent2Agent protocol addresses these limitations by defining task structures and communication patterns for autonomous agents. AgentHub implements a broker-based system that enables agents to communicate using Agent2Agent-inspired task structures:
- Delegating work to other agents based on their capabilities
- Accepting and processing tasks according to their specializations
- Reporting progress during long-running operations
- Making collaborative decisions about task distribution and execution
Autonomous Collaboration
In an Agent2Agent system, each agent operates with a degree of autonomy, making decisions about:
- Which tasks to accept based on current capacity and capabilities
- How to prioritize work when multiple tasks are pending
- When to delegate subtasks to other specialized agents
- How to report progress and handle failures
This autonomy enables the system to be more resilient, scalable, and adaptive compared to centrally-controlled architectures.
Key Design Principles
1. Asynchronous Communication
Agent2Agent communication is fundamentally asynchronous. When Agent A requests work from Agent B:
- Agent A doesn’t block waiting for completion
- Agent B can process the task when resources are available
- Progress updates provide visibility into long-running operations
- Results are delivered when the work is complete
This asynchronicity enables:
- Better resource utilization as agents aren’t blocked waiting
- Improved scalability as systems can handle more concurrent operations
- Enhanced resilience as temporary agent unavailability doesn’t block the entire system
2. Rich Task Semantics (Agent2Agent Protocol)
The Agent2Agent protocol defines rich task message structures that AgentHub implements:
message TaskMessage {
string task_id = 1; // Unique identifier for tracking
string task_type = 2; // Semantic type (e.g., "data_analysis")
google.protobuf.Struct parameters = 3; // Flexible parameters
string requester_agent_id = 4; // Who requested the work
string responder_agent_id = 5; // Who should do the work (optional)
google.protobuf.Timestamp deadline = 6; // When it needs to be done
Priority priority = 7; // How urgent it is
google.protobuf.Struct metadata = 8; // Additional context
}
This rich structure enables:
- Intelligent routing based on task type and agent capabilities
- Priority-based scheduling to ensure urgent tasks are handled first
- Deadline awareness for time-sensitive operations
- Context preservation for better decision-making
3. Explicit Progress Tracking
Long-running tasks benefit from explicit progress reporting:
message TaskProgress {
string task_id = 1; // Which task this refers to
TaskStatus status = 2; // Current status
string progress_message = 3; // Human-readable description
int32 progress_percentage = 4; // Quantitative progress (0-100)
google.protobuf.Struct progress_data = 5; // Structured progress information
}
This enables:
- Visibility into system operations for monitoring and debugging
- User experience improvements with real-time progress indicators
- Resource planning by understanding how long operations typically take
- Early failure detection when progress stalls unexpectedly
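Publishing such an update might look like the sketch below; the PublishTaskProgress RPC name and the IN_PROGRESS status value are assumptions patterned on the publish calls shown elsewhere in this document:

progress := &pb.TaskProgress{
	TaskId:             task.GetTaskId(),
	Status:             pb.TaskStatus_TASK_STATUS_IN_PROGRESS, // assumed enum value
	ProgressMessage:    "transformed 5,000 of 10,000 rows",
	ProgressPercentage: 50,
}
client.PublishTaskProgress(ctx, &pb.PublishTaskProgressRequest{Progress: progress}) // assumed RPC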
4. Flexible Agent Addressing
The protocol supports multiple addressing patterns:
- Direct addressing: Tasks sent to specific agents by ID
- Broadcast addressing: Tasks sent to all capable agents
- Capability-based routing: Tasks routed based on agent capabilities
- Load-balanced routing: Tasks distributed among agents with similar capabilities
This flexibility enables different architectural patterns within the same system.
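In AgentHub terms, the difference between direct and broadcast addressing comes down to whether ToAgentId is set in the routing metadata, as the broadcast example earlier showed:

// Direct: route the event to one named agent.
direct := &pb.AgentEventMetadata{
	FromAgentId: "orchestrator",
	ToAgentId:   "agent_translator",
	EventType:   "task.direct",
}

// Broadcast: an empty ToAgentId fans the event out to all subscribers.
broadcast := &pb.AgentEventMetadata{
	FromAgentId: "orchestrator",
	ToAgentId:   "", // empty = broadcast
	EventType:   "task.broadcast",
}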
Architectural Patterns
Microservices Enhancement
In a microservices architecture, Agent2Agent can enhance service communication by:
- Replacing synchronous HTTP calls with asynchronous task delegation
- Adding progress visibility to long-running service operations
- Enabling service composition through task chaining
- Improving resilience through task retry and timeout mechanisms
Event-Driven Architecture Integration
Agent2Agent complements event-driven architectures by:
- Adding structure to event processing with explicit task semantics
- Enabling bidirectional communication where events can trigger tasks that produce responses
- Providing progress tracking for complex event processing workflows
- Supporting task-based coordination alongside pure event broadcasting
Workflow Orchestration
Complex business processes can be modeled as Agent2Agent workflows:
- Process Initiation: A workflow agent receives a high-level business request
- Task Decomposition: The request is broken down into specific tasks
- Agent Coordination: Tasks are distributed to specialized agents
- Progress Aggregation: Individual task progress is combined into overall workflow status
- Result Assembly: Task results are combined into a final business outcome
Benefits and Trade-offs
Benefits
Scalability: Asynchronous operation and agent autonomy enable horizontal scaling without central bottlenecks.
Resilience: Agent failures don’t cascade as easily since tasks can be retried or redistributed.
Flexibility: New agent types can be added without modifying existing agents.
Observability: Rich task semantics and progress reporting provide excellent visibility into system operations.
Modularity: Agents can be developed, deployed, and scaled independently.
Trade-offs
Complexity: The system requires more sophisticated error handling and state management compared to simple request-response patterns.
Latency: For simple operations, the overhead of task creation and routing may add latency compared to direct calls.
Debugging: Distributed, asynchronous operations can be more challenging to debug than synchronous call chains.
Consistency: Managing data consistency across asynchronous agent operations requires careful design.
When to Use Agent2Agent
Agent2Agent is particularly well-suited for:
Complex Processing Pipelines
When work involves multiple steps that can be performed by different specialized agents:
- Data ingestion β validation β transformation β analysis β reporting
- Image upload β virus scan β thumbnail generation β metadata extraction
- Order processing β inventory check β payment processing β fulfillment
Long-Running Operations
When operations take significant time and users need progress feedback:
- Large file processing
- Machine learning model training
- Complex data analysis
- Batch job processing
Dynamic Load Distribution
When workload characteristics vary and different agents may be better suited for different tasks:
- Multi-tenant systems with varying customer requirements
- Resource-intensive operations that need specialized hardware
- Geographic distribution where local processing is preferred
System Integration
When connecting heterogeneous systems that need to coordinate:
- Third-party service coordination
- Cross-platform workflows
Comparison with Other Patterns
vs. Message Queues
Traditional message queues provide asynchronous communication but lack:
- Rich task semantics
- Progress tracking
- Bidirectional result delivery
- Priority and deadline awareness
vs. RPC/HTTP APIs
RPC and HTTP APIs provide structured communication but are typically:
- Synchronous (blocking)
- Lacking progress visibility
- Point-to-point rather than flexible routing
- Without built-in retry and timeout semantics
vs. Event Sourcing
Event sourcing provides audit trails and state reconstruction but:
- Focuses on state changes rather than work coordination
- Lacks explicit progress tracking
- Doesn’t provide direct task completion feedback
- Requires more complex query patterns for current state
The SubAgent Library: Simplifying Agent Development
While the Agent2Agent protocol and AgentHub broker provide powerful capabilities for building distributed agent systems, implementing agents from scratch requires significant boilerplate code. The SubAgent library addresses this by providing a high-level abstraction that handles infrastructure concerns, letting developers focus on business logic.
The Problem: Too Much Boilerplate
Traditional agent implementation requires:
- ~200+ lines of setup code: gRPC client configuration, connection management, health checks
- A2A protocol compliance: Correct AgentCard structure with all required fields
- Subscription management: Setting up task streams and handling lifecycle
- Observability integration: Manual tracing span creation, logging, metrics
- Error handling: Graceful shutdown, signal handling, resource cleanup
This creates several issues:
- High barrier to entry: New agents require deep knowledge of the infrastructure
- Code duplication: Every agent reimplements the same patterns
- Maintenance burden: Infrastructure changes require updates across all agents
- Inconsistent quality: Some agents may have better observability or error handling than others
The Solution: Infrastructure as a Library
The SubAgent library encapsulates all infrastructure concerns into a simple, composable API:
// 1. Configure your agent
config := &subagent.Config{
AgentID: "my_agent",
Name: "My Agent",
Description: "Does something useful",
}
// 2. Create and register skills
agent, _ := subagent.New(config)
agent.MustAddSkill("Skill Name", "Description", handlerFunc)
// 3. Run (everything else is automatic)
agent.Run(ctx)
This reduces agent implementation from ~200 lines to ~50 lines (75% reduction), letting developers focus entirely on their domain logic.
Architecture
The SubAgent library implements a layered architecture:
┌─────────────────────────────────────────────┐
│            Your Business Logic              │
│       (Handler Functions: ~30 lines)        │
├─────────────────────────────────────────────┤
│             SubAgent Library                │
│   - Config & Validation                     │
│   - AgentCard Creation (A2A compliant)      │
│   - Task Subscription & Routing             │
│   - Automatic Observability                 │
│   - Lifecycle Management                    │
├─────────────────────────────────────────────┤
│          AgentHub Client Library            │
│   - gRPC Connection                         │
│   - Message Publishing/Subscription         │
│   - TraceManager, Metrics, Logging          │
├─────────────────────────────────────────────┤
│             AgentHub Broker                 │
│   - Event Routing                           │
│   - Agent Registry                          │
│   - Task Distribution                       │
└─────────────────────────────────────────────┘
Key Features
1. Declarative Configuration
Instead of imperative setup code, agents use declarative configuration:
config := &subagent.Config{
AgentID: "agent_translator", // Required
Name: "Translation Agent", // Required
Description: "Translates text", // Required
Version: "1.0.0", // Optional, defaults
HealthPort: "8087", // Optional, defaults
}
The library:
- Validates all required fields
- Applies sensible defaults for optional fields
- Returns clear error messages for configuration issues
2. Skill-Based Programming Model
Agents define capabilities as “skills” - discrete units of functionality:
agent.MustAddSkill(
"Language Translation", // Name (shown to LLM)
"Translates text between languages", // Description
translateHandler, // Implementation
)
Each skill maps to a handler function with a clear signature:
func (ctx, task, message) -> (artifact, state, errorMessage)
This model:
- Encourages single-responsibility design
- Makes capabilities explicit and discoverable
- Simplifies testing (handlers are pure functions)
- Enables skill-based task routing
3. Automatic A2A Compliance
The library generates complete, A2A-compliant AgentCards:
// Developer writes:
agent.MustAddSkill("Translate", "Translates text", handler)
// Library generates:
&pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "agent_translator",
Description: "Translation Agent",
Version: "1.0.0",
Skills: []*pb.AgentSkill{
{
Id: "skill_0",
Name: "Translate",
Description: "Translates text",
Tags: []string{"Translate"},
InputModes: []string{"text/plain"},
OutputModes: []string{"text/plain"},
},
},
Capabilities: &pb.AgentCapabilities{
Streaming: false,
PushNotifications: false,
},
}
This ensures all agents follow protocol standards without manual effort.
4. Built-In Observability
Every task execution is automatically wrapped with observability:
Tracing:
// Automatic span creation for each task
taskSpan := traceManager.StartSpan(ctx, "agent.{agentID}.handle_task")
traceManager.AddA2ATaskAttributes(taskSpan, taskID, skillName, contextID, ...)
traceManager.SetSpanSuccess(taskSpan) // or RecordError()
Logging:
// Automatic structured logging
logger.InfoContext(ctx, "Processing task", "task_id", taskID, "skill", skillName)
logger.ErrorContext(ctx, "Task failed", "error", err)
Metrics:
- Task processing duration
- Success/failure counts
- Active task count
- (via AgentHubClient metrics)
Developers get full distributed tracing and logging without writing any observability code.
5. Lifecycle Management
The library handles the complete agent lifecycle:
Startup:
- Validate configuration
- Connect to broker (with retries)
- Register AgentCard
- Subscribe to tasks
- Start health check server
- Signal “ready”
Runtime:
- Receive tasks from broker
- Route to appropriate handler
- Execute with tracing/logging
- Publish results
- Handle errors gracefully
Shutdown:
- Catch SIGINT/SIGTERM signals
- Stop accepting new tasks
- Wait for in-flight tasks (with timeout)
- Close broker connection
- Cleanup resources
- Exit cleanly
All automatically - developers never write lifecycle code.
Design Patterns
The Handler Pattern
Handlers are pure functions that transform inputs to outputs:
func myHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
// Extract input
input := extractInput(message)
// Validate
if err := validate(input); err != nil {
return nil, TASK_STATE_FAILED, err.Error()
}
// Process
result := process(ctx, input)
// Create artifact
artifact := createArtifact(result)
return artifact, TASK_STATE_COMPLETED, ""
}
This pattern:
- Testable: Pure functions are easy to unit test
- Composable: Handlers can call other functions
- Error handling: Explicit return of state and error message
- Context-aware: Receives context for cancellation and tracing
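Because a handler is a plain function, it can be unit tested without a broker or any gRPC setup. A sketch, assuming the message field shapes follow the A2A examples earlier in this document:

func TestMyHandler(t *testing.T) {
	msg := &pb.Message{
		Content: []*pb.Part{{Part: &pb.Part_Text{Text: "hello"}}},
	}
	artifact, state, errMsg := myHandler(context.Background(), &pb.Task{Id: "t1"}, msg)
	if state != pb.TaskState_TASK_STATE_COMPLETED {
		t.Fatalf("unexpected state %v: %s", state, errMsg)
	}
	if artifact == nil {
		t.Fatal("expected an artifact")
	}
}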
The Configuration Pattern
Configuration is separated from code:
// Development
config := &subagent.Config{
AgentID: "my_agent",
HealthPort: "8080",
}
// Production (from environment)
config := &subagent.Config{
AgentID: os.Getenv("AGENT_ID"),
BrokerAddr: os.Getenv("BROKER_ADDR"),
HealthPort: os.Getenv("HEALTH_PORT"),
}
This enables:
- Different configs for dev/staging/prod
- Easy testing with mock configs
- Container-friendly (12-factor app)
Benefits
For Developers:
- Faster development: 75% less code to write
- Lower complexity: Focus on business logic, not infrastructure
- Better quality: Automatic best practices (observability, error handling)
- Easier testing: Handler functions are pure and testable
- Clearer structure: Skill-based organization is intuitive
For Operations:
- Consistent observability: All agents have same tracing/logging
- Standard health checks: Uniform health endpoints
- Predictable behavior: Lifecycle management is consistent
- Easy monitoring: Metrics are built-in
- Reliable shutdown: Graceful handling is automatic
For the System:
- Better integration: All agents follow same patterns
- Easier debugging: Consistent trace structure across agents
- Simplified maintenance: Library updates improve all agents
- Reduced errors: Less custom code means fewer bugs
Evolution Path
The SubAgent library provides a clear evolution path for agent development:
Phase 1: Simple Agents (Current)
- Single skills, synchronous processing
- Text input/output
- Uses library defaults
Phase 2: Advanced Agents
- Multiple skills per agent
- Streaming responses
- Custom capabilities
- Extended AgentCard fields
Phase 3: Specialized Agents
- Custom observability (additional traces/metrics)
- Advanced error handling
- Multi-modal input/output
- Stateful processing
The library supports all phases through its extensibility points (GetClient(), GetLogger(), custom configs).
Comparison with Manual Implementation
| Aspect | Manual Implementation | SubAgent Library |
|---|---|---|
| Lines of Code | ~200 lines setup | ~50 lines total |
| Configuration | 50+ lines imperative | 10 lines declarative |
| AgentCard | Manual struct creation | Automatic generation |
| Observability | Manual span/log calls | Automatic wrapping |
| Lifecycle | Custom signal handling | Built-in management |
| Error Handling | Scattered throughout | Centralized in library |
| Testing | Must mock infrastructure | Test handlers directly |
| Maintenance | Per-agent updates needed | Library update benefits all |
| Learning Curve | High (need infrastructure knowledge) | Low (focus on logic) |
| Time to First Agent | Several hours | Under 30 minutes |
Real-World Impact
The Echo Agent demonstrates the library’s impact:
Before SubAgent Library (211 lines):
- Manual client setup: 45 lines
- AgentCard creation: 30 lines
- Task subscription: 60 lines
- Handler implementation: 50 lines
- Lifecycle management: 26 lines
With SubAgent Library (82 lines):
- Configuration: 10 lines
- Skill registration: 5 lines
- Handler implementation: 50 lines
- Run: 2 lines
- Everything else: automatic
The business logic (50 lines) stays the same, but infrastructure code (161 lines) is eliminated.
When to Use SubAgent Library
Use SubAgent Library when:
- Building new agents from scratch
- Agent has 1-10 skills with clear boundaries
- Standard A2A protocol is sufficient
- You want consistent observability across agents
- Quick development time is important
Consider Manual Implementation when:
- Highly custom protocol requirements
- Need very specific lifecycle control
- Existing agent migration (may not be worth refactoring)
- Experimental/research agents with non-standard patterns
For 99% of agent development, the SubAgent library is the right choice.
Future Evolution
The Agent2Agent principle opens possibilities for:
Intelligent Agent Networks
Agents that learn about each other’s capabilities and performance characteristics to make better delegation decisions.
Self-Organizing Systems
Agent networks that automatically reconfigure based on workload patterns and agent availability.
Cross-Organization Collaboration
Extending Agent2Agent protocols across organizational boundaries for B2B workflow automation.
AI Agent Integration
Natural integration points for AI agents that can understand task semantics and make autonomous decisions about task acceptance and delegation.
The Agent2Agent principle represents a foundational shift toward more intelligent, autonomous, and collaborative software systems that can handle the complexity of modern distributed applications while providing the visibility and control that operators need.
3 - Features
Deep explanations of AgentHub’s key features and capabilities
Feature Explanations
Detailed explanations of AgentHub’s advanced features, their design rationale, and implementation details.
Available Documentation
3.1 - Distributed Tracing & OpenTelemetry
Deep dive into distributed tracing concepts, OpenTelemetry architecture, and how AgentHub implements comprehensive observability for event-driven systems.
Distributed Tracing & OpenTelemetry
Understanding-oriented: Deep dive into distributed tracing concepts, OpenTelemetry architecture, and how AgentHub implements comprehensive observability for event-driven systems.
The Problem: Observing Distributed Systems
Traditional monolithic applications are relatively easy to debug: everything happens in one process, on one machine, with one log file. But modern event-driven architectures like AgentHub present unique challenges:
The Complexity of Event-Driven Systems
Request Flow in AgentHub:
User → Publisher Agent → AgentHub Broker → Subscriber Agent → Result → Publisher Agent
Each step involves:
- Different processes (potentially on different machines)
- Asynchronous communication (events, not direct calls)
- Multiple protocol layers (gRPC, HTTP, network)
- Independent failure modes (network partitions, service crashes)
- Varying performance characteristics (CPU, memory, I/O)
Traditional Debugging Challenges
Without distributed tracing:
Publisher logs: "Published task task_123 at 10:00:01"
Broker logs: "Received task from agent_pub at 10:00:01"
"Routed task to agent_sub at 10:00:01"
Subscriber logs: "Processing task task_456 at 10:00:02"
"Completed task task_789 at 10:00:03"
Questions you can’t answer:
- Which subscriber processed task_123?
- How long did task_123 take end-to-end?
- Where did task_123 fail?
- What was the complete flow for a specific request?
The Solution: Distributed Tracing
Distributed tracing solves these problems by creating a unified view of requests as they flow through multiple services.
Core Concepts
Trace
A trace represents a complete request journey through the system. In AgentHub, a trace might represent:
- Publishing a task
- Processing the task
- Publishing the result
- Receiving the result
Trace ID: a1b2c3d4e5f67890
Duration: 150ms
Services: 3 (publisher, broker, subscriber)
Spans: 5
Status: Success
Span
A span represents a single operation within a trace. Each span has:
- Name: What operation it represents
- Start/End time: When it happened
- Tags: Metadata about the operation
- Logs: Events that occurred during the operation
- Status: Success, error, or timeout
Span: "publish_event"
Service: agenthub-publisher
Duration: 25ms
Tags:
event.type: "greeting"
event.id: "task_123"
responder.agent: "agent_demo_subscriber"
Status: OK
Span Context
The glue that connects spans across service boundaries. Contains:
- Trace ID: Unique identifier for the entire request
- Span ID: Unique identifier for the current operation
- Trace Flags: Sampling decisions, debug mode, etc.
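All three fields are directly accessible in code through the standard OpenTelemetry Go API, which is useful for logging and debugging:

// Inspect the current span context wherever a context.Context is available.
sc := trace.SpanFromContext(ctx).SpanContext()
fmt.Println("trace:", sc.TraceID(), "span:", sc.SpanID(), "sampled:", sc.IsSampled())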
How Tracing Works in AgentHub
1. Trace Initiation
When a publisher creates a task, it starts a new trace:
// Publisher starts a trace
ctx, span := tracer.Start(ctx, "publish_event")
defer span.End()
// Add metadata
span.SetAttributes(
attribute.String("event.type", "greeting"),
attribute.String("event.id", taskID),
)
2. Context Propagation
The trace context is injected into the task metadata:
// Inject trace context into task headers
headers := make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))
// Embed headers in task metadata
task.Metadata = &structpb.Struct{
Fields: map[string]*structpb.Value{
"trace_headers": structpb.NewStructValue(&structpb.Struct{
Fields: stringMapToStructFields(headers),
}),
},
}
3. Context Extraction
The broker and subscriber extract the trace context:
// Extract trace context from task metadata
if metadata := task.GetMetadata(); metadata != nil {
if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
headers := structFieldsToStringMap(traceHeaders.GetStructValue().Fields)
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(headers))
}
}
// Continue the trace
ctx, span := tracer.Start(ctx, "process_event")
defer span.End()
4. Complete Request Flow
The result is a complete trace showing the entire request journey:
Trace: a1b2c3d4e5f67890
├── publish_event (agenthub-publisher) [25ms]
│   ├── event.type: greeting
│   └── event.id: task_123
├── route_task (agenthub-broker) [2ms]
│   ├── source.agent: agent_demo_publisher
│   └── target.agent: agent_demo_subscriber
├── consume_event (agenthub-subscriber) [5ms]
│   └── messaging.operation: receive
├── process_task (agenthub-subscriber) [98ms]
│   ├── task.type: greeting
│   ├── task.parameter.name: Claude
│   └── processing.status: completed
└── publish_result (agenthub-subscriber) [20ms]
    └── result.status: success
OpenTelemetry Architecture
OpenTelemetry is the observability framework that powers AgentHub’s tracing implementation.
The OpenTelemetry Stack
┌───────────────────────────────────────────────────────────┐
│                       Applications                        │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│   │  Publisher  │  │   Broker    │  │ Subscriber  │       │
│   └─────────────┘  └─────────────┘  └─────────────┘       │
└─────────────────────────────┬─────────────────────────────┘
                              │
┌─────────────────────────────┴─────────────────────────────┐
│                     OpenTelemetry SDK                     │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│   │   Tracer    │  │    Meter    │  │   Logger    │       │
│   └─────────────┘  └─────────────┘  └─────────────┘       │
└─────────────────────────────┬─────────────────────────────┘
                              │
┌─────────────────────────────┴─────────────────────────────┐
│                  OpenTelemetry Collector                  │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│   │  Receivers  │  │ Processors  │  │  Exporters  │       │
│   └─────────────┘  └─────────────┘  └─────────────┘       │
└─────────┬───────────────────┬───────────────────┬─────────┘
          │                   │                   │
   ┌──────┴──────┐     ┌──────┴──────┐     ┌──────┴─────┐
   │   Jaeger    │     │ Prometheus  │     │    Logs    │
   │  (Tracing)  │     │  (Metrics)  │     │  (Logging) │
   └─────────────┘     └─────────────┘     └────────────┘
Core Components
Tracer
Creates and manages spans:
tracer := otel.Tracer("agenthub-publisher")
ctx, span := tracer.Start(ctx, "publish_event")
defer span.End()
Meter
Creates and manages metrics:
meter := otel.Meter("agenthub-publisher")
counter, _ := meter.Int64Counter("events_published_total")
counter.Add(ctx, 1)
Propagators
Handle context propagation across service boundaries:
// Inject context
otel.GetTextMapPropagator().Inject(ctx, carrier)
// Extract context
ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
Exporters
Send telemetry data to backend systems:
- OTLP Exporter: Sends to OpenTelemetry Collector
- Jaeger Exporter: Sends directly to Jaeger
- Prometheus Exporter: Exposes metrics for Prometheus
AgentHub’s OpenTelemetry Implementation
Configuration
func NewObservability(config Config) (*Observability, error) {
// Create resource (service identification)
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName(config.ServiceName),
semconv.ServiceVersion(config.ServiceVersion),
),
)
// Setup tracing
traceExporter, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithEndpoint(config.JaegerEndpoint),
otlptracegrpc.WithInsecure(),
)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(traceExporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otel.SetTracerProvider(tracerProvider)
// Setup metrics (promExporter from go.opentelemetry.io/otel/exporters/prometheus)
promExporter, err := prometheus.New()
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithResource(res),
sdkmetric.WithReader(promExporter),
)
otel.SetMeterProvider(meterProvider)
}
Custom slog Handler Integration
AgentHub’s custom logging handler automatically correlates logs with traces:
func (h *ObservabilityHandler) Handle(ctx context.Context, r slog.Record) error {
// Extract trace context
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
spanCtx := span.SpanContext()
attrs = append(attrs,
slog.String("trace_id", spanCtx.TraceID().String()),
slog.String("span_id", spanCtx.SpanID().String()),
)
}
// Structured log output with trace correlation
logData := map[string]interface{}{
"time": r.Time.Format(time.RFC3339),
"level": r.Level.String(),
"msg": r.Message,
"trace_id": spanCtx.TraceID().String(),
"span_id": spanCtx.SpanID().String(),
"service": h.serviceName,
}
}
Observability Patterns in Event-Driven Systems
Pattern 1: Event Correlation
Challenge: Correlating events across async boundaries
Solution: Inject trace context into event metadata
// Publisher injects context
headers := make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))
event.Metadata["trace_headers"] = headers
// Consumer extracts context
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(event.Metadata["trace_headers"]))
Pattern 2: Async Operation Tracking
Challenge: Tracking operations that complete asynchronously
Solution: Create child spans that can outlive their parents
// Start async operation
ctx, span := tracer.Start(ctx, "async_operation")
go func() {
defer span.End()
// Long-running async work
processTask()
span.SetStatus(codes.Ok, "") // Success
}()
// Parent can continue/return immediately
Pattern 3: Error Propagation
Challenge: Understanding how errors flow through the system
Solution: Record errors at each span and propagate error status
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) // Error status
// Optionally add error details
span.SetAttributes(
attribute.String("error.type", "validation_error"),
attribute.String("error.message", err.Error()),
)
}
Pattern 4: Performance Analysis
Challenge: Understanding where time is spent in complex flows
Solution: Detailed span hierarchy with timing
// High-level operation
ctx, span := tracer.Start(ctx, "process_task")
defer span.End()
// Sub-operations
ctx, validateSpan := tracer.Start(ctx, "validate_input")
// ... validation logic
validateSpan.End()
ctx, computeSpan := tracer.Start(ctx, "compute_result")
// ... computation logic
computeSpan.End()
ctx, persistSpan := tracer.Start(ctx, "persist_result")
// ... persistence logic
persistSpan.End()
Benefits of AgentHub’s Observability Implementation
1. Complete Request Visibility
- See every step of event processing
- Understand inter-service dependencies
- Track request flows across async boundaries
2. Performance Analysis
- Identify bottlenecks in event processing
- Understand where time is spent
- Optimize critical paths
3. Error Diagnosis
- Pinpoint exactly where failures occur
- Understand error propagation patterns
- Correlate errors with system state
4. Capacity Planning
- Understand system throughput characteristics
- Identify scaling bottlenecks
- Plan resource allocation
5. Troubleshooting
- Correlate logs, metrics, and traces
- Understand system behavior under load
- Debug complex distributed issues
Advanced Tracing Concepts
Sampling
Not every request needs to be traced. Sampling reduces overhead:
// Probability sampling (trace 10% of requests)
sdktrace.WithSampler(sdktrace.ParentBased(
sdktrace.TraceIDRatioBased(0.1),
))
// Rate limiting sampling (max 100 traces/second)
sdktrace.WithSampler(sdktrace.ParentBased(
sdktrace.RateLimited(100),
))
Custom Attributes
Add business context to spans:
span.SetAttributes(
attribute.String("user.id", userID),
attribute.String("tenant.id", tenantID),
attribute.Int("batch.size", len(items)),
attribute.String("workflow.type", workflowType),
)
Span Events
Add timestamped events within spans:
span.AddEvent("validation.started")
// ... validation logic
span.AddEvent("validation.completed", trace.WithAttributes(
attribute.Int("validation.rules.evaluated", ruleCount),
))
Baggage
Propagate key-value pairs across the entire trace:
// Set baggage
ctx = baggage.ContextWithValues(ctx,
baggage.String("user.tier", "premium"),
baggage.String("feature.flag", "new_algorithm"),
)
// Read baggage in any service
if member := baggage.FromContext(ctx).Member("user.tier"); member.Value() == "premium" {
// Use premium algorithm
}
Overhead Analysis
AgentHub’s observability adds:
- CPU: ~5% overhead for tracing
- Memory: ~50MB per service for buffers and metadata
- Network: Minimal (async batched export)
- Latency: ~10ms additional end-to-end latency
Optimization Strategies
- Sampling: Reduce trace volume for high-throughput systems
- Batching: Export spans in batches to reduce network overhead
- Async Processing: Never block business logic for observability
- Resource Limits: Use memory limiters in the collector
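Batching, for example, is tuned on the span processor. A sketch using the OTel Go SDK's batcher options (the values are illustrative, not recommendations):

tp := sdktrace.NewTracerProvider(
	sdktrace.WithBatcher(traceExporter,
		sdktrace.WithMaxExportBatchSize(512),     // spans per export request
		sdktrace.WithBatchTimeout(5*time.Second), // maximum flush interval
		sdktrace.WithMaxQueueSize(2048),          // buffered spans before dropping
	),
)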
Production Recommendations
- Enable sampling for high-volume systems
- Monitor collector performance and scale horizontally if needed
- Set retention policies for traces and metrics
- Use dedicated infrastructure for observability stack
Troubleshooting Common Issues
Missing Traces
Symptoms: No traces appear in Jaeger
Causes:
- Context not propagated correctly
- Exporter configuration issues
- Collector connectivity problems
Debugging:
# Check if spans are being created
curl http://localhost:8080/metrics | grep trace
# Check collector logs
docker-compose logs otel-collector
# Verify Jaeger connectivity
curl http://localhost:16686/api/traces
Broken Trace Chains
Symptoms: Spans appear disconnected
Causes:
- Context not extracted properly
- New context created instead of continuing existing
Debugging:
// Always check if context contains active span
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
fmt.Printf("Active trace: %s\n", span.SpanContext().TraceID())
} else {
fmt.Println("No active trace context")
}
High Memory Usage
Symptoms: Observability causing OOM errors
Causes:
- Too many spans in memory
- Large span attributes
- Export failures causing backlog
Solutions:
// Configure memory limits
config := sdktrace.NewTracerProvider(
sdktrace.WithSpanLimits(sdktrace.SpanLimits{
AttributeCountLimit: 128,
EventCountLimit: 128,
LinkCountLimit: 128,
}),
)
The Future of Observability
Emerging Trends
- eBPF-based Observability: Automatic instrumentation without code changes
- AI-Powered Analysis: Automatic anomaly detection and root cause analysis
- Unified Observability: Single pane of glass for metrics, traces, logs, and profiles
- Real-time Alerting: Faster detection and response to issues
OpenTelemetry Roadmap
- Profiling: Continuous profiling integration
- Client-side Observability: Browser and mobile app tracing
- Database Instrumentation: Automatic query tracing
- Infrastructure Correlation: Link application traces to infrastructure metrics
Conclusion
Distributed tracing transforms debugging from guesswork into precise investigation. AgentHub’s implementation with OpenTelemetry provides:
- Complete visibility into event-driven workflows
- Performance insights for optimization
- Error correlation for faster resolution
- Business context through custom attributes
The investment in observability pays dividends in:
- Reduced MTTR (Mean Time To Resolution)
- Improved performance through data-driven optimization
- Better user experience through proactive monitoring
- Team productivity through better tooling
Ready to Implement?
Hands-on: Observability Demo Tutorial
Production: Add Observability to Your Agent
Deep Dive: Observability Architecture
3.2 - Observability Span Naming Convention
Standard naming convention for OpenTelemetry spans across all agents
Observability Span Naming Convention
Overview
To enable quick visual identification of which component is performing which operation in distributed traces, AgentHub follows a consistent span naming convention across all agents and services.
Naming Standard
All manually-created spans MUST follow this format:
{component}.{operation}
Where:
- component: The agent or service name (lowercase, underscores for multi-word)
- operation: The operation being performed (lowercase, underscores for multi-word)
Examples
| Component | Operation | Span Name |
|---|---|---|
| Broker | publish_event | broker.publish_event |
| Broker | route_event | broker.route_event |
| Broker | subscribe_messages | broker.subscribe_messages |
| Cortex | handle_message | cortex.handle_message |
| Cortex | llm_decide | cortex.llm_decide |
| Cortex | execute_actions | cortex.execute_actions |
| Cortex | send_chat_response | cortex.send_chat_response |
| Echo Agent | handle_request | echo_agent.handle_request |
| Echo Agent | publish_response | echo_agent.publish_response |
| Chat CLI | publish_message | chat_cli.publish_message |
| Chat CLI | display_response | chat_cli.display_response |
Component Names
Standard component names for AgentHub:
| Component | Span Prefix | Description |
|---|---|---|
| Event Bus Broker | broker. | Core message routing service |
| Cortex Orchestrator | cortex. | AI orchestration engine |
| Echo Agent | echo_agent. | Echo/repeat agent |
| Chat CLI | chat_cli. | Command-line chat interface |
| Publisher Agent | publisher. | Demo publisher |
| Subscriber Agent | subscriber. | Demo subscriber |
| Chat Responder | chat_responder. | Chat response agent |
For new agents, use the agent’s ID or a short descriptive name.
Implementation
Creating Spans
When creating spans, use the component name as prefix:
// Good: Clear component identification
ctx, span := traceManager.StartSpan(ctx, "cortex.handle_message")
defer span.End()
// Bad: Missing component prefix
ctx, span := traceManager.StartSpan(ctx, "handle_message") // ❌
defer span.End()
Component Attribute
In addition to the span name prefix, ALWAYS add the component attribute for filtering and querying:
ctx, span := traceManager.StartSpan(ctx, "cortex.handle_message")
defer span.End()
// Add component attribute
traceManager.AddComponentAttribute(span, "cortex")
This enables:
- Visual identification: Span name shows component in trace waterfall
- Query filtering: Component attribute enables filtering traces by component
A2A Message Spans
For spans specifically tracking A2A message handling, use the specialized method:
ctx, span := traceManager.StartA2AMessageSpan(
ctx,
"cortex.handle_message", // Note: includes component prefix
message.GetMessageId(),
message.GetRole().String(),
)
defer span.End()
// Add component attribute
traceManager.AddComponentAttribute(span, "cortex")
Operation Naming Guidelines
Use Action Verbs
Operations should describe what the component is doing:
- handle_message - Processing an incoming message
- publish_event - Publishing an event
- route_event - Routing an event to subscribers
- execute_actions - Executing a list of actions
- send_response - Sending a response
Be Specific
When possible, be specific about what kind of operation:
✅ cortex.llm_decide (specific: LLM decision making)
❌ cortex.decide (too generic)

✅ echo_agent.handle_echo_request (specific: echo request handling)
❌ echo_agent.handle (too generic)
Use Underscores
Separate words with underscores, not hyphens or camelCase:
- ✅ broker.publish_event
- ❌ broker.publish-event (hyphens)
- ❌ broker.publishEvent (camelCase)
Auto-Generated Spans
Some spans are auto-generated by instrumentation libraries (e.g., gRPC):
agenthub.AgentHub/PublishMessage
agenthub.AgentHub/SubscribeToMessages
agenthub.AgentHub/RegisterAgent
Why These Are Acceptable
These auto-generated gRPC spans are acceptable and should NOT be changed because:
- Standard Format: They follow gRPC’s OpenTelemetry standard naming convention: package.Service/Method
- Automatic Instrumentation: Generated automatically by gRPC’s built-in OpenTelemetry interceptors
- Breaking Changes: Renaming them would break standard gRPC tracing and compatibility with observability tools
- Clear Indication: The format clearly indicates these are RPC calls (the / separator is distinctive)
- Component Context: Parent spans provide the component context
Visual Example in Traces
In practice, you’ll see this pattern:
cortex.handle_message                                    ← Manual span (component prefix)
├── cortex.llm_decide                                    ← Manual span (component prefix)
└── cortex.execute_actions                               ← Manual span (component prefix)
    └── cortex.send_chat_response                        ← Manual span (component prefix)
        └── agenthub.AgentHub/PublishMessage             ← Auto-generated gRPC span (standard)
            └── broker.publish_event                     ← Manual span (component prefix)
                └── broker.route_event                   ← Manual span (component prefix)
                    └── echo_agent.handle_request        ← Manual span (component prefix)
                        └── echo_agent.publish_response  ← Manual span (component prefix)
                            └── agenthub.AgentHub/PublishMessage ← Auto-generated gRPC span (standard)
Key Point: The gRPC spans (agenthub.AgentHub/*) are nested within component-prefixed spans, so the component context is always clear from the parent span.
Best Practice: Create Parent Spans
For clarity, always create a parent span with your component prefix that wraps gRPC calls:
// Good: Parent span with component prefix
ctx, span := traceManager.StartSpan(ctx, "cortex.send_chat_response")
defer span.End()
traceManager.AddComponentAttribute(span, "cortex")
// Child span will be auto-created by gRPC instrumentation
_, err := client.Client.PublishMessage(ctx, request)
// Creates child span: agenthub.AgentHub/PublishMessage
// Result in trace:
// cortex.send_chat_response (your span)
// └── agenthub.AgentHub/PublishMessage (gRPC auto-span)
This pattern ensures:
- Component identification at the operation level
- Standard gRPC tracing compatibility
- Clear parent-child relationships
- No modification of auto-generated spans
Benefits
Visual Clarity
In trace visualizations, you can immediately identify components:
broker.publish_event
├── broker.route_event
│   ├── cortex.handle_message
│   │   ├── cortex.llm_decide
│   │   ├── cortex.execute_actions
│   │   └── cortex.send_chat_response
│   ├── echo_agent.handle_request
│   └── echo_agent.publish_response
└── broker.route_event
    └── chat_cli.display_response
Query & Filter
Filter traces by component using attributes:
# All cortex operations
component = "cortex"
# All message handling across components
span.name LIKE "%.handle_message"
# Echo agent operations only
component = "echo_agent"
Debugging
When debugging issues:
- Look at span name to identify which component failed
- No need to expand span details to find component
- Quickly trace request flow across components
Migration
Existing code should be updated to follow this convention:
Before
// Inconsistent naming
ctx, span := traceManager.StartSpan(ctx, "handle_message") // Who is handling?
ctx, span := traceManager.StartSpan(ctx, "cortex_chat_request") // Inconsistent separator
ctx, span := traceManager.StartSpan(ctx, "cli_publish_user_message") // Inconsistent prefix
After
// Consistent naming
ctx, span := traceManager.StartSpan(ctx, "cortex.handle_message")
ctx, span := traceManager.StartSpan(ctx, "cortex.chat_request")
ctx, span := traceManager.StartSpan(ctx, "chat_cli.publish_message")
Validation
To ensure compliance, span names should be validated:
- Code Review: Check span names follow the {component}.{operation} format (a test sketch follows this list)
- Testing: Verify the component attribute is set on all spans
- Trace Review: Inspect actual traces to confirm naming consistency
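One lightweight way to automate the name check is a unit test over the span names a component registers. The regexp and the sample names below are illustrative, not an existing AgentHub test:
package tracing

import (
	"regexp"
	"testing"
)

// spanNameRe encodes the {component}.{operation} convention:
// lowercase words separated by underscores, joined by a single dot.
var spanNameRe = regexp.MustCompile(`^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$`)

func TestSpanNamesFollowConvention(t *testing.T) {
	names := []string{
		"broker.publish_event",
		"cortex.handle_message",
		"echo_agent.publish_response",
	}
	for _, name := range names {
		if !spanNameRe.MatchString(name) {
			t.Errorf("span name %q does not match {component}.{operation}", name)
		}
	}
}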
3.3 - Architecture Evolution: From Build Tags to Unified Abstractions
Understanding AgentHub’s evolution from build tag-based conditional compilation to unified abstractions with built-in observability.
Understanding-oriented: Learn how AgentHub evolved from build tag-based conditional compilation to a unified abstraction approach that dramatically simplifies development while providing comprehensive observability.
AgentHub originally used Go build tags to handle different deployment scenarios:
- Development: Fast builds with minimal features (go build)
- Production: Full observability builds (go build -tags observability)
- Testing: Lightweight versions for testing environments
Problems with Build Tags:
- Maintenance overhead: Separate code paths for different builds
- Testing complexity: Hard to ensure feature parity across variants
- Developer experience: Multiple build commands and configurations
- Binary complexity: Different feature sets in different binaries
Modern Solution: Unified Abstractions
AgentHub now uses a unified abstraction layer (internal/agenthub/) that provides:
- Single codebase: No more separate files for different builds
- Built-in observability: Always available, configured via environment
- Simplified development: One build command, one binary
- Runtime configuration: Features controlled by environment variables
The New Architecture
Core Components
The unified abstraction provides these key components:
1. AgentHubServer
// Single server implementation with built-in observability
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
return err
}
// Automatic OpenTelemetry, metrics, health checks
err = server.Start(ctx)
2. AgentHubClient
// Single client implementation with built-in observability
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
return err
}
// Automatic tracing, metrics, structured logging
err = client.Start(ctx)
3. TaskPublisher & TaskSubscriber
// High-level abstractions with automatic correlation
publisher := &agenthub.TaskPublisher{
Client: client.Client,
TraceManager: client.TraceManager,
// Built-in observability
}
subscriber := agenthub.NewTaskSubscriber(client, agentID)
// Automatic task processing with tracing
Before vs After Comparison
Old Build Tag Approach
File Structure (Legacy):
agents/publisher/
├── main.go                 # Basic version (~200 lines)
├── main_observability.go   # Observable version (~380 lines)
├── shared.go               # Common code
└── config.go               # Configuration
broker/
├── main.go                 # Basic broker (~150 lines)
├── main_observability.go   # Observable broker (~300 lines)
└── server.go               # Core logic
Build Commands (Legacy):
# Basic build
go build -o bin/publisher agents/publisher/
# Observable build
go build -tags observability -o bin/publisher-obs agents/publisher/
# Testing observable features
go test -tags observability ./...
New Unified Approach
File Structure (Current):
agents/publisher/
└── main.go                 # Single implementation (~50 lines)
agents/subscriber/
└── main.go                 # Single implementation (~60 lines)
broker/
└── main.go                 # Single implementation (~30 lines)
internal/agenthub/          # Unified abstraction layer
├── grpc.go                 # Client/server with observability
├── subscriber.go           # Task processing abstractions
├── broker.go               # Event bus implementation
└── metadata.go             # Correlation and metadata
Build Commands (Current):
# Single build for all use cases
go build -o bin/publisher agents/publisher/
go build -o bin/subscriber agents/subscriber/
go build -o bin/broker broker/
# Testing (no special tags needed)
go test ./...
Configuration Evolution
Environment-Based Configuration
Instead of build tags, features are now controlled via environment variables:
# Observability configuration
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export OTEL_SERVICE_NAME="agenthub"
export OTEL_SERVICE_VERSION="1.0.0"
# Health and metrics ports
export BROKER_HEALTH_PORT="8080"
# Broker connection
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"
Automatic Feature Detection
The unified abstractions automatically configure features based on environment:
// Observability is automatically configured
config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)
// If JAEGER_ENDPOINT is set → tracing enabled
// If BROKER_HEALTH_PORT is set → health server enabled
// Always includes structured logging and basic metrics
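As a mental model, the constructor can be pictured as a thin layer over os.Getenv. The sketch below is an assumption about its shape; the struct fields and the envOr helper are illustrative, not the actual internal/agenthub code:
package agenthub

import "os"

// GRPCConfig (illustrative): settings derived entirely from the environment.
type GRPCConfig struct {
	Component      string
	BrokerAddr     string
	BrokerPort     string
	JaegerEndpoint string // empty → tracing stays disabled
	HealthPort     string // empty → health server stays disabled
}

func NewGRPCConfig(component string) *GRPCConfig {
	return &GRPCConfig{
		Component:      component,
		BrokerAddr:     envOr("AGENTHUB_BROKER_ADDR", "localhost"),
		BrokerPort:     envOr("AGENTHUB_BROKER_PORT", "50051"),
		JaegerEndpoint: os.Getenv("JAEGER_ENDPOINT"),
		HealthPort:     os.Getenv("BROKER_HEALTH_PORT"),
	}
}

// envOr returns the variable's value, or a default when unset.
func envOr(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}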
Benefits of the New Architecture
1. Developer Experience
- Single build command: No more tag confusion
- Consistent behavior: Same binary for all environments
- Easier testing: No need for multiple test runs
- Simplified CI/CD: One build pipeline
2. Maintenance Reduction
- ~92% less code: from 380+ lines to 29 lines for the broker
- Single code path: No more duplicate implementations
- Unified testing: Test once, works everywhere
- Automatic features: Observability included by default
3. Operational Benefits
- Runtime configuration: Change behavior without rebuilding
- Consistent deployment: Same binary across environments
- Better observability: Always available when needed
- Easier debugging: Full context always present
Migration Guide
For users migrating from the old build tag approach:
Old Commands β New Commands
# OLD: Basic builds
go build -o bin/publisher agents/publisher/
# NEW: Same command (unchanged)
go build -o bin/publisher agents/publisher/
# OLD: Observable builds
go build -tags observability -o bin/publisher-obs agents/publisher/
# NEW: Same binary, configure via environment
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
go build -o bin/publisher agents/publisher/
# OLD: Testing with tags
go test -tags observability ./...
# NEW: Standard testing
go test ./...
Configuration Migration
# OLD: Feature controlled by build tags
go build -tags observability
# NEW: Feature controlled by environment
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export OTEL_SERVICE_NAME="my-service"
Architecture Philosophy
From Compile-Time to Runtime
The move from build tags to unified abstractions represents a fundamental shift:
Build Tags Philosophy (Old):
- “Choose features at compile time”
- “Different binaries for different needs”
- “Minimize what’s included”
Unified Abstractions Philosophy (New):
- “Include everything, configure at runtime”
- “One binary, many configurations”
- “Maximize developer experience”
Why This Change?
- Cloud-Native Reality: Modern deployments use containers with environment-based config
- Developer Productivity: Unified approach eliminates confusion and errors
- Testing Simplicity: One code path means reliable testing
- Operational Excellence: Runtime configuration enables better operations
Resource Impact
The unified approach has minimal overhead:
Binary Size:
- Old basic: ~8MB
- Old observable: ~15MB
- New unified: ~12MB
Memory Usage:
- Baseline: ~10MB
- With observability: ~15MB (when enabled)
- Without observability: ~10MB (minimal overhead)
Startup Time:
- With observability enabled: ~150ms
- With observability disabled: ~50ms
Optimization Strategy
The abstractions use lazy initialization:
// Observability components only initialize if configured
if config.JaegerEndpoint != "" {
// Initialize tracing
}
if config.HealthPort != "" {
// Start health server
}
// Always minimal logging and basic metrics
Future Evolution
Planned Enhancements
- Plugin Architecture: Dynamic feature loading
- Configuration Profiles: Predefined environment sets
- Feature Flags: Runtime feature toggling
- Auto-Configuration: Intelligent environment detection
Compatibility Promise
The unified abstractions maintain backward compatibility:
- Old environment variables still work
- Gradual migration path available
- No breaking changes in core APIs
This architectural evolution demonstrates how AgentHub prioritizes developer experience and operational simplicity while maintaining full observability capabilities. The move from build tags to unified abstractions represents a maturation of the platform toward cloud-native best practices.
3.4 - Performance and Scaling Considerations
Explore the performance characteristics of AgentHub, scaling patterns, and optimization strategies for different deployment scenarios.
This document explores the performance characteristics of AgentHub, scaling patterns, and optimization strategies for different deployment scenarios.
Performance Benchmarks
Test Environment:
- 4-core Intel i7 processor
- 16GB RAM
- Local network (localhost)
- Go 1.24
Measured Performance:
- Task throughput: 8,000-12,000 tasks/second
- Task routing latency: 0.1-0.5ms average
- End-to-end latency: 2-10ms (including processing)
- Memory per agent: ~1KB active subscription state
- Concurrent agents: 1,000+ agents per broker instance
1. Task Routing Performance
Task routing is the core performance bottleneck in AgentHub:
// Fast path: Direct agent routing
if responderID := req.GetTask().GetResponderAgentId(); responderID != "" {
if subs, ok := s.taskSubscribers[responderID]; ok {
targetChannels = subs // O(1) lookup
}
}
Optimization factors:
- Direct routing: O(1) lookup time for targeted tasks
- Broadcast routing: O(n) where n = number of subscribed agents (sketched after this list)
- Channel delivery: Concurrent delivery via goroutines
- Lock contention: Read locks allow concurrent routing
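For contrast with the fast path above, the broadcast path might look like the following sketch. It reuses the field names from the snippet above; the per-delivery timeout is an illustrative safeguard, not the actual broker implementation:
// Broadcast path: no responder ID, so fan out to every subscriber.
// The read lock permits concurrent routing; each delivery runs in its
// own goroutine so one slow consumer cannot stall the others.
s.mu.RLock()
for _, subs := range s.taskSubscribers {
	for _, ch := range subs {
		go func(ch chan *pb.TaskMessage) {
			select {
			case ch <- task:
				// delivered
			case <-time.After(5 * time.Second):
				// slow consumer: drop (or log) instead of blocking the router
			}
		}(ch)
	}
}
s.mu.RUnlock()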
2. Message Serialization
Protocol Buffers provide efficient serialization:
- Binary encoding: ~60% smaller than JSON
- Zero-copy operations: Direct memory mapping where possible
- Schema evolution: Backward/forward compatibility
- Type safety: Compile-time validation
3. Memory Usage Patterns
// Memory usage breakdown per agent:
type agentMemoryFootprint struct {
SubscriptionState int // ~200 bytes (map entry + channel)
ChannelBuffer int // ~800 bytes (10 message buffer * 80 bytes avg)
ConnectionOverhead int // ~2KB (gRPC stream state)
// Total: ~3KB per active agent
}
Memory optimization strategies:
- Bounded channels: Prevent unbounded growth
- Connection pooling: Reuse gRPC connections
- Garbage collection: Go’s GC handles cleanup automatically
Scaling Patterns
Vertical Scaling (Scale Up)
Increasing resources on a single broker instance:
CPU Scaling
- Multi-core utilization: Go’s runtime leverages multiple cores
- Goroutine efficiency: Lightweight concurrency (2KB stack)
- CPU-bound operations: Message serialization, routing logic
# Configure for CPU optimization
export GOMAXPROCS=8  # Match available CPU cores
Memory Scaling
- Linear growth: Memory usage scales with number of agents
- Buffer tuning: Adjust channel buffer sizes based on throughput
// Memory-optimized configuration
subChan := make(chan *pb.TaskMessage, 5) // Smaller buffers for memory-constrained environments
// vs
subChan := make(chan *pb.TaskMessage, 50) // Larger buffers for high-throughput environments
Network Scaling
- Connection limits: OS file descriptor limits (ulimit -n)
- Bandwidth utilization: Protocol Buffers minimize bandwidth usage
- Connection keepalive: Efficient connection reuse (see the sketch below)
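Server-side keepalive tuning might look like this sketch; the values are reasonable starting points, not AgentHub defaults:
import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

server := grpc.NewServer(
	grpc.KeepaliveParams(keepalive.ServerParameters{
		Time:    30 * time.Second, // ping idle clients every 30s
		Timeout: 10 * time.Second, // close the connection if no ack within 10s
	}),
)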
Horizontal Scaling (Scale Out)
Distributing load across multiple broker instances:
1. Agent Partitioning
Static Partitioning:
Agent Groups:
├── Broker 1: agents_1-1000
├── Broker 2: agents_1001-2000
└── Broker 3: agents_2001-3000
Hash-based Partitioning:
func selectBroker(agentID string) string {
hash := fnv.New32a()
hash.Write([]byte(agentID))
brokerIndex := hash.Sum32() % uint32(len(brokers))
return brokers[brokerIndex]
}
2. Task Type Partitioning
Specialized Brokers:
Task Routing:
├── Broker 1: data_processing, analytics
├── Broker 2: image_processing, ml_inference
└── Broker 3: notifications, logging
3. Geographic Partitioning
Regional Distribution:
Geographic Deployment:
├── US-East: Broker cluster for East Coast agents
├── US-West: Broker cluster for West Coast agents
└── EU: Broker cluster for European agents
Load Balancing Strategies
1. Round-Robin Agent Distribution
type LoadBalancer struct {
brokers []string
current int
mu sync.Mutex
}
func (lb *LoadBalancer) NextBroker() string {
lb.mu.Lock()
defer lb.mu.Unlock()
broker := lb.brokers[lb.current]
lb.current = (lb.current + 1) % len(lb.brokers)
return broker
}
2. Capacity-Based Routing
type BrokerMetrics struct {
ActiveAgents int
TasksPerSec float64
CPUUsage float64
MemoryUsage float64
}
func selectBestBroker(brokers []BrokerMetrics) int {
// Select broker with lowest load score
bestIndex := 0
bestScore := calculateLoadScore(brokers[0])
for i, broker := range brokers[1:] {
score := calculateLoadScore(broker)
if score < bestScore {
bestScore = score
bestIndex = i + 1
}
}
return bestIndex
}
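The calculateLoadScore function is left undefined above. One plausible weighting, shown below, normalizes each metric and prefers brokers with spare capacity; the weights and normalization constants are illustrative and would be tuned per deployment:
// Lower score = less loaded. CPUUsage and MemoryUsage are assumed to be
// fractions in [0, 1]; throughput and agent counts are normalized against
// rough per-broker capacity figures.
func calculateLoadScore(m BrokerMetrics) float64 {
	return 0.4*m.CPUUsage +
		0.3*m.MemoryUsage +
		0.2*(m.TasksPerSec/10000) + // ~10k tasks/sec per broker
		0.1*(float64(m.ActiveAgents)/1000) // ~1k agents per broker
}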
Optimization Techniques
1. Message Batching
For high-throughput scenarios, implement message batching:
type BatchProcessor struct {
	mu        sync.Mutex // guards tasks against concurrent producers
	tasks     []*pb.TaskMessage
	batchSize int
	timeout   time.Duration
	ticker    *time.Ticker
}

func (bp *BatchProcessor) processBatch() {
	bp.mu.Lock()
	batch := make([]*pb.TaskMessage, len(bp.tasks))
	copy(batch, bp.tasks)
	bp.tasks = bp.tasks[:0] // Clear slice, keep capacity
	bp.mu.Unlock()

	// Process entire batch off the hot path
	go bp.routeBatch(batch)
}
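The surrounding machinery is implied rather than shown; a minimal version, assuming an Add entry point and a ticker-driven flush (with routeBatch as in the snippet above), could look like:
// Add accumulates a task and flushes as soon as a full batch is ready.
func (bp *BatchProcessor) Add(task *pb.TaskMessage) {
	bp.mu.Lock()
	bp.tasks = append(bp.tasks, task)
	full := len(bp.tasks) >= bp.batchSize
	bp.mu.Unlock()

	if full {
		bp.processBatch()
	}
}

// run flushes partial batches on the timeout interval.
func (bp *BatchProcessor) run() {
	for range bp.ticker.C {
		bp.processBatch()
	}
}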
2. Connection Pooling
Optimize gRPC connections for better resource utilization:
type ConnectionPool struct {
connections map[string]*grpc.ClientConn
maxConns int
mu sync.RWMutex
}
func (cp *ConnectionPool) GetConnection(addr string) (*grpc.ClientConn, error) {
cp.mu.RLock()
if conn, exists := cp.connections[addr]; exists {
cp.mu.RUnlock()
return conn, nil
}
cp.mu.RUnlock()
// Create new connection (a real implementation re-checks under the
// write lock to avoid duplicate dials)
return cp.createConnection(addr)
}
3. Adaptive Channel Sizing
Dynamically adjust channel buffer sizes based on load:
func calculateOptimalBufferSize(avgTaskRate float64, processingTime time.Duration) int {
// Buffer size = rate * processing time + safety margin
bufferSize := int(avgTaskRate * processingTime.Seconds()) + 10
// Clamp to reasonable bounds
if bufferSize < 5 {
return 5
}
if bufferSize > 100 {
return 100
}
return bufferSize
}
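As a worked example, an agent averaging 200 tasks/second with 50ms processing time gets 200 × 0.05 + 10 = 20 slots:
size := calculateOptimalBufferSize(200, 50*time.Millisecond) // → 20
subChan := make(chan *pb.TaskMessage, size)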
4. Memory Optimization
Reduce memory allocations in hot paths:
// Use sync.Pool for frequent allocations
var taskPool = sync.Pool{
	New: func() interface{} {
		return &pb.TaskMessage{}
	},
}

func processTaskOptimized(task *pb.TaskMessage) {
	// Reuse task objects from the pool
	pooledTask := taskPool.Get().(*pb.TaskMessage)
	defer taskPool.Put(pooledTask)

	// Copy with proto.Merge: assigning *pooledTask = *task would copy
	// the message's internal state and trip go vet's copylocks check
	proto.Reset(pooledTask)
	proto.Merge(pooledTask, task)
	// ... processing logic
}
Monitoring and Metrics
Throughput Metrics
type ThroughputMetrics struct {
TasksPerSecond float64
ResultsPerSecond float64
ProgressPerSecond float64
MessagesPerSecond float64
}
Latency Metrics
type LatencyMetrics struct {
RoutingLatency time.Duration // Broker routing time
ProcessingLatency time.Duration // Agent processing time
EndToEndLatency time.Duration // Total task completion time
P50, P95, P99 time.Duration // Percentile latencies
}
Resource Metrics
type ResourceMetrics struct {
ActiveAgents int
ActiveTasks int
MemoryUsage int64
CPUUsage float64
GoroutineCount int
OpenConnections int
}
Monitoring Implementation
import "github.com/prometheus/client_golang/prometheus"
var (
taskCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "agenthub_tasks_total",
Help: "Total number of tasks processed",
},
[]string{"task_type", "status"},
)
latencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "agenthub_task_duration_seconds",
Help: "Task processing duration",
Buckets: prometheus.DefBuckets,
},
[]string{"task_type"},
)
)
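Wiring these collectors in takes two steps: register them once at startup, then record an observation per task. The recordTask helper below is illustrative, not part of AgentHub:
func init() {
	// Register collectors once; MustRegister panics on duplicates
	prometheus.MustRegister(taskCounter, latencyHistogram)
}

// recordTask increments the counter and records latency for one task.
func recordTask(taskType, status string, elapsed time.Duration) {
	taskCounter.WithLabelValues(taskType, status).Inc()
	latencyHistogram.WithLabelValues(taskType).Observe(elapsed.Seconds())
}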
Scaling Recommendations
Small Deployments (1-100 agents)
- Single broker instance: Sufficient for most small deployments
- Vertical scaling: Add CPU/memory as needed
- Simple monitoring: Basic logging and health checks
Medium Deployments (100-1,000 agents)
- Load balancing: Implement agent distribution
- Resource monitoring: Track CPU, memory, and throughput
- Optimization: Tune channel buffer sizes and timeouts
Large Deployments (1,000+ agents)
- Horizontal scaling: Multiple broker instances
- Partitioning strategy: Implement agent or task type partitioning
- Advanced monitoring: Full metrics and alerting
- Performance testing: Regular load testing and optimization
High-Throughput Scenarios (10,000+ tasks/second)
- Message batching: Implement batch processing
- Connection optimization: Use connection pooling
- Hardware optimization: SSD storage, high-speed networking
- Profiling: Regular performance profiling and optimization
Common Performance Issues
1. High Latency
Symptoms: Slow task processing times
Causes: Network latency, overloaded agents, inefficient routing
Solutions: Optimize routing, add caching, scale horizontally
2. Memory Leaks
Symptoms: Increasing memory usage over time
Causes: Unclosed channels, goroutine leaks, connection leaks
Solutions: Proper cleanup, monitoring, garbage collection tuning
3. Connection Limits
Symptoms: New agents can’t connect
Causes: OS file descriptor limits, broker resource limits
Solutions: Increase limits, implement connection pooling
4. Message Loss
Symptoms: Tasks not reaching agents or results not returned
Causes: Timeout issues, network problems, buffer overflows
Solutions: Increase timeouts, improve error handling, adjust buffer sizes (one mitigation is sketched below)
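One such mitigation is to bound how long the router waits on a full subscriber channel instead of blocking indefinitely; the timeout and logging below are illustrative:
select {
case subChan <- task:
	// delivered
case <-time.After(2 * time.Second):
	// buffer full: surface the failure (retry queue, dead letter, metric)
	// rather than silently dropping the task
	log.Printf("task %s not delivered: subscriber buffer full", task.GetTaskId())
}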
Load Testing Script
func loadTest() {
// Create multiple publishers
publishers := make([]Publisher, 10)
for i := range publishers {
publishers[i] = NewPublisher(fmt.Sprintf("publisher_%d", i))
}
// Send tasks concurrently
taskRate := 1000 // tasks per second
duration := 60 * time.Second
ticker := time.NewTicker(time.Second / time.Duration(taskRate))
timeout := time.After(duration)
for {
select {
case <-ticker.C:
publisher := publishers[rand.Intn(len(publishers))]
go publisher.PublishTask(generateRandomTask())
case <-timeout:
return
}
}
}
The AgentHub architecture provides solid performance for most use cases and clear scaling paths for growing deployments. Regular monitoring and optimization ensure continued performance as your agent ecosystem evolves.
3.5 - The Unified Abstraction Library
The AgentHub Unified Abstraction Library dramatically simplifies the development of agents and brokers while providing built-in observability, environment-based configuration, and automatic correlation tracking.
The A2A-Compliant Unified Abstraction Library
Overview
The AgentHub Unified Abstraction Library (internal/agenthub/) is a comprehensive set of A2A protocol-compliant abstractions that dramatically simplifies the development of A2A agents and brokers while providing built-in observability, environment-based configuration, and automatic correlation tracking.
Key Benefits
Before and After Comparison
Before (Legacy approach):
- broker/main_observability.go: 380+ lines of boilerplate
- Manual OpenTelemetry setup in every component
- Duplicate configuration handling across components
- Manual correlation ID management
- Separate observability and non-observability variants
After (Unified abstractions):
- broker/main.go: 29 lines using abstractions
- Automatic OpenTelemetry integration
- Environment-based configuration
- Automatic correlation ID generation and propagation
- Single implementation with built-in observability
Core Components
1. gRPC Abstractions (grpc.go)
AgentHubServer
Provides a complete gRPC server abstraction with:
- Automatic OpenTelemetry instrumentation
- Environment-based configuration
- Built-in health checks
- Metrics collection
- Graceful shutdown
// Create and start a broker in one line
func StartBroker(ctx context.Context) error {
config := NewGRPCConfig("broker")
server, err := NewAgentHubServer(config)
if err != nil {
return err
}
return server.Start(ctx)
}
AgentHubClient
Provides a complete gRPC client abstraction with:
- Automatic connection management
- Built-in observability
- Environment-based server discovery
- Health monitoring
// Create a client with built-in observability
config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)
2. A2A Task Management Abstractions (a2a.go)
A2ATaskPublisher
Simplifies A2A task publishing with:
- Automatic A2A message generation
- Built-in observability tracing
- A2A context management
- Structured error handling
- A2A-compliant message formatting
a2aPublisher := &agenthub.A2ATaskPublisher{
Client: client.Client,
TraceManager: client.TraceManager,
MetricsManager: client.MetricsManager,
Logger: client.Logger,
ComponentName: "a2a_publisher",
}
// Create A2A task with structured message content
task := &a2a.Task{
Id: "task_greeting_" + uuid.New().String(),
ContextId: "conversation_123",
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
Role: a2a.Role_USER,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Please process greeting task",
},
},
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: greetingParams,
Description: "Greeting parameters",
},
},
},
},
},
Timestamp: timestamppb.Now(),
},
}
err := a2aPublisher.PublishA2ATask(ctx, task, &pb.AgentEventMetadata{
FromAgentId: "publisher_id",
ToAgentId: "subscriber_id",
EventType: "task.submitted",
Priority: pb.Priority_PRIORITY_MEDIUM,
})
A2ATaskProcessor
Provides full observability for A2A task processing:
- Automatic A2A trace propagation
- Rich A2A span annotations with context and message details
- A2A message processing metrics
- A2A conversation context tracking
- Error tracking with A2A-compliant error messages
3. A2A Subscriber Abstractions (a2a_subscriber.go)
A2ATaskSubscriber
Complete A2A subscriber implementation with:
- A2A-compliant task handler system
- Built-in A2A message processors
- Automatic A2A artifact publishing
- Full A2A observability integration
- A2A conversation context awareness
a2aSubscriber := agenthub.NewA2ATaskSubscriber(client, agentID)
a2aSubscriber.RegisterDefaultA2AHandlers()
// Custom A2A task handlers
a2aSubscriber.RegisterA2ATaskHandler("greeting", func(ctx context.Context, event *pb.AgentEvent) error {
task := event.GetTask()
if task == nil {
return fmt.Errorf("no task in event")
}
// Process A2A task content
requestMessage := task.Status.Update
response := a2aSubscriber.ProcessA2AMessage(ctx, requestMessage)
// Create completion artifact
artifact := &a2a.Artifact{
ArtifactId: "artifact_" + uuid.New().String(),
Name: "Greeting Response",
Description: "Processed greeting task result",
Parts: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: response,
},
},
},
}
// Complete task with artifact
return a2aSubscriber.CompleteA2ATaskWithArtifact(ctx, task, artifact)
})
go a2aSubscriber.SubscribeToA2ATasks(ctx)
go a2aSubscriber.SubscribeToA2AMessages(ctx)
4. A2A Broker Service (a2a_broker.go)
Complete A2A-compliant AgentHub service implementation that handles:
- A2A message routing and delivery
- A2A subscription management with context filtering
- A2A artifact distribution
- A2A task state management
- EDA+A2A hybrid routing
- Full A2A observability
// A2A broker service with unified abstractions
type A2ABrokerService struct {
// A2A-specific components
MessageRouter *A2AMessageRouter
TaskManager *A2ATaskManager
ContextManager *A2AContextManager
ArtifactManager *A2AArtifactManager
// EDA integration
EventBus *EDAEventBus
SubscriptionMgr *A2ASubscriptionManager
// Observability
TraceManager *TraceManager
MetricsManager *A2AMetricsManager
}
A2A Environment-Based Configuration
The library uses environment variables for zero-configuration A2A setup:
# Core AgentHub A2A Settings
export AGENTHUB_BROKER_ADDR=localhost
export AGENTHUB_BROKER_PORT=50051
# A2A Protocol Configuration
export AGENTHUB_A2A_PROTOCOL_VERSION=1.0
export AGENTHUB_MESSAGE_BUFFER_SIZE=100
export AGENTHUB_CONTEXT_TIMEOUT=30s
export AGENTHUB_ARTIFACT_MAX_SIZE=10MB
# Observability Endpoints
export JAEGER_ENDPOINT=127.0.0.1:4317
export OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4317
# A2A Health Check Ports
export AGENTHUB_HEALTH_PORT=8080
export A2A_PUBLISHER_HEALTH_PORT=8081
export A2A_SUBSCRIBER_HEALTH_PORT=8082
A2A Automatic Observability
A2A Distributed Tracing
- Automatic A2A instrumentation: OpenTelemetry gRPC interceptors handle A2A trace propagation
- A2A service naming: Unified “agenthub” service with A2A component differentiation
- Rich A2A annotations: Message content, conversation context, task state transitions, and artifact details
- A2A context tracking: Complete conversation thread visibility across multiple agents
A2A Metrics Collection
- A2A message metrics: Message processing rates, A2A error rates, latencies by message type
- A2A task metrics: Task completion rates, state transition times, artifact production metrics
- A2A context metrics: Conversation context tracking, multi-agent coordination patterns
- A2A system metrics: Health checks, A2A connection status, protocol version compatibility
- A2A component metrics: Per-agent A2A performance, broker routing efficiency
Health Monitoring
- Automatic endpoints: /health, /ready, /metrics
- Component tracking: Individual health per service
- Graceful shutdown: Proper cleanup and connection management
A2A Correlation and Context Tracking
Automatic A2A Correlation IDs
// A2A task ID generation
taskID := fmt.Sprintf("task_%s_%s", taskDescription, uuid.New().String())
// A2A message ID generation
messageID := fmt.Sprintf("msg_%d_%s", time.Now().Unix(), uuid.New().String())
// A2A context ID for conversation threading
contextID := fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())
A2A Context Propagation
- A2A conversation threading: Context IDs link related tasks across agents
- A2A message history: Complete audit trail of all messages in a conversation
- A2A workflow tracking: End-to-end visibility of multi-agent workflows
Trace Propagation
- W3C Trace Context: Standard distributed tracing headers
- Automatic propagation: gRPC interceptors handle context passing
- End-to-end visibility: Publisher → Broker → Subscriber traces
A2A Migration Guide
From Legacy EventBus to A2A Abstractions
Before (Legacy EventBus):
// 50+ lines of observability setup
obs, err := observability.New(ctx, observability.Config{...})
server := grpc.NewServer(grpc.UnaryInterceptor(...))
pb.RegisterEventBusServer(server, &eventBusService{...})
// Manual task message creation
task := &pb.TaskMessage{
TaskId: "task_123",
TaskType: "greeting",
// ... manual field population
}
After (A2A Abstractions):
// One line A2A broker startup
err := agenthub.StartA2ABroker(ctx)
// A2A task creation with abstractions
task := a2aPublisher.CreateA2ATask("greeting", greetingContent, "conversation_123")
err := a2aPublisher.PublishA2ATask(ctx, task, routingMetadata)
Best Practices
1. Use Environment Configuration
Let the library handle configuration automatically:
source .envrc # Load all environment variables
go run broker/main.go
2. Register Custom A2A Handlers
Extend functionality with custom A2A task handlers:
a2aSubscriber.RegisterA2ATaskHandler("my_task", myCustomA2AHandler)
// A2A handler signature with event and context
func myCustomA2AHandler(ctx context.Context, event *pb.AgentEvent) error {
task := event.GetTask()
// Process A2A message content
return a2aSubscriber.CompleteA2ATaskWithArtifact(ctx, task, resultArtifact)
}
3. Leverage Built-in Observability
The library provides comprehensive observability by default - no additional setup required.
4. Use A2A Structured Logging
The library provides structured loggers with A2A trace correlation:
// A2A-aware logging with context
client.Logger.InfoContext(ctx, "Processing A2A task",
"task_id", task.GetId(),
"context_id", task.GetContextId(),
"message_count", len(task.GetHistory()),
"current_state", task.GetStatus().GetState().String(),
)
A2A Architecture Benefits
Code Reduction with A2A Abstractions
- A2A Broker: 380+ lines → 29 lines (92% reduction)
- A2A Publisher: 150+ lines → 45 lines (70% reduction)
- A2A Subscriber: 200+ lines → 55 lines (72% reduction)
- A2A Message Handling: Complex manual parsing → automatic Part processing
- A2A Context Management: Manual tracking → automatic conversation threading
A2A Maintainability
- A2A protocol compliance: Centralized A2A message handling ensures protocol adherence
- Consistent A2A patterns: Same abstractions across all A2A components
- A2A-aware configuration: Environment variables tuned for A2A performance
- A2A context preservation: Automatic conversation context management
A2A Developer Experience
- Zero A2A boilerplate: Built-in A2A message parsing and artifact handling
- A2A-native architecture: Easy to extend with custom A2A message processors
- Automatic A2A setup: One-line A2A service creation with protocol compliance
- A2A debugging: Rich conversation context and message history for troubleshooting
A2A Future Extensibility
The A2A abstraction library is designed for A2A protocol extension:
- Custom A2A Part types: Easy to add new content types (text, data, files, custom)
- Custom A2A observability: Extend A2A metrics and conversation tracing
- A2A configuration: Override A2A protocol defaults with environment variables
- A2A transport options: Extend beyond gRPC while maintaining A2A compliance
- A2A protocol evolution: Built-in version compatibility and migration support
A2A Protocol Extension Points
// Custom A2A Part type
type CustomPart struct {
CustomData interface{} `json:"custom_data"`
Format string `json:"format"`
}
// Custom A2A artifact processor
type CustomArtifactProcessor struct {
SupportedTypes []string
ProcessFunc func(ctx context.Context, artifact *a2a.Artifact) error
}
// Custom A2A context manager
type CustomContextManager struct {
ContextRules map[string]ContextRule
RouteFunc func(contextId string, message *a2a.Message) []string
}
This A2A-compliant unified approach provides a solid foundation for building complex multi-agent systems with full Agent2Agent protocol support while maintaining simplicity, comprehensive observability, and rich conversation capabilities.