From d46e6a9c5da5287c440a309faeac137f2c91a881 Mon Sep 17 00:00:00 2001 From: kenneth Date: Sun, 12 Apr 2026 14:40:54 +0000 Subject: [PATCH] feat: wrap multi-step DB writes in transactions - saveSourceConfig: upsert + credential update run atomically - updateSourceConfig: SELECT FOR UPDATE prevents lost updates - Widen Database type to accept transaction handles Co-authored-by: Ona --- apps/aelis-backend/src/db/index.ts | 7 ++- .../src/session/user-session-manager.test.ts | 22 ++++++- .../src/session/user-session-manager.ts | 63 ++++++++++--------- apps/aelis-backend/src/sources/http.test.ts | 7 ++- .../aelis-backend/src/sources/user-sources.ts | 12 ++++ 5 files changed, 78 insertions(+), 33 deletions(-) diff --git a/apps/aelis-backend/src/db/index.ts b/apps/aelis-backend/src/db/index.ts index c45e7f9..df53d46 100644 --- a/apps/aelis-backend/src/db/index.ts +++ b/apps/aelis-backend/src/db/index.ts @@ -1,9 +1,12 @@ +import type { PgDatabase } from "drizzle-orm/pg-core" + import { SQL } from "bun" -import { drizzle, type BunSQLDatabase } from "drizzle-orm/bun-sql" +import { drizzle, type BunSQLQueryResultHKT } from "drizzle-orm/bun-sql" import * as schema from "./schema.ts" -export type Database = BunSQLDatabase +/** Covers both the top-level drizzle instance and transaction handles. */ +export type Database = PgDatabase export interface DatabaseConnection { db: Database diff --git a/apps/aelis-backend/src/session/user-session-manager.test.ts b/apps/aelis-backend/src/session/user-session-manager.test.ts index 7bf442f..781dc9f 100644 --- a/apps/aelis-backend/src/session/user-session-manager.test.ts +++ b/apps/aelis-backend/src/session/user-session-manager.test.ts @@ -81,6 +81,24 @@ mock.module("../sources/user-sources.ts", () => ({ updatedAt: now, } }, + async findForUpdate(sourceId: string) { + // Delegates to find — row locking is a no-op in tests. + if (mockFindResult !== undefined) return mockFindResult + const now = new Date() + return { + id: crypto.randomUUID(), + userId, + sourceId, + enabled: true, + config: {}, + credentials: null, + createdAt: now, + updatedAt: now, + } + }, + async updateConfig(_sourceId: string, _update: { enabled?: boolean; config?: unknown }) { + // no-op for tests + }, async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) { // no-op for tests }, @@ -93,7 +111,9 @@ mock.module("../sources/user-sources.ts", () => ({ }), })) -const fakeDb = {} as Database +const fakeDb = { + transaction: (fn: (tx: unknown) => Promise) => fn(fakeDb), +} as unknown as Database function createStubSource(id: string, items: FeedItem[] = []): FeedSource { return { diff --git a/apps/aelis-backend/src/session/user-session-manager.ts b/apps/aelis-backend/src/session/user-session-manager.ts index af87366..bb7eb25 100644 --- a/apps/aelis-backend/src/session/user-session-manager.ts +++ b/apps/aelis-backend/src/session/user-session-manager.ts @@ -126,27 +126,29 @@ export class UserSessionManager { return } - // Fetch the existing row for config merging and credential access. - // NOTE: find + updateConfig is not atomic. A concurrent update could - // read stale config. Use SELECT FOR UPDATE or atomic jsonb merge if - // this becomes a problem. - const existingRow = await sources(this.db, userId).find(sourceId) + // Use a transaction with SELECT FOR UPDATE to prevent lost updates + // when concurrent PATCH requests merge config against the same base. + const { existingRow, mergedConfig } = await this.db.transaction(async (tx) => { + const existingRow = await sources(tx, userId).findForUpdate(sourceId) - let mergedConfig: Record | undefined - if (update.config !== undefined && provider.configSchema) { - const existingConfig = (existingRow?.config ?? {}) as Record - mergedConfig = merge({}, existingConfig, update.config) + let mergedConfig: Record | undefined + if (update.config !== undefined && provider.configSchema) { + const existingConfig = (existingRow?.config ?? {}) as Record + mergedConfig = merge({}, existingConfig, update.config) - const validated = provider.configSchema(mergedConfig) - if (validated instanceof type.errors) { - throw new InvalidSourceConfigError(sourceId, validated.summary) + const validated = provider.configSchema(mergedConfig) + if (validated instanceof type.errors) { + throw new InvalidSourceConfigError(sourceId, validated.summary) + } } - } - // Throws SourceNotFoundError if the row doesn't exist - await sources(this.db, userId).updateConfig(sourceId, { - enabled: update.enabled, - config: mergedConfig, + // Throws SourceNotFoundError if the row doesn't exist + await sources(tx, userId).updateConfig(sourceId, { + enabled: update.enabled, + config: mergedConfig, + }) + + return { existingRow, mergedConfig } }) // Refresh the specific source in the active session instead of @@ -202,21 +204,24 @@ export class UserSessionManager { const config = data.config ?? {} - // Fetch existing row before upsert to capture credentials for session refresh. - // For new rows this will be undefined — credentials will be null. - const existingRow = await sources(this.db, userId).find(sourceId) + // Run the upsert + credential update atomically so a failure in + // either step doesn't leave the row in an inconsistent state. + const existingRow = await this.db.transaction(async (tx) => { + const existing = await sources(tx, userId).find(sourceId) - await sources(this.db, userId).upsertConfig(sourceId, { - enabled: data.enabled, - config, + await sources(tx, userId).upsertConfig(sourceId, { + enabled: data.enabled, + config, + }) + + if (data.credentials !== undefined && this.encryptor) { + const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials)) + await sources(tx, userId).updateCredentials(sourceId, encrypted) + } + + return existing }) - // Persist credentials after the upsert so the row exists. - if (data.credentials !== undefined && this.encryptor) { - const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials)) - await sources(this.db, userId).updateCredentials(sourceId, encrypted) - } - const session = this.sessions.get(userId) if (session) { if (!data.enabled) { diff --git a/apps/aelis-backend/src/sources/http.test.ts b/apps/aelis-backend/src/sources/http.test.ts index ec20799..3e3a34d 100644 --- a/apps/aelis-backend/src/sources/http.test.ts +++ b/apps/aelis-backend/src/sources/http.test.ts @@ -80,6 +80,9 @@ function createInMemoryStore() { async find(sourceId: string) { return rows.get(key(userId, sourceId)) }, + async findForUpdate(sourceId: string) { + return rows.get(key(userId, sourceId)) + }, async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) { const existing = rows.get(key(userId, sourceId)) if (!existing) { @@ -125,7 +128,9 @@ mock.module("../sources/user-sources.ts", () => ({ }, })) -const fakeDb = {} as Database +const fakeDb = { + transaction: (fn: (tx: unknown) => Promise) => fn(fakeDb), +} as unknown as Database function createApp(providers: FeedSourceProvider[], userId?: string) { const sessionManager = new UserSessionManager({ providers, db: fakeDb }) diff --git a/apps/aelis-backend/src/sources/user-sources.ts b/apps/aelis-backend/src/sources/user-sources.ts index 8509a65..e5c91d3 100644 --- a/apps/aelis-backend/src/sources/user-sources.ts +++ b/apps/aelis-backend/src/sources/user-sources.ts @@ -26,6 +26,18 @@ export function sources(db: Database, userId: string) { return rows[0] }, + /** Like find(), but acquires a row lock to prevent concurrent modifications. Must be called inside a transaction. */ + async findForUpdate(sourceId: string) { + const rows = await db + .select() + .from(userSources) + .where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId))) + .limit(1) + .for("update") + + return rows[0] + }, + /** Enables a source for the user. Throws if the source row doesn't exist. */ async enableSource(sourceId: string) { const rows = await db