Compare commits

..

1 Commits

Author SHA1 Message Date
dc91a048c9 fix: upgrade client to expo 56
Upgrade the React Native client through Expo SDK 56, align workspace React versions, switch Bun installs to the hoisted linker for Expo compatibility, and fix the Metro proxy to handle localhost/IPv6 loopback after the SDK upgrade.
2026-06-18 16:24:26 +01:00
55 changed files with 561 additions and 3000 deletions

View File

@@ -1,134 +0,0 @@
---
name: upgrading-expo
description: Guidelines for upgrading Expo SDK versions and fixing dependency issues
version: 1.0.0
license: MIT
---
## References
- ./references/react-19.md -- SDK +54: React 19 changes (useContext → use, Context.Provider → Context, forwardRef removal)
- ./references/new-architecture.md -- SDK +53: New Architecture migration guide
- ./references/react-compiler.md -- SDK +54: React Compiler setup and migration guide
- ./references/native-tabs.md -- SDK +55: Native tabs changes (Icon/Label/Badge now accessed via NativeTabs.Trigger.\*)
- ./references/expo-av-to-audio.md -- SDK +55: Migrate audio playback and recording from expo-av to expo-audio
- ./references/expo-av-to-video.md -- SDK +55: Migrate video playback from expo-av to expo-video
- ./references/react-navigation-to-expo-router.md -- SDK +56: Migrate `@react-navigation/*` imports to `expo-router` entry points (codemod + manual mapping)
## Beta/Preview Releases
Beta versions use `.preview` suffix (e.g., `55.0.0-preview.2`), published under `@next` tag.
Check if latest is beta: https://exp.host/--/api/v2/versions (look for `-preview` in `expoVersion`)
```bash
npx expo install expo@next --fix # install beta
```
## Step-by-Step Upgrade Process
1. Upgrade Expo and dependencies
```bash
npx expo install expo@latest
npx expo install --fix
```
2. Run diagnostics: `npx expo-doctor`
3. Clear caches and reinstall
```bash
npx expo export -p ios --clear
rm -rf node_modules .expo
watchman watch-del-all
```
## Breaking Changes Checklist
- Check for removed APIs in release notes
- Update import paths for moved modules
- Review native module changes requiring prebuild
- Test all camera, audio, and video features
- Verify navigation still works correctly
## Prebuild for Native Changes
**First check if `ios/` and `android/` directories exist in the project.** If neither directory exists, the project uses Continuous Native Generation (CNG) and native projects are regenerated at build time — skip this section and "Clear caches for bare workflow" entirely.
If upgrading requires native changes:
```bash
npx expo prebuild --clean
```
This regenerates the `ios` and `android` directories. Ensure the project is not a bare workflow app before running this command.
## Clear caches for bare workflow
These steps only apply when `ios/` and/or `android/` directories exist in the project:
- Clear the cocoapods cache for iOS: `cd ios && pod install --repo-update`
- Clear derived data for Xcode: `npx expo run:ios --no-build-cache`
- Clear the Gradle cache for Android: `cd android && ./gradlew clean`
## Housekeeping
- Review release notes for the target SDK version at https://expo.dev/changelog
- If using Expo SDK 54 or later, ensure react-native-worklets is installed — this is required for react-native-reanimated to work.
- Enable React Compiler in SDK 54+ by adding `"experiments": { "reactCompiler": true }` to app.json — it's stable and recommended
- Delete sdkVersion from `app.json` to let Expo manage it automatically
- Remove implicit packages from `package.json`: `@babel/core`, `babel-preset-expo`, `expo-constants`.
- If the babel.config.js only contains 'babel-preset-expo', delete the file
- If the metro.config.js only contains expo defaults, delete the file
## Deprecated Packages
| Old Package | Replacement |
| -------------------- | ---------------------------------------------------- |
| `expo-av` | `expo-audio` and `expo-video` |
| `expo-permissions` | Individual package permission APIs |
| `@expo/vector-icons` | `expo-symbols` (for SF Symbols) |
| `AsyncStorage` | `expo-sqlite/localStorage/install` |
| `expo-app-loading` | `expo-splash-screen` |
| expo-linear-gradient | experimental_backgroundImage + CSS gradients in View |
When migrating deprecated packages, update all code usage before removing the old package. For expo-av, consult the migration references to convert Audio.Sound to useAudioPlayer, Audio.Recording to useAudioRecorder, and Video components to VideoView with useVideoPlayer.
## expo.install.exclude
Check if package.json has excluded packages:
```json
{
"expo": { "install": { "exclude": ["react-native-reanimated"] } }
}
```
Exclusions are often workarounds that may no longer be needed after upgrading. Review each one.
## Removing patches
Check if there are any outdated patches in the `patches/` directory. Remove them if they are no longer needed.
## Postcss
- `autoprefixer` isn't needed in SDK +53. Remove it from dependencies and check `postcss.config.js` or `postcss.config.mjs` to remove it from the plugins list.
- Use `postcss.config.mjs` in SDK +53.
## Metro
Remove redundant metro config options:
- resolver.unstable_enablePackageExports is enabled by default in SDK +53.
- `experimentalImportSupport` is enabled by default in SDK +54.
- `EXPO_USE_FAST_RESOLVER=1` is removed in SDK +54.
- cjs and mjs extensions are supported by default in SDK +50.
- Expo webpack is deprecated, migrate to [Expo Router and Metro web](https://docs.expo.dev/router/migrate/from-expo-webpack/).
## Hermes engine v1
Since SDK 55, users can opt-in to use Hermes engine v1 for improved runtime performance. This requires setting `useHermesV1: true` in the `expo-build-properties` config plugin, and may require a specific version of the `hermes-compiler` npm package. Hermes v1 will become a default in some future SDK release.
## New Architecture
The new architecture is enabled by default, the app.json field `"newArchEnabled": true` is no longer needed as it's the default. Expo Go only supports the new architecture as of SDK +53.

View File

@@ -1,4 +0,0 @@
interface:
display_name: "Upgrading Expo"
short_description: "Upgrade Expo SDKs, fix dependencies, adopt React 19 / React Compiler, and migrate deprecated Expo packages"
default_prompt: "Use $upgrading-expo to upgrade an Expo SDK, run diagnostics, fix dependency conflicts, decide whether prebuild/cache clearing applies, and migrate away from deprecated Expo packages."

View File

@@ -1,132 +0,0 @@
# Migrating from expo-av to expo-audio
## Imports
```tsx
// Before
import { Audio } from 'expo-av';
// After
import { useAudioPlayer, useAudioRecorder, RecordingPresets, AudioModule, setAudioModeAsync } from 'expo-audio';
```
## Audio Playback
### Before (expo-av)
```tsx
const [sound, setSound] = useState<Audio.Sound>();
async function playSound() {
const { sound } = await Audio.Sound.createAsync(require('./audio.mp3'));
setSound(sound);
await sound.playAsync();
}
useEffect(() => {
return sound ? () => { sound.unloadAsync(); } : undefined;
}, [sound]);
```
### After (expo-audio)
```tsx
const player = useAudioPlayer(require('./audio.mp3'));
// Play
player.play();
```
## Audio Recording
### Before (expo-av)
```tsx
const [recording, setRecording] = useState<Audio.Recording>();
async function startRecording() {
await Audio.requestPermissionsAsync();
await Audio.setAudioModeAsync({ allowsRecordingIOS: true, playsInSilentModeIOS: true });
const { recording } = await Audio.Recording.createAsync(Audio.RecordingOptionsPresets.HIGH_QUALITY);
setRecording(recording);
}
async function stopRecording() {
await recording?.stopAndUnloadAsync();
const uri = recording?.getURI();
}
```
### After (expo-audio)
```tsx
const recorder = useAudioRecorder(RecordingPresets.HIGH_QUALITY);
async function startRecording() {
await AudioModule.requestRecordingPermissionsAsync();
await recorder.prepareToRecordAsync();
recorder.record();
}
async function stopRecording() {
await recorder.stop();
const uri = recorder.uri;
}
```
## Audio Mode
### Before (expo-av)
```tsx
await Audio.setAudioModeAsync({
allowsRecordingIOS: true,
playsInSilentModeIOS: true,
staysActiveInBackground: true,
interruptionModeIOS: InterruptionModeIOS.DoNotMix,
});
```
### After (expo-audio)
```tsx
await setAudioModeAsync({
playsInSilentMode: true,
shouldPlayInBackground: true,
interruptionMode: 'doNotMix',
});
```
## API Mapping
| expo-av | expo-audio |
|---------|------------|
| `Audio.Sound.createAsync()` | `useAudioPlayer(source)` |
| `sound.playAsync()` | `player.play()` |
| `sound.pauseAsync()` | `player.pause()` |
| `sound.setPositionAsync(ms)` | `player.seekTo(seconds)` |
| `sound.setVolumeAsync(vol)` | `player.volume = vol` |
| `sound.setRateAsync(rate)` | `player.playbackRate = rate` |
| `sound.setIsLoopingAsync(loop)` | `player.loop = loop` |
| `sound.unloadAsync()` | Automatic via hook |
| `playbackStatus.positionMillis` | `player.currentTime` (seconds) |
| `playbackStatus.durationMillis` | `player.duration` (seconds) |
| `playbackStatus.isPlaying` | `player.playing` |
| `Audio.Recording.createAsync()` | `useAudioRecorder(preset)` |
| `Audio.RecordingOptionsPresets.*` | `RecordingPresets.*` |
| `recording.stopAndUnloadAsync()` | `recorder.stop()` |
| `recording.getURI()` | `recorder.uri` |
| `Audio.requestPermissionsAsync()` | `AudioModule.requestRecordingPermissionsAsync()` |
## Key Differences
- **No auto-reset on finish**: After `play()` completes, the player stays paused at the end. To replay, call `player.seekTo(0)` then `play()`
- **Time in seconds**: expo-audio uses seconds, not milliseconds (matching web standards)
- **Immediate loading**: Audio loads immediately when the hook mounts—no explicit preloading needed
- **Automatic cleanup**: No need to call `unloadAsync()`, hooks handle resource cleanup on unmount
- **Multiple players**: Create multiple `useAudioPlayer` instances and store them—all load immediately
- **Direct property access**: Set volume, rate, loop directly on the player object (`player.volume = 0.5`)
## API Reference
https://docs.expo.dev/versions/latest/sdk/audio/

View File

@@ -1,160 +0,0 @@
# Migrating from expo-av to expo-video
## Imports
```tsx
// Before
import { Video, ResizeMode } from 'expo-av';
// After
import { useVideoPlayer, VideoView, VideoSource } from 'expo-video';
import { useEvent, useEventListener } from 'expo';
```
## Video Playback
### Before (expo-av)
```tsx
const videoRef = useRef<Video>(null);
const [status, setStatus] = useState({});
<Video
ref={videoRef}
source={{ uri: 'https://example.com/video.mp4' }}
style={{ width: 350, height: 200 }}
resizeMode={ResizeMode.CONTAIN}
isLooping
onPlaybackStatusUpdate={setStatus}
/>
// Control
videoRef.current?.playAsync();
videoRef.current?.pauseAsync();
```
### After (expo-video)
```tsx
const player = useVideoPlayer('https://example.com/video.mp4', player => {
player.loop = true;
});
const { isPlaying } = useEvent(player, 'playingChange', { isPlaying: player.playing });
<VideoView
player={player}
style={{ width: 350, height: 200 }}
contentFit="contain"
/>
// Control
player.play();
player.pause();
```
## Status Updates
### Before (expo-av)
```tsx
<Video
onPlaybackStatusUpdate={status => {
if (status.isLoaded) {
console.log(status.positionMillis, status.durationMillis, status.isPlaying);
if (status.didJustFinish) console.log('finished');
}
}}
/>
```
### After (expo-video)
```tsx
// Reactive state
const { isPlaying } = useEvent(player, 'playingChange', { isPlaying: player.playing });
// Side effects
useEventListener(player, 'playToEnd', () => console.log('finished'));
// Direct access
console.log(player.currentTime, player.duration, player.playing);
```
## Local Files
### Before (expo-av)
```tsx
<Video source={require('./video.mp4')} />
```
### After (expo-video)
```tsx
const player = useVideoPlayer({ assetId: require('./video.mp4') });
```
## Fullscreen and PiP
```tsx
<VideoView
player={player}
allowsFullscreen
allowsPictureInPicture
onFullscreenEnter={() => {}}
onFullscreenExit={() => {}}
/>
```
For PiP and background playback, add to app.json:
```json
{
"expo": {
"plugins": [
["expo-video", { "supportsBackgroundPlayback": true, "supportsPictureInPicture": true }]
]
}
}
```
## API Mapping
| expo-av | expo-video |
|---------|------------|
| `<Video>` | `<VideoView>` |
| `ref={videoRef}` | `player={useVideoPlayer()}` |
| `source={{ uri }}` | Pass to `useVideoPlayer(uri)` |
| `resizeMode={ResizeMode.CONTAIN}` | `contentFit="contain"` |
| `isLooping` | `player.loop = true` |
| `shouldPlay` | `player.play()` in setup |
| `isMuted` | `player.muted = true` |
| `useNativeControls` | `nativeControls={true}` |
| `onPlaybackStatusUpdate` | `useEvent` / `useEventListener` |
| `videoRef.current.playAsync()` | `player.play()` |
| `videoRef.current.pauseAsync()` | `player.pause()` |
| `videoRef.current.replayAsync()` | `player.replay()` |
| `videoRef.current.setPositionAsync(ms)` | `player.currentTime = seconds` |
| `status.positionMillis` | `player.currentTime` (seconds) |
| `status.durationMillis` | `player.duration` (seconds) |
| `status.didJustFinish` | `useEventListener(player, 'playToEnd')` |
## Key Differences
- **Separate player and view**: Player logic decoupled from the view—one player can be used across multiple views
- **Time in seconds**: Uses seconds, not milliseconds
- **Event system**: Uses `useEvent`/`useEventListener` from `expo` instead of callback props
- **Video preloading**: Create a player without mounting a VideoView to preload for faster transitions
- **Built-in caching**: Set `useCaching: true` in VideoSource for persistent offline caching
## Known Issues
- **Uninstall expo-av first**: On Android, having both expo-av and expo-video installed can cause VideoView to show a black screen. Uninstall expo-av before installing expo-video
- **Android: Reusing players**: Mounting the same player in multiple VideoViews simultaneously can cause black screens on Android (works on iOS)
- **Android: currentTime in setup**: Setting `player.currentTime` in the `useVideoPlayer` setup callback may not work on Android—set it after mount instead
- **Changing source**: Use `player.replace(newSource)` to change videos without recreating the player
## API Reference
https://docs.expo.dev/versions/latest/sdk/video/

View File

@@ -1,124 +0,0 @@
# Native Tabs Migration (SDK 55)
In SDK 55, `Label`, `Icon`, `Badge`, and `VectorIcon` are now accessed as static properties on `NativeTabs.Trigger` rather than separate imports.
## Import Changes
```tsx
// SDK 53/54
import {
NativeTabs,
Icon,
Label,
Badge,
VectorIcon,
} from "expo-router/unstable-native-tabs";
// SDK 55+
import { NativeTabs } from "expo-router/unstable-native-tabs";
```
## Component Changes
| SDK 53/54 | SDK 55+ |
| ---------------- | ----------------------------------- |
| `<Icon />` | `<NativeTabs.Trigger.Icon />` |
| `<Label />` | `<NativeTabs.Trigger.Label />` |
| `<Badge />` | `<NativeTabs.Trigger.Badge />` |
| `<VectorIcon />` | `<NativeTabs.Trigger.VectorIcon />` |
| (n/a) | `<NativeTabs.BottomAccessory />` |
## New in SDK 55
### BottomAccessory
New component for Apple Music-style mini players on iOS +26 that float above the tab bar:
```tsx
<NativeTabs>
<NativeTabs.BottomAccessory>
{/* Content above tabs */}
</NativeTabs.BottomAccessory>
<NativeTabs.Trigger name="(index)">
<NativeTabs.Trigger.Label>Home</NativeTabs.Trigger.Label>
</NativeTabs.Trigger>
</NativeTabs>
```
On Android and web, this component will render as a no-op. Position a view absolutely above the tab bar instead.
### Icon `md` Prop
New `md` prop for Material icon glyphs on Android (alongside existing `drawable`):
```tsx
<NativeTabs.Trigger.Icon sf="house" md="home" />
```
## Full Migration Example
### Before (SDK 53/54)
```tsx
import {
NativeTabs,
Icon,
Label,
Badge,
} from "expo-router/unstable-native-tabs";
export default function TabLayout() {
return (
<NativeTabs minimizeBehavior="onScrollDown">
<NativeTabs.Trigger name="(index)">
<Label>Home</Label>
<Icon sf="house.fill" />
<Badge>3</Badge>
</NativeTabs.Trigger>
<NativeTabs.Trigger name="(settings)">
<Label>Settings</Label>
<Icon sf="gear" />
</NativeTabs.Trigger>
<NativeTabs.Trigger name="(search)" role="search">
<Label>Search</Label>
</NativeTabs.Trigger>
</NativeTabs>
);
}
```
### After (SDK 55+)
```tsx
import { NativeTabs } from "expo-router/unstable-native-tabs";
export default function TabLayout() {
return (
<NativeTabs minimizeBehavior="onScrollDown">
<NativeTabs.Trigger name="(index)">
<NativeTabs.Trigger.Label>Home</NativeTabs.Trigger.Label>
<NativeTabs.Trigger.Icon sf="house.fill" md="home" />
<NativeTabs.Trigger.Badge>3</NativeTabs.Trigger.Badge>
</NativeTabs.Trigger>
<NativeTabs.Trigger name="(settings)">
<NativeTabs.Trigger.Label>Settings</NativeTabs.Trigger.Label>
<NativeTabs.Trigger.Icon sf="gear" md="settings" />
</NativeTabs.Trigger>
<NativeTabs.Trigger name="(search)" role="search">
<NativeTabs.Trigger.Label>Search</NativeTabs.Trigger.Label>
</NativeTabs.Trigger>
</NativeTabs>
);
}
```
## Migration Checklist
1. Remove `Icon`, `Label`, `Badge`, `VectorIcon` from imports
2. Keep only `NativeTabs` import from `expo-router/unstable-native-tabs`
3. Replace `<Icon />` with `<NativeTabs.Trigger.Icon />`
4. Replace `<Label />` with `<NativeTabs.Trigger.Label />`
5. Replace `<Badge />` with `<NativeTabs.Trigger.Badge />`
6. Replace `<VectorIcon />` with `<NativeTabs.Trigger.VectorIcon />`
- Read docs for more info https://docs.expo.dev/versions/v55.0.0/sdk/router-native-tabs/

View File

@@ -1,79 +0,0 @@
# New Architecture
The New Architecture is enabled by default in Expo SDK 53+. It replaces the legacy bridge with a faster, synchronous communication layer between JavaScript and native code.
## Documentation
Full guide: https://docs.expo.dev/guides/new-architecture/
## What Changed
- **JSI (JavaScript Interface)** — Direct synchronous calls between JS and native
- **Fabric** — New rendering system with concurrent features
- **TurboModules** — Lazy-loaded native modules with type safety
## SDK Compatibility
| SDK Version | New Architecture Status |
| ----------- | ----------------------- |
| SDK 53+ | Enabled by default |
| SDK 52 | Opt-in via app.json |
| SDK 51- | Experimental |
## Configuration
New Architecture is enabled by default. To explicitly disable (not recommended):
```json
{
"expo": {
"newArchEnabled": false
}
}
```
## Expo Go
Expo Go only supports the New Architecture as of SDK 53. Apps using the old architecture must use development builds.
## Common Migration Issues
### Native Module Compatibility
Some older native modules may not support the New Architecture. Check:
1. Module documentation for New Architecture support
2. GitHub issues for compatibility discussions
3. Consider alternatives if module is unmaintained
### Reanimated
React Native Reanimated requires `react-native-worklets` in SDK 54+:
```bash
npx expo install react-native-worklets
```
### Layout Animations
Some layout animations behave differently. Test thoroughly after upgrading.
## Verifying New Architecture
Check if New Architecture is active:
```tsx
import { Platform } from "react-native";
// Returns true if Fabric is enabled
const isNewArch = global._IS_FABRIC !== undefined;
```
Verify from the command line if the currently running app uses the New Architecture: `bunx xcobra expo eval "_IS_FABRIC"` -> `true`
## Troubleshooting
1. **Clear caches**`npx expo start --clear`
2. **Clean prebuild**`npx expo prebuild --clean`
3. **Check native modules** — Ensure all dependencies support New Architecture
4. **Review console warnings** — Legacy modules log compatibility warnings

View File

@@ -1,79 +0,0 @@
# React 19
React 19 is included in Expo SDK 54. This release simplifies several common patterns.
## Context Changes
### useContext → use
The `use` hook replaces `useContext`:
```tsx
// Before (React 18)
import { useContext } from "react";
const value = useContext(MyContext);
// After (React 19)
import { use } from "react";
const value = use(MyContext);
```
- The `use` hook can also read promises, enabling Suspense-based data fetching.
- `use` can be called conditionally, this simplifies components that consume multiple contexts.
### Context.Provider → Context
Context providers no longer need the `.Provider` suffix:
```tsx
// Before (React 18)
<ThemeContext.Provider value={theme}>
{children}
</ThemeContext.Provider>
// After (React 19)
<ThemeContext value={theme}>
{children}
</ThemeContext>
```
## ref as a Prop
### Removing forwardRef
Components can now receive `ref` as a regular prop. `forwardRef` is no longer needed:
```tsx
// Before (React 18)
import { forwardRef } from "react";
const Input = forwardRef<TextInput, Props>((props, ref) => {
return <TextInput ref={ref} {...props} />;
});
// After (React 19)
function Input({ ref, ...props }: Props & { ref?: React.Ref<TextInput> }) {
return <TextInput ref={ref} {...props} />;
}
```
### Migration Steps
1. Remove `forwardRef` wrapper
2. Add `ref` to the props destructuring
3. Update the type to include `ref?: React.Ref<T>`
## Other React 19 Features
- **Actions** — Functions that handle async transitions
- **useOptimistic** — Optimistic UI updates
- **useFormStatus** — Form submission state (web)
- **Document Metadata** — Native `<title>` and `<meta>` support (web)
## Cleanup Checklist
When upgrading to SDK 54:
- [ ] Replace `useContext` with `use`
- [ ] Remove `.Provider` from Context components
- [ ] Remove `forwardRef` wrappers, use `ref` prop instead

View File

@@ -1,59 +0,0 @@
# React Compiler
React Compiler is stable in Expo SDK 54 and later. It automatically memoizes components and hooks, eliminating the need for manual `useMemo`, `useCallback`, and `React.memo`.
## Enabling React Compiler
Add to `app.json`:
```json
{
"expo": {
"experiments": {
"reactCompiler": true
}
}
}
```
## What React Compiler Does
- Automatically memoizes components and values
- Eliminates unnecessary re-renders
- Removes the need for manual `useMemo` and `useCallback`
- Works with existing code without modifications
## Cleanup After Enabling
Once React Compiler is enabled, you can remove manual memoization:
```tsx
// Before (manual memoization)
const memoizedValue = useMemo(() => computeExpensive(a, b), [a, b]);
const memoizedCallback = useCallback(() => doSomething(a), [a]);
const MemoizedComponent = React.memo(MyComponent);
// After (React Compiler handles it)
const value = computeExpensive(a, b);
const callback = () => doSomething(a);
// Just use MyComponent directly
```
## Requirements
- Expo SDK 54 or later
- New Architecture enabled (default in SDK 54+)
## Verifying It's Working
React Compiler runs at build time. Check the Metro bundler output for compilation messages. You can also use React DevTools to verify components are being optimized.
## Troubleshooting
If you encounter issues:
1. Ensure New Architecture is enabled
2. Clear Metro cache: `npx expo start --clear`
3. Check for incompatible patterns in your code (rare)
React Compiler is designed to work with idiomatic React code. If it can't safely optimize a component, it skips that component without breaking your app.

View File

@@ -1,61 +0,0 @@
# Migrating from react-navigation to expo-router
In SDK 56+, application code must not import from `@react-navigation/*` directly. Repoint those imports to the matching `expo-router` entry points. Runtime API is unchanged — only the module specifiers move.
## Steps
1. Prefer the automated codemod (see below). If it is not viable, fall back to the manual mapping.
2. Replace imports using the table. Use the entry point that matches the `@react-navigation/*` source.
3. After rewriting, check whether any of the rewritten imports are deprecated in `expo-router` (see [Check for deprecated imports](#check-for-deprecated-imports)). If so, surface the deprecation reason and the suggested replacement to the user before continuing.
4. Validate: search for remaining `@react-navigation/` references in source files, then run typecheck/build/start.
5. Remove `@react-navigation/*` packages that are no longer imported from `package.json` and reinstall (delete `node_modules` if needed).
## Automated migration (preferred)
Run from the project root over your application code (replace `src` with the actual directory or glob):
```sh
npx expo-codemod sdk-56-expo-router-react-navigation-replace src
```
```sh
npx expo-codemod sdk-56-expo-router-react-navigation-replace '**/*.{ts,tsx,js,jsx}'
```
## Manual API mapping
| React Navigation source | Expo Router target |
| ------------------------------------- | ------------------------------------------------------------------------ |
| `@react-navigation/native` | `expo-router/react-navigation` |
| `@react-navigation/core` | `expo-router/react-navigation` |
| `@react-navigation/elements` | `expo-router/react-navigation` |
| `@react-navigation/routers` | `expo-router/react-navigation` |
| `@react-navigation/stack` | `expo-router/js-stack` |
| `@react-navigation/bottom-tabs` | `expo-router/js-tabs` |
| `@react-navigation/material-top-tabs` | `expo-router/js-top-tabs` |
| `@react-navigation/native-stack` | No direct equivalent. Use the `Stack` layout from `expo-router` instead. |
**Stack caveat:** Do NOT rewrite `import { Stack } from "expo-router"` to `expo-router/js-stack`. The root `Stack` is the Expo Router layout component used in route files; only use `expo-router/js-stack` when replacing a `@react-navigation/stack` JS stack navigator.
If you encounter a symbol that has no replacement, ask the user to file an issue in the `expo/expo` repository describing what is needed and why.
## Check for deprecated imports
A successful rewrite to `expo-router/*` does not guarantee the new import is the recommended one. Some symbols are re-exported as deprecated shims and the project may need to migrate further (for example, to a different `expo-router` API or to a first-party Expo package).
For each symbol rewritten in step 2:
1. Resolve the rewritten module to its source in `node_modules` (e.g., `node_modules/expo-router/build/react-navigation.d.ts`, `js-stack`, `js-tabs`, `js-top-tabs`).
2. Look for a `@deprecated` JSDoc tag on the named export, or a runtime deprecation warning in the implementation file.
3. If deprecated, capture both the reason and the recommended replacement from the JSDoc/comment.
4. Report each deprecated symbol to the user with: the import path, the symbol, the deprecation reason, and the suggested replacement. Wait for the user to confirm before mass-applying further changes.
## Done when
1. No `@react-navigation/*` imports remain in source files.
2. No unused `@react-navigation/*` entries remain in `package.json`.
3. Typecheck and bundler start without `@react-navigation/*` errors.
## Reference
- Official Expo Router SDK 55 → 56 migration guide: https://docs.expo.dev/router/migrate/sdk-55-to-56

View File

@@ -1,19 +0,0 @@
local ok, conform = pcall(require, "conform")
if not ok then
vim.notify("conform.nvim not loaded", vim.log.levels.WARN)
return
end
conform.setup({
formatters_by_ft = {
javascript = { "oxfmt" },
javascriptreact = { "oxfmt" },
typescript = { "oxfmt" },
typescriptreact = { "oxfmt" },
json = { "oxfmt" },
jsonc = { "oxfmt" },
},
})
vim.lsp.enable("tsgo")
vim.lsp.enable("oxlint")

View File

@@ -1,20 +0,0 @@
// Folder-specific settings
//
// For a full list of overridable settings, and general information on folder-specific settings,
// see the documentation: https://zed.dev/docs/configuring-zed#settings-files
{
"languages": {
"TypeScript": {
"formatter": { "language_server": { "name": "oxfmt" } }
},
"TSX": {
"formatter": { "language_server": { "name": "oxfmt" } }
},
"JavaScript": {
"formatter": { "language_server": { "name": "oxfmt" } }
},
"JSX": {
"formatter": { "language_server": { "name": "oxfmt" } }
}
}
}

View File

@@ -10,6 +10,6 @@
}, },
"dependencies": { "dependencies": {
"@freya/agent-protocol": "workspace:*", "@freya/agent-protocol": "workspace:*",
"@nym.sh/jrpc": "1.1.0" "@nym.sh/jrpc": "^0.1.0"
} }
} }

View File

@@ -26,7 +26,7 @@
"@freya/source-tfl": "workspace:*", "@freya/source-tfl": "workspace:*",
"@freya/source-weatherkit": "workspace:*", "@freya/source-weatherkit": "workspace:*",
"@freya/source-web-search": "workspace:*", "@freya/source-web-search": "workspace:*",
"@nym.sh/jrpc": "1.1.0", "@nym.sh/jrpc": "^0.1.0",
"@openrouter/sdk": "^0.9.11", "@openrouter/sdk": "^0.9.11",
"arktype": "^2.1.29", "arktype": "^2.1.29",
"better-auth": "^1", "better-auth": "^1",

View File

@@ -44,7 +44,7 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
mock.module("../conversations/db-storage.ts", () => ({ mock.module("../conversations/storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({ conversations: (_db: Database, userId: string) => ({
async getOrCreateConversation() { async getOrCreateConversation() {
return { id: `conversation-${userId}` } return { id: `conversation-${userId}` }

View File

@@ -1,4 +1,3 @@
import { ConversationEntryKind } from "@freya/core"
import { describe, expect, test } from "bun:test" import { describe, expect, test } from "bun:test"
import type { AppendConversationEntryInput } from "../conversations/storage.ts" import type { AppendConversationEntryInput } from "../conversations/storage.ts"
@@ -7,6 +6,7 @@ import type {
ConversationStorageEntry, ConversationStorageEntry,
} from "./conversation-recording-query-agent.ts" } from "./conversation-recording-query-agent.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { ConversationRecordingQueryAgent } from "./conversation-recording-query-agent.ts" import { ConversationRecordingQueryAgent } from "./conversation-recording-query-agent.ts"
import { import {
createQueryAgentEventListeners, createQueryAgentEventListeners,

View File

@@ -1,13 +1,12 @@
import type { ConversationEntryMetadata } from "@freya/core"
import { ConversationEntryKind } from "@freya/core"
import { randomUUID } from "node:crypto" import { randomUUID } from "node:crypto"
import type { import type {
AppendConversationEntryInput, AppendConversationEntryInput,
ConversationEntryRow, ConversationEntryRow,
} from "../conversations/storage.ts" } from "../conversations/storage.ts"
import type { ConversationEntryMetadata } from "../conversations/types.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { import {
createQueryAgentEventListeners, createQueryAgentEventListeners,
QueryAgentEvent, QueryAgentEvent,
@@ -20,7 +19,6 @@ import {
type QueryAgentStreamEvent, type QueryAgentStreamEvent,
} from "./query-agent.ts" } from "./query-agent.ts"
/** Storage operations used to persist and replay query-agent conversation entries. */
export interface ConversationStorage { export interface ConversationStorage {
getOrCreateConversation(): Promise<{ id: string }> getOrCreateConversation(): Promise<{ id: string }>
appendEntry( appendEntry(
@@ -30,13 +28,11 @@ export interface ConversationStorage {
listEntries(conversationId: string): Promise<ConversationStorageEntry[]> listEntries(conversationId: string): Promise<ConversationStorageEntry[]>
} }
/** Minimal persisted entry shape needed by recording and replay agents. */
export type ConversationStorageEntry = Pick< export type ConversationStorageEntry = Pick<
ConversationEntryRow, ConversationEntryRow,
"id" | "sequence" | "kind" | "payload" | "metadata" | "createdAt" "id" | "sequence" | "kind" | "payload" | "metadata" | "createdAt"
> >
/** Configuration for wrapping a QueryAgent with conversation recording. */
export interface ConversationRecordingQueryAgentConfig { export interface ConversationRecordingQueryAgentConfig {
agent: QueryAgent agent: QueryAgent
storage: ConversationStorage storage: ConversationStorage

View File

@@ -1,145 +0,0 @@
import type { AgentEvent } from "@freya/agent-protocol"
import {
AssistantMessagePayload,
ConversationEntryKind,
UserMessagePayload,
ToolCallPayload,
ToolResultPayload,
} from "@freya/core"
import { type } from "arktype"
import type { ConversationStorage } from "../conversations/storage"
import type { Job } from "../lib/job"
import type { JobExecutor } from "../lib/worker"
import type { NotificationCentral } from "../notification/notification-central"
import type { UserSessionManager } from "../session"
import { ConversationResponseStateStatus } from "../db/schema"
import { streamAgentResponse } from "./streaming"
export interface AgentResponseJobPayload {
conversationId: string
}
interface AgentResponseWorkerConfig {
conversationStorage: ConversationStorage
userSessionManager: UserSessionManager
notificationCentral: NotificationCentral
}
export class AgentResponseJobExecutor implements JobExecutor<AgentResponseJobPayload> {
private conversationStorage: ConversationStorage
private userSessionManager: UserSessionManager
private notificationCentral: NotificationCentral
constructor({
conversationStorage,
userSessionManager,
notificationCentral,
}: AgentResponseWorkerConfig) {
this.conversationStorage = conversationStorage
this.userSessionManager = userSessionManager
this.notificationCentral = notificationCentral
}
async execute(job: Job<AgentResponseJobPayload>): Promise<void> {
const conversation = await this.conversationStorage.findConversation(job.payload.conversationId)
if (!conversation) {
return
}
const claimed = await this.conversationStorage.claimPendingConversationResponseState(
job.payload.conversationId,
)
if (!claimed) {
// conversation response state not found or already claimed
return
}
const pendingEntries = await this.conversationStorage.listPendingUserConversationEntries(
conversation.userId,
conversation.id,
)
if (pendingEntries.length === 0) {
await this.conversationStorage.clearConversationResponseState(job.payload.conversationId)
return
}
const message = pendingEntries.reduce((acc, entry) => {
const payload = UserMessagePayload(entry.payload)
if (payload instanceof type.errors) {
return acc
}
return (
acc + "\n" + payload.parts.reduce((msg, p) => (p.type === "text" ? msg + p.text : msg), "")
)
}, "")
const session = await this.userSessionManager.getOrCreate(conversation.userId)
try {
for await (const event of streamAgentResponse({
agent: session.agent,
input: { message, signal: job.signal },
})) {
if (job.signal.aborted) {
break
}
await this.recordAgentEvent(event, conversation.id)
await this.notificationCentral.notifyUser(conversation.userId, {
kind: "agent",
payload: event,
})
}
// if job is aborted, stop everything immediately, including clean up.
// the aborter is assumed responsibility on how to proceed.
if (!job.signal.aborted) {
await this.conversationStorage.clearConversationResponseState(job.payload.conversationId)
}
} catch (err) {
console.error("[agent job executor] error streaming agent response:", err)
if (!job.signal.aborted) {
await this.conversationStorage.markResponseStateStatus(
[job.payload.conversationId],
ConversationResponseStateStatus.Failed,
)
}
}
}
private async recordAgentEvent(event: AgentEvent, conversationId: string) {
switch (event.type) {
case "message_created":
await this.conversationStorage.appendEntry(conversationId, {
kind: ConversationEntryKind.AssistantMessage,
payload: {
role: "assistant",
parts: [{ type: "text", text: event.text }],
} satisfies AssistantMessagePayload,
})
break
case "tool_started":
await this.conversationStorage.appendEntry(conversationId, {
kind: ConversationEntryKind.ToolCall,
payload: {
toolName: event.toolName,
} satisfies ToolCallPayload,
})
break
case "tool_finished":
await this.conversationStorage.appendEntry(conversationId, {
kind: ConversationEntryKind.ToolResult,
payload: {
toolName: event.toolName,
ok: event.ok,
} satisfies ToolResultPayload,
})
break
}
}
}

View File

@@ -1,9 +1,9 @@
import { ConversationEntryKind } from "@freya/core"
import { beforeEach, describe, expect, mock, test } from "bun:test" import { beforeEach, describe, expect, mock, test } from "bun:test"
import type { QueryAgentToolbox } from "./query-agent-toolbox.ts" import type { QueryAgentToolbox } from "./query-agent-toolbox.ts"
import type { QueryAgentStreamEvent } from "./query-agent.ts" import type { QueryAgentStreamEvent } from "./query-agent.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { QueryAgentEvent } from "./query-agent.ts" import { QueryAgentEvent } from "./query-agent.ts"
interface FakePiSession { interface FakePiSession {

View File

@@ -33,25 +33,13 @@ import {
import { createSessionManager } from "./session-manager.ts" import { createSessionManager } from "./session-manager.ts"
import { createFreyaAgentTools, FREYA_AGENT_TOOL_NAMES } from "./tools.ts" import { createFreyaAgentTools, FREYA_AGENT_TOOL_NAMES } from "./tools.ts"
/** Active Pi SDK session instance returned by createAgentSession. */
type PiSession = Awaited<ReturnType<typeof createAgentSession>>["session"] type PiSession = Awaited<ReturnType<typeof createAgentSession>>["session"]
/** Pi event emitted when a message finishes. */
type PiMessageEndEvent = Extract<AgentSessionEvent, { type: "message_end" }> type PiMessageEndEvent = Extract<AgentSessionEvent, { type: "message_end" }>
/** Message payload carried by Pi's message-end event. */
type PiAgentMessage = PiMessageEndEvent["message"] type PiAgentMessage = PiMessageEndEvent["message"]
/** Pi event emitted when an agent run finishes. */
type PiAgentEndEvent = Extract<AgentSessionEvent, { type: "agent_end" }> type PiAgentEndEvent = Extract<AgentSessionEvent, { type: "agent_end" }>
/** Session manager created for Pi conversation replay. */
type PiSessionManager = ReturnType<typeof createSessionManager> type PiSessionManager = ReturnType<typeof createSessionManager>
/** Message shape accepted by the replay session manager. */
type PiSessionMessage = Parameters<PiSessionManager["appendMessage"]>[0] type PiSessionMessage = Parameters<PiSessionManager["appendMessage"]>[0]
/** Configuration for the Pi-backed query agent. */
export interface PiQueryAgentConfig { export interface PiQueryAgentConfig {
toolbox: QueryAgentToolbox toolbox: QueryAgentToolbox
apiKey?: string apiKey?: string
@@ -166,16 +154,6 @@ export class PiQueryAgent implements QueryAgent {
this.handlePiEvent(event, pushRunEvent) this.handlePiEvent(event, pushRunEvent)
}) })
input.signal?.addEventListener(
"abort",
async () => {
await session.abort()
close()
unsubscribe()
},
{ once: true },
)
session session
.prompt(input.message) .prompt(input.message)
.then(() => { .then(() => {

View File

@@ -2,7 +2,6 @@ export interface QueryAgentAsk {
message: string message: string
conversationId?: string conversationId?: string
userMessageEntry?: QueryAgentConversationEntryRef userMessageEntry?: QueryAgentConversationEntryRef
signal?: AbortSignal
} }
export type QueryAgentStreamEvent = export type QueryAgentStreamEvent =

View File

@@ -1,70 +0,0 @@
import type { ConversationStorage } from "../conversations/storage"
import type { AgentWorkScheduler } from "./scheduler"
interface AgentResponseReconcilerConfig {
storage: ConversationStorage
interval: number
scheduler: AgentWorkScheduler
signal: AbortSignal
}
export class AgentResponseReconciler {
private storage: ConversationStorage
private interval: number
private scheduler: AgentWorkScheduler
private signal: AbortSignal
private stopLoop: ReturnType<typeof setInterval> | null = null
constructor({ storage, interval, scheduler, signal }: AgentResponseReconcilerConfig) {
this.storage = storage
this.interval = interval
this.scheduler = scheduler
this.signal = signal
}
start() {
this.signal.throwIfAborted()
this.signal.addEventListener(
"abort",
() => {
if (this.stopLoop !== null) {
clearInterval(this.stopLoop)
this.stopLoop = null
}
},
{ once: true },
)
this.stopLoop = setInterval(this.reconcile.bind(this), this.interval)
}
private async reconcile() {
// enqueue pending responses
const pendingStates = await this.storage.listPendingResponseStates()
const now = new Date().getTime()
for (const state of pendingStates) {
if (state.maxWaitUntil.getTime() < now) {
this.scheduler.enqueueAgentResponse(state.conversationId)
}
}
// re-enqueue stuck responses
const runningStates = await this.storage.listRunningResponseStates()
const stuckIds: string[] = []
for (const state of runningStates) {
if (state.runningSince && Math.max(now - state.runningSince.getTime(), 0) > 5 * 1000 * 60) {
// if the response is running for more than 5 minutes
// we assume that its stuck and enqueue it for retry
stuckIds.push(state.conversationId)
}
}
if (stuckIds.length > 0) {
await this.storage.markResponseStateStatus(stuckIds, "pending")
for (const id of stuckIds) {
this.scheduler.enqueueAgentResponse(id)
}
}
}
}

View File

@@ -1,160 +0,0 @@
import type { UserEvent } from "@freya/agent-protocol"
import { ConversationEntryKind, UserMessagePayload } from "@freya/core"
import type { ConversationStorage } from "../conversations/storage"
import type { Job, JobRegistry } from "../lib/job"
import type { AgentResponseJobPayload } from "./job"
import { ConversationNotFoundError } from "../conversations/errors";
import { ConversationResponseStateStatus } from "../db/schema";
interface AgentMessageSchedulerConfig {
storage: ConversationStorage
maxWaitTime: number
/**
* How long to wait before responding to the user.
*/
waitTIme: number
jobRegistry: JobRegistry<AgentResponseJobPayload>
}
/**
* Schedules and manages the flow of messages between the user and the query agent for a specific conversation.
*/
export class AgentWorkScheduler {
private conversationStorage: ConversationStorage
private jobRegistry: JobRegistry<AgentResponseJobPayload>
private timing: {
maxWaitTime: number
waitTime: number
}
private timers = new Map<string, ReturnType<typeof setTimeout>>()
private runningJobs = new Map<string, Job<AgentResponseJobPayload>>()
constructor(config: AgentMessageSchedulerConfig) {
this.conversationStorage = config.storage
this.jobRegistry = config.jobRegistry
this.timing = {
maxWaitTime: config.maxWaitTime,
waitTime: config.waitTIme,
}
this.jobRegistry.addEventListener("settled", this.eraseJob.bind(this))
this.jobRegistry.addEventListener("cancelled", this.eraseJob.bind(this))
}
async receiveMessage(conversationId: string, message: string) {
await this.conversationStorage.transaction(async (storage) => {
const now = new Date()
const entry = await storage.appendEntry(conversationId, {
kind: ConversationEntryKind.UserMessage,
payload: {
role: "user",
parts: [{ type: "text", text: message }],
} satisfies UserMessagePayload,
})
await storage.upsertConversationResponseState(conversationId, {
maxWaitUntil: new Date(now.getTime() + this.timing.maxWaitTime),
pendingSinceEntryId: entry.id,
status: "pending",
})
return entry
})
this.scheduleAgentResponse(conversationId, this.timing.waitTime)
}
async receiveUserEvent(conversationId: string, event: UserEvent) {
if (event.type === "typing") {
await this.delayAgentResponse(conversationId)
}
}
enqueueAgentResponse(conversationId: string): void {
const existing = this.timers.get(conversationId)
if (existing) {
clearTimeout(existing)
this.timers.delete(conversationId)
}
this.cancelCurrentJob(conversationId)
const job = this.jobRegistry.addJob({
payload: { conversationId },
})
this.runningJobs.set(conversationId, job)
}
private async delayAgentResponse(conversationId: string) {
this.cancelCurrentJob(conversationId);
try {
const ok = await this.conversationStorage.transaction(async (storage) => {
const state = await storage.findConversationResponseState(conversationId);
if (state && state.status !== ConversationResponseStateStatus.Failed) {
await storage.updateConversationResponseState(conversationId, {
status: ConversationResponseStateStatus.Pending,
// the agent response was cancelled, so its no longer running
// clear runningSince timestamp
runningSince: null,
})
return true
}
return false
})
if (ok) {
await this.scheduleAgentResponse(conversationId, this.timing.waitTime)
}
} catch (error) {
if (error instanceof ConversationNotFoundError) {
// the user is typing but there isn't a scheduled agent response yet
// which means the user is typing their first message after the agent has previously responded
// swallow the error
} else {
console.error("[agent response scheduler] error delaying agent response", error)
}
return
}
}
private async scheduleAgentResponse(conversationId: string, delay: number) {
const existing = this.timers.get(conversationId)
if (existing) {
clearTimeout(existing)
}
this.cancelCurrentJob(conversationId)
this.timers.set(
conversationId,
setTimeout(() => {
this.enqueueAgentResponse(conversationId)
}, delay),
)
}
/**
* cancels the current job for agent response for the given conversation id
* no-op if there is no active job for the conversation.
*/
private cancelCurrentJob(conversationId: string): void {
const job = this.runningJobs.get(conversationId)
if (!job) return
// If an active response is working on stale context, abort it so the next
// job can answer using the latest pending user messages.
this.jobRegistry.cancelJob(job)
}
private eraseJob(job: Job<AgentResponseJobPayload>) {
if (this.runningJobs.get(job.payload.conversationId) === job) {
this.runningJobs.delete(job.payload.conversationId)
}
}
}

View File

@@ -1,66 +0,0 @@
import type { UserEvent } from "@freya/agent-protocol"
import type { ConversationStorage } from "../conversations/storage"
import type { NotificationCentral } from "../notification/notification-central"
import type { UserSessionManager } from "../session"
import { JobRegistry } from "../lib/job"
import { Worker } from "../lib/worker"
import { AgentResponseJobExecutor, type AgentResponseJobPayload } from "./job"
import { AgentResponseReconciler } from "./reconciler"
import { AgentWorkScheduler } from "./scheduler"
interface AgentServiceConfig {
storage: ConversationStorage
userSessionManager: UserSessionManager
notificationCentral: NotificationCentral
signal: AbortSignal
}
export class AgentService {
private readonly storage: ConversationStorage
private readonly scheduler: AgentWorkScheduler
private readonly reconciler: AgentResponseReconciler
private readonly worker: Worker<AgentResponseJobPayload>
private readonly jobRegistry = new JobRegistry<AgentResponseJobPayload>()
constructor({ storage, userSessionManager, notificationCentral, signal }: AgentServiceConfig) {
this.storage = storage
this.scheduler = new AgentWorkScheduler({
storage,
jobRegistry: this.jobRegistry,
waitTIme: 5 * 1000,
maxWaitTime: 5 * 1000 * 60,
})
this.reconciler = new AgentResponseReconciler({
signal,
storage: this.storage,
interval: 60 * 1000,
scheduler: this.scheduler,
})
this.worker = new Worker<AgentResponseJobPayload>({
signal,
concurrency: 10,
registry: this.jobRegistry,
runner: new AgentResponseJobExecutor({
conversationStorage: storage,
notificationCentral,
userSessionManager,
}),
})
}
start() {
this.worker.start()
this.reconciler.start()
}
async scheduleAgentResponse(conversationId: string, message: string) {
await this.scheduler.receiveMessage(conversationId, message)
}
async handleUserEvent(conversationId: string, event: UserEvent) {
await this.scheduler.receiveUserEvent(conversationId, event)
}
}

View File

@@ -1,8 +1,8 @@
import { ConversationEntryKind } from "@freya/core"
import { describe, expect, test } from "bun:test" import { describe, expect, test } from "bun:test"
import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts" import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { createSessionManager } from "./session-manager.ts" import { createSessionManager } from "./session-manager.ts"
describe("createSessionManager", () => { describe("createSessionManager", () => {

View File

@@ -1,21 +1,18 @@
import { SessionManager } from "@earendil-works/pi-coding-agent" import { SessionManager } from "@earendil-works/pi-coding-agent"
import { tmpdir } from "node:os"
import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts"
import { import {
AssistantMessagePayload, AssistantMessagePayload,
ContextSummaryPayload, ContextSummaryPayload,
ConversationEntryKind, ConversationEntryKind,
UserMessagePayload, UserMessagePayload,
} from "@freya/core" } from "../conversations/types.ts"
import { tmpdir } from "node:os"
import type { ConversationStorageEntry } from "./conversation-recording-query-agent.ts"
/** Message shape accepted by Pi's SessionManager.appendMessage API. */
type PiMessage = Parameters<SessionManager["appendMessage"]>[0] type PiMessage = Parameters<SessionManager["appendMessage"]>[0]
/** Assistant message variant required when replaying stored assistant entries. */
type PiAssistantMessage = Extract<PiMessage, { role: "assistant" }> type PiAssistantMessage = Extract<PiMessage, { role: "assistant" }>
/** Inputs required to rebuild a Pi session manager from stored conversation entries. */
export interface CreateSessionManagerInput { export interface CreateSessionManagerInput {
cwd?: string cwd?: string
entries: ConversationStorageEntry[] entries: ConversationStorageEntry[]

View File

@@ -9,6 +9,7 @@ import type {
QueryAgentEventListener, QueryAgentEventListener,
QueryAgentStreamEvent, QueryAgentStreamEvent,
} from "./query-agent.ts" } from "./query-agent.ts"
import type { AgentResponseStreamItem } from "./streaming.ts"
import { streamAgentResponse } from "./streaming.ts" import { streamAgentResponse } from "./streaming.ts"
@@ -46,13 +47,17 @@ describe("streamAgentResponse", () => {
{ type: "done" }, { type: "done" },
]) ])
const events = await collectStreamAgentResponse( const { events, result } = await collectStreamAgentResponse(
streamAgentResponse({ streamAgentResponse({
agent, agent,
input: { message: "hello" }, input: { message: "hello" },
}), }),
) )
expect(result).toEqual({
conversationId: "conversation-1",
message: "First message\nSecond message\nThird message",
})
expect(events).toEqual([ expect(events).toEqual([
{ type: "conversation_started", conversationId: "conversation-1" }, { type: "conversation_started", conversationId: "conversation-1" },
{ type: "message_created", text: "First message" }, { type: "message_created", text: "First message" },
@@ -69,13 +74,17 @@ describe("streamAgentResponse", () => {
{ type: "done" }, { type: "done" },
]) ])
const events = await collectStreamAgentResponse( const { events, result } = await collectStreamAgentResponse(
streamAgentResponse({ streamAgentResponse({
agent, agent,
input: { message: "hello" }, input: { message: "hello" },
}), }),
) )
expect(result).toEqual({
conversationId: "conversation-1",
message: " const value = 1 \n\n return value",
})
expect(events).toEqual([ expect(events).toEqual([
{ type: "conversation_started", conversationId: "conversation-1" }, { type: "conversation_started", conversationId: "conversation-1" },
{ type: "message_created", text: " const value = 1 " }, { type: "message_created", text: " const value = 1 " },
@@ -113,12 +122,28 @@ describe("streamAgentResponse", () => {
}) })
async function collectStreamAgentResponse( async function collectStreamAgentResponse(
stream: AsyncIterable<AgentEvent>, stream: AsyncIterable<AgentResponseStreamItem>,
events: AgentEvent[] = [], events: AgentEvent[] = [],
): Promise<AgentEvent[]> { ): Promise<{
for await (const event of stream) { events: AgentEvent[]
events.push(event) result: { message: string; conversationId: string }
}> {
let result: { message: string; conversationId: string } | null = null
for await (const item of stream) {
switch (item.type) {
case "event":
events.push(item.event)
break
case "result":
result = item.result
break
}
} }
return events if (!result) {
throw new Error("Expected stream result")
}
return { events, result }
} }

View File

@@ -1,8 +1,10 @@
import type { AgentEvent } from "@freya/agent-protocol" import type { AgentEvent, SendMessageResult } from "@freya/agent-protocol"
import type { QueryAgent, QueryAgentAsk } from "./query-agent.ts" import type { QueryAgent, QueryAgentAsk } from "./query-agent.ts"
export type AgentResponseStreamItem = { type: "event"; event: AgentEvent } export type AgentResponseStreamItem =
| { type: "event"; event: AgentEvent }
| { type: "result"; result: SendMessageResult }
export async function* streamAgentResponse({ export async function* streamAgentResponse({
agent, agent,
@@ -10,18 +12,18 @@ export async function* streamAgentResponse({
}: { }: {
agent: QueryAgent agent: QueryAgent
input: QueryAgentAsk input: QueryAgentAsk
}): AsyncGenerator<AgentEvent, void, void> { }): AsyncGenerator<AgentResponseStreamItem, void, void> {
let message = "" let message = ""
let conversationId: string | null = null let conversationId: string | null = null
const splitter = new AgentMessageSplitter() const splitter = new AgentMessageSplitter()
function messageEvent(text: string): AgentEvent | null { function messageEvent(text: string): AgentResponseStreamItem | null {
if (text.trim() === "") return null if (text.trim() === "") return null
return { type: "message_created", text } return { type: "event", event: { type: "message_created", text } }
} }
function flushPendingMessage(): AgentEvent | null { function flushPendingMessage(): AgentResponseStreamItem | null {
const text = splitter.flush() const text = splitter.flush()
if (text === null) return null if (text === null) return null
@@ -29,14 +31,10 @@ export async function* streamAgentResponse({
} }
for await (const event of agent.ask(input)) { for await (const event of agent.ask(input)) {
if (input.signal?.aborted) {
break
}
switch (event.type) { switch (event.type) {
case "conversation": case "conversation":
conversationId = event.conversationId conversationId = event.conversationId
yield { type: "conversation_started", conversationId } yield { type: "event", event: { type: "conversation_started", conversationId } }
break break
case "text_delta": case "text_delta":
@@ -52,7 +50,7 @@ export async function* streamAgentResponse({
const item = flushPendingMessage() const item = flushPendingMessage()
if (item) yield item if (item) yield item
} }
yield { type: "tool_started", toolName: event.toolName } yield { type: "event", event: { type: "tool_started", toolName: event.toolName } }
break break
case "tool_end": case "tool_end":
@@ -61,9 +59,12 @@ export async function* streamAgentResponse({
if (item) yield item if (item) yield item
} }
yield { yield {
type: "tool_finished", type: "event",
toolName: event.toolName, event: {
ok: event.ok, type: "tool_finished",
toolName: event.toolName,
ok: event.ok,
},
} }
break break
@@ -72,7 +73,7 @@ export async function* streamAgentResponse({
const item = flushPendingMessage() const item = flushPendingMessage()
if (item) yield item if (item) yield item
} }
yield { type: "message_failed", error: event.message } yield { type: "event", event: { type: "message_failed", error: event.message } }
throw new Error(event.message) throw new Error(event.message)
case "done": case "done":
@@ -80,15 +81,26 @@ export async function* streamAgentResponse({
const item = flushPendingMessage() const item = flushPendingMessage()
if (item) yield item if (item) yield item
} }
yield { type: "message_finished" } const result = createResult(message, conversationId)
yield { type: "event", event: { type: "message_finished" } }
yield { type: "result", result }
return return
} }
} }
const item = flushPendingMessage() const item = flushPendingMessage()
if (item) yield item if (item) yield item
const result = createResult(message, conversationId)
yield { type: "event", event: { type: "message_finished" } }
yield { type: "result", result }
}
yield { type: "message_finished" } function createResult(message: string, conversationId: string | null): SendMessageResult {
if (!conversationId) {
throw new Error("Agent response stream ended without a conversation id")
}
return { message, conversationId }
} }
class AgentMessageSplitter { class AgentMessageSplitter {

View File

@@ -1,10 +1,8 @@
import { describe, expect, test } from "bun:test" import { describe, expect, test } from "bun:test"
import { Hono } from "hono" import { Hono } from "hono"
import type { ConversationStorage } from "../conversations/storage.ts" import type { UserSessionManager } from "../session/index.ts"
import type { NotificationCentral } from "../notification/notification-central.ts"
import type { AgentService } from "./service.ts"
import { registerAgentWebSocketHandlers } from "./ws.ts" import { registerAgentWebSocketHandlers } from "./ws.ts"
describe("agent websocket handler", () => { describe("agent websocket handler", () => {
@@ -13,9 +11,7 @@ describe("agent websocket handler", () => {
const app = new Hono() const app = new Hono()
registerAgentWebSocketHandlers(app, { registerAgentWebSocketHandlers(app, {
agentService: {} as AgentService, sessionManager: {} as UserSessionManager,
storage: {} as ConversationStorage,
notificationCentral: {} as NotificationCentral,
corsMiddleware: async (c, next) => { corsMiddleware: async (c, next) => {
const origin = c.req.header("origin") const origin = c.req.header("origin")
if (origin && origin !== "https://app.freya.test") { if (origin && origin !== "https://app.freya.test") {
@@ -48,9 +44,7 @@ describe("agent websocket handler", () => {
const app = new Hono() const app = new Hono()
registerAgentWebSocketHandlers(app, { registerAgentWebSocketHandlers(app, {
agentService: {} as AgentService, sessionManager: {} as UserSessionManager,
storage: {} as ConversationStorage,
notificationCentral: {} as NotificationCentral,
corsMiddleware: async (_c, next) => { corsMiddleware: async (_c, next) => {
await next() await next()
}, },

View File

@@ -1,58 +1,53 @@
import type { AgentClientApi, AgentServerApi, UserEvent } from "@freya/agent-protocol" import type { AgentClientApi, AgentServerApi, SendMessageResult } from "@freya/agent-protocol"
import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc" import type { JrpcChannel, JrpcMessage, JsonRpcMessage } from "@nym.sh/jrpc"
import type { Hono, MiddlewareHandler } from "hono" import type { Hono, MiddlewareHandler } from "hono"
import type { WSContext } from "hono/ws" import type { WSContext } from "hono/ws"
import { JsonRpcClient, JsonRpcServer, deserializeJrpcMessage } from "@nym.sh/jrpc" import { JsonRpcClient, JsonRpcServer } from "@nym.sh/jrpc"
import { type } from "arktype"
import { upgradeWebSocket, websocket } from "hono/bun" import { upgradeWebSocket, websocket } from "hono/bun"
import type { AuthSessionMiddleware } from "../auth/session-middleware.ts" import type { AuthSessionMiddleware } from "../auth/session-middleware.ts"
import type { ConversationStorage } from "../conversations/storage.ts" import type { UserSessionManager } from "../session/index.ts"
import type {
NotificationCentral, import { streamAgentResponse } from "./streaming.ts"
NotificationPayload,
} from "../notification/notification-central.ts"
import type { AgentService } from "./service.ts"
interface AgentWebSocketHandlerDeps { interface AgentWebSocketHandlerDeps {
agentService: AgentService sessionManager: UserSessionManager
storage: ConversationStorage
notificationCentral: NotificationCentral
authSessionMiddleware: AuthSessionMiddleware authSessionMiddleware: AuthSessionMiddleware
corsMiddleware: MiddlewareHandler corsMiddleware: MiddlewareHandler
} }
interface ValidSendMessageInput {
message: string
}
export const agentWebSocket = websocket export const agentWebSocket = websocket
const SendMessageInputBody = type({
"+": "reject",
message: "string",
})
export function registerAgentWebSocketHandlers( export function registerAgentWebSocketHandlers(
app: Hono, app: Hono,
{ { sessionManager, authSessionMiddleware, corsMiddleware }: AgentWebSocketHandlerDeps,
agentService,
storage,
notificationCentral,
authSessionMiddleware,
corsMiddleware,
}: AgentWebSocketHandlerDeps,
): void { ): void {
app.get( app.get(
"/api/agent/ws", "/api/agent/ws",
corsMiddleware, corsMiddleware,
authSessionMiddleware, authSessionMiddleware,
upgradeWebSocket(async (c) => { upgradeWebSocket((c) => {
const user = c.get("user") const user = c.get("user")
if (!user) { if (!user) {
throw new Error("Authenticated WebSocket user missing") throw new Error("Authenticated WebSocket user missing")
} }
const conversation = await storage.getOrCreateConversation(user.id)
const channel = new HonoWebSocketJrpcChannel() const channel = new HonoWebSocketJrpcChannel()
const connection = new AgentRpcConnection({ const connection = new AgentRpcConnection({
channel, channel,
notificationCentral, sessionManager,
agentService,
userId: user.id, userId: user.id,
conversationId: conversation.id,
}) })
return { return {
@@ -69,7 +64,6 @@ export function registerAgentWebSocketHandlers(
}, },
onClose() { onClose() {
connection.close()
channel.close() channel.close()
}, },
} }
@@ -80,52 +74,54 @@ export function registerAgentWebSocketHandlers(
class AgentRpcConnection implements AgentServerApi { class AgentRpcConnection implements AgentServerApi {
private readonly client: JsonRpcClient<AgentClientApi> private readonly client: JsonRpcClient<AgentClientApi>
private readonly server: JsonRpcServer<AgentServerApi> private readonly server: JsonRpcServer<AgentServerApi>
private readonly agentService: AgentService private activeMessage: Promise<SendMessageResult> | null = null
private readonly notificationCentral: NotificationCentral private readonly sessionManager: UserSessionManager
private readonly userId: string private readonly userId: string
private readonly conversationId: string
private cleanup: (() => void) | null = null
constructor({ constructor({
agentService,
notificationCentral,
channel, channel,
sessionManager,
userId, userId,
conversationId,
}: { }: {
agentService: AgentService
notificationCentral: NotificationCentral
channel: JrpcChannel channel: JrpcChannel
sessionManager: UserSessionManager
userId: string userId: string
conversationId: string
}) { }) {
this.client = new JsonRpcClient<AgentClientApi>(channel) this.sessionManager = sessionManager
this.agentService = agentService
this.notificationCentral = notificationCentral
this.userId = userId this.userId = userId
this.conversationId = conversationId this.client = new JsonRpcClient<AgentClientApi>(channel)
this.server = new JsonRpcServer<AgentServerApi>( this.server = new JsonRpcServer<AgentServerApi>(
{ {
sendMessage: this.sendMessage.bind(this), sendMessage: this.sendMessage.bind(this),
notify: this.notify.bind(this),
ping: this.ping.bind(this), ping: this.ping.bind(this),
}, },
channel, channel,
) )
} }
notify(event: UserEvent): void { start(): Promise<void> {
this.agentService.handleUserEvent(this.conversationId, event) return this.server.start()
} }
async sendMessage(message: string): Promise<boolean> { async sendMessage(message: string): Promise<SendMessageResult> {
const parsed = SendMessageInputBody({ message })
if (parsed instanceof type.errors) {
throw new Error(parsed.summary)
}
if (this.activeMessage) {
throw new Error("A message is already running")
}
const run = this.runMessage(parsed)
this.activeMessage = run
try { try {
await this.agentService.scheduleAgentResponse(this.conversationId, message) return await run
return true } finally {
} catch (error) { if (this.activeMessage === run) {
console.log("[agent rpc connection] error when scheduling agent response", error) this.activeMessage = null
return false }
} }
} }
@@ -133,22 +129,26 @@ class AgentRpcConnection implements AgentServerApi {
return "pong" return "pong"
} }
async start() { private async runMessage(input: ValidSendMessageInput): Promise<SendMessageResult> {
this.cleanup = this.notificationCentral.registerListenerForUser( const session = await this.sessionManager.getOrCreate(this.userId)
this.userId, let result: SendMessageResult | null = null
this.onNotificationReceived.bind(this),
)
await this.server.start()
}
close() { for await (const item of streamAgentResponse({ agent: session.agent, input })) {
this.cleanup?.() switch (item.type) {
} case "event":
await this.client.call("notify", item.event)
private async onNotificationReceived(notification: NotificationPayload) { break
if (notification.kind === "agent") { case "result":
await this.client.call("notify", notification.payload) result = item.result
break
}
} }
if (!result) {
throw new Error("Agent response stream ended without a result")
}
return result
} }
} }
@@ -171,11 +171,7 @@ class HonoWebSocketJrpcChannel implements JrpcChannel {
} }
receive(message: unknown): void { receive(message: unknown): void {
if (typeof message !== "string") { const parsed = parseJrpcMessage(message)
return
}
const parsed = deserializeJrpcMessage(message)
if (!parsed) { if (!parsed) {
this.ws?.close(1003, "Invalid JSON-RPC message") this.ws?.close(1003, "Invalid JSON-RPC message")
return return
@@ -240,6 +236,52 @@ class HonoWebSocketJrpcChannel implements JrpcChannel {
} }
} }
function parseJrpcMessage(message: unknown): JrpcMessage | null {
const text = webSocketMessageText(message)
if (text === null) return null
try {
const value: unknown = JSON.parse(text)
return isJrpcMessage(value) ? value : null
} catch {
return null
}
}
function webSocketMessageText(message: unknown): string | null {
if (typeof message === "string") return message
if (message instanceof ArrayBuffer) return Buffer.from(message).toString("utf8")
if (ArrayBuffer.isView(message)) {
return Buffer.from(message.buffer, message.byteOffset, message.byteLength).toString("utf8")
}
return null
}
function isJrpcMessage(value: unknown): value is JrpcMessage {
if (typeof value !== "object" || value === null) return false
if (!("jsonrpc" in value) || value.jsonrpc !== "2.0") return false
if ("method" in value) {
return "id" in value && typeof value.id === "number" && typeof value.method === "string"
}
if ("result" in value) {
return "id" in value && typeof value.id === "number"
}
if ("error" in value) {
return (
"id" in value &&
typeof value.id === "number" &&
typeof value.error === "object" &&
value.error !== null
)
}
return false
}
function errorMessage(error: unknown): string { function errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error) return error instanceof Error ? error.message : String(error)
} }

View File

@@ -1,686 +0,0 @@
import {
AssistantMessagePayload,
AttachmentPayload,
ConversationEntryKind,
ConversationEntryMetadata,
ConversationEntryVisibility,
ContextSummaryPayload,
GenericObjectPayload,
UserMessagePayload,
type ConversationEntryPayload,
} from "@freya/core"
import { type } from "arktype"
import { and, asc, desc, eq, gte, inArray } from "drizzle-orm"
import { alias } from "drizzle-orm/pg-core"
import type { Database } from "../db/index.ts"
import type {
AppendAttachmentEntryInput,
AppendAttachmentEntryResult,
AppendConversationEntryInput,
ConversationEntryRow,
ConversationResponseStateRow,
ConversationRow,
ConversationStorage,
CreateFileInput,
FileRow,
ListConversationEntriesParams,
UpdateConversationResponseStateInput,
UpsertConversationResponseStateInput,
} from "./storage.ts"
import {
conversationEntries,
ConversationResponseStateStatus,
conversationResponseState as conversationResponseStateTable,
conversations as conversationsTable,
files,
user,
} from "../db/schema.ts"
import { ConversationNotFoundError } from "./errors.ts"
const conversationEntryKind = type.enumerated(...Object.values(ConversationEntryKind))
const conversationEntryVisibility = type.enumerated(...Object.values(ConversationEntryVisibility))
const pendingSinceEntry = alias(conversationEntries, "pending_since_entry")
export class DrizzleConversationStorage implements ConversationStorage {
private readonly db: Database
private readonly inTransaction: boolean
constructor(db: Database, inTransaction = false) {
this.db = db
this.inTransaction = inTransaction
}
async transaction<T>(tx: (storage: ConversationStorage) => T | Promise<T>): Promise<T> {
if (this.inTransaction) return tx(this)
return this.db.transaction(async (transactionDb) =>
tx(new DrizzleConversationStorage(transactionDb, true)),
)
}
async createConversation(userId: string): Promise<ConversationRow> {
return insertConversation(this.db, userId)
}
async listUserConversations(userId: string): Promise<ConversationRow[]> {
return this.db
.select()
.from(conversationsTable)
.where(eq(conversationsTable.userId, userId))
.orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt))
}
async findConversation(conversationId: string): Promise<ConversationRow | null> {
return findConversation(this.db, conversationId)
}
async getOrCreateConversation(userId: string): Promise<ConversationRow> {
return this.write(async (db) => {
await requireUserForUpdate(db, userId)
const existing = await latestConversation(db, userId)
if (existing) return existing
return insertConversation(db, userId)
})
}
async createFile(userId: string, input: CreateFileInput): Promise<FileRow> {
return insertFile(this.db, userId, input)
}
async appendEntry(
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationEntryRow> {
return this.write((db) => appendEntryToConversation(db, null, conversationId, input))
}
async appendAttachmentEntry(
conversationId: string,
input: AppendAttachmentEntryInput,
): Promise<AppendAttachmentEntryResult> {
return this.write((db) => appendAttachmentEntryToConversation(db, null, conversationId, input))
}
async nextSequence(conversationId: string): Promise<number> {
return nextSequence(this.db, conversationId)
}
async listUserConversationEntries(
userId: string,
conversationId: string,
params: ListConversationEntriesParams = {},
): Promise<ConversationEntryRow[]> {
if (!(await findUserConversation(this.db, userId, conversationId))) {
throw new ConversationNotFoundError(conversationId, userId)
}
if (params.visibility) {
return this.db
.select()
.from(conversationEntries)
.where(
and(
eq(conversationEntries.conversationId, conversationId),
eq(conversationEntries.visibility, params.visibility),
),
)
.orderBy(asc(conversationEntries.sequence))
}
return this.db
.select()
.from(conversationEntries)
.where(eq(conversationEntries.conversationId, conversationId))
.orderBy(asc(conversationEntries.sequence))
}
async listPendingUserConversationEntries(
userId: string,
conversationId: string,
): Promise<ConversationEntryRow[]> {
const entries = await this.db
.select({ entry: conversationEntries })
.from(conversationResponseStateTable)
.innerJoin(
conversationsTable,
and(
eq(conversationsTable.id, conversationResponseStateTable.conversationId),
eq(conversationsTable.userId, userId),
),
)
.innerJoin(
pendingSinceEntry,
and(
eq(pendingSinceEntry.id, conversationResponseStateTable.pendingSinceEntryId),
eq(pendingSinceEntry.conversationId, conversationResponseStateTable.conversationId),
),
)
.innerJoin(
conversationEntries,
and(
eq(conversationEntries.conversationId, conversationResponseStateTable.conversationId),
eq(conversationEntries.kind, ConversationEntryKind.UserMessage),
gte(conversationEntries.sequence, pendingSinceEntry.sequence),
),
)
.where(
and(
eq(conversationResponseStateTable.conversationId, conversationId),
eq(conversationEntries.conversationId, conversationId),
),
)
.orderBy(asc(conversationEntries.sequence))
if (entries.length > 0) return entries.map(({ entry }) => entry)
if (await findUserConversation(this.db, userId, conversationId)) return []
throw new ConversationNotFoundError(conversationId, userId)
}
async findConversationResponseState(
conversationId: string,
): Promise<ConversationResponseStateRow | null> {
const rows = await this.db
.select()
.from(conversationResponseStateTable)
.where(eq(conversationResponseStateTable.conversationId, conversationId))
.limit(1)
return rows[0] ?? null
}
async listPendingResponseStates(): Promise<ConversationResponseStateRow[]> {
const rows = await this.db
.select()
.from(conversationResponseStateTable)
.where(eq(conversationResponseStateTable.status, ConversationResponseStateStatus.Pending))
return rows
}
async listRunningResponseStates(): Promise<ConversationResponseStateRow[]> {
const rows = await this.db
.select()
.from(conversationResponseStateTable)
.where(eq(conversationResponseStateTable.status, ConversationResponseStateStatus.Running))
return rows
}
async upsertConversationResponseState(
conversationId: string,
input: UpsertConversationResponseStateInput,
): Promise<ConversationResponseStateRow> {
const now = new Date()
return this.write(async (db) => {
if (!(await findConversationByIdForUpdate(db, conversationId))) {
throw new ConversationNotFoundError(conversationId, "")
}
const rows = await db
.insert(conversationResponseStateTable)
.values({
conversationId,
status: input.status ?? ConversationResponseStateStatus.Pending,
pendingSinceEntryId: input.pendingSinceEntryId,
maxWaitUntil: input.maxWaitUntil,
runningSince: input.runningSince ?? null,
updatedAt: now,
})
.onConflictDoUpdate({
target: conversationResponseStateTable.conversationId,
set: {
status: input.status ?? ConversationResponseStateStatus.Pending,
maxWaitUntil: input.maxWaitUntil,
runningSince: input.runningSince ?? null,
updatedAt: now,
},
})
.returning()
return requireRow(rows)
})
}
async updateConversationResponseState(
conversationId: string,
input: UpdateConversationResponseStateInput,
): Promise<ConversationResponseStateRow | null> {
return this.write(async (db) => {
if (!(await findConversationByIdForUpdate(db, conversationId))) {
throw new ConversationNotFoundError(conversationId, "")
}
const rows = await db
.update(conversationResponseStateTable)
.set({
status: input.status,
pendingSinceEntryId: input.pendingSinceEntryId,
maxWaitUntil: input.maxWaitUntil,
runningSince: input.runningSince,
updatedAt: new Date(),
})
.where(eq(conversationResponseStateTable.conversationId, conversationId))
.returning()
return rows[0] ?? null
})
}
async markResponseStateStatus(
conversationIds: string[],
status: ConversationResponseStateStatus,
): Promise<ConversationResponseStateRow[]> {
return this.write(async (db) => {
const now = new Date()
let runningSince: Date | null
switch (status) {
case "pending":
case "failed":
runningSince = null
break
case "running":
runningSince = now
break
}
const rows = await db
.update(conversationResponseStateTable)
.set({
status,
runningSince,
updatedAt: now,
})
.where(inArray(conversationResponseStateTable.conversationId, conversationIds))
.returning()
return rows
})
}
async claimPendingConversationResponseState(
conversationId: string,
): Promise<ConversationResponseStateRow | null> {
return this.write(async (db) => {
const now = new Date()
const rows = await db
.update(conversationResponseStateTable)
.set({
status: "running",
runningSince: now,
updatedAt: now,
})
.where(
and(
eq(conversationResponseStateTable.conversationId, conversationId),
eq(conversationResponseStateTable.status, "pending"),
),
)
.returning()
return rows[0] ?? null
})
}
async clearConversationResponseState(conversationId: string): Promise<void> {
await this.write(async (db) => {
if (!(await findConversationByIdForUpdate(db, conversationId))) {
throw new ConversationNotFoundError(conversationId, "")
}
await db
.delete(conversationResponseStateTable)
.where(eq(conversationResponseStateTable.conversationId, conversationId))
})
}
private async write<T>(fn: (db: Database) => Promise<T>): Promise<T> {
if (this.inTransaction) return fn(this.db)
return this.db.transaction(fn)
}
}
export function createConversationStorage(db: Database): ConversationStorage {
return new DrizzleConversationStorage(db)
}
export function conversations(db: Database, userId: string) {
const storage = createConversationStorage(db)
return {
createConversation(): Promise<ConversationRow> {
return storage.createConversation(userId)
},
listConversations(): Promise<ConversationRow[]> {
return storage.listUserConversations(userId)
},
getConversation(conversationId: string): Promise<ConversationRow | null> {
return findUserConversation(db, userId, conversationId)
},
getOrCreateConversation(): Promise<ConversationRow> {
return storage.getOrCreateConversation(userId)
},
createFile(input: CreateFileInput): Promise<FileRow> {
return storage.createFile(userId, input)
},
appendEntry(
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationEntryRow> {
return db.transaction((tx) => appendEntryToConversation(tx, userId, conversationId, input))
},
appendAttachmentEntry(
conversationId: string,
input: AppendAttachmentEntryInput,
): Promise<AppendAttachmentEntryResult> {
return db.transaction((tx) =>
appendAttachmentEntryToConversation(tx, userId, conversationId, input),
)
},
listEntries(
conversationId: string,
params: ListConversationEntriesParams = {},
): Promise<ConversationEntryRow[]> {
return storage.listUserConversationEntries(userId, conversationId, params)
},
}
}
export function conversationResponse(db: Database, _userId: string, conversationId: string) {
const storage = createConversationStorage(db)
return {
get(): Promise<ConversationResponseStateRow | null> {
return storage.findConversationResponseState(conversationId)
},
upsert(input: UpsertConversationResponseStateInput): Promise<ConversationResponseStateRow> {
return storage.upsertConversationResponseState(conversationId, input)
},
update(
input: UpdateConversationResponseStateInput,
): Promise<ConversationResponseStateRow | null> {
return storage.updateConversationResponseState(conversationId, input)
},
clear(): Promise<void> {
return storage.clearConversationResponseState(conversationId)
},
}
}
function payloadForKind(
kind: ConversationEntryKind,
payload: AppendConversationEntryInput["payload"],
): ConversationEntryPayload {
switch (kind) {
case ConversationEntryKind.UserMessage:
return UserMessagePayload.assert(payload)
case ConversationEntryKind.AssistantMessage:
return AssistantMessagePayload.assert(payload)
case ConversationEntryKind.Attachment:
return AttachmentPayload.assert(payload)
case ConversationEntryKind.ContextSummary:
return ContextSummaryPayload.assert(payload)
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
case ConversationEntryKind.SystemNote:
return GenericObjectPayload.assert(payload)
}
}
async function appendEntryToConversation(
db: Database,
userId: string | null,
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationEntryRow> {
const kind = conversationEntryKind.assert(input.kind)
const visibility = conversationEntryVisibility.assert(
input.visibility ?? defaultVisibilityForKind(kind),
)
const payload = payloadForKind(kind, input.payload)
const metadata = ConversationEntryMetadata.assert(input.metadata ?? {})
let fileId: string | null = null
if (input.kind === ConversationEntryKind.Attachment) {
fileId = input.fileId
}
const conversation = userId
? await findConversationForUpdate(db, userId, conversationId)
: await findConversationByIdForUpdate(db, conversationId)
if (!conversation) {
throw new ConversationNotFoundError(conversationId, userId ?? "")
}
if (fileId) await requireFile(db, conversation.userId, fileId)
const sequence = await nextSequence(db, conversationId)
const rows = await db
.insert(conversationEntries)
.values({
conversationId,
sequence,
kind,
visibility,
fileId,
payload,
metadata,
})
.returning()
await touchConversation(db, conversation.userId, conversationId)
return requireRow(rows)
}
async function appendAttachmentEntryToConversation(
db: Database,
userId: string | null,
conversationId: string,
input: AppendAttachmentEntryInput,
): Promise<AppendAttachmentEntryResult> {
const payload = AttachmentPayload.assert(input.payload)
const visibility = conversationEntryVisibility.assert(
input.visibility ?? defaultVisibilityForKind(ConversationEntryKind.Attachment),
)
const metadata = ConversationEntryMetadata.assert(input.metadata ?? {})
const conversation = userId
? await findConversationForUpdate(db, userId, conversationId)
: await findConversationByIdForUpdate(db, conversationId)
if (!conversation) {
throw new ConversationNotFoundError(conversationId, userId ?? "")
}
const file = await insertFile(db, conversation.userId, input.file)
const sequence = await nextSequence(db, conversationId)
const rows = await db
.insert(conversationEntries)
.values({
conversationId,
sequence,
kind: ConversationEntryKind.Attachment,
visibility,
fileId: file.id,
payload,
metadata,
})
.returning()
await touchConversation(db, conversation.userId, conversationId)
return {
file,
entry: requireRow(rows),
}
}
async function requireUserForUpdate(db: Database, userId: string): Promise<void> {
const rows = await db
.select({ id: user.id })
.from(user)
.where(eq(user.id, userId))
.limit(1)
.for("update")
requireRow(rows, `User not found: ${userId}`)
}
export async function findConversation(
db: Database,
conversationId: string,
): Promise<ConversationRow | null> {
const rows = await db
.select()
.from(conversationsTable)
.where(eq(conversationsTable.id, conversationId))
.limit(1)
return rows[0] ?? null
}
async function findUserConversation(
db: Database,
userId: string,
conversationId: string,
): Promise<ConversationRow | null> {
const rows = await db
.select()
.from(conversationsTable)
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
.limit(1)
return rows[0] ?? null
}
async function findConversationForUpdate(
db: Database,
userId: string,
conversationId: string,
): Promise<ConversationRow | null> {
const rows = await db
.select()
.from(conversationsTable)
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
.limit(1)
.for("update")
return rows[0] ?? null
}
async function findConversationByIdForUpdate(
db: Database,
conversationId: string,
): Promise<ConversationRow | null> {
const rows = await db
.select()
.from(conversationsTable)
.where(eq(conversationsTable.id, conversationId))
.limit(1)
.for("update")
return rows[0] ?? null
}
async function latestConversation(db: Database, userId: string): Promise<ConversationRow | null> {
const rows = await db
.select()
.from(conversationsTable)
.where(eq(conversationsTable.userId, userId))
.orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt))
.limit(1)
return rows[0] ?? null
}
async function insertConversation(db: Database, userId: string): Promise<ConversationRow> {
const rows = await db
.insert(conversationsTable)
.values({
userId,
})
.returning()
return requireRow(rows)
}
async function requireFile(db: Database, userId: string, fileId: string): Promise<FileRow> {
const rows = await db
.select()
.from(files)
.where(and(eq(files.id, fileId), eq(files.userId, userId)))
.limit(1)
return requireRow(rows, `File not found: ${fileId}`)
}
async function insertFile(db: Database, userId: string, input: CreateFileInput): Promise<FileRow> {
const rows = await db
.insert(files)
.values({
userId,
storageKey: input.storageKey,
originalName: input.originalName ?? null,
mimeType: input.mimeType,
sizeBytes: input.sizeBytes,
metadata: input.metadata ?? {},
})
.returning()
return requireRow(rows)
}
async function touchConversation(
db: Database,
userId: string,
conversationId: string,
): Promise<void> {
await db
.update(conversationsTable)
.set({ updatedAt: new Date() })
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
}
async function nextSequence(db: Database, conversationId: string): Promise<number> {
const rows = await db
.select({ sequence: conversationEntries.sequence })
.from(conversationEntries)
.where(eq(conversationEntries.conversationId, conversationId))
.orderBy(desc(conversationEntries.sequence))
.limit(1)
return (rows[0]?.sequence ?? 0) + 1
}
function requireRow<T>(rows: T[], message = "Expected database row"): T {
const row = rows[0]
if (!row) throw new Error(message)
return row
}
function defaultVisibilityForKind(kind: ConversationEntryKind): ConversationEntryVisibility {
switch (kind) {
case ConversationEntryKind.UserMessage:
case ConversationEntryKind.AssistantMessage:
case ConversationEntryKind.Attachment:
return ConversationEntryVisibility.UserVisible
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
case ConversationEntryKind.ContextSummary:
case ConversationEntryKind.SystemNote:
return ConversationEntryVisibility.Internal
}
}

View File

@@ -1,11 +0,0 @@
export class ConversationNotFoundError extends Error {
readonly conversationId: string
readonly userId: string
constructor(conversationId: string, userId: string) {
super(`Conversation "${conversationId}" not found for user "${userId}"`)
this.name = "ConversationNotFoundError"
this.conversationId = conversationId
this.userId = userId
}
}

View File

@@ -1,55 +1,21 @@
import { ConversationEntryKind, ConversationEntryVisibility } from "@freya/core"
import { beforeEach, describe, expect, mock, test } from "bun:test" import { beforeEach, describe, expect, mock, test } from "bun:test"
import { Hono } from "hono" import { Hono } from "hono"
import type { Database } from "../db/index.ts" import type { Database } from "../db/index.ts"
import type { import type { ConversationRow } from "./storage.ts"
ConversationEntryRow,
ConversationRow,
ListConversationEntriesParams,
} from "./storage.ts"
import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts" import { mockAuthSessionMiddleware } from "../auth/session-middleware.ts"
import { ConversationNotFoundError } from "./errors.ts"
import { registerConversationsHttpHandlers } from "./http.ts" import { registerConversationsHttpHandlers } from "./http.ts"
const MockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn" const MockUserId = "k7Gx2mPqRvNwYs9TdLfA4bHcJeUo1iZn"
const ConversationId = "11111111-1111-4111-8111-111111111111"
const MissingConversationId = "22222222-2222-4222-8222-222222222222"
const conversationRowsByUser = new Map<string, ConversationRow[]>() const conversationRowsByUser = new Map<string, ConversationRow[]>()
const conversationEntryRowsByUserAndConversation = new Map<string, ConversationEntryRow[]>()
const listEntriesCalls: Array<{
userId: string
conversationId: string
params: ListConversationEntriesParams
}> = []
mock.module("./db-storage.ts", () => ({ mock.module("./storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({ conversations: (_db: Database, userId: string) => ({
async listConversations(): Promise<ConversationRow[]> { async listConversations(): Promise<ConversationRow[]> {
return conversationRowsByUser.get(userId) ?? [] return conversationRowsByUser.get(userId) ?? []
}, },
async listEntries(
conversationId: string,
params: ListConversationEntriesParams = {},
): Promise<ConversationEntryRow[]> {
listEntriesCalls.push({ userId, conversationId, params })
const rows = conversationEntryRowsByUserAndConversation.get(
conversationEntriesKey(userId, conversationId),
)
if (!rows) {
throw new ConversationNotFoundError(conversationId, userId)
}
if (params.visibility) {
return rows.filter((row) => row.visibility === params.visibility)
}
return rows
},
}), }),
})) }))
@@ -78,39 +44,9 @@ function createConversationRow(
} }
} }
function createConversationEntryRow(
id: string,
conversationId: string,
sequence: number,
kind: ConversationEntryRow["kind"],
visibility: ConversationEntryRow["visibility"],
payload: ConversationEntryRow["payload"],
createdAt: string,
metadata: ConversationEntryRow["metadata"] = {},
fileId: string | null = null,
): ConversationEntryRow {
return {
id,
conversationId,
sequence,
kind,
visibility,
fileId,
payload,
metadata,
createdAt: new Date(createdAt),
}
}
function conversationEntriesKey(userId: string, conversationId: string): string {
return `${userId}:${conversationId}`
}
describe("GET /api/conversations", () => { describe("GET /api/conversations", () => {
beforeEach(() => { beforeEach(() => {
conversationRowsByUser.clear() conversationRowsByUser.clear()
conversationEntryRowsByUserAndConversation.clear()
listEntriesCalls.length = 0
}) })
test("returns 401 without auth", async () => { test("returns 401 without auth", async () => {
@@ -172,162 +108,3 @@ describe("GET /api/conversations", () => {
}) })
}) })
}) })
describe("GET /api/conversations/:id/entries", () => {
beforeEach(() => {
conversationRowsByUser.clear()
conversationEntryRowsByUserAndConversation.clear()
listEntriesCalls.length = 0
})
test("returns 401 without auth", async () => {
const app = buildTestApp()
const res = await app.request("/api/conversations/conversation-1/entries")
expect(res.status).toBe(401)
})
test("returns user-visible entries for the authenticated user", async () => {
conversationEntryRowsByUserAndConversation.set(
conversationEntriesKey(MockUserId, ConversationId),
[
createConversationEntryRow(
"entry-user",
ConversationId,
1,
ConversationEntryKind.UserMessage,
ConversationEntryVisibility.UserVisible,
{
role: "user",
parts: [{ type: "text", text: "What is on today?" }],
},
"2026-06-17T09:30:00.000Z",
),
createConversationEntryRow(
"entry-tool",
ConversationId,
2,
ConversationEntryKind.ToolCall,
ConversationEntryVisibility.Internal,
{
toolName: "freya_list_context",
input: {},
},
"2026-06-17T09:30:01.000Z",
),
createConversationEntryRow(
"entry-assistant",
ConversationId,
3,
ConversationEntryKind.AssistantMessage,
ConversationEntryVisibility.UserVisible,
{
role: "assistant",
parts: [{ type: "text", text: "You have two calendar events." }],
},
"2026-06-17T09:30:02.000Z",
{ runId: "run-1" },
),
],
)
const app = buildTestApp("user-1")
const res = await app.request(`/api/conversations/${ConversationId}/entries`)
expect(res.status).toBe(200)
expect(listEntriesCalls).toEqual([
{
userId: MockUserId,
conversationId: ConversationId,
params: { visibility: ConversationEntryVisibility.UserVisible },
},
])
const body = (await res.json()) as { entries: unknown[] }
expect(body).toEqual({
entries: [
{
id: "entry-user",
conversationId: ConversationId,
sequence: 1,
kind: ConversationEntryKind.UserMessage,
visibility: ConversationEntryVisibility.UserVisible,
fileId: null,
payload: {
role: "user",
parts: [{ type: "text", text: "What is on today?" }],
},
metadata: {},
createdAt: "2026-06-17T09:30:00.000Z",
},
{
id: "entry-assistant",
conversationId: ConversationId,
sequence: 3,
kind: ConversationEntryKind.AssistantMessage,
visibility: ConversationEntryVisibility.UserVisible,
fileId: null,
payload: {
role: "assistant",
parts: [{ type: "text", text: "You have two calendar events." }],
},
metadata: { runId: "run-1" },
createdAt: "2026-06-17T09:30:02.000Z",
},
],
})
})
test("returns an empty list when the conversation has no user-visible entries", async () => {
conversationEntryRowsByUserAndConversation.set(
conversationEntriesKey(MockUserId, ConversationId),
[
createConversationEntryRow(
"entry-tool",
ConversationId,
1,
ConversationEntryKind.ToolResult,
ConversationEntryVisibility.Internal,
{ toolCallId: "call-1", output: { ok: true } },
"2026-06-17T09:30:00.000Z",
),
],
)
const app = buildTestApp("user-1")
const res = await app.request(`/api/conversations/${ConversationId}/entries`)
expect(res.status).toBe(200)
const body = (await res.json()) as { entries: unknown[] }
expect(body).toEqual({ entries: [] })
})
test("returns 404 for malformed conversation ids without querying storage", async () => {
const app = buildTestApp("user-1")
const res = await app.request("/api/conversations/missing-conversation/entries")
expect(res.status).toBe(404)
expect(listEntriesCalls).toEqual([])
const body = (await res.json()) as { error: string }
expect(body).toEqual({ error: "Conversation not found" })
})
test("returns 404 when the conversation does not exist for the user", async () => {
const app = buildTestApp("user-1")
const res = await app.request(`/api/conversations/${MissingConversationId}/entries`)
expect(res.status).toBe(404)
expect(listEntriesCalls).toEqual([
{
userId: MockUserId,
conversationId: MissingConversationId,
params: { visibility: ConversationEntryVisibility.UserVisible },
},
])
const body = (await res.json()) as { error: string }
expect(body).toEqual({ error: "Conversation not found" })
})
})

View File

@@ -1,38 +1,23 @@
import type { Context, Hono } from "hono" import type { Context, Hono } from "hono"
import { ConversationEntryVisibility } from "@freya/core"
import { type } from "arktype"
import { createMiddleware } from "hono/factory" import { createMiddleware } from "hono/factory"
import type { AuthSessionMiddleware } from "../auth/session-middleware.ts" import type { AuthSessionMiddleware } from "../auth/session-middleware.ts"
import type { Database } from "../db/index.ts" import type { Database } from "../db/index.ts"
import type { ConversationRow } from "./storage.ts"
import { conversations } from "./db-storage.ts" import { conversations } from "./storage.ts"
import { ConversationNotFoundError } from "./errors.ts"
/** Hono environment populated by the conversations route middleware. */
type Env = { type Env = {
Variables: { Variables: {
db: Database db: Database
} }
} }
/** Serialized conversation summary returned by the list endpoint. */
interface ConversationSummaryResponse {
id: string
createdAt: string
updatedAt: string
}
/** Dependencies required to register conversation HTTP handlers. */
interface ConversationsHttpHandlersDeps { interface ConversationsHttpHandlersDeps {
db: Database db: Database
authSessionMiddleware: AuthSessionMiddleware authSessionMiddleware: AuthSessionMiddleware
} }
const ConversationIdParam = type("string.uuid")
export function registerConversationsHttpHandlers( export function registerConversationsHttpHandlers(
app: Hono, app: Hono,
{ db, authSessionMiddleware }: ConversationsHttpHandlersDeps, { db, authSessionMiddleware }: ConversationsHttpHandlersDeps,
@@ -43,7 +28,6 @@ export function registerConversationsHttpHandlers(
}) })
app.get("/api/conversations", inject, authSessionMiddleware, handleListConversations) app.get("/api/conversations", inject, authSessionMiddleware, handleListConversations)
app.get("/api/conversations/:id/entries", inject, authSessionMiddleware, handleListEntries)
} }
async function handleListConversations(c: Context<Env>) { async function handleListConversations(c: Context<Env>) {
@@ -51,54 +35,10 @@ async function handleListConversations(c: Context<Env>) {
const db = c.get("db") const db = c.get("db")
return c.json({ return c.json({
conversations: (await conversations(db, user.id).listConversations()).map( conversations: (await conversations(db, user.id).listConversations()).map((row) => ({
serializeConversation, id: row.id,
), createdAt: row.createdAt.toISOString(),
updatedAt: row.updatedAt.toISOString(),
})),
}) })
} }
async function handleListEntries(c: Context<Env>) {
const user = c.get("user")!
const db = c.get("db")
const conversationId = c.req.param("id")
if (!conversationId) {
return c.json({ error: "Conversation not found" }, 404)
}
const parsedConversationId = ConversationIdParam(conversationId)
if (parsedConversationId instanceof type.errors) {
return c.json({ error: "Conversation not found" }, 404)
}
try {
const entries = await conversations(db, user.id).listEntries(parsedConversationId, {
visibility: ConversationEntryVisibility.UserVisible,
})
return c.json({
entries: entries.map((row) => ({
id: row.id,
conversationId: row.conversationId,
sequence: row.sequence,
kind: row.kind,
visibility: row.visibility,
fileId: row.fileId,
payload: row.payload,
metadata: row.metadata,
createdAt: row.createdAt.toISOString(),
})),
})
} catch (err) {
if (err instanceof ConversationNotFoundError) {
return c.json({ error: "Conversation not found" }, 404)
}
throw err
}
}
function serializeConversation(row: ConversationRow): ConversationSummaryResponse {
return {
id: row.id,
createdAt: row.createdAt.toISOString(),
updatedAt: row.updatedAt.toISOString(),
}
}

View File

@@ -1,85 +1,41 @@
import { import { and, asc, desc, eq } from "drizzle-orm"
import type { Database } from "../db/index.ts"
import type {
AssistantMessagePayload, AssistantMessagePayload,
AttachmentPayload, AttachmentPayload,
ConversationEntryKind,
ConversationEntryMetadata,
ConversationEntryVisibility,
ContextSummaryPayload, ContextSummaryPayload,
ConversationEntryKind as ConversationEntryKindType,
ConversationEntryMetadata,
ConversationEntryPayload,
ConversationEntryVisibility as ConversationEntryVisibilityType,
GenericObjectPayload, GenericObjectPayload,
UserMessagePayload, UserMessagePayload,
} from "@freya/core" } from "./types.ts"
import { import {
conversationEntries, conversationEntries,
conversationResponseState as conversationResponseStateTable,
conversations as conversationsTable, conversations as conversationsTable,
files, files,
type ConversationResponseStateStatus, user,
} from "../db/schema.ts" } from "../db/schema.ts"
import {
ConversationEntryMetadata as ConversationEntryMetadataSchema,
AssistantMessagePayload as AssistantMessagePayloadSchema,
AttachmentPayload as AttachmentPayloadSchema,
ConversationEntryKind,
ConversationEntryKindInput,
ConversationEntryVisibility,
ConversationEntryVisibilityInput,
ContextSummaryPayload as ContextSummaryPayloadSchema,
GenericObjectPayload as GenericObjectPayloadSchema,
UserMessagePayload as UserMessagePayloadSchema,
} from "./types.ts"
export interface ConversationStorage {
transaction<T>(tx: (storage: ConversationStorage) => T | Promise<T>): Promise<T>
createConversation(userId: string): Promise<ConversationRow>
listUserConversations(userId: string): Promise<ConversationRow[]>
findConversation(conversationId: string): Promise<ConversationRow | null>
getOrCreateConversation(userId: string): Promise<ConversationRow>
createFile(userId: string, input: CreateFileInput): Promise<FileRow>
appendEntry(
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationEntryRow>
appendAttachmentEntry(
conversationId: string,
input: AppendAttachmentEntryInput,
): Promise<AppendAttachmentEntryResult>
nextSequence(conversationId: string): Promise<number>
listUserConversationEntries(
userId: string,
conversationId: string,
params?: ListConversationEntriesParams,
): Promise<ConversationEntryRow[]>
listPendingUserConversationEntries(
userId: string,
conversationId: string,
): Promise<ConversationEntryRow[]>
findConversationResponseState(
conversationId: string,
): Promise<ConversationResponseStateRow | null>
// TODO: add pagination support
listPendingResponseStates(): Promise<ConversationResponseStateRow[]>
// TODO: add pagination support
listRunningResponseStates(): Promise<ConversationResponseStateRow[]>
upsertConversationResponseState(
conversationId: string,
input: UpsertConversationResponseStateInput,
): Promise<ConversationResponseStateRow>
updateConversationResponseState(
conversationId: string,
input: UpdateConversationResponseStateInput,
): Promise<ConversationResponseStateRow | null>
markResponseStateStatus(
conversationIds: string[],
status: ConversationResponseStateStatus,
): Promise<ConversationResponseStateRow[]>
claimPendingConversationResponseState(
conversationId: string,
): Promise<ConversationResponseStateRow | null>
clearConversationResponseState(conversationId: string): Promise<void>
}
/** Database row shape for a conversation owned by a user. */
export type ConversationRow = typeof conversationsTable.$inferSelect export type ConversationRow = typeof conversationsTable.$inferSelect
/** Database row shape for an entry in a conversation timeline. */
export type ConversationEntryRow = typeof conversationEntries.$inferSelect export type ConversationEntryRow = typeof conversationEntries.$inferSelect
/** Database row shape for pending assistant response state in a conversation. */
export type ConversationResponseStateRow = typeof conversationResponseStateTable.$inferSelect
/** Database row shape for an uploaded file referenced by conversations. */
export type FileRow = typeof files.$inferSelect export type FileRow = typeof files.$inferSelect
/** Input required to create a stored file record. */
export interface CreateFileInput { export interface CreateFileInput {
storageKey: string storageKey: string
originalName?: string originalName?: string
@@ -88,27 +44,23 @@ export interface CreateFileInput {
metadata?: Record<string, unknown> metadata?: Record<string, unknown>
} }
/** Input for creating a file and appending its attachment entry together. */
export interface AppendAttachmentEntryInput { export interface AppendAttachmentEntryInput {
file: CreateFileInput file: CreateFileInput
payload: AttachmentPayload payload: AttachmentPayload
visibility?: ConversationEntryVisibility visibility?: ConversationEntryVisibilityType
metadata?: ConversationEntryMetadata metadata?: ConversationEntryMetadata
} }
/** Result returned after a file-backed attachment entry is appended. */
export interface AppendAttachmentEntryResult { export interface AppendAttachmentEntryResult {
file: FileRow file: FileRow
entry: ConversationEntryRow entry: ConversationEntryRow
} }
/** Common fields accepted when appending any conversation entry. */
interface AppendConversationEntryBase { interface AppendConversationEntryBase {
visibility?: ConversationEntryVisibility visibility?: ConversationEntryVisibilityType
metadata?: ConversationEntryMetadata metadata?: ConversationEntryMetadata
} }
/** Discriminated input for appending any supported entry kind to a conversation. */
export type AppendConversationEntryInput = export type AppendConversationEntryInput =
| (AppendConversationEntryBase & { | (AppendConversationEntryBase & {
kind: typeof ConversationEntryKind.UserMessage kind: typeof ConversationEntryKind.UserMessage
@@ -139,31 +91,291 @@ export type AppendConversationEntryInput =
fileId?: never fileId?: never
}) })
/** Filters accepted when listing conversation entries. */
export interface ListConversationEntriesParams { export interface ListConversationEntriesParams {
visibility?: ConversationEntryVisibility visibility?: ConversationEntryVisibilityType
} }
/** Input for creating or replacing pending assistant response state. */ export function conversations(db: Database, userId: string) {
export interface UpsertConversationResponseStateInput { return {
status?: ConversationResponseStateStatus async createConversation(): Promise<ConversationRow> {
pendingSinceEntryId: string return insertConversation(db, userId)
maxWaitUntil: Date },
runningSince?: Date | null
async listConversations(): Promise<ConversationRow[]> {
return db
.select()
.from(conversationsTable)
.where(eq(conversationsTable.userId, userId))
.orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt))
},
async getOrCreateConversation(): Promise<ConversationRow> {
return db.transaction(async (tx) => {
await requireUserForUpdate(tx, userId)
const existing = await latestConversation(tx, userId)
if (existing) return existing
return insertConversation(tx, userId)
})
},
async createFile(input: CreateFileInput): Promise<FileRow> {
return insertFile(db, userId, input)
},
async appendEntry(
conversationId: string,
input: AppendConversationEntryInput,
): Promise<ConversationEntryRow> {
const kind = ConversationEntryKindInput.assert(input.kind)
const visibility = ConversationEntryVisibilityInput.assert(
input.visibility ?? defaultVisibilityForKind(kind),
)
const payload = payloadForKind(kind, input.payload)
const metadata = ConversationEntryMetadataSchema.assert(input.metadata ?? {})
let fileId: string | null = null
if (input.kind === ConversationEntryKind.Attachment) {
fileId = input.fileId
await requireFile(db, userId, fileId)
}
const rows = await db.transaction(async (tx) => {
await requireConversationForUpdate(tx, userId, conversationId)
const sequence = await nextSequence(tx, conversationId)
const rows = await tx
.insert(conversationEntries)
.values({
conversationId,
sequence,
kind,
visibility,
fileId,
payload,
metadata,
})
.returning()
await touchConversation(tx, userId, conversationId)
return rows
})
return requireRow(rows)
},
async appendAttachmentEntry(
conversationId: string,
input: AppendAttachmentEntryInput,
): Promise<AppendAttachmentEntryResult> {
const payload = AttachmentPayloadSchema.assert(input.payload)
const visibility = ConversationEntryVisibilityInput.assert(
input.visibility ?? defaultVisibilityForKind(ConversationEntryKind.Attachment),
)
const metadata = ConversationEntryMetadataSchema.assert(input.metadata ?? {})
return db.transaction(async (tx) => {
await requireConversationForUpdate(tx, userId, conversationId)
const file = await insertFile(tx, userId, input.file)
const sequence = await nextSequence(tx, conversationId)
const rows = await tx
.insert(conversationEntries)
.values({
conversationId,
sequence,
kind: ConversationEntryKind.Attachment,
visibility,
fileId: file.id,
payload,
metadata,
})
.returning()
await touchConversation(tx, userId, conversationId)
return {
file,
entry: requireRow(rows),
}
})
},
async listEntries(
conversationId: string,
params: ListConversationEntriesParams = {},
): Promise<ConversationEntryRow[]> {
await requireConversation(db, userId, conversationId)
if (params.visibility) {
return db
.select()
.from(conversationEntries)
.where(
and(
eq(conversationEntries.conversationId, conversationId),
eq(conversationEntries.visibility, params.visibility),
),
)
.orderBy(asc(conversationEntries.sequence))
}
return db
.select()
.from(conversationEntries)
.where(eq(conversationEntries.conversationId, conversationId))
.orderBy(asc(conversationEntries.sequence))
},
}
} }
/** Input for patching pending assistant response state. */ function payloadForKind(
export interface UpdateConversationResponseStateInput { kind: ConversationEntryKindType,
status?: ConversationResponseStateStatus payload: AppendConversationEntryInput["payload"],
pendingSinceEntryId?: string ): ConversationEntryPayload {
maxWaitUntil?: Date switch (kind) {
runningSince?: Date | null case ConversationEntryKind.UserMessage:
return UserMessagePayloadSchema.assert(payload)
case ConversationEntryKind.AssistantMessage:
return AssistantMessagePayloadSchema.assert(payload)
case ConversationEntryKind.Attachment:
return AttachmentPayloadSchema.assert(payload)
case ConversationEntryKind.ContextSummary:
return ContextSummaryPayloadSchema.assert(payload)
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
case ConversationEntryKind.SystemNote:
return GenericObjectPayloadSchema.assert(payload)
}
} }
export { async function requireUserForUpdate(db: Database, userId: string): Promise<void> {
createConversationStorage, const rows = await db
conversationResponse, .select({ id: user.id })
conversations, .from(user)
DrizzleConversationStorage, .where(eq(user.id, userId))
findConversation, .limit(1)
} from "./db-storage.ts" .for("update")
requireRow(rows, `User not found: ${userId}`)
}
async function requireConversation(
db: Database,
userId: string,
conversationId: string,
): Promise<ConversationRow> {
const rows = await db
.select()
.from(conversationsTable)
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
.limit(1)
return requireRow(rows, `Conversation not found: ${conversationId}`)
}
async function requireConversationForUpdate(
db: Database,
userId: string,
conversationId: string,
): Promise<ConversationRow> {
const rows = await db
.select()
.from(conversationsTable)
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
.limit(1)
.for("update")
return requireRow(rows, `Conversation not found: ${conversationId}`)
}
async function latestConversation(db: Database, userId: string): Promise<ConversationRow | null> {
const rows = await db
.select()
.from(conversationsTable)
.where(eq(conversationsTable.userId, userId))
.orderBy(desc(conversationsTable.updatedAt), desc(conversationsTable.createdAt))
.limit(1)
return rows[0] ?? null
}
async function insertConversation(db: Database, userId: string): Promise<ConversationRow> {
const rows = await db
.insert(conversationsTable)
.values({
userId,
})
.returning()
return requireRow(rows)
}
async function requireFile(db: Database, userId: string, fileId: string): Promise<FileRow> {
const rows = await db
.select()
.from(files)
.where(and(eq(files.id, fileId), eq(files.userId, userId)))
.limit(1)
return requireRow(rows, `File not found: ${fileId}`)
}
async function insertFile(db: Database, userId: string, input: CreateFileInput): Promise<FileRow> {
const rows = await db
.insert(files)
.values({
userId,
storageKey: input.storageKey,
originalName: input.originalName ?? null,
mimeType: input.mimeType,
sizeBytes: input.sizeBytes,
metadata: input.metadata ?? {},
})
.returning()
return requireRow(rows)
}
async function touchConversation(
db: Database,
userId: string,
conversationId: string,
): Promise<void> {
await db
.update(conversationsTable)
.set({ updatedAt: new Date() })
.where(and(eq(conversationsTable.id, conversationId), eq(conversationsTable.userId, userId)))
}
async function nextSequence(db: Database, conversationId: string): Promise<number> {
const rows = await db
.select({ sequence: conversationEntries.sequence })
.from(conversationEntries)
.where(eq(conversationEntries.conversationId, conversationId))
.orderBy(desc(conversationEntries.sequence))
.limit(1)
return (rows[0]?.sequence ?? 0) + 1
}
function requireRow<T>(rows: T[], message = "Expected database row"): T {
const row = rows[0]
if (!row) throw new Error(message)
return row
}
function defaultVisibilityForKind(
kind: ConversationEntryKindType,
): ConversationEntryVisibilityType {
switch (kind) {
case ConversationEntryKind.UserMessage:
case ConversationEntryKind.AssistantMessage:
case ConversationEntryKind.Attachment:
return ConversationEntryVisibility.UserVisible
case ConversationEntryKind.ToolCall:
case ConversationEntryKind.ToolResult:
case ConversationEntryKind.ContextSummary:
case ConversationEntryKind.SystemNote:
return ConversationEntryVisibility.Internal
}
}

View File

@@ -7,7 +7,7 @@ import {
ConversationEntryMetadata, ConversationEntryMetadata,
GenericObjectPayload, GenericObjectPayload,
UserMessagePayload, UserMessagePayload,
} from "./conversation" } from "./types.ts"
describe("conversation entry schemas", () => { describe("conversation entry schemas", () => {
test("parses valid user message payloads", () => { test("parses valid user message payloads", () => {

View File

@@ -1,6 +1,5 @@
import { type } from "arktype" import { type } from "arktype"
/** Entry kinds supported by the persisted conversation timeline. */
export const ConversationEntryKind = { export const ConversationEntryKind = {
UserMessage: "user_message", UserMessage: "user_message",
AssistantMessage: "assistant_message", AssistantMessage: "assistant_message",
@@ -11,21 +10,17 @@ export const ConversationEntryKind = {
SystemNote: "system_note", SystemNote: "system_note",
} as const } as const
/** Discriminator for the payload shape and handling of a conversation entry. */
export type ConversationEntryKind = export type ConversationEntryKind =
(typeof ConversationEntryKind)[keyof typeof ConversationEntryKind] (typeof ConversationEntryKind)[keyof typeof ConversationEntryKind]
/** Visibility scopes supported by stored conversation entries. */
export const ConversationEntryVisibility = { export const ConversationEntryVisibility = {
UserVisible: "user_visible", UserVisible: "user_visible",
Internal: "internal", Internal: "internal",
} as const } as const
/** Indicates whether a conversation entry should be exposed to the user. */
export type ConversationEntryVisibility = export type ConversationEntryVisibility =
(typeof ConversationEntryVisibility)[keyof typeof ConversationEntryVisibility] (typeof ConversationEntryVisibility)[keyof typeof ConversationEntryVisibility]
/** Attachment media categories accepted by conversation entries. */
export const AttachmentType = { export const AttachmentType = {
Image: "image", Image: "image",
Audio: "audio", Audio: "audio",
@@ -34,64 +29,57 @@ export const AttachmentType = {
Other: "other", Other: "other",
} as const } as const
/** File or media category associated with an attachment payload. */
export type AttachmentType = (typeof AttachmentType)[keyof typeof AttachmentType] export type AttachmentType = (typeof AttachmentType)[keyof typeof AttachmentType]
/** Plain text content part for a message. */ export const ConversationEntryKindInput = type.enumerated(...Object.values(ConversationEntryKind))
export const TextMessagePart = type({ export const ConversationEntryVisibilityInput = type.enumerated(
...Object.values(ConversationEntryVisibility),
)
export const AttachmentTypeInput = type.enumerated(...Object.values(AttachmentType))
const TextMessagePart = type({
"+": "reject", "+": "reject",
type: "'text'", type: "'text'",
text: "string", text: "string",
}) })
/** Structured JSON content part for a message. */ const JsonMessagePart = type({
export const JsonMessagePart = type({
"+": "reject", "+": "reject",
type: "'json'", type: "'json'",
value: "unknown", value: "unknown",
}) })
/** Content part variants supported by user and assistant messages. */
export const MessagePart = type.or(TextMessagePart, JsonMessagePart) export const MessagePart = type.or(TextMessagePart, JsonMessagePart)
/** A structured content part inside a user or assistant message payload. */
export type MessagePart = typeof MessagePart.infer export type MessagePart = typeof MessagePart.infer
/** User-authored message entry payload. */
export const UserMessagePayload = type({ export const UserMessagePayload = type({
"+": "reject", "+": "reject",
role: "'user'", role: "'user'",
parts: MessagePart.array().atLeastLength(1), parts: MessagePart.array().atLeastLength(1),
}) })
/** Payload stored for a conversation entry containing a user message. */
export type UserMessagePayload = typeof UserMessagePayload.infer export type UserMessagePayload = typeof UserMessagePayload.infer
/** Assistant-authored message entry payload. */
export const AssistantMessagePayload = type({ export const AssistantMessagePayload = type({
"+": "reject", "+": "reject",
role: "'assistant'", role: "'assistant'",
parts: MessagePart.array().atLeastLength(1), parts: MessagePart.array().atLeastLength(1),
}) })
/** Payload stored for a conversation entry containing an assistant message. */
export type AssistantMessagePayload = typeof AssistantMessagePayload.infer export type AssistantMessagePayload = typeof AssistantMessagePayload.infer
/** Attachment entry payload. */
export const AttachmentPayload = type({ export const AttachmentPayload = type({
"+": "reject", "+": "reject",
role: type.enumerated("user", "assistant"), role: type.enumerated("user", "assistant"),
name: "string", name: "string",
mimeType: "string", mimeType: "string",
attachmentType: type.enumerated(...Object.values(AttachmentType)), attachmentType: AttachmentTypeInput,
"caption?": "string", "caption?": "string",
}) })
/** Payload stored for a conversation entry that references an uploaded file. */
export type AttachmentPayload = typeof AttachmentPayload.infer export type AttachmentPayload = typeof AttachmentPayload.infer
/** Durable facts extracted from compacted conversation history. */ const ContextSummary = type({
export const ContextSummary = type({
"+": "reject", "+": "reject",
"userIntent?": "string", "userIntent?": "string",
durableFacts: type.string.array(), durableFacts: type.string.array(),
@@ -101,10 +89,6 @@ export const ContextSummary = type({
importantDetails: type.string.array(), importantDetails: type.string.array(),
}) })
/** Durable facts and follow-ups retained from compacted conversation history. */
export type ContextSummary = typeof ContextSummary.infer
/** Context-summary conversation entry payload. */
export const ContextSummaryPayload = type({ export const ContextSummaryPayload = type({
"+": "reject", "+": "reject",
covers: type({ covers: type({
@@ -117,10 +101,8 @@ export const ContextSummaryPayload = type({
"sourceEntryIds?": type.string.array(), "sourceEntryIds?": type.string.array(),
}) })
/** Payload describing a compaction summary and the sequence range it covers. */
export type ContextSummaryPayload = typeof ContextSummaryPayload.infer export type ContextSummaryPayload = typeof ContextSummaryPayload.infer
/** Model invocation metadata recorded on generated entries. */
export const ModelRunMetadata = type({ export const ModelRunMetadata = type({
"+": "reject", "+": "reject",
route: "string", route: "string",
@@ -134,43 +116,21 @@ export const ModelRunMetadata = type({
"providerRequestId?": "string", "providerRequestId?": "string",
}) })
/** Metadata describing the model run that produced a conversation entry. */
export type ModelRunMetadata = typeof ModelRunMetadata.infer export type ModelRunMetadata = typeof ModelRunMetadata.infer
/** Arbitrary metadata stored alongside conversation entries. */
export const ConversationEntryMetadata = type({ export const ConversationEntryMetadata = type({
"modelRun?": ModelRunMetadata, "modelRun?": ModelRunMetadata,
"[string]": "unknown", "[string]": "unknown",
}) })
/** Metadata bag attached to a conversation entry. */
export type ConversationEntryMetadata = typeof ConversationEntryMetadata.infer export type ConversationEntryMetadata = typeof ConversationEntryMetadata.infer
export const ToolCallPayload = type({
toolName: "string",
})
export type ToolCallPayload = typeof ToolCallPayload.infer
export const ToolResultPayload = type({
toolName: "string",
ok: "boolean",
})
export type ToolResultPayload = typeof ToolResultPayload.infer
/** Generic object payload used by operational entries. */
export const GenericObjectPayload = type("Record<string, unknown>") export const GenericObjectPayload = type("Record<string, unknown>")
/** Fallback payload shape for tool calls, tool results, and system notes. */
export type GenericObjectPayload = typeof GenericObjectPayload.infer export type GenericObjectPayload = typeof GenericObjectPayload.infer
/** Union of payload shapes that can be stored on a conversation entry. */
export type ConversationEntryPayload = export type ConversationEntryPayload =
| UserMessagePayload | UserMessagePayload
| AssistantMessagePayload | AssistantMessagePayload
| AttachmentPayload | AttachmentPayload
| ContextSummaryPayload | ContextSummaryPayload
| ToolCallPayload
| ToolResultPayload
| GenericObjectPayload | GenericObjectPayload

View File

@@ -1,10 +1,3 @@
import {
ConversationEntryVisibility,
type ConversationEntryKind,
type ConversationEntryMetadata,
type ConversationEntryPayload,
type ConversationEntryVisibility as ConversationEntryVisibilityType,
} from "@freya/core"
import { sql } from "drizzle-orm" import { sql } from "drizzle-orm"
import { import {
boolean, boolean,
@@ -20,6 +13,14 @@ import {
uuid, uuid,
} from "drizzle-orm/pg-core" } from "drizzle-orm/pg-core"
import {
ConversationEntryVisibility,
type ConversationEntryKind,
type ConversationEntryMetadata,
type ConversationEntryPayload,
type ConversationEntryVisibility as ConversationEntryVisibilityType,
} from "../conversations/types.ts"
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Better Auth core tables // Better Auth core tables
// Re-exported from CLI-generated schema. // Re-exported from CLI-generated schema.
@@ -48,15 +49,6 @@ const bytea = customType<{ data: Buffer }>({
}, },
}) })
export const ConversationResponseStateStatus = {
Pending: "pending",
Running: "running",
Failed: "failed",
} as const
export type ConversationResponseStateStatus =
(typeof ConversationResponseStateStatus)[keyof typeof ConversationResponseStateStatus]
export const userSources = pgTable( export const userSources = pgTable(
"user_sources", "user_sources",
{ {
@@ -155,38 +147,6 @@ export const conversationEntries = pgTable(
], ],
) )
export const conversationResponseState = pgTable(
"conversation_response_state",
{
conversationId: uuid("conversation_id")
.primaryKey()
.references(() => conversations.id, { onDelete: "cascade" }),
status: text("status")
.$type<ConversationResponseStateStatus>()
.notNull()
.default(ConversationResponseStateStatus.Pending),
pendingSinceEntryId: uuid("pending_since_entry_id")
.notNull()
.references(() => conversationEntries.id, { onDelete: "cascade" }),
maxWaitUntil: timestamp("max_wait_until").notNull(),
runningSince: timestamp("running_since"),
createdAt: timestamp("created_at").notNull().defaultNow(),
updatedAt: timestamp("updated_at")
.notNull()
.defaultNow()
.$onUpdate(() => new Date()),
},
(t) => [
index("conversation_response_state_status_max_wait_until_idx").on(t.status, t.maxWaitUntil),
index("conversation_response_state_running_since_idx").on(t.runningSince),
index("conversation_response_state_pending_since_entry_id_idx").on(t.pendingSinceEntryId),
check(
"conversation_response_state_status_check",
sql`${t.status} in ('pending', 'running', 'failed')`,
),
],
)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// FREYA — reminders source storage // FREYA — reminders source storage
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -14,6 +14,7 @@ interface FeedResponse {
items: Array<{ items: Array<{
id: string id: string
type: string type: string
priority: number
timestamp: string timestamp: string
data: Record<string, unknown> data: Record<string, unknown>
}> }>
@@ -84,7 +85,7 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
mock.module("../conversations/db-storage.ts", () => ({ mock.module("../conversations/storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({ conversations: (_db: Database, userId: string) => ({
async getOrCreateConversation() { async getOrCreateConversation() {
return { id: `conversation-${userId}` } return { id: `conversation-${userId}` }
@@ -117,6 +118,7 @@ describe("GET /api/feed", () => {
id: "item-1", id: "item-1",
sourceId: "test", sourceId: "test",
type: "test", type: "test",
priority: 0.8,
timestamp: new Date("2025-01-01T00:00:00.000Z"), timestamp: new Date("2025-01-01T00:00:00.000Z"),
data: { value: 42 }, data: { value: 42 },
}, },
@@ -147,6 +149,7 @@ describe("GET /api/feed", () => {
expect(body.items).toHaveLength(1) expect(body.items).toHaveLength(1)
expect(body.items[0]!.id).toBe("item-1") expect(body.items[0]!.id).toBe("item-1")
expect(body.items[0]!.type).toBe("test") expect(body.items[0]!.type).toBe("test")
expect(body.items[0]!.priority).toBe(0.8)
expect(body.items[0]!.timestamp).toBe("2025-01-01T00:00:00.000Z") expect(body.items[0]!.timestamp).toBe("2025-01-01T00:00:00.000Z")
expect(body.errors).toHaveLength(0) expect(body.errors).toHaveLength(0)
}) })
@@ -157,6 +160,7 @@ describe("GET /api/feed", () => {
id: "fresh-1", id: "fresh-1",
sourceId: "test", sourceId: "test",
type: "test", type: "test",
priority: 0.5,
timestamp: new Date("2025-06-01T12:00:00.000Z"), timestamp: new Date("2025-06-01T12:00:00.000Z"),
data: { fresh: true }, data: { fresh: true },
}, },

View File

@@ -135,9 +135,8 @@ describe("schema sync", () => {
// JSON Schema structure matches // JSON Schema structure matches
const jsonSchema = enhancementResultJsonSchema const jsonSchema = enhancementResultJsonSchema
const payloadKeys = Object.keys(payload).sort() as Array<(typeof jsonSchema.required)[number]>
expect(Object.keys(jsonSchema.properties).sort()).toEqual(Object.keys(payload).sort()) expect(Object.keys(jsonSchema.properties).sort()).toEqual(Object.keys(payload).sort())
expect([...jsonSchema.required].sort()).toEqual(payloadKeys) expect([...jsonSchema.required].sort()).toEqual(Object.keys(payload).sort())
// syntheticItems item schema has the right required fields // syntheticItems item schema has the right required fields
const itemSchema = jsonSchema.properties.syntheticItems.items const itemSchema = jsonSchema.properties.syntheticItems.items

View File

@@ -1,116 +0,0 @@
import { Queue } from "./queue"
const JobStatus = {
Pending: "pending",
Running: "running",
} as const
type JobStatus = (typeof JobStatus)[keyof typeof JobStatus]
export interface Job<Payload> {
id: number
payload: Payload
signal: AbortSignal
}
interface PendingJob<Payload> {
status: typeof JobStatus.Pending
controller: AbortController
job: Job<Payload>
}
interface RunningJob<Payload> {
status: typeof JobStatus.Running
controller: AbortController
job: Job<Payload>
}
type JobState<Payload> = PendingJob<Payload> | RunningJob<Payload>
type JobEventListener<Payload> = (job: Job<Payload>) => void
type JobEvent = "settled" | "cancelled"
export class JobRegistry<Payload> {
private queue = new Queue<Job<Payload>>()
private states = new Map<number, JobState<Payload>>()
private listeners: Record<JobEvent, JobEventListener<Payload>[]> = {
settled: [],
cancelled: [],
}
addJob({ payload }: { payload: Payload }): Job<Payload> {
const controller = new AbortController()
const job: Job<Payload> = {
id: this.generateJobId(),
payload,
signal: controller.signal,
}
this.queue.enqueue(job)
this.states.set(job.id, { status: JobStatus.Pending, controller, job })
return job
}
async nextJob(signal?: AbortSignal): Promise<Job<Payload> | null> {
while (true) {
const job = await this.queue.next(signal)
if (!job) {
return null
}
const state = this.states.get(job.id)
if (!state || state.job !== job || state.status === JobStatus.Running) {
continue
}
if (state.controller.signal.aborted) {
this.states.delete(job.id)
continue
}
this.states.set(job.id, { status: JobStatus.Running, controller: state.controller, job })
return job
}
}
cancelJob(job: Job<unknown>): void {
const state = this.states.get(job.id)
if (state?.job === job) {
state?.controller.abort()
this.notifyListeners("cancelled", job.id)
this.states.delete(job.id)
}
}
markJobAsCompleted(job: Job<unknown>): void {
const state = this.states.get(job.id)
if (state?.job === job) {
this.notifyListeners("settled", job.id)
this.states.delete(job.id)
}
}
addEventListener(event: JobEvent, listener: JobEventListener<Payload>): () => void {
this.listeners[event].push(listener)
return () => {
this.listeners[event] = this.listeners[event].filter((l) => l !== listener)
}
}
private generateJobId(): number {
let id: number
do {
id = Math.floor(Math.random() * 1000000)
} while (this.states.has(id))
return id
}
private notifyListeners(event: JobEvent, id: number): void {
const job = this.states.get(id)?.job
if (job) {
this.listeners[event].forEach((listener) => listener(job))
}
}
}

View File

@@ -1,69 +0,0 @@
interface Item<T> {
value: T
next: Item<T> | null
}
export class Queue<T> {
private front: Item<T> | null = null
private back: Item<T> | null = null
private waiters: Array<(value: T) => void> = []
enqueue(value: T): void {
const waiter = this.waiters.shift()
if (waiter) {
waiter(value)
return
}
const newItem: Item<T> = { value, next: null }
if (this.back) {
this.back.next = newItem
} else {
this.front = newItem
}
this.back = newItem
}
dequeue(): T | null {
if (!this.front) return null
const value = this.front.value
this.front = this.front.next
if (!this.front) this.back = null
return value
}
next(signal?: AbortSignal): Promise<T | null> {
const value = this.dequeue()
if (value !== null) return Promise.resolve(value)
return new Promise((resolve) => {
if (signal) {
if (signal.aborted) {
resolve(null)
} else {
let _resolve: (v: T) => void
const onAbort = () => {
this.waiters = this.waiters.filter((w) => w !== _resolve)
resolve(null)
}
signal.addEventListener(
"abort",
onAbort,
{ once: true },
)
_resolve = (v: T) => {
signal.removeEventListener("abort", onAbort)
resolve(v)
}
this.waiters.push(_resolve)
}
} else {
this.waiters.push(resolve)
}
})
}
}

View File

@@ -1,51 +0,0 @@
import type { Job, JobRegistry } from "./job"
import type { Queue } from "./queue"
export interface JobExecutor<JobPayload> {
execute(job: Job<JobPayload>): Promise<void>
}
export interface WorkerConfig<Job> {
concurrency: number
registry: JobRegistry<Job>
runner: JobExecutor<Job>
signal: AbortSignal
}
export class Worker<Job> {
private concurrency: number
private registry: JobRegistry<Job>
private runner: JobExecutor<Job>
private signal: AbortSignal
constructor({ concurrency, registry, runner, signal }: WorkerConfig<Job>) {
this.concurrency = concurrency
this.registry = registry
this.runner = runner
this.signal = signal
}
start() {
if (this.signal.aborted) return
for (let i = 0; i < this.concurrency; i++) {
void this.pollJobFromRegistry()
}
}
private async pollJobFromRegistry() {
while (!this.signal.aborted) {
const job = await this.registry.nextJob(this.signal)
if (!job) {
return
}
try {
await this.runner.execute(job)
} catch {
// TODO: handle logging of job execution errors
} finally {
this.registry.markJobAsCompleted(job)
}
}
}
}

View File

@@ -1,36 +0,0 @@
import type { AgentEvent } from "@freya/agent-protocol"
export interface AgentNotification {
kind: "agent"
payload: AgentEvent
}
export type NotificationPayload = AgentNotification
export type NotificationListener = (notification: NotificationPayload) => Promise<void>
export class NotificationCentral {
private listeners: Map<string, Set<NotificationListener>> = new Map()
registerListenerForUser(userId: string, listener: NotificationListener): () => void {
let listeners = this.listeners.get(userId)
if (!listeners) {
listeners = new Set()
this.listeners.set(userId, listeners)
}
listeners.add(listener)
return () => {
listeners.delete(listener)
if (listeners.size === 0) {
this.listeners.delete(userId)
}
}
}
async notifyUser(userId: string, notification: NotificationPayload): Promise<void> {
const listeners = this.listeners.get(userId)
if (!listeners) return
await Promise.allSettled(Array.from(listeners).map((listener) => listener(notification)))
}
}

View File

@@ -5,15 +5,12 @@ import { createMiddleware } from "hono/factory"
import { registerAdminHttpHandlers } from "./admin/http.ts" import { registerAdminHttpHandlers } from "./admin/http.ts"
import { createQueryDebugTools } from "./agent/debug-tools.ts" import { createQueryDebugTools } from "./agent/debug-tools.ts"
import { registerAgentHttpHandlers, registerDebugAgentHttpHandlers } from "./agent/http.ts" import { registerAgentHttpHandlers, registerDebugAgentHttpHandlers } from "./agent/http.ts"
import { AgentService } from "./agent/service.ts"
import { agentWebSocket, registerAgentWebSocketHandlers } from "./agent/ws.ts" import { agentWebSocket, registerAgentWebSocketHandlers } from "./agent/ws.ts"
import { createRequireAdmin } from "./auth/admin-middleware.ts" import { createRequireAdmin } from "./auth/admin-middleware.ts"
import { registerAuthHandlers } from "./auth/http.ts" import { registerAuthHandlers } from "./auth/http.ts"
import { createAuth } from "./auth/index.ts" import { createAuth } from "./auth/index.ts"
import { createRequireSession } from "./auth/session-middleware.ts" import { createRequireSession } from "./auth/session-middleware.ts"
import { CalDavSourceProvider } from "./caldav/provider.ts" import { CalDavSourceProvider } from "./caldav/provider.ts"
import { registerConversationsHttpHandlers } from "./conversations/http.ts"
import { DrizzleConversationStorage } from "./conversations/storage.ts"
import { createDatabase } from "./db/index.ts" import { createDatabase } from "./db/index.ts"
import { registerFeedHttpHandlers } from "./engine/http.ts" import { registerFeedHttpHandlers } from "./engine/http.ts"
import { createFeedEnhancer } from "./enhancement/enhance-feed.ts" import { createFeedEnhancer } from "./enhancement/enhance-feed.ts"
@@ -23,7 +20,6 @@ import { CredentialEncryptor } from "./lib/crypto.ts"
import { ensureEnv } from "./lib/env.ts" import { ensureEnv } from "./lib/env.ts"
import { registerLocationHttpHandlers } from "./location/http.ts" import { registerLocationHttpHandlers } from "./location/http.ts"
import { LocationSourceProvider } from "./location/provider.ts" import { LocationSourceProvider } from "./location/provider.ts"
import { NotificationCentral } from "./notification/notification-central.ts"
import { ReminderSourceProvider } from "./reminders/provider.ts" import { ReminderSourceProvider } from "./reminders/provider.ts"
import { UserSessionManager } from "./session/index.ts" import { UserSessionManager } from "./session/index.ts"
import { registerSourcesHttpHandlers } from "./sources/http.ts" import { registerSourcesHttpHandlers } from "./sources/http.ts"
@@ -35,12 +31,8 @@ function main() {
const env = ensureEnv(process.env) const env = ensureEnv(process.env)
const { db, close: closeDb } = createDatabase(env.databaseUrl) const { db, close: closeDb } = createDatabase(env.databaseUrl)
const conversationStorage = new DrizzleConversationStorage(db, false)
const auth = createAuth(db) const auth = createAuth(db)
const abortController = new AbortController()
const feedEnhancer = createFeedEnhancer({ const feedEnhancer = createFeedEnhancer({
client: createLlmClient({ client: createLlmClient({
apiKey: env.openrouterApiKey, apiKey: env.openrouterApiKey,
@@ -80,15 +72,6 @@ function main() {
console.warn("[query] PI_API_KEY or OPENROUTER_API_KEY not set — query agent unavailable") console.warn("[query] PI_API_KEY or OPENROUTER_API_KEY not set — query agent unavailable")
} }
const notificationCentral = new NotificationCentral()
const agentService = new AgentService({
notificationCentral,
storage: conversationStorage,
userSessionManager: sessionManager,
signal: abortController.signal,
})
const app = new Hono() const app = new Hono()
const isDev = process.env.NODE_ENV !== "production" const isDev = process.env.NODE_ENV !== "production"
@@ -146,7 +129,6 @@ function main() {
sessionManager, sessionManager,
authSessionMiddleware, authSessionMiddleware,
}) })
registerConversationsHttpHandlers(app, { db, authSessionMiddleware })
if (isDebugMode) { if (isDebugMode) {
registerDebugAgentHttpHandlers(app, { registerDebugAgentHttpHandlers(app, {
authSessionMiddleware, authSessionMiddleware,
@@ -157,22 +139,17 @@ function main() {
registerAdminHttpHandlers(app, { sessionManager, adminMiddleware, db }) registerAdminHttpHandlers(app, { sessionManager, adminMiddleware, db })
registerAgentWebSocketHandlers(app, { registerAgentWebSocketHandlers(app, {
agentService, sessionManager,
notificationCentral,
storage: conversationStorage,
authSessionMiddleware, authSessionMiddleware,
corsMiddleware: agentWebSocketCorsMiddleware, corsMiddleware: agentWebSocketCorsMiddleware,
}) })
process.on("SIGTERM", async () => { process.on("SIGTERM", async () => {
sessionManager.dispose() sessionManager.dispose()
abortController.abort()
await closeDb() await closeDb()
process.exit(0) process.exit(0)
}) })
agentService.start()
return app return app
} }

View File

@@ -1,6 +1,5 @@
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@freya/core" import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@freya/core"
import { ConversationEntryKind } from "@freya/core"
import { LocationSource } from "@freya/source-location" import { LocationSource } from "@freya/source-location"
import { WeatherSource } from "@freya/source-weatherkit" import { WeatherSource } from "@freya/source-weatherkit"
import { beforeEach, describe, expect, mock, spyOn, test } from "bun:test" import { beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
@@ -10,6 +9,7 @@ import type { AppendConversationEntryInput } from "../conversations/storage.ts"
import type { Database } from "../db/index.ts" import type { Database } from "../db/index.ts"
import type { FeedSourceProvider } from "./feed-source-provider.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { CredentialEncryptor } from "../lib/crypto.ts" import { CredentialEncryptor } from "../lib/crypto.ts"
import { import {
CredentialStorageUnavailableError, CredentialStorageUnavailableError,
@@ -120,7 +120,7 @@ mock.module("../sources/user-sources.ts", () => ({
}), }),
})) }))
mock.module("../conversations/db-storage.ts", () => ({ mock.module("../conversations/storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({ conversations: (_db: Database, userId: string) => ({
async getOrCreateConversation(): Promise<{ id: string }> { async getOrCreateConversation(): Promise<{ id: string }> {
mockConversationCalls.push({ type: "getOrCreate", userId }) mockConversationCalls.push({ type: "getOrCreate", userId })

View File

@@ -8,7 +8,7 @@ import type { FeedEnhancer } from "../enhancement/enhance-feed.ts"
import type { CredentialEncryptor } from "../lib/crypto.ts" import type { CredentialEncryptor } from "../lib/crypto.ts"
import type { FeedSourceProvider } from "./feed-source-provider.ts" import type { FeedSourceProvider } from "./feed-source-provider.ts"
import { conversations } from "../conversations/db-storage.ts" import { conversations } from "../conversations/storage.ts"
import { import {
CredentialStorageUnavailableError, CredentialStorageUnavailableError,
InvalidSourceConfigError, InvalidSourceConfigError,

View File

@@ -1,6 +1,5 @@
import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@freya/core" import type { ActionDefinition, ContextEntry, FeedItem, FeedSource } from "@freya/core"
import { ConversationEntryKind } from "@freya/core"
import { LocationSource } from "@freya/source-location" import { LocationSource } from "@freya/source-location"
import { describe, expect, spyOn, test } from "bun:test" import { describe, expect, spyOn, test } from "bun:test"
@@ -10,6 +9,7 @@ import type {
} from "../agent/conversation-recording-query-agent.ts" } from "../agent/conversation-recording-query-agent.ts"
import type { AppendConversationEntryInput } from "../conversations/storage.ts" import type { AppendConversationEntryInput } from "../conversations/storage.ts"
import { ConversationEntryKind } from "../conversations/types.ts"
import { UserSession } from "./user-session.ts" import { UserSession } from "./user-session.ts"
function createStubSource(id: string, items: FeedItem[] = []): FeedSource { function createStubSource(id: string, items: FeedItem[] = []): FeedSource {

View File

@@ -263,12 +263,18 @@ export class UserSession {
const conversation = await conversationStorage.getOrCreateConversation() const conversation = await conversationStorage.getOrCreateConversation()
const entries = await conversationStorage.listEntries(conversation.id) const entries = await conversationStorage.listEntries(conversation.id)
this.queryAgent = new PiQueryAgent({ this.queryAgent = new ConversationRecordingQueryAgent({
toolbox: this.toolbox, agent: new PiQueryAgent({
apiKey: this.agentConfig?.apiKey, toolbox: this.toolbox,
cwd: this.agentConfig?.cwd, apiKey: this.agentConfig?.apiKey,
systemPrompt: this.agentConfig?.systemPrompt, cwd: this.agentConfig?.cwd,
initialEntries: entries, systemPrompt: this.agentConfig?.systemPrompt,
initialEntries: entries,
}),
storage: conversationStorage,
defaultConversationId: conversation.id,
modelProvider: PI_MODEL_PROVIDER,
modelId: PI_MODEL_ID,
}) })
} }

View File

@@ -128,7 +128,7 @@ mock.module("../sources/user-sources.ts", () => ({
}, },
})) }))
mock.module("../conversations/db-storage.ts", () => ({ mock.module("../conversations/storage.ts", () => ({
conversations: (_db: Database, userId: string) => ({ conversations: (_db: Database, userId: string) => ({
async getOrCreateConversation() { async getOrCreateConversation() {
return { id: `conversation-${userId}` } return { id: `conversation-${userId}` }

View File

@@ -51,7 +51,7 @@
"version": "0.0.0", "version": "0.0.0",
"dependencies": { "dependencies": {
"@freya/agent-protocol": "workspace:*", "@freya/agent-protocol": "workspace:*",
"@nym.sh/jrpc": "1.1.0", "@nym.sh/jrpc": "^0.1.0",
}, },
}, },
"apps/freya-backend": { "apps/freya-backend": {
@@ -69,7 +69,7 @@
"@freya/source-tfl": "workspace:*", "@freya/source-tfl": "workspace:*",
"@freya/source-weatherkit": "workspace:*", "@freya/source-weatherkit": "workspace:*",
"@freya/source-web-search": "workspace:*", "@freya/source-web-search": "workspace:*",
"@nym.sh/jrpc": "1.1.0", "@nym.sh/jrpc": "^0.1.0",
"@openrouter/sdk": "^0.9.11", "@openrouter/sdk": "^0.9.11",
"arktype": "^2.1.29", "arktype": "^2.1.29",
"better-auth": "^1", "better-auth": "^1",
@@ -172,7 +172,6 @@
"version": "0.0.0", "version": "0.0.0",
"dependencies": { "dependencies": {
"@standard-schema/spec": "^1.1.0", "@standard-schema/spec": "^1.1.0",
"arktype": "^2.1.29",
}, },
"peerDependencies": { "peerDependencies": {
"@json-render/core": "*", "@json-render/core": "*",
@@ -838,7 +837,7 @@
"@nolyfill/is-core-module": ["@nolyfill/is-core-module@1.0.39", "", {}, "sha512-nn5ozdjYQpUCZlWGuxcJY/KpxkWQs4DcbMCmKojjyrYDEAGy4Ce19NN4v5MduafTwJlbKc99UA8YhSVqq9yPZA=="], "@nolyfill/is-core-module": ["@nolyfill/is-core-module@1.0.39", "", {}, "sha512-nn5ozdjYQpUCZlWGuxcJY/KpxkWQs4DcbMCmKojjyrYDEAGy4Ce19NN4v5MduafTwJlbKc99UA8YhSVqq9yPZA=="],
"@nym.sh/jrpc": ["@nym.sh/jrpc@1.1.0", "", {}, "sha512-212SYMB37GdL8enaRTTqG/LNa5bJ7eYth6jfQfECuedQCuaju0bOMUzCN6hvY5KkrxdYuqVKmr2Uz+ZZTjPlaQ=="], "@nym.sh/jrpc": ["@nym.sh/jrpc@0.1.0", "", {}, "sha512-qH+vqKojPrX4RkW67U2R4J98uWHxZOwYxX2J5GLZcfm/yjklCcN5zTfDNLfgAa9jAoOFVscC3DFWhvdZOmN3fA=="],
"@nym.sh/jrx": ["@nym.sh/jrx@0.2.0", "", { "peerDependencies": { "@json-render/core": ">=0.10.0" } }, "sha512-jd7Z1Q6T21366MtSUnwCFiu6Yl1AdNc9s5m6HxeUg265P+0enZCiyyxOuHsFwvpUcSEs/2DVBsqfMptdca44lA=="], "@nym.sh/jrx": ["@nym.sh/jrx@0.2.0", "", { "peerDependencies": { "@json-render/core": ">=0.10.0" } }, "sha512-jd7Z1Q6T21366MtSUnwCFiu6Yl1AdNc9s5m6HxeUg265P+0enZCiyyxOuHsFwvpUcSEs/2DVBsqfMptdca44lA=="],

View File

@@ -53,10 +53,9 @@
# so source-only edits do not force Bun to reinstall. # so source-only edits do not force Bun to reinstall.
dependencySource = lib.fileset.toSource { dependencySource = lib.fileset.toSource {
root = ./.; root = ./.;
fileset = lib.fileset.fileFilter fileset = lib.fileset.fileFilter (
( file: file.name == "bun.lock" || file.name == "package.json" || file.name == "bunfig.toml"
file: file.name == "bun.lock" || file.name == "package.json" || file.name == "bunfig.toml" ) ./.;
) ./.;
}; };
# Checks run against a clean source tree, even when using `path:.`. # Checks run against a clean source tree, even when using `path:.`.
@@ -93,12 +92,10 @@
lib.mapAttrs mkBunScript scripts; lib.mapAttrs mkBunScript scripts;
mkBunApps = mkBunApps =
commands: commands:
lib.mapAttrs lib.mapAttrs (name: command: {
(name: command: { type = "app";
type = "app"; program = "${command}/bin/${name}";
program = "${command}/bin/${name}"; }) commands;
})
commands;
mkBunNodeModules = mkBunNodeModules =
system: pkgs: system: pkgs:
pkgs.stdenvNoCC.mkDerivation { pkgs.stdenvNoCC.mkDerivation {
@@ -258,7 +255,6 @@
pkg-config pkg-config
postgresql postgresql
python3 python3
typescript-go
watchman watchman
]; ];
linuxPackages = with pkgs; [ linuxPackages = with pkgs; [

View File

@@ -1,3 +1,8 @@
export interface SendMessageResult {
message: string
conversationId: string
}
export type AgentEvent = export type AgentEvent =
| { type: "conversation_started"; conversationId: string } | { type: "conversation_started"; conversationId: string }
| { type: "message_created"; text: string } | { type: "message_created"; text: string }
@@ -6,11 +11,8 @@ export type AgentEvent =
| { type: "message_finished" } | { type: "message_finished" }
| { type: "message_failed"; error: string } | { type: "message_failed"; error: string }
export type UserEvent = { type: "typing" }
export interface AgentServerApi { export interface AgentServerApi {
sendMessage(message: string): Promise<boolean> sendMessage(message: string): Promise<SendMessageResult>
notify(event: UserEvent): void
ping(): "pong" ping(): "pong"
} }

View File

@@ -8,8 +8,7 @@
"test": "bun test ." "test": "bun test ."
}, },
"dependencies": { "dependencies": {
"@standard-schema/spec": "^1.1.0", "@standard-schema/spec": "^1.1.0"
"arktype": "^2.1.29"
}, },
"peerDependencies": { "peerDependencies": {
"@json-render/core": "*", "@json-render/core": "*",

View File

@@ -6,27 +6,6 @@ export { Context, contextKey, serializeKey } from "./context"
export type { ActionDefinition } from "./action" export type { ActionDefinition } from "./action"
export { UnknownActionError } from "./action" export { UnknownActionError } from "./action"
// Conversation
export type { ConversationEntryPayload } from "./conversation"
export {
AssistantMessagePayload,
AttachmentPayload,
AttachmentType,
ContextSummary,
ContextSummaryPayload,
ConversationEntryKind,
ConversationEntryMetadata,
ConversationEntryVisibility,
GenericObjectPayload,
JsonMessagePart,
MessagePart,
ModelRunMetadata,
TextMessagePart,
UserMessagePayload,
ToolCallPayload,
ToolResultPayload,
} from "./conversation"
// Feed // Feed
export type { FeedItem, FeedItemRenderer, FeedItemSignals, RenderedFeedItem, Slot } from "./feed" export type { FeedItem, FeedItemRenderer, FeedItemSignals, RenderedFeedItem, Slot } from "./feed"
export { TimeRelevance } from "./feed" export { TimeRelevance } from "./feed"

View File

@@ -1,11 +0,0 @@
{
"version": 1,
"skills": {
"upgrading-expo": {
"source": "expo/skills",
"sourceType": "github",
"skillPath": "plugins/expo/skills/upgrading-expo/SKILL.md",
"computedHash": "98d228925a442126789d90783fdd4ae5de33ab690024b575698057d2ffc44b40"
}
}
}