Compare commits

..

6 Commits

29 changed files with 3465 additions and 405 deletions

View File

@@ -1 +1,49 @@
CREATE INDEX "user_sources_user_id_enabled_idx" ON "user_sources" USING btree ("user_id","enabled"); CREATE INDEX "user_sources_user_id_enabled_idx" ON "user_sources" USING btree ("user_id","enabled");--> statement-breakpoint
CREATE TABLE "conversation_entries" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"conversation_id" uuid NOT NULL,
"sequence" integer NOT NULL,
"kind" text NOT NULL,
"visibility" text DEFAULT 'internal' NOT NULL,
"file_id" uuid,
"payload" jsonb NOT NULL,
"metadata" jsonb DEFAULT '{}'::jsonb NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
CONSTRAINT "conversation_entries_conversation_id_sequence_unique" UNIQUE("conversation_id","sequence")
);
--> statement-breakpoint
CREATE TABLE "conversations" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"user_id" text NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE TABLE "files" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"user_id" text NOT NULL,
"storage_key" text NOT NULL,
"original_name" text,
"mime_type" text NOT NULL,
"size_bytes" integer NOT NULL,
"metadata" jsonb DEFAULT '{}'::jsonb NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
CONSTRAINT "files_storage_key_unique" UNIQUE("storage_key")
);
--> statement-breakpoint
ALTER TABLE "session" ADD COLUMN "impersonated_by" text;--> statement-breakpoint
ALTER TABLE "user" ADD COLUMN "role" text;--> statement-breakpoint
ALTER TABLE "user" ADD COLUMN "banned" boolean DEFAULT false;--> statement-breakpoint
ALTER TABLE "user" ADD COLUMN "ban_reason" text;--> statement-breakpoint
ALTER TABLE "user" ADD COLUMN "ban_expires" timestamp;--> statement-breakpoint
ALTER TABLE "conversation_entries" ADD CONSTRAINT "conversation_entries_conversation_id_conversations_id_fk" FOREIGN KEY ("conversation_id") REFERENCES "public"."conversations"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "conversation_entries" ADD CONSTRAINT "conversation_entries_file_id_files_id_fk" FOREIGN KEY ("file_id") REFERENCES "public"."files"("id") ON DELETE restrict ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "conversation_entries" ADD CONSTRAINT "conversation_entries_attachment_file_id_check" CHECK (("conversation_entries"."kind" = 'attachment' and "conversation_entries"."file_id" is not null) or ("conversation_entries"."kind" <> 'attachment' and "conversation_entries"."file_id" is null));--> statement-breakpoint
ALTER TABLE "conversations" ADD CONSTRAINT "conversations_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "files" ADD CONSTRAINT "files_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "conversation_entries_conversation_id_sequence_idx" ON "conversation_entries" USING btree ("conversation_id","sequence");--> statement-breakpoint
CREATE INDEX "conversation_entries_conversation_id_visibility_sequence_idx" ON "conversation_entries" USING btree ("conversation_id","visibility","sequence");--> statement-breakpoint
CREATE INDEX "conversation_entries_kind_idx" ON "conversation_entries" USING btree ("kind");--> statement-breakpoint
CREATE INDEX "conversation_entries_file_id_idx" ON "conversation_entries" USING btree ("file_id");--> statement-breakpoint
CREATE INDEX "conversations_user_id_updated_at_idx" ON "conversations" USING btree ("user_id","updated_at");--> statement-breakpoint
CREATE INDEX "files_user_id_created_at_idx" ON "files" USING btree ("user_id","created_at");

File diff suppressed because it is too large Load Diff

View File

@@ -44,6 +44,20 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
mock.module("../conversations/storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({
async getOrCreateConversation() {
return { id: `conversation-${userId}` }
},
async listEntries() {
return []
},
async appendEntry() {
return { id: "entry-1", sequence: 1 }
},
}),
}))
function createStubSource(id: string): FeedSource { function createStubSource(id: string): FeedSource {
return { return {
id, id,

View File

@@ -0,0 +1,347 @@
import { describe, expect, test } from "bun:test"
import type { AppendConversationEntryInput } from "../conversations/storage.ts"
import type {
ConversationStorage,
ConversationStorageEntry,
} from "./conversation-recording-query-agent.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { ConversationRecordingQueryAgent } from "./conversation-recording-query-agent.ts"
import {
createQueryAgentEventListeners,
QueryAgentEvent,
type QueryAgent,
type QueryAgentAsk,
type QueryAgentCompactionEvent,
type QueryAgentEventListeners,
type QueryAgentEventListener,
type QueryAgentEventMap,
type QueryAgentStreamEvent,
} from "./query-agent.ts"
interface RecordedEntry {
conversationId: string
input: AppendConversationEntryInput
}
class FakeQueryAgent implements QueryAgent {
readonly inputs: QueryAgentAsk[] = []
private readonly events: QueryAgentStreamEvent[]
private readonly eventListeners = createQueryAgentEventListeners()
constructor(events: QueryAgentStreamEvent[]) {
this.events = events
}
async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentStreamEvent> {
this.inputs.push(input)
for (const event of this.events) {
yield event
}
}
addEventListener<T extends QueryAgentEvent>(
type: T,
listener: QueryAgentEventListener<T>,
): () => void {
const listeners = this.listenersFor(type)
listeners.add(listener)
return () => {
listeners.delete(listener)
}
}
async emitCompaction(event: QueryAgentCompactionEvent): Promise<void> {
await this.emitEvent(event)
}
private async emitEvent<T extends QueryAgentEvent>(event: QueryAgentEventMap[T]): Promise<void> {
const listeners = this.listenersFor(event.type)
for (const listener of listeners) {
await listener(event)
}
}
private listenersFor<T extends QueryAgentEvent>(type: T): QueryAgentEventListeners[T] {
return this.eventListeners[type]
}
dispose(): void {}
}
class FakeConversationStorage implements ConversationStorage {
getOrCreateCount = 0
readonly entries: RecordedEntry[] = []
conversationId = "conversation-1"
async getOrCreateConversation(): Promise<{ id: string }> {
this.getOrCreateCount += 1
return { id: this.conversationId }
}
async appendEntry(
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationStorageEntry> {
this.entries.push({ conversationId, input })
return {
id: `entry-${this.entries.length}`,
sequence: this.entries.length,
kind: input.kind,
payload: input.payload,
metadata: input.metadata ?? {},
createdAt: new Date("2026-06-15T00:00:00.000Z"),
}
}
async listEntries(_conversationId: string): Promise<ConversationStorageEntry[]> {
return this.entries.map((entry, index) => ({
id: `entry-${index + 1}`,
sequence: index + 1,
kind: entry.input.kind,
payload: entry.input.payload,
metadata: entry.input.metadata ?? {},
createdAt: new Date("2026-06-15T00:00:00.000Z"),
}))
}
}
describe("ConversationRecordingQueryAgent", () => {
test("records user and assistant messages in the conversation timeline", async () => {
const queryAgent = new FakeQueryAgent([
{ type: "text_delta", text: "Hello " },
{ type: "text_delta", text: "there." },
{ type: "done" },
])
const storage = new FakeConversationStorage()
const agent = new ConversationRecordingQueryAgent({
agent: queryAgent,
storage,
modelProvider: "openrouter",
modelId: "test-model",
})
const events = await collectEvents(
agent.ask({
message: "hi",
}),
)
expect(events[0]).toEqual({ type: "conversation", conversationId: "conversation-1" })
expect(queryAgent.inputs[0]?.conversationId).toBe("conversation-1")
expect(storage.getOrCreateCount).toBe(1)
expect(storage.entries).toHaveLength(2)
const userEntry = storage.entries[0]!.input
if (userEntry.kind !== ConversationEntryKind.UserMessage) {
throw new Error("Expected user message entry")
}
expect(userEntry.payload.parts).toEqual([{ type: "text", text: "hi" }])
const assistantEntry = storage.entries[1]!.input
if (assistantEntry.kind !== ConversationEntryKind.AssistantMessage) {
throw new Error("Expected assistant message entry")
}
expect(assistantEntry.payload.parts).toEqual([{ type: "text", text: "Hello there." }])
expect(assistantEntry.metadata?.modelRun?.provider).toBe("openrouter")
expect(assistantEntry.metadata?.modelRun?.model).toBe("test-model")
})
test("uses a provided conversation id without creating a default conversation", async () => {
const queryAgent = new FakeQueryAgent([{ type: "done" }])
const storage = new FakeConversationStorage()
const agent = new ConversationRecordingQueryAgent({
agent: queryAgent,
storage,
modelProvider: "openrouter",
modelId: "test-model",
})
const events = await collectEvents(
agent.ask({
conversationId: "conversation-existing",
message: "continue",
}),
)
expect(events[0]).toEqual({
type: "conversation",
conversationId: "conversation-existing",
})
expect(storage.getOrCreateCount).toBe(0)
expect(storage.entries[0]?.conversationId).toBe("conversation-existing")
expect(queryAgent.inputs[0]?.conversationId).toBe("conversation-existing")
})
test("uses the eager default conversation id without reading storage on ask", async () => {
const queryAgent = new FakeQueryAgent([{ type: "done" }])
const storage = new FakeConversationStorage()
const agent = new ConversationRecordingQueryAgent({
agent: queryAgent,
storage,
defaultConversationId: "conversation-eager",
modelProvider: "openrouter",
modelId: "test-model",
})
const events = await collectEvents(
agent.ask({
message: "continue",
}),
)
expect(events[0]).toEqual({
type: "conversation",
conversationId: "conversation-eager",
})
expect(storage.getOrCreateCount).toBe(0)
expect(storage.entries[0]?.conversationId).toBe("conversation-eager")
expect(queryAgent.inputs[0]?.conversationId).toBe("conversation-eager")
})
test("rejects switching away from the eager default conversation", async () => {
const queryAgent = new FakeQueryAgent([{ type: "done" }])
const storage = new FakeConversationStorage()
const agent = new ConversationRecordingQueryAgent({
agent: queryAgent,
storage,
defaultConversationId: "conversation-eager",
modelProvider: "openrouter",
modelId: "test-model",
})
const events = await collectEvents(
agent.ask({
conversationId: "conversation-other",
message: "continue",
}),
)
expect(events).toEqual([
{
type: "error",
message: "Conversation switching is not supported for this session",
},
])
expect(storage.entries).toHaveLength(0)
expect(queryAgent.inputs).toHaveLength(0)
})
test("records tool activity and agent errors as internal entries", async () => {
const queryAgent = new FakeQueryAgent([
{ type: "tool_start", toolName: "freya_get_feed" },
{ type: "tool_end", toolName: "freya_get_feed", ok: true },
{ type: "error", message: "model unavailable" },
])
const storage = new FakeConversationStorage()
const agent = new ConversationRecordingQueryAgent({
agent: queryAgent,
storage,
modelProvider: "openrouter",
modelId: "test-model",
})
await collectEvents(
agent.ask({
message: "what now?",
}),
)
expect(storage.entries.map((entry) => entry.input.kind)).toEqual([
ConversationEntryKind.UserMessage,
ConversationEntryKind.ToolCall,
ConversationEntryKind.ToolResult,
ConversationEntryKind.SystemNote,
])
const toolCall = storage.entries[1]!.input
if (toolCall.kind !== ConversationEntryKind.ToolCall) {
throw new Error("Expected tool call entry")
}
expect(toolCall.payload.toolName).toBe("freya_get_feed")
const toolResult = storage.entries[2]!.input
if (toolResult.kind !== ConversationEntryKind.ToolResult) {
throw new Error("Expected tool result entry")
}
expect(toolResult.payload.ok).toBe(true)
const systemNote = storage.entries[3]!.input
if (systemNote.kind !== ConversationEntryKind.SystemNote) {
throw new Error("Expected system note entry")
}
expect(systemNote.payload).toMatchObject({
type: "agent_error",
message: "model unavailable",
})
})
test("records compaction events as context summaries", async () => {
const queryAgent = new FakeQueryAgent([
{ type: "text_delta", text: "Kept answer." },
{ type: "done" },
])
const storage = new FakeConversationStorage()
const agent = new ConversationRecordingQueryAgent({
agent: queryAgent,
storage,
defaultConversationId: "conversation-1",
modelProvider: "openrouter",
modelId: "test-model",
})
const forwardedCompactions: QueryAgentCompactionEvent[] = []
agent.addEventListener(QueryAgentEvent.Compaction, (event) => {
forwardedCompactions.push(event)
})
await collectEvents(
agent.ask({
message: "remember this",
}),
)
await queryAgent.emitCompaction({
type: QueryAgentEvent.Compaction,
conversationId: "conversation-1",
summary: "The user prefers compact summaries.",
firstKeptEntryId: "pi-entry-7",
compactedEntryRange: {
startSequence: 1,
endSequence: 1,
},
tokensBefore: 1234,
details: { reason: "threshold" },
fromExtension: false,
})
const summaryEntry = storage.entries.at(-1)?.input
if (summaryEntry?.kind !== ConversationEntryKind.ContextSummary) {
throw new Error("Expected context summary entry")
}
expect(summaryEntry.payload.covers).toEqual({
startSequence: 1,
endSequence: 1,
})
expect(summaryEntry.payload.summary.importantDetails).toEqual([
"The user prefers compact summaries.",
])
expect(summaryEntry.metadata?.piCompaction).toMatchObject({
firstKeptEntryId: "pi-entry-7",
tokensBefore: 1234,
fromExtension: false,
details: { reason: "threshold" },
})
expect(forwardedCompactions).toHaveLength(1)
})
})
async function collectEvents(
events: AsyncIterable<QueryAgentStreamEvent>,
): Promise<QueryAgentStreamEvent[]> {
const result: QueryAgentStreamEvent[] = []
for await (const event of events) {
result.push(event)
}
return result
}

View File

@@ -0,0 +1,252 @@
import { randomUUID } from "node:crypto"
import type {
AppendConversationEntryInput,
ConversationEntryRow,
} from "../conversations/storage.ts"
import type { ConversationEntryMetadata } from "../conversations/types.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import {
createQueryAgentEventListeners,
QueryAgentEvent,
type QueryAgent,
type QueryAgentAsk,
type QueryAgentCompactionEvent,
type QueryAgentEventListeners,
type QueryAgentEventListener,
type QueryAgentEventMap,
type QueryAgentStreamEvent,
} from "./query-agent.ts"
export interface ConversationStorage {
getOrCreateConversation(): Promise<{ id: string }>
appendEntry(
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationStorageEntry>
listEntries(conversationId: string): Promise<ConversationStorageEntry[]>
}
export type ConversationStorageEntry = Pick<
ConversationEntryRow,
"id" | "sequence" | "kind" | "payload" | "metadata" | "createdAt"
>
export interface ConversationRecordingQueryAgentConfig {
agent: QueryAgent
storage: ConversationStorage
defaultConversationId?: string
route?: string
modelProvider: string
modelId: string
}
const DefaultRoute = "agent_query"
export class ConversationRecordingQueryAgent implements QueryAgent {
private readonly agent: QueryAgent
private readonly storage: ConversationStorage
private readonly defaultConversationId: string | undefined
private readonly route: string
private readonly modelProvider: string
private readonly modelId: string
private readonly eventListeners = createQueryAgentEventListeners()
private readonly removeAgentCompactionListener: () => void
constructor(config: ConversationRecordingQueryAgentConfig) {
this.agent = config.agent
this.storage = config.storage
this.defaultConversationId = config.defaultConversationId
this.route = config.route ?? DefaultRoute
this.modelProvider = config.modelProvider
this.modelId = config.modelId
this.removeAgentCompactionListener = this.agent.addEventListener(
QueryAgentEvent.Compaction,
async (event) => {
await this.appendCompactionSummary(event)
await this.emitEvent(event)
},
)
}
async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentStreamEvent> {
if (
this.defaultConversationId &&
input.conversationId &&
input.conversationId !== this.defaultConversationId
) {
yield {
type: "error",
message: "Conversation switching is not supported for this session",
}
return
}
const conversationId =
input.conversationId ??
this.defaultConversationId ??
(await this.storage.getOrCreateConversation()).id
const runId = randomUUID()
const userEntry = await this.storage.appendEntry(conversationId, {
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: input.message }],
},
metadata: { runId },
})
yield { type: "conversation", conversationId }
const assistantText: string[] = []
for await (const event of this.agent.ask({
...input,
conversationId,
userMessageEntry: {
id: userEntry.id,
sequence: userEntry.sequence,
},
})) {
switch (event.type) {
case "conversation":
break
case "text_delta":
assistantText.push(event.text)
yield event
break
case "tool_start":
await this.storage.appendEntry(conversationId, {
kind: ConversationEntryKind.ToolCall,
payload: {
toolName: event.toolName,
runId,
},
metadata: { runId },
})
yield event
break
case "tool_end":
await this.storage.appendEntry(conversationId, {
kind: ConversationEntryKind.ToolResult,
payload: {
toolName: event.toolName,
ok: event.ok,
runId,
},
metadata: { runId },
})
yield event
break
case "error":
await this.storage.appendEntry(conversationId, {
kind: ConversationEntryKind.SystemNote,
payload: {
type: "agent_error",
message: event.message,
runId,
},
metadata: { runId },
})
yield event
return
case "done":
await this.appendAssistantMessage(conversationId, assistantText, runId)
yield event
return
}
}
await this.appendAssistantMessage(conversationId, assistantText, runId)
}
dispose(): void {
this.removeAgentCompactionListener()
this.clearEventListeners()
this.agent.dispose()
}
addEventListener<T extends QueryAgentEvent>(
type: T,
listener: QueryAgentEventListener<T>,
): () => void {
const listeners = this.listenersFor(type)
listeners.add(listener)
return () => {
listeners.delete(listener)
}
}
private async appendAssistantMessage(
conversationId: string,
assistantText: string[],
runId: string,
): Promise<void> {
const text = assistantText.join("")
if (text.length === 0) return
await this.storage.appendEntry(conversationId, {
kind: ConversationEntryKind.AssistantMessage,
payload: {
role: "assistant",
parts: [{ type: "text", text }],
},
metadata: this.modelRunMetadata(runId),
})
}
private modelRunMetadata(runId: string): ConversationEntryMetadata {
const metadata: ConversationEntryMetadata = { runId }
metadata.modelRun = {
route: this.route,
provider: this.modelProvider,
model: this.modelId,
}
return metadata
}
private async appendCompactionSummary(event: QueryAgentCompactionEvent): Promise<void> {
if (event.compactedEntryRange === null) return
await this.storage.appendEntry(event.conversationId, {
kind: ConversationEntryKind.ContextSummary,
payload: {
covers: event.compactedEntryRange,
summary: {
durableFacts: [],
preferences: [],
decisions: [],
openTasks: [],
importantDetails: [event.summary],
},
promptVersion: "pi-sdk-compaction-v1",
},
metadata: {
piCompaction: {
firstKeptEntryId: event.firstKeptEntryId,
tokensBefore: event.tokensBefore,
fromExtension: event.fromExtension,
details: event.details,
},
},
})
}
private async emitEvent<T extends QueryAgentEvent>(event: QueryAgentEventMap[T]): Promise<void> {
const listeners = this.listenersFor(event.type)
for (const listener of listeners) {
await listener(event)
}
}
private listenersFor<T extends QueryAgentEvent>(type: T): QueryAgentEventListeners[T] {
return this.eventListeners[type]
}
private clearEventListeners(): void {
for (const listeners of Object.values(this.eventListeners)) {
listeners.clear()
}
}
}

View File

@@ -3,7 +3,13 @@ import { Hono } from "hono"
import type { UserSessionManager } from "../session/index.ts" import type { UserSessionManager } from "../session/index.ts"
import type { QueryDebugTools, QueryDebugToolDefinition } from "./debug-tools.ts" import type { QueryDebugTools, QueryDebugToolDefinition } from "./debug-tools.ts"
import type { QueryAgent, QueryAgentAsk, QueryAgentEvent } from "./query-agent.ts" import type {
QueryAgent,
QueryAgentAsk,
QueryAgentEventListener,
QueryAgentStreamEvent,
QueryAgentEvent,
} from "./query-agent.ts"
import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts" import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts"
import { registerAgentHttpHandlers, registerDebugAgentHttpHandlers } from "./http.ts" import { registerAgentHttpHandlers, registerDebugAgentHttpHandlers } from "./http.ts"
@@ -12,19 +18,26 @@ const MockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn"
class FakeQueryAgent implements QueryAgent { class FakeQueryAgent implements QueryAgent {
readonly inputs: QueryAgentAsk[] = [] readonly inputs: QueryAgentAsk[] = []
private readonly events: QueryAgentEvent[] private readonly events: QueryAgentStreamEvent[]
constructor(events: QueryAgentEvent[]) { constructor(events: QueryAgentStreamEvent[]) {
this.events = events this.events = events
} }
async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentEvent> { async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentStreamEvent> {
this.inputs.push(input) this.inputs.push(input)
for (const event of this.events) { for (const event of this.events) {
yield event yield event
} }
} }
addEventListener<T extends QueryAgentEvent>(
_type: T,
_listener: QueryAgentEventListener<T>,
): () => void {
return () => {}
}
dispose(): void {} dispose(): void {}
} }
@@ -110,6 +123,27 @@ describe("POST /api/agent", () => {
expect(body.message).toBe("You should leave at 8:30.") expect(body.message).toBe("You should leave at 8:30.")
}) })
test("passes conversation id to the query agent", async () => {
const agent = new FakeQueryAgent([
{ type: "conversation", conversationId: "conversation-1" },
{ type: "done" },
])
const app = buildTestApp(agent, "user-1")
const res = await app.request("/api/agent", {
method: "POST",
body: JSON.stringify({
message: "Continue this chat.",
conversationId: "conversation-1",
}),
})
expect(res.status).toBe(200)
expect(agent.inputs[0]?.conversationId).toBe("conversation-1")
const body = (await res.json()) as { conversationId?: string }
expect(body.conversationId).toBe("conversation-1")
})
test("returns 400 for invalid body", async () => { test("returns 400 for invalid body", async () => {
const app = buildTestApp(new FakeQueryAgent([]), "user-1") const app = buildTestApp(new FakeQueryAgent([]), "user-1")

View File

@@ -35,6 +35,7 @@ interface AgentDebugHttpHandlersDeps {
const AgentAskRequestBody = type({ const AgentAskRequestBody = type({
"+": "reject", "+": "reject",
message: "string", message: "string",
"conversationId?": "string",
}) })
export function registerAgentHttpHandlers( export function registerAgentHttpHandlers(
@@ -82,6 +83,7 @@ async function handleAgentAsk(c: Context<Env>) {
const session = await sessionManager.getOrCreate(user.id) const session = await sessionManager.getOrCreate(user.id)
const response = await collectQueryAgentResponse(session.agent, { const response = await collectQueryAgentResponse(session.agent, {
message: parsed.message, message: parsed.message,
conversationId: parsed.conversationId,
}) })
return c.json(response) return c.json(response)
} catch (err) { } catch (err) {

View File

@@ -1,43 +0,0 @@
import { createExtensionRuntime, type ResourceLoader } from "@earendil-works/pi-coding-agent"
export class InMemoryResourceLoader implements ResourceLoader {
private readonly extensions: ReturnType<ResourceLoader["getExtensions"]> = {
extensions: [],
errors: [],
runtime: createExtensionRuntime(),
}
constructor(private readonly systemPrompt: string) {}
getExtensions(): ReturnType<ResourceLoader["getExtensions"]> {
return this.extensions
}
getSkills(): ReturnType<ResourceLoader["getSkills"]> {
return { skills: [], diagnostics: [] }
}
getPrompts(): ReturnType<ResourceLoader["getPrompts"]> {
return { prompts: [], diagnostics: [] }
}
getThemes(): ReturnType<ResourceLoader["getThemes"]> {
return { themes: [], diagnostics: [] }
}
getAgentsFiles(): ReturnType<ResourceLoader["getAgentsFiles"]> {
return { agentsFiles: [] }
}
getSystemPrompt(): string {
return this.systemPrompt
}
getAppendSystemPrompt(): string[] {
return []
}
extendResources(_paths: Parameters<ResourceLoader["extendResources"]>[0]): void {}
async reload(_options?: Parameters<ResourceLoader["reload"]>[0]): Promise<void> {}
}

View File

@@ -1,7 +1,10 @@
import { beforeEach, describe, expect, mock, test } from "bun:test" import { beforeEach, describe, expect, mock, test } from "bun:test"
import type { QueryAgentToolbox } from "./query-agent-toolbox.ts" import type { QueryAgentToolbox } from "./query-agent-toolbox.ts"
import type { QueryAgentEvent } from "./query-agent.ts" import type { QueryAgentStreamEvent } from "./query-agent.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { QueryAgentEvent } from "./query-agent.ts"
interface FakePiSession { interface FakePiSession {
subscribe(listener: (event: unknown) => void): () => void subscribe(listener: (event: unknown) => void): () => void
@@ -9,6 +12,61 @@ interface FakePiSession {
dispose(): void dispose(): void
} }
type CapturedExtensionHandler = (event: unknown) => Promise<unknown> | unknown
interface CapturedExtensionApi {
on(event: string, handler: CapturedExtensionHandler): void
}
type CapturedExtensionFactory = (pi: CapturedExtensionApi) => Promise<void> | void
interface CapturedExtension {
handlers: Map<string, CapturedExtensionHandler[]>
}
interface CapturedResourceLoader {
getExtensions(): unknown
}
interface CapturedDefaultResourceLoaderOptions {
extensionFactories?: CapturedExtensionFactory[]
}
class FakeDefaultResourceLoader implements CapturedResourceLoader {
private readonly extensionFactories: CapturedExtensionFactory[]
private extensionsResult: { extensions: CapturedExtension[] }
constructor(options: unknown) {
this.extensionFactories = isDefaultResourceLoaderOptions(options)
? (options.extensionFactories ?? [])
: []
this.extensionsResult = { extensions: [] }
}
async reload(): Promise<void> {
const handlers: CapturedExtension["handlers"] = new Map()
const api: CapturedExtensionApi = {
on(event: string, handler: CapturedExtensionHandler): void {
const existing = handlers.get(event) ?? []
existing.push(handler)
handlers.set(event, existing)
},
}
for (const factory of this.extensionFactories) {
await factory(api)
}
this.extensionsResult = {
extensions: [{ handlers }],
}
}
getExtensions(): unknown {
return this.extensionsResult
}
}
let createAgentSessionCalls = 0 let createAgentSessionCalls = 0
let createAgentSessionOptions: unknown let createAgentSessionOptions: unknown
let runtimeApiKeyCalls: Array<{ provider: string; apiKey: string }> = [] let runtimeApiKeyCalls: Array<{ provider: string; apiKey: string }> = []
@@ -51,6 +109,44 @@ const fakeSession: FakePiSession = {
dispose(): void {}, dispose(): void {},
} }
class FakeSessionManager {
private messages: unknown[] = []
private compaction: { summary: string; tokensBefore: number; timestamp: number } | null = null
appendMessage(message: unknown): string {
this.messages.push(message)
return `message-${this.messages.length}`
}
appendCompaction(summary: string, _firstKeptEntryId: string, tokensBefore: number): string {
this.compaction = {
summary,
tokensBefore,
timestamp: Date.now(),
}
this.messages = []
return "compaction-1"
}
buildSessionContext(): unknown {
const messages = [...this.messages]
if (this.compaction) {
messages.unshift({
role: "compactionSummary",
summary: this.compaction.summary,
tokensBefore: this.compaction.tokensBefore,
timestamp: this.compaction.timestamp,
})
}
return {
messages,
thinkingLevel: "off",
model: modelFromMessages(messages),
}
}
}
mock.module("@earendil-works/pi-coding-agent", () => ({ mock.module("@earendil-works/pi-coding-agent", () => ({
AuthStorage: { AuthStorage: {
inMemory() { inMemory() {
@@ -71,6 +167,7 @@ mock.module("@earendil-works/pi-coding-agent", () => ({
createExtensionRuntime() { createExtensionRuntime() {
return {} return {}
}, },
DefaultResourceLoader: FakeDefaultResourceLoader,
defineTool(tool: unknown): unknown { defineTool(tool: unknown): unknown {
return tool return tool
}, },
@@ -86,7 +183,7 @@ mock.module("@earendil-works/pi-coding-agent", () => ({
}, },
SessionManager: { SessionManager: {
inMemory(_cwd: string): unknown { inMemory(_cwd: string): unknown {
return {} return new FakeSessionManager()
}, },
}, },
SettingsManager: { SettingsManager: {
@@ -131,7 +228,6 @@ describe("PiQueryAgent", () => {
test("rejects a concurrent first query while the Pi session is being created", async () => { test("rejects a concurrent first query while the Pi session is being created", async () => {
const { PiQueryAgent } = await import("./pi-query-agent.ts") const { PiQueryAgent } = await import("./pi-query-agent.ts")
const agent = new PiQueryAgent({ const agent = new PiQueryAgent({
userId: "user-1",
toolbox: createStubToolbox(), toolbox: createStubToolbox(),
apiKey: "test-api-key", apiKey: "test-api-key",
cwd: "/tmp/freya-pi-query-agent-test", cwd: "/tmp/freya-pi-query-agent-test",
@@ -155,7 +251,7 @@ describe("PiQueryAgent", () => {
expect(secondEvents).toEqual([ expect(secondEvents).toEqual([
{ {
type: "error", type: "error",
message: "A query is already running for this user", message: "A query is already running",
}, },
]) ])
expect(createAgentSessionCalls).toBe(1) expect(createAgentSessionCalls).toBe(1)
@@ -175,6 +271,228 @@ describe("PiQueryAgent", () => {
} }
expect("agentDir" in createAgentSessionOptions).toBe(false) expect("agentDir" in createAgentSessionOptions).toBe(false)
expect(createAgentSessionOptions.resourceLoader).toBeDefined() expect(createAgentSessionOptions.resourceLoader).toBeDefined()
expect(typeof sessionCompactHandlerFromCapturedOptions()).toBe("function")
agent.dispose()
})
test("hydrates initial entries into the Pi session manager", async () => {
const { PiQueryAgent } = await import("./pi-query-agent.ts")
const agent = new PiQueryAgent({
toolbox: createStubToolbox(),
cwd: "/tmp/freya-pi-query-agent-test",
systemPrompt: "test",
initialEntries: [
{
id: "entry-1",
sequence: 1,
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: "stored hello" }],
},
metadata: {},
createdAt: new Date("2026-06-15T00:00:00.000Z"),
},
{
id: "entry-2",
sequence: 2,
kind: ConversationEntryKind.AssistantMessage,
payload: {
role: "assistant",
parts: [{ type: "text", text: "stored reply" }],
},
metadata: {},
createdAt: new Date("2026-06-15T00:00:01.000Z"),
},
],
})
const events = collectEvents(
agent.ask({
message: "hello",
}),
)
await sessionCreationStarted
if (!isRecord(createAgentSessionOptions)) {
throw new Error("createAgentSession options were not captured")
}
const sessionManager = createAgentSessionOptions.sessionManager
if (!(sessionManager instanceof FakeSessionManager)) {
throw new Error("session manager was not hydrated by PiQueryAgent")
}
const context = sessionManager.buildSessionContext()
if (!isRecord(context) || !Array.isArray(context.messages)) {
throw new Error("session context messages were not captured")
}
expect(context.messages[0]).toEqual({
role: "user",
content: "stored hello",
timestamp: new Date("2026-06-15T00:00:00.000Z").getTime(),
})
expect(context.messages[1]).toMatchObject({
role: "assistant",
provider: "openrouter",
model: "z-ai/glm-4.7-flash",
stopReason: "stop",
timestamp: new Date("2026-06-15T00:00:01.000Z").getTime(),
})
releaseSessionCreation()
await promptStarted
releasePrompt()
expect(await events).toEqual([{ type: "done" }])
agent.dispose()
})
test("emits Pi compaction events for the active conversation", async () => {
const recordedCompactions: unknown[] = []
const { PiQueryAgent } = await import("./pi-query-agent.ts")
const agent = new PiQueryAgent({
toolbox: createStubToolbox(),
cwd: "/tmp/freya-pi-query-agent-test",
systemPrompt: "test",
})
agent.addEventListener(QueryAgentEvent.Compaction, (event) => {
recordedCompactions.push(event)
})
const events = collectEvents(
agent.ask({
conversationId: "conversation-1",
message: "hello",
}),
)
await sessionCreationStarted
releaseSessionCreation()
await promptStarted
const handler = sessionCompactHandlerFromCapturedOptions()
await handler({
type: "session_compact",
fromExtension: false,
compactionEntry: {
type: "compaction",
id: "pi-compaction-1",
timestamp: "2026-06-15T00:00:00.000Z",
summary: "The user prefers concise updates.",
firstKeptEntryId: "pi-entry-7",
tokensBefore: 1234,
details: { reason: "threshold" },
},
})
expect(recordedCompactions).toEqual([
{
type: QueryAgentEvent.Compaction,
conversationId: "conversation-1",
summary: "The user prefers concise updates.",
firstKeptEntryId: "pi-entry-7",
compactedEntryRange: null,
tokensBefore: 1234,
details: { reason: "threshold" },
fromExtension: false,
},
])
releasePrompt()
expect(await events).toEqual([{ type: "done" }])
expect(unsubscribeCalls).toBe(1)
agent.dispose()
})
test("emits Freya coverage through the entry before Pi's kept boundary", async () => {
const recordedCompactions: unknown[] = []
const { PiQueryAgent } = await import("./pi-query-agent.ts")
const agent = new PiQueryAgent({
toolbox: createStubToolbox(),
cwd: "/tmp/freya-pi-query-agent-test",
systemPrompt: "test",
initialEntries: [
{
id: "entry-1",
sequence: 1,
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: "old hello" }],
},
metadata: {},
createdAt: new Date("2026-06-15T00:00:00.000Z"),
},
{
id: "entry-2",
sequence: 2,
kind: ConversationEntryKind.AssistantMessage,
payload: {
role: "assistant",
parts: [{ type: "text", text: "kept reply" }],
},
metadata: {},
createdAt: new Date("2026-06-15T00:00:01.000Z"),
},
],
})
agent.addEventListener(QueryAgentEvent.Compaction, (event) => {
recordedCompactions.push(event)
})
const events = collectEvents(
agent.ask({
conversationId: "conversation-1",
message: "hello",
}),
)
await sessionCreationStarted
await extensionHandlerFromCapturedOptions("session_before_compact")({
type: "session_before_compact",
preparation: {
firstKeptEntryId: "message-2",
},
branchEntries: [{ id: "message-1" }, { id: "message-2" }],
})
await extensionHandlerFromCapturedOptions("session_compact")({
type: "session_compact",
fromExtension: false,
compactionEntry: {
type: "compaction",
id: "pi-compaction-1",
timestamp: "2026-06-15T00:00:00.000Z",
summary: "Old hello was discussed.",
firstKeptEntryId: "message-2",
tokensBefore: 1234,
},
})
expect(recordedCompactions).toEqual([
{
type: QueryAgentEvent.Compaction,
conversationId: "conversation-1",
summary: "Old hello was discussed.",
firstKeptEntryId: "message-2",
compactedEntryRange: {
startSequence: 1,
endSequence: 1,
},
tokensBefore: 1234,
details: undefined,
fromExtension: false,
},
])
releaseSessionCreation()
await promptStarted
releasePrompt()
expect(await events).toEqual([{ type: "done" }])
agent.dispose() agent.dispose()
}) })
@@ -182,7 +500,6 @@ describe("PiQueryAgent", () => {
test("surfaces Pi message_end provider errors instead of done", async () => { test("surfaces Pi message_end provider errors instead of done", async () => {
const { PiQueryAgent } = await import("./pi-query-agent.ts") const { PiQueryAgent } = await import("./pi-query-agent.ts")
const agent = new PiQueryAgent({ const agent = new PiQueryAgent({
userId: "user-1",
toolbox: createStubToolbox(), toolbox: createStubToolbox(),
cwd: "/tmp/freya-pi-query-agent-test", cwd: "/tmp/freya-pi-query-agent-test",
systemPrompt: "test", systemPrompt: "test",
@@ -219,7 +536,6 @@ describe("PiQueryAgent", () => {
test("surfaces Pi agent_end provider errors instead of done", async () => { test("surfaces Pi agent_end provider errors instead of done", async () => {
const { PiQueryAgent } = await import("./pi-query-agent.ts") const { PiQueryAgent } = await import("./pi-query-agent.ts")
const agent = new PiQueryAgent({ const agent = new PiQueryAgent({
userId: "user-1",
toolbox: createStubToolbox(), toolbox: createStubToolbox(),
cwd: "/tmp/freya-pi-query-agent-test", cwd: "/tmp/freya-pi-query-agent-test",
systemPrompt: "test", systemPrompt: "test",
@@ -256,8 +572,10 @@ describe("PiQueryAgent", () => {
}) })
}) })
async function collectEvents(events: AsyncIterable<QueryAgentEvent>): Promise<QueryAgentEvent[]> { async function collectEvents(
const result: QueryAgentEvent[] = [] events: AsyncIterable<QueryAgentStreamEvent>,
): Promise<QueryAgentStreamEvent[]> {
const result: QueryAgentStreamEvent[] = []
for await (const event of events) { for await (const event of events) {
result.push(event) result.push(event)
} }
@@ -290,6 +608,79 @@ function createStubToolbox(): QueryAgentToolbox {
} }
} }
function sessionCompactHandlerFromCapturedOptions(): CapturedExtensionHandler {
return extensionHandlerFromCapturedOptions("session_compact")
}
function extensionHandlerFromCapturedOptions(eventName: string): CapturedExtensionHandler {
if (!isRecord(createAgentSessionOptions)) {
throw new Error("createAgentSession options were not captured")
}
const resourceLoader = createAgentSessionOptions.resourceLoader
if (!isCapturedResourceLoader(resourceLoader)) {
throw new Error("resourceLoader was not captured")
}
const extensionsResult = resourceLoader.getExtensions()
if (!isRecord(extensionsResult) || !Array.isArray(extensionsResult.extensions)) {
throw new Error("extensions were not captured")
}
const extension = extensionsResult.extensions[0]
if (!isCapturedExtension(extension)) {
throw new Error("compaction extension was not captured")
}
const handlers = extension.handlers.get(eventName)
const handler = handlers?.[0]
if (!handler) {
throw new Error(`${eventName} handler was not captured`)
}
return handler
}
function isCapturedResourceLoader(value: unknown): value is CapturedResourceLoader {
return isRecord(value) && typeof value.getExtensions === "function"
}
function isCapturedExtension(value: unknown): value is CapturedExtension {
return isRecord(value) && value.handlers instanceof Map
}
function isDefaultResourceLoaderOptions(
value: unknown,
): value is CapturedDefaultResourceLoaderOptions {
return (
isRecord(value) &&
(value.extensionFactories === undefined ||
(Array.isArray(value.extensionFactories) &&
value.extensionFactories.every(isCapturedExtensionFactory)))
)
}
function isCapturedExtensionFactory(value: unknown): value is CapturedExtensionFactory {
return typeof value === "function"
}
function modelFromMessages(messages: unknown[]): { provider: string; modelId: string } | null {
let model: { provider: string; modelId: string } | null = null
for (const message of messages) {
if (!isRecord(message)) continue
if (message.role !== "assistant") continue
if (typeof message.provider !== "string" || typeof message.model !== "string") continue
model = {
provider: message.provider,
modelId: message.model,
}
}
return model
}
function isRecord(value: unknown): value is Record<string, unknown> { function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null return typeof value === "object" && value !== null
} }

View File

@@ -1,73 +1,119 @@
import type { AgentSessionEvent } from "@earendil-works/pi-coding-agent" import type {
AgentSessionEvent,
ExtensionFactory,
SessionEntry,
} from "@earendil-works/pi-coding-agent"
import { import {
AuthStorage, AuthStorage,
createAgentSession, createAgentSession,
DefaultResourceLoader,
ModelRegistry, ModelRegistry,
SessionManager,
SettingsManager, SettingsManager,
} from "@earendil-works/pi-coding-agent" } from "@earendil-works/pi-coding-agent"
import { tmpdir } from "node:os" import { tmpdir } from "node:os"
import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts"
import type { QueryAgentToolbox } from "./query-agent-toolbox.ts" import type { QueryAgentToolbox } from "./query-agent-toolbox.ts"
import type { QueryAgent, QueryAgentAsk, QueryAgentEvent } from "./query-agent.ts"
import { InMemoryResourceLoader } from "./in-memory-resource-loader.ts"
import defaultSystemPrompt from "./prompts/system.txt" import defaultSystemPrompt from "./prompts/system.txt"
import {
createQueryAgentEventListeners,
QueryAgentEvent,
type QueryAgent,
type QueryAgentAsk,
type QueryAgentCompactedEntryRange,
type QueryAgentCompactionEvent,
type QueryAgentConversationEntryRef,
type QueryAgentEventListeners,
type QueryAgentEventListener,
type QueryAgentEventMap,
type QueryAgentStreamEvent,
} from "./query-agent.ts"
import { createSessionManager } from "./session-manager.ts"
import { createFreyaAgentTools, FREYA_AGENT_TOOL_NAMES } from "./tools.ts" import { createFreyaAgentTools, FREYA_AGENT_TOOL_NAMES } from "./tools.ts"
type PiSession = Awaited<ReturnType<typeof createAgentSession>>["session"] type PiSession = Awaited<ReturnType<typeof createAgentSession>>["session"]
type PiMessageEndEvent = Extract<AgentSessionEvent, { type: "message_end" }> type PiMessageEndEvent = Extract<AgentSessionEvent, { type: "message_end" }>
type PiAgentMessage = PiMessageEndEvent["message"] type PiAgentMessage = PiMessageEndEvent["message"]
type PiAgentEndEvent = Extract<AgentSessionEvent, { type: "agent_end" }> type PiAgentEndEvent = Extract<AgentSessionEvent, { type: "agent_end" }>
type PiSessionManager = ReturnType<typeof createSessionManager>
type PiSessionMessage = Parameters<PiSessionManager["appendMessage"]>[0]
export interface PiQueryAgentConfig { export interface PiQueryAgentConfig {
userId: string
toolbox: QueryAgentToolbox toolbox: QueryAgentToolbox
apiKey?: string apiKey?: string
cwd?: string cwd?: string
systemPrompt?: string systemPrompt?: string
initialEntries?: ConversationStorageEntry[]
} }
const MODEL_PROVIDER = "openrouter" export const PI_MODEL_PROVIDER = "openrouter"
const MODEL_ID = "z-ai/glm-4.7-flash" export const PI_MODEL_ID = "z-ai/glm-4.7-flash"
export class PiQueryAgent implements QueryAgent { export class PiQueryAgent implements QueryAgent {
private readonly userId: string
private readonly toolbox: QueryAgentToolbox private readonly toolbox: QueryAgentToolbox
private readonly cwd: string private readonly cwd: string
private readonly systemPrompt: string private readonly systemPrompt: string
private readonly apiKey: string | undefined private readonly apiKey: string | undefined
private readonly initialEntries: ConversationStorageEntry[]
private readonly eventListeners = createQueryAgentEventListeners()
private session: PiSession | null = null private session: PiSession | null = null
private pendingSession: Promise<PiSession> | null = null private pendingSession: Promise<PiSession> | null = null
private activeRun: symbol | null = null /**
* Conversation currently receiving Pi events for an active ask().
*
* Pi's compaction hook fires from the SDK session rather than from our
* QueryAgent call stack, so the hook reads this value to attach the
* compaction summary to the right Freya conversation. null means no active
* run; "" means a run is active but no Freya conversation id was supplied.
*/
private activeConversationId: string | null = null
/**
* Freya entry for the user message currently being handed to Pi.
*
* ConversationRecordingQueryAgent appends the user message before calling
* PiQueryAgent. Pi later persists its own copy of that user message into its
* SessionManager, and this one-shot reference lets us map Pi's generated
* session entry id back to the Freya sequence.
*/
private activeUserMessageEntry: QueryAgentConversationEntryRef | null = null
/**
* Maps Pi SessionManager entry ids to Freya conversation sequences.
*
* Pi compaction reports boundaries with Pi entry ids, while our DB replay
* logic uses monotonically increasing Freya sequences. This map is the bridge
* that lets us translate Pi's firstKeptEntryId into a compacted entry range.
*/
private readonly piEntryConversationSequences = new Map<string, number>()
private disposed = false private disposed = false
constructor(config: PiQueryAgentConfig) { constructor(config: PiQueryAgentConfig) {
this.userId = config.userId
this.toolbox = config.toolbox this.toolbox = config.toolbox
this.apiKey = config.apiKey this.apiKey = config.apiKey
this.cwd = config.cwd ?? tmpdir() this.cwd = config.cwd ?? tmpdir()
this.systemPrompt = config.systemPrompt ?? defaultSystemPrompt this.systemPrompt = config.systemPrompt ?? defaultSystemPrompt
this.initialEntries = config.initialEntries ?? []
} }
async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentEvent> { async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentStreamEvent> {
if (this.activeRun) { if (this.activeConversationId !== null) {
yield { yield {
type: "error", type: "error",
message: "A query is already running for this user", message: "A query is already running",
} }
return return
} }
const run = Symbol(this.userId) this.activeConversationId = input.conversationId ?? ""
this.activeRun = run this.activeUserMessageEntry = input.userMessageEntry ?? null
let session: PiSession let session: PiSession
try { try {
session = await this.getOrCreateSession() session = await this.getOrCreateSession()
} catch (err) { } catch (err) {
this.clearActiveRun(run) this.activeConversationId = null
this.activeUserMessageEntry = null
yield { yield {
type: "error", type: "error",
message: `Failed to create query session: ${errorMessage(err)}`, message: `Failed to create query session: ${errorMessage(err)}`,
@@ -75,11 +121,11 @@ export class PiQueryAgent implements QueryAgent {
return return
} }
const events: QueryAgentEvent[] = [] const events: QueryAgentStreamEvent[] = []
let closed = false let closed = false
let wake: (() => void) | null = null let wake: (() => void) | null = null
function push(event: QueryAgentEvent): void { function push(event: QueryAgentStreamEvent): void {
events.push(event) events.push(event)
if (wake) { if (wake) {
wake() wake()
@@ -88,7 +134,7 @@ export class PiQueryAgent implements QueryAgent {
} }
let runFailed = false let runFailed = false
function pushRunEvent(event: QueryAgentEvent): void { function pushRunEvent(event: QueryAgentStreamEvent): void {
if (event.type === "error") { if (event.type === "error") {
if (runFailed) return if (runFailed) return
runFailed = true runFailed = true
@@ -108,7 +154,8 @@ export class PiQueryAgent implements QueryAgent {
this.handlePiEvent(event, pushRunEvent) this.handlePiEvent(event, pushRunEvent)
}) })
void this.runPrompt(session, input) session
.prompt(input.message)
.then(() => { .then(() => {
if (runFailed) return if (runFailed) return
pushRunEvent({ type: "done" }) pushRunEvent({ type: "done" })
@@ -118,7 +165,8 @@ export class PiQueryAgent implements QueryAgent {
}) })
.finally(() => { .finally(() => {
unsubscribe() unsubscribe()
this.clearActiveRun(run) this.activeConversationId = null
this.activeUserMessageEntry = null
close() close()
}) })
@@ -140,12 +188,19 @@ export class PiQueryAgent implements QueryAgent {
this.session?.dispose() this.session?.dispose()
this.session = null this.session = null
this.pendingSession = null this.pendingSession = null
this.activeRun = null this.activeConversationId = null
this.activeUserMessageEntry = null
this.clearEventListeners()
} }
private clearActiveRun(run: symbol): void { addEventListener<T extends QueryAgentEvent>(
if (this.activeRun === run) { type: T,
this.activeRun = null listener: QueryAgentEventListener<T>,
): () => void {
const listeners = this.listenersFor(type)
listeners.add(listener)
return () => {
listeners.delete(listener)
} }
} }
@@ -184,38 +239,200 @@ export class PiQueryAgent implements QueryAgent {
}) })
const authStorage = AuthStorage.inMemory() const authStorage = AuthStorage.inMemory()
if (this.apiKey) { if (this.apiKey) {
authStorage.setRuntimeApiKey(MODEL_PROVIDER, this.apiKey) authStorage.setRuntimeApiKey(PI_MODEL_PROVIDER, this.apiKey)
} }
const modelRegistry = ModelRegistry.inMemory(authStorage) const modelRegistry = ModelRegistry.inMemory(authStorage)
const model = modelRegistry.find(MODEL_PROVIDER, MODEL_ID) const model = modelRegistry.find(PI_MODEL_PROVIDER, PI_MODEL_ID)
if (!model) { if (!model) {
throw new Error(`Pi model not found: ${MODEL_PROVIDER}/${MODEL_ID}`) throw new Error(`Pi model not found: ${PI_MODEL_PROVIDER}/${PI_MODEL_ID}`)
} }
const resourceLoader = new DefaultResourceLoader({
cwd: this.cwd,
agentDir: this.cwd,
settingsManager,
systemPrompt: this.systemPrompt,
extensionFactories: [this.createCompactionExtension()],
noExtensions: true,
noSkills: true,
noPromptTemplates: true,
noThemes: true,
noContextFiles: true,
})
await resourceLoader.reload()
const sessionManager = this.createMappedSessionManager()
const { session } = await createAgentSession({ const { session } = await createAgentSession({
cwd: this.cwd, cwd: this.cwd,
authStorage, authStorage,
modelRegistry, modelRegistry,
model, model,
resourceLoader: new InMemoryResourceLoader(this.systemPrompt), resourceLoader,
settingsManager, settingsManager,
sessionManager: SessionManager.inMemory(this.cwd), sessionManager,
noTools: "builtin", noTools: "builtin",
customTools: createFreyaAgentTools({ customTools: createFreyaAgentTools({
toolbox: this.toolbox, toolbox: this.toolbox,
}), }),
tools: [...FREYA_AGENT_TOOL_NAMES], tools: FREYA_AGENT_TOOL_NAMES,
}) })
return session return session
} }
private async runPrompt(session: PiSession, input: QueryAgentAsk): Promise<void> { /**
await session.prompt(input.message) * Creates Pi's SessionManager and records Pi-id -> Freya-sequence mappings.
*
* Hydrated DB messages are mapped through createSessionManager's callback.
* Live user messages are mapped by wrapping appendMessage(), because Pi owns
* the generated session entry id for messages written during prompt handling.
*/
private createMappedSessionManager(): PiSessionManager {
this.piEntryConversationSequences.clear()
const sessionManager = createSessionManager({
cwd: this.cwd,
entries: this.initialEntries,
modelProvider: PI_MODEL_PROVIDER,
modelId: PI_MODEL_ID,
onMessageEntryAppended: (piEntryId, entry) => {
this.piEntryConversationSequences.set(piEntryId, entry.sequence)
},
})
const appendMessage = sessionManager.appendMessage.bind(sessionManager)
sessionManager.appendMessage = (message: PiSessionMessage): string => {
const piEntryId = appendMessage(message)
const sequence = this.liveConversationSequenceForMessage(message)
if (sequence !== null) {
this.piEntryConversationSequences.set(piEntryId, sequence)
}
return piEntryId
}
return sessionManager
} }
private handlePiEvent(event: AgentSessionEvent, push: (event: QueryAgentEvent) => void): void { /**
* Returns the Freya sequence for Pi's persisted live user message.
*
* We only map user messages here because they are the messages Freya writes
* before invoking Pi. Assistant/tool entries are recorded from the stream
* outside Pi's SessionManager and do not have a stable live Pi id available
* at the storage boundary.
*/
private liveConversationSequenceForMessage(message: PiSessionMessage): number | null {
if (message.role !== "user") return null
const entry = this.activeUserMessageEntry
this.activeUserMessageEntry = null
if (!entry) return null
return entry.sequence
}
/**
* Installs the minimal Pi extension used to observe compaction.
*
* session_before_compact gives us the full branch plus firstKeptEntryId, so
* we translate that boundary before Pi writes the compaction entry. The later
* session_compact event carries the saved summary, which we forward with the
* cached Freya compacted entry range.
*/
private createCompactionExtension(): ExtensionFactory {
return (pi) => {
/**
* Temporary handoff between Pi's before/after compaction hooks.
*
* session_compact receives the saved compaction entry, not the original
* branch entries needed for boundary translation.
*/
let pendingCompactedEntryRange: QueryAgentCompactedEntryRange | null = null
pi.on("session_before_compact", async (event) => {
pendingCompactedEntryRange = this.compactedEntryRangeBeforePiEntry(
event.branchEntries,
event.preparation.firstKeptEntryId,
)
})
pi.on("session_compact", async (event) => {
const conversationId = this.activeConversationId
if (!conversationId) return
const entry = event.compactionEntry
const compactedEntryRange = pendingCompactedEntryRange
pendingCompactedEntryRange = null
const compactionEvent: QueryAgentCompactionEvent = {
type: QueryAgentEvent.Compaction,
conversationId,
summary: entry.summary,
firstKeptEntryId: entry.firstKeptEntryId,
compactedEntryRange,
tokensBefore: entry.tokensBefore,
details: entry.details,
fromExtension: event.fromExtension,
}
await this.emitEvent(compactionEvent)
})
}
}
/**
* Returns the Freya entry range compacted before a Pi session entry.
*
* Pi keeps firstKeptEntryId and everything after it as raw context. Therefore
* the summary covers only mapped entries before that Pi entry. If none of
* those entries map back to Freya, we return null so storage can avoid
* recording a summary with an unsafe coverage range.
*/
private compactedEntryRangeBeforePiEntry(
branchEntries: SessionEntry[],
piEntryId: string,
): QueryAgentCompactedEntryRange | null {
let endSequence: number | null = null
for (const entry of branchEntries) {
if (entry.id === piEntryId) {
if (endSequence === null) return null
return {
startSequence: 1,
endSequence,
}
}
const sequence = this.piEntryConversationSequences.get(entry.id)
if (typeof sequence === "number") {
endSequence = sequence
}
}
return null
}
private async emitEvent<T extends QueryAgentEvent>(event: QueryAgentEventMap[T]): Promise<void> {
const listeners = this.listenersFor(event.type)
for (const listener of listeners) {
await listener(event)
}
}
private listenersFor<T extends QueryAgentEvent>(type: T): QueryAgentEventListeners[T] {
return this.eventListeners[type]
}
private clearEventListeners(): void {
for (const listeners of Object.values(this.eventListeners)) {
listeners.clear()
}
}
private handlePiEvent(
event: AgentSessionEvent,
push: (event: QueryAgentStreamEvent) => void,
): void {
switch (event.type) { switch (event.type) {
case "message_end": { case "message_end": {
const message = piAssistantMessageError(event.message) const message = piAssistantMessageError(event.message)
@@ -283,7 +500,6 @@ function piAssistantMessageError(message: PiAgentMessage): string | null {
case "toolUse": case "toolUse":
return null return null
} }
return null
default: default:
return null return null
} }

View File

@@ -1,21 +1,74 @@
export interface QueryAgentAsk { export interface QueryAgentAsk {
message: string message: string
conversationId?: string
userMessageEntry?: QueryAgentConversationEntryRef
} }
export type QueryAgentEvent = export type QueryAgentStreamEvent =
| { type: "conversation"; conversationId: string }
| { type: "text_delta"; text: string } | { type: "text_delta"; text: string }
| { type: "tool_start"; toolName: string } | { type: "tool_start"; toolName: string }
| { type: "tool_end"; toolName: string; ok: boolean } | { type: "tool_end"; toolName: string; ok: boolean }
| { type: "done" } | { type: "done" }
| { type: "error"; message: string } | { type: "error"; message: string }
export const QueryAgentEvent = {
Compaction: "compaction",
} as const
export type QueryAgentEvent = (typeof QueryAgentEvent)[keyof typeof QueryAgentEvent]
export interface QueryAgentConversationEntryRef {
id: string
sequence: number
}
export interface QueryAgentCompactedEntryRange {
startSequence: number
endSequence: number
}
export interface QueryAgentCompactionEvent {
type: typeof QueryAgentEvent.Compaction
conversationId: string
summary: string
firstKeptEntryId: string
compactedEntryRange: QueryAgentCompactedEntryRange | null
tokensBefore: number
details?: unknown
fromExtension: boolean
}
export interface QueryAgentEventMap {
[QueryAgentEvent.Compaction]: QueryAgentCompactionEvent
}
export type QueryAgentEventListener<T extends QueryAgentEvent> = (
event: QueryAgentEventMap[T],
) => void | Promise<void>
export type QueryAgentEventListeners = {
[T in QueryAgentEvent]: Set<QueryAgentEventListener<T>>
}
export function createQueryAgentEventListeners(): QueryAgentEventListeners {
return {
[QueryAgentEvent.Compaction]: new Set(),
}
}
export interface QueryAgent { export interface QueryAgent {
ask(input: QueryAgentAsk): AsyncIterable<QueryAgentEvent> ask(input: QueryAgentAsk): AsyncIterable<QueryAgentStreamEvent>
addEventListener<T extends QueryAgentEvent>(
type: T,
listener: QueryAgentEventListener<T>,
): () => void
dispose(): void dispose(): void
} }
export interface QueryAgentResponse { export interface QueryAgentResponse {
message: string message: string
conversationId?: string
} }
export class QueryAgentError extends Error { export class QueryAgentError extends Error {
@@ -30,9 +83,13 @@ export async function collectQueryAgentResponse(
input: QueryAgentAsk, input: QueryAgentAsk,
): Promise<QueryAgentResponse> { ): Promise<QueryAgentResponse> {
let message = "" let message = ""
let conversationId: string | undefined
for await (const event of agent.ask(input)) { for await (const event of agent.ask(input)) {
switch (event.type) { switch (event.type) {
case "conversation":
conversationId = event.conversationId
break
case "text_delta": case "text_delta":
message += event.text message += event.text
break break
@@ -45,5 +102,5 @@ export async function collectQueryAgentResponse(
} }
} }
return { message } return { message, conversationId }
} }

View File

@@ -0,0 +1,156 @@
import { describe, expect, test } from "bun:test"
import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { createSessionManager } from "./session-manager.ts"
describe("createSessionManager", () => {
test("hydrates user and assistant entries into Pi session context", () => {
const sessionManager = createSessionManager({
entries: [
entry({
id: "entry-1",
sequence: 1,
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: "hello" }],
},
}),
entry({
id: "entry-2",
sequence: 2,
kind: ConversationEntryKind.AssistantMessage,
payload: {
role: "assistant",
parts: [{ type: "text", text: "hi there" }],
},
metadata: {
modelRun: {
route: "agent_query",
provider: "openrouter",
model: "stored-model",
},
},
}),
],
modelProvider: "openrouter",
modelId: "fallback-model",
})
const context = sessionManager.buildSessionContext()
expect(context.messages.map(roleOf)).toEqual(["user", "assistant"])
expect(textFromMessage(context.messages[0])).toBe("hello")
expect(textFromMessage(context.messages[1])).toBe("hi there")
expect(context.model).toEqual({
provider: "openrouter",
modelId: "stored-model",
})
})
test("uses the latest context summary and replays only uncovered raw entries", () => {
const sessionManager = createSessionManager({
entries: [
entry({
id: "entry-1",
sequence: 1,
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: "old question" }],
},
}),
entry({
id: "entry-2",
sequence: 2,
kind: ConversationEntryKind.AssistantMessage,
payload: {
role: "assistant",
parts: [{ type: "text", text: "old answer" }],
},
}),
entry({
id: "entry-3",
sequence: 3,
kind: ConversationEntryKind.ContextSummary,
payload: {
covers: {
startSequence: 1,
endSequence: 2,
},
summary: {
durableFacts: ["The user is designing conversation storage."],
preferences: [],
decisions: ["Context compaction is stored as a conversation entry."],
openTasks: [],
importantDetails: [],
},
promptVersion: "test-v1",
},
}),
entry({
id: "entry-4",
sequence: 4,
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: "new question" }],
},
}),
],
modelProvider: "openrouter",
modelId: "fallback-model",
})
const context = sessionManager.buildSessionContext()
expect(context.messages.map(roleOf)).toEqual(["compactionSummary", "user"])
expect(textFromMessage(context.messages[0])).toContain(
"The user is designing conversation storage.",
)
expect(textFromMessage(context.messages[0])).toContain(
"Context compaction is stored as a conversation entry.",
)
expect(textFromMessage(context.messages[1])).toBe("new question")
})
})
function entry(
input: Omit<ConversationStorageEntry, "createdAt" | "metadata"> & {
createdAt?: Date
metadata?: ConversationStorageEntry["metadata"]
},
): ConversationStorageEntry {
return {
...input,
metadata: input.metadata ?? {},
createdAt: input.createdAt ?? new Date("2026-06-15T00:00:00.000Z"),
}
}
function roleOf(message: unknown): string | undefined {
if (!isRecord(message)) return undefined
return typeof message.role === "string" ? message.role : undefined
}
function textFromMessage(message: unknown): string {
if (!isRecord(message)) return ""
if (typeof message.summary === "string") return message.summary
const content = message.content
if (typeof content === "string") return content
if (!Array.isArray(content)) return ""
return content.map(textFromContentPart).join("")
}
function textFromContentPart(part: unknown): string {
if (!isRecord(part)) return ""
return typeof part.text === "string" ? part.text : ""
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null
}

View File

@@ -0,0 +1,188 @@
import { SessionManager } from "@earendil-works/pi-coding-agent"
import { tmpdir } from "node:os"
import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts"
import {
AssistantMessagePayload,
ContextSummaryPayload,
ConversationEntryKind,
UserMessagePayload,
} from "../conversations/types.ts"
type PiMessage = Parameters<SessionManager["appendMessage"]>[0]
type PiAssistantMessage = Extract<PiMessage, { role: "assistant" }>
export interface CreateSessionManagerInput {
cwd?: string
entries: ConversationStorageEntry[]
modelProvider: string
modelId: string
onMessageEntryAppended?: (piEntryId: string, entry: ConversationStorageEntry) => void
}
export function createSessionManager(input: CreateSessionManagerInput): SessionManager {
const sessionManager = SessionManager.inMemory(input.cwd ?? tmpdir())
const context = buildContextFromEntries(input.entries)
if (context.summary) {
sessionManager.appendCompaction(
context.summary.text,
"freya-db-context-start",
0,
{
conversationEntryId: context.summary.entry.id,
covers: context.summary.covers,
},
true,
)
}
for (const entry of context.entries) {
const message = messageForEntry(entry, input.modelProvider, input.modelId)
if (message) {
const piEntryId = sessionManager.appendMessage(message)
input.onMessageEntryAppended?.(piEntryId, entry)
}
}
return sessionManager
}
function buildContextFromEntries(entries: ConversationStorageEntry[]): {
summary?: { entry: ConversationStorageEntry; text: string; covers: unknown }
entries: ConversationStorageEntry[]
} {
const orderedEntries = [...entries].sort((left, right) => left.sequence - right.sequence)
const summaryEntry = latestContextSummaryEntry(orderedEntries)
if (!summaryEntry || summaryEntry.kind !== ConversationEntryKind.ContextSummary) {
return { entries: orderedEntries }
}
const payload = ContextSummaryPayload.assert(summaryEntry.payload)
const text = contextSummaryText(payload.summary)
const rawStartSequence = payload.covers.endSequence + 1
return {
summary: {
entry: summaryEntry,
text,
covers: payload.covers,
},
entries: orderedEntries.filter((entry) => entry.sequence >= rawStartSequence),
}
}
function latestContextSummaryEntry(
entries: ConversationStorageEntry[],
): ConversationStorageEntry | undefined {
let latest: ConversationStorageEntry | undefined
for (const entry of entries) {
if (entry.kind !== ConversationEntryKind.ContextSummary) continue
if (!latest || entry.sequence > latest.sequence) {
latest = entry
}
}
return latest
}
function messageForEntry(
entry: ConversationStorageEntry,
modelProvider: string,
modelId: string,
): PiMessage | null {
switch (entry.kind) {
case ConversationEntryKind.UserMessage: {
const payload = UserMessagePayload.assert(entry.payload)
return {
role: "user",
content: messagePartsText(payload.parts),
timestamp: entry.createdAt.getTime(),
}
}
case ConversationEntryKind.AssistantMessage: {
const payload = AssistantMessagePayload.assert(entry.payload)
return {
role: "assistant",
content: [{ type: "text", text: messagePartsText(payload.parts) }],
api: "anthropic-messages",
provider: entry.metadata.modelRun?.provider ?? modelProvider,
model: entry.metadata.modelRun?.model ?? modelId,
usage: zeroUsage(),
stopReason: "stop",
timestamp: entry.createdAt.getTime(),
} satisfies PiAssistantMessage
}
case ConversationEntryKind.Attachment:
case ConversationEntryKind.ContextSummary:
case ConversationEntryKind.SystemNote:
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
return null
}
}
function messagePartsText(
parts: Array<{ type: "text"; text: string } | { type: "json"; value: unknown }>,
): string {
return parts.map(messagePartText).join("\n")
}
function messagePartText(
part: { type: "text"; text: string } | { type: "json"; value: unknown },
): string {
switch (part.type) {
case "text":
return part.text
case "json":
return stringifyJson(part.value)
}
}
function contextSummaryText(summary: {
userIntent?: string
durableFacts: string[]
preferences: string[]
decisions: string[]
openTasks: string[]
importantDetails: string[]
}): string {
const sections: string[] = []
pushSection(sections, "User intent", summary.userIntent ? [summary.userIntent] : [])
pushSection(sections, "Durable facts", summary.durableFacts)
pushSection(sections, "Preferences", summary.preferences)
pushSection(sections, "Decisions", summary.decisions)
pushSection(sections, "Open tasks", summary.openTasks)
pushSection(sections, "Important details", summary.importantDetails)
return sections.join("\n\n")
}
function pushSection(sections: string[], title: string, values: string[]): void {
const trimmedValues = values.map((value) => value.trim()).filter(Boolean)
if (trimmedValues.length === 0) return
sections.push(`${title}:\n${trimmedValues.map((value) => `- ${value}`).join("\n")}`)
}
function stringifyJson(value: unknown): string {
return JSON.stringify(value, null, 2) ?? String(value)
}
function zeroUsage(): PiAssistantMessage["usage"] {
return {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
},
}
}

View File

@@ -0,0 +1,373 @@
import { and, asc, desc, eq } from "drizzle-orm"
import type { Database } from "../db/index.ts"
import type {
AssistantMessagePayload,
AttachmentPayload,
ContextSummaryPayload,
ConversationEntryKind as ConversationEntryKindType,
ConversationEntryMetadata,
ConversationEntryPayload,
ConversationEntryVisibility as ConversationEntryVisibilityType,
GenericObjectPayload,
UserMessagePayload,
} from "./types.ts"
import {
conversationEntries,
conversations as conversationsTable,
files,
user,
} from "../db/schema.ts"
import {
ConversationEntryMetadata as ConversationEntryMetadataSchema,
AssistantMessagePayload as AssistantMessagePayloadSchema,
AttachmentPayload as AttachmentPayloadSchema,
ConversationEntryKind,
ConversationEntryKindInput,
ConversationEntryVisibility,
ConversationEntryVisibilityInput,
ContextSummaryPayload as ContextSummaryPayloadSchema,
GenericObjectPayload as GenericObjectPayloadSchema,
UserMessagePayload as UserMessagePayloadSchema,
} from "./types.ts"
export type ConversationRow = typeof conversationsTable.$inferSelect
export type ConversationEntryRow = typeof conversationEntries.$inferSelect
export type FileRow = typeof files.$inferSelect
export interface CreateFileInput {
storageKey: string
originalName?: string
mimeType: string
sizeBytes: number
metadata?: Record<string, unknown>
}
export interface AppendAttachmentEntryInput {
file: CreateFileInput
payload: AttachmentPayload
visibility?: ConversationEntryVisibilityType
metadata?: ConversationEntryMetadata
}
export interface AppendAttachmentEntryResult {
file: FileRow
entry: ConversationEntryRow
}
interface AppendConversationEntryBase {
visibility?: ConversationEntryVisibilityType
metadata?: ConversationEntryMetadata
}
export type AppendConversationEntryInput =
| (AppendConversationEntryBase & {
kind: typeof ConversationEntryKind.UserMessage
payload: UserMessagePayload
fileId?: never
})
| (AppendConversationEntryBase & {
kind: typeof ConversationEntryKind.AssistantMessage
payload: AssistantMessagePayload
fileId?: never
})
| (AppendConversationEntryBase & {
kind: typeof ConversationEntryKind.Attachment
payload: AttachmentPayload
fileId: string
})
| (AppendConversationEntryBase & {
kind: typeof ConversationEntryKind.ContextSummary
payload: ContextSummaryPayload
fileId?: never
})
| (AppendConversationEntryBase & {
kind:
| typeof ConversationEntryKind.ToolCall
| typeof ConversationEntryKind.ToolResult
| typeof ConversationEntryKind.SystemNote
payload: GenericObjectPayload
fileId?: never
})
export interface ListConversationEntriesParams {
visibility?: ConversationEntryVisibilityType
}
export function conversations(db: Database, userId: string) {
return {
async createConversation(): Promise<ConversationRow> {
return insertConversation(db, userId)
},
async getOrCreateConversation(): Promise<ConversationRow> {
return db.transaction(async (tx) => {
await requireUserForUpdate(tx, userId)
const existing = await latestConversation(tx, userId)
if (existing) return existing
return insertConversation(tx, userId)
})
},
async createFile(input: CreateFileInput): Promise<FileRow> {
return insertFile(db, userId, input)
},
async appendEntry(
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationEntryRow> {
const kind = ConversationEntryKindInput.assert(input.kind)
const visibility = ConversationEntryVisibilityInput.assert(
input.visibility ?? defaultVisibilityForKind(kind),
)
const payload = payloadForKind(kind, input.payload)
const metadata = ConversationEntryMetadataSchema.assert(input.metadata ?? {})
let fileId: string | null = null
if (input.kind === ConversationEntryKind.Attachment) {
fileId = input.fileId
await requireFile(db, userId, fileId)
}
const rows = await db.transaction(async (tx) => {
await requireConversationForUpdate(tx, userId, conversationId)
const sequence = await nextSequence(tx, conversationId)
const rows = await tx
.insert(conversationEntries)
.values({
conversationId,
sequence,
kind,
visibility,
fileId,
payload,
metadata,
})
.returning()
await touchConversation(tx, userId, conversationId)
return rows
})
return requireRow(rows)
},
async appendAttachmentEntry(
conversationId: string,
input: AppendAttachmentEntryInput,
): Promise<AppendAttachmentEntryResult> {
const payload = AttachmentPayloadSchema.assert(input.payload)
const visibility = ConversationEntryVisibilityInput.assert(
input.visibility ?? defaultVisibilityForKind(ConversationEntryKind.Attachment),
)
const metadata = ConversationEntryMetadataSchema.assert(input.metadata ?? {})
return db.transaction(async (tx) => {
await requireConversationForUpdate(tx, userId, conversationId)
const file = await insertFile(tx, userId, input.file)
const sequence = await nextSequence(tx, conversationId)
const rows = await tx
.insert(conversationEntries)
.values({
conversationId,
sequence,
kind: ConversationEntryKind.Attachment,
visibility,
fileId: file.id,
payload,
metadata,
})
.returning()
await touchConversation(tx, userId, conversationId)
return {
file,
entry: requireRow(rows),
}
})
},
async listEntries(
conversationId: string,
params: ListConversationEntriesParams = {},
): Promise<ConversationEntryRow[]> {
await requireConversation(db, userId, conversationId)
if (params.visibility) {
return db
.select()
.from(conversationEntries)
.where(
and(
eq(conversationEntries.conversationId, conversationId),
eq(conversationEntries.visibility, params.visibility),
),
)
.orderBy(asc(conversationEntries.sequence))
}
return db
.select()
.from(conversationEntries)
.where(eq(conversationEntries.conversationId, conversationId))
.orderBy(asc(conversationEntries.sequence))
},
}
}
function payloadForKind(
kind: ConversationEntryKindType,
payload: AppendConversationEntryInput["payload"],
): ConversationEntryPayload {
switch (kind) {
case ConversationEntryKind.UserMessage:
return UserMessagePayloadSchema.assert(payload)
case ConversationEntryKind.AssistantMessage:
return AssistantMessagePayloadSchema.assert(payload)
case ConversationEntryKind.Attachment:
return AttachmentPayloadSchema.assert(payload)
case ConversationEntryKind.ContextSummary:
return ContextSummaryPayloadSchema.assert(payload)
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
case ConversationEntryKind.SystemNote:
return GenericObjectPayloadSchema.assert(payload)
}
}
async function requireUserForUpdate(db: Database, userId: string): Promise<void> {
const rows = await db
.select({ id: user.id })
.from(user)
.where(eq(user.id, userId))
.limit(1)
.for("update")
requireRow(rows, `User not found: ${userId}`)
}
async function requireConversation(
db: Database,
userId: string,
conversationId: string,
): Promise<ConversationRow> {
const rows = await db
.select()
.from(conversationsTable)
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
.limit(1)
return requireRow(rows, `Conversation not found: ${conversationId}`)
}
async function requireConversationForUpdate(
db: Database,
userId: string,
conversationId: string,
): Promise<ConversationRow> {
const rows = await db
.select()
.from(conversationsTable)
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
.limit(1)
.for("update")
return requireRow(rows, `Conversation not found: ${conversationId}`)
}
async function latestConversation(db: Database, userId: string): Promise<ConversationRow | null> {
const rows = await db
.select()
.from(conversationsTable)
.where(eq(conversationsTable.userId, userId))
.orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt))
.limit(1)
return rows[0] ?? null
}
async function insertConversation(db: Database, userId: string): Promise<ConversationRow> {
const rows = await db
.insert(conversationsTable)
.values({
userId,
})
.returning()
return requireRow(rows)
}
async function requireFile(db: Database, userId: string, fileId: string): Promise<FileRow> {
const rows = await db
.select()
.from(files)
.where(and(eq(files.id, fileId), eq(files.userId, userId)))
.limit(1)
return requireRow(rows, `File not found: ${fileId}`)
}
async function insertFile(db: Database, userId: string, input: CreateFileInput): Promise<FileRow> {
const rows = await db
.insert(files)
.values({
userId,
storageKey: input.storageKey,
originalName: input.originalName ?? null,
mimeType: input.mimeType,
sizeBytes: input.sizeBytes,
metadata: input.metadata ?? {},
})
.returning()
return requireRow(rows)
}
async function touchConversation(
db: Database,
userId: string,
conversationId: string,
): Promise<void> {
await db
.update(conversationsTable)
.set({ updatedAt: new Date() })
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
}
async function nextSequence(db: Database, conversationId: string): Promise<number> {
const rows = await db
.select({ sequence: conversationEntries.sequence })
.from(conversationEntries)
.where(eq(conversationEntries.conversationId, conversationId))
.orderBy(desc(conversationEntries.sequence))
.limit(1)
return (rows[0]?.sequence ?? 0) + 1
}
function requireRow<T>(rows: T[], message = "Expected database row"): T {
const row = rows[0]
if (!row) throw new Error(message)
return row
}
function defaultVisibilityForKind(
kind: ConversationEntryKindType,
): ConversationEntryVisibilityType {
switch (kind) {
case ConversationEntryKind.UserMessage:
case ConversationEntryKind.AssistantMessage:
case ConversationEntryKind.Attachment:
return ConversationEntryVisibility.UserVisible
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
case ConversationEntryKind.ContextSummary:
case ConversationEntryKind.SystemNote:
return ConversationEntryVisibility.Internal
}
}

View File

@@ -0,0 +1,146 @@
import { describe, expect, test } from "bun:test"
import {
AttachmentType,
AttachmentPayload,
ContextSummaryPayload,
ConversationEntryMetadata,
GenericObjectPayload,
UserMessagePayload,
} from "./types.ts"
describe("conversation entry schemas", () => {
test("parses valid user message payloads", () => {
const payload = UserMessagePayload.assert({
role: "user",
parts: [{ type: "text", text: "hello" }],
})
expect(payload).toEqual({
role: "user",
parts: [{ type: "text", text: "hello" }],
})
})
test("rejects user message payloads with the wrong role", () => {
expect(() =>
UserMessagePayload.assert({
role: "assistant",
parts: [{ type: "text", text: "hello" }],
}),
).toThrow()
})
test("rejects user message payloads with no parts", () => {
expect(() =>
UserMessagePayload.assert({
role: "user",
parts: [],
}),
).toThrow()
})
test("parses valid attachment payloads", () => {
const payload = AttachmentPayload.assert({
role: "user",
name: "whiteboard.png",
mimeType: "image/png",
attachmentType: AttachmentType.Image,
caption: "whiteboard sketch",
})
expect(payload).toEqual({
role: "user",
name: "whiteboard.png",
mimeType: "image/png",
attachmentType: AttachmentType.Image,
caption: "whiteboard sketch",
})
})
test("rejects extra fields on structured payloads", () => {
expect(() =>
AttachmentPayload.assert({
role: "user",
name: "whiteboard.png",
mimeType: "image/png",
attachmentType: AttachmentType.Image,
fileId: "file-1",
}),
).toThrow()
})
test("parses context summary payloads", () => {
const payload = ContextSummaryPayload.assert({
covers: {
startSequence: 1,
endSequence: 12,
},
summary: {
userIntent: "Design message storage.",
durableFacts: [],
preferences: ["Keep the schema simple."],
decisions: ["Use conversation_entries as the timeline."],
openTasks: [],
importantDetails: [],
},
promptVersion: "conversation-summary-v1",
sourceEntryIds: ["entry-1", "entry-2"],
})
expect(payload).toMatchObject({
covers: {
startSequence: 1,
endSequence: 12,
},
promptVersion: "conversation-summary-v1",
})
})
test("allows generic object payloads for tool entries", () => {
const payload = GenericObjectPayload.assert({
toolCallId: "call-1",
toolName: "calendar.search",
input: { date: "2026-06-15" },
})
expect(payload).toEqual({
toolCallId: "call-1",
toolName: "calendar.search",
input: { date: "2026-06-15" },
})
})
test("rejects non-object generic payloads", () => {
expect(() => GenericObjectPayload.assert("done")).toThrow()
})
test("parses model run metadata and allows extra top-level metadata", () => {
const metadata = ConversationEntryMetadata.assert({
modelRun: {
route: "default-chat",
provider: "pi",
model: "pi-model",
inputTokens: 120,
outputTokens: 24,
},
traceId: "trace-1",
})
expect(metadata.modelRun?.model).toBe("pi-model")
expect(metadata.traceId).toBe("trace-1")
})
test("rejects invalid model run metadata", () => {
expect(() =>
ConversationEntryMetadata.assert({
modelRun: {
route: "default-chat",
provider: "pi",
model: "pi-model",
inputTokens: -1,
},
}),
).toThrow()
})
})

View File

@@ -0,0 +1,136 @@
import { type } from "arktype"
export const ConversationEntryKind = {
UserMessage: "user_message",
AssistantMessage: "assistant_message",
Attachment: "attachment",
ToolCall: "tool_call",
ToolResult: "tool_result",
ContextSummary: "context_summary",
SystemNote: "system_note",
} as const
export type ConversationEntryKind =
(typeof ConversationEntryKind)[keyof typeof ConversationEntryKind]
export const ConversationEntryVisibility = {
UserVisible: "user_visible",
Internal: "internal",
} as const
export type ConversationEntryVisibility =
(typeof ConversationEntryVisibility)[keyof typeof ConversationEntryVisibility]
export const AttachmentType = {
Image: "image",
Audio: "audio",
Video: "video",
Document: "document",
Other: "other",
} as const
export type AttachmentType = (typeof AttachmentType)[keyof typeof AttachmentType]
export const ConversationEntryKindInput = type.enumerated(...Object.values(ConversationEntryKind))
export const ConversationEntryVisibilityInput = type.enumerated(
...Object.values(ConversationEntryVisibility),
)
export const AttachmentTypeInput = type.enumerated(...Object.values(AttachmentType))
const TextMessagePart = type({
"+": "reject",
type: "'text'",
text: "string",
})
const JsonMessagePart = type({
"+": "reject",
type: "'json'",
value: "unknown",
})
export const MessagePart = type.or(TextMessagePart, JsonMessagePart)
export type MessagePart = typeof MessagePart.infer
export const UserMessagePayload = type({
"+": "reject",
role: "'user'",
parts: MessagePart.array().atLeastLength(1),
})
export type UserMessagePayload = typeof UserMessagePayload.infer
export const AssistantMessagePayload = type({
"+": "reject",
role: "'assistant'",
parts: MessagePart.array().atLeastLength(1),
})
export type AssistantMessagePayload = typeof AssistantMessagePayload.infer
export const AttachmentPayload = type({
"+": "reject",
role: type.enumerated("user", "assistant"),
name: "string",
mimeType: "string",
attachmentType: AttachmentTypeInput,
"caption?": "string",
})
export type AttachmentPayload = typeof AttachmentPayload.infer
const ContextSummary = type({
"+": "reject",
"userIntent?": "string",
durableFacts: type.string.array(),
preferences: type.string.array(),
decisions: type.string.array(),
openTasks: type.string.array(),
importantDetails: type.string.array(),
})
export const ContextSummaryPayload = type({
"+": "reject",
covers: type({
"+": "reject",
startSequence: "number.integer >= 1",
endSequence: "number.integer >= 1",
}),
summary: ContextSummary,
promptVersion: "string",
"sourceEntryIds?": type.string.array(),
})
export type ContextSummaryPayload = typeof ContextSummaryPayload.infer
export const ModelRunMetadata = type({
"+": "reject",
route: "string",
provider: "string",
model: "string",
"contextSummaryEntryId?": "string",
"rawEntriesStartSequence?": "number.integer >= 1",
"rawEntriesEndSequence?": "number.integer >= 1",
"inputTokens?": "number.integer >= 0",
"outputTokens?": "number.integer >= 0",
"providerRequestId?": "string",
})
export type ModelRunMetadata = typeof ModelRunMetadata.infer
export const ConversationEntryMetadata = type({
"modelRun?": ModelRunMetadata,
"[string]": "unknown",
})
export type ConversationEntryMetadata = typeof ConversationEntryMetadata.infer
export const GenericObjectPayload = type("Record<string, unknown>")
export type GenericObjectPayload = typeof GenericObjectPayload.infer
export type ConversationEntryPayload =
| UserMessagePayload
| AssistantMessagePayload
| AttachmentPayload
| ContextSummaryPayload
| GenericObjectPayload

View File

@@ -1,6 +1,9 @@
import { sql } from "drizzle-orm"
import { import {
boolean, boolean,
check,
customType, customType,
integer,
index, index,
jsonb, jsonb,
pgTable, pgTable,
@@ -10,6 +13,14 @@ import {
uuid, uuid,
} from "drizzle-orm/pg-core" } from "drizzle-orm/pg-core"
import {
ConversationEntryVisibility,
type ConversationEntryKind,
type ConversationEntryMetadata,
type ConversationEntryPayload,
type ConversationEntryVisibility as ConversationEntryVisibilityType,
} from "../conversations/types.ts"
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Better Auth core tables // Better Auth core tables
// Re-exported from CLI-generated schema. // Re-exported from CLI-generated schema.
@@ -61,6 +72,81 @@ export const userSources = pgTable(
], ],
) )
// ---------------------------------------------------------------------------
// FREYA — conversations
// ---------------------------------------------------------------------------
export const conversations = pgTable(
"conversations",
{
id: uuid("id").primaryKey().defaultRandom(),
userId: text("user_id")
.notNull()
.references(() => user.id, { onDelete: "cascade" }),
createdAt: timestamp("created_at").notNull().defaultNow(),
updatedAt: timestamp("updated_at")
.notNull()
.defaultNow()
.$onUpdate(() => new Date()),
},
(t) => [index("conversations_user_id_updated_at_idx").on(t.userId, t.updatedAt)],
)
export const files = pgTable(
"files",
{
id: uuid("id").primaryKey().defaultRandom(),
userId: text("user_id")
.notNull()
.references(() => user.id, { onDelete: "cascade" }),
storageKey: text("storage_key").notNull(),
originalName: text("original_name"),
mimeType: text("mime_type").notNull(),
sizeBytes: integer("size_bytes").notNull(),
metadata: jsonb("metadata").$type<Record<string, unknown>>().notNull().default({}),
createdAt: timestamp("created_at").notNull().defaultNow(),
},
(t) => [
unique("files_storage_key_unique").on(t.storageKey),
index("files_user_id_created_at_idx").on(t.userId, t.createdAt),
],
)
export const conversationEntries = pgTable(
"conversation_entries",
{
id: uuid("id").primaryKey().defaultRandom(),
conversationId: uuid("conversation_id")
.notNull()
.references(() => conversations.id, { onDelete: "cascade" }),
sequence: integer("sequence").notNull(),
kind: text("kind").$type<ConversationEntryKind>().notNull(),
visibility: text("visibility")
.$type<ConversationEntryVisibilityType>()
.notNull()
.default(ConversationEntryVisibility.Internal),
fileId: uuid("file_id").references(() => files.id, { onDelete: "restrict" }),
payload: jsonb("payload").$type<ConversationEntryPayload>().notNull(),
metadata: jsonb("metadata").$type<ConversationEntryMetadata>().notNull().default({}),
createdAt: timestamp("created_at").notNull().defaultNow(),
},
(t) => [
unique("conversation_entries_conversation_id_sequence_unique").on(t.conversationId, t.sequence),
index("conversation_entries_conversation_id_sequence_idx").on(t.conversationId, t.sequence),
index("conversation_entries_conversation_id_visibility_sequence_idx").on(
t.conversationId,
t.visibility,
t.sequence,
),
index("conversation_entries_kind_idx").on(t.kind),
index("conversation_entries_file_id_idx").on(t.fileId),
check(
"conversation_entries_attachment_file_id_check",
sql`(${t.kind} = 'attachment' and ${t.fileId} is not null) or (${t.kind} <> 'attachment' and ${t.fileId} is null)`,
),
],
)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// FREYA — reminders source storage // FREYA — reminders source storage
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -85,6 +85,20 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
mock.module("../conversations/storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({
async getOrCreateConversation() {
return { id: `conversation-${userId}` }
},
async listEntries() {
return []
},
async appendEntry() {
return { id: "entry-1", sequence: 1 }
},
}),
}))
const fakeDb = {} as Database const fakeDb = {} as Database
describe("GET /api/feed", () => { describe("GET /api/feed", () => {

View File

@@ -4,9 +4,12 @@ import { LocationSource } from "@freya/source-location"
import { WeatherSource } from "@freya/source-weatherkit" import { WeatherSource } from "@freya/source-weatherkit"
import { beforeEach, describe, expect, mock, spyOn, test } from "bun:test" import { beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
import type { ConversationStorageEntry } from "../agent/conversation-recording-query-agent.ts"
import type { AppendConversationEntryInput } from "../conversations/storage.ts"
import type { Database } from "../db/index.ts" import type { Database } from "../db/index.ts"
import type { FeedSourceProvider } from "./feed-source-provider.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { CredentialEncryptor } from "../lib/crypto.ts" import { CredentialEncryptor } from "../lib/crypto.ts"
import { import {
CredentialStorageUnavailableError, CredentialStorageUnavailableError,
@@ -21,6 +24,8 @@ import { UserSessionManager } from "./user-session-manager.ts"
* Key = userId (or "*" for a default), value = array of enabled sourceIds. * Key = userId (or "*" for a default), value = array of enabled sourceIds.
*/ */
const enabledByUser = new Map<string, string[]>() const enabledByUser = new Map<string, string[]>()
const conversationEntriesByUser = new Map<string, ConversationStorageEntry[]>()
const mockConversationCalls: Array<{ type: "getOrCreate" | "listEntries"; userId: string }> = []
/** Set which sourceIds are enabled for all users. */ /** Set which sourceIds are enabled for all users. */
function setEnabledSources(sourceIds: string[]) { function setEnabledSources(sourceIds: string[]) {
@@ -37,6 +42,10 @@ function getEnabledSourceIds(userId: string): string[] {
return enabledByUser.get(userId) ?? enabledByUser.get("*") ?? [] return enabledByUser.get(userId) ?? enabledByUser.get("*") ?? []
} }
function setConversationEntriesForUser(userId: string, entries: ConversationStorageEntry[]) {
conversationEntriesByUser.set(userId, entries)
}
/** /**
* Controls what `find()` returns in the mock. When `undefined` (the default), * Controls what `find()` returns in the mock. When `undefined` (the default),
* `find()` returns a standard enabled row. Set to a specific value (including * `find()` returns a standard enabled row. Set to a specific value (including
@@ -111,6 +120,35 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
mock.module("../conversations/storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({
async getOrCreateConversation(): Promise<{ id: string }> {
mockConversationCalls.push({ type: "getOrCreate", userId })
return { id: `conversation-${userId}` }
},
async listEntries(_conversationId: string): Promise<ConversationStorageEntry[]> {
mockConversationCalls.push({ type: "listEntries", userId })
return conversationEntriesByUser.get(userId) ?? []
},
async appendEntry(
_conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationStorageEntry> {
const entries = conversationEntriesByUser.get(userId) ?? []
const row: ConversationStorageEntry = {
id: `entry-${entries.length + 1}`,
sequence: entries.length + 1,
kind: input.kind,
payload: input.payload,
metadata: input.metadata ?? {},
createdAt: new Date("2026-06-15T00:00:00.000Z"),
}
conversationEntriesByUser.set(userId, [...entries, row])
return row
},
}),
}))
const fakeDb = { const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb), transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database } as unknown as Database
@@ -160,6 +198,8 @@ const weatherProvider: FeedSourceProvider = {
beforeEach(() => { beforeEach(() => {
enabledByUser.clear() enabledByUser.clear()
conversationEntriesByUser.clear()
mockConversationCalls.length = 0
mockFindResult = undefined mockFindResult = undefined
mockUpdateCredentialsCalls.length = 0 mockUpdateCredentialsCalls.length = 0
mockUpdateCredentialsError = null mockUpdateCredentialsError = null
@@ -176,6 +216,31 @@ describe("UserSessionManager", () => {
expect(session.engine).toBeDefined() expect(session.engine).toBeDefined()
}) })
test("getOrCreate eagerly loads conversation entries for the user session", async () => {
setEnabledSources([])
setConversationEntriesForUser("user-1", [
{
id: "entry-1",
sequence: 1,
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: "stored hello" }],
},
metadata: {},
createdAt: new Date("2026-06-15T00:00:00.000Z"),
},
])
const manager = new UserSessionManager({ db: fakeDb, providers: [] })
await manager.getOrCreate("user-1")
expect(mockConversationCalls).toEqual([
{ type: "getOrCreate", userId: "user-1" },
{ type: "listEntries", userId: "user-1" },
])
})
test("getOrCreate returns same session for same user", async () => { test("getOrCreate returns same session for same user", async () => {
setEnabledSources(["freya.location"]) setEnabledSources(["freya.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })

View File

@@ -8,6 +8,7 @@ import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
import type { CredentialEncryptor } from "../lib/crypto.ts" import type { CredentialEncryptor } from "../lib/crypto.ts"
import type { FeedSourceProvider } from "./feed-source-provider.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts"
import { conversations } from "../conversations/storage.ts"
import { import {
CredentialStorageUnavailableError, CredentialStorageUnavailableError,
InvalidSourceConfigError, InvalidSourceConfigError,
@@ -362,6 +363,7 @@ export class UserSessionManager {
private async createSession(userId: string): Promise<UserSession> { private async createSession(userId: string): Promise<UserSession> {
const enabledRows = await sources(this.db, userId).enabled() const enabledRows = await sources(this.db, userId).enabled()
const agentConfig = this.queryAgentConfigForUser(userId)
const promises: Promise<FeedSource>[] = [] const promises: Promise<FeedSource>[] = []
for (const row of enabledRows) { for (const row of enabledRows) {
@@ -373,7 +375,7 @@ export class UserSessionManager {
} }
if (promises.length === 0) { if (promises.length === 0) {
return new UserSession(userId, [], this.feedEnhancer, this.queryAgentConfig) return this.initializedSession(userId, [], agentConfig)
} }
const results = await Promise.allSettled(promises) const results = await Promise.allSettled(promises)
@@ -397,7 +399,29 @@ export class UserSessionManager {
console.error("[UserSessionManager] Feed source provider failed:", error) console.error("[UserSessionManager] Feed source provider failed:", error)
} }
return new UserSession(userId, feedSources, this.feedEnhancer, this.queryAgentConfig) return this.initializedSession(userId, feedSources, agentConfig)
}
private queryAgentConfigForUser(userId: string): UserSessionAgentConfig {
return {
...(this.queryAgentConfig ?? {}),
conversationStorage: conversations(this.db, userId),
}
}
private async initializedSession(
userId: string,
sources: FeedSource[],
agentConfig: UserSessionAgentConfig,
): Promise<UserSession> {
const session = new UserSession(userId, sources, this.feedEnhancer, agentConfig)
try {
await session.initialize()
return session
} catch (err) {
session.destroy()
throw err
}
} }
/** /**

View File

@@ -3,6 +3,13 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@frey
import { LocationSource } from "@freya/source-location" import { LocationSource } from "@freya/source-location"
import { describe, expect, spyOn, test } from "bun:test" import { describe, expect, spyOn, test } from "bun:test"
import type {
ConversationStorage,
ConversationStorageEntry,
} from "../agent/conversation-recording-query-agent.ts"
import type { AppendConversationEntryInput } from "../conversations/storage.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { UserSession } from "./user-session.ts" import { UserSession } from "./user-session.ts"
function createStubSource(id: string, items: FeedItem[] = []): FeedSource { function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
@@ -23,6 +30,40 @@ function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
} }
} }
class FakeConversationStorage implements ConversationStorage {
readonly calls: string[] = []
private readonly entries: ConversationStorageEntry[]
constructor(entries: ConversationStorageEntry[] = []) {
this.entries = entries
}
async getOrCreateConversation(): Promise<{ id: string }> {
this.calls.push("getOrCreateConversation")
return { id: "conversation-1" }
}
async appendEntry(
_conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationStorageEntry> {
this.calls.push("appendEntry")
return {
id: "entry-appended",
sequence: 1,
kind: input.kind,
payload: input.payload,
metadata: input.metadata ?? {},
createdAt: new Date("2026-06-15T00:00:00.000Z"),
}
}
async listEntries(_conversationId: string): Promise<ConversationStorageEntry[]> {
this.calls.push("listEntries")
return this.entries
}
}
describe("UserSession", () => { describe("UserSession", () => {
test("registers sources and starts engine", async () => { test("registers sources and starts engine", async () => {
const session = new UserSession("test-user", [ const session = new UserSession("test-user", [
@@ -67,6 +108,32 @@ describe("UserSession", () => {
expect(disposeSpy).toHaveBeenCalled() expect(disposeSpy).toHaveBeenCalled()
}) })
test("initialize loads conversation entries before exposing stored agent", async () => {
const storage = new FakeConversationStorage([
{
id: "entry-1",
sequence: 1,
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: "stored hello" }],
},
metadata: {},
createdAt: new Date("2026-06-15T00:00:00.000Z"),
},
])
const session = new UserSession("test-user", [createStubSource("test")], null, {
conversationStorage: storage,
})
expect(() => session.agent).toThrow("UserSession has not been initialized")
await session.initialize()
expect(storage.calls).toEqual(["getOrCreateConversation", "listEntries"])
expect(session.agent).toBeDefined()
})
test("engine.executeAction routes to correct source", async () => { test("engine.executeAction routes to correct source", async () => {
const location = new LocationSource() const location = new LocationSource()
const session = new UserSession("test-user", [location]) const session = new UserSession("test-user", [location])

View File

@@ -10,22 +10,30 @@ import type { QueryAgentToolbox } from "../agent/query-agent-toolbox.ts"
import type { QueryAgent } from "../agent/query-agent.ts" import type { QueryAgent } from "../agent/query-agent.ts"
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
import { PiQueryAgent } from "../agent/pi-query-agent.ts" import {
ConversationRecordingQueryAgent,
type ConversationStorage,
} from "../agent/conversation-recording-query-agent.ts"
import { PiQueryAgent, PI_MODEL_ID, PI_MODEL_PROVIDER } from "../agent/pi-query-agent.ts"
import { UserSessionQueryAgentToolbox } from "../agent/user-session-query-agent-toolbox.ts" import { UserSessionQueryAgentToolbox } from "../agent/user-session-query-agent-toolbox.ts"
export interface UserSessionAgentConfig { export interface UserSessionAgentConfig {
apiKey?: string apiKey?: string
cwd?: string cwd?: string
systemPrompt?: string systemPrompt?: string
conversationStorage?: ConversationStorage
} }
export class UserSession { export class UserSession {
readonly userId: string readonly userId: string
readonly engine: FeedEngine readonly engine: FeedEngine
readonly toolbox: QueryAgentToolbox readonly toolbox: QueryAgentToolbox
readonly agent: QueryAgent
private sources = new Map<string, FeedSource>() private sources = new Map<string, FeedSource>()
private readonly enhancer: FeedEnhancer | null private readonly enhancer: FeedEnhancer | null
private readonly agentConfig: UserSessionAgentConfig | undefined
private queryAgent: QueryAgent | null = null
private initializePromise: Promise<void> | null = null
private initialized = false
private enhancedItems: FeedItem[] | null = null private enhancedItems: FeedItem[] | null = null
/** The FeedResult that enhancedItems was derived from. */ /** The FeedResult that enhancedItems was derived from. */
private enhancedSource: FeedResult | null = null private enhancedSource: FeedResult | null = null
@@ -41,6 +49,7 @@ export class UserSession {
this.userId = userId this.userId = userId
this.engine = new FeedEngine() this.engine = new FeedEngine()
this.enhancer = enhancer ?? null this.enhancer = enhancer ?? null
this.agentConfig = agentConfig
for (const source of sources) { for (const source of sources) {
this.sources.set(source.id, source) this.sources.set(source.id, source)
this.engine.register(source) this.engine.register(source)
@@ -54,17 +63,43 @@ export class UserSession {
} }
this.toolbox = new UserSessionQueryAgentToolbox(this) this.toolbox = new UserSessionQueryAgentToolbox(this)
this.agent = new PiQueryAgent({ if (!agentConfig?.conversationStorage) {
userId: this.userId, this.queryAgent = new PiQueryAgent({
toolbox: this.toolbox, toolbox: this.toolbox,
apiKey: agentConfig?.apiKey, apiKey: this.agentConfig?.apiKey,
cwd: agentConfig?.cwd, cwd: this.agentConfig?.cwd,
systemPrompt: agentConfig?.systemPrompt, systemPrompt: this.agentConfig?.systemPrompt,
}) })
this.initialized = true
}
this.engine.start() this.engine.start()
} }
get agent(): QueryAgent {
if (!this.queryAgent) {
throw new Error("UserSession has not been initialized")
}
return this.queryAgent
}
async initialize(): Promise<void> {
if (this.initialized) return
if (this.initializePromise) return this.initializePromise
const promise = this.initializeAgent()
this.initializePromise = promise
try {
await promise
this.initialized = true
} finally {
if (this.initializePromise === promise) {
this.initializePromise = null
}
}
}
/** /**
* Returns the current feed, refreshing if the engine cache expired. * Returns the current feed, refreshing if the engine cache expired.
* Enhancement runs eagerly on engine updates; this method awaits * Enhancement runs eagerly on engine updates; this method awaits
@@ -201,7 +236,8 @@ export class UserSession {
} }
destroy(): void { destroy(): void {
this.agent.dispose() this.queryAgent?.dispose()
this.queryAgent = null
this.unsubscribe?.() this.unsubscribe?.()
this.unsubscribe = null this.unsubscribe = null
this.engine.stop() this.engine.stop()
@@ -210,6 +246,38 @@ export class UserSession {
this.enhancingPromise = null this.enhancingPromise = null
} }
private async initializeAgent(): Promise<void> {
if (this.queryAgent) return
const conversationStorage = this.agentConfig?.conversationStorage
if (!conversationStorage) {
this.queryAgent = new PiQueryAgent({
toolbox: this.toolbox,
apiKey: this.agentConfig?.apiKey,
cwd: this.agentConfig?.cwd,
systemPrompt: this.agentConfig?.systemPrompt,
})
return
}
const conversation = await conversationStorage.getOrCreateConversation()
const entries = await conversationStorage.listEntries(conversation.id)
this.queryAgent = new ConversationRecordingQueryAgent({
agent: new PiQueryAgent({
toolbox: this.toolbox,
apiKey: this.agentConfig?.apiKey,
cwd: this.agentConfig?.cwd,
systemPrompt: this.agentConfig?.systemPrompt,
initialEntries: entries,
}),
storage: conversationStorage,
defaultConversationId: conversation.id,
modelProvider: PI_MODEL_PROVIDER,
modelId: PI_MODEL_ID,
})
}
private invalidateEnhancement(): void { private invalidateEnhancement(): void {
this.enhancedItems = null this.enhancedItems = null
this.enhancedSource = null this.enhancedSource = null

View File

@@ -128,6 +128,20 @@ mock.module("../sources/user-sources.ts", () => ({
}, },
})) }))
mock.module("../conversations/storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({
async getOrCreateConversation() {
return { id: `conversation-${userId}` }
},
async listEntries() {
return []
},
async appendEntry() {
return { id: "entry-1", sequence: 1 }
},
}),
}))
const fakeDb = { const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb), transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database } as unknown as Database

View File

@@ -37,13 +37,13 @@ export function meta({}: Route.MetaArgs) {
}, },
{ property: "og:title", content: PAGE_TITLE }, { property: "og:title", content: PAGE_TITLE },
{ property: "og:description", content: PAGE_DESCRIPTION }, { property: "og:description", content: PAGE_DESCRIPTION },
{ property: "og:image", content: "https://ael.is/social-media-preview.png" }, { property: "og:image", content: "https://freya.chat/social-media-preview.jpg" },
{ property: "og:url", content: "https://ael.is" }, { property: "og:url", content: "https://freya.chat" },
{ property: "og:type", content: "website" }, { property: "og:type", content: "website" },
{ name: "twitter:card", content: "summary_large_image" }, { name: "twitter:card", content: "summary_large_image" },
{ name: "twitter:title", content: PAGE_TITLE }, { name: "twitter:title", content: PAGE_TITLE },
{ name: "twitter:description", content: PAGE_DESCRIPTION }, { name: "twitter:description", content: PAGE_DESCRIPTION },
{ name: "twitter:image", content: "https://ael.is/social-media-preview.png" }, { name: "twitter:image", content: "https://freya.chat/social-media-preview.jpg" },
] ]
} }
@@ -84,7 +84,7 @@ export async function action({ request }: Route.ActionArgs) {
await new Promise((resolve) => setTimeout(resolve, 1000)) await new Promise((resolve) => setTimeout(resolve, 1000))
const emailRes = await resend.emails.send({ const emailRes = await resend.emails.send({
from: "Freya <no-reply@ael.is>", from: "Freya <no-reply@freya.chat>",
to: email, to: email,
template: { template: {
id: "waitlist-confirmation", id: "waitlist-confirmation",
@@ -380,7 +380,6 @@ function SystemMessageBubble({
isAnimating={isStreaming} isAnimating={isStreaming}
linkSafety={{ enabled: false }} linkSafety={{ enabled: false }}
components={{ components={{
// @ts-expect-error
a: ({ className, ...props }) => <a className={`underline ${className}`} {...props} />, a: ({ className, ...props }) => <a className={`underline ${className}`} {...props} />,
}} }}
> >

View File

@@ -40,7 +40,7 @@ const POLICY = `# Privacy Policy
**Last updated:** March 5, 2026 **Last updated:** March 5, 2026
This Privacy Policy describes how **Freya** ("we", "us", or "our") collects, uses, and protects your personal information when you visit **https://ael.is** or interact with our services. This Privacy Policy describes how **Freya** ("we", "us", or "our") collects, uses, and protects your personal information when you visit **https://freya.chat** or interact with our services.
If you do not agree with this Privacy Policy, please do not use the website. If you do not agree with this Privacy Policy, please do not use the website.

View File

@@ -1,4 +1,4 @@
User-agent: * User-agent: *
Allow: / Allow: /
Sitemap: https://ael.is/sitemap.xml Sitemap: https://freya.chat/sitemap.xml

View File

@@ -1,9 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"> <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
<url> <url>
<loc>https://ael.is/</loc> <loc>https://freya.chat/</loc>
</url> </url>
<url> <url>
<loc>https://ael.is/privacy</loc> <loc>https://freya.chat/privacy</loc>
</url> </url>
</urlset> </urlset>

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 68 KiB