API Reference

Complete API documentation and specifications

API Reference Documentation

This section contains comprehensive API documentation for all AgentHub interfaces, including gRPC APIs, unified abstractions, and tracing interfaces.

Available Documentation

1 - A2A-Compliant AgentHub API Reference

Complete technical reference for the A2A-compliant AgentHub API, including all gRPC services, message types, and operational details.

A2A-Compliant AgentHub API Reference

This document is a complete technical reference for the Agent2Agent (A2A) protocol-compliant AgentHub API, including all gRPC services, message types, and operational details.

gRPC Service Definition

The AgentHub broker implements the AgentHub service as defined in proto/eventbus.proto:

service AgentHub {
  // ===== A2A Message Publishing (EDA style) =====

  // PublishMessage submits an A2A message for delivery through the broker
  rpc PublishMessage(PublishMessageRequest) returns (PublishResponse);

  // PublishTaskUpdate notifies subscribers about A2A task state changes
  rpc PublishTaskUpdate(PublishTaskUpdateRequest) returns (PublishResponse);

  // PublishTaskArtifact delivers A2A task output artifacts to subscribers
  rpc PublishTaskArtifact(PublishTaskArtifactRequest) returns (PublishResponse);

  // ===== A2A Event Subscriptions (EDA style) =====

  // SubscribeToMessages creates a stream of A2A message events for an agent
  rpc SubscribeToMessages(SubscribeToMessagesRequest) returns (stream AgentEvent);

  // SubscribeToTasks creates a stream of A2A task events for an agent
  rpc SubscribeToTasks(SubscribeToTasksRequest) returns (stream AgentEvent);

  // SubscribeToAgentEvents creates a unified stream of all events for an agent
  rpc SubscribeToAgentEvents(SubscribeToAgentEventsRequest) returns (stream AgentEvent);

  // ===== A2A Task Management (compatible with A2A spec) =====

  // GetTask retrieves the current state of an A2A task by ID
  rpc GetTask(GetTaskRequest) returns (a2a.Task);

  // CancelTask cancels an active A2A task and notifies subscribers
  rpc CancelTask(CancelTaskRequest) returns (a2a.Task);

  // ListTasks returns A2A tasks matching the specified criteria
  rpc ListTasks(ListTasksRequest) returns (ListTasksResponse);

  // ===== Agent Discovery (A2A compatible) =====

  // GetAgentCard returns the broker's A2A agent card for discovery
  rpc GetAgentCard(google.protobuf.Empty) returns (a2a.AgentCard);

  // RegisterAgent registers an agent with the broker for event routing
  rpc RegisterAgent(RegisterAgentRequest) returns (RegisterAgentResponse);
}

A2A Message Types

Core A2A Types

A2A Message

Represents an A2A-compliant message for agent communication.

message Message {
  string message_id = 1;       // Required: Unique message identifier
  string context_id = 2;       // Optional: Conversation context
  string task_id = 3;          // Optional: Associated task
  Role role = 4;               // Required: USER or AGENT
  repeated Part content = 5;   // Required: Message content parts
  google.protobuf.Struct metadata = 6; // Optional: Additional metadata
  repeated string extensions = 7;       // Optional: Protocol extensions
}

Field Details:

  • message_id: Must be unique across all messages. Generated automatically if not provided
  • context_id: Groups related messages in a conversation or workflow
  • task_id: Links message to a specific A2A task
  • role: Indicates whether message is from USER (requesting agent) or AGENT (responding agent)
  • content: Array of A2A Part structures containing the actual message content
  • metadata: Additional context for routing, processing, or debugging
  • extensions: Protocol extension identifiers for future compatibility

A2A Part

Represents content within an A2A message.

message Part {
  oneof part {
    string text = 1;           // Text content
    DataPart data = 2;         // Structured data
    FilePart file = 3;         // File reference
  }
}

message DataPart {
  google.protobuf.Struct data = 1;    // Structured data content
  string description = 2;             // Optional data description
}

message FilePart {
  string file_id = 1;                 // File identifier or URI
  string filename = 2;                // Original filename
  string mime_type = 3;               // MIME type
  int64 size_bytes = 4;               // File size in bytes
  google.protobuf.Struct metadata = 5; // Additional file metadata
}

A2A Task

Represents an A2A-compliant task with lifecycle management.

message Task {
  string id = 1;                    // Required: Task identifier
  string context_id = 2;            // Optional: Conversation context
  TaskStatus status = 3;            // Required: Current task status
  repeated Message history = 4;     // Message history for this task
  repeated Artifact artifacts = 5;  // Task output artifacts
  google.protobuf.Struct metadata = 6; // Task metadata
}

message TaskStatus {
  TaskState state = 1;              // Current task state
  Message update = 2;               // Status update message
  google.protobuf.Timestamp timestamp = 3; // Status timestamp
}

enum TaskState {
  TASK_STATE_SUBMITTED = 0;    // Task created and submitted
  TASK_STATE_WORKING = 1;      // Task in progress
  TASK_STATE_COMPLETED = 2;    // Task completed successfully
  TASK_STATE_FAILED = 3;       // Task failed with error
  TASK_STATE_CANCELLED = 4;    // Task cancelled
}
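
The enum above implies a lifecycle in which some states are final. The following is a minimal sketch of a transition guard, not the broker's actual logic. The local `TaskState` type, `IsTerminal`, `CanTransition`, and the allowed transitions are all assumptions made for illustration; the source only lists the state values.

```go
package main

import "fmt"

// TaskState is a local stand-in that mirrors the enum values listed above.
type TaskState int

const (
	TaskStateSubmitted TaskState = iota // 0
	TaskStateWorking                    // 1
	TaskStateCompleted                  // 2
	TaskStateFailed                     // 3
	TaskStateCancelled                  // 4
)

// IsTerminal reports whether a task in state s can no longer change.
// Treating COMPLETED, FAILED, and CANCELLED as terminal is an assumption.
func IsTerminal(s TaskState) bool {
	switch s {
	case TaskStateCompleted, TaskStateFailed, TaskStateCancelled:
		return true
	default:
		return false
	}
}

// CanTransition is a hypothetical guard: SUBMITTED may move to WORKING or
// directly to a terminal state, WORKING may only move to a terminal state,
// and terminal states never change.
func CanTransition(from, to TaskState) bool {
	switch from {
	case TaskStateSubmitted:
		return to == TaskStateWorking || IsTerminal(to)
	case TaskStateWorking:
		return IsTerminal(to)
	default:
		return false
	}
}

func main() {
	fmt.Println(CanTransition(TaskStateSubmitted, TaskStateWorking)) // true
	fmt.Println(CanTransition(TaskStateCompleted, TaskStateWorking)) // false
}
```

A guard like this could run before publishing a status update so that a late update cannot reopen a finished task.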

A2A Artifact

Represents structured output from completed tasks.

message Artifact {
  string artifact_id = 1;           // Required: Artifact identifier
  string name = 2;                  // Human-readable name
  string description = 3;           // Artifact description
  repeated Part parts = 4;          // Artifact content parts
  google.protobuf.Struct metadata = 5; // Artifact metadata
}

EDA Event Wrapper Types

AgentEvent

Wraps A2A messages for Event-Driven Architecture transport.

message AgentEvent {
  string event_id = 1;                     // Unique event identifier
  google.protobuf.Timestamp timestamp = 2; // Event timestamp

  // A2A-compliant payload
  oneof payload {
    a2a.Message message = 10;              // A2A Message
    a2a.Task task = 11;                    // A2A Task
    TaskStatusUpdateEvent status_update = 12; // Task status change
    TaskArtifactUpdateEvent artifact_update = 13; // Artifact update
  }

  // EDA routing metadata
  AgentEventMetadata routing = 20;

  // Observability context
  string trace_id = 30;
  string span_id = 31;
}

AgentEventMetadata

Provides routing and delivery information for events.

message AgentEventMetadata {
  string from_agent_id = 1;               // Source agent identifier
  string to_agent_id = 2;                 // Target agent ID (empty = broadcast)
  string event_type = 3;                  // Event classification
  repeated string subscriptions = 4;      // Topic-based routing tags
  Priority priority = 5;                  // Delivery priority
}
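
To make the routing fields concrete, here is a sketch of how a delivery decision could be derived from them. It assumes that a non-empty `to_agent_id` means direct delivery, that an empty one means broadcast, and that `subscriptions` tags narrow a broadcast to agents subscribed to at least one matching topic. The `Routing` struct and `ShouldDeliver` are hypothetical names; the broker's real matching rules are not described in this reference.

```go
package main

import "fmt"

// Routing is a local stand-in for the AgentEventMetadata fields that
// influence delivery.
type Routing struct {
	FromAgentID   string
	ToAgentID     string   // empty means broadcast
	Subscriptions []string // topic-based routing tags
}

// ShouldDeliver decides whether an event with routing r should reach the
// agent identified by agentID, which is subscribed to agentTopics.
func ShouldDeliver(r Routing, agentID string, agentTopics []string) bool {
	// Direct delivery: only the named agent receives the event.
	if r.ToAgentID != "" {
		return r.ToAgentID == agentID
	}
	// Untagged broadcast: every agent receives the event (assumption).
	if len(r.Subscriptions) == 0 {
		return true
	}
	// Tagged broadcast: deliver if any tag matches one of the agent's topics.
	topics := make(map[string]struct{}, len(agentTopics))
	for _, t := range agentTopics {
		topics[t] = struct{}{}
	}
	for _, tag := range r.Subscriptions {
		if _, ok := topics[tag]; ok {
			return true
		}
	}
	return false
}

func main() {
	direct := Routing{FromAgentID: "requester_agent", ToAgentID: "processor_agent"}
	fmt.Println(ShouldDeliver(direct, "processor_agent", nil)) // true
	fmt.Println(ShouldDeliver(direct, "other_agent", nil))     // false
}
```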

Request/Response Messages

PublishMessageRequest

message PublishMessageRequest {
  a2a.Message message = 1;                // A2A message to publish
  AgentEventMetadata routing = 2;         // EDA routing info
}

SubscribeToTasksRequest

message SubscribeToTasksRequest {
  string agent_id = 1;                    // Agent ID for subscription
  repeated string task_types = 2;         // Optional task type filter
  repeated a2a.TaskState states = 3;      // Optional state filter
}

GetTaskRequest

message GetTaskRequest {
  string task_id = 1;                     // Task identifier
  int32 history_length = 2;               // History limit (optional)
}
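
The effect of `history_length` can be sketched as a simple trim of the message history. This sketch assumes that the limit keeps the most recent entries and that a value of zero or less means "no limit"; neither behavior is confirmed by this reference, and `TrimHistory` is a hypothetical helper operating on strings instead of real `Message` values.

```go
package main

import "fmt"

// TrimHistory returns the last n entries of history. A non-positive n, or an
// n at least as large as the history, returns the history unchanged.
func TrimHistory(history []string, n int) []string {
	if n <= 0 || n >= len(history) {
		return history
	}
	return history[len(history)-n:]
}

func main() {
	history := []string{"msg_1", "msg_2", "msg_3", "msg_4"}
	fmt.Println(TrimHistory(history, 2)) // [msg_3 msg_4]
	fmt.Println(TrimHistory(history, 0)) // [msg_1 msg_2 msg_3 msg_4]
}
```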

API Operations

Publishing A2A Messages

PublishMessage

Publishes an A2A message for delivery through the EDA broker.

Go Example:

// Create A2A message content
content := []*pb.Part{
    {
        Part: &pb.Part_Text{
            Text: "Hello! Please process this request.",
        },
    },
    {
        Part: &pb.Part_Data{
            Data: &pb.DataPart{
                Data: &structpb.Struct{
                    Fields: map[string]*structpb.Value{
                        "operation": structpb.NewStringValue("process_data"),
                        "dataset_id": structpb.NewStringValue("dataset_123"),
                    },
                },
            },
        },
    },
}

// Create A2A message
message := &pb.Message{
    MessageId: "msg_12345",
    ContextId: "conversation_abc",
    TaskId:    "task_67890",
    Role:      pb.Role_ROLE_USER,
    Content:   content,
    Metadata: &structpb.Struct{
        Fields: map[string]*structpb.Value{
            "priority": structpb.NewStringValue("high"),
        },
    },
}

// Publish through AgentHub
response, err := client.PublishMessage(ctx, &pb.PublishMessageRequest{
    Message: message,
    Routing: &pb.AgentEventMetadata{
        FromAgentId: "requester_agent",
        ToAgentId:   "processor_agent",
        EventType:   "task_message",
        Priority:    pb.Priority_PRIORITY_HIGH,
    },
})
if err != nil {
    return err
}
log.Printf("Publish response: %v", response)

Subscribing to A2A Events

SubscribeToTasks

Creates a stream of A2A task events for an agent.

Go Example:

req := &pb.SubscribeToTasksRequest{
    AgentId: "processor_agent",
    TaskTypes: []string{"data_processing", "image_analysis"}, // Optional filter
}

stream, err := client.SubscribeToTasks(ctx, req)
if err != nil {
    return err
}

for {
    event, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }

    // Process different event types
    switch payload := event.GetPayload().(type) {
    case *pb.AgentEvent_Task:
        task := payload.Task
        log.Printf("Received A2A task: %s", task.GetId())

        // Process task using A2A handler
        artifact, status, errorMsg := processA2ATask(ctx, task)

        // Publish completion
        publishTaskCompletion(ctx, client, task, artifact, status, errorMsg)

    case *pb.AgentEvent_StatusUpdate:
        update := payload.StatusUpdate
        log.Printf("Task %s status: %s", update.GetTaskId(), update.GetStatus().GetState())

    case *pb.AgentEvent_ArtifactUpdate:
        artifact := payload.ArtifactUpdate
        log.Printf("Received artifact for task %s", artifact.GetTaskId())
    }
}

A2A Task Management

GetTask

Retrieves the current state of an A2A task.

Go Example:

req := &pb.GetTaskRequest{
    TaskId: "task_67890",
    HistoryLength: 10, // Optional: limit message history
}

task, err := client.GetTask(ctx, req)
if err != nil {
    return err
}

log.Printf("Task %s status: %s", task.GetId(), task.GetStatus().GetState())
log.Printf("Message history: %d messages", len(task.GetHistory()))
log.Printf("Artifacts: %d artifacts", len(task.GetArtifacts()))

CancelTask

Cancels an active A2A task.

Go Example:

req := &pb.CancelTaskRequest{
    TaskId: "task_67890",
    Reason: "User requested cancellation",
}

task, err := client.CancelTask(ctx, req)
if err != nil {
    return err
}

log.Printf("Task %s cancelled", task.GetId())

Agent Discovery

GetAgentCard

Returns the broker’s A2A agent card for discovery.

Go Example:

card, err := client.GetAgentCard(ctx, &emptypb.Empty{})
if err != nil {
    return err
}

log.Printf("AgentHub broker: %s v%s", card.GetName(), card.GetVersion())
log.Printf("Protocol version: %s", card.GetProtocolVersion())
log.Printf("Capabilities: streaming=%v", card.GetCapabilities().GetStreaming())

for _, skill := range card.GetSkills() {
    log.Printf("Skill: %s - %s", skill.GetName(), skill.GetDescription())
}

RegisterAgent

Registers an agent with the broker.

Go Example:

agentCard := &pb.AgentCard{
    ProtocolVersion: "0.2.9",
    Name:           "my-processor-agent",
    Description:    "Data processing agent with A2A compliance",
    Version:        "1.0.0",
    Capabilities: &pb.AgentCapabilities{
        Streaming: true,
    },
    Skills: []*pb.AgentSkill{
        {
            Id:          "data_processing",
            Name:        "Data Processing",
            Description: "Process structured datasets",
            Tags:        []string{"data", "analysis"},
        },
    },
}

response, err := client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
    AgentCard: agentCard,
    Subscriptions: []string{"data_processing", "analytics"},
})
if err != nil {
    return err
}

if response.GetSuccess() {
    log.Printf("Agent registered with ID: %s", response.GetAgentId())
} else {
    log.Printf("Registration failed: %s", response.GetError())
}

High-Level A2A Client Abstractions

A2ATaskPublisher

Simplified interface for publishing A2A tasks.

taskPublisher := &agenthub.A2ATaskPublisher{
    Client:         client,
    TraceManager:   traceManager,
    MetricsManager: metricsManager,
    Logger:         logger,
    ComponentName:  "my-publisher",
    AgentID:        "my-agent-id",
}

task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
    TaskType:         "data_analysis",
    Content:          contentParts,
    RequesterAgentID: "my-agent-id",
    ResponderAgentID: "data-processor",
    Priority:         pb.Priority_PRIORITY_MEDIUM,
    ContextID:        "analysis-session-123",
})
if err != nil {
    return err
}

A2ATaskSubscriber

Simplified interface for processing A2A tasks.

taskSubscriber := agenthub.NewA2ATaskSubscriber(client, "my-agent-id")

// Register task handlers
taskSubscriber.RegisterTaskHandler("data_analysis", func(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
    // Process the A2A task
    result := processDataAnalysis(task, message)

    // Return A2A artifact
    artifact := &pb.Artifact{
        ArtifactId:  fmt.Sprintf("result_%s", task.GetId()),
        Name:        "analysis_result",
        Description: "Data analysis results",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data: result,
                    },
                },
            },
        },
    }

    return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
})

// Start processing A2A tasks
if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
    return err
}

Error Handling

gRPC Status Codes

AgentHub uses standard gRPC status codes:

InvalidArgument (Code: 3)

  • Missing required fields (message_id, role, content)
  • Invalid A2A message structure
  • Malformed Part content

NotFound (Code: 5)

  • Task ID not found in GetTask/CancelTask
  • Agent not registered

Internal (Code: 13)

  • Server-side processing errors
  • Message routing failures
  • A2A validation errors

Retry Patterns

func publishWithRetry(ctx context.Context, client pb.AgentHubClient, req *pb.PublishMessageRequest) error {
    const maxAttempts = 3
    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        _, err := client.PublishMessage(ctx, req)
        if err == nil {
            return nil
        }
        lastErr = err

        // Don't retry validation errors
        if status.Code(err) == codes.InvalidArgument {
            return err
        }
        if attempt == maxAttempts-1 {
            break // No backoff after the final attempt
        }

        // Exponential backoff (1s, 2s), abandoned early if ctx is cancelled
        select {
        case <-time.After(time.Duration(1<<attempt) * time.Second):
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return fmt.Errorf("max retries exceeded: %w", lastErr)
}

Performance Considerations

Message Size Limits

  • Maximum message size: 4MB (gRPC default)
  • Recommended size: <100KB for optimal A2A compliance
  • Large content: Use FilePart references for large data
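
The size guidance above can be turned into a small decision helper. This is a sketch under the stated limits only: `ChoosePlacement`, the `Placement` type, and the choice to reject payloads above the 4MB limit for inline use are illustrative assumptions, not part of the AgentHub API.

```go
package main

import "fmt"

const (
	maxMessageBytes        = 4 * 1024 * 1024 // gRPC default limit noted above
	recommendedInlineBytes = 100 * 1024      // recommended inline size noted above
)

// Placement says where a payload should travel.
type Placement string

const (
	PlaceInline  Placement = "inline"         // embed as a text or data part
	PlaceFileRef Placement = "file_reference" // send a FilePart reference instead
)

// ChoosePlacement keeps payloads under the recommended size inline and moves
// anything larger behind a file reference.
func ChoosePlacement(payloadBytes int) Placement {
	if payloadBytes < recommendedInlineBytes {
		return PlaceInline
	}
	return PlaceFileRef
}

// FitsInOneMessage reports whether a payload could be sent inline at all.
func FitsInOneMessage(payloadBytes int) bool {
	return payloadBytes <= maxMessageBytes
}

func main() {
	fmt.Println(ChoosePlacement(2 * 1024))          // inline
	fmt.Println(ChoosePlacement(512 * 1024))        // file_reference
	fmt.Println(FitsInOneMessage(5 * 1024 * 1024))  // false
}
```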

A2A Best Practices

  1. Use structured Parts: Prefer DataPart for structured data over text
  2. Context management: Group related messages with context_id
  3. Artifact structure: Return well-formed Artifact objects
  4. Task lifecycle: Properly manage TaskState transitions
  5. Connection reuse: Maintain persistent gRPC connections

This completes the comprehensive A2A-compliant API reference for AgentHub, covering all message types, operations, and integration patterns with practical examples.

2 - AgentHub Tracing API Reference

Complete API documentation for AgentHub’s OpenTelemetry tracing integration, span management, context propagation, and instrumentation patterns.

Core Components

TraceManager

The TraceManager provides high-level tracing operations for AgentHub events.

Constructor

func NewTraceManager(serviceName string) *TraceManager

Parameters:

  • serviceName - Name of the service creating spans

Returns: Configured TraceManager instance

Methods

StartPublishSpan

func (tm *TraceManager) StartPublishSpan(ctx context.Context, responderAgentID, eventType string) (context.Context, trace.Span)

Purpose: Creates a span for event publishing operations

Parameters:

  • ctx - Parent context (may contain existing trace)
  • responderAgentID - Target agent for the event
  • eventType - Type of event being published

Returns:

  • context.Context - New context with active span
  • trace.Span - The created span

Attributes Set:

  • event.type - Event type being published
  • responder.agent - Target agent ID
  • operation.type - “publish”

Usage:

ctx, span := tm.StartPublishSpan(ctx, "agent_subscriber", "greeting")
defer span.End()
// ... publishing logic

StartEventProcessingSpan

func (tm *TraceManager) StartEventProcessingSpan(ctx context.Context, eventID, eventType, requesterAgentID, responderAgentID string) (context.Context, trace.Span)

Purpose: Creates a span for event processing operations

Parameters:

  • ctx - Context with extracted trace information
  • eventID - Unique identifier for the event
  • eventType - Type of event being processed
  • requesterAgentID - Agent that requested processing
  • responderAgentID - Agent performing processing

Returns:

  • context.Context - Context with processing span
  • trace.Span - The processing span

Attributes Set:

  • event.id - Event identifier
  • event.type - Event type
  • requester.agent - Requesting agent ID
  • responder.agent - Processing agent ID
  • operation.type - “process”

StartBrokerSpan

func (tm *TraceManager) StartBrokerSpan(ctx context.Context, operation, eventType string) (context.Context, trace.Span)

Purpose: Creates spans for broker operations

Parameters:

  • ctx - Request context
  • operation - Broker operation (route, subscribe, unsubscribe)
  • eventType - Event type being handled

Returns:

  • context.Context - Context with broker span
  • trace.Span - The broker span

Attributes Set:

  • operation.type - Broker operation type
  • event.type - Event type being handled
  • component - “broker”

InjectTraceContext

func (tm *TraceManager) InjectTraceContext(ctx context.Context, headers map[string]string)

Purpose: Injects trace context into headers for propagation

Parameters:

  • ctx - Context containing trace information
  • headers - Map to inject headers into

Headers Injected:

  • traceparent - W3C trace context header
  • tracestate - W3C trace state header (if present)

Usage:

headers := make(map[string]string)
tm.InjectTraceContext(ctx, headers)
// headers now contain trace context for propagation

ExtractTraceContext

func (tm *TraceManager) ExtractTraceContext(ctx context.Context, headers map[string]string) context.Context

Purpose: Extracts trace context from headers

Parameters:

  • ctx - Base context
  • headers - Headers containing trace context

Returns: Context with extracted trace information

Usage:

// Extract from event metadata
if metadata := event.GetMetadata(); metadata != nil {
    if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
        headers := structFieldsToStringMap(traceHeaders.GetStructValue().Fields)
        ctx = tm.ExtractTraceContext(ctx, headers)
    }
}

RecordError

func (tm *TraceManager) RecordError(span trace.Span, err error)

Purpose: Records an error on a span with proper formatting

Parameters:

  • span - Span to record error on
  • err - Error to record

Effects:

  • Sets span status to error
  • Records error as span event
  • Adds error type attribute

SetSpanSuccess

func (tm *TraceManager) SetSpanSuccess(span trace.Span)

Purpose: Marks a span as successful

Parameters:

  • span - Span to mark as successful

Effects:

  • Sets span status to OK
  • Ensures span is properly completed

Context Propagation

W3C Trace Context Standards

AgentHub uses the W3C Trace Context specification for interoperability.

Trace Context Headers

traceparent

Format: 00-{trace-id}-{span-id}-{trace-flags}

  • 00 - Version (currently always 00)
  • trace-id - 32-character hex string
  • span-id - 16-character hex string
  • trace-flags - 2-character hex flags

Example: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
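
To illustrate the field layout, here is a small standard-library parser for version 00 `traceparent` values. It is a sketch for understanding the format, not a replacement for the OpenTelemetry propagator; `TraceParent` and `ParseTraceParent` are hypothetical names. Following the W3C specification, it rejects all-zero trace and span IDs and treats the lowest flag bit as the sampled flag.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// TraceParent holds the four fields of a W3C traceparent header.
type TraceParent struct {
	Version string
	TraceID string
	SpanID  string
	Flags   uint8
}

// Sampled reports whether the sampled bit (0x01) is set in trace-flags.
func (tp TraceParent) Sampled() bool { return tp.Flags&0x01 != 0 }

// isLowerHex reports whether s is exactly n lowercase hex characters.
func isLowerHex(s string, n int) bool {
	if len(s) != n {
		return false
	}
	for _, c := range s {
		if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
			return false
		}
	}
	return true
}

// ParseTraceParent validates and splits a version-00 traceparent value.
func ParseTraceParent(header string) (TraceParent, error) {
	parts := strings.Split(header, "-")
	if len(parts) != 4 {
		return TraceParent{}, errors.New("traceparent: expected 4 dash-separated fields")
	}
	version, traceID, spanID, flags := parts[0], parts[1], parts[2], parts[3]
	switch {
	case version != "00":
		return TraceParent{}, fmt.Errorf("traceparent: unsupported version %q", version)
	case !isLowerHex(traceID, 32) || strings.Trim(traceID, "0") == "":
		return TraceParent{}, errors.New("traceparent: invalid trace-id")
	case !isLowerHex(spanID, 16) || strings.Trim(spanID, "0") == "":
		return TraceParent{}, errors.New("traceparent: invalid span-id")
	case !isLowerHex(flags, 2):
		return TraceParent{}, errors.New("traceparent: invalid trace-flags")
	}
	f, err := strconv.ParseUint(flags, 16, 8)
	if err != nil {
		return TraceParent{}, fmt.Errorf("traceparent: %w", err)
	}
	return TraceParent{Version: version, TraceID: traceID, SpanID: spanID, Flags: uint8(f)}, nil
}

func main() {
	tp, err := ParseTraceParent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(tp.TraceID, tp.SpanID, tp.Sampled())
}
```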

tracestate

Format: Vendor-specific key-value pairs

Example: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE

Propagation Implementation

Manual Injection

// Create headers map
headers := make(map[string]string)

// Inject trace context
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))

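// Headers now contain trace context
// Convert to protobuf metadata if needed. structpb.NewStruct rejects
// map[string]string, so copy the headers into a map[string]interface{} first.
traceHeaders := make(map[string]interface{}, len(headers))
for k, v := range headers {
    traceHeaders[k] = v
}
metadataStruct, err := structpb.NewStruct(map[string]interface{}{
    "trace_headers": traceHeaders,
    "timestamp": time.Now().Format(time.RFC3339),
})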

Manual Extraction

// Extract from protobuf metadata
if metadata := task.GetMetadata(); metadata != nil {
    if traceHeaders, ok := metadata.Fields["trace_headers"]; ok {
        headers := make(map[string]string)
        for k, v := range traceHeaders.GetStructValue().Fields {
            headers[k] = v.GetStringValue()
        }
        ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(headers))
    }
}

Span Lifecycle Management

Creating Spans

Basic Span Creation

tracer := otel.Tracer("my-service")
ctx, span := tracer.Start(ctx, "operation_name")
defer span.End()

Span with Attributes

ctx, span := tracer.Start(ctx, "operation_name", trace.WithAttributes(
    attribute.String("operation.type", "publish"),
    attribute.String("event.type", "greeting"),
    attribute.Int("event.priority", 1),
))
defer span.End()

Child Span Creation

// Parent span
ctx, parentSpan := tracer.Start(ctx, "parent_operation")
defer parentSpan.End()

// Child span (automatically linked)
ctx, childSpan := tracer.Start(ctx, "child_operation")
defer childSpan.End()

Span Attributes

Standard Attributes

AgentHub uses consistent attribute naming:

// Event attributes
attribute.String("event.id", taskID)
attribute.String("event.type", taskType)
attribute.Int("event.priority", priority)

// Agent attributes
attribute.String("agent.id", agentID)
attribute.String("agent.type", agentType)
attribute.String("requester.agent", requesterID)
attribute.String("responder.agent", responderID)

// Operation attributes
attribute.String("operation.type", "publish|process|route")
attribute.String("component", "broker|publisher|subscriber")

// Result attributes
attribute.Bool("success", true)
attribute.String("error.type", "validation|timeout|network")

Custom Attributes

span.SetAttributes(
    attribute.String("business.unit", "sales"),
    attribute.String("user.tenant", "acme-corp"),
    attribute.Int("batch.size", len(items)),
    attribute.String("timeout", (30*time.Second).String()), // the attribute package has no Duration constructor
)

Span Events

Adding Events

// Simple event
span.AddEvent("validation.started")

// Event with attributes
span.AddEvent("cache.miss", trace.WithAttributes(
    attribute.String("cache.key", key),
    attribute.String("cache.type", "redis"),
))

// Event with timestamp
span.AddEvent("external.api.call", trace.WithAttributes(
    attribute.String("api.endpoint", "/v1/users"),
    attribute.Int("api.status_code", 200),
), trace.WithTimestamp(time.Now()))

Common Event Patterns

// Processing milestones
span.AddEvent("processing.started")
span.AddEvent("validation.completed")
span.AddEvent("business.logic.completed")
span.AddEvent("result.published")

// Error events
span.AddEvent("error.occurred", trace.WithAttributes(
    attribute.String("error.message", err.Error()),
    attribute.String("error.stack", string(debug.Stack())), // debug.Stack returns []byte
))

Span Status

Setting Status

// Success
span.SetStatus(codes.Ok, "")

// Error with message
span.SetStatus(codes.Error, "validation failed")

// Error without message
span.SetStatus(codes.Error, "")

Status Code Mapping

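// Go context errors to OpenTelemetry status codes.
// OpenTelemetry defines only Unset, Error, and Ok, so the specific
// failure kind is carried in the status description.
if err != nil {
    description := err.Error()
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        description = "deadline exceeded: " + description
    case errors.Is(err, context.Canceled):
        description = "cancelled: " + description
    }
    span.SetStatus(codes.Error, description)
} else {
    span.SetStatus(codes.Ok, "")
}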

Advanced Instrumentation

Baggage Propagation

Setting Baggage

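// Add baggage to context. An invalid key or value leaves an empty member,
// which baggage.New rejects, so the error is checked once below.
userMember, _ := baggage.NewMember("user.id", userID)
tenantMember, _ := baggage.NewMember("tenant.id", tenantID)
requestMember, _ := baggage.NewMember("request.id", requestID)
bag, err := baggage.New(userMember, tenantMember, requestMember)
if err != nil {
    return err
}
ctx = baggage.ContextWithBaggage(ctx, bag)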

Reading Baggage

// Read baggage anywhere in the trace
if member := baggage.FromContext(ctx).Member("user.id"); member.Value() != "" {
    userID := member.Value()
    // Use user ID for business logic
}

Span Links

// Link to related span
linkedSpanContext := trace.SpanContextFromContext(relatedCtx)
ctx, span := tracer.Start(ctx, "operation", trace.WithLinks(
    trace.Link{
        SpanContext: linkedSpanContext,
        Attributes: []attribute.KeyValue{
            attribute.String("link.type", "related_operation"),
        },
    },
))

Sampling Control

Conditional Sampling

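// Start a new root trace for an important operation. WithNewRoot detaches
// the span from any parent, so the configured sampler makes a fresh decision.
ctx, span := tracer.Start(ctx, "critical_operation",
    trace.WithNewRoot(), // Start new trace
    trace.WithSpanKind(trace.SpanKindServer),
)

// Add a sampling priority hint. The attribute has no effect by itself;
// a custom sampler or collector policy has to read it.
span.SetAttributes(
    attribute.String("sampling.priority", "high"),
)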

Integration Patterns

gRPC Integration

Server Interceptor

func TracingUnaryInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        ctx, span := tracer.Start(ctx, info.FullMethod)
        defer span.End()

        resp, err := handler(ctx, req)
        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes.Error, err.Error())
        }
        return resp, err
    }
}

Client Interceptor

func TracingUnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        ctx, span := tracer.Start(ctx, method)
        defer span.End()

        err := invoker(ctx, method, req, reply, cc, opts...)
        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes.Error, err.Error())
        }
        return err
    }
}

HTTP Integration

HTTP Handler Wrapper

func TracingHandler(tracer trace.Tracer, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
        ctx, span := tracer.Start(ctx, r.Method+" "+r.URL.Path)
        defer span.End()

        span.SetAttributes(
            attribute.String("http.method", r.Method),
            attribute.String("http.url", r.URL.String()),
            attribute.String("http.user_agent", r.UserAgent()),
        )

        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

Error Handling

Error Recording Best Practices

Complete Error Recording

if err != nil {
    // Record error on span
    span.RecordError(err)
    span.SetStatus(codes.Error, err.Error())

    // Add error context
    span.SetAttributes(
        attribute.String("error.type", classifyError(err)),
        attribute.Bool("error.retryable", isRetryable(err)),
    )

    // Log with context
    logger.ErrorContext(ctx, "Operation failed",
        slog.Any("error", err),
        slog.String("operation", "event_processing"),
    )

    return err
}

Error Classification

func classifyError(err error) string {
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        return "timeout"
    case errors.Is(err, context.Canceled):
        return "cancelled"
    case strings.Contains(err.Error(), "connection"):
        return "network"
    case strings.Contains(err.Error(), "validation"):
        return "validation"
    default:
        return "unknown"
    }
}

Performance Considerations

Span Creation Overhead

  • Span creation: ~1-2μs per span
  • Attribute setting: ~100ns per attribute
  • Event recording: ~200ns per event
  • Context propagation: ~500ns per injection/extraction

Memory Usage

  • Active span: ~500 bytes
  • Completed span buffer: ~1KB per span
  • Context overhead: ~100 bytes per context

Best Practices

  1. Limit span attributes to essential information
  2. Use batch exporters to reduce network overhead
  3. Sample appropriately for high-throughput services
  4. Pool span contexts where possible
  5. Avoid deep span nesting (>10 levels)

Troubleshooting

Missing Spans Checklist

  1. ✅ OpenTelemetry properly initialized
  2. ✅ Tracer retrieved from global provider
  3. ✅ Context propagated correctly
  4. ✅ Spans properly ended
  5. ✅ Exporter configured and accessible

Common Issues

Broken Trace Chains

// ❌ Wrong - creates new root trace
ctx, span := tracer.Start(context.Background(), "operation")

// ✅ Correct - continues existing trace
ctx, span := tracer.Start(ctx, "operation")

Missing Context Propagation

// ❌ Wrong - context not propagated
go func() {
    ctx, span := tracer.Start(context.Background(), "async_work")
    defer span.End()
    // work...
}()

// ✅ Correct - context properly propagated
go func(ctx context.Context) {
    ctx, span := tracer.Start(ctx, "async_work")
    defer span.End()
    // work...
}(ctx)

🎯 Next Steps:

Implementation: Add Observability to Your Agent

Debugging: Debug with Distributed Tracing

Metrics: Observability Metrics Reference

3 - Unified Abstraction Library API Reference

The AgentHub unified abstraction library provides simplified APIs for building gRPC-based agent communication systems with built-in observability, automatic configuration, and correlation tracking.

Package: internal/agenthub

The internal/agenthub package contains the core unified abstraction components that dramatically simplify AgentHub development by providing high-level APIs with automatic observability integration.

Overview

The unified abstraction library reduces agent implementation complexity from 380+ lines to ~29 lines by providing:

  • Automatic gRPC Setup: One-line server and client creation
  • Built-in Observability: Integrated OpenTelemetry tracing and metrics
  • Environment-Based Configuration: Automatic configuration from environment variables
  • Correlation Tracking: Automatic correlation ID generation and propagation
  • Pluggable Architecture: Simple task handler registration

Core Components

GRPCConfig

Configuration structure for gRPC servers and clients with environment-based initialization.

type GRPCConfig struct {
    ServerAddr    string // gRPC server listen address (e.g., ":50051")
    BrokerAddr    string // Broker connection address (e.g., "localhost:50051")
    HealthPort    string // Health check endpoint port
    ComponentName string // Component identifier for observability
}

Constructor

func NewGRPCConfig(componentName string) *GRPCConfig

Creates a new gRPC configuration with environment variable defaults:

Environment Variable    Default      Description
AGENTHUB_BROKER_ADDR    localhost    Broker server host
AGENTHUB_BROKER_PORT    50051        Broker gRPC port
AGENTHUB_GRPC_PORT      :50051       Server listen port
BROKER_HEALTH_PORT      8080         Health endpoint port

Example:

config := agenthub.NewGRPCConfig("my-agent")
// Results in BrokerAddr: "localhost:50051" (automatically combined)

AgentHubServer

High-level gRPC server wrapper with integrated observability.

type AgentHubServer struct {
    Server         *grpc.Server                    // Underlying gRPC server
    Listener       net.Listener                    // Network listener
    Observability  *observability.Observability    // OpenTelemetry integration
    TraceManager   *observability.TraceManager     // Distributed tracing
    MetricsManager *observability.MetricsManager   // Metrics collection
    HealthServer   *observability.HealthServer     // Health monitoring
    Logger         *slog.Logger                    // Structured logging
    Config         *GRPCConfig                     // Configuration
}

Constructor

func NewAgentHubServer(config *GRPCConfig) (*AgentHubServer, error)

Creates a complete gRPC server with:

  • OpenTelemetry instrumentation
  • Health check endpoints
  • Metrics collection
  • Structured logging with trace correlation

Methods

func (s *AgentHubServer) Start(ctx context.Context) error

Starts the server with automatic:

  • Health endpoint setup (/health, /ready, /metrics)
  • Metrics collection goroutine
  • gRPC server with observability

func (s *AgentHubServer) Shutdown(ctx context.Context) error

Gracefully shuts down all components:

  • gRPC server graceful stop
  • Health server shutdown
  • Observability cleanup

Example:

config := agenthub.NewGRPCConfig("broker")
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
    log.Fatal(err)
}

// Register services
eventBusService := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, eventBusService)

// Start server
if err := server.Start(ctx); err != nil {
    log.Fatal(err)
}

AgentHubClient

High-level gRPC client wrapper with integrated observability.

type AgentHubClient struct {
    Client         pb.EventBusClient               // gRPC client
    Connection     *grpc.ClientConn                // Connection
    Observability  *observability.Observability    // OpenTelemetry integration
    TraceManager   *observability.TraceManager     // Distributed tracing
    MetricsManager *observability.MetricsManager   // Metrics collection
    HealthServer   *observability.HealthServer     // Health monitoring
    Logger         *slog.Logger                    // Structured logging
    Config         *GRPCConfig                     // Configuration
}

Constructor

func NewAgentHubClient(config *GRPCConfig) (*AgentHubClient, error)

Creates a complete gRPC client with:

  • OpenTelemetry instrumentation
  • Connection health monitoring
  • Metrics collection
  • Automatic retry and timeout handling

Methods

func (c *AgentHubClient) Start(ctx context.Context) error

Initializes client with health monitoring and metrics collection.

func (c *AgentHubClient) Shutdown(ctx context.Context) error

Gracefully closes connection and cleans up resources.

Example:

config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
    log.Fatal(err)
}

err = client.Start(ctx)
if err != nil {
    log.Fatal(err)
}

// Use client.Client for gRPC calls

Service Abstractions

EventBusService

Broker service implementation with built-in observability and correlation tracking.

type EventBusService struct {
    Server          *AgentHubServer
    subscriptions   map[string][]Subscription
    resultSubs      map[string][]ResultSubscription
    progressSubs    map[string][]ProgressSubscription
    mu              sync.RWMutex
}

Constructor

func NewEventBusService(server *AgentHubServer) *EventBusService

Creates an EventBus service with automatic:

  • Subscription management
  • Task routing and correlation
  • Observability integration

Key Methods

func (s *EventBusService) PublishTask(ctx context.Context, req *pb.PublishTaskRequest) (*pb.PublishResponse, error)

Publishes tasks with automatic:

  • Input validation
  • Correlation ID generation
  • Distributed tracing
  • Metrics collection

func (s *EventBusService) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error

Manages task subscriptions with:

  • Automatic subscription lifecycle
  • Context cancellation handling
  • Error recovery
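The subscription lifecycle described above (register, deliver, clean up on cancellation) can be sketched with a mutex-guarded map of channels. This is a simplified stand-in for illustration only: the bus type, its methods, and the string payloads are hypothetical and do not reflect the real EventBusService internals or the pb message types.

```go
package main

import (
	"fmt"
	"sync"
)

// bus fans messages out to the subscribers registered for an agent ID.
type bus struct {
	mu   sync.RWMutex
	subs map[string]map[int]chan string
	next int
}

func newBus() *bus {
	return &bus{subs: make(map[string]map[int]chan string)}
}

// Subscribe registers a buffered channel for agentID and returns it
// together with a cancel function that removes and closes it.
func (b *bus) Subscribe(agentID string) (<-chan string, func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.next
	b.next++
	ch := make(chan string, 8)
	if b.subs[agentID] == nil {
		b.subs[agentID] = make(map[int]chan string)
	}
	b.subs[agentID][id] = ch

	var once sync.Once
	cancel := func() {
		once.Do(func() {
			// The write lock excludes Publish, so no send can race
			// with the close below.
			b.mu.Lock()
			defer b.mu.Unlock()
			delete(b.subs[agentID], id)
			close(ch)
		})
	}
	return ch, cancel
}

// Publish delivers msg to every subscriber of agentID and reports how
// many received it. A subscriber whose buffer is full is skipped.
func (b *bus) Publish(agentID, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[agentID] {
		select {
		case ch <- msg:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	b := newBus()
	ch, cancel := b.Subscribe("agent-1")
	defer cancel()

	b.Publish("agent-1", "task: greeting")
	fmt.Println(<-ch)
}
```

In a streaming RPC handler, the cancel function would typically be deferred so the subscription is removed when the stream's context ends.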

SubscriberAgent

High-level subscriber implementation with pluggable task handlers.

type SubscriberAgent struct {
    client      *AgentHubClient
    agentID     string
    handlers    map[string]TaskHandler
    ctx         context.Context
    cancel      context.CancelFunc
}

Constructor

func NewSubscriberAgent(client *AgentHubClient, agentID string) *SubscriberAgent

Task Handler Interface

type TaskHandler interface {
    Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error)
}

Methods

func (s *SubscriberAgent) RegisterHandler(taskType string, handler TaskHandler)

Registers handlers for specific task types with automatic:

  • Task routing
  • Error handling
  • Result publishing

func (s *SubscriberAgent) Start(ctx context.Context) error

Starts the subscriber with automatic:

  • Task subscription
  • Handler dispatch
  • Observability integration

Example:

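type GreetingHandler struct{}

// Handle processes a greeting task and returns its result.
func (h *GreetingHandler) Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error) {
    // Process the greeting task and populate the result fields here.
    result := &pb.TaskResult{}
    return result, nil
}

// Create the subscriber (the agent ID is illustrative), then register the handler
subscriber := agenthub.NewSubscriberAgent(client, "greeting-agent")
subscriber.RegisterHandler("greeting", &GreetingHandler{})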

Utility Functions

Metadata Operations

func ExtractCorrelationID(ctx context.Context) string
func InjectCorrelationID(ctx context.Context, correlationID string) context.Context
func GenerateCorrelationID() string

Automatic correlation ID management for distributed tracing.

Metrics Helpers

func NewMetricsTicker(ctx context.Context, manager *observability.MetricsManager) *MetricsTicker

Automatic metrics collection with configurable intervals.

Configuration Reference

Environment Variables

The unified abstraction library uses environment-based configuration:

| Variable | Type | Default | Description |
|---|---|---|---|
| AGENTHUB_BROKER_ADDR | string | localhost | Broker server hostname |
| AGENTHUB_BROKER_PORT | string | 50051 | Broker gRPC port |
| AGENTHUB_GRPC_PORT | string | :50051 | Server listen address |
| BROKER_HEALTH_PORT | string | 8080 | Health endpoint port |
| SERVICE_VERSION | string | 1.0.0 | Service version for observability |
| ENVIRONMENT | string | development | Deployment environment |

Observability Integration

The unified abstraction automatically configures:

  • OpenTelemetry Tracing: Automatic span creation and context propagation
  • Prometheus Metrics: 47+ built-in metrics for performance monitoring
  • Health Checks: Comprehensive health endpoints for service monitoring
  • Structured Logging: Correlated logging with trace context

Performance Characteristics

| Metric | Standard gRPC | Unified Abstraction | Overhead |
|---|---|---|---|
| Setup Complexity | 380+ lines | ~29 lines | -92% code |
| Throughput | 10,000+ tasks/sec | 9,500+ tasks/sec | -5% |
| Latency | Baseline | +10ms for tracing | +10ms |
| Memory | Baseline | +50MB per agent | +50MB |
| CPU | Baseline | +5% for observability | +5% |

Migration Guide

From Standard gRPC

Before (Standard gRPC):

// 380+ lines of boilerplate code
lis, err := net.Listen("tcp", ":50051")
server := grpc.NewServer()
// ... extensive setup code

After (Unified Abstraction):

// ~29 lines in a complete program; the core calls are:
config := agenthub.NewGRPCConfig("my-service")
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
    log.Fatal(err)
}
service := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, service)
if err := server.Start(ctx); err != nil {
    log.Fatal(err)
}

Observability Benefits

The unified abstraction provides automatic:

  1. Distributed Tracing: Every request automatically traced
  2. Metrics Collection: 47+ metrics without configuration
  3. Health Monitoring: Built-in health and readiness endpoints
  4. Error Correlation: Automatic error tracking across services
  5. Performance Monitoring: Latency, throughput, and error rates

Error Handling

The unified abstraction provides comprehensive error handling:

  • Automatic Retries: Built-in retry logic for transient failures
  • Circuit Breaking: Protection against cascading failures
  • Graceful Degradation: Service continues operating during partial failures
  • Error Correlation: Distributed error tracking across service boundaries

Best Practices

1. Configuration Management

// Use environment-based configuration
config := agenthub.NewGRPCConfig("my-service")

// Override specific values if needed
config.HealthPort = "8083"

2. Handler Registration

// Register handlers before starting
subscriber.RegisterHandler("task-type", handler)
if err := subscriber.Start(ctx); err != nil {
    log.Fatal(err)
}

3. Graceful Shutdown

// Always implement proper shutdown
defer func() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    if err := server.Shutdown(ctx); err != nil {
        log.Printf("shutdown error: %v", err)
    }
}()

4. Error Handling

// Use context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

result, err := client.Client.PublishTask(ctx, request)
if err != nil {
    // Error is automatically traced and logged
    return fmt.Errorf("failed to publish task: %w", err)
}

See Also