From 2c03653e91b6a035532269566e418cde014f71b9 Mon Sep 17 00:00:00 2001 From: kenneth Date: Thu, 19 Mar 2026 23:27:19 +0000 Subject: [PATCH] feat: runtime provider hotswap Add ability to replace a FeedSourceProvider at runtime and propagate the new source to all active (and pending) user sessions, invalidating their feed caches. Co-authored-by: Ona --- apps/aelis-backend/src/engine/http.test.ts | 45 ++- apps/aelis-backend/src/location/provider.ts | 1 + .../src/session/feed-source-provider.ts | 6 +- apps/aelis-backend/src/session/index.ts | 6 +- .../src/session/user-session-manager.test.ts | 338 +++++++++++++++--- .../src/session/user-session-manager.ts | 79 +++- .../src/session/user-session.test.ts | 172 +++++++++ .../aelis-backend/src/session/user-session.ts | 53 +++ apps/aelis-backend/src/tfl/provider.ts | 1 + apps/aelis-backend/src/weather/provider.ts | 1 + packages/aelis-core/src/feed-engine.test.ts | 75 ++++ packages/aelis-core/src/feed-engine.ts | 21 +- 12 files changed, 717 insertions(+), 81 deletions(-) diff --git a/apps/aelis-backend/src/engine/http.test.ts b/apps/aelis-backend/src/engine/http.test.ts index 1e6c8ca..2880edf 100644 --- a/apps/aelis-backend/src/engine/http.test.ts +++ b/apps/aelis-backend/src/engine/http.test.ts @@ -72,7 +72,14 @@ describe("GET /api/feed", () => { }, ] const manager = new UserSessionManager({ - providers: [async () => createStubSource("test", items)], + providers: [ + { + sourceId: "test", + async feedSourceForUser() { + return createStubSource("test", items) + }, + }, + ], }) const app = buildTestApp(manager, "user-1") @@ -105,7 +112,14 @@ describe("GET /api/feed", () => { }, ] const manager = new UserSessionManager({ - providers: [async () => createStubSource("test", items)], + providers: [ + { + sourceId: "test", + async feedSourceForUser() { + return createStubSource("test", items) + }, + }, + ], }) const app = buildTestApp(manager, "user-1") @@ -136,7 +150,16 @@ describe("GET /api/feed", () => { throw new Error("connection timeout") }, } - const manager = new UserSessionManager({ providers: [async () => failingSource] }) + const manager = new UserSessionManager({ + providers: [ + { + sourceId: "failing", + async feedSourceForUser() { + return failingSource + }, + }, + ], + }) const app = buildTestApp(manager, "user-1") const res = await app.request("/api/feed") @@ -152,8 +175,11 @@ describe("GET /api/feed", () => { test("returns 503 when all providers fail", async () => { const manager = new UserSessionManager({ providers: [ - async () => { - throw new Error("provider down") + { + sourceId: "test", + async feedSourceForUser() { + throw new Error("provider down") + }, }, ], }) @@ -181,7 +207,14 @@ describe("GET /api/context", () => { async function buildContextApp(userId?: string) { const manager = new UserSessionManager({ - providers: [async () => createStubSource("weather", [], contextEntries)], + providers: [ + { + sourceId: "weather", + async feedSourceForUser() { + return createStubSource("weather", [], contextEntries) + }, + }, + ], }) const app = buildTestApp(manager, userId) const session = await manager.getOrCreate(mockUserId) diff --git a/apps/aelis-backend/src/location/provider.ts b/apps/aelis-backend/src/location/provider.ts index ed9bfd3..958640b 100644 --- a/apps/aelis-backend/src/location/provider.ts +++ b/apps/aelis-backend/src/location/provider.ts @@ -7,6 +7,7 @@ import { SourceDisabledError } from "../sources/errors.ts" import { sources } from "../sources/user-sources.ts" export class LocationSourceProvider implements FeedSourceProvider { + readonly sourceId = "aelis.location" private readonly db: Database constructor(db: Database) { diff --git a/apps/aelis-backend/src/session/feed-source-provider.ts b/apps/aelis-backend/src/session/feed-source-provider.ts index 45aa5b3..0430520 100644 --- a/apps/aelis-backend/src/session/feed-source-provider.ts +++ b/apps/aelis-backend/src/session/feed-source-provider.ts @@ -1,9 +1,7 @@ import type { FeedSource } from "@aelis/core" export interface FeedSourceProvider { + /** The source ID this provider is responsible for (e.g., "aelis.location"). */ + readonly sourceId: string feedSourceForUser(userId: string): Promise } - -export type FeedSourceProviderFn = (userId: string) => Promise - -export type FeedSourceProviderInput = FeedSourceProvider | FeedSourceProviderFn diff --git a/apps/aelis-backend/src/session/index.ts b/apps/aelis-backend/src/session/index.ts index 6f77741..e56c84a 100644 --- a/apps/aelis-backend/src/session/index.ts +++ b/apps/aelis-backend/src/session/index.ts @@ -1,7 +1,3 @@ -export type { - FeedSourceProvider, - FeedSourceProviderFn, - FeedSourceProviderInput, -} from "./feed-source-provider.ts" +export type { FeedSourceProvider } from "./feed-source-provider.ts" export { UserSession } from "./user-session.ts" export { UserSessionManager } from "./user-session-manager.ts" diff --git a/apps/aelis-backend/src/session/user-session-manager.test.ts b/apps/aelis-backend/src/session/user-session-manager.test.ts index e49bdbc..cd57a19 100644 --- a/apps/aelis-backend/src/session/user-session-manager.test.ts +++ b/apps/aelis-backend/src/session/user-session-manager.test.ts @@ -1,15 +1,55 @@ +import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core" + import { LocationSource } from "@aelis/source-location" import { WeatherSource } from "@aelis/source-weatherkit" import { describe, expect, mock, spyOn, test } from "bun:test" +import type { FeedSourceProvider } from "./feed-source-provider.ts" + import { UserSessionManager } from "./user-session-manager.ts" -const mockWeatherProvider = async () => - new WeatherSource({ client: { fetch: async () => ({}) as never } }) +function createStubSource(id: string, items: FeedItem[] = []): FeedSource { + return { + id, + async listActions(): Promise> { + return {} + }, + async executeAction(): Promise { + return undefined + }, + async fetchContext(): Promise { + return null + }, + async fetchItems() { + return items + }, + } +} + +function createStubProvider( + sourceId: string, + factory: (userId: string) => Promise = async () => createStubSource(sourceId), +): FeedSourceProvider { + return { sourceId, feedSourceForUser: factory } +} + +const locationProvider: FeedSourceProvider = { + sourceId: "aelis.location", + async feedSourceForUser() { + return new LocationSource() + }, +} + +const weatherProvider: FeedSourceProvider = { + sourceId: "aelis.weather", + async feedSourceForUser() { + return new WeatherSource({ client: { fetch: async () => ({}) as never } }) + }, +} describe("UserSessionManager", () => { test("getOrCreate creates session on first call", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const session = await manager.getOrCreate("user-1") @@ -18,7 +58,7 @@ describe("UserSessionManager", () => { }) test("getOrCreate returns same session for same user", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const session1 = await manager.getOrCreate("user-1") const session2 = await manager.getOrCreate("user-1") @@ -27,7 +67,7 @@ describe("UserSessionManager", () => { }) test("getOrCreate returns different sessions for different users", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const session1 = await manager.getOrCreate("user-1") const session2 = await manager.getOrCreate("user-2") @@ -36,7 +76,7 @@ describe("UserSessionManager", () => { }) test("each user gets independent source instances", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const session1 = await manager.getOrCreate("user-1") const session2 = await manager.getOrCreate("user-2") @@ -48,7 +88,7 @@ describe("UserSessionManager", () => { }) test("remove destroys session and allows re-creation", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const session1 = await manager.getOrCreate("user-1") manager.remove("user-1") @@ -58,33 +98,14 @@ describe("UserSessionManager", () => { }) test("remove is no-op for unknown user", () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) expect(() => manager.remove("unknown")).not.toThrow() }) - test("accepts function providers", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - - const session = await manager.getOrCreate("user-1") - const result = await session.engine.refresh() - - expect(result.errors).toHaveLength(0) - }) - - test("accepts object providers", async () => { + test("registers multiple providers", async () => { const manager = new UserSessionManager({ - providers: [async () => new LocationSource(), mockWeatherProvider], - }) - - const session = await manager.getOrCreate("user-1") - - expect(session.getSource("aelis.weather")).toBeDefined() - }) - - test("accepts mixed providers", async () => { - const manager = new UserSessionManager({ - providers: [async () => new LocationSource(), mockWeatherProvider], + providers: [locationProvider, weatherProvider], }) const session = await manager.getOrCreate("user-1") @@ -94,7 +115,7 @@ describe("UserSessionManager", () => { }) test("refresh returns feed result through session", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const session = await manager.getOrCreate("user-1") const result = await session.engine.refresh() @@ -106,7 +127,7 @@ describe("UserSessionManager", () => { }) test("location update via executeAction works", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const session = await manager.getOrCreate("user-1") await session.engine.executeAction("aelis.location", "update-location", { @@ -121,7 +142,7 @@ describe("UserSessionManager", () => { }) test("subscribe receives updates after location push", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const callback = mock() const session = await manager.getOrCreate("user-1") @@ -141,7 +162,7 @@ describe("UserSessionManager", () => { }) test("remove stops reactive updates", async () => { - const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [locationProvider] }) const callback = mock() const session = await manager.getOrCreate("user-1") @@ -164,13 +185,15 @@ describe("UserSessionManager", () => { }) test("creates session with successful providers when some fail", async () => { + const failingProvider: FeedSourceProvider = { + sourceId: "aelis.failing", + async feedSourceForUser() { + throw new Error("provider failed") + }, + } + const manager = new UserSessionManager({ - providers: [ - async () => new LocationSource(), - async () => { - throw new Error("provider failed") - }, - ], + providers: [locationProvider, failingProvider], }) const spy = spyOn(console, "error").mockImplementation(() => {}) @@ -187,11 +210,17 @@ describe("UserSessionManager", () => { test("throws AggregateError when all providers fail", async () => { const manager = new UserSessionManager({ providers: [ - async () => { - throw new Error("first failed") + { + sourceId: "aelis.fail-1", + async feedSourceForUser() { + throw new Error("first failed") + }, }, - async () => { - throw new Error("second failed") + { + sourceId: "aelis.fail-2", + async feedSourceForUser() { + throw new Error("second failed") + }, }, ], }) @@ -203,11 +232,13 @@ describe("UserSessionManager", () => { let callCount = 0 const manager = new UserSessionManager({ providers: [ - async () => { - callCount++ - // Simulate async work to widen the race window - await new Promise((resolve) => setTimeout(resolve, 10)) - return new LocationSource() + { + sourceId: "aelis.location", + async feedSourceForUser() { + callCount++ + await new Promise((resolve) => setTimeout(resolve, 10)) + return new LocationSource() + }, }, ], }) @@ -229,9 +260,12 @@ describe("UserSessionManager", () => { const manager = new UserSessionManager({ providers: [ - async () => { - await providerGate - return new LocationSource() + { + sourceId: "aelis.location", + async feedSourceForUser() { + await providerGate + return new LocationSource() + }, }, ], }) @@ -252,3 +286,205 @@ describe("UserSessionManager", () => { expect(freshSession.engine).toBeDefined() }) }) + +describe("UserSessionManager.replaceProvider", () => { + test("replaces source in all active sessions", async () => { + const itemsV1: FeedItem[] = [ + { + id: "v1", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 1 }, + }, + ] + const itemsV2: FeedItem[] = [ + { + id: "v2", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 2 }, + }, + ] + + const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1)) + const manager = new UserSessionManager({ providers: [providerV1] }) + + const session1 = await manager.getOrCreate("user-1") + const session2 = await manager.getOrCreate("user-2") + + // Verify v1 items + const feed1 = await session1.feed() + expect(feed1.items[0]!.data.version).toBe(1) + + // Replace provider + const providerV2 = createStubProvider("test", async () => createStubSource("test", itemsV2)) + await manager.replaceProvider(providerV2) + + // Both sessions should now serve v2 items + const feed1After = await session1.feed() + const feed2After = await session2.feed() + expect(feed1After.items[0]!.data.version).toBe(2) + expect(feed2After.items[0]!.data.version).toBe(2) + }) + + test("throws for unknown provider sourceId", async () => { + const manager = new UserSessionManager({ providers: [locationProvider] }) + + const unknownProvider = createStubProvider("aelis.unknown") + + await expect(manager.replaceProvider(unknownProvider)).rejects.toThrow( + "no existing provider with that sourceId", + ) + }) + + test("removes source from session when new provider fails for a user", async () => { + const providerV1 = createStubProvider("test", async () => createStubSource("test")) + const manager = new UserSessionManager({ providers: [providerV1] }) + + const session = await manager.getOrCreate("user-1") + expect(session.getSource("test")).toBeDefined() + + const spy = spyOn(console, "error").mockImplementation(() => {}) + + const failingProvider = createStubProvider("test", async () => { + throw new Error("source disabled") + }) + await manager.replaceProvider(failingProvider) + + expect(session.getSource("test")).toBeUndefined() + expect(spy).toHaveBeenCalled() + + spy.mockRestore() + }) + + test("new sessions use the replaced provider", async () => { + const itemsV1: FeedItem[] = [ + { + id: "v1", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 1 }, + }, + ] + const itemsV2: FeedItem[] = [ + { + id: "v2", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 2 }, + }, + ] + + const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1)) + const manager = new UserSessionManager({ providers: [providerV1] }) + + const providerV2 = createStubProvider("test", async () => createStubSource("test", itemsV2)) + await manager.replaceProvider(providerV2) + + // New session should use v2 + const session = await manager.getOrCreate("user-new") + const feed = await session.feed() + expect(feed.items[0]!.data.version).toBe(2) + }) + + test("does not affect other providers' sources", async () => { + const providerA = createStubProvider("source-a", async () => + createStubSource("source-a", [ + { + id: "a-1", + sourceId: "source-a", + type: "test", + timestamp: new Date(), + data: { from: "a" }, + }, + ]), + ) + const providerB = createStubProvider("source-b", async () => + createStubSource("source-b", [ + { + id: "b-1", + sourceId: "source-b", + type: "test", + timestamp: new Date(), + data: { from: "b" }, + }, + ]), + ) + + const manager = new UserSessionManager({ providers: [providerA, providerB] }) + const session = await manager.getOrCreate("user-1") + + // Replace only source-a + const providerA2 = createStubProvider("source-a", async () => + createStubSource("source-a", [ + { + id: "a-2", + sourceId: "source-a", + type: "test", + timestamp: new Date(), + data: { from: "a-new" }, + }, + ]), + ) + await manager.replaceProvider(providerA2) + + // source-b should be unaffected + expect(session.getSource("source-b")).toBeDefined() + const feed = await session.feed() + const ids = feed.items.map((i) => i.id).sort() + expect(ids).toEqual(["a-2", "b-1"]) + }) + + test("updates sessions that are still being created", async () => { + const itemsV1: FeedItem[] = [ + { + id: "v1", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 1 }, + }, + ] + const itemsV2: FeedItem[] = [ + { + id: "v2", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 2 }, + }, + ] + + let resolveCreation: () => void + const creationGate = new Promise((r) => { + resolveCreation = r + }) + + const providerV1 = createStubProvider("test", async () => { + await creationGate + return createStubSource("test", itemsV1) + }) + const manager = new UserSessionManager({ providers: [providerV1] }) + + // Start session creation but don't let it finish yet + const sessionPromise = manager.getOrCreate("user-1") + + // Replace provider while session is still pending + const providerV2 = createStubProvider("test", async () => createStubSource("test", itemsV2)) + const replacePromise = manager.replaceProvider(providerV2) + + // Let the original creation finish + resolveCreation!() + + const session = await sessionPromise + await replacePromise + + // Session should have been updated to v2 + const feed = await session.feed() + expect(feed.items[0]!.data.version).toBe(2) + }) +}) diff --git a/apps/aelis-backend/src/session/user-session-manager.ts b/apps/aelis-backend/src/session/user-session-manager.ts index df82908..79e2d63 100644 --- a/apps/aelis-backend/src/session/user-session-manager.ts +++ b/apps/aelis-backend/src/session/user-session-manager.ts @@ -1,29 +1,31 @@ import type { FeedSource } from "@aelis/core" import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" -import type { FeedSourceProviderInput } from "./feed-source-provider.ts" +import type { FeedSourceProvider } from "./feed-source-provider.ts" import { UserSession } from "./user-session.ts" export interface UserSessionManagerConfig { - providers: FeedSourceProviderInput[] + providers: FeedSourceProvider[] feedEnhancer?: FeedEnhancer | null } export class UserSessionManager { - private sessions = new Map() + private sessions = new Map() private pending = new Map>() - private readonly providers: FeedSourceProviderInput[] + private readonly providers = new Map() private readonly feedEnhancer: FeedEnhancer | null constructor(config: UserSessionManagerConfig) { - this.providers = config.providers + for (const provider of config.providers) { + this.providers.set(provider.sourceId, provider) + } this.feedEnhancer = config.feedEnhancer ?? null } async getOrCreate(userId: string): Promise { const existing = this.sessions.get(userId) - if (existing) return existing + if (existing) return existing.session const inflight = this.pending.get(userId) if (inflight) return inflight @@ -38,7 +40,7 @@ export class UserSessionManager { session.destroy() throw new Error(`Session for user ${userId} was removed during creation`) } - this.sessions.set(userId, session) + this.sessions.set(userId, { userId, session }) return session } finally { this.pending.delete(userId) @@ -46,20 +48,71 @@ export class UserSessionManager { } remove(userId: string): void { - const session = this.sessions.get(userId) - if (session) { - session.destroy() + const entry = this.sessions.get(userId) + if (entry) { + entry.session.destroy() this.sessions.delete(userId) } // Cancel any in-flight creation so getOrCreate won't store the session this.pending.delete(userId) } + /** + * Replaces a provider and updates all active sessions. + * The new provider must have the same sourceId as an existing one. + * For each active session, resolves a new source from the provider. + * If the provider fails for a user, the old source is removed from that session. + */ + async replaceProvider(provider: FeedSourceProvider): Promise { + if (!this.providers.has(provider.sourceId)) { + throw new Error( + `Cannot replace provider "${provider.sourceId}": no existing provider with that sourceId`, + ) + } + + this.providers.set(provider.sourceId, provider) + + const updates: Promise[] = [] + + for (const [, { userId, session }] of this.sessions) { + updates.push(this.updateSessionSource(provider, userId, session)) + } + + // Also update sessions that are currently being created so they + // don't land in this.sessions with a stale source. + for (const [userId, pendingPromise] of this.pending) { + updates.push( + pendingPromise + .then((session) => this.updateSessionSource(provider, userId, session)) + .catch(() => { + // Session creation itself failed — nothing to update. + }), + ) + } + + await Promise.all(updates) + } + + private async updateSessionSource( + provider: FeedSourceProvider, + userId: string, + session: UserSession, + ): Promise { + try { + const newSource = await provider.feedSourceForUser(userId) + session.replaceSource(provider.sourceId, newSource) + } catch (err) { + console.error( + `[UserSessionManager] replaceProvider("${provider.sourceId}") failed for user ${userId}:`, + err, + ) + session.removeSource(provider.sourceId) + } + } + private async createSession(userId: string): Promise { const results = await Promise.allSettled( - this.providers.map((p) => - typeof p === "function" ? p(userId) : p.feedSourceForUser(userId), - ), + Array.from(this.providers.values()).map((p) => p.feedSourceForUser(userId)), ) const sources: FeedSource[] = [] diff --git a/apps/aelis-backend/src/session/user-session.test.ts b/apps/aelis-backend/src/session/user-session.test.ts index 4d0a374..9918c33 100644 --- a/apps/aelis-backend/src/session/user-session.test.ts +++ b/apps/aelis-backend/src/session/user-session.test.ts @@ -214,3 +214,175 @@ describe("UserSession.feed", () => { expect(result.items[0]!.data.value).toBe(42) }) }) + +describe("UserSession.replaceSource", () => { + test("replaces source and invalidates feed cache", async () => { + const itemsA: FeedItem[] = [ + { + id: "a-1", + sourceId: "test", + type: "test", + timestamp: new Date("2025-01-01T00:00:00.000Z"), + data: { from: "a" }, + }, + ] + const itemsB: FeedItem[] = [ + { + id: "b-1", + sourceId: "test", + type: "test", + timestamp: new Date("2025-01-01T00:00:00.000Z"), + data: { from: "b" }, + }, + ] + + const sourceA = createStubSource("test", itemsA) + const session = new UserSession([sourceA]) + + const result1 = await session.feed() + expect(result1.items).toHaveLength(1) + expect(result1.items[0]!.data.from).toBe("a") + + const sourceB = createStubSource("test", itemsB) + session.replaceSource("test", sourceB) + + const result2 = await session.feed() + expect(result2.items).toHaveLength(1) + expect(result2.items[0]!.data.from).toBe("b") + }) + + test("getSource returns new source after replace", () => { + const sourceA = createStubSource("test") + const session = new UserSession([sourceA]) + + const sourceB = createStubSource("test") + session.replaceSource("test", sourceB) + + expect(session.getSource("test")).toBe(sourceB) + expect(session.getSource("test")).not.toBe(sourceA) + }) + + test("throws when replacing a source that is not registered", () => { + const session = new UserSession([createStubSource("test")]) + + expect(() => session.replaceSource("nonexistent", createStubSource("other"))).toThrow( + 'Cannot replace source "nonexistent": not registered', + ) + }) + + test("other sources are unaffected by replace", async () => { + const sourceA = createStubSource("source-a", [ + { + id: "a-1", + sourceId: "source-a", + type: "test", + timestamp: new Date(), + data: { from: "a" }, + }, + ]) + const sourceB = createStubSource("source-b", [ + { + id: "b-1", + sourceId: "source-b", + type: "test", + timestamp: new Date(), + data: { from: "b" }, + }, + ]) + const session = new UserSession([sourceA, sourceB]) + + const replacement = createStubSource("source-a", [ + { + id: "a-2", + sourceId: "source-a", + type: "test", + timestamp: new Date(), + data: { from: "a-new" }, + }, + ]) + session.replaceSource("source-a", replacement) + + const result = await session.feed() + expect(result.items).toHaveLength(2) + + const ids = result.items.map((i) => i.id).sort() + expect(ids).toEqual(["a-2", "b-1"]) + }) + + test("invalidates enhancement cache on replace", async () => { + const items: FeedItem[] = [ + { + id: "item-1", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 1 }, + }, + ] + let enhanceCount = 0 + const enhancer = async (feedItems: FeedItem[]) => { + enhanceCount++ + return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) + } + + const session = new UserSession([createStubSource("test", items)], enhancer) + + await session.feed() + expect(enhanceCount).toBe(1) + + const newItems: FeedItem[] = [ + { + id: "item-2", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 2 }, + }, + ] + session.replaceSource("test", createStubSource("test", newItems)) + + const result = await session.feed() + expect(enhanceCount).toBe(2) + expect(result.items[0]!.id).toBe("item-2") + expect(result.items[0]!.data.enhanced).toBe(true) + }) +}) + +describe("UserSession.removeSource", () => { + test("removes source from engine and sources map", () => { + const session = new UserSession([createStubSource("test-a"), createStubSource("test-b")]) + + session.removeSource("test-a") + + expect(session.getSource("test-a")).toBeUndefined() + expect(session.getSource("test-b")).toBeDefined() + }) + + test("invalidates feed cache on remove", async () => { + const items: FeedItem[] = [ + { + id: "item-1", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: {}, + }, + ] + const session = new UserSession([createStubSource("test", items)]) + + const result1 = await session.feed() + expect(result1.items).toHaveLength(1) + + session.removeSource("test") + + const result2 = await session.feed() + expect(result2.items).toHaveLength(0) + }) + + test("is a no-op for unknown source", () => { + const session = new UserSession([createStubSource("test")]) + + expect(() => session.removeSource("unknown")).not.toThrow() + expect(session.getSource("test")).toBeDefined() + }) +}) diff --git a/apps/aelis-backend/src/session/user-session.ts b/apps/aelis-backend/src/session/user-session.ts index 31633f5..3243876 100644 --- a/apps/aelis-backend/src/session/user-session.ts +++ b/apps/aelis-backend/src/session/user-session.ts @@ -67,6 +67,59 @@ export class UserSession { return this.sources.get(sourceId) as T | undefined } + /** + * Replaces a source in the engine and invalidates all caches. + * Stops and restarts the engine to re-establish reactive subscriptions. + */ + replaceSource(oldSourceId: string, newSource: FeedSource): void { + if (!this.sources.has(oldSourceId)) { + throw new Error(`Cannot replace source "${oldSourceId}": not registered`) + } + + const wasStarted = this.engine.isStarted() + + if (wasStarted) { + this.engine.stop() + } + + this.engine.unregister(oldSourceId) + this.sources.delete(oldSourceId) + + this.engine.register(newSource) + this.sources.set(newSource.id, newSource) + + this.invalidateEnhancement() + this.enhancingPromise = null + + if (wasStarted) { + this.engine.start() + } + } + + /** + * Removes a source from the engine and invalidates all caches. + * Stops and restarts the engine to clean up reactive subscriptions. + */ + removeSource(sourceId: string): void { + if (!this.sources.has(sourceId)) return + + const wasStarted = this.engine.isStarted() + + if (wasStarted) { + this.engine.stop() + } + + this.engine.unregister(sourceId) + this.sources.delete(sourceId) + + this.invalidateEnhancement() + this.enhancingPromise = null + + if (wasStarted) { + this.engine.start() + } + } + destroy(): void { this.unsubscribe?.() this.unsubscribe = null diff --git a/apps/aelis-backend/src/tfl/provider.ts b/apps/aelis-backend/src/tfl/provider.ts index d69ca96..286b491 100644 --- a/apps/aelis-backend/src/tfl/provider.ts +++ b/apps/aelis-backend/src/tfl/provider.ts @@ -16,6 +16,7 @@ const tflConfig = type({ }) export class TflSourceProvider implements FeedSourceProvider { + readonly sourceId = "aelis.tfl" private readonly db: Database private readonly apiKey: string | undefined private readonly client: ITflApi | undefined diff --git a/apps/aelis-backend/src/weather/provider.ts b/apps/aelis-backend/src/weather/provider.ts index 61d6468..6997a25 100644 --- a/apps/aelis-backend/src/weather/provider.ts +++ b/apps/aelis-backend/src/weather/provider.ts @@ -20,6 +20,7 @@ const weatherConfig = type({ }) export class WeatherSourceProvider implements FeedSourceProvider { + readonly sourceId = "aelis.weather" private readonly db: Database private readonly credentials: WeatherSourceOptions["credentials"] private readonly client: WeatherSourceOptions["client"] diff --git a/packages/aelis-core/src/feed-engine.test.ts b/packages/aelis-core/src/feed-engine.test.ts index 40fb85e..d2217bc 100644 --- a/packages/aelis-core/src/feed-engine.test.ts +++ b/packages/aelis-core/src/feed-engine.test.ts @@ -180,6 +180,31 @@ describe("FeedEngine", () => { expect(engine.refresh()).resolves.toBeDefined() }) + + test("register invalidates feed cache", async () => { + const location = createLocationSource() + const engine = new FeedEngine().register(location) + + await engine.refresh() + expect(engine.lastFeed()).not.toBeNull() + + engine.register(createWeatherSource()) + + expect(engine.lastFeed()).toBeNull() + }) + + test("unregister invalidates feed cache", async () => { + const location = createLocationSource() + const weather = createWeatherSource() + const engine = new FeedEngine().register(location).register(weather) + + await engine.refresh() + expect(engine.lastFeed()).not.toBeNull() + + engine.unregister("weather") + + expect(engine.lastFeed()).toBeNull() + }) }) describe("graph validation", () => { @@ -934,4 +959,54 @@ describe("FeedEngine", () => { engine.stop() }) }) + + describe("invalidateCache", () => { + test("clears cached result", async () => { + const location = createLocationSource() + const engine = new FeedEngine().register(location) + + await engine.refresh() + expect(engine.lastFeed()).not.toBeNull() + + engine.invalidateCache() + + expect(engine.lastFeed()).toBeNull() + }) + + test("is safe to call when no cache exists", () => { + const engine = new FeedEngine() + + expect(() => engine.invalidateCache()).not.toThrow() + expect(engine.lastFeed()).toBeNull() + }) + }) + + describe("isStarted", () => { + test("returns false before start", () => { + const engine = new FeedEngine() + + expect(engine.isStarted()).toBe(false) + }) + + test("returns true after start", () => { + const location = createLocationSource() + const engine = new FeedEngine().register(location) + + engine.start() + + expect(engine.isStarted()).toBe(true) + + engine.stop() + }) + + test("returns false after stop", () => { + const location = createLocationSource() + const engine = new FeedEngine().register(location) + + engine.start() + engine.stop() + + expect(engine.isStarted()).toBe(false) + }) + }) }) diff --git a/packages/aelis-core/src/feed-engine.ts b/packages/aelis-core/src/feed-engine.ts index f1baf6f..c93708b 100644 --- a/packages/aelis-core/src/feed-engine.ts +++ b/packages/aelis-core/src/feed-engine.ts @@ -97,23 +97,33 @@ export class FeedEngine { } /** - * Registers a FeedSource. Invalidates the cached graph. + * Registers a FeedSource. Invalidates the cached graph and feed cache. */ register(source: FeedSource): FeedEngine { this.sources.set(source.id, source) this.graph = null + this.invalidateCache() return this as FeedEngine } /** - * Unregisters a FeedSource by ID. Invalidates the cached graph. + * Unregisters a FeedSource by ID. Invalidates the cached graph and feed cache. */ unregister(sourceId: string): this { this.sources.delete(sourceId) this.graph = null + this.invalidateCache() return this } + /** + * Clears the cached feed result so the next access triggers a fresh refresh. + */ + invalidateCache(): void { + this.cachedResult = null + this.cachedAt = null + } + /** * Registers a post-processor. Processors run in registration order * after items are collected, on every update path. @@ -249,6 +259,13 @@ export class FeedEngine { this.cleanups = [] } + /** + * Returns whether the engine is currently running reactive subscriptions. + */ + isStarted(): boolean { + return this.started + } + /** * Returns the current accumulated context. */