mirror of
https://github.com/HeyPuter/puter.git
synced 2026-05-03 16:10:31 +00:00
fix: redis max retries and fail fast timeouts (#2793)
Docker Image CI / build-and-push-image (push) Has been cancelled
Maintain Release Merge PR / update-release-pr (push) Has been cancelled
Notify HeyPuter / notify (push) Has been cancelled
release-please / release-please (push) Has been cancelled
test / test-backend (24.x) (push) Has been cancelled
test / API tests (node env, api-test) (24.x) (push) Has been cancelled
test / puterjs (node env, vitest) (24.x) (push) Has been cancelled
Docker Image CI / build-and-push-image (push) Has been cancelled
Maintain Release Merge PR / update-release-pr (push) Has been cancelled
Notify HeyPuter / notify (push) Has been cancelled
release-please / release-please (push) Has been cancelled
test / test-backend (24.x) (push) Has been cancelled
test / API tests (node env, api-test) (24.x) (push) Has been cancelled
test / puterjs (node env, vitest) (24.x) (push) Has been cancelled
This commit is contained in:
@@ -85,7 +85,7 @@ describe('redisSingleton', () => {
|
||||
dnsLookup: expect.any(Function),
|
||||
redisOptions: expect.objectContaining({
|
||||
connectTimeout: 10000,
|
||||
maxRetriesPerRequest: null,
|
||||
maxRetriesPerRequest: 2,
|
||||
tls: {},
|
||||
}),
|
||||
}));
|
||||
|
||||
@@ -4,6 +4,7 @@ import MockRedis from 'ioredis-mock';
|
||||
const redisStartupRetryMaxDelayMs = 2000;
|
||||
const redisSlotsRefreshTimeoutMs = 5000;
|
||||
const redisConnectTimeoutMs = 10000;
|
||||
const redisMaxRetriesPerRequest = 2;
|
||||
const redisBootRetryRegex = /Cluster(All)?FailedError|None of startup nodes is available/i;
|
||||
|
||||
const formatRedisError = (error: unknown): string => {
|
||||
@@ -56,7 +57,7 @@ if ( process.env.REDIS_CONFIG ) {
|
||||
redisOptions: {
|
||||
tls: {},
|
||||
connectTimeout: redisConnectTimeoutMs,
|
||||
maxRetriesPerRequest: null,
|
||||
maxRetriesPerRequest: redisMaxRetriesPerRequest,
|
||||
},
|
||||
});
|
||||
attachClusterEventHandlers(redisOpt);
|
||||
|
||||
@@ -92,8 +92,8 @@ const driverPolicies = {
|
||||
temp: {
|
||||
kv: {
|
||||
'rate-limit': {
|
||||
max: 10,
|
||||
period: 1000,
|
||||
max: 100,
|
||||
period: 10000,
|
||||
},
|
||||
},
|
||||
es: {
|
||||
@@ -106,8 +106,8 @@ const driverPolicies = {
|
||||
user: {
|
||||
kv: {
|
||||
'rate-limit': {
|
||||
max: 20,
|
||||
period: 1000,
|
||||
max: 200,
|
||||
period: 10000,
|
||||
},
|
||||
},
|
||||
es: {
|
||||
|
||||
@@ -24,7 +24,7 @@ const { reading_has_terminal } = require('../../unstructured/permission-scan-lib
|
||||
const { trace } = require('@opentelemetry/api');
|
||||
const BaseService = require('../BaseService');
|
||||
const { DB_WRITE } = require('../database/consts');
|
||||
const { UserActorType, Actor, AppUnderUserActorType } = require('./Actor');
|
||||
const { UserActorType, Actor } = require('./Actor');
|
||||
const { PERM_KEY_PREFIX, MANAGE_PERM_PREFIX } = require('./permissionConts.mjs');
|
||||
const { PermissionUtil, PermissionExploder, PermissionImplicator, PermissionRewriter } = require('./permissionUtils.mjs');
|
||||
const { spanify } = require('../../util/otelutil');
|
||||
@@ -34,6 +34,24 @@ const { redisClient } = require('../../clients/redis/redisSingleton');
|
||||
const { PermissionScanRedisCacheSpace } = require('./PermissionScanRedisCacheSpace.js');
|
||||
const { Context } = require('../../util/context');
|
||||
|
||||
const defaultPermissionRedisTimeoutMs = 200;
|
||||
const formatErrorMessage = (error) => error instanceof Error ? error.message : String(error);
|
||||
const withTimeout = async (operationPromise, timeoutMs, timeoutMessage) => {
|
||||
let timeout;
|
||||
try {
|
||||
return await Promise.race([
|
||||
operationPromise,
|
||||
new Promise((_, reject) => {
|
||||
timeout = setTimeout(() => {
|
||||
reject(new Error(timeoutMessage));
|
||||
}, timeoutMs);
|
||||
}),
|
||||
]);
|
||||
} finally {
|
||||
if ( timeout ) clearTimeout(timeout);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @class PermissionService
|
||||
* @extends BaseService
|
||||
@@ -206,7 +224,24 @@ class PermissionService extends BaseService {
|
||||
joinPermissionParts: PermissionUtil.join,
|
||||
});
|
||||
|
||||
const cached = await redisClient.get(cacheKey);
|
||||
const permissionRedisTimeoutMs = Number(this.global_config?.services?.permission?.redis_timeout_ms)
|
||||
|| defaultPermissionRedisTimeoutMs;
|
||||
let cached;
|
||||
if ( ! scan_options.no_cache ) {
|
||||
try {
|
||||
cached = await withTimeout(
|
||||
redisClient.get(cacheKey),
|
||||
permissionRedisTimeoutMs,
|
||||
`permission scan cache read timed out after ${permissionRedisTimeoutMs}ms`,
|
||||
);
|
||||
} catch ( error ) {
|
||||
this.log.warn('permission scan cache read failed; continuing without cache', {
|
||||
actorUid: actor.uid,
|
||||
cacheKey,
|
||||
error: formatErrorMessage(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
if ( cached && !scan_options.no_cache ) {
|
||||
try {
|
||||
return JSON.parse(cached);
|
||||
@@ -237,10 +272,22 @@ class PermissionService extends BaseService {
|
||||
value: end_ts - start_ts,
|
||||
});
|
||||
|
||||
await setRedisCacheValue(cacheKey, JSON.stringify(reading), {
|
||||
ttlSeconds: this._PERMISSION_SCAN_CACHE_TTL_SECONDS,
|
||||
eventData: reading,
|
||||
});
|
||||
try {
|
||||
await withTimeout(
|
||||
setRedisCacheValue(cacheKey, JSON.stringify(reading), {
|
||||
ttlSeconds: this._PERMISSION_SCAN_CACHE_TTL_SECONDS,
|
||||
eventData: reading,
|
||||
}),
|
||||
permissionRedisTimeoutMs,
|
||||
`permission scan cache write timed out after ${permissionRedisTimeoutMs}ms`,
|
||||
);
|
||||
} catch ( error ) {
|
||||
this.log.warn('permission scan cache write failed; continuing without cache', {
|
||||
actorUid: actor.uid,
|
||||
cacheKey,
|
||||
error: formatErrorMessage(error),
|
||||
});
|
||||
}
|
||||
|
||||
return reading;
|
||||
}
|
||||
|
||||
@@ -15,8 +15,8 @@ class PermissionShortcutService extends BaseService {
|
||||
return {
|
||||
policy: {
|
||||
'rate-limit': {
|
||||
max: 20,
|
||||
period: 1000,
|
||||
max: 200,
|
||||
period: 10000,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
@@ -24,8 +24,25 @@ const { DB_WRITE } = require('../database/consts');
|
||||
const { redisClient } = require('../../clients/redis/redisSingleton');
|
||||
const { RateLimitRedisCacheSpace } = require('./RateLimitRedisCacheSpace.js');
|
||||
|
||||
const ts_to_sql = (ts) => Math.floor(ts / 1000);
|
||||
const ts_fr_sql = (ts) => ts * 1000;
|
||||
const toSqlTimestamp = (timestampMs) => Math.floor(timestampMs / 1000);
|
||||
const fromSqlTimestamp = (timestampSec) => timestampSec * 1000;
|
||||
const defaultRateLimitRedisTimeoutMs = 200;
|
||||
const formatErrorMessage = (error) => error instanceof Error ? error.message : String(error);
|
||||
const withTimeout = async (operationPromise, timeoutMs, timeoutMessage) => {
|
||||
let timeout;
|
||||
try {
|
||||
return await Promise.race([
|
||||
operationPromise,
|
||||
new Promise((_, reject) => {
|
||||
timeout = setTimeout(() => {
|
||||
reject(new Error(timeoutMessage));
|
||||
}, timeoutMs);
|
||||
}),
|
||||
]);
|
||||
} finally {
|
||||
if ( timeout ) clearTimeout(timeout);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* RateLimitService class handles rate limiting functionality for API requests.
|
||||
@@ -52,6 +69,49 @@ class RateLimitService extends BaseService {
|
||||
this.db = this.services.get('database').get(DB_WRITE, 'rate-limit');
|
||||
}
|
||||
|
||||
async #checkAndIncrementInDb ({ dbKey, max, period, methodName }) {
|
||||
const rows = await this.db.read(
|
||||
'SELECT * FROM `rl_usage_fixed_window` WHERE `key` = ?',
|
||||
[dbKey],
|
||||
);
|
||||
|
||||
let windowStart = 0;
|
||||
let currentCount = 0;
|
||||
|
||||
if ( rows.length === 0 ) {
|
||||
windowStart = Date.now();
|
||||
this.db.write(
|
||||
'INSERT INTO `rl_usage_fixed_window` (`key`, `window_start`, `count`) VALUES (?, ?, ?)',
|
||||
[dbKey, toSqlTimestamp(windowStart), 0],
|
||||
);
|
||||
} else {
|
||||
const row = rows[0];
|
||||
windowStart = fromSqlTimestamp(row.window_start);
|
||||
currentCount = Number.isFinite(Number(row.count)) ? Number(row.count) : 0;
|
||||
}
|
||||
|
||||
if ( windowStart + period < Date.now() ) {
|
||||
windowStart = Date.now();
|
||||
currentCount = 0;
|
||||
this.db.write(
|
||||
'UPDATE `rl_usage_fixed_window` SET `window_start` = ?, `count` = ? WHERE `key` = ?',
|
||||
[toSqlTimestamp(windowStart), 0, dbKey],
|
||||
);
|
||||
}
|
||||
|
||||
if ( currentCount >= max ) {
|
||||
throw APIError.create('rate_limit_exceeded', null, {
|
||||
method_name: methodName,
|
||||
rate_limit: { max, period },
|
||||
});
|
||||
}
|
||||
|
||||
this.db.write(
|
||||
'UPDATE `rl_usage_fixed_window` SET `count` = `count` + 1 WHERE `key` = ?',
|
||||
[dbKey],
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a rate limit has been exceeded and increments the counter
|
||||
* @param {string} key - The rate limit key/identifier
|
||||
@@ -62,79 +122,108 @@ class RateLimitService extends BaseService {
|
||||
* @throws {APIError} When rate limit is exceeded
|
||||
*/
|
||||
async check_and_increment (key, max, period, options = {}) {
|
||||
const consumer_id = this._get_consumer_id();
|
||||
const method_name = key;
|
||||
key = `${consumer_id}:${key}`;
|
||||
const windowStartKey = RateLimitRedisCacheSpace.windowStartKey(key);
|
||||
const countKey = RateLimitRedisCacheSpace.countKey(key);
|
||||
const dbkey = options.global ? key : `${this.global_config.server_id}:${key}`;
|
||||
const consumerId = this._get_consumer_id();
|
||||
const methodName = key;
|
||||
const rateLimitKey = `${consumerId}:${key}`;
|
||||
const windowStartKey = RateLimitRedisCacheSpace.windowStartKey(rateLimitKey);
|
||||
const countKey = RateLimitRedisCacheSpace.countKey(rateLimitKey);
|
||||
const dbKey = options.global
|
||||
? rateLimitKey
|
||||
: `${this.global_config.server_id}:${rateLimitKey}`;
|
||||
const rateLimitRedisTimeoutMs = Number(this.global_config?.services?.['rate-limit']?.redis_timeout_ms)
|
||||
|| defaultRateLimitRedisTimeoutMs;
|
||||
const runRedis = async (operationName, operationPromise) => {
|
||||
try {
|
||||
const value = await withTimeout(
|
||||
operationPromise,
|
||||
rateLimitRedisTimeoutMs,
|
||||
`rate-limit redis ${operationName} timed out after ${rateLimitRedisTimeoutMs}ms`,
|
||||
);
|
||||
return { ok: true, value };
|
||||
} catch ( error ) {
|
||||
this.log.warn('rate-limit redis operation failed; continuing with db fallback', {
|
||||
operationName,
|
||||
rateLimitKey,
|
||||
error: formatErrorMessage(error),
|
||||
});
|
||||
return { ok: false, value: null };
|
||||
}
|
||||
};
|
||||
|
||||
// Fixed window counter strategy (see devlog 2023-11-21)
|
||||
const window_start_raw = await redisClient.get(windowStartKey);
|
||||
let window_start = Number.isFinite(Number(window_start_raw)) ? Number(window_start_raw) : 0;
|
||||
if ( window_start === 0 ) {
|
||||
const windowStartRead = await runRedis('window-start-read', redisClient.get(windowStartKey));
|
||||
if ( ! windowStartRead.ok ) {
|
||||
await this.#checkAndIncrementInDb({ dbKey, max, period, methodName });
|
||||
return;
|
||||
}
|
||||
let windowStart = Number.isFinite(Number(windowStartRead.value)) ? Number(windowStartRead.value) : 0;
|
||||
if ( windowStart === 0 ) {
|
||||
// Try database
|
||||
const rows = await this.db.read(
|
||||
'SELECT * FROM `rl_usage_fixed_window` WHERE `key` = ?',
|
||||
[dbkey],
|
||||
[dbKey],
|
||||
);
|
||||
|
||||
if ( rows.length !== 0 ) {
|
||||
const row = rows[0];
|
||||
window_start = ts_fr_sql(row.window_start);
|
||||
windowStart = fromSqlTimestamp(row.window_start);
|
||||
const count = row.count;
|
||||
|
||||
await Promise.all([
|
||||
redisClient.set(windowStartKey, window_start),
|
||||
redisClient.set(countKey, count),
|
||||
void Promise.all([
|
||||
runRedis('window-start-seed', redisClient.set(windowStartKey, windowStart)),
|
||||
runRedis('count-seed', redisClient.set(countKey, count)),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
if ( window_start === 0 ) {
|
||||
window_start = Date.now();
|
||||
await Promise.all([
|
||||
redisClient.set(windowStartKey, window_start),
|
||||
redisClient.set(countKey, 0),
|
||||
if ( windowStart === 0 ) {
|
||||
windowStart = Date.now();
|
||||
void Promise.all([
|
||||
runRedis('window-start-init', redisClient.set(windowStartKey, windowStart)),
|
||||
runRedis('count-init', redisClient.set(countKey, 0)),
|
||||
]);
|
||||
|
||||
this.db.write(
|
||||
'INSERT INTO `rl_usage_fixed_window` (`key`, `window_start`, `count`) VALUES (?, ?, ?)',
|
||||
[dbkey, ts_to_sql(window_start), 0],
|
||||
[dbKey, toSqlTimestamp(windowStart), 0],
|
||||
);
|
||||
|
||||
this.log.debug(
|
||||
'CREATE window_start and count',
|
||||
{ window_start, count: 0 },
|
||||
'create windowStart and count',
|
||||
{ windowStart, count: 0 },
|
||||
);
|
||||
}
|
||||
|
||||
if ( window_start + period < Date.now() ) {
|
||||
window_start = Date.now();
|
||||
await Promise.all([
|
||||
redisClient.set(windowStartKey, window_start),
|
||||
redisClient.set(countKey, 0),
|
||||
if ( windowStart + period < Date.now() ) {
|
||||
windowStart = Date.now();
|
||||
Promise.all([
|
||||
runRedis('window-start-reset', redisClient.set(windowStartKey, windowStart)),
|
||||
runRedis('count-reset', redisClient.set(countKey, 0)),
|
||||
]);
|
||||
|
||||
this.db.write(
|
||||
'UPDATE `rl_usage_fixed_window` SET `window_start` = ?, `count` = ? WHERE `key` = ?',
|
||||
[ts_to_sql(window_start), 0, dbkey],
|
||||
[toSqlTimestamp(windowStart), 0, dbKey],
|
||||
);
|
||||
}
|
||||
|
||||
const current_raw = await redisClient.get(countKey);
|
||||
const current = Number.isFinite(Number(current_raw)) ? Number(current_raw) : 0;
|
||||
const currentRead = await runRedis('count-read', redisClient.get(countKey));
|
||||
if ( ! currentRead.ok ) {
|
||||
await this.#checkAndIncrementInDb({ dbKey, max, period, methodName });
|
||||
return;
|
||||
}
|
||||
const current = Number.isFinite(Number(currentRead.value)) ? Number(currentRead.value) : 0;
|
||||
if ( current >= max ) {
|
||||
throw APIError.create('rate_limit_exceeded', null, {
|
||||
method_name,
|
||||
method_name: methodName,
|
||||
rate_limit: { max, period },
|
||||
});
|
||||
}
|
||||
|
||||
await redisClient.incr(countKey);
|
||||
runRedis('count-incr', redisClient.incr(countKey));
|
||||
this.db.write(
|
||||
'UPDATE `rl_usage_fixed_window` SET `count` = `count` + 1 WHERE `key` = ?',
|
||||
[dbkey],
|
||||
[dbKey],
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user