feat: add conversation storage (#140)

This commit is contained in:
2026-06-16 20:16:03 +01:00
committed by GitHub
parent 95f6c99f19
commit e11051b04b
23 changed files with 3457 additions and 396 deletions

View File

@@ -1,73 +1,119 @@
import type { AgentSessionEvent } from "@earendil-works/pi-coding-agent"
import type {
AgentSessionEvent,
ExtensionFactory,
SessionEntry,
} from "@earendil-works/pi-coding-agent"
import {
AuthStorage,
createAgentSession,
DefaultResourceLoader,
ModelRegistry,
SessionManager,
SettingsManager,
} from "@earendil-works/pi-coding-agent"
import { tmpdir } from "node:os"
import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts"
import type { QueryAgentToolbox } from "./query-agent-toolbox.ts"
import type { QueryAgent, QueryAgentAsk, QueryAgentEvent } from "./query-agent.ts"
import { InMemoryResourceLoader } from "./in-memory-resource-loader.ts"
import defaultSystemPrompt from "./prompts/system.txt"
import {
createQueryAgentEventListeners,
QueryAgentEvent,
type QueryAgent,
type QueryAgentAsk,
type QueryAgentCompactedEntryRange,
type QueryAgentCompactionEvent,
type QueryAgentConversationEntryRef,
type QueryAgentEventListeners,
type QueryAgentEventListener,
type QueryAgentEventMap,
type QueryAgentStreamEvent,
} from "./query-agent.ts"
import { createSessionManager } from "./session-manager.ts"
import { createFreyaAgentTools, FREYA_AGENT_TOOL_NAMES } from "./tools.ts"
type PiSession = Awaited<ReturnType<typeof createAgentSession>>["session"]
type PiMessageEndEvent = Extract<AgentSessionEvent, { type: "message_end" }>
type PiAgentMessage = PiMessageEndEvent["message"]
type PiAgentEndEvent = Extract<AgentSessionEvent, { type: "agent_end" }>
type PiSessionManager = ReturnType<typeof createSessionManager>
type PiSessionMessage = Parameters<PiSessionManager["appendMessage"]>[0]
export interface PiQueryAgentConfig {
userId: string
toolbox: QueryAgentToolbox
apiKey?: string
cwd?: string
systemPrompt?: string
initialEntries?: ConversationStorageEntry[]
}
const MODEL_PROVIDER = "openrouter"
const MODEL_ID = "z-ai/glm-4.7-flash"
export const PI_MODEL_PROVIDER = "openrouter"
export const PI_MODEL_ID = "z-ai/glm-4.7-flash"
export class PiQueryAgent implements QueryAgent {
private readonly userId: string
private readonly toolbox: QueryAgentToolbox
private readonly cwd: string
private readonly systemPrompt: string
private readonly apiKey: string | undefined
private readonly initialEntries: ConversationStorageEntry[]
private readonly eventListeners = createQueryAgentEventListeners()
private session: PiSession | null = null
private pendingSession: Promise<PiSession> | null = null
private activeRun: symbol | null = null
/**
* Conversation currently receiving Pi events for an active ask().
*
* Pi's compaction hook fires from the SDK session rather than from our
* QueryAgent call stack, so the hook reads this value to attach the
* compaction summary to the right Freya conversation. null means no active
* run; "" means a run is active but no Freya conversation id was supplied.
*/
private activeConversationId: string | null = null
/**
* Freya entry for the user message currently being handed to Pi.
*
* ConversationRecordingQueryAgent appends the user message before calling
* PiQueryAgent. Pi later persists its own copy of that user message into its
* SessionManager, and this one-shot reference lets us map Pi's generated
* session entry id back to the Freya sequence.
*/
private activeUserMessageEntry: QueryAgentConversationEntryRef | null = null
/**
* Maps Pi SessionManager entry ids to Freya conversation sequences.
*
* Pi compaction reports boundaries with Pi entry ids, while our DB replay
* logic uses monotonically increasing Freya sequences. This map is the bridge
* that lets us translate Pi's firstKeptEntryId into a compacted entry range.
*/
private readonly piEntryConversationSequences = new Map<string, number>()
private disposed = false
constructor(config: PiQueryAgentConfig) {
this.userId = config.userId
this.toolbox = config.toolbox
this.apiKey = config.apiKey
this.cwd = config.cwd ?? tmpdir()
this.systemPrompt = config.systemPrompt ?? defaultSystemPrompt
this.initialEntries = config.initialEntries ?? []
}
async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentEvent> {
if (this.activeRun) {
async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentStreamEvent> {
if (this.activeConversationId !== null) {
yield {
type: "error",
message: "A query is already running for this user",
message: "A query is already running",
}
return
}
const run = Symbol(this.userId)
this.activeRun = run
this.activeConversationId = input.conversationId ?? ""
this.activeUserMessageEntry = input.userMessageEntry ?? null
let session: PiSession
try {
session = await this.getOrCreateSession()
} catch (err) {
this.clearActiveRun(run)
this.activeConversationId = null
this.activeUserMessageEntry = null
yield {
type: "error",
message: `Failed to create query session: ${errorMessage(err)}`,
@@ -75,11 +121,11 @@ export class PiQueryAgent implements QueryAgent {
return
}
const events: QueryAgentEvent[] = []
const events: QueryAgentStreamEvent[] = []
let closed = false
let wake: (() => void) | null = null
function push(event: QueryAgentEvent): void {
function push(event: QueryAgentStreamEvent): void {
events.push(event)
if (wake) {
wake()
@@ -88,7 +134,7 @@ export class PiQueryAgent implements QueryAgent {
}
let runFailed = false
function pushRunEvent(event: QueryAgentEvent): void {
function pushRunEvent(event: QueryAgentStreamEvent): void {
if (event.type === "error") {
if (runFailed) return
runFailed = true
@@ -108,7 +154,8 @@ export class PiQueryAgent implements QueryAgent {
this.handlePiEvent(event, pushRunEvent)
})
void this.runPrompt(session, input)
session
.prompt(input.message)
.then(() => {
if (runFailed) return
pushRunEvent({ type: "done" })
@@ -118,7 +165,8 @@ export class PiQueryAgent implements QueryAgent {
})
.finally(() => {
unsubscribe()
this.clearActiveRun(run)
this.activeConversationId = null
this.activeUserMessageEntry = null
close()
})
@@ -140,12 +188,19 @@ export class PiQueryAgent implements QueryAgent {
this.session?.dispose()
this.session = null
this.pendingSession = null
this.activeRun = null
this.activeConversationId = null
this.activeUserMessageEntry = null
this.clearEventListeners()
}
private clearActiveRun(run: symbol): void {
if (this.activeRun === run) {
this.activeRun = null
addEventListener<T extends QueryAgentEvent>(
type: T,
listener: QueryAgentEventListener<T>,
): () => void {
const listeners = this.listenersFor(type)
listeners.add(listener)
return () => {
listeners.delete(listener)
}
}
@@ -184,38 +239,200 @@ export class PiQueryAgent implements QueryAgent {
})
const authStorage = AuthStorage.inMemory()
if (this.apiKey) {
authStorage.setRuntimeApiKey(MODEL_PROVIDER, this.apiKey)
authStorage.setRuntimeApiKey(PI_MODEL_PROVIDER, this.apiKey)
}
const modelRegistry = ModelRegistry.inMemory(authStorage)
const model = modelRegistry.find(MODEL_PROVIDER, MODEL_ID)
const model = modelRegistry.find(PI_MODEL_PROVIDER, PI_MODEL_ID)
if (!model) {
throw new Error(`Pi model not found: ${MODEL_PROVIDER}/${MODEL_ID}`)
throw new Error(`Pi model not found: ${PI_MODEL_PROVIDER}/${PI_MODEL_ID}`)
}
const resourceLoader = new DefaultResourceLoader({
cwd: this.cwd,
agentDir: this.cwd,
settingsManager,
systemPrompt: this.systemPrompt,
extensionFactories: [this.createCompactionExtension()],
noExtensions: true,
noSkills: true,
noPromptTemplates: true,
noThemes: true,
noContextFiles: true,
})
await resourceLoader.reload()
const sessionManager = this.createMappedSessionManager()
const { session } = await createAgentSession({
cwd: this.cwd,
authStorage,
modelRegistry,
model,
resourceLoader: new InMemoryResourceLoader(this.systemPrompt),
resourceLoader,
settingsManager,
sessionManager: SessionManager.inMemory(this.cwd),
sessionManager,
noTools: "builtin",
customTools: createFreyaAgentTools({
toolbox: this.toolbox,
}),
tools: [...FREYA_AGENT_TOOL_NAMES],
tools: FREYA_AGENT_TOOL_NAMES,
})
return session
}
private async runPrompt(session: PiSession, input: QueryAgentAsk): Promise<void> {
await session.prompt(input.message)
/**
* Creates Pi's SessionManager and records Pi-id -> Freya-sequence mappings.
*
* Hydrated DB messages are mapped through createSessionManager's callback.
* Live user messages are mapped by wrapping appendMessage(), because Pi owns
* the generated session entry id for messages written during prompt handling.
*/
private createMappedSessionManager(): PiSessionManager {
this.piEntryConversationSequences.clear()
const sessionManager = createSessionManager({
cwd: this.cwd,
entries: this.initialEntries,
modelProvider: PI_MODEL_PROVIDER,
modelId: PI_MODEL_ID,
onMessageEntryAppended: (piEntryId, entry) => {
this.piEntryConversationSequences.set(piEntryId, entry.sequence)
},
})
const appendMessage = sessionManager.appendMessage.bind(sessionManager)
sessionManager.appendMessage = (message: PiSessionMessage): string => {
const piEntryId = appendMessage(message)
const sequence = this.liveConversationSequenceForMessage(message)
if (sequence !== null) {
this.piEntryConversationSequences.set(piEntryId, sequence)
}
return piEntryId
}
return sessionManager
}
private handlePiEvent(event: AgentSessionEvent, push: (event: QueryAgentEvent) => void): void {
/**
* Returns the Freya sequence for Pi's persisted live user message.
*
* We only map user messages here because they are the messages Freya writes
* before invoking Pi. Assistant/tool entries are recorded from the stream
* outside Pi's SessionManager and do not have a stable live Pi id available
* at the storage boundary.
*/
private liveConversationSequenceForMessage(message: PiSessionMessage): number | null {
if (message.role !== "user") return null
const entry = this.activeUserMessageEntry
this.activeUserMessageEntry = null
if (!entry) return null
return entry.sequence
}
/**
* Installs the minimal Pi extension used to observe compaction.
*
* session_before_compact gives us the full branch plus firstKeptEntryId, so
* we translate that boundary before Pi writes the compaction entry. The later
* session_compact event carries the saved summary, which we forward with the
* cached Freya compacted entry range.
*/
private createCompactionExtension(): ExtensionFactory {
return (pi) => {
/**
* Temporary handoff between Pi's before/after compaction hooks.
*
* session_compact receives the saved compaction entry, not the original
* branch entries needed for boundary translation.
*/
let pendingCompactedEntryRange: QueryAgentCompactedEntryRange | null = null
pi.on("session_before_compact", async (event) => {
pendingCompactedEntryRange = this.compactedEntryRangeBeforePiEntry(
event.branchEntries,
event.preparation.firstKeptEntryId,
)
})
pi.on("session_compact", async (event) => {
const conversationId = this.activeConversationId
if (!conversationId) return
const entry = event.compactionEntry
const compactedEntryRange = pendingCompactedEntryRange
pendingCompactedEntryRange = null
const compactionEvent: QueryAgentCompactionEvent = {
type: QueryAgentEvent.Compaction,
conversationId,
summary: entry.summary,
firstKeptEntryId: entry.firstKeptEntryId,
compactedEntryRange,
tokensBefore: entry.tokensBefore,
details: entry.details,
fromExtension: event.fromExtension,
}
await this.emitEvent(compactionEvent)
})
}
}
/**
* Returns the Freya entry range compacted before a Pi session entry.
*
* Pi keeps firstKeptEntryId and everything after it as raw context. Therefore
* the summary covers only mapped entries before that Pi entry. If none of
* those entries map back to Freya, we return null so storage can avoid
* recording a summary with an unsafe coverage range.
*/
private compactedEntryRangeBeforePiEntry(
branchEntries: SessionEntry[],
piEntryId: string,
): QueryAgentCompactedEntryRange | null {
let endSequence: number | null = null
for (const entry of branchEntries) {
if (entry.id === piEntryId) {
if (endSequence === null) return null
return {
startSequence: 1,
endSequence,
}
}
const sequence = this.piEntryConversationSequences.get(entry.id)
if (typeof sequence === "number") {
endSequence = sequence
}
}
return null
}
private async emitEvent<T extends QueryAgentEvent>(event: QueryAgentEventMap[T]): Promise<void> {
const listeners = this.listenersFor(event.type)
for (const listener of listeners) {
await listener(event)
}
}
private listenersFor<T extends QueryAgentEvent>(type: T): QueryAgentEventListeners[T] {
return this.eventListeners[type]
}
private clearEventListeners(): void {
for (const listeners of Object.values(this.eventListeners)) {
listeners.clear()
}
}
private handlePiEvent(
event: AgentSessionEvent,
push: (event: QueryAgentStreamEvent) => void,
): void {
switch (event.type) {
case "message_end": {
const message = piAssistantMessageError(event.message)
@@ -283,7 +500,6 @@ function piAssistantMessageError(message: PiAgentMessage): string | null {
case "toolUse":
return null
}
return null
default:
return null
}