Core Concepts

Fundamental concepts and principles of AgentHub

Core Concepts

Explore the fundamental concepts, principles, and mental models that underpin AgentHub’s agent-to-agent communication system.

Available Documentation

1 - Agent2Agent (A2A) Protocol Migration

Understanding the migration to Agent2Agent protocol compliance while maintaining Event-Driven Architecture benefits.

Agent2Agent (A2A) Protocol Migration

This document explains the migration of AgentHub to full Agent2Agent (A2A) protocol compliance while maintaining the essential Event-Driven Architecture (EDA) patterns that make the system scalable and resilient.

What is the Agent2Agent Protocol?

The Agent2Agent (A2A) protocol is a standardized specification for communication between AI agents. It defines:

  • Standardized Message Formats: Using Message, Part, Task, and Artifact structures
  • Task Lifecycle Management: Clear states (SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED)
  • Agent Discovery: Using AgentCard for capability advertisement
  • Interoperability: Ensuring agents can communicate across different platforms

Why Migrate to A2A?

Benefits of A2A Compliance

  1. Interoperability: AgentHub can now communicate with any A2A-compliant agent or system
  2. Standardization: Clear, well-defined message formats reduce integration complexity
  3. Ecosystem Compatibility: Join the growing ecosystem of A2A-compatible tools
  4. Future-Proofing: Built on industry standards rather than custom protocols

Maintained EDA Benefits

  • Scalability: Event-driven routing scales to thousands of agents
  • Resilience: Asynchronous communication handles network partitions gracefully
  • Flexibility: Topic-based routing and priority queues enable sophisticated workflows
  • Observability: Built-in tracing and metrics for production deployments

Hybrid Architecture

AgentHub implements a hybrid approach that combines the best of both worlds:

┌─────────────────────────────────────────────────────────────────┐
│                   A2A Protocol Layer                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────┐│
│  │ A2A Message │  │  A2A Task   │  │ A2A Artifact│  │A2A Agent││
│  │  (standard) │  │ (standard)  │  │ (standard)  │  │  Card   ││
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────┘│
├─────────────────────────────────────────────────────────────────┤
│                    EDA Transport Layer                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────┐│
│  │ AgentEvent  │  │Event Router │  │ Subscribers │  │Priority ││
│  │  Wrapper    │  │             │  │  Manager    │  │ Queues  ││
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────┘│
├─────────────────────────────────────────────────────────────────┤
│                      gRPC Infrastructure                       │
└─────────────────────────────────────────────────────────────────┘

How It Works

  1. A2A Messages are created using standard A2A structures (Message, Task, etc.)
  2. EDA Wrapper wraps A2A messages in AgentEvent for transport (see the sketch below)
  3. Event Routing uses EDA patterns (pub/sub, priority, topics) for delivery
  4. A2A Compliance ensures messages follow A2A protocol semantics

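Conceptually, each A2A payload travels inside an AgentEvent envelope. The sketch below illustrates this wrapping for a task; the oneof wrapper name (AgentEvent_Task) and the exact field set are assumptions based on the AgentEvent usage shown later in this documentation.

// Illustrative only: wrap an A2A task in the EDA transport envelope.
event := &pb.AgentEvent{
    EventId:   "event_" + uuid.New().String(),
    Timestamp: timestamppb.Now(),
    Payload:   &pb.AgentEvent_Task{Task: task}, // task is an A2A Task built as shown below
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "my_agent",
        ToAgentId:   "target_agent", // empty string would broadcast
        EventType:   "task.submitted",
        Priority:    pb.Priority_PRIORITY_MEDIUM,
    },
}
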
API Changes

Before (Legacy API)

// Legacy TaskMessage (deprecated)
taskPublisher.PublishTask(ctx, &agenthub.PublishTaskRequest{
    TaskType: "greeting",
    Parameters: map[string]interface{}{
        "name": "Claude",
    },
    RequesterAgentID: "my_agent",
    ResponderAgentID: "target_agent",
})

After (A2A-Compliant API)

// A2A-compliant task publishing
content := []*pb.Part{
    {
        Part: &pb.Part_Text{
            Text: "Hello! Please provide a greeting for Claude.",
        },
    },
}

task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
    TaskType:         "greeting",
    Content:          content,
    RequesterAgentID: "my_agent",
    ResponderAgentID: "target_agent",
    Priority:         pb.Priority_PRIORITY_MEDIUM,
    ContextID:        "conversation_123",
})

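The returned task is a full A2A Task, so the caller can check the error and correlate follow-up events by its identifiers:

if err != nil {
    log.Fatalf("failed to publish task: %v", err)
}
log.Printf("published task %s in context %s", task.Id, task.ContextId)
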
Message Structure Changes

A2A Message Format

message Message {
  string message_id = 1;       // Unique message identifier
  string context_id = 2;       // Conversation context
  string task_id = 3;          // Associated task (optional)
  Role role = 4;               // USER or AGENT
  repeated Part content = 5;   // Message content parts
  google.protobuf.Struct metadata = 6; // Additional metadata
}

message Part {
  oneof part {
    string text = 1;           // Text content
    DataPart data = 2;         // Structured data
    FilePart file = 3;         // File reference
  }
}

A2A Task Format

message Task {
  string id = 1;                    // Task identifier
  string context_id = 2;            // Conversation context
  TaskStatus status = 3;            // Current status
  repeated Message history = 4;     // Message history
  repeated Artifact artifacts = 5;  // Task outputs
  google.protobuf.Struct metadata = 6; // Task metadata
}

enum TaskState {
  TASK_STATE_SUBMITTED = 0;    // Task created
  TASK_STATE_WORKING = 1;      // Task in progress
  TASK_STATE_COMPLETED = 2;    // Task completed successfully
  TASK_STATE_FAILED = 3;       // Task failed
  TASK_STATE_CANCELLED = 4;    // Task cancelled
}

Migration Guide

For Publishers

  1. Replace TaskPublisher with A2ATaskPublisher
  2. Use A2APublishTaskRequest with A2A Part structures
  3. Handle returned A2A Task objects

For Subscribers

  1. Replace TaskSubscriber with A2ATaskSubscriber
  2. Update handlers to process A2A Task and Message objects
  3. Return A2A Artifact objects instead of custom results (see the handler sketch below)

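A minimal handler sketch following these steps (the exact A2ATaskSubscriber registration API is not shown here; the handler shape mirrors the one used by the SubAgent library later in this documentation):

func handleGreeting(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
    // Read the request text from the A2A message parts.
    var request string
    for _, part := range message.Content {
        if text := part.GetText(); text != "" {
            request = text
            break
        }
    }

    // Return an A2A Artifact instead of a custom result type.
    artifact := &pb.Artifact{
        Name:        "greeting",
        Description: "Greeting response",
        Parts: []*pb.Part{
            {Part: &pb.Part_Text{Text: "Hello! You asked: " + request}},
        },
    }
    return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}
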
For Custom Integrations

  1. Update protobuf imports to use events/a2a package
  2. Replace custom message structures with A2A equivalents
  3. Use AgentHub service instead of EventBus

Backward Compatibility

The migration maintains wire-level compatibility through:

  • Deprecated Types: Legacy message types marked as deprecated but still supported
  • Automatic Conversion: EDA broker converts between legacy and A2A formats when needed
  • Graceful Migration: Existing agents can migrate incrementally

Testing A2A Compliance

Run the demo to verify A2A compliance:

# Terminal 1: Start A2A broker
make run-server

# Terminal 2: Start A2A subscriber
make run-subscriber

# Terminal 3: Start A2A publisher
make run-publisher

Expected output shows successful A2A task processing:

  • Publisher: “Published A2A task”
  • Subscriber: “Task processing completed”
  • Artifacts generated in A2A format

Best Practices

  1. Use A2A Types: Always use A2A message structures for new code
  2. Context Management: Use context_id to group related messages
  3. Proper Parts: Structure content using appropriate Part types
  4. Artifact Returns: Return structured Artifact objects from tasks
  5. Status Updates: Properly manage task lifecycle states

The A2A migration ensures AgentHub remains both standards-compliant and highly scalable through its hybrid EDA+A2A architecture.

2 - Understanding Tasks in Agent2Agent Communication

Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. Deep dive into task semantics, lifecycle, and design patterns.

Understanding Tasks in Agent2Agent Communication

Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. This document provides a deep dive into task semantics, lifecycle, and design patterns.

Task Anatomy

Core Components

Every task in the Agent2Agent system consists of several key components that define its identity, purpose, and execution context:

A2A Task Identity

string id = 1;                         // Unique task identifier
string context_id = 2;                 // Optional conversation context

The id serves as a unique identifier that allows all participants to track the task throughout its lifecycle. It should be globally unique and meaningful for debugging purposes.

The context_id groups related tasks in a conversation or workflow context, enabling sophisticated multi-task coordination patterns.

Task classification in A2A is handled through the initial Message content rather than a separate task_type field, providing more flexibility for complex task descriptions.

A2A Task Status and History

TaskStatus status = 3;                 // Current task status
repeated Message history = 4;          // Message history for this task
repeated Artifact artifacts = 5;       // Task output artifacts
google.protobuf.Struct metadata = 6;   // Task metadata

In A2A, task data is contained within Message content using the structured Part format:

// A2A task request message
message Message {
  string message_id = 1;
  string context_id = 2;
  string task_id = 3;
  Role role = 4;                    // USER (requester) or AGENT (responder)
  repeated Part content = 5;        // Structured task content
}

message Part {
  oneof part {
    string text = 1;               // Text description
    DataPart data = 2;             // Structured data
    FilePart file = 3;             // File references
  }
}

// Example: A2A data analysis task
taskMessage := &a2a.Message{
    MessageId: "msg_" + uuid.New().String(),
    ContextId: "analysis_workflow_123",
    TaskId:    "task_analysis_456",
    Role:      a2a.Role_USER,
    Content: []*a2a.Part{
        {
            Part: &a2a.Part_Text{
                Text: "Please perform trend analysis on Q4 sales data",
            },
        },
        {
            Part: &a2a.Part_Data{
                Data: &a2a.DataPart{
                    Data: analysisParams, // Structured parameters
                    Description: "Analysis configuration",
                },
            },
        },
    },
}

Metadata in A2A tasks provides additional context for execution, auditing, or debugging:

// A2A task metadata
taskMetadata, _ := structpb.NewStruct(map[string]interface{}{
    "workflow_id":     "workflow_abc123",
    "user_id":         "user_456",
    "request_source":  "web_ui",
    "correlation_id":  "trace_789",
    "priority":        "high",
    "expected_duration": "5m",
})

task := &a2a.Task{
    Id:        "task_analysis_456",
    ContextId: "analysis_workflow_123",
    Metadata:  taskMetadata,
}

A2A Agent Coordination

In A2A, agent coordination is handled through the EDA routing metadata:

message AgentEventMetadata {
  string from_agent_id = 1;           // Source agent identifier
  string to_agent_id = 2;             // Target agent ID (empty = broadcast)
  string event_type = 3;              // Event classification
  repeated string subscriptions = 4;   // Topic-based routing tags
  Priority priority = 5;              // Delivery priority
}

This enables flexible routing patterns:

  • from_agent_id identifies the requesting agent
  • to_agent_id can specify a target agent or be empty for broadcast
  • subscriptions enable topic-based routing for specialized agents
  • priority ensures urgent tasks get precedence

A2A Execution Context

A2A handles execution context through the TaskStatus structure:

message TaskStatus {
  TaskState state = 1;                   // SUBMITTED, WORKING, COMPLETED, FAILED, CANCELLED
  Message update = 2;                    // Latest status message
  google.protobuf.Timestamp timestamp = 3; // Status timestamp
}

enum TaskState {
  TASK_STATE_SUBMITTED = 0;
  TASK_STATE_WORKING = 1;
  TASK_STATE_COMPLETED = 2;
  TASK_STATE_FAILED = 3;
  TASK_STATE_CANCELLED = 4;
}

This context helps agents make intelligent scheduling decisions:

  • state shows where the task is in its lifecycle before an agent commits resources
  • timestamp enables age-based scheduling policies
  • priority (carried in the EDA routing metadata) provides explicit urgency ranking, while deadlines can be conveyed through task metadata for time-sensitive prioritization

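For example, an agent can derive a task's age from the status timestamp when applying age-based policies (illustrative; maxQueueAge is a hypothetical agent setting):

// Escalate tasks that have waited too long in the submitted state.
age := time.Since(task.Status.Timestamp.AsTime())
if task.Status.State == a2a.TaskState_TASK_STATE_SUBMITTED && age > a.maxQueueAge {
    // Bump effective priority or process immediately.
}
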
Task Lifecycle

1. A2A Task Creation and Publishing

A2A tasks begin their lifecycle when a requesting agent creates a task with an initial message:

// Create A2A task with initial request message
taskID := "task_analysis_" + uuid.New().String()
task := &a2a.Task{
    Id:        taskID,
    ContextId: "workflow_orchestration_123",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            MessageId: "msg_" + uuid.New().String(),
            TaskId:    taskID, // must match the task's Id above
            Role:      a2a.Role_USER,
            Content: []*a2a.Part{
                {
                    Part: &a2a.Part_Text{
                        Text: "Please analyze the quarterly sales data for trends",
                    },
                },
                {
                    Part: &a2a.Part_Data{
                        Data: &a2a.DataPart{
                            Data: analysisParams,
                            Description: "Analysis configuration",
                        },
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    },
}

// Publish to AgentHub broker
client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
    Task: task,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "data_orchestrator",
        ToAgentId:   "data_processor_01", // Optional: specific agent
        EventType:   "task.submitted",
        Priority:    pb.Priority_PRIORITY_HIGH,
    },
})

2. A2A Task Discovery and Acceptance

Agents subscribe to A2A task events and evaluate whether to accept them:

// Agent receives A2A task event
func (a *Agent) evaluateA2ATask(event *pb.AgentEvent) bool {
    task := event.GetTask()
    if task == nil || task.Status.State != a2a.TaskState_TASK_STATE_SUBMITTED {
        return false
    }

    // Analyze task content to understand requirements
    requestMessage := task.Status.Update
    taskDescription := a.extractTaskDescription(requestMessage)

    // Check if agent can handle this task type
    if !a.canHandleTaskType(taskDescription) {
        return false
    }

    // Check capacity constraints
    if a.getCurrentLoad() > a.maxCapacity {
        return false
    }

    // Estimate duration from task content and metadata
    estimatedDuration := a.estimateA2ATaskDuration(task)
    if estimatedDuration > a.maxTaskDuration {
        return false
    }

    return true
}

func (a *Agent) extractTaskDescription(msg *a2a.Message) string {
    for _, part := range msg.Content {
        if textPart := part.GetText(); textPart != "" {
            return textPart
        }
    }
    return ""
}

3. A2A Task Execution with Progress Reporting

Accepted A2A tasks enter the execution phase with regular status updates:

func (a *Agent) executeA2ATask(task *a2a.Task) {
    // Update task to WORKING state
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Task started")

    // Phase 1: Preparation
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Preparing data analysis")
    prepareResult := a.prepareA2AExecution(task)

    // Phase 2: Main processing
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Processing data - 50% complete")
    processResult := a.processA2AData(prepareResult)

    // Phase 3: Finalization
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Finalizing results - 75% complete")
    finalResult := a.finalizeA2AResults(processResult)

    // Completion with artifacts
    a.completeTaskWithArtifacts(task, finalResult)
}

func (a *Agent) updateTaskStatus(task *a2a.Task, state a2a.TaskState, message string) {
    statusUpdate := &a2a.Message{
        MessageId: "msg_" + uuid.New().String(),
        TaskId:    task.Id,
        Role:      a2a.Role_AGENT,
        Content: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: message,
                },
            },
        },
    }

    task.Status = &a2a.TaskStatus{
        State:     state,
        Update:    statusUpdate,
        Timestamp: timestamppb.Now(),
    }

    // Publish task update
    a.client.PublishTaskUpdate(context.Background(), &pb.PublishTaskUpdateRequest{
        Task: task,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: a.agentId,
            EventType:   "task.status_update",
        },
    })
}

4. A2A Result Delivery

A2A task completion delivers results through structured artifacts:

func (a *Agent) completeTaskWithArtifacts(task *a2a.Task, resultData interface{}) {
    // Create completion message
    completionMessage := &a2a.Message{
        MessageId: "msg_" + uuid.New().String(),
        TaskId:    task.Id,
        Role:      a2a.Role_AGENT,
        Content: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: "Analysis completed successfully",
                },
            },
        },
    }

    // Create result artifact
    resultArtifact := &a2a.Artifact{
        ArtifactId:  "artifact_" + uuid.New().String(),
        Name:        "Analysis Results",
        Description: "Quarterly sales trend analysis",
        Parts: []*a2a.Part{
            {
                Part: &a2a.Part_Data{
                    Data: &a2a.DataPart{
                        Data:        resultData.(*structpb.Struct),
                        Description: "Analysis results and metrics",
                    },
                },
            },
        },
    }

    // Update task to completed
    task.Status = &a2a.TaskStatus{
        State:     a2a.TaskState_TASK_STATE_COMPLETED,
        Update:    completionMessage,
        Timestamp: timestamppb.Now(),
    }
    task.Artifacts = append(task.Artifacts, resultArtifact)

    // Publish final task update
    a.client.PublishTaskUpdate(context.Background(), &pb.PublishTaskUpdateRequest{
        Task: task,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: a.agentId,
            EventType:   "task.completed",
        },
    })

    // Publish artifact separately
    a.client.PublishTaskArtifact(context.Background(), &pb.PublishTaskArtifactRequest{
        TaskId:   task.Id,
        Artifact: resultArtifact,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: a.agentId,
            EventType:   "task.artifact",
        },
    })
}

A2A Task Design Patterns

1. Simple A2A Request-Response

The most basic pattern where one agent requests work from another using A2A messages:

Agent A ──[A2A Task]──> AgentHub ──[TaskEvent]──> Agent B
Agent A <─[Artifact]─── AgentHub <─[TaskUpdate]── Agent B

A2A Implementation:

// Agent A creates task
task := &a2a.Task{
    Id: "simple_task_123",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Convert CSV to JSON"}}},
        },
    },
}

// Agent B responds with artifact
artifact := &a2a.Artifact{
    Name: "Converted Data",
    Parts: []*a2a.Part{{Part: &a2a.Part_File{File: &a2a.FilePart{FileId: "converted.json"}}}},
}

Use cases:

  • File format conversion
  • Simple calculations
  • Data validation
  • Content generation

2. A2A Broadcast Processing

One agent broadcasts a task to multiple potential processors using A2A context-aware routing:

Agent A ──[A2A Task]──> AgentHub ──[TaskEvent]──> Agent B₁
                                ├─[TaskEvent]──> Agent B₂
                                └─[TaskEvent]──> Agent B₃

A2A Implementation:

// Broadcast task with shared context
task := &a2a.Task{
    Id:        "broadcast_task_456",
    ContextId: "parallel_processing_context",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{
                {Part: &a2a.Part_Text{Text: "Process data chunk"}},
                {Part: &a2a.Part_Data{Data: &a2a.DataPart{Data: chunkData}}},
            },
        },
    },
}

// Publish without specific target (broadcast)
client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
    Task: task,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "orchestrator",
        // No ToAgentId = broadcast
        EventType: "task.broadcast",
    },
})

Use cases:

  • Distributed computation
  • Load testing
  • Content distribution
  • Parallel processing

3. A2A Pipeline Processing

Tasks flow through a series of specialized agents using shared A2A context:

Agent A ──[A2A Task₁]──> Agent B ──[A2A Task₂]──> Agent C ──[A2A Task₃]──> Agent D
       <──[Final Artifact]───────────────────────────────────────────────────┘

A2A Implementation:

// Shared context for pipeline
pipelineContext := "data_pipeline_" + uuid.New().String()

// Stage 1: Data extraction
task1 := &a2a.Task{
    Id:        "extract_" + uuid.New().String(),
    ContextId: pipelineContext,
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Extract data from source"}}},
        },
    },
}

// Stage 2: Data transformation (triggered by Stage 1 completion)
task2 := &a2a.Task{
    Id:        "transform_" + uuid.New().String(),
    ContextId: pipelineContext, // Same context
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Transform extracted data"}}},
        },
    },
}

// Context linking enables pipeline coordination

Use cases:

  • Data processing pipelines
  • Image processing workflows
  • Document processing chains
  • ETL operations

4. A2A Hierarchical Decomposition

Complex tasks are broken down into subtasks using A2A context hierarchy:

Agent A ──[A2A ComplexTask]──> Coordinator
                                  ├──[A2A SubTask₁]──> Specialist₁
                                  ├──[A2A SubTask₂]──> Specialist₂
                                  └──[A2A SubTask₃]──> Specialist₃

A2A Implementation:

// Parent task
parentTask := &a2a.Task{
    Id:        "complex_analysis_789",
    ContextId: "business_workflow_123",
    Status: &a2a.TaskStatus{
        State: a2a.TaskState_TASK_STATE_SUBMITTED,
        Update: &a2a.Message{
            Role: a2a.Role_USER,
            Content: []*a2a.Part{{Part: &a2a.Part_Text{Text: "Perform comprehensive business analysis"}}},
        },
    },
}

// Coordinator creates subtasks with hierarchical context
subtask1Meta, _ := structpb.NewStruct(map[string]interface{}{
    "parent_task_id": "complex_analysis_789",
    "subtask_type":   "financial",
})
subtask1 := &a2a.Task{
    Id:        "financial_analysis_790",
    ContextId: "business_workflow_123", // Same parent context
    Metadata:  subtask1Meta,
}

subtask2Meta, _ := structpb.NewStruct(map[string]interface{}{
    "parent_task_id": "complex_analysis_789",
    "subtask_type":   "market",
})
subtask2 := &a2a.Task{
    Id:        "market_analysis_791",
    ContextId: "business_workflow_123", // Same parent context
    Metadata:  subtask2Meta,
}

// Context enables coordination and result aggregation

Use cases:

  • Complex business workflows
  • Multi-step analysis
  • Orchestrated services
  • Batch job coordination

5. Competitive Processing

Multiple agents compete to handle the same task (first-come-first-served):

Agent A ──[Task]──> Broker ──[Task]──> Agent B₁ (accepts)
                           ├─[Task]──> Agent B₂ (rejects)
                           └─[Task]──> Agent B₃ (rejects)

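A sketch of the acceptance side (illustrative; it assumes that the first agent to move the task to WORKING effectively claims it, which is a convention rather than a broker-enforced guarantee):

// Each candidate agent evaluates the broadcast task; only one accepts.
func (a *Agent) maybeClaimTask(event *pb.AgentEvent) {
    task := event.GetTask()
    if task == nil || !a.evaluateA2ATask(event) {
        return // Decline silently; another agent will claim the task.
    }
    // Claiming is signalled by publishing a WORKING status update.
    a.updateTaskStatus(task, a2a.TaskState_TASK_STATE_WORKING, "Task claimed by "+a.agentId)
    a.executeA2ATask(task)
}
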
Use cases:

  • Resource-constrained environments
  • Load balancing
  • Fault tolerance
  • Performance optimization

A2A Task Content and Semantics

A2A Message-Based Classification

In A2A, task classification is handled through message content rather than rigid type fields, providing more flexibility:

Content-Based Classification

// Data processing task
message := &a2a.Message{
    Content: []*a2a.Part{
        {Part: &a2a.Part_Text{Text: "Analyze quarterly sales data for trends"}},
        {Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Analysis parameters"}}},
    },
}

// Image processing task
message := &a2a.Message{
    Content: []*a2a.Part{
        {Part: &a2a.Part_Text{Text: "Generate product image with specifications"}},
        {Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Image requirements"}}},
    },
}

// Notification task
message := &a2a.Message{
    Content: []*a2a.Part{
        {Part: &a2a.Part_Text{Text: "Send completion notification to user"}},
        {Part: &a2a.Part_Data{Data: &a2a.DataPart{Description: "Notification details"}}},
    },
}

Operation-Based Classification

create.*        - Creation operations
update.*        - Modification operations
delete.*        - Removal operations
analyze.*       - Analysis operations
transform.*     - Transformation operations

Complexity-Based Classification

simple.*        - Quick, low-resource tasks
standard.*      - Normal processing tasks
complex.*       - Resource-intensive tasks
background.*    - Long-running batch tasks

A2A Content Design Guidelines

Be Explicit: Include all information needed for execution in structured Parts

// Good: Explicit A2A content
// structpb.NewStruct returns (*structpb.Struct, error), so build the parameters first.
conversionParams, _ := structpb.NewStruct(map[string]interface{}{
    "source_format":   "csv",
    "target_format":   "json",
    "include_headers": true,
    "delimiter":       ",",
    "encoding":        "utf-8",
})

content := []*a2a.Part{
    {
        Part: &a2a.Part_Text{
            Text: "Convert CSV file to JSON format with specific options",
        },
    },
    {
        Part: &a2a.Part_Data{
            Data: &a2a.DataPart{
                Data:        conversionParams,
                Description: "Conversion parameters",
            },
        },
    },
    {
        Part: &a2a.Part_File{
            File: &a2a.FilePart{
                FileId:   "source_data.csv",
                Filename: "data.csv",
                MimeType: "text/csv",
            },
        },
    },
}

// Poor: Ambiguous A2A content
content := []*a2a.Part{
    {
        Part: &a2a.Part_Text{
            Text: "Convert file", // Too vague
        },
    },
}

Use Standard Data Types: Leverage common formats for interoperability

// Good: Standard formats
{
  "timestamp": "2024-01-15T10:30:00Z",      // ISO 8601
  "amount": "123.45",                        // String for precision
  "coordinates": {"lat": 40.7128, "lng": -74.0060}
}

Include Validation Information: Help agents validate inputs

{
  "email": "user@example.com",
  "email_format": "rfc5322",
  "max_length": 254,
  "required": true
}

A2A Error Handling and Edge Cases

A2A Task Rejection

Agents should provide meaningful rejection reasons using A2A message format:

func (a *Agent) rejectA2ATask(task *a2a.Task, reason string) {
    // structpb.NewStruct returns (*structpb.Struct, error), so build the details first.
    rejectionDetails, _ := structpb.NewStruct(map[string]interface{}{
        "rejection_reason": reason,
        "agent_id":         a.agentId,
        "timestamp":        time.Now().Unix(),
    })

    // Create rejection message
    rejectionMessage := &a2a.Message{
        MessageId: "msg_" + uuid.New().String(),
        TaskId:    task.Id,
        Role:      a2a.Role_AGENT,
        Content: []*a2a.Part{
            {
                Part: &a2a.Part_Text{
                    Text: "Task rejected: " + reason,
                },
            },
            {
                Part: &a2a.Part_Data{
                    Data: &a2a.DataPart{
                        Data:        rejectionDetails,
                    },
                },
            },
        },
    }

    // Update task status to failed
    task.Status = &a2a.TaskStatus{
        State:     a2a.TaskState_TASK_STATE_FAILED,
        Update:    rejectionMessage,
        Timestamp: timestamppb.Now(),
    }

    a.publishTaskUpdate(task)
}

Common rejection reasons:

  • UNSUPPORTED_TASK_TYPE: Agent doesn’t handle this task type
  • CAPACITY_EXCEEDED: Agent is at maximum capacity
  • DEADLINE_IMPOSSIBLE: Cannot complete within deadline
  • INVALID_PARAMETERS: Task parameters are malformed
  • RESOURCE_UNAVAILABLE: Required external resources unavailable

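In practice an agent pairs its acceptance checks with rejection, passing one of these reason codes (illustrative):

if a.getCurrentLoad() > a.maxCapacity {
    a.rejectA2ATask(task, "CAPACITY_EXCEEDED")
    return
}
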
Timeout Handling

Both requesters and processors should handle timeouts gracefully:

// Requester timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

select {
case result := <-resultChannel:
    // Process result
case <-ctx.Done():
    // Handle timeout - possibly retry or fail
}

// Processor timeout (shown with the legacy TaskMessage API, which carries an explicit deadline field)
func (a *Agent) executeWithTimeout(task *pb.TaskMessage) {
    deadline := task.GetDeadline().AsTime()
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()

    select {
    case result := <-a.processTask(ctx, task):
        a.publishResult(task, result, pb.TaskStatus_TASK_STATUS_COMPLETED, "")
    case <-ctx.Done():
        a.publishResult(task, nil, pb.TaskStatus_TASK_STATUS_FAILED, "Deadline exceeded")
    }
}

Partial Results

For long-running tasks, consider supporting partial results:

type PartialResult struct {
    TaskId           string
    CompletedPortion float64     // 0.0 to 1.0
    IntermediateData interface{}
    CanResume        bool
    ResumeToken      string
}

Best Practices

Task Design

  1. Make task types granular but not too fine-grained
  2. Design for idempotency when possible
  3. Include retry information in metadata
  4. Use consistent parameter naming across similar task types
  5. Version your task schemas to enable evolution

Performance Considerations

  1. Batch related tasks when appropriate
  2. Use appropriate priority levels to avoid starvation
  3. Set realistic deadlines based on historical performance
  4. Include resource hints to help with scheduling
  5. Monitor task completion rates to identify bottlenecks

Security Considerations

  1. Validate all task parameters before processing
  2. Sanitize user-provided data in task parameters
  3. Include authorization context in metadata
  4. Log task execution for audit trails
  5. Encrypt sensitive parameters when necessary

A2A tasks form the foundation of Agent2Agent communication, enabling sophisticated distributed processing patterns through structured messages, artifacts, and context-aware coordination. The A2A protocol’s flexible message format and EDA integration provide robust, scalable agent networks with clear semantics and strong observability. Proper A2A task design leverages the protocol’s strengths for building maintainable, interoperable agent systems.

3 - Agent Discovery Workflow Explained

Agent Discovery Workflow Explained

This document explains how the agent discovery workflow operates in AgentHub, enabling dynamic registration and LLM-based orchestration.

Overview

Agent discovery is the process by which agents dynamically register their capabilities with the Cortex orchestrator, making themselves available for intelligent task delegation via an LLM (Large Language Model).

The Problem This Solves

Traditional multi-agent systems require:

  • Hard-coded agent configurations
  • Static routing rules
  • Manual updates when adding new agents
  • No intelligence in task routing

Agent discovery with Cortex provides:

  • Dynamic registration: Agents announce themselves when they start
  • Intelligent routing: LLM decides which agent to use based on capabilities
  • Zero configuration: No central registry to update
  • Scalable: Add or remove agents without system changes

How It Works

The Five-Step Flow

┌──────────────┐
│ 1. Agent     │ Agent starts and creates an AgentCard
│    Startup   │ describing its capabilities
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ 2. Register  │ Agent calls RegisterAgent RPC
│    with      │ sending the AgentCard to broker
│    Broker    │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ 3. Event     │ Broker publishes AgentCardEvent
│    Publishing│ broadcasting to all subscribers
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ 4. Cortex    │ Cortex receives event and stores
│    Discovery │ agent in its registry
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ 5. LLM       │ Agent is now available in LLM
│   Integration│ prompts for intelligent delegation
└──────────────┘

Step 1: Agent Startup

When an agent starts, it creates an AgentCard that describes:

agentCard := &pb.AgentCard{
    Name:        "agent_translator",
    Description: "Language translation service",
    Version:     "1.0.0",
    Skills: []*pb.AgentSkill{
        {
            Name:        "Text Translation",
            Description: "Translates text between languages",
            Examples: []string{
                "Translate this to Spanish",
                "Convert to French",
            },
        },
    },
}

Key Components:

  • Name: Unique identifier (used for routing)
  • Description: What the agent does (helps LLM understand)
  • Skills: Specific capabilities with examples (used for matching)

Step 2: Registration with Broker

The agent registers by calling the broker’s RegisterAgent RPC:

client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
    AgentCard:     agentCard,
    Subscriptions: []string{"translation_request"},
})

What happens:

  1. Broker validates the AgentCard
  2. Stores agent in its registry: registeredAgents[agentID] = card
  3. Returns success response

Step 3: Event Publishing

The broker immediately publishes an AgentCardEvent:

event := &pb.AgentEvent{
    EventId:   "agent_registered_translator_...",
    Timestamp: timestamppb.Now(),
    Payload: &pb.AgentEvent_AgentCard{
        AgentCard: &pb.AgentCardEvent{
            AgentId:   "agent_translator",
            AgentCard: agentCard,
            EventType: "registered",
        },
    },
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "agent_translator",
        ToAgentId:   "", // Broadcast
        EventType:   "agent.registered",
        Priority:    pb.Priority_PRIORITY_HIGH,
    },
}

Routing characteristics:

  • Broadcast to all subscribers (empty ToAgentId)
  • High priority (processed immediately)
  • Event type clearly marked as “agent.registered”

Step 4: Cortex Discovery

Cortex subscribes to agent events:

stream, _ := client.SubscribeToAgentEvents(ctx, &pb.SubscribeToAgentEventsRequest{
    AgentId:    "cortex",
    EventTypes: []string{"agent.registered", "agent.updated"},
})

When receiving an agent card event, Cortex:

func handleAgentCardEvent(event *pb.AgentCardEvent) {
    agentID := event.GetAgentId()
    agentCard := event.GetAgentCard()

    // Store agent
    cortex.RegisterAgent(agentID, agentCard)

    // Log skills for visibility
    log.Info("Agent registered",
        "agent_id", agentID,
        "skills", extractSkillNames(agentCard.Skills))
}

Result: Agent is now in Cortex’s registeredAgents map.

Step 5: LLM Integration

When a user sends a request, Cortex queries the LLM:

decision, _ := llm.Decide(
    conversationHistory,
    availableAgents,  // Includes our new agent!
    newUserMessage,
)

The LLM sees:

Available agents:
- agent_translator: Language translation service
  Skills:
    * Text Translation: Translates text between languages
      Examples: "Translate this to Spanish", "Convert to French"

Decision making:

  1. User asks: “Can you translate this to Spanish?”
  2. LLM sees “agent_translator” with matching examples
  3. LLM decides: Delegate to agent_translator
  4. Cortex sends task to agent_translator
  5. Agent processes and responds
  6. Cortex synthesizes final response

Message Flow Diagram

sequenceDiagram
    participant A as Translation Agent
    participant B as Broker
    participant C as Cortex
    participant L as LLM (VertexAI)
    participant U as User

    Note over A: Step 1: Startup
    A->>A: Create AgentCard

    Note over A,B: Step 2: Registration
    A->>B: RegisterAgent(card)
    B->>B: Store in registry

    Note over B: Step 3: Event Publishing
    B->>C: AgentCardEvent (broadcast)

    Note over C: Step 4: Discovery
    C->>C: RegisterAgent(id, card)
    C->>C: total_agents++

    Note over U,L: Step 5: LLM Integration
    U->>C: "Translate to Spanish"
    C->>L: Decide(availableAgents)

    Note over L: Sees translator agent<br/>with matching examples

    L-->>C: {delegate: agent_translator}
    C->>A: Task message
    A->>A: Process translation
    A->>C: Result
    C->>L: Synthesize
    L-->>C: Final response
    C->>U: "Here's the Spanish: ..."

Technical Implementation Details

Thread Safety

Agent registration is thread-safe:

type AgentHubService struct {
    registeredAgents map[string]*pb.AgentCard
    agentsMu         sync.RWMutex
}

func (s *AgentHubService) RegisterAgent(...) {
    s.agentsMu.Lock()
    s.registeredAgents[agentID] = card
    s.agentsMu.Unlock()
}

Multiple agents can register concurrently without conflicts.

Event Delivery

Events are delivered asynchronously:

for _, subChan := range targetChannels {
    go func(ch chan *pb.AgentEvent) {
        select {
        case ch <- event:
            // Delivered
        case <-time.After(5 * time.Second):
            // Timeout
        }
    }(subChan)
}

Benefits:

  • Non-blocking: Broker doesn’t wait for all deliveries
  • Resilient: Timeout prevents hanging
  • Concurrent: Multiple subscribers receive events in parallel

LLM Prompt Generation

Cortex builds prompts dynamically:

func buildOrchestrationPrompt(availableAgents []*pb.AgentCard) string {
    prompt := "Available agents:\n"

    for _, agent := range availableAgents {
        prompt += fmt.Sprintf("- %s: %s\n",
            agent.Name, agent.Description)

        if len(agent.Skills) > 0 {
            prompt += "  Skills:\n"
        }
        for _, skill := range agent.Skills {
            prompt += fmt.Sprintf("    * %s: %s\n",
                skill.Name, skill.Description)
        }
    }

    return prompt
}

Updated automatically when new agents register.

Timing and Performance

Typical timings for agent discovery:

Agent startup:        100-200ms
RegisterAgent RPC:    < 10ms
Event publishing:     < 5ms
Event delivery:       < 50ms
Cortex processing:    < 10ms
Total discovery time: < 300ms

Fast enough that agents are available for routing within a few hundred milliseconds of starting.

Error Handling

Registration Failures

If registration fails:

_, err := client.RegisterAgent(ctx, req)
if err != nil {
    log.Error("Registration failed", "error", err)
    // Agent should retry or exit
    panic(err)
}

Common causes:

  • Broker not running
  • Network issues
  • Invalid AgentCard (empty name)

Event Delivery Failures

If event delivery fails:

if err := s.routeEvent(ctx, event); err != nil {
    log.Warn("Event routing failed", "error", err)
    // Continue anyway - registration still succeeded
}

Graceful degradation: Registration succeeds even if event routing fails.

Cortex Not Subscribed

If Cortex isn’t subscribed yet:

  • Events are still published
  • Cortex can query GetAgentCard() RPC later
  • Or register when Cortex starts

Resilient: System handles various startup orders.

Observability

Broker Logs

level=INFO msg="Agent registered" agent_id=agent_translator
level=DEBUG msg="Routing event to subscribers"
    event_type=agent.registered subscriber_count=2
level=DEBUG msg="Event delivered to subscriber"

Cortex Logs

level=INFO msg="Received agent card event"
    agent_id=agent_translator event_type=registered
level=INFO msg="Agent skills registered"
    skills="[Text Translation: Translates...]"
level=INFO msg="Agent registered with Cortex orchestrator"
    total_agents=3

Distributed Tracing

Agent registration creates trace spans:

agent_registered_translator
  └─ broker.route_event
      ├─ deliver_to_cortex
      └─ deliver_to_monitor

Visibility into the entire discovery flow.

Lifecycle Management

Agent Startup Sequence

1. Create AgentHub client
2. Connect to broker
3. Create AgentCard
4. Call RegisterAgent
5. Subscribe to messages
6. Enter processing loop

Agent Shutdown

Currently agents don’t explicitly unregister. For graceful shutdown:

// In future enhancement:
defer client.UnregisterAgent(ctx, &pb.UnregisterAgentRequest{
    AgentId: myAgentID,
})

This would trigger an “agent.unregistered” event.

Agent Updates

To update capabilities:

// Modify AgentCard
agentCard.Skills = append(agentCard.Skills, newSkill)

// Re-register
client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
    AgentCard: agentCard,
})

// Triggers "agent.updated" event

Cortex receives update and refreshes its registry.

Comparison with Other Patterns

vs. Service Discovery (Consul, etcd)

Agent Discovery:

  • Includes capability metadata (skills)
  • Optimized for LLM consumption
  • Event-driven notification
  • Rich semantic information

Service Discovery:

  • Network location only
  • Health checks
  • Static metadata
  • Pull-based queries

vs. API Gateway

Agent Discovery:

  • Dynamic routing based on content
  • LLM makes intelligent decisions
  • Supports complex multi-step workflows

API Gateway:

  • Path-based routing
  • Static configuration
  • Single request-response

vs. Message Queues

Agent Discovery:

  • Agents know their capabilities
  • Centralized intelligence (Cortex)
  • Rich metadata for decisions

Message Queues:

  • Topic-based routing
  • No central intelligence
  • Minimal metadata

Design Decisions

Why Broadcast Events?

Decision: Publish agent cards to all subscribers

Alternatives considered:

  • Point-to-point to Cortex only
  • Store-and-query model

Rationale:

  • Multiple orchestrators can coexist
  • Monitoring agents can track all agents
  • Extensible for future use cases
  • Low overhead (events are small)

Why High Priority?

Decision: Agent registration events use PRIORITY_HIGH

Rationale:

  • New agents should be available quickly
  • User requests may come immediately
  • Discovery is time-sensitive
  • Low volume (not many registrations)

Why Skills with Examples?

Decision: Include example user requests in skills

Rationale:

  • LLMs learn by example
  • Natural language is ambiguous
  • Examples disambiguate capabilities
  • Improves matching accuracy

Future Enhancements

See AGENT_DECIDE.md for planned improvements:

  • Agent Health Monitoring: Track agent availability
  • Agent Deregistration: Explicit removal from registry
  • Agent Versioning: Support multiple versions simultaneously
  • Capability Queries: Search agents by capability
  • Load Balancing: Distribute work among multiple instances

Conclusion

The agent discovery workflow enables:

  1. Zero-configuration agent deployment
  2. Intelligent routing via LLM
  3. Dynamic scaling of agent pools
  4. Automatic orchestration based on capabilities
  5. Flexible, extensible multi-agent systems

This architecture supports truly autonomous, self-organizing agent networks that can adapt to changing requirements without manual intervention.

4 - The Agent2Agent Protocol and AgentHub Implementation

The Agent2Agent Protocol and AgentHub Implementation

This document explores the core principles of Google’s Agent2Agent protocol and how AgentHub implements a communication broker based on these concepts. We distinguish between the Agent2Agent protocol specification (task structures and communication patterns) and our custom AgentHub broker implementation.

Agent2Agent vs AgentHub: What’s What

Agent2Agent Protocol (Google)

The Agent2Agent protocol defines:

  • Task Message Structures: TaskMessage, TaskResult, TaskProgress with their fields and semantics
  • Task Status and Priority Enums: Standardized task lifecycle and priority levels
  • Communication Patterns: Asynchronous task delegation and result reporting concepts

AgentHub Implementation (This Project)

AgentHub provides:

  • Event Bus Broker: Centralized gRPC service that routes tasks between agents
  • Pub/Sub Architecture: Publisher-subscriber pattern for task distribution
  • Subscription Mechanisms: SubscribeToTasks, SubscribeToTaskResults, SubscribeToTaskProgress methods
  • Agent Implementations: Sample publisher and subscriber agents demonstrating the protocol

Philosophy and Core Concepts

Beyond Simple Request-Response

Traditional software architectures rely heavily on synchronous request-response patterns where a client requests a service and waits for an immediate response. While effective for simple operations, this pattern has limitations when dealing with:

  • Complex, multi-step processes that require coordination between multiple specialized services
  • Long-running operations that may take minutes or hours to complete
  • Dynamic workload distribution where the best processor for a task may vary over time
  • Autonomous decision-making where agents need to collaborate without central coordination

The Agent2Agent protocol addresses these limitations by defining task structures and communication patterns for autonomous agents. AgentHub implements a broker-based system that enables agents to communicate using Agent2Agent-inspired task structures:

  1. Delegating work to other agents based on their capabilities
  2. Accepting and processing tasks according to their specializations
  3. Reporting progress during long-running operations
  4. Making collaborative decisions about task distribution and execution

Autonomous Collaboration

In an Agent2Agent system, each agent operates with a degree of autonomy, making decisions about:

  • Which tasks to accept based on current capacity and capabilities
  • How to prioritize work when multiple tasks are pending
  • When to delegate subtasks to other specialized agents
  • How to report progress and handle failures

This autonomy enables the system to be more resilient, scalable, and adaptive compared to centrally-controlled architectures.

Key Design Principles

1. Asynchronous Communication

Agent2Agent communication is fundamentally asynchronous. When Agent A requests work from Agent B:

  • Agent A doesn’t block waiting for completion
  • Agent B can process the task when resources are available
  • Progress updates provide visibility into long-running operations
  • Results are delivered when the work is complete

This asynchronicity enables:

  • Better resource utilization as agents aren’t blocked waiting
  • Improved scalability as systems can handle more concurrent operations
  • Enhanced resilience as temporary agent unavailability doesn’t block the entire system

2. Rich Task Semantics (Agent2Agent Protocol)

The Agent2Agent protocol defines rich task message structures that AgentHub implements:

message TaskMessage {
  string task_id = 1;                    // Unique identifier for tracking
  string task_type = 2;                  // Semantic type (e.g., "data_analysis")
  google.protobuf.Struct parameters = 3; // Flexible parameters
  string requester_agent_id = 4;         // Who requested the work
  string responder_agent_id = 5;         // Who should do the work (optional)
  google.protobuf.Timestamp deadline = 6; // When it needs to be done
  Priority priority = 7;                 // How urgent it is
  google.protobuf.Struct metadata = 8;   // Additional context
}

This rich structure enables:

  • Intelligent routing based on task type and agent capabilities
  • Priority-based scheduling to ensure urgent tasks are handled first
  • Deadline awareness for time-sensitive operations
  • Context preservation for better decision-making

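A sketch of constructing such a task message in Go (field names follow the proto definition above; the parameter values are illustrative):

params, _ := structpb.NewStruct(map[string]interface{}{
    "dataset": "q4_sales",
    "method":  "trend_analysis",
})

taskMsg := &pb.TaskMessage{
    TaskId:           "task_" + uuid.New().String(),
    TaskType:         "data_analysis",
    Parameters:       params,
    RequesterAgentId: "orchestrator",
    ResponderAgentId: "data_processor_01", // optional; leave empty for broker routing
    Deadline:         timestamppb.New(time.Now().Add(10 * time.Minute)),
    Priority:         pb.Priority_PRIORITY_HIGH,
}
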
3. Explicit Progress Tracking

Long-running tasks benefit from explicit progress reporting:

message TaskProgress {
  string task_id = 1;                    // Which task this refers to
  TaskStatus status = 2;                 // Current status
  string progress_message = 3;           // Human-readable description
  int32 progress_percentage = 4;         // Quantitative progress (0-100)
  google.protobuf.Struct progress_data = 5; // Structured progress information
}

This enables:

  • Visibility into system operations for monitoring and debugging
  • User experience improvements with real-time progress indicators
  • Resource planning by understanding how long operations typically take
  • Early failure detection when progress stalls unexpectedly

4. Flexible Agent Addressing

The protocol supports multiple addressing patterns:

  • Direct addressing: Tasks sent to specific agents by ID
  • Broadcast addressing: Tasks sent to all capable agents
  • Capability-based routing: Tasks routed based on agent capabilities
  • Load-balanced routing: Tasks distributed among agents with similar capabilities

This flexibility enables different architectural patterns within the same system.

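AgentHub's broker realizes direct and broadcast addressing through the routing metadata shown earlier in this documentation; capability- and load-based routing build on the same fields (sketch):

// Direct addressing: route the task to one specific agent.
direct := &pb.AgentEventMetadata{
    FromAgentId: "orchestrator",
    ToAgentId:   "data_processor_01",
    EventType:   "task.submitted",
}

// Broadcast addressing: leave ToAgentId empty so all capable subscribers receive it.
broadcast := &pb.AgentEventMetadata{
    FromAgentId: "orchestrator",
    ToAgentId:   "", // empty = broadcast
    EventType:   "task.broadcast",
}
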
Architectural Patterns

Microservices Enhancement

In a microservices architecture, Agent2Agent can enhance service communication by:

  • Replacing synchronous HTTP calls with asynchronous task delegation
  • Adding progress visibility to long-running service operations
  • Enabling service composition through task chaining
  • Improving resilience through task retry and timeout mechanisms

Event-Driven Architecture Integration

Agent2Agent complements event-driven architectures by:

  • Adding structure to event processing with explicit task semantics
  • Enabling bidirectional communication where events can trigger tasks that produce responses
  • Providing progress tracking for complex event processing workflows
  • Supporting task-based coordination alongside pure event broadcasting

Workflow Orchestration

Complex business processes can be modeled as Agent2Agent workflows:

  1. Process Initiation: A workflow agent receives a high-level business request
  2. Task Decomposition: The request is broken down into specific tasks
  3. Agent Coordination: Tasks are distributed to specialized agents
  4. Progress Aggregation: Individual task progress is combined into overall workflow status
  5. Result Assembly: Task results are combined into a final business outcome

Benefits and Trade-offs

Benefits

Scalability: Asynchronous operation and agent autonomy enable horizontal scaling without central bottlenecks.

Resilience: Agent failures don’t cascade as easily since tasks can be retried or redistributed.

Flexibility: New agent types can be added without modifying existing agents.

Observability: Rich task semantics and progress reporting provide excellent visibility into system operations.

Modularity: Agents can be developed, deployed, and scaled independently.

Trade-offs

Complexity: The system requires more sophisticated error handling and state management compared to simple request-response patterns.

Latency: For simple operations, the overhead of task creation and routing may add latency compared to direct calls.

Debugging: Distributed, asynchronous operations can be more challenging to debug than synchronous call chains.

Consistency: Managing data consistency across asynchronous agent operations requires careful design.

When to Use Agent2Agent

Agent2Agent is particularly well-suited for:

Complex Processing Pipelines

When work involves multiple steps that can be performed by different specialized agents:

  • Data ingestion → validation → transformation → analysis → reporting
  • Image upload → virus scan → thumbnail generation → metadata extraction
  • Order processing → inventory check → payment processing → fulfillment

Long-Running Operations

When operations take significant time and users need progress feedback:

  • Large file processing
  • Machine learning model training
  • Complex data analysis
  • Batch job processing

Dynamic Load Distribution

When workload characteristics vary and different agents may be better suited for different tasks:

  • Multi-tenant systems with varying customer requirements
  • Resource-intensive operations that need specialized hardware
  • Geographic distribution where local processing is preferred

System Integration

When connecting heterogeneous systems that need to coordinate:

  • Third-party service coordination
  • Cross-platform workflows

Comparison with Other Patterns

vs. Message Queues

Traditional message queues provide asynchronous communication but lack:

  • Rich task semantics
  • Progress tracking
  • Bidirectional result delivery
  • Priority and deadline awareness

vs. RPC/HTTP APIs

RPC and HTTP APIs provide structured communication but are typically:

  • Synchronous (blocking)
  • Lacking progress visibility
  • Point-to-point rather than flexible routing
  • Without built-in retry and timeout semantics

vs. Event Sourcing

Event sourcing provides audit trails and state reconstruction but:

  • Focuses on state changes rather than work coordination
  • Lacks explicit progress tracking
  • Doesn’t provide direct task completion feedback
  • Requires more complex query patterns for current state

The SubAgent Library: Simplifying Agent Development

While the Agent2Agent protocol and AgentHub broker provide powerful capabilities for building distributed agent systems, implementing agents from scratch requires significant boilerplate code. The SubAgent library addresses this by providing a high-level abstraction that handles infrastructure concerns, letting developers focus on business logic.

The Problem: Too Much Boilerplate

Traditional agent implementation requires:

  • ~200+ lines of setup code: gRPC client configuration, connection management, health checks
  • A2A protocol compliance: Correct AgentCard structure with all required fields
  • Subscription management: Setting up task streams and handling lifecycle
  • Observability integration: Manual tracing span creation, logging, metrics
  • Error handling: Graceful shutdown, signal handling, resource cleanup

This creates several issues:

  • High barrier to entry: New agents require deep knowledge of the infrastructure
  • Code duplication: Every agent reimplements the same patterns
  • Maintenance burden: Infrastructure changes require updates across all agents
  • Inconsistent quality: Some agents may have better observability or error handling than others

The Solution: Infrastructure as a Library

The SubAgent library encapsulates all infrastructure concerns into a simple, composable API:

// 1. Configure your agent
config := &subagent.Config{
    AgentID:     "my_agent",
    Name:        "My Agent",
    Description: "Does something useful",
}

// 2. Create and register skills
agent, _ := subagent.New(config)
agent.MustAddSkill("Skill Name", "Description", handlerFunc)

// 3. Run (everything else is automatic)
agent.Run(ctx)

This reduces agent implementation from ~200 lines to ~50 lines (75% reduction), letting developers focus entirely on their domain logic.

Architecture

The SubAgent library implements a layered architecture:

┌─────────────────────────────────────────┐
│         Your Business Logic             │
│    (Handler Functions: ~30 lines)       │
├─────────────────────────────────────────┤
│         SubAgent Library                │
│  - Config & Validation                  │
│  - AgentCard Creation (A2A compliant)   │
│  - Task Subscription & Routing          │
│  - Automatic Observability              │
│  - Lifecycle Management                 │
├─────────────────────────────────────────┤
│      AgentHub Client Library            │
│  - gRPC Connection                      │
│  - Message Publishing/Subscription      │
│  - TraceManager, Metrics, Logging       │
├─────────────────────────────────────────┤
│         AgentHub Broker                 │
│  - Event Routing                        │
│  - Agent Registry                       │
│  - Task Distribution                    │
└─────────────────────────────────────────┘

Key Features

1. Declarative Configuration

Instead of imperative setup code, agents use declarative configuration:

config := &subagent.Config{
    AgentID:     "agent_translator",     // Required
    Name:        "Translation Agent",    // Required
    Description: "Translates text",      // Required
    Version:     "1.0.0",                // Optional, defaults
    HealthPort:  "8087",                 // Optional, defaults
}

The library:

  • Validates all required fields
  • Applies sensible defaults for optional fields
  • Returns clear error messages for configuration issues

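Checking the error returned by subagent.New surfaces these validation messages at startup (usage sketch):

agent, err := subagent.New(config)
if err != nil {
    log.Fatalf("invalid agent configuration: %v", err)
}
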
2. Skill-Based Programming Model

Agents define capabilities as “skills” - discrete units of functionality:

agent.MustAddSkill(
    "Language Translation",              // Name (shown to LLM)
    "Translates text between languages", // Description
    translateHandler,                    // Implementation
)

Each skill maps to a handler function with a clear signature:

func (ctx, task, message) -> (artifact, state, errorMessage)

This model:

  • Encourages single-responsibility design
  • Makes capabilities explicit and discoverable
  • Simplifies testing (handlers are pure functions)
  • Enables skill-based task routing

3. Automatic A2A Compliance

The library generates complete, A2A-compliant AgentCards:

// Developer writes:
agent.MustAddSkill("Translate", "Translates text", handler)

// Library generates:
&pb.AgentCard{
    ProtocolVersion: "0.2.9",
    Name:            "agent_translator",
    Description:     "Translation Agent",
    Version:         "1.0.0",
    Skills: []*pb.AgentSkill{
        {
            Id:          "skill_0",
            Name:        "Translate",
            Description: "Translates text",
            Tags:        []string{"Translate"},
            InputModes:  []string{"text/plain"},
            OutputModes: []string{"text/plain"},
        },
    },
    Capabilities: &pb.AgentCapabilities{
        Streaming:         false,
        PushNotifications: false,
    },
}

This ensures all agents follow protocol standards without manual effort.

4. Built-In Observability

Every task execution is automatically wrapped with observability:

Tracing:

// Automatic span creation for each task
taskSpan := traceManager.StartSpan(ctx, "agent.{agentID}.handle_task")
traceManager.AddA2ATaskAttributes(taskSpan, taskID, skillName, contextID, ...)
traceManager.SetSpanSuccess(taskSpan)  // or RecordError()

Logging:

// Automatic structured logging
logger.InfoContext(ctx, "Processing task", "task_id", taskID, "skill", skillName)
logger.ErrorContext(ctx, "Task failed", "error", err)

Metrics:

  • Task processing duration
  • Success/failure counts
  • Active task count
  • (via AgentHubClient metrics)

Developers get full distributed tracing and logging without writing any observability code.

5. Lifecycle Management

The library handles the complete agent lifecycle:

Startup:

  1. Validate configuration
  2. Connect to broker (with retries)
  3. Register AgentCard
  4. Subscribe to tasks
  5. Start health check server
  6. Signal “ready”

Runtime:

  1. Receive tasks from broker
  2. Route to appropriate handler
  3. Execute with tracing/logging
  4. Publish results
  5. Handle errors gracefully

Shutdown:

  1. Catch SIGINT/SIGTERM signals
  2. Stop accepting new tasks
  3. Wait for in-flight tasks (with timeout)
  4. Close broker connection
  5. Cleanup resources
  6. Exit cleanly

All of this happens automatically; developers never write lifecycle code.
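
The sketch below shows what a complete agent entrypoint looks like under these assumptions: the import paths are placeholders for your own module layout, the TaskState constants follow the usual protoc-generated naming, and Run blocks until a termination signal triggers the shutdown sequence above. Note that the developer's file contains no signal handling or cleanup code:

package main

import (
    "context"
    "log"

    "example.com/yourmodule/subagent"      // placeholder import path
    pb "example.com/yourmodule/events/a2a" // placeholder import path
)

func echoHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
    // Business logic goes here; the empty artifact keeps the sketch short.
    return &pb.Artifact{}, pb.TaskState_TASK_STATE_COMPLETED, ""
}

func main() {
    agent, err := subagent.New(&subagent.Config{
        AgentID:     "agent_echo",
        Name:        "Echo Agent",
        Description: "Echoes back whatever it receives",
    })
    if err != nil {
        log.Fatal(err)
    }

    agent.MustAddSkill("Echo", "Returns the input text unchanged", echoHandler)

    // Startup, task routing, and graceful shutdown are all handled inside Run.
    agent.Run(context.Background())
}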

Design Patterns

The Handler Pattern

Handlers are pure functions that transform inputs to outputs:

func myHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {

    // Extract input from the incoming A2A message
    input := extractInput(message)

    // Validate before doing any work
    if err := validate(input); err != nil {
        return nil, pb.TaskState_TASK_STATE_FAILED, err.Error()
    }

    // Process (the business logic lives here)
    result := process(ctx, input)

    // Wrap the result in an A2A artifact
    artifact := createArtifact(result)

    return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}

This pattern:

  • Testable: Pure functions are easy to unit test
  • Composable: Handlers can call other functions
  • Error handling: Explicit return of state and error message
  • Context-aware: Receives context for cancellation and tracing
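
As an illustration of testability, the handler above can be exercised directly in a standard Go test, without a broker or gRPC connection. This sketch assumes the hypothetical validate step rejects an empty message and that the TaskState constants follow the usual protoc-generated naming:

func TestMyHandlerRejectsEmptyInput(t *testing.T) {
    // No broker, no gRPC: call the handler directly with empty A2A structures.
    artifact, state, errMsg := myHandler(context.Background(), &pb.Task{}, &pb.Message{})

    if state != pb.TaskState_TASK_STATE_FAILED {
        t.Fatalf("expected TASK_STATE_FAILED, got %v", state)
    }
    if artifact != nil {
        t.Errorf("expected no artifact on failure, got %v", artifact)
    }
    if errMsg == "" {
        t.Error("expected a non-empty error message")
    }
}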

The Configuration Pattern

Configuration is separated from code:

// Development
config := &subagent.Config{
    AgentID:    "my_agent",
    HealthPort: "8080",
}

// Production (from environment)
config := &subagent.Config{
    AgentID:    os.Getenv("AGENT_ID"),
    BrokerAddr: os.Getenv("BROKER_ADDR"),
    HealthPort: os.Getenv("HEALTH_PORT"),
}

This enables:

  • Different configs for dev/staging/prod
  • Easy testing with mock configs
  • Container-friendly (12-factor app)
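
A small helper can combine both styles: read from the environment and fall back to development defaults when a variable is unset. This sketch uses only the standard library; the default broker address is illustrative:

// getenv returns the value of key, or fallback when the variable is unset or empty.
func getenv(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

func loadConfig() *subagent.Config {
    return &subagent.Config{
        AgentID:     getenv("AGENT_ID", "my_agent"),
        Name:        getenv("AGENT_NAME", "My Agent"),
        Description: getenv("AGENT_DESCRIPTION", "Does something useful"),
        BrokerAddr:  getenv("BROKER_ADDR", "localhost:50051"), // illustrative default
        HealthPort:  getenv("HEALTH_PORT", "8080"),
    }
}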

Benefits

For Developers:

  • Faster development: 75% less code to write
  • Lower complexity: Focus on business logic, not infrastructure
  • Better quality: Automatic best practices (observability, error handling)
  • Easier testing: Handler functions are pure and testable
  • Clearer structure: Skill-based organization is intuitive

For Operations:

  • Consistent observability: All agents have same tracing/logging
  • Standard health checks: Uniform health endpoints
  • Predictable behavior: Lifecycle management is consistent
  • Easy monitoring: Metrics are built-in
  • Reliable shutdown: Graceful handling is automatic

For the System:

  • Better integration: All agents follow same patterns
  • Easier debugging: Consistent trace structure across agents
  • Simplified maintenance: Library updates improve all agents
  • Reduced errors: Less custom code means fewer bugs

Evolution Path

The SubAgent library provides a clear evolution path for agent development:

Phase 1: Simple Agents (Current)

  • Single skills, synchronous processing
  • Text input/output
  • Uses library defaults

Phase 2: Advanced Agents

  • Multiple skills per agent
  • Streaming responses
  • Custom capabilities
  • Extended AgentCard fields

Phase 3: Specialized Agents

  • Custom observability (additional traces/metrics)
  • Advanced error handling
  • Multi-modal input/output
  • Stateful processing

The library supports all phases through its extensibility points (GetClient(), GetLogger(), custom configs).
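
For instance, an advanced agent can add custom log lines on top of the automatic per-task observability by using the logger the library exposes. The sketch below only assumes what this document states: GetLogger() returns the structured logger used in the earlier examples (slog-style InfoContext calls):

logger := agent.GetLogger()

agent.MustAddSkill("Translate", "Translates text",
    func(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
        // Custom log line in addition to the automatic "Processing task" entry.
        logger.InfoContext(ctx, "Custom pre-processing step", "skill", "Translate")

        // ... business logic ...
        return &pb.Artifact{}, pb.TaskState_TASK_STATE_COMPLETED, ""
    })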

Comparison with Manual Implementation

Aspect               | Manual Implementation                | SubAgent Library
---------------------|--------------------------------------|-----------------------------
Lines of Code        | ~200 lines setup                     | ~50 lines total
Configuration        | 50+ lines imperative                 | 10 lines declarative
AgentCard            | Manual struct creation               | Automatic generation
Observability        | Manual span/log calls                | Automatic wrapping
Lifecycle            | Custom signal handling               | Built-in management
Error Handling       | Scattered throughout                 | Centralized in library
Testing              | Must mock infrastructure             | Test handlers directly
Maintenance          | Per-agent updates needed             | Library update benefits all
Learning Curve       | High (need infrastructure knowledge) | Low (focus on logic)
Time to First Agent  | Several hours                        | Under 30 minutes

Real-World Impact

The Echo Agent demonstrates the library’s impact:

Before SubAgent Library (211 lines):

  • Manual client setup: 45 lines
  • AgentCard creation: 30 lines
  • Task subscription: 60 lines
  • Handler implementation: 50 lines
  • Lifecycle management: 26 lines

With SubAgent Library (82 lines):

  • Configuration: 10 lines
  • Skill registration: 5 lines
  • Handler implementation: 50 lines
  • Run: 2 lines
  • Everything else: automatic

The business logic (50 lines) stays the same, while the 161 lines of infrastructure code shrink to roughly 30 lines of declarative setup.

When to Use SubAgent Library

Use SubAgent Library when:

  • Building new agents from scratch
  • Agent has 1-10 skills with clear boundaries
  • Standard A2A protocol is sufficient
  • You want consistent observability across agents
  • Quick development time is important

Consider Manual Implementation when:

  • Highly custom protocol requirements
  • Need very specific lifecycle control
  • Existing agent migration (may not be worth refactoring)
  • Experimental/research agents with non-standard patterns

For the vast majority of agent development, the SubAgent library is the right choice.

Future Evolution

The Agent2Agent principle opens possibilities for:

Intelligent Agent Networks

Agents that learn about each other’s capabilities and performance characteristics to make better delegation decisions.

Self-Organizing Systems

Agent networks that automatically reconfigure based on workload patterns and agent availability.

Cross-Organization Collaboration

Extending Agent2Agent protocols across organizational boundaries for B2B workflow automation.

AI Agent Integration

Natural integration points for AI agents that can understand task semantics and make autonomous decisions about task acceptance and delegation.

The Agent2Agent principle represents a foundational shift toward more intelligent, autonomous, and collaborative software systems that can handle the complexity of modern distributed applications while providing the visibility and control that operators need.