fix: healtcheck improvements (#2726)

* fix: decrease logs

* fix: healtcheck improvements
This commit is contained in:
Daniel Salazar
2026-03-25 11:10:58 -07:00
committed by GitHub
parent 08fa2d7a41
commit d292c1e4e4
9 changed files with 257 additions and 79 deletions
@@ -86,7 +86,7 @@ class AppInformationService extends BaseService {
} catch (e) {
console.error('App stats cache failed to update:', e);
}
}, 15.314 * 60 * 1000);
}, 13.314 * 60 * 1000);
})();
}
+43 -30
View File
@@ -66,11 +66,13 @@ class LogContext {
}
sub (name, fields = {}) {
return new LogContext(this.logService,
{
crumbs: name ? [...this.crumbs, name] : [...this.crumbs],
fields: { ...this.fields, ...fields },
});
return new LogContext(
this.logService,
{
crumbs: name ? [...this.crumbs, name] : [...this.crumbs],
fields: { ...this.fields, ...fields },
},
);
}
info (message, fields, objects) {
@@ -99,9 +101,11 @@ class LogContext {
}
cache (isCacheHit, identifier, fields = {}) {
this.log(LOG_LEVEL_DEBU,
isCacheHit ? 'cache_hit' : 'cache_miss',
{ identifier, ...fields });
this.log(
LOG_LEVEL_DEBU,
isCacheHit ? 'cache_hit' : 'cache_miss',
{ identifier, ...fields },
);
}
log (log_level, message, fields = {}, objects = {}) {
@@ -127,13 +131,16 @@ class LogContext {
}
if ( Context.get('injected_logger', { allow_fallback: true }) ) {
Context.get('injected_logger').log(
message + (fields ? (`; fields: ${ JSON.stringify(fields)}`) : ''));
message + (fields ? (`; fields: ${ JSON.stringify(fields)}`) : ''),
);
}
this.logService.log_(log_level,
this.crumbs,
message,
fields,
objects);
this.logService.log_(
log_level,
this.crumbs,
message,
fields,
objects,
);
}
/**
@@ -342,11 +349,13 @@ class CustomLogger {
if ( ret && ret.skip ) return;
if ( ! ret ) {
this.delegate.onLogMessage(log_lvl,
crumbs,
message,
fields,
...a);
this.delegate.onLogMessage(
log_lvl,
crumbs,
message,
fields,
...a,
);
return;
}
@@ -358,11 +367,13 @@ class CustomLogger {
args,
} = ret;
this.delegate.onLogMessage(_log_lvl ?? log_lvl,
_crumbs ?? crumbs,
_message ?? message,
_fields ?? fields,
...(args ?? a ?? []));
this.delegate.onLogMessage(
_log_lvl ?? log_lvl,
_crumbs ?? crumbs,
_message ?? message,
_fields ?? fields,
...(args ?? a ?? []),
);
}
}
@@ -479,7 +490,7 @@ class LogService extends BaseService {
const transports = config.toConsole
? [
new winston.transports.Console({
level: winston_level ?? 'http',
level: winston_level ?? 'info',
}),
]
: [
@@ -595,11 +606,13 @@ class LogService extends BaseService {
* @returns {LogContext} A new log context with the specified prefix and fields
*/
create (prefix, fields = {}) {
const logContext = new LogContext(this,
{
crumbs: [prefix],
fields,
});
const logContext = new LogContext(
this,
{
crumbs: [prefix],
fields,
},
);
return logContext;
}
@@ -20,7 +20,14 @@ const { ServerHealthRedisCacheKeys } = require('./ServerHealthRedisCacheKeys.js'
const BaseService = require('../../../services/BaseService');
const { kv } = require('../../../util/kvSingleton');
const { promise } = require('@heyputer/putility').libs;
const SECOND = 1000;
const CHECK_INTERVAL_MS = 5 * SECOND;
const CHECK_TIMEOUT_MS = 4 * SECOND;
const HEALTH_LOOP_STALE_MULTIPLIER = 3;
const EVENT_LOOP_MONITOR_INTERVAL_MS = SECOND;
const DEFAULT_EVENT_LOOP_LAG_FAIL_MS = 1000;
const DEFAULT_DB_LIVENESS_LATENCY_FAIL_MS = 1500;
/**
* The ServerHealthService class provides comprehensive health monitoring for the server.
@@ -45,12 +52,105 @@ class ServerHealthService extends BaseService {
_construct () {
this.checks_ = [];
this.failures_ = [];
this.health_started_at_ = Date.now();
this.last_check_cycle_started_at_ = 0;
this.last_check_cycle_completed_at_ = 0;
this.event_loop_lag_ms_ = 0;
this.web_checks_registered_ = false;
}
async _init () {
this.init_service_checks_();
this.stats_ = {};
this.#initDefaultChecks();
this.#initEventLoopMonitor();
this.#initServiceCheck();
}
async '__on_ready.webserver' () {
this.#registerWebChecks();
}
#initDefaultChecks () {
const eventLoopLagFailMs = Number(
this.global_config?.server_health?.event_loop_lag_fail_ms,
) || DEFAULT_EVENT_LOOP_LAG_FAIL_MS;
this.add_check('event-loop-lag', async () => {
this.stats_.event_loop_lag_ms = this.event_loop_lag_ms_;
if ( this.event_loop_lag_ms_ > eventLoopLagFailMs ) {
throw new Error(
`event loop lag too high: ${this.event_loop_lag_ms_}ms ` +
`(threshold ${eventLoopLagFailMs}ms)`,
);
}
});
const dbService = this.#getServiceIfAvailable('database');
if ( dbService && typeof dbService.read === 'function' ) {
const dbLivenessLatencyFailMs = Number(
this.global_config?.server_health?.db_liveness_latency_fail_ms,
) || DEFAULT_DB_LIVENESS_LATENCY_FAIL_MS;
this.add_check('database-liveness', async () => {
const startedAt = Date.now();
const rows = await dbService.read('SELECT 1 AS ok');
const durationMs = Date.now() - startedAt;
this.stats_.database_liveness_latency_ms = durationMs;
if ( !Array.isArray(rows) || rows.length === 0 ) {
throw new Error('database liveness check returned no rows');
}
if ( durationMs > dbLivenessLatencyFailMs ) {
throw new Error(
`database liveness query latency too high: ${durationMs}ms ` +
`(threshold ${dbLivenessLatencyFailMs}ms)`,
);
}
});
}
}
#initEventLoopMonitor () {
let expectedTickAt = Date.now() + EVENT_LOOP_MONITOR_INTERVAL_MS;
promise.asyncSafeSetInterval(() => {
const now = Date.now();
this.event_loop_lag_ms_ = Math.max(0, now - expectedTickAt);
this.stats_.event_loop_lag_ms = this.event_loop_lag_ms_;
expectedTickAt = now + EVENT_LOOP_MONITOR_INTERVAL_MS;
}, EVENT_LOOP_MONITOR_INTERVAL_MS);
}
#registerWebChecks () {
if ( this.web_checks_registered_ ) return;
const webServerService = this.#getServiceIfAvailable('web-server');
if ( ! webServerService ) return;
this.add_check('web-server-listening', async () => {
const server = webServerService.get_server?.();
if ( ! server ) {
throw new Error('web server is not initialized');
}
if ( server.listening !== true ) {
throw new Error('web server is not listening');
}
});
const socketioService = this.#getServiceIfAvailable('socketio');
if ( socketioService ) {
this.add_check('socketio-initialized', async () => {
if ( ! socketioService.io ) {
throw new Error('socket.io is not initialized');
}
});
}
this.web_checks_registered_ = true;
}
/**
@@ -61,7 +161,7 @@ class ServerHealthService extends BaseService {
* @param {none} - This method does not take any parameters.
* @returns {void} - This method does not return any value.
*/
init_service_checks_ () {
#initServiceCheck () {
const svc_alarm = this.services.get('alarm');
/**
* Initializes periodic health checks for the server.
@@ -77,8 +177,11 @@ class ServerHealthService extends BaseService {
* @returns {void}
*/
promise.asyncSafeSetInterval(async () => {
this.last_check_cycle_started_at_ = Date.now();
this.stats_.last_check_cycle_started_at = this.last_check_cycle_started_at_;
this.log.tick('service checks');
const check_failures = [];
const check_durations_ms = {};
for ( const { name, fn, chainable } of this.checks_ ) {
const p_timeout = new promise.TeePromise();
/**
@@ -88,42 +191,47 @@ class ServerHealthService extends BaseService {
*/
const timeout = setTimeout(() => {
p_timeout.reject(new Error('Health check timed out'));
}, 5 * SECOND);
}, CHECK_TIMEOUT_MS);
const check_started_at = Date.now();
try {
await Promise.race([
fn(),
p_timeout,
]);
clearTimeout(timeout);
} catch ( err ) {
// Trigger an alarm if this check isn't already in the failure list
if ( this.failures_.some(v => v.name === name) ) {
return;
}
svc_alarm.create(
'health-check-failure',
`Health check ${name} failed`,
{ error: err },
);
check_failures.push({ name });
const alreadyFailing = this.failures_.some(v => v.name === name);
this.log.error(`Error for healthcheck fail on ${name}: ${ err.stack}`);
if ( ! alreadyFailing ) {
svc_alarm.create(
'health-check-failure',
`Health check ${name} failed`,
{ error: err },
);
// Run the on_fail handlers
for ( const fn of chainable.on_fail_ ) {
try {
await fn(err);
} catch ( e ) {
this.log.error(`Error in on_fail handler for ${name}`, e);
// Run the on_fail handlers only on new failures
for ( const fn of chainable.on_fail_ ) {
try {
await fn(err);
} catch ( e ) {
this.log.error(`Error in on_fail handler for ${name}`, e);
}
}
}
this.log.error(`Error for healthcheck fail on ${name}: ${ err.stack}`);
} finally {
clearTimeout(timeout);
check_durations_ms[name] = Date.now() - check_started_at;
}
}
this.failures_ = check_failures;
}, 10 * SECOND, null, {
this.last_check_cycle_completed_at_ = Date.now();
this.stats_.last_check_cycle_completed_at = this.last_check_cycle_completed_at_;
this.stats_.check_durations_ms = check_durations_ms;
this.stats_.failed_checks = this.failures_.map(v => v.name);
}, CHECK_INTERVAL_MS, null, {
onBehindSchedule: (drift) => {
svc_alarm.create(
'health-checks-behind-schedule',
@@ -169,29 +277,71 @@ class ServerHealthService extends BaseService {
const cacheKey = ServerHealthRedisCacheKeys.status;
// Check cache first
const cached = await kv.get(cacheKey);
if ( cached ) {
try {
return JSON.parse(cached);
} catch (e) {
// no op cache is in an invalid state
try {
const cached = await kv.get(cacheKey);
if ( cached ) {
try {
return JSON.parse(cached);
} catch (e) {
// no op cache is in an invalid state
}
}
} catch (e) {
this.log.warn(`Unable to read health status cache: ${e.message}`);
}
// Compute status
const failures = this.failures_.map(v => v.name);
const failures = this.#getStatusFailures();
const status = {
ok: failures.length === 0,
...(failures.length ? { failed: failures } : {}),
};
// Cache with 5 second TTL
await kv.set(cacheKey, JSON.stringify(status), {
EX: 5,
});
try {
await kv.set(cacheKey, JSON.stringify(status), {
EX: 5,
});
} catch (e) {
this.log.warn(`Unable to write health status cache: ${e.message}`);
}
return status;
}
#getStatusFailures () {
const failures = this.failures_.map(v => v.name);
const staleHealthRunnerFailure = this.#getStaleHealthRunnerFailure();
if ( staleHealthRunnerFailure ) {
failures.push(staleHealthRunnerFailure);
}
return failures;
}
#getStaleHealthRunnerFailure () {
const staleAfterMs = Number(
this.global_config?.server_health?.stale_health_loop_fail_ms,
) || (CHECK_INTERVAL_MS * HEALTH_LOOP_STALE_MULTIPLIER);
const now = Date.now();
if ( this.last_check_cycle_completed_at_ === 0 ) {
return (now - this.health_started_at_) > staleAfterMs
? 'health-check-loop-not-running'
: null;
}
return (now - this.last_check_cycle_completed_at_) > staleAfterMs
? 'health-check-loop-stale'
: null;
}
#getServiceIfAvailable (serviceName) {
try {
return this.services.get(serviceName);
} catch {
return null;
}
}
}
module.exports = { ServerHealthService };
@@ -139,8 +139,10 @@ export class TelemetryService extends BaseService {
};
}
static #resolveExporterConfig (serviceConfig) {
return config.jaeger ?? serviceConfig?.jaeger;
static #resolveExporterConfig (_serviceConfig) {
// return config.jaeger ?? serviceConfig?.jaeger;
// TODO DS: reenable if needed
return false;
}
static #getConfiguredExporter (serviceConfig) {
@@ -179,10 +181,11 @@ export class TelemetryService extends BaseService {
}
const resource = Resource.default().merge(
new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'puter-backend',
[SemanticResourceAttributes.SERVICE_VERSION]: '0.1.0',
}));
new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'puter-backend',
[SemanticResourceAttributes.SERVICE_VERSION]: '0.1.0',
}),
);
const sdkConfig = {
resource,
+15 -4
View File
@@ -62,6 +62,19 @@ const isHostedDomainRequest = (req) => {
hostMatchesDomain(requestHost, hostedDomain));
};
const get_status = async (req) => {
const svc_serverHealth = req.services.get('server-health');
return await svc_serverHealth.get_status();
};
const send_health_status = async (req, res, { fail_with_http_error = false }) => {
const status = await get_status(req);
const shouldFailWithHttpError =
fail_with_http_error || !!req.query['return-http-error'];
const httpStatus = shouldFailWithHttpError && !status.ok ? 500 : 200;
res.status(httpStatus).json(status);
};
// -----------------------------------------------------------------------//
// GET /healthcheck
// -----------------------------------------------------------------------//
@@ -71,9 +84,7 @@ router.get('/healthcheck', async (req, res, next) => {
return;
}
const svc_serverHealth = req.services.get('server-health');
const status = await svc_serverHealth.get_status();
res.status((req.query['return-http-error'] && !status.ok) ? 500 : 200).json(status);
await send_health_status(req, res, { fail_with_http_error: false });
});
module.exports = router;
@@ -15,13 +15,14 @@ class DynamoKVStoreServiceWrapper extends BaseService {
tableName: this.config.tableName || 'store-kv-v1',
});
await this.kvStore.createTableIfNotExists();
await this.#registerHealthcheck();
Object.getOwnPropertyNames(DynamoKVStore.prototype).forEach(fn => {
if ( fn === 'constructor' ) return;
this[fn] = (...args: unknown[]) => this.kvStore[fn](...args);
});
}
async registerHealthcheck () {
async #registerHealthcheck () {
const healthcheckService = this.services.get('server-health');
healthcheckService.add_check('kv-store', async () => {
@@ -28,7 +28,7 @@ export class MeteringService {
this.#eventService = eventService;
setInterval(() => {
this.#checkRateOfChange();
}, 1000 * 60 * 15); // check every 15 minutes
}, 1000 * 60 * 16); // check every 16 minutes
}
utilRecordUsageObject<T extends Record<string, number>>(trackedUsageObject: T, actor: Actor, modelPrefix: string, costsOverrides?: Partial<Record<keyof T, number>>) {
@@ -73,7 +73,7 @@ class RefreshAssociationsService extends BaseService {
await Context.allow_fallback(async () => {
await refresh_associations_cache();
});
}, 30000);
}, 32000);
}, 15000);
}
}
@@ -137,7 +137,7 @@ export class EdgeRateLimitService extends BaseService {
* This method sets an interval that calls the cleanup function every 5 minutes.
*/
async _init () {
asyncSafeSetInterval(() => this.cleanup(), 5 * MINUTE);
asyncSafeSetInterval(() => this.cleanup(), 4.5 * MINUTE);
}
check (scope, noIncrease = false) {