From 23d7b751b92233dfb0e4f8d7c6a23f5c52db800e Mon Sep 17 00:00:00 2001 From: Daniel Salazar Date: Fri, 9 Jan 2026 12:20:54 -0800 Subject: [PATCH] feat: new kvstore operatios for more granular updates :rocket: (#2254) --- .../kvstore/KVStoreInterfaceService.js | 30 ++- .../DynamoKVStore/DynamoKVStore.test.ts | 145 ++++++++++++++ .../DynamoKVStore/DynamoKVStore.ts | 181 +++++++++++++++++- src/puter-js/src/modules/KV.js | 46 +++++ src/puter-js/types/modules/kv.d.ts | 10 + tests/puterJsApiTests/kv.test.ts | 39 +++- 6 files changed, 444 insertions(+), 7 deletions(-) diff --git a/src/backend/src/modules/kvstore/KVStoreInterfaceService.js b/src/backend/src/modules/kvstore/KVStoreInterfaceService.js index 12cfae9a9..67f51f7ee 100644 --- a/src/backend/src/modules/kvstore/KVStoreInterfaceService.js +++ b/src/backend/src/modules/kvstore/KVStoreInterfaceService.js @@ -26,6 +26,8 @@ const BaseService = require('../../services/BaseService'); * @property {function(KVStoreDelParams): Promise} del - Delete a value by key. * @property {function(KVStoreListParams): Promise} list - List all key-value pairs, optionally as a specific type. * @property {function(): Promise} flush - Delete all key-value pairs in the store. + * @property {(params: KVStoreUpdateParams) => Promise} update - Update nested values by key. + * @property {(params: KVStoreAddParams) => Promise} add - Append values into list paths by key. * @property {(params: {key:string, pathAndAmountMap: Record}) => Promise} incr - Increment a numeric value by key. * @property {(params: {key:string, pathAndAmountMap: Record}) => Promise} decr - Decrement a numeric value by key. * @property {function(KVStoreExpireAtParams): Promise} expireAt - Set a key to expire at a specific UNIX timestamp (seconds). @@ -45,6 +47,15 @@ const BaseService = require('../../services/BaseService'); * @typedef {Object} KVStoreListParams * @property {string} [as] - Optional type to list as (e.g., 'array', 'object'). * + * @typedef {Object} KVStoreUpdateParams + * @property {string} key - The key to update. + * @property {Object.} pathAndValueMap - Map of period-joined paths to values. + * @property {number} [ttl] - Optional TTL in seconds for the whole object. + * + * @typedef {Object} KVStoreAddParams + * @property {string} key - The key to update. + * @property {Object.} pathAndValueMap - Map of period-joined paths to values to append. + * * @typedef {Object} KVStoreExpireAtParams * @property {string} key - The key to set expiration for. * @property {number} timestamp - UNIX timestamp (seconds) when the key should expire. @@ -109,6 +120,23 @@ class KVStoreInterfaceService extends BaseService { parameters: {}, result: { type: 'void' }, }, + update: { + description: 'Update nested values by key.', + parameters: { + key: { type: 'string', required: true }, + pathAndValueMap: { type: 'json', required: true, description: 'map of period-joined path to value' }, + ttl: { type: 'number', description: 'optional TTL in seconds for the whole object' }, + }, + result: { type: 'json', description: 'The updated value' }, + }, + add: { + description: 'Append values into list paths by key.', + parameters: { + key: { type: 'string', required: true }, + pathAndValueMap: { type: 'json', required: true, description: 'map of period-joined path to value to append' }, + }, + result: { type: 'json', description: 'The updated value' }, + }, incr: { description: 'Increment a value by key.', parameters: { @@ -151,4 +179,4 @@ class KVStoreInterfaceService extends BaseService { module.exports = { KVStoreInterfaceService, -}; \ No newline at end of file +}; diff --git a/src/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.test.ts b/src/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.test.ts index fe7da6250..464cedfee 100644 --- a/src/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.test.ts +++ b/src/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.test.ts @@ -192,6 +192,151 @@ describe('DynamoKVStore', async () => { expect(valTtl).toBeNull(); }); + it('updates nested paths and creates missing maps', async () => { + const actor = makeActor(12); + const key = 'update-key'; + + const updated = await su.sudo(actor, () => kvStore.update({ + key, + pathAndValueMap: { + 'profile.name': 'Ada', + 'profile.stats.score': 7, + 'active': true, + }, + })); + + expect(updated).toMatchObject({ + profile: { name: 'Ada', stats: { score: 7 } }, + active: true, + }); + + const stored = await su.sudo(actor, () => kvStore.get({ key })); + expect(stored).toMatchObject({ + profile: { name: 'Ada', stats: { score: 7 } }, + active: true, + }); + }); + + it('update can set ttl for the whole object', async () => { + const actor = makeActor(13); + const key = 'update-ttl'; + + await su.sudo(actor, () => kvStore.update({ + key, + pathAndValueMap: { 'count': 1 }, + ttl: -1, + })); + + const stored = await su.sudo(actor, () => kvStore.get({ key })); + expect(stored).toBeNull(); + }); + + it('supports list index paths when updating', async () => { + const actor = makeActor(17); + const key = 'update-list-index'; + + await su.sudo(actor, () => kvStore.set({ + key, + value: { a: { b: [1, 2] } }, + })); + + const updated = await su.sudo(actor, () => kvStore.update({ + key, + pathAndValueMap: { 'a.b[1]': 5 }, + })); + + expect((updated as { a?: { b?: number[] } }).a?.b).toEqual([1, 5]); + + const stored = await su.sudo(actor, () => kvStore.get({ key })); + expect((stored as { a?: { b?: number[] } }).a?.b).toEqual([1, 5]); + }); + + it('adds values to nested lists and creates missing maps', async () => { + const actor = makeActor(15); + const key = 'add-key'; + + const first = await su.sudo(actor, () => kvStore.add({ + key, + pathAndValueMap: { + 'a.b': 1, + }, + })); + + expect(first).toMatchObject({ a: { b: [1] } }); + + const second = await su.sudo(actor, () => kvStore.add({ + key, + pathAndValueMap: { + 'a.b': 2, + 'a.c': ['x', 'y'], + }, + })); + + expect(second).toMatchObject({ a: { b: [1, 2], c: ['x', 'y'] } }); + + const stored = await su.sudo(actor, () => kvStore.get({ key })); + expect(stored).toMatchObject({ a: { b: [1, 2], c: ['x', 'y'] } }); + }); + + it('supports list index paths when appending', async () => { + const actor = makeActor(18); + const key = 'add-list-index'; + + await su.sudo(actor, () => kvStore.set({ + key, + value: { a: { b: [[1], [2]] } }, + })); + + const updated = await su.sudo(actor, () => kvStore.add({ + key, + pathAndValueMap: { 'a.b[1]': 3 }, + })); + + expect((updated as { a?: { b?: number[][] } }).a?.b).toEqual([[1], [2, 3]]); + + const stored = await su.sudo(actor, () => kvStore.get({ key })); + expect((stored as { a?: { b?: number[][] } }).a?.b).toEqual([[1], [2, 3]]); + }); + + it('incr initializes nested maps for missing keys', async () => { + const actor = makeActor(14); + const key = 'incr-missing'; + + const first = await su.sudo(actor, () => kvStore.incr({ + key, + pathAndAmountMap: { 'a.b.c': 2, 'x': 1 }, + })); + + expect(first).toMatchObject({ a: { b: { c: 2 } }, x: 1 }); + + const second = await su.sudo(actor, () => kvStore.incr({ + key, + pathAndAmountMap: { 'a.b.c': 3 }, + })); + + expect(second).toMatchObject({ a: { b: { c: 5 } }, x: 1 }); + }); + + it('supports list index paths when incrementing', async () => { + const actor = makeActor(16); + const key = 'incr-list-index'; + + await su.sudo(actor, () => kvStore.set({ + key, + value: { a: { b: [1, 2] } }, + })); + + const updated = await su.sudo(actor, () => kvStore.incr({ + key, + pathAndAmountMap: { 'a.b[1]': 3 }, + })); + + expect((updated as { a?: { b?: number[] } }).a?.b).toEqual([1, 5]); + + const stored = await su.sudo(actor, () => kvStore.get({ key })); + expect((stored as { a?: { b?: number[] } }).a?.b).toEqual([1, 5]); + }); + it('enforces key and value size limits', async () => { const actor = makeActor(11); const oversizedKey = 'a'.repeat((config.kv_max_key_size as number) + 1); diff --git a/src/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.ts b/src/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.ts index 51005e621..117e762b1 100644 --- a/src/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.ts +++ b/src/backend/src/services/repositories/DynamoKVStore/DynamoKVStore.ts @@ -326,6 +326,47 @@ export class DynamoKVStore { async #createPaths ( namespace: string, key: string, pathList: string[]) { + const nestedMapValue = (() => { + const valueRoot: Record = {}; + let hasPaths = false; + pathList.forEach((valPath) => { + if ( ! valPath ) return; + hasPaths = true; + const chunks = valPath.split('.').filter(Boolean); + let cursor: Record = valueRoot; + for ( let i = 0; i < chunks.length - 1; i++ ) { + const chunk = chunks[i]; + const existing = cursor[chunk]; + if ( !existing || typeof existing !== 'object' || Array.isArray(existing) ) { + cursor[chunk] = {}; + } + cursor = cursor[chunk] as Record; + } + }); + return hasPaths ? valueRoot : null; + })(); + + if ( ! nestedMapValue ) { + return 0; + } + + const isPlainObject = (value: unknown): value is Record => { + return !!value && typeof value === 'object' && !Array.isArray(value); + }; + + const objectsEqual = (left: unknown, right: unknown): boolean => { + if ( left === right ) return true; + if ( !isPlainObject(left) || !isPlainObject(right) ) return false; + const leftKeys = Object.keys(left); + const rightKeys = Object.keys(right); + if ( leftKeys.length !== rightKeys.length ) return false; + for ( const key of leftKeys ) { + if ( ! rightKeys.includes(key) ) return false; + if ( ! objectsEqual(left[key], right[key]) ) return false; + } + return true; + }; + // Collect all intermediate map paths for all entries const allIntermediatePaths = new Set(); pathList.forEach((valPath) => { @@ -337,11 +378,11 @@ export class DynamoKVStore { } }); - // TODO DS: make it so that the top layers are checked first to avoid creating each layer multiple times - let writeUnits = 0; // Ensure each intermediate map layer exists by issuing a separate DynamoDB update for each - for ( const layerPath of allIntermediatePaths ) { + const orderedPaths = [...allIntermediatePaths] + .sort((left, right) => left.split('.').length - right.split('.').length); + for ( const layerPath of orderedPaths ) { // Build attribute names for the layer const chunks = layerPath.split('.'); const attrName = chunks.map((chunk) => `#${chunk}`.replaceAll(this.#pathCleanerRegex, '')).join('.'); @@ -350,13 +391,21 @@ export class DynamoKVStore { const cleanedChunk = chunk.split(/\[\d*\]/g)[0]; expressionNames[`#${cleanedChunk}`.replaceAll(this.#pathCleanerRegex, '')] = cleanedChunk; }); + const isRootLayer = layerPath === 'value'; + const expressionValues = isRootLayer + ? { ':nestedMap': nestedMapValue } + : { ':emptyMap': {} }; + const valueToken = isRootLayer ? ':nestedMap' : ':emptyMap'; // Issue update to set layer to {} if not exists const layerUpsertRes = await this.#ddbClient.update(this.#tableName, { key, namespace }, - `SET ${attrName} = if_not_exists(${attrName}, :emptyMap)`, - { ':emptyMap': {} }, + `SET ${attrName} = if_not_exists(${attrName}, ${valueToken})`, + expressionValues, expressionNames); writeUnits += layerUpsertRes.ConsumedCapacity?.CapacityUnits ?? 0; + if ( isRootLayer && objectsEqual(layerUpsertRes.Attributes?.value, nestedMapValue) ) { + return writeUnits; + } } return writeUnits; } @@ -422,6 +471,128 @@ export class DynamoKVStore { return res.Attributes?.value; } + async add ({ key, pathAndValueMap }: { key: string; pathAndValueMap: Record; }): Promise { + if ( !pathAndValueMap || Object.keys(pathAndValueMap).length === 0 ) { + throw new Error('invalid use of #add: no pathAndValueMap'); + } + if ( key === '' ) { + throw APIError.create('field_empty', null, { + key: 'key', + }); + } + + const actor = Context.get('actor'); + + const user = actor.type?.user ?? undefined; + if ( ! user ) throw new Error('User not found'); + + const namespace = this.#getNameSpace(actor); + + if ( this.#enableMigrationFromSQL ) { + // trigger get to move element if exists + await this.get({ key }); + } + + const cleanerRegex = /[:\-+/*]/g; + + let writeUnits = await this.#createPaths(namespace, key, Object.keys(pathAndValueMap)); + + const setStatements = Object.entries(pathAndValueMap).map(([valPath, _val], idx) => { + const path = ['value', ...valPath.split('.')].filter(Boolean).join('.'); + const attrName = path.split('.').map((chunk) => `#${chunk}`.replaceAll(cleanerRegex, '')).join('.'); + return `${attrName} = list_append(if_not_exists(${attrName}, :emptyList${idx}), :append${idx})`; + }); + const valueAttributeValues = Object.entries(pathAndValueMap).reduce((acc, [_path, val], idx) => { + acc[`:append${idx}`] = Array.isArray(val) ? val : [val]; + acc[`:emptyList${idx}`] = []; + return acc; + }, {} as Record); + const valueAttributeNames = Object.entries(pathAndValueMap).reduce((acc, [valPath, _val]) => { + const path = ['value', ...valPath.split('.')].filter(Boolean).join('.'); + path.split('.').forEach((chunk) => { + const cleanedChunk = chunk.split(/\[\d*\]/g)[0]; + acc[`#${cleanedChunk}`.replaceAll(cleanerRegex, '')] = cleanedChunk; + }); + return acc; + }, {} as Record); + + const res = await this.#ddbClient.update(this.#tableName, + { key, namespace }, + `SET ${[...setStatements].join(', ')}`, + valueAttributeValues, + { ...valueAttributeNames, '#value': 'value' }); + + writeUnits += res.ConsumedCapacity?.CapacityUnits ?? 0; + this.#meteringService.incrementUsage(actor, 'kv:write', writeUnits); + return res.Attributes?.value; + } + + async update ({ key, pathAndValueMap, ttl }: { key: string; pathAndValueMap: Record; ttl?: number; }): Promise { + if ( !pathAndValueMap || Object.keys(pathAndValueMap).length === 0 ) { + throw new Error('invalid use of #update: no pathAndValueMap'); + } + if ( key === '' ) { + throw APIError.create('field_empty', null, { + key: 'key', + }); + } + + const actor = Context.get('actor'); + + const user = actor.type?.user ?? undefined; + if ( ! user ) throw new Error('User not found'); + + const namespace = this.#getNameSpace(actor); + + if ( this.#enableMigrationFromSQL ) { + // trigger get to move element if exists + await this.get({ key }); + } + + const cleanerRegex = /[:\-+/*]/g; + + let writeUnits = await this.#createPaths(namespace, key, Object.keys(pathAndValueMap)); + + const setStatements = Object.entries(pathAndValueMap).map(([valPath, _val], idx) => { + const path = ['value', ...valPath.split('.')].filter(Boolean).join('.'); + const attrName = path.split('.').map((chunk) => `#${chunk}`.replaceAll(cleanerRegex, '')).join('.'); + return `${attrName} = :value${idx}`; + }); + const valueAttributeValues = Object.entries(pathAndValueMap).reduce((acc, [_path, val], idx) => { + acc[`:value${idx}`] = val; + return acc; + }, {} as Record); + const valueAttributeNames = Object.entries(pathAndValueMap).reduce((acc, [valPath, _val]) => { + const path = ['value', ...valPath.split('.')].filter(Boolean).join('.'); + path.split('.').forEach((chunk) => { + const cleanedChunk = chunk.split(/\[\d*\]/g)[0]; + acc[`#${cleanedChunk}`.replaceAll(cleanerRegex, '')] = cleanedChunk; + }); + return acc; + }, {} as Record); + + if ( ttl !== undefined ) { + const ttlSeconds = Number(ttl); + if ( Number.isNaN(ttlSeconds) ) { + throw new Error('ttl must be a number'); + } + const timestamp = Math.floor(Date.now() / 1000) + ttlSeconds; + setStatements.push('#ttl = :ttl'); + valueAttributeValues[':ttl'] = timestamp; + valueAttributeNames['#ttl'] = 'ttl'; + } + + const res = await this.#ddbClient.update(this.#tableName, + { key, namespace }, + `SET ${[...setStatements].join(', ')}`, + valueAttributeValues, + { ...valueAttributeNames, '#value': 'value' }); + + writeUnits += res.ConsumedCapacity?.CapacityUnits ?? 0; + this.#meteringService.incrementUsage(actor, 'kv:write', writeUnits); + return res.Attributes?.value; + } + async decr>({ key, pathAndAmountMap }: { key: string; pathAndAmountMap: T; }) { return await this.incr({ key, pathAndAmountMap: Object.fromEntries(Object.entries(pathAndAmountMap).map(([k, v]) => [k, -v])) as T }); } diff --git a/src/puter-js/src/modules/KV.js b/src/puter-js/src/modules/KV.js index 4e753362d..110387070 100644 --- a/src/puter-js/src/modules/KV.js +++ b/src/puter-js/src/modules/KV.js @@ -212,6 +212,52 @@ class KV { return utils.make_driver_method(['key'], 'puter-kvstore', undefined, 'decr').call(this, options); }; + add = async (...args) => { + let options = {}; + + // arguments are required + if ( !args || args.length === 0 ) { + throw ({ message: 'Arguments are required', code: 'arguments_required' }); + } + + options.key = args[0]; + const provided = args[1]; + const isPathMap = provided && typeof provided === 'object' && !Array.isArray(provided); + options.pathAndValueMap = provided === undefined ? { '': 1 } : isPathMap ? provided : { '': provided }; + + // key size cannot be larger than MAX_KEY_SIZE + if ( options.key.length > this.MAX_KEY_SIZE ) { + throw ({ message: `Key size cannot be larger than ${this.MAX_KEY_SIZE}`, code: 'key_too_large' }); + } + + return utils.make_driver_method(['key'], 'puter-kvstore', undefined, 'add').call(this, options); + }; + + update = utils.make_driver_method(['key', 'pathAndValueMap', 'ttl'], 'puter-kvstore', undefined, 'update', { + preprocess: (args) => { + if ( args.key === undefined || args.key === null ) { + throw { message: 'Key cannot be undefined', code: 'key_undefined' }; + } + if ( args.key.length > this.MAX_KEY_SIZE ) { + throw { message: `Key size cannot be larger than ${this.MAX_KEY_SIZE}`, code: 'key_too_large' }; + } + if ( args.pathAndValueMap === undefined || args.pathAndValueMap === null || Array.isArray(args.pathAndValueMap) || typeof args.pathAndValueMap !== 'object' ) { + throw { message: 'pathAndValueMap must be an object', code: 'path_map_invalid' }; + } + if ( Object.keys(args.pathAndValueMap).length === 0 ) { + throw { message: 'pathAndValueMap cannot be empty', code: 'path_map_invalid' }; + } + if ( args.ttl !== undefined && args.ttl !== null ) { + const ttl = Number(args.ttl); + if ( Number.isNaN(ttl) ) { + throw { message: 'ttl must be a number', code: 'ttl_invalid' }; + } + args.ttl = ttl; + } + return args; + }, + }); + /** * Set a time to live (in seconds) on a key. After the time to live has expired, the key will be deleted. * Prefer this over expireAt if you want timestamp to be set by the server, to avoid issues with clock drift. diff --git a/src/puter-js/types/modules/kv.d.ts b/src/puter-js/types/modules/kv.d.ts index d9de8dcf6..a21f5b396 100644 --- a/src/puter-js/types/modules/kv.d.ts +++ b/src/puter-js/types/modules/kv.d.ts @@ -10,6 +10,14 @@ export interface KVIncrementPath { [path: string]: number; } +export interface KVUpdatePath { + [path: string]: KVValue; +} + +export interface KVAddPath { + [path: string]: KVValue | KVValue[]; +} + export class KV { readonly MAX_KEY_SIZE: number; readonly MAX_VALUE_SIZE: number; @@ -19,6 +27,8 @@ export class KV { del (key: string): Promise; incr (key: string, amount?: number | KVIncrementPath): Promise; decr (key: string, amount?: number | KVIncrementPath): Promise; + add (key: string, value?: KVValue | KVAddPath): Promise; + update (key: string, pathAndValueMap: KVUpdatePath, ttlSeconds?: number): Promise; expire (key: string, ttlSeconds: number): Promise; expireAt (key: string, timestampSeconds: number): Promise; list (pattern?: string, returnValues?: false): Promise; diff --git a/tests/puterJsApiTests/kv.test.ts b/tests/puterJsApiTests/kv.test.ts index f3554fb13..c74873cea 100644 --- a/tests/puterJsApiTests/kv.test.ts +++ b/tests/puterJsApiTests/kv.test.ts @@ -95,6 +95,43 @@ describe('Puter KV Module', () => { expect(finalGet).toEqual({ a: { b: -1 } }); }); + it('should update a key with nested path', async () => { + const updateKey = `${TEST_KEY}-update`; + await puter.kv.set(updateKey, { profile: { name: 'old' } }); + const updateRes = await puter.kv.update(updateKey, { 'profile.name': 'new' }); + expect(updateRes).toEqual({ profile: { name: 'new' } }); + const finalGet = await puter.kv.get(updateKey); + expect(finalGet).toEqual({ profile: { name: 'new' } }); + }); + + it('should update a key with indexed path', async () => { + const updateKey = `${TEST_KEY}-update-index`; + await puter.kv.set(updateKey, { a: { b: [1, 2] } }); + const updateRes = await puter.kv.update(updateKey, { 'a.b[1]': 5 }); + expect(updateRes).toEqual({ a: { b: [1, 5] } }); + const finalGet = await puter.kv.get(updateKey); + expect(finalGet).toEqual({ a: { b: [1, 5] } }); + }); + + it('should add values into list paths', async () => { + const addKey = `${TEST_KEY}-add`; + const addRes = await puter.kv.add(addKey, { 'a.b': 1 }); + expect(addRes).toEqual({ a: { b: [1] } }); + const secondAdd = await puter.kv.add(addKey, { 'a.b': 2, 'a.c': ['x'] }); + expect(secondAdd).toEqual({ a: { b: [1, 2], c: ['x'] } }); + const finalGet = await puter.kv.get(addKey); + expect(finalGet).toEqual({ a: { b: [1, 2], c: ['x'] } }); + }); + + it('should add values into indexed list paths', async () => { + const addKey = `${TEST_KEY}-add-index`; + await puter.kv.set(addKey, { a: { b: [[1], [2]] } }); + const addRes = await puter.kv.add(addKey, { 'a.b[1]': 3 }); + expect(addRes).toEqual({ a: { b: [[1], [2, 3]] } }); + const finalGet = await puter.kv.get(addKey); + expect(finalGet).toEqual({ a: { b: [[1], [2, 3]] } }); + }); + it('should list keys', async () => { const listRes = await puter.kv.list(); expect(Array.isArray(listRes)).toBe(true); @@ -117,4 +154,4 @@ describe('Puter KV Module', () => { const getRes = await puter.kv.get(TEST_KEY); expect(getRes).toBeNull(); }); -}); \ No newline at end of file +});