1 Commits

Author SHA1 Message Date
181160b018 feat(core): add FeedEngine for FeedSource orchestration
Introduces FeedEngine that consumes FeedSource instances and manages
the dependency graph for context flow and item collection.

- Validates dependency graph (missing deps, circular references)
- Topologically sorts sources for execution order
- Runs fetchContext() in dependency order, accumulating context
- Runs fetchItems() on all sources with final context
- Supports reactive updates via onContextUpdate/onItemsUpdate
- Graceful error handling (continues after source failures)

Marks DataSource, ContextProvider, ContextBridge, Reconciler, and
FeedController as deprecated in favor of FeedSource + FeedEngine.

Co-authored-by: Ona <no-reply@ona.com>
2026-01-24 22:42:00 +00:00
3 changed files with 814 additions and 6 deletions

View File

@@ -0,0 +1,458 @@
import { describe, expect, test } from "bun:test"
import type { Context, ContextKey, FeedItem, FeedSource } from "./index"
import { FeedEngine } from "./feed-engine"
import { contextKey, contextValue } from "./index"
// =============================================================================
// CONTEXT KEYS
// =============================================================================
interface Location {
lat: number
lng: number
}
interface Weather {
temperature: number
condition: string
}
const LocationKey: ContextKey<Location> = contextKey("location")
const WeatherKey: ContextKey<Weather> = contextKey("weather")
// =============================================================================
// FEED ITEMS
// =============================================================================
type WeatherFeedItem = FeedItem<"weather", { temperature: number; condition: string }>
type AlertFeedItem = FeedItem<"alert", { message: string }>
// =============================================================================
// TEST HELPERS
// =============================================================================
interface SimulatedLocationSource extends FeedSource {
simulateUpdate(location: Location): void
}
function createLocationSource(): SimulatedLocationSource {
let callback: ((update: Partial<Context>) => void) | null = null
let currentLocation: Location = { lat: 0, lng: 0 }
return {
id: "location",
onContextUpdate(cb) {
callback = cb
return () => {
callback = null
}
},
async fetchContext() {
return { [LocationKey]: currentLocation }
},
simulateUpdate(location: Location) {
currentLocation = location
callback?.({ [LocationKey]: location })
},
}
}
function createWeatherSource(
fetchWeather: (location: Location) => Promise<Weather> = async () => ({
temperature: 20,
condition: "sunny",
}),
): FeedSource<WeatherFeedItem> {
return {
id: "weather",
dependencies: ["location"],
async fetchContext(context) {
const location = contextValue(context, LocationKey)
if (!location) return {}
const weather = await fetchWeather(location)
return { [WeatherKey]: weather }
},
async fetchItems(context) {
const weather = contextValue(context, WeatherKey)
if (!weather) return []
return [
{
id: `weather-${Date.now()}`,
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: {
temperature: weather.temperature,
condition: weather.condition,
},
},
]
},
}
}
function createAlertSource(): FeedSource<AlertFeedItem> {
return {
id: "alert",
dependencies: ["weather"],
async fetchItems(context) {
const weather = contextValue(context, WeatherKey)
if (!weather) return []
if (weather.condition === "storm") {
return [
{
id: "alert-storm",
type: "alert",
priority: 1.0,
timestamp: new Date(),
data: { message: "Storm warning!" },
},
]
}
return []
},
}
}
// =============================================================================
// TESTS
// =============================================================================
describe("FeedEngine", () => {
describe("registration", () => {
test("registers sources", () => {
const engine = new FeedEngine()
const location = createLocationSource()
engine.register(location)
// Can refresh without error
expect(engine.refresh()).resolves.toBeDefined()
})
test("unregisters sources", async () => {
const engine = new FeedEngine()
const location = createLocationSource()
engine.register(location)
engine.unregister("location")
const result = await engine.refresh()
expect(result.items).toHaveLength(0)
})
test("allows chained registration", () => {
const engine = new FeedEngine()
.register(createLocationSource())
.register(createWeatherSource())
.register(createAlertSource())
expect(engine.refresh()).resolves.toBeDefined()
})
})
describe("graph validation", () => {
test("throws on missing dependency", () => {
const engine = new FeedEngine()
const orphan: FeedSource = {
id: "orphan",
dependencies: ["nonexistent"],
}
engine.register(orphan)
expect(engine.refresh()).rejects.toThrow(
'Source "orphan" depends on "nonexistent" which is not registered',
)
})
test("throws on circular dependency", () => {
const engine = new FeedEngine()
const a: FeedSource = { id: "a", dependencies: ["b"] }
const b: FeedSource = { id: "b", dependencies: ["a"] }
engine.register(a).register(b)
expect(engine.refresh()).rejects.toThrow("Circular dependency detected: a → b → a")
})
test("throws on longer cycles", () => {
const engine = new FeedEngine()
const a: FeedSource = { id: "a", dependencies: ["c"] }
const b: FeedSource = { id: "b", dependencies: ["a"] }
const c: FeedSource = { id: "c", dependencies: ["b"] }
engine.register(a).register(b).register(c)
expect(engine.refresh()).rejects.toThrow("Circular dependency detected")
})
})
describe("refresh", () => {
test("runs fetchContext in dependency order", async () => {
const order: string[] = []
const location: FeedSource = {
id: "location",
async fetchContext() {
order.push("location")
return { [LocationKey]: { lat: 51.5, lng: -0.1 } }
},
}
const weather: FeedSource = {
id: "weather",
dependencies: ["location"],
async fetchContext(ctx) {
order.push("weather")
const loc = contextValue(ctx, LocationKey)
expect(loc).toBeDefined()
return { [WeatherKey]: { temperature: 20, condition: "sunny" } }
},
}
const engine = new FeedEngine().register(weather).register(location)
await engine.refresh()
expect(order).toEqual(["location", "weather"])
})
test("accumulates context across sources", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const { context } = await engine.refresh()
expect(contextValue(context, LocationKey)).toEqual({ lat: 51.5, lng: -0.1 })
expect(contextValue(context, WeatherKey)).toEqual({ temperature: 20, condition: "sunny" })
})
test("collects items from all sources", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const { items } = await engine.refresh()
expect(items).toHaveLength(1)
expect(items[0]!.type).toBe("weather")
})
test("sorts items by priority descending", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource(async () => ({
temperature: 15,
condition: "storm",
}))
const alert = createAlertSource()
const engine = new FeedEngine().register(location).register(weather).register(alert)
const { items } = await engine.refresh()
expect(items).toHaveLength(2)
expect(items[0]!.type).toBe("alert") // priority 1.0
expect(items[1]!.type).toBe("weather") // priority 0.5
})
test("handles missing upstream context gracefully", async () => {
const location: FeedSource = {
id: "location",
async fetchContext() {
return {} // No location available
},
}
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const { context, items } = await engine.refresh()
expect(contextValue(context, WeatherKey)).toBeUndefined()
expect(items).toHaveLength(0)
})
test("captures errors from fetchContext", async () => {
const failing: FeedSource = {
id: "failing",
async fetchContext() {
throw new Error("Context fetch failed")
},
}
const engine = new FeedEngine().register(failing)
const { errors } = await engine.refresh()
expect(errors).toHaveLength(1)
expect(errors[0]!.sourceId).toBe("failing")
expect(errors[0]!.error.message).toBe("Context fetch failed")
})
test("captures errors from fetchItems", async () => {
const failing: FeedSource = {
id: "failing",
async fetchItems() {
throw new Error("Items fetch failed")
},
}
const engine = new FeedEngine().register(failing)
const { errors } = await engine.refresh()
expect(errors).toHaveLength(1)
expect(errors[0]!.sourceId).toBe("failing")
expect(errors[0]!.error.message).toBe("Items fetch failed")
})
test("continues after source error", async () => {
const failing: FeedSource = {
id: "failing",
async fetchContext() {
throw new Error("Failed")
},
}
const working: FeedSource = {
id: "working",
async fetchItems() {
return [
{
id: "item-1",
type: "test",
priority: 0.5,
timestamp: new Date(),
data: {},
},
]
},
}
const engine = new FeedEngine().register(failing).register(working)
const { items, errors } = await engine.refresh()
expect(errors).toHaveLength(1)
expect(items).toHaveLength(1)
})
})
describe("currentContext", () => {
test("returns initial context before refresh", () => {
const engine = new FeedEngine()
const context = engine.currentContext()
expect(context.time).toBeInstanceOf(Date)
})
test("returns accumulated context after refresh", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const engine = new FeedEngine().register(location)
await engine.refresh()
const context = engine.currentContext()
expect(contextValue(context, LocationKey)).toEqual({ lat: 51.5, lng: -0.1 })
})
})
describe("subscribe", () => {
test("returns unsubscribe function", () => {
const engine = new FeedEngine()
let callCount = 0
const unsubscribe = engine.subscribe(() => {
callCount++
})
unsubscribe()
// Subscriber should not be called after unsubscribe
expect(callCount).toBe(0)
})
})
describe("reactive updates", () => {
test("start subscribes to onContextUpdate", async () => {
const location = createLocationSource()
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const results: Array<{ items: FeedItem[] }> = []
engine.subscribe((result) => {
results.push({ items: result.items })
})
engine.start()
// Simulate location update
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
// Wait for async refresh
await new Promise((resolve) => setTimeout(resolve, 50))
expect(results.length).toBeGreaterThan(0)
expect(results[0]!.items[0]!.type).toBe("weather")
})
test("stop unsubscribes from all sources", async () => {
const location = createLocationSource()
const engine = new FeedEngine().register(location)
let callCount = 0
engine.subscribe(() => {
callCount++
})
engine.start()
engine.stop()
// Simulate update after stop
location.simulateUpdate({ lat: 1, lng: 1 })
await new Promise((resolve) => setTimeout(resolve, 50))
expect(callCount).toBe(0)
})
test("start is idempotent", () => {
const location = createLocationSource()
const engine = new FeedEngine().register(location)
// Should not throw or double-subscribe
engine.start()
engine.start()
engine.stop()
})
})
})

View File

@@ -0,0 +1,335 @@
import type { Context } from "./context"
import type { FeedItem } from "./feed"
import type { FeedSource } from "./feed-source"
export interface SourceError {
sourceId: string
error: Error
}
export interface FeedResult<TItem extends FeedItem = FeedItem> {
context: Context
items: TItem[]
errors: SourceError[]
}
export type FeedSubscriber<TItem extends FeedItem = FeedItem> = (result: FeedResult<TItem>) => void
interface SourceGraph {
sources: Map<string, FeedSource>
sorted: FeedSource[]
dependents: Map<string, string[]>
}
/**
* Orchestrates FeedSources, managing the dependency graph and context flow.
*
* Sources declare dependencies on other sources. The engine:
* - Validates the dependency graph (no missing deps, no cycles)
* - Runs fetchContext() in topological order during refresh
* - Runs fetchItems() on all sources with accumulated context
* - Subscribes to reactive updates via onContextUpdate/onItemsUpdate
*
* @example
* ```ts
* const engine = new FeedEngine()
* .register(locationSource)
* .register(weatherSource)
* .register(alertSource)
*
* // Pull-based refresh
* const { context, items, errors } = await engine.refresh()
*
* // Reactive updates
* engine.subscribe((result) => {
* console.log(result.items)
* })
* engine.start()
*
* // Cleanup
* engine.stop()
* ```
*/
export class FeedEngine<TItems extends FeedItem = FeedItem> {
private sources = new Map<string, FeedSource>()
private graph: SourceGraph | null = null
private context: Context = { time: new Date() }
private subscribers = new Set<FeedSubscriber<TItems>>()
private cleanups: Array<() => void> = []
private started = false
/**
* Registers a FeedSource. Invalidates the cached graph.
*/
register<TItem extends FeedItem>(source: FeedSource<TItem>): FeedEngine<TItems | TItem> {
this.sources.set(source.id, source)
this.graph = null
return this as FeedEngine<TItems | TItem>
}
/**
* Unregisters a FeedSource by ID. Invalidates the cached graph.
*/
unregister(sourceId: string): this {
this.sources.delete(sourceId)
this.graph = null
return this
}
/**
* Refreshes the feed by running all sources in dependency order.
* Calls fetchContext() then fetchItems() on each source.
*/
async refresh(): Promise<FeedResult<TItems>> {
const graph = this.ensureGraph()
const errors: SourceError[] = []
// Reset context with fresh time
let context: Context = { time: new Date() }
// Run fetchContext in topological order
for (const source of graph.sorted) {
if (source.fetchContext) {
try {
const update = await source.fetchContext(context)
context = { ...context, ...update }
} catch (err) {
errors.push({
sourceId: source.id,
error: err instanceof Error ? err : new Error(String(err)),
})
}
}
}
// Run fetchItems on all sources
const items: FeedItem[] = []
for (const source of graph.sorted) {
if (source.fetchItems) {
try {
const sourceItems = await source.fetchItems(context)
items.push(...sourceItems)
} catch (err) {
errors.push({
sourceId: source.id,
error: err instanceof Error ? err : new Error(String(err)),
})
}
}
}
// Sort by priority descending
items.sort((a, b) => b.priority - a.priority)
this.context = context
return { context, items: items as TItems[], errors }
}
/**
* Subscribes to feed updates. Returns unsubscribe function.
*/
subscribe(callback: FeedSubscriber<TItems>): () => void {
this.subscribers.add(callback)
return () => {
this.subscribers.delete(callback)
}
}
/**
* Starts reactive subscriptions on all sources.
* Sources with onContextUpdate will trigger re-computation of dependents.
*/
start(): void {
if (this.started) return
this.started = true
const graph = this.ensureGraph()
for (const source of graph.sorted) {
if (source.onContextUpdate) {
const cleanup = source.onContextUpdate(
(update) => {
this.handleContextUpdate(source.id, update)
},
() => this.context,
)
this.cleanups.push(cleanup)
}
if (source.onItemsUpdate) {
const cleanup = source.onItemsUpdate(
() => {
this.scheduleRefresh()
},
() => this.context,
)
this.cleanups.push(cleanup)
}
}
}
/**
* Stops all reactive subscriptions.
*/
stop(): void {
this.started = false
for (const cleanup of this.cleanups) {
cleanup()
}
this.cleanups = []
}
/**
* Returns the current accumulated context.
*/
currentContext(): Context {
return this.context
}
private ensureGraph(): SourceGraph {
if (!this.graph) {
this.graph = buildGraph(Array.from(this.sources.values()))
}
return this.graph
}
private handleContextUpdate(sourceId: string, update: Partial<Context>): void {
this.context = { ...this.context, ...update, time: new Date() }
// Re-run dependents and notify
this.refreshDependents(sourceId)
}
private async refreshDependents(sourceId: string): Promise<void> {
const graph = this.ensureGraph()
const toRefresh = this.collectDependents(sourceId, graph)
// Re-run fetchContext for dependents in order
for (const id of toRefresh) {
const source = graph.sources.get(id)
if (source?.fetchContext) {
try {
const update = await source.fetchContext(this.context)
this.context = { ...this.context, ...update }
} catch {
// Errors during reactive updates are logged but don't stop propagation
}
}
}
// Collect items from all sources
const items: FeedItem[] = []
const errors: SourceError[] = []
for (const source of graph.sorted) {
if (source.fetchItems) {
try {
const sourceItems = await source.fetchItems(this.context)
items.push(...sourceItems)
} catch (err) {
errors.push({
sourceId: source.id,
error: err instanceof Error ? err : new Error(String(err)),
})
}
}
}
items.sort((a, b) => b.priority - a.priority)
this.notifySubscribers({ context: this.context, items: items as TItems[], errors })
}
private collectDependents(sourceId: string, graph: SourceGraph): string[] {
const result: string[] = []
const visited = new Set<string>()
const collect = (id: string): void => {
const deps = graph.dependents.get(id) ?? []
for (const dep of deps) {
if (!visited.has(dep)) {
visited.add(dep)
result.push(dep)
collect(dep)
}
}
}
collect(sourceId)
// Return in topological order
return graph.sorted.filter((s) => result.includes(s.id)).map((s) => s.id)
}
private scheduleRefresh(): void {
// Simple immediate refresh for now - could add debouncing later
this.refresh().then((result) => {
this.notifySubscribers(result)
})
}
private notifySubscribers(result: FeedResult<TItems>): void {
this.subscribers.forEach((callback) => {
try {
callback(result)
} catch {
// Subscriber errors shouldn't break other subscribers
}
})
}
}
function buildGraph(sources: FeedSource[]): SourceGraph {
const byId = new Map<string, FeedSource>()
for (const source of sources) {
byId.set(source.id, source)
}
// Validate dependencies exist
for (const source of sources) {
for (const dep of source.dependencies ?? []) {
if (!byId.has(dep)) {
throw new Error(`Source "${source.id}" depends on "${dep}" which is not registered`)
}
}
}
// Check for cycles and topologically sort
const visited = new Set<string>()
const visiting = new Set<string>()
const sorted: FeedSource[] = []
function visit(id: string, path: string[]): void {
if (visiting.has(id)) {
const cycle = [...path.slice(path.indexOf(id)), id].join(" → ")
throw new Error(`Circular dependency detected: ${cycle}`)
}
if (visited.has(id)) return
visiting.add(id)
const source = byId.get(id)!
for (const dep of source.dependencies ?? []) {
visit(dep, [...path, id])
}
visiting.delete(id)
visited.add(id)
sorted.push(source)
}
for (const source of sources) {
visit(source.id, [])
}
// Build reverse dependency map
const dependents = new Map<string, string[]>()
for (const source of sources) {
for (const dep of source.dependencies ?? []) {
const list = dependents.get(dep) ?? []
list.push(source.id)
dependents.set(dep, list)
}
}
return { sources: byId, sorted, dependents }
}

View File

@@ -8,20 +8,35 @@ export type { FeedItem } from "./feed"
// Feed Source // Feed Source
export type { FeedSource } from "./feed-source" export type { FeedSource } from "./feed-source"
// Feed Engine
export type { FeedResult, FeedSubscriber, SourceError } from "./feed-engine"
export { FeedEngine } from "./feed-engine"
// =============================================================================
// DEPRECATED - Use FeedSource + FeedEngine instead
// =============================================================================
// Data Source (deprecated - use FeedSource) // Data Source (deprecated - use FeedSource)
export type { DataSource } from "./data-source" export type { DataSource } from "./data-source"
// Context Provider // Context Provider (deprecated - use FeedSource)
export type { ContextProvider } from "./context-provider" export type { ContextProvider } from "./context-provider"
// Context Bridge // Context Bridge (deprecated - use FeedEngine)
export type { ProviderError, RefreshResult } from "./context-bridge" export type { ProviderError, RefreshResult } from "./context-bridge"
export { ContextBridge } from "./context-bridge" export { ContextBridge } from "./context-bridge"
// Reconciler // Reconciler (deprecated - use FeedEngine)
export type { ReconcileResult, ReconcilerConfig, SourceError } from "./reconciler" export type {
ReconcileResult,
ReconcilerConfig,
SourceError as ReconcilerSourceError,
} from "./reconciler"
export { Reconciler } from "./reconciler" export { Reconciler } from "./reconciler"
// Feed Controller // Feed Controller (deprecated - use FeedEngine)
export type { FeedControllerConfig, FeedSubscriber } from "./feed-controller" export type {
FeedControllerConfig,
FeedSubscriber as FeedControllerSubscriber,
} from "./feed-controller"
export { FeedController } from "./feed-controller" export { FeedController } from "./feed-controller"