1 - How to Create an A2A Task Publisher
Learn how to create an agent that publishes Agent2Agent (A2A) protocol-compliant tasks to other agents through the AgentHub EDA broker.
How to Create an A2A Task Publisher
This guide shows you how to create an agent that publishes Agent2Agent (A2A) protocol-compliant tasks to other agents through the AgentHub Event-Driven Architecture (EDA) broker.
Basic Setup
Using AgentHub’s unified abstractions, creating a publisher is straightforward:
package main
import (
"context"
"fmt"
"time"
"github.com/owulveryck/agenthub/internal/agenthub"
pb "github.com/owulveryck/agenthub/events/a2a"
)
const (
myAgentID = "my_publisher_agent"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Create configuration with automatic observability
config := agenthub.NewGRPCConfig("publisher")
config.HealthPort = "8081" // Unique port for this publisher
// Create AgentHub client with built-in observability
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
// Automatic graceful shutdown
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
// Start the client (enables observability)
if err := client.Start(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
panic(err)
}
// Create A2A task publisher with automatic tracing and metrics
taskPublisher := &agenthub.A2ATaskPublisher{
Client: client.Client,
TraceManager: client.TraceManager,
MetricsManager: client.MetricsManager,
Logger: client.Logger,
ComponentName: "publisher",
AgentID: myAgentID,
}
// Your A2A task publishing code goes here
}
Publishing a Simple A2A Task
Here’s how to publish a basic A2A task using the A2ATaskPublisher abstraction:
func publishSimpleTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
// Create A2A-compliant content parts
content := []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Hello! Please provide a greeting for Claude.",
},
},
}
// Publish A2A task using the unified abstraction
task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
TaskType: "greeting",
Content: content,
RequesterAgentID: myAgentID,
ResponderAgentID: "agent_demo_subscriber", // Target agent
Priority: pb.Priority_PRIORITY_HIGH,
ContextID: "ctx_greeting_demo", // Optional: conversation context
})
if err != nil {
return fmt.Errorf("failed to publish greeting task: %w", err)
}
taskPublisher.Logger.InfoContext(ctx, "Published A2A greeting task",
"task_id", task.GetId(),
"context_id", task.GetContextId())
return nil
}
Publishing Different Task Types
Math Calculation Task with A2A Data Parts
func publishMathTask(ctx context.Context, taskPublisher *agenthub.A2ATaskPublisher) error {
// Create A2A-compliant content with structured data
content := []*pb.Part{
{
Part: &pb.Part_Text{
Text: "Please perform the following mathematical calculation:",
},
},
{
Part: &pb.Part_Data{
Data: &pb.DataPart{
Data: &structpb.Struct{
Fields: map[string]*structpb.Value{
"operation": structpb.NewStringValue("multiply"),
"a": structpb.NewNumberValue(15.0),
"b": structpb.NewNumberValue(7.0),
},
},
},
},
},
}
// Publish A2A math task
task, err := taskPublisher.PublishTask(ctx, &agenthub.A2APublishTaskRequest{
TaskType: "math_calculation",
Content: content,
RequesterAgentID: myAgentID,
ResponderAgentID: "agent_demo_subscriber",
Priority: pb.Priority_PRIORITY_MEDIUM,
ContextID: "ctx_math_demo",
})
if err != nil {
return fmt.Errorf("failed to publish math task: %w", err)
}
taskPublisher.Logger.InfoContext(ctx, "Published A2A math task",
"task_id", task.GetId(),
"operation", "multiply")
return nil
}
Data Processing Task
func publishDataProcessingTask(ctx context.Context, taskPublisher *agenthub.TaskPublisher) {
err := taskPublisher.PublishTask(ctx, &agenthub.PublishTaskRequest{
TaskType: "data_processing",
Parameters: map[string]interface{}{
"dataset_path": "/data/customer_data.csv",
"analysis_type": "summary_statistics",
"output_format": "json",
"filters": map[string]interface{}{
"date_range": "last_30_days",
"status": "active",
},
// Metadata is handled automatically by TaskPublisher
"workflow_id": "workflow_123",
"user_id": "user_456",
},
RequesterAgentID: myAgentID,
ResponderAgentID: "data_agent",
Priority: pb.Priority_PRIORITY_HIGH,
})
if err != nil {
panic(fmt.Sprintf("Failed to publish data processing task: %v", err))
}
}
Broadcasting Tasks (No Specific Responder)
To broadcast a task to all available agents, omit the ResponderAgentID:
func broadcastTask(ctx context.Context, taskPublisher *agenthub.TaskPublisher) {
err := taskPublisher.PublishTask(ctx, &agenthub.PublishTaskRequest{
TaskType: "announcement",
Parameters: map[string]interface{}{
"announcement": "Server maintenance in 30 minutes",
"action_required": false,
},
RequesterAgentID: myAgentID,
// ResponderAgentID omitted - will broadcast to all agents
ResponderAgentID: "",
Priority: pb.Priority_PRIORITY_LOW,
})
if err != nil {
panic(fmt.Sprintf("Failed to publish announcement: %v", err))
}
}
Subscribing to Task Results
As a publisher, you’ll want to receive results from tasks you’ve requested. You can use the AgentHub client directly:
func subscribeToResults(ctx context.Context, client *agenthub.AgentHubClient) {
req := &pb.SubscribeToTaskResultsRequest{
RequesterAgentId: myAgentID,
// TaskIds: []string{"specific_task_id"}, // Optional: filter specific tasks
}
stream, err := client.Client.SubscribeToTaskResults(ctx, req)
if err != nil {
client.Logger.ErrorContext(ctx, "Error subscribing to results", "error", err)
return
}
client.Logger.InfoContext(ctx, "Subscribed to task results", "agent_id", myAgentID)
for {
result, err := stream.Recv()
if err != nil {
client.Logger.ErrorContext(ctx, "Error receiving result", "error", err)
return
}
handleTaskResult(ctx, client, result)
}
}
func handleTaskResult(ctx context.Context, client *agenthub.AgentHubClient, result *pb.TaskResult) {
client.Logger.InfoContext(ctx, "Received task result",
"task_id", result.GetTaskId(),
"status", result.GetStatus().String())
switch result.GetStatus() {
case pb.TaskStatus_TASK_STATUS_COMPLETED:
client.Logger.InfoContext(ctx, "Task completed successfully",
"task_id", result.GetTaskId(),
"result", result.GetResult().AsMap())
case pb.TaskStatus_TASK_STATUS_FAILED:
client.Logger.ErrorContext(ctx, "Task failed",
"task_id", result.GetTaskId(),
"error", result.GetErrorMessage())
case pb.TaskStatus_TASK_STATUS_CANCELLED:
client.Logger.InfoContext(ctx, "Task was cancelled",
"task_id", result.GetTaskId())
}
}
Monitoring Task Progress
Subscribe to progress updates to track long-running tasks:
func subscribeToProgress(ctx context.Context, client *agenthub.AgentHubClient) {
req := &pb.SubscribeToTaskResultsRequest{
RequesterAgentId: myAgentID,
}
stream, err := client.Client.SubscribeToTaskProgress(ctx, req)
if err != nil {
client.Logger.ErrorContext(ctx, "Error subscribing to progress", "error", err)
return
}
client.Logger.InfoContext(ctx, "Subscribed to task progress", "agent_id", myAgentID)
for {
progress, err := stream.Recv()
if err != nil {
client.Logger.ErrorContext(ctx, "Error receiving progress", "error", err)
return
}
client.Logger.InfoContext(ctx, "Task progress update",
"task_id", progress.GetTaskId(),
"progress_percentage", progress.GetProgressPercentage(),
"progress_message", progress.GetProgressMessage())
}
}
Complete Publisher Example
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Create configuration with automatic observability
config := agenthub.NewGRPCConfig("publisher")
config.HealthPort = "8081"
// Create AgentHub client with built-in observability
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
// Start the client (enables observability)
if err := client.Start(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
panic(err)
}
// Create task publisher with automatic tracing and metrics
taskPublisher := &agenthub.TaskPublisher{
Client: client.Client,
TraceManager: client.TraceManager,
MetricsManager: client.MetricsManager,
Logger: client.Logger,
ComponentName: "publisher",
}
client.Logger.InfoContext(ctx, "Starting publisher demo")
// Publish various tasks with automatic observability
publishMathTask(ctx, taskPublisher)
time.Sleep(2 * time.Second)
publishDataProcessingTask(ctx, taskPublisher)
time.Sleep(2 * time.Second)
broadcastTask(ctx, taskPublisher)
client.Logger.InfoContext(ctx, "All tasks published! Check subscriber logs for results")
}
Best Practices
Always set a unique task ID: Use timestamps, UUIDs, or sequential IDs to ensure uniqueness.
Use appropriate priorities: Reserve PRIORITY_CRITICAL for urgent tasks that must be processed immediately.
Set realistic deadlines: Include deadlines for time-sensitive tasks to help agents prioritize.
Handle results gracefully: Always subscribe to task results and handle failures appropriately.
Include helpful metadata: Add context information that might be useful for debugging or auditing.
Validate parameters: Ensure task parameters are properly structured before publishing.
Use specific responder IDs when possible: This ensures tasks go to the most appropriate agent.
Your publisher is now ready to send tasks to agents and receive results!
2 - How to Create an A2A Task Subscriber (Agent)
Learn how to create an agent that can receive, process, and respond to Agent2Agent (A2A) protocol tasks through the AgentHub EDA broker using A2A-compliant abstractions.
How to Create an A2A Task Subscriber (Agent)
This guide shows you how to create an agent that can receive, process, and respond to Agent2Agent (A2A) protocol tasks through the AgentHub Event-Driven Architecture (EDA) broker using AgentHub’s A2A-compliant abstractions.
Basic Agent Setup
Start by creating the basic structure for your agent using the unified abstraction:
package main
import (
"context"
"os"
"os/signal"
"syscall"
"time"
"github.com/owulveryck/agenthub/internal/agenthub"
pb "github.com/owulveryck/agenthub/events/a2a"
"google.golang.org/protobuf/types/known/structpb"
)
const (
agentID = "my_agent_processor"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create configuration with automatic observability
config := agenthub.NewGRPCConfig("subscriber")
config.HealthPort = "8082" // Unique port for this agent
// Create AgentHub client with built-in observability
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
// Automatic graceful shutdown
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
// Start the client (enables observability)
if err := client.Start(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Failed to start client", "error", err)
panic(err)
}
// Create A2A task subscriber with automatic observability
taskSubscriber := agenthub.NewA2ATaskSubscriber(client, agentID)
// Register A2A task handlers (see below for examples)
taskSubscriber.RegisterDefaultHandlers()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
client.Logger.Info("Received shutdown signal")
cancel()
}()
client.Logger.InfoContext(ctx, "Starting subscriber agent")
// Start task subscription (with automatic observability)
go func() {
if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
}
}()
// Optional: Subscribe to task results if this agent also publishes tasks
go func() {
if err := taskSubscriber.SubscribeToTaskResults(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Task result subscription failed", "error", err)
}
}()
client.Logger.InfoContext(ctx, "Agent started with observability. Listening for tasks.")
// Wait for context cancellation
<-ctx.Done()
client.Logger.Info("Agent shutdown complete")
}
Default Task Handlers
The RegisterDefaultHandlers() method provides built-in handlers for common task types:
greeting: Simple greeting with name parametermath_calculation: Basic arithmetic operations (add, subtract, multiply, divide)random_number: Random number generation with seed
Custom Task Handlers
Simple Custom Handler
Add your own task handlers using RegisterTaskHandler():
func setupCustomHandlers(taskSubscriber *agenthub.TaskSubscriber) {
// Register a custom data processing handler
taskSubscriber.RegisterTaskHandler("data_processing", handleDataProcessing)
// Register a file conversion handler
taskSubscriber.RegisterTaskHandler("file_conversion", handleFileConversion)
// Register a status check handler
taskSubscriber.RegisterTaskHandler("status_check", handleStatusCheck)
}
func handleDataProcessing(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
params := task.GetParameters()
datasetPath := params.Fields["dataset_path"].GetStringValue()
analysisType := params.Fields["analysis_type"].GetStringValue()
if datasetPath == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "dataset_path parameter is required"
}
// Simulate data processing
time.Sleep(2 * time.Second)
result, err := structpb.NewStruct(map[string]interface{}{
"dataset_path": datasetPath,
"analysis_type": analysisType,
"records_processed": 1500,
"processing_time": "2.1s",
"summary": map[string]interface{}{
"mean": 42.7,
"median": 41.2,
"stddev": 8.3,
},
"processed_at": time.Now().Format(time.RFC3339),
})
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
}
return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
Advanced Handler with Validation
func handleFileConversion(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
params := task.GetParameters()
// Extract and validate parameters
inputPath := params.Fields["input_path"].GetStringValue()
outputFormat := params.Fields["output_format"].GetStringValue()
if inputPath == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "input_path parameter is required"
}
if outputFormat == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "output_format parameter is required"
}
// Validate output format
validFormats := []string{"pdf", "docx", "txt", "html"}
isValidFormat := false
for _, format := range validFormats {
if outputFormat == format {
isValidFormat = true
break
}
}
if !isValidFormat {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("unsupported output format: %s", outputFormat)
}
// Simulate file conversion process
time.Sleep(1 * time.Second)
outputPath := strings.Replace(inputPath, filepath.Ext(inputPath), "."+outputFormat, 1)
result, err := structpb.NewStruct(map[string]interface{}{
"input_path": inputPath,
"output_path": outputPath,
"output_format": outputFormat,
"file_size": "2.5MB",
"conversion_time": "1.2s",
"status": "success",
"converted_at": time.Now().Format(time.RFC3339),
})
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
}
return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
Handler with External Service Integration
func handleStatusCheck(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string) {
params := task.GetParameters()
serviceURL := params.Fields["service_url"].GetStringValue()
if serviceURL == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "service_url parameter is required"
}
// Create HTTP client with timeout
client := &http.Client{
Timeout: 10 * time.Second,
}
// Perform health check
resp, err := client.Get(serviceURL + "/health")
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Failed to reach service: %v", err)
}
defer resp.Body.Close()
// Determine status
isHealthy := resp.StatusCode >= 200 && resp.StatusCode < 300
status := "unhealthy"
if isHealthy {
status = "healthy"
}
result, err := structpb.NewStruct(map[string]interface{}{
"service_url": serviceURL,
"status": status,
"status_code": resp.StatusCode,
"response_time": "150ms",
"checked_at": time.Now().Format(time.RFC3339),
})
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "Failed to create result structure"
}
return result, pb.TaskStatus_TASK_STATUS_COMPLETED, ""
}
Complete Agent Example
Here’s a complete agent that handles multiple task types:
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/owulveryck/agenthub/internal/agenthub"
pb "github.com/owulveryck/agenthub/events/a2a"
"google.golang.org/protobuf/types/known/structpb"
)
const agentID = "multi_task_agent"
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create AgentHub client with observability
config := agenthub.NewGRPCConfig("subscriber")
config.HealthPort = "8082"
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
panic("Failed to create AgentHub client: " + err.Error())
}
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if err := client.Shutdown(shutdownCtx); err != nil {
client.Logger.ErrorContext(shutdownCtx, "Error during shutdown", "error", err)
}
}()
if err := client.Start(ctx); err != nil {
panic(err)
}
// Create and configure task subscriber
taskSubscriber := agenthub.NewTaskSubscriber(client, agentID)
// Register both default and custom handlers
taskSubscriber.RegisterDefaultHandlers()
setupCustomHandlers(taskSubscriber)
// Graceful shutdown handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
client.Logger.Info("Received shutdown signal")
cancel()
}()
client.Logger.InfoContext(ctx, "Starting multi-task agent")
// Start subscriptions
go func() {
if err := taskSubscriber.SubscribeToTasks(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Task subscription failed", "error", err)
}
}()
go func() {
if err := taskSubscriber.SubscribeToTaskResults(ctx); err != nil {
client.Logger.ErrorContext(ctx, "Task result subscription failed", "error", err)
}
}()
client.Logger.InfoContext(ctx, "Agent ready to process tasks",
"supported_tasks", []string{"greeting", "math_calculation", "random_number", "data_processing", "file_conversion", "status_check"})
<-ctx.Done()
client.Logger.Info("Agent shutdown complete")
}
func setupCustomHandlers(taskSubscriber *agenthub.TaskSubscriber) {
taskSubscriber.RegisterTaskHandler("data_processing", handleDataProcessing)
taskSubscriber.RegisterTaskHandler("file_conversion", handleFileConversion)
taskSubscriber.RegisterTaskHandler("status_check", handleStatusCheck)
}
// ... (include the handler functions from above)
Automatic Features
The unified abstraction provides automatic features:
Observability
- Distributed tracing for each task processing
- Metrics collection for processing times and success rates
- Structured logging with correlation IDs
Task Management
- Automatic result publishing back to the broker
- Error handling and status reporting
- Progress tracking capabilities
Resource Management
- Graceful shutdown handling
- Connection management to the broker
- Health endpoints for monitoring
Best Practices
Parameter Validation: Always validate task parameters before processing
if requiredParam == "" {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, "required_param is missing"
}
Error Handling: Provide meaningful error messages
if err != nil {
return nil, pb.TaskStatus_TASK_STATUS_FAILED, fmt.Sprintf("Processing failed: %v", err)
}
Timeouts: Use context with timeouts for external operations
client := &http.Client{Timeout: 10 * time.Second}
Resource Cleanup: Always clean up resources in handlers
defer file.Close()
defer resp.Body.Close()
Structured Results: Return well-structured result data
result, _ := structpb.NewStruct(map[string]interface{}{
"status": "completed",
"timestamp": time.Now().Format(time.RFC3339),
"data": processedData,
})
Handler Function Signature
All task handlers must implement the TaskHandler interface:
type TaskHandler func(ctx context.Context, task *pb.TaskMessage) (*structpb.Struct, pb.TaskStatus, string)
Return values:
*structpb.Struct: The result data (can be nil on failure)pb.TaskStatus: One of:pb.TaskStatus_TASK_STATUS_COMPLETEDpb.TaskStatus_TASK_STATUS_FAILEDpb.TaskStatus_TASK_STATUS_CANCELLED
string: Error message (empty string on success)
Your agent is now ready to receive and process tasks from other agents in the system with full observability and automatic result publishing!
3 -
How to Create an Agent with Cortex Auto-Discovery
This guide shows you how to create an agent using the SubAgent library, which handles all the boilerplate and lets you focus on your agent’s business logic.
What You’ll Build
An agent that:
- Automatically registers with the broker on startup
- Gets discovered by Cortex for LLM-based task delegation
- Processes delegated tasks and returns results
- Has built-in observability (tracing, logging, metrics)
- Handles graceful shutdown
All with ~50 lines of code instead of 500+.
Prerequisites
- AgentHub broker running
- Cortex orchestrator running
- Basic understanding of Go
- Familiarity with the A2A protocol (helpful but not required)
Step 1: Import the SubAgent Library
package main
import (
"context"
"fmt"
"log"
"time"
pb "github.com/owulveryck/agenthub/events/a2a"
"github.com/owulveryck/agenthub/internal/subagent"
"google.golang.org/protobuf/types/known/structpb"
)
Step 2: Define Your Agent Configuration
func main() {
// Configure your agent with required fields
config := &subagent.Config{
AgentID: "agent_translator", // Unique agent identifier
ServiceName: "translator_service", // Optional gRPC service name
Name: "Translation Agent", // Human-readable name
Description: "Translates text between languages using AI models",
Version: "1.0.0", // Optional, defaults to 1.0.0
HealthPort: "8087", // Optional, defaults to 8080
}
// Create the subagent
agent, err := subagent.New(config)
if err != nil {
log.Fatal(err)
}
Step 3: Register Your Skills
Skills are capabilities your agent provides. Each skill needs a handler function:
// Add a translation skill
agent.MustAddSkill(
"Language Translation", // Skill name (shown to LLM)
"Translates text from one language to another", // Description
translateHandler, // Your handler function
)
// You can add multiple skills
agent.MustAddSkill(
"Language Detection",
"Detects the language of input text",
detectLanguageHandler,
)
Best Practices for Skill Definition
- Clear Names: Use descriptive skill names that the LLM can understand
- Specific Descriptions: Explain what the skill does and when to use it
- Multiple Skills: An agent can have multiple related skills
Step 4: Implement Your Handler Functions
A handler function receives a task and returns a result:
// Handler signature: (ctx, task, message) -> (artifact, state, errorMessage)
func translateHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
// 1. Extract input from the message
var inputText string
for _, part := range message.Content {
if text := part.GetText(); text != "" {
inputText = text
break
}
}
if inputText == "" {
return nil, pb.TaskState_TASK_STATE_FAILED, "No input text provided"
}
// 2. Extract parameters from metadata (optional)
targetLang := "en" // default
if message.Metadata != nil && message.Metadata.Fields != nil {
if lang, exists := message.Metadata.Fields["target_language"]; exists {
targetLang = lang.GetStringValue()
}
}
// 3. Perform your business logic
translatedText, err := performTranslation(ctx, inputText, targetLang)
if err != nil {
return nil, pb.TaskState_TASK_STATE_FAILED, fmt.Sprintf("Translation failed: %v", err)
}
// 4. Create an artifact with your result
artifact := &pb.Artifact{
ArtifactId: fmt.Sprintf("translation_%s_%d", task.GetId(), time.Now().Unix()),
Name: "translation_result",
Description: fmt.Sprintf("Translation to %s", targetLang),
Parts: []*pb.Part{
{
Part: &pb.Part_Text{
Text: translatedText,
},
},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"original_text": structpb.NewStringValue(inputText),
"target_language": structpb.NewStringValue(targetLang),
"translated_at": structpb.NewStringValue(time.Now().Format(time.RFC3339)),
},
},
}
// 5. Return success
return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}
func performTranslation(ctx context.Context, text, targetLang string) (string, error) {
// Implement your actual translation logic here
// This could call an external API, use a local model, etc.
// Example placeholder:
return fmt.Sprintf("[Translated to %s]: %s", targetLang, text), nil
}
Handler Return Values
Your handler returns three values:
*pb.Artifact: The result data (or nil if failed)pb.TaskState: Status code (TASK_STATE_COMPLETED, TASK_STATE_FAILED, etc.)string: Error message (empty string if successful)
Step 5: Run Your Agent
// Run the agent (blocks until shutdown signal)
if err := agent.Run(context.Background()); err != nil {
log.Fatal(err)
}
}
That’s it! The SubAgent library handles:
- ✅ gRPC client setup and connection
- ✅ Agent card creation with A2A-compliant structure
- ✅ Broker registration and auto-discovery by Cortex
- ✅ Task subscription and routing
- ✅ Distributed tracing (automatic span creation)
- ✅ Structured logging (all operations logged)
- ✅ Graceful shutdown (SIGINT/SIGTERM handling)
- ✅ Health checks
- ✅ Error handling
Complete Example
Here’s a full working agent in ~80 lines:
package main
import (
"context"
"fmt"
"log"
"time"
pb "github.com/owulveryck/agenthub/events/a2a"
"github.com/owulveryck/agenthub/internal/subagent"
"google.golang.org/protobuf/types/known/structpb"
)
func main() {
config := &subagent.Config{
AgentID: "agent_translator",
ServiceName: "translator_service",
Name: "Translation Agent",
Description: "Translates text between languages using AI models",
Version: "1.0.0",
HealthPort: "8087",
}
agent, err := subagent.New(config)
if err != nil {
log.Fatal(err)
}
agent.MustAddSkill(
"Language Translation",
"Translates text from one language to another. Supports major languages including English, Spanish, French, German, Japanese, and Chinese",
translateHandler,
)
if err := agent.Run(context.Background()); err != nil {
log.Fatal(err)
}
}
func translateHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
var inputText string
for _, part := range message.Content {
if text := part.GetText(); text != "" {
inputText = text
break
}
}
if inputText == "" {
return nil, pb.TaskState_TASK_STATE_FAILED, "No input text provided"
}
targetLang := "en"
if message.Metadata != nil && message.Metadata.Fields != nil {
if lang, exists := message.Metadata.Fields["target_language"]; exists {
targetLang = lang.GetStringValue()
}
}
translatedText := fmt.Sprintf("[Translated to %s]: %s", targetLang, inputText)
artifact := &pb.Artifact{
ArtifactId: fmt.Sprintf("translation_%s_%d", task.GetId(), time.Now().Unix()),
Name: "translation_result",
Description: fmt.Sprintf("Translation to %s", targetLang),
Parts: []*pb.Part{
{Part: &pb.Part_Text{Text: translatedText}},
},
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"original_text": structpb.NewStringValue(inputText),
"target_language": structpb.NewStringValue(targetLang),
},
},
}
return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}
Build and Test
# Build your agent
go build -o bin/translator ./agents/translator
# Start broker (if not running)
./bin/broker &
# Start Cortex (if not running)
./bin/cortex &
# Start your agent
./bin/translator
Verify Registration
Check the logs:
# Your agent logs should show:
time=... level=INFO msg="Agent card registered" agent_id=agent_translator skills=1
time=... level=INFO msg="Agent started successfully" agent_id=agent_translator name="Translation Agent" skills=1
# Cortex logs should show:
time=... level=INFO msg="Received agent card event" agent_id=agent_translator event_type=registered
time=... level=INFO msg="Agent registered with Cortex orchestrator" agent_id=agent_translator total_agents=N
Test with Chat CLI
./bin/chat_cli
# Try these prompts (if using VertexAI LLM):
> Can you translate "hello world" to Spanish?
> Translate "good morning" to French
How It Works
sequenceDiagram
participant A as Your Agent
participant SL as SubAgent Library
participant B as Broker
participant C as Cortex
participant L as LLM
participant U as User
Note over A: agent.Run() called
A->>SL: Initialize
SL->>B: RegisterAgent(AgentCard)
B->>C: AgentCardEvent
C->>C: Register agent
Note over C: Agent now available
U->>C: "Translate this to Spanish"
C->>L: Decide(history, agents, message)
L-->>C: Delegate to translator
C->>B: Publish task
B->>SL: Route to agent
SL->>SL: Start tracing span
SL->>SL: Log task receipt
SL->>A: Call your handler
A->>A: Process translation
A-->>SL: Return artifact
SL->>SL: Log completion
SL->>SL: End tracing span
SL->>B: Publish result
B->>C: Route result
C->>U: "Aquí está: Hola mundo"Advanced Usage
Accessing the Client
If you need access to the underlying AgentHub client:
client := agent.GetClient()
logger := agent.GetLogger()
config := agent.GetConfig()
Custom Configuration
config := &subagent.Config{
AgentID: "my_agent",
ServiceName: "custom_service_name", // Optional
Name: "My Agent",
Description: "Does amazing things",
Version: "2.0.0",
HealthPort: "9000",
BrokerAddr: "broker.example.com", // Optional
BrokerPort: "50051", // Optional
}
Multiple Skills Example
agent.MustAddSkill("Skill A", "Description A", handlerA)
agent.MustAddSkill("Skill B", "Description B", handlerB)
agent.MustAddSkill("Skill C", "Description C", handlerC)
// Each skill gets its own handler function
// The SubAgent library routes tasks to the correct handler based on task type
Error Handling in Handlers
func myHandler(ctx context.Context, task *pb.Task, message *pb.Message) (*pb.Artifact, pb.TaskState, string) {
result, err := doWork(ctx, message)
if err != nil {
// Return failure with error message
return nil, pb.TaskState_TASK_STATE_FAILED, err.Error()
}
// Return success with result
artifact := createArtifact(result)
return artifact, pb.TaskState_TASK_STATE_COMPLETED, ""
}
What the SubAgent Library Provides
Automatic Setup
- gRPC client connection to broker
- Health check endpoint
- Signal handling for graceful shutdown
- Configuration validation with defaults
A2A-Compliant AgentCard
- Correct protocol version (0.2.9)
- Proper capabilities structure
- Complete skill definitions with all required fields
- Automatic skill ID generation and tagging
Observability
- Tracing: Automatic span creation for each task with attributes
- Logging: Structured logging for all operations (registration, task receipt, completion, errors)
- Metrics: Built-in metrics collection (via AgentHub client)
Task Management
- Automatic task subscription
- Skill-based handler routing
- Error handling and reporting
- Result publishing
Developer Experience
- Simple 3-step API:
New() → AddSkill() → Run() - Clear error messages
- Type-safe handler functions
- Automatic resource cleanup
Best Practices
1. Skill Design
- Be specific: Clear descriptions help the LLM delegate correctly
- Single responsibility: Each skill should do one thing well
- Related skills: Group related capabilities in one agent
2. Handler Implementation
- Validate input: Always check that required data is present
- Handle errors gracefully: Return meaningful error messages
- Include metadata: Add useful context to your artifacts
- Keep it focused: Handlers should do one thing
3. Configuration
- Unique ports: Each agent needs a unique health port
- Meaningful names: Use descriptive agent IDs and names
- Version appropriately: Use semantic versioning
4. Testing
- Unit test handlers: Test business logic independently
- Integration test: Verify agent works with broker and Cortex
- E2E test: Test the complete flow with the LLM
Troubleshooting
Build Errors
Import issues:
go mod tidy
go mod download
Agent Not Registering
Check:
- Broker is running and accessible
- Config has all required fields (AgentID, Name, Description)
- Health port is not in use by another service
- Logs for specific error messages
Tasks Not Reaching Agent
Check:
- Cortex is running and has discovered your agent
- Skill names and descriptions match what users are asking for
- LLM is configured (not using mock LLM for delegation)
- Check broker and Cortex logs for routing events
Handler Errors
Check:
- Handler function signature matches
TaskHandler type - Input validation is working correctly
- Error messages are being returned properly
- Logs show task receipt and processing
Next Steps
With the SubAgent library, creating production-ready agents is now as simple as defining your configuration, implementing your business logic, and calling Run()!
4 -
How to Design Effective Agent Cards
This guide shows you how to design AgentCards that enable effective LLM-based discovery and delegation in the Cortex orchestration system.
Why AgentCards Matter
When you register an agent with AgentHub, the Cortex orchestrator uses your AgentCard to:
- Understand your agent’s capabilities - What can it do?
- Match user requests - Does this request fit this agent?
- Generate LLM prompts - Include your agent in decision-making
- Delegate tasks - Route appropriate work to your agent
The quality of your AgentCard directly impacts how effectively Cortex can use your agent.
AgentCard Structure
type AgentCard struct {
ProtocolVersion string // A2A protocol version (e.g., "0.2.9")
Name string // Unique agent identifier
Description string // Human-readable description
Version string // Agent version (e.g., "1.0.0")
Url string // Service endpoint (optional)
Capabilities *AgentCapabilities // Technical capabilities
Skills []*AgentSkill // What the agent can do
// ... other fields
}
The most important field for Cortex integration is Skills.
Designing Skills
Each skill represents a specific capability your agent offers. The LLM uses skill information to decide when to delegate to your agent.
Skill Structure
type AgentSkill struct {
Id string // Unique skill identifier
Name string // Human-readable skill name
Description string // Detailed description of what this skill does
Tags []string // Categorization and keywords
Examples []string // Example user requests that match this skill
InputModes []string // Supported input types (e.g., "text/plain")
OutputModes []string // Supported output types
}
Writing Effective Descriptions
❌ Poor description:
Description: "Processes data"
✅ Good description:
Description: "Analyzes time-series data to detect anomalies using statistical methods. " +
"Supports multiple algorithms including Z-score, moving average, and ARIMA. " +
"Returns anomaly locations, severity scores, and confidence intervals."
Why the good description works:
- Specific about what it does (“analyzes time-series data”)
- Mentions the method (“statistical methods”)
- Lists supported features (“Z-score, moving average, ARIMA”)
- Describes output (“anomaly locations, severity scores, confidence intervals”)
Writing Powerful Examples
Examples are critical - the LLM uses them to recognize when a user request matches your skill.
❌ Weak examples:
Examples: []string{
"analyze data",
"find problems",
}
✅ Strong examples:
Examples: []string{
"Can you detect anomalies in this time series?",
"Find unusual patterns in the sensor data",
"Analyze this dataset for outliers",
"Check if there are any abnormal readings",
"Identify spikes or drops in the data",
"Run anomaly detection on this log file",
"Are there any suspicious values in this series?",
}
Why strong examples work:
- Variety: Different phrasings (“detect anomalies”, “find unusual patterns”, “outliers”)
- Natural language: How users actually ask questions
- Specific: Mention domain terms (“time series”, “sensor data”, “log file”)
- Action-oriented: Clear about what to do
- Multiple formats: Questions and commands
Example Categories to Cover
For each skill, include examples that cover:
- Direct requests: “Translate this text to Spanish”
- Questions: “Can you convert this to French?”
- Implied tasks: “I need this in German”
- Variations: “Spanish translation please”
- With context: “Translate the following paragraph to Japanese: …”
- Different phrasings: “Convert to Spanish”, “Change to Spanish”, “Make it Spanish”
Complete Examples
Example 1: Translation Agent
agentCard := &pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "agent_translator",
Description: "Professional-grade language translation service powered by neural machine translation. " +
"Supports 100+ languages with context-aware translation and proper handling of idioms, " +
"technical terms, and cultural nuances.",
Version: "2.1.0",
Capabilities: &pb.AgentCapabilities{
Streaming: false,
PushNotifications: false,
},
Skills: []*pb.AgentSkill{
{
Id: "translate_text",
Name: "Text Translation",
Description: "Translates text between any pair of 100+ supported languages including " +
"English, Spanish, French, German, Chinese, Japanese, Arabic, Russian, and many more. " +
"Preserves formatting, handles idioms, and maintains context. " +
"Supports both short phrases and long documents.",
Tags: []string{
"translation", "language", "nlp", "localization",
"multilingual", "i18n", "communication",
},
Examples: []string{
"Translate this to Spanish",
"Can you convert this text to French?",
"I need this paragraph in Japanese",
"Translate from English to German",
"What does this mean in Chinese?",
"Convert this Spanish text to English",
"Please translate to Portuguese",
"How do you say this in Italian?",
"Russian translation needed",
"Change this to Arabic",
},
InputModes: []string{"text/plain", "text/html"},
OutputModes: []string{"text/plain", "text/html"},
},
{
Id: "detect_language",
Name: "Language Detection",
Description: "Automatically identifies the language of input text with high accuracy. " +
"Can detect 100+ languages and provides confidence scores. " +
"Useful for routing, preprocessing, and automatic translation workflows.",
Tags: []string{"language", "detection", "nlp", "identification"},
Examples: []string{
"What language is this text in?",
"Detect the language",
"Can you identify this language?",
"Which language is this?",
"Tell me what language this is",
},
InputModes: []string{"text/plain"},
OutputModes: []string{"text/plain"},
},
},
}
Example 2: Data Analysis Agent
agentCard := &pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "agent_data_analyst",
Description: "Advanced data analysis and statistical computing agent. Performs exploratory " +
"data analysis, statistical tests, correlation analysis, and generates insights from datasets.",
Version: "1.5.2",
Capabilities: &pb.AgentCapabilities{
Streaming: true, // Can stream large results
PushNotifications: false,
},
Skills: []*pb.AgentSkill{
{
Id: "analyze_dataset",
Name: "Dataset Analysis",
Description: "Performs comprehensive statistical analysis on datasets including " +
"descriptive statistics (mean, median, std dev), distribution analysis, " +
"outlier detection, correlation matrices, and trend identification. " +
"Supports CSV, JSON, and structured data formats.",
Tags: []string{
"data-analysis", "statistics", "analytics", "dataset",
"eda", "exploratory", "insights",
},
Examples: []string{
"Analyze this dataset",
"Can you provide statistics for this data?",
"What are the key insights from this CSV?",
"Run an analysis on this data file",
"Give me a statistical summary",
"Find correlations in this dataset",
"What patterns do you see in this data?",
"Analyze the distribution of these values",
"Calculate descriptive statistics",
"Identify trends in this time series",
},
InputModes: []string{"text/csv", "application/json", "text/plain"},
OutputModes: []string{"application/json", "text/plain", "text/html"},
},
{
Id: "visualize_data",
Name: "Data Visualization",
Description: "Creates charts and graphs from data including line charts, bar charts, " +
"scatter plots, histograms, box plots, and heatmaps. Returns visualization " +
"specifications in various formats.",
Tags: []string{"visualization", "charts", "graphs", "plotting"},
Examples: []string{
"Create a chart from this data",
"Visualize this dataset",
"Make a graph of these values",
"Plot this time series",
"Show me a chart",
"Generate a histogram",
"Can you create a scatter plot?",
},
InputModes: []string{"text/csv", "application/json"},
OutputModes: []string{"image/png", "application/json", "text/html"},
},
},
}
Example 3: Image Processing Agent
agentCard := &pb.AgentCard{
ProtocolVersion: "0.2.9",
Name: "agent_image_processor",
Description: "Image processing and computer vision agent with capabilities for transformation, " +
"enhancement, analysis, and object detection. Supports all major image formats.",
Version: "3.0.0",
Skills: []*pb.AgentSkill{
{
Id: "resize_image",
Name: "Image Resizing",
Description: "Resizes images to specified dimensions while maintaining aspect ratio " +
"and quality. Supports various scaling algorithms including bicubic, lanczos, " +
"and nearest neighbor. Can batch process multiple images.",
Tags: []string{"image", "resize", "scale", "transform", "dimensions"},
Examples: []string{
"Resize this image to 800x600",
"Make this image smaller",
"Scale this photo to 50%",
"Can you resize to thumbnail size?",
"Change image dimensions",
"Shrink this image",
"Make it 1920x1080",
},
InputModes: []string{"image/jpeg", "image/png", "image/webp"},
OutputModes: []string{"image/jpeg", "image/png", "image/webp"},
},
{
Id: "detect_objects",
Name: "Object Detection",
Description: "Detects and identifies objects in images using deep learning models. " +
"Can recognize 1000+ object categories including people, animals, vehicles, " +
"furniture, and more. Returns bounding boxes and confidence scores.",
Tags: []string{
"computer-vision", "object-detection", "ai", "recognition",
"detection", "classification",
},
Examples: []string{
"What objects are in this image?",
"Detect objects in this photo",
"What do you see in this picture?",
"Identify items in this image",
"Find all people in this photo",
"Detect cars in this image",
"What's in this picture?",
},
InputModes: []string{"image/jpeg", "image/png"},
OutputModes: []string{"application/json", "text/plain"},
},
},
}
Best Practices Checklist
✅ Description Quality
✅ Skill Design
✅ Examples Coverage
Testing Your AgentCard
1. Manual Testing
Start your agent and check Cortex logs:
grep "Agent skills registered" cortex.log
You should see your skill descriptions displayed.
2. LLM Prompt Testing
Check what the LLM sees by enabling DEBUG logging in Cortex:
LOG_LEVEL=DEBUG ./bin/cortex
Look for prompts that include:
Available agents:
- your_agent: Your agent description
Skills:
* Skill Name: Skill description
3. Integration Testing
Test with actual user requests:
# Start services
./bin/broker &
./bin/cortex &
./bin/your_agent &
# Use chat CLI
./bin/chat_cli
# Try requests that match your examples
> Can you translate this to Spanish?
Watch the logs to see if Cortex delegates to your agent.
Common Mistakes to Avoid
❌ Vague Descriptions
Description: "Does useful things"
Problem: LLM can’t determine if this agent is suitable
❌ Too Few Examples
Examples: []string{"do the thing"}
Problem: LLM won’t recognize variations
❌ Technical Jargon in Examples
Examples: []string{
"Execute POST /api/v1/translate with payload",
}
Problem: Users don’t talk like this
❌ Overly Broad Skills
{
Name: "Do Everything",
Description: "This agent can help with anything",
}
Problem: LLM can’t make good decisions
❌ Missing Context
{
Name: "Process",
Description: "Processes the input",
}
Problem: What kind of processing? What input?
Advanced Topics
Multi-Language Support
Include examples in multiple languages if your agent supports them:
Examples: []string{
"Translate to Spanish",
"Traduire en français",
"Übersetzen Sie nach Deutsch",
"日本語に翻訳",
}
Conditional Capabilities
Use metadata to indicate conditional features:
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"requires_api_key": structpb.NewBoolValue(true),
"max_input_size": structpb.NewNumberValue(10000),
"rate_limit": structpb.NewStringValue("100/minute"),
},
}
Skill Dependencies
Indicate if skills build on each other:
{
Id: "advanced_analysis",
Description: "Advanced statistical analysis. Requires dataset to be preprocessed " +
"using the 'clean_data' skill first.",
}
Iteration and Improvement
Your AgentCard isn’t set in stone. Improve it based on:
- Usage patterns: What requests do users actually make?
- Delegation success: Is Cortex routing appropriate tasks?
- User feedback: Are users getting what they expect?
- LLM behavior: What decisions is the LLM making?
Update your AgentCard and restart your agent to reflect improvements.
Next Steps
Well-designed AgentCards are the key to effective AI orchestration!