mirror of
https://github.com/HeyPuter/puter.git
synced 2026-05-03 16:10:31 +00:00
cleanup: remove informationService in favour of existing helper methods and logic (#2374)
Docker Image CI / build-and-push-image (push) Has been cancelled
Maintain Release Merge PR / update-release-pr (push) Has been cancelled
release-please / release-please (push) Has been cancelled
test / test-backend (24.x) (push) Has been cancelled
test / API tests (node env, api-test) (24.x) (push) Has been cancelled
test / puterjs (node env, vitest) (24.x) (push) Has been cancelled
Docker Image CI / build-and-push-image (push) Has been cancelled
Maintain Release Merge PR / update-release-pr (push) Has been cancelled
release-please / release-please (push) Has been cancelled
test / test-backend (24.x) (push) Has been cancelled
test / API tests (node env, api-test) (24.x) (push) Has been cancelled
test / puterjs (node env, vitest) (24.x) (push) Has been cancelled
This commit is contained in:
@@ -27,7 +27,6 @@ import { v4 as uuidv4 } from 'uuid';
|
||||
const { db } = extension.import('data');
|
||||
|
||||
const svc_metering = extension.import('service:meteringService');
|
||||
const svc_trace = extension.import('service:traceService');
|
||||
const svc_fs = extension.import('service:filesystem');
|
||||
const { stuck_detector_stream, hashing_stream } = extension.import('core').util.streamutil;
|
||||
|
||||
@@ -63,6 +62,7 @@ const {
|
||||
|
||||
const {
|
||||
ParallelTasks,
|
||||
getTracer,
|
||||
} = extension.import('core').util.otelutil;
|
||||
|
||||
const {
|
||||
@@ -389,7 +389,7 @@ export default class PuterFSProvider {
|
||||
// explicitly requested.
|
||||
|
||||
if ( options.tracer == null ) {
|
||||
options.tracer = svc_trace.tracer;
|
||||
options.tracer = getTracer();
|
||||
}
|
||||
|
||||
if ( options.op ) {
|
||||
@@ -468,7 +468,7 @@ export default class PuterFSProvider {
|
||||
const actor = (context ?? Context).get('actor');
|
||||
const user = actor.type.user;
|
||||
|
||||
const tracer = svc_trace.tracer;
|
||||
const tracer = getTracer();
|
||||
const uuid = uuidv4();
|
||||
const timestamp = Math.round(Date.now() / 1000);
|
||||
await parent.fetchEntry();
|
||||
@@ -1016,7 +1016,7 @@ export default class PuterFSProvider {
|
||||
|
||||
svc_metering.incrementUsage(ownerActor, 'filesystem:delete:bytes', fileSize);
|
||||
|
||||
const tracer = svc_trace.tracer;
|
||||
const tracer = getTracer();
|
||||
const tasks = new ParallelTasks({ tracer, max: 4 });
|
||||
|
||||
tasks.add('remove-fsentry', async () => {
|
||||
|
||||
@@ -5,14 +5,9 @@ import Update from './Update.js';
|
||||
|
||||
const { db } = extension.import('data');
|
||||
const svc_params = extension.import('service:params');
|
||||
const svc_info = extension.import('service:information');
|
||||
|
||||
const { PuterPath } = extension.import('fs');
|
||||
|
||||
const { Context } = extension.import('core');
|
||||
|
||||
const { id2path } = extension.import('core').util.helpers;
|
||||
|
||||
const {
|
||||
RootNodeSelector,
|
||||
NodeChildSelector,
|
||||
@@ -21,14 +16,14 @@ const {
|
||||
NodeInternalIDSelector,
|
||||
} = extension.import('core').fs.selectors;
|
||||
|
||||
export default class {
|
||||
export default class FSEntryController {
|
||||
static CONCERN = 'filesystem';
|
||||
|
||||
static STATUS_READY = {};
|
||||
static STATUS_RUNNING_JOB = {};
|
||||
|
||||
constructor () {
|
||||
this.status = this.constructor.STATUS_READY;
|
||||
this.status = FSEntryController.STATUS_READY;
|
||||
|
||||
this.currentState = {
|
||||
queue: [],
|
||||
@@ -84,19 +79,6 @@ export default class {
|
||||
},
|
||||
], this);
|
||||
|
||||
// Register information providers
|
||||
|
||||
// uuid -> path via mysql
|
||||
svc_info.given('fs.fsentry:uuid').provide('fs.fsentry:path')
|
||||
.addStrategy('mysql', async uuid => {
|
||||
// TODO: move id2path here
|
||||
try {
|
||||
return await id2path(uuid);
|
||||
} catch (e) {
|
||||
console.error('DASH VOID ERROR !!', e);
|
||||
return `/-void/${ uuid}`;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
mkPromiseForQueueSize_ () {
|
||||
@@ -160,8 +142,8 @@ export default class {
|
||||
}
|
||||
|
||||
const entry_already_enqueued =
|
||||
this.currentState.updating_uuids.hasOwnProperty(selector.value) ||
|
||||
this.deferredState.updating_uuids.hasOwnProperty(selector.value) ;
|
||||
Object.prototype.hasOwnProperty.call(this.currentState.updating_uuids, selector.value) ||
|
||||
Object.prototype.hasOwnProperty.call(this.deferredState.updating_uuids, selector.value) ;
|
||||
|
||||
if ( entry_already_enqueued ) {
|
||||
callback();
|
||||
@@ -169,7 +151,7 @@ export default class {
|
||||
}
|
||||
|
||||
const k = `uid:${selector.value}`;
|
||||
if ( ! this.entryListeners_.hasOwnProperty(k) ) {
|
||||
if ( ! Object.prototype.hasOwnProperty.call(this.entryListeners_, k) ) {
|
||||
this.entryListeners_[k] = [];
|
||||
}
|
||||
|
||||
@@ -471,10 +453,10 @@ export default class {
|
||||
throw new Error('Invalid operation');
|
||||
}
|
||||
|
||||
const state = this.status === this.constructor.STATUS_READY ?
|
||||
const state = this.status === FSEntryController.STATUS_READY ?
|
||||
this.currentState : this.deferredState;
|
||||
|
||||
if ( ! state.updating_uuids.hasOwnProperty(op.uuid) ) {
|
||||
if ( ! Object.prototype.hasOwnProperty.call(state.updating_uuids, op.uuid) ) {
|
||||
state.updating_uuids[op.uuid] = [];
|
||||
}
|
||||
state.updating_uuids[op.uuid].push(state.queue.length);
|
||||
@@ -483,7 +465,7 @@ export default class {
|
||||
|
||||
// DRY: same pattern as FSOperationContext:provideValue
|
||||
// DRY: same pattern as FSOperationContext:rejectValue
|
||||
if ( this.entryListeners_.hasOwnProperty(op.uuid) ) {
|
||||
if ( Object.prototype.hasOwnProperty.call(this.entryListeners_, op.uuid) ) {
|
||||
const listeners = this.entryListeners_[op.uuid];
|
||||
|
||||
delete this.entryListeners_[op.uuid];
|
||||
@@ -495,19 +477,19 @@ export default class {
|
||||
}
|
||||
|
||||
checkShouldExec_ () {
|
||||
if ( this.status !== this.constructor.STATUS_READY ) return;
|
||||
if ( this.status !== FSEntryController.STATUS_READY ) return;
|
||||
if ( this.currentState.queue.length === 0 ) return;
|
||||
this.exec_();
|
||||
}
|
||||
|
||||
async exec_ () {
|
||||
if ( this.status !== this.constructor.STATUS_READY ) {
|
||||
if ( this.status !== FSEntryController.STATUS_READY ) {
|
||||
throw new Error('Duplicate exec_ call');
|
||||
}
|
||||
|
||||
const queue = this.currentState.queue;
|
||||
|
||||
this.status = this.constructor.STATUS_RUNNING_JOB;
|
||||
this.status = FSEntryController.STATUS_RUNNING_JOB;
|
||||
|
||||
// const conn = await db_primary.promise().getConnection();
|
||||
// await conn.beginTransaction();
|
||||
@@ -539,7 +521,7 @@ export default class {
|
||||
}
|
||||
|
||||
this.flipState_();
|
||||
this.status = this.constructor.STATUS_READY;
|
||||
this.status = FSEntryController.STATUS_READY;
|
||||
|
||||
for ( const op of queue ) {
|
||||
op.status = op.constructor.STATUS_DONE;
|
||||
@@ -559,4 +541,4 @@ export default class {
|
||||
queueSizeResolve();
|
||||
}
|
||||
// #endregion
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,12 +204,6 @@ const install = async ({ context, services, app, useapi, modapi }) => {
|
||||
const { EntriService } = require('./services/EntriService.js');
|
||||
services.registerService('entri-service', EntriService);
|
||||
|
||||
const { InformationService } = require('./services/information/InformationService');
|
||||
services.registerService('information', InformationService);
|
||||
|
||||
const { TraceService } = require('./services/TraceService.js');
|
||||
services.registerService('traceService', TraceService);
|
||||
|
||||
const { FilesystemService } = require('./filesystem/FilesystemService');
|
||||
services.registerService('filesystem', FilesystemService);
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ const config = require('../config');
|
||||
const _path = require('path');
|
||||
const { NodeInternalIDSelector, NodeChildSelector, NodeUIDSelector, RootNodeSelector, NodePathSelector } = require('./node/selectors');
|
||||
const { Context } = require('../util/context');
|
||||
const { getTracer, span } = require('../util/otelutil');
|
||||
const { NodeRawEntrySelector } = require('./node/selectors');
|
||||
const { DB_READ } = require('../services/database/consts');
|
||||
const { UserActorType, AppUnderUserActorType, Actor } = require('../services/auth/Actor');
|
||||
@@ -135,7 +136,7 @@ module.exports = class FSNodeContext {
|
||||
for ( const method of fetch_methods ) {
|
||||
const original_method = this[method];
|
||||
this[method] = async (...args) => {
|
||||
const tracer = this.services.get('traceService').tracer;
|
||||
const tracer = getTracer();
|
||||
let result;
|
||||
const opts = { attributes: {
|
||||
selector: selector.describe(),
|
||||
@@ -276,11 +277,22 @@ module.exports = class FSNodeContext {
|
||||
|
||||
async fetchPath () {
|
||||
if ( this.path ) return;
|
||||
if ( this.entry?.path ) {
|
||||
this.path = this.entry.path;
|
||||
return;
|
||||
}
|
||||
const uid = this.entry?.uuid ?? this.uid;
|
||||
if ( ! uid ) return;
|
||||
this.path = await this.#resolvePathFromUuid(uid);
|
||||
}
|
||||
|
||||
this.path = await this.services.get('information')
|
||||
.with('fs.fsentry')
|
||||
.obtain('fs.fsentry:path')
|
||||
.exec(this.entry);
|
||||
async #resolvePathFromUuid (uuid) {
|
||||
if ( ! uuid ) return undefined;
|
||||
try {
|
||||
return await id2path(uuid);
|
||||
} catch (e) {
|
||||
return `/-void/${ uuid }`;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -295,7 +307,7 @@ module.exports = class FSNodeContext {
|
||||
*/
|
||||
async fetchEntry (fetch_entry_options = {}) {
|
||||
if ( this.fetching !== null ) {
|
||||
await Context.get('services').get('traceService').spanify('fetching', async () => {
|
||||
await span('fetching', async () => {
|
||||
// ???: does this need to be double-checked? I'm not actually sure...
|
||||
if ( this.fetching === null ) return;
|
||||
await this.fetching;
|
||||
@@ -839,8 +851,6 @@ module.exports = class FSNodeContext {
|
||||
if ( fsentry.owner ) delete fsentry.owner.email;
|
||||
}
|
||||
|
||||
const info = this.services.get('information');
|
||||
|
||||
if ( !this.uid && !this.entry.uuid ) {
|
||||
console.warn(`Potential Error in getSafeEntry with no uid or entry.uuid ${
|
||||
this.selector.describe() } ${
|
||||
@@ -849,10 +859,8 @@ module.exports = class FSNodeContext {
|
||||
|
||||
// If fsentry was found by a path but the entry doesn't
|
||||
// have a path, use the path that was used to find it.
|
||||
fsentry.path = res.path ?? this.path ?? await info
|
||||
.with('fs.fsentry:uuid')
|
||||
.obtain('fs.fsentry:path')
|
||||
.exec(this.uid ?? this.entry.uuid);
|
||||
const entry_uid = this.uid ?? this.entry.uuid;
|
||||
fsentry.path = res.path ?? this.path ?? await this.#resolvePathFromUuid(entry_uid);
|
||||
|
||||
if ( fsentry.path && fsentry.path.startsWith('/-void/') ) {
|
||||
fsentry.broken = true;
|
||||
|
||||
@@ -47,15 +47,6 @@ class FilesystemService extends BaseService {
|
||||
// used by update_child_paths
|
||||
this.db = services.get('database').get(DB_WRITE, 'filesystem');
|
||||
|
||||
const info = services.get('information');
|
||||
info.given('fs.fsentry').provide('fs.fsentry:path')
|
||||
.addStrategy('entry-or-delegate', async entry => {
|
||||
if ( entry.path ) return entry.path;
|
||||
return await info
|
||||
.with('fs.fsentry:uuid')
|
||||
.obtain('fs.fsentry:path')
|
||||
.exec(entry.uuid);
|
||||
});
|
||||
}
|
||||
|
||||
async _init () {
|
||||
|
||||
@@ -24,6 +24,7 @@ const { HLFilesystemOperation } = require('./definitions');
|
||||
const { MkTree } = require('./hl_mkdir');
|
||||
const { HLRemove } = require('./hl_remove');
|
||||
const { LLCopy } = require('../ll_operations/ll_copy');
|
||||
const { getTracer } = require('../../util/otelutil');
|
||||
|
||||
class HLCopy extends HLFilesystemOperation {
|
||||
static DESCRIPTION = `
|
||||
@@ -129,7 +130,7 @@ class HLCopy extends HLFilesystemOperation {
|
||||
}
|
||||
|
||||
// NEXT: implement _verify_room with profiling
|
||||
const tracer = svc.get('traceService').tracer;
|
||||
const tracer = getTracer();
|
||||
await tracer.startActiveSpan('fs:cp:verify-size-constraints', async span => {
|
||||
const source_file = source.entry;
|
||||
const dest_fsentry = parent.entry;
|
||||
|
||||
@@ -31,7 +31,6 @@ class LLCopy extends LLFilesystemOperation {
|
||||
const { source, parent, user, actor, target_name } = this.values;
|
||||
const svc = context.get('services');
|
||||
|
||||
const tracer = svc.get('traceService').tracer;
|
||||
const fs = svc.get('filesystem');
|
||||
const svc_event = svc.get('event');
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
*/
|
||||
const APIError = require('../../api/APIError');
|
||||
const { MemoryFSProvider } = require('../../modules/puterfs/customfs/MemoryFSProvider');
|
||||
const { ParallelTasks } = require('../../util/otelutil');
|
||||
const { ParallelTasks, getTracer } = require('../../util/otelutil');
|
||||
const FSNodeContext = require('../FSNodeContext');
|
||||
const { NodeUIDSelector } = require('../node/selectors');
|
||||
const { LLFilesystemOperation } = require('./definitions');
|
||||
@@ -68,7 +68,7 @@ class LLRmDir extends LLFilesystemOperation {
|
||||
throw APIError.create('not_empty');
|
||||
}
|
||||
|
||||
const tracer = svc.get('traceService').tracer;
|
||||
const tracer = getTracer();
|
||||
const tasks = new ParallelTasks({ tracer, max: max_tasks });
|
||||
|
||||
for ( const child_uuid of children ) {
|
||||
|
||||
+79
-88
@@ -848,6 +848,85 @@ function byte_format (bytes) {
|
||||
return `${Math.round(bytes / Math.pow(1024, i), 2) } ${ sizes[i]}`;
|
||||
};
|
||||
|
||||
const get_descendants = spanify('get_descendants', async (...args) => {
|
||||
return await getDescendantsHelper(...args);
|
||||
});
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {integer} entry_id
|
||||
* @returns
|
||||
*/
|
||||
const id2path = spanify('helpers:id2path', async (entry_uid) => {
|
||||
if ( entry_uid == null ) {
|
||||
throw new Error('got null or undefined entry id');
|
||||
}
|
||||
|
||||
/** @type BaseDatabaseAccessService */
|
||||
const db = _servicesHolder.services.get('database').get(DB_READ, 'filesystem');
|
||||
|
||||
const log = _servicesHolder.services.get('log-service').create('helpers.id2path');
|
||||
log.traceOn();
|
||||
const errors = _servicesHolder.services.get('error-service').create(log);
|
||||
log.called();
|
||||
|
||||
let result;
|
||||
|
||||
log.debug(`entry id: ${entry_uid}`);
|
||||
if ( typeof entry_uid === 'number' ) {
|
||||
const old = entry_uid;
|
||||
entry_uid = await id2uuid(entry_uid);
|
||||
log.debug(`entry id resolved: resolved ${old} ${entry_uid}`);
|
||||
}
|
||||
|
||||
try {
|
||||
result = await db.read(`
|
||||
WITH RECURSIVE cte AS (
|
||||
SELECT uuid, parent_uid, name, name AS path
|
||||
FROM fsentries
|
||||
WHERE uuid = ?
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT e.uuid, e.parent_uid, e.name, ${
|
||||
db.case({
|
||||
sqlite: 'e.name || \'/\' || cte.path',
|
||||
otherwise: 'CONCAT(e.name, \'/\', cte.path)',
|
||||
})
|
||||
}
|
||||
FROM fsentries e
|
||||
INNER JOIN cte ON cte.parent_uid = e.uuid
|
||||
)
|
||||
SELECT *
|
||||
FROM cte
|
||||
WHERE parent_uid IS NULL
|
||||
`, [entry_uid]);
|
||||
} catch (e) {
|
||||
errors.report('id2path.select', {
|
||||
alarm: true,
|
||||
source: e,
|
||||
message: `error while resolving path for ${entry_uid}: ${e.message}`,
|
||||
extra: {
|
||||
entry_uid,
|
||||
},
|
||||
});
|
||||
throw new ManagedError(`cannot create path for ${entry_uid}`);
|
||||
}
|
||||
|
||||
if ( !result || !result[0] ) {
|
||||
errors.report('id2path.select', {
|
||||
alarm: true,
|
||||
message: `no result for ${entry_uid}`,
|
||||
extra: {
|
||||
entry_uid,
|
||||
},
|
||||
});
|
||||
throw new ManagedError(`cannot create path for ${entry_uid}`);
|
||||
}
|
||||
|
||||
return `/${ result[0].path}`;
|
||||
});
|
||||
|
||||
/**
|
||||
* Recursively retrieve all files, directories, and subdirectories under `path`.
|
||||
* Optionally the `depth` can be set.
|
||||
@@ -1035,16 +1114,6 @@ async function getDescendantsHelper (path, user, depth, return_thumbnail = false
|
||||
return ret.flat();
|
||||
};
|
||||
|
||||
async function get_descendants (...args) {
|
||||
const tracer = _servicesHolder.services.get('traceService').tracer;
|
||||
let ret;
|
||||
await tracer.startActiveSpan('get_descendants', async span => {
|
||||
ret = await getDescendantsHelper(...args);
|
||||
span.end();
|
||||
});
|
||||
return ret;
|
||||
};
|
||||
|
||||
const get_dir_size = async (path, user) => {
|
||||
let size = 0;
|
||||
const descendants = await get_descendants(path, user);
|
||||
@@ -1057,84 +1126,6 @@ const get_dir_size = async (path, user) => {
|
||||
return size;
|
||||
};
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {integer} entry_id
|
||||
* @returns
|
||||
*/
|
||||
async function id2path (entry_uid) {
|
||||
if ( entry_uid == null ) {
|
||||
throw new Error('got null or undefined entry id');
|
||||
}
|
||||
|
||||
/** @type BaseDatabaseAccessService */
|
||||
const db = _servicesHolder.services.get('database').get(DB_READ, 'filesystem');
|
||||
|
||||
const traces = _servicesHolder.services.get('traceService');
|
||||
const log = _servicesHolder.services.get('log-service').create('helpers.id2path');
|
||||
log.traceOn();
|
||||
const errors = _servicesHolder.services.get('error-service').create(log);
|
||||
log.called();
|
||||
|
||||
let result;
|
||||
|
||||
return await traces.spanify('helpers:id2path', async () => {
|
||||
log.debug(`entry id: ${entry_uid}`);
|
||||
if ( typeof entry_uid === 'number' ) {
|
||||
const old = entry_uid;
|
||||
entry_uid = await id2uuid(entry_uid);
|
||||
log.debug(`entry id resolved: resolved ${old} ${entry_uid}`);
|
||||
}
|
||||
|
||||
try {
|
||||
result = await db.read(`
|
||||
WITH RECURSIVE cte AS (
|
||||
SELECT uuid, parent_uid, name, name AS path
|
||||
FROM fsentries
|
||||
WHERE uuid = ?
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT e.uuid, e.parent_uid, e.name, ${
|
||||
db.case({
|
||||
sqlite: 'e.name || \'/\' || cte.path',
|
||||
otherwise: 'CONCAT(e.name, \'/\', cte.path)',
|
||||
})
|
||||
}
|
||||
FROM fsentries e
|
||||
INNER JOIN cte ON cte.parent_uid = e.uuid
|
||||
)
|
||||
SELECT *
|
||||
FROM cte
|
||||
WHERE parent_uid IS NULL
|
||||
`, [entry_uid]);
|
||||
} catch (e) {
|
||||
errors.report('id2path.select', {
|
||||
alarm: true,
|
||||
source: e,
|
||||
message: `error while resolving path for ${entry_uid}: ${e.message}`,
|
||||
extra: {
|
||||
entry_uid,
|
||||
},
|
||||
});
|
||||
throw new ManagedError(`cannot create path for ${entry_uid}`);
|
||||
}
|
||||
|
||||
if ( !result || !result[0] ) {
|
||||
errors.report('id2path.select', {
|
||||
alarm: true,
|
||||
message: `no result for ${entry_uid}`,
|
||||
extra: {
|
||||
entry_uid,
|
||||
},
|
||||
});
|
||||
throw new ManagedError(`cannot create path for ${entry_uid}`);
|
||||
}
|
||||
|
||||
return `/${ result[0].path}`;
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} glob
|
||||
|
||||
@@ -10,7 +10,6 @@ import { DetailProviderService } from '../../services/DetailProviderService.js';
|
||||
import { EventService } from '../../services/EventService.js';
|
||||
import { FeatureFlagService } from '../../services/FeatureFlagService.js';
|
||||
import { GetUserService } from '../../services/GetUserService.js';
|
||||
import { InformationService } from '../../services/information/InformationService.js';
|
||||
import { MeteringServiceWrapper } from '../../services/MeteringService/MeteringServiceWrapper.mjs';
|
||||
import { NotificationService } from '../../services/NotificationService';
|
||||
import { RegistrantService } from '../../services/RegistrantService';
|
||||
@@ -21,7 +20,6 @@ import { ScriptService } from '../../services/ScriptService';
|
||||
import { SessionService } from '../../services/SessionService';
|
||||
import { SUService } from '../../services/SUService';
|
||||
import { SystemValidationService } from '../../services/SystemValidationService';
|
||||
import { TraceService } from '../../services/TraceService';
|
||||
import { AlarmService } from '../core/AlarmService';
|
||||
import APIErrorService from '../web/APIErrorService';
|
||||
|
||||
@@ -32,7 +30,6 @@ export class TestCoreModule {
|
||||
services.registerService('whoami', DetailProviderService);
|
||||
services.registerService('get-user', GetUserService);
|
||||
services.registerService('database', SqliteDatabaseAccessService);
|
||||
services.registerService('traceService', TraceService);
|
||||
services.registerService('su', SUService);
|
||||
services.registerService('alarm', AlarmService);
|
||||
services.registerService('event', EventService);
|
||||
@@ -48,7 +45,6 @@ export class TestCoreModule {
|
||||
services.registerService('__registrant', RegistrantService);
|
||||
services.registerService('feature-flag', FeatureFlagService);
|
||||
services.registerService('token', TokenService);
|
||||
services.registerService('information', InformationService);
|
||||
services.registerService('auth', AuthService);
|
||||
services.registerService('session', SessionService);
|
||||
services.registerService('notification', NotificationService);
|
||||
|
||||
@@ -21,6 +21,7 @@ const eggspress = require('../../api/eggspress.js');
|
||||
const FSNodeParam = require('../../api/filesystem/FSNodeParam.js');
|
||||
const { HLCopy } = require('../../filesystem/hl_operations/hl_copy.js');
|
||||
const { Context } = require('../../util/context.js');
|
||||
const { getTracer } = require('../../util/otelutil.js');
|
||||
|
||||
// -----------------------------------------------------------------------//
|
||||
// POST /copy
|
||||
@@ -58,7 +59,7 @@ module.exports = eggspress('/copy', {
|
||||
x.set(operationTraceSvc.ckey('frame'), frame);
|
||||
}
|
||||
|
||||
const tracer = req.services.get('traceService').tracer;
|
||||
const tracer = getTracer();
|
||||
await tracer.startActiveSpan('filesystem_api.copy', async span => {
|
||||
|
||||
// === upcoming copy behaviour ===
|
||||
|
||||
@@ -21,6 +21,7 @@ const eggspress = require('../../api/eggspress.js');
|
||||
const FSNodeParam = require('../../api/filesystem/FSNodeParam.js');
|
||||
const { HLMove } = require('../../filesystem/hl_operations/hl_move.js');
|
||||
const { Context } = require('../../util/context.js');
|
||||
const { getTracer } = require('../../util/otelutil.js');
|
||||
|
||||
// -----------------------------------------------------------------------//
|
||||
// POST /move
|
||||
@@ -57,7 +58,7 @@ module.exports = eggspress('/move', {
|
||||
x.set(operationTraceSvc.ckey('frame'), frame);
|
||||
}
|
||||
|
||||
const tracer = req.services.get('traceService').tracer;
|
||||
const tracer = getTracer();
|
||||
await tracer.startActiveSpan('filesystem_api.move', async span => {
|
||||
const hl_move = new HLMove();
|
||||
const response = await hl_move.run({
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present Puter Technologies Inc.
|
||||
*
|
||||
* This file is part of Puter.
|
||||
*
|
||||
* Puter is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
const BaseService = require('./BaseService');
|
||||
|
||||
/**
|
||||
* @class TraceService
|
||||
* @description This class is responsible for creating and managing
|
||||
* traces for the Puter application using the OpenTelemetry API.
|
||||
* It provides methods to start spans, which are used for tracking
|
||||
* operations and measuring performance within the application.
|
||||
*/
|
||||
class TraceService extends BaseService {
|
||||
_construct () {
|
||||
this.tracer_ = opentelemetry.trace.getTracer('puter-filesystem-tracer');
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the tracer instance used for creating spans.
|
||||
* This method is a getter that returns the current tracer object.
|
||||
*
|
||||
* @returns {import("@opentelemetry/api").Tracer} The tracer instance for this service.
|
||||
*/
|
||||
get tracer () {
|
||||
return this.tracer_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts an active span for executing a function with tracing.
|
||||
* This method wraps the provided function `fn` in a span, managing
|
||||
* span lifecycle, error handling, and status updates.
|
||||
*
|
||||
* @param {string} name - The name of the span.
|
||||
* @param {Function} fn - The asynchronous function to execute within the span.
|
||||
* @param {opentelemetry.SpanOptions} [options] - The opentelemetry options object
|
||||
* @returns {Promise} - A promise that resolves to the return value of `fn`.
|
||||
*/
|
||||
async span (name, fn, options) {
|
||||
const args = [name];
|
||||
if ( options !== null && typeof options === 'object' ) {
|
||||
args.push(options);
|
||||
}
|
||||
args.push(async span => {
|
||||
try {
|
||||
return await fn({ span });
|
||||
} catch ( error ) {
|
||||
span.setStatus({ code: opentelemetry.SpanStatusCode.ERROR, message: error.message });
|
||||
throw error;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
});
|
||||
this.tracer.startActiveSpan('name', { }, () => {
|
||||
// This block intentionally left blank
|
||||
});
|
||||
return await this.tracer.startActiveSpan(...args);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use `span` instead to avoid confusion with the spanify
|
||||
* function from otelutil.
|
||||
*/
|
||||
async spanify (name, fn, options) {
|
||||
return await this.span(name, fn, options);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
TraceService,
|
||||
};
|
||||
@@ -1,64 +0,0 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import { createTestKernel } from '../../tools/test.mjs';
|
||||
import { TraceService } from './TraceService';
|
||||
|
||||
describe('TraceService', async () => {
|
||||
const testKernel = await createTestKernel({
|
||||
serviceMap: {
|
||||
trace: TraceService,
|
||||
},
|
||||
initLevelString: 'construct',
|
||||
});
|
||||
|
||||
const traceService = testKernel.services!.get('trace') as TraceService;
|
||||
|
||||
it('should be instantiated', () => {
|
||||
expect(traceService).toBeInstanceOf(TraceService);
|
||||
});
|
||||
|
||||
it('should have a tracer', () => {
|
||||
expect(traceService.tracer).toBeDefined();
|
||||
});
|
||||
|
||||
it('should create spans with spanify', async () => {
|
||||
const result = await traceService.spanify('test-span', async ({ span }) => {
|
||||
expect(span).toBeDefined();
|
||||
return 'test-result';
|
||||
});
|
||||
expect(result).toBe('test-result');
|
||||
});
|
||||
|
||||
it('should execute callback within span', async () => {
|
||||
let executed = false;
|
||||
await traceService.spanify('exec-span', async () => {
|
||||
executed = true;
|
||||
});
|
||||
expect(executed).toBe(true);
|
||||
});
|
||||
|
||||
it('should handle errors in spanify', async () => {
|
||||
await expect(
|
||||
traceService.spanify('error-span', async () => {
|
||||
throw new Error('Test span error');
|
||||
})
|
||||
).rejects.toThrow('Test span error');
|
||||
});
|
||||
|
||||
it('should support options in spanify', async () => {
|
||||
const result = await traceService.spanify('options-span', async ({ span }) => {
|
||||
return 'with-options';
|
||||
}, {
|
||||
attributes: { 'test.attribute': 'value' },
|
||||
});
|
||||
expect(result).toBe('with-options');
|
||||
});
|
||||
|
||||
it('should return values from span callback', async () => {
|
||||
const obj = { value: 42 };
|
||||
const result = await traceService.spanify('return-span', async () => {
|
||||
return obj;
|
||||
});
|
||||
expect(result).toEqual(obj);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -21,11 +21,13 @@ const { hardcoded_user_group_permissions } = require('../../data/hardcoded-permi
|
||||
const { ECMAP } = require('../../filesystem/ECMAP');
|
||||
const { get_user, get_app } = require('../../helpers');
|
||||
const { reading_has_terminal } = require('../../unstructured/permission-scan-lib');
|
||||
const { trace } = require('@opentelemetry/api');
|
||||
const BaseService = require('../BaseService');
|
||||
const { DB_WRITE } = require('../database/consts');
|
||||
const { UserActorType, Actor, AppUnderUserActorType } = require('./Actor');
|
||||
const { PERM_KEY_PREFIX, MANAGE_PERM_PREFIX } = require('./permissionConts.mjs');
|
||||
const { PermissionUtil, PermissionExploder, PermissionImplicator, PermissionRewriter } = require('./permissionUtils.mjs');
|
||||
const { spanify } = require('../../util/otelutil');
|
||||
|
||||
/**
|
||||
* @class PermissionService
|
||||
@@ -128,14 +130,11 @@ class PermissionService extends BaseService {
|
||||
* Can be a single permission string or an array of permission strings.
|
||||
* @returns {Promise<boolean>} - True if the actor has at least one of the permissions, false otherwise.
|
||||
*/
|
||||
async check (actor, permission_options, scan_options = {}) {
|
||||
const svc_trace = this.services.get('traceService');
|
||||
return await svc_trace.spanify('permission:check', async () => {
|
||||
const reading = await this.scan(actor, permission_options, undefined, undefined, scan_options);
|
||||
const options = PermissionUtil.reading_to_options(reading);
|
||||
return options.length > 0;
|
||||
});
|
||||
}
|
||||
check = spanify('permission:check', async (actor, permission_options, scan_options = {}) => {
|
||||
const reading = await this.scan(actor, permission_options, undefined, undefined, scan_options);
|
||||
const options = PermissionUtil.reading_to_options(reading);
|
||||
return options.length > 0;
|
||||
});
|
||||
/**
|
||||
* Checks if the actor has grant access to any of the specified permissions.
|
||||
*
|
||||
@@ -143,15 +142,12 @@ class PermissionService extends BaseService {
|
||||
* @param {string} permission - The permission to check against.
|
||||
* @returns {Promise<boolean>} - True if the actor has at least one of the permissions, false otherwise.
|
||||
*/
|
||||
async canManagePermission (actor, permission) {
|
||||
const svc_trace = this.services.get('traceService');
|
||||
return await svc_trace.spanify('permission:check', async () => {
|
||||
const managePermission = PermissionUtil.join(MANAGE_PERM_PREFIX, ...PermissionUtil.split(permission));
|
||||
const reading = await this.scan(actor, managePermission);
|
||||
const options = PermissionUtil.reading_to_options(reading);
|
||||
return options.length > 0;
|
||||
});
|
||||
}
|
||||
canManagePermission = spanify('permission:check', async (actor, permission) => {
|
||||
const managePermission = PermissionUtil.join(MANAGE_PERM_PREFIX, ...PermissionUtil.split(permission));
|
||||
const reading = await this.scan(actor, managePermission);
|
||||
const options = PermissionUtil.reading_to_options(reading);
|
||||
return options.length > 0;
|
||||
});
|
||||
|
||||
/**
|
||||
* Scans the permissions for an actor against specified permission options.
|
||||
@@ -168,14 +164,22 @@ class PermissionService extends BaseService {
|
||||
*
|
||||
* @returns {Promise<Array>} A promise that resolves to an array of permission readings.
|
||||
*/
|
||||
async scan (actor, permission_options, _reserved, state, scan_options = {}) {
|
||||
const svc_trace = this.services.get('traceService');
|
||||
return await svc_trace.spanify('permission:scan', async () => {
|
||||
return await ECMAP.arun(async () => {
|
||||
return await this.#scan(actor, permission_options, _reserved, state, scan_options);
|
||||
});
|
||||
}, { attributes: { permission_options }, actor: actor.uid });
|
||||
}
|
||||
scan = spanify('permission:scan', async (actor, permission_options, _reserved, state, scan_options = {}) => {
|
||||
const activeSpan = trace.getActiveSpan();
|
||||
if ( activeSpan ) {
|
||||
const options = Array.isArray(permission_options)
|
||||
? permission_options
|
||||
: [permission_options];
|
||||
activeSpan.setAttribute('permission_options', options);
|
||||
if ( actor?.uid != null ) {
|
||||
activeSpan.setAttribute('actor', actor.uid);
|
||||
}
|
||||
}
|
||||
return await ECMAP.arun(async () => {
|
||||
return await this.#scan(actor, permission_options, _reserved, state, scan_options);
|
||||
});
|
||||
});
|
||||
|
||||
async #scan (actor, permission_options, _reserved, state, scan_options = {}) {
|
||||
if ( ! state ) {
|
||||
this.log.debug('scan', {
|
||||
|
||||
@@ -16,8 +16,10 @@
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
const { trace } = require('@opentelemetry/api');
|
||||
const BaseService = require('../BaseService');
|
||||
const { DB_WRITE, DB_READ } = require('./consts');
|
||||
const { spanify } = require('../../util/otelutil');
|
||||
|
||||
/**
|
||||
* BaseDatabaseAccessService class extends BaseService to provide
|
||||
@@ -28,9 +30,15 @@ const { DB_WRITE, DB_READ } = require('./consts');
|
||||
class BaseDatabaseAccessService extends BaseService {
|
||||
static DB_WRITE = DB_WRITE;
|
||||
static DB_READ = DB_READ;
|
||||
_setDbSpanAttributes (query) {
|
||||
const activeSpan = trace.getActiveSpan();
|
||||
if ( ! activeSpan ) return;
|
||||
activeSpan.setAttribute('query', query);
|
||||
activeSpan.setAttribute('trace', (new Error()).stack);
|
||||
}
|
||||
case ( choices ) {
|
||||
const engine_name = this.constructor.ENGINE_NAME;
|
||||
if ( choices.hasOwnProperty(engine_name) ) {
|
||||
if ( Object.prototype.hasOwnProperty.call(choices, engine_name) ) {
|
||||
return choices[engine_name];
|
||||
}
|
||||
return choices.otherwise;
|
||||
@@ -54,13 +62,11 @@ class BaseDatabaseAccessService extends BaseService {
|
||||
return this;
|
||||
}
|
||||
|
||||
async read (query, params) {
|
||||
const svc_trace = this.services.get('traceService');
|
||||
return await svc_trace.spanify('database:read', async () => {
|
||||
if ( this.config.slow ) await new Promise(rslv => setTimeout(rslv, 70));
|
||||
return await this._read(query, params);
|
||||
}, { attributes: { query, trace: (new Error()).stack } });
|
||||
}
|
||||
read = spanify('database:read', async (query, params) => {
|
||||
this._setDbSpanAttributes(query);
|
||||
if ( this.config.slow ) await new Promise(rslv => setTimeout(rslv, 70));
|
||||
return await this._read(query, params);
|
||||
});
|
||||
|
||||
/**
|
||||
* requireRead will fallback to the primary database
|
||||
@@ -95,21 +101,17 @@ class BaseDatabaseAccessService extends BaseService {
|
||||
return results;
|
||||
}
|
||||
|
||||
async pread (query, params) {
|
||||
pread = spanify('database:pread', async (query, params) => {
|
||||
this._setDbSpanAttributes(query);
|
||||
if ( this.config.slow ) await new Promise(rslv => setTimeout(rslv, 70));
|
||||
const svc_trace = this.services.get('traceService');
|
||||
return await svc_trace.spanify('database:pread', async () => {
|
||||
return await this._read(query, params, { use_primary: true });
|
||||
}, { attributes: { query, trace: (new Error()).stack } });
|
||||
}
|
||||
return await this._read(query, params, { use_primary: true });
|
||||
});
|
||||
|
||||
async write (query, params) {
|
||||
write = spanify('database:write', async (query, params) => {
|
||||
this._setDbSpanAttributes(query);
|
||||
if ( this.config.slow ) await new Promise(rslv => setTimeout(rslv, 70));
|
||||
const svc_trace = this.services.get('traceService');
|
||||
return await svc_trace.spanify('database:write', async () => {
|
||||
return await this._write(query, params);
|
||||
}, { attributes: { query, trace: (new Error()).stack } });
|
||||
}
|
||||
return await this._write(query, params);
|
||||
});
|
||||
|
||||
async insert (table_name, data) {
|
||||
const values = Object.values(data);
|
||||
|
||||
@@ -25,6 +25,7 @@ const { PermissionUtil } = require('../auth/permissionUtils.mjs');
|
||||
const { Invoker } = require('../../../../putility/src/libs/invoker');
|
||||
const { get_user } = require('../../helpers');
|
||||
const { AdvancedBase } = require('@heyputer/putility');
|
||||
const { span } = require('../../util/otelutil');
|
||||
|
||||
const strutil = require('@heyputer/putility').libs.string;
|
||||
|
||||
@@ -345,9 +346,7 @@ class DriverService extends BaseService {
|
||||
|
||||
svc_event.emit('driver.create-call-context', event);
|
||||
|
||||
const svc_trace = this.services.get('traceService');
|
||||
|
||||
return await svc_trace.spanify(`driver:${driver}:${iface}:${method}`, async () => {
|
||||
return await span(`driver:${driver}:${iface}:${method}`, async () => {
|
||||
return event.context.arun(async () => {
|
||||
const result = await this.call_new_({
|
||||
actor,
|
||||
|
||||
@@ -1,233 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present Puter Technologies Inc.
|
||||
*
|
||||
* This file is part of Puter.
|
||||
*
|
||||
* Puter is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
const BaseService = require('../BaseService');
|
||||
|
||||
/**
|
||||
* @class InformationProvider
|
||||
* @classdesc The InformationProvider class facilitates the registration of strategies for providing information based on given inputs. It allows services to register methods for obtaining information and optimizes the process by determining the most efficient methods for retrieving the required information.
|
||||
*/
|
||||
class InformationProvider {
|
||||
constructor (informationService, input) {
|
||||
this.informationService = informationService;
|
||||
this.input = input;
|
||||
}
|
||||
|
||||
provide (output) {
|
||||
this.output = output;
|
||||
return this;
|
||||
}
|
||||
|
||||
addStrategy (id, provider) {
|
||||
this.informationService.register_provider_(this.output, this.input, { id, fn: provider });
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Class InformationObtainer
|
||||
*
|
||||
* This class is responsible for obtaining information from various services. It takes an
|
||||
* InformationService instance and an input, allowing it to obtain specific outputs based on
|
||||
* the input provided. The class provides methods to specify the desired output and execute
|
||||
* the information retrieval process.
|
||||
*/
|
||||
class InformationObtainer {
|
||||
constructor (informationService, input) {
|
||||
this.informationService = informationService;
|
||||
this.input = input;
|
||||
}
|
||||
|
||||
obtain (output) {
|
||||
this.output = output;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the information obtaining process asynchronously.
|
||||
*
|
||||
* This method wraps the process of obtaining information in a trace span for monitoring purposes.
|
||||
* It retrieves the necessary services and traces, then delegates the actual obtaining process
|
||||
* to the `obtain_` method of the `InformationService`.
|
||||
*
|
||||
* @async
|
||||
* @function exec
|
||||
* @param {...*} args - Variable number of arguments to be passed to the obtaining process.
|
||||
* @returns {Promise<*>} - A promise that resolves to the obtained information.
|
||||
*/
|
||||
async exec (...args) {
|
||||
const services = this.informationService.services;
|
||||
const traces = services.get('traceService');
|
||||
/**
|
||||
* Executes the obtaining process for the specified output from the specified input.
|
||||
* This method retrieves the relevant information service, traces, and spans the execution to
|
||||
* obtain the information asynchronously. It uses the informationService.obtain_ method to perform the actual retrieval.
|
||||
*
|
||||
* @async
|
||||
* @method exec
|
||||
* @param {...*} args - The arguments required for the obtaining process.
|
||||
* @returns {Promise<*>} - A promise that resolves to the obtained information.
|
||||
*/
|
||||
return await traces.spanify(`OBTAIN ${this.output} FROM ${this.input}`, async () => {
|
||||
return (await this.informationService.obtain_(this.output, this.input, ...args)).result;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows services to provide methods for obtaining information,
|
||||
* and other services to obtain that information. Also optimizes
|
||||
* obtaining information by determining which methods are the
|
||||
* most efficient for obtaining the information.
|
||||
*
|
||||
* @example Obtain an fsentry given a path:
|
||||
*
|
||||
* const infosvc = services.get('information');
|
||||
* const fsentry = await infosvc
|
||||
* .with('fs.fsentry:path').obtain('fs.fsentry')
|
||||
* .exec(path);
|
||||
*
|
||||
* @example Register a method for obtaining an fsentry given a path:
|
||||
*
|
||||
* const infosvc = services.get('information');
|
||||
* infosvc.given('fs.fsentry:path').provide('fs.fsentry')
|
||||
* .addStrategy(async path => {
|
||||
* // code to obtain fsentry from path
|
||||
* });
|
||||
*/
|
||||
class InformationService extends BaseService {
|
||||
/**
|
||||
* @class
|
||||
* @extends BaseService
|
||||
* @description Provides a service for managing information providers and obtaining information efficiently.
|
||||
* @notes This class extends BaseService and includes methods for registering providers, obtaining information,
|
||||
* and managing command registrations.
|
||||
*/
|
||||
_construct () {
|
||||
this.providers_ = {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the service by registering commands.
|
||||
*
|
||||
* @private
|
||||
* @method _init
|
||||
* @returns {void}
|
||||
*/
|
||||
_init () {
|
||||
this._register_commands(this.services.get('commands'));
|
||||
}
|
||||
|
||||
given (input) {
|
||||
return new InformationProvider(this, input);
|
||||
}
|
||||
|
||||
with (input) {
|
||||
return new InformationObtainer(this, input);
|
||||
}
|
||||
|
||||
register_provider_ (output, input, provider) {
|
||||
this.providers_ = this.providers_ || {};
|
||||
this.providers_[output] = this.providers_[output] || {};
|
||||
this.providers_[output][input] = this.providers_[output][input] || [];
|
||||
this.providers_[output][input].push(provider);
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously obtains information based on the provided output and input parameters.
|
||||
* This method iterates through registered providers, sorts them for optimization,
|
||||
* and attempts to fetch the desired information.
|
||||
*
|
||||
* @async
|
||||
* @function obtain_
|
||||
* @param {string} output - The type of information to obtain.
|
||||
* @param {string} input - The input parameter required to obtain the information.
|
||||
* @param {...*} args - Additional arguments to pass to the provider functions.
|
||||
* @returns {Promise<Object>} An object containing the provider ID and the result.
|
||||
* @throws {Error} If no providers are available for the given output and input.
|
||||
*/
|
||||
async obtain_ (output, input, ...args) {
|
||||
const providers = this.providers_[output][input];
|
||||
if ( ! providers ) {
|
||||
throw new Error(`no providers for ${output} <- ${input}`);
|
||||
}
|
||||
|
||||
// shuffle providers (for future performance optimization)
|
||||
providers.sort(() => Math.random() - 0.5);
|
||||
|
||||
// put providers with id 'redis' first
|
||||
providers.sort((a, b) => {
|
||||
if ( a.id === 'redis' ) return -1;
|
||||
if ( b.id === 'redis' ) return 1;
|
||||
return 0;
|
||||
});
|
||||
|
||||
// for now, go with the first provider that provides something
|
||||
for ( const provider of providers ) {
|
||||
this.log.debug(`trying provider ${provider.id} for ${output} <- ${input}`);
|
||||
const result = await provider.fn(...args);
|
||||
this.log.debug(`provider ${provider.id} for ${output} <- ${input} returned ${result}`);
|
||||
// TODO: log strategy used as span attribute/tag
|
||||
if ( result !== undefined ) return { provider: provider.id, result };
|
||||
}
|
||||
}
|
||||
|
||||
_register_commands (commands) {
|
||||
commands.registerCommands('info', [
|
||||
{
|
||||
id: 'providers',
|
||||
description: 'List information providers',
|
||||
handler: async (args, log) => {
|
||||
const providers = this.providers_;
|
||||
for ( const [output, inputs] of Object.entries(providers) ) {
|
||||
for ( const [input, providers] of Object.entries(inputs) ) {
|
||||
for ( const provider of providers ) {
|
||||
log.log(`${output} <- ${input} (${provider.id})`);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
id: 'get',
|
||||
description: 'List information providers',
|
||||
handler: async (args, log) => {
|
||||
if ( args.length < 1 ) {
|
||||
log.log('usage: info:get <want> <have> <value>');
|
||||
return;
|
||||
}
|
||||
const [want, have, value] = args;
|
||||
this.log.debug(`info:get ${want} <- ${have} (${value})`);
|
||||
const result = await this.obtain_(want, have, value);
|
||||
let result_str;
|
||||
try {
|
||||
result_str = JSON.stringify(result.result);
|
||||
} catch (e) {
|
||||
result_str = `${ result.result}`;
|
||||
}
|
||||
log.log(`${want} <- ${have} (${value}) = ${result_str} (via ${result.provider})`);
|
||||
},
|
||||
},
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
InformationService,
|
||||
};
|
||||
@@ -17,6 +17,7 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
const { Context } = require('../util/context');
|
||||
const { getTracer } = require('../util/otelutil');
|
||||
|
||||
class OtelFeature {
|
||||
constructor (method_include_list) {
|
||||
@@ -32,7 +33,7 @@ class OtelFeature {
|
||||
|
||||
const class_name = instance.constructor.name;
|
||||
|
||||
const tracer = context.get('services').get('traceService').tracer;
|
||||
const tracer = getTracer();
|
||||
let result;
|
||||
await tracer.startActiveSpan(`${class_name}:${method_name}`, async span => {
|
||||
result = await original_method.call(instance, ...args);
|
||||
@@ -44,33 +45,6 @@ class OtelFeature {
|
||||
}
|
||||
}
|
||||
|
||||
class SyncOtelFeature {
|
||||
constructor (method_include_list) {
|
||||
this.method_include_list = method_include_list;
|
||||
}
|
||||
install_in_instance (instance) {
|
||||
for ( const method_name of this.method_include_list ) {
|
||||
const original_method = instance[method_name];
|
||||
instance[method_name] = (...args) => {
|
||||
const context = Context.get();
|
||||
if ( ! context ) {
|
||||
throw new Error('missing context');
|
||||
}
|
||||
|
||||
const class_name = instance.constructor.name;
|
||||
|
||||
const tracer = context.get('services').get('traceService').tracer;
|
||||
let result;
|
||||
tracer.startActiveSpan(`${class_name}:${method_name}`, async span => {
|
||||
result = original_method.call(instance, ...args);
|
||||
span.end();
|
||||
});
|
||||
return result;
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
OtelFeature,
|
||||
};
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
// be the correct path, not a way to shoot yourself in the foot.
|
||||
|
||||
import { context, trace, SpanStatusCode } from '@opentelemetry/api';
|
||||
import { Context } from './context.js';
|
||||
import { TeePromise } from '@heyputer/putility/src/libs/promise.js';
|
||||
|
||||
/*
|
||||
@@ -42,53 +41,57 @@ promises.push(tracer.startActiveSpan(`job:${job.id}`, (span) => {
|
||||
}));
|
||||
*/
|
||||
|
||||
/** @type {<T extends Function>(label:string, fn:T, tracer?: unknown)=> T} */
|
||||
export const spanify = (label, fn, tracer) => async function (...args) {
|
||||
const context = Context.get();
|
||||
if ( ! context ) {
|
||||
// We don't use the proper logger here because we would normally
|
||||
// be getting it from context
|
||||
console.error('spanify failed', new Error('missing context'));
|
||||
export const DEFAULT_TRACER_NAME = 'puter-tracer';
|
||||
|
||||
export const getTracer = (name = DEFAULT_TRACER_NAME) =>
|
||||
trace.getTracer(name ?? DEFAULT_TRACER_NAME);
|
||||
|
||||
const resolveTracer = (tracer, name) =>
|
||||
tracer ?? getTracer(name ?? DEFAULT_TRACER_NAME);
|
||||
|
||||
/** @type {<T extends Function>(label:string, fn:T, options?: object | unknown, tracer?: unknown)=> T} */
|
||||
export const spanify = (label, fn, options, tracer) => async function (...args) {
|
||||
if ( options && typeof options.startActiveSpan === 'function' && !tracer ) {
|
||||
tracer = options;
|
||||
options = undefined;
|
||||
}
|
||||
|
||||
tracer = tracer ?? context?.get('services')?.get('traceService')?.tracer;
|
||||
if ( ! tracer ) {
|
||||
console.error('spanify failed', new Error('missing tracer or services'));
|
||||
// eslint-disable-next-line no-invalid-this
|
||||
return await fn.apply(this, args);
|
||||
}
|
||||
const resolvedTracer = resolveTracer(tracer);
|
||||
let result;
|
||||
return await tracer.startActiveSpan(label, async span => {
|
||||
const spanArgs = [label];
|
||||
if ( options !== null && typeof options === 'object' ) {
|
||||
spanArgs.push(options);
|
||||
}
|
||||
spanArgs.push(async span => {
|
||||
try {
|
||||
// eslint-disable-next-line no-invalid-this
|
||||
// eslint-disable-next-line no-invalid-this
|
||||
result = await fn.apply(this, args);
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return result;
|
||||
} catch (e) {
|
||||
span.recordException(e);
|
||||
span.setStatus({ code: SpanStatusCode.ERROR, message: e.message });
|
||||
throw e;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
});
|
||||
return await resolvedTracer.startActiveSpan(...spanArgs);
|
||||
};
|
||||
|
||||
/** @type {(label: string, tracer?: unknown) => MethodDecorator} */
|
||||
export const Span = (label, tracer) => (_target, _propertyKey, descriptor) => {
|
||||
/** @type {<T extends Function>(label:string, fn:T, options?: object | unknown, tracer?: unknown)=> ReturnType<T>} */
|
||||
export const span = async (label, fn, options, tracer) =>
|
||||
await spanify(label, fn, options, tracer)();
|
||||
|
||||
/** @type {(label: string, options?: object | unknown, tracer?: unknown) => MethodDecorator} */
|
||||
export const Span = (label, options, tracer) => (_target, _propertyKey, descriptor) => {
|
||||
if ( !descriptor || typeof descriptor.value !== 'function' ) return descriptor;
|
||||
descriptor.value = spanify(label, descriptor.value, tracer);
|
||||
descriptor.value = spanify(label, descriptor.value, options, tracer);
|
||||
return descriptor;
|
||||
};
|
||||
|
||||
export const abtest = async (label, impls) => {
|
||||
const context = Context.get();
|
||||
if ( ! context ) {
|
||||
// We don't use the proper logger here because we would normally
|
||||
// be getting it from context
|
||||
console.error('abtest failed', new Error('missing context'));
|
||||
}
|
||||
|
||||
const tracer = context.get('services').get('traceService').tracer;
|
||||
const tracer = getTracer();
|
||||
let result;
|
||||
const impl_keys = Object.keys(impls);
|
||||
const impl_i = Math.floor(Math.random() * impl_keys.length);
|
||||
@@ -105,7 +108,7 @@ export const abtest = async (label, impls) => {
|
||||
|
||||
export class ParallelTasks {
|
||||
constructor ({ tracer, max } = {}) {
|
||||
this.tracer = tracer;
|
||||
this.tracer = tracer ?? getTracer();
|
||||
this.max = max ?? Infinity;
|
||||
this.promises = [];
|
||||
|
||||
@@ -166,4 +169,4 @@ export class ParallelTasks {
|
||||
throw new AggregateError(errors);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user