package logstore

import (
	"bytes"
	"compress/gzip"
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/hashicorp/go-multierror"
	"go.uber.org/zap"
	"zombiezen.com/go/sqlite"
	"zombiezen.com/go/sqlite/sqlitex"
)

// ErrLogNotFound is returned when the requested log ID does not exist.
var ErrLogNotFound = errors.New("log not found")

// LogStore stores logs in a sqlite database. While a log is being written it
// lives as a plain file under <dir>/.inprogress referenced by the `data_fname`
// column; once its writer closes, the contents are gzip-compressed into the
// `data_gz` column and the temp file is removed. Readers opened on an
// in-progress log tail the temp file and are woken by the subscriber channels
// as new data arrives.
type LogStore struct {
	dir           string
	inprogressDir string
	mu            shardedRWMutex
	dbpool        *sqlitex.Pool

	trackingMu  sync.Mutex                 // guards refcount and subscribers
	refcount    map[string]int             // id : count of open writers+readers holding the temp file
	subscribers map[string][]chan struct{} // id : channels closed to signal "new data written"
}

// NewLogStore opens (or creates) a log store rooted at dir.
func NewLogStore(dir string) (*LogStore, error) {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, fmt.Errorf("create dir: %w", err)
	}

	dbpath := filepath.Join(dir, "logs.sqlite")
	dbpool, err := sqlitex.NewPool(dbpath, sqlitex.PoolOptions{
		PoolSize: 16,
		Flags:    sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL,
	})
	if err != nil {
		return nil, fmt.Errorf("open sqlite pool: %w", err)
	}

	ls := &LogStore{
		dir:           dir,
		inprogressDir: filepath.Join(dir, ".inprogress"),
		mu:            newShardedRWMutex(64), // 64 shards should be enough to avoid much contention
		dbpool:        dbpool,
		subscribers:   make(map[string][]chan struct{}),
		refcount:      make(map[string]int),
	}
	if err := ls.init(); err != nil {
		return nil, fmt.Errorf("init log store: %w", err)
	}
	return ls, nil
}

// init creates the schema and finalizes any in-progress log files left behind
// by a previous run (e.g. after a crash): files still referenced by the
// database are compressed into it, and all temp files are then removed.
func (ls *LogStore) init() error {
	if err := os.MkdirAll(ls.inprogressDir, 0755); err != nil {
		return fmt.Errorf("create inprogress dir: %w", err)
	}

	conn, err := ls.dbpool.Take(context.Background())
	if err != nil {
		return fmt.Errorf("take connection: %w", err)
	}
	defer ls.dbpool.Put(conn)

	if err := sqlitex.ExecuteScript(conn, `
PRAGMA auto_vacuum = 1;
PRAGMA journal_mode=WAL;

CREATE TABLE IF NOT EXISTS logs (
	id TEXT PRIMARY KEY,
	expiration_ts_unix INTEGER DEFAULT 0, -- unix timestamp of when the log will expire
	owner_opid INTEGER DEFAULT 0, -- id of the operation that owns this log; will be used for cleanup.
	data_fname TEXT, -- relative path to the file containing the log data
	data_gz BLOB -- compressed log data as an alternative to data_fname
);
CREATE INDEX IF NOT EXISTS logs_data_fname_idx ON logs (data_fname);
CREATE INDEX IF NOT EXISTS logs_expiration_ts_unix_idx ON logs (expiration_ts_unix);

CREATE TABLE IF NOT EXISTS version_info (
	version INTEGER NOT NULL
);
-- Create a table to store the schema version, will be used for migrations in the future
INSERT INTO version_info (version)
SELECT 0 WHERE NOT EXISTS (SELECT 1 FROM version_info);
`, nil); err != nil {
		return fmt.Errorf("execute init script: %w", err)
	}

	// loop through all inprogress files and finalize them if they are in the database
	files, err := os.ReadDir(ls.inprogressDir)
	if err != nil {
		return fmt.Errorf("read inprogress dir: %w", err)
	}
	for _, file := range files {
		if file.IsDir() {
			continue
		}
		fname := file.Name()

		var id string
		if err := sqlitex.ExecuteTransient(conn, "SELECT id FROM logs WHERE data_fname = ?", &sqlitex.ExecOptions{
			Args: []any{fname},
			ResultFunc: func(stmt *sqlite.Stmt) error {
				id = stmt.ColumnText(0)
				return nil
			},
		}); err != nil {
			return fmt.Errorf("select log: %w", err)
		}

		if id != "" {
			// The file is still referenced; compress it into the database
			// before deleting it. On failure, keep the file so a later run
			// can retry the finalization.
			if err := ls.finalizeLogFile(id, fname); err != nil {
				zap.S().Warnf("sqlite log writer couldn't finalize dangling inprogress log file %v: %v", fname, err)
				continue
			}
		}
		if err := os.Remove(filepath.Join(ls.inprogressDir, fname)); err != nil {
			zap.S().Warnf("sqlite log writer couldn't remove dangling inprogress log file %v: %v", fname, err)
		}
	}

	return nil
}

// Close releases the sqlite connection pool.
func (ls *LogStore) Close() error {
	return ls.dbpool.Close()
}

// Create inserts a new log row owned by parentOpID and returns a writer for
// its data. A ttl of 0 means the log never expires. Expired logs are
// opportunistically pruned on each call.
func (ls *LogStore) Create(id string, parentOpID int64, ttl time.Duration) (io.WriteCloser, error) {
	ls.mu.Lock(id)
	defer ls.mu.Unlock(id)

	conn, err := ls.dbpool.Take(context.Background())
	if err != nil {
		return nil, fmt.Errorf("take connection: %w", err)
	}
	defer ls.dbpool.Put(conn)

	// potentially prune any expired logs; expiration_ts_unix == 0 means "never expires"
	if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE expiration_ts_unix < ? AND expiration_ts_unix != 0", &sqlitex.ExecOptions{
		Args: []any{time.Now().Unix()},
	}); err != nil {
		return nil, fmt.Errorf("prune expired logs: %w", err)
	}

	// create a random file for the log while it's being written
	randBytes := make([]byte, 16)
	if _, err := rand.Read(randBytes); err != nil {
		return nil, fmt.Errorf("generate random bytes: %w", err)
	}
	fname := hex.EncodeToString(randBytes) + ".log"

	f, err := os.Create(filepath.Join(ls.inprogressDir, fname))
	if err != nil {
		return nil, fmt.Errorf("create temp file: %w", err)
	}

	expireTS := time.Unix(0, 0) // Unix() == 0, i.e. never expires
	if ttl != 0 {
		expireTS = time.Now().Add(ttl)
	}

	if err := sqlitex.ExecuteTransient(conn, "INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (?, ?, ?, ?)", &sqlitex.ExecOptions{
		Args: []any{id, expireTS.Unix(), parentOpID, fname},
	}); err != nil {
		return nil, fmt.Errorf("insert log: %w", err)
	}

	ls.trackingMu.Lock()
	ls.subscribers[id] = make([]chan struct{}, 0)
	ls.refcount[id] = 1 // the writer holds one reference to the temp file
	ls.trackingMu.Unlock()

	return &writer{
		ls:    ls,
		f:     f,
		fname: fname,
		id:    id,
	}, nil
}

// Open returns a reader for the log's data. For a still-in-progress log the
// reader tails the temp file (blocking at EOF until more data is written or
// the reader is closed); for a finalized log it decompresses data_gz.
func (ls *LogStore) Open(id string) (io.ReadCloser, error) {
	ls.mu.Lock(id)
	defer ls.mu.Unlock(id)

	conn, err := ls.dbpool.Take(context.Background())
	if err != nil {
		return nil, fmt.Errorf("take connection: %w", err)
	}
	defer ls.dbpool.Put(conn)

	var found bool
	var fname string
	var dataGz []byte
	if err := sqlitex.ExecuteTransient(conn, "SELECT data_fname, data_gz FROM logs WHERE id = ?", &sqlitex.ExecOptions{
		Args: []any{id},
		ResultFunc: func(stmt *sqlite.Stmt) error {
			found = true
			if !stmt.ColumnIsNull(0) {
				fname = stmt.ColumnText(0)
			}
			if !stmt.ColumnIsNull(1) {
				dataGz = make([]byte, stmt.ColumnLen(1))
				stmt.ColumnBytes(1, dataGz)
			}
			return nil
		},
	}); err != nil {
		return nil, fmt.Errorf("select log: %w", err)
	} else if !found {
		return nil, ErrLogNotFound
	}

	if fname != "" {
		// Log is still being written; tail the temp file.
		f, err := os.Open(filepath.Join(ls.inprogressDir, fname))
		if err != nil {
			return nil, fmt.Errorf("open data file: %w", err)
		}
		ls.trackingMu.Lock()
		ls.refcount[id]++
		ls.trackingMu.Unlock()
		return &reader{
			ls:     ls,
			f:      f,
			id:     id,
			fname:  fname,
			closed: make(chan struct{}),
		}, nil
	} else if dataGz != nil {
		gzr, err := gzip.NewReader(bytes.NewReader(dataGz))
		if err != nil {
			return nil, fmt.Errorf("create gzip reader: %w", err)
		}
		return gzr, nil
	}
	return nil, errors.New("log has no associated data. This shouldn't be possible")
}

// Delete removes the log row for id; returns ErrLogNotFound if absent.
// NOTE(review): this does not remove a still-in-progress temp file referenced
// by data_fname — it is cleaned up by init() on the next start. Confirm this
// is intentional.
func (ls *LogStore) Delete(id string) error {
	ls.mu.Lock(id)
	defer ls.mu.Unlock(id)

	conn, err := ls.dbpool.Take(context.Background())
	if err != nil {
		return fmt.Errorf("take connection: %w", err)
	}
	defer ls.dbpool.Put(conn)

	if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE id = ?", &sqlitex.ExecOptions{
		Args: []any{id},
	}); err != nil {
		return fmt.Errorf("delete log: %w", err)
	}
	if conn.Changes() == 0 {
		return ErrLogNotFound
	}
	return nil
}

// DeleteWithParent removes all logs owned by the given operation ID.
func (ls *LogStore) DeleteWithParent(parentOpID int64) error {
	conn, err := ls.dbpool.Take(context.Background())
	if err != nil {
		return fmt.Errorf("take connection: %w", err)
	}
	defer ls.dbpool.Put(conn)

	if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE owner_opid = ?", &sqlitex.ExecOptions{
		Args: []any{parentOpID},
	}); err != nil {
		return fmt.Errorf("delete log: %w", err)
	}
	return nil
}

// SelectAll invokes f for every log, ordered by owner operation ID.
func (ls *LogStore) SelectAll(f func(id string, parentID int64)) error {
	conn, err := ls.dbpool.Take(context.Background())
	if err != nil {
		return fmt.Errorf("take connection: %w", err)
	}
	defer ls.dbpool.Put(conn)

	return sqlitex.ExecuteTransient(conn, "SELECT id, owner_opid FROM logs ORDER BY owner_opid", &sqlitex.ExecOptions{
		ResultFunc: func(stmt *sqlite.Stmt) error {
			f(stmt.ColumnText(0), stmt.ColumnInt64(1))
			return nil
		},
	})
}

// subscribe registers interest in new writes for id. It returns a channel
// that is closed on the next write, or nil if the log's writer has already
// finished (no subscriber entry exists).
func (ls *LogStore) subscribe(id string) chan struct{} {
	ls.trackingMu.Lock()
	defer ls.trackingMu.Unlock()
	subs, ok := ls.subscribers[id]
	if !ok {
		return nil
	}
	ch := make(chan struct{})
	ls.subscribers[id] = append(subs, ch)
	return ch
}

// notify wakes all current subscribers of id by closing their channels, then
// resets the subscriber list (keeping its backing array for reuse).
func (ls *LogStore) notify(id string) {
	ls.trackingMu.Lock()
	defer ls.trackingMu.Unlock()
	subs, ok := ls.subscribers[id]
	if !ok {
		return
	}
	for _, ch := range subs {
		close(ch)
	}
	ls.subscribers[id] = subs[:0]
}

// finalizeLogFile gzip-compresses the temp file's contents into the log's
// data_gz column and clears data_fname. The temp file itself is NOT removed
// here; callers do that via maybeReleaseTempFile or init's cleanup.
func (ls *LogStore) finalizeLogFile(id string, fname string) error {
	conn, err := ls.dbpool.Take(context.Background())
	if err != nil {
		return fmt.Errorf("take connection: %w", err)
	}
	defer ls.dbpool.Put(conn)

	f, err := os.Open(filepath.Join(ls.inprogressDir, fname))
	if err != nil {
		return err
	}
	defer f.Close()

	var dataGz bytes.Buffer
	gzw := gzip.NewWriter(&dataGz)
	if _, e := io.Copy(gzw, f); e != nil {
		return fmt.Errorf("compress log: %w", e)
	}
	// BUG fix: the original wrapped `err` (nil here) instead of `e`,
	// discarding the actual gzip close error.
	if e := gzw.Close(); e != nil {
		return fmt.Errorf("close gzip writer: %w", e)
	}

	if e := sqlitex.ExecuteTransient(conn, "UPDATE logs SET data_fname = NULL, data_gz = ? WHERE id = ?", &sqlitex.ExecOptions{
		Args: []any{dataGz.Bytes(), id},
	}); e != nil {
		return fmt.Errorf("update log: %w", e)
	} else if conn.Changes() != 1 {
		return fmt.Errorf("expected 1 row to be updated, got %d", conn.Changes())
	}
	return nil
}

// maybeReleaseTempFile deletes the temp file for id once no writer or reader
// holds a reference (i.e. the refcount entry has been removed).
func (ls *LogStore) maybeReleaseTempFile(id, fname string) error {
	ls.trackingMu.Lock()
	defer ls.trackingMu.Unlock()
	if _, ok := ls.refcount[id]; ok {
		return nil // still referenced
	}
	return os.Remove(filepath.Join(ls.inprogressDir, fname))
}

// writer appends to a log's in-progress temp file and finalizes the log into
// the database when closed.
type writer struct {
	ls      *LogStore
	id      string
	fname   string
	f       *os.File
	onClose sync.Once
}

var _ io.WriteCloser = (*writer)(nil)

// Write appends to the temp file and wakes any blocked readers.
func (w *writer) Write(p []byte) (n int, err error) {
	w.ls.mu.Lock(w.id)
	defer w.ls.mu.Unlock(w.id)
	n, err = w.f.Write(p)
	if n != 0 {
		w.ls.notify(w.id)
	}
	return
}

// Close finalizes the log: the temp file's contents are compressed into the
// database, all subscribers are closed (no more writes are coming), the
// writer's reference is dropped, and the temp file is removed once no readers
// remain.
func (w *writer) Close() error {
	err := w.f.Close()
	w.onClose.Do(func() {
		w.ls.mu.Lock(w.id)
		defer w.ls.mu.Unlock(w.id)
		defer w.ls.notify(w.id)

		finalized := true
		if e := w.ls.finalizeLogFile(w.id, w.fname); e != nil {
			// Keep the writer's reference (and the temp file) on failure so
			// init() can retry finalization on the next start.
			err = multierror.Append(err, fmt.Errorf("finalize %v: %w", w.fname, e))
			finalized = false
		}

		// Manually close all subscribers and delete the subscriber entry from
		// the map; there are no more writes coming.
		w.ls.trackingMu.Lock()
		if finalized {
			// BUG fix: the refcount decrement must happen under trackingMu
			// (the original mutated the map before taking the lock).
			w.ls.refcount[w.id]--
		}
		if w.ls.refcount[w.id] == 0 {
			delete(w.ls.refcount, w.id)
		}
		for _, ch := range w.ls.subscribers[w.id] {
			close(ch)
		}
		delete(w.ls.subscribers, w.id)
		w.ls.trackingMu.Unlock()

		// Best-effort removal; surface failures instead of silently dropping
		// them (the file will otherwise be cleaned up by init on restart).
		if e := w.ls.maybeReleaseTempFile(w.id, w.fname); e != nil {
			zap.S().Warnf("sqlite log writer couldn't release temp file %v: %v", w.fname, e)
		}
	})
	return err
}

// reader tails a log's in-progress temp file, blocking at EOF until the
// writer appends more data or the reader is closed.
type reader struct {
	ls      *LogStore
	id      string
	fname   string
	f       *os.File
	onClose sync.Once
	closed  chan struct{} // unblocks any read calls e.g. can be used for early cancellation
}

var _ io.ReadCloser = (*reader)(nil)

// Read reads from the temp file. At EOF it subscribes for the next write and
// blocks until notified (or the reader is closed); a nil subscription means
// the writer has finished and EOF is final.
func (r *reader) Read(p []byte) (n int, err error) {
	r.ls.mu.RLock(r.id)
	n, err = r.f.Read(p)
	if err == io.EOF {
		waiter := r.ls.subscribe(r.id)
		r.ls.mu.RUnlock(r.id)
		if waiter != nil {
			select {
			case <-waiter:
				// more data was written; fall through and retry the read
			case <-r.closed:
				return 0, io.EOF
			}
		}
		r.ls.mu.RLock(r.id)
		n, err = r.f.Read(p)
	}
	r.ls.mu.RUnlock(r.id)
	return
}

// Close drops the reader's reference, removes the temp file if it was the
// last holder, and unblocks any in-flight Read.
func (r *reader) Close() error {
	r.ls.mu.Lock(r.id)
	defer r.ls.mu.Unlock(r.id)
	err := r.f.Close()
	r.onClose.Do(func() {
		r.ls.trackingMu.Lock()
		r.ls.refcount[r.id]--
		if r.ls.refcount[r.id] == 0 {
			delete(r.ls.refcount, r.id)
		}
		r.ls.trackingMu.Unlock()
		r.ls.maybeReleaseTempFile(r.id, r.fname)
		close(r.closed)
	})
	return err
}