Unified Abstraction Library API Reference
The AgentHub unified abstraction library provides simplified APIs for building gRPC-based agent communication systems with built-in observability, automatic configuration, and correlation tracking.
Package: internal/agenthub
The internal/agenthub package contains the core components of the unified abstraction. They expose high-level APIs that set up observability automatically, so agent code does not have to wire it in by hand.
Overview
The unified abstraction library cuts agent implementation size from 380+ lines to ~29 lines by providing:
- Automatic gRPC Setup: One-line server and client creation
- Built-in Observability: Integrated OpenTelemetry tracing and metrics
- Environment-Based Configuration: Automatic configuration from environment variables
- Correlation Tracking: Automatic correlation ID generation and propagation
- Pluggable Architecture: Simple task handler registration
Core Components
GRPCConfig
Configuration structure for gRPC servers and clients with environment-based initialization.
type GRPCConfig struct {
ServerAddr string // gRPC server listen address (e.g., ":50051")
BrokerAddr string // Broker connection address (e.g., "localhost:50051")
HealthPort string // Health check endpoint port
ComponentName string // Component identifier for observability
}
Constructor
func NewGRPCConfig(componentName string) *GRPCConfig
Creates a new gRPC configuration with environment variable defaults:
| Environment Variable | Default | Description |
|---|---|---|
| AGENTHUB_BROKER_ADDR | localhost | Broker server host |
| AGENTHUB_BROKER_PORT | 50051 | Broker gRPC port |
| AGENTHUB_GRPC_PORT | :50051 | Server listen port |
| BROKER_HEALTH_PORT | 8080 | Health endpoint port |
Example:
config := agenthub.NewGRPCConfig("my-agent")
// Results in BrokerAddr: "localhost:50051" (automatically combined)
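To make the "automatically combined" step concrete, here is a minimal standalone sketch of how a host variable and a port variable could be merged into one address with documented defaults. The helper names valueOr and brokerAddr are hypothetical and are not part of the agenthub package; the library's actual implementation may differ.

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// valueOr returns v, or fallback when v is empty.
func valueOr(v, fallback string) string {
	if v != "" {
		return v
	}
	return fallback
}

// brokerAddr builds "host:port" from the two broker variables.
// getenv is injected so the lookup can be replaced in tests;
// pass os.Getenv to read the real environment.
// Hypothetical helper, not the agenthub implementation.
func brokerAddr(getenv func(string) string) string {
	host := valueOr(getenv("AGENTHUB_BROKER_ADDR"), "localhost")
	port := valueOr(getenv("AGENTHUB_BROKER_PORT"), "50051")
	return net.JoinHostPort(host, port)
}

func main() {
	fmt.Println(brokerAddr(os.Getenv))
}
```

Using net.JoinHostPort rather than string concatenation keeps the result valid if the host is ever an IPv6 literal.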
AgentHubServer
High-level gRPC server wrapper with integrated observability.
type AgentHubServer struct {
Server *grpc.Server // Underlying gRPC server
Listener net.Listener // Network listener
Observability *observability.Observability // OpenTelemetry integration
TraceManager *observability.TraceManager // Distributed tracing
MetricsManager *observability.MetricsManager // Metrics collection
HealthServer *observability.HealthServer // Health monitoring
Logger *slog.Logger // Structured logging
Config *GRPCConfig // Configuration
}
Constructor
func NewAgentHubServer(config *GRPCConfig) (*AgentHubServer, error)
Creates a complete gRPC server with:
- OpenTelemetry instrumentation
- Health check endpoints
- Metrics collection
- Structured logging with trace correlation
Methods
func (s *AgentHubServer) Start(ctx context.Context) error
Starts the server with automatic:
- Health endpoint setup (/health, /ready, /metrics)
- Metrics collection goroutine
- gRPC server with observability
func (s *AgentHubServer) Shutdown(ctx context.Context) error
Gracefully shuts down all components:
- gRPC server graceful stop
- Health server shutdown
- Observability cleanup
Example:
ctx := context.Background()
config := agenthub.NewGRPCConfig("broker")
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
log.Fatal(err)
}
// Register services
eventBusService := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, eventBusService)
// Start server
if err := server.Start(ctx); err != nil {
log.Fatal(err)
}
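The difference between the /health and /ready endpoints that Start sets up can be illustrated with the standard library alone. The sketch below is an assumption about typical liveness and readiness behaviour, not the HealthServer implementation; newHealthMux and statusFor are hypothetical names, and /metrics is omitted because it would need a metrics exporter.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// newHealthMux registers a liveness and a readiness endpoint.
// isReady reports whether startup has finished.
func newHealthMux(isReady func() bool) *http.ServeMux {
	mux := http.NewServeMux()
	// Liveness: the process is up and able to answer requests.
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	// Readiness: 503 until the component is ready to take traffic.
	mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		if !isReady() {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
	return mux
}

// statusFor sends an in-memory GET request and returns the status code.
func statusFor(mux *http.ServeMux, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	var ready atomic.Bool // atomic because handlers run on other goroutines
	mux := newHealthMux(ready.Load)
	fmt.Println("before startup:", statusFor(mux, "/ready"))
	ready.Store(true)
	fmt.Println("after startup:", statusFor(mux, "/ready"))
}
```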
AgentHubClient
High-level gRPC client wrapper with integrated observability.
type AgentHubClient struct {
Client pb.EventBusClient // gRPC client
Connection *grpc.ClientConn // Connection
Observability *observability.Observability // OpenTelemetry integration
TraceManager *observability.TraceManager // Distributed tracing
MetricsManager *observability.MetricsManager // Metrics collection
HealthServer *observability.HealthServer // Health monitoring
Logger *slog.Logger // Structured logging
Config *GRPCConfig // Configuration
}
Constructor
func NewAgentHubClient(config *GRPCConfig) (*AgentHubClient, error)
Creates a complete gRPC client with:
- OpenTelemetry instrumentation
- Connection health monitoring
- Metrics collection
- Automatic retry and timeout handling
Methods
func (c *AgentHubClient) Start(ctx context.Context) error
Initializes the client and starts health monitoring and metrics collection.
func (c *AgentHubClient) Shutdown(ctx context.Context) error
Gracefully closes the connection and cleans up resources.
Example:
ctx := context.Background()
config := agenthub.NewGRPCConfig("publisher")
client, err := agenthub.NewAgentHubClient(config)
if err != nil {
log.Fatal(err)
}
if err := client.Start(ctx); err != nil {
log.Fatal(err)
}
// Use client.Client for gRPC calls
Service Abstractions
EventBusService
Broker service implementation with built-in observability and correlation tracking.
type EventBusService struct {
Server *AgentHubServer
subscriptions map[string][]Subscription
resultSubs map[string][]ResultSubscription
progressSubs map[string][]ProgressSubscription
mu sync.RWMutex
}
Constructor
func NewEventBusService(server *AgentHubServer) *EventBusService
Creates an EventBus service with automatic:
- Subscription management
- Task routing and correlation
- Observability integration
Key Methods
func (s *EventBusService) PublishTask(ctx context.Context, req *pb.PublishTaskRequest) (*pb.PublishResponse, error)
Publishes tasks with automatic:
- Input validation
- Correlation ID generation
- Distributed tracing
- Metrics collection
func (s *EventBusService) SubscribeToTasks(req *pb.SubscribeToTasksRequest, stream pb.EventBus_SubscribeToTasksServer) error
Manages task subscriptions with:
- Automatic subscription lifecycle
- Context cancellation handling
- Error recovery
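The subscription maps guarded by sync.RWMutex in EventBusService suggest a registry pattern along the following lines. This is a simplified sketch under assumptions: tasks are plain strings instead of protobuf messages, subscribers are buffered channels instead of gRPC streams, and the registry type and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// registry maps an agent ID to the channels subscribed for that agent.
type registry struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

func newRegistry() *registry {
	return &registry{subs: make(map[string][]chan string)}
}

// subscribe registers a buffered channel for agentID and returns it
// together with a function that removes the subscription again.
func (r *registry) subscribe(agentID string) (<-chan string, func()) {
	ch := make(chan string, 8)
	r.mu.Lock()
	r.subs[agentID] = append(r.subs[agentID], ch)
	r.mu.Unlock()
	return ch, func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		list := r.subs[agentID]
		for i, c := range list {
			if c == ch {
				r.subs[agentID] = append(list[:i], list[i+1:]...)
				break
			}
		}
	}
}

// publish delivers task to every subscriber of agentID and returns the
// number of deliveries. Full channels are skipped so that one slow
// subscriber cannot block the publisher.
func (r *registry) publish(agentID, task string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	delivered := 0
	for _, ch := range r.subs[agentID] {
		select {
		case ch <- task:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	r := newRegistry()
	ch, unsubscribe := r.subscribe("agent-1")
	fmt.Println("delivered:", r.publish("agent-1", "greeting"))
	fmt.Println("received:", <-ch)
	unsubscribe()
	fmt.Println("delivered after unsubscribe:", r.publish("agent-1", "greeting"))
}
```

Publishing takes only the read lock, so many publishers can run concurrently while subscribe and unsubscribe briefly take the write lock.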
SubscriberAgent
High-level subscriber implementation with pluggable task handlers.
type SubscriberAgent struct {
client *AgentHubClient
agentID string
handlers map[string]TaskHandler
ctx context.Context
cancel context.CancelFunc
}
Constructor
func NewSubscriberAgent(client *AgentHubClient, agentID string) *SubscriberAgent
Task Handler Interface
type TaskHandler interface {
Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error)
}
Methods
func (s *SubscriberAgent) RegisterHandler(taskType string, handler TaskHandler)
Registers handlers for specific task types with automatic:
- Task routing
- Error handling
- Result publishing
func (s *SubscriberAgent) Start(ctx context.Context) error
Starts the subscriber with automatic:
- Task subscription
- Handler dispatch
- Observability integration
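Example:
type GreetingHandler struct{}
func (h *GreetingHandler) Handle(ctx context.Context, task *pb.TaskMessage) (*pb.TaskResult, error) {
// Process the greeting task and populate the result fields as needed
result := &pb.TaskResult{}
return result, nil
}
// client is an *agenthub.AgentHubClient created as shown earlier;
// the agent ID "greeting-agent" is only an illustration
subscriber := agenthub.NewSubscriberAgent(client, "greeting-agent")
subscriber.RegisterHandler("greeting", &GreetingHandler{})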
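How a handler map turns into task routing can be shown without the generated protobuf types. In the sketch below, Task and Result are simplified stand-ins for pb.TaskMessage and pb.TaskResult, and dispatcher, HandlerFunc, newDemoDispatcher, and run are hypothetical names; the actual SubscriberAgent may route differently.

```go
package main

import (
	"context"
	"fmt"
)

// Task and Result stand in for pb.TaskMessage and pb.TaskResult.
type Task struct {
	Type    string
	Payload string
}

type Result struct {
	Output string
}

// TaskHandler mirrors the shape of the documented interface.
type TaskHandler interface {
	Handle(ctx context.Context, task *Task) (*Result, error)
}

// HandlerFunc adapts a plain function to TaskHandler.
type HandlerFunc func(ctx context.Context, task *Task) (*Result, error)

func (f HandlerFunc) Handle(ctx context.Context, task *Task) (*Result, error) {
	return f(ctx, task)
}

// dispatcher routes each task to the handler registered for its type.
type dispatcher struct {
	handlers map[string]TaskHandler
}

func (d *dispatcher) register(taskType string, h TaskHandler) {
	d.handlers[taskType] = h
}

func (d *dispatcher) dispatch(ctx context.Context, task *Task) (*Result, error) {
	h, ok := d.handlers[task.Type]
	if !ok {
		return nil, fmt.Errorf("no handler registered for task type %q", task.Type)
	}
	return h.Handle(ctx, task)
}

// newDemoDispatcher returns a dispatcher with one "greeting" handler.
func newDemoDispatcher() *dispatcher {
	d := &dispatcher{handlers: make(map[string]TaskHandler)}
	d.register("greeting", HandlerFunc(func(ctx context.Context, task *Task) (*Result, error) {
		return &Result{Output: "Hello, " + task.Payload + "!"}, nil
	}))
	return d
}

// run is a demo convenience that builds the task and unwraps the result.
func (d *dispatcher) run(taskType, payload string) (string, error) {
	res, err := d.dispatch(context.Background(), &Task{Type: taskType, Payload: payload})
	if err != nil {
		return "", err
	}
	return res.Output, nil
}

func main() {
	d := newDemoDispatcher()
	out, err := d.run("greeting", "Ada")
	fmt.Println(out, err)
	_, err = d.run("unknown", "")
	fmt.Println(err)
}
```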
Utility Functions
Metadata Operations
func ExtractCorrelationID(ctx context.Context) string
func InjectCorrelationID(ctx context.Context, correlationID string) context.Context
func GenerateCorrelationID() string
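These functions manage correlation IDs for distributed tracing: GenerateCorrelationID creates a new ID, InjectCorrelationID attaches an ID to a context, and ExtractCorrelationID reads the ID from a context.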
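The inject and extract round trip can be sketched with context values and a random ID. This is an in-process illustration only: how the library carries the ID across service boundaries (for example via gRPC metadata) is not specified here, and the lowercase function names and the roundTrip helper are hypothetical.

```go
package main

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// correlationKey is an unexported key type, which avoids collisions
// with context values set by other packages.
type correlationKey struct{}

// generateCorrelationID returns 16 random bytes as 32 hex characters.
func generateCorrelationID() string {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		panic(err) // the system randomness source is unavailable
	}
	return hex.EncodeToString(buf)
}

// injectCorrelationID returns a child context that carries id.
func injectCorrelationID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, correlationKey{}, id)
}

// extractCorrelationID returns the stored ID, or "" when none is set.
func extractCorrelationID(ctx context.Context) string {
	id, _ := ctx.Value(correlationKey{}).(string)
	return id
}

// roundTrip injects id into a fresh context and reads it back.
func roundTrip(id string) string {
	return extractCorrelationID(injectCorrelationID(context.Background(), id))
}

func main() {
	id := generateCorrelationID()
	fmt.Println(id, roundTrip(id) == id)
}
```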
Metrics Helpers
func NewMetricsTicker(ctx context.Context, manager *observability.MetricsManager) *MetricsTicker
Automatic metrics collection with configurable intervals.
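A periodic collection loop that stops when its context is cancelled is a common shape for this kind of helper. The sketch below assumes that shape; runTicker and collectN are hypothetical names and say nothing about the internals of MetricsTicker.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runTicker calls collect on every tick until ctx is cancelled.
func runTicker(ctx context.Context, interval time.Duration, collect func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's resources on exit
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// If cancellation raced with this tick, cancellation wins.
			if ctx.Err() != nil {
				return
			}
			collect()
		}
	}
}

// collectN runs the ticker until collect has fired n times (n >= 1),
// then cancels it and returns the number of collections.
func collectN(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	count := 0
	runTicker(ctx, time.Millisecond, func() {
		count++
		if count == n {
			cancel()
		}
	})
	return count
}

func main() {
	fmt.Println("collections:", collectN(3))
}
```

In a long-running service, runTicker would normally be started in its own goroutine and stopped by cancelling the context during shutdown.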
Configuration Reference
Environment Variables
The unified abstraction library uses environment-based configuration:
| Variable | Type | Default | Description |
|---|---|---|---|
| AGENTHUB_BROKER_ADDR | string | localhost | Broker server hostname |
| AGENTHUB_BROKER_PORT | string | 50051 | Broker gRPC port |
| AGENTHUB_GRPC_PORT | string | :50051 | Server listen address |
| BROKER_HEALTH_PORT | string | 8080 | Health endpoint port |
| SERVICE_VERSION | string | 1.0.0 | Service version for observability |
| ENVIRONMENT | string | development | Deployment environment |
Observability Integration
The unified abstraction automatically configures:
- OpenTelemetry Tracing: Automatic span creation and context propagation
- Prometheus Metrics: 47+ built-in metrics for performance monitoring
- Health Checks: Comprehensive health endpoints for service monitoring
- Structured Logging: Correlated logging with trace context
Performance Characteristics
| Metric | Standard gRPC | Unified Abstraction | Difference |
|---|---|---|---|
| Setup Complexity | 380+ lines | ~29 lines | -92% code |
| Throughput | 10,000+ tasks/sec | 9,500+ tasks/sec | -5% |
| Latency | Baseline | +10 ms for tracing | +10 ms |
| Memory | Baseline | +50 MB per agent | +50 MB |
| CPU | Baseline | +5% for observability | +5% |
Migration Guide
From Standard gRPC
Before (Standard gRPC):
// 380+ lines of boilerplate code
lis, err := net.Listen("tcp", ":50051")
server := grpc.NewServer()
// ... extensive setup code
After (Unified Abstraction):
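// ~29 lines for a complete agent; the core server setup is:
config := agenthub.NewGRPCConfig("my-service")
server, err := agenthub.NewAgentHubServer(config)
if err != nil {
log.Fatal(err)
}
service := agenthub.NewEventBusService(server)
pb.RegisterEventBusServer(server.Server, service)
if err := server.Start(ctx); err != nil {
log.Fatal(err)
}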
Observability Benefits
The unified abstraction provides automatic:
- Distributed Tracing: Every request automatically traced
- Metrics Collection: 47+ metrics without configuration
- Health Monitoring: Built-in health and readiness endpoints
- Error Correlation: Automatic error tracking across services
- Performance Monitoring: Latency, throughput, and error rates
Error Handling
The unified abstraction provides comprehensive error handling:
- Automatic Retries: Built-in retry logic for transient failures
- Circuit Breaking: Protection against cascading failures
- Graceful Degradation: Service continues operating during partial failures
- Error Correlation: Distributed error tracking across service boundaries
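For readers unfamiliar with the retry pattern named above, here is a generic sketch of retrying a transient failure with exponential backoff. It is not the library's built-in retry logic; retry, errTransient, and flakyCalls are hypothetical names, and which errors the library treats as transient is not documented here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTransient marks failures that are worth retrying.
var errTransient = errors.New("transient failure")

// retry runs op up to attempts times, doubling the delay after each
// transient failure. Non-transient errors are returned immediately.
func retry(ctx context.Context, attempts int, base time.Duration, op func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil || !errors.Is(err, errTransient) {
			return err
		}
		if i == attempts-1 {
			break // no point in sleeping after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
			delay *= 2
		}
	}
	return fmt.Errorf("after %d attempts: %w", attempts, err)
}

// flakyCalls retries an operation that fails `failures` times before
// succeeding. It returns the number of calls made and whether the
// operation eventually succeeded.
func flakyCalls(failures, attempts int) (int, bool) {
	calls := 0
	err := retry(context.Background(), attempts, time.Millisecond, func() error {
		calls++
		if calls <= failures {
			return errTransient
		}
		return nil
	})
	return calls, err == nil
}

func main() {
	calls, ok := flakyCalls(2, 5)
	fmt.Println(calls, ok)
}
```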
Best Practices
1. Configuration Management
// Use environment-based configuration
config := agenthub.NewGRPCConfig("my-service")
// Override specific values if needed
config.HealthPort = "8083"
2. Handler Registration
// Register handlers before starting
subscriber.RegisterHandler("task-type", handler)
if err := subscriber.Start(ctx); err != nil {
log.Fatal(err)
}
3. Graceful Shutdown
// Always implement proper shutdown
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Printf("shutdown error: %v", err)
}
}()
4. Error Handling
// Use context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
result, err := client.Client.PublishTask(ctx, request)
if err != nil {
// Error is automatically traced and logged
return fmt.Errorf("failed to publish task: %w", err)
}
See Also
- Observability Metrics Reference
- Health Endpoints Reference
- Tracing API Reference
- Configuration Reference