diff --git a/src/backend/src/modules/broadcast/BroadcastService.js b/src/backend/src/modules/broadcast/BroadcastService.js index 0ae3dc853..44f9ec42e 100644 --- a/src/backend/src/modules/broadcast/BroadcastService.js +++ b/src/backend/src/modules/broadcast/BroadcastService.js @@ -20,6 +20,8 @@ const BaseService = require('../../services/BaseService'); const { CLink } = require('./connection/CLink'); const { SLink } = require('./connection/SLink'); const { Context } = require('../../util/context'); +const { Endpoint } = require('../../util/expressutil'); +const crypto = require('crypto'); class BroadcastService extends BaseService { static MODULES = { @@ -31,21 +33,38 @@ class BroadcastService extends BaseService { this.peers_ = []; this.connections_ = []; this.trustedPublicKeys_ = {}; + this.peersByKey_ = {}; + this.webhookPeers_ = []; + this.incomingLastNonceByPeer_ = new Map(); } async _init () { const peers = this.config.peers ?? []; + const replayWindowSeconds = this.config.webhook_replay_window_seconds ?? 300; + for ( const peer_config of peers ) { this.trustedPublicKeys_[peer_config.key] = true; - const peer = new CLink({ - keys: this.config.keys, - config: peer_config, - log: this.log, - }); - this.peers_.push(peer); - peer.connect(); + this.peersByKey_[peer_config.key] = { + webhook_secret: peer_config.webhook_secret, + webhook_url: peer_config.webhook_url, + webhook: !!peer_config.webhook, + }; + + if ( peer_config.webhook ) { + this.webhookPeers_.push(peer_config); + } else { + const peer = new CLink({ + keys: this.config.keys, + config: peer_config, + log: this.log, + }); + this.peers_.push(peer); + peer.connect(); + } } + this.webhookReplayWindowSeconds_ = replayWindowSeconds; + this._register_commands(this.services.get('commands')); const svc_event = this.services.get('event'); @@ -64,6 +83,122 @@ class BroadcastService extends BaseService { } } + async ['__on_install.routes'] (_, { app }) { + const svc_web = this.services.get('web-server'); + svc_web.allow_undefined_origin('/broadcast/webhook'); + + Endpoint({ + route: '/broadcast/webhook', + methods: ['POST'], + handler: this.handleWebhookRequest_.bind(this), + }).attach(app); + } + + handleWebhookRequest_ (req, res) { + const rawBody = req.rawBody; + if ( rawBody === undefined || rawBody === null ) { + res.status(400).send({ error: { message: 'Missing or invalid body' } }); + return; + } + + const body = req.body; + if ( !body || typeof body !== 'object' ) { + res.status(400).send({ error: { message: 'Invalid JSON body' } }); + return; + } + + // Validate required properties + const { key, data, meta } = body; + if ( key === undefined || key === null ) { + res.status(400).send({ error: { message: 'Missing key' } }); + return; + } + if ( data === undefined ) { + res.status(400).send({ error: { message: 'Missing data' } }); + return; + } + if ( meta === undefined ) { + res.status(400).send({ error: { message: 'Missing meta' } }); + return; + } + + const peerId = req.headers['x-broadcast-peer-id']; + if ( ! peerId ) { + res.status(403).send({ error: { message: 'Missing X-Broadcast-Peer-Id' } }); + return; + } + + const peer = this.peersByKey_[peerId]; + if ( !peer || !peer.webhook_secret ) { + res.status(403).send({ error: { message: 'Unknown peer or webhook not configured' } }); + return; + } + + // Timestamp avoids nonce-reuse after a restart + const timestampHeader = req.headers['x-broadcast-timestamp']; + if ( ! timestampHeader ) { + res.status(400).send({ error: { message: 'Missing X-Broadcast-Timestamp' } }); + return; + } + const timestamp = Number(timestampHeader); + if ( Number.isNaN(timestamp) ) { + res.status(400).send({ error: { message: 'Invalid X-Broadcast-Timestamp' } }); + return; + } + const nowSeconds = Math.floor(Date.now() / 1000); + const window = this.webhookReplayWindowSeconds_; + if ( timestamp < nowSeconds - window || timestamp > nowSeconds + 60 ) { + res.status(400).send({ error: { message: 'Timestamp out of window' } }); + return; + } + + // Nonce avoids replay attacks + const nonceHeader = req.headers['x-broadcast-nonce']; + if ( nonceHeader === undefined || nonceHeader === null || nonceHeader === '' ) { + res.status(400).send({ error: { message: 'Missing X-Broadcast-Nonce' } }); + return; + } + const nonce = Number(nonceHeader); + if ( Number.isNaN(nonce) ) { + res.status(400).send({ error: { message: 'Invalid X-Broadcast-Nonce' } }); + return; + } + const lastNonce = this.incomingLastNonceByPeer_.get(peerId) ?? -1; + if ( nonce <= lastNonce ) { + res.status(403).send({ error: { message: 'Duplicate or stale nonce' } }); + return; + } + + // We verify a signature to ensure the message came from an authorized peer + const signatureHeader = req.headers['x-broadcast-signature']; + if ( ! signatureHeader ) { + res.status(403).send({ error: { message: 'Missing X-Broadcast-Signature' } }); + return; + } + + const payloadToSign = `${timestamp}.${nonce}.${rawBody}`; + const expectedHmac = crypto.createHmac('sha256', peer.webhook_secret).update(payloadToSign).digest('hex'); + const signatureBuffer = Buffer.from(signatureHeader, 'hex'); + const expectedBuffer = Buffer.from(expectedHmac, 'hex'); + if ( signatureBuffer.length !== expectedBuffer.length || !crypto.timingSafeEqual(signatureBuffer, expectedBuffer) ) { + res.status(403).send({ error: { message: 'Invalid signature' } }); + return; + } + + this.incomingLastNonceByPeer_.set(peerId, nonce); + + // We emit the event sent to this webhook endpoint so other services + // can react to it. We set the `from_outside` flag to avoid feedback. + const svc_event = this.services.get('event'); + const metaOut = { ...meta, from_outside: true }; + const context = Context.get(undefined, { allow_fallback: true }); + context.arun(async () => { + await svc_event.emit(key, data, metaOut); + }); + + res.status(200).send({ ok: true }); + } + async ['__on_install.websockets'] () { const svc_event = this.services.get('event'); const svc_webServer = this.services.get('web-server');