diff --git a/src/backend/src/modules/broadcast/BroadcastService.js b/src/backend/src/modules/broadcast/BroadcastService.js
index f444addba..3e9ea74cf 100644
--- a/src/backend/src/modules/broadcast/BroadcastService.js
+++ b/src/backend/src/modules/broadcast/BroadcastService.js
@@ -19,18 +19,12 @@
import { createHmac, randomUUID, timingSafeEqual } from 'crypto';
import { Agent as HttpsAgent } from 'https';
import axios from 'axios';
-import { Server as SocketIoServer } from 'socket.io';
import { redisClient } from '../../clients/redis/redisSingleton.js';
import { BaseService } from '../../services/BaseService.js';
import { Context } from '../../util/context.js';
import { Endpoint } from '../../util/expressutil.js';
-import { CLink } from './connection/CLink.js';
-import { SLink } from './connection/SLink.js';
export class BroadcastService extends BaseService {
- #peers = [];
- #connections = [];
- #trustedPublicKeys = {};
#peersByKey = {};
#webhookPeers = [];
#incomingLastNonceByPeer = new Map();
@@ -54,23 +48,35 @@ export class BroadcastService extends BaseService {
const outboundFlushMs = Number(this.config.outbound_flush_ms ?? 2000);
for ( const peer_config of peers ) {
- this.#trustedPublicKeys[peer_config.key] = true;
- this.#peersByKey[peer_config.key] = {
+ const peerId = this.#resolvePeerId(peer_config);
+ if ( ! peerId ) {
+ console.warn('ignoring broadcast peer config with missing key/peerId', { peer_config });
+ continue;
+ }
+
+ if ( this.#peersByKey[peerId] ) {
+ console.warn('duplicate broadcast peer id configured', {
+ peerId,
+ existing: this.#peersByKey[peerId]?.webhook_url,
+ duplicate: peer_config.webhook_url,
+ });
+ }
+
+ this.#peersByKey[peerId] = {
webhook_secret: peer_config.webhook_secret,
webhook_url: peer_config.webhook_url,
webhook: !!peer_config.webhook,
};
if ( peer_config.webhook ) {
- this.#webhookPeers.push(peer_config);
- } else {
- const peer = new CLink({
- keys: this.config.keys,
- config: peer_config,
- log: this.log,
+ this.#webhookPeers.push({
+ ...peer_config,
+ peerId,
+ });
+ } else {
+ console.warn('ignoring non-webhook broadcast peer; websocket transport is disabled', {
+ peerId,
});
- this.#peers.push(peer);
- peer.connect();
}
}
@@ -95,7 +101,15 @@ export class BroadcastService extends BaseService {
if ( meta?.from_outside ) return;
const safeMeta = this.#normalizeMeta(meta);
- this.#enqueueOutboundEvent({ key, data, meta: safeMeta });
+ const outboundEvent = { key, data, meta: safeMeta };
+
+ // Mirror local outer.gui/pub events to Redis so same-cluster replicas
+ // receive them even when this instance is the originator.
+ this.#publishWebhookEventsToRedis([outboundEvent]).catch(error => {
+ console.warn('local redis pubsub publish failed', { error, key });
+ });
+
+ this.#enqueueOutboundEvent(outboundEvent);
}
#enqueueOutboundEvent (event) {
@@ -134,21 +148,12 @@ export class BroadcastService extends BaseService {
try {
const events = [...this.#outboundEventsByDedupKey.values()];
this.#outboundEventsByDedupKey.clear();
- const message = { events };
-
- for ( const peer of this.#peers ) {
- try {
- peer.send(message);
- } catch (e) {
- console.warn(`ws broadcast send error: ${ JSON.stringify({ peer: peer.key, error: e })}`);
- }
- }
for ( const peer_config of this.#webhookPeers ) {
try {
await this.#sendWebhookToPeer(peer_config, events);
} catch (e) {
- console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.key, error: e.message })}`);
+ console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.peerId ?? peer_config.key, error: e.message })}`);
}
}
} finally {
@@ -166,6 +171,23 @@ export class BroadcastService extends BaseService {
return meta;
}
+ #resolvePeerId (peerConfig) {
+ if ( !peerConfig || typeof peerConfig !== 'object' ) return null;
+ const peerId = peerConfig.peerId ?? peerConfig.key;
+ if ( typeof peerId !== 'string' || peerId.trim() === '' ) return null;
+ return peerId.trim();
+ }
+
+ #isNonceReplayForPeer ({ timestamp, nonce, peerId }) {
+ const lastSeen = this.#incomingLastNonceByPeer.get(peerId);
+ if ( ! lastSeen ) return false;
+
+ // A newer timestamp should reset nonce ordering for this peer.
+ if ( timestamp > lastSeen.timestamp ) return false;
+ if ( timestamp < lastSeen.timestamp ) return true;
+ return nonce <= lastSeen.nonce;
+ }
+
async #initRedisPubSub () {
if ( typeof redisClient?.duplicate !== 'function' ) {
console.warn('redis pubsub unavailable; duplicate client is not supported');
@@ -350,7 +372,8 @@ export class BroadcastService extends BaseService {
return;
}
- const peerId = req.headers['x-broadcast-peer-id'];
+ const peerIdHeader = req.headers['x-broadcast-peer-id'];
+ const peerId = Array.isArray(peerIdHeader) ? peerIdHeader[0] : peerIdHeader;
if ( ! peerId ) {
res.status(403).send({ error: { message: 'Missing X-Broadcast-Peer-Id' } });
return;
@@ -391,8 +414,7 @@ export class BroadcastService extends BaseService {
res.status(400).send({ error: { message: 'Invalid X-Broadcast-Nonce' } });
return;
}
- const lastNonce = this.#incomingLastNonceByPeer.get(peerId) ?? -1;
- if ( nonce <= lastNonce ) {
+ if ( this.#isNonceReplayForPeer({ timestamp, nonce, peerId }) ) {
res.status(403).send({ error: { message: 'Duplicate or stale nonce' } });
return;
}
@@ -413,7 +435,7 @@ export class BroadcastService extends BaseService {
return;
}
- this.#incomingLastNonceByPeer.set(peerId, nonce);
+ this.#incomingLastNonceByPeer.set(peerId, { timestamp, nonce });
await this.#publishWebhookEventsToRedis(incomingEvents);
this.#emitIncomingEventsSequentially(incomingEvents);
@@ -422,7 +444,8 @@ export class BroadcastService extends BaseService {
}
async #sendWebhookToPeer (peer_config, events) {
- const peerId = peer_config.key;
+ const peerId = this.#resolvePeerId(peer_config);
+ if ( ! peerId ) return;
const url = peer_config.webhook_url;
const requestUrl = this.#normalizeWebhookUrl(url);
const mySecretKey = this.config.webhook?.secret ?? '';
@@ -439,7 +462,7 @@ export class BroadcastService extends BaseService {
const payloadToSign = `${timestamp}.${nextNonce}.${rawBody}`;
const signature = createHmac('sha256', mySecretKey).update(payloadToSign).digest('hex');
- const myPublicKey = this.config.webhook?.key ?? '';
+ const myPublicKey = this.config.webhook?.peerId ?? this.config.webhook?.key ?? '';
const headers = {
'Content-Type': 'application/json',
'Content-Length': String(Buffer.byteLength(rawBody)),
@@ -488,38 +511,4 @@ export class BroadcastService extends BaseService {
parsedUrl.protocol = `${this.#webhookProtocol}:`;
return parsedUrl.toString();
}
-
- async '__on_install.websockets' () {
- const svc_webServer = this.services.get('web-server');
-
- const server = svc_webServer.get_server();
-
- const io = new SocketIoServer(server, {
- cors: { origin: '*' },
- path: '/wssinternal',
- });
-
- io.on('connection', async socket => {
- const conn = new SLink({
- keys: this.config.keys,
- trustedKeys: this.#trustedPublicKeys,
- socket,
- });
- this.#connections.push(conn);
-
- conn.channels.message.on(async message => {
- const incomingEvents = this.#normalizeIncomingPayload(message);
- if ( ! incomingEvents ) {
- console.warn('invalid ws broadcast payload');
- return;
- }
-
- try {
- await this.#emitIncomingEventsSequentially(incomingEvents);
- } catch ( error ) {
- console.warn('ws broadcast receive error', { error });
- }
- });
- });
- }
}
diff --git a/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js b/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js
index b059d1be3..47655ba49 100644
--- a/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js
+++ b/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js
@@ -88,4 +88,29 @@ describe('BroadcastService redis pubsub', () => {
expect(eventService.emit).not.toHaveBeenCalled();
});
+
+ it('publishes local outer.gui/pub events to redis pubsub for replicas', async () => {
+ const publishSpy = vi.spyOn(redisClient, 'publish');
+ try {
+ await service.outBroadcastEventHandler('outer.gui.notif.message', { id: 'gui-local' }, {});
+ await wait();
+
+ const publishCall = publishSpy.mock.calls.find(([channel]) => channel === 'broadcast.webhook.events');
+ expect(publishCall).toBeDefined();
+ const [channel, payload] = publishCall;
+ expect(channel).toBe('broadcast.webhook.events');
+
+ const parsedPayload = JSON.parse(payload);
+ expect(parsedPayload.sourceId).toBeDefined();
+ expect(parsedPayload.events).toEqual([
+ {
+ key: 'outer.gui.notif.message',
+ data: { id: 'gui-local' },
+ meta: {},
+ },
+ ]);
+ } finally {
+ publishSpy.mockRestore();
+ }
+ });
});
diff --git a/src/backend/src/modules/broadcast/connection/BaseLink.js b/src/backend/src/modules/broadcast/connection/BaseLink.js
deleted file mode 100644
index 63eada97e..000000000
--- a/src/backend/src/modules/broadcast/connection/BaseLink.js
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (C) 2024-present Puter Technologies Inc.
- *
- * This file is part of Puter.
- *
- * Puter is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published
- * by the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-const { AdvancedBase } = require('@heyputer/putility');
-const { ChannelFeature } = require('../../../traits/ChannelFeature');
-
-class BaseLink extends AdvancedBase {
- static FEATURES = [
- new ChannelFeature(),
- ];
- static CHANNELS = ['message'];
-
- static MODULES = {
- crypto: require('crypto'),
- };
-
- static AUTHENTICATING = {};
- static ONLINE = {};
- static OFFLINE = {};
-
- send (data) {
- if ( this.state !== this.constructor.ONLINE ) {
- return false;
- }
-
- return this._send(data);
- }
-
- constructor () {
- super();
- this.state = this.constructor.AUTHENTICATING;
- }
-}
-
-module.exports = {
- BaseLink,
-};
diff --git a/src/backend/src/modules/broadcast/connection/CLink.js b/src/backend/src/modules/broadcast/connection/CLink.js
deleted file mode 100644
index ac1c84f3a..000000000
--- a/src/backend/src/modules/broadcast/connection/CLink.js
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright (C) 2024-present Puter Technologies Inc.
- *
- * This file is part of Puter.
- *
- * Puter is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published
- * by the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-const { BaseLink } = require('./BaseLink');
-const { KeyPairHelper } = require('./KeyPairHelper');
-
-/**
- * Client-side link that establishes an encrypted socket.io connection.
- * Handles AES-256-CBC encryption for message transmission and uses asymmetric
- * key exchange for secure AES key distribution.
- */
-class CLink extends BaseLink {
- static MODULES = {
- sioclient: require('socket.io-client'),
- };
-
- /**
- * Encrypts the data using AES-256-CBC and sends it through the socket.
- * The data is JSON stringified, encrypted with a random IV, and transmitted
- * as a buffer along with the IV.
- *
- * @param {*} data - The data to be encrypted and sent through the socket
- * @returns {void}
- */
- _send (data) {
- if ( ! this.socket ) return;
- const require = this.require;
- const crypto = require('crypto');
- const iv = crypto.randomBytes(16);
- const cipher = crypto.createCipheriv('aes-256-cbc',
- this.aesKey,
- iv);
- const jsonified = JSON.stringify(data);
- let buffers = [];
- buffers.push(cipher.update(Buffer.from(jsonified, 'utf-8')));
- buffers.push(cipher.final());
- const buffer = Buffer.concat(buffers);
- this.socket.send({
- iv,
- message: buffer,
- });
- }
-
- /**
- * Initializes the client link with local keys, remote server configuration, and logger.
- */
- constructor ({
- keys,
- log,
- config,
- }) {
- super();
- // keys of client (local)
- this.keys = keys;
- // keys of server (remote)
- this.config = config;
- this.log = log;
- }
-
- /**
- * Establishes a socket.io connection to the configured server address.
- * Generates an AES key, encrypts it using the server's public key, and sends
- * it during the handshake. Sets up event handlers for connection lifecycle
- * and message reception.
- */
- connect () {
- let address = this.config.address;
- if ( ! (
- address.startsWith('https://') ||
- address.startsWith('http://')
- ) ) {
- address = `https://${address}`;
- }
- const socket = this.modules.sioclient(address, {
- transports: ['websocket'],
- path: '/wssinternal',
- reconnection: true,
- extraHeaders: {
- ...(this.config.host ? {
- Host: this.config.host,
- } : {}),
- },
- });
- socket.on('connect', () => {
- this.log.info('connected', {
- address,
- });
-
- const require = this.require;
- const crypto = require('crypto');
- this.aesKey = crypto.randomBytes(32);
-
- const kp_helper = new KeyPairHelper({
- kpublic: this.config.key,
- ksecret: this.keys.secret,
- });
- socket.send({
- $: 'take-my-key',
- key: this.keys.public,
- message: kp_helper.write(this.aesKey.toString('base64')),
- });
- this.state = this.constructor.ONLINE;
- });
- socket.on('disconnect', () => {
- this.log.info('disconnected', {
- address,
- });
- });
- socket.on('connect_error', e => {
- console.error('connection error', {
- address,
- e: e,
- });
- });
- socket.on('error', e => {
- console.error(e);
- });
- socket.on('message', data => {
- if ( this.state.on_message ) {
- this.state.on_message.call(this, data);
- }
- });
-
- this.socket = socket;
- }
-}
-
-module.exports = { CLink };
diff --git a/src/backend/src/modules/broadcast/connection/KeyPairHelper.js b/src/backend/src/modules/broadcast/connection/KeyPairHelper.js
deleted file mode 100644
index 4d456da82..000000000
--- a/src/backend/src/modules/broadcast/connection/KeyPairHelper.js
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright (C) 2024-present Puter Technologies Inc.
- *
- * This file is part of Puter.
- *
- * Puter is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published
- * by the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-const { AdvancedBase } = require('@heyputer/putility');
-
-class KeyPairHelper extends AdvancedBase {
- static MODULES = {
- tweetnacl: require('tweetnacl'),
- };
-
- constructor ({
- kpublic,
- ksecret,
- }) {
- super();
- this.kpublic = kpublic;
- this.ksecret = ksecret;
- this.nonce_ = 0;
- }
-
- to_nacl_key_ (key) {
- const full_buffer = Buffer.from(key, 'base64');
-
- // Remove version byte (assumed to be 0x31 and ignored for now)
- const buffer = full_buffer.slice(1);
-
- return new Uint8Array(buffer);
- }
-
- get naclSecret () {
- return this.naclSecret_ ?? (
- this.naclSecret_ = this.to_nacl_key_(this.ksecret));
- }
- get naclPublic () {
- return this.naclPublic_ ?? (
- this.naclPublic_ = this.to_nacl_key_(this.kpublic));
- }
-
- write (text) {
- const require = this.require;
- const nacl = require('tweetnacl');
-
- const nonce = nacl.randomBytes(nacl.box.nonceLength);
- const message = {};
-
- const textUint8 = new Uint8Array(Buffer.from(text, 'utf-8'));
- const encryptedText = nacl.box(textUint8, nonce, this.naclPublic, this.naclSecret);
- message.text = Buffer.from(encryptedText);
- message.nonce = Buffer.from(nonce);
-
- return message;
- }
-
- read (message) {
- const require = this.require;
- const nacl = require('tweetnacl');
-
- const arr = nacl.box.open(new Uint8Array(message.text),
- new Uint8Array(message.nonce),
- this.naclPublic,
- this.naclSecret);
-
- return Buffer.from(arr).toString('utf-8');
- }
-}
-
-module.exports = {
- KeyPairHelper,
-};
diff --git a/src/backend/src/modules/broadcast/connection/SLink.js b/src/backend/src/modules/broadcast/connection/SLink.js
deleted file mode 100644
index dd8242da7..000000000
--- a/src/backend/src/modules/broadcast/connection/SLink.js
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright (C) 2024-present Puter Technologies Inc.
- *
- * This file is part of Puter.
- *
- * Puter is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published
- * by the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-const { BaseLink } = require('./BaseLink');
-const { KeyPairHelper } = require('./KeyPairHelper');
-
-class SLink extends BaseLink {
- static AUTHENTICATING = {
- on_message (data) {
- if ( data.$ !== 'take-my-key' ) {
- this.disconnect();
- return;
- }
-
- const trustedKeys = this.trustedKeys;
-
- const hasKey = trustedKeys[data.key];
- if ( ! hasKey ) {
- this.disconnect();
- return;
- }
-
- const is_trusted = trustedKeys.hasOwnProperty(data.key);
- if ( ! is_trusted ) {
- this.disconnect();
- return;
- }
-
- const kp_helper = new KeyPairHelper({
- kpublic: data.key,
- ksecret: this.keys.secret,
- });
-
- const message = kp_helper.read(data.message);
- this.aesKey = Buffer.from(message, 'base64');
-
- this.state = this.constructor.ONLINE;
- },
- };
- static ONLINE = {
- on_message (data) {
- const require = this.require;
- const crypto = require('crypto');
- const decipher = crypto.createDecipheriv('aes-256-cbc',
- this.aesKey,
- data.iv);
- const buffers = [];
- buffers.push(decipher.update(data.message));
- buffers.push(decipher.final());
-
- const rawjson = Buffer.concat(buffers).toString('utf-8');
-
- const output = JSON.parse(rawjson);
-
- this.channels.message.emit(output);
- },
- };
- static OFFLINE = {
- on_message () {
- throw new Error('unexpected message');
- },
- };
-
- _send () {
- // TODO: implement as a fallback
- throw new Error('cannot send via SLink yet');
- }
-
- disconnect () {
- this.socket.disconnect();
- this.state = this.constructor.OFFLINE;
- }
-
- constructor ({
- keys,
- trustedKeys,
- socket,
- }) {
- super();
- this.state = this.constructor.AUTHENTICATING;
- // Keys of server (local)
- this.keys = keys;
- // Allowed client keys (remote)
- this.trustedKeys = trustedKeys;
- this.socket = socket;
-
- socket.on('message', data => {
- this.state.on_message.call(this, data);
- });
- }
-}
-
-module.exports = { SLink };