mirror of
https://github.com/HeyPuter/puter.git
synced 2026-05-04 08:30:39 +00:00
99d96edd9c
* feat: s3 fs in oss * feat: new endpoints in OSS * fix: name of fs extension * perf: signed uploads * fix: await socket events to align fs events * fix: default bucket names * fix: backend tests * fix: deps * fix: order
611 lines
19 KiB
JavaScript
611 lines
19 KiB
JavaScript
import { TeePromise } from 'teepromise';
|
|
import BaseOperation from './BaseOperation.js';
|
|
import Delete from './Delete.js';
|
|
import Insert from './Insert.js';
|
|
import Update from './Update.js';
|
|
|
|
const { db } = extension.import('data');
|
|
const svc_params = extension.import('service:params');
|
|
|
|
const { PuterPath } = extension.import('fs');
|
|
|
|
const {
|
|
RootNodeSelector,
|
|
NodeChildSelector,
|
|
NodeUIDSelector,
|
|
NodePathSelector,
|
|
NodeInternalIDSelector,
|
|
} = extension.import('core').fs.selectors;
|
|
|
|
export default class FSEntryController {
|
|
static CONCERN = 'filesystem';
|
|
|
|
static STATUS_READY = {};
|
|
static STATUS_RUNNING_JOB = {};
|
|
|
|
constructor () {
|
|
this.status = FSEntryController.STATUS_READY;
|
|
|
|
this.currentState = {
|
|
queue: [],
|
|
updating_uuids: {},
|
|
};
|
|
this.deferredState = {
|
|
queue: [],
|
|
updating_uuids: {},
|
|
};
|
|
|
|
this.entryListeners_ = {};
|
|
|
|
this.mkPromiseForQueueSize_();
|
|
|
|
// this list of properties is for read operations
|
|
// (originally in FSEntryFetcher)
|
|
this.defaultProperties = [
|
|
'id',
|
|
'associated_app_id',
|
|
'uuid',
|
|
'public_token',
|
|
'bucket',
|
|
'bucket_region',
|
|
'file_request_token',
|
|
'user_id',
|
|
'parent_uid',
|
|
'is_dir',
|
|
'is_public',
|
|
'is_shortcut',
|
|
'is_symlink',
|
|
'symlink_path',
|
|
'shortcut_to',
|
|
'sort_by',
|
|
'sort_order',
|
|
'immutable',
|
|
'name',
|
|
'metadata',
|
|
'modified',
|
|
'created',
|
|
'accessed',
|
|
'size',
|
|
'layout',
|
|
'path',
|
|
];
|
|
|
|
this.subdomainProperties = [
|
|
'uuid',
|
|
'subdomain',
|
|
];
|
|
}
|
|
|
|
init () {
|
|
svc_params.createParameters('fsentry-service', [
|
|
{
|
|
id: 'max_queue',
|
|
description: 'Maximum queue size',
|
|
default: 50,
|
|
},
|
|
], this);
|
|
|
|
}
|
|
|
|
mkPromiseForQueueSize_ () {
|
|
this.queueSizePromise = new Promise((resolve, reject) => {
|
|
this.queueSizeResolve = resolve;
|
|
});
|
|
}
|
|
|
|
// #region write operations
|
|
async insert (entry) {
|
|
const op = new Insert(entry);
|
|
await this.enqueue_(op);
|
|
return op;
|
|
}
|
|
|
|
async update (uuid, entry) {
|
|
const op = new Update(uuid, entry);
|
|
await this.enqueue_(op);
|
|
return op;
|
|
}
|
|
|
|
async delete (uuid) {
|
|
const op = new Delete(uuid);
|
|
await this.enqueue_(op);
|
|
return op;
|
|
}
|
|
// #endregion
|
|
|
|
// #region read operations
|
|
async fast_get_descendants (uuid) {
|
|
return (await db.read(`
|
|
WITH RECURSIVE descendant_cte AS (
|
|
SELECT uuid, parent_uid
|
|
FROM fsentries
|
|
WHERE parent_uid = ?
|
|
|
|
UNION ALL
|
|
|
|
SELECT f.uuid, f.parent_uid
|
|
FROM fsentries f
|
|
INNER JOIN descendant_cte d ON f.parent_uid = d.uuid
|
|
)
|
|
SELECT uuid FROM descendant_cte
|
|
`, [uuid])).map(x => x.uuid);
|
|
}
|
|
|
|
async fast_get_direct_descendants (uuid) {
|
|
return (uuid === PuterPath.NULL_UUID
|
|
? await db.read('SELECT uuid FROM fsentries WHERE parent_uid IS NULL')
|
|
: await db.read(
|
|
'SELECT uuid FROM fsentries WHERE parent_uid = ?',
|
|
[uuid],
|
|
)).map(x => x.uuid);
|
|
}
|
|
|
|
waitForEntry (node, callback) {
|
|
// *** uncomment to debug slow waits ***
|
|
// console.log('ATTEMPT TO WAIT FOR', selector.describe())
|
|
let selector = node.get_selector_of_type(NodeUIDSelector);
|
|
if ( selector === null ) {
|
|
// console.log(new Error('========'));
|
|
return;
|
|
}
|
|
|
|
const entry_already_enqueued =
|
|
Object.prototype.hasOwnProperty.call(this.currentState.updating_uuids, selector.value) ||
|
|
Object.prototype.hasOwnProperty.call(this.deferredState.updating_uuids, selector.value) ;
|
|
|
|
if ( entry_already_enqueued ) {
|
|
callback();
|
|
return;
|
|
}
|
|
|
|
const k = `uid:${selector.value}`;
|
|
if ( ! Object.prototype.hasOwnProperty.call(this.entryListeners_, k) ) {
|
|
this.entryListeners_[k] = [];
|
|
}
|
|
|
|
const det = {
|
|
detach: () => {
|
|
const i = this.entryListeners_[k].indexOf(callback);
|
|
if ( i === -1 ) return;
|
|
this.entryListeners_[k].splice(i, 1);
|
|
if ( this.entryListeners_[k].length === 0 ) {
|
|
delete this.entryListeners_[k];
|
|
}
|
|
},
|
|
};
|
|
|
|
this.entryListeners_[k].push(callback);
|
|
|
|
return det;
|
|
}
|
|
|
|
async get (uuid, fetch_entry_options) {
|
|
const answer = {};
|
|
for ( const op of this.currentState.queue ) {
|
|
if ( op.uuid != uuid ) continue;
|
|
op.apply(answer);
|
|
}
|
|
for ( const op of this.deferredState.queue ) {
|
|
if ( op.uuid != uuid ) continue;
|
|
op.apply(answer);
|
|
op.apply(answer);
|
|
}
|
|
if ( answer.is_diff ) {
|
|
const base_entry = await this.find(
|
|
new NodeUIDSelector(uuid),
|
|
fetch_entry_options,
|
|
);
|
|
answer.entry = { ...base_entry, ...answer.entry };
|
|
}
|
|
return answer.entry;
|
|
}
|
|
|
|
/**
|
|
* Returns UUIDs of child fsentries under the specified
|
|
* parent fsentry
|
|
* @param {string} uuid - UUID of parent fsentry
|
|
* @returns fsentry[]
|
|
*/
|
|
async get_descendants (uuid) {
|
|
return uuid === PuterPath.NULL_UUID
|
|
? await db.read(
|
|
'SELECT uuid FROM fsentries WHERE parent_uid IS NULL',
|
|
[uuid],
|
|
)
|
|
: await db.read(
|
|
'SELECT uuid FROM fsentries WHERE parent_uid = ?',
|
|
[uuid],
|
|
)
|
|
;
|
|
}
|
|
|
|
/**
|
|
* Returns full fsentry nodes for entries under the specified
|
|
* parent fsentry
|
|
* @param {string} uuid - UUID of parent fsentry
|
|
* @returns fsentry[]
|
|
*/
|
|
async get_descendants_full (uuid, fetch_entry_options) {
|
|
const { thumbnail } = fetch_entry_options;
|
|
const columns = `${
|
|
[
|
|
...this.defaultProperties.map(v => `f.${v}`),
|
|
...this.subdomainProperties
|
|
.map(v => `s.${v} AS subdomain_${v}`),
|
|
].join(', ')
|
|
}${thumbnail ? ', thumbnail' : ''}`;
|
|
const results_with_dupes = uuid === PuterPath.NULL_UUID
|
|
? await db.read(
|
|
`SELECT ${columns} FROM fsentries WHERE parent_uid IS NULL`,
|
|
[uuid],
|
|
)
|
|
: await db.read(
|
|
`SELECT ${columns} FROM fsentries AS f ` +
|
|
'LEFT JOIN subdomains AS s ON f.id=s.root_dir_id ' +
|
|
'WHERE parent_uid = ? ORDER BY f.id',
|
|
[uuid],
|
|
)
|
|
;
|
|
|
|
const byId = new Map();
|
|
for ( const row of results_with_dupes ) {
|
|
const id = row.id;
|
|
let entry = byId.get(id);
|
|
if ( ! entry ) {
|
|
entry = { ...row };
|
|
if ( thumbnail ) entry.thumbnail = row.thumbnail;
|
|
entry.subdomains = [];
|
|
byId.set(id, entry);
|
|
}
|
|
if ( row.subdomain_uuid != null ) {
|
|
entry.subdomains.push({
|
|
uuid: row.subdomain_uuid,
|
|
subdomain: row.subdomain_subdomain,
|
|
});
|
|
}
|
|
}
|
|
return Array.from(byId.values());
|
|
}
|
|
|
|
async get_recursive_size (uuid) {
|
|
const cte_query = `
|
|
WITH RECURSIVE descendant_cte AS (
|
|
SELECT uuid, parent_uid, size
|
|
FROM fsentries
|
|
WHERE parent_uid = ?
|
|
|
|
UNION ALL
|
|
|
|
SELECT f.uuid, f.parent_uid, f.size
|
|
FROM fsentries f
|
|
INNER JOIN descendant_cte d
|
|
ON f.parent_uid = d.uuid
|
|
)
|
|
SELECT SUM(size) AS total_size FROM descendant_cte
|
|
`;
|
|
const rows = await db.read(cte_query, [uuid]);
|
|
return rows[0].total_size;
|
|
}
|
|
|
|
/**
|
|
* Finds a filesystem entry using the provided selector.
|
|
* @param {Object} selector - The selector object specifying how to find the entry
|
|
* @param {Object} fetch_entry_options - Options for fetching the entry
|
|
* @returns {Promise<Object|null>} The filesystem entry or null if not found
|
|
*/
|
|
async find (selector, fetch_entry_options) {
|
|
if ( selector instanceof RootNodeSelector ) {
|
|
return selector.entry;
|
|
}
|
|
if ( selector instanceof NodePathSelector ) {
|
|
return await this.findByPath(selector.value, fetch_entry_options);
|
|
}
|
|
if ( selector instanceof NodeUIDSelector ) {
|
|
return await this.findByUID(selector.value, fetch_entry_options);
|
|
}
|
|
if ( selector instanceof NodeInternalIDSelector ) {
|
|
return await this.findByID(selector.id, fetch_entry_options);
|
|
}
|
|
if ( selector instanceof NodeChildSelector ) {
|
|
let id;
|
|
|
|
if ( selector.parent instanceof RootNodeSelector ) {
|
|
id = await this.findNameInRoot(selector.name);
|
|
} else {
|
|
const parentEntry = await this.find(selector.parent);
|
|
if ( ! parentEntry ) return null;
|
|
id = await this.findNameInParent(parentEntry.uuid, selector.name);
|
|
}
|
|
|
|
if ( id === undefined ) return null;
|
|
if ( typeof id !== 'number' ) {
|
|
throw new Error(
|
|
'unexpected type for id value',
|
|
typeof id,
|
|
id,
|
|
);
|
|
}
|
|
return this.find(new NodeInternalIDSelector('mysql', id));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Finds a filesystem entry by its UUID.
|
|
* @param {string} uuid - The UUID of the entry to find
|
|
* @param {Object} fetch_entry_options - Options including thumbnail flag
|
|
* @returns {Promise<Object|undefined>} The filesystem entry or undefined if not found
|
|
*/
|
|
async findByUID (uuid, fetch_entry_options = {}) {
|
|
const { thumbnail } = fetch_entry_options;
|
|
|
|
let fsentry = await db.tryHardRead(
|
|
`SELECT ${
|
|
this.defaultProperties.join(', ')
|
|
}${thumbnail ? ', thumbnail' : ''
|
|
} FROM fsentries WHERE uuid = ? LIMIT 1`,
|
|
[uuid],
|
|
);
|
|
|
|
return fsentry[0];
|
|
}
|
|
|
|
/**
|
|
* Finds a filesystem entry by its internal database ID.
|
|
* @param {number} id - The internal ID of the entry to find
|
|
* @param {Object} fetch_entry_options - Options including thumbnail flag
|
|
* @returns {Promise<Object|undefined>} The filesystem entry or undefined if not found
|
|
*/
|
|
async findByID (id, fetch_entry_options = {}) {
|
|
const { thumbnail } = fetch_entry_options;
|
|
|
|
let fsentry = await db.tryHardRead(
|
|
`SELECT ${
|
|
this.defaultProperties.join(', ')
|
|
}${thumbnail ? ', thumbnail' : ''
|
|
} FROM fsentries WHERE id = ? LIMIT 1`,
|
|
[id],
|
|
);
|
|
|
|
return fsentry[0];
|
|
}
|
|
|
|
/**
|
|
* Finds a filesystem entry by its full path.
|
|
* @param {string} path - The full path of the entry to find
|
|
* @param {Object} fetch_entry_options - Options including thumbnail flag and tracer
|
|
* @returns {Promise<Object|false>} The filesystem entry or false if not found
|
|
*/
|
|
async findByPath (path, fetch_entry_options = {}) {
|
|
const { thumbnail } = fetch_entry_options;
|
|
|
|
if ( path === '/' ) {
|
|
return this.find(new RootNodeSelector());
|
|
}
|
|
|
|
const parts = path.split('/').filter(path => path !== '');
|
|
if ( parts.length === 0 ) {
|
|
// TODO: invalid path; this should be an error
|
|
return false;
|
|
}
|
|
|
|
// TODO: use a closure table for more efficient path resolving
|
|
let parent_uid = null;
|
|
let result;
|
|
|
|
const resultColsSql = this.defaultProperties.join(', ') +
|
|
(thumbnail ? ', thumbnail' : '');
|
|
|
|
result = await db.read(
|
|
`SELECT ${ resultColsSql
|
|
} FROM fsentries WHERE path=? LIMIT 1`,
|
|
[path],
|
|
);
|
|
|
|
// using knex instead
|
|
|
|
if ( result[0] ) return result[0];
|
|
|
|
const loop = async () => {
|
|
for ( let i = 0 ; i < parts.length ; i++ ) {
|
|
const part = parts[i];
|
|
const isLast = i == parts.length - 1;
|
|
const colsSql = isLast ? resultColsSql : 'uuid';
|
|
if ( parent_uid === null ) {
|
|
result = await db.read(
|
|
`SELECT ${ colsSql
|
|
} FROM fsentries WHERE parent_uid IS NULL AND name=? LIMIT 1`,
|
|
[part],
|
|
);
|
|
} else {
|
|
result = await db.read(
|
|
`SELECT ${ colsSql
|
|
} FROM fsentries WHERE parent_uid=? AND name=? LIMIT 1`,
|
|
[parent_uid, part],
|
|
);
|
|
}
|
|
|
|
if ( ! result[0] ) return false;
|
|
parent_uid = result[0].uuid;
|
|
}
|
|
};
|
|
|
|
if ( fetch_entry_options.tracer ) {
|
|
const tracer = fetch_entry_options.tracer;
|
|
const options = fetch_entry_options.trace_options;
|
|
await tracer.startActiveSpan(
|
|
'fs:sql:findByPath',
|
|
...(options ? [options] : []),
|
|
async span => {
|
|
await loop();
|
|
span.end();
|
|
},
|
|
);
|
|
} else {
|
|
await loop();
|
|
}
|
|
|
|
return result[0];
|
|
}
|
|
|
|
/**
|
|
* Finds the ID of a child entry with the given name in the root directory.
|
|
* @param {string} name - The name of the child entry to find
|
|
* @returns {Promise<number|undefined>} The ID of the child entry or undefined if not found
|
|
*/
|
|
async findNameInRoot (name) {
|
|
let child_id = await db.read(
|
|
'SELECT `id` FROM `fsentries` WHERE `parent_uid` IS NULL AND name = ? LIMIT 1',
|
|
[name],
|
|
);
|
|
return child_id[0]?.id;
|
|
}
|
|
|
|
/**
|
|
* Finds the ID of a child entry with the given name under a specific parent.
|
|
* @param {string} parent_uid - The UUID of the parent directory
|
|
* @param {string} name - The name of the child entry to find
|
|
* @returns {Promise<number|undefined>} The ID of the child entry or undefined if not found
|
|
*/
|
|
async findNameInParent (parent_uid, name) {
|
|
let child_id = await db.read(
|
|
'SELECT `id` FROM `fsentries` WHERE `parent_uid` = ? AND name = ? LIMIT 1',
|
|
[parent_uid, name],
|
|
);
|
|
return child_id[0]?.id;
|
|
}
|
|
|
|
/**
|
|
* Checks if an entry with the given name exists under a specific parent.
|
|
* @param {string} parent_uid - The UUID of the parent directory
|
|
* @param {string} name - The name to check for
|
|
* @returns {Promise<boolean>} True if the name exists under the parent, false otherwise
|
|
*/
|
|
async nameExistsUnderParent (parent_uid, name) {
|
|
let check_dupe = await db.read(
|
|
'SELECT `id` FROM `fsentries` WHERE `parent_uid` = ? AND name = ? LIMIT 1',
|
|
[parent_uid, name],
|
|
);
|
|
return !!check_dupe[0];
|
|
}
|
|
|
|
/**
|
|
* Checks if an entry with the given name exists under a parent specified by ID.
|
|
* @param {number} parent_id - The internal ID of the parent directory
|
|
* @param {string} name - The name to check for
|
|
* @returns {Promise<boolean>} True if the name exists under the parent, false otherwise
|
|
*/
|
|
async nameExistsUnderParentID (parent_id, name) {
|
|
const parent = await this.findByID(parent_id);
|
|
if ( ! parent ) {
|
|
return false;
|
|
}
|
|
return this.nameExistsUnderParent(parent.uuid, name);
|
|
}
|
|
// #endregion
|
|
|
|
// #region queue logic
|
|
async enqueue_ (op) {
|
|
const tp = new TeePromise();
|
|
while (
|
|
this.currentState.queue.length > this.max_queue ||
|
|
this.deferredState.queue.length > this.max_queue
|
|
) {
|
|
await this.queueSizePromise;
|
|
}
|
|
|
|
if ( ! (op instanceof BaseOperation) ) {
|
|
throw new Error('Invalid operation');
|
|
}
|
|
|
|
const state = this.status === FSEntryController.STATUS_READY ?
|
|
this.currentState : this.deferredState;
|
|
|
|
if ( ! Object.prototype.hasOwnProperty.call(state.updating_uuids, op.uuid) ) {
|
|
state.updating_uuids[op.uuid] = [];
|
|
}
|
|
state.updating_uuids[op.uuid].push(state.queue.length);
|
|
|
|
state.queue.push(op);
|
|
|
|
// DRY: same pattern as FSOperationContext:provideValue
|
|
// DRY: same pattern as FSOperationContext:rejectValue
|
|
if ( Object.prototype.hasOwnProperty.call(this.entryListeners_, op.uuid) ) {
|
|
const listeners = this.entryListeners_[op.uuid];
|
|
|
|
delete this.entryListeners_[op.uuid];
|
|
|
|
for ( const lis of listeners ) lis();
|
|
}
|
|
|
|
this.checkShouldExec_();
|
|
|
|
await op.awaitDone();
|
|
}
|
|
|
|
checkShouldExec_ () {
|
|
if ( this.status !== FSEntryController.STATUS_READY ) return;
|
|
if ( this.currentState.queue.length === 0 ) return;
|
|
this.exec_();
|
|
}
|
|
|
|
async exec_ () {
|
|
if ( this.status !== FSEntryController.STATUS_READY ) {
|
|
throw new Error('Duplicate exec_ call');
|
|
}
|
|
|
|
const queue = this.currentState.queue;
|
|
|
|
this.status = FSEntryController.STATUS_RUNNING_JOB;
|
|
|
|
// const conn = await db_primary.promise().getConnection();
|
|
// await conn.beginTransaction();
|
|
|
|
for ( const op of queue ) {
|
|
op.status = op.constructor.STATUS_RUNNING;
|
|
// await conn.execute(stmt, values);
|
|
}
|
|
|
|
// await conn.commit();
|
|
// conn.release();
|
|
|
|
// const stmtAndVals = queue.map(op => op.getStatementAndValues());
|
|
// const stmts = stmtAndVals.map(x => x.stmt).join('; ');
|
|
// const vals = stmtAndVals.reduce((acc, x) => acc.concat(x.values), []);
|
|
|
|
// *** uncomment to debug batch queries ***
|
|
// this.log.debug({ stmts, vals });
|
|
// console.log('<<========================');
|
|
// console.log({ stmts, vals });
|
|
// console.log('>>========================');
|
|
|
|
// this.log.debug('array?', Array.isArray(vals))
|
|
|
|
await db.batch_write(queue.map(op => op.getStatement()));
|
|
|
|
for ( const op of queue ) {
|
|
op.status = op.constructor.STATUS_DONE;
|
|
}
|
|
|
|
this.flipState_();
|
|
this.status = FSEntryController.STATUS_READY;
|
|
|
|
for ( const op of queue ) {
|
|
op.status = op.constructor.STATUS_DONE;
|
|
}
|
|
|
|
this.checkShouldExec_();
|
|
}
|
|
|
|
flipState_ () {
|
|
this.currentState = this.deferredState;
|
|
this.deferredState = {
|
|
queue: [],
|
|
updating_uuids: {},
|
|
};
|
|
const queueSizeResolve = this.queueSizeResolve;
|
|
this.mkPromiseForQueueSize_();
|
|
queueSizeResolve();
|
|
}
|
|
// #endregion
|
|
}
|