import type { Context } from "./context" import type { DataSource } from "./data-source" import type { FeedItem } from "./feed" export interface ReconcilerConfig { timeout?: number } export interface SourceError { sourceType: string error: Error } export interface ReconcileResult { items: TItem[] errors: SourceError[] } interface RegisteredSource { source: DataSource config: unknown } const DEFAULT_TIMEOUT = 5000 export class Reconciler { private sources = new Map() private timeout: number constructor(config?: ReconcilerConfig) { this.timeout = config?.timeout ?? DEFAULT_TIMEOUT } register( source: DataSource, config?: TConfig, ): Reconciler { this.sources.set(source.type, { source: source as DataSource, config, }) return this as Reconciler } unregister(sourceType: T): Reconciler> { this.sources.delete(sourceType) return this as unknown as Reconciler> } async reconcile(context: Context): Promise> { const entries = Array.from(this.sources.values()) const results = await Promise.allSettled( entries.map(({ source, config }) => withTimeout(source.query(context, config), this.timeout, source.type), ), ) const items: FeedItem[] = [] const errors: SourceError[] = [] results.forEach((result, i) => { const sourceType = entries[i]!.source.type if (result.status === "fulfilled") { items.push(...result.value) } else { errors.push({ sourceType, error: result.reason instanceof Error ? result.reason : new Error(String(result.reason)), }) } }) items.sort((a, b) => b.priority - a.priority) return { items, errors } as ReconcileResult } } function withTimeout(promise: Promise, ms: number, sourceType: string): Promise { return Promise.race([ promise, new Promise((_, reject) => setTimeout(() => reject(new Error(`Source "${sourceType}" timed out after ${ms}ms`)), ms), ), ]) }