mirror of
https://github.com/garethgeorge/backrest.git
synced 2026-05-06 04:50:35 +00:00
partially implement sidenav with repo views
This commit is contained in:
+223
-146
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"flag"
|
||||
@@ -25,6 +24,7 @@ import (
|
||||
"github.com/garethgeorge/backrest/internal/auth"
|
||||
"github.com/garethgeorge/backrest/internal/config"
|
||||
"github.com/garethgeorge/backrest/internal/env"
|
||||
"github.com/garethgeorge/backrest/internal/kvstore"
|
||||
"github.com/garethgeorge/backrest/internal/logstore"
|
||||
"github.com/garethgeorge/backrest/internal/metric"
|
||||
"github.com/garethgeorge/backrest/internal/oplog"
|
||||
@@ -40,161 +40,98 @@ import (
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
)
|
||||
|
||||
var InstallDepsOnly = flag.Bool("install-deps-only", false, "install dependencies and exit")
|
||||
var installDepsOnly = flag.Bool("install-deps-only", false, "install dependencies and exit")
|
||||
|
||||
var (
|
||||
version = "unknown"
|
||||
commit = "unknown"
|
||||
)
|
||||
|
||||
func runApp() {
|
||||
installLoggers()
|
||||
flag.Parse()
|
||||
installLoggers(version, commit)
|
||||
|
||||
// Install dependencies if requested
|
||||
resticPath, err := resticinstaller.FindOrInstallResticBinary()
|
||||
if err != nil {
|
||||
zap.S().Fatalf("error finding or installing restic: %v", err)
|
||||
zap.L().Fatal("error finding or installing restic", zap.Error(err))
|
||||
}
|
||||
|
||||
if *InstallDepsOnly {
|
||||
zap.S().Info("dependencies installed, exiting")
|
||||
if *installDepsOnly {
|
||||
zap.L().Info("dependencies installed, exiting")
|
||||
return
|
||||
}
|
||||
|
||||
// Setup context and signal handling
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go onterm(os.Interrupt, cancel)
|
||||
go onterm(os.Interrupt, newForceKillHandler())
|
||||
|
||||
// Load the configuration
|
||||
configMgr := &config.ConfigManager{Store: createConfigProvider()}
|
||||
cfg, err := configMgr.Get()
|
||||
// Create dependency components
|
||||
configStore := createConfigStore()
|
||||
cfg, err := configStore.Get()
|
||||
if err != nil {
|
||||
zap.S().Fatalf("error loading config: %v", err)
|
||||
zap.L().Fatal("error loading config", zap.Error(err))
|
||||
}
|
||||
configMgr := &config.ConfigManager{Store: configStore}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Create / load the operation log
|
||||
oplogFile := path.Join(env.DataDir(), "oplog.sqlite")
|
||||
opstore, err := sqlitestore.NewSqliteStore(oplogFile)
|
||||
if errors.Is(err, sqlitestore.ErrLocked) {
|
||||
zap.S().Fatalf("oplog is locked by another instance of backrest that is using the same data directory %q, kill that instance before starting another one.", env.DataDir())
|
||||
} else if err != nil {
|
||||
zap.S().Warnf("operation log may be corrupted, if errors recur delete the file %q and restart. Your backups stored in your repos are safe.", oplogFile)
|
||||
zap.S().Fatalf("error creating oplog: %v", err)
|
||||
}
|
||||
defer opstore.Close()
|
||||
|
||||
log, err := oplog.NewOpLog(opstore)
|
||||
opLog, opLogStore, err := newOpLog(cfg)
|
||||
if err != nil {
|
||||
zap.S().Fatalf("error creating oplog: %v", err)
|
||||
}
|
||||
migratePopulateGuids(opstore, cfg)
|
||||
if err := oplog.ApplyMigrations(log); err != nil {
|
||||
zap.S().Fatalf("error applying oplog migrations: %v", err)
|
||||
zap.L().Fatal("error creating oplog", zap.Error(err))
|
||||
}
|
||||
defer opLogStore.Close()
|
||||
|
||||
// Create rotating log storage
|
||||
logStore, err := logstore.NewLogStore(filepath.Join(env.DataDir(), "tasklogs"))
|
||||
logStore, unsubscribeLogStore, err := newLogStore(opLog)
|
||||
if err != nil {
|
||||
zap.S().Fatalf("error creating task log store: %v", err)
|
||||
zap.L().Fatal("error creating log store", zap.Error(err))
|
||||
}
|
||||
logstore.MigrateTarLogsInDir(logStore, filepath.Join(env.DataDir(), "rotatinglogs"))
|
||||
deleteLogsForOp := func(ops []*v1.Operation, event oplog.OperationEvent) {
|
||||
if event != oplog.OPERATION_DELETED {
|
||||
return
|
||||
}
|
||||
for _, op := range ops {
|
||||
if err := logStore.DeleteWithParent(op.Id); err != nil {
|
||||
zap.S().Warnf("error deleting logs for operation %q: %v", op.Id, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Subscribe(oplog.Query{}, &deleteLogsForOp)
|
||||
defer unsubscribeLogStore()
|
||||
defer func() {
|
||||
if err := logStore.Close(); err != nil {
|
||||
zap.S().Warnf("error closing log store: %v", err)
|
||||
zap.L().Warn("error closing log store", zap.Error(err))
|
||||
}
|
||||
log.Unsubscribe(&deleteLogsForOp)
|
||||
}()
|
||||
|
||||
// Create orchestrator and start task loop.
|
||||
orchestrator, err := orchestrator.NewOrchestrator(resticPath, configMgr, log, logStore)
|
||||
orch, err := orchestrator.NewOrchestrator(resticPath, configMgr, opLog, logStore)
|
||||
if err != nil {
|
||||
zap.S().Fatalf("error creating orchestrator: %v", err)
|
||||
zap.L().Fatal("error creating orchestrator", zap.Error(err))
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
kvdbPath := path.Join(env.DataDir(), "kvdb.sqlite")
|
||||
sharedKvdb, err := kvstore.NewSqliteDbForKvStore(kvdbPath)
|
||||
if err != nil {
|
||||
zap.L().Fatal("error creating general kvstore database pool", zap.Error(err))
|
||||
}
|
||||
defer sharedKvdb.Close()
|
||||
|
||||
peerStateManager, err := syncapi.NewSqlitePeerStateManager(sharedKvdb)
|
||||
if err != nil {
|
||||
zap.L().Fatal("error creating peer state manager", zap.Error(err))
|
||||
}
|
||||
syncMgr := syncapi.NewSyncManager(configMgr, opLog, orch, peerStateManager)
|
||||
|
||||
authenticator := newAuthenticator(configMgr)
|
||||
|
||||
// Start background services
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
orchestrator.Run(ctx)
|
||||
wg.Done()
|
||||
defer wg.Done()
|
||||
orch.Run(ctx)
|
||||
}()
|
||||
|
||||
// Create peerstate manager
|
||||
// Note: we don't have to acquire a lock since the sqlitestore already checks this, elsewise we should here.
|
||||
peerStateDbPath := path.Join(env.DataDir(), "general.sqlite")
|
||||
peerStateDbPool, err := sql.Open("sqlite3", peerStateDbPath)
|
||||
if err != nil {
|
||||
zap.S().Fatalf("error creating sqlite pool for peer state: %v", err)
|
||||
}
|
||||
peerStateManager, err := syncapi.NewSqlitePeerStateManager(peerStateDbPool)
|
||||
if err != nil {
|
||||
zap.S().Fatalf("error creating peer state manager: %v", err)
|
||||
}
|
||||
defer peerStateDbPool.Close()
|
||||
|
||||
// Create and serve the HTTP gateway
|
||||
syncMgr := syncapi.NewSyncManager(configMgr, log, orchestrator, peerStateManager)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
syncMgr.RunSync(ctx)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
apiBackrestHandler := api.NewBackrestHandler(
|
||||
configMgr,
|
||||
peerStateManager,
|
||||
orchestrator,
|
||||
log,
|
||||
logStore,
|
||||
)
|
||||
authenticator := auth.NewAuthenticator(getSecret(), configMgr)
|
||||
apiAuthenticationHandler := api.NewAuthenticationHandler(authenticator)
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle(v1connect.NewAuthenticationHandler(apiAuthenticationHandler))
|
||||
backrestHandlerPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler)
|
||||
mux.Handle(backrestHandlerPath, auth.RequireAuthentication(backrestHandler, authenticator))
|
||||
mux.Handle(v1connect.NewBackrestSyncServiceHandler(syncapi.NewBackrestSyncHandler(syncMgr)))
|
||||
syncStateHandlerPath, syncStateHandler := v1connect.NewBackrestSyncStateServiceHandler(syncapi.NewBackrestSyncStateHandler(syncMgr))
|
||||
mux.Handle(syncStateHandlerPath, auth.RequireAuthentication(syncStateHandler, authenticator))
|
||||
mux.Handle("/", webui.Handler())
|
||||
mux.Handle("/download/", http.StripPrefix("/download", api.NewDownloadHandler(log, orchestrator)))
|
||||
mux.Handle("/metrics", auth.RequireAuthentication(metric.GetRegistry().Handler(), authenticator))
|
||||
|
||||
// Serve the HTTP gateway
|
||||
var handler http.Handler = mux
|
||||
if version == "unknown" { // dev build, enable CORS for local development
|
||||
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Connect-Protocol-Version, Connect-Timeout-Ms, Authorization")
|
||||
if r.Method == "OPTIONS" {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
mux.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
server := &http.Server{
|
||||
Addr: env.BindAddress(),
|
||||
Handler: h2c.NewHandler(handler, &http2.Server{}), // h2c is HTTP/2 without TLS for grpc-connect support.
|
||||
}
|
||||
|
||||
zap.S().Infof("starting web server %v", server.Addr)
|
||||
// Setup and start HTTP server
|
||||
server := newServer(configMgr, peerStateManager, orch, opLog, logStore, syncMgr, authenticator)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
server.Shutdown(context.Background())
|
||||
}()
|
||||
|
||||
zap.L().Info("starting web server", zap.String("addr", server.Addr))
|
||||
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
|
||||
zap.L().Error("error starting server", zap.Error(err))
|
||||
}
|
||||
@@ -203,10 +140,172 @@ func runApp() {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func createConfigProvider() config.ConfigStore {
|
||||
func createConfigStore() config.ConfigStore {
|
||||
return &config.JsonFileStore{Path: env.ConfigFilePath()}
|
||||
}
|
||||
|
||||
func newOpLog(cfg *v1.Config) (*oplog.OpLog, *sqlitestore.SqliteStore, error) {
|
||||
oplogFile := path.Join(env.DataDir(), "oplog.sqlite")
|
||||
opstore, err := sqlitestore.NewSqliteStore(oplogFile)
|
||||
if errors.Is(err, sqlitestore.ErrLocked) {
|
||||
zap.L().Fatal("oplog is locked by another instance of backrest", zap.String("data_dir", env.DataDir()))
|
||||
} else if err != nil {
|
||||
zap.L().Warn("operation log may be corrupted, if errors recur delete the file and restart. Your backups stored in your repos are safe.", zap.String("oplog_file", oplogFile))
|
||||
return nil, nil, err
|
||||
}
|
||||
migratePopulateGuids(opstore, cfg)
|
||||
log, err := oplog.NewOpLog(opstore)
|
||||
if err != nil {
|
||||
opstore.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := oplog.ApplyMigrations(log); err != nil {
|
||||
zap.S().Fatalf("error applying oplog migrations: %v", err)
|
||||
}
|
||||
return log, opstore, nil
|
||||
}
|
||||
|
||||
func newLogStore(opLog *oplog.OpLog) (*logstore.LogStore, func(), error) {
|
||||
logStore, err := logstore.NewLogStore(filepath.Join(env.DataDir(), "tasklogs"))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
logstore.MigrateTarLogsInDir(logStore, filepath.Join(env.DataDir(), "rotatinglogs"))
|
||||
|
||||
deleteLogsForOp := func(ops []*v1.Operation, event oplog.OperationEvent) {
|
||||
if event != oplog.OPERATION_DELETED {
|
||||
return
|
||||
}
|
||||
for _, op := range ops {
|
||||
if err := logStore.DeleteWithParent(op.Id); err != nil {
|
||||
zap.L().Warn("error deleting logs for operation", zap.Int64("op_id", op.Id), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
opLog.Subscribe(oplog.Query{}, &deleteLogsForOp)
|
||||
|
||||
unsubscribe := func() {
|
||||
opLog.Unsubscribe(&deleteLogsForOp)
|
||||
}
|
||||
|
||||
return logStore, unsubscribe, nil
|
||||
}
|
||||
|
||||
func newAuthenticator(configMgr *config.ConfigManager) *auth.Authenticator {
|
||||
secretFile := path.Join(env.DataDir(), "jwt-secret")
|
||||
data, err := os.ReadFile(secretFile)
|
||||
if err != nil {
|
||||
zap.L().Info("generating new auth secret")
|
||||
secret := make([]byte, 64)
|
||||
if n, err := rand.Read(secret); err != nil || n != 64 {
|
||||
zap.L().Fatal("error generating secret", zap.Error(err))
|
||||
}
|
||||
if err := os.MkdirAll(env.DataDir(), 0700); err != nil {
|
||||
zap.L().Fatal("error creating data directory", zap.Error(err))
|
||||
}
|
||||
if err := os.WriteFile(secretFile, secret, 0600); err != nil {
|
||||
zap.L().Fatal("error writing secret to file", zap.Error(err))
|
||||
}
|
||||
data = secret
|
||||
} else {
|
||||
zap.L().Debug("loading auth secret from file")
|
||||
}
|
||||
return auth.NewAuthenticator(data, configMgr)
|
||||
}
|
||||
|
||||
func newServer(
|
||||
configMgr *config.ConfigManager,
|
||||
peerStateManager syncapi.PeerStateManager,
|
||||
orch *orchestrator.Orchestrator,
|
||||
opLog *oplog.OpLog,
|
||||
logStore *logstore.LogStore,
|
||||
syncMgr *syncapi.SyncManager,
|
||||
authenticator *auth.Authenticator,
|
||||
) *http.Server {
|
||||
// API Handlers
|
||||
apiBackrestHandler := api.NewBackrestHandler(configMgr, peerStateManager, orch, opLog, logStore)
|
||||
apiAuthenticationHandler := api.NewAuthenticationHandler(authenticator)
|
||||
syncHandler := syncapi.NewBackrestSyncHandler(syncMgr)
|
||||
syncStateHandler := syncapi.NewBackrestSyncStateHandler(syncMgr)
|
||||
downloadHandler := api.NewDownloadHandler(opLog, orch)
|
||||
|
||||
// Routing
|
||||
rootMux := newRootMux(apiBackrestHandler, apiAuthenticationHandler, syncHandler, syncStateHandler, downloadHandler, authenticator)
|
||||
|
||||
var handler http.Handler = rootMux
|
||||
if version == "unknown" { // dev build, enable CORS for local development
|
||||
handler = newCorsMiddleware(rootMux)
|
||||
}
|
||||
|
||||
return &http.Server{
|
||||
Addr: env.BindAddress(),
|
||||
Handler: h2c.NewHandler(handler, &http2.Server{}),
|
||||
}
|
||||
}
|
||||
|
||||
func newRootMux(
|
||||
apiBackrestHandler v1connect.BackrestHandler,
|
||||
apiAuthenticationHandler v1connect.AuthenticationHandler,
|
||||
syncHandler v1connect.BackrestSyncServiceHandler,
|
||||
syncStateHandler v1connect.BackrestSyncStateServiceHandler,
|
||||
downloadHandler http.Handler,
|
||||
authenticator *auth.Authenticator,
|
||||
) *http.ServeMux {
|
||||
// Authenticated routes
|
||||
authedMux := http.NewServeMux()
|
||||
backrestPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler)
|
||||
authedMux.Handle(backrestPath, backrestHandler)
|
||||
syncStatePath, syncStateHandlerUnauthed := v1connect.NewBackrestSyncStateServiceHandler(syncStateHandler)
|
||||
authedMux.Handle(syncStatePath, syncStateHandlerUnauthed)
|
||||
authedMux.Handle("/download/", http.StripPrefix("/download", downloadHandler))
|
||||
authedMux.Handle("/metrics", metric.GetRegistry().Handler())
|
||||
|
||||
// Unauthenticated routes
|
||||
unauthedMux := http.NewServeMux()
|
||||
authPath, authHandler := v1connect.NewAuthenticationHandler(apiAuthenticationHandler)
|
||||
unauthedMux.Handle(authPath, authHandler)
|
||||
syncPath, syncHandlerUnauthed := v1connect.NewBackrestSyncServiceHandler(syncHandler)
|
||||
unauthedMux.Handle(syncPath, syncHandlerUnauthed)
|
||||
|
||||
// Root mux to dispatch to authenticated or unauthenticated handlers
|
||||
rootMux := http.NewServeMux()
|
||||
|
||||
// Create a fall through handler which tries the muxes in order to find one to handle the route.
|
||||
rootMux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Check if unauthedMux has a handler for this path
|
||||
handler, pattern := unauthedMux.Handler(r)
|
||||
if pattern != "" {
|
||||
handler.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the mux can provide a handler for the authenticated routes
|
||||
handler, pattern = authedMux.Handler(r)
|
||||
if pattern != "" {
|
||||
auth.RequireAuthentication(handler, authenticator).ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Fall back to web UI handler.
|
||||
webui.Handler().ServeHTTP(w, r)
|
||||
}))
|
||||
|
||||
return rootMux
|
||||
}
|
||||
|
||||
func newCorsMiddleware(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Connect-Protocol-Version, Connect-Timeout-Ms, Authorization, Accept-Encoding")
|
||||
if r.Method == "OPTIONS" {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func onterm(s os.Signal, callback func()) {
|
||||
sigchan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigchan, s, syscall.SIGTERM)
|
||||
@@ -216,28 +315,6 @@ func onterm(s os.Signal, callback func()) {
|
||||
}
|
||||
}
|
||||
|
||||
func getSecret() []byte {
|
||||
secretFile := path.Join(env.DataDir(), "jwt-secret")
|
||||
data, err := os.ReadFile(secretFile)
|
||||
if err == nil {
|
||||
zap.L().Debug("loading auth secret from file")
|
||||
return data
|
||||
}
|
||||
|
||||
zap.L().Info("generating new auth secret")
|
||||
secret := make([]byte, 64)
|
||||
if n, err := rand.Read(secret); err != nil || n != 64 {
|
||||
zap.S().Fatalf("error generating secret: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(env.DataDir(), 0700); err != nil {
|
||||
zap.S().Fatalf("error creating data directory: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(secretFile, secret, 0600); err != nil {
|
||||
zap.S().Fatalf("error writing secret to file: %v", err)
|
||||
}
|
||||
return secret
|
||||
}
|
||||
|
||||
func newForceKillHandler() func() {
|
||||
var times atomic.Int32
|
||||
return func() {
|
||||
@@ -245,34 +322,34 @@ func newForceKillHandler() func() {
|
||||
buf := make([]byte, 1<<16)
|
||||
runtime.Stack(buf, true)
|
||||
os.Stderr.Write(buf)
|
||||
zap.S().Fatal("dumped all running coroutine stack traces, forcing termination")
|
||||
zap.L().Fatal("dumped all running coroutine stack traces, forcing termination")
|
||||
}
|
||||
times.Add(1)
|
||||
zap.S().Warn("attempting graceful shutdown, to force termination press Ctrl+C again")
|
||||
zap.L().Warn("attempting graceful shutdown, to force termination press Ctrl+C again")
|
||||
}
|
||||
}
|
||||
|
||||
func installLoggers() {
|
||||
func installLoggers(version, commit string) {
|
||||
// Pretty logging for console
|
||||
c := zap.NewDevelopmentEncoderConfig()
|
||||
c.EncodeLevel = zapcore.CapitalColorLevelEncoder
|
||||
c.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||
|
||||
debugLevel := zapcore.InfoLevel
|
||||
logLevel := zapcore.InfoLevel
|
||||
if version == "unknown" { // dev build
|
||||
debugLevel = zapcore.DebugLevel
|
||||
logLevel = zapcore.DebugLevel
|
||||
}
|
||||
pretty := zapcore.NewCore(
|
||||
zapcore.NewConsoleEncoder(c),
|
||||
zapcore.AddSync(colorable.NewColorableStdout()),
|
||||
debugLevel,
|
||||
logLevel,
|
||||
)
|
||||
|
||||
// JSON logging to log directory
|
||||
logsDir := env.LogsPath()
|
||||
if err := os.MkdirAll(logsDir, 0755); err != nil {
|
||||
zap.ReplaceGlobals(zap.New(pretty))
|
||||
zap.S().Errorf("error creating logs directory %q, will only log to console for now: %v", err)
|
||||
zap.L().Error("error creating logs directory, will only log to console for now", zap.String("path", logsDir), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -291,7 +368,7 @@ func installLoggers() {
|
||||
)
|
||||
|
||||
zap.ReplaceGlobals(zap.New(zapcore.NewTee(pretty, ugly)))
|
||||
zap.S().Infof("backrest version %v@%v, using log directory: %v", version, commit, logsDir)
|
||||
zap.L().Info("backrest starting", zap.String("version", version), zap.String("commit", commit), zap.String("log_dir", logsDir))
|
||||
}
|
||||
|
||||
func migratePopulateGuids(logstore oplog.OpStore, cfg *v1.Config) {
|
||||
@@ -323,8 +400,8 @@ func migratePopulateGuids(logstore oplog.OpStore, cfg *v1.Config) {
|
||||
migratedOpCount++
|
||||
return op, nil
|
||||
}); err != nil {
|
||||
zap.S().Fatalf("error populating repo GUIDs for existing operations: %v", err)
|
||||
zap.L().Fatal("error populating repo GUIDs for existing operations", zap.Error(err))
|
||||
} else if migratedOpCount > 0 {
|
||||
zap.S().Infof("populated repo GUIDs for %d existing operations", migratedOpCount)
|
||||
zap.L().Info("populated repo GUIDs for existing operations", zap.Int("count", migratedOpCount))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
package kvstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/ncruces/go-sqlite3/vfs"
|
||||
"github.com/ncruces/go-sqlite3/vfs/memdb"
|
||||
)
|
||||
|
||||
func NewSqliteDbForKvStore(db string) (*sql.DB, error) {
|
||||
if err := os.MkdirAll(filepath.Dir(db), 0700); err != nil {
|
||||
return nil, fmt.Errorf("create sqlite db directory: %v", err)
|
||||
}
|
||||
if !vfs.SupportsFileLocking {
|
||||
return nil, fmt.Errorf("file locking not supported")
|
||||
}
|
||||
dbpool, err := sql.Open("sqlite3", db)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open sqlite pool: %v", err)
|
||||
}
|
||||
if vfs.SupportsSharedMemory {
|
||||
_, err = dbpool.ExecContext(context.Background(), `
|
||||
PRAGMA journal_mode = WAL;
|
||||
PRAGMA synchronous = NORMAL;
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("run multiline query: %v", err)
|
||||
}
|
||||
}
|
||||
if runtime.GOOS == "darwin" {
|
||||
_, err = dbpool.ExecContext(context.Background(), "PRAGMA checkpoint_fullfsync = 1")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("run multiline query: %v", err)
|
||||
}
|
||||
}
|
||||
return dbpool, nil
|
||||
}
|
||||
|
||||
func NewInMemorySqliteDbForKvStore(t testing.TB) (*sql.DB, error) {
|
||||
dbpool, err := sql.Open("sqlite3", memdb.TestDB(t))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open sqlite pool: %v", err)
|
||||
}
|
||||
return dbpool, nil
|
||||
}
|
||||
@@ -90,6 +90,52 @@ const RepoViewContainer = () => {
|
||||
);
|
||||
};
|
||||
|
||||
const RemoteRepoViewContainer = () => {
|
||||
const { peerInstanceId, repoId } = useParams();
|
||||
const [config, setConfig] = useConfig();
|
||||
const [peerStates, setPeerStates] = useState<PeerState[]>([]);
|
||||
|
||||
// subscribe to peer states
|
||||
useEffect(() => {
|
||||
if (!config || !config.multihost) return;
|
||||
const cb = (states: PeerState[]) => {
|
||||
setPeerStates(states);
|
||||
};
|
||||
subscribeToPeerStates(cb);
|
||||
return () => {
|
||||
unsubscribeFromPeerStates(cb);
|
||||
};
|
||||
}, [config]);
|
||||
|
||||
if (!config) {
|
||||
return <Spin />;
|
||||
}
|
||||
|
||||
// Peer state is used to find the right repo
|
||||
const peerState = peerStates.find(
|
||||
(state) => state.peerInstanceId === peerInstanceId
|
||||
);
|
||||
const peerRepo = (peerState?.knownRepos || []).find((r) => r.id === repoId);
|
||||
|
||||
return (
|
||||
<MainContentAreaTemplate
|
||||
breadcrumbs={[
|
||||
{ title: "Peer" },
|
||||
{ title: peerInstanceId || "Unknown Peer" },
|
||||
{ title: "Repo" },
|
||||
{ title: repoId || "Unknown Repo" },
|
||||
]}
|
||||
key={`${peerInstanceId}-${repoId}`}
|
||||
>
|
||||
{peerRepo ? (
|
||||
<RepoView repo={peerRepo} />
|
||||
) : (
|
||||
<Empty description={`Repo ${repoId} not found`} />
|
||||
)}
|
||||
</MainContentAreaTemplate>
|
||||
);
|
||||
};
|
||||
|
||||
const PlanViewContainer = () => {
|
||||
const { planId } = useParams();
|
||||
const [config, setConfig] = useConfig();
|
||||
@@ -227,6 +273,10 @@ export const App: React.FC = () => {
|
||||
/>
|
||||
<Route path="/plan/:planId" element={<PlanViewContainer />} />
|
||||
<Route path="/repo/:repoId" element={<RepoViewContainer />} />
|
||||
<Route
|
||||
path="/peer/:peerInstanceId/repo/:repoId"
|
||||
element={<RemoteRepoViewContainer />}
|
||||
/>
|
||||
<Route
|
||||
path="/*"
|
||||
element={
|
||||
@@ -420,6 +470,22 @@ const getSidenavItems = (
|
||||
originalInstanceKeyid: peerState.peerKeyid,
|
||||
repoGuid: repo.guid,
|
||||
});
|
||||
|
||||
return {
|
||||
key: `repo-${peerState.peerKeyid}-${repo.guid}`,
|
||||
icon: <IconForResource selector={sel} />,
|
||||
label: (
|
||||
<div
|
||||
className="backrest visible-on-hover"
|
||||
style={{ width: "100%", height: "100%" }}
|
||||
>
|
||||
{repo.id}
|
||||
</div>
|
||||
),
|
||||
onClick: async () => {
|
||||
navigate(`/peer/${peerState.peerInstanceId}/repo/${repo.id}`);
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
@@ -439,6 +505,7 @@ const getSidenavItems = (
|
||||
{peerState.peerInstanceId}
|
||||
</div>
|
||||
),
|
||||
children: repos.length > 0 ? repos : undefined,
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
@@ -16,10 +16,19 @@ import { useConfig } from "../components/ConfigProvider";
|
||||
import { formatErrorAlert, useAlertApi } from "../components/Alerts";
|
||||
import { useShowModal } from "../components/ModalManager";
|
||||
import { create } from "@bufbuild/protobuf";
|
||||
import { SyncRepoMetadata } from "../../gen/ts/v1/syncservice_pb";
|
||||
|
||||
const StatsPanel = React.lazy(() => import("../components/StatsPanel"));
|
||||
|
||||
export const RepoView = ({ repo }: React.PropsWithChildren<{ repo: Repo }>) => {
|
||||
// Type intersection to combine properties from Repo and SyncRepoMetadata
|
||||
interface RepoProps {
|
||||
id: string;
|
||||
guid: string;
|
||||
}
|
||||
|
||||
export const RepoView = ({
|
||||
repo,
|
||||
}: React.PropsWithChildren<{ repo: RepoProps }>) => {
|
||||
const [config, _] = useConfig();
|
||||
const showModal = useShowModal();
|
||||
const alertsApi = useAlertApi()!;
|
||||
|
||||
Reference in New Issue
Block a user