mirror of
https://github.com/kennethnym/aris.git
synced 2026-03-27 12:21:17 +00:00
feat(backend): add DB persistence layer (#79)
* feat(backend): add DB persistence layer Replace raw pg Pool with Drizzle ORM backed by Bun.sql. Add per-user source configuration table (user_sources). Migrate Better Auth to drizzle-adapter. Add AES-256-GCM credential encryption. Co-authored-by: Ona <no-reply@ona.com> * fix(backend): set updatedAt explicitly in all mutations onConflictDoUpdate bypasses Drizzle's $onUpdate hook. Set updatedAt explicitly in all mutation methods. Co-authored-by: Ona <no-reply@ona.com> * fix(backend): add composite index on user_sources Add (user_id, enabled) index for the enabled() query path. Co-authored-by: Ona <no-reply@ona.com> --------- Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
79
apps/aelis-backend/src/sources/user-sources.ts
Normal file
79
apps/aelis-backend/src/sources/user-sources.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { and, eq } from "drizzle-orm"
|
||||
|
||||
import type { Database } from "../db/index.ts"
|
||||
|
||||
import { userSources } from "../db/schema.ts"
|
||||
import { SourceNotFoundError } from "./errors.ts"
|
||||
|
||||
export function sources(db: Database, userId: string) {
|
||||
return {
|
||||
/** Returns all enabled sources for the user. */
|
||||
async enabled() {
|
||||
return db
|
||||
.select()
|
||||
.from(userSources)
|
||||
.where(and(eq(userSources.userId, userId), eq(userSources.enabled, true)))
|
||||
},
|
||||
|
||||
/** Returns a specific source by ID, or undefined. */
|
||||
async find(sourceId: string) {
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(userSources)
|
||||
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
|
||||
.limit(1)
|
||||
|
||||
return rows[0]
|
||||
},
|
||||
|
||||
/** Enables a source for the user. Throws if the source row doesn't exist. */
|
||||
async enableSource(sourceId: string) {
|
||||
const rows = await db
|
||||
.update(userSources)
|
||||
.set({ enabled: true, updatedAt: new Date() })
|
||||
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
|
||||
.returning({ id: userSources.id })
|
||||
|
||||
if (rows.length === 0) {
|
||||
throw new SourceNotFoundError(sourceId, userId)
|
||||
}
|
||||
},
|
||||
|
||||
/** Disables a source for the user. Throws if the source row doesn't exist. */
|
||||
async disableSource(sourceId: string) {
|
||||
const rows = await db
|
||||
.update(userSources)
|
||||
.set({ enabled: false, updatedAt: new Date() })
|
||||
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
|
||||
.returning({ id: userSources.id })
|
||||
|
||||
if (rows.length === 0) {
|
||||
throw new SourceNotFoundError(sourceId, userId)
|
||||
}
|
||||
},
|
||||
|
||||
/** Creates or updates the config for a source. */
|
||||
async upsertConfig(sourceId: string, config: Record<string, unknown>) {
|
||||
await db
|
||||
.insert(userSources)
|
||||
.values({ userId, sourceId, config })
|
||||
.onConflictDoUpdate({
|
||||
target: [userSources.userId, userSources.sourceId],
|
||||
set: { config, updatedAt: new Date() },
|
||||
})
|
||||
},
|
||||
|
||||
/** Updates the encrypted credentials for a source. Throws if the source row doesn't exist. */
|
||||
async updateCredentials(sourceId: string, credentials: Buffer) {
|
||||
const rows = await db
|
||||
.update(userSources)
|
||||
.set({ credentials, updatedAt: new Date() })
|
||||
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
|
||||
.returning({ id: userSources.id })
|
||||
|
||||
if (rows.length === 0) {
|
||||
throw new SourceNotFoundError(sourceId, userId)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user