fix: race condition in taskcollectgarbage potentially prematurely deletes logs for tasks currently running (#828)
Some checks failed
Build Snapshot Release / build (push) Has been cancelled
Release Please / release-please (push) Has been cancelled
Test / test-nix (push) Has been cancelled
Test / test-win (push) Has been cancelled
Update Restic / update-restic-version (push) Has been cancelled

This commit is contained in:
Gareth
2025-06-29 21:56:33 -07:00
committed by GitHub
parent 6e0c201025
commit cd5814d848
5 changed files with 29 additions and 18 deletions

View File

@@ -849,6 +849,7 @@ func TestRestore(t *testing.T) {
}
func TestRunCommand(t *testing.T) {
testutil.InstallZapLogger(t)
sut := createSystemUnderTest(t, createConfigManager(&v1.Config{
Modno: 1234,
Instance: "test",
@@ -1087,7 +1088,7 @@ func createSystemUnderTest(t *testing.T, config *config.ConfigManager) systemUnd
}
}
h := NewBackrestHandler(config, peerStateManager, orch, oplog, logStore)
h := NewBackrestHandler(config, peerStateManager, orch, oplog, logStore)
return systemUnderTest{
handler: h,

View File

@@ -154,7 +154,7 @@ func (ls *LogStore) Create(id string, parentOpID int64, ttl time.Duration) (io.W
defer ls.dbpool.Put(conn)
// potentially prune any expired logs
if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE expiration_ts_unix < ? AND expiration_ts_unix != 0", &sqlitex.ExecOptions{
if err := sqlitex.Execute(conn, "DELETE FROM logs WHERE expiration_ts_unix < ? AND expiration_ts_unix != 0", &sqlitex.ExecOptions{
Args: []any{time.Now().Unix()},
}); err != nil {
return nil, fmt.Errorf("prune expired logs: %v", err)
@@ -171,15 +171,13 @@ func (ls *LogStore) Create(id string, parentOpID int64, ttl time.Duration) (io.W
return nil, fmt.Errorf("create temp file: %v", err)
}
expire_ts_unix := time.Unix(0, 0)
var expire_ts_unix int64 = 0
if ttl != 0 {
expire_ts_unix = time.Now().Add(ttl)
expire_ts_unix = time.Now().Add(ttl).Unix()
}
// fmt.Printf("INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (%v, %v, %v, %v)\n", id, expire_ts_unix.Unix(), parentOpID, fname)
if err := sqlitex.ExecuteTransient(conn, "INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (?, ?, ?, ?)", &sqlitex.ExecOptions{
Args: []any{id, expire_ts_unix.Unix(), parentOpID, fname},
if err := sqlitex.Execute(conn, "INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (?, ?, ?, ?)", &sqlitex.ExecOptions{
Args: []any{id, expire_ts_unix, parentOpID, fname},
}); err != nil {
return nil, fmt.Errorf("insert log: %v", err)
}
@@ -210,7 +208,7 @@ func (ls *LogStore) Open(id string) (io.ReadCloser, error) {
var found bool
var fname string
var dataGz []byte
if err := sqlitex.ExecuteTransient(conn, "SELECT data_fname, data_gz FROM logs WHERE id = ?", &sqlitex.ExecOptions{
if err := sqlitex.Execute(conn, "SELECT data_fname, data_gz FROM logs WHERE id = ?", &sqlitex.ExecOptions{
Args: []any{id},
ResultFunc: func(stmt *sqlite.Stmt) error {
found = true
@@ -267,7 +265,7 @@ func (ls *LogStore) Delete(id string) error {
}
defer ls.dbpool.Put(conn)
if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE id = ?", &sqlitex.ExecOptions{
if err := sqlitex.Execute(conn, "DELETE FROM logs WHERE id = ?", &sqlitex.ExecOptions{
Args: []any{id},
}); err != nil {
return fmt.Errorf("delete log: %v", err)
@@ -286,7 +284,7 @@ func (ls *LogStore) DeleteWithParent(parentOpID int64) error {
}
defer ls.dbpool.Put(conn)
if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE owner_opid = ?", &sqlitex.ExecOptions{
if err := sqlitex.Execute(conn, "DELETE FROM logs WHERE owner_opid = ?", &sqlitex.ExecOptions{
Args: []any{parentOpID},
}); err != nil {
return fmt.Errorf("delete log: %v", err)
@@ -302,7 +300,7 @@ func (ls *LogStore) SelectAll(f func(id string, parentID int64)) error {
}
defer ls.dbpool.Put(conn)
return sqlitex.ExecuteTransient(conn, "SELECT id, owner_opid FROM logs ORDER BY owner_opid", &sqlitex.ExecOptions{
return sqlitex.Execute(conn, "SELECT id, owner_opid FROM logs ORDER BY owner_opid", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
f(stmt.ColumnText(0), stmt.ColumnInt64(1))
return nil
@@ -364,7 +362,7 @@ func (ls *LogStore) finalizeLogFile(id string, fname string) error {
}); e != nil {
return fmt.Errorf("update log: %v", e)
} else if conn.Changes() != 1 {
return fmt.Errorf("expected 1 row to be updated, got %d", conn.Changes())
return fmt.Errorf("expected 1 row to be updated for %q, got %d", id, conn.Changes())
}
return nil

View File

@@ -572,7 +572,7 @@ func (o *Orchestrator) setupTaskContext(ctx context.Context, op *v1.Operation, c
o.taskCancelMu.Unlock()
// Set up logging
logID := uuid.New().String()
logID := "t-" + uuid.New().String()
var err error
logWriter, err = o.logStore.Create(logID, op.Id, defaultTaskLogDuration)
if err != nil {

View File

@@ -172,7 +172,7 @@ func (t *taskRunnerImpl) Logger(ctx context.Context) *zap.Logger {
}
func (t *taskRunnerImpl) LogrefWriter() (string, io.WriteCloser, error) {
logID := uuid.New().String()
logID := "c-" + uuid.New().String()
writer, err := t.orchestrator.logStore.Create(logID, t.op.GetId(), time.Duration(0))
return logID, writer, err
}

View File

@@ -2,6 +2,7 @@ package tasks
import (
"context"
"errors"
"fmt"
"reflect"
"time"
@@ -212,18 +213,29 @@ func (t *CollectGarbageTask) gcOperations(runner TaskRunner) error {
zap.Any("removed_by_unknown_peer_keyid", deletedByUnknownPeerKeyid))
// cleaning up logstore
toDelete := []string{}
toDelete := make(map[string]int64)
if err := t.logstore.SelectAll(func(id string, parentID int64) {
if parentID == 0 {
return
}
if _, ok := validIDs[parentID]; !ok {
toDelete = append(toDelete, id)
toDelete[id] = parentID // this logstore entry is orphaned, mark it for deletion
}
}); err != nil {
return fmt.Errorf("selecting all logstore entries: %w", err)
}
for _, id := range toDelete {
for id, parentID := range toDelete {
// Confirm that the ID is invalid by trying to get it from the oplog
if _, err := runner.GetOperation(parentID); !errors.Is(err, oplog.ErrNotExist) {
if err != nil {
zap.L().Error("getting operation for logstore entry", zap.String("id", id), zap.Int64("parent_id", parentID), zap.Error(err))
continue
}
zap.L().Debug("logstore entry is still valid, skipping deletion", zap.String("id", id), zap.Error(err))
continue
}
// The logstore entry is orphaned, delete it
if err := t.logstore.Delete(id); err != nil {
zap.L().Error("deleting logstore entry", zap.String("id", id), zap.Error(err))
}