Compare commits

..

1 Commits

Author SHA1 Message Date
d949296104 fix: add source to session on cred update
When updateSourceCredentials was called for a source not yet in the
active session (e.g. because credentials were missing at config time),
the source was never instantiated despite being enabled in the DB.

Now, if the source row is enabled but absent from the session, the
source is added instead of skipped.

Co-authored-by: Ona <no-reply@ona.com>
2026-04-12 11:40:56 +00:00
8 changed files with 86 additions and 279 deletions

View File

@@ -20,7 +20,13 @@ import {
import { Separator } from "@/components/ui/separator"
import { Switch } from "@/components/ui/switch"
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip"
import { fetchSourceConfig, pushLocation, replaceSource, updateProviderConfig } from "@/lib/api"
import {
fetchSourceConfig,
pushLocation,
replaceSource,
updateProviderConfig,
updateSourceCredentials,
} from "@/lib/api"
interface SourceConfigPanelProps {
source: SourceDefinition
@@ -74,24 +80,23 @@ export function SourceConfigPanel({ source, onUpdate }: SourceConfigPanelProps)
const saveMutation = useMutation({
mutationFn: async () => {
const promises: Promise<void>[] = [
replaceSource(source.id, { enabled, config: getUserConfig() }),
]
const credentialFields = getCredentialFields()
const hasCredentials = Object.values(credentialFields).some(
(v) => typeof v === "string" && v.length > 0,
)
if (hasCredentials) {
if (source.perUserCredentials) {
promises.push(updateSourceCredentials(source.id, credentialFields))
} else {
promises.push(updateProviderConfig(source.id, { credentials: credentialFields }))
}
}
const body: Parameters<typeof replaceSource>[1] = {
enabled,
config: getUserConfig(),
}
if (hasCredentials && source.perUserCredentials) {
body.credentials = credentialFields
}
await replaceSource(source.id, body)
// For non-per-user credentials (provider-level), still use the admin endpoint.
if (hasCredentials && !source.perUserCredentials) {
await updateProviderConfig(source.id, { credentials: credentialFields })
}
await Promise.all(promises)
},
onSuccess() {
setDirty({})

View File

@@ -114,7 +114,7 @@ const sourceDefinitions: SourceDefinition[] = [
timeZone: {
type: "string",
label: "Timezone",
description: 'IANA timezone for determining "today" (e.g. Europe/London). Defaults to UTC.',
description: "IANA timezone for determining \"today\" (e.g. Europe/London). Defaults to UTC.",
},
},
},
@@ -174,7 +174,7 @@ export async function fetchConfigs(): Promise<SourceConfig[]> {
export async function replaceSource(
sourceId: string,
body: { enabled: boolean; config: unknown; credentials?: Record<string, unknown> },
body: { enabled: boolean; config: unknown },
): Promise<void> {
const res = await fetch(`${serverBase()}/sources/${sourceId}`, {
method: "PUT",

View File

@@ -1,12 +1,9 @@
import type { PgDatabase } from "drizzle-orm/pg-core"
import { SQL } from "bun"
import { drizzle, type BunSQLQueryResultHKT } from "drizzle-orm/bun-sql"
import { drizzle, type BunSQLDatabase } from "drizzle-orm/bun-sql"
import * as schema from "./schema.ts"
/** Covers both the top-level drizzle instance and transaction handles. */
export type Database = PgDatabase<BunSQLQueryResultHKT, typeof schema>
export type Database = BunSQLDatabase<typeof schema>
export interface DatabaseConnection {
db: Database

View File

@@ -81,27 +81,6 @@ mock.module("../sources/user-sources.ts", () => ({
updatedAt: now,
}
},
async findForUpdate(sourceId: string) {
// Delegates to find — row locking is a no-op in tests.
if (mockFindResult !== undefined) return mockFindResult
const now = new Date()
return {
id: crypto.randomUUID(),
userId,
sourceId,
enabled: true,
config: {},
credentials: null,
createdAt: now,
updatedAt: now,
}
},
async updateConfig(_sourceId: string, _update: { enabled?: boolean; config?: unknown }) {
// no-op for tests
},
async upsertConfig(_sourceId: string, _data: { enabled: boolean; config: unknown }) {
// no-op for tests
},
async updateCredentials(sourceId: string, credentials: Buffer) {
if (mockUpdateCredentialsError) {
throw mockUpdateCredentialsError
@@ -111,9 +90,7 @@ mock.module("../sources/user-sources.ts", () => ({
}),
}))
const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
const fakeDb = {} as Database
function createStubSource(id: string, items: FeedItem[] = []): FeedSource {
return {
@@ -829,6 +806,31 @@ describe("UserSessionManager.updateSourceCredentials", () => {
expect(receivedCredentials).toEqual({ token: "refreshed" })
})
test("adds source to session when source is enabled but not yet in session", async () => {
// Simulate a source that was never added to the session (e.g. credentials
// were missing at config time), but is enabled in the DB.
setEnabledSources([]) // no sources during session creation
const factory = mock(async () => createStubSource("test"))
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
const session = await manager.getOrCreate("user-1")
// Source is NOT in the session
expect(session.hasSource("test")).toBe(false)
// mockFindResult returns an enabled row by default, so the source
// row exists and is enabled in the DB.
await manager.updateSourceCredentials("user-1", "test", { token: "new-token" })
// Source should now be added to the session
expect(session.hasSource("test")).toBe(true)
expect(factory).toHaveBeenCalledTimes(1)
})
test("persists credentials without session refresh when no active session", async () => {
setEnabledSources(["test"])
const factory = mock(async () => createStubSource("test"))
@@ -847,121 +849,3 @@ describe("UserSessionManager.updateSourceCredentials", () => {
expect(factory).not.toHaveBeenCalled()
})
})
describe("UserSessionManager.saveSourceConfig", () => {
test("upserts config without credentials (existing behavior)", async () => {
setEnabledSources(["test"])
const factory = mock(async () => createStubSource("test"))
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
// Create a session first so we can verify the source is refreshed
await manager.getOrCreate("user-1")
await manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: { key: "value" },
})
// feedSourceForUser called once for session creation, once for upsert refresh
expect(factory).toHaveBeenCalledTimes(2)
// No credentials should have been persisted
expect(mockUpdateCredentialsCalls).toHaveLength(0)
})
test("upserts config with credentials — persists both and passes credentials to source", async () => {
setEnabledSources(["test"])
let receivedCredentials: unknown = null
const factory = mock(async (_userId: string, _config: unknown, creds: unknown) => {
receivedCredentials = creds
return createStubSource("test")
})
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
// Create a session so the source refresh path runs
await manager.getOrCreate("user-1")
const creds = { username: "alice", password: "s3cret" }
await manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: { serverUrl: "https://example.com" },
credentials: creds,
})
// Credentials were encrypted and persisted
expect(mockUpdateCredentialsCalls).toHaveLength(1)
const decrypted = JSON.parse(testEncryptor.decrypt(mockUpdateCredentialsCalls[0]!.credentials))
expect(decrypted).toEqual(creds)
// feedSourceForUser received the provided credentials (not null)
expect(receivedCredentials).toEqual(creds)
})
test("upserts config with credentials adds source to session when not already present", async () => {
// Start with no enabled sources so the session is empty
setEnabledSources([])
const factory = mock(async () => createStubSource("test"))
const provider: FeedSourceProvider = { sourceId: "test", feedSourceForUser: factory }
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
credentialEncryptor: testEncryptor,
})
const session = await manager.getOrCreate("user-1")
expect(session.hasSource("test")).toBe(false)
// Set mockFindResult to undefined so find() returns a row (simulating the row was just created by upsertConfig)
await manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: {},
credentials: { token: "abc" },
})
// Source should now be in the session
expect(session.hasSource("test")).toBe(true)
expect(mockUpdateCredentialsCalls).toHaveLength(1)
})
test("throws CredentialStorageUnavailableError when credentials provided without encryptor", async () => {
setEnabledSources(["test"])
const provider = createStubProvider("test")
const manager = new UserSessionManager({
db: fakeDb,
providers: [provider],
// No credentialEncryptor
})
await expect(
manager.saveSourceConfig("user-1", "test", {
enabled: true,
config: {},
credentials: { token: "abc" },
}),
).rejects.toBeInstanceOf(CredentialStorageUnavailableError)
})
test("throws SourceNotFoundError for unknown provider", async () => {
const manager = new UserSessionManager({
db: fakeDb,
providers: [],
credentialEncryptor: testEncryptor,
})
await expect(
manager.saveSourceConfig("user-1", "unknown", {
enabled: true,
config: {},
}),
).rejects.toBeInstanceOf(SourceNotFoundError)
})
})

View File

@@ -126,29 +126,27 @@ export class UserSessionManager {
return
}
// Use a transaction with SELECT FOR UPDATE to prevent lost updates
// when concurrent PATCH requests merge config against the same base.
const { existingRow, mergedConfig } = await this.db.transaction(async (tx) => {
const existingRow = await sources(tx, userId).findForUpdate(sourceId)
// Fetch the existing row for config merging and credential access.
// NOTE: find + updateConfig is not atomic. A concurrent update could
// read stale config. Use SELECT FOR UPDATE or atomic jsonb merge if
// this becomes a problem.
const existingRow = await sources(this.db, userId).find(sourceId)
let mergedConfig: Record<string, unknown> | undefined
if (update.config !== undefined && provider.configSchema) {
const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown>
mergedConfig = merge({}, existingConfig, update.config)
let mergedConfig: Record<string, unknown> | undefined
if (update.config !== undefined && provider.configSchema) {
const existingConfig = (existingRow?.config ?? {}) as Record<string, unknown>
mergedConfig = merge({}, existingConfig, update.config)
const validated = provider.configSchema(mergedConfig)
if (validated instanceof type.errors) {
throw new InvalidSourceConfigError(sourceId, validated.summary)
}
const validated = provider.configSchema(mergedConfig)
if (validated instanceof type.errors) {
throw new InvalidSourceConfigError(sourceId, validated.summary)
}
}
// Throws SourceNotFoundError if the row doesn't exist
await sources(tx, userId).updateConfig(sourceId, {
enabled: update.enabled,
config: mergedConfig,
})
return { existingRow, mergedConfig }
// Throws SourceNotFoundError if the row doesn't exist
await sources(this.db, userId).updateConfig(sourceId, {
enabled: update.enabled,
config: mergedConfig,
})
// Refresh the specific source in the active session instead of
@@ -173,18 +171,13 @@ export class UserSessionManager {
* inserts a new row if one doesn't exist and fully replaces config
* (no merge).
*
* When `credentials` is provided, they are encrypted and persisted
* alongside the config in the same flow, avoiding the race condition
* of separate config + credential requests.
*
* @throws {SourceNotFoundError} if the sourceId has no registered provider
* @throws {InvalidSourceConfigError} if config fails schema validation
* @throws {CredentialStorageUnavailableError} if credentials are provided but no encryptor is configured
*/
async saveSourceConfig(
async upsertSourceConfig(
userId: string,
sourceId: string,
data: { enabled: boolean; config?: unknown; credentials?: unknown },
data: { enabled: boolean; config?: unknown },
): Promise<void> {
const provider = this.providers.get(sourceId)
if (!provider) {
@@ -198,28 +191,15 @@ export class UserSessionManager {
}
}
if (data.credentials !== undefined && !this.encryptor) {
throw new CredentialStorageUnavailableError()
}
const config = data.config ?? {}
// Run the upsert + credential update atomically so a failure in
// either step doesn't leave the row in an inconsistent state.
const existingRow = await this.db.transaction(async (tx) => {
const existing = await sources(tx, userId).find(sourceId)
// Fetch existing row before upsert to capture credentials for session refresh.
// For new rows this will be undefined — credentials will be null.
const existingRow = await sources(this.db, userId).find(sourceId)
await sources(tx, userId).upsertConfig(sourceId, {
enabled: data.enabled,
config,
})
if (data.credentials !== undefined && this.encryptor) {
const encrypted = this.encryptor.encrypt(JSON.stringify(data.credentials))
await sources(tx, userId).updateCredentials(sourceId, encrypted)
}
return existing
await sources(this.db, userId).upsertConfig(sourceId, {
enabled: data.enabled,
config,
})
const session = this.sessions.get(userId)
@@ -227,13 +207,9 @@ export class UserSessionManager {
if (!data.enabled) {
session.removeSource(sourceId)
} else {
// Prefer the just-provided credentials over what was in the DB.
let credentials: unknown = null
if (data.credentials !== undefined) {
credentials = data.credentials
} else if (existingRow?.credentials) {
credentials = this.decryptCredentials(existingRow.credentials)
}
const credentials = existingRow?.credentials
? this.decryptCredentials(existingRow.credentials)
: null
const source = await provider.feedSourceForUser(userId, config, credentials)
if (session.hasSource(sourceId)) {
session.replaceSource(sourceId, source)
@@ -273,11 +249,15 @@ export class UserSessionManager {
// the DB already has the new credentials but the session keeps the old
// source. The next session creation will pick up the persisted credentials.
const session = this.sessions.get(userId)
if (session && session.hasSource(sourceId)) {
if (session) {
const row = await sources(this.db, userId).find(sourceId)
if (row?.enabled) {
const source = await provider.feedSourceForUser(userId, row.config ?? {}, credentials)
session.replaceSource(sourceId, source)
if (session.hasSource(sourceId)) {
session.replaceSource(sourceId, source)
} else {
session.addSource(source)
}
}
}
}

View File

@@ -80,9 +80,6 @@ function createInMemoryStore() {
async find(sourceId: string) {
return rows.get(key(userId, sourceId))
},
async findForUpdate(sourceId: string) {
return rows.get(key(userId, sourceId))
},
async updateConfig(sourceId: string, update: { enabled?: boolean; config?: unknown }) {
const existing = rows.get(key(userId, sourceId))
if (!existing) {
@@ -128,9 +125,7 @@ mock.module("../sources/user-sources.ts", () => ({
},
}))
const fakeDb = {
transaction: <T>(fn: (tx: unknown) => Promise<T>) => fn(fakeDb),
} as unknown as Database
const fakeDb = {} as Database
function createApp(providers: FeedSourceProvider[], userId?: string) {
const sessionManager = new UserSessionManager({ providers, db: fakeDb })
@@ -743,42 +738,6 @@ describe("PUT /api/sources/:sourceId", () => {
expect(res.status).toBe(204)
})
test("returns 204 when credentials are included alongside config", async () => {
activeStore = createInMemoryStore()
const { app } = createAppWithEncryptor(
[createStubProvider("aelis.weather", weatherConfig)],
MOCK_USER_ID,
)
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "metric" },
credentials: { apiKey: "secret123" },
})
expect(res.status).toBe(204)
const row = activeStore.rows.get(`${MOCK_USER_ID}:aelis.weather`)
expect(row).toBeDefined()
expect(row!.enabled).toBe(true)
expect(row!.config).toEqual({ units: "metric" })
})
test("returns 503 when credentials are provided but no encryptor is configured", async () => {
activeStore = createInMemoryStore()
// createApp does NOT configure an encryptor
const { app } = createApp([createStubProvider("aelis.weather", weatherConfig)], MOCK_USER_ID)
const res = await put(app, "aelis.weather", {
enabled: true,
config: { units: "metric" },
credentials: { apiKey: "secret123" },
})
expect(res.status).toBe(503)
const body = (await res.json()) as { error: string }
expect(body.error).toContain("not configured")
})
})
describe("PUT /api/sources/:sourceId/credentials", () => {

View File

@@ -34,13 +34,11 @@ const ReplaceSourceConfigRequestBody = type({
"+": "reject",
enabled: "boolean",
config: "unknown",
"credentials?": "unknown",
})
const ReplaceSourceConfigNoConfigRequestBody = type({
"+": "reject",
enabled: "boolean",
"credentials?": "unknown",
})
export function registerSourcesHttpHandlers(
@@ -163,15 +161,14 @@ async function handleReplaceSource(c: Context<Env>) {
return c.json({ error: parsed.summary }, 400)
}
const { enabled, credentials } = parsed
const { enabled } = parsed
const config = "config" in parsed ? parsed.config : undefined
const user = c.get("user")!
try {
await sessionManager.saveSourceConfig(user.id, sourceId, {
await sessionManager.upsertSourceConfig(user.id, sourceId, {
enabled,
config,
credentials,
})
} catch (err) {
if (err instanceof SourceNotFoundError) {
@@ -180,9 +177,6 @@ async function handleReplaceSource(c: Context<Env>) {
if (err instanceof InvalidSourceConfigError) {
return c.json({ error: err.message }, 400)
}
if (err instanceof CredentialStorageUnavailableError) {
return c.json({ error: err.message }, 503)
}
throw err
}

View File

@@ -26,18 +26,6 @@ export function sources(db: Database, userId: string) {
return rows[0]
},
/** Like find(), but acquires a row lock to prevent concurrent modifications. Must be called inside a transaction. */
async findForUpdate(sourceId: string) {
const rows = await db
.select()
.from(userSources)
.where(and(eq(userSources.userId, userId), eq(userSources.sourceId, sourceId)))
.limit(1)
.for("update")
return rows[0]
},
/** Enables a source for the user. Throws if the source row doesn't exist. */
async enableSource(sourceId: string) {
const rows = await db