From 430a095f2a57ff5474f72ed9189fb974595cb161 Mon Sep 17 00:00:00 2001 From: Kenneth Date: Fri, 3 Jul 2026 01:02:03 +0100 Subject: [PATCH] feat: sendMessage rpc returns created entry --- apps/agent-test-cli/src/agent-test-cli.ts | 17 +--- .../conversation-recording-query-agent.ts | 5 +- apps/freya-backend/src/agent/scheduler.ts | 60 ++++--------- apps/freya-backend/src/agent/service.ts | 37 +++++++- apps/freya-backend/src/agent/ws.ts | 10 +-- .../src/conversations/db-storage.ts | 13 ++- apps/freya-backend/src/conversations/http.ts | 90 +------------------ .../src/conversations/serialization.ts | 90 +++++++++++++++++++ .../src/conversations/storage.ts | 3 +- bun.lock | 3 + packages/freya-agent-protocol/package.json | 3 + .../freya-agent-protocol/src/index.test.ts | 20 ++++- packages/freya-agent-protocol/src/index.ts | 4 +- 13 files changed, 196 insertions(+), 159 deletions(-) create mode 100644 apps/freya-backend/src/conversations/serialization.ts diff --git a/apps/agent-test-cli/src/agent-test-cli.ts b/apps/agent-test-cli/src/agent-test-cli.ts index 173856d..1ce121c 100644 --- a/apps/agent-test-cli/src/agent-test-cli.ts +++ b/apps/agent-test-cli/src/agent-test-cli.ts @@ -2,13 +2,13 @@ import type { AgentClientApi, AgentEvent, AgentServerApi, - SendMessageResult, } from "@freya/agent-protocol" import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc" import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc" type JsonObject = Record +type SendMessageResult = Awaited> interface AuthUser { id: string @@ -71,7 +71,6 @@ class AgentWebSocketSession implements AgentClientApi { private readonly client: JsonRpcClient private readonly server: JsonRpcServer private conversationId: string | undefined - private responseHadText = false private constructor(channel: WebSocketJrpcChannel) { this.channel = channel @@ -105,16 +104,9 @@ class AgentWebSocketSession implements AgentClientApi { } async ask(message: string): Promise { - this.responseHadText = false - - const result = await this.sendMessage(message) - if (result.conversationId) { - this.conversationId = result.conversationId - } - - if (!this.responseHadText) { - console.log(`\nagent> ${result.message || "(no message)"}`) - } + const entry = await this.sendMessage(message) + this.conversationId = entry.conversationId + console.log(`\nqueued> ${entry.kind} ${entry.id}`) console.log("") } @@ -156,7 +148,6 @@ class AgentWebSocketSession implements AgentClientApi { if (text === "") return console.log(`\nagent> ${text}`) - this.responseHadText = true } } diff --git a/apps/freya-backend/src/agent/conversation-recording-query-agent.ts b/apps/freya-backend/src/agent/conversation-recording-query-agent.ts index 2d6677b..5735096 100644 --- a/apps/freya-backend/src/agent/conversation-recording-query-agent.ts +++ b/apps/freya-backend/src/agent/conversation-recording-query-agent.ts @@ -26,10 +26,13 @@ export interface ConversationStorage { appendEntry( conversationId: string, input: AppendConversationEntryInput, - ): Promise + ): Promise listEntries(conversationId: string): Promise } +/** Minimal fields needed immediately after appending a conversation entry. */ +export type ConversationStorageAppendEntryResult = Pick + /** Minimal persisted entry shape needed by recording and replay agents. */ export type ConversationStorageEntry = Pick< ConversationEntryRow, diff --git a/apps/freya-backend/src/agent/scheduler.ts b/apps/freya-backend/src/agent/scheduler.ts index fe31471..9e52071 100644 --- a/apps/freya-backend/src/agent/scheduler.ts +++ b/apps/freya-backend/src/agent/scheduler.ts @@ -1,12 +1,11 @@ import type { UserEvent } from "@freya/agent-protocol" -import { ConversationEntryKind, UserMessagePayload } from "@freya/core" - import type { ConversationStorage } from "../conversations/storage" import type { Job, JobRegistry } from "../lib/job" import type { AgentResponseJobPayload } from "./job" -import { ConversationNotFoundError } from "../conversations/errors"; -import { ConversationResponseStateStatus } from "../db/schema"; + +import { ConversationNotFoundError } from "../conversations/errors" +import { ConversationResponseStateStatus } from "../db/schema" interface AgentMessageSchedulerConfig { storage: ConversationStorage @@ -47,27 +46,20 @@ export class AgentWorkScheduler { this.jobRegistry.addEventListener("cancelled", this.eraseJob.bind(this)) } - async receiveMessage(conversationId: string, message: string) { - await this.conversationStorage.transaction(async (storage) => { - const now = new Date() + async scheduleAgentResponse(conversationId: string) { + const existing = this.timers.get(conversationId) + if (existing) { + clearTimeout(existing) + } - const entry = await storage.appendEntry(conversationId, { - kind: ConversationEntryKind.UserMessage, - payload: { - role: "user", - parts: [{ type: "text", text: message }], - } satisfies UserMessagePayload, - }) + this.cancelCurrentJob(conversationId) - await storage.upsertConversationResponseState(conversationId, { - maxWaitUntil: new Date(now.getTime() + this.timing.maxWaitTime), - pendingSinceEntryId: entry.id, - status: "pending", - }) - - return entry - }) - this.scheduleAgentResponse(conversationId, this.timing.waitTime) + this.timers.set( + conversationId, + setTimeout(() => { + this.enqueueAgentResponse(conversationId) + }, this.timing.waitTime), + ) } async receiveUserEvent(conversationId: string, event: UserEvent) { @@ -92,11 +84,11 @@ export class AgentWorkScheduler { } private async delayAgentResponse(conversationId: string) { - this.cancelCurrentJob(conversationId); + this.cancelCurrentJob(conversationId) try { const ok = await this.conversationStorage.transaction(async (storage) => { - const state = await storage.findConversationResponseState(conversationId); + const state = await storage.findConversationResponseState(conversationId) if (state && state.status !== ConversationResponseStateStatus.Failed) { await storage.updateConversationResponseState(conversationId, { status: ConversationResponseStateStatus.Pending, @@ -109,7 +101,7 @@ export class AgentWorkScheduler { return false }) if (ok) { - await this.scheduleAgentResponse(conversationId, this.timing.waitTime) + await this.scheduleAgentResponse(conversationId) } } catch (error) { if (error instanceof ConversationNotFoundError) { @@ -123,22 +115,6 @@ export class AgentWorkScheduler { } } - private async scheduleAgentResponse(conversationId: string, delay: number) { - const existing = this.timers.get(conversationId) - if (existing) { - clearTimeout(existing) - } - - this.cancelCurrentJob(conversationId) - - this.timers.set( - conversationId, - setTimeout(() => { - this.enqueueAgentResponse(conversationId) - }, delay), - ) - } - /** * cancels the current job for agent response for the given conversation id * no-op if there is no active job for the conversation. diff --git a/apps/freya-backend/src/agent/service.ts b/apps/freya-backend/src/agent/service.ts index 4889d96..ee48d71 100644 --- a/apps/freya-backend/src/agent/service.ts +++ b/apps/freya-backend/src/agent/service.ts @@ -1,5 +1,7 @@ import type { UserEvent } from "@freya/agent-protocol" +import { ConversationEntry, ConversationEntryKind, UserMessagePayload } from "@freya/core" + import type { ConversationStorage } from "../conversations/storage" import type { NotificationCentral } from "../notification/notification-central" import type { UserSessionManager } from "../session" @@ -10,6 +12,11 @@ import { AgentResponseJobExecutor, type AgentResponseJobPayload } from "./job" import { AgentResponseReconciler } from "./reconciler" import { AgentWorkScheduler } from "./scheduler" +const AgentResponseTiming = { + waitTime: 5 * 1000, + maxWaitTime: 5 * 1000 * 60, +} as const + interface AgentServiceConfig { storage: ConversationStorage userSessionManager: UserSessionManager @@ -30,8 +37,8 @@ export class AgentService { this.scheduler = new AgentWorkScheduler({ storage, jobRegistry: this.jobRegistry, - waitTIme: 5 * 1000, - maxWaitTime: 5 * 1000 * 60, + waitTIme: AgentResponseTiming.waitTime, + maxWaitTime: AgentResponseTiming.maxWaitTime, }) this.reconciler = new AgentResponseReconciler({ signal, @@ -56,8 +63,30 @@ export class AgentService { this.reconciler.start() } - async scheduleAgentResponse(conversationId: string, message: string) { - await this.scheduler.receiveMessage(conversationId, message) + async scheduleAgentResponse(conversationId: string, message: string): Promise { + const createdEntry = await this.storage.transaction(async (storage) => { + const now = new Date() + + const entry = await storage.appendEntry(conversationId, { + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: message }], + } satisfies UserMessagePayload, + }) + + await storage.upsertConversationResponseState(conversationId, { + maxWaitUntil: new Date(now.getTime() + AgentResponseTiming.maxWaitTime), + pendingSinceEntryId: entry.id, + status: "pending", + }) + + return entry + }) + + await this.scheduler.scheduleAgentResponse(conversationId) + + return createdEntry } async handleUserEvent(conversationId: string, event: UserEvent) { diff --git a/apps/freya-backend/src/agent/ws.ts b/apps/freya-backend/src/agent/ws.ts index 03ab819..2e8af46 100644 --- a/apps/freya-backend/src/agent/ws.ts +++ b/apps/freya-backend/src/agent/ws.ts @@ -1,4 +1,5 @@ import type { AgentClientApi, AgentServerApi, UserEvent } from "@freya/agent-protocol" +import type { ConversationEntry } from "@freya/core" import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc" import type { Hono, MiddlewareHandler } from "hono" import type { WSContext } from "hono/ws" @@ -119,13 +120,12 @@ class AgentRpcConnection implements AgentServerApi { this.agentService.handleUserEvent(this.conversationId, event) } - async sendMessage(message: string): Promise { + async sendMessage(message: string): Promise { try { - await this.agentService.scheduleAgentResponse(this.conversationId, message) - return true + return await this.agentService.scheduleAgentResponse(this.conversationId, message) } catch (error) { - console.log("[agent rpc connection] error when scheduling agent response", error) - return false + console.error("[agent rpc connection] error when scheduling agent response", error) + throw error } } diff --git a/apps/freya-backend/src/conversations/db-storage.ts b/apps/freya-backend/src/conversations/db-storage.ts index 32e2d26..c599a20 100644 --- a/apps/freya-backend/src/conversations/db-storage.ts +++ b/apps/freya-backend/src/conversations/db-storage.ts @@ -7,6 +7,7 @@ import { ContextSummaryPayload, GenericObjectPayload, UserMessagePayload, + type ConversationEntry, type ConversationEntryPayload, } from "@freya/core" import { type } from "arktype" @@ -38,6 +39,7 @@ import { user, } from "../db/schema.ts" import { ConversationNotFoundError } from "./errors.ts" +import { conversationEntryFromRow } from "./serialization.ts" const conversationEntryKind = type.enumerated(...Object.values(ConversationEntryKind)) const conversationEntryVisibility = type.enumerated(...Object.values(ConversationEntryVisibility)) @@ -93,8 +95,9 @@ export class DrizzleConversationStorage implements ConversationStorage { async appendEntry( conversationId: string, input: AppendConversationEntryInput, - ): Promise { - return this.write((db) => appendEntryToConversation(db, null, conversationId, input)) + ): Promise { + const row = await this.write((db) => appendEntryToConversation(db, null, conversationId, input)) + return conversationEntryFromRow(row) } async appendAttachmentEntry( @@ -377,8 +380,10 @@ export function conversations(db: Database, userId: string) { appendEntry( conversationId: string, input: AppendConversationEntryInput, - ): Promise { - return db.transaction((tx) => appendEntryToConversation(tx, userId, conversationId, input)) + ): Promise { + return db.transaction((tx) => + appendEntryToConversation(tx, userId, conversationId, input).then(conversationEntryFromRow), + ) }, appendAttachmentEntry( diff --git a/apps/freya-backend/src/conversations/http.ts b/apps/freya-backend/src/conversations/http.ts index b4d6027..1860676 100644 --- a/apps/freya-backend/src/conversations/http.ts +++ b/apps/freya-backend/src/conversations/http.ts @@ -1,13 +1,7 @@ import type { Context, Hono } from "hono" import { - AssistantMessagePayload, - AttachmentPayload, - ConversationEntryKind, ConversationEntryVisibility, - ContextSummaryPayload, - GenericObjectPayload, - UserMessagePayload, type Conversation, type ConversationEntry, } from "@freya/core" @@ -16,10 +10,11 @@ import { createMiddleware } from "hono/factory" import type { AuthSessionMiddleware } from "../auth/session-middleware.ts" import type { Database } from "../db/index.ts" -import type { ConversationEntryRow, ConversationRow } from "./storage.ts" +import type { ConversationRow } from "./storage.ts" import { conversations } from "./db-storage.ts" import { ConversationNotFoundError } from "./errors.ts" +import { conversationEntryFromRow } from "./serialization.ts" /** Hono environment populated by the conversations route middleware. */ type Env = { @@ -86,7 +81,7 @@ async function handleListEntries(c: Context) { visibility: ConversationEntryVisibility.UserVisible, }) const response: ListConversationEntriesResponse = { - entries: entries.map(serializeConversationEntry), + entries: entries.map(conversationEntryFromRow), } return c.json(response) @@ -105,82 +100,3 @@ function serializeConversation(row: ConversationRow): Conversation { updatedAt: row.updatedAt.toISOString(), } } - -function serializeConversationEntry(row: ConversationEntryRow): ConversationEntry { - const base = { - id: row.id, - conversationId: row.conversationId, - sequence: row.sequence, - visibility: row.visibility, - metadata: row.metadata, - createdAt: row.createdAt.toISOString(), - } - - switch (row.kind) { - case ConversationEntryKind.UserMessage: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: UserMessagePayload.assert(row.payload), - } - case ConversationEntryKind.AssistantMessage: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: AssistantMessagePayload.assert(row.payload), - } - case ConversationEntryKind.Attachment: - return { - ...base, - kind: row.kind, - fileId: requireFileId(row), - payload: AttachmentPayload.assert(row.payload), - } - case ConversationEntryKind.ToolCall: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: GenericObjectPayload.assert(row.payload), - } - case ConversationEntryKind.ToolResult: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: GenericObjectPayload.assert(row.payload), - } - case ConversationEntryKind.ContextSummary: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: ContextSummaryPayload.assert(row.payload), - } - case ConversationEntryKind.SystemNote: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: GenericObjectPayload.assert(row.payload), - } - } -} - -function requireFileId(row: ConversationEntryRow): string { - if (!row.fileId) { - throw new Error(`Conversation attachment entry "${row.id}" is missing a file id`) - } - - return row.fileId -} - -function nullFileId(row: ConversationEntryRow): null { - if (row.fileId !== null) { - throw new Error(`Conversation entry "${row.id}" unexpectedly references a file`) - } - - return null -} diff --git a/apps/freya-backend/src/conversations/serialization.ts b/apps/freya-backend/src/conversations/serialization.ts new file mode 100644 index 0000000..6064a70 --- /dev/null +++ b/apps/freya-backend/src/conversations/serialization.ts @@ -0,0 +1,90 @@ +import { + AssistantMessagePayload, + AttachmentPayload, + ConversationEntryKind, + ContextSummaryPayload, + GenericObjectPayload, + UserMessagePayload, + type ConversationEntry, +} from "@freya/core" + +import type { ConversationEntryRow } from "./storage.ts" + +export function conversationEntryFromRow(row: ConversationEntryRow): ConversationEntry { + const base = { + id: row.id, + conversationId: row.conversationId, + sequence: row.sequence, + visibility: row.visibility, + metadata: row.metadata, + createdAt: row.createdAt.toISOString(), + } + + switch (row.kind) { + case ConversationEntryKind.UserMessage: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: UserMessagePayload.assert(row.payload), + } + case ConversationEntryKind.AssistantMessage: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: AssistantMessagePayload.assert(row.payload), + } + case ConversationEntryKind.Attachment: + return { + ...base, + kind: row.kind, + fileId: requireFileId(row), + payload: AttachmentPayload.assert(row.payload), + } + case ConversationEntryKind.ToolCall: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: GenericObjectPayload.assert(row.payload), + } + case ConversationEntryKind.ToolResult: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: GenericObjectPayload.assert(row.payload), + } + case ConversationEntryKind.ContextSummary: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: ContextSummaryPayload.assert(row.payload), + } + case ConversationEntryKind.SystemNote: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: GenericObjectPayload.assert(row.payload), + } + } +} + +function requireFileId(row: ConversationEntryRow): string { + if (!row.fileId) { + throw new Error(`Conversation attachment entry "${row.id}" is missing a file id`) + } + + return row.fileId +} + +function nullFileId(row: ConversationEntryRow): null { + if (row.fileId !== null) { + throw new Error(`Conversation entry "${row.id}" unexpectedly references a file`) + } + + return null +} diff --git a/apps/freya-backend/src/conversations/storage.ts b/apps/freya-backend/src/conversations/storage.ts index a7747ca..996df83 100644 --- a/apps/freya-backend/src/conversations/storage.ts +++ b/apps/freya-backend/src/conversations/storage.ts @@ -7,6 +7,7 @@ import { ContextSummaryPayload, GenericObjectPayload, UserMessagePayload, + type ConversationEntry, } from "@freya/core" import { @@ -27,7 +28,7 @@ export interface ConversationStorage { appendEntry( conversationId: string, input: AppendConversationEntryInput, - ): Promise + ): Promise appendAttachmentEntry( conversationId: string, input: AppendAttachmentEntryInput, diff --git a/bun.lock b/bun.lock index fca2798..47b987b 100644 --- a/bun.lock +++ b/bun.lock @@ -158,6 +158,9 @@ "packages/freya-agent-protocol": { "name": "@freya/agent-protocol", "version": "0.0.0", + "dependencies": { + "@freya/core": "workspace:*", + }, }, "packages/freya-components": { "name": "@freya/components", diff --git a/packages/freya-agent-protocol/package.json b/packages/freya-agent-protocol/package.json index 23573ab..4d1a68e 100644 --- a/packages/freya-agent-protocol/package.json +++ b/packages/freya-agent-protocol/package.json @@ -6,5 +6,8 @@ "types": "src/index.ts", "scripts": { "test": "bun test ./src" + }, + "dependencies": { + "@freya/core": "workspace:*" } } diff --git a/packages/freya-agent-protocol/src/index.test.ts b/packages/freya-agent-protocol/src/index.test.ts index 1913a2f..612c7b5 100644 --- a/packages/freya-agent-protocol/src/index.test.ts +++ b/packages/freya-agent-protocol/src/index.test.ts @@ -1,12 +1,30 @@ import { describe, expect, test } from "bun:test" +import { ConversationEntryKind, ConversationEntryVisibility } from "@freya/core" + import type { AgentEvent, AgentServerApi } from "./index" describe("agent protocol", () => { test("defines server methods and agent events", () => { const server: AgentServerApi = { async sendMessage(message) { - return { message, conversationId: "conversation-1" } + return { + id: "entry-1", + conversationId: "conversation-1", + sequence: 1, + kind: ConversationEntryKind.UserMessage, + visibility: ConversationEntryVisibility.UserVisible, + fileId: null, + payload: { + role: "user", + parts: [{ type: "text", text: message }], + }, + metadata: {}, + createdAt: "2026-07-03T00:00:00.000Z", + } + }, + notify() { + // no-op for protocol shape test }, ping() { return "pong" diff --git a/packages/freya-agent-protocol/src/index.ts b/packages/freya-agent-protocol/src/index.ts index 0418489..db1c262 100644 --- a/packages/freya-agent-protocol/src/index.ts +++ b/packages/freya-agent-protocol/src/index.ts @@ -1,3 +1,5 @@ +import type { ConversationEntry } from "@freya/core" + export type AgentEvent = | { type: "conversation_started"; conversationId: string } | { type: "message_created"; text: string } @@ -9,7 +11,7 @@ export type AgentEvent = export type UserEvent = { type: "typing" } export interface AgentServerApi { - sendMessage(message: string): Promise + sendMessage(message: string): Promise notify(event: UserEvent): void ping(): "pong" }