3 Commits

Author SHA1 Message Date
b73e603c90 feat(core): return RefreshResult from ContextBridge.refresh()
Surfaces provider errors through RefreshResult.errors instead of
silently ignoring them.

Co-authored-by: Ona <no-reply@ona.com>
2026-01-18 20:28:54 +00:00
037589cf4f refactor(core): rename getCurrentValue to fetchCurrentValue
Also use Promise.allSettled in ContextBridge.refresh() to handle
provider errors gracefully.

Co-authored-by: Ona <no-reply@ona.com>
2026-01-18 20:23:54 +00:00
3c16dd4275 feat(core): add FeedController orchestration layer
Adds orchestration for feed reconciliation with context-driven updates:

- FeedController: holds context, debounces updates, reconciles sources
- ContextBridge: bridges context providers to controller
- ContextProvider: reactive + on-demand context value interface
- Branded ContextKey<T> for type-safe context keys

Moves source files to src/ directory and consolidates tests into
integration test.

Co-authored-by: Ona <no-reply@ona.com>
2026-01-18 00:58:29 +00:00
19 changed files with 1017 additions and 284 deletions

View File

@@ -0,0 +1,219 @@
# @aris/core
Core orchestration layer for ARIS feed reconciliation.
## Overview
```mermaid
flowchart TB
subgraph Providers["Context Providers"]
LP[Location Provider]
MP[Music Provider]
end
subgraph Bridge["ContextBridge"]
direction TB
B1[Manages providers]
B2[Forwards updates]
B3[Gathers on refresh]
end
subgraph Controller["FeedController"]
direction TB
C1[Holds context]
C2[Debounces updates]
C3[Reconciles sources]
C4[Notifies subscribers]
end
subgraph Sources["Data Sources"]
WS[Weather]
TS[TFL]
CS[Calendar]
end
LP & MP --> Bridge
Bridge -->|pushContextUpdate| Controller
Controller -->|query| Sources
Controller -->|subscribe| Sub[Subscribers]
```
## Usage
### Define Context Keys
Each package defines its own typed context keys:
```ts
import { contextKey, type ContextKey } from "@aris/core"
interface Location {
lat: number
lng: number
accuracy: number
}
export const LocationKey: ContextKey<Location> = contextKey("location")
```
### Create Data Sources
Data sources query external APIs and return feed items:
```ts
import { contextValue, type Context, type DataSource, type FeedItem } from "@aris/core"
type WeatherItem = FeedItem<"weather", { temp: number; condition: string }>
class WeatherDataSource implements DataSource<WeatherItem> {
readonly type = "weather"
async query(context: Context): Promise<WeatherItem[]> {
const location = contextValue(context, LocationKey)
if (!location) return []
const data = await fetchWeather(location.lat, location.lng)
return [
{
id: `weather-${Date.now()}`,
type: this.type,
priority: 0.5,
timestamp: context.time,
data: { temp: data.temp, condition: data.condition },
},
]
}
}
```
### Create Context Providers
Context providers push updates reactively and provide current values on demand:
```ts
import type { ContextProvider } from "@aris/core"
class LocationProvider implements ContextProvider<Location> {
readonly key = LocationKey
onUpdate(callback: (value: Location) => void): () => void {
const watchId = navigator.geolocation.watchPosition((pos) => {
callback({
lat: pos.coords.latitude,
lng: pos.coords.longitude,
accuracy: pos.coords.accuracy,
})
})
return () => navigator.geolocation.clearWatch(watchId)
}
async fetchCurrentValue(): Promise<Location> {
const pos = await new Promise<GeolocationPosition>((resolve, reject) => {
navigator.geolocation.getCurrentPosition(resolve, reject)
})
return {
lat: pos.coords.latitude,
lng: pos.coords.longitude,
accuracy: pos.coords.accuracy,
}
}
}
```
### Wire It Together
```ts
import { ContextBridge, FeedController } from "@aris/core"
// Create controller with data sources
const controller = new FeedController({ debounceMs: 100 })
.addDataSource(weatherSource)
.addDataSource(tflSource)
// Bridge context providers to controller
const bridge = new ContextBridge(controller)
.addProvider(locationProvider)
.addProvider(musicProvider)
// Subscribe to feed updates
controller.subscribe((result) => {
console.log("Feed items:", result.items)
console.log("Errors:", result.errors)
})
// Manual refresh (gathers from all providers)
await bridge.refresh()
// Direct context update (bypasses providers)
controller.pushContextUpdate({
[CurrentTrackKey]: { trackId: "123", title: "Song", artist: "Artist", startedAt: new Date() },
})
// Cleanup
bridge.stop()
controller.stop()
```
### Per-User Pattern
Each user gets their own controller instance:
```ts
const connections = new Map<string, { controller: FeedController; bridge: ContextBridge }>()
function onUserConnect(userId: string, ws: WebSocket) {
const controller = new FeedController({ debounceMs: 100 })
.addDataSource(weatherSource)
.addDataSource(tflSource)
const bridge = new ContextBridge(controller).addProvider(createLocationProvider())
controller.subscribe((result) => {
ws.send(JSON.stringify({ type: "feed-update", items: result.items }))
})
connections.set(userId, { controller, bridge })
}
function onUserDisconnect(userId: string) {
const conn = connections.get(userId)
if (conn) {
conn.bridge.stop()
conn.controller.stop()
connections.delete(userId)
}
}
```
## API
### Context
| Export | Description |
| ---------------------------- | --------------------------------------- |
| `ContextKey<T>` | Branded type for type-safe context keys |
| `contextKey<T>(key)` | Creates a typed context key |
| `contextValue(context, key)` | Type-safe context value accessor |
| `Context` | Time + arbitrary key-value bag |
### Data Sources
| Export | Description |
| ---------------------------- | --------------------------------- |
| `DataSource<TItem, TConfig>` | Interface for feed item producers |
| `FeedItem<TType, TData>` | Single item in the feed |
### Orchestration
| Export | Description |
| -------------------- | ---------------------------------------------------- |
| `FeedController` | Holds context, debounces updates, reconciles sources |
| `ContextProvider<T>` | Reactive + on-demand context value provider |
| `ContextBridge` | Bridges providers to controller |
### Reconciler
| Export | Description |
| -------------------- | --------------------------------------------- |
| `Reconciler` | Low-level: queries sources, sorts by priority |
| `ReconcileResult<T>` | Items + errors from reconciliation |

View File

@@ -1,10 +0,0 @@
export interface Location {
lat: number
lng: number
accuracy: number
}
export interface Context {
time: Date
location?: Location
}

View File

@@ -1,7 +0,0 @@
import type { Context } from "./context"
import type { FeedItem } from "./feed"
export interface DataSource<TItem extends FeedItem = FeedItem, TConfig = unknown> {
readonly type: TItem["type"]
query(context: Context, config: TConfig): Promise<TItem[]>
}

View File

@@ -1,10 +0,0 @@
export interface FeedItem<
TType extends string = string,
TData extends Record<string, unknown> = Record<string, unknown>,
> {
id: string
type: TType
priority: number
timestamp: Date
data: TData
}

View File

@@ -1,5 +0,0 @@
export type { Context, Location } from "./context"
export type { FeedItem } from "./feed"
export type { DataSource } from "./data-source"
export type { ReconcilerConfig, ReconcileResult, SourceError } from "./reconciler"
export { Reconciler } from "./reconciler"

View File

@@ -1,240 +0,0 @@
import { describe, expect, test } from "bun:test"
import type { Context } from "./context"
import type { DataSource } from "./data-source"
import type { FeedItem } from "./feed"
import { Reconciler } from "./reconciler"
type WeatherData = { temp: number }
type WeatherItem = FeedItem<"weather", WeatherData>
type CalendarData = { title: string }
type CalendarItem = FeedItem<"calendar", CalendarData>
const createMockContext = (): Context => ({
time: new Date("2026-01-15T12:00:00Z"),
})
const createWeatherSource = (items: WeatherItem[], delay = 0): DataSource<WeatherItem> => ({
type: "weather",
async query() {
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay))
}
return items
},
})
const createCalendarSource = (items: CalendarItem[]): DataSource<CalendarItem> => ({
type: "calendar",
async query() {
return items
},
})
const createFailingSource = (type: string, error: Error): DataSource<FeedItem> => ({
type,
async query() {
throw error
},
})
describe("Reconciler", () => {
test("returns empty result when no sources registered", async () => {
const reconciler = new Reconciler()
const result = await reconciler.reconcile(createMockContext())
expect(result.items).toEqual([])
expect(result.errors).toEqual([])
})
test("collects items from single source", async () => {
const items: WeatherItem[] = [
{
id: "weather-1",
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: { temp: 20 },
},
]
const reconciler = new Reconciler().register(createWeatherSource(items))
const result = await reconciler.reconcile(createMockContext())
expect(result.items).toEqual(items)
expect(result.errors).toEqual([])
})
test("collects items from multiple sources", async () => {
const weatherItems: WeatherItem[] = [
{
id: "weather-1",
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: { temp: 20 },
},
]
const calendarItems: CalendarItem[] = [
{
id: "calendar-1",
type: "calendar",
priority: 0.8,
timestamp: new Date(),
data: { title: "Meeting" },
},
]
const reconciler = new Reconciler()
.register(createWeatherSource(weatherItems))
.register(createCalendarSource(calendarItems))
const result = await reconciler.reconcile(createMockContext())
expect(result.items).toHaveLength(2)
expect(result.errors).toEqual([])
})
test("sorts items by priority descending", async () => {
const weatherItems: WeatherItem[] = [
{
id: "weather-1",
type: "weather",
priority: 0.2,
timestamp: new Date(),
data: { temp: 20 },
},
]
const calendarItems: CalendarItem[] = [
{
id: "calendar-1",
type: "calendar",
priority: 0.9,
timestamp: new Date(),
data: { title: "Meeting" },
},
]
const reconciler = new Reconciler()
.register(createWeatherSource(weatherItems))
.register(createCalendarSource(calendarItems))
const result = await reconciler.reconcile(createMockContext())
expect(result.items[0]?.id).toBe("calendar-1")
expect(result.items[1]?.id).toBe("weather-1")
})
test("captures errors from failing sources", async () => {
const error = new Error("Source failed")
const reconciler = new Reconciler().register(createFailingSource("failing", error))
const result = await reconciler.reconcile(createMockContext())
expect(result.items).toEqual([])
expect(result.errors).toHaveLength(1)
expect(result.errors[0]?.sourceType).toBe("failing")
expect(result.errors[0]?.error.message).toBe("Source failed")
})
test("returns partial results when some sources fail", async () => {
const items: WeatherItem[] = [
{
id: "weather-1",
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: { temp: 20 },
},
]
const reconciler = new Reconciler()
.register(createWeatherSource(items))
.register(createFailingSource("failing", new Error("Failed")))
const result = await reconciler.reconcile(createMockContext())
expect(result.items).toHaveLength(1)
expect(result.errors).toHaveLength(1)
})
test("times out slow sources", async () => {
const items: WeatherItem[] = [
{
id: "weather-1",
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: { temp: 20 },
},
]
const reconciler = new Reconciler({ timeout: 50 }).register(createWeatherSource(items, 100))
const result = await reconciler.reconcile(createMockContext())
expect(result.items).toEqual([])
expect(result.errors).toHaveLength(1)
expect(result.errors[0]?.sourceType).toBe("weather")
expect(result.errors[0]?.error.message).toContain("timed out")
})
test("unregister removes source", async () => {
const items: WeatherItem[] = [
{
id: "weather-1",
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: { temp: 20 },
},
]
const reconciler = new Reconciler().register(createWeatherSource(items)).unregister("weather")
const result = await reconciler.reconcile(createMockContext())
expect(result.items).toEqual([])
})
test("infers discriminated union type from chained registers", async () => {
const weatherItems: WeatherItem[] = [
{
id: "weather-1",
type: "weather",
priority: 0.5,
timestamp: new Date(),
data: { temp: 20 },
},
]
const calendarItems: CalendarItem[] = [
{
id: "calendar-1",
type: "calendar",
priority: 0.8,
timestamp: new Date(),
data: { title: "Meeting" },
},
]
const reconciler = new Reconciler()
.register(createWeatherSource(weatherItems))
.register(createCalendarSource(calendarItems))
const { items } = await reconciler.reconcile(createMockContext())
// Type narrowing should work
for (const item of items) {
if (item.type === "weather") {
expect(typeof item.data.temp).toBe("number")
} else if (item.type === "calendar") {
expect(typeof item.data.title).toBe("string")
}
}
})
})

View File

@@ -0,0 +1,102 @@
import type { Context } from "./context"
import type { ContextProvider } from "./context-provider"
interface ContextUpdatable {
pushContextUpdate(update: Partial<Context>): void
}
export interface ProviderError {
key: string
error: Error
}
export interface RefreshResult {
errors: ProviderError[]
}
/**
* Bridges context providers to a feed controller.
*
* Subscribes to provider updates and forwards them to the controller.
* Supports manual refresh to gather current values from all providers.
*
* @example
* ```ts
* const controller = new FeedController()
* .addDataSource(new WeatherDataSource())
* .addDataSource(new TflDataSource())
*
* const bridge = new ContextBridge(controller)
* .addProvider(new LocationProvider())
* .addProvider(new MusicProvider())
*
* // Manual refresh gathers from all providers
* await bridge.refresh()
*
* // Cleanup
* bridge.stop()
* controller.stop()
* ```
*/
export class ContextBridge {
private controller: ContextUpdatable
private providers = new Map<string, ContextProvider>()
private cleanups: Array<() => void> = []
constructor(controller: ContextUpdatable) {
this.controller = controller
}
/**
* Registers a context provider. Immediately subscribes to updates.
*/
addProvider<T>(provider: ContextProvider<T>): this {
this.providers.set(provider.key, provider as ContextProvider)
const cleanup = provider.onUpdate((value) => {
this.controller.pushContextUpdate({ [provider.key]: value })
})
this.cleanups.push(cleanup)
return this
}
/**
* Gathers current values from all providers and pushes to controller.
* Use for manual refresh when user pulls to refresh.
* Returns errors from providers that failed to fetch.
*/
async refresh(): Promise<RefreshResult> {
const updates: Partial<Context> = {}
const errors: ProviderError[] = []
const entries = Array.from(this.providers.entries())
const results = await Promise.allSettled(
entries.map(([_, provider]) => provider.fetchCurrentValue()),
)
entries.forEach(([key], i) => {
const result = results[i]
if (result?.status === "fulfilled") {
updates[key] = result.value
} else if (result?.status === "rejected") {
errors.push({
key,
error: result.reason instanceof Error ? result.reason : new Error(String(result.reason)),
})
}
})
this.controller.pushContextUpdate(updates)
return { errors }
}
/**
* Unsubscribes from all providers.
*/
stop(): void {
this.cleanups.forEach((cleanup) => cleanup())
this.cleanups = []
}
}

View File

@@ -0,0 +1,35 @@
/**
* Provides context values reactively and on-demand.
*
* Implementations push updates when values change (reactive) and
* return current values when requested (for manual refresh).
*
* @example
* ```ts
* class LocationProvider implements ContextProvider<Location> {
* readonly key = LocationKey
*
* onUpdate(callback: (value: Location) => void): () => void {
* const watchId = navigator.geolocation.watchPosition(pos => {
* callback({ lat: pos.coords.latitude, lng: pos.coords.longitude, accuracy: pos.coords.accuracy })
* })
* return () => navigator.geolocation.clearWatch(watchId)
* }
*
* async fetchCurrentValue(): Promise<Location> {
* const pos = await getCurrentPosition()
* return { lat: pos.coords.latitude, lng: pos.coords.longitude, accuracy: pos.coords.accuracy }
* }
* }
* ```
*/
export interface ContextProvider<T = unknown> {
/** The context key this provider populates */
readonly key: string
/** Subscribe to value changes. Returns cleanup function. */
onUpdate(callback: (value: T) => void): () => void
/** Fetch current value on-demand (used for manual refresh). */
fetchCurrentValue(): Promise<T>
}

View File

@@ -0,0 +1,46 @@
/**
* Branded type for type-safe context keys.
*
* Each package defines its own keys with associated value types:
* ```ts
* const LocationKey: ContextKey<Location> = contextKey("location")
* ```
*/
export type ContextKey<T> = string & { __contextValue?: T }
/**
* Creates a typed context key.
*
* @example
* ```ts
* interface Location { lat: number; lng: number; accuracy: number }
* const LocationKey: ContextKey<Location> = contextKey("location")
* ```
*/
export function contextKey<T>(key: string): ContextKey<T> {
return key as ContextKey<T>
}
/**
* Type-safe accessor for context values.
*
* @example
* ```ts
* const location = contextValue(context, LocationKey)
* if (location) {
* console.log(location.lat, location.lng)
* }
* ```
*/
export function contextValue<T>(context: Context, key: ContextKey<T>): T | undefined {
return context[key] as T | undefined
}
/**
* Arbitrary key-value bag representing the current state.
* Always includes `time`. Other keys are added by context providers.
*/
export interface Context {
time: Date
[key: string]: unknown
}

View File

@@ -0,0 +1,35 @@
import type { Context } from "./context"
import type { FeedItem } from "./feed"
/**
* Produces feed items from an external source.
*
* @example
* ```ts
* type WeatherItem = FeedItem<"weather", { temp: number }>
*
* class WeatherDataSource implements DataSource<WeatherItem> {
* readonly type = "weather"
*
* async query(context: Context): Promise<WeatherItem[]> {
* const location = contextValue(context, LocationKey)
* if (!location) return []
* const data = await fetchWeather(location)
* return [{
* id: `weather-${Date.now()}`,
* type: this.type,
* priority: 0.5,
* timestamp: context.time,
* data: { temp: data.temperature },
* }]
* }
* }
* ```
*/
export interface DataSource<TItem extends FeedItem = FeedItem, TConfig = unknown> {
/** Unique identifier for this source type */
readonly type: TItem["type"]
/** Queries the source and returns feed items */
query(context: Context, config: TConfig): Promise<TItem[]>
}

View File

@@ -0,0 +1,161 @@
import type { Context } from "./context"
import type { DataSource } from "./data-source"
import type { FeedItem } from "./feed"
import type { ReconcileResult } from "./reconciler"
import { Reconciler } from "./reconciler"
export interface FeedControllerConfig {
/** Timeout for each data source query in milliseconds */
timeout?: number
/** Debounce window for batching context updates (default: 100ms) */
debounceMs?: number
/** Initial context state */
initialContext?: Context
}
export type FeedSubscriber<TItems extends FeedItem> = (result: ReconcileResult<TItems>) => void
interface RegisteredSource {
source: DataSource<FeedItem, unknown>
config: unknown
}
const DEFAULT_DEBOUNCE_MS = 100
/**
* Orchestrates feed reconciliation in response to context updates.
*
* Holds context state, debounces updates, queries data sources, and
* notifies subscribers. Each user should have their own instance.
*
* @example
* ```ts
* const controller = new FeedController({ debounceMs: 100 })
* .addDataSource(new WeatherDataSource())
* .addDataSource(new TflDataSource())
*
* controller.subscribe((result) => {
* console.log(result.items)
* })
*
* // Context update triggers debounced reconcile
* controller.pushContextUpdate({ [LocationKey]: location })
*
* // Direct reconcile (no debounce)
* const result = await controller.reconcile()
*
* // Cleanup
* controller.stop()
* ```
*/
export class FeedController<TItems extends FeedItem = never> {
private sources = new Map<string, RegisteredSource>()
private subscribers = new Set<FeedSubscriber<TItems>>()
private context: Context
private debounceMs: number
private timeout: number | undefined
private pendingTimeout: ReturnType<typeof setTimeout> | null = null
private stopped = false
constructor(config?: FeedControllerConfig) {
this.context = config?.initialContext ?? { time: new Date() }
this.debounceMs = config?.debounceMs ?? DEFAULT_DEBOUNCE_MS
this.timeout = config?.timeout
}
/** Registers a data source. */
addDataSource<TItem extends FeedItem, TConfig>(
source: DataSource<TItem, TConfig>,
config?: TConfig,
): FeedController<TItems | TItem> {
this.sources.set(source.type, {
source: source as DataSource<FeedItem, unknown>,
config,
})
return this as FeedController<TItems | TItem>
}
/** Removes a data source by type. */
removeDataSource<T extends TItems["type"]>(
sourceType: T,
): FeedController<Exclude<TItems, { type: T }>> {
this.sources.delete(sourceType)
return this as unknown as FeedController<Exclude<TItems, { type: T }>>
}
/** Stops the controller and cancels pending reconciles. */
stop(): void {
this.stopped = true
if (this.pendingTimeout) {
clearTimeout(this.pendingTimeout)
this.pendingTimeout = null
}
}
/** Merges update into context and schedules a debounced reconcile. */
pushContextUpdate(update: Partial<Context>): void {
this.context = { ...this.context, ...update, time: new Date() }
this.scheduleReconcile()
}
/** Subscribes to feed updates. Returns unsubscribe function. */
subscribe(callback: FeedSubscriber<TItems>): () => void {
this.subscribers.add(callback)
return () => {
this.subscribers.delete(callback)
}
}
/** Immediately reconciles with current or provided context. */
async reconcile(context?: Context): Promise<ReconcileResult<TItems>> {
const ctx = context ?? this.context
const reconciler = this.createReconciler()
return reconciler.reconcile(ctx)
}
/** Returns current context. */
getContext(): Context {
return this.context
}
private scheduleReconcile(): void {
if (this.pendingTimeout) return
this.pendingTimeout = setTimeout(() => {
this.flushPending()
}, this.debounceMs)
}
private async flushPending(): Promise<void> {
this.pendingTimeout = null
if (this.stopped) return
if (this.sources.size === 0) return
const reconciler = this.createReconciler()
const result = await reconciler.reconcile(this.context)
this.notifySubscribers(result)
}
private createReconciler(): Reconciler<TItems> {
const reconciler = new Reconciler<TItems>({ timeout: this.timeout })
Array.from(this.sources.values()).forEach(({ source, config }) => {
reconciler.register(source, config)
})
return reconciler as Reconciler<TItems>
}
private notifySubscribers(result: ReconcileResult<TItems>): void {
this.subscribers.forEach((callback) => {
try {
callback(result)
} catch {
// Subscriber errors shouldn't break other subscribers
}
})
}
}

View File

@@ -0,0 +1,31 @@
/**
* A single item in the feed.
*
* @example
* ```ts
* type WeatherItem = FeedItem<"weather", { temp: number; condition: string }>
*
* const item: WeatherItem = {
* id: "weather-123",
* type: "weather",
* priority: 0.5,
* timestamp: new Date(),
* data: { temp: 18, condition: "cloudy" },
* }
* ```
*/
export interface FeedItem<
TType extends string = string,
TData extends Record<string, unknown> = Record<string, unknown>,
> {
/** Unique identifier */
id: string
/** Item type, matches the data source type */
type: TType
/** Sort priority (higher = more important, shown first) */
priority: number
/** When this item was generated */
timestamp: Date
/** Type-specific payload */
data: TData
}

View File

@@ -0,0 +1,24 @@
// Context
export type { Context, ContextKey } from "./context"
export { contextKey, contextValue } from "./context"
// Feed
export type { FeedItem } from "./feed"
// Data Source
export type { DataSource } from "./data-source"
// Context Provider
export type { ContextProvider } from "./context-provider"
// Context Bridge
export type { ProviderError, RefreshResult } from "./context-bridge"
export { ContextBridge } from "./context-bridge"
// Reconciler
export type { ReconcileResult, ReconcilerConfig, SourceError } from "./reconciler"
export { Reconciler } from "./reconciler"
// Feed Controller
export type { FeedControllerConfig, FeedSubscriber } from "./feed-controller"
export { FeedController } from "./feed-controller"

View File

@@ -0,0 +1,336 @@
import { afterEach, describe, expect, test } from "bun:test"
import type { ContextKey, ContextProvider, DataSource, FeedItem } from "./index"
import { contextKey, contextValue, ContextBridge, FeedController } from "./index"
// =============================================================================
// CONTEXT KEYS
// =============================================================================
interface Location {
lat: number
lng: number
accuracy: number
}
interface CurrentTrack {
trackId: string
title: string
artist: string
startedAt: Date
}
const LocationKey: ContextKey<Location> = contextKey("location")
const CurrentTrackKey: ContextKey<CurrentTrack> = contextKey("currentTrack")
// =============================================================================
// DATA SOURCES
// =============================================================================
type WeatherItem = FeedItem<"weather", { temp: number; condition: string }>
function createWeatherSource(): DataSource<WeatherItem> {
return {
type: "weather",
async query(context) {
const location = contextValue(context, LocationKey)
if (!location) return []
return [
{
id: `weather-${Date.now()}`,
type: "weather",
priority: 0.5,
timestamp: context.time,
data: { temp: 18, condition: "cloudy" },
},
]
},
}
}
type TflItem = FeedItem<"tfl-alert", { line: string; status: string }>
function createTflSource(): DataSource<TflItem> {
return {
type: "tfl-alert",
async query(context) {
const location = contextValue(context, LocationKey)
if (!location) return []
return [
{
id: "tfl-victoria-delays",
type: "tfl-alert",
priority: 0.8,
timestamp: context.time,
data: { line: "Victoria", status: "Minor delays" },
},
]
},
}
}
type MusicContextItem = FeedItem<"music-context", { suggestion: string }>
function createMusicContextSource(): DataSource<MusicContextItem> {
return {
type: "music-context",
async query(context) {
const track = contextValue(context, CurrentTrackKey)
if (!track) return []
return [
{
id: `music-ctx-${track.trackId}`,
type: "music-context",
priority: 0.3,
timestamp: context.time,
data: { suggestion: `You might also like similar artists to ${track.artist}` },
},
]
},
}
}
// =============================================================================
// CONTEXT PROVIDERS
// =============================================================================
interface SimulatedLocationProvider extends ContextProvider<Location> {
simulateUpdate(location: Location): void
}
function createLocationProvider(): SimulatedLocationProvider {
let callback: ((value: Location) => void) | null = null
let currentLocation: Location = { lat: 0, lng: 0, accuracy: 0 }
return {
key: LocationKey,
onUpdate(cb) {
callback = cb
return () => {
callback = null
}
},
async fetchCurrentValue() {
return currentLocation
},
simulateUpdate(location: Location) {
currentLocation = location
callback?.(location)
},
}
}
// =============================================================================
// HELPERS
// =============================================================================
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
}
type AppFeedItem = WeatherItem | TflItem | MusicContextItem
// =============================================================================
// TESTS
// =============================================================================
describe("Integration", () => {
let controller: FeedController<AppFeedItem>
let bridge: ContextBridge
let locationProvider: SimulatedLocationProvider
afterEach(() => {
bridge?.stop()
controller?.stop()
})
test("location update triggers feed with location-dependent sources", async () => {
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
.addDataSource(createWeatherSource())
.addDataSource(createTflSource())
.addDataSource(createMusicContextSource())
locationProvider = createLocationProvider()
bridge = new ContextBridge(controller).addProvider(locationProvider)
const results: Array<{ items: AppFeedItem[] }> = []
controller.subscribe((result) => {
results.push({ items: [...result.items] })
})
locationProvider.simulateUpdate({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
await delay(50)
expect(results).toHaveLength(1)
expect(results[0]!.items).toHaveLength(2) // weather + tfl, no music
expect(results[0]!.items.map((i) => i.type).sort()).toEqual(["tfl-alert", "weather"])
})
test("music change triggers feed with music-dependent source", async () => {
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
.addDataSource(createWeatherSource())
.addDataSource(createTflSource())
.addDataSource(createMusicContextSource())
locationProvider = createLocationProvider()
bridge = new ContextBridge(controller).addProvider(locationProvider)
// Set initial location
locationProvider.simulateUpdate({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
await delay(50)
const results: Array<{ items: AppFeedItem[] }> = []
controller.subscribe((result) => {
results.push({ items: [...result.items] })
})
// Push music change directly to controller
controller.pushContextUpdate({
[CurrentTrackKey]: {
trackId: "track-456",
title: "Bohemian Rhapsody",
artist: "Queen",
startedAt: new Date(),
},
})
await delay(50)
expect(results).toHaveLength(1)
expect(results[0]!.items).toHaveLength(3) // weather + tfl + music
expect(results[0]!.items.map((i) => i.type).sort()).toEqual([
"music-context",
"tfl-alert",
"weather",
])
const musicItem = results[0]!.items.find((i) => i.type === "music-context") as MusicContextItem
expect(musicItem.data.suggestion).toContain("Queen")
})
test("manual refresh gathers from all providers and reconciles", async () => {
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
.addDataSource(createWeatherSource())
.addDataSource(createTflSource())
locationProvider = createLocationProvider()
// Set location without triggering update
locationProvider.simulateUpdate({ lat: 40.7128, lng: -74.006, accuracy: 5 })
// Clear the callback so simulateUpdate doesn't trigger reconcile
const originalOnUpdate = locationProvider.onUpdate
locationProvider.onUpdate = (cb) => {
return originalOnUpdate(cb)
}
bridge = new ContextBridge(controller).addProvider(locationProvider)
const results: Array<{ items: AppFeedItem[] }> = []
controller.subscribe((result) => {
results.push({ items: [...result.items] })
})
// Manual refresh should gather current location and reconcile
await bridge.refresh()
await delay(50)
expect(results).toHaveLength(1)
expect(results[0]!.items).toHaveLength(2)
const ctx = controller.getContext()
expect(contextValue(ctx, LocationKey)).toEqual({ lat: 40.7128, lng: -74.006, accuracy: 5 })
})
test("context accumulates across multiple updates", async () => {
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
.addDataSource(createWeatherSource())
.addDataSource(createMusicContextSource())
locationProvider = createLocationProvider()
bridge = new ContextBridge(controller).addProvider(locationProvider)
// Location update
locationProvider.simulateUpdate({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
await delay(50)
// Music update
controller.pushContextUpdate({
[CurrentTrackKey]: {
trackId: "track-789",
title: "Stairway to Heaven",
artist: "Led Zeppelin",
startedAt: new Date(),
},
})
await delay(50)
const ctx = controller.getContext()
expect(contextValue(ctx, LocationKey)).toEqual({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
expect(contextValue(ctx, CurrentTrackKey)?.artist).toBe("Led Zeppelin")
})
test("items are sorted by priority descending", async () => {
controller = new FeedController<AppFeedItem>({ debounceMs: 10 })
.addDataSource(createWeatherSource()) // priority 0.5
.addDataSource(createTflSource()) // priority 0.8
.addDataSource(createMusicContextSource()) // priority 0.3
locationProvider = createLocationProvider()
bridge = new ContextBridge(controller).addProvider(locationProvider)
locationProvider.simulateUpdate({ lat: 51.5074, lng: -0.1278, accuracy: 10 })
controller.pushContextUpdate({
[CurrentTrackKey]: {
trackId: "track-1",
title: "Test",
artist: "Test",
startedAt: new Date(),
},
})
await delay(50)
const result = await controller.reconcile()
expect(result.items[0]!.type).toBe("tfl-alert") // 0.8
expect(result.items[1]!.type).toBe("weather") // 0.5
expect(result.items[2]!.type).toBe("music-context") // 0.3
})
test("cleanup stops providers and pending reconciles", async () => {
let queryCount = 0
const trackingSource: DataSource<WeatherItem> = {
type: "weather",
async query(context) {
queryCount++
const location = contextValue(context, LocationKey)
if (!location) return []
return [
{
id: "weather-1",
type: "weather",
priority: 0.5,
timestamp: context.time,
data: { temp: 20, condition: "sunny" },
},
]
},
}
const ctrl = new FeedController<WeatherItem>({ debounceMs: 100 }).addDataSource(trackingSource)
locationProvider = createLocationProvider()
const br = new ContextBridge(ctrl).addProvider(locationProvider)
ctrl.subscribe(() => {})
// Trigger update but stop before debounce flushes
locationProvider.simulateUpdate({ lat: 51.5, lng: -0.1, accuracy: 10 })
br.stop()
ctrl.stop()
await delay(150)
expect(queryCount).toBe(0)
})
})

View File

@@ -1,5 +1,5 @@
import type { Context, DataSource } from "@aris/core" import type { Context, DataSource } from "@aris/core"
import { TflApi, type ITflApi } from "./tfl-api.ts"
import type { import type {
StationLocation, StationLocation,
TflAlertData, TflAlertData,
@@ -10,6 +10,8 @@ import type {
TflLineId, TflLineId,
} from "./types.ts" } from "./types.ts"
import { TflApi, type ITflApi } from "./tfl-api.ts"
const SEVERITY_PRIORITY: Record<TflAlertSeverity, number> = { const SEVERITY_PRIORITY: Record<TflAlertSeverity, number> = {
closure: 100, closure: 100,
"major-delays": 80, "major-delays": 80,
@@ -22,7 +24,10 @@ function haversineDistance(lat1: number, lng1: number, lat2: number, lng2: numbe
const dLng = ((lng2 - lng1) * Math.PI) / 180 const dLng = ((lng2 - lng1) * Math.PI) / 180
const a = const a =
Math.sin(dLat / 2) * Math.sin(dLat / 2) + Math.sin(dLat / 2) * Math.sin(dLat / 2) +
Math.cos((lat1 * Math.PI) / 180) * Math.cos((lat2 * Math.PI) / 180) * Math.sin(dLng / 2) * Math.sin(dLng / 2) Math.cos((lat1 * Math.PI) / 180) *
Math.cos((lat2 * Math.PI) / 180) *
Math.sin(dLng / 2) *
Math.sin(dLng / 2)
const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)) const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
return R * c return R * c
} }
@@ -62,12 +67,19 @@ export class TflDataSource implements DataSource<TflAlertFeedItem, TflDataSource
} }
async query(context: Context, config: TflDataSourceConfig): Promise<TflAlertFeedItem[]> { async query(context: Context, config: TflDataSourceConfig): Promise<TflAlertFeedItem[]> {
const [statuses, stations] = await Promise.all([this.api.fetchLineStatuses(config.lines), this.api.fetchStations()]) const [statuses, stations] = await Promise.all([
this.api.fetchLineStatuses(config.lines),
this.api.fetchStations(),
])
const items: TflAlertFeedItem[] = statuses.map((status) => { const items: TflAlertFeedItem[] = statuses.map((status) => {
const closestStationDistance = const closestStationDistance = context.location
context.location ? ? findClosestStationDistance(
findClosestStationDistance(status.lineId, stations, context.location.lat, context.location.lng) status.lineId,
stations,
context.location.lat,
context.location.lng,
)
: null : null
const data: TflAlertData = { const data: TflAlertData = {

View File

@@ -1,11 +1,12 @@
import type { Context } from "@aris/core"
import { describe, expect, test } from "bun:test" import { describe, expect, test } from "bun:test"
import type { Context } from "@aris/core"
import { TflDataSource } from "./data-source.ts"
import type { ITflApi, TflLineStatus } from "./tfl-api.ts" import type { ITflApi, TflLineStatus } from "./tfl-api.ts"
import type { StationLocation, TflLineId } from "./types.ts" import type { StationLocation, TflLineId } from "./types.ts"
import fixtures from "../fixtures/tfl-responses.json" import fixtures from "../fixtures/tfl-responses.json"
import { TflDataSource } from "./data-source.ts"
// Mock API that returns fixture data // Mock API that returns fixture data
class FixtureTflApi implements ITflApi { class FixtureTflApi implements ITflApi {
@@ -109,9 +110,10 @@ describe("TfL Feed Items (using fixture data)", () => {
expect(typeof item.data.lineName).toBe("string") expect(typeof item.data.lineName).toBe("string")
expect(["minor-delays", "major-delays", "closure"]).toContain(item.data.severity) expect(["minor-delays", "major-delays", "closure"]).toContain(item.data.severity)
expect(typeof item.data.description).toBe("string") expect(typeof item.data.description).toBe("string")
expect(item.data.closestStationDistance === null || typeof item.data.closestStationDistance === "number").toBe( expect(
true, item.data.closestStationDistance === null ||
) typeof item.data.closestStationDistance === "number",
).toBe(true)
} }
}) })

View File

@@ -1,4 +1,5 @@
import { type } from "arktype" import { type } from "arktype"
import type { StationLocation, TflAlertSeverity } from "./types.ts" import type { StationLocation, TflAlertSeverity } from "./types.ts"
const TFL_API_BASE = "https://api.tfl.gov.uk" const TFL_API_BASE = "https://api.tfl.gov.uk"

View File

@@ -1,4 +1,5 @@
import type { FeedItem } from "@aris/core" import type { FeedItem } from "@aris/core"
import type { TflLineId } from "./tfl-api.ts" import type { TflLineId } from "./tfl-api.ts"
export type { TflLineId } from "./tfl-api.ts" export type { TflLineId } from "./tfl-api.ts"