This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Cortex Reference

Technical reference for the Cortex orchestration engine

    Cortex Reference Documentation

    Complete API and configuration reference for Cortex.

    Overview

    Cortex is an asynchronous AI orchestration engine that coordinates multi-agent workflows through event-driven architecture.

    Version: 0.1.0 (POC) Status: Production-Ready Architecture (Mock LLM)

    Core Interfaces

    StateManager Interface

    Manages conversation persistence.

    package state
    
    type StateManager interface {
        // Get retrieves conversation state for a session
        Get(sessionID string) (*ConversationState, error)
    
        // Set persists conversation state
        Set(sessionID string, state *ConversationState) error
    
        // Delete removes conversation state
        Delete(sessionID string) error
    
        // WithLock executes a function with exclusive session access
        WithLock(sessionID string, fn func(*ConversationState) error) error
    }
    

    Implementations:

    • InMemoryStateManager - POC implementation (in-memory)
    • Future: RedisStateManager, PostgresStateManager

    Usage:

    sm := state.NewInMemoryStateManager()
    
    // Get state
    state, err := sm.Get("session-123")
    
    // Update with lock
    err = sm.WithLock("session-123", func(state *ConversationState) error {
        state.Messages = append(state.Messages, newMessage)
        return nil
    })
    

    LLM Client Interface

    Abstraction for AI decision-making.

    package llm
    
    type Client interface {
        // Decide analyzes state and returns actions to take
        Decide(
            ctx context.Context,
            conversationHistory []*pb.Message,
            availableAgents []*pb.AgentCard,
            newEvent *pb.Message,
        ) (*Decision, error)
    }
    

    Decision Structure:

    type Decision struct {
        Reasoning string   // Why these actions were chosen
        Actions   []Action // Actions to execute
    }
    
    type Action struct {
        Type string // "chat.response" or "task.request"
    
        // For chat.response
        ResponseText string
    
        // For task.request
        TaskType    string
        TaskPayload map[string]interface{}
        TargetAgent string
    
        CorrelationID string
    }
    

    Implementations:

    • MockClient - Testing implementation
    • Future: VertexAIClient, OpenAIClient

    Usage:

    llmClient := llm.NewMockClient()
    
    decision, err := llmClient.Decide(
        ctx,
        conversationHistory,
        availableAgents,
        newMessage,
    )
    
    for _, action := range decision.Actions {
        // Execute action
    }
    

    MessagePublisher Interface

    Publishes messages to Event Bus.

    package cortex
    
    type MessagePublisher interface {
        PublishMessage(
            ctx context.Context,
            msg *pb.Message,
            routing *pb.AgentEventMetadata,
        ) error
    }
    

    Implementation:

    type AgentHubMessagePublisher struct {
        client *agenthub.AgentHubClient
    }
    
    func (a *AgentHubMessagePublisher) PublishMessage(
        ctx context.Context,
        msg *pb.Message,
        routing *pb.AgentEventMetadata,
    ) error {
        _, err := a.client.Client.PublishMessage(ctx, &pb.PublishMessageRequest{
            Message: msg,
            Routing: routing,
        })
        return err
    }
    

    Data Structures

    ConversationState

    type ConversationState struct {
        SessionID        string                      // Unique session identifier
        Messages         []*pb.Message               // Full conversation history
        PendingTasks     map[string]*TaskContext     // In-flight tasks
        RegisteredAgents map[string]*pb.AgentCard    // Available agents
    }
    

    Fields:

    • SessionID - Unique identifier for the conversation (e.g., “session-123”)
    • Messages - Complete message history (USER and AGENT messages)
    • PendingTasks - Tasks awaiting completion (keyed by task_id)
    • RegisteredAgents - Agents available for this session (keyed by agent_id)

    TaskContext

    type TaskContext struct {
        TaskID        string       // Unique task identifier
        TaskType      string       // Type of task (e.g., "transcription")
        RequestedAt   int64        // Unix timestamp when task was created
        OriginalInput *pb.Message  // Original user message that triggered task
        UserNotified  bool         // Whether user received acknowledgment
    }
    

    Cortex Core API

    Constructor

    func NewCortex(
        stateManager StateManager,
        llmClient llm.Client,
        messagePublisher MessagePublisher,
    ) *Cortex
    

    Parameters:

    • stateManager - State persistence implementation
    • llmClient - LLM decision engine
    • messagePublisher - Message publishing adapter

    Returns: Configured Cortex instance

    Example:

    cortex := cortex.NewCortex(
        state.NewInMemoryStateManager(),
        llm.NewMockClient(),
        &AgentHubMessagePublisher{client: agentHubClient},
    )
    

    RegisterAgent

    func (c *Cortex) RegisterAgent(agentID string, card *pb.AgentCard)
    

    Registers an agent’s capabilities with Cortex.

    Parameters:

    • agentID - Unique agent identifier
    • card - Agent capability card

    Example:

    cortex.RegisterAgent("transcriber-1", &pb.AgentCard{
        Name: "Audio Transcriber",
        Description: "Transcribes audio files to text",
        Skills: []*pb.AgentSkill{
            {
                Id: "transcribe",
                Name: "Transcription",
                Description: "Converts speech to text",
            },
        },
    })
    

    GetAvailableAgents

    func (c *Cortex) GetAvailableAgents() []*pb.AgentCard
    

    Returns all registered agents.

    Returns: Slice of agent cards

    Example:

    agents := cortex.GetAvailableAgents()
    for _, agent := range agents {
        fmt.Printf("Agent: %s - %s\n", agent.Name, agent.Description)
    }
    

    HandleMessage

    func (c *Cortex) HandleMessage(ctx context.Context, msg *pb.Message) error
    

    Main entry point for processing messages.

    Parameters:

    • ctx - Context for cancellation and tracing
    • msg - A2A protocol message

    Returns: Error if processing failed

    Message Types Handled:

    1. Chat requests (role=USER) → Cortex decides response
    2. Task results (role=AGENT, task_id set) → Cortex synthesizes result

    Example:

    message := &pb.Message{
        MessageId: "msg-123",
        ContextId: "session-456",
        Role: pb.Role_ROLE_USER,
        Content: []*pb.Part{
            {Part: &pb.Part_Text{Text: "Hello"}},
        },
    }
    
    err := cortex.HandleMessage(ctx, message)
    

    Configuration

    Environment Variables

    # LLM Configuration
    CORTEX_LLM_MODEL=vertex-ai://gemini-2.0-flash  # LLM model to use
    
    # AgentHub Connection
    AGENTHUB_GRPC_PORT=127.0.0.1:50051            # Broker gRPC address
    AGENTHUB_BROKER_ADDR=127.0.0.1                # Broker host
    
    # Health Check
    CORTEX_HEALTH_PORT=8086                       # Health check HTTP port
    
    # Logging
    LOG_LEVEL=info                                # Log level (debug, info, warn, error)
    

    Programmatic Configuration

    // Create state manager
    stateManager := state.NewInMemoryStateManager()
    // Or for production:
    // stateManager := redis.NewRedisStateManager(redisClient)
    
    // Create LLM client
    llmClient := llm.NewMockClient()
    // Or for production:
    // llmClient := vertexai.NewClient(os.Getenv("CORTEX_LLM_MODEL"))
    
    // Create message publisher
    agentHubClient, _ := agenthub.NewAgentHubClient(config)
    messagePublisher := &AgentHubMessagePublisher{client: agentHubClient}
    
    // Create Cortex
    cortex := cortex.NewCortex(stateManager, llmClient, messagePublisher)
    

    Message Correlation

    Session Management

    Each conversation has a unique context_id (session ID).

    Session Lifecycle:

    1. CLI creates session: cli_session_<timestamp>
    2. All messages in conversation share this context_id
    3. Cortex maintains state per context_id
    4. State persists across restarts (if using persistent storage)

    Task Correlation

    Tasks are correlated via task_id.

    Task Lifecycle:

    1. User message triggers task
    2. Cortex creates task_id: task_<timestamp>
    3. Cortex adds to PendingTasks map
    4. Agent receives task (with task_id)
    5. Agent publishes result (same task_id)
    6. Cortex matches result to pending task
    7. Cortex removes from PendingTasks

    Error Handling

    State Errors

    type StateError struct {
        Op  string // Operation that failed
        Err string // Error message
    }
    

    Common Errors:

    • Empty session ID
    • Nil state
    • Lock timeout

    Handling:

    state, err := sm.Get(sessionID)
    if err != nil {
        if stateErr, ok := err.(*state.StateError); ok {
            log.Printf("State operation %s failed: %s", stateErr.Op, stateErr.Err)
        }
    }
    

    LLM Errors

    Handling:

    decision, err := llmClient.Decide(ctx, history, agents, event)
    if err != nil {
        // Fallback: send generic error response to user
        cortex.publishErrorResponse(ctx, sessionID, err)
        return err
    }
    

    Message Processing Errors

    Errors during HandleMessage are logged but don’t crash Cortex:

    err := cortex.HandleMessage(ctx, msg)
    if err != nil {
        logger.ErrorContext(ctx, "Failed to handle message",
            "error", err,
            "message_id", msg.GetMessageId(),
        )
        // Cortex continues processing other messages
    }
    

    Performance

    Concurrency

    • Thread-Safe: All operations use proper locking
    • Per-Session Locks: No global bottleneck
    • Tested: 100 concurrent goroutines, zero lost updates

    Complexity

    OperationTime ComplexityNotes
    Get StateO(1)Map lookup
    Set StateO(1)Map insert
    WithLockO(1) + fnPer-session lock
    HandleMessageO(n)n = message history for LLM

    Scalability

    Vertical Scaling:

    • In-memory state limited by RAM
    • Recommendation: ~10,000 active sessions per instance

    Horizontal Scaling (Future):

    • Partition sessions by context_id hash
    • Multiple Cortex instances
    • Shared persistent state (Redis Cluster)

    Testing

    Unit Tests

    Run all tests:

    go test -v ./agents/cortex/...
    

    Coverage:

    • State manager: 5 tests
    • LLM client: 4 tests
    • Cortex core: 4 tests

    Integration Testing

    Use demo script:

    ./demo_cortex.sh
    

    Mock LLM

    For testing custom decision logic:

    mockLLM := llm.NewMockClientWithFunc(
        func(ctx context.Context, history []*pb.Message, agents []*pb.AgentCard, event *pb.Message) (*llm.Decision, error) {
            // Custom logic
            return &llm.Decision{
                Actions: []llm.Action{
                    {Type: "chat.response", ResponseText: "Test response"},
                },
            }, nil
        },
    )
    
    cortex := cortex.NewCortex(stateManager, mockLLM, publisher)
    

    Migration Guide

    From Mock LLM to Real LLM

    1. Implement llm.Client interface:
    type VertexAIClient struct {
        client *genai.Client
        model  string
    }
    
    func (v *VertexAIClient) Decide(...) (*Decision, error) {
        // Build prompt
        prompt := buildPrompt(conversationHistory, availableAgents, newEvent)
    
        // Call Vertex AI
        response, err := v.client.Generate(ctx, prompt)
        if err != nil {
            return nil, err
        }
    
        // Parse response into Decision
        return parseDecision(response)
    }
    
    1. Update configuration:
    llmClient := vertexai.NewClient(os.Getenv("CORTEX_LLM_MODEL"))
    cortex := cortex.NewCortex(stateManager, llmClient, publisher)
    

    From In-Memory to Persistent State

    1. Implement state.StateManager interface:
    type RedisStateManager struct {
        client *redis.Client
    }
    
    func (r *RedisStateManager) Get(sessionID string) (*ConversationState, error) {
        data, err := r.client.Get(ctx, sessionID).Bytes()
        // Deserialize and return
    }
    
    func (r *RedisStateManager) Set(sessionID string, state *ConversationState) error {
        data, _ := json.Marshal(state)
        return r.client.Set(ctx, sessionID, data, 24*time.Hour).Err()
    }
    
    1. Update configuration:
    redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    stateManager := redis.NewRedisStateManager(redisClient)
    cortex := cortex.NewCortex(stateManager, llmClient, publisher)
    

    Resources