mirror of
https://github.com/kennethnym/freya
synced 2026-06-23 18:05:11 +01:00
feat: use websocket in agent cli
This commit is contained in:
@@ -7,5 +7,9 @@
|
|||||||
"format": "oxfmt --write .",
|
"format": "oxfmt --write .",
|
||||||
"start": "bun run src/agent-test-cli.ts",
|
"start": "bun run src/agent-test-cli.ts",
|
||||||
"typecheck": "bun tsc --noEmit"
|
"typecheck": "bun tsc --noEmit"
|
||||||
|
},
|
||||||
|
"dependencies": {
|
||||||
|
"@freya/agent-protocol": "workspace:*",
|
||||||
|
"@nym.sh/jrpc": "^0.1.0"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,13 @@
|
|||||||
|
import type {
|
||||||
|
AgentClientApi,
|
||||||
|
AgentEvent,
|
||||||
|
AgentServerApi,
|
||||||
|
SendMessageResult,
|
||||||
|
} from "@freya/agent-protocol"
|
||||||
|
import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc"
|
||||||
|
|
||||||
|
import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc"
|
||||||
|
|
||||||
type JsonObject = Record<string, unknown>
|
type JsonObject = Record<string, unknown>
|
||||||
|
|
||||||
interface AuthUser {
|
interface AuthUser {
|
||||||
@@ -15,10 +25,6 @@ interface AuthSession {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interface QueryResponse {
|
|
||||||
message: string
|
|
||||||
}
|
|
||||||
|
|
||||||
interface QueryToolDefinition {
|
interface QueryToolDefinition {
|
||||||
name: string
|
name: string
|
||||||
label: string
|
label: string
|
||||||
@@ -60,6 +66,219 @@ class CookieJar {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class AgentWebSocketSession implements AgentClientApi {
|
||||||
|
private readonly channel: WebSocketJrpcChannel
|
||||||
|
private readonly client: JsonRpcClient<AgentServerApi>
|
||||||
|
private readonly server: JsonRpcServer<AgentClientApi>
|
||||||
|
private conversationId: string | undefined
|
||||||
|
private responseHadText = false
|
||||||
|
|
||||||
|
private constructor(channel: WebSocketJrpcChannel) {
|
||||||
|
this.channel = channel
|
||||||
|
this.client = new JsonRpcClient<AgentServerApi>(channel)
|
||||||
|
this.server = new JsonRpcServer<AgentClientApi>(
|
||||||
|
{
|
||||||
|
notify: this.notify.bind(this),
|
||||||
|
},
|
||||||
|
channel,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
static async connect(backendUrl: string, cookies: CookieJar): Promise<AgentWebSocketSession> {
|
||||||
|
const channel = new WebSocketJrpcChannel(agentWebSocketUrl(backendUrl), cookies.header())
|
||||||
|
const session = new AgentWebSocketSession(channel)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await channel.waitUntilOpen()
|
||||||
|
void session.server.start().catch((err: unknown) => {
|
||||||
|
if (!channel.isClosed()) {
|
||||||
|
console.error(`\nWebSocket JSON-RPC failed: ${formatError(err)}\n`)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
await session.client.call("ping")
|
||||||
|
} catch (err) {
|
||||||
|
channel.close()
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
|
||||||
|
return session
|
||||||
|
}
|
||||||
|
|
||||||
|
async ask(message: string): Promise<void> {
|
||||||
|
this.responseHadText = false
|
||||||
|
|
||||||
|
const result = await this.sendMessage(message)
|
||||||
|
if (result.conversationId) {
|
||||||
|
this.conversationId = result.conversationId
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.responseHadText) {
|
||||||
|
console.log(`\nagent> ${result.message || "(no message)"}`)
|
||||||
|
}
|
||||||
|
console.log("")
|
||||||
|
}
|
||||||
|
|
||||||
|
notify(event: AgentEvent): void {
|
||||||
|
switch (event.type) {
|
||||||
|
case "conversation_started":
|
||||||
|
this.conversationId = event.conversationId
|
||||||
|
break
|
||||||
|
case "message_created":
|
||||||
|
this.printMessage(event.text)
|
||||||
|
break
|
||||||
|
case "tool_started":
|
||||||
|
console.log(`\ntool> ${event.toolName} started`)
|
||||||
|
break
|
||||||
|
case "tool_finished":
|
||||||
|
console.log(`tool> ${event.toolName} ${event.ok ? "finished" : "failed"}`)
|
||||||
|
break
|
||||||
|
case "message_finished":
|
||||||
|
break
|
||||||
|
case "message_failed":
|
||||||
|
console.log(`\nagent! ${event.error}`)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describeConversation(): string {
|
||||||
|
return this.conversationId ? `Conversation: ${this.conversationId}` : "No conversation yet."
|
||||||
|
}
|
||||||
|
|
||||||
|
close(): void {
|
||||||
|
this.channel.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
private async sendMessage(message: string): Promise<SendMessageResult> {
|
||||||
|
return this.client.call("sendMessage", message)
|
||||||
|
}
|
||||||
|
|
||||||
|
private printMessage(text: string): void {
|
||||||
|
if (text === "") return
|
||||||
|
|
||||||
|
console.log(`\nagent> ${text}`)
|
||||||
|
this.responseHadText = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class WebSocketJrpcChannel implements JrpcChannel {
|
||||||
|
private readonly ws: WebSocket
|
||||||
|
private readonly opened: Promise<void>
|
||||||
|
private closed = false
|
||||||
|
private openedOnce = false
|
||||||
|
private queue: JrpcMessage[] = []
|
||||||
|
private waiters: Array<(result: IteratorResult<JrpcMessage, void>) => void> = []
|
||||||
|
|
||||||
|
constructor(url: string, cookieHeader?: string) {
|
||||||
|
this.ws = new WebSocket(url, createWebSocketOptions(cookieHeader))
|
||||||
|
this.opened = new Promise((resolve, reject) => {
|
||||||
|
this.ws.onopen = () => {
|
||||||
|
this.openedOnce = true
|
||||||
|
resolve()
|
||||||
|
}
|
||||||
|
this.ws.onerror = () => {
|
||||||
|
if (!this.openedOnce) {
|
||||||
|
reject(new Error(`Could not connect to ${url}`))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.ws.onclose = (event) => {
|
||||||
|
if (!this.openedOnce) {
|
||||||
|
reject(new Error(formatWebSocketClose(url, event)))
|
||||||
|
}
|
||||||
|
this.close()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
this.ws.onmessage = (event) => {
|
||||||
|
this.receive(event.data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
waitUntilOpen(): Promise<void> {
|
||||||
|
return this.opened
|
||||||
|
}
|
||||||
|
|
||||||
|
isClosed(): boolean {
|
||||||
|
return this.closed
|
||||||
|
}
|
||||||
|
|
||||||
|
async send(msg: JsonRpcMessage): Promise<void> {
|
||||||
|
await this.opened
|
||||||
|
if (this.closed || this.ws.readyState !== WebSocket.OPEN) {
|
||||||
|
throw new Error("JSON-RPC WebSocket channel is closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
this.ws.send(JSON.stringify(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
async next(): Promise<IteratorResult<JrpcMessage, void>> {
|
||||||
|
const msg = this.queue.shift()
|
||||||
|
if (msg) {
|
||||||
|
return { done: false, value: msg }
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.closed) {
|
||||||
|
return { done: true, value: undefined }
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
this.waiters.push(resolve)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async return(): Promise<IteratorResult<JrpcMessage, void>> {
|
||||||
|
this.close()
|
||||||
|
return { done: true, value: undefined }
|
||||||
|
}
|
||||||
|
|
||||||
|
async throw(error?: unknown): Promise<IteratorResult<JrpcMessage, void>> {
|
||||||
|
this.close()
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
|
||||||
|
async [Symbol.asyncDispose](): Promise<void> {
|
||||||
|
await this.return()
|
||||||
|
}
|
||||||
|
|
||||||
|
close(): void {
|
||||||
|
if (this.closed) return
|
||||||
|
|
||||||
|
this.closed = true
|
||||||
|
for (const resolve of this.waiters.splice(0)) {
|
||||||
|
resolve({ done: true, value: undefined })
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.ws.readyState === WebSocket.CONNECTING || this.ws.readyState === WebSocket.OPEN) {
|
||||||
|
this.ws.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Symbol.asyncIterator](): AsyncGenerator<JrpcMessage, void, unknown> {
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
private receive(message: unknown): void {
|
||||||
|
const parsed = parseJrpcMessage(message)
|
||||||
|
if (!parsed) {
|
||||||
|
this.ws.close(1003, "Invalid JSON-RPC message")
|
||||||
|
this.close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.push(parsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
private push(msg: JrpcMessage): void {
|
||||||
|
if (this.closed) return
|
||||||
|
|
||||||
|
const resolve = this.waiters.shift()
|
||||||
|
if (resolve) {
|
||||||
|
resolve({ done: false, value: msg })
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.queue.push(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function main(): Promise<void> {
|
async function main(): Promise<void> {
|
||||||
if (wantsHelp()) {
|
if (wantsHelp()) {
|
||||||
printUsage()
|
printUsage()
|
||||||
@@ -111,8 +330,11 @@ async function runChatLoop(
|
|||||||
cookies: CookieJar,
|
cookies: CookieJar,
|
||||||
session: AuthSession,
|
session: AuthSession,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
const agent = await AgentWebSocketSession.connect(backendUrl, cookies)
|
||||||
|
console.log("Connected to /api/agent/ws")
|
||||||
printHelp()
|
printHelp()
|
||||||
|
|
||||||
|
try {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
const input = askOptional("you> ")?.trim()
|
const input = askOptional("you> ")?.trim()
|
||||||
if (!input) continue
|
if (!input) continue
|
||||||
@@ -132,13 +354,20 @@ async function runChatLoop(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (input === "/conversation") {
|
||||||
|
console.log(agent.describeConversation())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if (input === "/tools") {
|
if (input === "/tools") {
|
||||||
await runCliCommand(() => listQueryTools(backendUrl, cookies))
|
await runCliCommand(() => listQueryTools(backendUrl, cookies))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if (input.startsWith("/tool ")) {
|
if (input.startsWith("/tool ")) {
|
||||||
await runCliCommand(() => executeQueryTool(backendUrl, cookies, input.slice("/tool ".length)))
|
await runCliCommand(() =>
|
||||||
|
executeQueryTool(backendUrl, cookies, input.slice("/tool ".length)),
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -157,25 +386,14 @@ async function runChatLoop(
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await askAgent(backendUrl, cookies, input)
|
await agent.ask(input)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(`\n${formatError(err)}\n`)
|
console.error(`\n${formatError(err)}\n`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} finally {
|
||||||
|
agent.close()
|
||||||
async function askAgent(backendUrl: string, cookies: CookieJar, message: string): Promise<void> {
|
|
||||||
const data = await requestJson(backendUrl, cookies, "/api/agent", {
|
|
||||||
method: "POST",
|
|
||||||
body: { message },
|
|
||||||
})
|
|
||||||
|
|
||||||
if (!isQueryResponse(data)) {
|
|
||||||
throw new Error("Query returned an unexpected response shape")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log(`\nagent> ${data.message || "(no message)"}`)
|
|
||||||
console.log("")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function runCliCommand(command: () => Promise<void>): Promise<void> {
|
async function runCliCommand(command: () => Promise<void>): Promise<void> {
|
||||||
@@ -327,7 +545,7 @@ async function requestJson(
|
|||||||
|
|
||||||
function printIntro(): void {
|
function printIntro(): void {
|
||||||
console.log("FREYA agent test CLI")
|
console.log("FREYA agent test CLI")
|
||||||
console.log("Connect to a backend, sign in, then send test messages to /api/agent.\n")
|
console.log("Connect to a backend, sign in, then send test messages to /api/agent/ws.\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
function printUsage(): void {
|
function printUsage(): void {
|
||||||
@@ -348,6 +566,7 @@ function printHelp(): void {
|
|||||||
console.log(" /tool Execute an agent debug tool with JSON params")
|
console.log(" /tool Execute an agent debug tool with JSON params")
|
||||||
console.log(" /actions List source actions: /actions <source-id>")
|
console.log(" /actions List source actions: /actions <source-id>")
|
||||||
console.log(" /action Execute source action: /action <source-id> <action-id> <json-params>")
|
console.log(" /action Execute source action: /action <source-id> <action-id> <json-params>")
|
||||||
|
console.log(" /conversation Show the current websocket conversation")
|
||||||
console.log(" /session Show the signed-in user")
|
console.log(" /session Show the signed-in user")
|
||||||
console.log(" /help Show commands")
|
console.log(" /help Show commands")
|
||||||
console.log(" /quit Exit\n")
|
console.log(" /quit Exit\n")
|
||||||
@@ -417,6 +636,33 @@ function normalizeBackendUrl(value: string): string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function agentWebSocketUrl(backendUrl: string): string {
|
||||||
|
const url = new URL(backendUrl)
|
||||||
|
url.protocol = url.protocol === "https:" ? "wss:" : "ws:"
|
||||||
|
url.pathname = "/api/agent/ws"
|
||||||
|
url.search = ""
|
||||||
|
url.hash = ""
|
||||||
|
return url.toString()
|
||||||
|
}
|
||||||
|
|
||||||
|
function createWebSocketOptions(cookieHeader?: string): Bun.WebSocketOptions | undefined {
|
||||||
|
if (!cookieHeader) return undefined
|
||||||
|
|
||||||
|
return {
|
||||||
|
headers: {
|
||||||
|
Cookie: cookieHeader,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function formatWebSocketClose(
|
||||||
|
url: string,
|
||||||
|
event: { code: number; reason: string; wasClean: boolean },
|
||||||
|
): string {
|
||||||
|
const reason = event.reason ? `: ${event.reason}` : ""
|
||||||
|
return `Could not connect to ${url} (${event.code}${reason})`
|
||||||
|
}
|
||||||
|
|
||||||
function formatPromptLabel(label: string, defaultValue?: string): string {
|
function formatPromptLabel(label: string, defaultValue?: string): string {
|
||||||
return defaultValue ? `${label} (${defaultValue}): ` : `${label}: `
|
return defaultValue ? `${label} (${defaultValue}): ` : `${label}: `
|
||||||
}
|
}
|
||||||
@@ -511,6 +757,25 @@ function splitSetCookieHeader(header: string): string[] {
|
|||||||
return parts.filter(Boolean)
|
return parts.filter(Boolean)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function parseJrpcMessage(message: unknown): JrpcMessage | null {
|
||||||
|
const text = webSocketMessageText(message)
|
||||||
|
if (!text) return null
|
||||||
|
|
||||||
|
try {
|
||||||
|
const value: unknown = JSON.parse(text)
|
||||||
|
return isJrpcMessage(value) ? value : null
|
||||||
|
} catch {
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function webSocketMessageText(message: unknown): string | null {
|
||||||
|
if (typeof message === "string") return message
|
||||||
|
if (message instanceof Uint8Array) return Buffer.from(message).toString("utf8")
|
||||||
|
if (message instanceof ArrayBuffer) return Buffer.from(message).toString("utf8")
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
|
||||||
async function readResponseError(response: Response, path: string): Promise<string> {
|
async function readResponseError(response: Response, path: string): Promise<string> {
|
||||||
const text = await response.text()
|
const text = await response.text()
|
||||||
if (response.status === 404 && path === "/api/agent") {
|
if (response.status === 404 && path === "/api/agent") {
|
||||||
@@ -548,11 +813,6 @@ function isAuthSession(value: unknown): value is AuthSession {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
function isQueryResponse(value: unknown): value is QueryResponse {
|
|
||||||
if (!isJsonObject(value)) return false
|
|
||||||
return typeof value.message === "string"
|
|
||||||
}
|
|
||||||
|
|
||||||
function isQueryToolsResponse(value: unknown): value is QueryToolsResponse {
|
function isQueryToolsResponse(value: unknown): value is QueryToolsResponse {
|
||||||
if (!isJsonObject(value) || !Array.isArray(value.tools)) return false
|
if (!isJsonObject(value) || !Array.isArray(value.tools)) return false
|
||||||
return value.tools.every(isQueryToolDefinition)
|
return value.tools.every(isQueryToolDefinition)
|
||||||
@@ -585,6 +845,33 @@ function isSourceActionDefinition(value: unknown): value is { id: string; descri
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function isJrpcMessage(value: unknown): value is JrpcMessage {
|
||||||
|
if (!isJsonObject(value) || value.jsonrpc !== "2.0" || typeof value.id !== "number") {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if ("method" in value) {
|
||||||
|
return (
|
||||||
|
typeof value.method === "string" &&
|
||||||
|
(value.params === undefined || Array.isArray(value.params))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ("result" in value) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if ("error" in value) {
|
||||||
|
return isJsonRpcErrorObject(value.error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
function isJsonRpcErrorObject(value: unknown): boolean {
|
||||||
|
return isJsonObject(value) && typeof value.code === "number" && typeof value.message === "string"
|
||||||
|
}
|
||||||
|
|
||||||
function isJsonObject(value: unknown): value is JsonObject {
|
function isJsonObject(value: unknown): value is JsonObject {
|
||||||
return typeof value === "object" && value !== null && !Array.isArray(value)
|
return typeof value === "object" && value !== null && !Array.isArray(value)
|
||||||
}
|
}
|
||||||
|
|||||||
4
bun.lock
4
bun.lock
@@ -49,6 +49,10 @@
|
|||||||
"apps/agent-test-cli": {
|
"apps/agent-test-cli": {
|
||||||
"name": "@freya/agent-test-cli",
|
"name": "@freya/agent-test-cli",
|
||||||
"version": "0.0.0",
|
"version": "0.0.0",
|
||||||
|
"dependencies": {
|
||||||
|
"@freya/agent-protocol": "workspace:*",
|
||||||
|
"@nym.sh/jrpc": "^0.1.0",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"apps/freya-backend": {
|
"apps/freya-backend": {
|
||||||
"name": "@freya/backend",
|
"name": "@freya/backend",
|
||||||
|
|||||||
Reference in New Issue
Block a user