Event System Architecture
This document explains the foundational event system architecture in gomcptest that enables real-time monitoring of tool interactions, streaming responses, and transparent agentic workflows. This system is implemented across different components and interfaces, with AgentFlow being one specific implementation.
What is the Event System?
The event system in gomcptest provides real-time visibility into AI-tool interactions through a streaming event architecture. It captures and streams events that occur during tool execution, enabling transparency in how AI agents make decisions and use tools.
Important Implementation Detail: By default, the OpenAI-compatible server only streams standard chat completion responses to maintain API compatibility. Tool events (tool calls and responses) are only included in the stream when the withAllEvents
flag is enabled in the server configuration. This design allows for:
- Standard Mode: OpenAI API compatibility with only chat completion chunks
- Enhanced Mode: Full event visibility including tool interactions when
withAllEvents
is true
Core Event Concepts
Event-Driven Transparency
Traditional AI interactions are often “black boxes” where users see only the final result. The gomcptest event system provides transparency by exposing:
- Tool Call Events: When the AI decides to use a tool, what tool it chooses, and what parameters it passes
- Tool Response Events: The results returned by tools, including success responses and error conditions
- Processing Events: Internal state changes and decision points during request processing
- Stream Events: Real-time updates as responses are generated
Event Types
The system defines several core event types:
Tool Interaction Events
Based on the actual implementation in /host/openaiserver/chatengine/vertexai/gemini/tool_events.go
:
ToolCallEvent: Generated when the AI decides to invoke a tool
- Structure:
event_type: "tool_call"
, contains tool name, ID, and arguments - Object type:
"tool.call"
- Generated by:
NewToolCallEvent()
function
- Structure:
ToolResponseEvent: Generated when a tool execution completes
- Structure:
event_type: "tool_response"
, contains response data or error - Object type:
"tool.response"
- Generated by:
NewToolResponseEvent()
function
- Structure:
Chat Completion Events
- ChatCompletionStreamResponse: Standard OpenAI-compatible streaming chunks
- Always included in streams (default behavior)
- Contains incremental content as it’s generated
Event Availability
- Default Streaming: Only
ChatCompletionStreamResponse
events are sent - Enhanced Streaming: When
withAllEvents = true
, includes all tool events
Event Architecture Patterns
Producer-Consumer Model
The event system follows a producer-consumer pattern:
- Event Producers: Components that generate events (chat engines, tool executors, stream processors)
- Event Channels: Transport mechanisms for event delivery (Go channels, HTTP streams)
- Event Consumers: Components that process and present events (web interfaces, logging systems, monitors)
Channel-Based Streaming
Events are delivered through channel-based streaming:
type StreamEvent interface {
IsStreamEvent() bool
}
// Event channel returned by streaming operations
func SendStreamingRequest() (<-chan StreamEvent, error) {
eventChan := make(chan StreamEvent, 100)
// Events are sent to the channel as they occur
go func() {
defer close(eventChan)
// Generate and send events
eventChan <- &ToolCallEvent{...}
eventChan <- &ToolResponseEvent{...}
eventChan <- &ContentEvent{...}
}()
return eventChan, nil
}
Event Metadata
Each event carries standardized metadata:
- Timestamp: When the event occurred
- Event ID: Unique identifier for tracking
- Event Type: Category and specific type
- Context: Related session, request, or operation context
- Payload: Event-specific data
Event Flow Patterns
Request-Response with Events
Traditional request-response patterns are enhanced with event streaming:
- Request Initiated: System generates start events
- Processing Events: Intermediate steps generate progress events
- Tool Interactions: Tool calls and responses generate events
- Content Generation: Streaming content generates incremental events
- Completion: Final response and end events
Event Correlation
Events are correlated through:
- Session IDs: Grouping events within a single chat session
- Request IDs: Linking events to specific API requests
- Tool Call IDs: Connecting tool call and response events
- Parent-Child Relationships: Hierarchical event relationships
Implementation Patterns
Server-Sent Events (SSE) Implementation
The actual implementation in /host/openaiserver/chatengine/chat_completion_stream.go
delivers events via Server-Sent Events with specific formatting:
HTTP Headers Set:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
Event Format:
data: {"event_type":"tool_call","id":"chatcmpl-abc123","object":"tool.call","created":1704067200,"tool_call":{"id":"call_xyz789","name":"sleep","arguments":{"seconds":3}}}
data: {"event_type":"tool_response","id":"chatcmpl-abc123","object":"tool.response","created":1704067201,"tool_response":{"id":"call_xyz789","name":"sleep","response":"Slept for 3 seconds"}}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1704067202,"model":"gemini-2.0-flash","choices":[{"index":0,"delta":{"content":"I have completed the 3-second pause."},"finish_reason":"stop"}]}
data: [DONE]
Event Filtering Logic:
switch res := event.(type) {
case ChatCompletionStreamResponse:
// Always sent - OpenAI compatible
jsonBytes, _ := json.Marshal(res)
w.Write([]byte("data: " + string(jsonBytes) + "\n\n"))
default:
// Tool events only sent when withAllEvents = true
if o.withAllEvents {
jsonBytes, _ := json.Marshal(event)
w.Write([]byte("data: " + string(jsonBytes) + "\n\n"))
}
}
JSON-RPC Event Extensions
For programmatic interfaces, events extend the JSON-RPC protocol:
{
"jsonrpc": "2.0",
"method": "event",
"params": {
"event_type": "tool_call",
"event_data": {
"id": "call_123",
"name": "Edit",
"arguments": {...}
}
}
}
Event Processing Strategies
Real-Time Processing
Events are processed as they occur:
- Immediate Display: Critical events are shown immediately
- Progressive Enhancement: UI updates incrementally as events arrive
- Optimistic Updates: UI shows intended state before confirmation
Buffering and Batching
For performance optimization:
- Event Buffering: Collect multiple events before processing
- Batch Updates: Update UI with multiple events simultaneously
- Debouncing: Reduce update frequency for high-frequency events
Error Handling
Robust error handling in event processing:
- Graceful Degradation: Continue operation when non-critical events fail
- Event Recovery: Attempt to recover from event processing errors
- Fallback Modes: Alternative processing when event system fails
Event System Benefits
Observability
The event system provides comprehensive observability:
- Real-Time Monitoring: See what’s happening as it happens
- Historical Analysis: Review past interactions and decisions
- Performance Insights: Understand timing and bottlenecks
- Error Tracking: Identify and diagnose issues
User Experience
Enhanced user experience through transparency:
- Progress Indication: Users see incremental progress
- Decision Transparency: Understand AI reasoning process
- Interactive Feedback: Respond to tool executions in real-time
- Learning Opportunity: Understand how AI approaches problems
Development and Debugging
Valuable for development:
- Debugging Aid: Trace execution flow and identify issues
- Testing Support: Verify expected event sequences
- Performance Analysis: Identify optimization opportunities
- Integration Testing: Validate event handling across components
Integration Points
Chat Engines Integration
The actual integration in /host/openaiserver/chatengine/vertexai/gemini/
shows specific implementation patterns:
Tool Call Event Generation:
// When AI decides to use a tool
toolCallEvent := NewToolCallEvent(completionID, toolCallID, toolName, args)
eventChannel <- toolCallEvent
Tool Response Event Generation:
// After tool execution completes
toolResponseEvent := NewToolResponseEvent(completionID, toolCallID, toolName, response, err)
eventChannel <- toolResponseEvent
Stream Channel Management:
func (s *ChatSession) SendStreamingChatRequest(ctx context.Context, req chatengine.ChatCompletionRequest) (<-chan chatengine.StreamEvent, error) {
eventChannel := make(chan chatengine.StreamEvent, 100)
go func() {
defer close(eventChannel)
// Process model responses and emit events
for chunk := range modelStream {
// Emit tool events when detected
// Emit content events for streaming text
}
}()
return eventChannel, nil
}
Tool Executors
Tools integrate by:
- Emitting execution start events
- Providing progress updates for long-running operations
- Returning detailed response events
- Generating error events with diagnostic information
User Interfaces
Interfaces integrate by:
- Subscribing to event streams
- Processing events in real-time
- Updating UI based on event content
- Providing user controls for event display
Event System Implementations
The event system is a general architecture that can be implemented in various ways:
AgentFlow Web Interface
AgentFlow implements the event system through:
- Browser-based SSE consumption
- Real-time popup notifications for tool calls
- Progressive content updates
- Interactive event display controls
CLI Interfaces
Command-line interfaces can implement through:
- Terminal-based event display
- Progress indicators and status updates
- Structured logging of events
- Interactive prompts based on events
API Gateways
API gateways can implement through:
- Event forwarding to multiple consumers
- Event filtering and transformation
- Event persistence and replay
- Event-based routing and load balancing
Future Event System Enhancements
Advanced Event Types
- Reasoning Events: Capture AI’s internal reasoning process
- Planning Events: Show multi-step planning and strategy
- Context Events: Track context usage and management
- Performance Events: Detailed timing and resource usage
Event Intelligence
- Event Pattern Recognition: Identify common patterns and anomalies
- Predictive Events: Anticipate likely next events
- Event Summarization: Aggregate events into higher-level insights
- Event Recommendations: Suggest optimizations based on event patterns
Enhanced Delivery
- Event Persistence: Store and replay event histories
- Event Filtering: Selective event delivery based on preferences
- Event Routing: Direct events to multiple consumers
- Event Transformation: Adapt events for different consumer types
Conclusion
The event system architecture in gomcptest provides a foundational layer for transparency, observability, and real-time interaction in agentic systems. By understanding these concepts, developers can effectively implement event-driven interfaces, create monitoring systems, and build tools that provide deep visibility into AI agent behavior.
This event system is implementation-agnostic and serves as the foundation for specific implementations like AgentFlow, while also enabling other interfaces and monitoring systems to provide similar transparency and real-time feedback capabilities.
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.