Compare commits

..

8 Commits

Author SHA1 Message Date
682df6a573 fix(backend): disable reasoning and fallback to reasoning field
Set reasoning effort to none in the LLM client to reduce latency
and token usage. Fall back to the reasoning field when content is
absent in the response.

Co-authored-by: Ona <no-reply@ona.com>
2026-03-22 22:46:03 +00:00
a52addebd8 feat(backend): add GET /api/sources/:sourceId (#89)
Return { enabled, config } for a user's source. Defaults to
{ enabled: false, config: {} } when no row exists.

Co-authored-by: Ona <no-reply@ona.com>
2026-03-22 21:45:17 +00:00
4cef7f2ea1 feat(backend): add PUT /api/sources/:sourceId (#87)
Add a PUT endpoint that inserts or fully replaces a user's source
config. Unlike PATCH (which deep-merges and requires an existing row),
PUT requires both `enabled` and `config`, performs an upsert via
INSERT ... ON CONFLICT DO UPDATE, and replaces config entirely.

- Add `upsertConfig` to user-sources data layer
- Add `upsertSourceConfig` to UserSessionManager
- Add `addSource` to UserSession for new source registration
- 12 new tests covering insert, replace, validation, and session refresh

Co-authored-by: Ona <no-reply@ona.com>
2026-03-22 18:37:40 +00:00
dd2b37938f feat(backend): add PATCH /api/sources/:sourceId (#86)
Add endpoint for users to update their source config
and enabled state. Config is deep-merged with existing
values via lodash.merge and validated against the
provider's schema before persisting.

Co-authored-by: Ona <no-reply@ona.com>
2026-03-22 17:57:54 +00:00
a6be7b31e7 feat(session): query enabled sources before providers (#85)
UserSessionManager now queries the user_sources table for enabled
sources before calling any provider. Providers receive the per-user
JSON config directly instead of querying the DB themselves, removing
their db dependency and eliminating redundant round-trips.

Co-authored-by: Ona <no-reply@ona.com>
2026-03-22 16:28:19 +00:00
b24d879d31 feat(session): add per-user source refresh (#84)
* feat(session): add per-user source refresh

Add refreshSource(provider) to UserSession so per-user
config changes can re-resolve a source without replacing
the global provider.

- UserSession now carries userId
- Simplify UserSessionManager sessions map
- replaceProvider delegates to session.refreshSource
- Remove updateSessionSource from manager

Co-authored-by: Ona <no-reply@ona.com>

* docs: fix stale jsdoc on provider failure behavior

Co-authored-by: Ona <no-reply@ona.com>

---------

Co-authored-by: Ona <no-reply@ona.com>
2026-03-22 00:13:22 +00:00
7862a6d367 feat(backend): add admin API with provider config endpoint (#83)
* feat(backend): add admin API with provider config endpoint

Add /api/admin/* route group with admin role middleware and a
PUT /api/admin/:sourceId/config endpoint for updating feed source
provider config at runtime. Currently supports aelis.weather.

Co-authored-by: Ona <no-reply@ona.com>

* test: remove weak active session test

Co-authored-by: Ona <no-reply@ona.com>

---------

Co-authored-by: Ona <no-reply@ona.com>
2026-03-21 19:01:43 +00:00
0095d9cd72 feat: runtime provider hotswap (#82)
Add ability to replace a FeedSourceProvider at runtime and propagate
the new source to all active (and pending) user sessions, invalidating
their feed caches.

Co-authored-by: Ona <no-reply@ona.com>
2026-03-19 23:32:29 +00:00
21 changed files with 1792 additions and 146 deletions

View File

@@ -25,9 +25,11 @@
"arktype": "^2.1.29", "arktype": "^2.1.29",
"better-auth": "^1", "better-auth": "^1",
"drizzle-orm": "^0.45.1", "drizzle-orm": "^0.45.1",
"hono": "^4" "hono": "^4",
"lodash.merge": "^4.6.2"
}, },
"devDependencies": { "devDependencies": {
"@types/lodash.merge": "^4.6.9",
"drizzle-kit": "^0.31.9" "drizzle-kit": "^0.31.9"
} }
} }

View File

@@ -0,0 +1,195 @@
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core"
import { describe, expect, mock, test } from "bun:test"
import { Hono } from "hono"
import type { AdminMiddleware } from "../auth/admin-middleware.ts"
import type { AuthSession, AuthUser } from "../auth/session.ts"
import type { Database } from "../db/index.ts"
import type { FeedSourceProvider } from "../session/feed-source-provider.ts"
import { UserSessionManager } from "../session/user-session-manager.ts"
import { registerAdminHttpHandlers } from "./http.ts"
let mockEnabledSourceIds: string[] = []
mock.module("../sources/user-sources.ts", () => ({
sources: (_db: Database, _userId: string) => ({
async enabled() {
const now = new Date()
return mockEnabledSourceIds.map((sourceId) => ({
id: crypto.randomUUID(),
userId: _userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}))
},
async find(sourceId: string) {
const now = new Date()
return {
id: crypto.randomUUID(),
userId: _userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}
},
}),
}))
function createStubSource(id: string): FeedSource {
return {
id,
async listActions(): Promise<Record<string, ActionDefinition>> {
return {}
},
async executeAction(): Promise<unknown> {
return undefined
},
async fetchContext(): Promise<readonly ContextEntry[] | null> {
return null
},
async fetchItems(): Promise<FeedItem[]> {
return []
},
}
}
function createStubProvider(sourceId: string): FeedSourceProvider {
return {
sourceId,
async feedSourceForUser() {
return createStubSource(sourceId)
},
}
}
/** Passthrough admin middleware for testing (assumes admin). */
function passthroughAdminMiddleware(): AdminMiddleware {
const now = new Date()
return async (c, next) => {
c.set("user", {
id: "admin-1",
name: "Admin",
email: "admin@test.com",
emailVerified: true,
image: null,
createdAt: now,
updatedAt: now,
role: "admin",
banned: false,
banReason: null,
banExpires: null,
} as AuthUser)
c.set("session", { id: "sess-1" } as AuthSession)
await next()
}
}
const fakeDb = {} as Database
function createApp(providers: FeedSourceProvider[]) {
mockEnabledSourceIds = providers.map((p) => p.sourceId)
const sessionManager = new UserSessionManager({ db: fakeDb, providers })
const app = new Hono()
registerAdminHttpHandlers(app, {
sessionManager,
adminMiddleware: passthroughAdminMiddleware(),
db: fakeDb,
})
return { app, sessionManager }
}
const validWeatherConfig = {
credentials: {
privateKey: "pk-123",
keyId: "key-456",
teamId: "team-789",
serviceId: "svc-abc",
},
}
describe("PUT /api/admin/:sourceId/config", () => {
test("returns 404 for unknown provider", async () => {
const { app } = createApp([createStubProvider("aelis.location")])
const res = await app.request("/api/admin/aelis.nonexistent/config", {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ key: "value" }),
})
expect(res.status).toBe(404)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not found")
})
test("returns 404 for provider without runtime config support", async () => {
const { app } = createApp([createStubProvider("aelis.location")])
const res = await app.request("/api/admin/aelis.location/config", {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ key: "value" }),
})
expect(res.status).toBe(404)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not found")
})
test("returns 400 for invalid JSON body", async () => {
const { app } = createApp([createStubProvider("aelis.weather")])
const res = await app.request("/api/admin/aelis.weather/config", {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: "not json",
})
expect(res.status).toBe(400)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("Invalid JSON")
})
test("returns 400 when weather config fails validation", async () => {
const { app } = createApp([createStubProvider("aelis.weather")])
const res = await app.request("/api/admin/aelis.weather/config", {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ credentials: { privateKey: 123 } }),
})
expect(res.status).toBe(400)
const body = (await res.json()) as { error: string }
expect(body.error).toBeDefined()
})
test("returns 204 and applies valid weather config", async () => {
const { app, sessionManager } = createApp([createStubProvider("aelis.weather")])
const originalProvider = sessionManager.getProvider("aelis.weather")
const res = await app.request("/api/admin/aelis.weather/config", {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(validWeatherConfig),
})
expect(res.status).toBe(204)
// Provider was replaced with a new instance
const provider = sessionManager.getProvider("aelis.weather")
expect(provider).toBeDefined()
expect(provider!.sourceId).toBe("aelis.weather")
expect(provider).not.toBe(originalProvider)
})
})

View File

@@ -0,0 +1,86 @@
import type { Context, Hono } from "hono"
import { type } from "arktype"
import { createMiddleware } from "hono/factory"
import type { AdminMiddleware } from "../auth/admin-middleware.ts"
import type { Database } from "../db/index.ts"
import type { UserSessionManager } from "../session/index.ts"
import { WeatherSourceProvider } from "../weather/provider.ts"
type Env = {
Variables: {
sessionManager: UserSessionManager
db: Database
}
}
interface AdminHttpHandlersDeps {
sessionManager: UserSessionManager
adminMiddleware: AdminMiddleware
db: Database
}
export function registerAdminHttpHandlers(
app: Hono,
{ sessionManager, adminMiddleware, db }: AdminHttpHandlersDeps,
) {
const inject = createMiddleware<Env>(async (c, next) => {
c.set("sessionManager", sessionManager)
c.set("db", db)
await next()
})
app.put("/api/admin/:sourceId/config", inject, adminMiddleware, handleUpdateProviderConfig)
}
const WeatherKitSourceProviderConfig = type({
credentials: {
privateKey: "string",
keyId: "string",
teamId: "string",
serviceId: "string",
},
})
async function handleUpdateProviderConfig(c: Context<Env>) {
const sourceId = c.req.param("sourceId")
if (!sourceId) {
return c.body(null, 404)
}
const sessionManager = c.get("sessionManager")
let body: unknown
try {
body = await c.req.json()
} catch {
return c.json({ error: "Invalid JSON" }, 400)
}
switch (sourceId) {
case "aelis.weather": {
const parsed = WeatherKitSourceProviderConfig(body)
if (parsed instanceof type.errors) {
return c.json({ error: parsed.summary }, 400)
}
const updated = new WeatherSourceProvider({
credentials: parsed.credentials,
})
try {
await sessionManager.replaceProvider(updated)
} catch (err) {
console.error(`[admin] replaceProvider("${sourceId}") failed:`, err)
return c.json({ error: "Failed to apply config" }, 500)
}
return c.body(null, 204)
}
default:
return c.json({ error: `Provider "${sourceId}" not found` }, 404)
}
}

View File

@@ -0,0 +1,95 @@
import { Hono } from "hono"
import { describe, expect, test } from "bun:test"
import type { Auth } from "./index.ts"
import type { AuthSession, AuthUser } from "./session.ts"
import { createRequireAdmin } from "./admin-middleware.ts"
function makeUser(role: string | null): AuthUser {
const now = new Date()
return {
id: "user-1",
name: "Test User",
email: "test@example.com",
emailVerified: true,
image: null,
createdAt: now,
updatedAt: now,
role,
banned: false,
banReason: null,
banExpires: null,
}
}
function makeSession(): AuthSession {
const now = new Date()
return {
id: "sess-1",
userId: "user-1",
token: "tok-1",
expiresAt: new Date(now.getTime() + 7 * 24 * 60 * 60 * 1000),
ipAddress: "127.0.0.1",
userAgent: "test",
createdAt: now,
updatedAt: now,
}
}
function mockAuth(sessionResult: { user: AuthUser; session: AuthSession } | null): Auth {
return {
api: {
getSession: async () => sessionResult,
},
} as unknown as Auth
}
function createApp(auth: Auth) {
const app = new Hono()
const middleware = createRequireAdmin(auth)
app.get("/api/admin/test", middleware, (c) => c.json({ ok: true }))
return app
}
describe("createRequireAdmin", () => {
test("returns 401 when no session", async () => {
const app = createApp(mockAuth(null))
const res = await app.request("/api/admin/test")
expect(res.status).toBe(401)
const body = (await res.json()) as { error: string }
expect(body.error).toBe("Unauthorized")
})
test("returns 403 when user is not admin", async () => {
const app = createApp(mockAuth({ user: makeUser("user"), session: makeSession() }))
const res = await app.request("/api/admin/test")
expect(res.status).toBe(403)
const body = (await res.json()) as { error: string }
expect(body.error).toBe("Forbidden")
})
test("returns 403 when role is null", async () => {
const app = createApp(mockAuth({ user: makeUser(null), session: makeSession() }))
const res = await app.request("/api/admin/test")
expect(res.status).toBe(403)
})
test("allows admin users through and sets context", async () => {
const user = makeUser("admin")
const session = makeSession()
const app = createApp(mockAuth({ user, session }))
const res = await app.request("/api/admin/test")
expect(res.status).toBe(200)
const body = (await res.json()) as { ok: boolean }
expect(body.ok).toBe(true)
})
})

View File

@@ -0,0 +1,28 @@
import type { Context, MiddlewareHandler, Next } from "hono"
import type { Auth } from "./index.ts"
import type { AuthSessionEnv } from "./session-middleware.ts"
export type AdminMiddleware = MiddlewareHandler<AuthSessionEnv>
/**
* Creates a middleware that requires a valid session with admin role.
* Returns 401 if not authenticated, 403 if not admin.
*/
export function createRequireAdmin(auth: Auth): AdminMiddleware {
return async (c: Context, next: Next): Promise<Response | void> => {
const session = await auth.api.getSession({ headers: c.req.raw.headers })
if (!session) {
return c.json({ error: "Unauthorized" }, 401)
}
if (session.user.role !== "admin") {
return c.json({ error: "Forbidden" }, 403)
}
c.set("user", session.user)
c.set("session", session.session)
await next()
}
}

View File

@@ -1,9 +1,11 @@
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core" import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core"
import { contextKey } from "@aelis/core" import { contextKey } from "@aelis/core"
import { describe, expect, spyOn, test } from "bun:test" import { describe, expect, mock, spyOn, test } from "bun:test"
import { Hono } from "hono" import { Hono } from "hono"
import type { Database } from "../db/index.ts"
import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts" import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts"
import { UserSessionManager } from "../session/index.ts" import { UserSessionManager } from "../session/index.ts"
import { registerFeedHttpHandlers } from "./http.ts" import { registerFeedHttpHandlers } from "./http.ts"
@@ -50,9 +52,45 @@ function buildTestApp(sessionManager: UserSessionManager, userId?: string) {
return app return app
} }
let mockEnabledSourceIds: string[] = []
mock.module("../sources/user-sources.ts", () => ({
sources: (_db: Database, _userId: string) => ({
async enabled() {
const now = new Date()
return mockEnabledSourceIds.map((sourceId) => ({
id: crypto.randomUUID(),
userId: _userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}))
},
async find(sourceId: string) {
const now = new Date()
return {
id: crypto.randomUUID(),
userId: _userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}
},
}),
}))
const fakeDb = {} as Database
describe("GET /api/feed", () => { describe("GET /api/feed", () => {
test("returns 401 without auth", async () => { test("returns 401 without auth", async () => {
const manager = new UserSessionManager({ providers: [] }) mockEnabledSourceIds = []
const manager = new UserSessionManager({ db: fakeDb, providers: [] })
const app = buildTestApp(manager) const app = buildTestApp(manager)
const res = await app.request("/api/feed") const res = await app.request("/api/feed")
@@ -71,7 +109,9 @@ describe("GET /api/feed", () => {
data: { value: 42 }, data: { value: 42 },
}, },
] ]
mockEnabledSourceIds = ["test"]
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [ providers: [
{ {
sourceId: "test", sourceId: "test",
@@ -111,7 +151,9 @@ describe("GET /api/feed", () => {
data: { fresh: true }, data: { fresh: true },
}, },
] ]
mockEnabledSourceIds = ["test"]
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [ providers: [
{ {
sourceId: "test", sourceId: "test",
@@ -150,7 +192,9 @@ describe("GET /api/feed", () => {
throw new Error("connection timeout") throw new Error("connection timeout")
}, },
} }
mockEnabledSourceIds = ["failing"]
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [ providers: [
{ {
sourceId: "failing", sourceId: "failing",
@@ -173,7 +217,9 @@ describe("GET /api/feed", () => {
}) })
test("returns 503 when all providers fail", async () => { test("returns 503 when all providers fail", async () => {
mockEnabledSourceIds = ["test"]
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [ providers: [
{ {
sourceId: "test", sourceId: "test",
@@ -206,7 +252,9 @@ describe("GET /api/context", () => {
const mockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn" const mockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn"
async function buildContextApp(userId?: string) { async function buildContextApp(userId?: string) {
mockEnabledSourceIds = ["weather"]
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [ providers: [
{ {
sourceId: "weather", sourceId: "weather",
@@ -222,7 +270,8 @@ describe("GET /api/context", () => {
} }
test("returns 401 without auth", async () => { test("returns 401 without auth", async () => {
const manager = new UserSessionManager({ providers: [] }) mockEnabledSourceIds = []
const manager = new UserSessionManager({ db: fakeDb, providers: [] })
const app = buildTestApp(manager) const app = buildTestApp(manager)
const res = await app.request('/api/context?key=["aelis.weather","weather"]') const res = await app.request('/api/context?key=["aelis.weather","weather"]')

View File

@@ -50,11 +50,13 @@ export function createLlmClient(config: LlmClientConfig): LlmClient {
schema: enhancementResultJsonSchema, schema: enhancementResultJsonSchema,
}, },
}, },
reasoning: { effort: "none" },
stream: false, stream: false,
}, },
}) })
const content = response.choices?.[0]?.message?.content const message = response.choices?.[0]?.message
const content = message?.content ?? message?.reasoning
if (typeof content !== "string") { if (typeof content !== "string") {
console.warn("[enhancement] LLM returned no content in response") console.warn("[enhancement] LLM returned no content in response")
return null return null

View File

@@ -1,26 +1,11 @@
import { LocationSource } from "@aelis/source-location" import { LocationSource } from "@aelis/source-location"
import type { Database } from "../db/index.ts"
import type { FeedSourceProvider } from "../session/feed-source-provider.ts" import type { FeedSourceProvider } from "../session/feed-source-provider.ts"
import { SourceDisabledError } from "../sources/errors.ts"
import { sources } from "../sources/user-sources.ts"
export class LocationSourceProvider implements FeedSourceProvider { export class LocationSourceProvider implements FeedSourceProvider {
readonly sourceId = "aelis.location" readonly sourceId = "aelis.location"
private readonly db: Database
constructor(db: Database) {
this.db = db
}
async feedSourceForUser(userId: string): Promise<LocationSource> {
const row = await sources(this.db, userId).find("aelis.location")
if (!row || !row.enabled) {
throw new SourceDisabledError("aelis.location", userId)
}
async feedSourceForUser(_userId: string, _config: unknown): Promise<LocationSource> {
return new LocationSource() return new LocationSource()
} }
} }

View File

@@ -1,5 +1,7 @@
import { Hono } from "hono" import { Hono } from "hono"
import { registerAdminHttpHandlers } from "./admin/http.ts"
import { createRequireAdmin } from "./auth/admin-middleware.ts"
import { registerAuthHandlers } from "./auth/http.ts" import { registerAuthHandlers } from "./auth/http.ts"
import { createAuth } from "./auth/index.ts" import { createAuth } from "./auth/index.ts"
import { createRequireSession } from "./auth/session-middleware.ts" import { createRequireSession } from "./auth/session-middleware.ts"
@@ -10,6 +12,7 @@ import { createLlmClient } from "./enhancement/llm-client.ts"
import { registerLocationHttpHandlers } from "./location/http.ts" import { registerLocationHttpHandlers } from "./location/http.ts"
import { LocationSourceProvider } from "./location/provider.ts" import { LocationSourceProvider } from "./location/provider.ts"
import { UserSessionManager } from "./session/index.ts" import { UserSessionManager } from "./session/index.ts"
import { registerSourcesHttpHandlers } from "./sources/http.ts"
import { WeatherSourceProvider } from "./weather/provider.ts" import { WeatherSourceProvider } from "./weather/provider.ts"
function main() { function main() {
@@ -30,10 +33,10 @@ function main() {
} }
const sessionManager = new UserSessionManager({ const sessionManager = new UserSessionManager({
db,
providers: [ providers: [
new LocationSourceProvider(db), new LocationSourceProvider(),
new WeatherSourceProvider({ new WeatherSourceProvider({
db,
credentials: { credentials: {
privateKey: process.env.WEATHERKIT_PRIVATE_KEY!, privateKey: process.env.WEATHERKIT_PRIVATE_KEY!,
keyId: process.env.WEATHERKIT_KEY_ID!, keyId: process.env.WEATHERKIT_KEY_ID!,
@@ -50,6 +53,7 @@ function main() {
app.get("/health", (c) => c.json({ status: "ok" })) app.get("/health", (c) => c.json({ status: "ok" }))
const authSessionMiddleware = createRequireSession(auth) const authSessionMiddleware = createRequireSession(auth)
const adminMiddleware = createRequireAdmin(auth)
registerAuthHandlers(app, auth) registerAuthHandlers(app, auth)
@@ -58,6 +62,8 @@ function main() {
authSessionMiddleware, authSessionMiddleware,
}) })
registerLocationHttpHandlers(app, { sessionManager, authSessionMiddleware }) registerLocationHttpHandlers(app, { sessionManager, authSessionMiddleware })
registerSourcesHttpHandlers(app, { sessionManager, authSessionMiddleware })
registerAdminHttpHandlers(app, { sessionManager, adminMiddleware, db })
process.on("SIGTERM", async () => { process.on("SIGTERM", async () => {
await closeDb() await closeDb()

View File

@@ -1,7 +1,12 @@
import type { FeedSource } from "@aelis/core" import type { FeedSource } from "@aelis/core"
import type { type } from "arktype"
export type ConfigSchema = ReturnType<typeof type>
export interface FeedSourceProvider { export interface FeedSourceProvider {
/** The source ID this provider is responsible for (e.g., "aelis.location"). */ /** The source ID this provider is responsible for (e.g., "aelis.location"). */
readonly sourceId: string readonly sourceId: string
feedSourceForUser(userId: string): Promise<FeedSource> /** Arktype schema for validating user-provided config. Omit if the source has no config. */
readonly configSchema?: ConfigSchema
feedSourceForUser(userId: string, config: unknown): Promise<FeedSource>
} }

View File

@@ -2,12 +2,77 @@ import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aeli
import { LocationSource } from "@aelis/source-location" import { LocationSource } from "@aelis/source-location"
import { WeatherSource } from "@aelis/source-weatherkit" import { WeatherSource } from "@aelis/source-weatherkit"
import { describe, expect, mock, spyOn, test } from "bun:test" import { beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
import type { Database } from "../db/index.ts"
import type { FeedSourceProvider } from "./feed-source-provider.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts"
import { UserSessionManager } from "./user-session-manager.ts" import { UserSessionManager } from "./user-session-manager.ts"
/**
* Per-user enabled source IDs used by the mocked `sources` module.
* Tests configure this before calling getOrCreate.
* Key = userId (or "*" for a default), value = array of enabled sourceIds.
*/
const enabledByUser = new Map<string, string[]>()
/** Set which sourceIds are enabled for all users. */
function setEnabledSources(sourceIds: string[]) {
enabledByUser.clear()
enabledByUser.set("*", sourceIds)
}
/** Set which sourceIds are enabled for a specific user. */
function setEnabledSourcesForUser(userId: string, sourceIds: string[]) {
enabledByUser.set(userId, sourceIds)
}
function getEnabledSourceIds(userId: string): string[] {
return enabledByUser.get(userId) ?? enabledByUser.get("*") ?? []
}
/**
* Controls what `find()` returns in the mock. When `undefined` (the default),
* `find()` returns a standard enabled row. Set to a specific value (including
* `null`) to override the return value for all `find()` calls.
*/
let mockFindResult: unknown | undefined
// Mock the sources module so UserSessionManager's DB query returns controlled data.
mock.module("../sources/user-sources.ts", () => ({
sources: (_db: Database, userId: string) => ({
async enabled() {
const now = new Date()
return getEnabledSourceIds(userId).map((sourceId) => ({
id: crypto.randomUUID(),
userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}))
},
async find(sourceId: string) {
if (mockFindResult !== undefined) return mockFindResult
const now = new Date()
return {
id: crypto.randomUUID(),
userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}
},
}),
}))
const fakeDb = {} as Database
function createStubSource(id: string, items: FeedItem[] = []): FeedSource { function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
return { return {
id, id,
@@ -28,7 +93,8 @@ function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
function createStubProvider( function createStubProvider(
sourceId: string, sourceId: string,
factory: (userId: string) => Promise<FeedSource> = async () => createStubSource(sourceId), factory: (userId: string, config: Record<string, unknown>) => Promise<FeedSource> = async () =>
createStubSource(sourceId),
): FeedSourceProvider { ): FeedSourceProvider {
return { sourceId, feedSourceForUser: factory } return { sourceId, feedSourceForUser: factory }
} }
@@ -47,9 +113,15 @@ const weatherProvider: FeedSourceProvider = {
}, },
} }
beforeEach(() => {
enabledByUser.clear()
mockFindResult = undefined
})
describe("UserSessionManager", () => { describe("UserSessionManager", () => {
test("getOrCreate creates session on first call", async () => { test("getOrCreate creates session on first call", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const session = await manager.getOrCreate("user-1") const session = await manager.getOrCreate("user-1")
@@ -58,7 +130,8 @@ describe("UserSessionManager", () => {
}) })
test("getOrCreate returns same session for same user", async () => { test("getOrCreate returns same session for same user", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const session1 = await manager.getOrCreate("user-1") const session1 = await manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-1") const session2 = await manager.getOrCreate("user-1")
@@ -67,7 +140,8 @@ describe("UserSessionManager", () => {
}) })
test("getOrCreate returns different sessions for different users", async () => { test("getOrCreate returns different sessions for different users", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const session1 = await manager.getOrCreate("user-1") const session1 = await manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-2") const session2 = await manager.getOrCreate("user-2")
@@ -76,7 +150,8 @@ describe("UserSessionManager", () => {
}) })
test("each user gets independent source instances", async () => { test("each user gets independent source instances", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const session1 = await manager.getOrCreate("user-1") const session1 = await manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-2") const session2 = await manager.getOrCreate("user-2")
@@ -88,7 +163,8 @@ describe("UserSessionManager", () => {
}) })
test("remove destroys session and allows re-creation", async () => { test("remove destroys session and allows re-creation", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const session1 = await manager.getOrCreate("user-1") const session1 = await manager.getOrCreate("user-1")
manager.remove("user-1") manager.remove("user-1")
@@ -98,13 +174,16 @@ describe("UserSessionManager", () => {
}) })
test("remove is no-op for unknown user", () => { test("remove is no-op for unknown user", () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
expect(() => manager.remove("unknown")).not.toThrow() expect(() => manager.remove("unknown")).not.toThrow()
}) })
test("registers multiple providers", async () => { test("registers multiple providers", async () => {
setEnabledSources(["aelis.location", "aelis.weather"])
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [locationProvider, weatherProvider], providers: [locationProvider, weatherProvider],
}) })
@@ -115,7 +194,8 @@ describe("UserSessionManager", () => {
}) })
test("refresh returns feed result through session", async () => { test("refresh returns feed result through session", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const session = await manager.getOrCreate("user-1") const session = await manager.getOrCreate("user-1")
const result = await session.engine.refresh() const result = await session.engine.refresh()
@@ -127,7 +207,8 @@ describe("UserSessionManager", () => {
}) })
test("location update via executeAction works", async () => { test("location update via executeAction works", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const session = await manager.getOrCreate("user-1") const session = await manager.getOrCreate("user-1")
await session.engine.executeAction("aelis.location", "update-location", { await session.engine.executeAction("aelis.location", "update-location", {
@@ -142,7 +223,8 @@ describe("UserSessionManager", () => {
}) })
test("subscribe receives updates after location push", async () => { test("subscribe receives updates after location push", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const callback = mock() const callback = mock()
const session = await manager.getOrCreate("user-1") const session = await manager.getOrCreate("user-1")
@@ -162,7 +244,8 @@ describe("UserSessionManager", () => {
}) })
test("remove stops reactive updates", async () => { test("remove stops reactive updates", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const callback = mock() const callback = mock()
const session = await manager.getOrCreate("user-1") const session = await manager.getOrCreate("user-1")
@@ -185,6 +268,7 @@ describe("UserSessionManager", () => {
}) })
test("creates session with successful providers when some fail", async () => { test("creates session with successful providers when some fail", async () => {
setEnabledSources(["aelis.location", "aelis.failing"])
const failingProvider: FeedSourceProvider = { const failingProvider: FeedSourceProvider = {
sourceId: "aelis.failing", sourceId: "aelis.failing",
async feedSourceForUser() { async feedSourceForUser() {
@@ -193,6 +277,7 @@ describe("UserSessionManager", () => {
} }
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [locationProvider, failingProvider], providers: [locationProvider, failingProvider],
}) })
@@ -208,7 +293,9 @@ describe("UserSessionManager", () => {
}) })
test("throws AggregateError when all providers fail", async () => { test("throws AggregateError when all providers fail", async () => {
setEnabledSources(["aelis.fail-1", "aelis.fail-2"])
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [ providers: [
{ {
sourceId: "aelis.fail-1", sourceId: "aelis.fail-1",
@@ -229,8 +316,10 @@ describe("UserSessionManager", () => {
}) })
test("concurrent getOrCreate for same user returns same session", async () => { test("concurrent getOrCreate for same user returns same session", async () => {
setEnabledSources(["aelis.location"])
let callCount = 0 let callCount = 0
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [ providers: [
{ {
sourceId: "aelis.location", sourceId: "aelis.location",
@@ -253,12 +342,14 @@ describe("UserSessionManager", () => {
}) })
test("remove during in-flight getOrCreate prevents session from being stored", async () => { test("remove during in-flight getOrCreate prevents session from being stored", async () => {
setEnabledSources(["aelis.location"])
let resolveProvider: () => void let resolveProvider: () => void
const providerGate = new Promise<void>((r) => { const providerGate = new Promise<void>((r) => {
resolveProvider = r resolveProvider = r
}) })
const manager = new UserSessionManager({ const manager = new UserSessionManager({
db: fakeDb,
providers: [ providers: [
{ {
sourceId: "aelis.location", sourceId: "aelis.location",
@@ -285,10 +376,67 @@ describe("UserSessionManager", () => {
expect(freshSession).toBeDefined() expect(freshSession).toBeDefined()
expect(freshSession.engine).toBeDefined() expect(freshSession.engine).toBeDefined()
}) })
test("only invokes providers for sources enabled for the user", async () => {
setEnabledSources(["aelis.location"])
const locationFactory = mock(async () => createStubSource("aelis.location"))
const weatherFactory = mock(async () => createStubSource("aelis.weather"))
const manager = new UserSessionManager({
db: fakeDb,
providers: [
{ sourceId: "aelis.location", feedSourceForUser: locationFactory },
{ sourceId: "aelis.weather", feedSourceForUser: weatherFactory },
],
})
const session = await manager.getOrCreate("user-1")
expect(locationFactory).toHaveBeenCalledTimes(1)
expect(weatherFactory).not.toHaveBeenCalled()
expect(session.getSource("aelis.location")).toBeDefined()
expect(session.getSource("aelis.weather")).toBeUndefined()
})
test("creates empty session when no sources are enabled", async () => {
setEnabledSources([])
const factory = mock(async () => createStubSource("aelis.location"))
const manager = new UserSessionManager({
db: fakeDb,
providers: [{ sourceId: "aelis.location", feedSourceForUser: factory }],
})
const session = await manager.getOrCreate("user-1")
expect(factory).not.toHaveBeenCalled()
expect(session).toBeDefined()
expect(session.getSource("aelis.location")).toBeUndefined()
})
test("per-user enabled sources are respected", async () => {
enabledByUser.clear()
setEnabledSourcesForUser("user-1", ["aelis.location"])
setEnabledSourcesForUser("user-2", ["aelis.weather"])
const manager = new UserSessionManager({
db: fakeDb,
providers: [createStubProvider("aelis.location"), createStubProvider("aelis.weather")],
})
const session1 = await manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-2")
expect(session1.getSource("aelis.location")).toBeDefined()
expect(session1.getSource("aelis.weather")).toBeUndefined()
expect(session2.getSource("aelis.location")).toBeUndefined()
expect(session2.getSource("aelis.weather")).toBeDefined()
})
}) })
describe("UserSessionManager.replaceProvider", () => { describe("UserSessionManager.replaceProvider", () => {
test("replaces source in all active sessions", async () => { test("replaces source in all active sessions", async () => {
setEnabledSources(["test"])
const itemsV1: FeedItem[] = [ const itemsV1: FeedItem[] = [
{ {
id: "v1", id: "v1",
@@ -309,7 +457,7 @@ describe("UserSessionManager.replaceProvider", () => {
] ]
const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1)) const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1))
const manager = new UserSessionManager({ providers: [providerV1] }) const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] })
const session1 = await manager.getOrCreate("user-1") const session1 = await manager.getOrCreate("user-1")
const session2 = await manager.getOrCreate("user-2") const session2 = await manager.getOrCreate("user-2")
@@ -330,7 +478,8 @@ describe("UserSessionManager.replaceProvider", () => {
}) })
test("throws for unknown provider sourceId", async () => { test("throws for unknown provider sourceId", async () => {
const manager = new UserSessionManager({ providers: [locationProvider] }) setEnabledSources(["aelis.location"])
const manager = new UserSessionManager({ db: fakeDb, providers: [locationProvider] })
const unknownProvider = createStubProvider("aelis.unknown") const unknownProvider = createStubProvider("aelis.unknown")
@@ -339,9 +488,10 @@ describe("UserSessionManager.replaceProvider", () => {
) )
}) })
test("removes source from session when new provider fails for a user", async () => { test("keeps existing source when new provider fails for a user", async () => {
setEnabledSources(["test"])
const providerV1 = createStubProvider("test", async () => createStubSource("test")) const providerV1 = createStubProvider("test", async () => createStubSource("test"))
const manager = new UserSessionManager({ providers: [providerV1] }) const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] })
const session = await manager.getOrCreate("user-1") const session = await manager.getOrCreate("user-1")
expect(session.getSource("test")).toBeDefined() expect(session.getSource("test")).toBeDefined()
@@ -353,13 +503,14 @@ describe("UserSessionManager.replaceProvider", () => {
}) })
await manager.replaceProvider(failingProvider) await manager.replaceProvider(failingProvider)
expect(session.getSource("test")).toBeUndefined() expect(session.getSource("test")).toBeDefined()
expect(spy).toHaveBeenCalled() expect(spy).toHaveBeenCalled()
spy.mockRestore() spy.mockRestore()
}) })
test("new sessions use the replaced provider", async () => { test("new sessions use the replaced provider", async () => {
setEnabledSources(["test"])
const itemsV1: FeedItem[] = [ const itemsV1: FeedItem[] = [
{ {
id: "v1", id: "v1",
@@ -380,7 +531,7 @@ describe("UserSessionManager.replaceProvider", () => {
] ]
const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1)) const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1))
const manager = new UserSessionManager({ providers: [providerV1] }) const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] })
const providerV2 = createStubProvider("test", async () => createStubSource("test", itemsV2)) const providerV2 = createStubProvider("test", async () => createStubSource("test", itemsV2))
await manager.replaceProvider(providerV2) await manager.replaceProvider(providerV2)
@@ -392,6 +543,7 @@ describe("UserSessionManager.replaceProvider", () => {
}) })
test("does not affect other providers' sources", async () => { test("does not affect other providers' sources", async () => {
setEnabledSources(["source-a", "source-b"])
const providerA = createStubProvider("source-a", async () => const providerA = createStubProvider("source-a", async () =>
createStubSource("source-a", [ createStubSource("source-a", [
{ {
@@ -415,7 +567,7 @@ describe("UserSessionManager.replaceProvider", () => {
]), ]),
) )
const manager = new UserSessionManager({ providers: [providerA, providerB] }) const manager = new UserSessionManager({ db: fakeDb, providers: [providerA, providerB] })
const session = await manager.getOrCreate("user-1") const session = await manager.getOrCreate("user-1")
// Replace only source-a // Replace only source-a
@@ -440,6 +592,7 @@ describe("UserSessionManager.replaceProvider", () => {
}) })
test("updates sessions that are still being created", async () => { test("updates sessions that are still being created", async () => {
setEnabledSources(["test"])
const itemsV1: FeedItem[] = [ const itemsV1: FeedItem[] = [
{ {
id: "v1", id: "v1",
@@ -468,7 +621,7 @@ describe("UserSessionManager.replaceProvider", () => {
await creationGate await creationGate
return createStubSource("test", itemsV1) return createStubSource("test", itemsV1)
}) })
const manager = new UserSessionManager({ providers: [providerV1] }) const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] })
// Start session creation but don't let it finish yet // Start session creation but don't let it finish yet
const sessionPromise = manager.getOrCreate("user-1") const sessionPromise = manager.getOrCreate("user-1")
@@ -487,4 +640,44 @@ describe("UserSessionManager.replaceProvider", () => {
const feed = await session.feed() const feed = await session.feed()
expect(feed.items[0]!.data.version).toBe(2) expect(feed.items[0]!.data.version).toBe(2)
}) })
test("skips source replacement when source was disabled between creation and replace", async () => {
setEnabledSources(["test"])
const itemsV1: FeedItem[] = [
{
id: "v1",
sourceId: "test",
type: "test",
timestamp: new Date(),
data: { version: 1 },
},
]
const providerV1 = createStubProvider("test", async () => createStubSource("test", itemsV1))
const manager = new UserSessionManager({ db: fakeDb, providers: [providerV1] })
const session = await manager.getOrCreate("user-1")
const feedBefore = await session.feed()
expect(feedBefore.items[0]!.data.version).toBe(1)
// Simulate the source being disabled/deleted between session creation and replace
mockFindResult = null
const providerV2 = createStubProvider("test", async () =>
createStubSource("test", [
{
id: "v2",
sourceId: "test",
type: "test",
timestamp: new Date(),
data: { version: 2 },
},
]),
)
await manager.replaceProvider(providerV2)
// Session should still have v1 — the replace was skipped
const feedAfter = await session.feed()
expect(feedAfter.items[0]!.data.version).toBe(1)
})
}) })

View File

@@ -1,31 +1,67 @@
import type { FeedSource } from "@aelis/core" import type { FeedSource } from "@aelis/core"
import { type } from "arktype"
import merge from "lodash.merge"
import type { Database } from "../db/index.ts"
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
import { InvalidSourceConfigError, SourceNotFoundError } from "../sources/errors.ts"
import { sources } from "../sources/user-sources.ts"
import type { FeedSourceProvider } from "./feed-source-provider.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts"
import { UserSession } from "./user-session.ts" import { UserSession } from "./user-session.ts"
export interface UserSessionManagerConfig { export interface UserSessionManagerConfig {
db: Database
providers: FeedSourceProvider[] providers: FeedSourceProvider[]
feedEnhancer?: FeedEnhancer | null feedEnhancer?: FeedEnhancer | null
} }
export class UserSessionManager { export class UserSessionManager {
private sessions = new Map<string, { userId: string; session: UserSession }>() private sessions = new Map<string, UserSession>()
private pending = new Map<string, Promise<UserSession>>() private pending = new Map<string, Promise<UserSession>>()
private readonly db: Database
private readonly providers = new Map<string, FeedSourceProvider>() private readonly providers = new Map<string, FeedSourceProvider>()
private readonly feedEnhancer: FeedEnhancer | null private readonly feedEnhancer: FeedEnhancer | null
private readonly db: Database
constructor(config: UserSessionManagerConfig) { constructor(config: UserSessionManagerConfig) {
this.db = config.db
for (const provider of config.providers) { for (const provider of config.providers) {
this.providers.set(provider.sourceId, provider) this.providers.set(provider.sourceId, provider)
} }
this.feedEnhancer = config.feedEnhancer ?? null this.feedEnhancer = config.feedEnhancer ?? null
this.db = config.db
}
getProvider(sourceId: string): FeedSourceProvider | undefined {
return this.providers.get(sourceId)
}
/**
* Returns the user's config for a source, or defaults if no row exists.
*
* @throws {SourceNotFoundError} if the sourceId has no registered provider
*/
async fetchSourceConfig(
userId: string,
sourceId: string,
): Promise<{ enabled: boolean; config: unknown }> {
const provider = this.providers.get(sourceId)
if (!provider) {
throw new SourceNotFoundError(sourceId, userId)
}
const row = await sources(this.db, userId).find(sourceId)
return {
enabled: row?.enabled ?? false,
config: row?.config ?? {},
}
} }
async getOrCreate(userId: string): Promise<UserSession> { async getOrCreate(userId: string): Promise<UserSession> {
const existing = this.sessions.get(userId) const existing = this.sessions.get(userId)
if (existing) return existing.session if (existing) return existing
const inflight = this.pending.get(userId) const inflight = this.pending.get(userId)
if (inflight) return inflight if (inflight) return inflight
@@ -40,7 +76,7 @@ export class UserSessionManager {
session.destroy() session.destroy()
throw new Error(`Session for user ${userId} was removed during creation`) throw new Error(`Session for user ${userId} was removed during creation`)
} }
this.sessions.set(userId, { userId, session }) this.sessions.set(userId, session)
return session return session
} finally { } finally {
this.pending.delete(userId) this.pending.delete(userId)
@@ -48,20 +84,131 @@ export class UserSessionManager {
} }
remove(userId: string): void { remove(userId: string): void {
const entry = this.sessions.get(userId) const session = this.sessions.get(userId)
if (entry) { if (session) {
entry.session.destroy() session.destroy()
this.sessions.delete(userId) this.sessions.delete(userId)
} }
// Cancel any in-flight creation so getOrCreate won't store the session // Cancel any in-flight creation so getOrCreate won't store the session
this.pending.delete(userId) this.pending.delete(userId)
} }
/**
* Merges, validates, and persists a user's source config and/or enabled
* state, then invalidates the cached session.
*
* @throws {SourceNotFoundError} if the source row doesn't exist
* @throws {InvalidSourceConfigError} if the merged config fails schema validation
*/
async updateSourceConfig(
userId: string,
sourceId: string,
update: { enabled?: boolean; config?: unknown },
): Promise<void> {
const provider = this.providers.get(sourceId)
if (!provider) {
throw new SourceNotFoundError(sourceId, userId)
}
// Nothing to update
if (update.enabled === undefined && update.config === undefined) {
// Still validate existence — updateConfig would throw, but
// we can avoid the DB write entirely.
if (!(await sources(this.db, userId).find(sourceId))) {
throw new SourceNotFoundError(sourceId, userId)
}
return
}
// When config is provided, fetch existing to deep-merge before validating.
// NOTE: find + updateConfig is not atomic. A concurrent update could
// read stale config. Use SELECT FOR UPDATE or atomic jsonb merge if
// this becomes a problem.
let mergedConfig: Record<string, unknown> | undefined
if (update.config !== undefined) {
const existing = await sources(this.db, userId).find(sourceId)
const existingConfig = (existing?.config ?? {}) as Record<string, unknown>
mergedConfig = merge({}, existingConfig, update.config)
if (provider.configSchema) {
const validated = provider.configSchema(mergedConfig)
if (validated instanceof type.errors) {
throw new InvalidSourceConfigError(sourceId, validated.summary)
}
}
}
// Throws SourceNotFoundError if the row doesn't exist
await sources(this.db, userId).updateConfig(sourceId, {
enabled: update.enabled,
config: mergedConfig,
})
// Refresh the specific source in the active session instead of
// destroying the entire session.
const session = this.sessions.get(userId)
if (session) {
if (update.enabled === false) {
session.removeSource(sourceId)
} else {
const source = await provider.feedSourceForUser(userId, mergedConfig ?? {})
session.replaceSource(sourceId, source)
}
}
}
/**
* Validates, persists, and upserts a user's source config, then
* refreshes the cached session. Unlike updateSourceConfig, this
* inserts a new row if one doesn't exist and fully replaces config
* (no merge).
*
* @throws {SourceNotFoundError} if the sourceId has no registered provider
* @throws {InvalidSourceConfigError} if config fails schema validation
*/
async upsertSourceConfig(
userId: string,
sourceId: string,
data: { enabled: boolean; config: unknown },
): Promise<void> {
const provider = this.providers.get(sourceId)
if (!provider) {
throw new SourceNotFoundError(sourceId, userId)
}
if (provider.configSchema) {
const validated = provider.configSchema(data.config)
if (validated instanceof type.errors) {
throw new InvalidSourceConfigError(sourceId, validated.summary)
}
}
await sources(this.db, userId).upsertConfig(sourceId, {
enabled: data.enabled,
config: data.config,
})
const session = this.sessions.get(userId)
if (session) {
if (!data.enabled) {
session.removeSource(sourceId)
} else {
const source = await provider.feedSourceForUser(userId, data.config)
if (session.hasSource(sourceId)) {
session.replaceSource(sourceId, source)
} else {
session.addSource(source)
}
}
}
}
/** /**
* Replaces a provider and updates all active sessions. * Replaces a provider and updates all active sessions.
* The new provider must have the same sourceId as an existing one. * The new provider must have the same sourceId as an existing one.
* For each active session, resolves a new source from the provider. * For each active session, queries the user's source config from the DB
* If the provider fails for a user, the old source is removed from that session. * and re-resolves the source. If the provider fails for a user, the
* existing source is kept.
*/ */
async replaceProvider(provider: FeedSourceProvider): Promise<void> { async replaceProvider(provider: FeedSourceProvider): Promise<void> {
if (!this.providers.has(provider.sourceId)) { if (!this.providers.has(provider.sourceId)) {
@@ -74,16 +221,16 @@ export class UserSessionManager {
const updates: Promise<void>[] = [] const updates: Promise<void>[] = []
for (const [, { userId, session }] of this.sessions) { for (const [, session] of this.sessions) {
updates.push(this.updateSessionSource(provider, userId, session)) updates.push(this.refreshSessionSource(session, provider))
} }
// Also update sessions that are currently being created so they // Also update sessions that are currently being created so they
// don't land in this.sessions with a stale source. // don't land in this.sessions with a stale source.
for (const [userId, pendingPromise] of this.pending) { for (const [, pendingPromise] of this.pending) {
updates.push( updates.push(
pendingPromise pendingPromise
.then((session) => this.updateSessionSource(provider, userId, session)) .then((session) => this.refreshSessionSource(session, provider))
.catch(() => { .catch(() => {
// Session creation itself failed — nothing to update. // Session creation itself failed — nothing to update.
}), }),
@@ -93,40 +240,60 @@ export class UserSessionManager {
await Promise.all(updates) await Promise.all(updates)
} }
private async updateSessionSource( /**
provider: FeedSourceProvider, * Re-resolves a single source for a session by querying the user's config
userId: string, * from the DB and calling the provider. If the provider fails, the existing
* source is kept.
*/
private async refreshSessionSource(
session: UserSession, session: UserSession,
provider: FeedSourceProvider,
): Promise<void> { ): Promise<void> {
if (!session.hasSource(provider.sourceId)) return
try { try {
const newSource = await provider.feedSourceForUser(userId) const row = await sources(this.db, session.userId).find(provider.sourceId)
if (!row?.enabled) return
const newSource = await provider.feedSourceForUser(session.userId, row.config ?? {})
session.replaceSource(provider.sourceId, newSource) session.replaceSource(provider.sourceId, newSource)
} catch (err) { } catch (err) {
console.error( console.error(
`[UserSessionManager] replaceProvider("${provider.sourceId}") failed for user ${userId}:`, `[UserSessionManager] refreshSource("${provider.sourceId}") failed for user ${session.userId}:`,
err, err,
) )
session.removeSource(provider.sourceId)
} }
} }
private async createSession(userId: string): Promise<UserSession> { private async createSession(userId: string): Promise<UserSession> {
const results = await Promise.allSettled( const enabledRows = await sources(this.db, userId).enabled()
Array.from(this.providers.values()).map((p) => p.feedSourceForUser(userId)),
)
const sources: FeedSource[] = [] const promises: Promise<FeedSource>[] = []
for (const row of enabledRows) {
const provider = this.providers.get(row.sourceId)
if (provider) {
promises.push(provider.feedSourceForUser(userId, row.config ?? {}))
}
}
if (promises.length === 0) {
return new UserSession(userId, [], this.feedEnhancer)
}
const results = await Promise.allSettled(promises)
const feedSources: FeedSource[] = []
const errors: unknown[] = [] const errors: unknown[] = []
for (const result of results) { for (const result of results) {
if (result.status === "fulfilled") { if (result.status === "fulfilled") {
sources.push(result.value) feedSources.push(result.value)
} else { } else {
errors.push(result.reason) errors.push(result.reason)
} }
} }
if (sources.length === 0 && errors.length > 0) { if (feedSources.length === 0 && errors.length > 0) {
throw new AggregateError(errors, "All feed source providers failed") throw new AggregateError(errors, "All feed source providers failed")
} }
@@ -134,6 +301,6 @@ export class UserSessionManager {
console.error("[UserSessionManager] Feed source provider failed:", error) console.error("[UserSessionManager] Feed source provider failed:", error)
} }
return new UserSession(sources, this.feedEnhancer) return new UserSession(userId, feedSources, this.feedEnhancer)
} }
} }

View File

@@ -1,7 +1,7 @@
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core" import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core"
import { LocationSource } from "@aelis/source-location" import { LocationSource } from "@aelis/source-location"
import { describe, expect, test } from "bun:test" import { describe, expect, spyOn, test } from "bun:test"
import { UserSession } from "./user-session.ts" import { UserSession } from "./user-session.ts"
@@ -25,7 +25,10 @@ function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
describe("UserSession", () => { describe("UserSession", () => {
test("registers sources and starts engine", async () => { test("registers sources and starts engine", async () => {
const session = new UserSession([createStubSource("test-a"), createStubSource("test-b")]) const session = new UserSession("test-user", [
createStubSource("test-a"),
createStubSource("test-b"),
])
const result = await session.engine.refresh() const result = await session.engine.refresh()
@@ -34,7 +37,7 @@ describe("UserSession", () => {
test("getSource returns registered source", () => { test("getSource returns registered source", () => {
const location = new LocationSource() const location = new LocationSource()
const session = new UserSession([location]) const session = new UserSession("test-user", [location])
const result = session.getSource<LocationSource>("aelis.location") const result = session.getSource<LocationSource>("aelis.location")
@@ -42,13 +45,13 @@ describe("UserSession", () => {
}) })
test("getSource returns undefined for unknown source", () => { test("getSource returns undefined for unknown source", () => {
const session = new UserSession([createStubSource("test")]) const session = new UserSession("test-user", [createStubSource("test")])
expect(session.getSource("unknown")).toBeUndefined() expect(session.getSource("unknown")).toBeUndefined()
}) })
test("destroy stops engine and clears sources", () => { test("destroy stops engine and clears sources", () => {
const session = new UserSession([createStubSource("test")]) const session = new UserSession("test-user", [createStubSource("test")])
session.destroy() session.destroy()
@@ -57,7 +60,7 @@ describe("UserSession", () => {
test("engine.executeAction routes to correct source", async () => { test("engine.executeAction routes to correct source", async () => {
const location = new LocationSource() const location = new LocationSource()
const session = new UserSession([location]) const session = new UserSession("test-user", [location])
await session.engine.executeAction("aelis.location", "update-location", { await session.engine.executeAction("aelis.location", "update-location", {
lat: 51.5, lat: 51.5,
@@ -82,7 +85,7 @@ describe("UserSession.feed", () => {
data: { value: 42 }, data: { value: 42 },
}, },
] ]
const session = new UserSession([createStubSource("test", items)]) const session = new UserSession("test-user", [createStubSource("test", items)])
const result = await session.feed() const result = await session.feed()
@@ -103,7 +106,7 @@ describe("UserSession.feed", () => {
const enhancer = async (feedItems: FeedItem[]) => const enhancer = async (feedItems: FeedItem[]) =>
feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } }))
const session = new UserSession([createStubSource("test", items)], enhancer) const session = new UserSession("test-user", [createStubSource("test", items)], enhancer)
const result = await session.feed() const result = await session.feed()
@@ -127,7 +130,7 @@ describe("UserSession.feed", () => {
return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } }))
} }
const session = new UserSession([createStubSource("test", items)], enhancer) const session = new UserSession("test-user", [createStubSource("test", items)], enhancer)
const result1 = await session.feed() const result1 = await session.feed()
expect(result1.items[0]!.data.enhanced).toBe(true) expect(result1.items[0]!.data.enhanced).toBe(true)
@@ -162,7 +165,7 @@ describe("UserSession.feed", () => {
})) }))
} }
const session = new UserSession([source], enhancer) const session = new UserSession("test-user", [source], enhancer)
// First feed triggers refresh + enhancement // First feed triggers refresh + enhancement
const result1 = await session.feed() const result1 = await session.feed()
@@ -205,7 +208,7 @@ describe("UserSession.feed", () => {
throw new Error("enhancement exploded") throw new Error("enhancement exploded")
} }
const session = new UserSession([createStubSource("test", items)], enhancer) const session = new UserSession("test-user", [createStubSource("test", items)], enhancer)
const result = await session.feed() const result = await session.feed()
@@ -237,7 +240,7 @@ describe("UserSession.replaceSource", () => {
] ]
const sourceA = createStubSource("test", itemsA) const sourceA = createStubSource("test", itemsA)
const session = new UserSession([sourceA]) const session = new UserSession("test-user", [sourceA])
const result1 = await session.feed() const result1 = await session.feed()
expect(result1.items).toHaveLength(1) expect(result1.items).toHaveLength(1)
@@ -253,7 +256,7 @@ describe("UserSession.replaceSource", () => {
test("getSource returns new source after replace", () => { test("getSource returns new source after replace", () => {
const sourceA = createStubSource("test") const sourceA = createStubSource("test")
const session = new UserSession([sourceA]) const session = new UserSession("test-user", [sourceA])
const sourceB = createStubSource("test") const sourceB = createStubSource("test")
session.replaceSource("test", sourceB) session.replaceSource("test", sourceB)
@@ -263,7 +266,7 @@ describe("UserSession.replaceSource", () => {
}) })
test("throws when replacing a source that is not registered", () => { test("throws when replacing a source that is not registered", () => {
const session = new UserSession([createStubSource("test")]) const session = new UserSession("test-user", [createStubSource("test")])
expect(() => session.replaceSource("nonexistent", createStubSource("other"))).toThrow( expect(() => session.replaceSource("nonexistent", createStubSource("other"))).toThrow(
'Cannot replace source "nonexistent": not registered', 'Cannot replace source "nonexistent": not registered',
@@ -289,7 +292,7 @@ describe("UserSession.replaceSource", () => {
data: { from: "b" }, data: { from: "b" },
}, },
]) ])
const session = new UserSession([sourceA, sourceB]) const session = new UserSession("test-user", [sourceA, sourceB])
const replacement = createStubSource("source-a", [ const replacement = createStubSource("source-a", [
{ {
@@ -325,7 +328,7 @@ describe("UserSession.replaceSource", () => {
return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } })) return feedItems.map((item) => ({ ...item, data: { ...item.data, enhanced: true } }))
} }
const session = new UserSession([createStubSource("test", items)], enhancer) const session = new UserSession("test-user", [createStubSource("test", items)], enhancer)
await session.feed() await session.feed()
expect(enhanceCount).toBe(1) expect(enhanceCount).toBe(1)
@@ -350,7 +353,10 @@ describe("UserSession.replaceSource", () => {
describe("UserSession.removeSource", () => { describe("UserSession.removeSource", () => {
test("removes source from engine and sources map", () => { test("removes source from engine and sources map", () => {
const session = new UserSession([createStubSource("test-a"), createStubSource("test-b")]) const session = new UserSession("test-user", [
createStubSource("test-a"),
createStubSource("test-b"),
])
session.removeSource("test-a") session.removeSource("test-a")
@@ -368,7 +374,7 @@ describe("UserSession.removeSource", () => {
data: {}, data: {},
}, },
] ]
const session = new UserSession([createStubSource("test", items)]) const session = new UserSession("test-user", [createStubSource("test", items)])
const result1 = await session.feed() const result1 = await session.feed()
expect(result1.items).toHaveLength(1) expect(result1.items).toHaveLength(1)
@@ -380,7 +386,7 @@ describe("UserSession.removeSource", () => {
}) })
test("is a no-op for unknown source", () => { test("is a no-op for unknown source", () => {
const session = new UserSession([createStubSource("test")]) const session = new UserSession("test-user", [createStubSource("test")])
expect(() => session.removeSource("unknown")).not.toThrow() expect(() => session.removeSource("unknown")).not.toThrow()
expect(session.getSource("test")).toBeDefined() expect(session.getSource("test")).toBeDefined()

View File

@@ -3,6 +3,7 @@ import { FeedEngine, type FeedItem, type FeedResult, type FeedSource } from "@ae
import type { FeedEnhancer } from "../enhancement/enhance-feed.ts" import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
export class UserSession { export class UserSession {
readonly userId: string
readonly engine: FeedEngine readonly engine: FeedEngine
private sources = new Map<string, FeedSource>() private sources = new Map<string, FeedSource>()
private readonly enhancer: FeedEnhancer | null private readonly enhancer: FeedEnhancer | null
@@ -12,7 +13,8 @@ export class UserSession {
private enhancingPromise: Promise<void> | null = null private enhancingPromise: Promise<void> | null = null
private unsubscribe: (() => void) | null = null private unsubscribe: (() => void) | null = null
constructor(sources: FeedSource[], enhancer?: FeedEnhancer | null) { constructor(userId: string, sources: FeedSource[], enhancer?: FeedEnhancer | null) {
this.userId = userId
this.engine = new FeedEngine() this.engine = new FeedEngine()
this.enhancer = enhancer ?? null this.enhancer = enhancer ?? null
for (const source of sources) { for (const source of sources) {
@@ -67,6 +69,36 @@ export class UserSession {
return this.sources.get(sourceId) as T | undefined return this.sources.get(sourceId) as T | undefined
} }
hasSource(sourceId: string): boolean {
return this.sources.has(sourceId)
}
/**
* Registers a new source in the engine and invalidates all caches.
* Stops and restarts the engine to establish reactive subscriptions.
*/
addSource(source: FeedSource): void {
if (this.sources.has(source.id)) {
throw new Error(`Cannot add source "${source.id}": already registered`)
}
const wasStarted = this.engine.isStarted()
if (wasStarted) {
this.engine.stop()
}
this.engine.register(source)
this.sources.set(source.id, source)
this.invalidateEnhancement()
this.enhancingPromise = null
if (wasStarted) {
this.engine.start()
}
}
/** /**
* Replaces a source in the engine and invalidates all caches. * Replaces a source in the engine and invalidates all caches.
* Stops and restarts the engine to re-establish reactive subscriptions. * Stops and restarts the engine to re-establish reactive subscriptions.

View File

@@ -1,21 +1,3 @@
/**
* Thrown by a FeedSourceProvider when the source is not enabled for a user.
*
* UserSessionManager's Promise.allSettled handles this gracefully —
* the source is excluded from the session without crashing.
*/
export class SourceDisabledError extends Error {
readonly sourceId: string
readonly userId: string
constructor(sourceId: string, userId: string) {
super(`Source "${sourceId}" is not enabled for user "${userId}"`)
this.name = "SourceDisabledError"
this.sourceId = sourceId
this.userId = userId
}
}
/** /**
* Thrown when an operation targets a user source that doesn't exist. * Thrown when an operation targets a user source that doesn't exist.
*/ */
@@ -30,3 +12,15 @@ export class SourceNotFoundError extends Error {
this.userId = userId this.userId = userId
} }
} }
/**
* Thrown when a source config update fails schema validation.
*/
export class InvalidSourceConfigError extends Error {
readonly sourceId: string
constructor(sourceId: string, summary: string) {
super(summary)
this.sourceId = sourceId
}
}

View File

@@ -0,0 +1,628 @@
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@aelis/core"
import { describe, expect, mock, spyOn, test } from "bun:test"
import { Hono } from "hono"
import type { Database } from "../db/index.ts"
import type { ConfigSchema, FeedSourceProvider } from "../session/feed-source-provider.ts"
import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts"
import { UserSessionManager } from "../session/user-session-manager.ts"
import { tflConfig } from "../tfl/provider.ts"
import { weatherConfig } from "../weather/provider.ts"
import { SourceNotFoundError } from "./errors.ts"
import { registerSourcesHttpHandlers } from "./http.ts"
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function createStubSource(id: string): FeedSource {
return {
id,
async listActions(): Promise<Record<string, ActionDefinition>> {
return {}
},
async executeAction(): Promise<unknown> {
return undefined
},
async fetchContext(): Promise<readonly ContextEntry[] | null> {
return null
},
async fetchItems(): Promise<FeedItem[]> {
return []
},
}
}
function createStubProvider(sourceId: string, configSchema?: ConfigSchema): FeedSourceProvider {
return {
sourceId,
configSchema,
async feedSourceForUser() {
return createStubSource(sourceId)
},
}
}
const MOCK_USER_ID = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn"
type SourceRow = {
userId: string
sourceId: string
enabled: boolean
config: Record<string, unknown>
}
function createInMemoryStore() {
const rows = new Map<string, SourceRow>()
function key(userId: string, sourceId: string) {
return `${userId}:${sourceId}`
}
return {
rows,
seed(userId: string, sourceId: string, data: Partial<SourceRow> = {}) {
rows.set(key(userId, sourceId), {
userId,
sourceId,
enabled: data.enabled ?? true,
config: data.config ?? {},
})
},
forUser(userId: string) {
return {
async enabled() {
return [...rows.values()].filter((r) => r.userId === userId && r.enabled)
},
async find(sourceId: string) {
return rows.get(key(userId, sourceId))
},
async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) {
const existing = rows.get(key(userId, sourceId))
if (!existing) {
throw new SourceNotFoundError(sourceId, userId)
}
if (update.enabled !== undefined) {
existing.enabled = update.enabled
}
if (update.config !== undefined) {
existing.config = update.config as Record<string, unknown>
}
},
async upsertConfig(sourceId: string, data: { enabled: boolean; config: unknown }) {
const existing = rows.get(key(userId, sourceId))
if (existing) {
existing.enabled = data.enabled
existing.config = data.config as Record<string, unknown>
} else {
rows.set(key(userId, sourceId), {
userId,
sourceId,
enabled: data.enabled,
config: (data.config ?? {}) as Record<string, unknown>,
})
}
},
}
},
}
}
let activeStore: ReturnType<typeof createInMemoryStore>
mock.module("../sources/user-sources.ts", () => ({
sources(_db: unknown, userId: string) {
return activeStore.forUser(userId)
},
}))
const fakeDb = {} as Database
function createApp(providers: FeedSourceProvider[], userId?: string) {
const sessionManager = new UserSessionManager({ providers, db: fakeDb })
const app = new Hono()
registerSourcesHttpHandlers(app, {
sessionManager,
authSessionMiddleware: mockAuthSessionMiddleware(userId),
})
return { app, sessionManager }
}
function patch(app: Hono, sourceId: string, body: unknown) {
return app.request(`/api/sources/${sourceId}`, {
method: "PATCH",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
})
}
function get(app: Hono, sourceId: string) {
return app.request(`/api/sources/${sourceId}`, { method: "GET" })
}
function put(app: Hono, sourceId: string, body: unknown) {
return app.request(`/api/sources/${sourceId}`, {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
})
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("GET /api/sources/:sourceId", () => {
test("returns 401 without auth", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)])
const res = await get(app, "aelis.weather")
expect(res.status).toBe(401)
})
test("returns 404 for unknown source", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await get(app, "unknown.source")
expect(res.status).toBe(404)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not found")
})
test("returns enabled and config for existing source", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
enabled: true,
config: { units: "metric" },
})
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await get(app, "aelis.weather")
expect(res.status).toBe(200)
const body = (await res.json()) as { enabled: boolean; config: unknown }
expect(body.enabled).toBe(true)
expect(body.config).toEqual({ units: "metric" })
})
test("returns defaults when user has no row for source", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await get(app, "aelis.weather")
expect(res.status).toBe(200)
const body = (await res.json()) as { enabled: boolean; config: unknown }
expect(body.enabled).toBe(false)
expect(body.config).toEqual({})
})
test("returns disabled source", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
enabled: false,
config: { units: "imperial" },
})
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await get(app, "aelis.weather")
expect(res.status).toBe(200)
const body = (await res.json()) as { enabled: boolean; config: unknown }
expect(body.enabled).toBe(false)
expect(body.config).toEqual({ units: "imperial" })
})
})
describe("PATCH /api/sources/:sourceId", () => {
test("returns 401 without auth", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)])
const res = await patch(app, "aelis.weather", { enabled: true })
expect(res.status).toBe(401)
})
test("returns 404 for unknown source", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await patch(app, "unknown.source", { enabled: true })
expect(res.status).toBe(404)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not found")
})
test("returns 404 when user has no existing row for source", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await patch(app, "aelis.weather", { enabled: true })
expect(res.status).toBe(404)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not found")
})
test("returns 204 when body is empty object (no-op) on existing source", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather")
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await patch(app, "aelis.weather", {})
expect(res.status).toBe(204)
})
test("returns 404 when body is empty object on nonexistent user source", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await patch(app, "aelis.weather", {})
expect(res.status).toBe(404)
})
test("returns 400 for invalid JSON body", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather")
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await app.request("/api/sources/aelis.weather", {
method: "PATCH",
headers: { "Content-Type": "application/json" },
body: "not json",
})
expect(res.status).toBe(400)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("Invalid JSON")
})
test("returns 400 when weather config fails validation", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather")
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await patch(app, "aelis.weather", {
config: { units: "invalid" },
})
expect(res.status).toBe(400)
})
test("returns 204 and updates enabled", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
enabled: true,
config: { units: "metric" },
})
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await patch(app, "aelis.weather", { enabled: false })
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.weather`)
expect(row!.enabled).toBe(false)
expect(row!.config).toEqual({ units: "metric" })
})
test("returns 204 and updates config", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
config: { units: "metric" },
})
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await patch(app, "aelis.weather", {
config: { units: "imperial" },
})
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.weather`)
expect(row!.config).toEqual({ units: "imperial" })
})
test("preserves config when only updating enabled", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.tfl", {
enabled: true,
config: { lines: ["bakerloo"] },
})
const { app } = createApp([createStubProvider("aelis.tfl", tflConfig)], MOCK_USER_ID)
const res = await patch(app, "aelis.tfl", { enabled: false })
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.tfl`)
expect(row!.enabled).toBe(false)
expect(row!.config).toEqual({ lines: ["bakerloo"] })
})
test("deep-merges config on update", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
config: { units: "metric", hourlyLimit: 12 },
})
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await patch(app, "aelis.weather", {
config: { dailyLimit: 5 },
})
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.weather`)
expect(row!.config).toEqual({
units: "metric",
hourlyLimit: 12,
dailyLimit: 5,
})
})
test("refreshes source in active session after config update", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
config: { units: "metric" },
})
const { app, sessionManager } = createApp(
[createStubProvider("aelis.weather", weatherConfig)],
MOCK_USER_ID,
)
const session = await sessionManager.getOrCreate(MOCK_USER_ID)
const replaceSpy = spyOn(session, "replaceSource")
const res = await patch(app, "aelis.weather", {
config: { units: "imperial" },
})
expect(res.status).toBe(204)
expect(replaceSpy).toHaveBeenCalled()
replaceSpy.mockRestore()
})
test("removes source from session when disabled", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
enabled: true,
config: { units: "metric" },
})
const { app, sessionManager } = createApp(
[createStubProvider("aelis.weather", weatherConfig)],
MOCK_USER_ID,
)
const session = await sessionManager.getOrCreate(MOCK_USER_ID)
const removeSpy = spyOn(session, "removeSource")
const res = await patch(app, "aelis.weather", { enabled: false })
expect(res.status).toBe(204)
expect(removeSpy).toHaveBeenCalledWith("aelis.weather")
removeSpy.mockRestore()
})
test("accepts location source with arbitrary config (no schema)", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.location")
const { app } = createApp([createStubProvider("aelis.location")], MOCK_USER_ID)
const res = await patch(app, "aelis.location", {
config: { something: "value" },
})
expect(res.status).toBe(204)
})
test("updates enabled on location source", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.location", { enabled: true })
const { app } = createApp([createStubProvider("aelis.location")], MOCK_USER_ID)
const res = await patch(app, "aelis.location", { enabled: false })
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.location`)
expect(row!.enabled).toBe(false)
})
})
// ---------------------------------------------------------------------------
// PUT /api/sources/:sourceId
// ---------------------------------------------------------------------------
describe("PUT /api/sources/:sourceId", () => {
test("returns 401 without auth", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)])
const res = await put(app, "aelis.weather", { enabled: true, config: {} })
expect(res.status).toBe(401)
})
test("returns 404 for unknown source", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "unknown.source", { enabled: true, config: {} })
expect(res.status).toBe(404)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not found")
})
test("returns 400 for invalid JSON", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await app.request("/api/sources/aelis.weather", {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: "not json",
})
expect(res.status).toBe(400)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("Invalid JSON")
})
test("returns 400 when enabled is missing", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "aelis.weather", { config: {} })
expect(res.status).toBe(400)
})
test("returns 400 when config is missing", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "aelis.weather", { enabled: true })
expect(res.status).toBe(400)
})
test("returns 400 when config fails schema validation", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "invalid" },
})
expect(res.status).toBe(400)
})
test("returns 204 and inserts when row does not exist", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "metric" },
})
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.weather`)
expect(row).toBeDefined()
expect(row!.enabled).toBe(true)
expect(row!.config).toEqual({ units: "metric" })
})
test("returns 204 and fully replaces existing row", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
enabled: true,
config: { units: "metric", hourlyLimit: 12 },
})
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "aelis.weather", {
enabled: false,
config: { units: "imperial" },
})
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.weather`)
expect(row!.enabled).toBe(false)
// hourlyLimit should be gone — full replace, not merge
expect(row!.config).toEqual({ units: "imperial" })
})
test("refreshes source in active session after upsert", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
config: { units: "metric" },
})
const { app, sessionManager } = createApp(
[createStubProvider("aelis.weather", weatherConfig)],
MOCK_USER_ID,
)
const session = await sessionManager.getOrCreate(MOCK_USER_ID)
const replaceSpy = spyOn(session, "replaceSource")
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "imperial" },
})
expect(res.status).toBe(204)
expect(replaceSpy).toHaveBeenCalled()
replaceSpy.mockRestore()
})
test("removes source from session when disabled via upsert", async () => {
activeStore = createInMemoryStore()
activeStore.seed(MOCK_USER_ID, "aelis.weather", {
enabled: true,
config: { units: "metric" },
})
const { app, sessionManager } = createApp(
[createStubProvider("aelis.weather", weatherConfig)],
MOCK_USER_ID,
)
const session = await sessionManager.getOrCreate(MOCK_USER_ID)
const removeSpy = spyOn(session, "removeSource")
const res = await put(app, "aelis.weather", {
enabled: false,
config: { units: "metric" },
})
expect(res.status).toBe(204)
expect(removeSpy).toHaveBeenCalledWith("aelis.weather")
removeSpy.mockRestore()
})
test("adds source to active session when inserting a new source", async () => {
activeStore = createInMemoryStore()
// Seed a different source so the session can be created
activeStore.seed(MOCK_USER_ID, "aelis.location", { enabled: true })
const { app, sessionManager } = createApp(
[createStubProvider("aelis.location"), createStubProvider("aelis.weather", weatherConfig)],
MOCK_USER_ID,
)
// Create session — only has aelis.location
const session = await sessionManager.getOrCreate(MOCK_USER_ID)
expect(session.hasSource("aelis.weather")).toBe(false)
// PUT a new source that didn't exist before
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "metric" },
})
expect(res.status).toBe(204)
expect(session.hasSource("aelis.weather")).toBe(true)
})
test("accepts location source with arbitrary config (no schema)", async () => {
activeStore = createInMemoryStore()
const { app } = createApp([createStubProvider("aelis.location")], MOCK_USER_ID)
const res = await put(app, "aelis.location", {
enabled: true,
config: { something: "value" },
})
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.location`)
expect(row).toBeDefined()
expect(row!.config).toEqual({ something: "value" })
})
})

View File

@@ -0,0 +1,158 @@
import type { Context, Hono } from "hono"
import { type } from "arktype"
import { createMiddleware } from "hono/factory"
import type { AuthSessionMiddleware } from "../auth/session-middleware.ts"
import type { UserSessionManager } from "../session/index.ts"
import { InvalidSourceConfigError, SourceNotFoundError } from "./errors.ts"
type Env = {
Variables: {
sessionManager: UserSessionManager
}
}
interface SourcesHttpHandlersDeps {
sessionManager: UserSessionManager
authSessionMiddleware: AuthSessionMiddleware
}
const UpdateSourceConfigRequestBody = type({
"enabled?": "boolean",
"config?": "unknown",
})
const ReplaceSourceConfigRequestBody = type({
enabled: "boolean",
config: "unknown",
})
export function registerSourcesHttpHandlers(
app: Hono,
{ sessionManager, authSessionMiddleware }: SourcesHttpHandlersDeps,
) {
const inject = createMiddleware<Env>(async (c, next) => {
c.set("sessionManager", sessionManager)
await next()
})
app.get("/api/sources/:sourceId", inject, authSessionMiddleware, handleGetSource)
app.patch("/api/sources/:sourceId", inject, authSessionMiddleware, handleUpdateSource)
app.put("/api/sources/:sourceId", inject, authSessionMiddleware, handleReplaceSource)
}
async function handleGetSource(c: Context<Env>) {
const sourceId = c.req.param("sourceId")
if (!sourceId) {
return c.body(null, 404)
}
const sessionManager = c.get("sessionManager")
const user = c.get("user")!
try {
const result = await sessionManager.fetchSourceConfig(user.id, sourceId)
return c.json(result)
} catch (err) {
if (err instanceof SourceNotFoundError) {
return c.json({ error: err.message }, 404)
}
throw err
}
}
async function handleUpdateSource(c: Context<Env>) {
const sourceId = c.req.param("sourceId")
if (!sourceId) {
return c.body(null, 404)
}
const sessionManager = c.get("sessionManager")
// Validate source exists as a registered provider
const provider = sessionManager.getProvider(sourceId)
if (!provider) {
return c.json({ error: `Source "${sourceId}" not found` }, 404)
}
// Parse request body
let body: unknown
try {
body = await c.req.json()
} catch {
return c.json({ error: "Invalid JSON" }, 400)
}
const parsed = UpdateSourceConfigRequestBody(body)
if (parsed instanceof type.errors) {
return c.json({ error: parsed.summary }, 400)
}
const { enabled, config: newConfig } = parsed
const user = c.get("user")!
try {
await sessionManager.updateSourceConfig(user.id, sourceId, {
enabled,
config: newConfig,
})
} catch (err) {
if (err instanceof SourceNotFoundError) {
return c.json({ error: err.message }, 404)
}
if (err instanceof InvalidSourceConfigError) {
return c.json({ error: err.message }, 400)
}
throw err
}
return c.body(null, 204)
}
async function handleReplaceSource(c: Context<Env>) {
const sourceId = c.req.param("sourceId")
if (!sourceId) {
return c.body(null, 404)
}
const sessionManager = c.get("sessionManager")
const provider = sessionManager.getProvider(sourceId)
if (!provider) {
return c.json({ error: `Source "${sourceId}" not found` }, 404)
}
let body: unknown
try {
body = await c.req.json()
} catch {
return c.json({ error: "Invalid JSON" }, 400)
}
const parsed = ReplaceSourceConfigRequestBody(body)
if (parsed instanceof type.errors) {
return c.json({ error: parsed.summary }, 400)
}
const { enabled, config } = parsed
const user = c.get("user")!
try {
await sessionManager.upsertSourceConfig(user.id, sourceId, {
enabled,
config,
})
} catch (err) {
if (err instanceof SourceNotFoundError) {
return c.json({ error: err.message }, 404)
}
if (err instanceof InvalidSourceConfigError) {
return c.json({ error: err.message }, 400)
}
throw err
}
return c.body(null, 204)
}

View File

@@ -52,14 +52,46 @@ export function sources(db: Database, userId: string) {
} }
}, },
/** Creates or updates the config for a source. */ /** Updates an existing user source row. Throws if the row doesn't exist. */
async upsertConfig(sourceId: string, config: Record<string, unknown>) { async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) {
const set: Record<string, unknown> = { updatedAt: new Date() }
if (update.enabled !== undefined) {
set.enabled = update.enabled
}
if (update.config !== undefined) {
set.config = update.config
}
const rows = await db
.update(userSources)
.set(set)
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
.returning({ id: userSources.id })
if (rows.length === 0) {
throw new SourceNotFoundError(sourceId, userId)
}
},
/** Inserts a new user source row or fully replaces enabled/config on an existing one. */
async upsertConfig(sourceId: string, data: { enabled: boolean; config: unknown }) {
const now = new Date()
await db await db
.insert(userSources) .insert(userSources)
.values({ userId, sourceId, config }) .values({
userId,
sourceId,
enabled: data.enabled,
config: data.config,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({ .onConflictDoUpdate({
target: [userSources.userId, userSources.sourceId], target: [userSources.userId, userSources.sourceId],
set: { config, updatedAt: new Date() }, set: {
enabled: data.enabled,
config: data.config,
updatedAt: now,
},
}) })
}, },

View File

@@ -1,42 +1,31 @@
import { TflSource, type ITflApi, type TflLineId } from "@aelis/source-tfl" import { TflSource, type ITflApi, type TflLineId } from "@aelis/source-tfl"
import { type } from "arktype" import { type } from "arktype"
import type { Database } from "../db/index.ts"
import type { FeedSourceProvider } from "../session/feed-source-provider.ts" import type { FeedSourceProvider } from "../session/feed-source-provider.ts"
import { SourceDisabledError } from "../sources/errors.ts"
import { sources } from "../sources/user-sources.ts"
export type TflSourceProviderOptions = export type TflSourceProviderOptions =
| { db: Database; apiKey: string; client?: never } | { apiKey: string; client?: never }
| { db: Database; apiKey?: never; client: ITflApi } | { apiKey?: never; client: ITflApi }
const tflConfig = type({ export const tflConfig = type({
"lines?": "string[]", "lines?": "string[]",
}) })
export class TflSourceProvider implements FeedSourceProvider { export class TflSourceProvider implements FeedSourceProvider {
readonly sourceId = "aelis.tfl" readonly sourceId = "aelis.tfl"
private readonly db: Database readonly configSchema = tflConfig
private readonly apiKey: string | undefined private readonly apiKey: string | undefined
private readonly client: ITflApi | undefined private readonly client: ITflApi | undefined
constructor(options: TflSourceProviderOptions) { constructor(options: TflSourceProviderOptions) {
this.db = options.db
this.apiKey = "apiKey" in options ? options.apiKey : undefined this.apiKey = "apiKey" in options ? options.apiKey : undefined
this.client = "client" in options ? options.client : undefined this.client = "client" in options ? options.client : undefined
} }
async feedSourceForUser(userId: string): Promise<TflSource> { async feedSourceForUser(_userId: string, config: unknown): Promise<TflSource> {
const row = await sources(this.db, userId).find("aelis.tfl") const parsed = tflConfig(config)
if (!row || !row.enabled) {
throw new SourceDisabledError("aelis.tfl", userId)
}
const parsed = tflConfig(row.config ?? {})
if (parsed instanceof type.errors) { if (parsed instanceof type.errors) {
throw new Error(`Invalid TFL config for user ${userId}: ${parsed.summary}`) throw new Error(`Invalid TFL config: ${parsed.summary}`)
} }
return new TflSource({ return new TflSource({

View File

@@ -1,19 +1,14 @@
import { WeatherSource, type WeatherSourceOptions } from "@aelis/source-weatherkit" import { WeatherSource, type WeatherSourceOptions } from "@aelis/source-weatherkit"
import { type } from "arktype" import { type } from "arktype"
import type { Database } from "../db/index.ts"
import type { FeedSourceProvider } from "../session/feed-source-provider.ts" import type { FeedSourceProvider } from "../session/feed-source-provider.ts"
import { SourceDisabledError } from "../sources/errors.ts"
import { sources } from "../sources/user-sources.ts"
export interface WeatherSourceProviderOptions { export interface WeatherSourceProviderOptions {
db: Database
credentials: WeatherSourceOptions["credentials"] credentials: WeatherSourceOptions["credentials"]
client?: WeatherSourceOptions["client"] client?: WeatherSourceOptions["client"]
} }
const weatherConfig = type({ export const weatherConfig = type({
"units?": "'metric' | 'imperial'", "units?": "'metric' | 'imperial'",
"hourlyLimit?": "number", "hourlyLimit?": "number",
"dailyLimit?": "number", "dailyLimit?": "number",
@@ -21,26 +16,19 @@ const weatherConfig = type({
export class WeatherSourceProvider implements FeedSourceProvider { export class WeatherSourceProvider implements FeedSourceProvider {
readonly sourceId = "aelis.weather" readonly sourceId = "aelis.weather"
private readonly db: Database readonly configSchema = weatherConfig
private readonly credentials: WeatherSourceOptions["credentials"] private readonly credentials: WeatherSourceOptions["credentials"]
private readonly client: WeatherSourceOptions["client"] private readonly client: WeatherSourceOptions["client"]
constructor(options: WeatherSourceProviderOptions) { constructor(options: WeatherSourceProviderOptions) {
this.db = options.db
this.credentials = options.credentials this.credentials = options.credentials
this.client = options.client this.client = options.client
} }
async feedSourceForUser(userId: string): Promise<WeatherSource> { async feedSourceForUser(_userId: string, config: unknown): Promise<WeatherSource> {
const row = await sources(this.db, userId).find("aelis.weather") const parsed = weatherConfig(config)
if (!row || !row.enabled) {
throw new SourceDisabledError("aelis.weather", userId)
}
const parsed = weatherConfig(row.config ?? {})
if (parsed instanceof type.errors) { if (parsed instanceof type.errors) {
throw new Error(`Invalid weather config for user ${userId}: ${parsed.summary}`) throw new Error(`Invalid weather config: ${parsed.summary}`)
} }
return new WeatherSource({ return new WeatherSource({

View File

@@ -30,8 +30,10 @@
"better-auth": "^1", "better-auth": "^1",
"drizzle-orm": "^0.45.1", "drizzle-orm": "^0.45.1",
"hono": "^4", "hono": "^4",
"lodash.merge": "^4.6.2",
}, },
"devDependencies": { "devDependencies": {
"@types/lodash.merge": "^4.6.9",
"drizzle-kit": "^0.31.9", "drizzle-kit": "^0.31.9",
}, },
}, },
@@ -1246,6 +1248,10 @@
"@types/json5": ["@types/json5@0.0.29", "", {}, "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ=="], "@types/json5": ["@types/json5@0.0.29", "", {}, "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ=="],
"@types/lodash": ["@types/lodash@4.17.24", "", {}, "sha512-gIW7lQLZbue7lRSWEFql49QJJWThrTFFeIMJdp3eH4tKoxm1OvEPg02rm4wCCSHS0cL3/Fizimb35b7k8atwsQ=="],
"@types/lodash.merge": ["@types/lodash.merge@4.6.9", "", { "dependencies": { "@types/lodash": "*" } }, "sha512-23sHDPmzd59kUgWyKGiOMO2Qb9YtqRO/x4IhkgNUiPQ1+5MUVqi6bCZeq9nBJ17msjIMbEIO5u+XW4Kz6aGUhQ=="],
"@types/mdast": ["@types/mdast@4.0.4", "", { "dependencies": { "@types/unist": "*" } }, "sha512-kGaNbPh1k7AFzgpud/gMdvIm5xuECykRR+JnWKQno9TAXVa6WIVCGTPvYGekIDL4uwCZQSYbUxNBSb1aUo79oA=="], "@types/mdast": ["@types/mdast@4.0.4", "", { "dependencies": { "@types/unist": "*" } }, "sha512-kGaNbPh1k7AFzgpud/gMdvIm5xuECykRR+JnWKQno9TAXVa6WIVCGTPvYGekIDL4uwCZQSYbUxNBSb1aUo79oA=="],
"@types/ms": ["@types/ms@2.1.0", "", {}, "sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA=="], "@types/ms": ["@types/ms@2.1.0", "", {}, "sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA=="],