From a6be7b31e7642ee4a57118cf07c8b38cc20aeed9 Mon Sep 17 00:00:00 2001 From: Kenneth Date: Sun, 22 Mar 2026 16:28:19 +0000 Subject: [PATCH] feat(session): query enabled sources before providers (#85) UserSessionManager now queries the user_sources table for enabled sources before calling any provider. Providers receive the per-user JSON config directly instead of querying the DB themselves, removing their db dependency and eliminating redundant round-trips. Co-authored-by: Ona --- apps/aelis-backend/src/admin/http.test.ts | 39 ++- apps/aelis-backend/src/admin/http.ts | 2 - apps/aelis-backend/src/engine/http.test.ts | 55 ++++- apps/aelis-backend/src/location/provider.ts | 17 +- apps/aelis-backend/src/server.ts | 4 +- .../src/session/feed-source-provider.ts | 2 +- .../src/session/user-session-manager.test.ts | 229 ++++++++++++++++-- .../src/session/user-session-manager.ts | 67 ++++- .../src/session/user-session.test.ts | 72 ------ .../aelis-backend/src/session/user-session.ts | 22 +- apps/aelis-backend/src/sources/errors.ts | 18 -- apps/aelis-backend/src/tfl/provider.ts | 22 +- apps/aelis-backend/src/weather/provider.ts | 19 +- 13 files changed, 368 insertions(+), 200 deletions(-) diff --git a/apps/aelis-backend/src/admin/http.test.ts b/apps/aelis-backend/src/admin/http.test.ts index 60ef7c8..ed73986 100644 --- a/apps/aelis-backend/src/admin/http.test.ts +++ b/apps/aelis-backend/src/admin/http.test.ts @@ -1,7 +1,7 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core" +import { describe, expect, mock, test } from "bun:test" import { Hono } from "hono" -import { describe, expect, test } from "bun:test" import type { AdminMiddleware } from "../auth/admin-middleware.ts" import type { AuthSession, AuthUser } from "../auth/session.ts" @@ -11,6 +11,39 @@ import type { FeedSourceProvider } from "../session/feed-source-provider.ts" import { UserSessionManager } from "../session/user-session-manager.ts" import { registerAdminHttpHandlers } from "./http.ts" +let mockEnabledSourceIds: string[] = [] + +mock.module("../sources/user-sources.ts", () => ({ + sources: (_db: Database, _userId: string) => ({ + async enabled() { + const now = new Date() + return mockEnabledSourceIds.map((sourceId) => ({ + id: crypto.randomUUID(), + userId: _userId, + sourceId, + enabled: true, + config: {}, + credentials: null, + createdAt: now, + updatedAt: now, + })) + }, + async find(sourceId: string) { + const now = new Date() + return { + id: crypto.randomUUID(), + userId: _userId, + sourceId, + enabled: true, + config: {}, + credentials: null, + createdAt: now, + updatedAt: now, + } + }, + }), +})) + function createStubSource(id: string): FeedSource { return { id, @@ -63,7 +96,8 @@ function passthroughAdminMiddleware(): AdminMiddleware { const fakeDb = {} as Database function createApp(providers: FeedSourceProvider[]) { - const sessionManager = new UserSessionManager({ providers }) + mockEnabledSourceIds = providers.map((p) => p.sourceId) + const sessionManager = new UserSessionManager({ db: fakeDb, providers }) const app = new Hono() registerAdminHttpHandlers(app, { sessionManager, @@ -158,5 +192,4 @@ describe("PUT /api/admin/:sourceId/config", () => { expect(provider!.sourceId).toBe("aelis.weather") expect(provider).not.toBe(originalProvider) }) - }) diff --git a/apps/aelis-backend/src/admin/http.ts b/apps/aelis-backend/src/admin/http.ts index 52921fc..4ad3e0e 100644 --- a/apps/aelis-backend/src/admin/http.ts +++ b/apps/aelis-backend/src/admin/http.ts @@ -51,7 +51,6 @@ async function handleUpdateProviderConfig(c: Context) { } const sessionManager = c.get("sessionManager") - const db = c.get("db") let body: unknown try { @@ -68,7 +67,6 @@ async function handleUpdateProviderConfig(c: Context) { } const updated = new WeatherSourceProvider({ - db, credentials: parsed.credentials, }) diff --git a/apps/aelis-backend/src/engine/http.test.ts b/apps/aelis-backend/src/engine/http.test.ts index 2880edf..14c4a39 100644 --- a/apps/aelis-backend/src/engine/http.test.ts +++ b/apps/aelis-backend/src/engine/http.test.ts @@ -1,9 +1,11 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core" import { contextKey } from "@aelis/core" -import { describe, expect, spyOn, test } from "bun:test" +import { describe, expect, mock, spyOn, test } from "bun:test" import { Hono } from "hono" +import type { Database } from "../db/index.ts" + import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts" import { UserSessionManager } from "../session/index.ts" import { registerFeedHttpHandlers } from "./http.ts" @@ -50,9 +52,45 @@ function buildTestApp(sessionManager: UserSessionManager, userId?: string) { return app } +let mockEnabledSourceIds: string[] = [] + +mock.module("../sources/user-sources.ts", () => ({ + sources: (_db: Database, _userId: string) => ({ + async enabled() { + const now = new Date() + return mockEnabledSourceIds.map((sourceId) => ({ + id: crypto.randomUUID(), + userId: _userId, + sourceId, + enabled: true, + config: {}, + credentials: null, + createdAt: now, + updatedAt: now, + })) + }, + async find(sourceId: string) { + const now = new Date() + return { + id: crypto.randomUUID(), + userId: _userId, + sourceId, + enabled: true, + config: {}, + credentials: null, + createdAt: now, + updatedAt: now, + } + }, + }), +})) + +const fakeDb = {} as Database + describe("GET /api/feed", () => { test("returns 401 without auth", async () => { - const manager = new UserSessionManager({ providers: [] }) + mockEnabledSourceIds = [] + const manager = new UserSessionManager({ db: fakeDb, providers: [] }) const app = buildTestApp(manager) const res = await app.request("/api/feed") @@ -71,7 +109,9 @@ describe("GET /api/feed", () => { data: { value: 42 }, }, ] + mockEnabledSourceIds = ["test"] const manager = new UserSessionManager({ + db: fakeDb, providers: [ { sourceId: "test", @@ -111,7 +151,9 @@ describe("GET /api/feed", () => { data: { fresh: true }, }, ] + mockEnabledSourceIds = ["test"] const manager = new UserSessionManager({ + db: fakeDb, providers: [ { sourceId: "test", @@ -150,7 +192,9 @@ describe("GET /api/feed", () => { throw new Error("connection timeout") }, } + mockEnabledSourceIds = ["failing"] const manager = new UserSessionManager({ + db: fakeDb, providers: [ { sourceId: "failing", @@ -173,7 +217,9 @@ describe("GET /api/feed", () => { }) test("returns 503 when all providers fail", async () => { + mockEnabledSourceIds = ["test"] const manager = new UserSessionManager({ + db: fakeDb, providers: [ { sourceId: "test", @@ -206,7 +252,9 @@ describe("GET /api/context", () => { const mockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn" async function buildContextApp(userId?: string) { + mockEnabledSourceIds = ["weather"] const manager = new UserSessionManager({ + db: fakeDb, providers: [ { sourceId: "weather", @@ -222,7 +270,8 @@ describe("GET /api/context", () => { } test("returns 401 without auth", async () => { - const manager = new UserSessionManager({ providers: [] }) + mockEnabledSourceIds = [] + const manager = new UserSessionManager({ db: fakeDb, providers: [] }) const app = buildTestApp(manager) const res = await app.request('/api/context?key=["aelis.weather","weather"]') diff --git a/apps/aelis-backend/src/location/provider.ts b/apps/aelis-backend/src/location/provider.ts index 958640b..7e3bd6d 100644 --- a/apps/aelis-backend/src/location/provider.ts +++ b/apps/aelis-backend/src/location/provider.ts @@ -1,26 +1,11 @@ import { LocationSource } from "@aelis/source-location" -import type { Database } from "../db/index.ts" import type { FeedSourceProvider } from "../session/feed-source-provider.ts" -import { SourceDisabledError } from "../sources/errors.ts" -import { sources } from "../sources/user-sources.ts" - export class LocationSourceProvider implements FeedSourceProvider { readonly sourceId = "aelis.location" - private readonly db: Database - - constructor(db: Database) { - this.db = db - } - - async feedSourceForUser(userId: string): Promise { - const row = await sources(this.db, userId).find("aelis.location") - - if (!row || !row.enabled) { - throw new SourceDisabledError("aelis.location", userId) - } + async feedSourceForUser(_userId: string, _config: unknown): Promise { return new LocationSource() } } diff --git a/apps/aelis-backend/src/server.ts b/apps/aelis-backend/src/server.ts index 4b4ca91..10c0cc5 100644 --- a/apps/aelis-backend/src/server.ts +++ b/apps/aelis-backend/src/server.ts @@ -32,10 +32,10 @@ function main() { } const sessionManager = new UserSessionManager({ + db, providers: [ - new LocationSourceProvider(db), + new LocationSourceProvider(), new WeatherSourceProvider({ - db, credentials: { privateKey: process.env.WEATHERKIT_PRIVATE_KEY!, keyId: process.env.WEATHERKIT_KEY_ID!, diff --git a/apps/aelis-backend/src/session/feed-source-provider.ts b/apps/aelis-backend/src/session/feed-source-provider.ts index 0430520..3072a18 100644 --- a/apps/aelis-backend/src/session/feed-source-provider.ts +++ b/apps/aelis-backend/src/session/feed-source-provider.ts @@ -3,5 +3,5 @@ import type { FeedSource } from "@aelis/core" export interface FeedSourceProvider { /** The source ID this provider is responsible for (e.g., "aelis.location"). */ readonly sourceId: string - feedSourceForUser(userId: string): Promise + feedSourceForUser(userId: string, config: unknown): Promise } diff --git a/apps/aelis-backend/src/session/user-session-manager.test.ts b/apps/aelis-backend/src/session/user-session-manager.test.ts index 1ff398b..cbe15b2 100644 --- a/apps/aelis-backend/src/session/user-session-manager.test.ts +++ b/apps/aelis-backend/src/session/user-session-manager.test.ts @@ -2,12 +2,77 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aeli import { LocationSource } from "@aelis/source-location" import { WeatherSource } from "@aelis/source-weatherkit" -import { describe, expect, mock, spyOn, test } from "bun:test" +import { beforeEach, describe, expect, mock, spyOn, test } from "bun:test" +import type { Database } from "../db/index.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts" import { UserSessionManager } from "./user-session-manager.ts" +/** + * Per-user enabled source IDs used by the mocked `sources` module. + * Tests configure this before calling getOrCreate. + * Key = userId (or "*" for a default), value = array of enabled sourceIds. + */ +const enabledByUser = new Map() + +/** Set which sourceIds are enabled for all users. */ +function setEnabledSources(sourceIds: string[]) { + enabledByUser.clear() + enabledByUser.set("*", sourceIds) +} + +/** Set which sourceIds are enabled for a specific user. */ +function setEnabledSourcesForUser(userId: string, sourceIds: string[]) { + enabledByUser.set(userId, sourceIds) +} + +function getEnabledSourceIds(userId: string): string[] { + return enabledByUser.get(userId) ?? enabledByUser.get("*") ?? [] +} + +/** + * Controls what `find()` returns in the mock. When `undefined` (the default), + * `find()` returns a standard enabled row. Set to a specific value (including + * `null`) to override the return value for all `find()` calls. + */ +let mockFindResult: unknown | undefined + +// Mock the sources module so UserSessionManager's DB query returns controlled data. +mock.module("../sources/user-sources.ts", () => ({ + sources: (_db: Database, userId: string) => ({ + async enabled() { + const now = new Date() + return getEnabledSourceIds(userId).map((sourceId) => ({ + id: crypto.randomUUID(), + userId, + sourceId, + enabled: true, + config: {}, + credentials: null, + createdAt: now, + updatedAt: now, + })) + }, + async find(sourceId: string) { + if (mockFindResult !== undefined) return mockFindResult + const now = new Date() + return { + id: crypto.randomUUID(), + userId, + sourceId, + enabled: true, + config: {}, + credentials: null, + createdAt: now, + updatedAt: now, + } + }, + }), +})) + +const fakeDb = {} as Database + function createStubSource(id: string, items: FeedItem[] = []): FeedSource { return { id, @@ -28,7 +93,8 @@ function createStubSource(id: string, items: FeedItem[] = []): FeedSource { function createStubProvider( sourceId: string, - factory: (userId: string) => Promise = async () => createStubSource(sourceId), + factory: (userId: string, config: Record) => Promise = async () => + createStubSource(sourceId), ): FeedSourceProvider { return { sourceId, feedSourceForUser: factory } } @@ -47,9 +113,15 @@ const weatherProvider: FeedSourceProvider = { }, } +beforeEach(() => { + enabledByUser.clear() + mockFindResult = undefined +}) + describe("UserSessionManager", () => { test("getOrCreate creates session on first call", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const session = await manager.getOrCreate("user-1") @@ -58,7 +130,8 @@ describe("UserSessionManager", () => { }) test("getOrCreate returns same session for same user", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const session1 = await manager.getOrCreate("user-1") const session2 = await manager.getOrCreate("user-1") @@ -67,7 +140,8 @@ describe("UserSessionManager", () => { }) test("getOrCreate returns different sessions for different users", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const session1 = await manager.getOrCreate("user-1") const session2 = await manager.getOrCreate("user-2") @@ -76,7 +150,8 @@ describe("UserSessionManager", () => { }) test("each user gets independent source instances", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const session1 = await manager.getOrCreate("user-1") const session2 = await manager.getOrCreate("user-2") @@ -88,7 +163,8 @@ describe("UserSessionManager", () => { }) test("remove destroys session and allows re-creation", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const session1 = await manager.getOrCreate("user-1") manager.remove("user-1") @@ -98,13 +174,16 @@ describe("UserSessionManager", () => { }) test("remove is no-op for unknown user", () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) expect(() => manager.remove("unknown")).not.toThrow() }) test("registers multiple providers", async () => { + setEnabledSources(["aelis.location", "aelis.weather"]) const manager = new UserSessionManager({ + db: fakeDb, providers: [locationProvider, weatherProvider], }) @@ -115,7 +194,8 @@ describe("UserSessionManager", () => { }) test("refresh returns feed result through session", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const session = await manager.getOrCreate("user-1") const result = await session.engine.refresh() @@ -127,7 +207,8 @@ describe("UserSessionManager", () => { }) test("location update via executeAction works", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const session = await manager.getOrCreate("user-1") await session.engine.executeAction("aelis.location", "update-location", { @@ -142,7 +223,8 @@ describe("UserSessionManager", () => { }) test("subscribe receives updates after location push", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const callback = mock() const session = await manager.getOrCreate("user-1") @@ -162,7 +244,8 @@ describe("UserSessionManager", () => { }) test("remove stops reactive updates", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const callback = mock() const session = await manager.getOrCreate("user-1") @@ -185,6 +268,7 @@ describe("UserSessionManager", () => { }) test("creates session with successful providers when some fail", async () => { + setEnabledSources(["aelis.location", "aelis.failing"]) const failingProvider: FeedSourceProvider = { sourceId: "aelis.failing", async feedSourceForUser() { @@ -193,6 +277,7 @@ describe("UserSessionManager", () => { } const manager = new UserSessionManager({ + db: fakeDb, providers: [locationProvider, failingProvider], }) @@ -208,7 +293,9 @@ describe("UserSessionManager", () => { }) test("throws AggregateError when all providers fail", async () => { + setEnabledSources(["aelis.fail-1", "aelis.fail-2"]) const manager = new UserSessionManager({ + db: fakeDb, providers: [ { sourceId: "aelis.fail-1", @@ -229,8 +316,10 @@ describe("UserSessionManager", () => { }) test("concurrent getOrCreate for same user returns same session", async () => { + setEnabledSources(["aelis.location"]) let callCount = 0 const manager = new UserSessionManager({ + db: fakeDb, providers: [ { sourceId: "aelis.location", @@ -253,12 +342,14 @@ describe("UserSessionManager", () => { }) test("remove during in-flight getOrCreate prevents session from being stored", async () => { + setEnabledSources(["aelis.location"]) let resolveProvider: () => void const providerGate = new Promise((r) => { resolveProvider = r }) const manager = new UserSessionManager({ + db: fakeDb, providers: [ { sourceId: "aelis.location", @@ -285,10 +376,67 @@ describe("UserSessionManager", () => { expect(freshSession).toBeDefined() expect(freshSession.engine).toBeDefined() }) + + test("only invokes providers for sources enabled for the user", async () => { + setEnabledSources(["aelis.location"]) + const locationFactory = mock(async () => createStubSource("aelis.location")) + const weatherFactory = mock(async () => createStubSource("aelis.weather")) + + const manager = new UserSessionManager({ + db: fakeDb, + providers: [ + { sourceId: "aelis.location", feedSourceForUser: locationFactory }, + { sourceId: "aelis.weather", feedSourceForUser: weatherFactory }, + ], + }) + + const session = await manager.getOrCreate("user-1") + + expect(locationFactory).toHaveBeenCalledTimes(1) + expect(weatherFactory).not.toHaveBeenCalled() + expect(session.getSource("aelis.location")).toBeDefined() + expect(session.getSource("aelis.weather")).toBeUndefined() + }) + + test("creates empty session when no sources are enabled", async () => { + setEnabledSources([]) + const factory = mock(async () => createStubSource("aelis.location")) + + const manager = new UserSessionManager({ + db: fakeDb, + providers: [{ sourceId: "aelis.location", feedSourceForUser: factory }], + }) + + const session = await manager.getOrCreate("user-1") + + expect(factory).not.toHaveBeenCalled() + expect(session).toBeDefined() + expect(session.getSource("aelis.location")).toBeUndefined() + }) + + test("per-user enabled sources are respected", async () => { + enabledByUser.clear() + setEnabledSourcesForUser("user-1", ["aelis.location"]) + setEnabledSourcesForUser("user-2", ["aelis.weather"]) + + const manager = new UserSessionManager({ + db: fakeDb, + providers: [createStubProvider("aelis.location"), createStubProvider("aelis.weather")], + }) + + const session1 = await manager.getOrCreate("user-1") + const session2 = await manager.getOrCreate("user-2") + + expect(session1.getSource("aelis.location")).toBeDefined() + expect(session1.getSource("aelis.weather")).toBeUndefined() + expect(session2.getSource("aelis.location")).toBeUndefined() + expect(session2.getSource("aelis.weather")).toBeDefined() + }) }) describe("UserSessionManager.replaceProvider", () => { test("replaces source in all active sessions", async () => { + setEnabledSources(["test"]) const itemsV1: FeedItem[] = [ { id: "v1", @@ -309,7 +457,7 @@ describe("UserSessionManager.replaceProvider", () => { ] const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1)) - const manager = new UserSessionManager({ providers: [providerV1] }) + const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] }) const session1 = await manager.getOrCreate("user-1") const session2 = await manager.getOrCreate("user-2") @@ -330,7 +478,8 @@ describe("UserSessionManager.replaceProvider", () => { }) test("throws for unknown provider sourceId", async () => { - const manager = new UserSessionManager({ providers: [locationProvider] }) + setEnabledSources(["aelis.location"]) + const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] }) const unknownProvider = createStubProvider("aelis.unknown") @@ -340,8 +489,9 @@ describe("UserSessionManager.replaceProvider", () => { }) test("keeps existing source when new provider fails for a user", async () => { + setEnabledSources(["test"]) const providerV1 = createStubProvider("test", async () => createStubSource("test")) - const manager = new UserSessionManager({ providers: [providerV1] }) + const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] }) const session = await manager.getOrCreate("user-1") expect(session.getSource("test")).toBeDefined() @@ -360,6 +510,7 @@ describe("UserSessionManager.replaceProvider", () => { }) test("new sessions use the replaced provider", async () => { + setEnabledSources(["test"]) const itemsV1: FeedItem[] = [ { id: "v1", @@ -380,7 +531,7 @@ describe("UserSessionManager.replaceProvider", () => { ] const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1)) - const manager = new UserSessionManager({ providers: [providerV1] }) + const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] }) const providerV2 = createStubProvider("test", async () => createStubSource("test", itemsV2)) await manager.replaceProvider(providerV2) @@ -392,6 +543,7 @@ describe("UserSessionManager.replaceProvider", () => { }) test("does not affect other providers' sources", async () => { + setEnabledSources(["source-a", "source-b"]) const providerA = createStubProvider("source-a", async () => createStubSource("source-a", [ { @@ -415,7 +567,7 @@ describe("UserSessionManager.replaceProvider", () => { ]), ) - const manager = new UserSessionManager({ providers: [providerA, providerB] }) + const manager = new UserSessionManager({ db: fakeDb, providers: [providerA, providerB] }) const session = await manager.getOrCreate("user-1") // Replace only source-a @@ -440,6 +592,7 @@ describe("UserSessionManager.replaceProvider", () => { }) test("updates sessions that are still being created", async () => { + setEnabledSources(["test"]) const itemsV1: FeedItem[] = [ { id: "v1", @@ -468,7 +621,7 @@ describe("UserSessionManager.replaceProvider", () => { await creationGate return createStubSource("test", itemsV1) }) - const manager = new UserSessionManager({ providers: [providerV1] }) + const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] }) // Start session creation but don't let it finish yet const sessionPromise = manager.getOrCreate("user-1") @@ -487,4 +640,44 @@ describe("UserSessionManager.replaceProvider", () => { const feed = await session.feed() expect(feed.items[0]!.data.version).toBe(2) }) + + test("skips source replacement when source was disabled between creation and replace", async () => { + setEnabledSources(["test"]) + const itemsV1: FeedItem[] = [ + { + id: "v1", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 1 }, + }, + ] + + const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1)) + const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] }) + + const session = await manager.getOrCreate("user-1") + const feedBefore = await session.feed() + expect(feedBefore.items[0]!.data.version).toBe(1) + + // Simulate the source being disabled/deleted between session creation and replace + mockFindResult = null + + const providerV2 = createStubProvider("test", async () => + createStubSource("test", [ + { + id: "v2", + sourceId: "test", + type: "test", + timestamp: new Date(), + data: { version: 2 }, + }, + ]), + ) + await manager.replaceProvider(providerV2) + + // Session should still have v1 — the replace was skipped + const feedAfter = await session.feed() + expect(feedAfter.items[0]!.data.version).toBe(1) + }) }) diff --git a/apps/aelis-backend/src/session/user-session-manager.ts b/apps/aelis-backend/src/session/user-session-manager.ts index ee4be3a..146cc31 100644 --- a/apps/aelis-backend/src/session/user-session-manager.ts +++ b/apps/aelis-backend/src/session/user-session-manager.ts @@ -1,11 +1,14 @@ import type { FeedSource } from "@aelis/core" +import type { Database } from "../db/index.ts" import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts" +import { sources } from "../sources/user-sources.ts" import { UserSession } from "./user-session.ts" export interface UserSessionManagerConfig { + db: Database providers: FeedSourceProvider[] feedEnhancer?: FeedEnhancer | null } @@ -13,10 +16,12 @@ export interface UserSessionManagerConfig { export class UserSessionManager { private sessions = new Map() private pending = new Map>() + private readonly db: Database private readonly providers = new Map() private readonly feedEnhancer: FeedEnhancer | null constructor(config: UserSessionManagerConfig) { + this.db = config.db for (const provider of config.providers) { this.providers.set(provider.sourceId, provider) } @@ -64,8 +69,9 @@ export class UserSessionManager { /** * Replaces a provider and updates all active sessions. * The new provider must have the same sourceId as an existing one. - * For each active session, re-resolves the source via session.refreshSource. - * If the provider fails for a user, the existing source is kept. + * For each active session, queries the user's source config from the DB + * and re-resolves the source. If the provider fails for a user, the + * existing source is kept. */ async replaceProvider(provider: FeedSourceProvider): Promise { if (!this.providers.has(provider.sourceId)) { @@ -79,7 +85,7 @@ export class UserSessionManager { const updates: Promise[] = [] for (const [, session] of this.sessions) { - updates.push(session.refreshSource(provider)) + updates.push(this.refreshSessionSource(session, provider)) } // Also update sessions that are currently being created so they @@ -87,7 +93,7 @@ export class UserSessionManager { for (const [, pendingPromise] of this.pending) { updates.push( pendingPromise - .then((session) => session.refreshSource(provider)) + .then((session) => this.refreshSessionSource(session, provider)) .catch(() => { // Session creation itself failed — nothing to update. }), @@ -97,23 +103,60 @@ export class UserSessionManager { await Promise.all(updates) } - private async createSession(userId: string): Promise { - const results = await Promise.allSettled( - Array.from(this.providers.values()).map((p) => p.feedSourceForUser(userId)), - ) + /** + * Re-resolves a single source for a session by querying the user's config + * from the DB and calling the provider. If the provider fails, the existing + * source is kept. + */ + private async refreshSessionSource( + session: UserSession, + provider: FeedSourceProvider, + ): Promise { + if (!session.hasSource(provider.sourceId)) return - const sources: FeedSource[] = [] + try { + const row = await sources(this.db, session.userId).find(provider.sourceId) + if (!row?.enabled) return + + const newSource = await provider.feedSourceForUser(session.userId, row.config ?? {}) + session.replaceSource(provider.sourceId, newSource) + } catch (err) { + console.error( + `[UserSessionManager] refreshSource("${provider.sourceId}") failed for user ${session.userId}:`, + err, + ) + } + } + + private async createSession(userId: string): Promise { + const enabledRows = await sources(this.db, userId).enabled() + + const promises: Promise[] = [] + for (const row of enabledRows) { + const provider = this.providers.get(row.sourceId) + if (provider) { + promises.push(provider.feedSourceForUser(userId, row.config ?? {})) + } + } + + if (promises.length === 0) { + return new UserSession(userId, [], this.feedEnhancer) + } + + const results = await Promise.allSettled(promises) + + const feedSources: FeedSource[] = [] const errors: unknown[] = [] for (const result of results) { if (result.status === "fulfilled") { - sources.push(result.value) + feedSources.push(result.value) } else { errors.push(result.reason) } } - if (sources.length === 0 && errors.length > 0) { + if (feedSources.length === 0 && errors.length > 0) { throw new AggregateError(errors, "All feed source providers failed") } @@ -121,6 +164,6 @@ export class UserSessionManager { console.error("[UserSessionManager] Feed source provider failed:", error) } - return new UserSession(userId, sources, this.feedEnhancer) + return new UserSession(userId, feedSources, this.feedEnhancer) } } diff --git a/apps/aelis-backend/src/session/user-session.test.ts b/apps/aelis-backend/src/session/user-session.test.ts index 25274d9..2ec8967 100644 --- a/apps/aelis-backend/src/session/user-session.test.ts +++ b/apps/aelis-backend/src/session/user-session.test.ts @@ -3,8 +3,6 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aeli import { LocationSource } from "@aelis/source-location" import { describe, expect, spyOn, test } from "bun:test" -import type { FeedSourceProvider } from "./feed-source-provider.ts" - import { UserSession } from "./user-session.ts" function createStubSource(id: string, items: FeedItem[] = []): FeedSource { @@ -394,73 +392,3 @@ describe("UserSession.removeSource", () => { expect(session.getSource("test")).toBeDefined() }) }) - -describe("UserSession.refreshSource", () => { - test("replaces existing source via provider", async () => { - const itemsV1: FeedItem[] = [ - { - id: "v1", - sourceId: "test", - type: "test", - timestamp: new Date(), - data: { version: 1 }, - }, - ] - const itemsV2: FeedItem[] = [ - { - id: "v2", - sourceId: "test", - type: "test", - timestamp: new Date(), - data: { version: 2 }, - }, - ] - - const session = new UserSession("test-user", [createStubSource("test", itemsV1)]) - - const provider: FeedSourceProvider = { - sourceId: "test", - async feedSourceForUser() { - return createStubSource("test", itemsV2) - }, - } - - await session.refreshSource(provider) - - const result = await session.feed() - expect(result.items[0]!.data.version).toBe(2) - }) - - test("throws when source is not registered", async () => { - const session = new UserSession("test-user", [createStubSource("existing")]) - - const provider: FeedSourceProvider = { - sourceId: "new-source", - async feedSourceForUser() { - return createStubSource("new-source") - }, - } - - await expect(session.refreshSource(provider)).rejects.toThrow() - }) - - test("keeps existing source when provider fails", async () => { - const session = new UserSession("test-user", [createStubSource("test")]) - - const spy = spyOn(console, "error").mockImplementation(() => {}) - - const provider: FeedSourceProvider = { - sourceId: "test", - async feedSourceForUser() { - throw new Error("source disabled") - }, - } - - await session.refreshSource(provider) - - expect(session.getSource("test")).toBeDefined() - expect(spy).toHaveBeenCalled() - - spy.mockRestore() - }) -}) diff --git a/apps/aelis-backend/src/session/user-session.ts b/apps/aelis-backend/src/session/user-session.ts index a09a6ee..6f35a12 100644 --- a/apps/aelis-backend/src/session/user-session.ts +++ b/apps/aelis-backend/src/session/user-session.ts @@ -1,7 +1,6 @@ import { FeedEngine, type FeedItem, type FeedResult, type FeedSource } from "@aelis/core" import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" -import type { FeedSourceProvider } from "./feed-source-provider.ts" export class UserSession { readonly userId: string @@ -70,25 +69,8 @@ export class UserSession { return this.sources.get(sourceId) as T | undefined } - /** - * Re-resolves a source from its provider using this session's userId. - * The source must already be registered. Throws if it isn't. - * If the provider fails, the existing source is kept. - */ - async refreshSource(provider: FeedSourceProvider): Promise { - if (!this.sources.has(provider.sourceId)) { - throw new Error(`Cannot refresh source "${provider.sourceId}": not registered`) - } - - try { - const newSource = await provider.feedSourceForUser(this.userId) - this.replaceSource(provider.sourceId, newSource) - } catch (err) { - console.error( - `[UserSession] refreshSource("${provider.sourceId}") failed for user ${this.userId}:`, - err, - ) - } + hasSource(sourceId: string): boolean { + return this.sources.has(sourceId) } /** diff --git a/apps/aelis-backend/src/sources/errors.ts b/apps/aelis-backend/src/sources/errors.ts index 9ceab77..8ee845f 100644 --- a/apps/aelis-backend/src/sources/errors.ts +++ b/apps/aelis-backend/src/sources/errors.ts @@ -1,21 +1,3 @@ -/** - * Thrown by a FeedSourceProvider when the source is not enabled for a user. - * - * UserSessionManager's Promise.allSettled handles this gracefully — - * the source is excluded from the session without crashing. - */ -export class SourceDisabledError extends Error { - readonly sourceId: string - readonly userId: string - - constructor(sourceId: string, userId: string) { - super(`Source "${sourceId}" is not enabled for user "${userId}"`) - this.name = "SourceDisabledError" - this.sourceId = sourceId - this.userId = userId - } -} - /** * Thrown when an operation targets a user source that doesn't exist. */ diff --git a/apps/aelis-backend/src/tfl/provider.ts b/apps/aelis-backend/src/tfl/provider.ts index 286b491..039cba8 100644 --- a/apps/aelis-backend/src/tfl/provider.ts +++ b/apps/aelis-backend/src/tfl/provider.ts @@ -1,15 +1,11 @@ import { TflSource, type ITflApi, type TflLineId } from "@aelis/source-tfl" import { type } from "arktype" -import type { Database } from "../db/index.ts" import type { FeedSourceProvider } from "../session/feed-source-provider.ts" -import { SourceDisabledError } from "../sources/errors.ts" -import { sources } from "../sources/user-sources.ts" - export type TflSourceProviderOptions = - | { db: Database; apiKey: string; client?: never } - | { db: Database; apiKey?: never; client: ITflApi } + | { apiKey: string; client?: never } + | { apiKey?: never; client: ITflApi } const tflConfig = type({ "lines?": "string[]", @@ -17,26 +13,18 @@ const tflConfig = type({ export class TflSourceProvider implements FeedSourceProvider { readonly sourceId = "aelis.tfl" - private readonly db: Database private readonly apiKey: string | undefined private readonly client: ITflApi | undefined constructor(options: TflSourceProviderOptions) { - this.db = options.db this.apiKey = "apiKey" in options ? options.apiKey : undefined this.client = "client" in options ? options.client : undefined } - async feedSourceForUser(userId: string): Promise { - const row = await sources(this.db, userId).find("aelis.tfl") - - if (!row || !row.enabled) { - throw new SourceDisabledError("aelis.tfl", userId) - } - - const parsed = tflConfig(row.config ?? {}) + async feedSourceForUser(_userId: string, config: unknown): Promise { + const parsed = tflConfig(config) if (parsed instanceof type.errors) { - throw new Error(`Invalid TFL config for user ${userId}: ${parsed.summary}`) + throw new Error(`Invalid TFL config: ${parsed.summary}`) } return new TflSource({ diff --git a/apps/aelis-backend/src/weather/provider.ts b/apps/aelis-backend/src/weather/provider.ts index 6997a25..ce29107 100644 --- a/apps/aelis-backend/src/weather/provider.ts +++ b/apps/aelis-backend/src/weather/provider.ts @@ -1,14 +1,9 @@ import { WeatherSource, type WeatherSourceOptions } from "@aelis/source-weatherkit" import { type } from "arktype" -import type { Database } from "../db/index.ts" import type { FeedSourceProvider } from "../session/feed-source-provider.ts" -import { SourceDisabledError } from "../sources/errors.ts" -import { sources } from "../sources/user-sources.ts" - export interface WeatherSourceProviderOptions { - db: Database credentials: WeatherSourceOptions["credentials"] client?: WeatherSourceOptions["client"] } @@ -21,26 +16,18 @@ const weatherConfig = type({ export class WeatherSourceProvider implements FeedSourceProvider { readonly sourceId = "aelis.weather" - private readonly db: Database private readonly credentials: WeatherSourceOptions["credentials"] private readonly client: WeatherSourceOptions["client"] constructor(options: WeatherSourceProviderOptions) { - this.db = options.db this.credentials = options.credentials this.client = options.client } - async feedSourceForUser(userId: string): Promise { - const row = await sources(this.db, userId).find("aelis.weather") - - if (!row || !row.enabled) { - throw new SourceDisabledError("aelis.weather", userId) - } - - const parsed = weatherConfig(row.config ?? {}) + async feedSourceForUser(_userId: string, config: unknown): Promise { + const parsed = weatherConfig(config) if (parsed instanceof type.errors) { - throw new Error(`Invalid weather config for user ${userId}: ${parsed.summary}`) + throw new Error(`Invalid weather config: ${parsed.summary}`) } return new WeatherSource({