Files
7am/main.go

807 lines
21 KiB
Go
Raw Permalink Normal View History

2025-05-10 00:36:38 +01:00
package main
import (
"context"
2025-05-10 13:04:39 +01:00
"database/sql"
2025-05-10 00:36:38 +01:00
"embed"
_ "embed"
2025-05-10 13:04:39 +01:00
"encoding/json"
"errors"
2025-05-10 15:21:48 +01:00
"flag"
2025-05-10 00:36:38 +01:00
"fmt"
2025-05-10 13:04:39 +01:00
"github.com/SherClockHolmes/webpush-go"
2025-05-10 00:36:38 +01:00
"github.com/go-co-op/gocron/v2"
2025-05-10 13:04:39 +01:00
"github.com/google/uuid"
2025-05-10 00:36:38 +01:00
"github.com/joho/godotenv"
"google.golang.org/genai"
"html/template"
"log"
2025-05-11 11:46:59 +01:00
"log/slog"
2025-05-10 00:36:38 +01:00
"mime"
2025-05-10 13:04:39 +01:00
_ "modernc.org/sqlite"
2025-05-10 00:36:38 +01:00
"net/http"
"os"
"path/filepath"
2025-05-10 18:27:29 +01:00
"slices"
2025-05-10 00:36:38 +01:00
"strings"
"sync"
"time"
)
type location struct {
2025-05-11 11:14:38 +01:00
tz *time.Location
lat float32
lon float32
2025-05-10 22:44:47 +01:00
ianaName string
2025-05-11 11:14:38 +01:00
displayName string
2025-05-10 00:36:38 +01:00
}
2025-05-10 15:13:31 +01:00
// pageTemplate stores all pre-compiled HTML templates for the application
2025-05-10 00:36:38 +01:00
type pageTemplate struct {
summary *template.Template
}
2025-05-10 15:13:31 +01:00
// summaryTemplateData stores template data for summary.html
2025-05-10 00:36:38 +01:00
type summaryTemplateData struct {
2025-05-11 00:08:20 +01:00
Summary string
Location string
LocationName string
2025-05-10 00:36:38 +01:00
}
2025-05-10 15:13:31 +01:00
// updateSubscription is the request body for creating/updating registration
2025-05-10 13:04:39 +01:00
type updateSubscription struct {
2025-05-10 18:27:29 +01:00
Subscription webpush.Subscription `json:"subscription"`
Locations []string `json:"locations"`
RemoveLocations []string `json:"removeLocations"`
2025-05-10 13:04:39 +01:00
}
2025-05-10 15:13:31 +01:00
// registeredSubscription represents a registered webpush subscription.
2025-05-10 13:04:39 +01:00
type registeredSubscription struct {
ID uuid.UUID `json:"id"`
Subscription *webpush.Subscription `json:"-"`
Locations []string `json:"locations"`
}
2025-05-10 18:35:06 +01:00
type webpushNotificationPayload struct {
Summary string `json:"summary"`
Location string `json:"location"`
}
2025-05-12 12:09:58 +01:00
type metAPIData struct {
Properties struct {
TimeSeries []map[string]any `json:"timeseries"`
} `json:"properties"`
}
2025-05-12 18:58:31 +01:00
type updateSummaryOptions struct {
locKey string
location *location
pushUpdate bool
}
2025-05-10 13:04:39 +01:00
type state struct {
2025-05-11 11:14:38 +01:00
ctx context.Context
metAPIUserAgent string
genai *genai.Client
template pageTemplate
2025-05-10 13:04:39 +01:00
2025-05-12 20:09:31 +01:00
db *sql.DB
dbMutex sync.Mutex
2025-05-10 22:44:47 +01:00
2025-05-10 15:13:31 +01:00
// summaries maps location keys to their latest weather summary
summaries sync.Map
// summaryChans stores a map of location key to the corresponding summary channel
// which is used to track summary updates
2025-05-10 13:04:39 +01:00
summaryChans map[string]chan string
2025-05-10 15:13:31 +01:00
// subscriptions maps location keys to the list of registered subscriptions
// that are subscribed to updates for the location
subscriptions map[string][]*registeredSubscription
2025-05-10 15:13:31 +01:00
// subscriptionsMutex syncs writes to subscriptions
2025-05-10 13:04:39 +01:00
subscriptionsMutex sync.Mutex
vapidSubject string
2025-05-10 15:13:31 +01:00
// vapidPublicKey is the base64 url encoded VAPID public key
vapidPublicKey string
// vapidPrivateKey is the base64 url encoded VAPID private key
2025-05-10 13:04:39 +01:00
vapidPrivateKey string
}
2025-05-10 00:36:38 +01:00
//go:embed web
var webDir embed.FS
2025-05-11 11:14:38 +01:00
var envKeys = []string{"GEMINI_API_KEY", "MET_API_USER_AGENT", "VAPID_SUBJECT", "VAPID_PRIVATE_KEY_BASE64", "VAPID_PUBLIC_KEY_BASE64"}
2025-05-10 22:44:47 +01:00
2025-05-11 16:53:44 +01:00
//go:embed prompt.txt
var prompt string
2025-05-10 22:44:47 +01:00
2025-05-12 18:58:31 +01:00
var supportedLocations = map[string]*location{
"london": {nil, 51.507351, -0.127758, "Europe/London", "London"},
"sf": {nil, 37.774929, -122.419418, "America/Los_Angeles", "San Francisco"},
"sj": {nil, 37.338207, -121.886330, "America/Los_Angeles", "San Jose"},
"la": {nil, 34.052235, -118.243683, "America/Los_Angeles", "Los Angeles"},
"nyc": {nil, 40.712776, -74.005974, "America/New_York", "New York City"},
"tokyo": {nil, 35.689487, 139.691711, "Asia/Tokyo", "Tokyo"},
"singapore": {nil, 1.290270, 103.851959, "Asia/Singapore", "Singapore"},
"manila": {nil, 14.599512, 120.984222, "Asia/Manila", "Manila"},
"hk": {nil, 22.317053, 114.169547, "Asia/Hong_Kong", "Hong Kong"},
"warsaw": {nil, 52.229675, 21.012230, "Europe/Warsaw", "Warsaw"},
"zurich": {nil, 47.369019, 8.538030, "Europe/Zurich", "Zurich"},
"berlin": {nil, 52.520008, 13.404954, "Europe/Berlin", "Berlin"},
"dubai": {nil, 25.204849, 55.270782, "Asia/Dubai", "Dubai"},
"paris": {nil, 48.864716, 2.349014, "Europe/Paris", "Paris"},
"stockholm": {nil, 59.329323, 18.068581, "Europe/Stockholm", "Stockholm"},
"amsterdam": {nil, 52.370216, 4.895168, "Europe/Amsterdam", "Amsterdam"},
2025-05-10 16:21:43 +01:00
}
2025-05-10 00:36:38 +01:00
func main() {
2025-05-10 15:21:48 +01:00
port := flag.Int("port", 8080, "the port that the server should listen on")
2025-05-10 15:28:55 +01:00
genKeys := flag.Bool("generate-vapid-keys", false, "generate a new vapid key pair, which will be outputted to stdout.")
flag.Parse()
if *genKeys {
generateKeys()
2025-05-12 18:58:31 +01:00
} else if err := startServer(*port); err != nil {
log.Fatal(err)
2025-05-10 15:28:55 +01:00
}
2025-05-12 18:58:31 +01:00
}
2025-05-10 15:21:48 +01:00
2025-05-12 18:58:31 +01:00
func startServer(port int) error {
2025-05-11 11:46:59 +01:00
slog.Info("starting 7am...")
2025-05-12 18:58:31 +01:00
err := loadTimeZones()
if err != nil {
return err
}
2025-05-10 17:35:51 +01:00
_ = godotenv.Load()
2025-05-12 18:58:31 +01:00
err = checkEnv()
2025-05-10 00:36:38 +01:00
if err != nil {
2025-05-12 18:58:31 +01:00
return err
2025-05-10 00:36:38 +01:00
}
2025-05-11 11:53:17 +01:00
wd, err := os.Getwd()
if err != nil {
2025-05-12 18:58:31 +01:00
return fmt.Errorf("failed to get cwd: %w", err)
2025-05-11 11:53:17 +01:00
}
p := filepath.Join(wd, "data")
err = os.MkdirAll(p, os.ModePerm)
if err != nil {
2025-05-12 18:58:31 +01:00
return fmt.Errorf("failed to create data directory at %v: %w", p, err)
2025-05-11 11:53:17 +01:00
}
slog.Info("data directory created", "path", p)
2025-05-10 13:04:39 +01:00
db, err := initDB()
if err != nil {
2025-05-12 18:58:31 +01:00
return fmt.Errorf("failed to initialize db: %w", err)
2025-05-10 13:04:39 +01:00
}
2025-05-10 00:36:38 +01:00
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
genaiClient, err := genai.NewClient(ctx, &genai.ClientConfig{
APIKey: os.Getenv("GEMINI_API_KEY"),
Backend: genai.BackendGeminiAPI,
})
if err != nil {
2025-05-12 18:58:31 +01:00
return fmt.Errorf("failed to initialize gemini client: %w\n", err)
2025-05-10 00:36:38 +01:00
}
summaryHTML, _ := webDir.ReadFile("web/summary.html")
summaryPageTemplate, _ := template.New("summary.html").Parse(string(summaryHTML))
state := state{
2025-05-11 11:14:38 +01:00
ctx: ctx,
metAPIUserAgent: os.Getenv("MET_API_USER_AGENT"),
2025-05-10 00:36:38 +01:00
template: pageTemplate{
summary: summaryPageTemplate,
},
summaries: sync.Map{},
summaryChans: map[string]chan string{},
genai: genaiClient,
2025-05-10 13:04:39 +01:00
2025-05-12 20:09:31 +01:00
db: db,
dbMutex: sync.Mutex{},
2025-05-10 22:44:47 +01:00
subscriptions: map[string][]*registeredSubscription{},
2025-05-10 13:04:39 +01:00
vapidSubject: os.Getenv("VAPID_SUBJECT"),
2025-05-10 13:04:39 +01:00
vapidPublicKey: os.Getenv("VAPID_PUBLIC_KEY_BASE64"),
vapidPrivateKey: os.Getenv("VAPID_PRIVATE_KEY_BASE64"),
2025-05-10 00:36:38 +01:00
}
2025-05-12 18:58:31 +01:00
fetchInitialSummaries(&state)
2025-05-10 00:36:38 +01:00
var schedulers []gocron.Scheduler
2025-05-10 15:13:31 +01:00
// schedule periodic updates of weather summary for each supported location
2025-05-10 00:36:38 +01:00
for locKey, loc := range supportedLocations {
2025-05-12 18:58:31 +01:00
s, err := gocron.NewScheduler(gocron.WithLocation(loc.tz))
2025-05-10 00:36:38 +01:00
if err != nil {
2025-05-12 18:58:31 +01:00
return fmt.Errorf("failed to create gocron scheduler for %v: %w", locKey, err)
2025-05-10 00:36:38 +01:00
}
_, err = s.NewJob(
2025-05-11 00:12:23 +01:00
gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(7, 0, 0))),
2025-05-12 18:58:31 +01:00
gocron.NewTask(func(ctx context.Context) {
updateSummary(ctx, &state, updateSummaryOptions{
locKey: locKey,
location: loc,
pushUpdate: true,
})
}),
2025-05-10 13:04:39 +01:00
)
2025-05-10 00:36:38 +01:00
if err != nil {
2025-05-12 18:58:31 +01:00
return fmt.Errorf("failed to scheduel gocron job for %v: %w", locKey, err)
2025-05-10 00:36:38 +01:00
}
schedulers = append(schedulers, s)
2025-05-10 13:04:39 +01:00
c := make(chan string)
state.subscriptions[locKey] = []*registeredSubscription{}
2025-05-10 13:04:39 +01:00
state.summaryChans[locKey] = c
2025-05-10 15:13:31 +01:00
// listen for summary updates, and publish updates to all update subscribers via web push
2025-05-12 19:07:19 +01:00
go listenForSummaryUpdates(&state, locKey, c)
2025-05-10 00:36:38 +01:00
s.Start()
2025-05-11 11:46:59 +01:00
slog.Info("update job scheduled", "location", locKey)
2025-05-10 00:36:38 +01:00
}
2025-05-10 14:58:41 +01:00
err = loadSubscriptions(&state)
if err != nil {
2025-05-12 18:58:31 +01:00
return fmt.Errorf("failed to load existing subscriptions: %w", err)
2025-05-10 14:58:41 +01:00
}
2025-05-10 13:04:39 +01:00
2025-05-10 00:36:38 +01:00
http.HandleFunc("/", handleHTTPRequest(&state))
2025-05-10 15:21:48 +01:00
2025-05-12 18:58:31 +01:00
slog.Info("server starting", "port", port)
2025-05-10 15:21:48 +01:00
2025-05-12 18:58:31 +01:00
err = http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
2025-05-10 14:58:41 +01:00
if err != nil {
2025-05-12 18:58:31 +01:00
return fmt.Errorf("failed to start http server: %w", err)
2025-05-10 14:58:41 +01:00
}
2025-05-10 00:36:38 +01:00
for _, s := range schedulers {
s.Shutdown()
}
2025-05-11 11:46:59 +01:00
slog.Info("7am shut down")
2025-05-12 18:58:31 +01:00
return nil
2025-05-10 00:36:38 +01:00
}
2025-05-10 15:28:55 +01:00
func generateKeys() {
priv, pub, err := webpush.GenerateVAPIDKeys()
if err != nil {
log.Fatal(err)
}
fmt.Println("all keys are base64 url encoded.")
fmt.Printf("public key: %v\n", pub)
fmt.Printf("private key: %v\n", priv)
}
2025-05-10 17:35:51 +01:00
func checkEnv() error {
var missing []string
for _, k := range envKeys {
v := os.Getenv(k)
if v == "" {
missing = append(missing, k)
}
}
if len(missing) > 0 {
return fmt.Errorf("missing env: %v", strings.Join(missing, ", "))
}
return nil
}
2025-05-10 00:36:38 +01:00
func handleHTTPRequest(state *state) http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
path := strings.TrimPrefix(request.URL.Path, "/")
2025-05-10 13:04:39 +01:00
if path == "" {
if request.Method == "" || request.Method == "GET" {
index, _ := webDir.ReadFile("web/index.html")
writer.Write(index)
} else {
writer.WriteHeader(http.StatusMethodNotAllowed)
}
2025-05-11 13:11:46 +01:00
} else if path == "instructions" {
f, _ := webDir.ReadFile("web/instructions.html")
writer.Write(f)
2025-05-10 13:04:39 +01:00
} else if path == "vapid" {
if request.Method == "" || request.Method == "GET" {
writer.Write([]byte(state.vapidPublicKey))
} else {
writer.WriteHeader(http.StatusMethodNotAllowed)
2025-05-10 00:36:38 +01:00
}
2025-05-10 13:04:39 +01:00
} else if strings.HasPrefix(path, "registrations") {
if path == "registrations" && request.Method == "POST" {
defer request.Body.Close()
update := updateSubscription{}
err := json.NewDecoder(request.Body).Decode(&update)
if err != nil {
writer.WriteHeader(http.StatusBadRequest)
return
}
2025-05-10 00:36:38 +01:00
2025-05-10 13:04:39 +01:00
reg, err := registerSubscription(state, &update)
if err != nil {
2025-05-11 11:46:59 +01:00
slog.Error("web push subscription registration failed", "error", err)
2025-05-10 13:04:39 +01:00
writer.WriteHeader(http.StatusBadRequest)
return
}
err = json.NewEncoder(writer).Encode(reg)
if err != nil {
writer.WriteHeader(http.StatusBadRequest)
2025-05-11 11:46:59 +01:00
} else {
slog.Info("new web push registration", "id", reg.ID)
2025-05-10 13:04:39 +01:00
}
2025-05-10 14:58:41 +01:00
} else if request.Method == "PATCH" || request.Method == "DELETE" {
2025-05-10 13:04:39 +01:00
parts := strings.Split(path, "/")
if len(parts) < 2 {
writer.WriteHeader(http.StatusMethodNotAllowed)
return
}
regID, err := uuid.Parse(parts[1])
if err != nil {
writer.WriteHeader(http.StatusNotFound)
return
}
2025-05-10 14:58:41 +01:00
switch request.Method {
case "PATCH":
defer request.Body.Close()
2025-05-10 13:04:39 +01:00
2025-05-10 14:58:41 +01:00
update := updateSubscription{}
err = json.NewDecoder(request.Body).Decode(&update)
if err != nil {
writer.WriteHeader(http.StatusBadRequest)
return
}
2025-05-10 13:04:39 +01:00
2025-05-10 14:58:41 +01:00
reg, err := updateRegisteredSubscription(state, regID, &update)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
writer.WriteHeader(http.StatusNotFound)
} else {
writer.WriteHeader(http.StatusInternalServerError)
}
2025-05-10 13:04:39 +01:00
} else {
2025-05-10 14:58:41 +01:00
json.NewEncoder(writer).Encode(reg)
2025-05-11 11:46:59 +01:00
slog.Info("web push registration updated", "id", reg.ID, "locations", strings.Join(reg.Locations, ","))
2025-05-10 14:58:41 +01:00
}
case "DELETE":
err = deleteSubscription(state, regID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
writer.WriteHeader(http.StatusNotFound)
} else {
writer.WriteHeader(http.StatusInternalServerError)
}
} else {
writer.WriteHeader(http.StatusNoContent)
2025-05-11 11:46:59 +01:00
slog.Info("web push registration deleted", "id", regID)
2025-05-10 13:04:39 +01:00
}
}
2025-05-10 00:36:38 +01:00
2025-05-10 13:04:39 +01:00
} else {
writer.WriteHeader(http.StatusMethodNotAllowed)
}
2025-06-20 22:04:30 +01:00
} else if strings.HasPrefix(path, "api/") {
if origin := request.Header.Get("Origin"); origin != "" {
writer.Header().Set("Access-Control-Allow-Origin", origin)
}
if strings.HasPrefix(path, "api/summary/") {
switch request.Method {
case "OPTIONS":
writer.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
writer.WriteHeader(http.StatusOK)
case "GET":
location := strings.TrimPrefix(path, "api/summary/")
summary, ok := state.summaries.Load(location)
if !ok {
writer.WriteHeader(http.StatusNotFound)
return
}
response := map[string]string{
"summary": strings.TrimSpace(summary.(string)),
}
writer.Header().Set("Content-Type", "application/json")
json.NewEncoder(writer).Encode(response)
default:
writer.WriteHeader(http.StatusMethodNotAllowed)
}
} else {
writer.WriteHeader(http.StatusNotFound)
}
2025-05-10 13:04:39 +01:00
} else {
if request.Method != "" && request.Method != "GET" {
writer.WriteHeader(http.StatusMethodNotAllowed)
return
2025-05-10 00:36:38 +01:00
}
summary, ok := state.summaries.Load(path)
if ok {
2025-05-11 00:08:20 +01:00
loc := supportedLocations[path]
state.template.summary.Execute(writer, summaryTemplateData{summary.(string), path, loc.displayName})
2025-05-10 00:36:38 +01:00
} else {
f, err := webDir.ReadFile("web/" + path)
if err != nil {
2025-05-10 14:58:41 +01:00
writer.WriteHeader(http.StatusNotFound)
2025-05-10 00:36:38 +01:00
} else {
m := mime.TypeByExtension(filepath.Ext(path))
if m != "" {
writer.Header().Set("Content-Type", m)
}
writer.Write(f)
}
}
}
}
}
2025-05-10 13:04:39 +01:00
func initDB() (*sql.DB, error) {
2025-05-10 19:14:37 +01:00
db, err := sql.Open("sqlite", "file:data/data.sqlite")
2025-05-10 13:04:39 +01:00
if err != nil {
log.Fatalln("failed to initialize database")
}
2025-05-10 00:36:38 +01:00
2025-05-10 13:04:39 +01:00
_, err = db.Exec(`
2025-05-12 20:09:31 +01:00
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
2025-05-10 13:04:39 +01:00
CREATE TABLE IF NOT EXISTS subscriptions(
id TEXT PRIMARY KEY,
locations TEXT NOT NULL,
subscription_json TEXT NOT NULL
);
2025-05-12 18:58:31 +01:00
CREATE TABLE IF NOT EXISTS summaries(
location TEXT PRIMARY KEY,
summary TEXT NOT NULL
);
2025-05-10 13:04:39 +01:00
`)
if err != nil {
return nil, err
}
return db, nil
}
func loadSubscriptions(state *state) error {
rows, err := state.db.Query(`SELECT id, locations, subscription_json FROM subscriptions;`)
if err != nil {
return err
}
defer rows.Close()
2025-05-10 13:04:39 +01:00
for rows.Next() {
var id string
var locations string
var j string
err := rows.Scan(&id, &locations, &j)
2025-05-10 00:36:38 +01:00
if err != nil {
2025-05-11 11:46:59 +01:00
slog.Warn("unable to load a subscription", "error", err)
2025-05-10 13:04:39 +01:00
continue
2025-05-10 00:36:38 +01:00
}
2025-05-10 13:04:39 +01:00
s := webpush.Subscription{}
err = json.Unmarshal([]byte(j), &s)
if err != nil {
2025-05-11 11:46:59 +01:00
slog.Warn("invalid web push subscription json encountered", "id", id, "error", err)
2025-05-10 13:04:39 +01:00
continue
}
2025-05-10 00:36:38 +01:00
reg := &registeredSubscription{
2025-05-10 13:04:39 +01:00
ID: uuid.MustParse(id),
Locations: strings.Split(locations, ","),
Subscription: &s,
}
for _, l := range reg.Locations {
state.subscriptions[l] = append(state.subscriptions[l], reg)
2025-05-10 00:36:38 +01:00
}
}
2025-05-10 13:04:39 +01:00
return nil
}
func updateRegisteredSubscription(state *state, id uuid.UUID, update *updateSubscription) (*registeredSubscription, error) {
j, err := json.Marshal(update.Subscription)
if err != nil {
return nil, err
}
rows, err := state.db.Query("SELECT locations FROM subscriptions WHERE id = ?", id)
if err != nil {
return nil, err
}
rows.Next()
var locStr string
err = rows.Scan(&locStr)
if err != nil {
return nil, err
}
rows.Close()
2025-05-10 18:27:29 +01:00
// not very proud of this one
// ideally the list of locations should be stored in a separate table
// but since the list is very small, and im too lazy to bring in a separate table
// this should be fine for now
locs := strings.Split(locStr, ",")
locs = append(locs, update.Locations...)
2025-05-10 18:27:29 +01:00
locs = slices.DeleteFunc(locs, func(l string) bool {
return slices.Contains(update.RemoveLocations, l)
})
locs = slices.Compact(locs)
2025-05-10 13:04:39 +01:00
_, err = state.db.Exec(
"UPDATE subscriptions SET subscription_json = ?, locations = ? WHERE id = ?",
string(j), strings.Join(locs, ","), id,
2025-05-10 13:04:39 +01:00
)
if err != nil {
return nil, err
}
reg := &registeredSubscription{
2025-05-10 13:04:39 +01:00
ID: id,
Subscription: &update.Subscription,
Locations: locs,
}
state.subscriptionsMutex.Lock()
for _, l := range update.Locations {
state.subscriptions[l] = append(state.subscriptions[l], reg)
}
2025-05-10 18:27:29 +01:00
for _, l := range update.RemoveLocations {
state.subscriptions[l] = slices.DeleteFunc(state.subscriptions[l], func(s *registeredSubscription) bool {
return s.ID == reg.ID
})
}
state.subscriptionsMutex.Unlock()
return reg, nil
2025-05-10 13:04:39 +01:00
}
func registerSubscription(state *state, sub *updateSubscription) (*registeredSubscription, error) {
j, err := json.Marshal(sub.Subscription)
if err != nil {
2025-05-11 11:46:59 +01:00
return nil, fmt.Errorf("invalid web push subscription object: %w", err)
2025-05-10 13:04:39 +01:00
}
id, err := uuid.NewV7()
if err != nil {
2025-05-11 11:46:59 +01:00
return nil, fmt.Errorf("unable to generate id for subscription: %w", err)
2025-05-10 13:04:39 +01:00
}
2025-05-10 18:27:29 +01:00
locs := slices.Compact(sub.Locations)
2025-05-10 13:04:39 +01:00
_, err = state.db.Exec(
"INSERT INTO subscriptions (id, locations, subscription_json) VALUES (?, ?, ?);",
2025-05-10 18:27:29 +01:00
id, strings.Join(locs, ","), string(j),
2025-05-10 13:04:39 +01:00
)
if err != nil {
2025-05-11 11:46:59 +01:00
return nil, fmt.Errorf("unable to insert into subscriptions table: %w", err)
2025-05-10 13:04:39 +01:00
}
reg := registeredSubscription{
ID: id,
Subscription: &sub.Subscription,
2025-05-10 18:27:29 +01:00
Locations: locs,
2025-05-10 13:04:39 +01:00
}
state.subscriptionsMutex.Lock()
2025-05-10 13:04:39 +01:00
for _, l := range sub.Locations {
state.subscriptions[l] = append(state.subscriptions[l], &reg)
2025-05-10 13:04:39 +01:00
}
state.subscriptionsMutex.Unlock()
2025-05-10 13:04:39 +01:00
return &reg, nil
2025-05-10 00:36:38 +01:00
}
2025-05-10 14:58:41 +01:00
func deleteSubscription(state *state, regID uuid.UUID) error {
_, err := state.db.Exec("DELETE FROM subscriptions WHERE id = ?", regID)
return err
}
2025-05-12 18:58:31 +01:00
func loadTimeZones() error {
for locKey, loc := range supportedLocations {
tz, err := time.LoadLocation(loc.ianaName)
if err != nil {
return fmt.Errorf("failed to load time zone for %v: %w", locKey, err)
}
loc.tz = tz
}
return nil
}
func fetchInitialSummaries(state *state) {
var wg sync.WaitGroup
for locKey, loc := range supportedLocations {
wg.Add(1)
go func() {
2025-05-12 19:07:19 +01:00
ctx, cancel := context.WithCancel(state.ctx)
defer cancel()
2025-05-12 18:58:31 +01:00
defer wg.Done()
summary := ""
2025-05-12 19:07:19 +01:00
rows, err := state.db.QueryContext(ctx, "SELECT summary FROM summaries WHERE location = ?", locKey)
2025-05-12 18:58:31 +01:00
if err != nil && !errors.Is(err, sql.ErrNoRows) {
slog.Warn("unable to get cached weather summary", "location", locKey, "error", err)
} else if err == nil {
defer rows.Close()
ok := rows.Next()
if ok {
err = rows.Scan(&summary)
if err != nil {
slog.Warn("unable to get cached weather summary", "location", locKey, "error", err)
}
2025-05-12 18:58:31 +01:00
}
}
if summary == "" {
updateSummary(state.ctx, state, updateSummaryOptions{
locKey: locKey,
location: loc,
pushUpdate: false,
})
} else {
state.summaries.Store(locKey, summary)
}
}()
}
wg.Wait()
}
func updateSummary(ctx context.Context, state *state, opts updateSummaryOptions) {
locKey := opts.locKey
loc := opts.location
2025-05-11 11:46:59 +01:00
slog.Info("updating weather summary", "location", locKey)
2025-05-10 00:36:38 +01:00
2025-05-12 12:09:58 +01:00
today := time.Now().In(loc.tz)
2025-05-12 19:07:19 +01:00
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://api.met.no/weatherapi/locationforecast/2.0/compact?lat=%v&lon=%v", loc.lat, loc.lon), nil)
if err != nil {
slog.Error("failed to query weather data", "location", locKey, "error", err)
return
}
req.Header.Set("User-Agent", state.metAPIUserAgent)
2025-05-11 11:14:38 +01:00
2025-05-12 19:07:19 +01:00
resp, err := http.DefaultClient.Do(req)
if err != nil {
slog.Error("failed to query weather data", "location", locKey, "error", err)
return
}
2025-05-10 00:36:38 +01:00
2025-05-12 19:07:19 +01:00
defer resp.Body.Close()
2025-05-12 12:09:58 +01:00
2025-05-12 19:07:19 +01:00
data := metAPIData{}
err = json.NewDecoder(resp.Body).Decode(&data)
if err != nil {
slog.Error("failed to decode received weather data", "location", locKey, "error", err)
return
}
2025-05-12 12:09:58 +01:00
2025-05-12 19:07:19 +01:00
y, m, d := today.Date()
2025-05-12 12:09:58 +01:00
2025-05-12 19:07:19 +01:00
t := slices.DeleteFunc(data.Properties.TimeSeries, func(series map[string]any) bool {
if ts, ok := series["time"].(string); ok {
t, err := time.Parse(time.RFC3339, ts)
if err != nil {
return false
2025-05-12 12:09:58 +01:00
}
2025-05-12 19:07:19 +01:00
ty, tm, td := t.In(loc.tz).Date()
return !(y == ty && m == tm && d == td)
2025-05-10 22:44:47 +01:00
}
2025-05-12 19:07:19 +01:00
return false
})
2025-05-10 22:44:47 +01:00
2025-05-12 19:07:19 +01:00
b, err := json.Marshal(t)
if err != nil {
slog.Error("failed to marshal processed time series data", "location", locKey, "error", err)
return
2025-05-10 00:36:38 +01:00
}
2025-05-12 19:07:19 +01:00
weatherJSON := string(b)
2025-05-12 18:58:31 +01:00
result, err := state.genai.Models.GenerateContent(ctx, "gemini-2.0-flash", []*genai.Content{{
2025-05-10 00:36:38 +01:00
Parts: []*genai.Part{
2025-05-12 12:09:58 +01:00
{Text: fmt.Sprintf(prompt, today.Format("2006-02-01"), loc.displayName, loc.displayName)},
2025-05-10 22:44:47 +01:00
{Text: weatherJSON},
2025-05-10 00:36:38 +01:00
},
}}, nil)
if err != nil {
2025-05-11 11:46:59 +01:00
slog.Error("failed to generate weather summary", "location", locKey, "error", err)
2025-05-10 00:36:38 +01:00
return
}
summary := result.Text()
2025-05-12 18:58:31 +01:00
2025-05-13 00:52:16 +01:00
state.dbMutex.Lock()
_, err = state.db.ExecContext(ctx, "INSERT OR REPLACE INTO summaries (location, summary) VALUES (?, ?)", locKey, summary)
2025-05-12 18:58:31 +01:00
if err != nil {
slog.Warn("unable to cache generated weather summary to db", "location", locKey, "error", err)
}
2025-05-13 00:52:16 +01:00
state.dbMutex.Unlock()
2025-05-10 00:36:38 +01:00
state.summaries.Store(locKey, summary)
2025-05-12 18:58:31 +01:00
if opts.pushUpdate {
c := state.summaryChans[locKey]
if len(state.subscriptions[locKey]) > 0 {
c <- summary
}
2025-05-10 00:36:38 +01:00
}
2025-05-11 11:46:59 +01:00
slog.Info("updated weather summary", "location", locKey)
2025-05-10 00:36:38 +01:00
}
2025-05-10 13:04:39 +01:00
2025-05-12 19:07:19 +01:00
func listenForSummaryUpdates(state *state, locKey string, c <-chan string) {
2025-05-10 15:13:31 +01:00
opts := webpush.Options{
Subscriber: state.vapidSubject,
2025-05-10 15:13:31 +01:00
VAPIDPublicKey: state.vapidPublicKey,
VAPIDPrivateKey: state.vapidPrivateKey,
TTL: 30,
}
2025-05-10 13:04:39 +01:00
for {
select {
case summary := <-c:
2025-05-10 18:35:06 +01:00
payload := webpushNotificationPayload{
Summary: summary,
Location: locKey,
}
b, err := json.Marshal(&payload)
if err != nil {
2025-05-11 11:46:59 +01:00
slog.Error("failed to create web push notification payload", "location", locKey, "error", err)
2025-05-10 18:35:06 +01:00
continue
}
2025-05-11 11:46:59 +01:00
subs := state.subscriptions[locKey]
slog.Info("pushing weather summary to subscribers", "count", len(subs), "location", locKey)
2025-05-10 14:58:41 +01:00
var wg sync.WaitGroup
2025-05-11 11:46:59 +01:00
for _, sub := range subs {
2025-05-10 14:58:41 +01:00
wg.Add(1)
go func() {
defer wg.Done()
2025-05-10 18:35:06 +01:00
_, err := webpush.SendNotificationWithContext(state.ctx, b, sub.Subscription, &opts)
2025-05-10 14:58:41 +01:00
if err != nil {
2025-05-11 11:46:59 +01:00
slog.Warn("unable to send web push to subscription", "id", sub.ID, "location", locKey, "error", err)
2025-05-10 14:58:41 +01:00
}
}()
2025-05-10 13:04:39 +01:00
}
2025-05-10 14:58:41 +01:00
wg.Wait()
2025-05-11 11:46:59 +01:00
slog.Info("pushed weather summary to subscribers", "count", len(subs), "location", locKey)
2025-05-10 13:04:39 +01:00
case <-state.ctx.Done():
return
}
}
}