diff --git a/cmd/backrest/backrest.go b/cmd/backrest/backrest.go index b6e0f08d..afa7acb2 100644 --- a/cmd/backrest/backrest.go +++ b/cmd/backrest/backrest.go @@ -15,6 +15,7 @@ import ( "sync/atomic" "syscall" + v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/gen/go/v1/v1connect" "github.com/garethgeorge/backrest/internal/api" "github.com/garethgeorge/backrest/internal/auth" @@ -24,11 +25,11 @@ import ( "github.com/garethgeorge/backrest/internal/metric" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/bboltstore" + "github.com/garethgeorge/backrest/internal/oplog/sqlitestore" "github.com/garethgeorge/backrest/internal/orchestrator" "github.com/garethgeorge/backrest/internal/resticinstaller" "github.com/garethgeorge/backrest/webui" "github.com/mattn/go-colorable" - "go.etcd.io/bbolt" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/net/http2" @@ -66,21 +67,19 @@ func main() { var wg sync.WaitGroup // Create / load the operation log - oplogFile := path.Join(env.DataDir(), "oplog.boltdb") - opstore, err := bboltstore.NewBboltStore(oplogFile) + oplogFile := path.Join(env.DataDir(), "oplog.sqlite") + opstore, err := sqlitestore.NewSqliteStore(oplogFile) if err != nil { - if !errors.Is(err, bbolt.ErrTimeout) { - zap.S().Fatalf("timeout while waiting to open database, is the database open elsewhere?") - } zap.S().Warnf("operation log may be corrupted, if errors recur delete the file %q and restart. Your backups stored in your repos are safe.", oplogFile) - zap.S().Fatalf("error creating oplog : %v", err) + zap.S().Fatalf("error creating oplog: %v", err) } defer opstore.Close() - oplog, err := oplog.NewOpLog(opstore) + log, err := oplog.NewOpLog(opstore) if err != nil { zap.S().Fatalf("error creating oplog: %v", err) } + migrateBboltOplog(opstore) // Create rotating log storage logStore, err := logwriter.NewLogManager(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs @@ -89,7 +88,7 @@ func main() { } // Create orchestrator and start task loop. - orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, oplog, logStore) + orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, log, logStore) if err != nil { zap.S().Fatalf("error creating orchestrator: %v", err) } @@ -104,7 +103,7 @@ func main() { apiBackrestHandler := api.NewBackrestHandler( configStore, orchestrator, - oplog, + log, logStore, ) @@ -116,7 +115,7 @@ func main() { backrestHandlerPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler) mux.Handle(backrestHandlerPath, auth.RequireAuthentication(backrestHandler, authenticator)) mux.Handle("/", webui.Handler()) - mux.Handle("/download/", http.StripPrefix("/download", api.NewDownloadHandler(oplog))) + mux.Handle("/download/", http.StripPrefix("/download", api.NewDownloadHandler(log))) mux.Handle("/metrics", auth.RequireAuthentication(metric.GetRegistry().Handler(), authenticator)) // Serve the HTTP gateway @@ -225,3 +224,63 @@ func installLoggers() { zap.ReplaceGlobals(zap.New(zapcore.NewTee(pretty, ugly))) zap.S().Infof("writing logs to: %v", logsDir) } + +func migrateBboltOplog(logstore oplog.OpStore) { + oldBboltOplogFile := path.Join(env.DataDir(), "oplog.boltdb") + if _, err := os.Stat(oldBboltOplogFile); err == nil { + zap.S().Warnf("found old bbolt oplog file %q, migrating to sqlite", oldBboltOplogFile) + oldOpstore, err := bboltstore.NewBboltStore(oldBboltOplogFile) + if err != nil { + zap.S().Fatalf("error opening old bbolt oplog: %v", err) + } + + oldOplog, err := oplog.NewOpLog(oldOpstore) + if err != nil { + zap.S().Fatalf("error creating old bbolt oplog: %v", err) + } + + batch := make([]*v1.Operation, 0, 32) + + var errs []error + + if err := oldOplog.Query(oplog.Query{}, func(op *v1.Operation) error { + batch = append(batch, op) + if len(batch) == 256 { + if err := logstore.Add(batch...); err != nil { + errs = append(errs, err) + zap.S().Warnf("error migrating %d operations: %v", len(batch), err) + } else { + zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch)) + } + batch = batch[:0] + } + return nil + }); err != nil { + zap.S().Warnf("couldn't migrate all operations from the old bbolt oplog, if this recurs delete the file %q and restart", oldBboltOplogFile) + zap.S().Fatalf("error migrating old bbolt oplog: %v", err) + } + + if len(batch) > 0 { + if err := logstore.Add(batch...); err != nil { + errs = append(errs, err) + zap.S().Warnf("error migrating %d operations: %v", len(batch), err) + } else { + zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch)) + } + zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch)) + } + + if len(errs) > 0 { + zap.S().Fatalf("encountered %d errors migrating old bbolt oplog, see logs for details. If this probelem recurs delete the file %q and restart", len(errs), oldBboltOplogFile) + } + + if err := oldOpstore.Close(); err != nil { + zap.S().Warnf("error closing old bbolt oplog: %v", err) + } + if err := os.Remove(oldBboltOplogFile); err != nil { + zap.S().Warnf("error removing old bbolt oplog: %v", err) + } + + zap.S().Info("migrated old bbolt oplog to sqlite") + } +} diff --git a/internal/oplog/sqlitestore/sqlitestore.go b/internal/oplog/sqlitestore/sqlitestore.go index cb202766..b94a6b62 100644 --- a/internal/oplog/sqlitestore/sqlitestore.go +++ b/internal/oplog/sqlitestore/sqlitestore.go @@ -2,10 +2,13 @@ package sqlitestore import ( "context" + "crypto/rand" "errors" "fmt" + "math/big" "strings" "sync/atomic" + "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/internal/oplog" @@ -26,7 +29,7 @@ var _ oplog.OpStore = (*SqliteStore)(nil) func NewSqliteStore(db string) (*SqliteStore, error) { dbpool, err := sqlitex.NewPool(db, sqlitex.PoolOptions{ PoolSize: 16, - Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL | sqlite.OpenSharedCache, + Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL, }) if err != nil { return nil, fmt.Errorf("open sqlite pool: %v", err) @@ -72,22 +75,18 @@ SELECT 0 WHERE NOT EXISTS (SELECT 1 FROM system_info); return fmt.Errorf("init sqlite: %v", err) } - // find the next id value - if err := sqlitex.ExecuteTransient(conn, "SELECT MAX(id) FROM operations", &sqlitex.ExecOptions{ - ResultFunc: func(stmt *sqlite.Stmt) error { - m.nextIDVal.Store(stmt.GetInt64("MAX(id)") + 1) - return nil - }, - }); err != nil { - return fmt.Errorf("get max ID: %v", err) - } - if m.nextIDVal.Load() == 0 { - m.nextIDVal.Store(1) - } + // rand init value + n, _ := rand.Int(rand.Reader, big.NewInt(1<<20)) + m.nextIDVal.Store(n.Int64()) return nil } +func (o *SqliteStore) nextID(unixTimeMs int64) (int64, error) { + seq := o.nextIDVal.Add(1) + return int64(unixTimeMs<<20) | int64(seq&((1<<20)-1)), nil +} + func (m *SqliteStore) Version() (int64, error) { conn, err := m.dbpool.Take(context.Background()) if err != nil { @@ -262,7 +261,10 @@ func (m *SqliteStore) Add(op ...*v1.Operation) error { return withSqliteTransaction(conn, func() error { for _, o := range op { - o.Id = m.nextIDVal.Add(1) + o.Id, err = m.nextID(time.Now().UnixMilli()) + if err != nil { + return fmt.Errorf("generate operation id: %v", err) + } if o.FlowId == 0 { o.FlowId = o.Id } @@ -281,7 +283,7 @@ func (m *SqliteStore) Add(op ...*v1.Operation) error { Args: []any{o.Id, o.FlowId, o.InstanceId, o.PlanId, o.RepoId, o.SnapshotId, bytes}, }); err != nil { if sqlite.ErrCode(err) == sqlite.ResultConstraintUnique { - return fmt.Errorf("operation already exists: %w", oplog.ErrExist) + return fmt.Errorf("operation already exists %v: %w", o.Id, oplog.ErrExist) } return fmt.Errorf("add operation: %v", err) }