feat: operations IDs are ordered by operation timestamp

This commit is contained in:
Gareth George
2023-11-23 00:53:06 -08:00
parent 338b6f2fdf
commit a1ed6f90ba
4 changed files with 53 additions and 29 deletions

View File

@@ -57,7 +57,10 @@ func NewOpLog(databasePath string) (*OpLog, error) {
return nil, fmt.Errorf("error opening database: %s", err) return nil, fmt.Errorf("error opening database: %s", err)
} }
o := &OpLog{db: db} o := &OpLog{
db: db,
}
o.nextId.Store(1)
if err := db.Update(func(tx *bolt.Tx) error { if err := db.Update(func(tx *bolt.Tx) error {
// Create the buckets if they don't exist // Create the buckets if they don't exist
@@ -194,15 +197,15 @@ func (o *OpLog) getOperationHelper(b *bolt.Bucket, id int64) (*v1.Operation, err
func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error { func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
b := tx.Bucket(OpLogBucket) b := tx.Bucket(OpLogBucket)
if op.Id == 0 { if op.Id == 0 {
// Create a unique ID sorted based on the start time in milliseconds and seq, err := b.NextSequence()
// a counter to ensure uniqueness in the case of multiple operations if err != nil {
// starting at the same time. return fmt.Errorf("error getting next sequence: %w", err)
op.Id = op.UnixTimeStartMs<<20 | (o.nextId.Add(1) & (1<<20 - 1))
if op.Id < 0 {
return fmt.Errorf("overflow in operation ID generation")
} }
if op.UnixTimeStartMs == 0 {
return fmt.Errorf("operation must have a start time")
}
op.Id = op.UnixTimeStartMs<<20 | int64(seq&(1<<20-1))
} }
op.SnapshotId = NormalizeSnapshotId(op.SnapshotId) op.SnapshotId = NormalizeSnapshotId(op.SnapshotId)

View File

@@ -47,6 +47,21 @@ func (t *ScheduledBackupTask) Name() string {
} }
func (t *ScheduledBackupTask) Next(now time.Time) *time.Time { func (t *ScheduledBackupTask) Next(now time.Time) *time.Time {
if ops, err := t.orchestrator.OpLog.GetByPlan(t.plan.Id, indexutil.CollectLastN(10)); err == nil {
var lastBackupOp *v1.Operation
for _, op := range ops {
if _, ok := op.Op.(*v1.Operation_OperationBackup); ok {
lastBackupOp = op
}
}
if lastBackupOp != nil {
now = time.Unix(0, lastBackupOp.UnixTimeEndMs*int64(time.Millisecond))
}
} else {
zap.S().Errorf("error getting last operation for plan %q when computing backup schedule: %v", t.plan.Id, err)
}
next := t.schedule.Next(now) next := t.schedule.Next(now)
return &next return &next
} }
@@ -174,6 +189,7 @@ func indexSnapshotsHelper(ctx context.Context, orchestrator *Orchestrator, plan
UnixTimeStartMs: snapshotProto.UnixTimeMs, UnixTimeStartMs: snapshotProto.UnixTimeMs,
UnixTimeEndMs: snapshotProto.UnixTimeMs, UnixTimeEndMs: snapshotProto.UnixTimeMs,
Status: v1.OperationStatus_STATUS_SUCCESS, Status: v1.OperationStatus_STATUS_SUCCESS,
SnapshotId: snapshotProto.Id,
Op: &v1.Operation_OperationIndexSnapshot{ Op: &v1.Operation_OperationIndexSnapshot{
OperationIndexSnapshot: &v1.OperationIndexSnapshot{ OperationIndexSnapshot: &v1.OperationIndexSnapshot{
Snapshot: snapshotProto, Snapshot: snapshotProto,
@@ -196,15 +212,6 @@ func indexSnapshotsHelper(ctx context.Context, orchestrator *Orchestrator, plan
return err return err
} }
func containsSnapshotOperation(ops []*v1.Operation) bool {
for _, op := range ops {
if _, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
return true
}
}
return false
}
// WithOperation is a utility that creates an operation to track the function's execution. // WithOperation is a utility that creates an operation to track the function's execution.
// timestamps are automatically added and the status is automatically updated if an error occurs. // timestamps are automatically added and the status is automatically updated if an error occurs.
func WithOperation(oplog *oplog.OpLog, op *v1.Operation, do func() error) error { func WithOperation(oplog *oplog.OpLog, op *v1.Operation, do func() error) error {
@@ -233,3 +240,12 @@ func curTimeMillis() int64 {
t := time.Now() t := time.Now()
return t.Unix()*1000 + int64(t.Nanosecond()/1000000) return t.Unix()*1000 + int64(t.Nanosecond()/1000000)
} }
func containsSnapshotOperation(ops []*v1.Operation) bool {
for _, op := range ops {
if _, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
return true
}
}
return false
}

View File

@@ -12,13 +12,18 @@ message OperationList {
message Operation { message Operation {
int64 id = 1; int64 id = 1;
string repo_id = 2; // repo id if associated with a repo (always true) // repo id if associated with a repo (always true)
string plan_id = 3; // plan id if associated with a plan (always true) string repo_id = 2;
string snapshot_id = 8; // snapshot id if associated with a snapshot. // plan id if associated with a plan (always true)
string plan_id = 3;
// snapshot id if associated with a snapshot.
string snapshot_id = 8;
OperationStatus status = 4; OperationStatus status = 4;
int64 unix_time_start_ms = 5; // unix time in milliseconds of the operation's creation (ID is derived from this)
int64 unix_time_start_ms = 5;
int64 unix_time_end_ms = 6; int64 unix_time_end_ms = 6;
string display_message = 7; // human readable context message (if any) // human readable context message, typically an error message.
string display_message = 7;
oneof op { oneof op {
OperationBackup operation_backup = 100; OperationBackup operation_backup = 100;

View File

@@ -100,17 +100,17 @@ export const OperationRow = ({
color = "blue"; color = "blue";
} }
let opType = "Message";
if (operation.operationBackup) {
opType = "Backup";
} else if (operation.operationIndexSnapshot) {
opType = "Snapshot";
}
if ( if (
operation.displayMessage && operation.displayMessage &&
operation.status === OperationStatus.STATUS_ERROR operation.status === OperationStatus.STATUS_ERROR
) { ) {
let opType = "Message";
if (operation.operationBackup) {
opType = "Backup";
} else if (operation.operationIndexSnapshot) {
opType = "Snapshot";
}
return ( return (
<List.Item> <List.Item>
<List.Item.Meta <List.Item.Meta