This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Agent2Agent Protocol

Learn how to work with Agent2Agent (A2A) protocol components including messages, conversation contexts, artifacts, and task lifecycle management.

Agent2Agent Protocol How-To Guides

This section provides practical guides for working with the Agent2Agent (A2A) protocol in AgentHub. These guides show you how to implement A2A-compliant communication patterns for building robust agent systems.

Available Guides

Working with A2A Messages

Learn how to create, structure, and process A2A messages with text, data, and file content parts. This is the foundation for all A2A communication.

Working with A2A Conversation Context

Understand how to manage conversation contexts for multi-turn interactions, workflow coordination, and state preservation across agent communications.

Working with A2A Artifacts

Master the creation and handling of A2A artifacts - structured outputs that deliver rich results from completed tasks.

Working with A2A Task Lifecycle

Learn how to manage the complete task lifecycle from creation through completion, including state transitions, progress updates, and error handling.

A2A Protocol Benefits

The Agent2Agent protocol provides:

  • Structured Communication: Standardized message formats with rich content types
  • Conversation Threading: Context-aware message grouping for complex workflows
  • Rich Artifacts: Structured outputs with multiple content types
  • Lifecycle Management: Complete task state tracking from submission to completion
  • Interoperability: Standards-based communication for multi-vendor agent systems

Prerequisites

Before following these guides:

  1. Complete the Installation and Setup tutorial
  2. Run the AgentHub Demo to see A2A in action
  3. Understand the Agent2Agent Principle

Implementation Approach

These guides use AgentHub’s unified abstractions from internal/agenthub which provide:

  • A2ATaskPublisher: Simplified A2A task creation and publishing
  • A2ATaskSubscriber: Streamlined A2A task processing and response generation
  • Automatic Observability: Built-in tracing, metrics, and logging
  • Environment Configuration: Zero-config setup with environment variables

Start with the A2A Messages guide to learn the fundamentals, then progress through the other guides to build complete A2A-compliant agent systems.

1 - How to Work with A2A Messages

Learn how to create, structure, and work with Agent2Agent protocol messages including text, data, and file parts.

How to Work with A2A Messages

This guide shows you how to create and work with Agent2Agent (A2A) protocol messages using AgentHub’s unified abstractions. A2A messages are the foundation of all agent communication.

Understanding A2A Message Structure

A2A messages consist of several key components:

  • Message ID: Unique identifier for the message
  • Context ID: Groups related messages in a conversation
  • Task ID: Links the message to a specific task
  • Role: Indicates if the message is from USER (requester) or AGENT (responder)
  • Content Parts: The actual message content (text, data, or files)
  • Metadata: Additional context for routing and processing

Creating Basic A2A Messages

Text Messages

Create a simple text message:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/timestamppb"
)

func createTextMessage() *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: "conversation_greeting",
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Hello! Please process this greeting request.",
                },
            },
        },
        Metadata: nil, // Optional
    }
}

Data Messages

Include structured data in your message:

import (
    "google.golang.org/protobuf/types/known/structpb"
)

func createDataMessage() *pb.Message {
    // Create structured data
    data, err := structpb.NewStruct(map[string]interface{}{
        "operation": "calculate",
        "numbers":   []float64{10, 20, 30},
        "formula":   "sum",
        "precision": 2,
    })
    if err != nil {
        log.Fatal(err)
    }

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: "conversation_math",
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please perform the calculation described in the data.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        data,
                        Description: "Calculation parameters",
                    },
                },
            },
        },
    }
}

File Reference Messages

Reference files in your messages:

func createFileMessage() *pb.Message {
    // Create file metadata
    fileMetadata, _ := structpb.NewStruct(map[string]interface{}{
        "source":      "user_upload",
        "category":    "image",
        "permissions": "read-only",
    })

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: "conversation_image_analysis",
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please analyze the uploaded image.",
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:   "file_abc123",
                        Filename: "analysis_target.jpg",
                        MimeType: "image/jpeg",
                        SizeBytes: 2048576, // 2MB
                        Metadata:  fileMetadata,
                    },
                },
            },
        },
    }
}

Working with Mixed Content

Combine multiple part types in a single message:

func createMixedContentMessage() *pb.Message {
    // Configuration data
    config, _ := structpb.NewStruct(map[string]interface{}{
        "format":     "json",
        "output_dir": "/results",
        "compress":   true,
    })

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: "conversation_data_processing",
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Process the dataset with the following configuration and source file.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        config,
                        Description: "Processing configuration",
                    },
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:   "dataset_xyz789",
                        Filename: "raw_data.csv",
                        MimeType: "text/csv",
                        SizeBytes: 5242880, // 5MB
                    },
                },
            },
        },
    }
}

Publishing A2A Messages

Use AgentHub’s unified abstractions to publish messages:

package main

import (
    "context"
    "log"

    "github.com/owulveryck/agenthub/internal/agenthub"
    pb "github.com/owulveryck/agenthub/events/eventbus"
)

func publishA2AMessage(ctx context.Context) error {
    // Create AgentHub client
    config := agenthub.NewGRPCConfig("message_publisher")
    client, err := agenthub.NewAgentHubClient(config)
    if err != nil {
        return err
    }
    defer client.Close()

    // Create A2A message
    message := createTextMessage()

    // Publish using AgentHub client
    response, err := client.Client.PublishMessage(ctx, &pb.PublishMessageRequest{
        Message: message,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: "message_publisher",
            ToAgentId:   "message_processor",
            EventType:   "a2a.message",
            Priority:    pb.Priority_PRIORITY_MEDIUM,
        },
    })

    if err != nil {
        return err
    }

    log.Printf("A2A message published: %s", response.GetEventId())
    return nil
}

Processing Received A2A Messages

Handle incoming A2A messages in your agent:

func processA2AMessage(ctx context.Context, message *pb.Message) (string, error) {
    var response string

    // Process each content part
    for i, part := range message.GetContent() {
        switch content := part.GetPart().(type) {
        case *pb.Part_Text:
            log.Printf("Text part %d: %s", i, content.Text)
            response += fmt.Sprintf("Processed text: %s\n", content.Text)

        case *pb.Part_Data:
            log.Printf("Data part %d: %s", i, content.Data.GetDescription())
            // Process structured data
            data := content.Data.GetData()
            response += fmt.Sprintf("Processed data: %s\n", content.Data.GetDescription())

            // Access specific fields
            if operation, ok := data.GetFields()["operation"]; ok {
                log.Printf("Operation: %s", operation.GetStringValue())
            }

        case *pb.Part_File:
            log.Printf("File part %d: %s (%s)", i, content.File.GetFilename(), content.File.GetMimeType())
            response += fmt.Sprintf("Processed file: %s\n", content.File.GetFilename())

            // Handle file processing based on MIME type
            switch content.File.GetMimeType() {
            case "image/jpeg", "image/png":
                // Process image
                response += "Image analysis completed\n"
            case "text/csv":
                // Process CSV data
                response += "CSV data parsed\n"
            }
        }
    }

    return response, nil
}

Message Role Management

Properly set message roles for A2A compliance:

// User message (requesting work)
func createUserMessage(content string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{Text: content},
            },
        },
    }
}

// Agent response message
func createAgentResponse(contextId, taskId, response string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextId,
        TaskId:    taskId,
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{Text: response},
            },
        },
    }
}

Message Validation

Validate A2A messages before publishing:

func validateA2AMessage(message *pb.Message) error {
    if message.GetMessageId() == "" {
        return fmt.Errorf("message_id is required")
    }

    if message.GetRole() == pb.Role_ROLE_UNSPECIFIED {
        return fmt.Errorf("role must be specified (USER or AGENT)")
    }

    if len(message.GetContent()) == 0 {
        return fmt.Errorf("message must have at least one content part")
    }

    // Validate each part
    for i, part := range message.GetContent() {
        if part.GetPart() == nil {
            return fmt.Errorf("content part %d is empty", i)
        }
    }

    return nil
}

Best Practices

1. Always Use Unique Message IDs

messageID := fmt.Sprintf("msg_%d_%s", time.Now().Unix(), uuid.New().String())
contextID := fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())

3. Include Descriptive Metadata for Complex Data

dataPart := &pb.DataPart{
    Data:        structData,
    Description: "User preferences for recommendation engine",
}

4. Validate Messages Before Publishing

if err := validateA2AMessage(message); err != nil {
    return fmt.Errorf("invalid A2A message: %w", err)
}

5. Handle All Part Types in Message Processors

switch content := part.GetPart().(type) {
case *pb.Part_Text:
    // Handle text
case *pb.Part_Data:
    // Handle structured data
case *pb.Part_File:
    // Handle file references
default:
    log.Printf("Unknown part type: %T", content)
}

This guide covered the fundamentals of working with A2A messages. Next, learn about A2A Conversation Context to group related messages and maintain conversation state across multiple interactions.

2 - How to Work with A2A Conversation Context

Learn how to manage conversation contexts in Agent2Agent protocol for multi-turn interactions and workflow coordination.

How to Work with A2A Conversation Context

This guide shows you how to use A2A conversation contexts to group related messages, maintain state across interactions, and coordinate multi-agent workflows.

Understanding A2A Conversation Context

A2A conversation context is identified by a context_id that groups related messages and tasks. This enables:

  • Multi-turn conversations between agents
  • Workflow coordination across multiple tasks
  • State preservation throughout long-running processes
  • Message threading for audit trails
  • Context-aware routing based on conversation history

Creating Conversation Contexts

Simple Conversation Context

Start a basic conversation context:

package main

import (
    "fmt"
    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
)

func createConversationContext(workflowType string) string {
    return fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())
}

func startConversation() *pb.Message {
    contextID := createConversationContext("user_onboarding")

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please start the user onboarding process for new user.",
                },
            },
        },
    }
}

Workflow-Specific Contexts

Create contexts for different workflow types:

func createWorkflowContexts() map[string]string {
    return map[string]string{
        "data_analysis":    createConversationContext("data_analysis"),
        "image_processing": createConversationContext("image_processing"),
        "user_support":     createConversationContext("user_support"),
        "integration_test": createConversationContext("integration_test"),
    }
}

Multi-Turn Conversations

Conversation Initiation

Start a conversation with initial context:

import (
    "google.golang.org/protobuf/types/known/structpb"
)

func initiateDataAnalysisConversation() *pb.Message {
    contextID := createConversationContext("data_analysis")

    // Initial conversation metadata
    contextMetadata, _ := structpb.NewStruct(map[string]interface{}{
        "workflow_type":    "data_analysis",
        "initiated_by":     "user_12345",
        "priority":         "high",
        "expected_steps":   []string{"validation", "processing", "analysis", "report"},
        "timeout_minutes":  30,
    })

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please analyze the uploaded dataset and provide insights.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        contextMetadata,
                        Description: "Conversation context and workflow parameters",
                    },
                },
            },
        },
        Metadata: contextMetadata,
    }
}

Continuing the Conversation

Add follow-up messages to the same context:

func continueConversation(contextID, previousMessageID string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID, // Same context as initial message
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Can you also include trend analysis in the report?",
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "follows_message": structpb.NewStringValue(previousMessageID),
                "conversation_turn": structpb.NewNumberValue(2),
            },
        },
    }
}

Agent Responses in Context

Agents respond within the same conversation context:

func createAgentResponse(contextID, requestMessageID, response string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID, // Same context as request
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: response,
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "responding_to": structpb.NewStringValue(requestMessageID),
                "agent_id":      structpb.NewStringValue("data_analysis_agent"),
            },
        },
    }
}

Context-Aware Task Management

Creating Tasks with Context

Link tasks to conversation contexts:

import (
    "google.golang.org/protobuf/types/known/timestamppb"
)

func createContextAwareTask(contextID string) *pb.Task {
    taskID := fmt.Sprintf("task_%s_%s", "analysis", uuid.New().String())

    return &pb.Task{
        Id:        taskID,
        ContextId: contextID, // Link to conversation
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: contextID,
                TaskId:    taskID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Task submitted for data analysis workflow",
                        },
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
        History: []*pb.Message{}, // Will be populated during processing
        Artifacts: []*pb.Artifact{}, // Will be populated on completion
    }
}

Context-Based Task Querying

Retrieve all tasks for a conversation context:

func getTasksForContext(ctx context.Context, client pb.AgentHubClient, contextID string) ([]*pb.Task, error) {
    response, err := client.ListTasks(ctx, &pb.ListTasksRequest{
        ContextId: contextID,
        Limit:     100,
    })
    if err != nil {
        return nil, err
    }

    return response.GetTasks(), nil
}

Workflow Coordination

Multi-Agent Workflow with Shared Context

Coordinate multiple agents within a single conversation:

type WorkflowCoordinator struct {
    client    pb.AgentHubClient
    contextID string
    logger    *log.Logger
}

func (wc *WorkflowCoordinator) ExecuteDataPipeline(ctx context.Context) error {
    // Step 1: Data Validation
    validationTask := &pb.Task{
        Id:        fmt.Sprintf("task_validation_%s", uuid.New().String()),
        ContextId: wc.contextID,
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: wc.contextID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Validate uploaded dataset for quality and completeness",
                        },
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
    }

    // Publish validation task
    _, err := wc.client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
        Task: validationTask,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: "workflow_coordinator",
            ToAgentId:   "data_validator",
            EventType:   "task.validation",
            Priority:    pb.Priority_PRIORITY_HIGH,
        },
    })
    if err != nil {
        return err
    }

    // Step 2: Data Processing (after validation)
    processingTask := &pb.Task{
        Id:        fmt.Sprintf("task_processing_%s", uuid.New().String()),
        ContextId: wc.contextID, // Same context
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: wc.contextID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Process validated dataset and extract features",
                        },
                    },
                },
                Metadata: &structpb.Struct{
                    Fields: map[string]*structpb.Value{
                        "depends_on": structpb.NewStringValue(validationTask.GetId()),
                        "workflow_step": structpb.NewNumberValue(2),
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
    }

    // Publish processing task
    _, err = wc.client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
        Task: processingTask,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: "workflow_coordinator",
            ToAgentId:   "data_processor",
            EventType:   "task.processing",
            Priority:    pb.Priority_PRIORITY_MEDIUM,
        },
    })

    return err
}

Context State Management

Tracking Conversation State

Maintain state throughout the conversation:

type ConversationState struct {
    ContextID     string                 `json:"context_id"`
    WorkflowType  string                 `json:"workflow_type"`
    CurrentStep   int                    `json:"current_step"`
    TotalSteps    int                    `json:"total_steps"`
    CompletedTasks []string              `json:"completed_tasks"`
    PendingTasks   []string              `json:"pending_tasks"`
    Variables      map[string]interface{} `json:"variables"`
    CreatedAt      time.Time             `json:"created_at"`
    UpdatedAt      time.Time             `json:"updated_at"`
}

func (cs *ConversationState) ToMetadata() (*structpb.Struct, error) {
    data := map[string]interface{}{
        "context_id":      cs.ContextID,
        "workflow_type":   cs.WorkflowType,
        "current_step":    cs.CurrentStep,
        "total_steps":     cs.TotalSteps,
        "completed_tasks": cs.CompletedTasks,
        "pending_tasks":   cs.PendingTasks,
        "variables":       cs.Variables,
        "updated_at":      cs.UpdatedAt.Format(time.RFC3339),
    }

    return structpb.NewStruct(data)
}

func (cs *ConversationState) UpdateFromMessage(message *pb.Message) {
    cs.UpdatedAt = time.Now()

    // Extract state updates from message metadata
    if metadata := message.GetMetadata(); metadata != nil {
        if step, ok := metadata.GetFields()["current_step"]; ok {
            cs.CurrentStep = int(step.GetNumberValue())
        }

        if vars, ok := metadata.GetFields()["variables"]; ok {
            if varsStruct := vars.GetStructValue(); varsStruct != nil {
                for key, value := range varsStruct.GetFields() {
                    cs.Variables[key] = value
                }
            }
        }
    }
}

State-Aware Message Creation

Include conversation state in messages:

func createStateAwareMessage(contextID string, state *ConversationState, content string) *pb.Message {
    stateMetadata, _ := state.ToMetadata()

    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: content,
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        stateMetadata,
                        Description: "Current conversation state",
                    },
                },
            },
        },
        Metadata: stateMetadata,
    }
}

Context-Based Routing

Route Messages Based on Context

Use conversation context for intelligent routing:

func routeByContext(contextID string) *pb.AgentEventMetadata {
    // Determine routing based on context type
    var targetAgent string
    var eventType string

    if strings.Contains(contextID, "data_analysis") {
        targetAgent = "data_analysis_agent"
        eventType = "data.analysis"
    } else if strings.Contains(contextID, "image_processing") {
        targetAgent = "image_processor"
        eventType = "image.processing"
    } else if strings.Contains(contextID, "user_support") {
        targetAgent = "support_agent"
        eventType = "support.request"
    } else {
        targetAgent = "" // Broadcast to all agents
        eventType = "general.message"
    }

    return &pb.AgentEventMetadata{
        FromAgentId:   "context_router",
        ToAgentId:     targetAgent,
        EventType:     eventType,
        Subscriptions: []string{eventType},
        Priority:      pb.Priority_PRIORITY_MEDIUM,
    }
}

Subscribe to Context-Specific Events

Agents can subscribe to specific conversation contexts:

func subscribeToContextEvents(ctx context.Context, client pb.AgentHubClient, agentID, contextPattern string) error {
    stream, err := client.SubscribeToMessages(ctx, &pb.SubscribeToMessagesRequest{
        AgentId: agentID,
        ContextPattern: contextPattern, // e.g., "ctx_data_analysis_*"
    })
    if err != nil {
        return err
    }

    for {
        event, err := stream.Recv()
        if err != nil {
            return err
        }

        if message := event.GetMessage(); message != nil {
            log.Printf("Received context message: %s in context: %s",
                message.GetMessageId(), message.GetContextId())

            // Process message within context
            processContextMessage(ctx, message)
        }
    }
}

Best Practices

1. Use Descriptive Context IDs

contextID := fmt.Sprintf("ctx_%s_%s_%s", workflowType, userID, uuid.New().String())
// All messages in the same workflow should use the same context_id
message.ContextId = existingContextID

3. Include Context Metadata for State Tracking

contextMetadata := map[string]interface{}{
    "workflow_type":   "data_pipeline",
    "initiated_by":    userID,
    "current_step":    stepNumber,
    "total_steps":     totalSteps,
}

4. Use Context for Task Dependencies

taskMetadata := map[string]interface{}{
    "context_id":     contextID,
    "depends_on":     previousTaskID,
    "workflow_step":  stepNumber,
}

5. Handle Context Cleanup

// Set context expiration for long-running workflows
contextMetadata["expires_at"] = time.Now().Add(24 * time.Hour).Format(time.RFC3339)

This guide covered conversation context management in A2A protocol. Next, learn about Working with A2A Artifacts to understand how to create and manage structured outputs from completed tasks.

3 - How to Work with A2A Artifacts

Learn how to create, structure, and deliver Agent2Agent protocol artifacts as structured outputs from completed tasks.

How to Work with A2A Artifacts

This guide shows you how to create and work with Agent2Agent (A2A) protocol artifacts, which are structured outputs delivered when tasks are completed. Artifacts provide rich, typed results that can include text reports, data files, structured data, and more.

Understanding A2A Artifacts

A2A artifacts are structured containers for task outputs that include:

  • Artifact ID: Unique identifier for the artifact
  • Name: Human-readable name for the artifact
  • Description: Explanation of what the artifact contains
  • Parts: The actual content (text, data, files)
  • Metadata: Additional context about the artifact

Artifacts are typically generated when tasks reach TASK_STATE_COMPLETED status.

Creating Basic Artifacts

Text Report Artifacts

Create simple text-based results:

package main

import (
    "fmt"
    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/structpb"
)

func createTextReportArtifact(taskID, reportContent string) *pb.Artifact {
    return &pb.Artifact{
        ArtifactId:  fmt.Sprintf("artifact_%s_%s", taskID, uuid.New().String()),
        Name:        "Analysis Report",
        Description: "Detailed analysis results and recommendations",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: reportContent,
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "artifact_type": structpb.NewStringValue("report"),
                "format":        structpb.NewStringValue("text"),
                "task_id":       structpb.NewStringValue(taskID),
                "generated_at":  structpb.NewStringValue(time.Now().Format(time.RFC3339)),
            },
        },
    }
}

Data Analysis Artifacts

Create artifacts with structured analysis results:

func createDataAnalysisArtifact(taskID string, results map[string]interface{}) *pb.Artifact {
    // Convert results to structured data
    resultsData, err := structpb.NewStruct(results)
    if err != nil {
        log.Printf("Error creating results data: %v", err)
        resultsData = &structpb.Struct{}
    }

    // Create summary statistics
    summary := map[string]interface{}{
        "total_records":    results["record_count"],
        "processing_time":  results["duration_ms"],
        "success_rate":     results["success_percentage"],
        "anomalies_found":  results["anomaly_count"],
    }
    summaryData, _ := structpb.NewStruct(summary)

    return &pb.Artifact{
        ArtifactId:  fmt.Sprintf("artifact_analysis_%s", uuid.New().String()),
        Name:        "Data Analysis Results",
        Description: "Complete analysis results with statistics and insights",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Data analysis completed successfully. See attached results for detailed findings.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        resultsData,
                        Description: "Complete analysis results",
                    },
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        summaryData,
                        Description: "Summary statistics",
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "artifact_type":   structpb.NewStringValue("analysis"),
                "analysis_type":   structpb.NewStringValue("statistical"),
                "data_source":     structpb.NewStringValue(results["source"].(string)),
                "record_count":    structpb.NewNumberValue(results["record_count"].(float64)),
                "processing_time": structpb.NewNumberValue(results["duration_ms"].(float64)),
            },
        },
    }
}

File-Based Artifacts

Create artifacts that reference generated files:

func createFileArtifact(taskID, fileID, filename, mimeType string, sizeBytes int64) *pb.Artifact {
    // File metadata
    fileMetadata, _ := structpb.NewStruct(map[string]interface{}{
        "generated_by":   "data_processor_v1.2",
        "file_version":   "1.0",
        "encoding":       "utf-8",
        "compression":    "gzip",
        "checksum_sha256": "abc123...", // Calculate actual checksum
    })

    return &pb.Artifact{
        ArtifactId:  fmt.Sprintf("artifact_file_%s", uuid.New().String()),
        Name:        "Processed Dataset",
        Description: "Cleaned and processed dataset ready for analysis",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Dataset processing completed. Generated file: %s", filename),
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:   fileID,
                        Filename: filename,
                        MimeType: mimeType,
                        SizeBytes: sizeBytes,
                        Metadata:  fileMetadata,
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "artifact_type":    structpb.NewStringValue("file"),
                "file_type":        structpb.NewStringValue("dataset"),
                "original_task":    structpb.NewStringValue(taskID),
                "processing_stage": structpb.NewStringValue("cleaned"),
            },
        },
    }
}

Complex Multi-Part Artifacts

Complete Analysis Package

Create comprehensive artifacts with multiple content types:

func createCompleteAnalysisArtifact(taskID string, analysisResults map[string]interface{}) *pb.Artifact {
    // Executive summary
    summary := fmt.Sprintf(`
Analysis Complete: %s

Key Findings:
- Processed %v records
- Found %v anomalies
- Success rate: %v%%
- Processing time: %v ms

Recommendations:
%s
`,
        analysisResults["dataset_name"],
        analysisResults["record_count"],
        analysisResults["anomaly_count"],
        analysisResults["success_percentage"],
        analysisResults["duration_ms"],
        analysisResults["recommendations"],
    )

    // Detailed results data
    detailedResults, _ := structpb.NewStruct(analysisResults)

    // Configuration used
    configData, _ := structpb.NewStruct(map[string]interface{}{
        "algorithm":         "statistical_analysis_v2",
        "confidence_level":  0.95,
        "outlier_threshold": 2.5,
        "normalization":     "z-score",
    })

    return &pb.Artifact{
        ArtifactId:  fmt.Sprintf("artifact_complete_%s", uuid.New().String()),
        Name:        "Complete Analysis Package",
        Description: "Full analysis results including summary, data, configuration, and generated files",
        Parts: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: summary,
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        detailedResults,
                        Description: "Detailed analysis results and metrics",
                    },
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        configData,
                        Description: "Analysis configuration parameters",
                    },
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:   "results_visualization_123",
                        Filename: "analysis_charts.png",
                        MimeType: "image/png",
                        SizeBytes: 1024000,
                    },
                },
            },
            {
                Part: &pb.Part_File{
                    File: &pb.FilePart{
                        FileId:   "results_dataset_456",
                        Filename: "processed_data.csv",
                        MimeType: "text/csv",
                        SizeBytes: 5120000,
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "artifact_type":     structpb.NewStringValue("complete_package"),
                "analysis_type":     structpb.NewStringValue("comprehensive"),
                "includes_files":    structpb.NewBoolValue(true),
                "includes_data":     structpb.NewBoolValue(true),
                "includes_summary":  structpb.NewBoolValue(true),
                "file_count":        structpb.NewNumberValue(2),
                "total_size_bytes":  structpb.NewNumberValue(6144000),
            },
        },
    }
}

Publishing Artifacts

Using A2A Task Completion

Publish artifacts when completing tasks:

import (
    "context"
    "github.com/owulveryck/agenthub/internal/agenthub"
    eventbus "github.com/owulveryck/agenthub/events/eventbus"
)

func completeTaskWithArtifact(ctx context.Context, client eventbus.AgentHubClient, task *pb.Task, artifact *pb.Artifact) error {
    // Update task status to completed
    task.Status = &pb.TaskStatus{
        State: pb.TaskState_TASK_STATE_COMPLETED,
        Update: &pb.Message{
            MessageId: fmt.Sprintf("msg_completion_%s", uuid.New().String()),
            ContextId: task.GetContextId(),
            TaskId:    task.GetId(),
            Role:      pb.Role_AGENT,
            Content: []*pb.Part{
                {
                    Part: &pb.Part_Text{
                        Text: "Task completed successfully. Artifact has been generated.",
                    },
                },
            },
        },
        Timestamp: timestamppb.Now(),
    }

    // Add artifact to task
    task.Artifacts = append(task.Artifacts, artifact)

    // Publish task completion
    _, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
        Task: task,
        Routing: &eventbus.AgentEventMetadata{
            FromAgentId: "processing_agent",
            ToAgentId:   "", // Broadcast completion
            EventType:   "task.completed",
            Priority:    eventbus.Priority_PRIORITY_MEDIUM,
        },
    })

    if err != nil {
        return fmt.Errorf("failed to publish task completion: %w", err)
    }

    // Separately publish artifact update
    return publishArtifactUpdate(ctx, client, task.GetId(), artifact)
}

func publishArtifactUpdate(ctx context.Context, client eventbus.AgentHubClient, taskID string, artifact *pb.Artifact) error {
    _, err := client.PublishTaskArtifact(ctx, &eventbus.PublishTaskArtifactRequest{
        TaskId:   taskID,
        Artifact: artifact,
        Routing: &eventbus.AgentEventMetadata{
            FromAgentId: "processing_agent",
            ToAgentId:   "", // Broadcast to interested parties
            EventType:   "artifact.created",
            Priority:    eventbus.Priority_PRIORITY_LOW,
        },
    })

    return err
}

Using A2A Abstractions

Use AgentHub’s simplified artifact publishing:

func completeTaskWithA2AArtifact(ctx context.Context, subscriber *agenthub.A2ATaskSubscriber, task *pb.Task, artifact *pb.Artifact) error {
    return subscriber.CompleteA2ATaskWithArtifact(ctx, task, artifact)
}

Processing Received Artifacts

Artifact Event Handling

Handle incoming artifact notifications:

func handleArtifactEvents(ctx context.Context, client eventbus.AgentHubClient, agentID string) error {
    stream, err := client.SubscribeToAgentEvents(ctx, &eventbus.SubscribeToAgentEventsRequest{
        AgentId: agentID,
        EventTypes: []string{"artifact.created", "task.completed"},
    })
    if err != nil {
        return err
    }

    for {
        event, err := stream.Recv()
        if err != nil {
            return err
        }

        switch payload := event.GetPayload().(type) {
        case *eventbus.AgentEvent_ArtifactUpdate:
            artifactEvent := payload.ArtifactUpdate
            log.Printf("Received artifact: %s for task: %s",
                artifactEvent.GetArtifact().GetArtifactId(),
                artifactEvent.GetTaskId())

            // Process the artifact
            err := processArtifact(ctx, artifactEvent.GetArtifact())
            if err != nil {
                log.Printf("Error processing artifact: %v", err)
            }

        case *eventbus.AgentEvent_Task:
            task := payload.Task
            if task.GetStatus().GetState() == pb.TaskState_TASK_STATE_COMPLETED {
                // Process completed task artifacts
                for _, artifact := range task.GetArtifacts() {
                    err := processArtifact(ctx, artifact)
                    if err != nil {
                        log.Printf("Error processing task artifact: %v", err)
                    }
                }
            }
        }
    }
}

Artifact Content Processing

Process different types of artifact content:

func processArtifact(ctx context.Context, artifact *pb.Artifact) error {
    log.Printf("Processing artifact: %s - %s", artifact.GetName(), artifact.GetDescription())

    for i, part := range artifact.GetParts() {
        switch content := part.GetPart().(type) {
        case *pb.Part_Text:
            log.Printf("Text part %d: Processing text content (%d chars)", i, len(content.Text))
            // Process text content
            err := processTextArtifact(content.Text)
            if err != nil {
                return fmt.Errorf("failed to process text part: %w", err)
            }

        case *pb.Part_Data:
            log.Printf("Data part %d: Processing structured data (%s)", i, content.Data.GetDescription())
            // Process structured data
            err := processDataArtifact(content.Data.GetData())
            if err != nil {
                return fmt.Errorf("failed to process data part: %w", err)
            }

        case *pb.Part_File:
            log.Printf("File part %d: Processing file %s (%s, %d bytes)",
                i, content.File.GetFilename(), content.File.GetMimeType(), content.File.GetSizeBytes())
            // Process file reference
            err := processFileArtifact(ctx, content.File)
            if err != nil {
                return fmt.Errorf("failed to process file part: %w", err)
            }
        }
    }

    return nil
}

func processTextArtifact(text string) error {
    // Extract insights, save to database, etc.
    log.Printf("Extracting insights from text artifact...")
    return nil
}

func processDataArtifact(data *structpb.Struct) error {
    // Parse structured data, update metrics, etc.
    log.Printf("Processing structured data artifact...")

    // Access specific fields
    if recordCount, ok := data.GetFields()["record_count"]; ok {
        log.Printf("Records processed: %v", recordCount.GetNumberValue())
    }

    return nil
}

func processFileArtifact(ctx context.Context, file *pb.FilePart) error {
    // Download file, process content, etc.
    log.Printf("Processing file artifact: %s", file.GetFileId())

    // Handle different file types
    switch file.GetMimeType() {
    case "text/csv":
        return processCSVFile(ctx, file.GetFileId())
    case "image/png", "image/jpeg":
        return processImageFile(ctx, file.GetFileId())
    case "application/json":
        return processJSONFile(ctx, file.GetFileId())
    default:
        log.Printf("Unknown file type: %s", file.GetMimeType())
    }

    return nil
}

Artifact Chaining

Using Artifacts as Inputs

Use artifacts from one task as inputs to another:

func chainArtifactProcessing(ctx context.Context, client eventbus.AgentHubClient, inputArtifact *pb.Artifact) error {
    // Create a new task using the artifact as input
    contextID := fmt.Sprintf("ctx_chained_%s", uuid.New().String())

    chainedTask := &pb.Task{
        Id:        fmt.Sprintf("task_chained_%s", uuid.New().String()),
        ContextId: contextID,
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: contextID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Please process the results from the previous analysis task.",
                        },
                    },
                    {
                        Part: &pb.Part_Data{
                            Data: &pb.DataPart{
                                Data: &structpb.Struct{
                                    Fields: map[string]*structpb.Value{
                                        "input_artifact_id": structpb.NewStringValue(inputArtifact.GetArtifactId()),
                                        "processing_type":   structpb.NewStringValue("enhancement"),
                                    },
                                },
                                Description: "Processing parameters with input artifact reference",
                            },
                        },
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
    }

    // Publish the chained task
    _, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
        Task: chainedTask,
        Routing: &eventbus.AgentEventMetadata{
            FromAgentId: "workflow_coordinator",
            ToAgentId:   "enhancement_processor",
            EventType:   "task.chained",
            Priority:    eventbus.Priority_PRIORITY_MEDIUM,
        },
    })

    return err
}

Best Practices

1. Use Descriptive Artifact Names and Descriptions

artifact := &pb.Artifact{
    Name:        "Customer Segmentation Analysis Results",
    Description: "Complete customer segmentation with demographics, behavior patterns, and actionable insights",
    // ...
}

2. Include Rich Metadata for Discovery

metadata := map[string]interface{}{
    "artifact_type":    "analysis",
    "domain":          "customer_analytics",
    "data_source":     "customer_transactions_2024",
    "algorithm":       "k_means_clustering",
    "confidence":      0.94,
    "generated_by":    "analytics_engine_v2.1",
    "valid_until":     time.Now().Add(30*24*time.Hour).Format(time.RFC3339),
}

3. Structure Multi-Part Artifacts Logically

// Order parts from most important to least important
parts := []*pb.Part{
    textSummaryPart,      // Human-readable summary first
    structuredDataPart,   // Machine-readable data second
    configurationPart,    // Configuration details third
    fileReferencePart,    // File references last
}

4. Validate Artifacts Before Publishing

func validateArtifact(artifact *pb.Artifact) error {
    if artifact.GetArtifactId() == "" {
        return fmt.Errorf("artifact_id is required")
    }
    if len(artifact.GetParts()) == 0 {
        return fmt.Errorf("artifact must have at least one part")
    }
    return nil
}

5. Handle Large Artifacts Appropriately

// For large data, use file references instead of inline data
if len(dataBytes) > 1024*1024 { // 1MB threshold
    // Save to file storage and reference
    fileID := saveToFileStorage(dataBytes)
    part = createFileReferencePart(fileID, filename, mimeType)
} else {
    // Include data inline
    part = createInlineDataPart(data)
}

This guide covered creating and working with A2A artifacts. Next, learn about A2A Task Lifecycle Management to understand how to properly manage task states and coordinate complex workflows.

4 - How to Work with A2A Task Lifecycle

Learn how to manage Agent2Agent protocol task states, handle lifecycle transitions, and coordinate complex task workflows.

How to Work with A2A Task Lifecycle

This guide shows you how to manage the complete lifecycle of Agent2Agent (A2A) protocol tasks, from creation through completion. Understanding task states and transitions is essential for building reliable agent workflows.

Understanding A2A Task States

A2A tasks progress through the following states:

  • TASK_STATE_SUBMITTED: Task created and submitted for processing
  • TASK_STATE_WORKING: Task accepted and currently being processed
  • TASK_STATE_COMPLETED: Task finished successfully with results
  • TASK_STATE_FAILED: Task failed with error information
  • TASK_STATE_CANCELLED: Task cancelled before completion

Each state transition is recorded with a timestamp and status message.

Creating A2A Tasks

Basic Task Creation

Create a new task with initial state:

package main

import (
    "fmt"
    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
    "google.golang.org/protobuf/types/known/timestamppb"
    "google.golang.org/protobuf/types/known/structpb"
)

func createA2ATask(contextID, taskType string, content []*pb.Part) *pb.Task {
    taskID := fmt.Sprintf("task_%s_%s", taskType, uuid.New().String())
    messageID := fmt.Sprintf("msg_%s", uuid.New().String())

    return &pb.Task{
        Id:        taskID,
        ContextId: contextID,
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: messageID,
                ContextId: contextID,
                TaskId:    taskID,
                Role:      pb.Role_USER,
                Content:   content,
                Metadata: &structpb.Struct{
                    Fields: map[string]*structpb.Value{
                        "task_type":      structpb.NewStringValue(taskType),
                        "submitted_by":   structpb.NewStringValue("user_agent"),
                        "priority":       structpb.NewStringValue("medium"),
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
        History:   []*pb.Message{},
        Artifacts: []*pb.Artifact{},
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "task_type":    structpb.NewStringValue(taskType),
                "created_at":   structpb.NewStringValue(time.Now().Format(time.RFC3339)),
                "expected_duration": structpb.NewStringValue("5m"),
            },
        },
    }
}

Task with Complex Requirements

Create tasks with detailed specifications:

func createComplexAnalysisTask(contextID string) *pb.Task {
    // Task configuration
    taskConfig, _ := structpb.NewStruct(map[string]interface{}{
        "algorithm":         "advanced_ml_analysis",
        "confidence_level":  0.95,
        "max_processing_time": "30m",
        "output_formats":    []string{"json", "csv", "visualization"},
        "quality_threshold": 0.9,
    })

    // Input data specification
    inputSpec, _ := structpb.NewStruct(map[string]interface{}{
        "dataset_id":       "customer_data_2024",
        "required_fields":  []string{"customer_id", "transaction_amount", "timestamp"},
        "date_range":       map[string]string{"start": "2024-01-01", "end": "2024-12-31"},
        "preprocessing":    true,
    })

    content := []*pb.Part{
        {
            Part: &pb.Part_Text{
                Text: "Perform comprehensive customer behavior analysis on the specified dataset with advanced ML algorithms.",
            },
        },
        {
            Part: &pb.Part_Data{
                Data: &pb.DataPart{
                    Data:        taskConfig,
                    Description: "Analysis configuration parameters",
                },
            },
        },
        {
            Part: &pb.Part_Data{
                Data: &pb.DataPart{
                    Data:        inputSpec,
                    Description: "Input dataset specification",
                },
            },
        },
    }

    task := createA2ATask(contextID, "customer_analysis", content)

    // Add complex task metadata
    task.Metadata = &structpb.Struct{
        Fields: map[string]*structpb.Value{
            "task_type":           structpb.NewStringValue("customer_analysis"),
            "complexity":          structpb.NewStringValue("high"),
            "estimated_duration":  structpb.NewStringValue("30m"),
            "required_resources":  structpb.NewListValue(&structpb.ListValue{
                Values: []*structpb.Value{
                    structpb.NewStringValue("gpu_compute"),
                    structpb.NewStringValue("large_memory"),
                },
            }),
            "deliverables":        structpb.NewListValue(&structpb.ListValue{
                Values: []*structpb.Value{
                    structpb.NewStringValue("analysis_report"),
                    structpb.NewStringValue("customer_segments"),
                    structpb.NewStringValue("predictions"),
                },
            }),
        },
    }

    return task
}

Task State Transitions

Accepting a Task (SUBMITTED → WORKING)

When an agent accepts a task:

func acceptTask(task *pb.Task, agentID string) *pb.Task {
    // Create acceptance message
    acceptanceMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_accept_%s", uuid.New().String()),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Task accepted by agent %s. Beginning processing.", agentID),
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "accepting_agent": structpb.NewStringValue(agentID),
                "estimated_completion": structpb.NewStringValue(
                    time.Now().Add(15*time.Minute).Format(time.RFC3339),
                ),
            },
        },
    }

    // Update task status
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_WORKING,
        Update:    acceptanceMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add to history
    task.History = append(task.History, acceptanceMessage)

    return task
}

Progress Updates (WORKING → WORKING)

Send progress updates during processing:

func sendProgressUpdate(task *pb.Task, progressPercentage int, currentPhase, details string) *pb.Task {
    // Create progress data
    progressData, _ := structpb.NewStruct(map[string]interface{}{
        "progress_percentage": progressPercentage,
        "current_phase":       currentPhase,
        "details":            details,
        "estimated_remaining": calculateRemainingTime(progressPercentage),
        "memory_usage_mb":     getCurrentMemoryUsage(),
        "cpu_usage_percent":   getCurrentCPUUsage(),
    })

    progressMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_progress_%s_%d", uuid.New().String(), progressPercentage),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Progress update: %d%% complete. Current phase: %s",
                        progressPercentage, currentPhase),
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        progressData,
                        Description: "Detailed progress information",
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "update_type":         structpb.NewStringValue("progress"),
                "progress_percentage": structpb.NewNumberValue(float64(progressPercentage)),
                "phase":              structpb.NewStringValue(currentPhase),
            },
        },
    }

    // Update task status (still WORKING, but with new message)
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_WORKING,
        Update:    progressMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add to history
    task.History = append(task.History, progressMessage)

    return task
}

func calculateRemainingTime(progressPercentage int) string {
    if progressPercentage <= 0 {
        return "unknown"
    }
    // Simplified estimation logic
    remainingMinutes := (100 - progressPercentage) * 15 / 100
    return fmt.Sprintf("%dm", remainingMinutes)
}

Completing a Task (WORKING → COMPLETED)

Complete a task with results:

func completeTask(task *pb.Task, results string, artifacts []*pb.Artifact) *pb.Task {
    // Create completion message
    completionMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_complete_%s", uuid.New().String()),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Task completed successfully. %s", results),
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "completion_status": structpb.NewStringValue("success"),
                "processing_time":   structpb.NewStringValue(
                    time.Since(getTaskStartTime(task)).String(),
                ),
                "artifact_count":    structpb.NewNumberValue(float64(len(artifacts))),
            },
        },
    }

    // Update task status
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_COMPLETED,
        Update:    completionMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add completion message to history
    task.History = append(task.History, completionMessage)

    // Add artifacts
    task.Artifacts = append(task.Artifacts, artifacts...)

    return task
}

Handling Task Failures (WORKING → FAILED)

Handle task failures with detailed error information:

func failTask(task *pb.Task, errorMessage, errorCode string, errorDetails map[string]interface{}) *pb.Task {
    // Create error data
    errorData, _ := structpb.NewStruct(map[string]interface{}{
        "error_code":    errorCode,
        "error_message": errorMessage,
        "error_details": errorDetails,
        "failure_phase": getCurrentProcessingPhase(task),
        "retry_possible": determineRetryPossibility(errorCode),
        "diagnostic_info": map[string]interface{}{
            "memory_at_failure": getCurrentMemoryUsage(),
            "cpu_at_failure":   getCurrentCPUUsage(),
            "logs_reference":   getLogReference(),
        },
    })

    failureMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_failure_%s", uuid.New().String()),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Task failed: %s (Code: %s)", errorMessage, errorCode),
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        errorData,
                        Description: "Detailed error information and diagnostics",
                    },
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "failure_type":  structpb.NewStringValue("processing_error"),
                "error_code":    structpb.NewStringValue(errorCode),
                "retry_possible": structpb.NewBoolValue(determineRetryPossibility(errorCode)),
            },
        },
    }

    // Update task status
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_FAILED,
        Update:    failureMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add failure message to history
    task.History = append(task.History, failureMessage)

    return task
}

func determineRetryPossibility(errorCode string) bool {
    // Determine if the error is retryable
    retryableErrors := []string{
        "TEMPORARY_RESOURCE_UNAVAILABLE",
        "NETWORK_TIMEOUT",
        "RATE_LIMIT_EXCEEDED",
    }

    for _, retryable := range retryableErrors {
        if errorCode == retryable {
            return true
        }
    }
    return false
}

Cancelling Tasks (ANY → CANCELLED)

Handle task cancellation:

func cancelTask(task *pb.Task, reason, cancelledBy string) *pb.Task {
    cancellationMessage := &pb.Message{
        MessageId: fmt.Sprintf("msg_cancel_%s", uuid.New().String()),
        ContextId: task.GetContextId(),
        TaskId:    task.GetId(),
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: fmt.Sprintf("Task cancelled: %s", reason),
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "cancellation_reason": structpb.NewStringValue(reason),
                "cancelled_by":        structpb.NewStringValue(cancelledBy),
                "previous_state":      structpb.NewStringValue(task.GetStatus().GetState().String()),
            },
        },
    }

    // Update task status
    task.Status = &pb.TaskStatus{
        State:     pb.TaskState_TASK_STATE_CANCELLED,
        Update:    cancellationMessage,
        Timestamp: timestamppb.Now(),
    }

    // Add cancellation message to history
    task.History = append(task.History, cancellationMessage)

    return task
}

Publishing Task Updates

Using AgentHub Client

Publish task updates through the AgentHub broker:

import (
    "context"
    eventbus "github.com/owulveryck/agenthub/events/eventbus"
)

func publishTaskUpdate(ctx context.Context, client eventbus.AgentHubClient, task *pb.Task, fromAgent, toAgent string) error {
    _, err := client.PublishTaskUpdate(ctx, &eventbus.PublishTaskUpdateRequest{
        Task: task,
        Routing: &eventbus.AgentEventMetadata{
            FromAgentId: fromAgent,
            ToAgentId:   toAgent,
            EventType:   fmt.Sprintf("task.%s", task.GetStatus().GetState().String()),
            Priority:    getPriorityFromTaskState(task.GetStatus().GetState()),
        },
    })

    return err
}

func getPriorityFromTaskState(state pb.TaskState) eventbus.Priority {
    switch state {
    case pb.TaskState_TASK_STATE_FAILED:
        return eventbus.Priority_PRIORITY_HIGH
    case pb.TaskState_TASK_STATE_COMPLETED:
        return eventbus.Priority_PRIORITY_MEDIUM
    case pb.TaskState_TASK_STATE_WORKING:
        return eventbus.Priority_PRIORITY_LOW
    default:
        return eventbus.Priority_PRIORITY_MEDIUM
    }
}

Using A2A Abstractions

Use simplified A2A task management:

import (
    "github.com/owulveryck/agenthub/internal/agenthub"
)

func manageTaskWithA2A(ctx context.Context, subscriber *agenthub.A2ATaskSubscriber, task *pb.Task) error {
    // Process the task
    artifact, status, errorMsg := processTaskContent(ctx, task)

    switch status {
    case pb.TaskState_TASK_STATE_COMPLETED:
        return subscriber.CompleteA2ATaskWithArtifact(ctx, task, artifact)
    case pb.TaskState_TASK_STATE_FAILED:
        return subscriber.FailA2ATask(ctx, task, errorMsg)
    default:
        return subscriber.UpdateA2ATaskProgress(ctx, task, 50, "Processing data", "Halfway complete")
    }
}

Task Monitoring and Querying

Get Task Status

Query task status and history:

func getTaskStatus(ctx context.Context, client eventbus.AgentHubClient, taskID string) (*pb.Task, error) {
    task, err := client.GetTask(ctx, &eventbus.GetTaskRequest{
        TaskId:        taskID,
        HistoryLength: 10, // Get last 10 messages
    })
    if err != nil {
        return nil, err
    }

    // Log current status
    log.Printf("Task %s status: %s", taskID, task.GetStatus().GetState().String())
    log.Printf("Last update: %s", task.GetStatus().GetUpdate().GetContent()[0].GetText())
    log.Printf("History length: %d messages", len(task.GetHistory()))
    log.Printf("Artifacts: %d", len(task.GetArtifacts()))

    return task, nil
}

List Tasks by Context

Get all tasks for a conversation context:

func getTasksInContext(ctx context.Context, client eventbus.AgentHubClient, contextID string) ([]*pb.Task, error) {
    response, err := client.ListTasks(ctx, &eventbus.ListTasksRequest{
        ContextId: contextID,
        States:    []pb.TaskState{}, // All states
        Limit:     100,
    })
    if err != nil {
        return nil, err
    }

    tasks := response.GetTasks()
    log.Printf("Found %d tasks in context %s", len(tasks), contextID)

    // Analyze task distribution
    stateCount := make(map[pb.TaskState]int)
    for _, task := range tasks {
        stateCount[task.GetStatus().GetState()]++
    }

    for state, count := range stateCount {
        log.Printf("  %s: %d tasks", state.String(), count)
    }

    return tasks, nil
}

Workflow Coordination

Sequential Task Workflow

Create dependent tasks that execute in sequence:

type TaskWorkflow struct {
    ContextID string
    Tasks     []*pb.Task
    Current   int
}

func (tw *TaskWorkflow) ExecuteNext(ctx context.Context, client eventbus.AgentHubClient) error {
    if tw.Current >= len(tw.Tasks) {
        return fmt.Errorf("workflow completed")
    }

    currentTask := tw.Tasks[tw.Current]

    // Add dependency metadata if not first task
    if tw.Current > 0 {
        previousTask := tw.Tasks[tw.Current-1]
        dependencyMetadata := map[string]interface{}{
            "depends_on":     previousTask.GetId(),
            "workflow_step":  tw.Current + 1,
            "total_steps":    len(tw.Tasks),
        }

        metadata, _ := structpb.NewStruct(dependencyMetadata)
        currentTask.Metadata = metadata
    }

    // Publish the task
    err := publishTaskUpdate(ctx, client, currentTask, "workflow_coordinator", "")
    if err != nil {
        return err
    }

    tw.Current++
    return nil
}

Parallel Task Execution

Execute multiple tasks concurrently:

func executeParallelTasks(ctx context.Context, client eventbus.AgentHubClient, tasks []*pb.Task) error {
    var wg sync.WaitGroup
    errors := make(chan error, len(tasks))

    for _, task := range tasks {
        wg.Add(1)
        go func(t *pb.Task) {
            defer wg.Done()

            // Add parallel execution metadata
            t.Metadata = &structpb.Struct{
                Fields: map[string]*structpb.Value{
                    "execution_mode": structpb.NewStringValue("parallel"),
                    "batch_id":       structpb.NewStringValue(uuid.New().String()),
                    "batch_size":     structpb.NewNumberValue(float64(len(tasks))),
                },
            }

            err := publishTaskUpdate(ctx, client, t, "parallel_coordinator", "")
            if err != nil {
                errors <- err
            }
        }(task)
    }

    wg.Wait()
    close(errors)

    // Check for errors
    for err := range errors {
        if err != nil {
            return err
        }
    }

    return nil
}

Best Practices

1. Always Update Task Status

// Update status for every significant state change
task = acceptTask(task, agentID)
publishTaskUpdate(ctx, client, task, agentID, "")

2. Provide Meaningful Progress Updates

// Send regular progress updates during long-running tasks
for progress := 10; progress <= 90; progress += 10 {
    task = sendProgressUpdate(task, progress, currentPhase, details)
    publishTaskUpdate(ctx, client, task, agentID, "")
    time.Sleep(processingInterval)
}

3. Include Rich Error Information

errorDetails := map[string]interface{}{
    "input_validation_errors": validationErrors,
    "system_resources":        resourceSnapshot,
    "retry_strategy":         "exponential_backoff",
}
task = failTask(task, "Data validation failed", "INVALID_INPUT", errorDetails)

4. Maintain Complete Message History

// Always append to history, never replace
task.History = append(task.History, statusMessage)

5. Use Appropriate Metadata

// Include context for debugging and monitoring
metadata := map[string]interface{}{
    "processing_node":  hostname,
    "resource_usage":   resourceMetrics,
    "performance_metrics": performanceData,
}

This guide covered the complete A2A task lifecycle management. You now have the tools to create, manage, and coordinate complex task workflows with proper state management and comprehensive observability.