Workflow Tutorials
Learn to design and implement sophisticated multi-agent workflows and orchestration patterns.
Available Tutorials
- Building Multi-Agent Workflows - Complete guide to creating complex agent interactions
This is the multi-page printable view of this section. Click here to print.
Learn to design and implement sophisticated multi-agent workflows and orchestration patterns.
This advanced tutorial teaches you to create complex workflows involving multiple specialized agents working together to accomplish sophisticated tasks. You’ll build a real document processing pipeline with multiple agents handling different stages.
By the end of this tutorial, you’ll have an A2A-compliant multi-agent system that:
This demonstrates real-world A2A agent collaboration patterns with conversation context, structured message content, and artifact-based results used in production systems.
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  A2A Workflow   │    │   AgentHub      │    │  A2A Specialized│
│  Coordinator    │    │  A2A Broker     │    │    Agents       │
│                 │    │                 │    │                 │
│ • A2A context   │◄──►│ • Routes A2A    │◄──►│ • Document      │
│   management    │    │   messages      │    │   Intake        │
│ • Conversation  │    │ • Tracks A2A    │    │ • Validation    │
│   threading     │    │   conversations │    │ • Metadata      │
│ • Artifact      │    │ • Manages A2A   │    │ • Text Proc     │
│   aggregation   │    │   state         │    │ • Summary       │
└─────────────────┘    └─────────────────┘    └─────────────────┘
First, let’s create the main coordinator that manages the document processing pipeline.
Create the coordinator agent:
mkdir -p agents/coordinator
Create agents/coordinator/main.go:
package main
import (
	"context"
	"fmt"
	"log"
	"time"
	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	a2a "github.com/owulveryck/agenthub/events/a2a"
	pb "github.com/owulveryck/agenthub/events/eventbus"
)
const (
	agentHubAddr = "localhost:50051"
	agentID      = "a2a_workflow_coordinator"
)
type A2ADocumentWorkflow struct {
	DocumentID    string
	ContextID     string                 // A2A conversation context
	Status        string
	CurrentStage  string
	TaskHistory   []*a2a.Task           // Complete A2A task history
	Artifacts     []*a2a.Artifact       // Collected artifacts from stages
	StartTime     time.Time
	client        pb.AgentHubClient      // A2A-compliant client
}
func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewAgentHubClient(conn)
	coordinator := &A2AWorkflowCoordinator{
		client:        client,
		workflows:     make(map[string]*A2ADocumentWorkflow),
	}
	ctx := context.Background()
	// Start listening for A2A task events
	go coordinator.subscribeToA2AEvents(ctx)
	// Start processing documents with A2A workflow
	coordinator.startA2ADocumentProcessing(ctx)
	// Keep running
	select {}
}
type A2AWorkflowCoordinator struct {
	client    pb.AgentHubClient
	workflows map[string]*A2ADocumentWorkflow
}
func (wc *A2AWorkflowCoordinator) startA2ADocumentProcessing(ctx context.Context) {
	// Simulate document arrival with A2A structured content
	documents := []map[string]interface{}{
		{
			"document_id": "doc_001",
			"content":     "This is a sample business document about quarterly results.",
			"filename":    "q3_results.txt",
			"source":      "email_attachment",
			"doc_type":    "business_report",
		},
		{
			"document_id": "doc_002",
			"content":     "Technical specification for the new API endpoints and authentication mechanisms.",
			"filename":    "api_spec.txt",
			"source":      "file_upload",
			"doc_type":    "technical_spec",
		},
	}
	for _, doc := range documents {
		wc.processA2ADocument(ctx, doc)
		time.Sleep(5 * time.Second)
	}
}
func (wc *A2AWorkflowCoordinator) processA2ADocument(ctx context.Context, document map[string]interface{}) {
	documentID := document["document_id"].(string)
	contextID := fmt.Sprintf("doc_workflow_%s_%s", documentID, uuid.New().String())
	workflow := &A2ADocumentWorkflow{
		DocumentID:   documentID,
		ContextID:    contextID,
		Status:       "started",
		CurrentStage: "intake",
		TaskHistory:  make([]*a2a.Task, 0),
		Artifacts:    make([]*a2a.Artifact, 0),
		StartTime:    time.Now(),
		client:       wc.client,
	}
	wc.workflows[documentID] = workflow
	log.Printf("Starting A2A document processing workflow for %s with context %s", documentID, contextID)
	// Stage 1: A2A Document Intake
	wc.publishA2ATask(ctx, "document_intake", document, "a2a_document_intake_agent", workflow)
}
func (wc *A2AWorkflowCoordinator) publishA2ATask(ctx context.Context, taskDescription string, params map[string]interface{}, targetAgent string, workflow *A2ADocumentWorkflow) {
	taskID := fmt.Sprintf("task_%s_%s", taskDescription, uuid.New().String())
	messageID := fmt.Sprintf("msg_%d_%s", time.Now().Unix(), uuid.New().String())
	// Create A2A structured content
	paramsData, err := structpb.NewStruct(params)
	if err != nil {
		log.Printf("Error creating parameters: %v", err)
		return
	}
	// Create A2A message with structured parts
	requestMessage := &a2a.Message{
		MessageId: messageID,
		ContextId: workflow.ContextID,
		TaskId:    taskID,
		Role:      a2a.Role_USER,
		Content: []*a2a.Part{
			{
				Part: &a2a.Part_Text{
					Text: fmt.Sprintf("Please process %s for document %s", taskDescription, workflow.DocumentID),
				},
			},
			{
				Part: &a2a.Part_Data{
					Data: &a2a.DataPart{
						Data:        paramsData,
						Description: fmt.Sprintf("%s parameters", taskDescription),
					},
				},
			},
		},
	}
	// Create A2A task
	task := &a2a.Task{
		Id:        taskID,
		ContextId: workflow.ContextID,
		Status: &a2a.TaskStatus{
			State:     a2a.TaskState_TASK_STATE_SUBMITTED,
			Update:    requestMessage,
			Timestamp: timestamppb.Now(),
		},
		History: []*a2a.Message{requestMessage},
		Metadata: paramsData,
	}
	// Store in workflow history
	workflow.TaskHistory = append(workflow.TaskHistory, task)
	// Publish A2A task update
	req := &pb.PublishTaskUpdateRequest{
		Task: task,
		Routing: &pb.AgentEventMetadata{
			FromAgentId: agentID,
			ToAgentId:   targetAgent,
			EventType:   "task.submitted",
			Priority:    pb.Priority_PRIORITY_MEDIUM,
		},
	}
	log.Printf("Publishing A2A %s task for workflow %s in context %s", taskDescription, workflow.DocumentID, workflow.ContextID)
	_, err = wc.client.PublishTaskUpdate(ctx, req)
	if err != nil {
		log.Printf("Error publishing A2A task: %v", err)
	}
}
func (wc *WorkflowCoordinator) subscribeToResults(ctx context.Context) {
	req := &pb.SubscribeToTaskResultsRequest{
		RequesterAgentId: agentID,
	}
	stream, err := wc.client.SubscribeToTaskResults(ctx, req)
	if err != nil {
		log.Printf("Error subscribing to results: %v", err)
		return
	}
	for {
		result, err := stream.Recv()
		if err != nil {
			log.Printf("Error receiving result: %v", err)
			return
		}
		wc.handleTaskResult(ctx, result)
	}
}
func (wc *WorkflowCoordinator) handleTaskResult(ctx context.Context, result *pb.TaskResult) {
	params := result.GetResult().AsMap()
	workflowID := params["workflow_id"].(string)
	stage := params["stage"].(string)
	workflow, exists := wc.workflows[workflowID]
	if !exists {
		log.Printf("Unknown workflow ID: %s", workflowID)
		return
	}
	log.Printf("Received result for workflow %s, stage %s: %s",
		workflowID, stage, result.GetStatus().String())
	if result.GetStatus() == pb.TaskStatus_TASK_STATUS_FAILED {
		workflow.Status = "failed"
		log.Printf("Workflow %s failed at stage %s: %s",
			workflowID, stage, result.GetErrorMessage())
		return
	}
	// Store stage results
	workflow.Results[stage] = params
	// Advance to next stage
	wc.advanceWorkflow(ctx, workflow, stage)
}
func (wc *WorkflowCoordinator) advanceWorkflow(ctx context.Context, workflow *DocumentWorkflow, completedStage string) {
	switch completedStage {
	case "document_intake":
		// Move to validation
		workflow.CurrentStage = "validation"
		data := workflow.Results["document_intake"]
		wc.publishTask(ctx, "document_validation", data.(map[string]interface{}), "validation_agent", workflow.DocumentID)
	case "document_validation":
		// Move to metadata extraction
		workflow.CurrentStage = "metadata_extraction"
		data := workflow.Results["document_validation"]
		wc.publishTask(ctx, "metadata_extraction", data.(map[string]interface{}), "metadata_agent", workflow.DocumentID)
	case "metadata_extraction":
		// Move to text processing
		workflow.CurrentStage = "text_processing"
		data := workflow.Results["metadata_extraction"]
		wc.publishTask(ctx, "text_processing", data.(map[string]interface{}), "text_processor_agent", workflow.DocumentID)
	case "text_processing":
		// Move to summary generation
		workflow.CurrentStage = "summary_generation"
		data := workflow.Results["text_processing"]
		wc.publishTask(ctx, "summary_generation", data.(map[string]interface{}), "summary_agent", workflow.DocumentID)
	case "summary_generation":
		// Workflow complete
		workflow.Status = "completed"
		workflow.CurrentStage = "finished"
		duration := time.Since(workflow.StartTime)
		log.Printf("Workflow %s completed successfully in %v", workflow.DocumentID, duration)
		wc.printWorkflowSummary(workflow)
	}
}
func (wc *WorkflowCoordinator) printWorkflowSummary(workflow *DocumentWorkflow) {
	fmt.Printf("\n=== WORKFLOW SUMMARY ===\n")
	fmt.Printf("Document ID: %s\n", workflow.DocumentID)
	fmt.Printf("Status: %s\n", workflow.Status)
	fmt.Printf("Duration: %v\n", time.Since(workflow.StartTime))
	fmt.Printf("Stages completed:\n")
	for stage, result := range workflow.Results {
		fmt.Printf("  - %s: %v\n", stage, result)
	}
	fmt.Printf("=======================\n\n")
}
Now let’s create each specialized agent that handles specific stages of the pipeline.
Create agents/document_intake/main.go:
package main
import (
	"context"
	"crypto/md5"
	"fmt"
	"io"
	"log"
	"strings"
	"time"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	pb "github.com/owulveryck/agenthub/events/a2a"
)
const (
	agentHubAddr = "localhost:50051"
	agentID      = "document_intake_agent"
)
func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewEventBusClient(conn)
	agent := &DocumentIntakeAgent{client: client}
	ctx := context.Background()
	agent.start(ctx)
}
type DocumentIntakeAgent struct {
	client pb.EventBusClient
}
func (dia *DocumentIntakeAgent) start(ctx context.Context) {
	log.Printf("Document Intake Agent %s starting...", agentID)
	req := &pb.SubscribeToTasksRequest{
		AgentId:   agentID,
		TaskTypes: []string{"document_intake"},
	}
	stream, err := dia.client.SubscribeToTasks(ctx, req)
	if err != nil {
		log.Fatalf("Error subscribing: %v", err)
	}
	log.Printf("Subscribed to document intake tasks")
	for {
		task, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Printf("Error receiving task: %v", err)
			return
		}
		go dia.processTask(ctx, task)
	}
}
func (dia *DocumentIntakeAgent) processTask(ctx context.Context, task *pb.TaskMessage) {
	log.Printf("Processing document intake task: %s", task.GetTaskId())
	params := task.GetParameters().AsMap()
	// Simulate document intake processing
	time.Sleep(2 * time.Second)
	// Generate document hash
	content := params["content"].(string)
	hash := fmt.Sprintf("%x", md5.Sum([]byte(content)))
	// Extract basic metadata
	wordCount := len(strings.Fields(content))
	charCount := len(content)
	result := map[string]interface{}{
		"document_id":   params["document_id"],
		"workflow_id":   params["workflow_id"],
		"stage":         "document_intake",
		"content":       content,
		"filename":      params["filename"],
		"source":        params["source"],
		"document_hash": hash,
		"word_count":    wordCount,
		"char_count":    charCount,
		"intake_timestamp": time.Now().Format(time.RFC3339),
		"status":        "intake_complete",
	}
	dia.publishResult(ctx, task, result, pb.TaskStatus_TASK_STATUS_COMPLETED, "")
}
func (dia *DocumentIntakeAgent) publishResult(ctx context.Context, originalTask *pb.TaskMessage, result map[string]interface{}, status pb.TaskStatus, errorMsg string) {
	resultStruct, err := structpb.NewStruct(result)
	if err != nil {
		log.Printf("Error creating result struct: %v", err)
		return
	}
	taskResult := &pb.TaskResult{
		TaskId:          originalTask.GetTaskId(),
		Status:          status,
		Result:          resultStruct,
		ErrorMessage:    errorMsg,
		ExecutorAgentId: agentID,
		CompletedAt:     timestamppb.Now(),
	}
	req := &pb.PublishTaskResultRequest{Result: taskResult}
	_, err = dia.client.PublishTaskResult(ctx, req)
	if err != nil {
		log.Printf("Error publishing result: %v", err)
	} else {
		log.Printf("Published result for task %s", originalTask.GetTaskId())
	}
}
Create agents/validation/main.go:
package main
import (
	"context"
	"io"
	"log"
	"strings"
	"time"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	pb "github.com/owulveryck/agenthub/events/a2a"
)
const (
	agentHubAddr = "localhost:50051"
	agentID      = "validation_agent"
)
func main() {
	conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewEventBusClient(conn)
	agent := &ValidationAgent{client: client}
	ctx := context.Background()
	agent.start(ctx)
}
type ValidationAgent struct {
	client pb.EventBusClient
}
func (va *ValidationAgent) start(ctx context.Context) {
	log.Printf("Validation Agent %s starting...", agentID)
	req := &pb.SubscribeToTasksRequest{
		AgentId:   agentID,
		TaskTypes: []string{"document_validation"},
	}
	stream, err := va.client.SubscribeToTasks(ctx, req)
	if err != nil {
		log.Fatalf("Error subscribing: %v", err)
	}
	log.Printf("Subscribed to document validation tasks")
	for {
		task, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Printf("Error receiving task: %v", err)
			return
		}
		go va.processTask(ctx, task)
	}
}
func (va *ValidationAgent) processTask(ctx context.Context, task *pb.TaskMessage) {
	log.Printf("Processing validation task: %s", task.GetTaskId())
	params := task.GetParameters().AsMap()
	// Simulate validation processing
	time.Sleep(1500 * time.Millisecond)
	content := params["content"].(string)
	// Perform validation checks
	validationResults := va.validateDocument(content)
	result := map[string]interface{}{
		"document_id":       params["document_id"],
		"workflow_id":       params["workflow_id"],
		"stage":             "document_validation",
		"content":           content,
		"filename":          params["filename"],
		"source":            params["source"],
		"document_hash":     params["document_hash"],
		"word_count":        params["word_count"],
		"char_count":        params["char_count"],
		"intake_timestamp":  params["intake_timestamp"],
		"validation_results": validationResults,
		"validation_timestamp": time.Now().Format(time.RFC3339),
		"status":            "validation_complete",
	}
	var status pb.TaskStatus
	var errorMsg string
	if validationResults["is_valid"].(bool) {
		status = pb.TaskStatus_TASK_STATUS_COMPLETED
	} else {
		status = pb.TaskStatus_TASK_STATUS_FAILED
		errorMsg = "Document validation failed: " + validationResults["errors"].(string)
	}
	va.publishResult(ctx, task, result, status, errorMsg)
}
func (va *ValidationAgent) validateDocument(content string) map[string]interface{} {
	// Simple validation rules
	isValid := true
	var errors []string
	// Check minimum length
	if len(content) < 10 {
		isValid = false
		errors = append(errors, "content too short")
	}
	// Check for suspicious content
	suspiciousTerms := []string{"malware", "virus", "hack"}
	for _, term := range suspiciousTerms {
		if strings.Contains(strings.ToLower(content), term) {
			isValid = false
			errors = append(errors, "suspicious content detected")
			break
		}
	}
	// Check language (simple heuristic)
	isEnglish := va.isEnglishContent(content)
	return map[string]interface{}{
		"is_valid":    isValid,
		"is_english":  isEnglish,
		"errors":      strings.Join(errors, "; "),
		"length_ok":   len(content) >= 10,
		"safe_content": !strings.Contains(strings.ToLower(content), "malware"),
	}
}
func (va *ValidationAgent) isEnglishContent(content string) bool {
	// Simple heuristic: check for common English words
	commonWords := []string{"the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"}
	lowerContent := strings.ToLower(content)
	matches := 0
	for _, word := range commonWords {
		if strings.Contains(lowerContent, " "+word+" ") {
			matches++
		}
	}
	return matches >= 2
}
func (va *ValidationAgent) publishResult(ctx context.Context, originalTask *pb.TaskMessage, result map[string]interface{}, status pb.TaskStatus, errorMsg string) {
	resultStruct, err := structpb.NewStruct(result)
	if err != nil {
		log.Printf("Error creating result struct: %v", err)
		return
	}
	taskResult := &pb.TaskResult{
		TaskId:          originalTask.GetTaskId(),
		Status:          status,
		Result:          resultStruct,
		ErrorMessage:    errorMsg,
		ExecutorAgentId: agentID,
		CompletedAt:     timestamppb.Now(),
	}
	req := &pb.PublishTaskResultRequest{Result: taskResult}
	_, err = va.client.PublishTaskResult(ctx, req)
	if err != nil {
		log.Printf("Error publishing result: %v", err)
	} else {
		log.Printf("Published result for task %s", originalTask.GetTaskId())
	}
}
Update the Makefile to include the new agents:
# Add to Makefile build target
build: proto
	@echo "Building server binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(SERVER_BINARY) broker/main.go
	@echo "Building coordinator binary..."
	go build $(GO_BUILD_FLAGS) -o bin/coordinator agents/coordinator/main.go
	@echo "Building document intake agent..."
	go build $(GO_BUILD_FLAGS) -o bin/document-intake agents/document_intake/main.go
	@echo "Building validation agent..."
	go build $(GO_BUILD_FLAGS) -o bin/validation agents/validation/main.go
	@echo "Building publisher binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(PUBLISHER_BINARY) agents/publisher/main.go
	@echo "Building subscriber binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(SUBSCRIBER_BINARY) agents/subscriber/main.go
	@echo "Build complete. Binaries are in the 'bin/' directory."
Build all components:
make build
Now let’s run the complete multi-agent system:
Terminal 1 - Start the broker:
make run-server
Terminal 2 - Start the document intake agent:
./bin/document-intake
Terminal 3 - Start the validation agent:
./bin/validation
Terminal 4 - Start the workflow coordinator:
./bin/coordinator
You’ll see the workflow coordinator processing documents through multiple stages:
Each agent processes its stage and passes results to the next stage via the AgentHub broker.
This tutorial demonstrates several key patterns:
The coordinator agent manages the overall workflow, determining which stage comes next and handling failures.
Each agent has a specific responsibility and can be developed, deployed, and scaled independently.
Agents work asynchronously, allowing for better resource utilization and scalability.
The system handles failures gracefully, with the coordinator managing workflow state.
Structured data flows between agents, with each stage adding value to the processing pipeline.
Now that you understand multi-agent workflows:
This pattern scales to handle complex business processes, data pipelines, and automated workflows in production systems.
You now have the foundation for building sophisticated multi-agent systems that can handle complex, real-world workflows!