AgentFlow: Event-Driven Interface Implementation
This document explains how AgentFlow implements the general event system architecture in a web-based interface, providing a concrete example of the event-driven patterns described in the foundational concepts. AgentFlow is the embedded web interface for gomcptest’s OpenAI-compatible server.
What is AgentFlow?
AgentFlow is a specific implementation of the gomcptest event system in the form of a modern web-based chat interface. It demonstrates how the general event-driven architecture can be applied to create transparent, real-time agentic interactions through a browser-based UI.
Core Architecture Overview
ChatEngine Interface Design
The foundation of AgentFlow's functionality rests on the `ChatServer` interface defined in `chatengine/chat_server.go`:
```go
type ChatServer interface {
    AddMCPTool(client.MCPClient) error
    ModelList(context.Context) ListModelsResponse
    ModelDetail(ctx context.Context, modelID string) *Model
    ListTools(ctx context.Context) []ListToolResponse
    HandleCompletionRequest(context.Context, ChatCompletionRequest) (ChatCompletionResponse, error)
    SendStreamingChatRequest(context.Context, ChatCompletionRequest) (<-chan StreamEvent, error)
}
```
This interface abstracts the underlying LLM provider (currently Vertex AI Gemini) and provides a consistent API for tool integration and streaming responses. The key innovation is the `SendStreamingChatRequest` method, which returns a channel of `StreamEvent` values, enabling real-time event streaming.
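To make the contract concrete, here is a minimal sketch of how a caller might consume that channel. The `server`, `ctx`, and `req` values (and the `fmt`/`log` imports) are assumptions for illustration; only the method signature and the `StreamEvent` channel come from the interface above.

```go
// Hypothetical consumer; server is any value implementing ChatServer.
events, err := server.SendStreamingChatRequest(ctx, req)
if err != nil {
    log.Fatalf("streaming request failed: %v", err)
}
for ev := range events {
    switch e := ev.(type) {
    case ChatCompletionStreamResponse:
        // Standard OpenAI-style completion chunk.
        _ = e
    default:
        // Extended events, e.g. tool calls and tool responses.
        fmt.Printf("extended event: %T\n", ev)
    }
}
```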
OpenAI v1 API Compatibility Strategy
A fundamental design decision was to maintain full compatibility with the OpenAI v1 API while extending it with enhanced functionality. This is achieved through:
- Standard Endpoint Preservation: Uses the `/v1/chat/completions`, `/v1/models`, and `/v1/tools` endpoints
- Parameter Encoding: Tool selection is encoded within the existing model parameter using a pipe-delimited format
- Event Extension: Additional events are streamed alongside standard chat completion responses
- Backward Compatibility: Existing OpenAI-compatible clients work unchanged
This approach avoids the need to modify standard API endpoints while providing enhanced capabilities through the AgentFlow interface.
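As an illustration, a streaming request with tool filtering differs from a plain OpenAI request only in the model string. The host and port below are assumptions; the endpoint and payload shape follow the standard OpenAI chat completions API.

```go
package main

import (
    "log"
    "net/http"
    "strings"
)

func main() {
    // Only the pipe-delimited model string is AgentFlow-specific;
    // the rest is a standard OpenAI-style request body.
    body := `{
        "model": "gemini-2.0-flash|Edit|View",
        "messages": [{"role": "user", "content": "Show me the README"}],
        "stream": true
    }`
    resp, err := http.Post("http://localhost:8080/v1/chat/completions",
        "application/json", strings.NewReader(body))
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    // The response body is a text/event-stream of SSE frames.
}
```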
Event System Architecture
StreamEvent Interface
The event system is built around the `StreamEvent` interface in `chatengine/stream_event.go`:
```go
type StreamEvent interface {
    IsStreamEvent() bool
}
```
This simple interface allows for polymorphic event handling, where different event types can be processed through the same streaming pipeline.
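Because `StreamEvent` is a pure marker interface, adding a new event type is trivial. The `HeartbeatEvent` type below is a hypothetical example, not part of the codebase:

```go
// Hypothetical event type: satisfying the marker interface is all that
// is needed to flow through the same streaming pipeline.
type HeartbeatEvent struct {
    Created int64 `json:"created"`
}

func (HeartbeatEvent) IsStreamEvent() bool { return true }
```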
Event Types and Structure
Tool Call Events
Defined in `chatengine/vertexai/gemini/tool_events.go`, tool call events capture when the AI decides to use a tool:
```go
type ToolCallEvent struct {
    ID        string          `json:"id"`
    Object    string          `json:"object"`
    Created   int64           `json:"created"`
    EventType string          `json:"event_type"`
    ToolCall  ToolCallDetails `json:"tool_call"`
}

type ToolCallDetails struct {
    ID        string                 `json:"id"`
    Name      string                 `json:"name"`
    Arguments map[string]interface{} `json:"arguments"`
}
```
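On the wire, such an event serializes to a single SSE data frame. The field values below are invented for illustration (in particular the `Object` string is an assumption); only the structure comes from the types above, and the `tool_call` event type matches what the frontend dispatches on.

```go
// Hypothetical instance; assumes "encoding/json" is imported.
ev := ToolCallEvent{
    ID:        "evt_001",
    Object:    "chat.tool.event", // assumed value
    Created:   1700000000,
    EventType: "tool_call",
    ToolCall: ToolCallDetails{
        ID:        "call_abc",
        Name:      "Bash",
        Arguments: map[string]interface{}{"command": "ls -la"},
    },
}
data, _ := json.Marshal(ev)
// data: {"id":"evt_001","object":"chat.tool.event","created":1700000000,
//        "event_type":"tool_call","tool_call":{"id":"call_abc",
//        "name":"Bash","arguments":{"command":"ls -la"}}}
```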
Tool Response Events
Tool response events capture the results of tool execution:
```go
type ToolResponseEvent struct {
    ID           string              `json:"id"`
    Object       string              `json:"object"`
    Created      int64               `json:"created"`
    EventType    string              `json:"event_type"`
    ToolResponse ToolResponseDetails `json:"tool_response"`
}

type ToolResponseDetails struct {
    ID       string      `json:"id"`
    Name     string      `json:"name"`
    Response interface{} `json:"response"`
    Error    string      `json:"error,omitempty"`
}
```
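Note the `omitempty` on `Error`: the field appears in the JSON only when a tool fails, which is what the frontend keys on to style error popups. A hypothetical failure event might look like this (all values invented for illustration):

```go
// Failure case: Response is left nil and Error is populated, so the
// serialized event carries an "error" field the UI can display.
ev := ToolResponseEvent{
    ID:        "evt_002",
    EventType: "tool_response",
    ToolResponse: ToolResponseDetails{
        ID:    "call_abc",
        Name:  "Bash",
        Error: "exit status 127: command not found",
    },
}
```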
Server-Sent Events Implementation
The streaming implementation in `chatengine/chat_completion_stream.go` provides the SSE infrastructure:
```go
func (o *OpenAIV1WithToolHandler) streamResponse(w http.ResponseWriter, r *http.Request, req ChatCompletionRequest) {
    // Standard SSE headers.
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Transfer-Encoding", "chunked")

    // `stream` is the <-chan StreamEvent obtained from
    // SendStreamingChatRequest (setup elided in this excerpt).
    for event := range stream {
        switch res := event.(type) {
        case ChatCompletionStreamResponse:
            // Handle standard chat completion chunks.
            _ = res // chunk forwarding elided in this excerpt
        default:
            // Forward tool events only when the withAllEvents flag is set.
            if o.withAllEvents {
                jsonBytes, err := json.Marshal(event)
                if err != nil {
                    continue // skip events that fail to serialize
                }
                w.Write([]byte("data: " + string(jsonBytes) + "\n\n"))
            }
        }
    }
}
```
The `withAllEvents` flag controls whether tool events are included in the stream, allowing for backward compatibility with standard OpenAI clients.
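One practical detail the excerpt above elides is flushing: without it, Go's `http.ResponseWriter` may buffer output and the browser would not see events in real time. A minimal sketch, assuming the handler's writer supports `http.Flusher`:

```go
// Flush after each SSE frame so events reach the client immediately.
if flusher, ok := w.(http.Flusher); ok {
    fmt.Fprintf(w, "data: %s\n\n", jsonBytes)
    flusher.Flush()
}
```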
Model and Tool Selection Mechanism
Pipe-Delimited Encoding
The tool selection mechanism is implemented through a clever encoding scheme in the model parameter. The `ParseModelAndTools` method in `chatengine/chat_structure.go` parses this format:
```go
func (req *ChatCompletionRequest) ParseModelAndTools() (string, []string) {
    parts := strings.Split(req.Model, "|")
    if len(parts) <= 1 {
        return req.Model, nil
    }
    modelName := strings.TrimSpace(parts[0])
    toolNames := make([]string, 0, len(parts)-1)
    for i := 1; i < len(parts); i++ {
        toolName := strings.TrimSpace(parts[i])
        if toolName != "" {
            toolNames = append(toolNames, toolName)
        }
    }
    return modelName, toolNames
}
```
This allows formats like:
- `gemini-2.0-flash` (no tool filtering)
- `gemini-2.0-flash|Edit|View|Bash` (specific tools only)
- `gemini-1.5-pro|VertexAI Code Execution` (model with built-in tools)
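A quick usage sketch, following directly from the parsing logic above:

```go
req := ChatCompletionRequest{Model: "gemini-2.0-flash|Edit|View|Bash"}
model, tools := req.ParseModelAndTools()
// model == "gemini-2.0-flash"
// tools == []string{"Edit", "View", "Bash"}
```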
Tool Filtering Implementation
The Vertex AI Gemini implementation includes sophisticated tool filtering in `chatengine/vertexai/gemini/chatsession.go`:
```go
func (chatsession *ChatSession) FilterTools(requestedToolNames []string) []*genai.Tool {
    if len(requestedToolNames) == 0 {
        return chatsession.tools // Return all tools if none specified
    }

    // Build a lookup set of the requested tool names.
    requestedMap := make(map[string]bool, len(requestedToolNames))
    for _, name := range requestedToolNames {
        requestedMap[name] = true
    }

    var filteredTools []*genai.Tool
    var filteredFunctions []*genai.FunctionDeclaration
    for _, tool := range chatsession.tools {
        // Handle Vertex AI built-in tools separately
        switch {
        case tool.CodeExecution != nil && requestedMap[VERTEXAI_CODE_EXECUTION]:
            filteredTools = append(filteredTools, &genai.Tool{CodeExecution: tool.CodeExecution})
        case tool.GoogleSearch != nil && requestedMap[VERTEXAI_GOOGLE_SEARCH]:
            filteredTools = append(filteredTools, &genai.Tool{GoogleSearch: tool.GoogleSearch})
        // ... handle other built-in tools
        default:
            // Handle MCP function declarations
            for _, function := range tool.FunctionDeclarations {
                if requestedMap[function.Name] {
                    filteredFunctions = append(filteredFunctions, function)
                }
            }
        }
    }
    // Combine function declarations into a single tool
    if len(filteredFunctions) > 0 {
        filteredTools = append(filteredTools, &genai.Tool{
            FunctionDeclarations: filteredFunctions,
        })
    }
    return filteredTools
}
```
This implementation handles both Vertex AI built-in tools (CodeExecution, GoogleSearch, etc.) and MCP function declarations, ensuring they are properly separated to avoid proto validation errors.
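A hypothetical call, assuming a session whose registered tools include the Vertex AI code-execution built-in plus MCP functions named `Edit` and `Bash` (the names are illustrative):

```go
// Returns one genai.Tool wrapping the built-in plus one genai.Tool
// bundling the matching MCP function declarations.
tools := chatsession.FilterTools([]string{"Edit", "Bash", "VertexAI Code Execution"})
```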
Frontend Event Processing
Real-Time Event Handling
The JavaScript implementation in `chat-ui.html.tmpl` provides comprehensive event processing through the `handleStreamingResponse` method:
```javascript
async handleStreamingResponse(response) {
    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
        const { value, done } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value, { stream: true });
        // Note: a production parser should buffer partial lines across
        // chunks, since an SSE frame can be split between reads.
        const lines = chunk.split('\n');

        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6);
                if (data === '[DONE]') return;

                try {
                    const parsed = JSON.parse(data);
                    // Dispatch on the extended event_type field first,
                    // then fall back to standard completion chunks.
                    if (parsed.event_type === 'tool_call') {
                        this.addToolNotification(parsed.tool_call.name, parsed);
                        this.showToolCallPopup(parsed);
                    } else if (parsed.event_type === 'tool_response') {
                        this.updateToolResponsePopup(parsed);
                        this.storeToolResponse(parsed);
                    } else if (parsed.choices && parsed.choices[0]) {
                        // Standard chat completion chunk; messageIndex and
                        // assistantMessage are maintained by surrounding
                        // code elided from this excerpt.
                        this.updateMessageContent(messageIndex, assistantMessage, true);
                    }
                } catch (e) {
                    // Ignore JSON parse errors from partial frames.
                }
            }
        }
    }
}
```
Tool Popup Management
AgentFlow implements a sophisticated popup management system to provide real-time feedback on tool execution:
```javascript
showToolCallPopup(event) {
    const popupId = event.tool_call.id;

    // Create popup with loading state
    const popup = document.createElement('div');
    popup.className = 'tool-popup tool-call';
    popup.innerHTML = `
        <div class="tool-popup-header">
            <div class="tool-popup-title">Tool Executing: ${event.tool_call.name}</div>
            <button class="tool-popup-close" onclick="chatUI.closeToolPopup('${popupId}')">×</button>
        </div>
        <div class="tool-popup-content">
            <div class="tool-popup-args">${JSON.stringify(event.tool_call.arguments, null, 2)}</div>
            <div class="tool-popup-spinner"></div>
        </div>
    `;
    // (Insertion into the page's popup container is elided in this excerpt.)

    // Store reference and set auto-close timer
    this.toolPopups.set(popupId, popup);
    this.popupAutoCloseTimers.set(popupId, setTimeout(() => {
        this.closeToolPopup(popupId);
    }, 30000));
}

updateToolResponsePopup(event) {
    const popup = this.toolPopups.get(event.tool_response.id);
    if (!popup) return;

    // Update popup with response data; errors get distinct styling.
    popup.className = `tool-popup ${event.tool_response.error ? 'tool-error' : 'tool-response'}`;
    // Update content with response...

    // Auto-close after showing result
    setTimeout(() => {
        this.closeToolPopup(event.tool_response.id);
    }, 5500);
}
```
Model and Tool Parameter Encoding
The frontend `buildModelWithTools()` function implements the pipe-delimited encoding:
```javascript
buildModelWithTools() {
    let modelString = this.selectedModel;
    if (this.selectedTools.size > 0 && this.selectedTools.size < this.tools.length) {
        // Only add tools if not all are selected (all selected means use all tools)
        const toolNames = Array.from(this.selectedTools);
        modelString += '|' + toolNames.join('|');
    }
    return modelString;
}
```
This ensures tool selection is properly encoded in the API request while maintaining OpenAI compatibility.
Technical Design Benefits
Event-Driven Transparency
The event system provides detailed, real-time visibility into AI decision-making:
- Real-Time Feedback: Users see tool calls as they happen
- Detailed Information: Full argument and response data available
- Error Visibility: Tool failures are clearly communicated
- Learning Opportunity: Users understand how AI approaches problems
Scalable Architecture
The channel-based streaming architecture scales well, as the sketch after this list illustrates:
- Non-Blocking: Event processing doesn’t block the main request thread
- Backpressure Handling: Go channels provide natural backpressure
- Resource Management: Proper cleanup prevents memory leaks
- Error Isolation: Tool failures don’t crash the entire system
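A minimal sketch of the producer side, assuming a `ctx` from the incoming request and a `produced` slice standing in for the real event source; only the channel-of-`StreamEvent` shape comes from the actual code:

```go
// Buffered channel: a slow SSE consumer naturally slows the producer
// down once the buffer fills (backpressure).
events := make(chan StreamEvent, 16)
go func() {
    defer close(events) // lets the handler's range loop terminate cleanly
    for _, ev := range produced {
        select {
        case events <- ev:
        case <-ctx.Done():
            return // client disconnected; stop producing and free resources
        }
    }
}()
```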
OpenAI Compatibility
The design maintains full OpenAI v1 API compatibility:
- Standard Endpoints: No custom API modifications required
- Parameter Encoding: Tool selection uses existing model parameter
- Event Extensions: Additional events don’t interfere with standard responses
- Client Compatibility: Existing OpenAI clients work unchanged
Integration Points
MCP Protocol Integration
AgentFlow integrates seamlessly with the Model Context Protocol; a registration sketch follows this list:
- Tool Discovery: Automatic detection of MCP server capabilities
- Dynamic Loading: Tools can be added/removed without restart
- Protocol Abstraction: MCP details are hidden from the UI
- Error Handling: MCP errors are gracefully handled and displayed
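Registration goes through the `AddMCPTool` method on the `ChatServer` interface shown earlier. The `mcpClient` value below is a placeholder for however the MCP client is constructed, which this document does not cover:

```go
// Hypothetical wiring: register a configured MCP client with the server.
// mcpClient satisfies client.MCPClient; its construction is elided.
if err := server.AddMCPTool(mcpClient); err != nil {
    log.Fatalf("registering MCP tool failed: %v", err)
}
```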
Vertex AI Integration
The Vertex AI backend provides:
- Built-in Tools: Code execution, Google Search, etc.
- Model Selection: Multiple Gemini model variants
- Streaming Support: Native streaming for real-time responses
- Tool Mixing: Combines MCP tools with Vertex AI capabilities
This comprehensive architecture enables AgentFlow to provide an intuitive, powerful interface for agentic interactions while maintaining compatibility with existing OpenAI tooling and providing deep visibility into the AI’s decision-making process.