mirror of
https://github.com/lklynet/hypermind.git
synced 2026-05-03 09:30:36 +00:00
Merge pull request #44 from lklynet/resource-refinement
refactor: optimize resources with gossip subsampling, memory leak fixes, and tuned constants
This commit is contained in:
+23
-23
@@ -6,7 +6,7 @@ const TOPIC = crypto.createHash("sha256").update(TOPIC_NAME).digest();
|
||||
/**
|
||||
* fccview here, frankly I don't think I can make this more secure, we can change it to `00000` but
|
||||
* that means until everyone upgrade there'll be a divide between nodes.
|
||||
*
|
||||
*
|
||||
* I ran it that way and I was fairly isolated, with hundreds of failed POW, shame.
|
||||
* adding an extra 0 makes it very expensive on attacker to make it worth the fun for them, so maybe consider it.
|
||||
* ----
|
||||
@@ -16,35 +16,35 @@ const TOPIC = crypto.createHash("sha256").update(TOPIC_NAME).digest();
|
||||
const MY_POW_PREFIX = "00000";
|
||||
const VERIFICATION_POW_PREFIX = "0000";
|
||||
|
||||
const MAX_PEERS = parseInt(process.env.MAX_PEERS) || 1000000;
|
||||
const MAX_PEERS = parseInt(process.env.MAX_PEERS) || 50000;
|
||||
const MAX_MESSAGE_SIZE = 2048;
|
||||
const MAX_RELAY_HOPS = 2;
|
||||
const MAX_CONNECTIONS = 32;
|
||||
const MAX_CONNECTIONS = 15;
|
||||
|
||||
const HEARTBEAT_INTERVAL = 5000;
|
||||
const CONNECTION_ROTATION_INTERVAL = 30000;
|
||||
const PEER_TIMEOUT = 15000;
|
||||
const HEARTBEAT_INTERVAL = 30000;
|
||||
const CONNECTION_ROTATION_INTERVAL = 300000;
|
||||
const PEER_TIMEOUT = 45000;
|
||||
const BROADCAST_THROTTLE = 1000;
|
||||
const DIAGNOSTICS_INTERVAL = 10000;
|
||||
const PORT = process.env.PORT || 3000;
|
||||
const ENABLE_CHAT = process.env.ENABLE_CHAT === 'true';
|
||||
const ENABLE_CHAT = process.env.ENABLE_CHAT === "true";
|
||||
const CHAT_RATE_LIMIT = 5000;
|
||||
|
||||
module.exports = {
|
||||
TOPIC_NAME,
|
||||
TOPIC,
|
||||
MY_POW_PREFIX,
|
||||
VERIFICATION_POW_PREFIX,
|
||||
MAX_PEERS,
|
||||
MAX_MESSAGE_SIZE,
|
||||
MAX_RELAY_HOPS,
|
||||
MAX_CONNECTIONS,
|
||||
HEARTBEAT_INTERVAL,
|
||||
CONNECTION_ROTATION_INTERVAL,
|
||||
PEER_TIMEOUT,
|
||||
BROADCAST_THROTTLE,
|
||||
DIAGNOSTICS_INTERVAL,
|
||||
PORT,
|
||||
ENABLE_CHAT,
|
||||
CHAT_RATE_LIMIT,
|
||||
TOPIC_NAME,
|
||||
TOPIC,
|
||||
MY_POW_PREFIX,
|
||||
VERIFICATION_POW_PREFIX,
|
||||
MAX_PEERS,
|
||||
MAX_MESSAGE_SIZE,
|
||||
MAX_RELAY_HOPS,
|
||||
MAX_CONNECTIONS,
|
||||
HEARTBEAT_INTERVAL,
|
||||
CONNECTION_ROTATION_INTERVAL,
|
||||
PEER_TIMEOUT,
|
||||
BROADCAST_THROTTLE,
|
||||
DIAGNOSTICS_INTERVAL,
|
||||
PORT,
|
||||
ENABLE_CHAT,
|
||||
CHAT_RATE_LIMIT,
|
||||
};
|
||||
|
||||
+190
-163
@@ -1,197 +1,224 @@
|
||||
const { verifyPoW, verifySignature, createPublicKey } = require("../core/security");
|
||||
const {
|
||||
verifyPoW,
|
||||
verifySignature,
|
||||
createPublicKey,
|
||||
} = require("../core/security");
|
||||
const { MAX_RELAY_HOPS, ENABLE_CHAT } = require("../config/constants");
|
||||
const { BloomFilterManager } = require("../state/bloom");
|
||||
|
||||
class MessageHandler {
|
||||
constructor(peerManager, diagnostics, relayCallback, broadcastCallback, chatCallback, chatSystemFn) {
|
||||
this.peerManager = peerManager;
|
||||
this.diagnostics = diagnostics;
|
||||
this.relayCallback = relayCallback;
|
||||
this.broadcastCallback = broadcastCallback;
|
||||
this.chatCallback = chatCallback;
|
||||
this.chatSystemFn = chatSystemFn;
|
||||
this.bloomFilter = new BloomFilterManager();
|
||||
this.bloomFilter.start();
|
||||
this.chatRateLimits = new Map();
|
||||
constructor(
|
||||
peerManager,
|
||||
diagnostics,
|
||||
relayCallback,
|
||||
broadcastCallback,
|
||||
chatCallback,
|
||||
chatSystemFn
|
||||
) {
|
||||
this.peerManager = peerManager;
|
||||
this.diagnostics = diagnostics;
|
||||
this.relayCallback = relayCallback;
|
||||
this.broadcastCallback = broadcastCallback;
|
||||
this.chatCallback = chatCallback;
|
||||
this.chatSystemFn = chatSystemFn;
|
||||
this.bloomFilter = new BloomFilterManager();
|
||||
this.bloomFilter.start();
|
||||
this.chatRateLimits = new Map();
|
||||
}
|
||||
|
||||
handleMessage(msg, sourceSocket) {
|
||||
if (!validateMessage(msg)) {
|
||||
return;
|
||||
}
|
||||
|
||||
handleMessage(msg, sourceSocket) {
|
||||
if (!validateMessage(msg)) {
|
||||
return;
|
||||
}
|
||||
if (msg.type === "HEARTBEAT") {
|
||||
this.handleHeartbeat(msg, sourceSocket);
|
||||
} else if (msg.type === "LEAVE") {
|
||||
this.handleLeave(msg, sourceSocket);
|
||||
} else if (msg.type === "CHAT") {
|
||||
this.handleChat(msg, sourceSocket);
|
||||
}
|
||||
}
|
||||
|
||||
if (msg.type === "HEARTBEAT") {
|
||||
this.handleHeartbeat(msg, sourceSocket);
|
||||
} else if (msg.type === "LEAVE") {
|
||||
this.handleLeave(msg, sourceSocket);
|
||||
} else if (msg.type === "CHAT") {
|
||||
this.handleChat(msg, sourceSocket);
|
||||
}
|
||||
handleHeartbeat(msg, sourceSocket) {
|
||||
this.diagnostics.increment("heartbeatsReceived");
|
||||
const { id, seq, hops, nonce, sig } = msg;
|
||||
|
||||
// Optimization: Check for duplicates BEFORE verifyPoW (CPU intensive)
|
||||
const stored = this.peerManager.getPeer(id);
|
||||
if (stored && seq <= stored.seq) {
|
||||
this.diagnostics.increment("duplicateSeq");
|
||||
return;
|
||||
}
|
||||
|
||||
handleHeartbeat(msg, sourceSocket) {
|
||||
this.diagnostics.increment("heartbeatsReceived");
|
||||
const { id, seq, hops, nonce, sig } = msg;
|
||||
|
||||
if (!verifyPoW(id, nonce)) {
|
||||
this.diagnostics.increment("invalidPoW");
|
||||
return;
|
||||
}
|
||||
|
||||
const stored = this.peerManager.getPeer(id);
|
||||
if (stored && seq <= stored.seq) {
|
||||
this.diagnostics.increment("duplicateSeq");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!sig) return;
|
||||
|
||||
try {
|
||||
// Check if we can accept new peers (only matters for new peers)
|
||||
if (!stored && !this.peerManager.canAcceptPeer(id)) return;
|
||||
|
||||
// Derive public key on-demand from peer ID
|
||||
const key = createPublicKey(id);
|
||||
|
||||
if (!verifySignature(`seq:${seq}`, sig, key)) {
|
||||
this.diagnostics.increment("invalidSig");
|
||||
return;
|
||||
}
|
||||
|
||||
if (hops === 0) {
|
||||
sourceSocket.peerId = id;
|
||||
}
|
||||
|
||||
const getIp = (sock) => {
|
||||
if (sock.remoteAddress) return sock.remoteAddress;
|
||||
if (sock.rawStream && sock.rawStream.remoteHost) return sock.rawStream.remoteHost;
|
||||
if (sock.rawStream && sock.rawStream.remoteAddress) return sock.rawStream.remoteAddress;
|
||||
return null;
|
||||
};
|
||||
|
||||
const ip = (hops === 0) ? getIp(sourceSocket) : null;
|
||||
const wasNew = this.peerManager.addOrUpdatePeer(id, seq, key, ip);
|
||||
|
||||
if (wasNew) {
|
||||
this.diagnostics.increment("newPeersAdded");
|
||||
this.broadcastCallback();
|
||||
if (ENABLE_CHAT && this.chatSystemFn && hops === 0) {
|
||||
this.chatSystemFn({
|
||||
type: "SYSTEM",
|
||||
content: `Connection established with Node ...${id.slice(-8)}`,
|
||||
timestamp: Date.now()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Only relay if we haven't already relayed this message (bloom filter check)
|
||||
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) {
|
||||
this.bloomFilter.markRelayed(id, seq);
|
||||
this.diagnostics.increment("heartbeatsRelayed");
|
||||
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
if (!verifyPoW(id, nonce)) {
|
||||
this.diagnostics.increment("invalidPoW");
|
||||
return;
|
||||
}
|
||||
|
||||
handleLeave(msg, sourceSocket) {
|
||||
this.diagnostics.increment("leaveMessages");
|
||||
const { id, hops, sig } = msg;
|
||||
if (!sig) return;
|
||||
|
||||
if (!sig) return;
|
||||
try {
|
||||
// Check if we can accept new peers (only matters for new peers)
|
||||
if (!stored && !this.peerManager.canAcceptPeer(id)) return;
|
||||
|
||||
// Only process leave messages for peers we know about
|
||||
if (!this.peerManager.hasPeer(id)) return;
|
||||
// Derive public key on-demand from peer ID
|
||||
const key = createPublicKey(id);
|
||||
|
||||
// Derive public key on-demand from peer ID
|
||||
const key = createPublicKey(id);
|
||||
if (!verifySignature(`seq:${seq}`, sig, key)) {
|
||||
this.diagnostics.increment("invalidSig");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!verifySignature(`type:LEAVE:${id}`, sig, key)) {
|
||||
this.diagnostics.increment("invalidSig");
|
||||
return;
|
||||
if (hops === 0) {
|
||||
sourceSocket.peerId = id;
|
||||
}
|
||||
|
||||
const getIp = (sock) => {
|
||||
if (sock.remoteAddress) return sock.remoteAddress;
|
||||
if (sock.rawStream && sock.rawStream.remoteHost)
|
||||
return sock.rawStream.remoteHost;
|
||||
if (sock.rawStream && sock.rawStream.remoteAddress)
|
||||
return sock.rawStream.remoteAddress;
|
||||
return null;
|
||||
};
|
||||
|
||||
const ip = hops === 0 ? getIp(sourceSocket) : null;
|
||||
const wasNew = this.peerManager.addOrUpdatePeer(id, seq, ip);
|
||||
|
||||
if (wasNew) {
|
||||
this.diagnostics.increment("newPeersAdded");
|
||||
this.broadcastCallback();
|
||||
if (ENABLE_CHAT && this.chatSystemFn && hops === 0) {
|
||||
this.chatSystemFn({
|
||||
type: "SYSTEM",
|
||||
content: `Connection established with Node ...${id.slice(-8)}`,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (this.peerManager.hasPeer(id)) {
|
||||
this.peerManager.removePeer(id);
|
||||
this.broadcastCallback();
|
||||
// Only relay if we haven't already relayed this message (bloom filter check)
|
||||
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) {
|
||||
this.bloomFilter.markRelayed(id, seq);
|
||||
this.diagnostics.increment("heartbeatsRelayed");
|
||||
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (ENABLE_CHAT && this.chatSystemFn && hops === 0) {
|
||||
this.chatSystemFn({
|
||||
type: "SYSTEM",
|
||||
content: `Node ...${id.slice(-8)} disconnected.`,
|
||||
timestamp: Date.now()
|
||||
});
|
||||
}
|
||||
handleLeave(msg, sourceSocket) {
|
||||
this.diagnostics.increment("leaveMessages");
|
||||
const { id, hops, sig } = msg;
|
||||
|
||||
// Use id:leave as key for LEAVE messages
|
||||
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) {
|
||||
this.bloomFilter.markRelayed(id, "leave");
|
||||
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
|
||||
}
|
||||
}
|
||||
if (!sig) return;
|
||||
|
||||
// Only process leave messages for peers we know about
|
||||
if (!this.peerManager.hasPeer(id)) return;
|
||||
|
||||
// Derive public key on-demand from peer ID
|
||||
const key = createPublicKey(id);
|
||||
|
||||
if (!verifySignature(`type:LEAVE:${id}`, sig, key)) {
|
||||
this.diagnostics.increment("invalidSig");
|
||||
return;
|
||||
}
|
||||
|
||||
handleChat(msg, sourceSocket) {
|
||||
// Identity Verification: Ensure the sender matches the authenticated socket
|
||||
if (!sourceSocket.peerId || sourceSocket.peerId !== msg.sender) {
|
||||
return;
|
||||
}
|
||||
if (this.peerManager.hasPeer(id)) {
|
||||
this.peerManager.removePeer(id);
|
||||
this.broadcastCallback();
|
||||
|
||||
// Rate Limiting: Prevent flooding (5 messages per 10 seconds per peer)
|
||||
const now = Date.now();
|
||||
let rateData = this.chatRateLimits.get(msg.sender);
|
||||
|
||||
if (!rateData || now - rateData.windowStart > 10000) {
|
||||
// Reset window
|
||||
rateData = { count: 0, windowStart: now };
|
||||
}
|
||||
if (ENABLE_CHAT && this.chatSystemFn && hops === 0) {
|
||||
this.chatSystemFn({
|
||||
type: "SYSTEM",
|
||||
content: `Node ...${id.slice(-8)} disconnected.`,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
if (rateData.count >= 5) {
|
||||
return; // Drop message
|
||||
}
|
||||
|
||||
rateData.count++;
|
||||
this.chatRateLimits.set(msg.sender, rateData);
|
||||
|
||||
if (this.chatCallback) {
|
||||
this.chatCallback(msg);
|
||||
}
|
||||
// Use id:leave as key for LEAVE messages
|
||||
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) {
|
||||
this.bloomFilter.markRelayed(id, "leave");
|
||||
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
handleChat(msg, sourceSocket) {
|
||||
// Identity Verification: Ensure the sender matches the authenticated socket
|
||||
if (!sourceSocket.peerId || sourceSocket.peerId !== msg.sender) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Rate Limiting: Prevent flooding (5 messages per 10 seconds per peer)
|
||||
const now = Date.now();
|
||||
let rateData = this.chatRateLimits.get(msg.sender);
|
||||
|
||||
if (!rateData || now - rateData.windowStart > 10000) {
|
||||
// Reset window
|
||||
rateData = { count: 0, windowStart: now };
|
||||
}
|
||||
|
||||
if (rateData.count >= 5) {
|
||||
return; // Drop message
|
||||
}
|
||||
|
||||
rateData.count++;
|
||||
this.chatRateLimits.set(msg.sender, rateData);
|
||||
|
||||
if (this.chatCallback) {
|
||||
this.chatCallback(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const validateMessage = (msg) => {
|
||||
if (!msg || typeof msg !== 'object') return false;
|
||||
if (!msg.type) return false;
|
||||
if (!msg || typeof msg !== "object") return false;
|
||||
if (!msg.type) return false;
|
||||
|
||||
const msgSize = JSON.stringify(msg).length;
|
||||
if (msgSize > require("../config/constants").MAX_MESSAGE_SIZE) return false;
|
||||
const msgSize = JSON.stringify(msg).length;
|
||||
if (msgSize > require("../config/constants").MAX_MESSAGE_SIZE) return false;
|
||||
|
||||
if (msg.type === "HEARTBEAT") {
|
||||
const allowedFields = ['type', 'id', 'seq', 'hops', 'nonce', 'sig'];
|
||||
const fields = Object.keys(msg);
|
||||
return fields.every(f => allowedFields.includes(f)) &&
|
||||
msg.id && typeof msg.seq === 'number' &&
|
||||
typeof msg.hops === 'number' && msg.nonce && msg.sig;
|
||||
}
|
||||
if (msg.type === "HEARTBEAT") {
|
||||
const allowedFields = ["type", "id", "seq", "hops", "nonce", "sig"];
|
||||
const fields = Object.keys(msg);
|
||||
return (
|
||||
fields.every((f) => allowedFields.includes(f)) &&
|
||||
msg.id &&
|
||||
typeof msg.seq === "number" &&
|
||||
typeof msg.hops === "number" &&
|
||||
msg.nonce &&
|
||||
msg.sig
|
||||
);
|
||||
}
|
||||
|
||||
if (msg.type === "LEAVE") {
|
||||
const allowedFields = ['type', 'id', 'hops', 'sig'];
|
||||
const fields = Object.keys(msg);
|
||||
return fields.every(f => allowedFields.includes(f)) &&
|
||||
msg.id && typeof msg.hops === 'number' && msg.sig;
|
||||
}
|
||||
if (msg.type === "LEAVE") {
|
||||
const allowedFields = ["type", "id", "hops", "sig"];
|
||||
const fields = Object.keys(msg);
|
||||
return (
|
||||
fields.every((f) => allowedFields.includes(f)) &&
|
||||
msg.id &&
|
||||
typeof msg.hops === "number" &&
|
||||
msg.sig
|
||||
);
|
||||
}
|
||||
|
||||
if (msg.type === "CHAT") {
|
||||
const allowedFields = ['type', 'sender', 'content', 'timestamp'];
|
||||
const fields = Object.keys(msg);
|
||||
return fields.every(f => allowedFields.includes(f)) &&
|
||||
msg.sender &&
|
||||
msg.content && typeof msg.content === 'string' && msg.content.length <= 140 &&
|
||||
typeof msg.timestamp === 'number';
|
||||
}
|
||||
if (msg.type === "CHAT") {
|
||||
const allowedFields = ["type", "sender", "content", "timestamp"];
|
||||
const fields = Object.keys(msg);
|
||||
return (
|
||||
fields.every((f) => allowedFields.includes(f)) &&
|
||||
msg.sender &&
|
||||
msg.content &&
|
||||
typeof msg.content === "string" &&
|
||||
msg.content.length <= 140 &&
|
||||
typeof msg.timestamp === "number"
|
||||
);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
module.exports = { MessageHandler, validateMessage };
|
||||
|
||||
+27
-10
@@ -1,16 +1,33 @@
|
||||
const relayMessage = (msg, sourceSocket, swarm, diagnostics) => {
|
||||
const data = JSON.stringify(msg) + "\n";
|
||||
const relayCount = swarm.connections.size - 1;
|
||||
const data = JSON.stringify(msg) + "\n";
|
||||
|
||||
if (diagnostics) {
|
||||
diagnostics.increment("bytesRelayed", data.length * relayCount);
|
||||
}
|
||||
// Gossip Subsampling:
|
||||
// Instead of flooding everyone (which causes massive bandwidth usage with 50 connections),
|
||||
// we relay to a random subset of peers (e.g., 6).
|
||||
// This maintains "Epidemic" reach (O(log N)) while capping bandwidth.
|
||||
|
||||
for (const socket of swarm.connections) {
|
||||
if (socket !== sourceSocket) {
|
||||
socket.write(data);
|
||||
}
|
||||
const TARGET_GOSSIP_COUNT = 6;
|
||||
const allSockets = Array.from(swarm.connections);
|
||||
const eligible = allSockets.filter((s) => s !== sourceSocket);
|
||||
|
||||
let targets = eligible;
|
||||
|
||||
if (eligible.length > TARGET_GOSSIP_COUNT) {
|
||||
// Fisher-Yates shuffle (partial) to pick random peers
|
||||
for (let i = eligible.length - 1; i > 0; i--) {
|
||||
const j = Math.floor(Math.random() * (i + 1));
|
||||
[eligible[i], eligible[j]] = [eligible[j], eligible[i]];
|
||||
}
|
||||
}
|
||||
targets = eligible.slice(0, TARGET_GOSSIP_COUNT);
|
||||
}
|
||||
|
||||
if (diagnostics) {
|
||||
diagnostics.increment("bytesRelayed", data.length * targets.length);
|
||||
}
|
||||
|
||||
for (const socket of targets) {
|
||||
socket.write(data);
|
||||
}
|
||||
};
|
||||
|
||||
module.exports = { relayMessage };
|
||||
|
||||
+1
-1
@@ -3,7 +3,7 @@
|
||||
* Prevents re-relaying messages we've already seen
|
||||
*/
|
||||
class BloomFilter {
|
||||
constructor(size = 10000, hashCount = 3) {
|
||||
constructor(size = 200000, hashCount = 3) {
|
||||
this.size = size;
|
||||
this.hashCount = hashCount;
|
||||
this.bits = new Uint8Array(Math.ceil(size / 8));
|
||||
|
||||
+6
-2
@@ -9,7 +9,7 @@ class PeerManager {
|
||||
this.mySeq = 0;
|
||||
}
|
||||
|
||||
addOrUpdatePeer(id, seq, key, ip = null) {
|
||||
addOrUpdatePeer(id, seq, ip = null) {
|
||||
const stored = this.seenPeers.get(id);
|
||||
const wasNew = !stored;
|
||||
|
||||
@@ -19,7 +19,6 @@ class PeerManager {
|
||||
this.seenPeers.set(id, {
|
||||
seq,
|
||||
lastSeen: Date.now(),
|
||||
key,
|
||||
ip: ip || (stored ? stored.ip : null),
|
||||
});
|
||||
|
||||
@@ -51,6 +50,11 @@ class PeerManager {
|
||||
if (now - data.lastSeen > PEER_TIMEOUT) {
|
||||
this.seenPeers.delete(id);
|
||||
removed++;
|
||||
} else {
|
||||
// Optimization: Since LRUCache maintains insertion order (updated on access),
|
||||
// the Map is sorted by lastSeen (ascending).
|
||||
// If we find a non-stale peer, all subsequent peers are also non-stale.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user