Merge pull request #5 from kiliantyler/optimizations

Performance optimizations
This commit is contained in:
LKLY
2026-01-02 15:01:07 -05:00
committed by GitHub
+222 -16
View File
@@ -9,6 +9,155 @@ const PORT = process.env.PORT || 3000;
const TOPIC_NAME = "hypermind-lklynet-v1";
const TOPIC = crypto.createHash("sha256").update(TOPIC_NAME).digest();
// Gossip protocol tuning
const GOSSIP_FANOUT = 10; // Relay to max 10 random peers instead of all
const HEARTBEAT_INTERVAL_FAST = 1000; // 1 second during startup
const HEARTBEAT_INTERVAL_SLOW = 15000; // 15 seconds at steady state
const STARTUP_DURATION = 120000; // Stay in fast mode for 2 minutes
const PEER_STALE_TIMEOUT = 90000; // 90 seconds before peer considered stale
// --- BLOOM FILTER FOR MESSAGE DEDUPLICATION ---
// Simple bloom filter to prevent re-relaying messages we've already seen
class BloomFilter {
constructor(size = 10000, hashCount = 3) {
this.size = size;
this.hashCount = hashCount;
this.bits = new Uint8Array(Math.ceil(size / 8));
}
_hash(str, seed) {
let h = seed;
for (let i = 0; i < str.length; i++) {
h = (h * 31 + str.charCodeAt(i)) >>> 0;
}
return h % this.size;
}
add(item) {
for (let i = 0; i < this.hashCount; i++) {
const idx = this._hash(item, i * 0x9e3779b9);
this.bits[idx >>> 3] |= (1 << (idx & 7));
}
}
has(item) {
for (let i = 0; i < this.hashCount; i++) {
const idx = this._hash(item, i * 0x9e3779b9);
if ((this.bits[idx >>> 3] & (1 << (idx & 7))) === 0) {
return false;
}
}
return true;
}
clear() {
this.bits.fill(0);
}
}
// Time-bucketed bloom filter - rotates every 30 seconds
let currentBloom = new BloomFilter();
let previousBloom = new BloomFilter();
function rotateBloomFilters() {
previousBloom = currentBloom;
currentBloom = new BloomFilter();
}
// Check if we've recently relayed this message
function hasRelayedMessage(id, seq) {
const key = `${id}:${seq}`;
return currentBloom.has(key) || previousBloom.has(key);
}
// Mark message as relayed
function markRelayed(id, seq) {
const key = `${id}:${seq}`;
currentBloom.add(key);
}
// Rotate bloom filters periodically
setInterval(rotateBloomFilters, 30000);
// --- HYPERLOGLOG FOR PEER COUNTING ---
// Approximate unique peer count with fixed ~1.5KB memory
// Accuracy: ~2% error rate, can count millions of peers
class HyperLogLog {
constructor(precision = 10) {
// 2^precision registers, precision=10 gives 1024 registers (~1KB)
this.precision = precision;
this.registerCount = 1 << precision;
this.registers = new Uint8Array(this.registerCount);
this.alphaMM = this._getAlpha() * this.registerCount * this.registerCount;
}
_getAlpha() {
// Bias correction constant
switch (this.precision) {
case 4: return 0.673;
case 5: return 0.697;
case 6: return 0.709;
default: return 0.7213 / (1 + 1.079 / this.registerCount);
}
}
_hash(str) {
// Simple 32-bit hash (good enough for HLL)
let h = 0x811c9dc5;
for (let i = 0; i < str.length; i++) {
h ^= str.charCodeAt(i);
h = (h * 0x01000193) >>> 0;
}
return h;
}
_countLeadingZeros(value, maxBits) {
if (value === 0) return maxBits;
let count = 0;
while ((value & (1 << (maxBits - 1 - count))) === 0 && count < maxBits) {
count++;
}
return count;
}
add(item) {
const hash = this._hash(item);
// Use first 'precision' bits for register index
const registerIndex = hash >>> (32 - this.precision);
// Use remaining bits to count leading zeros
const remainingBits = hash << this.precision;
const leadingZeros = this._countLeadingZeros(remainingBits, 32 - this.precision) + 1;
// Store maximum leading zeros seen for this register
if (leadingZeros > this.registers[registerIndex]) {
this.registers[registerIndex] = leadingZeros;
}
}
count() {
// Harmonic mean of 2^register values
let harmonicSum = 0;
let zeroRegisters = 0;
for (let i = 0; i < this.registerCount; i++) {
harmonicSum += Math.pow(2, -this.registers[i]);
if (this.registers[i] === 0) zeroRegisters++;
}
let estimate = this.alphaMM / harmonicSum;
// Small range correction (linear counting)
if (estimate <= 2.5 * this.registerCount && zeroRegisters > 0) {
estimate = this.registerCount * Math.log(this.registerCount / zeroRegisters);
}
return Math.round(estimate);
}
}
// Global peer counter - tracks all unique peers ever seen
const peerCounter = new HyperLogLog(10); // ~1KB, 2% error
// --- SECURITY ---
// We use Ed25519 for signatures and a PoW puzzle to prevent Sybil attacks.
// Difficulty: Hash(ID + nonce) must start with '0000'
@@ -37,6 +186,7 @@ const seenPeers = new Map();
const sseClients = new Set();
seenPeers.set(MY_ID, { seq: mySeq, lastSeen: Date.now() });
peerCounter.add(MY_ID); // Count ourselves
// Throttle updates to once per second
let lastBroadcast = 0;
@@ -46,7 +196,7 @@ function broadcastUpdate() {
lastBroadcast = now;
const data = JSON.stringify({
count: seenPeers.size,
count: peerCounter.count(), // Use HyperLogLog for total peer count
direct: swarm.connections.size,
id: MY_ID,
});
@@ -59,6 +209,9 @@ function broadcastUpdate() {
const swarm = new Hyperswarm();
swarm.on("connection", (socket) => {
// Start adaptive heartbeat on first connection
startHeartbeatIfNeeded();
const sig = crypto
.sign(null, Buffer.from(`seq:${mySeq}`), privateKey)
.toString("hex");
@@ -141,6 +294,11 @@ function handleMessage(msg, sourceSocket) {
);
if (!verified) return; // Invalid Signature
// Track unique peer in HyperLogLog counter (always, even if over MAX_PEERS)
const prevCount = peerCounter.count();
peerCounter.add(id);
const countChanged = peerCounter.count() !== prevCount;
// Update Peer
if (hops === 0) {
sourceSocket.peerId = id;
@@ -149,11 +307,18 @@ function handleMessage(msg, sourceSocket) {
const now = Date.now();
const wasNew = !stored;
seenPeers.set(id, { seq, lastSeen: now, key });
// Store in seenPeers only if we have room (memory limit)
// But we still count and relay even if we can't store
const canStore = stored || seenPeers.size < MAX_PEERS;
if (canStore) {
seenPeers.set(id, { seq, lastSeen: now, keyDer });
}
if (wasNew) broadcastUpdate();
if ((wasNew && canStore) || countChanged) broadcastUpdate();
if (hops < 3) {
// Only relay if we haven't already relayed this message (bloom filter check)
if (hops < 3 && !hasRelayedMessage(id, seq)) {
markRelayed(id, seq);
relayMessage({ ...msg, hops: hops + 1 }, sourceSocket);
}
} catch (e) {
@@ -165,24 +330,52 @@ function handleMessage(msg, sourceSocket) {
seenPeers.delete(id);
broadcastUpdate();
if (hops < 3) {
// Use id:leave as key for LEAVE messages
if (hops < 3 && !hasRelayedMessage(id, "leave")) {
markRelayed(id, "leave");
relayMessage({ ...msg, hops: hops + 1 }, sourceSocket);
}
}
}
}
// Fisher-Yates shuffle for random peer selection
function shuffleArray(array) {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[array[i], array[j]] = [array[j], array[i]];
}
return array;
}
function relayMessage(msg, sourceSocket) {
const data = JSON.stringify(msg) + "\n";
for (const socket of swarm.connections) {
if (socket !== sourceSocket) {
socket.write(data);
}
// Get all eligible sockets (excluding source)
const eligibleSockets = [...swarm.connections].filter(s => s !== sourceSocket);
// Apply fanout limiting - only relay to GOSSIP_FANOUT random peers
const targetSockets = eligibleSockets.length <= GOSSIP_FANOUT
? eligibleSockets
: shuffleArray(eligibleSockets).slice(0, GOSSIP_FANOUT);
for (const socket of targetSockets) {
socket.write(data);
}
}
// Periodic Heartbeat
setInterval(() => {
// Adaptive Heartbeat - fast at startup, slows down after STARTUP_DURATION
// Timer starts when first connection is established, not at process start
let heartbeatStartTime = null;
let heartbeatStarted = false;
function getHeartbeatInterval() {
if (!heartbeatStartTime) return HEARTBEAT_INTERVAL_FAST;
const elapsed = Date.now() - heartbeatStartTime;
return elapsed < STARTUP_DURATION ? HEARTBEAT_INTERVAL_FAST : HEARTBEAT_INTERVAL_SLOW;
}
function sendHeartbeat() {
mySeq++;
seenPeers.set(MY_ID, { seq: mySeq, lastSeen: Date.now() });
@@ -206,14 +399,27 @@ setInterval(() => {
const now = Date.now();
let changed = false;
for (const [id, data] of seenPeers) {
if (now - data.lastSeen > 15000) {
if (now - data.lastSeen > PEER_STALE_TIMEOUT) {
seenPeers.delete(id);
changed = true;
}
}
if (changed) broadcastUpdate();
}, 5000);
// Schedule next heartbeat with adaptive interval
setTimeout(sendHeartbeat, getHeartbeatInterval());
}
// Start heartbeat loop on first connection
function startHeartbeatIfNeeded() {
if (!heartbeatStarted) {
heartbeatStarted = true;
heartbeatStartTime = Date.now();
console.log("[P2P] First connection established, starting fast heartbeat...");
sendHeartbeat();
}
}
// Graceful Shutdown
function handleShutdown() {
@@ -234,7 +440,7 @@ process.on("SIGTERM", handleShutdown);
// --- WEB SERVER ---
app.get("/", (req, res) => {
const count = seenPeers.size;
const count = peerCounter.count(); // HyperLogLog approximate count
const directPeers = swarm.connections.size;
res.send(`
@@ -410,7 +616,7 @@ app.get("/events", (req, res) => {
sseClients.add(res);
const data = JSON.stringify({
count: seenPeers.size,
count: peerCounter.count(),
direct: swarm.connections.size,
id: MY_ID,
});
@@ -423,7 +629,7 @@ app.get("/events", (req, res) => {
app.get("/api/stats", (req, res) => {
res.json({
count: seenPeers.size,
count: peerCounter.count(),
direct: swarm.connections.size,
id: MY_ID,
});