From f1f81e0b7cf30f5bc876683d84ba35b0303ad958 Mon Sep 17 00:00:00 2001 From: Kenneth Date: Tue, 16 Jun 2026 20:10:55 +0100 Subject: [PATCH] feat: add conversation storage --- .../drizzle/0001_misty_white_tiger.sql | 50 +- .../drizzle/meta/0001_snapshot.json | 986 ++++++++++++------ apps/freya-backend/src/admin/http.test.ts | 14 + ...conversation-recording-query-agent.test.ts | 347 ++++++ .../conversation-recording-query-agent.ts | 252 +++++ apps/freya-backend/src/agent/http.test.ts | 42 +- apps/freya-backend/src/agent/http.ts | 2 + .../src/agent/in-memory-resource-loader.ts | 43 - .../src/agent/pi-query-agent.test.ts | 407 +++++++- .../freya-backend/src/agent/pi-query-agent.ts | 286 ++++- apps/freya-backend/src/agent/query-agent.ts | 63 +- .../src/agent/session-manager.test.ts | 156 +++ .../src/agent/session-manager.ts | 188 ++++ .../src/conversations/storage.ts | 373 +++++++ .../src/conversations/types.test.ts | 146 +++ apps/freya-backend/src/conversations/types.ts | 136 +++ apps/freya-backend/src/db/schema.ts | 86 ++ apps/freya-backend/src/engine/http.test.ts | 14 + .../src/session/user-session-manager.test.ts | 65 ++ .../src/session/user-session-manager.ts | 28 +- .../src/session/user-session.test.ts | 67 ++ .../freya-backend/src/session/user-session.ts | 88 +- apps/freya-backend/src/sources/http.test.ts | 14 + 23 files changed, 3457 insertions(+), 396 deletions(-) create mode 100644 apps/freya-backend/src/agent/conversation-recording-query-agent.test.ts create mode 100644 apps/freya-backend/src/agent/conversation-recording-query-agent.ts delete mode 100644 apps/freya-backend/src/agent/in-memory-resource-loader.ts create mode 100644 apps/freya-backend/src/agent/session-manager.test.ts create mode 100644 apps/freya-backend/src/agent/session-manager.ts create mode 100644 apps/freya-backend/src/conversations/storage.ts create mode 100644 apps/freya-backend/src/conversations/types.test.ts create mode 100644 apps/freya-backend/src/conversations/types.ts diff --git a/apps/freya-backend/drizzle/0001_misty_white_tiger.sql b/apps/freya-backend/drizzle/0001_misty_white_tiger.sql index 0fc4963..200c417 100644 --- a/apps/freya-backend/drizzle/0001_misty_white_tiger.sql +++ b/apps/freya-backend/drizzle/0001_misty_white_tiger.sql @@ -1 +1,49 @@ -CREATE INDEX "user_sources_user_id_enabled_idx" ON "user_sources" USING btree ("user_id","enabled"); \ No newline at end of file +CREATE INDEX "user_sources_user_id_enabled_idx" ON "user_sources" USING btree ("user_id","enabled");--> statement-breakpoint +CREATE TABLE "conversation_entries" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "conversation_id" uuid NOT NULL, + "sequence" integer NOT NULL, + "kind" text NOT NULL, + "visibility" text DEFAULT 'internal' NOT NULL, + "file_id" uuid, + "payload" jsonb NOT NULL, + "metadata" jsonb DEFAULT '{}'::jsonb NOT NULL, + "created_at" timestamp DEFAULT now() NOT NULL, + CONSTRAINT "conversation_entries_conversation_id_sequence_unique" UNIQUE("conversation_id","sequence") +); +--> statement-breakpoint +CREATE TABLE "conversations" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "user_id" text NOT NULL, + "created_at" timestamp DEFAULT now() NOT NULL, + "updated_at" timestamp DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE "files" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "user_id" text NOT NULL, + "storage_key" text NOT NULL, + "original_name" text, + "mime_type" text NOT NULL, + "size_bytes" integer NOT NULL, + "metadata" jsonb DEFAULT '{}'::jsonb NOT NULL, + "created_at" timestamp DEFAULT now() NOT NULL, + CONSTRAINT "files_storage_key_unique" UNIQUE("storage_key") +); +--> statement-breakpoint +ALTER TABLE "session" ADD COLUMN "impersonated_by" text;--> statement-breakpoint +ALTER TABLE "user" ADD COLUMN "role" text;--> statement-breakpoint +ALTER TABLE "user" ADD COLUMN "banned" boolean DEFAULT false;--> statement-breakpoint +ALTER TABLE "user" ADD COLUMN "ban_reason" text;--> statement-breakpoint +ALTER TABLE "user" ADD COLUMN "ban_expires" timestamp;--> statement-breakpoint +ALTER TABLE "conversation_entries" ADD CONSTRAINT "conversation_entries_conversation_id_conversations_id_fk" FOREIGN KEY ("conversation_id") REFERENCES "public"."conversations"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint +ALTER TABLE "conversation_entries" ADD CONSTRAINT "conversation_entries_file_id_files_id_fk" FOREIGN KEY ("file_id") REFERENCES "public"."files"("id") ON DELETE restrict ON UPDATE no action;--> statement-breakpoint +ALTER TABLE "conversation_entries" ADD CONSTRAINT "conversation_entries_attachment_file_id_check" CHECK (("conversation_entries"."kind" = 'attachment' and "conversation_entries"."file_id" is not null) or ("conversation_entries"."kind" <> 'attachment' and "conversation_entries"."file_id" is null));--> statement-breakpoint +ALTER TABLE "conversations" ADD CONSTRAINT "conversations_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint +ALTER TABLE "files" ADD CONSTRAINT "files_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint +CREATE INDEX "conversation_entries_conversation_id_sequence_idx" ON "conversation_entries" USING btree ("conversation_id","sequence");--> statement-breakpoint +CREATE INDEX "conversation_entries_conversation_id_visibility_sequence_idx" ON "conversation_entries" USING btree ("conversation_id","visibility","sequence");--> statement-breakpoint +CREATE INDEX "conversation_entries_kind_idx" ON "conversation_entries" USING btree ("kind");--> statement-breakpoint +CREATE INDEX "conversation_entries_file_id_idx" ON "conversation_entries" USING btree ("file_id");--> statement-breakpoint +CREATE INDEX "conversations_user_id_updated_at_idx" ON "conversations" USING btree ("user_id","updated_at");--> statement-breakpoint +CREATE INDEX "files_user_id_created_at_idx" ON "files" USING btree ("user_id","created_at"); diff --git a/apps/freya-backend/drizzle/meta/0001_snapshot.json b/apps/freya-backend/drizzle/meta/0001_snapshot.json index 3869111..1a35a45 100644 --- a/apps/freya-backend/drizzle/meta/0001_snapshot.json +++ b/apps/freya-backend/drizzle/meta/0001_snapshot.json @@ -125,6 +125,671 @@ "checkConstraints": {}, "isRLSEnabled": false }, + "public.conversation_entries": { + "name": "conversation_entries", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "conversation_id": { + "name": "conversation_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "sequence": { + "name": "sequence", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "kind": { + "name": "kind", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "visibility": { + "name": "visibility", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'internal'" + }, + "file_id": { + "name": "file_id", + "type": "uuid", + "primaryKey": false, + "notNull": false + }, + "payload": { + "name": "payload", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "metadata": { + "name": "metadata", + "type": "jsonb", + "primaryKey": false, + "notNull": true, + "default": "'{}'::jsonb" + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "conversation_entries_conversation_id_sequence_idx": { + "name": "conversation_entries_conversation_id_sequence_idx", + "columns": [ + { + "expression": "conversation_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "sequence", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "conversation_entries_conversation_id_visibility_sequence_idx": { + "name": "conversation_entries_conversation_id_visibility_sequence_idx", + "columns": [ + { + "expression": "conversation_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "visibility", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "sequence", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "conversation_entries_kind_idx": { + "name": "conversation_entries_kind_idx", + "columns": [ + { + "expression": "kind", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "conversation_entries_file_id_idx": { + "name": "conversation_entries_file_id_idx", + "columns": [ + { + "expression": "file_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "conversation_entries_conversation_id_conversations_id_fk": { + "name": "conversation_entries_conversation_id_conversations_id_fk", + "tableFrom": "conversation_entries", + "tableTo": "conversations", + "columnsFrom": [ + "conversation_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "conversation_entries_file_id_files_id_fk": { + "name": "conversation_entries_file_id_files_id_fk", + "tableFrom": "conversation_entries", + "tableTo": "files", + "columnsFrom": [ + "file_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "restrict", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "conversation_entries_conversation_id_sequence_unique": { + "name": "conversation_entries_conversation_id_sequence_unique", + "nullsNotDistinct": false, + "columns": [ + "conversation_id", + "sequence" + ] + } + }, + "policies": {}, + "checkConstraints": { + "conversation_entries_attachment_file_id_check": { + "name": "conversation_entries_attachment_file_id_check", + "value": "(\"conversation_entries\".\"kind\" = 'attachment' and \"conversation_entries\".\"file_id\" is not null) or (\"conversation_entries\".\"kind\" <> 'attachment' and \"conversation_entries\".\"file_id\" is null)" + } + }, + "isRLSEnabled": false + }, + "public.conversations": { + "name": "conversations", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "conversations_user_id_updated_at_idx": { + "name": "conversations_user_id_updated_at_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "updated_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "conversations_user_id_user_id_fk": { + "name": "conversations_user_id_user_id_fk", + "tableFrom": "conversations", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.files": { + "name": "files", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "storage_key": { + "name": "storage_key", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "original_name": { + "name": "original_name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "mime_type": { + "name": "mime_type", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "size_bytes": { + "name": "size_bytes", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "metadata": { + "name": "metadata", + "type": "jsonb", + "primaryKey": false, + "notNull": true, + "default": "'{}'::jsonb" + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "files_user_id_created_at_idx": { + "name": "files_user_id_created_at_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "created_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "files_user_id_user_id_fk": { + "name": "files_user_id_user_id_fk", + "tableFrom": "files", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "files_storage_key_unique": { + "name": "files_storage_key_unique", + "nullsNotDistinct": false, + "columns": [ + "storage_key" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.reminder_occurrence_overrides": { + "name": "reminder_occurrence_overrides", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "reminder_id": { + "name": "reminder_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "occurrence_id": { + "name": "occurrence_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "original_due_at": { + "name": "original_due_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "patch": { + "name": "patch", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "reminder_occurrence_overrides_user_id_reminder_id_idx": { + "name": "reminder_occurrence_overrides_user_id_reminder_id_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "reminder_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "reminder_occurrence_overrides_user_id_original_due_at_idx": { + "name": "reminder_occurrence_overrides_user_id_original_due_at_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "original_due_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "reminder_occurrence_overrides_user_id_user_id_fk": { + "name": "reminder_occurrence_overrides_user_id_user_id_fk", + "tableFrom": "reminder_occurrence_overrides", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "reminder_occurrence_overrides_reminder_id_reminders_id_fk": { + "name": "reminder_occurrence_overrides_reminder_id_reminders_id_fk", + "tableFrom": "reminder_occurrence_overrides", + "tableTo": "reminders", + "columnsFrom": [ + "reminder_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "reminder_occurrence_overrides_reminder_id_occurrence_id_unique": { + "name": "reminder_occurrence_overrides_reminder_id_occurrence_id_unique", + "nullsNotDistinct": false, + "columns": [ + "reminder_id", + "occurrence_id" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.reminders": { + "name": "reminders", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "notes": { + "name": "notes", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "due_at": { + "name": "due_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "time_zone": { + "name": "time_zone", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'UTC'" + }, + "recurrence": { + "name": "recurrence", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "priority": { + "name": "priority", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'normal'" + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "reminders_user_id_due_at_idx": { + "name": "reminders_user_id_due_at_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "due_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "reminders_user_id_updated_at_idx": { + "name": "reminders_user_id_updated_at_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "updated_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "reminders_user_id_user_id_fk": { + "name": "reminders_user_id_user_id_fk", + "tableFrom": "reminders", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, "public.session": { "name": "session", "schema": "", @@ -176,6 +841,12 @@ "type": "text", "primaryKey": false, "notNull": true + }, + "impersonated_by": { + "name": "impersonated_by", + "type": "text", + "primaryKey": false, + "notNull": false } }, "indexes": { @@ -270,6 +941,31 @@ "type": "timestamp", "primaryKey": false, "notNull": true + }, + "role": { + "name": "role", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "banned": { + "name": "banned", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "default": false + }, + "ban_reason": { + "name": "ban_reason", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "ban_expires": { + "name": "ban_expires", + "type": "timestamp", + "primaryKey": false, + "notNull": false } }, "indexes": {}, @@ -463,296 +1159,6 @@ "policies": {}, "checkConstraints": {}, "isRLSEnabled": false - }, - "public.reminders": { - "name": "reminders", - "schema": "", - "columns": { - "id": { - "name": "id", - "type": "uuid", - "primaryKey": true, - "notNull": true, - "default": "gen_random_uuid()" - }, - "user_id": { - "name": "user_id", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "title": { - "name": "title", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "notes": { - "name": "notes", - "type": "text", - "primaryKey": false, - "notNull": false - }, - "due_at": { - "name": "due_at", - "type": "timestamp", - "primaryKey": false, - "notNull": true - }, - "time_zone": { - "name": "time_zone", - "type": "text", - "primaryKey": false, - "notNull": true, - "default": "'UTC'" - }, - "recurrence": { - "name": "recurrence", - "type": "jsonb", - "primaryKey": false, - "notNull": false - }, - "priority": { - "name": "priority", - "type": "text", - "primaryKey": false, - "notNull": true, - "default": "'normal'" - }, - "created_at": { - "name": "created_at", - "type": "timestamp", - "primaryKey": false, - "notNull": true, - "default": "now()" - }, - "updated_at": { - "name": "updated_at", - "type": "timestamp", - "primaryKey": false, - "notNull": true, - "default": "now()" - } - }, - "indexes": { - "reminders_user_id_due_at_idx": { - "name": "reminders_user_id_due_at_idx", - "columns": [ - { - "expression": "user_id", - "isExpression": false, - "asc": true, - "nulls": "last" - }, - { - "expression": "due_at", - "isExpression": false, - "asc": true, - "nulls": "last" - } - ], - "isUnique": false, - "concurrently": false, - "method": "btree", - "with": {} - }, - "reminders_user_id_updated_at_idx": { - "name": "reminders_user_id_updated_at_idx", - "columns": [ - { - "expression": "user_id", - "isExpression": false, - "asc": true, - "nulls": "last" - }, - { - "expression": "updated_at", - "isExpression": false, - "asc": true, - "nulls": "last" - } - ], - "isUnique": false, - "concurrently": false, - "method": "btree", - "with": {} - } - }, - "foreignKeys": { - "reminders_user_id_user_id_fk": { - "name": "reminders_user_id_user_id_fk", - "tableFrom": "reminders", - "tableTo": "user", - "columnsFrom": [ - "user_id" - ], - "columnsTo": [ - "id" - ], - "onDelete": "cascade", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "policies": {}, - "checkConstraints": {}, - "isRLSEnabled": false - }, - "public.reminder_occurrence_overrides": { - "name": "reminder_occurrence_overrides", - "schema": "", - "columns": { - "id": { - "name": "id", - "type": "uuid", - "primaryKey": true, - "notNull": true, - "default": "gen_random_uuid()" - }, - "user_id": { - "name": "user_id", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "reminder_id": { - "name": "reminder_id", - "type": "uuid", - "primaryKey": false, - "notNull": true - }, - "occurrence_id": { - "name": "occurrence_id", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "original_due_at": { - "name": "original_due_at", - "type": "timestamp", - "primaryKey": false, - "notNull": true - }, - "patch": { - "name": "patch", - "type": "jsonb", - "primaryKey": false, - "notNull": false - }, - "completed_at": { - "name": "completed_at", - "type": "timestamp", - "primaryKey": false, - "notNull": false - }, - "deleted_at": { - "name": "deleted_at", - "type": "timestamp", - "primaryKey": false, - "notNull": false - }, - "created_at": { - "name": "created_at", - "type": "timestamp", - "primaryKey": false, - "notNull": true, - "default": "now()" - }, - "updated_at": { - "name": "updated_at", - "type": "timestamp", - "primaryKey": false, - "notNull": true, - "default": "now()" - } - }, - "indexes": { - "reminder_occurrence_overrides_user_id_reminder_id_idx": { - "name": "reminder_occurrence_overrides_user_id_reminder_id_idx", - "columns": [ - { - "expression": "user_id", - "isExpression": false, - "asc": true, - "nulls": "last" - }, - { - "expression": "reminder_id", - "isExpression": false, - "asc": true, - "nulls": "last" - } - ], - "isUnique": false, - "concurrently": false, - "method": "btree", - "with": {} - }, - "reminder_occurrence_overrides_user_id_original_due_at_idx": { - "name": "reminder_occurrence_overrides_user_id_original_due_at_idx", - "columns": [ - { - "expression": "user_id", - "isExpression": false, - "asc": true, - "nulls": "last" - }, - { - "expression": "original_due_at", - "isExpression": false, - "asc": true, - "nulls": "last" - } - ], - "isUnique": false, - "concurrently": false, - "method": "btree", - "with": {} - } - }, - "foreignKeys": { - "reminder_occurrence_overrides_user_id_user_id_fk": { - "name": "reminder_occurrence_overrides_user_id_user_id_fk", - "tableFrom": "reminder_occurrence_overrides", - "tableTo": "user", - "columnsFrom": [ - "user_id" - ], - "columnsTo": [ - "id" - ], - "onDelete": "cascade", - "onUpdate": "no action" - }, - "reminder_occurrence_overrides_reminder_id_reminders_id_fk": { - "name": "reminder_occurrence_overrides_reminder_id_reminders_id_fk", - "tableFrom": "reminder_occurrence_overrides", - "tableTo": "reminders", - "columnsFrom": [ - "reminder_id" - ], - "columnsTo": [ - "id" - ], - "onDelete": "cascade", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": { - "reminder_occurrence_overrides_reminder_id_occurrence_id_unique": { - "name": "reminder_occurrence_overrides_reminder_id_occurrence_id_unique", - "nullsNotDistinct": false, - "columns": [ - "reminder_id", - "occurrence_id" - ] - } - }, - "policies": {}, - "checkConstraints": {}, - "isRLSEnabled": false } }, "enums": {}, diff --git a/apps/freya-backend/src/admin/http.test.ts b/apps/freya-backend/src/admin/http.test.ts index c56a272..7837777 100644 --- a/apps/freya-backend/src/admin/http.test.ts +++ b/apps/freya-backend/src/admin/http.test.ts @@ -44,6 +44,20 @@ mock.module("../sources/user-sources.ts", () => ({ }), })) +mock.module("../conversations/storage.ts", () => ({ + conversations: (_db: Database, userId: string) => ({ + async getOrCreateConversation() { + return { id: `conversation-${userId}` } + }, + async listEntries() { + return [] + }, + async appendEntry() { + return { id: "entry-1", sequence: 1 } + }, + }), +})) + function createStubSource(id: string): FeedSource { return { id, diff --git a/apps/freya-backend/src/agent/conversation-recording-query-agent.test.ts b/apps/freya-backend/src/agent/conversation-recording-query-agent.test.ts new file mode 100644 index 0000000..f1611df --- /dev/null +++ b/apps/freya-backend/src/agent/conversation-recording-query-agent.test.ts @@ -0,0 +1,347 @@ +import { describe, expect, test } from "bun:test" + +import type { AppendConversationEntryInput } from "../conversations/storage.ts" +import type { + ConversationStorage, + ConversationStorageEntry, +} from "./conversation-recording-query-agent.ts" + +import { ConversationEntryKind } from "../conversations/types.ts" +import { ConversationRecordingQueryAgent } from "./conversation-recording-query-agent.ts" +import { + createQueryAgentEventListeners, + QueryAgentEvent, + type QueryAgent, + type QueryAgentAsk, + type QueryAgentCompactionEvent, + type QueryAgentEventListeners, + type QueryAgentEventListener, + type QueryAgentEventMap, + type QueryAgentStreamEvent, +} from "./query-agent.ts" + +interface RecordedEntry { + conversationId: string + input: AppendConversationEntryInput +} + +class FakeQueryAgent implements QueryAgent { + readonly inputs: QueryAgentAsk[] = [] + private readonly events: QueryAgentStreamEvent[] + private readonly eventListeners = createQueryAgentEventListeners() + + constructor(events: QueryAgentStreamEvent[]) { + this.events = events + } + + async *ask(input: QueryAgentAsk): AsyncIterable { + this.inputs.push(input) + for (const event of this.events) { + yield event + } + } + + addEventListener( + type: T, + listener: QueryAgentEventListener, + ): () => void { + const listeners = this.listenersFor(type) + listeners.add(listener) + return () => { + listeners.delete(listener) + } + } + + async emitCompaction(event: QueryAgentCompactionEvent): Promise { + await this.emitEvent(event) + } + + private async emitEvent(event: QueryAgentEventMap[T]): Promise { + const listeners = this.listenersFor(event.type) + for (const listener of listeners) { + await listener(event) + } + } + + private listenersFor(type: T): QueryAgentEventListeners[T] { + return this.eventListeners[type] + } + + dispose(): void {} +} + +class FakeConversationStorage implements ConversationStorage { + getOrCreateCount = 0 + readonly entries: RecordedEntry[] = [] + conversationId = "conversation-1" + + async getOrCreateConversation(): Promise<{ id: string }> { + this.getOrCreateCount += 1 + return { id: this.conversationId } + } + + async appendEntry( + conversationId: string, + input: AppendConversationEntryInput, + ): Promise { + this.entries.push({ conversationId, input }) + return { + id: `entry-${this.entries.length}`, + sequence: this.entries.length, + kind: input.kind, + payload: input.payload, + metadata: input.metadata ?? {}, + createdAt: new Date("2026-06-15T00:00:00.000Z"), + } + } + + async listEntries(_conversationId: string): Promise { + return this.entries.map((entry, index) => ({ + id: `entry-${index + 1}`, + sequence: index + 1, + kind: entry.input.kind, + payload: entry.input.payload, + metadata: entry.input.metadata ?? {}, + createdAt: new Date("2026-06-15T00:00:00.000Z"), + })) + } +} + +describe("ConversationRecordingQueryAgent", () => { + test("records user and assistant messages in the conversation timeline", async () => { + const queryAgent = new FakeQueryAgent([ + { type: "text_delta", text: "Hello " }, + { type: "text_delta", text: "there." }, + { type: "done" }, + ]) + const storage = new FakeConversationStorage() + const agent = new ConversationRecordingQueryAgent({ + agent: queryAgent, + storage, + modelProvider: "openrouter", + modelId: "test-model", + }) + + const events = await collectEvents( + agent.ask({ + message: "hi", + }), + ) + + expect(events[0]).toEqual({ type: "conversation", conversationId: "conversation-1" }) + expect(queryAgent.inputs[0]?.conversationId).toBe("conversation-1") + expect(storage.getOrCreateCount).toBe(1) + expect(storage.entries).toHaveLength(2) + + const userEntry = storage.entries[0]!.input + if (userEntry.kind !== ConversationEntryKind.UserMessage) { + throw new Error("Expected user message entry") + } + expect(userEntry.payload.parts).toEqual([{ type: "text", text: "hi" }]) + + const assistantEntry = storage.entries[1]!.input + if (assistantEntry.kind !== ConversationEntryKind.AssistantMessage) { + throw new Error("Expected assistant message entry") + } + expect(assistantEntry.payload.parts).toEqual([{ type: "text", text: "Hello there." }]) + expect(assistantEntry.metadata?.modelRun?.provider).toBe("openrouter") + expect(assistantEntry.metadata?.modelRun?.model).toBe("test-model") + }) + + test("uses a provided conversation id without creating a default conversation", async () => { + const queryAgent = new FakeQueryAgent([{ type: "done" }]) + const storage = new FakeConversationStorage() + const agent = new ConversationRecordingQueryAgent({ + agent: queryAgent, + storage, + modelProvider: "openrouter", + modelId: "test-model", + }) + + const events = await collectEvents( + agent.ask({ + conversationId: "conversation-existing", + message: "continue", + }), + ) + + expect(events[0]).toEqual({ + type: "conversation", + conversationId: "conversation-existing", + }) + expect(storage.getOrCreateCount).toBe(0) + expect(storage.entries[0]?.conversationId).toBe("conversation-existing") + expect(queryAgent.inputs[0]?.conversationId).toBe("conversation-existing") + }) + + test("uses the eager default conversation id without reading storage on ask", async () => { + const queryAgent = new FakeQueryAgent([{ type: "done" }]) + const storage = new FakeConversationStorage() + const agent = new ConversationRecordingQueryAgent({ + agent: queryAgent, + storage, + defaultConversationId: "conversation-eager", + modelProvider: "openrouter", + modelId: "test-model", + }) + + const events = await collectEvents( + agent.ask({ + message: "continue", + }), + ) + + expect(events[0]).toEqual({ + type: "conversation", + conversationId: "conversation-eager", + }) + expect(storage.getOrCreateCount).toBe(0) + expect(storage.entries[0]?.conversationId).toBe("conversation-eager") + expect(queryAgent.inputs[0]?.conversationId).toBe("conversation-eager") + }) + + test("rejects switching away from the eager default conversation", async () => { + const queryAgent = new FakeQueryAgent([{ type: "done" }]) + const storage = new FakeConversationStorage() + const agent = new ConversationRecordingQueryAgent({ + agent: queryAgent, + storage, + defaultConversationId: "conversation-eager", + modelProvider: "openrouter", + modelId: "test-model", + }) + + const events = await collectEvents( + agent.ask({ + conversationId: "conversation-other", + message: "continue", + }), + ) + + expect(events).toEqual([ + { + type: "error", + message: "Conversation switching is not supported for this session", + }, + ]) + expect(storage.entries).toHaveLength(0) + expect(queryAgent.inputs).toHaveLength(0) + }) + + test("records tool activity and agent errors as internal entries", async () => { + const queryAgent = new FakeQueryAgent([ + { type: "tool_start", toolName: "freya_get_feed" }, + { type: "tool_end", toolName: "freya_get_feed", ok: true }, + { type: "error", message: "model unavailable" }, + ]) + const storage = new FakeConversationStorage() + const agent = new ConversationRecordingQueryAgent({ + agent: queryAgent, + storage, + modelProvider: "openrouter", + modelId: "test-model", + }) + + await collectEvents( + agent.ask({ + message: "what now?", + }), + ) + + expect(storage.entries.map((entry) => entry.input.kind)).toEqual([ + ConversationEntryKind.UserMessage, + ConversationEntryKind.ToolCall, + ConversationEntryKind.ToolResult, + ConversationEntryKind.SystemNote, + ]) + + const toolCall = storage.entries[1]!.input + if (toolCall.kind !== ConversationEntryKind.ToolCall) { + throw new Error("Expected tool call entry") + } + expect(toolCall.payload.toolName).toBe("freya_get_feed") + + const toolResult = storage.entries[2]!.input + if (toolResult.kind !== ConversationEntryKind.ToolResult) { + throw new Error("Expected tool result entry") + } + expect(toolResult.payload.ok).toBe(true) + + const systemNote = storage.entries[3]!.input + if (systemNote.kind !== ConversationEntryKind.SystemNote) { + throw new Error("Expected system note entry") + } + expect(systemNote.payload).toMatchObject({ + type: "agent_error", + message: "model unavailable", + }) + }) + + test("records compaction events as context summaries", async () => { + const queryAgent = new FakeQueryAgent([ + { type: "text_delta", text: "Kept answer." }, + { type: "done" }, + ]) + const storage = new FakeConversationStorage() + const agent = new ConversationRecordingQueryAgent({ + agent: queryAgent, + storage, + defaultConversationId: "conversation-1", + modelProvider: "openrouter", + modelId: "test-model", + }) + const forwardedCompactions: QueryAgentCompactionEvent[] = [] + agent.addEventListener(QueryAgentEvent.Compaction, (event) => { + forwardedCompactions.push(event) + }) + + await collectEvents( + agent.ask({ + message: "remember this", + }), + ) + + await queryAgent.emitCompaction({ + type: QueryAgentEvent.Compaction, + conversationId: "conversation-1", + summary: "The user prefers compact summaries.", + firstKeptEntryId: "pi-entry-7", + compactedEntryRange: { + startSequence: 1, + endSequence: 1, + }, + tokensBefore: 1234, + details: { reason: "threshold" }, + fromExtension: false, + }) + + const summaryEntry = storage.entries.at(-1)?.input + if (summaryEntry?.kind !== ConversationEntryKind.ContextSummary) { + throw new Error("Expected context summary entry") + } + expect(summaryEntry.payload.covers).toEqual({ + startSequence: 1, + endSequence: 1, + }) + expect(summaryEntry.payload.summary.importantDetails).toEqual([ + "The user prefers compact summaries.", + ]) + expect(summaryEntry.metadata?.piCompaction).toMatchObject({ + firstKeptEntryId: "pi-entry-7", + tokensBefore: 1234, + fromExtension: false, + details: { reason: "threshold" }, + }) + expect(forwardedCompactions).toHaveLength(1) + }) +}) + +async function collectEvents( + events: AsyncIterable, +): Promise { + const result: QueryAgentStreamEvent[] = [] + for await (const event of events) { + result.push(event) + } + return result +} diff --git a/apps/freya-backend/src/agent/conversation-recording-query-agent.ts b/apps/freya-backend/src/agent/conversation-recording-query-agent.ts new file mode 100644 index 0000000..041a15d --- /dev/null +++ b/apps/freya-backend/src/agent/conversation-recording-query-agent.ts @@ -0,0 +1,252 @@ +import { randomUUID } from "node:crypto" + +import type { + AppendConversationEntryInput, + ConversationEntryRow, +} from "../conversations/storage.ts" +import type { ConversationEntryMetadata } from "../conversations/types.ts" + +import { ConversationEntryKind } from "../conversations/types.ts" +import { + createQueryAgentEventListeners, + QueryAgentEvent, + type QueryAgent, + type QueryAgentAsk, + type QueryAgentCompactionEvent, + type QueryAgentEventListeners, + type QueryAgentEventListener, + type QueryAgentEventMap, + type QueryAgentStreamEvent, +} from "./query-agent.ts" + +export interface ConversationStorage { + getOrCreateConversation(): Promise<{ id: string }> + appendEntry( + conversationId: string, + input: AppendConversationEntryInput, + ): Promise + listEntries(conversationId: string): Promise +} + +export type ConversationStorageEntry = Pick< + ConversationEntryRow, + "id" | "sequence" | "kind" | "payload" | "metadata" | "createdAt" +> + +export interface ConversationRecordingQueryAgentConfig { + agent: QueryAgent + storage: ConversationStorage + defaultConversationId?: string + route?: string + modelProvider: string + modelId: string +} + +const DefaultRoute = "agent_query" + +export class ConversationRecordingQueryAgent implements QueryAgent { + private readonly agent: QueryAgent + private readonly storage: ConversationStorage + private readonly defaultConversationId: string | undefined + private readonly route: string + private readonly modelProvider: string + private readonly modelId: string + private readonly eventListeners = createQueryAgentEventListeners() + private readonly removeAgentCompactionListener: () => void + + constructor(config: ConversationRecordingQueryAgentConfig) { + this.agent = config.agent + this.storage = config.storage + this.defaultConversationId = config.defaultConversationId + this.route = config.route ?? DefaultRoute + this.modelProvider = config.modelProvider + this.modelId = config.modelId + this.removeAgentCompactionListener = this.agent.addEventListener( + QueryAgentEvent.Compaction, + async (event) => { + await this.appendCompactionSummary(event) + await this.emitEvent(event) + }, + ) + } + + async *ask(input: QueryAgentAsk): AsyncIterable { + if ( + this.defaultConversationId && + input.conversationId && + input.conversationId !== this.defaultConversationId + ) { + yield { + type: "error", + message: "Conversation switching is not supported for this session", + } + return + } + + const conversationId = + input.conversationId ?? + this.defaultConversationId ?? + (await this.storage.getOrCreateConversation()).id + const runId = randomUUID() + + const userEntry = await this.storage.appendEntry(conversationId, { + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: input.message }], + }, + metadata: { runId }, + }) + + yield { type: "conversation", conversationId } + + const assistantText: string[] = [] + for await (const event of this.agent.ask({ + ...input, + conversationId, + userMessageEntry: { + id: userEntry.id, + sequence: userEntry.sequence, + }, + })) { + switch (event.type) { + case "conversation": + break + case "text_delta": + assistantText.push(event.text) + yield event + break + case "tool_start": + await this.storage.appendEntry(conversationId, { + kind: ConversationEntryKind.ToolCall, + payload: { + toolName: event.toolName, + runId, + }, + metadata: { runId }, + }) + yield event + break + case "tool_end": + await this.storage.appendEntry(conversationId, { + kind: ConversationEntryKind.ToolResult, + payload: { + toolName: event.toolName, + ok: event.ok, + runId, + }, + metadata: { runId }, + }) + yield event + break + case "error": + await this.storage.appendEntry(conversationId, { + kind: ConversationEntryKind.SystemNote, + payload: { + type: "agent_error", + message: event.message, + runId, + }, + metadata: { runId }, + }) + yield event + return + case "done": + await this.appendAssistantMessage(conversationId, assistantText, runId) + yield event + return + } + } + + await this.appendAssistantMessage(conversationId, assistantText, runId) + } + + dispose(): void { + this.removeAgentCompactionListener() + this.clearEventListeners() + this.agent.dispose() + } + + addEventListener( + type: T, + listener: QueryAgentEventListener, + ): () => void { + const listeners = this.listenersFor(type) + listeners.add(listener) + return () => { + listeners.delete(listener) + } + } + + private async appendAssistantMessage( + conversationId: string, + assistantText: string[], + runId: string, + ): Promise { + const text = assistantText.join("") + if (text.length === 0) return + + await this.storage.appendEntry(conversationId, { + kind: ConversationEntryKind.AssistantMessage, + payload: { + role: "assistant", + parts: [{ type: "text", text }], + }, + metadata: this.modelRunMetadata(runId), + }) + } + + private modelRunMetadata(runId: string): ConversationEntryMetadata { + const metadata: ConversationEntryMetadata = { runId } + metadata.modelRun = { + route: this.route, + provider: this.modelProvider, + model: this.modelId, + } + return metadata + } + + private async appendCompactionSummary(event: QueryAgentCompactionEvent): Promise { + if (event.compactedEntryRange === null) return + + await this.storage.appendEntry(event.conversationId, { + kind: ConversationEntryKind.ContextSummary, + payload: { + covers: event.compactedEntryRange, + summary: { + durableFacts: [], + preferences: [], + decisions: [], + openTasks: [], + importantDetails: [event.summary], + }, + promptVersion: "pi-sdk-compaction-v1", + }, + metadata: { + piCompaction: { + firstKeptEntryId: event.firstKeptEntryId, + tokensBefore: event.tokensBefore, + fromExtension: event.fromExtension, + details: event.details, + }, + }, + }) + } + + private async emitEvent(event: QueryAgentEventMap[T]): Promise { + const listeners = this.listenersFor(event.type) + for (const listener of listeners) { + await listener(event) + } + } + + private listenersFor(type: T): QueryAgentEventListeners[T] { + return this.eventListeners[type] + } + + private clearEventListeners(): void { + for (const listeners of Object.values(this.eventListeners)) { + listeners.clear() + } + } +} diff --git a/apps/freya-backend/src/agent/http.test.ts b/apps/freya-backend/src/agent/http.test.ts index 4dfdf6d..dda9a95 100644 --- a/apps/freya-backend/src/agent/http.test.ts +++ b/apps/freya-backend/src/agent/http.test.ts @@ -3,7 +3,13 @@ import { Hono } from "hono" import type { UserSessionManager } from "../session/index.ts" import type { QueryDebugTools, QueryDebugToolDefinition } from "./debug-tools.ts" -import type { QueryAgent, QueryAgentAsk, QueryAgentEvent } from "./query-agent.ts" +import type { + QueryAgent, + QueryAgentAsk, + QueryAgentEventListener, + QueryAgentStreamEvent, + QueryAgentEvent, +} from "./query-agent.ts" import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts" import { registerAgentHttpHandlers, registerDebugAgentHttpHandlers } from "./http.ts" @@ -12,19 +18,26 @@ const MockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn" class FakeQueryAgent implements QueryAgent { readonly inputs: QueryAgentAsk[] = [] - private readonly events: QueryAgentEvent[] + private readonly events: QueryAgentStreamEvent[] - constructor(events: QueryAgentEvent[]) { + constructor(events: QueryAgentStreamEvent[]) { this.events = events } - async *ask(input: QueryAgentAsk): AsyncIterable { + async *ask(input: QueryAgentAsk): AsyncIterable { this.inputs.push(input) for (const event of this.events) { yield event } } + addEventListener( + _type: T, + _listener: QueryAgentEventListener, + ): () => void { + return () => {} + } + dispose(): void {} } @@ -110,6 +123,27 @@ describe("POST /api/agent", () => { expect(body.message).toBe("You should leave at 8:30.") }) + test("passes conversation id to the query agent", async () => { + const agent = new FakeQueryAgent([ + { type: "conversation", conversationId: "conversation-1" }, + { type: "done" }, + ]) + const app = buildTestApp(agent, "user-1") + + const res = await app.request("/api/agent", { + method: "POST", + body: JSON.stringify({ + message: "Continue this chat.", + conversationId: "conversation-1", + }), + }) + + expect(res.status).toBe(200) + expect(agent.inputs[0]?.conversationId).toBe("conversation-1") + const body = (await res.json()) as { conversationId?: string } + expect(body.conversationId).toBe("conversation-1") + }) + test("returns 400 for invalid body", async () => { const app = buildTestApp(new FakeQueryAgent([]), "user-1") diff --git a/apps/freya-backend/src/agent/http.ts b/apps/freya-backend/src/agent/http.ts index 77bbd01..983af56 100644 --- a/apps/freya-backend/src/agent/http.ts +++ b/apps/freya-backend/src/agent/http.ts @@ -35,6 +35,7 @@ interface AgentDebugHttpHandlersDeps { const AgentAskRequestBody = type({ "+": "reject", message: "string", + "conversationId?": "string", }) export function registerAgentHttpHandlers( @@ -82,6 +83,7 @@ async function handleAgentAsk(c: Context) { const session = await sessionManager.getOrCreate(user.id) const response = await collectQueryAgentResponse(session.agent, { message: parsed.message, + conversationId: parsed.conversationId, }) return c.json(response) } catch (err) { diff --git a/apps/freya-backend/src/agent/in-memory-resource-loader.ts b/apps/freya-backend/src/agent/in-memory-resource-loader.ts deleted file mode 100644 index c6fd35c..0000000 --- a/apps/freya-backend/src/agent/in-memory-resource-loader.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { createExtensionRuntime, type ResourceLoader } from "@earendil-works/pi-coding-agent" - -export class InMemoryResourceLoader implements ResourceLoader { - private readonly extensions: ReturnType = { - extensions: [], - errors: [], - runtime: createExtensionRuntime(), - } - - constructor(private readonly systemPrompt: string) {} - - getExtensions(): ReturnType { - return this.extensions - } - - getSkills(): ReturnType { - return { skills: [], diagnostics: [] } - } - - getPrompts(): ReturnType { - return { prompts: [], diagnostics: [] } - } - - getThemes(): ReturnType { - return { themes: [], diagnostics: [] } - } - - getAgentsFiles(): ReturnType { - return { agentsFiles: [] } - } - - getSystemPrompt(): string { - return this.systemPrompt - } - - getAppendSystemPrompt(): string[] { - return [] - } - - extendResources(_paths: Parameters[0]): void {} - - async reload(_options?: Parameters[0]): Promise {} -} diff --git a/apps/freya-backend/src/agent/pi-query-agent.test.ts b/apps/freya-backend/src/agent/pi-query-agent.test.ts index 9c9fc98..c97e132 100644 --- a/apps/freya-backend/src/agent/pi-query-agent.test.ts +++ b/apps/freya-backend/src/agent/pi-query-agent.test.ts @@ -1,7 +1,10 @@ import { beforeEach, describe, expect, mock, test } from "bun:test" import type { QueryAgentToolbox } from "./query-agent-toolbox.ts" -import type { QueryAgentEvent } from "./query-agent.ts" +import type { QueryAgentStreamEvent } from "./query-agent.ts" + +import { ConversationEntryKind } from "../conversations/types.ts" +import { QueryAgentEvent } from "./query-agent.ts" interface FakePiSession { subscribe(listener: (event: unknown) => void): () => void @@ -9,6 +12,61 @@ interface FakePiSession { dispose(): void } +type CapturedExtensionHandler = (event: unknown) => Promise | unknown + +interface CapturedExtensionApi { + on(event: string, handler: CapturedExtensionHandler): void +} + +type CapturedExtensionFactory = (pi: CapturedExtensionApi) => Promise | void + +interface CapturedExtension { + handlers: Map +} + +interface CapturedResourceLoader { + getExtensions(): unknown +} + +interface CapturedDefaultResourceLoaderOptions { + extensionFactories?: CapturedExtensionFactory[] +} + +class FakeDefaultResourceLoader implements CapturedResourceLoader { + private readonly extensionFactories: CapturedExtensionFactory[] + private extensionsResult: { extensions: CapturedExtension[] } + + constructor(options: unknown) { + this.extensionFactories = isDefaultResourceLoaderOptions(options) + ? (options.extensionFactories ?? []) + : [] + this.extensionsResult = { extensions: [] } + } + + async reload(): Promise { + const handlers: CapturedExtension["handlers"] = new Map() + const api: CapturedExtensionApi = { + on(event: string, handler: CapturedExtensionHandler): void { + const existing = handlers.get(event) ?? [] + existing.push(handler) + handlers.set(event, existing) + }, + } + + for (const factory of this.extensionFactories) { + await factory(api) + } + + this.extensionsResult = { + extensions: [{ handlers }], + } + } + + getExtensions(): unknown { + return this.extensionsResult + } +} + let createAgentSessionCalls = 0 let createAgentSessionOptions: unknown let runtimeApiKeyCalls: Array<{ provider: string; apiKey: string }> = [] @@ -51,6 +109,44 @@ const fakeSession: FakePiSession = { dispose(): void {}, } +class FakeSessionManager { + private messages: unknown[] = [] + private compaction: { summary: string; tokensBefore: number; timestamp: number } | null = null + + appendMessage(message: unknown): string { + this.messages.push(message) + return `message-${this.messages.length}` + } + + appendCompaction(summary: string, _firstKeptEntryId: string, tokensBefore: number): string { + this.compaction = { + summary, + tokensBefore, + timestamp: Date.now(), + } + this.messages = [] + return "compaction-1" + } + + buildSessionContext(): unknown { + const messages = [...this.messages] + if (this.compaction) { + messages.unshift({ + role: "compactionSummary", + summary: this.compaction.summary, + tokensBefore: this.compaction.tokensBefore, + timestamp: this.compaction.timestamp, + }) + } + + return { + messages, + thinkingLevel: "off", + model: modelFromMessages(messages), + } + } +} + mock.module("@earendil-works/pi-coding-agent", () => ({ AuthStorage: { inMemory() { @@ -71,6 +167,7 @@ mock.module("@earendil-works/pi-coding-agent", () => ({ createExtensionRuntime() { return {} }, + DefaultResourceLoader: FakeDefaultResourceLoader, defineTool(tool: unknown): unknown { return tool }, @@ -86,7 +183,7 @@ mock.module("@earendil-works/pi-coding-agent", () => ({ }, SessionManager: { inMemory(_cwd: string): unknown { - return {} + return new FakeSessionManager() }, }, SettingsManager: { @@ -131,7 +228,6 @@ describe("PiQueryAgent", () => { test("rejects a concurrent first query while the Pi session is being created", async () => { const { PiQueryAgent } = await import("./pi-query-agent.ts") const agent = new PiQueryAgent({ - userId: "user-1", toolbox: createStubToolbox(), apiKey: "test-api-key", cwd: "/tmp/freya-pi-query-agent-test", @@ -155,7 +251,7 @@ describe("PiQueryAgent", () => { expect(secondEvents).toEqual([ { type: "error", - message: "A query is already running for this user", + message: "A query is already running", }, ]) expect(createAgentSessionCalls).toBe(1) @@ -175,6 +271,228 @@ describe("PiQueryAgent", () => { } expect("agentDir" in createAgentSessionOptions).toBe(false) expect(createAgentSessionOptions.resourceLoader).toBeDefined() + expect(typeof sessionCompactHandlerFromCapturedOptions()).toBe("function") + + agent.dispose() + }) + + test("hydrates initial entries into the Pi session manager", async () => { + const { PiQueryAgent } = await import("./pi-query-agent.ts") + const agent = new PiQueryAgent({ + toolbox: createStubToolbox(), + cwd: "/tmp/freya-pi-query-agent-test", + systemPrompt: "test", + initialEntries: [ + { + id: "entry-1", + sequence: 1, + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: "stored hello" }], + }, + metadata: {}, + createdAt: new Date("2026-06-15T00:00:00.000Z"), + }, + { + id: "entry-2", + sequence: 2, + kind: ConversationEntryKind.AssistantMessage, + payload: { + role: "assistant", + parts: [{ type: "text", text: "stored reply" }], + }, + metadata: {}, + createdAt: new Date("2026-06-15T00:00:01.000Z"), + }, + ], + }) + + const events = collectEvents( + agent.ask({ + message: "hello", + }), + ) + + await sessionCreationStarted + if (!isRecord(createAgentSessionOptions)) { + throw new Error("createAgentSession options were not captured") + } + const sessionManager = createAgentSessionOptions.sessionManager + if (!(sessionManager instanceof FakeSessionManager)) { + throw new Error("session manager was not hydrated by PiQueryAgent") + } + const context = sessionManager.buildSessionContext() + if (!isRecord(context) || !Array.isArray(context.messages)) { + throw new Error("session context messages were not captured") + } + expect(context.messages[0]).toEqual({ + role: "user", + content: "stored hello", + timestamp: new Date("2026-06-15T00:00:00.000Z").getTime(), + }) + expect(context.messages[1]).toMatchObject({ + role: "assistant", + provider: "openrouter", + model: "z-ai/glm-4.7-flash", + stopReason: "stop", + timestamp: new Date("2026-06-15T00:00:01.000Z").getTime(), + }) + + releaseSessionCreation() + await promptStarted + releasePrompt() + + expect(await events).toEqual([{ type: "done" }]) + + agent.dispose() + }) + + test("emits Pi compaction events for the active conversation", async () => { + const recordedCompactions: unknown[] = [] + const { PiQueryAgent } = await import("./pi-query-agent.ts") + const agent = new PiQueryAgent({ + toolbox: createStubToolbox(), + cwd: "/tmp/freya-pi-query-agent-test", + systemPrompt: "test", + }) + agent.addEventListener(QueryAgentEvent.Compaction, (event) => { + recordedCompactions.push(event) + }) + + const events = collectEvents( + agent.ask({ + conversationId: "conversation-1", + message: "hello", + }), + ) + + await sessionCreationStarted + releaseSessionCreation() + await promptStarted + + const handler = sessionCompactHandlerFromCapturedOptions() + await handler({ + type: "session_compact", + fromExtension: false, + compactionEntry: { + type: "compaction", + id: "pi-compaction-1", + timestamp: "2026-06-15T00:00:00.000Z", + summary: "The user prefers concise updates.", + firstKeptEntryId: "pi-entry-7", + tokensBefore: 1234, + details: { reason: "threshold" }, + }, + }) + + expect(recordedCompactions).toEqual([ + { + type: QueryAgentEvent.Compaction, + conversationId: "conversation-1", + summary: "The user prefers concise updates.", + firstKeptEntryId: "pi-entry-7", + compactedEntryRange: null, + tokensBefore: 1234, + details: { reason: "threshold" }, + fromExtension: false, + }, + ]) + + releasePrompt() + + expect(await events).toEqual([{ type: "done" }]) + expect(unsubscribeCalls).toBe(1) + + agent.dispose() + }) + + test("emits Freya coverage through the entry before Pi's kept boundary", async () => { + const recordedCompactions: unknown[] = [] + const { PiQueryAgent } = await import("./pi-query-agent.ts") + const agent = new PiQueryAgent({ + toolbox: createStubToolbox(), + cwd: "/tmp/freya-pi-query-agent-test", + systemPrompt: "test", + initialEntries: [ + { + id: "entry-1", + sequence: 1, + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: "old hello" }], + }, + metadata: {}, + createdAt: new Date("2026-06-15T00:00:00.000Z"), + }, + { + id: "entry-2", + sequence: 2, + kind: ConversationEntryKind.AssistantMessage, + payload: { + role: "assistant", + parts: [{ type: "text", text: "kept reply" }], + }, + metadata: {}, + createdAt: new Date("2026-06-15T00:00:01.000Z"), + }, + ], + }) + agent.addEventListener(QueryAgentEvent.Compaction, (event) => { + recordedCompactions.push(event) + }) + + const events = collectEvents( + agent.ask({ + conversationId: "conversation-1", + message: "hello", + }), + ) + + await sessionCreationStarted + + await extensionHandlerFromCapturedOptions("session_before_compact")({ + type: "session_before_compact", + preparation: { + firstKeptEntryId: "message-2", + }, + branchEntries: [{ id: "message-1" }, { id: "message-2" }], + }) + await extensionHandlerFromCapturedOptions("session_compact")({ + type: "session_compact", + fromExtension: false, + compactionEntry: { + type: "compaction", + id: "pi-compaction-1", + timestamp: "2026-06-15T00:00:00.000Z", + summary: "Old hello was discussed.", + firstKeptEntryId: "message-2", + tokensBefore: 1234, + }, + }) + + expect(recordedCompactions).toEqual([ + { + type: QueryAgentEvent.Compaction, + conversationId: "conversation-1", + summary: "Old hello was discussed.", + firstKeptEntryId: "message-2", + compactedEntryRange: { + startSequence: 1, + endSequence: 1, + }, + tokensBefore: 1234, + details: undefined, + fromExtension: false, + }, + ]) + + releaseSessionCreation() + await promptStarted + releasePrompt() + + expect(await events).toEqual([{ type: "done" }]) agent.dispose() }) @@ -182,7 +500,6 @@ describe("PiQueryAgent", () => { test("surfaces Pi message_end provider errors instead of done", async () => { const { PiQueryAgent } = await import("./pi-query-agent.ts") const agent = new PiQueryAgent({ - userId: "user-1", toolbox: createStubToolbox(), cwd: "/tmp/freya-pi-query-agent-test", systemPrompt: "test", @@ -219,7 +536,6 @@ describe("PiQueryAgent", () => { test("surfaces Pi agent_end provider errors instead of done", async () => { const { PiQueryAgent } = await import("./pi-query-agent.ts") const agent = new PiQueryAgent({ - userId: "user-1", toolbox: createStubToolbox(), cwd: "/tmp/freya-pi-query-agent-test", systemPrompt: "test", @@ -256,8 +572,10 @@ describe("PiQueryAgent", () => { }) }) -async function collectEvents(events: AsyncIterable): Promise { - const result: QueryAgentEvent[] = [] +async function collectEvents( + events: AsyncIterable, +): Promise { + const result: QueryAgentStreamEvent[] = [] for await (const event of events) { result.push(event) } @@ -290,6 +608,79 @@ function createStubToolbox(): QueryAgentToolbox { } } +function sessionCompactHandlerFromCapturedOptions(): CapturedExtensionHandler { + return extensionHandlerFromCapturedOptions("session_compact") +} + +function extensionHandlerFromCapturedOptions(eventName: string): CapturedExtensionHandler { + if (!isRecord(createAgentSessionOptions)) { + throw new Error("createAgentSession options were not captured") + } + + const resourceLoader = createAgentSessionOptions.resourceLoader + if (!isCapturedResourceLoader(resourceLoader)) { + throw new Error("resourceLoader was not captured") + } + + const extensionsResult = resourceLoader.getExtensions() + if (!isRecord(extensionsResult) || !Array.isArray(extensionsResult.extensions)) { + throw new Error("extensions were not captured") + } + + const extension = extensionsResult.extensions[0] + if (!isCapturedExtension(extension)) { + throw new Error("compaction extension was not captured") + } + + const handlers = extension.handlers.get(eventName) + const handler = handlers?.[0] + if (!handler) { + throw new Error(`${eventName} handler was not captured`) + } + + return handler +} + +function isCapturedResourceLoader(value: unknown): value is CapturedResourceLoader { + return isRecord(value) && typeof value.getExtensions === "function" +} + +function isCapturedExtension(value: unknown): value is CapturedExtension { + return isRecord(value) && value.handlers instanceof Map +} + +function isDefaultResourceLoaderOptions( + value: unknown, +): value is CapturedDefaultResourceLoaderOptions { + return ( + isRecord(value) && + (value.extensionFactories === undefined || + (Array.isArray(value.extensionFactories) && + value.extensionFactories.every(isCapturedExtensionFactory))) + ) +} + +function isCapturedExtensionFactory(value: unknown): value is CapturedExtensionFactory { + return typeof value === "function" +} + +function modelFromMessages(messages: unknown[]): { provider: string; modelId: string } | null { + let model: { provider: string; modelId: string } | null = null + + for (const message of messages) { + if (!isRecord(message)) continue + if (message.role !== "assistant") continue + if (typeof message.provider !== "string" || typeof message.model !== "string") continue + + model = { + provider: message.provider, + modelId: message.model, + } + } + + return model +} + function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null } diff --git a/apps/freya-backend/src/agent/pi-query-agent.ts b/apps/freya-backend/src/agent/pi-query-agent.ts index 56a3e1a..a08593c 100644 --- a/apps/freya-backend/src/agent/pi-query-agent.ts +++ b/apps/freya-backend/src/agent/pi-query-agent.ts @@ -1,73 +1,119 @@ -import type { AgentSessionEvent } from "@earendil-works/pi-coding-agent" +import type { + AgentSessionEvent, + ExtensionFactory, + SessionEntry, +} from "@earendil-works/pi-coding-agent" import { AuthStorage, createAgentSession, + DefaultResourceLoader, ModelRegistry, - SessionManager, SettingsManager, } from "@earendil-works/pi-coding-agent" import { tmpdir } from "node:os" +import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts" import type { QueryAgentToolbox } from "./query-agent-toolbox.ts" -import type { QueryAgent, QueryAgentAsk, QueryAgentEvent } from "./query-agent.ts" -import { InMemoryResourceLoader } from "./in-memory-resource-loader.ts" import defaultSystemPrompt from "./prompts/system.txt" +import { + createQueryAgentEventListeners, + QueryAgentEvent, + type QueryAgent, + type QueryAgentAsk, + type QueryAgentCompactedEntryRange, + type QueryAgentCompactionEvent, + type QueryAgentConversationEntryRef, + type QueryAgentEventListeners, + type QueryAgentEventListener, + type QueryAgentEventMap, + type QueryAgentStreamEvent, +} from "./query-agent.ts" +import { createSessionManager } from "./session-manager.ts" import { createFreyaAgentTools, FREYA_AGENT_TOOL_NAMES } from "./tools.ts" type PiSession = Awaited>["session"] type PiMessageEndEvent = Extract type PiAgentMessage = PiMessageEndEvent["message"] type PiAgentEndEvent = Extract +type PiSessionManager = ReturnType +type PiSessionMessage = Parameters[0] export interface PiQueryAgentConfig { - userId: string toolbox: QueryAgentToolbox apiKey?: string cwd?: string systemPrompt?: string + initialEntries?: ConversationStorageEntry[] } -const MODEL_PROVIDER = "openrouter" -const MODEL_ID = "z-ai/glm-4.7-flash" +export const PI_MODEL_PROVIDER = "openrouter" +export const PI_MODEL_ID = "z-ai/glm-4.7-flash" export class PiQueryAgent implements QueryAgent { - private readonly userId: string private readonly toolbox: QueryAgentToolbox private readonly cwd: string private readonly systemPrompt: string private readonly apiKey: string | undefined + private readonly initialEntries: ConversationStorageEntry[] + private readonly eventListeners = createQueryAgentEventListeners() private session: PiSession | null = null private pendingSession: Promise | null = null - private activeRun: symbol | null = null + /** + * Conversation currently receiving Pi events for an active ask(). + * + * Pi's compaction hook fires from the SDK session rather than from our + * QueryAgent call stack, so the hook reads this value to attach the + * compaction summary to the right Freya conversation. null means no active + * run; "" means a run is active but no Freya conversation id was supplied. + */ + private activeConversationId: string | null = null + /** + * Freya entry for the user message currently being handed to Pi. + * + * ConversationRecordingQueryAgent appends the user message before calling + * PiQueryAgent. Pi later persists its own copy of that user message into its + * SessionManager, and this one-shot reference lets us map Pi's generated + * session entry id back to the Freya sequence. + */ + private activeUserMessageEntry: QueryAgentConversationEntryRef | null = null + /** + * Maps Pi SessionManager entry ids to Freya conversation sequences. + * + * Pi compaction reports boundaries with Pi entry ids, while our DB replay + * logic uses monotonically increasing Freya sequences. This map is the bridge + * that lets us translate Pi's firstKeptEntryId into a compacted entry range. + */ + private readonly piEntryConversationSequences = new Map() private disposed = false constructor(config: PiQueryAgentConfig) { - this.userId = config.userId this.toolbox = config.toolbox this.apiKey = config.apiKey this.cwd = config.cwd ?? tmpdir() this.systemPrompt = config.systemPrompt ?? defaultSystemPrompt + this.initialEntries = config.initialEntries ?? [] } - async *ask(input: QueryAgentAsk): AsyncIterable { - if (this.activeRun) { + async *ask(input: QueryAgentAsk): AsyncIterable { + if (this.activeConversationId !== null) { yield { type: "error", - message: "A query is already running for this user", + message: "A query is already running", } return } - const run = Symbol(this.userId) - this.activeRun = run + this.activeConversationId = input.conversationId ?? "" + this.activeUserMessageEntry = input.userMessageEntry ?? null let session: PiSession try { session = await this.getOrCreateSession() } catch (err) { - this.clearActiveRun(run) + this.activeConversationId = null + this.activeUserMessageEntry = null yield { type: "error", message: `Failed to create query session: ${errorMessage(err)}`, @@ -75,11 +121,11 @@ export class PiQueryAgent implements QueryAgent { return } - const events: QueryAgentEvent[] = [] + const events: QueryAgentStreamEvent[] = [] let closed = false let wake: (() => void) | null = null - function push(event: QueryAgentEvent): void { + function push(event: QueryAgentStreamEvent): void { events.push(event) if (wake) { wake() @@ -88,7 +134,7 @@ export class PiQueryAgent implements QueryAgent { } let runFailed = false - function pushRunEvent(event: QueryAgentEvent): void { + function pushRunEvent(event: QueryAgentStreamEvent): void { if (event.type === "error") { if (runFailed) return runFailed = true @@ -108,7 +154,8 @@ export class PiQueryAgent implements QueryAgent { this.handlePiEvent(event, pushRunEvent) }) - void this.runPrompt(session, input) + session + .prompt(input.message) .then(() => { if (runFailed) return pushRunEvent({ type: "done" }) @@ -118,7 +165,8 @@ export class PiQueryAgent implements QueryAgent { }) .finally(() => { unsubscribe() - this.clearActiveRun(run) + this.activeConversationId = null + this.activeUserMessageEntry = null close() }) @@ -140,12 +188,19 @@ export class PiQueryAgent implements QueryAgent { this.session?.dispose() this.session = null this.pendingSession = null - this.activeRun = null + this.activeConversationId = null + this.activeUserMessageEntry = null + this.clearEventListeners() } - private clearActiveRun(run: symbol): void { - if (this.activeRun === run) { - this.activeRun = null + addEventListener( + type: T, + listener: QueryAgentEventListener, + ): () => void { + const listeners = this.listenersFor(type) + listeners.add(listener) + return () => { + listeners.delete(listener) } } @@ -184,38 +239,200 @@ export class PiQueryAgent implements QueryAgent { }) const authStorage = AuthStorage.inMemory() if (this.apiKey) { - authStorage.setRuntimeApiKey(MODEL_PROVIDER, this.apiKey) + authStorage.setRuntimeApiKey(PI_MODEL_PROVIDER, this.apiKey) } const modelRegistry = ModelRegistry.inMemory(authStorage) - const model = modelRegistry.find(MODEL_PROVIDER, MODEL_ID) + const model = modelRegistry.find(PI_MODEL_PROVIDER, PI_MODEL_ID) if (!model) { - throw new Error(`Pi model not found: ${MODEL_PROVIDER}/${MODEL_ID}`) + throw new Error(`Pi model not found: ${PI_MODEL_PROVIDER}/${PI_MODEL_ID}`) } + const resourceLoader = new DefaultResourceLoader({ + cwd: this.cwd, + agentDir: this.cwd, + settingsManager, + systemPrompt: this.systemPrompt, + extensionFactories: [this.createCompactionExtension()], + noExtensions: true, + noSkills: true, + noPromptTemplates: true, + noThemes: true, + noContextFiles: true, + }) + await resourceLoader.reload() + + const sessionManager = this.createMappedSessionManager() + const { session } = await createAgentSession({ cwd: this.cwd, authStorage, modelRegistry, model, - resourceLoader: new InMemoryResourceLoader(this.systemPrompt), + resourceLoader, settingsManager, - sessionManager: SessionManager.inMemory(this.cwd), + sessionManager, noTools: "builtin", customTools: createFreyaAgentTools({ toolbox: this.toolbox, }), - tools: [...FREYA_AGENT_TOOL_NAMES], + tools: FREYA_AGENT_TOOL_NAMES, }) return session } - private async runPrompt(session: PiSession, input: QueryAgentAsk): Promise { - await session.prompt(input.message) + /** + * Creates Pi's SessionManager and records Pi-id -> Freya-sequence mappings. + * + * Hydrated DB messages are mapped through createSessionManager's callback. + * Live user messages are mapped by wrapping appendMessage(), because Pi owns + * the generated session entry id for messages written during prompt handling. + */ + private createMappedSessionManager(): PiSessionManager { + this.piEntryConversationSequences.clear() + const sessionManager = createSessionManager({ + cwd: this.cwd, + entries: this.initialEntries, + modelProvider: PI_MODEL_PROVIDER, + modelId: PI_MODEL_ID, + onMessageEntryAppended: (piEntryId, entry) => { + this.piEntryConversationSequences.set(piEntryId, entry.sequence) + }, + }) + const appendMessage = sessionManager.appendMessage.bind(sessionManager) + + sessionManager.appendMessage = (message: PiSessionMessage): string => { + const piEntryId = appendMessage(message) + const sequence = this.liveConversationSequenceForMessage(message) + if (sequence !== null) { + this.piEntryConversationSequences.set(piEntryId, sequence) + } + return piEntryId + } + + return sessionManager } - private handlePiEvent(event: AgentSessionEvent, push: (event: QueryAgentEvent) => void): void { + /** + * Returns the Freya sequence for Pi's persisted live user message. + * + * We only map user messages here because they are the messages Freya writes + * before invoking Pi. Assistant/tool entries are recorded from the stream + * outside Pi's SessionManager and do not have a stable live Pi id available + * at the storage boundary. + */ + private liveConversationSequenceForMessage(message: PiSessionMessage): number | null { + if (message.role !== "user") return null + + const entry = this.activeUserMessageEntry + this.activeUserMessageEntry = null + if (!entry) return null + + return entry.sequence + } + + /** + * Installs the minimal Pi extension used to observe compaction. + * + * session_before_compact gives us the full branch plus firstKeptEntryId, so + * we translate that boundary before Pi writes the compaction entry. The later + * session_compact event carries the saved summary, which we forward with the + * cached Freya compacted entry range. + */ + private createCompactionExtension(): ExtensionFactory { + return (pi) => { + /** + * Temporary handoff between Pi's before/after compaction hooks. + * + * session_compact receives the saved compaction entry, not the original + * branch entries needed for boundary translation. + */ + let pendingCompactedEntryRange: QueryAgentCompactedEntryRange | null = null + + pi.on("session_before_compact", async (event) => { + pendingCompactedEntryRange = this.compactedEntryRangeBeforePiEntry( + event.branchEntries, + event.preparation.firstKeptEntryId, + ) + }) + + pi.on("session_compact", async (event) => { + const conversationId = this.activeConversationId + if (!conversationId) return + + const entry = event.compactionEntry + const compactedEntryRange = pendingCompactedEntryRange + pendingCompactedEntryRange = null + const compactionEvent: QueryAgentCompactionEvent = { + type: QueryAgentEvent.Compaction, + conversationId, + summary: entry.summary, + firstKeptEntryId: entry.firstKeptEntryId, + compactedEntryRange, + tokensBefore: entry.tokensBefore, + details: entry.details, + fromExtension: event.fromExtension, + } + + await this.emitEvent(compactionEvent) + }) + } + } + + /** + * Returns the Freya entry range compacted before a Pi session entry. + * + * Pi keeps firstKeptEntryId and everything after it as raw context. Therefore + * the summary covers only mapped entries before that Pi entry. If none of + * those entries map back to Freya, we return null so storage can avoid + * recording a summary with an unsafe coverage range. + */ + private compactedEntryRangeBeforePiEntry( + branchEntries: SessionEntry[], + piEntryId: string, + ): QueryAgentCompactedEntryRange | null { + let endSequence: number | null = null + for (const entry of branchEntries) { + if (entry.id === piEntryId) { + if (endSequence === null) return null + + return { + startSequence: 1, + endSequence, + } + } + + const sequence = this.piEntryConversationSequences.get(entry.id) + if (typeof sequence === "number") { + endSequence = sequence + } + } + + return null + } + + private async emitEvent(event: QueryAgentEventMap[T]): Promise { + const listeners = this.listenersFor(event.type) + for (const listener of listeners) { + await listener(event) + } + } + + private listenersFor(type: T): QueryAgentEventListeners[T] { + return this.eventListeners[type] + } + + private clearEventListeners(): void { + for (const listeners of Object.values(this.eventListeners)) { + listeners.clear() + } + } + + private handlePiEvent( + event: AgentSessionEvent, + push: (event: QueryAgentStreamEvent) => void, + ): void { switch (event.type) { case "message_end": { const message = piAssistantMessageError(event.message) @@ -283,7 +500,6 @@ function piAssistantMessageError(message: PiAgentMessage): string | null { case "toolUse": return null } - return null default: return null } diff --git a/apps/freya-backend/src/agent/query-agent.ts b/apps/freya-backend/src/agent/query-agent.ts index ad76e66..54bd132 100644 --- a/apps/freya-backend/src/agent/query-agent.ts +++ b/apps/freya-backend/src/agent/query-agent.ts @@ -1,21 +1,74 @@ export interface QueryAgentAsk { message: string + conversationId?: string + userMessageEntry?: QueryAgentConversationEntryRef } -export type QueryAgentEvent = +export type QueryAgentStreamEvent = + | { type: "conversation"; conversationId: string } | { type: "text_delta"; text: string } | { type: "tool_start"; toolName: string } | { type: "tool_end"; toolName: string; ok: boolean } | { type: "done" } | { type: "error"; message: string } +export const QueryAgentEvent = { + Compaction: "compaction", +} as const + +export type QueryAgentEvent = (typeof QueryAgentEvent)[keyof typeof QueryAgentEvent] + +export interface QueryAgentConversationEntryRef { + id: string + sequence: number +} + +export interface QueryAgentCompactedEntryRange { + startSequence: number + endSequence: number +} + +export interface QueryAgentCompactionEvent { + type: typeof QueryAgentEvent.Compaction + conversationId: string + summary: string + firstKeptEntryId: string + compactedEntryRange: QueryAgentCompactedEntryRange | null + tokensBefore: number + details?: unknown + fromExtension: boolean +} + +export interface QueryAgentEventMap { + [QueryAgentEvent.Compaction]: QueryAgentCompactionEvent +} + +export type QueryAgentEventListener = ( + event: QueryAgentEventMap[T], +) => void | Promise + +export type QueryAgentEventListeners = { + [T in QueryAgentEvent]: Set> +} + +export function createQueryAgentEventListeners(): QueryAgentEventListeners { + return { + [QueryAgentEvent.Compaction]: new Set(), + } +} + export interface QueryAgent { - ask(input: QueryAgentAsk): AsyncIterable + ask(input: QueryAgentAsk): AsyncIterable + addEventListener( + type: T, + listener: QueryAgentEventListener, + ): () => void dispose(): void } export interface QueryAgentResponse { message: string + conversationId?: string } export class QueryAgentError extends Error { @@ -30,9 +83,13 @@ export async function collectQueryAgentResponse( input: QueryAgentAsk, ): Promise { let message = "" + let conversationId: string | undefined for await (const event of agent.ask(input)) { switch (event.type) { + case "conversation": + conversationId = event.conversationId + break case "text_delta": message += event.text break @@ -45,5 +102,5 @@ export async function collectQueryAgentResponse( } } - return { message } + return { message, conversationId } } diff --git a/apps/freya-backend/src/agent/session-manager.test.ts b/apps/freya-backend/src/agent/session-manager.test.ts new file mode 100644 index 0000000..925aae0 --- /dev/null +++ b/apps/freya-backend/src/agent/session-manager.test.ts @@ -0,0 +1,156 @@ +import { describe, expect, test } from "bun:test" + +import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts" + +import { ConversationEntryKind } from "../conversations/types.ts" +import { createSessionManager } from "./session-manager.ts" + +describe("createSessionManager", () => { + test("hydrates user and assistant entries into Pi session context", () => { + const sessionManager = createSessionManager({ + entries: [ + entry({ + id: "entry-1", + sequence: 1, + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: "hello" }], + }, + }), + entry({ + id: "entry-2", + sequence: 2, + kind: ConversationEntryKind.AssistantMessage, + payload: { + role: "assistant", + parts: [{ type: "text", text: "hi there" }], + }, + metadata: { + modelRun: { + route: "agent_query", + provider: "openrouter", + model: "stored-model", + }, + }, + }), + ], + modelProvider: "openrouter", + modelId: "fallback-model", + }) + + const context = sessionManager.buildSessionContext() + + expect(context.messages.map(roleOf)).toEqual(["user", "assistant"]) + expect(textFromMessage(context.messages[0])).toBe("hello") + expect(textFromMessage(context.messages[1])).toBe("hi there") + expect(context.model).toEqual({ + provider: "openrouter", + modelId: "stored-model", + }) + }) + + test("uses the latest context summary and replays only uncovered raw entries", () => { + const sessionManager = createSessionManager({ + entries: [ + entry({ + id: "entry-1", + sequence: 1, + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: "old question" }], + }, + }), + entry({ + id: "entry-2", + sequence: 2, + kind: ConversationEntryKind.AssistantMessage, + payload: { + role: "assistant", + parts: [{ type: "text", text: "old answer" }], + }, + }), + entry({ + id: "entry-3", + sequence: 3, + kind: ConversationEntryKind.ContextSummary, + payload: { + covers: { + startSequence: 1, + endSequence: 2, + }, + summary: { + durableFacts: ["The user is designing conversation storage."], + preferences: [], + decisions: ["Context compaction is stored as a conversation entry."], + openTasks: [], + importantDetails: [], + }, + promptVersion: "test-v1", + }, + }), + entry({ + id: "entry-4", + sequence: 4, + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: "new question" }], + }, + }), + ], + modelProvider: "openrouter", + modelId: "fallback-model", + }) + + const context = sessionManager.buildSessionContext() + + expect(context.messages.map(roleOf)).toEqual(["compactionSummary", "user"]) + expect(textFromMessage(context.messages[0])).toContain( + "The user is designing conversation storage.", + ) + expect(textFromMessage(context.messages[0])).toContain( + "Context compaction is stored as a conversation entry.", + ) + expect(textFromMessage(context.messages[1])).toBe("new question") + }) +}) + +function entry( + input: Omit & { + createdAt?: Date + metadata?: ConversationStorageEntry["metadata"] + }, +): ConversationStorageEntry { + return { + ...input, + metadata: input.metadata ?? {}, + createdAt: input.createdAt ?? new Date("2026-06-15T00:00:00.000Z"), + } +} + +function roleOf(message: unknown): string | undefined { + if (!isRecord(message)) return undefined + return typeof message.role === "string" ? message.role : undefined +} + +function textFromMessage(message: unknown): string { + if (!isRecord(message)) return "" + if (typeof message.summary === "string") return message.summary + + const content = message.content + if (typeof content === "string") return content + if (!Array.isArray(content)) return "" + + return content.map(textFromContentPart).join("") +} + +function textFromContentPart(part: unknown): string { + if (!isRecord(part)) return "" + return typeof part.text === "string" ? part.text : "" +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null +} diff --git a/apps/freya-backend/src/agent/session-manager.ts b/apps/freya-backend/src/agent/session-manager.ts new file mode 100644 index 0000000..8716905 --- /dev/null +++ b/apps/freya-backend/src/agent/session-manager.ts @@ -0,0 +1,188 @@ +import { SessionManager } from "@earendil-works/pi-coding-agent" +import { tmpdir } from "node:os" + +import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts" + +import { + AssistantMessagePayload, + ContextSummaryPayload, + ConversationEntryKind, + UserMessagePayload, +} from "../conversations/types.ts" + +type PiMessage = Parameters[0] +type PiAssistantMessage = Extract + +export interface CreateSessionManagerInput { + cwd?: string + entries: ConversationStorageEntry[] + modelProvider: string + modelId: string + onMessageEntryAppended?: (piEntryId: string, entry: ConversationStorageEntry) => void +} + +export function createSessionManager(input: CreateSessionManagerInput): SessionManager { + const sessionManager = SessionManager.inMemory(input.cwd ?? tmpdir()) + const context = buildContextFromEntries(input.entries) + + if (context.summary) { + sessionManager.appendCompaction( + context.summary.text, + "freya-db-context-start", + 0, + { + conversationEntryId: context.summary.entry.id, + covers: context.summary.covers, + }, + true, + ) + } + + for (const entry of context.entries) { + const message = messageForEntry(entry, input.modelProvider, input.modelId) + if (message) { + const piEntryId = sessionManager.appendMessage(message) + input.onMessageEntryAppended?.(piEntryId, entry) + } + } + + return sessionManager +} + +function buildContextFromEntries(entries: ConversationStorageEntry[]): { + summary?: { entry: ConversationStorageEntry; text: string; covers: unknown } + entries: ConversationStorageEntry[] +} { + const orderedEntries = [...entries].sort((left, right) => left.sequence - right.sequence) + const summaryEntry = latestContextSummaryEntry(orderedEntries) + if (!summaryEntry || summaryEntry.kind !== ConversationEntryKind.ContextSummary) { + return { entries: orderedEntries } + } + + const payload = ContextSummaryPayload.assert(summaryEntry.payload) + const text = contextSummaryText(payload.summary) + const rawStartSequence = payload.covers.endSequence + 1 + + return { + summary: { + entry: summaryEntry, + text, + covers: payload.covers, + }, + entries: orderedEntries.filter((entry) => entry.sequence >= rawStartSequence), + } +} + +function latestContextSummaryEntry( + entries: ConversationStorageEntry[], +): ConversationStorageEntry | undefined { + let latest: ConversationStorageEntry | undefined + + for (const entry of entries) { + if (entry.kind !== ConversationEntryKind.ContextSummary) continue + if (!latest || entry.sequence > latest.sequence) { + latest = entry + } + } + + return latest +} + +function messageForEntry( + entry: ConversationStorageEntry, + modelProvider: string, + modelId: string, +): PiMessage | null { + switch (entry.kind) { + case ConversationEntryKind.UserMessage: { + const payload = UserMessagePayload.assert(entry.payload) + return { + role: "user", + content: messagePartsText(payload.parts), + timestamp: entry.createdAt.getTime(), + } + } + case ConversationEntryKind.AssistantMessage: { + const payload = AssistantMessagePayload.assert(entry.payload) + return { + role: "assistant", + content: [{ type: "text", text: messagePartsText(payload.parts) }], + api: "anthropic-messages", + provider: entry.metadata.modelRun?.provider ?? modelProvider, + model: entry.metadata.modelRun?.model ?? modelId, + usage: zeroUsage(), + stopReason: "stop", + timestamp: entry.createdAt.getTime(), + } satisfies PiAssistantMessage + } + case ConversationEntryKind.Attachment: + case ConversationEntryKind.ContextSummary: + case ConversationEntryKind.SystemNote: + case ConversationEntryKind.ToolCall: + case ConversationEntryKind.ToolResult: + return null + } +} + +function messagePartsText( + parts: Array<{ type: "text"; text: string } | { type: "json"; value: unknown }>, +): string { + return parts.map(messagePartText).join("\n") +} + +function messagePartText( + part: { type: "text"; text: string } | { type: "json"; value: unknown }, +): string { + switch (part.type) { + case "text": + return part.text + case "json": + return stringifyJson(part.value) + } +} + +function contextSummaryText(summary: { + userIntent?: string + durableFacts: string[] + preferences: string[] + decisions: string[] + openTasks: string[] + importantDetails: string[] +}): string { + const sections: string[] = [] + pushSection(sections, "User intent", summary.userIntent ? [summary.userIntent] : []) + pushSection(sections, "Durable facts", summary.durableFacts) + pushSection(sections, "Preferences", summary.preferences) + pushSection(sections, "Decisions", summary.decisions) + pushSection(sections, "Open tasks", summary.openTasks) + pushSection(sections, "Important details", summary.importantDetails) + return sections.join("\n\n") +} + +function pushSection(sections: string[], title: string, values: string[]): void { + const trimmedValues = values.map((value) => value.trim()).filter(Boolean) + if (trimmedValues.length === 0) return + + sections.push(`${title}:\n${trimmedValues.map((value) => `- ${value}`).join("\n")}`) +} + +function stringifyJson(value: unknown): string { + return JSON.stringify(value, null, 2) ?? String(value) +} + +function zeroUsage(): PiAssistantMessage["usage"] { + return { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, + } +} diff --git a/apps/freya-backend/src/conversations/storage.ts b/apps/freya-backend/src/conversations/storage.ts new file mode 100644 index 0000000..de4c4ef --- /dev/null +++ b/apps/freya-backend/src/conversations/storage.ts @@ -0,0 +1,373 @@ +import { and, asc, desc, eq } from "drizzle-orm" + +import type { Database } from "../db/index.ts" +import type { + AssistantMessagePayload, + AttachmentPayload, + ContextSummaryPayload, + ConversationEntryKind as ConversationEntryKindType, + ConversationEntryMetadata, + ConversationEntryPayload, + ConversationEntryVisibility as ConversationEntryVisibilityType, + GenericObjectPayload, + UserMessagePayload, +} from "./types.ts" + +import { + conversationEntries, + conversations as conversationsTable, + files, + user, +} from "../db/schema.ts" +import { + ConversationEntryMetadata as ConversationEntryMetadataSchema, + AssistantMessagePayload as AssistantMessagePayloadSchema, + AttachmentPayload as AttachmentPayloadSchema, + ConversationEntryKind, + ConversationEntryKindInput, + ConversationEntryVisibility, + ConversationEntryVisibilityInput, + ContextSummaryPayload as ContextSummaryPayloadSchema, + GenericObjectPayload as GenericObjectPayloadSchema, + UserMessagePayload as UserMessagePayloadSchema, +} from "./types.ts" + +export type ConversationRow = typeof conversationsTable.$inferSelect +export type ConversationEntryRow = typeof conversationEntries.$inferSelect +export type FileRow = typeof files.$inferSelect + +export interface CreateFileInput { + storageKey: string + originalName?: string + mimeType: string + sizeBytes: number + metadata?: Record +} + +export interface AppendAttachmentEntryInput { + file: CreateFileInput + payload: AttachmentPayload + visibility?: ConversationEntryVisibilityType + metadata?: ConversationEntryMetadata +} + +export interface AppendAttachmentEntryResult { + file: FileRow + entry: ConversationEntryRow +} + +interface AppendConversationEntryBase { + visibility?: ConversationEntryVisibilityType + metadata?: ConversationEntryMetadata +} + +export type AppendConversationEntryInput = + | (AppendConversationEntryBase & { + kind: typeof ConversationEntryKind.UserMessage + payload: UserMessagePayload + fileId?: never + }) + | (AppendConversationEntryBase & { + kind: typeof ConversationEntryKind.AssistantMessage + payload: AssistantMessagePayload + fileId?: never + }) + | (AppendConversationEntryBase & { + kind: typeof ConversationEntryKind.Attachment + payload: AttachmentPayload + fileId: string + }) + | (AppendConversationEntryBase & { + kind: typeof ConversationEntryKind.ContextSummary + payload: ContextSummaryPayload + fileId?: never + }) + | (AppendConversationEntryBase & { + kind: + | typeof ConversationEntryKind.ToolCall + | typeof ConversationEntryKind.ToolResult + | typeof ConversationEntryKind.SystemNote + payload: GenericObjectPayload + fileId?: never + }) + +export interface ListConversationEntriesParams { + visibility?: ConversationEntryVisibilityType +} + +export function conversations(db: Database, userId: string) { + return { + async createConversation(): Promise { + return insertConversation(db, userId) + }, + + async getOrCreateConversation(): Promise { + return db.transaction(async (tx) => { + await requireUserForUpdate(tx, userId) + const existing = await latestConversation(tx, userId) + if (existing) return existing + + return insertConversation(tx, userId) + }) + }, + + async createFile(input: CreateFileInput): Promise { + return insertFile(db, userId, input) + }, + + async appendEntry( + conversationId: string, + input: AppendConversationEntryInput, + ): Promise { + const kind = ConversationEntryKindInput.assert(input.kind) + const visibility = ConversationEntryVisibilityInput.assert( + input.visibility ?? defaultVisibilityForKind(kind), + ) + const payload = payloadForKind(kind, input.payload) + const metadata = ConversationEntryMetadataSchema.assert(input.metadata ?? {}) + let fileId: string | null = null + + if (input.kind === ConversationEntryKind.Attachment) { + fileId = input.fileId + await requireFile(db, userId, fileId) + } + + const rows = await db.transaction(async (tx) => { + await requireConversationForUpdate(tx, userId, conversationId) + const sequence = await nextSequence(tx, conversationId) + + const rows = await tx + .insert(conversationEntries) + .values({ + conversationId, + sequence, + kind, + visibility, + fileId, + payload, + metadata, + }) + .returning() + + await touchConversation(tx, userId, conversationId) + return rows + }) + + return requireRow(rows) + }, + + async appendAttachmentEntry( + conversationId: string, + input: AppendAttachmentEntryInput, + ): Promise { + const payload = AttachmentPayloadSchema.assert(input.payload) + const visibility = ConversationEntryVisibilityInput.assert( + input.visibility ?? defaultVisibilityForKind(ConversationEntryKind.Attachment), + ) + const metadata = ConversationEntryMetadataSchema.assert(input.metadata ?? {}) + + return db.transaction(async (tx) => { + await requireConversationForUpdate(tx, userId, conversationId) + + const file = await insertFile(tx, userId, input.file) + const sequence = await nextSequence(tx, conversationId) + const rows = await tx + .insert(conversationEntries) + .values({ + conversationId, + sequence, + kind: ConversationEntryKind.Attachment, + visibility, + fileId: file.id, + payload, + metadata, + }) + .returning() + + await touchConversation(tx, userId, conversationId) + return { + file, + entry: requireRow(rows), + } + }) + }, + + async listEntries( + conversationId: string, + params: ListConversationEntriesParams = {}, + ): Promise { + await requireConversation(db, userId, conversationId) + + if (params.visibility) { + return db + .select() + .from(conversationEntries) + .where( + and( + eq(conversationEntries.conversationId, conversationId), + eq(conversationEntries.visibility, params.visibility), + ), + ) + .orderBy(asc(conversationEntries.sequence)) + } + + return db + .select() + .from(conversationEntries) + .where(eq(conversationEntries.conversationId, conversationId)) + .orderBy(asc(conversationEntries.sequence)) + }, + } +} + +function payloadForKind( + kind: ConversationEntryKindType, + payload: AppendConversationEntryInput["payload"], +): ConversationEntryPayload { + switch (kind) { + case ConversationEntryKind.UserMessage: + return UserMessagePayloadSchema.assert(payload) + case ConversationEntryKind.AssistantMessage: + return AssistantMessagePayloadSchema.assert(payload) + case ConversationEntryKind.Attachment: + return AttachmentPayloadSchema.assert(payload) + case ConversationEntryKind.ContextSummary: + return ContextSummaryPayloadSchema.assert(payload) + case ConversationEntryKind.ToolCall: + case ConversationEntryKind.ToolResult: + case ConversationEntryKind.SystemNote: + return GenericObjectPayloadSchema.assert(payload) + } +} + +async function requireUserForUpdate(db: Database, userId: string): Promise { + const rows = await db + .select({ id: user.id }) + .from(user) + .where(eq(user.id, userId)) + .limit(1) + .for("update") + + requireRow(rows, `User not found: ${userId}`) +} + +async function requireConversation( + db: Database, + userId: string, + conversationId: string, +): Promise { + const rows = await db + .select() + .from(conversationsTable) + .where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId))) + .limit(1) + + return requireRow(rows, `Conversation not found: ${conversationId}`) +} + +async function requireConversationForUpdate( + db: Database, + userId: string, + conversationId: string, +): Promise { + const rows = await db + .select() + .from(conversationsTable) + .where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId))) + .limit(1) + .for("update") + + return requireRow(rows, `Conversation not found: ${conversationId}`) +} + +async function latestConversation(db: Database, userId: string): Promise { + const rows = await db + .select() + .from(conversationsTable) + .where(eq(conversationsTable.userId, userId)) + .orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt)) + .limit(1) + + return rows[0] ?? null +} + +async function insertConversation(db: Database, userId: string): Promise { + const rows = await db + .insert(conversationsTable) + .values({ + userId, + }) + .returning() + + return requireRow(rows) +} + +async function requireFile(db: Database, userId: string, fileId: string): Promise { + const rows = await db + .select() + .from(files) + .where(and(eq(files.id, fileId), eq(files.userId, userId))) + .limit(1) + + return requireRow(rows, `File not found: ${fileId}`) +} + +async function insertFile(db: Database, userId: string, input: CreateFileInput): Promise { + const rows = await db + .insert(files) + .values({ + userId, + storageKey: input.storageKey, + originalName: input.originalName ?? null, + mimeType: input.mimeType, + sizeBytes: input.sizeBytes, + metadata: input.metadata ?? {}, + }) + .returning() + + return requireRow(rows) +} + +async function touchConversation( + db: Database, + userId: string, + conversationId: string, +): Promise { + await db + .update(conversationsTable) + .set({ updatedAt: new Date() }) + .where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId))) +} + +async function nextSequence(db: Database, conversationId: string): Promise { + const rows = await db + .select({ sequence: conversationEntries.sequence }) + .from(conversationEntries) + .where(eq(conversationEntries.conversationId, conversationId)) + .orderBy(desc(conversationEntries.sequence)) + .limit(1) + + return (rows[0]?.sequence ?? 0) + 1 +} + +function requireRow(rows: T[], message = "Expected database row"): T { + const row = rows[0] + if (!row) throw new Error(message) + return row +} + +function defaultVisibilityForKind( + kind: ConversationEntryKindType, +): ConversationEntryVisibilityType { + switch (kind) { + case ConversationEntryKind.UserMessage: + case ConversationEntryKind.AssistantMessage: + case ConversationEntryKind.Attachment: + return ConversationEntryVisibility.UserVisible + case ConversationEntryKind.ToolCall: + case ConversationEntryKind.ToolResult: + case ConversationEntryKind.ContextSummary: + case ConversationEntryKind.SystemNote: + return ConversationEntryVisibility.Internal + } +} diff --git a/apps/freya-backend/src/conversations/types.test.ts b/apps/freya-backend/src/conversations/types.test.ts new file mode 100644 index 0000000..9c513fe --- /dev/null +++ b/apps/freya-backend/src/conversations/types.test.ts @@ -0,0 +1,146 @@ +import { describe, expect, test } from "bun:test" + +import { + AttachmentType, + AttachmentPayload, + ContextSummaryPayload, + ConversationEntryMetadata, + GenericObjectPayload, + UserMessagePayload, +} from "./types.ts" + +describe("conversation entry schemas", () => { + test("parses valid user message payloads", () => { + const payload = UserMessagePayload.assert({ + role: "user", + parts: [{ type: "text", text: "hello" }], + }) + + expect(payload).toEqual({ + role: "user", + parts: [{ type: "text", text: "hello" }], + }) + }) + + test("rejects user message payloads with the wrong role", () => { + expect(() => + UserMessagePayload.assert({ + role: "assistant", + parts: [{ type: "text", text: "hello" }], + }), + ).toThrow() + }) + + test("rejects user message payloads with no parts", () => { + expect(() => + UserMessagePayload.assert({ + role: "user", + parts: [], + }), + ).toThrow() + }) + + test("parses valid attachment payloads", () => { + const payload = AttachmentPayload.assert({ + role: "user", + name: "whiteboard.png", + mimeType: "image/png", + attachmentType: AttachmentType.Image, + caption: "whiteboard sketch", + }) + + expect(payload).toEqual({ + role: "user", + name: "whiteboard.png", + mimeType: "image/png", + attachmentType: AttachmentType.Image, + caption: "whiteboard sketch", + }) + }) + + test("rejects extra fields on structured payloads", () => { + expect(() => + AttachmentPayload.assert({ + role: "user", + name: "whiteboard.png", + mimeType: "image/png", + attachmentType: AttachmentType.Image, + fileId: "file-1", + }), + ).toThrow() + }) + + test("parses context summary payloads", () => { + const payload = ContextSummaryPayload.assert({ + covers: { + startSequence: 1, + endSequence: 12, + }, + summary: { + userIntent: "Design message storage.", + durableFacts: [], + preferences: ["Keep the schema simple."], + decisions: ["Use conversation_entries as the timeline."], + openTasks: [], + importantDetails: [], + }, + promptVersion: "conversation-summary-v1", + sourceEntryIds: ["entry-1", "entry-2"], + }) + + expect(payload).toMatchObject({ + covers: { + startSequence: 1, + endSequence: 12, + }, + promptVersion: "conversation-summary-v1", + }) + }) + + test("allows generic object payloads for tool entries", () => { + const payload = GenericObjectPayload.assert({ + toolCallId: "call-1", + toolName: "calendar.search", + input: { date: "2026-06-15" }, + }) + + expect(payload).toEqual({ + toolCallId: "call-1", + toolName: "calendar.search", + input: { date: "2026-06-15" }, + }) + }) + + test("rejects non-object generic payloads", () => { + expect(() => GenericObjectPayload.assert("done")).toThrow() + }) + + test("parses model run metadata and allows extra top-level metadata", () => { + const metadata = ConversationEntryMetadata.assert({ + modelRun: { + route: "default-chat", + provider: "pi", + model: "pi-model", + inputTokens: 120, + outputTokens: 24, + }, + traceId: "trace-1", + }) + + expect(metadata.modelRun?.model).toBe("pi-model") + expect(metadata.traceId).toBe("trace-1") + }) + + test("rejects invalid model run metadata", () => { + expect(() => + ConversationEntryMetadata.assert({ + modelRun: { + route: "default-chat", + provider: "pi", + model: "pi-model", + inputTokens: -1, + }, + }), + ).toThrow() + }) +}) diff --git a/apps/freya-backend/src/conversations/types.ts b/apps/freya-backend/src/conversations/types.ts new file mode 100644 index 0000000..e178770 --- /dev/null +++ b/apps/freya-backend/src/conversations/types.ts @@ -0,0 +1,136 @@ +import { type } from "arktype" + +export const ConversationEntryKind = { + UserMessage: "user_message", + AssistantMessage: "assistant_message", + Attachment: "attachment", + ToolCall: "tool_call", + ToolResult: "tool_result", + ContextSummary: "context_summary", + SystemNote: "system_note", +} as const + +export type ConversationEntryKind = + (typeof ConversationEntryKind)[keyof typeof ConversationEntryKind] + +export const ConversationEntryVisibility = { + UserVisible: "user_visible", + Internal: "internal", +} as const + +export type ConversationEntryVisibility = + (typeof ConversationEntryVisibility)[keyof typeof ConversationEntryVisibility] + +export const AttachmentType = { + Image: "image", + Audio: "audio", + Video: "video", + Document: "document", + Other: "other", +} as const + +export type AttachmentType = (typeof AttachmentType)[keyof typeof AttachmentType] + +export const ConversationEntryKindInput = type.enumerated(...Object.values(ConversationEntryKind)) +export const ConversationEntryVisibilityInput = type.enumerated( + ...Object.values(ConversationEntryVisibility), +) +export const AttachmentTypeInput = type.enumerated(...Object.values(AttachmentType)) + +const TextMessagePart = type({ + "+": "reject", + type: "'text'", + text: "string", +}) + +const JsonMessagePart = type({ + "+": "reject", + type: "'json'", + value: "unknown", +}) + +export const MessagePart = type.or(TextMessagePart, JsonMessagePart) +export type MessagePart = typeof MessagePart.infer + +export const UserMessagePayload = type({ + "+": "reject", + role: "'user'", + parts: MessagePart.array().atLeastLength(1), +}) + +export type UserMessagePayload = typeof UserMessagePayload.infer + +export const AssistantMessagePayload = type({ + "+": "reject", + role: "'assistant'", + parts: MessagePart.array().atLeastLength(1), +}) + +export type AssistantMessagePayload = typeof AssistantMessagePayload.infer + +export const AttachmentPayload = type({ + "+": "reject", + role: type.enumerated("user", "assistant"), + name: "string", + mimeType: "string", + attachmentType: AttachmentTypeInput, + "caption?": "string", +}) + +export type AttachmentPayload = typeof AttachmentPayload.infer + +const ContextSummary = type({ + "+": "reject", + "userIntent?": "string", + durableFacts: type.string.array(), + preferences: type.string.array(), + decisions: type.string.array(), + openTasks: type.string.array(), + importantDetails: type.string.array(), +}) + +export const ContextSummaryPayload = type({ + "+": "reject", + covers: type({ + "+": "reject", + startSequence: "number.integer >= 1", + endSequence: "number.integer >= 1", + }), + summary: ContextSummary, + promptVersion: "string", + "sourceEntryIds?": type.string.array(), +}) + +export type ContextSummaryPayload = typeof ContextSummaryPayload.infer + +export const ModelRunMetadata = type({ + "+": "reject", + route: "string", + provider: "string", + model: "string", + "contextSummaryEntryId?": "string", + "rawEntriesStartSequence?": "number.integer >= 1", + "rawEntriesEndSequence?": "number.integer >= 1", + "inputTokens?": "number.integer >= 0", + "outputTokens?": "number.integer >= 0", + "providerRequestId?": "string", +}) + +export type ModelRunMetadata = typeof ModelRunMetadata.infer + +export const ConversationEntryMetadata = type({ + "modelRun?": ModelRunMetadata, + "[string]": "unknown", +}) + +export type ConversationEntryMetadata = typeof ConversationEntryMetadata.infer + +export const GenericObjectPayload = type("Record") +export type GenericObjectPayload = typeof GenericObjectPayload.infer + +export type ConversationEntryPayload = + | UserMessagePayload + | AssistantMessagePayload + | AttachmentPayload + | ContextSummaryPayload + | GenericObjectPayload diff --git a/apps/freya-backend/src/db/schema.ts b/apps/freya-backend/src/db/schema.ts index e04d8af..b4f3258 100644 --- a/apps/freya-backend/src/db/schema.ts +++ b/apps/freya-backend/src/db/schema.ts @@ -1,6 +1,9 @@ +import { sql } from "drizzle-orm" import { boolean, + check, customType, + integer, index, jsonb, pgTable, @@ -10,6 +13,14 @@ import { uuid, } from "drizzle-orm/pg-core" +import { + ConversationEntryVisibility, + type ConversationEntryKind, + type ConversationEntryMetadata, + type ConversationEntryPayload, + type ConversationEntryVisibility as ConversationEntryVisibilityType, +} from "../conversations/types.ts" + // --------------------------------------------------------------------------- // Better Auth core tables // Re-exported from CLI-generated schema. @@ -61,6 +72,81 @@ export const userSources = pgTable( ], ) +// --------------------------------------------------------------------------- +// FREYA — conversations +// --------------------------------------------------------------------------- + +export const conversations = pgTable( + "conversations", + { + id: uuid("id").primaryKey().defaultRandom(), + userId: text("user_id") + .notNull() + .references(() => user.id, { onDelete: "cascade" }), + createdAt: timestamp("created_at").notNull().defaultNow(), + updatedAt: timestamp("updated_at") + .notNull() + .defaultNow() + .$onUpdate(() => new Date()), + }, + (t) => [index("conversations_user_id_updated_at_idx").on(t.userId, t.updatedAt)], +) + +export const files = pgTable( + "files", + { + id: uuid("id").primaryKey().defaultRandom(), + userId: text("user_id") + .notNull() + .references(() => user.id, { onDelete: "cascade" }), + storageKey: text("storage_key").notNull(), + originalName: text("original_name"), + mimeType: text("mime_type").notNull(), + sizeBytes: integer("size_bytes").notNull(), + metadata: jsonb("metadata").$type>().notNull().default({}), + createdAt: timestamp("created_at").notNull().defaultNow(), + }, + (t) => [ + unique("files_storage_key_unique").on(t.storageKey), + index("files_user_id_created_at_idx").on(t.userId, t.createdAt), + ], +) + +export const conversationEntries = pgTable( + "conversation_entries", + { + id: uuid("id").primaryKey().defaultRandom(), + conversationId: uuid("conversation_id") + .notNull() + .references(() => conversations.id, { onDelete: "cascade" }), + sequence: integer("sequence").notNull(), + kind: text("kind").$type().notNull(), + visibility: text("visibility") + .$type() + .notNull() + .default(ConversationEntryVisibility.Internal), + fileId: uuid("file_id").references(() => files.id, { onDelete: "restrict" }), + payload: jsonb("payload").$type().notNull(), + metadata: jsonb("metadata").$type().notNull().default({}), + createdAt: timestamp("created_at").notNull().defaultNow(), + }, + (t) => [ + unique("conversation_entries_conversation_id_sequence_unique").on(t.conversationId, t.sequence), + index("conversation_entries_conversation_id_sequence_idx").on(t.conversationId, t.sequence), + index("conversation_entries_conversation_id_visibility_sequence_idx").on( + t.conversationId, + t.visibility, + t.sequence, + ), + index("conversation_entries_kind_idx").on(t.kind), + index("conversation_entries_file_id_idx").on(t.fileId), + check( + "conversation_entries_attachment_file_id_check", + sql`(${t.kind} = 'attachment' and ${t.fileId} is not null) or (${t.kind} <> 'attachment' and ${t.fileId} is null)`, + ), + ], +) + // --------------------------------------------------------------------------- // FREYA — reminders source storage // --------------------------------------------------------------------------- diff --git a/apps/freya-backend/src/engine/http.test.ts b/apps/freya-backend/src/engine/http.test.ts index 4bcdc9b..f2ebb31 100644 --- a/apps/freya-backend/src/engine/http.test.ts +++ b/apps/freya-backend/src/engine/http.test.ts @@ -85,6 +85,20 @@ mock.module("../sources/user-sources.ts", () => ({ }), })) +mock.module("../conversations/storage.ts", () => ({ + conversations: (_db: Database, userId: string) => ({ + async getOrCreateConversation() { + return { id: `conversation-${userId}` } + }, + async listEntries() { + return [] + }, + async appendEntry() { + return { id: "entry-1", sequence: 1 } + }, + }), +})) + const fakeDb = {} as Database describe("GET /api/feed", () => { diff --git a/apps/freya-backend/src/session/user-session-manager.test.ts b/apps/freya-backend/src/session/user-session-manager.test.ts index faad31d..852a32c 100644 --- a/apps/freya-backend/src/session/user-session-manager.test.ts +++ b/apps/freya-backend/src/session/user-session-manager.test.ts @@ -4,9 +4,12 @@ import { LocationSource } from "@freya/source-location" import { WeatherSource } from "@freya/source-weatherkit" import { beforeEach, describe, expect, mock, spyOn, test } from "bun:test" +import type { ConversationStorageEntry } from "../agent/conversation-recording-query-agent.ts" +import type { AppendConversationEntryInput } from "../conversations/storage.ts" import type { Database } from "../db/index.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts" +import { ConversationEntryKind } from "../conversations/types.ts" import { CredentialEncryptor } from "../lib/crypto.ts" import { CredentialStorageUnavailableError, @@ -21,6 +24,8 @@ import { UserSessionManager } from "./user-session-manager.ts" * Key = userId (or "*" for a default), value = array of enabled sourceIds. */ const enabledByUser = new Map() +const conversationEntriesByUser = new Map() +const mockConversationCalls: Array<{ type: "getOrCreate" | "listEntries"; userId: string }> = [] /** Set which sourceIds are enabled for all users. */ function setEnabledSources(sourceIds: string[]) { @@ -37,6 +42,10 @@ function getEnabledSourceIds(userId: string): string[] { return enabledByUser.get(userId) ?? enabledByUser.get("*") ?? [] } +function setConversationEntriesForUser(userId: string, entries: ConversationStorageEntry[]) { + conversationEntriesByUser.set(userId, entries) +} + /** * Controls what `find()` returns in the mock. When `undefined` (the default), * `find()` returns a standard enabled row. Set to a specific value (including @@ -111,6 +120,35 @@ mock.module("../sources/user-sources.ts", () => ({ }), })) +mock.module("../conversations/storage.ts", () => ({ + conversations: (_db: Database, userId: string) => ({ + async getOrCreateConversation(): Promise<{ id: string }> { + mockConversationCalls.push({ type: "getOrCreate", userId }) + return { id: `conversation-${userId}` } + }, + async listEntries(_conversationId: string): Promise { + mockConversationCalls.push({ type: "listEntries", userId }) + return conversationEntriesByUser.get(userId) ?? [] + }, + async appendEntry( + _conversationId: string, + input: AppendConversationEntryInput, + ): Promise { + const entries = conversationEntriesByUser.get(userId) ?? [] + const row: ConversationStorageEntry = { + id: `entry-${entries.length + 1}`, + sequence: entries.length + 1, + kind: input.kind, + payload: input.payload, + metadata: input.metadata ?? {}, + createdAt: new Date("2026-06-15T00:00:00.000Z"), + } + conversationEntriesByUser.set(userId, [...entries, row]) + return row + }, + }), +})) + const fakeDb = { transaction: (fn: (tx: unknown) => Promise) => fn(fakeDb), } as unknown as Database @@ -160,6 +198,8 @@ const weatherProvider: FeedSourceProvider = { beforeEach(() => { enabledByUser.clear() + conversationEntriesByUser.clear() + mockConversationCalls.length = 0 mockFindResult = undefined mockUpdateCredentialsCalls.length = 0 mockUpdateCredentialsError = null @@ -176,6 +216,31 @@ describe("UserSessionManager", () => { expect(session.engine).toBeDefined() }) + test("getOrCreate eagerly loads conversation entries for the user session", async () => { + setEnabledSources([]) + setConversationEntriesForUser("user-1", [ + { + id: "entry-1", + sequence: 1, + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: "stored hello" }], + }, + metadata: {}, + createdAt: new Date("2026-06-15T00:00:00.000Z"), + }, + ]) + const manager = new UserSessionManager({ db: fakeDb, providers: [] }) + + await manager.getOrCreate("user-1") + + expect(mockConversationCalls).toEqual([ + { type: "getOrCreate", userId: "user-1" }, + { type: "listEntries", userId: "user-1" }, + ]) + }) + test("getOrCreate returns same session for same user", async () => { setEnabledSources(["freya.location"]) const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) diff --git a/apps/freya-backend/src/session/user-session-manager.ts b/apps/freya-backend/src/session/user-session-manager.ts index b4f6adf..18143d1 100644 --- a/apps/freya-backend/src/session/user-session-manager.ts +++ b/apps/freya-backend/src/session/user-session-manager.ts @@ -8,6 +8,7 @@ import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { CredentialEncryptor } from "../lib/crypto.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts" +import { conversations } from "../conversations/storage.ts" import { CredentialStorageUnavailableError, InvalidSourceConfigError, @@ -362,6 +363,7 @@ export class UserSessionManager { private async createSession(userId: string): Promise { const enabledRows = await sources(this.db, userId).enabled() + const agentConfig = this.queryAgentConfigForUser(userId) const promises: Promise[] = [] for (const row of enabledRows) { @@ -373,7 +375,7 @@ export class UserSessionManager { } if (promises.length === 0) { - return new UserSession(userId, [], this.feedEnhancer, this.queryAgentConfig) + return this.initializedSession(userId, [], agentConfig) } const results = await Promise.allSettled(promises) @@ -397,7 +399,29 @@ export class UserSessionManager { console.error("[UserSessionManager] Feed source provider failed:", error) } - return new UserSession(userId, feedSources, this.feedEnhancer, this.queryAgentConfig) + return this.initializedSession(userId, feedSources, agentConfig) + } + + private queryAgentConfigForUser(userId: string): UserSessionAgentConfig { + return { + ...(this.queryAgentConfig ?? {}), + conversationStorage: conversations(this.db, userId), + } + } + + private async initializedSession( + userId: string, + sources: FeedSource[], + agentConfig: UserSessionAgentConfig, + ): Promise { + const session = new UserSession(userId, sources, this.feedEnhancer, agentConfig) + try { + await session.initialize() + return session + } catch (err) { + session.destroy() + throw err + } } /** diff --git a/apps/freya-backend/src/session/user-session.test.ts b/apps/freya-backend/src/session/user-session.test.ts index 9dd4b46..0933549 100644 --- a/apps/freya-backend/src/session/user-session.test.ts +++ b/apps/freya-backend/src/session/user-session.test.ts @@ -3,6 +3,13 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@frey import { LocationSource } from "@freya/source-location" import { describe, expect, spyOn, test } from "bun:test" +import type { + ConversationStorage, + ConversationStorageEntry, +} from "../agent/conversation-recording-query-agent.ts" +import type { AppendConversationEntryInput } from "../conversations/storage.ts" + +import { ConversationEntryKind } from "../conversations/types.ts" import { UserSession } from "./user-session.ts" function createStubSource(id: string, items: FeedItem[] = []): FeedSource { @@ -23,6 +30,40 @@ function createStubSource(id: string, items: FeedItem[] = []): FeedSource { } } +class FakeConversationStorage implements ConversationStorage { + readonly calls: string[] = [] + private readonly entries: ConversationStorageEntry[] + + constructor(entries: ConversationStorageEntry[] = []) { + this.entries = entries + } + + async getOrCreateConversation(): Promise<{ id: string }> { + this.calls.push("getOrCreateConversation") + return { id: "conversation-1" } + } + + async appendEntry( + _conversationId: string, + input: AppendConversationEntryInput, + ): Promise { + this.calls.push("appendEntry") + return { + id: "entry-appended", + sequence: 1, + kind: input.kind, + payload: input.payload, + metadata: input.metadata ?? {}, + createdAt: new Date("2026-06-15T00:00:00.000Z"), + } + } + + async listEntries(_conversationId: string): Promise { + this.calls.push("listEntries") + return this.entries + } +} + describe("UserSession", () => { test("registers sources and starts engine", async () => { const session = new UserSession("test-user", [ @@ -67,6 +108,32 @@ describe("UserSession", () => { expect(disposeSpy).toHaveBeenCalled() }) + test("initialize loads conversation entries before exposing stored agent", async () => { + const storage = new FakeConversationStorage([ + { + id: "entry-1", + sequence: 1, + kind: ConversationEntryKind.UserMessage, + payload: { + role: "user", + parts: [{ type: "text", text: "stored hello" }], + }, + metadata: {}, + createdAt: new Date("2026-06-15T00:00:00.000Z"), + }, + ]) + const session = new UserSession("test-user", [createStubSource("test")], null, { + conversationStorage: storage, + }) + + expect(() => session.agent).toThrow("UserSession has not been initialized") + + await session.initialize() + + expect(storage.calls).toEqual(["getOrCreateConversation", "listEntries"]) + expect(session.agent).toBeDefined() + }) + test("engine.executeAction routes to correct source", async () => { const location = new LocationSource() const session = new UserSession("test-user", [location]) diff --git a/apps/freya-backend/src/session/user-session.ts b/apps/freya-backend/src/session/user-session.ts index da5c8d9..2ee5c95 100644 --- a/apps/freya-backend/src/session/user-session.ts +++ b/apps/freya-backend/src/session/user-session.ts @@ -10,22 +10,30 @@ import type { QueryAgentToolbox } from "../agent/query-agent-toolbox.ts" import type { QueryAgent } from "../agent/query-agent.ts" import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" -import { PiQueryAgent } from "../agent/pi-query-agent.ts" +import { + ConversationRecordingQueryAgent, + type ConversationStorage, +} from "../agent/conversation-recording-query-agent.ts" +import { PiQueryAgent, PI_MODEL_ID, PI_MODEL_PROVIDER } from "../agent/pi-query-agent.ts" import { UserSessionQueryAgentToolbox } from "../agent/user-session-query-agent-toolbox.ts" export interface UserSessionAgentConfig { apiKey?: string cwd?: string systemPrompt?: string + conversationStorage?: ConversationStorage } export class UserSession { readonly userId: string readonly engine: FeedEngine readonly toolbox: QueryAgentToolbox - readonly agent: QueryAgent private sources = new Map() private readonly enhancer: FeedEnhancer | null + private readonly agentConfig: UserSessionAgentConfig | undefined + private queryAgent: QueryAgent | null = null + private initializePromise: Promise | null = null + private initialized = false private enhancedItems: FeedItem[] | null = null /** The FeedResult that enhancedItems was derived from. */ private enhancedSource: FeedResult | null = null @@ -41,6 +49,7 @@ export class UserSession { this.userId = userId this.engine = new FeedEngine() this.enhancer = enhancer ?? null + this.agentConfig = agentConfig for (const source of sources) { this.sources.set(source.id, source) this.engine.register(source) @@ -54,17 +63,43 @@ export class UserSession { } this.toolbox = new UserSessionQueryAgentToolbox(this) - this.agent = new PiQueryAgent({ - userId: this.userId, - toolbox: this.toolbox, - apiKey: agentConfig?.apiKey, - cwd: agentConfig?.cwd, - systemPrompt: agentConfig?.systemPrompt, - }) + if (!agentConfig?.conversationStorage) { + this.queryAgent = new PiQueryAgent({ + toolbox: this.toolbox, + apiKey: this.agentConfig?.apiKey, + cwd: this.agentConfig?.cwd, + systemPrompt: this.agentConfig?.systemPrompt, + }) + this.initialized = true + } this.engine.start() } + get agent(): QueryAgent { + if (!this.queryAgent) { + throw new Error("UserSession has not been initialized") + } + return this.queryAgent + } + + async initialize(): Promise { + if (this.initialized) return + if (this.initializePromise) return this.initializePromise + + const promise = this.initializeAgent() + this.initializePromise = promise + + try { + await promise + this.initialized = true + } finally { + if (this.initializePromise === promise) { + this.initializePromise = null + } + } + } + /** * Returns the current feed, refreshing if the engine cache expired. * Enhancement runs eagerly on engine updates; this method awaits @@ -201,7 +236,8 @@ export class UserSession { } destroy(): void { - this.agent.dispose() + this.queryAgent?.dispose() + this.queryAgent = null this.unsubscribe?.() this.unsubscribe = null this.engine.stop() @@ -210,6 +246,38 @@ export class UserSession { this.enhancingPromise = null } + private async initializeAgent(): Promise { + if (this.queryAgent) return + + const conversationStorage = this.agentConfig?.conversationStorage + if (!conversationStorage) { + this.queryAgent = new PiQueryAgent({ + toolbox: this.toolbox, + apiKey: this.agentConfig?.apiKey, + cwd: this.agentConfig?.cwd, + systemPrompt: this.agentConfig?.systemPrompt, + }) + return + } + + const conversation = await conversationStorage.getOrCreateConversation() + const entries = await conversationStorage.listEntries(conversation.id) + + this.queryAgent = new ConversationRecordingQueryAgent({ + agent: new PiQueryAgent({ + toolbox: this.toolbox, + apiKey: this.agentConfig?.apiKey, + cwd: this.agentConfig?.cwd, + systemPrompt: this.agentConfig?.systemPrompt, + initialEntries: entries, + }), + storage: conversationStorage, + defaultConversationId: conversation.id, + modelProvider: PI_MODEL_PROVIDER, + modelId: PI_MODEL_ID, + }) + } + private invalidateEnhancement(): void { this.enhancedItems = null this.enhancedSource = null diff --git a/apps/freya-backend/src/sources/http.test.ts b/apps/freya-backend/src/sources/http.test.ts index a8d6d6a..abee1f8 100644 --- a/apps/freya-backend/src/sources/http.test.ts +++ b/apps/freya-backend/src/sources/http.test.ts @@ -128,6 +128,20 @@ mock.module("../sources/user-sources.ts", () => ({ }, })) +mock.module("../conversations/storage.ts", () => ({ + conversations: (_db: Database, userId: string) => ({ + async getOrCreateConversation() { + return { id: `conversation-${userId}` } + }, + async listEntries() { + return [] + }, + async appendEntry() { + return { id: "entry-1", sequence: 1 } + }, + }), +})) + const fakeDb = { transaction: (fn: (tx: unknown) => Promise) => fn(fakeDb), } as unknown as Database