From c2e77d35034793ebfee412d5276897d2a10169ee Mon Sep 17 00:00:00 2001 From: Daniel Salazar Date: Thu, 19 Mar 2026 13:13:01 -0700 Subject: [PATCH] fix: broadcast pubsub + cleanup old code (#2695) --- .../src/modules/broadcast/BroadcastService.js | 123 +++++++-------- .../BroadcastService.redisPubSub.test.js | 25 +++ .../modules/broadcast/connection/BaseLink.js | 53 ------- .../src/modules/broadcast/connection/CLink.js | 144 ------------------ .../broadcast/connection/KeyPairHelper.js | 85 ----------- .../src/modules/broadcast/connection/SLink.js | 109 ------------- 6 files changed, 81 insertions(+), 458 deletions(-) delete mode 100644 src/backend/src/modules/broadcast/connection/BaseLink.js delete mode 100644 src/backend/src/modules/broadcast/connection/CLink.js delete mode 100644 src/backend/src/modules/broadcast/connection/KeyPairHelper.js delete mode 100644 src/backend/src/modules/broadcast/connection/SLink.js diff --git a/src/backend/src/modules/broadcast/BroadcastService.js b/src/backend/src/modules/broadcast/BroadcastService.js index f444addba..3e9ea74cf 100644 --- a/src/backend/src/modules/broadcast/BroadcastService.js +++ b/src/backend/src/modules/broadcast/BroadcastService.js @@ -19,18 +19,12 @@ import { createHmac, randomUUID, timingSafeEqual } from 'crypto'; import { Agent as HttpsAgent } from 'https'; import axios from 'axios'; -import { Server as SocketIoServer } from 'socket.io'; import { redisClient } from '../../clients/redis/redisSingleton.js'; import { BaseService } from '../../services/BaseService.js'; import { Context } from '../../util/context.js'; import { Endpoint } from '../../util/expressutil.js'; -import { CLink } from './connection/CLink.js'; -import { SLink } from './connection/SLink.js'; export class BroadcastService extends BaseService { - #peers = []; - #connections = []; - #trustedPublicKeys = {}; #peersByKey = {}; #webhookPeers = []; #incomingLastNonceByPeer = new Map(); @@ -54,23 +48,35 @@ export class BroadcastService extends BaseService { const outboundFlushMs = Number(this.config.outbound_flush_ms ?? 2000); for ( const peer_config of peers ) { - this.#trustedPublicKeys[peer_config.key] = true; - this.#peersByKey[peer_config.key] = { + const peerId = this.#resolvePeerId(peer_config); + if ( ! peerId ) { + console.warn('ignoring broadcast peer config with missing key/peerId', { peer_config }); + continue; + } + + if ( this.#peersByKey[peerId] ) { + console.warn('duplicate broadcast peer id configured', { + peerId, + existing: this.#peersByKey[peerId]?.webhook_url, + duplicate: peer_config.webhook_url, + }); + } + + this.#peersByKey[peerId] = { webhook_secret: peer_config.webhook_secret, webhook_url: peer_config.webhook_url, webhook: !!peer_config.webhook, }; if ( peer_config.webhook ) { - this.#webhookPeers.push(peer_config); - } else { - const peer = new CLink({ - keys: this.config.keys, - config: peer_config, - log: this.log, + this.#webhookPeers.push({ + ...peer_config, + peerId, + }); + } else { + console.warn('ignoring non-webhook broadcast peer; websocket transport is disabled', { + peerId, }); - this.#peers.push(peer); - peer.connect(); } } @@ -95,7 +101,15 @@ export class BroadcastService extends BaseService { if ( meta?.from_outside ) return; const safeMeta = this.#normalizeMeta(meta); - this.#enqueueOutboundEvent({ key, data, meta: safeMeta }); + const outboundEvent = { key, data, meta: safeMeta }; + + // Mirror local outer.gui/pub events to Redis so same-cluster replicas + // receive them even when this instance is the originator. + this.#publishWebhookEventsToRedis([outboundEvent]).catch(error => { + console.warn('local redis pubsub publish failed', { error, key }); + }); + + this.#enqueueOutboundEvent(outboundEvent); } #enqueueOutboundEvent (event) { @@ -134,21 +148,12 @@ export class BroadcastService extends BaseService { try { const events = [...this.#outboundEventsByDedupKey.values()]; this.#outboundEventsByDedupKey.clear(); - const message = { events }; - - for ( const peer of this.#peers ) { - try { - peer.send(message); - } catch (e) { - console.warn(`ws broadcast send error: ${ JSON.stringify({ peer: peer.key, error: e })}`); - } - } for ( const peer_config of this.#webhookPeers ) { try { await this.#sendWebhookToPeer(peer_config, events); } catch (e) { - console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.key, error: e.message })}`); + console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.peerId ?? peer_config.key, error: e.message })}`); } } } finally { @@ -166,6 +171,23 @@ export class BroadcastService extends BaseService { return meta; } + #resolvePeerId (peerConfig) { + if ( !peerConfig || typeof peerConfig !== 'object' ) return null; + const peerId = peerConfig.peerId ?? peerConfig.key; + if ( typeof peerId !== 'string' || peerId.trim() === '' ) return null; + return peerId.trim(); + } + + #isNonceReplayForPeer ({ timestamp, nonce, peerId }) { + const lastSeen = this.#incomingLastNonceByPeer.get(peerId); + if ( ! lastSeen ) return false; + + // A newer timestamp should reset nonce ordering for this peer. + if ( timestamp > lastSeen.timestamp ) return false; + if ( timestamp < lastSeen.timestamp ) return true; + return nonce <= lastSeen.nonce; + } + async #initRedisPubSub () { if ( typeof redisClient?.duplicate !== 'function' ) { console.warn('redis pubsub unavailable; duplicate client is not supported'); @@ -350,7 +372,8 @@ export class BroadcastService extends BaseService { return; } - const peerId = req.headers['x-broadcast-peer-id']; + const peerIdHeader = req.headers['x-broadcast-peer-id']; + const peerId = Array.isArray(peerIdHeader) ? peerIdHeader[0] : peerIdHeader; if ( ! peerId ) { res.status(403).send({ error: { message: 'Missing X-Broadcast-Peer-Id' } }); return; @@ -391,8 +414,7 @@ export class BroadcastService extends BaseService { res.status(400).send({ error: { message: 'Invalid X-Broadcast-Nonce' } }); return; } - const lastNonce = this.#incomingLastNonceByPeer.get(peerId) ?? -1; - if ( nonce <= lastNonce ) { + if ( this.#isNonceReplayForPeer({ timestamp, nonce, peerId }) ) { res.status(403).send({ error: { message: 'Duplicate or stale nonce' } }); return; } @@ -413,7 +435,7 @@ export class BroadcastService extends BaseService { return; } - this.#incomingLastNonceByPeer.set(peerId, nonce); + this.#incomingLastNonceByPeer.set(peerId, { timestamp, nonce }); await this.#publishWebhookEventsToRedis(incomingEvents); this.#emitIncomingEventsSequentially(incomingEvents); @@ -422,7 +444,8 @@ export class BroadcastService extends BaseService { } async #sendWebhookToPeer (peer_config, events) { - const peerId = peer_config.key; + const peerId = this.#resolvePeerId(peer_config); + if ( ! peerId ) return; const url = peer_config.webhook_url; const requestUrl = this.#normalizeWebhookUrl(url); const mySecretKey = this.config.webhook?.secret ?? ''; @@ -439,7 +462,7 @@ export class BroadcastService extends BaseService { const payloadToSign = `${timestamp}.${nextNonce}.${rawBody}`; const signature = createHmac('sha256', mySecretKey).update(payloadToSign).digest('hex'); - const myPublicKey = this.config.webhook?.key ?? ''; + const myPublicKey = this.config.webhook?.peerId ?? this.config.webhook?.key ?? ''; const headers = { 'Content-Type': 'application/json', 'Content-Length': String(Buffer.byteLength(rawBody)), @@ -488,38 +511,4 @@ export class BroadcastService extends BaseService { parsedUrl.protocol = `${this.#webhookProtocol}:`; return parsedUrl.toString(); } - - async '__on_install.websockets' () { - const svc_webServer = this.services.get('web-server'); - - const server = svc_webServer.get_server(); - - const io = new SocketIoServer(server, { - cors: { origin: '*' }, - path: '/wssinternal', - }); - - io.on('connection', async socket => { - const conn = new SLink({ - keys: this.config.keys, - trustedKeys: this.#trustedPublicKeys, - socket, - }); - this.#connections.push(conn); - - conn.channels.message.on(async message => { - const incomingEvents = this.#normalizeIncomingPayload(message); - if ( ! incomingEvents ) { - console.warn('invalid ws broadcast payload'); - return; - } - - try { - await this.#emitIncomingEventsSequentially(incomingEvents); - } catch ( error ) { - console.warn('ws broadcast receive error', { error }); - } - }); - }); - } } diff --git a/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js b/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js index b059d1be3..47655ba49 100644 --- a/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js +++ b/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js @@ -88,4 +88,29 @@ describe('BroadcastService redis pubsub', () => { expect(eventService.emit).not.toHaveBeenCalled(); }); + + it('publishes local outer.gui/pub events to redis pubsub for replicas', async () => { + const publishSpy = vi.spyOn(redisClient, 'publish'); + try { + await service.outBroadcastEventHandler('outer.gui.notif.message', { id: 'gui-local' }, {}); + await wait(); + + const publishCall = publishSpy.mock.calls.find(([channel]) => channel === 'broadcast.webhook.events'); + expect(publishCall).toBeDefined(); + const [channel, payload] = publishCall; + expect(channel).toBe('broadcast.webhook.events'); + + const parsedPayload = JSON.parse(payload); + expect(parsedPayload.sourceId).toBeDefined(); + expect(parsedPayload.events).toEqual([ + { + key: 'outer.gui.notif.message', + data: { id: 'gui-local' }, + meta: {}, + }, + ]); + } finally { + publishSpy.mockRestore(); + } + }); }); diff --git a/src/backend/src/modules/broadcast/connection/BaseLink.js b/src/backend/src/modules/broadcast/connection/BaseLink.js deleted file mode 100644 index 63eada97e..000000000 --- a/src/backend/src/modules/broadcast/connection/BaseLink.js +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2024-present Puter Technologies Inc. - * - * This file is part of Puter. - * - * Puter is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -const { AdvancedBase } = require('@heyputer/putility'); -const { ChannelFeature } = require('../../../traits/ChannelFeature'); - -class BaseLink extends AdvancedBase { - static FEATURES = [ - new ChannelFeature(), - ]; - static CHANNELS = ['message']; - - static MODULES = { - crypto: require('crypto'), - }; - - static AUTHENTICATING = {}; - static ONLINE = {}; - static OFFLINE = {}; - - send (data) { - if ( this.state !== this.constructor.ONLINE ) { - return false; - } - - return this._send(data); - } - - constructor () { - super(); - this.state = this.constructor.AUTHENTICATING; - } -} - -module.exports = { - BaseLink, -}; diff --git a/src/backend/src/modules/broadcast/connection/CLink.js b/src/backend/src/modules/broadcast/connection/CLink.js deleted file mode 100644 index ac1c84f3a..000000000 --- a/src/backend/src/modules/broadcast/connection/CLink.js +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright (C) 2024-present Puter Technologies Inc. - * - * This file is part of Puter. - * - * Puter is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -const { BaseLink } = require('./BaseLink'); -const { KeyPairHelper } = require('./KeyPairHelper'); - -/** - * Client-side link that establishes an encrypted socket.io connection. - * Handles AES-256-CBC encryption for message transmission and uses asymmetric - * key exchange for secure AES key distribution. - */ -class CLink extends BaseLink { - static MODULES = { - sioclient: require('socket.io-client'), - }; - - /** - * Encrypts the data using AES-256-CBC and sends it through the socket. - * The data is JSON stringified, encrypted with a random IV, and transmitted - * as a buffer along with the IV. - * - * @param {*} data - The data to be encrypted and sent through the socket - * @returns {void} - */ - _send (data) { - if ( ! this.socket ) return; - const require = this.require; - const crypto = require('crypto'); - const iv = crypto.randomBytes(16); - const cipher = crypto.createCipheriv('aes-256-cbc', - this.aesKey, - iv); - const jsonified = JSON.stringify(data); - let buffers = []; - buffers.push(cipher.update(Buffer.from(jsonified, 'utf-8'))); - buffers.push(cipher.final()); - const buffer = Buffer.concat(buffers); - this.socket.send({ - iv, - message: buffer, - }); - } - - /** - * Initializes the client link with local keys, remote server configuration, and logger. - */ - constructor ({ - keys, - log, - config, - }) { - super(); - // keys of client (local) - this.keys = keys; - // keys of server (remote) - this.config = config; - this.log = log; - } - - /** - * Establishes a socket.io connection to the configured server address. - * Generates an AES key, encrypts it using the server's public key, and sends - * it during the handshake. Sets up event handlers for connection lifecycle - * and message reception. - */ - connect () { - let address = this.config.address; - if ( ! ( - address.startsWith('https://') || - address.startsWith('http://') - ) ) { - address = `https://${address}`; - } - const socket = this.modules.sioclient(address, { - transports: ['websocket'], - path: '/wssinternal', - reconnection: true, - extraHeaders: { - ...(this.config.host ? { - Host: this.config.host, - } : {}), - }, - }); - socket.on('connect', () => { - this.log.info('connected', { - address, - }); - - const require = this.require; - const crypto = require('crypto'); - this.aesKey = crypto.randomBytes(32); - - const kp_helper = new KeyPairHelper({ - kpublic: this.config.key, - ksecret: this.keys.secret, - }); - socket.send({ - $: 'take-my-key', - key: this.keys.public, - message: kp_helper.write(this.aesKey.toString('base64')), - }); - this.state = this.constructor.ONLINE; - }); - socket.on('disconnect', () => { - this.log.info('disconnected', { - address, - }); - }); - socket.on('connect_error', e => { - console.error('connection error', { - address, - e: e, - }); - }); - socket.on('error', e => { - console.error(e); - }); - socket.on('message', data => { - if ( this.state.on_message ) { - this.state.on_message.call(this, data); - } - }); - - this.socket = socket; - } -} - -module.exports = { CLink }; diff --git a/src/backend/src/modules/broadcast/connection/KeyPairHelper.js b/src/backend/src/modules/broadcast/connection/KeyPairHelper.js deleted file mode 100644 index 4d456da82..000000000 --- a/src/backend/src/modules/broadcast/connection/KeyPairHelper.js +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (C) 2024-present Puter Technologies Inc. - * - * This file is part of Puter. - * - * Puter is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -const { AdvancedBase } = require('@heyputer/putility'); - -class KeyPairHelper extends AdvancedBase { - static MODULES = { - tweetnacl: require('tweetnacl'), - }; - - constructor ({ - kpublic, - ksecret, - }) { - super(); - this.kpublic = kpublic; - this.ksecret = ksecret; - this.nonce_ = 0; - } - - to_nacl_key_ (key) { - const full_buffer = Buffer.from(key, 'base64'); - - // Remove version byte (assumed to be 0x31 and ignored for now) - const buffer = full_buffer.slice(1); - - return new Uint8Array(buffer); - } - - get naclSecret () { - return this.naclSecret_ ?? ( - this.naclSecret_ = this.to_nacl_key_(this.ksecret)); - } - get naclPublic () { - return this.naclPublic_ ?? ( - this.naclPublic_ = this.to_nacl_key_(this.kpublic)); - } - - write (text) { - const require = this.require; - const nacl = require('tweetnacl'); - - const nonce = nacl.randomBytes(nacl.box.nonceLength); - const message = {}; - - const textUint8 = new Uint8Array(Buffer.from(text, 'utf-8')); - const encryptedText = nacl.box(textUint8, nonce, this.naclPublic, this.naclSecret); - message.text = Buffer.from(encryptedText); - message.nonce = Buffer.from(nonce); - - return message; - } - - read (message) { - const require = this.require; - const nacl = require('tweetnacl'); - - const arr = nacl.box.open(new Uint8Array(message.text), - new Uint8Array(message.nonce), - this.naclPublic, - this.naclSecret); - - return Buffer.from(arr).toString('utf-8'); - } -} - -module.exports = { - KeyPairHelper, -}; diff --git a/src/backend/src/modules/broadcast/connection/SLink.js b/src/backend/src/modules/broadcast/connection/SLink.js deleted file mode 100644 index dd8242da7..000000000 --- a/src/backend/src/modules/broadcast/connection/SLink.js +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (C) 2024-present Puter Technologies Inc. - * - * This file is part of Puter. - * - * Puter is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -const { BaseLink } = require('./BaseLink'); -const { KeyPairHelper } = require('./KeyPairHelper'); - -class SLink extends BaseLink { - static AUTHENTICATING = { - on_message (data) { - if ( data.$ !== 'take-my-key' ) { - this.disconnect(); - return; - } - - const trustedKeys = this.trustedKeys; - - const hasKey = trustedKeys[data.key]; - if ( ! hasKey ) { - this.disconnect(); - return; - } - - const is_trusted = trustedKeys.hasOwnProperty(data.key); - if ( ! is_trusted ) { - this.disconnect(); - return; - } - - const kp_helper = new KeyPairHelper({ - kpublic: data.key, - ksecret: this.keys.secret, - }); - - const message = kp_helper.read(data.message); - this.aesKey = Buffer.from(message, 'base64'); - - this.state = this.constructor.ONLINE; - }, - }; - static ONLINE = { - on_message (data) { - const require = this.require; - const crypto = require('crypto'); - const decipher = crypto.createDecipheriv('aes-256-cbc', - this.aesKey, - data.iv); - const buffers = []; - buffers.push(decipher.update(data.message)); - buffers.push(decipher.final()); - - const rawjson = Buffer.concat(buffers).toString('utf-8'); - - const output = JSON.parse(rawjson); - - this.channels.message.emit(output); - }, - }; - static OFFLINE = { - on_message () { - throw new Error('unexpected message'); - }, - }; - - _send () { - // TODO: implement as a fallback - throw new Error('cannot send via SLink yet'); - } - - disconnect () { - this.socket.disconnect(); - this.state = this.constructor.OFFLINE; - } - - constructor ({ - keys, - trustedKeys, - socket, - }) { - super(); - this.state = this.constructor.AUTHENTICATING; - // Keys of server (local) - this.keys = keys; - // Allowed client keys (remote) - this.trustedKeys = trustedKeys; - this.socket = socket; - - socket.on('message', data => { - this.state.on_message.call(this, data); - }); - } -} - -module.exports = { SLink };