fix subsequent update subs not sending notif
This commit is contained in:
49
main.go
49
main.go
@@ -76,7 +76,7 @@ type state struct {
|
|||||||
|
|
||||||
// subscriptions maps location keys to the list of registered subscriptions
|
// subscriptions maps location keys to the list of registered subscriptions
|
||||||
// that are subscribed to updates for the location
|
// that are subscribed to updates for the location
|
||||||
subscriptions map[string][]registeredSubscription
|
subscriptions map[string][]*registeredSubscription
|
||||||
// subscriptionsMutex syncs writes to subscriptions
|
// subscriptionsMutex syncs writes to subscriptions
|
||||||
subscriptionsMutex sync.Mutex
|
subscriptionsMutex sync.Mutex
|
||||||
|
|
||||||
@@ -208,7 +208,7 @@ func main() {
|
|||||||
summaryChans: map[string]chan string{},
|
summaryChans: map[string]chan string{},
|
||||||
genai: genaiClient,
|
genai: genaiClient,
|
||||||
|
|
||||||
subscriptions: map[string][]registeredSubscription{},
|
subscriptions: map[string][]*registeredSubscription{},
|
||||||
|
|
||||||
vapidPublicKey: os.Getenv("VAPID_PUBLIC_KEY_BASE64"),
|
vapidPublicKey: os.Getenv("VAPID_PUBLIC_KEY_BASE64"),
|
||||||
vapidPrivateKey: os.Getenv("VAPID_PRIVATE_KEY_BASE64"),
|
vapidPrivateKey: os.Getenv("VAPID_PRIVATE_KEY_BASE64"),
|
||||||
@@ -240,7 +240,7 @@ func main() {
|
|||||||
schedulers = append(schedulers, s)
|
schedulers = append(schedulers, s)
|
||||||
c := make(chan string)
|
c := make(chan string)
|
||||||
|
|
||||||
state.subscriptions[locKey] = []registeredSubscription{}
|
state.subscriptions[locKey] = []*registeredSubscription{}
|
||||||
state.summaryChans[locKey] = c
|
state.summaryChans[locKey] = c
|
||||||
|
|
||||||
// listen for summary updates, and publish updates to all update subscribers via web push
|
// listen for summary updates, and publish updates to all update subscribers via web push
|
||||||
@@ -434,6 +434,7 @@ func loadSubscriptions(state *state) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var id string
|
var id string
|
||||||
@@ -451,7 +452,7 @@ func loadSubscriptions(state *state) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
reg := registeredSubscription{
|
reg := ®isteredSubscription{
|
||||||
ID: uuid.MustParse(id),
|
ID: uuid.MustParse(id),
|
||||||
Locations: strings.Split(locations, ","),
|
Locations: strings.Split(locations, ","),
|
||||||
Subscription: &s,
|
Subscription: &s,
|
||||||
@@ -471,19 +472,45 @@ func updateRegisteredSubscription(state *state, id uuid.UUID, update *updateSubs
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rows, err := state.db.Query("SELECT locations FROM subscriptions WHERE id = ?", id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rows.Next()
|
||||||
|
|
||||||
|
var locStr string
|
||||||
|
err = rows.Scan(&locStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rows.Close()
|
||||||
|
|
||||||
|
locs := strings.Split(locStr, ",")
|
||||||
|
locs = append(locs, update.Locations...)
|
||||||
|
|
||||||
_, err = state.db.Exec(
|
_, err = state.db.Exec(
|
||||||
"UPDATE subscriptions SET subscription_json = ?, locations = ? WHERE id = ?",
|
"UPDATE subscriptions SET subscription_json = ?, locations = ? WHERE id = ?",
|
||||||
string(j), strings.Join(update.Locations, ","), id,
|
string(j), strings.Join(locs, ","), id,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return ®isteredSubscription{
|
reg := ®isteredSubscription{
|
||||||
ID: id,
|
ID: id,
|
||||||
Subscription: &update.Subscription,
|
Subscription: &update.Subscription,
|
||||||
Locations: update.Locations,
|
Locations: locs,
|
||||||
}, nil
|
}
|
||||||
|
|
||||||
|
state.subscriptionsMutex.Lock()
|
||||||
|
for _, l := range update.Locations {
|
||||||
|
state.subscriptions[l] = append(state.subscriptions[l], reg)
|
||||||
|
}
|
||||||
|
state.subscriptionsMutex.Unlock()
|
||||||
|
|
||||||
|
return reg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerSubscription(state *state, sub *updateSubscription) (*registeredSubscription, error) {
|
func registerSubscription(state *state, sub *updateSubscription) (*registeredSubscription, error) {
|
||||||
@@ -511,11 +538,11 @@ func registerSubscription(state *state, sub *updateSubscription) (*registeredSub
|
|||||||
Locations: sub.Locations,
|
Locations: sub.Locations,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state.subscriptionsMutex.Lock()
|
||||||
for _, l := range sub.Locations {
|
for _, l := range sub.Locations {
|
||||||
state.subscriptionsMutex.Lock()
|
state.subscriptions[l] = append(state.subscriptions[l], ®)
|
||||||
state.subscriptions[l] = append(state.subscriptions[l], reg)
|
|
||||||
state.subscriptionsMutex.Unlock()
|
|
||||||
}
|
}
|
||||||
|
state.subscriptionsMutex.Unlock()
|
||||||
|
|
||||||
return ®, nil
|
return ®, nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user