Building Multi-Agent Workflows
This advanced tutorial shows how to build a workflow in which several specialized agents cooperate on a single task. You’ll build a document processing pipeline where each agent handles one stage.
What You’ll Build
By the end of this tutorial, you’ll have a multi-agent system that:
- Ingests documents through a Document Intake Agent
- Validates content using a Validation Agent
- Extracts metadata with a Metadata Extraction Agent
- Processes text through a Text Processing Agent
- Generates summaries using a Summary Agent
- Orchestrates the workflow with a Workflow Coordinator Agent
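The steps below implement the coordinator and the first two agents (intake and validation). The remaining three agents follow the same pattern and are left as an exercise under Next Steps. Together they illustrate agent collaboration patterns used in production systems.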
Prerequisites
- Complete the Installation and Setup tutorial
- Complete the Running the Demo tutorial
- Familiarity with Go programming
- Understanding of basic agent concepts
Architecture Overview
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Workflow      │    │   AgentHub      │    │   Specialized   │
│   Coordinator   │    │   Broker        │    │   Agents        │
│                 │    │                 │    │                 │
│ • Orchestrates  │◄──►│ • Routes tasks  │◄──►│ • Document      │
│   pipeline      │    │ • Tracks        │    │   Intake        │
│ • Manages       │    │   progress      │    │ • Validation    │
│   dependencies  │    │ • Handles       │    │ • Metadata      │
│ • Handles       │    │   failures      │    │ • Text Proc     │
│   failures      │    │                 │    │ • Summary       │
└─────────────────┘    └─────────────────┘    └─────────────────┘
Step 1: Create the Workflow Coordinator
First, let’s create the main coordinator that manages the document processing pipeline.
Create the coordinator agent:
mkdir -p agents/coordinator
Create agents/coordinator/main.go:
package main
import (
"context"
"fmt"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/owulveryck/agenthub/internal/grpc"
)
const (
agentHubAddr = "localhost:50051"
agentID = "workflow_coordinator"
)
type DocumentWorkflow struct {
DocumentID string
Status string
CurrentStage string
Results map[string]interface{}
StartTime time.Time
client pb.EventBusClient
}
func main() {
conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewEventBusClient(conn)
coordinator := &WorkflowCoordinator{
client: client,
workflows: make(map[string]*DocumentWorkflow),
}
ctx := context.Background()
// Start listening for task results
go coordinator.subscribeToResults(ctx)
// Start processing documents
coordinator.startDocumentProcessing(ctx)
// Keep running
select {}
}
type WorkflowCoordinator struct {
client pb.EventBusClient
workflows map[string]*DocumentWorkflow
}
func (wc *WorkflowCoordinator) startDocumentProcessing(ctx context.Context) {
// Simulate document arrival
documents := []map[string]interface{}{
{
"document_id": "doc_001",
"content": "This is a sample business document about quarterly results.",
"filename": "q3_results.txt",
"source": "email_attachment",
},
{
"document_id": "doc_002",
"content": "Technical specification for the new API endpoints and authentication mechanisms.",
"filename": "api_spec.txt",
"source": "file_upload",
},
}
for _, doc := range documents {
wc.processDocument(ctx, doc)
time.Sleep(5 * time.Second)
}
}
func (wc *WorkflowCoordinator) processDocument(ctx context.Context, document map[string]interface{}) {
documentID := document["document_id"].(string)
workflow := &DocumentWorkflow{
DocumentID: documentID,
Status: "started",
CurrentStage: "intake",
Results: make(map[string]interface{}),
StartTime: time.Now(),
}
wc.workflows[documentID] = workflow
log.Printf("Starting document processing workflow for %s", documentID)
// Stage 1: Document Intake
wc.publishTask(ctx, "document_intake", document, "document_intake_agent", documentID)
}
func (wc *WorkflowCoordinator) publishTask(ctx context.Context, taskType string, params map[string]interface{}, targetAgent, workflowID string) {
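taskID := fmt.Sprintf("%s_%s_%d", taskType, workflowID, time.Now().Unix())
// Copy the parameters so the caller's map (often a stored stage result) is not modified
taskParams := make(map[string]interface{}, len(params)+2)
for k, v := range params {
taskParams[k] = v
}
// Add workflow metadata
taskParams["workflow_id"] = workflowID
taskParams["stage"] = taskType
parametersStruct, err := structpb.NewStruct(taskParams)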
if err != nil {
log.Printf("Error creating parameters: %v", err)
return
}
task := &pb.TaskMessage{
TaskId: taskID,
TaskType: taskType,
Parameters: parametersStruct,
RequesterAgentId: agentID,
ResponderAgentId: targetAgent,
Priority: pb.Priority_PRIORITY_MEDIUM,
CreatedAt: timestamppb.Now(),
}
req := &pb.PublishTaskRequest{Task: task}
log.Printf("Publishing %s task for workflow %s", taskType, workflowID)
_, err = wc.client.PublishTask(ctx, req)
if err != nil {
log.Printf("Error publishing task: %v", err)
}
}
func (wc *WorkflowCoordinator) subscribeToResults(ctx context.Context) {
req := &pb.SubscribeToTaskResultsRequest{
RequesterAgentId: agentID,
}
stream, err := wc.client.SubscribeToTaskResults(ctx, req)
if err != nil {
log.Printf("Error subscribing to results: %v", err)
return
}
for {
result, err := stream.Recv()
if err != nil {
log.Printf("Error receiving result: %v", err)
return
}
wc.handleTaskResult(ctx, result)
}
}
func (wc *WorkflowCoordinator) handleTaskResult(ctx context.Context, result *pb.TaskResult) {
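params := result.GetResult().AsMap()
// Use checked type assertions so a malformed result cannot panic the coordinator
workflowID, ok := params["workflow_id"].(string)
if !ok {
log.Printf("Result for task %s has no workflow_id; ignoring", result.GetTaskId())
return
}
stage, ok := params["stage"].(string)
if !ok {
log.Printf("Result for workflow %s has no stage; ignoring", workflowID)
return
}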
workflow, exists := wc.workflows[workflowID]
if !exists {
log.Printf("Unknown workflow ID: %s", workflowID)
return
}
log.Printf("Received result for workflow %s, stage %s: %s",
workflowID, stage, result.GetStatus().String())
if result.GetStatus() == pb.TaskStatus_TASK_STATUS_FAILED {
workflow.Status = "failed"
log.Printf("Workflow %s failed at stage %s: %s",
workflowID, stage, result.GetErrorMessage())
return
}
// Store stage results
workflow.Results[stage] = params
// Advance to next stage
wc.advanceWorkflow(ctx, workflow, stage)
}
func (wc *WorkflowCoordinator) advanceWorkflow(ctx context.Context, workflow *DocumentWorkflow, completedStage string) {
switch completedStage {
case "document_intake":
// Move to validation
workflow.CurrentStage = "validation"
data := workflow.Results["document_intake"]
wc.publishTask(ctx, "document_validation", data.(map[string]interface{}), "validation_agent", workflow.DocumentID)
case "document_validation":
// Move to metadata extraction
workflow.CurrentStage = "metadata_extraction"
data := workflow.Results["document_validation"]
wc.publishTask(ctx, "metadata_extraction", data.(map[string]interface{}), "metadata_agent", workflow.DocumentID)
case "metadata_extraction":
// Move to text processing
workflow.CurrentStage = "text_processing"
data := workflow.Results["metadata_extraction"]
wc.publishTask(ctx, "text_processing", data.(map[string]interface{}), "text_processor_agent", workflow.DocumentID)
case "text_processing":
// Move to summary generation
workflow.CurrentStage = "summary_generation"
data := workflow.Results["text_processing"]
wc.publishTask(ctx, "summary_generation", data.(map[string]interface{}), "summary_agent", workflow.DocumentID)
case "summary_generation":
// Workflow complete
workflow.Status = "completed"
workflow.CurrentStage = "finished"
duration := time.Since(workflow.StartTime)
log.Printf("Workflow %s completed successfully in %v", workflow.DocumentID, duration)
wc.printWorkflowSummary(workflow)
}
}
func (wc *WorkflowCoordinator) printWorkflowSummary(workflow *DocumentWorkflow) {
fmt.Printf("\n=== WORKFLOW SUMMARY ===\n")
fmt.Printf("Document ID: %s\n", workflow.DocumentID)
fmt.Printf("Status: %s\n", workflow.Status)
fmt.Printf("Duration: %v\n", time.Since(workflow.StartTime))
fmt.Printf("Stages completed:\n")
for stage, result := range workflow.Results {
fmt.Printf(" - %s: %v\n", stage, result)
}
fmt.Printf("=======================\n\n")
}
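One thing to watch in the coordinator above: subscribeToResults runs in its own goroutine and reads wc.workflows, while processDocument writes to the same map from the main goroutine. Go maps are not safe for concurrent use, so a longer-lived version would want a lock around the map. The following is a minimal sketch of that idea, assuming a hypothetical workflowStore type and a trimmed-down workflowState struct (neither is part of AgentHub):

```go
package main

import (
	"fmt"
	"sync"
)

// workflowState is a trimmed-down stand-in for DocumentWorkflow.
type workflowState struct {
	DocumentID   string
	Status       string
	CurrentStage string
}

// workflowStore is a hypothetical wrapper that guards the workflow map
// with a mutex so several goroutines can use it safely.
type workflowStore struct {
	mu        sync.RWMutex
	workflows map[string]*workflowState
}

func newWorkflowStore() *workflowStore {
	return &workflowStore{workflows: make(map[string]*workflowState)}
}

// Put registers a workflow under its document ID.
func (s *workflowStore) Put(w *workflowState) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.workflows[w.DocumentID] = w
}

// Update applies fn to the workflow while holding the write lock.
// It reports whether the workflow exists.
func (s *workflowStore) Update(id string, fn func(*workflowState)) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	w, ok := s.workflows[id]
	if !ok {
		return false
	}
	fn(w)
	return true
}

// Stage returns the current stage of a workflow, if it is known.
func (s *workflowStore) Stage(id string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	w, ok := s.workflows[id]
	if !ok {
		return "", false
	}
	return w.CurrentStage, true
}

func main() {
	store := newWorkflowStore()
	store.Put(&workflowState{DocumentID: "doc_001", Status: "started", CurrentStage: "intake"})

	// Several goroutines update the same workflow without a data race.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			store.Update("doc_001", func(w *workflowState) { w.CurrentStage = "validation" })
		}()
	}
	wg.Wait()

	stage, _ := store.Stage("doc_001")
	fmt.Println(stage)
}
```

Running the coordinator with go run -race is a quick way to check whether the unguarded map is actually being hit from both goroutines.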
Step 2: Create Specialized Agents
Now let’s create each specialized agent that handles specific stages of the pipeline.
Document Intake Agent
Create agents/document_intake/main.go:
package main
import (
"context"
"crypto/md5"
"fmt"
"io"
"log"
"strings"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/owulveryck/agenthub/internal/grpc"
)
const (
agentHubAddr = "localhost:50051"
agentID = "document_intake_agent"
)
func main() {
conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewEventBusClient(conn)
agent := &DocumentIntakeAgent{client: client}
ctx := context.Background()
agent.start(ctx)
}
type DocumentIntakeAgent struct {
client pb.EventBusClient
}
func (dia *DocumentIntakeAgent) start(ctx context.Context) {
log.Printf("Document Intake Agent %s starting...", agentID)
req := &pb.SubscribeToTasksRequest{
AgentId: agentID,
TaskTypes: []string{"document_intake"},
}
stream, err := dia.client.SubscribeToTasks(ctx, req)
if err != nil {
log.Fatalf("Error subscribing: %v", err)
}
log.Printf("Subscribed to document intake tasks")
for {
task, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("Error receiving task: %v", err)
return
}
go dia.processTask(ctx, task)
}
}
func (dia *DocumentIntakeAgent) processTask(ctx context.Context, task *pb.TaskMessage) {
log.Printf("Processing document intake task: %s", task.GetTaskId())
params := task.GetParameters().AsMap()
// Simulate document intake processing
time.Sleep(2 * time.Second)
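content, ok := params["content"].(string)
if !ok {
// Report the failure instead of panicking on a missing or non-string field
dia.publishResult(ctx, task, map[string]interface{}{
"workflow_id": params["workflow_id"],
"stage": "document_intake",
}, pb.TaskStatus_TASK_STATUS_FAILED, "missing or non-string content parameter")
return
}
// Generate document hash (MD5 serves only as a fingerprint here, not as a security measure)
hash := fmt.Sprintf("%x", md5.Sum([]byte(content)))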
// Extract basic metadata
wordCount := len(strings.Fields(content))
charCount := len(content)
result := map[string]interface{}{
"document_id": params["document_id"],
"workflow_id": params["workflow_id"],
"stage": "document_intake",
"content": content,
"filename": params["filename"],
"source": params["source"],
"document_hash": hash,
"word_count": wordCount,
"char_count": charCount,
"intake_timestamp": time.Now().Format(time.RFC3339),
"status": "intake_complete",
}
dia.publishResult(ctx, task, result, pb.TaskStatus_TASK_STATUS_COMPLETED, "")
}
func (dia *DocumentIntakeAgent) publishResult(ctx context.Context, originalTask *pb.TaskMessage, result map[string]interface{}, status pb.TaskStatus, errorMsg string) {
resultStruct, err := structpb.NewStruct(result)
if err != nil {
log.Printf("Error creating result struct: %v", err)
return
}
taskResult := &pb.TaskResult{
TaskId: originalTask.GetTaskId(),
Status: status,
Result: resultStruct,
ErrorMessage: errorMsg,
ExecutorAgentId: agentID,
CompletedAt: timestamppb.Now(),
}
req := &pb.PublishTaskResultRequest{Result: taskResult}
_, err = dia.client.PublishTaskResult(ctx, req)
if err != nil {
log.Printf("Error publishing result: %v", err)
} else {
log.Printf("Published result for task %s", originalTask.GetTaskId())
}
}
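The agents read task parameters from the map returned by AsMap(). Two details are easy to trip over: a plain type assertion such as params["content"].(string) panics when the field is missing or has another type, and every number that travels through a structpb.Struct comes back as float64, so word_count is no longer an int by the time the next agent reads it. A small sketch of helper functions that cover both cases, assuming hypothetical names stringParam and intParam (they are not AgentHub APIs):

```go
package main

import (
	"fmt"
)

// stringParam returns params[key] as a string, or an error if the key is
// missing or holds another type. (Hypothetical helper.)
func stringParam(params map[string]interface{}, key string) (string, error) {
	v, ok := params[key]
	if !ok {
		return "", fmt.Errorf("missing parameter %q", key)
	}
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("parameter %q has type %T, want string", key, v)
	}
	return s, nil
}

// intParam accepts both int (a value set locally) and float64 (the type
// that AsMap produces for every number).
func intParam(params map[string]interface{}, key string) (int, error) {
	switch n := params[key].(type) {
	case int:
		return n, nil
	case float64:
		return int(n), nil
	case nil:
		return 0, fmt.Errorf("missing parameter %q", key)
	default:
		return 0, fmt.Errorf("parameter %q has type %T, want number", key, n)
	}
}

func main() {
	// Simulated output of task.GetParameters().AsMap().
	params := map[string]interface{}{
		"content":    "quarterly results",
		"word_count": float64(2),
	}

	content, err := stringParam(params, "content")
	fmt.Println(content, err)

	count, err := intParam(params, "word_count")
	fmt.Println(count, err)

	_, err = stringParam(params, "filename")
	fmt.Println(err)
}
```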
Validation Agent
Create agents/validation/main.go:
package main
import (
"context"
"io"
"log"
"strings"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/owulveryck/agenthub/internal/grpc"
)
const (
agentHubAddr = "localhost:50051"
agentID = "validation_agent"
)
func main() {
conn, err := grpc.Dial(agentHubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewEventBusClient(conn)
agent := &ValidationAgent{client: client}
ctx := context.Background()
agent.start(ctx)
}
type ValidationAgent struct {
client pb.EventBusClient
}
func (va *ValidationAgent) start(ctx context.Context) {
log.Printf("Validation Agent %s starting...", agentID)
req := &pb.SubscribeToTasksRequest{
AgentId: agentID,
TaskTypes: []string{"document_validation"},
}
stream, err := va.client.SubscribeToTasks(ctx, req)
if err != nil {
log.Fatalf("Error subscribing: %v", err)
}
log.Printf("Subscribed to document validation tasks")
for {
task, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("Error receiving task: %v", err)
return
}
go va.processTask(ctx, task)
}
}
func (va *ValidationAgent) processTask(ctx context.Context, task *pb.TaskMessage) {
log.Printf("Processing validation task: %s", task.GetTaskId())
params := task.GetParameters().AsMap()
// Simulate validation processing
time.Sleep(1500 * time.Millisecond)
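content, ok := params["content"].(string)
if !ok {
// Report the failure instead of panicking on a missing or non-string field
va.publishResult(ctx, task, map[string]interface{}{
"workflow_id": params["workflow_id"],
"stage": "document_validation",
}, pb.TaskStatus_TASK_STATUS_FAILED, "missing or non-string content parameter")
return
}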
// Perform validation checks
validationResults := va.validateDocument(content)
result := map[string]interface{}{
"document_id": params["document_id"],
"workflow_id": params["workflow_id"],
"stage": "document_validation",
"content": content,
"filename": params["filename"],
"source": params["source"],
"document_hash": params["document_hash"],
"word_count": params["word_count"],
"char_count": params["char_count"],
"intake_timestamp": params["intake_timestamp"],
"validation_results": validationResults,
"validation_timestamp": time.Now().Format(time.RFC3339),
"status": "validation_complete",
}
var status pb.TaskStatus
var errorMsg string
if validationResults["is_valid"].(bool) {
status = pb.TaskStatus_TASK_STATUS_COMPLETED
} else {
status = pb.TaskStatus_TASK_STATUS_FAILED
errorMsg = "Document validation failed: " + validationResults["errors"].(string)
}
va.publishResult(ctx, task, result, status, errorMsg)
}
func (va *ValidationAgent) validateDocument(content string) map[string]interface{} {
// Simple validation rules
isValid := true
var errors []string
// Check minimum length
if len(content) < 10 {
isValid = false
errors = append(errors, "content too short")
}
// Check for suspicious content
safeContent := true
lowerContent := strings.ToLower(content)
suspiciousTerms := []string{"malware", "virus", "hack"}
for _, term := range suspiciousTerms {
if strings.Contains(lowerContent, term) {
isValid = false
safeContent = false
errors = append(errors, "suspicious content detected")
break
}
}
// Check language (simple heuristic)
isEnglish := va.isEnglishContent(content)
return map[string]interface{}{
"is_valid": isValid,
"is_english": isEnglish,
"errors": strings.Join(errors, "; "),
"length_ok": len(content) >= 10,
// Reflects every suspicious term, not only "malware"
"safe_content": safeContent,
}
}
func (va *ValidationAgent) isEnglishContent(content string) bool {
// Simple heuristic: check for common English words
commonWords := []string{"the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"}
lowerContent := strings.ToLower(content)
matches := 0
for _, word := range commonWords {
if strings.Contains(lowerContent, " "+word+" ") {
matches++
}
}
return matches >= 2
}
func (va *ValidationAgent) publishResult(ctx context.Context, originalTask *pb.TaskMessage, result map[string]interface{}, status pb.TaskStatus, errorMsg string) {
resultStruct, err := structpb.NewStruct(result)
if err != nil {
log.Printf("Error creating result struct: %v", err)
return
}
taskResult := &pb.TaskResult{
TaskId: originalTask.GetTaskId(),
Status: status,
Result: resultStruct,
ErrorMessage: errorMsg,
ExecutorAgentId: agentID,
CompletedAt: timestamppb.Now(),
}
req := &pb.PublishTaskResultRequest{Result: taskResult}
_, err = va.client.PublishTaskResult(ctx, req)
if err != nil {
log.Printf("Error publishing result: %v", err)
} else {
log.Printf("Published result for task %s", originalTask.GetTaskId())
}
}
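The isEnglishContent heuristic above looks for each common word surrounded by spaces, so it misses a word at the start of the text or one followed by punctuation. One possible refinement, shown here as a sketch with hypothetical function names countCommonWords and looksEnglish, is to split the text into letter-only tokens first and then count whole-word matches. The word list and the threshold of two matches are taken from the agent code:

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// commonEnglishWords mirrors the list used by the Validation Agent.
var commonEnglishWords = map[string]bool{
	"the": true, "and": true, "or": true, "but": true, "in": true, "on": true,
	"at": true, "to": true, "for": true, "of": true, "with": true, "by": true,
}

// countCommonWords splits content on anything that is not a letter and
// counts how many distinct common words appear as whole tokens.
func countCommonWords(content string) int {
	tokens := strings.FieldsFunc(strings.ToLower(content), func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	seen := make(map[string]bool)
	for _, tok := range tokens {
		if commonEnglishWords[tok] {
			seen[tok] = true
		}
	}
	return len(seen)
}

// looksEnglish keeps the original threshold of two distinct matches.
func looksEnglish(content string) bool {
	return countCommonWords(content) >= 2
}

func main() {
	fmt.Println(looksEnglish("To the board."))
	fmt.Println(looksEnglish("Bonjour tout le monde"))
}
```

This is still a rough heuristic. Real language detection would need a dedicated library or service.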
Step 3: Build and Test the Multi-Agent System
Update the Makefile to include the new agents:
# Add to Makefile build target (recipe lines must start with a tab)
build: proto
	@echo "Building server binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(SERVER_BINARY) broker/main.go
	@echo "Building coordinator binary..."
	go build $(GO_BUILD_FLAGS) -o bin/coordinator agents/coordinator/main.go
	@echo "Building document intake agent..."
	go build $(GO_BUILD_FLAGS) -o bin/document-intake agents/document_intake/main.go
	@echo "Building validation agent..."
	go build $(GO_BUILD_FLAGS) -o bin/validation agents/validation/main.go
	@echo "Building publisher binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(PUBLISHER_BINARY) agents/publisher/main.go
	@echo "Building subscriber binary..."
	go build $(GO_BUILD_FLAGS) -o bin/$(SUBSCRIBER_BINARY) agents/subscriber/main.go
	@echo "Build complete. Binaries are in the 'bin/' directory."
Build all components:
make build
Step 4: Run the Multi-Agent Workflow
Now let’s run the complete multi-agent system:
Terminal 1 - Start the broker:
make run-server
Terminal 2 - Start the document intake agent:
./bin/document-intake
Terminal 3 - Start the validation agent:
./bin/validation
Terminal 4 - Start the workflow coordinator:
./bin/coordinator
Step 5: Observe the Workflow
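The coordinator drives each document through the pipeline stages:
- Document Intake: Receives and processes raw documents
- Validation: Checks content for safety and validity
- Metadata Extraction: Extracts structured metadata
- Text Processing: Processes and analyzes text content
- Summary Generation: Creates document summaries
Each agent processes its stage and returns its result to the coordinator through the AgentHub broker. The coordinator then publishes the task for the next stage. With only the intake and validation agents from this tutorial running, the logs show the first two stages completing for each document. The coordinator then publishes a metadata_extraction task, and the workflow stays at that stage until you add the remaining agents (see Next Steps).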
Understanding the Multi-Agent Pattern
This tutorial demonstrates several key patterns:
1. Workflow Orchestration
The coordinator agent manages the overall workflow, determining which stage comes next and handling failures.
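The coordinator's advanceWorkflow method encodes the stage order in a switch statement. As the pipeline grows, the same ordering can be expressed as data. The following sketch assumes a hypothetical stageStep type and pipeline table. The task types and agent IDs are the ones used in the coordinator code:

```go
package main

import "fmt"

// stageStep describes what follows a completed stage.
type stageStep struct {
	NextTask    string // task type to publish next
	TargetAgent string // agent ID expected to handle it
}

// pipeline maps each completed stage to its successor. summary_generation
// has no entry because it ends the workflow.
var pipeline = map[string]stageStep{
	"document_intake":     {"document_validation", "validation_agent"},
	"document_validation": {"metadata_extraction", "metadata_agent"},
	"metadata_extraction": {"text_processing", "text_processor_agent"},
	"text_processing":     {"summary_generation", "summary_agent"},
}

// nextStep returns the step after completedStage. The boolean is false
// when the workflow is finished or the stage is unknown.
func nextStep(completedStage string) (stageStep, bool) {
	step, ok := pipeline[completedStage]
	return step, ok
}

func main() {
	stage := "document_intake"
	for {
		step, ok := nextStep(stage)
		if !ok {
			fmt.Printf("%s is the final stage\n", stage)
			return
		}
		fmt.Printf("%s -> %s (handled by %s)\n", stage, step.NextTask, step.TargetAgent)
		stage = step.NextTask
	}
}
```

With a table like this, adding or reordering a stage becomes a one-line change, and the coordinator logic itself stays untouched.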
2. Specialized Agents
Each agent has a specific responsibility and can be developed, deployed, and scaled independently.
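The intake and validation agents share nearly all of their connection, subscription, and result-publishing code. Only the stage logic differs. One way to make that explicit, sketched here with hypothetical names StageHandler, handlers, and dispatch, is to isolate the stage logic behind a function type. The gRPC plumbing is left out so the example runs on its own:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// StageHandler is the only part that differs between agents: it turns
// task parameters into a result map or an error.
type StageHandler func(params map[string]interface{}) (map[string]interface{}, error)

// handlers maps task types to their stage logic. (Hypothetical registry.)
var handlers = map[string]StageHandler{
	"document_intake": func(params map[string]interface{}) (map[string]interface{}, error) {
		content, ok := params["content"].(string)
		if !ok {
			return nil, errors.New("content must be a string")
		}
		return map[string]interface{}{
			"stage":      "document_intake",
			"word_count": len(strings.Fields(content)),
		}, nil
	},
}

// dispatch runs the handler for taskType and reports a status string that
// an agent could translate into the matching pb.TaskStatus value.
func dispatch(taskType string, params map[string]interface{}) (map[string]interface{}, string, error) {
	h, ok := handlers[taskType]
	if !ok {
		return nil, "failed", fmt.Errorf("no handler for task type %q", taskType)
	}
	result, err := h(params)
	if err != nil {
		return nil, "failed", err
	}
	return result, "completed", nil
}

func main() {
	result, status, err := dispatch("document_intake", map[string]interface{}{
		"content": "quarterly results look strong",
	})
	fmt.Println(result, status, err)
}
```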
3. Asynchronous Processing
Agents work asynchronously, allowing for better resource utilization and scalability.
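In the agents above, every received task starts a new goroutine (go dia.processTask(...)), which places no upper bound on concurrent work. If a stage is expensive, a buffered channel used as a semaphore is a common way to cap the number of tasks in flight. A minimal sketch, assuming a hypothetical runBounded helper that also reports the peak concurrency it observed:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded processes every task with at most maxConcurrent goroutines
// running process at once. It returns the highest concurrency observed.
func runBounded(tasks []string, maxConcurrent int, process func(string)) int {
	sem := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup
	var running, peak int32

	for _, task := range tasks {
		sem <- struct{}{} // blocks while maxConcurrent tasks are in flight
		wg.Add(1)
		go func(task string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done

			// Track the peak number of simultaneously running tasks.
			n := atomic.AddInt32(&running, 1)
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			process(task)
			atomic.AddInt32(&running, -1)
		}(task)
	}
	wg.Wait()
	return int(atomic.LoadInt32(&peak))
}

func main() {
	tasks := []string{"doc_001", "doc_002", "doc_003", "doc_004", "doc_005"}
	peak := runBounded(tasks, 2, func(task string) {
		time.Sleep(50 * time.Millisecond) // stand-in for real stage work
	})
	fmt.Printf("processed %d tasks, peak concurrency %d\n", len(tasks), peak)
}
```

In an agent, the same semaphore would wrap the call to processTask inside the stream.Recv() loop.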
4. Error Handling
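When an agent reports TASK_STATUS_FAILED, the coordinator marks the workflow as failed, logs the stage and the error message, and stops advancing that workflow. Other workflows continue unaffected.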
5. Data Flow
Structured data flows between agents, with each stage adding value to the processing pipeline.
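In the Validation Agent, every field from the intake stage is copied into the result by hand. As stages accumulate fields, a small merge helper keeps that forwarding in one place. The following is a sketch with a hypothetical extendResult function. It builds a new map, so the previous stage's result is left untouched:

```go
package main

import "fmt"

// extendResult returns a new map holding every field of prev plus the
// fields in additions. additions wins on key conflicts, and prev is not
// modified.
func extendResult(prev, additions map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(prev)+len(additions))
	for k, v := range prev {
		out[k] = v
	}
	for k, v := range additions {
		out[k] = v
	}
	return out
}

func main() {
	intake := map[string]interface{}{
		"document_id": "doc_001",
		"stage":       "document_intake",
		"word_count":  9,
	}
	validated := extendResult(intake, map[string]interface{}{
		"stage":  "document_validation",
		"status": "validation_complete",
	})
	fmt.Println(validated["document_id"], validated["stage"], validated["word_count"])
	fmt.Println(intake["stage"]) // the intake result still carries its own stage
}
```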
Next Steps
Now that you understand multi-agent workflows:
- Add more agents: Create metadata extraction, text processing, and summary agents
- Implement error recovery: Add retry mechanisms and failure handling
- Add monitoring: Create a dashboard agent that tracks workflow progress
- Scale the system: Run multiple instances of each agent type
- Add persistence: Store workflow state in a database for recovery
This pattern scales to handle complex business processes, data pipelines, and automated workflows in production systems.
Common Patterns and Best Practices
Workflow State Management
- Store workflow state persistently for recovery
- Use unique workflow IDs for tracking
- Implement timeouts for stuck workflows
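As an illustration of the timeout point, the sketch below scans workflows for ones that are still in progress but have not been updated within a deadline. The trackedWorkflow type, its LastUpdated field, and the stuckWorkflows function are assumptions made for this example. The coordinator in this tutorial records only StartTime:

```go
package main

import (
	"fmt"
	"time"
)

// trackedWorkflow is a hypothetical, simplified workflow record.
type trackedWorkflow struct {
	ID          string
	Status      string
	LastUpdated time.Time
}

// stuckWorkflows returns the IDs of workflows that are still in progress
// and whose last update is older than timeout, measured at now.
func stuckWorkflows(workflows []trackedWorkflow, timeout time.Duration, now time.Time) []string {
	var stuck []string
	for _, w := range workflows {
		if w.Status == "completed" || w.Status == "failed" {
			continue // finished workflows cannot be stuck
		}
		if now.Sub(w.LastUpdated) > timeout {
			stuck = append(stuck, w.ID)
		}
	}
	return stuck
}

// demoWorkflows builds sample data relative to a fixed reference time.
func demoWorkflows() ([]trackedWorkflow, time.Time) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	return []trackedWorkflow{
		{ID: "doc_001", Status: "started", LastUpdated: now.Add(-5 * time.Minute)},
		{ID: "doc_002", Status: "started", LastUpdated: now.Add(-10 * time.Second)},
		{ID: "doc_003", Status: "completed", LastUpdated: now.Add(-1 * time.Hour)},
	}, now
}

func main() {
	workflows, now := demoWorkflows()
	fmt.Println(stuckWorkflows(workflows, time.Minute, now))
}
```

A coordinator could run such a check from a time.Ticker loop and then retry or fail the stuck workflows.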
Agent Communication
- Use structured messages with clear schemas
- Include metadata for routing and tracking
- Implement progress reporting for long-running tasks
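One way to give messages a clear schema while still sending them as generic parameter maps is to define a typed struct per message and convert at the boundary. The sketch below uses a JSON round trip for the conversion. IntakeResult, toParams, and fromParams are hypothetical names, and the fields mirror those produced by the Document Intake Agent:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// IntakeResult is a hypothetical typed view of the intake agent's result.
type IntakeResult struct {
	DocumentID   string `json:"document_id"`
	WorkflowID   string `json:"workflow_id"`
	Stage        string `json:"stage"`
	DocumentHash string `json:"document_hash"`
	WordCount    int    `json:"word_count"`
}

// toParams converts a typed message into the generic map form that can be
// passed to structpb.NewStruct. Numbers become float64 in the map.
func toParams(msg interface{}) (map[string]interface{}, error) {
	raw, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	var params map[string]interface{}
	if err := json.Unmarshal(raw, &params); err != nil {
		return nil, err
	}
	return params, nil
}

// fromParams fills a typed message from a generic parameter map and
// returns an error when a field has the wrong type.
func fromParams(params map[string]interface{}, out interface{}) error {
	raw, err := json.Marshal(params)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, out)
}

func main() {
	params, err := toParams(IntakeResult{
		DocumentID: "doc_001",
		WorkflowID: "doc_001",
		Stage:      "document_intake",
		WordCount:  9,
	})
	fmt.Println(params, err)

	var decoded IntakeResult
	err = fromParams(params, &decoded)
	fmt.Println(decoded.WordCount, err)
}
```

The receiving agent gets compile-time field names and a single place where malformed input is rejected, at the cost of an extra encode and decode per message.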
Error Handling
- Design for partial failures
- Implement retry mechanisms with backoff
- Provide clear error messages and recovery paths
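Retrying with backoff could look like the sketch below, which wraps any operation (for example, a PublishTask call) and doubles the wait between attempts. The retryWithBackoff function and errTransient value are assumptions made for illustration. A production version would usually also add jitter and honor context cancellation:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a sample error used to simulate a temporary failure.
var errTransient = errors.New("transient failure")

// retryWithBackoff calls op up to maxAttempts times, doubling the wait
// between attempts starting from baseDelay. It returns the number of
// attempts made and the last error (nil on success).
func retryWithBackoff(maxAttempts int, baseDelay time.Duration, op func() error) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	var err error
	delay := baseDelay
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	attempts, err := retryWithBackoff(5, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two attempts
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```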
Monitoring and Observability
- Log all state transitions
- Track workflow performance metrics
- Implement health checks for agents
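Logging state transitions can be as simple as recording each change in one place before writing it to the log. The sketch below assumes a hypothetical transitionLog type. It keeps the entries in memory, whereas a real system would more likely emit them to a metrics or tracing backend:

```go
package main

import (
	"log"
	"sync"
	"time"
)

// transition records one stage change of one workflow.
type transition struct {
	WorkflowID string
	From, To   string
	At         time.Time
}

// transitionLog is a hypothetical in-memory record of stage changes.
type transitionLog struct {
	mu      sync.Mutex
	entries []transition
}

// Record stores the transition and writes a structured log line.
func (l *transitionLog) Record(workflowID, from, to string) {
	t := transition{WorkflowID: workflowID, From: from, To: to, At: time.Now()}
	l.mu.Lock()
	l.entries = append(l.entries, t)
	l.mu.Unlock()
	log.Printf("workflow=%s transition=%s->%s", workflowID, from, to)
}

// Path returns the stages a workflow has entered, in order.
func (l *transitionLog) Path(workflowID string) []string {
	l.mu.Lock()
	defer l.mu.Unlock()
	var path []string
	for _, t := range l.entries {
		if t.WorkflowID == workflowID {
			path = append(path, t.To)
		}
	}
	return path
}

func main() {
	var tl transitionLog
	tl.Record("doc_001", "intake", "validation")
	tl.Record("doc_001", "validation", "metadata_extraction")
	log.Println(tl.Path("doc_001"))
}
```

In the coordinator, a call to Record would sit next to each assignment to workflow.CurrentStage.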
You now have the foundation for building sophisticated multi-agent systems that can handle complex, real-world workflows!