mirror of
https://github.com/HeyPuter/puter.git
synced 2026-05-04 00:20:45 +00:00
dev(backend): add broadcast webhook endpoint
This commit adds the "receive" side of webhook support for BroadcastService, which will eventually make broadcasting stateless and not requiring of persistent connections. Some specific considerations taken into account include: - incremental nonce to prevent replay attacks - request timestamp to prevent nonce-reuse after restarting - HMAC signature to ensure authorized peer Known limitations: - if instances run indefinitely, eventually the nonce value would wrap around to zero and broadcasts would stop working. It is assumed that 9 quintillion requests in the lifetime of an instance is reasonably impossible.
This commit is contained in:
@@ -20,6 +20,8 @@ const BaseService = require('../../services/BaseService');
|
||||
const { CLink } = require('./connection/CLink');
|
||||
const { SLink } = require('./connection/SLink');
|
||||
const { Context } = require('../../util/context');
|
||||
const { Endpoint } = require('../../util/expressutil');
|
||||
const crypto = require('crypto');
|
||||
|
||||
class BroadcastService extends BaseService {
|
||||
static MODULES = {
|
||||
@@ -31,21 +33,38 @@ class BroadcastService extends BaseService {
|
||||
this.peers_ = [];
|
||||
this.connections_ = [];
|
||||
this.trustedPublicKeys_ = {};
|
||||
this.peersByKey_ = {};
|
||||
this.webhookPeers_ = [];
|
||||
this.incomingLastNonceByPeer_ = new Map();
|
||||
}
|
||||
|
||||
async _init () {
|
||||
const peers = this.config.peers ?? [];
|
||||
const replayWindowSeconds = this.config.webhook_replay_window_seconds ?? 300;
|
||||
|
||||
for ( const peer_config of peers ) {
|
||||
this.trustedPublicKeys_[peer_config.key] = true;
|
||||
const peer = new CLink({
|
||||
keys: this.config.keys,
|
||||
config: peer_config,
|
||||
log: this.log,
|
||||
});
|
||||
this.peers_.push(peer);
|
||||
peer.connect();
|
||||
this.peersByKey_[peer_config.key] = {
|
||||
webhook_secret: peer_config.webhook_secret,
|
||||
webhook_url: peer_config.webhook_url,
|
||||
webhook: !!peer_config.webhook,
|
||||
};
|
||||
|
||||
if ( peer_config.webhook ) {
|
||||
this.webhookPeers_.push(peer_config);
|
||||
} else {
|
||||
const peer = new CLink({
|
||||
keys: this.config.keys,
|
||||
config: peer_config,
|
||||
log: this.log,
|
||||
});
|
||||
this.peers_.push(peer);
|
||||
peer.connect();
|
||||
}
|
||||
}
|
||||
|
||||
this.webhookReplayWindowSeconds_ = replayWindowSeconds;
|
||||
|
||||
this._register_commands(this.services.get('commands'));
|
||||
|
||||
const svc_event = this.services.get('event');
|
||||
@@ -64,6 +83,122 @@ class BroadcastService extends BaseService {
|
||||
}
|
||||
}
|
||||
|
||||
async ['__on_install.routes'] (_, { app }) {
|
||||
const svc_web = this.services.get('web-server');
|
||||
svc_web.allow_undefined_origin('/broadcast/webhook');
|
||||
|
||||
Endpoint({
|
||||
route: '/broadcast/webhook',
|
||||
methods: ['POST'],
|
||||
handler: this.handleWebhookRequest_.bind(this),
|
||||
}).attach(app);
|
||||
}
|
||||
|
||||
handleWebhookRequest_ (req, res) {
|
||||
const rawBody = req.rawBody;
|
||||
if ( rawBody === undefined || rawBody === null ) {
|
||||
res.status(400).send({ error: { message: 'Missing or invalid body' } });
|
||||
return;
|
||||
}
|
||||
|
||||
const body = req.body;
|
||||
if ( !body || typeof body !== 'object' ) {
|
||||
res.status(400).send({ error: { message: 'Invalid JSON body' } });
|
||||
return;
|
||||
}
|
||||
|
||||
// Validate required properties
|
||||
const { key, data, meta } = body;
|
||||
if ( key === undefined || key === null ) {
|
||||
res.status(400).send({ error: { message: 'Missing key' } });
|
||||
return;
|
||||
}
|
||||
if ( data === undefined ) {
|
||||
res.status(400).send({ error: { message: 'Missing data' } });
|
||||
return;
|
||||
}
|
||||
if ( meta === undefined ) {
|
||||
res.status(400).send({ error: { message: 'Missing meta' } });
|
||||
return;
|
||||
}
|
||||
|
||||
const peerId = req.headers['x-broadcast-peer-id'];
|
||||
if ( ! peerId ) {
|
||||
res.status(403).send({ error: { message: 'Missing X-Broadcast-Peer-Id' } });
|
||||
return;
|
||||
}
|
||||
|
||||
const peer = this.peersByKey_[peerId];
|
||||
if ( !peer || !peer.webhook_secret ) {
|
||||
res.status(403).send({ error: { message: 'Unknown peer or webhook not configured' } });
|
||||
return;
|
||||
}
|
||||
|
||||
// Timestamp avoids nonce-reuse after a restart
|
||||
const timestampHeader = req.headers['x-broadcast-timestamp'];
|
||||
if ( ! timestampHeader ) {
|
||||
res.status(400).send({ error: { message: 'Missing X-Broadcast-Timestamp' } });
|
||||
return;
|
||||
}
|
||||
const timestamp = Number(timestampHeader);
|
||||
if ( Number.isNaN(timestamp) ) {
|
||||
res.status(400).send({ error: { message: 'Invalid X-Broadcast-Timestamp' } });
|
||||
return;
|
||||
}
|
||||
const nowSeconds = Math.floor(Date.now() / 1000);
|
||||
const window = this.webhookReplayWindowSeconds_;
|
||||
if ( timestamp < nowSeconds - window || timestamp > nowSeconds + 60 ) {
|
||||
res.status(400).send({ error: { message: 'Timestamp out of window' } });
|
||||
return;
|
||||
}
|
||||
|
||||
// Nonce avoids replay attacks
|
||||
const nonceHeader = req.headers['x-broadcast-nonce'];
|
||||
if ( nonceHeader === undefined || nonceHeader === null || nonceHeader === '' ) {
|
||||
res.status(400).send({ error: { message: 'Missing X-Broadcast-Nonce' } });
|
||||
return;
|
||||
}
|
||||
const nonce = Number(nonceHeader);
|
||||
if ( Number.isNaN(nonce) ) {
|
||||
res.status(400).send({ error: { message: 'Invalid X-Broadcast-Nonce' } });
|
||||
return;
|
||||
}
|
||||
const lastNonce = this.incomingLastNonceByPeer_.get(peerId) ?? -1;
|
||||
if ( nonce <= lastNonce ) {
|
||||
res.status(403).send({ error: { message: 'Duplicate or stale nonce' } });
|
||||
return;
|
||||
}
|
||||
|
||||
// We verify a signature to ensure the message came from an authorized peer
|
||||
const signatureHeader = req.headers['x-broadcast-signature'];
|
||||
if ( ! signatureHeader ) {
|
||||
res.status(403).send({ error: { message: 'Missing X-Broadcast-Signature' } });
|
||||
return;
|
||||
}
|
||||
|
||||
const payloadToSign = `${timestamp}.${nonce}.${rawBody}`;
|
||||
const expectedHmac = crypto.createHmac('sha256', peer.webhook_secret).update(payloadToSign).digest('hex');
|
||||
const signatureBuffer = Buffer.from(signatureHeader, 'hex');
|
||||
const expectedBuffer = Buffer.from(expectedHmac, 'hex');
|
||||
if ( signatureBuffer.length !== expectedBuffer.length || !crypto.timingSafeEqual(signatureBuffer, expectedBuffer) ) {
|
||||
res.status(403).send({ error: { message: 'Invalid signature' } });
|
||||
return;
|
||||
}
|
||||
|
||||
this.incomingLastNonceByPeer_.set(peerId, nonce);
|
||||
|
||||
// We emit the event sent to this webhook endpoint so other services
|
||||
// can react to it. We set the `from_outside` flag to avoid feedback.
|
||||
const svc_event = this.services.get('event');
|
||||
const metaOut = { ...meta, from_outside: true };
|
||||
const context = Context.get(undefined, { allow_fallback: true });
|
||||
context.arun(async () => {
|
||||
await svc_event.emit(key, data, metaOut);
|
||||
});
|
||||
|
||||
res.status(200).send({ ok: true });
|
||||
}
|
||||
|
||||
async ['__on_install.websockets'] () {
|
||||
const svc_event = this.services.get('event');
|
||||
const svc_webServer = this.services.get('web-server');
|
||||
|
||||
Reference in New Issue
Block a user