diff --git a/server.js b/server.js index 077af81..d67f8a4 100644 --- a/server.js +++ b/server.js @@ -9,6 +9,155 @@ const PORT = process.env.PORT || 3000; const TOPIC_NAME = "hypermind-lklynet-v1"; const TOPIC = crypto.createHash("sha256").update(TOPIC_NAME).digest(); +// Gossip protocol tuning +const GOSSIP_FANOUT = 10; // Relay to max 10 random peers instead of all +const HEARTBEAT_INTERVAL_FAST = 1000; // 1 second during startup +const HEARTBEAT_INTERVAL_SLOW = 15000; // 15 seconds at steady state +const STARTUP_DURATION = 120000; // Stay in fast mode for 2 minutes +const PEER_STALE_TIMEOUT = 90000; // 90 seconds before peer considered stale + +// --- BLOOM FILTER FOR MESSAGE DEDUPLICATION --- +// Simple bloom filter to prevent re-relaying messages we've already seen +class BloomFilter { + constructor(size = 10000, hashCount = 3) { + this.size = size; + this.hashCount = hashCount; + this.bits = new Uint8Array(Math.ceil(size / 8)); + } + + _hash(str, seed) { + let h = seed; + for (let i = 0; i < str.length; i++) { + h = (h * 31 + str.charCodeAt(i)) >>> 0; + } + return h % this.size; + } + + add(item) { + for (let i = 0; i < this.hashCount; i++) { + const idx = this._hash(item, i * 0x9e3779b9); + this.bits[idx >>> 3] |= (1 << (idx & 7)); + } + } + + has(item) { + for (let i = 0; i < this.hashCount; i++) { + const idx = this._hash(item, i * 0x9e3779b9); + if ((this.bits[idx >>> 3] & (1 << (idx & 7))) === 0) { + return false; + } + } + return true; + } + + clear() { + this.bits.fill(0); + } +} + +// Time-bucketed bloom filter - rotates every 30 seconds +let currentBloom = new BloomFilter(); +let previousBloom = new BloomFilter(); + +function rotateBloomFilters() { + previousBloom = currentBloom; + currentBloom = new BloomFilter(); +} + +// Check if we've recently relayed this message +function hasRelayedMessage(id, seq) { + const key = `${id}:${seq}`; + return currentBloom.has(key) || previousBloom.has(key); +} + +// Mark message as relayed +function markRelayed(id, seq) { + const key = `${id}:${seq}`; + currentBloom.add(key); +} + +// Rotate bloom filters periodically +setInterval(rotateBloomFilters, 30000); + +// --- HYPERLOGLOG FOR PEER COUNTING --- +// Approximate unique peer count with fixed ~1.5KB memory +// Accuracy: ~2% error rate, can count millions of peers +class HyperLogLog { + constructor(precision = 10) { + // 2^precision registers, precision=10 gives 1024 registers (~1KB) + this.precision = precision; + this.registerCount = 1 << precision; + this.registers = new Uint8Array(this.registerCount); + this.alphaMM = this._getAlpha() * this.registerCount * this.registerCount; + } + + _getAlpha() { + // Bias correction constant + switch (this.precision) { + case 4: return 0.673; + case 5: return 0.697; + case 6: return 0.709; + default: return 0.7213 / (1 + 1.079 / this.registerCount); + } + } + + _hash(str) { + // Simple 32-bit hash (good enough for HLL) + let h = 0x811c9dc5; + for (let i = 0; i < str.length; i++) { + h ^= str.charCodeAt(i); + h = (h * 0x01000193) >>> 0; + } + return h; + } + + _countLeadingZeros(value, maxBits) { + if (value === 0) return maxBits; + let count = 0; + while ((value & (1 << (maxBits - 1 - count))) === 0 && count < maxBits) { + count++; + } + return count; + } + + add(item) { + const hash = this._hash(item); + // Use first 'precision' bits for register index + const registerIndex = hash >>> (32 - this.precision); + // Use remaining bits to count leading zeros + const remainingBits = hash << this.precision; + const leadingZeros = this._countLeadingZeros(remainingBits, 32 - this.precision) + 1; + + // Store maximum leading zeros seen for this register + if (leadingZeros > this.registers[registerIndex]) { + this.registers[registerIndex] = leadingZeros; + } + } + + count() { + // Harmonic mean of 2^register values + let harmonicSum = 0; + let zeroRegisters = 0; + + for (let i = 0; i < this.registerCount; i++) { + harmonicSum += Math.pow(2, -this.registers[i]); + if (this.registers[i] === 0) zeroRegisters++; + } + + let estimate = this.alphaMM / harmonicSum; + + // Small range correction (linear counting) + if (estimate <= 2.5 * this.registerCount && zeroRegisters > 0) { + estimate = this.registerCount * Math.log(this.registerCount / zeroRegisters); + } + + return Math.round(estimate); + } +} + +// Global peer counter - tracks all unique peers ever seen +const peerCounter = new HyperLogLog(10); // ~1KB, 2% error + // --- SECURITY --- // We use Ed25519 for signatures and a PoW puzzle to prevent Sybil attacks. // Difficulty: Hash(ID + nonce) must start with '0000' @@ -37,6 +186,7 @@ const seenPeers = new Map(); const sseClients = new Set(); seenPeers.set(MY_ID, { seq: mySeq, lastSeen: Date.now() }); +peerCounter.add(MY_ID); // Count ourselves // Throttle updates to once per second let lastBroadcast = 0; @@ -46,7 +196,7 @@ function broadcastUpdate() { lastBroadcast = now; const data = JSON.stringify({ - count: seenPeers.size, + count: peerCounter.count(), // Use HyperLogLog for total peer count direct: swarm.connections.size, id: MY_ID, }); @@ -59,6 +209,9 @@ function broadcastUpdate() { const swarm = new Hyperswarm(); swarm.on("connection", (socket) => { + // Start adaptive heartbeat on first connection + startHeartbeatIfNeeded(); + const sig = crypto .sign(null, Buffer.from(`seq:${mySeq}`), privateKey) .toString("hex"); @@ -141,6 +294,11 @@ function handleMessage(msg, sourceSocket) { ); if (!verified) return; // Invalid Signature + // Track unique peer in HyperLogLog counter (always, even if over MAX_PEERS) + const prevCount = peerCounter.count(); + peerCounter.add(id); + const countChanged = peerCounter.count() !== prevCount; + // Update Peer if (hops === 0) { sourceSocket.peerId = id; @@ -149,11 +307,18 @@ function handleMessage(msg, sourceSocket) { const now = Date.now(); const wasNew = !stored; - seenPeers.set(id, { seq, lastSeen: now, key }); + // Store in seenPeers only if we have room (memory limit) + // But we still count and relay even if we can't store + const canStore = stored || seenPeers.size < MAX_PEERS; + if (canStore) { + seenPeers.set(id, { seq, lastSeen: now, keyDer }); + } - if (wasNew) broadcastUpdate(); + if ((wasNew && canStore) || countChanged) broadcastUpdate(); - if (hops < 3) { + // Only relay if we haven't already relayed this message (bloom filter check) + if (hops < 3 && !hasRelayedMessage(id, seq)) { + markRelayed(id, seq); relayMessage({ ...msg, hops: hops + 1 }, sourceSocket); } } catch (e) { @@ -165,24 +330,52 @@ function handleMessage(msg, sourceSocket) { seenPeers.delete(id); broadcastUpdate(); - if (hops < 3) { + // Use id:leave as key for LEAVE messages + if (hops < 3 && !hasRelayedMessage(id, "leave")) { + markRelayed(id, "leave"); relayMessage({ ...msg, hops: hops + 1 }, sourceSocket); } } } } +// Fisher-Yates shuffle for random peer selection +function shuffleArray(array) { + for (let i = array.length - 1; i > 0; i--) { + const j = Math.floor(Math.random() * (i + 1)); + [array[i], array[j]] = [array[j], array[i]]; + } + return array; +} + function relayMessage(msg, sourceSocket) { const data = JSON.stringify(msg) + "\n"; - for (const socket of swarm.connections) { - if (socket !== sourceSocket) { - socket.write(data); - } + + // Get all eligible sockets (excluding source) + const eligibleSockets = [...swarm.connections].filter(s => s !== sourceSocket); + + // Apply fanout limiting - only relay to GOSSIP_FANOUT random peers + const targetSockets = eligibleSockets.length <= GOSSIP_FANOUT + ? eligibleSockets + : shuffleArray(eligibleSockets).slice(0, GOSSIP_FANOUT); + + for (const socket of targetSockets) { + socket.write(data); } } -// Periodic Heartbeat -setInterval(() => { +// Adaptive Heartbeat - fast at startup, slows down after STARTUP_DURATION +// Timer starts when first connection is established, not at process start +let heartbeatStartTime = null; +let heartbeatStarted = false; + +function getHeartbeatInterval() { + if (!heartbeatStartTime) return HEARTBEAT_INTERVAL_FAST; + const elapsed = Date.now() - heartbeatStartTime; + return elapsed < STARTUP_DURATION ? HEARTBEAT_INTERVAL_FAST : HEARTBEAT_INTERVAL_SLOW; +} + +function sendHeartbeat() { mySeq++; seenPeers.set(MY_ID, { seq: mySeq, lastSeen: Date.now() }); @@ -206,14 +399,27 @@ setInterval(() => { const now = Date.now(); let changed = false; for (const [id, data] of seenPeers) { - if (now - data.lastSeen > 15000) { + if (now - data.lastSeen > PEER_STALE_TIMEOUT) { seenPeers.delete(id); changed = true; } } if (changed) broadcastUpdate(); -}, 5000); + + // Schedule next heartbeat with adaptive interval + setTimeout(sendHeartbeat, getHeartbeatInterval()); +} + +// Start heartbeat loop on first connection +function startHeartbeatIfNeeded() { + if (!heartbeatStarted) { + heartbeatStarted = true; + heartbeatStartTime = Date.now(); + console.log("[P2P] First connection established, starting fast heartbeat..."); + sendHeartbeat(); + } +} // Graceful Shutdown function handleShutdown() { @@ -234,7 +440,7 @@ process.on("SIGTERM", handleShutdown); // --- WEB SERVER --- app.get("/", (req, res) => { - const count = seenPeers.size; + const count = peerCounter.count(); // HyperLogLog approximate count const directPeers = swarm.connections.size; res.send(` @@ -410,7 +616,7 @@ app.get("/events", (req, res) => { sseClients.add(res); const data = JSON.stringify({ - count: seenPeers.size, + count: peerCounter.count(), direct: swarm.connections.size, id: MY_ID, }); @@ -423,7 +629,7 @@ app.get("/events", (req, res) => { app.get("/api/stats", (req, res) => { res.json({ - count: seenPeers.size, + count: peerCounter.count(), direct: swarm.connections.size, id: MY_ID, });