Skip to content

Commit

Permalink
fix: UI buttons spin while waiting for tasks to complete
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Dec 31, 2023
1 parent 85f70e6 commit c767fa7
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 19 deletions.
31 changes: 24 additions & 7 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path"
"sync"
"time"

"connectrpc.com/connect"
Expand Down Expand Up @@ -268,8 +269,14 @@ func (s *Server) Backup(ctx context.Context, req *connect.Request[types.StringVa
if err != nil {
return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.Value, err)
}
s.orchestrator.ScheduleTask(orchestrator.NewOneoffBackupTask(s.orchestrator, plan, time.Now()), orchestrator.TaskPriorityInteractive)
return connect.NewResponse(&emptypb.Empty{}), nil
var wg sync.WaitGroup
wg.Add(1)
s.orchestrator.ScheduleTask(orchestrator.NewOneoffBackupTask(s.orchestrator, plan, time.Now()), orchestrator.TaskPriorityInteractive, func(e error) {
err = e
wg.Done()
})
wg.Wait()
return connect.NewResponse(&emptypb.Empty{}), err
}

func (s *Server) Forget(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) {
Expand All @@ -279,11 +286,15 @@ func (s *Server) Forget(ctx context.Context, req *connect.Request[types.StringVa
}

at := time.Now()

s.orchestrator.ScheduleTask(orchestrator.NewOneoffForgetTask(s.orchestrator, plan, "", at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget)
var wg sync.WaitGroup
wg.Add(1)
s.orchestrator.ScheduleTask(orchestrator.NewOneoffForgetTask(s.orchestrator, plan, "", at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) {
err = e
wg.Done()
})
s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, plan.Repo, at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots)

return connect.NewResponse(&emptypb.Empty{}), nil
wg.Wait()
return connect.NewResponse(&emptypb.Empty{}), err
}

func (s *Server) Prune(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) {
Expand All @@ -293,7 +304,13 @@ func (s *Server) Prune(ctx context.Context, req *connect.Request[types.StringVal
}

at := time.Now()
s.orchestrator.ScheduleTask(orchestrator.NewOneoffPruneTask(s.orchestrator, plan, "", at, true), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityPrune)
var wg sync.WaitGroup
wg.Add(1)
s.orchestrator.ScheduleTask(orchestrator.NewOneoffPruneTask(s.orchestrator, plan, "", at, true), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityPrune, func(e error) {
err = e
wg.Done()
})
wg.Wait()

return connect.NewResponse(&emptypb.Empty{}), nil
}
Expand Down
19 changes: 12 additions & 7 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,18 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
}

start := time.Now()
if err := t.task.Run(taskCtx); err != nil {
zap.L().Error("task failed", zap.String("task", t.task.Name()), zap.Error(err))
err := t.task.Run(taskCtx)
if err != nil {
zap.L().Error("task failed", zap.String("task", t.task.Name()), zap.Error(err), zap.Duration("duration", time.Since(start)))
} else {
zap.L().Info("task finished", zap.String("task", t.task.Name()), zap.Duration("duration", time.Since(start)))
}

o.runningTask.Store(nil)

for _, cb := range t.callbacks {
cb(err)
}

if nextTime := t.task.Next(o.curTime()); nextTime != nil {
o.taskQueue.Push(scheduledTask{
task: t.task,
Expand All @@ -247,16 +251,17 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
}
}

func (o *Orchestrator) ScheduleTask(t Task, priority int) {
func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(error)) {
nextRun := t.Next(o.curTime())
if nextRun == nil {
return
}
zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.Format(time.RFC3339)))
o.taskQueue.Push(scheduledTask{
task: t,
runAt: *nextRun,
priority: priority,
task: t,
runAt: *nextRun,
priority: priority,
callbacks: callbacks,
})
}

Expand Down
7 changes: 4 additions & 3 deletions internal/orchestrator/scheduledtaskheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,10 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
}

type scheduledTask struct {
task Task
runAt time.Time
priority int
task Task
runAt time.Time
priority int
callbacks []func(error)
}

type scheduledTaskHeap struct {
Expand Down
2 changes: 1 addition & 1 deletion webui/src/views/PlanView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const PlanView = ({ plan }: React.PropsWithChildren<{ plan: Plan }>) => {

const handleBackupNow = async () => {
try {
backrestService.backup({ value: plan.id });
await backrestService.backup({ value: plan.id });
alertsApi.success("Backup scheduled.");
} catch (e: any) {
alertsApi.error("Failed to schedule backup: " + e.message);
Expand Down
2 changes: 1 addition & 1 deletion webui/src/views/RepoView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export const RepoView = ({ repo }: React.PropsWithChildren<{ repo: Repo }>) => {
<>
<h3>Repo stats computed on {formatTime(Number(statsOperation.unixTimeStartMs))}</h3>
{statsOperation.op.case === "operationStats" && <StatsTable stats={statsOperation.op.value.stats!} />}
<small>Stats are refreshed periodically in the background as new data is added.</small>
<small>Stats are refreshed periodically in the background as new data is added (e.g. every 10GB added or every 50 operations).</small>
</>
}
</>
Expand Down

0 comments on commit c767fa7

Please sign in to comment.