Event System Architecture

Understanding the event-driven architecture that enables real-time tool interaction monitoring and streaming responses in gomcptest.

This document explains the event system architecture in gomcptest, which enables real-time monitoring of tool interactions, streaming responses, and transparent agentic workflows. The system is implemented across several components and interfaces; AgentFlow is one specific implementation.

What is the Event System?

The event system in gomcptest provides real-time visibility into AI-tool interactions through a streaming event architecture. It captures and streams events that occur during tool execution, enabling transparency in how AI agents make decisions and use tools.

Important Implementation Detail: By default, the OpenAI-compatible server streams only standard chat completion responses, which preserves API compatibility. Tool events (tool calls and responses) are included in the stream only when the withAllEvents flag is enabled in the server configuration. This design provides two modes:

  • Standard Mode: OpenAI API compatibility with only chat completion chunks
  • Enhanced Mode: Full event visibility including tool interactions when withAllEvents is true

Core Event Concepts

Event-Driven Transparency

Traditional AI interactions are often “black boxes” where users see only the final result. The gomcptest event system provides transparency by exposing:

  1. Tool Call Events: When the AI decides to use a tool, what tool it chooses, and what parameters it passes
  2. Tool Response Events: The results returned by tools, including success responses and error conditions
  3. Processing Events: Internal state changes and decision points during request processing
  4. Stream Events: Real-time updates as responses are generated

Event Types

The system defines several core event types:

Tool Interaction Events

The following event types are based on the implementation in /host/openaiserver/chatengine/vertexai/gemini/tool_events.go:

  • ToolCallEvent: Generated when the AI decides to invoke a tool

    • Structure: event_type: "tool_call", contains tool name, ID, and arguments
    • Object type: "tool.call"
    • Generated by: NewToolCallEvent() function
  • ToolResponseEvent: Generated when a tool execution completes

    • Structure: event_type: "tool_response", contains response data or error
    • Object type: "tool.response"
    • Generated by: NewToolResponseEvent() function
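To make the structure concrete, here is a minimal sketch of what a tool call event could look like as a Go struct. The field layout is inferred from the SSE examples in this document, and the names ToolCall, ToolCallEvent, and newToolCallEvent are assumptions for illustration; the real definitions in tool_events.go may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ToolCall is a hypothetical payload shape inferred from the SSE examples
// in this document; the real struct in tool_events.go may differ.
type ToolCall struct {
	ID        string                 `json:"id"`
	Name      string                 `json:"name"`
	Arguments map[string]interface{} `json:"arguments"`
}

// ToolCallEvent mirrors the fields of the "tool_call" example event.
type ToolCallEvent struct {
	EventType string   `json:"event_type"`
	ID        string   `json:"id"`
	Object    string   `json:"object"`
	Created   int64    `json:"created"`
	ToolCall  ToolCall `json:"tool_call"`
}

// newToolCallEvent is an illustrative stand-in for NewToolCallEvent().
// created is a Unix timestamp in seconds.
func newToolCallEvent(completionID, callID, name string, args map[string]interface{}, created int64) ToolCallEvent {
	return ToolCallEvent{
		EventType: "tool_call",
		ID:        completionID,
		Object:    "tool.call",
		Created:   created,
		ToolCall:  ToolCall{ID: callID, Name: name, Arguments: args},
	}
}

func main() {
	ev := newToolCallEvent("chatcmpl-abc123", "call_xyz789", "sleep",
		map[string]interface{}{"seconds": 3}, 1704067200)
	out, err := json.Marshal(ev)
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(out))
}
```

A tool response event would follow the same pattern, with a "tool_response" payload carrying either the response data or an error.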

Chat Completion Events

  • ChatCompletionStreamResponse: Standard OpenAI-compatible streaming chunks
    • Always included in streams (default behavior)
    • Contains incremental content as it’s generated

Event Availability

  • Default Streaming: Only ChatCompletionStreamResponse events are sent
  • Enhanced Streaming: When withAllEvents = true, includes all tool events

Event Architecture Patterns

Producer-Consumer Model

The event system follows a producer-consumer pattern:

  1. Event Producers: Components that generate events (chat engines, tool executors, stream processors)
  2. Event Channels: Transport mechanisms for event delivery (Go channels, HTTP streams)
  3. Event Consumers: Components that process and present events (web interfaces, logging systems, monitors)

Channel-Based Streaming

Events are delivered through channel-based streaming:

type StreamEvent interface {
    IsStreamEvent() bool
}

// Event channel returned by streaming operations
func SendStreamingRequest() (<-chan StreamEvent, error) {
    eventChan := make(chan StreamEvent, 100)
    
    // Events are sent to the channel as they occur
    go func() {
        defer close(eventChan)
        
        // Generate and send events
        eventChan <- &ToolCallEvent{ /* fields elided */ }
        eventChan <- &ToolResponseEvent{ /* fields elided */ }
        eventChan <- &ContentEvent{ /* fields elided */ }
    }()
    
    return eventChan, nil
}
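The consumer side of this pattern can be sketched as a loop that drains the channel and dispatches on the concrete event type. The event structs below are simplified placeholders (values rather than pointers, with invented fields), and describe and consume are hypothetical helper names, so treat this as an illustration of the pattern only.

```go
package main

import "fmt"

// StreamEvent matches the interface shown above.
type StreamEvent interface {
	IsStreamEvent() bool
}

// Simplified placeholder event types; the real structs carry more fields.
type ToolCallEvent struct{ Name string }
type ToolResponseEvent struct{ Name, Response string }
type ContentEvent struct{ Text string }

func (ToolCallEvent) IsStreamEvent() bool     { return true }
func (ToolResponseEvent) IsStreamEvent() bool { return true }
func (ContentEvent) IsStreamEvent() bool      { return true }

// describe renders one event as a single display line.
func describe(ev StreamEvent) string {
	switch e := ev.(type) {
	case ToolCallEvent:
		return "tool call: " + e.Name
	case ToolResponseEvent:
		return "tool response from " + e.Name + ": " + e.Response
	case ContentEvent:
		return "content: " + e.Text
	default:
		return "unknown event"
	}
}

// consume drains the channel until the producer closes it.
func consume(events <-chan StreamEvent) []string {
	var lines []string
	for ev := range events {
		lines = append(lines, describe(ev))
	}
	return lines
}

func main() {
	events := make(chan StreamEvent, 3)
	events <- ToolCallEvent{Name: "sleep"}
	events <- ToolResponseEvent{Name: "sleep", Response: "Slept for 3 seconds"}
	events <- ContentEvent{Text: "Done."}
	close(events)

	for _, line := range consume(events) {
		fmt.Println(line)
	}
}
```

Because the producer closes the channel when it finishes, the range loop ends on its own and the consumer needs no separate completion signal.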

Event Metadata

Each event carries standardized metadata:

  • Timestamp: When the event occurred
  • Event ID: Unique identifier for tracking
  • Event Type: Category and specific type
  • Context: Related session, request, or operation context
  • Payload: Event-specific data

Event Flow Patterns

Request-Response with Events

Traditional request-response patterns are enhanced with event streaming:

  1. Request Initiated: System generates start events
  2. Processing Events: Intermediate steps generate progress events
  3. Tool Interactions: Tool calls and responses generate events
  4. Content Generation: Streaming content generates incremental events
  5. Completion: Final response and end events

Event Correlation

Events are correlated through:

  • Session IDs: Grouping events within a single chat session
  • Request IDs: Linking events to specific API requests
  • Tool Call IDs: Connecting tool call and response events
  • Parent-Child Relationships: Hierarchical event relationships
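As an illustration of correlation by tool call ID, the sketch below pairs each tool call with its response. The Event and Exchange types and the correlate function are hypothetical and exist only for this example; they are not part of gomcptest.

```go
package main

import "fmt"

// Event is a hypothetical flattened event used only for this sketch.
type Event struct {
	Type       string // "tool_call" or "tool_response"
	ToolCallID string
	Name       string
	Payload    string
}

// Exchange pairs a tool call with its response.
type Exchange struct {
	Call     Event
	Response *Event // nil while no response has been seen
}

// correlate groups events by ToolCallID, preserving the order of the calls.
// Responses without a matching call are ignored.
func correlate(events []Event) []Exchange {
	index := make(map[string]int) // ToolCallID -> position in out
	var out []Exchange
	for _, ev := range events {
		switch ev.Type {
		case "tool_call":
			index[ev.ToolCallID] = len(out)
			out = append(out, Exchange{Call: ev})
		case "tool_response":
			if i, ok := index[ev.ToolCallID]; ok {
				resp := ev // copy so the pointer does not alias the loop variable
				out[i].Response = &resp
			}
		}
	}
	return out
}

func main() {
	events := []Event{
		{Type: "tool_call", ToolCallID: "call_1", Name: "sleep", Payload: `{"seconds":3}`},
		{Type: "tool_call", ToolCallID: "call_2", Name: "Edit"},
		{Type: "tool_response", ToolCallID: "call_1", Name: "sleep", Payload: "Slept for 3 seconds"},
	}
	for _, ex := range correlate(events) {
		if ex.Response == nil {
			fmt.Printf("%s (%s): pending\n", ex.Call.Name, ex.Call.ToolCallID)
			continue
		}
		fmt.Printf("%s (%s): %s\n", ex.Call.Name, ex.Call.ToolCallID, ex.Response.Payload)
	}
}
```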

Implementation Patterns

Server-Sent Events (SSE) Implementation

The actual implementation in /host/openaiserver/chatengine/chat_completion_stream.go delivers events via Server-Sent Events with specific formatting:

HTTP Headers Set:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked

Event Format:

data: {"event_type":"tool_call","id":"chatcmpl-abc123","object":"tool.call","created":1704067200,"tool_call":{"id":"call_xyz789","name":"sleep","arguments":{"seconds":3}}}

data: {"event_type":"tool_response","id":"chatcmpl-abc123","object":"tool.response","created":1704067201,"tool_response":{"id":"call_xyz789","name":"sleep","response":"Slept for 3 seconds"}}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1704067202,"model":"gemini-2.0-flash","choices":[{"index":0,"delta":{"content":"I have completed the 3-second pause."},"finish_reason":"stop"}]}

data: [DONE]
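A client can tell the three kinds of frames apart by inspecting the object field of each data line. The following is a minimal sketch of such a parser; the envelope type and classifyStream function are hypothetical names, and the parser handles only the single-line data frames shown above, not the full SSE grammar.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// envelope reads only the field needed to classify an event.
type envelope struct {
	Object string `json:"object"`
}

// classifyStream scans SSE text and returns the object type of each
// data frame, stopping at the [DONE] sentinel.
func classifyStream(raw string) ([]string, error) {
	var kinds []string
	sc := bufio.NewScanner(strings.NewReader(raw))
	for sc.Scan() {
		line := sc.Text()
		if !strings.HasPrefix(line, "data: ") {
			continue // blank separators and non-data lines
		}
		payload := strings.TrimPrefix(line, "data: ")
		if payload == "[DONE]" {
			break
		}
		var env envelope
		if err := json.Unmarshal([]byte(payload), &env); err != nil {
			return kinds, fmt.Errorf("bad event payload: %w", err)
		}
		kinds = append(kinds, env.Object)
	}
	return kinds, sc.Err()
}

func main() {
	stream := `data: {"event_type":"tool_call","object":"tool.call"}

data: {"event_type":"tool_response","object":"tool.response"}

data: {"object":"chat.completion.chunk"}

data: [DONE]
`
	kinds, err := classifyStream(stream)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(kinds)
}
```

A real client would wrap the HTTP response body in the scanner instead of a string, and may need to raise the scanner's buffer limit, since bufio.Scanner rejects lines longer than 64 KiB by default.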

Event Filtering Logic:

switch res := event.(type) {
case ChatCompletionStreamResponse:
    // Always sent - OpenAI compatible
    jsonBytes, _ := json.Marshal(res)
    w.Write([]byte("data: " + string(jsonBytes) + "\n\n"))
default:
    // Tool events only sent when withAllEvents = true
    if o.withAllEvents {
        jsonBytes, _ := json.Marshal(event)
        w.Write([]byte("data: " + string(jsonBytes) + "\n\n"))
    }
}
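The excerpt above discards the error returned by json.Marshal and does not show flushing. As a hedged variation, not the project's code, a frame writer that surfaces marshalling and write errors and flushes after each event might look like this. writeSSE and renderFrames are hypothetical helpers; renderFrames uses httptest.ResponseRecorder only so the example runs without a server.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// writeSSE marshals one event and writes it as a single SSE data frame,
// flushing immediately when the ResponseWriter supports it.
func writeSSE(w http.ResponseWriter, event interface{}) error {
	b, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}
	if _, err := fmt.Fprintf(w, "data: %s\n\n", b); err != nil {
		return fmt.Errorf("write event: %w", err)
	}
	if f, ok := w.(http.Flusher); ok {
		f.Flush() // push the frame to the client instead of buffering it
	}
	return nil
}

// renderFrames writes the events to an in-memory recorder and returns
// the resulting stream body. It exists only to make the sketch runnable.
func renderFrames(events ...interface{}) (string, error) {
	rec := httptest.NewRecorder()
	rec.Header().Set("Content-Type", "text/event-stream")
	rec.Header().Set("Cache-Control", "no-cache")
	for _, ev := range events {
		if err := writeSSE(rec, ev); err != nil {
			return "", err
		}
	}
	return rec.Body.String(), nil
}

func main() {
	body, err := renderFrames(
		map[string]string{"event_type": "tool_call", "object": "tool.call"},
		map[string]string{"object": "chat.completion.chunk"},
	)
	if err != nil {
		fmt.Println("stream error:", err)
		return
	}
	fmt.Print(body)
}
```

Returning the error lets the caller decide whether to stop the stream or skip the event, which matters once a client has disconnected and every subsequent write fails.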

JSON-RPC Event Extensions

For programmatic interfaces, events extend the JSON-RPC protocol:

{
  "jsonrpc": "2.0",
  "method": "event",
  "params": {
    "event_type": "tool_call",
    "event_data": {
      "id": "call_123",
      "name": "Edit", 
      "arguments": {...}
    }
  }
}
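A notification of this shape can be produced from Go with plain structs and encoding/json. The sketch below assumes the envelope shown above; the type names and the encodeEvent function are hypothetical and not taken from gomcptest.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventParams carries the event type and its payload.
type eventParams struct {
	EventType string      `json:"event_type"`
	EventData interface{} `json:"event_data"`
}

// notification is a JSON-RPC 2.0 notification: it has no "id" member,
// so the receiver does not send a reply.
type notification struct {
	JSONRPC string      `json:"jsonrpc"`
	Method  string      `json:"method"`
	Params  eventParams `json:"params"`
}

// encodeEvent wraps an event payload in the notification envelope.
func encodeEvent(eventType string, data interface{}) (string, error) {
	b, err := json.Marshal(notification{
		JSONRPC: "2.0",
		Method:  "event",
		Params:  eventParams{EventType: eventType, EventData: data},
	})
	if err != nil {
		return "", fmt.Errorf("encode %s event: %w", eventType, err)
	}
	return string(b), nil
}

func main() {
	msg, err := encodeEvent("tool_call", map[string]string{"id": "call_123", "name": "Edit"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(msg)
}
```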

Event Processing Strategies

Real-Time Processing

Events are processed as they occur:

  • Immediate Display: Critical events are shown immediately
  • Progressive Enhancement: UI updates incrementally as events arrive
  • Optimistic Updates: UI shows intended state before confirmation

Buffering and Batching

When event volume is high, consumers can trade a little latency for throughput:

  • Event Buffering: Collect multiple events before processing
  • Batch Updates: Update UI with multiple events simultaneously
  • Debouncing: Reduce update frequency for high-frequency events
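Buffering and batching can be sketched with a goroutine that groups events by size and flushes on a timer. The batch function below is a hypothetical helper, not part of gomcptest, and uses plain strings as stand-ins for events.

```go
package main

import (
	"fmt"
	"time"
)

// batch groups events from in into slices of at most maxSize and also
// flushes whatever is pending each time interval elapses. The returned
// channel is closed after in is closed and the last batch is delivered.
func batch(in <-chan string, maxSize int, interval time.Duration) <-chan []string {
	out := make(chan []string)
	go func() {
		defer close(out)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		var pending []string
		flush := func() {
			if len(pending) > 0 {
				out <- pending
				pending = nil // start a fresh slice for the next batch
			}
		}
		for {
			select {
			case ev, ok := <-in:
				if !ok {
					flush() // deliver the final partial batch
					return
				}
				pending = append(pending, ev)
				if len(pending) >= maxSize {
					flush()
				}
			case <-ticker.C:
				flush()
			}
		}
	}()
	return out
}

func main() {
	in := make(chan string, 5)
	for _, ev := range []string{"e1", "e2", "e3", "e4", "e5"} {
		in <- ev
	}
	close(in)

	for b := range batch(in, 2, 50*time.Millisecond) {
		fmt.Println(len(b), b)
	}
}
```

The size limit bounds how much work a single UI update does, while the timer bounds how long an event can wait before it is shown.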

Error Handling

Event processing should tolerate failures in individual events:

  • Graceful Degradation: Continue operation when non-critical events fail
  • Event Recovery: Attempt to recover from event processing errors
  • Fallback Modes: Alternative processing when event system fails
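Graceful degradation can be illustrated with a processing loop that records failures, including panics in a handler, and keeps going. processAll, safeCall, and errBadEvent are hypothetical names used only in this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errBadEvent is a sample error used by the demo handler.
var errBadEvent = errors.New("bad event")

// safeCall runs handle and converts a panic into an ordinary error.
func safeCall(handle func(string) error, ev string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked on %q: %v", ev, r)
		}
	}()
	return handle(ev)
}

// processAll applies handle to every event. A failing event is recorded
// and skipped, so one bad event does not stop the rest of the stream.
func processAll(events []string, handle func(string) error) (int, []error) {
	processed := 0
	var errs []error
	for _, ev := range events {
		if err := safeCall(handle, ev); err != nil {
			errs = append(errs, err)
			continue
		}
		processed++
	}
	return processed, errs
}

func main() {
	handle := func(ev string) error {
		if ev == "malformed" {
			return errBadEvent
		}
		fmt.Println("handled", ev)
		return nil
	}
	n, errs := processAll([]string{"tool_call", "malformed", "tool_response"}, handle)
	fmt.Println("processed:", n, "failed:", len(errs))
}
```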

Event System Benefits

Observability

The event system provides comprehensive observability:

  • Real-Time Monitoring: See what’s happening as it happens
  • Historical Analysis: Review past interactions and decisions
  • Performance Insights: Understand timing and bottlenecks
  • Error Tracking: Identify and diagnose issues

User Experience

Transparency improves the user experience:

  • Progress Indication: Users see incremental progress
  • Decision Transparency: Understand AI reasoning process
  • Interactive Feedback: Respond to tool executions in real-time
  • Learning Opportunity: Understand how AI approaches problems

Development and Debugging

The event stream is also useful during development:

  • Debugging Aid: Trace execution flow and identify issues
  • Testing Support: Verify expected event sequences
  • Performance Analysis: Identify optimization opportunities
  • Integration Testing: Validate event handling across components
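For testing, an expected event sequence can be checked with a small comparison helper. The sketch below compares event type names only; checkSequence is a hypothetical helper, and a real test would likely also compare IDs and payloads.

```go
package main

import "fmt"

// checkSequence compares observed event types with the expected order
// and describes the first mismatch, or returns nil when they agree.
func checkSequence(got, want []string) error {
	for i := range want {
		if i >= len(got) {
			return fmt.Errorf("missing event %d: want %q", i, want[i])
		}
		if got[i] != want[i] {
			return fmt.Errorf("event %d: got %q, want %q", i, got[i], want[i])
		}
	}
	if len(got) > len(want) {
		return fmt.Errorf("unexpected extra event %d: %q", len(want), got[len(want)])
	}
	return nil
}

func main() {
	want := []string{"tool_call", "tool_response", "chat.completion.chunk"}
	got := []string{"tool_call", "chat.completion.chunk"}
	if err := checkSequence(got, want); err != nil {
		fmt.Println("sequence check failed:", err)
		return
	}
	fmt.Println("sequence ok")
}
```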

Integration Points

Chat Engines Integration

The Gemini chat engine in /host/openaiserver/chatengine/vertexai/gemini/ integrates with the event system as follows:

Tool Call Event Generation:

// When AI decides to use a tool
toolCallEvent := NewToolCallEvent(completionID, toolCallID, toolName, args)
eventChannel <- toolCallEvent

Tool Response Event Generation:

// After tool execution completes
toolResponseEvent := NewToolResponseEvent(completionID, toolCallID, toolName, response, err)
eventChannel <- toolResponseEvent

Stream Channel Management:

func (s *ChatSession) SendStreamingChatRequest(ctx context.Context, req chatengine.ChatCompletionRequest) (<-chan chatengine.StreamEvent, error) {
    eventChannel := make(chan chatengine.StreamEvent, 100)
    
    go func() {
        defer close(eventChannel)
        // Process model responses and emit events
        for chunk := range modelStream {
            // Emit tool events when detected
            // Emit content events for streaming text
        }
    }()
    
    return eventChannel, nil
}

Tool Executors

Tools integrate by:

  • Emitting execution start events
  • Providing progress updates for long-running operations
  • Returning detailed response events
  • Generating error events with diagnostic information

User Interfaces

Interfaces integrate by:

  • Subscribing to event streams
  • Processing events in real-time
  • Updating UI based on event content
  • Providing user controls for event display

Event System Implementations

The event system is a general architecture that can be implemented in various ways:

AgentFlow Web Interface

AgentFlow implements the event system through:

  • Browser-based SSE consumption
  • Real-time popup notifications for tool calls
  • Progressive content updates
  • Interactive event display controls

CLI Interfaces

Command-line interfaces can implement the event system through:

  • Terminal-based event display
  • Progress indicators and status updates
  • Structured logging of events
  • Interactive prompts based on events

API Gateways

API gateways can implement the event system through:

  • Event forwarding to multiple consumers
  • Event filtering and transformation
  • Event persistence and replay
  • Event-based routing and load balancing
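Event forwarding to multiple consumers can be sketched as a fan-out over Go channels. The fanOut function below is a hypothetical helper with strings standing in for events; it is not an existing gomcptest API.

```go
package main

import "fmt"

// fanOut copies every event from in to each of n output channels and
// closes all outputs once in is closed. Each output has the given buffer.
func fanOut(in <-chan string, n, buffer int) []<-chan string {
	outs := make([]chan string, n)
	result := make([]<-chan string, n)
	for i := range outs {
		outs[i] = make(chan string, buffer)
		result[i] = outs[i]
	}
	go func() {
		for ev := range in {
			for _, out := range outs {
				out <- ev // blocks when this consumer's buffer is full
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return result
}

func main() {
	in := make(chan string, 3)
	in <- "tool_call"
	in <- "tool_response"
	in <- "chat.completion.chunk"
	close(in)

	// The buffer is as large as the event count, so the consumers can be
	// drained one after another without blocking the forwarder.
	for i, out := range fanOut(in, 2, 3) {
		for ev := range out {
			fmt.Printf("consumer %d received %s\n", i, ev)
		}
	}
}
```

In this simple form a slow consumer eventually stalls all the others once its buffer fills; a gateway that must isolate consumers would need per-consumer goroutines or a policy for dropping events.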

Future Event System Enhancements

Advanced Event Types

  • Reasoning Events: Capture AI’s internal reasoning process
  • Planning Events: Show multi-step planning and strategy
  • Context Events: Track context usage and management
  • Performance Events: Detailed timing and resource usage

Event Intelligence

  • Event Pattern Recognition: Identify common patterns and anomalies
  • Predictive Events: Anticipate likely next events
  • Event Summarization: Aggregate events into higher-level insights
  • Event Recommendations: Suggest optimizations based on event patterns

Enhanced Delivery

  • Event Persistence: Store and replay event histories
  • Event Filtering: Selective event delivery based on preferences
  • Event Routing: Direct events to multiple consumers
  • Event Transformation: Adapt events for different consumer types

Conclusion

The event system architecture in gomcptest provides a foundational layer for transparency, observability, and real-time interaction in agentic systems. By understanding these concepts, developers can effectively implement event-driven interfaces, create monitoring systems, and build tools that provide deep visibility into AI agent behavior.

This event system is implementation-agnostic and serves as the foundation for specific implementations like AgentFlow, while also enabling other interfaces and monitoring systems to provide similar transparency and real-time feedback capabilities.