diff --git a/internal/api/server.go b/internal/api/server.go index 1e0f053..1a1b3d4 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path" + "sync" "time" "connectrpc.com/connect" @@ -268,8 +269,14 @@ func (s *Server) Backup(ctx context.Context, req *connect.Request[types.StringVa if err != nil { return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.Value, err) } - s.orchestrator.ScheduleTask(orchestrator.NewOneoffBackupTask(s.orchestrator, plan, time.Now()), orchestrator.TaskPriorityInteractive) - return connect.NewResponse(&emptypb.Empty{}), nil + var wg sync.WaitGroup + wg.Add(1) + s.orchestrator.ScheduleTask(orchestrator.NewOneoffBackupTask(s.orchestrator, plan, time.Now()), orchestrator.TaskPriorityInteractive, func(e error) { + err = e + wg.Done() + }) + wg.Wait() + return connect.NewResponse(&emptypb.Empty{}), err } func (s *Server) Forget(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) { @@ -279,11 +286,15 @@ func (s *Server) Forget(ctx context.Context, req *connect.Request[types.StringVa } at := time.Now() - - s.orchestrator.ScheduleTask(orchestrator.NewOneoffForgetTask(s.orchestrator, plan, "", at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget) + var wg sync.WaitGroup + wg.Add(1) + s.orchestrator.ScheduleTask(orchestrator.NewOneoffForgetTask(s.orchestrator, plan, "", at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) { + err = e + wg.Done() + }) s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, plan.Repo, at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots) - - return connect.NewResponse(&emptypb.Empty{}), nil + wg.Wait() + return connect.NewResponse(&emptypb.Empty{}), err } func (s *Server) Prune(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) { @@ -293,7 +304,13 @@ func (s *Server) Prune(ctx context.Context, req *connect.Request[types.StringVal } at := time.Now() - s.orchestrator.ScheduleTask(orchestrator.NewOneoffPruneTask(s.orchestrator, plan, "", at, true), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityPrune) + var wg sync.WaitGroup + wg.Add(1) + s.orchestrator.ScheduleTask(orchestrator.NewOneoffPruneTask(s.orchestrator, plan, "", at, true), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityPrune, func(e error) { + err = e + wg.Done() + }) + wg.Wait() return connect.NewResponse(&emptypb.Empty{}), nil } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 97a2884..ee72c90 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -230,14 +230,18 @@ func (o *Orchestrator) Run(mainCtx context.Context) { } start := time.Now() - if err := t.task.Run(taskCtx); err != nil { - zap.L().Error("task failed", zap.String("task", t.task.Name()), zap.Error(err)) + err := t.task.Run(taskCtx) + if err != nil { + zap.L().Error("task failed", zap.String("task", t.task.Name()), zap.Error(err), zap.Duration("duration", time.Since(start))) } else { zap.L().Info("task finished", zap.String("task", t.task.Name()), zap.Duration("duration", time.Since(start))) } - o.runningTask.Store(nil) + for _, cb := range t.callbacks { + cb(err) + } + if nextTime := t.task.Next(o.curTime()); nextTime != nil { o.taskQueue.Push(scheduledTask{ task: t.task, @@ -247,16 +251,17 @@ func (o *Orchestrator) Run(mainCtx context.Context) { } } -func (o *Orchestrator) ScheduleTask(t Task, priority int) { +func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(error)) { nextRun := t.Next(o.curTime()) if nextRun == nil { return } zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.Format(time.RFC3339))) o.taskQueue.Push(scheduledTask{ - task: t, - runAt: *nextRun, - priority: priority, + task: t, + runAt: *nextRun, + priority: priority, + callbacks: callbacks, }) } diff --git a/internal/orchestrator/scheduledtaskheap.go b/internal/orchestrator/scheduledtaskheap.go index 405f884..b8f6bd8 100644 --- a/internal/orchestrator/scheduledtaskheap.go +++ b/internal/orchestrator/scheduledtaskheap.go @@ -154,9 +154,10 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask { } type scheduledTask struct { - task Task - runAt time.Time - priority int + task Task + runAt time.Time + priority int + callbacks []func(error) } type scheduledTaskHeap struct { diff --git a/webui/src/views/PlanView.tsx b/webui/src/views/PlanView.tsx index b04c46a..eccd96c 100644 --- a/webui/src/views/PlanView.tsx +++ b/webui/src/views/PlanView.tsx @@ -24,7 +24,7 @@ export const PlanView = ({ plan }: React.PropsWithChildren<{ plan: Plan }>) => { const handleBackupNow = async () => { try { - backrestService.backup({ value: plan.id }); + await backrestService.backup({ value: plan.id }); alertsApi.success("Backup scheduled."); } catch (e: any) { alertsApi.error("Failed to schedule backup: " + e.message); diff --git a/webui/src/views/RepoView.tsx b/webui/src/views/RepoView.tsx index b94ee65..eb5123f 100644 --- a/webui/src/views/RepoView.tsx +++ b/webui/src/views/RepoView.tsx @@ -67,7 +67,7 @@ export const RepoView = ({ repo }: React.PropsWithChildren<{ repo: Repo }>) => { <>