mirror of
https://github.com/kennethnym/aris.git
synced 2026-02-02 13:11:17 +00:00
feat(core): add FeedController orchestration layer
Adds orchestration for feed reconciliation with context-driven updates: - FeedController: holds context, debounces updates, reconciles sources - ContextBridge: bridges context providers to controller - ContextProvider: reactive + on-demand context value interface - Branded ContextKey<T> for type-safe context keys Moves source files to src/ directory and consolidates tests into integration test. Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
79
packages/aris-core/src/context-bridge.ts
Normal file
79
packages/aris-core/src/context-bridge.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import type { Context } from "./context"
|
||||
import type { ContextProvider } from "./context-provider"
|
||||
|
||||
interface ContextUpdatable {
|
||||
pushContextUpdate(update: Partial<Context>): void
|
||||
}
|
||||
|
||||
/**
|
||||
* Bridges context providers to a feed controller.
|
||||
*
|
||||
* Subscribes to provider updates and forwards them to the controller.
|
||||
* Supports manual refresh to gather current values from all providers.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const controller = new FeedController()
|
||||
* .addDataSource(new WeatherDataSource())
|
||||
* .addDataSource(new TflDataSource())
|
||||
*
|
||||
* const bridge = new ContextBridge(controller)
|
||||
* .addProvider(new LocationProvider())
|
||||
* .addProvider(new MusicProvider())
|
||||
*
|
||||
* // Manual refresh gathers from all providers
|
||||
* await bridge.refresh()
|
||||
*
|
||||
* // Cleanup
|
||||
* bridge.stop()
|
||||
* controller.stop()
|
||||
* ```
|
||||
*/
|
||||
export class ContextBridge {
|
||||
private controller: ContextUpdatable
|
||||
private providers = new Map<string, ContextProvider>()
|
||||
private cleanups: Array<() => void> = []
|
||||
|
||||
constructor(controller: ContextUpdatable) {
|
||||
this.controller = controller
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a context provider. Immediately subscribes to updates.
|
||||
*/
|
||||
addProvider<T>(provider: ContextProvider<T>): this {
|
||||
this.providers.set(provider.key, provider as ContextProvider)
|
||||
|
||||
const cleanup = provider.onUpdate((value) => {
|
||||
this.controller.pushContextUpdate({ [provider.key]: value })
|
||||
})
|
||||
this.cleanups.push(cleanup)
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Gathers current values from all providers and pushes to controller.
|
||||
* Use for manual refresh when user pulls to refresh.
|
||||
*/
|
||||
async refresh(): Promise<void> {
|
||||
const updates: Partial<Context> = {}
|
||||
|
||||
const entries = Array.from(this.providers.entries())
|
||||
const values = await Promise.all(entries.map(([_, provider]) => provider.getCurrentValue()))
|
||||
|
||||
entries.forEach(([key], i) => {
|
||||
updates[key] = values[i]
|
||||
})
|
||||
|
||||
this.controller.pushContextUpdate(updates)
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribes from all providers.
|
||||
*/
|
||||
stop(): void {
|
||||
this.cleanups.forEach((cleanup) => cleanup())
|
||||
this.cleanups = []
|
||||
}
|
||||
}
|
||||
35
packages/aris-core/src/context-provider.ts
Normal file
35
packages/aris-core/src/context-provider.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Provides context values reactively and on-demand.
|
||||
*
|
||||
* Implementations push updates when values change (reactive) and
|
||||
* return current values when requested (for manual refresh).
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* class LocationProvider implements ContextProvider<Location> {
|
||||
* readonly key = LocationKey
|
||||
*
|
||||
* onUpdate(callback: (value: Location) => void): () => void {
|
||||
* const watchId = navigator.geolocation.watchPosition(pos => {
|
||||
* callback({ lat: pos.coords.latitude, lng: pos.coords.longitude, accuracy: pos.coords.accuracy })
|
||||
* })
|
||||
* return () => navigator.geolocation.clearWatch(watchId)
|
||||
* }
|
||||
*
|
||||
* async getCurrentValue(): Promise<Location> {
|
||||
* const pos = await getCurrentPosition()
|
||||
* return { lat: pos.coords.latitude, lng: pos.coords.longitude, accuracy: pos.coords.accuracy }
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export interface ContextProvider<T = unknown> {
|
||||
/** The context key this provider populates */
|
||||
readonly key: string
|
||||
|
||||
/** Subscribe to value changes. Returns cleanup function. */
|
||||
onUpdate(callback: (value: T) => void): () => void
|
||||
|
||||
/** Get current value on-demand (used for manual refresh). */
|
||||
getCurrentValue(): Promise<T>
|
||||
}
|
||||
46
packages/aris-core/src/context.ts
Normal file
46
packages/aris-core/src/context.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
/**
|
||||
* Branded type for type-safe context keys.
|
||||
*
|
||||
* Each package defines its own keys with associated value types:
|
||||
* ```ts
|
||||
* const LocationKey: ContextKey<Location> = contextKey("location")
|
||||
* ```
|
||||
*/
|
||||
export type ContextKey<T> = string & { __contextValue?: T }
|
||||
|
||||
/**
|
||||
* Creates a typed context key.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* interface Location { lat: number; lng: number; accuracy: number }
|
||||
* const LocationKey: ContextKey<Location> = contextKey("location")
|
||||
* ```
|
||||
*/
|
||||
export function contextKey<T>(key: string): ContextKey<T> {
|
||||
return key as ContextKey<T>
|
||||
}
|
||||
|
||||
/**
|
||||
* Type-safe accessor for context values.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const location = contextValue(context, LocationKey)
|
||||
* if (location) {
|
||||
* console.log(location.lat, location.lng)
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export function contextValue<T>(context: Context, key: ContextKey<T>): T | undefined {
|
||||
return context[key] as T | undefined
|
||||
}
|
||||
|
||||
/**
|
||||
* Arbitrary key-value bag representing the current state.
|
||||
* Always includes `time`. Other keys are added by context providers.
|
||||
*/
|
||||
export interface Context {
|
||||
time: Date
|
||||
[key: string]: unknown
|
||||
}
|
||||
35
packages/aris-core/src/data-source.ts
Normal file
35
packages/aris-core/src/data-source.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
import type { Context } from "./context"
|
||||
import type { FeedItem } from "./feed"
|
||||
|
||||
/**
|
||||
* Produces feed items from an external source.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* type WeatherItem = FeedItem<"weather", { temp: number }>
|
||||
*
|
||||
* class WeatherDataSource implements DataSource<WeatherItem> {
|
||||
* readonly type = "weather"
|
||||
*
|
||||
* async query(context: Context): Promise<WeatherItem[]> {
|
||||
* const location = contextValue(context, LocationKey)
|
||||
* if (!location) return []
|
||||
* const data = await fetchWeather(location)
|
||||
* return [{
|
||||
* id: `weather-${Date.now()}`,
|
||||
* type: this.type,
|
||||
* priority: 0.5,
|
||||
* timestamp: context.time,
|
||||
* data: { temp: data.temperature },
|
||||
* }]
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export interface DataSource<TItem extends FeedItem = FeedItem, TConfig = unknown> {
|
||||
/** Unique identifier for this source type */
|
||||
readonly type: TItem["type"]
|
||||
|
||||
/** Queries the source and returns feed items */
|
||||
query(context: Context, config: TConfig): Promise<TItem[]>
|
||||
}
|
||||
161
packages/aris-core/src/feed-controller.ts
Normal file
161
packages/aris-core/src/feed-controller.ts
Normal file
@@ -0,0 +1,161 @@
|
||||
import type { Context } from "./context"
|
||||
import type { DataSource } from "./data-source"
|
||||
import type { FeedItem } from "./feed"
|
||||
import type { ReconcileResult } from "./reconciler"
|
||||
|
||||
import { Reconciler } from "./reconciler"
|
||||
|
||||
export interface FeedControllerConfig {
|
||||
/** Timeout for each data source query in milliseconds */
|
||||
timeout?: number
|
||||
/** Debounce window for batching context updates (default: 100ms) */
|
||||
debounceMs?: number
|
||||
/** Initial context state */
|
||||
initialContext?: Context
|
||||
}
|
||||
|
||||
export type FeedSubscriber<TItems extends FeedItem> = (result: ReconcileResult<TItems>) => void
|
||||
|
||||
interface RegisteredSource {
|
||||
source: DataSource<FeedItem, unknown>
|
||||
config: unknown
|
||||
}
|
||||
|
||||
const DEFAULT_DEBOUNCE_MS = 100
|
||||
|
||||
/**
|
||||
* Orchestrates feed reconciliation in response to context updates.
|
||||
*
|
||||
* Holds context state, debounces updates, queries data sources, and
|
||||
* notifies subscribers. Each user should have their own instance.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* const controller = new FeedController({ debounceMs: 100 })
|
||||
* .addDataSource(new WeatherDataSource())
|
||||
* .addDataSource(new TflDataSource())
|
||||
*
|
||||
* controller.subscribe((result) => {
|
||||
* console.log(result.items)
|
||||
* })
|
||||
*
|
||||
* // Context update triggers debounced reconcile
|
||||
* controller.pushContextUpdate({ [LocationKey]: location })
|
||||
*
|
||||
* // Direct reconcile (no debounce)
|
||||
* const result = await controller.reconcile()
|
||||
*
|
||||
* // Cleanup
|
||||
* controller.stop()
|
||||
* ```
|
||||
*/
|
||||
export class FeedController<TItems extends FeedItem = never> {
|
||||
private sources = new Map<string, RegisteredSource>()
|
||||
private subscribers = new Set<FeedSubscriber<TItems>>()
|
||||
private context: Context
|
||||
private debounceMs: number
|
||||
private timeout: number | undefined
|
||||
private pendingTimeout: ReturnType<typeof setTimeout> | null = null
|
||||
private stopped = false
|
||||
|
||||
constructor(config?: FeedControllerConfig) {
|
||||
this.context = config?.initialContext ?? { time: new Date() }
|
||||
this.debounceMs = config?.debounceMs ?? DEFAULT_DEBOUNCE_MS
|
||||
this.timeout = config?.timeout
|
||||
}
|
||||
|
||||
/** Registers a data source. */
|
||||
addDataSource<TItem extends FeedItem, TConfig>(
|
||||
source: DataSource<TItem, TConfig>,
|
||||
config?: TConfig,
|
||||
): FeedController<TItems | TItem> {
|
||||
this.sources.set(source.type, {
|
||||
source: source as DataSource<FeedItem, unknown>,
|
||||
config,
|
||||
})
|
||||
return this as FeedController<TItems | TItem>
|
||||
}
|
||||
|
||||
/** Removes a data source by type. */
|
||||
removeDataSource<T extends TItems["type"]>(
|
||||
sourceType: T,
|
||||
): FeedController<Exclude<TItems, { type: T }>> {
|
||||
this.sources.delete(sourceType)
|
||||
return this as unknown as FeedController<Exclude<TItems, { type: T }>>
|
||||
}
|
||||
|
||||
/** Stops the controller and cancels pending reconciles. */
|
||||
stop(): void {
|
||||
this.stopped = true
|
||||
|
||||
if (this.pendingTimeout) {
|
||||
clearTimeout(this.pendingTimeout)
|
||||
this.pendingTimeout = null
|
||||
}
|
||||
}
|
||||
|
||||
/** Merges update into context and schedules a debounced reconcile. */
|
||||
pushContextUpdate(update: Partial<Context>): void {
|
||||
this.context = { ...this.context, ...update, time: new Date() }
|
||||
this.scheduleReconcile()
|
||||
}
|
||||
|
||||
/** Subscribes to feed updates. Returns unsubscribe function. */
|
||||
subscribe(callback: FeedSubscriber<TItems>): () => void {
|
||||
this.subscribers.add(callback)
|
||||
|
||||
return () => {
|
||||
this.subscribers.delete(callback)
|
||||
}
|
||||
}
|
||||
|
||||
/** Immediately reconciles with current or provided context. */
|
||||
async reconcile(context?: Context): Promise<ReconcileResult<TItems>> {
|
||||
const ctx = context ?? this.context
|
||||
const reconciler = this.createReconciler()
|
||||
return reconciler.reconcile(ctx)
|
||||
}
|
||||
|
||||
/** Returns current context. */
|
||||
getContext(): Context {
|
||||
return this.context
|
||||
}
|
||||
|
||||
private scheduleReconcile(): void {
|
||||
if (this.pendingTimeout) return
|
||||
|
||||
this.pendingTimeout = setTimeout(() => {
|
||||
this.flushPending()
|
||||
}, this.debounceMs)
|
||||
}
|
||||
|
||||
private async flushPending(): Promise<void> {
|
||||
this.pendingTimeout = null
|
||||
|
||||
if (this.stopped) return
|
||||
if (this.sources.size === 0) return
|
||||
|
||||
const reconciler = this.createReconciler()
|
||||
const result = await reconciler.reconcile(this.context)
|
||||
|
||||
this.notifySubscribers(result)
|
||||
}
|
||||
|
||||
private createReconciler(): Reconciler<TItems> {
|
||||
const reconciler = new Reconciler<TItems>({ timeout: this.timeout })
|
||||
Array.from(this.sources.values()).forEach(({ source, config }) => {
|
||||
reconciler.register(source, config)
|
||||
})
|
||||
return reconciler as Reconciler<TItems>
|
||||
}
|
||||
|
||||
private notifySubscribers(result: ReconcileResult<TItems>): void {
|
||||
this.subscribers.forEach((callback) => {
|
||||
try {
|
||||
callback(result)
|
||||
} catch {
|
||||
// Subscriber errors shouldn't break other subscribers
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
31
packages/aris-core/src/feed.ts
Normal file
31
packages/aris-core/src/feed.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
/**
|
||||
* A single item in the feed.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* type WeatherItem = FeedItem<"weather", { temp: number; condition: string }>
|
||||
*
|
||||
* const item: WeatherItem = {
|
||||
* id: "weather-123",
|
||||
* type: "weather",
|
||||
* priority: 0.5,
|
||||
* timestamp: new Date(),
|
||||
* data: { temp: 18, condition: "cloudy" },
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export interface FeedItem<
|
||||
TType extends string = string,
|
||||
TData extends Record<string, unknown> = Record<string, unknown>,
|
||||
> {
|
||||
/** Unique identifier */
|
||||
id: string
|
||||
/** Item type, matches the data source type */
|
||||
type: TType
|
||||
/** Sort priority (higher = more important, shown first) */
|
||||
priority: number
|
||||
/** When this item was generated */
|
||||
timestamp: Date
|
||||
/** Type-specific payload */
|
||||
data: TData
|
||||
}
|
||||
23
packages/aris-core/src/index.ts
Normal file
23
packages/aris-core/src/index.ts
Normal file
@@ -0,0 +1,23 @@
|
||||
// Context
|
||||
export type { Context, ContextKey } from "./context"
|
||||
export { contextKey, contextValue } from "./context"
|
||||
|
||||
// Feed
|
||||
export type { FeedItem } from "./feed"
|
||||
|
||||
// Data Source
|
||||
export type { DataSource } from "./data-source"
|
||||
|
||||
// Context Provider
|
||||
export type { ContextProvider } from "./context-provider"
|
||||
|
||||
// Context Bridge
|
||||
export { ContextBridge } from "./context-bridge"
|
||||
|
||||
// Reconciler
|
||||
export type { ReconcileResult, ReconcilerConfig, SourceError } from "./reconciler"
|
||||
export { Reconciler } from "./reconciler"
|
||||
|
||||
// Feed Controller
|
||||
export type { FeedControllerConfig, FeedSubscriber } from "./feed-controller"
|
||||
export { FeedController } from "./feed-controller"
|
||||
336
packages/aris-core/src/integration.test.ts
Normal file
336
packages/aris-core/src/integration.test.ts
Normal file
@@ -0,0 +1,336 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
|
||||
import type { ContextKey, ContextProvider, DataSource, FeedItem } from "./index"
|
||||
|
||||
import { contextKey, contextValue, ContextBridge, FeedController } from "./index"
|
||||
|
||||
// =============================================================================
|
||||
// CONTEXT KEYS
|
||||
// =============================================================================
|
||||
|
||||
interface Location {
|
||||
lat: number
|
||||
lng: number
|
||||
accuracy: number
|
||||
}
|
||||
|
||||
interface CurrentTrack {
|
||||
trackId: string
|
||||
title: string
|
||||
artist: string
|
||||
startedAt: Date
|
||||
}
|
||||
|
||||
const LocationKey: ContextKey<Location> = contextKey("location")
|
||||
const CurrentTrackKey: ContextKey<CurrentTrack> = contextKey("currentTrack")
|
||||
|
||||
// =============================================================================
|
||||
// DATA SOURCES
|
||||
// =============================================================================
|
||||
|
||||
type WeatherItem = FeedItem<"weather", { temp: number; condition: string }>
|
||||
|
||||
function createWeatherSource(): DataSource<WeatherItem> {
|
||||
return {
|
||||
type: "weather",
|
||||
async query(context) {
|
||||
const location = contextValue(context, LocationKey)
|
||||
if (!location) return []
|
||||
return [
|
||||
{
|
||||
id: `weather-${Date.now()}`,
|
||||
type: "weather",
|
||||
priority: 0.5,
|
||||
timestamp: context.time,
|
||||
data: { temp: 18, condition: "cloudy" },
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type TflItem = FeedItem<"tfl-alert", { line: string; status: string }>
|
||||
|
||||
function createTflSource(): DataSource<TflItem> {
|
||||
return {
|
||||
type: "tfl-alert",
|
||||
async query(context) {
|
||||
const location = contextValue(context, LocationKey)
|
||||
if (!location) return []
|
||||
return [
|
||||
{
|
||||
id: "tfl-victoria-delays",
|
||||
type: "tfl-alert",
|
||||
priority: 0.8,
|
||||
timestamp: context.time,
|
||||
data: { line: "Victoria", status: "Minor delays" },
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type MusicContextItem = FeedItem<"music-context", { suggestion: string }>
|
||||
|
||||
function createMusicContextSource(): DataSource<MusicContextItem> {
|
||||
return {
|
||||
type: "music-context",
|
||||
async query(context) {
|
||||
const track = contextValue(context, CurrentTrackKey)
|
||||
if (!track) return []
|
||||
return [
|
||||
{
|
||||
id: `music-ctx-${track.trackId}`,
|
||||
type: "music-context",
|
||||
priority: 0.3,
|
||||
timestamp: context.time,
|
||||
data: { suggestion: `You might also like similar artists to ${track.artist}` },
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// CONTEXT PROVIDERS
|
||||
// =============================================================================
|
||||
|
||||
interface SimulatedLocationProvider extends ContextProvider<Location> {
|
||||
simulateUpdate(location: Location): void
|
||||
}
|
||||
|
||||
function createLocationProvider(): SimulatedLocationProvider {
|
||||
let callback: ((value: Location) => void) | null = null
|
||||
let currentLocation: Location = { lat: 0, lng: 0, accuracy: 0 }
|
||||
|
||||
return {
|
||||
key: LocationKey,
|
||||
onUpdate(cb) {
|
||||
callback = cb
|
||||
return () => {
|
||||
callback = null
|
||||
}
|
||||
},
|
||||
async getCurrentValue() {
|
||||
return currentLocation
|
||||
},
|
||||
simulateUpdate(location: Location) {
|
||||
currentLocation = location
|
||||
callback?.(location)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// HELPERS
|
||||
// =============================================================================
|
||||
|
||||
function delay(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms))
|
||||
}
|
||||
|
||||
type AppFeedItem = WeatherItem | TflItem | MusicContextItem
|
||||
|
||||
// =============================================================================
|
||||
// TESTS
|
||||
// =============================================================================
|
||||
|
||||
describe("Integration", () => {
|
||||
let controller: FeedController<AppFeedItem>
|
||||
let bridge: ContextBridge
|
||||
let locationProvider: SimulatedLocationProvider
|
||||
|
||||
afterEach(() => {
|
||||
bridge?.stop()
|
||||
controller?.stop()
|
||||
})
|
||||
|
||||
test("location update triggers feed with location-dependent sources", async () => {
|
||||
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
|
||||
.addDataSource(createWeatherSource())
|
||||
.addDataSource(createTflSource())
|
||||
.addDataSource(createMusicContextSource())
|
||||
|
||||
locationProvider = createLocationProvider()
|
||||
bridge = new ContextBridge(controller).addProvider(locationProvider)
|
||||
|
||||
const results: Array<{ items: AppFeedItem[] }> = []
|
||||
controller.subscribe((result) => {
|
||||
results.push({ items: [...result.items] })
|
||||
})
|
||||
|
||||
locationProvider.simulateUpdate({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
|
||||
await delay(50)
|
||||
|
||||
expect(results).toHaveLength(1)
|
||||
expect(results[0]!.items).toHaveLength(2) // weather + tfl, no music
|
||||
expect(results[0]!.items.map((i) => i.type).sort()).toEqual(["tfl-alert", "weather"])
|
||||
})
|
||||
|
||||
test("music change triggers feed with music-dependent source", async () => {
|
||||
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
|
||||
.addDataSource(createWeatherSource())
|
||||
.addDataSource(createTflSource())
|
||||
.addDataSource(createMusicContextSource())
|
||||
|
||||
locationProvider = createLocationProvider()
|
||||
bridge = new ContextBridge(controller).addProvider(locationProvider)
|
||||
|
||||
// Set initial location
|
||||
locationProvider.simulateUpdate({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
|
||||
await delay(50)
|
||||
|
||||
const results: Array<{ items: AppFeedItem[] }> = []
|
||||
controller.subscribe((result) => {
|
||||
results.push({ items: [...result.items] })
|
||||
})
|
||||
|
||||
// Push music change directly to controller
|
||||
controller.pushContextUpdate({
|
||||
[CurrentTrackKey]: {
|
||||
trackId: "track-456",
|
||||
title: "Bohemian Rhapsody",
|
||||
artist: "Queen",
|
||||
startedAt: new Date(),
|
||||
},
|
||||
})
|
||||
await delay(50)
|
||||
|
||||
expect(results).toHaveLength(1)
|
||||
expect(results[0]!.items).toHaveLength(3) // weather + tfl + music
|
||||
expect(results[0]!.items.map((i) => i.type).sort()).toEqual([
|
||||
"music-context",
|
||||
"tfl-alert",
|
||||
"weather",
|
||||
])
|
||||
|
||||
const musicItem = results[0]!.items.find((i) => i.type === "music-context") as MusicContextItem
|
||||
expect(musicItem.data.suggestion).toContain("Queen")
|
||||
})
|
||||
|
||||
test("manual refresh gathers from all providers and reconciles", async () => {
|
||||
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
|
||||
.addDataSource(createWeatherSource())
|
||||
.addDataSource(createTflSource())
|
||||
|
||||
locationProvider = createLocationProvider()
|
||||
// Set location without triggering update
|
||||
locationProvider.simulateUpdate({ lat: 40.7128, lng: -74.006, accuracy: 5 })
|
||||
|
||||
// Clear the callback so simulateUpdate doesn't trigger reconcile
|
||||
const originalOnUpdate = locationProvider.onUpdate
|
||||
locationProvider.onUpdate = (cb) => {
|
||||
return originalOnUpdate(cb)
|
||||
}
|
||||
|
||||
bridge = new ContextBridge(controller).addProvider(locationProvider)
|
||||
|
||||
const results: Array<{ items: AppFeedItem[] }> = []
|
||||
controller.subscribe((result) => {
|
||||
results.push({ items: [...result.items] })
|
||||
})
|
||||
|
||||
// Manual refresh should gather current location and reconcile
|
||||
await bridge.refresh()
|
||||
await delay(50)
|
||||
|
||||
expect(results).toHaveLength(1)
|
||||
expect(results[0]!.items).toHaveLength(2)
|
||||
|
||||
const ctx = controller.getContext()
|
||||
expect(contextValue(ctx, LocationKey)).toEqual({ lat: 40.7128, lng: -74.006, accuracy: 5 })
|
||||
})
|
||||
|
||||
test("context accumulates across multiple updates", async () => {
|
||||
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
|
||||
.addDataSource(createWeatherSource())
|
||||
.addDataSource(createMusicContextSource())
|
||||
|
||||
locationProvider = createLocationProvider()
|
||||
bridge = new ContextBridge(controller).addProvider(locationProvider)
|
||||
|
||||
// Location update
|
||||
locationProvider.simulateUpdate({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
|
||||
await delay(50)
|
||||
|
||||
// Music update
|
||||
controller.pushContextUpdate({
|
||||
[CurrentTrackKey]: {
|
||||
trackId: "track-789",
|
||||
title: "Stairway to Heaven",
|
||||
artist: "Led Zeppelin",
|
||||
startedAt: new Date(),
|
||||
},
|
||||
})
|
||||
await delay(50)
|
||||
|
||||
const ctx = controller.getContext()
|
||||
expect(contextValue(ctx, LocationKey)).toEqual({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
|
||||
expect(contextValue(ctx, CurrentTrackKey)?.artist).toBe("Led Zeppelin")
|
||||
})
|
||||
|
||||
test("items are sorted by priority descending", async () => {
|
||||
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
|
||||
.addDataSource(createWeatherSource()) // priority 0.5
|
||||
.addDataSource(createTflSource()) // priority 0.8
|
||||
.addDataSource(createMusicContextSource()) // priority 0.3
|
||||
|
||||
locationProvider = createLocationProvider()
|
||||
bridge = new ContextBridge(controller).addProvider(locationProvider)
|
||||
|
||||
locationProvider.simulateUpdate({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
|
||||
|
||||
controller.pushContextUpdate({
|
||||
[CurrentTrackKey]: {
|
||||
trackId: "track-1",
|
||||
title: "Test",
|
||||
artist: "Test",
|
||||
startedAt: new Date(),
|
||||
},
|
||||
})
|
||||
await delay(50)
|
||||
|
||||
const result = await controller.reconcile()
|
||||
|
||||
expect(result.items[0]!.type).toBe("tfl-alert") // 0.8
|
||||
expect(result.items[1]!.type).toBe("weather") // 0.5
|
||||
expect(result.items[2]!.type).toBe("music-context") // 0.3
|
||||
})
|
||||
|
||||
test("cleanup stops providers and pending reconciles", async () => {
|
||||
let queryCount = 0
|
||||
const trackingSource: DataSource<WeatherItem> = {
|
||||
type: "weather",
|
||||
async query(context) {
|
||||
queryCount++
|
||||
const location = contextValue(context, LocationKey)
|
||||
if (!location) return []
|
||||
return [
|
||||
{
|
||||
id: "weather-1",
|
||||
type: "weather",
|
||||
priority: 0.5,
|
||||
timestamp: context.time,
|
||||
data: { temp: 20, condition: "sunny" },
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
const ctrl = new FeedController<WeatherItem>({ debounceMs: 100 }).addDataSource(trackingSource)
|
||||
locationProvider = createLocationProvider()
|
||||
const br = new ContextBridge(ctrl).addProvider(locationProvider)
|
||||
|
||||
ctrl.subscribe(() => {})
|
||||
|
||||
// Trigger update but stop before debounce flushes
|
||||
locationProvider.simulateUpdate({ lat: 51.5, lng: -0.1, accuracy: 10 })
|
||||
|
||||
br.stop()
|
||||
ctrl.stop()
|
||||
|
||||
await delay(150)
|
||||
|
||||
expect(queryCount).toBe(0)
|
||||
})
|
||||
})
|
||||
88
packages/aris-core/src/reconciler.ts
Normal file
88
packages/aris-core/src/reconciler.ts
Normal file
@@ -0,0 +1,88 @@
|
||||
import type { Context } from "./context"
|
||||
import type { DataSource } from "./data-source"
|
||||
import type { FeedItem } from "./feed"
|
||||
|
||||
export interface ReconcilerConfig {
|
||||
timeout?: number
|
||||
}
|
||||
|
||||
export interface SourceError {
|
||||
sourceType: string
|
||||
error: Error
|
||||
}
|
||||
|
||||
export interface ReconcileResult<TItem extends FeedItem = FeedItem> {
|
||||
items: TItem[]
|
||||
errors: SourceError[]
|
||||
}
|
||||
|
||||
interface RegisteredSource {
|
||||
source: DataSource<FeedItem, unknown>
|
||||
config: unknown
|
||||
}
|
||||
|
||||
const DEFAULT_TIMEOUT = 5000
|
||||
|
||||
export class Reconciler<TItems extends FeedItem = never> {
|
||||
private sources = new Map<string, RegisteredSource>()
|
||||
private timeout: number
|
||||
|
||||
constructor(config?: ReconcilerConfig) {
|
||||
this.timeout = config?.timeout ?? DEFAULT_TIMEOUT
|
||||
}
|
||||
|
||||
register<TItem extends FeedItem, TConfig>(
|
||||
source: DataSource<TItem, TConfig>,
|
||||
config?: TConfig,
|
||||
): Reconciler<TItems | TItem> {
|
||||
this.sources.set(source.type, {
|
||||
source: source as DataSource<FeedItem, unknown>,
|
||||
config,
|
||||
})
|
||||
return this as Reconciler<TItems | TItem>
|
||||
}
|
||||
|
||||
unregister<T extends TItems["type"]>(sourceType: T): Reconciler<Exclude<TItems, { type: T }>> {
|
||||
this.sources.delete(sourceType)
|
||||
return this as unknown as Reconciler<Exclude<TItems, { type: T }>>
|
||||
}
|
||||
|
||||
async reconcile(context: Context): Promise<ReconcileResult<TItems>> {
|
||||
const entries = Array.from(this.sources.values())
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
entries.map(({ source, config }) =>
|
||||
withTimeout(source.query(context, config), this.timeout, source.type),
|
||||
),
|
||||
)
|
||||
|
||||
const items: FeedItem[] = []
|
||||
const errors: SourceError[] = []
|
||||
|
||||
results.forEach((result, i) => {
|
||||
const sourceType = entries[i]!.source.type
|
||||
|
||||
if (result.status === "fulfilled") {
|
||||
items.push(...result.value)
|
||||
} else {
|
||||
errors.push({
|
||||
sourceType,
|
||||
error: result.reason instanceof Error ? result.reason : new Error(String(result.reason)),
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
items.sort((a, b) => b.priority - a.priority)
|
||||
|
||||
return { items, errors } as ReconcileResult<TItems>
|
||||
}
|
||||
}
|
||||
|
||||
function withTimeout<T>(promise: Promise<T>, ms: number, sourceType: string): Promise<T> {
|
||||
return Promise.race([
|
||||
promise,
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error(`Source "${sourceType}" timed out after ${ms}ms`)), ms),
|
||||
),
|
||||
])
|
||||
}
|
||||
Reference in New Issue
Block a user