fix: broadcast pubsub + cleanup old code (#2695)

This commit is contained in:
Daniel Salazar
2026-03-19 13:13:01 -07:00
committed by GitHub
parent 5e2c7e0495
commit c2e77d3503
6 changed files with 81 additions and 458 deletions
@@ -19,18 +19,12 @@
import { createHmac, randomUUID, timingSafeEqual } from 'crypto';
import { Agent as HttpsAgent } from 'https';
import axios from 'axios';
import { Server as SocketIoServer } from 'socket.io';
import { redisClient } from '../../clients/redis/redisSingleton.js';
import { BaseService } from '../../services/BaseService.js';
import { Context } from '../../util/context.js';
import { Endpoint } from '../../util/expressutil.js';
import { CLink } from './connection/CLink.js';
import { SLink } from './connection/SLink.js';
export class BroadcastService extends BaseService {
#peers = [];
#connections = [];
#trustedPublicKeys = {};
#peersByKey = {};
#webhookPeers = [];
#incomingLastNonceByPeer = new Map();
@@ -54,23 +48,35 @@ export class BroadcastService extends BaseService {
const outboundFlushMs = Number(this.config.outbound_flush_ms ?? 2000);
for ( const peer_config of peers ) {
this.#trustedPublicKeys[peer_config.key] = true;
this.#peersByKey[peer_config.key] = {
const peerId = this.#resolvePeerId(peer_config);
if ( ! peerId ) {
console.warn('ignoring broadcast peer config with missing key/peerId', { peer_config });
continue;
}
if ( this.#peersByKey[peerId] ) {
console.warn('duplicate broadcast peer id configured', {
peerId,
existing: this.#peersByKey[peerId]?.webhook_url,
duplicate: peer_config.webhook_url,
});
}
this.#peersByKey[peerId] = {
webhook_secret: peer_config.webhook_secret,
webhook_url: peer_config.webhook_url,
webhook: !!peer_config.webhook,
};
if ( peer_config.webhook ) {
this.#webhookPeers.push(peer_config);
} else {
const peer = new CLink({
keys: this.config.keys,
config: peer_config,
log: this.log,
this.#webhookPeers.push({
...peer_config,
peerId,
});
} else {
console.warn('ignoring non-webhook broadcast peer; websocket transport is disabled', {
peerId,
});
this.#peers.push(peer);
peer.connect();
}
}
@@ -95,7 +101,15 @@ export class BroadcastService extends BaseService {
if ( meta?.from_outside ) return;
const safeMeta = this.#normalizeMeta(meta);
this.#enqueueOutboundEvent({ key, data, meta: safeMeta });
const outboundEvent = { key, data, meta: safeMeta };
// Mirror local outer.gui/pub events to Redis so same-cluster replicas
// receive them even when this instance is the originator.
this.#publishWebhookEventsToRedis([outboundEvent]).catch(error => {
console.warn('local redis pubsub publish failed', { error, key });
});
this.#enqueueOutboundEvent(outboundEvent);
}
#enqueueOutboundEvent (event) {
@@ -134,21 +148,12 @@ export class BroadcastService extends BaseService {
try {
const events = [...this.#outboundEventsByDedupKey.values()];
this.#outboundEventsByDedupKey.clear();
const message = { events };
for ( const peer of this.#peers ) {
try {
peer.send(message);
} catch (e) {
console.warn(`ws broadcast send error: ${ JSON.stringify({ peer: peer.key, error: e })}`);
}
}
for ( const peer_config of this.#webhookPeers ) {
try {
await this.#sendWebhookToPeer(peer_config, events);
} catch (e) {
console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.key, error: e.message })}`);
console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.peerId ?? peer_config.key, error: e.message })}`);
}
}
} finally {
@@ -166,6 +171,23 @@ export class BroadcastService extends BaseService {
return meta;
}
#resolvePeerId (peerConfig) {
if ( !peerConfig || typeof peerConfig !== 'object' ) return null;
const peerId = peerConfig.peerId ?? peerConfig.key;
if ( typeof peerId !== 'string' || peerId.trim() === '' ) return null;
return peerId.trim();
}
#isNonceReplayForPeer ({ timestamp, nonce, peerId }) {
const lastSeen = this.#incomingLastNonceByPeer.get(peerId);
if ( ! lastSeen ) return false;
// A newer timestamp should reset nonce ordering for this peer.
if ( timestamp > lastSeen.timestamp ) return false;
if ( timestamp < lastSeen.timestamp ) return true;
return nonce <= lastSeen.nonce;
}
async #initRedisPubSub () {
if ( typeof redisClient?.duplicate !== 'function' ) {
console.warn('redis pubsub unavailable; duplicate client is not supported');
@@ -350,7 +372,8 @@ export class BroadcastService extends BaseService {
return;
}
const peerId = req.headers['x-broadcast-peer-id'];
const peerIdHeader = req.headers['x-broadcast-peer-id'];
const peerId = Array.isArray(peerIdHeader) ? peerIdHeader[0] : peerIdHeader;
if ( ! peerId ) {
res.status(403).send({ error: { message: 'Missing X-Broadcast-Peer-Id' } });
return;
@@ -391,8 +414,7 @@ export class BroadcastService extends BaseService {
res.status(400).send({ error: { message: 'Invalid X-Broadcast-Nonce' } });
return;
}
const lastNonce = this.#incomingLastNonceByPeer.get(peerId) ?? -1;
if ( nonce <= lastNonce ) {
if ( this.#isNonceReplayForPeer({ timestamp, nonce, peerId }) ) {
res.status(403).send({ error: { message: 'Duplicate or stale nonce' } });
return;
}
@@ -413,7 +435,7 @@ export class BroadcastService extends BaseService {
return;
}
this.#incomingLastNonceByPeer.set(peerId, nonce);
this.#incomingLastNonceByPeer.set(peerId, { timestamp, nonce });
await this.#publishWebhookEventsToRedis(incomingEvents);
this.#emitIncomingEventsSequentially(incomingEvents);
@@ -422,7 +444,8 @@ export class BroadcastService extends BaseService {
}
async #sendWebhookToPeer (peer_config, events) {
const peerId = peer_config.key;
const peerId = this.#resolvePeerId(peer_config);
if ( ! peerId ) return;
const url = peer_config.webhook_url;
const requestUrl = this.#normalizeWebhookUrl(url);
const mySecretKey = this.config.webhook?.secret ?? '';
@@ -439,7 +462,7 @@ export class BroadcastService extends BaseService {
const payloadToSign = `${timestamp}.${nextNonce}.${rawBody}`;
const signature = createHmac('sha256', mySecretKey).update(payloadToSign).digest('hex');
const myPublicKey = this.config.webhook?.key ?? '';
const myPublicKey = this.config.webhook?.peerId ?? this.config.webhook?.key ?? '';
const headers = {
'Content-Type': 'application/json',
'Content-Length': String(Buffer.byteLength(rawBody)),
@@ -488,38 +511,4 @@ export class BroadcastService extends BaseService {
parsedUrl.protocol = `${this.#webhookProtocol}:`;
return parsedUrl.toString();
}
async '__on_install.websockets' () {
const svc_webServer = this.services.get('web-server');
const server = svc_webServer.get_server();
const io = new SocketIoServer(server, {
cors: { origin: '*' },
path: '/wssinternal',
});
io.on('connection', async socket => {
const conn = new SLink({
keys: this.config.keys,
trustedKeys: this.#trustedPublicKeys,
socket,
});
this.#connections.push(conn);
conn.channels.message.on(async message => {
const incomingEvents = this.#normalizeIncomingPayload(message);
if ( ! incomingEvents ) {
console.warn('invalid ws broadcast payload');
return;
}
try {
await this.#emitIncomingEventsSequentially(incomingEvents);
} catch ( error ) {
console.warn('ws broadcast receive error', { error });
}
});
});
}
}
@@ -88,4 +88,29 @@ describe('BroadcastService redis pubsub', () => {
expect(eventService.emit).not.toHaveBeenCalled();
});
it('publishes local outer.gui/pub events to redis pubsub for replicas', async () => {
const publishSpy = vi.spyOn(redisClient, 'publish');
try {
await service.outBroadcastEventHandler('outer.gui.notif.message', { id: 'gui-local' }, {});
await wait();
const publishCall = publishSpy.mock.calls.find(([channel]) => channel === 'broadcast.webhook.events');
expect(publishCall).toBeDefined();
const [channel, payload] = publishCall;
expect(channel).toBe('broadcast.webhook.events');
const parsedPayload = JSON.parse(payload);
expect(parsedPayload.sourceId).toBeDefined();
expect(parsedPayload.events).toEqual([
{
key: 'outer.gui.notif.message',
data: { id: 'gui-local' },
meta: {},
},
]);
} finally {
publishSpy.mockRestore();
}
});
});
@@ -1,53 +0,0 @@
/*
* Copyright (C) 2024-present Puter Technologies Inc.
*
* This file is part of Puter.
*
* Puter is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const { AdvancedBase } = require('@heyputer/putility');
const { ChannelFeature } = require('../../../traits/ChannelFeature');
class BaseLink extends AdvancedBase {
static FEATURES = [
new ChannelFeature(),
];
static CHANNELS = ['message'];
static MODULES = {
crypto: require('crypto'),
};
static AUTHENTICATING = {};
static ONLINE = {};
static OFFLINE = {};
send (data) {
if ( this.state !== this.constructor.ONLINE ) {
return false;
}
return this._send(data);
}
constructor () {
super();
this.state = this.constructor.AUTHENTICATING;
}
}
module.exports = {
BaseLink,
};
@@ -1,144 +0,0 @@
/*
* Copyright (C) 2024-present Puter Technologies Inc.
*
* This file is part of Puter.
*
* Puter is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const { BaseLink } = require('./BaseLink');
const { KeyPairHelper } = require('./KeyPairHelper');
/**
* Client-side link that establishes an encrypted socket.io connection.
* Handles AES-256-CBC encryption for message transmission and uses asymmetric
* key exchange for secure AES key distribution.
*/
class CLink extends BaseLink {
static MODULES = {
sioclient: require('socket.io-client'),
};
/**
* Encrypts the data using AES-256-CBC and sends it through the socket.
* The data is JSON stringified, encrypted with a random IV, and transmitted
* as a buffer along with the IV.
*
* @param {*} data - The data to be encrypted and sent through the socket
* @returns {void}
*/
_send (data) {
if ( ! this.socket ) return;
const require = this.require;
const crypto = require('crypto');
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv('aes-256-cbc',
this.aesKey,
iv);
const jsonified = JSON.stringify(data);
let buffers = [];
buffers.push(cipher.update(Buffer.from(jsonified, 'utf-8')));
buffers.push(cipher.final());
const buffer = Buffer.concat(buffers);
this.socket.send({
iv,
message: buffer,
});
}
/**
* Initializes the client link with local keys, remote server configuration, and logger.
*/
constructor ({
keys,
log,
config,
}) {
super();
// keys of client (local)
this.keys = keys;
// keys of server (remote)
this.config = config;
this.log = log;
}
/**
* Establishes a socket.io connection to the configured server address.
* Generates an AES key, encrypts it using the server's public key, and sends
* it during the handshake. Sets up event handlers for connection lifecycle
* and message reception.
*/
connect () {
let address = this.config.address;
if ( ! (
address.startsWith('https://') ||
address.startsWith('http://')
) ) {
address = `https://${address}`;
}
const socket = this.modules.sioclient(address, {
transports: ['websocket'],
path: '/wssinternal',
reconnection: true,
extraHeaders: {
...(this.config.host ? {
Host: this.config.host,
} : {}),
},
});
socket.on('connect', () => {
this.log.info('connected', {
address,
});
const require = this.require;
const crypto = require('crypto');
this.aesKey = crypto.randomBytes(32);
const kp_helper = new KeyPairHelper({
kpublic: this.config.key,
ksecret: this.keys.secret,
});
socket.send({
$: 'take-my-key',
key: this.keys.public,
message: kp_helper.write(this.aesKey.toString('base64')),
});
this.state = this.constructor.ONLINE;
});
socket.on('disconnect', () => {
this.log.info('disconnected', {
address,
});
});
socket.on('connect_error', e => {
console.error('connection error', {
address,
e: e,
});
});
socket.on('error', e => {
console.error(e);
});
socket.on('message', data => {
if ( this.state.on_message ) {
this.state.on_message.call(this, data);
}
});
this.socket = socket;
}
}
module.exports = { CLink };
@@ -1,85 +0,0 @@
/*
* Copyright (C) 2024-present Puter Technologies Inc.
*
* This file is part of Puter.
*
* Puter is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const { AdvancedBase } = require('@heyputer/putility');
class KeyPairHelper extends AdvancedBase {
static MODULES = {
tweetnacl: require('tweetnacl'),
};
constructor ({
kpublic,
ksecret,
}) {
super();
this.kpublic = kpublic;
this.ksecret = ksecret;
this.nonce_ = 0;
}
to_nacl_key_ (key) {
const full_buffer = Buffer.from(key, 'base64');
// Remove version byte (assumed to be 0x31 and ignored for now)
const buffer = full_buffer.slice(1);
return new Uint8Array(buffer);
}
get naclSecret () {
return this.naclSecret_ ?? (
this.naclSecret_ = this.to_nacl_key_(this.ksecret));
}
get naclPublic () {
return this.naclPublic_ ?? (
this.naclPublic_ = this.to_nacl_key_(this.kpublic));
}
write (text) {
const require = this.require;
const nacl = require('tweetnacl');
const nonce = nacl.randomBytes(nacl.box.nonceLength);
const message = {};
const textUint8 = new Uint8Array(Buffer.from(text, 'utf-8'));
const encryptedText = nacl.box(textUint8, nonce, this.naclPublic, this.naclSecret);
message.text = Buffer.from(encryptedText);
message.nonce = Buffer.from(nonce);
return message;
}
read (message) {
const require = this.require;
const nacl = require('tweetnacl');
const arr = nacl.box.open(new Uint8Array(message.text),
new Uint8Array(message.nonce),
this.naclPublic,
this.naclSecret);
return Buffer.from(arr).toString('utf-8');
}
}
module.exports = {
KeyPairHelper,
};
@@ -1,109 +0,0 @@
/*
* Copyright (C) 2024-present Puter Technologies Inc.
*
* This file is part of Puter.
*
* Puter is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
const { BaseLink } = require('./BaseLink');
const { KeyPairHelper } = require('./KeyPairHelper');
class SLink extends BaseLink {
static AUTHENTICATING = {
on_message (data) {
if ( data.$ !== 'take-my-key' ) {
this.disconnect();
return;
}
const trustedKeys = this.trustedKeys;
const hasKey = trustedKeys[data.key];
if ( ! hasKey ) {
this.disconnect();
return;
}
const is_trusted = trustedKeys.hasOwnProperty(data.key);
if ( ! is_trusted ) {
this.disconnect();
return;
}
const kp_helper = new KeyPairHelper({
kpublic: data.key,
ksecret: this.keys.secret,
});
const message = kp_helper.read(data.message);
this.aesKey = Buffer.from(message, 'base64');
this.state = this.constructor.ONLINE;
},
};
static ONLINE = {
on_message (data) {
const require = this.require;
const crypto = require('crypto');
const decipher = crypto.createDecipheriv('aes-256-cbc',
this.aesKey,
data.iv);
const buffers = [];
buffers.push(decipher.update(data.message));
buffers.push(decipher.final());
const rawjson = Buffer.concat(buffers).toString('utf-8');
const output = JSON.parse(rawjson);
this.channels.message.emit(output);
},
};
static OFFLINE = {
on_message () {
throw new Error('unexpected message');
},
};
_send () {
// TODO: implement as a fallback
throw new Error('cannot send via SLink yet');
}
disconnect () {
this.socket.disconnect();
this.state = this.constructor.OFFLINE;
}
constructor ({
keys,
trustedKeys,
socket,
}) {
super();
this.state = this.constructor.AUTHENTICATING;
// Keys of server (local)
this.keys = keys;
// Allowed client keys (remote)
this.trustedKeys = trustedKeys;
this.socket = socket;
socket.on('message', data => {
this.state.on_message.call(this, data);
});
}
}
module.exports = { SLink };