diff --git a/cmd/backrest/backrest.go b/cmd/backrest/backrest.go index d8d2f3b3..259c4ff9 100644 --- a/cmd/backrest/backrest.go +++ b/cmd/backrest/backrest.go @@ -4,7 +4,6 @@ import ( "context" "crypto/rand" "crypto/sha256" - "database/sql" "encoding/hex" "errors" "flag" @@ -25,6 +24,7 @@ import ( "github.com/garethgeorge/backrest/internal/auth" "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/env" + "github.com/garethgeorge/backrest/internal/kvstore" "github.com/garethgeorge/backrest/internal/logstore" "github.com/garethgeorge/backrest/internal/metric" "github.com/garethgeorge/backrest/internal/oplog" @@ -40,161 +40,98 @@ import ( "gopkg.in/natefinch/lumberjack.v2" ) -var InstallDepsOnly = flag.Bool("install-deps-only", false, "install dependencies and exit") +var installDepsOnly = flag.Bool("install-deps-only", false, "install dependencies and exit") + var ( version = "unknown" commit = "unknown" ) func runApp() { - installLoggers() + flag.Parse() + installLoggers(version, commit) + // Install dependencies if requested resticPath, err := resticinstaller.FindOrInstallResticBinary() if err != nil { - zap.S().Fatalf("error finding or installing restic: %v", err) + zap.L().Fatal("error finding or installing restic", zap.Error(err)) } - - if *InstallDepsOnly { - zap.S().Info("dependencies installed, exiting") + if *installDepsOnly { + zap.L().Info("dependencies installed, exiting") return } + // Setup context and signal handling ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go onterm(os.Interrupt, cancel) go onterm(os.Interrupt, newForceKillHandler()) - // Load the configuration - configMgr := &config.ConfigManager{Store: createConfigProvider()} - cfg, err := configMgr.Get() + // Create dependency components + configStore := createConfigStore() + cfg, err := configStore.Get() if err != nil { - zap.S().Fatalf("error loading config: %v", err) + zap.L().Fatal("error loading config", zap.Error(err)) } + configMgr := &config.ConfigManager{Store: configStore} - var wg sync.WaitGroup - - // Create / load the operation log - oplogFile := path.Join(env.DataDir(), "oplog.sqlite") - opstore, err := sqlitestore.NewSqliteStore(oplogFile) - if errors.Is(err, sqlitestore.ErrLocked) { - zap.S().Fatalf("oplog is locked by another instance of backrest that is using the same data directory %q, kill that instance before starting another one.", env.DataDir()) - } else if err != nil { - zap.S().Warnf("operation log may be corrupted, if errors recur delete the file %q and restart. Your backups stored in your repos are safe.", oplogFile) - zap.S().Fatalf("error creating oplog: %v", err) - } - defer opstore.Close() - - log, err := oplog.NewOpLog(opstore) + opLog, opLogStore, err := newOpLog(cfg) if err != nil { - zap.S().Fatalf("error creating oplog: %v", err) - } - migratePopulateGuids(opstore, cfg) - if err := oplog.ApplyMigrations(log); err != nil { - zap.S().Fatalf("error applying oplog migrations: %v", err) + zap.L().Fatal("error creating oplog", zap.Error(err)) } + defer opLogStore.Close() - // Create rotating log storage - logStore, err := logstore.NewLogStore(filepath.Join(env.DataDir(), "tasklogs")) + logStore, unsubscribeLogStore, err := newLogStore(opLog) if err != nil { - zap.S().Fatalf("error creating task log store: %v", err) + zap.L().Fatal("error creating log store", zap.Error(err)) } - logstore.MigrateTarLogsInDir(logStore, filepath.Join(env.DataDir(), "rotatinglogs")) - deleteLogsForOp := func(ops []*v1.Operation, event oplog.OperationEvent) { - if event != oplog.OPERATION_DELETED { - return - } - for _, op := range ops { - if err := logStore.DeleteWithParent(op.Id); err != nil { - zap.S().Warnf("error deleting logs for operation %q: %v", op.Id, err) - } - } - } - log.Subscribe(oplog.Query{}, &deleteLogsForOp) + defer unsubscribeLogStore() defer func() { if err := logStore.Close(); err != nil { - zap.S().Warnf("error closing log store: %v", err) + zap.L().Warn("error closing log store", zap.Error(err)) } - log.Unsubscribe(&deleteLogsForOp) }() - // Create orchestrator and start task loop. - orchestrator, err := orchestrator.NewOrchestrator(resticPath, configMgr, log, logStore) + orch, err := orchestrator.NewOrchestrator(resticPath, configMgr, opLog, logStore) if err != nil { - zap.S().Fatalf("error creating orchestrator: %v", err) + zap.L().Fatal("error creating orchestrator", zap.Error(err)) } - wg.Add(1) + kvdbPath := path.Join(env.DataDir(), "kvdb.sqlite") + sharedKvdb, err := kvstore.NewSqliteDbForKvStore(kvdbPath) + if err != nil { + zap.L().Fatal("error creating general kvstore database pool", zap.Error(err)) + } + defer sharedKvdb.Close() + + peerStateManager, err := syncapi.NewSqlitePeerStateManager(sharedKvdb) + if err != nil { + zap.L().Fatal("error creating peer state manager", zap.Error(err)) + } + syncMgr := syncapi.NewSyncManager(configMgr, opLog, orch, peerStateManager) + + authenticator := newAuthenticator(configMgr) + + // Start background services + var wg sync.WaitGroup + wg.Add(2) go func() { - orchestrator.Run(ctx) - wg.Done() + defer wg.Done() + orch.Run(ctx) }() - - // Create peerstate manager - // Note: we don't have to acquire a lock since the sqlitestore already checks this, elsewise we should here. - peerStateDbPath := path.Join(env.DataDir(), "general.sqlite") - peerStateDbPool, err := sql.Open("sqlite3", peerStateDbPath) - if err != nil { - zap.S().Fatalf("error creating sqlite pool for peer state: %v", err) - } - peerStateManager, err := syncapi.NewSqlitePeerStateManager(peerStateDbPool) - if err != nil { - zap.S().Fatalf("error creating peer state manager: %v", err) - } - defer peerStateDbPool.Close() - - // Create and serve the HTTP gateway - syncMgr := syncapi.NewSyncManager(configMgr, log, orchestrator, peerStateManager) - wg.Add(1) go func() { + defer wg.Done() syncMgr.RunSync(ctx) - wg.Done() }() - apiBackrestHandler := api.NewBackrestHandler( - configMgr, - peerStateManager, - orchestrator, - log, - logStore, - ) - authenticator := auth.NewAuthenticator(getSecret(), configMgr) - apiAuthenticationHandler := api.NewAuthenticationHandler(authenticator) - - mux := http.NewServeMux() - mux.Handle(v1connect.NewAuthenticationHandler(apiAuthenticationHandler)) - backrestHandlerPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler) - mux.Handle(backrestHandlerPath, auth.RequireAuthentication(backrestHandler, authenticator)) - mux.Handle(v1connect.NewBackrestSyncServiceHandler(syncapi.NewBackrestSyncHandler(syncMgr))) - syncStateHandlerPath, syncStateHandler := v1connect.NewBackrestSyncStateServiceHandler(syncapi.NewBackrestSyncStateHandler(syncMgr)) - mux.Handle(syncStateHandlerPath, auth.RequireAuthentication(syncStateHandler, authenticator)) - mux.Handle("/", webui.Handler()) - mux.Handle("/download/", http.StripPrefix("/download", api.NewDownloadHandler(log, orchestrator))) - mux.Handle("/metrics", auth.RequireAuthentication(metric.GetRegistry().Handler(), authenticator)) - - // Serve the HTTP gateway - var handler http.Handler = mux - if version == "unknown" { // dev build, enable CORS for local development - handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Connect-Protocol-Version, Connect-Timeout-Ms, Authorization") - if r.Method == "OPTIONS" { - w.WriteHeader(http.StatusOK) - return - } - mux.ServeHTTP(w, r) - }) - } - - server := &http.Server{ - Addr: env.BindAddress(), - Handler: h2c.NewHandler(handler, &http2.Server{}), // h2c is HTTP/2 without TLS for grpc-connect support. - } - - zap.S().Infof("starting web server %v", server.Addr) + // Setup and start HTTP server + server := newServer(configMgr, peerStateManager, orch, opLog, logStore, syncMgr, authenticator) go func() { <-ctx.Done() server.Shutdown(context.Background()) }() + + zap.L().Info("starting web server", zap.String("addr", server.Addr)) if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { zap.L().Error("error starting server", zap.Error(err)) } @@ -203,10 +140,172 @@ func runApp() { wg.Wait() } -func createConfigProvider() config.ConfigStore { +func createConfigStore() config.ConfigStore { return &config.JsonFileStore{Path: env.ConfigFilePath()} } +func newOpLog(cfg *v1.Config) (*oplog.OpLog, *sqlitestore.SqliteStore, error) { + oplogFile := path.Join(env.DataDir(), "oplog.sqlite") + opstore, err := sqlitestore.NewSqliteStore(oplogFile) + if errors.Is(err, sqlitestore.ErrLocked) { + zap.L().Fatal("oplog is locked by another instance of backrest", zap.String("data_dir", env.DataDir())) + } else if err != nil { + zap.L().Warn("operation log may be corrupted, if errors recur delete the file and restart. Your backups stored in your repos are safe.", zap.String("oplog_file", oplogFile)) + return nil, nil, err + } + migratePopulateGuids(opstore, cfg) + log, err := oplog.NewOpLog(opstore) + if err != nil { + opstore.Close() + return nil, nil, err + } + if err := oplog.ApplyMigrations(log); err != nil { + zap.S().Fatalf("error applying oplog migrations: %v", err) + } + return log, opstore, nil +} + +func newLogStore(opLog *oplog.OpLog) (*logstore.LogStore, func(), error) { + logStore, err := logstore.NewLogStore(filepath.Join(env.DataDir(), "tasklogs")) + if err != nil { + return nil, nil, err + } + logstore.MigrateTarLogsInDir(logStore, filepath.Join(env.DataDir(), "rotatinglogs")) + + deleteLogsForOp := func(ops []*v1.Operation, event oplog.OperationEvent) { + if event != oplog.OPERATION_DELETED { + return + } + for _, op := range ops { + if err := logStore.DeleteWithParent(op.Id); err != nil { + zap.L().Warn("error deleting logs for operation", zap.Int64("op_id", op.Id), zap.Error(err)) + } + } + } + opLog.Subscribe(oplog.Query{}, &deleteLogsForOp) + + unsubscribe := func() { + opLog.Unsubscribe(&deleteLogsForOp) + } + + return logStore, unsubscribe, nil +} + +func newAuthenticator(configMgr *config.ConfigManager) *auth.Authenticator { + secretFile := path.Join(env.DataDir(), "jwt-secret") + data, err := os.ReadFile(secretFile) + if err != nil { + zap.L().Info("generating new auth secret") + secret := make([]byte, 64) + if n, err := rand.Read(secret); err != nil || n != 64 { + zap.L().Fatal("error generating secret", zap.Error(err)) + } + if err := os.MkdirAll(env.DataDir(), 0700); err != nil { + zap.L().Fatal("error creating data directory", zap.Error(err)) + } + if err := os.WriteFile(secretFile, secret, 0600); err != nil { + zap.L().Fatal("error writing secret to file", zap.Error(err)) + } + data = secret + } else { + zap.L().Debug("loading auth secret from file") + } + return auth.NewAuthenticator(data, configMgr) +} + +func newServer( + configMgr *config.ConfigManager, + peerStateManager syncapi.PeerStateManager, + orch *orchestrator.Orchestrator, + opLog *oplog.OpLog, + logStore *logstore.LogStore, + syncMgr *syncapi.SyncManager, + authenticator *auth.Authenticator, +) *http.Server { + // API Handlers + apiBackrestHandler := api.NewBackrestHandler(configMgr, peerStateManager, orch, opLog, logStore) + apiAuthenticationHandler := api.NewAuthenticationHandler(authenticator) + syncHandler := syncapi.NewBackrestSyncHandler(syncMgr) + syncStateHandler := syncapi.NewBackrestSyncStateHandler(syncMgr) + downloadHandler := api.NewDownloadHandler(opLog, orch) + + // Routing + rootMux := newRootMux(apiBackrestHandler, apiAuthenticationHandler, syncHandler, syncStateHandler, downloadHandler, authenticator) + + var handler http.Handler = rootMux + if version == "unknown" { // dev build, enable CORS for local development + handler = newCorsMiddleware(rootMux) + } + + return &http.Server{ + Addr: env.BindAddress(), + Handler: h2c.NewHandler(handler, &http2.Server{}), + } +} + +func newRootMux( + apiBackrestHandler v1connect.BackrestHandler, + apiAuthenticationHandler v1connect.AuthenticationHandler, + syncHandler v1connect.BackrestSyncServiceHandler, + syncStateHandler v1connect.BackrestSyncStateServiceHandler, + downloadHandler http.Handler, + authenticator *auth.Authenticator, +) *http.ServeMux { + // Authenticated routes + authedMux := http.NewServeMux() + backrestPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler) + authedMux.Handle(backrestPath, backrestHandler) + syncStatePath, syncStateHandlerUnauthed := v1connect.NewBackrestSyncStateServiceHandler(syncStateHandler) + authedMux.Handle(syncStatePath, syncStateHandlerUnauthed) + authedMux.Handle("/download/", http.StripPrefix("/download", downloadHandler)) + authedMux.Handle("/metrics", metric.GetRegistry().Handler()) + + // Unauthenticated routes + unauthedMux := http.NewServeMux() + authPath, authHandler := v1connect.NewAuthenticationHandler(apiAuthenticationHandler) + unauthedMux.Handle(authPath, authHandler) + syncPath, syncHandlerUnauthed := v1connect.NewBackrestSyncServiceHandler(syncHandler) + unauthedMux.Handle(syncPath, syncHandlerUnauthed) + + // Root mux to dispatch to authenticated or unauthenticated handlers + rootMux := http.NewServeMux() + + // Create a fall through handler which tries the muxes in order to find one to handle the route. + rootMux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check if unauthedMux has a handler for this path + handler, pattern := unauthedMux.Handler(r) + if pattern != "" { + handler.ServeHTTP(w, r) + return + } + + // Check if the mux can provide a handler for the authenticated routes + handler, pattern = authedMux.Handler(r) + if pattern != "" { + auth.RequireAuthentication(handler, authenticator).ServeHTTP(w, r) + return + } + + // Fall back to web UI handler. + webui.Handler().ServeHTTP(w, r) + })) + + return rootMux +} + +func newCorsMiddleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Connect-Protocol-Version, Connect-Timeout-Ms, Authorization, Accept-Encoding") + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + h.ServeHTTP(w, r) + }) +} + func onterm(s os.Signal, callback func()) { sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, s, syscall.SIGTERM) @@ -216,28 +315,6 @@ func onterm(s os.Signal, callback func()) { } } -func getSecret() []byte { - secretFile := path.Join(env.DataDir(), "jwt-secret") - data, err := os.ReadFile(secretFile) - if err == nil { - zap.L().Debug("loading auth secret from file") - return data - } - - zap.L().Info("generating new auth secret") - secret := make([]byte, 64) - if n, err := rand.Read(secret); err != nil || n != 64 { - zap.S().Fatalf("error generating secret: %v", err) - } - if err := os.MkdirAll(env.DataDir(), 0700); err != nil { - zap.S().Fatalf("error creating data directory: %v", err) - } - if err := os.WriteFile(secretFile, secret, 0600); err != nil { - zap.S().Fatalf("error writing secret to file: %v", err) - } - return secret -} - func newForceKillHandler() func() { var times atomic.Int32 return func() { @@ -245,34 +322,34 @@ func newForceKillHandler() func() { buf := make([]byte, 1<<16) runtime.Stack(buf, true) os.Stderr.Write(buf) - zap.S().Fatal("dumped all running coroutine stack traces, forcing termination") + zap.L().Fatal("dumped all running coroutine stack traces, forcing termination") } times.Add(1) - zap.S().Warn("attempting graceful shutdown, to force termination press Ctrl+C again") + zap.L().Warn("attempting graceful shutdown, to force termination press Ctrl+C again") } } -func installLoggers() { +func installLoggers(version, commit string) { // Pretty logging for console c := zap.NewDevelopmentEncoderConfig() c.EncodeLevel = zapcore.CapitalColorLevelEncoder c.EncodeTime = zapcore.ISO8601TimeEncoder - debugLevel := zapcore.InfoLevel + logLevel := zapcore.InfoLevel if version == "unknown" { // dev build - debugLevel = zapcore.DebugLevel + logLevel = zapcore.DebugLevel } pretty := zapcore.NewCore( zapcore.NewConsoleEncoder(c), zapcore.AddSync(colorable.NewColorableStdout()), - debugLevel, + logLevel, ) // JSON logging to log directory logsDir := env.LogsPath() if err := os.MkdirAll(logsDir, 0755); err != nil { zap.ReplaceGlobals(zap.New(pretty)) - zap.S().Errorf("error creating logs directory %q, will only log to console for now: %v", err) + zap.L().Error("error creating logs directory, will only log to console for now", zap.String("path", logsDir), zap.Error(err)) return } @@ -291,7 +368,7 @@ func installLoggers() { ) zap.ReplaceGlobals(zap.New(zapcore.NewTee(pretty, ugly))) - zap.S().Infof("backrest version %v@%v, using log directory: %v", version, commit, logsDir) + zap.L().Info("backrest starting", zap.String("version", version), zap.String("commit", commit), zap.String("log_dir", logsDir)) } func migratePopulateGuids(logstore oplog.OpStore, cfg *v1.Config) { @@ -323,8 +400,8 @@ func migratePopulateGuids(logstore oplog.OpStore, cfg *v1.Config) { migratedOpCount++ return op, nil }); err != nil { - zap.S().Fatalf("error populating repo GUIDs for existing operations: %v", err) + zap.L().Fatal("error populating repo GUIDs for existing operations", zap.Error(err)) } else if migratedOpCount > 0 { - zap.S().Infof("populated repo GUIDs for %d existing operations", migratedOpCount) + zap.L().Info("populated repo GUIDs for existing operations", zap.Int("count", migratedOpCount)) } } diff --git a/internal/kvstore/sqlitedb.go b/internal/kvstore/sqlitedb.go new file mode 100644 index 00000000..a940fc9f --- /dev/null +++ b/internal/kvstore/sqlitedb.go @@ -0,0 +1,51 @@ +package kvstore + +import ( + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/ncruces/go-sqlite3/vfs" + "github.com/ncruces/go-sqlite3/vfs/memdb" +) + +func NewSqliteDbForKvStore(db string) (*sql.DB, error) { + if err := os.MkdirAll(filepath.Dir(db), 0700); err != nil { + return nil, fmt.Errorf("create sqlite db directory: %v", err) + } + if !vfs.SupportsFileLocking { + return nil, fmt.Errorf("file locking not supported") + } + dbpool, err := sql.Open("sqlite3", db) + if err != nil { + return nil, fmt.Errorf("open sqlite pool: %v", err) + } + if vfs.SupportsSharedMemory { + _, err = dbpool.ExecContext(context.Background(), ` + PRAGMA journal_mode = WAL; + PRAGMA synchronous = NORMAL; + `) + if err != nil { + return nil, fmt.Errorf("run multiline query: %v", err) + } + } + if runtime.GOOS == "darwin" { + _, err = dbpool.ExecContext(context.Background(), "PRAGMA checkpoint_fullfsync = 1") + if err != nil { + return nil, fmt.Errorf("run multiline query: %v", err) + } + } + return dbpool, nil +} + +func NewInMemorySqliteDbForKvStore(t testing.TB) (*sql.DB, error) { + dbpool, err := sql.Open("sqlite3", memdb.TestDB(t)) + if err != nil { + return nil, fmt.Errorf("open sqlite pool: %v", err) + } + return dbpool, nil +} diff --git a/webui/src/views/App.tsx b/webui/src/views/App.tsx index 44f2ad25..94ede453 100644 --- a/webui/src/views/App.tsx +++ b/webui/src/views/App.tsx @@ -90,6 +90,52 @@ const RepoViewContainer = () => { ); }; +const RemoteRepoViewContainer = () => { + const { peerInstanceId, repoId } = useParams(); + const [config, setConfig] = useConfig(); + const [peerStates, setPeerStates] = useState([]); + + // subscribe to peer states + useEffect(() => { + if (!config || !config.multihost) return; + const cb = (states: PeerState[]) => { + setPeerStates(states); + }; + subscribeToPeerStates(cb); + return () => { + unsubscribeFromPeerStates(cb); + }; + }, [config]); + + if (!config) { + return ; + } + + // Peer state is used to find the right repo + const peerState = peerStates.find( + (state) => state.peerInstanceId === peerInstanceId + ); + const peerRepo = (peerState?.knownRepos || []).find((r) => r.id === repoId); + + return ( + + {peerRepo ? ( + + ) : ( + + )} + + ); +}; + const PlanViewContainer = () => { const { planId } = useParams(); const [config, setConfig] = useConfig(); @@ -227,6 +273,10 @@ export const App: React.FC = () => { /> } /> } /> + } + /> , + label: ( +
+ {repo.id} +
+ ), + onClick: async () => { + navigate(`/peer/${peerState.peerInstanceId}/repo/${repo.id}`); + }, + }; }); return { @@ -439,6 +505,7 @@ const getSidenavItems = ( {peerState.peerInstanceId} ), + children: repos.length > 0 ? repos : undefined, }; }; diff --git a/webui/src/views/RepoView.tsx b/webui/src/views/RepoView.tsx index 9b76ce20..122aa7e9 100644 --- a/webui/src/views/RepoView.tsx +++ b/webui/src/views/RepoView.tsx @@ -16,10 +16,19 @@ import { useConfig } from "../components/ConfigProvider"; import { formatErrorAlert, useAlertApi } from "../components/Alerts"; import { useShowModal } from "../components/ModalManager"; import { create } from "@bufbuild/protobuf"; +import { SyncRepoMetadata } from "../../gen/ts/v1/syncservice_pb"; const StatsPanel = React.lazy(() => import("../components/StatsPanel")); -export const RepoView = ({ repo }: React.PropsWithChildren<{ repo: Repo }>) => { +// Type intersection to combine properties from Repo and SyncRepoMetadata +interface RepoProps { + id: string; + guid: string; +} + +export const RepoView = ({ + repo, +}: React.PropsWithChildren<{ repo: RepoProps }>) => { const [config, _] = useConfig(); const showModal = useShowModal(); const alertsApi = useAlertApi()!; diff --git a/webui/src/views/SelectorView.tsx b/webui/src/views/SelectorView.tsx new file mode 100644 index 00000000..e69de29b