import express, { Request, Response } from 'express';
import cors from 'cors';
import { OpenAI } from 'openai';
import { EventType, RunAgentInput } from '@ag-ui/core';
import { v4 as uuidv4 } from 'uuid';
// Express application and the port it will listen on: the PORT environment
// variable when it is set to a non-empty value, otherwise 8000.
const app = express();
const port = process.env.PORT ? process.env.PORT : 8000;

// Global middleware, applied in order: CORS handling, then JSON body parsing.
app.use(cors()).use(express.json());

// OpenAI client, authenticated with the OPENAI_API_KEY environment variable.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
/**
 * Serializes AG-UI events into Server-Sent Events (SSE) frames.
 */
class EventEncoder {
  /**
   * Wraps the JSON form of `event` in a single SSE `data:` frame.
   *
   * @param event - Value to serialize with `JSON.stringify`.
   * @returns The text `data: <json>` terminated by a blank line.
   */
  encode(event: any): string {
    const payload = JSON.stringify(event);
    return 'data: ' + payload + '\n\n';
  }

  /**
   * @returns The MIME type to use as the `Content-Type` of the SSE response.
   */
  getContentType(): string {
    const sseMimeType = 'text/event-stream';
    return sseMimeType;
  }
}
// AG-UI Agent Endpoint
/**
 * POST /agent — runs one agent turn and streams the result as AG-UI events
 * over Server-Sent Events.
 *
 * The request body is treated as a `RunAgentInput`. Its messages and tools are
 * converted to the OpenAI chat-completions format, the completion is requested
 * with `stream: true`, and every chunk is translated into AG-UI events:
 *
 *   RUN_STARTED
 *   TEXT_MESSAGE_START / TEXT_MESSAGE_CONTENT* / TEXT_MESSAGE_END   (if any text)
 *   TOOL_CALL_START / TOOL_CALL_ARGS* / TOOL_CALL_END               (per tool call)
 *   RUN_FINISHED
 *
 * Any failure after the headers are set is reported in-stream as RUN_ERROR,
 * because the HTTP status can no longer be changed at that point.
 */
app.post('/agent', async (req: Request, res: Response) => {
  const input: RunAgentInput = req.body;
  const encoder = new EventEncoder();

  // Set up SSE headers
  res.setHeader('Content-Type', encoder.getContentType());
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no'); // Disable buffering for nginx

  // Stop the upstream OpenAI request when the client goes away, so we do not
  // keep generating (and paying for) tokens nobody will read. 'close' also
  // fires after a normal res.end(); aborting a finished request is a no-op.
  let clientGone = false;
  const upstreamAbort = new AbortController();
  res.on('close', () => {
    clientGone = true;
    upstreamAbort.abort();
  });

  // Writes one AG-UI event as an SSE frame; skipped once the client has left.
  const send = (event: Record<string, unknown>): void => {
    if (!clientGone) {
      res.write(encoder.encode(event));
    }
  };

  try {
    // Fail with a readable RUN_ERROR instead of an opaque TypeError when the
    // body is missing or is not a RunAgentInput.
    if (!Array.isArray(input?.messages)) {
      throw new Error('Request body must be a RunAgentInput with a "messages" array');
    }

    // Emit RUN_STARTED
    send({
      type: EventType.RUN_STARTED,
      threadId: input.threadId,
      runId: input.runId,
    });

    // Convert AG-UI messages to OpenAI format.
    // NOTE: the role cast only narrows the static type; at runtime every role
    // value (including 'tool') is forwarded unchanged.
    const openaiMessages = input.messages.map((msg) => ({
      role: msg.role as 'user' | 'assistant' | 'system',
      content: msg.content || '',
      ...(msg.role === 'assistant' && msg.toolCalls ? {
        tool_calls: msg.toolCalls
      } : {}),
      ...(msg.role === 'tool' ? {
        tool_call_id: msg.toolCallId
      } : {})
    }));

    // Convert AG-UI tools to OpenAI format
    const openaiTools = input.tools?.map((tool) => ({
      type: 'function' as const,
      function: {
        name: tool.name,
        description: tool.description,
        parameters: tool.parameters,
      },
    })) || [];

    // Call OpenAI with streaming; the abort signal ties the upstream request
    // to the lifetime of this HTTP response.
    const stream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: openaiMessages,
      tools: openaiTools.length > 0 ? openaiTools : undefined,
      stream: true,
    }, { signal: upstreamAbort.signal });

    const messageId = uuidv4();
    let hasStartedMessage = false;

    // OpenAI streams each tool call as several deltas. Only the first delta of
    // a call carries its `id` (and name); later argument fragments are
    // identified by `index` alone. Remember the id per index so every
    // TOOL_CALL_ARGS event references the right call. Insertion order is the
    // order in which the calls were started.
    const toolCallIdsByIndex = new Map<number, string>();

    // Stream the response
    for await (const chunk of stream) {
      if (clientGone) {
        break;
      }
      const delta = chunk.choices[0]?.delta;

      // Handle text content
      if (delta?.content) {
        if (!hasStartedMessage) {
          send({
            type: EventType.TEXT_MESSAGE_START,
            messageId,
            role: 'assistant',
          });
          hasStartedMessage = true;
        }
        send({
          type: EventType.TEXT_MESSAGE_CONTENT,
          messageId,
          delta: delta.content,
        });
      }

      // Handle tool calls
      if (delta?.tool_calls) {
        for (const toolCall of delta.tool_calls) {
          let toolCallId = toolCallIdsByIndex.get(toolCall.index);
          if (toolCallId === undefined) {
            // First fragment of this call: open it exactly once. Fall back to
            // a generated id if the provider did not supply one.
            toolCallId = toolCall.id ?? uuidv4();
            toolCallIdsByIndex.set(toolCall.index, toolCallId);
            send({
              type: EventType.TOOL_CALL_START,
              toolCallId,
              toolCallName: toolCall.function?.name ?? '',
              parentMessageId: messageId,
            });
          }
          if (toolCall.function?.arguments) {
            send({
              type: EventType.TOOL_CALL_ARGS,
              toolCallId,
              delta: toolCall.function.arguments,
            });
          }
        }
      }
    }

    // End message if it was started
    if (hasStartedMessage) {
      send({
        type: EventType.TEXT_MESSAGE_END,
        messageId,
      });
    }

    // Close every tool call that was opened, in the order they were started.
    for (const toolCallId of toolCallIdsByIndex.values()) {
      send({
        type: EventType.TOOL_CALL_END,
        toolCallId,
      });
    }

    // Emit RUN_FINISHED
    send({
      type: EventType.RUN_FINISHED,
      threadId: input.threadId,
      runId: input.runId,
    });
    res.end();
  } catch (error: unknown) {
    // A disconnect-triggered abort is expected; only log real failures.
    if (!clientGone) {
      console.error('AG-UI agent run failed:', error);
    }
    // Emit RUN_ERROR
    const message =
      error instanceof Error && error.message ? error.message : 'An error occurred';
    send({
      type: EventType.RUN_ERROR,
      message,
    });
    res.end();
  }
});
// Health check endpoint: replies with a fixed "ok" status and the current
// time as an ISO-8601 string.
app.get('/health', (_req: Request, res: Response) => {
  const timestamp = new Date().toISOString();
  res.json({ status: 'ok', timestamp });
});
// Start the HTTP server and, once it is listening, print where to reach it.
app.listen(port, () => {
  const startupLines = [
    `🚀 AG-UI Agent server running on port ${port}`,
    `📍 Agent endpoint: http://localhost:${port}/agent`,
    `💚 Health check: http://localhost:${port}/health`,
  ];
  for (const line of startupLines) {
    console.log(line);
  }
});