feat: make agent event include conversation entry

This commit is contained in:
2026-07-04 00:14:14 +01:00
parent 430a095f2a
commit 7eacbfd846
7 changed files with 655 additions and 85 deletions

View File

@@ -0,0 +1,377 @@
import { AgentEventKind, type AgentEvent } from "@freya/agent-protocol"
import {
ConversationEntryKind,
ConversationEntryVisibility,
type ConversationEntry,
type ConversationEntryVisibility as ConversationEntryVisibilityType,
} from "@freya/core"
import { describe, expect, test } from "bun:test"
import type {
AppendAttachmentEntryInput,
AppendAttachmentEntryResult,
AppendConversationEntryInput,
ConversationEntryRow,
ConversationResponseStateRow,
ConversationRow,
ConversationStorage,
CreateFileInput,
FileRow,
ListConversationEntriesParams,
UpdateConversationResponseStateInput,
UpsertConversationResponseStateInput,
} from "../conversations/storage.ts"
import type { ConversationResponseStateStatus } from "../db/schema.ts"
import type { Job } from "../lib/job.ts"
import type { UserSessionManager } from "../session/index.ts"
import type {
QueryAgent,
QueryAgentAsk,
QueryAgentEvent,
QueryAgentEventListener,
QueryAgentStreamEvent,
} from "./query-agent.ts"
import { ConversationResponseStateStatus as ResponseStateStatus } from "../db/schema.ts"
import {
NotificationCentral,
type NotificationPayload,
} from "../notification/notification-central.ts"
import { AgentResponseJobExecutor, type AgentResponseJobPayload } from "./job.ts"
const ConversationId = "00000000-0000-4000-8000-000000000001"
const UserId = "user-1"
const UserEntryId = "00000000-0000-4000-8000-000000000002"
const Now = new Date("2026-07-03T00:00:00.000Z")
class FakeConversationStorage implements ConversationStorage {
readonly appended: ConversationEntry[] = []
readonly clearedConversationIds: string[] = []
readonly markedStatuses: Array<{
conversationIds: string[]
status: ConversationResponseStateStatus
}> = []
private nextSequenceValue = 2
async transaction<T>(tx: (storage: ConversationStorage) => T | Promise<T>): Promise<T> {
return tx(this)
}
async createConversation(_userId: string): Promise<ConversationRow> {
throw new Error("createConversation is not implemented")
}
async listUserConversations(_userId: string): Promise<ConversationRow[]> {
throw new Error("listUserConversations is not implemented")
}
async findConversation(conversationId: string): Promise<ConversationRow | null> {
if (conversationId !== ConversationId) return null
return conversationRow()
}
async getOrCreateConversation(_userId: string): Promise<ConversationRow> {
throw new Error("getOrCreateConversation is not implemented")
}
async createFile(_userId: string, _input: CreateFileInput): Promise<FileRow> {
throw new Error("createFile is not implemented")
}
async appendEntry(
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationEntry> {
const entry = conversationEntryFromAppendInput(conversationId, this.nextSequenceValue, input)
this.nextSequenceValue += 1
this.appended.push(entry)
return entry
}
async appendAttachmentEntry(
_conversationId: string,
_input: AppendAttachmentEntryInput,
): Promise<AppendAttachmentEntryResult> {
throw new Error("appendAttachmentEntry is not implemented")
}
async nextSequence(_conversationId: string): Promise<number> {
return this.nextSequenceValue
}
async listUserConversationEntries(
_userId: string,
_conversationId: string,
_params?: ListConversationEntriesParams,
): Promise<ConversationEntryRow[]> {
throw new Error("listUserConversationEntries is not implemented")
}
async listPendingUserConversationEntries(
_userId: string,
conversationId: string,
): Promise<ConversationEntryRow[]> {
return [pendingUserEntryRow(conversationId)]
}
async findConversationResponseState(
_conversationId: string,
): Promise<ConversationResponseStateRow | null> {
throw new Error("findConversationResponseState is not implemented")
}
async listPendingResponseStates(): Promise<ConversationResponseStateRow[]> {
throw new Error("listPendingResponseStates is not implemented")
}
async listRunningResponseStates(): Promise<ConversationResponseStateRow[]> {
throw new Error("listRunningResponseStates is not implemented")
}
async upsertConversationResponseState(
_conversationId: string,
_input: UpsertConversationResponseStateInput,
): Promise<ConversationResponseStateRow> {
throw new Error("upsertConversationResponseState is not implemented")
}
async updateConversationResponseState(
_conversationId: string,
_input: UpdateConversationResponseStateInput,
): Promise<ConversationResponseStateRow | null> {
throw new Error("updateConversationResponseState is not implemented")
}
async markResponseStateStatus(
conversationIds: string[],
status: ConversationResponseStateStatus,
): Promise<ConversationResponseStateRow[]> {
this.markedStatuses.push({ conversationIds, status })
return []
}
async claimPendingConversationResponseState(
conversationId: string,
): Promise<ConversationResponseStateRow | null> {
if (conversationId !== ConversationId) return null
return conversationResponseStateRow()
}
async clearConversationResponseState(conversationId: string): Promise<void> {
this.clearedConversationIds.push(conversationId)
}
}
class FakeQueryAgent implements QueryAgent {
readonly inputs: QueryAgentAsk[] = []
private readonly events: QueryAgentStreamEvent[]
constructor(events: QueryAgentStreamEvent[]) {
this.events = events
}
async *ask(input: QueryAgentAsk): AsyncIterable<QueryAgentStreamEvent> {
this.inputs.push(input)
for (const event of this.events) {
yield event
}
}
addEventListener<T extends QueryAgentEvent>(
_type: T,
_listener: QueryAgentEventListener<T>,
): () => void {
return () => {}
}
dispose(): void {}
}
describe("AgentResponseJobExecutor", () => {
test("notifies persisted conversation entries for streamed response events", async () => {
const storage = new FakeConversationStorage()
const agent = new FakeQueryAgent([
{ type: "text_delta", text: "I'll check\n" },
{ type: "tool_start", toolName: "calendar" },
{ type: "tool_end", toolName: "calendar", ok: true },
{ type: "text_delta", text: "All set" },
{ type: "done" },
])
const notifications: NotificationPayload[] = []
const notificationCentral = new NotificationCentral()
notificationCentral.registerListenerForUser(UserId, async (notification) => {
notifications.push(notification)
})
const executor = new AgentResponseJobExecutor({
conversationStorage: storage,
userSessionManager: fakeUserSessionManager(agent),
notificationCentral,
})
await executor.execute(agentResponseJob())
expect(agent.inputs).toHaveLength(1)
expect(agent.inputs[0]?.message).toContain("What's next?")
expect(agent.inputs[0]?.conversationId).toBe(ConversationId)
expect(agent.inputs[0]?.signal).toBeInstanceOf(AbortSignal)
expect(storage.appended.map((entry) => entry.kind)).toEqual([
ConversationEntryKind.AssistantMessage,
ConversationEntryKind.ToolCall,
ConversationEntryKind.ToolResult,
ConversationEntryKind.AssistantMessage,
])
expect(notifications.map((notification) => notification.payload.kind)).toEqual([
AgentEventKind.ConversationEntryCreated,
AgentEventKind.ConversationEntryCreated,
AgentEventKind.ConversationEntryCreated,
AgentEventKind.ConversationEntryCreated,
AgentEventKind.ResponseFinished,
])
expect(conversationEntryNotifications(notifications).map((event) => event.entry)).toEqual(
storage.appended,
)
expect(storage.clearedConversationIds).toEqual([ConversationId])
expect(storage.markedStatuses).toEqual([])
})
})
function fakeUserSessionManager(agent: QueryAgent): UserSessionManager {
return {
async getOrCreate(userId: string) {
expect(userId).toBe(UserId)
return { agent }
},
} as unknown as UserSessionManager
}
function agentResponseJob(): Job<AgentResponseJobPayload> {
const controller = new AbortController()
return {
id: 1,
payload: { conversationId: ConversationId },
signal: controller.signal,
}
}
function conversationEntryNotifications(
notifications: NotificationPayload[],
): Array<Extract<AgentEvent, { kind: typeof AgentEventKind.ConversationEntryCreated }>> {
return notifications
.map((notification) => notification.payload)
.filter(isConversationEntryCreatedEvent)
}
function isConversationEntryCreatedEvent(
event: AgentEvent,
): event is Extract<AgentEvent, { kind: typeof AgentEventKind.ConversationEntryCreated }> {
return event.kind === AgentEventKind.ConversationEntryCreated
}
function conversationRow(): ConversationRow {
return {
id: ConversationId,
userId: UserId,
createdAt: Now,
updatedAt: Now,
}
}
function conversationResponseStateRow(): ConversationResponseStateRow {
return {
conversationId: ConversationId,
status: ResponseStateStatus.Running,
pendingSinceEntryId: UserEntryId,
maxWaitUntil: Now,
runningSince: Now,
createdAt: Now,
updatedAt: Now,
}
}
function pendingUserEntryRow(conversationId: string): ConversationEntryRow {
return {
id: UserEntryId,
conversationId,
sequence: 1,
kind: ConversationEntryKind.UserMessage,
visibility: ConversationEntryVisibility.UserVisible,
fileId: null,
payload: {
role: "user",
parts: [{ type: "text", text: "What's next?" }],
},
metadata: {},
createdAt: Now,
}
}
function conversationEntryFromAppendInput(
conversationId: string,
sequence: number,
input: AppendConversationEntryInput,
): ConversationEntry {
const base = {
id: entryId(sequence),
conversationId,
sequence,
visibility: input.visibility ?? defaultVisibilityForKind(input.kind),
fileId: null,
metadata: input.metadata ?? {},
createdAt: Now.toISOString(),
}
switch (input.kind) {
case ConversationEntryKind.UserMessage:
return {
...base,
kind: input.kind,
payload: input.payload,
}
case ConversationEntryKind.AssistantMessage:
return {
...base,
kind: input.kind,
payload: input.payload,
}
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
case ConversationEntryKind.SystemNote:
return {
...base,
kind: input.kind,
payload: input.payload,
}
case ConversationEntryKind.ContextSummary:
return {
...base,
kind: input.kind,
payload: input.payload,
}
case ConversationEntryKind.Attachment:
return {
...base,
kind: input.kind,
fileId: input.fileId,
payload: input.payload,
}
}
}
function defaultVisibilityForKind(kind: ConversationEntryKind): ConversationEntryVisibilityType {
switch (kind) {
case ConversationEntryKind.UserMessage:
case ConversationEntryKind.AssistantMessage:
case ConversationEntryKind.Attachment:
return ConversationEntryVisibility.UserVisible
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
case ConversationEntryKind.ContextSummary:
case ConversationEntryKind.SystemNote:
return ConversationEntryVisibility.Internal
}
}
function entryId(sequence: number): string {
return `00000000-0000-4000-8000-${sequence.toString().padStart(12, "0")}`
}

View File

@@ -1,11 +1,10 @@
import type { AgentEvent } from "@freya/agent-protocol"
import { AgentEventKind } from "@freya/agent-protocol"
import {
AssistantMessagePayload,
ConversationEntryKind,
UserMessagePayload,
ToolCallPayload,
ToolResultPayload,
UserMessagePayload,
} from "@freya/core"
import { type } from "arktype"
@@ -16,7 +15,11 @@ import type { NotificationCentral } from "../notification/notification-central"
import type { UserSessionManager } from "../session"
import { ConversationResponseStateStatus } from "../db/schema"
import { streamAgentResponse } from "./streaming"
import {
AgentResponseStreamEventKind,
streamAgentResponse,
type AgentResponseStreamEvent,
} from "./streaming"
export interface AgentResponseJobPayload {
conversationId: string
@@ -81,17 +84,17 @@ export class AgentResponseJobExecutor implements JobExecutor<AgentResponseJobPay
try {
for await (const event of streamAgentResponse({
agent: session.agent,
input: { message, signal: job.signal },
input: {
message,
conversationId: conversation.id,
signal: job.signal,
},
})) {
if (job.signal.aborted) {
break
}
await this.recordAgentEvent(event, conversation.id)
await this.notificationCentral.notifyUser(conversation.userId, {
kind: "agent",
payload: event,
})
await this.handleStreamEvent(event, conversation.id, conversation.userId)
}
// if job is aborted, stop everything immediately, including clean up.
@@ -110,35 +113,94 @@ export class AgentResponseJobExecutor implements JobExecutor<AgentResponseJobPay
}
}
private async recordAgentEvent(event: AgentEvent, conversationId: string) {
switch (event.type) {
case "message_created":
await this.conversationStorage.appendEntry(conversationId, {
private async handleStreamEvent(
event: AgentResponseStreamEvent,
conversationId: string,
userId: string,
): Promise<void> {
switch (event.kind) {
case AgentResponseStreamEventKind.ConversationStarted:
await this.notificationCentral.notifyUser(userId, {
kind: "agent",
payload: {
kind: AgentEventKind.ConversationStarted,
conversationId: event.conversationId,
},
})
break
case AgentResponseStreamEventKind.AssistantMessage: {
const entry = await this.conversationStorage.appendEntry(conversationId, {
kind: ConversationEntryKind.AssistantMessage,
payload: {
role: "assistant",
parts: [{ type: "text", text: event.text }],
} satisfies AssistantMessagePayload,
})
await this.notificationCentral.notifyUser(userId, {
kind: "agent",
payload: {
kind: AgentEventKind.ConversationEntryCreated,
entry,
},
})
break
}
case "tool_started":
await this.conversationStorage.appendEntry(conversationId, {
case AgentResponseStreamEventKind.ToolStarted: {
const entry = await this.conversationStorage.appendEntry(conversationId, {
kind: ConversationEntryKind.ToolCall,
payload: {
toolName: event.toolName,
} satisfies ToolCallPayload,
})
await this.notificationCentral.notifyUser(userId, {
kind: "agent",
payload: {
kind: AgentEventKind.ConversationEntryCreated,
entry,
},
})
break
}
case "tool_finished":
await this.conversationStorage.appendEntry(conversationId, {
case AgentResponseStreamEventKind.ToolFinished: {
const entry = await this.conversationStorage.appendEntry(conversationId, {
kind: ConversationEntryKind.ToolResult,
payload: {
toolName: event.toolName,
ok: event.ok,
} satisfies ToolResultPayload,
})
await this.notificationCentral.notifyUser(userId, {
kind: "agent",
payload: {
kind: AgentEventKind.ConversationEntryCreated,
entry,
},
})
break
}
case AgentResponseStreamEventKind.ResponseFinished:
await this.notificationCentral.notifyUser(userId, {
kind: "agent",
payload: {
kind: AgentEventKind.ResponseFinished,
conversationId,
},
})
break
case AgentResponseStreamEventKind.ResponseFailed:
await this.notificationCentral.notifyUser(userId, {
kind: "agent",
payload: {
kind: AgentEventKind.ResponseFailed,
conversationId,
error: event.error,
},
})
break
}
}

View File

@@ -1,5 +1,3 @@
import type { AgentEvent } from "@freya/agent-protocol"
import { describe, expect, test } from "bun:test"
import type {
@@ -10,7 +8,11 @@ import type {
QueryAgentStreamEvent,
} from "./query-agent.ts"
import { streamAgentResponse } from "./streaming.ts"
import {
AgentResponseStreamEventKind,
streamAgentResponse,
type AgentResponseStreamEvent,
} from "./streaming.ts"
class FakeQueryAgent implements QueryAgent {
readonly inputs: QueryAgentAsk[] = []
@@ -54,11 +56,14 @@ describe("streamAgentResponse", () => {
)
expect(events).toEqual([
{ type: "conversation_started", conversationId: "conversation-1" },
{ type: "message_created", text: "First message" },
{ type: "message_created", text: "Second message" },
{ type: "message_created", text: "Third message" },
{ type: "message_finished" },
{
kind: AgentResponseStreamEventKind.ConversationStarted,
conversationId: "conversation-1",
},
{ kind: AgentResponseStreamEventKind.AssistantMessage, text: "First message" },
{ kind: AgentResponseStreamEventKind.AssistantMessage, text: "Second message" },
{ kind: AgentResponseStreamEventKind.AssistantMessage, text: "Third message" },
{ kind: AgentResponseStreamEventKind.ResponseFinished },
])
})
@@ -77,10 +82,13 @@ describe("streamAgentResponse", () => {
)
expect(events).toEqual([
{ type: "conversation_started", conversationId: "conversation-1" },
{ type: "message_created", text: " const value = 1 " },
{ type: "message_created", text: " return value" },
{ type: "message_finished" },
{
kind: AgentResponseStreamEventKind.ConversationStarted,
conversationId: "conversation-1",
},
{ kind: AgentResponseStreamEventKind.AssistantMessage, text: " const value = 1 " },
{ kind: AgentResponseStreamEventKind.AssistantMessage, text: " return value" },
{ kind: AgentResponseStreamEventKind.ResponseFinished },
])
})
@@ -97,25 +105,35 @@ describe("streamAgentResponse", () => {
agent,
input: { message: "hello" },
})
const events: AgentEvent[] = []
const events: AgentResponseStreamEvent[] = []
await expect(collectStreamAgentResponse(stream, events)).rejects.toThrow("model unavailable")
expect(events).toEqual([
{ type: "conversation_started", conversationId: "conversation-1" },
{ type: "message_created", text: "I'll check" },
{ type: "tool_started", toolName: "calendar" },
{ type: "tool_finished", toolName: "calendar", ok: false },
{ type: "message_created", text: "That failed" },
{ type: "message_failed", error: "model unavailable" },
{
kind: AgentResponseStreamEventKind.ConversationStarted,
conversationId: "conversation-1",
},
{ kind: AgentResponseStreamEventKind.AssistantMessage, text: "I'll check" },
{ kind: AgentResponseStreamEventKind.ToolStarted, toolName: "calendar" },
{
kind: AgentResponseStreamEventKind.ToolFinished,
toolName: "calendar",
ok: false,
},
{ kind: AgentResponseStreamEventKind.AssistantMessage, text: "That failed" },
{
kind: AgentResponseStreamEventKind.ResponseFailed,
error: "model unavailable",
},
])
})
})
async function collectStreamAgentResponse(
stream: AsyncIterable<AgentEvent>,
events: AgentEvent[] = [],
): Promise<AgentEvent[]> {
stream: AsyncIterable<AgentResponseStreamEvent>,
events: AgentResponseStreamEvent[] = [],
): Promise<AgentResponseStreamEvent[]> {
for await (const event of stream) {
events.push(event)
}

View File

@@ -1,8 +1,42 @@
import type { AgentEvent } from "@freya/agent-protocol"
import type { QueryAgent, QueryAgentAsk } from "./query-agent.ts"
export type AgentResponseStreamItem = { type: "event"; event: AgentEvent }
export const AgentResponseStreamEventKind = {
ConversationStarted: "conversation_started",
AssistantMessage: "assistant_message",
ToolStarted: "tool_started",
ToolFinished: "tool_finished",
ResponseFinished: "response_finished",
ResponseFailed: "response_failed",
} as const
export type AgentResponseStreamEventKind =
(typeof AgentResponseStreamEventKind)[keyof typeof AgentResponseStreamEventKind]
export type AgentResponseStreamEvent =
| {
kind: typeof AgentResponseStreamEventKind.ConversationStarted
conversationId: string
}
| {
kind: typeof AgentResponseStreamEventKind.AssistantMessage
text: string
}
| {
kind: typeof AgentResponseStreamEventKind.ToolStarted
toolName: string
}
| {
kind: typeof AgentResponseStreamEventKind.ToolFinished
toolName: string
ok: boolean
}
| {
kind: typeof AgentResponseStreamEventKind.ResponseFinished
}
| {
kind: typeof AgentResponseStreamEventKind.ResponseFailed
error: string
}
export async function* streamAgentResponse({
agent,
@@ -10,18 +44,16 @@ export async function* streamAgentResponse({
}: {
agent: QueryAgent
input: QueryAgentAsk
}): AsyncGenerator<AgentEvent, void, void> {
let message = ""
let conversationId: string | null = null
}): AsyncGenerator<AgentResponseStreamEvent, void, void> {
const splitter = new AgentMessageSplitter()
function messageEvent(text: string): AgentEvent | null {
function messageEvent(text: string): AgentResponseStreamEvent | null {
if (text.trim() === "") return null
return { type: "message_created", text }
return { kind: AgentResponseStreamEventKind.AssistantMessage, text }
}
function flushPendingMessage(): AgentEvent | null {
function flushPendingMessage(): AgentResponseStreamEvent | null {
const text = splitter.flush()
if (text === null) return null
@@ -35,12 +67,13 @@ export async function* streamAgentResponse({
switch (event.type) {
case "conversation":
conversationId = event.conversationId
yield { type: "conversation_started", conversationId }
yield {
kind: AgentResponseStreamEventKind.ConversationStarted,
conversationId: event.conversationId,
}
break
case "text_delta":
message += event.text
for (const line of splitter.push(event.text)) {
const item = messageEvent(line)
if (item) yield item
@@ -52,7 +85,10 @@ export async function* streamAgentResponse({
const item = flushPendingMessage()
if (item) yield item
}
yield { type: "tool_started", toolName: event.toolName }
yield {
kind: AgentResponseStreamEventKind.ToolStarted,
toolName: event.toolName,
}
break
case "tool_end":
@@ -61,7 +97,7 @@ export async function* streamAgentResponse({
if (item) yield item
}
yield {
type: "tool_finished",
kind: AgentResponseStreamEventKind.ToolFinished,
toolName: event.toolName,
ok: event.ok,
}
@@ -72,7 +108,10 @@ export async function* streamAgentResponse({
const item = flushPendingMessage()
if (item) yield item
}
yield { type: "message_failed", error: event.message }
yield {
kind: AgentResponseStreamEventKind.ResponseFailed,
error: event.message,
}
throw new Error(event.message)
case "done":
@@ -80,7 +119,7 @@ export async function* streamAgentResponse({
const item = flushPendingMessage()
if (item) yield item
}
yield { type: "message_finished" }
yield { kind: AgentResponseStreamEventKind.ResponseFinished }
return
}
}
@@ -88,7 +127,7 @@ export async function* streamAgentResponse({
const item = flushPendingMessage()
if (item) yield item
yield { type: "message_finished" }
yield { kind: AgentResponseStreamEventKind.ResponseFinished }
}
class AgentMessageSplitter {