From 30874c9150f32a0fba5f1ea99bc77bcc978d8b03 Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Mon, 1 Jan 2024 15:07:54 -0800 Subject: [PATCH] fix: possible race condition in scheduled task heap --- README.md | 2 ++ go.mod | 5 ++--- go.sum | 16 ---------------- internal/api/server.go | 3 +++ internal/orchestrator/scheduledtaskheap.go | 9 +++++---- internal/orchestrator/taskstats.go | 3 +++ 6 files changed, 15 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 3fdfe312..544ab8cb 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,8 @@ services: - BACKREST_CONFIG=/config/config.json # path for the backrest config file. - XDG_CACHE_HOME=/cache # path for the restic cache which greatly improves performance. restart: unless-stopped + ports: + - 9898:9898 ``` diff --git a/go.mod b/go.mod index 4cf742ba..80557a28 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 require ( connectrpc.com/connect v1.14.0 github.com/GeertJohan/go.rice v1.0.3 - github.com/NYTimes/gziphandler v1.1.1 github.com/gitploy-io/cronexpr v0.2.2 github.com/hashicorp/go-multierror v1.1.1 github.com/mattn/go-colorable v0.1.13 @@ -22,10 +21,10 @@ require ( require ( github.com/daaku/go.zipexe v1.0.2 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/stretchr/testify v1.8.4 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index 7696a436..159fb1f2 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,6 @@ connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5 github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0= github.com/GeertJohan/go.rice v1.0.3 h1:k5viR+xGtIhF61125vCE1cmJ5957RQGXG6dmbaWZSmI= github.com/GeertJohan/go.rice v1.0.3/go.mod h1:XVdrU4pW00M4ikZed5q56tPf1v2KwnIKeIdc9CBYNt4= -github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= -github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/daaku/go.zipexe v1.0.2 h1:Zg55YLYTr7M9wjKn8SY/WcpuuEi+kR2u4E8RhvpyXmk= github.com/daaku/go.zipexe v1.0.2/go.mod h1:5xWogtqlYnfBXkSB1o9xysukNP9GTvaNkqzUZbt3Bw8= @@ -18,10 +16,6 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY= -github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -61,26 +55,16 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 h1:W12Pwm4urIbRdGhMEg2NM9O3TWKjNcxQhs46V0ypf/k= -google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4/go.mod h1:5RBcpGRxr25RbDzY5w+dmaqpSEvl8Gwl1x2CICf60ic= google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos= google.golang.org/genproto v0.0.0-20231212172506-995d672761c0/go.mod h1:l/k7rMz0vFTBPy+tFSGvXEd3z+BcoG1k7EHbqm+YBsY= -google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 h1:ZcOkrmX74HbKFYnpPY8Qsw93fC29TbJXspYKaBkSXDQ= -google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4/go.mod h1:k2dtGpRrbsSyKcNPKKI5sstZkrNCZwpU/ns96JoHbGg= google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 h1:s1w3X6gQxwrLEpxnLd/qXTVLgQE2yXwaOaoa6IlY/+o= google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0/go.mod h1:CAny0tYF+0/9rmDB9fahA9YLzX3+AEVl1qXbv5hhj6c= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU= google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/api/server.go b/internal/api/server.go index 1a1b3d41..6ec90703 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -110,11 +110,14 @@ func (s *Server) AddRepo(ctx context.Context, req *connect.Request[v1.Repo]) (*c return nil, fmt.Errorf("failed to update config: %w", err) } + zap.L().Debug("Applying config") s.orchestrator.ApplyConfig(c) // index snapshots for the newly added repository. + zap.L().Debug("Scheduling index snapshots task") s.orchestrator.ScheduleTask(orchestrator.NewOneoffIndexSnapshotsTask(s.orchestrator, req.Msg.Id, time.Now()), orchestrator.TaskPriorityInteractive+orchestrator.TaskPriorityIndexSnapshots) + zap.L().Debug("Done add repo") return connect.NewResponse(c), nil } diff --git a/internal/orchestrator/scheduledtaskheap.go b/internal/orchestrator/scheduledtaskheap.go index b8f6bd8d..eaf95f8f 100644 --- a/internal/orchestrator/scheduledtaskheap.go +++ b/internal/orchestrator/scheduledtaskheap.go @@ -73,8 +73,10 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask { defer t.dequeueMu.Unlock() t.mu.Lock() - t.notify = make(chan struct{}, 1) + defer t.mu.Unlock() + t.notify = make(chan struct{}, 10) defer func() { + close(t.notify) t.notify = nil }() @@ -82,12 +84,12 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask { first, ok := t.heap.Peek().(*scheduledTask) if !ok { // no tasks in heap. if t.ready.Len() > 0 { - t.mu.Unlock() return heap.Pop(&t.ready).(*scheduledTask) } t.mu.Unlock() select { case <-ctx.Done(): + t.mu.Lock() return nil case <-t.notify: } @@ -101,7 +103,6 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask { ready, ok := t.ready.Peek().(*scheduledTask) if ok && now.Before(first.runAt) { heap.Pop(&t.ready) - t.mu.Unlock() return ready } @@ -137,7 +138,6 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask { if t.ready.Len() == 0 { break } - t.mu.Unlock() return heap.Pop(&t.ready).(*scheduledTask) case <-t.notify: // new task was added, loop again to ensure we have the earliest task. t.mu.Lock() @@ -145,6 +145,7 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask { <-timer.C } case <-ctx.Done(): + t.mu.Lock() if !timer.Stop() { <-timer.C } diff --git a/internal/orchestrator/taskstats.go b/internal/orchestrator/taskstats.go index d696475a..612b4f7c 100644 --- a/internal/orchestrator/taskstats.go +++ b/internal/orchestrator/taskstats.go @@ -44,6 +44,9 @@ func (t *StatsTask) shouldRun() (bool, error) { if _, ok := op.Op.(*v1.Operation_OperationStats); ok { bytesSinceLastStat = 0 } else if backup, ok := op.Op.(*v1.Operation_OperationBackup); ok && backup.OperationBackup.LastStatus != nil { + if bytesSinceLastStat == -1 { + return nil + } if summary, ok := backup.OperationBackup.LastStatus.Entry.(*v1.BackupProgressEntry_Summary); ok { bytesSinceLastStat += summary.Summary.DataAdded }