mirror of
https://github.com/kennethnym/aris.git
synced 2026-04-13 13:21:18 +01:00
Compare commits
2 Commits
fix/unifie
...
feat/db-tr
| Author | SHA1 | Date | |
|---|---|---|---|
|
d46e6a9c5d
|
|||
| e54c5d5462 |
@@ -1,9 +1,12 @@
|
||||
import type { PgDatabase } from "drizzle-orm/pg-core"
|
||||
|
||||
import { SQL } from "bun"
|
||||
import { drizzle, type BunSQLDatabase } from "drizzle-orm/bun-sql"
|
||||
import { drizzle, type BunSQLQueryResultHKT } from "drizzle-orm/bun-sql"
|
||||
|
||||
import * as schema from "./schema.ts"
|
||||
|
||||
export type Database = BunSQLDatabase<typeof schema>
|
||||
/** Covers both the top-level drizzle instance and transaction handles. */
|
||||
export type Database = PgDatabase<BunSQLQueryResultHKT, typeof schema>
|
||||
|
||||
export interface DatabaseConnection {
|
||||
db: Database
|
||||
|
||||
@@ -81,6 +81,24 @@ mock.module("../sources/user-sources.ts", () => ({
|
||||
updatedAt: now,
|
||||
}
|
||||
},
|
||||
async findForUpdate(sourceId: string) {
|
||||
// Delegates to find — row locking is a no-op in tests.
|
||||
if (mockFindResult !== undefined) return mockFindResult
|
||||
const now = new Date()
|
||||
return {
|
||||
id: crypto.randomUUID(),
|
||||
userId,
|
||||
sourceId,
|
||||
enabled: true,
|
||||
config: {},
|
||||
credentials: null,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
}
|
||||
},
|
||||
async updateConfig(_sourceId: string, _update: { enabled?: boolean; config?: unknown }) {
|
||||
// no-op for tests
|
||||
},
|
||||
async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) {
|
||||
// no-op for tests
|
||||
},
|
||||
@@ -93,7 +111,9 @@ mock.module("../sources/user-sources.ts", () => ({
|
||||
}),
|
||||
}))
|
||||
|
||||
const fakeDb = {} as Database
|
||||
const fakeDb = {
|
||||
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
|
||||
} as unknown as Database
|
||||
|
||||
function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
|
||||
return {
|
||||
|
||||
@@ -126,27 +126,29 @@ export class UserSessionManager {
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch the existing row for config merging and credential access.
|
||||
// NOTE: find + updateConfig is not atomic. A concurrent update could
|
||||
// read stale config. Use SELECT FOR UPDATE or atomic jsonb merge if
|
||||
// this becomes a problem.
|
||||
const existingRow = await sources(this.db, userId).find(sourceId)
|
||||
// Use a transaction with SELECT FOR UPDATE to prevent lost updates
|
||||
// when concurrent PATCH requests merge config against the same base.
|
||||
const { existingRow, mergedConfig } = await this.db.transaction(async (tx) => {
|
||||
const existingRow = await sources(tx, userId).findForUpdate(sourceId)
|
||||
|
||||
let mergedConfig: Record<string, unknown> | undefined
|
||||
if (update.config !== undefined && provider.configSchema) {
|
||||
const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown>
|
||||
mergedConfig = merge({}, existingConfig, update.config)
|
||||
let mergedConfig: Record<string, unknown> | undefined
|
||||
if (update.config !== undefined && provider.configSchema) {
|
||||
const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown>
|
||||
mergedConfig = merge({}, existingConfig, update.config)
|
||||
|
||||
const validated = provider.configSchema(mergedConfig)
|
||||
if (validated instanceof type.errors) {
|
||||
throw new InvalidSourceConfigError(sourceId, validated.summary)
|
||||
const validated = provider.configSchema(mergedConfig)
|
||||
if (validated instanceof type.errors) {
|
||||
throw new InvalidSourceConfigError(sourceId, validated.summary)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Throws SourceNotFoundError if the row doesn't exist
|
||||
await sources(this.db, userId).updateConfig(sourceId, {
|
||||
enabled: update.enabled,
|
||||
config: mergedConfig,
|
||||
// Throws SourceNotFoundError if the row doesn't exist
|
||||
await sources(tx, userId).updateConfig(sourceId, {
|
||||
enabled: update.enabled,
|
||||
config: mergedConfig,
|
||||
})
|
||||
|
||||
return { existingRow, mergedConfig }
|
||||
})
|
||||
|
||||
// Refresh the specific source in the active session instead of
|
||||
@@ -202,21 +204,24 @@ export class UserSessionManager {
|
||||
|
||||
const config = data.config ?? {}
|
||||
|
||||
// Fetch existing row before upsert to capture credentials for session refresh.
|
||||
// For new rows this will be undefined — credentials will be null.
|
||||
const existingRow = await sources(this.db, userId).find(sourceId)
|
||||
// Run the upsert + credential update atomically so a failure in
|
||||
// either step doesn't leave the row in an inconsistent state.
|
||||
const existingRow = await this.db.transaction(async (tx) => {
|
||||
const existing = await sources(tx, userId).find(sourceId)
|
||||
|
||||
await sources(this.db, userId).upsertConfig(sourceId, {
|
||||
enabled: data.enabled,
|
||||
config,
|
||||
await sources(tx, userId).upsertConfig(sourceId, {
|
||||
enabled: data.enabled,
|
||||
config,
|
||||
})
|
||||
|
||||
if (data.credentials !== undefined && this.encryptor) {
|
||||
const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials))
|
||||
await sources(tx, userId).updateCredentials(sourceId, encrypted)
|
||||
}
|
||||
|
||||
return existing
|
||||
})
|
||||
|
||||
// Persist credentials after the upsert so the row exists.
|
||||
if (data.credentials !== undefined && this.encryptor) {
|
||||
const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials))
|
||||
await sources(this.db, userId).updateCredentials(sourceId, encrypted)
|
||||
}
|
||||
|
||||
const session = this.sessions.get(userId)
|
||||
if (session) {
|
||||
if (!data.enabled) {
|
||||
|
||||
@@ -80,6 +80,9 @@ function createInMemoryStore() {
|
||||
async find(sourceId: string) {
|
||||
return rows.get(key(userId, sourceId))
|
||||
},
|
||||
async findForUpdate(sourceId: string) {
|
||||
return rows.get(key(userId, sourceId))
|
||||
},
|
||||
async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) {
|
||||
const existing = rows.get(key(userId, sourceId))
|
||||
if (!existing) {
|
||||
@@ -125,7 +128,9 @@ mock.module("../sources/user-sources.ts", () => ({
|
||||
},
|
||||
}))
|
||||
|
||||
const fakeDb = {} as Database
|
||||
const fakeDb = {
|
||||
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
|
||||
} as unknown as Database
|
||||
|
||||
function createApp(providers: FeedSourceProvider[], userId?: string) {
|
||||
const sessionManager = new UserSessionManager({ providers, db: fakeDb })
|
||||
|
||||
@@ -26,6 +26,18 @@ export function sources(db: Database, userId: string) {
|
||||
return rows[0]
|
||||
},
|
||||
|
||||
/** Like find(), but acquires a row lock to prevent concurrent modifications. Must be called inside a transaction. */
|
||||
async findForUpdate(sourceId: string) {
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(userSources)
|
||||
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
|
||||
.limit(1)
|
||||
.for("update")
|
||||
|
||||
return rows[0]
|
||||
},
|
||||
|
||||
/** Enables a source for the user. Throws if the source row doesn't exist. */
|
||||
async enableSource(sourceId: string) {
|
||||
const rows = await db
|
||||
|
||||
Reference in New Issue
Block a user