mirror of
https://github.com/kennethnym/aris.git
synced 2026-03-20 09:01:19 +00:00
feat(backend): make FeedSourceProvider async
Make feedSourceForUser and FeedSourceProviderFn return promises. Use Promise.allSettled to tolerate partial provider failures. Guard concurrent getOrCreate calls with in-flight promise dedup. Return 503 from HTTP handlers when session creation fails. Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core"
|
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core"
|
||||||
|
|
||||||
import { contextKey } from "@aelis/core"
|
import { contextKey } from "@aelis/core"
|
||||||
import { describe, expect, test } from "bun:test"
|
import { describe, expect, spyOn, test } from "bun:test"
|
||||||
import { Hono } from "hono"
|
import { Hono } from "hono"
|
||||||
|
|
||||||
import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts"
|
import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts"
|
||||||
@@ -72,12 +72,12 @@ describe("GET /api/feed", () => {
|
|||||||
},
|
},
|
||||||
]
|
]
|
||||||
const manager = new UserSessionManager({
|
const manager = new UserSessionManager({
|
||||||
providers: [() => createStubSource("test", items)],
|
providers: [async () => createStubSource("test", items)],
|
||||||
})
|
})
|
||||||
const app = buildTestApp(manager, "user-1")
|
const app = buildTestApp(manager, "user-1")
|
||||||
|
|
||||||
// Prime the cache
|
// Prime the cache
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
await session.engine.refresh()
|
await session.engine.refresh()
|
||||||
expect(session.engine.lastFeed()).not.toBeNull()
|
expect(session.engine.lastFeed()).not.toBeNull()
|
||||||
|
|
||||||
@@ -105,7 +105,7 @@ describe("GET /api/feed", () => {
|
|||||||
},
|
},
|
||||||
]
|
]
|
||||||
const manager = new UserSessionManager({
|
const manager = new UserSessionManager({
|
||||||
providers: [() => createStubSource("test", items)],
|
providers: [async () => createStubSource("test", items)],
|
||||||
})
|
})
|
||||||
const app = buildTestApp(manager, "user-1")
|
const app = buildTestApp(manager, "user-1")
|
||||||
|
|
||||||
@@ -136,7 +136,7 @@ describe("GET /api/feed", () => {
|
|||||||
throw new Error("connection timeout")
|
throw new Error("connection timeout")
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
const manager = new UserSessionManager({ providers: [() => failingSource] })
|
const manager = new UserSessionManager({ providers: [async () => failingSource] })
|
||||||
const app = buildTestApp(manager, "user-1")
|
const app = buildTestApp(manager, "user-1")
|
||||||
|
|
||||||
const res = await app.request("/api/feed")
|
const res = await app.request("/api/feed")
|
||||||
@@ -148,6 +148,27 @@ describe("GET /api/feed", () => {
|
|||||||
expect(body.errors[0]!.sourceId).toBe("failing")
|
expect(body.errors[0]!.sourceId).toBe("failing")
|
||||||
expect(body.errors[0]!.error).toBe("connection timeout")
|
expect(body.errors[0]!.error).toBe("connection timeout")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test("returns 503 when all providers fail", async () => {
|
||||||
|
const manager = new UserSessionManager({
|
||||||
|
providers: [
|
||||||
|
async () => {
|
||||||
|
throw new Error("provider down")
|
||||||
|
},
|
||||||
|
],
|
||||||
|
})
|
||||||
|
const app = buildTestApp(manager, "user-1")
|
||||||
|
|
||||||
|
const spy = spyOn(console, "error").mockImplementation(() => {})
|
||||||
|
|
||||||
|
const res = await app.request("/api/feed")
|
||||||
|
|
||||||
|
expect(res.status).toBe(503)
|
||||||
|
const body = (await res.json()) as { error: string }
|
||||||
|
expect(body.error).toBe("Service unavailable")
|
||||||
|
|
||||||
|
spy.mockRestore()
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe("GET /api/context", () => {
|
describe("GET /api/context", () => {
|
||||||
@@ -158,12 +179,12 @@ describe("GET /api/context", () => {
|
|||||||
// The mock auth middleware always injects this hardcoded user ID
|
// The mock auth middleware always injects this hardcoded user ID
|
||||||
const mockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn"
|
const mockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn"
|
||||||
|
|
||||||
function buildContextApp(userId?: string) {
|
async function buildContextApp(userId?: string) {
|
||||||
const manager = new UserSessionManager({
|
const manager = new UserSessionManager({
|
||||||
providers: [() => createStubSource("weather", [], contextEntries)],
|
providers: [async () => createStubSource("weather", [], contextEntries)],
|
||||||
})
|
})
|
||||||
const app = buildTestApp(manager, userId)
|
const app = buildTestApp(manager, userId)
|
||||||
const session = manager.getOrCreate(mockUserId)
|
const session = await manager.getOrCreate(mockUserId)
|
||||||
return { app, session }
|
return { app, session }
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -177,7 +198,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns 400 when key param is missing", async () => {
|
test("returns 400 when key param is missing", async () => {
|
||||||
const { app } = buildContextApp("user-1")
|
const { app } = await buildContextApp("user-1")
|
||||||
|
|
||||||
const res = await app.request("/api/context")
|
const res = await app.request("/api/context")
|
||||||
|
|
||||||
@@ -187,7 +208,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns 400 when key is invalid JSON", async () => {
|
test("returns 400 when key is invalid JSON", async () => {
|
||||||
const { app } = buildContextApp("user-1")
|
const { app } = await buildContextApp("user-1")
|
||||||
|
|
||||||
const res = await app.request("/api/context?key=notjson")
|
const res = await app.request("/api/context?key=notjson")
|
||||||
|
|
||||||
@@ -197,7 +218,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns 400 when key is not an array", async () => {
|
test("returns 400 when key is not an array", async () => {
|
||||||
const { app } = buildContextApp("user-1")
|
const { app } = await buildContextApp("user-1")
|
||||||
|
|
||||||
const res = await app.request('/api/context?key="string"')
|
const res = await app.request('/api/context?key="string"')
|
||||||
|
|
||||||
@@ -207,7 +228,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns 400 when key contains invalid element types", async () => {
|
test("returns 400 when key contains invalid element types", async () => {
|
||||||
const { app } = buildContextApp("user-1")
|
const { app } = await buildContextApp("user-1")
|
||||||
|
|
||||||
const res = await app.request("/api/context?key=[true,null,[1,2]]")
|
const res = await app.request("/api/context?key=[true,null,[1,2]]")
|
||||||
|
|
||||||
@@ -217,7 +238,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns 400 when key is an empty array", async () => {
|
test("returns 400 when key is an empty array", async () => {
|
||||||
const { app } = buildContextApp("user-1")
|
const { app } = await buildContextApp("user-1")
|
||||||
|
|
||||||
const res = await app.request("/api/context?key=[]")
|
const res = await app.request("/api/context?key=[]")
|
||||||
|
|
||||||
@@ -227,7 +248,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns 400 when match param is invalid", async () => {
|
test("returns 400 when match param is invalid", async () => {
|
||||||
const { app } = buildContextApp("user-1")
|
const { app } = await buildContextApp("user-1")
|
||||||
|
|
||||||
const res = await app.request('/api/context?key=["aelis.weather"]&match=invalid')
|
const res = await app.request('/api/context?key=["aelis.weather"]&match=invalid')
|
||||||
|
|
||||||
@@ -237,7 +258,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns exact match with match=exact", async () => {
|
test("returns exact match with match=exact", async () => {
|
||||||
const { app, session } = buildContextApp("user-1")
|
const { app, session } = await buildContextApp("user-1")
|
||||||
await session.engine.refresh()
|
await session.engine.refresh()
|
||||||
|
|
||||||
const res = await app.request('/api/context?key=["aelis.weather","weather"]&match=exact')
|
const res = await app.request('/api/context?key=["aelis.weather","weather"]&match=exact')
|
||||||
@@ -249,7 +270,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns 404 with match=exact when only prefix would match", async () => {
|
test("returns 404 with match=exact when only prefix would match", async () => {
|
||||||
const { app, session } = buildContextApp("user-1")
|
const { app, session } = await buildContextApp("user-1")
|
||||||
await session.engine.refresh()
|
await session.engine.refresh()
|
||||||
|
|
||||||
const res = await app.request('/api/context?key=["aelis.weather"]&match=exact')
|
const res = await app.request('/api/context?key=["aelis.weather"]&match=exact')
|
||||||
@@ -258,7 +279,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns prefix match with match=prefix", async () => {
|
test("returns prefix match with match=prefix", async () => {
|
||||||
const { app, session } = buildContextApp("user-1")
|
const { app, session } = await buildContextApp("user-1")
|
||||||
await session.engine.refresh()
|
await session.engine.refresh()
|
||||||
|
|
||||||
const res = await app.request('/api/context?key=["aelis.weather"]&match=prefix')
|
const res = await app.request('/api/context?key=["aelis.weather"]&match=prefix')
|
||||||
@@ -275,7 +296,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("default mode returns exact match when available", async () => {
|
test("default mode returns exact match when available", async () => {
|
||||||
const { app, session } = buildContextApp("user-1")
|
const { app, session } = await buildContextApp("user-1")
|
||||||
await session.engine.refresh()
|
await session.engine.refresh()
|
||||||
|
|
||||||
const res = await app.request('/api/context?key=["aelis.weather","weather"]')
|
const res = await app.request('/api/context?key=["aelis.weather","weather"]')
|
||||||
@@ -287,7 +308,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("default mode falls back to prefix when no exact match", async () => {
|
test("default mode falls back to prefix when no exact match", async () => {
|
||||||
const { app, session } = buildContextApp("user-1")
|
const { app, session } = await buildContextApp("user-1")
|
||||||
await session.engine.refresh()
|
await session.engine.refresh()
|
||||||
|
|
||||||
const res = await app.request('/api/context?key=["aelis.weather"]')
|
const res = await app.request('/api/context?key=["aelis.weather"]')
|
||||||
@@ -303,7 +324,7 @@ describe("GET /api/context", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("returns 404 when neither exact nor prefix matches", async () => {
|
test("returns 404 when neither exact nor prefix matches", async () => {
|
||||||
const { app, session } = buildContextApp("user-1")
|
const { app, session } = await buildContextApp("user-1")
|
||||||
await session.engine.refresh()
|
await session.engine.refresh()
|
||||||
|
|
||||||
const res = await app.request('/api/context?key=["nonexistent"]')
|
const res = await app.request('/api/context?key=["nonexistent"]')
|
||||||
|
|||||||
@@ -33,7 +33,14 @@ export function registerFeedHttpHandlers(
|
|||||||
async function handleGetFeed(c: Context<Env>) {
|
async function handleGetFeed(c: Context<Env>) {
|
||||||
const user = c.get("user")!
|
const user = c.get("user")!
|
||||||
const sessionManager = c.get("sessionManager")
|
const sessionManager = c.get("sessionManager")
|
||||||
const session = sessionManager.getOrCreate(user.id)
|
|
||||||
|
let session
|
||||||
|
try {
|
||||||
|
session = await sessionManager.getOrCreate(user.id)
|
||||||
|
} catch (err) {
|
||||||
|
console.error("[handleGetFeed] Failed to create session:", err)
|
||||||
|
return c.json({ error: "Service unavailable" }, 503)
|
||||||
|
}
|
||||||
|
|
||||||
const feed = await session.feed()
|
const feed = await session.feed()
|
||||||
|
|
||||||
@@ -46,7 +53,7 @@ async function handleGetFeed(c: Context<Env>) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function handleGetContext(c: Context<Env>) {
|
async function handleGetContext(c: Context<Env>) {
|
||||||
const keyParam = c.req.query("key")
|
const keyParam = c.req.query("key")
|
||||||
if (!keyParam) {
|
if (!keyParam) {
|
||||||
return c.json({ error: 'Invalid or missing "key" parameter: must be a JSON array' }, 400)
|
return c.json({ error: 'Invalid or missing "key" parameter: must be a JSON array' }, 400)
|
||||||
@@ -70,7 +77,15 @@ function handleGetContext(c: Context<Env>) {
|
|||||||
|
|
||||||
const user = c.get("user")!
|
const user = c.get("user")!
|
||||||
const sessionManager = c.get("sessionManager")
|
const sessionManager = c.get("sessionManager")
|
||||||
const session = sessionManager.getOrCreate(user.id)
|
|
||||||
|
let session
|
||||||
|
try {
|
||||||
|
session = await sessionManager.getOrCreate(user.id)
|
||||||
|
} catch (err) {
|
||||||
|
console.error("[handleGetContext] Failed to create session:", err)
|
||||||
|
return c.json({ error: "Service unavailable" }, 503)
|
||||||
|
}
|
||||||
|
|
||||||
const context = session.engine.currentContext()
|
const context = session.engine.currentContext()
|
||||||
const key = contextKey(...parsed)
|
const key = contextKey(...parsed)
|
||||||
|
|
||||||
|
|||||||
@@ -44,7 +44,15 @@ async function handleUpdateLocation(c: Context<Env>) {
|
|||||||
|
|
||||||
const user = c.get("user")!
|
const user = c.get("user")!
|
||||||
const sessionManager = c.get("sessionManager")
|
const sessionManager = c.get("sessionManager")
|
||||||
const session = sessionManager.getOrCreate(user.id)
|
|
||||||
|
let session
|
||||||
|
try {
|
||||||
|
session = await sessionManager.getOrCreate(user.id)
|
||||||
|
} catch (err) {
|
||||||
|
console.error("[handleUpdateLocation] Failed to create session:", err)
|
||||||
|
return c.json({ error: "Service unavailable" }, 503)
|
||||||
|
}
|
||||||
|
|
||||||
await session.engine.executeAction("aelis.location", "update-location", {
|
await session.engine.executeAction("aelis.location", "update-location", {
|
||||||
lat: result.lat,
|
lat: result.lat,
|
||||||
lng: result.lng,
|
lng: result.lng,
|
||||||
|
|||||||
@@ -3,9 +3,9 @@ import { Hono } from "hono"
|
|||||||
|
|
||||||
import { registerAuthHandlers } from "./auth/http.ts"
|
import { registerAuthHandlers } from "./auth/http.ts"
|
||||||
import { mockAuthSessionMiddleware, requireSession } from "./auth/session-middleware.ts"
|
import { mockAuthSessionMiddleware, requireSession } from "./auth/session-middleware.ts"
|
||||||
|
import { registerFeedHttpHandlers } from "./engine/http.ts"
|
||||||
import { createFeedEnhancer } from "./enhancement/enhance-feed.ts"
|
import { createFeedEnhancer } from "./enhancement/enhance-feed.ts"
|
||||||
import { createLlmClient } from "./enhancement/llm-client.ts"
|
import { createLlmClient } from "./enhancement/llm-client.ts"
|
||||||
import { registerFeedHttpHandlers } from "./engine/http.ts"
|
|
||||||
import { registerLocationHttpHandlers } from "./location/http.ts"
|
import { registerLocationHttpHandlers } from "./location/http.ts"
|
||||||
import { UserSessionManager } from "./session/index.ts"
|
import { UserSessionManager } from "./session/index.ts"
|
||||||
import { WeatherSourceProvider } from "./weather/provider.ts"
|
import { WeatherSourceProvider } from "./weather/provider.ts"
|
||||||
@@ -26,7 +26,7 @@ function main() {
|
|||||||
|
|
||||||
const sessionManager = new UserSessionManager({
|
const sessionManager = new UserSessionManager({
|
||||||
providers: [
|
providers: [
|
||||||
() => new LocationSource(),
|
async () => new LocationSource(),
|
||||||
new WeatherSourceProvider({
|
new WeatherSourceProvider({
|
||||||
credentials: {
|
credentials: {
|
||||||
privateKey: process.env.WEATHERKIT_PRIVATE_KEY!,
|
privateKey: process.env.WEATHERKIT_PRIVATE_KEY!,
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
import type { FeedSource } from "@aelis/core"
|
import type { FeedSource } from "@aelis/core"
|
||||||
|
|
||||||
export interface FeedSourceProvider {
|
export interface FeedSourceProvider {
|
||||||
feedSourceForUser(userId: string): FeedSource
|
feedSourceForUser(userId: string): Promise<FeedSource>
|
||||||
}
|
}
|
||||||
|
|
||||||
export type FeedSourceProviderFn = (userId: string) => FeedSource
|
export type FeedSourceProviderFn = (userId: string) => Promise<FeedSource>
|
||||||
|
|
||||||
export type FeedSourceProviderInput = FeedSourceProvider | FeedSourceProviderFn
|
export type FeedSourceProviderInput = FeedSourceProvider | FeedSourceProviderFn
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import type { WeatherKitClient, WeatherKitResponse } from "@aelis/source-weatherkit"
|
import type { WeatherKitClient, WeatherKitResponse } from "@aelis/source-weatherkit"
|
||||||
|
|
||||||
import { LocationSource } from "@aelis/source-location"
|
import { LocationSource } from "@aelis/source-location"
|
||||||
import { describe, expect, mock, test } from "bun:test"
|
import { describe, expect, mock, spyOn, test } from "bun:test"
|
||||||
|
|
||||||
import { WeatherSourceProvider } from "../weather/provider.ts"
|
import { WeatherSourceProvider } from "../weather/provider.ts"
|
||||||
import { UserSessionManager } from "./user-session-manager.ts"
|
import { UserSessionManager } from "./user-session-manager.ts"
|
||||||
@@ -11,38 +11,38 @@ const mockWeatherClient: WeatherKitClient = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
describe("UserSessionManager", () => {
|
describe("UserSessionManager", () => {
|
||||||
test("getOrCreate creates session on first call", () => {
|
test("getOrCreate creates session on first call", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
|
|
||||||
expect(session).toBeDefined()
|
expect(session).toBeDefined()
|
||||||
expect(session.engine).toBeDefined()
|
expect(session.engine).toBeDefined()
|
||||||
})
|
})
|
||||||
|
|
||||||
test("getOrCreate returns same session for same user", () => {
|
test("getOrCreate returns same session for same user", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
const session1 = manager.getOrCreate("user-1")
|
const session1 = await manager.getOrCreate("user-1")
|
||||||
const session2 = manager.getOrCreate("user-1")
|
const session2 = await manager.getOrCreate("user-1")
|
||||||
|
|
||||||
expect(session1).toBe(session2)
|
expect(session1).toBe(session2)
|
||||||
})
|
})
|
||||||
|
|
||||||
test("getOrCreate returns different sessions for different users", () => {
|
test("getOrCreate returns different sessions for different users", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
const session1 = manager.getOrCreate("user-1")
|
const session1 = await manager.getOrCreate("user-1")
|
||||||
const session2 = manager.getOrCreate("user-2")
|
const session2 = await manager.getOrCreate("user-2")
|
||||||
|
|
||||||
expect(session1).not.toBe(session2)
|
expect(session1).not.toBe(session2)
|
||||||
})
|
})
|
||||||
|
|
||||||
test("each user gets independent source instances", () => {
|
test("each user gets independent source instances", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
const session1 = manager.getOrCreate("user-1")
|
const session1 = await manager.getOrCreate("user-1")
|
||||||
const session2 = manager.getOrCreate("user-2")
|
const session2 = await manager.getOrCreate("user-2")
|
||||||
|
|
||||||
const source1 = session1.getSource<LocationSource>("aelis.location")
|
const source1 = session1.getSource<LocationSource>("aelis.location")
|
||||||
const source2 = session2.getSource<LocationSource>("aelis.location")
|
const source2 = session2.getSource<LocationSource>("aelis.location")
|
||||||
@@ -50,58 +50,58 @@ describe("UserSessionManager", () => {
|
|||||||
expect(source1).not.toBe(source2)
|
expect(source1).not.toBe(source2)
|
||||||
})
|
})
|
||||||
|
|
||||||
test("remove destroys session and allows re-creation", () => {
|
test("remove destroys session and allows re-creation", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
const session1 = manager.getOrCreate("user-1")
|
const session1 = await manager.getOrCreate("user-1")
|
||||||
manager.remove("user-1")
|
manager.remove("user-1")
|
||||||
const session2 = manager.getOrCreate("user-1")
|
const session2 = await manager.getOrCreate("user-1")
|
||||||
|
|
||||||
expect(session1).not.toBe(session2)
|
expect(session1).not.toBe(session2)
|
||||||
})
|
})
|
||||||
|
|
||||||
test("remove is no-op for unknown user", () => {
|
test("remove is no-op for unknown user", () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
expect(() => manager.remove("unknown")).not.toThrow()
|
expect(() => manager.remove("unknown")).not.toThrow()
|
||||||
})
|
})
|
||||||
|
|
||||||
test("accepts function providers", async () => {
|
test("accepts function providers", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
const result = await session.engine.refresh()
|
const result = await session.engine.refresh()
|
||||||
|
|
||||||
expect(result.errors).toHaveLength(0)
|
expect(result.errors).toHaveLength(0)
|
||||||
})
|
})
|
||||||
|
|
||||||
test("accepts object providers", () => {
|
test("accepts object providers", async () => {
|
||||||
const provider = new WeatherSourceProvider({ client: mockWeatherClient })
|
const provider = new WeatherSourceProvider({ client: mockWeatherClient })
|
||||||
const manager = new UserSessionManager({
|
const manager = new UserSessionManager({
|
||||||
providers: [() => new LocationSource(), provider],
|
providers: [async () => new LocationSource(), provider],
|
||||||
})
|
})
|
||||||
|
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
|
|
||||||
expect(session.getSource("aelis.weather")).toBeDefined()
|
expect(session.getSource("aelis.weather")).toBeDefined()
|
||||||
})
|
})
|
||||||
|
|
||||||
test("accepts mixed providers", () => {
|
test("accepts mixed providers", async () => {
|
||||||
const provider = new WeatherSourceProvider({ client: mockWeatherClient })
|
const provider = new WeatherSourceProvider({ client: mockWeatherClient })
|
||||||
const manager = new UserSessionManager({
|
const manager = new UserSessionManager({
|
||||||
providers: [() => new LocationSource(), provider],
|
providers: [async () => new LocationSource(), provider],
|
||||||
})
|
})
|
||||||
|
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
|
|
||||||
expect(session.getSource("aelis.location")).toBeDefined()
|
expect(session.getSource("aelis.location")).toBeDefined()
|
||||||
expect(session.getSource("aelis.weather")).toBeDefined()
|
expect(session.getSource("aelis.weather")).toBeDefined()
|
||||||
})
|
})
|
||||||
|
|
||||||
test("refresh returns feed result through session", async () => {
|
test("refresh returns feed result through session", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
const result = await session.engine.refresh()
|
const result = await session.engine.refresh()
|
||||||
|
|
||||||
expect(result).toHaveProperty("context")
|
expect(result).toHaveProperty("context")
|
||||||
@@ -111,9 +111,9 @@ describe("UserSessionManager", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("location update via executeAction works", async () => {
|
test("location update via executeAction works", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
|
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
await session.engine.executeAction("aelis.location", "update-location", {
|
await session.engine.executeAction("aelis.location", "update-location", {
|
||||||
lat: 51.5074,
|
lat: 51.5074,
|
||||||
lng: -0.1278,
|
lng: -0.1278,
|
||||||
@@ -126,10 +126,10 @@ describe("UserSessionManager", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("subscribe receives updates after location push", async () => {
|
test("subscribe receives updates after location push", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
const callback = mock()
|
const callback = mock()
|
||||||
|
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
session.engine.subscribe(callback)
|
session.engine.subscribe(callback)
|
||||||
|
|
||||||
await session.engine.executeAction("aelis.location", "update-location", {
|
await session.engine.executeAction("aelis.location", "update-location", {
|
||||||
@@ -146,16 +146,16 @@ describe("UserSessionManager", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
test("remove stops reactive updates", async () => {
|
test("remove stops reactive updates", async () => {
|
||||||
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
|
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
|
||||||
const callback = mock()
|
const callback = mock()
|
||||||
|
|
||||||
const session = manager.getOrCreate("user-1")
|
const session = await manager.getOrCreate("user-1")
|
||||||
session.engine.subscribe(callback)
|
session.engine.subscribe(callback)
|
||||||
|
|
||||||
manager.remove("user-1")
|
manager.remove("user-1")
|
||||||
|
|
||||||
// Create new session and push location — old callback should not fire
|
// Create new session and push location — old callback should not fire
|
||||||
const session2 = manager.getOrCreate("user-1")
|
const session2 = await manager.getOrCreate("user-1")
|
||||||
await session2.engine.executeAction("aelis.location", "update-location", {
|
await session2.engine.executeAction("aelis.location", "update-location", {
|
||||||
lat: 51.5074,
|
lat: 51.5074,
|
||||||
lng: -0.1278,
|
lng: -0.1278,
|
||||||
@@ -167,4 +167,62 @@ describe("UserSessionManager", () => {
|
|||||||
|
|
||||||
expect(callback).not.toHaveBeenCalled()
|
expect(callback).not.toHaveBeenCalled()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test("creates session with successful providers when some fail", async () => {
|
||||||
|
const manager = new UserSessionManager({
|
||||||
|
providers: [
|
||||||
|
async () => new LocationSource(),
|
||||||
|
async () => {
|
||||||
|
throw new Error("provider failed")
|
||||||
|
},
|
||||||
|
],
|
||||||
|
})
|
||||||
|
|
||||||
|
const spy = spyOn(console, "error").mockImplementation(() => {})
|
||||||
|
|
||||||
|
const session = await manager.getOrCreate("user-1")
|
||||||
|
|
||||||
|
expect(session).toBeDefined()
|
||||||
|
expect(session.getSource("aelis.location")).toBeDefined()
|
||||||
|
expect(spy).toHaveBeenCalled()
|
||||||
|
|
||||||
|
spy.mockRestore()
|
||||||
|
})
|
||||||
|
|
||||||
|
test("throws AggregateError when all providers fail", async () => {
|
||||||
|
const manager = new UserSessionManager({
|
||||||
|
providers: [
|
||||||
|
async () => {
|
||||||
|
throw new Error("first failed")
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
throw new Error("second failed")
|
||||||
|
},
|
||||||
|
],
|
||||||
|
})
|
||||||
|
|
||||||
|
await expect(manager.getOrCreate("user-1")).rejects.toBeInstanceOf(AggregateError)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("concurrent getOrCreate for same user returns same session", async () => {
|
||||||
|
let callCount = 0
|
||||||
|
const manager = new UserSessionManager({
|
||||||
|
providers: [
|
||||||
|
async () => {
|
||||||
|
callCount++
|
||||||
|
// Simulate async work to widen the race window
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 10))
|
||||||
|
return new LocationSource()
|
||||||
|
},
|
||||||
|
],
|
||||||
|
})
|
||||||
|
|
||||||
|
const [session1, session2] = await Promise.all([
|
||||||
|
manager.getOrCreate("user-1"),
|
||||||
|
manager.getOrCreate("user-1"),
|
||||||
|
])
|
||||||
|
|
||||||
|
expect(session1).toBe(session2)
|
||||||
|
expect(callCount).toBe(1)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import type { FeedSource } from "@aelis/core"
|
||||||
|
|
||||||
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
|
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
|
||||||
import type { FeedSourceProviderInput } from "./feed-source-provider.ts"
|
import type { FeedSourceProviderInput } from "./feed-source-provider.ts"
|
||||||
|
|
||||||
@@ -10,6 +12,7 @@ export interface UserSessionManagerConfig {
|
|||||||
|
|
||||||
export class UserSessionManager {
|
export class UserSessionManager {
|
||||||
private sessions = new Map<string, UserSession>()
|
private sessions = new Map<string, UserSession>()
|
||||||
|
private pending = new Map<string, Promise<UserSession>>()
|
||||||
private readonly providers: FeedSourceProviderInput[]
|
private readonly providers: FeedSourceProviderInput[]
|
||||||
private readonly feedEnhancer: FeedEnhancer | null
|
private readonly feedEnhancer: FeedEnhancer | null
|
||||||
|
|
||||||
@@ -18,16 +21,22 @@ export class UserSessionManager {
|
|||||||
this.feedEnhancer = config.feedEnhancer ?? null
|
this.feedEnhancer = config.feedEnhancer ?? null
|
||||||
}
|
}
|
||||||
|
|
||||||
getOrCreate(userId: string): UserSession {
|
async getOrCreate(userId: string): Promise<UserSession> {
|
||||||
let session = this.sessions.get(userId)
|
const existing = this.sessions.get(userId)
|
||||||
if (!session) {
|
if (existing) return existing
|
||||||
const sources = this.providers.map((p) =>
|
|
||||||
typeof p === "function" ? p(userId) : p.feedSourceForUser(userId),
|
const inflight = this.pending.get(userId)
|
||||||
)
|
if (inflight) return inflight
|
||||||
session = new UserSession(sources, this.feedEnhancer)
|
|
||||||
|
const promise = this.createSession(userId)
|
||||||
|
this.pending.set(userId, promise)
|
||||||
|
try {
|
||||||
|
const session = await promise
|
||||||
this.sessions.set(userId, session)
|
this.sessions.set(userId, session)
|
||||||
|
return session
|
||||||
|
} finally {
|
||||||
|
this.pending.delete(userId)
|
||||||
}
|
}
|
||||||
return session
|
|
||||||
}
|
}
|
||||||
|
|
||||||
remove(userId: string): void {
|
remove(userId: string): void {
|
||||||
@@ -37,4 +46,33 @@ export class UserSessionManager {
|
|||||||
this.sessions.delete(userId)
|
this.sessions.delete(userId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async createSession(userId: string): Promise<UserSession> {
|
||||||
|
const results = await Promise.allSettled(
|
||||||
|
this.providers.map((p) =>
|
||||||
|
typeof p === "function" ? p(userId) : p.feedSourceForUser(userId),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
const sources: FeedSource[] = []
|
||||||
|
const errors: unknown[] = []
|
||||||
|
|
||||||
|
for (const result of results) {
|
||||||
|
if (result.status === "fulfilled") {
|
||||||
|
sources.push(result.value)
|
||||||
|
} else {
|
||||||
|
errors.push(result.reason)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sources.length === 0 && errors.length > 0) {
|
||||||
|
throw new AggregateError(errors, "All feed source providers failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const error of errors) {
|
||||||
|
console.error("[UserSessionManager] Feed source provider failed:", error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return new UserSession(sources, this.feedEnhancer)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ export class TflSourceProvider implements FeedSourceProvider {
|
|||||||
this.options = options
|
this.options = options
|
||||||
}
|
}
|
||||||
|
|
||||||
feedSourceForUser(_userId: string): TflSource {
|
async feedSourceForUser(_userId: string): Promise<TflSource> {
|
||||||
return new TflSource(this.options)
|
return new TflSource(this.options)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ export class WeatherSourceProvider implements FeedSourceProvider {
|
|||||||
this.options = options
|
this.options = options
|
||||||
}
|
}
|
||||||
|
|
||||||
feedSourceForUser(_userId: string): WeatherSource {
|
async feedSourceForUser(_userId: string): Promise<WeatherSource> {
|
||||||
return new WeatherSource(this.options)
|
return new WeatherSource(this.options)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user