feat: initial zigbee control implementation
Some checks failed
Build and Publish Docker Image / build-and-push (push) Failing after 27s

Co-authored-by: Ona <no-reply@ona.com>
This commit is contained in:
2025-10-28 01:11:57 +00:00
parent 2e63609129
commit b55d99dd9e
30 changed files with 901 additions and 70 deletions

View File

@@ -1,18 +1,21 @@
{
"name": "backend",
"version": "1.0.0",
"private": true,
"scripts": {
"dev": "bun run --watch src/index.ts",
"build": "bun build src/index.ts --outdir dist --target bun",
"start": "bun run dist/index.js",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"hono": "^4.6.14"
},
"devDependencies": {
"@types/bun": "latest",
"typescript": "^5.6.3"
}
"name": "@eva/backend",
"version": "1.0.0",
"private": true,
"scripts": {
"dev": "bun run --watch src/index.ts",
"build": "bun build src/index.ts --outdir dist --target bun",
"start": "bun run dist/index.js",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"hono": "^4.6.14",
"mqtt": "^5.14.1",
"@eva/jrpc": "workspace:*",
"@eva/zigbee": "workspace:*"
},
"devDependencies": {
"@types/bun": "latest",
"typescript": "^5.6.3"
}
}

View File

@@ -8,5 +8,9 @@ declare namespace NodeJS {
BESZEL_HOST?: string
BESZEL_EMAIL?: string
BESZEL_PASSWORD?: string
MQTT_HOST: string
MQTT_PORT: number
MQTT_USERNAME: string
MQTT_PASSWORD: string
}
}

View File

@@ -1,10 +1,19 @@
import { Hono } from "hono"
import { serveStatic, websocket } from "hono/bun"
import { cors } from "hono/cors"
import { logger } from "hono/logger"
import { serveStatic } from "hono/bun"
import weather from "./weather"
import tfl from "./tfl"
import beszel from "./beszel"
import { createMqttClient } from "./mqtt"
import tfl from "./tfl"
import weather from "./weather"
import zigbee from "./zigbee/routes"
const mqtt = await createMqttClient({
host: process.env.MQTT_HOST,
port: process.env.MQTT_PORT,
username: process.env.MQTT_USERNAME,
password: process.env.MQTT_PASSWORD,
})
const app = new Hono()
@@ -24,6 +33,9 @@ app.route("/api/tfl", tfl)
// Mount Beszel routes
app.route("/api/beszel", beszel)
// Mount Zigbee routes
app.route("/api/zigbee", zigbee(mqtt))
// Serve static files from dashboard build
app.use("/*", serveStatic({ root: "../dashboard/dist" }))
@@ -33,4 +45,5 @@ app.get("*", serveStatic({ path: "../dashboard/dist/index.html" }))
export default {
port: 8000,
fetch: app.fetch,
websocket,
}

6
apps/backend/src/jrpc.ts Normal file
View File

@@ -0,0 +1,6 @@
export type JrpcRequest = {
jsonrpc: "2.0"
method: string
params: unknown
id: number
}

15
apps/backend/src/mqtt.ts Normal file
View File

@@ -0,0 +1,15 @@
import mqtt from "mqtt"
export async function createMqttClient({
host,
port,
username,
password,
}: { host: string; port: number; username: string; password: string }) {
return await mqtt.connectAsync({
host,
port,
username,
password,
})
}

View File

@@ -0,0 +1 @@
const BASE_TOPIC = "nexus"

View File

@@ -0,0 +1,50 @@
import type { ZigbeeDeviceName } from "@eva/zigbee"
import type { MqttClient } from "mqtt"
export type DeviceMessageListener = (msg: unknown) => void
export class ZigbeeController {
private deviceListeners: Map<string, DeviceMessageListener[]> = new Map()
constructor(
private readonly baseTopic: string,
private readonly mqtt: MqttClient,
) {
this.mqtt.on("message", (topic, message) => {
const [baseTopic, deviceName] = topic.split("/")
if (baseTopic !== this.baseTopic) {
return
}
const listeners = this.deviceListeners.get(deviceName)
if (listeners) {
for (const listener of listeners) {
listener(JSON.parse(message.toString()))
}
}
})
}
async subscribeToDevice(deviceName: ZigbeeDeviceName, listener: DeviceMessageListener): Promise<void> {
await this.mqtt.publishAsync(`${this.baseTopic}/${deviceName}/get`, JSON.stringify({ state: {} }))
await this.mqtt.subscribeAsync(`${this.baseTopic}/${deviceName}`)
if (!this.deviceListeners.has(deviceName)) {
this.deviceListeners.set(deviceName, [])
}
this.deviceListeners.get(deviceName)?.push(listener)
}
async unsubscribeFromDevice(deviceName: ZigbeeDeviceName, listener: DeviceMessageListener): Promise<void> {
await this.mqtt.unsubscribeAsync(`${this.baseTopic}/${deviceName}`)
const listeners = this.deviceListeners.get(deviceName)
if (listeners) {
listeners.splice(listeners.indexOf(listener), 1)
if (listeners.length === 0) {
this.deviceListeners.delete(deviceName)
}
}
}
async setDeviceState(deviceName: ZigbeeDeviceName, state: unknown): Promise<void> {
await this.mqtt.publishAsync(`${this.baseTopic}/${deviceName}/set`, JSON.stringify(state))
}
}

View File

@@ -0,0 +1,18 @@
import { ZIGBEE_BASE_TOPIC } from "@eva/zigbee"
import { createMiddleware } from "hono/factory"
import type { MqttClient } from "mqtt"
import { ZigbeeController } from "./controller"
export function zigbeeController(mqtt: MqttClient) {
const controller = new ZigbeeController(ZIGBEE_BASE_TOPIC, mqtt)
return createMiddleware((c, next) => {
c.set("zigbeeController", controller)
return next()
})
}
export type ZigbeeContext = {
Variables: {
zigbeeController: ZigbeeController
}
}

View File

@@ -0,0 +1,36 @@
import { Hono } from "hono"
import { upgradeWebSocket } from "hono/bun"
import type { WSContext } from "hono/ws"
import type { MqttClient } from "mqtt"
import type { ZigbeeController } from "./controller"
import { type ZigbeeContext, zigbeeController } from "./middleware"
import { WebSocketHandler } from "./ws"
export function zigbee(mqtt: MqttClient) {
const h = new Hono<ZigbeeContext>()
h.use("*", zigbeeController(mqtt))
h.get(
"/",
upgradeWebSocket((c) => {
const controller = c.get("zigbeeController") as ZigbeeController
const wsHandler = new WebSocketHandler(controller)
return {
onOpen: (event, ws) => {
wsHandler.handleWebsocketOpen(event, ws)
},
onMessage: (event, ws) => {
wsHandler.handleWebsocketMessage(event, ws)
},
onClose: (event, ws) => {
wsHandler.handleWebsocketClose(ws)
},
}
}),
)
return h
}
export default zigbee

View File

@@ -0,0 +1,54 @@
import type { JrpcRequest, JrpcResponse } from "@eva/jrpc"
import { ALL_ZIGBEE_DEVICE_NAMES, type ZigbeeDeviceName, type ZigbeeDeviceState } from "@eva/zigbee"
import type { WSContext } from "hono/ws"
import type { DeviceMessageListener, ZigbeeController } from "./controller"
export class WebSocketHandler {
private deviceListeners: Map<ZigbeeDeviceName, DeviceMessageListener> = new Map()
constructor(private readonly controller: ZigbeeController) {}
handleWebsocketOpen(event: Event, ws: WSContext) {
for (const device of ALL_ZIGBEE_DEVICE_NAMES) {
const l: DeviceMessageListener = (msg) => {
const state = msg as ZigbeeDeviceState
const request: JrpcRequest<"showDeviceState"> = {
id: crypto.randomUUID(),
jsonrpc: "2.0",
method: "showDeviceState",
params: { deviceName: device, state },
}
ws.send(JSON.stringify(request))
}
this.controller.subscribeToDevice(device, l)
this.deviceListeners.set(device, l)
}
}
async handleWebsocketMessage(event: MessageEvent, ws: WSContext) {
const message = JSON.parse(event.data) as JrpcRequest | JrpcResponse
if ("method" in message) {
await this.handleRequest(message, ws)
}
}
handleWebsocketClose(_ws: WSContext) {
for (const [device, listener] of this.deviceListeners.entries()) {
this.controller.unsubscribeFromDevice(device, listener)
}
}
private async handleRequest(message: JrpcRequest, ws: WSContext) {
switch (message.method) {
case "setDeviceState": {
await this.controller.setDeviceState(message.params.deviceName, message.params.state)
const response: JrpcResponse<"setDeviceState"> = {
id: message.id,
jsonrpc: "2.0",
result: true,
}
ws.send(JSON.stringify(response))
}
}
}
}