Skip to content

Commit

Permalink
feat: implement data retention mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
muety committed Dec 1, 2022
1 parent 2db065d commit 5ae7527
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 19 deletions.
2 changes: 2 additions & 0 deletions config.default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ app:
aggregation_time: '0 15 2 * * *' # time at which to run daily aggregation batch jobs
leaderboard_generation_time: '0 0 6 * * *,0 0 18 * * *' # times at which to re-calculate the leaderboard
report_time_weekly: '0 0 18 * * 5' # time at which to fan out weekly reports (extended cron)
data_cleanup_time: '0 0 6 * * 7' # time at which to run old data cleanup (if enabled through data_retention_months)
inactive_days: 7 # time of previous days within a user must have logged in to be considered active
import_batch_size: 50 # maximum number of heartbeats to insert into the database within one transaction
heartbeat_max_age: '4320h' # maximum acceptable age of a heartbeat (see https://pkg.go.dev/time#ParseDuration)
data_retention_months: -1 # maximum retention period on months for user data (heartbeats) (-1 for infinity)
custom_languages:
vue: Vue
jsx: JSX
Expand Down
14 changes: 11 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,16 @@ var cFlag = flag.String("config", defaultConfigPath, "config file location")
var env string

type appConfig struct {
AggregationTime string `yaml:"aggregation_time" default:"02:15" env:"WAKAPI_AGGREGATION_TIME"`
LeaderboardGenerationTime string `yaml:"leaderboard_generation_time" default:"06:00;18:00" env:"WAKAPI_LEADERBOARD_GENERATION_TIME"`
ReportTimeWeekly string `yaml:"report_time_weekly" default:"fri,18:00" env:"WAKAPI_REPORT_TIME_WEEKLY"`
AggregationTime string `yaml:"aggregation_time" default:"0 15 2 * * *" env:"WAKAPI_AGGREGATION_TIME"`
LeaderboardGenerationTime string `yaml:"leaderboard_generation_time" default:"0 0 6 * * *,0 0 18 * * *" env:"WAKAPI_LEADERBOARD_GENERATION_TIME"`
ReportTimeWeekly string `yaml:"report_time_weekly" default:"0 0 18 * * 5" env:"WAKAPI_REPORT_TIME_WEEKLY"`
DataCleanupTime string `yaml:"data_cleanup_time" default:"0 0 6 * * 7" env:"WAKAPI_DATA_CLEANUP_TIME"`
ImportBackoffMin int `yaml:"import_backoff_min" default:"5" env:"WAKAPI_IMPORT_BACKOFF_MIN"`
ImportBatchSize int `yaml:"import_batch_size" default:"50" env:"WAKAPI_IMPORT_BATCH_SIZE"`
InactiveDays int `yaml:"inactive_days" default:"7" env:"WAKAPI_INACTIVE_DAYS"`
HeartbeatMaxAge string `yaml:"heartbeat_max_age" default:"4320h" env:"WAKAPI_HEARTBEAT_MAX_AGE"`
CountCacheTTLMin int `yaml:"count_cache_ttl_min" default:"30" env:"WAKAPI_COUNT_CACHE_TTL_MIN"`
DataRetentionMonths int `yaml:"data_retention_months" default:"-1" env:"WAKAPI_DATA_RETENTION_MONTHS"`
AvatarURLTemplate string `yaml:"avatar_url_template" default:"api/avatar/{username_hash}.svg" env:"WAKAPI_AVATAR_URL_TEMPLATE"`
CustomLanguages map[string]string `yaml:"custom_languages"`
Colors map[string]map[string]string `yaml:"-"`
Expand Down Expand Up @@ -442,6 +444,12 @@ func Load(version string) *Config {
initSentry(config.Sentry, config.IsDev())
}

if config.App.DataRetentionMonths <= 0 {
logbuch.Info("disabling data retention policy, keeping data forever")
} else {
logbuch.Info("data retention policy set to keep data for %d months at max", config.App.DataRetentionMonths)
}

// some validation checks
if config.Server.ListenIpV4 == "" && config.Server.ListenIpV6 == "" && config.Server.ListenSocket == "" {
logbuch.Fatal("either of listen_ipv4 or listen_ipv6 or listen_socket must be set")
Expand Down
20 changes: 15 additions & 5 deletions config/jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ var jobQueues map[string]*artifex.Dispatcher
var jobCounts map[string]int

const (
QueueDefault = "wakapi.default"
QueueProcessing = "wakapi.processing"
QueueReports = "wakapi.reports"
QueueImports = "wakapi.imports"
QueueDefault = "wakapi.default"
QueueProcessing = "wakapi.processing"
QueueReports = "wakapi.reports"
QueueImports = "wakapi.imports"
QueueHousekeeping = "wakapi.housekeeping"
)

type JobQueueMetrics struct {
Expand All @@ -28,9 +29,10 @@ func init() {
jobQueues = make(map[string]*artifex.Dispatcher)

InitQueue(QueueDefault, 1)
InitQueue(QueueProcessing, int(math.Ceil(float64(runtime.NumCPU())/2.0)))
InitQueue(QueueProcessing, halfCPUs())
InitQueue(QueueReports, 1)
InitQueue(QueueImports, 1)
InitQueue(QueueHousekeeping, halfCPUs())
}

func InitQueue(name string, workers int) error {
Expand Down Expand Up @@ -71,3 +73,11 @@ func CloseQueues() {
q.Stop()
}
}

func allCPUs() int {
return runtime.NumCPU()
}

func halfCPUs() int {
return int(math.Ceil(float64(runtime.NumCPU()) / 2.0))
}
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ var (
keyValueService services.IKeyValueService
reportService services.IReportService
diagnosticsService services.IDiagnosticsService
housekeepingService services.IHousekeepingService
miscService services.IMiscService
)

Expand Down Expand Up @@ -182,12 +183,14 @@ func main() {
keyValueService = services.NewKeyValueService(keyValueRepository)
reportService = services.NewReportService(summaryService, userService, mailService)
diagnosticsService = services.NewDiagnosticsService(diagnosticsRepository)
housekeepingService = services.NewHousekeepingService(userService, heartbeatService, summaryService)
miscService = services.NewMiscService(userService, summaryService, keyValueService)

// Schedule background tasks
go aggregationService.Schedule()
go leaderboardService.Schedule()
go reportService.Schedule()
go housekeepingService.Schedule()
go miscService.ScheduleCountTotalTime()

routes.Init()
Expand Down
13 changes: 9 additions & 4 deletions mocks/heartbeat_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type HeartbeatServiceMock struct {
mock.Mock
}

func (m *HeartbeatServiceMock) Insert(heartbeat *models.Heartbeat) error {
args := m.Called(heartbeat)
func (m *HeartbeatServiceMock) Insert(h *models.Heartbeat) error {
args := m.Called(h)
return args.Error(0)
}

func (m *HeartbeatServiceMock) InsertBatch(heartbeats []*models.Heartbeat) error {
args := m.Called(heartbeats)
func (m *HeartbeatServiceMock) InsertBatch(h []*models.Heartbeat) error {
args := m.Called(h)
return args.Error(0)
}

Expand Down Expand Up @@ -74,3 +74,8 @@ func (m *HeartbeatServiceMock) DeleteByUser(u *models.User) error {
args := m.Called(u)
return args.Error(0)
}

func (m *HeartbeatServiceMock) DeleteByUserBefore(u *models.User, t time.Time) error {
args := m.Called(u, t)
return args.Error(0)
}
13 changes: 9 additions & 4 deletions mocks/summary_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ type SummaryRepositoryMock struct {
mock.Mock
}

func (m *SummaryRepositoryMock) Insert(summary *models.Summary) error {
args := m.Called(summary)
func (m *SummaryRepositoryMock) Insert(s *models.Summary) error {
args := m.Called(s)
return args.Error(0)
}

Expand All @@ -20,8 +20,8 @@ func (m *SummaryRepositoryMock) GetAll() ([]*models.Summary, error) {
return args.Get(0).([]*models.Summary), args.Error(1)
}

func (m *SummaryRepositoryMock) GetByUserWithin(user *models.User, time time.Time, time2 time.Time) ([]*models.Summary, error) {
args := m.Called(user, time, time2)
func (m *SummaryRepositoryMock) GetByUserWithin(u *models.User, t1 time.Time, t2 time.Time) ([]*models.Summary, error) {
args := m.Called(u, t1, t2)
return args.Get(0).([]*models.Summary), args.Error(1)
}

Expand All @@ -34,3 +34,8 @@ func (m *SummaryRepositoryMock) DeleteByUser(s string) error {
args := m.Called(s)
return args.Error(0)
}

func (m *SummaryRepositoryMock) DeleteByUserBefore(s string, t time.Time) error {
args := m.Called(s, t)
return args.Error(0)
}
12 changes: 11 additions & 1 deletion repositories/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (r *HeartbeatRepository) CountByUsers(users []*models.User) ([]*models.Coun
return counts, nil
}

func (r HeartbeatRepository) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) {
func (r *HeartbeatRepository) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) {
var results []string
if err := r.db.
Model(&models.Heartbeat{}).
Expand Down Expand Up @@ -199,3 +199,13 @@ func (r *HeartbeatRepository) DeleteByUser(user *models.User) error {
}
return nil
}

func (r *HeartbeatRepository) DeleteByUserBefore(user *models.User, t time.Time) error {
if err := r.db.
Where("user_id = ?", user.ID).
Where("time <= ?", t.Local()).
Delete(models.Heartbeat{}).Error; err != nil {
return err
}
return nil
}
2 changes: 2 additions & 0 deletions repositories/repositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type IHeartbeatRepository interface {
GetEntitySetByUser(uint8, *models.User) ([]string, error)
DeleteBefore(time.Time) error
DeleteByUser(*models.User) error
DeleteByUserBefore(*models.User, time.Time) error
}

type IDiagnosticsRepository interface {
Expand Down Expand Up @@ -66,6 +67,7 @@ type ISummaryRepository interface {
GetByUserWithin(*models.User, time.Time, time.Time) ([]*models.Summary, error)
GetLastByUser() ([]*models.TimeByUser, error)
DeleteByUser(string) error
DeleteByUserBefore(string, time.Time) error
}

type IUserRepository interface {
Expand Down
10 changes: 10 additions & 0 deletions repositories/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ func (r *SummaryRepository) DeleteByUser(userId string) error {
return nil
}

func (r *SummaryRepository) DeleteByUserBefore(userId string, t time.Time) error {
if err := r.db.
Where("user_id = ?", userId).
Where("to_time <= ?", t.Local()).
Delete(models.Summary{}).Error; err != nil {
return err
}
return nil
}

// inplace
func (r *SummaryRepository) populateItems(summaries []*models.Summary, conditions []clause.Interface) error {
var items []*models.SummaryItem
Expand Down
4 changes: 2 additions & 2 deletions routes/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,12 +657,12 @@ func (h *SettingsHandler) validateWakatimeKey(apiKey string, baseUrl string) boo
func (h *SettingsHandler) regenerateSummaries(user *models.User) error {
logbuch.Info("clearing summaries for user '%s'", user.ID)
if err := h.summarySrvc.DeleteByUser(user.ID); err != nil {
logbuch.Error("failed to clear summaries: %v", err)
conf.Log().Error("failed to clear summaries: %v", err)
return err
}

if err := h.aggregationSrvc.AggregateSummaries(datastructure.NewSet(user.ID)); err != nil {
logbuch.Error("failed to regenerate summaries: %v", err)
conf.Log().Error("failed to regenerate summaries: %v", err)
return err
}

Expand Down
5 changes: 5 additions & 0 deletions services/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func (srv *HeartbeatService) DeleteByUser(user *models.User) error {
return srv.repository.DeleteByUser(user)
}

func (srv *HeartbeatService) DeleteByUserBefore(user *models.User, t time.Time) error {
go srv.cache.Flush()
return srv.repository.DeleteByUserBefore(user, t)
}

func (srv *HeartbeatService) augmented(heartbeats []*models.Heartbeat, userId string) ([]*models.Heartbeat, error) {
languageMapping, err := srv.languageMappingSrvc.ResolveByUser(userId)
if err != nil {
Expand Down
81 changes: 81 additions & 0 deletions services/housekeeping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package services

import (
"github.com/emvi/logbuch"
"github.com/muety/artifex/v2"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/models"
"time"
)

type HousekeepingService struct {
config *config.Config
userSrvc IUserService
heartbeatSrvc IHeartbeatService
summarySrvc ISummaryService
queueDefault *artifex.Dispatcher
queueWorkers *artifex.Dispatcher
}

func NewHousekeepingService(userService IUserService, heartbeatService IHeartbeatService, summaryService ISummaryService) *HousekeepingService {
return &HousekeepingService{
config: config.Get(),
userSrvc: userService,
heartbeatSrvc: heartbeatService,
summarySrvc: summaryService,
queueDefault: config.GetDefaultQueue(),
queueWorkers: config.GetQueue(config.QueueHousekeeping),
}
}

func (s *HousekeepingService) Schedule() {
if s.config.App.DataRetentionMonths <= 0 {
return
}

logbuch.Info("scheduling data cleanup")

// this is not exactly precise, because of summer / winter time, etc.
retentionDuration := time.Now().Sub(time.Now().AddDate(0, -s.config.App.DataRetentionMonths, 0))

_, err := s.queueDefault.DispatchCron(func() {
// fetch all users
users, err := s.userSrvc.GetAll()
if err != nil {
config.Log().Error("failed to get users for data cleanup, %v", err)
return
}

// schedule jobs
for _, u := range users {
user := *u
s.queueWorkers.Dispatch(func() {
if err := s.ClearOldUserData(&user, retentionDuration); err != nil {
config.Log().Error("failed to clear old user data for '%s'", user.ID)
}
})
}
}, s.config.App.DataCleanupTime)

if err != nil {
config.Log().Error("failed to dispatch data cleanup jobs, %v", err)
}
}

func (s *HousekeepingService) ClearOldUserData(user *models.User, maxAge time.Duration) error {
before := time.Now().Add(-maxAge)
logbuch.Warn("cleaning up user data for '%s' older than %v", user.ID, before)

// clear old heartbeats
if err := s.heartbeatSrvc.DeleteByUserBefore(user, before); err != nil {
return err
}

// clear old summaries
logbuch.Info("clearing summaries for user '%s' older than %v", user.ID, before)
if err := s.summarySrvc.DeleteByUserBefore(user.ID, before); err != nil {
return err
}

return nil
}
7 changes: 7 additions & 0 deletions services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type IHeartbeatService interface {
GetEntitySetByUser(uint8, *models.User) ([]string, error)
DeleteBefore(time.Time) error
DeleteByUser(*models.User) error
DeleteByUserBefore(*models.User, time.Time) error
}

type IDiagnosticsService interface {
Expand Down Expand Up @@ -89,6 +90,7 @@ type ISummaryService interface {
Summarize(time.Time, time.Time, *models.User, *models.Filters) (*models.Summary, error)
GetLatestByUser() ([]*models.TimeByUser, error)
DeleteByUser(string) error
DeleteByUserBefore(string, time.Time) error
Insert(*models.Summary) error
}

Expand All @@ -97,6 +99,11 @@ type IReportService interface {
SendReport(*models.User, time.Duration) error
}

type IHousekeepingService interface {
Schedule()
ClearOldUserData(*models.User, time.Duration) error
}

type ILeaderboardService interface {
Schedule()
ComputeLeaderboard([]*models.User, *models.IntervalKey, []uint8) error
Expand Down
5 changes: 5 additions & 0 deletions services/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ func (srv *SummaryService) DeleteByUser(userId string) error {
return srv.repository.DeleteByUser(userId)
}

func (srv *SummaryService) DeleteByUserBefore(userId string, t time.Time) error {
srv.invalidateUserCache(userId)
return srv.repository.DeleteByUserBefore(userId, t)
}

func (srv *SummaryService) Insert(summary *models.Summary) error {
srv.invalidateUserCache(summary.UserID)
return srv.repository.Insert(summary)
Expand Down

0 comments on commit 5ae7527

Please sign in to comment.