AgentHub API Reference
This document provides complete technical reference for the AgentHub API, including all gRPC services, message types, and operational details.
gRPC Service Definition
The AgentHub broker implements the EventBus service as defined in proto/eventbus.proto:100:
service EventBus {
// Publisher methods
rpc PublishTask (PublishTaskRequest) returns (PublishResponse);
rpc PublishTaskResult (PublishTaskResultRequest) returns (PublishResponse);
rpc PublishTaskProgress (PublishTaskProgressRequest) returns (PublishResponse);
// Subscriber methods
rpc SubscribeToTasks (SubscribeToTasksRequest) returns (stream TaskMessage);
rpc SubscribeToTaskResults (SubscribeToTaskResultsRequest) returns (stream TaskResult);
rpc SubscribeToTaskProgress (SubscribeToTaskResultsRequest) returns (stream TaskProgress);
}
Message Types
Core Task Messages
TaskMessage
Represents a task to be executed by an agent.
message TaskMessage {
string task_id = 1; // Required: Unique identifier
string task_type = 2; // Required: Type of task
google.protobuf.Struct parameters = 3; // Optional: Task parameters
string requester_agent_id = 4; // Required: Requesting agent ID
string responder_agent_id = 5; // Optional: Target agent ID
google.protobuf.Timestamp deadline = 6; // Optional: Task deadline
Priority priority = 7; // Optional: Task priority (default: UNSPECIFIED)
google.protobuf.Struct metadata = 8; // Optional: Additional metadata
google.protobuf.Timestamp created_at = 9; // Required: Creation timestamp
}
Field Details:
- task_id: Must be unique across all tasks. Recommended format: {task_type}_{timestamp}_{uuid}
- task_type: Semantic identifier for task category (e.g., “data_analysis”, “image_processing”)
- parameters: Flexible JSON-like structure containing task-specific parameters
- requester_agent_id: ID of the agent requesting the task
- responder_agent_id: If specified, task is routed only to this agent; otherwise broadcast
- deadline: RFC3339 timestamp indicating when task must complete
- priority: Task priority level (see Priority enum)
- metadata: Additional context information for debugging, routing, or processing
- created_at: Timestamp when task was created
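The recommended task_id format can be produced with a small helper. This is a minimal sketch: buildTaskID and randomSuffix are hypothetical names, the timestamp is taken as Unix nanoseconds, and 16 random bytes in hex stand in for the {uuid} part. None of these choices are prescribed by AgentHub.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
)

// buildTaskID joins the three parts of the recommended format
// {task_type}_{timestamp}_{uuid}. Hypothetical helper, not part of the API.
func buildTaskID(taskType string, unixNano int64, suffix string) string {
	return fmt.Sprintf("%s_%d_%s", taskType, unixNano, suffix)
}

// randomSuffix returns 16 random bytes as hex. It stands in for the {uuid}
// part (assumption: any sufficiently random suffix is acceptable).
func randomSuffix() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

func main() {
	suffix, err := randomSuffix()
	if err != nil {
		panic(err)
	}
	fmt.Println(buildTaskID("data_analysis", time.Now().UnixNano(), suffix))
}
```

Keeping the formatting in a pure function makes the ID layout easy to test independently of the clock and the random source.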
TaskResult
Represents the completion result of a task.
message TaskResult {
string task_id = 1; // Required: Reference to original task
TaskStatus status = 2; // Required: Completion status
google.protobuf.Struct result = 3; // Optional: Task results
string error_message = 4; // Optional: Error details if failed
string executor_agent_id = 5; // Required: Executing agent ID
google.protobuf.Timestamp completed_at = 6; // Required: Completion timestamp
google.protobuf.Struct execution_metadata = 7; // Optional: Execution details
}
Field Details:
- task_id: Must match the original task’s task_id
- status: Final status of task execution (see TaskStatus enum)
- result: Structured result data if task completed successfully
- error_message: Human-readable error description if status is FAILED
- executor_agent_id: ID of the agent that executed the task
- completed_at: Timestamp when task execution finished
- execution_metadata: Additional execution details (timing, resource usage, etc.)
TaskProgress
Represents progress updates during task execution.
message TaskProgress {
string task_id = 1; // Required: Reference to original task
TaskStatus status = 2; // Required: Current status
string progress_message = 3; // Optional: Human-readable progress
int32 progress_percentage = 4; // Optional: Progress as percentage (0-100)
google.protobuf.Struct progress_data = 5; // Optional: Structured progress data
string executor_agent_id = 6; // Required: Executing agent ID
google.protobuf.Timestamp updated_at = 7; // Required: Progress update timestamp
}
Field Details:
- task_id: Must match the original task’s task_id
- status: Current execution status (typically IN_PROGRESS)
- progress_message: Human-readable description of current activity
- progress_percentage: Numeric progress indicator (0-100)
- progress_data: Structured data about progress (e.g., records processed, files completed)
- executor_agent_id: ID of the agent executing the task
- updated_at: Timestamp of this progress update
Enums
Priority
Defines task priority levels:
enum Priority {
PRIORITY_UNSPECIFIED = 0; // Default priority
PRIORITY_LOW = 1; // Background tasks
PRIORITY_MEDIUM = 2; // Normal tasks
PRIORITY_HIGH = 3; // Important tasks
PRIORITY_CRITICAL = 4; // Urgent tasks requiring immediate attention
}
Usage Guidelines:
- PRIORITY_LOW: Batch jobs, maintenance tasks, background processing
- PRIORITY_MEDIUM: Standard user requests, regular business operations
- PRIORITY_HIGH: User-facing operations, time-sensitive tasks
- PRIORITY_CRITICAL: Emergency operations, system alerts, health checks
TaskStatus
Defines task execution states:
enum TaskStatus {
TASK_STATUS_UNSPECIFIED = 0; // Default/unknown status
TASK_STATUS_PENDING = 1; // Task queued, waiting for execution
TASK_STATUS_IN_PROGRESS = 2; // Task currently being processed
TASK_STATUS_COMPLETED = 3; // Task finished successfully
TASK_STATUS_FAILED = 4; // Task failed with error
TASK_STATUS_CANCELLED = 5; // Task was cancelled
}
State Transitions:
PENDING → IN_PROGRESS → COMPLETED
↓
FAILED
↓
CANCELLED (from any state)
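A client that tracks task state locally may want to reject out-of-order updates. The sketch below encodes one reading of the transitions above. It rests on assumptions the reference does not confirm: FAILED is reached from IN_PROGRESS, COMPLETED, FAILED and CANCELLED are terminal, and "from any state" means any non-terminal state. The Status type is a local stand-in for the generated TaskStatus enum.

```go
package main

import "fmt"

// Status mirrors the TaskStatus enum values listed in this reference.
type Status int

const (
	Unspecified Status = iota
	Pending
	InProgress
	Completed
	Failed
	Cancelled
)

// isTerminal reports whether no further transitions are expected.
// Treating these three states as terminal is an assumption.
func isTerminal(s Status) bool {
	return s == Completed || s == Failed || s == Cancelled
}

// canTransition reports whether moving from one status to another is
// allowed under the assumptions stated above.
func canTransition(from, to Status) bool {
	if isTerminal(from) {
		return false
	}
	switch to {
	case InProgress:
		return from == Pending
	case Completed, Failed:
		return from == InProgress
	case Cancelled:
		return from == Pending || from == InProgress
	default:
		return false
	}
}

func main() {
	fmt.Println(canTransition(Pending, InProgress))
	fmt.Println(canTransition(Completed, Cancelled))
}
```

If the broker does allow other transitions (for example PENDING directly to FAILED), only the switch needs to change.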
Request/Response Messages
PublishTaskRequest
message PublishTaskRequest {
TaskMessage task = 1; // Required: Task to publish
}
PublishTaskResultRequest
message PublishTaskResultRequest {
TaskResult result = 1; // Required: Task result to publish
}
PublishTaskProgressRequest
message PublishTaskProgressRequest {
TaskProgress progress = 1; // Required: Progress update to publish
}
PublishResponse
message PublishResponse {
bool success = 1; // True if message was accepted
string error = 2; // Error message if success is false
}
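Because PublishResponse carries its own success flag, a publish call can fail in two ways: the RPC itself returns an error, or the response reports success = false. A caller might fold both into a single error, as in this sketch. publishOutcome is a hypothetical helper, and "queue full" is a made-up message for illustration; the reference does not say when the broker sets success to false.

```go
package main

import (
	"errors"
	"fmt"
)

// publishOutcome folds the two failure channels into one error.
// rpcErr is the error returned by the gRPC call; success and errMsg
// correspond to the PublishResponse fields of the same names.
func publishOutcome(rpcErr error, success bool, errMsg string) error {
	if rpcErr != nil {
		return fmt.Errorf("publish RPC failed: %w", rpcErr)
	}
	if !success {
		if errMsg == "" {
			errMsg = "no error message provided"
		}
		return errors.New("publish rejected: " + errMsg)
	}
	return nil
}

func main() {
	fmt.Println(publishOutcome(nil, true, ""))
	fmt.Println(publishOutcome(nil, false, "queue full"))
}
```

With the generated client, the inputs would come from `resp, err := client.PublishTask(...)` followed by `resp.GetSuccess()` and `resp.GetError()`.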
SubscribeToTasksRequest
message SubscribeToTasksRequest {
string agent_id = 1; // Required: Agent ID to receive tasks for
repeated string task_types = 2; // Optional: Filter by task types
}
Filtering Behavior:
- If task_types is empty: Agent receives all tasks addressed to them
- If task_types is specified: Agent only receives tasks with matching task_type
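The filtering rule combines with the responder_agent_id routing described for TaskMessage. The sketch below shows how the two could be evaluated together. It illustrates the documented behavior only; shouldDeliver is a hypothetical function and not the broker's actual code.

```go
package main

import "fmt"

// shouldDeliver reports whether a task would reach a subscriber.
// agentID and subscribedTypes come from SubscribeToTasksRequest;
// responderID and taskType come from the TaskMessage.
func shouldDeliver(agentID string, subscribedTypes []string, responderID, taskType string) bool {
	// A task with a responder_agent_id goes only to that agent.
	if responderID != "" && responderID != agentID {
		return false
	}
	// An empty filter accepts every task type.
	if len(subscribedTypes) == 0 {
		return true
	}
	for _, t := range subscribedTypes {
		if t == taskType {
			return true
		}
	}
	return false
}

func main() {
	types := []string{"data_analysis", "data_transformation"}
	fmt.Println(shouldDeliver("data_processor_agent", types, "", "data_analysis"))
	fmt.Println(shouldDeliver("data_processor_agent", types, "", "image_processing"))
}
```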
SubscribeToTaskResultsRequest
message SubscribeToTaskResultsRequest {
string requester_agent_id = 1; // Required: Agent ID that requested tasks
repeated string task_ids = 2; // Optional: Filter by specific task IDs
}
Filtering Behavior:
- If task_ids is empty: Agent receives results for all tasks they requested
- If task_ids is specified: Agent only receives results for those specific tasks
API Operations
Publishing Operations
PublishTask
Publishes a new task for execution.
Go Example:
task := &pb.TaskMessage{
TaskId: "task_data_analysis_123",
TaskType: "data_analysis",
Parameters: parametersStruct,
RequesterAgentId: "analytics_coordinator",
ResponderAgentId: "data_processor_agent", // Optional
Priority: pb.Priority_PRIORITY_HIGH,
CreatedAt: timestamppb.Now(),
}
response, err := client.PublishTask(ctx, &pb.PublishTaskRequest{
Task: task,
})
Validation Rules:
- task_id must be non-empty and unique
- task_type must be non-empty
- requester_agent_id must be non-empty
- created_at must be set
Error Conditions:
- InvalidArgument: Required fields missing or invalid
- Internal: Server error during processing
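Checking the validation rules on the client before publishing avoids a round trip that would end in InvalidArgument. The following is a minimal sketch: taskFields is a local stand-in for the required TaskMessage fields, validateTask is a hypothetical helper, and apart from "task_id cannot be empty" the error strings are assumptions rather than the broker's wording. Uniqueness of task_id cannot be verified locally and is left out.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// taskFields is a local stand-in for the required TaskMessage fields.
type taskFields struct {
	TaskID           string
	TaskType         string
	RequesterAgentID string
	CreatedAt        time.Time
}

// validateTask applies the documented rules and returns the first violation.
func validateTask(t taskFields) error {
	switch {
	case t.TaskID == "":
		return errors.New("task_id cannot be empty")
	case t.TaskType == "":
		return errors.New("task_type cannot be empty")
	case t.RequesterAgentID == "":
		return errors.New("requester_agent_id cannot be empty")
	case t.CreatedAt.IsZero():
		return errors.New("created_at must be set")
	}
	return nil
}

// sampleTask returns a task that satisfies every rule.
func sampleTask() taskFields {
	return taskFields{
		TaskID:           "task_data_analysis_123",
		TaskType:         "data_analysis",
		RequesterAgentID: "analytics_coordinator",
		CreatedAt:        time.Now(),
	}
}

func main() {
	fmt.Println(validateTask(sampleTask()))
	fmt.Println(validateTask(taskFields{TaskID: "t1"}))
}
```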
PublishTaskResult
Publishes the result of task execution.
Go Example:
result := &pb.TaskResult{
TaskId: originalTask.GetTaskId(),
Status: pb.TaskStatus_TASK_STATUS_COMPLETED,
Result: resultStruct,
ExecutorAgentId: "data_processor_agent",
CompletedAt: timestamppb.Now(),
}
response, err := client.PublishTaskResult(ctx, &pb.PublishTaskResultRequest{
Result: result,
})
PublishTaskProgress
Publishes progress updates during task execution.
Go Example:
progress := &pb.TaskProgress{
TaskId: originalTask.GetTaskId(),
Status: pb.TaskStatus_TASK_STATUS_IN_PROGRESS,
ProgressMessage: "Processing batch 3 of 10",
ProgressPercentage: 30,
ExecutorAgentId: "data_processor_agent",
UpdatedAt: timestamppb.Now(),
}
response, err := client.PublishTaskProgress(ctx, &pb.PublishTaskProgressRequest{
Progress: progress,
})
Subscription Operations
SubscribeToTasks
Subscribes to receive tasks assigned to an agent.
Go Example:
req := &pb.SubscribeToTasksRequest{
AgentId: "data_processor_agent",
TaskTypes: []string{"data_analysis", "data_transformation"}, // Optional
}
stream, err := client.SubscribeToTasks(ctx, req)
if err != nil {
return err
}
for {
task, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
// Process task
go processTask(ctx, task, client)
}
Stream Behavior:
- Long-lived server-streaming RPC: the client sends one request and the broker streams messages back
- Messages are pushed immediately when available
- Stream closes when client disconnects or context is cancelled
- Automatic cleanup removes subscription when stream closes
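Since the stream also ends when the context is cancelled, a receive loop usually needs to tell a deliberate shutdown apart from a real failure. The sketch below shows one way to do that. The receiver interface and fakeStream are stand-ins so the example runs without the generated client, and string messages replace TaskMessage; all of these names are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// receiver is the part of a generated stream client this sketch relies on.
type receiver interface {
	Recv() (string, error)
}

// consume reads until the stream ends. A clean end (io.EOF) or an error
// caused by cancelling ctx returns nil; any other error is returned.
func consume(ctx context.Context, s receiver, handle func(string)) error {
	for {
		msg, err := s.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) || ctx.Err() != nil {
				return nil
			}
			return err
		}
		handle(msg)
	}
}

// fakeStream yields its messages in order, then io.EOF.
type fakeStream struct{ msgs []string }

func (f *fakeStream) Recv() (string, error) {
	if len(f.msgs) == 0 {
		return "", io.EOF
	}
	msg := f.msgs[0]
	f.msgs = f.msgs[1:]
	return msg, nil
}

// runDemo consumes a two-message fake stream and returns what was handled.
func runDemo() ([]string, error) {
	var seen []string
	stream := &fakeStream{msgs: []string{"task-1", "task-2"}}
	err := consume(context.Background(), stream, func(m string) {
		seen = append(seen, m)
	})
	return seen, err
}

func main() {
	seen, err := runDemo()
	fmt.Println(seen, err)
}
```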
SubscribeToTaskResults
Subscribes to receive results of tasks requested by an agent.
Go Example:
req := &pb.SubscribeToTaskResultsRequest{
RequesterAgentId: "analytics_coordinator",
TaskIds: []string{"task_123", "task_456"}, // Optional
}
stream, err := client.SubscribeToTaskResults(ctx, req)
if err != nil {
return err
}
for {
result, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
// Handle result
handleTaskResult(result)
}
SubscribeToTaskProgress
Subscribes to receive progress updates for tasks requested by an agent.
Go Example:
req := &pb.SubscribeToTaskResultsRequest{
RequesterAgentId: "analytics_coordinator",
}
stream, err := client.SubscribeToTaskProgress(ctx, req)
if err != nil {
return err
}
for {
progress, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
// Handle progress update
updateProgressUI(progress)
}
Error Handling
gRPC Status Codes
AgentHub uses standard gRPC status codes:
InvalidArgument (Code: 3)
Causes:
- Missing required fields in request
- Invalid field values (empty task_id, etc.)
- Malformed timestamps or enums
Example Response:
rpc error: code = InvalidArgument desc = task_id cannot be empty
Internal (Code: 13)
Causes:
- Server-side processing errors
- Message serialization failures
- Resource allocation failures
Example Response:
rpc error: code = Internal desc = failed to route task
Unavailable (Code: 14)
Causes:
- Broker server not running
- Network connectivity issues
- Server overload
Example Response:
rpc error: code = Unavailable desc = connection refused
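The three codes call for different client reactions. The sketch below captures one possible policy using the numeric values listed above. The policy itself is an assumption: InvalidArgument is never retried, Unavailable is retried, and Internal is retried on the guess that the failure may be transient. In real code the numeric code would come from status.Code(err) in google.golang.org/grpc/status.

```go
package main

import "fmt"

// Numeric gRPC status codes listed in this section.
const (
	codeInvalidArgument = 3
	codeInternal        = 13
	codeUnavailable     = 14
)

// isRetryable encodes an assumed retry policy, not AgentHub-defined behavior.
func isRetryable(code int) bool {
	switch code {
	case codeUnavailable, codeInternal:
		return true // may succeed once the broker or network recovers
	case codeInvalidArgument:
		return false // the same request will fail the same way
	default:
		return false // be conservative with codes not covered here
	}
}

func main() {
	for _, c := range []int{codeInvalidArgument, codeInternal, codeUnavailable} {
		fmt.Printf("code %d retryable: %v\n", c, isRetryable(c))
	}
}
```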
Error Recovery Patterns
Retry Logic
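// Requires "context", "time", "google.golang.org/grpc/codes" and "google.golang.org/grpc/status"
func publishTaskWithRetry(ctx context.Context, client pb.EventBusClient, task *pb.TaskMessage) error {
	const maxAttempts = 3
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		_, err := client.PublishTask(ctx, &pb.PublishTaskRequest{Task: task})
		if err == nil {
			return nil
		}
		lastErr = err
		// Validation errors fail the same way every time, so don't retry them
		if status.Code(err) == codes.InvalidArgument {
			return err
		}
		if attempt == maxAttempts-1 {
			break // No point waiting after the final attempt
		}
		// Exponential backoff (1s, then 2s), abandoned early if ctx is cancelled
		select {
		case <-time.After(time.Duration(1<<attempt) * time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return lastErr
}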
Stream Reconnection
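func subscribeWithReconnect(ctx context.Context, client pb.EventBusClient, agentID string) {
	for {
		// subscribeToTasks blocks until the stream ends or fails
		err := subscribeToTasks(ctx, client, agentID)
		if ctx.Err() != nil {
			return // Context cancelled
		}
		log.Printf("Subscription failed: %v, reconnecting in 5s...", err)
		// Wait before reconnecting, but stop immediately on cancellation
		select {
		case <-time.After(5 * time.Second):
		case <-ctx.Done():
			return
		}
	}
}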
Performance Considerations
Message Size Limits
- Maximum message size: 4MB (gRPC default limit for received messages)
- Recommended message size: <100KB for optimal performance
- Large payloads: Consider using external storage with references
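The external-storage suggestion can be sketched as a size check that swaps a large payload for a reference. Everything here is illustrative: inlineOrReference, the store callback and the mem:// reference scheme are hypothetical, and the threshold simply follows the <100KB recommendation. The check covers the payload bytes only, not the full encoded message.

```go
package main

import "fmt"

// recommendedLimit follows the <100KB guidance above.
const recommendedLimit = 100 * 1024

// inlineOrReference returns the payload itself when it is under the limit.
// Otherwise it hands the payload to store and returns the reference that
// store produced, so the message can carry the reference instead.
func inlineOrReference(payload []byte, limit int, store func([]byte) (string, error)) ([]byte, string, error) {
	if len(payload) < limit {
		return payload, "", nil
	}
	ref, err := store(payload)
	if err != nil {
		return nil, "", fmt.Errorf("storing large payload: %w", err)
	}
	return nil, ref, nil
}

func main() {
	// In-memory stand-in for real external storage.
	blobs := map[string][]byte{}
	store := func(b []byte) (string, error) {
		ref := fmt.Sprintf("mem://blob/%d", len(blobs)+1)
		blobs[ref] = b
		return ref, nil
	}
	_, ref, err := inlineOrReference(make([]byte, 200*1024), recommendedLimit, store)
	if err != nil {
		panic(err)
	}
	fmt.Println("stored as", ref)
}
```

The reference would then travel in the task's parameters or metadata, and the receiving agent would fetch the payload from the same store.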
Throughput Optimization
Batch Operations
For high-throughput scenarios, consider batching multiple operations:
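// Instead of individual calls
for _, task := range tasks {
	client.PublishTask(ctx, &pb.PublishTaskRequest{Task: task})
}
// Consider client-side batching
// (publishBatch is an application-defined helper that this reference does not define;
// it must finish with the slice before returning, because the backing array is reused)
taskBatch := make([]*pb.TaskMessage, 0, 100)
for _, task := range tasks {
	taskBatch = append(taskBatch, task)
	if len(taskBatch) >= 100 {
		publishBatch(ctx, client, taskBatch)
		taskBatch = taskBatch[:0]
	}
}
// Flush the final partial batch
if len(taskBatch) > 0 {
	publishBatch(ctx, client, taskBatch)
}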
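Client-side batching is easy to get subtly wrong when the number of tasks is not a multiple of the batch size. A generic chunking helper, sketched below, returns the trailing partial batch along with the full ones. chunk is a hypothetical helper that requires Go 1.18 or later; how each batch is then sent is left to the application.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// The returned slices share memory with items; no elements are copied.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	ids := []string{"t1", "t2", "t3", "t4", "t5"}
	for i, batch := range chunk(ids, 2) {
		fmt.Println(i, batch)
	}
}
```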
Connection Reuse
Reuse gRPC connections for better performance:
// Single connection for multiple operations
conn, err := grpc.Dial(address, opts...)
if err != nil {
return err
}
defer conn.Close()
client := pb.NewEventBusClient(conn)
// Use client for multiple operations
Memory Management
Struct Reuse
Reuse message structs to reduce garbage collection:
var taskPool = sync.Pool{
New: func() interface{} {
return &pb.TaskMessage{}
},
}
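func publishTask(params TaskParams) {
	task := taskPool.Get().(*pb.TaskMessage)
	// Return the message to the pool only once publishing has finished with it
	defer taskPool.Put(task)
	// Clear state left by the previous user, then populate in place.
	// Assigning a whole pb.TaskMessage value would copy the message's internal state, which go vet flags.
	task.Reset()
	task.TaskId = params.ID
	task.TaskType = params.Type
	// ... other fields
	// Publish task
}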
This completes the comprehensive API reference for AgentHub. All message types, operations, and integration patterns are documented with practical examples and error handling guidance.