How-To Guides

Goal-oriented guides that solve specific problems and accomplish particular tasks

These practical guides will help you solve specific problems and accomplish particular tasks with AgentHub. Each guide focuses on a specific goal and assumes you have some familiarity with the basic concepts.

📚 How-to Categories

📋 Before You Start

These guides assume you have:

  • Completed the Installation and Setup tutorial
  • Basic understanding of AgentHub concepts
  • A working development environment

💡 How to Use These Guides

  • Each guide is self-contained and focuses on one specific task
  • Guides can be followed in any order based on your needs
  • Code examples are production-ready where possible
  • Links to related concepts and references are provided

1 - Agent Development

Practical guides for creating and managing agents

Agent Development How-to Guides

Step-by-step guides for creating, configuring, and managing different types of agents in AgentHub.

Available Guides

1.1 - How to Create an A2A Task Publisher

Learn how to create an agent that publishes Agent2Agent (A2A) protocol-compliant tasks to other agents through the AgentHub EDA broker.

This guide shows you how to create an agent that publishes Agent2Agent (A2A) protocol-compliant tasks to other agents through the AgentHub Event-Driven Architecture (EDA) broker.

Basic Setup

Using AgentHub’s unified abstractions, creating a publisher is straightforward:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/owulveryck/agenthub/internal/agenthub"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/structpb"
)

const (
    myAgentID = "my_publisher_agent"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    // Create configuration with automatic observability
    config := agenthub.NewGRPCConfig("publisher")
    config.HealthPort = "8081" // Unique port for this publisher

    // Create AgentHub client with built-in observability
    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        panic("Failed to create AgentHub client: " + err.Error())
    }

    // Automatic graceful shutdown
    defer func() {
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer shutdownCancel()
        if err := client.Shutdown(shutdownCtx); err != nil {
            client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
        }
    }()

    // Start the client (enables observability)
    if err := client.Start(ctx); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
        panic(err)
    }

    // Create A2A task publisher with automatic tracing and metrics
    taskPublisher := &agenthub.A2ATaskPublisher{
        Client:         client.Client,
        TraceManager:   client.TraceManager,
        MetricsManager: client.MetricsManager,
        Logger:         client.Logger,
        ComponentName:  "publisher",
        AgentID:        myAgentID,
    }

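    // Your A2A task publishing code goes here, for example:
    if err := publishSimpleTask(ctx, taskPublisher); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to publish task", "error", err)
    }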
}

Publishing a Simple A2A Task

Here’s how to publish a basic A2A task using the A2ATaskPublisher abstraction:

func publishSimpleTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
    // Create A2A-compliant content parts
    content := []*pb.Part{
        {
            Part: &pb.Part_Text{
                Text: "Hello! Please provide a greeting for Claude.",
            },
        },
    }

    // Publish A2A task using the unified abstraction
    task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
        TaskType:         "greeting",
        Content:          content,
        RequesterAgentID: myAgentID,
        ResponderAgentID: "agent_demo_subscriber", // Target agent
        Priority:         pb.Priority_PRIORITY_HIGH,
        ContextID:        "ctx_greeting_demo", // Optional: conversation context
    })
    if err != nil {
        return fmt.Errorf("failed to publish greeting task: %w", err)
    }

    taskPublisher.Logger.InfoContext(ctx, "Published A2A greeting task",
        "task_id", task.GetId(),
        "context_id", task.GetContextId())
    return nil
}

Publishing Different Task Types

Math Calculation Task with A2A Data Parts

func publishMathTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
    // Create A2A-compliant content with structured data
    content := []*pb.Part{
        {
            Part: &pb.Part_Text{
                Text: "Please perform the following mathematical calculation:",
            },
        },
        {
            Part: &pb.Part_Data{
                Data: &pb.DataPart{
                    Data: &structpb.Struct{
                        Fields: map[string]*structpb.Value{
                            "operation": structpb.NewStringValue("multiply"),
                            "a":         structpb.NewNumberValue(15.0),
                            "b":         structpb.NewNumberValue(7.0),
                        },
                    },
                },
            },
        },
    }

    // Publish A2A math task
    task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
        TaskType:         "math_calculation",
        Content:          content,
        RequesterAgentID: myAgentID,
        ResponderAgentID: "agent_demo_subscriber",
        Priority:         pb.Priority_PRIORITY_MEDIUM,
        ContextID:        "ctx_math_demo",
    })
    if err != nil {
        return fmt.Errorf("failed to publish math task: %w", err)
    }

    taskPublisher.Logger.InfoContext(ctx, "Published A2A math task",
        "task_id", task.GetId(),
        "operation", "multiply")
    return nil
}

Data Processing Task

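func publishDataProcessingTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
    // Structured parameters travel as an A2A data part
    params, err := structpb.NewStruct(map[string]interface{}{
        "dataset_path":  "/data/customer_data.csv",
        "analysis_type": "summary_statistics",
        "output_format": "json",
        "filters": map[string]interface{}{
            "date_range": "last_30_days",
            "status":     "active",
        },
        // Workflow metadata, useful for debugging and auditing
        "workflow_id": "workflow_123",
        "user_id":     "user_456",
    })
    if err != nil {
        return fmt.Errorf("failed to build task parameters: %w", err)
    }

    content := []*pb.Part{
        {Part: &pb.Part_Text{Text: "Please analyze the following dataset:"}},
        {Part: &pb.Part_Data{Data: &pb.DataPart{Data: params}}},
    }

    task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
        TaskType:         "data_processing",
        Content:          content,
        RequesterAgentID: myAgentID,
        ResponderAgentID: "data_agent",
        Priority:         pb.Priority_PRIORITY_HIGH,
    })
    if err != nil {
        return fmt.Errorf("failed to publish data processing task: %w", err)
    }

    taskPublisher.Logger.InfoContext(ctx, "Published A2A data processing task",
        "task_id", task.GetId())
    return nil
}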

Broadcasting Tasks (No Specific Responder)

To broadcast a task to all available agents, omit the ResponderAgentID:

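func broadcastTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
    content := []*pb.Part{
        {Part: &pb.Part_Text{Text: "Server maintenance in 30 minutes"}},
        {Part: &pb.Part_Data{Data: &pb.DataPart{Data: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "action_required": structpb.NewBoolValue(false),
            },
        }}}},
    }

    // ResponderAgentID is omitted (left empty), so the task is broadcast to all agents
    task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
        TaskType:         "announcement",
        Content:          content,
        RequesterAgentID: myAgentID,
        Priority:         pb.Priority_PRIORITY_LOW,
    })
    if err != nil {
        return fmt.Errorf("failed to publish announcement: %w", err)
    }

    taskPublisher.Logger.InfoContext(ctx, "Broadcast A2A announcement task",
        "task_id", task.GetId())
    return nil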
}
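The broadcast rule can be modeled in a few lines. The following is a hypothetical, stdlib-only illustration of the semantics described above (an empty responder ID means every subscribed agent receives the task); recipients is not part of AgentHub, and dropping a targeted task whose responder is not subscribed is an assumption of this model, not documented broker behavior.

```go
package main

import "fmt"

// recipients models the routing rule: an empty responderID broadcasts to
// every subscribed agent, otherwise only the named agent receives the task.
// Conceptual sketch only, not the AgentHub broker's implementation.
func recipients(subscribers []string, responderID string) []string {
	if responderID == "" {
		// Broadcast: return a copy so callers cannot mutate the subscriber list.
		out := make([]string, len(subscribers))
		copy(out, subscribers)
		return out
	}
	for _, id := range subscribers {
		if id == responderID {
			return []string{id}
		}
	}
	// In this model, a task targeted at an unknown agent reaches nobody.
	return nil
}

func main() {
	subs := []string{"agent_demo_subscriber", "data_agent"}
	fmt.Println(recipients(subs, ""))           // [agent_demo_subscriber data_agent]
	fmt.Println(recipients(subs, "data_agent")) // [data_agent]
	fmt.Println(recipients(subs, "unknown"))    // []
}
```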

Subscribing to Task Results

As a publisher, you’ll want to receive results from tasks you’ve requested. You can use the AgentHub client directly:

func subscribeToResults(ctx context.Context, client *agenthub.AgentHubClient) {
    req := &pb.SubscribeToTaskResultsRequest{
        RequesterAgentId: myAgentID,
        // TaskIds: []string{"specific_task_id"}, // Optional: filter specific tasks
    }

    stream, err := client.Client.SubscribeToTaskResults(ctx, req)
    if err != nil {
        client.Logger.ErrorContext(ctx, "Error subscribing to results", "error", err)
        return
    }

    client.Logger.InfoContext(ctx, "Subscribed to task results", "agent_id", myAgentID)

    for {
        result, err := stream.Recv()
        if err != nil {
            if ctx.Err() != nil {
                return // context cancelled or deadline exceeded: normal shutdown
            }
            client.Logger.ErrorContext(ctx, "Error receiving result", "error", err)
            return
        }

        handleTaskResult(ctx, client, result)
    }
}

func handleTaskResult(ctx context.Context, client *agenthub.AgentHubClient, result *pb.TaskResult) {
    client.Logger.InfoContext(ctx, "Received task result",
        "task_id", result.GetTaskId(),
        "status", result.GetStatus().String())

    switch result.GetStatus() {
    case pb.TaskStatus_TASK_STATUS_COMPLETED:
        client.Logger.InfoContext(ctx, "Task completed successfully",
            "task_id", result.GetTaskId(),
            "result", result.GetResult().AsMap())
    case pb.TaskStatus_TASK_STATUS_FAILED:
        client.Logger.ErrorContext(ctx, "Task failed",
            "task_id", result.GetTaskId(),
            "error", result.GetErrorMessage())
    case pb.TaskStatus_TASK_STATUS_CANCELLED:
        client.Logger.InfoContext(ctx, "Task was cancelled",
            "task_id", result.GetTaskId())
    }
}
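To know which published tasks still await a result, a publisher can keep a set of task IDs: add each ID returned by PublishTask (task.GetId()) and remove it when handleTaskResult sees a COMPLETED, FAILED, or CANCELLED status. pendingTasks below is a hypothetical, stdlib-only sketch; it is guarded by a mutex because publishing and the result stream usually run in different goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingTasks tracks IDs of published tasks that have no result yet.
type pendingTasks struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newPendingTasks() *pendingTasks {
	return &pendingTasks{ids: make(map[string]struct{})}
}

// Add records a task ID right after PublishTask succeeds.
func (p *pendingTasks) Add(taskID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.ids[taskID] = struct{}{}
}

// Resolve removes a task once a terminal result (completed, failed, or
// cancelled) arrives, and reports whether it was one we were waiting for.
func (p *pendingTasks) Resolve(taskID string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	_, ok := p.ids[taskID]
	delete(p.ids, taskID)
	return ok
}

// Outstanding returns how many published tasks still await a result.
func (p *pendingTasks) Outstanding() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.ids)
}

func main() {
	p := newPendingTasks()
	p.Add("task-1")
	p.Add("task-2")
	fmt.Println(p.Resolve("task-1"), p.Outstanding()) // true 1
	fmt.Println(p.Resolve("task-9"), p.Outstanding()) // false 1
}
```

A `false` from Resolve flags a result for a task this publisher never sent (or already resolved), which is worth logging.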

Monitoring Task Progress

Subscribe to progress updates to track long-running tasks:

func subscribeToProgress(ctx context.Context, client *agenthub.AgentHubClient) {
    req := &pb.SubscribeToTaskResultsRequest{
        RequesterAgentId: myAgentID,
    }

    stream, err := client.Client.SubscribeToTaskProgress(ctx, req)
    if err != nil {
        client.Logger.ErrorContext(ctx, "Error subscribing to progress", "error", err)
        return
    }

    client.Logger.InfoContext(ctx, "Subscribed to task progress", "agent_id", myAgentID)

    for {
        progress, err := stream.Recv()
        if err != nil {
            if ctx.Err() != nil {
                return // context cancelled or deadline exceeded: normal shutdown
            }
            client.Logger.ErrorContext(ctx, "Error receiving progress", "error", err)
            return
        }

        client.Logger.InfoContext(ctx, "Task progress update",
            "task_id", progress.GetTaskId(),
            "progress_percentage", progress.GetProgressPercentage(),
            "progress_message", progress.GetProgressMessage())
    }
}

Complete Publisher Example

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    // Create configuration with automatic observability
    config := agenthub.NewGRPCConfig("publisher")
    config.HealthPort = "8081"

    // Create AgentHub client with built-in observability
    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        panic("Failed to create AgentHub client: " + err.Error())
    }

    defer func() {
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer shutdownCancel()
        if err := client.Shutdown(shutdownCtx); err != nil {
            client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
        }
    }()

    // Start the client (enables observability)
    if err := client.Start(ctx); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
        panic(err)
    }

    // Create A2A task publisher with automatic tracing and metrics
    taskPublisher := &agenthub.A2ATaskPublisher{
        Client:         client.Client,
        TraceManager:   client.TraceManager,
        MetricsManager: client.MetricsManager,
        Logger:         client.Logger,
        ComponentName:  "publisher",
        AgentID:        myAgentID,
    }

    client.Logger.InfoContext(ctx, "Starting publisher demo")

    // Publish various tasks with automatic observability
    publishers := []func(context.Context, *agenthub.A2ATaskPublisher) error{
        publishSimpleTask,
        publishMathTask,
        publishDataProcessingTask,
        broadcastTask,
    }
    for _, publish := range publishers {
        if err := publish(ctx, taskPublisher); err != nil {
            client.Logger.ErrorContext(ctx, "Failed to publish task", "error", err)
        }
        time.Sleep(2 * time.Second)
    }

    client.Logger.InfoContext(ctx, "All tasks published! Check subscriber logs for results")
}

Best Practices

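  1. Keep IDs unique: PublishTask returns the published task, whose ID you can read with task.GetId(). For identifiers you assign yourself, such as ContextID, use UUIDs, timestamps, or sequential IDs to ensure uniqueness.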

  2. Use appropriate priorities: Reserve PRIORITY_CRITICAL for urgent tasks that must be processed immediately.

  3. Set realistic deadlines: Include deadlines for time-sensitive tasks to help agents prioritize.

  4. Handle results gracefully: Always subscribe to task results and handle failures appropriately.

  5. Include helpful metadata: Add context information that might be useful for debugging or auditing.

  6. Validate parameters: Ensure task parameters are properly structured before publishing.

  7. Use specific responder IDs when possible: This ensures tasks go to the most appropriate agent.
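For identifiers you generate yourself (a ContextID, for instance), a random UUID is the simplest way to get uniqueness across processes. Below is a minimal stdlib-only sketch of an RFC 4122 version-4 UUID generator; newTaskID is a hypothetical helper, and if your module already depends on github.com/google/uuid, uuid.NewString() does the same job.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newTaskID returns a random version-4 UUID string. crypto/rand makes the
// IDs unpredictable, and collisions are vanishingly unlikely.
func newTaskID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant (10xx)
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	id, err := newTaskID()
	if err != nil {
		panic(err)
	}
	fmt.Println("ctx_" + id)
}
```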

Your publisher is now ready to send tasks to agents and receive results!

1.2 - How to Create an A2A Task Subscriber (Agent)

Learn how to create an agent that can receive, process, and respond to Agent2Agent (A2A) protocol tasks through the AgentHub EDA broker using A2A-compliant abstractions.

This guide shows you how to create an agent that can receive, process, and respond to Agent2Agent (A2A) protocol tasks through the AgentHub Event-Driven Architecture (EDA) broker using AgentHub’s A2A-compliant abstractions.

Basic Agent Setup

Start by creating the basic structure for your agent using the unified abstraction:

package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/owulveryck/agenthub/internal/agenthub"
)

const (
    agentID = "my_agent_processor"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create configuration with automatic observability
    config := agenthub.NewGRPCConfig("subscriber")
    config.HealthPort = "8082" // Unique port for this agent

    // Create AgentHub client with built-in observability
    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        panic("Failed to create AgentHub client: " + err.Error())
    }

    // Automatic graceful shutdown
    defer func() {
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer shutdownCancel()
        if err := client.Shutdown(shutdownCtx); err != nil {
            client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
        }
    }()

    // Start the client (enables observability)
    if err := client.Start(ctx); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
        panic(err)
    }

    // Create A2A task subscriber with automatic observability
    taskSubscriber := agenthub.NewA2ATaskSubscriber(client, agentID)

    // Register A2A task handlers (see below for examples)
    taskSubscriber.RegisterDefaultHandlers()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        <-sigChan
        client.Logger.Info("Received shutdown signal")
        cancel()
    }()

    client.Logger.InfoContext(ctx, "Starting subscriber agent")

    // Start task subscription (with automatic observability)
    go func() {
        if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
            client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
        }
    }()

    // Optional: Subscribe to task results if this agent also publishes tasks
    go func() {
        if err := taskSubscriber.SubscribeToTaskResults(ctx); err != nil {
            client.Logger.ErrorContext(ctx, "Task result subscription failed", "error", err)
        }
    }()

    client.Logger.InfoContext(ctx, "Agent started with observability. Listening for tasks.")

    // Wait for context cancellation
    <-ctx.Done()
    client.Logger.Info("Agent shutdown complete")
}

Default Task Handlers

The RegisterDefaultHandlers() method provides built-in handlers for common task types:

  • greeting: Simple greeting with name parameter
  • math_calculation: Basic arithmetic operations (add, subtract, multiply, divide)
  • random_number: Random number generation with seed

Custom Task Handlers

Simple Custom Handler

Add your own task handlers using RegisterTaskHandler():

func setupCustomHandlers(taskSubscriber *agenthub.TaskSubscriber) {
    // Register a custom data processing handler
    taskSubscriber.RegisterTaskHandler("data_processing", handleDataProcessing)

    // Register a file conversion handler
    taskSubscriber.RegisterTaskHandler("file_conversion", handleFileConversion)

    // Register a status check handler
    taskSubscriber.RegisterTaskHandler("status_check", handleStatusCheck)
}

func handleDataProcessing(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
    params := task.GetParameters()
    datasetPath := params.Fields["dataset_path"].GetStringValue()
    analysisType := params.Fields["analysis_type"].GetStringValue()

    if datasetPath == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "dataset_path parameter is required"
    }

    // Simulate data processing
    time.Sleep(2 * time.Second)

    result, err := structpb.NewStruct(map[string]interface{}{
        "dataset_path":    datasetPath,
        "analysis_type":   analysisType,
        "records_processed": 1500,
        "processing_time": "2.1s",
        "summary": map[string]interface{}{
            "mean":   42.7,
            "median": 41.2,
            "stddev": 8.3,
        },
        "processed_at": time.Now().Format(time.RFC3339),
    })

    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
    }

    return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}

Advanced Handler with Validation

func handleFileConversion(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
    params := task.GetParameters()

    // Extract and validate parameters
    inputPath := params.Fields["input_path"].GetStringValue()
    outputFormat := params.Fields["output_format"].GetStringValue()

    if inputPath == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "input_path parameter is required"
    }

    if outputFormat == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "output_format parameter is required"
    }

    // Validate output format
    validFormats := []string{"pdf", "docx", "txt", "html"}
    isValidFormat := false
    for _, format := range validFormats {
        if outputFormat == format {
            isValidFormat = true
            break
        }
    }

    if !isValidFormat {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("unsupported output format: %s", outputFormat)
    }

    // Simulate file conversion process
    time.Sleep(1 * time.Second)

    // Swap only the final extension; this also works when the input has none
    outputPath := strings.TrimSuffix(inputPath, filepath.Ext(inputPath)) + "." + outputFormat

    result, err := structpb.NewStruct(map[string]interface{}{
        "input_path":      inputPath,
        "output_path":     outputPath,
        "output_format":   outputFormat,
        "file_size":       "2.5MB",
        "conversion_time": "1.2s",
        "status":          "success",
        "converted_at":    time.Now().Format(time.RFC3339),
    })

    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
    }

    return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
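Deriving the output path is easy to get subtly wrong. Trimming the final extension with strings.TrimSuffix, shown below as the hypothetical helper outputPathFor, handles two cases that strings.Replace with the extension as the search string does not: an input without an extension (filepath.Ext returns "", and replacing "" inserts at the start of the string) and a directory name containing the same extension.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// outputPathFor replaces the final extension of inputPath with format.
func outputPathFor(inputPath, format string) string {
	base := strings.TrimSuffix(inputPath, filepath.Ext(inputPath))
	return base + "." + format
}

func main() {
	fmt.Println(outputPathFor("/data/report.docx", "pdf"))       // /data/report.pdf
	fmt.Println(outputPathFor("/data/README", "txt"))            // /data/README.txt
	fmt.Println(outputPathFor("/data/v1.txt/notes.txt", "html")) // /data/v1.txt/notes.html

	// For comparison, strings.Replace on the extension mishandles both edge cases:
	fmt.Println(strings.Replace("/data/README", filepath.Ext("/data/README"), ".txt", 1)) // .txt/data/README
	fmt.Println(strings.Replace("/data/v1.txt/notes.txt", ".txt", ".html", 1))            // /data/v1.html/notes.txt
}
```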

Handler with External Service Integration

func handleStatusCheck(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
    params := task.GetParameters()
    serviceURL := params.Fields["service_url"].GetStringValue()

    if serviceURL == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "service_url parameter is required"
    }

    // Create HTTP client with timeout
    httpClient := &http.Client{
        Timeout: 10 * time.Second,
    }

    // Perform health check; binding the request to ctx aborts it if the task is cancelled
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, serviceURL+"/health", nil)
    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Invalid service URL: %v", err)
    }
    start := time.Now()
    resp, err := httpClient.Do(req)
    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Failed to reach service: %v", err)
    }
    defer resp.Body.Close()
    responseTime := time.Since(start)

    // Determine status
    isHealthy := resp.StatusCode >= 200 && resp.StatusCode < 300
    status := "unhealthy"
    if isHealthy {
        status = "healthy"
    }

    result, err := structpb.NewStruct(map[string]interface{}{
        "service_url":   serviceURL,
        "status":        status,
        "status_code":   resp.StatusCode,
        "response_time": responseTime.String(),
        "checked_at":    time.Now().Format(time.RFC3339),
    })

    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
    }

    return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}

Complete Agent Example

Here’s a complete agent that handles multiple task types:

package main

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "path/filepath"
    "strings"
    "syscall"
    "time"

    "github.com/owulveryck/agenthub/internal/agenthub"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/structpb"
)

const agentID = "multi_task_agent"

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create AgentHub client with observability
    config := agenthub.NewGRPCConfig("subscriber")
    config.HealthPort = "8082"

    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        panic("Failed to create AgentHub client: " + err.Error())
    }

    defer func() {
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer shutdownCancel()
        if err := client.Shutdown(shutdownCtx); err != nil {
            client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
        }
    }()

    if err := client.Start(ctx); err != nil {
        panic(err)
    }

    // Create and configure task subscriber
    taskSubscriber := agenthub.NewTaskSubscriber(client, agentID)

    // Register both default and custom handlers
    taskSubscriber.RegisterDefaultHandlers()
    setupCustomHandlers(taskSubscriber)

    // Graceful shutdown handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        <-sigChan
        client.Logger.Info("Received shutdown signal")
        cancel()
    }()

    client.Logger.InfoContext(ctx, "Starting multi-task agent")

    // Start subscriptions
    go func() {
        if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
            client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
        }
    }()

    go func() {
        if err := taskSubscriber.SubscribeToTaskResults(ctx); err != nil {
            client.Logger.ErrorContext(ctx, "Task result subscription failed", "error", err)
        }
    }()

    client.Logger.InfoContext(ctx, "Agent ready to process tasks",
        "supported_tasks", []string{"greeting", "math_calculation", "random_number", "data_processing", "file_conversion", "status_check"})

    <-ctx.Done()
    client.Logger.Info("Agent shutdown complete")
}

func setupCustomHandlers(taskSubscriber *agenthub.TaskSubscriber) {
    taskSubscriber.RegisterTaskHandler("data_processing", handleDataProcessing)
    taskSubscriber.RegisterTaskHandler("file_conversion", handleFileConversion)
    taskSubscriber.RegisterTaskHandler("status_check", handleStatusCheck)
}

// ... (include the handler functions from above)

Automatic Features

The unified abstraction provides the following features automatically:

Observability

  • Distributed tracing for each processed task
  • Metrics collection for processing times and success rates
  • Structured logging with correlation IDs

Task Management

  • Automatic result publishing back to the broker
  • Error handling and status reporting
  • Progress tracking capabilities

Resource Management

  • Graceful shutdown handling
  • Connection management to the broker
  • Health endpoints for monitoring

Best Practices

  1. Parameter Validation: Always validate task parameters before processing

    if requiredParam == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "required_param is missing"
    }
    
  2. Error Handling: Provide meaningful error messages

    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Processing failed: %v", err)
    }
    
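  3. Timeouts: Bound external operations with a deadline, on the context, the client, or both

    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()
    client := &http.Client{Timeout: 10 * time.Second}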
    
  4. Resource Cleanup: Always clean up resources in handlers

    defer file.Close()
    defer resp.Body.Close()
    
  5. Structured Results: Return well-structured result data

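    result, err := structpb.NewStruct(map[string]interface{}{
        "status":    "completed",
        "timestamp": time.Now().Format(time.RFC3339),
        "data":      processedData, // must be a structpb-compatible type (map[string]interface{}, []interface{}, string, number, bool)
    })
    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Failed to build result: %v", err)
    }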
    

Handler Function Signature

All task handlers must match the TaskHandler function type:

type TaskHandler func(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string)

Return values:

  • *structpb.Struct: The result data (can be nil on failure)
  • pb.TaskStatus: One of:
    • pb.TaskStatus_TASK_STATUS_COMPLETED
    • pb.TaskStatus_TASK_STATUS_FAILED
    • pb.TaskStatus_TASK_STATUS_CANCELLED
  • string: Error message (empty string on success)

Your agent is now ready to receive and process tasks from other agents in the system with full observability and automatic result publishing!

2 - Observability

Practical guides for monitoring and observability setup

Observability How-to Guides

Practical step-by-step guides for setting up monitoring, metrics, and observability in your AgentHub deployments.

Available Guides

2.1 - How to Add Observability to Your Agent

Use AgentHub’s unified abstractions to automatically get distributed tracing, metrics, and structured logging in your agents.

Goal-oriented guide: Use AgentHub’s unified abstractions to automatically get distributed tracing, metrics, and structured logging in your agents with minimal configuration.

Prerequisites

  • Go 1.24+ installed
  • Basic understanding of AgentHub concepts
  • 10-15 minutes

Overview: What You Get Automatically

With AgentHub’s unified abstractions, you automatically get:

✅ Distributed Tracing - OpenTelemetry traces with correlation IDs
✅ Comprehensive Metrics - Performance and health monitoring
✅ Structured Logging - JSON logs with trace correlation
✅ Health Endpoints - HTTP health checks and metrics endpoints
✅ Graceful Shutdown - Clean resource management

Quick Start: Observable Agent in 5 Minutes

Step 1: Create Your Agent Using Abstractions

package main

import (
	"context"
	"time"

	"github.com/owulveryck/agenthub/internal/agenthub"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Create configuration (observability included automatically)
	config := agenthub.NewGRPCConfig("my-agent")
	config.HealthPort = "8083" // Unique port for your agent

	// Create AgentHub client (observability built-in)
	client, err := agenthub.NewAgentHubClient(config)
	if err != nil {
		panic("Failed to create AgentHub client: " + err.Error())
	}

	// Automatic graceful shutdown
	defer func() {
		shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer shutdownCancel()
		if err := client.Shutdown(shutdownCtx); err != nil {
			client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
		}
	}()

	// Start the client (enables observability)
	if err := client.Start(ctx); err != nil {
		client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
		panic(err)
	}

	// Your agent logic here...
	client.Logger.Info("My observable agent is running!")

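	// Keep running until the context deadline (60s above) expires, so that
	// the deferred Shutdown actually runs; select {} would block forever
	<-ctx.Done()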
}

That’s it! Your agent now has full observability.

Step 2: Configure Environment Variables

Set observability configuration via environment:

# Tracing configuration
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export OTEL_SERVICE_NAME="my-agent"
export OTEL_SERVICE_VERSION="1.0.0"

# Health server port
export BROKER_HEALTH_PORT="8083"

# Broker connection
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"

Step 3: Run Your Observable Agent

go run main.go

Expected Output:

time=2025-09-29T10:00:00.000Z level=INFO msg="Starting health server" port=8083
time=2025-09-29T10:00:00.000Z level=INFO msg="AgentHub client connected" broker_addr=localhost:50051
time=2025-09-29T10:00:00.000Z level=INFO msg="My observable agent is running!"

Available Observability Features

Automatic Health Endpoints

Your agent automatically exposes:

  • Health Check: http://localhost:8083/health
  • Metrics: http://localhost:8083/metrics (Prometheus format)
  • Readiness: http://localhost:8083/ready

Structured Logging

All logs are automatically structured with trace correlation:

{
  "time": "2025-09-29T10:00:00.000Z",
  "level": "INFO",
  "msg": "Task published",
  "trace_id": "abc123...",
  "span_id": "def456...",
  "task_type": "process_document",
  "correlation_id": "req_789"
}

Distributed Tracing

Traces are automatically created for:

  • gRPC calls to broker
  • Task publishing and subscribing
  • Custom operations (when you use the TraceManager)

Metrics Collection

Automatic metrics include:

  • Task processing duration
  • Success/failure rates
  • gRPC call metrics
  • Health check status

Advanced Usage

Adding Custom Tracing

Use the built-in TraceManager for custom operations:

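// Requires the OpenTelemetry packages:
//   "go.opentelemetry.io/otel/attribute"
//   "go.opentelemetry.io/otel/codes"

// Custom operation with tracing (inside a function that returns error)
ctx, span := client.TraceManager.StartPublishSpan(ctx, "my_operation", "document")
defer span.End()

// Add custom attributes
client.TraceManager.AddComponentAttribute(span, "my-component")
span.SetAttributes(attribute.String("document.id", "doc-123"))

// Your operation logic (doCustomOperation is a placeholder)
result, err := doCustomOperation(ctx)
if err != nil {
    span.RecordError(err)
    span.SetStatus(codes.Error, err.Error())
    return err
}
client.Logger.InfoContext(ctx, "Custom operation succeeded", "result", result)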

Adding Custom Metrics

Use the MetricsManager for custom metrics:

// Start timing an operation
timer := client.MetricsManager.StartTimer()
defer timer(ctx, "my_operation", "my-component")

// Your operation
processDocument()

Custom Log Fields

Use the structured logger with context:

client.Logger.InfoContext(ctx, "Processing document",
    "document_id", "doc-123",
    "user_id", "user-456",
    "processing_type", "ocr",
)

Publisher Example with Observability

package main

import (
	"context"
	"time"

	"github.com/owulveryck/agenthub/internal/agenthub"
	pb "github.com/owulveryck/agenthub/events/a2a"
	"google.golang.org/protobuf/types/known/structpb"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Observable client setup
	config := agenthub.NewGRPCConfig("publisher")
	config.HealthPort = "8081"

	client, err := agenthub.NewAgentHubClient(config)
	if err != nil {
		panic(err)
	}
	defer client.Shutdown(context.Background())

	if err := client.Start(ctx); err != nil {
		panic(err)
	}

	// Create observable A2A task publisher
	publisher := &agenthub.A2ATaskPublisher{
		Client:         client.Client,
		TraceManager:   client.TraceManager,
		MetricsManager: client.MetricsManager,
		Logger:         client.Logger,
		ComponentName:  "publisher",
		AgentID:        "my_publisher_agent",
	}

	// Build A2A content with a structured data part
	data, err := structpb.NewStruct(map[string]interface{}{
		"message": "Hello, observable world!",
	})
	if err != nil {
		panic(err)
	}
	content := []*pb.Part{
		{Part: &pb.Part_Data{Data: &pb.DataPart{Data: data}}},
	}

	// Automatically traced and metered
	task, err := publisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
		TaskType:         "greeting",
		Content:          content,
		RequesterAgentID: "my_publisher_agent",
		ResponderAgentID: "my-subscriber",
		Priority:         pb.Priority_PRIORITY_MEDIUM,
	})
	if err != nil {
		client.Logger.ErrorContext(ctx, "Failed to publish task", "error", err)
		return
	}
	client.Logger.InfoContext(ctx, "Task published successfully", "task_id", task.GetId())
}

Subscriber Example with Observability

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"github.com/owulveryck/agenthub/internal/agenthub"
	pb "github.com/owulveryck/agenthub/events/a2a"
	"google.golang.org/protobuf/types/known/structpb"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Observable client setup
	config := agenthub.NewGRPCConfig("subscriber")
	config.HealthPort = "8082"

	client, err := agenthub.NewAgentHubClient(config)
	if err != nil {
		panic(err)
	}
	defer client.Shutdown(context.Background())

	if err := client.Start(ctx); err != nil {
		panic(err)
	}

	// Create observable task subscriber
	subscriber := agenthub.NewTaskSubscriber(client, "my-subscriber")

	// Register handler with automatic tracing
	subscriber.RegisterHandler("greeting", func(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
		// This is automatically traced and logged
		client.Logger.InfoContext(ctx, "Processing greeting task", "task_id", task.TaskId)

		// Your processing logic
		result, _ := structpb.NewStruct(map[string]interface{}{
			"response": "Hello back!",
		})

		return result, pb.TaskStatus_COMPLETED, ""
	})

	// Start processing with automatic observability
	go subscriber.StartProcessing(ctx)

	// Graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
}

Configuration Reference

📖 Complete Reference: For all environment variables and configuration options, see Environment Variables Reference

Key Environment Variables

| Variable | Description | Default |
|---|---|---|
| JAEGER_ENDPOINT | Jaeger tracing endpoint | "" (tracing disabled) |
| SERVICE_NAME | Service name for tracing | "agenthub-service" |
| SERVICE_VERSION | Service version | "1.0.0" |
| BROKER_HEALTH_PORT | Health endpoint port | "8080" |
| AGENTHUB_BROKER_ADDR | Broker address | "localhost" |
| AGENTHUB_BROKER_PORT | Broker port | "50051" |

Health Endpoints

Each agent exposes these endpoints:

| Endpoint | Purpose | Response |
|---|---|---|
| /health | Overall health status | JSON status |
| /metrics | Prometheus metrics | Metrics format |
| /ready | Readiness check | 200 OK or 503 |

Troubleshooting

Common Issues

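| Issue | Solution |
|---|---|
| No traces in Jaeger | Set the JAEGER_ENDPOINT environment variable (tracing is disabled when it is empty) |
| Health endpoint not accessible | Make sure each agent uses a unique BROKER_HEALTH_PORT |
| Logs not structured | Log through client.Logger instead of the standard log package |
| Missing correlation IDs | Pass the request context.Context through every operation and use the context-aware logging methods (for example InfoContext) |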

Verification Steps

  1. Check the health endpoint (replace 8083 with your agent's health port, for example 8081 for the publisher example above):

    curl http://localhost:8083/health
    
  2. Verify metrics:

    curl http://localhost:8083/metrics
    
  3. Check traces in Jaeger:

    • Open http://localhost:16686
    • Search for your service name

Migration from Manual Setup

If you have existing agents using manual observability setup:

Old Approach (Manual)

// 50+ lines of OpenTelemetry setup
obs, err := observability.NewObservability(config)
traceManager := observability.NewTraceManager(serviceName)
// Manual gRPC client setup
// Manual health server setup

New Approach (Unified)

// 3 lines - everything automatic (error handling omitted for brevity)
config := agenthub.NewGRPCConfig("my-agent")
client, err := agenthub.NewAgentHubClient(config)
client.Start(ctx)

The unified abstractions provide the same observability features with a few lines of setup instead of 50+, and without any manual OpenTelemetry, gRPC client, or health server wiring.


With AgentHub’s unified abstractions, observability is no longer an add-on feature but a built-in capability that comes automatically with every agent. Focus on your business logic while the platform handles monitoring, tracing, and health checks for you.

2.2 - How to Use Grafana Dashboards

Master the AgentHub observability dashboards to monitor, analyze, and troubleshoot your event-driven system effectively.

How to Use Grafana Dashboards

Prerequisites

  • AgentHub observability stack running (docker-compose up -d)
  • AgentHub agents running with observability enabled
  • Basic understanding of metrics concepts
  • About 10-15 minutes

Quick Access

  • Grafana Dashboard: http://localhost:3333 (admin/admin)
  • Direct Dashboard: http://localhost:3333/d/agenthub-eda-dashboard

Dashboard Overview

The AgentHub EDA System Observatory provides comprehensive monitoring across three main areas:

  1. Event Metrics (Top Row) - Event processing performance
  2. Distributed Tracing (Middle) - Request flow visualization
  3. System Health (Bottom Row) - Infrastructure monitoring

Panel-by-Panel Guide

🚀 Event Processing Rate (Top Left)

What it shows: Events processed per second by each service

How to use:

  • Monitor throughput: See how many events your system processes
  • Identify bottlenecks: Low rates may indicate performance issues
  • Compare services: See which agents are busiest

Reading the chart (illustrative values):

Green line: agenthub-broker (150 events/sec)
Blue line:  agenthub-publisher (50 events/sec)
Red line:   agenthub-subscriber (145 events/sec)

Troubleshooting:

  • Flat lines: No activity - check if agents are running
  • Dropping rates: Performance degradation - check CPU/memory
  • Spiky patterns: Bursty workloads - consider load balancing
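Rate panels like this one are commonly built from a monotonically increasing counter with PromQL's `rate()`. That function reports the per-second increase over a window and treats any drop in the counter as a process restart. The Go function below is a simplified model of that calculation, for illustration only. Prometheus also extrapolates to the window boundaries, so its numbers will differ slightly.

```go
package main

import "fmt"

// sample is one scrape of a counter: a timestamp in seconds and the counter value.
type sample struct {
	t     float64
	value float64
}

// perSecondRate returns the average per-second increase across samples,
// treating any decrease as a counter reset (for example an agent restart).
func perSecondRate(samples []sample) float64 {
	if len(samples) < 2 {
		return 0
	}
	var increase float64
	for i := 1; i < len(samples); i++ {
		delta := samples[i].value - samples[i-1].value
		if delta < 0 {
			// Counter reset: it restarted from zero, so the new value
			// is the increase since the reset.
			delta = samples[i].value
		}
		increase += delta
	}
	elapsed := samples[len(samples)-1].t - samples[0].t
	if elapsed <= 0 {
		return 0
	}
	return increase / elapsed
}

func main() {
	// 15-second scrapes; the agent restarts between t=30 and t=45.
	scrapes := []sample{{0, 1000}, {15, 3250}, {30, 5500}, {45, 2250}, {60, 4500}}
	fmt.Printf("%.1f events/sec\n", perSecondRate(scrapes)) // 150.0 events/sec
}
```

Without the reset handling, the restart would show up as a large negative increase and drag the reported rate down.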

🚨 Event Processing Error Rate (Top Right)

What it shows: Percentage of events that failed processing

How to use:

  • Monitor reliability: Should stay below 5% (green zone)
  • Alert threshold: Yellow above 5%, red above 10%
  • Quick health check: See system reliability at a single glance

Color coding:

  • Green (0-5%): Healthy system
  • Yellow (5-10%): Moderate issues
  • Red (>10%): Critical problems
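The color coding reduces to a ratio and two thresholds. This sketch computes the error percentage from hypothetical failed and total counts over the same window, in the same way as the `errors / processed * 100` query in the alerting section. It follows the "above 5%" and "above 10%" wording, so exactly 5% counts as green and exactly 10% as yellow. How the dashboard treats those exact boundaries is an assumption.

```go
package main

import "fmt"

// errorRatePercent returns failed/total as a percentage for one window.
// With no traffic it returns 0 instead of dividing by zero.
func errorRatePercent(failed, total float64) float64 {
	if total == 0 {
		return 0
	}
	return failed / total * 100
}

// zone maps an error percentage onto the dashboard colors:
// up to 5% green, above 5% yellow, above 10% red.
func zone(pct float64) string {
	switch {
	case pct > 10:
		return "red"
	case pct > 5:
		return "yellow"
	default:
		return "green"
	}
}

func main() {
	pct := errorRatePercent(12, 150)
	fmt.Printf("%.1f%% -> %s\n", pct, zone(pct)) // 8.0% -> yellow
}
```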

Troubleshooting:

  • High error rates: Check Jaeger for failing traces
  • Sudden spikes: Look for recent deployments or config changes
  • Persistent errors: Check logs for recurring issues

📈 Event Types Distribution (Middle Left)

What it shows: Breakdown of event types by volume

How to use:

  • Understand workload: See what types of tasks dominate
  • Capacity planning: Identify which task types need scaling
  • Anomaly detection: Unusual distributions may indicate issues

Example interpretation:

greeting: 40% (blue) - Most common task type
math_calculation: 35% (green) - Heavy computational tasks
random_number: 20% (yellow) - Quick tasks
unknown_task: 5% (red) - Error-generating tasks

Troubleshooting:

  • Missing task types: Check if specific agents are down
  • Unexpected distributions: May indicate upstream issues
  • Dominant error types: Focus optimization on the task types that produce the most errors

⏱️ Event Processing Latency (Middle Right)

What it shows: Processing time percentiles (p50, p95, p99)

How to use:

  • Performance monitoring: Track how fast events are processed
  • SLA compliance: Ensure latencies meet requirements
  • Outlier detection: p99 shows worst-case scenarios

Understanding percentiles:

  • p50 (median): 50% of events complete faster than this
  • p95: 95% of events complete faster than this
  • p99: 99% of events complete faster than this

Healthy ranges:

  • p50: < 50ms (very responsive)
  • p95: < 200ms (good performance)
  • p99: < 500ms (acceptable outliers)

Troubleshooting:

  • Rising latencies: Check CPU/memory usage
  • High p99: Look for resource contention or long-running tasks
  • Flatlined metrics: May indicate measurement issues

🔍 Distributed Traces (Middle Section)

What it shows: Integration with Jaeger for trace visualization

How to use:

  1. Click “Explore” to open Jaeger
  2. Select service from dropdown
  3. Find specific traces to debug issues
  4. Analyze request flows across services

When to use:

  • Debugging errors: Find root cause of failures
  • Performance analysis: Identify slow operations
  • Understanding flows: See complete request journeys

🖥️ Service CPU Usage (Bottom Left)

What it shows: CPU utilization by service

How to use:

  • Capacity monitoring: Ensure services aren’t overloaded
  • Resource planning: Identify when to scale
  • Performance correlation: High CPU often explains high latency

Healthy ranges:

  • < 50%: Comfortable utilization
  • 50-70%: Moderate load
  • > 70%: Consider scaling

💾 Service Memory Usage (Bottom Center)

What it shows: Memory consumption by service

How to use:

  • Memory leak detection: Watch for continuously growing usage
  • Capacity planning: Ensure sufficient memory allocation
  • Garbage collection: High heap usage means more GC work, which can add latency

Monitoring tips:

  • Steady growth: May indicate memory leaks
  • Sawtooth pattern: Normal GC behavior
  • Sudden spikes: Check for large event batches

🧵 Go Goroutines (Bottom Right)

What it shows: Number of concurrent goroutines per service

How to use:

  • Concurrency monitoring: Track parallel processing
  • Resource leak detection: Continuously growing numbers indicate leaks
  • Performance tuning: Optimize concurrency levels

Patterns to recognize:

  • Stable baseline: Normal operation
  • Activity spikes: During high load
  • Continuous growth: Potential goroutine leaks

🏥 Service Health Status (Bottom Far Right)

What it shows: Up/down status of each service

How to use:

  • Quick status check: See if all services are running
  • Outage detection: Immediately identify down services
  • Health monitoring: Green = UP, Red = DOWN

Dashboard Variables and Filters

Service Filter

Location: Top of dashboard
Purpose: Filter metrics by specific services
Usage:

  • Select “All” to see everything
  • Choose specific services to focus analysis
  • Useful for isolating problems to specific components

Event Type Filter

Location: Top of dashboard
Purpose: Filter by event/task types
Usage:

  • Analyze specific workflow types
  • Debug particular task categories
  • Compare performance across task types

Time Range Selector

Location: Top right of dashboard
Purpose: Control the time window for analysis
Common ranges:

  • 5 minutes: Real-time monitoring
  • 1 hour: Recent trend analysis
  • 24 hours: Daily pattern analysis
  • 7 days: Weekly trend and capacity planning

Advanced Usage Patterns

Performance Investigation Workflow

  1. Start with Overview:

    • Check error rates (should be < 5%)
    • Verify processing rates look normal
    • Scan for any red/yellow indicators
  2. Drill Down on Issues:

    • If high error rates → check distributed traces
    • If high latency → examine CPU/memory usage
    • If low throughput → check service health
  3. Root Cause Analysis:

    • Use time range selector to find when problems started
    • Filter by specific services to isolate issues
    • Correlate metrics across different panels

Capacity Planning Workflow

  1. Analyze Peak Patterns:

    • Set time range to 7 days
    • Identify peak usage periods
    • Note maximum throughput achieved
  2. Resource Utilization:

    • Check CPU usage during peaks
    • Monitor memory consumption trends
    • Verify goroutine scaling behavior
  3. Plan Scaling:

    • If CPU > 70% during peaks, scale up
    • If memory continuously growing, investigate leaks
    • If error rates spike during load, optimize before scaling

Troubleshooting Workflow

  1. Identify Symptoms:

    • High error rates: Focus on traces and logs
    • High latency: Check resource utilization
    • Low throughput: Verify service health
  2. Time Correlation:

    • Use time range to find when issues started
    • Look for correlated changes across metrics
    • Check for deployment or configuration changes
  3. Service Isolation:

    • Use service filter to identify problematic components
    • Compare healthy vs unhealthy services
    • Check inter-service dependencies

Dashboard Customization

Adding New Panels

  1. Click “+ Add panel” in top menu
  2. Choose visualization type:
    • Time series for trends
    • Stat for current values
    • Gauge for thresholds
  3. Configure query:
    # Example: Custom error rate
    rate(my_custom_errors_total[5m]) / rate(my_custom_requests_total[5m]) * 100
    

Creating Alerts

  1. Edit existing panel or create new one
  2. Click “Alert” tab
  3. Configure conditions:
    Query: rate(event_errors_total[5m]) / rate(events_processed_total[5m]) * 100
    Condition: IS ABOVE 5
    Evaluation: Every 1m for 2m
    
  4. Set notification channels (called contact points in Grafana's unified alerting)

Custom Time Ranges

  1. Click time picker (top right)
  2. Select “Custom range”
  3. Set specific dates/times for historical analysis
  4. Use “Refresh” settings for auto-updating

Troubleshooting Dashboard Issues

Dashboard Not Loading

# Check Grafana status
docker-compose ps grafana

# Check Grafana logs
docker-compose logs grafana

# Restart if needed
docker-compose restart grafana

No Data in Panels

# Check Prometheus connection
curl http://localhost:9090/api/v1/targets

# Verify agents are exposing metrics
curl http://localhost:8080/metrics
curl http://localhost:8081/metrics
curl http://localhost:8082/metrics

# Check Prometheus configuration
docker-compose logs prometheus

Slow Dashboard Performance

  1. Reduce time range: Use shorter windows for better performance
  2. Limit service selection: Filter to specific services
  3. Optimize queries: Use appropriate rate intervals
  4. Check resource usage: Ensure Prometheus has enough memory

Authentication Issues

  • Default credentials: admin/admin
  • Reset password: Through Grafana UI after first login
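  • Lost access: Reset the admin password with docker-compose exec grafana grafana-cli admin reset-admin-password <new-password>; restarting the container does not reset stored credentials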

Best Practices

Regular Monitoring

  • Check dashboard daily: Quick health overview
  • Weekly reviews: Trend analysis and capacity planning
  • Set up alerts: Proactive monitoring for critical metrics

Performance Optimization

  • Use appropriate time ranges: Don’t query more data than needed
  • Filter effectively: Use service and event type filters
  • Refresh intervals: Balance real-time needs with performance

Team Usage

  • Share dashboard URLs: Bookmark specific views
  • Create annotations: Mark deployments and incidents
  • Export snapshots: Share findings with team members

Integration with Other Tools

Jaeger Integration

  • Click Explore in traces panel
  • Auto-links to Jaeger with service context
  • Correlate traces with metrics timeframes

Prometheus Integration

  • Click Explore on any panel
  • Edit queries in Prometheus query language
  • Access raw metrics for custom analysis

Log Correlation

  • Use trace IDs from Jaeger
  • Search logs for matching trace IDs
  • Correlate log events with metric spikes

🎯 Next Steps:

Deep Debugging: Debug with Distributed Tracing

Production Setup: Configure Alerts

Understanding: Observability Architecture Explained

3 - Agent2Agent Protocol

Learn how to work with Agent2Agent (A2A) protocol components including messages, conversation contexts, artifacts, and task lifecycle management.

Agent2Agent Protocol How-To Guides

This section provides practical guides for working with the Agent2Agent (A2A) protocol in AgentHub. These guides show you how to implement A2A-compliant communication patterns for building robust agent systems.

Available Guides

Working with A2A Messages

Learn how to create, structure, and process A2A messages with text, data, and file content parts. This is the foundation for all A2A communication.

Working with A2A Conversation Context

Understand how to manage conversation contexts for multi-turn interactions, workflow coordination, and state preservation across agent communications.

Working with A2A Artifacts

Master the creation and handling of A2A artifacts - structured outputs that deliver rich results from completed tasks.

Working with A2A Task Lifecycle

Learn how to manage the complete task lifecycle from creation through completion, including state transitions, progress updates, and error handling.

A2A Protocol Benefits

The Agent2Agent protocol provides:

  • Structured Communication: Standardized message formats with rich content types
  • Conversation Threading: Context-aware message grouping for complex workflows
  • Rich Artifacts: Structured outputs with multiple content types
  • Lifecycle Management: Complete task state tracking from submission to completion
  • Interoperability: Standards-based communication for multi-vendor agent systems

Prerequisites

Before following these guides:

  1. Complete the Installation and Setup tutorial
  2. Run the AgentHub Demo to see A2A in action
  3. Understand the Agent2Agent Principle

Implementation Approach

These guides use AgentHub’s unified abstractions from internal/agenthub, which provide:

  • A2ATaskPublisher: Simplified A2A task creation and publishing
  • A2ATaskSubscriber: Streamlined A2A task processing and response generation
  • Automatic Observability: Built-in tracing, metrics, and logging
  • Environment Configuration: Zero-config setup with environment variables

Start with the A2A Messages guide to learn the fundamentals, then progress through the other guides to build complete A2A-compliant agent systems.

3.1 - How to Work with A2A Messages

Learn how to create, structure, and work with Agent2Agent protocol messages including text, data, and file parts.

How to Work with A2A Messages

This guide shows you how to create and work with Agent2Agent (A2A) protocol messages using AgentHub’s unified abstractions. A2A messages are the foundation of all agent communication.

Understanding A2A Message Structure

A2A messages consist of several key components:

  • Message ID: Unique identifier for the message
  • Context ID: Groups related messages in a conversation
  • Task ID: Links the message to a specific task
  • Role: Indicates if the message is from USER (requester) or AGENT (responder)
  • Content Parts: The actual message content (text, data, or files)
  • Metadata: Additional context for routing and processing

Creating Basic A2A Messages

Text Messages

Create a simple text message:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
)

func createTextMessage() *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: "conversation_greeting",
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Hello! Please process this greeting request.",
                },
            },
        },
        Metadata: nil, // Optional
    }
}

Data Messages

Include structured data in your message:

import (
    "google.golang.org/protobuf/types/known/structpb"
)

func createDataMessage() *pb.Message {
    // Create structured data
    data, err := structpb.NewStruct(map[string]interface{}{
        "operation": "calculate",
        "numbers":   []interface{}{10.0, 20.0, 30.0}, // structpb lists must be []interface{}
        "formula":   "sum",
        "precision": 2,
    })
    if err != nil {
        log.Fatal(err)
    }

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: "conversation_math",
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please perform the calculation described in the data.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        data,
                        Description: "Calculation parameters",
                    },
                },
            },
        },
    }
}

File Reference Messages

Reference files in your messages:

func createFileMessage() *pb.Message {
    // Create file metadata
    fileMetadata, _ := structpb.NewStruct(map[string]interface{}{
        "source":      "user_upload",
        "category":    "image",
        "permissions": "read-only",
    })

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: "conversation_image_analysis",
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please analyze the uploaded image.",
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:    "file_abc123",
                        Filename:  "analysis_target.jpg",
                        MimeType:  "image/jpeg",
                        SizeBytes: 2097152, // 2 MiB
                        Metadata:  fileMetadata,
                    },
                },
            },
        },
    }
}

Working with Mixed Content

Combine multiple part types in a single message:

func createMixedContentMessage() *pb.Message {
    // Configuration data
    config, _ := structpb.NewStruct(map[string]interface{}{
        "format":     "json",
        "output_dir": "/results",
        "compress":   true,
    })

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: "conversation_data_processing",
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Process the dataset with the following configuration and source file.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        config,
                        Description: "Processing configuration",
                    },
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:    "dataset_xyz789",
                        Filename:  "raw_data.csv",
                        MimeType:  "text/csv",
                        SizeBytes: 5242880, // 5 MiB
                    },
                },
            },
        },
    }
}

Publishing A2A Messages

Use AgentHub’s unified abstractions to publish messages:

package main

import (
    "context"
    "log"

    "github.com/owulveryck/agenthub/internal/agenthub"
    pb "github.com/owulveryck/agenthub/events/eventbus"
)

func publishA2AMessage(ctx context.Context) error {
    // Create AgentHub client
    config := agenthub.NewGRPCConfig("message_publisher")
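    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        return err
    }
    defer client.Shutdown(context.Background())

    // Start the client, as in the observability examples, before publishing
    if err := client.Start(ctx); err != nil {
        return err
    }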

    // Create A2A message
    message := createTextMessage()

    // Publish using AgentHub client
    response, err := client.Client.PublishMessage(ctx, &pb.PublishMessageRequest{
        Message: message,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: "message_publisher",
            ToAgentId:   "message_processor",
            EventType:   "a2a.message",
            Priority:    pb.Priority_PRIORITY_MEDIUM,
        },
    })

    if err != nil {
        return err
    }

    log.Printf("A2A message published: %s", response.GetEventId())
    return nil
}

Processing Received A2A Messages

Handle incoming A2A messages in your agent:

func processA2AMessage(ctx context.Context, message *pb.Message) (string, error) {
    var response string

    // Process each content part
    for i, part := range message.GetContent() {
        switch content := part.GetPart().(type) {
        case *pb.Part_Text:
            log.Printf("Text part %d: %s", i, content.Text)
            response += fmt.Sprintf("Processed text: %s\n", content.Text)

        case *pb.Part_Data:
            log.Printf("Data part %d: %s", i, content.Data.GetDescription())
            // Process structured data
            data := content.Data.GetData()
            response += fmt.Sprintf("Processed data: %s\n", content.Data.GetDescription())

            // Access specific fields
            if operation, ok := data.GetFields()["operation"]; ok {
                log.Printf("Operation: %s", operation.GetStringValue())
            }

        case *pb.Part_File:
            log.Printf("File part %d: %s (%s)", i, content.File.GetFilename(), content.File.GetMimeType())
            response += fmt.Sprintf("Processed file: %s\n", content.File.GetFilename())

            // Handle file processing based on MIME type
            switch content.File.GetMimeType() {
            case "image/jpeg", "image/png":
                // Process image
                response += "Image analysis completed\n"
            case "text/csv":
                // Process CSV data
                response += "CSV data parsed\n"
            }
        }
    }

    return response, nil
}

Message Role Management

Properly set message roles for A2A compliance:

// User message (requesting work)
func createUserMessage(content string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{Text: content},
            },
        },
    }
}

// Agent response message
func createAgentResponse(contextID, taskID, response string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        TaskId:    taskID,
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{Text: response},
            },
        },
    }
}

Message Validation

Validate A2A messages before publishing:

func validateA2AMessage(message *pb.Message) error {
    if message.GetMessageId() == "" {
        return fmt.Errorf("message_id is required")
    }

    if message.GetRole() == pb.Role_ROLE_UNSPECIFIED {
        return fmt.Errorf("role must be specified (USER or AGENT)")
    }

    if len(message.GetContent()) == 0 {
        return fmt.Errorf("message must have at least one content part")
    }

    // Validate each part
    for i, part := range message.GetContent() {
        if part.GetPart() == nil {
            return fmt.Errorf("content part %d is empty", i)
        }
    }

    return nil
}

Best Practices

1. Always Use Unique Message IDs

messageID := fmt.Sprintf("msg_%d_%s", time.Now().Unix(), uuid.New().String())
contextID := fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())
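If you would rather not add the `github.com/google/uuid` dependency, a random 128-bit identifier from `crypto/rand` gives the same practical uniqueness. The `newID` helper below is a hypothetical alternative, not something AgentHub provides. It keeps the `msg_`/`ctx_` prefixes used in this guide.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newID returns prefix + "_" + 32 hex characters (128 random bits).
func newID(prefix string) string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		// Not expected to happen in practice: it means the OS random source failed.
		panic(err)
	}
	return prefix + "_" + hex.EncodeToString(b[:])
}

func main() {
	fmt.Println(newID("msg"))
	fmt.Println(newID("ctx_data_analysis"))
}
```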

2. Include Descriptive Metadata for Complex Data

dataPart := &pb.DataPart{
    Data:        structData,
    Description: "User preferences for recommendation engine",
}

3. Validate Messages Before Publishing

if err := validateA2AMessage(message); err != nil {
    return fmt.Errorf("invalid A2A message: %w", err)
}

4. Handle All Part Types in Message Processors

switch content := part.GetPart().(type) {
case *pb.Part_Text:
    // Handle text
case *pb.Part_Data:
    // Handle structured data
case *pb.Part_File:
    // Handle file references
default:
    log.Printf("Unknown part type: %T", content)
}

This guide covered the fundamentals of working with A2A messages. Next, learn about A2A Conversation Context to group related messages and maintain conversation state across multiple interactions.

3.2 - How to Work with A2A Conversation Context

Learn how to manage conversation contexts in Agent2Agent protocol for multi-turn interactions and workflow coordination.

How to Work with A2A Conversation Context

This guide shows you how to use A2A conversation contexts to group related messages, maintain state across interactions, and coordinate multi-agent workflows.

Understanding A2A Conversation Context

A2A conversation context is identified by a context_id that groups related messages and tasks. This enables:

  • Multi-turn conversations between agents
  • Workflow coordination across multiple tasks
  • State preservation throughout long-running processes
  • Message threading for audit trails
  • Context-aware routing based on conversation history

Creating Conversation Contexts

Simple Conversation Context

Start a basic conversation context:

package main

import (
    "fmt"
    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
)

func createConversationContext(workflowType string) string {
    return fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())
}

func startConversation() *pb.Message {
    contextID := createConversationContext("user_onboarding")

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please start the user onboarding process for a new user.",
                },
            },
        },
    }
}
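Workflow names such as `user_onboarding` contain underscores. Code that needs the workflow type back out of a context ID built this way should therefore split on the last underscore, because the UUID suffix contains only hyphens. The `workflowFromContextID` helper below is a hypothetical illustration of that format, not part of AgentHub. Treat context IDs as opaque unless you control how they are built.

```go
package main

import (
	"fmt"
	"strings"
)

// workflowFromContextID extracts the workflow type from an ID built as
// "ctx_<workflowType>_<uuid>". The UUID has hyphens but no underscores, so the
// workflow is everything between the "ctx_" prefix and the last underscore.
func workflowFromContextID(id string) (string, bool) {
	rest, ok := strings.CutPrefix(id, "ctx_")
	if !ok {
		return "", false
	}
	i := strings.LastIndex(rest, "_")
	if i <= 0 {
		return "", false
	}
	return rest[:i], true
}

func main() {
	id := "ctx_user_onboarding_3f2b8c1e-9d4a-4e7b-8c6f-1a2b3c4d5e6f"
	wf, ok := workflowFromContextID(id)
	fmt.Println(wf, ok) // user_onboarding true
}
```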

Workflow-Specific Contexts

Create contexts for different workflow types:

func createWorkflowContexts() map[string]string {
    return map[string]string{
        "data_analysis":    createConversationContext("data_analysis"),
        "image_processing": createConversationContext("image_processing"),
        "user_support":     createConversationContext("user_support"),
        "integration_test": createConversationContext("integration_test"),
    }
}

Multi-Turn Conversations

Conversation Initiation

Start a conversation with initial context:

import (
    "google.golang.org/protobuf/types/known/structpb"
)

func initiateDataAnalysisConversation() *pb.Message {
    contextID := createConversationContext("data_analysis")

    // Initial conversation metadata
    contextMetadata, _ := structpb.NewStruct(map[string]interface{}{
        "workflow_type":   "data_analysis",
        "initiated_by":    "user_12345",
        "priority":        "high",
        "expected_steps":  []interface{}{"validation", "processing", "analysis", "report"}, // structpb lists must be []interface{}
        "timeout_minutes": 30,
    })

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please analyze the uploaded dataset and provide insights.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        contextMetadata,
                        Description: "Conversation context and workflow parameters",
                    },
                },
            },
        },
        Metadata: contextMetadata,
    }
}

Continuing the Conversation

Add follow-up messages to the same context:

func continueConversation(contextID, previousMessageID string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID, // Same context as initial message
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Can you also include trend analysis in the report?",
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "follows_message":   structpb.NewStringValue(previousMessageID),
                "conversation_turn": structpb.NewNumberValue(2),
            },
        },
    }
}

Agent Responses in Context

Agents respond within the same conversation context:

func createAgentResponse(contextID, requestMessageID, response string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID, // Same context as request
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: response,
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "responding_to": structpb.NewStringValue(requestMessageID),
                "agent_id":      structpb.NewStringValue("data_analysis_agent"),
            },
        },
    }
}

Context-Aware Task Management

Creating Tasks with Context

Link tasks to conversation contexts:

import (
    "google.golang.org/protobuf/types/known/timestamppb"
)

func createContextAwareTask(contextID string) *pb.Task {
    taskID := fmt.Sprintf("task_%s_%s", "analysis", uuid.New().String())

    return &pb.Task{
        Id:        taskID,
        ContextId: contextID, // Link to conversation
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: contextID,
                TaskId:    taskID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Task submitted for data analysis workflow",
                        },
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
        History: []*pb.Message{}, // Will be populated during processing
        Artifacts: []*pb.Artifact{}, // Will be populated on completion
    }
}

Context-Based Task Querying

Retrieve all tasks for a conversation context:

func getTasksForContext(ctx context.Context, client pb.AgentHubClient, contextID string) ([]*pb.Task, error) {
    response, err := client.ListTasks(ctx, &pb.ListTasksRequest{
        ContextId: contextID,
        Limit:     100,
    })
    if err != nil {
        return nil, err
    }

    return response.GetTasks(), nil
}
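`ListTasks` is called here with a fixed `Limit` of 100. If an agent keeps its own cache of the task events it has seen, a small index keyed by context ID can answer the same question locally. A minimal sketch, using a simplified hypothetical `TaskRecord` type in place of `pb.Task` and plain strings for states:

```go
package main

import "sort"

// TaskRecord is a simplified, hypothetical stand-in for pb.Task.
type TaskRecord struct {
    ID        string
    ContextID string
    State     string
}

// ContextIndex groups task records by context ID; the latest record for a task ID wins.
type ContextIndex struct {
    byContext map[string]map[string]TaskRecord
}

func NewContextIndex() *ContextIndex {
    return &ContextIndex{byContext: map[string]map[string]TaskRecord{}}
}

// Upsert stores or replaces the record for t.ID under t.ContextID.
func (ci *ContextIndex) Upsert(t TaskRecord) {
    tasks, ok := ci.byContext[t.ContextID]
    if !ok {
        tasks = map[string]TaskRecord{}
        ci.byContext[t.ContextID] = tasks
    }
    tasks[t.ID] = t
}

// TasksInState returns the IDs of tasks in contextID whose current state equals
// state, sorted so results are deterministic.
func (ci *ContextIndex) TasksInState(contextID, state string) []string {
    var ids []string
    for id, t := range ci.byContext[contextID] {
        if t.State == state {
            ids = append(ids, id)
        }
    }
    sort.Strings(ids)
    return ids
}
```

Feeding every received task update into `Upsert` keeps the index current, so a later update for the same task ID replaces its earlier state.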

Workflow Coordination

Multi-Agent Workflow with Shared Context

Coordinate multiple agents within a single conversation:

type WorkflowCoordinator struct {
    client    pb.AgentHubClient
    contextID string
    logger    *log.Logger
}

func (wc *WorkflowCoordinator) ExecuteDataPipeline(ctx context.Context) error {
    // Step 1: Data Validation
    validationTask := &pb.Task{
        Id:        fmt.Sprintf("task_validation_%s", uuid.New().String()),
        ContextId: wc.contextID,
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: wc.contextID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Validate uploaded dataset for quality and completeness",
                        },
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
    }

    // Publish validation task
    _, err := wc.client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
        Task: validationTask,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: "workflow_coordinator",
            ToAgentId:   "data_validator",
            EventType:   "task.validation",
            Priority:    pb.Priority_PRIORITY_HIGH,
        },
    })
    if err != nil {
        return err
    }

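    // Step 2: Data Processing. This task is published right away; the
    // depends_on metadata tells the processor to wait until validation completes.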
    processingTask := &pb.Task{
        Id:        fmt.Sprintf("task_processing_%s", uuid.New().String()),
        ContextId: wc.contextID, // Same context
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: wc.contextID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Process validated dataset and extract features",
                        },
                    },
                },
                Metadata: &structpb.Struct{
                    Fields: map[string]*structpb.Value{
                        "depends_on": structpb.NewStringValue(validationTask.GetId()),
                        "workflow_step": structpb.NewNumberValue(2),
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
    }

    // Publish processing task
    _, err = wc.client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
        Task: processingTask,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: "workflow_coordinator",
            ToAgentId:   "data_processor",
            EventType:   "task.processing",
            Priority:    pb.Priority_PRIORITY_MEDIUM,
        },
    })

    return err
}
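`ExecuteDataPipeline` publishes both tasks immediately and records the ordering only in the `depends_on` metadata, so the receiving agents must enforce it. If the coordinator should instead dispatch each step only after its prerequisites are done, it first needs an order that respects the dependencies. A minimal sketch using Kahn's topological sort over step names. `OrderSteps` and the step map are hypothetical:

```go
package main

import (
    "fmt"
    "sort"
)

// OrderSteps returns the workflow steps in an order where every step comes after
// the steps it depends on (Kahn's algorithm). deps maps a step to its
// prerequisites. It returns an error if the dependencies contain a cycle.
func OrderSteps(deps map[string][]string) ([]string, error) {
    indegree := map[string]int{}
    dependents := map[string][]string{}
    for step, pre := range deps {
        if _, ok := indegree[step]; !ok {
            indegree[step] = 0
        }
        for _, p := range pre {
            if _, ok := indegree[p]; !ok {
                indegree[p] = 0
            }
            indegree[step]++
            dependents[p] = append(dependents[p], step)
        }
    }

    // Start with steps that have no prerequisites, sorted for a deterministic order.
    var ready []string
    for step, n := range indegree {
        if n == 0 {
            ready = append(ready, step)
        }
    }
    sort.Strings(ready)

    var order []string
    for len(ready) > 0 {
        step := ready[0]
        ready = ready[1:]
        order = append(order, step)
        next := dependents[step]
        sort.Strings(next)
        for _, d := range next {
            indegree[d]--
            if indegree[d] == 0 {
                ready = append(ready, d)
            }
        }
    }
    // Any step never reaching indegree 0 is part of a cycle.
    if len(order) != len(indegree) {
        return nil, fmt.Errorf("workflow dependencies contain a cycle")
    }
    return order, nil
}
```

A coordinator could walk the returned slice, publishing the next task only after it observes `TASK_STATE_COMPLETED` for the previous one.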

Context State Management

Tracking Conversation State

Maintain state throughout the conversation:

type ConversationState struct {
    ContextID      string                 `json:"context_id"`
    WorkflowType   string                 `json:"workflow_type"`
    CurrentStep    int                    `json:"current_step"`
    TotalSteps     int                    `json:"total_steps"`
    CompletedTasks []string               `json:"completed_tasks"`
    PendingTasks   []string               `json:"pending_tasks"`
    Variables      map[string]interface{} `json:"variables"`
    CreatedAt      time.Time              `json:"created_at"`
    UpdatedAt      time.Time              `json:"updated_at"`
}

// toInterfaceSlice converts []string to []interface{}, the only slice type
// structpb.NewStruct accepts.
func toInterfaceSlice(items []string) []interface{} {
    out := make([]interface{}, len(items))
    for i, s := range items {
        out[i] = s
    }
    return out
}

func (cs *ConversationState) ToMetadata() (*structpb.Struct, error) {
    data := map[string]interface{}{
        "context_id":      cs.ContextID,
        "workflow_type":   cs.WorkflowType,
        "current_step":    cs.CurrentStep,
        "total_steps":     cs.TotalSteps,
        "completed_tasks": toInterfaceSlice(cs.CompletedTasks),
        "pending_tasks":   toInterfaceSlice(cs.PendingTasks),
        "variables":       cs.Variables, // values must also be structpb-compatible types
        "updated_at":      cs.UpdatedAt.Format(time.RFC3339),
    }

    return structpb.NewStruct(data)
}

func (cs *ConversationState) UpdateFromMessage(message *pb.Message) {
    cs.UpdatedAt = time.Now()

    // Extract state updates from message metadata
    if metadata := message.GetMetadata(); metadata != nil {
        if step, ok := metadata.GetFields()["current_step"]; ok {
            cs.CurrentStep = int(step.GetNumberValue())
        }

        if vars, ok := metadata.GetFields()["variables"]; ok {
            if varsStruct := vars.GetStructValue(); varsStruct != nil {
                // Writing to a nil map panics, so initialize it on first use.
                if cs.Variables == nil {
                    cs.Variables = make(map[string]interface{})
                }
                for key, value := range varsStruct.GetFields() {
                    // Store plain Go values rather than *structpb.Value so that
                    // ToMetadata can convert them back with structpb.NewStruct.
                    cs.Variables[key] = value.AsInterface()
                }
            }
        }
    }
}
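`structpb.NewStruct` accepts only a fixed set of Go types: `[]interface{}` and `map[string]interface{}` are fine, but a `[]string` or a `*structpb.Value` stored in a plain map is rejected with an error. One general way to produce compatible values, sketched here under the assumption that your state is JSON-serializable, is a JSON round-trip through the struct tags. `encoding/json` decodes into only maps, `[]interface{}`, strings, `float64`, booleans, and `nil`, all of which `structpb` accepts. `toGenericMap` and `stateSnapshot` are hypothetical:

```go
package main

import (
    "encoding/json"
    "time"
)

// toGenericMap converts any JSON-serializable value (such as a struct with json
// tags) into a map containing only types that structpb.NewStruct accepts.
func toGenericMap(v interface{}) (map[string]interface{}, error) {
    raw, err := json.Marshal(v)
    if err != nil {
        return nil, err
    }
    var out map[string]interface{}
    if err := json.Unmarshal(raw, &out); err != nil {
        return nil, err
    }
    return out, nil
}

// stateSnapshot mirrors a few ConversationState fields for illustration.
type stateSnapshot struct {
    ContextID      string    `json:"context_id"`
    CurrentStep    int       `json:"current_step"`
    CompletedTasks []string  `json:"completed_tasks"`
    UpdatedAt      time.Time `json:"updated_at"`
}
```

The resulting map can be passed directly to `structpb.NewStruct`. The trade-offs are an extra encode/decode per call and the loss of integer types, since every number comes back as `float64`.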

State-Aware Message Creation

Include conversation state in messages:

func createStateAwareMessage(contextID string, state *ConversationState, content string) (*pb.Message, error) {
    // Fail loudly instead of silently publishing a message with nil metadata
    stateMetadata, err := state.ToMetadata()
    if err != nil {
        return nil, fmt.Errorf("failed to encode conversation state: %w", err)
    }

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: content,
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        stateMetadata,
                        Description: "Current conversation state",
                    },
                },
            },
        },
        Metadata: stateMetadata,
    }, nil
}

Context-Based Routing

Route Messages Based on Context

Use conversation context for intelligent routing:

func routeByContext(contextID string) *pb.AgentEventMetadata {
    // Determine routing based on context type
    var targetAgent string
    var eventType string

    if strings.Contains(contextID, "data_analysis") {
        targetAgent = "data_analysis_agent"
        eventType = "data.analysis"
    } else if strings.Contains(contextID, "image_processing") {
        targetAgent = "image_processor"
        eventType = "image.processing"
    } else if strings.Contains(contextID, "user_support") {
        targetAgent = "support_agent"
        eventType = "support.request"
    } else {
        targetAgent = "" // Broadcast to all agents
        eventType = "general.message"
    }

    return &pb.AgentEventMetadata{
        FromAgentId:   "context_router",
        ToAgentId:     targetAgent,
        EventType:     eventType,
        Subscriptions: []string{eventType},
        Priority:      pb.Priority_PRIORITY_MEDIUM,
    }
}
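The `if`/`else if` chain in `routeByContext` has to be edited for every new workflow type. A minimal sketch of the same decision expressed as an ordered rule table. `routeRule`, `contextRoutes`, and `resolveRoute` are hypothetical names; the first matching rule wins, and an empty target still means broadcast:

```go
package main

import "strings"

// routeRule maps a substring of the context ID to a target agent and event type.
type routeRule struct {
    match       string
    targetAgent string
    eventType   string
}

// contextRoutes is evaluated in order; the first rule whose match string
// appears in the context ID wins.
var contextRoutes = []routeRule{
    {"data_analysis", "data_analysis_agent", "data.analysis"},
    {"image_processing", "image_processor", "image.processing"},
    {"user_support", "support_agent", "support.request"},
}

// resolveRoute returns the target agent ("" means broadcast) and the event type.
func resolveRoute(contextID string) (targetAgent, eventType string) {
    for _, r := range contextRoutes {
        if strings.Contains(contextID, r.match) {
            return r.targetAgent, r.eventType
        }
    }
    return "", "general.message"
}
```

`routeByContext` could then build its `AgentEventMetadata` from `resolveRoute(contextID)`. Keeping the rules ordered matters when one match string is a substring of another.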

Subscribe to Context-Specific Events

Agents can subscribe to specific conversation contexts:

func subscribeToContextEvents(ctx context.Context, client pb.AgentHubClient, agentID, contextPattern string) error {
    stream, err := client.SubscribeToMessages(ctx, &pb.SubscribeToMessagesRequest{
        AgentId: agentID,
        ContextPattern: contextPattern, // e.g., "ctx_data_analysis_*"
    })
    if err != nil {
        return err
    }

    for {
        event, err := stream.Recv()
        if err != nil {
            return err
        }

        if message := event.GetMessage(); message != nil {
            log.Printf("Received context message: %s in context: %s",
                message.GetMessageId(), message.GetContextId())

            // Process message within context
            processContextMessage(ctx, message)
        }
    }
}
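This guide does not show how the broker evaluates `ContextPattern`. If you also want to apply a glob such as `ctx_data_analysis_*` on the client side (for example inside `processContextMessage`), Go's standard `path.Match` handles that syntax. A sketch with a hypothetical `matchesContext` helper; note that `*` in `path.Match` does not match `/`, which is fine for context IDs built as in this guide:

```go
package main

import "path"

// matchesContext reports whether contextID matches a glob pattern such as
// "ctx_data_analysis_*". A malformed pattern is treated as a non-match.
func matchesContext(pattern, contextID string) bool {
    ok, err := path.Match(pattern, contextID)
    return err == nil && ok
}
```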

Best Practices

1. Use Descriptive Context IDs

contextID := fmt.Sprintf("ctx_%s_%s_%s", workflowType, userID, uuid.New().String())

2. Reuse the Context ID Across a Workflow

// All messages in the same workflow should use the same context_id
message.ContextId = existingContextID

3. Include Context Metadata for State Tracking

contextMetadata := map[string]interface{}{
    "workflow_type":   "data_pipeline",
    "initiated_by":    userID,
    "current_step":    stepNumber,
    "total_steps":     totalSteps,
}

4. Use Context for Task Dependencies

taskMetadata := map[string]interface{}{
    "context_id":     contextID,
    "depends_on":     previousTaskID,
    "workflow_step":  stepNumber,
}

5. Handle Context Cleanup

// Set context expiration for long-running workflows
contextMetadata["expires_at"] = time.Now().Add(24 * time.Hour).Format(time.RFC3339)

This guide covered conversation context management in the A2A protocol. Next, learn about Working with A2A Artifacts to understand how to create and manage structured outputs from completed tasks.

3.3 - How to Work with A2A Artifacts

Learn how to create, structure, and deliver Agent2Agent protocol artifacts as structured outputs from completed tasks.

How to Work with A2A Artifacts

This guide shows you how to create and work with Agent2Agent (A2A) protocol artifacts, which are structured outputs delivered when tasks are completed. Artifacts provide rich, typed results that can include text reports, data files, structured data, and more.

Understanding A2A Artifacts

A2A artifacts are structured containers for task outputs that include:

  • Artifact ID: Unique identifier for the artifact
  • Name: Human-readable name for the artifact
  • Description: Explanation of what the artifact contains
  • Parts: The actual content (text, data, files)
  • Metadata: Additional context about the artifact

Artifacts are typically generated when tasks reach TASK_STATE_COMPLETED status.

Creating Basic Artifacts

Text Report Artifacts

Create simple text-based results:

package main

import (
    "fmt"
    "time"

    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/structpb"
)

func createTextReportArtifact(taskID, reportContent string) *pb.Artifact {
    return &pb.Artifact{
        ArtifactId:  fmt.Sprintf("artifact_%s_%s", taskID, uuid.New().String()),
        Name:        "Analysis Report",
        Description: "Detailed analysis results and recommendations",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: reportContent,
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "artifact_type": structpb.NewStringValue("report"),
                "format":        structpb.NewStringValue("text"),
                "task_id":       structpb.NewStringValue(taskID),
                "generated_at":  structpb.NewStringValue(time.Now().Format(time.RFC3339)),
            },
        },
    }
}

Data Analysis Artifacts

Create artifacts with structured analysis results:

func createDataAnalysisArtifact(taskID string, results map[string]interface{}) *pb.Artifact {
    // Convert results to structured data
    resultsData, err := structpb.NewStruct(results)
    if err != nil {
        log.Printf("Error creating results data: %v", err)
        resultsData = &structpb.Struct{}
    }

    // Create summary statistics
    summary := map[string]interface{}{
        "total_records":   results["record_count"],
        "processing_time": results["duration_ms"],
        "success_rate":    results["success_percentage"],
        "anomalies_found": results["anomaly_count"],
    }
    summaryData, err := structpb.NewStruct(summary)
    if err != nil {
        log.Printf("Error creating summary data: %v", err)
        summaryData = &structpb.Struct{}
    }

    // Build metadata defensively: a bare type assertion such as
    // results["source"].(string) panics if the key is missing or holds another type.
    metadataFields := map[string]*structpb.Value{
        "artifact_type": structpb.NewStringValue("analysis"),
        "analysis_type": structpb.NewStringValue("statistical"),
    }
    if source, ok := results["source"].(string); ok {
        metadataFields["data_source"] = structpb.NewStringValue(source)
    }
    if count, ok := results["record_count"].(float64); ok {
        metadataFields["record_count"] = structpb.NewNumberValue(count)
    }
    if duration, ok := results["duration_ms"].(float64); ok {
        metadataFields["processing_time"] = structpb.NewNumberValue(duration)
    }

    return &pb.Artifact{
        ArtifactId:  fmt.Sprintf("artifact_analysis_%s", uuid.New().String()),
        Name:        "Data Analysis Results",
        Description: "Complete analysis results with statistics and insights",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Data analysis completed successfully. See attached results for detailed findings.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        resultsData,
                        Description: "Complete analysis results",
                    },
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        summaryData,
                        Description: "Summary statistics",
                    },
                },
            },
        },
        Metadata: &structpb.Struct{Fields: metadataFields},
    }
}
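Values in a `map[string]interface{}` can arrive as `int`, `int64`, or `float64` depending on how the map was built (JSON decoding yields `float64`, while Go literals usually yield `int`), so an assertion to a single numeric type is fragile. A minimal sketch of a tolerant accessor. `numberField` is a hypothetical helper covering the common numeric types:

```go
package main

// numberField returns m[key] as a float64 if it holds a common numeric type,
// and ok=false if the key is missing or holds a non-numeric value.
func numberField(m map[string]interface{}, key string) (float64, bool) {
    switch v := m[key].(type) {
    case float64:
        return v, true
    case float32:
        return float64(v), true
    case int:
        return float64(v), true
    case int32:
        return float64(v), true
    case int64:
        return float64(v), true
    default:
        return 0, false
    }
}
```

With it, `if n, ok := numberField(results, "record_count"); ok { ... structpb.NewNumberValue(n) ... }` records the count whether the producer stored it as an `int` or a `float64`.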

File-Based Artifacts

Create artifacts that reference generated files:

func createFileArtifact(taskID, fileID, filename, mimeType string, sizeBytes int64) *pb.Artifact {
    // File metadata
    fileMetadata, _ := structpb.NewStruct(map[string]interface{}{
        "generated_by":   "data_processor_v1.2",
        "file_version":   "1.0",
        "encoding":       "utf-8",
        "compression":    "gzip",
        "checksum_sha256": "abc123...", // Calculate actual checksum
    })

    return &pb.Artifact{
        ArtifactId:  fmt.Sprintf("artifact_file_%s", uuid.New().String()),
        Name:        "Processed Dataset",
        Description: "Cleaned and processed dataset ready for analysis",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Dataset processing completed. Generated file: %s", filename),
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:    fileID,
                        Filename:  filename,
                        MimeType:  mimeType,
                        SizeBytes: sizeBytes,
                        Metadata:  fileMetadata,
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "artifact_type":    structpb.NewStringValue("file"),
                "file_type":        structpb.NewStringValue("dataset"),
                "original_task":    structpb.NewStringValue(taskID),
                "processing_stage": structpb.NewStringValue("cleaned"),
            },
        },
    }
}

Complex Multi-Part Artifacts

Complete Analysis Package

Create comprehensive artifacts with multiple content types:

func createCompleteAnalysisArtifact(taskID string, analysisResults map[string]interface{}) *pb.Artifact {
    // Executive summary
    summary := fmt.Sprintf(`
Analysis Complete: %s

Key Findings:
- Processed %v records
- Found %v anomalies
- Success rate: %v%%
- Processing time: %v ms

Recommendations:
%s
`,
        analysisResults["dataset_name"],
        analysisResults["record_count"],
        analysisResults["anomaly_count"],
        analysisResults["success_percentage"],
        analysisResults["duration_ms"],
        analysisResults["recommendations"],
    )

    // Detailed results data
    detailedResults, _ := structpb.NewStruct(analysisResults)

    // Configuration used
    configData, _ := structpb.NewStruct(map[string]interface{}{
        "algorithm":         "statistical_analysis_v2",
        "confidence_level":  0.95,
        "outlier_threshold": 2.5,
        "normalization":     "z-score",
    })

    return &pb.Artifact{
        ArtifactId:  fmt.Sprintf("artifact_complete_%s", uuid.New().String()),
        Name:        "Complete Analysis Package",
        Description: "Full analysis results including summary, data, configuration, and generated files",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: summary,
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        detailedResults,
                        Description: "Detailed analysis results and metrics",
                    },
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        configData,
                        Description: "Analysis configuration parameters",
                    },
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:    "results_visualization_123",
                        Filename:  "analysis_charts.png",
                        MimeType:  "image/png",
                        SizeBytes: 1024000,
                    },
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:    "results_dataset_456",
                        Filename:  "processed_data.csv",
                        MimeType:  "text/csv",
                        SizeBytes: 5120000,
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "artifact_type":     structpb.NewStringValue("complete_package"),
                "analysis_type":     structpb.NewStringValue("comprehensive"),
                "includes_files":    structpb.NewBoolValue(true),
                "includes_data":     structpb.NewBoolValue(true),
                "includes_summary":  structpb.NewBoolValue(true),
                "file_count":        structpb.NewNumberValue(2),
                "total_size_bytes":  structpb.NewNumberValue(6144000),
            },
        },
    }
}
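`file_count` and `total_size_bytes` are hard-coded above and must be kept in sync with the file parts by hand. A minimal sketch of deriving them, using simplified hypothetical `part` and `filePart` types in place of `pb.Part` and `pb.FilePart`. With the real types, the equivalent check is a type switch on `*pb.Part_File`:

```go
package main

// part is a simplified, hypothetical stand-in for pb.Part: a part either
// carries text or references a file.
type part struct {
    Text string
    File *filePart
}

// filePart is a simplified, hypothetical stand-in for pb.FilePart.
type filePart struct {
    Filename  string
    SizeBytes int64
}

// fileStats counts the file parts and sums their sizes.
func fileStats(parts []part) (count int, totalBytes int64) {
    for _, p := range parts {
        if p.File != nil {
            count++
            totalBytes += p.File.SizeBytes
        }
    }
    return count, totalBytes
}
```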

Publishing Artifacts

Using A2A Task Completion

Publish artifacts when completing tasks:

import (
    "context"

    eventbus "github.com/owulveryck/agenthub/events/eventbus"
    "github.com/owulveryck/agenthub/internal/agenthub"
    "google.golang.org/protobuf/types/known/timestamppb"
)

func completeTaskWithArtifact(ctx context.Context, client eventbus.AgentHubClient, task *pb.Task, artifact *pb.Artifact) error {
    // Update task status to completed
    task.Status = &pb.TaskStatus{
        State: pb.TaskState_TASK_STATE_COMPLETED,
        Update: &pb.Message{
            MessageId: fmt.Sprintf("msg_completion_%s", uuid.New().String()),
            ContextId: task.GetContextId(),
            TaskId:    task.GetId(),
            Role:      pb.Role_AGENT,
            Content: []*pb.Part{
                {
                    Part: &pb.Part_Text{
                        Text: "Task completed successfully. Artifact has been generated.",
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    }

    // Add artifact to task
    task.Artifacts = append(task.Artifacts, artifact)

    // Publish task completion
    _, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
        Task: task,
        Routing: &eventbus.AgentEventMetadata{
            FromAgentId: "processing_agent",
            ToAgentId:   "", // Broadcast completion
            EventType:   "task.completed",
            Priority:    eventbus.Priority_PRIORITY_MEDIUM,
        },
    })

    if err != nil {
        return fmt.Errorf("failed to publish task completion: %w", err)
    }

    // Separately publish artifact update
    return publishArtifactUpdate(ctx, client, task.GetId(), artifact)
}

func publishArtifactUpdate(ctx context.Context, client eventbus.AgentHubClient, taskID string, artifact *pb.Artifact) error {
    _, err := client.PublishTaskArtifact(ctx, &eventbus.PublishTaskArtifactRequest{
        TaskId:   taskID,
        Artifact: artifact,
        Routing: &eventbus.AgentEventMetadata{
            FromAgentId: "processing_agent",
            ToAgentId:   "", // Broadcast to interested parties
            EventType:   "artifact.created",
            Priority:    eventbus.Priority_PRIORITY_LOW,
        },
    })

    return err
}

Using A2A Abstractions

Use AgentHub’s simplified artifact publishing:

func completeTaskWithA2AArtifact(ctx context.Context, subscriber *agenthub.A2ATaskSubscriber, task *pb.Task, artifact *pb.Artifact) error {
    return subscriber.CompleteA2ATaskWithArtifact(ctx, task, artifact)
}

Processing Received Artifacts

Artifact Event Handling

Handle incoming artifact notifications:

func handleArtifactEvents(ctx context.Context, client eventbus.AgentHubClient, agentID string) error {
    stream, err := client.SubscribeToAgentEvents(ctx, &eventbus.SubscribeToAgentEventsRequest{
        AgentId: agentID,
        EventTypes: []string{"artifact.created", "task.completed"},
    })
    if err != nil {
        return err
    }

    for {
        event, err := stream.Recv()
        if err != nil {
            return err
        }

        switch payload := event.GetPayload().(type) {
        case *eventbus.AgentEvent_ArtifactUpdate:
            artifactEvent := payload.ArtifactUpdate
            log.Printf("Received artifact: %s for task: %s",
                artifactEvent.GetArtifact().GetArtifactId(),
                artifactEvent.GetTaskId())

            // Process the artifact
            err := processArtifact(ctx, artifactEvent.GetArtifact())
            if err != nil {
                log.Printf("Error processing artifact: %v", err)
            }

        case *eventbus.AgentEvent_Task:
            task := payload.Task
            if task.GetStatus().GetState() == pb.TaskState_TASK_STATE_COMPLETED {
                // Process completed task artifacts
                for _, artifact := range task.GetArtifacts() {
                    err := processArtifact(ctx, artifact)
                    if err != nil {
                        log.Printf("Error processing task artifact: %v", err)
                    }
                }
            }
        }
    }
}
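If a publisher sends both a `task.completed` event carrying the artifact and a separate `artifact.created` event for it (as `completeTaskWithArtifact` does), the handler above processes the same artifact twice. A minimal sketch of guarding `processArtifact` with a set of seen artifact IDs. The `seenArtifacts` type is hypothetical and in-memory only, so it does not survive restarts:

```go
package main

import "sync"

// seenArtifacts remembers which artifact IDs have already been processed.
type seenArtifacts struct {
    mu  sync.Mutex
    ids map[string]struct{}
}

func newSeenArtifacts() *seenArtifacts {
    return &seenArtifacts{ids: make(map[string]struct{})}
}

// FirstTime records id and reports whether it had not been seen before.
func (s *seenArtifacts) FirstTime(id string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, dup := s.ids[id]; dup {
        return false
    }
    s.ids[id] = struct{}{}
    return true
}
```

Each `case` in the handler would then call `processArtifact` only when `seen.FirstTime(artifact.GetArtifactId())` returns true. Deduplicating across restarts or multiple consumer replicas would require keeping the set in shared storage.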

Artifact Content Processing

Process different types of artifact content:

func processArtifact(ctx context.Context, artifact *pb.Artifact) error {
    log.Printf("Processing artifact: %s - %s", artifact.GetName(), artifact.GetDescription())

    for i, part := range artifact.GetParts() {
        switch content := part.GetPart().(type) {
        case *pb.Part_Text:
            log.Printf("Text part %d: Processing text content (%d chars)", i, len(content.Text))
            // Process text content
            err := processTextArtifact(content.Text)
            if err != nil {
                return fmt.Errorf("failed to process text part: %w", err)
            }

        case *pb.Part_Data:
            log.Printf("Data part %d: Processing structured data (%s)", i, content.Data.GetDescription())
            // Process structured data
            err := processDataArtifact(content.Data.GetData())
            if err != nil {
                return fmt.Errorf("failed to process data part: %w", err)
            }

        case *pb.Part_File:
            log.Printf("File part %d: Processing file %s (%s, %d bytes)",
                i, content.File.GetFilename(), content.File.GetMimeType(), content.File.GetSizeBytes())
            // Process file reference
            err := processFileArtifact(ctx, content.File)
            if err != nil {
                return fmt.Errorf("failed to process file part: %w", err)
            }
        }
    }

    return nil
}

func processTextArtifact(text string) error {
    // Extract insights, save to database, etc.
    log.Printf("Extracting insights from text artifact...")
    return nil
}

func processDataArtifact(data *structpb.Struct) error {
    // Parse structured data, update metrics, etc.
    log.Printf("Processing structured data artifact...")

    // Access specific fields
    if recordCount, ok := data.GetFields()["record_count"]; ok {
        log.Printf("Records processed: %v", recordCount.GetNumberValue())
    }

    return nil
}

func processFileArtifact(ctx context.Context, file *pb.FilePart) error {
    // Download file, process content, etc.
    log.Printf("Processing file artifact: %s", file.GetFileId())

    // Handle different file types
    switch file.GetMimeType() {
    case "text/csv":
        return processCSVFile(ctx, file.GetFileId())
    case "image/png", "image/jpeg":
        return processImageFile(ctx, file.GetFileId())
    case "application/json":
        return processJSONFile(ctx, file.GetFileId())
    default:
        log.Printf("Unknown file type: %s", file.GetMimeType())
    }

    return nil
}
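The exact-string `switch` in `processFileArtifact` misses MIME types that carry parameters or different casing, such as `text/CSV; charset=utf-8`. A minimal sketch that normalizes the type with the standard library's `mime.ParseMediaType` before looking up a handler. `fileHandler`, `dispatchFile`, and the handler map are hypothetical, and unlike the original this version returns an error for unknown types instead of only logging:

```go
package main

import (
    "fmt"
    "mime"
)

// fileHandler processes a file identified by its ID.
type fileHandler func(fileID string) error

// dispatchFile normalizes mimeType (lowercases it and drops parameters such as
// charset) and calls the matching handler.
func dispatchFile(handlers map[string]fileHandler, mimeType, fileID string) error {
    mediaType, _, err := mime.ParseMediaType(mimeType)
    if err != nil {
        return fmt.Errorf("invalid MIME type %q: %w", mimeType, err)
    }
    h, ok := handlers[mediaType]
    if !ok {
        return fmt.Errorf("no handler for MIME type %q", mediaType)
    }
    return h(fileID)
}
```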

Artifact Chaining

Using Artifacts as Inputs

Use artifacts from one task as inputs to another:

func chainArtifactProcessing(ctx context.Context, client eventbus.AgentHubClient, inputArtifact *pb.Artifact) error {
    // Create a new task using the artifact as input
    contextID := fmt.Sprintf("ctx_chained_%s", uuid.New().String())

    chainedTask := &pb.Task{
        Id:        fmt.Sprintf("task_chained_%s", uuid.New().String()),
        ContextId: contextID,
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: contextID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Please process the results from the previous analysis task.",
                        },
                    },
                    {
                        Part: &pb.Part_Data{
                            Data: &pb.DataPart{
                                Data: &structpb.Struct{
                                    Fields: map[string]*structpb.Value{
                                        "input_artifact_id": structpb.NewStringValue(inputArtifact.GetArtifactId()),
                                        "processing_type":   structpb.NewStringValue("enhancement"),
                                    },
                                },
                                Description: "Processing parameters with input artifact reference",
                            },
                        },
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
    }

    // Publish the chained task
    _, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
        Task: chainedTask,
        Routing: &eventbus.AgentEventMetadata{
            FromAgentId: "workflow_coordinator",
            ToAgentId:   "enhancement_processor",
            EventType:   "task.chained",
            Priority:    eventbus.Priority_PRIORITY_MEDIUM,
        },
    })

    return err
}

Best Practices

1. Use Descriptive Artifact Names and Descriptions

artifact := &pb.Artifact{
    Name:        "Customer Segmentation Analysis Results",
    Description: "Complete customer segmentation with demographics, behavior patterns, and actionable insights",
    // ...
}

2. Include Rich Metadata for Discovery

metadata := map[string]interface{}{
    "artifact_type": "analysis",
    "domain":        "customer_analytics",
    "data_source":   "customer_transactions_2024",
    "algorithm":     "k_means_clustering",
    "confidence":    0.94,
    "generated_by":  "analytics_engine_v2.1",
    "valid_until":   time.Now().Add(30*24*time.Hour).Format(time.RFC3339),
}

3. Structure Multi-Part Artifacts Logically

// Order parts from most important to least important
parts := []*pb.Part{
    textSummaryPart,      // Human-readable summary first
    structuredDataPart,   // Machine-readable data second
    configurationPart,    // Configuration details third
    fileReferencePart,    // File references last
}

4. Validate Artifacts Before Publishing

func validateArtifact(artifact *pb.Artifact) error {
    if artifact.GetArtifactId() == "" {
        return fmt.Errorf("artifact_id is required")
    }
    if len(artifact.GetParts()) == 0 {
        return fmt.Errorf("artifact must have at least one part")
    }
    return nil
}
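`validateArtifact` stops at the first problem. A sketch of a variant that reports every problem in one error, using a simplified hypothetical `artifactView` type in place of `pb.Artifact`. Which fields to require (here ID, name, and at least one part) is an assumption to adapt:

```go
package main

import (
    "fmt"
    "strings"
)

// artifactView is a simplified, hypothetical stand-in for pb.Artifact.
type artifactView struct {
    ArtifactID string
    Name       string
    PartCount  int
}

// validateArtifactAll returns nil if the artifact looks publishable, or a single
// error listing every problem found.
func validateArtifactAll(a artifactView) error {
    var problems []string
    if strings.TrimSpace(a.ArtifactID) == "" {
        problems = append(problems, "artifact_id is required")
    }
    if strings.TrimSpace(a.Name) == "" {
        problems = append(problems, "name is required")
    }
    if a.PartCount == 0 {
        problems = append(problems, "at least one part is required")
    }
    if len(problems) > 0 {
        return fmt.Errorf("invalid artifact: %s", strings.Join(problems, "; "))
    }
    return nil
}
```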

5. Handle Large Artifacts Appropriately

// For large data, use file references instead of inline data
if len(dataBytes) > 1024*1024 { // 1MB threshold
    // Save to file storage and reference
    fileID := saveToFileStorage(dataBytes)
    part = createFileReferencePart(fileID, filename, mimeType)
} else {
    // Include data inline
    part = createInlineDataPart(data)
}

This guide covered creating and working with A2A artifacts. Next, learn about A2A Task Lifecycle Management to understand how to properly manage task states and coordinate complex workflows.

3.4 - How to Work with A2A Task Lifecycle

Learn how to manage Agent2Agent protocol task states, handle lifecycle transitions, and coordinate complex task workflows.

How to Work with A2A Task Lifecycle

This guide shows you how to manage the complete lifecycle of Agent2Agent (A2A) protocol tasks, from creation through completion. Understanding task states and transitions is essential for building reliable agent workflows.

Understanding A2A Task States

A2A tasks progress through the following states:

  • TASK_STATE_SUBMITTED: Task created and submitted for processing
  • TASK_STATE_WORKING: Task accepted and currently being processed
  • TASK_STATE_COMPLETED: Task finished successfully with results
  • TASK_STATE_FAILED: Task failed with error information
  • TASK_STATE_CANCELLED: Task cancelled before completion

Each state transition is recorded with a timestamp and status message.
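A minimal sketch of rejecting invalid transitions before publishing a status update. The transition table is an assumption derived from the state descriptions above, with COMPLETED, FAILED, and CANCELLED treated as terminal and SUBMITTED allowed to fail directly (for example, on invalid input). The string-typed `TaskState` stands in for the generated `pb.TaskState` enum:

```go
package main

import "fmt"

// TaskState stands in for the pb.TaskState enum in this sketch.
type TaskState string

const (
    Submitted TaskState = "TASK_STATE_SUBMITTED"
    Working   TaskState = "TASK_STATE_WORKING"
    Completed TaskState = "TASK_STATE_COMPLETED"
    Failed    TaskState = "TASK_STATE_FAILED"
    Cancelled TaskState = "TASK_STATE_CANCELLED"
)

// allowedTransitions lists the states each state may move to.
// Terminal states have no entries.
var allowedTransitions = map[TaskState][]TaskState{
    Submitted: {Working, Failed, Cancelled},
    Working:   {Completed, Failed, Cancelled},
}

// checkTransition returns an error if moving from one state to another is not allowed.
func checkTransition(from, to TaskState) error {
    for _, next := range allowedTransitions[from] {
        if next == to {
            return nil
        }
    }
    return fmt.Errorf("invalid task transition %s -> %s", from, to)
}
```

Calling `checkTransition(current, next)` before overwriting `task.Status` keeps a late or duplicated event from, say, moving a completed task back to WORKING.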

Creating A2A Tasks

Basic Task Creation

Create a new task with initial state:

package main

import (
    "fmt"
    "time"

    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/structpb"
    "google.golang.org/protobuf/types/known/timestamppb"
)

func createA2ATask(contextID, taskType string, content []*pb.Part) *pb.Task {
    taskID := fmt.Sprintf("task_%s_%s", taskType, uuid.New().String())
    messageID := fmt.Sprintf("msg_%s", uuid.New().String())

    return &pb.Task{
        Id:        taskID,
        ContextId: contextID,
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: messageID,
                ContextId: contextID,
                TaskId:    taskID,
                Role:      pb.Role_USER,
                Content:   content,
                Metadata: &structpb.Struct{
                    Fields: map[string]*structpb.Value{
                        "task_type":      structpb.NewStringValue(taskType),
                        "submitted_by":   structpb.NewStringValue("user_agent"),
                        "priority":       structpb.NewStringValue("medium"),
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
        History:   []*pb.Message{},
        Artifacts: []*pb.Artifact{},
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "task_type":    structpb.NewStringValue(taskType),
                "created_at":   structpb.NewStringValue(time.Now().Format(time.RFC3339)),
                "expected_duration": structpb.NewStringValue("5m"),
            },
        },
    }
}

Task with Complex Requirements

Create tasks with detailed specifications:

func createComplexAnalysisTask(contextID string) *pb.Task {
    // Task configuration. Nested lists and maps must use []interface{} and
    // map[string]interface{}: structpb.NewStruct rejects []string and map[string]string.
    // Errors are ignored for brevity because these literals only use supported types.
    taskConfig, _ := structpb.NewStruct(map[string]interface{}{
        "algorithm":           "advanced_ml_analysis",
        "confidence_level":    0.95,
        "max_processing_time": "30m",
        "output_formats":      []interface{}{"json", "csv", "visualization"},
        "quality_threshold":   0.9,
    })

    // Input data specification
    inputSpec, _ := structpb.NewStruct(map[string]interface{}{
        "dataset_id":      "customer_data_2024",
        "required_fields": []interface{}{"customer_id", "transaction_amount", "timestamp"},
        "date_range":      map[string]interface{}{"start": "2024-01-01", "end": "2024-12-31"},
        "preprocessing":   true,
    })

    content := []*pb.Part{
        {
            Part: &pb.Part_Text{
                Text: "Perform comprehensive customer behavior analysis on the specified dataset with advanced ML algorithms.",
            },
        },
        {
            Part: &pb.Part_Data{
                Data: &pb.DataPart{
                    Data:        taskConfig,
                    Description: "Analysis configuration parameters",
                },
            },
        },
        {
            Part: &pb.Part_Data{
                Data: &pb.DataPart{
                    Data:        inputSpec,
                    Description: "Input dataset specification",
                },
            },
        },
    }

    task := createA2ATask(contextID, "customer_analysis", content)

    // Replace the generic metadata set by createA2ATask with analysis-specific metadata
    task.Metadata = &structpb.Struct{
        Fields: map[string]*structpb.Value{
            "task_type":           structpb.NewStringValue("customer_analysis"),
            "complexity":          structpb.NewStringValue("high"),
            "estimated_duration":  structpb.NewStringValue("30m"),
            "required_resources":  structpb.NewListValue(&structpb.ListValue{
                Values: []*structpb.Value{
                    structpb.NewStringValue("gpu_compute"),
                    structpb.NewStringValue("large_memory"),
                },
            }),
            "deliverables":        structpb.NewListValue(&structpb.ListValue{
                Values: []*structpb.Value{
                    structpb.NewStringValue("analysis_report"),
                    structpb.NewStringValue("customer_segments"),
                    structpb.NewStringValue("predictions"),
                },
            }),
        },
    }

    return task
}

Task State Transitions

Accepting a Task (SUBMITTED → WORKING)

When an agent accepts a task:

func acceptTask(task *pb.Task, agentID string) *pb.Task {
    // Create acceptance message
    acceptanceMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_accept_%s", uuid.New().String()),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Task accepted by agent %s. Beginning processing.", agentID),
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "accepting_agent": structpb.NewStringValue(agentID),
                "estimated_completion": structpb.NewStringValue(
                    time.Now().Add(15*time.Minute).Format(time.RFC3339),
                ),
            },
        },
    }

    // Update task status
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_WORKING,
        Update:    acceptanceMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add to history
    task.History = append(task.History, acceptanceMessage)

    return task
}

Progress Updates (WORKING → WORKING)

Send progress updates during processing:

func sendProgressUpdate(task *pb.Task, progressPercentage int, currentPhase, details string) *pb.Task {
    // Create progress data
    progressData, _ := structpb.NewStruct(map[string]interface{}{
        "progress_percentage": progressPercentage,
        "current_phase":       currentPhase,
        "details":            details,
        "estimated_remaining": calculateRemainingTime(progressPercentage),
        "memory_usage_mb":     getCurrentMemoryUsage(),
        "cpu_usage_percent":   getCurrentCPUUsage(),
    })

    progressMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_progress_%s_%d", uuid.New().String(), progressPercentage),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Progress update: %d%% complete. Current phase: %s",
                        progressPercentage, currentPhase),
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        progressData,
                        Description: "Detailed progress information",
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "update_type":         structpb.NewStringValue("progress"),
                "progress_percentage": structpb.NewNumberValue(float64(progressPercentage)),
                "phase":              structpb.NewStringValue(currentPhase),
            },
        },
    }

    // Update task status (still WORKING, but with new message)
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_WORKING,
        Update:    progressMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add to history
    task.History = append(task.History, progressMessage)

    return task
}

func calculateRemainingTime(progressPercentage int) string {
    if progressPercentage <= 0 {
        return "unknown"
    }
    // Simplified estimate: assumes every task takes a fixed 15 minutes in total
    remainingMinutes := (100 - progressPercentage) * 15 / 100
    return fmt.Sprintf("%dm", remainingMinutes)
}

Completing a Task (WORKING → COMPLETED)

Complete a task with results:

func completeTask(task *pb.Task, results string, artifacts []*pb.Artifact) *pb.Task {
    // Create completion message
    completionMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_complete_%s", uuid.New().String()),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Task completed successfully. %s", results),
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "completion_status": structpb.NewStringValue("success"),
                "processing_time":   structpb.NewStringValue(
                    time.Since(getTaskStartTime(task)).String(),
                ),
                "artifact_count":    structpb.NewNumberValue(float64(len(artifacts))),
            },
        },
    }

    // Update task status
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_COMPLETED,
        Update:    completionMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add completion message to history
    task.History = append(task.History, completionMessage)

    // Add artifacts
    task.Artifacts = append(task.Artifacts, artifacts...)

    return task
}

Handling Task Failures (WORKING → FAILED)

Handle task failures with detailed error information:

func failTask(task *pb.Task, errorMessage, errorCode string, errorDetails map[string]interface{}) *pb.Task {
    // Create error data
    errorData, _ := structpb.NewStruct(map[string]interface{}{
        "error_code":    errorCode,
        "error_message": errorMessage,
        "error_details": errorDetails,
        "failure_phase": getCurrentProcessingPhase(task),
        "retry_possible": determineRetryPossibility(errorCode),
        "diagnostic_info": map[string]interface{}{
            "memory_at_failure": getCurrentMemoryUsage(),
            "cpu_at_failure":   getCurrentCPUUsage(),
            "logs_reference":   getLogReference(),
        },
    })

    failureMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_failure_%s", uuid.New().String()),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Task failed: %s (Code: %s)", errorMessage, errorCode),
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        errorData,
                        Description: "Detailed error information and diagnostics",
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "failure_type":  structpb.NewStringValue("processing_error"),
                "error_code":    structpb.NewStringValue(errorCode),
                "retry_possible": structpb.NewBoolValue(determineRetryPossibility(errorCode)),
            },
        },
    }

    // Update task status
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_FAILED,
        Update:    failureMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add failure message to history
    task.History = append(task.History, failureMessage)

    return task
}

func determineRetryPossibility(errorCode string) bool {
    // Determine if the error is retryable
    retryableErrors := []string{
        "TEMPORARY_RESOURCE_UNAVAILABLE",
        "NETWORK_TIMEOUT",
        "RATE_LIMIT_EXCEEDED",
    }

    for _, retryable := range retryableErrors {
        if errorCode == retryable {
            return true
        }
    }
    return false
}

Cancelling Tasks (ANY → CANCELLED)

Handle task cancellation:

func cancelTask(task *pb.Task, reason, cancelledBy string) *pb.Task {
    cancellationMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_cancel_%s", uuid.New().String()),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Task cancelled: %s", reason),
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "cancellation_reason": structpb.NewStringValue(reason),
                "cancelled_by":        structpb.NewStringValue(cancelledBy),
                "previous_state":      structpb.NewStringValue(task.GetStatus().GetState().String()),
            },
        },
    }

    // Update task status
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_CANCELLED,
        Update:    cancellationMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add cancellation message to history
    task.History = append(task.History, cancellationMessage)

    return task
}

Publishing Task Updates

Using AgentHub Client

Publish task updates through the AgentHub broker:

import (
    "context"
    eventbus "github.com/owulveryck/agenthub/events/eventbus"
)

func publishTaskUpdate(ctx context.Context, client eventbus.AgentHubClient, task *pb.Task, fromAgent, toAgent string) error {
    _, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
        Task: task,
        Routing: &eventbus.AgentEventMetadata{
            FromAgentId: fromAgent,
            ToAgentId:   toAgent,
            EventType:   fmt.Sprintf("task.%s", task.GetStatus().GetState().String()),
            Priority:    getPriorityFromTaskState(task.GetStatus().GetState()),
        },
    })

    return err
}

func getPriorityFromTaskState(state pb.TaskState) eventbus.Priority {
    switch state {
    case pb.TaskState_TASK_STATE_FAILED:
        return eventbus.Priority_PRIORITY_HIGH
    case pb.TaskState_TASK_STATE_COMPLETED:
        return eventbus.Priority_PRIORITY_MEDIUM
    case pb.TaskState_TASK_STATE_WORKING:
        return eventbus.Priority_PRIORITY_LOW
    default:
        return eventbus.Priority_PRIORITY_MEDIUM
    }
}

Using A2A Abstractions

Use simplified A2A task management:

import (
    "github.com/owulveryck/agenthub/internal/agenthub"
)

func manageTaskWithA2A(ctx context.Context, subscriber *agenthub.A2ATaskSubscriber, task *pb.Task) error {
    // Process the task
    artifact, status, errorMsg := processTaskContent(ctx, task)

    switch status {
    case pb.TaskState_TASK_STATE_COMPLETED:
        return subscriber.CompleteA2ATaskWithArtifact(ctx, task, artifact)
    case pb.TaskState_TASK_STATE_FAILED:
        return subscriber.FailA2ATask(ctx, task, errorMsg)
    default:
        // Still in progress: report an intermediate update (the 50% value is illustrative)
        return subscriber.UpdateA2ATaskProgress(ctx, task, 50, "Processing data", "Halfway complete")
    }
}

Task Monitoring and Querying

Get Task Status

Query task status and history:

func getTaskStatus(ctx context.Context, client eventbus.AgentHubClient, taskID string) (*pb.Task, error) {
    task, err := client.GetTask(ctx, &eventbus.GetTaskRequest{
        TaskId:        taskID,
        HistoryLength: 10, // Get last 10 messages
    })
    if err != nil {
        return nil, err
    }

    // Log current status; guard the index in case the update has no content parts
    log.Printf("Task %s status: %s", taskID, task.GetStatus().GetState().String())
    if parts := task.GetStatus().GetUpdate().GetContent(); len(parts) > 0 {
        log.Printf("Last update: %s", parts[0].GetText())
    }
    log.Printf("History length: %d messages", len(task.GetHistory()))
    log.Printf("Artifacts: %d", len(task.GetArtifacts()))

    return task, nil
}

List Tasks by Context

Get all tasks for a conversation context:

func getTasksInContext(ctx context.Context, client eventbus.AgentHubClient, contextID string) ([]*pb.Task, error) {
    response, err := client.ListTasks(ctx, &eventbus.ListTasksRequest{
        ContextId: contextID,
        States:    []pb.TaskState{}, // All states
        Limit:     100,
    })
    if err != nil {
        return nil, err
    }

    tasks := response.GetTasks()
    log.Printf("Found %d tasks in context %s", len(tasks), contextID)

    // Analyze task distribution
    stateCount := make(map[pb.TaskState]int)
    for _, task := range tasks {
        stateCount[task.GetStatus().GetState()]++
    }

    for state, count := range stateCount {
        log.Printf("  %s: %d tasks", state.String(), count)
    }

    return tasks, nil
}

Workflow Coordination

Sequential Task Workflow

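Create dependent tasks that execute in sequence. ExecuteNext publishes one task per call, so the coordinator must wait until the previous task has completed before calling it again: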

type TaskWorkflow struct {
    ContextID string
    Tasks     []*pb.Task
    Current   int
}

func (tw *TaskWorkflow) ExecuteNext(ctx context.Context, client eventbus.AgentHubClient) error {
    if tw.Current >= len(tw.Tasks) {
        return fmt.Errorf("workflow completed")
    }

    currentTask := tw.Tasks[tw.Current]

    // Add dependency metadata if not first task
    if tw.Current > 0 {
        previousTask := tw.Tasks[tw.Current-1]
        dependencyMetadata := map[string]interface{}{
            "depends_on":     previousTask.GetId(),
            "workflow_step":  tw.Current + 1,
            "total_steps":    len(tw.Tasks),
        }

        metadata, _ := structpb.NewStruct(dependencyMetadata)
        currentTask.Metadata = metadata
    }

    // Publish the task
    err := publishTaskUpdate(ctx, client, currentTask, "workflow_coordinator", "")
    if err != nil {
        return err
    }

    tw.Current++
    return nil
}

Parallel Task Execution

Execute multiple tasks concurrently:

func executeParallelTasks(ctx context.Context, client eventbus.AgentHubClient, tasks []*pb.Task) error {
    var wg sync.WaitGroup
    errCh := make(chan error, len(tasks)) // buffered so no goroutine blocks on send

    // Generate the batch ID once so every task in the batch shares it
    batchID := uuid.New().String()

    for _, task := range tasks {
        wg.Add(1)
        go func(t *pb.Task) {
            defer wg.Done()

            // Add parallel execution metadata (replaces any existing task metadata)
            t.Metadata = &structpb.Struct{
                Fields: map[string]*structpb.Value{
                    "execution_mode": structpb.NewStringValue("parallel"),
                    "batch_id":       structpb.NewStringValue(batchID),
                    "batch_size":     structpb.NewNumberValue(float64(len(tasks))),
                },
            }

            if err := publishTaskUpdate(ctx, client, t, "parallel_coordinator", ""); err != nil {
                errCh <- err
            }
        }(task)
    }

    wg.Wait()
    close(errCh)

    // Return the first publish error, if any; a receive from the closed,
    // drained channel yields ok == false when no goroutine reported an error
    if err, ok := <-errCh; ok {
        return err
    }
    return nil
}

Best Practices

1. Always Update Task Status

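// Update status for every significant state change, and check publish errors
task = acceptTask(task, agentID)
if err := publishTaskUpdate(ctx, client, task, agentID, ""); err != nil {
    log.Printf("Failed to publish acceptance of task %s: %v", task.GetId(), err)
}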

2. Provide Meaningful Progress Updates

// Send regular progress updates during long-running tasks
for progress := 10; progress <= 90; progress += 10 {
    task = sendProgressUpdate(task, progress, currentPhase, details)
    if err := publishTaskUpdate(ctx, client, task, agentID, ""); err != nil {
        log.Printf("Failed to publish progress for task %s: %v", task.GetId(), err)
    }
    time.Sleep(processingInterval)
}

3. Include Rich Error Information

errorDetails := map[string]interface{}{
    "input_validation_errors": validationErrors,
    "system_resources":        resourceSnapshot,
    "retry_strategy":         "exponential_backoff",
}
task = failTask(task, "Data validation failed", "INVALID_INPUT", errorDetails)

4. Maintain Complete Message History

// Always append to history, never replace
task.History = append(task.History, statusMessage)

5. Use Appropriate Metadata

// Include context for debugging and monitoring
metadata := map[string]interface{}{
    "processing_node":  hostname,
    "resource_usage":   resourceMetrics,
    "performance_metrics": performanceData,
}

This guide covered the complete A2A task lifecycle management. You now have the tools to create, manage, and coordinate complex task workflows with proper state management and comprehensive observability.

4 - Debugging

Troubleshooting and debugging guides for AgentHub

Debugging How-to Guides

Practical troubleshooting guides to help you diagnose and resolve issues in your AgentHub deployments.

Available Guides

4.1 - How to Debug Agent Issues

Practical steps for troubleshooting common issues when developing and deploying agents with AgentHub.

How to Debug Agent Issues

This guide provides practical steps for troubleshooting common issues when developing and deploying agents with AgentHub.

Common Connection Issues

Problem: Agent Can’t Connect to Broker

Symptoms:

Failed to connect: connection refused

Solutions:

  1. Check if broker is running:

    # Check if broker process is running
    ps aux | grep broker
    
    # Check if port 50051 is listening
    netstat -tlnp | grep 50051
    # or
    lsof -i :50051
    
  2. Verify broker address and configuration:

    // Using unified abstraction - configuration via environment or code
    config := agenthub.NewGRPCConfig("subscriber")
    config.BrokerAddr = "localhost"  // Default
    config.BrokerPort = "50051"      // Default
    
    // Or set via environment variables:
    // export AGENTHUB_BROKER_ADDR="localhost"
    // export AGENTHUB_BROKER_PORT="50051"
    
  3. Check firewall settings:

    # On Linux systems that use ufw (e.g. Ubuntu), check whether the port is allowed
    sudo ufw status
    
    # Allow port if needed
    sudo ufw allow 50051
    

Problem: TLS/SSL Errors

Symptoms:

transport: authentication handshake failed

Solution: The unified abstraction handles TLS configuration automatically:

// TLS and connection management handled automatically
config := agenthub.NewGRPCConfig("subscriber")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
    log.Fatalf("Failed to create AgentHub client: %v", err)
}

Task Processing Issues

Problem: Agent Not Receiving Tasks

Debug Steps:

  1. Check subscription logs:

    log.Printf("Agent %s subscribing to tasks...", agentID)
    // Should see: "Successfully subscribed to tasks for agent {agentID}"
    
  2. Verify agent ID matching:

    // In publisher
    ResponderAgentId: "my_processing_agent"
    
    // In subscriber (must match exactly)
    const agentID = "my_processing_agent"
    
  3. Check task type filtering:

    req := &pb.SubscribeToTasksRequest{
        AgentId: agentID,
        TaskTypes: []string{"math_calculation"}, // Remove to receive all types
    }
    
  4. Monitor broker logs:

    # Broker should show:
    Received task request: task_xyz (type: math) from agent: publisher_agent
    # And either:
    No subscribers for task from agent 'publisher_agent'  # Bad - no matching agents
    # Or task routing to subscribers  # Good - task delivered
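Because routing requires the agent IDs to match exactly, the most common near-misses are stray whitespace and differing letter case. A small diagnostic helper can make those visible in your logs. The following is a sketch that assumes matching is case- and whitespace-sensitive, as "must match exactly" implies. `diagnoseAgentID` is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// diagnoseAgentID compares the agent ID used by the publisher with the one
// the subscriber registered, and explains near misses that break routing.
func diagnoseAgentID(published, subscribed string) string {
	switch {
	case published == subscribed:
		return "ok: agent IDs match exactly"
	case strings.TrimSpace(published) == strings.TrimSpace(subscribed):
		return fmt.Sprintf("mismatch: leading/trailing whitespace (%q vs %q)", published, subscribed)
	case strings.EqualFold(published, subscribed):
		return fmt.Sprintf("mismatch: letter case differs (%q vs %q)", published, subscribed)
	default:
		return fmt.Sprintf("mismatch: different IDs (%q vs %q)", published, subscribed)
	}
}
```

The `%q` verb quotes each ID, so trailing spaces that are invisible in a normal log line show up clearly.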
    

Problem: Tasks Timing Out

Debug Steps:

  1. Check task processing time:

    func processTask(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
        start := time.Now()
        defer func() {
            log.Printf("Task %s took %v to process", task.GetTaskId(), time.Since(start))
        }()
    
        // Your processing logic
    }
    
  2. Add timeout handling:

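    func processTaskWithTimeout(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
        // Create timeout context
        taskCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
        defer cancel()
    
        // Run the processing in a goroutine so the deadline can interrupt the wait.
        // doWork is a placeholder for your processing logic; it should honor taskCtx.
        done := make(chan error, 1)
        go func() { done <- doWork(taskCtx, task) }()
    
        select {
        case err := <-done:
            // Finished before the deadline
            if err != nil {
                sendResult(ctx, task, nil, pb.TaskStatus_TASK_STATUS_FAILED, err.Error(), client)
            }
        case <-taskCtx.Done():
            if errors.Is(taskCtx.Err(), context.DeadlineExceeded) {
                sendResult(ctx, task, nil, pb.TaskStatus_TASK_STATUS_FAILED, "Task timeout", client)
            }
        }
    }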
    
  3. Monitor progress updates:

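    // Send progress every few seconds. ctx should be a per-task context that is
    // cancelled when processing finishes; otherwise this goroutine leaks, because
    // ticker.Stop does not close ticker.C and "for range ticker.C" would block forever.
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    go func() {
        progress := 0
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                progress += 10
                if progress > 100 {
                    return
                }
                sendProgress(ctx, task, int32(progress), "Still processing...", client)
            }
        }
    }()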
    

Message Serialization Issues

Problem: Parameter Marshaling Errors

Symptoms:

Error creating parameters struct: proto: invalid type: func()

Solution: Ensure all parameter values are compatible with structpb:

// Bad - functions, channels, and typed slices/maps are not supported
badParams := map[string]interface{}{
    "callback": func() {},          // Not supported
    "channel":  make(chan int),     // Not supported
    "items":    []string{"a", "b"}, // Not supported: use []interface{}
}

// Good - basic types, []interface{}, and map[string]interface{} only
goodParams := map[string]interface{}{
    "name":    "value",                 // string
    "count":   42,                      // number (stored as float64)
    "enabled": true,                    // boolean
    "items":   []interface{}{"a", "b"}, // array
    "config": map[string]interface{}{   // nested object
        "timeout": 30,
    },
}
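When parameters come from existing Go structs or typed collections, converting them by hand is error-prone. One workaround, sketched here with only `encoding/json`, is a JSON round-trip. The decoded value contains only the shapes that `structpb` accepts, so you can pass the result to `structpb.NewStruct`. The helper name is hypothetical. Note that every number becomes a `float64`, so integers larger than 2^53 lose precision.

```go
package main

import "encoding/json"

// toStructpbCompatible converts a Go value (structs, []string, map[string]string, ...)
// into the map[string]interface{} / []interface{} / float64 / string / bool / nil
// shapes that structpb.NewStruct accepts, by round-tripping through JSON.
// Values JSON cannot encode (funcs, channels) produce an error, and v must
// encode to a JSON object.
func toStructpbCompatible(v interface{}) (map[string]interface{}, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var out map[string]interface{}
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}
```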

Problem: Result Unmarshaling Issues

Debug Steps:

  1. Check result structure:

    func handleTaskResult(result *pb.TaskResult) {
        log.Printf("Raw result: %+v", result.GetResult())
    
        resultMap := result.GetResult().AsMap()
        log.Printf("Result as map: %+v", resultMap)
    
        // structpb numbers always decode as float64, so assert float64 (not int)
        if value, ok := resultMap["count"].(float64); ok {
            log.Printf("Count: %f", value)
        } else {
            log.Printf("Count field missing or wrong type: %T", resultMap["count"])
        }
    }
    
  2. Handle type conversion safely:

    func getStringField(m map[string]interface{}, key string) (string, error) {
        if val, ok := m[key]; ok {
            if str, ok := val.(string); ok {
                return str, nil
            }
            return "", fmt.Errorf("field %s is not a string: %T", key, val)
        }
        return "", fmt.Errorf("field %s not found", key)
    }
    
    func getNumberField(m map[string]interface{}, key string) (float64, error) {
        if val, ok := m[key]; ok {
            if num, ok := val.(float64); ok {
                return num, nil
            }
            return 0, fmt.Errorf("field %s is not a number: %T", key, val)
        }
        return 0, fmt.Errorf("field %s not found", key)
    }
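With Go 1.18 or later, you can collapse the two helpers above into one generic function. The following is a sketch; `getField` is a hypothetical name.

```go
package main

import "fmt"

// getField extracts m[key] as type T, returning a descriptive error when the
// key is missing or holds a different type. Numbers decoded from a
// structpb.Struct (via AsMap) are float64, so request float64 for them.
func getField[T any](m map[string]interface{}, key string) (T, error) {
	var zero T
	val, ok := m[key]
	if !ok {
		return zero, fmt.Errorf("field %s not found", key)
	}
	typed, ok := val.(T)
	if !ok {
		return zero, fmt.Errorf("field %s has type %T, want %T", key, val, zero)
	}
	return typed, nil
}
```

Usage: `count, err := getField[float64](resultMap, "count")`.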
    

Stream and Connection Issues

Problem: Stream Disconnections

Symptoms:

Error receiving task: rpc error: code = Unavailable desc = connection error

Solutions:

  1. Implement retry logic:

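    func subscribeToTasksWithRetry(ctx context.Context, client pb.EventBusClient) {
        for {
            err := subscribeToTasks(ctx, client)
            if err == nil {
                return // subscription ended without error
            }
            log.Printf("Subscription error: %v, retrying in 5 seconds...", err)
    
            // Wait before retrying, but stop as soon as the context is cancelled
            select {
            case <-ctx.Done():
                log.Printf("Stopping retries: %v", ctx.Err())
                return
            case <-time.After(5 * time.Second):
            }
        }
    }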
    
  2. Handle context cancellation:

    for {
        task, err := stream.Recv()
        if err == io.EOF {
            log.Printf("Stream closed by server")
            return
        }
        if err != nil {
            if ctx.Err() != nil {
                log.Printf("Context cancelled: %v", ctx.Err())
                return
            }
            log.Printf("Stream error: %v", err)
            return
        }
        // Process task
    }
    

Problem: Memory Leaks in Long-Running Agents

Debug Steps:

  1. Monitor memory usage:

    # Check memory usage (pgrep -d, joins multiple PIDs with commas, as ps -p expects)
    ps -o pid,ppid,cmd,%mem,%cpu -p $(pgrep -d, -f "your-agent")
    
    # Continuous monitoring
    watch -n 5 'ps -o pid,ppid,cmd,%mem,%cpu -p $(pgrep -d, -f "your-agent")'
    
  2. Profile memory usage:

    import _ "net/http/pprof"
    import "net/http"
    
    func main() {
        // Start pprof server
        go func() {
            log.Println(http.ListenAndServe("localhost:6060", nil))
        }()
    
        // Your agent code
    }
    

    Access profiles at http://localhost:6060/debug/pprof/, or inspect the heap directly with go tool pprof http://localhost:6060/debug/pprof/heap.

  3. Check for goroutine leaks:

    import "runtime"
    
    func logGoroutines() {
        ticker := time.NewTicker(30 * time.Second)
        go func() {
            for range ticker.C {
                log.Printf("Goroutines: %d", runtime.NumGoroutine())
            }
        }()
    }
    

Performance Issues

Problem: Slow Task Processing

Debug Steps:

  1. Add timing measurements:

    func processTask(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
        timings := make(map[string]time.Duration)
    
        start := time.Now()
    
        // Phase 1: Parameter validation
        // ... validation logic ...
        timings["validation"] = time.Since(start)
        last := time.Now()
    
        // Phase 2: Business logic
        // ... your logic here ...
        timings["processing"] = time.Since(last)
        last = time.Now()
    
        // Phase 3: Result formatting
        // ... result creation ...
        timings["formatting"] = time.Since(last)
    
        log.Printf("Task %s timings: %+v", task.GetTaskId(), timings)
    }
    
  2. Profile CPU usage:

    import "runtime/pprof"
    import "os"
    
    func startCPUProfile() func() {
        f, err := os.Create("cpu.prof")
        if err != nil {
            log.Fatal(err)
        }
        if err := pprof.StartCPUProfile(f); err != nil {
            log.Fatal(err)
        }
    
        return func() {
            pprof.StopCPUProfile()
            f.Close()
        }
    }
    
    func main() {
        stop := startCPUProfile()
        defer stop()
    
        // Your agent code
    }
    
  3. Monitor queue sizes:

    type Agent struct {
        taskQueue chan *pb.TaskMessage
    }
    
    func (a *Agent) logQueueSize() {
        ticker := time.NewTicker(10 * time.Second)
        go func() {
            for range ticker.C {
                log.Printf("Task queue size: %d/%d", len(a.taskQueue), cap(a.taskQueue))
            }
        }()
    }
    

Debugging Tools and Techniques

1. Enable Verbose Logging

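import "fmt"
import "log"
import "os"

// debugEnabled is read once at startup from the DEBUG environment variable
var debugEnabled = os.Getenv("DEBUG") == "true"

func init() {
    // Include timestamps and file:line in every log entry
    log.SetFlags(log.LstdFlags | log.Lshortfile)
}

// debugf logs only when DEBUG=true (the standard log package has no levels).
// log.Output(2, ...) makes Lshortfile report the caller's line, not this one.
func debugf(format string, args ...interface{}) {
    if debugEnabled {
        log.Output(2, "DEBUG: "+fmt.Sprintf(format, args...))
    }
}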

2. Add Structured Logging

import "encoding/json"
import "time"

type LogEntry struct {
    Timestamp string                 `json:"timestamp"`
    Level     string                 `json:"level"`
    AgentID   string                 `json:"agent_id"`
    TaskID    string                 `json:"task_id,omitempty"`
    Message   string                 `json:"message"`
    Data      map[string]interface{} `json:"data,omitempty"`
}

func logInfo(agentID, taskID, message string, data map[string]interface{}) {
    entry := LogEntry{
        Timestamp: time.Now().Format(time.RFC3339),
        Level:     "INFO",
        AgentID:   agentID,
        TaskID:    taskID,
        Message:   message,
        Data:      data,
    }

    if jsonData, err := json.Marshal(entry); err == nil {
        log.Println(string(jsonData))
    }
}

3. Health Check Endpoint

import "encoding/json"
import "log"
import "net/http"
import "time"

type HealthStatus struct {
    Status         string    `json:"status"`
    AgentID        string    `json:"agent_id"`
    Uptime         string    `json:"uptime"`
    TasksProcessed int64     `json:"tasks_processed"`
    LastTaskTime   time.Time `json:"last_task_time"`
}

func startHealthServer(agent *Agent) {
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        status := HealthStatus{
            Status:         "healthy",
            AgentID:        agent.ID,
            Uptime:         time.Since(agent.StartTime).String(),
            TasksProcessed: agent.TasksProcessed,
            LastTaskTime:   agent.LastTaskTime,
        }

        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(status)
    })

    // ListenAndServe blocks, so start this function with: go startHealthServer(agent)
    log.Printf("Health server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

4. Task Tracing

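import (
    "context"
    "fmt"
    "time"
)

type TraceID string

// traceIDKey is an unexported key type, so values stored by other packages
// under a plain "trace_id" string key cannot collide with this one
type traceIDKey struct{}

func withTraceID(ctx context.Context) context.Context {
    traceID := TraceID(fmt.Sprintf("trace-%d", time.Now().UnixNano()))
    return context.WithValue(ctx, traceIDKey{}, traceID)
}

func getTraceID(ctx context.Context) TraceID {
    if traceID, ok := ctx.Value(traceIDKey{}).(TraceID); ok {
        return traceID
    }
    return ""
}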

func processTaskWithTracing(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
    ctx = withTraceID(ctx)
    traceID := getTraceID(ctx)

    log.Printf("[%s] Starting task %s", traceID, task.GetTaskId())
    defer log.Printf("[%s] Finished task %s", traceID, task.GetTaskId())

    // Your processing logic with trace ID logging
}

Common Error Patterns

1. Resource Exhaustion

Signs:

  • Tasks start failing after running for a while
  • Memory usage continuously increases
  • File descriptor limits reached

Solutions:

  • Implement proper resource cleanup
  • Add connection pooling
  • Set task processing limits

2. Deadlocks

Signs:

  • Agent stops processing tasks
  • Health checks show agent as “stuck”

Solutions:

  • Avoid blocking operations in main goroutines
  • Use timeouts for all operations
  • Implement deadlock detection

3. Race Conditions

Signs:

  • Intermittent task failures
  • Inconsistent behavior
  • Data corruption

Solutions:

  • Use proper synchronization primitives
  • Run race detector: go run -race your-agent.go
  • Add mutex protection for shared state

With these debugging techniques, you should be able to identify and resolve most agent-related issues efficiently.