mirror of
https://github.com/garethgeorge/backrest.git
synced 2025-12-15 18:15:37 +00:00
fix: concurrency issues in run command handler
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
@@ -419,14 +420,16 @@ func (s *BackrestHandler) RunCommand(ctx context.Context, req *connect.Request[v
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
outputs := make(chan []byte, 100)
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
start := time.Now()
|
||||
if err := repo.RunCommand(ctx, req.Msg.Command, func(output []byte) {
|
||||
outputs <- output
|
||||
outputs <- bytes.Clone(output)
|
||||
}); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
outputs <- []byte("took " + time.Since(start).String())
|
||||
cancel()
|
||||
}()
|
||||
|
||||
|
||||
@@ -108,7 +108,7 @@ func validatePlan(plan *v1.Plan, repos map[string]*v1.Repo) error {
|
||||
|
||||
if plan.Retention != nil && plan.Retention.Policy == nil {
|
||||
err = multierror.Append(err, errors.New("retention policy must be nil or must specify a policy"))
|
||||
} else if policyTimeBucketed, ok := plan.Retention.Policy.(*v1.RetentionPolicy_PolicyTimeBucketed); ok {
|
||||
} else if policyTimeBucketed, ok := plan.Retention.GetPolicy().(*v1.RetentionPolicy_PolicyTimeBucketed); ok {
|
||||
if proto.Equal(policyTimeBucketed.PolicyTimeBucketed, &v1.RetentionPolicy_TimeBucketedCounts{}) {
|
||||
err = multierror.Append(err, errors.New("time bucketed policy must specify a non-empty bucket"))
|
||||
}
|
||||
|
||||
@@ -88,14 +88,16 @@ func NewOpLog(databasePath string) (*OpLog, error) {
|
||||
// Scan checks the log for incomplete operations. Should only be called at startup.
|
||||
func (o *OpLog) Scan(onIncomplete func(op *v1.Operation)) error {
|
||||
zap.L().Debug("scanning oplog for incomplete operations")
|
||||
t := time.Now()
|
||||
err := o.db.Update(func(tx *bolt.Tx) error {
|
||||
sysBucket := tx.Bucket(SystemBucket)
|
||||
opLogBucket := tx.Bucket(OpLogBucket)
|
||||
c := opLogBucket.Cursor()
|
||||
var k, v []byte
|
||||
if lastValidated := sysBucket.Get([]byte("last_validated")); lastValidated != nil {
|
||||
c.Seek(lastValidated)
|
||||
k, v = c.Seek(lastValidated)
|
||||
}
|
||||
for k, v := c.Prev(); k != nil; k, v = c.Next() {
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
op := &v1.Operation{}
|
||||
if err := proto.Unmarshal(v, op); err != nil {
|
||||
zap.L().Error("error unmarshalling operation, there may be corruption in the oplog", zap.Error(err))
|
||||
@@ -124,7 +126,7 @@ func (o *OpLog) Scan(onIncomplete func(op *v1.Operation)) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("scanning log: %v", err)
|
||||
}
|
||||
zap.L().Debug("scan complete")
|
||||
zap.L().Debug("scan complete", zap.Duration("duration", time.Since(t)))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -379,7 +379,7 @@ func chunkBy[T any](items []T, chunkSize int) (chunks [][]T) {
|
||||
}
|
||||
|
||||
type callbackWriter struct {
|
||||
callback func([]byte)
|
||||
callback func([]byte) // note: callback must not retain the byte slice
|
||||
}
|
||||
|
||||
func (w *callbackWriter) Write(p []byte) (n int, err error) {
|
||||
|
||||
@@ -388,7 +388,7 @@ export const colorForStatus = (status: OperationStatus) => {
|
||||
case OperationStatus.STATUS_SUCCESS:
|
||||
return "green";
|
||||
case OperationStatus.STATUS_USER_CANCELLED:
|
||||
return "orange";
|
||||
return "yellow";
|
||||
default:
|
||||
return "grey";
|
||||
}
|
||||
@@ -432,7 +432,7 @@ export const detailsForOperation = (
|
||||
break;
|
||||
case OperationStatus.STATUS_USER_CANCELLED:
|
||||
state = "cancelled";
|
||||
color = "orange";
|
||||
color = "yellow";
|
||||
break;
|
||||
default:
|
||||
state = "";
|
||||
|
||||
Reference in New Issue
Block a user