From 181160b018db716625763395be7cfecd7a32a6fc Mon Sep 17 00:00:00 2001 From: kenneth Date: Sat, 24 Jan 2026 22:42:00 +0000 Subject: [PATCH] feat(core): add FeedEngine for FeedSource orchestration Introduces FeedEngine that consumes FeedSource instances and manages the dependency graph for context flow and item collection. - Validates dependency graph (missing deps, circular references) - Topologically sorts sources for execution order - Runs fetchContext() in dependency order, accumulating context - Runs fetchItems() on all sources with final context - Supports reactive updates via onContextUpdate/onItemsUpdate - Graceful error handling (continues after source failures) Marks DataSource, ContextProvider, ContextBridge, Reconciler, and FeedController as deprecated in favor of FeedSource + FeedEngine. Co-authored-by: Ona --- packages/aris-core/src/feed-engine.test.ts | 458 +++++++++++++++++++++ packages/aris-core/src/feed-engine.ts | 335 +++++++++++++++ packages/aris-core/src/index.ts | 27 +- 3 files changed, 814 insertions(+), 6 deletions(-) create mode 100644 packages/aris-core/src/feed-engine.test.ts create mode 100644 packages/aris-core/src/feed-engine.ts diff --git a/packages/aris-core/src/feed-engine.test.ts b/packages/aris-core/src/feed-engine.test.ts new file mode 100644 index 0000000..db6cdac --- /dev/null +++ b/packages/aris-core/src/feed-engine.test.ts @@ -0,0 +1,458 @@ +import { describe, expect, test } from "bun:test" + +import type { Context, ContextKey, FeedItem, FeedSource } from "./index" + +import { FeedEngine } from "./feed-engine" +import { contextKey, contextValue } from "./index" + +// ============================================================================= +// CONTEXT KEYS +// ============================================================================= + +interface Location { + lat: number + lng: number +} + +interface Weather { + temperature: number + condition: string +} + +const LocationKey: ContextKey = contextKey("location") +const WeatherKey: ContextKey = contextKey("weather") + +// ============================================================================= +// FEED ITEMS +// ============================================================================= + +type WeatherFeedItem = FeedItem<"weather", { temperature: number; condition: string }> +type AlertFeedItem = FeedItem<"alert", { message: string }> + +// ============================================================================= +// TEST HELPERS +// ============================================================================= + +interface SimulatedLocationSource extends FeedSource { + simulateUpdate(location: Location): void +} + +function createLocationSource(): SimulatedLocationSource { + let callback: ((update: Partial) => void) | null = null + let currentLocation: Location = { lat: 0, lng: 0 } + + return { + id: "location", + + onContextUpdate(cb) { + callback = cb + return () => { + callback = null + } + }, + + async fetchContext() { + return { [LocationKey]: currentLocation } + }, + + simulateUpdate(location: Location) { + currentLocation = location + callback?.({ [LocationKey]: location }) + }, + } +} + +function createWeatherSource( + fetchWeather: (location: Location) => Promise = async () => ({ + temperature: 20, + condition: "sunny", + }), +): FeedSource { + return { + id: "weather", + dependencies: ["location"], + + async fetchContext(context) { + const location = contextValue(context, LocationKey) + if (!location) return {} + + const weather = await fetchWeather(location) + return { [WeatherKey]: weather } + }, + + async fetchItems(context) { + const weather = contextValue(context, WeatherKey) + if (!weather) return [] + + return [ + { + id: `weather-${Date.now()}`, + type: "weather", + priority: 0.5, + timestamp: new Date(), + data: { + temperature: weather.temperature, + condition: weather.condition, + }, + }, + ] + }, + } +} + +function createAlertSource(): FeedSource { + return { + id: "alert", + dependencies: ["weather"], + + async fetchItems(context) { + const weather = contextValue(context, WeatherKey) + if (!weather) return [] + + if (weather.condition === "storm") { + return [ + { + id: "alert-storm", + type: "alert", + priority: 1.0, + timestamp: new Date(), + data: { message: "Storm warning!" }, + }, + ] + } + + return [] + }, + } +} + +// ============================================================================= +// TESTS +// ============================================================================= + +describe("FeedEngine", () => { + describe("registration", () => { + test("registers sources", () => { + const engine = new FeedEngine() + const location = createLocationSource() + + engine.register(location) + + // Can refresh without error + expect(engine.refresh()).resolves.toBeDefined() + }) + + test("unregisters sources", async () => { + const engine = new FeedEngine() + const location = createLocationSource() + + engine.register(location) + engine.unregister("location") + + const result = await engine.refresh() + expect(result.items).toHaveLength(0) + }) + + test("allows chained registration", () => { + const engine = new FeedEngine() + .register(createLocationSource()) + .register(createWeatherSource()) + .register(createAlertSource()) + + expect(engine.refresh()).resolves.toBeDefined() + }) + }) + + describe("graph validation", () => { + test("throws on missing dependency", () => { + const engine = new FeedEngine() + const orphan: FeedSource = { + id: "orphan", + dependencies: ["nonexistent"], + } + + engine.register(orphan) + + expect(engine.refresh()).rejects.toThrow( + 'Source "orphan" depends on "nonexistent" which is not registered', + ) + }) + + test("throws on circular dependency", () => { + const engine = new FeedEngine() + const a: FeedSource = { id: "a", dependencies: ["b"] } + const b: FeedSource = { id: "b", dependencies: ["a"] } + + engine.register(a).register(b) + + expect(engine.refresh()).rejects.toThrow("Circular dependency detected: a → b → a") + }) + + test("throws on longer cycles", () => { + const engine = new FeedEngine() + const a: FeedSource = { id: "a", dependencies: ["c"] } + const b: FeedSource = { id: "b", dependencies: ["a"] } + const c: FeedSource = { id: "c", dependencies: ["b"] } + + engine.register(a).register(b).register(c) + + expect(engine.refresh()).rejects.toThrow("Circular dependency detected") + }) + }) + + describe("refresh", () => { + test("runs fetchContext in dependency order", async () => { + const order: string[] = [] + + const location: FeedSource = { + id: "location", + async fetchContext() { + order.push("location") + return { [LocationKey]: { lat: 51.5, lng: -0.1 } } + }, + } + + const weather: FeedSource = { + id: "weather", + dependencies: ["location"], + async fetchContext(ctx) { + order.push("weather") + const loc = contextValue(ctx, LocationKey) + expect(loc).toBeDefined() + return { [WeatherKey]: { temperature: 20, condition: "sunny" } } + }, + } + + const engine = new FeedEngine().register(weather).register(location) + + await engine.refresh() + + expect(order).toEqual(["location", "weather"]) + }) + + test("accumulates context across sources", async () => { + const location = createLocationSource() + location.simulateUpdate({ lat: 51.5, lng: -0.1 }) + + const weather = createWeatherSource() + + const engine = new FeedEngine().register(location).register(weather) + + const { context } = await engine.refresh() + + expect(contextValue(context, LocationKey)).toEqual({ lat: 51.5, lng: -0.1 }) + expect(contextValue(context, WeatherKey)).toEqual({ temperature: 20, condition: "sunny" }) + }) + + test("collects items from all sources", async () => { + const location = createLocationSource() + location.simulateUpdate({ lat: 51.5, lng: -0.1 }) + + const weather = createWeatherSource() + + const engine = new FeedEngine().register(location).register(weather) + + const { items } = await engine.refresh() + + expect(items).toHaveLength(1) + expect(items[0]!.type).toBe("weather") + }) + + test("sorts items by priority descending", async () => { + const location = createLocationSource() + location.simulateUpdate({ lat: 51.5, lng: -0.1 }) + + const weather = createWeatherSource(async () => ({ + temperature: 15, + condition: "storm", + })) + + const alert = createAlertSource() + + const engine = new FeedEngine().register(location).register(weather).register(alert) + + const { items } = await engine.refresh() + + expect(items).toHaveLength(2) + expect(items[0]!.type).toBe("alert") // priority 1.0 + expect(items[1]!.type).toBe("weather") // priority 0.5 + }) + + test("handles missing upstream context gracefully", async () => { + const location: FeedSource = { + id: "location", + async fetchContext() { + return {} // No location available + }, + } + + const weather = createWeatherSource() + + const engine = new FeedEngine().register(location).register(weather) + + const { context, items } = await engine.refresh() + + expect(contextValue(context, WeatherKey)).toBeUndefined() + expect(items).toHaveLength(0) + }) + + test("captures errors from fetchContext", async () => { + const failing: FeedSource = { + id: "failing", + async fetchContext() { + throw new Error("Context fetch failed") + }, + } + + const engine = new FeedEngine().register(failing) + + const { errors } = await engine.refresh() + + expect(errors).toHaveLength(1) + expect(errors[0]!.sourceId).toBe("failing") + expect(errors[0]!.error.message).toBe("Context fetch failed") + }) + + test("captures errors from fetchItems", async () => { + const failing: FeedSource = { + id: "failing", + async fetchItems() { + throw new Error("Items fetch failed") + }, + } + + const engine = new FeedEngine().register(failing) + + const { errors } = await engine.refresh() + + expect(errors).toHaveLength(1) + expect(errors[0]!.sourceId).toBe("failing") + expect(errors[0]!.error.message).toBe("Items fetch failed") + }) + + test("continues after source error", async () => { + const failing: FeedSource = { + id: "failing", + async fetchContext() { + throw new Error("Failed") + }, + } + + const working: FeedSource = { + id: "working", + async fetchItems() { + return [ + { + id: "item-1", + type: "test", + priority: 0.5, + timestamp: new Date(), + data: {}, + }, + ] + }, + } + + const engine = new FeedEngine().register(failing).register(working) + + const { items, errors } = await engine.refresh() + + expect(errors).toHaveLength(1) + expect(items).toHaveLength(1) + }) + }) + + describe("currentContext", () => { + test("returns initial context before refresh", () => { + const engine = new FeedEngine() + + const context = engine.currentContext() + + expect(context.time).toBeInstanceOf(Date) + }) + + test("returns accumulated context after refresh", async () => { + const location = createLocationSource() + location.simulateUpdate({ lat: 51.5, lng: -0.1 }) + + const engine = new FeedEngine().register(location) + + await engine.refresh() + + const context = engine.currentContext() + expect(contextValue(context, LocationKey)).toEqual({ lat: 51.5, lng: -0.1 }) + }) + }) + + describe("subscribe", () => { + test("returns unsubscribe function", () => { + const engine = new FeedEngine() + let callCount = 0 + + const unsubscribe = engine.subscribe(() => { + callCount++ + }) + + unsubscribe() + + // Subscriber should not be called after unsubscribe + expect(callCount).toBe(0) + }) + }) + + describe("reactive updates", () => { + test("start subscribes to onContextUpdate", async () => { + const location = createLocationSource() + const weather = createWeatherSource() + + const engine = new FeedEngine().register(location).register(weather) + + const results: Array<{ items: FeedItem[] }> = [] + engine.subscribe((result) => { + results.push({ items: result.items }) + }) + + engine.start() + + // Simulate location update + location.simulateUpdate({ lat: 51.5, lng: -0.1 }) + + // Wait for async refresh + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(results.length).toBeGreaterThan(0) + expect(results[0]!.items[0]!.type).toBe("weather") + }) + + test("stop unsubscribes from all sources", async () => { + const location = createLocationSource() + + const engine = new FeedEngine().register(location) + + let callCount = 0 + engine.subscribe(() => { + callCount++ + }) + + engine.start() + engine.stop() + + // Simulate update after stop + location.simulateUpdate({ lat: 1, lng: 1 }) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(callCount).toBe(0) + }) + + test("start is idempotent", () => { + const location = createLocationSource() + const engine = new FeedEngine().register(location) + + // Should not throw or double-subscribe + engine.start() + engine.start() + engine.stop() + }) + }) +}) diff --git a/packages/aris-core/src/feed-engine.ts b/packages/aris-core/src/feed-engine.ts new file mode 100644 index 0000000..f4c95c9 --- /dev/null +++ b/packages/aris-core/src/feed-engine.ts @@ -0,0 +1,335 @@ +import type { Context } from "./context" +import type { FeedItem } from "./feed" +import type { FeedSource } from "./feed-source" + +export interface SourceError { + sourceId: string + error: Error +} + +export interface FeedResult { + context: Context + items: TItem[] + errors: SourceError[] +} + +export type FeedSubscriber = (result: FeedResult) => void + +interface SourceGraph { + sources: Map + sorted: FeedSource[] + dependents: Map +} + +/** + * Orchestrates FeedSources, managing the dependency graph and context flow. + * + * Sources declare dependencies on other sources. The engine: + * - Validates the dependency graph (no missing deps, no cycles) + * - Runs fetchContext() in topological order during refresh + * - Runs fetchItems() on all sources with accumulated context + * - Subscribes to reactive updates via onContextUpdate/onItemsUpdate + * + * @example + * ```ts + * const engine = new FeedEngine() + * .register(locationSource) + * .register(weatherSource) + * .register(alertSource) + * + * // Pull-based refresh + * const { context, items, errors } = await engine.refresh() + * + * // Reactive updates + * engine.subscribe((result) => { + * console.log(result.items) + * }) + * engine.start() + * + * // Cleanup + * engine.stop() + * ``` + */ +export class FeedEngine { + private sources = new Map() + private graph: SourceGraph | null = null + private context: Context = { time: new Date() } + private subscribers = new Set>() + private cleanups: Array<() => void> = [] + private started = false + + /** + * Registers a FeedSource. Invalidates the cached graph. + */ + register(source: FeedSource): FeedEngine { + this.sources.set(source.id, source) + this.graph = null + return this as FeedEngine + } + + /** + * Unregisters a FeedSource by ID. Invalidates the cached graph. + */ + unregister(sourceId: string): this { + this.sources.delete(sourceId) + this.graph = null + return this + } + + /** + * Refreshes the feed by running all sources in dependency order. + * Calls fetchContext() then fetchItems() on each source. + */ + async refresh(): Promise> { + const graph = this.ensureGraph() + const errors: SourceError[] = [] + + // Reset context with fresh time + let context: Context = { time: new Date() } + + // Run fetchContext in topological order + for (const source of graph.sorted) { + if (source.fetchContext) { + try { + const update = await source.fetchContext(context) + context = { ...context, ...update } + } catch (err) { + errors.push({ + sourceId: source.id, + error: err instanceof Error ? err : new Error(String(err)), + }) + } + } + } + + // Run fetchItems on all sources + const items: FeedItem[] = [] + for (const source of graph.sorted) { + if (source.fetchItems) { + try { + const sourceItems = await source.fetchItems(context) + items.push(...sourceItems) + } catch (err) { + errors.push({ + sourceId: source.id, + error: err instanceof Error ? err : new Error(String(err)), + }) + } + } + } + + // Sort by priority descending + items.sort((a, b) => b.priority - a.priority) + + this.context = context + + return { context, items: items as TItems[], errors } + } + + /** + * Subscribes to feed updates. Returns unsubscribe function. + */ + subscribe(callback: FeedSubscriber): () => void { + this.subscribers.add(callback) + return () => { + this.subscribers.delete(callback) + } + } + + /** + * Starts reactive subscriptions on all sources. + * Sources with onContextUpdate will trigger re-computation of dependents. + */ + start(): void { + if (this.started) return + + this.started = true + const graph = this.ensureGraph() + + for (const source of graph.sorted) { + if (source.onContextUpdate) { + const cleanup = source.onContextUpdate( + (update) => { + this.handleContextUpdate(source.id, update) + }, + () => this.context, + ) + this.cleanups.push(cleanup) + } + + if (source.onItemsUpdate) { + const cleanup = source.onItemsUpdate( + () => { + this.scheduleRefresh() + }, + () => this.context, + ) + this.cleanups.push(cleanup) + } + } + } + + /** + * Stops all reactive subscriptions. + */ + stop(): void { + this.started = false + for (const cleanup of this.cleanups) { + cleanup() + } + this.cleanups = [] + } + + /** + * Returns the current accumulated context. + */ + currentContext(): Context { + return this.context + } + + private ensureGraph(): SourceGraph { + if (!this.graph) { + this.graph = buildGraph(Array.from(this.sources.values())) + } + return this.graph + } + + private handleContextUpdate(sourceId: string, update: Partial): void { + this.context = { ...this.context, ...update, time: new Date() } + + // Re-run dependents and notify + this.refreshDependents(sourceId) + } + + private async refreshDependents(sourceId: string): Promise { + const graph = this.ensureGraph() + const toRefresh = this.collectDependents(sourceId, graph) + + // Re-run fetchContext for dependents in order + for (const id of toRefresh) { + const source = graph.sources.get(id) + if (source?.fetchContext) { + try { + const update = await source.fetchContext(this.context) + this.context = { ...this.context, ...update } + } catch { + // Errors during reactive updates are logged but don't stop propagation + } + } + } + + // Collect items from all sources + const items: FeedItem[] = [] + const errors: SourceError[] = [] + + for (const source of graph.sorted) { + if (source.fetchItems) { + try { + const sourceItems = await source.fetchItems(this.context) + items.push(...sourceItems) + } catch (err) { + errors.push({ + sourceId: source.id, + error: err instanceof Error ? err : new Error(String(err)), + }) + } + } + } + + items.sort((a, b) => b.priority - a.priority) + + this.notifySubscribers({ context: this.context, items: items as TItems[], errors }) + } + + private collectDependents(sourceId: string, graph: SourceGraph): string[] { + const result: string[] = [] + const visited = new Set() + + const collect = (id: string): void => { + const deps = graph.dependents.get(id) ?? [] + for (const dep of deps) { + if (!visited.has(dep)) { + visited.add(dep) + result.push(dep) + collect(dep) + } + } + } + + collect(sourceId) + + // Return in topological order + return graph.sorted.filter((s) => result.includes(s.id)).map((s) => s.id) + } + + private scheduleRefresh(): void { + // Simple immediate refresh for now - could add debouncing later + this.refresh().then((result) => { + this.notifySubscribers(result) + }) + } + + private notifySubscribers(result: FeedResult): void { + this.subscribers.forEach((callback) => { + try { + callback(result) + } catch { + // Subscriber errors shouldn't break other subscribers + } + }) + } +} + +function buildGraph(sources: FeedSource[]): SourceGraph { + const byId = new Map() + for (const source of sources) { + byId.set(source.id, source) + } + + // Validate dependencies exist + for (const source of sources) { + for (const dep of source.dependencies ?? []) { + if (!byId.has(dep)) { + throw new Error(`Source "${source.id}" depends on "${dep}" which is not registered`) + } + } + } + + // Check for cycles and topologically sort + const visited = new Set() + const visiting = new Set() + const sorted: FeedSource[] = [] + + function visit(id: string, path: string[]): void { + if (visiting.has(id)) { + const cycle = [...path.slice(path.indexOf(id)), id].join(" → ") + throw new Error(`Circular dependency detected: ${cycle}`) + } + if (visited.has(id)) return + + visiting.add(id) + const source = byId.get(id)! + for (const dep of source.dependencies ?? []) { + visit(dep, [...path, id]) + } + visiting.delete(id) + visited.add(id) + sorted.push(source) + } + + for (const source of sources) { + visit(source.id, []) + } + + // Build reverse dependency map + const dependents = new Map() + for (const source of sources) { + for (const dep of source.dependencies ?? []) { + const list = dependents.get(dep) ?? [] + list.push(source.id) + dependents.set(dep, list) + } + } + + return { sources: byId, sorted, dependents } +} diff --git a/packages/aris-core/src/index.ts b/packages/aris-core/src/index.ts index 0e0ad02..691e5c8 100644 --- a/packages/aris-core/src/index.ts +++ b/packages/aris-core/src/index.ts @@ -8,20 +8,35 @@ export type { FeedItem } from "./feed" // Feed Source export type { FeedSource } from "./feed-source" +// Feed Engine +export type { FeedResult, FeedSubscriber, SourceError } from "./feed-engine" +export { FeedEngine } from "./feed-engine" + +// ============================================================================= +// DEPRECATED - Use FeedSource + FeedEngine instead +// ============================================================================= + // Data Source (deprecated - use FeedSource) export type { DataSource } from "./data-source" -// Context Provider +// Context Provider (deprecated - use FeedSource) export type { ContextProvider } from "./context-provider" -// Context Bridge +// Context Bridge (deprecated - use FeedEngine) export type { ProviderError, RefreshResult } from "./context-bridge" export { ContextBridge } from "./context-bridge" -// Reconciler -export type { ReconcileResult, ReconcilerConfig, SourceError } from "./reconciler" +// Reconciler (deprecated - use FeedEngine) +export type { + ReconcileResult, + ReconcilerConfig, + SourceError as ReconcilerSourceError, +} from "./reconciler" export { Reconciler } from "./reconciler" -// Feed Controller -export type { FeedControllerConfig, FeedSubscriber } from "./feed-controller" +// Feed Controller (deprecated - use FeedEngine) +export type { + FeedControllerConfig, + FeedSubscriber as FeedControllerSubscriber, +} from "./feed-controller" export { FeedController } from "./feed-controller"