Compare commits

..

1 Commits

Author SHA1 Message Date
54e4b0dcf7 feat(backend): add TflService
Manages per-user TflSource instances with individual line
configuration. Implements FeedSourceProvider so it can be
wired into FeedEngineService.

Adds TflSource.setLines() so line config can be mutated
in place, keeping engine references valid.

Also exports ITflApi from @aris/source-tfl for testability.

Co-authored-by: Ona <no-reply@ona.com>
2026-02-13 19:38:21 +00:00
15 changed files with 343 additions and 218 deletions

View File

@@ -10,7 +10,6 @@
"context": ".",
"dockerfile": "Dockerfile"
},
"postCreateCommand": "bun install",
"postStartCommand": "./scripts/setup-git.sh && ./scripts/setup-nvim.sh"
// Features add additional features to your environment. See https://containers.dev/features
// Beware: features are not supported on all platforms and may have unintended side-effects.

View File

@@ -6,9 +6,3 @@ BETTER_AUTH_SECRET=
# Base URL of the backend
BETTER_AUTH_URL=http://localhost:3000
# Apple WeatherKit credentials
WEATHERKIT_PRIVATE_KEY=
WEATHERKIT_KEY_ID=
WEATHERKIT_TEAM_ID=
WEATHERKIT_SERVICE_ID=

View File

@@ -11,6 +11,7 @@
"dependencies": {
"@aris/core": "workspace:*",
"@aris/source-location": "workspace:*",
"@aris/source-tfl": "workspace:*",
"@aris/source-weatherkit": "workspace:*",
"@hono/trpc-server": "^0.3",
"@trpc/server": "^11",

View File

@@ -1,11 +1,10 @@
import { TRPCError } from "@trpc/server"
import { type } from "arktype"
import { UserNotFoundError } from "../lib/error.ts"
import type { TRPC } from "../trpc/router.ts"
import type { LocationService } from "./service.ts"
import { UserNotFoundError } from "../lib/error.ts"
const locationInput = type({
lat: "number",
lng: "number",
@@ -13,10 +12,7 @@ const locationInput = type({
timestamp: "Date",
})
export function createLocationRouter(
t: TRPC,
{ locationService }: { locationService: LocationService },
) {
export function createLocationRouter(t: TRPC, { locationService }: { locationService: LocationService }) {
return t.router({
update: t.procedure.input(locationInput).mutation(({ input, ctx }) => {
try {

View File

@@ -5,21 +5,11 @@ import { registerAuthHandlers } from "./auth/http.ts"
import { LocationService } from "./location/service.ts"
import { createContext } from "./trpc/context.ts"
import { createTRPCRouter } from "./trpc/router.ts"
import { WeatherService } from "./weather/service.ts"
function main() {
const locationService = new LocationService()
const weatherService = new WeatherService({
credentials: {
privateKey: process.env.WEATHERKIT_PRIVATE_KEY!,
keyId: process.env.WEATHERKIT_KEY_ID!,
teamId: process.env.WEATHERKIT_TEAM_ID!,
serviceId: process.env.WEATHERKIT_SERVICE_ID!,
},
})
const trpcRouter = createTRPCRouter({ locationService, weatherService })
const trpcRouter = createTRPCRouter({ locationService })
const app = new Hono()

View File

@@ -0,0 +1,206 @@
import type { Context } from "@aris/core"
import type { ITflApi, StationLocation, TflLineId, TflLineStatus } from "@aris/source-tfl"
import { describe, expect, test } from "bun:test"
import { UserNotFoundError } from "../lib/error.ts"
import { TflService } from "./service.ts"
class StubTflApi implements ITflApi {
private statuses: TflLineStatus[]
constructor(statuses: TflLineStatus[] = []) {
this.statuses = statuses
}
async fetchLineStatuses(lines?: TflLineId[]): Promise<TflLineStatus[]> {
if (lines) {
return this.statuses.filter((s) => lines.includes(s.lineId))
}
return this.statuses
}
async fetchStations(): Promise<StationLocation[]> {
return [
{
id: "940GZZLUKSX",
name: "King's Cross",
lat: 51.5308,
lng: -0.1238,
lines: ["northern", "victoria"],
},
]
}
}
function createContext(): Context {
return { time: new Date("2026-01-15T12:00:00Z") }
}
const sampleStatuses: TflLineStatus[] = [
{
lineId: "northern",
lineName: "Northern",
severity: "minor-delays",
description: "Minor delays on the Northern line",
},
{
lineId: "victoria",
lineName: "Victoria",
severity: "major-delays",
description: "Severe delays on the Victoria line",
},
{
lineId: "central",
lineName: "Central",
severity: "closure",
description: "Central line suspended",
},
]
describe("TflService", () => {
test("feedSourceForUser creates source on first call", () => {
const service = new TflService(new StubTflApi())
const source = service.feedSourceForUser("user-1")
expect(source).toBeDefined()
expect(source.id).toBe("tfl")
})
test("feedSourceForUser returns same source for same user", () => {
const service = new TflService(new StubTflApi())
const source1 = service.feedSourceForUser("user-1")
const source2 = service.feedSourceForUser("user-1")
expect(source1).toBe(source2)
})
test("feedSourceForUser returns different sources for different users", () => {
const service = new TflService(new StubTflApi())
const source1 = service.feedSourceForUser("user-1")
const source2 = service.feedSourceForUser("user-2")
expect(source1).not.toBe(source2)
})
test("updateLinesOfInterest mutates the existing source in place", () => {
const service = new TflService(new StubTflApi())
const original = service.feedSourceForUser("user-1")
service.updateLinesOfInterest("user-1", ["northern", "victoria"])
const after = service.feedSourceForUser("user-1")
expect(after).toBe(original)
})
test("updateLinesOfInterest throws if source does not exist", () => {
const service = new TflService(new StubTflApi())
expect(() => service.updateLinesOfInterest("user-1", ["northern"])).toThrow(UserNotFoundError)
})
test("removeUser removes the source", () => {
const service = new TflService(new StubTflApi())
const source1 = service.feedSourceForUser("user-1")
service.removeUser("user-1")
const source2 = service.feedSourceForUser("user-1")
expect(source1).not.toBe(source2)
})
test("removeUser clears line configuration", async () => {
const api = new StubTflApi(sampleStatuses)
const service = new TflService(api)
service.feedSourceForUser("user-1")
service.updateLinesOfInterest("user-1", ["northern"])
service.removeUser("user-1")
const items = await service.feedSourceForUser("user-1").fetchItems(createContext())
expect(items.length).toBe(3)
})
test("shares single api instance across users", () => {
const api = new StubTflApi()
const service = new TflService(api)
service.feedSourceForUser("user-1")
service.feedSourceForUser("user-2")
expect(service.feedSourceForUser("user-1").id).toBe("tfl")
expect(service.feedSourceForUser("user-2").id).toBe("tfl")
})
describe("returned source fetches items", () => {
test("source returns feed items from api", async () => {
const api = new StubTflApi(sampleStatuses)
const service = new TflService(api)
const source = service.feedSourceForUser("user-1")
const items = await source.fetchItems(createContext())
expect(items.length).toBe(3)
for (const item of items) {
expect(item.type).toBe("tfl-alert")
expect(item.id).toMatch(/^tfl-alert-/)
expect(typeof item.priority).toBe("number")
expect(item.timestamp).toBeInstanceOf(Date)
}
})
test("source returns items sorted by priority descending", async () => {
const api = new StubTflApi(sampleStatuses)
const service = new TflService(api)
const source = service.feedSourceForUser("user-1")
const items = await source.fetchItems(createContext())
for (let i = 1; i < items.length; i++) {
expect(items[i - 1]!.priority).toBeGreaterThanOrEqual(items[i]!.priority)
}
})
test("source returns empty array when no disruptions", async () => {
const api = new StubTflApi([])
const service = new TflService(api)
const source = service.feedSourceForUser("user-1")
const items = await source.fetchItems(createContext())
expect(items).toEqual([])
})
test("updateLinesOfInterest filters items to configured lines", async () => {
const api = new StubTflApi(sampleStatuses)
const service = new TflService(api)
const before = await service.feedSourceForUser("user-1").fetchItems(createContext())
expect(before.length).toBe(3)
service.updateLinesOfInterest("user-1", ["northern"])
const after = await service.feedSourceForUser("user-1").fetchItems(createContext())
expect(after.length).toBe(1)
expect(after[0]!.data.line).toBe("northern")
})
test("different users get independent line configs", async () => {
const api = new StubTflApi(sampleStatuses)
const service = new TflService(api)
service.feedSourceForUser("user-1")
service.feedSourceForUser("user-2")
service.updateLinesOfInterest("user-1", ["northern"])
service.updateLinesOfInterest("user-2", ["central"])
const items1 = await service.feedSourceForUser("user-1").fetchItems(createContext())
const items2 = await service.feedSourceForUser("user-2").fetchItems(createContext())
expect(items1.length).toBe(1)
expect(items1[0]!.data.line).toBe("northern")
expect(items2.length).toBe(1)
expect(items2[0]!.data.line).toBe("central")
})
})
})

View File

@@ -0,0 +1,40 @@
import { TflSource, type ITflApi, type TflLineId } from "@aris/source-tfl"
import type { FeedSourceProvider } from "../feed/service.ts"
import { UserNotFoundError } from "../lib/error.ts"
/**
* Manages per-user TflSource instances with individual line configuration.
*/
export class TflService implements FeedSourceProvider {
private sources = new Map<string, TflSource>()
constructor(private readonly api: ITflApi) {}
feedSourceForUser(userId: string): TflSource {
let source = this.sources.get(userId)
if (!source) {
source = new TflSource({ client: this.api })
this.sources.set(userId, source)
}
return source
}
/**
* Update monitored lines for a user. Mutates the existing TflSource
* so that references held by FeedEngine remain valid.
* @throws {UserNotFoundError} If no source exists for the user
*/
updateLinesOfInterest(userId: string, lines: TflLineId[]): void {
const source = this.sources.get(userId)
if (!source) {
throw new UserNotFoundError(userId)
}
source.setLinesOfInterest(lines)
}
removeUser(userId: string): void {
this.sources.delete(userId)
}
}

View File

@@ -1,10 +1,8 @@
import { initTRPC, TRPCError } from "@trpc/server"
import type { LocationService } from "../location/service.ts"
import type { WeatherService } from "../weather/service.ts"
import type { Context } from "./context.ts"
import { createLocationRouter } from "../location/router.ts"
import type { LocationService } from "../location/service.ts"
import type { Context } from "./context.ts"
interface AuthedContext {
user: NonNullable<Context["user"]>
@@ -36,7 +34,6 @@ export type TRPC = ReturnType<typeof createTRPC>
export interface TRPCRouterDeps {
locationService: LocationService
weatherService: WeatherService
}
export function createTRPCRouter({ locationService }: TRPCRouterDeps) {

View File

@@ -1,116 +0,0 @@
import type { Context } from "@aris/core"
import { LocationKey } from "@aris/source-location"
import {
Units,
WeatherFeedItemType,
type WeatherKitClient,
type WeatherKitResponse,
} from "@aris/source-weatherkit"
import { describe, expect, test } from "bun:test"
import fixture from "../../../../packages/aris-source-weatherkit/fixtures/san-francisco.json"
import { WeatherService } from "./service.ts"
const mockClient = createMockClient(fixture.response as WeatherKitResponse)
function createMockClient(response: WeatherKitResponse): WeatherKitClient {
return {
fetch: async () => response,
}
}
function createMockContext(location?: { lat: number; lng: number }): Context {
const ctx: Context = { time: new Date("2026-01-17T00:00:00Z") }
if (location) {
ctx[LocationKey] = { ...location, accuracy: 10, timestamp: new Date() }
}
return ctx
}
describe("WeatherService", () => {
test("feedSourceForUser creates source on first call", () => {
const service = new WeatherService({ client: mockClient })
const source = service.feedSourceForUser("user-1")
expect(source).toBeDefined()
expect(source.id).toBe("weather")
})
test("feedSourceForUser returns same source for same user", () => {
const service = new WeatherService({ client: mockClient })
const source1 = service.feedSourceForUser("user-1")
const source2 = service.feedSourceForUser("user-1")
expect(source1).toBe(source2)
})
test("feedSourceForUser returns different sources for different users", () => {
const service = new WeatherService({ client: mockClient })
const source1 = service.feedSourceForUser("user-1")
const source2 = service.feedSourceForUser("user-2")
expect(source1).not.toBe(source2)
})
test("feedSourceForUser applies hourly and daily limits", async () => {
const service = new WeatherService({
client: mockClient,
hourlyLimit: 3,
dailyLimit: 2,
})
const source = service.feedSourceForUser("user-1")
const context = createMockContext({ lat: 37.7749, lng: -122.4194 })
const items = await source.fetchItems(context)
const hourly = items.filter((i) => i.type === WeatherFeedItemType.hourly)
const daily = items.filter((i) => i.type === WeatherFeedItemType.daily)
expect(hourly).toHaveLength(3)
expect(daily).toHaveLength(2)
})
test("feedSourceForUser applies units", async () => {
const service = new WeatherService({
client: mockClient,
units: Units.imperial,
})
const source = service.feedSourceForUser("user-1")
const context = createMockContext({ lat: 37.7749, lng: -122.4194 })
const items = await source.fetchItems(context)
const current = items.find((i) => i.type === WeatherFeedItemType.current)
expect(current).toBeDefined()
// Fixture has ~15.87°C, imperial should be ~60.6°F
expect(current!.data.temperature).toBeGreaterThan(50)
})
test("removeUser removes the source", () => {
const service = new WeatherService({ client: mockClient })
service.feedSourceForUser("user-1")
service.removeUser("user-1")
// After removal, feedSourceForUser should create a new instance
const source2 = service.feedSourceForUser("user-1")
expect(source2).toBeDefined()
})
test("removeUser allows new source to be created", () => {
const service = new WeatherService({ client: mockClient })
const source1 = service.feedSourceForUser("user-1")
service.removeUser("user-1")
const source2 = service.feedSourceForUser("user-1")
expect(source1).not.toBe(source2)
})
test("removeUser is no-op for unknown user", () => {
const service = new WeatherService({ client: mockClient })
expect(() => service.removeUser("unknown")).not.toThrow()
})
})

View File

@@ -1,40 +0,0 @@
import { WeatherSource, type WeatherSourceOptions } from "@aris/source-weatherkit"
import type { FeedSourceProvider } from "../feed/service.ts"
/**
* Options forwarded to every per-user WeatherSource.
* Must include either `credentials` or `client` (same requirement as WeatherSourceOptions).
*/
export type WeatherServiceOptions = WeatherSourceOptions
/**
* Manages WeatherSource instances per user.
*/
export class WeatherService implements FeedSourceProvider {
private sources = new Map<string, WeatherSource>()
private readonly options: WeatherServiceOptions
constructor(options: WeatherServiceOptions) {
this.options = options
}
/**
* Get or create a WeatherSource for a user.
*/
feedSourceForUser(userId: string): WeatherSource {
let source = this.sources.get(userId)
if (!source) {
source = new WeatherSource(this.options)
this.sources.set(userId, source)
}
return source
}
/**
* Remove a user's WeatherSource.
*/
removeUser(userId: string): void {
this.sources.delete(userId)
}
}

View File

@@ -19,6 +19,7 @@
"dependencies": {
"@aris/core": "workspace:*",
"@aris/source-location": "workspace:*",
"@aris/source-tfl": "workspace:*",
"@aris/source-weatherkit": "workspace:*",
"@hono/trpc-server": "^0.3",
"@trpc/server": "^11",

View File

@@ -7,14 +7,12 @@ ARIS needs a backend service that manages per-user FeedEngine instances and deli
## Requirements
### Authentication
- Email/password authentication using BetterAuth
- PostgreSQL for session and user storage
- Session tokens validated via `Authorization: Bearer <token>` header
- Auth endpoints exposed via BetterAuth's built-in routes
### FeedEngine Management
- Each authenticated user gets their own FeedEngine instance
- Instances are cached in memory with a 30-minute TTL
- TTL resets on any activity (WebSocket message, location update)
@@ -22,7 +20,6 @@ ARIS needs a backend service that manages per-user FeedEngine instances and deli
- Source configuration is hardcoded initially (customization deferred)
### WebSocket Connection
- Single endpoint: `GET /ws` (upgrades to WebSocket)
- Authentication via `Authorization: Bearer <token>` header on upgrade request
- Rejected before upgrade if token is invalid
@@ -31,44 +28,39 @@ ARIS needs a backend service that manages per-user FeedEngine instances and deli
- On connect: immediately send current feed state
### JSON-RPC Protocol
All WebSocket communication uses JSON-RPC 2.0.
**Client → Server (Requests):**
```json
{ "jsonrpc": "2.0", "method": "location.update", "params": { "lat": 51.5, "lng": -0.1, "accuracy": 10, "timestamp": "2025-01-01T12:00:00Z" }, "id": 1 }
{ "jsonrpc": "2.0", "method": "feed.refresh", "params": {}, "id": 2 }
```
**Server → Client (Responses):**
```json
{ "jsonrpc": "2.0", "result": { "ok": true }, "id": 1 }
```
**Server → Client (Notifications - no id):**
```json
{ "jsonrpc": "2.0", "method": "feed.update", "params": { "items": [...], "errors": [...] } }
```
### JSON-RPC Methods
| Method | Params | Description |
| ----------------- | ----------------------------------- | ------------------------------------------- |
| Method | Params | Description |
|--------|--------|-------------|
| `location.update` | `{ lat, lng, accuracy, timestamp }` | Push location update, triggers feed refresh |
| `feed.refresh` | `{}` | Force manual feed refresh |
| `feed.refresh` | `{}` | Force manual feed refresh |
### Server Notifications
| Method | Params | Description |
| ------------- | ---------------------------- | ---------------------- |
| `feed.update` | `{ context, items, errors }` | Feed state changed |
| `error` | `{ code, message, data? }` | Source or system error |
| Method | Params | Description |
|--------|--------|-------------|
| `feed.update` | `{ context, items, errors }` | Feed state changed |
| `error` | `{ code, message, data? }` | Source or system error |
### Error Handling
- Source failures during refresh are reported via `error` notification
- Format: `{ "jsonrpc": "2.0", "method": "error", "params": { "code": -32000, "message": "...", "data": { "sourceId": "weather" } } }`
@@ -104,19 +96,16 @@ All WebSocket communication uses JSON-RPC 2.0.
## Implementation Approach
### Phase 1: Project Setup
1. Create `apps/aris-backend` with Hono
2. Configure TypeScript, add dependencies (hono, better-auth, postgres driver)
3. Set up database connection and BetterAuth
### Phase 2: Authentication
4. Configure BetterAuth with email/password provider
5. Mount BetterAuth routes at `/api/auth/*`
6. Create session validation helper for extracting user from token
### Phase 3: FeedEngine Manager
7. Create `FeedEngineManager` class:
- `getOrCreate(userId): FeedEngine` - returns cached or creates new
- `touch(userId)` - resets TTL
@@ -125,26 +114,22 @@ All WebSocket communication uses JSON-RPC 2.0.
8. Factory function to create FeedEngine with default sources
### Phase 4: WebSocket Handler
9. Create WebSocket upgrade endpoint at `/ws`
10. Validate `Authorization` header before upgrade
11. On connect: register connection, send initial feed state
12. On disconnect: unregister connection
### Phase 5: JSON-RPC Handler
13. Create JSON-RPC message parser and dispatcher
14. Implement `location.update` method
15. Implement `feed.refresh` method
16. Wire FeedEngine subscription to broadcast `feed.update` to all user connections
### Phase 6: Connection Manager
17. Create `ConnectionManager` to track WebSocket connections per user
18. Broadcast helper to send to all connections for a user
### Phase 7: Integration & Testing
19. Integration test: auth → connect → location update → receive feed
20. Test multiple connections receive same updates
21. Test TTL cleanup
@@ -173,15 +158,15 @@ apps/aris-backend/
```json
{
"dependencies": {
"hono": "^4",
"better-auth": "^1",
"postgres": "^3",
"@aris/core": "workspace:*",
"@aris/source-location": "workspace:*",
"@aris/source-weatherkit": "workspace:*",
"@aris/data-source-tfl": "workspace:*"
}
"dependencies": {
"hono": "^4",
"better-auth": "^1",
"postgres": "^3",
"@aris/core": "workspace:*",
"@aris/source-location": "workspace:*",
"@aris/source-weatherkit": "workspace:*",
"@aris/data-source-tfl": "workspace:*"
}
}
```

View File

@@ -2,6 +2,7 @@ export { TflSource } from "./tfl-source.ts"
export { TflApi } from "./tfl-api.ts"
export type { TflLineId } from "./tfl-api.ts"
export type {
ITflApi,
StationLocation,
TflAlertData,
TflAlertFeedItem,

View File

@@ -112,6 +112,49 @@ describe("TflSource", () => {
})
})
describe("setLinesOfInterest", () => {
const lineFilteringApi: ITflApi = {
async fetchLineStatuses(lines?: TflLineId[]): Promise<TflLineStatus[]> {
const all: TflLineStatus[] = [
{
lineId: "northern",
lineName: "Northern",
severity: "minor-delays",
description: "Delays",
},
{ lineId: "central", lineName: "Central", severity: "closure", description: "Closed" },
]
return lines ? all.filter((s) => lines.includes(s.lineId)) : all
},
async fetchStations(): Promise<StationLocation[]> {
return []
},
}
test("changes which lines are fetched", async () => {
const source = new TflSource({ client: lineFilteringApi })
const before = await source.fetchItems(createContext())
expect(before.length).toBe(2)
source.setLinesOfInterest(["northern"])
const after = await source.fetchItems(createContext())
expect(after.length).toBe(1)
expect(after[0]!.data.line).toBe("northern")
})
test("DEFAULT_LINES_OF_INTEREST restores all lines", async () => {
const source = new TflSource({ client: lineFilteringApi, lines: ["northern"] })
const filtered = await source.fetchItems(createContext())
expect(filtered.length).toBe(1)
source.setLinesOfInterest([...TflSource.DEFAULT_LINES_OF_INTEREST])
const all = await source.fetchItems(createContext())
expect(all.length).toBe(2)
})
})
describe("fetchItems", () => {
test("returns feed items array", async () => {
const source = new TflSource({ client: api })

View File

@@ -42,18 +42,46 @@ const SEVERITY_PRIORITY: Record<TflAlertSeverity, number> = {
* ```
*/
export class TflSource implements FeedSource<TflAlertFeedItem> {
static readonly DEFAULT_LINES_OF_INTEREST: readonly TflLineId[] = [
"bakerloo",
"central",
"circle",
"district",
"hammersmith-city",
"jubilee",
"metropolitan",
"northern",
"piccadilly",
"victoria",
"waterloo-city",
"lioness",
"mildmay",
"windrush",
"weaver",
"suffragette",
"liberty",
"elizabeth",
]
readonly id = "tfl"
readonly dependencies = ["location"]
private readonly client: ITflApi
private readonly lines?: TflLineId[]
private lines: TflLineId[]
constructor(options: TflSourceOptions) {
if (!options.client && !options.apiKey) {
throw new Error("Either client or apiKey must be provided")
}
this.client = options.client ?? new TflApi(options.apiKey!)
this.lines = options.lines
this.lines = options.lines ?? [...TflSource.DEFAULT_LINES_OF_INTEREST]
}
/**
* Update the set of monitored lines. Takes effect on the next fetchItems call.
*/
setLinesOfInterest(lines: TflLineId[]): void {
this.lines = lines
}
async fetchItems(context: Context): Promise<TflAlertFeedItem[]> {