How to Debug Agent Issues
This guide provides practical steps for troubleshooting common issues when developing and deploying agents with AgentHub.
Common Connection Issues
Problem: Agent Can’t Connect to Broker
Symptoms:
Failed to connect: connection refused
Solutions:
- Check if broker is running:

  # Check if broker process is running
  ps aux | grep broker

  # Check if port 50051 is listening
  netstat -tlnp | grep 50051
  # or
  lsof -i :50051
- Verify broker address and configuration:

  // Using unified abstraction - configuration via environment or code
  config := agenthub.NewGRPCConfig("subscriber")
  config.BrokerAddr = "localhost" // Default
  config.BrokerPort = "50051"     // Default

  // Or set via environment variables:
  // export AGENTHUB_BROKER_ADDR="localhost"
  // export AGENTHUB_BROKER_PORT="50051"
- Check firewall settings:

  # On Linux, check if the port is blocked
  sudo ufw status

  # Allow the port if needed
  sudo ufw allow 50051
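If the broker looks healthy but the agent still cannot connect, a standalone dial test helps separate network problems from application problems. The following is a minimal sketch that talks to the broker with grpc-go directly; it assumes a plaintext broker on localhost:50051 and is not part of the AgentHub abstraction.

package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Block until the connection is established or the timeout expires
    conn, err := grpc.DialContext(ctx, "localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
    )
    if err != nil {
        log.Fatalf("broker unreachable: %v", err)
    }
    defer conn.Close()
    log.Println("broker reachable")
}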
Problem: TLS/SSL Errors
Symptoms:
transport: authentication handshake failed
Solution: The unified abstraction handles TLS configuration automatically:
// TLS and connection management handled automatically
config := agenthub.NewGRPCConfig("subscriber")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
    panic(err)
}
Task Processing Issues
Problem: Agent Not Receiving Tasks
Debug Steps:
- Check subscription logs:

  log.Printf("Agent %s subscribing to tasks...", agentID)
  // Should see: "Successfully subscribed to tasks for agent {agentID}"
- Verify agent ID matching:

  // In the publisher
  ResponderAgentId: "my_processing_agent"

  // In the subscriber (must match exactly)
  const agentID = "my_processing_agent"
- Check task type filtering:

  req := &pb.SubscribeToTasksRequest{
      AgentId:   agentID,
      TaskTypes: []string{"math_calculation"}, // Remove to receive all types
  }
- Monitor broker logs:

  # The broker should show:
  Received task request: task_xyz (type: math) from agent: publisher_agent

  # And either:
  No subscribers for task from agent 'publisher_agent'   # Bad - no matching agents
  # or the task being routed to subscribers              # Good - task delivered
Problem: Tasks Timing Out
Debug Steps:
- Check task processing time:

  func processTask(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
      start := time.Now()
      defer func() {
          log.Printf("Task %s took %v to process", task.GetTaskId(), time.Since(start))
      }()

      // Your processing logic
  }
- Add timeout handling:

  func processTaskWithTimeout(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
      // Create timeout context
      taskCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
      defer cancel()

      // Run the work in a goroutine so the deadline can interrupt it
      done := make(chan struct{})
      go func() {
          defer close(done)
          // Your processing logic, passing taskCtx to downstream calls
      }()

      select {
      case <-taskCtx.Done():
          if taskCtx.Err() == context.DeadlineExceeded {
              sendResult(ctx, task, nil, pb.TaskStatus_TASK_STATUS_FAILED, "Task timeout", client)
          }
      case <-done:
          // Completed within the deadline
      }
  }
- Monitor progress updates:

  // Send progress every few seconds
  ticker := time.NewTicker(5 * time.Second)
  defer ticker.Stop()

  go func() {
      progress := 0
      for range ticker.C {
          progress += 10
          if progress > 100 {
              return
          }
          sendProgress(ctx, task, int32(progress), "Still processing...", client)
      }
  }()
Message Serialization Issues
Problem: Parameter Marshaling Errors
Symptoms:
Error creating parameters struct: proto: invalid value type
Solution:
Ensure all parameter values are compatible with structpb:
// Bad - channels, functions, complex types not supported
params := map[string]interface{}{
    "callback": func() {},  // Not supported
    "channel": make(chan int),  // Not supported
}
// Good - basic types only
params := map[string]interface{}{
    "name": "value",           // string
    "count": 42,               // number
    "enabled": true,           // boolean
    "items": []string{"a", "b"}, // array
    "config": map[string]interface{}{ // nested object
        "timeout": 30,
    },
}
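When parameters are built dynamically, it helps to convert them up front and surface the error immediately. A minimal sketch, assuming the map is converted with structpb.NewStruct (the error message names the first unsupported value):

import (
    "log"

    "google.golang.org/protobuf/types/known/structpb"
)

func buildParams(values map[string]interface{}) *structpb.Struct {
    // NewStruct rejects unsupported types (channels, funcs, custom structs, ...)
    params, err := structpb.NewStruct(values)
    if err != nil {
        log.Fatalf("Error creating parameters struct: %v", err)
    }
    return params
}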
Problem: Result Unmarshaling Issues
Debug Steps:
- Check result structure:

  func handleTaskResult(result *pb.TaskResult) {
      log.Printf("Raw result: %+v", result.GetResult())

      resultMap := result.GetResult().AsMap()
      log.Printf("Result as map: %+v", resultMap)

      // Type assert carefully
      if value, ok := resultMap["count"].(float64); ok {
          log.Printf("Count: %f", value)
      } else {
          log.Printf("Count field missing or wrong type: %T", resultMap["count"])
      }
  }
- Handle type conversion safely:

  func getStringField(m map[string]interface{}, key string) (string, error) {
      if val, ok := m[key]; ok {
          if str, ok := val.(string); ok {
              return str, nil
          }
          return "", fmt.Errorf("field %s is not a string: %T", key, val)
      }
      return "", fmt.Errorf("field %s not found", key)
  }

  func getNumberField(m map[string]interface{}, key string) (float64, error) {
      if val, ok := m[key]; ok {
          if num, ok := val.(float64); ok {
              return num, nil
          }
          return 0, fmt.Errorf("field %s is not a number: %T", key, val)
      }
      return 0, fmt.Errorf("field %s not found", key)
  }
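For illustration, the helpers above might be used as follows; the "name" and "count" fields are hypothetical and depend on what your agents actually return.

func handleResultFields(result *pb.TaskResult) {
    resultMap := result.GetResult().AsMap()

    name, err := getStringField(resultMap, "name")
    if err != nil {
        log.Printf("Bad result payload: %v", err)
        return
    }

    count, err := getNumberField(resultMap, "count")
    if err != nil {
        log.Printf("Bad result payload: %v", err)
        return
    }

    log.Printf("name=%s count=%.0f", name, count)
}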
Stream and Connection Issues
Problem: Stream Disconnections
Symptoms:
Error receiving task: rpc error: code = Unavailable desc = connection error
Solutions:
- Implement retry logic:

  func subscribeToTasksWithRetry(ctx context.Context, client pb.EventBusClient) {
      for {
          err := subscribeToTasks(ctx, client)
          if err != nil {
              log.Printf("Subscription error: %v, retrying in 5 seconds...", err)
              time.Sleep(5 * time.Second)
              continue
          }
          break
      }
  }
- Handle context cancellation:

  for {
      task, err := stream.Recv()
      if err == io.EOF {
          log.Printf("Stream closed by server")
          return
      }
      if err != nil {
          if ctx.Err() != nil {
              log.Printf("Context cancelled: %v", ctx.Err())
              return
          }
          log.Printf("Stream error: %v", err)
          return
      }
      // Process task
  }
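A fixed five-second sleep can hammer a broker that is restarting. If reconnect storms are a concern, a capped exponential backoff is a small change; this sketch reuses the subscribeToTasks helper from the retry example above.

func subscribeWithBackoff(ctx context.Context, client pb.EventBusClient) {
    backoff := time.Second
    const maxBackoff = 30 * time.Second

    for ctx.Err() == nil {
        if err := subscribeToTasks(ctx, client); err != nil {
            log.Printf("Subscription error: %v, retrying in %v", err, backoff)
            time.Sleep(backoff)

            // Double the delay up to the cap
            backoff *= 2
            if backoff > maxBackoff {
                backoff = maxBackoff
            }
            continue
        }
        backoff = time.Second // Reset after a successful subscription
    }
}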
Problem: Memory Leaks in Long-Running Agents
Debug Steps:
- Monitor memory usage:

  # Check memory usage
  ps -o pid,ppid,cmd,%mem,%cpu -p $(pgrep -f "your-agent")

  # Continuous monitoring
  watch -n 5 'ps -o pid,ppid,cmd,%mem,%cpu -p $(pgrep -f "your-agent")'
- Profile memory usage:

  import (
      "log"
      "net/http"
      _ "net/http/pprof"
  )

  func main() {
      // Start pprof server
      go func() {
          log.Println(http.ListenAndServe("localhost:6060", nil))
      }()

      // Your agent code
  }

  Access profiles at http://localhost:6060/debug/pprof/
- Check for goroutine leaks:

  import "runtime"

  func logGoroutines() {
      ticker := time.NewTicker(30 * time.Second)
      go func() {
          for range ticker.C {
              log.Printf("Goroutines: %d", runtime.NumGoroutine())
          }
      }()
  }
Performance Issues
Problem: Slow Task Processing
Debug Steps:
- Add timing measurements:

  func processTask(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
      timings := make(map[string]time.Duration)
      start := time.Now()

      // Phase 1: Parameter validation
      timings["validation"] = time.Since(start)
      last := time.Now()

      // Phase 2: Business logic
      // ... your logic here ...
      timings["processing"] = time.Since(last)
      last = time.Now()

      // Phase 3: Result formatting
      // ... result creation ...
      timings["formatting"] = time.Since(last)

      log.Printf("Task %s timings: %+v", task.GetTaskId(), timings)
  }
- Profile CPU usage:

  import (
      "log"
      "os"
      "runtime/pprof"
  )

  func startCPUProfile() func() {
      f, err := os.Create("cpu.prof")
      if err != nil {
          log.Fatal(err)
      }
      pprof.StartCPUProfile(f)
      return func() {
          pprof.StopCPUProfile()
          f.Close()
      }
  }

  func main() {
      stop := startCPUProfile()
      defer stop()

      // Your agent code
  }
- Monitor queue sizes:

  type Agent struct {
      taskQueue chan *pb.TaskMessage
  }

  func (a *Agent) logQueueSize() {
      ticker := time.NewTicker(10 * time.Second)
      go func() {
          for range ticker.C {
              log.Printf("Task queue size: %d/%d", len(a.taskQueue), cap(a.taskQueue))
          }
      }()
  }
Debugging Tools and Techniques
1. Enable Verbose Logging
import "log"
import "os"
func init() {
    // Enable verbose logging
    log.SetFlags(log.LstdFlags | log.Lshortfile)
    // Route log output to stdout when DEBUG is enabled
    if os.Getenv("DEBUG") == "true" {
        log.SetOutput(os.Stdout)
    }
}
2. Add Structured Logging
import "encoding/json"
import "time"
type LogEntry struct {
    Timestamp string                 `json:"timestamp"`
    Level     string                 `json:"level"`
    AgentID   string                 `json:"agent_id"`
    TaskID    string                 `json:"task_id,omitempty"`
    Message   string                 `json:"message"`
    Data      map[string]interface{} `json:"data,omitempty"`
}
func logInfo(agentID, taskID, message string, data map[string]interface{}) {
    entry := LogEntry{
        Timestamp: time.Now().Format(time.RFC3339),
        Level:     "INFO",
        AgentID:   agentID,
        TaskID:    taskID,
        Message:   message,
        Data:      data,
    }
    if jsonData, err := json.Marshal(entry); err == nil {
        log.Println(string(jsonData))
    }
}
3. Health Check Endpoint
import "net/http"
import "encoding/json"
type HealthStatus struct {
    Status       string    `json:"status"`
    AgentID      string    `json:"agent_id"`
    Uptime       string    `json:"uptime"`
    TasksProcessed int64   `json:"tasks_processed"`
    LastTaskTime  time.Time `json:"last_task_time"`
}
func startHealthServer(agent *Agent) {
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        status := HealthStatus{
            Status:         "healthy",
            AgentID:        agent.ID,
            Uptime:         time.Since(agent.StartTime).String(),
            TasksProcessed: agent.TasksProcessed,
            LastTaskTime:   agent.LastTaskTime,
        }
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(status)
    })
    log.Printf("Health server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}
4. Task Tracing
import "context"
type TraceID string
func withTraceID(ctx context.Context) context.Context {
    traceID := TraceID(fmt.Sprintf("trace-%d", time.Now().UnixNano()))
    return context.WithValue(ctx, "trace_id", traceID)
}
func getTraceID(ctx context.Context) TraceID {
    if traceID, ok := ctx.Value("trace_id").(TraceID); ok {
        return traceID
    }
    return ""
}
func processTaskWithTracing(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
    ctx = withTraceID(ctx)
    traceID := getTraceID(ctx)
    log.Printf("[%s] Starting task %s", traceID, task.GetTaskId())
    defer log.Printf("[%s] Finished task %s", traceID, task.GetTaskId())
    // Your processing logic with trace ID logging
}
Common Error Patterns
1. Resource Exhaustion
Signs:
- Tasks start failing after running for a while
- Memory usage continuously increases
- File descriptor limits reached
Solutions:
- Implement proper resource cleanup
- Add connection pooling
- Set task processing limits
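A simple way to set task processing limits is a semaphore built from a buffered channel. The sketch below is illustrative rather than part of the AgentHub API; processTask is the handler from the earlier examples and maxConcurrent is an assumed tuning value.

const maxConcurrent = 10 // Assumed limit; tune for your workload

var sem = make(chan struct{}, maxConcurrent)

func handleTask(ctx context.Context, task *pb.TaskMessage, client pb.EventBusClient) {
    sem <- struct{}{}        // Acquire a slot; blocks when the limit is reached
    defer func() { <-sem }() // Release the slot when the task finishes

    processTask(ctx, task, client)
}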
2. Deadlocks
Signs:
- Agent stops processing tasks
- Health checks show agent as “stuck”
Solutions:
- Avoid blocking operations in main goroutines
- Use timeouts for all operations
- Implement deadlock detection
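One blanket way to apply timeouts is to wrap every blocking call in a context deadline, so a stuck dependency surfaces as an error instead of a hung goroutine. A minimal sketch, assuming a hypothetical callBroker dependency:

func callWithTimeout(ctx context.Context, timeout time.Duration, op func(context.Context) error) error {
    // Derive a deadline-bound context so the operation cannot block forever
    opCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    return op(opCtx)
}

func pollBroker(ctx context.Context) error {
    // The call now fails after 10 seconds instead of wedging the agent
    return callWithTimeout(ctx, 10*time.Second, func(opCtx context.Context) error {
        return callBroker(opCtx) // hypothetical blocking dependency call
    })
}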
3. Race Conditions
Signs:
- Intermittent task failures
- Inconsistent behavior
- Data corruption
Solutions:
- Use proper synchronization primitives
- Run race detector: go run -race your-agent.go
- Add mutex protection for shared state
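For shared state such as the per-agent statistics used by the health-check example, a mutex (or sync/atomic for plain counters) removes the race. A minimal sketch:

import (
    "sync"
    "time"
)

type AgentStats struct {
    mu             sync.Mutex
    tasksProcessed int64
    lastTaskTime   time.Time
}

func (s *AgentStats) RecordTask() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.tasksProcessed++
    s.lastTaskTime = time.Now()
}

func (s *AgentStats) Snapshot() (tasks int64, last time.Time) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.tasksProcessed, s.lastTaskTime
}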
With these debugging techniques, you should be able to identify and resolve most agent-related issues efficiently.