package api import ( "context" "errors" "fmt" "os" "path" "sync" "time" "connectrpc.com/connect" "github.com/garethgeorge/backrest/gen/go/types" v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/gen/go/v1/v1connect" "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/indexutil" "github.com/garethgeorge/backrest/internal/orchestrator" "github.com/garethgeorge/backrest/internal/protoutil" "github.com/garethgeorge/backrest/internal/resticinstaller" "github.com/garethgeorge/backrest/internal/rotatinglog" "github.com/garethgeorge/backrest/pkg/restic" "go.uber.org/zap" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" ) type BackrestHandler struct { v1connect.UnimplementedBackrestHandler config config.ConfigStore orchestrator *orchestrator.Orchestrator oplog *oplog.OpLog logStore *rotatinglog.RotatingLog } var _ v1connect.BackrestHandler = &BackrestHandler{} func NewBackrestHandler(config config.ConfigStore, orchestrator *orchestrator.Orchestrator, oplog *oplog.OpLog, logStore *rotatinglog.RotatingLog) *BackrestHandler { s := &BackrestHandler{ config: config, orchestrator: orchestrator, oplog: oplog, logStore: logStore, } return s } // GetConfig implements GET /v1/config func (s *BackrestHandler) GetConfig(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[v1.Config], error) { config, err := s.config.Get() if err != nil { return nil, fmt.Errorf("failed to get config: %w", err) } return connect.NewResponse(config), nil } // SetConfig implements POST /v1/config func (s *BackrestHandler) SetConfig(ctx context.Context, req *connect.Request[v1.Config]) (*connect.Response[v1.Config], error) { existing, err := s.config.Get() if err != nil { return nil, fmt.Errorf("failed to check current config: %w", err) } // Compare and increment modno if existing.Modno != req.Msg.Modno { return nil, errors.New("config modno mismatch, reload and try again") } if err := config.ValidateConfig(req.Msg); err != nil { return nil, fmt.Errorf("validation error: %w", err) } req.Msg.Modno += 1 if err := s.config.Update(req.Msg); err != nil { return nil, fmt.Errorf("failed to update config: %w", err) } newConfig, err := s.config.Get() if err != nil { return nil, fmt.Errorf("failed to get newly set config: %w", err) } if err := s.orchestrator.ApplyConfig(newConfig); err != nil { return nil, fmt.Errorf("failed to apply config: %w", err) } return connect.NewResponse(newConfig), nil } // AddRepo implements POST /v1/config/repo, it includes validation that the repo can be initialized. func (s *BackrestHandler) AddRepo(ctx context.Context, req *connect.Request[v1.Repo]) (*connect.Response[v1.Config], error) { c, err := s.config.Get() if err != nil { return nil, fmt.Errorf("failed to get config: %w", err) } c = proto.Clone(c).(*v1.Config) c.Repos = append(c.Repos, req.Msg) if err := config.ValidateConfig(c); err != nil { return nil, fmt.Errorf("validation error: %w", err) } bin, err := resticinstaller.FindOrInstallResticBinary() if err != nil { return nil, fmt.Errorf("failed to find or install restic binary: %w", err) } r := restic.NewRepo(bin, req.Msg) // use background context such that the init op can try to complete even if the connection is closed. if err := r.Init(context.Background(), restic.WithPropagatedEnvVars(restic.EnvToPropagate...)); err != nil { return nil, fmt.Errorf("failed to init repo: %w", err) } zap.L().Debug("Updating config") if err := s.config.Update(c); err != nil { return nil, fmt.Errorf("failed to update config: %w", err) } zap.L().Debug("Applying config") s.orchestrator.ApplyConfig(c) // index snapshots for the newly added repository. zap.L().Debug("Scheduling index snapshots task") s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, req.Msg.Id, time.Now()), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots) zap.L().Debug("Done add repo") return connect.NewResponse(c), nil } // ListSnapshots implements POST /v1/snapshots func (s *BackrestHandler) ListSnapshots(ctx context.Context, req *connect.Request[v1.ListSnapshotsRequest]) (*connect.Response[v1.ResticSnapshotList], error) { query := req.Msg repo, err := s.orchestrator.GetRepo(query.RepoId) if err != nil { return nil, fmt.Errorf("failed to get repo: %w", err) } var snapshots []*restic.Snapshot if query.PlanId != "" { var plan *v1.Plan plan, err = s.orchestrator.GetPlan(query.PlanId) if err != nil { return nil, fmt.Errorf("failed to get plan %q: %w", query.PlanId, err) } snapshots, err = repo.SnapshotsForPlan(ctx, plan) } else { snapshots, err = repo.Snapshots(ctx) } if err != nil { return nil, fmt.Errorf("failed to list snapshots: %w", err) } // Transform the snapshots and return them. var rs []*v1.ResticSnapshot for _, snapshot := range snapshots { rs = append(rs, protoutil.SnapshotToProto(snapshot)) } return connect.NewResponse(&v1.ResticSnapshotList{ Snapshots: rs, }), nil } func (s *BackrestHandler) ListSnapshotFiles(ctx context.Context, req *connect.Request[v1.ListSnapshotFilesRequest]) (*connect.Response[v1.ListSnapshotFilesResponse], error) { query := req.Msg repo, err := s.orchestrator.GetRepo(query.RepoId) if err != nil { return nil, fmt.Errorf("failed to get repo: %w", err) } entries, err := repo.ListSnapshotFiles(ctx, query.SnapshotId, query.Path) if err != nil { return nil, fmt.Errorf("failed to list snapshot files: %w", err) } return connect.NewResponse(&v1.ListSnapshotFilesResponse{ Path: query.Path, Entries: entries, }), nil } // GetOperationEvents implements GET /v1/events/operations func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.Request[emptypb.Empty], resp *connect.ServerStream[v1.OperationEvent]) error { errorChan := make(chan error) defer close(errorChan) callback := func(oldOp *v1.Operation, newOp *v1.Operation) { var event *v1.OperationEvent if oldOp == nil && newOp != nil { event = &v1.OperationEvent{ Type: v1.OperationEventType_EVENT_CREATED, Operation: newOp, } } else if oldOp != nil && newOp != nil { event = &v1.OperationEvent{ Type: v1.OperationEventType_EVENT_UPDATED, Operation: newOp, } } else if oldOp != nil && newOp == nil { event = &v1.OperationEvent{ Type: v1.OperationEventType_EVENT_DELETED, Operation: oldOp, } } else { zap.L().Error("Unknown event type") return } if err := resp.Send(event); err != nil { errorChan <- fmt.Errorf("failed to send event: %w", err) } } s.oplog.Subscribe(&callback) defer s.oplog.Unsubscribe(&callback) select { case <-ctx.Done(): return nil case err := <-errorChan: return err } } func (s *BackrestHandler) GetOperations(ctx context.Context, req *connect.Request[v1.GetOperationsRequest]) (*connect.Response[v1.OperationList], error) { idCollector := indexutil.CollectAll() if req.Msg.LastN != 0 { idCollector = indexutil.CollectLastN(int(req.Msg.LastN)) } var err error var ops []*v1.Operation opCollector := func(op *v1.Operation) error { ops = append(ops, op) return nil } if req.Msg.RepoId != "" && req.Msg.PlanId != "" { return nil, errors.New("cannot specify both repoId and planId") } else if req.Msg.PlanId != "" { err = s.oplog.ForEachByPlan(req.Msg.PlanId, idCollector, opCollector) } else if req.Msg.RepoId != "" { err = s.oplog.ForEachByRepo(req.Msg.RepoId, idCollector, opCollector) } else if req.Msg.SnapshotId != "" { err = s.oplog.ForEachBySnapshotId(req.Msg.SnapshotId, idCollector, opCollector) } else if len(req.Msg.Ids) > 0 { ops = make([]*v1.Operation, 0, len(req.Msg.Ids)) for i, id := range req.Msg.Ids { op, err := s.oplog.Get(id) if err != nil { return nil, fmt.Errorf("failed to get operation %d: %w", i, err) } ops = append(ops, op) } } else { err = s.oplog.ForAll(opCollector) } if err != nil { return nil, fmt.Errorf("failed to get operations: %w", err) } return connect.NewResponse(&v1.OperationList{ Operations: ops, }), nil } func (s *BackrestHandler) IndexSnapshots(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) { _, err := s.orchestrator.GetRepo(req.Msg.Value) if err != nil { return nil, fmt.Errorf("failed to get repo %q: %w", req.Msg.Value, err) } s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, req.Msg.Value, time.Now()), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots) return connect.NewResponse(&emptypb.Empty{}), nil } func (s *BackrestHandler) Backup(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) { plan, err := s.orchestrator.GetPlan(req.Msg.Value) if err != nil { return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.Value, err) } var wg sync.WaitGroup wg.Add(1) s.orchestrator.ScheduleTask(orchestrator.NewOneoffBackupTask(s.orchestrator, plan, time.Now()), orchestrator.TaskPriorityInteractive, func(e error) { err = e wg.Done() }) wg.Wait() return connect.NewResponse(&emptypb.Empty{}), err } func (s *BackrestHandler) Forget(ctx context.Context, req *connect.Request[v1.ForgetRequest]) (*connect.Response[emptypb.Empty], error) { at := time.Now() var err error if req.Msg.SnapshotId != "" && req.Msg.PlanId != "" && req.Msg.RepoId != "" { wait := make(chan struct{}) s.orchestrator.ScheduleTask( orchestrator.NewOneoffForgetSnapshotTask(s.orchestrator, req.Msg.RepoId, req.Msg.PlanId, req.Msg.SnapshotId, at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) { err = e close(wait) }) <-wait } else if req.Msg.RepoId != "" && req.Msg.PlanId != "" { plan, err := s.orchestrator.GetPlan(req.Msg.PlanId) if err != nil { return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.PlanId, err) } wait := make(chan struct{}) s.orchestrator.ScheduleTask( orchestrator.NewOneoffForgetTask(s.orchestrator, plan, "", at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityForget, func(e error) { err = e close(wait) }) <-wait } else { return nil, errors.New("must specify repoId and planId and (optionally) snapshotId") } if err != nil { return nil, err } return connect.NewResponse(&emptypb.Empty{}), nil } func (s *BackrestHandler) Prune(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) { plan, err := s.orchestrator.GetPlan(req.Msg.Value) if err != nil { return nil, fmt.Errorf("failed to get plan %q: %w", req.Msg.Value, err) } at := time.Now() var wg sync.WaitGroup wg.Add(1) s.orchestrator.ScheduleTask(orchestrator.NewOneoffPruneTask(s.orchestrator, plan, at, true), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityPrune, func(e error) { err = e wg.Done() }) wg.Wait() return connect.NewResponse(&emptypb.Empty{}), nil } func (s *BackrestHandler) Restore(ctx context.Context, req *connect.Request[v1.RestoreSnapshotRequest]) (*connect.Response[emptypb.Empty], error) { if req.Msg.Target == "" { req.Msg.Target = path.Join(os.Getenv("HOME"), "Downloads") } if req.Msg.Path == "" { req.Msg.Path = "/" } target := path.Join(req.Msg.Target, fmt.Sprintf("restic-restore-%v", time.Now().Format("2006-01-02T15-04-05"))) _, err := os.Stat(target) if !errors.Is(err, os.ErrNotExist) { return nil, fmt.Errorf("restore target dir %q already exists", req.Msg.Target) } at := time.Now() s.orchestrator.ScheduleTask(orchestrator.NewOneoffRestoreTask(s.orchestrator, orchestrator.RestoreTaskOpts{ RepoId: req.Msg.RepoId, PlanId: req.Msg.PlanId, SnapshotId: req.Msg.SnapshotId, Path: req.Msg.Path, Target: target, }, at), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityDefault) return connect.NewResponse(&emptypb.Empty{}), nil } func (s *BackrestHandler) Unlock(ctx context.Context, req *connect.Request[types.StringValue]) (*connect.Response[emptypb.Empty], error) { repo, err := s.orchestrator.GetRepo(req.Msg.Value) if err != nil { return nil, fmt.Errorf("failed to get repo %q: %w", req.Msg.Value, err) } if err := repo.Unlock(context.Background()); err != nil { return nil, fmt.Errorf("failed to unlock repo %q: %w", req.Msg.Value, err) } return connect.NewResponse(&emptypb.Empty{}), nil } func (s *BackrestHandler) Cancel(ctx context.Context, req *connect.Request[types.Int64Value]) (*connect.Response[emptypb.Empty], error) { if err := s.orchestrator.CancelOperation(req.Msg.Value, v1.OperationStatus_STATUS_USER_CANCELLED); err != nil { return nil, err } return connect.NewResponse(&emptypb.Empty{}), nil } func (s *BackrestHandler) ClearHistory(ctx context.Context, req *connect.Request[v1.ClearHistoryRequest]) (*connect.Response[emptypb.Empty], error) { var err error var ids []int64 if len(req.Msg.Ops) != 0 { ids = append(ids, req.Msg.Ops...) } opCollector := func(op *v1.Operation) error { if !req.Msg.OnlyFailed || op.Status == v1.OperationStatus_STATUS_ERROR { ids = append(ids, op.Id) } return nil } if req.Msg.RepoId != "" && req.Msg.PlanId != "" { return nil, errors.New("cannot specify both repoId and planId") } else if req.Msg.PlanId != "" { err = s.oplog.ForEachByPlan(req.Msg.PlanId, indexutil.CollectAll(), opCollector) } else if req.Msg.RepoId != "" { err = s.oplog.ForEachByRepo(req.Msg.RepoId, indexutil.CollectAll(), opCollector) } if err != nil { return nil, fmt.Errorf("failed to get operations to delete: %w", err) } if err := s.oplog.Delete(ids...); err != nil { return nil, fmt.Errorf("failed to delete operations: %w", err) } return connect.NewResponse(&emptypb.Empty{}), err } func (s *BackrestHandler) GetLogs(ctx context.Context, req *connect.Request[v1.LogDataRequest]) (*connect.Response[types.BytesValue], error) { data, err := s.logStore.Read(req.Msg.GetRef()) if err != nil { if errors.Is(err, rotatinglog.ErrFileNotFound) { return connect.NewResponse(&types.BytesValue{ Value: []byte(fmt.Sprintf("file associated with log %v not found, it may have rotated out of the log history", req.Msg.GetRef())), }), nil } return nil, fmt.Errorf("get log data %v: %w", req.Msg.GetRef(), err) } return connect.NewResponse(&types.BytesValue{Value: data}), nil } func (s *BackrestHandler) PathAutocomplete(ctx context.Context, path *connect.Request[types.StringValue]) (*connect.Response[types.StringList], error) { ents, err := os.ReadDir(path.Msg.Value) if errors.Is(err, os.ErrNotExist) { return connect.NewResponse(&types.StringList{}), nil } else if err != nil { return nil, err } var paths []string for _, ent := range ents { paths = append(paths, ent.Name()) } return connect.NewResponse(&types.StringList{Values: paths}), nil }