1 - Agent2Agent (A2A) Protocol Migration
Understanding the migration to Agent2Agent protocol compliance while maintaining Event-Driven Architecture benefits.
This document explains the migration of AgentHub to full Agent2Agent (A2A) protocol compliance while maintaining the essential Event-Driven Architecture (EDA) patterns that make the system scalable and resilient.
What is the Agent2Agent Protocol?
The Agent2Agent (A2A) protocol is a standardized specification for communication between AI agents. It defines:
- Standardized Message Formats: Using Message, Part, Task, and Artifact structures
- Task Lifecycle Management: Clear states (SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED)
- Agent Discovery: Using AgentCard for capability advertisement
- Interoperability: Ensuring agents can communicate across different platforms
Why Migrate to A2A?
Benefits of A2A Compliance
- Interoperability: AgentHub can now communicate with any A2A-compliant agent or system
- Standardization: Clear, well-defined message formats reduce integration complexity
- Ecosystem Compatibility: Join the growing ecosystem of A2A-compatible tools
- Future-Proofing: Built on industry standards rather than custom protocols
Maintained EDA Benefits
- Scalability: Event-driven routing scales to thousands of agents
- Resilience: Asynchronous communication handles network partitions gracefully
- Flexibility: Topic-based routing and priority queues enable sophisticated workflows
- Observability: Built-in tracing and metrics for production deployments
Hybrid Architecture
AgentHub implements a hybrid approach that combines the best of both worlds:
┌─────────────────────────────────────────────────────────────────┐
│ A2A Protocol Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐│
│ │ A2A Message │ │ A2A Task │ │ A2A Artifact│ │A2A Agent││
│ │ (standard) │ │ (standard) │ │ (standard) │ │ Card ││
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘│
├─────────────────────────────────────────────────────────────────┤
│ EDA Transport Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐│
│ │ AgentEvent │ │Event Router │ │ Subscribers │ │Priority ││
│ │ Wrapper │ │ │ │ Manager │ │ Queues ││
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘│
├─────────────────────────────────────────────────────────────────┤
│ gRPC Infrastructure │
└─────────────────────────────────────────────────────────────────┘
How It Works
- A2A Messages are created using standard A2A structures (Message, Task, etc.)
- EDA Wrapper wraps A2A messages in AgentEvent for transport
- Event Routing uses EDA patterns (pub/sub, priority, topics) for delivery
- A2A Compliance ensures messages follow A2A protocol semantics
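The wrapping and routing steps above can be sketched as follows. This is a minimal illustration, not AgentHub's implementation: `Message`, `AgentEvent`, and `Router` are simplified stand-ins for the generated protobuf types and the broker, and the `Wrap`, `Subscribe`, and `Route` names are hypothetical.

```go
package main

import "fmt"

// Message is a simplified stand-in for the A2A Message type (assumption).
type Message struct {
	MessageID string
	ContextID string
	Text      string
}

// AgentEvent is a simplified stand-in for the EDA envelope (assumption).
// The A2A payload is carried unchanged; only routing data is added.
type AgentEvent struct {
	EventID   string
	EventType string // topic used for pub/sub routing
	Payload   Message
}

// Router maps a topic to the agents subscribed to it.
type Router struct {
	subs map[string][]string
}

// NewRouter returns an empty router.
func NewRouter() *Router { return &Router{subs: make(map[string][]string)} }

// Subscribe registers an agent for a topic.
func (r *Router) Subscribe(topic, agentID string) {
	r.subs[topic] = append(r.subs[topic], agentID)
}

// Wrap places an A2A message inside an event envelope.
func Wrap(msg Message, topic string) AgentEvent {
	return AgentEvent{EventID: "evt_" + msg.MessageID, EventType: topic, Payload: msg}
}

// Route returns the agents that should receive the event.
func (r *Router) Route(ev AgentEvent) []string {
	return r.subs[ev.EventType]
}

func main() {
	router := NewRouter()
	router.Subscribe("task.submitted", "target_agent")

	msg := Message{MessageID: "msg_1", ContextID: "conversation_123", Text: "Hello"}
	ev := Wrap(msg, "task.submitted")
	fmt.Println(ev.EventID, router.Route(ev), ev.Payload.Text)
}
```

The point of the sketch is the separation of concerns: routing only inspects the envelope, so the A2A payload reaches the subscriber exactly as it was created.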
API Changes
Before (Legacy API)
// Legacy TaskMessage (deprecated)
taskPublisher.PublishTask(ctx, &agenthub.PublishTaskRequest{
TaskType: "greeting",
Parameters: map[string]interface{}{
"name": "Claude",
},
RequesterAgentID: "my_agent",
ResponderAgentID: "target_agent",
})
After (A2A-Compliant API)
// A2A-compliant task publishing
content := []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Hello! Please provide a greeting for Claude.",
},
},
}
task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
TaskType: "greeting",
Content: content,
RequesterAgentID: "my_agent",
ResponderAgentID: "target_agent",
Priority: pb.Priority_PRIORITY_MEDIUM,
ContextID: "conversation_123",
})
Message Structure Changes
message Message {
string message_id = 1; // Unique message identifier
string context_id = 2; // Conversation context
string task_id = 3; // Associated task (optional)
Role role = 4; // USER or AGENT
repeated Part content = 5; // Message content parts
google.protobuf.Struct metadata = 6; // Additional metadata
}
message Part {
oneof part {
string text = 1; // Text content
DataPart data = 2; // Structured data
FilePart file = 3; // File reference
}
}
message Task {
string id = 1; // Task identifier
string context_id = 2; // Conversation context
TaskStatus status = 3; // Current status
repeated Message history = 4; // Message history
repeated Artifact artifacts = 5; // Task outputs
google.protobuf.Struct metadata = 6; // Task metadata
}
enum TaskState {
TASK_STATE_SUBMITTED = 0; // Task created
TASK_STATE_WORKING = 1; // Task in progress
TASK_STATE_COMPLETED = 2; // Task completed successfully
TASK_STATE_FAILED = 3; // Task failed
TASK_STATE_CANCELLED = 4; // Task cancelled
}
Migration Guide
For Publishers
- Replace TaskPublisher with A2ATaskPublisher
- Use A2APublishTaskRequest with A2A Part structures
- Handle returned A2A Task objects
For Subscribers
- Replace TaskSubscriber with A2ATaskSubscriber
- Update handlers to process A2A Task and Message objects
- Return A2A Artifact objects instead of custom results
For Custom Integrations
- Update protobuf imports to use the events/a2a package
- Replace custom message structures with A2A equivalents
- Use the AgentHub service instead of EventBus
Backward Compatibility
The migration maintains wire-level compatibility through:
- Deprecated Types: Legacy message types marked as deprecated but still supported
- Automatic Conversion: EDA broker converts between legacy and A2A formats when needed
- Graceful Migration: Existing agents can migrate incrementally
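To make the automatic conversion idea concrete, here is a minimal sketch of mapping a legacy task request onto A2A content parts. The source does not describe the broker's actual conversion rules, so the mapping below (task type becomes a text part, parameters become a data part) is an assumption, and `LegacyTask`, `Part`, and `toA2AParts` are hypothetical stand-ins rather than AgentHub types.

```go
package main

import "fmt"

// LegacyTask is a simplified stand-in for the deprecated task request (assumption).
type LegacyTask struct {
	TaskType   string
	Parameters map[string]interface{}
}

// Part is a simplified stand-in for the A2A Part oneof: one field is set per part.
type Part struct {
	Text string
	Data map[string]interface{}
}

// toA2AParts converts a legacy task into A2A content parts.
// Assumed mapping: the task type becomes a text part and the
// parameters, when present, become a single data part.
func toA2AParts(t LegacyTask) []Part {
	parts := []Part{{Text: "Legacy task type: " + t.TaskType}}
	if len(t.Parameters) > 0 {
		parts = append(parts, Part{Data: t.Parameters})
	}
	return parts
}

func main() {
	legacy := LegacyTask{
		TaskType:   "greeting",
		Parameters: map[string]interface{}{"name": "Claude"},
	}
	for i, p := range toA2AParts(legacy) {
		fmt.Printf("part %d: text=%q data=%v\n", i, p.Text, p.Data)
	}
}
```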
Testing A2A Compliance
Run the demo to verify A2A compliance:
# Terminal 1: Start A2A broker
make run-server
# Terminal 2: Start A2A subscriber
make run-subscriber
# Terminal 3: Start A2A publisher
make run-publisher
Expected output shows successful A2A task processing:
- Publisher: “Published A2A task”
- Subscriber: “Task processing completed”
- Artifacts generated in A2A format
Best Practices
- Use A2A Types: Always use A2A message structures for new code
- Context Management: Use context_id to group related messages
- Proper Parts: Structure content using appropriate Part types
- Artifact Returns: Return structured Artifact objects from tasks
- Status Updates: Properly manage task lifecycle states
The A2A migration ensures AgentHub remains both standards-compliant and highly scalable through its hybrid EDA+A2A architecture.
2 - Understanding Tasks in Agent2Agent Communication
Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. Deep dive into task semantics, lifecycle, and design patterns.
Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. This document provides a deep dive into task semantics, lifecycle, and design patterns.
Task Anatomy
Core Components
Every task in the Agent2Agent system consists of several key components that define its identity, purpose, and execution context:
A2A Task Identity
string id = 1; // Unique task identifier
string context_id = 2; // Optional conversation context
The id serves as a unique identifier that allows all participants to track the task throughout its lifecycle. It should be globally unique and meaningful for debugging purposes.
The context_id groups related tasks in a conversation or workflow context, enabling sophisticated multi-task coordination patterns.
Task classification in A2A is handled through the initial Message content rather than a separate task_type field, providing more flexibility for complex task descriptions.
A2A Task Status and History
TaskStatus status = 3; // Current task status
repeated Message history = 4; // Message history for this task
repeated Artifact artifacts = 5; // Task output artifacts
google.protobuf.Struct metadata = 6; // Task metadata
In A2A, task data is contained within Message content using the structured Part format:
// A2A task request message
message Message {
string message_id = 1;
string context_id = 2;
string task_id = 3;
Role role = 4; // USER (requester) or AGENT (responder)
repeated Part content = 5; // Structured task content
}
message Part {
oneof part {
string text = 1; // Text description
DataPart data = 2; // Structured data
FilePart file = 3; // File references
}
}
// Example: A2A data analysis task
taskMessage := &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
ContextId: "analysis_workflow_123",
TaskId: "task_analysis_456",
Role: a2a.Role_USER,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Please perform trend analysis on Q4 sales data",
},
},
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: analysisParams, // Structured parameters
Description: "Analysis configuration",
},
},
},
},
}
Metadata in A2A tasks provides additional context for execution, auditing, or debugging:
// A2A task metadata
taskMetadata, _ := structpb.NewStruct(map[string]interface{}{
"workflow_id": "workflow_abc123",
"user_id": "user_456",
"request_source": "web_ui",
"correlation_id": "trace_789",
"priority": "high",
"expected_duration": "5m",
})
task := &a2a.Task{
Id: "task_analysis_456",
ContextId: "analysis_workflow_123",
Metadata: taskMetadata,
}
A2A Agent Coordination
In A2A, agent coordination is handled through the EDA routing metadata:
message AgentEventMetadata {
string from_agent_id = 1; // Source agent identifier
string to_agent_id = 2; // Target agent ID (empty = broadcast)
string event_type = 3; // Event classification
repeated string subscriptions = 4; // Topic-based routing tags
Priority priority = 5; // Delivery priority
}
This enables flexible routing patterns:
- from_agent_id identifies the requesting agent
- to_agent_id can specify a target agent or be empty for broadcast
- subscriptions enable topic-based routing for specialized agents
- priority ensures urgent tasks get precedence
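The routing rules above can be sketched as a small recipient-selection function. This is an illustration under stated assumptions, not the broker's code: `Metadata` and `Agent` are simplified stand-ins, and the rule that an event with no target and no subscription tags goes to every agent except the sender is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// Priority is a simplified stand-in for the protobuf Priority enum.
type Priority int

const (
	PriorityLow Priority = iota
	PriorityMedium
	PriorityHigh
)

// Metadata mirrors the routing fields described above (simplified stand-in).
type Metadata struct {
	FromAgentID   string
	ToAgentID     string // empty means broadcast
	Subscriptions []string
	Priority      Priority
}

// Agent is a hypothetical subscriber with a set of topic tags.
type Agent struct {
	ID     string
	Topics map[string]bool
}

// recipients picks targets: a named agent wins; otherwise agents matching any
// subscription tag; with no tags, every agent except the sender.
func recipients(md Metadata, agents []Agent) []string {
	var out []string
	for _, a := range agents {
		switch {
		case a.ID == md.FromAgentID:
			continue // never deliver an event back to its sender
		case md.ToAgentID != "":
			if a.ID == md.ToAgentID {
				out = append(out, a.ID)
			}
		case len(md.Subscriptions) == 0:
			out = append(out, a.ID)
		default:
			for _, tag := range md.Subscriptions {
				if a.Topics[tag] {
					out = append(out, a.ID)
					break
				}
			}
		}
	}
	return out
}

// byPriority orders pending events so that higher priorities are handled first.
func byPriority(pending []Metadata) {
	sort.SliceStable(pending, func(i, j int) bool {
		return pending[i].Priority > pending[j].Priority
	})
}

func main() {
	agents := []Agent{
		{ID: "orchestrator"},
		{ID: "translator", Topics: map[string]bool{"translation": true}},
		{ID: "analyst", Topics: map[string]bool{"analysis": true}},
	}
	fmt.Println(recipients(Metadata{FromAgentID: "orchestrator", ToAgentID: "analyst"}, agents))
	fmt.Println(recipients(Metadata{FromAgentID: "orchestrator", Subscriptions: []string{"translation"}}, agents))
	fmt.Println(recipients(Metadata{FromAgentID: "orchestrator"}, agents))
}
```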
A2A Execution Context
A2A handles execution context through the TaskStatus structure:
message TaskStatus {
TaskState state = 1; // SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED
Message update = 2; // Latest status message
google.protobuf.Timestamp timestamp = 3; // Status timestamp
}
enum TaskState {
TASK_STATE_SUBMITTED = 0;
TASK_STATE_WORKING = 1;
TASK_STATE_COMPLETED = 2;
TASK_STATE_FAILED = 3;
TASK_STATE_CANCELLED = 4;
}
This context helps agents make intelligent scheduling decisions:
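- state shows where the task is in its lifecycle, so agents can pick up only SUBMITTED work
- update carries the latest status message, which for a new task is the original request
- timestamp records when the status last changed, which enables age-based scheduling policies
Priority is not part of TaskStatus; it travels in the AgentEventMetadata routing envelope.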
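A small state machine makes the lifecycle explicit. The five states come from the TaskState enum above; the transition table is an assumed policy for illustration (the source lists the states but not which transitions are legal), and the type and function names are hypothetical.

```go
package main

import "fmt"

// TaskState is a simplified stand-in for the protobuf TaskState enum.
type TaskState int

const (
	Submitted TaskState = iota
	Working
	Completed
	Failed
	Cancelled
)

// allowed lists the permitted next states for each state.
// This table is an assumed policy, not taken from the A2A specification.
var allowed = map[TaskState][]TaskState{
	Submitted: {Working, Failed, Cancelled},
	Working:   {Working, Completed, Failed, Cancelled}, // Working -> Working covers progress updates
}

// isTerminal reports whether no further transitions are allowed from s.
func isTerminal(s TaskState) bool { return len(allowed[s]) == 0 }

// canTransition reports whether a task may move from one state to another.
func canTransition(from, to TaskState) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(Submitted, Working)) // true
	fmt.Println(canTransition(Completed, Working)) // false
	fmt.Println(isTerminal(Failed))                // true
}
```

Checking transitions before publishing a status update keeps a late or duplicated event from reopening a task that has already finished.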
Task Lifecycle
1. A2A Task Creation and Publishing
A2A tasks begin their lifecycle when a requesting agent creates a task with an initial message:
// Create A2A task with initial request message
// Generate the ID once so that Task.Id and Message.TaskId refer to the same task
taskID := "task_analysis_" + uuid.New().String()
task := &a2a.Task{
Id: taskID,
ContextId: "workflow_orchestration_123",
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
TaskId: taskID,
Role: a2a.Role_USER,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Please analyze the quarterly sales data for trends",
},
},
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: analysisParams,
Description: "Analysis configuration",
},
},
},
},
},
Timestamp: timestamppb.Now(),
},
}
// Publish to AgentHub broker
client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
Task: task,
Routing: &pb.AgentEventMetadata{
FromAgentId: "data_orchestrator",
ToAgentId: "data_processor_01", // Optional: specific agent
EventType: "task.submitted",
Priority: pb.Priority_PRIORITY_HIGH,
},
})
2. A2A Task Discovery and Acceptance
Agents subscribe to A2A task events and evaluate whether to accept them:
// Agent receives A2A task event
func (a *Agent) evaluateA2ATask(event *pb.AgentEvent) bool {
task := event.GetTask()
// The generated getters are nil-safe, so an unset Status cannot cause a panic
if task == nil || task.GetStatus().GetState() != a2a.TaskState_TASK_STATE_SUBMITTED {
return false
}
// Analyze task content to understand requirements
requestMessage := task.GetStatus().GetUpdate()
taskDescription := a.extractTaskDescription(requestMessage)
// Check if agent can handle this task type
if !a.canHandleTaskType(taskDescription) {
return false
}
// Check capacity constraints
if a.getCurrentLoad() >= a.maxCapacity { // At capacity already means no room for another task
return false
}
// Estimate duration from task content and metadata
estimatedDuration := a.estimateA2ATaskDuration(task)
if estimatedDuration > a.maxTaskDuration {
return false
}
return true
}
func (a *Agent) extractTaskDescription(msg *a2a.Message) string {
for _, part := range msg.GetContent() { // Nil-safe if the task carried no update message
if textPart := part.GetText(); textPart != "" {
return textPart
}
}
return ""
}
3. A2A Task Execution with Progress Reporting
Accepted A2A tasks enter the execution phase with regular status updates:
func (a *Agent) executeA2ATask(task *a2a.Task) {
// Update task to WORKING state
a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Task started")
// Phase 1: Preparation
a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Preparing data analysis")
prepareResult := a.prepareA2AExecution(task)
// Phase 2: Main processing
a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Processing data - 50% complete")
processResult := a.processA2AData(prepareResult)
// Phase 3: Finalization
a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Finalizing results - 75% complete")
finalResult := a.finalizeA2AResults(processResult)
// Completion with artifacts
a.completeTaskWithArtifacts(task, finalResult)
}
func (a *Agent) updateTaskStatus(task *a2a.Task, state a2a.TaskState, message string) {
statusUpdate := &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
TaskId: task.Id,
Role: a2a.Role_AGENT,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: message,
},
},
},
}
task.Status = &a2a.TaskStatus{
State: state,
Update: statusUpdate,
Timestamp: timestamppb.Now(),
}
// Publish task update
a.client.PublishTaskUpdate(context.Background(), &pb.PublishTaskUpdateRequest{
Task: task,
Routing: &pb.AgentEventMetadata{
FromAgentId: a.agentId,
EventType: "task.status_update",
},
})
}
4. A2A Result Delivery
A2A task completion delivers results through structured artifacts:
func (a *Agent) completeTaskWithArtifacts(task *a2a.Task, resultData interface{}) {
// Create completion message
completionMessage := &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
TaskId: task.Id,
Role: a2a.Role_AGENT,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Analysis completed successfully",
},
},
},
}
// Create result artifact
resultArtifact := &a2a.Artifact{
ArtifactId: "artifact_" + uuid.New().String(),
Name: "Analysis Results",
Description: "Quarterly sales trend analysis",
Parts: []*a2a.Part{
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: resultData.(*structpb.Struct), // Assert to the pointer type; protobuf messages must not be copied by value
Description: "Analysis results and metrics",
},
},
},
},
}
// Update task to completed
task.Status = &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_COMPLETED,
Update: completionMessage,
Timestamp: timestamppb.Now(),
}
task.Artifacts = append(task.Artifacts, resultArtifact)
// Publish final task update
a.client.PublishTaskUpdate(context.Background(), &pb.PublishTaskUpdateRequest{
Task: task,
Routing: &pb.AgentEventMetadata{
FromAgentId: a.agentId,
EventType: "task.completed",
},
})
// Publish artifact separately
a.client.PublishTaskArtifact(context.Background(), &pb.PublishTaskArtifactRequest{
TaskId: task.Id,
Artifact: resultArtifact,
Routing: &pb.AgentEventMetadata{
FromAgentId: a.agentId,
EventType: "task.artifact",
},
})
}
A2A Task Design Patterns
1. Simple A2A Request-Response
The most basic pattern where one agent requests work from another using A2A messages:
Agent A ──[A2A Task]──> AgentHub ──[TaskEvent]──> Agent B
Agent A <─[Artifact]─── AgentHub <─[TaskUpdate]── Agent B
A2A Implementation:
// Agent A creates task
task := &a2a.Task{
Id: "simple_task_123",
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Convert CSV to JSON"}}},
},
},
}
// Agent B responds with artifact
artifact := &a2a.Artifact{
Name: "Converted Data",
Parts: []*a2a.Part{{Part: &a2a.Part_File{File: &a2a.FilePart{FileId: "converted.json"}}}},
}
Use cases:
- File format conversion
- Simple calculations
- Data validation
- Content generation
2. A2A Broadcast Processing
One agent broadcasts a task to multiple potential processors using A2A context-aware routing:
Agent A ──[A2A Task]──> AgentHub ──[TaskEvent]──> Agent B₁
├─[TaskEvent]──> Agent B₂
└─[TaskEvent]──> Agent B₃
A2A Implementation:
// Broadcast task with shared context
task := &a2a.Task{
Id: "broadcast_task_456",
ContextId: "parallel_processing_context",
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{
{Part: &a2a.Part_Text{Text: "Process data chunk"}},
{Part: &a2a.Part_Data{Data: &a2a.DataPart{Data: chunkData}}},
},
},
},
}
// Publish without specific target (broadcast)
client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
Task: task,
Routing: &pb.AgentEventMetadata{
FromAgentId: "orchestrator",
// No ToAgentId = broadcast
EventType: "task.broadcast",
},
})
Use cases:
- Distributed computation
- Load testing
- Content distribution
- Parallel processing
3. A2A Pipeline Processing
Tasks flow through a series of specialized agents using shared A2A context:
Agent A ──[A2A Task₁]──> Agent B ──[A2A Task₂]──> Agent C ──[A2A Task₃]──> Agent D
<──[Final Artifact]───────────────────────────────────────────────────┘
A2A Implementation:
// Shared context for pipeline
pipelineContext := "data_pipeline_" + uuid.New().String()
// Stage 1: Data extraction
task1 := &a2a.Task{
Id: "extract_" + uuid.New().String(),
ContextId: pipelineContext,
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Extract data from source"}}},
},
},
}
// Stage 2: Data transformation (triggered by Stage 1 completion)
task2 := &a2a.Task{
Id: "transform_" + uuid.New().String(),
ContextId: pipelineContext, // Same context
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Transform extracted data"}}},
},
},
}
// Context linking enables pipeline coordination
Use cases:
- Data processing pipelines
- Image processing workflows
- Document processing chains
- ETL operations
4. A2A Hierarchical Decomposition
Complex tasks are broken down into subtasks using A2A context hierarchy:
Agent A ──[A2A ComplexTask]──> Coordinator
├──[A2A SubTask₁]──> Specialist₁
├──[A2A SubTask₂]──> Specialist₂
└──[A2A SubTask₃]──> Specialist₃
A2A Implementation:
// Parent task
parentTask := &a2a.Task{
Id: "complex_analysis_789",
ContextId: "business_workflow_123",
Status: &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_SUBMITTED,
Update: &a2a.Message{
Role: a2a.Role_USER,
Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Perform comprehensive business analysis"}}},
},
},
}
// Coordinator creates subtasks with hierarchical context.
// Task.Metadata is a google.protobuf.Struct, so build it with structpb.NewStruct
// rather than assigning a Go map directly.
financialMeta, _ := structpb.NewStruct(map[string]interface{}{
"parent_task_id": "complex_analysis_789",
"subtask_type": "financial",
})
subtask1 := &a2a.Task{
Id: "financial_analysis_790",
ContextId: "business_workflow_123", // Same parent context
Metadata: financialMeta,
}
marketMeta, _ := structpb.NewStruct(map[string]interface{}{
"parent_task_id": "complex_analysis_789",
"subtask_type": "market",
})
subtask2 := &a2a.Task{
Id: "market_analysis_791",
ContextId: "business_workflow_123", // Same parent context
Metadata: marketMeta,
}
// Context enables coordination and result aggregation
Use cases:
- Complex business workflows
- Multi-step analysis
- Orchestrated services
- Batch job coordination
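Result aggregation for the hierarchical pattern can be sketched as a coordinator-side tracker keyed by parent task ID. This is a minimal illustration: `Aggregator`, `Expect`, and `Complete` are hypothetical names, and results are plain strings rather than A2A artifacts.

```go
package main

import (
	"fmt"
	"sync"
)

// Aggregator tracks outstanding subtasks per parent task (hypothetical helper).
type Aggregator struct {
	mu      sync.Mutex
	pending map[string]map[string]bool // parent ID -> outstanding subtask IDs
	results map[string][]string        // parent ID -> collected results
}

// NewAggregator returns an empty aggregator.
func NewAggregator() *Aggregator {
	return &Aggregator{
		pending: make(map[string]map[string]bool),
		results: make(map[string][]string),
	}
}

// Expect records the subtasks that must finish before the parent is done.
func (a *Aggregator) Expect(parentID string, subtaskIDs ...string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	set := make(map[string]bool, len(subtaskIDs))
	for _, id := range subtaskIDs {
		set[id] = true
	}
	a.pending[parentID] = set
}

// Complete records one subtask result. It returns true and all collected
// results once the last outstanding subtask of the parent has completed.
func (a *Aggregator) Complete(parentID, subtaskID, result string) (bool, []string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	outstanding, ok := a.pending[parentID]
	if !ok || !outstanding[subtaskID] {
		return false, nil // unknown parent or duplicate completion
	}
	delete(outstanding, subtaskID)
	a.results[parentID] = append(a.results[parentID], result)
	if len(outstanding) > 0 {
		return false, nil
	}
	delete(a.pending, parentID)
	return true, a.results[parentID]
}

func main() {
	agg := NewAggregator()
	agg.Expect("complex_analysis_789", "financial_analysis_790", "market_analysis_791")

	done, _ := agg.Complete("complex_analysis_789", "financial_analysis_790", "financial report")
	fmt.Println("done after first subtask:", done)

	done, results := agg.Complete("complex_analysis_789", "market_analysis_791", "market report")
	fmt.Println("done after second subtask:", done, results)
}
```

In a real coordinator the parent ID would be read from the `parent_task_id` metadata field of each completed subtask.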
5. Competitive Processing
Multiple agents compete to handle the same task (first-come-first-served):
Agent A ──[Task]──> Broker ──[Task]──> Agent B₁ (accepts)
├─[Task]──> Agent B₂ (rejects)
└─[Task]──> Agent B₃ (rejects)
Use cases:
- Resource-constrained environments
- Load balancing
- Fault tolerance
- Performance optimization
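First-come-first-served arbitration needs an atomic claim so that exactly one agent wins. The source does not say how AgentHub arbitrates competing agents, so the following is only a sketch of one possible approach, with hypothetical names (`ClaimTable`, `TryClaim`, `race`) and an in-memory map guarded by a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// ClaimTable records which agent owns each task (hypothetical helper).
type ClaimTable struct {
	mu     sync.Mutex
	owners map[string]string
}

// NewClaimTable returns an empty claim table.
func NewClaimTable() *ClaimTable {
	return &ClaimTable{owners: make(map[string]string)}
}

// TryClaim assigns the task to the agent if nobody claimed it yet.
// The check and the write happen under one lock, so only one caller can win.
func (c *ClaimTable) TryClaim(taskID, agentID string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, taken := c.owners[taskID]; taken {
		return false
	}
	c.owners[taskID] = agentID
	return true
}

// Owner returns the agent that claimed the task, or "" if unclaimed.
func (c *ClaimTable) Owner(taskID string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.owners[taskID]
}

// race lets the given agents compete for one task concurrently and
// returns how many claims succeeded.
func race(c *ClaimTable, taskID string, agents []string) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	wins := 0
	for _, id := range agents {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			if c.TryClaim(taskID, id) {
				mu.Lock()
				wins++
				mu.Unlock()
			}
		}(id)
	}
	wg.Wait()
	return wins
}

func main() {
	table := NewClaimTable()
	wins := race(table, "task_1", []string{"agent_b1", "agent_b2", "agent_b3"})
	fmt.Println("successful claims:", wins, "owner:", table.Owner("task_1"))
}
```

Which agent wins varies from run to run, but the number of successful claims is always one.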
A2A Task Content and Semantics
A2A Message-Based Classification
In A2A, task classification is handled through message content rather than rigid type fields, providing more flexibility:
Content-Based Classification
// Data processing task
message := &a2a.Message{
Content: []*a2a.Part{
{Part: &a2a.Part_Text{Text: "Analyze quarterly sales data for trends"}},
{Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Analysis parameters"}}},
},
}
// Image processing task
message := &a2a.Message{
Content: []*a2a.Part{
{Part: &a2a.Part_Text{Text: "Generate product image with specifications"}},
{Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Image requirements"}}},
},
}
// Notification task
message := &a2a.Message{
Content: []*a2a.Part{
{Part: &a2a.Part_Text{Text: "Send completion notification to user"}},
{Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Notification details"}}},
},
}
Operation-Based Classification
create.* - Creation operations
update.* - Modification operations
delete.* - Removal operations
analyze.* - Analysis operations
transform.* - Transformation operations
Complexity-Based Classification
simple.* - Quick, low-resource tasks
standard.* - Normal processing tasks
complex.* - Resource-intensive tasks
background.* - Long-running batch tasks
A2A Content Design Guidelines
Be Explicit: Include all information needed for execution in structured Parts
// Good: Explicit A2A content
// structpb.NewStruct returns (*structpb.Struct, error), so build the parameters first
conversionParams, err := structpb.NewStruct(map[string]interface{}{
"source_format": "csv",
"target_format": "json",
"include_headers": true,
"delimiter": ",",
"encoding": "utf-8",
})
if err != nil {
log.Fatalf("building conversion parameters: %v", err)
}
content := []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Convert CSV file to JSON format with specific options",
},
},
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: conversionParams,
Description: "Conversion parameters",
},
},
},
{
Part: &a2a.Part_File{
File: &a2a.FilePart{
FileId: "source_data.csv",
Filename: "data.csv",
MimeType: "text/csv",
},
},
},
}
// Poor: Ambiguous A2A content
content := []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Convert file", // Too vague
},
},
}
Use Standard Data Types: Leverage common formats for interoperability
// Good: Standard formats
{
"timestamp": "2024-01-15T10:30:00Z", // ISO 8601
"amount": "123.45", // String for precision
"coordinates": {"lat": 40.7128, "lng": -74.0060}
}
Include Validation Information: Help agents validate inputs
{
"email": "user@example.com",
"email_format": "rfc5322",
"max_length": 254,
"required": true
}
A2A Error Handling and Edge Cases
A2A Task Rejection
Agents should provide meaningful rejection reasons using A2A message format:
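func (a *Agent) rejectA2ATask(task *a2a.Task, reason string) {
// Build the structured details first: structpb.NewStruct returns (*structpb.Struct, error)
rejectionDetails, err := structpb.NewStruct(map[string]interface{}{
"rejection_reason": reason,
"agent_id": a.agentId,
"timestamp": time.Now().Unix(),
})
if err != nil {
// Fall back to the text part only; rejectionDetails is nil in this case
log.Printf("failed to build rejection details: %v", err)
}
// Create rejection message
rejectionMessage := &a2a.Message{
MessageId: "msg_" + uuid.New().String(),
TaskId: task.Id,
Role: a2a.Role_AGENT,
Content: []*a2a.Part{
{
Part: &a2a.Part_Text{
Text: "Task rejected: " + reason,
},
},
{
Part: &a2a.Part_Data{
Data: &a2a.DataPart{
Data: rejectionDetails,
Description: "Rejection details",
},
},
},
},
}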
// Update task status to failed
task.Status = &a2a.TaskStatus{
State: a2a.TaskState_TASK_STATE_FAILED,
Update: rejectionMessage,
Timestamp: timestamppb.Now(),
}
a.publishTaskUpdate(task)
}
Common rejection reasons:
- UNSUPPORTED_TASK_TYPE: Agent doesn’t handle this task type
- CAPACITY_EXCEEDED: Agent is at maximum capacity
- DEADLINE_IMPOSSIBLE: Cannot complete within deadline
- INVALID_PARAMETERS: Task parameters are malformed
- RESOURCE_UNAVAILABLE: Required external resources unavailable
Timeout Handling
Both requesters and processors should handle timeouts gracefully:
// Requester timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
select {
case result := <-resultChannel:
// Process result
case <-ctx.Done():
// Handle timeout - possibly retry or fail
}
// Processor timeout (shown with the legacy TaskMessage type, which exposes a deadline)
func (a *Agent) executeWithTimeout(task *pb.TaskMessage) {
deadline := task.GetDeadline().AsTime()
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
select {
case result := <-a.processTask(ctx, task):
// Success: pass an empty error message so both calls use the same signature
a.publishResult(task, result, pb.TaskStatus_TASK_STATUS_COMPLETED, "")
case <-ctx.Done():
a.publishResult(task, nil, pb.TaskStatus_TASK_STATUS_FAILED, "Deadline exceeded")
}
}
Partial Results
For long-running tasks, consider supporting partial results:
type PartialResult struct {
TaskId string
CompletedPortion float64 // 0.0 to 1.0
IntermediateData interface{}
CanResume bool
ResumeToken string
}
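One way to use such a structure is to encode the next offset in the resume token so that work can continue where it stopped. The sketch below reuses the PartialResult fields from above; the `processBatch` function, its batch budget, and the choice of an integer offset as the token are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strconv"
)

// PartialResult matches the structure shown above.
type PartialResult struct {
	TaskId           string
	CompletedPortion float64 // 0.0 to 1.0
	IntermediateData interface{}
	CanResume        bool
	ResumeToken      string
}

// processBatch sums up to budget items, starting at the offset encoded in
// resumeToken and adding to the carried-over partial sum.
func processBatch(taskID string, items []int, resumeToken string, budget, carry int) (PartialResult, error) {
	if budget < 1 {
		return PartialResult{}, fmt.Errorf("budget must be positive, got %d", budget)
	}
	start := 0
	if resumeToken != "" {
		n, err := strconv.Atoi(resumeToken)
		if err != nil || n < 0 || n > len(items) {
			return PartialResult{}, fmt.Errorf("invalid resume token %q", resumeToken)
		}
		start = n
	}
	end := start + budget
	if end > len(items) {
		end = len(items)
	}
	sum := carry
	for _, v := range items[start:end] {
		sum += v
	}
	result := PartialResult{TaskId: taskID, CompletedPortion: 1, IntermediateData: sum}
	if end < len(items) {
		// More work remains: report progress and where to continue.
		result.CompletedPortion = float64(end) / float64(len(items))
		result.CanResume = true
		result.ResumeToken = strconv.Itoa(end)
	}
	return result, nil
}

func main() {
	items := []int{1, 2, 3, 4, 5}
	token, carry := "", 0
	for {
		pr, err := processBatch("task_1", items, token, 2, carry)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%.0f%% done, sum so far %v\n", pr.CompletedPortion*100, pr.IntermediateData)
		if !pr.CanResume {
			return
		}
		token, carry = pr.ResumeToken, pr.IntermediateData.(int)
	}
}
```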
Best Practices
Task Design
- Make task types granular but not too fine-grained
- Design for idempotency when possible
- Include retry information in metadata
- Use consistent parameter naming across similar task types
- Version your task schemas to enable evolution
- Batch related tasks when appropriate
- Use appropriate priority levels to avoid starvation
- Set realistic deadlines based on historical performance
- Include resource hints to help with scheduling
- Monitor task completion rates to identify bottlenecks
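Idempotency is the practice on this list that most benefits from an example. A minimal sketch, assuming tasks are deduplicated by task ID: a redelivered task returns the cached result instead of running again. `IdempotentHandler` and `Handle` are hypothetical names, and holding the lock while the work runs is a simplification that serializes all tasks.

```go
package main

import (
	"fmt"
	"sync"
)

// IdempotentHandler caches results by task ID so that a redelivered task
// is not executed twice (hypothetical helper, in-memory only).
type IdempotentHandler struct {
	mu      sync.Mutex
	results map[string]string
}

// NewIdempotentHandler returns a handler with an empty result cache.
func NewIdempotentHandler() *IdempotentHandler {
	return &IdempotentHandler{results: make(map[string]string)}
}

// Handle runs work at most once per task ID and returns the stored result
// on every later call with the same ID.
func (h *IdempotentHandler) Handle(taskID string, work func() string) string {
	h.mu.Lock()
	defer h.mu.Unlock()
	if r, ok := h.results[taskID]; ok {
		return r
	}
	r := work()
	h.results[taskID] = r
	return r
}

func main() {
	h := NewIdempotentHandler()
	runs := 0
	work := func() string {
		runs++
		return "converted.json"
	}
	first := h.Handle("task_1", work)
	second := h.Handle("task_1", work) // redelivery: served from the cache
	fmt.Println(first, second, "runs:", runs)
}
```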
Security Considerations
- Validate all task parameters before processing
- Sanitize user-provided data in task parameters
- Include authorization context in metadata
- Log task execution for audit trails
- Encrypt sensitive parameters when necessary
A2A tasks form the foundation of Agent2Agent communication, enabling sophisticated distributed processing patterns through structured messages, artifacts, and context-aware coordination. The A2A protocol’s flexible message format and EDA integration provide robust, scalable agent networks with clear semantics and strong observability. Proper A2A task design leverages the protocol’s strengths for building maintainable, interoperable agent systems.
3 - Agent Discovery Workflow Explained
This document explains how the agent discovery workflow operates in AgentHub, enabling dynamic registration and LLM-based orchestration.
Overview
Agent discovery is the process by which agents dynamically register their capabilities with the Cortex orchestrator, making themselves available for intelligent task delegation via an LLM (Large Language Model).
The Problem This Solves
Traditional multi-agent systems require:
- Hard-coded agent configurations
- Static routing rules
- Manual updates when adding new agents
- No intelligence in task routing
Agent discovery with Cortex provides:
- Dynamic registration: Agents announce themselves when they start
- Intelligent routing: LLM decides which agent to use based on capabilities
- Zero configuration: No static agent list to edit; the broker and Cortex registries populate themselves from registration events
- Scalable: Add or remove agents without system changes
How It Works
The Five-Step Flow
┌──────────────┐
│ 1. Agent │ Agent starts and creates an AgentCard
│ Startup │ describing its capabilities
└──────┬───────┘
│
▼
┌──────────────┐
│ 2. Register │ Agent calls RegisterAgent RPC
│ with │ sending the AgentCard to broker
│ Broker │
└──────┬───────┘
│
▼
┌──────────────┐
│ 3. Event │ Broker publishes AgentCardEvent
│ Publishing│ broadcasting to all subscribers
└──────┬───────┘
│
▼
┌──────────────┐
│ 4. Cortex │ Cortex receives event and stores
│ Discovery │ agent in its registry
└──────┬───────┘
│
▼
┌──────────────┐
│ 5. LLM │ Agent is now available in LLM
│ Integration│ prompts for intelligent delegation
└──────────────┘
Step 1: Agent Startup
When an agent starts, it creates an AgentCard that describes its identity and capabilities:
agentCard := &pb.AgentCard{
Name: "agent_translator",
Description: "Language translation service",
Version: "1.0.0",
Skills: []*pb.AgentSkill{
{
Name: "Text Translation",
Description: "Translates text between languages",
Examples: []string{
"Translate this to Spanish",
"Convert to French",
},
},
},
}
Key Components:
- Name: Unique identifier (used for routing)
- Description: What the agent does (helps LLM understand)
- Skills: Specific capabilities with examples (used for matching)
Step 2: Registration with Broker
The agent registers by calling the broker’s RegisterAgent RPC:
client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
AgentCard: agentCard,
Subscriptions: []string{"translation_request"},
})
What happens:
- Broker validates the AgentCard
- Stores agent in its registry: registeredAgents[agentID] = card
- Returns success response
Step 3: Event Publishing
The broker immediately publishes an AgentCardEvent:
event := &pb.AgentEvent{
EventId: "agent_registered_translator_...",
Timestamp: timestamppb.Now(),
Payload: &pb.AgentEvent_AgentCard{
AgentCard: &pb.AgentCardEvent{
AgentId: "agent_translator",
AgentCard: agentCard,
EventType: "registered",
},
},
Routing: &pb.AgentEventMetadata{
FromAgentId: "agent_translator",
ToAgentId: "", // Broadcast
EventType: "agent.registered",
Priority: pb.Priority_PRIORITY_HIGH,
},
}
Routing characteristics:
- Broadcast to all subscribers (empty ToAgentId)
- High priority (processed immediately)
- Event type clearly marked as “agent.registered”
Step 4: Cortex Discovery
Cortex subscribes to agent events:
stream, _ := client.SubscribeToAgentEvents(ctx, &pb.SubscribeToAgentEventsRequest{
AgentId: "cortex",
EventTypes: []string{"agent.registered", "agent.updated"},
})
When receiving an agent card event, Cortex:
func handleAgentCardEvent(event *pb.AgentCardEvent) {
agentID := event.GetAgentId()
agentCard := event.GetAgentCard()
// Store agent
cortex.RegisterAgent(agentID, agentCard)
// Log skills for visibility
log.Info("Agent registered",
"agent_id", agentID,
"skills", extractSkillNames(agentCard.Skills))
}
Result: Agent is now in Cortex’s registeredAgents map.
Step 5: LLM Integration
When a user sends a request, Cortex queries the LLM:
decision, _ := llm.Decide(
conversationHistory,
availableAgents, // Includes our new agent!
newUserMessage,
)
The LLM sees:
Available agents:
- agent_translator: Language translation service
Skills:
* Text Translation: Translates text between languages
Examples: "Translate this to Spanish", "Convert to French"
Decision making:
- User asks: “Can you translate this to Spanish?”
- LLM sees “agent_translator” with matching examples
- LLM decides: Delegate to agent_translator
- Cortex sends task to agent_translator
- Agent processes and responds
- Cortex synthesizes final response
Message Flow Diagram
sequenceDiagram
participant A as Translation Agent
participant B as Broker
participant C as Cortex
participant L as LLM (VertexAI)
participant U as User
Note over A: Step 1: Startup
A->>A: Create AgentCard
Note over A,B: Step 2: Registration
A->>B: RegisterAgent(card)
B->>B: Store in registry
Note over B: Step 3: Event Publishing
B->>C: AgentCardEvent (broadcast)
Note over C: Step 4: Discovery
C->>C: RegisterAgent(id, card)
C->>C: total_agents++
Note over U,L: Step 5: LLM Integration
U->>C: "Translate to Spanish"
C->>L: Decide(availableAgents)
Note over L: Sees translator agent<br/>with matching examples
L-->>C: {delegate: agent_translator}
C->>A: Task message
A->>A: Process translation
A->>C: Result
C->>L: Synthesize
L-->>C: Final response
C->>U: "Here's the Spanish: ..."
Technical Implementation Details
Thread Safety
Agent registration is thread-safe:
type AgentHubService struct {
registeredAgents map[string]*pb.AgentCard
agentsMu sync.RWMutex
}
func (s *AgentHubService) RegisterAgent(...) {
s.agentsMu.Lock()
s.registeredAgents[agentID] = card
s.agentsMu.Unlock()
}
Multiple agents can register concurrently without conflicts.
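The locking pattern above can be exercised with a self-contained sketch. `Registry` and `AgentCard` here are simplified stand-ins for the broker's service and the protobuf type, and `registerConcurrently` is a hypothetical helper that simulates many agents starting at once.

```go
package main

import (
	"fmt"
	"sync"
)

// AgentCard is a simplified stand-in for the protobuf AgentCard.
type AgentCard struct{ Name string }

// Registry mirrors the map-plus-RWMutex layout shown above.
type Registry struct {
	mu     sync.RWMutex
	agents map[string]*AgentCard
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{agents: make(map[string]*AgentCard)}
}

// Register stores a card under the write lock.
func (r *Registry) Register(id string, card *AgentCard) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.agents[id] = card
}

// Count reads the registry size under the read lock.
func (r *Registry) Count() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.agents)
}

// registerConcurrently registers n distinct agents from n goroutines
// and waits for all of them to finish.
func registerConcurrently(r *Registry, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id := fmt.Sprintf("agent_%d", i)
			r.Register(id, &AgentCard{Name: id})
		}(i)
	}
	wg.Wait()
}

func main() {
	r := NewRegistry()
	registerConcurrently(r, 100)
	fmt.Println("registered agents:", r.Count())
}
```

Using `defer` for the unlock keeps the mutex from staying locked if validation code added later returns early or panics.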
Event Delivery
Events are delivered asynchronously:
for _, subChan := range targetChannels {
go func(ch chan *pb.AgentEvent) {
select {
case ch <- event:
// Delivered
case <-time.After(5 * time.Second):
// Timeout
}
}(subChan)
}
Benefits:
- Non-blocking: Broker doesn’t wait for all deliveries
- Resilient: Timeout prevents hanging
- Concurrent: Multiple subscribers receive events in parallel
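The effect of the per-subscriber timeout can be seen in a runnable sketch. Unlike the broker loop above, this version waits for all deliveries so that it can count them; that wait, the `fanOut` name, and the short `demoTimeout` value are assumptions made for the demonstration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// demoTimeout is deliberately short so the example finishes quickly.
const demoTimeout = 50 * time.Millisecond

// fanOut sends event to every subscriber channel concurrently and returns
// how many deliveries succeeded before the timeout.
func fanOut(event string, subs []chan string, timeout time.Duration) int {
	var wg sync.WaitGroup
	var delivered int64
	for _, ch := range subs {
		wg.Add(1)
		go func(ch chan string) {
			defer wg.Done()
			timer := time.NewTimer(timeout)
			defer timer.Stop()
			select {
			case ch <- event:
				atomic.AddInt64(&delivered, 1)
			case <-timer.C:
				// Slow subscriber: give up instead of blocking forever.
			}
		}(ch)
	}
	wg.Wait()
	return int(delivered)
}

func main() {
	fast := make(chan string, 1) // buffered: accepts the event immediately
	slow := make(chan string)    // unbuffered with no reader: times out
	n := fanOut("agent.registered", []chan string{fast, slow}, demoTimeout)
	fmt.Println("delivered:", n)
}
```

The slow subscriber costs at most one timeout period in total, because each delivery runs in its own goroutine.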
LLM Prompt Generation
Cortex builds prompts dynamically:
func buildOrchestrationPrompt(availableAgents []*pb.AgentCard) string {
var prompt strings.Builder
prompt.WriteString("Available agents:\n")
for _, agent := range availableAgents {
fmt.Fprintf(&prompt, "- %s: %s\n", agent.Name, agent.Description)
// Emit the "Skills:" header once per agent, not once per skill
if len(agent.Skills) > 0 {
prompt.WriteString(" Skills:\n")
}
for _, skill := range agent.Skills {
fmt.Fprintf(&prompt, " * %s: %s\n", skill.Name, skill.Description)
}
}
return prompt.String()
}
Updated automatically when new agents register.
Typical timings for agent discovery:
Agent startup: 100-200ms
RegisterAgent RPC: < 10ms
Event publishing: < 5ms
Event delivery: < 50ms
Cortex processing: < 10ms
Total discovery time: < 300ms
At under 300 ms end to end, a new agent becomes available for routing almost immediately after it starts.
Error Handling
Registration Failures
If registration fails:
_, err := client.RegisterAgent(ctx, req)
if err != nil {
log.Error("Registration failed", "error", err)
// Agent should retry or exit
panic(err)
}
Common causes:
- Broker not running
- Network issues
- Invalid AgentCard (empty name)
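Because causes such as a broker that is still starting are transient, an agent may prefer to retry before giving up. The sketch below is one possible approach, not the project's implementation: `registerWithRetry` and `simulate` are hypothetical helpers, and the `register` callback stands in for a closure around the real `client.RegisterAgent` call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// registerWithRetry calls register up to attempts times, doubling the wait
// between tries. register is a placeholder for the real RPC call.
func registerWithRetry(ctx context.Context, attempts int, initial time.Duration, register func(context.Context) error) error {
	if attempts < 1 {
		return errors.New("attempts must be at least 1")
	}
	var err error
	wait := initial
	for i := 1; i <= attempts; i++ {
		if err = register(ctx); err == nil {
			return nil
		}
		if i == attempts {
			break // no point sleeping after the final attempt
		}
		select {
		case <-time.After(wait):
			wait *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("registration failed after %d attempts: %w", attempts, err)
}

// simulate runs registerWithRetry against a fake broker that rejects the
// first `failures` calls, and reports how many calls were made.
func simulate(failures, attempts int) (calls int, err error) {
	fake := func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("broker not running")
		}
		return nil
	}
	err = registerWithRetry(context.Background(), attempts, time.Millisecond, fake)
	return calls, err
}

func main() {
	calls, err := simulate(2, 5)
	fmt.Println("calls:", calls, "err:", err)
}
```

An invalid AgentCard will fail on every attempt, so a real agent would want to distinguish retryable errors from permanent ones before looping.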
Event Delivery Failures
If event delivery fails:
if err := s.routeEvent(ctx, event); err != nil {
log.Warn("Event routing failed", "error", err)
// Continue anyway - registration still succeeded
}
Graceful degradation: Registration succeeds even if event routing fails.
Cortex Not Subscribed
If Cortex isn’t subscribed yet:
- Events are still published
- Cortex can query the GetAgentCard() RPC later
- Or register when Cortex starts
Resilient: System handles various startup orders.
Observability
Broker Logs
level=INFO msg="Agent registered" agent_id=agent_translator
level=DEBUG msg="Routing event to subscribers"
event_type=agent.registered subscriber_count=2
level=DEBUG msg="Event delivered to subscriber"
Cortex Logs
level=INFO msg="Received agent card event"
agent_id=agent_translator event_type=registered
level=INFO msg="Agent skills registered"
skills="[Text Translation: Translates...]"
level=INFO msg="Agent registered with Cortex orchestrator"
total_agents=3
Distributed Tracing
Agent registration creates trace spans:
agent_registered_translator
└─ broker.route_event
├─ deliver_to_cortex
└─ deliver_to_monitor
Visibility into the entire discovery flow.
Lifecycle Management
Agent Startup Sequence
1. Create AgentHub client
2. Connect to broker
3. Create AgentCard
4. Call RegisterAgent
5. Subscribe to messages
6. Enter processing loop
Agent Shutdown
Currently agents don’t explicitly unregister. For graceful shutdown:
// In future enhancement:
defer client.UnregisterAgent(ctx, &pb.UnregisterAgentRequest{
AgentId: myAgentID,
})
This would trigger an “agent.unregistered” event.
Agent Updates
To update capabilities:
// Modify AgentCard
agentCard.Skills = append(agentCard.Skills, newSkill)
// Re-register
client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
AgentCard: agentCard,
})
// Triggers "agent.updated" event
Cortex receives update and refreshes its registry.
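One way a registry can tell a first registration from an update is to check whether the agent ID is already known before storing the card. The sketch below assumes simplified `AgentCard` and `Registry` types (not the AgentHub ones) and reuses the event names mentioned in this document; it is not safe for concurrent use without the locking shown earlier.

```go
package main

import "fmt"

// AgentCard is a simplified stand-in for pb.AgentCard (assumption).
type AgentCard struct {
	Name   string
	Skills []string
}

// Registry is a hypothetical, single-goroutine view of an agent table.
type Registry struct {
	agents map[string]AgentCard
}

func NewRegistry() *Registry {
	return &Registry{agents: map[string]AgentCard{}}
}

// Upsert stores the card and returns the event type to publish:
// "agent.registered" for a new ID, "agent.updated" for a known one.
func (r *Registry) Upsert(id string, card AgentCard) string {
	_, exists := r.agents[id]
	r.agents[id] = card
	if exists {
		return "agent.updated"
	}
	return "agent.registered"
}

// Total reports how many distinct agents are known.
func (r *Registry) Total() int { return len(r.agents) }

func main() {
	reg := NewRegistry()
	card := AgentCard{Name: "Translation Agent", Skills: []string{"translate"}}
	fmt.Println(reg.Upsert("agent_translator", card))

	card.Skills = append(card.Skills, "detect_language")
	fmt.Println(reg.Upsert("agent_translator", card), "total:", reg.Total())
}
```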
Comparison with Other Patterns
vs. Service Discovery (Consul, etcd)
Agent Discovery:
- Includes capability metadata (skills)
- Optimized for LLM consumption
- Event-driven notification
- Rich semantic information
Service Discovery:
- Network location only
- Health checks
- Static metadata
- Pull-based queries
vs. API Gateway
Agent Discovery:
- Dynamic routing based on content
- LLM makes intelligent decisions
- Supports complex multi-step workflows
API Gateway:
- Path-based routing
- Static configuration
- Single request-response
vs. Message Queues
Agent Discovery:
- Agents know their capabilities
- Centralized intelligence (Cortex)
- Rich metadata for decisions
Message Queues:
- Topic-based routing
- No central intelligence
- Minimal metadata
Design Decisions
Why Broadcast Events?
Decision: Publish agent cards to all subscribers
Alternatives considered:
- Point-to-point to Cortex only
- Store-and-query model
Rationale:
- Multiple orchestrators can coexist
- Monitoring agents can track all agents
- Extensible for future use cases
- Low overhead (events are small)
Why High Priority?
Decision: Agent registration events use PRIORITY_HIGH
Rationale:
- New agents should be available quickly
- User requests may come immediately
- Discovery is time-sensitive
- Low volume (not many registrations)
Why Skills with Examples?
Decision: Include example user requests in skills
Rationale:
- LLMs learn by example
- Natural language is ambiguous
- Examples disambiguate capabilities
- Improves matching accuracy
Future Enhancements
See AGENT_DECIDE.md for planned improvements:
- Agent Health Monitoring: Track agent availability
- Agent Deregistration: Explicit removal from registry
- Agent Versioning: Support multiple versions simultaneously
- Capability Queries: Search agents by capability
- Load Balancing: Distribute work among multiple instances
Conclusion
The agent discovery workflow enables:
- Zero-configuration agent deployment
- Intelligent routing via LLM
- Dynamic scaling of agent pools
- Automatic orchestration based on capabilities
- Flexible, extensible multi-agent systems
This architecture supports truly autonomous, self-organizing agent networks that can adapt to changing requirements without manual intervention.
4 - The Agent2Agent Protocol and AgentHub Implementation
This document explores the core principles of Google’s Agent2Agent protocol and how AgentHub implements a communication broker based on these concepts. We distinguish between the Agent2Agent protocol specification (task structures and communication patterns) and our custom AgentHub broker implementation.
Agent2Agent vs AgentHub: What’s What
Agent2Agent Protocol (Google)
The Agent2Agent protocol defines:
- Task Message Structures: TaskMessage, TaskResult, TaskProgress with their fields and semantics
- Task Status and Priority Enums: Standardized task lifecycle and priority levels
- Communication Patterns: Asynchronous task delegation and result reporting concepts
AgentHub Implementation (This Project)
AgentHub provides:
- Event Bus Broker: Centralized gRPC service that routes tasks between agents
- Pub/Sub Architecture: Publisher-subscriber pattern for task distribution
- Subscription Mechanisms: SubscribeToTasks, SubscribeToTaskResults, SubscribeToTaskProgress methods
- Agent Implementations: Sample publisher and subscriber agents demonstrating the protocol
Philosophy and Core Concepts
Beyond Simple Request-Response
Traditional software architectures rely heavily on synchronous request-response patterns where a client requests a service and waits for an immediate response. While effective for simple operations, this pattern has limitations when dealing with:
- Complex, multi-step processes that require coordination between multiple specialized services
- Long-running operations that may take minutes or hours to complete
- Dynamic workload distribution where the best processor for a task may vary over time
- Autonomous decision-making where agents need to collaborate without central coordination
The Agent2Agent protocol addresses these limitations by defining task structures and communication patterns for autonomous agents. AgentHub implements a broker-based system in which agents communicate using Agent2Agent-inspired task structures. Through the broker, agents collaborate by:
- Delegating work to other agents based on their capabilities
- Accepting and processing tasks according to their specializations
- Reporting progress during long-running operations
- Making collaborative decisions about task distribution and execution
Autonomous Collaboration
In an Agent2Agent system, each agent operates with a degree of autonomy, making decisions about:
- Which tasks to accept based on current capacity and capabilities
- How to prioritize work when multiple tasks are pending
- When to delegate subtasks to other specialized agents
- How to report progress and handle failures
This autonomy enables the system to be more resilient, scalable, and adaptive compared to centrally-controlled architectures.
Key Design Principles
1. Asynchronous Communication
Agent2Agent communication is fundamentally asynchronous. When Agent A requests work from Agent B:
- Agent A doesn’t block waiting for completion
- Agent B can process the task when resources are available
- Progress updates provide visibility into long-running operations
- Results are delivered when the work is complete
This asynchronicity enables:
- Better resource utilization as agents aren’t blocked waiting
- Improved scalability as systems can handle more concurrent operations
- Enhanced resilience as temporary agent unavailability doesn’t block the entire system
2. Rich Task Semantics (Agent2Agent Protocol)
The Agent2Agent protocol defines rich task message structures that AgentHub implements:
message TaskMessage {
string task_id = 1; // Unique identifier for tracking
string task_type = 2; // Semantic type (e.g., "data_analysis")
google.protobuf.Struct parameters = 3; // Flexible parameters
string requester_agent_id = 4; // Who requested the work
string responder_agent_id = 5; // Who should do the work (optional)
google.protobuf.Timestamp deadline = 6; // When it needs to be done
Priority priority = 7; // How urgent it is
google.protobuf.Struct metadata = 8; // Additional context
}
This rich structure enables:
- Intelligent routing based on task type and agent capabilities
- Priority-based scheduling to ensure urgent tasks are handled first
- Deadline awareness for time-sensitive operations
- Context preservation for better decision-making
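To illustrate how the priority and deadline fields could drive scheduling, here is a small sketch built on the standard `container/heap` package. The `Task` struct is a simplified stand-in for `TaskMessage`, and the ordering rule (higher priority first, earlier deadline as tie-breaker) is an assumption rather than AgentHub's documented behavior.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// Task keeps only the fields needed for ordering (simplified stand-in).
type Task struct {
	ID       string
	Priority int // higher means more urgent (assumed ordering)
	Deadline time.Time
}

// taskQueue implements heap.Interface.
type taskQueue []*Task

func (q taskQueue) Len() int { return len(q) }
func (q taskQueue) Less(i, j int) bool {
	if q[i].Priority != q[j].Priority {
		return q[i].Priority > q[j].Priority
	}
	return q[i].Deadline.Before(q[j].Deadline)
}
func (q taskQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
func (q *taskQueue) Push(x any)   { *q = append(*q, x.(*Task)) }
func (q *taskQueue) Pop() any {
	old := *q
	n := len(old)
	t := old[n-1]
	*q = old[:n-1]
	return t
}

// drainOrder returns task IDs in the order a scheduler would pick them.
func drainOrder(tasks []*Task) []string {
	q := &taskQueue{}
	for _, t := range tasks {
		heap.Push(q, t)
	}
	var order []string
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(*Task).ID)
	}
	return order
}

func main() {
	now := time.Now()
	order := drainOrder([]*Task{
		{ID: "report", Priority: 1, Deadline: now.Add(time.Hour)},
		{ID: "alert", Priority: 3, Deadline: now.Add(2 * time.Hour)},
		{ID: "backup", Priority: 1, Deadline: now.Add(10 * time.Minute)},
	})
	fmt.Println(order) // [alert backup report]
}
```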
3. Explicit Progress Tracking
Long-running tasks benefit from explicit progress reporting:
message TaskProgress {
string task_id = 1; // Which task this refers to
TaskStatus status = 2; // Current status
string progress_message = 3; // Human-readable description
int32 progress_percentage = 4; // Quantitative progress (0-100)
google.protobuf.Struct progress_data = 5; // Structured progress information
}
This enables:
- Visibility into system operations for monitoring and debugging
- User experience improvements with real-time progress indicators
- Resource planning by understanding how long operations typically take
- Early failure detection when progress stalls unexpectedly
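Early failure detection can be as simple as remembering when each task's percentage last changed. The sketch below is illustrative only: `progressTracker` and its methods are hypothetical names, and treating an unchanged percentage as "no progress" is an assumption about how stalls should be defined.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// progressTracker records the last time each task's percentage changed.
type progressTracker struct {
	pct     map[string]int32
	changed map[string]time.Time
}

func newProgressTracker() *progressTracker {
	return &progressTracker{pct: map[string]int32{}, changed: map[string]time.Time{}}
}

// report stores an update; repeating the same percentage is not progress.
func (p *progressTracker) report(taskID string, pct int32, at time.Time) {
	if old, seen := p.pct[taskID]; seen && old == pct {
		return
	}
	p.pct[taskID] = pct
	p.changed[taskID] = at
}

// stalled lists unfinished tasks whose percentage has not moved for longer
// than maxQuiet, sorted for stable output.
func (p *progressTracker) stalled(now time.Time, maxQuiet time.Duration) []string {
	var ids []string
	for id, at := range p.changed {
		if p.pct[id] < 100 && now.Sub(at) > maxQuiet {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}

// exampleStalled replays a fixed timeline: "import" keeps moving,
// "train" stays at 40%.
func exampleStalled() []string {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	t := newProgressTracker()
	t.report("import", 10, start)
	t.report("train", 40, start)
	t.report("import", 60, start.Add(4*time.Minute))
	t.report("train", 40, start.Add(4*time.Minute)) // heartbeat without progress
	return t.stalled(start.Add(6*time.Minute), 5*time.Minute)
}

func main() {
	fmt.Println(exampleStalled()) // [train]
}
```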
4. Flexible Agent Addressing
The protocol supports multiple addressing patterns:
- Direct addressing: Tasks sent to specific agents by ID
- Broadcast addressing: Tasks sent to all capable agents
- Capability-based routing: Tasks routed based on agent capabilities
- Load-balanced routing: Tasks distributed among agents with similar capabilities
This flexibility enables different architectural patterns within the same system.
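The addressing patterns above can be compared side by side in a toy router. Everything here is a hypothetical sketch: `Agent`, `Router`, and the round-robin policy are assumptions chosen for illustration, not AgentHub's routing code.

```go
package main

import "fmt"

// Agent is a simplified registry entry (assumption, not the AgentHub type).
type Agent struct {
	ID     string
	Skills []string
}

func (a Agent) can(capability string) bool {
	for _, s := range a.Skills {
		if s == capability {
			return true
		}
	}
	return false
}

// Router keeps a per-capability cursor for round-robin selection.
type Router struct {
	agents []Agent
	cursor map[string]int
}

func NewRouter(agents []Agent) *Router {
	return &Router{agents: agents, cursor: map[string]int{}}
}

// Direct addresses one agent by ID; ok is false when the ID is unknown.
func (r *Router) Direct(id string) (string, bool) {
	for _, a := range r.agents {
		if a.ID == id {
			return a.ID, true
		}
	}
	return "", false
}

// Broadcast returns every agent that advertises the capability.
func (r *Router) Broadcast(capability string) []string {
	var ids []string
	for _, a := range r.agents {
		if a.can(capability) {
			ids = append(ids, a.ID)
		}
	}
	return ids
}

// LoadBalanced picks one capable agent per call, rotating round-robin.
func (r *Router) LoadBalanced(capability string) (string, bool) {
	capable := r.Broadcast(capability)
	if len(capable) == 0 {
		return "", false
	}
	i := r.cursor[capability] % len(capable)
	r.cursor[capability] = i + 1
	return capable[i], true
}

func main() {
	r := NewRouter([]Agent{
		{ID: "translator_1", Skills: []string{"translate"}},
		{ID: "translator_2", Skills: []string{"translate"}},
		{ID: "summarizer", Skills: []string{"summarize"}},
	})
	fmt.Println(r.Broadcast("translate")) // [translator_1 translator_2]
	for i := 0; i < 3; i++ {
		id, _ := r.LoadBalanced("translate")
		fmt.Println(id) // translator_1, translator_2, translator_1
	}
}
```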
Architectural Patterns
Microservices Enhancement
In a microservices architecture, Agent2Agent can enhance service communication by:
- Replacing synchronous HTTP calls with asynchronous task delegation
- Adding progress visibility to long-running service operations
- Enabling service composition through task chaining
- Improving resilience through task retry and timeout mechanisms
Event-Driven Architecture Integration
Agent2Agent complements event-driven architectures by:
- Adding structure to event processing with explicit task semantics
- Enabling bidirectional communication where events can trigger tasks that produce responses
- Providing progress tracking for complex event processing workflows
- Supporting task-based coordination alongside pure event broadcasting
Workflow Orchestration
Complex business processes can be modeled as Agent2Agent workflows:
- Process Initiation: A workflow agent receives a high-level business request
- Task Decomposition: The request is broken down into specific tasks
- Agent Coordination: Tasks are distributed to specialized agents
- Progress Aggregation: Individual task progress is combined into overall workflow status
- Result Assembly: Task results are combined into a final business outcome
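For the progress aggregation step, the simplest policy is an unweighted average of per-task percentages. The function below is a sketch under that assumption; `overallProgress` is a hypothetical helper, and a real workflow agent might weight tasks by expected duration instead.

```go
package main

import "fmt"

// overallProgress returns the unweighted mean of per-task percentages,
// rounded down. An empty workflow reports 0.
func overallProgress(taskPct map[string]int32) int32 {
	if len(taskPct) == 0 {
		return 0
	}
	var sum int32
	for _, pct := range taskPct {
		sum += pct
	}
	return sum / int32(len(taskPct))
}

func main() {
	fmt.Println(overallProgress(map[string]int32{
		"inventory_check": 100,
		"payment":         50,
		"fulfillment":     0,
	})) // 50
}
```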
Benefits and Trade-offs
Benefits
Scalability: Asynchronous operation and agent autonomy enable horizontal scaling without central bottlenecks.
Resilience: Agent failures don’t cascade as easily since tasks can be retried or redistributed.
Flexibility: New agent types can be added without modifying existing agents.
Observability: Rich task semantics and progress reporting provide excellent visibility into system operations.
Modularity: Agents can be developed, deployed, and scaled independently.
Trade-offs
Complexity: The system requires more sophisticated error handling and state management compared to simple request-response patterns.
Latency: For simple operations, the overhead of task creation and routing may add latency compared to direct calls.
Debugging: Distributed, asynchronous operations can be more challenging to debug than synchronous call chains.
Consistency: Managing data consistency across asynchronous agent operations requires careful design.
When to Use Agent2Agent
Agent2Agent is particularly well-suited for:
Complex Processing Pipelines
When work involves multiple steps that can be performed by different specialized agents:
- Data ingestion → validation → transformation → analysis → reporting
- Image upload → virus scan → thumbnail generation → metadata extraction
- Order processing → inventory check → payment processing → fulfillment
Long-Running Operations
When operations take significant time and users need progress feedback:
- Large file processing
- Machine learning model training
- Complex data analysis
- Batch job processing
Dynamic Load Distribution
When workload characteristics vary and different agents may be better suited for different tasks:
- Multi-tenant systems with varying customer requirements
- Resource-intensive operations that need specialized hardware
- Geographic distribution where local processing is preferred
System Integration
When connecting heterogeneous systems that need to coordinate:
- Third-party service coordination
- Cross-platform workflows
Comparison with Other Patterns
vs. Message Queues
Traditional message queues provide asynchronous communication but lack:
- Rich task semantics
- Progress tracking
- Bidirectional result delivery
- Priority and deadline awareness
vs. RPC/HTTP APIs
RPC and HTTP APIs provide structured communication but are typically:
- Synchronous (blocking)
- Lacking progress visibility
- Point-to-point rather than flexible routing
- Without built-in retry and timeout semantics
vs. Event Sourcing
Event sourcing provides audit trails and state reconstruction but:
- Focuses on state changes rather than work coordination
- Lacks explicit progress tracking
- Doesn’t provide direct task completion feedback
- Requires more complex query patterns for current state
The SubAgent Library: Simplifying Agent Development
While the Agent2Agent protocol and AgentHub broker provide powerful capabilities for building distributed agent systems, implementing agents from scratch requires significant boilerplate code. The SubAgent library addresses this by providing a high-level abstraction that handles infrastructure concerns, letting developers focus on business logic.
The Problem: Too Much Boilerplate
Traditional agent implementation requires:
- ~200+ lines of setup code: gRPC client configuration, connection management, health checks
- A2A protocol compliance: Correct AgentCard structure with all required fields
- Subscription management: Setting up task streams and handling lifecycle
- Observability integration: Manual tracing span creation, logging, metrics
- Error handling: Graceful shutdown, signal handling, resource cleanup
This creates several issues:
- High barrier to entry: New agents require deep knowledge of the infrastructure
- Code duplication: Every agent reimplements the same patterns
- Maintenance burden: Infrastructure changes require updates across all agents
- Inconsistent quality: Some agents may have better observability or error handling than others
The Solution: Infrastructure as a Library
The SubAgent library encapsulates all infrastructure concerns into a simple, composable API:
// 1. Configure your agent
config := &subagent.Config{
AgentID: "my_agent",
Name: "My Agent",
Description: "Does something useful",
}
// 2. Create and register skills
agent, _ := subagent.New(config)
agent.MustAddSkill("Skill Name", "Description", handlerFunc)
// 3. Run (everything else is automatic)
agent.Run(ctx)
This reduces agent implementation from ~200 lines to ~50 lines (75% reduction), letting developers focus entirely on their domain logic.
Architecture
The SubAgent library implements a layered architecture:
┌─────────────────────────────────────────┐
│ Your Business Logic │
│ (Handler Functions: ~30 lines) │
├─────────────────────────────────────────┤
│ SubAgent Library │
│ - Config & Validation │
│ - AgentCard Creation (A2A compliant) │
│ - Task Subscription & Routing │
│ - Automatic Observability │
│ - Lifecycle Management │
├─────────────────────────────────────────┤
│ AgentHub Client Library │
│ - gRPC Connection │
│ - Message Publishing/Subscription │
│ - TraceManager, Metrics, Logging │
├─────────────────────────────────────────┤
│ AgentHub Broker │
│ - Event Routing │
│ - Agent Registry │
│ - Task Distribution │
└─────────────────────────────────────────┘
Key Features
1. Declarative Configuration
Instead of imperative setup code, agents use declarative configuration:
config := &subagent.Config{
AgentID: "agent_translator", // Required
Name: "Translation Agent", // Required
Description: "Translates text", // Required
Version: "1.0.0", // Optional, defaults
HealthPort: "8087", // Optional, defaults
}
The library:
- Validates all required fields
- Applies sensible defaults for optional fields
- Returns clear error messages for configuration issues
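A rough sketch of how such validation and defaulting might look follows. The `Config` struct here is a local stand-in, `Prepare` is a hypothetical method name, and the default values for `Version` and `HealthPort` are assumptions, since the library's actual defaults are not stated in this document.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Config is a local stand-in for subagent.Config (assumption).
type Config struct {
	AgentID, Name, Description string // required
	Version, HealthPort        string // optional
}

// Prepare reports every missing required field at once, then fills
// optional fields that were left empty.
func (c *Config) Prepare() error {
	var missing []string
	if c.AgentID == "" {
		missing = append(missing, "AgentID")
	}
	if c.Name == "" {
		missing = append(missing, "Name")
	}
	if c.Description == "" {
		missing = append(missing, "Description")
	}
	if len(missing) > 0 {
		return errors.New("missing required fields: " + strings.Join(missing, ", "))
	}
	if c.Version == "" {
		c.Version = "1.0.0" // assumed default
	}
	if c.HealthPort == "" {
		c.HealthPort = "8080" // assumed default
	}
	return nil
}

func main() {
	bad := &Config{AgentID: "agent_translator"}
	fmt.Println(bad.Prepare())

	good := &Config{AgentID: "agent_translator", Name: "Translation Agent", Description: "Translates text"}
	fmt.Println(good.Prepare(), good.Version, good.HealthPort)
}
```

Listing all missing fields in one error saves a fix-and-rerun cycle per field.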
2. Skill-Based Programming Model
Agents define capabilities as “skills” - discrete units of functionality:
agent.MustAddSkill(
"Language Translation", // Name (shown to LLM)
"Translates text between languages", // Description
translateHandler, // Implementation
)
Each skill maps to a handler function with a clear signature:
func(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string)
This model:
- Encourages single-responsibility design
- Makes capabilities explicit and discoverable
- Simplifies testing (handlers are pure functions)
- Enables skill-based task routing
3. Automatic A2A Compliance
The library generates complete, A2A-compliant AgentCards:
// Developer writes:
agent.MustAddSkill("Translate", "Translates text", handler)
// Library generates:
&pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "agent_translator",
Description: "Translation Agent",
Version: "1.0.0",
Skills: []*pb.AgentSkill{
{
Id: "skill_0",
Name: "Translate",
Description: "Translates text",
Tags: []string{"Translate"},
InputModes: []string{"text/plain"},
OutputModes: []string{"text/plain"},
},
},
Capabilities: &pb.AgentCapabilities{
Streaming: false,
PushNotifications: false,
},
}
This ensures all agents follow protocol standards without manual effort.
4. Built-In Observability
Every task execution is automatically wrapped with observability:
Tracing:
// Automatic span creation for each task
taskSpan := traceManager.StartSpan(ctx, "agent.{agentID}.handle_task")
traceManager.AddA2ATaskAttributes(taskSpan, taskID, skillName, contextID, ...)
traceManager.SetSpanSuccess(taskSpan) // or RecordError()
Logging:
// Automatic structured logging
logger.InfoContext(ctx, "Processing task", "task_id", taskID, "skill", skillName)
logger.ErrorContext(ctx, "Task failed", "error", err)
Metrics (via AgentHubClient metrics):
- Task processing duration
- Success/failure counts
- Active task count
Developers get full distributed tracing and logging without writing any observability code.
5. Lifecycle Management
The library handles the complete agent lifecycle:
Startup:
- Validate configuration
- Connect to broker (with retries)
- Register AgentCard
- Subscribe to tasks
- Start health check server
- Signal “ready”
Runtime:
- Receive tasks from broker
- Route to appropriate handler
- Execute with tracing/logging
- Publish results
- Handle errors gracefully
Shutdown:
- Catch SIGINT/SIGTERM signals
- Stop accepting new tasks
- Wait for in-flight tasks (with timeout)
- Close broker connection
- Cleanup resources
- Exit cleanly
All automatically - developers never write lifecycle code.
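For readers curious what the shutdown steps involve, the sketch below shows one common way to implement them in Go with `signal.NotifyContext` and a `sync.WaitGroup`. It is not the SubAgent library's code: `drain` and `drainDemo` are hypothetical helpers, and the demo proceeds after a short timer so it terminates without an actual signal.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// drain waits for in-flight tasks to finish, giving up after timeout.
// It reports whether every task completed in time.
func drain(inFlight *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		inFlight.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// drainDemo starts one simulated task of the given length and drains it.
func drainDemo(taskLen, timeout time.Duration) bool {
	var inFlight sync.WaitGroup
	inFlight.Add(1)
	go func() {
		defer inFlight.Done()
		time.Sleep(taskLen)
	}()
	return drain(&inFlight, timeout)
}

func main() {
	// ctx is cancelled when the process receives SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	select {
	case <-ctx.Done(): // signal received: begin shutdown
	case <-time.After(100 * time.Millisecond): // demo only: proceed without a signal
	}
	fmt.Println("drained cleanly:", drainDemo(20*time.Millisecond, time.Second))
}
```

The timeout matters: without it, a single stuck handler would keep the process alive indefinitely after the signal.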
Design Patterns
The Handler Pattern
Handlers are pure functions that transform inputs to outputs:
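// The result list must start on the same line as the closing parenthesis;
// otherwise Go's automatic semicolon insertion breaks the signature.
// Enum constants are assumed to follow protoc-gen-go naming (pb.TaskState_...).
func myHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
	// Extract input
	input := extractInput(message)
	// Validate
	if err := validate(input); err != nil {
		return nil, pb.TaskState_TASK_STATE_FAILED, err.Error()
	}
	// Process
	result := process(ctx, input)
	// Create artifact
	artifact := createArtifact(result)
	return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}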
This pattern:
- Testable: Pure functions are easy to unit test
- Composable: Handlers can call other functions
- Error handling: Explicit return of state and error message
- Context-aware: Receives context for cancellation and tracing
The Configuration Pattern
Configuration is separated from code:
// Development
config := &subagent.Config{
AgentID: "my_agent",
HealthPort: "8080",
}
// Production (from environment)
config := &subagent.Config{
AgentID: os.Getenv("AGENT_ID"),
BrokerAddr: os.Getenv("BROKER_ADDR"),
HealthPort: os.Getenv("HEALTH_PORT"),
}
This enables:
- Different configs for dev/staging/prod
- Easy testing with mock configs
- Container-friendly (12-factor app)
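Reading settings straight from `os.Getenv` yields empty strings when a variable is unset. A small fallback helper keeps development defaults and production overrides in one place. In this sketch `envOr` and `configFromEnv` are hypothetical helpers, `Config` is a local stand-in for `subagent.Config`, and the default broker address is an assumption.

```go
package main

import (
	"fmt"
	"os"
)

// envOr returns the value of key, or fallback when the variable is unset
// or empty.
func envOr(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return fallback
}

// Config is a local stand-in for subagent.Config (assumption).
type Config struct {
	AgentID, BrokerAddr, HealthPort string
}

// configFromEnv builds a Config from the environment with development
// defaults for anything that is not set.
func configFromEnv() Config {
	return Config{
		AgentID:    envOr("AGENT_ID", "my_agent"),
		BrokerAddr: envOr("BROKER_ADDR", "localhost:50051"), // assumed default
		HealthPort: envOr("HEALTH_PORT", "8080"),
	}
}

func main() {
	fmt.Printf("%+v\n", configFromEnv())
}
```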
Benefits
For Developers:
- Faster development: 75% less code to write
- Lower complexity: Focus on business logic, not infrastructure
- Better quality: Automatic best practices (observability, error handling)
- Easier testing: Handler functions are pure and testable
- Clearer structure: Skill-based organization is intuitive
For Operations:
- Consistent observability: All agents have same tracing/logging
- Standard health checks: Uniform health endpoints
- Predictable behavior: Lifecycle management is consistent
- Easy monitoring: Metrics are built-in
- Reliable shutdown: Graceful handling is automatic
For the System:
- Better integration: All agents follow same patterns
- Easier debugging: Consistent trace structure across agents
- Simplified maintenance: Library updates improve all agents
- Reduced errors: Less custom code means fewer bugs
Evolution Path
The SubAgent library provides a clear evolution path for agent development:
Phase 1: Simple Agents (Current)
- Single skills, synchronous processing
- Text input/output
- Uses library defaults
Phase 2: Advanced Agents
- Multiple skills per agent
- Streaming responses
- Custom capabilities
- Extended AgentCard fields
Phase 3: Specialized Agents
- Custom observability (additional traces/metrics)
- Advanced error handling
- Multi-modal input/output
- Stateful processing
The library supports all phases through its extensibility points (GetClient(), GetLogger(), custom configs).
Comparison with Manual Implementation
| Aspect | Manual Implementation | SubAgent Library |
|---|---|---|
| Lines of Code | ~200 lines setup | ~50 lines total |
| Configuration | 50+ lines imperative | 10 lines declarative |
| AgentCard | Manual struct creation | Automatic generation |
| Observability | Manual span/log calls | Automatic wrapping |
| Lifecycle | Custom signal handling | Built-in management |
| Error Handling | Scattered throughout | Centralized in library |
| Testing | Must mock infrastructure | Test handlers directly |
| Maintenance | Per-agent updates needed | Library update benefits all |
| Learning Curve | High (need infrastructure knowledge) | Low (focus on logic) |
| Time to First Agent | Several hours | Under 30 minutes |
Real-World Impact
The Echo Agent demonstrates the library’s impact:
Before SubAgent Library (211 lines):
- Manual client setup: 45 lines
- AgentCard creation: 30 lines
- Task subscription: 60 lines
- Handler implementation: 50 lines
- Lifecycle management: 26 lines
With SubAgent Library (82 lines):
- Configuration: 10 lines
- Skill registration: 5 lines
- Handler implementation: 50 lines
- Run: 2 lines
- Everything else: automatic
The business logic (50 lines) stays the same, while the code around it shrinks from 161 lines to 32.
When to Use SubAgent Library
Use SubAgent Library when:
- Building new agents from scratch
- Agent has 1-10 skills with clear boundaries
- Standard A2A protocol is sufficient
- You want consistent observability across agents
- Quick development time is important
Consider Manual Implementation when:
- Highly custom protocol requirements
- Need very specific lifecycle control
- Existing agent migration (may not be worth refactoring)
- Experimental/research agents with non-standard patterns
For most agent development, the SubAgent library is the right choice.
Future Evolution
The Agent2Agent principle opens possibilities for:
Intelligent Agent Networks
Agents that learn about each other’s capabilities and performance characteristics to make better delegation decisions.
Self-Organizing Systems
Agent networks that automatically reconfigure based on workload patterns and agent availability.
Cross-Organization Collaboration
Extending Agent2Agent protocols across organizational boundaries for B2B workflow automation.
AI Agent Integration
Natural integration points for AI agents that can understand task semantics and make autonomous decisions about task acceptance and delegation.
The Agent2Agent principle represents a foundational shift toward more intelligent, autonomous, and collaborative software systems that can handle the complexity of modern distributed applications while providing the visibility and control that operators need.