fix: dedup get_apps app requests (#2325)
Docker Image CI / build-and-push-image (push) Has been cancelled
Maintain Release Merge PR / update-release-pr (push) Has been cancelled
release-please / release-please (push) Has been cancelled
test / test-backend (24.x) (push) Has been cancelled
test / API tests (node env, api-test) (24.x) (push) Has been cancelled
test / puterjs (node env, vitest) (24.x) (push) Has been cancelled

* fix: dedup get_apps app requests

* fix: WSPushService errors

* fix: error messages
This commit is contained in:
Daniel Salazar
2026-01-22 11:07:42 -08:00
committed by GitHub
parent 2a8ec97c09
commit d3a27be88e
4 changed files with 121 additions and 41 deletions
+95 -18
View File
@@ -397,9 +397,42 @@ async function get_apps (specifiers, options = {}) {
appById.set(app.id, app);
};
const missingUids = new Set();
const missingNames = new Set();
const missingIds = new Set();
const pendingLookups = new Map();
const pendingToResolve = new Map();
const queryUids = new Set();
const queryNames = new Set();
const queryIds = new Set();
const queueMissing = (type, value) => {
const queryKey = `${type}:${value}`;
if ( pendingToResolve.has(queryKey) || pendingLookups.has(queryKey) ) {
return;
}
const pendingKey = `pending_app:${queryKey}`;
const pending = kv.get(pendingKey);
if ( pending ) {
pendingLookups.set(queryKey, pending);
return;
}
let resolveQuery;
let rejectQuery;
const queryPromise = new Promise((resolve, reject) => {
resolveQuery = resolve;
rejectQuery = reject;
});
kv.set(pendingKey, queryPromise, { EX: PENDING_QUERY_TTL });
pendingToResolve.set(queryKey, { resolveQuery, rejectQuery, pendingKey });
if ( type === 'uid' ) {
queryUids.add(value);
} else if ( type === 'name' ) {
queryNames.add(value);
} else if ( type === 'id' ) {
queryIds.add(value);
}
};
for ( const spec of normalized ) {
if ( spec.uid ) {
@@ -407,7 +440,7 @@ async function get_apps (specifiers, options = {}) {
if ( cached ) {
addApp(cached);
} else {
missingUids.add(spec.uid);
queueMissing('uid', spec.uid);
}
continue;
}
@@ -416,7 +449,7 @@ async function get_apps (specifiers, options = {}) {
if ( cached ) {
addApp(cached);
} else {
missingNames.add(spec.name);
queueMissing('name', spec.name);
}
continue;
}
@@ -425,43 +458,87 @@ async function get_apps (specifiers, options = {}) {
if ( cached ) {
addApp(cached);
} else {
missingIds.add(spec.id);
queueMissing('id', spec.id);
}
}
}
if ( missingUids.size || missingNames.size || missingIds.size ) {
const pendingResultsPromise = pendingLookups.size
? Promise.all(Array.from(pendingLookups.values()))
: Promise.resolve([]);
if ( queryUids.size || queryNames.size || queryIds.size ) {
/** @type BaseDatabaseAccessService */
const db = _servicesHolder.services.get('database').get(DB_READ, 'apps');
const clauses = [];
const params = [];
if ( missingUids.size ) {
const uids = Array.from(missingUids);
if ( queryUids.size ) {
const uids = Array.from(queryUids);
clauses.push(`uid IN (${uids.map(() => '?').join(', ')})`);
params.push(...uids);
}
if ( missingNames.size ) {
const names = Array.from(missingNames);
if ( queryNames.size ) {
const names = Array.from(queryNames);
clauses.push(`name IN (${names.map(() => '?').join(', ')})`);
params.push(...names);
}
if ( missingIds.size ) {
const ids = Array.from(missingIds);
if ( queryIds.size ) {
const ids = Array.from(queryIds);
clauses.push(`id IN (${ids.map(() => '?').join(', ')})`);
params.push(...ids);
}
const rows = await db.read(`SELECT * FROM \`apps\` WHERE ${clauses.join(' OR ')}`,
params);
let rows = [];
const resolvedKeys = new Set();
try {
rows = await db.read(`SELECT * FROM \`apps\` WHERE ${clauses.join(' OR ')}`,
params);
for ( const app of rows ) {
cacheApp(app);
addApp(app);
for ( const app of rows ) {
cacheApp(app);
addApp(app);
const uidKey = `uid:${app.uid}`;
const nameKey = `name:${app.name}`;
const idKey = `id:${app.id}`;
if ( pendingToResolve.has(uidKey) ) {
pendingToResolve.get(uidKey).resolveQuery(app);
resolvedKeys.add(uidKey);
}
if ( pendingToResolve.has(nameKey) ) {
pendingToResolve.get(nameKey).resolveQuery(app);
resolvedKeys.add(nameKey);
}
if ( pendingToResolve.has(idKey) ) {
pendingToResolve.get(idKey).resolveQuery(app);
resolvedKeys.add(idKey);
}
}
for ( const [key, { resolveQuery }] of pendingToResolve.entries() ) {
if ( ! resolvedKeys.has(key) ) {
resolveQuery(null);
}
}
} catch ( err ) {
for ( const { rejectQuery } of pendingToResolve.values() ) {
rejectQuery(err);
}
throw err;
} finally {
for ( const { pendingKey } of pendingToResolve.values() ) {
kv.del(pendingKey);
}
}
}
const pendingResults = await pendingResultsPromise;
for ( const app of pendingResults ) {
addApp(app);
}
return normalized.map(spec => {
let app;
if ( spec.uid ) {
@@ -65,9 +65,8 @@ const configurable_auth = options => async (req, res, next) => {
token = req.header('Authorization');
token = token.replace('Bearer ', '').trim();
if ( token === 'undefined' ) {
APIError.create('unexpected_undefined', null, {
msg: 'The Authorization token cannot be the string "undefined"',
});
res.status(401).send('Unauthenticated - token format invalid');
return;
}
}
// Cookie
@@ -91,10 +90,10 @@ const configurable_auth = options => async (req, res, next) => {
next();
return;
}
APIError.create('token_missing').write(res);
res.status(401).send('Unauthenticated - Token missing');
return;
} else if ( typeof token !== 'string' ) {
APIError.create('token_auth_failed').write(res);
res.status(401).send('Unauthenticated - token authentication failed');
return;
} else {
token = token.replace('Bearer ', '');
@@ -114,7 +113,7 @@ const configurable_auth = options => async (req, res, next) => {
actor = await svc_auth.authenticate_from_token(token);
} catch ( e ) {
if ( e instanceof APIError ) {
e.write(res);
res.status(500).send(e.message);
return;
}
if ( e instanceof LegacyTokenError && is_whoami(req) ) {
@@ -141,8 +140,7 @@ const configurable_auth = options => async (req, res, next) => {
next();
return;
}
const re = APIError.create('token_auth_failed');
re.write(res);
res.status(401).send('Unauthenticated - token authentication failed');
return;
}
@@ -150,12 +148,24 @@ const configurable_auth = options => async (req, res, next) => {
context.set('actor', actor);
if ( actor.type.user ) {
if ( actor.type.user?.suspended ) {
throw APIError.create('forbidden');
console.warn('Suspended user attempted to make request:', { userId: actor.type.user.id, username: actor.type.user.username });
res.status(403).send('Forbidden - user suspended');
return;
}
context.set('user', actor.type.user);
}
// === Populate Request ===
if ( actor ) {
console.log(`Authenticated actor ${actor.type?.user?.username} making request:`, {
actorId: actor.id,
actorType: actor.type.type,
userId: actor.type?.user?.id,
username: actor.type?.user?.username,
requestPath: req.path,
ip: req.ip,
});
}
req.actor = actor;
req.user = actor.type.user;
req.token = token;
@@ -27,13 +27,9 @@ module.exports = eggspress('/cache/last-change-timestamp', {
fs: true,
json: true,
allowedMethods: ['GET'],
}, async (req, res, next) => {
const svc_driver = Context.get('services').get('driver');
const driver_response = await svc_driver.call({
iface: 'puter-kvstore',
method: 'get',
args: { key: `last_change_timestamp:${req.user?.id}` },
});
const timestamp = driver_response.result;
}, async (req, res) => {
/** @type {import('../../services/repositories/DynamoKVStore/DynamoKVStore.js').DynamoKVStore} */
const kvStore = Context.get('services').get('puter-kvstore');
const timestamp = await kvStore.get({ key: `last_change_timestamp:${req.user?.id}` });
res.json({ timestamp });
});
+3 -6
View File
@@ -336,12 +336,9 @@ class WSPushService extends BaseService {
const key = `last_change_timestamp:${user_id}`;
try {
const svc_driver = Context.get('services').get('driver');
await svc_driver.call({
iface: 'puter-kvstore',
method: 'set',
args: { key, value: ts },
});
/** @type {import('./repositories/DynamoKVStore/DynamoKVStore.js').DynamoKVStore} */
const kvStore = Context.get('services').get('puter-kvstore');
await kvStore.set({ key: key, value: ts });
} catch ( error ) {
this.log.error('Failed to update user timestamp in kvstore', { user_id, error: error.message });
}