From bfaad8b69e95e13006d3f64e6daa956dc060833c Mon Sep 17 00:00:00 2001 From: Gareth Date: Wed, 4 Sep 2024 22:03:10 -0700 Subject: [PATCH] feat: support live logrefs for in-progress operations (#456) --- .github/workflows/test.yml | 10 +- cmd/backrest/backrest.go | 4 +- gen/go/v1/operations.pb.go | 127 ++++++----- gen/go/v1/service.pb.go | 44 ++-- gen/go/v1/service_grpc.pb.go | 83 +++++--- gen/go/v1/v1connect/service.connect.go | 14 +- internal/api/backresthandler.go | 70 +++++- internal/api/backresthandler_test.go | 9 +- internal/logwriter/errors.go | 7 + internal/logwriter/livelog.go | 199 ++++++++++++++++++ internal/logwriter/livelog_test.go | 108 ++++++++++ internal/logwriter/manager.go | 85 ++++++++ internal/logwriter/manager_test.go | 44 ++++ .../{rotatinglog => logwriter}/rotatinglog.go | 7 +- .../rotatinglog_test.go | 2 +- internal/orchestrator/logging/logging.go | 4 +- internal/orchestrator/orchestrator.go | 55 +++-- internal/orchestrator/repo/logging.go | 2 +- internal/orchestrator/repo/repo.go | 2 +- internal/orchestrator/taskrunnerimpl.go | 42 +++- internal/orchestrator/tasks/task.go | 11 + internal/orchestrator/tasks/taskcheck.go | 49 ++--- internal/orchestrator/tasks/taskprune.go | 49 ++--- proto/v1/operations.proto | 6 +- proto/v1/service.proto | 2 +- webui/gen/ts/v1/operations_pb.ts | 22 +- webui/gen/ts/v1/service_connect.ts | 2 +- webui/package-lock.json | 64 +++++- webui/package.json | 6 +- webui/src/components/LogView.tsx | 66 ++++++ webui/src/components/OperationRow.tsx | 42 ++-- 31 files changed, 960 insertions(+), 277 deletions(-) create mode 100644 internal/logwriter/errors.go create mode 100644 internal/logwriter/livelog.go create mode 100644 internal/logwriter/livelog_test.go create mode 100644 internal/logwriter/manager.go create mode 100644 internal/logwriter/manager_test.go rename internal/{rotatinglog => logwriter}/rotatinglog.go (96%) rename internal/{rotatinglog => logwriter}/rotatinglog_test.go (99%) create mode 100644 webui/src/components/LogView.tsx diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ccb08f3..5c8b18d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -48,13 +48,9 @@ jobs: with: go-version: "1.21" - - name: Setup NodeJS - uses: actions/setup-node@v4 - with: - node-version: "20" - - - name: Generate - run: go generate ./... + - name: Create Fake WebUI Sources + run: | + New-Item -Path .\webui\dist-windows\index.html -ItemType File -Force - name: Build run: go build ./... 
diff --git a/cmd/backrest/backrest.go b/cmd/backrest/backrest.go index 82457f5..14f681a 100644 --- a/cmd/backrest/backrest.go +++ b/cmd/backrest/backrest.go @@ -20,11 +20,11 @@ import ( "github.com/garethgeorge/backrest/internal/auth" "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/env" + "github.com/garethgeorge/backrest/internal/logwriter" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/bboltstore" "github.com/garethgeorge/backrest/internal/orchestrator" "github.com/garethgeorge/backrest/internal/resticinstaller" - "github.com/garethgeorge/backrest/internal/rotatinglog" "github.com/garethgeorge/backrest/webui" "github.com/mattn/go-colorable" "go.etcd.io/bbolt" @@ -82,7 +82,7 @@ func main() { oplog := oplog.NewOpLog(opstore) // Create rotating log storage - logStore := rotatinglog.NewRotatingLog(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs + logStore, err := logwriter.NewLogManager(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs if err != nil { zap.S().Fatalf("error creating rotating log storage: %v", err) } diff --git a/gen/go/v1/operations.pb.go b/gen/go/v1/operations.pb.go index 1941e2f..b363028 100644 --- a/gen/go/v1/operations.pb.go +++ b/gen/go/v1/operations.pb.go @@ -728,7 +728,9 @@ type OperationPrune struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Output string `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` // output of the prune. + // Deprecated: Marked as deprecated in v1/operations.proto. + Output string `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` // output of the prune. + OutputLogref string `protobuf:"bytes,2,opt,name=output_logref,json=outputLogref,proto3" json:"output_logref,omitempty"` // logref of the prune output. } func (x *OperationPrune) Reset() { @@ -763,6 +765,7 @@ func (*OperationPrune) Descriptor() ([]byte, []int) { return file_v1_operations_proto_rawDescGZIP(), []int{6} } +// Deprecated: Marked as deprecated in v1/operations.proto. func (x *OperationPrune) GetOutput() string { if x != nil { return x.Output @@ -770,13 +773,22 @@ func (x *OperationPrune) GetOutput() string { return "" } +func (x *OperationPrune) GetOutputLogref() string { + if x != nil { + return x.OutputLogref + } + return "" +} + // OperationCheck tracks a check operation. type OperationCheck struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Output string `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` // output of the check operation. + // Deprecated: Marked as deprecated in v1/operations.proto. + Output string `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` // output of the check operation. + OutputLogref string `protobuf:"bytes,2,opt,name=output_logref,json=outputLogref,proto3" json:"output_logref,omitempty"` // logref of the check output. } func (x *OperationCheck) Reset() { @@ -811,6 +823,7 @@ func (*OperationCheck) Descriptor() ([]byte, []int) { return file_v1_operations_proto_rawDescGZIP(), []int{7} } +// Deprecated: Marked as deprecated in v1/operations.proto. func (x *OperationCheck) GetOutput() string { if x != nil { return x.Output @@ -818,6 +831,13 @@ func (x *OperationCheck) GetOutput() string { return "" } +func (x *OperationCheck) GetOutputLogref() string { + if x != nil { + return x.OutputLogref + } + return "" +} + // OperationRestore tracks a restore operation. 
type OperationRestore struct { state protoimpl.MessageState @@ -1109,55 +1129,60 @@ var file_v1_operations_proto_rawDesc = []byte{ 0x73, 0x68, 0x6f, 0x74, 0x52, 0x06, 0x66, 0x6f, 0x72, 0x67, 0x65, 0x74, 0x12, 0x2b, 0x0a, 0x06, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6f, 0x6c, 0x69, 0x63, - 0x79, 0x52, 0x06, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x22, 0x28, 0x0a, 0x0e, 0x4f, 0x70, 0x65, - 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x75, 0x6e, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, - 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x75, 0x74, - 0x70, 0x75, 0x74, 0x22, 0x28, 0x0a, 0x0e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x79, 0x0a, - 0x10, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, - 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x39, 0x0a, - 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x50, - 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x6c, 0x61, - 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x35, 0x0a, 0x0e, 0x4f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x23, 0x0a, 0x05, 0x73, 0x74, - 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x76, 0x31, 0x2e, 0x52, - 0x65, 0x70, 0x6f, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x22, - 0x9a, 0x01, 0x0a, 0x10, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e, - 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x6f, - 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x4f, - 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x5f, - 0x6c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6f, 0x75, - 0x74, 0x70, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x12, 0x30, 0x0a, 0x09, 0x63, 0x6f, - 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, - 0x76, 0x31, 0x2e, 0x48, 0x6f, 0x6f, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2a, 0x60, 0x0a, 0x12, - 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, - 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x43, - 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, - 0x54, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x45, 
- 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x03, 0x2a, 0xc2, - 0x01, 0x0a, 0x0f, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x4b, - 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, - 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x15, 0x0a, 0x11, 0x53, 0x54, - 0x41, 0x54, 0x55, 0x53, 0x5f, 0x49, 0x4e, 0x50, 0x52, 0x4f, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, - 0x02, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x55, 0x43, 0x43, - 0x45, 0x53, 0x53, 0x10, 0x03, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, - 0x57, 0x41, 0x52, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x54, 0x41, - 0x54, 0x55, 0x53, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x04, 0x12, 0x1b, 0x0a, 0x17, 0x53, - 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x59, 0x53, 0x54, 0x45, 0x4d, 0x5f, 0x43, 0x41, 0x4e, - 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x05, 0x12, 0x19, 0x0a, 0x15, 0x53, 0x54, 0x41, 0x54, - 0x55, 0x53, 0x5f, 0x55, 0x53, 0x45, 0x52, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x45, - 0x44, 0x10, 0x06, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x67, 0x61, 0x72, 0x65, 0x74, 0x68, 0x67, 0x65, 0x6f, 0x72, 0x67, 0x65, 0x2f, 0x62, - 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x76, - 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x79, 0x52, 0x06, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x22, 0x51, 0x0a, 0x0e, 0x4f, 0x70, 0x65, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x75, 0x6e, 0x65, 0x12, 0x1a, 0x0a, 0x06, 0x6f, + 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, 0x18, 0x01, 0x52, + 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x6f, 0x75, 0x74, 0x70, 0x75, + 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, + 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x22, 0x51, 0x0a, 0x0e, + 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x1a, + 0x0a, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, + 0x18, 0x01, 0x52, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x6f, 0x75, + 0x74, 0x70, 0x75, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0c, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x22, + 0x79, 0x0a, 0x10, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, + 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, + 0x39, 0x0a, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, + 0x65, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, + 0x6c, 0x61, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x35, 0x0a, 0x0e, 0x4f, 0x70, + 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 
0x73, 0x12, 0x23, 0x0a, 0x05, + 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x76, 0x31, + 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, + 0x73, 0x22, 0x9a, 0x01, 0x0a, 0x10, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, + 0x75, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, + 0x5f, 0x6f, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x70, 0x61, 0x72, 0x65, 0x6e, + 0x74, 0x4f, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x6f, 0x75, 0x74, 0x70, 0x75, + 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, + 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x4c, 0x6f, 0x67, 0x72, 0x65, 0x66, 0x12, 0x30, 0x0a, 0x09, + 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x12, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x6f, 0x6f, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2a, 0x60, + 0x0a, 0x12, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x4e, + 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, + 0x5f, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x56, + 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x11, 0x0a, + 0x0d, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x03, + 0x2a, 0xc2, 0x01, 0x0a, 0x0f, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, + 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, + 0x55, 0x53, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x15, 0x0a, 0x11, + 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x49, 0x4e, 0x50, 0x52, 0x4f, 0x47, 0x52, 0x45, 0x53, + 0x53, 0x10, 0x02, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x55, + 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x03, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, 0x54, 0x55, + 0x53, 0x5f, 0x57, 0x41, 0x52, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x12, 0x10, 0x0a, 0x0c, 0x53, + 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x04, 0x12, 0x1b, 0x0a, + 0x17, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x59, 0x53, 0x54, 0x45, 0x4d, 0x5f, 0x43, + 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x05, 0x12, 0x19, 0x0a, 0x15, 0x53, 0x54, + 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x53, 0x45, 0x52, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, + 0x4c, 0x45, 0x44, 0x10, 0x06, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, 0x72, 0x65, 0x74, 0x68, 0x67, 0x65, 0x6f, 0x72, 0x67, 0x65, + 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, + 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/gen/go/v1/service.pb.go b/gen/go/v1/service.pb.go index 8a02217..f2cb256 100644 --- a/gen/go/v1/service.pb.go +++ b/gen/go/v1/service.pb.go @@ -961,7 +961,7 @@ var file_v1_service_proto_rawDesc = 
[]byte{ 0x74, 0x12, 0x17, 0x0a, 0x07, 0x72, 0x65, 0x70, 0x6f, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x70, 0x6f, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, - 0x6d, 0x61, 0x6e, 0x64, 0x32, 0xf7, 0x07, 0x0a, 0x08, 0x42, 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, + 0x6d, 0x61, 0x6e, 0x64, 0x32, 0xf9, 0x07, 0x0a, 0x08, 0x42, 0x61, 0x63, 0x6b, 0x72, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0a, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x66, @@ -1006,29 +1006,29 @@ var file_v1_service_proto_rawDesc = []byte{ 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x12, 0x11, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x49, 0x6e, 0x74, 0x36, 0x34, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x22, 0x00, 0x12, 0x32, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x12, + 0x79, 0x22, 0x00, 0x12, 0x34, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x12, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x6f, 0x67, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x42, 0x79, 0x74, 0x65, 0x73, - 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x0a, 0x52, 0x75, 0x6e, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x15, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x74, - 0x79, 0x70, 0x65, 0x73, 0x2e, 0x42, 0x79, 0x74, 0x65, 0x73, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, - 0x00, 0x30, 0x01, 0x12, 0x39, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, - 0x61, 0x64, 0x55, 0x52, 0x4c, 0x12, 0x11, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x49, 0x6e, - 0x74, 0x36, 0x34, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x12, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, - 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x00, 0x12, 0x41, - 0x0a, 0x0c, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x17, - 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, - 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x50, 0x61, 0x74, 0x68, 0x41, 0x75, 0x74, 0x6f, 0x63, 0x6f, 0x6d, - 0x70, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x12, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x53, 0x74, - 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x11, 0x2e, 0x74, 0x79, 0x70, 0x65, - 0x73, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x00, 0x42, 0x2c, - 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, 0x72, - 0x65, 0x74, 0x68, 0x67, 0x65, 0x6f, 0x72, 0x67, 0x65, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x72, 0x65, - 0x73, 0x74, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3a, 0x0a, 0x0a, 0x52, 0x75, 0x6e, + 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 
0x12, 0x15, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, + 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, + 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x42, 0x79, 0x74, 0x65, 0x73, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x39, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x44, 0x6f, 0x77, 0x6e, + 0x6c, 0x6f, 0x61, 0x64, 0x55, 0x52, 0x4c, 0x12, 0x11, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, + 0x49, 0x6e, 0x74, 0x36, 0x34, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x12, 0x2e, 0x74, 0x79, 0x70, + 0x65, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x00, + 0x12, 0x41, 0x0a, 0x0c, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, + 0x12, 0x17, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x65, 0x61, 0x72, 0x48, 0x69, 0x73, 0x74, 0x6f, + 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x10, 0x50, 0x61, 0x74, 0x68, 0x41, 0x75, 0x74, 0x6f, 0x63, + 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x12, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, + 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x11, 0x2e, 0x74, 0x79, + 0x70, 0x65, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x00, + 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, + 0x61, 0x72, 0x65, 0x74, 0x68, 0x67, 0x65, 0x6f, 0x72, 0x67, 0x65, 0x2f, 0x62, 0x61, 0x63, 0x6b, + 0x72, 0x65, 0x73, 0x74, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/gen/go/v1/service_grpc.pb.go b/gen/go/v1/service_grpc.pb.go index 2132a9e..ee2ebfd 100644 --- a/gen/go/v1/service_grpc.pb.go +++ b/gen/go/v1/service_grpc.pb.go @@ -62,7 +62,7 @@ type BackrestClient interface { // Cancel attempts to cancel a task with the given operation ID. Not guaranteed to succeed. Cancel(ctx context.Context, in *types.Int64Value, opts ...grpc.CallOption) (*emptypb.Empty, error) // GetLogs returns the keyed large data for the given operation. - GetLogs(ctx context.Context, in *LogDataRequest, opts ...grpc.CallOption) (*types.BytesValue, error) + GetLogs(ctx context.Context, in *LogDataRequest, opts ...grpc.CallOption) (Backrest_GetLogsClient, error) // RunCommand executes a generic restic command on the repository. RunCommand(ctx context.Context, in *RunCommandRequest, opts ...grpc.CallOption) (Backrest_RunCommandClient, error) // GetDownloadURL returns a signed download URL given a forget operation ID. @@ -212,17 +212,40 @@ func (c *backrestClient) Cancel(ctx context.Context, in *types.Int64Value, opts return out, nil } -func (c *backrestClient) GetLogs(ctx context.Context, in *LogDataRequest, opts ...grpc.CallOption) (*types.BytesValue, error) { - out := new(types.BytesValue) - err := c.cc.Invoke(ctx, Backrest_GetLogs_FullMethodName, in, out, opts...) +func (c *backrestClient) GetLogs(ctx context.Context, in *LogDataRequest, opts ...grpc.CallOption) (Backrest_GetLogsClient, error) { + stream, err := c.cc.NewStream(ctx, &Backrest_ServiceDesc.Streams[1], Backrest_GetLogs_FullMethodName, opts...) 
if err != nil { return nil, err } - return out, nil + x := &backrestGetLogsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Backrest_GetLogsClient interface { + Recv() (*types.BytesValue, error) + grpc.ClientStream +} + +type backrestGetLogsClient struct { + grpc.ClientStream +} + +func (x *backrestGetLogsClient) Recv() (*types.BytesValue, error) { + m := new(types.BytesValue) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } func (c *backrestClient) RunCommand(ctx context.Context, in *RunCommandRequest, opts ...grpc.CallOption) (Backrest_RunCommandClient, error) { - stream, err := c.cc.NewStream(ctx, &Backrest_ServiceDesc.Streams[1], Backrest_RunCommand_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &Backrest_ServiceDesc.Streams[2], Backrest_RunCommand_FullMethodName, opts...) if err != nil { return nil, err } @@ -302,7 +325,7 @@ type BackrestServer interface { // Cancel attempts to cancel a task with the given operation ID. Not guaranteed to succeed. Cancel(context.Context, *types.Int64Value) (*emptypb.Empty, error) // GetLogs returns the keyed large data for the given operation. - GetLogs(context.Context, *LogDataRequest) (*types.BytesValue, error) + GetLogs(*LogDataRequest, Backrest_GetLogsServer) error // RunCommand executes a generic restic command on the repository. RunCommand(*RunCommandRequest, Backrest_RunCommandServer) error // GetDownloadURL returns a signed download URL given a forget operation ID. @@ -354,8 +377,8 @@ func (UnimplementedBackrestServer) Restore(context.Context, *RestoreSnapshotRequ func (UnimplementedBackrestServer) Cancel(context.Context, *types.Int64Value) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Cancel not implemented") } -func (UnimplementedBackrestServer) GetLogs(context.Context, *LogDataRequest) (*types.BytesValue, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetLogs not implemented") +func (UnimplementedBackrestServer) GetLogs(*LogDataRequest, Backrest_GetLogsServer) error { + return status.Errorf(codes.Unimplemented, "method GetLogs not implemented") } func (UnimplementedBackrestServer) RunCommand(*RunCommandRequest, Backrest_RunCommandServer) error { return status.Errorf(codes.Unimplemented, "method RunCommand not implemented") @@ -601,22 +624,25 @@ func _Backrest_Cancel_Handler(srv interface{}, ctx context.Context, dec func(int return interceptor(ctx, in, info, handler) } -func _Backrest_GetLogs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(LogDataRequest) - if err := dec(in); err != nil { - return nil, err +func _Backrest_GetLogs_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(LogDataRequest) + if err := stream.RecvMsg(m); err != nil { + return err } - if interceptor == nil { - return srv.(BackrestServer).GetLogs(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Backrest_GetLogs_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BackrestServer).GetLogs(ctx, req.(*LogDataRequest)) - } - return interceptor(ctx, in, info, handler) + return srv.(BackrestServer).GetLogs(m, &backrestGetLogsServer{stream}) +} + +type Backrest_GetLogsServer interface { + Send(*types.BytesValue) error + 
grpc.ServerStream +} + +type backrestGetLogsServer struct { + grpc.ServerStream +} + +func (x *backrestGetLogsServer) Send(m *types.BytesValue) error { + return x.ServerStream.SendMsg(m) } func _Backrest_RunCommand_Handler(srv interface{}, stream grpc.ServerStream) error { @@ -745,10 +771,6 @@ var Backrest_ServiceDesc = grpc.ServiceDesc{ MethodName: "Cancel", Handler: _Backrest_Cancel_Handler, }, - { - MethodName: "GetLogs", - Handler: _Backrest_GetLogs_Handler, - }, { MethodName: "GetDownloadURL", Handler: _Backrest_GetDownloadURL_Handler, @@ -768,6 +790,11 @@ var Backrest_ServiceDesc = grpc.ServiceDesc{ Handler: _Backrest_GetOperationEvents_Handler, ServerStreams: true, }, + { + StreamName: "GetLogs", + Handler: _Backrest_GetLogs_Handler, + ServerStreams: true, + }, { StreamName: "RunCommand", Handler: _Backrest_RunCommand_Handler, diff --git a/gen/go/v1/v1connect/service.connect.go b/gen/go/v1/v1connect/service.connect.go index 703d92c..2326a1e 100644 --- a/gen/go/v1/v1connect/service.connect.go +++ b/gen/go/v1/v1connect/service.connect.go @@ -116,7 +116,7 @@ type BackrestClient interface { // Cancel attempts to cancel a task with the given operation ID. Not guaranteed to succeed. Cancel(context.Context, *connect.Request[types.Int64Value]) (*connect.Response[emptypb.Empty], error) // GetLogs returns the keyed large data for the given operation. - GetLogs(context.Context, *connect.Request[v1.LogDataRequest]) (*connect.Response[types.BytesValue], error) + GetLogs(context.Context, *connect.Request[v1.LogDataRequest]) (*connect.ServerStreamForClient[types.BytesValue], error) // RunCommand executes a generic restic command on the repository. RunCommand(context.Context, *connect.Request[v1.RunCommandRequest]) (*connect.ServerStreamForClient[types.BytesValue], error) // GetDownloadURL returns a signed download URL given a forget operation ID. @@ -324,8 +324,8 @@ func (c *backrestClient) Cancel(ctx context.Context, req *connect.Request[types. } // GetLogs calls v1.Backrest.GetLogs. -func (c *backrestClient) GetLogs(ctx context.Context, req *connect.Request[v1.LogDataRequest]) (*connect.Response[types.BytesValue], error) { - return c.getLogs.CallUnary(ctx, req) +func (c *backrestClient) GetLogs(ctx context.Context, req *connect.Request[v1.LogDataRequest]) (*connect.ServerStreamForClient[types.BytesValue], error) { + return c.getLogs.CallServerStream(ctx, req) } // RunCommand calls v1.Backrest.RunCommand. @@ -368,7 +368,7 @@ type BackrestHandler interface { // Cancel attempts to cancel a task with the given operation ID. Not guaranteed to succeed. Cancel(context.Context, *connect.Request[types.Int64Value]) (*connect.Response[emptypb.Empty], error) // GetLogs returns the keyed large data for the given operation. - GetLogs(context.Context, *connect.Request[v1.LogDataRequest]) (*connect.Response[types.BytesValue], error) + GetLogs(context.Context, *connect.Request[v1.LogDataRequest], *connect.ServerStream[types.BytesValue]) error // RunCommand executes a generic restic command on the repository. RunCommand(context.Context, *connect.Request[v1.RunCommandRequest], *connect.ServerStream[types.BytesValue]) error // GetDownloadURL returns a signed download URL given a forget operation ID. 
@@ -457,7 +457,7 @@ func NewBackrestHandler(svc BackrestHandler, opts ...connect.HandlerOption) (str connect.WithSchema(backrestCancelMethodDescriptor), connect.WithHandlerOptions(opts...), ) - backrestGetLogsHandler := connect.NewUnaryHandler( + backrestGetLogsHandler := connect.NewServerStreamHandler( BackrestGetLogsProcedure, svc.GetLogs, connect.WithSchema(backrestGetLogsMethodDescriptor), @@ -580,8 +580,8 @@ func (UnimplementedBackrestHandler) Cancel(context.Context, *connect.Request[typ return nil, connect.NewError(connect.CodeUnimplemented, errors.New("v1.Backrest.Cancel is not implemented")) } -func (UnimplementedBackrestHandler) GetLogs(context.Context, *connect.Request[v1.LogDataRequest]) (*connect.Response[types.BytesValue], error) { - return nil, connect.NewError(connect.CodeUnimplemented, errors.New("v1.Backrest.GetLogs is not implemented")) +func (UnimplementedBackrestHandler) GetLogs(context.Context, *connect.Request[v1.LogDataRequest], *connect.ServerStream[types.BytesValue]) error { + return connect.NewError(connect.CodeUnimplemented, errors.New("v1.Backrest.GetLogs is not implemented")) } func (UnimplementedBackrestHandler) RunCommand(context.Context, *connect.Request[v1.RunCommandRequest], *connect.ServerStream[types.BytesValue]) error { diff --git a/internal/api/backresthandler.go b/internal/api/backresthandler.go index 4a32c29..db4fa65 100644 --- a/internal/api/backresthandler.go +++ b/internal/api/backresthandler.go @@ -10,6 +10,7 @@ import ( "path" "reflect" "slices" + "sync" "time" "connectrpc.com/connect" @@ -17,13 +18,13 @@ import ( v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/gen/go/v1/v1connect" "github.com/garethgeorge/backrest/internal/config" + "github.com/garethgeorge/backrest/internal/logwriter" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator" "github.com/garethgeorge/backrest/internal/orchestrator/repo" "github.com/garethgeorge/backrest/internal/orchestrator/tasks" "github.com/garethgeorge/backrest/internal/protoutil" "github.com/garethgeorge/backrest/internal/resticinstaller" - "github.com/garethgeorge/backrest/internal/rotatinglog" "github.com/garethgeorge/backrest/pkg/restic" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -35,12 +36,12 @@ type BackrestHandler struct { config config.ConfigStore orchestrator *orchestrator.Orchestrator oplog *oplog.OpLog - logStore *rotatinglog.RotatingLog + logStore *logwriter.LogManager } var _ v1connect.BackrestHandler = &BackrestHandler{} -func NewBackrestHandler(config config.ConfigStore, orchestrator *orchestrator.Orchestrator, oplog *oplog.OpLog, logStore *rotatinglog.RotatingLog) *BackrestHandler { +func NewBackrestHandler(config config.ConfigStore, orchestrator *orchestrator.Orchestrator, oplog *oplog.OpLog, logStore *logwriter.LogManager) *BackrestHandler { s := &BackrestHandler{ config: config, orchestrator: orchestrator, @@ -530,18 +531,65 @@ func (s *BackrestHandler) ClearHistory(ctx context.Context, req *connect.Request return connect.NewResponse(&emptypb.Empty{}), err } -func (s *BackrestHandler) GetLogs(ctx context.Context, req *connect.Request[v1.LogDataRequest]) (*connect.Response[types.BytesValue], error) { - data, err := s.logStore.Read(req.Msg.GetRef()) +func (s *BackrestHandler) GetLogs(ctx context.Context, req *connect.Request[v1.LogDataRequest], resp *connect.ServerStream[types.BytesValue]) error { + ch, err := s.logStore.Subscribe(req.Msg.Ref) if err != nil { - if errors.Is(err, 
rotatinglog.ErrFileNotFound) { - return connect.NewResponse(&types.BytesValue{ + if errors.Is(err, logwriter.ErrFileNotFound) { + resp.Send(&types.BytesValue{ Value: []byte(fmt.Sprintf("file associated with log %v not found, it may have rotated out of the log history", req.Msg.GetRef())), - }), nil + }) + } + return fmt.Errorf("get log data %v: %w", req.Msg.GetRef(), err) + } + + doneCh := make(chan struct{}) + + var mu sync.Mutex + var buf bytes.Buffer + interval := time.NewTicker(250 * time.Millisecond) + defer interval.Stop() + + go func() { + for { + select { + case data, ok := <-ch: + if !ok { + close(doneCh) + return + } + mu.Lock() + buf.Write(data) + mu.Unlock() + case <-ctx.Done(): + return + } + } + }() + + flushHelper := func() error { + mu.Lock() + defer mu.Unlock() + if buf.Len() > 0 { + if err := resp.Send(&types.BytesValue{Value: buf.Bytes()}); err != nil { + return err + } + buf.Reset() + } + return nil + } + + for { + select { + case <-interval.C: + if err := flushHelper(); err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + case <-doneCh: + return flushHelper() } - - return nil, fmt.Errorf("get log data %v: %w", req.Msg.GetRef(), err) } - return connect.NewResponse(&types.BytesValue{Value: data}), nil } func (s *BackrestHandler) GetDownloadURL(ctx context.Context, req *connect.Request[types.Int64Value]) (*connect.Response[types.StringValue], error) { diff --git a/internal/api/backresthandler_test.go b/internal/api/backresthandler_test.go index 6f56406..670edc7 100644 --- a/internal/api/backresthandler_test.go +++ b/internal/api/backresthandler_test.go @@ -18,11 +18,11 @@ import ( "github.com/garethgeorge/backrest/gen/go/types" v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/internal/config" + "github.com/garethgeorge/backrest/internal/logwriter" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/bboltstore" "github.com/garethgeorge/backrest/internal/orchestrator" "github.com/garethgeorge/backrest/internal/resticinstaller" - "github.com/garethgeorge/backrest/internal/rotatinglog" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" ) @@ -769,7 +769,7 @@ type systemUnderTest struct { oplog *oplog.OpLog opstore *bboltstore.BboltStore orch *orchestrator.Orchestrator - logStore *rotatinglog.RotatingLog + logStore *logwriter.LogManager config *v1.Config } @@ -791,7 +791,10 @@ func createSystemUnderTest(t *testing.T, config config.ConfigStore) systemUnderT } t.Cleanup(func() { opstore.Close() }) oplog := oplog.NewOpLog(opstore) - logStore := rotatinglog.NewRotatingLog(dir+"/log", 10) + logStore, err := logwriter.NewLogManager(dir+"/log", 10) + if err != nil { + t.Fatalf("Failed to create log store: %v", err) + } orch, err := orchestrator.NewOrchestrator( resticBin, cfg, oplog, logStore, ) diff --git a/internal/logwriter/errors.go b/internal/logwriter/errors.go new file mode 100644 index 0000000..4e81023 --- /dev/null +++ b/internal/logwriter/errors.go @@ -0,0 +1,7 @@ +package logwriter + +import "errors" + +var ErrFileNotFound = errors.New("file not found") +var ErrNotFound = errors.New("entry not found") +var ErrBadName = errors.New("bad name") diff --git a/internal/logwriter/livelog.go b/internal/logwriter/livelog.go new file mode 100644 index 0000000..0344497 --- /dev/null +++ b/internal/logwriter/livelog.go @@ -0,0 +1,199 @@ +package logwriter + +import ( + "bytes" + "errors" + "io" + "os" + "path" + "slices" + "sync" +) + +var ErrAlreadyExists = 
errors.New("already exists") + +type LiveLog struct { + mu sync.Mutex + dir string + writers map[string]*LiveLogWriter +} + +func NewLiveLogger(dir string) (*LiveLog, error) { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, err + } + return &LiveLog{dir: dir, writers: make(map[string]*LiveLogWriter)}, nil +} + +func (t *LiveLog) ListIDs() []string { + t.mu.Lock() + defer t.mu.Unlock() + + files, err := os.ReadDir(t.dir) + if err != nil { + return nil + } + + ids := make([]string, 0, len(files)) + for _, f := range files { + if !f.IsDir() { + ids = append(ids, f.Name()) + } + } + return ids +} + +func (t *LiveLog) NewWriter(id string) (*LiveLogWriter, error) { + t.mu.Lock() + defer t.mu.Unlock() + if _, ok := t.writers[id]; ok { + return nil, ErrAlreadyExists + } + fh, err := os.Create(path.Join(t.dir, id)) + if err != nil { + return nil, err + } + w := &LiveLogWriter{ + fh: fh, + id: id, + ll: t, + path: path.Join(t.dir, id), + } + t.writers[id] = w + return w, nil +} + +func (t *LiveLog) Unsubscribe(id string, ch chan []byte) { + t.mu.Lock() + defer t.mu.Unlock() + + if w, ok := t.writers[id]; ok { + w.mu.Lock() + defer w.mu.Unlock() + w.subscribers = slices.DeleteFunc(w.subscribers, func(c chan []byte) bool { + return c == ch + }) + } +} + +func (t *LiveLog) Subscribe(id string) (chan []byte, error) { + t.mu.Lock() + defer t.mu.Unlock() + + if w, ok := t.writers[id]; ok { + // If there is a writer, block writes until we are done opening the file + w.mu.Lock() + defer w.mu.Unlock() + } + + fh, err := os.Open(path.Join(t.dir, id)) + if err != nil { + if os.IsNotExist(err) { + return nil, ErrFileNotFound + } + return nil, err + } + + ch := make(chan []byte, 1) + go func() { + buf := make([]byte, 4096) + for { + n, err := fh.Read(buf) + if err == io.EOF { + break + } else if err != nil { + return + } + ch <- bytes.Clone(buf[:n]) + } + + // Lock the writer to prevent writes while we switch subscription modes + t.mu.Lock() + if w, ok := t.writers[id]; ok { + w.mu.Lock() + defer w.mu.Unlock() + } + t.mu.Unlock() + + // Read anything written while we were acquiring the lock + for { + n, err := fh.Read(buf) + if err == io.EOF { + break + } + if err != nil { + close(ch) + fh.Close() + return + } + ch <- bytes.Clone(buf[:n]) + } + fh.Close() + + // Install subscription in the writer OR close the channel if the writer is gone + t.mu.Lock() + if w, ok := t.writers[id]; ok { + w.subscribers = append(w.subscribers, ch) + } else { + close(ch) + } + t.mu.Unlock() + }() + + return ch, nil +} + +func (t *LiveLog) Remove(id string) error { + t.mu.Lock() + defer t.mu.Unlock() + delete(t.writers, id) + return os.Remove(path.Join(t.dir, id)) +} + +func (t *LiveLog) IsAlive(id string) bool { + t.mu.Lock() + defer t.mu.Unlock() + _, ok := t.writers[id] + return ok +} + +type LiveLogWriter struct { + mu sync.Mutex + ll *LiveLog + fh *os.File + id string + path string + subscribers []chan []byte +} + +func (t *LiveLogWriter) Write(data []byte) (int, error) { + t.mu.Lock() + defer t.mu.Unlock() + + n, err := t.fh.Write(data) + if err != nil { + return 0, err + } + if n != len(data) { + return n, errors.New("short write") + } + for _, ch := range t.subscribers { + ch <- bytes.Clone(data) + } + return n, nil +} + +func (t *LiveLogWriter) Close() error { + t.mu.Lock() + defer t.mu.Unlock() + + t.ll.mu.Lock() + defer t.ll.mu.Unlock() + delete(t.ll.writers, t.id) + + for _, ch := range t.subscribers { + close(ch) + } + + return t.fh.Close() +} diff --git a/internal/logwriter/livelog_test.go 
b/internal/logwriter/livelog_test.go new file mode 100644 index 0000000..9d136c1 --- /dev/null +++ b/internal/logwriter/livelog_test.go @@ -0,0 +1,108 @@ +package logwriter + +import ( + "bytes" + "testing" +) + +func TestWriteThenRead(t *testing.T) { + t.TempDir() + + logger, err := NewLiveLogger(t.TempDir()) + if err != nil { + t.Fatalf("NewLiveLogger failed: %v", err) + } + + writer, err := logger.NewWriter("test") + if err != nil { + t.Fatalf("NewWriter failed: %v", err) + } + + data := []byte("test") + if _, err := writer.Write(data); err != nil { + t.Fatalf("Write failed: %v", err) + } + writer.Close() + + ch, err := logger.Subscribe("test") + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + d := <-ch + if string(d) != "test" { + t.Fatalf("Read failed: expected test, got %s", string(d)) + } +} + +func TestBigWriteThenRead(t *testing.T) { + bigtext := genbytes(32 * 1000) + logger, err := NewLiveLogger(t.TempDir()) + if err != nil { + t.Fatalf("NewLiveLogger failed: %v", err) + } + + writer, err := logger.NewWriter("test") + if err != nil { + t.Fatalf("NewWriter failed: %v", err) + } + + if _, err := writer.Write([]byte(bigtext)); err != nil { + t.Fatalf("Write failed: %v", err) + } + writer.Close() + + ch, err := logger.Subscribe("test") + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + + data := make([]byte, 0) + for d := range ch { + data = append(data, d...) + } + if !bytes.Equal(data, bigtext) { + t.Fatalf("Read failed: expected %d bytes, got %d", len(bigtext), len(data)) + } +} + +func TestWritingWhileReading(t *testing.T) { + logger, err := NewLiveLogger(t.TempDir()) + if err != nil { + t.Fatalf("NewLiveLogger failed: %v", err) + } + + writer, err := logger.NewWriter("test") + if err != nil { + t.Fatalf("NewWriter failed: %v", err) + } + + if _, err := writer.Write([]byte("test")); err != nil { + t.Fatalf("Write failed: %v", err) + } + + ch, err := logger.Subscribe("test") + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + + if r1 := <-ch; string(r1) != "test" { + t.Fatalf("Read failed: expected test, got %s", string(r1)) + } + + go func() { + writer.Write([]byte("test2")) + writer.Close() + }() + + if r2 := <-ch; string(r2) != "test2" { + t.Fatalf("Read failed: expected test2, got %s", string(r2)) + } +} + +func genbytes(length int) []byte { + data := make([]byte, length) + for i := 0; i < length; i++ { + data[i] = 'a' + } + return data +} diff --git a/internal/logwriter/manager.go b/internal/logwriter/manager.go new file mode 100644 index 0000000..56bf90d --- /dev/null +++ b/internal/logwriter/manager.go @@ -0,0 +1,85 @@ +package logwriter + +import ( + "errors" + "fmt" + "io" + "path" + "strings" +) + +type LogManager struct { + llm *LiveLog + rlm *RotatingLog +} + +func NewLogManager(dir string, maxLogFiles int) (*LogManager, error) { + ll, err := NewLiveLogger(path.Join(dir, ".live")) + if err != nil { + return nil, err + } + + rl := NewRotatingLog(path.Join(dir), maxLogFiles) + if err != nil { + return nil, err + } + + return &LogManager{ + llm: ll, + rlm: rl, + }, nil +} + +// NewLiveWriter creates a new live log writer. The ID is the base name of the log file, a transformed ID is returned. 
+func (lm *LogManager) NewLiveWriter(idbase string) (string, io.WriteCloser, error) { + id := fmt.Sprintf("%s.livelog", idbase) + w, err := lm.llm.NewWriter(id) + return id, w, err +} + +func (lm *LogManager) Subscribe(id string) (chan []byte, error) { + if strings.HasSuffix(id, ".livelog") { + return lm.llm.Subscribe(id) + } else { + // TODO: implement streaming from rotating log storage + ch := make(chan []byte, 1) + data, err := lm.rlm.Read(id) + if err != nil { + return nil, err + } + ch <- data + close(ch) + return ch, nil + } +} + +func (lm *LogManager) Unsubscribe(id string, ch chan []byte) { + lm.llm.Unsubscribe(id, ch) +} + +// LiveLogIDs returns the list of IDs of live logs e.g. with writes in progress. +func (lm *LogManager) LiveLogIDs() []string { + return lm.llm.ListIDs() +} + +func (lm *LogManager) Finalize(id string) (frozenID string, err error) { + if lm.llm.IsAlive(id) { + return "", errors.New("live log still being written") + } + + ch, err := lm.llm.Subscribe(id) + if err != nil { + return "", err + } + + bytes := make([]byte, 0) + for data := range ch { + bytes = append(bytes, data...) + } + + if err := lm.llm.Remove(id); err != nil { + return "", err + } + + return lm.rlm.Write(bytes) +} diff --git a/internal/logwriter/manager_test.go b/internal/logwriter/manager_test.go new file mode 100644 index 0000000..6043931 --- /dev/null +++ b/internal/logwriter/manager_test.go @@ -0,0 +1,44 @@ +package logwriter + +import "testing" + +func TestLogLifecycle(t *testing.T) { + mgr, err := NewLogManager(t.TempDir(), 10) + if err != nil { + t.Fatalf("NewLogManager failed: %v", err) + } + + id, w, err := mgr.NewLiveWriter("test") + if err != nil { + t.Fatalf("NewLiveWriter failed: %v", err) + } + + ch, err := mgr.Subscribe(id) + if err != nil { + t.Fatalf("Subscribe to live log %q failed: %v", id, err) + } + + contents := "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." 
+ if _, err := w.Write([]byte(contents)); err != nil { + t.Fatalf("Write failed: %v", err) + } + w.Close() + + if data := <-ch; string(data) != contents { + t.Fatalf("Read failed: expected %q, got %q", contents, string(data)) + } + + finalID, err := mgr.Finalize(id) + if err != nil { + t.Fatalf("Finalize failed: %v", err) + } + + finalCh, err := mgr.Subscribe(finalID) + if err != nil { + t.Fatalf("Subscribe to finalized log %q failed: %v", finalID, err) + } + + if data := <-finalCh; string(data) != contents { + t.Fatalf("Read failed: expected %q, got %q", contents, string(data)) + } +} diff --git a/internal/rotatinglog/rotatinglog.go b/internal/logwriter/rotatinglog.go similarity index 96% rename from internal/rotatinglog/rotatinglog.go rename to internal/logwriter/rotatinglog.go index 4e48918..098f9f6 100644 --- a/internal/rotatinglog/rotatinglog.go +++ b/internal/logwriter/rotatinglog.go @@ -1,10 +1,9 @@ -package rotatinglog +package logwriter import ( "archive/tar" "bytes" "compress/gzip" - "errors" "fmt" "io" "io/fs" @@ -20,10 +19,6 @@ import ( "go.uber.org/zap" ) -var ErrFileNotFound = errors.New("file not found") -var ErrNotFound = errors.New("entry not found") -var ErrBadName = errors.New("bad name") - type RotatingLog struct { mu sync.Mutex dir string diff --git a/internal/rotatinglog/rotatinglog_test.go b/internal/logwriter/rotatinglog_test.go similarity index 99% rename from internal/rotatinglog/rotatinglog_test.go rename to internal/logwriter/rotatinglog_test.go index 4515025..12c3e7f 100644 --- a/internal/rotatinglog/rotatinglog_test.go +++ b/internal/logwriter/rotatinglog_test.go @@ -1,4 +1,4 @@ -package rotatinglog +package logwriter import ( "fmt" diff --git a/internal/orchestrator/logging/logging.go b/internal/orchestrator/logging/logging.go index 7554bf1..c0827c4 100644 --- a/internal/orchestrator/logging/logging.go +++ b/internal/orchestrator/logging/logging.go @@ -29,7 +29,7 @@ func ContextWithWriter(ctx context.Context, logger io.Writer) context.Context { // Logger returns a logger from the context, or the global logger if none is found. // this is somewhat expensive, it should be called once per task. 
-func Logger(ctx context.Context) *zap.Logger { +func Logger(ctx context.Context, prefix string) *zap.Logger { writer := WriterFromContext(ctx) if writer == nil { return zap.L() @@ -39,7 +39,7 @@ func Logger(ctx context.Context) *zap.Logger { fe := zapcore.NewConsoleEncoder(p) l := zap.New(zapcore.NewTee( zap.L().Core(), - zapcore.NewCore(fe, zapcore.AddSync(&ioutil.LinePrefixer{W: writer, Prefix: []byte("[tasklog] ")}), zapcore.DebugLevel), + zapcore.NewCore(fe, zapcore.AddSync(&ioutil.LinePrefixer{W: writer, Prefix: []byte(prefix)}), zapcore.DebugLevel), )) return l } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 076c459..e418701 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -1,23 +1,22 @@ package orchestrator import ( - "bytes" "context" "errors" "fmt" + "io" "slices" "sync" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/internal/config" - "github.com/garethgeorge/backrest/internal/ioutil" + "github.com/garethgeorge/backrest/internal/logwriter" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator/logging" "github.com/garethgeorge/backrest/internal/orchestrator/repo" "github.com/garethgeorge/backrest/internal/orchestrator/tasks" "github.com/garethgeorge/backrest/internal/queue" - "github.com/garethgeorge/backrest/internal/rotatinglog" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -33,7 +32,7 @@ type Orchestrator struct { OpLog *oplog.OpLog repoPool *resticRepoPool taskQueue *queue.TimePriorityQueue[stContainer] - logStore *rotatinglog.RotatingLog + logStore *logwriter.LogManager // cancelNotify is a list of channels that are notified when a task should be cancelled. cancelNotify []chan int64 @@ -59,7 +58,7 @@ func (st stContainer) Less(other stContainer) bool { return st.ScheduledTask.Less(other.ScheduledTask) } -func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStore *rotatinglog.RotatingLog) (*Orchestrator, error) { +func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStore *logwriter.LogManager) (*Orchestrator, error) { cfg = proto.Clone(cfg).(*v1.Config) // create the orchestrator. @@ -96,6 +95,14 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStor } for _, op := range incompleteOps { + // check for logs to finalize + if op.Logref != "" { + if frozenID, err := logStore.Finalize(op.Logref); err != nil { + zap.L().Warn("failed to finalize livelog ref for incomplete operation", zap.String("logref", op.Logref), zap.Error(err)) + } else { + op.Logref = frozenID + } + } op.Status = v1.OperationStatus_STATUS_ERROR op.DisplayMessage = "Operation was incomplete when orchestrator was restarted." 
op.UnixTimeEndMs = op.UnixTimeStartMs @@ -122,6 +129,12 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStor } } + for _, id := range logStore.LiveLogIDs() { + if _, err := logStore.Finalize(id); err != nil { + zap.L().Warn("failed to finalize unassociated live log", zap.String("id", id), zap.Error(err)) + } + } + zap.L().Info("scrubbed operation log for incomplete operations", zap.Duration("duration", time.Since(startTime)), zap.Int("incomplete_ops", len(incompleteOps)), @@ -378,15 +391,19 @@ func (o *Orchestrator) Run(ctx context.Context) { } func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) error { - logs := bytes.NewBuffer(nil) - ctx = logging.ContextWithWriter(ctx, &ioutil.SynchronizedWriter{W: logs}) - - op := st.Op - runner := newTaskRunnerImpl(o, st.Task, st.Op) - zap.L().Info("running task", zap.String("task", st.Task.Name()), zap.String("runAt", st.RunAt.Format(time.RFC3339))) - + var liveLogID string + var logWriter io.WriteCloser + op := st.Op if op != nil { + var err error + liveLogID, logWriter, err = o.logStore.NewLiveWriter(fmt.Sprintf("%x", op.GetId())) + if err != nil { + zap.S().Errorf("failed to create live log writer: %v", err) + } + ctx = logging.ContextWithWriter(ctx, logWriter) + + op.Logref = liveLogID // set the logref to the live log. op.UnixTimeStartMs = time.Now().UnixMilli() if op.Status == v1.OperationStatus_STATUS_PENDING || op.Status == v1.OperationStatus_STATUS_UNKNOWN { op.Status = v1.OperationStatus_STATUS_INPROGRESS @@ -403,6 +420,7 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro } start := time.Now() + runner := newTaskRunnerImpl(o, st.Task, st.Op) err := st.Task.Run(ctx, st, runner) if err != nil { runner.Logger(ctx).Error("task failed", zap.Error(err), zap.Duration("duration", time.Since(start))) @@ -412,14 +430,17 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro if op != nil { // write logs to log storage for this task. - if logs.Len() > 0 { - ref, err := o.logStore.Write(logs.Bytes()) - if err != nil { - zap.S().Errorf("failed to write logs for task %q to log store: %v", st.Task.Name(), err) + if logWriter != nil { + if err := logWriter.Close(); err != nil { + zap.S().Errorf("failed to close live log writer: %v", err) + } + if finalID, err := o.logStore.Finalize(liveLogID); err != nil { + zap.S().Errorf("failed to finalize live log: %v", err) } else { - op.Logref = ref + op.Logref = finalID } } + if err != nil { var taskCancelledError *tasks.TaskCancelledError var taskRetryError *tasks.TaskRetryError diff --git a/internal/orchestrator/repo/logging.go b/internal/orchestrator/repo/logging.go index ce36de0..ade3d43 100644 --- a/internal/orchestrator/repo/logging.go +++ b/internal/orchestrator/repo/logging.go @@ -20,7 +20,7 @@ func forwardResticLogs(ctx context.Context) (context.Context, func()) { prefixWriter := &ioutil.LinePrefixer{W: limitWriter, Prefix: []byte("[restic] ")} return restic.ContextWithLogger(ctx, prefixWriter), func() { if limitWriter.D > 0 { - fmt.Fprintf(writer, "Output truncated, %d bytes dropped\n", limitWriter.D) + fmt.Fprintf(prefixWriter, "... 
Output truncated, %d bytes dropped\n", limitWriter.D) } prefixWriter.Close() } diff --git a/internal/orchestrator/repo/repo.go b/internal/orchestrator/repo/repo.go index 6329734..b78e311 100644 --- a/internal/orchestrator/repo/repo.go +++ b/internal/orchestrator/repo/repo.go @@ -82,7 +82,7 @@ func NewRepoOrchestrator(config *v1.Config, repoConfig *v1.Repo, resticPath stri } func (r *RepoOrchestrator) logger(ctx context.Context) *zap.Logger { - return logging.Logger(ctx).With(zap.String("repo", r.repoConfig.Id)) + return logging.Logger(ctx, "[repo-manager] ").With(zap.String("repo", r.repoConfig.Id)) } func (r *RepoOrchestrator) Init(ctx context.Context) error { diff --git a/internal/orchestrator/taskrunnerimpl.go b/internal/orchestrator/taskrunnerimpl.go index 895be7f..87fb73e 100644 --- a/internal/orchestrator/taskrunnerimpl.go +++ b/internal/orchestrator/taskrunnerimpl.go @@ -2,12 +2,16 @@ package orchestrator import ( "context" + "crypto/rand" + "encoding/hex" "errors" "fmt" + "io" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/internal/hook" + "github.com/garethgeorge/backrest/internal/logwriter" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator/logging" "github.com/garethgeorge/backrest/internal/orchestrator/repo" @@ -155,5 +159,41 @@ func (t *taskRunnerImpl) Config() *v1.Config { } func (t *taskRunnerImpl) Logger(ctx context.Context) *zap.Logger { - return logging.Logger(ctx).Named(t.t.Name()) + return logging.Logger(ctx, "[tasklog] ").Named(t.t.Name()) +} + +func (t *taskRunnerImpl) LogrefWriter() (string, tasks.LogrefWriter, error) { + id := make([]byte, 16) + if _, err := rand.Read(id); err != nil { + return "", nil, fmt.Errorf("read random: %w", err) + } + idStr := hex.EncodeToString(id) + liveID, writer, err := t.orchestrator.logStore.NewLiveWriter(idStr) + if err != nil { + return "", nil, fmt.Errorf("new log writer: %w", err) + } + return liveID, &logrefWriter{ + logmgr: t.orchestrator.logStore, + id: liveID, + writer: writer, + }, nil +} + +type logrefWriter struct { + logmgr *logwriter.LogManager + id string + writer io.WriteCloser +} + +var _ tasks.LogrefWriter = &logrefWriter{} + +func (l *logrefWriter) Write(p []byte) (n int, err error) { + return l.writer.Write(p) +} + +func (l *logrefWriter) Close() (string, error) { + if err := l.writer.Close(); err != nil { + return "", err + } + return l.logmgr.Finalize(l.id) } diff --git a/internal/orchestrator/tasks/task.go b/internal/orchestrator/tasks/task.go index ffb200f..9df8d4e 100644 --- a/internal/orchestrator/tasks/task.go +++ b/internal/orchestrator/tasks/task.go @@ -52,6 +52,13 @@ type TaskRunner interface { Config() *v1.Config // Logger returns the logger. Logger(ctx context.Context) *zap.Logger + // LogrefWriter returns a writer that can be used to track streaming operation output. 
+ LogrefWriter() (liveID string, w LogrefWriter, err error) +} + +type LogrefWriter interface { + Write(data []byte) (int, error) + Close() (frozenID string, err error) } type TaskExecutor interface { @@ -211,3 +218,7 @@ func (t *testTaskRunner) Config() *v1.Config { func (t *testTaskRunner) Logger(ctx context.Context) *zap.Logger { return zap.L() } + +func (t *testTaskRunner) LogrefWriter() (liveID string, w LogrefWriter, err error) { + panic("not implemented") +} diff --git a/internal/orchestrator/tasks/taskcheck.go b/internal/orchestrator/tasks/taskcheck.go index 06bddcc..4455438 100644 --- a/internal/orchestrator/tasks/taskcheck.go +++ b/internal/orchestrator/tasks/taskcheck.go @@ -1,18 +1,14 @@ package tasks import ( - "bytes" "context" "errors" "fmt" - "sync" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/ioutil" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/protoutil" - "go.uber.org/zap" ) type CheckTask struct { @@ -117,38 +113,17 @@ func (t *CheckTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner } op.Op = opCheck - checkCtx, cancelCheckCtx := context.WithCancel(ctx) - interval := time.NewTicker(1 * time.Second) - defer interval.Stop() - buf := bytes.NewBuffer(nil) - bufWriter := &ioutil.SynchronizedWriter{W: &ioutil.LimitWriter{W: buf, N: 16 * 1024}} - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-interval.C: - bufWriter.Mu.Lock() - output := buf.String() - bufWriter.Mu.Unlock() + liveID, writer, err := runner.LogrefWriter() + if err != nil { + return fmt.Errorf("create logref writer: %w", err) + } + opCheck.OperationCheck.OutputLogref = liveID - if opCheck.OperationCheck.Output != string(output) { - opCheck.OperationCheck.Output = string(output) + if err := runner.UpdateOperation(op); err != nil { + return fmt.Errorf("update operation: %w", err) + } - if err := runner.OpLog().Update(op); err != nil { - zap.L().Error("update check operation with status output", zap.Error(err)) - } - } - case <-checkCtx.Done(): - return - } - } - }() - - err = repo.Check(checkCtx, bufWriter) - cancelCheckCtx() - wg.Wait() + err = repo.Check(ctx, writer) if err != nil { runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_CHECK_ERROR, @@ -160,7 +135,11 @@ func (t *CheckTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner return fmt.Errorf("check: %w", err) } - opCheck.OperationCheck.Output = string(buf.Bytes()) + frozenID, err := writer.Close() + if err != nil { + return fmt.Errorf("close logref writer: %w", err) + } + opCheck.OperationCheck.OutputLogref = frozenID if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_CHECK_SUCCESS, diff --git a/internal/orchestrator/tasks/taskprune.go b/internal/orchestrator/tasks/taskprune.go index 5d07604..0350b67 100644 --- a/internal/orchestrator/tasks/taskprune.go +++ b/internal/orchestrator/tasks/taskprune.go @@ -1,15 +1,12 @@ package tasks import ( - "bytes" "context" "errors" "fmt" - "sync" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/ioutil" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/protoutil" "go.uber.org/zap" @@ -116,40 +113,20 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner } op.Op = opPrune - pruneCtx, cancelPruneCtx := context.WithCancel(ctx) - interval := time.NewTicker(1 * time.Second) - defer 
interval.Stop() - buf := bytes.NewBuffer(nil) - bufWriter := &ioutil.SynchronizedWriter{W: &ioutil.LimitWriter{W: buf, N: 16 * 1024}} - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-interval.C: - bufWriter.Mu.Lock() - output := buf.String() - bufWriter.Mu.Unlock() + liveID, writer, err := runner.LogrefWriter() + if err != nil { + return fmt.Errorf("create logref writer: %w", err) + } + opPrune.OperationPrune.OutputLogref = liveID - if opPrune.OperationPrune.Output != string(output) { - opPrune.OperationPrune.Output = string(output) + if err := runner.UpdateOperation(op); err != nil { + return fmt.Errorf("update operation: %w", err) + } - if err := runner.OpLog().Update(op); err != nil { - zap.L().Error("update prune operation with status output", zap.Error(err)) - } - } - case <-pruneCtx.Done(): - return - } - } - }() - - err = repo.Prune(pruneCtx, bufWriter) - cancelPruneCtx() - wg.Wait() + err = repo.Prune(ctx, writer) if err != nil { runner.ExecuteHooks(ctx, []v1.Hook_Condition{ + v1.Hook_CONDITION_PRUNE_ERROR, v1.Hook_CONDITION_ANY_ERROR, }, HookVars{ Error: err.Error(), @@ -158,7 +135,11 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner return fmt.Errorf("prune: %w", err) } - opPrune.OperationPrune.Output = string(buf.Bytes()) + frozenID, err := writer.Close() + if err != nil { + return fmt.Errorf("close logref writer: %w", err) + } + opPrune.OperationPrune.OutputLogref = frozenID // Run a stats task after a successful prune if err := runner.ScheduleTask(NewStatsTask(t.RepoID(), PlanForSystemTasks, false), TaskPriorityStats); err != nil { diff --git a/proto/v1/operations.proto b/proto/v1/operations.proto index bd7b742..2b4277e 100644 --- a/proto/v1/operations.proto +++ b/proto/v1/operations.proto @@ -92,12 +92,14 @@ message OperationForget { // OperationPrune tracks a prune operation. message OperationPrune { - string output = 1; // output of the prune. + string output = 1 [deprecated = true]; // output of the prune. + string output_logref = 2; // logref of the prune output. } // OperationCheck tracks a check operation. message OperationCheck { - string output = 1; // output of the check operation. + string output = 1 [deprecated = true]; // output of the check operation. + string output_logref = 2; // logref of the check output. } // OperationRestore tracks a restore operation. diff --git a/proto/v1/service.proto b/proto/v1/service.proto index 7a3ff2e..629cc67 100644 --- a/proto/v1/service.proto +++ b/proto/v1/service.proto @@ -42,7 +42,7 @@ service Backrest { rpc Cancel(types.Int64Value) returns (google.protobuf.Empty) {} // GetLogs returns the keyed large data for the given operation. - rpc GetLogs(LogDataRequest) returns (types.BytesValue) {} + rpc GetLogs(LogDataRequest) returns (stream types.BytesValue) {} // RunCommand executes a generic restic command on the repository. rpc RunCommand(RunCommandRequest) returns (stream types.BytesValue) {} diff --git a/webui/gen/ts/v1/operations_pb.ts b/webui/gen/ts/v1/operations_pb.ts index 461a6d2..b335ce0 100644 --- a/webui/gen/ts/v1/operations_pb.ts +++ b/webui/gen/ts/v1/operations_pb.ts @@ -538,10 +538,18 @@ export class OperationPrune extends Message { /** * output of the prune. * - * @generated from field: string output = 1; + * @generated from field: string output = 1 [deprecated = true]; + * @deprecated */ output = ""; + /** + * logref of the prune output. 
+ * + * @generated from field: string output_logref = 2; + */ + outputLogref = ""; + constructor(data?: PartialMessage) { super(); proto3.util.initPartial(data, this); @@ -551,6 +559,7 @@ export class OperationPrune extends Message { static readonly typeName = "v1.OperationPrune"; static readonly fields: FieldList = proto3.util.newFieldList(() => [ { no: 1, name: "output", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "output_logref", kind: "scalar", T: 9 /* ScalarType.STRING */ }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): OperationPrune { @@ -579,10 +588,18 @@ export class OperationCheck extends Message { /** * output of the check operation. * - * @generated from field: string output = 1; + * @generated from field: string output = 1 [deprecated = true]; + * @deprecated */ output = ""; + /** + * logref of the check output. + * + * @generated from field: string output_logref = 2; + */ + outputLogref = ""; + constructor(data?: PartialMessage) { super(); proto3.util.initPartial(data, this); @@ -592,6 +609,7 @@ export class OperationCheck extends Message { static readonly typeName = "v1.OperationCheck"; static readonly fields: FieldList = proto3.util.newFieldList(() => [ { no: 1, name: "output", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "output_logref", kind: "scalar", T: 9 /* ScalarType.STRING */ }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): OperationCheck { diff --git a/webui/gen/ts/v1/service_connect.ts b/webui/gen/ts/v1/service_connect.ts index 38b5094..e22c8b3 100644 --- a/webui/gen/ts/v1/service_connect.ts +++ b/webui/gen/ts/v1/service_connect.ts @@ -143,7 +143,7 @@ export const Backrest = { name: "GetLogs", I: LogDataRequest, O: BytesValue, - kind: MethodKind.Unary, + kind: MethodKind.ServerStreaming, }, /** * RunCommand executes a generic restic command on the repository. 
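
Note on the deprecated output fields above: operations recorded before this change carry their output inline, while new operations carry only output_logref and the bytes are fetched (or streamed) via GetLogs. A minimal Go sketch of a backward-compatible reader follows; PruneOutput and resolveLogref are illustrative helpers and are not part of this patch.

package example

import (
	v1 "github.com/garethgeorge/backrest/gen/go/v1"
)

// PruneOutput prefers the new output_logref and falls back to the deprecated
// inline output for operations written by older versions. resolveLogref is a
// hypothetical stand-in for fetching log bytes by ref (e.g. via GetLogs).
func PruneOutput(op *v1.OperationPrune, resolveLogref func(ref string) (string, error)) (string, error) {
	if ref := op.GetOutputLogref(); ref != "" {
		return resolveLogref(ref)
	}
	return op.GetOutput(), nil
}
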
diff --git a/webui/package-lock.json b/webui/package-lock.json index db36938..4b693fb 100644 --- a/webui/package-lock.json +++ b/webui/package-lock.json @@ -25,11 +25,13 @@ "react": "^18.2.0", "react-dom": "^18.2.0", "react-js-cron": "^5.0.1", + "react-virtualized": "^9.22.5", "recharts": "^2.12.7", "typescript": "^5.2.2" }, "devDependencies": { "@parcel/transformer-sass": "^2.10.3", + "@types/react-virtualized": "^9.21.30", "cross-env": "^7.0.3", "events": "^3.3.0", "rimraf": "^5.0.5" @@ -2434,6 +2436,16 @@ "@types/react": "*" } }, + "node_modules/@types/react-virtualized": { + "version": "9.21.30", + "resolved": "https://registry.npmjs.org/@types/react-virtualized/-/react-virtualized-9.21.30.tgz", + "integrity": "sha512-4l2TFLQ8BCjNDQlvH85tU6gctuZoEdgYzENQyZHpgTHU7hoLzYgPSOALMAeA58LOWua8AzC6wBivPj1lfl6JgQ==", + "dev": true, + "dependencies": { + "@types/prop-types": "*", + "@types/react": "*" + } + }, "node_modules/@types/scheduler": { "version": "0.16.8", "resolved": "https://registry.npmjs.org/@types/scheduler/-/scheduler-0.16.8.tgz", @@ -2613,11 +2625,11 @@ } }, "node_modules/braces": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz", - "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dependencies": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" }, "engines": { "node": ">=8" @@ -3305,9 +3317,9 @@ } }, "node_modules/fill-range": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", - "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", "dependencies": { "to-regex-range": "^5.0.1" }, @@ -3908,11 +3920,11 @@ "peer": true }, "node_modules/micromatch": { - "version": "4.0.5", - "resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.5.tgz", - "integrity": "sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA==", + "version": "4.0.8", + "resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.8.tgz", + "integrity": "sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==", "dependencies": { - "braces": "^3.0.2", + "braces": "^3.0.3", "picomatch": "^2.3.1" }, "engines": { @@ -4867,6 +4879,11 @@ "react-dom": ">=17.0.0" } }, + "node_modules/react-lifecycles-compat": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/react-lifecycles-compat/-/react-lifecycles-compat-3.0.4.tgz", + "integrity": "sha512-fBASbA6LnOU9dOU2eW7aQ8xmYBSXUIWr+UmF9b1efZBazGNO+rcXT/icdKnYm2pTwcRylVUYwW7H1PHfLekVzA==" + }, "node_modules/react-refresh": { "version": "0.9.0", "resolved": "https://registry.npmjs.org/react-refresh/-/react-refresh-0.9.0.tgz", @@ -4904,6 +4921,31 @@ "react-dom": ">=16.6.0" } }, + "node_modules/react-virtualized": { + "version": "9.22.5", + "resolved": "https://registry.npmjs.org/react-virtualized/-/react-virtualized-9.22.5.tgz", + "integrity": "sha512-YqQMRzlVANBv1L/7r63OHa2b0ZsAaDp1UhVNEdUaXI8A5u6hTpA5NYtUueLH2rFuY/27mTGIBl7ZhqFKzw18YQ==", + 
"dependencies": { + "@babel/runtime": "^7.7.2", + "clsx": "^1.0.4", + "dom-helpers": "^5.1.3", + "loose-envify": "^1.4.0", + "prop-types": "^15.7.2", + "react-lifecycles-compat": "^3.0.4" + }, + "peerDependencies": { + "react": "^15.3.0 || ^16.0.0-alpha || ^17.0.0 || ^18.0.0", + "react-dom": "^15.3.0 || ^16.0.0-alpha || ^17.0.0 || ^18.0.0" + } + }, + "node_modules/react-virtualized/node_modules/clsx": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/clsx/-/clsx-1.2.1.tgz", + "integrity": "sha512-EcR6r5a8bj6pu3ycsa/E/cKVGuTgZJZdsyUYHOksG/UHIiKfjxzRxYJpyVBwYaQeOvghal9fcc4PidlgzugAQg==", + "engines": { + "node": ">=6" + } + }, "node_modules/readdirp": { "version": "3.6.0", "resolved": "https://registry.npmjs.org/readdirp/-/readdirp-3.6.0.tgz", diff --git a/webui/package.json b/webui/package.json index 5076f6a..a0ddb73 100644 --- a/webui/package.json +++ b/webui/package.json @@ -29,11 +29,13 @@ "react": "^18.2.0", "react-dom": "^18.2.0", "react-js-cron": "^5.0.1", - "typescript": "^5.2.2", - "recharts": "^2.12.7" + "react-virtualized": "^9.22.5", + "recharts": "^2.12.7", + "typescript": "^5.2.2" }, "devDependencies": { "@parcel/transformer-sass": "^2.10.3", + "@types/react-virtualized": "^9.21.30", "cross-env": "^7.0.3", "events": "^3.3.0", "rimraf": "^5.0.5" diff --git a/webui/src/components/LogView.tsx b/webui/src/components/LogView.tsx new file mode 100644 index 0000000..f0fcb1f --- /dev/null +++ b/webui/src/components/LogView.tsx @@ -0,0 +1,66 @@ +import React, { useEffect, useState } from "react"; +import { LogDataRequest } from "../../gen/ts/v1/service_pb"; +import { backrestService } from "../api"; + +import AutoSizer from "react-virtualized/dist/commonjs/AutoSizer"; +import List from "react-virtualized/dist/commonjs/List"; +import { set } from "lodash"; + +// TODO: refactor this to use the provider pattern +export const LogView = ({ logref }: { logref: string }) => { + const [lines, setLines] = useState([""]); + + console.log("LogView", logref); + + useEffect(() => { + if (!logref) { + return; + } + + const controller = new AbortController(); + + (async () => { + try { + for await (const log of backrestService.getLogs( + new LogDataRequest({ + ref: logref, + }), + { signal: controller.signal } + )) { + const text = new TextDecoder("utf-8").decode(log.value); + const lines = text.split("\n"); + setLines((prev) => { + const copy = [...prev]; + copy[copy.length - 1] += lines[0]; + copy.push(...lines.slice(1)); + return copy; + }); + } + } catch (e) { + setLines((prev) => [...prev, `Fetch log error: ${e}`]); + } + })(); + + return () => { + setLines([]); + controller.abort(); + }; + }, [logref]); + + console.log("LogView", lines); + + return ( +
+    <div>
+      {lines.map((line, i) => (
+        <pre key={i} style={{ margin: "0px" }}>
+          {line}
+        </pre>
+      ))}
+    </div>
+  );
+};
diff --git a/webui/src/components/OperationRow.tsx b/webui/src/components/OperationRow.tsx
index 900d80e..616ea85 100644
--- a/webui/src/components/OperationRow.tsx
+++ b/webui/src/components/OperationRow.tsx
@@ -41,6 +41,7 @@ import {
   nameForStatus,
 } from "../state/flowdisplayaggregator";
 import { OperationIcon } from "./OperationIcon";
+import { LogView } from "./LogView";
 
 export const OperationRow = ({
   operation,
@@ -93,7 +94,7 @@ export const OperationRow = ({
           showModal(null);
         }}
       >
-        <BigOperationDataVerbatim logref={operation.logref} />
+        <LogView logref={operation.logref} />
       </Modal>
     );
   };
@@ -214,14 +215,22 @@ export const OperationRow = ({
     bodyItems.push({
       key: "prune",
       label: "Prune Output",
-      children: <pre>{prune.output}</pre>,
+      children: prune.outputLogref ? (
+        <LogView logref={prune.outputLogref} />
+      ) : (
+        <pre>{prune.output}</pre>
+      ),
     });
   } else if (operation.op.case === "operationCheck") {
     const check = operation.op.value;
     bodyItems.push({
       key: "check",
       label: "Check Output",
-      children: <pre>{check.output}</pre>,
+      children: check.outputLogref ? (
+        <LogView logref={check.outputLogref} />
+      ) : (
+        <pre>{check.output}</pre>
+      ),
     });
   } else if (operation.op.case === "operationRestore") {
     expandedBodyItems.push("restore");
@@ -236,7 +245,7 @@
       bodyItems.push({
         key: "logref",
         label: "Hook Output",
-        children: <BigOperationDataVerbatim logref={hook.outputLogref} />,
+        children: <LogView logref={hook.outputLogref} />,
       });
     }
   }
@@ -564,28 +573,3 @@ const ForgetOperationDetails = ({
     </>
   );
 };
-
-// TODO: refactor this to use the provider pattern
-const BigOperationDataVerbatim = ({ logref }: { logref: string }) => {
-  const [output, setOutput] = useState<string | undefined>(undefined);
-
-  useEffect(() => {
-    if (!logref) {
-      return;
-    }
-    backrestService
-      .getLogs(
-        new LogDataRequest({
-          ref: logref,
-        })
-      )
-      .then((resp) => {
-        setOutput(new TextDecoder("utf-8").decode(resp.value));
-      })
-      .catch((e) => {
-        console.error("Failed to fetch hook output: ", e);
-      });
-  }, [logref]);
-
-  return <pre>{output}</pre>;
-};
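
The check and prune task changes above share one pattern: publish a live logref before the long-running restic command starts (so the UI can tail it over the now-streaming GetLogs RPC), stream command output through the LogrefWriter, then replace the logref with the frozen ID returned by Close. A condensed Go sketch of that flow; runWithLiveLog and doWork are illustrative and not part of this patch.

package example

import (
	"context"
	"fmt"
	"io"

	v1 "github.com/garethgeorge/backrest/gen/go/v1"
	"github.com/garethgeorge/backrest/internal/orchestrator/tasks"
)

// runWithLiveLog mirrors the flow taskcheck.go and taskprune.go follow in this patch.
// doWork stands in for repo.Check / repo.Prune; opCheck is the operation's oneof wrapper.
func runWithLiveLog(ctx context.Context, runner tasks.TaskRunner, op *v1.Operation, opCheck *v1.Operation_OperationCheck, doWork func(context.Context, io.Writer) error) error {
	liveID, writer, err := runner.LogrefWriter()
	if err != nil {
		return fmt.Errorf("create logref writer: %w", err)
	}
	// Point the in-progress operation at the live log so readers can tail it immediately.
	opCheck.OperationCheck.OutputLogref = liveID
	if err := runner.UpdateOperation(op); err != nil {
		return fmt.Errorf("update operation: %w", err)
	}

	// Stream command output straight into the live log as it is produced.
	if err := doWork(ctx, writer); err != nil {
		return fmt.Errorf("run: %w", err)
	}

	// Close finalizes the live log and returns the frozen ID to persist with the operation.
	frozenID, err := writer.Close()
	if err != nil {
		return fmt.Errorf("close logref writer: %w", err)
	}
	opCheck.OperationCheck.OutputLogref = frozenID
	return nil
}
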