refactor: rename aris to aelis (#59)

Rename all references across the codebase: package names,
imports, source IDs, directory names, docs, and configs.

Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
2026-03-10 19:19:23 +00:00
committed by GitHub
parent 230116d9f7
commit 863c298bd3
201 changed files with 891 additions and 647 deletions

View File

@@ -0,0 +1,186 @@
# @aelis/core
Core orchestration layer for AELIS feed reconciliation.
## Overview
```mermaid
flowchart TB
subgraph Sources["Feed Sources (Graph)"]
LS[Location Source]
WS[Weather Source]
TS[TFL Source]
CS[Calendar Source]
end
LS --> WS
LS --> TS
subgraph Controller["FeedController"]
direction TB
C1[Holds context]
C2[Manages source graph]
C3[Reconciles on update]
C4[Notifies subscribers]
end
Sources --> Controller
Controller --> Sub[Subscribers]
```
## Concepts
### FeedSource
A unified interface for sources that provide context and/or feed items. Sources form a dependency graph.
```ts
interface FeedSource<TItem extends FeedItem = FeedItem> {
readonly id: string
readonly dependencies?: readonly string[]
// Context production (optional)
onContextUpdate?(
callback: (update: Partial<Context>) => void,
getContext: () => Context,
): () => void
fetchContext?(context: Context): Promise<Partial<Context>>
// Feed item production (optional)
onItemsUpdate?(callback: (items: TItem[]) => void, getContext: () => Context): () => void
fetchItems?(context: Context): Promise<TItem[]>
}
```
A source may:
- Provide context for other sources (implement `fetchContext`/`onContextUpdate`)
- Produce feed items (implement `fetchItems`/`onItemsUpdate`)
- Both
### Context Keys
Each package exports typed context keys for type-safe access:
```ts
import { contextKey, type ContextKey } from "@aelis/core"
interface Location {
lat: number
lng: number
}
export const LocationKey: ContextKey<Location> = contextKey("location")
```
## Usage
### Define a Context-Only Source
```ts
import type { FeedSource } from "@aelis/core"
const locationSource: FeedSource = {
id: "location",
onContextUpdate(callback, _getContext) {
const watchId = navigator.geolocation.watchPosition((pos) => {
callback({
[LocationKey]: { lat: pos.coords.latitude, lng: pos.coords.longitude },
})
})
return () => navigator.geolocation.clearWatch(watchId)
},
async fetchContext() {
const pos = await getCurrentPosition()
return {
[LocationKey]: { lat: pos.coords.latitude, lng: pos.coords.longitude },
}
},
}
```
### Define a Source with Dependencies
```ts
import type { FeedSource, FeedItem } from "@aelis/core"
import { contextValue } from "@aelis/core"
type WeatherItem = FeedItem<"weather", { temp: number; condition: string }>
const weatherSource: FeedSource<WeatherItem> = {
id: "weather",
dependencies: ["location"],
async fetchContext(context) {
const location = contextValue(context, LocationKey)
if (!location) return {}
const weather = await fetchWeatherApi(location)
return { [WeatherKey]: weather }
},
async fetchItems(context) {
const weather = contextValue(context, WeatherKey)
if (!weather) return []
return [
{
id: `weather-${Date.now()}`,
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: { temp: weather.temp, condition: weather.condition },
},
]
},
}
```
### Graph Behavior
The source graph:
1. Validates all dependencies exist
2. Detects circular dependencies
3. Topologically sorts sources
On refresh:
1. `fetchContext` runs in dependency order
2. `fetchItems` runs on all sources
3. Combined items returned to subscribers
On reactive update:
1. Source pushes context update via `onContextUpdate` callback
2. Dependent sources re-run `fetchContext`
3. Affected sources re-run `fetchItems`
4. Subscribers notified
## API
### Context
| Export | Description |
| ---------------------------- | --------------------------------------- |
| `ContextKey<T>` | Branded type for type-safe context keys |
| `contextKey<T>(key)` | Creates a typed context key |
| `contextValue(context, key)` | Type-safe context value accessor |
| `Context` | Time + arbitrary key-value bag |
### Feed
| Export | Description |
| ------------------------ | ------------------------ |
| `FeedSource<TItem>` | Unified source interface |
| `FeedItem<TType, TData>` | Single item in the feed |
### Legacy (deprecated)
| Export | Description |
| ---------------------------- | ------------------------ |
| `DataSource<TItem, TConfig>` | Use `FeedSource` instead |
| `ContextProvider<T>` | Use `FeedSource` instead |
| `ContextBridge` | Use source graph instead |

View File

@@ -0,0 +1,13 @@
{
"name": "@aelis/core",
"version": "0.0.0",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
"scripts": {
"test": "bun test ."
},
"dependencies": {
"@standard-schema/spec": "^1.1.0"
}
}

View File

@@ -0,0 +1,27 @@
import type { StandardSchemaV1 } from "@standard-schema/spec"
/**
* Describes an action a source can perform.
*
* Action IDs use descriptive verb-noun kebab-case (e.g., "update-location", "play-track").
* Combined with the source's reverse-domain ID, they form a globally unique identifier:
* `<sourceId>/<actionId>` (e.g., "aelis.location/update-location").
*/
export class UnknownActionError extends Error {
readonly actionId: string
constructor(actionId: string) {
super(`Unknown action: ${actionId}`)
this.name = "UnknownActionError"
this.actionId = actionId
}
}
export interface ActionDefinition<TInput = unknown> {
/** Descriptive action name in kebab-case (e.g., "update-location", "play-track") */
readonly id: string
/** Optional longer description */
readonly description?: string
/** Schema for input validation. Accepts any Standard Schema compatible validator (arktype, zod, valibot, etc.). */
readonly input?: StandardSchemaV1<TInput>
}

View File

@@ -0,0 +1,104 @@
import type { ContextEntry } from "./context"
import type { ContextProvider } from "./context-provider"
import { contextKey } from "./context"
interface ContextUpdatable {
pushContextUpdate(entries: readonly ContextEntry[]): void
}
export interface ProviderError {
key: string
error: Error
}
export interface RefreshResult {
errors: ProviderError[]
}
/**
* Bridges context providers to a feed controller.
*
* Subscribes to provider updates and forwards them to the controller.
* Supports manual refresh to gather current values from all providers.
*
* @example
* ```ts
* const controller = new FeedController()
* .addDataSource(new WeatherDataSource())
* .addDataSource(new TflDataSource())
*
* const bridge = new ContextBridge(controller)
* .addProvider(new LocationProvider())
* .addProvider(new MusicProvider())
*
* // Manual refresh gathers from all providers
* await bridge.refresh()
*
* // Cleanup
* bridge.stop()
* controller.stop()
* ```
*/
export class ContextBridge {
private controller: ContextUpdatable
private providers = new Map<string, ContextProvider>()
private cleanups: Array<() => void> = []
constructor(controller: ContextUpdatable) {
this.controller = controller
}
/**
* Registers a context provider. Immediately subscribes to updates.
*/
addProvider<T>(provider: ContextProvider<T>): this {
this.providers.set(provider.key, provider as ContextProvider)
const cleanup = provider.onUpdate((value) => {
this.controller.pushContextUpdate([[contextKey(provider.key), value]])
})
this.cleanups.push(cleanup)
return this
}
/**
* Gathers current values from all providers and pushes to controller.
* Use for manual refresh when user pulls to refresh.
* Returns errors from providers that failed to fetch.
*/
async refresh(): Promise<RefreshResult> {
const collected: ContextEntry[] = []
const errors: ProviderError[] = []
const entries = Array.from(this.providers.entries())
const results = await Promise.allSettled(
entries.map(([_, provider]) => provider.fetchCurrentValue()),
)
entries.forEach(([key], i) => {
const result = results[i]
if (result?.status === "fulfilled") {
collected.push([contextKey(key), result.value])
} else if (result?.status === "rejected") {
errors.push({
key,
error: result.reason instanceof Error ? result.reason : new Error(String(result.reason)),
})
}
})
this.controller.pushContextUpdate(collected)
return { errors }
}
/**
* Unsubscribes from all providers.
*/
stop(): void {
this.cleanups.forEach((cleanup) => cleanup())
this.cleanups = []
}
}

View File

@@ -0,0 +1,35 @@
/**
* Provides context values reactively and on-demand.
*
* Implementations push updates when values change (reactive) and
* return current values when requested (for manual refresh).
*
* @example
* ```ts
* class LocationProvider implements ContextProvider<Location> {
* readonly key = LocationKey
*
* onUpdate(callback: (value: Location) => void): () => void {
* const watchId = navigator.geolocation.watchPosition(pos => {
* callback({ lat: pos.coords.latitude, lng: pos.coords.longitude, accuracy: pos.coords.accuracy })
* })
* return () => navigator.geolocation.clearWatch(watchId)
* }
*
* async fetchCurrentValue(): Promise<Location> {
* const pos = await getCurrentPosition()
* return { lat: pos.coords.latitude, lng: pos.coords.longitude, accuracy: pos.coords.accuracy }
* }
* }
* ```
*/
export interface ContextProvider<T = unknown> {
/** The context key this provider populates */
readonly key: string
/** Subscribe to value changes. Returns cleanup function. */
onUpdate(callback: (value: T) => void): () => void
/** Fetch current value on-demand (used for manual refresh). */
fetchCurrentValue(): Promise<T>
}

View File

@@ -0,0 +1,184 @@
import { describe, expect, test } from "bun:test"
import type { ContextKey } from "./context"
import { Context, contextKey } from "./context"
interface Weather {
temperature: number
}
interface NextEvent {
title: string
}
const WeatherKey: ContextKey<Weather> = contextKey("aelis.weather", "current")
const NextEventKey: ContextKey<NextEvent> = contextKey("aelis.google-calendar", "nextEvent")
describe("Context", () => {
describe("get", () => {
test("returns undefined for missing key", () => {
const ctx = new Context()
expect(ctx.get(WeatherKey)).toBeUndefined()
})
test("returns value for exact key match", () => {
const ctx = new Context()
const weather: Weather = { temperature: 20 }
ctx.set([[WeatherKey, weather]])
expect(ctx.get(WeatherKey)).toEqual(weather)
})
test("distinguishes keys with different parts", () => {
const ctx = new Context()
ctx.set([
[WeatherKey, { temperature: 20 }],
[NextEventKey, { title: "Standup" }],
])
expect(ctx.get(WeatherKey)).toEqual({ temperature: 20 })
expect(ctx.get(NextEventKey)).toEqual({ title: "Standup" })
})
test("last write wins for same key", () => {
const ctx = new Context()
ctx.set([[WeatherKey, { temperature: 20 }]])
ctx.set([[WeatherKey, { temperature: 25 }]])
expect(ctx.get(WeatherKey)).toEqual({ temperature: 25 })
})
})
describe("find", () => {
test("returns empty array when no keys match", () => {
const ctx = new Context()
expect(ctx.find(WeatherKey)).toEqual([])
})
test("returns exact match as single result", () => {
const ctx = new Context()
ctx.set([[NextEventKey, { title: "Standup" }]])
const results = ctx.find(NextEventKey)
expect(results).toHaveLength(1)
expect(results[0]!.value).toEqual({ title: "Standup" })
})
test("prefix match returns multiple instances", () => {
const workKey = contextKey<NextEvent>("aelis.google-calendar", "nextEvent", {
account: "work",
})
const personalKey = contextKey<NextEvent>("aelis.google-calendar", "nextEvent", {
account: "personal",
})
const ctx = new Context()
ctx.set([
[workKey, { title: "Sprint Planning" }],
[personalKey, { title: "Dentist" }],
])
const prefix = contextKey<NextEvent>("aelis.google-calendar", "nextEvent")
const results = ctx.find(prefix)
expect(results).toHaveLength(2)
const titles = results.map((r) => r.value.title).sort()
expect(titles).toEqual(["Dentist", "Sprint Planning"])
})
test("prefix match includes exact match and longer keys", () => {
const baseKey = contextKey<NextEvent>("aelis.google-calendar", "nextEvent")
const instanceKey = contextKey<NextEvent>("aelis.google-calendar", "nextEvent", {
account: "work",
})
const ctx = new Context()
ctx.set([
[baseKey, { title: "Base" }],
[instanceKey, { title: "Instance" }],
])
const results = ctx.find(baseKey)
expect(results).toHaveLength(2)
})
test("does not match keys that share a string prefix but differ at segment boundary", () => {
const keyA = contextKey<string>("aelis.calendar", "next")
const keyB = contextKey<string>("aelis.calendar", "nextEvent")
const ctx = new Context()
ctx.set([
[keyA, "a"],
[keyB, "b"],
])
const results = ctx.find(keyA)
expect(results).toHaveLength(1)
expect(results[0]!.value).toBe("a")
})
test("object key parts with different property order match", () => {
const key1 = contextKey<string>("source", "ctx", { b: 2, a: 1 })
const key2 = contextKey<string>("source", "ctx", { a: 1, b: 2 })
const ctx = new Context()
ctx.set([[key1, "value"]])
// Exact match via get should work regardless of property order
expect(ctx.get(key2)).toBe("value")
// find with the reordered key as prefix should also match
const prefix = contextKey<string>("source", "ctx")
const results = ctx.find(prefix)
expect(results).toHaveLength(1)
})
test("single-segment prefix matches all keys starting with that segment", () => {
const ctx = new Context()
ctx.set([
[contextKey("aelis.weather", "current"), { temperature: 20 }],
[contextKey("aelis.weather", "forecast"), { high: 25 }],
[contextKey("aelis.calendar", "nextEvent"), { title: "Meeting" }],
])
const results = ctx.find(contextKey("aelis.weather"))
expect(results).toHaveLength(2)
})
test("does not match shorter keys", () => {
const ctx = new Context()
ctx.set([[contextKey("aelis.weather"), "short"]])
const results = ctx.find(contextKey("aelis.weather", "current"))
expect(results).toHaveLength(0)
})
test("numeric key parts match correctly", () => {
const ctx = new Context()
ctx.set([
[contextKey("source", 1, "data"), "one"],
[contextKey("source", 2, "data"), "two"],
])
const results = ctx.find(contextKey("source", 1))
expect(results).toHaveLength(1)
expect(results[0]!.value).toBe("one")
})
})
describe("size", () => {
test("returns 0 for empty context", () => {
expect(new Context().size).toBe(0)
})
test("reflects number of entries", () => {
const ctx = new Context()
ctx.set([
[WeatherKey, { temperature: 20 }],
[NextEventKey, { title: "Standup" }],
])
expect(ctx.size).toBe(2)
})
})
})

View File

@@ -0,0 +1,128 @@
/**
* Tuple-keyed context system inspired by React Query's query keys.
*
* Context keys are arrays that form a hierarchy. Sources write to specific
* keys (e.g., ["aelis.google-calendar", "nextEvent", { account: "work" }])
* and consumers can query by exact match or prefix match to get all values
* of a given type across source instances.
*/
// -- Key types --
/** A single segment of a context key: string, number, or a record of primitives. */
export type ContextKeyPart = string | number | Record<string, unknown>
/** A context key is a readonly tuple of parts, branded with the value type. */
export type ContextKey<T> = readonly ContextKeyPart[] & { __contextValue?: T }
/** Creates a typed context key. */
export function contextKey<T>(...parts: ContextKeyPart[]): ContextKey<T> {
return parts as ContextKey<T>
}
// -- Serialization --
/**
* Deterministic serialization of a context key for use as a Map key.
* Object parts have their keys sorted for stable comparison.
*/
export function serializeKey(key: readonly ContextKeyPart[]): string {
return JSON.stringify(key, (_key, value) => {
if (value !== null && typeof value === "object" && !Array.isArray(value)) {
const sorted: Record<string, unknown> = {}
for (const k of Object.keys(value).sort()) {
sorted[k] = value[k]
}
return sorted
}
return value
})
}
// -- Key matching --
/** Returns true if `key` starts with all parts of `prefix`. */
function keyStartsWith(key: readonly ContextKeyPart[], prefix: readonly ContextKeyPart[]): boolean {
if (key.length < prefix.length) return false
for (let i = 0; i < prefix.length; i++) {
if (!partsEqual(key[i]!, prefix[i]!)) return false
}
return true
}
/** Recursive structural equality, matching React Query's partialMatchKey approach. */
function partsEqual(a: unknown, b: unknown): boolean {
if (a === b) return true
if (typeof a !== typeof b) return false
if (a && b && typeof a === "object" && typeof b === "object") {
const aKeys = Object.keys(a)
const bKeys = Object.keys(b)
if (aKeys.length !== bKeys.length) return false
return aKeys.every((key) =>
partsEqual((a as Record<string, unknown>)[key], (b as Record<string, unknown>)[key]),
)
}
return false
}
// -- Context store --
/** A single context entry: a key-value pair. */
export type ContextEntry<T = unknown> = readonly [ContextKey<T>, T]
/**
* Mutable context store with tuple keys.
*
* Supports exact-match lookups and prefix-match queries.
* Sources write context in topological order during refresh.
*/
export class Context {
time: Date
private readonly store: Map<string, { key: readonly ContextKeyPart[]; value: unknown }>
constructor(time: Date = new Date()) {
this.time = time
this.store = new Map()
}
/** Merges entries into this context. */
set(entries: readonly ContextEntry[]): void {
for (const [key, value] of entries) {
this.store.set(serializeKey(key), { key, value })
}
}
/** Exact-match lookup. Returns the value for the given key, or undefined. */
get<T>(key: ContextKey<T>): T | undefined {
const entry = this.store.get(serializeKey(key))
return entry?.value as T | undefined
}
/**
* Prefix-match query. Returns all entries whose key starts with the given prefix.
*
* @example
* ```ts
* // Get all "nextEvent" values across calendar source instances
* const events = context.find(contextKey("nextEvent"))
* ```
*/
find<T>(prefix: ContextKey<T>): Array<{ key: readonly ContextKeyPart[]; value: T }> {
const results: Array<{ key: readonly ContextKeyPart[]; value: T }> = []
for (const entry of this.store.values()) {
if (keyStartsWith(entry.key, prefix)) {
results.push({ key: entry.key, value: entry.value as T })
}
}
return results
}
/** Returns the number of entries (excluding time). */
get size(): number {
return this.store.size
}
}

View File

@@ -0,0 +1,35 @@
import type { Context } from "./context"
import type { FeedItem } from "./feed"
/**
* Produces feed items from an external source.
*
* @example
* ```ts
* type WeatherItem = FeedItem<"weather", { temp: number }>
*
* class WeatherDataSource implements DataSource<WeatherItem> {
* readonly type = "weather"
*
* async query(context: Context): Promise<WeatherItem[]> {
* const location = context.get(LocationKey)
* if (!location) return []
* const data = await fetchWeather(location)
* return [{
* id: `weather-${Date.now()}`,
* type: this.type,
* timestamp: context.time,
* data: { temp: data.temperature },
* signals: { urgency: 0.5, timeRelevance: "ambient" },
* }]
* }
* }
* ```
*/
export interface DataSource<TItem extends FeedItem = FeedItem, TConfig = unknown> {
/** Unique identifier for this source type */
readonly type: TItem["type"]
/** Queries the source and returns feed items */
query(context: Context, config: TConfig): Promise<TItem[]>
}

View File

@@ -0,0 +1,163 @@
import type { ContextEntry } from "./context"
import type { DataSource } from "./data-source"
import type { FeedItem } from "./feed"
import type { ReconcileResult } from "./reconciler"
import { Context } from "./context"
import { Reconciler } from "./reconciler"
export interface FeedControllerConfig {
/** Timeout for each data source query in milliseconds */
timeout?: number
/** Debounce window for batching context updates (default: 100ms) */
debounceMs?: number
/** Initial context state */
initialContext?: Context
}
export type FeedSubscriber<TItems extends FeedItem> = (result: ReconcileResult<TItems>) => void
interface RegisteredSource {
source: DataSource<FeedItem, unknown>
config: unknown
}
const DEFAULT_DEBOUNCE_MS = 100
/**
* Orchestrates feed reconciliation in response to context updates.
*
* Holds context state, debounces updates, queries data sources, and
* notifies subscribers. Each user should have their own instance.
*
* @example
* ```ts
* const controller = new FeedController({ debounceMs: 100 })
* .addDataSource(new WeatherDataSource())
* .addDataSource(new TflDataSource())
*
* controller.subscribe((result) => {
* console.log(result.items)
* })
*
* // Context update triggers debounced reconcile
* controller.pushContextUpdate([[LocationKey, location]])
*
* // Direct reconcile (no debounce)
* const result = await controller.reconcile()
*
* // Cleanup
* controller.stop()
* ```
*/
export class FeedController<TItems extends FeedItem = never> {
private sources = new Map<string, RegisteredSource>()
private subscribers = new Set<FeedSubscriber<TItems>>()
private context: Context
private debounceMs: number
private timeout: number | undefined
private pendingTimeout: ReturnType<typeof setTimeout> | null = null
private stopped = false
constructor(config?: FeedControllerConfig) {
this.context = config?.initialContext ?? new Context()
this.debounceMs = config?.debounceMs ?? DEFAULT_DEBOUNCE_MS
this.timeout = config?.timeout
}
/** Registers a data source. */
addDataSource<TItem extends FeedItem, TConfig>(
source: DataSource<TItem, TConfig>,
config?: TConfig,
): FeedController<TItems | TItem> {
this.sources.set(source.type, {
source: source as DataSource<FeedItem, unknown>,
config,
})
return this as FeedController<TItems | TItem>
}
/** Removes a data source by type. */
removeDataSource<T extends TItems["type"]>(
sourceType: T,
): FeedController<Exclude<TItems, { type: T }>> {
this.sources.delete(sourceType)
return this as unknown as FeedController<Exclude<TItems, { type: T }>>
}
/** Stops the controller and cancels pending reconciles. */
stop(): void {
this.stopped = true
if (this.pendingTimeout) {
clearTimeout(this.pendingTimeout)
this.pendingTimeout = null
}
}
/** Merges entries into context and schedules a debounced reconcile. */
pushContextUpdate(entries: readonly ContextEntry[]): void {
this.context.time = new Date()
this.context.set(entries)
this.scheduleReconcile()
}
/** Subscribes to feed updates. Returns unsubscribe function. */
subscribe(callback: FeedSubscriber<TItems>): () => void {
this.subscribers.add(callback)
return () => {
this.subscribers.delete(callback)
}
}
/** Immediately reconciles with current or provided context. */
async reconcile(context?: Context): Promise<ReconcileResult<TItems>> {
const ctx = context ?? this.context
const reconciler = this.createReconciler()
return reconciler.reconcile(ctx)
}
/** Returns current context. */
getContext(): Context {
return this.context
}
private scheduleReconcile(): void {
if (this.pendingTimeout) return
this.pendingTimeout = setTimeout(() => {
this.flushPending()
}, this.debounceMs)
}
private async flushPending(): Promise<void> {
this.pendingTimeout = null
if (this.stopped) return
if (this.sources.size === 0) return
const reconciler = this.createReconciler()
const result = await reconciler.reconcile(this.context)
this.notifySubscribers(result)
}
private createReconciler(): Reconciler<TItems> {
const reconciler = new Reconciler<TItems>({ timeout: this.timeout })
Array.from(this.sources.values()).forEach(({ source, config }) => {
reconciler.register(source, config)
})
return reconciler as Reconciler<TItems>
}
private notifySubscribers(result: ReconcileResult<TItems>): void {
this.subscribers.forEach((callback) => {
try {
callback(result)
} catch {
// Subscriber errors shouldn't break other subscribers
}
})
}
}

View File

@@ -0,0 +1,931 @@
import { describe, expect, test } from "bun:test"
import type { ActionDefinition, ContextEntry, ContextKey, FeedItem, FeedSource } from "./index"
import { FeedEngine } from "./feed-engine"
import { Context, TimeRelevance, UnknownActionError, contextKey } from "./index"
// No-op action methods for test sources
const noActions = {
async listActions(): Promise<Record<string, ActionDefinition>> {
return {}
},
async executeAction(actionId: string): Promise<void> {
throw new UnknownActionError(actionId)
},
}
// =============================================================================
// CONTEXT KEYS
// =============================================================================
interface Location {
lat: number
lng: number
}
interface Weather {
temperature: number
condition: string
}
const LocationKey: ContextKey<Location> = contextKey("location")
const WeatherKey: ContextKey<Weather> = contextKey("weather")
// =============================================================================
// FEED ITEMS
// =============================================================================
type WeatherFeedItem = FeedItem<"weather", { temperature: number; condition: string }>
type AlertFeedItem = FeedItem<"alert", { message: string }>
// =============================================================================
// TEST HELPERS
// =============================================================================
interface SimulatedLocationSource extends FeedSource {
simulateUpdate(location: Location): void
}
function createLocationSource(): SimulatedLocationSource {
let callback: ((entries: readonly ContextEntry[]) => void) | null = null
let currentLocation: Location = { lat: 0, lng: 0 }
return {
id: "location",
...noActions,
onContextUpdate(cb) {
callback = cb
return () => {
callback = null
}
},
async fetchContext() {
return [[LocationKey, currentLocation]]
},
simulateUpdate(location: Location) {
currentLocation = location
callback?.([[LocationKey, location]])
},
}
}
function createWeatherSource(
fetchWeather: (location: Location) => Promise<Weather> = async () => ({
temperature: 20,
condition: "sunny",
}),
): FeedSource<WeatherFeedItem> {
return {
id: "weather",
dependencies: ["location"],
...noActions,
async fetchContext(context) {
const location = context.get(LocationKey)
if (!location) return null
const weather = await fetchWeather(location)
return [[WeatherKey, weather]]
},
async fetchItems(context) {
const weather = context.get(WeatherKey)
if (!weather) return []
return [
{
id: `weather-${Date.now()}`,
type: "weather",
timestamp: new Date(),
data: {
temperature: weather.temperature,
condition: weather.condition,
},
signals: { urgency: 0.5, timeRelevance: TimeRelevance.Ambient },
},
]
},
}
}
function createAlertSource(): FeedSource<AlertFeedItem> {
return {
id: "alert",
dependencies: ["weather"],
...noActions,
async fetchContext() {
return null
},
async fetchItems(context) {
const weather = context.get(WeatherKey)
if (!weather) return []
if (weather.condition === "storm") {
return [
{
id: "alert-storm",
type: "alert",
timestamp: new Date(),
data: { message: "Storm warning!" },
signals: { urgency: 1.0, timeRelevance: TimeRelevance.Imminent },
},
]
}
return []
},
}
}
// =============================================================================
// TESTS
// =============================================================================
describe("FeedEngine", () => {
describe("registration", () => {
test("registers sources", () => {
const engine = new FeedEngine()
const location = createLocationSource()
engine.register(location)
// Can refresh without error
expect(engine.refresh()).resolves.toBeDefined()
})
test("unregisters sources", async () => {
const engine = new FeedEngine()
const location = createLocationSource()
engine.register(location)
engine.unregister("location")
const result = await engine.refresh()
expect(result.items).toHaveLength(0)
})
test("allows chained registration", () => {
const engine = new FeedEngine()
.register(createLocationSource())
.register(createWeatherSource())
.register(createAlertSource())
expect(engine.refresh()).resolves.toBeDefined()
})
})
describe("graph validation", () => {
test("throws on missing dependency", async () => {
const engine = new FeedEngine()
const orphan: FeedSource = {
id: "orphan",
dependencies: ["nonexistent"],
...noActions,
async fetchContext() {
return null
},
}
engine.register(orphan)
await expect(engine.refresh()).rejects.toThrow(
'Source "orphan" depends on "nonexistent" which is not registered',
)
})
test("throws on circular dependency", async () => {
const engine = new FeedEngine()
const a: FeedSource = {
id: "a",
dependencies: ["b"],
...noActions,
async fetchContext() {
return null
},
}
const b: FeedSource = {
id: "b",
dependencies: ["a"],
...noActions,
async fetchContext() {
return null
},
}
engine.register(a).register(b)
await expect(engine.refresh()).rejects.toThrow("Circular dependency detected: a → b → a")
})
test("throws on longer cycles", async () => {
const engine = new FeedEngine()
const a: FeedSource = {
id: "a",
dependencies: ["c"],
...noActions,
async fetchContext() {
return null
},
}
const b: FeedSource = {
id: "b",
dependencies: ["a"],
...noActions,
async fetchContext() {
return null
},
}
const c: FeedSource = {
id: "c",
dependencies: ["b"],
...noActions,
async fetchContext() {
return null
},
}
engine.register(a).register(b).register(c)
await expect(engine.refresh()).rejects.toThrow("Circular dependency detected")
})
})
describe("refresh", () => {
test("runs fetchContext in dependency order", async () => {
const order: string[] = []
const location: FeedSource = {
id: "location",
...noActions,
async fetchContext() {
order.push("location")
return [[LocationKey, { lat: 51.5, lng: -0.1 }]]
},
}
const weather: FeedSource = {
id: "weather",
dependencies: ["location"],
...noActions,
async fetchContext(ctx) {
order.push("weather")
const loc = ctx.get(LocationKey)
expect(loc).toBeDefined()
return [[WeatherKey, { temperature: 20, condition: "sunny" }]]
},
}
const engine = new FeedEngine().register(weather).register(location)
await engine.refresh()
expect(order).toEqual(["location", "weather"])
})
test("accumulates context across sources", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const { context } = await engine.refresh()
expect(context.get(LocationKey)).toEqual({
lat: 51.5,
lng: -0.1,
})
expect(context.get(WeatherKey)).toEqual({
temperature: 20,
condition: "sunny",
})
})
test("collects items from all sources", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const { items } = await engine.refresh()
expect(items).toHaveLength(1)
expect(items[0]!.type).toBe("weather")
})
test("returns items in source graph order (no engine-level sorting)", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource(async () => ({
temperature: 15,
condition: "storm",
}))
const alert = createAlertSource()
const engine = new FeedEngine().register(location).register(weather).register(alert)
const { items } = await engine.refresh()
expect(items).toHaveLength(2)
// Items returned in topological order (weather before alert)
expect(items[0]!.type).toBe("weather")
expect(items[1]!.type).toBe("alert")
// Signals are preserved for post-processors to consume
expect(items[0]!.signals?.urgency).toBe(0.5)
expect(items[1]!.signals?.urgency).toBe(1.0)
})
test("handles missing upstream context gracefully", async () => {
const location: FeedSource = {
id: "location",
...noActions,
async fetchContext() {
return null // No location available
},
}
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const { context, items } = await engine.refresh()
expect(context.get(WeatherKey)).toBeUndefined()
expect(items).toHaveLength(0)
})
test("captures errors from fetchContext", async () => {
const failing: FeedSource = {
id: "failing",
...noActions,
async fetchContext() {
throw new Error("Context fetch failed")
},
}
const engine = new FeedEngine().register(failing)
const { errors } = await engine.refresh()
expect(errors).toHaveLength(1)
expect(errors[0]!.sourceId).toBe("failing")
expect(errors[0]!.error.message).toBe("Context fetch failed")
})
test("captures errors from fetchItems", async () => {
const failing: FeedSource = {
id: "failing",
...noActions,
async fetchContext() {
return null
},
async fetchItems() {
throw new Error("Items fetch failed")
},
}
const engine = new FeedEngine().register(failing)
const { errors } = await engine.refresh()
expect(errors).toHaveLength(1)
expect(errors[0]!.sourceId).toBe("failing")
expect(errors[0]!.error.message).toBe("Items fetch failed")
})
test("continues after source error", async () => {
const failing: FeedSource = {
id: "failing",
...noActions,
async fetchContext() {
throw new Error("Failed")
},
}
const working: FeedSource = {
id: "working",
...noActions,
async fetchContext() {
return null
},
async fetchItems() {
return [
{
id: "item-1",
type: "test",
priority: 0.5,
timestamp: new Date(),
data: {},
},
]
},
}
const engine = new FeedEngine().register(failing).register(working)
const { items, errors } = await engine.refresh()
expect(errors).toHaveLength(1)
expect(items).toHaveLength(1)
})
})
describe("currentContext", () => {
test("returns initial context before refresh", () => {
const engine = new FeedEngine()
const context = engine.currentContext()
expect(context.time).toBeInstanceOf(Date)
})
test("returns accumulated context after refresh", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const engine = new FeedEngine().register(location)
await engine.refresh()
const context = engine.currentContext()
expect(context.get(LocationKey)).toEqual({
lat: 51.5,
lng: -0.1,
})
})
})
describe("subscribe", () => {
test("returns unsubscribe function", () => {
const engine = new FeedEngine()
let callCount = 0
const unsubscribe = engine.subscribe(() => {
callCount++
})
unsubscribe()
// Subscriber should not be called after unsubscribe
expect(callCount).toBe(0)
})
})
describe("reactive updates", () => {
test("start subscribes to onContextUpdate", async () => {
const location = createLocationSource()
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const results: Array<{ items: FeedItem[] }> = []
engine.subscribe((result) => {
results.push({ items: result.items })
})
engine.start()
// Simulate location update
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
// Wait for async refresh
await new Promise((resolve) => setTimeout(resolve, 50))
expect(results.length).toBeGreaterThan(0)
expect(results[0]!.items[0]!.type).toBe("weather")
})
test("stop unsubscribes from all sources", async () => {
const location = createLocationSource()
const engine = new FeedEngine().register(location)
let callCount = 0
engine.subscribe(() => {
callCount++
})
engine.start()
engine.stop()
// Simulate update after stop
location.simulateUpdate({ lat: 1, lng: 1 })
await new Promise((resolve) => setTimeout(resolve, 50))
expect(callCount).toBe(0)
})
test("start is idempotent", () => {
const location = createLocationSource()
const engine = new FeedEngine().register(location)
// Should not throw or double-subscribe
engine.start()
engine.start()
engine.stop()
})
})
describe("executeAction", () => {
test("routes action to correct source", async () => {
let receivedAction = ""
let receivedParams: unknown = {}
const source: FeedSource = {
id: "test-source",
async listActions() {
return {
"do-thing": { id: "do-thing" },
}
},
async executeAction(actionId, params) {
receivedAction = actionId
receivedParams = params
},
async fetchContext() {
return null
},
}
const engine = new FeedEngine().register(source)
await engine.executeAction("test-source", "do-thing", { key: "value" })
expect(receivedAction).toBe("do-thing")
expect(receivedParams).toEqual({ key: "value" })
})
test("throws for unknown source", async () => {
const engine = new FeedEngine()
await expect(engine.executeAction("nonexistent", "action", {})).rejects.toThrow(
"Source not found: nonexistent",
)
})
test("throws for unknown action on source", async () => {
const source: FeedSource = {
id: "test-source",
...noActions,
async fetchContext() {
return null
},
}
const engine = new FeedEngine().register(source)
await expect(engine.executeAction("test-source", "nonexistent", {})).rejects.toThrow(
'Action "nonexistent" not found on source "test-source"',
)
})
})
describe("listActions", () => {
test("returns actions for a specific source", async () => {
const source: FeedSource = {
id: "test-source",
async listActions() {
return {
"action-1": { id: "action-1" },
"action-2": { id: "action-2" },
}
},
async executeAction() {},
async fetchContext() {
return null
},
}
const engine = new FeedEngine().register(source)
const actions = await engine.listActions("test-source")
expect(Object.keys(actions)).toEqual(["action-1", "action-2"])
})
test("throws for unknown source", async () => {
const engine = new FeedEngine()
await expect(engine.listActions("nonexistent")).rejects.toThrow(
"Source not found: nonexistent",
)
})
test("throws on mismatched action ID", async () => {
const source: FeedSource = {
id: "bad-source",
async listActions() {
return {
"correct-key": { id: "wrong-id" },
}
},
async executeAction() {},
async fetchContext() {
return null
},
}
const engine = new FeedEngine().register(source)
await expect(engine.listActions("bad-source")).rejects.toThrow(
'Action ID mismatch on source "bad-source"',
)
})
})
describe("lastFeed", () => {
test("returns null before any refresh", () => {
const engine = new FeedEngine()
expect(engine.lastFeed()).toBeNull()
})
test("returns cached result after refresh", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource()
const engine = new FeedEngine().register(location).register(weather)
const refreshResult = await engine.refresh()
const cached = engine.lastFeed()
expect(cached).not.toBeNull()
expect(cached!.items).toEqual(refreshResult.items)
expect(cached!.context).toEqual(refreshResult.context)
})
test("returns null after TTL expires", async () => {
const engine = new FeedEngine({ cacheTtlMs: 50 })
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
engine.register(location)
await engine.refresh()
expect(engine.lastFeed()).not.toBeNull()
await new Promise((resolve) => setTimeout(resolve, 60))
expect(engine.lastFeed()).toBeNull()
})
test("defaults to 5 minute TTL", async () => {
const engine = new FeedEngine()
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
engine.register(location)
await engine.refresh()
// Should still be cached immediately
expect(engine.lastFeed()).not.toBeNull()
})
test("refresh always fetches from sources", async () => {
let fetchCount = 0
const source: FeedSource = {
id: "counter",
...noActions,
async fetchContext() {
fetchCount++
return null
},
}
const engine = new FeedEngine().register(source)
await engine.refresh()
await engine.refresh()
await engine.refresh()
expect(fetchCount).toBe(3)
})
test("reactive context update refreshes cache", async () => {
const location = createLocationSource()
const weather = createWeatherSource()
const engine = new FeedEngine({ cacheTtlMs: 5000 }).register(location).register(weather)
engine.start()
// Simulate location update which triggers reactive refresh
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
// Wait for async reactive refresh to complete
await new Promise((resolve) => setTimeout(resolve, 50))
const cached = engine.lastFeed()
expect(cached).not.toBeNull()
expect(cached!.items.length).toBeGreaterThan(0)
engine.stop()
})
test("reactive item update refreshes cache", async () => {
let itemUpdateCallback: ((items: FeedItem[]) => void) | null = null
const source: FeedSource = {
id: "reactive-items",
...noActions,
async fetchContext() {
return null
},
async fetchItems() {
return [
{
id: "item-1",
type: "test",
priority: 0.5,
timestamp: new Date(),
data: {},
},
]
},
onItemsUpdate(callback) {
itemUpdateCallback = callback
return () => {
itemUpdateCallback = null
}
},
}
const engine = new FeedEngine().register(source)
engine.start()
// Trigger item update
itemUpdateCallback!([])
// Wait for async refresh
await new Promise((resolve) => setTimeout(resolve, 50))
const cached = engine.lastFeed()
expect(cached).not.toBeNull()
expect(cached!.items).toHaveLength(1)
engine.stop()
})
test("TTL resets after reactive update", async () => {
const location = createLocationSource()
const weather = createWeatherSource()
const engine = new FeedEngine({ cacheTtlMs: 100 }).register(location).register(weather)
engine.start()
// Initial reactive update
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
await new Promise((resolve) => setTimeout(resolve, 50))
expect(engine.lastFeed()).not.toBeNull()
// Wait 70ms (total 120ms from first update, past original TTL)
// but trigger another update at 50ms to reset TTL
location.simulateUpdate({ lat: 52.0, lng: -0.2 })
await new Promise((resolve) => setTimeout(resolve, 50))
// Should still be cached because TTL was reset by second update
expect(engine.lastFeed()).not.toBeNull()
engine.stop()
})
test("cacheTtlMs is configurable", async () => {
const engine = new FeedEngine({ cacheTtlMs: 30 })
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
engine.register(location)
await engine.refresh()
expect(engine.lastFeed()).not.toBeNull()
await new Promise((resolve) => setTimeout(resolve, 40))
expect(engine.lastFeed()).toBeNull()
})
test("auto-refreshes on TTL interval after start", async () => {
let fetchCount = 0
const source: FeedSource = {
id: "counter",
...noActions,
async fetchContext() {
fetchCount++
return null
},
async fetchItems() {
return [
{
id: `item-${fetchCount}`,
type: "test",
priority: 0.5,
timestamp: new Date(),
data: {},
},
]
},
}
const engine = new FeedEngine({ cacheTtlMs: 50 }).register(source)
engine.start()
// Wait for two TTL intervals to elapse
await new Promise((resolve) => setTimeout(resolve, 120))
// Should have auto-refreshed at least twice
expect(fetchCount).toBeGreaterThanOrEqual(2)
expect(engine.lastFeed()).not.toBeNull()
engine.stop()
})
test("stop cancels periodic refresh", async () => {
let fetchCount = 0
const source: FeedSource = {
id: "counter",
...noActions,
async fetchContext() {
fetchCount++
return null
},
}
const engine = new FeedEngine({ cacheTtlMs: 50 }).register(source)
engine.start()
engine.stop()
const countAfterStop = fetchCount
// Wait past TTL
await new Promise((resolve) => setTimeout(resolve, 80))
// No additional fetches after stop
expect(fetchCount).toBe(countAfterStop)
})
test("reactive update resets periodic refresh timer", async () => {
let fetchCount = 0
const location = createLocationSource()
const countingWeather: FeedSource<WeatherFeedItem> = {
id: "weather",
dependencies: ["location"],
...noActions,
async fetchContext(ctx) {
fetchCount++
const loc = ctx.get(LocationKey)
if (!loc) return null
return [[WeatherKey, { temperature: 20, condition: "sunny" }]]
},
async fetchItems(ctx) {
const weather = ctx.get(WeatherKey)
if (!weather) return []
return [
{
id: `weather-${Date.now()}`,
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: { temperature: weather.temperature, condition: weather.condition },
},
]
},
}
const engine = new FeedEngine({ cacheTtlMs: 100 })
.register(location)
.register(countingWeather)
engine.start()
// At 40ms, push a reactive update — this resets the timer
await new Promise((resolve) => setTimeout(resolve, 40))
const countBeforeUpdate = fetchCount
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
await new Promise((resolve) => setTimeout(resolve, 20))
// Reactive update triggered a fetch
expect(fetchCount).toBeGreaterThan(countBeforeUpdate)
const countAfterUpdate = fetchCount
// At 100ms from start (60ms after reactive update), the original
// timer would have fired, but it was reset. No extra fetch yet.
await new Promise((resolve) => setTimeout(resolve, 40))
expect(fetchCount).toBe(countAfterUpdate)
engine.stop()
})
})
})

View File

@@ -0,0 +1,599 @@
import type { ActionDefinition } from "./action"
import type { ContextEntry } from "./context"
import type { FeedItem } from "./feed"
import type { FeedPostProcessor, ItemGroup } from "./feed-post-processor"
import type { FeedSource } from "./feed-source"
import { Context } from "./context"
export interface SourceError {
sourceId: string
error: Error
}
export interface FeedResult<TItem extends FeedItem = FeedItem> {
context: Context
items: TItem[]
errors: SourceError[]
/** Item groups produced by post-processors */
groupedItems?: ItemGroup[]
}
export type FeedSubscriber<TItem extends FeedItem = FeedItem> = (result: FeedResult<TItem>) => void
const DEFAULT_CACHE_TTL_MS = 300_000 // 5 minutes
const MIN_CACHE_TTL_MS = 10 // prevent spin from zero/negative values
export interface FeedEngineConfig {
/** Cache TTL in milliseconds. Default: 300_000 (5 minutes). Minimum: 10. */
cacheTtlMs?: number
}
interface SourceGraph {
sources: Map<string, FeedSource>
sorted: FeedSource[]
dependents: Map<string, string[]>
}
/**
* Orchestrates FeedSources, managing the dependency graph and context flow.
*
* Sources declare dependencies on other sources. The engine:
* - Validates the dependency graph (no missing deps, no cycles)
* - Runs fetchContext() in topological order during refresh
* - Runs fetchItems() on all sources with accumulated context
* - Subscribes to reactive updates via onContextUpdate/onItemsUpdate
*
* @example
* ```ts
* const engine = new FeedEngine()
* .register(locationSource)
* .register(weatherSource)
* .register(alertSource)
*
* // Pull-based refresh
* const { context, items, errors } = await engine.refresh()
*
* // Reactive updates
* engine.subscribe((result) => {
* console.log(result.items)
* })
* engine.start()
*
* // Cleanup
* engine.stop()
* ```
*/
export class FeedEngine<TItems extends FeedItem = FeedItem> {
private sources = new Map<string, FeedSource>()
private graph: SourceGraph | null = null
private context: Context = new Context()
private subscribers = new Set<FeedSubscriber<TItems>>()
private cleanups: Array<() => void> = []
private started = false
private postProcessors: FeedPostProcessor[] = []
private readonly cacheTtlMs: number
private cachedResult: FeedResult<TItems> | null = null
private cachedAt: number | null = null
private refreshTimer: ReturnType<typeof setTimeout> | null = null
constructor(config?: FeedEngineConfig) {
this.cacheTtlMs = Math.max(config?.cacheTtlMs ?? DEFAULT_CACHE_TTL_MS, MIN_CACHE_TTL_MS)
}
/**
* Returns the cached FeedResult if available and not expired.
* Returns null if no refresh has completed or the cache TTL has elapsed.
*/
lastFeed(): FeedResult<TItems> | null {
if (this.cachedResult === null || this.cachedAt === null) {
return null
}
if (Date.now() - this.cachedAt > this.cacheTtlMs) {
return null
}
return this.cachedResult
}
/**
* Registers a FeedSource. Invalidates the cached graph.
*/
register<TItem extends FeedItem>(source: FeedSource<TItem>): FeedEngine<TItems | TItem> {
this.sources.set(source.id, source)
this.graph = null
return this as FeedEngine<TItems | TItem>
}
/**
* Unregisters a FeedSource by ID. Invalidates the cached graph.
*/
unregister(sourceId: string): this {
this.sources.delete(sourceId)
this.graph = null
return this
}
/**
* Registers a post-processor. Processors run in registration order
* after items are collected, on every update path.
*/
registerPostProcessor(processor: FeedPostProcessor): this {
this.postProcessors.push(processor)
return this
}
/**
* Unregisters a post-processor by reference.
*/
unregisterPostProcessor(processor: FeedPostProcessor): this {
this.postProcessors = this.postProcessors.filter((p) => p !== processor)
return this
}
/**
* Refreshes the feed by running all sources in dependency order.
* Calls fetchContext() then fetchItems() on each source.
*/
async refresh(): Promise<FeedResult<TItems>> {
const graph = this.ensureGraph()
const errors: SourceError[] = []
// Reset context with fresh time
const context = new Context()
// Run fetchContext in topological order
for (const source of graph.sorted) {
try {
const entries = await source.fetchContext(context)
if (entries) {
context.set(entries)
}
} catch (err) {
errors.push({
sourceId: source.id,
error: err instanceof Error ? err : new Error(String(err)),
})
}
}
// Run fetchItems on all sources
const items: FeedItem[] = []
for (const source of graph.sorted) {
if (source.fetchItems) {
try {
const sourceItems = await source.fetchItems(context)
items.push(...sourceItems)
} catch (err) {
errors.push({
sourceId: source.id,
error: err instanceof Error ? err : new Error(String(err)),
})
}
}
}
this.context = context
const {
items: processedItems,
groupedItems,
errors: postProcessorErrors,
} = await this.applyPostProcessors(items as TItems[], context, errors)
const result: FeedResult<TItems> = {
context,
items: processedItems,
errors: postProcessorErrors,
...(groupedItems.length > 0 ? { groupedItems } : {}),
}
this.updateCache(result)
return result
}
/**
* Subscribes to feed updates. Returns unsubscribe function.
*/
subscribe(callback: FeedSubscriber<TItems>): () => void {
this.subscribers.add(callback)
return () => {
this.subscribers.delete(callback)
}
}
/**
* Starts reactive subscriptions on all sources and begins periodic refresh.
* Sources with onContextUpdate will trigger re-computation of dependents.
*/
start(): void {
if (this.started) return
this.started = true
const graph = this.ensureGraph()
for (const source of graph.sorted) {
if (source.onContextUpdate) {
const cleanup = source.onContextUpdate(
(entries) => {
this.handleContextUpdate(source.id, entries)
},
() => this.context,
)
this.cleanups.push(cleanup)
}
if (source.onItemsUpdate) {
const cleanup = source.onItemsUpdate(
() => {
this.scheduleRefresh()
},
() => this.context,
)
this.cleanups.push(cleanup)
}
}
this.scheduleNextRefresh()
}
/**
* Stops all reactive subscriptions and the periodic refresh timer.
*/
stop(): void {
this.started = false
this.cancelScheduledRefresh()
for (const cleanup of this.cleanups) {
cleanup()
}
this.cleanups = []
}
/**
* Returns the current accumulated context.
*/
currentContext(): Context {
return this.context
}
/**
* Execute an action on a registered source.
* Validates the action exists before dispatching.
*
* In pull-only mode (before `start()` is called), the action mutates source
* state but does not automatically refresh dependents. Call `refresh()`
* after to propagate changes. In reactive mode (`start()` called), sources
* that push context updates (e.g., LocationSource) will trigger dependent
* refresh automatically.
*/
async executeAction(sourceId: string, actionId: string, params: unknown): Promise<unknown> {
const actions = await this.listActions(sourceId)
if (!(actionId in actions)) {
throw new Error(`Action "${actionId}" not found on source "${sourceId}"`)
}
return this.sources.get(sourceId)!.executeAction(actionId, params)
}
/**
* List actions available on a specific source.
* Validates that action definition IDs match their record keys.
*/
async listActions(sourceId: string): Promise<Record<string, ActionDefinition>> {
const source = this.sources.get(sourceId)
if (!source) {
throw new Error(`Source not found: ${sourceId}`)
}
const actions = await source.listActions()
for (const [key, definition] of Object.entries(actions)) {
if (key !== definition.id) {
throw new Error(
`Action ID mismatch on source "${sourceId}": key "${key}" !== definition.id "${definition.id}"`,
)
}
}
return actions
}
private async applyPostProcessors(
items: TItems[],
context: Context,
errors: SourceError[],
): Promise<{ items: TItems[]; groupedItems: ItemGroup[]; errors: SourceError[] }> {
let currentItems = items
const allGroupedItems: ItemGroup[] = []
const allErrors = [...errors]
const boostScores = new Map<string, number>()
for (const processor of this.postProcessors) {
const snapshot = currentItems
try {
const enhancement = await processor(currentItems, context)
if (enhancement.additionalItems?.length) {
// Post-processors operate on FeedItem[] without knowledge of TItems.
// Additional items are merged untyped — this is intentional. The
// processor contract is "FeedItem in, FeedItem out"; type narrowing
// is the caller's responsibility when consuming FeedResult.
currentItems = [...currentItems, ...(enhancement.additionalItems as TItems[])]
}
if (enhancement.suppress?.length) {
const suppressSet = new Set(enhancement.suppress)
currentItems = currentItems.filter((item) => !suppressSet.has(item.id))
}
if (enhancement.groupedItems?.length) {
allGroupedItems.push(...enhancement.groupedItems)
}
if (enhancement.boost) {
for (const [id, score] of Object.entries(enhancement.boost)) {
boostScores.set(id, (boostScores.get(id) ?? 0) + score)
}
}
} catch (err) {
const sourceId = processor.name || "anonymous"
allErrors.push({
sourceId,
error: err instanceof Error ? err : new Error(String(err)),
})
currentItems = snapshot
}
}
// Apply boost reordering: positive-boost first (desc), then zero, then negative (desc).
// Stable sort within each tier preserves original relative order.
if (boostScores.size > 0) {
currentItems = applyBoostOrder(currentItems, boostScores)
}
// Remove stale item IDs from groups and drop empty groups
const itemIds = new Set(currentItems.map((item) => item.id))
const validGroups = allGroupedItems.reduce<ItemGroup[]>((acc, group) => {
const ids = group.itemIds.filter((id) => itemIds.has(id))
if (ids.length > 0) {
acc.push({ ...group, itemIds: ids })
}
return acc
}, [])
return { items: currentItems, groupedItems: validGroups, errors: allErrors }
}
private ensureGraph(): SourceGraph {
if (!this.graph) {
this.graph = buildGraph(Array.from(this.sources.values()))
}
return this.graph
}
private handleContextUpdate(sourceId: string, entries: readonly ContextEntry[]): void {
this.context.time = new Date()
this.context.set(entries)
// Re-run dependents and notify
this.refreshDependents(sourceId)
}
private async refreshDependents(sourceId: string): Promise<void> {
const graph = this.ensureGraph()
const toRefresh = this.collectDependents(sourceId, graph)
// Re-run fetchContext for dependents in order
for (const id of toRefresh) {
const source = graph.sources.get(id)
if (source) {
try {
const entries = await source.fetchContext(this.context)
if (entries) {
this.context.set(entries)
}
} catch {
// Errors during reactive updates are logged but don't stop propagation
}
}
}
// Collect items from all sources
const items: FeedItem[] = []
const errors: SourceError[] = []
for (const source of graph.sorted) {
if (source.fetchItems) {
try {
const sourceItems = await source.fetchItems(this.context)
items.push(...sourceItems)
} catch (err) {
errors.push({
sourceId: source.id,
error: err instanceof Error ? err : new Error(String(err)),
})
}
}
}
const {
items: processedItems,
groupedItems,
errors: postProcessorErrors,
} = await this.applyPostProcessors(items as TItems[], this.context, errors)
const result: FeedResult<TItems> = {
context: this.context,
items: processedItems,
errors: postProcessorErrors,
...(groupedItems.length > 0 ? { groupedItems } : {}),
}
this.updateCache(result)
this.notifySubscribers(result)
}
private collectDependents(sourceId: string, graph: SourceGraph): string[] {
const result: string[] = []
const visited = new Set<string>()
const collect = (id: string): void => {
const deps = graph.dependents.get(id) ?? []
for (const dep of deps) {
if (!visited.has(dep)) {
visited.add(dep)
result.push(dep)
collect(dep)
}
}
}
collect(sourceId)
// Return in topological order
return graph.sorted.filter((s) => result.includes(s.id)).map((s) => s.id)
}
private updateCache(result: FeedResult<TItems>): void {
this.cachedResult = result
this.cachedAt = Date.now()
if (this.started) {
this.scheduleNextRefresh()
}
}
private scheduleNextRefresh(): void {
this.cancelScheduledRefresh()
this.refreshTimer = setTimeout(() => {
this.refresh()
.then((result) => {
this.notifySubscribers(result)
})
.catch(() => {
// Periodic refresh errors are non-fatal; schedule next attempt
if (this.started) {
this.scheduleNextRefresh()
}
})
}, this.cacheTtlMs)
}
private cancelScheduledRefresh(): void {
if (this.refreshTimer !== null) {
clearTimeout(this.refreshTimer)
this.refreshTimer = null
}
}
private scheduleRefresh(): void {
// Simple immediate refresh for now - could add debouncing later
this.refresh()
.then((result) => {
this.notifySubscribers(result)
})
.catch(() => {
// Reactive refresh errors are non-fatal
})
}
private notifySubscribers(result: FeedResult<TItems>): void {
this.subscribers.forEach((callback) => {
try {
callback(result)
} catch {
// Subscriber errors shouldn't break other subscribers
}
})
}
}
function clamp(value: number, min: number, max: number): number {
return Math.min(max, Math.max(min, value))
}
function applyBoostOrder<T extends FeedItem>(items: T[], boostScores: Map<string, number>): T[] {
const positive: T[] = []
const neutral: T[] = []
const negative: T[] = []
for (const item of items) {
const raw = boostScores.get(item.id)
if (raw === undefined || raw === 0) {
neutral.push(item)
} else {
const clamped = clamp(raw, -1, 1)
if (clamped > 0) {
positive.push(item)
} else if (clamped < 0) {
negative.push(item)
} else {
neutral.push(item)
}
}
}
// Sort positive descending by boost, negative descending (least negative first, most negative last)
positive.sort((a, b) => {
const aScore = clamp(boostScores.get(a.id) ?? 0, -1, 1)
const bScore = clamp(boostScores.get(b.id) ?? 0, -1, 1)
return bScore - aScore
})
negative.sort((a, b) => {
const aScore = clamp(boostScores.get(a.id) ?? 0, -1, 1)
const bScore = clamp(boostScores.get(b.id) ?? 0, -1, 1)
return bScore - aScore
})
return [...positive, ...neutral, ...negative]
}
function buildGraph(sources: FeedSource[]): SourceGraph {
const byId = new Map<string, FeedSource>()
for (const source of sources) {
byId.set(source.id, source)
}
// Validate dependencies exist
for (const source of sources) {
for (const dep of source.dependencies ?? []) {
if (!byId.has(dep)) {
throw new Error(`Source "${source.id}" depends on "${dep}" which is not registered`)
}
}
}
// Check for cycles and topologically sort
const visited = new Set<string>()
const visiting = new Set<string>()
const sorted: FeedSource[] = []
function visit(id: string, path: string[]): void {
if (visiting.has(id)) {
const cycle = [...path.slice(path.indexOf(id)), id].join(" → ")
throw new Error(`Circular dependency detected: ${cycle}`)
}
if (visited.has(id)) return
visiting.add(id)
const source = byId.get(id)!
for (const dep of source.dependencies ?? []) {
visit(dep, [...path, id])
}
visiting.delete(id)
visited.add(id)
sorted.push(source)
}
for (const source of sources) {
visit(source.id, [])
}
// Build reverse dependency map
const dependents = new Map<string, string[]>()
for (const source of sources) {
for (const dep of source.dependencies ?? []) {
const list = dependents.get(dep) ?? []
list.push(source.id)
dependents.set(dep, list)
}
}
return { sources: byId, sorted, dependents }
}

View File

@@ -0,0 +1,602 @@
import { describe, expect, mock, test } from "bun:test"
import type {
ActionDefinition,
ContextEntry,
FeedItem,
FeedPostProcessor,
FeedSource,
} from "./index"
import { FeedEngine } from "./feed-engine"
import { UnknownActionError } from "./index"
// No-op action methods for test sources
const noActions = {
async listActions(): Promise<Record<string, ActionDefinition>> {
return {}
},
async executeAction(actionId: string): Promise<void> {
throw new UnknownActionError(actionId)
},
}
// =============================================================================
// FEED ITEMS
// =============================================================================
type WeatherItem = FeedItem<"weather", { temp: number }>
type CalendarItem = FeedItem<"calendar", { title: string }>
function weatherItem(id: string, temp: number): WeatherItem {
return { id, type: "weather", timestamp: new Date(), data: { temp } }
}
function calendarItem(id: string, title: string): CalendarItem {
return { id, type: "calendar", timestamp: new Date(), data: { title } }
}
// =============================================================================
// TEST SOURCES
// =============================================================================
function createWeatherSource(items: WeatherItem[]) {
return {
id: "aelis.weather",
...noActions,
async fetchContext() {
return null
},
async fetchItems(): Promise<WeatherItem[]> {
return items
},
}
}
function createCalendarSource(items: CalendarItem[]) {
return {
id: "aelis.calendar",
...noActions,
async fetchContext() {
return null
},
async fetchItems(): Promise<CalendarItem[]> {
return items
},
}
}
// =============================================================================
// REGISTRATION
// =============================================================================
describe("FeedPostProcessor", () => {
describe("registration", () => {
test("registerPostProcessor is chainable", () => {
const engine = new FeedEngine()
const processor: FeedPostProcessor = async () => ({})
const result = engine.registerPostProcessor(processor)
expect(result).toBe(engine)
})
test("unregisterPostProcessor is chainable", () => {
const engine = new FeedEngine()
const processor: FeedPostProcessor = async () => ({})
const result = engine.unregisterPostProcessor(processor)
expect(result).toBe(engine)
})
test("unregistered processor does not run", async () => {
const processor = mock(async () => ({}))
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(processor)
.unregisterPostProcessor(processor)
await engine.refresh()
expect(processor).not.toHaveBeenCalled()
})
})
// =============================================================================
// ADDITIONAL ITEMS
// =============================================================================
describe("additionalItems", () => {
test("injects additional items into the feed", async () => {
const extra = calendarItem("c1", "Meeting")
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(async () => ({ additionalItems: [extra] }))
const result = await engine.refresh()
expect(result.items).toHaveLength(2)
expect(result.items.find((i) => i.id === "c1")).toBeDefined()
})
})
// =============================================================================
// SUPPRESS
// =============================================================================
describe("suppress", () => {
test("removes suppressed items from the feed", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({ suppress: ["w1"] }))
const result = await engine.refresh()
expect(result.items).toHaveLength(1)
expect(result.items[0].id).toBe("w2")
})
test("suppressing nonexistent ID is a no-op", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(async () => ({ suppress: ["nonexistent"] }))
const result = await engine.refresh()
expect(result.items).toHaveLength(1)
})
})
// =============================================================================
// GROUPED ITEMS
// =============================================================================
describe("groupedItems", () => {
test("accumulates grouped items on FeedResult", async () => {
const engine = new FeedEngine()
.register(
createCalendarSource([calendarItem("c1", "Meeting A"), calendarItem("c2", "Meeting B")]),
)
.registerPostProcessor(async () => ({
groupedItems: [{ itemIds: ["c1", "c2"], summary: "Busy afternoon" }],
}))
const result = await engine.refresh()
expect(result.groupedItems).toEqual([{ itemIds: ["c1", "c2"], summary: "Busy afternoon" }])
})
test("multiple processors accumulate groups", async () => {
const engine = new FeedEngine()
.register(
createCalendarSource([calendarItem("c1", "Meeting A"), calendarItem("c2", "Meeting B")]),
)
.registerPostProcessor(async () => ({
groupedItems: [{ itemIds: ["c1"], summary: "Group A" }],
}))
.registerPostProcessor(async () => ({
groupedItems: [{ itemIds: ["c2"], summary: "Group B" }],
}))
const result = await engine.refresh()
expect(result.groupedItems).toEqual([
{ itemIds: ["c1"], summary: "Group A" },
{ itemIds: ["c2"], summary: "Group B" },
])
})
test("stale item IDs are removed from groups after suppression", async () => {
const engine = new FeedEngine()
.register(
createCalendarSource([calendarItem("c1", "Meeting A"), calendarItem("c2", "Meeting B")]),
)
.registerPostProcessor(async () => ({
groupedItems: [{ itemIds: ["c1", "c2"], summary: "Afternoon" }],
}))
.registerPostProcessor(async () => ({ suppress: ["c1"] }))
const result = await engine.refresh()
expect(result.groupedItems).toEqual([{ itemIds: ["c2"], summary: "Afternoon" }])
})
test("groups with all items suppressed are dropped", async () => {
const engine = new FeedEngine()
.register(createCalendarSource([calendarItem("c1", "Meeting A")]))
.registerPostProcessor(async () => ({
groupedItems: [{ itemIds: ["c1"], summary: "Solo" }],
}))
.registerPostProcessor(async () => ({ suppress: ["c1"] }))
const result = await engine.refresh()
expect(result.groupedItems).toBeUndefined()
})
test("groupedItems is omitted when no processors produce groups", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(async () => ({}))
const result = await engine.refresh()
expect(result.groupedItems).toBeUndefined()
})
})
// =============================================================================
// BOOST
// =============================================================================
describe("boost", () => {
test("positive boost moves item before non-boosted items", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({ boost: { w2: 0.8 } }))
const result = await engine.refresh()
expect(result.items.map((i) => i.id)).toEqual(["w2", "w1"])
})
test("negative boost moves item after non-boosted items", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({ boost: { w1: -0.5 } }))
const result = await engine.refresh()
expect(result.items.map((i) => i.id)).toEqual(["w2", "w1"])
})
test("multiple boosted items are sorted by boost descending", async () => {
const engine = new FeedEngine()
.register(
createWeatherSource([
weatherItem("w1", 20),
weatherItem("w2", 25),
weatherItem("w3", 30),
]),
)
.registerPostProcessor(async () => ({
boost: { w3: 0.3, w1: 0.9 },
}))
const result = await engine.refresh()
// w1 (0.9) first, w3 (0.3) second, w2 (no boost) last
expect(result.items.map((i) => i.id)).toEqual(["w1", "w3", "w2"])
})
test("multiple processors accumulate boost scores", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({ boost: { w1: 0.3 } }))
.registerPostProcessor(async () => ({ boost: { w1: 0.4 } }))
const result = await engine.refresh()
// w1 accumulated boost = 0.7, moves before w2
expect(result.items.map((i) => i.id)).toEqual(["w1", "w2"])
})
test("accumulated boost is clamped to [-1, 1]", async () => {
const engine = new FeedEngine()
.register(
createWeatherSource([
weatherItem("w1", 20),
weatherItem("w2", 25),
weatherItem("w3", 30),
]),
)
.registerPostProcessor(async () => ({ boost: { w1: 0.8, w2: 0.9 } }))
.registerPostProcessor(async () => ({ boost: { w1: 0.8 } }))
const result = await engine.refresh()
// w1 accumulated = 1.6 clamped to 1, w2 = 0.9 — w1 still first
expect(result.items.map((i) => i.id)).toEqual(["w1", "w2", "w3"])
})
test("out-of-range boost values are clamped", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({ boost: { w1: 5.0 } }))
const result = await engine.refresh()
// Clamped to 1, still boosted to front
expect(result.items.map((i) => i.id)).toEqual(["w1", "w2"])
})
test("boosting a suppressed item is a no-op", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({
suppress: ["w1"],
boost: { w1: 1.0 },
}))
const result = await engine.refresh()
expect(result.items).toHaveLength(1)
expect(result.items[0].id).toBe("w2")
})
test("boosting a nonexistent item ID is a no-op", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(async () => ({ boost: { nonexistent: 1.0 } }))
const result = await engine.refresh()
expect(result.items).toHaveLength(1)
expect(result.items[0].id).toBe("w1")
})
test("items with equal boost retain original relative order", async () => {
const engine = new FeedEngine()
.register(
createWeatherSource([
weatherItem("w1", 20),
weatherItem("w2", 25),
weatherItem("w3", 30),
]),
)
.registerPostProcessor(async () => ({
boost: { w1: 0.5, w3: 0.5 },
}))
const result = await engine.refresh()
// w1 and w3 have equal boost — original order preserved: w1 before w3
expect(result.items.map((i) => i.id)).toEqual(["w1", "w3", "w2"])
})
test("negative boosts preserve relative order among demoted items", async () => {
const engine = new FeedEngine()
.register(
createWeatherSource([
weatherItem("w1", 20),
weatherItem("w2", 25),
weatherItem("w3", 30),
]),
)
.registerPostProcessor(async () => ({
boost: { w1: -0.3, w2: -0.3 },
}))
const result = await engine.refresh()
// w3 (neutral) first, then w1 and w2 (equal negative) in original order
expect(result.items.map((i) => i.id)).toEqual(["w3", "w1", "w2"])
})
test("boost works alongside additionalItems and groupedItems", async () => {
const extra = calendarItem("c1", "Meeting")
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({
additionalItems: [extra],
boost: { c1: 1.0 },
groupedItems: [{ itemIds: ["w1", "c1"], summary: "Related" }],
}))
const result = await engine.refresh()
// c1 boosted to front
expect(result.items[0].id).toBe("c1")
expect(result.items).toHaveLength(3)
expect(result.groupedItems).toEqual([{ itemIds: ["w1", "c1"], summary: "Related" }])
})
})
// =============================================================================
// PIPELINE ORDERING
// =============================================================================
describe("pipeline ordering", () => {
test("each processor sees items as modified by the previous processor", async () => {
const seen: string[] = []
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(async () => ({
additionalItems: [calendarItem("c1", "Injected")],
}))
.registerPostProcessor(async (items) => {
seen.push(...items.map((i) => i.id))
return {}
})
await engine.refresh()
expect(seen).toEqual(["w1", "c1"])
})
test("suppression in first processor affects second processor", async () => {
const seen: string[] = []
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({ suppress: ["w1"] }))
.registerPostProcessor(async (items) => {
seen.push(...items.map((i) => i.id))
return {}
})
await engine.refresh()
expect(seen).toEqual(["w2"])
})
})
// =============================================================================
// ERROR HANDLING
// =============================================================================
describe("error handling", () => {
test("throwing processor is recorded in errors and pipeline continues", async () => {
const seen: string[] = []
async function failingProcessor(): Promise<never> {
throw new Error("processor failed")
}
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(failingProcessor)
.registerPostProcessor(async (items) => {
seen.push(...items.map((i) => i.id))
return {}
})
const result = await engine.refresh()
const ppError = result.errors.find((e) => e.sourceId === "failingProcessor")
expect(ppError).toBeDefined()
expect(ppError!.error.message).toBe("processor failed")
// Pipeline continued — observer still saw the original item
expect(seen).toEqual(["w1"])
expect(result.items).toHaveLength(1)
})
test("anonymous throwing processor uses 'anonymous' as sourceId", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(async () => {
throw new Error("anon failed")
})
const result = await engine.refresh()
const ppError = result.errors.find((e) => e.sourceId === "anonymous")
expect(ppError).toBeDefined()
})
test("non-Error throw is wrapped", async () => {
async function failingProcessor(): Promise<never> {
throw "string error"
}
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20)]))
.registerPostProcessor(failingProcessor)
const result = await engine.refresh()
const ppError = result.errors.find((e) => e.sourceId === "failingProcessor")
expect(ppError).toBeDefined()
expect(ppError!.error).toBeInstanceOf(Error)
})
})
// =============================================================================
// REACTIVE PATHS
// =============================================================================
describe("reactive updates", () => {
test("post-processors run during reactive context updates", async () => {
let callCount = 0
let triggerUpdate: ((entries: readonly ContextEntry[]) => void) | null = null
const source: FeedSource = {
id: "aelis.reactive",
...noActions,
async fetchContext() {
return null
},
async fetchItems() {
return [weatherItem("w1", 20)]
},
onContextUpdate(callback, _getContext) {
triggerUpdate = callback
return () => {
triggerUpdate = null
}
},
}
const engine = new FeedEngine().register(source).registerPostProcessor(async () => {
callCount++
return {}
})
engine.start()
// Wait for initial periodic refresh
await new Promise((resolve) => setTimeout(resolve, 50))
const countAfterStart = callCount
// Trigger a reactive context update
triggerUpdate!([])
await new Promise((resolve) => setTimeout(resolve, 50))
expect(callCount).toBeGreaterThan(countAfterStart)
engine.stop()
})
test("post-processors run during reactive item updates", async () => {
let callCount = 0
let triggerItemsUpdate: ((items: FeedItem[]) => void) | null = null
const source: FeedSource = {
id: "aelis.reactive",
...noActions,
async fetchContext() {
return null
},
async fetchItems() {
return [weatherItem("w1", 20)]
},
onItemsUpdate(callback, _getContext) {
triggerItemsUpdate = callback
return () => {
triggerItemsUpdate = null
}
},
}
const engine = new FeedEngine().register(source).registerPostProcessor(async () => {
callCount++
return {}
})
engine.start()
await new Promise((resolve) => setTimeout(resolve, 50))
const countAfterStart = callCount
// Trigger a reactive items update
triggerItemsUpdate!([weatherItem("w1", 25)])
await new Promise((resolve) => setTimeout(resolve, 50))
expect(callCount).toBeGreaterThan(countAfterStart)
engine.stop()
})
})
// =============================================================================
// NO PROCESSORS = NO CHANGE
// =============================================================================
describe("no processors", () => {
test("engine without post-processors returns raw items unchanged", async () => {
const items = [weatherItem("w1", 20), weatherItem("w2", 25)]
const engine = new FeedEngine().register(createWeatherSource(items))
const result = await engine.refresh()
expect(result.items).toHaveLength(2)
expect(result.items[0].id).toBe("w1")
expect(result.items[1].id).toBe("w2")
expect(result.groupedItems).toBeUndefined()
})
})
// =============================================================================
// COMBINED ENHANCEMENT
// =============================================================================
describe("combined enhancement", () => {
test("single processor can use all enhancement fields at once", async () => {
const engine = new FeedEngine()
.register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)]))
.registerPostProcessor(async () => ({
additionalItems: [calendarItem("c1", "Injected")],
suppress: ["w2"],
groupedItems: [{ itemIds: ["w1", "c1"], summary: "Related" }],
}))
const result = await engine.refresh()
// w2 suppressed, c1 injected → w1 + c1
expect(result.items).toHaveLength(2)
expect(result.items.map((i) => i.id)).toEqual(["w1", "c1"])
// Groups on result
expect(result.groupedItems).toEqual([{ itemIds: ["w1", "c1"], summary: "Related" }])
})
})
})

View File

@@ -0,0 +1,26 @@
import type { Context } from "./context"
import type { FeedItem } from "./feed"
export interface ItemGroup {
/** IDs of items to present together */
itemIds: string[]
/** Summary text for the group */
summary: string
}
export interface FeedEnhancement {
/** New items to inject into the feed */
additionalItems?: FeedItem[]
/** Groups of items to present together with a summary */
groupedItems?: ItemGroup[]
/** Item IDs to remove from the feed */
suppress?: string[]
/** Map of item ID to boost score (-1 to 1). Positive promotes, negative demotes. */
boost?: Record<string, number>
}
/**
* A function that transforms feed items and produces enhancement directives.
* Use named functions for meaningful error attribution.
*/
export type FeedPostProcessor = (items: FeedItem[], context: Context) => Promise<FeedEnhancement>

View File

@@ -0,0 +1,492 @@
import { describe, expect, test } from "bun:test"
import type { ActionDefinition, ContextEntry, ContextKey, FeedItem, FeedSource } from "./index"
import { Context, TimeRelevance, UnknownActionError, contextKey } from "./index"
// No-op action methods for test sources
const noActions = {
async listActions(): Promise<Record<string, ActionDefinition>> {
return {}
},
async executeAction(actionId: string): Promise<void> {
throw new UnknownActionError(actionId)
},
}
// =============================================================================
// CONTEXT KEYS
// =============================================================================
interface Location {
lat: number
lng: number
}
interface Weather {
temperature: number
condition: string
}
const LocationKey: ContextKey<Location> = contextKey("location")
const WeatherKey: ContextKey<Weather> = contextKey("weather")
// =============================================================================
// FEED ITEMS
// =============================================================================
type WeatherFeedItem = FeedItem<"weather", { temperature: number; condition: string }>
type AlertFeedItem = FeedItem<"alert", { message: string }>
// =============================================================================
// TEST HELPERS
// =============================================================================
interface SimulatedLocationSource extends FeedSource {
simulateUpdate(location: Location): void
}
function createLocationSource(): SimulatedLocationSource {
let callback: ((entries: readonly ContextEntry[]) => void) | null = null
let currentLocation: Location = { lat: 0, lng: 0 }
return {
id: "location",
...noActions,
onContextUpdate(cb) {
callback = cb
return () => {
callback = null
}
},
async fetchContext() {
return [[LocationKey, currentLocation]]
},
simulateUpdate(location: Location) {
currentLocation = location
callback?.([[LocationKey, location]])
},
}
}
function createWeatherSource(
fetchWeather: (location: Location) => Promise<Weather> = async () => ({
temperature: 20,
condition: "sunny",
}),
): FeedSource<WeatherFeedItem> {
return {
id: "weather",
dependencies: ["location"],
...noActions,
async fetchContext(context) {
const location = context.get(LocationKey)
if (!location) return null
const weather = await fetchWeather(location)
return [[WeatherKey, weather]]
},
async fetchItems(context) {
const weather = context.get(WeatherKey)
if (!weather) return []
return [
{
id: `weather-${Date.now()}`,
type: "weather",
timestamp: new Date(),
data: {
temperature: weather.temperature,
condition: weather.condition,
},
signals: { urgency: 0.5, timeRelevance: TimeRelevance.Ambient },
},
]
},
}
}
function createAlertSource(): FeedSource<AlertFeedItem> {
return {
id: "alert",
dependencies: ["weather"],
...noActions,
async fetchContext() {
return null
},
async fetchItems(context) {
const weather = context.get(WeatherKey)
if (!weather) return []
if (weather.condition === "storm") {
return [
{
id: "alert-storm",
type: "alert",
timestamp: new Date(),
data: { message: "Storm warning!" },
signals: { urgency: 1.0, timeRelevance: TimeRelevance.Imminent },
},
]
}
return []
},
}
}
// =============================================================================
// GRAPH SIMULATION (until FeedController is updated)
// =============================================================================
interface SourceGraph {
sources: Map<string, FeedSource>
sorted: FeedSource[]
dependents: Map<string, string[]>
}
function buildGraph(sources: FeedSource[]): SourceGraph {
const byId = new Map<string, FeedSource>()
for (const source of sources) {
byId.set(source.id, source)
}
// Validate dependencies exist
for (const source of sources) {
for (const dep of source.dependencies ?? []) {
if (!byId.has(dep)) {
throw new Error(`Source "${source.id}" depends on "${dep}" which is not registered`)
}
}
}
// Check for cycles and topologically sort
const visited = new Set<string>()
const visiting = new Set<string>()
const sorted: FeedSource[] = []
function visit(id: string, path: string[]): void {
if (visiting.has(id)) {
const cycle = [...path.slice(path.indexOf(id)), id].join(" → ")
throw new Error(`Circular dependency detected: ${cycle}`)
}
if (visited.has(id)) return
visiting.add(id)
const source = byId.get(id)!
for (const dep of source.dependencies ?? []) {
visit(dep, [...path, id])
}
visiting.delete(id)
visited.add(id)
sorted.push(source)
}
for (const source of sources) {
visit(source.id, [])
}
// Build reverse dependency map
const dependents = new Map<string, string[]>()
for (const source of sources) {
for (const dep of source.dependencies ?? []) {
const list = dependents.get(dep) ?? []
list.push(source.id)
dependents.set(dep, list)
}
}
return { sources: byId, sorted, dependents }
}
async function refreshGraph(graph: SourceGraph): Promise<{ context: Context; items: FeedItem[] }> {
const context = new Context()
// Run fetchContext in topological order
for (const source of graph.sorted) {
const entries = await source.fetchContext(context)
if (entries) {
context.set(entries)
}
}
// Run fetchItems on all sources
const items: FeedItem[] = []
for (const source of graph.sorted) {
if (source.fetchItems) {
const sourceItems = await source.fetchItems(context)
items.push(...sourceItems)
}
}
return { context, items }
}
// =============================================================================
// TESTS
// =============================================================================
describe("FeedSource", () => {
describe("interface", () => {
test("source with only context production", () => {
const source = createLocationSource()
expect(source.id).toBe("location")
expect(source.dependencies).toBeUndefined()
expect(source.fetchContext).toBeDefined()
expect(source.onContextUpdate).toBeDefined()
expect(source.fetchItems).toBeUndefined()
})
test("source with dependencies and both context and items", () => {
const source = createWeatherSource()
expect(source.id).toBe("weather")
expect(source.dependencies).toEqual(["location"])
expect(source.fetchContext).toBeDefined()
expect(source.fetchItems).toBeDefined()
})
test("source with only item production", () => {
const source = createAlertSource()
expect(source.id).toBe("alert")
expect(source.dependencies).toEqual(["weather"])
expect(source.fetchContext).toBeDefined()
expect(source.fetchItems).toBeDefined()
})
test("source without context returns null from fetchContext", async () => {
const source = createAlertSource()
const result = await source.fetchContext(new Context())
expect(result).toBeNull()
})
})
describe("graph validation", () => {
test("validates all dependencies exist", () => {
const orphan: FeedSource = {
id: "orphan",
dependencies: ["nonexistent"],
...noActions,
async fetchContext() {
return null
},
}
expect(() => buildGraph([orphan])).toThrow(
'Source "orphan" depends on "nonexistent" which is not registered',
)
})
test("detects circular dependencies", () => {
const a: FeedSource = {
id: "a",
dependencies: ["b"],
...noActions,
async fetchContext() {
return null
},
}
const b: FeedSource = {
id: "b",
dependencies: ["a"],
...noActions,
async fetchContext() {
return null
},
}
expect(() => buildGraph([a, b])).toThrow("Circular dependency detected: a → b → a")
})
test("detects longer cycles", () => {
const a: FeedSource = {
id: "a",
dependencies: ["c"],
...noActions,
async fetchContext() {
return null
},
}
const b: FeedSource = {
id: "b",
dependencies: ["a"],
...noActions,
async fetchContext() {
return null
},
}
const c: FeedSource = {
id: "c",
dependencies: ["b"],
...noActions,
async fetchContext() {
return null
},
}
expect(() => buildGraph([a, b, c])).toThrow("Circular dependency detected")
})
test("topologically sorts sources", () => {
const location = createLocationSource()
const weather = createWeatherSource()
const alert = createAlertSource()
// Register in wrong order
const graph = buildGraph([alert, weather, location])
expect(graph.sorted.map((s) => s.id)).toEqual(["location", "weather", "alert"])
})
test("builds reverse dependency map", () => {
const location = createLocationSource()
const weather = createWeatherSource()
const alert = createAlertSource()
const graph = buildGraph([location, weather, alert])
expect(graph.dependents.get("location")).toEqual(["weather"])
expect(graph.dependents.get("weather")).toEqual(["alert"])
expect(graph.dependents.get("alert")).toBeUndefined()
})
})
describe("graph refresh", () => {
test("runs fetchContext in dependency order", async () => {
const order: string[] = []
const location: FeedSource = {
id: "location",
...noActions,
async fetchContext() {
order.push("location")
return [[LocationKey, { lat: 51.5, lng: -0.1 }]]
},
}
const weather: FeedSource = {
id: "weather",
dependencies: ["location"],
...noActions,
async fetchContext(ctx) {
order.push("weather")
const loc = ctx.get(LocationKey)
expect(loc).toBeDefined()
return [[WeatherKey, { temperature: 20, condition: "sunny" }]]
},
}
const graph = buildGraph([weather, location])
await refreshGraph(graph)
expect(order).toEqual(["location", "weather"])
})
test("accumulates context across sources", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource()
const graph = buildGraph([location, weather])
const { context } = await refreshGraph(graph)
expect(context.get(LocationKey)).toEqual({
lat: 51.5,
lng: -0.1,
})
expect(context.get(WeatherKey)).toEqual({
temperature: 20,
condition: "sunny",
})
})
test("collects items from all sources", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource()
const graph = buildGraph([location, weather])
const { items } = await refreshGraph(graph)
expect(items).toHaveLength(1)
expect(items[0]!.type).toBe("weather")
})
test("downstream source receives upstream context", async () => {
const location = createLocationSource()
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
const weather = createWeatherSource(async () => ({
temperature: 15,
condition: "storm",
}))
const alert = createAlertSource()
const graph = buildGraph([location, weather, alert])
const { items } = await refreshGraph(graph)
expect(items).toHaveLength(2)
// Items returned in topological order (weather before alert)
expect(items[0]!.type).toBe("weather")
expect(items[1]!.type).toBe("alert")
// Signals preserved for post-processors
expect(items[0]!.signals?.urgency).toBe(0.5)
expect(items[1]!.signals?.urgency).toBe(1.0)
})
test("source without location context returns empty items", async () => {
const location: FeedSource = {
id: "location",
...noActions,
async fetchContext() {
return null
},
}
const weather = createWeatherSource()
const graph = buildGraph([location, weather])
const { context, items } = await refreshGraph(graph)
expect(context.get(WeatherKey)).toBeUndefined()
expect(items).toHaveLength(0)
})
})
describe("reactive updates", () => {
test("onContextUpdate receives callback and returns cleanup", () => {
const location = createLocationSource()
let updateCount = 0
const cleanup = location.onContextUpdate!(
() => {
updateCount++
},
() => new Context(),
)
location.simulateUpdate({ lat: 1, lng: 1 })
expect(updateCount).toBe(1)
location.simulateUpdate({ lat: 2, lng: 2 })
expect(updateCount).toBe(2)
cleanup()
location.simulateUpdate({ lat: 3, lng: 3 })
expect(updateCount).toBe(2) // no more updates after cleanup
})
})
})

View File

@@ -0,0 +1,86 @@
import type { ActionDefinition } from "./action"
import type { Context, ContextEntry } from "./context"
import type { FeedItem } from "./feed"
/**
* Unified interface for sources that provide context, feed items, and actions.
*
* Sources form a dependency graph — a source declares which other sources
* it depends on, and the graph ensures dependencies are resolved before
* dependents run.
*
* Source IDs use reverse domain notation. Built-in sources use `aelis.<name>`,
* third parties use their own domain (e.g., `com.spotify`).
*
* Every method maps to a protocol operation for remote source support:
* - `id`, `dependencies` → source/describe
* - `listActions()` → source/listActions
* - `executeAction()` → source/executeAction
* - `fetchContext()` → source/fetchContext
* - `fetchItems()` → source/fetchItems
* - `onContextUpdate()` → source/contextUpdated (notification)
* - `onItemsUpdate()` → source/itemsUpdated (notification)
*
* @example
* ```ts
* const locationSource: FeedSource = {
* id: "aelis.location",
* async listActions() { return { "update-location": { id: "update-location" } } },
* async executeAction(actionId) { throw new UnknownActionError(actionId) },
* async fetchContext() { ... },
* }
* ```
*/
export interface FeedSource<TItem extends FeedItem = FeedItem> {
/** Unique identifier for this source in reverse-domain format */
readonly id: string
/** IDs of sources this source depends on */
readonly dependencies?: readonly string[]
/**
* List actions this source supports. Empty record if none.
* Maps to: source/listActions
*/
listActions(): Promise<Record<string, ActionDefinition>>
/**
* Execute an action by ID. Throws on unknown action or invalid input.
* Maps to: source/executeAction
*/
executeAction(actionId: string, params: unknown): Promise<unknown>
/**
* Subscribe to reactive context updates.
* Called when the source can push context changes proactively.
* Returns cleanup function.
* Maps to: source/contextUpdated (notification, source → host)
*/
onContextUpdate?(
callback: (entries: readonly ContextEntry[]) => void,
getContext: () => Context,
): () => void
/**
* Fetch context on-demand.
* Called during manual refresh or initial load.
* Return null if this source cannot provide context.
* Maps to: source/fetchContext
*/
fetchContext(context: Context): Promise<readonly ContextEntry[] | null>
/**
* Subscribe to reactive feed item updates.
* Called when the source can push item changes proactively.
* Returns cleanup function.
* Maps to: source/itemsUpdated (notification, source → host)
*/
onItemsUpdate?(callback: (items: TItem[]) => void, getContext: () => Context): () => void
/**
* Fetch feed items on-demand.
* Called during manual refresh or when dependencies update.
* Maps to: source/fetchItems
*/
fetchItems?(context: Context): Promise<TItem[]>
}

View File

@@ -0,0 +1,87 @@
import { describe, expect, test } from "bun:test"
import type { FeedItem, Slot } from "./feed"
describe("FeedItem slots", () => {
test("FeedItem without slots is valid", () => {
const item: FeedItem<"test", { value: number }> = {
id: "test-1",
type: "test",
timestamp: new Date(),
data: { value: 42 },
}
expect(item.slots).toBeUndefined()
})
test("FeedItem with unfilled slots", () => {
const item: FeedItem<"weather", { temp: number }> = {
id: "weather-1",
type: "weather",
timestamp: new Date(),
data: { temp: 18 },
slots: {
insight: {
description: "A short contextual insight about the current weather",
content: null,
},
"cross-source": {
description: "Connection between weather and calendar events",
content: null,
},
},
}
expect(item.slots).toBeDefined()
expect(Object.keys(item.slots!)).toEqual(["insight", "cross-source"])
expect(item.slots!.insight!.content).toBeNull()
expect(item.slots!["cross-source"]!.content).toBeNull()
})
test("FeedItem with filled slots", () => {
const item: FeedItem<"weather", { temp: number }> = {
id: "weather-1",
type: "weather",
timestamp: new Date(),
data: { temp: 18 },
slots: {
insight: {
description: "A short contextual insight about the current weather",
content: "Rain after 3pm — grab a jacket before your walk",
},
},
}
expect(item.slots!.insight!.content).toBe("Rain after 3pm — grab a jacket before your walk")
})
test("Slot interface enforces required fields", () => {
const slot: Slot = {
description: "Test slot description",
content: null,
}
expect(slot.description).toBe("Test slot description")
expect(slot.content).toBeNull()
const filledSlot: Slot = {
description: "Test slot description",
content: "Filled content",
}
expect(filledSlot.content).toBe("Filled content")
})
test("FeedItem with empty slots record", () => {
const item: FeedItem<"test", { value: number }> = {
id: "test-1",
type: "test",
timestamp: new Date(),
data: { value: 1 },
slots: {},
}
expect(item.slots).toEqual({})
expect(Object.keys(item.slots!)).toHaveLength(0)
})
})

View File

@@ -0,0 +1,78 @@
/**
* Source-provided hints for post-processors.
*
* Sources express domain-specific relevance without determining final ranking.
* Post-processors consume these signals alongside other inputs (user affinity,
* time of day, interaction history) to produce the final feed order.
*/
export const TimeRelevance = {
/** Needs attention now (e.g., event starting in minutes, severe alert) */
Imminent: "imminent",
/** Relevant soon (e.g., event in the next hour, approaching deadline) */
Upcoming: "upcoming",
/** Background information (e.g., daily forecast, low-priority status) */
Ambient: "ambient",
} as const
export type TimeRelevance = (typeof TimeRelevance)[keyof typeof TimeRelevance]
export interface FeedItemSignals {
/** Source-assessed urgency (0-1). Post-processors use this as one ranking input. */
urgency?: number
/** How time-sensitive this item is relative to now. */
timeRelevance?: TimeRelevance
}
/**
* A named slot for LLM-fillable content on a feed item.
*
* Sources declare slots with a description that tells the LLM what content
* to generate. The enhancement harness fills `content` asynchronously;
* until then it remains `null`.
*/
export interface Slot {
/** Tells the LLM what this slot wants — written by the source */
description: string
/** LLM-filled text content, null until enhanced */
content: string | null
}
/**
* A single item in the feed.
*
* @example
* ```ts
* type WeatherItem = FeedItem<"weather", { temp: number; condition: string }>
*
* const item: WeatherItem = {
* id: "weather-123",
* type: "weather",
* timestamp: new Date(),
* data: { temp: 18, condition: "cloudy" },
* signals: { urgency: 0.5, timeRelevance: "ambient" },
* slots: {
* insight: {
* description: "A short contextual insight about the current weather",
* content: null,
* },
* },
* }
* ```
*/
export interface FeedItem<
TType extends string = string,
TData extends Record<string, unknown> = Record<string, unknown>,
> {
/** Unique identifier */
id: string
/** Item type, matches the data source type */
type: TType
/** When this item was generated */
timestamp: Date
/** Type-specific payload */
data: TData
/** Source-provided hints for post-processors. Optional — omit if no signals apply. */
signals?: FeedItemSignals
/** Named slots for LLM-fillable content. Keys are slot names. */
slots?: Record<string, Slot>
}

View File

@@ -0,0 +1,50 @@
// Context
export type { ContextEntry, ContextKey, ContextKeyPart } from "./context"
export { Context, contextKey, serializeKey } from "./context"
// Actions
export type { ActionDefinition } from "./action"
export { UnknownActionError } from "./action"
// Feed
export type { FeedItem, FeedItemSignals, Slot } from "./feed"
export { TimeRelevance } from "./feed"
// Feed Source
export type { FeedSource } from "./feed-source"
// Feed Post-Processor
export type { FeedEnhancement, FeedPostProcessor, ItemGroup } from "./feed-post-processor"
// Feed Engine
export type { FeedEngineConfig, FeedResult, FeedSubscriber, SourceError } from "./feed-engine"
export { FeedEngine } from "./feed-engine"
// =============================================================================
// DEPRECATED - Use FeedSource + FeedEngine instead
// =============================================================================
// Data Source (deprecated - use FeedSource)
export type { DataSource } from "./data-source"
// Context Provider (deprecated - use FeedSource)
export type { ContextProvider } from "./context-provider"
// Context Bridge (deprecated - use FeedEngine)
export type { ProviderError, RefreshResult } from "./context-bridge"
export { ContextBridge } from "./context-bridge"
// Reconciler (deprecated - use FeedEngine)
export type {
ReconcileResult,
ReconcilerConfig,
SourceError as ReconcilerSourceError,
} from "./reconciler"
export { Reconciler } from "./reconciler"
// Feed Controller (deprecated - use FeedEngine)
export type {
FeedControllerConfig,
FeedSubscriber as FeedControllerSubscriber,
} from "./feed-controller"
export { FeedController } from "./feed-controller"

View File

@@ -0,0 +1,86 @@
import type { Context } from "./context"
import type { DataSource } from "./data-source"
import type { FeedItem } from "./feed"
export interface ReconcilerConfig {
timeout?: number
}
export interface SourceError {
sourceType: string
error: Error
}
export interface ReconcileResult<TItem extends FeedItem = FeedItem> {
items: TItem[]
errors: SourceError[]
}
interface RegisteredSource {
source: DataSource<FeedItem, unknown>
config: unknown
}
const DEFAULT_TIMEOUT = 5000
export class Reconciler<TItems extends FeedItem = never> {
private sources = new Map<string, RegisteredSource>()
private timeout: number
constructor(config?: ReconcilerConfig) {
this.timeout = config?.timeout ?? DEFAULT_TIMEOUT
}
register<TItem extends FeedItem, TConfig>(
source: DataSource<TItem, TConfig>,
config?: TConfig,
): Reconciler<TItems | TItem> {
this.sources.set(source.type, {
source: source as DataSource<FeedItem, unknown>,
config,
})
return this as Reconciler<TItems | TItem>
}
unregister<T extends TItems["type"]>(sourceType: T): Reconciler<Exclude<TItems, { type: T }>> {
this.sources.delete(sourceType)
return this as unknown as Reconciler<Exclude<TItems, { type: T }>>
}
async reconcile(context: Context): Promise<ReconcileResult<TItems>> {
const entries = Array.from(this.sources.values())
const results = await Promise.allSettled(
entries.map(({ source, config }) =>
withTimeout(source.query(context, config), this.timeout, source.type),
),
)
const items: FeedItem[] = []
const errors: SourceError[] = []
results.forEach((result, i) => {
const sourceType = entries[i]!.source.type
if (result.status === "fulfilled") {
items.push(...result.value)
} else {
errors.push({
sourceType,
error: result.reason instanceof Error ? result.reason : new Error(String(result.reason)),
})
}
})
return { items, errors } as ReconcileResult<TItems>
}
}
function withTimeout<T>(promise: Promise<T>, ms: number, sourceType: string): Promise<T> {
return Promise.race([
promise,
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error(`Source "${sourceType}" timed out after ${ms}ms`)), ms),
),
])
}