From 9347644f81758bac24ea3957eaa2cade2af3fd2b Mon Sep 17 00:00:00 2001 From: Daniel Salazar Date: Tue, 17 Mar 2026 18:45:23 -0700 Subject: [PATCH] feat: redis pubsub for multiple connected clients in broadcast service and webhook fixes (#2681) * debug: logs for webhook broadcasts * fix: broadcast webhook calls * fix: add error log with body * fix: fetch form * feat: redis pubsub for multiple connected clients in broadcast service * test: add pubsub tests --- .../src/modules/broadcast/BroadcastService.js | 181 ++++++++++++++++-- .../BroadcastService.redisPubSub.test.js | 91 +++++++++ 2 files changed, 253 insertions(+), 19 deletions(-) create mode 100644 src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js diff --git a/src/backend/src/modules/broadcast/BroadcastService.js b/src/backend/src/modules/broadcast/BroadcastService.js index ad33218fb..f444addba 100644 --- a/src/backend/src/modules/broadcast/BroadcastService.js +++ b/src/backend/src/modules/broadcast/BroadcastService.js @@ -16,8 +16,11 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -import { createHmac, timingSafeEqual } from 'crypto'; +import { createHmac, randomUUID, timingSafeEqual } from 'crypto'; +import { Agent as HttpsAgent } from 'https'; +import axios from 'axios'; import { Server as SocketIoServer } from 'socket.io'; +import { redisClient } from '../../clients/redis/redisSingleton.js'; import { BaseService } from '../../services/BaseService.js'; import { Context } from '../../util/context.js'; import { Endpoint } from '../../util/expressutil.js'; @@ -38,11 +41,17 @@ export class BroadcastService extends BaseService { #dedupFallbackCounter = 0; #webhookReplayWindowSeconds = 300; #outboundFlushMs = 5000; + #webhookHostHeader = null; + #webhookProtocol = 'https'; + #webhookHttpsAgent = new HttpsAgent({ rejectUnauthorized: false }); + #redisPubSubChannel = 'broadcast.webhook.events'; + #redisSubscriber = null; + #redisSourceId = randomUUID(); async _init () { const peers = this.config.peers ?? []; const replayWindowSeconds = this.config.webhook_replay_window_seconds ?? 300; - const outboundFlushMs = Number(this.config.outbound_flush_ms ?? 5000); + const outboundFlushMs = Number(this.config.outbound_flush_ms ?? 2000); for ( const peer_config of peers ) { this.#trustedPublicKeys[peer_config.key] = true; @@ -69,6 +78,14 @@ export class BroadcastService extends BaseService { this.#outboundFlushMs = Number.isFinite(outboundFlushMs) && outboundFlushMs >= 0 ? outboundFlushMs : 5000; + this.#webhookHostHeader = this.global_config.domain; + { + const protocol = String(this.global_config.protocol ?? '').trim().replace(/:$/, '').toLowerCase(); + this.#webhookProtocol = protocol === 'http' || protocol === 'https' ? protocol : 'https'; + } + this.#redisSourceId = `${String(this.global_config?.server_id ?? 'local')}:${randomUUID()}`; + + await this.#initRedisPubSub(); const svc_event = this.services.get('event'); svc_event.on('outer.*', this.outBroadcastEventHandler.bind(this)); @@ -100,11 +117,13 @@ export class BroadcastService extends BaseService { #scheduleOutboundFlush () { if ( this.#outboundFlushTimer ) return; - this.#outboundFlushTimer = setTimeout(() => { + this.#outboundFlushTimer = setTimeout(async () => { this.#outboundFlushTimer = null; - this.#flushOutboundEvents().catch(error => { + try { + await this.#flushOutboundEvents(); + } catch ( error ) { console.warn('outbound broadcast flush failed', { error }); - }); + } }, this.#outboundFlushMs); } @@ -129,7 +148,7 @@ export class BroadcastService extends BaseService { try { await this.#sendWebhookToPeer(peer_config, events); } catch (e) { - console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.key, error: e })}`); + console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.key, error: e.message })}`); } } } finally { @@ -147,6 +166,97 @@ export class BroadcastService extends BaseService { return meta; } + async #initRedisPubSub () { + if ( typeof redisClient?.duplicate !== 'function' ) { + console.warn('redis pubsub unavailable; duplicate client is not supported'); + return; + } + + try { + this.#redisSubscriber = redisClient.duplicate(); + this.#redisSubscriber.on('error', error => { + console.warn('redis pubsub subscriber error', { error }); + }); + this.#redisSubscriber.on('message', (channel, message) => { + this.#handleRedisPubSubMessage(channel, message).catch(error => { + console.warn('redis pubsub message handling error', { error }); + }); + }); + await this.#redisSubscriber.subscribe(this.#redisPubSubChannel); + } catch ( error ) { + console.warn('failed to initialize redis pubsub subscriber', { error }); + this.#redisSubscriber = null; + } + } + + #isRedisWebhookEventKey (key) { + if ( typeof key !== 'string' ) return false; + return key === 'outer.gui' || + key.startsWith('outer.gui.') || + key === 'outer.pub' || + key.startsWith('outer.pub.'); + } + + #filterRedisWebhookEvents (events) { + return events.filter(event => this.#isRedisWebhookEventKey(event?.key)); + } + + async #publishWebhookEventsToRedis (events) { + if ( !Array.isArray(events) || events.length === 0 ) return; + + const eventsToPublish = this.#filterRedisWebhookEvents(events); + if ( eventsToPublish.length === 0 ) return; + + let payload; + try { + payload = JSON.stringify({ + sourceId: this.#redisSourceId, + events: eventsToPublish, + }); + } catch ( error ) { + console.warn('redis pubsub publish failed: payload not serializable', { error }); + return; + } + + try { + await redisClient.publish(this.#redisPubSubChannel, payload); + } catch ( error ) { + console.warn('redis pubsub publish failed', { error }); + } + } + + async #handleRedisPubSubMessage (channel, message) { + if ( channel !== this.#redisPubSubChannel ) return; + + let payload; + try { + payload = JSON.parse(message); + } catch { + console.warn('invalid redis pubsub payload: not json'); + return; + } + + if ( !payload || typeof payload !== 'object' || Array.isArray(payload) ) { + console.warn('invalid redis pubsub payload: expected object'); + return; + } + + if ( payload.sourceId && payload.sourceId === this.#redisSourceId ) { + return; + } + + const incomingEvents = this.#normalizeIncomingPayload(payload); + if ( ! incomingEvents ) { + console.warn('invalid redis pubsub payload: invalid events'); + return; + } + + const eventsToEmit = this.#filterRedisWebhookEvents(incomingEvents); + if ( eventsToEmit.length === 0 ) return; + + await this.#emitIncomingEventsSequentially(eventsToEmit); + } + #normalizeIncomingPayload (payload) { if ( !payload || typeof payload !== 'object' || Array.isArray(payload) ) { return null; @@ -246,8 +356,6 @@ export class BroadcastService extends BaseService { return; } - console.log('received peerId', { value: peerId }); - const peer = this.#peersByKey[peerId]; if ( !peer || !peer.webhook_secret ) { res.status(403).send({ error: { message: 'Unknown peer or webhook not configured' } }); @@ -307,6 +415,7 @@ export class BroadcastService extends BaseService { this.#incomingLastNonceByPeer.set(peerId, nonce); + await this.#publishWebhookEventsToRedis(incomingEvents); this.#emitIncomingEventsSequentially(incomingEvents); res.status(200).send({ ok: true }); @@ -315,8 +424,10 @@ export class BroadcastService extends BaseService { async #sendWebhookToPeer (peer_config, events) { const peerId = peer_config.key; const url = peer_config.webhook_url; + const requestUrl = this.#normalizeWebhookUrl(url); const mySecretKey = this.config.webhook?.secret ?? ''; - if ( !url || !mySecretKey ) return; + + if ( !requestUrl || !mySecretKey ) return; let nextNonce = this.#outgoingNonceByPeer.get(peerId) ?? 0; this.#outgoingNonceByPeer.set(peerId, nextNonce + 1); @@ -329,23 +440,55 @@ export class BroadcastService extends BaseService { const signature = createHmac('sha256', mySecretKey).update(payloadToSign).digest('hex'); const myPublicKey = this.config.webhook?.key ?? ''; - const response = await fetch(url, { + const headers = { + 'Content-Type': 'application/json', + 'Content-Length': String(Buffer.byteLength(rawBody)), + 'X-Broadcast-Peer-Id': myPublicKey, + 'X-Broadcast-Timestamp': String(timestamp), + 'X-Broadcast-Nonce': String(nextNonce), + 'X-Broadcast-Signature': signature, + ...(this.#webhookHostHeader ? { Host: this.#webhookHostHeader } : {}), + }; + + const response = await axios.request({ method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'X-Broadcast-Peer-Id': myPublicKey, - 'X-Broadcast-Timestamp': String(timestamp), - 'X-Broadcast-Nonce': String(nextNonce), - 'X-Broadcast-Signature': signature, - }, - body: rawBody, + url: requestUrl, + headers, + data: rawBody, + timeout: 15000, + validateStatus: () => true, + responseType: 'text', + transformResponse: value => value, + ...(requestUrl.startsWith('https:') + ? { httpsAgent: this.#webhookHttpsAgent } + : {}), }); - if ( ! response.ok ) { + if ( response.status < 200 || response.status >= 300 ) { + console.warn(`error with body: ${response.data}`); throw new Error(`Webhook POST failed: ${response.status} ${response.statusText}`); } } + #normalizeWebhookUrl (url) { + if ( typeof url !== 'string' || url.trim() === '' ) { + return null; + } + + const urlValue = url.trim(); + let parsedUrl; + try { + parsedUrl = urlValue.includes('://') + ? new URL(urlValue) + : new URL(`${this.#webhookProtocol}://${urlValue}`); + } catch { + return null; + } + + parsedUrl.protocol = `${this.#webhookProtocol}:`; + return parsedUrl.toString(); + } + async '__on_install.websockets' () { const svc_webServer = this.services.get('web-server'); diff --git a/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js b/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js new file mode 100644 index 000000000..b059d1be3 --- /dev/null +++ b/src/backend/src/modules/broadcast/BroadcastService.redisPubSub.test.js @@ -0,0 +1,91 @@ +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { redisClient } from '../../clients/redis/redisSingleton.js'; +import { BroadcastService } from './BroadcastService.js'; + +const wait = (ms = 20) => new Promise(resolve => setTimeout(resolve, ms)); + +describe('BroadcastService redis pubsub', () => { + let eventService; + let service; + + beforeAll(async () => { + eventService = { + on: vi.fn(), + emit: vi.fn(async () => { + }), + }; + + service = new BroadcastService({ + services: { + get: (name) => { + if ( name === 'event' ) return eventService; + throw new Error(`unexpected service lookup: ${name}`); + }, + }, + config: { + domain: 'puter.com', + protocol: 'https', + server_id: 'test-broadcast-a', + services: { + broadcast: { + peers: [], + }, + }, + }, + name: 'broadcast', + args: {}, + context: { + get: () => ({ use: () => ({}) }), + }, + }); + + await service._init(); + }); + + afterAll(async () => { + }); + + beforeEach(() => { + eventService.emit.mockClear(); + }); + + it('re-emits only outer.gui/pub events from redis pubsub payloads', async () => { + await redisClient.publish('broadcast.webhook.events', JSON.stringify({ + sourceId: 'other-instance', + events: [ + { key: 'outer.gui.notif.message', data: { id: 'gui-1' }, meta: {} }, + { key: 'outer.pub.notice', data: { id: 'pub-1' }, meta: {} }, + { key: 'outer.cacheUpdate', data: { cacheKey: 'skip-me' }, meta: {} }, + ], + })); + + await wait(); + + expect(eventService.emit).toHaveBeenCalledTimes(2); + expect(eventService.emit).toHaveBeenNthCalledWith( + 1, + 'outer.gui.notif.message', + { id: 'gui-1' }, + expect.objectContaining({ from_outside: true }), + ); + expect(eventService.emit).toHaveBeenNthCalledWith( + 2, + 'outer.pub.notice', + { id: 'pub-1' }, + expect.objectContaining({ from_outside: true }), + ); + }); + + it('ignores malformed redis pubsub payloads', async () => { + await redisClient.publish('broadcast.webhook.events', 'not-json'); + await wait(); + + await redisClient.publish('broadcast.webhook.events', JSON.stringify({ + sourceId: 'other-instance', + events: [{ bad: 'shape' }], + })); + await wait(); + + expect(eventService.emit).not.toHaveBeenCalled(); + }); +});