Compare commits

..

2 Commits

Author SHA1 Message Date
d46e6a9c5d feat: wrap multi-step DB writes in transactions
- saveSourceConfig: upsert + credential update run atomically
- updateSourceConfig: SELECT FOR UPDATE prevents lost updates
- Widen Database type to accept transaction handles

Co-authored-by: Ona <no-reply@ona.com>
2026-04-12 14:40:54 +00:00
e54c5d5462 fix: accept credentials in source config upsert (#117)
* fix: unified source config + credentials

Accept optional credentials in PUT /api/sources/:sourceId so the
dashboard can send config and credentials in a single request,
eliminating the race condition between parallel config/credential
updates that left sources uninitialized until server restart.

The existing /credentials endpoint is preserved for independent
credential updates.

Co-authored-by: Ona <no-reply@ona.com>

* refactor: rename upsertSourceConfig to saveSourceConfig

Co-authored-by: Ona <no-reply@ona.com>

---------

Co-authored-by: Ona <no-reply@ona.com>
2026-04-12 15:17:29 +01:00
5 changed files with 78 additions and 33 deletions

View File

@@ -1,9 +1,12 @@
import type { PgDatabase } from "drizzle-orm/pg-core"
import { SQL } from "bun" import { SQL } from "bun"
import { drizzle, type BunSQLDatabase } from "drizzle-orm/bun-sql" import { drizzle, type BunSQLQueryResultHKT } from "drizzle-orm/bun-sql"
import * as schema from "./schema.ts" import * as schema from "./schema.ts"
export type Database = BunSQLDatabase<typeof schema> /** Covers both the top-level drizzle instance and transaction handles. */
export type Database = PgDatabase<BunSQLQueryResultHKT, typeof schema>
export interface DatabaseConnection { export interface DatabaseConnection {
db: Database db: Database

View File

@@ -81,6 +81,24 @@ mock.module("../sources/user-sources.ts", () => ({
updatedAt: now, updatedAt: now,
} }
}, },
async findForUpdate(sourceId: string) {
// Delegates to find — row locking is a no-op in tests.
if (mockFindResult !== undefined) return mockFindResult
const now = new Date()
return {
id: crypto.randomUUID(),
userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}
},
async updateConfig(_sourceId: string, _update: { enabled?: boolean; config?: unknown }) {
// no-op for tests
},
async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) { async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) {
// no-op for tests // no-op for tests
}, },
@@ -93,7 +111,9 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
const fakeDb = {} as Database const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
function createStubSource(id: string, items: FeedItem[] = []): FeedSource { function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
return { return {

View File

@@ -126,27 +126,29 @@ export class UserSessionManager {
return return
} }
// Fetch the existing row for config merging and credential access. // Use a transaction with SELECT FOR UPDATE to prevent lost updates
// NOTE: find + updateConfig is not atomic. A concurrent update could // when concurrent PATCH requests merge config against the same base.
// read stale config. Use SELECT FOR UPDATE or atomic jsonb merge if const { existingRow, mergedConfig } = await this.db.transaction(async (tx) => {
// this becomes a problem. const existingRow = await sources(tx, userId).findForUpdate(sourceId)
const existingRow = await sources(this.db, userId).find(sourceId)
let mergedConfig: Record<string, unknown> | undefined let mergedConfig: Record<string, unknown> | undefined
if (update.config !== undefined && provider.configSchema) { if (update.config !== undefined && provider.configSchema) {
const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown> const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown>
mergedConfig = merge({}, existingConfig, update.config) mergedConfig = merge({}, existingConfig, update.config)
const validated = provider.configSchema(mergedConfig) const validated = provider.configSchema(mergedConfig)
if (validated instanceof type.errors) { if (validated instanceof type.errors) {
throw new InvalidSourceConfigError(sourceId, validated.summary) throw new InvalidSourceConfigError(sourceId, validated.summary)
}
} }
}
// Throws SourceNotFoundError if the row doesn't exist // Throws SourceNotFoundError if the row doesn't exist
await sources(this.db, userId).updateConfig(sourceId, { await sources(tx, userId).updateConfig(sourceId, {
enabled: update.enabled, enabled: update.enabled,
config: mergedConfig, config: mergedConfig,
})
return { existingRow, mergedConfig }
}) })
// Refresh the specific source in the active session instead of // Refresh the specific source in the active session instead of
@@ -202,21 +204,24 @@ export class UserSessionManager {
const config = data.config ?? {} const config = data.config ?? {}
// Fetch existing row before upsert to capture credentials for session refresh. // Run the upsert + credential update atomically so a failure in
// For new rows this will be undefined — credentials will be null. // either step doesn't leave the row in an inconsistent state.
const existingRow = await sources(this.db, userId).find(sourceId) const existingRow = await this.db.transaction(async (tx) => {
const existing = await sources(tx, userId).find(sourceId)
await sources(this.db, userId).upsertConfig(sourceId, { await sources(tx, userId).upsertConfig(sourceId, {
enabled: data.enabled, enabled: data.enabled,
config, config,
})
if (data.credentials !== undefined && this.encryptor) {
const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials))
await sources(tx, userId).updateCredentials(sourceId, encrypted)
}
return existing
}) })
// Persist credentials after the upsert so the row exists.
if (data.credentials !== undefined && this.encryptor) {
const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials))
await sources(this.db, userId).updateCredentials(sourceId, encrypted)
}
const session = this.sessions.get(userId) const session = this.sessions.get(userId)
if (session) { if (session) {
if (!data.enabled) { if (!data.enabled) {

View File

@@ -80,6 +80,9 @@ function createInMemoryStore() {
async find(sourceId: string) { async find(sourceId: string) {
return rows.get(key(userId, sourceId)) return rows.get(key(userId, sourceId))
}, },
async findForUpdate(sourceId: string) {
return rows.get(key(userId, sourceId))
},
async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) { async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) {
const existing = rows.get(key(userId, sourceId)) const existing = rows.get(key(userId, sourceId))
if (!existing) { if (!existing) {
@@ -125,7 +128,9 @@ mock.module("../sources/user-sources.ts", () => ({
}, },
})) }))
const fakeDb = {} as Database const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
function createApp(providers: FeedSourceProvider[], userId?: string) { function createApp(providers: FeedSourceProvider[], userId?: string) {
const sessionManager = new UserSessionManager({ providers, db: fakeDb }) const sessionManager = new UserSessionManager({ providers, db: fakeDb })

View File

@@ -26,6 +26,18 @@ export function sources(db: Database, userId: string) {
return rows[0] return rows[0]
}, },
/** Like find(), but acquires a row lock to prevent concurrent modifications. Must be called inside a transaction. */
async findForUpdate(sourceId: string) {
const rows = await db
.select()
.from(userSources)
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
.limit(1)
.for("update")
return rows[0]
},
/** Enables a source for the user. Throws if the source row doesn't exist. */ /** Enables a source for the user. Throws if the source row doesn't exist. */
async enableSource(sourceId: string) { async enableSource(sourceId: string) {
const rows = await db const rows = await db