Understanding Tasks in Agent2Agent Communication
Tasks are the fundamental unit of work exchange in the Agent2Agent protocol. This document provides a deep dive into task semantics, lifecycle, and design patterns.
Task Anatomy
Core Components
Every task in the Agent2Agent system consists of several key components that define its identity, purpose, and execution context:
Task Identity
string task_id = 1; // Unique identifier
string task_type = 2; // Semantic classification
The task_id serves as a unique identifier that allows all participants to track the task throughout its lifecycle. It should be globally unique and meaningful for debugging purposes.
The task_type provides semantic classification that allows agents to understand what kind of work is being requested. Examples include:
- data_analysis - Request for data processing and analysis
- image_generation - Request for creating or manipulating images
- file_conversion - Request for converting files between formats
- notification_delivery - Request for sending notifications to users
Task Payload
google.protobuf.Struct parameters = 3; // Task-specific data
google.protobuf.Struct metadata = 8; // Additional context
Parameters contain the specific data required to execute the task. The structure is flexible and task-type dependent:
// Example: data_analysis task parameters
{
  "dataset_path": "/data/sales_2024.csv",
  "analysis_type": "trend_analysis",
  "time_period": "quarterly",
  "output_format": "json"
}
// Example: image_generation task parameters
{
  "prompt": "A futuristic cityscape at sunset",
  "style": "photorealistic",
  "resolution": "1920x1080",
  "quality": "high"
}
Metadata provides additional context that may be useful for execution, auditing, or debugging but isn’t directly required for task completion:
{
  "workflow_id": "workflow_abc123",
  "user_id": "user_456",
  "request_source": "web_ui",
  "correlation_id": "trace_789"
}
Agent Coordination
string requester_agent_id = 4; // Who initiated the task
string responder_agent_id = 5; // Who should execute (optional)
These fields establish the communication relationship:
- requester_agent_id identifies which agent requested the task, enabling result delivery
- responder_agent_id can specify a particular agent for execution, or be omitted for broadcast/routing
Execution Context
google.protobuf.Timestamp deadline = 6; // When task must complete
Priority priority = 7; // Urgency level
google.protobuf.Timestamp created_at = 9; // Creation timestamp
This context helps agents make intelligent scheduling decisions:
- deadline enables time-sensitive prioritization
- priority provides explicit urgency ranking
- created_at enables age-based scheduling policies
Task Lifecycle
1. Task Creation and Publishing
Tasks begin their lifecycle when a requesting agent identifies work that needs to be done:
// Agent identifies need for data analysis
task := &pb.TaskMessage{
    TaskId:           generateUniqueID(),
    TaskType:         "data_analysis",
    Parameters:       analysisParams,
    RequesterAgentId: "data_orchestrator",
    ResponderAgentId: "data_processor_01", // Optional: specific agent
    Priority:         pb.Priority_PRIORITY_HIGH,
    Deadline:         timestamppb.New(time.Now().Add(30 * time.Minute)),
    CreatedAt:        timestamppb.Now(),
}

// Publish to the broker
client.PublishTask(ctx, &pb.PublishTaskRequest{Task: task})
2. Task Discovery and Acceptance
Agents subscribe to tasks and evaluate whether to accept them:
// Agent receives task notification
func (a *Agent) evaluateTask(task *pb.TaskMessage) bool {
    // Check if agent can handle this task type
    if !a.canHandle(task.GetTaskType()) {
        return false
    }

    // Check capacity constraints
    if a.getCurrentLoad() >= a.maxCapacity {
        return false
    }

    // Check deadline feasibility (tasks without a deadline are not time-constrained)
    if task.GetDeadline() != nil {
        estimatedDuration := a.estimateTaskDuration(task)
        if time.Now().Add(estimatedDuration).After(task.GetDeadline().AsTime()) {
            return false
        }
    }

    return true
}
3. Task Execution with Progress Reporting
Accepted tasks enter the execution phase with regular progress updates:
func (a *Agent) executeTask(task *pb.TaskMessage) {
    // Initial progress
    a.reportProgress(task, 0, "Task started")

    // Phase 1: Preparation
    a.reportProgress(task, 25, "Preparing data")
    prepareResult := a.prepareExecution(task)

    // Phase 2: Main processing
    a.reportProgress(task, 50, "Processing data")
    processResult := a.processData(prepareResult)

    // Phase 3: Finalization
    a.reportProgress(task, 75, "Finalizing results")
    finalResult := a.finalizeResults(processResult)

    // Completion
    a.reportProgress(task, 100, "Task completed")
    a.publishResult(task, finalResult, pb.TaskStatus_TASK_STATUS_COMPLETED)
}
4. Result Delivery
Task completion triggers result publication back to the requesting agent:
result := &pb.TaskResult{
    TaskId:            task.GetTaskId(),
    Status:            pb.TaskStatus_TASK_STATUS_COMPLETED,
    Result:            resultData,
    ExecutorAgentId:   a.agentId,
    CompletedAt:       timestamppb.Now(),
    ExecutionMetadata: executionContext,
}

client.PublishTaskResult(ctx, &pb.PublishTaskResultRequest{Result: result})
Task Design Patterns
1. Simple Request-Response
The most basic pattern where one agent requests work from another:
Agent A ──[Task]──> Broker ──[Task]──> Agent B
Agent A <─[Result]─ Broker <─[Result]─ Agent B
Use cases:
- File format conversion
- Simple calculations
- Data validation
- Content generation
2. Broadcast Processing
One agent broadcasts a task to multiple potential processors:
Agent A ──[Task]──> Broker ──[Task]──> Agent B₁
                           ├─[Task]──> Agent B₂
                           └─[Task]──> Agent B₃
Use cases:
- Distributed computation
- Load testing
- Content distribution
- Parallel processing
3. Pipeline Processing
Tasks flow through a series of specialized agents:
Agent A ──[Task₁]──> Agent B ──[Task₂]──> Agent C ──[Task₃]──> Agent D
        <──[Result]───────────────────────────────────────────────┘
Use cases:
- Data processing pipelines
- Image processing workflows
- Document processing chains
- ETL operations
4. Hierarchical Decomposition
Complex tasks are broken down into subtasks by coordinator agents:
Agent A ──[ComplexTask]──> Coordinator
                               ├──[SubTask₁]──> Specialist₁
                               ├──[SubTask₂]──> Specialist₂
                               └──[SubTask₃]──> Specialist₃
Use cases:
- Complex business workflows
- Multi-step analysis
- Orchestrated services
- Batch job coordination
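The coordinator's fan-out/fan-in step can be sketched with goroutines standing in for specialist agents. This is an illustration only; fanOut and specialist are hypothetical names, not protocol APIs.

```go
package main

import (
    "fmt"
    "strings"
    "sync"
)

// fanOut hands each subtask to a specialist (modeled as a goroutine)
// and gathers the results in order, mirroring a coordinator that
// publishes subtasks and waits for all of them to complete.
func fanOut(subtasks []string, specialist func(string) string) []string {
    results := make([]string, len(subtasks))
    var wg sync.WaitGroup
    for i, st := range subtasks {
        wg.Add(1)
        go func(i int, st string) {
            defer wg.Done()
            results[i] = specialist(st)
        }(i, st)
    }
    wg.Wait()
    return results
}

func main() {
    parts := fanOut(
        []string{"subtask_1", "subtask_2", "subtask_3"},
        func(st string) string { return st + ":done" },
    )
    fmt.Println(strings.Join(parts, ", "))
    // subtask_1:done, subtask_2:done, subtask_3:done
}
```

In a real deployment each goroutine would instead publish a subtask to the broker and block on its result.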
5. Competitive Processing
Multiple agents compete to handle the same task (first-come-first-served):
Agent A ──[Task]──> Broker ──[Task]──> Agent B₁ (accepts)
                           ├─[Task]──> Agent B₂ (rejects)
                           └─[Task]──> Agent B₃ (rejects)
Use cases:
- Resource-constrained environments
- Load balancing
- Fault tolerance
- Performance optimization
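The essence of first-come-first-served claiming is that exactly one competitor succeeds. A broker would normally arbitrate this server-side; the sketch below models it locally with an atomic compare-and-swap (runRace is a hypothetical helper).

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// runRace launches n competing "agents"; each tries to claim the same
// task with an atomic compare-and-swap, so exactly one succeeds.
// It returns the number of successful claims (always 1 for n >= 1).
func runRace(n int) int {
    var claimed atomic.Bool
    var winners atomic.Int32
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // CompareAndSwap succeeds for exactly one goroutine.
            if claimed.CompareAndSwap(false, true) {
                winners.Add(1)
            }
        }()
    }
    wg.Wait()
    return int(winners.Load())
}

func main() {
    fmt.Println("successful claims:", runRace(3)) // 1
}
```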
Task Types and Semantics
Classification Strategies
Task types should follow consistent naming conventions that enable agents to understand capabilities:
Domain-Based Classification
data.* - Data processing tasks
  data.analysis
  data.transformation
  data.validation

image.* - Image processing tasks
  image.generation
  image.enhancement
  image.conversion

notification.* - Communication tasks
  notification.email
  notification.sms
  notification.push
Operation-Based Classification
create.* - Creation operations
update.* - Modification operations
delete.* - Removal operations
analyze.* - Analysis operations
transform.* - Transformation operations
Complexity-Based Classification
simple.* - Quick, low-resource tasks
standard.* - Normal processing tasks
complex.* - Resource-intensive tasks
background.* - Long-running batch tasks
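Namespaced types make capability checks simple: an agent can match an incoming task type against exact names or wildcard patterns such as data.*. A minimal sketch (canHandle here is an illustrative helper, not a protocol API):

```go
package main

import (
    "fmt"
    "strings"
)

// canHandle matches a task type against capability patterns.
// "data.*" matches every type in the data namespace; any other
// pattern must match exactly.
func canHandle(capabilities []string, taskType string) bool {
    for _, pattern := range capabilities {
        if strings.HasSuffix(pattern, ".*") {
            prefix := strings.TrimSuffix(pattern, "*") // keep the trailing "."
            if strings.HasPrefix(taskType, prefix) {
                return true
            }
            continue
        }
        if pattern == taskType {
            return true
        }
    }
    return false
}

func main() {
    caps := []string{"data.*", "notification.email"}
    fmt.Println(canHandle(caps, "data.analysis"))    // true
    fmt.Println(canHandle(caps, "notification.sms")) // false
}
```

Keeping the trailing dot in the prefix ensures data.* matches data.analysis but not an unrelated type like database.query.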
Parameter Design Guidelines
Be Explicit: Include all information needed for execution
// Good: Explicit parameters
{
  "source_format": "csv",
  "target_format": "json",
  "include_headers": true,
  "delimiter": ",",
  "encoding": "utf-8"
}

// Poor: Ambiguous parameters
{
  "file": "data.csv",
  "convert": "json"
}
Use Standard Data Types: Leverage common formats for interoperability
// Good: Standard formats
{
  "timestamp": "2024-01-15T10:30:00Z", // ISO 8601
  "amount": "123.45",                  // String for precision
  "coordinates": {"lat": 40.7128, "lng": -74.0060}
}
Include Validation Information: Help agents validate inputs
{
  "email": "user@example.com",
  "email_format": "rfc5322",
  "max_length": 254,
  "required": true
}
Error Handling and Edge Cases
Task Rejection
Agents should provide meaningful rejection reasons:
func (a *Agent) rejectTask(task *pb.TaskMessage, reason string) {
    result := &pb.TaskResult{
        TaskId:          task.GetTaskId(),
        Status:          pb.TaskStatus_TASK_STATUS_FAILED,
        ErrorMessage:    reason,
        ExecutorAgentId: a.agentId,
        CompletedAt:     timestamppb.Now(),
    }
    a.publishResult(result)
}
Common rejection reasons:
- UNSUPPORTED_TASK_TYPE: Agent doesn't handle this task type
- CAPACITY_EXCEEDED: Agent is at maximum capacity
- DEADLINE_IMPOSSIBLE: Cannot complete within deadline
- INVALID_PARAMETERS: Task parameters are malformed
- RESOURCE_UNAVAILABLE: Required external resources are unavailable
Timeout Handling
Both requesters and processors should handle timeouts gracefully:
// Requester timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

select {
case result := <-resultChannel:
    // Process result
case <-ctx.Done():
    // Handle timeout - possibly retry or fail
}

// Processor timeout
func (a *Agent) executeWithTimeout(task *pb.TaskMessage) {
    deadline := task.GetDeadline().AsTime()
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()

    select {
    case result := <-a.processTask(ctx, task):
        a.publishResult(task, result, pb.TaskStatus_TASK_STATUS_COMPLETED)
    case <-ctx.Done():
        a.publishResult(task, nil, pb.TaskStatus_TASK_STATUS_FAILED, "Deadline exceeded")
    }
}
Partial Results
For long-running tasks, consider supporting partial results:
type PartialResult struct {
    TaskId           string
    CompletedPortion float64 // 0.0 to 1.0
    IntermediateData interface{}
    CanResume        bool
    ResumeToken      string
}
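A minimal sketch of producing such a result (the struct is redeclared so the snippet stands alone; processItems and the token format are illustrative):

```go
package main

import "fmt"

// PartialResult mirrors the struct above.
type PartialResult struct {
    TaskId           string
    CompletedPortion float64 // 0.0 to 1.0
    IntermediateData interface{}
    CanResume        bool
    ResumeToken      string
}

// processItems works through items starting at index start and stops
// at stopAfter (e.g. on an interruption), returning a result that
// encodes where a later run should resume.
func processItems(taskID string, items []string, start, stopAfter int) PartialResult {
    done := start
    for done < len(items) && done < stopAfter {
        // ... process items[done] ...
        done++
    }
    portion := 1.0
    if len(items) > 0 {
        portion = float64(done) / float64(len(items))
    }
    return PartialResult{
        TaskId:           taskID,
        CompletedPortion: portion,
        CanResume:        done < len(items),
        ResumeToken:      fmt.Sprintf("index:%d", done),
    }
}

func main() {
    pr := processItems("task_123", []string{"a", "b", "c", "d"}, 0, 2)
    fmt.Printf("%.2f resumable=%v token=%s\n",
        pr.CompletedPortion, pr.CanResume, pr.ResumeToken)
    // 0.50 resumable=true token=index:2
}
```

A later run would parse the ResumeToken and call processItems again with the recovered index as start.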
Best Practices
Task Design
- Make task types granular but not too fine-grained
- Design for idempotency when possible
- Include retry information in metadata
- Use consistent parameter naming across similar task types
- Version your task schemas to enable evolution
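Idempotency in particular is cheap to add when results are keyed by task_id: a retried or duplicated task replays the stored result instead of executing twice. A minimal sketch (ResultCache is a hypothetical helper, not a protocol API):

```go
package main

import (
    "fmt"
    "sync"
)

// ResultCache records the result of every completed task_id, making
// execution idempotent: a retried task replays the stored result
// instead of running again.
type ResultCache struct {
    mu      sync.Mutex
    results map[string]string
}

func NewResultCache() *ResultCache {
    return &ResultCache{results: make(map[string]string)}
}

// Execute runs fn at most once per taskID; later calls with the same
// taskID return the cached result without invoking fn.
func (c *ResultCache) Execute(taskID string, fn func() string) string {
    c.mu.Lock()
    defer c.mu.Unlock()
    if r, ok := c.results[taskID]; ok {
        return r
    }
    r := fn()
    c.results[taskID] = r
    return r
}

func main() {
    cache := NewResultCache()
    runs := 0
    work := func() string { runs++; return "done" }
    cache.Execute("task_123", work)
    cache.Execute("task_123", work) // duplicate delivery: replayed
    fmt.Println("executions:", runs) // executions: 1
}
```

A production version would bound the cache (TTL or LRU) and persist it if agents restart between retries.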
Performance Considerations
- Batch related tasks when appropriate
- Use appropriate priority levels to avoid starvation
- Set realistic deadlines based on historical performance
- Include resource hints to help with scheduling
- Monitor task completion rates to identify bottlenecks
Security Considerations
- Validate all task parameters before processing
- Sanitize user-provided data in task parameters
- Include authorization context in metadata
- Log task execution for audit trails
- Encrypt sensitive parameters when necessary
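Parameter validation is worth doing before any work starts, so malformed tasks fail fast with INVALID_PARAMETERS rather than partway through execution. A sketch for a hypothetical file_conversion task (the function name and supported formats are illustrative):

```go
package main

import "fmt"

// validateConversionParams checks the parameters of a hypothetical
// file_conversion task up front. Missing fields and unknown formats
// are rejected before any processing begins.
func validateConversionParams(params map[string]any) error {
    supported := map[string]bool{"csv": true, "json": true, "xml": true}
    for _, key := range []string{"source_format", "target_format"} {
        v, ok := params[key].(string)
        if !ok || v == "" {
            return fmt.Errorf("missing or non-string parameter %q", key)
        }
        if !supported[v] {
            return fmt.Errorf("unsupported format %q for %q", v, key)
        }
    }
    return nil
}

func main() {
    ok := map[string]any{"source_format": "csv", "target_format": "json"}
    bad := map[string]any{"source_format": "csv"}
    fmt.Println(validateConversionParams(ok) == nil)  // true
    fmt.Println(validateConversionParams(bad) == nil) // false
}
```

The returned error message doubles as the rejection reason passed to rejectTask.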
Tasks form the foundation of Agent2Agent communication, enabling sophisticated distributed processing patterns while maintaining clear semantics and strong observability. Proper task design is crucial for building robust, scalable agent networks.