feat: redis pubsub for multiple connected clients in broadcast service and webhook fixes (#2681)

* debug: logs for webhook broadcasts

* fix: broadcast webhook calls

* fix: add error log with body

* fix: fetch form

* feat: redis pubsub for multiple connected clients in broadcast service

* test: add pubsub tests
This commit is contained in:
Daniel Salazar
2026-03-17 18:45:23 -07:00
committed by GitHub
parent ef93ed4572
commit 9347644f81
2 changed files with 253 additions and 19 deletions
@@ -16,8 +16,11 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { createHmac, timingSafeEqual } from 'crypto';
import { createHmac, randomUUID, timingSafeEqual } from 'crypto';
import { Agent as HttpsAgent } from 'https';
import axios from 'axios';
import { Server as SocketIoServer } from 'socket.io';
import { redisClient } from '../../clients/redis/redisSingleton.js';
import { BaseService } from '../../services/BaseService.js';
import { Context } from '../../util/context.js';
import { Endpoint } from '../../util/expressutil.js';
@@ -38,11 +41,17 @@ export class BroadcastService extends BaseService {
#dedupFallbackCounter = 0;
#webhookReplayWindowSeconds = 300;
#outboundFlushMs = 5000;
#webhookHostHeader = null;
#webhookProtocol = 'https';
#webhookHttpsAgent = new HttpsAgent({ rejectUnauthorized: false });
#redisPubSubChannel = 'broadcast.webhook.events';
#redisSubscriber = null;
#redisSourceId = randomUUID();
async _init () {
const peers = this.config.peers ?? [];
const replayWindowSeconds = this.config.webhook_replay_window_seconds ?? 300;
const outboundFlushMs = Number(this.config.outbound_flush_ms ?? 5000);
const outboundFlushMs = Number(this.config.outbound_flush_ms ?? 2000);
for ( const peer_config of peers ) {
this.#trustedPublicKeys[peer_config.key] = true;
@@ -69,6 +78,14 @@ export class BroadcastService extends BaseService {
this.#outboundFlushMs = Number.isFinite(outboundFlushMs) && outboundFlushMs >= 0
? outboundFlushMs
: 5000;
this.#webhookHostHeader = this.global_config.domain;
{
const protocol = String(this.global_config.protocol ?? '').trim().replace(/:$/, '').toLowerCase();
this.#webhookProtocol = protocol === 'http' || protocol === 'https' ? protocol : 'https';
}
this.#redisSourceId = `${String(this.global_config?.server_id ?? 'local')}:${randomUUID()}`;
await this.#initRedisPubSub();
const svc_event = this.services.get('event');
svc_event.on('outer.*', this.outBroadcastEventHandler.bind(this));
@@ -100,11 +117,13 @@ export class BroadcastService extends BaseService {
#scheduleOutboundFlush () {
if ( this.#outboundFlushTimer ) return;
this.#outboundFlushTimer = setTimeout(() => {
this.#outboundFlushTimer = setTimeout(async () => {
this.#outboundFlushTimer = null;
this.#flushOutboundEvents().catch(error => {
try {
await this.#flushOutboundEvents();
} catch ( error ) {
console.warn('outbound broadcast flush failed', { error });
});
}
}, this.#outboundFlushMs);
}
@@ -129,7 +148,7 @@ export class BroadcastService extends BaseService {
try {
await this.#sendWebhookToPeer(peer_config, events);
} catch (e) {
console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.key, error: e })}`);
console.warn(`webhook broadcast send error: ${ JSON.stringify({ peer: peer_config.key, error: e.message })}`);
}
}
} finally {
@@ -147,6 +166,97 @@ export class BroadcastService extends BaseService {
return meta;
}
async #initRedisPubSub () {
if ( typeof redisClient?.duplicate !== 'function' ) {
console.warn('redis pubsub unavailable; duplicate client is not supported');
return;
}
try {
this.#redisSubscriber = redisClient.duplicate();
this.#redisSubscriber.on('error', error => {
console.warn('redis pubsub subscriber error', { error });
});
this.#redisSubscriber.on('message', (channel, message) => {
this.#handleRedisPubSubMessage(channel, message).catch(error => {
console.warn('redis pubsub message handling error', { error });
});
});
await this.#redisSubscriber.subscribe(this.#redisPubSubChannel);
} catch ( error ) {
console.warn('failed to initialize redis pubsub subscriber', { error });
this.#redisSubscriber = null;
}
}
#isRedisWebhookEventKey (key) {
if ( typeof key !== 'string' ) return false;
return key === 'outer.gui' ||
key.startsWith('outer.gui.') ||
key === 'outer.pub' ||
key.startsWith('outer.pub.');
}
#filterRedisWebhookEvents (events) {
return events.filter(event => this.#isRedisWebhookEventKey(event?.key));
}
async #publishWebhookEventsToRedis (events) {
if ( !Array.isArray(events) || events.length === 0 ) return;
const eventsToPublish = this.#filterRedisWebhookEvents(events);
if ( eventsToPublish.length === 0 ) return;
let payload;
try {
payload = JSON.stringify({
sourceId: this.#redisSourceId,
events: eventsToPublish,
});
} catch ( error ) {
console.warn('redis pubsub publish failed: payload not serializable', { error });
return;
}
try {
await redisClient.publish(this.#redisPubSubChannel, payload);
} catch ( error ) {
console.warn('redis pubsub publish failed', { error });
}
}
async #handleRedisPubSubMessage (channel, message) {
if ( channel !== this.#redisPubSubChannel ) return;
let payload;
try {
payload = JSON.parse(message);
} catch {
console.warn('invalid redis pubsub payload: not json');
return;
}
if ( !payload || typeof payload !== 'object' || Array.isArray(payload) ) {
console.warn('invalid redis pubsub payload: expected object');
return;
}
if ( payload.sourceId && payload.sourceId === this.#redisSourceId ) {
return;
}
const incomingEvents = this.#normalizeIncomingPayload(payload);
if ( ! incomingEvents ) {
console.warn('invalid redis pubsub payload: invalid events');
return;
}
const eventsToEmit = this.#filterRedisWebhookEvents(incomingEvents);
if ( eventsToEmit.length === 0 ) return;
await this.#emitIncomingEventsSequentially(eventsToEmit);
}
#normalizeIncomingPayload (payload) {
if ( !payload || typeof payload !== 'object' || Array.isArray(payload) ) {
return null;
@@ -246,8 +356,6 @@ export class BroadcastService extends BaseService {
return;
}
console.log('received peerId', { value: peerId });
const peer = this.#peersByKey[peerId];
if ( !peer || !peer.webhook_secret ) {
res.status(403).send({ error: { message: 'Unknown peer or webhook not configured' } });
@@ -307,6 +415,7 @@ export class BroadcastService extends BaseService {
this.#incomingLastNonceByPeer.set(peerId, nonce);
await this.#publishWebhookEventsToRedis(incomingEvents);
this.#emitIncomingEventsSequentially(incomingEvents);
res.status(200).send({ ok: true });
@@ -315,8 +424,10 @@ export class BroadcastService extends BaseService {
async #sendWebhookToPeer (peer_config, events) {
const peerId = peer_config.key;
const url = peer_config.webhook_url;
const requestUrl = this.#normalizeWebhookUrl(url);
const mySecretKey = this.config.webhook?.secret ?? '';
if ( !url || !mySecretKey ) return;
if ( !requestUrl || !mySecretKey ) return;
let nextNonce = this.#outgoingNonceByPeer.get(peerId) ?? 0;
this.#outgoingNonceByPeer.set(peerId, nextNonce + 1);
@@ -329,23 +440,55 @@ export class BroadcastService extends BaseService {
const signature = createHmac('sha256', mySecretKey).update(payloadToSign).digest('hex');
const myPublicKey = this.config.webhook?.key ?? '';
const response = await fetch(url, {
const headers = {
'Content-Type': 'application/json',
'Content-Length': String(Buffer.byteLength(rawBody)),
'X-Broadcast-Peer-Id': myPublicKey,
'X-Broadcast-Timestamp': String(timestamp),
'X-Broadcast-Nonce': String(nextNonce),
'X-Broadcast-Signature': signature,
...(this.#webhookHostHeader ? { Host: this.#webhookHostHeader } : {}),
};
const response = await axios.request({
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Broadcast-Peer-Id': myPublicKey,
'X-Broadcast-Timestamp': String(timestamp),
'X-Broadcast-Nonce': String(nextNonce),
'X-Broadcast-Signature': signature,
},
body: rawBody,
url: requestUrl,
headers,
data: rawBody,
timeout: 15000,
validateStatus: () => true,
responseType: 'text',
transformResponse: value => value,
...(requestUrl.startsWith('https:')
? { httpsAgent: this.#webhookHttpsAgent }
: {}),
});
if ( ! response.ok ) {
if ( response.status < 200 || response.status >= 300 ) {
console.warn(`error with body: ${response.data}`);
throw new Error(`Webhook POST failed: ${response.status} ${response.statusText}`);
}
}
#normalizeWebhookUrl (url) {
if ( typeof url !== 'string' || url.trim() === '' ) {
return null;
}
const urlValue = url.trim();
let parsedUrl;
try {
parsedUrl = urlValue.includes('://')
? new URL(urlValue)
: new URL(`${this.#webhookProtocol}://${urlValue}`);
} catch {
return null;
}
parsedUrl.protocol = `${this.#webhookProtocol}:`;
return parsedUrl.toString();
}
async '__on_install.websockets' () {
const svc_webServer = this.services.get('web-server');
@@ -0,0 +1,91 @@
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { redisClient } from '../../clients/redis/redisSingleton.js';
import { BroadcastService } from './BroadcastService.js';
const wait = (ms = 20) => new Promise(resolve => setTimeout(resolve, ms));
describe('BroadcastService redis pubsub', () => {
let eventService;
let service;
beforeAll(async () => {
eventService = {
on: vi.fn(),
emit: vi.fn(async () => {
}),
};
service = new BroadcastService({
services: {
get: (name) => {
if ( name === 'event' ) return eventService;
throw new Error(`unexpected service lookup: ${name}`);
},
},
config: {
domain: 'puter.com',
protocol: 'https',
server_id: 'test-broadcast-a',
services: {
broadcast: {
peers: [],
},
},
},
name: 'broadcast',
args: {},
context: {
get: () => ({ use: () => ({}) }),
},
});
await service._init();
});
afterAll(async () => {
});
beforeEach(() => {
eventService.emit.mockClear();
});
it('re-emits only outer.gui/pub events from redis pubsub payloads', async () => {
await redisClient.publish('broadcast.webhook.events', JSON.stringify({
sourceId: 'other-instance',
events: [
{ key: 'outer.gui.notif.message', data: { id: 'gui-1' }, meta: {} },
{ key: 'outer.pub.notice', data: { id: 'pub-1' }, meta: {} },
{ key: 'outer.cacheUpdate', data: { cacheKey: 'skip-me' }, meta: {} },
],
}));
await wait();
expect(eventService.emit).toHaveBeenCalledTimes(2);
expect(eventService.emit).toHaveBeenNthCalledWith(
1,
'outer.gui.notif.message',
{ id: 'gui-1' },
expect.objectContaining({ from_outside: true }),
);
expect(eventService.emit).toHaveBeenNthCalledWith(
2,
'outer.pub.notice',
{ id: 'pub-1' },
expect.objectContaining({ from_outside: true }),
);
});
it('ignores malformed redis pubsub payloads', async () => {
await redisClient.publish('broadcast.webhook.events', 'not-json');
await wait();
await redisClient.publish('broadcast.webhook.events', JSON.stringify({
sourceId: 'other-instance',
events: [{ bad: 'shape' }],
}));
await wait();
expect(eventService.emit).not.toHaveBeenCalled();
});
});