From 0b51b97f6c037f69efa5aa41abc9c7313dec09fb Mon Sep 17 00:00:00 2001 From: Kenneth Date: Sun, 15 Mar 2026 22:57:19 +0000 Subject: [PATCH] feat(backend): make FeedSourceProvider async (#77) * feat(backend): make FeedSourceProvider async Make feedSourceForUser and FeedSourceProviderFn return promises. Use Promise.allSettled to tolerate partial provider failures. Guard concurrent getOrCreate calls with in-flight promise dedup. Return 503 from HTTP handlers when session creation fails. Co-authored-by: Ona * fix(backend): handle remove() during in-flight session creation Cancel pending getOrCreate when remove() is called mid-flight. Destroy the resulting session to prevent it from leaking. Co-authored-by: Ona --------- Co-authored-by: Ona --- apps/aelis-backend/src/engine/http.test.ts | 61 ++++--- apps/aelis-backend/src/engine/http.ts | 21 ++- apps/aelis-backend/src/location/http.ts | 10 +- apps/aelis-backend/src/server.ts | 4 +- .../src/session/feed-source-provider.ts | 4 +- .../src/session/user-session-manager.test.ts | 165 ++++++++++++++---- .../src/session/user-session-manager.ts | 62 ++++++- apps/aelis-backend/src/tfl/provider.ts | 2 +- apps/aelis-backend/src/weather/provider.ts | 2 +- 9 files changed, 255 insertions(+), 76 deletions(-) diff --git a/apps/aelis-backend/src/engine/http.test.ts b/apps/aelis-backend/src/engine/http.test.ts index 21644ec..1e6c8ca 100644 --- a/apps/aelis-backend/src/engine/http.test.ts +++ b/apps/aelis-backend/src/engine/http.test.ts @@ -1,7 +1,7 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core" import { contextKey } from "@aelis/core" -import { describe, expect, test } from "bun:test" +import { describe, expect, spyOn, test } from "bun:test" import { Hono } from "hono" import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts" @@ -72,12 +72,12 @@ describe("GET /api/feed", () => { }, ] const manager = new UserSessionManager({ - providers: [() => createStubSource("test", items)], + providers: [async () => createStubSource("test", items)], }) const app = buildTestApp(manager, "user-1") // Prime the cache - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") await session.engine.refresh() expect(session.engine.lastFeed()).not.toBeNull() @@ -105,7 +105,7 @@ describe("GET /api/feed", () => { }, ] const manager = new UserSessionManager({ - providers: [() => createStubSource("test", items)], + providers: [async () => createStubSource("test", items)], }) const app = buildTestApp(manager, "user-1") @@ -136,7 +136,7 @@ describe("GET /api/feed", () => { throw new Error("connection timeout") }, } - const manager = new UserSessionManager({ providers: [() => failingSource] }) + const manager = new UserSessionManager({ providers: [async () => failingSource] }) const app = buildTestApp(manager, "user-1") const res = await app.request("/api/feed") @@ -148,6 +148,27 @@ describe("GET /api/feed", () => { expect(body.errors[0]!.sourceId).toBe("failing") expect(body.errors[0]!.error).toBe("connection timeout") }) + + test("returns 503 when all providers fail", async () => { + const manager = new UserSessionManager({ + providers: [ + async () => { + throw new Error("provider down") + }, + ], + }) + const app = buildTestApp(manager, "user-1") + + const spy = spyOn(console, "error").mockImplementation(() => {}) + + const res = await app.request("/api/feed") + + expect(res.status).toBe(503) + const body = (await res.json()) as { error: string } + expect(body.error).toBe("Service unavailable") + + spy.mockRestore() + }) }) describe("GET /api/context", () => { @@ -158,12 +179,12 @@ describe("GET /api/context", () => { // The mock auth middleware always injects this hardcoded user ID const mockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn" - function buildContextApp(userId?: string) { + async function buildContextApp(userId?: string) { const manager = new UserSessionManager({ - providers: [() => createStubSource("weather", [], contextEntries)], + providers: [async () => createStubSource("weather", [], contextEntries)], }) const app = buildTestApp(manager, userId) - const session = manager.getOrCreate(mockUserId) + const session = await manager.getOrCreate(mockUserId) return { app, session } } @@ -177,7 +198,7 @@ describe("GET /api/context", () => { }) test("returns 400 when key param is missing", async () => { - const { app } = buildContextApp("user-1") + const { app } = await buildContextApp("user-1") const res = await app.request("/api/context") @@ -187,7 +208,7 @@ describe("GET /api/context", () => { }) test("returns 400 when key is invalid JSON", async () => { - const { app } = buildContextApp("user-1") + const { app } = await buildContextApp("user-1") const res = await app.request("/api/context?key=notjson") @@ -197,7 +218,7 @@ describe("GET /api/context", () => { }) test("returns 400 when key is not an array", async () => { - const { app } = buildContextApp("user-1") + const { app } = await buildContextApp("user-1") const res = await app.request('/api/context?key="string"') @@ -207,7 +228,7 @@ describe("GET /api/context", () => { }) test("returns 400 when key contains invalid element types", async () => { - const { app } = buildContextApp("user-1") + const { app } = await buildContextApp("user-1") const res = await app.request("/api/context?key=[true,null,[1,2]]") @@ -217,7 +238,7 @@ describe("GET /api/context", () => { }) test("returns 400 when key is an empty array", async () => { - const { app } = buildContextApp("user-1") + const { app } = await buildContextApp("user-1") const res = await app.request("/api/context?key=[]") @@ -227,7 +248,7 @@ describe("GET /api/context", () => { }) test("returns 400 when match param is invalid", async () => { - const { app } = buildContextApp("user-1") + const { app } = await buildContextApp("user-1") const res = await app.request('/api/context?key=["aelis.weather"]&match=invalid') @@ -237,7 +258,7 @@ describe("GET /api/context", () => { }) test("returns exact match with match=exact", async () => { - const { app, session } = buildContextApp("user-1") + const { app, session } = await buildContextApp("user-1") await session.engine.refresh() const res = await app.request('/api/context?key=["aelis.weather","weather"]&match=exact') @@ -249,7 +270,7 @@ describe("GET /api/context", () => { }) test("returns 404 with match=exact when only prefix would match", async () => { - const { app, session } = buildContextApp("user-1") + const { app, session } = await buildContextApp("user-1") await session.engine.refresh() const res = await app.request('/api/context?key=["aelis.weather"]&match=exact') @@ -258,7 +279,7 @@ describe("GET /api/context", () => { }) test("returns prefix match with match=prefix", async () => { - const { app, session } = buildContextApp("user-1") + const { app, session } = await buildContextApp("user-1") await session.engine.refresh() const res = await app.request('/api/context?key=["aelis.weather"]&match=prefix') @@ -275,7 +296,7 @@ describe("GET /api/context", () => { }) test("default mode returns exact match when available", async () => { - const { app, session } = buildContextApp("user-1") + const { app, session } = await buildContextApp("user-1") await session.engine.refresh() const res = await app.request('/api/context?key=["aelis.weather","weather"]') @@ -287,7 +308,7 @@ describe("GET /api/context", () => { }) test("default mode falls back to prefix when no exact match", async () => { - const { app, session } = buildContextApp("user-1") + const { app, session } = await buildContextApp("user-1") await session.engine.refresh() const res = await app.request('/api/context?key=["aelis.weather"]') @@ -303,7 +324,7 @@ describe("GET /api/context", () => { }) test("returns 404 when neither exact nor prefix matches", async () => { - const { app, session } = buildContextApp("user-1") + const { app, session } = await buildContextApp("user-1") await session.engine.refresh() const res = await app.request('/api/context?key=["nonexistent"]') diff --git a/apps/aelis-backend/src/engine/http.ts b/apps/aelis-backend/src/engine/http.ts index af1a851..06702c2 100644 --- a/apps/aelis-backend/src/engine/http.ts +++ b/apps/aelis-backend/src/engine/http.ts @@ -33,7 +33,14 @@ export function registerFeedHttpHandlers( async function handleGetFeed(c: Context) { const user = c.get("user")! const sessionManager = c.get("sessionManager") - const session = sessionManager.getOrCreate(user.id) + + let session + try { + session = await sessionManager.getOrCreate(user.id) + } catch (err) { + console.error("[handleGetFeed] Failed to create session:", err) + return c.json({ error: "Service unavailable" }, 503) + } const feed = await session.feed() @@ -46,7 +53,7 @@ async function handleGetFeed(c: Context) { }) } -function handleGetContext(c: Context) { +async function handleGetContext(c: Context) { const keyParam = c.req.query("key") if (!keyParam) { return c.json({ error: 'Invalid or missing "key" parameter: must be a JSON array' }, 400) @@ -70,7 +77,15 @@ function handleGetContext(c: Context) { const user = c.get("user")! const sessionManager = c.get("sessionManager") - const session = sessionManager.getOrCreate(user.id) + + let session + try { + session = await sessionManager.getOrCreate(user.id) + } catch (err) { + console.error("[handleGetContext] Failed to create session:", err) + return c.json({ error: "Service unavailable" }, 503) + } + const context = session.engine.currentContext() const key = contextKey(...parsed) diff --git a/apps/aelis-backend/src/location/http.ts b/apps/aelis-backend/src/location/http.ts index 0bf1c2d..2f37dfc 100644 --- a/apps/aelis-backend/src/location/http.ts +++ b/apps/aelis-backend/src/location/http.ts @@ -44,7 +44,15 @@ async function handleUpdateLocation(c: Context) { const user = c.get("user")! const sessionManager = c.get("sessionManager") - const session = sessionManager.getOrCreate(user.id) + + let session + try { + session = await sessionManager.getOrCreate(user.id) + } catch (err) { + console.error("[handleUpdateLocation] Failed to create session:", err) + return c.json({ error: "Service unavailable" }, 503) + } + await session.engine.executeAction("aelis.location", "update-location", { lat: result.lat, lng: result.lng, diff --git a/apps/aelis-backend/src/server.ts b/apps/aelis-backend/src/server.ts index 8cf89ab..ca03828 100644 --- a/apps/aelis-backend/src/server.ts +++ b/apps/aelis-backend/src/server.ts @@ -3,9 +3,9 @@ import { Hono } from "hono" import { registerAuthHandlers } from "./auth/http.ts" import { mockAuthSessionMiddleware, requireSession } from "./auth/session-middleware.ts" +import { registerFeedHttpHandlers } from "./engine/http.ts" import { createFeedEnhancer } from "./enhancement/enhance-feed.ts" import { createLlmClient } from "./enhancement/llm-client.ts" -import { registerFeedHttpHandlers } from "./engine/http.ts" import { registerLocationHttpHandlers } from "./location/http.ts" import { UserSessionManager } from "./session/index.ts" import { WeatherSourceProvider } from "./weather/provider.ts" @@ -26,7 +26,7 @@ function main() { const sessionManager = new UserSessionManager({ providers: [ - () => new LocationSource(), + async () => new LocationSource(), new WeatherSourceProvider({ credentials: { privateKey: process.env.WEATHERKIT_PRIVATE_KEY!, diff --git a/apps/aelis-backend/src/session/feed-source-provider.ts b/apps/aelis-backend/src/session/feed-source-provider.ts index b1151ab..45aa5b3 100644 --- a/apps/aelis-backend/src/session/feed-source-provider.ts +++ b/apps/aelis-backend/src/session/feed-source-provider.ts @@ -1,9 +1,9 @@ import type { FeedSource } from "@aelis/core" export interface FeedSourceProvider { - feedSourceForUser(userId: string): FeedSource + feedSourceForUser(userId: string): Promise } -export type FeedSourceProviderFn = (userId: string) => FeedSource +export type FeedSourceProviderFn = (userId: string) => Promise export type FeedSourceProviderInput = FeedSourceProvider | FeedSourceProviderFn diff --git a/apps/aelis-backend/src/session/user-session-manager.test.ts b/apps/aelis-backend/src/session/user-session-manager.test.ts index de3be83..ca9078e 100644 --- a/apps/aelis-backend/src/session/user-session-manager.test.ts +++ b/apps/aelis-backend/src/session/user-session-manager.test.ts @@ -1,7 +1,7 @@ import type { WeatherKitClient, WeatherKitResponse } from "@aelis/source-weatherkit" import { LocationSource } from "@aelis/source-location" -import { describe, expect, mock, test } from "bun:test" +import { describe, expect, mock, spyOn, test } from "bun:test" import { WeatherSourceProvider } from "../weather/provider.ts" import { UserSessionManager } from "./user-session-manager.ts" @@ -11,38 +11,38 @@ const mockWeatherClient: WeatherKitClient = { } describe("UserSessionManager", () => { - test("getOrCreate creates session on first call", () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + test("getOrCreate creates session on first call", async () => { + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") expect(session).toBeDefined() expect(session.engine).toBeDefined() }) - test("getOrCreate returns same session for same user", () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + test("getOrCreate returns same session for same user", async () => { + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - const session1 = manager.getOrCreate("user-1") - const session2 = manager.getOrCreate("user-1") + const session1 = await manager.getOrCreate("user-1") + const session2 = await manager.getOrCreate("user-1") expect(session1).toBe(session2) }) - test("getOrCreate returns different sessions for different users", () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + test("getOrCreate returns different sessions for different users", async () => { + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - const session1 = manager.getOrCreate("user-1") - const session2 = manager.getOrCreate("user-2") + const session1 = await manager.getOrCreate("user-1") + const session2 = await manager.getOrCreate("user-2") expect(session1).not.toBe(session2) }) - test("each user gets independent source instances", () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + test("each user gets independent source instances", async () => { + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - const session1 = manager.getOrCreate("user-1") - const session2 = manager.getOrCreate("user-2") + const session1 = await manager.getOrCreate("user-1") + const session2 = await manager.getOrCreate("user-2") const source1 = session1.getSource("aelis.location") const source2 = session2.getSource("aelis.location") @@ -50,58 +50,58 @@ describe("UserSessionManager", () => { expect(source1).not.toBe(source2) }) - test("remove destroys session and allows re-creation", () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + test("remove destroys session and allows re-creation", async () => { + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - const session1 = manager.getOrCreate("user-1") + const session1 = await manager.getOrCreate("user-1") manager.remove("user-1") - const session2 = manager.getOrCreate("user-1") + const session2 = await manager.getOrCreate("user-1") expect(session1).not.toBe(session2) }) test("remove is no-op for unknown user", () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) expect(() => manager.remove("unknown")).not.toThrow() }) test("accepts function providers", async () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") const result = await session.engine.refresh() expect(result.errors).toHaveLength(0) }) - test("accepts object providers", () => { + test("accepts object providers", async () => { const provider = new WeatherSourceProvider({ client: mockWeatherClient }) const manager = new UserSessionManager({ - providers: [() => new LocationSource(), provider], + providers: [async () => new LocationSource(), provider], }) - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") expect(session.getSource("aelis.weather")).toBeDefined() }) - test("accepts mixed providers", () => { + test("accepts mixed providers", async () => { const provider = new WeatherSourceProvider({ client: mockWeatherClient }) const manager = new UserSessionManager({ - providers: [() => new LocationSource(), provider], + providers: [async () => new LocationSource(), provider], }) - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") expect(session.getSource("aelis.location")).toBeDefined() expect(session.getSource("aelis.weather")).toBeDefined() }) test("refresh returns feed result through session", async () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") const result = await session.engine.refresh() expect(result).toHaveProperty("context") @@ -111,9 +111,9 @@ describe("UserSessionManager", () => { }) test("location update via executeAction works", async () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") await session.engine.executeAction("aelis.location", "update-location", { lat: 51.5074, lng: -0.1278, @@ -126,10 +126,10 @@ describe("UserSessionManager", () => { }) test("subscribe receives updates after location push", async () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) const callback = mock() - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") session.engine.subscribe(callback) await session.engine.executeAction("aelis.location", "update-location", { @@ -146,16 +146,16 @@ describe("UserSessionManager", () => { }) test("remove stops reactive updates", async () => { - const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) + const manager = new UserSessionManager({ providers: [async () => new LocationSource()] }) const callback = mock() - const session = manager.getOrCreate("user-1") + const session = await manager.getOrCreate("user-1") session.engine.subscribe(callback) manager.remove("user-1") // Create new session and push location — old callback should not fire - const session2 = manager.getOrCreate("user-1") + const session2 = await manager.getOrCreate("user-1") await session2.engine.executeAction("aelis.location", "update-location", { lat: 51.5074, lng: -0.1278, @@ -167,4 +167,93 @@ describe("UserSessionManager", () => { expect(callback).not.toHaveBeenCalled() }) + + test("creates session with successful providers when some fail", async () => { + const manager = new UserSessionManager({ + providers: [ + async () => new LocationSource(), + async () => { + throw new Error("provider failed") + }, + ], + }) + + const spy = spyOn(console, "error").mockImplementation(() => {}) + + const session = await manager.getOrCreate("user-1") + + expect(session).toBeDefined() + expect(session.getSource("aelis.location")).toBeDefined() + expect(spy).toHaveBeenCalled() + + spy.mockRestore() + }) + + test("throws AggregateError when all providers fail", async () => { + const manager = new UserSessionManager({ + providers: [ + async () => { + throw new Error("first failed") + }, + async () => { + throw new Error("second failed") + }, + ], + }) + + await expect(manager.getOrCreate("user-1")).rejects.toBeInstanceOf(AggregateError) + }) + + test("concurrent getOrCreate for same user returns same session", async () => { + let callCount = 0 + const manager = new UserSessionManager({ + providers: [ + async () => { + callCount++ + // Simulate async work to widen the race window + await new Promise((resolve) => setTimeout(resolve, 10)) + return new LocationSource() + }, + ], + }) + + const [session1, session2] = await Promise.all([ + manager.getOrCreate("user-1"), + manager.getOrCreate("user-1"), + ]) + + expect(session1).toBe(session2) + expect(callCount).toBe(1) + }) + + test("remove during in-flight getOrCreate prevents session from being stored", async () => { + let resolveProvider: () => void + const providerGate = new Promise((r) => { + resolveProvider = r + }) + + const manager = new UserSessionManager({ + providers: [ + async () => { + await providerGate + return new LocationSource() + }, + ], + }) + + const sessionPromise = manager.getOrCreate("user-1") + + // remove() while provider is still resolving + manager.remove("user-1") + + // Let the provider finish + resolveProvider!() + + await expect(sessionPromise).rejects.toThrow("removed during creation") + + // A fresh getOrCreate should produce a new session, not the cancelled one + const freshSession = await manager.getOrCreate("user-1") + expect(freshSession).toBeDefined() + expect(freshSession.engine).toBeDefined() + }) }) diff --git a/apps/aelis-backend/src/session/user-session-manager.ts b/apps/aelis-backend/src/session/user-session-manager.ts index 58dac4d..df82908 100644 --- a/apps/aelis-backend/src/session/user-session-manager.ts +++ b/apps/aelis-backend/src/session/user-session-manager.ts @@ -1,3 +1,5 @@ +import type { FeedSource } from "@aelis/core" + import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { FeedSourceProviderInput } from "./feed-source-provider.ts" @@ -10,6 +12,7 @@ export interface UserSessionManagerConfig { export class UserSessionManager { private sessions = new Map() + private pending = new Map>() private readonly providers: FeedSourceProviderInput[] private readonly feedEnhancer: FeedEnhancer | null @@ -18,16 +21,28 @@ export class UserSessionManager { this.feedEnhancer = config.feedEnhancer ?? null } - getOrCreate(userId: string): UserSession { - let session = this.sessions.get(userId) - if (!session) { - const sources = this.providers.map((p) => - typeof p === "function" ? p(userId) : p.feedSourceForUser(userId), - ) - session = new UserSession(sources, this.feedEnhancer) + async getOrCreate(userId: string): Promise { + const existing = this.sessions.get(userId) + if (existing) return existing + + const inflight = this.pending.get(userId) + if (inflight) return inflight + + const promise = this.createSession(userId) + this.pending.set(userId, promise) + try { + const session = await promise + // If remove() was called while we were awaiting, it clears the + // pending entry. Detect that and destroy the session immediately. + if (!this.pending.has(userId)) { + session.destroy() + throw new Error(`Session for user ${userId} was removed during creation`) + } this.sessions.set(userId, session) + return session + } finally { + this.pending.delete(userId) } - return session } remove(userId: string): void { @@ -36,5 +51,36 @@ export class UserSessionManager { session.destroy() this.sessions.delete(userId) } + // Cancel any in-flight creation so getOrCreate won't store the session + this.pending.delete(userId) + } + + private async createSession(userId: string): Promise { + const results = await Promise.allSettled( + this.providers.map((p) => + typeof p === "function" ? p(userId) : p.feedSourceForUser(userId), + ), + ) + + const sources: FeedSource[] = [] + const errors: unknown[] = [] + + for (const result of results) { + if (result.status === "fulfilled") { + sources.push(result.value) + } else { + errors.push(result.reason) + } + } + + if (sources.length === 0 && errors.length > 0) { + throw new AggregateError(errors, "All feed source providers failed") + } + + for (const error of errors) { + console.error("[UserSessionManager] Feed source provider failed:", error) + } + + return new UserSession(sources, this.feedEnhancer) } } diff --git a/apps/aelis-backend/src/tfl/provider.ts b/apps/aelis-backend/src/tfl/provider.ts index d810ec3..4e84ced 100644 --- a/apps/aelis-backend/src/tfl/provider.ts +++ b/apps/aelis-backend/src/tfl/provider.ts @@ -13,7 +13,7 @@ export class TflSourceProvider implements FeedSourceProvider { this.options = options } - feedSourceForUser(_userId: string): TflSource { + async feedSourceForUser(_userId: string): Promise { return new TflSource(this.options) } } diff --git a/apps/aelis-backend/src/weather/provider.ts b/apps/aelis-backend/src/weather/provider.ts index 8af3bc6..626ec87 100644 --- a/apps/aelis-backend/src/weather/provider.ts +++ b/apps/aelis-backend/src/weather/provider.ts @@ -9,7 +9,7 @@ export class WeatherSourceProvider implements FeedSourceProvider { this.options = options } - feedSourceForUser(_userId: string): WeatherSource { + async feedSourceForUser(_userId: string): Promise { return new WeatherSource(this.options) } }