otel: add spans to all kv methods (#2358)

This commit is contained in:
Daniel Salazar
2026-01-27 19:16:04 -08:00
committed by GitHub
parent 4183d5de06
commit 89c49902b6
3 changed files with 30 additions and 7 deletions
@@ -7,6 +7,7 @@ import murmurhash from 'murmurhash';
import { DDBClient } from '../DDBClient.js';
import { PUTER_KV_STORE_TABLE_DEFINITION } from './tableDefinition.js';
import APIError from '../../../api/APIError.js';
import { Span } from '../../../util/otelutil.js';
export class DynamoKVStore {
static GLOBAL_APP_KEY = 'os-global';
@@ -45,6 +46,7 @@ export class DynamoKVStore {
}
}
@Span('kv:get')
async get ({ key }: { key: string | string[]; }): Promise<unknown | null | (unknown | null)[]> {
if ( key === '' ) {
throw APIError.create('field_empty', null, {
@@ -138,6 +140,7 @@ export class DynamoKVStore {
}
@Span('kv:set')
async set ({ key, value, expireAt }: { key: string; value: unknown; expireAt?: number; }): Promise<boolean> {
const context = Context.get();
@@ -171,6 +174,7 @@ export class DynamoKVStore {
return true;
}
@Span('kv:del')
async del ({ key }: { key: string; }): Promise<boolean> {
const actor = Context.get('actor');
@@ -267,6 +271,7 @@ export class DynamoKVStore {
return trimmed;
}
@Span('kv:list')
async list ({
as,
limit,
@@ -364,6 +369,7 @@ export class DynamoKVStore {
return items;
}
@Span('kv:flush')
async flush () {
const actor = Context.get('actor');
@@ -407,6 +413,7 @@ export class DynamoKVStore {
return !!allRes;
}
@Span('kv:expireAt')
async expireAt ({ key, timestamp }: { key: string; timestamp: number; }): Promise<void> {
if ( key === '' ) {
throw APIError.create('field_empty', null, {
@@ -419,6 +426,7 @@ export class DynamoKVStore {
return await this.#expireAt(key, timestamp);
}
@Span('kv:expire')
async expire ({ key, ttl }: { key: string; ttl: number; }): Promise<void> {
if ( key === '' ) {
throw APIError.create('field_empty', null, {
@@ -521,6 +529,7 @@ export class DynamoKVStore {
}
// Ideally the paths support syntax like "a.b[2].c"
@Span('kv:incr')
async incr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }): Promise<T extends { '': number; } ? number : RecursiveRecord<number>> {
if ( Object.values(pathAndAmountMap).find((v) => typeof v !== 'number') ) {
throw new Error('All values in pathAndAmountMap must be numbers');
@@ -581,6 +590,11 @@ export class DynamoKVStore {
return res.Attributes?.value;
}
async decr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }) {
return await this.incr({ key, pathAndAmountMap: Object.fromEntries(Object.entries(pathAndAmountMap).map(([k, v]) => [k, -v])) as T });
}
@Span('kv:add')
async add ({ key, pathAndValueMap }: { key: string; pathAndValueMap: Record<string, unknown>; }): Promise<unknown> {
if ( !pathAndValueMap || Object.keys(pathAndValueMap).length === 0 ) {
throw new Error('invalid use of #add: no pathAndValueMap');
@@ -637,6 +651,7 @@ export class DynamoKVStore {
return res.Attributes?.value;
}
@Span('kv:remove')
async remove ({ key, paths }: { key: string; paths: string[]; }): Promise<unknown> {
if ( !paths || paths.length === 0 ) {
throw new Error('invalid use of #remove: no paths');
@@ -698,6 +713,7 @@ export class DynamoKVStore {
}
}
@Span('kv:update')
async update ({ key, pathAndValueMap, ttl }: { key: string; pathAndValueMap: Record<string, unknown>; ttl?: number; }): Promise<unknown> {
if ( !pathAndValueMap || Object.keys(pathAndValueMap).length === 0 ) {
throw new Error('invalid use of #update: no pathAndValueMap');
@@ -764,10 +780,6 @@ export class DynamoKVStore {
return res.Attributes?.value;
}
async decr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }) {
return await this.incr({ key, pathAndAmountMap: Object.fromEntries(Object.entries(pathAndAmountMap).map(([k, v]) => [k, -v])) as T });
}
async #expireAt (key: string, timestamp: number) {
const actor = Context.get('actor');
+12 -2
View File
@@ -42,7 +42,8 @@ promises.push(tracer.startActiveSpan(`job:${job.id}`, (span) => {
}));
*/
const spanify = (label, fn, tracer) => async (...args) => {
/** @type {<T extends Function>(label:string, fn:T, tracer?: unknown)=> T} */
const spanify = (label, fn, tracer) => async function (...args) {
const context = Context.get();
if ( ! context ) {
// We don't use the proper logger here because we would normally
@@ -53,12 +54,20 @@ const spanify = (label, fn, tracer) => async (...args) => {
tracer = tracer ?? context.get('services').get('traceService').tracer;
let result;
await tracer.startActiveSpan(label, async span => {
result = await fn(...args);
// eslint-disable-next-line no-invalid-this
result = await fn.apply(this, args);
span.end();
});
return result;
};
/** @type {(label: string, tracer?: unknown) => MethodDecorator} */
const Span = (label, tracer) => (_target, _propertyKey, descriptor) => {
if ( !descriptor || typeof descriptor.value !== 'function' ) return descriptor;
descriptor.value = spanify(label, descriptor.value, tracer);
return descriptor;
};
const abtest = async (label, impls) => {
const context = Context.get();
if ( ! context ) {
@@ -150,5 +159,6 @@ class ParallelTasks {
module.exports = {
ParallelTasks,
spanify,
Span,
abtest,
};
+2 -1
View File
@@ -4,6 +4,7 @@
"module": "esnext",
"moduleResolution": "bundler",
"rootDir": "./src/backend",
"experimentalDecorators": true,
"strict": true,
"forceConsistentCasingInFileNames": true,
"skipLibCheck": true,
@@ -12,4 +13,4 @@
"noEmitOnError": true,
"noImplicitAny": false
}
}
}