feat: new kvstore operatios for more granular updates 🚀 (#2254)

This commit is contained in:
Daniel Salazar
2026-01-09 12:20:54 -08:00
committed by GitHub
parent 53133e2623
commit 23d7b751b9
6 changed files with 444 additions and 7 deletions
@@ -26,6 +26,8 @@ const BaseService = require('../../services/BaseService');
* @property {function(KVStoreDelParams): Promise<void>} del - Delete a value by key.
* @property {function(KVStoreListParams): Promise<string[]>} list - List all key-value pairs, optionally as a specific type.
* @property {function(): Promise<void>} flush - Delete all key-value pairs in the store.
* @property {(params: KVStoreUpdateParams) => Promise<unknown>} update - Update nested values by key.
* @property {(params: KVStoreAddParams) => Promise<unknown>} add - Append values into list paths by key.
* @property {(params: {key:string, pathAndAmountMap: Record<string, number>}) => Promise<unknown>} incr - Increment a numeric value by key.
* @property {(params: {key:string, pathAndAmountMap: Record<string, number>}) => Promise<unknown>} decr - Decrement a numeric value by key.
* @property {function(KVStoreExpireAtParams): Promise<number>} expireAt - Set a key to expire at a specific UNIX timestamp (seconds).
@@ -45,6 +47,15 @@ const BaseService = require('../../services/BaseService');
* @typedef {Object} KVStoreListParams
* @property {string} [as] - Optional type to list as (e.g., 'array', 'object').
*
* @typedef {Object} KVStoreUpdateParams
* @property {string} key - The key to update.
* @property {Object.<string, *>} pathAndValueMap - Map of period-joined paths to values.
* @property {number} [ttl] - Optional TTL in seconds for the whole object.
*
* @typedef {Object} KVStoreAddParams
* @property {string} key - The key to update.
* @property {Object.<string, *>} pathAndValueMap - Map of period-joined paths to values to append.
*
* @typedef {Object} KVStoreExpireAtParams
* @property {string} key - The key to set expiration for.
* @property {number} timestamp - UNIX timestamp (seconds) when the key should expire.
@@ -109,6 +120,23 @@ class KVStoreInterfaceService extends BaseService {
parameters: {},
result: { type: 'void' },
},
update: {
description: 'Update nested values by key.',
parameters: {
key: { type: 'string', required: true },
pathAndValueMap: { type: 'json', required: true, description: 'map of period-joined path to value' },
ttl: { type: 'number', description: 'optional TTL in seconds for the whole object' },
},
result: { type: 'json', description: 'The updated value' },
},
add: {
description: 'Append values into list paths by key.',
parameters: {
key: { type: 'string', required: true },
pathAndValueMap: { type: 'json', required: true, description: 'map of period-joined path to value to append' },
},
result: { type: 'json', description: 'The updated value' },
},
incr: {
description: 'Increment a value by key.',
parameters: {
@@ -151,4 +179,4 @@ class KVStoreInterfaceService extends BaseService {
module.exports = {
KVStoreInterfaceService,
};
};
@@ -192,6 +192,151 @@ describe('DynamoKVStore', async () => {
expect(valTtl).toBeNull();
});
it('updates nested paths and creates missing maps', async () => {
const actor = makeActor(12);
const key = 'update-key';
const updated = await su.sudo(actor, () => kvStore.update({
key,
pathAndValueMap: {
'profile.name': 'Ada',
'profile.stats.score': 7,
'active': true,
},
}));
expect(updated).toMatchObject({
profile: { name: 'Ada', stats: { score: 7 } },
active: true,
});
const stored = await su.sudo(actor, () => kvStore.get({ key }));
expect(stored).toMatchObject({
profile: { name: 'Ada', stats: { score: 7 } },
active: true,
});
});
it('update can set ttl for the whole object', async () => {
const actor = makeActor(13);
const key = 'update-ttl';
await su.sudo(actor, () => kvStore.update({
key,
pathAndValueMap: { 'count': 1 },
ttl: -1,
}));
const stored = await su.sudo(actor, () => kvStore.get({ key }));
expect(stored).toBeNull();
});
it('supports list index paths when updating', async () => {
const actor = makeActor(17);
const key = 'update-list-index';
await su.sudo(actor, () => kvStore.set({
key,
value: { a: { b: [1, 2] } },
}));
const updated = await su.sudo(actor, () => kvStore.update({
key,
pathAndValueMap: { 'a.b[1]': 5 },
}));
expect((updated as { a?: { b?: number[] } }).a?.b).toEqual([1, 5]);
const stored = await su.sudo(actor, () => kvStore.get({ key }));
expect((stored as { a?: { b?: number[] } }).a?.b).toEqual([1, 5]);
});
it('adds values to nested lists and creates missing maps', async () => {
const actor = makeActor(15);
const key = 'add-key';
const first = await su.sudo(actor, () => kvStore.add({
key,
pathAndValueMap: {
'a.b': 1,
},
}));
expect(first).toMatchObject({ a: { b: [1] } });
const second = await su.sudo(actor, () => kvStore.add({
key,
pathAndValueMap: {
'a.b': 2,
'a.c': ['x', 'y'],
},
}));
expect(second).toMatchObject({ a: { b: [1, 2], c: ['x', 'y'] } });
const stored = await su.sudo(actor, () => kvStore.get({ key }));
expect(stored).toMatchObject({ a: { b: [1, 2], c: ['x', 'y'] } });
});
it('supports list index paths when appending', async () => {
const actor = makeActor(18);
const key = 'add-list-index';
await su.sudo(actor, () => kvStore.set({
key,
value: { a: { b: [[1], [2]] } },
}));
const updated = await su.sudo(actor, () => kvStore.add({
key,
pathAndValueMap: { 'a.b[1]': 3 },
}));
expect((updated as { a?: { b?: number[][] } }).a?.b).toEqual([[1], [2, 3]]);
const stored = await su.sudo(actor, () => kvStore.get({ key }));
expect((stored as { a?: { b?: number[][] } }).a?.b).toEqual([[1], [2, 3]]);
});
it('incr initializes nested maps for missing keys', async () => {
const actor = makeActor(14);
const key = 'incr-missing';
const first = await su.sudo(actor, () => kvStore.incr({
key,
pathAndAmountMap: { 'a.b.c': 2, 'x': 1 },
}));
expect(first).toMatchObject({ a: { b: { c: 2 } }, x: 1 });
const second = await su.sudo(actor, () => kvStore.incr({
key,
pathAndAmountMap: { 'a.b.c': 3 },
}));
expect(second).toMatchObject({ a: { b: { c: 5 } }, x: 1 });
});
it('supports list index paths when incrementing', async () => {
const actor = makeActor(16);
const key = 'incr-list-index';
await su.sudo(actor, () => kvStore.set({
key,
value: { a: { b: [1, 2] } },
}));
const updated = await su.sudo(actor, () => kvStore.incr({
key,
pathAndAmountMap: { 'a.b[1]': 3 },
}));
expect((updated as { a?: { b?: number[] } }).a?.b).toEqual([1, 5]);
const stored = await su.sudo(actor, () => kvStore.get({ key }));
expect((stored as { a?: { b?: number[] } }).a?.b).toEqual([1, 5]);
});
it('enforces key and value size limits', async () => {
const actor = makeActor(11);
const oversizedKey = 'a'.repeat((config.kv_max_key_size as number) + 1);
@@ -326,6 +326,47 @@ export class DynamoKVStore {
async #createPaths ( namespace: string, key: string, pathList: string[]) {
const nestedMapValue = (() => {
const valueRoot: Record<string, unknown> = {};
let hasPaths = false;
pathList.forEach((valPath) => {
if ( ! valPath ) return;
hasPaths = true;
const chunks = valPath.split('.').filter(Boolean);
let cursor: Record<string, unknown> = valueRoot;
for ( let i = 0; i < chunks.length - 1; i++ ) {
const chunk = chunks[i];
const existing = cursor[chunk];
if ( !existing || typeof existing !== 'object' || Array.isArray(existing) ) {
cursor[chunk] = {};
}
cursor = cursor[chunk] as Record<string, unknown>;
}
});
return hasPaths ? valueRoot : null;
})();
if ( ! nestedMapValue ) {
return 0;
}
const isPlainObject = (value: unknown): value is Record<string, unknown> => {
return !!value && typeof value === 'object' && !Array.isArray(value);
};
const objectsEqual = (left: unknown, right: unknown): boolean => {
if ( left === right ) return true;
if ( !isPlainObject(left) || !isPlainObject(right) ) return false;
const leftKeys = Object.keys(left);
const rightKeys = Object.keys(right);
if ( leftKeys.length !== rightKeys.length ) return false;
for ( const key of leftKeys ) {
if ( ! rightKeys.includes(key) ) return false;
if ( ! objectsEqual(left[key], right[key]) ) return false;
}
return true;
};
// Collect all intermediate map paths for all entries
const allIntermediatePaths = new Set<string>();
pathList.forEach((valPath) => {
@@ -337,11 +378,11 @@ export class DynamoKVStore {
}
});
// TODO DS: make it so that the top layers are checked first to avoid creating each layer multiple times
let writeUnits = 0;
// Ensure each intermediate map layer exists by issuing a separate DynamoDB update for each
for ( const layerPath of allIntermediatePaths ) {
const orderedPaths = [...allIntermediatePaths]
.sort((left, right) => left.split('.').length - right.split('.').length);
for ( const layerPath of orderedPaths ) {
// Build attribute names for the layer
const chunks = layerPath.split('.');
const attrName = chunks.map((chunk) => `#${chunk}`.replaceAll(this.#pathCleanerRegex, '')).join('.');
@@ -350,13 +391,21 @@ export class DynamoKVStore {
const cleanedChunk = chunk.split(/\[\d*\]/g)[0];
expressionNames[`#${cleanedChunk}`.replaceAll(this.#pathCleanerRegex, '')] = cleanedChunk;
});
const isRootLayer = layerPath === 'value';
const expressionValues = isRootLayer
? { ':nestedMap': nestedMapValue }
: { ':emptyMap': {} };
const valueToken = isRootLayer ? ':nestedMap' : ':emptyMap';
// Issue update to set layer to {} if not exists
const layerUpsertRes = await this.#ddbClient.update(this.#tableName,
{ key, namespace },
`SET ${attrName} = if_not_exists(${attrName}, :emptyMap)`,
{ ':emptyMap': {} },
`SET ${attrName} = if_not_exists(${attrName}, ${valueToken})`,
expressionValues,
expressionNames);
writeUnits += layerUpsertRes.ConsumedCapacity?.CapacityUnits ?? 0;
if ( isRootLayer && objectsEqual(layerUpsertRes.Attributes?.value, nestedMapValue) ) {
return writeUnits;
}
}
return writeUnits;
}
@@ -422,6 +471,128 @@ export class DynamoKVStore {
return res.Attributes?.value;
}
async add ({ key, pathAndValueMap }: { key: string; pathAndValueMap: Record<string, unknown>; }): Promise<unknown> {
if ( !pathAndValueMap || Object.keys(pathAndValueMap).length === 0 ) {
throw new Error('invalid use of #add: no pathAndValueMap');
}
if ( key === '' ) {
throw APIError.create('field_empty', null, {
key: 'key',
});
}
const actor = Context.get('actor');
const user = actor.type?.user ?? undefined;
if ( ! user ) throw new Error('User not found');
const namespace = this.#getNameSpace(actor);
if ( this.#enableMigrationFromSQL ) {
// trigger get to move element if exists
await this.get({ key });
}
const cleanerRegex = /[:\-+/*]/g;
let writeUnits = await this.#createPaths(namespace, key, Object.keys(pathAndValueMap));
const setStatements = Object.entries(pathAndValueMap).map(([valPath, _val], idx) => {
const path = ['value', ...valPath.split('.')].filter(Boolean).join('.');
const attrName = path.split('.').map((chunk) => `#${chunk}`.replaceAll(cleanerRegex, '')).join('.');
return `${attrName} = list_append(if_not_exists(${attrName}, :emptyList${idx}), :append${idx})`;
});
const valueAttributeValues = Object.entries(pathAndValueMap).reduce((acc, [_path, val], idx) => {
acc[`:append${idx}`] = Array.isArray(val) ? val : [val];
acc[`:emptyList${idx}`] = [];
return acc;
}, {} as Record<string, unknown>);
const valueAttributeNames = Object.entries(pathAndValueMap).reduce((acc, [valPath, _val]) => {
const path = ['value', ...valPath.split('.')].filter(Boolean).join('.');
path.split('.').forEach((chunk) => {
const cleanedChunk = chunk.split(/\[\d*\]/g)[0];
acc[`#${cleanedChunk}`.replaceAll(cleanerRegex, '')] = cleanedChunk;
});
return acc;
}, {} as Record<string, string>);
const res = await this.#ddbClient.update(this.#tableName,
{ key, namespace },
`SET ${[...setStatements].join(', ')}`,
valueAttributeValues,
{ ...valueAttributeNames, '#value': 'value' });
writeUnits += res.ConsumedCapacity?.CapacityUnits ?? 0;
this.#meteringService.incrementUsage(actor, 'kv:write', writeUnits);
return res.Attributes?.value;
}
async update ({ key, pathAndValueMap, ttl }: { key: string; pathAndValueMap: Record<string, unknown>; ttl?: number; }): Promise<unknown> {
if ( !pathAndValueMap || Object.keys(pathAndValueMap).length === 0 ) {
throw new Error('invalid use of #update: no pathAndValueMap');
}
if ( key === '' ) {
throw APIError.create('field_empty', null, {
key: 'key',
});
}
const actor = Context.get('actor');
const user = actor.type?.user ?? undefined;
if ( ! user ) throw new Error('User not found');
const namespace = this.#getNameSpace(actor);
if ( this.#enableMigrationFromSQL ) {
// trigger get to move element if exists
await this.get({ key });
}
const cleanerRegex = /[:\-+/*]/g;
let writeUnits = await this.#createPaths(namespace, key, Object.keys(pathAndValueMap));
const setStatements = Object.entries(pathAndValueMap).map(([valPath, _val], idx) => {
const path = ['value', ...valPath.split('.')].filter(Boolean).join('.');
const attrName = path.split('.').map((chunk) => `#${chunk}`.replaceAll(cleanerRegex, '')).join('.');
return `${attrName} = :value${idx}`;
});
const valueAttributeValues = Object.entries(pathAndValueMap).reduce((acc, [_path, val], idx) => {
acc[`:value${idx}`] = val;
return acc;
}, {} as Record<string, unknown>);
const valueAttributeNames = Object.entries(pathAndValueMap).reduce((acc, [valPath, _val]) => {
const path = ['value', ...valPath.split('.')].filter(Boolean).join('.');
path.split('.').forEach((chunk) => {
const cleanedChunk = chunk.split(/\[\d*\]/g)[0];
acc[`#${cleanedChunk}`.replaceAll(cleanerRegex, '')] = cleanedChunk;
});
return acc;
}, {} as Record<string, string>);
if ( ttl !== undefined ) {
const ttlSeconds = Number(ttl);
if ( Number.isNaN(ttlSeconds) ) {
throw new Error('ttl must be a number');
}
const timestamp = Math.floor(Date.now() / 1000) + ttlSeconds;
setStatements.push('#ttl = :ttl');
valueAttributeValues[':ttl'] = timestamp;
valueAttributeNames['#ttl'] = 'ttl';
}
const res = await this.#ddbClient.update(this.#tableName,
{ key, namespace },
`SET ${[...setStatements].join(', ')}`,
valueAttributeValues,
{ ...valueAttributeNames, '#value': 'value' });
writeUnits += res.ConsumedCapacity?.CapacityUnits ?? 0;
this.#meteringService.incrementUsage(actor, 'kv:write', writeUnits);
return res.Attributes?.value;
}
async decr<T extends Record<string, number>>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }) {
return await this.incr({ key, pathAndAmountMap: Object.fromEntries(Object.entries(pathAndAmountMap).map(([k, v]) => [k, -v])) as T });
}
+46
View File
@@ -212,6 +212,52 @@ class KV {
return utils.make_driver_method(['key'], 'puter-kvstore', undefined, 'decr').call(this, options);
};
add = async (...args) => {
let options = {};
// arguments are required
if ( !args || args.length === 0 ) {
throw ({ message: 'Arguments are required', code: 'arguments_required' });
}
options.key = args[0];
const provided = args[1];
const isPathMap = provided && typeof provided === 'object' && !Array.isArray(provided);
options.pathAndValueMap = provided === undefined ? { '': 1 } : isPathMap ? provided : { '': provided };
// key size cannot be larger than MAX_KEY_SIZE
if ( options.key.length > this.MAX_KEY_SIZE ) {
throw ({ message: `Key size cannot be larger than ${this.MAX_KEY_SIZE}`, code: 'key_too_large' });
}
return utils.make_driver_method(['key'], 'puter-kvstore', undefined, 'add').call(this, options);
};
update = utils.make_driver_method(['key', 'pathAndValueMap', 'ttl'], 'puter-kvstore', undefined, 'update', {
preprocess: (args) => {
if ( args.key === undefined || args.key === null ) {
throw { message: 'Key cannot be undefined', code: 'key_undefined' };
}
if ( args.key.length > this.MAX_KEY_SIZE ) {
throw { message: `Key size cannot be larger than ${this.MAX_KEY_SIZE}`, code: 'key_too_large' };
}
if ( args.pathAndValueMap === undefined || args.pathAndValueMap === null || Array.isArray(args.pathAndValueMap) || typeof args.pathAndValueMap !== 'object' ) {
throw { message: 'pathAndValueMap must be an object', code: 'path_map_invalid' };
}
if ( Object.keys(args.pathAndValueMap).length === 0 ) {
throw { message: 'pathAndValueMap cannot be empty', code: 'path_map_invalid' };
}
if ( args.ttl !== undefined && args.ttl !== null ) {
const ttl = Number(args.ttl);
if ( Number.isNaN(ttl) ) {
throw { message: 'ttl must be a number', code: 'ttl_invalid' };
}
args.ttl = ttl;
}
return args;
},
});
/**
* Set a time to live (in seconds) on a key. After the time to live has expired, the key will be deleted.
* Prefer this over expireAt if you want timestamp to be set by the server, to avoid issues with clock drift.
+10
View File
@@ -10,6 +10,14 @@ export interface KVIncrementPath {
[path: string]: number;
}
export interface KVUpdatePath {
[path: string]: KVValue;
}
export interface KVAddPath {
[path: string]: KVValue | KVValue[];
}
export class KV {
readonly MAX_KEY_SIZE: number;
readonly MAX_VALUE_SIZE: number;
@@ -19,6 +27,8 @@ export class KV {
del (key: string): Promise<boolean>;
incr (key: string, amount?: number | KVIncrementPath): Promise<number>;
decr (key: string, amount?: number | KVIncrementPath): Promise<number>;
add (key: string, value?: KVValue | KVAddPath): Promise<KVValue>;
update (key: string, pathAndValueMap: KVUpdatePath, ttlSeconds?: number): Promise<KVValue>;
expire (key: string, ttlSeconds: number): Promise<boolean>;
expireAt (key: string, timestampSeconds: number): Promise<boolean>;
list (pattern?: string, returnValues?: false): Promise<string[]>;
+38 -1
View File
@@ -95,6 +95,43 @@ describe('Puter KV Module', () => {
expect(finalGet).toEqual({ a: { b: -1 } });
});
it('should update a key with nested path', async () => {
const updateKey = `${TEST_KEY}-update`;
await puter.kv.set(updateKey, { profile: { name: 'old' } });
const updateRes = await puter.kv.update(updateKey, { 'profile.name': 'new' });
expect(updateRes).toEqual({ profile: { name: 'new' } });
const finalGet = await puter.kv.get(updateKey);
expect(finalGet).toEqual({ profile: { name: 'new' } });
});
it('should update a key with indexed path', async () => {
const updateKey = `${TEST_KEY}-update-index`;
await puter.kv.set(updateKey, { a: { b: [1, 2] } });
const updateRes = await puter.kv.update(updateKey, { 'a.b[1]': 5 });
expect(updateRes).toEqual({ a: { b: [1, 5] } });
const finalGet = await puter.kv.get(updateKey);
expect(finalGet).toEqual({ a: { b: [1, 5] } });
});
it('should add values into list paths', async () => {
const addKey = `${TEST_KEY}-add`;
const addRes = await puter.kv.add(addKey, { 'a.b': 1 });
expect(addRes).toEqual({ a: { b: [1] } });
const secondAdd = await puter.kv.add(addKey, { 'a.b': 2, 'a.c': ['x'] });
expect(secondAdd).toEqual({ a: { b: [1, 2], c: ['x'] } });
const finalGet = await puter.kv.get(addKey);
expect(finalGet).toEqual({ a: { b: [1, 2], c: ['x'] } });
});
it('should add values into indexed list paths', async () => {
const addKey = `${TEST_KEY}-add-index`;
await puter.kv.set(addKey, { a: { b: [[1], [2]] } });
const addRes = await puter.kv.add(addKey, { 'a.b[1]': 3 });
expect(addRes).toEqual({ a: { b: [[1], [2, 3]] } });
const finalGet = await puter.kv.get(addKey);
expect(finalGet).toEqual({ a: { b: [[1], [2, 3]] } });
});
it('should list keys', async () => {
const listRes = await puter.kv.list();
expect(Array.isArray(listRes)).toBe(true);
@@ -117,4 +154,4 @@ describe('Puter KV Module', () => {
const getRes = await puter.kv.get(TEST_KEY);
expect(getRes).toBeNull();
});
});
});