From f8669c681cd236212aedcb3bdc349748ad16f4ce Mon Sep 17 00:00:00 2001 From: Kenneth Date: Sat, 4 Jul 2026 17:35:50 +0100 Subject: [PATCH] Add agent response scheduler (#158) * feat: add agent response scheduler * feat: sendMessage rpc returns created entry * feat: make agent event include conversation entry * feat: add agent response logging --- apps/agent-test-cli/package.json | 2 +- apps/agent-test-cli/src/agent-test-cli.ts | 95 ++- apps/freya-backend/package.json | 4 +- apps/freya-backend/src/admin/http.test.ts | 2 +- .../conversation-recording-query-agent.ts | 5 +- apps/freya-backend/src/agent/job.test.ts | 377 ++++++++++ apps/freya-backend/src/agent/job.ts | 294 ++++++++ .../freya-backend/src/agent/pi-query-agent.ts | 10 + apps/freya-backend/src/agent/query-agent.ts | 1 + apps/freya-backend/src/agent/reconciler.ts | 100 +++ apps/freya-backend/src/agent/scheduler.ts | 176 +++++ apps/freya-backend/src/agent/service.ts | 152 ++++ .../freya-backend/src/agent/streaming.test.ts | 97 ++- apps/freya-backend/src/agent/streaming.ts | 97 ++- apps/freya-backend/src/agent/ws.test.ts | 12 +- apps/freya-backend/src/agent/ws.ts | 197 +++-- .../src/conversations/db-storage.ts | 691 ++++++++++++++++++ .../src/conversations/http.test.ts | 2 +- apps/freya-backend/src/conversations/http.ts | 92 +-- .../src/conversations/serialization.ts | 90 +++ .../src/conversations/storage.ts | 369 ++-------- apps/freya-backend/src/db/schema.ts | 41 ++ apps/freya-backend/src/engine/http.test.ts | 6 +- .../src/enhancement/schema.test.ts | 3 +- apps/freya-backend/src/lib/job.ts | 116 +++ apps/freya-backend/src/lib/logger.ts | 83 +++ apps/freya-backend/src/lib/queue.ts | 69 ++ apps/freya-backend/src/lib/worker.ts | 86 +++ .../src/notification/notification-central.ts | 36 + apps/freya-backend/src/server.ts | 30 +- .../src/session/user-session-manager.test.ts | 2 +- .../src/session/user-session-manager.ts | 2 +- .../freya-backend/src/session/user-session.ts | 18 +- apps/freya-backend/src/sources/http.test.ts | 2 +- bun.lock | 58 +- 35 files changed, 2769 insertions(+), 648 deletions(-) create mode 100644 apps/freya-backend/src/agent/job.test.ts create mode 100644 apps/freya-backend/src/agent/job.ts create mode 100644 apps/freya-backend/src/agent/reconciler.ts create mode 100644 apps/freya-backend/src/agent/scheduler.ts create mode 100644 apps/freya-backend/src/agent/service.ts create mode 100644 apps/freya-backend/src/conversations/db-storage.ts create mode 100644 apps/freya-backend/src/conversations/serialization.ts create mode 100644 apps/freya-backend/src/lib/job.ts create mode 100644 apps/freya-backend/src/lib/logger.ts create mode 100644 apps/freya-backend/src/lib/queue.ts create mode 100644 apps/freya-backend/src/lib/worker.ts create mode 100644 apps/freya-backend/src/notification/notification-central.ts diff --git a/apps/agent-test-cli/package.json b/apps/agent-test-cli/package.json index 5a7dee6..3bd5e48 100644 --- a/apps/agent-test-cli/package.json +++ b/apps/agent-test-cli/package.json @@ -10,6 +10,6 @@ }, "dependencies": { "@freya/agent-protocol": "workspace:*", - "@nym.sh/jrpc": "^0.1.0" + "@nym.sh/jrpc": "1.1.0" } } diff --git a/apps/agent-test-cli/src/agent-test-cli.ts b/apps/agent-test-cli/src/agent-test-cli.ts index 173856d..1067a45 100644 --- a/apps/agent-test-cli/src/agent-test-cli.ts +++ b/apps/agent-test-cli/src/agent-test-cli.ts @@ -1,14 +1,17 @@ -import type { - AgentClientApi, - AgentEvent, - AgentServerApi, - SendMessageResult, -} from "@freya/agent-protocol" import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc" +import { + AgentEventKind, + type AgentClientApi, + type AgentConversationEntryCreatedEvent, + type AgentEvent, + type AgentServerApi, +} from "@freya/agent-protocol" import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc" type JsonObject = Record +type MessagePart = { type: "text"; text: string } | { type: "json"; value: unknown } +type SendMessageResult = Awaited> interface AuthUser { id: string @@ -71,7 +74,6 @@ class AgentWebSocketSession implements AgentClientApi { private readonly client: JsonRpcClient private readonly server: JsonRpcServer private conversationId: string | undefined - private responseHadText = false private constructor(channel: WebSocketJrpcChannel) { this.channel = channel @@ -105,36 +107,23 @@ class AgentWebSocketSession implements AgentClientApi { } async ask(message: string): Promise { - this.responseHadText = false - - const result = await this.sendMessage(message) - if (result.conversationId) { - this.conversationId = result.conversationId - } - - if (!this.responseHadText) { - console.log(`\nagent> ${result.message || "(no message)"}`) - } + const entry = await this.sendMessage(message) + this.conversationId = entry.conversationId + console.log(`\nqueued> ${entry.kind} ${entry.id}`) console.log("") } notify(event: AgentEvent): void { - switch (event.type) { - case "conversation_started": + switch (event.kind) { + case AgentEventKind.ConversationStarted: this.conversationId = event.conversationId break - case "message_created": - this.printMessage(event.text) + case AgentEventKind.ConversationEntryCreated: + this.printConversationEntry(event.entry) break - case "tool_started": - console.log(`\ntool> ${event.toolName} started`) + case AgentEventKind.ResponseFinished: break - case "tool_finished": - console.log(`tool> ${event.toolName} ${event.ok ? "finished" : "failed"}`) - break - case "message_finished": - break - case "message_failed": + case AgentEventKind.ResponseFailed: console.log(`\nagent! ${event.error}`) break } @@ -156,7 +145,31 @@ class AgentWebSocketSession implements AgentClientApi { if (text === "") return console.log(`\nagent> ${text}`) - this.responseHadText = true + } + + private printConversationEntry(entry: AgentConversationEntryCreatedEvent["entry"]): void { + this.conversationId = entry.conversationId + + switch (entry.kind) { + case "assistant_message": + this.printMessage(messagePartsText(entry.payload.parts)) + break + case "tool_call": + console.log(`\ntool> ${payloadString(entry.payload, "toolName", "unknown")} started`) + break + case "tool_result": + console.log( + `tool> ${payloadString(entry.payload, "toolName", "unknown")} ${ + payloadBoolean(entry.payload, "ok") ? "finished" : "failed" + }`, + ) + break + case "user_message": + case "attachment": + case "context_summary": + case "system_note": + break + } } } @@ -691,6 +704,28 @@ function parseJsonArgument(value: string, fallback: unknown): unknown { } } +function messagePartsText(parts: MessagePart[]): string { + return parts.map(messagePartText).join("\n") +} + +function messagePartText(part: MessagePart): string { + switch (part.type) { + case "text": + return part.text + case "json": + return formatJson(part.value) + } +} + +function payloadString(payload: Record, key: string, fallback: string): string { + const value = payload[key] + return typeof value === "string" ? value : fallback +} + +function payloadBoolean(payload: Record, key: string): boolean { + return payload[key] === true +} + function formatJson(value: unknown): string { const serialized = JSON.stringify(value, null, 2) return serialized ?? "undefined" diff --git a/apps/freya-backend/package.json b/apps/freya-backend/package.json index 5d8d1d3..32dde12 100644 --- a/apps/freya-backend/package.json +++ b/apps/freya-backend/package.json @@ -26,13 +26,15 @@ "@freya/source-tfl": "workspace:*", "@freya/source-weatherkit": "workspace:*", "@freya/source-web-search": "workspace:*", - "@nym.sh/jrpc": "^0.1.0", + "@nym.sh/jrpc": "1.1.0", "@openrouter/sdk": "^0.9.11", "arktype": "^2.1.29", "better-auth": "^1", "drizzle-orm": "^0.45.1", "hono": "^4", "lodash.merge": "^4.6.2", + "pino": "^10.3.1", + "pino-pretty": "^13.1.3", "typebox": "^1.1.38" }, "devDependencies": { diff --git a/apps/freya-backend/src/admin/http.test.ts b/apps/freya-backend/src/admin/http.test.ts index 7837777..d1dfa17 100644 --- a/apps/freya-backend/src/admin/http.test.ts +++ b/apps/freya-backend/src/admin/http.test.ts @@ -44,7 +44,7 @@ mock.module("../sources/user-sources.ts", () => ({ }), })) -mock.module("../conversations/storage.ts", () => ({ +mock.module("../conversations/db-storage.ts", () => ({ conversations: (_db: Database, userId: string) => ({ async getOrCreateConversation() { return { id: `conversation-${userId}` } diff --git a/apps/freya-backend/src/agent/conversation-recording-query-agent.ts b/apps/freya-backend/src/agent/conversation-recording-query-agent.ts index 2d6677b..5735096 100644 --- a/apps/freya-backend/src/agent/conversation-recording-query-agent.ts +++ b/apps/freya-backend/src/agent/conversation-recording-query-agent.ts @@ -26,10 +26,13 @@ export interface ConversationStorage { appendEntry( conversationId: string, input: AppendConversationEntryInput, - ): Promise + ): Promise listEntries(conversationId: string): Promise } +/** Minimal fields needed immediately after appending a conversation entry. */ +export type ConversationStorageAppendEntryResult = Pick + /** Minimal persisted entry shape needed by recording and replay agents. */ export type ConversationStorageEntry = Pick< ConversationEntryRow, diff --git a/apps/freya-backend/src/agent/job.test.ts b/apps/freya-backend/src/agent/job.test.ts new file mode 100644 index 0000000..9e110b0 --- /dev/null +++ b/apps/freya-backend/src/agent/job.test.ts @@ -0,0 +1,377 @@ +import { AgentEventKind, type AgentEvent } from "@freya/agent-protocol" +import { + ConversationEntryKind, + ConversationEntryVisibility, + type ConversationEntry, + type ConversationEntryVisibility as ConversationEntryVisibilityType, +} from "@freya/core" +import { describe, expect, test } from "bun:test" + +import type { + AppendAttachmentEntryInput, + AppendAttachmentEntryResult, + AppendConversationEntryInput, + ConversationEntryRow, + ConversationResponseStateRow, + ConversationRow, + ConversationStorage, + CreateFileInput, + FileRow, + ListConversationEntriesParams, + UpdateConversationResponseStateInput, + UpsertConversationResponseStateInput, +} from "../conversations/storage.ts" +import type { ConversationResponseStateStatus } from "../db/schema.ts" +import type { Job } from "../lib/job.ts" +import type { UserSessionManager } from "../session/index.ts" +import type { + QueryAgent, + QueryAgentAsk, + QueryAgentEvent, + QueryAgentEventListener, + QueryAgentStreamEvent, +} from "./query-agent.ts" + +import { ConversationResponseStateStatus as ResponseStateStatus } from "../db/schema.ts" +import { + NotificationCentral, + type NotificationPayload, +} from "../notification/notification-central.ts" +import { AgentResponseJobExecutor, type AgentResponseJobPayload } from "./job.ts" + +const ConversationId = "00000000-0000-4000-8000-000000000001" +const UserId = "user-1" +const UserEntryId = "00000000-0000-4000-8000-000000000002" +const Now = new Date("2026-07-03T00:00:00.000Z") + +class FakeConversationStorage implements ConversationStorage { + readonly appended: ConversationEntry[] = [] + readonly clearedConversationIds: string[] = [] + readonly markedStatuses: Array<{ + conversationIds: string[] + status: ConversationResponseStateStatus + }> = [] + + private nextSequenceValue = 2 + + async transaction(tx: (storage: ConversationStorage) => T | Promise): Promise { + return tx(this) + } + + async createConversation(_userId: string): Promise { + throw new Error("createConversation is not implemented") + } + + async listUserConversations(_userId: string): Promise { + throw new Error("listUserConversations is not implemented") + } + + async findConversation(conversationId: string): Promise { + if (conversationId !== ConversationId) return null + return conversationRow() + } + + async getOrCreateConversation(_userId: string): Promise { + throw new Error("getOrCreateConversation is not implemented") + } + + async createFile(_userId: string, _input: CreateFileInput): Promise { + throw new Error("createFile is not implemented") + } + + async appendEntry( + conversationId: string, + input: AppendConversationEntryInput, + ): Promise { + const entry = conversationEntryFromAppendInput(conversationId, this.nextSequenceValue, input) + this.nextSequenceValue += 1 + this.appended.push(entry) + return entry + } + + async appendAttachmentEntry( + _conversationId: string, + _input: AppendAttachmentEntryInput, + ): Promise { + throw new Error("appendAttachmentEntry is not implemented") + } + + async nextSequence(_conversationId: string): Promise { + return this.nextSequenceValue + } + + async listUserConversationEntries( + _userId: string, + _conversationId: string, + _params?: ListConversationEntriesParams, + ): Promise { + throw new Error("listUserConversationEntries is not implemented") + } + + async listPendingUserConversationEntries( + _userId: string, + conversationId: string, + ): Promise { + return [pendingUserEntryRow(conversationId)] + } + + async findConversationResponseState( + _conversationId: string, + ): Promise { + throw new Error("findConversationResponseState is not implemented") + } + + async listPendingResponseStates(): Promise { + throw new Error("listPendingResponseStates is not implemented") + } + + async listRunningResponseStates(): Promise { + throw new Error("listRunningResponseStates is not implemented") + } + + async upsertConversationResponseState( + _conversationId: string, + _input: UpsertConversationResponseStateInput, + ): Promise { + throw new Error("upsertConversationResponseState is not implemented") + } + + async updateConversationResponseState( + _conversationId: string, + _input: UpdateConversationResponseStateInput, + ): Promise { + throw new Error("updateConversationResponseState is not implemented") + } + + async markResponseStateStatus( + conversationIds: string[], + status: ConversationResponseStateStatus, + ): Promise { + this.markedStatuses.push({ conversationIds, status }) + return [] + } + + async claimPendingConversationResponseState( + conversationId: string, + ): Promise { + if (conversationId !== ConversationId) return null + return conversationResponseStateRow() + } + + async clearConversationResponseState(conversationId: string): Promise { + this.clearedConversationIds.push(conversationId) + } +} + +class FakeQueryAgent implements QueryAgent { + readonly inputs: QueryAgentAsk[] = [] + private readonly events: QueryAgentStreamEvent[] + + constructor(events: QueryAgentStreamEvent[]) { + this.events = events + } + + async *ask(input: QueryAgentAsk): AsyncIterable { + this.inputs.push(input) + for (const event of this.events) { + yield event + } + } + + addEventListener( + _type: T, + _listener: QueryAgentEventListener, + ): () => void { + return () => {} + } + + dispose(): void {} +} + +describe("AgentResponseJobExecutor", () => { + test("notifies persisted conversation entries for streamed response events", async () => { + const storage = new FakeConversationStorage() + const agent = new FakeQueryAgent([ + { type: "text_delta", text: "I'll check\n" }, + { type: "tool_start", toolName: "calendar" }, + { type: "tool_end", toolName: "calendar", ok: true }, + { type: "text_delta", text: "All set" }, + { type: "done" }, + ]) + const notifications: NotificationPayload[] = [] + const notificationCentral = new NotificationCentral() + notificationCentral.registerListenerForUser(UserId, async (notification) => { + notifications.push(notification) + }) + const executor = new AgentResponseJobExecutor({ + conversationStorage: storage, + userSessionManager: fakeUserSessionManager(agent), + notificationCentral, + }) + + await executor.execute(agentResponseJob()) + + expect(agent.inputs).toHaveLength(1) + expect(agent.inputs[0]?.message).toContain("What's next?") + expect(agent.inputs[0]?.conversationId).toBe(ConversationId) + expect(agent.inputs[0]?.signal).toBeInstanceOf(AbortSignal) + expect(storage.appended.map((entry) => entry.kind)).toEqual([ + ConversationEntryKind.AssistantMessage, + ConversationEntryKind.ToolCall, + ConversationEntryKind.ToolResult, + ConversationEntryKind.AssistantMessage, + ]) + expect(notifications.map((notification) => notification.payload.kind)).toEqual([ + AgentEventKind.ConversationEntryCreated, + AgentEventKind.ConversationEntryCreated, + AgentEventKind.ConversationEntryCreated, + AgentEventKind.ConversationEntryCreated, + AgentEventKind.ResponseFinished, + ]) + expect(conversationEntryNotifications(notifications).map((event) => event.entry)).toEqual( + storage.appended, + ) + expect(storage.clearedConversationIds).toEqual([ConversationId]) + expect(storage.markedStatuses).toEqual([]) + }) +}) + +function fakeUserSessionManager(agent: QueryAgent): UserSessionManager { + return { + async getOrCreate(userId: string) { + expect(userId).toBe(UserId) + return { agent } + }, + } as unknown as UserSessionManager +} + +function agentResponseJob(): Job { + const controller = new AbortController() + return { + id: 1, + payload: { conversationId: ConversationId }, + signal: controller.signal, + } +} + +function conversationEntryNotifications( + notifications: NotificationPayload[], +): Array> { + return notifications + .map((notification) => notification.payload) + .filter(isConversationEntryCreatedEvent) +} + +function isConversationEntryCreatedEvent( + event: AgentEvent, +): event is Extract { + return event.kind === AgentEventKind.ConversationEntryCreated +} + +function conversationRow(): ConversationRow { + return { + id: ConversationId, + userId: UserId, + createdAt: Now, + updatedAt: Now, + } +} + +function conversationResponseStateRow(): ConversationResponseStateRow { + return { + conversationId: ConversationId, + status: ResponseStateStatus.Running, + pendingSinceEntryId: UserEntryId, + maxWaitUntil: Now, + runningSince: Now, + createdAt: Now, + updatedAt: Now, + } +} + +function pendingUserEntryRow(conversationId: string): ConversationEntryRow { + return { + id: UserEntryId, + conversationId, + sequence: 1, + kind: ConversationEntryKind.UserMessage, + visibility: ConversationEntryVisibility.UserVisible, + fileId: null, + payload: { + role: "user", + parts: [{ type: "text", text: "What's next?" }], + }, + metadata: {}, + createdAt: Now, + } +} + +function conversationEntryFromAppendInput( + conversationId: string, + sequence: number, + input: AppendConversationEntryInput, +): ConversationEntry { + const base = { + id: entryId(sequence), + conversationId, + sequence, + visibility: input.visibility ?? defaultVisibilityForKind(input.kind), + fileId: null, + metadata: input.metadata ?? {}, + createdAt: Now.toISOString(), + } + + switch (input.kind) { + case ConversationEntryKind.UserMessage: + return { + ...base, + kind: input.kind, + payload: input.payload, + } + case ConversationEntryKind.AssistantMessage: + return { + ...base, + kind: input.kind, + payload: input.payload, + } + case ConversationEntryKind.ToolCall: + case ConversationEntryKind.ToolResult: + case ConversationEntryKind.SystemNote: + return { + ...base, + kind: input.kind, + payload: input.payload, + } + case ConversationEntryKind.ContextSummary: + return { + ...base, + kind: input.kind, + payload: input.payload, + } + case ConversationEntryKind.Attachment: + return { + ...base, + kind: input.kind, + fileId: input.fileId, + payload: input.payload, + } + } +} + +function defaultVisibilityForKind(kind: ConversationEntryKind): ConversationEntryVisibilityType { + switch (kind) { + case ConversationEntryKind.UserMessage: + case ConversationEntryKind.AssistantMessage: + case ConversationEntryKind.Attachment: + return ConversationEntryVisibility.UserVisible + case ConversationEntryKind.ToolCall: + case ConversationEntryKind.ToolResult: + case ConversationEntryKind.ContextSummary: + case ConversationEntryKind.SystemNote: + return ConversationEntryVisibility.Internal + } +} + +function entryId(sequence: number): string { + return `00000000-0000-4000-8000-${sequence.toString().padStart(12, "0")}` +} diff --git a/apps/freya-backend/src/agent/job.ts b/apps/freya-backend/src/agent/job.ts new file mode 100644 index 0000000..44f4098 --- /dev/null +++ b/apps/freya-backend/src/agent/job.ts @@ -0,0 +1,294 @@ +import { AgentEventKind } from "@freya/agent-protocol" +import { + AssistantMessagePayload, + ConversationEntryKind, + ToolCallPayload, + ToolResultPayload, + UserMessagePayload, +} from "@freya/core" +import { type } from "arktype" + +import type { ConversationStorage } from "../conversations/storage" +import type { Job } from "../lib/job" +import type { AppLogger } from "../lib/logger" +import type { JobExecutor } from "../lib/worker" +import type { NotificationCentral } from "../notification/notification-central" +import type { UserSessionManager } from "../session" + +import { ConversationResponseStateStatus } from "../db/schema" +import { logger as rootLogger } from "../lib/logger" +import { + AgentResponseStreamEventKind, + streamAgentResponse, + type AgentResponseStreamEvent, +} from "./streaming" + +export interface AgentResponseJobPayload { + conversationId: string +} + +interface AgentResponseWorkerConfig { + conversationStorage: ConversationStorage + logger?: AppLogger + userSessionManager: UserSessionManager + notificationCentral: NotificationCentral +} + +export class AgentResponseJobExecutor implements JobExecutor { + private conversationStorage: ConversationStorage + private logger: AppLogger + private userSessionManager: UserSessionManager + private notificationCentral: NotificationCentral + + constructor({ + conversationStorage, + logger, + userSessionManager, + notificationCentral, + }: AgentResponseWorkerConfig) { + this.conversationStorage = conversationStorage + this.logger = logger ?? rootLogger.child({ component: "agent_response_job" }) + this.userSessionManager = userSessionManager + this.notificationCentral = notificationCentral + } + + async execute(job: Job): Promise { + const conversationId = job.payload.conversationId + const startedAt = Date.now() + const logger = this.logger.child({ conversationId, jobId: job.id }) + + logger.info("agent response job started") + + const conversation = await this.conversationStorage.findConversation(conversationId) + if (!conversation) { + logger.warn("conversation not found for agent response job") + return + } + + const claimed = + await this.conversationStorage.claimPendingConversationResponseState(conversationId) + if (!claimed) { + // conversation response state not found or already claimed + logger.debug("agent response state was not claimable") + return + } + + try { + const pendingEntries = await this.conversationStorage.listPendingUserConversationEntries( + conversation.userId, + conversation.id, + ) + if (pendingEntries.length === 0) { + await this.conversationStorage.clearConversationResponseState(conversationId) + logger.debug("agent response state cleared with no pending user entries") + return + } + + let invalidPendingEntryCount = 0 + const message = pendingEntries.reduce((acc, entry) => { + const payload = UserMessagePayload(entry.payload) + if (payload instanceof type.errors) { + invalidPendingEntryCount += 1 + return acc + } + return ( + acc + + "\n" + + payload.parts.reduce((msg, p) => (p.type === "text" ? msg + p.text : msg), "") + ) + }, "") + + logger.debug( + { + invalidPendingEntryCount, + messageLength: message.length, + pendingEntryCount: pendingEntries.length, + }, + "agent response pending user entries loaded", + ) + + if (invalidPendingEntryCount > 0) { + logger.warn( + { invalidPendingEntryCount }, + "invalid pending user entries skipped for agent response", + ) + } + + const session = await this.userSessionManager.getOrCreate(conversation.userId) + + for await (const event of streamAgentResponse({ + agent: session.agent, + input: { + message, + conversationId: conversation.id, + signal: job.signal, + }, + })) { + if (job.signal.aborted) { + break + } + + await this.handleStreamEvent(event, conversation.id, conversation.userId, logger) + } + + // if job is aborted, stop everything immediately, including clean up. + // the aborter is assumed responsibility on how to proceed. + if (!job.signal.aborted) { + await this.conversationStorage.clearConversationResponseState(conversationId) + logger.info({ durationMs: Date.now() - startedAt }, "agent response job completed") + } else { + logger.warn({ durationMs: Date.now() - startedAt }, "agent response job aborted") + } + } catch (err) { + const durationMs = Date.now() - startedAt + if (job.signal.aborted) { + logger.warn({ durationMs, err }, "agent response job aborted") + return + } + + logger.error({ durationMs, err }, "agent response job failed") + try { + await this.conversationStorage.markResponseStateStatus( + [conversationId], + ConversationResponseStateStatus.Failed, + ) + logger.warn( + { status: ConversationResponseStateStatus.Failed }, + "conversation response state marked failed", + ) + } catch (markErr) { + logger.error({ err: markErr }, "failed to mark conversation response state failed") + } + } + } + + private async handleStreamEvent( + event: AgentResponseStreamEvent, + conversationId: string, + userId: string, + logger: AppLogger, + ): Promise { + switch (event.kind) { + case AgentResponseStreamEventKind.ConversationStarted: + await this.notificationCentral.notifyUser(userId, { + kind: "agent", + payload: { + kind: AgentEventKind.ConversationStarted, + conversationId: event.conversationId, + }, + }) + logger.debug( + { + eventKind: event.kind, + upstreamConversationId: event.conversationId, + }, + "agent conversation started", + ) + break + + case AgentResponseStreamEventKind.AssistantMessage: { + const entry = await this.conversationStorage.appendEntry(conversationId, { + kind: ConversationEntryKind.AssistantMessage, + payload: { + role: "assistant", + parts: [{ type: "text", text: event.text }], + } satisfies AssistantMessagePayload, + }) + await this.notificationCentral.notifyUser(userId, { + kind: "agent", + payload: { + kind: AgentEventKind.ConversationEntryCreated, + entry, + }, + }) + logger.debug( + { + entryId: entry.id, + eventKind: event.kind, + textLength: event.text.length, + }, + "agent assistant message persisted", + ) + break + } + + case AgentResponseStreamEventKind.ToolStarted: { + const entry = await this.conversationStorage.appendEntry(conversationId, { + kind: ConversationEntryKind.ToolCall, + payload: { + toolName: event.toolName, + } satisfies ToolCallPayload, + }) + await this.notificationCentral.notifyUser(userId, { + kind: "agent", + payload: { + kind: AgentEventKind.ConversationEntryCreated, + entry, + }, + }) + logger.debug( + { + entryId: entry.id, + eventKind: event.kind, + toolName: event.toolName, + }, + "agent tool call persisted", + ) + break + } + + case AgentResponseStreamEventKind.ToolFinished: { + const entry = await this.conversationStorage.appendEntry(conversationId, { + kind: ConversationEntryKind.ToolResult, + payload: { + toolName: event.toolName, + ok: event.ok, + } satisfies ToolResultPayload, + }) + await this.notificationCentral.notifyUser(userId, { + kind: "agent", + payload: { + kind: AgentEventKind.ConversationEntryCreated, + entry, + }, + }) + logger.debug( + { + entryId: entry.id, + eventKind: event.kind, + ok: event.ok, + toolName: event.toolName, + }, + "agent tool result persisted", + ) + break + } + + case AgentResponseStreamEventKind.ResponseFinished: + await this.notificationCentral.notifyUser(userId, { + kind: "agent", + payload: { + kind: AgentEventKind.ResponseFinished, + conversationId, + }, + }) + logger.info({ eventKind: event.kind }, "agent response finished") + break + + case AgentResponseStreamEventKind.ResponseFailed: + await this.notificationCentral.notifyUser(userId, { + kind: "agent", + payload: { + kind: AgentEventKind.ResponseFailed, + conversationId, + error: event.error, + }, + }) + logger.warn( + { agentError: event.error, eventKind: event.kind }, + "agent response failed event received", + ) + break + } + } +} diff --git a/apps/freya-backend/src/agent/pi-query-agent.ts b/apps/freya-backend/src/agent/pi-query-agent.ts index 8c285ca..50abe64 100644 --- a/apps/freya-backend/src/agent/pi-query-agent.ts +++ b/apps/freya-backend/src/agent/pi-query-agent.ts @@ -166,6 +166,16 @@ export class PiQueryAgent implements QueryAgent { this.handlePiEvent(event, pushRunEvent) }) + input.signal?.addEventListener( + "abort", + async () => { + await session.abort() + close() + unsubscribe() + }, + { once: true }, + ) + session .prompt(input.message) .then(() => { diff --git a/apps/freya-backend/src/agent/query-agent.ts b/apps/freya-backend/src/agent/query-agent.ts index 54bd132..14589a9 100644 --- a/apps/freya-backend/src/agent/query-agent.ts +++ b/apps/freya-backend/src/agent/query-agent.ts @@ -2,6 +2,7 @@ export interface QueryAgentAsk { message: string conversationId?: string userMessageEntry?: QueryAgentConversationEntryRef + signal?: AbortSignal } export type QueryAgentStreamEvent = diff --git a/apps/freya-backend/src/agent/reconciler.ts b/apps/freya-backend/src/agent/reconciler.ts new file mode 100644 index 0000000..9f13783 --- /dev/null +++ b/apps/freya-backend/src/agent/reconciler.ts @@ -0,0 +1,100 @@ +import type { ConversationStorage } from "../conversations/storage" +import type { AppLogger } from "../lib/logger" +import type { AgentWorkScheduler } from "./scheduler" + +import { logger as rootLogger } from "../lib/logger" + +interface AgentResponseReconcilerConfig { + storage: ConversationStorage + interval: number + logger?: AppLogger + scheduler: AgentWorkScheduler + signal: AbortSignal +} + +export class AgentResponseReconciler { + private storage: ConversationStorage + private interval: number + private logger: AppLogger + private scheduler: AgentWorkScheduler + private signal: AbortSignal + + private stopLoop: ReturnType | null = null + + constructor({ storage, interval, logger, scheduler, signal }: AgentResponseReconcilerConfig) { + this.storage = storage + this.interval = interval + this.logger = logger ?? rootLogger.child({ component: "agent_reconciler" }) + this.scheduler = scheduler + this.signal = signal + } + + start() { + this.signal.throwIfAborted() + this.logger.debug({ intervalMs: this.interval }, "agent response reconciler started") + + this.signal.addEventListener( + "abort", + () => { + if (this.stopLoop !== null) { + clearInterval(this.stopLoop) + this.stopLoop = null + this.logger.debug("agent response reconciler stopped") + } + }, + { once: true }, + ) + + this.stopLoop = setInterval(() => { + void this.reconcile() + }, this.interval) + } + + private async reconcile() { + try { + // enqueue pending responses + const pendingStates = await this.storage.listPendingResponseStates() + const now = new Date().getTime() + this.logger.debug({ pendingCount: pendingStates.length }, "agent response reconcile tick") + for (const state of pendingStates) { + if (state.maxWaitUntil.getTime() < now) { + this.logger.info( + { + conversationId: state.conversationId, + maxWaitUntil: state.maxWaitUntil.toISOString(), + }, + "pending agent response reached max wait", + ) + this.scheduler.enqueueAgentResponse(state.conversationId) + } + } + + // re-enqueue stuck responses + const runningStates = await this.storage.listRunningResponseStates() + const stuckIds: string[] = [] + for (const state of runningStates) { + if (state.runningSince && Math.max(now - state.runningSince.getTime(), 0) > 5 * 1000 * 60) { + // if the response is running for more than 5 minutes + // we assume that its stuck and enqueue it for retry + stuckIds.push(state.conversationId) + } + } + this.logger.debug( + { runningCount: runningStates.length, stuckCount: stuckIds.length }, + "agent response running states reconciled", + ) + if (stuckIds.length > 0) { + await this.storage.markResponseStateStatus(stuckIds, "pending") + this.logger.warn( + { conversationIds: stuckIds, count: stuckIds.length }, + "stuck agent responses requeued", + ) + for (const id of stuckIds) { + this.scheduler.enqueueAgentResponse(id) + } + } + } catch (err) { + this.logger.error({ err }, "agent response reconciliation failed") + } + } +} diff --git a/apps/freya-backend/src/agent/scheduler.ts b/apps/freya-backend/src/agent/scheduler.ts new file mode 100644 index 0000000..00ef424 --- /dev/null +++ b/apps/freya-backend/src/agent/scheduler.ts @@ -0,0 +1,176 @@ +import type { UserEvent } from "@freya/agent-protocol" + +import type { ConversationStorage } from "../conversations/storage" +import type { Job, JobRegistry } from "../lib/job" +import type { AppLogger } from "../lib/logger" +import type { AgentResponseJobPayload } from "./job" + +import { ConversationNotFoundError } from "../conversations/errors" +import { ConversationResponseStateStatus } from "../db/schema" +import { logger as rootLogger } from "../lib/logger" + +const AgentJobCancellationReason = { + NewUserActivity: "new_user_activity", + SupersededByEnqueue: "superseded_by_enqueue", +} as const +type AgentJobCancellationReason = + (typeof AgentJobCancellationReason)[keyof typeof AgentJobCancellationReason] + +interface AgentMessageSchedulerConfig { + storage: ConversationStorage + maxWaitTime: number + + /** + * How long to wait before responding to the user. + */ + waitTime: number + + jobRegistry: JobRegistry + logger?: AppLogger +} + +/** + * Schedules and manages the flow of messages between the user and the query agent for a specific conversation. + */ +export class AgentWorkScheduler { + private conversationStorage: ConversationStorage + private jobRegistry: JobRegistry + private logger: AppLogger + + private timing: { + maxWaitTime: number + waitTime: number + } + + private timers = new Map>() + private runningJobs = new Map>() + + constructor(config: AgentMessageSchedulerConfig) { + this.conversationStorage = config.storage + this.jobRegistry = config.jobRegistry + this.logger = config.logger ?? rootLogger.child({ component: "agent_scheduler" }) + this.timing = { + maxWaitTime: config.maxWaitTime, + waitTime: config.waitTime, + } + + this.jobRegistry.addEventListener("settled", this.eraseJob.bind(this)) + this.jobRegistry.addEventListener("cancelled", this.eraseJob.bind(this)) + } + + async scheduleAgentResponse(conversationId: string) { + const existing = this.timers.get(conversationId) + if (existing) { + clearTimeout(existing) + this.logger.debug({ conversationId }, "existing agent response timer replaced") + } + + this.cancelCurrentJob(conversationId, AgentJobCancellationReason.NewUserActivity) + + this.timers.set( + conversationId, + setTimeout(() => { + this.enqueueAgentResponse(conversationId) + }, this.timing.waitTime), + ) + + this.logger.info( + { + conversationId, + maxWaitMs: this.timing.maxWaitTime, + waitMs: this.timing.waitTime, + }, + "agent response scheduled", + ) + } + + async receiveUserEvent(conversationId: string, event: UserEvent) { + if (event.type === "typing") { + this.logger.debug({ conversationId, eventType: event.type }, "user event received") + await this.delayAgentResponse(conversationId) + this.logger.debug({ conversationId }, "agent response delay handled") + } + } + + enqueueAgentResponse(conversationId: string): void { + const existing = this.timers.get(conversationId) + if (existing) { + clearTimeout(existing) + this.timers.delete(conversationId) + this.logger.debug({ conversationId }, "agent response timer consumed") + } + + this.cancelCurrentJob(conversationId, AgentJobCancellationReason.SupersededByEnqueue) + + const job = this.jobRegistry.addJob({ + payload: { conversationId }, + }) + this.runningJobs.set(conversationId, job) + this.logger.info({ conversationId, jobId: job.id }, "agent response job enqueued") + } + + private async delayAgentResponse(conversationId: string) { + this.cancelCurrentJob(conversationId, AgentJobCancellationReason.NewUserActivity) + + try { + const ok = await this.conversationStorage.transaction(async (storage) => { + const state = await storage.findConversationResponseState(conversationId) + if (state && state.status !== ConversationResponseStateStatus.Failed) { + await storage.updateConversationResponseState(conversationId, { + status: ConversationResponseStateStatus.Pending, + // the agent response was cancelled, so its no longer running + // clear runningSince timestamp + runningSince: null, + }) + return true + } + return false + }) + if (ok) { + await this.scheduleAgentResponse(conversationId) + } else { + this.logger.debug( + { conversationId }, + "agent response delay skipped because response state is not pending", + ) + } + } catch (error) { + if (error instanceof ConversationNotFoundError) { + // the user is typing but there isn't a scheduled agent response yet + // which means the user is typing their first message after the agent has previously responded + // swallow the error + this.logger.debug({ conversationId }, "typing event received without active conversation") + } else { + this.logger.error({ err: error, conversationId }, "agent response delay failed") + } + return + } + } + + /** + * cancels the current job for agent response for the given conversation id + * no-op if there is no active job for the conversation. + */ + private cancelCurrentJob(conversationId: string, reason: AgentJobCancellationReason): void { + const job = this.runningJobs.get(conversationId) + if (!job) return + + // If an active response is working on stale context, abort it so the next + // job can answer using the latest pending user messages. + this.jobRegistry.cancelJob(job) + this.logger.info( + { conversationId, jobId: job.id, reason }, + "active agent response job cancelled", + ) + } + + private eraseJob(job: Job) { + if (this.runningJobs.get(job.payload.conversationId) === job) { + this.runningJobs.delete(job.payload.conversationId) + this.logger.debug( + { conversationId: job.payload.conversationId, jobId: job.id }, + "agent response job tracking cleared", + ) + } + } +} diff --git a/apps/freya-backend/src/agent/service.ts b/apps/freya-backend/src/agent/service.ts new file mode 100644 index 0000000..6ad2b7c --- /dev/null +++ b/apps/freya-backend/src/agent/service.ts @@ -0,0 +1,152 @@ +import type { UserEvent } from "@freya/agent-protocol" + +import { ConversationEntry, ConversationEntryKind, UserMessagePayload } from "@freya/core" + +import type { ConversationStorage } from "../conversations/storage" +import type { AppLogger } from "../lib/logger" +import type { NotificationCentral } from "../notification/notification-central" +import type { UserSessionManager } from "../session" + +import { JobRegistry } from "../lib/job" +import { logger as rootLogger } from "../lib/logger" +import { Worker } from "../lib/worker" +import { AgentResponseJobExecutor, type AgentResponseJobPayload } from "./job" +import { AgentResponseReconciler } from "./reconciler" +import { AgentWorkScheduler } from "./scheduler" + +const AgentResponseTiming = { + waitTime: 5 * 1000, + maxWaitTime: 5 * 1000 * 60, +} as const + +interface AgentServiceConfig { + storage: ConversationStorage + userSessionManager: UserSessionManager + notificationCentral: NotificationCentral + signal: AbortSignal + logger?: AppLogger +} + +export class AgentService { + private readonly storage: ConversationStorage + private readonly logger: AppLogger + private readonly scheduler: AgentWorkScheduler + private readonly reconciler: AgentResponseReconciler + private readonly worker: Worker + + private readonly jobRegistry = new JobRegistry() + + constructor({ + storage, + userSessionManager, + notificationCentral, + signal, + logger, + }: AgentServiceConfig) { + const baseLogger = logger ?? rootLogger + + this.storage = storage + this.logger = baseLogger.child({ component: "agent_service" }) + this.scheduler = new AgentWorkScheduler({ + storage, + jobRegistry: this.jobRegistry, + waitTime: AgentResponseTiming.waitTime, + maxWaitTime: AgentResponseTiming.maxWaitTime, + logger: baseLogger.child({ component: "agent_scheduler" }), + }) + this.reconciler = new AgentResponseReconciler({ + signal, + storage: this.storage, + interval: 60 * 1000, + scheduler: this.scheduler, + logger: baseLogger.child({ component: "agent_reconciler" }), + }) + this.worker = new Worker({ + signal, + concurrency: 10, + registry: this.jobRegistry, + logger: baseLogger.child({ component: "agent_worker" }), + jobFields: agentResponseJobFields, + runner: new AgentResponseJobExecutor({ + conversationStorage: storage, + notificationCentral, + userSessionManager, + logger: baseLogger.child({ component: "agent_response_job" }), + }), + }) + } + + start() { + this.logger.info("agent service starting") + this.worker.start() + this.reconciler.start() + } + + async scheduleAgentResponse(conversationId: string, message: string): Promise { + this.logger.info( + { conversationId, messageLength: message.length }, + "scheduling agent response from user message", + ) + + try { + const scheduledResponse = await this.storage.transaction(async (storage) => { + const now = new Date() + const maxWaitUntil = new Date(now.getTime() + AgentResponseTiming.maxWaitTime) + + const entry = await storage.appendEntry(conversationId, { + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: message }], + } satisfies UserMessagePayload, + }) + + await storage.upsertConversationResponseState(conversationId, { + maxWaitUntil, + pendingSinceEntryId: entry.id, + status: "pending", + }) + + return { entry, maxWaitUntil } + }) + + this.logger.info( + { + conversationId, + entryId: scheduledResponse.entry.id, + maxWaitMs: AgentResponseTiming.maxWaitTime, + maxWaitUntil: scheduledResponse.maxWaitUntil.toISOString(), + waitMs: AgentResponseTiming.waitTime, + }, + "agent response state set pending", + ) + + await this.scheduler.scheduleAgentResponse(conversationId) + + return scheduledResponse.entry + } catch (err) { + this.logger.error({ err, conversationId }, "failed to schedule agent response") + throw err + } + } + + async handleUserEvent(conversationId: string, event: UserEvent) { + try { + await this.scheduler.receiveUserEvent(conversationId, event) + } catch (err) { + this.logger.error( + { err, conversationId, eventType: event.type }, + "failed to handle user event", + ) + throw err + } + } +} + +function agentResponseJobFields(job: { + payload: AgentResponseJobPayload +}): Record { + return { + conversationId: job.payload.conversationId, + } +} diff --git a/apps/freya-backend/src/agent/streaming.test.ts b/apps/freya-backend/src/agent/streaming.test.ts index 3e4f2d3..b516786 100644 --- a/apps/freya-backend/src/agent/streaming.test.ts +++ b/apps/freya-backend/src/agent/streaming.test.ts @@ -1,5 +1,3 @@ -import type { AgentEvent } from "@freya/agent-protocol" - import { describe, expect, test } from "bun:test" import type { @@ -9,9 +7,12 @@ import type { QueryAgentEventListener, QueryAgentStreamEvent, } from "./query-agent.ts" -import type { AgentResponseStreamItem } from "./streaming.ts" -import { streamAgentResponse } from "./streaming.ts" +import { + AgentResponseStreamEventKind, + streamAgentResponse, + type AgentResponseStreamEvent, +} from "./streaming.ts" class FakeQueryAgent implements QueryAgent { readonly inputs: QueryAgentAsk[] = [] @@ -47,23 +48,22 @@ describe("streamAgentResponse", () => { { type: "done" }, ]) - const { events, result } = await collectStreamAgentResponse( + const events = await collectStreamAgentResponse( streamAgentResponse({ agent, input: { message: "hello" }, }), ) - expect(result).toEqual({ - conversationId: "conversation-1", - message: "First message\nSecond message\nThird message", - }) expect(events).toEqual([ - { type: "conversation_started", conversationId: "conversation-1" }, - { type: "message_created", text: "First message" }, - { type: "message_created", text: "Second message" }, - { type: "message_created", text: "Third message" }, - { type: "message_finished" }, + { + kind: AgentResponseStreamEventKind.ConversationStarted, + conversationId: "conversation-1", + }, + { kind: AgentResponseStreamEventKind.AssistantMessage, text: "First message" }, + { kind: AgentResponseStreamEventKind.AssistantMessage, text: "Second message" }, + { kind: AgentResponseStreamEventKind.AssistantMessage, text: "Third message" }, + { kind: AgentResponseStreamEventKind.ResponseFinished }, ]) }) @@ -74,22 +74,21 @@ describe("streamAgentResponse", () => { { type: "done" }, ]) - const { events, result } = await collectStreamAgentResponse( + const events = await collectStreamAgentResponse( streamAgentResponse({ agent, input: { message: "hello" }, }), ) - expect(result).toEqual({ - conversationId: "conversation-1", - message: " const value = 1 \n\n return value", - }) expect(events).toEqual([ - { type: "conversation_started", conversationId: "conversation-1" }, - { type: "message_created", text: " const value = 1 " }, - { type: "message_created", text: " return value" }, - { type: "message_finished" }, + { + kind: AgentResponseStreamEventKind.ConversationStarted, + conversationId: "conversation-1", + }, + { kind: AgentResponseStreamEventKind.AssistantMessage, text: " const value = 1 " }, + { kind: AgentResponseStreamEventKind.AssistantMessage, text: " return value" }, + { kind: AgentResponseStreamEventKind.ResponseFinished }, ]) }) @@ -106,44 +105,38 @@ describe("streamAgentResponse", () => { agent, input: { message: "hello" }, }) - const events: AgentEvent[] = [] + const events: AgentResponseStreamEvent[] = [] await expect(collectStreamAgentResponse(stream, events)).rejects.toThrow("model unavailable") expect(events).toEqual([ - { type: "conversation_started", conversationId: "conversation-1" }, - { type: "message_created", text: "I'll check" }, - { type: "tool_started", toolName: "calendar" }, - { type: "tool_finished", toolName: "calendar", ok: false }, - { type: "message_created", text: "That failed" }, - { type: "message_failed", error: "model unavailable" }, + { + kind: AgentResponseStreamEventKind.ConversationStarted, + conversationId: "conversation-1", + }, + { kind: AgentResponseStreamEventKind.AssistantMessage, text: "I'll check" }, + { kind: AgentResponseStreamEventKind.ToolStarted, toolName: "calendar" }, + { + kind: AgentResponseStreamEventKind.ToolFinished, + toolName: "calendar", + ok: false, + }, + { kind: AgentResponseStreamEventKind.AssistantMessage, text: "That failed" }, + { + kind: AgentResponseStreamEventKind.ResponseFailed, + error: "model unavailable", + }, ]) }) }) async function collectStreamAgentResponse( - stream: AsyncIterable, - events: AgentEvent[] = [], -): Promise<{ - events: AgentEvent[] - result: { message: string; conversationId: string } -}> { - let result: { message: string; conversationId: string } | null = null - - for await (const item of stream) { - switch (item.type) { - case "event": - events.push(item.event) - break - case "result": - result = item.result - break - } + stream: AsyncIterable, + events: AgentResponseStreamEvent[] = [], +): Promise { + for await (const event of stream) { + events.push(event) } - if (!result) { - throw new Error("Expected stream result") - } - - return { events, result } + return events } diff --git a/apps/freya-backend/src/agent/streaming.ts b/apps/freya-backend/src/agent/streaming.ts index f80abda..8a2ccc3 100644 --- a/apps/freya-backend/src/agent/streaming.ts +++ b/apps/freya-backend/src/agent/streaming.ts @@ -1,10 +1,42 @@ -import type { AgentEvent, SendMessageResult } from "@freya/agent-protocol" - import type { QueryAgent, QueryAgentAsk } from "./query-agent.ts" -export type AgentResponseStreamItem = - | { type: "event"; event: AgentEvent } - | { type: "result"; result: SendMessageResult } +export const AgentResponseStreamEventKind = { + ConversationStarted: "conversation_started", + AssistantMessage: "assistant_message", + ToolStarted: "tool_started", + ToolFinished: "tool_finished", + ResponseFinished: "response_finished", + ResponseFailed: "response_failed", +} as const + +export type AgentResponseStreamEventKind = + (typeof AgentResponseStreamEventKind)[keyof typeof AgentResponseStreamEventKind] + +export type AgentResponseStreamEvent = + | { + kind: typeof AgentResponseStreamEventKind.ConversationStarted + conversationId: string + } + | { + kind: typeof AgentResponseStreamEventKind.AssistantMessage + text: string + } + | { + kind: typeof AgentResponseStreamEventKind.ToolStarted + toolName: string + } + | { + kind: typeof AgentResponseStreamEventKind.ToolFinished + toolName: string + ok: boolean + } + | { + kind: typeof AgentResponseStreamEventKind.ResponseFinished + } + | { + kind: typeof AgentResponseStreamEventKind.ResponseFailed + error: string + } export async function* streamAgentResponse({ agent, @@ -12,18 +44,16 @@ export async function* streamAgentResponse({ }: { agent: QueryAgent input: QueryAgentAsk -}): AsyncGenerator { - let message = "" - let conversationId: string | null = null +}): AsyncGenerator { const splitter = new AgentMessageSplitter() - function messageEvent(text: string): AgentResponseStreamItem | null { + function messageEvent(text: string): AgentResponseStreamEvent | null { if (text.trim() === "") return null - return { type: "event", event: { type: "message_created", text } } + return { kind: AgentResponseStreamEventKind.AssistantMessage, text } } - function flushPendingMessage(): AgentResponseStreamItem | null { + function flushPendingMessage(): AgentResponseStreamEvent | null { const text = splitter.flush() if (text === null) return null @@ -31,14 +61,19 @@ export async function* streamAgentResponse({ } for await (const event of agent.ask(input)) { + if (input.signal?.aborted) { + break + } + switch (event.type) { case "conversation": - conversationId = event.conversationId - yield { type: "event", event: { type: "conversation_started", conversationId } } + yield { + kind: AgentResponseStreamEventKind.ConversationStarted, + conversationId: event.conversationId, + } break case "text_delta": - message += event.text for (const line of splitter.push(event.text)) { const item = messageEvent(line) if (item) yield item @@ -50,7 +85,10 @@ export async function* streamAgentResponse({ const item = flushPendingMessage() if (item) yield item } - yield { type: "event", event: { type: "tool_started", toolName: event.toolName } } + yield { + kind: AgentResponseStreamEventKind.ToolStarted, + toolName: event.toolName, + } break case "tool_end": @@ -59,12 +97,9 @@ export async function* streamAgentResponse({ if (item) yield item } yield { - type: "event", - event: { - type: "tool_finished", - toolName: event.toolName, - ok: event.ok, - }, + kind: AgentResponseStreamEventKind.ToolFinished, + toolName: event.toolName, + ok: event.ok, } break @@ -73,7 +108,10 @@ export async function* streamAgentResponse({ const item = flushPendingMessage() if (item) yield item } - yield { type: "event", event: { type: "message_failed", error: event.message } } + yield { + kind: AgentResponseStreamEventKind.ResponseFailed, + error: event.message, + } throw new Error(event.message) case "done": @@ -81,26 +119,15 @@ export async function* streamAgentResponse({ const item = flushPendingMessage() if (item) yield item } - const result = createResult(message, conversationId) - yield { type: "event", event: { type: "message_finished" } } - yield { type: "result", result } + yield { kind: AgentResponseStreamEventKind.ResponseFinished } return } } const item = flushPendingMessage() if (item) yield item - const result = createResult(message, conversationId) - yield { type: "event", event: { type: "message_finished" } } - yield { type: "result", result } -} -function createResult(message: string, conversationId: string | null): SendMessageResult { - if (!conversationId) { - throw new Error("Agent response stream ended without a conversation id") - } - - return { message, conversationId } + yield { kind: AgentResponseStreamEventKind.ResponseFinished } } class AgentMessageSplitter { diff --git a/apps/freya-backend/src/agent/ws.test.ts b/apps/freya-backend/src/agent/ws.test.ts index 74ceb8a..f13867f 100644 --- a/apps/freya-backend/src/agent/ws.test.ts +++ b/apps/freya-backend/src/agent/ws.test.ts @@ -1,8 +1,10 @@ import { describe, expect, test } from "bun:test" import { Hono } from "hono" -import type { UserSessionManager } from "../session/index.ts" +import type { ConversationStorage } from "../conversations/storage.ts" +import type { NotificationCentral } from "../notification/notification-central.ts" +import type { AgentService } from "./service.ts" import { registerAgentWebSocketHandlers } from "./ws.ts" describe("agent websocket handler", () => { @@ -11,7 +13,9 @@ describe("agent websocket handler", () => { const app = new Hono() registerAgentWebSocketHandlers(app, { - sessionManager: {} as UserSessionManager, + agentService: {} as AgentService, + storage: {} as ConversationStorage, + notificationCentral: {} as NotificationCentral, corsMiddleware: async (c, next) => { const origin = c.req.header("origin") if (origin && origin !== "https://app.freya.test") { @@ -44,7 +48,9 @@ describe("agent websocket handler", () => { const app = new Hono() registerAgentWebSocketHandlers(app, { - sessionManager: {} as UserSessionManager, + agentService: {} as AgentService, + storage: {} as ConversationStorage, + notificationCentral: {} as NotificationCentral, corsMiddleware: async (_c, next) => { await next() }, diff --git a/apps/freya-backend/src/agent/ws.ts b/apps/freya-backend/src/agent/ws.ts index 9d66fb0..290f578 100644 --- a/apps/freya-backend/src/agent/ws.ts +++ b/apps/freya-backend/src/agent/ws.ts @@ -1,60 +1,74 @@ -import type { AgentClientApi, AgentServerApi, SendMessageResult } from "@freya/agent-protocol" +import type { AgentClientApi, AgentServerApi, UserEvent } from "@freya/agent-protocol" +import type { ConversationEntry } from "@freya/core" import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc" import type { Hono, MiddlewareHandler } from "hono" import type { WSContext } from "hono/ws" -import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc" -import { type } from "arktype" +import { JsonRpcClient, JsonRpcServer, deserializeJrpcMessage } from "@nym.sh/jrpc" import { upgradeWebSocket, websocket } from "hono/bun" import type { AuthSessionMiddleware } from "../auth/session-middleware.ts" -import type { UserSessionManager } from "../session/index.ts" +import type { ConversationStorage } from "../conversations/storage.ts" +import type { AppLogger } from "../lib/logger.ts" +import type { + NotificationCentral, + NotificationPayload, +} from "../notification/notification-central.ts" +import type { AgentService } from "./service.ts" -import { streamAgentResponse } from "./streaming.ts" +import { logger } from "../lib/logger.ts" + +const agentWebSocketLogger = logger.child({ component: "agent_ws" }) interface AgentWebSocketHandlerDeps { - sessionManager: UserSessionManager + agentService: AgentService + storage: ConversationStorage + notificationCentral: NotificationCentral authSessionMiddleware: AuthSessionMiddleware corsMiddleware: MiddlewareHandler } -interface ValidSendMessageInput { - message: string -} - export const agentWebSocket = websocket -const SendMessageInputBody = type({ - "+": "reject", - message: "string", -}) - export function registerAgentWebSocketHandlers( app: Hono, - { sessionManager, authSessionMiddleware, corsMiddleware }: AgentWebSocketHandlerDeps, + { + agentService, + storage, + notificationCentral, + authSessionMiddleware, + corsMiddleware, + }: AgentWebSocketHandlerDeps, ): void { app.get( "/api/agent/ws", corsMiddleware, authSessionMiddleware, - upgradeWebSocket((c) => { + upgradeWebSocket(async (c) => { const user = c.get("user") if (!user) { throw new Error("Authenticated WebSocket user missing") } + const conversation = await storage.getOrCreateConversation(user.id) + const channel = new HonoWebSocketJrpcChannel() const connection = new AgentRpcConnection({ channel, - sessionManager, + notificationCentral, + agentService, userId: user.id, + conversationId: conversation.id, }) return { onOpen(_event, ws) { channel.attach(ws) void connection.start().catch((err: unknown) => { - console.error("[query] Agent WebSocket JSON-RPC failed:", errorMessage(err)) + agentWebSocketLogger.error( + { err, conversationId: conversation.id }, + "agent websocket JSON-RPC failed", + ) ws.close(1011, "Agent RPC connection failed") }) }, @@ -64,6 +78,7 @@ export function registerAgentWebSocketHandlers( }, onClose() { + connection.close() channel.close() }, } @@ -74,54 +89,58 @@ export function registerAgentWebSocketHandlers( class AgentRpcConnection implements AgentServerApi { private readonly client: JsonRpcClient private readonly server: JsonRpcServer - private activeMessage: Promise | null = null - private readonly sessionManager: UserSessionManager + private readonly agentService: AgentService + private readonly logger: AppLogger + private readonly notificationCentral: NotificationCentral private readonly userId: string + private readonly conversationId: string + + private cleanup: (() => void) | null = null constructor({ + agentService, + notificationCentral, channel, - sessionManager, userId, + conversationId, }: { + agentService: AgentService + notificationCentral: NotificationCentral channel: JrpcChannel - sessionManager: UserSessionManager userId: string + conversationId: string }) { - this.sessionManager = sessionManager - this.userId = userId this.client = new JsonRpcClient(channel) + this.agentService = agentService + this.logger = agentWebSocketLogger.child({ conversationId }) + this.notificationCentral = notificationCentral + this.userId = userId + this.conversationId = conversationId this.server = new JsonRpcServer( { sendMessage: this.sendMessage.bind(this), + notify: this.notify.bind(this), ping: this.ping.bind(this), }, channel, ) } - start(): Promise { - return this.server.start() + notify(event: UserEvent): void { + void this.agentService.handleUserEvent(this.conversationId, event).catch((err: unknown) => { + this.logger.error({ err, eventType: event.type }, "failed to handle agent user event") + }) } - async sendMessage(message: string): Promise { - const parsed = SendMessageInputBody({ message }) - if (parsed instanceof type.errors) { - throw new Error(parsed.summary) - } - - if (this.activeMessage) { - throw new Error("A message is already running") - } - - const run = this.runMessage(parsed) - this.activeMessage = run - + async sendMessage(message: string): Promise { try { - return await run - } finally { - if (this.activeMessage === run) { - this.activeMessage = null - } + return await this.agentService.scheduleAgentResponse(this.conversationId, message) + } catch (error) { + this.logger.error( + { err: error, messageLength: message.length }, + "agent rpc schedule agent response failed", + ) + throw error } } @@ -129,26 +148,22 @@ class AgentRpcConnection implements AgentServerApi { return "pong" } - private async runMessage(input: ValidSendMessageInput): Promise { - const session = await this.sessionManager.getOrCreate(this.userId) - let result: SendMessageResult | null = null + async start() { + this.cleanup = this.notificationCentral.registerListenerForUser( + this.userId, + this.onNotificationReceived.bind(this), + ) + await this.server.start() + } - for await (const item of streamAgentResponse({ agent: session.agent, input })) { - switch (item.type) { - case "event": - await this.client.call("notify", item.event) - break - case "result": - result = item.result - break - } + close() { + this.cleanup?.() + } + + private async onNotificationReceived(notification: NotificationPayload) { + if (notification.kind === "agent") { + await this.client.call("notify", notification.payload) } - - if (!result) { - throw new Error("Agent response stream ended without a result") - } - - return result } } @@ -171,7 +186,11 @@ class HonoWebSocketJrpcChannel implements JrpcChannel { } receive(message: unknown): void { - const parsed = parseJrpcMessage(message) + if (typeof message !== "string") { + return + } + + const parsed = deserializeJrpcMessage(message) if (!parsed) { this.ws?.close(1003, "Invalid JSON-RPC message") return @@ -235,53 +254,3 @@ class HonoWebSocketJrpcChannel implements JrpcChannel { this.queue.push(msg) } } - -function parseJrpcMessage(message: unknown): JrpcMessage | null { - const text = webSocketMessageText(message) - if (text === null) return null - - try { - const value: unknown = JSON.parse(text) - return isJrpcMessage(value) ? value : null - } catch { - return null - } -} - -function webSocketMessageText(message: unknown): string | null { - if (typeof message === "string") return message - if (message instanceof ArrayBuffer) return Buffer.from(message).toString("utf8") - if (ArrayBuffer.isView(message)) { - return Buffer.from(message.buffer, message.byteOffset, message.byteLength).toString("utf8") - } - - return null -} - -function isJrpcMessage(value: unknown): value is JrpcMessage { - if (typeof value !== "object" || value === null) return false - if (!("jsonrpc" in value) || value.jsonrpc !== "2.0") return false - - if ("method" in value) { - return "id" in value && typeof value.id === "number" && typeof value.method === "string" - } - - if ("result" in value) { - return "id" in value && typeof value.id === "number" - } - - if ("error" in value) { - return ( - "id" in value && - typeof value.id === "number" && - typeof value.error === "object" && - value.error !== null - ) - } - - return false -} - -function errorMessage(error: unknown): string { - return error instanceof Error ? error.message : String(error) -} diff --git a/apps/freya-backend/src/conversations/db-storage.ts b/apps/freya-backend/src/conversations/db-storage.ts new file mode 100644 index 0000000..c599a20 --- /dev/null +++ b/apps/freya-backend/src/conversations/db-storage.ts @@ -0,0 +1,691 @@ +import { + AssistantMessagePayload, + AttachmentPayload, + ConversationEntryKind, + ConversationEntryMetadata, + ConversationEntryVisibility, + ContextSummaryPayload, + GenericObjectPayload, + UserMessagePayload, + type ConversationEntry, + type ConversationEntryPayload, +} from "@freya/core" +import { type } from "arktype" +import { and, asc, desc, eq, gte, inArray } from "drizzle-orm" +import { alias } from "drizzle-orm/pg-core" + +import type { Database } from "../db/index.ts" +import type { + AppendAttachmentEntryInput, + AppendAttachmentEntryResult, + AppendConversationEntryInput, + ConversationEntryRow, + ConversationResponseStateRow, + ConversationRow, + ConversationStorage, + CreateFileInput, + FileRow, + ListConversationEntriesParams, + UpdateConversationResponseStateInput, + UpsertConversationResponseStateInput, +} from "./storage.ts" + +import { + conversationEntries, + ConversationResponseStateStatus, + conversationResponseState as conversationResponseStateTable, + conversations as conversationsTable, + files, + user, +} from "../db/schema.ts" +import { ConversationNotFoundError } from "./errors.ts" +import { conversationEntryFromRow } from "./serialization.ts" + +const conversationEntryKind = type.enumerated(...Object.values(ConversationEntryKind)) +const conversationEntryVisibility = type.enumerated(...Object.values(ConversationEntryVisibility)) +const pendingSinceEntry = alias(conversationEntries, "pending_since_entry") + +export class DrizzleConversationStorage implements ConversationStorage { + private readonly db: Database + private readonly inTransaction: boolean + + constructor(db: Database, inTransaction = false) { + this.db = db + this.inTransaction = inTransaction + } + + async transaction(tx: (storage: ConversationStorage) => T | Promise): Promise { + if (this.inTransaction) return tx(this) + + return this.db.transaction(async (transactionDb) => + tx(new DrizzleConversationStorage(transactionDb, true)), + ) + } + + async createConversation(userId: string): Promise { + return insertConversation(this.db, userId) + } + + async listUserConversations(userId: string): Promise { + return this.db + .select() + .from(conversationsTable) + .where(eq(conversationsTable.userId, userId)) + .orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt)) + } + + async findConversation(conversationId: string): Promise { + return findConversation(this.db, conversationId) + } + + async getOrCreateConversation(userId: string): Promise { + return this.write(async (db) => { + await requireUserForUpdate(db, userId) + const existing = await latestConversation(db, userId) + if (existing) return existing + + return insertConversation(db, userId) + }) + } + + async createFile(userId: string, input: CreateFileInput): Promise { + return insertFile(this.db, userId, input) + } + + async appendEntry( + conversationId: string, + input: AppendConversationEntryInput, + ): Promise { + const row = await this.write((db) => appendEntryToConversation(db, null, conversationId, input)) + return conversationEntryFromRow(row) + } + + async appendAttachmentEntry( + conversationId: string, + input: AppendAttachmentEntryInput, + ): Promise { + return this.write((db) => appendAttachmentEntryToConversation(db, null, conversationId, input)) + } + + async nextSequence(conversationId: string): Promise { + return nextSequence(this.db, conversationId) + } + + async listUserConversationEntries( + userId: string, + conversationId: string, + params: ListConversationEntriesParams = {}, + ): Promise { + if (!(await findUserConversation(this.db, userId, conversationId))) { + throw new ConversationNotFoundError(conversationId, userId) + } + + if (params.visibility) { + return this.db + .select() + .from(conversationEntries) + .where( + and( + eq(conversationEntries.conversationId, conversationId), + eq(conversationEntries.visibility, params.visibility), + ), + ) + .orderBy(asc(conversationEntries.sequence)) + } + + return this.db + .select() + .from(conversationEntries) + .where(eq(conversationEntries.conversationId, conversationId)) + .orderBy(asc(conversationEntries.sequence)) + } + + async listPendingUserConversationEntries( + userId: string, + conversationId: string, + ): Promise { + const entries = await this.db + .select({ entry: conversationEntries }) + .from(conversationResponseStateTable) + .innerJoin( + conversationsTable, + and( + eq(conversationsTable.id, conversationResponseStateTable.conversationId), + eq(conversationsTable.userId, userId), + ), + ) + .innerJoin( + pendingSinceEntry, + and( + eq(pendingSinceEntry.id, conversationResponseStateTable.pendingSinceEntryId), + eq(pendingSinceEntry.conversationId, conversationResponseStateTable.conversationId), + ), + ) + .innerJoin( + conversationEntries, + and( + eq(conversationEntries.conversationId, conversationResponseStateTable.conversationId), + eq(conversationEntries.kind, ConversationEntryKind.UserMessage), + gte(conversationEntries.sequence, pendingSinceEntry.sequence), + ), + ) + .where( + and( + eq(conversationResponseStateTable.conversationId, conversationId), + eq(conversationEntries.conversationId, conversationId), + ), + ) + .orderBy(asc(conversationEntries.sequence)) + + if (entries.length > 0) return entries.map(({ entry }) => entry) + if (await findUserConversation(this.db, userId, conversationId)) return [] + + throw new ConversationNotFoundError(conversationId, userId) + } + + async findConversationResponseState( + conversationId: string, + ): Promise { + const rows = await this.db + .select() + .from(conversationResponseStateTable) + .where(eq(conversationResponseStateTable.conversationId, conversationId)) + .limit(1) + + return rows[0] ?? null + } + + async listPendingResponseStates(): Promise { + const rows = await this.db + .select() + .from(conversationResponseStateTable) + .where(eq(conversationResponseStateTable.status, ConversationResponseStateStatus.Pending)) + + return rows + } + + async listRunningResponseStates(): Promise { + const rows = await this.db + .select() + .from(conversationResponseStateTable) + .where(eq(conversationResponseStateTable.status, ConversationResponseStateStatus.Running)) + + return rows + } + + async upsertConversationResponseState( + conversationId: string, + input: UpsertConversationResponseStateInput, + ): Promise { + const now = new Date() + + return this.write(async (db) => { + if (!(await findConversationByIdForUpdate(db, conversationId))) { + throw new ConversationNotFoundError(conversationId, "") + } + + const rows = await db + .insert(conversationResponseStateTable) + .values({ + conversationId, + status: input.status ?? ConversationResponseStateStatus.Pending, + pendingSinceEntryId: input.pendingSinceEntryId, + maxWaitUntil: input.maxWaitUntil, + runningSince: input.runningSince ?? null, + updatedAt: now, + }) + .onConflictDoUpdate({ + target: conversationResponseStateTable.conversationId, + set: { + status: input.status ?? ConversationResponseStateStatus.Pending, + maxWaitUntil: input.maxWaitUntil, + runningSince: input.runningSince ?? null, + updatedAt: now, + }, + }) + .returning() + + return requireRow(rows) + }) + } + + async updateConversationResponseState( + conversationId: string, + input: UpdateConversationResponseStateInput, + ): Promise { + return this.write(async (db) => { + if (!(await findConversationByIdForUpdate(db, conversationId))) { + throw new ConversationNotFoundError(conversationId, "") + } + + const rows = await db + .update(conversationResponseStateTable) + .set({ + status: input.status, + pendingSinceEntryId: input.pendingSinceEntryId, + maxWaitUntil: input.maxWaitUntil, + runningSince: input.runningSince, + updatedAt: new Date(), + }) + .where(eq(conversationResponseStateTable.conversationId, conversationId)) + .returning() + + return rows[0] ?? null + }) + } + + async markResponseStateStatus( + conversationIds: string[], + status: ConversationResponseStateStatus, + ): Promise { + return this.write(async (db) => { + const now = new Date() + + let runningSince: Date | null + switch (status) { + case "pending": + case "failed": + runningSince = null + break + case "running": + runningSince = now + break + } + + const rows = await db + .update(conversationResponseStateTable) + .set({ + status, + runningSince, + updatedAt: now, + }) + .where(inArray(conversationResponseStateTable.conversationId, conversationIds)) + .returning() + + return rows + }) + } + + async claimPendingConversationResponseState( + conversationId: string, + ): Promise { + return this.write(async (db) => { + const now = new Date() + const rows = await db + .update(conversationResponseStateTable) + .set({ + status: "running", + runningSince: now, + updatedAt: now, + }) + .where( + and( + eq(conversationResponseStateTable.conversationId, conversationId), + eq(conversationResponseStateTable.status, "pending"), + ), + ) + .returning() + + return rows[0] ?? null + }) + } + + async clearConversationResponseState(conversationId: string): Promise { + await this.write(async (db) => { + if (!(await findConversationByIdForUpdate(db, conversationId))) { + throw new ConversationNotFoundError(conversationId, "") + } + + await db + .delete(conversationResponseStateTable) + .where(eq(conversationResponseStateTable.conversationId, conversationId)) + }) + } + + private async write(fn: (db: Database) => Promise): Promise { + if (this.inTransaction) return fn(this.db) + + return this.db.transaction(fn) + } +} + +export function createConversationStorage(db: Database): ConversationStorage { + return new DrizzleConversationStorage(db) +} + +export function conversations(db: Database, userId: string) { + const storage = createConversationStorage(db) + + return { + createConversation(): Promise { + return storage.createConversation(userId) + }, + + listConversations(): Promise { + return storage.listUserConversations(userId) + }, + + getConversation(conversationId: string): Promise { + return findUserConversation(db, userId, conversationId) + }, + + getOrCreateConversation(): Promise { + return storage.getOrCreateConversation(userId) + }, + + createFile(input: CreateFileInput): Promise { + return storage.createFile(userId, input) + }, + + appendEntry( + conversationId: string, + input: AppendConversationEntryInput, + ): Promise { + return db.transaction((tx) => + appendEntryToConversation(tx, userId, conversationId, input).then(conversationEntryFromRow), + ) + }, + + appendAttachmentEntry( + conversationId: string, + input: AppendAttachmentEntryInput, + ): Promise { + return db.transaction((tx) => + appendAttachmentEntryToConversation(tx, userId, conversationId, input), + ) + }, + + listEntries( + conversationId: string, + params: ListConversationEntriesParams = {}, + ): Promise { + return storage.listUserConversationEntries(userId, conversationId, params) + }, + } +} + +export function conversationResponse(db: Database, _userId: string, conversationId: string) { + const storage = createConversationStorage(db) + + return { + get(): Promise { + return storage.findConversationResponseState(conversationId) + }, + + upsert(input: UpsertConversationResponseStateInput): Promise { + return storage.upsertConversationResponseState(conversationId, input) + }, + + update( + input: UpdateConversationResponseStateInput, + ): Promise { + return storage.updateConversationResponseState(conversationId, input) + }, + + clear(): Promise { + return storage.clearConversationResponseState(conversationId) + }, + } +} + +function payloadForKind( + kind: ConversationEntryKind, + payload: AppendConversationEntryInput["payload"], +): ConversationEntryPayload { + switch (kind) { + case ConversationEntryKind.UserMessage: + return UserMessagePayload.assert(payload) + case ConversationEntryKind.AssistantMessage: + return AssistantMessagePayload.assert(payload) + case ConversationEntryKind.Attachment: + return AttachmentPayload.assert(payload) + case ConversationEntryKind.ContextSummary: + return ContextSummaryPayload.assert(payload) + case ConversationEntryKind.ToolCall: + case ConversationEntryKind.ToolResult: + case ConversationEntryKind.SystemNote: + return GenericObjectPayload.assert(payload) + } +} + +async function appendEntryToConversation( + db: Database, + userId: string | null, + conversationId: string, + input: AppendConversationEntryInput, +): Promise { + const kind = conversationEntryKind.assert(input.kind) + const visibility = conversationEntryVisibility.assert( + input.visibility ?? defaultVisibilityForKind(kind), + ) + const payload = payloadForKind(kind, input.payload) + const metadata = ConversationEntryMetadata.assert(input.metadata ?? {}) + let fileId: string | null = null + + if (input.kind === ConversationEntryKind.Attachment) { + fileId = input.fileId + } + + const conversation = userId + ? await findConversationForUpdate(db, userId, conversationId) + : await findConversationByIdForUpdate(db, conversationId) + if (!conversation) { + throw new ConversationNotFoundError(conversationId, userId ?? "") + } + if (fileId) await requireFile(db, conversation.userId, fileId) + + const sequence = await nextSequence(db, conversationId) + const rows = await db + .insert(conversationEntries) + .values({ + conversationId, + sequence, + kind, + visibility, + fileId, + payload, + metadata, + }) + .returning() + + await touchConversation(db, conversation.userId, conversationId) + return requireRow(rows) +} + +async function appendAttachmentEntryToConversation( + db: Database, + userId: string | null, + conversationId: string, + input: AppendAttachmentEntryInput, +): Promise { + const payload = AttachmentPayload.assert(input.payload) + const visibility = conversationEntryVisibility.assert( + input.visibility ?? defaultVisibilityForKind(ConversationEntryKind.Attachment), + ) + const metadata = ConversationEntryMetadata.assert(input.metadata ?? {}) + const conversation = userId + ? await findConversationForUpdate(db, userId, conversationId) + : await findConversationByIdForUpdate(db, conversationId) + + if (!conversation) { + throw new ConversationNotFoundError(conversationId, userId ?? "") + } + + const file = await insertFile(db, conversation.userId, input.file) + const sequence = await nextSequence(db, conversationId) + const rows = await db + .insert(conversationEntries) + .values({ + conversationId, + sequence, + kind: ConversationEntryKind.Attachment, + visibility, + fileId: file.id, + payload, + metadata, + }) + .returning() + + await touchConversation(db, conversation.userId, conversationId) + return { + file, + entry: requireRow(rows), + } +} + +async function requireUserForUpdate(db: Database, userId: string): Promise { + const rows = await db + .select({ id: user.id }) + .from(user) + .where(eq(user.id, userId)) + .limit(1) + .for("update") + + requireRow(rows, `User not found: ${userId}`) +} + +export async function findConversation( + db: Database, + conversationId: string, +): Promise { + const rows = await db + .select() + .from(conversationsTable) + .where(eq(conversationsTable.id, conversationId)) + .limit(1) + + return rows[0] ?? null +} + +async function findUserConversation( + db: Database, + userId: string, + conversationId: string, +): Promise { + const rows = await db + .select() + .from(conversationsTable) + .where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId))) + .limit(1) + + return rows[0] ?? null +} + +async function findConversationForUpdate( + db: Database, + userId: string, + conversationId: string, +): Promise { + const rows = await db + .select() + .from(conversationsTable) + .where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId))) + .limit(1) + .for("update") + + return rows[0] ?? null +} + +async function findConversationByIdForUpdate( + db: Database, + conversationId: string, +): Promise { + const rows = await db + .select() + .from(conversationsTable) + .where(eq(conversationsTable.id, conversationId)) + .limit(1) + .for("update") + + return rows[0] ?? null +} + +async function latestConversation(db: Database, userId: string): Promise { + const rows = await db + .select() + .from(conversationsTable) + .where(eq(conversationsTable.userId, userId)) + .orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt)) + .limit(1) + + return rows[0] ?? null +} + +async function insertConversation(db: Database, userId: string): Promise { + const rows = await db + .insert(conversationsTable) + .values({ + userId, + }) + .returning() + + return requireRow(rows) +} + +async function requireFile(db: Database, userId: string, fileId: string): Promise { + const rows = await db + .select() + .from(files) + .where(and(eq(files.id, fileId), eq(files.userId, userId))) + .limit(1) + + return requireRow(rows, `File not found: ${fileId}`) +} + +async function insertFile(db: Database, userId: string, input: CreateFileInput): Promise { + const rows = await db + .insert(files) + .values({ + userId, + storageKey: input.storageKey, + originalName: input.originalName ?? null, + mimeType: input.mimeType, + sizeBytes: input.sizeBytes, + metadata: input.metadata ?? {}, + }) + .returning() + + return requireRow(rows) +} + +async function touchConversation( + db: Database, + userId: string, + conversationId: string, +): Promise { + await db + .update(conversationsTable) + .set({ updatedAt: new Date() }) + .where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId))) +} + +async function nextSequence(db: Database, conversationId: string): Promise { + const rows = await db + .select({ sequence: conversationEntries.sequence }) + .from(conversationEntries) + .where(eq(conversationEntries.conversationId, conversationId)) + .orderBy(desc(conversationEntries.sequence)) + .limit(1) + + return (rows[0]?.sequence ?? 0) + 1 +} + +function requireRow(rows: T[], message = "Expected database row"): T { + const row = rows[0] + if (!row) throw new Error(message) + return row +} + +function defaultVisibilityForKind(kind: ConversationEntryKind): ConversationEntryVisibility { + switch (kind) { + case ConversationEntryKind.UserMessage: + case ConversationEntryKind.AssistantMessage: + case ConversationEntryKind.Attachment: + return ConversationEntryVisibility.UserVisible + case ConversationEntryKind.ToolCall: + case ConversationEntryKind.ToolResult: + case ConversationEntryKind.ContextSummary: + case ConversationEntryKind.SystemNote: + return ConversationEntryVisibility.Internal + } +} diff --git a/apps/freya-backend/src/conversations/http.test.ts b/apps/freya-backend/src/conversations/http.test.ts index fc093e8..680356c 100644 --- a/apps/freya-backend/src/conversations/http.test.ts +++ b/apps/freya-backend/src/conversations/http.test.ts @@ -25,7 +25,7 @@ const listEntriesCalls: Array<{ params: ListConversationEntriesParams }> = [] -mock.module("./storage.ts", () => ({ +mock.module("./db-storage.ts", () => ({ conversations: (_db: Database, userId: string) => ({ async listConversations(): Promise { return conversationRowsByUser.get(userId) ?? [] diff --git a/apps/freya-backend/src/conversations/http.ts b/apps/freya-backend/src/conversations/http.ts index 39546b1..1860676 100644 --- a/apps/freya-backend/src/conversations/http.ts +++ b/apps/freya-backend/src/conversations/http.ts @@ -1,13 +1,7 @@ import type { Context, Hono } from "hono" import { - AssistantMessagePayload, - AttachmentPayload, - ConversationEntryKind, ConversationEntryVisibility, - ContextSummaryPayload, - GenericObjectPayload, - UserMessagePayload, type Conversation, type ConversationEntry, } from "@freya/core" @@ -16,10 +10,11 @@ import { createMiddleware } from "hono/factory" import type { AuthSessionMiddleware } from "../auth/session-middleware.ts" import type { Database } from "../db/index.ts" -import type { ConversationEntryRow, ConversationRow } from "./storage.ts" +import type { ConversationRow } from "./storage.ts" +import { conversations } from "./db-storage.ts" import { ConversationNotFoundError } from "./errors.ts" -import { conversations } from "./storage.ts" +import { conversationEntryFromRow } from "./serialization.ts" /** Hono environment populated by the conversations route middleware. */ type Env = { @@ -86,7 +81,7 @@ async function handleListEntries(c: Context) { visibility: ConversationEntryVisibility.UserVisible, }) const response: ListConversationEntriesResponse = { - entries: entries.map(serializeConversationEntry), + entries: entries.map(conversationEntryFromRow), } return c.json(response) @@ -105,82 +100,3 @@ function serializeConversation(row: ConversationRow): Conversation { updatedAt: row.updatedAt.toISOString(), } } - -function serializeConversationEntry(row: ConversationEntryRow): ConversationEntry { - const base = { - id: row.id, - conversationId: row.conversationId, - sequence: row.sequence, - visibility: row.visibility, - metadata: row.metadata, - createdAt: row.createdAt.toISOString(), - } - - switch (row.kind) { - case ConversationEntryKind.UserMessage: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: UserMessagePayload.assert(row.payload), - } - case ConversationEntryKind.AssistantMessage: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: AssistantMessagePayload.assert(row.payload), - } - case ConversationEntryKind.Attachment: - return { - ...base, - kind: row.kind, - fileId: requireFileId(row), - payload: AttachmentPayload.assert(row.payload), - } - case ConversationEntryKind.ToolCall: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: GenericObjectPayload.assert(row.payload), - } - case ConversationEntryKind.ToolResult: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: GenericObjectPayload.assert(row.payload), - } - case ConversationEntryKind.ContextSummary: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: ContextSummaryPayload.assert(row.payload), - } - case ConversationEntryKind.SystemNote: - return { - ...base, - kind: row.kind, - fileId: nullFileId(row), - payload: GenericObjectPayload.assert(row.payload), - } - } -} - -function requireFileId(row: ConversationEntryRow): string { - if (!row.fileId) { - throw new Error(`Conversation attachment entry "${row.id}" is missing a file id`) - } - - return row.fileId -} - -function nullFileId(row: ConversationEntryRow): null { - if (row.fileId !== null) { - throw new Error(`Conversation entry "${row.id}" unexpectedly references a file`) - } - - return null -} diff --git a/apps/freya-backend/src/conversations/serialization.ts b/apps/freya-backend/src/conversations/serialization.ts new file mode 100644 index 0000000..6064a70 --- /dev/null +++ b/apps/freya-backend/src/conversations/serialization.ts @@ -0,0 +1,90 @@ +import { + AssistantMessagePayload, + AttachmentPayload, + ConversationEntryKind, + ContextSummaryPayload, + GenericObjectPayload, + UserMessagePayload, + type ConversationEntry, +} from "@freya/core" + +import type { ConversationEntryRow } from "./storage.ts" + +export function conversationEntryFromRow(row: ConversationEntryRow): ConversationEntry { + const base = { + id: row.id, + conversationId: row.conversationId, + sequence: row.sequence, + visibility: row.visibility, + metadata: row.metadata, + createdAt: row.createdAt.toISOString(), + } + + switch (row.kind) { + case ConversationEntryKind.UserMessage: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: UserMessagePayload.assert(row.payload), + } + case ConversationEntryKind.AssistantMessage: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: AssistantMessagePayload.assert(row.payload), + } + case ConversationEntryKind.Attachment: + return { + ...base, + kind: row.kind, + fileId: requireFileId(row), + payload: AttachmentPayload.assert(row.payload), + } + case ConversationEntryKind.ToolCall: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: GenericObjectPayload.assert(row.payload), + } + case ConversationEntryKind.ToolResult: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: GenericObjectPayload.assert(row.payload), + } + case ConversationEntryKind.ContextSummary: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: ContextSummaryPayload.assert(row.payload), + } + case ConversationEntryKind.SystemNote: + return { + ...base, + kind: row.kind, + fileId: nullFileId(row), + payload: GenericObjectPayload.assert(row.payload), + } + } +} + +function requireFileId(row: ConversationEntryRow): string { + if (!row.fileId) { + throw new Error(`Conversation attachment entry "${row.id}" is missing a file id`) + } + + return row.fileId +} + +function nullFileId(row: ConversationEntryRow): null { + if (row.fileId !== null) { + throw new Error(`Conversation entry "${row.id}" unexpectedly references a file`) + } + + return null +} diff --git a/apps/freya-backend/src/conversations/storage.ts b/apps/freya-backend/src/conversations/storage.ts index 67b31a7..996df83 100644 --- a/apps/freya-backend/src/conversations/storage.ts +++ b/apps/freya-backend/src/conversations/storage.ts @@ -2,28 +2,71 @@ import { AssistantMessagePayload, AttachmentPayload, ConversationEntryKind, + ConversationEntryMetadata, ConversationEntryVisibility, ContextSummaryPayload, - ConversationEntryMetadata, GenericObjectPayload, UserMessagePayload, - type ConversationEntryPayload, + type ConversationEntry, } from "@freya/core" -import { type } from "arktype" -import { and, asc, desc, eq } from "drizzle-orm" - -import type { Database } from "../db/index.ts" import { conversationEntries, + conversationResponseState as conversationResponseStateTable, conversations as conversationsTable, files, - user, + type ConversationResponseStateStatus, } from "../db/schema.ts" -import { ConversationNotFoundError } from "./errors.ts" -const conversationEntryKind = type.enumerated(...Object.values(ConversationEntryKind)) -const conversationEntryVisibility = type.enumerated(...Object.values(ConversationEntryVisibility)) +export interface ConversationStorage { + transaction(tx: (storage: ConversationStorage) => T | Promise): Promise + createConversation(userId: string): Promise + listUserConversations(userId: string): Promise + findConversation(conversationId: string): Promise + getOrCreateConversation(userId: string): Promise + createFile(userId: string, input: CreateFileInput): Promise + appendEntry( + conversationId: string, + input: AppendConversationEntryInput, + ): Promise + appendAttachmentEntry( + conversationId: string, + input: AppendAttachmentEntryInput, + ): Promise + nextSequence(conversationId: string): Promise + listUserConversationEntries( + userId: string, + conversationId: string, + params?: ListConversationEntriesParams, + ): Promise + listPendingUserConversationEntries( + userId: string, + conversationId: string, + ): Promise + findConversationResponseState( + conversationId: string, + ): Promise + // TODO: add pagination support + listPendingResponseStates(): Promise + // TODO: add pagination support + listRunningResponseStates(): Promise + upsertConversationResponseState( + conversationId: string, + input: UpsertConversationResponseStateInput, + ): Promise + updateConversationResponseState( + conversationId: string, + input: UpdateConversationResponseStateInput, + ): Promise + markResponseStateStatus( + conversationIds: string[], + status: ConversationResponseStateStatus, + ): Promise + claimPendingConversationResponseState( + conversationId: string, + ): Promise + clearConversationResponseState(conversationId: string): Promise +} /** Database row shape for a conversation owned by a user. */ export type ConversationRow = typeof conversationsTable.$inferSelect @@ -31,6 +74,9 @@ export type ConversationRow = typeof conversationsTable.$inferSelect /** Database row shape for an entry in a conversation timeline. */ export type ConversationEntryRow = typeof conversationEntries.$inferSelect +/** Database row shape for pending assistant response state in a conversation. */ +export type ConversationResponseStateRow = typeof conversationResponseStateTable.$inferSelect + /** Database row shape for an uploaded file referenced by conversations. */ export type FileRow = typeof files.$inferSelect @@ -99,291 +145,26 @@ export interface ListConversationEntriesParams { visibility?: ConversationEntryVisibility } -export function conversations(db: Database, userId: string) { - const storage = { - async createConversation(): Promise { - return insertConversation(db, userId) - }, - - async listConversations(): Promise { - return db - .select() - .from(conversationsTable) - .where(eq(conversationsTable.userId, userId)) - .orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt)) - }, - - async getConversation(conversationId: string): Promise { - const rows = await db - .select() - .from(conversationsTable) - .where( - and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)), - ) - .limit(1) - - return rows[0] ?? null - }, - - async getOrCreateConversation(): Promise { - return db.transaction(async (tx) => { - await requireUserForUpdate(tx, userId) - const existing = await latestConversation(tx, userId) - if (existing) return existing - - return insertConversation(tx, userId) - }) - }, - - async createFile(input: CreateFileInput): Promise { - return insertFile(db, userId, input) - }, - - async appendEntry( - conversationId: string, - input: AppendConversationEntryInput, - ): Promise { - const kind = conversationEntryKind.assert(input.kind) - const visibility = conversationEntryVisibility.assert( - input.visibility ?? defaultVisibilityForKind(kind), - ) - const payload = payloadForKind(kind, input.payload) - const metadata = ConversationEntryMetadata.assert(input.metadata ?? {}) - let fileId: string | null = null - - if (input.kind === ConversationEntryKind.Attachment) { - fileId = input.fileId - await requireFile(db, userId, fileId) - } - - const rows = await db.transaction(async (tx) => { - if (!(await findConversationForUpdate(tx, userId, conversationId))) { - throw new ConversationNotFoundError(conversationId, userId) - } - const sequence = await nextSequence(tx, conversationId) - - const rows = await tx - .insert(conversationEntries) - .values({ - conversationId, - sequence, - kind, - visibility, - fileId, - payload, - metadata, - }) - .returning() - - await touchConversation(tx, userId, conversationId) - return rows - }) - - return requireRow(rows) - }, - - async appendAttachmentEntry( - conversationId: string, - input: AppendAttachmentEntryInput, - ): Promise { - const payload = AttachmentPayload.assert(input.payload) - const visibility = conversationEntryVisibility.assert( - input.visibility ?? defaultVisibilityForKind(ConversationEntryKind.Attachment), - ) - const metadata = ConversationEntryMetadata.assert(input.metadata ?? {}) - - return db.transaction(async (tx) => { - if (!(await findConversationForUpdate(tx, userId, conversationId))) { - throw new ConversationNotFoundError(conversationId, userId) - } - - const file = await insertFile(tx, userId, input.file) - const sequence = await nextSequence(tx, conversationId) - const rows = await tx - .insert(conversationEntries) - .values({ - conversationId, - sequence, - kind: ConversationEntryKind.Attachment, - visibility, - fileId: file.id, - payload, - metadata, - }) - .returning() - - await touchConversation(tx, userId, conversationId) - return { - file, - entry: requireRow(rows), - } - }) - }, - - async listEntries( - conversationId: string, - params: ListConversationEntriesParams = {}, - ): Promise { - if (!(await storage.getConversation(conversationId))) { - throw new ConversationNotFoundError(conversationId, userId) - } - - if (params.visibility) { - return db - .select() - .from(conversationEntries) - .where( - and( - eq(conversationEntries.conversationId, conversationId), - eq(conversationEntries.visibility, params.visibility), - ), - ) - .orderBy(asc(conversationEntries.sequence)) - } - - return db - .select() - .from(conversationEntries) - .where(eq(conversationEntries.conversationId, conversationId)) - .orderBy(asc(conversationEntries.sequence)) - }, - } - - return storage +/** Input for creating or replacing pending assistant response state. */ +export interface UpsertConversationResponseStateInput { + status?: ConversationResponseStateStatus + pendingSinceEntryId: string + maxWaitUntil: Date + runningSince?: Date | null } -function payloadForKind( - kind: ConversationEntryKind, - payload: AppendConversationEntryInput["payload"], -): ConversationEntryPayload { - switch (kind) { - case ConversationEntryKind.UserMessage: - return UserMessagePayload.assert(payload) - case ConversationEntryKind.AssistantMessage: - return AssistantMessagePayload.assert(payload) - case ConversationEntryKind.Attachment: - return AttachmentPayload.assert(payload) - case ConversationEntryKind.ContextSummary: - return ContextSummaryPayload.assert(payload) - case ConversationEntryKind.ToolCall: - case ConversationEntryKind.ToolResult: - case ConversationEntryKind.SystemNote: - return GenericObjectPayload.assert(payload) - } +/** Input for patching pending assistant response state. */ +export interface UpdateConversationResponseStateInput { + status?: ConversationResponseStateStatus + pendingSinceEntryId?: string + maxWaitUntil?: Date + runningSince?: Date | null } -async function requireUserForUpdate(db: Database, userId: string): Promise { - const rows = await db - .select({ id: user.id }) - .from(user) - .where(eq(user.id, userId)) - .limit(1) - .for("update") - - requireRow(rows, `User not found: ${userId}`) -} - -async function findConversationForUpdate( - db: Database, - userId: string, - conversationId: string, -): Promise { - const rows = await db - .select() - .from(conversationsTable) - .where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId))) - .limit(1) - .for("update") - - return rows[0] ?? null -} - -async function latestConversation(db: Database, userId: string): Promise { - const rows = await db - .select() - .from(conversationsTable) - .where(eq(conversationsTable.userId, userId)) - .orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt)) - .limit(1) - - return rows[0] ?? null -} - -async function insertConversation(db: Database, userId: string): Promise { - const rows = await db - .insert(conversationsTable) - .values({ - userId, - }) - .returning() - - return requireRow(rows) -} - -async function requireFile(db: Database, userId: string, fileId: string): Promise { - const rows = await db - .select() - .from(files) - .where(and(eq(files.id, fileId), eq(files.userId, userId))) - .limit(1) - - return requireRow(rows, `File not found: ${fileId}`) -} - -async function insertFile(db: Database, userId: string, input: CreateFileInput): Promise { - const rows = await db - .insert(files) - .values({ - userId, - storageKey: input.storageKey, - originalName: input.originalName ?? null, - mimeType: input.mimeType, - sizeBytes: input.sizeBytes, - metadata: input.metadata ?? {}, - }) - .returning() - - return requireRow(rows) -} - -async function touchConversation( - db: Database, - userId: string, - conversationId: string, -): Promise { - await db - .update(conversationsTable) - .set({ updatedAt: new Date() }) - .where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId))) -} - -async function nextSequence(db: Database, conversationId: string): Promise { - const rows = await db - .select({ sequence: conversationEntries.sequence }) - .from(conversationEntries) - .where(eq(conversationEntries.conversationId, conversationId)) - .orderBy(desc(conversationEntries.sequence)) - .limit(1) - - return (rows[0]?.sequence ?? 0) + 1 -} - -function requireRow(rows: T[], message = "Expected database row"): T { - const row = rows[0] - if (!row) throw new Error(message) - return row -} - -function defaultVisibilityForKind(kind: ConversationEntryKind): ConversationEntryVisibility { - switch (kind) { - case ConversationEntryKind.UserMessage: - case ConversationEntryKind.AssistantMessage: - case ConversationEntryKind.Attachment: - return ConversationEntryVisibility.UserVisible - case ConversationEntryKind.ToolCall: - case ConversationEntryKind.ToolResult: - case ConversationEntryKind.ContextSummary: - case ConversationEntryKind.SystemNote: - return ConversationEntryVisibility.Internal - } -} +export { + createConversationStorage, + conversationResponse, + conversations, + DrizzleConversationStorage, + findConversation, +} from "./db-storage.ts" diff --git a/apps/freya-backend/src/db/schema.ts b/apps/freya-backend/src/db/schema.ts index d822c90..cfb252a 100644 --- a/apps/freya-backend/src/db/schema.ts +++ b/apps/freya-backend/src/db/schema.ts @@ -48,6 +48,15 @@ const bytea = customType<{ data: Buffer }>({ }, }) +export const ConversationResponseStateStatus = { + Pending: "pending", + Running: "running", + Failed: "failed", +} as const + +export type ConversationResponseStateStatus = + (typeof ConversationResponseStateStatus)[keyof typeof ConversationResponseStateStatus] + export const userSources = pgTable( "user_sources", { @@ -146,6 +155,38 @@ export const conversationEntries = pgTable( ], ) +export const conversationResponseState = pgTable( + "conversation_response_state", + { + conversationId: uuid("conversation_id") + .primaryKey() + .references(() => conversations.id, { onDelete: "cascade" }), + status: text("status") + .$type() + .notNull() + .default(ConversationResponseStateStatus.Pending), + pendingSinceEntryId: uuid("pending_since_entry_id") + .notNull() + .references(() => conversationEntries.id, { onDelete: "cascade" }), + maxWaitUntil: timestamp("max_wait_until").notNull(), + runningSince: timestamp("running_since"), + createdAt: timestamp("created_at").notNull().defaultNow(), + updatedAt: timestamp("updated_at") + .notNull() + .defaultNow() + .$onUpdate(() => new Date()), + }, + (t) => [ + index("conversation_response_state_status_max_wait_until_idx").on(t.status, t.maxWaitUntil), + index("conversation_response_state_running_since_idx").on(t.runningSince), + index("conversation_response_state_pending_since_entry_id_idx").on(t.pendingSinceEntryId), + check( + "conversation_response_state_status_check", + sql`${t.status} in ('pending', 'running', 'failed')`, + ), + ], +) + // --------------------------------------------------------------------------- // FREYA — reminders source storage // --------------------------------------------------------------------------- diff --git a/apps/freya-backend/src/engine/http.test.ts b/apps/freya-backend/src/engine/http.test.ts index f2ebb31..1a205b0 100644 --- a/apps/freya-backend/src/engine/http.test.ts +++ b/apps/freya-backend/src/engine/http.test.ts @@ -14,7 +14,6 @@ interface FeedResponse { items: Array<{ id: string type: string - priority: number timestamp: string data: Record }> @@ -85,7 +84,7 @@ mock.module("../sources/user-sources.ts", () => ({ }), })) -mock.module("../conversations/storage.ts", () => ({ +mock.module("../conversations/db-storage.ts", () => ({ conversations: (_db: Database, userId: string) => ({ async getOrCreateConversation() { return { id: `conversation-${userId}` } @@ -118,7 +117,6 @@ describe("GET /api/feed", () => { id: "item-1", sourceId: "test", type: "test", - priority: 0.8, timestamp: new Date("2025-01-01T00:00:00.000Z"), data: { value: 42 }, }, @@ -149,7 +147,6 @@ describe("GET /api/feed", () => { expect(body.items).toHaveLength(1) expect(body.items[0]!.id).toBe("item-1") expect(body.items[0]!.type).toBe("test") - expect(body.items[0]!.priority).toBe(0.8) expect(body.items[0]!.timestamp).toBe("2025-01-01T00:00:00.000Z") expect(body.errors).toHaveLength(0) }) @@ -160,7 +157,6 @@ describe("GET /api/feed", () => { id: "fresh-1", sourceId: "test", type: "test", - priority: 0.5, timestamp: new Date("2025-06-01T12:00:00.000Z"), data: { fresh: true }, }, diff --git a/apps/freya-backend/src/enhancement/schema.test.ts b/apps/freya-backend/src/enhancement/schema.test.ts index 418de6c..01fee02 100644 --- a/apps/freya-backend/src/enhancement/schema.test.ts +++ b/apps/freya-backend/src/enhancement/schema.test.ts @@ -135,8 +135,9 @@ describe("schema sync", () => { // JSON Schema structure matches const jsonSchema = enhancementResultJsonSchema + const payloadKeys = Object.keys(payload).sort() as Array<(typeof jsonSchema.required)[number]> expect(Object.keys(jsonSchema.properties).sort()).toEqual(Object.keys(payload).sort()) - expect([...jsonSchema.required].sort()).toEqual(Object.keys(payload).sort()) + expect([...jsonSchema.required].sort()).toEqual(payloadKeys) // syntheticItems item schema has the right required fields const itemSchema = jsonSchema.properties.syntheticItems.items diff --git a/apps/freya-backend/src/lib/job.ts b/apps/freya-backend/src/lib/job.ts new file mode 100644 index 0000000..78b856a --- /dev/null +++ b/apps/freya-backend/src/lib/job.ts @@ -0,0 +1,116 @@ +import { Queue } from "./queue" + +const JobStatus = { + Pending: "pending", + Running: "running", +} as const +type JobStatus = (typeof JobStatus)[keyof typeof JobStatus] + +export interface Job { + id: number + payload: Payload + signal: AbortSignal +} + +interface PendingJob { + status: typeof JobStatus.Pending + controller: AbortController + job: Job +} + +interface RunningJob { + status: typeof JobStatus.Running + controller: AbortController + job: Job +} + +type JobState = PendingJob | RunningJob + +type JobEventListener = (job: Job) => void + +type JobEvent = "settled" | "cancelled" + +export class JobRegistry { + private queue = new Queue>() + + private states = new Map>() + + private listeners: Record[]> = { + settled: [], + cancelled: [], + } + + addJob({ payload }: { payload: Payload }): Job { + const controller = new AbortController() + const job: Job = { + id: this.generateJobId(), + payload, + signal: controller.signal, + } + this.queue.enqueue(job) + this.states.set(job.id, { status: JobStatus.Pending, controller, job }) + return job + } + + async nextJob(signal?: AbortSignal): Promise | null> { + while (true) { + const job = await this.queue.next(signal) + if (!job) { + return null + } + + const state = this.states.get(job.id) + + if (!state || state.job !== job || state.status === JobStatus.Running) { + continue + } + if (state.controller.signal.aborted) { + this.states.delete(job.id) + continue + } + + this.states.set(job.id, { status: JobStatus.Running, controller: state.controller, job }) + + return job + } + } + + cancelJob(job: Job): void { + const state = this.states.get(job.id) + if (state?.job === job) { + state?.controller.abort() + this.notifyListeners("cancelled", job.id) + this.states.delete(job.id) + } + } + + markJobAsCompleted(job: Job): void { + const state = this.states.get(job.id) + if (state?.job === job) { + this.notifyListeners("settled", job.id) + this.states.delete(job.id) + } + } + + addEventListener(event: JobEvent, listener: JobEventListener): () => void { + this.listeners[event].push(listener) + return () => { + this.listeners[event] = this.listeners[event].filter((l) => l !== listener) + } + } + + private generateJobId(): number { + let id: number + do { + id = Math.floor(Math.random() * 1000000) + } while (this.states.has(id)) + return id + } + + private notifyListeners(event: JobEvent, id: number): void { + const job = this.states.get(id)?.job + if (job) { + this.listeners[event].forEach((listener) => listener(job)) + } + } +} diff --git a/apps/freya-backend/src/lib/logger.ts b/apps/freya-backend/src/lib/logger.ts new file mode 100644 index 0000000..57af2c7 --- /dev/null +++ b/apps/freya-backend/src/lib/logger.ts @@ -0,0 +1,83 @@ +import { hostname } from "node:os" +import pino, { type Logger } from "pino" + +export type AppLogger = Logger + +const LogLevel = { + Trace: "trace", + Debug: "debug", + Info: "info", + Warn: "warn", + Error: "error", + Fatal: "fatal", + Silent: "silent", +} as const +type LogLevel = (typeof LogLevel)[keyof typeof LogLevel] + +export const logger = pino({ + base: { + hostname: hostname(), + pid: process.pid, + service: "freya-backend", + }, + level: readLogLevel(process.env.LOG_LEVEL, process.env.NODE_ENV), + transport: + readLogFormat(process.env.LOG_FORMAT, process.env.NODE_ENV) === "pretty" + ? { + target: "pino-pretty", + options: { + colorize: true, + ignore: "hostname,pid", + singleLine: false, + translateTime: "SYS:standard", + }, + } + : undefined, + redact: { + censor: "[redacted]", + paths: [ + "apiKey", + "authorization", + "cookie", + "credentialEncryptionKey", + "password", + "privateKey", + "*.apiKey", + "*.authorization", + "*.cookie", + "*.credentialEncryptionKey", + "*.password", + "*.privateKey", + ], + }, + serializers: { + err: pino.stdSerializers.err, + }, +}) + +function readLogFormat(value: string | undefined, nodeEnv: string | undefined): "json" | "pretty" { + const normalized = value?.trim().toLowerCase() + if (normalized === "json") { + return "json" + } + if (nodeEnv === "test") { + return "json" + } + return "pretty" +} + +function readLogLevel(value: string | undefined, nodeEnv: string | undefined): LogLevel { + const normalized = value?.trim().toLowerCase() + switch (normalized) { + case LogLevel.Trace: + case LogLevel.Debug: + case LogLevel.Info: + case LogLevel.Warn: + case LogLevel.Error: + case LogLevel.Fatal: + case LogLevel.Silent: + return normalized + default: + return nodeEnv === "test" ? LogLevel.Silent : LogLevel.Info + } +} diff --git a/apps/freya-backend/src/lib/queue.ts b/apps/freya-backend/src/lib/queue.ts new file mode 100644 index 0000000..b4b8e29 --- /dev/null +++ b/apps/freya-backend/src/lib/queue.ts @@ -0,0 +1,69 @@ +interface Item { + value: T + next: Item | null +} + +export class Queue { + private front: Item | null = null + private back: Item | null = null + private waiters: Array<(value: T) => void> = [] + + enqueue(value: T): void { + const waiter = this.waiters.shift() + if (waiter) { + waiter(value) + return + } + + const newItem: Item = { value, next: null } + if (this.back) { + this.back.next = newItem + } else { + this.front = newItem + } + this.back = newItem + } + + dequeue(): T | null { + if (!this.front) return null + const value = this.front.value + this.front = this.front.next + if (!this.front) this.back = null + return value + } + + next(signal?: AbortSignal): Promise { + const value = this.dequeue() + if (value !== null) return Promise.resolve(value) + + return new Promise((resolve) => { + if (signal) { + if (signal.aborted) { + resolve(null) + } else { + let _resolve: (v: T) => void + + const onAbort = () => { + this.waiters = this.waiters.filter((w) => w !== _resolve) + resolve(null) + } + + signal.addEventListener( + "abort", + onAbort, + { once: true }, + ) + + _resolve = (v: T) => { + signal.removeEventListener("abort", onAbort) + resolve(v) + } + + this.waiters.push(_resolve) + } + } else { + this.waiters.push(resolve) + } + }) + } +} diff --git a/apps/freya-backend/src/lib/worker.ts b/apps/freya-backend/src/lib/worker.ts new file mode 100644 index 0000000..6feb331 --- /dev/null +++ b/apps/freya-backend/src/lib/worker.ts @@ -0,0 +1,86 @@ +import type { Job, JobRegistry } from "./job" +import type { AppLogger } from "./logger" + +import { logger as rootLogger } from "./logger" + +export interface JobExecutor { + execute(job: Job): Promise +} + +export interface WorkerConfig { + concurrency: number + jobFields?: (job: Job) => Record + logger?: AppLogger + registry: JobRegistry + runner: JobExecutor + signal: AbortSignal +} + +export class Worker { + private concurrency: number + private jobFields: (job: Job) => Record + private logger: AppLogger + private registry: JobRegistry + private runner: JobExecutor + private signal: AbortSignal + + constructor({ + concurrency, + jobFields, + logger, + registry, + runner, + signal, + }: WorkerConfig) { + this.concurrency = concurrency + this.jobFields = jobFields ?? emptyJobFields + this.logger = logger ?? rootLogger.child({ component: "worker" }) + this.registry = registry + this.runner = runner + this.signal = signal + } + + start() { + if (this.signal.aborted) return + this.logger.debug({ concurrency: this.concurrency }, "worker started") + for (let i = 0; i < this.concurrency; i++) { + void this.pollJobFromRegistry(i) + } + } + + private async pollJobFromRegistry(workerId: number) { + while (!this.signal.aborted) { + const job = await this.registry.nextJob(this.signal) + if (!job) { + return + } + + const startedAt = Date.now() + const fields = { + ...this.jobFields(job), + jobId: job.id, + workerId, + } + this.logger.debug(fields, "job execution started") + + try { + await this.runner.execute(job) + this.logger.debug( + { ...fields, durationMs: Date.now() - startedAt }, + "job execution completed", + ) + } catch (err) { + this.logger.error( + { ...fields, durationMs: Date.now() - startedAt, err }, + "job execution failed", + ) + } finally { + this.registry.markJobAsCompleted(job) + } + } + } +} + +function emptyJobFields(_job: Job): Record { + return {} +} diff --git a/apps/freya-backend/src/notification/notification-central.ts b/apps/freya-backend/src/notification/notification-central.ts new file mode 100644 index 0000000..a3d87d0 --- /dev/null +++ b/apps/freya-backend/src/notification/notification-central.ts @@ -0,0 +1,36 @@ +import type { AgentEvent } from "@freya/agent-protocol" + +export interface AgentNotification { + kind: "agent" + payload: AgentEvent +} + +export type NotificationPayload = AgentNotification +export type NotificationListener = (notification: NotificationPayload) => Promise + +export class NotificationCentral { + private listeners: Map> = new Map() + + registerListenerForUser(userId: string, listener: NotificationListener): () => void { + let listeners = this.listeners.get(userId) + if (!listeners) { + listeners = new Set() + this.listeners.set(userId, listeners) + } + + listeners.add(listener) + return () => { + listeners.delete(listener) + if (listeners.size === 0) { + this.listeners.delete(userId) + } + } + } + + async notifyUser(userId: string, notification: NotificationPayload): Promise { + const listeners = this.listeners.get(userId) + if (!listeners) return + + await Promise.allSettled(Array.from(listeners).map((listener) => listener(notification))) + } +} diff --git a/apps/freya-backend/src/server.ts b/apps/freya-backend/src/server.ts index 34ab18d..1332a58 100644 --- a/apps/freya-backend/src/server.ts +++ b/apps/freya-backend/src/server.ts @@ -5,6 +5,7 @@ import { createMiddleware } from "hono/factory" import { registerAdminHttpHandlers } from "./admin/http.ts" import { createQueryDebugTools } from "./agent/debug-tools.ts" import { registerAgentHttpHandlers, registerDebugAgentHttpHandlers } from "./agent/http.ts" +import { AgentService } from "./agent/service.ts" import { agentWebSocket, registerAgentWebSocketHandlers } from "./agent/ws.ts" import { createRequireAdmin } from "./auth/admin-middleware.ts" import { registerAuthHandlers } from "./auth/http.ts" @@ -12,6 +13,7 @@ import { createAuth } from "./auth/index.ts" import { createRequireSession } from "./auth/session-middleware.ts" import { CalDavSourceProvider } from "./caldav/provider.ts" import { registerConversationsHttpHandlers } from "./conversations/http.ts" +import { DrizzleConversationStorage } from "./conversations/storage.ts" import { createDatabase } from "./db/index.ts" import { registerFeedHttpHandlers } from "./engine/http.ts" import { createFeedEnhancer } from "./enhancement/enhance-feed.ts" @@ -19,8 +21,10 @@ import { createLlmClient } from "./enhancement/llm-client.ts" import { GoogleMapsSourceProvider } from "./google-maps/provider.ts" import { CredentialEncryptor } from "./lib/crypto.ts" import { ensureEnv } from "./lib/env.ts" +import { logger } from "./lib/logger.ts" import { registerLocationHttpHandlers } from "./location/http.ts" import { LocationSourceProvider } from "./location/provider.ts" +import { NotificationCentral } from "./notification/notification-central.ts" import { ReminderSourceProvider } from "./reminders/provider.ts" import { UserSessionManager } from "./session/index.ts" import { registerSourcesHttpHandlers } from "./sources/http.ts" @@ -32,8 +36,12 @@ function main() { const env = ensureEnv(process.env) const { db, close: closeDb } = createDatabase(env.databaseUrl) + const conversationStorage = new DrizzleConversationStorage(db, false) + const auth = createAuth(db) + const abortController = new AbortController() + const feedEnhancer = createFeedEnhancer({ client: createLlmClient({ apiKey: env.openrouterApiKey, @@ -70,9 +78,22 @@ function main() { }, }) if (!piApiKey) { - console.warn("[query] PI_API_KEY or OPENROUTER_API_KEY not set — query agent unavailable") + logger.warn( + { component: "query_agent" }, + "PI_API_KEY or OPENROUTER_API_KEY not set; query agent unavailable", + ) } + const notificationCentral = new NotificationCentral() + + const agentService = new AgentService({ + notificationCentral, + storage: conversationStorage, + userSessionManager: sessionManager, + signal: abortController.signal, + logger, + }) + const app = new Hono() const isDev = process.env.NODE_ENV !== "production" @@ -141,17 +162,22 @@ function main() { registerAdminHttpHandlers(app, { sessionManager, adminMiddleware, db }) registerAgentWebSocketHandlers(app, { - sessionManager, + agentService, + notificationCentral, + storage: conversationStorage, authSessionMiddleware, corsMiddleware: agentWebSocketCorsMiddleware, }) process.on("SIGTERM", async () => { sessionManager.dispose() + abortController.abort() await closeDb() process.exit(0) }) + agentService.start() + return app } diff --git a/apps/freya-backend/src/session/user-session-manager.test.ts b/apps/freya-backend/src/session/user-session-manager.test.ts index 48f5764..c3a2e21 100644 --- a/apps/freya-backend/src/session/user-session-manager.test.ts +++ b/apps/freya-backend/src/session/user-session-manager.test.ts @@ -120,7 +120,7 @@ mock.module("../sources/user-sources.ts", () => ({ }), })) -mock.module("../conversations/storage.ts", () => ({ +mock.module("../conversations/db-storage.ts", () => ({ conversations: (_db: Database, userId: string) => ({ async getOrCreateConversation(): Promise<{ id: string }> { mockConversationCalls.push({ type: "getOrCreate", userId }) diff --git a/apps/freya-backend/src/session/user-session-manager.ts b/apps/freya-backend/src/session/user-session-manager.ts index 18143d1..221db6d 100644 --- a/apps/freya-backend/src/session/user-session-manager.ts +++ b/apps/freya-backend/src/session/user-session-manager.ts @@ -8,7 +8,7 @@ import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { CredentialEncryptor } from "../lib/crypto.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts" -import { conversations } from "../conversations/storage.ts" +import { conversations } from "../conversations/db-storage.ts" import { CredentialStorageUnavailableError, InvalidSourceConfigError, diff --git a/apps/freya-backend/src/session/user-session.ts b/apps/freya-backend/src/session/user-session.ts index 2ee5c95..a6aa719 100644 --- a/apps/freya-backend/src/session/user-session.ts +++ b/apps/freya-backend/src/session/user-session.ts @@ -263,18 +263,12 @@ export class UserSession { const conversation = await conversationStorage.getOrCreateConversation() const entries = await conversationStorage.listEntries(conversation.id) - this.queryAgent = new ConversationRecordingQueryAgent({ - agent: new PiQueryAgent({ - toolbox: this.toolbox, - apiKey: this.agentConfig?.apiKey, - cwd: this.agentConfig?.cwd, - systemPrompt: this.agentConfig?.systemPrompt, - initialEntries: entries, - }), - storage: conversationStorage, - defaultConversationId: conversation.id, - modelProvider: PI_MODEL_PROVIDER, - modelId: PI_MODEL_ID, + this.queryAgent = new PiQueryAgent({ + toolbox: this.toolbox, + apiKey: this.agentConfig?.apiKey, + cwd: this.agentConfig?.cwd, + systemPrompt: this.agentConfig?.systemPrompt, + initialEntries: entries, }) } diff --git a/apps/freya-backend/src/sources/http.test.ts b/apps/freya-backend/src/sources/http.test.ts index abee1f8..b63fdef 100644 --- a/apps/freya-backend/src/sources/http.test.ts +++ b/apps/freya-backend/src/sources/http.test.ts @@ -128,7 +128,7 @@ mock.module("../sources/user-sources.ts", () => ({ }, })) -mock.module("../conversations/storage.ts", () => ({ +mock.module("../conversations/db-storage.ts", () => ({ conversations: (_db: Database, userId: string) => ({ async getOrCreateConversation() { return { id: `conversation-${userId}` } diff --git a/bun.lock b/bun.lock index a33c138..7ed832c 100644 --- a/bun.lock +++ b/bun.lock @@ -51,7 +51,7 @@ "version": "0.0.0", "dependencies": { "@freya/agent-protocol": "workspace:*", - "@nym.sh/jrpc": "^0.1.0", + "@nym.sh/jrpc": "1.1.0", }, }, "apps/freya-backend": { @@ -69,13 +69,15 @@ "@freya/source-tfl": "workspace:*", "@freya/source-weatherkit": "workspace:*", "@freya/source-web-search": "workspace:*", - "@nym.sh/jrpc": "^0.1.0", + "@nym.sh/jrpc": "1.1.0", "@openrouter/sdk": "^0.9.11", "arktype": "^2.1.29", "better-auth": "^1", "drizzle-orm": "^0.45.1", "hono": "^4", "lodash.merge": "^4.6.2", + "pino": "^10.3.1", + "pino-pretty": "^13.1.3", "typebox": "^1.1.38", }, "devDependencies": { @@ -841,7 +843,7 @@ "@nolyfill/is-core-module": ["@nolyfill/is-core-module@1.0.39", "", {}, "sha512-nn5ozdjYQpUCZlWGuxcJY/KpxkWQs4DcbMCmKojjyrYDEAGy4Ce19NN4v5MduafTwJlbKc99UA8YhSVqq9yPZA=="], - "@nym.sh/jrpc": ["@nym.sh/jrpc@0.1.0", "", {}, "sha512-qH+vqKojPrX4RkW67U2R4J98uWHxZOwYxX2J5GLZcfm/yjklCcN5zTfDNLfgAa9jAoOFVscC3DFWhvdZOmN3fA=="], + "@nym.sh/jrpc": ["@nym.sh/jrpc@1.1.0", "", {}, "sha512-212SYMB37GdL8enaRTTqG/LNa5bJ7eYth6jfQfECuedQCuaju0bOMUzCN6hvY5KkrxdYuqVKmr2Uz+ZZTjPlaQ=="], "@nym.sh/jrx": ["@nym.sh/jrx@0.2.0", "", { "peerDependencies": { "@json-render/core": ">=0.10.0" } }, "sha512-jd7Z1Q6T21366MtSUnwCFiu6Yl1AdNc9s5m6HxeUg265P+0enZCiyyxOuHsFwvpUcSEs/2DVBsqfMptdca44lA=="], @@ -907,6 +909,8 @@ "@oxlint/binding-win32-x64-msvc": ["@oxlint/binding-win32-x64-msvc@1.52.0", "", { "os": "win32", "cpu": "x64" }, "sha512-wikx9I9J9/lPOZlrCCNgm8YjWkia8NZfhWd1TTvZTMguyChbw/oA2VEM6Fzx+kkpA+1qu5Mo7nrLdOXEJavw8g=="], + "@pinojs/redact": ["@pinojs/redact@0.4.0", "", {}, "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg=="], + "@prisma/client": ["@prisma/client@7.4.2", "", { "dependencies": { "@prisma/client-runtime-utils": "7.4.2" }, "peerDependencies": { "prisma": "*", "typescript": ">=5.4.0" }, "optionalPeers": ["prisma", "typescript"] }, "sha512-ts2mu+cQHriAhSxngO3StcYubBGTWDtu/4juZhXCUKOwgh26l+s4KD3vT2kMUzFyrYnll9u/3qWrtzRv9CGWzA=="], "@prisma/client-runtime-utils": ["@prisma/client-runtime-utils@7.4.2", "", {}, "sha512-cID+rzOEb38VyMsx5LwJMEY4NGIrWCNpKu/0ImbeooQ2Px7TI+kOt7cm0NelxUzF2V41UVVXAmYjANZQtCu1/Q=="], @@ -1677,6 +1681,8 @@ "async-limiter": ["async-limiter@1.0.1", "", {}, "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ=="], + "atomic-sleep": ["atomic-sleep@1.0.0", "", {}, "sha512-kNOjDqAh7px0XWNI+4QbzoiR/nTkHAWNud2uvnJquD1/x5a7EQZMJT0AczqK0Qn67oY/TTQ1LbUKajZpp3I9tQ=="], + "available-typed-arrays": ["available-typed-arrays@1.0.7", "", { "dependencies": { "possible-typed-array-names": "^1.0.0" } }, "sha512-wvUjBtSGN7+7SjNpq/9M2Tg350UZD3q62IFZLbRAR1bSMlCo1ZaeW+BJ+D090e4hIIZLBcTDWe4Mh4jvUDajzQ=="], "aws-ssl-profiles": ["aws-ssl-profiles@1.1.2", "", {}, "sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g=="], @@ -1819,6 +1825,8 @@ "color-string": ["color-string@1.9.1", "", { "dependencies": { "color-name": "^1.0.0", "simple-swizzle": "^0.2.2" } }, "sha512-shrVawQFojnZv6xM40anx4CkoDP+fZsw/ZerEMsW/pyzsRbElpsL/DBVW7q3ExxwusdNXI3lXpuhEZkzs8p5Eg=="], + "colorette": ["colorette@2.0.20", "", {}, "sha512-IfEDxwoWIjkeXL1eXcDiow4UbKjhLdq6/EuSVR9GMN7KVH3r9gQ83e73hsz1Nd1T3ijd5xv1wcWRYO+D6kCI2w=="], + "comma-separated-tokens": ["comma-separated-tokens@2.0.3", "", {}, "sha512-Fu4hJdvzeylCfQPp9SGWidpzrMs7tTrlu6Vb8XGaRGck8QSNZJJp538Wrb60Lax4fPwR64ViY468OIUTbRlGZg=="], "commander": ["commander@14.0.3", "", {}, "sha512-H+y0Jo/T1RZ9qPP4Eh1pkcQcLRglraJaSLoyOtHxu6AapkjWVCy2Sit1QQ4x3Dng8qDlSsZEet7g5Pq06MvTgw=="], @@ -1879,6 +1887,8 @@ "data-view-byte-offset": ["data-view-byte-offset@1.0.1", "", { "dependencies": { "call-bound": "^1.0.2", "es-errors": "^1.3.0", "is-data-view": "^1.0.1" } }, "sha512-BS8PfmtDGnrgYdOonGZQdLZslWIeCGFP9tpan0hi1Co2Zr2NKADsvGYA8XxuG/4UWgJ6Cjtv+YJnB6MM69QGlQ=="], + "dateformat": ["dateformat@4.6.3", "", {}, "sha512-2P0p0pFGzHS5EMnhdxQi7aJN+iMheud0UhG4dlE1DLAlvL8JHjJJTX/CSm4JXwV0Ka5nGk3zC5mcb5bUQUxxMA=="], + "debug": ["debug@4.4.3", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA=="], "decimal.js": ["decimal.js@10.6.0", "", {}, "sha512-YpgQiITW3JXGntzdUmyUR1V812Hn8T1YVXhCu+wO3OpS4eU9l4YdD3qjyiKdV6mvV29zapkMeD390UVEf2lkUg=="], @@ -1969,6 +1979,8 @@ "encodeurl": ["encodeurl@2.0.0", "", {}, "sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg=="], + "end-of-stream": ["end-of-stream@1.4.5", "", { "dependencies": { "once": "^1.4.0" } }, "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg=="], + "enhanced-resolve": ["enhanced-resolve@5.20.0", "", { "dependencies": { "graceful-fs": "^4.2.4", "tapable": "^2.3.0" } }, "sha512-/ce7+jQ1PQ6rVXwe+jKEg5hW5ciicHwIQUagZkp6IufBoY3YDgdTTY1azVs0qoRgVmvsNB+rbjLJxDAeHHtwsQ=="], "entities": ["entities@4.5.0", "", {}, "sha512-V0hjH4dGPh9Ao5p0MoRY6BVqtwCjhz6vI5LT8AJ55H+4g9/4vbHx1I54fS0XuclLhDHArPQCiMjDxjaL8fPxhw=="], @@ -2123,6 +2135,8 @@ "fast-check": ["fast-check@3.23.2", "", { "dependencies": { "pure-rand": "^6.1.0" } }, "sha512-h5+1OzzfCC3Ef7VbtKdcv7zsstUQwUDlYpUTvjeUsJAssPgLn7QzbboPtL5ro04Mq0rPOsMzl7q5hIbRs2wD1A=="], + "fast-copy": ["fast-copy@4.0.3", "", {}, "sha512-58apWr0GUiDFM8+3afrO6eYwJBn9ZAhDOzG3L+/9llab/haCARS2UIfffmOurYLwbgDRs8n0rfr6qAAPEAuAQw=="], + "fast-deep-equal": ["fast-deep-equal@3.1.3", "", {}, "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q=="], "fast-glob": ["fast-glob@3.3.3", "", { "dependencies": { "@nodelib/fs.stat": "^2.0.2", "@nodelib/fs.walk": "^1.2.3", "glob-parent": "^5.1.2", "merge2": "^1.3.0", "micromatch": "^4.0.8" } }, "sha512-7MptL8U0cqcFdzIzwOTHoilX9x5BrNqye7Z/LuC7kCMRio1EMSyqRK3BEAUD7sXRq4iT4AzTVuZdhgQ2TCvYLg=="], @@ -2131,6 +2145,8 @@ "fast-levenshtein": ["fast-levenshtein@2.0.6", "", {}, "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw=="], + "fast-safe-stringify": ["fast-safe-stringify@2.1.1", "", {}, "sha512-W+KJc2dmILlPplD/H4K9l9LcAHAfPtP6BY84uVLXQ6Evcz9Lcg33Y2z1IVblT6xdY54PXYVHEv+0Wpq8Io6zkA=="], + "fast-sha256": ["fast-sha256@1.3.0", "", {}, "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ=="], "fast-uri": ["fast-uri@3.1.0", "", {}, "sha512-iPeeDKJSWf4IEOasVVrknXpaBV0IApz/gp7S2bb7Z4Lljbl2MGJRqInZiUrQwV16cpzw/D3S5j5Julj/gT52AA=="], @@ -2293,6 +2309,8 @@ "headers-polyfill": ["headers-polyfill@4.0.3", "", {}, "sha512-IScLbePpkvO846sIwOtOTDjutRMWdXdJmXdMvk6gCBHxFO8d+QKOQedyZSxFTTFYRSmlgSTDtXqqq4pcenBXLQ=="], + "help-me": ["help-me@5.0.0", "", {}, "sha512-7xgomUX6ADmcYzFik0HzAxh/73YlKR9bmFzf51CZwR+b6YtzU2m0u49hQCqV6SvlqIqsaxovfwdvbnsw3b/zpg=="], + "hermes-compiler": ["hermes-compiler@250829098.0.10", "", {}, "sha512-TcRlZ0/TlyfJqquRFAWoyElVNnkdYRi/sEp4/Qy8/GYxjg8j2cS9D4MjuaQ+qimkmLN7AmO+44IznRf06mAr0w=="], "hermes-estree": ["hermes-estree@0.25.1", "", {}, "sha512-0wUoCcLp+5Ev5pDW2OriHC2MJCbwLwuRx+gAqMTOkGKJJiBCLjtrvy4PWUGn6MIVefecRpzoOZ/UV6iGdOr+Cw=="], @@ -2467,6 +2485,8 @@ "jose": ["jose@6.2.1", "", {}, "sha512-jUaKr1yrbfaImV7R2TN/b3IcZzsw38/chqMpo2XJ7i2F8AfM/lA4G1goC3JVEwg0H7UldTmSt3P68nt31W7/mw=="], + "joycon": ["joycon@3.1.1", "", {}, "sha512-34wB/Y7MW7bzjKRjUKTa46I2Z7eV62Rkhva+KkopW7Qvv/OSWBqvkSY7vusOPrNuZcUG3tApvdVgNB8POj3SPw=="], + "js-tokens": ["js-tokens@4.0.0", "", {}, "sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ=="], "js-yaml": ["js-yaml@4.1.1", "", { "dependencies": { "argparse": "^2.0.1" }, "bin": { "js-yaml": "bin/js-yaml.js" } }, "sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA=="], @@ -2823,6 +2843,8 @@ "ohash": ["ohash@2.0.11", "", {}, "sha512-RdR9FQrFwNBNXAr4GixM8YaRZRJ5PUWbKYbE5eOsrwAjJW0q2REGcf79oYPsLyskQCZG1PLN+S/K1V00joZAoQ=="], + "on-exit-leak-free": ["on-exit-leak-free@2.1.2", "", {}, "sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA=="], + "on-finished": ["on-finished@2.4.1", "", { "dependencies": { "ee-first": "1.1.1" } }, "sha512-oVlzkg3ENAhCk2zdv7IJwd/QUD4z2RxRwpkcGY8psCVcCYZNq4wYnVWALHM+brtuJjePWiYF/ClmuDr8Ch5+kg=="], "on-headers": ["on-headers@1.1.0", "", {}, "sha512-737ZY3yNnXy37FHkQxPzt4UZ2UWPWiCZWLvFZ4fu5cueciegX0zGPnrlY6bwRg4FdQOe9YU8MkmJwGhoMybl8A=="], @@ -2911,6 +2933,14 @@ "pify": ["pify@2.3.0", "", {}, "sha512-udgsAY+fTnvv7kI7aaxbqwWNb0AHiB0qBO89PZKPkoTmGOgdbrHDKD+0B2X4uTfJ/FT1R09r9gTsjUjNJotuog=="], + "pino": ["pino@10.3.1", "", { "dependencies": { "@pinojs/redact": "^0.4.0", "atomic-sleep": "^1.0.0", "on-exit-leak-free": "^2.1.0", "pino-abstract-transport": "^3.0.0", "pino-std-serializers": "^7.0.0", "process-warning": "^5.0.0", "quick-format-unescaped": "^4.0.3", "real-require": "^0.2.0", "safe-stable-stringify": "^2.3.1", "sonic-boom": "^4.0.1", "thread-stream": "^4.0.0" }, "bin": { "pino": "bin.js" } }, "sha512-r34yH/GlQpKZbU1BvFFqOjhISRo1MNx1tWYsYvmj6KIRHSPMT2+yHOEb1SG6NMvRoHRF0a07kCOox/9yakl1vg=="], + + "pino-abstract-transport": ["pino-abstract-transport@3.0.0", "", { "dependencies": { "split2": "^4.0.0" } }, "sha512-wlfUczU+n7Hy/Ha5j9a/gZNy7We5+cXp8YL+X+PG8S0KXxw7n/JXA3c46Y0zQznIJ83URJiwy7Lh56WLokNuxg=="], + + "pino-pretty": ["pino-pretty@13.1.3", "", { "dependencies": { "colorette": "^2.0.7", "dateformat": "^4.6.3", "fast-copy": "^4.0.0", "fast-safe-stringify": "^2.1.1", "help-me": "^5.0.0", "joycon": "^3.1.1", "minimist": "^1.2.6", "on-exit-leak-free": "^2.1.0", "pino-abstract-transport": "^3.0.0", "pump": "^3.0.0", "secure-json-parse": "^4.0.0", "sonic-boom": "^4.0.1", "strip-json-comments": "^5.0.2" }, "bin": { "pino-pretty": "bin.js" } }, "sha512-ttXRkkOz6WWC95KeY9+xxWL6AtImwbyMHrL1mSwqwW9u+vLp/WIElvHvCSDg0xO/Dzrggz1zv3rN5ovTRVowKg=="], + + "pino-std-serializers": ["pino-std-serializers@7.1.0", "", {}, "sha512-BndPH67/JxGExRgiX1dX0w1FvZck5Wa4aal9198SrRhZjH3GxKQUKIBnYJTdj2HDN3UQAS06HlfcSbQj2OHmaw=="], + "pirates": ["pirates@4.0.7", "", {}, "sha512-TfySrs/5nm8fQJDcBDuUng3VOUKsd7S+zqvbOTiGXHfxX4wK31ard+hoNuvkicM/2YFzlpDgABOevKSsB4G/FA=="], "pkce-challenge": ["pkce-challenge@5.0.1", "", {}, "sha512-wQ0b/W4Fr01qtpHlqSqspcj3EhBvimsdh0KlHhH8HRZnMsEa0ea2fTULOXOS9ccQr3om+GcGRk4e+isrZWV8qQ=="], @@ -2963,6 +2993,8 @@ "proc-log": ["proc-log@4.2.0", "", {}, "sha512-g8+OnU/L2v+wyiVK+D5fA34J7EH8jZ8DDlvwhRCMxmMj7UCBvxiO1mGeN+36JXIKF4zevU4kRBd8lVgG9vLelA=="], + "process-warning": ["process-warning@5.0.0", "", {}, "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA=="], + "progress": ["progress@2.0.3", "", {}, "sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA=="], "promise": ["promise@8.3.0", "", { "dependencies": { "asap": "~2.0.6" } }, "sha512-rZPNPKTOYVNEEKFaq1HqTgOwZD+4/YHS5ukLzQCypkj+OkYx7iv0mA91lJlpPPZ8vMau3IIGj5Qlwrx+8iiSmg=="], @@ -2979,6 +3011,8 @@ "proxy-addr": ["proxy-addr@2.0.7", "", { "dependencies": { "forwarded": "0.2.0", "ipaddr.js": "1.9.1" } }, "sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg=="], + "pump": ["pump@3.0.4", "", { "dependencies": { "end-of-stream": "^1.1.0", "once": "^1.3.1" } }, "sha512-VS7sjc6KR7e1ukRFhQSY5LM2uBWAUPiOPa/A3mkKmiMwSmRFUITt0xuj+/lesgnCv+dPIEYlkzrcyXgquIHMcA=="], + "punycode": ["punycode@2.3.1", "", {}, "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg=="], "pure-rand": ["pure-rand@6.1.0", "", {}, "sha512-bVWawvoZoBYpp6yIoQtQXHZjmz35RSVHnUOTefl8Vcjr8snTPY1wnpSPMWekcFwbxI6gtmT7rSYPFvz71ldiOA=="], @@ -2991,6 +3025,8 @@ "queue-microtask": ["queue-microtask@1.2.3", "", {}, "sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A=="], + "quick-format-unescaped": ["quick-format-unescaped@4.0.4", "", {}, "sha512-tYC1Q1hgyRuHgloV/YXs2w15unPVh8qfu/qCTfhTYamaw7fyhumKa2yGpdSo87vY32rIclj+4fWYQXUMs9EHvg=="], + "radix-ui": ["radix-ui@1.4.3", "", { "dependencies": { "@radix-ui/primitive": "1.1.3", "@radix-ui/react-accessible-icon": "1.1.7", "@radix-ui/react-accordion": "1.2.12", "@radix-ui/react-alert-dialog": "1.1.15", "@radix-ui/react-arrow": "1.1.7", "@radix-ui/react-aspect-ratio": "1.1.7", "@radix-ui/react-avatar": "1.1.10", "@radix-ui/react-checkbox": "1.3.3", "@radix-ui/react-collapsible": "1.1.12", "@radix-ui/react-collection": "1.1.7", "@radix-ui/react-compose-refs": "1.1.2", "@radix-ui/react-context": "1.1.2", "@radix-ui/react-context-menu": "2.2.16", "@radix-ui/react-dialog": "1.1.15", "@radix-ui/react-direction": "1.1.1", "@radix-ui/react-dismissable-layer": "1.1.11", "@radix-ui/react-dropdown-menu": "2.1.16", "@radix-ui/react-focus-guards": "1.1.3", "@radix-ui/react-focus-scope": "1.1.7", "@radix-ui/react-form": "0.1.8", "@radix-ui/react-hover-card": "1.1.15", "@radix-ui/react-label": "2.1.7", "@radix-ui/react-menu": "2.1.16", "@radix-ui/react-menubar": "1.1.16", "@radix-ui/react-navigation-menu": "1.2.14", "@radix-ui/react-one-time-password-field": "0.1.8", "@radix-ui/react-password-toggle-field": "0.1.3", "@radix-ui/react-popover": "1.1.15", "@radix-ui/react-popper": "1.2.8", "@radix-ui/react-portal": "1.1.9", "@radix-ui/react-presence": "1.1.5", "@radix-ui/react-primitive": "2.1.3", "@radix-ui/react-progress": "1.1.7", "@radix-ui/react-radio-group": "1.3.8", "@radix-ui/react-roving-focus": "1.1.11", "@radix-ui/react-scroll-area": "1.2.10", "@radix-ui/react-select": "2.2.6", "@radix-ui/react-separator": "1.1.7", "@radix-ui/react-slider": "1.3.6", "@radix-ui/react-slot": "1.2.3", "@radix-ui/react-switch": "1.2.6", "@radix-ui/react-tabs": "1.1.13", "@radix-ui/react-toast": "1.2.15", "@radix-ui/react-toggle": "1.1.10", "@radix-ui/react-toggle-group": "1.1.11", "@radix-ui/react-toolbar": "1.1.11", "@radix-ui/react-tooltip": "1.2.8", "@radix-ui/react-use-callback-ref": "1.1.1", "@radix-ui/react-use-controllable-state": "1.2.2", "@radix-ui/react-use-effect-event": "0.0.2", "@radix-ui/react-use-escape-keydown": "1.1.1", "@radix-ui/react-use-is-hydrated": "0.1.0", "@radix-ui/react-use-layout-effect": "1.1.1", "@radix-ui/react-use-size": "1.1.1", "@radix-ui/react-visually-hidden": "1.2.3" }, "peerDependencies": { "@types/react": "*", "@types/react-dom": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc", "react-dom": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react", "@types/react-dom"] }, "sha512-aWizCQiyeAenIdUbqEpXgRA1ya65P13NKn/W8rWkcN0OPkRDxdBVLWnIEDsS2RpwCK2nobI7oMUSmexzTDyAmA=="], "range-parser": ["range-parser@1.2.1", "", {}, "sha512-Hrgsx+orqoygnmhFbKaHE6c296J+HTAQXoxEF6gNupROmmGJRoyzfG3ccAveqCBrwr/2yxQ5BVd/GTl5agOwSg=="], @@ -3051,6 +3087,8 @@ "readdirp": ["readdirp@4.1.2", "", {}, "sha512-GDhwkLfywWL2s6vEjyhri+eXmfH6j1L7JE27WhqLeYzoh/A3DBaYGEj2H/HFZCn/kMfim73FXxEJTw06WtxQwg=="], + "real-require": ["real-require@0.2.0", "", {}, "sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg=="], + "recast": ["recast@0.23.11", "", { "dependencies": { "ast-types": "^0.16.1", "esprima": "~4.0.0", "source-map": "~0.6.1", "tiny-invariant": "^1.3.3", "tslib": "^2.0.1" } }, "sha512-YTUo+Flmw4ZXiWfQKGcwwc11KnoRAYgzAE2E7mXKCjSviTKShtxBsN6YUUBB2gtaBzKzeKunxhUwNHQuRryhWA=="], "redent": ["redent@3.0.0", "", { "dependencies": { "indent-string": "^4.0.0", "strip-indent": "^3.0.0" } }, "sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg=="], @@ -3131,12 +3169,16 @@ "safe-regex-test": ["safe-regex-test@1.1.0", "", { "dependencies": { "call-bound": "^1.0.2", "es-errors": "^1.3.0", "is-regex": "^1.2.1" } }, "sha512-x/+Cz4YrimQxQccJf5mKEbIa1NzeCRNI5Ecl/ekmlYaampdNLPalVyIcCZNNH3MvmqBugV5TMYZXv0ljslUlaw=="], + "safe-stable-stringify": ["safe-stable-stringify@2.5.0", "", {}, "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA=="], + "safer-buffer": ["safer-buffer@2.1.2", "", {}, "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg=="], "sax": ["sax@1.5.0", "", {}, "sha512-21IYA3Q5cQf089Z6tgaUTr7lDAyzoTPx5HRtbhsME8Udispad8dC/+sziTNugOEx54ilvatQ9YCzl4KQLPcRHA=="], "scheduler": ["scheduler@0.27.0", "", {}, "sha512-eNv+WrVbKu1f3vbYJT/xtiF5syA5HPIMtf9IgY/nKg0sWqzAUEvqY/xm7OcZc/qafLx/iO9FgOmeSAp4v5ti/Q=="], + "secure-json-parse": ["secure-json-parse@4.1.0", "", {}, "sha512-l4KnYfEyqYJxDwlNVyRfO2E4NTHfMKAWdUuA8J0yve2Dz/E/PdBepY03RvyJpssIpRFwJoCD55wA+mEDs6ByWA=="], + "semver": ["semver@7.7.4", "", { "bin": { "semver": "bin/semver.js" } }, "sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA=="], "send": ["send@1.2.1", "", { "dependencies": { "debug": "^4.4.3", "encodeurl": "^2.0.0", "escape-html": "^1.0.3", "etag": "^1.8.1", "fresh": "^2.0.0", "http-errors": "^2.0.1", "mime-types": "^3.0.2", "ms": "^2.1.3", "on-finished": "^2.4.1", "range-parser": "^1.2.1", "statuses": "^2.0.2" } }, "sha512-1gnZf7DFcoIcajTjTwjwuDjzuz4PPcY2StKPlsGAQ1+YH20IRVrBaXSWmdjowTJ6u8Rc01PoYOGHXfP1mYcZNQ=="], @@ -3195,6 +3237,8 @@ "slugify": ["slugify@1.6.6", "", {}, "sha512-h+z7HKHYXj6wJU+AnS/+IH8Uh9fdcX1Lrhg1/VMdf9PwoBQXFcXiAdsy2tSK0P6gKwJLXp02r90ahUCqHk9rrw=="], + "sonic-boom": ["sonic-boom@4.2.1", "", { "dependencies": { "atomic-sleep": "^1.0.0" } }, "sha512-w6AxtubXa2wTXAUsZMMWERrsIRAdrK0Sc+FUytWvYAhBJLyuI4llrMIC1DtlNSdI99EI86KZum2MMq3EAZlF9Q=="], + "sonner": ["sonner@2.0.7", "", { "peerDependencies": { "react": "^18.0.0 || ^19.0.0 || ^19.0.0-rc", "react-dom": "^18.0.0 || ^19.0.0 || ^19.0.0-rc" } }, "sha512-W6ZN4p58k8aDKA4XPcx2hpIQXBRAgyiWVkYhT7CvK6D3iAu7xjvVyhQHg2/iaKJZ1XVJ4r7XuwGL+WGEK37i9w=="], "source-map": ["source-map@0.6.1", "", {}, "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g=="], @@ -3263,7 +3307,7 @@ "strip-indent": ["strip-indent@3.0.0", "", { "dependencies": { "min-indent": "^1.0.0" } }, "sha512-laJTa3Jb+VQpaC6DseHhF7dXVqHTfJPCRDaEbid/drOhgitgYku/letMUqOXFoWV0zIIUbjpdH2t+tYj4bQMRQ=="], - "strip-json-comments": ["strip-json-comments@3.1.1", "", {}, "sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig=="], + "strip-json-comments": ["strip-json-comments@5.0.3", "", {}, "sha512-1tB5mhVo7U+ETBKNf92xT4hrQa3pm0MZ0PQvuDnWgAAGHDsfp4lPSpiS6psrSiet87wyGPh9ft6wmhOMQ0hDiw=="], "strnum": ["strnum@2.4.0", "", { "dependencies": { "anynum": "^1.0.0" } }, "sha512-sHrVyWWdq28RbhjuJdZsA1SnGRJV6NiXbk6AXBxDOsgAcA+lmpUZCYjOdLBxkXMwis6RRe7dlZt4VlIWFVzkmg=="], @@ -3301,6 +3345,8 @@ "thenify-all": ["thenify-all@1.6.0", "", { "dependencies": { "thenify": ">= 3.1.0 < 4" } }, "sha512-RNxQH/qI8/t3thXJDwcstUO4zeqo64+Uy/+sNVRBx4Xn2OX+OZ9oP+iJnNFqplFra2ZUVeKCSa2oVWi3T4uVmA=="], + "thread-stream": ["thread-stream@4.2.0", "", { "dependencies": { "real-require": "^1.0.0" } }, "sha512-e2zZ96wSChazBsbENf/Pcm/4swHt2cEKQ92rhUjkL9GCKiTDJIaTBenjE/m9DXi0QBmTMDkFDdOomUy20A1tDQ=="], + "throat": ["throat@5.0.0", "", {}, "sha512-fcwX4mndzpLQKBS1DVYhGAcYaYt7vsHNIvQV+WXMvnow5cgjPphq5CaayLaGsjRdSCKZFNGt7/GYAuXaNOiYCA=="], "tiny-invariant": ["tiny-invariant@1.3.3", "", {}, "sha512-+FbBPE1o9QAYvviau/qC5SE3caw21q3xkvWKBtja5vgqOWIHHJ3ioaq1VPfn/Szqctz2bU/oYeKd9/z5BL+PVg=="], @@ -3633,6 +3679,8 @@ "@eslint/eslintrc/minimatch": ["minimatch@3.1.5", "", { "dependencies": { "brace-expansion": "^1.1.7" } }, "sha512-VgjWUsnnT6n+NUk6eZq77zeFdpW2LWDzP6zFGrCbHXiYNul5Dzqk2HHQ5uFH2DNW5Xbp8+jVzaeNt94ssEEl4w=="], + "@eslint/eslintrc/strip-json-comments": ["strip-json-comments@3.1.1", "", {}, "sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig=="], + "@expo/cli/accepts": ["accepts@1.3.8", "", { "dependencies": { "mime-types": "~2.1.34", "negotiator": "0.6.3" } }, "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw=="], "@expo/cli/chalk": ["chalk@4.1.2", "", { "dependencies": { "ansi-styles": "^4.1.0", "supports-color": "^7.1.0" } }, "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA=="], @@ -4015,6 +4063,8 @@ "terser/commander": ["commander@2.20.3", "", {}, "sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ=="], + "thread-stream/real-require": ["real-require@1.0.0", "", {}, "sha512-P4nbQYQfePJxRSmY+v/KINxVucm4NF3p3s7pJveMTtom52FR4YGltUQLB8idDXwDDWW+eYrWDFbuzUnjoWHF7g=="], + "twrnc/tailwindcss": ["tailwindcss@3.4.19", "", { "dependencies": { "@alloc/quick-lru": "^5.2.0", "arg": "^5.0.2", "chokidar": "^3.6.0", "didyoumean": "^1.2.2", "dlv": "^1.1.3", "fast-glob": "^3.3.2", "glob-parent": "^6.0.2", "is-glob": "^4.0.3", "jiti": "^1.21.7", "lilconfig": "^3.1.3", "micromatch": "^4.0.8", "normalize-path": "^3.0.0", "object-hash": "^3.0.0", "picocolors": "^1.1.1", "postcss": "^8.4.47", "postcss-import": "^15.1.0", "postcss-js": "^4.0.1", "postcss-load-config": "^4.0.2 || ^5.0 || ^6.0", "postcss-nested": "^6.2.0", "postcss-selector-parser": "^6.1.2", "resolve": "^1.22.8", "sucrase": "^3.35.0" }, "bin": { "tailwind": "lib/cli.js", "tailwindcss": "lib/cli.js" } }, "sha512-3ofp+LL8E+pK/JuPLPggVAIaEuhvIz4qNcf3nA1Xn2o/7fb7s/TYpHhwGDv1ZU3PkBluUVaF8PyCHcm48cKLWQ=="], "vite/esbuild": ["esbuild@0.27.3", "", { "optionalDependencies": { "@esbuild/aix-ppc64": "0.27.3", "@esbuild/android-arm": "0.27.3", "@esbuild/android-arm64": "0.27.3", "@esbuild/android-x64": "0.27.3", "@esbuild/darwin-arm64": "0.27.3", "@esbuild/darwin-x64": "0.27.3", "@esbuild/freebsd-arm64": "0.27.3", "@esbuild/freebsd-x64": "0.27.3", "@esbuild/linux-arm": "0.27.3", "@esbuild/linux-arm64": "0.27.3", "@esbuild/linux-ia32": "0.27.3", "@esbuild/linux-loong64": "0.27.3", "@esbuild/linux-mips64el": "0.27.3", "@esbuild/linux-ppc64": "0.27.3", "@esbuild/linux-riscv64": "0.27.3", "@esbuild/linux-s390x": "0.27.3", "@esbuild/linux-x64": "0.27.3", "@esbuild/netbsd-arm64": "0.27.3", "@esbuild/netbsd-x64": "0.27.3", "@esbuild/openbsd-arm64": "0.27.3", "@esbuild/openbsd-x64": "0.27.3", "@esbuild/openharmony-arm64": "0.27.3", "@esbuild/sunos-x64": "0.27.3", "@esbuild/win32-arm64": "0.27.3", "@esbuild/win32-ia32": "0.27.3", "@esbuild/win32-x64": "0.27.3" }, "bin": { "esbuild": "bin/esbuild" } }, "sha512-8VwMnyGCONIs6cWue2IdpHxHnAjzxnw2Zr7MkVxB2vjmQ2ivqGFb4LEG3SMnv0Gb2F/G/2yA8zUaiL1gywDCCg=="],