feat(backend): make FeedSourceProvider async (#77)

* feat(backend): make FeedSourceProvider async

Make feedSourceForUser and FeedSourceProviderFn return promises.
Use Promise.allSettled to tolerate partial provider failures.
Guard concurrent getOrCreate calls with in-flight promise dedup.
Return 503 from HTTP handlers when session creation fails.

Co-authored-by: Ona <no-reply@ona.com>

* fix(backend): handle remove() during in-flight session creation

Cancel pending getOrCreate when remove() is called mid-flight.
Destroy the resulting session to prevent it from leaking.

Co-authored-by: Ona <no-reply@ona.com>

---------

Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
2026-03-15 22:57:19 +00:00
committed by GitHub
parent 8eedd1f4fd
commit 0b51b97f6c
9 changed files with 255 additions and 76 deletions

View File

@@ -1,9 +1,9 @@
import type { FeedSource } from "@aelis/core"
export interface FeedSourceProvider {
feedSourceForUser(userId: string): FeedSource
feedSourceForUser(userId: string): Promise<FeedSource>
}
export type FeedSourceProviderFn = (userId: string) => FeedSource
export type FeedSourceProviderFn = (userId: string) => Promise<FeedSource>
export type FeedSourceProviderInput = FeedSourceProvider | FeedSourceProviderFn

View File

@@ -1,7 +1,7 @@
import type { WeatherKitClient, WeatherKitResponse } from "@aelis/source-weatherkit"
import { LocationSource } from "@aelis/source-location"
import { describe, expect, mock, test } from "bun:test"
import { describe, expect, mock, spyOn, test } from "bun:test"
import { WeatherSourceProvider } from "../weather/provider.ts"
import { UserSessionManager } from "./user-session-manager.ts"
@@ -11,38 +11,38 @@ const mockWeatherClient: WeatherKitClient = {
}
describe("UserSessionManager", () => {
test("getOrCreate creates session on first call", () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
test("getOrCreate creates session on first call", async () => {
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const session = manager.getOrCreate("user-1")
const session = await manager.getOrCreate("user-1")
expect(session).toBeDefined()
expect(session.engine).toBeDefined()
})
test("getOrCreate returns same session for same user", () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
test("getOrCreate returns same session for same user", async () => {
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const session1 = manager.getOrCreate("user-1")
const session2 = manager.getOrCreate("user-1")
const session1 = await manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-1")
expect(session1).toBe(session2)
})
test("getOrCreate returns different sessions for different users", () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
test("getOrCreate returns different sessions for different users", async () => {
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const session1 = manager.getOrCreate("user-1")
const session2 = manager.getOrCreate("user-2")
const session1 = await manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-2")
expect(session1).not.toBe(session2)
})
test("each user gets independent source instances", () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
test("each user gets independent source instances", async () => {
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const session1 = manager.getOrCreate("user-1")
const session2 = manager.getOrCreate("user-2")
const session1 = await manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-2")
const source1 = session1.getSource<LocationSource>("aelis.location")
const source2 = session2.getSource<LocationSource>("aelis.location")
@@ -50,58 +50,58 @@ describe("UserSessionManager", () => {
expect(source1).not.toBe(source2)
})
test("remove destroys session and allows re-creation", () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
test("remove destroys session and allows re-creation", async () => {
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const session1 = manager.getOrCreate("user-1")
const session1 = await manager.getOrCreate("user-1")
manager.remove("user-1")
const session2 = manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-1")
expect(session1).not.toBe(session2)
})
test("remove is no-op for unknown user", () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
expect(() => manager.remove("unknown")).not.toThrow()
})
test("accepts function providers", async () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const session = manager.getOrCreate("user-1")
const session = await manager.getOrCreate("user-1")
const result = await session.engine.refresh()
expect(result.errors).toHaveLength(0)
})
test("accepts object providers", () => {
test("accepts object providers", async () => {
const provider = new WeatherSourceProvider({ client: mockWeatherClient })
const manager = new UserSessionManager({
providers: [() => new LocationSource(), provider],
providers: [async () => new LocationSource(), provider],
})
const session = manager.getOrCreate("user-1")
const session = await manager.getOrCreate("user-1")
expect(session.getSource("aelis.weather")).toBeDefined()
})
test("accepts mixed providers", () => {
test("accepts mixed providers", async () => {
const provider = new WeatherSourceProvider({ client: mockWeatherClient })
const manager = new UserSessionManager({
providers: [() => new LocationSource(), provider],
providers: [async () => new LocationSource(), provider],
})
const session = manager.getOrCreate("user-1")
const session = await manager.getOrCreate("user-1")
expect(session.getSource("aelis.location")).toBeDefined()
expect(session.getSource("aelis.weather")).toBeDefined()
})
test("refresh returns feed result through session", async () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const session = manager.getOrCreate("user-1")
const session = await manager.getOrCreate("user-1")
const result = await session.engine.refresh()
expect(result).toHaveProperty("context")
@@ -111,9 +111,9 @@ describe("UserSessionManager", () => {
})
test("location update via executeAction works", async () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const session = manager.getOrCreate("user-1")
const session = await manager.getOrCreate("user-1")
await session.engine.executeAction("aelis.location", "update-location", {
lat: 51.5074,
lng: -0.1278,
@@ -126,10 +126,10 @@ describe("UserSessionManager", () => {
})
test("subscribe receives updates after location push", async () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const callback = mock()
const session = manager.getOrCreate("user-1")
const session = await manager.getOrCreate("user-1")
session.engine.subscribe(callback)
await session.engine.executeAction("aelis.location", "update-location", {
@@ -146,16 +146,16 @@ describe("UserSessionManager", () => {
})
test("remove stops reactive updates", async () => {
const manager = new UserSessionManager({ providers: [() => new LocationSource()] })
const manager = new UserSessionManager({ providers: [async () => new LocationSource()] })
const callback = mock()
const session = manager.getOrCreate("user-1")
const session = await manager.getOrCreate("user-1")
session.engine.subscribe(callback)
manager.remove("user-1")
// Create new session and push location — old callback should not fire
const session2 = manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-1")
await session2.engine.executeAction("aelis.location", "update-location", {
lat: 51.5074,
lng: -0.1278,
@@ -167,4 +167,93 @@ describe("UserSessionManager", () => {
expect(callback).not.toHaveBeenCalled()
})
test("creates session with successful providers when some fail", async () => {
const manager = new UserSessionManager({
providers: [
async () => new LocationSource(),
async () => {
throw new Error("provider failed")
},
],
})
const spy = spyOn(console, "error").mockImplementation(() => {})
const session = await manager.getOrCreate("user-1")
expect(session).toBeDefined()
expect(session.getSource("aelis.location")).toBeDefined()
expect(spy).toHaveBeenCalled()
spy.mockRestore()
})
test("throws AggregateError when all providers fail", async () => {
const manager = new UserSessionManager({
providers: [
async () => {
throw new Error("first failed")
},
async () => {
throw new Error("second failed")
},
],
})
await expect(manager.getOrCreate("user-1")).rejects.toBeInstanceOf(AggregateError)
})
test("concurrent getOrCreate for same user returns same session", async () => {
let callCount = 0
const manager = new UserSessionManager({
providers: [
async () => {
callCount++
// Simulate async work to widen the race window
await new Promise((resolve) => setTimeout(resolve, 10))
return new LocationSource()
},
],
})
const [session1, session2] = await Promise.all([
manager.getOrCreate("user-1"),
manager.getOrCreate("user-1"),
])
expect(session1).toBe(session2)
expect(callCount).toBe(1)
})
test("remove during in-flight getOrCreate prevents session from being stored", async () => {
let resolveProvider: () => void
const providerGate = new Promise<void>((r) => {
resolveProvider = r
})
const manager = new UserSessionManager({
providers: [
async () => {
await providerGate
return new LocationSource()
},
],
})
const sessionPromise = manager.getOrCreate("user-1")
// remove() while provider is still resolving
manager.remove("user-1")
// Let the provider finish
resolveProvider!()
await expect(sessionPromise).rejects.toThrow("removed during creation")
// A fresh getOrCreate should produce a new session, not the cancelled one
const freshSession = await manager.getOrCreate("user-1")
expect(freshSession).toBeDefined()
expect(freshSession.engine).toBeDefined()
})
})

View File

@@ -1,3 +1,5 @@
import type { FeedSource } from "@aelis/core"
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
import type { FeedSourceProviderInput } from "./feed-source-provider.ts"
@@ -10,6 +12,7 @@ export interface UserSessionManagerConfig {
export class UserSessionManager {
private sessions = new Map<string, UserSession>()
private pending = new Map<string, Promise<UserSession>>()
private readonly providers: FeedSourceProviderInput[]
private readonly feedEnhancer: FeedEnhancer | null
@@ -18,16 +21,28 @@ export class UserSessionManager {
this.feedEnhancer = config.feedEnhancer ?? null
}
getOrCreate(userId: string): UserSession {
let session = this.sessions.get(userId)
if (!session) {
const sources = this.providers.map((p) =>
typeof p === "function" ? p(userId) : p.feedSourceForUser(userId),
)
session = new UserSession(sources, this.feedEnhancer)
async getOrCreate(userId: string): Promise<UserSession> {
const existing = this.sessions.get(userId)
if (existing) return existing
const inflight = this.pending.get(userId)
if (inflight) return inflight
const promise = this.createSession(userId)
this.pending.set(userId, promise)
try {
const session = await promise
// If remove() was called while we were awaiting, it clears the
// pending entry. Detect that and destroy the session immediately.
if (!this.pending.has(userId)) {
session.destroy()
throw new Error(`Session for user ${userId} was removed during creation`)
}
this.sessions.set(userId, session)
return session
} finally {
this.pending.delete(userId)
}
return session
}
remove(userId: string): void {
@@ -36,5 +51,36 @@ export class UserSessionManager {
session.destroy()
this.sessions.delete(userId)
}
// Cancel any in-flight creation so getOrCreate won't store the session
this.pending.delete(userId)
}
private async createSession(userId: string): Promise<UserSession> {
const results = await Promise.allSettled(
this.providers.map((p) =>
typeof p === "function" ? p(userId) : p.feedSourceForUser(userId),
),
)
const sources: FeedSource[] = []
const errors: unknown[] = []
for (const result of results) {
if (result.status === "fulfilled") {
sources.push(result.value)
} else {
errors.push(result.reason)
}
}
if (sources.length === 0 && errors.length > 0) {
throw new AggregateError(errors, "All feed source providers failed")
}
for (const error of errors) {
console.error("[UserSessionManager] Feed source provider failed:", error)
}
return new UserSession(sources, this.feedEnhancer)
}
}