Agent Development

Practical guides for creating and managing agents

Agent Development How-to Guides

Step-by-step guides for creating, configuring, and managing different types of agents in AgentHub.

Available Guides

1 - How to Create an A2A Task Publisher

Learn how to create an agent that publishes Agent2Agent (A2A) protocol-compliant tasks to other agents through the AgentHub EDA broker.

How to Create an A2A Task Publisher

This guide shows you how to create an agent that publishes Agent2Agent (A2A) protocol-compliant tasks to other agents through the AgentHub Event-Driven Architecture (EDA) broker.

Basic Setup

Using AgentHub’s unified abstractions, creating a publisher is straightforward:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/owulveryck/agenthub/internal/agenthub"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/structpb" // needed by the structured-data examples below
)

const (
    myAgentID = "my_publisher_agent"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    // Create configuration with automatic observability
    config := agenthub.NewGRPCConfig("publisher")
    config.HealthPort = "8081" // Unique port for this publisher

    // Create AgentHub client with built-in observability
    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        panic("Failed to create AgentHub client: " + err.Error())
    }

    // Automatic graceful shutdown
    defer func() {
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer shutdownCancel()
        if err := client.Shutdown(shutdownCtx); err != nil {
            client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
        }
    }()

    // Start the client (enables observability)
    if err := client.Start(ctx); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
        panic(err)
    }

    // Create A2A task publisher with automatic tracing and metrics
    taskPublisher := &agenthub.A2ATaskPublisher{
        Client:         client.Client,
        TraceManager:   client.TraceManager,
        MetricsManager: client.MetricsManager,
        Logger:         client.Logger,
        ComponentName:  "publisher",
        AgentID:        myAgentID,
    }

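    // Your A2A task publishing code goes here, for example publishSimpleTask
    // from the next section. Go rejects unused variables, so the publisher
    // must be used for this file to compile.
    if err := publishSimpleTask(ctx, taskPublisher); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to publish task", "error", err)
    }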
}

Publishing a Simple A2A Task

Here’s how to publish a basic A2A task using the A2ATaskPublisher abstraction:

func publishSimpleTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
    // Create A2A-compliant content parts
    content := []*pb.Part{
        {
            Part: &pb.Part_Text{
                Text: "Hello! Please provide a greeting for Claude.",
            },
        },
    }

    // Publish A2A task using the unified abstraction
    task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
        TaskType:         "greeting",
        Content:          content,
        RequesterAgentID: myAgentID,
        ResponderAgentID: "agent_demo_subscriber", // Target agent
        Priority:         pb.Priority_PRIORITY_HIGH,
        ContextID:        "ctx_greeting_demo", // Optional: conversation context
    })
    if err != nil {
        return fmt.Errorf("failed to publish greeting task: %w", err)
    }

    taskPublisher.Logger.InfoContext(ctx, "Published A2A greeting task",
        "task_id", task.GetId(),
        "context_id", task.GetContextId())
    return nil
}

Publishing Different Task Types

Math Calculation Task with A2A Data Parts

func publishMathTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
    // Create A2A-compliant content with structured data
    content := []*pb.Part{
        {
            Part: &pb.Part_Text{
                Text: "Please perform the following mathematical calculation:",
            },
        },
        {
            Part: &pb.Part_Data{
                Data: &pb.DataPart{
                    Data: &structpb.Struct{
                        Fields: map[string]*structpb.Value{
                            "operation": structpb.NewStringValue("multiply"),
                            "a":         structpb.NewNumberValue(15.0),
                            "b":         structpb.NewNumberValue(7.0),
                        },
                    },
                },
            },
        },
    }

    // Publish A2A math task
    task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
        TaskType:         "math_calculation",
        Content:          content,
        RequesterAgentID: myAgentID,
        ResponderAgentID: "agent_demo_subscriber",
        Priority:         pb.Priority_PRIORITY_MEDIUM,
        ContextID:        "ctx_math_demo",
    })
    if err != nil {
        return fmt.Errorf("failed to publish math task: %w", err)
    }

    taskPublisher.Logger.InfoContext(ctx, "Published A2A math task",
        "task_id", task.GetId(),
        "operation", "multiply")
    return nil
}

Data Processing Task

func publishDataProcessingTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) {
    // Build the structured parameters; NewStruct fails on unsupported value types
    params, err := structpb.NewStruct(map[string]interface{}{
        "dataset_path":  "/data/customer_data.csv",
        "analysis_type": "summary_statistics",
        "output_format": "json",
        "filters": map[string]interface{}{
            "date_range": "last_30_days",
            "status":     "active",
        },
        // Extra context that helps with debugging and auditing
        "workflow_id": "workflow_123",
        "user_id":     "user_456",
    })
    if err != nil {
        panic(fmt.Sprintf("Failed to build data processing parameters: %v", err))
    }

    // Publish through the same A2A abstraction as the other examples
    _, err = taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
        TaskType: "data_processing",
        Content: []*pb.Part{
            {Part: &pb.Part_Data{Data: &pb.DataPart{Data: params}}},
        },
        RequesterAgentID: myAgentID,
        ResponderAgentID: "data_agent",
        Priority:         pb.Priority_PRIORITY_HIGH,
    })
    if err != nil {
        panic(fmt.Sprintf("Failed to publish data processing task: %v", err))
    }
}

Broadcasting Tasks (No Specific Responder)

To broadcast a task to all available agents, omit the ResponderAgentID:

func broadcastTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) {
    _, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
        TaskType: "announcement",
        Content: []*pb.Part{
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data: &structpb.Struct{
                            Fields: map[string]*structpb.Value{
                                "announcement":    structpb.NewStringValue("Server maintenance in 30 minutes"),
                                "action_required": structpb.NewBoolValue(false),
                            },
                        },
                    },
                },
            },
        },
        RequesterAgentID: myAgentID,
        // ResponderAgentID is left unset (empty string): the task is broadcast to all agents
        Priority: pb.Priority_PRIORITY_LOW,
    })
    if err != nil {
        panic(fmt.Sprintf("Failed to publish announcement: %v", err))
    }
}
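
The effect of leaving the responder empty can be pictured with a small routing function. This is only an illustrative sketch: `recipients` is a hypothetical helper, not an AgentHub function, and the broker's real routing logic may differ (for example in how it treats an unknown responder).

```go
package main

import "fmt"

// recipients is a hypothetical helper, not an AgentHub function. It returns
// the subscribers that would receive a task for the given responder ID.
func recipients(responderID string, subscribers []string) []string {
	if responderID == "" {
		// No responder set: copy the full list so every subscriber gets the task.
		return append([]string(nil), subscribers...)
	}
	for _, s := range subscribers {
		if s == responderID {
			return []string{s}
		}
	}
	return nil // responder not subscribed: nobody receives the task
}

func main() {
	subs := []string{"data_agent", "agent_demo_subscriber"}
	fmt.Println(recipients("", subs))           // broadcast: both agents
	fmt.Println(recipients("data_agent", subs)) // targeted: only data_agent
}
```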

Subscribing to Task Results

As a publisher, you’ll want to receive results from tasks you’ve requested. You can use the AgentHub client directly:

func subscribeToResults(ctx context.Context, client *agenthub.AgentHubClient) {
    req := &pb.SubscribeToTaskResultsRequest{
        RequesterAgentId: myAgentID,
        // TaskIds: []string{"specific_task_id"}, // Optional: filter specific tasks
    }

    stream, err := client.Client.SubscribeToTaskResults(ctx, req)
    if err != nil {
        client.Logger.ErrorContext(ctx, "Error subscribing to results", "error", err)
        return
    }

    client.Logger.InfoContext(ctx, "Subscribed to task results", "agent_id", myAgentID)

    for {
        result, err := stream.Recv()
        if err != nil {
            client.Logger.ErrorContext(ctx, "Error receiving result", "error", err)
            return
        }

        handleTaskResult(ctx, client, result)
    }
}

func handleTaskResult(ctx context.Context, client *agenthub.AgentHubClient, result *pb.TaskResult) {
    client.Logger.InfoContext(ctx, "Received task result",
        "task_id", result.GetTaskId(),
        "status", result.GetStatus().String())

    switch result.GetStatus() {
    case pb.TaskStatus_TASK_STATUS_COMPLETED:
        client.Logger.InfoContext(ctx, "Task completed successfully",
            "task_id", result.GetTaskId(),
            "result", result.GetResult().AsMap())
    case pb.TaskStatus_TASK_STATUS_FAILED:
        client.Logger.ErrorContext(ctx, "Task failed",
            "task_id", result.GetTaskId(),
            "error", result.GetErrorMessage())
    case pb.TaskStatus_TASK_STATUS_CANCELLED:
        client.Logger.InfoContext(ctx, "Task was cancelled",
            "task_id", result.GetTaskId())
    }
}
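
The receive loop above logs every `Recv` error the same way, including a normal end of stream. The following is a minimal sketch of one way to separate a clean stop from a real failure. It assumes only that the stream exposes a `Recv` method; `resultStream`, `sliceStream`, `drain`, and `collect` are hypothetical names used for illustration, and results are simplified to strings.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// resultStream is a stand-in for a streaming client; only Recv is assumed.
type resultStream interface {
	Recv() (string, error)
}

// sliceStream is a hypothetical in-memory stream used to exercise drain.
// Once items run out it returns fail, or io.EOF when fail is nil.
type sliceStream struct {
	items []string
	fail  error
}

func (s *sliceStream) Recv() (string, error) {
	if len(s.items) == 0 {
		if s.fail != nil {
			return "", s.fail
		}
		return "", io.EOF
	}
	item := s.items[0]
	s.items = s.items[1:]
	return item, nil
}

var errBroken = errors.New("connection reset")

// drain passes every received item to handle. It returns nil when the stream
// ends cleanly (io.EOF) or the caller's context is done, and a wrapped error
// for anything else.
func drain(ctx context.Context, stream resultStream, handle func(string)) error {
	for {
		item, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) || ctx.Err() != nil {
				return nil
			}
			return fmt.Errorf("receive failed: %w", err)
		}
		handle(item)
	}
}

// collect drains a stream with a background context and returns what it saw.
func collect(stream resultStream) ([]string, error) {
	var got []string
	err := drain(context.Background(), stream, func(id string) { got = append(got, id) })
	return got, err
}

func main() {
	got, err := collect(&sliceStream{items: []string{"task_1", "task_2"}})
	fmt.Println(got, err)

	_, err = collect(&sliceStream{fail: errBroken})
	fmt.Println(err)
}
```

Checking `ctx.Err()` rather than the error value covers transports that report cancellation with their own error type.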

Monitoring Task Progress

Subscribe to progress updates to track long-running tasks:

func subscribeToProgress(ctx context.Context, client *agenthub.AgentHubClient) {
    req := &pb.SubscribeToTaskResultsRequest{
        RequesterAgentId: myAgentID,
    }

    stream, err := client.Client.SubscribeToTaskProgress(ctx, req)
    if err != nil {
        client.Logger.ErrorContext(ctx, "Error subscribing to progress", "error", err)
        return
    }

    client.Logger.InfoContext(ctx, "Subscribed to task progress", "agent_id", myAgentID)

    for {
        progress, err := stream.Recv()
        if err != nil {
            client.Logger.ErrorContext(ctx, "Error receiving progress", "error", err)
            return
        }

        client.Logger.InfoContext(ctx, "Task progress update",
            "task_id", progress.GetTaskId(),
            "progress_percentage", progress.GetProgressPercentage(),
            "progress_message", progress.GetProgressMessage())
    }
}
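
Logging each update is enough for a demo, but a publisher that tracks several long-running tasks usually wants the latest value per task. A minimal sketch, assuming progress arrives as a task ID plus an integer percentage; `progressTracker` is a hypothetical type, not part of AgentHub.

```go
package main

import (
	"fmt"
	"sync"
)

// progressTracker keeps the most recent progress percentage per task ID.
// It is safe for use from the goroutine that reads the progress stream
// and from any goroutine that queries it.
type progressTracker struct {
	mu     sync.Mutex
	latest map[string]int32
}

func newProgressTracker() *progressTracker {
	return &progressTracker{latest: make(map[string]int32)}
}

// update records pct for taskID and reports whether it was stored.
// Updates that would move progress backwards are dropped as stale.
func (t *progressTracker) update(taskID string, pct int32) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if prev, ok := t.latest[taskID]; ok && pct < prev {
		return false
	}
	t.latest[taskID] = pct
	return true
}

// get returns the latest percentage for taskID, if any was recorded.
func (t *progressTracker) get(taskID string) (int32, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	pct, ok := t.latest[taskID]
	return pct, ok
}

func main() {
	tracker := newProgressTracker()
	tracker.update("task_1", 25)
	tracker.update("task_1", 60)
	tracker.update("task_1", 40) // stale update, ignored
	pct, _ := tracker.get("task_1")
	fmt.Println(pct) // prints 60
}
```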

Complete Publisher Example

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    // Create configuration with automatic observability
    config := agenthub.NewGRPCConfig("publisher")
    config.HealthPort = "8081"

    // Create AgentHub client with built-in observability
    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        panic("Failed to create AgentHub client: " + err.Error())
    }

    defer func() {
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer shutdownCancel()
        if err := client.Shutdown(shutdownCtx); err != nil {
            client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
        }
    }()

    // Start the client (enables observability)
    if err := client.Start(ctx); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
        panic(err)
    }

    // Create A2A task publisher with automatic tracing and metrics
    taskPublisher := &agenthub.A2ATaskPublisher{
        Client:         client.Client,
        TraceManager:   client.TraceManager,
        MetricsManager: client.MetricsManager,
        Logger:         client.Logger,
        ComponentName:  "publisher",
        AgentID:        myAgentID,
    }

    client.Logger.InfoContext(ctx, "Starting publisher demo")

    // Publish various tasks with automatic observability
    if err := publishMathTask(ctx, taskPublisher); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to publish math task", "error", err)
    }
    time.Sleep(2 * time.Second)

    publishDataProcessingTask(ctx, taskPublisher)
    time.Sleep(2 * time.Second)

    broadcastTask(ctx, taskPublisher)

    client.Logger.InfoContext(ctx, "All tasks published! Check subscriber logs for results")
}

Best Practices

  1. Always set a unique task ID: Use timestamps, UUIDs, or sequential IDs to ensure uniqueness.

  2. Use appropriate priorities: Reserve PRIORITY_CRITICAL for urgent tasks that must be processed immediately.

  3. Set realistic deadlines: Include deadlines for time-sensitive tasks to help agents prioritize.

  4. Handle results gracefully: Always subscribe to task results and handle failures appropriately.

  5. Include helpful metadata: Add context information that might be useful for debugging or auditing.

  6. Validate parameters: Ensure task parameters are properly structured before publishing.

  7. Use specific responder IDs when possible: This ensures tasks go to the most appropriate agent.
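
Practices 1 and 6 can be sketched with the standard library alone. The helpers below are hypothetical (`newTaskID` and `validateTask` are not AgentHub functions), and whether the publisher accepts a caller-supplied ID is not shown in this guide, so treat this as an illustration of the idea rather than of the API.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"time"
)

// newTaskID combines a prefix, a nanosecond timestamp, and 8 random bytes,
// so IDs stay unique even when two tasks are created in the same instant.
func newTaskID(prefix string) (string, error) {
	suffix := make([]byte, 8)
	if _, err := rand.Read(suffix); err != nil {
		return "", fmt.Errorf("generate task id: %w", err)
	}
	return fmt.Sprintf("%s_%d_%s", prefix, time.Now().UnixNano(), hex.EncodeToString(suffix)), nil
}

// validateTask rejects requests that lack the fields every task needs.
func validateTask(taskType, requesterID string) error {
	if taskType == "" {
		return errors.New("task type is required")
	}
	if requesterID == "" {
		return errors.New("requester agent ID is required")
	}
	return nil
}

func main() {
	if err := validateTask("greeting", "my_publisher_agent"); err != nil {
		fmt.Println("invalid task:", err)
		return
	}
	id, err := newTaskID("task_greeting")
	if err != nil {
		fmt.Println("could not create ID:", err)
		return
	}
	fmt.Println(id)
}
```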

Your publisher is now ready to send tasks to agents and receive results!

2 - How to Create an A2A Task Subscriber (Agent)

Learn how to create an agent that can receive, process, and respond to Agent2Agent (A2A) protocol tasks through the AgentHub EDA broker using A2A-compliant abstractions.

How to Create an A2A Task Subscriber (Agent)

This guide shows you how to create an agent that can receive, process, and respond to Agent2Agent (A2A) protocol tasks through the AgentHub Event-Driven Architecture (EDA) broker using AgentHub’s A2A-compliant abstractions.

Basic Agent Setup

Start by creating the basic structure for your agent using the unified abstraction:

package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/owulveryck/agenthub/internal/agenthub"
    // pb and structpb are used by the task handlers added in the sections below
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/structpb"
)

const (
    agentID = "my_agent_processor"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create configuration with automatic observability
    config := agenthub.NewGRPCConfig("subscriber")
    config.HealthPort = "8082" // Unique port for this agent

    // Create AgentHub client with built-in observability
    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        panic("Failed to create AgentHub client: " + err.Error())
    }

    // Automatic graceful shutdown
    defer func() {
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer shutdownCancel()
        if err := client.Shutdown(shutdownCtx); err != nil {
            client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
        }
    }()

    // Start the client (enables observability)
    if err := client.Start(ctx); err != nil {
        client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
        panic(err)
    }

    // Create A2A task subscriber with automatic observability
    taskSubscriber := agenthub.NewA2ATaskSubscriber(client, agentID)

    // Register A2A task handlers (see below for examples)
    taskSubscriber.RegisterDefaultHandlers()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        <-sigChan
        client.Logger.Info("Received shutdown signal")
        cancel()
    }()

    client.Logger.InfoContext(ctx, "Starting subscriber agent")

    // Start task subscription (with automatic observability)
    go func() {
        if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
            client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
        }
    }()

    // Optional: Subscribe to task results if this agent also publishes tasks
    go func() {
        if err := taskSubscriber.SubscribeToTaskResults(ctx); err != nil {
            client.Logger.ErrorContext(ctx, "Task result subscription failed", "error", err)
        }
    }()

    client.Logger.InfoContext(ctx, "Agent started with observability. Listening for tasks.")

    // Wait for context cancellation
    <-ctx.Done()
    client.Logger.Info("Agent shutdown complete")
}

Default Task Handlers

The RegisterDefaultHandlers() method provides built-in handlers for common task types:

  • greeting: Simple greeting with name parameter
  • math_calculation: Basic arithmetic operations (add, subtract, multiply, divide)
  • random_number: Random number generation with seed
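
To give a feel for what a handler such as math_calculation has to do, here is a minimal sketch of the arithmetic core. It is not the library's implementation: `calculate` is a hypothetical function, and the operation names are taken from the list above.

```go
package main

import (
	"errors"
	"fmt"
)

// calculate applies one of the four basic operations to a and b.
// Unknown operations and division by zero are reported as errors so a
// handler can turn them into a failed task status.
func calculate(op string, a, b float64) (float64, error) {
	switch op {
	case "add":
		return a + b, nil
	case "subtract":
		return a - b, nil
	case "multiply":
		return a * b, nil
	case "divide":
		if b == 0 {
			return 0, errors.New("division by zero")
		}
		return a / b, nil
	default:
		return 0, fmt.Errorf("unsupported operation %q", op)
	}
}

func main() {
	result, err := calculate("multiply", 15, 7)
	fmt.Println(result, err) // prints 105 <nil>

	_, err = calculate("divide", 1, 0)
	fmt.Println(err) // prints division by zero
}
```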

Custom Task Handlers

Simple Custom Handler

Add your own task handlers using RegisterTaskHandler():

func setupCustomHandlers(taskSubscriber *agenthub.TaskSubscriber) {
    // Register a custom data processing handler
    taskSubscriber.RegisterTaskHandler("data_processing", handleDataProcessing)

    // Register a file conversion handler
    taskSubscriber.RegisterTaskHandler("file_conversion", handleFileConversion)

    // Register a status check handler
    taskSubscriber.RegisterTaskHandler("status_check", handleStatusCheck)
}

func handleDataProcessing(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
    // GetFields is nil-safe: a task without parameters yields empty strings, not a panic
    params := task.GetParameters().GetFields()
    datasetPath := params["dataset_path"].GetStringValue()
    analysisType := params["analysis_type"].GetStringValue()

    if datasetPath == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "dataset_path parameter is required"
    }

    // Simulate data processing
    time.Sleep(2 * time.Second)

    result, err := structpb.NewStruct(map[string]interface{}{
        "dataset_path":    datasetPath,
        "analysis_type":   analysisType,
        "records_processed": 1500,
        "processing_time": "2.1s",
        "summary": map[string]interface{}{
            "mean":   42.7,
            "median": 41.2,
            "stddev": 8.3,
        },
        "processed_at": time.Now().Format(time.RFC3339),
    })

    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
    }

    return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
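
Registration by task type amounts to a lookup table from type name to handler. The sketch below shows that idea with stand-in types. `registry`, `handler`, `status`, and `echo` are hypothetical, parameters are simplified to a string map, and the real subscriber may be implemented differently.

```go
package main

import (
	"context"
	"fmt"
)

type status int

const (
	statusCompleted status = iota
	statusFailed
)

// handler mirrors the shape used in this guide: result, status, error message.
type handler func(ctx context.Context, params map[string]string) (map[string]any, status, string)

// registry maps task types to handlers.
type registry struct {
	handlers map[string]handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]handler)}
}

// register associates a handler with a task type, replacing any previous one.
func (r *registry) register(taskType string, h handler) {
	r.handlers[taskType] = h
}

// dispatch runs the handler for taskType, or fails if none is registered.
func (r *registry) dispatch(ctx context.Context, taskType string, params map[string]string) (map[string]any, status, string) {
	h, ok := r.handlers[taskType]
	if !ok {
		return nil, statusFailed, fmt.Sprintf("no handler registered for task type %q", taskType)
	}
	return h(ctx, params)
}

// echo is a sample handler that validates its input before answering.
func echo(ctx context.Context, params map[string]string) (map[string]any, status, string) {
	msg := params["message"]
	if msg == "" {
		return nil, statusFailed, "message parameter is required"
	}
	return map[string]any{"echo": msg}, statusCompleted, ""
}

// demo registers echo and dispatches one task of the given type.
func demo(taskType, message string) (map[string]any, status, string) {
	r := newRegistry()
	r.register("echo", echo)
	return r.dispatch(context.Background(), taskType, map[string]string{"message": message})
}

func main() {
	fmt.Println(demo("echo", "hello"))
	fmt.Println(demo("unknown", "hello"))
}
```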

Advanced Handler with Validation

func handleFileConversion(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
    // GetFields is nil-safe: a task without parameters yields empty strings, not a panic
    params := task.GetParameters().GetFields()

    // Extract and validate parameters
    inputPath := params["input_path"].GetStringValue()
    outputFormat := params["output_format"].GetStringValue()

    if inputPath == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "input_path parameter is required"
    }

    if outputFormat == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "output_format parameter is required"
    }

    // Validate output format
    validFormats := []string{"pdf", "docx", "txt", "html"}
    isValidFormat := false
    for _, format := range validFormats {
        if outputFormat == format {
            isValidFormat = true
            break
        }
    }

    if !isValidFormat {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("unsupported output format: %s", outputFormat)
    }

    // Simulate file conversion process
    time.Sleep(1 * time.Second)

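    // Swap only the trailing extension; strings.Replace would rewrite the first match anywhere in the path
    outputPath := strings.TrimSuffix(inputPath, filepath.Ext(inputPath)) + "." + outputFormat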

    result, err := structpb.NewStruct(map[string]interface{}{
        "input_path":      inputPath,
        "output_path":     outputPath,
        "output_format":   outputFormat,
        "file_size":       "2.5MB",
        "conversion_time": "1.2s",
        "status":          "success",
        "converted_at":    time.Now().Format(time.RFC3339),
    })

    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
    }

    return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
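
Deriving the output path is the step most likely to go wrong in a handler like this, because the extension text can also appear earlier in the path, or be missing entirely. A minimal sketch of the validation and path logic in isolation; `outputPathFor` is a hypothetical helper, and the paths are illustrative.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// validFormats lists the output formats the handler accepts.
var validFormats = map[string]bool{"pdf": true, "docx": true, "txt": true, "html": true}

// outputPathFor validates the format and replaces only the trailing
// extension of inputPath. A path without an extension gets one appended.
func outputPathFor(inputPath, outputFormat string) (string, error) {
	if inputPath == "" {
		return "", fmt.Errorf("input_path parameter is required")
	}
	if !validFormats[outputFormat] {
		return "", fmt.Errorf("unsupported output format: %s", outputFormat)
	}
	base := strings.TrimSuffix(inputPath, filepath.Ext(inputPath))
	return base + "." + outputFormat, nil
}

func main() {
	for _, in := range []string{"/docs/report.txt", "/docs.txt/report.txt", "/docs/README"} {
		out, err := outputPathFor(in, "pdf")
		fmt.Println(out, err)
	}
}
```

With forward-slash paths, the three inputs map to `/docs/report.pdf`, `/docs.txt/report.pdf`, and `/docs/README.pdf`.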

Handler with External Service Integration

func handleStatusCheck(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
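    // GetFields is nil-safe: a task without parameters yields an empty string, not a panic
    serviceURL := task.GetParameters().GetFields()["service_url"].GetStringValue()

    if serviceURL == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "service_url parameter is required"
    }

    // Create HTTP client with timeout
    client := &http.Client{
        Timeout: 10 * time.Second,
    }

    // Bind the request to the task context so cancelling the task aborts the call
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, serviceURL+"/health", nil)
    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Invalid service URL: %v", err)
    }

    // Perform health check and measure how long it takes
    start := time.Now()
    resp, err := client.Do(req)
    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Failed to reach service: %v", err)
    }
    defer resp.Body.Close()
    responseTime := time.Since(start)

    // Determine status
    isHealthy := resp.StatusCode >= 200 && resp.StatusCode < 300
    status := "unhealthy"
    if isHealthy {
        status = "healthy"
    }

    result, err := structpb.NewStruct(map[string]interface{}{
        "service_url":   serviceURL,
        "status":        status,
        "status_code":   resp.StatusCode,
        "response_time": responseTime.String(),
        "checked_at":    time.Now().Format(time.RFC3339),
    })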

    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
    }

    return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
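
A handler that calls an external service is easier to test when the HTTP logic lives in its own function and the client is injected. The sketch below is one way to do that with the standard library. `checkHealth`, `fakeClient`, and `roundTripFunc` are hypothetical names, and the fake transport answers locally instead of contacting `http://service.example`.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// roundTripFunc lets a plain function act as an http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// checkHealth requests serviceURL + "/health" using ctx, so cancelling the
// context aborts the call. It returns a status label, the HTTP status code,
// and the measured round-trip time.
func checkHealth(ctx context.Context, client *http.Client, serviceURL string) (string, int, time.Duration, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, serviceURL+"/health", nil)
	if err != nil {
		return "", 0, 0, fmt.Errorf("build request: %w", err)
	}
	start := time.Now()
	resp, err := client.Do(req)
	if err != nil {
		return "", 0, 0, fmt.Errorf("reach service: %w", err)
	}
	defer resp.Body.Close()
	elapsed := time.Since(start)

	status := "unhealthy"
	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		status = "healthy"
	}
	return status, resp.StatusCode, elapsed, nil
}

// fakeClient returns a client whose transport answers every request with
// the given status code, without touching the network.
func fakeClient(code int) *http.Client {
	return &http.Client{
		Timeout: 10 * time.Second,
		Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) {
			return &http.Response{StatusCode: code, Body: http.NoBody, Header: make(http.Header), Request: r}, nil
		}),
	}
}

// demo runs one health check against the fake client.
func demo(code int) (string, int, error) {
	status, got, _, err := checkHealth(context.Background(), fakeClient(code), "http://service.example")
	return status, got, err
}

func main() {
	fmt.Println(demo(200))
	fmt.Println(demo(503))
}
```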

Complete Agent Example

Here’s a complete agent that handles multiple task types:

package main

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "path/filepath"
    "strings"
    "syscall"
    "time"

    "github.com/owulveryck/agenthub/internal/agenthub"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/structpb"
)

const agentID = "multi_task_agent"

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create AgentHub client with observability
    config := agenthub.NewGRPCConfig("subscriber")
    config.HealthPort = "8082"

    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        panic("Failed to create AgentHub client: " + err.Error())
    }

    defer func() {
        shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer shutdownCancel()
        if err := client.Shutdown(shutdownCtx); err != nil {
            client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
        }
    }()

    if err := client.Start(ctx); err != nil {
        panic(err)
    }

    // Create and configure task subscriber
    taskSubscriber := agenthub.NewTaskSubscriber(client, agentID)

    // Register both default and custom handlers
    taskSubscriber.RegisterDefaultHandlers()
    setupCustomHandlers(taskSubscriber)

    // Graceful shutdown handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        <-sigChan
        client.Logger.Info("Received shutdown signal")
        cancel()
    }()

    client.Logger.InfoContext(ctx, "Starting multi-task agent")

    // Start subscriptions
    go func() {
        if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
            client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
        }
    }()

    go func() {
        if err := taskSubscriber.SubscribeToTaskResults(ctx); err != nil {
            client.Logger.ErrorContext(ctx, "Task result subscription failed", "error", err)
        }
    }()

    client.Logger.InfoContext(ctx, "Agent ready to process tasks",
        "supported_tasks", []string{"greeting", "math_calculation", "random_number", "data_processing", "file_conversion", "status_check"})

    <-ctx.Done()
    client.Logger.Info("Agent shutdown complete")
}

func setupCustomHandlers(taskSubscriber *agenthub.TaskSubscriber) {
    taskSubscriber.RegisterTaskHandler("data_processing", handleDataProcessing)
    taskSubscriber.RegisterTaskHandler("file_conversion", handleFileConversion)
    taskSubscriber.RegisterTaskHandler("status_check", handleStatusCheck)
}

// ... (include the handler functions from above)

Automatic Features

The unified abstraction provides automatic features:

Observability

  • Distributed tracing for each task processing
  • Metrics collection for processing times and success rates
  • Structured logging with correlation IDs

Task Management

  • Automatic result publishing back to the broker
  • Error handling and status reporting
  • Progress tracking capabilities

Resource Management

  • Graceful shutdown handling
  • Connection management to the broker
  • Health endpoints for monitoring

Best Practices

  1. Parameter Validation: Always validate task parameters before processing

    if requiredParam == "" {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "required_param is missing"
    }
    
  2. Error Handling: Provide meaningful error messages

    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Processing failed: %v", err)
    }
    
  3. Timeouts: Bound external operations with a client timeout or a context deadline

    client := &http.Client{Timeout: 10 * time.Second}
    
  4. Resource Cleanup: Always clean up resources in handlers

    defer file.Close()
    defer resp.Body.Close()
    
  5. Structured Results: Return well-structured result data

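    result, err := structpb.NewStruct(map[string]interface{}{
        "status":    "completed",
        "timestamp": time.Now().Format(time.RFC3339),
        "data":      processedData,
    })
    if err != nil {
        return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
    }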
    

Handler Function Signature

All task handlers must match the TaskHandler function type:

type TaskHandler func(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string)

Return values:

  • *structpb.Struct: The result data (can be nil on failure)
  • pb.TaskStatus: One of:
    • pb.TaskStatus_TASK_STATUS_COMPLETED
    • pb.TaskStatus_TASK_STATUS_FAILED
    • pb.TaskStatus_TASK_STATUS_CANCELLED
  • string: Error message (empty string on success)
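
Because handlers are plain functions with a fixed shape, they can be wrapped to add cross-cutting behavior. The following is a minimal sketch of a timeout wrapper using stand-in types. `handler`, `status`, and `withTimeout` are hypothetical, and the result is simplified to a string instead of a `*structpb.Struct`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type status int

const (
	completed status = iota
	failed
)

// handler mirrors the three return values described above.
type handler func(ctx context.Context) (string, status, string)

// withTimeout returns a handler that reports failure if h has not
// finished within d. The wrapped handler receives a context that is
// cancelled at the deadline.
func withTimeout(h handler, d time.Duration) handler {
	return func(ctx context.Context) (string, status, string) {
		ctx, cancel := context.WithTimeout(ctx, d)
		defer cancel()

		type outcome struct {
			result string
			st     status
			msg    string
		}
		done := make(chan outcome, 1) // buffered so the goroutine never blocks
		go func() {
			r, s, m := h(ctx)
			done <- outcome{r, s, m}
		}()

		select {
		case o := <-done:
			return o.result, o.st, o.msg
		case <-ctx.Done():
			return "", failed, "handler timed out: " + ctx.Err().Error()
		}
	}
}

// demo runs a handler that sleeps delayMs under a timeout of timeoutMs.
func demo(delayMs, timeoutMs int) (status, string) {
	slow := func(ctx context.Context) (string, status, string) {
		time.Sleep(time.Duration(delayMs) * time.Millisecond)
		return "done", completed, ""
	}
	wrapped := withTimeout(slow, time.Duration(timeoutMs)*time.Millisecond)
	_, st, msg := wrapped(context.Background())
	return st, msg
}

func main() {
	fmt.Println(demo(0, 500))  // finishes in time
	fmt.Println(demo(200, 10)) // exceeds the timeout
}
```

The wrapper stops waiting at the deadline, but the wrapped handler keeps running unless it watches its context, so long-running handlers should still check `ctx.Done()`.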

Your agent is now ready to receive and process tasks from other agents in the system with full observability and automatic result publishing!