refactor: move feed enhancement into UserSession

Move enhancement logic from HTTP handler into UserSession so the
transport layer has no knowledge of enhancement. UserSession.feed()
handles refresh, enhancement, and caching in one place.

- UserSession subscribes to engine updates and re-enhances eagerly
- Enhancement cache tracks source identity to prevent stale results
- UserSessionManager accepts config object with optional enhancer
- HTTP handler simplified to just call session.feed()

Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
2026-03-05 01:48:24 +00:00
parent bb92c9f227
commit 2b1a50349c
8 changed files with 278 additions and 120 deletions

View File

@@ -41,14 +41,13 @@ function buildTestApp(sessionManager: UserSessionManager, userId?: string) {
registerFeedHttpHandlers(app, {
sessionManager,
authSessionMiddleware: mockAuthSessionMiddleware(userId),
feedEnhancer: null,
})
return app
}
describe("GET /api/feed", () => {
test("returns 401 without auth", async () => {
const manager = new UserSessionManager([])
const manager = new UserSessionManager({ providers: [] })
const app = buildTestApp(manager)
const res = await app.request("/api/feed")
@@ -66,7 +65,9 @@ describe("GET /api/feed", () => {
data: { value: 42 },
},
]
const manager = new UserSessionManager([() => createStubSource("test", items)])
const manager = new UserSessionManager({
providers: [() => createStubSource("test", items)],
})
const app = buildTestApp(manager, "user-1")
// Prime the cache
@@ -96,7 +97,9 @@ describe("GET /api/feed", () => {
data: { fresh: true },
},
]
const manager = new UserSessionManager([() => createStubSource("test", items)])
const manager = new UserSessionManager({
providers: [() => createStubSource("test", items)],
})
const app = buildTestApp(manager, "user-1")
// No prior refresh — lastFeed() returns null, handler should call refresh()
@@ -110,66 +113,6 @@ describe("GET /api/feed", () => {
expect(body.errors).toHaveLength(0)
})
test("returns enhanced items when feedEnhancer is provided", async () => {
const items: FeedItem[] = [
{
id: "item-1",
type: "test",
timestamp: new Date("2025-01-01T00:00:00.000Z"),
data: { value: 42 },
},
]
const manager = new UserSessionManager([() => createStubSource("test", items)])
const enhancer = async (feedItems: FeedItem[]) =>
feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } }))
const app = new Hono()
registerFeedHttpHandlers(app, {
sessionManager: manager,
authSessionMiddleware: mockAuthSessionMiddleware("user-1"),
feedEnhancer: enhancer,
})
const res = await app.request("/api/feed")
expect(res.status).toBe(200)
const body = (await res.json()) as FeedResponse
expect(body.items).toHaveLength(1)
expect(body.items[0]!.data.enhanced).toBe(true)
})
test("falls back to raw items when feedEnhancer throws", async () => {
const items: FeedItem[] = [
{
id: "item-1",
type: "test",
timestamp: new Date("2025-01-01T00:00:00.000Z"),
data: { value: 42 },
},
]
const manager = new UserSessionManager([() => createStubSource("test", items)])
const enhancer = async () => {
throw new Error("enhancement exploded")
}
const app = new Hono()
registerFeedHttpHandlers(app, {
sessionManager: manager,
authSessionMiddleware: mockAuthSessionMiddleware("user-1"),
feedEnhancer: enhancer,
})
const res = await app.request("/api/feed")
expect(res.status).toBe(200)
const body = (await res.json()) as FeedResponse
expect(body.items).toHaveLength(1)
expect(body.items[0]!.id).toBe("item-1")
expect(body.items[0]!.data.value).toBe(42)
})
test("serializes source errors as message strings", async () => {
const failingSource: FeedSource = {
id: "failing",
@@ -186,7 +129,7 @@ describe("GET /api/feed", () => {
throw new Error("connection timeout")
},
}
const manager = new UserSessionManager([() => failingSource])
const manager = new UserSessionManager({ providers: [() => failingSource] })
const app = buildTestApp(manager, "user-1")
const res = await app.request("/api/feed")

View File

@@ -3,29 +3,25 @@ import type { Context, Hono } from "hono"
import { createMiddleware } from "hono/factory"
import type { AuthSessionMiddleware } from "../auth/session-middleware.ts"
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
import type { UserSessionManager } from "../session/index.ts"
type Env = {
Variables: {
sessionManager: UserSessionManager
feedEnhancer: FeedEnhancer | null
}
}
interface FeedHttpHandlersDeps {
sessionManager: UserSessionManager
authSessionMiddleware: AuthSessionMiddleware
feedEnhancer: FeedEnhancer | null
}
export function registerFeedHttpHandlers(
app: Hono,
{ sessionManager, authSessionMiddleware, feedEnhancer }: FeedHttpHandlersDeps,
{ sessionManager, authSessionMiddleware }: FeedHttpHandlersDeps,
) {
const inject = createMiddleware<Env>(async (c, next) => {
c.set("sessionManager", sessionManager)
c.set("feedEnhancer", feedEnhancer)
await next()
})
@@ -37,20 +33,10 @@ async function handleGetFeed(c: Context<Env>) {
const sessionManager = c.get("sessionManager")
const session = sessionManager.getOrCreate(user.id)
const feed = session.engine.lastFeed() ?? (await session.engine.refresh())
let items = feed.items
const enhance = c.get("feedEnhancer")
if (enhance) {
try {
items = await enhance(feed.items)
} catch (err) {
console.error("[enhancement] Unexpected error, returning unenhanced feed:", err)
}
}
const feed = await session.feed()
return c.json({
items,
items: feed.items,
errors: feed.errors.map((e) => ({
sourceId: e.sourceId,
error: e.error.message,

View File

@@ -11,18 +11,6 @@ import { UserSessionManager } from "./session/index.ts"
import { WeatherSourceProvider } from "./weather/provider.ts"
function main() {
const sessionManager = new UserSessionManager([
() => new LocationSource(),
new WeatherSourceProvider({
credentials: {
privateKey: process.env.WEATHERKIT_PRIVATE_KEY!,
keyId: process.env.WEATHERKIT_KEY_ID!,
teamId: process.env.WEATHERKIT_TEAM_ID!,
serviceId: process.env.WEATHERKIT_SERVICE_ID!,
},
}),
])
const openrouterApiKey = process.env.OPENROUTER_API_KEY
const feedEnhancer = openrouterApiKey
? createFeedEnhancer({
@@ -36,6 +24,21 @@ function main() {
console.warn("[enhancement] OPENROUTER_API_KEY not set — feed enhancement disabled")
}
const sessionManager = new UserSessionManager({
providers: [
() => new LocationSource(),
new WeatherSourceProvider({
credentials: {
privateKey: process.env.WEATHERKIT_PRIVATE_KEY!,
keyId: process.env.WEATHERKIT_KEY_ID!,
teamId: process.env.WEATHERKIT_TEAM_ID!,
serviceId: process.env.WEATHERKIT_SERVICE_ID!,
},
}),
],
feedEnhancer,
})
const app = new Hono()
app.get("/health", (c) => c.json({ status: "ok" }))
@@ -44,7 +47,6 @@ function main() {
registerFeedHttpHandlers(app, {
sessionManager,
authSessionMiddleware: requireSession,
feedEnhancer,
})
registerLocationHttpHandlers(app, { sessionManager })

View File

@@ -12,7 +12,7 @@ const mockWeatherClient: WeatherKitClient = {
describe("UserSessionManager", () => {
test("getOrCreate creates session on first call", () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const session = manager.getOrCreate("user-1")
@@ -21,7 +21,7 @@ describe("UserSessionManager", () => {
})
test("getOrCreate returns same session for same user", () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const session1 = manager.getOrCreate("user-1")
const session2 = manager.getOrCreate("user-1")
@@ -30,7 +30,7 @@ describe("UserSessionManager", () => {
})
test("getOrCreate returns different sessions for different users", () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const session1 = manager.getOrCreate("user-1")
const session2 = manager.getOrCreate("user-2")
@@ -39,7 +39,7 @@ describe("UserSessionManager", () => {
})
test("each user gets independent source instances", () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const session1 = manager.getOrCreate("user-1")
const session2 = manager.getOrCreate("user-2")
@@ -51,7 +51,7 @@ describe("UserSessionManager", () => {
})
test("remove destroys session and allows re-creation", () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const session1 = manager.getOrCreate("user-1")
manager.remove("user-1")
@@ -61,13 +61,13 @@ describe("UserSessionManager", () => {
})
test("remove is no-op for unknown user", () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
expect(() => manager.remove("unknown")).not.toThrow()
})
test("accepts function providers", async () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const session = manager.getOrCreate("user-1")
const result = await session.engine.refresh()
@@ -77,7 +77,9 @@ describe("UserSessionManager", () => {
test("accepts object providers", () => {
const provider = new WeatherSourceProvider({ client: mockWeatherClient })
const manager = new UserSessionManager([() => new LocationSource(), provider])
const manager = new UserSessionManager({
providers: [() => new LocationSource(), provider],
})
const session = manager.getOrCreate("user-1")
@@ -86,7 +88,9 @@ describe("UserSessionManager", () => {
test("accepts mixed providers", () => {
const provider = new WeatherSourceProvider({ client: mockWeatherClient })
const manager = new UserSessionManager([() => new LocationSource(), provider])
const manager = new UserSessionManager({
providers: [() => new LocationSource(), provider],
})
const session = manager.getOrCreate("user-1")
@@ -95,7 +99,7 @@ describe("UserSessionManager", () => {
})
test("refresh returns feed result through session", async () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const session = manager.getOrCreate("user-1")
const result = await session.engine.refresh()
@@ -107,7 +111,7 @@ describe("UserSessionManager", () => {
})
test("location update via executeAction works", async () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const session = manager.getOrCreate("user-1")
await session.engine.executeAction("aris.location", "update-location", {
@@ -122,7 +126,7 @@ describe("UserSessionManager", () => {
})
test("subscribe receives updates after location push", async () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const callback = mock()
const session = manager.getOrCreate("user-1")
@@ -142,7 +146,7 @@ describe("UserSessionManager", () => {
})
test("remove stops reactive updates", async () => {
const manager = new UserSessionManager([() => new LocationSource()])
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const callback = mock()
const session = manager.getOrCreate("user-1")

View File

@@ -1,13 +1,21 @@
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
import type { FeedSourceProviderInput } from "./feed-source-provider.ts"
import { UserSession } from "./user-session.ts"
export interface UserSessionManagerConfig {
providers: FeedSourceProviderInput[]
feedEnhancer?: FeedEnhancer | null
}
export class UserSessionManager {
private sessions = new Map<string, UserSession>()
private readonly providers: FeedSourceProviderInput[]
private readonly feedEnhancer: FeedEnhancer | null
constructor(providers: FeedSourceProviderInput[]) {
this.providers = providers
constructor(config: UserSessionManagerConfig) {
this.providers = config.providers
this.feedEnhancer = config.feedEnhancer ?? null
}
getOrCreate(userId: string): UserSession {
@@ -16,7 +24,7 @@ export class UserSessionManager {
const sources = this.providers.map((p) =>
typeof p === "function" ? p(userId) : p.feedSourceForUser(userId),
)
session = new UserSession(sources)
session = new UserSession(sources, this.feedEnhancer)
this.sessions.set(userId, session)
}
return session

View File

@@ -1,11 +1,11 @@
import type { ActionDefinition, ContextEntry, FeedSource } from "@aris/core"
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aris/core"
import { LocationSource } from "@aris/source-location"
import { describe, expect, test } from "bun:test"
import { UserSession } from "./user-session.ts"
function createStubSource(id: string): FeedSource {
function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
return {
id,
async listActions(): Promise<Record<string, ActionDefinition>> {
@@ -18,7 +18,7 @@ function createStubSource(id: string): FeedSource {
return null
},
async fetchItems() {
return []
return items
},
}
}
@@ -70,3 +70,141 @@ describe("UserSession", () => {
expect(location.lastLocation!.lat).toBe(51.5)
})
})
describe("UserSession.feed", () => {
test("returns feed items without enhancer", async () => {
const items: FeedItem[] = [
{
id: "item-1",
type: "test",
timestamp: new Date("2025-01-01T00:00:00.000Z"),
data: { value: 42 },
},
]
const session = new UserSession([createStubSource("test", items)])
const result = await session.feed()
expect(result.items).toHaveLength(1)
expect(result.items[0]!.id).toBe("item-1")
})
test("returns enhanced items when enhancer is provided", async () => {
const items: FeedItem[] = [
{
id: "item-1",
type: "test",
timestamp: new Date("2025-01-01T00:00:00.000Z"),
data: { value: 42 },
},
]
const enhancer = async (feedItems: FeedItem[]) =>
feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } }))
const session = new UserSession([createStubSource("test", items)], enhancer)
const result = await session.feed()
expect(result.items).toHaveLength(1)
expect(result.items[0]!.data.enhanced).toBe(true)
})
test("caches enhanced items on subsequent calls", async () => {
const items: FeedItem[] = [
{
id: "item-1",
type: "test",
timestamp: new Date("2025-01-01T00:00:00.000Z"),
data: { value: 42 },
},
]
let enhancerCallCount = 0
const enhancer = async (feedItems: FeedItem[]) => {
enhancerCallCount++
return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } }))
}
const session = new UserSession([createStubSource("test", items)], enhancer)
const result1 = await session.feed()
expect(result1.items[0]!.data.enhanced).toBe(true)
expect(enhancerCallCount).toBe(1)
const result2 = await session.feed()
expect(result2.items[0]!.data.enhanced).toBe(true)
expect(enhancerCallCount).toBe(1)
})
test("re-enhances after engine refresh with new data", async () => {
let currentItems: FeedItem[] = [
{
id: "item-1",
type: "test",
timestamp: new Date("2025-01-01T00:00:00.000Z"),
data: { version: 1 },
},
]
const source = createStubSource("test", currentItems)
// Make fetchItems dynamic so refresh returns new data
source.fetchItems = async () => currentItems
const enhancedVersions: number[] = []
const enhancer = async (feedItems: FeedItem[]) => {
const version = feedItems[0]!.data.version as number
enhancedVersions.push(version)
return feedItems.map((item) => ({
...item,
data: { ...item.data, enhanced: true },
}))
}
const session = new UserSession([source], enhancer)
// First feed triggers refresh + enhancement
const result1 = await session.feed()
expect(result1.items[0]!.data.version).toBe(1)
expect(result1.items[0]!.data.enhanced).toBe(true)
// Update source data and trigger engine refresh
currentItems = [
{
id: "item-1",
type: "test",
timestamp: new Date("2025-01-02T00:00:00.000Z"),
data: { version: 2 },
},
]
await session.engine.refresh()
// Wait for subscriber-triggered background enhancement
await new Promise((resolve) => setTimeout(resolve, 10))
// feed() should now serve re-enhanced items with version 2
const result2 = await session.feed()
expect(result2.items[0]!.data.version).toBe(2)
expect(result2.items[0]!.data.enhanced).toBe(true)
expect(enhancedVersions).toEqual([1, 2])
})
test("falls back to unenhanced items when enhancer throws", async () => {
const items: FeedItem[] = [
{
id: "item-1",
type: "test",
timestamp: new Date("2025-01-01T00:00:00.000Z"),
data: { value: 42 },
},
]
const enhancer = async () => {
throw new Error("enhancement exploded")
}
const session = new UserSession([createStubSource("test", items)], enhancer)
const result = await session.feed()
expect(result.items).toHaveLength(1)
expect(result.items[0]!.id).toBe("item-1")
expect(result.items[0]!.data.value).toBe(42)
})
})

View File

@@ -1,24 +1,104 @@
import { FeedEngine, type FeedSource } from "@aris/core"
import { FeedEngine, type FeedItem, type FeedResult, type FeedSource } from "@aris/core"
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
export class UserSession {
readonly engine: FeedEngine
private sources = new Map<string, FeedSource>()
private readonly enhancer: FeedEnhancer | null
private enhancedItems: FeedItem[] | null = null
/** The FeedResult that enhancedItems was derived from. */
private enhancedSource: FeedResult | null = null
private enhancingPromise: Promise<void> | null = null
private unsubscribe: (() => void) | null = null
constructor(sources: FeedSource[]) {
constructor(sources: FeedSource[], enhancer?: FeedEnhancer | null) {
this.engine = new FeedEngine()
this.enhancer = enhancer ?? null
for (const source of sources) {
this.sources.set(source.id, source)
this.engine.register(source)
}
if (this.enhancer) {
this.unsubscribe = this.engine.subscribe((result) => {
this.invalidateEnhancement()
this.runEnhancement(result)
})
}
this.engine.start()
}
/**
* Returns the current feed, refreshing if the engine cache expired.
* Enhancement runs eagerly on engine updates; this method awaits
* any in-flight enhancement or triggers one if needed.
*/
async feed(): Promise<FeedResult> {
const cached = this.engine.lastFeed()
const result = cached ?? (await this.engine.refresh())
if (!this.enhancer) {
return result
}
// Wait for any in-flight background enhancement to finish
if (this.enhancingPromise) {
await this.enhancingPromise
}
// Serve cached enhancement only if it matches the current engine result
if (this.enhancedItems && this.enhancedSource === result) {
return { ...result, items: this.enhancedItems }
}
// Stale or missing — re-enhance
await this.runEnhancement(result)
if (this.enhancedItems) {
return { ...result, items: this.enhancedItems }
}
return result
}
getSource<T extends FeedSource>(sourceId: string): T | undefined {
return this.sources.get(sourceId) as T | undefined
}
destroy(): void {
this.unsubscribe?.()
this.unsubscribe = null
this.engine.stop()
this.sources.clear()
this.invalidateEnhancement()
this.enhancingPromise = null
}
private invalidateEnhancement(): void {
this.enhancedItems = null
this.enhancedSource = null
}
private runEnhancement(result: FeedResult): Promise<void> {
const promise = this.enhance(result)
this.enhancingPromise = promise
promise.finally(() => {
if (this.enhancingPromise === promise) {
this.enhancingPromise = null
}
})
return promise
}
private async enhance(result: FeedResult): Promise<void> {
try {
this.enhancedItems = await this.enhancer!(result.items)
this.enhancedSource = result
} catch (err) {
console.error("[enhancement] Unexpected error:", err)
this.invalidateEnhancement()
}
}
}