feat: support live logrefs for in-progress operations (#456)

This commit is contained in:
Gareth
2024-09-04 22:03:10 -07:00
committed by GitHub
parent b5e6febf45
commit bfaad8b69e
31 changed files with 960 additions and 277 deletions

View File

@@ -29,7 +29,7 @@ func ContextWithWriter(ctx context.Context, logger io.Writer) context.Context {
// Logger returns a logger from the context, or the global logger if none is found.
// this is somewhat expensive, it should be called once per task.
func Logger(ctx context.Context) *zap.Logger {
func Logger(ctx context.Context, prefix string) *zap.Logger {
writer := WriterFromContext(ctx)
if writer == nil {
return zap.L()
@@ -39,7 +39,7 @@ func Logger(ctx context.Context) *zap.Logger {
fe := zapcore.NewConsoleEncoder(p)
l := zap.New(zapcore.NewTee(
zap.L().Core(),
zapcore.NewCore(fe, zapcore.AddSync(&ioutil.LinePrefixer{W: writer, Prefix: []byte("[tasklog] ")}), zapcore.DebugLevel),
zapcore.NewCore(fe, zapcore.AddSync(&ioutil.LinePrefixer{W: writer, Prefix: []byte(prefix)}), zapcore.DebugLevel),
))
return l
}

View File

@@ -1,23 +1,22 @@
package orchestrator
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"slices"
"sync"
"time"
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/ioutil"
"github.com/garethgeorge/backrest/internal/logwriter"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/orchestrator/logging"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
"github.com/garethgeorge/backrest/internal/orchestrator/tasks"
"github.com/garethgeorge/backrest/internal/queue"
"github.com/garethgeorge/backrest/internal/rotatinglog"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
@@ -33,7 +32,7 @@ type Orchestrator struct {
OpLog *oplog.OpLog
repoPool *resticRepoPool
taskQueue *queue.TimePriorityQueue[stContainer]
logStore *rotatinglog.RotatingLog
logStore *logwriter.LogManager
// cancelNotify is a list of channels that are notified when a task should be cancelled.
cancelNotify []chan int64
@@ -59,7 +58,7 @@ func (st stContainer) Less(other stContainer) bool {
return st.ScheduledTask.Less(other.ScheduledTask)
}
func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStore *rotatinglog.RotatingLog) (*Orchestrator, error) {
func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStore *logwriter.LogManager) (*Orchestrator, error) {
cfg = proto.Clone(cfg).(*v1.Config)
// create the orchestrator.
@@ -96,6 +95,14 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStor
}
for _, op := range incompleteOps {
// check for logs to finalize
if op.Logref != "" {
if frozenID, err := logStore.Finalize(op.Logref); err != nil {
zap.L().Warn("failed to finalize livelog ref for incomplete operation", zap.String("logref", op.Logref), zap.Error(err))
} else {
op.Logref = frozenID
}
}
op.Status = v1.OperationStatus_STATUS_ERROR
op.DisplayMessage = "Operation was incomplete when orchestrator was restarted."
op.UnixTimeEndMs = op.UnixTimeStartMs
@@ -122,6 +129,12 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStor
}
}
for _, id := range logStore.LiveLogIDs() {
if _, err := logStore.Finalize(id); err != nil {
zap.L().Warn("failed to finalize unassociated live log", zap.String("id", id), zap.Error(err))
}
}
zap.L().Info("scrubbed operation log for incomplete operations",
zap.Duration("duration", time.Since(startTime)),
zap.Int("incomplete_ops", len(incompleteOps)),
@@ -378,15 +391,19 @@ func (o *Orchestrator) Run(ctx context.Context) {
}
func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) error {
logs := bytes.NewBuffer(nil)
ctx = logging.ContextWithWriter(ctx, &ioutil.SynchronizedWriter{W: logs})
op := st.Op
runner := newTaskRunnerImpl(o, st.Task, st.Op)
zap.L().Info("running task", zap.String("task", st.Task.Name()), zap.String("runAt", st.RunAt.Format(time.RFC3339)))
var liveLogID string
var logWriter io.WriteCloser
op := st.Op
if op != nil {
var err error
liveLogID, logWriter, err = o.logStore.NewLiveWriter(fmt.Sprintf("%x", op.GetId()))
if err != nil {
zap.S().Errorf("failed to create live log writer: %v", err)
}
ctx = logging.ContextWithWriter(ctx, logWriter)
op.Logref = liveLogID // set the logref to the live log.
op.UnixTimeStartMs = time.Now().UnixMilli()
if op.Status == v1.OperationStatus_STATUS_PENDING || op.Status == v1.OperationStatus_STATUS_UNKNOWN {
op.Status = v1.OperationStatus_STATUS_INPROGRESS
@@ -403,6 +420,7 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro
}
start := time.Now()
runner := newTaskRunnerImpl(o, st.Task, st.Op)
err := st.Task.Run(ctx, st, runner)
if err != nil {
runner.Logger(ctx).Error("task failed", zap.Error(err), zap.Duration("duration", time.Since(start)))
@@ -412,14 +430,17 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro
if op != nil {
// write logs to log storage for this task.
if logs.Len() > 0 {
ref, err := o.logStore.Write(logs.Bytes())
if err != nil {
zap.S().Errorf("failed to write logs for task %q to log store: %v", st.Task.Name(), err)
if logWriter != nil {
if err := logWriter.Close(); err != nil {
zap.S().Errorf("failed to close live log writer: %v", err)
}
if finalID, err := o.logStore.Finalize(liveLogID); err != nil {
zap.S().Errorf("failed to finalize live log: %v", err)
} else {
op.Logref = ref
op.Logref = finalID
}
}
if err != nil {
var taskCancelledError *tasks.TaskCancelledError
var taskRetryError *tasks.TaskRetryError

View File

@@ -20,7 +20,7 @@ func forwardResticLogs(ctx context.Context) (context.Context, func()) {
prefixWriter := &ioutil.LinePrefixer{W: limitWriter, Prefix: []byte("[restic] ")}
return restic.ContextWithLogger(ctx, prefixWriter), func() {
if limitWriter.D > 0 {
fmt.Fprintf(writer, "Output truncated, %d bytes dropped\n", limitWriter.D)
fmt.Fprintf(prefixWriter, "... Output truncated, %d bytes dropped\n", limitWriter.D)
}
prefixWriter.Close()
}

View File

@@ -82,7 +82,7 @@ func NewRepoOrchestrator(config *v1.Config, repoConfig *v1.Repo, resticPath stri
}
func (r *RepoOrchestrator) logger(ctx context.Context) *zap.Logger {
return logging.Logger(ctx).With(zap.String("repo", r.repoConfig.Id))
return logging.Logger(ctx, "[repo-manager] ").With(zap.String("repo", r.repoConfig.Id))
}
func (r *RepoOrchestrator) Init(ctx context.Context) error {

View File

@@ -2,12 +2,16 @@ package orchestrator
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
"time"
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/hook"
"github.com/garethgeorge/backrest/internal/logwriter"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/orchestrator/logging"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
@@ -155,5 +159,41 @@ func (t *taskRunnerImpl) Config() *v1.Config {
}
func (t *taskRunnerImpl) Logger(ctx context.Context) *zap.Logger {
return logging.Logger(ctx).Named(t.t.Name())
return logging.Logger(ctx, "[tasklog] ").Named(t.t.Name())
}
func (t *taskRunnerImpl) LogrefWriter() (string, tasks.LogrefWriter, error) {
id := make([]byte, 16)
if _, err := rand.Read(id); err != nil {
return "", nil, fmt.Errorf("read random: %w", err)
}
idStr := hex.EncodeToString(id)
liveID, writer, err := t.orchestrator.logStore.NewLiveWriter(idStr)
if err != nil {
return "", nil, fmt.Errorf("new log writer: %w", err)
}
return liveID, &logrefWriter{
logmgr: t.orchestrator.logStore,
id: liveID,
writer: writer,
}, nil
}
type logrefWriter struct {
logmgr *logwriter.LogManager
id string
writer io.WriteCloser
}
var _ tasks.LogrefWriter = &logrefWriter{}
func (l *logrefWriter) Write(p []byte) (n int, err error) {
return l.writer.Write(p)
}
func (l *logrefWriter) Close() (string, error) {
if err := l.writer.Close(); err != nil {
return "", err
}
return l.logmgr.Finalize(l.id)
}

View File

@@ -52,6 +52,13 @@ type TaskRunner interface {
Config() *v1.Config
// Logger returns the logger.
Logger(ctx context.Context) *zap.Logger
// LogrefWriter returns a writer that can be used to track streaming operation output.
LogrefWriter() (liveID string, w LogrefWriter, err error)
}
type LogrefWriter interface {
Write(data []byte) (int, error)
Close() (frozenID string, err error)
}
type TaskExecutor interface {
@@ -211,3 +218,7 @@ func (t *testTaskRunner) Config() *v1.Config {
func (t *testTaskRunner) Logger(ctx context.Context) *zap.Logger {
return zap.L()
}
func (t *testTaskRunner) LogrefWriter() (liveID string, w LogrefWriter, err error) {
panic("not implemented")
}

View File

@@ -1,18 +1,14 @@
package tasks
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/ioutil"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/protoutil"
"go.uber.org/zap"
)
type CheckTask struct {
@@ -117,38 +113,17 @@ func (t *CheckTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner
}
op.Op = opCheck
checkCtx, cancelCheckCtx := context.WithCancel(ctx)
interval := time.NewTicker(1 * time.Second)
defer interval.Stop()
buf := bytes.NewBuffer(nil)
bufWriter := &ioutil.SynchronizedWriter{W: &ioutil.LimitWriter{W: buf, N: 16 * 1024}}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-interval.C:
bufWriter.Mu.Lock()
output := buf.String()
bufWriter.Mu.Unlock()
liveID, writer, err := runner.LogrefWriter()
if err != nil {
return fmt.Errorf("create logref writer: %w", err)
}
opCheck.OperationCheck.OutputLogref = liveID
if opCheck.OperationCheck.Output != string(output) {
opCheck.OperationCheck.Output = string(output)
if err := runner.UpdateOperation(op); err != nil {
return fmt.Errorf("update operation: %w", err)
}
if err := runner.OpLog().Update(op); err != nil {
zap.L().Error("update check operation with status output", zap.Error(err))
}
}
case <-checkCtx.Done():
return
}
}
}()
err = repo.Check(checkCtx, bufWriter)
cancelCheckCtx()
wg.Wait()
err = repo.Check(ctx, writer)
if err != nil {
runner.ExecuteHooks(ctx, []v1.Hook_Condition{
v1.Hook_CONDITION_CHECK_ERROR,
@@ -160,7 +135,11 @@ func (t *CheckTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner
return fmt.Errorf("check: %w", err)
}
opCheck.OperationCheck.Output = string(buf.Bytes())
frozenID, err := writer.Close()
if err != nil {
return fmt.Errorf("close logref writer: %w", err)
}
opCheck.OperationCheck.OutputLogref = frozenID
if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{
v1.Hook_CONDITION_CHECK_SUCCESS,

View File

@@ -1,15 +1,12 @@
package tasks
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/ioutil"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/protoutil"
"go.uber.org/zap"
@@ -116,40 +113,20 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner
}
op.Op = opPrune
pruneCtx, cancelPruneCtx := context.WithCancel(ctx)
interval := time.NewTicker(1 * time.Second)
defer interval.Stop()
buf := bytes.NewBuffer(nil)
bufWriter := &ioutil.SynchronizedWriter{W: &ioutil.LimitWriter{W: buf, N: 16 * 1024}}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-interval.C:
bufWriter.Mu.Lock()
output := buf.String()
bufWriter.Mu.Unlock()
liveID, writer, err := runner.LogrefWriter()
if err != nil {
return fmt.Errorf("create logref writer: %w", err)
}
opPrune.OperationPrune.OutputLogref = liveID
if opPrune.OperationPrune.Output != string(output) {
opPrune.OperationPrune.Output = string(output)
if err := runner.UpdateOperation(op); err != nil {
return fmt.Errorf("update operation: %w", err)
}
if err := runner.OpLog().Update(op); err != nil {
zap.L().Error("update prune operation with status output", zap.Error(err))
}
}
case <-pruneCtx.Done():
return
}
}
}()
err = repo.Prune(pruneCtx, bufWriter)
cancelPruneCtx()
wg.Wait()
err = repo.Prune(ctx, writer)
if err != nil {
runner.ExecuteHooks(ctx, []v1.Hook_Condition{
v1.Hook_CONDITION_PRUNE_ERROR,
v1.Hook_CONDITION_ANY_ERROR,
}, HookVars{
Error: err.Error(),
@@ -158,7 +135,11 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner
return fmt.Errorf("prune: %w", err)
}
opPrune.OperationPrune.Output = string(buf.Bytes())
frozenID, err := writer.Close()
if err != nil {
return fmt.Errorf("close logref writer: %w", err)
}
opPrune.OperationPrune.OutputLogref = frozenID
// Run a stats task after a successful prune
if err := runner.ScheduleTask(NewStatsTask(t.RepoID(), PlanForSystemTasks, false), TaskPriorityStats); err != nil {