1 - A2A-Compliant AgentHub API Reference
Complete technical reference for the A2A-compliant AgentHub API, including all gRPC services, message types, and operational details.
This document provides a complete technical reference for the Agent2Agent (A2A) protocol-compliant AgentHub API, including all gRPC services, message types, and operational details.
gRPC Service Definition
The AgentHub broker implements the AgentHub service as defined in proto/eventbus.proto:
service AgentHub {
// ===== A2A Message Publishing (EDA style) =====
// PublishMessage submits an A2A message for delivery through the broker
rpc PublishMessage(PublishMessageRequest) returns (PublishResponse);
// PublishTaskUpdate notifies subscribers about A2A task state changes
rpc PublishTaskUpdate(PublishTaskUpdateRequest) returns (PublishResponse);
// PublishTaskArtifact delivers A2A task output artifacts to subscribers
rpc PublishTaskArtifact(PublishTaskArtifactRequest) returns (PublishResponse);
// ===== A2A Event Subscriptions (EDA style) =====
// SubscribeToMessages creates a stream of A2A message events for an agent
rpc SubscribeToMessages(SubscribeToMessagesRequest) returns (stream AgentEvent);
// SubscribeToTasks creates a stream of A2A task events for an agent
rpc SubscribeToTasks(SubscribeToTasksRequest) returns (stream AgentEvent);
// SubscribeToAgentEvents creates a unified stream of all events for an agent
rpc SubscribeToAgentEvents(SubscribeToAgentEventsRequest) returns (stream AgentEvent);
// ===== A2A Task Management (compatible with A2A spec) =====
// GetTask retrieves the current state of an A2A task by ID
rpc GetTask(GetTaskRequest) returns (a2a.Task);
// CancelTask cancels an active A2A task and notifies subscribers
rpc CancelTask(CancelTaskRequest) returns (a2a.Task);
// ListTasks returns A2A tasks matching the specified criteria
rpc ListTasks(ListTasksRequest) returns (ListTasksResponse);
// ===== Agent Discovery (A2A compatible) =====
// GetAgentCard returns the broker's A2A agent card for discovery
rpc GetAgentCard(google.protobuf.Empty) returns (a2a.AgentCard);
// RegisterAgent registers an agent with the broker for event routing
rpc RegisterAgent(RegisterAgentRequest) returns (RegisterAgentResponse);
}
A2A Message Types
Core A2A Types
A2A Message
Represents an A2A-compliant message for agent communication.
message Message {
string message_id = 1; // Required: Unique message identifier
string context_id = 2; // Optional: Conversation context
string task_id = 3; // Optional: Associated task
Role role = 4; // Required: USER or AGENT
repeated Part content = 5; // Required: Message content parts
google.protobuf.Struct metadata = 6; // Optional: Additional metadata
repeated string extensions = 7; // Optional: Protocol extensions
}
Field Details:
- message_id: Must be unique across all messages. Generated automatically if not provided.
- context_id: Groups related messages in a conversation or workflow.
- task_id: Links the message to a specific A2A task.
- role: Indicates whether the message is from a USER (requesting agent) or an AGENT (responding agent).
- content: Array of A2A Part structures containing the actual message content.
- metadata: Additional context for routing, processing, or debugging.
- extensions: Protocol extension identifiers for future compatibility.
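The required/optional rules above can be checked on the client before publishing. The sketch below is not AgentHub code. It uses a hypothetical plain struct `outgoingMessage` in place of the generated `pb.Message`, and a hypothetical `msg_` + random-hex ID format. The broker's own ID generation and validation may differ.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
)

// outgoingMessage is a hypothetical stand-in for the generated pb.Message,
// keeping only the fields the validation rules mention.
type outgoingMessage struct {
	MessageID string
	ContextID string   // optional
	TaskID    string   // optional
	Role      string   // "USER" or "AGENT"
	Content   []string // simplified: one entry per Part
}

// newMessageID returns "msg_" followed by 16 random bytes in hex (hypothetical format).
func newMessageID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return "msg_" + hex.EncodeToString(b), nil
}

// prepareMessage fills in a missing message_id and checks the required fields.
func prepareMessage(m *outgoingMessage) error {
	if m.MessageID == "" {
		id, err := newMessageID()
		if err != nil {
			return fmt.Errorf("generate message_id: %w", err)
		}
		m.MessageID = id
	}
	if m.Role != "USER" && m.Role != "AGENT" {
		return fmt.Errorf("invalid role %q: must be USER or AGENT", m.Role)
	}
	if len(m.Content) == 0 {
		return errors.New("content must contain at least one part")
	}
	return nil
}
```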
A2A Part
Represents content within an A2A message.
message Part {
oneof part {
string text = 1; // Text content
DataPart data = 2; // Structured data
FilePart file = 3; // File reference
}
}
message DataPart {
google.protobuf.Struct data = 1; // Structured data content
string description = 2; // Optional data description
}
message FilePart {
string file_id = 1; // File identifier or URI
string filename = 2; // Original filename
string mime_type = 3; // MIME type
int64 size_bytes = 4; // File size in bytes
google.protobuf.Struct metadata = 5; // Additional file metadata
}
A2A Task
Represents an A2A-compliant task with lifecycle management.
message Task {
string id = 1; // Required: Task identifier
string context_id = 2; // Optional: Conversation context
TaskStatus status = 3; // Required: Current task status
repeated Message history = 4; // Message history for this task
repeated Artifact artifacts = 5; // Task output artifacts
google.protobuf.Struct metadata = 6; // Task metadata
}
message TaskStatus {
TaskState state = 1; // Current task state
Message update = 2; // Status update message
google.protobuf.Timestamp timestamp = 3; // Status timestamp
}
enum TaskState {
TASK_STATE_SUBMITTED = 0; // Task created and submitted
TASK_STATE_WORKING = 1; // Task in progress
TASK_STATE_COMPLETED = 2; // Task completed successfully
TASK_STATE_FAILED = 3; // Task failed with error
TASK_STATE_CANCELLED = 4; // Task cancelled
}
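The enum lists the states but not which transitions are legal. The sketch below assumes one plausible lifecycle: SUBMITTED moves to WORKING (or fails or is cancelled early), WORKING ends in COMPLETED, FAILED or CANCELLED, and those three are terminal. The transition table is an assumption for illustration, not a rule stated by AgentHub.

```go
package main

import "fmt"

// TaskState mirrors the numeric enum values above.
type TaskState int

const (
	TaskStateSubmitted TaskState = 0
	TaskStateWorking   TaskState = 1
	TaskStateCompleted TaskState = 2
	TaskStateFailed    TaskState = 3
	TaskStateCancelled TaskState = 4
)

// allowedTransitions is an assumed lifecycle, not one defined by the proto.
var allowedTransitions = map[TaskState][]TaskState{
	TaskStateSubmitted: {TaskStateWorking, TaskStateFailed, TaskStateCancelled},
	TaskStateWorking:   {TaskStateCompleted, TaskStateFailed, TaskStateCancelled},
	// COMPLETED, FAILED and CANCELLED have no entry: they are terminal.
}

// isTerminal reports whether no further transitions are allowed from s.
func isTerminal(s TaskState) bool {
	return len(allowedTransitions[s]) == 0
}

// transition validates a state change before publishing a status update.
func transition(from, to TaskState) error {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return nil
		}
	}
	return fmt.Errorf("invalid task state transition %d -> %d", from, to)
}
```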
A2A Artifact
Represents structured output from completed tasks.
message Artifact {
string artifact_id = 1; // Required: Artifact identifier
string name = 2; // Human-readable name
string description = 3; // Artifact description
repeated Part parts = 4; // Artifact content parts
google.protobuf.Struct metadata = 5; // Artifact metadata
}
EDA Event Wrapper Types
AgentEvent
Wraps A2A messages for Event-Driven Architecture transport.
message AgentEvent {
string event_id = 1; // Unique event identifier
google.protobuf.Timestamp timestamp = 2; // Event timestamp
// A2A-compliant payload
oneof payload {
a2a.Message message = 10; // A2A Message
a2a.Task task = 11; // A2A Task
TaskStatusUpdateEvent status_update = 12; // Task status change
TaskArtifactUpdateEvent artifact_update = 13; // Artifact update
}
// EDA routing metadata
AgentEventMetadata routing = 20;
// Observability context
string trace_id = 30;
string span_id = 31;
}
AgentEventMetadata
Provides routing and delivery information for events.
message AgentEventMetadata {
string from_agent_id = 1; // Source agent identifier
string to_agent_id = 2; // Target agent ID (empty = broadcast)
string event_type = 3; // Event classification
repeated string subscriptions = 4; // Topic-based routing tags
Priority priority = 5; // Delivery priority
}
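The `to_agent_id` rule (empty means broadcast) determines who receives an event. The following sketch models that decision with a hypothetical `eventRouting` struct standing in for `AgentEventMetadata`. It assumes that the sender does not receive its own broadcast and that a direct message to an unsubscribed agent is dropped. Both assumptions may not match the broker's behaviour.

```go
package main

import "sort"

// eventRouting is a hypothetical stand-in for AgentEventMetadata.
type eventRouting struct {
	FromAgentID string
	ToAgentID   string // empty means broadcast
}

// recipients returns the agents that should receive an event, given the set
// of currently subscribed agent IDs.
func recipients(r eventRouting, subscribed map[string]bool) []string {
	if r.ToAgentID != "" {
		if subscribed[r.ToAgentID] {
			return []string{r.ToAgentID}
		}
		return nil // target not subscribed: nothing to deliver (assumption)
	}
	var out []string
	for id := range subscribed {
		if id != r.FromAgentID { // assumption: no echo back to the sender
			out = append(out, id)
		}
	}
	sort.Strings(out) // deterministic order for logging and tests
	return out
}
```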
Request/Response Messages
PublishMessageRequest
message PublishMessageRequest {
a2a.Message message = 1; // A2A message to publish
AgentEventMetadata routing = 2; // EDA routing info
}
SubscribeToTasksRequest
message SubscribeToTasksRequest {
string agent_id = 1; // Agent ID for subscription
repeated string task_types = 2; // Optional task type filter
repeated a2a.TaskState states = 3; // Optional state filter
}
GetTaskRequest
message GetTaskRequest {
string task_id = 1; // Task identifier
int32 history_length = 2; // History limit (optional)
}
API Operations
Publishing A2A Messages
PublishMessage
Publishes an A2A message for delivery through the EDA broker.
Go Example:
// Create A2A message content
content := []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Hello! Please process this request.",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: &structpb.Struct{
Fields: map[string]*structpb.Value{
"operation": structpb.NewStringValue("process_data"),
"dataset_id": structpb.NewStringValue("dataset_123"),
},
},
},
},
},
}
// Create A2A message
message := &pb.Message{
MessageId: "msg_12345",
ContextId: "conversation_abc",
TaskId: "task_67890",
Role: pb.Role_ROLE_USER,
Content: content,
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"priority": structpb.NewStringValue("high"),
},
},
}
// Publish through AgentHub and check for transport or validation errors
_, err := client.PublishMessage(ctx, &pb.PublishMessageRequest{
Message: message,
Routing: &pb.AgentEventMetadata{
FromAgentId: "requester_agent",
ToAgentId: "processor_agent",
EventType: "task_message",
Priority: pb.Priority_PRIORITY_HIGH,
},
})
if err != nil {
return fmt.Errorf("publish message: %w", err)
}
Subscribing to A2A Events
SubscribeToTasks
Creates a stream of A2A task events for an agent.
Go Example:
req := &pb.SubscribeToTasksRequest{
AgentId: "processor_agent",
TaskTypes: []string{"data_processing", "image_analysis"}, // Optional filter
}
stream, err := client.SubscribeToTasks(ctx, req)
if err != nil {
return err
}
for {
event, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
// Process different event types
switch payload := event.GetPayload().(type) {
case *pb.AgentEvent_Task:
task := payload.Task
log.Printf("Received A2A task: %s", task.GetId())
// Process the task (processA2ATask is an application-defined handler)
artifact, state, errorMsg := processA2ATask(ctx, task)
// Publish completion (publishTaskCompletion is also application-defined)
publishTaskCompletion(ctx, client, task, artifact, state, errorMsg)
case *pb.AgentEvent_StatusUpdate:
update := payload.StatusUpdate
log.Printf("Task %s status: %s", update.GetTaskId(), update.GetStatus().GetState())
case *pb.AgentEvent_ArtifactUpdate:
artifact := payload.ArtifactUpdate
log.Printf("Received artifact for task %s", artifact.GetTaskId())
}
}
A2A Task Management
GetTask
Retrieves the current state of an A2A task.
Go Example:
req := &pb.GetTaskRequest{
TaskId: "task_67890",
HistoryLength: 10, // Optional: limit message history
}
task, err := client.GetTask(ctx, req)
if err != nil {
return err
}
log.Printf("Task %s status: %s", task.GetId(), task.GetStatus().GetState())
log.Printf("Message history: %d messages", len(task.GetHistory()))
log.Printf("Artifacts: %d artifacts", len(task.GetArtifacts()))
CancelTask
Cancels an active A2A task.
Go Example:
req := &pb.CancelTaskRequest{
TaskId: "task_67890",
Reason: "User requested cancellation",
}
task, err := client.CancelTask(ctx, req)
if err != nil {
return err
}
log.Printf("Task %s cancelled", task.GetId())
Agent Discovery
GetAgentCard
Returns the broker’s A2A agent card for discovery.
Go Example:
card, err := client.GetAgentCard(ctx, &emptypb.Empty{})
if err != nil {
return err
}
log.Printf("AgentHub broker: %s v%s", card.GetName(), card.GetVersion())
log.Printf("Protocol version: %s", card.GetProtocolVersion())
log.Printf("Capabilities: streaming=%v", card.GetCapabilities().GetStreaming())
for _, skill := range card.GetSkills() {
log.Printf("Skill: %s - %s", skill.GetName(), skill.GetDescription())
}
RegisterAgent
Registers an agent with the broker.
Go Example:
agentCard := &pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "my-processor-agent",
Description: "Data processing agent with A2A compliance",
Version: "1.0.0",
Capabilities: &pb.AgentCapabilities{
Streaming: true,
},
Skills: []*pb.AgentSkill{
{
Id: "data_processing",
Name: "Data Processing",
Description: "Process structured datasets",
Tags: []string{"data", "analysis"},
},
},
}
response, err := client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
AgentCard: agentCard,
Subscriptions: []string{"data_processing", "analytics"},
})
if err != nil {
return err
}
if response.GetSuccess() {
log.Printf("Agent registered with ID: %s", response.GetAgentId())
} else {
log.Printf("Registration failed: %s", response.GetError())
}
High-Level A2A Client Abstractions
A2ATaskPublisher
Simplified interface for publishing A2A tasks.
taskPublisher := &agenthub.A2ATaskPublisher{
Client: client,
TraceManager: traceManager,
MetricsManager: metricsManager,
Logger: logger,
ComponentName: "my-publisher",
AgentID: "my-agent-id",
}
task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
TaskType: "data_analysis",
Content: contentParts,
RequesterAgentID: "my-agent-id",
ResponderAgentID: "data-processor",
Priority: pb.Priority_PRIORITY_MEDIUM,
ContextID: "analysis-session-123",
})
A2ATaskSubscriber
Simplified interface for processing A2A tasks.
taskSubscriber := agenthub.NewA2ATaskSubscriber(client, "my-agent-id")
// Register task handlers
taskSubscriber.RegisterTaskHandler("data_analysis", func(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
// Process the A2A task
result := processDataAnalysis(task, message)
// Return A2A artifact
artifact := &pb.Artifact{
ArtifactId: fmt.Sprintf("result_%s", task.GetId()),
Name: "analysis_result",
Description: "Data analysis results",
Parts: []*pb.Part{
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: result,
},
},
},
},
}
return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
})
// Start processing A2A tasks
err := taskSubscriber.SubscribeToTasks(ctx)
Error Handling
gRPC Status Codes
AgentHub uses standard gRPC status codes:
InvalidArgument (Code: 3)
- Missing required fields (message_id, role, content)
- Invalid A2A message structure
- Malformed Part content
NotFound (Code: 5)
- Task ID not found in GetTask/CancelTask
- Agent not registered
Internal (Code: 13)
- Server-side processing errors
- Message routing failures
- A2A validation errors
Retry Patterns
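func publishWithRetry(ctx context.Context, client pb.AgentHubClient, req *pb.PublishMessageRequest) error {
	const maxAttempts = 3
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		_, err := client.PublishMessage(ctx, req)
		if err == nil {
			return nil
		}
		// Validation errors fail the same way on every attempt, so don't retry them
		if status.Code(err) == codes.InvalidArgument {
			return err
		}
		lastErr = err
		if attempt == maxAttempts-1 {
			break // no backoff after the final attempt
		}
		// Exponential backoff (1s, then 2s) that stops early if ctx is cancelled
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Duration(1<<attempt) * time.Second):
		}
	}
	return fmt.Errorf("max retries exceeded: %w", lastErr)
}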
Message Size Limits
- Maximum message size: 4 MB (gRPC's default maximum receive message size)
- Recommended size: under 100 KB per message
- Large content: Use FilePart references for large data
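The limits above suggest a simple decision on the sending side: send small payloads inline, and send large ones as FilePart references. `contentStrategy` is a hypothetical helper for that decision. It reads "100 KB" as 100 × 1024 bytes and ignores the protobuf envelope overhead, so treat the thresholds as approximate.

```go
package main

// Thresholds from the limits above (approximate: envelope overhead ignored).
const (
	maxMessageBytes         = 4 * 1024 * 1024 // gRPC default receive limit
	recommendedMessageBytes = 100 * 1024      // recommended upper bound
)

// contentStrategy decides how a payload of the given size should be sent.
func contentStrategy(payloadBytes int) string {
	switch {
	case payloadBytes <= recommendedMessageBytes:
		return "inline" // send as a text or DataPart
	case payloadBytes < maxMessageBytes:
		return "file_reference_recommended"
	default:
		return "file_reference_required" // would exceed the 4 MB limit
	}
}
```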
A2A Best Practices
- Use structured Parts: Prefer DataPart for structured data over text
- Context management: Group related messages with context_id
- Artifact structure: Return well-formed Artifact objects
- Task lifecycle: Properly manage TaskState transitions
- Connection reuse: Maintain persistent gRPC connections
This completes the comprehensive A2A-compliant API reference for AgentHub, covering all message types, operations, and integration patterns with practical examples.
2 - AgentHub Tracing API Reference
Complete API documentation for AgentHub’s OpenTelemetry tracing integration, span management, context propagation, and instrumentation patterns.
Core Components
TraceManager
The TraceManager provides high-level tracing operations for AgentHub events.
Constructor
func NewTraceManager(serviceName string) *TraceManager
Parameters:
serviceName - Name of the service creating spans
Returns: Configured TraceManager instance
Methods
StartPublishSpan
func (tm *TraceManager) StartPublishSpan(ctx context.Context, responderAgentID, eventType string) (context.Context, trace.Span)
Purpose: Creates a span for event publishing operations
Parameters:
- ctx - Parent context (may contain an existing trace)
- responderAgentID - Target agent for the event
- eventType - Type of event being published
Returns:
- context.Context - New context with the active span
- trace.Span - The created span
Attributes Set:
- event.type - Event type being published
- responder.agent - Target agent ID
- operation.type - "publish"
Usage:
ctx, span := tm.StartPublishSpan(ctx, "agent_subscriber", "greeting")
defer span.End()
// ... publishing logic
StartEventProcessingSpan
func (tm *TraceManager) StartEventProcessingSpan(ctx context.Context, eventID, eventType, requesterAgentID, responderAgentID string) (context.Context, trace.Span)
Purpose: Creates a span for event processing operations
Parameters:
- ctx - Context with extracted trace information
- eventID - Unique identifier for the event
- eventType - Type of event being processed
- requesterAgentID - Agent that requested processing
- responderAgentID - Agent performing processing
Returns:
- context.Context - Context with the processing span
- trace.Span - The processing span
Attributes Set:
- event.id - Event identifier
- event.type - Event type
- requester.agent - Requesting agent ID
- responder.agent - Processing agent ID
- operation.type - "process"
StartBrokerSpan
func (tm *TraceManager) StartBrokerSpan(ctx context.Context, operation, eventType string) (context.Context, trace.Span)
Purpose: Creates spans for broker operations
Parameters:
- ctx - Request context
- operation - Broker operation (route, subscribe, unsubscribe)
- eventType - Event type being handled
Returns:
- context.Context - Context with the broker span
- trace.Span - The broker span
Attributes Set:
- operation.type - Broker operation type
- event.type - Event type being handled
- component - "broker"
InjectTraceContext
func (tm *TraceManager) InjectTraceContext(ctx context.Context, headers map[string]string)
Purpose: Injects trace context into headers for propagation
Parameters:
- ctx - Context containing trace information
- headers - Map to inject headers into
Headers Injected:
- traceparent - W3C trace context header
- tracestate - W3C trace state header (if present)
Usage:
headers := make(map[string]string)
tm.InjectTraceContext(ctx, headers)
// headers now contain trace context for propagation
ExtractTraceContext
func (tm *TraceManager) ExtractTraceContext(ctx context.Context, headers map[string]string) context.Context
Purpose: Extracts trace context from headers
Parameters:
- ctx - Base context
- headers - Headers containing trace context
Returns: Context with extracted trace information
Usage:
// Extract from event metadata
if metadata := event.GetMetadata(); metadata != nil {
if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
headers := structFieldsToStringMap(traceHeaders.GetStructValue().Fields)
ctx = tm.ExtractTraceContext(ctx, headers)
}
}
RecordError
func (tm *TraceManager) RecordError(span trace.Span, err error)
Purpose: Records an error on a span with proper formatting
Parameters:
- span - Span to record the error on
- err - Error to record
Effects:
- Sets span status to error
- Records error as span event
- Adds error type attribute
SetSpanSuccess
func (tm *TraceManager) SetSpanSuccess(span trace.Span)
Purpose: Marks a span as successful
Parameters:
span - Span to mark as successful
Effects:
- Sets span status to OK
- Ensures span is properly completed
Context Propagation
W3C Trace Context Standards
AgentHub uses the W3C Trace Context specification for interoperability.
traceparent
Format: 00-{trace-id}-{span-id}-{trace-flags}
- 00 - Version (currently always 00)
- trace-id - 32-character hex string
- span-id - 16-character hex string
- trace-flags - 2-character hex flags
Example: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
tracestate
Format: Vendor-specific key-value pairs
Example: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE
Propagation Implementation
Manual Injection
// Create headers map
headers := make(map[string]string)
// Inject trace context (traceparent, and tracestate if present)
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))
// Convert to protobuf metadata if needed. structpb.NewStruct rejects
// map[string]string, so copy the headers into a map[string]interface{} first.
traceHeaders := make(map[string]interface{}, len(headers))
for k, v := range headers {
	traceHeaders[k] = v
}
metadataStruct, err := structpb.NewStruct(map[string]interface{}{
	"trace_headers": traceHeaders,
	"timestamp":     time.Now().Format(time.RFC3339),
})
if err != nil {
	return err
}
// Attach metadataStruct to the outgoing task or message metadata
Manual Extraction
// Extract from protobuf metadata
if metadata := task.GetMetadata(); metadata != nil {
	if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
		headers := make(map[string]string)
		for k, v := range traceHeaders.GetStructValue().GetFields() {
			headers[k] = v.GetStringValue()
		}
		ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(headers))
	}
}
Span Lifecycle Management
Creating Spans
Basic Span Creation
tracer := otel.Tracer("my-service")
ctx, span := tracer.Start(ctx, "operation_name")
defer span.End()
Span with Attributes
ctx, span := tracer.Start(ctx, "operation_name", trace.WithAttributes(
attribute.String("operation.type", "publish"),
attribute.String("event.type", "greeting"),
attribute.Int("event.priority", 1),
))
defer span.End()
Child Span Creation
// Parent span
ctx, parentSpan := tracer.Start(ctx, "parent_operation")
defer parentSpan.End()
// Child span (automatically linked)
ctx, childSpan := tracer.Start(ctx, "child_operation")
defer childSpan.End()
Span Attributes
Standard Attributes
AgentHub uses consistent attribute naming:
// Event attributes
attribute.String("event.id", taskID)
attribute.String("event.type", taskType)
attribute.Int("event.priority", priority)
// Agent attributes
attribute.String("agent.id", agentID)
attribute.String("agent.type", agentType)
attribute.String("requester.agent", requesterID)
attribute.String("responder.agent", responderID)
// Operation attributes
attribute.String("operation.type", "publish|process|route")
attribute.String("component", "broker|publisher|subscriber")
// Result attributes
attribute.Bool("success", true)
attribute.String("error.type", "validation|timeout|network")
Custom Attributes
span.SetAttributes(
attribute.String("business.unit", "sales"),
attribute.String("user.tenant", "acme-corp"),
attribute.Int("batch.size", len(items)),
attribute.Int64("timeout.ms", (30*time.Second).Milliseconds()), // no attribute.Duration helper exists; store milliseconds
)
Span Events
Adding Events
// Simple event
span.AddEvent("validation.started")
// Event with attributes
span.AddEvent("cache.miss", trace.WithAttributes(
attribute.String("cache.key", key),
attribute.String("cache.type", "redis"),
))
// Event with timestamp
span.AddEvent("external.api.call", trace.WithAttributes(
attribute.String("api.endpoint", "/v1/users"),
attribute.Int("api.status_code", 200),
), trace.WithTimestamp(time.Now()))
Common Event Patterns
// Processing milestones
span.AddEvent("processing.started")
span.AddEvent("validation.completed")
span.AddEvent("business.logic.completed")
span.AddEvent("result.published")
// Error events
span.AddEvent("error.occurred", trace.WithAttributes(
attribute.String("error.message", err.Error()),
attribute.String("error.stack", string(debug.Stack())), // debug.Stack returns []byte
))
Span Status
Setting Status
// Success
span.SetStatus(codes.Ok, "")
// Error with message
span.SetStatus(codes.Error, "validation failed")
// Error without message
span.SetStatus(codes.Error, "")
Status Code Mapping
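// OpenTelemetry span status has only three codes (Unset, Error, Ok), so
// there is no DeadlineExceeded or Cancelled status; record the error
// category as an attribute instead.
if err != nil {
	span.SetStatus(codes.Error, err.Error())
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		span.SetAttributes(attribute.String("error.type", "timeout"))
	case errors.Is(err, context.Canceled):
		span.SetAttributes(attribute.String("error.type", "cancelled"))
	}
} else {
	span.SetStatus(codes.Ok, "")
}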
Advanced Instrumentation
Baggage Propagation
Setting Baggage
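// Add baggage to context; baggage.NewMember validates each entry against the W3C Baggage spec
userMember, err1 := baggage.NewMember("user.id", userID)
tenantMember, err2 := baggage.NewMember("tenant.id", tenantID)
requestMember, err3 := baggage.NewMember("request.id", requestID)
if err := errors.Join(err1, err2, err3); err != nil {
	return err
}
bag, err := baggage.New(userMember, tenantMember, requestMember)
if err != nil {
	return err
}
ctx = baggage.ContextWithBaggage(ctx, bag)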
Reading Baggage
// Read baggage anywhere in the trace
if member := baggage.FromContext(ctx).Member("user.id"); member.Value() != "" {
userID := member.Value()
// Use user ID for business logic
}
Span Links
Creating Links
// Link to related span
linkedSpanContext := trace.SpanContextFromContext(relatedCtx)
ctx, span := tracer.Start(ctx, "operation", trace.WithLinks(
trace.Link{
SpanContext: linkedSpanContext,
Attributes: []attribute.KeyValue{
attribute.String("link.type", "related_operation"),
},
},
))
Sampling Control
Conditional Sampling
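// Start a new root trace for an important operation. WithNewRoot ignores any
// parent span in ctx but does not force sampling: the configured sampler still
// decides. Attributes passed at start time are visible to the sampler;
// attributes set later with span.SetAttributes are not.
ctx, span := tracer.Start(ctx, "critical_operation",
	trace.WithNewRoot(),
	trace.WithSpanKind(trace.SpanKindServer),
	trace.WithAttributes(attribute.String("sampling.priority", "high")),
)
defer span.End()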
Integration Patterns
gRPC Integration
Server Interceptor
func TracingUnaryInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx, span := tracer.Start(ctx, info.FullMethod)
defer span.End()
resp, err := handler(ctx, req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
return resp, err
}
}
Client Interceptor
func TracingUnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, span := tracer.Start(ctx, method)
defer span.End()
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
return err
}
}
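The interceptors above create spans, but they do not carry trace context across the wire. The client also has to inject context into outgoing gRPC metadata, and the server has to extract it from incoming metadata. One way to do this is a carrier over gRPC metadata, which is a `map[string][]string`. `mdCarrier` below is a hypothetical helper that implements the `Get`/`Set`/`Keys` method set of OpenTelemetry's `propagation.TextMapCarrier`, and it lowercases keys because gRPC metadata keys are lowercase.

```go
package main

import (
	"sort"
	"strings"
)

// mdCarrier adapts a gRPC-style metadata map (the underlying type of
// metadata.MD) to the Get/Set/Keys methods of propagation.TextMapCarrier.
type mdCarrier map[string][]string

// Get returns the first value stored for key, or "" if none.
func (c mdCarrier) Get(key string) string {
	if vals := c[strings.ToLower(key)]; len(vals) > 0 {
		return vals[0]
	}
	return ""
}

// Set replaces any existing values for key.
func (c mdCarrier) Set(key, value string) {
	c[strings.ToLower(key)] = []string{value}
}

// Keys lists the stored keys in sorted order.
func (c mdCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
```

With `google.golang.org/grpc/metadata` imported, a client interceptor could run `md, _ := metadata.FromOutgoingContext(ctx); md = md.Copy(); otel.GetTextMapPropagator().Inject(ctx, mdCarrier(md)); ctx = metadata.NewOutgoingContext(ctx, md)` before calling `invoker`. A server interceptor could call `md, _ := metadata.FromIncomingContext(ctx)` and then `ctx = otel.GetTextMapPropagator().Extract(ctx, mdCarrier(md))` before `tracer.Start`. The OpenTelemetry contrib `otelgrpc` instrumentation package provides maintained handlers that already do this.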
HTTP Integration
HTTP Handler Wrapper
func TracingHandler(tracer trace.Tracer, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
ctx, span := tracer.Start(ctx, r.Method+" "+r.URL.Path)
defer span.End()
span.SetAttributes(
attribute.String("http.method", r.Method),
attribute.String("http.url", r.URL.String()),
attribute.String("http.user_agent", r.UserAgent()),
)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
Error Handling
Error Recording Best Practices
Complete Error Recording
if err != nil {
// Record error on span
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
// Add error context
span.SetAttributes(
attribute.String("error.type", classifyError(err)),
attribute.Bool("error.retryable", isRetryable(err)),
)
// Log with context
logger.ErrorContext(ctx, "Operation failed",
slog.Any("error", err),
slog.String("operation", "event_processing"),
)
return err
}
Error Classification
func classifyError(err error) string {
switch {
case errors.Is(err, context.DeadlineExceeded):
return "timeout"
case errors.Is(err, context.Canceled):
return "cancelled"
case strings.Contains(err.Error(), "connection"):
return "network"
case strings.Contains(err.Error(), "validation"):
return "validation"
default:
return "unknown"
}
}
Span Creation Overhead
- Span creation: ~1-2μs per span
- Attribute setting: ~100ns per attribute
- Event recording: ~200ns per event
- Context propagation: ~500ns per injection/extraction
Memory Usage
- Active span: ~500 bytes
- Completed span buffer: ~1KB per span
- Context overhead: ~100 bytes per context
Best Practices
- Limit span attributes to essential information
- Use batch exporters to reduce network overhead
- Sample appropriately for high-throughput services
- Pool span contexts where possible
- Avoid deep span nesting (>10 levels)
Troubleshooting
Missing Spans Checklist
- ✅ OpenTelemetry properly initialized
- ✅ Tracer retrieved from global provider
- ✅ Context propagated correctly
- ✅ Spans properly ended
- ✅ Exporter configured and accessible
Common Issues
Broken Trace Chains
// ❌ Wrong - creates new root trace
ctx, span := tracer.Start(context.Background(), "operation")
// ✅ Correct - continues existing trace
ctx, span := tracer.Start(ctx, "operation")
Missing Context Propagation
// ❌ Wrong - context not propagated
go func() {
ctx, span := tracer.Start(context.Background(), "async_work")
// work...
}()
// ✅ Correct - context properly propagated
go func(ctx context.Context) {
ctx, span := tracer.Start(ctx, "async_work")
defer span.End()
// work...
}(ctx)
🎯 Next Steps:
Implementation: Add Observability to Your Agent
Debugging: Debug with Distributed Tracing
Metrics: Observability Metrics Reference
3 - Unified Abstraction Library API Reference
The AgentHub unified abstraction library provides simplified APIs for building gRPC-based agent communication systems with built-in observability, automatic configuration, and correlation tracking.
Package: internal/agenthub
The internal/agenthub package contains the core unified abstraction components that dramatically simplify AgentHub development by providing high-level APIs with automatic observability integration.
Overview
The unified abstraction library reduces agent implementation complexity from 380+ lines to ~29 lines by providing:
- Automatic gRPC Setup: One-line server and client creation
- Built-in Observability: Integrated OpenTelemetry tracing and metrics
- Environment-Based Configuration: Automatic configuration from environment variables
- Correlation Tracking: Automatic correlation ID generation and propagation
- Pluggable Architecture: Simple task handler registration
Core Components
GRPCConfig
Configuration structure for gRPC servers and clients with environment-based initialization.
type GRPCConfig struct {
ServerAddr string // gRPC server listen address (e.g., ":50051")
BrokerAddr string // Broker connection address (e.g., "localhost:50051")
HealthPort string // Health check endpoint port
ComponentName string // Component identifier for observability
}
Constructor
func NewGRPCConfig(componentName string) *GRPCConfig
Creates a new gRPC configuration with environment variable defaults:
| Environment Variable | Default | Description |
|---|---|---|
| AGENTHUB_BROKER_ADDR | localhost | Broker server host |
| AGENTHUB_BROKER_PORT | 50051 | Broker gRPC port |
| AGENTHUB_GRPC_PORT | :50051 | Server listen port |
| BROKER_HEALTH_PORT | 8080 | Health endpoint port |
Example:
config := agenthub.NewGRPCConfig("my-agent")
// Results in BrokerAddr: "localhost:50051" (automatically combined)
AgentHubServer
High-level gRPC server wrapper with integrated observability.
type AgentHubServer struct {
Server *grpc.Server // Underlying gRPC server
Listener net.Listener // Network listener
Observability *observability.Observability // OpenTelemetry integration
TraceManager *observability.TraceManager // Distributed tracing
MetricsManager *observability.MetricsManager // Metrics collection
HealthServer *observability.HealthServer // Health monitoring
Logger *slog.Logger // Structured logging
Config *GRPCConfig // Configuration
}
Constructor
func NewAgentHubServer(config *GRPCConfig) (*AgentHubServer, error)
Creates a complete gRPC server with:
- OpenTelemetry instrumentation
- Health check endpoints
- Metrics collection
- Structured logging with trace correlation
Methods
func (s *AgentHubServer) Start(ctx context.Context) error
Starts the server with automatic:
- Health endpoint setup (/health, /ready, /metrics)
- Metrics collection goroutine
- gRPC server with observability
func (s *AgentHubServer) Shutdown(ctx context.Context) error
Gracefully shuts down all components:
- gRPC server graceful stop
- Health server shutdown
- Observability cleanup
Example:
config := agenthub.NewGRPCConfig("broker")
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
log.Fatal(err)
}
// Register services
eventBusService := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, eventBusService)
// Start server
if err := server.Start(ctx); err != nil {
log.Fatal(err)
}
AgentHubClient
High-level gRPC client wrapper with integrated observability.
type AgentHubClient struct {
Client pb.EventBusClient // gRPC client
Connection *grpc.ClientConn // Connection
Observability *observability.Observability // OpenTelemetry integration
TraceManager *observability.TraceManager // Distributed tracing
MetricsManager *observability.MetricsManager // Metrics collection
HealthServer *observability.HealthServer // Health monitoring
Logger *slog.Logger // Structured logging
Config *GRPCConfig // Configuration
}
Constructor
func NewAgentHubClient(config *GRPCConfig) (*AgentHubClient, error)
Creates a complete gRPC client with:
- OpenTelemetry instrumentation
- Connection health monitoring
- Metrics collection
- Automatic retry and timeout handling
Methods
func (c *AgentHubClient) Start(ctx context.Context) error
Initializes client with health monitoring and metrics collection.
func (c *AgentHubClient) Shutdown(ctx context.Context) error
Gracefully closes connection and cleans up resources.
Example:
config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
log.Fatal(err)
}
err = client.Start(ctx)
if err != nil {
log.Fatal(err)
}
// Use client.Client for gRPC calls
Service Abstractions
EventBusService
Broker service implementation with built-in observability and correlation tracking.
type EventBusService struct {
Server *AgentHubServer
subscriptions map[string][]Subscription
resultSubs map[string][]ResultSubscription
progressSubs map[string][]ProgressSubscription
mu sync.RWMutex
}
Constructor
func NewEventBusService(server *AgentHubServer) *EventBusService
Creates an EventBus service with automatic:
- Subscription management
- Task routing and correlation
- Observability integration
Key Methods
func (s *EventBusService) PublishTask(ctx context.Context, req *pb.PublishTaskRequest) (*pb.PublishResponse, error)
Publishes a task and automatically applies:
- Input validation
- Correlation ID generation
- Distributed tracing
- Metrics collection
func (s *EventBusService) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error
Manages task subscriptions with:
- Automatic subscription lifecycle
- Context cancellation handling
- Error recovery
SubscriberAgent
High-level subscriber implementation with pluggable task handlers.
type SubscriberAgent struct {
	client   *AgentHubClient
	agentID  string
	handlers map[string]TaskHandler
	ctx      context.Context
	cancel   context.CancelFunc
}
Constructor
func NewSubscriberAgent(client *AgentHubClient, agentID string) *SubscriberAgent
Task Handler Interface
type TaskHandler interface {
	Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error)
}
Methods
func (s *SubscriberAgent) RegisterHandler(taskType string, handler TaskHandler)
Registers a handler for a specific task type. Each registered handler gets automatic:
- Task routing
- Error handling
- Result publishing
func (s *SubscriberAgent) Start(ctx context.Context) error
Starts the subscriber with automatic:
- Task subscription
- Handler dispatch
- Observability integration
Example:
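type GreetingHandler struct{}
func (h *GreetingHandler) Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error) {
	// Process the greeting task and populate the result fields
	result := &pb.TaskResult{}
	return result, nil
}
// Register the handler with a subscriber ("greeting-agent" is an example ID)
subscriber := agenthub.NewSubscriberAgent(client, "greeting-agent")
subscriber.RegisterHandler("greeting", &GreetingHandler{})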
Utility Functions
func ExtractCorrelationID(ctx context.Context) string
func InjectCorrelationID(ctx context.Context, correlationID string) context.Context
func GenerateCorrelationID() string
Helpers for generating correlation IDs and carrying them through a context.Context, so related spans and log lines can be linked across services.
Metrics Helpers
func NewMetricsTicker(ctx context.Context, manager *observability.MetricsManager) *MetricsTicker
Starts automatic metrics collection at a configurable interval.
Configuration Reference
Environment Variables
The unified abstraction library reads its configuration from these environment variables:
| Variable | Type | Default | Description |
|---|---|---|---|
| AGENTHUB_BROKER_ADDR | string | localhost | Broker server hostname |
| AGENTHUB_BROKER_PORT | string | 50051 | Broker gRPC port |
| AGENTHUB_GRPC_PORT | string | :50051 | Server listen address |
| BROKER_HEALTH_PORT | string | 8080 | Health endpoint port |
| SERVICE_VERSION | string | 1.0.0 | Service version for observability |
| ENVIRONMENT | string | development | Deployment environment |
Observability Integration
The unified abstraction automatically configures:
- OpenTelemetry Tracing: Automatic span creation and context propagation
- Prometheus Metrics: 47+ built-in metrics for performance monitoring
- Health Checks: Comprehensive health endpoints for service monitoring
- Structured Logging: Correlated logging with trace context
Performance impact of the unified abstraction compared with hand-written gRPC:
| Metric | Standard gRPC | Unified Abstraction | Difference |
|---|---|---|---|
| Setup Complexity | 380+ lines | ~29 lines | -92% code |
| Throughput | 10,000+ tasks/sec | 9,500+ tasks/sec | -5% |
| Latency | Baseline | +10ms for tracing | +10ms |
| Memory | Baseline | +50MB per agent | +50MB |
| CPU | Baseline | +5% for observability | +5% |
Migration Guide
From Standard gRPC
Before (Standard gRPC):
// 380+ lines of boilerplate code
lis, err := net.Listen("tcp", ":50051")
if err != nil {
	log.Fatal(err)
}
server := grpc.NewServer()
// ... extensive setup code
After (Unified Abstraction):
// ~29 lines for the complete program
config := agenthub.NewGRPCConfig("my-service")
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
	log.Fatal(err)
}
service := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, service)
if err := server.Start(ctx); err != nil {
	log.Fatal(err)
}
Observability Benefits
The unified abstraction provides automatic:
- Distributed Tracing: Every request automatically traced
- Metrics Collection: 47+ metrics without configuration
- Health Monitoring: Built-in health and readiness endpoints
- Error Correlation: Automatic error tracking across services
- Performance Monitoring: Latency, throughput, and error rates
Error Handling
The unified abstraction provides comprehensive error handling:
- Automatic Retries: Built-in retry logic for transient failures
- Circuit Breaking: Protection against cascading failures
- Graceful Degradation: Service continues operating during partial failures
- Error Correlation: Distributed error tracking across service boundaries
Best Practices
1. Configuration Management
// Use environment-based configuration
config := agenthub.NewGRPCConfig("my-service")
// Override specific values if needed
config.HealthPort = "8083"
2. Handler Registration
// Register handlers before starting
subscriber.RegisterHandler("task-type", handler)
if err := subscriber.Start(ctx); err != nil {
	log.Fatal(err)
}
3. Graceful Shutdown
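// Always implement proper shutdown
defer func() {
	// Use a fresh context: the parent context is often already cancelled here
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := server.Shutdown(shutdownCtx); err != nil {
		log.Printf("shutdown error: %v", err)
	}
}()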
4. Error Handling
// Use context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
result, err := client.Client.PublishTask(ctx, request)
if err != nil {
	// Error is automatically traced and logged
	return fmt.Errorf("failed to publish task: %w", err)
}
See Also