feat: wrap multi-step DB writes in transactions (#118)

- saveSourceConfig: upsert + credential update run atomically
- updateSourceConfig: SELECT FOR UPDATE prevents lost updates
- Widen Database type to accept transaction handles

Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
2026-04-12 15:46:30 +01:00
committed by GitHub
parent e54c5d5462
commit 62c8dfe0b1
5 changed files with 78 additions and 33 deletions

View File

@@ -1,9 +1,12 @@
import type { PgDatabase } from "drizzle-orm/pg-core"
import { SQL } from "bun" import { SQL } from "bun"
import { drizzle, type BunSQLDatabase } from "drizzle-orm/bun-sql" import { drizzle, type BunSQLQueryResultHKT } from "drizzle-orm/bun-sql"
import * as schema from "./schema.ts" import * as schema from "./schema.ts"
export type Database = BunSQLDatabase<typeof schema> /** Covers both the top-level drizzle instance and transaction handles. */
export type Database = PgDatabase<BunSQLQueryResultHKT, typeof schema>
export interface DatabaseConnection { export interface DatabaseConnection {
db: Database db: Database

View File

@@ -81,6 +81,24 @@ mock.module("../sources/user-sources.ts", () => ({
updatedAt: now, updatedAt: now,
} }
}, },
async findForUpdate(sourceId: string) {
// Delegates to find — row locking is a no-op in tests.
if (mockFindResult !== undefined) return mockFindResult
const now = new Date()
return {
id: crypto.randomUUID(),
userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}
},
async updateConfig(_sourceId: string, _update: { enabled?: boolean; config?: unknown }) {
// no-op for tests
},
async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) { async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) {
// no-op for tests // no-op for tests
}, },
@@ -93,7 +111,9 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
const fakeDb = {} as Database const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
function createStubSource(id: string, items: FeedItem[] = []): FeedSource { function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
return { return {

View File

@@ -126,11 +126,10 @@ export class UserSessionManager {
return return
} }
// Fetch the existing row for config merging and credential access. // Use a transaction with SELECT FOR UPDATE to prevent lost updates
// NOTE: find + updateConfig is not atomic. A concurrent update could // when concurrent PATCH requests merge config against the same base.
// read stale config. Use SELECT FOR UPDATE or atomic jsonb merge if const { existingRow, mergedConfig } = await this.db.transaction(async (tx) => {
// this becomes a problem. const existingRow = await sources(tx, userId).findForUpdate(sourceId)
const existingRow = await sources(this.db, userId).find(sourceId)
let mergedConfig: Record<string, unknown> | undefined let mergedConfig: Record<string, unknown> | undefined
if (update.config !== undefined && provider.configSchema) { if (update.config !== undefined && provider.configSchema) {
@@ -144,11 +143,14 @@ export class UserSessionManager {
} }
// Throws SourceNotFoundError if the row doesn't exist // Throws SourceNotFoundError if the row doesn't exist
await sources(this.db, userId).updateConfig(sourceId, { await sources(tx, userId).updateConfig(sourceId, {
enabled: update.enabled, enabled: update.enabled,
config: mergedConfig, config: mergedConfig,
}) })
return { existingRow, mergedConfig }
})
// Refresh the specific source in the active session instead of // Refresh the specific source in the active session instead of
// destroying the entire session. // destroying the entire session.
const session = this.sessions.get(userId) const session = this.sessions.get(userId)
@@ -202,21 +204,24 @@ export class UserSessionManager {
const config = data.config ?? {} const config = data.config ?? {}
// Fetch existing row before upsert to capture credentials for session refresh. // Run the upsert + credential update atomically so a failure in
// For new rows this will be undefined — credentials will be null. // either step doesn't leave the row in an inconsistent state.
const existingRow = await sources(this.db, userId).find(sourceId) const existingRow = await this.db.transaction(async (tx) => {
const existing = await sources(tx, userId).find(sourceId)
await sources(this.db, userId).upsertConfig(sourceId, { await sources(tx, userId).upsertConfig(sourceId, {
enabled: data.enabled, enabled: data.enabled,
config, config,
}) })
// Persist credentials after the upsert so the row exists.
if (data.credentials !== undefined && this.encryptor) { if (data.credentials !== undefined && this.encryptor) {
const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials)) const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials))
await sources(this.db, userId).updateCredentials(sourceId, encrypted) await sources(tx, userId).updateCredentials(sourceId, encrypted)
} }
return existing
})
const session = this.sessions.get(userId) const session = this.sessions.get(userId)
if (session) { if (session) {
if (!data.enabled) { if (!data.enabled) {

View File

@@ -80,6 +80,9 @@ function createInMemoryStore() {
async find(sourceId: string) { async find(sourceId: string) {
return rows.get(key(userId, sourceId)) return rows.get(key(userId, sourceId))
}, },
async findForUpdate(sourceId: string) {
return rows.get(key(userId, sourceId))
},
async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) { async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) {
const existing = rows.get(key(userId, sourceId)) const existing = rows.get(key(userId, sourceId))
if (!existing) { if (!existing) {
@@ -125,7 +128,9 @@ mock.module("../sources/user-sources.ts", () => ({
}, },
})) }))
const fakeDb = {} as Database const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
function createApp(providers: FeedSourceProvider[], userId?: string) { function createApp(providers: FeedSourceProvider[], userId?: string) {
const sessionManager = new UserSessionManager({ providers, db: fakeDb }) const sessionManager = new UserSessionManager({ providers, db: fakeDb })

View File

@@ -26,6 +26,18 @@ export function sources(db: Database, userId: string) {
return rows[0] return rows[0]
}, },
/** Like find(), but acquires a row lock to prevent concurrent modifications. Must be called inside a transaction. */
async findForUpdate(sourceId: string) {
const rows = await db
.select()
.from(userSources)
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
.limit(1)
.for("update")
return rows[0]
},
/** Enables a source for the user. Throws if the source row doesn't exist. */ /** Enables a source for the user. Throws if the source row doesn't exist. */
async enableSource(sourceId: string) { async enableSource(sourceId: string) {
const rows = await db const rows = await db