feat: sendMessage rpc returns created entry

This commit is contained in:
2026-07-03 01:02:03 +01:00
parent e3a00fe632
commit 430a095f2a
13 changed files with 196 additions and 159 deletions

View File

@@ -2,13 +2,13 @@ import type {
AgentClientApi, AgentClientApi,
AgentEvent, AgentEvent,
AgentServerApi, AgentServerApi,
SendMessageResult,
} from "@freya/agent-protocol" } from "@freya/agent-protocol"
import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc" import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc"
import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc" import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc"
type JsonObject = Record<string, unknown> type JsonObject = Record<string, unknown>
type SendMessageResult = Awaited<ReturnType<AgentServerApi["sendMessage"]>>
interface AuthUser { interface AuthUser {
id: string id: string
@@ -71,7 +71,6 @@ class AgentWebSocketSession implements AgentClientApi {
private readonly client: JsonRpcClient<AgentServerApi> private readonly client: JsonRpcClient<AgentServerApi>
private readonly server: JsonRpcServer<AgentClientApi> private readonly server: JsonRpcServer<AgentClientApi>
private conversationId: string | undefined private conversationId: string | undefined
private responseHadText = false
private constructor(channel: WebSocketJrpcChannel) { private constructor(channel: WebSocketJrpcChannel) {
this.channel = channel this.channel = channel
@@ -105,16 +104,9 @@ class AgentWebSocketSession implements AgentClientApi {
} }
async ask(message: string): Promise<void> { async ask(message: string): Promise<void> {
this.responseHadText = false const entry = await this.sendMessage(message)
this.conversationId = entry.conversationId
const result = await this.sendMessage(message) console.log(`\nqueued> ${entry.kind} ${entry.id}`)
if (result.conversationId) {
this.conversationId = result.conversationId
}
if (!this.responseHadText) {
console.log(`\nagent> ${result.message || "(no message)"}`)
}
console.log("") console.log("")
} }
@@ -156,7 +148,6 @@ class AgentWebSocketSession implements AgentClientApi {
if (text === "") return if (text === "") return
console.log(`\nagent> ${text}`) console.log(`\nagent> ${text}`)
this.responseHadText = true
} }
} }

View File

@@ -26,10 +26,13 @@ export interface ConversationStorage {
appendEntry( appendEntry(
conversationId: string, conversationId: string,
input: AppendConversationEntryInput, input: AppendConversationEntryInput,
): Promise<ConversationStorageEntry> ): Promise<ConversationStorageAppendEntryResult>
listEntries(conversationId: string): Promise<ConversationStorageEntry[]> listEntries(conversationId: string): Promise<ConversationStorageEntry[]>
} }
/** Minimal fields needed immediately after appending a conversation entry. */
export type ConversationStorageAppendEntryResult = Pick<ConversationEntryRow, "id" | "sequence">
/** Minimal persisted entry shape needed by recording and replay agents. */ /** Minimal persisted entry shape needed by recording and replay agents. */
export type ConversationStorageEntry = Pick< export type ConversationStorageEntry = Pick<
ConversationEntryRow, ConversationEntryRow,

View File

@@ -1,12 +1,11 @@
import type { UserEvent } from "@freya/agent-protocol" import type { UserEvent } from "@freya/agent-protocol"
import { ConversationEntryKind, UserMessagePayload } from "@freya/core"
import type { ConversationStorage } from "../conversations/storage" import type { ConversationStorage } from "../conversations/storage"
import type { Job, JobRegistry } from "../lib/job" import type { Job, JobRegistry } from "../lib/job"
import type { AgentResponseJobPayload } from "./job" import type { AgentResponseJobPayload } from "./job"
import { ConversationNotFoundError } from "../conversations/errors";
import { ConversationResponseStateStatus } from "../db/schema"; import { ConversationNotFoundError } from "../conversations/errors"
import { ConversationResponseStateStatus } from "../db/schema"
interface AgentMessageSchedulerConfig { interface AgentMessageSchedulerConfig {
storage: ConversationStorage storage: ConversationStorage
@@ -47,27 +46,20 @@ export class AgentWorkScheduler {
this.jobRegistry.addEventListener("cancelled", this.eraseJob.bind(this)) this.jobRegistry.addEventListener("cancelled", this.eraseJob.bind(this))
} }
async receiveMessage(conversationId: string, message: string) { async scheduleAgentResponse(conversationId: string) {
await this.conversationStorage.transaction(async (storage) => { const existing = this.timers.get(conversationId)
const now = new Date() if (existing) {
clearTimeout(existing)
}
const entry = await storage.appendEntry(conversationId, { this.cancelCurrentJob(conversationId)
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: message }],
} satisfies UserMessagePayload,
})
await storage.upsertConversationResponseState(conversationId, { this.timers.set(
maxWaitUntil: new Date(now.getTime() + this.timing.maxWaitTime), conversationId,
pendingSinceEntryId: entry.id, setTimeout(() => {
status: "pending", this.enqueueAgentResponse(conversationId)
}) }, this.timing.waitTime),
)
return entry
})
this.scheduleAgentResponse(conversationId, this.timing.waitTime)
} }
async receiveUserEvent(conversationId: string, event: UserEvent) { async receiveUserEvent(conversationId: string, event: UserEvent) {
@@ -92,11 +84,11 @@ export class AgentWorkScheduler {
} }
private async delayAgentResponse(conversationId: string) { private async delayAgentResponse(conversationId: string) {
this.cancelCurrentJob(conversationId); this.cancelCurrentJob(conversationId)
try { try {
const ok = await this.conversationStorage.transaction(async (storage) => { const ok = await this.conversationStorage.transaction(async (storage) => {
const state = await storage.findConversationResponseState(conversationId); const state = await storage.findConversationResponseState(conversationId)
if (state && state.status !== ConversationResponseStateStatus.Failed) { if (state && state.status !== ConversationResponseStateStatus.Failed) {
await storage.updateConversationResponseState(conversationId, { await storage.updateConversationResponseState(conversationId, {
status: ConversationResponseStateStatus.Pending, status: ConversationResponseStateStatus.Pending,
@@ -109,7 +101,7 @@ export class AgentWorkScheduler {
return false return false
}) })
if (ok) { if (ok) {
await this.scheduleAgentResponse(conversationId, this.timing.waitTime) await this.scheduleAgentResponse(conversationId)
} }
} catch (error) { } catch (error) {
if (error instanceof ConversationNotFoundError) { if (error instanceof ConversationNotFoundError) {
@@ -123,22 +115,6 @@ export class AgentWorkScheduler {
} }
} }
private async scheduleAgentResponse(conversationId: string, delay: number) {
const existing = this.timers.get(conversationId)
if (existing) {
clearTimeout(existing)
}
this.cancelCurrentJob(conversationId)
this.timers.set(
conversationId,
setTimeout(() => {
this.enqueueAgentResponse(conversationId)
}, delay),
)
}
/** /**
* cancels the current job for agent response for the given conversation id * cancels the current job for agent response for the given conversation id
* no-op if there is no active job for the conversation. * no-op if there is no active job for the conversation.

View File

@@ -1,5 +1,7 @@
import type { UserEvent } from "@freya/agent-protocol" import type { UserEvent } from "@freya/agent-protocol"
import { ConversationEntry, ConversationEntryKind, UserMessagePayload } from "@freya/core"
import type { ConversationStorage } from "../conversations/storage" import type { ConversationStorage } from "../conversations/storage"
import type { NotificationCentral } from "../notification/notification-central" import type { NotificationCentral } from "../notification/notification-central"
import type { UserSessionManager } from "../session" import type { UserSessionManager } from "../session"
@@ -10,6 +12,11 @@ import { AgentResponseJobExecutor, type AgentResponseJobPayload } from "./job"
import { AgentResponseReconciler } from "./reconciler" import { AgentResponseReconciler } from "./reconciler"
import { AgentWorkScheduler } from "./scheduler" import { AgentWorkScheduler } from "./scheduler"
const AgentResponseTiming = {
waitTime: 5 * 1000,
maxWaitTime: 5 * 1000 * 60,
} as const
interface AgentServiceConfig { interface AgentServiceConfig {
storage: ConversationStorage storage: ConversationStorage
userSessionManager: UserSessionManager userSessionManager: UserSessionManager
@@ -30,8 +37,8 @@ export class AgentService {
this.scheduler = new AgentWorkScheduler({ this.scheduler = new AgentWorkScheduler({
storage, storage,
jobRegistry: this.jobRegistry, jobRegistry: this.jobRegistry,
waitTIme: 5 * 1000, waitTIme: AgentResponseTiming.waitTime,
maxWaitTime: 5 * 1000 * 60, maxWaitTime: AgentResponseTiming.maxWaitTime,
}) })
this.reconciler = new AgentResponseReconciler({ this.reconciler = new AgentResponseReconciler({
signal, signal,
@@ -56,8 +63,30 @@ export class AgentService {
this.reconciler.start() this.reconciler.start()
} }
async scheduleAgentResponse(conversationId: string, message: string) { async scheduleAgentResponse(conversationId: string, message: string): Promise<ConversationEntry> {
await this.scheduler.receiveMessage(conversationId, message) const createdEntry = await this.storage.transaction(async (storage) => {
const now = new Date()
const entry = await storage.appendEntry(conversationId, {
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: message }],
} satisfies UserMessagePayload,
})
await storage.upsertConversationResponseState(conversationId, {
maxWaitUntil: new Date(now.getTime() + AgentResponseTiming.maxWaitTime),
pendingSinceEntryId: entry.id,
status: "pending",
})
return entry
})
await this.scheduler.scheduleAgentResponse(conversationId)
return createdEntry
} }
async handleUserEvent(conversationId: string, event: UserEvent) { async handleUserEvent(conversationId: string, event: UserEvent) {

View File

@@ -1,4 +1,5 @@
import type { AgentClientApi, AgentServerApi, UserEvent } from "@freya/agent-protocol" import type { AgentClientApi, AgentServerApi, UserEvent } from "@freya/agent-protocol"
import type { ConversationEntry } from "@freya/core"
import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc" import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc"
import type { Hono, MiddlewareHandler } from "hono" import type { Hono, MiddlewareHandler } from "hono"
import type { WSContext } from "hono/ws" import type { WSContext } from "hono/ws"
@@ -119,13 +120,12 @@ class AgentRpcConnection implements AgentServerApi {
this.agentService.handleUserEvent(this.conversationId, event) this.agentService.handleUserEvent(this.conversationId, event)
} }
async sendMessage(message: string): Promise<boolean> { async sendMessage(message: string): Promise<ConversationEntry> {
try { try {
await this.agentService.scheduleAgentResponse(this.conversationId, message) return await this.agentService.scheduleAgentResponse(this.conversationId, message)
return true
} catch (error) { } catch (error) {
console.log("[agent rpc connection] error when scheduling agent response", error) console.error("[agent rpc connection] error when scheduling agent response", error)
return false throw error
} }
} }

View File

@@ -7,6 +7,7 @@ import {
ContextSummaryPayload, ContextSummaryPayload,
GenericObjectPayload, GenericObjectPayload,
UserMessagePayload, UserMessagePayload,
type ConversationEntry,
type ConversationEntryPayload, type ConversationEntryPayload,
} from "@freya/core" } from "@freya/core"
import { type } from "arktype" import { type } from "arktype"
@@ -38,6 +39,7 @@ import {
user, user,
} from "../db/schema.ts" } from "../db/schema.ts"
import { ConversationNotFoundError } from "./errors.ts" import { ConversationNotFoundError } from "./errors.ts"
import { conversationEntryFromRow } from "./serialization.ts"
const conversationEntryKind = type.enumerated(...Object.values(ConversationEntryKind)) const conversationEntryKind = type.enumerated(...Object.values(ConversationEntryKind))
const conversationEntryVisibility = type.enumerated(...Object.values(ConversationEntryVisibility)) const conversationEntryVisibility = type.enumerated(...Object.values(ConversationEntryVisibility))
@@ -93,8 +95,9 @@ export class DrizzleConversationStorage implements ConversationStorage {
async appendEntry( async appendEntry(
conversationId: string, conversationId: string,
input: AppendConversationEntryInput, input: AppendConversationEntryInput,
): Promise<ConversationEntryRow> { ): Promise<ConversationEntry> {
return this.write((db) => appendEntryToConversation(db, null, conversationId, input)) const row = await this.write((db) => appendEntryToConversation(db, null, conversationId, input))
return conversationEntryFromRow(row)
} }
async appendAttachmentEntry( async appendAttachmentEntry(
@@ -377,8 +380,10 @@ export function conversations(db: Database, userId: string) {
appendEntry( appendEntry(
conversationId: string, conversationId: string,
input: AppendConversationEntryInput, input: AppendConversationEntryInput,
): Promise<ConversationEntryRow> { ): Promise<ConversationEntry> {
return db.transaction((tx) => appendEntryToConversation(tx, userId, conversationId, input)) return db.transaction((tx) =>
appendEntryToConversation(tx, userId, conversationId, input).then(conversationEntryFromRow),
)
}, },
appendAttachmentEntry( appendAttachmentEntry(

View File

@@ -1,13 +1,7 @@
import type { Context, Hono } from "hono" import type { Context, Hono } from "hono"
import { import {
AssistantMessagePayload,
AttachmentPayload,
ConversationEntryKind,
ConversationEntryVisibility, ConversationEntryVisibility,
ContextSummaryPayload,
GenericObjectPayload,
UserMessagePayload,
type Conversation, type Conversation,
type ConversationEntry, type ConversationEntry,
} from "@freya/core" } from "@freya/core"
@@ -16,10 +10,11 @@ import { createMiddleware } from "hono/factory"
import type { AuthSessionMiddleware } from "../auth/session-middleware.ts" import type { AuthSessionMiddleware } from "../auth/session-middleware.ts"
import type { Database } from "../db/index.ts" import type { Database } from "../db/index.ts"
import type { ConversationEntryRow, ConversationRow } from "./storage.ts" import type { ConversationRow } from "./storage.ts"
import { conversations } from "./db-storage.ts" import { conversations } from "./db-storage.ts"
import { ConversationNotFoundError } from "./errors.ts" import { ConversationNotFoundError } from "./errors.ts"
import { conversationEntryFromRow } from "./serialization.ts"
/** Hono environment populated by the conversations route middleware. */ /** Hono environment populated by the conversations route middleware. */
type Env = { type Env = {
@@ -86,7 +81,7 @@ async function handleListEntries(c: Context<Env>) {
visibility: ConversationEntryVisibility.UserVisible, visibility: ConversationEntryVisibility.UserVisible,
}) })
const response: ListConversationEntriesResponse = { const response: ListConversationEntriesResponse = {
entries: entries.map(serializeConversationEntry), entries: entries.map(conversationEntryFromRow),
} }
return c.json(response) return c.json(response)
@@ -105,82 +100,3 @@ function serializeConversation(row: ConversationRow): Conversation {
updatedAt: row.updatedAt.toISOString(), updatedAt: row.updatedAt.toISOString(),
} }
} }
function serializeConversationEntry(row: ConversationEntryRow): ConversationEntry {
const base = {
id: row.id,
conversationId: row.conversationId,
sequence: row.sequence,
visibility: row.visibility,
metadata: row.metadata,
createdAt: row.createdAt.toISOString(),
}
switch (row.kind) {
case ConversationEntryKind.UserMessage:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: UserMessagePayload.assert(row.payload),
}
case ConversationEntryKind.AssistantMessage:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: AssistantMessagePayload.assert(row.payload),
}
case ConversationEntryKind.Attachment:
return {
...base,
kind: row.kind,
fileId: requireFileId(row),
payload: AttachmentPayload.assert(row.payload),
}
case ConversationEntryKind.ToolCall:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: GenericObjectPayload.assert(row.payload),
}
case ConversationEntryKind.ToolResult:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: GenericObjectPayload.assert(row.payload),
}
case ConversationEntryKind.ContextSummary:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: ContextSummaryPayload.assert(row.payload),
}
case ConversationEntryKind.SystemNote:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: GenericObjectPayload.assert(row.payload),
}
}
}
function requireFileId(row: ConversationEntryRow): string {
if (!row.fileId) {
throw new Error(`Conversation attachment entry "${row.id}" is missing a file id`)
}
return row.fileId
}
function nullFileId(row: ConversationEntryRow): null {
if (row.fileId !== null) {
throw new Error(`Conversation entry "${row.id}" unexpectedly references a file`)
}
return null
}

View File

@@ -0,0 +1,90 @@
import {
AssistantMessagePayload,
AttachmentPayload,
ConversationEntryKind,
ContextSummaryPayload,
GenericObjectPayload,
UserMessagePayload,
type ConversationEntry,
} from "@freya/core"
import type { ConversationEntryRow } from "./storage.ts"
export function conversationEntryFromRow(row: ConversationEntryRow): ConversationEntry {
const base = {
id: row.id,
conversationId: row.conversationId,
sequence: row.sequence,
visibility: row.visibility,
metadata: row.metadata,
createdAt: row.createdAt.toISOString(),
}
switch (row.kind) {
case ConversationEntryKind.UserMessage:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: UserMessagePayload.assert(row.payload),
}
case ConversationEntryKind.AssistantMessage:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: AssistantMessagePayload.assert(row.payload),
}
case ConversationEntryKind.Attachment:
return {
...base,
kind: row.kind,
fileId: requireFileId(row),
payload: AttachmentPayload.assert(row.payload),
}
case ConversationEntryKind.ToolCall:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: GenericObjectPayload.assert(row.payload),
}
case ConversationEntryKind.ToolResult:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: GenericObjectPayload.assert(row.payload),
}
case ConversationEntryKind.ContextSummary:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: ContextSummaryPayload.assert(row.payload),
}
case ConversationEntryKind.SystemNote:
return {
...base,
kind: row.kind,
fileId: nullFileId(row),
payload: GenericObjectPayload.assert(row.payload),
}
}
}
function requireFileId(row: ConversationEntryRow): string {
if (!row.fileId) {
throw new Error(`Conversation attachment entry "${row.id}" is missing a file id`)
}
return row.fileId
}
function nullFileId(row: ConversationEntryRow): null {
if (row.fileId !== null) {
throw new Error(`Conversation entry "${row.id}" unexpectedly references a file`)
}
return null
}

View File

@@ -7,6 +7,7 @@ import {
ContextSummaryPayload, ContextSummaryPayload,
GenericObjectPayload, GenericObjectPayload,
UserMessagePayload, UserMessagePayload,
type ConversationEntry,
} from "@freya/core" } from "@freya/core"
import { import {
@@ -27,7 +28,7 @@ export interface ConversationStorage {
appendEntry( appendEntry(
conversationId: string, conversationId: string,
input: AppendConversationEntryInput, input: AppendConversationEntryInput,
): Promise<ConversationEntryRow> ): Promise<ConversationEntry>
appendAttachmentEntry( appendAttachmentEntry(
conversationId: string, conversationId: string,
input: AppendAttachmentEntryInput, input: AppendAttachmentEntryInput,

View File

@@ -158,6 +158,9 @@
"packages/freya-agent-protocol": { "packages/freya-agent-protocol": {
"name": "@freya/agent-protocol", "name": "@freya/agent-protocol",
"version": "0.0.0", "version": "0.0.0",
"dependencies": {
"@freya/core": "workspace:*",
},
}, },
"packages/freya-components": { "packages/freya-components": {
"name": "@freya/components", "name": "@freya/components",

View File

@@ -6,5 +6,8 @@
"types": "src/index.ts", "types": "src/index.ts",
"scripts": { "scripts": {
"test": "bun test ./src" "test": "bun test ./src"
},
"dependencies": {
"@freya/core": "workspace:*"
} }
} }

View File

@@ -1,12 +1,30 @@
import { describe, expect, test } from "bun:test" import { describe, expect, test } from "bun:test"
import { ConversationEntryKind, ConversationEntryVisibility } from "@freya/core"
import type { AgentEvent, AgentServerApi } from "./index" import type { AgentEvent, AgentServerApi } from "./index"
describe("agent protocol", () => { describe("agent protocol", () => {
test("defines server methods and agent events", () => { test("defines server methods and agent events", () => {
const server: AgentServerApi = { const server: AgentServerApi = {
async sendMessage(message) { async sendMessage(message) {
return { message, conversationId: "conversation-1" } return {
id: "entry-1",
conversationId: "conversation-1",
sequence: 1,
kind: ConversationEntryKind.UserMessage,
visibility: ConversationEntryVisibility.UserVisible,
fileId: null,
payload: {
role: "user",
parts: [{ type: "text", text: message }],
},
metadata: {},
createdAt: "2026-07-03T00:00:00.000Z",
}
},
notify() {
// no-op for protocol shape test
}, },
ping() { ping() {
return "pong" return "pong"

View File

@@ -1,3 +1,5 @@
import type { ConversationEntry } from "@freya/core"
export type AgentEvent = export type AgentEvent =
| { type: "conversation_started"; conversationId: string } | { type: "conversation_started"; conversationId: string }
| { type: "message_created"; text: string } | { type: "message_created"; text: string }
@@ -9,7 +11,7 @@ export type AgentEvent =
export type UserEvent = { type: "typing" } export type UserEvent = { type: "typing" }
export interface AgentServerApi { export interface AgentServerApi {
sendMessage(message: string): Promise<boolean> sendMessage(message: string): Promise<ConversationEntry>
notify(event: UserEvent): void notify(event: UserEvent): void
ping(): "pong" ping(): "pong"
} }