-
-
Notifications
You must be signed in to change notification settings - Fork 178
/
Copy pathaggregation.go
205 lines (178 loc) · 6.2 KB
/
aggregation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package services
import (
"errors"
datastructure "github.com/duke-git/lancet/v2/datastructure/set"
"github.com/muety/artifex/v2"
"github.com/muety/wakapi/config"
"log/slog"
"sync"
"time"
"github.com/muety/wakapi/models"
)
// aggregateIntervalDays is the length, in days, of the time range covered by a
// single aggregation job.
const aggregateIntervalDays int = 1

// aggregationLock serializes lockUsers / unlockUsers, which read and modify
// AggregationService.inProgress.
var aggregationLock sync.Mutex
// AggregationService generates per-user summaries and stores them via the
// summary service, either on demand or on a cron schedule.
type AggregationService struct {
	config *config.Config

	// Services used to look up users, existing summaries and heartbeats.
	userService      IUserService
	summaryService   ISummaryService
	heartbeatService IHeartbeatService

	// inProgress holds the ids of users for which AggregateSummaries is
	// currently running; it is only accessed while holding aggregationLock.
	inProgress datastructure.Set[string]

	// queueDefault is used to register the cron job, queueWorkers to run the
	// individual summary generation jobs.
	queueDefault, queueWorkers *artifex.Dispatcher
}
// NewAggregationService wires an AggregationService to the given services,
// the global configuration and the shared default and processing queues.
// It starts with no users marked as in progress.
func NewAggregationService(userService IUserService, summaryService ISummaryService, heartbeatService IHeartbeatService) *AggregationService {
	cfg := config.Get()
	pending := datastructure.New[string]()
	defaultQueue := config.GetDefaultQueue()
	workerQueue := config.GetQueue(config.QueueProcessing)

	return &AggregationService{
		config:           cfg,
		userService:      userService,
		summaryService:   summaryService,
		heartbeatService: heartbeatService,
		inProgress:       pending,
		queueDefault:     defaultQueue,
		queueWorkers:     workerQueue,
	}
}
// AggregationJob describes a single summary to generate: the given user's
// activity between From and To.
type AggregationJob struct {
	User     *models.User
	From, To time.Time
}
// Schedule registers a cron job on the default queue that (re-)generates
// summaries for all users. The schedule is taken from the configured
// aggregation cron expression. Failures, both when registering the job and
// when a run fails, are logged rather than returned.
func (srv *AggregationService) Schedule() {
	slog.Info("scheduling summary aggregation")

	aggregateAll := func() {
		// An empty set makes AggregateSummaries process every user.
		if err := srv.AggregateSummaries(datastructure.New[string]()); err != nil {
			config.Log().Error("failed to generate summaries", "error", err)
		}
	}

	cronExpr := srv.config.App.GetAggregationTimeCron()
	if _, err := srv.queueDefault.DispatchCron(aggregateAll, cronExpr); err != nil {
		config.Log().Error("failed to schedule summary generation", "error", err)
	}
}
// AggregateSummaries (re-)generates daily summaries for the given users, or for
// all users if userIds is nil or empty.
//
// For every user, jobs are generated from the user's latest existing summary
// or, if none exists yet, from their first heartbeat, up until the end of
// yesterday. Each job is handed to the worker queue for processing. Users
// without heartbeats, and users that cannot be loaded, are skipped.
//
// An error is returned if aggregation is already in progress for one of the
// requested users, or if loading summaries, heartbeats or users fails.
//
// NOTE(review): the per-user lock is released when this function returns, but
// the dispatched jobs presumably still run asynchronously on queueWorkers, so
// the lock does not cover actual summary generation.
func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[string]) error {
	if err := srv.lockUsers(userIds); err != nil {
		return err
	}
	defer srv.unlockUsers(userIds)

	slog.Info("generating summaries")

	// Restrict to a subset of users only if a non-empty set was passed.
	filterUsers := userIds != nil && !userIds.IsEmpty()

	// Get a map from user ids to the time of their latest summary or nil if none exists yet
	lastUserSummaryTimes, err := srv.summaryService.GetLatestByUser() // TODO: build user-specific variant of this query for efficiency
	if err != nil {
		config.Log().Error("error occurred", "error", err)
		return err
	}

	// Get a map from user ids to the time of their earliest heartbeats or nil if none exists yet
	firstUserHeartbeatTimes, err := srv.heartbeatService.GetFirstByUsers() // TODO: build user-specific variant of this query for efficiency
	if err != nil {
		config.Log().Error("error occurred", "error", err)
		return err
	}

	// Build actual lookup table from it
	firstUserHeartbeatLookup := make(map[string]models.CustomTime, len(firstUserHeartbeatTimes))
	for _, e := range firstUserHeartbeatTimes {
		firstUserHeartbeatLookup[e.User] = e.Time
	}

	// Dispatch summary generation jobs. The channel is closed on return, which
	// terminates the dispatcher goroutine below.
	jobs := make(chan *AggregationJob)
	defer close(jobs)

	go func() {
		for jobRef := range jobs {
			job := *jobRef // copy, so that each dispatched closure captures its own job
			if err := srv.queueWorkers.Dispatch(func() {
				srv.process(job)
			}); err != nil {
				config.Log().Error("failed to dispatch summary generation job", "userID", job.User.ID, "error", err)
			}
		}
	}()

	// Fetch complete user objects
	var users map[string]*models.User
	if filterUsers {
		users, err = srv.userService.GetManyMapped(userIds.Values())
	} else {
		users, err = srv.userService.GetAllMapped()
	}
	if err != nil {
		return err
	}

	// Generate summary aggregation jobs
	for _, e := range lastUserSummaryTimes {
		if filterUsers && !userIds.Contain(e.User) {
			continue
		}

		u, ok := users[e.User]
		if !ok || u == nil {
			// The user could not be loaded (presumably deleted in the meantime).
			// Skip it instead of dereferencing a nil pointer below.
			slog.Warn("skipping summary aggregation because user could not be found", "user", e.User)
			continue
		}

		if e.Time.Valid() {
			// Case 1: User has aggregated summaries already
			// -> Spawn jobs to create summaries from their latest aggregation to now
			slog.Info("generating summary aggregation jobs for user", "user", u.ID, "from", e.Time.T())
			generateUserJobs(u, e.Time.T(), jobs)
		} else if t := firstUserHeartbeatLookup[e.User]; t.Valid() {
			// Case 2: User has no aggregated summaries, yet, but has heartbeats
			// -> Spawn jobs to create summaries from their first heartbeat to now
			slog.Info("generating summary aggregation jobs for user", "user", u.ID, "from", t.T())
			generateUserJobs(u, t.T(), jobs)
		} else {
			// Case 3: User doesn't have heartbeats at all
			// -> Nothing to do
			slog.Info("skipping summary aggregation because user has no heartbeats", "user", u.ID)
		}
	}

	return nil
}
// process computes the summary described by job and persists it. process has
// no way to report failures to a caller, so both a failed computation and a
// failed insert are only logged.
func (srv *AggregationService) process(job AggregationJob) {
	summary, err := srv.summaryService.Summarize(job.From, job.To, job.User, nil)
	if err != nil {
		config.Log().Error("failed to generate summary", "from", job.From, "to", job.To, "userID", job.User.ID, "error", err)
		return
	}

	slog.Info("successfully generated summary", "from", job.From, "to", job.To, "userID", job.User.ID)

	if err := srv.summaryService.Insert(summary); err != nil {
		config.Log().Error("failed to save summary", "userID", summary.UserID, "fromTime", summary.FromTime, "toTime", summary.ToTime, "error", err)
	}
}
// generateUserJobs sends one AggregationJob per interval of
// aggregateIntervalDays days for user to jobs.
//
// The first job starts at the first midnight after from minus one second, so
// the midnight equal to from when from lies exactly on midnight. Jobs are
// emitted back to back until the start of an interval reaches the end of
// yesterday. All interval bounds are midnights in from's location. Each send
// may block until the receiver takes the job.
func generateUserJobs(user *models.User, from time.Time, jobs chan<- *AggregationJob) {
	// startOfNextInterval returns midnight of the day that lies
	// aggregateIntervalDays after t's day, in t's location.
	startOfNextInterval := func(t time.Time) time.Time {
		return time.Date(t.Year(), t.Month(), t.Day()+aggregateIntervalDays, 0, 0, 0, 0, t.Location())
	}

	// Go to next day of either user's first heartbeat or latest aggregation.
	// Stepping back one second first keeps a timestamp that is exactly on
	// midnight from skipping a whole day.
	intervalStart := startOfNextInterval(from.Add(-time.Second))

	// Iteratively aggregate per-interval summaries until end of yesterday is reached
	cutoff := getStartOfToday().Add(-time.Second)

	for intervalStart.Before(cutoff) {
		intervalEnd := startOfNextInterval(intervalStart)
		jobs <- &AggregationJob{User: user, From: intervalStart, To: intervalEnd}
		intervalStart = intervalEnd
	}
}
// lockUsers marks the given users as having an aggregation in progress.
//
// If aggregation is already in progress for at least one of them, nothing is
// claimed and an error is returned. Passing an empty set claims no users, so
// it never conflicts with other aggregations. Claims are released again by
// unlockUsers.
func (srv *AggregationService) lockUsers(userIds datastructure.Set[string]) error {
	aggregationLock.Lock()
	defer aggregationLock.Unlock()

	// Check every user before claiming any, so a conflict leaves no partial claim.
	for uid := range userIds {
		if srv.inProgress.Contain(uid) {
			return errors.New("aggregation already in progress for at least one of the requested users")
		}
	}

	srv.inProgress = srv.inProgress.Union(userIds)
	return nil
}
// unlockUsers releases the claims that lockUsers took for the given users,
// so that subsequent aggregations for them can proceed.
func (srv *AggregationService) unlockUsers(userIds datastructure.Set[string]) {
	aggregationLock.Lock()
	defer aggregationLock.Unlock()

	for userID := range userIds {
		srv.inProgress.Delete(userID)
	}
}
// getStartOfToday returns today's date at 00:00:00.000000001, i.e. one
// nanosecond past midnight, in the location of time.Now().
//
// NOTE(review): the extra nanosecond looks deliberate (generateUserJobs
// subtracts one second from this value to get its cutoff). Confirm before
// changing it.
func getStartOfToday() time.Time {
	now := time.Now()
	year, month, day := now.Date()
	return time.Date(year, month, day, 0, 0, 0, 1, now.Location())
}