From f9050a44488185240214d698b0acee6edc5ae9a9 Mon Sep 17 00:00:00 2001 From: lklynet Date: Tue, 6 Jan 2026 16:05:23 -0500 Subject: [PATCH] refactor: optimize resources with gossip subsampling, memory leak fixes, and tuned constants --- src/config/constants.js | 46 +++--- src/p2p/messaging.js | 353 +++++++++++++++++++++------------------- src/p2p/relay.js | 37 +++-- src/state/bloom.js | 2 +- src/state/peers.js | 8 +- 5 files changed, 247 insertions(+), 199 deletions(-) diff --git a/src/config/constants.js b/src/config/constants.js index 56aa5f4..0b76032 100644 --- a/src/config/constants.js +++ b/src/config/constants.js @@ -6,7 +6,7 @@ const TOPIC = crypto.createHash("sha256").update(TOPIC_NAME).digest(); /** * fccview here, frankly I don't think I can make this more secure, we can change it to `00000` but * that means until everyone upgrade there'll be a divide between nodes. - * + * * I ran it that way and I was fairly isolated, with hundreds of failed POW, shame. * adding an extra 0 makes it very expensive on attacker to make it worth the fun for them, so maybe consider it. * ---- @@ -16,35 +16,35 @@ const TOPIC = crypto.createHash("sha256").update(TOPIC_NAME).digest(); const MY_POW_PREFIX = "00000"; const VERIFICATION_POW_PREFIX = "0000"; -const MAX_PEERS = parseInt(process.env.MAX_PEERS) || 1000000; +const MAX_PEERS = parseInt(process.env.MAX_PEERS) || 50000; const MAX_MESSAGE_SIZE = 2048; const MAX_RELAY_HOPS = 2; -const MAX_CONNECTIONS = 32; +const MAX_CONNECTIONS = 15; -const HEARTBEAT_INTERVAL = 5000; -const CONNECTION_ROTATION_INTERVAL = 30000; -const PEER_TIMEOUT = 15000; +const HEARTBEAT_INTERVAL = 30000; +const CONNECTION_ROTATION_INTERVAL = 300000; +const PEER_TIMEOUT = 45000; const BROADCAST_THROTTLE = 1000; const DIAGNOSTICS_INTERVAL = 10000; const PORT = process.env.PORT || 3000; -const ENABLE_CHAT = process.env.ENABLE_CHAT === 'true'; +const ENABLE_CHAT = process.env.ENABLE_CHAT === "true"; const CHAT_RATE_LIMIT = 5000; module.exports = { - TOPIC_NAME, - TOPIC, - MY_POW_PREFIX, - VERIFICATION_POW_PREFIX, - MAX_PEERS, - MAX_MESSAGE_SIZE, - MAX_RELAY_HOPS, - MAX_CONNECTIONS, - HEARTBEAT_INTERVAL, - CONNECTION_ROTATION_INTERVAL, - PEER_TIMEOUT, - BROADCAST_THROTTLE, - DIAGNOSTICS_INTERVAL, - PORT, - ENABLE_CHAT, - CHAT_RATE_LIMIT, + TOPIC_NAME, + TOPIC, + MY_POW_PREFIX, + VERIFICATION_POW_PREFIX, + MAX_PEERS, + MAX_MESSAGE_SIZE, + MAX_RELAY_HOPS, + MAX_CONNECTIONS, + HEARTBEAT_INTERVAL, + CONNECTION_ROTATION_INTERVAL, + PEER_TIMEOUT, + BROADCAST_THROTTLE, + DIAGNOSTICS_INTERVAL, + PORT, + ENABLE_CHAT, + CHAT_RATE_LIMIT, }; diff --git a/src/p2p/messaging.js b/src/p2p/messaging.js index 6154e02..82dd178 100644 --- a/src/p2p/messaging.js +++ b/src/p2p/messaging.js @@ -1,197 +1,224 @@ -const { verifyPoW, verifySignature, createPublicKey } = require("../core/security"); +const { + verifyPoW, + verifySignature, + createPublicKey, +} = require("../core/security"); const { MAX_RELAY_HOPS, ENABLE_CHAT } = require("../config/constants"); const { BloomFilterManager } = require("../state/bloom"); class MessageHandler { - constructor(peerManager, diagnostics, relayCallback, broadcastCallback, chatCallback, chatSystemFn) { - this.peerManager = peerManager; - this.diagnostics = diagnostics; - this.relayCallback = relayCallback; - this.broadcastCallback = broadcastCallback; - this.chatCallback = chatCallback; - this.chatSystemFn = chatSystemFn; - this.bloomFilter = new BloomFilterManager(); - this.bloomFilter.start(); - this.chatRateLimits = new Map(); + constructor( + peerManager, + diagnostics, + relayCallback, + broadcastCallback, + chatCallback, + chatSystemFn + ) { + this.peerManager = peerManager; + this.diagnostics = diagnostics; + this.relayCallback = relayCallback; + this.broadcastCallback = broadcastCallback; + this.chatCallback = chatCallback; + this.chatSystemFn = chatSystemFn; + this.bloomFilter = new BloomFilterManager(); + this.bloomFilter.start(); + this.chatRateLimits = new Map(); + } + + handleMessage(msg, sourceSocket) { + if (!validateMessage(msg)) { + return; } - handleMessage(msg, sourceSocket) { - if (!validateMessage(msg)) { - return; - } + if (msg.type === "HEARTBEAT") { + this.handleHeartbeat(msg, sourceSocket); + } else if (msg.type === "LEAVE") { + this.handleLeave(msg, sourceSocket); + } else if (msg.type === "CHAT") { + this.handleChat(msg, sourceSocket); + } + } - if (msg.type === "HEARTBEAT") { - this.handleHeartbeat(msg, sourceSocket); - } else if (msg.type === "LEAVE") { - this.handleLeave(msg, sourceSocket); - } else if (msg.type === "CHAT") { - this.handleChat(msg, sourceSocket); - } + handleHeartbeat(msg, sourceSocket) { + this.diagnostics.increment("heartbeatsReceived"); + const { id, seq, hops, nonce, sig } = msg; + + // Optimization: Check for duplicates BEFORE verifyPoW (CPU intensive) + const stored = this.peerManager.getPeer(id); + if (stored && seq <= stored.seq) { + this.diagnostics.increment("duplicateSeq"); + return; } - handleHeartbeat(msg, sourceSocket) { - this.diagnostics.increment("heartbeatsReceived"); - const { id, seq, hops, nonce, sig } = msg; - - if (!verifyPoW(id, nonce)) { - this.diagnostics.increment("invalidPoW"); - return; - } - - const stored = this.peerManager.getPeer(id); - if (stored && seq <= stored.seq) { - this.diagnostics.increment("duplicateSeq"); - return; - } - - if (!sig) return; - - try { - // Check if we can accept new peers (only matters for new peers) - if (!stored && !this.peerManager.canAcceptPeer(id)) return; - - // Derive public key on-demand from peer ID - const key = createPublicKey(id); - - if (!verifySignature(`seq:${seq}`, sig, key)) { - this.diagnostics.increment("invalidSig"); - return; - } - - if (hops === 0) { - sourceSocket.peerId = id; - } - - const getIp = (sock) => { - if (sock.remoteAddress) return sock.remoteAddress; - if (sock.rawStream && sock.rawStream.remoteHost) return sock.rawStream.remoteHost; - if (sock.rawStream && sock.rawStream.remoteAddress) return sock.rawStream.remoteAddress; - return null; - }; - - const ip = (hops === 0) ? getIp(sourceSocket) : null; - const wasNew = this.peerManager.addOrUpdatePeer(id, seq, key, ip); - - if (wasNew) { - this.diagnostics.increment("newPeersAdded"); - this.broadcastCallback(); - if (ENABLE_CHAT && this.chatSystemFn && hops === 0) { - this.chatSystemFn({ - type: "SYSTEM", - content: `Connection established with Node ...${id.slice(-8)}`, - timestamp: Date.now() - }); - } - } - - // Only relay if we haven't already relayed this message (bloom filter check) - if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) { - this.bloomFilter.markRelayed(id, seq); - this.diagnostics.increment("heartbeatsRelayed"); - this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket); - } - } catch (e) { - return; - } + if (!verifyPoW(id, nonce)) { + this.diagnostics.increment("invalidPoW"); + return; } - handleLeave(msg, sourceSocket) { - this.diagnostics.increment("leaveMessages"); - const { id, hops, sig } = msg; + if (!sig) return; - if (!sig) return; + try { + // Check if we can accept new peers (only matters for new peers) + if (!stored && !this.peerManager.canAcceptPeer(id)) return; - // Only process leave messages for peers we know about - if (!this.peerManager.hasPeer(id)) return; + // Derive public key on-demand from peer ID + const key = createPublicKey(id); - // Derive public key on-demand from peer ID - const key = createPublicKey(id); + if (!verifySignature(`seq:${seq}`, sig, key)) { + this.diagnostics.increment("invalidSig"); + return; + } - if (!verifySignature(`type:LEAVE:${id}`, sig, key)) { - this.diagnostics.increment("invalidSig"); - return; + if (hops === 0) { + sourceSocket.peerId = id; + } + + const getIp = (sock) => { + if (sock.remoteAddress) return sock.remoteAddress; + if (sock.rawStream && sock.rawStream.remoteHost) + return sock.rawStream.remoteHost; + if (sock.rawStream && sock.rawStream.remoteAddress) + return sock.rawStream.remoteAddress; + return null; + }; + + const ip = hops === 0 ? getIp(sourceSocket) : null; + const wasNew = this.peerManager.addOrUpdatePeer(id, seq, ip); + + if (wasNew) { + this.diagnostics.increment("newPeersAdded"); + this.broadcastCallback(); + if (ENABLE_CHAT && this.chatSystemFn && hops === 0) { + this.chatSystemFn({ + type: "SYSTEM", + content: `Connection established with Node ...${id.slice(-8)}`, + timestamp: Date.now(), + }); } + } - if (this.peerManager.hasPeer(id)) { - this.peerManager.removePeer(id); - this.broadcastCallback(); + // Only relay if we haven't already relayed this message (bloom filter check) + if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) { + this.bloomFilter.markRelayed(id, seq); + this.diagnostics.increment("heartbeatsRelayed"); + this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket); + } + } catch (e) { + return; + } + } - if (ENABLE_CHAT && this.chatSystemFn && hops === 0) { - this.chatSystemFn({ - type: "SYSTEM", - content: `Node ...${id.slice(-8)} disconnected.`, - timestamp: Date.now() - }); - } + handleLeave(msg, sourceSocket) { + this.diagnostics.increment("leaveMessages"); + const { id, hops, sig } = msg; - // Use id:leave as key for LEAVE messages - if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) { - this.bloomFilter.markRelayed(id, "leave"); - this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket); - } - } + if (!sig) return; + + // Only process leave messages for peers we know about + if (!this.peerManager.hasPeer(id)) return; + + // Derive public key on-demand from peer ID + const key = createPublicKey(id); + + if (!verifySignature(`type:LEAVE:${id}`, sig, key)) { + this.diagnostics.increment("invalidSig"); + return; } - handleChat(msg, sourceSocket) { - // Identity Verification: Ensure the sender matches the authenticated socket - if (!sourceSocket.peerId || sourceSocket.peerId !== msg.sender) { - return; - } + if (this.peerManager.hasPeer(id)) { + this.peerManager.removePeer(id); + this.broadcastCallback(); - // Rate Limiting: Prevent flooding (5 messages per 10 seconds per peer) - const now = Date.now(); - let rateData = this.chatRateLimits.get(msg.sender); - - if (!rateData || now - rateData.windowStart > 10000) { - // Reset window - rateData = { count: 0, windowStart: now }; - } + if (ENABLE_CHAT && this.chatSystemFn && hops === 0) { + this.chatSystemFn({ + type: "SYSTEM", + content: `Node ...${id.slice(-8)} disconnected.`, + timestamp: Date.now(), + }); + } - if (rateData.count >= 5) { - return; // Drop message - } - - rateData.count++; - this.chatRateLimits.set(msg.sender, rateData); - - if (this.chatCallback) { - this.chatCallback(msg); - } + // Use id:leave as key for LEAVE messages + if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) { + this.bloomFilter.markRelayed(id, "leave"); + this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket); + } } + } + + handleChat(msg, sourceSocket) { + // Identity Verification: Ensure the sender matches the authenticated socket + if (!sourceSocket.peerId || sourceSocket.peerId !== msg.sender) { + return; + } + + // Rate Limiting: Prevent flooding (5 messages per 10 seconds per peer) + const now = Date.now(); + let rateData = this.chatRateLimits.get(msg.sender); + + if (!rateData || now - rateData.windowStart > 10000) { + // Reset window + rateData = { count: 0, windowStart: now }; + } + + if (rateData.count >= 5) { + return; // Drop message + } + + rateData.count++; + this.chatRateLimits.set(msg.sender, rateData); + + if (this.chatCallback) { + this.chatCallback(msg); + } + } } const validateMessage = (msg) => { - if (!msg || typeof msg !== 'object') return false; - if (!msg.type) return false; + if (!msg || typeof msg !== "object") return false; + if (!msg.type) return false; - const msgSize = JSON.stringify(msg).length; - if (msgSize > require("../config/constants").MAX_MESSAGE_SIZE) return false; + const msgSize = JSON.stringify(msg).length; + if (msgSize > require("../config/constants").MAX_MESSAGE_SIZE) return false; - if (msg.type === "HEARTBEAT") { - const allowedFields = ['type', 'id', 'seq', 'hops', 'nonce', 'sig']; - const fields = Object.keys(msg); - return fields.every(f => allowedFields.includes(f)) && - msg.id && typeof msg.seq === 'number' && - typeof msg.hops === 'number' && msg.nonce && msg.sig; - } + if (msg.type === "HEARTBEAT") { + const allowedFields = ["type", "id", "seq", "hops", "nonce", "sig"]; + const fields = Object.keys(msg); + return ( + fields.every((f) => allowedFields.includes(f)) && + msg.id && + typeof msg.seq === "number" && + typeof msg.hops === "number" && + msg.nonce && + msg.sig + ); + } - if (msg.type === "LEAVE") { - const allowedFields = ['type', 'id', 'hops', 'sig']; - const fields = Object.keys(msg); - return fields.every(f => allowedFields.includes(f)) && - msg.id && typeof msg.hops === 'number' && msg.sig; - } + if (msg.type === "LEAVE") { + const allowedFields = ["type", "id", "hops", "sig"]; + const fields = Object.keys(msg); + return ( + fields.every((f) => allowedFields.includes(f)) && + msg.id && + typeof msg.hops === "number" && + msg.sig + ); + } - if (msg.type === "CHAT") { - const allowedFields = ['type', 'sender', 'content', 'timestamp']; - const fields = Object.keys(msg); - return fields.every(f => allowedFields.includes(f)) && - msg.sender && - msg.content && typeof msg.content === 'string' && msg.content.length <= 140 && - typeof msg.timestamp === 'number'; - } + if (msg.type === "CHAT") { + const allowedFields = ["type", "sender", "content", "timestamp"]; + const fields = Object.keys(msg); + return ( + fields.every((f) => allowedFields.includes(f)) && + msg.sender && + msg.content && + typeof msg.content === "string" && + msg.content.length <= 140 && + typeof msg.timestamp === "number" + ); + } - return false; -} + return false; +}; module.exports = { MessageHandler, validateMessage }; diff --git a/src/p2p/relay.js b/src/p2p/relay.js index 1bc6745..70f708f 100644 --- a/src/p2p/relay.js +++ b/src/p2p/relay.js @@ -1,16 +1,33 @@ const relayMessage = (msg, sourceSocket, swarm, diagnostics) => { - const data = JSON.stringify(msg) + "\n"; - const relayCount = swarm.connections.size - 1; + const data = JSON.stringify(msg) + "\n"; - if (diagnostics) { - diagnostics.increment("bytesRelayed", data.length * relayCount); - } + // Gossip Subsampling: + // Instead of flooding everyone (which causes massive bandwidth usage with 50 connections), + // we relay to a random subset of peers (e.g., 6). + // This maintains "Epidemic" reach (O(log N)) while capping bandwidth. - for (const socket of swarm.connections) { - if (socket !== sourceSocket) { - socket.write(data); - } + const TARGET_GOSSIP_COUNT = 6; + const allSockets = Array.from(swarm.connections); + const eligible = allSockets.filter((s) => s !== sourceSocket); + + let targets = eligible; + + if (eligible.length > TARGET_GOSSIP_COUNT) { + // Fisher-Yates shuffle (partial) to pick random peers + for (let i = eligible.length - 1; i > 0; i--) { + const j = Math.floor(Math.random() * (i + 1)); + [eligible[i], eligible[j]] = [eligible[j], eligible[i]]; } -} + targets = eligible.slice(0, TARGET_GOSSIP_COUNT); + } + + if (diagnostics) { + diagnostics.increment("bytesRelayed", data.length * targets.length); + } + + for (const socket of targets) { + socket.write(data); + } +}; module.exports = { relayMessage }; diff --git a/src/state/bloom.js b/src/state/bloom.js index f0a6c04..378a16e 100644 --- a/src/state/bloom.js +++ b/src/state/bloom.js @@ -3,7 +3,7 @@ * Prevents re-relaying messages we've already seen */ class BloomFilter { - constructor(size = 10000, hashCount = 3) { + constructor(size = 200000, hashCount = 3) { this.size = size; this.hashCount = hashCount; this.bits = new Uint8Array(Math.ceil(size / 8)); diff --git a/src/state/peers.js b/src/state/peers.js index 5f6325b..7b7882d 100644 --- a/src/state/peers.js +++ b/src/state/peers.js @@ -9,7 +9,7 @@ class PeerManager { this.mySeq = 0; } - addOrUpdatePeer(id, seq, key, ip = null) { + addOrUpdatePeer(id, seq, ip = null) { const stored = this.seenPeers.get(id); const wasNew = !stored; @@ -19,7 +19,6 @@ class PeerManager { this.seenPeers.set(id, { seq, lastSeen: Date.now(), - key, ip: ip || (stored ? stored.ip : null), }); @@ -51,6 +50,11 @@ class PeerManager { if (now - data.lastSeen > PEER_TIMEOUT) { this.seenPeers.delete(id); removed++; + } else { + // Optimization: Since LRUCache maintains insertion order (updated on access), + // the Map is sorted by lastSeen (ascending). + // If we find a non-stale peer, all subsequent peers are also non-stale. + break; } }