diff --git a/apps/aelis-backend/src/session/user-session-manager.test.ts b/apps/aelis-backend/src/session/user-session-manager.test.ts index cd57a19..1ff398b 100644 --- a/apps/aelis-backend/src/session/user-session-manager.test.ts +++ b/apps/aelis-backend/src/session/user-session-manager.test.ts @@ -339,7 +339,7 @@ describe("UserSessionManager.replaceProvider", () => { ) }) - test("removes source from session when new provider fails for a user", async () => { + test("keeps existing source when new provider fails for a user", async () => { const providerV1 = createStubProvider("test", async () => createStubSource("test")) const manager = new UserSessionManager({ providers: [providerV1] }) @@ -353,7 +353,7 @@ describe("UserSessionManager.replaceProvider", () => { }) await manager.replaceProvider(failingProvider) - expect(session.getSource("test")).toBeUndefined() + expect(session.getSource("test")).toBeDefined() expect(spy).toHaveBeenCalled() spy.mockRestore() diff --git a/apps/aelis-backend/src/session/user-session-manager.ts b/apps/aelis-backend/src/session/user-session-manager.ts index e0d5612..ee4be3a 100644 --- a/apps/aelis-backend/src/session/user-session-manager.ts +++ b/apps/aelis-backend/src/session/user-session-manager.ts @@ -11,7 +11,7 @@ export interface UserSessionManagerConfig { } export class UserSessionManager { - private sessions = new Map() + private sessions = new Map() private pending = new Map>() private readonly providers = new Map() private readonly feedEnhancer: FeedEnhancer | null @@ -29,7 +29,7 @@ export class UserSessionManager { async getOrCreate(userId: string): Promise { const existing = this.sessions.get(userId) - if (existing) return existing.session + if (existing) return existing const inflight = this.pending.get(userId) if (inflight) return inflight @@ -44,7 +44,7 @@ export class UserSessionManager { session.destroy() throw new Error(`Session for user ${userId} was removed during creation`) } - this.sessions.set(userId, { userId, session }) + this.sessions.set(userId, session) return session } finally { this.pending.delete(userId) @@ -52,9 +52,9 @@ export class UserSessionManager { } remove(userId: string): void { - const entry = this.sessions.get(userId) - if (entry) { - entry.session.destroy() + const session = this.sessions.get(userId) + if (session) { + session.destroy() this.sessions.delete(userId) } // Cancel any in-flight creation so getOrCreate won't store the session @@ -64,8 +64,8 @@ export class UserSessionManager { /** * Replaces a provider and updates all active sessions. * The new provider must have the same sourceId as an existing one. - * For each active session, resolves a new source from the provider. - * If the provider fails for a user, the old source is removed from that session. + * For each active session, re-resolves the source via session.refreshSource. + * If the provider fails for a user, the existing source is kept. */ async replaceProvider(provider: FeedSourceProvider): Promise { if (!this.providers.has(provider.sourceId)) { @@ -78,16 +78,16 @@ export class UserSessionManager { const updates: Promise[] = [] - for (const [, { userId, session }] of this.sessions) { - updates.push(this.updateSessionSource(provider, userId, session)) + for (const [, session] of this.sessions) { + updates.push(session.refreshSource(provider)) } // Also update sessions that are currently being created so they // don't land in this.sessions with a stale source. - for (const [userId, pendingPromise] of this.pending) { + for (const [, pendingPromise] of this.pending) { updates.push( pendingPromise - .then((session) => this.updateSessionSource(provider, userId, session)) + .then((session) => session.refreshSource(provider)) .catch(() => { // Session creation itself failed — nothing to update. }), @@ -97,23 +97,6 @@ export class UserSessionManager { await Promise.all(updates) } - private async updateSessionSource( - provider: FeedSourceProvider, - userId: string, - session: UserSession, - ): Promise { - try { - const newSource = await provider.feedSourceForUser(userId) - session.replaceSource(provider.sourceId, newSource) - } catch (err) { - console.error( - `[UserSessionManager] replaceProvider("${provider.sourceId}") failed for user ${userId}:`, - err, - ) - session.removeSource(provider.sourceId) - } - } - private async createSession(userId: string): Promise { const results = await Promise.allSettled( Array.from(this.providers.values()).map((p) => p.feedSourceForUser(userId)), @@ -138,6 +121,6 @@ export class UserSessionManager { console.error("[UserSessionManager] Feed source provider failed:", error) } - return new UserSession(sources, this.feedEnhancer) + return new UserSession(userId, sources, this.feedEnhancer) } } diff --git a/apps/aelis-backend/src/session/user-session.test.ts b/apps/aelis-backend/src/session/user-session.test.ts index 9918c33..25274d9 100644 --- a/apps/aelis-backend/src/session/user-session.test.ts +++ b/apps/aelis-backend/src/session/user-session.test.ts @@ -1,7 +1,9 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core" import { LocationSource } from "@aelis/source-location" -import { describe, expect, test } from "bun:test" +import { describe, expect, spyOn, test } from "bun:test" + +import type { FeedSourceProvider } from "./feed-source-provider.ts" import { UserSession } from "./user-session.ts" @@ -25,7 +27,10 @@ function createStubSource(id: string, items: FeedItem[] = []): FeedSource { describe("UserSession", () => { test("registers sources and starts engine", async () => { - const session = new UserSession([createStubSource("test-a"), createStubSource("test-b")]) + const session = new UserSession("test-user", [ + createStubSource("test-a"), + createStubSource("test-b"), + ]) const result = await session.engine.refresh() @@ -34,7 +39,7 @@ describe("UserSession", () => { test("getSource returns registered source", () => { const location = new LocationSource() - const session = new UserSession([location]) + const session = new UserSession("test-user", [location]) const result = session.getSource("aelis.location") @@ -42,13 +47,13 @@ describe("UserSession", () => { }) test("getSource returns undefined for unknown source", () => { - const session = new UserSession([createStubSource("test")]) + const session = new UserSession("test-user", [createStubSource("test")]) expect(session.getSource("unknown")).toBeUndefined() }) test("destroy stops engine and clears sources", () => { - const session = new UserSession([createStubSource("test")]) + const session = new UserSession("test-user", [createStubSource("test")]) session.destroy() @@ -57,7 +62,7 @@ describe("UserSession", () => { test("engine.executeAction routes to correct source", async () => { const location = new LocationSource() - const session = new UserSession([location]) + const session = new UserSession("test-user", [location]) await session.engine.executeAction("aelis.location", "update-location", { lat: 51.5, @@ -82,7 +87,7 @@ describe("UserSession.feed", () => { data: { value: 42 }, }, ] - const session = new UserSession([createStubSource("test", items)]) + const session = new UserSession("test-user", [createStubSource("test", items)]) const result = await session.feed() @@ -103,7 +108,7 @@ describe("UserSession.feed", () => { const enhancer = async (feedItems: FeedItem[]) => feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) - const session = new UserSession([createStubSource("test", items)], enhancer) + const session = new UserSession("test-user", [createStubSource("test", items)], enhancer) const result = await session.feed() @@ -127,7 +132,7 @@ describe("UserSession.feed", () => { return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) } - const session = new UserSession([createStubSource("test", items)], enhancer) + const session = new UserSession("test-user", [createStubSource("test", items)], enhancer) const result1 = await session.feed() expect(result1.items[0]!.data.enhanced).toBe(true) @@ -162,7 +167,7 @@ describe("UserSession.feed", () => { })) } - const session = new UserSession([source], enhancer) + const session = new UserSession("test-user", [source], enhancer) // First feed triggers refresh + enhancement const result1 = await session.feed() @@ -205,7 +210,7 @@ describe("UserSession.feed", () => { throw new Error("enhancement exploded") } - const session = new UserSession([createStubSource("test", items)], enhancer) + const session = new UserSession("test-user", [createStubSource("test", items)], enhancer) const result = await session.feed() @@ -237,7 +242,7 @@ describe("UserSession.replaceSource", () => { ] const sourceA = createStubSource("test", itemsA) - const session = new UserSession([sourceA]) + const session = new UserSession("test-user", [sourceA]) const result1 = await session.feed() expect(result1.items).toHaveLength(1) @@ -253,7 +258,7 @@ describe("UserSession.replaceSource", () => { test("getSource returns new source after replace", () => { const sourceA = createStubSource("test") - const session = new UserSession([sourceA]) + const session = new UserSession("test-user", [sourceA]) const sourceB = createStubSource("test") session.replaceSource("test", sourceB) @@ -263,7 +268,7 @@ describe("UserSession.replaceSource", () => { }) test("throws when replacing a source that is not registered", () => { - const session = new UserSession([createStubSource("test")]) + const session = new UserSession("test-user", [createStubSource("test")]) expect(() => session.replaceSource("nonexistent", createStubSource("other"))).toThrow( 'Cannot replace source "nonexistent": not registered', @@ -289,7 +294,7 @@ describe("UserSession.replaceSource", () => { data: { from: "b" }, }, ]) - const session = new UserSession([sourceA, sourceB]) + const session = new UserSession("test-user", [sourceA, sourceB]) const replacement = createStubSource("source-a", [ { @@ -325,7 +330,7 @@ describe("UserSession.replaceSource", () => { return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) } - const session = new UserSession([createStubSource("test", items)], enhancer) + const session = new UserSession("test-user", [createStubSource("test", items)], enhancer) await session.feed() expect(enhanceCount).toBe(1) @@ -350,7 +355,10 @@ describe("UserSession.replaceSource", () => { describe("UserSession.removeSource", () => { test("removes source from engine and sources map", () => { - const session = new UserSession([createStubSource("test-a"), createStubSource("test-b")]) + const session = new UserSession("test-user", [ + createStubSource("test-a"), + createStubSource("test-b"), + ]) session.removeSource("test-a") @@ -368,7 +376,7 @@ describe("UserSession.removeSource", () => { data: {}, }, ] - const session = new UserSession([createStubSource("test", items)]) + const session = new UserSession("test-user", [createStubSource("test", items)]) const result1 = await session.feed() expect(result1.items).toHaveLength(1) @@ -380,9 +388,79 @@ describe("UserSession.removeSource", () => { }) test("is a no-op for unknown source", () => { - const session = new UserSession([createStubSource("test")]) + const session = new UserSession("test-user", [createStubSource("test")]) expect(() => session.removeSource("unknown")).not.toThrow() expect(session.getSource("test")).toBeDefined() }) }) + +describe("UserSession.refreshSource", () => { + test("replaces existing source via provider", async () => { + const itemsV1: FeedItem[] = [ + { + id: "v1", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 1 }, + }, + ] + const itemsV2: FeedItem[] = [ + { + id: "v2", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 2 }, + }, + ] + + const session = new UserSession("test-user", [createStubSource("test", itemsV1)]) + + const provider: FeedSourceProvider = { + sourceId: "test", + async feedSourceForUser() { + return createStubSource("test", itemsV2) + }, + } + + await session.refreshSource(provider) + + const result = await session.feed() + expect(result.items[0]!.data.version).toBe(2) + }) + + test("throws when source is not registered", async () => { + const session = new UserSession("test-user", [createStubSource("existing")]) + + const provider: FeedSourceProvider = { + sourceId: "new-source", + async feedSourceForUser() { + return createStubSource("new-source") + }, + } + + await expect(session.refreshSource(provider)).rejects.toThrow() + }) + + test("keeps existing source when provider fails", async () => { + const session = new UserSession("test-user", [createStubSource("test")]) + + const spy = spyOn(console, "error").mockImplementation(() => {}) + + const provider: FeedSourceProvider = { + sourceId: "test", + async feedSourceForUser() { + throw new Error("source disabled") + }, + } + + await session.refreshSource(provider) + + expect(session.getSource("test")).toBeDefined() + expect(spy).toHaveBeenCalled() + + spy.mockRestore() + }) +}) diff --git a/apps/aelis-backend/src/session/user-session.ts b/apps/aelis-backend/src/session/user-session.ts index 3243876..a09a6ee 100644 --- a/apps/aelis-backend/src/session/user-session.ts +++ b/apps/aelis-backend/src/session/user-session.ts @@ -1,8 +1,10 @@ import { FeedEngine, type FeedItem, type FeedResult, type FeedSource } from "@aelis/core" import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" +import type { FeedSourceProvider } from "./feed-source-provider.ts" export class UserSession { + readonly userId: string readonly engine: FeedEngine private sources = new Map() private readonly enhancer: FeedEnhancer | null @@ -12,7 +14,8 @@ export class UserSession { private enhancingPromise: Promise | null = null private unsubscribe: (() => void) | null = null - constructor(sources: FeedSource[], enhancer?: FeedEnhancer | null) { + constructor(userId: string, sources: FeedSource[], enhancer?: FeedEnhancer | null) { + this.userId = userId this.engine = new FeedEngine() this.enhancer = enhancer ?? null for (const source of sources) { @@ -67,6 +70,27 @@ export class UserSession { return this.sources.get(sourceId) as T | undefined } + /** + * Re-resolves a source from its provider using this session's userId. + * The source must already be registered. Throws if it isn't. + * If the provider fails, the existing source is kept. + */ + async refreshSource(provider: FeedSourceProvider): Promise { + if (!this.sources.has(provider.sourceId)) { + throw new Error(`Cannot refresh source "${provider.sourceId}": not registered`) + } + + try { + const newSource = await provider.feedSourceForUser(this.userId) + this.replaceSource(provider.sourceId, newSource) + } catch (err) { + console.error( + `[UserSession] refreshSource("${provider.sourceId}") failed for user ${this.userId}:`, + err, + ) + } + } + /** * Replaces a source in the engine and invalidates all caches. * Stops and restarts the engine to re-establish reactive subscriptions.