import type { Context } from "./context"
import type { DataSource } from "./data-source"
import type { FeedItem } from "./feed"
import type { ReconcileResult } from "./reconciler"

import { Reconciler } from "./reconciler"

// NOTE(review): the generic parameter lists used in this file
// (DataSource<TItem, TConfig>, ReconcileResult<TItems>, Reconciler<TItems>,
// FeedItem["type"]) are written to match how the types are used below. Their
// declarations live in other modules — confirm the arities against
// ./data-source, ./feed and ./reconciler.

export interface FeedControllerConfig {
	/** Timeout for each data source query in milliseconds */
	timeout?: number
	/** Debounce window for batching context updates (default: 100ms) */
	debounceMs?: number
	/** Initial context state */
	initialContext?: Context
	/**
	 * Receives errors raised by a scheduled (debounced) reconcile and errors
	 * thrown by subscribers. Defaults to logging with `console.error`.
	 */
	onError?: (error: unknown) => void
}

/** Callback invoked with the result of every scheduled reconcile. */
export type FeedSubscriber<TItems extends FeedItem = FeedItem> = (
	result: ReconcileResult<TItems>,
) => void

/** A data source together with the config it was registered with. */
interface RegisteredSource {
	source: DataSource<FeedItem, unknown>
	config: unknown
}

const DEFAULT_DEBOUNCE_MS = 100

/** Fallback error sink used when no `onError` handler is configured. */
const logError = (error: unknown): void => {
	console.error("FeedController:", error)
}

/**
 * Orchestrates feed reconciliation in response to context updates.
 *
 * Holds context state, batches updates inside a debounce window, queries the
 * registered data sources through a `Reconciler`, and notifies subscribers.
 * Each user should have their own instance.
 *
 * @example
 * ```ts
 * const controller = new FeedController({ debounceMs: 100 })
 *   .addDataSource(new WeatherDataSource())
 *   .addDataSource(new TflDataSource())
 *
 * controller.subscribe((result) => {
 *   console.log(result.items)
 * })
 *
 * // Context update triggers debounced reconcile
 * controller.pushContextUpdate({ [LocationKey]: location })
 *
 * // Direct reconcile (no debounce)
 * const result = await controller.reconcile()
 *
 * // Cleanup
 * controller.stop()
 * ```
 */
export class FeedController<TItems extends FeedItem = never> {
	private readonly sources = new Map<string, RegisteredSource>()
	private readonly subscribers = new Set<FeedSubscriber<TItems>>()
	private readonly debounceMs: number
	private readonly timeout: number | undefined
	private readonly onError: (error: unknown) => void
	private context: Context
	private pendingTimeout: ReturnType<typeof setTimeout> | null = null
	private stopped = false

	/**
	 * @param config Optional settings; every field falls back to a default
	 *   (fresh `{ time: new Date() }` context, 100ms debounce, no timeout,
	 *   errors logged with `console.error`).
	 */
	constructor(config?: FeedControllerConfig) {
		this.context = config?.initialContext ?? { time: new Date() }
		this.debounceMs = config?.debounceMs ?? DEFAULT_DEBOUNCE_MS
		this.timeout = config?.timeout
		this.onError = config?.onError ?? logError
	}

	/**
	 * Registers a data source, keyed by `source.type`. Registering a second
	 * source with the same type replaces the first.
	 *
	 * @returns This controller, typed to include the new source's items.
	 */
	addDataSource<TItem extends FeedItem, TConfig>(
		source: DataSource<TItem, TConfig>,
		config?: TConfig,
	): FeedController<TItems | TItem> {
		this.sources.set(source.type, {
			source: source as DataSource<FeedItem, unknown>,
			config,
		})
		return this as FeedController<TItems | TItem>
	}

	/**
	 * Removes a data source by type. Unknown types are ignored.
	 *
	 * @returns This controller, typed without the removed source's items.
	 */
	removeDataSource<T extends TItems["type"]>(
		sourceType: T,
	): FeedController<Exclude<TItems, { type: T }>> {
		this.sources.delete(sourceType)
		return this as unknown as FeedController<Exclude<TItems, { type: T }>>
	}

	/**
	 * Stops the controller and cancels pending reconciles.
	 *
	 * After this call no further debounced reconciles are scheduled, and a
	 * scheduled reconcile that is already in flight will not notify
	 * subscribers. Direct `reconcile()` calls are unaffected.
	 */
	stop(): void {
		this.stopped = true
		if (this.pendingTimeout !== null) {
			clearTimeout(this.pendingTimeout)
			this.pendingTimeout = null
		}
	}

	/**
	 * Merges `update` into the context, stamps `time` with the current date
	 * (overriding any `time` in `update`), and schedules a debounced reconcile.
	 */
	pushContextUpdate(update: Partial<Context>): void {
		this.context = { ...this.context, ...update, time: new Date() }
		this.scheduleReconcile()
	}

	/** Subscribes to feed updates. Returns an unsubscribe function. */
	subscribe(callback: FeedSubscriber<TItems>): () => void {
		this.subscribers.add(callback)
		return () => {
			this.subscribers.delete(callback)
		}
	}

	/**
	 * Immediately reconciles with the provided context, or the current one
	 * when omitted. Subscribers are not notified; the result is only returned.
	 * Failures surface as a rejection of the returned promise.
	 */
	async reconcile(context?: Context): Promise<ReconcileResult<TItems>> {
		return this.createReconciler().reconcile(context ?? this.context)
	}

	/** Returns current context. */
	getContext(): Context {
		return this.context
	}

	/**
	 * Arms the debounce timer unless one is already pending. Updates that
	 * arrive while a timer is pending are folded into that run, because the
	 * flush reads `this.context` when it fires.
	 */
	private scheduleReconcile(): void {
		// Never arm a timer on a stopped controller; it could only no-op.
		if (this.stopped || this.pendingTimeout !== null) return

		this.pendingTimeout = setTimeout(() => {
			// flushPending handles its own failures, so this cannot reject.
			void this.flushPending()
		}, this.debounceMs)
	}

	/**
	 * Runs the scheduled reconcile and delivers the result to subscribers.
	 * Does nothing when stopped or when no data sources are registered.
	 */
	private async flushPending(): Promise<void> {
		this.pendingTimeout = null
		if (this.stopped || this.sources.size === 0) return

		let result: ReconcileResult<TItems>
		try {
			result = await this.createReconciler().reconcile(this.context)
		} catch (error) {
			// This runs from a timer, so nobody awaits it: report the failure
			// instead of letting it become an unhandled promise rejection.
			this.reportError(error)
			return
		}

		// stop() may have been called while the reconcile was in flight.
		if (this.stopped) return
		this.notifySubscribers(result)
	}

	/** Builds a fresh reconciler with every registered source attached. */
	private createReconciler(): Reconciler<TItems> {
		const reconciler = new Reconciler({ timeout: this.timeout })
		this.sources.forEach(({ source, config }) => {
			reconciler.register(source, config)
		})
		return reconciler as Reconciler<TItems>
	}

	/** Invokes every subscriber, isolating each from the others' failures. */
	private notifySubscribers(result: ReconcileResult<TItems>): void {
		this.subscribers.forEach((callback) => {
			try {
				callback(result)
			} catch (error) {
				// Subscriber errors shouldn't break other subscribers
				this.reportError(error)
			}
		})
	}

	/** Forwards an error to the configured handler without ever throwing. */
	private reportError(error: unknown): void {
		try {
			this.onError(error)
		} catch {
			// A faulty error handler must not break the timer or notify path.
		}
	}
}