mirror of
https://github.com/garethgeorge/backrest.git
synced 2026-05-04 03:50:30 +00:00
fix: possible race condition in scheduled task heap
This commit is contained in:
@@ -79,6 +79,8 @@ services:
|
||||
- BACKREST_CONFIG=/config/config.json # path for the backrest config file.
|
||||
- XDG_CACHE_HOME=/cache # path for the restic cache which greatly improves performance.
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- 9898:9898
|
||||
```
|
||||
</details>
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ go 1.21
|
||||
require (
|
||||
connectrpc.com/connect v1.14.0
|
||||
github.com/GeertJohan/go.rice v1.0.3
|
||||
github.com/NYTimes/gziphandler v1.1.1
|
||||
github.com/gitploy-io/cronexpr v0.2.2
|
||||
github.com/hashicorp/go-multierror v1.1.1
|
||||
github.com/mattn/go-colorable v0.1.13
|
||||
@@ -22,10 +21,10 @@ require (
|
||||
require (
|
||||
github.com/daaku/go.zipexe v1.0.2 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/stretchr/testify v1.8.4 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/sys v0.15.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
|
||||
@@ -3,8 +3,6 @@ connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5
|
||||
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
|
||||
github.com/GeertJohan/go.rice v1.0.3 h1:k5viR+xGtIhF61125vCE1cmJ5957RQGXG6dmbaWZSmI=
|
||||
github.com/GeertJohan/go.rice v1.0.3/go.mod h1:XVdrU4pW00M4ikZed5q56tPf1v2KwnIKeIdc9CBYNt4=
|
||||
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
|
||||
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
|
||||
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
|
||||
github.com/daaku/go.zipexe v1.0.2 h1:Zg55YLYTr7M9wjKn8SY/WcpuuEi+kR2u4E8RhvpyXmk=
|
||||
github.com/daaku/go.zipexe v1.0.2/go.mod h1:5xWogtqlYnfBXkSB1o9xysukNP9GTvaNkqzUZbt3Bw8=
|
||||
@@ -18,10 +16,6 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
||||
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
@@ -61,26 +55,16 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 h1:W12Pwm4urIbRdGhMEg2NM9O3TWKjNcxQhs46V0ypf/k=
|
||||
google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4/go.mod h1:5RBcpGRxr25RbDzY5w+dmaqpSEvl8Gwl1x2CICf60ic=
|
||||
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos=
|
||||
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0/go.mod h1:l/k7rMz0vFTBPy+tFSGvXEd3z+BcoG1k7EHbqm+YBsY=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 h1:ZcOkrmX74HbKFYnpPY8Qsw93fC29TbJXspYKaBkSXDQ=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4/go.mod h1:k2dtGpRrbsSyKcNPKKI5sstZkrNCZwpU/ns96JoHbGg=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 h1:s1w3X6gQxwrLEpxnLd/qXTVLgQE2yXwaOaoa6IlY/+o=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0/go.mod h1:CAny0tYF+0/9rmDB9fahA9YLzX3+AEVl1qXbv5hhj6c=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA=
|
||||
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
|
||||
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
|
||||
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
|
||||
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
|
||||
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
|
||||
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
|
||||
@@ -110,11 +110,14 @@ func (s *Server) AddRepo(ctx context.Context, req *connect.Request[v1.Repo]) (*c
|
||||
return nil, fmt.Errorf("failed to update config: %w", err)
|
||||
}
|
||||
|
||||
zap.L().Debug("Applying config")
|
||||
s.orchestrator.ApplyConfig(c)
|
||||
|
||||
// index snapshots for the newly added repository.
|
||||
zap.L().Debug("Scheduling index snapshots task")
|
||||
s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, req.Msg.Id, time.Now()), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots)
|
||||
|
||||
zap.L().Debug("Done add repo")
|
||||
return connect.NewResponse(c), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -73,8 +73,10 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
|
||||
defer t.dequeueMu.Unlock()
|
||||
|
||||
t.mu.Lock()
|
||||
t.notify = make(chan struct{}, 1)
|
||||
defer t.mu.Unlock()
|
||||
t.notify = make(chan struct{}, 10)
|
||||
defer func() {
|
||||
close(t.notify)
|
||||
t.notify = nil
|
||||
}()
|
||||
|
||||
@@ -82,12 +84,12 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
|
||||
first, ok := t.heap.Peek().(*scheduledTask)
|
||||
if !ok { // no tasks in heap.
|
||||
if t.ready.Len() > 0 {
|
||||
t.mu.Unlock()
|
||||
return heap.Pop(&t.ready).(*scheduledTask)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.mu.Lock()
|
||||
return nil
|
||||
case <-t.notify:
|
||||
}
|
||||
@@ -101,7 +103,6 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
|
||||
ready, ok := t.ready.Peek().(*scheduledTask)
|
||||
if ok && now.Before(first.runAt) {
|
||||
heap.Pop(&t.ready)
|
||||
t.mu.Unlock()
|
||||
return ready
|
||||
}
|
||||
|
||||
@@ -137,7 +138,6 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
|
||||
if t.ready.Len() == 0 {
|
||||
break
|
||||
}
|
||||
t.mu.Unlock()
|
||||
return heap.Pop(&t.ready).(*scheduledTask)
|
||||
case <-t.notify: // new task was added, loop again to ensure we have the earliest task.
|
||||
t.mu.Lock()
|
||||
@@ -145,6 +145,7 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
|
||||
<-timer.C
|
||||
}
|
||||
case <-ctx.Done():
|
||||
t.mu.Lock()
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
|
||||
@@ -44,6 +44,9 @@ func (t *StatsTask) shouldRun() (bool, error) {
|
||||
if _, ok := op.Op.(*v1.Operation_OperationStats); ok {
|
||||
bytesSinceLastStat = 0
|
||||
} else if backup, ok := op.Op.(*v1.Operation_OperationBackup); ok && backup.OperationBackup.LastStatus != nil {
|
||||
if bytesSinceLastStat == -1 {
|
||||
return nil
|
||||
}
|
||||
if summary, ok := backup.OperationBackup.LastStatus.Entry.(*v1.BackupProgressEntry_Summary); ok {
|
||||
bytesSinceLastStat += summary.Summary.DataAdded
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user