Compare commits

..

1 Commits

Author SHA1 Message Date
dcabc4fbdb fix: handle provider errors during source config upsert
Wrap feedSourceForUser + addSource/replaceSource in upsertSourceConfig
with try/catch. When credentials are not yet available (e.g. new CalDAV
source before updateSourceCredentials is called), the provider throws
but the config is still persisted. updateSourceCredentials will add the
source to the session later.

Co-authored-by: Ona <no-reply@ona.com>
2026-04-12 11:42:58 +00:00
8 changed files with 118 additions and 254 deletions

View File

@@ -20,7 +20,13 @@ import {
import { Separator } from "@/components/ui/separator"
import { Switch } from "@/components/ui/switch"
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip"
import { fetchSourceConfig, pushLocation, replaceSource, updateProviderConfig } from "@/lib/api"
import {
fetchSourceConfig,
pushLocation,
replaceSource,
updateProviderConfig,
updateSourceCredentials,
} from "@/lib/api"
interface SourceConfigPanelProps {
source: SourceDefinition
@@ -74,24 +80,23 @@ export function SourceConfigPanel({ source, onUpdate }: SourceConfigPanelProps)
const saveMutation = useMutation({
mutationFn: async () => {
const promises: Promise<void>[] = [
replaceSource(source.id, { enabled, config: getUserConfig() }),
]
const credentialFields = getCredentialFields()
const hasCredentials = Object.values(credentialFields).some(
(v) => typeof v === "string" && v.length > 0,
)
if (hasCredentials) {
if (source.perUserCredentials) {
promises.push(updateSourceCredentials(source.id, credentialFields))
} else {
promises.push(updateProviderConfig(source.id, { credentials: credentialFields }))
}
}
const body: Parameters<typeof replaceSource>[1] = {
enabled,
config: getUserConfig(),
}
if (hasCredentials && source.perUserCredentials) {
body.credentials = credentialFields
}
await replaceSource(source.id, body)
// For non-per-user credentials (provider-level), still use the admin endpoint.
if (hasCredentials && !source.perUserCredentials) {
await updateProviderConfig(source.id, { credentials: credentialFields })
}
await Promise.all(promises)
},
onSuccess() {
setDirty({})

View File

@@ -114,7 +114,7 @@ const sourceDefinitions: SourceDefinition[] = [
timeZone: {
type: "string",
label: "Timezone",
description: 'IANA timezone for determining "today" (e.g. Europe/London). Defaults to UTC.',
description: "IANA timezone for determining \"today\" (e.g. Europe/London). Defaults to UTC.",
},
},
},
@@ -174,7 +174,7 @@ export async function fetchConfigs(): Promise<SourceConfig[]> {
export async function replaceSource(
sourceId: string,
body: { enabled: boolean; config: unknown; credentials?: Record<string, unknown> },
body: { enabled: boolean; config: unknown },
): Promise<void> {
const res = await fetch(`${serverBase()}/sources/${sourceId}`, {
method: "PUT",

View File

@@ -1,12 +1,9 @@
import type { PgDatabase } from "drizzle-orm/pg-core"
import { SQL } from "bun"
import { drizzle, type BunSQLQueryResultHKT } from "drizzle-orm/bun-sql"
import { drizzle, type BunSQLDatabase } from "drizzle-orm/bun-sql"
import * as schema from "./schema.ts"
/** Covers both the top-level drizzle instance and transaction handles. */
export type Database = PgDatabase<BunSQLQueryResultHKT, typeof schema>
export type Database = BunSQLDatabase<typeof schema>
export interface DatabaseConnection {
db: Database

View File

@@ -44,6 +44,15 @@ function getEnabledSourceIds(userId: string): string[] {
*/
let mockFindResult: unknown | undefined
/**
* Spy for `upsertConfig` calls. Tests can inspect calls via
* `mockUpsertConfigCalls`.
*/
const mockUpsertConfigCalls: Array<{
sourceId: string
data: { enabled: boolean; config: unknown }
}> = []
/**
* Spy for `updateCredentials` calls. Tests can inspect calls via
* `mockUpdateCredentialsCalls` or override behavior.
@@ -81,26 +90,8 @@ mock.module("../sources/user-sources.ts", () => ({
updatedAt: now,
}
},
async findForUpdate(sourceId: string) {
// Delegates to find — row locking is a no-op in tests.
if (mockFindResult !== undefined) return mockFindResult
const now = new Date()
return {
id: crypto.randomUUID(),
userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}
},
async updateConfig(_sourceId: string, _update: { enabled?: boolean; config?: unknown }) {
// no-op for tests
},
async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) {
// no-op for tests
async upsertConfig(sourceId: string, data: { enabled: boolean; config: unknown }) {
mockUpsertConfigCalls.push({ sourceId, data })
},
async updateCredentials(sourceId: string, credentials: Buffer) {
if (mockUpdateCredentialsError) {
@@ -111,9 +102,7 @@ mock.module("../sources/user-sources.ts", () => ({
}),
}))
const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
const fakeDb = {} as Database
function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
return {
@@ -161,6 +150,7 @@ const weatherProvider: FeedSourceProvider = {
beforeEach(() => {
enabledByUser.clear()
mockFindResult = undefined
mockUpsertConfigCalls.length = 0
mockUpdateCredentialsCalls.length = 0
mockUpdateCredentialsError = null
})
@@ -848,120 +838,64 @@ describe("UserSessionManager.updateSourceCredentials", () => {
})
})
describe("UserSessionManager.saveSourceConfig", () => {
test("upserts config without credentials (existing behavior)", async () => {
describe("UserSessionManager.upsertSourceConfig", () => {
test("persists config to DB even when feedSourceForUser throws", async () => {
setEnabledSources(["test"])
const factory = mock(async () => createStubSource("test"))
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
// Create a session first so we can verify the source is refreshed
await manager.getOrCreate("user-1")
await manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: { key: "value" },
})
// feedSourceForUser called once for session creation, once for upsert refresh
expect(factory).toHaveBeenCalledTimes(2)
// No credentials should have been persisted
expect(mockUpdateCredentialsCalls).toHaveLength(0)
})
test("upserts config with credentials — persists both and passes credentials to source", async () => {
setEnabledSources(["test"])
let receivedCredentials: unknown = null
const factory = mock(async (_userId: string, _config: unknown, creds: unknown) => {
receivedCredentials = creds
let callCount = 0
const factory = mock(async (_userId: string, _config: unknown, _credentials: unknown) => {
callCount++
// Succeed on first call (session creation), throw on second (upsert refresh)
if (callCount > 1) {
throw new InvalidSourceCredentialsError("test", "credentials required")
}
return createStubSource("test")
})
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
const manager = new UserSessionManager({ db: fakeDb, providers: [provider] })
// Create a session so the source refresh path runs
// Create a session so the session-refresh path is exercised
await manager.getOrCreate("user-1")
const creds = { username: "alice", password: "s3cret" }
await manager.saveSourceConfig("user-1", "test", {
const spy = spyOn(console, "warn").mockImplementation(() => {})
// upsertSourceConfig with no existing credentials — provider will throw
await manager.upsertSourceConfig("user-1", "test", {
enabled: true,
config: { serverUrl: "https://example.com" },
credentials: creds,
config: { url: "https://example.com" },
})
// Credentials were encrypted and persisted
expect(mockUpdateCredentialsCalls).toHaveLength(1)
const decrypted = JSON.parse(testEncryptor.decrypt(mockUpdateCredentialsCalls[0]!.credentials))
expect(decrypted).toEqual(creds)
// Config should still have been persisted to DB
expect(mockUpsertConfigCalls).toHaveLength(1)
expect(mockUpsertConfigCalls[0]!.sourceId).toBe("test")
expect(mockUpsertConfigCalls[0]!.data.enabled).toBe(true)
// feedSourceForUser received the provided credentials (not null)
expect(receivedCredentials).toEqual(creds)
// The error should have been logged, not thrown
expect(spy).toHaveBeenCalled()
spy.mockRestore()
})
test("upserts config with credentials adds source to session when not already present", async () => {
// Start with no enabled sources so the session is empty
setEnabledSources([])
test("adds source to session when feedSourceForUser succeeds", async () => {
setEnabledSources(["test"])
const factory = mock(async () => createStubSource("test"))
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
const manager = new UserSessionManager({ db: fakeDb, providers: [provider] })
const session = await manager.getOrCreate("user-1")
expect(session.hasSource("test")).toBe(false)
await manager.upsertSourceConfig("user-1", "test", { enabled: true })
// Set mockFindResult to undefined so find() returns a row (simulating the row was just created by upsertConfig)
await manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: {},
credentials: { token: "abc" },
})
// Source should now be in the session
expect(session.hasSource("test")).toBe(true)
expect(mockUpdateCredentialsCalls).toHaveLength(1)
})
test("throws CredentialStorageUnavailableError when credentials provided without encryptor", async () => {
setEnabledSources(["test"])
const provider = createStubProvider("test")
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
// No credentialEncryptor
})
await expect(
manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: {},
credentials: { token: "abc" },
}),
).rejects.toBeInstanceOf(CredentialStorageUnavailableError)
// Config persisted
expect(mockUpsertConfigCalls).toHaveLength(1)
// Source should be in the session (feedSourceForUser succeeded)
expect(session.getSource("test")).toBeDefined()
})
test("throws SourceNotFoundError for unknown provider", async () => {
const manager = new UserSessionManager({
db: fakeDb,
providers: [],
credentialEncryptor: testEncryptor,
})
setEnabledSources([])
const manager = new UserSessionManager({ db: fakeDb, providers: [] })
await expect(
manager.saveSourceConfig("user-1", "unknown", {
enabled: true,
config: {},
}),
manager.upsertSourceConfig("user-1", "unknown", { enabled: true }),
).rejects.toBeInstanceOf(SourceNotFoundError)
})
})

View File

@@ -126,29 +126,27 @@ export class UserSessionManager {
return
}
// Use a transaction with SELECT FOR UPDATE to prevent lost updates
// when concurrent PATCH requests merge config against the same base.
const { existingRow, mergedConfig } = await this.db.transaction(async (tx) => {
const existingRow = await sources(tx, userId).findForUpdate(sourceId)
// Fetch the existing row for config merging and credential access.
// NOTE: find + updateConfig is not atomic. A concurrent update could
// read stale config. Use SELECT FOR UPDATE or atomic jsonb merge if
// this becomes a problem.
const existingRow = await sources(this.db, userId).find(sourceId)
let mergedConfig: Record<string, unknown> | undefined
if (update.config !== undefined && provider.configSchema) {
const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown>
mergedConfig = merge({}, existingConfig, update.config)
let mergedConfig: Record<string, unknown> | undefined
if (update.config !== undefined && provider.configSchema) {
const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown>
mergedConfig = merge({}, existingConfig, update.config)
const validated = provider.configSchema(mergedConfig)
if (validated instanceof type.errors) {
throw new InvalidSourceConfigError(sourceId, validated.summary)
}
const validated = provider.configSchema(mergedConfig)
if (validated instanceof type.errors) {
throw new InvalidSourceConfigError(sourceId, validated.summary)
}
}
// Throws SourceNotFoundError if the row doesn't exist
await sources(tx, userId).updateConfig(sourceId, {
enabled: update.enabled,
config: mergedConfig,
})
return { existingRow, mergedConfig }
// Throws SourceNotFoundError if the row doesn't exist
await sources(this.db, userId).updateConfig(sourceId, {
enabled: update.enabled,
config: mergedConfig,
})
// Refresh the specific source in the active session instead of
@@ -173,18 +171,13 @@ export class UserSessionManager {
* inserts a new row if one doesn't exist and fully replaces config
* (no merge).
*
* When `credentials` is provided, they are encrypted and persisted
* alongside the config in the same flow, avoiding the race condition
* of separate config + credential requests.
*
* @throws {SourceNotFoundError} if the sourceId has no registered provider
* @throws {InvalidSourceConfigError} if config fails schema validation
* @throws {CredentialStorageUnavailableError} if credentials are provided but no encryptor is configured
*/
async saveSourceConfig(
async upsertSourceConfig(
userId: string,
sourceId: string,
data: { enabled: boolean; config?: unknown; credentials?: unknown },
data: { enabled: boolean; config?: unknown },
): Promise<void> {
const provider = this.providers.get(sourceId)
if (!provider) {
@@ -198,28 +191,15 @@ export class UserSessionManager {
}
}
if (data.credentials !== undefined && !this.encryptor) {
throw new CredentialStorageUnavailableError()
}
const config = data.config ?? {}
// Run the upsert + credential update atomically so a failure in
// either step doesn't leave the row in an inconsistent state.
const existingRow = await this.db.transaction(async (tx) => {
const existing = await sources(tx, userId).find(sourceId)
// Fetch existing row before upsert to capture credentials for session refresh.
// For new rows this will be undefined — credentials will be null.
const existingRow = await sources(this.db, userId).find(sourceId)
await sources(tx, userId).upsertConfig(sourceId, {
enabled: data.enabled,
config,
})
if (data.credentials !== undefined && this.encryptor) {
const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials))
await sources(tx, userId).updateCredentials(sourceId, encrypted)
}
return existing
await sources(this.db, userId).upsertConfig(sourceId, {
enabled: data.enabled,
config,
})
const session = this.sessions.get(userId)
@@ -227,18 +207,25 @@ export class UserSessionManager {
if (!data.enabled) {
session.removeSource(sourceId)
} else {
// Prefer the just-provided credentials over what was in the DB.
let credentials: unknown = null
if (data.credentials !== undefined) {
credentials = data.credentials
} else if (existingRow?.credentials) {
credentials = this.decryptCredentials(existingRow.credentials)
}
const source = await provider.feedSourceForUser(userId, config, credentials)
if (session.hasSource(sourceId)) {
session.replaceSource(sourceId, source)
} else {
session.addSource(source)
const credentials = existingRow?.credentials
? this.decryptCredentials(existingRow.credentials)
: null
try {
const source = await provider.feedSourceForUser(userId, config, credentials)
if (session.hasSource(sourceId)) {
session.replaceSource(sourceId, source)
} else {
session.addSource(source)
}
} catch (err) {
// Provider may fail when credentials are not yet available (e.g. new
// source added before updateSourceCredentials is called). The config
// is already persisted above; updateSourceCredentials will add the
// source to the session later.
console.warn(
`[UserSessionManager] feedSourceForUser("${sourceId}") failed during upsert for user ${userId}, skipping session update:`,
err,
)
}
}
}

View File

@@ -80,9 +80,6 @@ function createInMemoryStore() {
async find(sourceId: string) {
return rows.get(key(userId, sourceId))
},
async findForUpdate(sourceId: string) {
return rows.get(key(userId, sourceId))
},
async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) {
const existing = rows.get(key(userId, sourceId))
if (!existing) {
@@ -128,9 +125,7 @@ mock.module("../sources/user-sources.ts", () => ({
},
}))
const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
const fakeDb = {} as Database
function createApp(providers: FeedSourceProvider[], userId?: string) {
const sessionManager = new UserSessionManager({ providers, db: fakeDb })
@@ -743,42 +738,6 @@ describe("PUT /api/sources/:sourceId", () => {
expect(res.status).toBe(204)
})
test("returns 204 when credentials are included alongside config", async () => {
activeStore = createInMemoryStore()
const { app } = createAppWithEncryptor(
[createStubProvider("aelis.weather", weatherConfig)],
MOCK_USER_ID,
)
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "metric" },
credentials: { apiKey: "secret123" },
})
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.weather`)
expect(row).toBeDefined()
expect(row!.enabled).toBe(true)
expect(row!.config).toEqual({ units: "metric" })
})
test("returns 503 when credentials are provided but no encryptor is configured", async () => {
activeStore = createInMemoryStore()
// createApp does NOT configure an encryptor
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "metric" },
credentials: { apiKey: "secret123" },
})
expect(res.status).toBe(503)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not configured")
})
})
describe("PUT /api/sources/:sourceId/credentials", () => {

View File

@@ -34,13 +34,11 @@ const ReplaceSourceConfigRequestBody = type({
"+": "reject",
enabled: "boolean",
config: "unknown",
"credentials?": "unknown",
})
const ReplaceSourceConfigNoConfigRequestBody = type({
"+": "reject",
enabled: "boolean",
"credentials?": "unknown",
})
export function registerSourcesHttpHandlers(
@@ -163,15 +161,14 @@ async function handleReplaceSource(c: Context<Env>) {
return c.json({ error: parsed.summary }, 400)
}
const { enabled, credentials } = parsed
const { enabled } = parsed
const config = "config" in parsed ? parsed.config : undefined
const user = c.get("user")!
try {
await sessionManager.saveSourceConfig(user.id, sourceId, {
await sessionManager.upsertSourceConfig(user.id, sourceId, {
enabled,
config,
credentials,
})
} catch (err) {
if (err instanceof SourceNotFoundError) {
@@ -180,9 +177,6 @@ async function handleReplaceSource(c: Context<Env>) {
if (err instanceof InvalidSourceConfigError) {
return c.json({ error: err.message }, 400)
}
if (err instanceof CredentialStorageUnavailableError) {
return c.json({ error: err.message }, 503)
}
throw err
}

View File

@@ -26,18 +26,6 @@ export function sources(db: Database, userId: string) {
return rows[0]
},
/** Like find(), but acquires a row lock to prevent concurrent modifications. Must be called inside a transaction. */
async findForUpdate(sourceId: string) {
const rows = await db
.select()
.from(userSources)
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
.limit(1)
.for("update")
return rows[0]
},
/** Enables a source for the user. Throws if the source row doesn't exist. */
async enableSource(sourceId: string) {
const rows = await db