Compare commits

..

3 Commits

Author SHA1 Message Date
2cdc769f49 ci: add nix dev tooling 2026-06-18 13:11:02 +01:00
d3d9def260 feat: use websocket in agent cli (#143) 2026-06-17 23:35:00 +01:00
227f196d5e feat: add agent websocket endpoint (#142) 2026-06-17 23:19:45 +01:00
7 changed files with 729 additions and 102 deletions

View File

@@ -39,4 +39,13 @@ Use Bun exclusively. Do not use npm or yarn.
- Branch: `feat/<task>`, `fix/<task>`, `ci/<task>`, etc.
- Commits: conventional commit format, title <= 50 chars
- Signing: If `GPG_PRIVATE_KEY_PASSPHRASE` env var is available, use it to sign commits with `git commit -S`
## Nix
Use the Nix dev shell for project commands by default.
- Run repo tooling through `nix develop -c`, e.g. `nix develop -c bun test`.
- Use Bun exclusively inside the Nix shell.
- Do not use host `bun`, `node`, `tsc`, or package binaries for project tasks unless explicitly checking host behavior.
- Simple inspection commands like `rg`, `sed`, `ls`, and `git status` may run outside Nix.
- While `flake.nix` is untracked, use `nix develop path:. -c <command>`.

View File

@@ -7,5 +7,9 @@
"format": "oxfmt --write .",
"start": "bun run src/agent-test-cli.ts",
"typecheck": "bun tsc --noEmit"
},
"dependencies": {
"@freya/agent-protocol": "workspace:*",
"@nym.sh/jrpc": "^0.1.0"
}
}

View File

@@ -1,3 +1,13 @@
import type {
AgentClientApi,
AgentEvent,
AgentServerApi,
SendMessageResult,
} from "@freya/agent-protocol"
import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc"
import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc"
type JsonObject = Record<string, unknown>
interface AuthUser {
@@ -15,10 +25,6 @@ interface AuthSession {
}
}
interface QueryResponse {
message: string
}
interface QueryToolDefinition {
name: string
label: string
@@ -60,6 +66,219 @@ class CookieJar {
}
}
class AgentWebSocketSession implements AgentClientApi {
private readonly channel: WebSocketJrpcChannel
private readonly client: JsonRpcClient<AgentServerApi>
private readonly server: JsonRpcServer<AgentClientApi>
private conversationId: string | undefined
private responseHadText = false
private constructor(channel: WebSocketJrpcChannel) {
this.channel = channel
this.client = new JsonRpcClient<AgentServerApi>(channel)
this.server = new JsonRpcServer<AgentClientApi>(
{
notify: this.notify.bind(this),
},
channel,
)
}
static async connect(backendUrl: string, cookies: CookieJar): Promise<AgentWebSocketSession> {
const channel = new WebSocketJrpcChannel(agentWebSocketUrl(backendUrl), cookies.header())
const session = new AgentWebSocketSession(channel)
try {
await channel.waitUntilOpen()
void session.server.start().catch((err: unknown) => {
if (!channel.isClosed()) {
console.error(`\nWebSocket JSON-RPC failed: ${formatError(err)}\n`)
}
})
await session.client.call("ping")
} catch (err) {
channel.close()
throw err
}
return session
}
async ask(message: string): Promise<void> {
this.responseHadText = false
const result = await this.sendMessage(message)
if (result.conversationId) {
this.conversationId = result.conversationId
}
if (!this.responseHadText) {
console.log(`\nagent> ${result.message || "(no message)"}`)
}
console.log("")
}
notify(event: AgentEvent): void {
switch (event.type) {
case "conversation_started":
this.conversationId = event.conversationId
break
case "message_created":
this.printMessage(event.text)
break
case "tool_started":
console.log(`\ntool> ${event.toolName} started`)
break
case "tool_finished":
console.log(`tool> ${event.toolName} ${event.ok ? "finished" : "failed"}`)
break
case "message_finished":
break
case "message_failed":
console.log(`\nagent! ${event.error}`)
break
}
}
describeConversation(): string {
return this.conversationId ? `Conversation: ${this.conversationId}` : "No conversation yet."
}
close(): void {
this.channel.close()
}
private async sendMessage(message: string): Promise<SendMessageResult> {
return this.client.call("sendMessage", message)
}
private printMessage(text: string): void {
if (text === "") return
console.log(`\nagent> ${text}`)
this.responseHadText = true
}
}
class WebSocketJrpcChannel implements JrpcChannel {
private readonly ws: WebSocket
private readonly opened: Promise<void>
private closed = false
private openedOnce = false
private queue: JrpcMessage[] = []
private waiters: Array<(result: IteratorResult<JrpcMessage, void>) => void> = []
constructor(url: string, cookieHeader?: string) {
this.ws = new WebSocket(url, createWebSocketOptions(cookieHeader))
this.opened = new Promise((resolve, reject) => {
this.ws.onopen = () => {
this.openedOnce = true
resolve()
}
this.ws.onerror = () => {
if (!this.openedOnce) {
reject(new Error(`Could not connect to ${url}`))
}
}
this.ws.onclose = (event) => {
if (!this.openedOnce) {
reject(new Error(formatWebSocketClose(url, event)))
}
this.close()
}
})
this.ws.onmessage = (event) => {
this.receive(event.data)
}
}
waitUntilOpen(): Promise<void> {
return this.opened
}
isClosed(): boolean {
return this.closed
}
async send(msg: JsonRpcMessage): Promise<void> {
await this.opened
if (this.closed || this.ws.readyState !== WebSocket.OPEN) {
throw new Error("JSON-RPC WebSocket channel is closed")
}
this.ws.send(JSON.stringify(msg))
}
async next(): Promise<IteratorResult<JrpcMessage, void>> {
const msg = this.queue.shift()
if (msg) {
return { done: false, value: msg }
}
if (this.closed) {
return { done: true, value: undefined }
}
return new Promise((resolve) => {
this.waiters.push(resolve)
})
}
async return(): Promise<IteratorResult<JrpcMessage, void>> {
this.close()
return { done: true, value: undefined }
}
async throw(error?: unknown): Promise<IteratorResult<JrpcMessage, void>> {
this.close()
throw error
}
async [Symbol.asyncDispose](): Promise<void> {
await this.return()
}
close(): void {
if (this.closed) return
this.closed = true
for (const resolve of this.waiters.splice(0)) {
resolve({ done: true, value: undefined })
}
if (this.ws.readyState === WebSocket.CONNECTING || this.ws.readyState === WebSocket.OPEN) {
this.ws.close()
}
}
[Symbol.asyncIterator](): AsyncGenerator<JrpcMessage, void, unknown> {
return this
}
private receive(message: unknown): void {
const parsed = parseJrpcMessage(message)
if (!parsed) {
this.ws.close(1003, "Invalid JSON-RPC message")
this.close()
return
}
this.push(parsed)
}
private push(msg: JrpcMessage): void {
if (this.closed) return
const resolve = this.waiters.shift()
if (resolve) {
resolve({ done: false, value: msg })
return
}
this.queue.push(msg)
}
}
async function main(): Promise<void> {
if (wantsHelp()) {
printUsage()
@@ -111,73 +330,72 @@ async function runChatLoop(
cookies: CookieJar,
session: AuthSession,
): Promise<void> {
const agent = await AgentWebSocketSession.connect(backendUrl, cookies)
console.log("Connected to /api/agent/ws")
printHelp()
for (;;) {
const input = askOptional("you> ")?.trim()
if (!input) continue
try {
for (;;) {
const input = askOptional("you> ")?.trim()
if (!input) continue
if (input === "/quit" || input === "/exit") {
console.log("Bye.")
return
}
if (input === "/quit" || input === "/exit") {
console.log("Bye.")
return
}
if (input === "/help") {
printHelp()
continue
}
if (input === "/help") {
printHelp()
continue
}
if (input === "/session") {
console.log(`${session.user.name || session.user.email} (${session.user.id})`)
continue
}
if (input === "/session") {
console.log(`${session.user.name || session.user.email} (${session.user.id})`)
continue
}
if (input === "/tools") {
await runCliCommand(() => listQueryTools(backendUrl, cookies))
continue
}
if (input === "/conversation") {
console.log(agent.describeConversation())
continue
}
if (input.startsWith("/tool ")) {
await runCliCommand(() => executeQueryTool(backendUrl, cookies, input.slice("/tool ".length)))
continue
}
if (input === "/tools") {
await runCliCommand(() => listQueryTools(backendUrl, cookies))
continue
}
if (input.startsWith("/actions ")) {
await runCliCommand(() =>
listSourceActions(backendUrl, cookies, input.slice("/actions ".length)),
)
continue
}
if (input.startsWith("/tool ")) {
await runCliCommand(() =>
executeQueryTool(backendUrl, cookies, input.slice("/tool ".length)),
)
continue
}
if (input.startsWith("/action ")) {
await runCliCommand(() =>
executeSourceAction(backendUrl, cookies, input.slice("/action ".length)),
)
continue
}
if (input.startsWith("/actions ")) {
await runCliCommand(() =>
listSourceActions(backendUrl, cookies, input.slice("/actions ".length)),
)
continue
}
try {
await askAgent(backendUrl, cookies, input)
} catch (err) {
console.error(`\n${formatError(err)}\n`)
if (input.startsWith("/action ")) {
await runCliCommand(() =>
executeSourceAction(backendUrl, cookies, input.slice("/action ".length)),
)
continue
}
try {
await agent.ask(input)
} catch (err) {
console.error(`\n${formatError(err)}\n`)
}
}
} finally {
agent.close()
}
}
async function askAgent(backendUrl: string, cookies: CookieJar, message: string): Promise<void> {
const data = await requestJson(backendUrl, cookies, "/api/agent", {
method: "POST",
body: { message },
})
if (!isQueryResponse(data)) {
throw new Error("Query returned an unexpected response shape")
}
console.log(`\nagent> ${data.message || "(no message)"}`)
console.log("")
}
async function runCliCommand(command: () => Promise<void>): Promise<void> {
try {
await command()
@@ -327,7 +545,7 @@ async function requestJson(
function printIntro(): void {
console.log("FREYA agent test CLI")
console.log("Connect to a backend, sign in, then send test messages to /api/agent.\n")
console.log("Connect to a backend, sign in, then send test messages to /api/agent/ws.\n")
}
function printUsage(): void {
@@ -348,6 +566,7 @@ function printHelp(): void {
console.log(" /tool Execute an agent debug tool with JSON params")
console.log(" /actions List source actions: /actions <source-id>")
console.log(" /action Execute source action: /action <source-id> <action-id> <json-params>")
console.log(" /conversation Show the current websocket conversation")
console.log(" /session Show the signed-in user")
console.log(" /help Show commands")
console.log(" /quit Exit\n")
@@ -417,6 +636,33 @@ function normalizeBackendUrl(value: string): string {
}
}
function agentWebSocketUrl(backendUrl: string): string {
const url = new URL(backendUrl)
url.protocol = url.protocol === "https:" ? "wss:" : "ws:"
url.pathname = "/api/agent/ws"
url.search = ""
url.hash = ""
return url.toString()
}
function createWebSocketOptions(cookieHeader?: string): Bun.WebSocketOptions | undefined {
if (!cookieHeader) return undefined
return {
headers: {
Cookie: cookieHeader,
},
}
}
function formatWebSocketClose(
url: string,
event: { code: number; reason: string; wasClean: boolean },
): string {
const reason = event.reason ? `: ${event.reason}` : ""
return `Could not connect to ${url} (${event.code}${reason})`
}
function formatPromptLabel(label: string, defaultValue?: string): string {
return defaultValue ? `${label} (${defaultValue}): ` : `${label}: `
}
@@ -511,6 +757,25 @@ function splitSetCookieHeader(header: string): string[] {
return parts.filter(Boolean)
}
function parseJrpcMessage(message: unknown): JrpcMessage | null {
const text = webSocketMessageText(message)
if (!text) return null
try {
const value: unknown = JSON.parse(text)
return isJrpcMessage(value) ? value : null
} catch {
return null
}
}
function webSocketMessageText(message: unknown): string | null {
if (typeof message === "string") return message
if (message instanceof Uint8Array) return Buffer.from(message).toString("utf8")
if (message instanceof ArrayBuffer) return Buffer.from(message).toString("utf8")
return null
}
async function readResponseError(response: Response, path: string): Promise<string> {
const text = await response.text()
if (response.status === 404 && path === "/api/agent") {
@@ -548,11 +813,6 @@ function isAuthSession(value: unknown): value is AuthSession {
)
}
function isQueryResponse(value: unknown): value is QueryResponse {
if (!isJsonObject(value)) return false
return typeof value.message === "string"
}
function isQueryToolsResponse(value: unknown): value is QueryToolsResponse {
if (!isJsonObject(value) || !Array.isArray(value.tools)) return false
return value.tools.every(isQueryToolDefinition)
@@ -585,6 +845,33 @@ function isSourceActionDefinition(value: unknown): value is { id: string; descri
)
}
function isJrpcMessage(value: unknown): value is JrpcMessage {
if (!isJsonObject(value) || value.jsonrpc !== "2.0" || typeof value.id !== "number") {
return false
}
if ("method" in value) {
return (
typeof value.method === "string" &&
(value.params === undefined || Array.isArray(value.params))
)
}
if ("result" in value) {
return true
}
if ("error" in value) {
return isJsonRpcErrorObject(value.error)
}
return false
}
function isJsonRpcErrorObject(value: unknown): boolean {
return isJsonObject(value) && typeof value.code === "number" && typeof value.message === "string"
}
function isJsonObject(value: unknown): value is JsonObject {
return typeof value === "object" && value !== null && !Array.isArray(value)
}

View File

@@ -49,6 +49,10 @@
"apps/agent-test-cli": {
"name": "@freya/agent-test-cli",
"version": "0.0.0",
"dependencies": {
"@freya/agent-protocol": "workspace:*",
"@nym.sh/jrpc": "^0.1.0",
},
},
"apps/freya-backend": {
"name": "@freya/backend",

27
flake.lock generated Normal file
View File

@@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1781577229,
"narHash": "sha256-lrp67w8AulE9Ks53n27I45ADSzbOCn4H+CNW1Ck8B+8=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "567a49d1913ce81ac6e9582e3553dd90a955875f",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

278
flake.nix Normal file
View File

@@ -0,0 +1,278 @@
{
description = "FREYA development shell";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
};
outputs =
{ nixpkgs, ... }:
let
systems = [
"x86_64-linux"
"aarch64-linux"
"x86_64-darwin"
"aarch64-darwin"
];
lib = nixpkgs.lib;
forEachSystem = lib.genAttrs systems;
pkgsFor = forEachSystem (system: import nixpkgs { inherit system; });
# App outputs are for long-running local tools and dev servers.
appScripts = {
expo = "expo";
drizzle-studio = "drizzle-studio";
freya-backend = "freya-backend";
admin-dashboard = "admin-dashboard";
agent-test-cli = "agent-test-cli";
};
# Check outputs are the CI-like validation commands run by `nix flake check`.
checkCommands = {
format-check = "bun run format:check";
lint = "bun run lint";
test = "bun run test";
};
# Dev-shell conveniences mirror the common app/check commands.
shellScripts = appScripts // {
freya-test = "test";
lint = "lint";
format-check = "format:check";
};
# node_modules is content-addressed. If bun.lock or package manifests
# change, Nix will report the new hash to put here.
nodeModulesHashes = {
x86_64-linux = "sha256-apVZaFGf9OKpil1WdcQ1CJODsIdjLWlBBZErHg5mjZA=";
};
checkSystems = lib.attrNames nodeModulesHashes;
# Dependency derivations only need the lockfile and workspace manifests,
# so source-only edits do not force Bun to reinstall.
dependencySource = lib.fileset.toSource {
root = ./.;
fileset = lib.fileset.fileFilter (file: file.name == "bun.lock" || file.name == "package.json") ./.;
};
# Checks run against a clean source tree, even when using `path:.`.
# Without this filter, local node_modules can sneak into the Nix sandbox.
projectSource = builtins.path {
name = "freya-source";
path = ./.;
filter =
path: type:
let
name = builtins.baseNameOf path;
in
!(type == "directory" && (name == ".git" || name == "node_modules")) && name != "result";
};
mkBunScriptCommands =
pkgs: scripts:
let
mkBunScript =
name: script:
pkgs.writeShellApplication {
inherit name;
runtimeInputs = with pkgs; [
bun
git
];
text = ''
repo_root="$(git rev-parse --show-toplevel 2>/dev/null || pwd)"
cd "$repo_root"
exec bun run ${lib.escapeShellArg script} "$@"
'';
};
in
lib.mapAttrs mkBunScript scripts;
mkBunApps =
commands:
lib.mapAttrs (name: command: {
type = "app";
program = "${command}/bin/${name}";
}) commands;
mkBunNodeModules =
system: pkgs:
pkgs.stdenvNoCC.mkDerivation {
pname = "freya-node-modules";
version = "1";
__structuredAttrs = true;
src = dependencySource;
nativeBuildInputs = with pkgs; [
bun
cacert
nodejs
];
SSL_CERT_FILE = "${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt";
GIT_SSL_CAINFO = "${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt";
outputHashAlgo = "sha256";
outputHashMode = "recursive";
outputHash = nodeModulesHashes.${system};
# `patchShebangs` embeds Nix store interpreters in package bins. The
# check derivations also depend on bun/node, so this dependency blob
# can safely drop those references after its hash is verified.
unsafeDiscardReferences.out = true;
dontConfigure = true;
# Workspace package links are completed inside each check's source tree,
# so they are intentionally dangling in this dependency-only output.
dontFixup = true;
buildPhase = ''
runHook preBuild
export HOME="$TMPDIR/home"
mkdir -p "$HOME"
# Keep the real workspace manifest for `--frozen-lockfile`, but
# filter out frontend workspaces that do not participate in checks.
# `--force` matters in the Nix sandbox: without it, Bun can accept
# manifest-only cached packages and leave tool binaries missing.
bun install \
--force \
--frozen-lockfile \
--ignore-scripts \
--backend copyfile \
--filter freya \
--filter '@freya/*' \
--filter '@freya/backend' \
--no-progress
patchShebangs node_modules
runHook postBuild
'';
installPhase = ''
runHook preInstall
mkdir -p "$out"
# Keep the root install in the store; checks symlink this directly.
cp -a node_modules "$out/node_modules"
# Bun also creates per-workspace node_modules directories. These are
# mostly relative symlinks, so checks copy the symlink entries into
# their writable source tree instead of symlinking the directory.
find apps packages -mindepth 2 -maxdepth 2 -type d -name node_modules -print |
while IFS= read -r node_modules_dir; do
mkdir -p "$out/$(dirname "$node_modules_dir")"
cp -a "$node_modules_dir" "$out/$node_modules_dir"
done
runHook postInstall
'';
};
mkBunCheck =
pkgs: nodeModules: name: command:
pkgs.stdenvNoCC.mkDerivation {
pname = "freya-${name}";
version = "1";
src = projectSource;
nativeBuildInputs = with pkgs; [
bun
nodejs
];
dontConfigure = true;
buildPhase = ''
runHook preBuild
export HOME="$TMPDIR/home"
mkdir -p "$HOME"
# Root dependencies are read-only and shared across checks.
ln -s "${nodeModules}/node_modules" node_modules
# Workspace node_modules contain relative symlinks back to packages/
# and apps/, so copy just those symlink entries into this source tree.
for node_modules_dir in "${nodeModules}"/apps/*/node_modules "${nodeModules}"/packages/*/node_modules; do
if [ -d "$node_modules_dir" ]; then
relative_path="''${node_modules_dir#"${nodeModules}/"}"
mkdir -p "$relative_path"
cp -a "$node_modules_dir/." "$relative_path/"
fi
done
${command}
runHook postBuild
'';
installPhase = ''
runHook preInstall
mkdir -p "$out"
touch "$out/${name}"
runHook postInstall
'';
};
in
{
apps = forEachSystem (
system:
let
pkgs = pkgsFor.${system};
in
mkBunApps (mkBunScriptCommands pkgs appScripts)
);
checks = lib.genAttrs checkSystems (
system:
let
pkgs = pkgsFor.${system};
nodeModules = mkBunNodeModules system pkgs;
in
lib.mapAttrs (mkBunCheck pkgs nodeModules) checkCommands
);
devShells = forEachSystem (
system:
let
pkgs = pkgsFor.${system};
bunScriptCommands = lib.attrValues (mkBunScriptCommands pkgs shellScripts);
commonPackages = with pkgs; [
bun
git
gnumake
nixfmt
nodejs
openssl
pkg-config
postgresql
python3
watchman
];
linuxPackages = with pkgs; [
gcc
inotify-tools
tailscale
];
in
{
default = pkgs.mkShell {
packages =
commonPackages ++ bunScriptCommands ++ pkgs.lib.optionals pkgs.stdenv.isLinux linuxPackages;
SSL_CERT_FILE = "${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt";
shellHook = ''
export PATH="$PWD/node_modules/.bin:$PATH"
'';
};
}
);
formatter = forEachSystem (system: pkgsFor.${system}.nixfmt);
};
}

View File

@@ -1,4 +1,4 @@
import { describe, expect, test } from "bun:test"
import { describe, expect, spyOn, test } from "bun:test"
import type { ActionDefinition, ContextEntry, ContextKey, FeedItem, FeedSource } from "./index"
@@ -145,6 +145,16 @@ function createAlertSource(): FeedSource<AlertFeedItem> {
}
}
async function waitForCondition(predicate: () => boolean, timeoutMs = 2_000): Promise<void> {
const deadline = Date.now() + timeoutMs
while (!predicate()) {
if (Date.now() > deadline) {
throw new Error("Timed out waiting for condition")
}
await new Promise((resolve) => setTimeout(resolve, 10))
}
}
// =============================================================================
// TESTS
// =============================================================================
@@ -807,28 +817,35 @@ describe("FeedEngine", () => {
})
test("TTL resets after reactive update", async () => {
let now = 1_000
const nowSpy = spyOn(Date, "now").mockImplementation(() => now)
const location = createLocationSource()
const weather = createWeatherSource()
const engine = new FeedEngine({ cacheTtlMs: 100 }).register(location).register(weather)
engine.start()
try {
engine.start()
// Initial reactive update
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
await new Promise((resolve) => setTimeout(resolve, 50))
// Initial reactive update
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
await new Promise((resolve) => setTimeout(resolve, 50))
expect(engine.lastFeed()).not.toBeNull()
expect(engine.lastFeed()).not.toBeNull()
// Wait 70ms (total 120ms from first update, past original TTL)
// but trigger another update at 50ms to reset TTL
location.simulateUpdate({ lat: 52.0, lng: -0.2 })
await new Promise((resolve) => setTimeout(resolve, 50))
// Move past the original TTL, then trigger another update to reset it.
now += 120
location.simulateUpdate({ lat: 52.0, lng: -0.2 })
await new Promise((resolve) => setTimeout(resolve, 50))
// Should still be cached because TTL was reset by second update
expect(engine.lastFeed()).not.toBeNull()
// Should still be cached because TTL was reset by second update.
expect(engine.lastFeed()).not.toBeNull()
engine.stop()
engine.stop()
} finally {
engine.stop()
nowSpy.mockRestore()
}
})
test("cacheTtlMs is configurable", async () => {
@@ -869,17 +886,21 @@ describe("FeedEngine", () => {
},
}
const engine = new FeedEngine({ cacheTtlMs: 50 }).register(source)
engine.start()
const engine = new FeedEngine({ cacheTtlMs: 20 }).register(source)
await engine.refresh()
// Wait for two TTL intervals to elapse
await new Promise((resolve) => setTimeout(resolve, 120))
expect(fetchCount).toBe(1)
// Should have auto-refreshed at least twice
expect(fetchCount).toBeGreaterThanOrEqual(2)
expect(engine.lastFeed()).not.toBeNull()
try {
engine.start()
engine.stop()
await waitForCondition(() => fetchCount >= 2)
expect(fetchCount).toBeGreaterThanOrEqual(2)
expect(engine.lastFeed()).not.toBeNull()
} finally {
engine.stop()
}
})
test("stop cancels periodic refresh", async () => {
@@ -935,28 +956,25 @@ describe("FeedEngine", () => {
},
}
const engine = new FeedEngine({ cacheTtlMs: 100 })
const engine = new FeedEngine({ cacheTtlMs: 10_000 })
.register(location)
.register(countingWeather)
const clearTimeoutSpy = spyOn(globalThis, "clearTimeout")
engine.start()
try {
engine.start()
// At 40ms, push a reactive update — this resets the timer
await new Promise((resolve) => setTimeout(resolve, 40))
const countBeforeUpdate = fetchCount
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
await new Promise((resolve) => setTimeout(resolve, 20))
const countBeforeUpdate = fetchCount
location.simulateUpdate({ lat: 51.5, lng: -0.1 })
await waitForCondition(() => fetchCount > countBeforeUpdate && engine.lastFeed() !== null)
// Reactive update triggered a fetch
expect(fetchCount).toBeGreaterThan(countBeforeUpdate)
const countAfterUpdate = fetchCount
// At 100ms from start (60ms after reactive update), the original
// timer would have fired, but it was reset. No extra fetch yet.
await new Promise((resolve) => setTimeout(resolve, 40))
expect(fetchCount).toBe(countAfterUpdate)
engine.stop()
// Reactive updates refresh the cache and reset the pending periodic timer.
expect(fetchCount).toBeGreaterThan(countBeforeUpdate)
expect(clearTimeoutSpy).toHaveBeenCalled()
} finally {
engine.stop()
clearTimeoutSpy.mockRestore()
}
})
})