diff --git a/apps/agent-test-cli/package.json b/apps/agent-test-cli/package.json index 720ea37..5a7dee6 100644 --- a/apps/agent-test-cli/package.json +++ b/apps/agent-test-cli/package.json @@ -7,5 +7,9 @@ "format": "oxfmt --write .", "start": "bun run src/agent-test-cli.ts", "typecheck": "bun tsc --noEmit" + }, + "dependencies": { + "@freya/agent-protocol": "workspace:*", + "@nym.sh/jrpc": "^0.1.0" } } diff --git a/apps/agent-test-cli/src/agent-test-cli.ts b/apps/agent-test-cli/src/agent-test-cli.ts index d0dd1f7..173856d 100644 --- a/apps/agent-test-cli/src/agent-test-cli.ts +++ b/apps/agent-test-cli/src/agent-test-cli.ts @@ -1,3 +1,13 @@ +import type { + AgentClientApi, + AgentEvent, + AgentServerApi, + SendMessageResult, +} from "@freya/agent-protocol" +import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc" + +import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc" + type JsonObject = Record interface AuthUser { @@ -15,10 +25,6 @@ interface AuthSession { } } -interface QueryResponse { - message: string -} - interface QueryToolDefinition { name: string label: string @@ -60,6 +66,219 @@ class CookieJar { } } +class AgentWebSocketSession implements AgentClientApi { + private readonly channel: WebSocketJrpcChannel + private readonly client: JsonRpcClient + private readonly server: JsonRpcServer + private conversationId: string | undefined + private responseHadText = false + + private constructor(channel: WebSocketJrpcChannel) { + this.channel = channel + this.client = new JsonRpcClient(channel) + this.server = new JsonRpcServer( + { + notify: this.notify.bind(this), + }, + channel, + ) + } + + static async connect(backendUrl: string, cookies: CookieJar): Promise { + const channel = new WebSocketJrpcChannel(agentWebSocketUrl(backendUrl), cookies.header()) + const session = new AgentWebSocketSession(channel) + + try { + await channel.waitUntilOpen() + void session.server.start().catch((err: unknown) => { + if (!channel.isClosed()) { + console.error(`\nWebSocket JSON-RPC failed: ${formatError(err)}\n`) + } + }) + await session.client.call("ping") + } catch (err) { + channel.close() + throw err + } + + return session + } + + async ask(message: string): Promise { + this.responseHadText = false + + const result = await this.sendMessage(message) + if (result.conversationId) { + this.conversationId = result.conversationId + } + + if (!this.responseHadText) { + console.log(`\nagent> ${result.message || "(no message)"}`) + } + console.log("") + } + + notify(event: AgentEvent): void { + switch (event.type) { + case "conversation_started": + this.conversationId = event.conversationId + break + case "message_created": + this.printMessage(event.text) + break + case "tool_started": + console.log(`\ntool> ${event.toolName} started`) + break + case "tool_finished": + console.log(`tool> ${event.toolName} ${event.ok ? "finished" : "failed"}`) + break + case "message_finished": + break + case "message_failed": + console.log(`\nagent! ${event.error}`) + break + } + } + + describeConversation(): string { + return this.conversationId ? `Conversation: ${this.conversationId}` : "No conversation yet." + } + + close(): void { + this.channel.close() + } + + private async sendMessage(message: string): Promise { + return this.client.call("sendMessage", message) + } + + private printMessage(text: string): void { + if (text === "") return + + console.log(`\nagent> ${text}`) + this.responseHadText = true + } +} + +class WebSocketJrpcChannel implements JrpcChannel { + private readonly ws: WebSocket + private readonly opened: Promise + private closed = false + private openedOnce = false + private queue: JrpcMessage[] = [] + private waiters: Array<(result: IteratorResult) => void> = [] + + constructor(url: string, cookieHeader?: string) { + this.ws = new WebSocket(url, createWebSocketOptions(cookieHeader)) + this.opened = new Promise((resolve, reject) => { + this.ws.onopen = () => { + this.openedOnce = true + resolve() + } + this.ws.onerror = () => { + if (!this.openedOnce) { + reject(new Error(`Could not connect to ${url}`)) + } + } + this.ws.onclose = (event) => { + if (!this.openedOnce) { + reject(new Error(formatWebSocketClose(url, event))) + } + this.close() + } + }) + this.ws.onmessage = (event) => { + this.receive(event.data) + } + } + + waitUntilOpen(): Promise { + return this.opened + } + + isClosed(): boolean { + return this.closed + } + + async send(msg: JsonRpcMessage): Promise { + await this.opened + if (this.closed || this.ws.readyState !== WebSocket.OPEN) { + throw new Error("JSON-RPC WebSocket channel is closed") + } + + this.ws.send(JSON.stringify(msg)) + } + + async next(): Promise> { + const msg = this.queue.shift() + if (msg) { + return { done: false, value: msg } + } + + if (this.closed) { + return { done: true, value: undefined } + } + + return new Promise((resolve) => { + this.waiters.push(resolve) + }) + } + + async return(): Promise> { + this.close() + return { done: true, value: undefined } + } + + async throw(error?: unknown): Promise> { + this.close() + throw error + } + + async [Symbol.asyncDispose](): Promise { + await this.return() + } + + close(): void { + if (this.closed) return + + this.closed = true + for (const resolve of this.waiters.splice(0)) { + resolve({ done: true, value: undefined }) + } + + if (this.ws.readyState === WebSocket.CONNECTING || this.ws.readyState === WebSocket.OPEN) { + this.ws.close() + } + } + + [Symbol.asyncIterator](): AsyncGenerator { + return this + } + + private receive(message: unknown): void { + const parsed = parseJrpcMessage(message) + if (!parsed) { + this.ws.close(1003, "Invalid JSON-RPC message") + this.close() + return + } + + this.push(parsed) + } + + private push(msg: JrpcMessage): void { + if (this.closed) return + + const resolve = this.waiters.shift() + if (resolve) { + resolve({ done: false, value: msg }) + return + } + + this.queue.push(msg) + } +} + async function main(): Promise { if (wantsHelp()) { printUsage() @@ -111,73 +330,72 @@ async function runChatLoop( cookies: CookieJar, session: AuthSession, ): Promise { + const agent = await AgentWebSocketSession.connect(backendUrl, cookies) + console.log("Connected to /api/agent/ws") printHelp() - for (;;) { - const input = askOptional("you> ")?.trim() - if (!input) continue + try { + for (;;) { + const input = askOptional("you> ")?.trim() + if (!input) continue - if (input === "/quit" || input === "/exit") { - console.log("Bye.") - return - } + if (input === "/quit" || input === "/exit") { + console.log("Bye.") + return + } - if (input === "/help") { - printHelp() - continue - } + if (input === "/help") { + printHelp() + continue + } - if (input === "/session") { - console.log(`${session.user.name || session.user.email} (${session.user.id})`) - continue - } + if (input === "/session") { + console.log(`${session.user.name || session.user.email} (${session.user.id})`) + continue + } - if (input === "/tools") { - await runCliCommand(() => listQueryTools(backendUrl, cookies)) - continue - } + if (input === "/conversation") { + console.log(agent.describeConversation()) + continue + } - if (input.startsWith("/tool ")) { - await runCliCommand(() => executeQueryTool(backendUrl, cookies, input.slice("/tool ".length))) - continue - } + if (input === "/tools") { + await runCliCommand(() => listQueryTools(backendUrl, cookies)) + continue + } - if (input.startsWith("/actions ")) { - await runCliCommand(() => - listSourceActions(backendUrl, cookies, input.slice("/actions ".length)), - ) - continue - } + if (input.startsWith("/tool ")) { + await runCliCommand(() => + executeQueryTool(backendUrl, cookies, input.slice("/tool ".length)), + ) + continue + } - if (input.startsWith("/action ")) { - await runCliCommand(() => - executeSourceAction(backendUrl, cookies, input.slice("/action ".length)), - ) - continue - } + if (input.startsWith("/actions ")) { + await runCliCommand(() => + listSourceActions(backendUrl, cookies, input.slice("/actions ".length)), + ) + continue + } - try { - await askAgent(backendUrl, cookies, input) - } catch (err) { - console.error(`\n${formatError(err)}\n`) + if (input.startsWith("/action ")) { + await runCliCommand(() => + executeSourceAction(backendUrl, cookies, input.slice("/action ".length)), + ) + continue + } + + try { + await agent.ask(input) + } catch (err) { + console.error(`\n${formatError(err)}\n`) + } } + } finally { + agent.close() } } -async function askAgent(backendUrl: string, cookies: CookieJar, message: string): Promise { - const data = await requestJson(backendUrl, cookies, "/api/agent", { - method: "POST", - body: { message }, - }) - - if (!isQueryResponse(data)) { - throw new Error("Query returned an unexpected response shape") - } - - console.log(`\nagent> ${data.message || "(no message)"}`) - console.log("") -} - async function runCliCommand(command: () => Promise): Promise { try { await command() @@ -327,7 +545,7 @@ async function requestJson( function printIntro(): void { console.log("FREYA agent test CLI") - console.log("Connect to a backend, sign in, then send test messages to /api/agent.\n") + console.log("Connect to a backend, sign in, then send test messages to /api/agent/ws.\n") } function printUsage(): void { @@ -348,6 +566,7 @@ function printHelp(): void { console.log(" /tool Execute an agent debug tool with JSON params") console.log(" /actions List source actions: /actions ") console.log(" /action Execute source action: /action ") + console.log(" /conversation Show the current websocket conversation") console.log(" /session Show the signed-in user") console.log(" /help Show commands") console.log(" /quit Exit\n") @@ -417,6 +636,33 @@ function normalizeBackendUrl(value: string): string { } } +function agentWebSocketUrl(backendUrl: string): string { + const url = new URL(backendUrl) + url.protocol = url.protocol === "https:" ? "wss:" : "ws:" + url.pathname = "/api/agent/ws" + url.search = "" + url.hash = "" + return url.toString() +} + +function createWebSocketOptions(cookieHeader?: string): Bun.WebSocketOptions | undefined { + if (!cookieHeader) return undefined + + return { + headers: { + Cookie: cookieHeader, + }, + } +} + +function formatWebSocketClose( + url: string, + event: { code: number; reason: string; wasClean: boolean }, +): string { + const reason = event.reason ? `: ${event.reason}` : "" + return `Could not connect to ${url} (${event.code}${reason})` +} + function formatPromptLabel(label: string, defaultValue?: string): string { return defaultValue ? `${label} (${defaultValue}): ` : `${label}: ` } @@ -511,6 +757,25 @@ function splitSetCookieHeader(header: string): string[] { return parts.filter(Boolean) } +function parseJrpcMessage(message: unknown): JrpcMessage | null { + const text = webSocketMessageText(message) + if (!text) return null + + try { + const value: unknown = JSON.parse(text) + return isJrpcMessage(value) ? value : null + } catch { + return null + } +} + +function webSocketMessageText(message: unknown): string | null { + if (typeof message === "string") return message + if (message instanceof Uint8Array) return Buffer.from(message).toString("utf8") + if (message instanceof ArrayBuffer) return Buffer.from(message).toString("utf8") + return null +} + async function readResponseError(response: Response, path: string): Promise { const text = await response.text() if (response.status === 404 && path === "/api/agent") { @@ -548,11 +813,6 @@ function isAuthSession(value: unknown): value is AuthSession { ) } -function isQueryResponse(value: unknown): value is QueryResponse { - if (!isJsonObject(value)) return false - return typeof value.message === "string" -} - function isQueryToolsResponse(value: unknown): value is QueryToolsResponse { if (!isJsonObject(value) || !Array.isArray(value.tools)) return false return value.tools.every(isQueryToolDefinition) @@ -585,6 +845,33 @@ function isSourceActionDefinition(value: unknown): value is { id: string; descri ) } +function isJrpcMessage(value: unknown): value is JrpcMessage { + if (!isJsonObject(value) || value.jsonrpc !== "2.0" || typeof value.id !== "number") { + return false + } + + if ("method" in value) { + return ( + typeof value.method === "string" && + (value.params === undefined || Array.isArray(value.params)) + ) + } + + if ("result" in value) { + return true + } + + if ("error" in value) { + return isJsonRpcErrorObject(value.error) + } + + return false +} + +function isJsonRpcErrorObject(value: unknown): boolean { + return isJsonObject(value) && typeof value.code === "number" && typeof value.message === "string" +} + function isJsonObject(value: unknown): value is JsonObject { return typeof value === "object" && value !== null && !Array.isArray(value) } diff --git a/bun.lock b/bun.lock index 6ea5778..e59f8ca 100644 --- a/bun.lock +++ b/bun.lock @@ -49,6 +49,10 @@ "apps/agent-test-cli": { "name": "@freya/agent-test-cli", "version": "0.0.0", + "dependencies": { + "@freya/agent-protocol": "workspace:*", + "@nym.sh/jrpc": "^0.1.0", + }, }, "apps/freya-backend": { "name": "@freya/backend",