diff --git a/packages/aris-core/src/feed-engine.ts b/packages/aris-core/src/feed-engine.ts index 64c4972..1797365 100644 --- a/packages/aris-core/src/feed-engine.ts +++ b/packages/aris-core/src/feed-engine.ts @@ -1,6 +1,7 @@ import type { ActionDefinition } from "./action" import type { Context } from "./context" import type { FeedItem } from "./feed" +import type { FeedPostProcessor, ItemGroup } from "./feed-post-processor" import type { FeedSource } from "./feed-source" export interface SourceError { @@ -12,6 +13,8 @@ export interface FeedResult { context: Context items: TItem[] errors: SourceError[] + /** Item groups produced by post-processors */ + groupedItems?: ItemGroup[] } export type FeedSubscriber = (result: FeedResult) => void @@ -66,6 +69,7 @@ export class FeedEngine { private subscribers = new Set>() private cleanups: Array<() => void> = [] private started = false + private postProcessors: FeedPostProcessor[] = [] private readonly cacheTtlMs: number private cachedResult: FeedResult | null = null @@ -108,6 +112,23 @@ export class FeedEngine { return this } + /** + * Registers a post-processor. Processors run in registration order + * after items are collected, on every update path. + */ + registerPostProcessor(processor: FeedPostProcessor): this { + this.postProcessors.push(processor) + return this + } + + /** + * Unregisters a post-processor by reference. + */ + unregisterPostProcessor(processor: FeedPostProcessor): this { + this.postProcessors = this.postProcessors.filter((p) => p !== processor) + return this + } + /** * Refreshes the feed by running all sources in dependency order. * Calls fetchContext() then fetchItems() on each source. @@ -152,7 +173,18 @@ export class FeedEngine { this.context = context - const result: FeedResult = { context, items: items as TItems[], errors } + const { + items: processedItems, + groupedItems, + errors: postProcessorErrors, + } = await this.applyPostProcessors(items as TItems[], errors) + + const result: FeedResult = { + context, + items: processedItems, + errors: postProcessorErrors, + ...(groupedItems.length > 0 ? { groupedItems } : {}), + } this.updateCache(result) return result @@ -260,6 +292,58 @@ export class FeedEngine { return actions } + private async applyPostProcessors( + items: TItems[], + errors: SourceError[], + ): Promise<{ items: TItems[]; groupedItems: ItemGroup[]; errors: SourceError[] }> { + let currentItems = items + const allGroupedItems: ItemGroup[] = [] + const allErrors = [...errors] + + for (const processor of this.postProcessors) { + const snapshot = currentItems + try { + const enhancement = await processor(currentItems) + + if (enhancement.additionalItems?.length) { + // Post-processors operate on FeedItem[] without knowledge of TItems. + // Additional items are merged untyped — this is intentional. The + // processor contract is "FeedItem in, FeedItem out"; type narrowing + // is the caller's responsibility when consuming FeedResult. + currentItems = [...currentItems, ...(enhancement.additionalItems as TItems[])] + } + + if (enhancement.suppress?.length) { + const suppressSet = new Set(enhancement.suppress) + currentItems = currentItems.filter((item) => !suppressSet.has(item.id)) + } + + if (enhancement.groupedItems?.length) { + allGroupedItems.push(...enhancement.groupedItems) + } + } catch (err) { + const sourceId = processor.name || "anonymous" + allErrors.push({ + sourceId, + error: err instanceof Error ? err : new Error(String(err)), + }) + currentItems = snapshot + } + } + + // Remove stale item IDs from groups and drop empty groups + const itemIds = new Set(currentItems.map((item) => item.id)) + const validGroups = allGroupedItems.reduce((acc, group) => { + const ids = group.itemIds.filter((id) => itemIds.has(id)) + if (ids.length > 0) { + acc.push({ ...group, itemIds: ids }) + } + return acc + }, []) + + return { items: currentItems, groupedItems: validGroups, errors: allErrors } + } + private ensureGraph(): SourceGraph { if (!this.graph) { this.graph = buildGraph(Array.from(this.sources.values())) @@ -311,10 +395,17 @@ export class FeedEngine { } } + const { + items: processedItems, + groupedItems, + errors: postProcessorErrors, + } = await this.applyPostProcessors(items as TItems[], errors) + const result: FeedResult = { context: this.context, - items: items as TItems[], - errors, + items: processedItems, + errors: postProcessorErrors, + ...(groupedItems.length > 0 ? { groupedItems } : {}), } this.updateCache(result) diff --git a/packages/aris-core/src/feed-post-processor.test.ts b/packages/aris-core/src/feed-post-processor.test.ts new file mode 100644 index 0000000..00ce945 --- /dev/null +++ b/packages/aris-core/src/feed-post-processor.test.ts @@ -0,0 +1,443 @@ +import { describe, expect, mock, test } from "bun:test" + +import type { ActionDefinition, FeedItem, FeedPostProcessor, FeedSource } from "./index" + +import { FeedEngine } from "./feed-engine" +import { UnknownActionError } from "./index" + +// No-op action methods for test sources +const noActions = { + async listActions(): Promise> { + return {} + }, + async executeAction(actionId: string): Promise { + throw new UnknownActionError(actionId) + }, +} + +// ============================================================================= +// FEED ITEMS +// ============================================================================= + +type WeatherItem = FeedItem<"weather", { temp: number }> +type CalendarItem = FeedItem<"calendar", { title: string }> + +function weatherItem(id: string, temp: number): WeatherItem { + return { id, type: "weather", timestamp: new Date(), data: { temp } } +} + +function calendarItem(id: string, title: string): CalendarItem { + return { id, type: "calendar", timestamp: new Date(), data: { title } } +} + +// ============================================================================= +// TEST SOURCES +// ============================================================================= + +function createWeatherSource(items: WeatherItem[]) { + return { + id: "aris.weather", + ...noActions, + async fetchContext() { + return null + }, + async fetchItems(): Promise { + return items + }, + } +} + +function createCalendarSource(items: CalendarItem[]) { + return { + id: "aris.calendar", + ...noActions, + async fetchContext() { + return null + }, + async fetchItems(): Promise { + return items + }, + } +} + +// ============================================================================= +// REGISTRATION +// ============================================================================= + +describe("FeedPostProcessor", () => { + describe("registration", () => { + test("registerPostProcessor is chainable", () => { + const engine = new FeedEngine() + const processor: FeedPostProcessor = async () => ({}) + const result = engine.registerPostProcessor(processor) + expect(result).toBe(engine) + }) + + test("unregisterPostProcessor is chainable", () => { + const engine = new FeedEngine() + const processor: FeedPostProcessor = async () => ({}) + const result = engine.unregisterPostProcessor(processor) + expect(result).toBe(engine) + }) + + test("unregistered processor does not run", async () => { + const processor = mock(async () => ({})) + + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20)])) + .registerPostProcessor(processor) + .unregisterPostProcessor(processor) + + await engine.refresh() + expect(processor).not.toHaveBeenCalled() + }) + }) + + // ============================================================================= + // ADDITIONAL ITEMS + // ============================================================================= + + describe("additionalItems", () => { + test("injects additional items into the feed", async () => { + const extra = calendarItem("c1", "Meeting") + + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20)])) + .registerPostProcessor(async () => ({ additionalItems: [extra] })) + + const result = await engine.refresh() + expect(result.items).toHaveLength(2) + expect(result.items.find((i) => i.id === "c1")).toBeDefined() + }) + }) + + // ============================================================================= + // SUPPRESS + // ============================================================================= + + describe("suppress", () => { + test("removes suppressed items from the feed", async () => { + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)])) + .registerPostProcessor(async () => ({ suppress: ["w1"] })) + + const result = await engine.refresh() + expect(result.items).toHaveLength(1) + expect(result.items[0].id).toBe("w2") + }) + + test("suppressing nonexistent ID is a no-op", async () => { + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20)])) + .registerPostProcessor(async () => ({ suppress: ["nonexistent"] })) + + const result = await engine.refresh() + expect(result.items).toHaveLength(1) + }) + }) + + // ============================================================================= + // GROUPED ITEMS + // ============================================================================= + + describe("groupedItems", () => { + test("accumulates grouped items on FeedResult", async () => { + const engine = new FeedEngine() + .register( + createCalendarSource([calendarItem("c1", "Meeting A"), calendarItem("c2", "Meeting B")]), + ) + .registerPostProcessor(async () => ({ + groupedItems: [{ itemIds: ["c1", "c2"], summary: "Busy afternoon" }], + })) + + const result = await engine.refresh() + expect(result.groupedItems).toEqual([{ itemIds: ["c1", "c2"], summary: "Busy afternoon" }]) + }) + + test("multiple processors accumulate groups", async () => { + const engine = new FeedEngine() + .register( + createCalendarSource([calendarItem("c1", "Meeting A"), calendarItem("c2", "Meeting B")]), + ) + .registerPostProcessor(async () => ({ + groupedItems: [{ itemIds: ["c1"], summary: "Group A" }], + })) + .registerPostProcessor(async () => ({ + groupedItems: [{ itemIds: ["c2"], summary: "Group B" }], + })) + + const result = await engine.refresh() + expect(result.groupedItems).toEqual([ + { itemIds: ["c1"], summary: "Group A" }, + { itemIds: ["c2"], summary: "Group B" }, + ]) + }) + + test("stale item IDs are removed from groups after suppression", async () => { + const engine = new FeedEngine() + .register( + createCalendarSource([calendarItem("c1", "Meeting A"), calendarItem("c2", "Meeting B")]), + ) + .registerPostProcessor(async () => ({ + groupedItems: [{ itemIds: ["c1", "c2"], summary: "Afternoon" }], + })) + .registerPostProcessor(async () => ({ suppress: ["c1"] })) + + const result = await engine.refresh() + expect(result.groupedItems).toEqual([{ itemIds: ["c2"], summary: "Afternoon" }]) + }) + + test("groups with all items suppressed are dropped", async () => { + const engine = new FeedEngine() + .register(createCalendarSource([calendarItem("c1", "Meeting A")])) + .registerPostProcessor(async () => ({ + groupedItems: [{ itemIds: ["c1"], summary: "Solo" }], + })) + .registerPostProcessor(async () => ({ suppress: ["c1"] })) + + const result = await engine.refresh() + expect(result.groupedItems).toBeUndefined() + }) + + test("groupedItems is omitted when no processors produce groups", async () => { + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20)])) + .registerPostProcessor(async () => ({})) + + const result = await engine.refresh() + expect(result.groupedItems).toBeUndefined() + }) + }) + + // ============================================================================= + // PIPELINE ORDERING + // ============================================================================= + + describe("pipeline ordering", () => { + test("each processor sees items as modified by the previous processor", async () => { + const seen: string[] = [] + + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20)])) + .registerPostProcessor(async () => ({ + additionalItems: [calendarItem("c1", "Injected")], + })) + .registerPostProcessor(async (items) => { + seen.push(...items.map((i) => i.id)) + return {} + }) + + await engine.refresh() + expect(seen).toEqual(["w1", "c1"]) + }) + + test("suppression in first processor affects second processor", async () => { + const seen: string[] = [] + + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)])) + .registerPostProcessor(async () => ({ suppress: ["w1"] })) + .registerPostProcessor(async (items) => { + seen.push(...items.map((i) => i.id)) + return {} + }) + + await engine.refresh() + expect(seen).toEqual(["w2"]) + }) + }) + + // ============================================================================= + // ERROR HANDLING + // ============================================================================= + + describe("error handling", () => { + test("throwing processor is recorded in errors and pipeline continues", async () => { + const seen: string[] = [] + + async function failingProcessor(): Promise { + throw new Error("processor failed") + } + + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20)])) + .registerPostProcessor(failingProcessor) + .registerPostProcessor(async (items) => { + seen.push(...items.map((i) => i.id)) + return {} + }) + + const result = await engine.refresh() + + const ppError = result.errors.find((e) => e.sourceId === "failingProcessor") + expect(ppError).toBeDefined() + expect(ppError!.error.message).toBe("processor failed") + + // Pipeline continued — observer still saw the original item + expect(seen).toEqual(["w1"]) + expect(result.items).toHaveLength(1) + }) + + test("anonymous throwing processor uses 'anonymous' as sourceId", async () => { + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20)])) + .registerPostProcessor(async () => { + throw new Error("anon failed") + }) + + const result = await engine.refresh() + const ppError = result.errors.find((e) => e.sourceId === "anonymous") + expect(ppError).toBeDefined() + }) + + test("non-Error throw is wrapped", async () => { + async function failingProcessor(): Promise { + throw "string error" + } + + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20)])) + .registerPostProcessor(failingProcessor) + + const result = await engine.refresh() + const ppError = result.errors.find((e) => e.sourceId === "failingProcessor") + expect(ppError).toBeDefined() + expect(ppError!.error).toBeInstanceOf(Error) + }) + }) + + // ============================================================================= + // REACTIVE PATHS + // ============================================================================= + + describe("reactive updates", () => { + test("post-processors run during reactive context updates", async () => { + let callCount = 0 + + let triggerUpdate: ((update: Record) => void) | null = null + + const source: FeedSource = { + id: "aris.reactive", + ...noActions, + async fetchContext() { + return null + }, + async fetchItems() { + return [weatherItem("w1", 20)] + }, + onContextUpdate(callback, _getContext) { + triggerUpdate = callback + return () => { + triggerUpdate = null + } + }, + } + + const engine = new FeedEngine() + .register(source) + .registerPostProcessor(async () => { + callCount++ + return {} + }) + + engine.start() + + // Wait for initial periodic refresh + await new Promise((resolve) => setTimeout(resolve, 50)) + const countAfterStart = callCount + + // Trigger a reactive context update + triggerUpdate!({ foo: "bar" }) + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(callCount).toBeGreaterThan(countAfterStart) + + engine.stop() + }) + + test("post-processors run during reactive item updates", async () => { + let callCount = 0 + + let triggerItemsUpdate: (() => void) | null = null + + const source: FeedSource = { + id: "aris.reactive", + ...noActions, + async fetchContext() { + return null + }, + async fetchItems() { + return [weatherItem("w1", 20)] + }, + onItemsUpdate(callback, _getContext) { + triggerItemsUpdate = callback + return () => { + triggerItemsUpdate = null + } + }, + } + + const engine = new FeedEngine() + .register(source) + .registerPostProcessor(async () => { + callCount++ + return {} + }) + + engine.start() + + await new Promise((resolve) => setTimeout(resolve, 50)) + const countAfterStart = callCount + + // Trigger a reactive items update + triggerItemsUpdate!() + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(callCount).toBeGreaterThan(countAfterStart) + + engine.stop() + }) + }) + + // ============================================================================= + // NO PROCESSORS = NO CHANGE + // ============================================================================= + + describe("no processors", () => { + test("engine without post-processors returns raw items unchanged", async () => { + const items = [weatherItem("w1", 20), weatherItem("w2", 25)] + const engine = new FeedEngine().register(createWeatherSource(items)) + + const result = await engine.refresh() + expect(result.items).toHaveLength(2) + expect(result.items[0].id).toBe("w1") + expect(result.items[1].id).toBe("w2") + expect(result.groupedItems).toBeUndefined() + }) + }) + + // ============================================================================= + // COMBINED ENHANCEMENT + // ============================================================================= + + describe("combined enhancement", () => { + test("single processor can use all enhancement fields at once", async () => { + const engine = new FeedEngine() + .register(createWeatherSource([weatherItem("w1", 20), weatherItem("w2", 25)])) + .registerPostProcessor(async () => ({ + additionalItems: [calendarItem("c1", "Injected")], + suppress: ["w2"], + groupedItems: [{ itemIds: ["w1", "c1"], summary: "Related" }], + })) + + const result = await engine.refresh() + + // w2 suppressed, c1 injected → w1 + c1 + expect(result.items).toHaveLength(2) + expect(result.items.map((i) => i.id)).toEqual(["w1", "c1"]) + + // Groups on result + expect(result.groupedItems).toEqual([{ itemIds: ["w1", "c1"], summary: "Related" }]) + }) + }) +}) diff --git a/packages/aris-core/src/feed-post-processor.ts b/packages/aris-core/src/feed-post-processor.ts new file mode 100644 index 0000000..0b211e3 --- /dev/null +++ b/packages/aris-core/src/feed-post-processor.ts @@ -0,0 +1,23 @@ +import type { FeedItem } from "./feed" + +export interface ItemGroup { + /** IDs of items to present together */ + itemIds: string[] + /** Summary text for the group */ + summary: string +} + +export interface FeedEnhancement { + /** New items to inject into the feed */ + additionalItems?: FeedItem[] + /** Groups of items to present together with a summary */ + groupedItems?: ItemGroup[] + /** Item IDs to remove from the feed */ + suppress?: string[] +} + +/** + * A function that transforms feed items and produces enhancement directives. + * Use named functions for meaningful error attribution. + */ +export type FeedPostProcessor = (items: FeedItem[]) => Promise diff --git a/packages/aris-core/src/index.ts b/packages/aris-core/src/index.ts index 7b38c9e..ecd3348 100644 --- a/packages/aris-core/src/index.ts +++ b/packages/aris-core/src/index.ts @@ -13,6 +13,9 @@ export { TimeRelevance } from "./feed" // Feed Source export type { FeedSource } from "./feed-source" +// Feed Post-Processor +export type { FeedEnhancement, FeedPostProcessor, ItemGroup } from "./feed-post-processor" + // Feed Engine export type { FeedEngineConfig, FeedResult, FeedSubscriber, SourceError } from "./feed-engine" export { FeedEngine } from "./feed-engine"