1.1 - How to Create an A2A Task Publisher
Learn how to create an agent that publishes Agent2Agent (A2A) protocol-compliant tasks to other agents through the AgentHub EDA broker.
How to Create an A2A Task Publisher
This guide shows you how to create an agent that publishes Agent2Agent (A2A) protocol-compliant tasks to other agents through the AgentHub Event-Driven Architecture (EDA) broker.
Basic Setup
Using AgentHub’s unified abstractions, creating a publisher is straightforward:
package main
import (
	"context"
	"fmt"
	"time"

	pb "github.com/owulveryck/agenthub/events/a2a"
	"github.com/owulveryck/agenthub/internal/agenthub"
	"google.golang.org/protobuf/types/known/structpb" // used by the data-part examples below
)
const (
myAgentID = "my_publisher_agent"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Create configuration with automatic observability
config := agenthub.NewGRPCConfig("publisher")
config.HealthPort = "8081" // Unique port for this publisher
// Create AgentHub client with built-in observability
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
// Automatic graceful shutdown
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
// Start the client (enables observability)
if err := client.Start(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
panic(err)
}
// Create A2A task publisher with automatic tracing and metrics
taskPublisher := &agenthub.A2ATaskPublisher{
Client: client.Client,
TraceManager: client.TraceManager,
MetricsManager: client.MetricsManager,
Logger: client.Logger,
ComponentName: "publisher",
AgentID: myAgentID,
}
// Your A2A task publishing code goes here
}
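In place of the placeholder comment, call one of the helpers defined below, for example:

if err := publishSimpleTask(ctx, taskPublisher); err != nil {
	client.Logger.ErrorContext(ctx, "Publish failed", "error", err)
}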
Publishing a Simple A2A Task
Here’s how to publish a basic A2A task using the A2ATaskPublisher abstraction:
func publishSimpleTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
// Create A2A-compliant content parts
content := []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Hello! Please provide a greeting for Claude.",
},
},
}
// Publish A2A task using the unified abstraction
task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
TaskType: "greeting",
Content: content,
RequesterAgentID: myAgentID,
ResponderAgentID: "agent_demo_subscriber", // Target agent
Priority: pb.Priority_PRIORITY_HIGH,
ContextID: "ctx_greeting_demo", // Optional: conversation context
})
if err != nil {
return fmt.Errorf("failed to publish greeting task: %w", err)
}
taskPublisher.Logger.InfoContext(ctx, "Published A2A greeting task",
"task_id", task.GetId(),
"context_id", task.GetContextId())
return nil
}
Publishing Different Task Types
Math Calculation Task with A2A Data Parts
func publishMathTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
// Create A2A-compliant content with structured data
content := []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Please perform the following mathematical calculation:",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: &structpb.Struct{
Fields: map[string]*structpb.Value{
"operation": structpb.NewStringValue("multiply"),
"a": structpb.NewNumberValue(15.0),
"b": structpb.NewNumberValue(7.0),
},
},
},
},
},
}
// Publish A2A math task
task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
TaskType: "math_calculation",
Content: content,
RequesterAgentID: myAgentID,
ResponderAgentID: "agent_demo_subscriber",
Priority: pb.Priority_PRIORITY_MEDIUM,
ContextID: "ctx_math_demo",
})
if err != nil {
return fmt.Errorf("failed to publish math task: %w", err)
}
taskPublisher.Logger.InfoContext(ctx, "Published A2A math task",
"task_id", task.GetId(),
"operation", "multiply")
return nil
}
Data Processing Task
func publishDataProcessingTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
	// Encode the processing parameters as an A2A structured data part
	params, err := structpb.NewStruct(map[string]interface{}{
		"dataset_path":  "/data/customer_data.csv",
		"analysis_type": "summary_statistics",
		"output_format": "json",
		"filters": map[string]interface{}{
			"date_range": "last_30_days",
			"status":     "active",
		},
		// Event metadata is handled automatically by the publisher
		"workflow_id": "workflow_123",
		"user_id":     "user_456",
	})
	if err != nil {
		return fmt.Errorf("failed to build task parameters: %w", err)
	}
	content := []*pb.Part{
		{
			Part: &pb.Part_Data{
				Data: &pb.DataPart{Data: params},
			},
		},
	}
	task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
		TaskType:         "data_processing",
		Content:          content,
		RequesterAgentID: myAgentID,
		ResponderAgentID: "data_agent",
		Priority:         pb.Priority_PRIORITY_HIGH,
	})
	if err != nil {
		return fmt.Errorf("failed to publish data processing task: %w", err)
	}
	taskPublisher.Logger.InfoContext(ctx, "Published A2A data processing task", "task_id", task.GetId())
	return nil
}
Broadcasting Tasks (No Specific Responder)
To broadcast a task to all available agents, omit the ResponderAgentID:
func broadcastTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
	content := []*pb.Part{
		{
			Part: &pb.Part_Text{
				Text: "Server maintenance in 30 minutes. No action required.",
			},
		},
	}
	task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
		TaskType:         "announcement",
		Content:          content,
		RequesterAgentID: myAgentID,
		// ResponderAgentID omitted: the task is broadcast to all agents
		Priority: pb.Priority_PRIORITY_LOW,
	})
	if err != nil {
		return fmt.Errorf("failed to publish announcement: %w", err)
	}
	taskPublisher.Logger.InfoContext(ctx, "Broadcast announcement", "task_id", task.GetId())
	return nil
}
Subscribing to Task Results
As a publisher, you’ll want to receive results from tasks you’ve requested. You can use the AgentHub client directly:
func subscribeToResults(ctx context.Context, client *agenthub.AgentHubClient) {
req := &pb.SubscribeToTaskResultsRequest{
RequesterAgentId: myAgentID,
// TaskIds: []string{"specific_task_id"}, // Optional: filter specific tasks
}
stream, err := client.Client.SubscribeToTaskResults(ctx, req)
if err != nil {
client.Logger.ErrorContext(ctx, "Error subscribing to results", "error", err)
return
}
client.Logger.InfoContext(ctx, "Subscribed to task results", "agent_id", myAgentID)
for {
result, err := stream.Recv()
if err != nil {
client.Logger.ErrorContext(ctx, "Error receiving result", "error", err)
return
}
handleTaskResult(ctx, client, result)
}
}
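Start the subscription in a goroutine before publishing so that no results are missed, for example:

go subscribeToResults(ctx, client)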
func handleTaskResult(ctx context.Context, client *agenthub.AgentHubClient, result *pb.TaskResult) {
client.Logger.InfoContext(ctx, "Received task result",
"task_id", result.GetTaskId(),
"status", result.GetStatus().String())
switch result.GetStatus() {
case pb.TaskStatus_TASK_STATUS_COMPLETED:
client.Logger.InfoContext(ctx, "Task completed successfully",
"task_id", result.GetTaskId(),
"result", result.GetResult().AsMap())
case pb.TaskStatus_TASK_STATUS_FAILED:
client.Logger.ErrorContext(ctx, "Task failed",
"task_id", result.GetTaskId(),
"error", result.GetErrorMessage())
case pb.TaskStatus_TASK_STATUS_CANCELLED:
client.Logger.InfoContext(ctx, "Task was cancelled",
"task_id", result.GetTaskId())
}
}
Monitoring Task Progress
Subscribe to progress updates to track long-running tasks:
func subscribeToProgress(ctx context.Context, client *agenthub.AgentHubClient) {
req := &pb.SubscribeToTaskResultsRequest{
RequesterAgentId: myAgentID,
}
stream, err := client.Client.SubscribeToTaskProgress(ctx, req)
if err != nil {
client.Logger.ErrorContext(ctx, "Error subscribing to progress", "error", err)
return
}
client.Logger.InfoContext(ctx, "Subscribed to task progress", "agent_id", myAgentID)
for {
progress, err := stream.Recv()
if err != nil {
client.Logger.ErrorContext(ctx, "Error receiving progress", "error", err)
return
}
client.Logger.InfoContext(ctx, "Task progress update",
"task_id", progress.GetTaskId(),
"progress_percentage", progress.GetProgressPercentage(),
"progress_message", progress.GetProgressMessage())
}
}
Complete Publisher Example
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Create configuration with automatic observability
config := agenthub.NewGRPCConfig("publisher")
config.HealthPort = "8081"
// Create AgentHub client with built-in observability
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
// Start the client (enables observability)
if err := client.Start(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
panic(err)
}
// Create A2A task publisher with automatic tracing and metrics
taskPublisher := &agenthub.A2ATaskPublisher{
	Client:         client.Client,
	TraceManager:   client.TraceManager,
	MetricsManager: client.MetricsManager,
	Logger:         client.Logger,
	ComponentName:  "publisher",
	AgentID:        myAgentID,
}
client.Logger.InfoContext(ctx, "Starting publisher demo")
// Publish various tasks with automatic observability
if err := publishMathTask(ctx, taskPublisher); err != nil {
	client.Logger.ErrorContext(ctx, "Failed to publish math task", "error", err)
}
time.Sleep(2 * time.Second)
if err := publishDataProcessingTask(ctx, taskPublisher); err != nil {
	client.Logger.ErrorContext(ctx, "Failed to publish data processing task", "error", err)
}
time.Sleep(2 * time.Second)
if err := broadcastTask(ctx, taskPublisher); err != nil {
	client.Logger.ErrorContext(ctx, "Failed to broadcast announcement", "error", err)
}
client.Logger.InfoContext(ctx, "All tasks published! Check subscriber logs for results")
}
Best Practices
- Use unique identifiers: task IDs are generated for you, but supply unique context IDs (timestamps, UUIDs, or sequential IDs) so related tasks can be correlated; see the sketch below.
- Use appropriate priorities: reserve PRIORITY_CRITICAL for urgent tasks that must be processed immediately.
- Set realistic deadlines: include deadlines for time-sensitive tasks to help agents prioritize.
- Handle results gracefully: always subscribe to task results and handle failures appropriately.
- Include helpful metadata: add context information that is useful for debugging or auditing.
- Validate content: ensure task content parts are well-structured before publishing.
- Use specific responder IDs when possible: this routes tasks to the most appropriate agent.
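A minimal sketch of a helper for generating unique context IDs, assuming the github.com/google/uuid package (the helper name is illustrative):

// newContextID builds a unique, human-scannable context ID such as
// "ctx_greeting_3f1a...", suitable for the ContextID field shown above.
func newContextID(purpose string) string {
	return fmt.Sprintf("ctx_%s_%s", purpose, uuid.NewString())
}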
Your publisher is now ready to send tasks to agents and receive results!
1.2 - How to Create an A2A Task Subscriber (Agent)
Learn how to create an agent that can receive, process, and respond to Agent2Agent (A2A) protocol tasks through the AgentHub EDA broker using A2A-compliant abstractions.
How to Create an A2A Task Subscriber (Agent)
This guide shows you how to create an agent that can receive, process, and respond to Agent2Agent (A2A) protocol tasks through the AgentHub Event-Driven Architecture (EDA) broker using AgentHub’s A2A-compliant abstractions.
Basic Agent Setup
Start by creating the basic structure for your agent using the unified abstraction:
package main
import (
"context"
"os"
"os/signal"
"syscall"
"time"
"github.com/owulveryck/agenthub/internal/agenthub"
pb "github.com/owulveryck/agenthub/events/a2a"
"google.golang.org/protobuf/types/known/structpb"
)
const (
agentID = "my_agent_processor"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create configuration with automatic observability
config := agenthub.NewGRPCConfig("subscriber")
config.HealthPort = "8082" // Unique port for this agent
// Create AgentHub client with built-in observability
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
// Automatic graceful shutdown
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
// Start the client (enables observability)
if err := client.Start(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
panic(err)
}
// Create A2A task subscriber with automatic observability
taskSubscriber := agenthub.NewA2ATaskSubscriber(client, agentID)
// Register A2A task handlers (see below for examples)
taskSubscriber.RegisterDefaultHandlers()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
client.Logger.Info("Received shutdown signal")
cancel()
}()
client.Logger.InfoContext(ctx, "Starting subscriber agent")
// Start task subscription (with automatic observability)
go func() {
if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
}
}()
// Optional: Subscribe to task results if this agent also publishes tasks
go func() {
if err := taskSubscriber.SubscribeToTaskResults(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Task result subscription failed", "error", err)
}
}()
client.Logger.InfoContext(ctx, "Agent started with observability. Listening for tasks.")
// Wait for context cancellation
<-ctx.Done()
client.Logger.Info("Agent shutdown complete")
}
Default Task Handlers
The RegisterDefaultHandlers() method provides built-in handlers for common task types:
- greeting: simple greeting with a name parameter
- math_calculation: basic arithmetic operations (add, subtract, multiply, divide)
- random_number: random number generation with a seed
Custom Task Handlers
Simple Custom Handler
Add your own task handlers using RegisterTaskHandler():
func setupCustomHandlers(taskSubscriber *agenthub.TaskSubscriber) {
// Register a custom data processing handler
taskSubscriber.RegisterTaskHandler("data_processing", handleDataProcessing)
// Register a file conversion handler
taskSubscriber.RegisterTaskHandler("file_conversion", handleFileConversion)
// Register a status check handler
taskSubscriber.RegisterTaskHandler("status_check", handleStatusCheck)
}
func handleDataProcessing(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
params := task.GetParameters()
datasetPath := params.Fields["dataset_path"].GetStringValue()
analysisType := params.Fields["analysis_type"].GetStringValue()
if datasetPath == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "dataset_path parameter is required"
}
// Simulate data processing
time.Sleep(2 * time.Second)
result, err := structpb.NewStruct(map[string]interface{}{
"dataset_path": datasetPath,
"analysis_type": analysisType,
"records_processed": 1500,
"processing_time": "2.1s",
"summary": map[string]interface{}{
"mean": 42.7,
"median": 41.2,
"stddev": 8.3,
},
"processed_at": time.Now().Format(time.RFC3339),
})
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
}
return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
Advanced Handler with Validation
func handleFileConversion(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
params := task.GetParameters()
// Extract and validate parameters
inputPath := params.Fields["input_path"].GetStringValue()
outputFormat := params.Fields["output_format"].GetStringValue()
if inputPath == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "input_path parameter is required"
}
if outputFormat == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "output_format parameter is required"
}
// Validate output format
validFormats := []string{"pdf", "docx", "txt", "html"}
isValidFormat := false
for _, format := range validFormats {
if outputFormat == format {
isValidFormat = true
break
}
}
if !isValidFormat {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("unsupported output format: %s", outputFormat)
}
// Simulate file conversion process
time.Sleep(1 * time.Second)
outputPath := strings.Replace(inputPath, filepath.Ext(inputPath), "."+outputFormat, 1)
result, err := structpb.NewStruct(map[string]interface{}{
"input_path": inputPath,
"output_path": outputPath,
"output_format": outputFormat,
"file_size": "2.5MB",
"conversion_time": "1.2s",
"status": "success",
"converted_at": time.Now().Format(time.RFC3339),
})
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
}
return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
Handler with External Service Integration
func handleStatusCheck(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
params := task.GetParameters()
serviceURL := params.Fields["service_url"].GetStringValue()
if serviceURL == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "service_url parameter is required"
}
// Create HTTP client with timeout
client := &http.Client{
Timeout: 10 * time.Second,
}
// Perform health check
resp, err := client.Get(serviceURL + "/health")
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Failed to reach service: %v", err)
}
defer resp.Body.Close()
// Determine status
isHealthy := resp.StatusCode >= 200 && resp.StatusCode < 300
status := "unhealthy"
if isHealthy {
status = "healthy"
}
result, err := structpb.NewStruct(map[string]interface{}{
"service_url": serviceURL,
"status": status,
"status_code": resp.StatusCode,
"response_time": "150ms",
"checked_at": time.Now().Format(time.RFC3339),
})
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
}
return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
Complete Agent Example
Here’s a complete agent that handles multiple task types:
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/owulveryck/agenthub/internal/agenthub"
pb "github.com/owulveryck/agenthub/events/a2a"
"google.golang.org/protobuf/types/known/structpb"
)
const agentID = "multi_task_agent"
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create AgentHub client with observability
config := agenthub.NewGRPCConfig("subscriber")
config.HealthPort = "8082"
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
if err := client.Start(ctx); err != nil {
panic(err)
}
// Create and configure task subscriber
taskSubscriber := agenthub.NewTaskSubscriber(client, agentID)
// Register both default and custom handlers
taskSubscriber.RegisterDefaultHandlers()
setupCustomHandlers(taskSubscriber)
// Graceful shutdown handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
client.Logger.Info("Received shutdown signal")
cancel()
}()
client.Logger.InfoContext(ctx, "Starting multi-task agent")
// Start subscriptions
go func() {
if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
}
}()
go func() {
if err := taskSubscriber.SubscribeToTaskResults(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Task result subscription failed", "error", err)
}
}()
client.Logger.InfoContext(ctx, "Agent ready to process tasks",
"supported_tasks", []string{"greeting", "math_calculation", "random_number", "data_processing", "file_conversion", "status_check"})
<-ctx.Done()
client.Logger.Info("Agent shutdown complete")
}
func setupCustomHandlers(taskSubscriber *agenthub.TaskSubscriber) {
taskSubscriber.RegisterTaskHandler("data_processing", handleDataProcessing)
taskSubscriber.RegisterTaskHandler("file_conversion", handleFileConversion)
taskSubscriber.RegisterTaskHandler("status_check", handleStatusCheck)
}
// ... (include the handler functions from above)
Automatic Features
The unified abstraction provides automatic features:
Observability
- Distributed tracing for each task processing
- Metrics collection for processing times and success rates
- Structured logging with correlation IDs
Task Management
- Automatic result publishing back to the broker
- Error handling and status reporting
- Progress tracking capabilities
Resource Management
- Graceful shutdown handling
- Connection management to the broker
- Health endpoints for monitoring
Best Practices
Parameter Validation: Always validate task parameters before processing
if requiredParam == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "required_param is missing"
}
Error Handling: Provide meaningful error messages
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Processing failed: %v", err)
}
Timeouts: Use context with timeouts for external operations
client := &http.Client{Timeout: 10 * time.Second}
Resource Cleanup: Always clean up resources in handlers
defer file.Close()
defer resp.Body.Close()
Structured Results: Return well-structured result data
result, _ := structpb.NewStruct(map[string]interface{}{
"status": "completed",
"timestamp": time.Now().Format(time.RFC3339),
"data": processedData,
})
Handler Function Signature
All task handlers must implement the TaskHandler interface:
type TaskHandler func(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string)
Return values:
- *structpb.Struct: the result data (may be nil on failure)
- pb.TaskStatus: one of pb.TaskStatus_TASK_STATUS_COMPLETED, pb.TaskStatus_TASK_STATUS_FAILED, or pb.TaskStatus_TASK_STATUS_CANCELLED
- string: an error message (empty string on success)
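A minimal sketch of a conforming handler (the handler name and result fields are illustrative):

func handleEcho(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
	// Echo the task ID back as the result payload
	result, err := structpb.NewStruct(map[string]interface{}{
		"echo": task.GetTaskId(),
	})
	if err != nil {
		return nil, pb.TaskStatus_TASK_STATUS_FAILED, "failed to build result structure"
	}
	return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}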
Your agent is now ready to receive and process tasks from other agents in the system with full observability and automatic result publishing!
1.3 - How to Create an Agent with Cortex Auto-Discovery
This guide shows you how to create an agent using the SubAgent library, which handles all the boilerplate and lets you focus on your agent’s business logic.
What You’ll Build
An agent that:
- Automatically registers with the broker on startup
- Gets discovered by Cortex for LLM-based task delegation
- Processes delegated tasks and returns results
- Has built-in observability (tracing, logging, metrics)
- Handles graceful shutdown
All with ~50 lines of code instead of 500+.
Prerequisites
- AgentHub broker running
- Cortex orchestrator running
- Basic understanding of Go
- Familiarity with the A2A protocol (helpful but not required)
Step 1: Import the SubAgent Library
package main
import (
"context"
"fmt"
"log"
"time"
pb "github.com/owulveryck/agenthub/events/a2a"
"github.com/owulveryck/agenthub/internal/subagent"
"google.golang.org/protobuf/types/known/structpb"
)
Step 2: Define Your Agent Configuration
func main() {
// Configure your agent with required fields
config := &subagent.Config{
AgentID: "agent_translator", // Unique agent identifier
ServiceName: "translator_service", // Optional gRPC service name
Name: "Translation Agent", // Human-readable name
Description: "Translates text between languages using AI models",
Version: "1.0.0", // Optional, defaults to 1.0.0
HealthPort: "8087", // Optional, defaults to 8080
}
// Create the subagent
agent, err := subagent.New(config)
if err != nil {
log.Fatal(err)
}
Step 3: Register Your Skills
Skills are capabilities your agent provides. Each skill needs a handler function:
// Add a translation skill
agent.MustAddSkill(
"Language Translation", // Skill name (shown to LLM)
"Translates text from one language to another", // Description
translateHandler, // Your handler function
)
// You can add multiple skills
agent.MustAddSkill(
"Language Detection",
"Detects the language of input text",
detectLanguageHandler,
)
Best Practices for Skill Definition
- Clear Names: Use descriptive skill names that the LLM can understand
- Specific Descriptions: Explain what the skill does and when to use it
- Multiple Skills: An agent can have multiple related skills
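Putting these practices together, a focused skill registration might look like the following sketch (the skill and handler names are hypothetical):

// A descriptive name plus a specific description helps the LLM delegate correctly.
agent.MustAddSkill(
	"Sentiment Analysis",
	"Classifies short English texts as positive, negative, or neutral and returns a confidence score. Use for opinion mining and feedback triage.",
	sentimentHandler, // a handler with the standard (ctx, task, message) signature
)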
Step 4: Implement Your Handler Functions
A handler function receives a task and returns a result:
// Handler signature: (ctx, task, message) -> (artifact, state, errorMessage)
func translateHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
// 1. Extract input from the message
var inputText string
for _, part := range message.Content {
if text := part.GetText(); text != "" {
inputText = text
break
}
}
if inputText == "" {
return nil, pb.TaskState_TASK_STATE_FAILED, "No input text provided"
}
// 2. Extract parameters from metadata (optional)
targetLang := "en" // default
if message.Metadata != nil && message.Metadata.Fields != nil {
if lang, exists := message.Metadata.Fields["target_language"]; exists {
targetLang = lang.GetStringValue()
}
}
// 3. Perform your business logic
translatedText, err := performTranslation(ctx, inputText, targetLang)
if err != nil {
return nil, pb.TaskState_TASK_STATE_FAILED, fmt.Sprintf("Translation failed: %v", err)
}
// 4. Create an artifact with your result
artifact := &pb.Artifact{
ArtifactId: fmt.Sprintf("translation_%s_%d", task.GetId(), time.Now().Unix()),
Name: "translation_result",
Description: fmt.Sprintf("Translation to %s", targetLang),
Parts: []*pb.Part{
{
Part: &pb.Part_Text{
Text: translatedText,
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"original_text": structpb.NewStringValue(inputText),
"target_language": structpb.NewStringValue(targetLang),
"translated_at": structpb.NewStringValue(time.Now().Format(time.RFC3339)),
},
},
}
// 5. Return success
return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}
func performTranslation(ctx context.Context, text, targetLang string) (string, error) {
// Implement your actual translation logic here
// This could call an external API, use a local model, etc.
// Example placeholder:
return fmt.Sprintf("[Translated to %s]: %s", targetLang, text), nil
}
Handler Return Values
Your handler returns three values:
- *pb.Artifact: the result data (or nil on failure)
- pb.TaskState: a status code (TASK_STATE_COMPLETED, TASK_STATE_FAILED, etc.)
- string: an error message (empty string if successful)
Step 5: Run Your Agent
// Run the agent (blocks until shutdown signal)
if err := agent.Run(context.Background()); err != nil {
log.Fatal(err)
}
}
That’s it! The SubAgent library handles:
- ✅ gRPC client setup and connection
- ✅ Agent card creation with A2A-compliant structure
- ✅ Broker registration and auto-discovery by Cortex
- ✅ Task subscription and routing
- ✅ Distributed tracing (automatic span creation)
- ✅ Structured logging (all operations logged)
- ✅ Graceful shutdown (SIGINT/SIGTERM handling)
- ✅ Health checks
- ✅ Error handling
Complete Example
Here’s a full working agent in ~80 lines:
package main
import (
"context"
"fmt"
"log"
"time"
pb "github.com/owulveryck/agenthub/events/a2a"
"github.com/owulveryck/agenthub/internal/subagent"
"google.golang.org/protobuf/types/known/structpb"
)
func main() {
config := &subagent.Config{
AgentID: "agent_translator",
ServiceName: "translator_service",
Name: "Translation Agent",
Description: "Translates text between languages using AI models",
Version: "1.0.0",
HealthPort: "8087",
}
agent, err := subagent.New(config)
if err != nil {
log.Fatal(err)
}
agent.MustAddSkill(
"Language Translation",
"Translates text from one language to another. Supports major languages including English, Spanish, French, German, Japanese, and Chinese",
translateHandler,
)
if err := agent.Run(context.Background()); err != nil {
log.Fatal(err)
}
}
func translateHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
var inputText string
for _, part := range message.Content {
if text := part.GetText(); text != "" {
inputText = text
break
}
}
if inputText == "" {
return nil, pb.TaskState_TASK_STATE_FAILED, "No input text provided"
}
targetLang := "en"
if message.Metadata != nil && message.Metadata.Fields != nil {
if lang, exists := message.Metadata.Fields["target_language"]; exists {
targetLang = lang.GetStringValue()
}
}
translatedText := fmt.Sprintf("[Translated to %s]: %s", targetLang, inputText)
artifact := &pb.Artifact{
ArtifactId: fmt.Sprintf("translation_%s_%d", task.GetId(), time.Now().Unix()),
Name: "translation_result",
Description: fmt.Sprintf("Translation to %s", targetLang),
Parts: []*pb.Part{
{Part: &pb.Part_Text{Text: translatedText}},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"original_text": structpb.NewStringValue(inputText),
"target_language": structpb.NewStringValue(targetLang),
},
},
}
return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}
Build and Test
# Build your agent
go build -o bin/translator ./agents/translator
# Start broker (if not running)
./bin/broker &
# Start Cortex (if not running)
./bin/cortex &
# Start your agent
./bin/translator
Verify Registration
Check the logs:
# Your agent logs should show:
time=... level=INFO msg="Agent card registered" agent_id=agent_translator skills=1
time=... level=INFO msg="Agent started successfully" agent_id=agent_translator name="Translation Agent" skills=1
# Cortex logs should show:
time=... level=INFO msg="Received agent card event" agent_id=agent_translator event_type=registered
time=... level=INFO msg="Agent registered with Cortex orchestrator" agent_id=agent_translator total_agents=N
Test with Chat CLI
./bin/chat_cli
# Try these prompts (if using VertexAI LLM):
> Can you translate "hello world" to Spanish?
> Translate "good morning" to French
How It Works
sequenceDiagram
participant A as Your Agent
participant SL as SubAgent Library
participant B as Broker
participant C as Cortex
participant L as LLM
participant U as User
Note over A: agent.Run() called
A->>SL: Initialize
SL->>B: RegisterAgent(AgentCard)
B->>C: AgentCardEvent
C->>C: Register agent
Note over C: Agent now available
U->>C: "Translate this to Spanish"
C->>L: Decide(history, agents, message)
L-->>C: Delegate to translator
C->>B: Publish task
B->>SL: Route to agent
SL->>SL: Start tracing span
SL->>SL: Log task receipt
SL->>A: Call your handler
A->>A: Process translation
A-->>SL: Return artifact
SL->>SL: Log completion
SL->>SL: End tracing span
SL->>B: Publish result
B->>C: Route result
C->>U: "Aquí está: Hola mundo"
Advanced Usage
Accessing the Client
If you need access to the underlying AgentHub client:
client := agent.GetClient()
logger := agent.GetLogger()
config := agent.GetConfig()
Custom Configuration
config := &subagent.Config{
AgentID: "my_agent",
ServiceName: "custom_service_name", // Optional
Name: "My Agent",
Description: "Does amazing things",
Version: "2.0.0",
HealthPort: "9000",
BrokerAddr: "broker.example.com", // Optional
BrokerPort: "50051", // Optional
}
Multiple Skills Example
agent.MustAddSkill("Skill A", "Description A", handlerA)
agent.MustAddSkill("Skill B", "Description B", handlerB)
agent.MustAddSkill("Skill C", "Description C", handlerC)
// Each skill gets its own handler function
// The SubAgent library routes tasks to the correct handler based on task type
Error Handling in Handlers
func myHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
result, err := doWork(ctx, message)
if err != nil {
// Return failure with error message
return nil, pb.TaskState_TASK_STATE_FAILED, err.Error()
}
// Return success with result
artifact := createArtifact(result)
return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}
What the SubAgent Library Provides
Automatic Setup
- gRPC client connection to broker
- Health check endpoint
- Signal handling for graceful shutdown
- Configuration validation with defaults
A2A-Compliant AgentCard
- Correct protocol version (0.2.9)
- Proper capabilities structure
- Complete skill definitions with all required fields
- Automatic skill ID generation and tagging
Observability
- Tracing: Automatic span creation for each task with attributes
- Logging: Structured logging for all operations (registration, task receipt, completion, errors)
- Metrics: Built-in metrics collection (via AgentHub client)
Task Management
- Automatic task subscription
- Skill-based handler routing
- Error handling and reporting
- Result publishing
Developer Experience
- Simple three-step API: New() → AddSkill() → Run()
- Clear error messages
- Type-safe handler functions
- Automatic resource cleanup
Best Practices
1. Skill Design
- Be specific: Clear descriptions help the LLM delegate correctly
- Single responsibility: Each skill should do one thing well
- Related skills: Group related capabilities in one agent
2. Handler Implementation
- Validate input: Always check that required data is present
- Handle errors gracefully: Return meaningful error messages
- Include metadata: Add useful context to your artifacts
- Keep it focused: Handlers should do one thing
3. Configuration
- Unique ports: Each agent needs a unique health port
- Meaningful names: Use descriptive agent IDs and names
- Version appropriately: Use semantic versioning
4. Testing
- Unit test handlers: Test business logic independently
- Integration test: Verify agent works with broker and Cortex
- E2E test: Test the complete flow with the LLM
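For the first point, a minimal sketch of a handler unit test, constructing the task and message the same way the examples above do:

func TestTranslateHandler(t *testing.T) {
	task := &pb.Task{Id: "task_test"}
	msg := &pb.Message{
		Content: []*pb.Part{
			{Part: &pb.Part_Text{Text: "hello world"}},
		},
	}
	artifact, state, errMsg := translateHandler(context.Background(), task, msg)
	if state != pb.TaskState_TASK_STATE_COMPLETED {
		t.Fatalf("expected completed state, got %v (%s)", state, errMsg)
	}
	if artifact == nil || len(artifact.Parts) == 0 {
		t.Fatal("expected a non-empty artifact")
	}
}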
Troubleshooting
Build Errors
Import issues:
go mod tidy
go mod download
Agent Not Registering
Check:
- Broker is running and accessible
- Config has all required fields (AgentID, Name, Description)
- Health port is not in use by another service
- Logs for specific error messages
Tasks Not Reaching Agent
Check:
- Cortex is running and has discovered your agent
- Skill names and descriptions match what users are asking for
- LLM is configured (not using mock LLM for delegation)
- Check broker and Cortex logs for routing events
Handler Errors
Check:
- Handler function signature matches the TaskHandler type
- Input validation is working correctly
- Error messages are being returned properly
- Logs show task receipt and processing
Next Steps
With the SubAgent library, creating production-ready agents is now as simple as defining your configuration, implementing your business logic, and calling Run()!
1.4 - How to Design Effective Agent Cards
This guide shows you how to design AgentCards that enable effective LLM-based discovery and delegation in the Cortex orchestration system.
Why AgentCards Matter
When you register an agent with AgentHub, the Cortex orchestrator uses your AgentCard to:
- Understand your agent’s capabilities - What can it do?
- Match user requests - Does this request fit this agent?
- Generate LLM prompts - Include your agent in decision-making
- Delegate tasks - Route appropriate work to your agent
The quality of your AgentCard directly impacts how effectively Cortex can use your agent.
AgentCard Structure
type AgentCard struct {
ProtocolVersion string // A2A protocol version (e.g., "0.2.9")
Name string // Unique agent identifier
Description string // Human-readable description
Version string // Agent version (e.g., "1.0.0")
Url string // Service endpoint (optional)
Capabilities *AgentCapabilities // Technical capabilities
Skills []*AgentSkill // What the agent can do
// ... other fields
}
The most important field for Cortex integration is Skills.
Designing Skills
Each skill represents a specific capability your agent offers. The LLM uses skill information to decide when to delegate to your agent.
Skill Structure
type AgentSkill struct {
Id string // Unique skill identifier
Name string // Human-readable skill name
Description string // Detailed description of what this skill does
Tags []string // Categorization and keywords
Examples []string // Example user requests that match this skill
InputModes []string // Supported input types (e.g., "text/plain")
OutputModes []string // Supported output types
}
Writing Effective Descriptions
❌ Poor description:
Description: "Processes data"
✅ Good description:
Description: "Analyzes time-series data to detect anomalies using statistical methods. " +
"Supports multiple algorithms including Z-score, moving average, and ARIMA. " +
"Returns anomaly locations, severity scores, and confidence intervals."
Why the good description works:
- Specific about what it does (“analyzes time-series data”)
- Mentions the method (“statistical methods”)
- Lists supported features (“Z-score, moving average, ARIMA”)
- Describes output (“anomaly locations, severity scores, confidence intervals”)
Writing Powerful Examples
Examples are critical - the LLM uses them to recognize when a user request matches your skill.
❌ Weak examples:
Examples: []string{
"analyze data",
"find problems",
}
✅ Strong examples:
Examples: []string{
"Can you detect anomalies in this time series?",
"Find unusual patterns in the sensor data",
"Analyze this dataset for outliers",
"Check if there are any abnormal readings",
"Identify spikes or drops in the data",
"Run anomaly detection on this log file",
"Are there any suspicious values in this series?",
}
Why strong examples work:
- Variety: Different phrasings (“detect anomalies”, “find unusual patterns”, “outliers”)
- Natural language: How users actually ask questions
- Specific: Mention domain terms (“time series”, “sensor data”, “log file”)
- Action-oriented: Clear about what to do
- Multiple formats: Questions and commands
Example Categories to Cover
For each skill, include examples that cover:
- Direct requests: “Translate this text to Spanish”
- Questions: “Can you convert this to French?”
- Implied tasks: “I need this in German”
- Variations: “Spanish translation please”
- With context: “Translate the following paragraph to Japanese: …”
- Different phrasings: “Convert to Spanish”, “Change to Spanish”, “Make it Spanish”
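For a translation skill, covering all six categories might look like this sketch:

Examples: []string{
	"Translate this text to Spanish",                      // direct request
	"Can you convert this to French?",                     // question
	"I need this in German",                               // implied task
	"Spanish translation please",                          // variation
	"Translate the following paragraph to Japanese: ...", // with context
	"Make it Spanish",                                     // different phrasing
},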
Complete Examples
Example 1: Translation Agent
agentCard := &pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "agent_translator",
Description: "Professional-grade language translation service powered by neural machine translation. " +
"Supports 100+ languages with context-aware translation and proper handling of idioms, " +
"technical terms, and cultural nuances.",
Version: "2.1.0",
Capabilities: &pb.AgentCapabilities{
Streaming: false,
PushNotifications: false,
},
Skills: []*pb.AgentSkill{
{
Id: "translate_text",
Name: "Text Translation",
Description: "Translates text between any pair of 100+ supported languages including " +
"English, Spanish, French, German, Chinese, Japanese, Arabic, Russian, and many more. " +
"Preserves formatting, handles idioms, and maintains context. " +
"Supports both short phrases and long documents.",
Tags: []string{
"translation", "language", "nlp", "localization",
"multilingual", "i18n", "communication",
},
Examples: []string{
"Translate this to Spanish",
"Can you convert this text to French?",
"I need this paragraph in Japanese",
"Translate from English to German",
"What does this mean in Chinese?",
"Convert this Spanish text to English",
"Please translate to Portuguese",
"How do you say this in Italian?",
"Russian translation needed",
"Change this to Arabic",
},
InputModes: []string{"text/plain", "text/html"},
OutputModes: []string{"text/plain", "text/html"},
},
{
Id: "detect_language",
Name: "Language Detection",
Description: "Automatically identifies the language of input text with high accuracy. " +
"Can detect 100+ languages and provides confidence scores. " +
"Useful for routing, preprocessing, and automatic translation workflows.",
Tags: []string{"language", "detection", "nlp", "identification"},
Examples: []string{
"What language is this text in?",
"Detect the language",
"Can you identify this language?",
"Which language is this?",
"Tell me what language this is",
},
InputModes: []string{"text/plain"},
OutputModes: []string{"text/plain"},
},
},
}
Example 2: Data Analysis Agent
agentCard := &pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "agent_data_analyst",
Description: "Advanced data analysis and statistical computing agent. Performs exploratory " +
"data analysis, statistical tests, correlation analysis, and generates insights from datasets.",
Version: "1.5.2",
Capabilities: &pb.AgentCapabilities{
Streaming: true, // Can stream large results
PushNotifications: false,
},
Skills: []*pb.AgentSkill{
{
Id: "analyze_dataset",
Name: "Dataset Analysis",
Description: "Performs comprehensive statistical analysis on datasets including " +
"descriptive statistics (mean, median, std dev), distribution analysis, " +
"outlier detection, correlation matrices, and trend identification. " +
"Supports CSV, JSON, and structured data formats.",
Tags: []string{
"data-analysis", "statistics", "analytics", "dataset",
"eda", "exploratory", "insights",
},
Examples: []string{
"Analyze this dataset",
"Can you provide statistics for this data?",
"What are the key insights from this CSV?",
"Run an analysis on this data file",
"Give me a statistical summary",
"Find correlations in this dataset",
"What patterns do you see in this data?",
"Analyze the distribution of these values",
"Calculate descriptive statistics",
"Identify trends in this time series",
},
InputModes: []string{"text/csv", "application/json", "text/plain"},
OutputModes: []string{"application/json", "text/plain", "text/html"},
},
{
Id: "visualize_data",
Name: "Data Visualization",
Description: "Creates charts and graphs from data including line charts, bar charts, " +
"scatter plots, histograms, box plots, and heatmaps. Returns visualization " +
"specifications in various formats.",
Tags: []string{"visualization", "charts", "graphs", "plotting"},
Examples: []string{
"Create a chart from this data",
"Visualize this dataset",
"Make a graph of these values",
"Plot this time series",
"Show me a chart",
"Generate a histogram",
"Can you create a scatter plot?",
},
InputModes: []string{"text/csv", "application/json"},
OutputModes: []string{"image/png", "application/json", "text/html"},
},
},
}
Example 3: Image Processing Agent
agentCard := &pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "agent_image_processor",
Description: "Image processing and computer vision agent with capabilities for transformation, " +
"enhancement, analysis, and object detection. Supports all major image formats.",
Version: "3.0.0",
Skills: []*pb.AgentSkill{
{
Id: "resize_image",
Name: "Image Resizing",
Description: "Resizes images to specified dimensions while maintaining aspect ratio " +
"and quality. Supports various scaling algorithms including bicubic, lanczos, " +
"and nearest neighbor. Can batch process multiple images.",
Tags: []string{"image", "resize", "scale", "transform", "dimensions"},
Examples: []string{
"Resize this image to 800x600",
"Make this image smaller",
"Scale this photo to 50%",
"Can you resize to thumbnail size?",
"Change image dimensions",
"Shrink this image",
"Make it 1920x1080",
},
InputModes: []string{"image/jpeg", "image/png", "image/webp"},
OutputModes: []string{"image/jpeg", "image/png", "image/webp"},
},
{
Id: "detect_objects",
Name: "Object Detection",
Description: "Detects and identifies objects in images using deep learning models. " +
"Can recognize 1000+ object categories including people, animals, vehicles, " +
"furniture, and more. Returns bounding boxes and confidence scores.",
Tags: []string{
"computer-vision", "object-detection", "ai", "recognition",
"detection", "classification",
},
Examples: []string{
"What objects are in this image?",
"Detect objects in this photo",
"What do you see in this picture?",
"Identify items in this image",
"Find all people in this photo",
"Detect cars in this image",
"What's in this picture?",
},
InputModes: []string{"image/jpeg", "image/png"},
OutputModes: []string{"application/json", "text/plain"},
},
},
}
Best Practices Checklist
✅ Description Quality
✅ Skill Design
✅ Examples Coverage
Testing Your AgentCard
1. Manual Testing
Start your agent and check Cortex logs:
grep "Agent skills registered" cortex.log
You should see your skill descriptions displayed.
2. LLM Prompt Testing
Check what the LLM sees by enabling DEBUG logging in Cortex:
LOG_LEVEL=DEBUG ./bin/cortex
Look for prompts that include:
Available agents:
- your_agent: Your agent description
Skills:
* Skill Name: Skill description
3. Integration Testing
Test with actual user requests:
# Start services
./bin/broker &
./bin/cortex &
./bin/your_agent &
# Use chat CLI
./bin/chat_cli
# Try requests that match your examples
> Can you translate this to Spanish?
Watch the logs to see if Cortex delegates to your agent.
Common Mistakes to Avoid
❌ Vague Descriptions
Description: "Does useful things"
Problem: LLM can’t determine if this agent is suitable
❌ Too Few Examples
Examples: []string{"do the thing"}
Problem: LLM won’t recognize variations
❌ Technical Jargon in Examples
Examples: []string{
"Execute POST /api/v1/translate with payload",
}
Problem: Users don’t talk like this
❌ Overly Broad Skills
{
Name: "Do Everything",
Description: "This agent can help with anything",
}
Problem: LLM can’t make good decisions
❌ Missing Context
{
Name: "Process",
Description: "Processes the input",
}
Problem: What kind of processing? What input?
Advanced Topics
Multi-Language Support
Include examples in multiple languages if your agent supports them:
Examples: []string{
"Translate to Spanish",
"Traduire en français",
"Übersetzen Sie nach Deutsch",
"日本語に翻訳",
}
Conditional Capabilities
Use metadata to indicate conditional features:
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"requires_api_key": structpb.NewBoolValue(true),
"max_input_size": structpb.NewNumberValue(10000),
"rate_limit": structpb.NewStringValue("100/minute"),
},
}
Skill Dependencies
Indicate if skills build on each other:
{
Id: "advanced_analysis",
Description: "Advanced statistical analysis. Requires dataset to be preprocessed " +
"using the 'clean_data' skill first.",
}
Iteration and Improvement
Your AgentCard isn’t set in stone. Improve it based on:
- Usage patterns: What requests do users actually make?
- Delegation success: Is Cortex routing appropriate tasks?
- User feedback: Are users getting what they expect?
- LLM behavior: What decisions is the LLM making?
Update your AgentCard and restart your agent to reflect improvements.
Next Steps
Well-designed AgentCards are the key to effective AI orchestration!
2.1 - How to Add Observability to Your Agent
Use AgentHub’s unified abstractions to automatically get distributed tracing, metrics, and structured logging in your agents.
How to Add Observability to Your Agent
Goal-oriented guide: Use AgentHub’s unified abstractions to automatically get distributed tracing, metrics, and structured logging in your agents with minimal configuration.
Prerequisites
- Go 1.24+ installed
- Basic understanding of AgentHub concepts
- 10-15 minutes
Overview: What You Get Automatically
With AgentHub’s unified abstractions, you automatically get:
✅ Distributed Tracing - OpenTelemetry traces with correlation IDs
✅ Comprehensive Metrics - Performance and health monitoring
✅ Structured Logging - JSON logs with trace correlation
✅ Health Endpoints - HTTP health checks and metrics endpoints
✅ Graceful Shutdown - Clean resource management
Quick Start: Observable Agent in 5 Minutes
Step 1: Create Your Agent Using Abstractions
package main
import (
"context"
"time"
"github.com/owulveryck/agenthub/internal/agenthub"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Create configuration (observability included automatically)
config := agenthub.NewGRPCConfig("my-agent")
config.HealthPort = "8083" // Unique port for your agent
// Create AgentHub client (observability built-in)
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
// Automatic graceful shutdown
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
// Start the client (enables observability)
if err := client.Start(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
panic(err)
}
// Your agent logic here...
client.Logger.Info("My observable agent is running!")
// Keep running
select {}
}
That’s it! Your agent now has full observability.
Step 2: Configure the Environment
Set observability configuration via environment variables:
# Tracing configuration
export JAEGER_ENDPOINT="http://localhost:14268/api/traces"
export OTEL_SERVICE_NAME="my-agent"
export OTEL_SERVICE_VERSION="1.0.0"
# Health server port
export BROKER_HEALTH_PORT="8083"
# Broker connection
export AGENTHUB_BROKER_ADDR="localhost"
export AGENTHUB_BROKER_PORT="50051"
Step 3: Run Your Observable Agent
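Build and run the agent as usual; assuming your agent is the main package in the current directory:

go run .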
Expected Output:
time=2025-09-29T10:00:00.000Z level=INFO msg="Starting health server" port=8083
time=2025-09-29T10:00:00.000Z level=INFO msg="AgentHub client connected" broker_addr=localhost:50051
time=2025-09-29T10:00:00.000Z level=INFO msg="My observable agent is running!"
Available Observability Features
Automatic Health Endpoints
Your agent automatically exposes:
- Health check: http://localhost:8083/health
- Metrics: http://localhost:8083/metrics (Prometheus format)
- Readiness: http://localhost:8083/ready
Structured Logging
All logs are automatically structured with trace correlation:
{
"time": "2025-09-29T10:00:00.000Z",
"level": "INFO",
"msg": "Task published",
"trace_id": "abc123...",
"span_id": "def456...",
"task_type": "process_document",
"correlation_id": "req_789"
}
Distributed Tracing
Traces are automatically created for:
- gRPC calls to broker
- Task publishing and subscribing
- Custom operations (when you use the TraceManager)
Metrics Collection
Automatic metrics include:
- Task processing duration
- Success/failure rates
- gRPC call metrics
- Health check status
Advanced Usage
Adding Custom Tracing
Use the built-in TraceManager for custom operations:
// Custom operation with tracing
ctx, span := client.TraceManager.StartPublishSpan(ctx, "my_operation", "document")
defer span.End()
// Add custom attributes
client.TraceManager.AddComponentAttribute(span, "my-component")
span.SetAttributes(attribute.String("document.id", "doc-123"))
// Your operation logic
result, err := doCustomOperation(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
Adding Custom Metrics
Use the MetricsManager for custom metrics:
// Start timing an operation
timer := client.MetricsManager.StartTimer()
defer timer(ctx, "my_operation", "my-component")
// Your operation
processDocument()
Custom Log Fields
Use the structured logger with context:
client.Logger.InfoContext(ctx, "Processing document",
"document_id", "doc-123",
"user_id", "user-456",
"processing_type", "ocr",
)
Publisher Example with Observability
package main
import (
"context"
"time"
"github.com/owulveryck/agenthub/internal/agenthub"
pb "github.com/owulveryck/agenthub/events/a2a"
"google.golang.org/protobuf/types/known/structpb"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Observable client setup
config := agenthub.NewGRPCConfig("publisher")
config.HealthPort = "8081"
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic(err)
}
defer client.Shutdown(context.Background())
if err := client.Start(ctx); err != nil {
panic(err)
}
// Create observable task publisher
publisher := &agenthub.TaskPublisher{
Client: client.Client,
TraceManager: client.TraceManager,
MetricsManager: client.MetricsManager,
Logger: client.Logger,
ComponentName: "publisher",
}
// Publish task with automatic tracing
data, _ := structpb.NewStruct(map[string]interface{}{
"message": "Hello, observable world!",
})
task := &pb.TaskMessage{
TaskId: "task-123",
TaskType: "greeting",
Data: data,
Priority: pb.Priority_PRIORITY_MEDIUM,
}
// Automatically traced and metered
if err := publisher.PublishTask(ctx, task); err != nil {
client.Logger.ErrorContext(ctx, "Failed to publish task", "error", err)
} else {
client.Logger.InfoContext(ctx, "Task published successfully", "task_id", task.TaskId)
}
}
Subscriber Example with Observability
package main
import (
"context"
"os"
"os/signal"
"syscall"
"github.com/owulveryck/agenthub/internal/agenthub"
pb "github.com/owulveryck/agenthub/events/a2a"
"google.golang.org/protobuf/types/known/structpb"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Observable client setup
config := agenthub.NewGRPCConfig("subscriber")
config.HealthPort = "8082"
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic(err)
}
defer client.Shutdown(context.Background())
if err := client.Start(ctx); err != nil {
panic(err)
}
// Create observable task subscriber
subscriber := agenthub.NewTaskSubscriber(client, "my-subscriber")
// Register handler with automatic tracing
subscriber.RegisterTaskHandler("greeting", func(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
// This is automatically traced and logged
client.Logger.InfoContext(ctx, "Processing greeting task", "task_id", task.TaskId)
// Your processing logic
result, _ := structpb.NewStruct(map[string]interface{}{
"response": "Hello back!",
})
return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
})
// Start processing with automatic observability
go func() {
	if err := subscriber.SubscribeToTasks(ctx); err != nil {
		client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
	}
}()
// Graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
}
Configuration Reference
📖 Complete Reference: For all environment variables and configuration options, see Environment Variables Reference
Key Environment Variables
| Variable | Description | Default |
|---|---|---|
| JAEGER_ENDPOINT | Jaeger tracing endpoint | "" (tracing disabled) |
| SERVICE_NAME | Service name for tracing | "agenthub-service" |
| SERVICE_VERSION | Service version | "1.0.0" |
| BROKER_HEALTH_PORT | Health endpoint port | "8080" |
| AGENTHUB_BROKER_ADDR | Broker address | "localhost" |
| AGENTHUB_BROKER_PORT | Broker port | "50051" |
Health Endpoints
Each agent exposes these endpoints:
| Endpoint | Purpose | Response |
|---|---|---|
| /health | Overall health status | JSON status |
| /metrics | Prometheus metrics | Metrics format |
| /ready | Readiness check | 200 OK or 503 |
Troubleshooting
Common Issues
| Issue | Solution |
|---|---|
| No traces in Jaeger | Set the JAEGER_ENDPOINT environment variable |
| Health endpoint not accessible | Check that BROKER_HEALTH_PORT is unique |
| Logs not structured | Use client.Logger, not the standard log package |
| Missing correlation IDs | Pass a context.Context through all operations |
Verification Steps
Check health endpoint:
curl http://localhost:8083/health
Verify metrics:
curl http://localhost:8083/metrics
Check traces in Jaeger:
- Open http://localhost:16686
- Search for your service name
Migration from Manual Setup
If you have existing agents using manual observability setup:
Old Approach (Manual)
// 50+ lines of OpenTelemetry setup
obs, err := observability.NewObservability(config)
traceManager := observability.NewTraceManager(serviceName)
// Manual gRPC client setup
// Manual health server setup
New Approach (Unified)
// 3 lines - everything automatic
config := agenthub.NewGRPCConfig("my-agent")
client, err := agenthub.NewAgentHubClient(config)
client.Start(ctx)
The unified abstractions provide the same observability features with 90% less code and no manual setup required.
With AgentHub’s unified abstractions, observability is no longer an add-on feature but a built-in capability that comes automatically with every agent. Focus on your business logic while the platform handles monitoring, tracing, and health checks for you.
2.2 - How to Use Grafana Dashboards
Master the AgentHub observability dashboards to monitor, analyze, and troubleshoot your event-driven system effectively.
How to Use Grafana Dashboards
Goal-oriented guide: Master the AgentHub observability dashboards to monitor, analyze, and troubleshoot your event-driven system effectively.
Prerequisites
- AgentHub observability stack running (docker-compose up -d)
- AgentHub agents running with observability enabled
- Basic understanding of metrics concepts
- 10-15 minutes
Quick Access
- Grafana Dashboard: http://localhost:3333 (admin/admin)
- Direct Dashboard: http://localhost:3333/d/agenthub-eda-dashboard
Dashboard Overview
The AgentHub EDA System Observatory provides comprehensive monitoring across three main areas:
- Event Metrics (Top Row) - Event processing performance
- Distributed Tracing (Middle) - Request flow visualization
- System Health (Bottom Row) - Infrastructure monitoring
Panel-by-Panel Guide
🚀 Event Processing Rate (Top Left)
What it shows: Events processed per second by each service
How to use:
- Monitor throughput: See how many events your system processes
- Identify bottlenecks: Low rates may indicate performance issues
- Compare services: See which agents are busiest
Reading the chart:
Green line: agenthub-broker (150 events/sec)
Blue line: agenthub-publisher (50 events/sec)
Red line: agenthub-subscriber (145 events/sec)
Troubleshooting:
- Flat lines: No activity - check if agents are running
- Dropping rates: Performance degradation - check CPU/memory
- Spiky patterns: Bursty workloads - consider load balancing
🚨 Event Processing Error Rate (Top Right)
What it shows: Percentage of events that failed processing
How to use:
- Monitor reliability: Should stay below 5% (green zone)
- Alert threshold: Yellow above 5%, red above 10%
- Quick health check: Single glance system reliability
Color coding:
- Green (0-5%): Healthy system
- Yellow (5-10%): Moderate issues
- Red (>10%): Critical problems
Troubleshooting:
- High error rates: Check Jaeger for failing traces
- Sudden spikes: Look for recent deployments or config changes
- Persistent errors: Check logs for recurring issues
📈 Event Types Distribution (Middle Left)
What it shows: Breakdown of event types by volume
How to use:
- Understand workload: See what types of tasks dominate
- Capacity planning: Identify which task types need scaling
- Anomaly detection: Unusual distributions may indicate issues
Example interpretation:
greeting: 40% (blue) - Most common task type
math_calculation: 35% (green) - Heavy computational tasks
random_number: 20% (yellow) - Quick tasks
unknown_task: 5% (red) - Error-generating tasks
Troubleshooting:
- Missing task types: Check if specific agents are down
- Unexpected distributions: May indicate upstream issues
- Dominant error types: Focus optimization efforts
⏱️ Event Processing Latency (Middle Right)
What it shows: Processing time percentiles (p50, p95, p99)
How to use:
- Performance monitoring: Track how fast events are processed
- SLA compliance: Ensure latencies meet requirements
- Outlier detection: p99 shows worst-case scenarios
Understanding percentiles:
- p50 (median): 50% of events process faster than this
- p95: 95% of events process faster than this
- p99: 99% of events process faster than this
Healthy ranges:
- p50: < 50ms (very responsive)
- p95: < 200ms (good performance)
- p99: < 500ms (acceptable outliers)
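If you want to alert on these thresholds or rebuild the panel, the percentiles come from a Prometheus histogram; a query along these lines works, where the metric name event_processing_duration_seconds_bucket is illustrative rather than confirmed by this guide:
# p95 event processing latency over a 5-minute window
histogram_quantile(0.95, sum(rate(event_processing_duration_seconds_bucket[5m])) by (le, service))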
Troubleshooting:
- Rising latencies: Check CPU/memory usage
- High p99: Look for resource contention or long-running tasks
- Flatlined metrics: May indicate measurement issues
🔍 Distributed Traces (Middle Section)
What it shows: Integration with Jaeger for trace visualization
How to use:
- Click “Explore” to open Jaeger
- Select service from dropdown
- Find specific traces to debug issues
- Analyze request flows across services
When to use:
- Debugging errors: Find root cause of failures
- Performance analysis: Identify slow operations
- Understanding flows: See complete request journeys
🖥️ Service CPU Usage (Bottom Left)
What it shows: CPU utilization by service
How to use:
- Capacity monitoring: Ensure services aren’t overloaded
- Resource planning: Identify when to scale
- Performance correlation: High CPU often explains high latency
Healthy ranges:
- < 50%: Comfortable utilization
- 50-70%: Moderate load
- > 70%: Consider scaling
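To reproduce this panel or alert on it, the standard process collector in the Prometheus Go client is usually sufficient; assuming that collector is registered, a query like this yields percent CPU per scrape target:
# CPU utilization as a percentage, per service
rate(process_cpu_seconds_total[5m]) * 100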
💾 Service Memory Usage (Bottom Center)
What it shows: Memory consumption by service
How to use:
- Memory leak detection: Watch for continuously growing usage
- Capacity planning: Ensure sufficient memory allocation
- Garbage collection: High usage may impact performance
Monitoring tips:
- Steady growth: May indicate memory leaks
- Sawtooth pattern: Normal GC behavior
- Sudden spikes: Check for large event batches
🧵 Go Goroutines (Bottom Right)
What it shows: Number of concurrent goroutines per service
How to use:
- Concurrency monitoring: Track parallel processing
- Resource leak detection: Continuously growing numbers indicate leaks
- Performance tuning: Optimize concurrency levels
Normal patterns:
- Stable baseline: Normal operation
- Activity spikes: During high load
- Continuous growth: Potential goroutine leaks
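This panel maps to the go_goroutines gauge that the Prometheus Go client exports by default; the job label value below is illustrative:
# Goroutine count for a single service
go_goroutines{job="agenthub-broker"}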
🏥 Service Health Status (Bottom Far Right)
What it shows: Up/down status of each service
How to use:
- Quick status check: See if all services are running
- Outage detection: Immediately identify down services
- Health monitoring: Green = UP, Red = DOWN
Dashboard Variables and Filters
Service Filter
Location: Top of dashboard
Purpose: Filter metrics by specific services
Usage:
- Select “All” to see everything
- Choose specific services to focus analysis
- Useful for isolating problems to specific components
Event Type Filter
Location: Top of dashboard
Purpose: Filter by event/task types
Usage:
- Analyze specific workflow types
- Debug particular task categories
- Compare performance across task types
Time Range Selector
Location: Top right of dashboard
Purpose: Control time window for analysis
Common ranges:
- 5 minutes: Real-time monitoring
- 1 hour: Recent trend analysis
- 24 hours: Daily pattern analysis
- 7 days: Weekly trend and capacity planning
Advanced Usage Patterns
Daily Monitoring Workflow
Start with Overview:
- Check error rates (should be < 5%)
- Verify processing rates look normal
- Scan for any red/yellow indicators
Drill Down on Issues:
- If high error rates → check distributed traces
- If high latency → examine CPU/memory usage
- If low throughput → check service health
Root Cause Analysis:
- Use time range selector to find when problems started
- Filter by specific services to isolate issues
- Correlate metrics across different panels
Capacity Planning Workflow
Analyze Peak Patterns:
- Set time range to 7 days
- Identify peak usage periods
- Note maximum throughput achieved
Resource Utilization:
- Check CPU usage during peaks
- Monitor memory consumption trends
- Verify goroutine scaling behavior
Plan Scaling:
- If CPU > 70% during peaks, scale up
- If memory continuously growing, investigate leaks
- If error rates spike during load, optimize before scaling
Troubleshooting Workflow
Identify Symptoms:
- High error rates: Focus on traces and logs
- High latency: Check resource utilization
- Low throughput: Verify service health
Time Correlation:
- Use time range to find when issues started
- Look for correlated changes across metrics
- Check for deployment or configuration changes
Service Isolation:
- Use service filter to identify problematic components
- Compare healthy vs unhealthy services
- Check inter-service dependencies
Dashboard Customization
Adding New Panels
- Click “+ Add panel” in top menu
- Choose visualization type:
- Time series for trends
- Stat for current values
- Gauge for thresholds
- Configure query:
# Example: Custom error rate
rate(my_custom_errors_total[5m]) / rate(my_custom_requests_total[5m]) * 100
Creating Alerts
- Edit existing panel or create new one
- Click “Alert” tab
- Configure conditions:
Query: rate(event_errors_total[5m]) / rate(events_processed_total[5m]) * 100
Condition: IS ABOVE 5
Evaluation: Every 1m for 2m
- Set notification channels
Custom Time Ranges
- Click time picker (top right)
- Select “Custom range”
- Set specific dates/times for historical analysis
- Use “Refresh” settings for auto-updating
Troubleshooting Dashboard Issues
Dashboard Not Loading
# Check Grafana status
docker-compose ps grafana
# Check Grafana logs
docker-compose logs grafana
# Restart if needed
docker-compose restart grafana
No Data in Panels
# Check Prometheus connection
curl http://localhost:9090/api/v1/targets
# Verify agents are exposing metrics
curl http://localhost:8080/metrics
curl http://localhost:8081/metrics
curl http://localhost:8082/metrics
# Check Prometheus configuration
docker-compose logs prometheus
Slow Dashboard Performance
- Reduce time range: Use shorter windows for better performance
- Limit service selection: Filter to specific services
- Optimize queries: Use appropriate rate intervals
- Check resource usage: Ensure Prometheus has enough memory
Authentication Issues
- Default credentials: admin/admin
- Reset password: Through Grafana UI after first login
- Lost access: Restart Grafana container to reset
Best Practices
Regular Monitoring
- Check dashboard daily: Quick health overview
- Weekly reviews: Trend analysis and capacity planning
- Set up alerts: Proactive monitoring for critical metrics
Performance Optimization
- Use appropriate time ranges: Don’t query more data than needed
- Filter effectively: Use service and event type filters
- Refresh intervals: Balance real-time needs with performance
Team Usage
- Share dashboard URLs: Bookmark specific views
- Create annotations: Mark deployments and incidents
- Export snapshots: Share findings with team members
Jaeger Integration
- Click Explore in traces panel
- Auto-links to Jaeger with service context
- Correlate traces with metrics timeframes
Prometheus Integration
- Click Explore on any panel
- Edit queries in Prometheus query language
- Access raw metrics for custom analysis
Log Correlation
- Use trace IDs from Jaeger
- Search logs for matching trace IDs
- Correlate log events with metric spikes
🎯 Next Steps:
Deep Debugging: Debug with Distributed Tracing
Production Setup: Configure Alerts
Understanding: Observability Architecture Explained
3 - Agent2Agent Protocol
Learn how to work with Agent2Agent (A2A) protocol components including messages, conversation contexts, artifacts, and task lifecycle management.
Agent2Agent Protocol How-To Guides
This section provides practical guides for working with the Agent2Agent (A2A) protocol in AgentHub. These guides show you how to implement A2A-compliant communication patterns for building robust agent systems.
Available Guides
- A2A Messages: Learn how to create, structure, and process A2A messages with text, data, and file content parts. This is the foundation for all A2A communication.
- Conversation Context: Understand how to manage conversation contexts for multi-turn interactions, workflow coordination, and state preservation across agent communications.
- A2A Artifacts: Master the creation and handling of A2A artifacts - structured outputs that deliver rich results from completed tasks.
- Task Lifecycle: Learn how to manage the complete task lifecycle from creation through completion, including state transitions, progress updates, and error handling.
A2A Protocol Benefits
The Agent2Agent protocol provides:
- Structured Communication: Standardized message formats with rich content types
- Conversation Threading: Context-aware message grouping for complex workflows
- Rich Artifacts: Structured outputs with multiple content types
- Lifecycle Management: Complete task state tracking from submission to completion
- Interoperability: Standards-based communication for multi-vendor agent systems
Prerequisites
Before following these guides:
- Complete the Installation and Setup tutorial
- Run the AgentHub Demo to see A2A in action
- Understand the Agent2Agent Principle
Implementation Approach
These guides use AgentHub’s unified abstractions from internal/agenthub, which provide:
- A2ATaskPublisher: Simplified A2A task creation and publishing
- A2ATaskSubscriber: Streamlined A2A task processing and response generation
- Automatic Observability: Built-in tracing, metrics, and logging
- Environment Configuration: Zero-config setup with environment variables
Start with the A2A Messages guide to learn the fundamentals, then progress through the other guides to build complete A2A-compliant agent systems.
3.1 - How to Work with A2A Messages
Learn how to create, structure, and work with Agent2Agent protocol messages including text, data, and file parts.
How to Work with A2A Messages
This guide shows you how to create and work with Agent2Agent (A2A) protocol messages using AgentHub’s unified abstractions. A2A messages are the foundation of all agent communication.
Understanding A2A Message Structure
A2A messages consist of several key components:
- Message ID: Unique identifier for the message
- Context ID: Groups related messages in a conversation
- Task ID: Links the message to a specific task
- Role: Indicates if the message is from USER (requester) or AGENT (responder)
- Content Parts: The actual message content (text, data, or files)
- Metadata: Additional context for routing and processing
Creating Basic A2A Messages
Text Messages
Create a simple text message:
package main
import (
"context"
"fmt"
"log"
"github.com/google/uuid"
pb "github.com/owulveryck/agenthub/events/a2a"
)
func createTextMessage() *pb.Message {
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: "conversation_greeting",
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Hello! Please process this greeting request.",
},
},
},
Metadata: nil, // Optional
}
}
Data Messages
Include structured data in your message:
import (
"google.golang.org/protobuf/types/known/structpb"
)
func createDataMessage() *pb.Message {
// Create structured data
data, err := structpb.NewStruct(map[string]interface{}{
"operation": "calculate",
"numbers": []float64{10, 20, 30},
"formula": "sum",
"precision": 2,
})
if err != nil {
log.Fatal(err)
}
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: "conversation_math",
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Please perform the calculation described in the data.",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: data,
Description: "Calculation parameters",
},
},
},
},
}
}
File Reference Messages
Reference files in your messages:
func createFileMessage() *pb.Message {
// Create file metadata
fileMetadata, _ := structpb.NewStruct(map[string]interface{}{
"source": "user_upload",
"category": "image",
"permissions": "read-only",
})
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: "conversation_image_analysis",
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Please analyze the uploaded image.",
},
},
{
Part: &pb.Part_File{
File: &pb.FilePart{
FileId: "file_abc123",
Filename: "analysis_target.jpg",
MimeType: "image/jpeg",
SizeBytes: 2048576, // 2MB
Metadata: fileMetadata,
},
},
},
},
}
}
Working with Mixed Content
Combine multiple part types in a single message:
func createMixedContentMessage() *pb.Message {
// Configuration data
config, _ := structpb.NewStruct(map[string]interface{}{
"format": "json",
"output_dir": "/results",
"compress": true,
})
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: "conversation_data_processing",
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Process the dataset with the following configuration and source file.",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: config,
Description: "Processing configuration",
},
},
},
{
Part: &pb.Part_File{
File: &pb.FilePart{
FileId: "dataset_xyz789",
Filename: "raw_data.csv",
MimeType: "text/csv",
SizeBytes: 5242880, // 5MB
},
},
},
},
}
}
Publishing A2A Messages
Use AgentHub’s unified abstractions to publish messages:
package main
import (
"context"
"log"
"github.com/owulveryck/agenthub/internal/agenthub"
pb "github.com/owulveryck/agenthub/events/eventbus"
)
func publishA2AMessage(ctx context.Context) error {
// Create AgentHub client
config := agenthub.NewGRPCConfig("message_publisher")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
return err
}
// Start the client and shut it down cleanly when done
if err := client.Start(ctx); err != nil {
return err
}
defer func() {
if err := client.Shutdown(ctx); err != nil {
log.Printf("shutdown error: %v", err)
}
}()
// Create A2A message
message := createTextMessage()
// Publish using AgentHub client
response, err := client.Client.PublishMessage(ctx, &pb.PublishMessageRequest{
Message: message,
Routing: &pb.AgentEventMetadata{
FromAgentId: "message_publisher",
ToAgentId: "message_processor",
EventType: "a2a.message",
Priority: pb.Priority_PRIORITY_MEDIUM,
},
})
if err != nil {
return err
}
log.Printf("A2A message published: %s", response.GetEventId())
return nil
}
Processing Received A2A Messages
Handle incoming A2A messages in your agent:
func processA2AMessage(ctx context.Context, message *pb.Message) (string, error) {
var response string
// Process each content part
for i, part := range message.GetContent() {
switch content := part.GetPart().(type) {
case *pb.Part_Text:
log.Printf("Text part %d: %s", i, content.Text)
response += fmt.Sprintf("Processed text: %s\n", content.Text)
case *pb.Part_Data:
log.Printf("Data part %d: %s", i, content.Data.GetDescription())
// Process structured data
data := content.Data.GetData()
response += fmt.Sprintf("Processed data: %s\n", content.Data.GetDescription())
// Access specific fields
if operation, ok := data.GetFields()["operation"]; ok {
log.Printf("Operation: %s", operation.GetStringValue())
}
case *pb.Part_File:
log.Printf("File part %d: %s (%s)", i, content.File.GetFilename(), content.File.GetMimeType())
response += fmt.Sprintf("Processed file: %s\n", content.File.GetFilename())
// Handle file processing based on MIME type
switch content.File.GetMimeType() {
case "image/jpeg", "image/png":
// Process image
response += "Image analysis completed\n"
case "text/csv":
// Process CSV data
response += "CSV data parsed\n"
}
}
}
return response, nil
}
Message Role Management
Properly set message roles for A2A compliance:
// User message (requesting work)
func createUserMessage(content string) *pb.Message {
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{Text: content},
},
},
}
}
// Agent response message
func createAgentResponse(contextId, taskId, response string) *pb.Message {
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: contextId,
TaskId: taskId,
Role: pb.Role_AGENT,
Content: []*pb.Part{
{
Part: &pb.Part_Text{Text: response},
},
},
}
}
Message Validation
Validate A2A messages before publishing:
func validateA2AMessage(message *pb.Message) error {
if message.GetMessageId() == "" {
return fmt.Errorf("message_id is required")
}
if message.GetRole() == pb.Role_ROLE_UNSPECIFIED {
return fmt.Errorf("role must be specified (USER or AGENT)")
}
if len(message.GetContent()) == 0 {
return fmt.Errorf("message must have at least one content part")
}
// Validate each part
for i, part := range message.GetContent() {
if part.GetPart() == nil {
return fmt.Errorf("content part %d is empty", i)
}
}
return nil
}
Best Practices
1. Always Use Unique Message IDs
messageID := fmt.Sprintf("msg_%d_%s", time.Now().Unix(), uuid.New().String())
2. Group Related Messages with Context IDs
contextID := fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())
3. Add Descriptions to Data Parts
dataPart := &pb.DataPart{
Data: structData,
Description: "User preferences for recommendation engine",
}
4. Validate Messages Before Publishing
if err := validateA2AMessage(message); err != nil {
return fmt.Errorf("invalid A2A message: %w", err)
}
5. Handle All Part Types in Message Processors
switch content := part.GetPart().(type) {
case *pb.Part_Text:
// Handle text
case *pb.Part_Data:
// Handle structured data
case *pb.Part_File:
// Handle file references
default:
log.Printf("Unknown part type: %T", content)
}
This guide covered the fundamentals of working with A2A messages. Next, learn about A2A Conversation Context to group related messages and maintain conversation state across multiple interactions.
3.2 - How to Work with A2A Conversation Context
Learn how to manage conversation contexts in Agent2Agent protocol for multi-turn interactions and workflow coordination.
How to Work with A2A Conversation Context
This guide shows you how to use A2A conversation contexts to group related messages, maintain state across interactions, and coordinate multi-agent workflows.
Understanding A2A Conversation Context
A2A conversation context is identified by a context_id that groups related messages and tasks. This enables:
- Multi-turn conversations between agents
- Workflow coordination across multiple tasks
- State preservation throughout long-running processes
- Message threading for audit trails
- Context-aware routing based on conversation history
Creating Conversation Contexts
Simple Conversation Context
Start a basic conversation context:
package main
import (
"fmt"
"github.com/google/uuid"
pb "github.com/owulveryck/agenthub/events/a2a"
)
func createConversationContext(workflowType string) string {
return fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())
}
func startConversation() *pb.Message {
contextID := createConversationContext("user_onboarding")
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: contextID,
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Please start the user onboarding process for new user.",
},
},
},
}
}
Workflow-Specific Contexts
Create contexts for different workflow types:
func createWorkflowContexts() map[string]string {
return map[string]string{
"data_analysis": createConversationContext("data_analysis"),
"image_processing": createConversationContext("image_processing"),
"user_support": createConversationContext("user_support"),
"integration_test": createConversationContext("integration_test"),
}
}
Multi-Turn Conversations
Conversation Initiation
Start a conversation with initial context:
import (
"google.golang.org/protobuf/types/known/structpb"
)
func initiateDataAnalysisConversation() *pb.Message {
contextID := createConversationContext("data_analysis")
// Initial conversation metadata
contextMetadata, _ := structpb.NewStruct(map[string]interface{}{
"workflow_type": "data_analysis",
"initiated_by": "user_12345",
"priority": "high",
"expected_steps": []string{"validation", "processing", "analysis", "report"},
"timeout_minutes": 30,
})
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: contextID,
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Please analyze the uploaded dataset and provide insights.",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: contextMetadata,
Description: "Conversation context and workflow parameters",
},
},
},
},
Metadata: contextMetadata,
}
}
Continuing the Conversation
Add follow-up messages to the same context:
func continueConversation(contextID, previousMessageID string) *pb.Message {
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: contextID, // Same context as initial message
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Can you also include trend analysis in the report?",
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"follows_message": structpb.NewStringValue(previousMessageID),
"conversation_turn": structpb.NewNumberValue(2),
},
},
}
}
Agent Responses in Context
Agents respond within the same conversation context:
func createAgentResponse(contextID, requestMessageID, response string) *pb.Message {
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: contextID, // Same context as request
Role: pb.Role_AGENT,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: response,
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"responding_to": structpb.NewStringValue(requestMessageID),
"agent_id": structpb.NewStringValue("data_analysis_agent"),
},
},
}
}
Context-Aware Task Management
Creating Tasks with Context
Link tasks to conversation contexts:
import (
"google.golang.org/protobuf/types/known/timestamppb"
)
func createContextAwareTask(contextID string) *pb.Task {
taskID := fmt.Sprintf("task_%s_%s", "analysis", uuid.New().String())
return &pb.Task{
Id: taskID,
ContextId: contextID, // Link to conversation
Status: &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_SUBMITTED,
Update: &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: contextID,
TaskId: taskID,
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Task submitted for data analysis workflow",
},
},
},
},
Timestamp: timestamppb.Now(),
},
History: []*pb.Message{}, // Will be populated during processing
Artifacts: []*pb.Artifact{}, // Will be populated on completion
}
}
Context-Based Task Querying
Retrieve all tasks for a conversation context:
func getTasksForContext(ctx context.Context, client pb.AgentHubClient, contextID string) ([]*pb.Task, error) {
response, err := client.ListTasks(ctx, &pb.ListTasksRequest{
ContextId: contextID,
Limit: 100,
})
if err != nil {
return nil, err
}
return response.GetTasks(), nil
}
Workflow Coordination
Multi-Agent Workflow with Shared Context
Coordinate multiple agents within a single conversation:
type WorkflowCoordinator struct {
client pb.AgentHubClient
contextID string
logger *log.Logger
}
func (wc *WorkflowCoordinator) ExecuteDataPipeline(ctx context.Context) error {
// Step 1: Data Validation
validationTask := &pb.Task{
Id: fmt.Sprintf("task_validation_%s", uuid.New().String()),
ContextId: wc.contextID,
Status: &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_SUBMITTED,
Update: &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: wc.contextID,
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Validate uploaded dataset for quality and completeness",
},
},
},
},
Timestamp: timestamppb.Now(),
},
}
// Publish validation task
_, err := wc.client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
Task: validationTask,
Routing: &pb.AgentEventMetadata{
FromAgentId: "workflow_coordinator",
ToAgentId: "data_validator",
EventType: "task.validation",
Priority: pb.Priority_PRIORITY_HIGH,
},
})
if err != nil {
return err
}
// Step 2: Data Processing (after validation)
processingTask := &pb.Task{
Id: fmt.Sprintf("task_processing_%s", uuid.New().String()),
ContextId: wc.contextID, // Same context
Status: &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_SUBMITTED,
Update: &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: wc.contextID,
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Process validated dataset and extract features",
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"depends_on": structpb.NewStringValue(validationTask.GetId()),
"workflow_step": structpb.NewNumberValue(2),
},
},
},
Timestamp: timestamppb.Now(),
},
}
// Publish processing task
_, err = wc.client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
Task: processingTask,
Routing: &pb.AgentEventMetadata{
FromAgentId: "workflow_coordinator",
ToAgentId: "data_processor",
EventType: "task.processing",
Priority: pb.Priority_PRIORITY_MEDIUM,
},
})
return err
}
Context State Management
Tracking Conversation State
Maintain state throughout the conversation:
type ConversationState struct {
ContextID string `json:"context_id"`
WorkflowType string `json:"workflow_type"`
CurrentStep int `json:"current_step"`
TotalSteps int `json:"total_steps"`
CompletedTasks []string `json:"completed_tasks"`
PendingTasks []string `json:"pending_tasks"`
Variables map[string]interface{} `json:"variables"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func (cs *ConversationState) ToMetadata() (*structpb.Struct, error) {
data := map[string]interface{}{
"context_id": cs.ContextID,
"workflow_type": cs.WorkflowType,
"current_step": cs.CurrentStep,
"total_steps": cs.TotalSteps,
"completed_tasks": cs.CompletedTasks,
"pending_tasks": cs.PendingTasks,
"variables": cs.Variables,
"updated_at": cs.UpdatedAt.Format(time.RFC3339),
}
return structpb.NewStruct(data)
}
func (cs *ConversationState) UpdateFromMessage(message *pb.Message) {
cs.UpdatedAt = time.Now()
// Extract state updates from message metadata
if metadata := message.GetMetadata(); metadata != nil {
if step, ok := metadata.GetFields()["current_step"]; ok {
cs.CurrentStep = int(step.GetNumberValue())
}
if vars, ok := metadata.GetFields()["variables"]; ok {
if varsStruct := vars.GetStructValue(); varsStruct != nil {
for key, value := range varsStruct.GetFields() {
cs.Variables[key] = value
}
}
}
}
}
State-Aware Message Creation
Include conversation state in messages:
func createStateAwareMessage(contextID string, state *ConversationState, content string) *pb.Message {
stateMetadata, _ := state.ToMetadata()
return &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: contextID,
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: content,
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: stateMetadata,
Description: "Current conversation state",
},
},
},
},
Metadata: stateMetadata,
}
}
Context-Based Routing
Route Messages Based on Context
Use conversation context for intelligent routing:
func routeByContext(contextID string) *pb.AgentEventMetadata {
// Determine routing based on context type
var targetAgent string
var eventType string
if strings.Contains(contextID, "data_analysis") {
targetAgent = "data_analysis_agent"
eventType = "data.analysis"
} else if strings.Contains(contextID, "image_processing") {
targetAgent = "image_processor"
eventType = "image.processing"
} else if strings.Contains(contextID, "user_support") {
targetAgent = "support_agent"
eventType = "support.request"
} else {
targetAgent = "" // Broadcast to all agents
eventType = "general.message"
}
return &pb.AgentEventMetadata{
FromAgentId: "context_router",
ToAgentId: targetAgent,
EventType: eventType,
Subscriptions: []string{eventType},
Priority: pb.Priority_PRIORITY_MEDIUM,
}
}
Subscribe to Context-Specific Events
Agents can subscribe to specific conversation contexts:
func subscribeToContextEvents(ctx context.Context, client pb.AgentHubClient, agentID, contextPattern string) error {
stream, err := client.SubscribeToMessages(ctx, &pb.SubscribeToMessagesRequest{
AgentId: agentID,
ContextPattern: contextPattern, // e.g., "ctx_data_analysis_*"
})
if err != nil {
return err
}
for {
event, err := stream.Recv()
if err != nil {
return err
}
if message := event.GetMessage(); message != nil {
log.Printf("Received context message: %s in context: %s",
message.GetMessageId(), message.GetContextId())
// Process message within context
processContextMessage(ctx, message)
}
}
}
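The loop above hands each message to processContextMessage, which this guide doesn’t define. A minimal sketch, assuming you dispatch on the context-ID naming convention used throughout this guide:
func processContextMessage(ctx context.Context, message *pb.Message) {
	// Hypothetical dispatcher: route on the conversation context prefix.
	switch {
	case strings.HasPrefix(message.GetContextId(), "ctx_data_analysis"):
		log.Printf("handling data analysis message %s", message.GetMessageId())
	case strings.HasPrefix(message.GetContextId(), "ctx_user_support"):
		log.Printf("handling support message %s", message.GetMessageId())
	default:
		log.Printf("handling generic message %s in context %s",
			message.GetMessageId(), message.GetContextId())
	}
}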
Best Practices
1. Use Descriptive Context IDs
contextID := fmt.Sprintf("ctx_%s_%s_%s", workflowType, userID, uuid.New().String())
2. Preserve Context Across All Related Messages
// All messages in the same workflow should use the same context_id
message.ContextId = existingContextID
3. Include Context Metadata for State Tracking
contextMetadata := map[string]interface{}{
"workflow_type": "data_pipeline",
"initiated_by": userID,
"current_step": stepNumber,
"total_steps": totalSteps,
}
4. Use Context for Task Dependencies
taskMetadata := map[string]interface{}{
"context_id": contextID,
"depends_on": previousTaskID,
"workflow_step": stepNumber,
}
5. Handle Context Cleanup
// Set context expiration for long-running workflows
contextMetadata["expires_at"] = time.Now().Add(24 * time.Hour).Format(time.RFC3339)
This guide covered conversation context management in A2A protocol. Next, learn about Working with A2A Artifacts to understand how to create and manage structured outputs from completed tasks.
3.3 - How to Work with A2A Artifacts
Learn how to create, structure, and deliver Agent2Agent protocol artifacts as structured outputs from completed tasks.
How to Work with A2A Artifacts
This guide shows you how to create and work with Agent2Agent (A2A) protocol artifacts, which are structured outputs delivered when tasks are completed. Artifacts provide rich, typed results that can include text reports, data files, structured data, and more.
Understanding A2A Artifacts
A2A artifacts are structured containers for task outputs that include:
- Artifact ID: Unique identifier for the artifact
- Name: Human-readable name for the artifact
- Description: Explanation of what the artifact contains
- Parts: The actual content (text, data, files)
- Metadata: Additional context about the artifact
Artifacts are typically generated when tasks reach TASK_STATE_COMPLETED status.
Creating Basic Artifacts
Text Report Artifacts
Create simple text-based results:
package main
import (
"fmt"
"log"
"time"
"github.com/google/uuid"
pb "github.com/owulveryck/agenthub/events/a2a"
"google.golang.org/protobuf/types/known/structpb"
)
func createTextReportArtifact(taskID, reportContent string) *pb.Artifact {
return &pb.Artifact{
ArtifactId: fmt.Sprintf("artifact_%s_%s", taskID, uuid.New().String()),
Name: "Analysis Report",
Description: "Detailed analysis results and recommendations",
Parts: []*pb.Part{
{
Part: &pb.Part_Text{
Text: reportContent,
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"artifact_type": structpb.NewStringValue("report"),
"format": structpb.NewStringValue("text"),
"task_id": structpb.NewStringValue(taskID),
"generated_at": structpb.NewStringValue(time.Now().Format(time.RFC3339)),
},
},
}
}
Data Analysis Artifacts
Create artifacts with structured analysis results:
func createDataAnalysisArtifact(taskID string, results map[string]interface{}) *pb.Artifact {
// Convert results to structured data
resultsData, err := structpb.NewStruct(results)
if err != nil {
log.Printf("Error creating results data: %v", err)
resultsData = &structpb.Struct{}
}
// Create summary statistics
summary := map[string]interface{}{
"total_records": results["record_count"],
"processing_time": results["duration_ms"],
"success_rate": results["success_percentage"],
"anomalies_found": results["anomaly_count"],
}
summaryData, _ := structpb.NewStruct(summary)
return &pb.Artifact{
ArtifactId: fmt.Sprintf("artifact_analysis_%s", uuid.New().String()),
Name: "Data Analysis Results",
Description: "Complete analysis results with statistics and insights",
Parts: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Data analysis completed successfully. See attached results for detailed findings.",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: resultsData,
Description: "Complete analysis results",
},
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: summaryData,
Description: "Summary statistics",
},
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"artifact_type": structpb.NewStringValue("analysis"),
"analysis_type": structpb.NewStringValue("statistical"),
"data_source": structpb.NewStringValue(results["source"].(string)),
"record_count": structpb.NewNumberValue(results["record_count"].(float64)),
"processing_time": structpb.NewNumberValue(results["duration_ms"].(float64)),
},
},
}
}
File-Based Artifacts
Create artifacts that reference generated files:
func createFileArtifact(taskID, fileID, filename, mimeType string, sizeBytes int64) *pb.Artifact {
// File metadata
fileMetadata, _ := structpb.NewStruct(map[string]interface{}{
"generated_by": "data_processor_v1.2",
"file_version": "1.0",
"encoding": "utf-8",
"compression": "gzip",
"checksum_sha256": "abc123...", // Calculate actual checksum
})
return &pb.Artifact{
ArtifactId: fmt.Sprintf("artifact_file_%s", uuid.New().String()),
Name: "Processed Dataset",
Description: "Cleaned and processed dataset ready for analysis",
Parts: []*pb.Part{
{
Part: &pb.Part_Text{
Text: fmt.Sprintf("Dataset processing completed. Generated file: %s", filename),
},
},
{
Part: &pb.Part_File{
File: &pb.FilePart{
FileId: fileID,
Filename: filename,
MimeType: mimeType,
SizeBytes: sizeBytes,
Metadata: fileMetadata,
},
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"artifact_type": structpb.NewStringValue("file"),
"file_type": structpb.NewStringValue("dataset"),
"original_task": structpb.NewStringValue(taskID),
"processing_stage": structpb.NewStringValue("cleaned"),
},
},
}
}
Complex Multi-Part Artifacts
Complete Analysis Package
Create comprehensive artifacts with multiple content types:
func createCompleteAnalysisArtifact(taskID string, analysisResults map[string]interface{}) *pb.Artifact {
// Executive summary
summary := fmt.Sprintf(`
Analysis Complete: %s
Key Findings:
- Processed %v records
- Found %v anomalies
- Success rate: %v%%
- Processing time: %v ms
Recommendations:
%s
`,
analysisResults["dataset_name"],
analysisResults["record_count"],
analysisResults["anomaly_count"],
analysisResults["success_percentage"],
analysisResults["duration_ms"],
analysisResults["recommendations"],
)
// Detailed results data
detailedResults, _ := structpb.NewStruct(analysisResults)
// Configuration used
configData, _ := structpb.NewStruct(map[string]interface{}{
"algorithm": "statistical_analysis_v2",
"confidence_level": 0.95,
"outlier_threshold": 2.5,
"normalization": "z-score",
})
return &pb.Artifact{
ArtifactId: fmt.Sprintf("artifact_complete_%s", uuid.New().String()),
Name: "Complete Analysis Package",
Description: "Full analysis results including summary, data, configuration, and generated files",
Parts: []*pb.Part{
{
Part: &pb.Part_Text{
Text: summary,
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: detailedResults,
Description: "Detailed analysis results and metrics",
},
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: configData,
Description: "Analysis configuration parameters",
},
},
},
{
Part: &pb.Part_File{
File: &pb.FilePart{
FileId: "results_visualization_123",
Filename: "analysis_charts.png",
MimeType: "image/png",
SizeBytes: 1024000,
},
},
},
{
Part: &pb.Part_File{
File: &pb.FilePart{
FileId: "results_dataset_456",
Filename: "processed_data.csv",
MimeType: "text/csv",
SizeBytes: 5120000,
},
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"artifact_type": structpb.NewStringValue("complete_package"),
"analysis_type": structpb.NewStringValue("comprehensive"),
"includes_files": structpb.NewBoolValue(true),
"includes_data": structpb.NewBoolValue(true),
"includes_summary": structpb.NewBoolValue(true),
"file_count": structpb.NewNumberValue(2),
"total_size_bytes": structpb.NewNumberValue(6144000),
},
},
}
}
Publishing Artifacts
Using A2A Task Completion
Publish artifacts when completing tasks:
import (
"context"
"fmt"
"github.com/google/uuid"
pb "github.com/owulveryck/agenthub/events/a2a"
eventbus "github.com/owulveryck/agenthub/events/eventbus"
"google.golang.org/protobuf/types/known/timestamppb"
)
func completeTaskWithArtifact(ctx context.Context, client eventbus.AgentHubClient, task *pb.Task, artifact *pb.Artifact) error {
// Update task status to completed
task.Status = &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_COMPLETED,
Update: &pb.Message{
MessageId: fmt.Sprintf("msg_completion_%s", uuid.New().String()),
ContextId: task.GetContextId(),
TaskId: task.GetId(),
Role: pb.Role_AGENT,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Task completed successfully. Artifact has been generated.",
},
},
},
},
Timestamp: timestamppb.Now(),
}
// Add artifact to task
task.Artifacts = append(task.Artifacts, artifact)
// Publish task completion
_, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
Task: task,
Routing: &eventbus.AgentEventMetadata{
FromAgentId: "processing_agent",
ToAgentId: "", // Broadcast completion
EventType: "task.completed",
Priority: eventbus.Priority_PRIORITY_MEDIUM,
},
})
if err != nil {
return fmt.Errorf("failed to publish task completion: %w", err)
}
// Separately publish artifact update
return publishArtifactUpdate(ctx, client, task.GetId(), artifact)
}
func publishArtifactUpdate(ctx context.Context, client eventbus.AgentHubClient, taskID string, artifact *pb.Artifact) error {
_, err := client.PublishTaskArtifact(ctx, &eventbus.PublishTaskArtifactRequest{
TaskId: taskID,
Artifact: artifact,
Routing: &eventbus.AgentEventMetadata{
FromAgentId: "processing_agent",
ToAgentId: "", // Broadcast to interested parties
EventType: "artifact.created",
Priority: eventbus.Priority_PRIORITY_LOW,
},
})
return err
}
Using A2A Abstractions
Use AgentHub’s simplified artifact publishing:
func completeTaskWithA2AArtifact(ctx context.Context, subscriber *agenthub.A2ATaskSubscriber, task *pb.Task, artifact *pb.Artifact) error {
return subscriber.CompleteA2ATaskWithArtifact(ctx, task, artifact)
}
Processing Received Artifacts
Artifact Event Handling
Handle incoming artifact notifications:
func handleArtifactEvents(ctx context.Context, client eventbus.AgentHubClient, agentID string) error {
stream, err := client.SubscribeToAgentEvents(ctx, &eventbus.SubscribeToAgentEventsRequest{
AgentId: agentID,
EventTypes: []string{"artifact.created", "task.completed"},
})
if err != nil {
return err
}
for {
event, err := stream.Recv()
if err != nil {
return err
}
switch payload := event.GetPayload().(type) {
case *eventbus.AgentEvent_ArtifactUpdate:
artifactEvent := payload.ArtifactUpdate
log.Printf("Received artifact: %s for task: %s",
artifactEvent.GetArtifact().GetArtifactId(),
artifactEvent.GetTaskId())
// Process the artifact
err := processArtifact(ctx, artifactEvent.GetArtifact())
if err != nil {
log.Printf("Error processing artifact: %v", err)
}
case *eventbus.AgentEvent_Task:
task := payload.Task
if task.GetStatus().GetState() == pb.TaskState_TASK_STATE_COMPLETED {
// Process completed task artifacts
for _, artifact := range task.GetArtifacts() {
err := processArtifact(ctx, artifact)
if err != nil {
log.Printf("Error processing task artifact: %v", err)
}
}
}
}
}
}
Artifact Content Processing
Process different types of artifact content:
func processArtifact(ctx context.Context, artifact *pb.Artifact) error {
log.Printf("Processing artifact: %s - %s", artifact.GetName(), artifact.GetDescription())
for i, part := range artifact.GetParts() {
switch content := part.GetPart().(type) {
case *pb.Part_Text:
log.Printf("Text part %d: Processing text content (%d chars)", i, len(content.Text))
// Process text content
err := processTextArtifact(content.Text)
if err != nil {
return fmt.Errorf("failed to process text part: %w", err)
}
case *pb.Part_Data:
log.Printf("Data part %d: Processing structured data (%s)", i, content.Data.GetDescription())
// Process structured data
err := processDataArtifact(content.Data.GetData())
if err != nil {
return fmt.Errorf("failed to process data part: %w", err)
}
case *pb.Part_File:
log.Printf("File part %d: Processing file %s (%s, %d bytes)",
i, content.File.GetFilename(), content.File.GetMimeType(), content.File.GetSizeBytes())
// Process file reference
err := processFileArtifact(ctx, content.File)
if err != nil {
return fmt.Errorf("failed to process file part: %w", err)
}
}
}
return nil
}
func processTextArtifact(text string) error {
// Extract insights, save to database, etc.
log.Printf("Extracting insights from text artifact...")
return nil
}
func processDataArtifact(data *structpb.Struct) error {
// Parse structured data, update metrics, etc.
log.Printf("Processing structured data artifact...")
// Access specific fields
if recordCount, ok := data.GetFields()["record_count"]; ok {
log.Printf("Records processed: %v", recordCount.GetNumberValue())
}
return nil
}
func processFileArtifact(ctx context.Context, file *pb.FilePart) error {
// Download file, process content, etc.
log.Printf("Processing file artifact: %s", file.GetFileId())
// Handle different file types
switch file.GetMimeType() {
case "text/csv":
return processCSVFile(ctx, file.GetFileId())
case "image/png", "image/jpeg":
return processImageFile(ctx, file.GetFileId())
case "application/json":
return processJSONFile(ctx, file.GetFileId())
default:
log.Printf("Unknown file type: %s", file.GetMimeType())
}
return nil
}
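processFileArtifact delegates to three helpers that aren’t defined in this guide; how they fetch content depends on your file storage backend. Minimal placeholder sketches:
func processCSVFile(ctx context.Context, fileID string) error {
	// Hypothetical: fetch the file from your storage backend by ID and
	// parse it with encoding/csv.
	log.Printf("parsing CSV file %s", fileID)
	return nil
}

func processImageFile(ctx context.Context, fileID string) error {
	// Hypothetical: run image analysis on the fetched bytes.
	log.Printf("analyzing image file %s", fileID)
	return nil
}

func processJSONFile(ctx context.Context, fileID string) error {
	// Hypothetical: decode with encoding/json into your domain types.
	log.Printf("decoding JSON file %s", fileID)
	return nil
}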
Artifact Chaining
Use artifacts from one task as inputs to another:
func chainArtifactProcessing(ctx context.Context, client eventbus.AgentHubClient, inputArtifact *pb.Artifact) error {
// Create a new task using the artifact as input
contextID := fmt.Sprintf("ctx_chained_%s", uuid.New().String())
chainedTask := &pb.Task{
Id: fmt.Sprintf("task_chained_%s", uuid.New().String()),
ContextId: contextID,
Status: &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_SUBMITTED,
Update: &pb.Message{
MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
ContextId: contextID,
Role: pb.Role_USER,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Please process the results from the previous analysis task.",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: &structpb.Struct{
Fields: map[string]*structpb.Value{
"input_artifact_id": structpb.NewStringValue(inputArtifact.GetArtifactId()),
"processing_type": structpb.NewStringValue("enhancement"),
},
},
Description: "Processing parameters with input artifact reference",
},
},
},
},
},
Timestamp: timestamppb.Now(),
},
}
// Publish the chained task
_, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
Task: chainedTask,
Routing: &eventbus.AgentEventMetadata{
FromAgentId: "workflow_coordinator",
ToAgentId: "enhancement_processor",
EventType: "task.chained",
Priority: eventbus.Priority_PRIORITY_MEDIUM,
},
})
return err
}
Best Practices
1. Use Descriptive Artifact Names and Descriptions
artifact := &pb.Artifact{
Name: "Customer Segmentation Analysis Results",
Description: "Complete customer segmentation with demographics, behavior patterns, and actionable insights",
// ...
}
2. Include Rich Metadata
metadata := map[string]interface{}{
"artifact_type": "analysis",
"domain": "customer_analytics",
"data_source": "customer_transactions_2024",
"algorithm": "k_means_clustering",
"confidence": 0.94,
"generated_by": "analytics_engine_v2.1",
"valid_until": time.Now().Add(30*24*time.Hour).Format(time.RFC3339),
}
3. Structure Multi-Part Artifacts Logically
// Order parts from most important to least important
parts := []*pb.Part{
textSummaryPart, // Human-readable summary first
structuredDataPart, // Machine-readable data second
configurationPart, // Configuration details third
fileReferencePart, // File references last
}
4. Validate Artifacts Before Publishing
func validateArtifact(artifact *pb.Artifact) error {
if artifact.GetArtifactId() == "" {
return fmt.Errorf("artifact_id is required")
}
if len(artifact.GetParts()) == 0 {
return fmt.Errorf("artifact must have at least one part")
}
return nil
}
5. Handle Large Artifacts Appropriately
// For large data, use file references instead of inline data
if len(dataBytes) > 1024*1024 { // 1MB threshold
// Save to file storage and reference
fileID := saveToFileStorage(dataBytes)
part = createFileReferencePart(fileID, filename, mimeType)
} else {
// Include data inline
part = createInlineDataPart(data)
}
This guide covered creating and working with A2A artifacts. Next, learn about A2A Task Lifecycle Management to understand how to properly manage task states and coordinate complex workflows.
3.4 - How to Work with A2A Task Lifecycle
Learn how to manage Agent2Agent protocol task states, handle lifecycle transitions, and coordinate complex task workflows.
How to Work with A2A Task Lifecycle
This guide shows you how to manage the complete lifecycle of Agent2Agent (A2A) protocol tasks, from creation through completion. Understanding task states and transitions is essential for building reliable agent workflows.
Understanding A2A Task States
A2A tasks progress through the following states:
- TASK_STATE_SUBMITTED: Task created and submitted for processing
- TASK_STATE_WORKING: Task accepted and currently being processed
- TASK_STATE_COMPLETED: Task finished successfully with results
- TASK_STATE_FAILED: Task failed with error information
- TASK_STATE_CANCELLED: Task cancelled before completion
Each state transition is recorded with a timestamp and status message.
Creating A2A Tasks
Basic Task Creation
Create a new task with initial state:
package main
import (
"fmt"
"time"
"github.com/google/uuid"
pb "github.com/owulveryck/agenthub/events/a2a"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/structpb"
)
func createA2ATask(contextID, taskType string, content []*pb.Part) *pb.Task {
taskID := fmt.Sprintf("task_%s_%s", taskType, uuid.New().String())
messageID := fmt.Sprintf("msg_%s", uuid.New().String())
return &pb.Task{
Id: taskID,
ContextId: contextID,
Status: &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_SUBMITTED,
Update: &pb.Message{
MessageId: messageID,
ContextId: contextID,
TaskId: taskID,
Role: pb.Role_USER,
Content: content,
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"task_type": structpb.NewStringValue(taskType),
"submitted_by": structpb.NewStringValue("user_agent"),
"priority": structpb.NewStringValue("medium"),
},
},
},
Timestamp: timestamppb.Now(),
},
History: []*pb.Message{},
Artifacts: []*pb.Artifact{},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"task_type": structpb.NewStringValue(taskType),
"created_at": structpb.NewStringValue(time.Now().Format(time.RFC3339)),
"expected_duration": structpb.NewStringValue("5m"),
},
},
}
}
Task with Complex Requirements
Create tasks with detailed specifications:
func createComplexAnalysisTask(contextID string) *pb.Task {
// Task configuration
taskConfig, _ := structpb.NewStruct(map[string]interface{}{
"algorithm": "advanced_ml_analysis",
"confidence_level": 0.95,
"max_processing_time": "30m",
"output_formats": []string{"json", "csv", "visualization"},
"quality_threshold": 0.9,
})
// Input data specification
inputSpec, _ := structpb.NewStruct(map[string]interface{}{
"dataset_id": "customer_data_2024",
"required_fields": []string{"customer_id", "transaction_amount", "timestamp"},
"date_range": map[string]string{"start": "2024-01-01", "end": "2024-12-31"},
"preprocessing": true,
})
content := []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Perform comprehensive customer behavior analysis on the specified dataset with advanced ML algorithms.",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: taskConfig,
Description: "Analysis configuration parameters",
},
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: inputSpec,
Description: "Input dataset specification",
},
},
},
}
task := createA2ATask(contextID, "customer_analysis", content)
// Add complex task metadata
task.Metadata = &structpb.Struct{
Fields: map[string]*structpb.Value{
"task_type": structpb.NewStringValue("customer_analysis"),
"complexity": structpb.NewStringValue("high"),
"estimated_duration": structpb.NewStringValue("30m"),
"required_resources": structpb.NewListValue(&structpb.ListValue{
Values: []*structpb.Value{
structpb.NewStringValue("gpu_compute"),
structpb.NewStringValue("large_memory"),
},
}),
"deliverables": structpb.NewListValue(&structpb.ListValue{
Values: []*structpb.Value{
structpb.NewStringValue("analysis_report"),
structpb.NewStringValue("customer_segments"),
structpb.NewStringValue("predictions"),
},
}),
},
}
return task
}
Task State Transitions
Accepting a Task (SUBMITTED → WORKING)
When an agent accepts a task:
func acceptTask(task *pb.Task, agentID string) *pb.Task {
// Create acceptance message
acceptanceMessage := &pb.Message{
MessageId: fmt.Sprintf("msg_accept_%s", uuid.New().String()),
ContextId: task.GetContextId(),
TaskId: task.GetId(),
Role: pb.Role_AGENT,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: fmt.Sprintf("Task accepted by agent %s. Beginning processing.", agentID),
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"accepting_agent": structpb.NewStringValue(agentID),
"estimated_completion": structpb.NewStringValue(
time.Now().Add(15*time.Minute).Format(time.RFC3339),
),
},
},
}
// Update task status
task.Status = &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_WORKING,
Update: acceptanceMessage,
Timestamp: timestamppb.Now(),
}
// Add to history
task.History = append(task.History, acceptanceMessage)
return task
}
Progress Updates (WORKING → WORKING)
Send progress updates during processing:
func sendProgressUpdate(task *pb.Task, progressPercentage int, currentPhase, details string) *pb.Task {
// Create progress data
progressData, _ := structpb.NewStruct(map[string]interface{}{
"progress_percentage": progressPercentage,
"current_phase": currentPhase,
"details": details,
"estimated_remaining": calculateRemainingTime(progressPercentage),
"memory_usage_mb": getCurrentMemoryUsage(),
"cpu_usage_percent": getCurrentCPUUsage(),
})
progressMessage := &pb.Message{
MessageId: fmt.Sprintf("msg_progress_%s_%d", uuid.New().String(), progressPercentage),
ContextId: task.GetContextId(),
TaskId: task.GetId(),
Role: pb.Role_AGENT,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: fmt.Sprintf("Progress update: %d%% complete. Current phase: %s",
progressPercentage, currentPhase),
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: progressData,
Description: "Detailed progress information",
},
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"update_type": structpb.NewStringValue("progress"),
"progress_percentage": structpb.NewNumberValue(float64(progressPercentage)),
"phase": structpb.NewStringValue(currentPhase),
},
},
}
// Update task status (still WORKING, but with new message)
task.Status = &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_WORKING,
Update: progressMessage,
Timestamp: timestamppb.Now(),
}
// Add to history
task.History = append(task.History, progressMessage)
return task
}
func calculateRemainingTime(progressPercentage int) string {
if progressPercentage <= 0 {
return "unknown"
}
// Simplified estimation logic
remainingMinutes := (100 - progressPercentage) * 15 / 100
return fmt.Sprintf("%dm", remainingMinutes)
}
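sendProgressUpdate also calls two resource helpers that aren’t shown above. A sketch, assuming heap allocation is a good-enough memory proxy (add "runtime" to the imports); note that Go’s standard library has no direct per-process CPU gauge:
func getCurrentMemoryUsage() float64 {
	// Current heap allocation in megabytes, via the Go runtime.
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return float64(m.Alloc) / (1024 * 1024)
}

func getCurrentCPUUsage() float64 {
	// Placeholder: sample /proc/self/stat or use a library such as
	// gopsutil for a real measurement.
	return 0
}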
Completing a Task (WORKING → COMPLETED)
Complete a task with results:
func completeTask(task *pb.Task, results string, artifacts []*pb.Artifact) *pb.Task {
// Create completion message
completionMessage := &pb.Message{
MessageId: fmt.Sprintf("msg_complete_%s", uuid.New().String()),
ContextId: task.GetContextId(),
TaskId: task.GetId(),
Role: pb.Role_AGENT,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: fmt.Sprintf("Task completed successfully. %s", results),
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"completion_status": structpb.NewStringValue("success"),
"processing_time": structpb.NewStringValue(
time.Since(getTaskStartTime(task)).String(),
),
"artifact_count": structpb.NewNumberValue(float64(len(artifacts))),
},
},
}
// Update task status
task.Status = &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_COMPLETED,
Update: completionMessage,
Timestamp: timestamppb.Now(),
}
// Add completion message to history
task.History = append(task.History, completionMessage)
// Add artifacts
task.Artifacts = append(task.Artifacts, artifacts...)
return task
}
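completeTask measures processing time via getTaskStartTime, which isn’t defined above. One workable sketch, relying on the created_at metadata that createA2ATask writes:
func getTaskStartTime(task *pb.Task) time.Time {
	// createA2ATask records created_at (RFC3339) in task metadata; recover it.
	if md := task.GetMetadata(); md != nil {
		if v, ok := md.GetFields()["created_at"]; ok {
			if t, err := time.Parse(time.RFC3339, v.GetStringValue()); err == nil {
				return t
			}
		}
	}
	// Fall back to the latest status timestamp.
	return task.GetStatus().GetTimestamp().AsTime()
}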
Handling Task Failures (WORKING → FAILED)
Handle task failures with detailed error information:
func failTask(task *pb.Task, errorMessage, errorCode string, errorDetails map[string]interface{}) *pb.Task {
// Create error data
errorData, _ := structpb.NewStruct(map[string]interface{}{
"error_code": errorCode,
"error_message": errorMessage,
"error_details": errorDetails,
"failure_phase": getCurrentProcessingPhase(task),
"retry_possible": determineRetryPossibility(errorCode),
"diagnostic_info": map[string]interface{}{
"memory_at_failure": getCurrentMemoryUsage(),
"cpu_at_failure": getCurrentCPUUsage(),
"logs_reference": getLogReference(),
},
})
failureMessage := &pb.Message{
MessageId: fmt.Sprintf("msg_failure_%s", uuid.New().String()),
ContextId: task.GetContextId(),
TaskId: task.GetId(),
Role: pb.Role_AGENT,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: fmt.Sprintf("Task failed: %s (Code: %s)", errorMessage, errorCode),
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: errorData,
Description: "Detailed error information and diagnostics",
},
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"failure_type": structpb.NewStringValue("processing_error"),
"error_code": structpb.NewStringValue(errorCode),
"retry_possible": structpb.NewBoolValue(determineRetryPossibility(errorCode)),
},
},
}
// Update task status
task.Status = &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_FAILED,
Update: failureMessage,
Timestamp: timestamppb.Now(),
}
// Add failure message to history
task.History = append(task.History, failureMessage)
return task
}
func determineRetryPossibility(errorCode string) bool {
// Determine if the error is retryable
retryableErrors := []string{
"TEMPORARY_RESOURCE_UNAVAILABLE",
"NETWORK_TIMEOUT",
"RATE_LIMIT_EXCEEDED",
}
for _, retryable := range retryableErrors {
if errorCode == retryable {
return true
}
}
return false
}
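failTask references getCurrentProcessingPhase and getLogReference, which this guide leaves undefined. Hedged sketches, with the log-reference format being purely hypothetical:
func getCurrentProcessingPhase(task *pb.Task) string {
	// Walk the history backwards for the last "phase" value,
	// which sendProgressUpdate records in message metadata.
	history := task.GetHistory()
	for i := len(history) - 1; i >= 0; i-- {
		if md := history[i].GetMetadata(); md != nil {
			if phase, ok := md.GetFields()["phase"]; ok {
				return phase.GetStringValue()
			}
		}
	}
	return "unknown"
}

func getLogReference() string {
	// Hypothetical: return an identifier your logging backend can resolve.
	return fmt.Sprintf("logs/task-failures/%d", time.Now().UnixNano())
}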
Cancelling Tasks (ANY → CANCELLED)
Handle task cancellation:
func cancelTask(task *pb.Task, reason, cancelledBy string) *pb.Task {
cancellationMessage := &pb.Message{
MessageId: fmt.Sprintf("msg_cancel_%s", uuid.New().String()),
ContextId: task.GetContextId(),
TaskId: task.GetId(),
Role: pb.Role_AGENT,
Content: []*pb.Part{
{
Part: &pb.Part_Text{
Text: fmt.Sprintf("Task cancelled: %s", reason),
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"cancellation_reason": structpb.NewStringValue(reason),
"cancelled_by": structpb.NewStringValue(cancelledBy),
"previous_state": structpb.NewStringValue(task.GetStatus().GetState().String()),
},
},
}
// Update task status
task.Status = &pb.TaskStatus{
State: pb.TaskState_TASK_STATE_CANCELLED,
Update: cancellationMessage,
Timestamp: timestamppb.Now(),
}
// Add cancellation message to history
task.History = append(task.History, cancellationMessage)
return task
}
Publishing Task Updates
Using AgentHub Client
Publish task updates through the AgentHub broker:
import (
"context"
"fmt"
eventbus "github.com/owulveryck/agenthub/events/eventbus"
)
func publishTaskUpdate(ctx context.Context, client eventbus.AgentHubClient, task *pb.Task, fromAgent, toAgent string) error {
_, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
Task: task,
Routing: &eventbus.AgentEventMetadata{
FromAgentId: fromAgent,
ToAgentId: toAgent,
EventType: fmt.Sprintf("task.%s", task.GetStatus().GetState().String()),
Priority: getPriorityFromTaskState(task.GetStatus().GetState()),
},
})
return err
}
func getPriorityFromTaskState(state pb.TaskState) eventbus.Priority {
switch state {
case pb.TaskState_TASK_STATE_FAILED:
return eventbus.Priority_PRIORITY_HIGH
case pb.TaskState_TASK_STATE_COMPLETED:
return eventbus.Priority_PRIORITY_MEDIUM
case pb.TaskState_TASK_STATE_WORKING:
return eventbus.Priority_PRIORITY_LOW
default:
return eventbus.Priority_PRIORITY_MEDIUM
}
}
Using A2A Abstractions
Use simplified A2A task management:
import (
"github.com/owulveryck/agenthub/internal/agenthub"
)
func manageTaskWithA2A(ctx context.Context, subscriber *agenthub.A2ATaskSubscriber, task *pb.Task) error {
// Process the task (processTaskContent is your own business logic)
artifact, status, errorMsg := processTaskContent(ctx, task)
switch status {
case pb.TaskState_TASK_STATE_COMPLETED:
return subscriber.CompleteA2ATaskWithArtifact(ctx, task, artifact)
case pb.TaskState_TASK_STATE_FAILED:
return subscriber.FailA2ATask(ctx, task, errorMsg)
default:
return subscriber.UpdateA2ATaskProgress(ctx, task, 50, "Processing data", "Halfway complete")
}
}
Task Monitoring and Querying
Get Task Status
Query task status and history:
func getTaskStatus(ctx context.Context, client eventbus.AgentHubClient, taskID string) (*pb.Task, error) {
task, err := client.GetTask(ctx, &eventbus.GetTaskRequest{
TaskId: taskID,
HistoryLength: 10, // Get last 10 messages
})
if err != nil {
return nil, err
}
// Log current status
log.Printf("Task %s status: %s", taskID, task.GetStatus().GetState().String())
// Guard against an empty update before indexing into the content parts
if parts := task.GetStatus().GetUpdate().GetContent(); len(parts) > 0 {
log.Printf("Last update: %s", parts[0].GetText())
}
log.Printf("History length: %d messages", len(task.GetHistory()))
log.Printf("Artifacts: %d", len(task.GetArtifacts()))
return task, nil
}
List Tasks by Context
Get all tasks for a conversation context:
func getTasksInContext(ctx context.Context, client eventbus.AgentHubClient, contextID string) ([]*pb.Task, error) {
response, err := client.ListTasks(ctx, &eventbus.ListTasksRequest{
ContextId: contextID,
States: []pb.TaskState{}, // All states
Limit: 100,
})
if err != nil {
return nil, err
}
tasks := response.GetTasks()
log.Printf("Found %d tasks in context %s", len(tasks), contextID)
// Analyze task distribution
stateCount := make(map[pb.TaskState]int)
for _, task := range tasks {
stateCount[task.GetStatus().GetState()]++
}
for state, count := range stateCount {
log.Printf(" %s: %d tasks", state.String(), count)
}
return tasks, nil
}
Workflow Coordination
Sequential Task Workflow
Create dependent tasks that execute in sequence:
type TaskWorkflow struct {
ContextID string
Tasks []*pb.Task
Current int
}
func (tw *TaskWorkflow) ExecuteNext(ctx context.Context, client eventbus.AgentHubClient) error {
if tw.Current >= len(tw.Tasks) {
return fmt.Errorf("workflow completed")
}
currentTask := tw.Tasks[tw.Current]
// Add dependency metadata if not first task
if tw.Current > 0 {
previousTask := tw.Tasks[tw.Current-1]
dependencyMetadata := map[string]interface{}{
"depends_on": previousTask.GetId(),
"workflow_step": tw.Current + 1,
"total_steps": len(tw.Tasks),
}
metadata, _ := structpb.NewStruct(dependencyMetadata)
currentTask.Metadata = metadata
}
// Publish the task
err := publishTaskUpdate(ctx, client, currentTask, "workflow_coordinator", "")
if err != nil {
return err
}
tw.Current++
return nil
}
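For illustration, here is a sketch of driving a two-step workflow; taskA and taskB are hypothetical pre-built *pb.Task values:
// Hypothetical usage: execute a two-step workflow.
workflow := &TaskWorkflow{
	ContextID: "ctx_report_pipeline",
	Tasks:     []*pb.Task{taskA, taskB},
}
for {
	if err := workflow.ExecuteNext(ctx, client); err != nil {
		break // "workflow completed" or a publish error
	}
	// In practice, wait for the current task to reach COMPLETED
	// (e.g. by polling GetTask) before publishing the next one.
}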
Parallel Task Execution
Execute multiple tasks concurrently:
func executeParallelTasks(ctx context.Context, client eventbus.AgentHubClient, tasks []*pb.Task) error {
var wg sync.WaitGroup
errors := make(chan error, len(tasks))
// Generate one batch ID shared by every task in this batch
batchID := uuid.New().String()
for _, task := range tasks {
wg.Add(1)
go func(t *pb.Task) {
defer wg.Done()
// Add parallel execution metadata
t.Metadata = &structpb.Struct{
Fields: map[string]*structpb.Value{
"execution_mode": structpb.NewStringValue("parallel"),
"batch_id": structpb.NewStringValue(batchID),
"batch_size": structpb.NewNumberValue(float64(len(tasks))),
},
}
err := publishTaskUpdate(ctx, client, t, "parallel_coordinator", "")
if err != nil {
errors <- err
}
}(task)
}
wg.Wait()
close(errors)
// Check for errors
for err := range errors {
if err != nil {
return err
}
}
return nil
}
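If you would rather fail fast on the first error instead of collecting all of them, golang.org/x/sync/errgroup gives a more compact equivalent. This is an alternative sketch, not part of the AgentHub API:
import "golang.org/x/sync/errgroup"

func executeParallelTasksGroup(ctx context.Context, client eventbus.AgentHubClient, tasks []*pb.Task) error {
	g, gctx := errgroup.WithContext(ctx)
	for _, task := range tasks {
		t := task // capture the loop variable (required before Go 1.22)
		g.Go(func() error {
			return publishTaskUpdate(gctx, client, t, "parallel_coordinator", "")
		})
	}
	// Wait returns the first non-nil error from any goroutine
	return g.Wait()
}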
Best Practices
1. Always Update Task Status
// Update status for every significant state change
task = acceptTask(task, agentID)
publishTaskUpdate(ctx, client, task, agentID, "")
2. Provide Meaningful Progress Updates
// Send regular progress updates during long-running tasks
for progress := 10; progress <= 90; progress += 10 {
task = sendProgressUpdate(task, progress, currentPhase, details)
publishTaskUpdate(ctx, client, task, agentID, "")
time.Sleep(processingInterval)
}
3. Include Detailed Error Information
errorDetails := map[string]interface{}{
"input_validation_errors": validationErrors,
"system_resources": resourceSnapshot,
"retry_strategy": "exponential_backoff",
}
task = failTask(task, "Data validation failed", "INVALID_INPUT", errorDetails)
4. Maintain Complete Message History
// Always append to history, never replace
task.History = append(task.History, statusMessage)
// Include context for debugging and monitoring
metadata := map[string]interface{}{
"processing_node": hostname,
"resource_usage": resourceMetrics,
"performance_metrics": performanceData,
}
This guide covered the complete A2A task lifecycle. You now have the tools to create, manage, and coordinate complex task workflows with proper state management and comprehensive observability.
4.1 - How to Debug Agent Issues
Practical steps for troubleshooting common issues when developing and deploying agents with AgentHub.
How to Debug Agent Issues
This guide provides practical steps for troubleshooting common issues when developing and deploying agents with AgentHub.
Common Connection Issues
Problem: Agent Can’t Connect to Broker
Symptoms:
Failed to connect: connection refused
Solutions:
Check if broker is running:
# Check if broker process is running
ps aux | grep broker
# Check if port 50051 is listening
netstat -tlnp | grep 50051
# or
lsof -i :50051
Verify broker address and configuration:
// Using unified abstraction - configuration via environment or code
config := agenthub.NewGRPCConfig("subscriber")
config.BrokerAddr = "localhost" // Default
config.BrokerPort = "50051" // Default
// Or set via environment variables:
// export AGENTHUB_BROKER_ADDR="localhost"
// export AGENTHUB_BROKER_PORT="50051"
Check firewall settings:
# On Linux, check if port is blocked
sudo ufw status
# Allow port if needed
sudo ufw allow 50051
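You can also probe the broker from Go before starting an agent. A minimal sketch, assuming a plaintext (non-TLS) endpoint at localhost:50051:
import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func probeBroker(addr string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// WithBlock makes Dial wait until the connection is up or the context expires
	conn, err := grpc.DialContext(ctx, addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock())
	if err != nil {
		return err
	}
	return conn.Close()
}
Calling probeBroker("localhost:50051") distinguishes a network problem from a task-routing one.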
Problem: TLS/SSL Errors
Symptoms:
transport: authentication handshake failed
Solution:
The unified abstraction handles TLS configuration automatically:
// TLS and connection management handled automatically
config := agenthub.NewGRPCConfig("subscriber")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic(err)
}
Task Processing Issues
Problem: Agent Not Receiving Tasks
Debug Steps:
Check subscription logs:
log.Printf("Agent %s subscribing to tasks...", agentID)
// Should see: "Successfully subscribed to tasks for agent {agentID}"
Verify agent ID matching:
// In publisher
ResponderAgentId: "my_processing_agent"
// In subscriber (must match exactly)
const agentID = "my_processing_agent"
Check task type filtering:
req := &pb.SubscribeToTasksRequest{
AgentId: agentID,
TaskTypes: []string{"math_calculation"}, // Remove to receive all types
}
Monitor broker logs:
# Broker should show:
Received task request: task_xyz (type: math) from agent: publisher_agent
# And either:
No subscribers for task from agent 'publisher_agent' # Bad - no matching agents
# Or task routing to subscribers # Good - task delivered
Problem: Tasks Timing Out
Debug Steps:
Check task processing time:
func processTask(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
start := time.Now()
defer func() {
log.Printf("Task %s took %v to process", task.GetTaskId(), time.Since(start))
}()
// Your processing logic
}
Add timeout handling:
func processTaskWithTimeout(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
// Create timeout context
taskCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// Run the work in a goroutine so it can race against the deadline;
// a bare select with a default case would return immediately
done := make(chan struct{})
go func() {
defer close(done)
// Your processing logic here
}()
select {
case <-taskCtx.Done():
if taskCtx.Err() == context.DeadlineExceeded {
sendResult(ctx, task, nil, pb.TaskStatus_TASK_STATUS_FAILED, "Task timeout", client)
}
case <-done:
// Finished before the deadline
}
}
Monitor progress updates:
// Send progress every few seconds; select on ctx.Done() so the goroutine
// exits when the task finishes (ticker.Stop() does not close the channel)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
go func() {
progress := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
progress += 10
if progress > 100 {
return
}
sendProgress(ctx, task, int32(progress), "Still processing...", client)
}
}
}()
Message Serialization Issues
Problem: Parameter Marshaling Errors
Symptoms:
Error creating parameters struct: proto: invalid value type
Solution:
Ensure all parameter values are compatible with structpb:
// Bad - channels, functions, complex types not supported
params := map[string]interface{}{
"callback": func() {}, // Not supported
"channel": make(chan int), // Not supported
}
// Good - basic types only
params := map[string]interface{}{
"name": "value", // string
"count": 42, // number
"enabled": true, // boolean
"items": []string{"a", "b"}, // array
"config": map[string]interface{}{ // nested object
"timeout": 30,
},
}
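To catch unsupported types early instead of ignoring the conversion error, wrap the call. A small helper sketch (buildParams is not part of the AgentHub API):
import (
	"fmt"

	"google.golang.org/protobuf/types/known/structpb"
)

func buildParams(params map[string]interface{}) (*structpb.Struct, error) {
	s, err := structpb.NewStruct(params)
	if err != nil {
		// Typically caused by unsupported value types (channels, funcs, custom structs)
		return nil, fmt.Errorf("invalid task parameters: %w", err)
	}
	return s, nil
}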
Problem: Result Unmarshaling Issues
Debug Steps:
Check result structure:
func handleTaskResult(result *pb.TaskResult) {
log.Printf("Raw result: %+v", result.GetResult())
resultMap := result.GetResult().AsMap()
log.Printf("Result as map: %+v", resultMap)
// Type assert carefully
if value, ok := resultMap["count"].(float64); ok {
log.Printf("Count: %f", value)
} else {
log.Printf("Count field missing or wrong type: %T", resultMap["count"])
}
}
Handle type conversion safely:
func getStringField(m map[string]interface{}, key string) (string, error) {
if val, ok := m[key]; ok {
if str, ok := val.(string); ok {
return str, nil
}
return "", fmt.Errorf("field %s is not a string: %T", key, val)
}
return "", fmt.Errorf("field %s not found", key)
}
func getNumberField(m map[string]interface{}, key string) (float64, error) {
if val, ok := m[key]; ok {
if num, ok := val.(float64); ok {
return num, nil
}
return 0, fmt.Errorf("field %s is not a number: %T", key, val)
}
return 0, fmt.Errorf("field %s not found", key)
}
Stream and Connection Issues
Problem: Stream Disconnections
Symptoms:
Error receiving task: rpc error: code = Unavailable desc = connection error
Solutions:
Implement retry logic:
func subscribeToTasksWithRetry(ctx context.Context, client pb.EventBusClient) {
for {
err := subscribeToTasks(ctx, client)
if err != nil {
log.Printf("Subscription error: %v, retrying in 5 seconds...", err)
time.Sleep(5 * time.Second)
continue
}
break
}
}
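A fixed 5-second retry can hammer a recovering broker. Here is the same loop with capped exponential backoff, a sketch under the same assumptions as above:
func subscribeWithBackoff(ctx context.Context, client pb.EventBusClient) {
	backoff := time.Second
	const maxBackoff = 60 * time.Second
	for {
		err := subscribeToTasks(ctx, client)
		if err == nil {
			return
		}
		log.Printf("Subscription error: %v, retrying in %v", err, backoff)
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		// Double the delay after each failure, up to the cap
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}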
Handle context cancellation:
for {
task, err := stream.Recv()
if err == io.EOF {
log.Printf("Stream closed by server")
return
}
if err != nil {
if ctx.Err() != nil {
log.Printf("Context cancelled: %v", ctx.Err())
return
}
log.Printf("Stream error: %v", err)
return
}
// Process task
}
Problem: Memory Leaks in Long-Running Agents
Debug Steps:
Monitor memory usage:
# Check memory usage
ps -o pid,ppid,cmd,%mem,%cpu -p $(pgrep -f "your-agent")
# Continuous monitoring
watch -n 5 'ps -o pid,ppid,cmd,%mem,%cpu -p $(pgrep -f "your-agent")'
Profile memory usage:
import _ "net/http/pprof"
import "net/http"
func main() {
// Start pprof server
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Your agent code
}
Access profiles at http://localhost:6060/debug/pprof/
Check for goroutine leaks:
import "runtime"
func logGoroutines() {
ticker := time.NewTicker(30 * time.Second)
go func() {
for range ticker.C {
log.Printf("Goroutines: %d", runtime.NumGoroutine())
}
}()
}
Problem: Slow Task Processing
Debug Steps:
Add timing measurements:
func processTask(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
timings := make(map[string]time.Duration)
start := time.Now()
// Phase 1: Parameter validation
// ... your validation here ...
timings["validation"] = time.Since(start)
last := time.Now()
// Phase 2: Business logic
// ... your logic here ...
timings["processing"] = time.Since(last)
last = time.Now()
// Phase 3: Result formatting
// ... result creation ...
timings["formatting"] = time.Since(last)
log.Printf("Task %s timings: %+v", task.GetTaskId(), timings)
}
Profile CPU usage:
import "runtime/pprof"
import "os"
func startCPUProfile() func() {
f, err := os.Create("cpu.prof")
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
return func() {
pprof.StopCPUProfile()
f.Close()
}
}
func main() {
stop := startCPUProfile()
defer stop()
// Your agent code
}
Monitor queue sizes:
type Agent struct {
taskQueue chan *pb.TaskMessage
}
func (a *Agent) logQueueSize() {
ticker := time.NewTicker(10 * time.Second)
go func() {
for range ticker.C {
log.Printf("Task queue size: %d/%d", len(a.taskQueue), cap(a.taskQueue))
}
}()
}
General Debugging Techniques
1. Enable Verbose Logging
import "log"
import "os"
func init() {
// Enable verbose logging
log.SetFlags(log.LstdFlags | log.Lshortfile)
// Route log output to stdout when DEBUG is enabled
if os.Getenv("DEBUG") == "true" {
log.SetOutput(os.Stdout)
}
}
2. Add Structured Logging
import "encoding/json"
import "time"
type LogEntry struct {
Timestamp string `json:"timestamp"`
Level string `json:"level"`
AgentID string `json:"agent_id"`
TaskID string `json:"task_id,omitempty"`
Message string `json:"message"`
Data map[string]interface{} `json:"data,omitempty"`
}
func logInfo(agentID, taskID, message string, data map[string]interface{}) {
entry := LogEntry{
Timestamp: time.Now().Format(time.RFC3339),
Level: "INFO",
AgentID: agentID,
TaskID: taskID,
Message: message,
Data: data,
}
if jsonData, err := json.Marshal(entry); err == nil {
log.Println(string(jsonData))
}
}
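A hypothetical call site:
logInfo("my_processing_agent", "task_123", "Task completed", map[string]interface{}{
	"duration_ms": 125,
	"result_size": 2048,
})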
3. Health Check Endpoint
import "net/http"
import "encoding/json"
type HealthStatus struct {
Status string `json:"status"`
AgentID string `json:"agent_id"`
Uptime string `json:"uptime"`
TasksProcessed int64 `json:"tasks_processed"`
LastTaskTime time.Time `json:"last_task_time"`
}
func startHealthServer(agent *Agent) {
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
status := HealthStatus{
Status: "healthy",
AgentID: agent.ID,
Uptime: time.Since(agent.StartTime).String(),
TasksProcessed: agent.TasksProcessed,
LastTaskTime: agent.LastTaskTime,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(status)
})
log.Printf("Health server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
4. Task Tracing
import "context"
type TraceID string
// Use an unexported typed key instead of a bare string to avoid
// context key collisions (and go vet warnings)
type traceIDKey struct{}
func withTraceID(ctx context.Context) context.Context {
traceID := TraceID(fmt.Sprintf("trace-%d", time.Now().UnixNano()))
return context.WithValue(ctx, traceIDKey{}, traceID)
}
func getTraceID(ctx context.Context) TraceID {
if traceID, ok := ctx.Value(traceIDKey{}).(TraceID); ok {
return traceID
}
return ""
}
func processTaskWithTracing(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
ctx = withTraceID(ctx)
traceID := getTraceID(ctx)
log.Printf("[%s] Starting task %s", traceID, task.GetTaskId())
defer log.Printf("[%s] Finished task %s", traceID, task.GetTaskId())
// Your processing logic with trace ID logging
}
Common Error Patterns
1. Resource Exhaustion
Signs:
- Tasks start failing after running for a while
- Memory usage continuously increases
- File descriptor limits reached
Solutions:
- Implement proper resource cleanup
- Add connection pooling
- Set task processing limits (see the sketch below)
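For the last point, a buffered channel works as a simple semaphore. A sketch (the limit of 10 is arbitrary):
// Cap concurrent task processing with a buffered-channel semaphore
var sem = make(chan struct{}, 10) // at most 10 tasks in flight

func handleTaskLimited(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
	sem <- struct{}{}        // acquire a slot (blocks when the limit is reached)
	defer func() { <-sem }() // release the slot when done
	processTask(ctx, task, client)
}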
2. Deadlocks
Signs:
- Agent stops processing tasks
- Health checks show agent as “stuck”
Solutions:
- Avoid blocking operations in main goroutines
- Use timeouts for all operations (see the sketch below)
- Implement deadlock detection
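For the timeout point, a generic wrapper keeps one stuck dependency from freezing the agent. A sketch (withTimeout is a hypothetical helper):
func withTimeout(parent context.Context, d time.Duration, op func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()
	done := make(chan error, 1)
	go func() { done <- op(ctx) }()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		// Note: if op ignores ctx, its goroutine may outlive the timeout
		return fmt.Errorf("operation timed out after %v: %w", d, ctx.Err())
	}
}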
3. Race Conditions
Signs:
- Intermittent task failures
- Inconsistent behavior
- Data corruption
Solutions:
- Use proper synchronization primitives
- Run the race detector: go run -race your-agent.go
- Add mutex protection for shared state (see the sketch below)
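For mutex protection, guard any state shared across goroutines. A minimal sketch:
type SafeStats struct {
	mu             sync.Mutex
	tasksProcessed int64
}

func (s *SafeStats) Increment() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tasksProcessed++
}

func (s *SafeStats) Count() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.tasksProcessed
}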
With these debugging techniques, you should be able to identify and resolve most agent-related issues efficiently.