feat: add post-processor pipeline to FeedEngine (#41)

* feat: add post-processor pipeline to FeedEngine

Add FeedPostProcessor type and FeedEnhancement interface.
Post-processors run after item collection on all update
paths (refresh, reactive context, reactive items).

Pipeline is chained — each processor sees items as modified
by the previous one. Enhancement merging handles additional
items, suppression, and grouped items. Throwing processors
are caught and recorded in FeedResult.errors.

Co-authored-by: Ona <no-reply@ona.com>

* docs: document intentional TItems cast in post-processor merge

Co-authored-by: Ona <no-reply@ona.com>

* fix: filter stale item IDs from groups after pipeline

Groups accumulated during the pipeline can reference items
that a later processor suppressed. The engine now strips
stale IDs and drops empty groups before returning.

Co-authored-by: Ona <no-reply@ona.com>

* refactor: use reduce for stale group filtering

Co-authored-by: Ona <no-reply@ona.com>

---------

Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
2026-02-28 15:57:01 +00:00
committed by GitHub
parent ffea38b986
commit 82ac2b577d
4 changed files with 563 additions and 3 deletions

View File

@@ -1,6 +1,7 @@
import type { ActionDefinition } from "./action"
import type { Context } from "./context"
import type { FeedItem } from "./feed"
import type { FeedPostProcessor, ItemGroup } from "./feed-post-processor"
import type { FeedSource } from "./feed-source"
export interface SourceError {
@@ -12,6 +13,8 @@ export interface FeedResult<TItem extends FeedItem = FeedItem> {
context: Context
items: TItem[]
errors: SourceError[]
/** Item groups produced by post-processors */
groupedItems?: ItemGroup[]
}
export type FeedSubscriber<TItem extends FeedItem = FeedItem> = (result: FeedResult<TItem>) => void
@@ -66,6 +69,7 @@ export class FeedEngine<TItems extends FeedItem = FeedItem> {
private subscribers = new Set<FeedSubscriber<TItems>>()
private cleanups: Array<() => void> = []
private started = false
private postProcessors: FeedPostProcessor[] = []
private readonly cacheTtlMs: number
private cachedResult: FeedResult<TItems> | null = null
@@ -108,6 +112,23 @@ export class FeedEngine<TItems extends FeedItem = FeedItem> {
return this
}
/**
* Registers a post-processor. Processors run in registration order
* after items are collected, on every update path.
*/
registerPostProcessor(processor: FeedPostProcessor): this {
this.postProcessors.push(processor)
return this
}
/**
* Unregisters a post-processor by reference.
*/
unregisterPostProcessor(processor: FeedPostProcessor): this {
this.postProcessors = this.postProcessors.filter((p) => p !== processor)
return this
}
/**
* Refreshes the feed by running all sources in dependency order.
* Calls fetchContext() then fetchItems() on each source.
@@ -152,7 +173,18 @@ export class FeedEngine<TItems extends FeedItem = FeedItem> {
this.context = context
const result: FeedResult<TItems> = { context, items: items as TItems[], errors }
const {
items: processedItems,
groupedItems,
errors: postProcessorErrors,
} = await this.applyPostProcessors(items as TItems[], errors)
const result: FeedResult<TItems> = {
context,
items: processedItems,
errors: postProcessorErrors,
...(groupedItems.length > 0 ? { groupedItems } : {}),
}
this.updateCache(result)
return result
@@ -260,6 +292,58 @@ export class FeedEngine<TItems extends FeedItem = FeedItem> {
return actions
}
private async applyPostProcessors(
items: TItems[],
errors: SourceError[],
): Promise<{ items: TItems[]; groupedItems: ItemGroup[]; errors: SourceError[] }> {
let currentItems = items
const allGroupedItems: ItemGroup[] = []
const allErrors = [...errors]
for (const processor of this.postProcessors) {
const snapshot = currentItems
try {
const enhancement = await processor(currentItems)
if (enhancement.additionalItems?.length) {
// Post-processors operate on FeedItem[] without knowledge of TItems.
// Additional items are merged untyped — this is intentional. The
// processor contract is "FeedItem in, FeedItem out"; type narrowing
// is the caller's responsibility when consuming FeedResult.
currentItems = [...currentItems, ...(enhancement.additionalItems as TItems[])]
}
if (enhancement.suppress?.length) {
const suppressSet = new Set(enhancement.suppress)
currentItems = currentItems.filter((item) => !suppressSet.has(item.id))
}
if (enhancement.groupedItems?.length) {
allGroupedItems.push(...enhancement.groupedItems)
}
} catch (err) {
const sourceId = processor.name || "anonymous"
allErrors.push({
sourceId,
error: err instanceof Error ? err : new Error(String(err)),
})
currentItems = snapshot
}
}
// Remove stale item IDs from groups and drop empty groups
const itemIds = new Set(currentItems.map((item) => item.id))
const validGroups = allGroupedItems.reduce<ItemGroup[]>((acc, group) => {
const ids = group.itemIds.filter((id) => itemIds.has(id))
if (ids.length > 0) {
acc.push({ ...group, itemIds: ids })
}
return acc
}, [])
return { items: currentItems, groupedItems: validGroups, errors: allErrors }
}
private ensureGraph(): SourceGraph {
if (!this.graph) {
this.graph = buildGraph(Array.from(this.sources.values()))
@@ -311,10 +395,17 @@ export class FeedEngine<TItems extends FeedItem = FeedItem> {
}
}
const {
items: processedItems,
groupedItems,
errors: postProcessorErrors,
} = await this.applyPostProcessors(items as TItems[], errors)
const result: FeedResult<TItems> = {
context: this.context,
items: items as TItems[],
errors,
items: processedItems,
errors: postProcessorErrors,
...(groupedItems.length > 0 ? { groupedItems } : {}),
}
this.updateCache(result)