feat(backend): add WeatherService

Manage per-user WeatherSource instances via FeedSourceProvider,
following the same pattern as LocationService. Wire into
FeedEngineService so weather data is included in the feed.

Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
2026-02-13 19:16:33 +00:00
parent 31a82c1d9f
commit e5f1273baf
7 changed files with 214 additions and 20 deletions

View File

@@ -6,3 +6,9 @@ BETTER_AUTH_SECRET=
# Base URL of the backend # Base URL of the backend
BETTER_AUTH_URL=http://localhost:3000 BETTER_AUTH_URL=http://localhost:3000
# Apple WeatherKit credentials
WEATHERKIT_PRIVATE_KEY=
WEATHERKIT_KEY_ID=
WEATHERKIT_TEAM_ID=
WEATHERKIT_SERVICE_ID=

View File

@@ -1,10 +1,11 @@
import { TRPCError } from "@trpc/server" import { TRPCError } from "@trpc/server"
import { type } from "arktype" import { type } from "arktype"
import { UserNotFoundError } from "../lib/error.ts"
import type { TRPC } from "../trpc/router.ts" import type { TRPC } from "../trpc/router.ts"
import type { LocationService } from "./service.ts" import type { LocationService } from "./service.ts"
import { UserNotFoundError } from "../lib/error.ts"
const locationInput = type({ const locationInput = type({
lat: "number", lat: "number",
lng: "number", lng: "number",
@@ -12,7 +13,10 @@ const locationInput = type({
timestamp: "Date", timestamp: "Date",
}) })
export function createLocationRouter(t: TRPC, { locationService }: { locationService: LocationService }) { export function createLocationRouter(
t: TRPC,
{ locationService }: { locationService: LocationService },
) {
return t.router({ return t.router({
update: t.procedure.input(locationInput).mutation(({ input, ctx }) => { update: t.procedure.input(locationInput).mutation(({ input, ctx }) => {
try { try {

View File

@@ -5,11 +5,21 @@ import { registerAuthHandlers } from "./auth/http.ts"
import { LocationService } from "./location/service.ts" import { LocationService } from "./location/service.ts"
import { createContext } from "./trpc/context.ts" import { createContext } from "./trpc/context.ts"
import { createTRPCRouter } from "./trpc/router.ts" import { createTRPCRouter } from "./trpc/router.ts"
import { WeatherService } from "./weather/service.ts"
function main() { function main() {
const locationService = new LocationService() const locationService = new LocationService()
const trpcRouter = createTRPCRouter({ locationService }) const weatherService = new WeatherService({
credentials: {
privateKey: process.env.WEATHERKIT_PRIVATE_KEY!,
keyId: process.env.WEATHERKIT_KEY_ID!,
teamId: process.env.WEATHERKIT_TEAM_ID!,
serviceId: process.env.WEATHERKIT_SERVICE_ID!,
},
})
const trpcRouter = createTRPCRouter({ locationService, weatherService })
const app = new Hono() const app = new Hono()

View File

@@ -1,9 +1,11 @@
import { initTRPC, TRPCError } from "@trpc/server" import { initTRPC, TRPCError } from "@trpc/server"
import { createLocationRouter } from "../location/router.ts"
import type { LocationService } from "../location/service.ts" import type { LocationService } from "../location/service.ts"
import type { WeatherService } from "../weather/service.ts"
import type { Context } from "./context.ts" import type { Context } from "./context.ts"
import { createLocationRouter } from "../location/router.ts"
interface AuthedContext { interface AuthedContext {
user: NonNullable<Context["user"]> user: NonNullable<Context["user"]>
session: NonNullable<Context["session"]> session: NonNullable<Context["session"]>
@@ -34,6 +36,7 @@ export type TRPC = ReturnType<typeof createTRPC>
export interface TRPCRouterDeps { export interface TRPCRouterDeps {
locationService: LocationService locationService: LocationService
weatherService: WeatherService
} }
export function createTRPCRouter({ locationService }: TRPCRouterDeps) { export function createTRPCRouter({ locationService }: TRPCRouterDeps) {

View File

@@ -0,0 +1,116 @@
import type { Context } from "@aris/core"
import { LocationKey } from "@aris/source-location"
import {
Units,
WeatherFeedItemType,
type WeatherKitClient,
type WeatherKitResponse,
} from "@aris/source-weatherkit"
import { describe, expect, test } from "bun:test"
import fixture from "../../../../packages/aris-source-weatherkit/fixtures/san-francisco.json"
import { WeatherService } from "./service.ts"
const mockClient = createMockClient(fixture.response as WeatherKitResponse)
function createMockClient(response: WeatherKitResponse): WeatherKitClient {
return {
fetch: async () => response,
}
}
function createMockContext(location?: { lat: number; lng: number }): Context {
const ctx: Context = { time: new Date("2026-01-17T00:00:00Z") }
if (location) {
ctx[LocationKey] = { ...location, accuracy: 10, timestamp: new Date() }
}
return ctx
}
describe("WeatherService", () => {
test("feedSourceForUser creates source on first call", () => {
const service = new WeatherService({ client: mockClient })
const source = service.feedSourceForUser("user-1")
expect(source).toBeDefined()
expect(source.id).toBe("weather")
})
test("feedSourceForUser returns same source for same user", () => {
const service = new WeatherService({ client: mockClient })
const source1 = service.feedSourceForUser("user-1")
const source2 = service.feedSourceForUser("user-1")
expect(source1).toBe(source2)
})
test("feedSourceForUser returns different sources for different users", () => {
const service = new WeatherService({ client: mockClient })
const source1 = service.feedSourceForUser("user-1")
const source2 = service.feedSourceForUser("user-2")
expect(source1).not.toBe(source2)
})
test("feedSourceForUser applies hourly and daily limits", async () => {
const service = new WeatherService({
client: mockClient,
hourlyLimit: 3,
dailyLimit: 2,
})
const source = service.feedSourceForUser("user-1")
const context = createMockContext({ lat: 37.7749, lng: -122.4194 })
const items = await source.fetchItems(context)
const hourly = items.filter((i) => i.type === WeatherFeedItemType.hourly)
const daily = items.filter((i) => i.type === WeatherFeedItemType.daily)
expect(hourly).toHaveLength(3)
expect(daily).toHaveLength(2)
})
test("feedSourceForUser applies units", async () => {
const service = new WeatherService({
client: mockClient,
units: Units.imperial,
})
const source = service.feedSourceForUser("user-1")
const context = createMockContext({ lat: 37.7749, lng: -122.4194 })
const items = await source.fetchItems(context)
const current = items.find((i) => i.type === WeatherFeedItemType.current)
expect(current).toBeDefined()
// Fixture has ~15.87°C, imperial should be ~60.6°F
expect(current!.data.temperature).toBeGreaterThan(50)
})
test("removeUser removes the source", () => {
const service = new WeatherService({ client: mockClient })
service.feedSourceForUser("user-1")
service.removeUser("user-1")
// After removal, feedSourceForUser should create a new instance
const source2 = service.feedSourceForUser("user-1")
expect(source2).toBeDefined()
})
test("removeUser allows new source to be created", () => {
const service = new WeatherService({ client: mockClient })
const source1 = service.feedSourceForUser("user-1")
service.removeUser("user-1")
const source2 = service.feedSourceForUser("user-1")
expect(source1).not.toBe(source2)
})
test("removeUser is no-op for unknown user", () => {
const service = new WeatherService({ client: mockClient })
expect(() => service.removeUser("unknown")).not.toThrow()
})
})

View File

@@ -0,0 +1,40 @@
import { WeatherSource, type WeatherSourceOptions } from "@aris/source-weatherkit"
import type { FeedSourceProvider } from "../feed/service.ts"
/**
* Options forwarded to every per-user WeatherSource.
* Must include either `credentials` or `client` (same requirement as WeatherSourceOptions).
*/
export type WeatherServiceOptions = WeatherSourceOptions
/**
* Manages WeatherSource instances per user.
*/
export class WeatherService implements FeedSourceProvider {
private sources = new Map<string, WeatherSource>()
private readonly options: WeatherServiceOptions
constructor(options: WeatherServiceOptions) {
this.options = options
}
/**
* Get or create a WeatherSource for a user.
*/
feedSourceForUser(userId: string): WeatherSource {
let source = this.sources.get(userId)
if (!source) {
source = new WeatherSource(this.options)
this.sources.set(userId, source)
}
return source
}
/**
* Remove a user's WeatherSource.
*/
removeUser(userId: string): void {
this.sources.delete(userId)
}
}

View File

@@ -7,12 +7,14 @@ ARIS needs a backend service that manages per-user FeedEngine instances and deli
## Requirements ## Requirements
### Authentication ### Authentication
- Email/password authentication using BetterAuth - Email/password authentication using BetterAuth
- PostgreSQL for session and user storage - PostgreSQL for session and user storage
- Session tokens validated via `Authorization: Bearer <token>` header - Session tokens validated via `Authorization: Bearer <token>` header
- Auth endpoints exposed via BetterAuth's built-in routes - Auth endpoints exposed via BetterAuth's built-in routes
### FeedEngine Management ### FeedEngine Management
- Each authenticated user gets their own FeedEngine instance - Each authenticated user gets their own FeedEngine instance
- Instances are cached in memory with a 30-minute TTL - Instances are cached in memory with a 30-minute TTL
- TTL resets on any activity (WebSocket message, location update) - TTL resets on any activity (WebSocket message, location update)
@@ -20,6 +22,7 @@ ARIS needs a backend service that manages per-user FeedEngine instances and deli
- Source configuration is hardcoded initially (customization deferred) - Source configuration is hardcoded initially (customization deferred)
### WebSocket Connection ### WebSocket Connection
- Single endpoint: `GET /ws` (upgrades to WebSocket) - Single endpoint: `GET /ws` (upgrades to WebSocket)
- Authentication via `Authorization: Bearer <token>` header on upgrade request - Authentication via `Authorization: Bearer <token>` header on upgrade request
- Rejected before upgrade if token is invalid - Rejected before upgrade if token is invalid
@@ -28,20 +31,24 @@ ARIS needs a backend service that manages per-user FeedEngine instances and deli
- On connect: immediately send current feed state - On connect: immediately send current feed state
### JSON-RPC Protocol ### JSON-RPC Protocol
All WebSocket communication uses JSON-RPC 2.0. All WebSocket communication uses JSON-RPC 2.0.
**Client → Server (Requests):** **Client → Server (Requests):**
```json ```json
{ "jsonrpc": "2.0", "method": "location.update", "params": { "lat": 51.5, "lng": -0.1, "accuracy": 10, "timestamp": "2025-01-01T12:00:00Z" }, "id": 1 } { "jsonrpc": "2.0", "method": "location.update", "params": { "lat": 51.5, "lng": -0.1, "accuracy": 10, "timestamp": "2025-01-01T12:00:00Z" }, "id": 1 }
{ "jsonrpc": "2.0", "method": "feed.refresh", "params": {}, "id": 2 } { "jsonrpc": "2.0", "method": "feed.refresh", "params": {}, "id": 2 }
``` ```
**Server → Client (Responses):** **Server → Client (Responses):**
```json ```json
{ "jsonrpc": "2.0", "result": { "ok": true }, "id": 1 } { "jsonrpc": "2.0", "result": { "ok": true }, "id": 1 }
``` ```
**Server → Client (Notifications - no id):** **Server → Client (Notifications - no id):**
```json ```json
{ "jsonrpc": "2.0", "method": "feed.update", "params": { "items": [...], "errors": [...] } } { "jsonrpc": "2.0", "method": "feed.update", "params": { "items": [...], "errors": [...] } }
``` ```
@@ -49,18 +56,19 @@ All WebSocket communication uses JSON-RPC 2.0.
### JSON-RPC Methods ### JSON-RPC Methods
| Method | Params | Description | | Method | Params | Description |
|--------|--------|-------------| | ----------------- | ----------------------------------- | ------------------------------------------- |
| `location.update` | `{ lat, lng, accuracy, timestamp }` | Push location update, triggers feed refresh | | `location.update` | `{ lat, lng, accuracy, timestamp }` | Push location update, triggers feed refresh |
| `feed.refresh` | `{}` | Force manual feed refresh | | `feed.refresh` | `{}` | Force manual feed refresh |
### Server Notifications ### Server Notifications
| Method | Params | Description | | Method | Params | Description |
|--------|--------|-------------| | ------------- | ---------------------------- | ---------------------- |
| `feed.update` | `{ context, items, errors }` | Feed state changed | | `feed.update` | `{ context, items, errors }` | Feed state changed |
| `error` | `{ code, message, data? }` | Source or system error | | `error` | `{ code, message, data? }` | Source or system error |
### Error Handling ### Error Handling
- Source failures during refresh are reported via `error` notification - Source failures during refresh are reported via `error` notification
- Format: `{ "jsonrpc": "2.0", "method": "error", "params": { "code": -32000, "message": "...", "data": { "sourceId": "weather" } } }` - Format: `{ "jsonrpc": "2.0", "method": "error", "params": { "code": -32000, "message": "...", "data": { "sourceId": "weather" } } }`
@@ -96,16 +104,19 @@ All WebSocket communication uses JSON-RPC 2.0.
## Implementation Approach ## Implementation Approach
### Phase 1: Project Setup ### Phase 1: Project Setup
1. Create `apps/aris-backend` with Hono 1. Create `apps/aris-backend` with Hono
2. Configure TypeScript, add dependencies (hono, better-auth, postgres driver) 2. Configure TypeScript, add dependencies (hono, better-auth, postgres driver)
3. Set up database connection and BetterAuth 3. Set up database connection and BetterAuth
### Phase 2: Authentication ### Phase 2: Authentication
4. Configure BetterAuth with email/password provider 4. Configure BetterAuth with email/password provider
5. Mount BetterAuth routes at `/api/auth/*` 5. Mount BetterAuth routes at `/api/auth/*`
6. Create session validation helper for extracting user from token 6. Create session validation helper for extracting user from token
### Phase 3: FeedEngine Manager ### Phase 3: FeedEngine Manager
7. Create `FeedEngineManager` class: 7. Create `FeedEngineManager` class:
- `getOrCreate(userId): FeedEngine` - returns cached or creates new - `getOrCreate(userId): FeedEngine` - returns cached or creates new
- `touch(userId)` - resets TTL - `touch(userId)` - resets TTL
@@ -114,22 +125,26 @@ All WebSocket communication uses JSON-RPC 2.0.
8. Factory function to create FeedEngine with default sources 8. Factory function to create FeedEngine with default sources
### Phase 4: WebSocket Handler ### Phase 4: WebSocket Handler
9. Create WebSocket upgrade endpoint at `/ws` 9. Create WebSocket upgrade endpoint at `/ws`
10. Validate `Authorization` header before upgrade 10. Validate `Authorization` header before upgrade
11. On connect: register connection, send initial feed state 11. On connect: register connection, send initial feed state
12. On disconnect: unregister connection 12. On disconnect: unregister connection
### Phase 5: JSON-RPC Handler ### Phase 5: JSON-RPC Handler
13. Create JSON-RPC message parser and dispatcher 13. Create JSON-RPC message parser and dispatcher
14. Implement `location.update` method 14. Implement `location.update` method
15. Implement `feed.refresh` method 15. Implement `feed.refresh` method
16. Wire FeedEngine subscription to broadcast `feed.update` to all user connections 16. Wire FeedEngine subscription to broadcast `feed.update` to all user connections
### Phase 6: Connection Manager ### Phase 6: Connection Manager
17. Create `ConnectionManager` to track WebSocket connections per user 17. Create `ConnectionManager` to track WebSocket connections per user
18. Broadcast helper to send to all connections for a user 18. Broadcast helper to send to all connections for a user
### Phase 7: Integration & Testing ### Phase 7: Integration & Testing
19. Integration test: auth → connect → location update → receive feed 19. Integration test: auth → connect → location update → receive feed
20. Test multiple connections receive same updates 20. Test multiple connections receive same updates
21. Test TTL cleanup 21. Test TTL cleanup