mirror of
https://github.com/kennethnym/aris.git
synced 2026-02-02 05:01:17 +00:00
feat(core): add FeedEngine for FeedSource orchestration
Introduces FeedEngine that consumes FeedSource instances and manages the dependency graph for context flow and item collection. - Validates dependency graph (missing deps, circular references) - Topologically sorts sources for execution order - Runs fetchContext() in dependency order, accumulating context - Runs fetchItems() on all sources with final context - Supports reactive updates via onContextUpdate/onItemsUpdate - Graceful error handling (continues after source failures) Marks DataSource, ContextProvider, ContextBridge, Reconciler, and FeedController as deprecated in favor of FeedSource + FeedEngine. Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
458
packages/aris-core/src/feed-engine.test.ts
Normal file
458
packages/aris-core/src/feed-engine.test.ts
Normal file
@@ -0,0 +1,458 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
|
||||
import type { Context, ContextKey, FeedItem, FeedSource } from "./index"
|
||||
|
||||
import { FeedEngine } from "./feed-engine"
|
||||
import { contextKey, contextValue } from "./index"
|
||||
|
||||
// =============================================================================
|
||||
// CONTEXT KEYS
|
||||
// =============================================================================
|
||||
|
||||
interface Location {
|
||||
lat: number
|
||||
lng: number
|
||||
}
|
||||
|
||||
interface Weather {
|
||||
temperature: number
|
||||
condition: string
|
||||
}
|
||||
|
||||
const LocationKey: ContextKey<Location> = contextKey("location")
|
||||
const WeatherKey: ContextKey<Weather> = contextKey("weather")
|
||||
|
||||
// =============================================================================
|
||||
// FEED ITEMS
|
||||
// =============================================================================
|
||||
|
||||
type WeatherFeedItem = FeedItem<"weather", { temperature: number; condition: string }>
|
||||
type AlertFeedItem = FeedItem<"alert", { message: string }>
|
||||
|
||||
// =============================================================================
|
||||
// TEST HELPERS
|
||||
// =============================================================================
|
||||
|
||||
interface SimulatedLocationSource extends FeedSource {
|
||||
simulateUpdate(location: Location): void
|
||||
}
|
||||
|
||||
function createLocationSource(): SimulatedLocationSource {
|
||||
let callback: ((update: Partial<Context>) => void) | null = null
|
||||
let currentLocation: Location = { lat: 0, lng: 0 }
|
||||
|
||||
return {
|
||||
id: "location",
|
||||
|
||||
onContextUpdate(cb) {
|
||||
callback = cb
|
||||
return () => {
|
||||
callback = null
|
||||
}
|
||||
},
|
||||
|
||||
async fetchContext() {
|
||||
return { [LocationKey]: currentLocation }
|
||||
},
|
||||
|
||||
simulateUpdate(location: Location) {
|
||||
currentLocation = location
|
||||
callback?.({ [LocationKey]: location })
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function createWeatherSource(
|
||||
fetchWeather: (location: Location) => Promise<Weather> = async () => ({
|
||||
temperature: 20,
|
||||
condition: "sunny",
|
||||
}),
|
||||
): FeedSource<WeatherFeedItem> {
|
||||
return {
|
||||
id: "weather",
|
||||
dependencies: ["location"],
|
||||
|
||||
async fetchContext(context) {
|
||||
const location = contextValue(context, LocationKey)
|
||||
if (!location) return {}
|
||||
|
||||
const weather = await fetchWeather(location)
|
||||
return { [WeatherKey]: weather }
|
||||
},
|
||||
|
||||
async fetchItems(context) {
|
||||
const weather = contextValue(context, WeatherKey)
|
||||
if (!weather) return []
|
||||
|
||||
return [
|
||||
{
|
||||
id: `weather-${Date.now()}`,
|
||||
type: "weather",
|
||||
priority: 0.5,
|
||||
timestamp: new Date(),
|
||||
data: {
|
||||
temperature: weather.temperature,
|
||||
condition: weather.condition,
|
||||
},
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function createAlertSource(): FeedSource<AlertFeedItem> {
|
||||
return {
|
||||
id: "alert",
|
||||
dependencies: ["weather"],
|
||||
|
||||
async fetchItems(context) {
|
||||
const weather = contextValue(context, WeatherKey)
|
||||
if (!weather) return []
|
||||
|
||||
if (weather.condition === "storm") {
|
||||
return [
|
||||
{
|
||||
id: "alert-storm",
|
||||
type: "alert",
|
||||
priority: 1.0,
|
||||
timestamp: new Date(),
|
||||
data: { message: "Storm warning!" },
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
return []
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// TESTS
|
||||
// =============================================================================
|
||||
|
||||
describe("FeedEngine", () => {
|
||||
describe("registration", () => {
|
||||
test("registers sources", () => {
|
||||
const engine = new FeedEngine()
|
||||
const location = createLocationSource()
|
||||
|
||||
engine.register(location)
|
||||
|
||||
// Can refresh without error
|
||||
expect(engine.refresh()).resolves.toBeDefined()
|
||||
})
|
||||
|
||||
test("unregisters sources", async () => {
|
||||
const engine = new FeedEngine()
|
||||
const location = createLocationSource()
|
||||
|
||||
engine.register(location)
|
||||
engine.unregister("location")
|
||||
|
||||
const result = await engine.refresh()
|
||||
expect(result.items).toHaveLength(0)
|
||||
})
|
||||
|
||||
test("allows chained registration", () => {
|
||||
const engine = new FeedEngine()
|
||||
.register(createLocationSource())
|
||||
.register(createWeatherSource())
|
||||
.register(createAlertSource())
|
||||
|
||||
expect(engine.refresh()).resolves.toBeDefined()
|
||||
})
|
||||
})
|
||||
|
||||
describe("graph validation", () => {
|
||||
test("throws on missing dependency", () => {
|
||||
const engine = new FeedEngine()
|
||||
const orphan: FeedSource = {
|
||||
id: "orphan",
|
||||
dependencies: ["nonexistent"],
|
||||
}
|
||||
|
||||
engine.register(orphan)
|
||||
|
||||
expect(engine.refresh()).rejects.toThrow(
|
||||
'Source "orphan" depends on "nonexistent" which is not registered',
|
||||
)
|
||||
})
|
||||
|
||||
test("throws on circular dependency", () => {
|
||||
const engine = new FeedEngine()
|
||||
const a: FeedSource = { id: "a", dependencies: ["b"] }
|
||||
const b: FeedSource = { id: "b", dependencies: ["a"] }
|
||||
|
||||
engine.register(a).register(b)
|
||||
|
||||
expect(engine.refresh()).rejects.toThrow("Circular dependency detected: a → b → a")
|
||||
})
|
||||
|
||||
test("throws on longer cycles", () => {
|
||||
const engine = new FeedEngine()
|
||||
const a: FeedSource = { id: "a", dependencies: ["c"] }
|
||||
const b: FeedSource = { id: "b", dependencies: ["a"] }
|
||||
const c: FeedSource = { id: "c", dependencies: ["b"] }
|
||||
|
||||
engine.register(a).register(b).register(c)
|
||||
|
||||
expect(engine.refresh()).rejects.toThrow("Circular dependency detected")
|
||||
})
|
||||
})
|
||||
|
||||
describe("refresh", () => {
|
||||
test("runs fetchContext in dependency order", async () => {
|
||||
const order: string[] = []
|
||||
|
||||
const location: FeedSource = {
|
||||
id: "location",
|
||||
async fetchContext() {
|
||||
order.push("location")
|
||||
return { [LocationKey]: { lat: 51.5, lng: -0.1 } }
|
||||
},
|
||||
}
|
||||
|
||||
const weather: FeedSource = {
|
||||
id: "weather",
|
||||
dependencies: ["location"],
|
||||
async fetchContext(ctx) {
|
||||
order.push("weather")
|
||||
const loc = contextValue(ctx, LocationKey)
|
||||
expect(loc).toBeDefined()
|
||||
return { [WeatherKey]: { temperature: 20, condition: "sunny" } }
|
||||
},
|
||||
}
|
||||
|
||||
const engine = new FeedEngine().register(weather).register(location)
|
||||
|
||||
await engine.refresh()
|
||||
|
||||
expect(order).toEqual(["location", "weather"])
|
||||
})
|
||||
|
||||
test("accumulates context across sources", async () => {
|
||||
const location = createLocationSource()
|
||||
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
|
||||
|
||||
const weather = createWeatherSource()
|
||||
|
||||
const engine = new FeedEngine().register(location).register(weather)
|
||||
|
||||
const { context } = await engine.refresh()
|
||||
|
||||
expect(contextValue(context, LocationKey)).toEqual({ lat: 51.5, lng: -0.1 })
|
||||
expect(contextValue(context, WeatherKey)).toEqual({ temperature: 20, condition: "sunny" })
|
||||
})
|
||||
|
||||
test("collects items from all sources", async () => {
|
||||
const location = createLocationSource()
|
||||
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
|
||||
|
||||
const weather = createWeatherSource()
|
||||
|
||||
const engine = new FeedEngine().register(location).register(weather)
|
||||
|
||||
const { items } = await engine.refresh()
|
||||
|
||||
expect(items).toHaveLength(1)
|
||||
expect(items[0]!.type).toBe("weather")
|
||||
})
|
||||
|
||||
test("sorts items by priority descending", async () => {
|
||||
const location = createLocationSource()
|
||||
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
|
||||
|
||||
const weather = createWeatherSource(async () => ({
|
||||
temperature: 15,
|
||||
condition: "storm",
|
||||
}))
|
||||
|
||||
const alert = createAlertSource()
|
||||
|
||||
const engine = new FeedEngine().register(location).register(weather).register(alert)
|
||||
|
||||
const { items } = await engine.refresh()
|
||||
|
||||
expect(items).toHaveLength(2)
|
||||
expect(items[0]!.type).toBe("alert") // priority 1.0
|
||||
expect(items[1]!.type).toBe("weather") // priority 0.5
|
||||
})
|
||||
|
||||
test("handles missing upstream context gracefully", async () => {
|
||||
const location: FeedSource = {
|
||||
id: "location",
|
||||
async fetchContext() {
|
||||
return {} // No location available
|
||||
},
|
||||
}
|
||||
|
||||
const weather = createWeatherSource()
|
||||
|
||||
const engine = new FeedEngine().register(location).register(weather)
|
||||
|
||||
const { context, items } = await engine.refresh()
|
||||
|
||||
expect(contextValue(context, WeatherKey)).toBeUndefined()
|
||||
expect(items).toHaveLength(0)
|
||||
})
|
||||
|
||||
test("captures errors from fetchContext", async () => {
|
||||
const failing: FeedSource = {
|
||||
id: "failing",
|
||||
async fetchContext() {
|
||||
throw new Error("Context fetch failed")
|
||||
},
|
||||
}
|
||||
|
||||
const engine = new FeedEngine().register(failing)
|
||||
|
||||
const { errors } = await engine.refresh()
|
||||
|
||||
expect(errors).toHaveLength(1)
|
||||
expect(errors[0]!.sourceId).toBe("failing")
|
||||
expect(errors[0]!.error.message).toBe("Context fetch failed")
|
||||
})
|
||||
|
||||
test("captures errors from fetchItems", async () => {
|
||||
const failing: FeedSource = {
|
||||
id: "failing",
|
||||
async fetchItems() {
|
||||
throw new Error("Items fetch failed")
|
||||
},
|
||||
}
|
||||
|
||||
const engine = new FeedEngine().register(failing)
|
||||
|
||||
const { errors } = await engine.refresh()
|
||||
|
||||
expect(errors).toHaveLength(1)
|
||||
expect(errors[0]!.sourceId).toBe("failing")
|
||||
expect(errors[0]!.error.message).toBe("Items fetch failed")
|
||||
})
|
||||
|
||||
test("continues after source error", async () => {
|
||||
const failing: FeedSource = {
|
||||
id: "failing",
|
||||
async fetchContext() {
|
||||
throw new Error("Failed")
|
||||
},
|
||||
}
|
||||
|
||||
const working: FeedSource = {
|
||||
id: "working",
|
||||
async fetchItems() {
|
||||
return [
|
||||
{
|
||||
id: "item-1",
|
||||
type: "test",
|
||||
priority: 0.5,
|
||||
timestamp: new Date(),
|
||||
data: {},
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
const engine = new FeedEngine().register(failing).register(working)
|
||||
|
||||
const { items, errors } = await engine.refresh()
|
||||
|
||||
expect(errors).toHaveLength(1)
|
||||
expect(items).toHaveLength(1)
|
||||
})
|
||||
})
|
||||
|
||||
describe("currentContext", () => {
|
||||
test("returns initial context before refresh", () => {
|
||||
const engine = new FeedEngine()
|
||||
|
||||
const context = engine.currentContext()
|
||||
|
||||
expect(context.time).toBeInstanceOf(Date)
|
||||
})
|
||||
|
||||
test("returns accumulated context after refresh", async () => {
|
||||
const location = createLocationSource()
|
||||
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
|
||||
|
||||
const engine = new FeedEngine().register(location)
|
||||
|
||||
await engine.refresh()
|
||||
|
||||
const context = engine.currentContext()
|
||||
expect(contextValue(context, LocationKey)).toEqual({ lat: 51.5, lng: -0.1 })
|
||||
})
|
||||
})
|
||||
|
||||
describe("subscribe", () => {
|
||||
test("returns unsubscribe function", () => {
|
||||
const engine = new FeedEngine()
|
||||
let callCount = 0
|
||||
|
||||
const unsubscribe = engine.subscribe(() => {
|
||||
callCount++
|
||||
})
|
||||
|
||||
unsubscribe()
|
||||
|
||||
// Subscriber should not be called after unsubscribe
|
||||
expect(callCount).toBe(0)
|
||||
})
|
||||
})
|
||||
|
||||
describe("reactive updates", () => {
|
||||
test("start subscribes to onContextUpdate", async () => {
|
||||
const location = createLocationSource()
|
||||
const weather = createWeatherSource()
|
||||
|
||||
const engine = new FeedEngine().register(location).register(weather)
|
||||
|
||||
const results: Array<{ items: FeedItem[] }> = []
|
||||
engine.subscribe((result) => {
|
||||
results.push({ items: result.items })
|
||||
})
|
||||
|
||||
engine.start()
|
||||
|
||||
// Simulate location update
|
||||
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
|
||||
|
||||
// Wait for async refresh
|
||||
await new Promise((resolve) => setTimeout(resolve, 50))
|
||||
|
||||
expect(results.length).toBeGreaterThan(0)
|
||||
expect(results[0]!.items[0]!.type).toBe("weather")
|
||||
})
|
||||
|
||||
test("stop unsubscribes from all sources", async () => {
|
||||
const location = createLocationSource()
|
||||
|
||||
const engine = new FeedEngine().register(location)
|
||||
|
||||
let callCount = 0
|
||||
engine.subscribe(() => {
|
||||
callCount++
|
||||
})
|
||||
|
||||
engine.start()
|
||||
engine.stop()
|
||||
|
||||
// Simulate update after stop
|
||||
location.simulateUpdate({ lat: 1, lng: 1 })
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50))
|
||||
|
||||
expect(callCount).toBe(0)
|
||||
})
|
||||
|
||||
test("start is idempotent", () => {
|
||||
const location = createLocationSource()
|
||||
const engine = new FeedEngine().register(location)
|
||||
|
||||
// Should not throw or double-subscribe
|
||||
engine.start()
|
||||
engine.start()
|
||||
engine.stop()
|
||||
})
|
||||
})
|
||||
})
|
||||
335
packages/aris-core/src/feed-engine.ts
Normal file
335
packages/aris-core/src/feed-engine.ts
Normal file
@@ -0,0 +1,335 @@
|
||||
import type { Context } from "./context"
|
||||
import type { FeedItem } from "./feed"
|
||||
import type { FeedSource } from "./feed-source"
|
||||
|
||||
export interface SourceError {
|
||||
sourceId: string
|
||||
error: Error
|
||||
}
|
||||
|
||||
export interface FeedResult<TItem extends FeedItem = FeedItem> {
|
||||
context: Context
|
||||
items: TItem[]
|
||||
errors: SourceError[]
|
||||
}
|
||||
|
||||
export type FeedSubscriber<TItem extends FeedItem = FeedItem> = (result: FeedResult<TItem>) => void
|
||||
|
||||
interface SourceGraph {
|
||||
sources: Map<string, FeedSource>
|
||||
sorted: FeedSource[]
|
||||
dependents: Map<string, string[]>
|
||||
}
|
||||
|
||||
/**
|
||||
* Orchestrates FeedSources, managing the dependency graph and context flow.
|
||||
*
|
||||
* Sources declare dependencies on other sources. The engine:
|
||||
* - Validates the dependency graph (no missing deps, no cycles)
|
||||
* - Runs fetchContext() in topological order during refresh
|
||||
* - Runs fetchItems() on all sources with accumulated context
|
||||
* - Subscribes to reactive updates via onContextUpdate/onItemsUpdate
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const engine = new FeedEngine()
|
||||
* .register(locationSource)
|
||||
* .register(weatherSource)
|
||||
* .register(alertSource)
|
||||
*
|
||||
* // Pull-based refresh
|
||||
* const { context, items, errors } = await engine.refresh()
|
||||
*
|
||||
* // Reactive updates
|
||||
* engine.subscribe((result) => {
|
||||
* console.log(result.items)
|
||||
* })
|
||||
* engine.start()
|
||||
*
|
||||
* // Cleanup
|
||||
* engine.stop()
|
||||
* ```
|
||||
*/
|
||||
export class FeedEngine<TItems extends FeedItem = FeedItem> {
|
||||
private sources = new Map<string, FeedSource>()
|
||||
private graph: SourceGraph | null = null
|
||||
private context: Context = { time: new Date() }
|
||||
private subscribers = new Set<FeedSubscriber<TItems>>()
|
||||
private cleanups: Array<() => void> = []
|
||||
private started = false
|
||||
|
||||
/**
|
||||
* Registers a FeedSource. Invalidates the cached graph.
|
||||
*/
|
||||
register<TItem extends FeedItem>(source: FeedSource<TItem>): FeedEngine<TItems | TItem> {
|
||||
this.sources.set(source.id, source)
|
||||
this.graph = null
|
||||
return this as FeedEngine<TItems | TItem>
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregisters a FeedSource by ID. Invalidates the cached graph.
|
||||
*/
|
||||
unregister(sourceId: string): this {
|
||||
this.sources.delete(sourceId)
|
||||
this.graph = null
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Refreshes the feed by running all sources in dependency order.
|
||||
* Calls fetchContext() then fetchItems() on each source.
|
||||
*/
|
||||
async refresh(): Promise<FeedResult<TItems>> {
|
||||
const graph = this.ensureGraph()
|
||||
const errors: SourceError[] = []
|
||||
|
||||
// Reset context with fresh time
|
||||
let context: Context = { time: new Date() }
|
||||
|
||||
// Run fetchContext in topological order
|
||||
for (const source of graph.sorted) {
|
||||
if (source.fetchContext) {
|
||||
try {
|
||||
const update = await source.fetchContext(context)
|
||||
context = { ...context, ...update }
|
||||
} catch (err) {
|
||||
errors.push({
|
||||
sourceId: source.id,
|
||||
error: err instanceof Error ? err : new Error(String(err)),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run fetchItems on all sources
|
||||
const items: FeedItem[] = []
|
||||
for (const source of graph.sorted) {
|
||||
if (source.fetchItems) {
|
||||
try {
|
||||
const sourceItems = await source.fetchItems(context)
|
||||
items.push(...sourceItems)
|
||||
} catch (err) {
|
||||
errors.push({
|
||||
sourceId: source.id,
|
||||
error: err instanceof Error ? err : new Error(String(err)),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by priority descending
|
||||
items.sort((a, b) => b.priority - a.priority)
|
||||
|
||||
this.context = context
|
||||
|
||||
return { context, items: items as TItems[], errors }
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribes to feed updates. Returns unsubscribe function.
|
||||
*/
|
||||
subscribe(callback: FeedSubscriber<TItems>): () => void {
|
||||
this.subscribers.add(callback)
|
||||
return () => {
|
||||
this.subscribers.delete(callback)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts reactive subscriptions on all sources.
|
||||
* Sources with onContextUpdate will trigger re-computation of dependents.
|
||||
*/
|
||||
start(): void {
|
||||
if (this.started) return
|
||||
|
||||
this.started = true
|
||||
const graph = this.ensureGraph()
|
||||
|
||||
for (const source of graph.sorted) {
|
||||
if (source.onContextUpdate) {
|
||||
const cleanup = source.onContextUpdate(
|
||||
(update) => {
|
||||
this.handleContextUpdate(source.id, update)
|
||||
},
|
||||
() => this.context,
|
||||
)
|
||||
this.cleanups.push(cleanup)
|
||||
}
|
||||
|
||||
if (source.onItemsUpdate) {
|
||||
const cleanup = source.onItemsUpdate(
|
||||
() => {
|
||||
this.scheduleRefresh()
|
||||
},
|
||||
() => this.context,
|
||||
)
|
||||
this.cleanups.push(cleanup)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops all reactive subscriptions.
|
||||
*/
|
||||
stop(): void {
|
||||
this.started = false
|
||||
for (const cleanup of this.cleanups) {
|
||||
cleanup()
|
||||
}
|
||||
this.cleanups = []
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current accumulated context.
|
||||
*/
|
||||
currentContext(): Context {
|
||||
return this.context
|
||||
}
|
||||
|
||||
private ensureGraph(): SourceGraph {
|
||||
if (!this.graph) {
|
||||
this.graph = buildGraph(Array.from(this.sources.values()))
|
||||
}
|
||||
return this.graph
|
||||
}
|
||||
|
||||
private handleContextUpdate(sourceId: string, update: Partial<Context>): void {
|
||||
this.context = { ...this.context, ...update, time: new Date() }
|
||||
|
||||
// Re-run dependents and notify
|
||||
this.refreshDependents(sourceId)
|
||||
}
|
||||
|
||||
private async refreshDependents(sourceId: string): Promise<void> {
|
||||
const graph = this.ensureGraph()
|
||||
const toRefresh = this.collectDependents(sourceId, graph)
|
||||
|
||||
// Re-run fetchContext for dependents in order
|
||||
for (const id of toRefresh) {
|
||||
const source = graph.sources.get(id)
|
||||
if (source?.fetchContext) {
|
||||
try {
|
||||
const update = await source.fetchContext(this.context)
|
||||
this.context = { ...this.context, ...update }
|
||||
} catch {
|
||||
// Errors during reactive updates are logged but don't stop propagation
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Collect items from all sources
|
||||
const items: FeedItem[] = []
|
||||
const errors: SourceError[] = []
|
||||
|
||||
for (const source of graph.sorted) {
|
||||
if (source.fetchItems) {
|
||||
try {
|
||||
const sourceItems = await source.fetchItems(this.context)
|
||||
items.push(...sourceItems)
|
||||
} catch (err) {
|
||||
errors.push({
|
||||
sourceId: source.id,
|
||||
error: err instanceof Error ? err : new Error(String(err)),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
items.sort((a, b) => b.priority - a.priority)
|
||||
|
||||
this.notifySubscribers({ context: this.context, items: items as TItems[], errors })
|
||||
}
|
||||
|
||||
private collectDependents(sourceId: string, graph: SourceGraph): string[] {
|
||||
const result: string[] = []
|
||||
const visited = new Set<string>()
|
||||
|
||||
const collect = (id: string): void => {
|
||||
const deps = graph.dependents.get(id) ?? []
|
||||
for (const dep of deps) {
|
||||
if (!visited.has(dep)) {
|
||||
visited.add(dep)
|
||||
result.push(dep)
|
||||
collect(dep)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
collect(sourceId)
|
||||
|
||||
// Return in topological order
|
||||
return graph.sorted.filter((s) => result.includes(s.id)).map((s) => s.id)
|
||||
}
|
||||
|
||||
private scheduleRefresh(): void {
|
||||
// Simple immediate refresh for now - could add debouncing later
|
||||
this.refresh().then((result) => {
|
||||
this.notifySubscribers(result)
|
||||
})
|
||||
}
|
||||
|
||||
private notifySubscribers(result: FeedResult<TItems>): void {
|
||||
this.subscribers.forEach((callback) => {
|
||||
try {
|
||||
callback(result)
|
||||
} catch {
|
||||
// Subscriber errors shouldn't break other subscribers
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
function buildGraph(sources: FeedSource[]): SourceGraph {
|
||||
const byId = new Map<string, FeedSource>()
|
||||
for (const source of sources) {
|
||||
byId.set(source.id, source)
|
||||
}
|
||||
|
||||
// Validate dependencies exist
|
||||
for (const source of sources) {
|
||||
for (const dep of source.dependencies ?? []) {
|
||||
if (!byId.has(dep)) {
|
||||
throw new Error(`Source "${source.id}" depends on "${dep}" which is not registered`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for cycles and topologically sort
|
||||
const visited = new Set<string>()
|
||||
const visiting = new Set<string>()
|
||||
const sorted: FeedSource[] = []
|
||||
|
||||
function visit(id: string, path: string[]): void {
|
||||
if (visiting.has(id)) {
|
||||
const cycle = [...path.slice(path.indexOf(id)), id].join(" → ")
|
||||
throw new Error(`Circular dependency detected: ${cycle}`)
|
||||
}
|
||||
if (visited.has(id)) return
|
||||
|
||||
visiting.add(id)
|
||||
const source = byId.get(id)!
|
||||
for (const dep of source.dependencies ?? []) {
|
||||
visit(dep, [...path, id])
|
||||
}
|
||||
visiting.delete(id)
|
||||
visited.add(id)
|
||||
sorted.push(source)
|
||||
}
|
||||
|
||||
for (const source of sources) {
|
||||
visit(source.id, [])
|
||||
}
|
||||
|
||||
// Build reverse dependency map
|
||||
const dependents = new Map<string, string[]>()
|
||||
for (const source of sources) {
|
||||
for (const dep of source.dependencies ?? []) {
|
||||
const list = dependents.get(dep) ?? []
|
||||
list.push(source.id)
|
||||
dependents.set(dep, list)
|
||||
}
|
||||
}
|
||||
|
||||
return { sources: byId, sorted, dependents }
|
||||
}
|
||||
@@ -8,20 +8,35 @@ export type { FeedItem } from "./feed"
|
||||
// Feed Source
|
||||
export type { FeedSource } from "./feed-source"
|
||||
|
||||
// Feed Engine
|
||||
export type { FeedResult, FeedSubscriber, SourceError } from "./feed-engine"
|
||||
export { FeedEngine } from "./feed-engine"
|
||||
|
||||
// =============================================================================
|
||||
// DEPRECATED - Use FeedSource + FeedEngine instead
|
||||
// =============================================================================
|
||||
|
||||
// Data Source (deprecated - use FeedSource)
|
||||
export type { DataSource } from "./data-source"
|
||||
|
||||
// Context Provider
|
||||
// Context Provider (deprecated - use FeedSource)
|
||||
export type { ContextProvider } from "./context-provider"
|
||||
|
||||
// Context Bridge
|
||||
// Context Bridge (deprecated - use FeedEngine)
|
||||
export type { ProviderError, RefreshResult } from "./context-bridge"
|
||||
export { ContextBridge } from "./context-bridge"
|
||||
|
||||
// Reconciler
|
||||
export type { ReconcileResult, ReconcilerConfig, SourceError } from "./reconciler"
|
||||
// Reconciler (deprecated - use FeedEngine)
|
||||
export type {
|
||||
ReconcileResult,
|
||||
ReconcilerConfig,
|
||||
SourceError as ReconcilerSourceError,
|
||||
} from "./reconciler"
|
||||
export { Reconciler } from "./reconciler"
|
||||
|
||||
// Feed Controller
|
||||
export type { FeedControllerConfig, FeedSubscriber } from "./feed-controller"
|
||||
// Feed Controller (deprecated - use FeedEngine)
|
||||
export type {
|
||||
FeedControllerConfig,
|
||||
FeedSubscriber as FeedControllerSubscriber,
|
||||
} from "./feed-controller"
|
||||
export { FeedController } from "./feed-controller"
|
||||
|
||||
Reference in New Issue
Block a user