How to Work with A2A Conversation Context
How to Work with A2A Conversation Context
This guide shows you how to use A2A conversation contexts to group related messages, maintain state across interactions, and coordinate multi-agent workflows.
Understanding A2A Conversation Context
A2A conversation context is identified by a context_id that groups related messages and tasks. This enables:
- Multi-turn conversations between agents
- Workflow coordination across multiple tasks
- State preservation throughout long-running processes
- Message threading for audit trails
- Context-aware routing based on conversation history
Creating Conversation Contexts
Simple Conversation Context
Start a basic conversation context:
package main
import (
    "fmt"
    "github.com/google/uuid"
    pb "github.com/owulveryck/agenthub/events/a2a"
)
func createConversationContext(workflowType string) string {
    return fmt.Sprintf("ctx_%s_%s", workflowType, uuid.New().String())
}
func startConversation() *pb.Message {
    contextID := createConversationContext("user_onboarding")
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please start the user onboarding process for new user.",
                },
            },
        },
    }
}
Workflow-Specific Contexts
Create contexts for different workflow types:
func createWorkflowContexts() map[string]string {
    return map[string]string{
        "data_analysis":    createConversationContext("data_analysis"),
        "image_processing": createConversationContext("image_processing"),
        "user_support":     createConversationContext("user_support"),
        "integration_test": createConversationContext("integration_test"),
    }
}
Multi-Turn Conversations
Conversation Initiation
Start a conversation with initial context:
import (
    "google.golang.org/protobuf/types/known/structpb"
)
func initiateDataAnalysisConversation() *pb.Message {
    contextID := createConversationContext("data_analysis")
    // Initial conversation metadata
    contextMetadata, _ := structpb.NewStruct(map[string]interface{}{
        "workflow_type":    "data_analysis",
        "initiated_by":     "user_12345",
        "priority":         "high",
        "expected_steps":   []string{"validation", "processing", "analysis", "report"},
        "timeout_minutes":  30,
    })
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Please analyze the uploaded dataset and provide insights.",
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        contextMetadata,
                        Description: "Conversation context and workflow parameters",
                    },
                },
            },
        },
        Metadata: contextMetadata,
    }
}
Continuing the Conversation
Add follow-up messages to the same context:
func continueConversation(contextID, previousMessageID string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID, // Same context as initial message
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: "Can you also include trend analysis in the report?",
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "follows_message": structpb.NewStringValue(previousMessageID),
                "conversation_turn": structpb.NewNumberValue(2),
            },
        },
    }
}
Agent Responses in Context
Agents respond within the same conversation context:
func createAgentResponse(contextID, requestMessageID, response string) *pb.Message {
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID, // Same context as request
        Role:      pb.Role_AGENT,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: response,
                },
            },
        },
        Metadata: &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "responding_to": structpb.NewStringValue(requestMessageID),
                "agent_id":      structpb.NewStringValue("data_analysis_agent"),
            },
        },
    }
}
Context-Aware Task Management
Creating Tasks with Context
Link tasks to conversation contexts:
import (
    "google.golang.org/protobuf/types/known/timestamppb"
)
func createContextAwareTask(contextID string) *pb.Task {
    taskID := fmt.Sprintf("task_%s_%s", "analysis", uuid.New().String())
    return &pb.Task{
        Id:        taskID,
        ContextId: contextID, // Link to conversation
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: contextID,
                TaskId:    taskID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Task submitted for data analysis workflow",
                        },
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
        History: []*pb.Message{}, // Will be populated during processing
        Artifacts: []*pb.Artifact{}, // Will be populated on completion
    }
}
Context-Based Task Querying
Retrieve all tasks for a conversation context:
func getTasksForContext(ctx context.Context, client pb.AgentHubClient, contextID string) ([]*pb.Task, error) {
    response, err := client.ListTasks(ctx, &pb.ListTasksRequest{
        ContextId: contextID,
        Limit:     100,
    })
    if err != nil {
        return nil, err
    }
    return response.GetTasks(), nil
}
Workflow Coordination
Multi-Agent Workflow with Shared Context
Coordinate multiple agents within a single conversation:
type WorkflowCoordinator struct {
    client    pb.AgentHubClient
    contextID string
    logger    *log.Logger
}
func (wc *WorkflowCoordinator) ExecuteDataPipeline(ctx context.Context) error {
    // Step 1: Data Validation
    validationTask := &pb.Task{
        Id:        fmt.Sprintf("task_validation_%s", uuid.New().String()),
        ContextId: wc.contextID,
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: wc.contextID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Validate uploaded dataset for quality and completeness",
                        },
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
    }
    // Publish validation task
    _, err := wc.client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
        Task: validationTask,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: "workflow_coordinator",
            ToAgentId:   "data_validator",
            EventType:   "task.validation",
            Priority:    pb.Priority_PRIORITY_HIGH,
        },
    })
    if err != nil {
        return err
    }
    // Step 2: Data Processing (after validation)
    processingTask := &pb.Task{
        Id:        fmt.Sprintf("task_processing_%s", uuid.New().String()),
        ContextId: wc.contextID, // Same context
        Status: &pb.TaskStatus{
            State: pb.TaskState_TASK_STATE_SUBMITTED,
            Update: &pb.Message{
                MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
                ContextId: wc.contextID,
                Role:      pb.Role_USER,
                Content: []*pb.Part{
                    {
                        Part: &pb.Part_Text{
                            Text: "Process validated dataset and extract features",
                        },
                    },
                },
                Metadata: &structpb.Struct{
                    Fields: map[string]*structpb.Value{
                        "depends_on": structpb.NewStringValue(validationTask.GetId()),
                        "workflow_step": structpb.NewNumberValue(2),
                    },
                },
            },
            Timestamp: timestamppb.Now(),
        },
    }
    // Publish processing task
    _, err = wc.client.PublishTaskUpdate(ctx, &pb.PublishTaskUpdateRequest{
        Task: processingTask,
        Routing: &pb.AgentEventMetadata{
            FromAgentId: "workflow_coordinator",
            ToAgentId:   "data_processor",
            EventType:   "task.processing",
            Priority:    pb.Priority_PRIORITY_MEDIUM,
        },
    })
    return err
}
Context State Management
Tracking Conversation State
Maintain state throughout the conversation:
type ConversationState struct {
    ContextID     string                 `json:"context_id"`
    WorkflowType  string                 `json:"workflow_type"`
    CurrentStep   int                    `json:"current_step"`
    TotalSteps    int                    `json:"total_steps"`
    CompletedTasks []string              `json:"completed_tasks"`
    PendingTasks   []string              `json:"pending_tasks"`
    Variables      map[string]interface{} `json:"variables"`
    CreatedAt      time.Time             `json:"created_at"`
    UpdatedAt      time.Time             `json:"updated_at"`
}
func (cs *ConversationState) ToMetadata() (*structpb.Struct, error) {
    data := map[string]interface{}{
        "context_id":      cs.ContextID,
        "workflow_type":   cs.WorkflowType,
        "current_step":    cs.CurrentStep,
        "total_steps":     cs.TotalSteps,
        "completed_tasks": cs.CompletedTasks,
        "pending_tasks":   cs.PendingTasks,
        "variables":       cs.Variables,
        "updated_at":      cs.UpdatedAt.Format(time.RFC3339),
    }
    return structpb.NewStruct(data)
}
func (cs *ConversationState) UpdateFromMessage(message *pb.Message) {
    cs.UpdatedAt = time.Now()
    // Extract state updates from message metadata
    if metadata := message.GetMetadata(); metadata != nil {
        if step, ok := metadata.GetFields()["current_step"]; ok {
            cs.CurrentStep = int(step.GetNumberValue())
        }
        if vars, ok := metadata.GetFields()["variables"]; ok {
            if varsStruct := vars.GetStructValue(); varsStruct != nil {
                for key, value := range varsStruct.GetFields() {
                    cs.Variables[key] = value
                }
            }
        }
    }
}
State-Aware Message Creation
Include conversation state in messages:
func createStateAwareMessage(contextID string, state *ConversationState, content string) *pb.Message {
    stateMetadata, _ := state.ToMetadata()
    return &pb.Message{
        MessageId: fmt.Sprintf("msg_%s", uuid.New().String()),
        ContextId: contextID,
        Role:      pb.Role_USER,
        Content: []*pb.Part{
            {
                Part: &pb.Part_Text{
                    Text: content,
                },
            },
            {
                Part: &pb.Part_Data{
                    Data: &pb.DataPart{
                        Data:        stateMetadata,
                        Description: "Current conversation state",
                    },
                },
            },
        },
        Metadata: stateMetadata,
    }
}
Context-Based Routing
Route Messages Based on Context
Use conversation context for intelligent routing:
func routeByContext(contextID string) *pb.AgentEventMetadata {
    // Determine routing based on context type
    var targetAgent string
    var eventType string
    if strings.Contains(contextID, "data_analysis") {
        targetAgent = "data_analysis_agent"
        eventType = "data.analysis"
    } else if strings.Contains(contextID, "image_processing") {
        targetAgent = "image_processor"
        eventType = "image.processing"
    } else if strings.Contains(contextID, "user_support") {
        targetAgent = "support_agent"
        eventType = "support.request"
    } else {
        targetAgent = "" // Broadcast to all agents
        eventType = "general.message"
    }
    return &pb.AgentEventMetadata{
        FromAgentId:   "context_router",
        ToAgentId:     targetAgent,
        EventType:     eventType,
        Subscriptions: []string{eventType},
        Priority:      pb.Priority_PRIORITY_MEDIUM,
    }
}
Subscribe to Context-Specific Events
Agents can subscribe to specific conversation contexts:
func subscribeToContextEvents(ctx context.Context, client pb.AgentHubClient, agentID, contextPattern string) error {
    stream, err := client.SubscribeToMessages(ctx, &pb.SubscribeToMessagesRequest{
        AgentId: agentID,
        ContextPattern: contextPattern, // e.g., "ctx_data_analysis_*"
    })
    if err != nil {
        return err
    }
    for {
        event, err := stream.Recv()
        if err != nil {
            return err
        }
        if message := event.GetMessage(); message != nil {
            log.Printf("Received context message: %s in context: %s",
                message.GetMessageId(), message.GetContextId())
            // Process message within context
            processContextMessage(ctx, message)
        }
    }
}
Best Practices
1. Use Descriptive Context IDs
contextID := fmt.Sprintf("ctx_%s_%s_%s", workflowType, userID, uuid.New().String())
2. Preserve Context Across All Related Messages
// All messages in the same workflow should use the same context_id
message.ContextId = existingContextID
3. Include Context Metadata for State Tracking
contextMetadata := map[string]interface{}{
    "workflow_type":   "data_pipeline",
    "initiated_by":    userID,
    "current_step":    stepNumber,
    "total_steps":     totalSteps,
}
4. Use Context for Task Dependencies
taskMetadata := map[string]interface{}{
    "context_id":     contextID,
    "depends_on":     previousTaskID,
    "workflow_step":  stepNumber,
}
5. Handle Context Cleanup
// Set context expiration for long-running workflows
contextMetadata["expires_at"] = time.Now().Add(24 * time.Hour).Format(time.RFC3339)
This guide covered conversation context management in A2A protocol. Next, learn about Working with A2A Artifacts to understand how to create and manage structured outputs from completed tasks.
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.