From 2b1a50349c18bb2fb1dfea5aba74aeb639997f97 Mon Sep 17 00:00:00 2001 From: kenneth Date: Thu, 5 Mar 2026 01:48:24 +0000 Subject: [PATCH] refactor: move feed enhancement into UserSession Move enhancement logic from HTTP handler into UserSession so the transport layer has no knowledge of enhancement. UserSession.feed() handles refresh, enhancement, and caching in one place. - UserSession subscribes to engine updates and re-enhances eagerly - Enhancement cache tracks source identity to prevent stale results - UserSessionManager accepts config object with optional enhancer - HTTP handler simplified to just call session.feed() Co-authored-by: Ona --- apps/aris-backend/src/feed/http.test.ts | 73 +-------- apps/aris-backend/src/feed/http.ts | 20 +-- apps/aris-backend/src/server.ts | 28 ++-- .../src/session/user-session-manager.test.ts | 30 ++-- .../src/session/user-session-manager.ts | 14 +- .../src/session/user-session.test.ts | 144 +++++++++++++++++- apps/aris-backend/src/session/user-session.ts | 84 +++++++++- packages/aris-core/src/context.ts | 5 +- 8 files changed, 278 insertions(+), 120 deletions(-) diff --git a/apps/aris-backend/src/feed/http.test.ts b/apps/aris-backend/src/feed/http.test.ts index ab7b9b4..a7a5343 100644 --- a/apps/aris-backend/src/feed/http.test.ts +++ b/apps/aris-backend/src/feed/http.test.ts @@ -41,14 +41,13 @@ function buildTestApp(sessionManager: UserSessionManager, userId?: string) { registerFeedHttpHandlers(app, { sessionManager, authSessionMiddleware: mockAuthSessionMiddleware(userId), - feedEnhancer: null, }) return app } describe("GET /api/feed", () => { test("returns 401 without auth", async () => { - const manager = new UserSessionManager([]) + const manager = new UserSessionManager({ providers: [] }) const app = buildTestApp(manager) const res = await app.request("/api/feed") @@ -66,7 +65,9 @@ describe("GET /api/feed", () => { data: { value: 42 }, }, ] - const manager = new UserSessionManager([() => createStubSource("test", items)]) + const manager = new UserSessionManager({ + providers: [() => createStubSource("test", items)], + }) const app = buildTestApp(manager, "user-1") // Prime the cache @@ -96,7 +97,9 @@ describe("GET /api/feed", () => { data: { fresh: true }, }, ] - const manager = new UserSessionManager([() => createStubSource("test", items)]) + const manager = new UserSessionManager({ + providers: [() => createStubSource("test", items)], + }) const app = buildTestApp(manager, "user-1") // No prior refresh — lastFeed() returns null, handler should call refresh() @@ -110,66 +113,6 @@ describe("GET /api/feed", () => { expect(body.errors).toHaveLength(0) }) - test("returns enhanced items when feedEnhancer is provided", async () => { - const items: FeedItem[] = [ - { - id: "item-1", - type: "test", - timestamp: new Date("2025-01-01T00:00:00.000Z"), - data: { value: 42 }, - }, - ] - const manager = new UserSessionManager([() => createStubSource("test", items)]) - - const enhancer = async (feedItems: FeedItem[]) => - feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) - - const app = new Hono() - registerFeedHttpHandlers(app, { - sessionManager: manager, - authSessionMiddleware: mockAuthSessionMiddleware("user-1"), - feedEnhancer: enhancer, - }) - - const res = await app.request("/api/feed") - - expect(res.status).toBe(200) - const body = (await res.json()) as FeedResponse - expect(body.items).toHaveLength(1) - expect(body.items[0]!.data.enhanced).toBe(true) - }) - - test("falls back to raw items when feedEnhancer throws", async () => { - const items: FeedItem[] = [ - { - id: "item-1", - type: "test", - timestamp: new Date("2025-01-01T00:00:00.000Z"), - data: { value: 42 }, - }, - ] - const manager = new UserSessionManager([() => createStubSource("test", items)]) - - const enhancer = async () => { - throw new Error("enhancement exploded") - } - - const app = new Hono() - registerFeedHttpHandlers(app, { - sessionManager: manager, - authSessionMiddleware: mockAuthSessionMiddleware("user-1"), - feedEnhancer: enhancer, - }) - - const res = await app.request("/api/feed") - - expect(res.status).toBe(200) - const body = (await res.json()) as FeedResponse - expect(body.items).toHaveLength(1) - expect(body.items[0]!.id).toBe("item-1") - expect(body.items[0]!.data.value).toBe(42) - }) - test("serializes source errors as message strings", async () => { const failingSource: FeedSource = { id: "failing", @@ -186,7 +129,7 @@ describe("GET /api/feed", () => { throw new Error("connection timeout") }, } - const manager = new UserSessionManager([() => failingSource]) + const manager = new UserSessionManager({ providers: [() => failingSource] }) const app = buildTestApp(manager, "user-1") const res = await app.request("/api/feed") diff --git a/apps/aris-backend/src/feed/http.ts b/apps/aris-backend/src/feed/http.ts index 545104e..185926e 100644 --- a/apps/aris-backend/src/feed/http.ts +++ b/apps/aris-backend/src/feed/http.ts @@ -3,29 +3,25 @@ import type { Context, Hono } from "hono" import { createMiddleware } from "hono/factory" import type { AuthSessionMiddleware } from "../auth/session-middleware.ts" -import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { UserSessionManager } from "../session/index.ts" type Env = { Variables: { sessionManager: UserSessionManager - feedEnhancer: FeedEnhancer | null } } interface FeedHttpHandlersDeps { sessionManager: UserSessionManager authSessionMiddleware: AuthSessionMiddleware - feedEnhancer: FeedEnhancer | null } export function registerFeedHttpHandlers( app: Hono, - { sessionManager, authSessionMiddleware, feedEnhancer }: FeedHttpHandlersDeps, + { sessionManager, authSessionMiddleware }: FeedHttpHandlersDeps, ) { const inject = createMiddleware(async (c, next) => { c.set("sessionManager", sessionManager) - c.set("feedEnhancer", feedEnhancer) await next() }) @@ -37,20 +33,10 @@ async function handleGetFeed(c: Context) { const sessionManager = c.get("sessionManager") const session = sessionManager.getOrCreate(user.id) - const feed = session.engine.lastFeed() ?? (await session.engine.refresh()) - - let items = feed.items - const enhance = c.get("feedEnhancer") - if (enhance) { - try { - items = await enhance(feed.items) - } catch (err) { - console.error("[enhancement] Unexpected error, returning unenhanced feed:", err) - } - } + const feed = await session.feed() return c.json({ - items, + items: feed.items, errors: feed.errors.map((e) => ({ sourceId: e.sourceId, error: e.error.message, diff --git a/apps/aris-backend/src/server.ts b/apps/aris-backend/src/server.ts index a1af4b2..dbfc163 100644 --- a/apps/aris-backend/src/server.ts +++ b/apps/aris-backend/src/server.ts @@ -11,18 +11,6 @@ import { UserSessionManager } from "./session/index.ts" import { WeatherSourceProvider } from "./weather/provider.ts" function main() { - const sessionManager = new UserSessionManager([ - () => new LocationSource(), - new WeatherSourceProvider({ - credentials: { - privateKey: process.env.WEATHERKIT_PRIVATE_KEY!, - keyId: process.env.WEATHERKIT_KEY_ID!, - teamId: process.env.WEATHERKIT_TEAM_ID!, - serviceId: process.env.WEATHERKIT_SERVICE_ID!, - }, - }), - ]) - const openrouterApiKey = process.env.OPENROUTER_API_KEY const feedEnhancer = openrouterApiKey ? createFeedEnhancer({ @@ -36,6 +24,21 @@ function main() { console.warn("[enhancement] OPENROUTER_API_KEY not set — feed enhancement disabled") } + const sessionManager = new UserSessionManager({ + providers: [ + () => new LocationSource(), + new WeatherSourceProvider({ + credentials: { + privateKey: process.env.WEATHERKIT_PRIVATE_KEY!, + keyId: process.env.WEATHERKIT_KEY_ID!, + teamId: process.env.WEATHERKIT_TEAM_ID!, + serviceId: process.env.WEATHERKIT_SERVICE_ID!, + }, + }), + ], + feedEnhancer, + }) + const app = new Hono() app.get("/health", (c) => c.json({ status: "ok" })) @@ -44,7 +47,6 @@ function main() { registerFeedHttpHandlers(app, { sessionManager, authSessionMiddleware: requireSession, - feedEnhancer, }) registerLocationHttpHandlers(app, { sessionManager }) diff --git a/apps/aris-backend/src/session/user-session-manager.test.ts b/apps/aris-backend/src/session/user-session-manager.test.ts index 19a7ddb..3a751bf 100644 --- a/apps/aris-backend/src/session/user-session-manager.test.ts +++ b/apps/aris-backend/src/session/user-session-manager.test.ts @@ -12,7 +12,7 @@ const mockWeatherClient: WeatherKitClient = { describe("UserSessionManager", () => { test("getOrCreate creates session on first call", () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const session = manager.getOrCreate("user-1") @@ -21,7 +21,7 @@ describe("UserSessionManager", () => { }) test("getOrCreate returns same session for same user", () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const session1 = manager.getOrCreate("user-1") const session2 = manager.getOrCreate("user-1") @@ -30,7 +30,7 @@ describe("UserSessionManager", () => { }) test("getOrCreate returns different sessions for different users", () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const session1 = manager.getOrCreate("user-1") const session2 = manager.getOrCreate("user-2") @@ -39,7 +39,7 @@ describe("UserSessionManager", () => { }) test("each user gets independent source instances", () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const session1 = manager.getOrCreate("user-1") const session2 = manager.getOrCreate("user-2") @@ -51,7 +51,7 @@ describe("UserSessionManager", () => { }) test("remove destroys session and allows re-creation", () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const session1 = manager.getOrCreate("user-1") manager.remove("user-1") @@ -61,13 +61,13 @@ describe("UserSessionManager", () => { }) test("remove is no-op for unknown user", () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) expect(() => manager.remove("unknown")).not.toThrow() }) test("accepts function providers", async () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const session = manager.getOrCreate("user-1") const result = await session.engine.refresh() @@ -77,7 +77,9 @@ describe("UserSessionManager", () => { test("accepts object providers", () => { const provider = new WeatherSourceProvider({ client: mockWeatherClient }) - const manager = new UserSessionManager([() => new LocationSource(), provider]) + const manager = new UserSessionManager({ + providers: [() => new LocationSource(), provider], + }) const session = manager.getOrCreate("user-1") @@ -86,7 +88,9 @@ describe("UserSessionManager", () => { test("accepts mixed providers", () => { const provider = new WeatherSourceProvider({ client: mockWeatherClient }) - const manager = new UserSessionManager([() => new LocationSource(), provider]) + const manager = new UserSessionManager({ + providers: [() => new LocationSource(), provider], + }) const session = manager.getOrCreate("user-1") @@ -95,7 +99,7 @@ describe("UserSessionManager", () => { }) test("refresh returns feed result through session", async () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const session = manager.getOrCreate("user-1") const result = await session.engine.refresh() @@ -107,7 +111,7 @@ describe("UserSessionManager", () => { }) test("location update via executeAction works", async () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const session = manager.getOrCreate("user-1") await session.engine.executeAction("aris.location", "update-location", { @@ -122,7 +126,7 @@ describe("UserSessionManager", () => { }) test("subscribe receives updates after location push", async () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const callback = mock() const session = manager.getOrCreate("user-1") @@ -142,7 +146,7 @@ describe("UserSessionManager", () => { }) test("remove stops reactive updates", async () => { - const manager = new UserSessionManager([() => new LocationSource()]) + const manager = new UserSessionManager({ providers: [() => new LocationSource()] }) const callback = mock() const session = manager.getOrCreate("user-1") diff --git a/apps/aris-backend/src/session/user-session-manager.ts b/apps/aris-backend/src/session/user-session-manager.ts index 7c9251e..58dac4d 100644 --- a/apps/aris-backend/src/session/user-session-manager.ts +++ b/apps/aris-backend/src/session/user-session-manager.ts @@ -1,13 +1,21 @@ +import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { FeedSourceProviderInput } from "./feed-source-provider.ts" import { UserSession } from "./user-session.ts" +export interface UserSessionManagerConfig { + providers: FeedSourceProviderInput[] + feedEnhancer?: FeedEnhancer | null +} + export class UserSessionManager { private sessions = new Map() private readonly providers: FeedSourceProviderInput[] + private readonly feedEnhancer: FeedEnhancer | null - constructor(providers: FeedSourceProviderInput[]) { - this.providers = providers + constructor(config: UserSessionManagerConfig) { + this.providers = config.providers + this.feedEnhancer = config.feedEnhancer ?? null } getOrCreate(userId: string): UserSession { @@ -16,7 +24,7 @@ export class UserSessionManager { const sources = this.providers.map((p) => typeof p === "function" ? p(userId) : p.feedSourceForUser(userId), ) - session = new UserSession(sources) + session = new UserSession(sources, this.feedEnhancer) this.sessions.set(userId, session) } return session diff --git a/apps/aris-backend/src/session/user-session.test.ts b/apps/aris-backend/src/session/user-session.test.ts index 794221d..241ddb4 100644 --- a/apps/aris-backend/src/session/user-session.test.ts +++ b/apps/aris-backend/src/session/user-session.test.ts @@ -1,11 +1,11 @@ -import type { ActionDefinition, ContextEntry, FeedSource } from "@aris/core" +import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aris/core" import { LocationSource } from "@aris/source-location" import { describe, expect, test } from "bun:test" import { UserSession } from "./user-session.ts" -function createStubSource(id: string): FeedSource { +function createStubSource(id: string, items: FeedItem[] = []): FeedSource { return { id, async listActions(): Promise> { @@ -18,7 +18,7 @@ function createStubSource(id: string): FeedSource { return null }, async fetchItems() { - return [] + return items }, } } @@ -70,3 +70,141 @@ describe("UserSession", () => { expect(location.lastLocation!.lat).toBe(51.5) }) }) + +describe("UserSession.feed", () => { + test("returns feed items without enhancer", async () => { + const items: FeedItem[] = [ + { + id: "item-1", + type: "test", + timestamp: new Date("2025-01-01T00:00:00.000Z"), + data: { value: 42 }, + }, + ] + const session = new UserSession([createStubSource("test", items)]) + + const result = await session.feed() + + expect(result.items).toHaveLength(1) + expect(result.items[0]!.id).toBe("item-1") + }) + + test("returns enhanced items when enhancer is provided", async () => { + const items: FeedItem[] = [ + { + id: "item-1", + type: "test", + timestamp: new Date("2025-01-01T00:00:00.000Z"), + data: { value: 42 }, + }, + ] + const enhancer = async (feedItems: FeedItem[]) => + feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) + + const session = new UserSession([createStubSource("test", items)], enhancer) + + const result = await session.feed() + + expect(result.items).toHaveLength(1) + expect(result.items[0]!.data.enhanced).toBe(true) + }) + + test("caches enhanced items on subsequent calls", async () => { + const items: FeedItem[] = [ + { + id: "item-1", + type: "test", + timestamp: new Date("2025-01-01T00:00:00.000Z"), + data: { value: 42 }, + }, + ] + let enhancerCallCount = 0 + const enhancer = async (feedItems: FeedItem[]) => { + enhancerCallCount++ + return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) + } + + const session = new UserSession([createStubSource("test", items)], enhancer) + + const result1 = await session.feed() + expect(result1.items[0]!.data.enhanced).toBe(true) + expect(enhancerCallCount).toBe(1) + + const result2 = await session.feed() + expect(result2.items[0]!.data.enhanced).toBe(true) + expect(enhancerCallCount).toBe(1) + }) + + test("re-enhances after engine refresh with new data", async () => { + let currentItems: FeedItem[] = [ + { + id: "item-1", + type: "test", + timestamp: new Date("2025-01-01T00:00:00.000Z"), + data: { version: 1 }, + }, + ] + const source = createStubSource("test", currentItems) + // Make fetchItems dynamic so refresh returns new data + source.fetchItems = async () => currentItems + + const enhancedVersions: number[] = [] + const enhancer = async (feedItems: FeedItem[]) => { + const version = feedItems[0]!.data.version as number + enhancedVersions.push(version) + return feedItems.map((item) => ({ + ...item, + data: { ...item.data, enhanced: true }, + })) + } + + const session = new UserSession([source], enhancer) + + // First feed triggers refresh + enhancement + const result1 = await session.feed() + expect(result1.items[0]!.data.version).toBe(1) + expect(result1.items[0]!.data.enhanced).toBe(true) + + // Update source data and trigger engine refresh + currentItems = [ + { + id: "item-1", + type: "test", + timestamp: new Date("2025-01-02T00:00:00.000Z"), + data: { version: 2 }, + }, + ] + await session.engine.refresh() + + // Wait for subscriber-triggered background enhancement + await new Promise((resolve) => setTimeout(resolve, 10)) + + // feed() should now serve re-enhanced items with version 2 + const result2 = await session.feed() + expect(result2.items[0]!.data.version).toBe(2) + expect(result2.items[0]!.data.enhanced).toBe(true) + expect(enhancedVersions).toEqual([1, 2]) + }) + + test("falls back to unenhanced items when enhancer throws", async () => { + const items: FeedItem[] = [ + { + id: "item-1", + type: "test", + timestamp: new Date("2025-01-01T00:00:00.000Z"), + data: { value: 42 }, + }, + ] + const enhancer = async () => { + throw new Error("enhancement exploded") + } + + const session = new UserSession([createStubSource("test", items)], enhancer) + + const result = await session.feed() + + expect(result.items).toHaveLength(1) + expect(result.items[0]!.id).toBe("item-1") + expect(result.items[0]!.data.value).toBe(42) + }) +}) diff --git a/apps/aris-backend/src/session/user-session.ts b/apps/aris-backend/src/session/user-session.ts index 7e42a63..237506f 100644 --- a/apps/aris-backend/src/session/user-session.ts +++ b/apps/aris-backend/src/session/user-session.ts @@ -1,24 +1,104 @@ -import { FeedEngine, type FeedSource } from "@aris/core" +import { FeedEngine, type FeedItem, type FeedResult, type FeedSource } from "@aris/core" + +import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" export class UserSession { readonly engine: FeedEngine private sources = new Map() + private readonly enhancer: FeedEnhancer | null + private enhancedItems: FeedItem[] | null = null + /** The FeedResult that enhancedItems was derived from. */ + private enhancedSource: FeedResult | null = null + private enhancingPromise: Promise | null = null + private unsubscribe: (() => void) | null = null - constructor(sources: FeedSource[]) { + constructor(sources: FeedSource[], enhancer?: FeedEnhancer | null) { this.engine = new FeedEngine() + this.enhancer = enhancer ?? null for (const source of sources) { this.sources.set(source.id, source) this.engine.register(source) } + + if (this.enhancer) { + this.unsubscribe = this.engine.subscribe((result) => { + this.invalidateEnhancement() + this.runEnhancement(result) + }) + } + this.engine.start() } + /** + * Returns the current feed, refreshing if the engine cache expired. + * Enhancement runs eagerly on engine updates; this method awaits + * any in-flight enhancement or triggers one if needed. + */ + async feed(): Promise { + const cached = this.engine.lastFeed() + const result = cached ?? (await this.engine.refresh()) + + if (!this.enhancer) { + return result + } + + // Wait for any in-flight background enhancement to finish + if (this.enhancingPromise) { + await this.enhancingPromise + } + + // Serve cached enhancement only if it matches the current engine result + if (this.enhancedItems && this.enhancedSource === result) { + return { ...result, items: this.enhancedItems } + } + + // Stale or missing — re-enhance + await this.runEnhancement(result) + + if (this.enhancedItems) { + return { ...result, items: this.enhancedItems } + } + + return result + } + getSource(sourceId: string): T | undefined { return this.sources.get(sourceId) as T | undefined } destroy(): void { + this.unsubscribe?.() + this.unsubscribe = null this.engine.stop() this.sources.clear() + this.invalidateEnhancement() + this.enhancingPromise = null + } + + private invalidateEnhancement(): void { + this.enhancedItems = null + this.enhancedSource = null + } + + private runEnhancement(result: FeedResult): Promise { + const promise = this.enhance(result) + this.enhancingPromise = promise + promise.finally(() => { + if (this.enhancingPromise === promise) { + this.enhancingPromise = null + } + }) + return promise + } + + private async enhance(result: FeedResult): Promise { + try { + this.enhancedItems = await this.enhancer!(result.items) + this.enhancedSource = result + } catch (err) { + console.error("[enhancement] Unexpected error:", err) + this.invalidateEnhancement() + } } } diff --git a/packages/aris-core/src/context.ts b/packages/aris-core/src/context.ts index 5d1ba22..958a362 100644 --- a/packages/aris-core/src/context.ts +++ b/packages/aris-core/src/context.ts @@ -61,10 +61,7 @@ function partsEqual(a: unknown, b: unknown): boolean { const bKeys = Object.keys(b) if (aKeys.length !== bKeys.length) return false return aKeys.every((key) => - partsEqual( - (a as Record)[key], - (b as Record)[key], - ), + partsEqual((a as Record)[key], (b as Record)[key]), ) } return false