refactor: optimize resources with gossip subsampling, memory leak fixes, and tuned constants

This commit is contained in:
lklynet
2026-01-06 16:05:23 -05:00
parent 32399602de
commit f9050a4448
5 changed files with 247 additions and 199 deletions
+23 -23
View File
@@ -6,7 +6,7 @@ const TOPIC = crypto.createHash("sha256").update(TOPIC_NAME).digest();
/**
* fccview here, frankly I don't think I can make this more secure, we can change it to `00000` but
* that means until everyone upgrade there'll be a divide between nodes.
*
*
* I ran it that way and I was fairly isolated, with hundreds of failed POW, shame.
* adding an extra 0 makes it very expensive on attacker to make it worth the fun for them, so maybe consider it.
* ----
@@ -16,35 +16,35 @@ const TOPIC = crypto.createHash("sha256").update(TOPIC_NAME).digest();
const MY_POW_PREFIX = "00000";
const VERIFICATION_POW_PREFIX = "0000";
const MAX_PEERS = parseInt(process.env.MAX_PEERS) || 1000000;
const MAX_PEERS = parseInt(process.env.MAX_PEERS) || 50000;
const MAX_MESSAGE_SIZE = 2048;
const MAX_RELAY_HOPS = 2;
const MAX_CONNECTIONS = 32;
const MAX_CONNECTIONS = 15;
const HEARTBEAT_INTERVAL = 5000;
const CONNECTION_ROTATION_INTERVAL = 30000;
const PEER_TIMEOUT = 15000;
const HEARTBEAT_INTERVAL = 30000;
const CONNECTION_ROTATION_INTERVAL = 300000;
const PEER_TIMEOUT = 45000;
const BROADCAST_THROTTLE = 1000;
const DIAGNOSTICS_INTERVAL = 10000;
const PORT = process.env.PORT || 3000;
const ENABLE_CHAT = process.env.ENABLE_CHAT === 'true';
const ENABLE_CHAT = process.env.ENABLE_CHAT === "true";
const CHAT_RATE_LIMIT = 5000;
module.exports = {
TOPIC_NAME,
TOPIC,
MY_POW_PREFIX,
VERIFICATION_POW_PREFIX,
MAX_PEERS,
MAX_MESSAGE_SIZE,
MAX_RELAY_HOPS,
MAX_CONNECTIONS,
HEARTBEAT_INTERVAL,
CONNECTION_ROTATION_INTERVAL,
PEER_TIMEOUT,
BROADCAST_THROTTLE,
DIAGNOSTICS_INTERVAL,
PORT,
ENABLE_CHAT,
CHAT_RATE_LIMIT,
TOPIC_NAME,
TOPIC,
MY_POW_PREFIX,
VERIFICATION_POW_PREFIX,
MAX_PEERS,
MAX_MESSAGE_SIZE,
MAX_RELAY_HOPS,
MAX_CONNECTIONS,
HEARTBEAT_INTERVAL,
CONNECTION_ROTATION_INTERVAL,
PEER_TIMEOUT,
BROADCAST_THROTTLE,
DIAGNOSTICS_INTERVAL,
PORT,
ENABLE_CHAT,
CHAT_RATE_LIMIT,
};
+190 -163
View File
@@ -1,197 +1,224 @@
const { verifyPoW, verifySignature, createPublicKey } = require("../core/security");
const {
verifyPoW,
verifySignature,
createPublicKey,
} = require("../core/security");
const { MAX_RELAY_HOPS, ENABLE_CHAT } = require("../config/constants");
const { BloomFilterManager } = require("../state/bloom");
class MessageHandler {
constructor(peerManager, diagnostics, relayCallback, broadcastCallback, chatCallback, chatSystemFn) {
this.peerManager = peerManager;
this.diagnostics = diagnostics;
this.relayCallback = relayCallback;
this.broadcastCallback = broadcastCallback;
this.chatCallback = chatCallback;
this.chatSystemFn = chatSystemFn;
this.bloomFilter = new BloomFilterManager();
this.bloomFilter.start();
this.chatRateLimits = new Map();
constructor(
peerManager,
diagnostics,
relayCallback,
broadcastCallback,
chatCallback,
chatSystemFn
) {
this.peerManager = peerManager;
this.diagnostics = diagnostics;
this.relayCallback = relayCallback;
this.broadcastCallback = broadcastCallback;
this.chatCallback = chatCallback;
this.chatSystemFn = chatSystemFn;
this.bloomFilter = new BloomFilterManager();
this.bloomFilter.start();
this.chatRateLimits = new Map();
}
handleMessage(msg, sourceSocket) {
if (!validateMessage(msg)) {
return;
}
handleMessage(msg, sourceSocket) {
if (!validateMessage(msg)) {
return;
}
if (msg.type === "HEARTBEAT") {
this.handleHeartbeat(msg, sourceSocket);
} else if (msg.type === "LEAVE") {
this.handleLeave(msg, sourceSocket);
} else if (msg.type === "CHAT") {
this.handleChat(msg, sourceSocket);
}
}
if (msg.type === "HEARTBEAT") {
this.handleHeartbeat(msg, sourceSocket);
} else if (msg.type === "LEAVE") {
this.handleLeave(msg, sourceSocket);
} else if (msg.type === "CHAT") {
this.handleChat(msg, sourceSocket);
}
handleHeartbeat(msg, sourceSocket) {
this.diagnostics.increment("heartbeatsReceived");
const { id, seq, hops, nonce, sig } = msg;
// Optimization: Check for duplicates BEFORE verifyPoW (CPU intensive)
const stored = this.peerManager.getPeer(id);
if (stored && seq <= stored.seq) {
this.diagnostics.increment("duplicateSeq");
return;
}
handleHeartbeat(msg, sourceSocket) {
this.diagnostics.increment("heartbeatsReceived");
const { id, seq, hops, nonce, sig } = msg;
if (!verifyPoW(id, nonce)) {
this.diagnostics.increment("invalidPoW");
return;
}
const stored = this.peerManager.getPeer(id);
if (stored && seq <= stored.seq) {
this.diagnostics.increment("duplicateSeq");
return;
}
if (!sig) return;
try {
// Check if we can accept new peers (only matters for new peers)
if (!stored && !this.peerManager.canAcceptPeer(id)) return;
// Derive public key on-demand from peer ID
const key = createPublicKey(id);
if (!verifySignature(`seq:${seq}`, sig, key)) {
this.diagnostics.increment("invalidSig");
return;
}
if (hops === 0) {
sourceSocket.peerId = id;
}
const getIp = (sock) => {
if (sock.remoteAddress) return sock.remoteAddress;
if (sock.rawStream && sock.rawStream.remoteHost) return sock.rawStream.remoteHost;
if (sock.rawStream && sock.rawStream.remoteAddress) return sock.rawStream.remoteAddress;
return null;
};
const ip = (hops === 0) ? getIp(sourceSocket) : null;
const wasNew = this.peerManager.addOrUpdatePeer(id, seq, key, ip);
if (wasNew) {
this.diagnostics.increment("newPeersAdded");
this.broadcastCallback();
if (ENABLE_CHAT && this.chatSystemFn && hops === 0) {
this.chatSystemFn({
type: "SYSTEM",
content: `Connection established with Node ...${id.slice(-8)}`,
timestamp: Date.now()
});
}
}
// Only relay if we haven't already relayed this message (bloom filter check)
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) {
this.bloomFilter.markRelayed(id, seq);
this.diagnostics.increment("heartbeatsRelayed");
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
}
} catch (e) {
return;
}
if (!verifyPoW(id, nonce)) {
this.diagnostics.increment("invalidPoW");
return;
}
handleLeave(msg, sourceSocket) {
this.diagnostics.increment("leaveMessages");
const { id, hops, sig } = msg;
if (!sig) return;
if (!sig) return;
try {
// Check if we can accept new peers (only matters for new peers)
if (!stored && !this.peerManager.canAcceptPeer(id)) return;
// Only process leave messages for peers we know about
if (!this.peerManager.hasPeer(id)) return;
// Derive public key on-demand from peer ID
const key = createPublicKey(id);
// Derive public key on-demand from peer ID
const key = createPublicKey(id);
if (!verifySignature(`seq:${seq}`, sig, key)) {
this.diagnostics.increment("invalidSig");
return;
}
if (!verifySignature(`type:LEAVE:${id}`, sig, key)) {
this.diagnostics.increment("invalidSig");
return;
if (hops === 0) {
sourceSocket.peerId = id;
}
const getIp = (sock) => {
if (sock.remoteAddress) return sock.remoteAddress;
if (sock.rawStream && sock.rawStream.remoteHost)
return sock.rawStream.remoteHost;
if (sock.rawStream && sock.rawStream.remoteAddress)
return sock.rawStream.remoteAddress;
return null;
};
const ip = hops === 0 ? getIp(sourceSocket) : null;
const wasNew = this.peerManager.addOrUpdatePeer(id, seq, ip);
if (wasNew) {
this.diagnostics.increment("newPeersAdded");
this.broadcastCallback();
if (ENABLE_CHAT && this.chatSystemFn && hops === 0) {
this.chatSystemFn({
type: "SYSTEM",
content: `Connection established with Node ...${id.slice(-8)}`,
timestamp: Date.now(),
});
}
}
if (this.peerManager.hasPeer(id)) {
this.peerManager.removePeer(id);
this.broadcastCallback();
// Only relay if we haven't already relayed this message (bloom filter check)
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) {
this.bloomFilter.markRelayed(id, seq);
this.diagnostics.increment("heartbeatsRelayed");
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
}
} catch (e) {
return;
}
}
if (ENABLE_CHAT && this.chatSystemFn && hops === 0) {
this.chatSystemFn({
type: "SYSTEM",
content: `Node ...${id.slice(-8)} disconnected.`,
timestamp: Date.now()
});
}
handleLeave(msg, sourceSocket) {
this.diagnostics.increment("leaveMessages");
const { id, hops, sig } = msg;
// Use id:leave as key for LEAVE messages
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) {
this.bloomFilter.markRelayed(id, "leave");
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
}
}
if (!sig) return;
// Only process leave messages for peers we know about
if (!this.peerManager.hasPeer(id)) return;
// Derive public key on-demand from peer ID
const key = createPublicKey(id);
if (!verifySignature(`type:LEAVE:${id}`, sig, key)) {
this.diagnostics.increment("invalidSig");
return;
}
handleChat(msg, sourceSocket) {
// Identity Verification: Ensure the sender matches the authenticated socket
if (!sourceSocket.peerId || sourceSocket.peerId !== msg.sender) {
return;
}
if (this.peerManager.hasPeer(id)) {
this.peerManager.removePeer(id);
this.broadcastCallback();
// Rate Limiting: Prevent flooding (5 messages per 10 seconds per peer)
const now = Date.now();
let rateData = this.chatRateLimits.get(msg.sender);
if (!rateData || now - rateData.windowStart > 10000) {
// Reset window
rateData = { count: 0, windowStart: now };
}
if (ENABLE_CHAT && this.chatSystemFn && hops === 0) {
this.chatSystemFn({
type: "SYSTEM",
content: `Node ...${id.slice(-8)} disconnected.`,
timestamp: Date.now(),
});
}
if (rateData.count >= 5) {
return; // Drop message
}
rateData.count++;
this.chatRateLimits.set(msg.sender, rateData);
if (this.chatCallback) {
this.chatCallback(msg);
}
// Use id:leave as key for LEAVE messages
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) {
this.bloomFilter.markRelayed(id, "leave");
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
}
}
}
handleChat(msg, sourceSocket) {
// Identity Verification: Ensure the sender matches the authenticated socket
if (!sourceSocket.peerId || sourceSocket.peerId !== msg.sender) {
return;
}
// Rate Limiting: Prevent flooding (5 messages per 10 seconds per peer)
const now = Date.now();
let rateData = this.chatRateLimits.get(msg.sender);
if (!rateData || now - rateData.windowStart > 10000) {
// Reset window
rateData = { count: 0, windowStart: now };
}
if (rateData.count >= 5) {
return; // Drop message
}
rateData.count++;
this.chatRateLimits.set(msg.sender, rateData);
if (this.chatCallback) {
this.chatCallback(msg);
}
}
}
const validateMessage = (msg) => {
if (!msg || typeof msg !== 'object') return false;
if (!msg.type) return false;
if (!msg || typeof msg !== "object") return false;
if (!msg.type) return false;
const msgSize = JSON.stringify(msg).length;
if (msgSize > require("../config/constants").MAX_MESSAGE_SIZE) return false;
const msgSize = JSON.stringify(msg).length;
if (msgSize > require("../config/constants").MAX_MESSAGE_SIZE) return false;
if (msg.type === "HEARTBEAT") {
const allowedFields = ['type', 'id', 'seq', 'hops', 'nonce', 'sig'];
const fields = Object.keys(msg);
return fields.every(f => allowedFields.includes(f)) &&
msg.id && typeof msg.seq === 'number' &&
typeof msg.hops === 'number' && msg.nonce && msg.sig;
}
if (msg.type === "HEARTBEAT") {
const allowedFields = ["type", "id", "seq", "hops", "nonce", "sig"];
const fields = Object.keys(msg);
return (
fields.every((f) => allowedFields.includes(f)) &&
msg.id &&
typeof msg.seq === "number" &&
typeof msg.hops === "number" &&
msg.nonce &&
msg.sig
);
}
if (msg.type === "LEAVE") {
const allowedFields = ['type', 'id', 'hops', 'sig'];
const fields = Object.keys(msg);
return fields.every(f => allowedFields.includes(f)) &&
msg.id && typeof msg.hops === 'number' && msg.sig;
}
if (msg.type === "LEAVE") {
const allowedFields = ["type", "id", "hops", "sig"];
const fields = Object.keys(msg);
return (
fields.every((f) => allowedFields.includes(f)) &&
msg.id &&
typeof msg.hops === "number" &&
msg.sig
);
}
if (msg.type === "CHAT") {
const allowedFields = ['type', 'sender', 'content', 'timestamp'];
const fields = Object.keys(msg);
return fields.every(f => allowedFields.includes(f)) &&
msg.sender &&
msg.content && typeof msg.content === 'string' && msg.content.length <= 140 &&
typeof msg.timestamp === 'number';
}
if (msg.type === "CHAT") {
const allowedFields = ["type", "sender", "content", "timestamp"];
const fields = Object.keys(msg);
return (
fields.every((f) => allowedFields.includes(f)) &&
msg.sender &&
msg.content &&
typeof msg.content === "string" &&
msg.content.length <= 140 &&
typeof msg.timestamp === "number"
);
}
return false;
}
return false;
};
module.exports = { MessageHandler, validateMessage };
+27 -10
View File
@@ -1,16 +1,33 @@
const relayMessage = (msg, sourceSocket, swarm, diagnostics) => {
const data = JSON.stringify(msg) + "\n";
const relayCount = swarm.connections.size - 1;
const data = JSON.stringify(msg) + "\n";
if (diagnostics) {
diagnostics.increment("bytesRelayed", data.length * relayCount);
}
// Gossip Subsampling:
// Instead of flooding everyone (which causes massive bandwidth usage with 50 connections),
// we relay to a random subset of peers (e.g., 6).
// This maintains "Epidemic" reach (O(log N)) while capping bandwidth.
for (const socket of swarm.connections) {
if (socket !== sourceSocket) {
socket.write(data);
}
const TARGET_GOSSIP_COUNT = 6;
const allSockets = Array.from(swarm.connections);
const eligible = allSockets.filter((s) => s !== sourceSocket);
let targets = eligible;
if (eligible.length > TARGET_GOSSIP_COUNT) {
// Fisher-Yates shuffle (partial) to pick random peers
for (let i = eligible.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[eligible[i], eligible[j]] = [eligible[j], eligible[i]];
}
}
targets = eligible.slice(0, TARGET_GOSSIP_COUNT);
}
if (diagnostics) {
diagnostics.increment("bytesRelayed", data.length * targets.length);
}
for (const socket of targets) {
socket.write(data);
}
};
module.exports = { relayMessage };
+1 -1
View File
@@ -3,7 +3,7 @@
* Prevents re-relaying messages we've already seen
*/
class BloomFilter {
constructor(size = 10000, hashCount = 3) {
constructor(size = 200000, hashCount = 3) {
this.size = size;
this.hashCount = hashCount;
this.bits = new Uint8Array(Math.ceil(size / 8));
+6 -2
View File
@@ -9,7 +9,7 @@ class PeerManager {
this.mySeq = 0;
}
addOrUpdatePeer(id, seq, key, ip = null) {
addOrUpdatePeer(id, seq, ip = null) {
const stored = this.seenPeers.get(id);
const wasNew = !stored;
@@ -19,7 +19,6 @@ class PeerManager {
this.seenPeers.set(id, {
seq,
lastSeen: Date.now(),
key,
ip: ip || (stored ? stored.ip : null),
});
@@ -51,6 +50,11 @@ class PeerManager {
if (now - data.lastSeen > PEER_TIMEOUT) {
this.seenPeers.delete(id);
removed++;
} else {
// Optimization: Since LRUCache maintains insertion order (updated on access),
// the Map is sorted by lastSeen (ascending).
// If we find a non-stale peer, all subsequent peers are also non-stale.
break;
}
}