mirror of
https://github.com/garethgeorge/backrest.git
synced 2026-05-06 04:50:35 +00:00
261 lines
6.6 KiB
Go
261 lines
6.6 KiB
Go
package oplog
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"slices"
|
|
"sync"
|
|
|
|
v1 "github.com/garethgeorge/backrest/gen/go/v1"
|
|
)
|
|
|
|
type OperationEvent int
|
|
|
|
const (
|
|
OPERATION_ADDED OperationEvent = iota
|
|
OPERATION_UPDATED
|
|
OPERATION_DELETED
|
|
)
|
|
|
|
var (
|
|
ErrStopIteration = errors.New("stop iteration")
|
|
ErrNotExist = errors.New("operation does not exist")
|
|
ErrExist = errors.New("operation already exists")
|
|
ErrNoResults = errors.New("no results found")
|
|
|
|
NullOPID = int64(0)
|
|
)
|
|
|
|
type Subscription = func(ops []*v1.Operation, event OperationEvent)
|
|
|
|
type subAndQuery struct {
|
|
f *Subscription
|
|
q Query
|
|
}
|
|
|
|
type OpLog struct {
|
|
store OpStore
|
|
|
|
subscribersMu sync.Mutex
|
|
subscribers []subAndQuery
|
|
}
|
|
|
|
func NewOpLog(store OpStore) (*OpLog, error) {
|
|
o := &OpLog{
|
|
store: store,
|
|
}
|
|
|
|
return o, nil
|
|
}
|
|
|
|
func (o *OpLog) curSubscribers() []subAndQuery {
|
|
o.subscribersMu.Lock()
|
|
defer o.subscribersMu.Unlock()
|
|
return slices.Clone(o.subscribers)
|
|
}
|
|
|
|
func (o *OpLog) notify(ops []*v1.Operation, event OperationEvent) {
|
|
for _, sub := range o.curSubscribers() {
|
|
notifyOps := make([]*v1.Operation, 0, len(ops))
|
|
for _, op := range ops {
|
|
if sub.q.Match(op) {
|
|
notifyOps = append(notifyOps, op)
|
|
}
|
|
}
|
|
if len(notifyOps) > 0 {
|
|
(*sub.f)(notifyOps, event)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *OpLog) Query(q Query, f func(*v1.Operation) error) error {
|
|
return o.store.Query(q, f)
|
|
}
|
|
|
|
func (o *OpLog) FindOne(q Query) (*v1.Operation, error) {
|
|
var found *v1.Operation
|
|
err := o.store.Query(q, func(op *v1.Operation) error {
|
|
if found != nil {
|
|
return errors.New("more than one operation found")
|
|
}
|
|
found = op
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if found == nil {
|
|
return nil, ErrNoResults
|
|
}
|
|
return found, nil
|
|
}
|
|
|
|
func (o *OpLog) QueryMetadata(q Query, f func(OpMetadata) error) error {
|
|
return o.store.QueryMetadata(q, f)
|
|
}
|
|
|
|
func (o *OpLog) FindOneMetadata(q Query) (OpMetadata, error) {
|
|
var found OpMetadata
|
|
err := o.store.QueryMetadata(q, func(op OpMetadata) error {
|
|
if found.ID != 0 {
|
|
return errors.New("more than one operation found")
|
|
}
|
|
found = op
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return OpMetadata{}, err
|
|
}
|
|
if found.ID == 0 {
|
|
return OpMetadata{}, ErrNoResults
|
|
}
|
|
return found, nil
|
|
}
|
|
|
|
func (o *OpLog) Subscribe(q Query, f *Subscription) {
|
|
o.subscribersMu.Lock()
|
|
defer o.subscribersMu.Unlock()
|
|
o.subscribers = append(o.subscribers, subAndQuery{f: f, q: q})
|
|
}
|
|
|
|
func (o *OpLog) Unsubscribe(f *Subscription) error {
|
|
o.subscribersMu.Lock()
|
|
defer o.subscribersMu.Unlock()
|
|
for i, sub := range o.subscribers {
|
|
if sub.f == f {
|
|
o.subscribers = append(o.subscribers[:i], o.subscribers[i+1:]...)
|
|
return nil
|
|
}
|
|
}
|
|
return errors.New("subscription not found")
|
|
}
|
|
|
|
func (o *OpLog) Get(opID int64) (*v1.Operation, error) {
|
|
return o.store.Get(opID)
|
|
}
|
|
|
|
func (o *OpLog) Add(ops ...*v1.Operation) error {
|
|
for _, o := range ops {
|
|
if o.Id != 0 || o.Modno != 0 {
|
|
return errors.New("operation already has an ID or Modno, OpLog.Add is expected to set the ID/Modno")
|
|
}
|
|
}
|
|
if err := o.store.Add(ops...); err != nil {
|
|
return err
|
|
}
|
|
|
|
o.notify(ops, OPERATION_ADDED)
|
|
return nil
|
|
}
|
|
|
|
func (o *OpLog) Update(ops ...*v1.Operation) error {
|
|
for _, o := range ops {
|
|
if o.Id == 0 {
|
|
return errors.New("operation does not have an ID, OpLog.Update is expected to have an ID")
|
|
}
|
|
}
|
|
|
|
if err := o.store.Update(ops...); err != nil {
|
|
return err
|
|
}
|
|
|
|
o.notify(ops, OPERATION_UPDATED)
|
|
return nil
|
|
}
|
|
|
|
// SetOptions configures the behavior of Set.
|
|
type SetOptions struct {
|
|
InsertOnly bool // If true, only insert; fail if the operation already exists (Id != 0).
|
|
UpdateOnly bool // If true, only update; fail if the operation does not exist (Id == 0).
|
|
AllocateID bool // If true, allocate a new Id for operations with Id == 0.
|
|
}
|
|
|
|
func (o *OpLog) Set(opts SetOptions, ops ...*v1.Operation) error {
|
|
if opts.InsertOnly && opts.UpdateOnly {
|
|
return errors.New("InsertOnly and UpdateOnly are mutually exclusive")
|
|
}
|
|
for _, op := range ops {
|
|
if opts.InsertOnly && op.Id != 0 {
|
|
return fmt.Errorf("InsertOnly but operation already has Id %d", op.Id)
|
|
}
|
|
if opts.UpdateOnly && op.Id == 0 {
|
|
return errors.New("UpdateOnly but operation has no Id")
|
|
}
|
|
}
|
|
|
|
isNew := make([]bool, len(ops))
|
|
for i, op := range ops {
|
|
isNew[i] = op.Id == 0
|
|
}
|
|
if err := o.store.Set(opts, ops...); err != nil {
|
|
return err
|
|
}
|
|
for i, op := range ops {
|
|
if isNew[i] {
|
|
o.notify([]*v1.Operation{op}, OPERATION_ADDED)
|
|
} else {
|
|
o.notify([]*v1.Operation{op}, OPERATION_UPDATED)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *OpLog) Delete(opID ...int64) error {
|
|
removedOps, err := o.store.Delete(opID...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
o.notify(removedOps, OPERATION_DELETED)
|
|
return nil
|
|
}
|
|
|
|
func (o *OpLog) Transform(q Query, f func(*v1.Operation) (*v1.Operation, error)) error {
|
|
return o.store.Transform(q, f)
|
|
}
|
|
|
|
func (o *OpLog) GetHighestOpIDAndModno(q Query) (int64, int64, error) {
|
|
return o.store.GetHighestOpIDAndModno(q)
|
|
}
|
|
|
|
type OpStore interface {
|
|
// Query returns all operations that match the query.
|
|
Query(q Query, f func(*v1.Operation) error) error
|
|
// QueryMetadata is like Query, but only returns metadata about the operations.
|
|
// this is useful for very high performance scans that don't deserialize the operation itself.
|
|
QueryMetadata(q Query, f func(OpMetadata) error) error
|
|
// Get returns the operation with the given ID.
|
|
Get(opID int64) (*v1.Operation, error)
|
|
// GetHighestOpIDAndModno returns the highest operation ID and modno in the store, used for synchronization.
|
|
GetHighestOpIDAndModno(q Query) (int64, int64, error)
|
|
// Add adds the given operations to the store.
|
|
Add(op ...*v1.Operation) error
|
|
// Update updates the given operations in the store.
|
|
Update(op ...*v1.Operation) error
|
|
// Set inserts or updates operations. Zero-valued fields (Id, Modno, FlowId) are
|
|
// allocated automatically (like Add/Update), but non-zero values provided by the
|
|
// caller are preserved. If Id is non-zero, it updates; if Id is zero, it inserts.
|
|
Set(opts SetOptions, op ...*v1.Operation) error
|
|
// Delete removes the operations with the given IDs from the store, and returns the removed operations.
|
|
Delete(opID ...int64) ([]*v1.Operation, error)
|
|
// Transform applies the given function to each operation that matches the query.
|
|
Transform(q Query, f func(*v1.Operation) (*v1.Operation, error)) error
|
|
// Version returns the current data version
|
|
Version() (int64, error)
|
|
// SetVersion sets the data version
|
|
SetVersion(version int64) error
|
|
}
|
|
|
|
// OpMetadata is a struct that contains metadata about an operation without fetching the operation itself.
|
|
type OpMetadata struct {
|
|
ID int64
|
|
FlowID int64
|
|
Modno int64
|
|
OriginalID int64
|
|
OriginalFlowID int64
|
|
Status v1.OperationStatus
|
|
RepoID string
|
|
RepoGUID string
|
|
PlanID string
|
|
}
|