Compare commits

..

1 Commits

Author SHA1 Message Date
d949296104 fix: add source to session on cred update
When updateSourceCredentials was called for a source not yet in the
active session (e.g. because credentials were missing at config time),
the source was never instantiated despite being enabled in the DB.

Now, if the source row is enabled but absent from the session, the
source is added instead of skipped.

Co-authored-by: Ona <no-reply@ona.com>
2026-04-12 11:40:56 +00:00
8 changed files with 86 additions and 279 deletions

View File

@@ -20,7 +20,13 @@ import {
import { Separator } from "@/components/ui/separator" import { Separator } from "@/components/ui/separator"
import { Switch } from "@/components/ui/switch" import { Switch } from "@/components/ui/switch"
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip" import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip"
import { fetchSourceConfig, pushLocation, replaceSource, updateProviderConfig } from "@/lib/api" import {
fetchSourceConfig,
pushLocation,
replaceSource,
updateProviderConfig,
updateSourceCredentials,
} from "@/lib/api"
interface SourceConfigPanelProps { interface SourceConfigPanelProps {
source: SourceDefinition source: SourceDefinition
@@ -74,24 +80,23 @@ export function SourceConfigPanel({ source, onUpdate }: SourceConfigPanelProps)
const saveMutation = useMutation({ const saveMutation = useMutation({
mutationFn: async () => { mutationFn: async () => {
const promises: Promise<void>[] = [
replaceSource(source.id, { enabled, config: getUserConfig() }),
]
const credentialFields = getCredentialFields() const credentialFields = getCredentialFields()
const hasCredentials = Object.values(credentialFields).some( const hasCredentials = Object.values(credentialFields).some(
(v) => typeof v === "string" && v.length > 0, (v) => typeof v === "string" && v.length > 0,
) )
if (hasCredentials) {
if (source.perUserCredentials) {
promises.push(updateSourceCredentials(source.id, credentialFields))
} else {
promises.push(updateProviderConfig(source.id, { credentials: credentialFields }))
}
}
const body: Parameters<typeof replaceSource>[1] = { await Promise.all(promises)
enabled,
config: getUserConfig(),
}
if (hasCredentials && source.perUserCredentials) {
body.credentials = credentialFields
}
await replaceSource(source.id, body)
// For non-per-user credentials (provider-level), still use the admin endpoint.
if (hasCredentials && !source.perUserCredentials) {
await updateProviderConfig(source.id, { credentials: credentialFields })
}
}, },
onSuccess() { onSuccess() {
setDirty({}) setDirty({})

View File

@@ -114,7 +114,7 @@ const sourceDefinitions: SourceDefinition[] = [
timeZone: { timeZone: {
type: "string", type: "string",
label: "Timezone", label: "Timezone",
description: 'IANA timezone for determining "today" (e.g. Europe/London). Defaults to UTC.', description: "IANA timezone for determining \"today\" (e.g. Europe/London). Defaults to UTC.",
}, },
}, },
}, },
@@ -174,7 +174,7 @@ export async function fetchConfigs(): Promise<SourceConfig[]> {
export async function replaceSource( export async function replaceSource(
sourceId: string, sourceId: string,
body: { enabled: boolean; config: unknown; credentials?: Record<string, unknown> }, body: { enabled: boolean; config: unknown },
): Promise<void> { ): Promise<void> {
const res = await fetch(`${serverBase()}/sources/${sourceId}`, { const res = await fetch(`${serverBase()}/sources/${sourceId}`, {
method: "PUT", method: "PUT",

View File

@@ -1,12 +1,9 @@
import type { PgDatabase } from "drizzle-orm/pg-core"
import { SQL } from "bun" import { SQL } from "bun"
import { drizzle, type BunSQLQueryResultHKT } from "drizzle-orm/bun-sql" import { drizzle, type BunSQLDatabase } from "drizzle-orm/bun-sql"
import * as schema from "./schema.ts" import * as schema from "./schema.ts"
/** Covers both the top-level drizzle instance and transaction handles. */ export type Database = BunSQLDatabase<typeof schema>
export type Database = PgDatabase<BunSQLQueryResultHKT, typeof schema>
export interface DatabaseConnection { export interface DatabaseConnection {
db: Database db: Database

View File

@@ -81,27 +81,6 @@ mock.module("../sources/user-sources.ts", () => ({
updatedAt: now, updatedAt: now,
} }
}, },
async findForUpdate(sourceId: string) {
// Delegates to find — row locking is a no-op in tests.
if (mockFindResult !== undefined) return mockFindResult
const now = new Date()
return {
id: crypto.randomUUID(),
userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}
},
async updateConfig(_sourceId: string, _update: { enabled?: boolean; config?: unknown }) {
// no-op for tests
},
async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) {
// no-op for tests
},
async updateCredentials(sourceId: string, credentials: Buffer) { async updateCredentials(sourceId: string, credentials: Buffer) {
if (mockUpdateCredentialsError) { if (mockUpdateCredentialsError) {
throw mockUpdateCredentialsError throw mockUpdateCredentialsError
@@ -111,9 +90,7 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
const fakeDb = { const fakeDb = {} as Database
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
function createStubSource(id: string, items: FeedItem[] = []): FeedSource { function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
return { return {
@@ -829,6 +806,31 @@ describe("UserSessionManager.updateSourceCredentials", () => {
expect(receivedCredentials).toEqual({ token: "refreshed" }) expect(receivedCredentials).toEqual({ token: "refreshed" })
}) })
test("adds source to session when source is enabled but not yet in session", async () => {
// Simulate a source that was never added to the session (e.g. credentials
// were missing at config time), but is enabled in the DB.
setEnabledSources([]) // no sources during session creation
const factory = mock(async () => createStubSource("test"))
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
const session = await manager.getOrCreate("user-1")
// Source is NOT in the session
expect(session.hasSource("test")).toBe(false)
// mockFindResult returns an enabled row by default, so the source
// row exists and is enabled in the DB.
await manager.updateSourceCredentials("user-1", "test", { token: "new-token" })
// Source should now be added to the session
expect(session.hasSource("test")).toBe(true)
expect(factory).toHaveBeenCalledTimes(1)
})
test("persists credentials without session refresh when no active session", async () => { test("persists credentials without session refresh when no active session", async () => {
setEnabledSources(["test"]) setEnabledSources(["test"])
const factory = mock(async () => createStubSource("test")) const factory = mock(async () => createStubSource("test"))
@@ -847,121 +849,3 @@ describe("UserSessionManager.updateSourceCredentials", () => {
expect(factory).not.toHaveBeenCalled() expect(factory).not.toHaveBeenCalled()
}) })
}) })
describe("UserSessionManager.saveSourceConfig", () => {
test("upserts config without credentials (existing behavior)", async () => {
setEnabledSources(["test"])
const factory = mock(async () => createStubSource("test"))
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
// Create a session first so we can verify the source is refreshed
await manager.getOrCreate("user-1")
await manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: { key: "value" },
})
// feedSourceForUser called once for session creation, once for upsert refresh
expect(factory).toHaveBeenCalledTimes(2)
// No credentials should have been persisted
expect(mockUpdateCredentialsCalls).toHaveLength(0)
})
test("upserts config with credentials — persists both and passes credentials to source", async () => {
setEnabledSources(["test"])
let receivedCredentials: unknown = null
const factory = mock(async (_userId: string, _config: unknown, creds: unknown) => {
receivedCredentials = creds
return createStubSource("test")
})
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
// Create a session so the source refresh path runs
await manager.getOrCreate("user-1")
const creds = { username: "alice", password: "s3cret" }
await manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: { serverUrl: "https://example.com" },
credentials: creds,
})
// Credentials were encrypted and persisted
expect(mockUpdateCredentialsCalls).toHaveLength(1)
const decrypted = JSON.parse(testEncryptor.decrypt(mockUpdateCredentialsCalls[0]!.credentials))
expect(decrypted).toEqual(creds)
// feedSourceForUser received the provided credentials (not null)
expect(receivedCredentials).toEqual(creds)
})
test("upserts config with credentials adds source to session when not already present", async () => {
// Start with no enabled sources so the session is empty
setEnabledSources([])
const factory = mock(async () => createStubSource("test"))
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
const session = await manager.getOrCreate("user-1")
expect(session.hasSource("test")).toBe(false)
// Set mockFindResult to undefined so find() returns a row (simulating the row was just created by upsertConfig)
await manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: {},
credentials: { token: "abc" },
})
// Source should now be in the session
expect(session.hasSource("test")).toBe(true)
expect(mockUpdateCredentialsCalls).toHaveLength(1)
})
test("throws CredentialStorageUnavailableError when credentials provided without encryptor", async () => {
setEnabledSources(["test"])
const provider = createStubProvider("test")
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
// No credentialEncryptor
})
await expect(
manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: {},
credentials: { token: "abc" },
}),
).rejects.toBeInstanceOf(CredentialStorageUnavailableError)
})
test("throws SourceNotFoundError for unknown provider", async () => {
const manager = new UserSessionManager({
db: fakeDb,
providers: [],
credentialEncryptor: testEncryptor,
})
await expect(
manager.saveSourceConfig("user-1", "unknown", {
enabled: true,
config: {},
}),
).rejects.toBeInstanceOf(SourceNotFoundError)
})
})

View File

@@ -126,29 +126,27 @@ export class UserSessionManager {
return return
} }
// Use a transaction with SELECT FOR UPDATE to prevent lost updates // Fetch the existing row for config merging and credential access.
// when concurrent PATCH requests merge config against the same base. // NOTE: find + updateConfig is not atomic. A concurrent update could
const { existingRow, mergedConfig } = await this.db.transaction(async (tx) => { // read stale config. Use SELECT FOR UPDATE or atomic jsonb merge if
const existingRow = await sources(tx, userId).findForUpdate(sourceId) // this becomes a problem.
const existingRow = await sources(this.db, userId).find(sourceId)
let mergedConfig: Record<string, unknown> | undefined let mergedConfig: Record<string, unknown> | undefined
if (update.config !== undefined && provider.configSchema) { if (update.config !== undefined && provider.configSchema) {
const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown> const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown>
mergedConfig = merge({}, existingConfig, update.config) mergedConfig = merge({}, existingConfig, update.config)
const validated = provider.configSchema(mergedConfig) const validated = provider.configSchema(mergedConfig)
if (validated instanceof type.errors) { if (validated instanceof type.errors) {
throw new InvalidSourceConfigError(sourceId, validated.summary) throw new InvalidSourceConfigError(sourceId, validated.summary)
}
} }
}
// Throws SourceNotFoundError if the row doesn't exist // Throws SourceNotFoundError if the row doesn't exist
await sources(tx, userId).updateConfig(sourceId, { await sources(this.db, userId).updateConfig(sourceId, {
enabled: update.enabled, enabled: update.enabled,
config: mergedConfig, config: mergedConfig,
})
return { existingRow, mergedConfig }
}) })
// Refresh the specific source in the active session instead of // Refresh the specific source in the active session instead of
@@ -173,18 +171,13 @@ export class UserSessionManager {
* inserts a new row if one doesn't exist and fully replaces config * inserts a new row if one doesn't exist and fully replaces config
* (no merge). * (no merge).
* *
* When `credentials` is provided, they are encrypted and persisted
* alongside the config in the same flow, avoiding the race condition
* of separate config + credential requests.
*
* @throws {SourceNotFoundError} if the sourceId has no registered provider * @throws {SourceNotFoundError} if the sourceId has no registered provider
* @throws {InvalidSourceConfigError} if config fails schema validation * @throws {InvalidSourceConfigError} if config fails schema validation
* @throws {CredentialStorageUnavailableError} if credentials are provided but no encryptor is configured
*/ */
async saveSourceConfig( async upsertSourceConfig(
userId: string, userId: string,
sourceId: string, sourceId: string,
data: { enabled: boolean; config?: unknown; credentials?: unknown }, data: { enabled: boolean; config?: unknown },
): Promise<void> { ): Promise<void> {
const provider = this.providers.get(sourceId) const provider = this.providers.get(sourceId)
if (!provider) { if (!provider) {
@@ -198,28 +191,15 @@ export class UserSessionManager {
} }
} }
if (data.credentials !== undefined && !this.encryptor) {
throw new CredentialStorageUnavailableError()
}
const config = data.config ?? {} const config = data.config ?? {}
// Run the upsert + credential update atomically so a failure in // Fetch existing row before upsert to capture credentials for session refresh.
// either step doesn't leave the row in an inconsistent state. // For new rows this will be undefined — credentials will be null.
const existingRow = await this.db.transaction(async (tx) => { const existingRow = await sources(this.db, userId).find(sourceId)
const existing = await sources(tx, userId).find(sourceId)
await sources(tx, userId).upsertConfig(sourceId, { await sources(this.db, userId).upsertConfig(sourceId, {
enabled: data.enabled, enabled: data.enabled,
config, config,
})
if (data.credentials !== undefined && this.encryptor) {
const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials))
await sources(tx, userId).updateCredentials(sourceId, encrypted)
}
return existing
}) })
const session = this.sessions.get(userId) const session = this.sessions.get(userId)
@@ -227,13 +207,9 @@ export class UserSessionManager {
if (!data.enabled) { if (!data.enabled) {
session.removeSource(sourceId) session.removeSource(sourceId)
} else { } else {
// Prefer the just-provided credentials over what was in the DB. const credentials = existingRow?.credentials
let credentials: unknown = null ? this.decryptCredentials(existingRow.credentials)
if (data.credentials !== undefined) { : null
credentials = data.credentials
} else if (existingRow?.credentials) {
credentials = this.decryptCredentials(existingRow.credentials)
}
const source = await provider.feedSourceForUser(userId, config, credentials) const source = await provider.feedSourceForUser(userId, config, credentials)
if (session.hasSource(sourceId)) { if (session.hasSource(sourceId)) {
session.replaceSource(sourceId, source) session.replaceSource(sourceId, source)
@@ -273,11 +249,15 @@ export class UserSessionManager {
// the DB already has the new credentials but the session keeps the old // the DB already has the new credentials but the session keeps the old
// source. The next session creation will pick up the persisted credentials. // source. The next session creation will pick up the persisted credentials.
const session = this.sessions.get(userId) const session = this.sessions.get(userId)
if (session && session.hasSource(sourceId)) { if (session) {
const row = await sources(this.db, userId).find(sourceId) const row = await sources(this.db, userId).find(sourceId)
if (row?.enabled) { if (row?.enabled) {
const source = await provider.feedSourceForUser(userId, row.config ?? {}, credentials) const source = await provider.feedSourceForUser(userId, row.config ?? {}, credentials)
session.replaceSource(sourceId, source) if (session.hasSource(sourceId)) {
session.replaceSource(sourceId, source)
} else {
session.addSource(source)
}
} }
} }
} }

View File

@@ -80,9 +80,6 @@ function createInMemoryStore() {
async find(sourceId: string) { async find(sourceId: string) {
return rows.get(key(userId, sourceId)) return rows.get(key(userId, sourceId))
}, },
async findForUpdate(sourceId: string) {
return rows.get(key(userId, sourceId))
},
async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) { async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) {
const existing = rows.get(key(userId, sourceId)) const existing = rows.get(key(userId, sourceId))
if (!existing) { if (!existing) {
@@ -128,9 +125,7 @@ mock.module("../sources/user-sources.ts", () => ({
}, },
})) }))
const fakeDb = { const fakeDb = {} as Database
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
function createApp(providers: FeedSourceProvider[], userId?: string) { function createApp(providers: FeedSourceProvider[], userId?: string) {
const sessionManager = new UserSessionManager({ providers, db: fakeDb }) const sessionManager = new UserSessionManager({ providers, db: fakeDb })
@@ -743,42 +738,6 @@ describe("PUT /api/sources/:sourceId", () => {
expect(res.status).toBe(204) expect(res.status).toBe(204)
}) })
test("returns 204 when credentials are included alongside config", async () => {
activeStore = createInMemoryStore()
const { app } = createAppWithEncryptor(
[createStubProvider("aelis.weather", weatherConfig)],
MOCK_USER_ID,
)
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "metric" },
credentials: { apiKey: "secret123" },
})
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.weather`)
expect(row).toBeDefined()
expect(row!.enabled).toBe(true)
expect(row!.config).toEqual({ units: "metric" })
})
test("returns 503 when credentials are provided but no encryptor is configured", async () => {
activeStore = createInMemoryStore()
// createApp does NOT configure an encryptor
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "metric" },
credentials: { apiKey: "secret123" },
})
expect(res.status).toBe(503)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not configured")
})
}) })
describe("PUT /api/sources/:sourceId/credentials", () => { describe("PUT /api/sources/:sourceId/credentials", () => {

View File

@@ -34,13 +34,11 @@ const ReplaceSourceConfigRequestBody = type({
"+": "reject", "+": "reject",
enabled: "boolean", enabled: "boolean",
config: "unknown", config: "unknown",
"credentials?": "unknown",
}) })
const ReplaceSourceConfigNoConfigRequestBody = type({ const ReplaceSourceConfigNoConfigRequestBody = type({
"+": "reject", "+": "reject",
enabled: "boolean", enabled: "boolean",
"credentials?": "unknown",
}) })
export function registerSourcesHttpHandlers( export function registerSourcesHttpHandlers(
@@ -163,15 +161,14 @@ async function handleReplaceSource(c: Context<Env>) {
return c.json({ error: parsed.summary }, 400) return c.json({ error: parsed.summary }, 400)
} }
const { enabled, credentials } = parsed const { enabled } = parsed
const config = "config" in parsed ? parsed.config : undefined const config = "config" in parsed ? parsed.config : undefined
const user = c.get("user")! const user = c.get("user")!
try { try {
await sessionManager.saveSourceConfig(user.id, sourceId, { await sessionManager.upsertSourceConfig(user.id, sourceId, {
enabled, enabled,
config, config,
credentials,
}) })
} catch (err) { } catch (err) {
if (err instanceof SourceNotFoundError) { if (err instanceof SourceNotFoundError) {
@@ -180,9 +177,6 @@ async function handleReplaceSource(c: Context<Env>) {
if (err instanceof InvalidSourceConfigError) { if (err instanceof InvalidSourceConfigError) {
return c.json({ error: err.message }, 400) return c.json({ error: err.message }, 400)
} }
if (err instanceof CredentialStorageUnavailableError) {
return c.json({ error: err.message }, 503)
}
throw err throw err
} }

View File

@@ -26,18 +26,6 @@ export function sources(db: Database, userId: string) {
return rows[0] return rows[0]
}, },
/** Like find(), but acquires a row lock to prevent concurrent modifications. Must be called inside a transaction. */
async findForUpdate(sourceId: string) {
const rows = await db
.select()
.from(userSources)
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
.limit(1)
.for("update")
return rows[0]
},
/** Enables a source for the user. Throws if the source row doesn't exist. */ /** Enables a source for the user. Throws if the source row doesn't exist. */
async enableSource(sourceId: string) { async enableSource(sourceId: string) {
const rows = await db const rows = await db