feat(session): query enabled sources before providers (#85)

UserSessionManager now queries the user_sources table for enabled
sources before calling any provider. Providers receive the per-user
JSON config directly instead of querying the DB themselves, removing
their db dependency and eliminating redundant round-trips.

Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
2026-03-22 16:28:19 +00:00
committed by GitHub
parent b24d879d31
commit a6be7b31e7
13 changed files with 368 additions and 200 deletions

View File

@@ -1,11 +1,14 @@
import type { FeedSource } from "@aelis/core"
import type { Database } from "../db/index.ts"
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
import type { FeedSourceProvider } from "./feed-source-provider.ts"
import { sources } from "../sources/user-sources.ts"
import { UserSession } from "./user-session.ts"
export interface UserSessionManagerConfig {
db: Database
providers: FeedSourceProvider[]
feedEnhancer?: FeedEnhancer | null
}
@@ -13,10 +16,12 @@ export interface UserSessionManagerConfig {
export class UserSessionManager {
private sessions = new Map<string, UserSession>()
private pending = new Map<string, Promise<UserSession>>()
private readonly db: Database
private readonly providers = new Map<string, FeedSourceProvider>()
private readonly feedEnhancer: FeedEnhancer | null
constructor(config: UserSessionManagerConfig) {
this.db = config.db
for (const provider of config.providers) {
this.providers.set(provider.sourceId, provider)
}
@@ -64,8 +69,9 @@ export class UserSessionManager {
/**
* Replaces a provider and updates all active sessions.
* The new provider must have the same sourceId as an existing one.
* For each active session, re-resolves the source via session.refreshSource.
* If the provider fails for a user, the existing source is kept.
* For each active session, queries the user's source config from the DB
* and re-resolves the source. If the provider fails for a user, the
* existing source is kept.
*/
async replaceProvider(provider: FeedSourceProvider): Promise<void> {
if (!this.providers.has(provider.sourceId)) {
@@ -79,7 +85,7 @@ export class UserSessionManager {
const updates: Promise<void>[] = []
for (const [, session] of this.sessions) {
updates.push(session.refreshSource(provider))
updates.push(this.refreshSessionSource(session, provider))
}
// Also update sessions that are currently being created so they
@@ -87,7 +93,7 @@ export class UserSessionManager {
for (const [, pendingPromise] of this.pending) {
updates.push(
pendingPromise
.then((session) => session.refreshSource(provider))
.then((session) => this.refreshSessionSource(session, provider))
.catch(() => {
// Session creation itself failed — nothing to update.
}),
@@ -97,23 +103,60 @@ export class UserSessionManager {
await Promise.all(updates)
}
private async createSession(userId: string): Promise<UserSession> {
const results = await Promise.allSettled(
Array.from(this.providers.values()).map((p) => p.feedSourceForUser(userId)),
)
/**
* Re-resolves a single source for a session by querying the user's config
* from the DB and calling the provider. If the provider fails, the existing
* source is kept.
*/
private async refreshSessionSource(
session: UserSession,
provider: FeedSourceProvider,
): Promise<void> {
if (!session.hasSource(provider.sourceId)) return
const sources: FeedSource[] = []
try {
const row = await sources(this.db, session.userId).find(provider.sourceId)
if (!row?.enabled) return
const newSource = await provider.feedSourceForUser(session.userId, row.config ?? {})
session.replaceSource(provider.sourceId, newSource)
} catch (err) {
console.error(
`[UserSessionManager] refreshSource("${provider.sourceId}") failed for user ${session.userId}:`,
err,
)
}
}
private async createSession(userId: string): Promise<UserSession> {
const enabledRows = await sources(this.db, userId).enabled()
const promises: Promise<FeedSource>[] = []
for (const row of enabledRows) {
const provider = this.providers.get(row.sourceId)
if (provider) {
promises.push(provider.feedSourceForUser(userId, row.config ?? {}))
}
}
if (promises.length === 0) {
return new UserSession(userId, [], this.feedEnhancer)
}
const results = await Promise.allSettled(promises)
const feedSources: FeedSource[] = []
const errors: unknown[] = []
for (const result of results) {
if (result.status === "fulfilled") {
sources.push(result.value)
feedSources.push(result.value)
} else {
errors.push(result.reason)
}
}
if (sources.length === 0 && errors.length > 0) {
if (feedSources.length === 0 && errors.length > 0) {
throw new AggregateError(errors, "All feed source providers failed")
}
@@ -121,6 +164,6 @@ export class UserSessionManager {
console.error("[UserSessionManager] Feed source provider failed:", error)
}
return new UserSession(userId, sources, this.feedEnhancer)
return new UserSession(userId, feedSources, this.feedEnhancer)
}
}