fix conflicts and create a class for Hyperloglog to keep with the refactored architecture

This commit is contained in:
Fernando Campione
2026-01-02 20:59:11 +00:00
7 changed files with 181 additions and 4 deletions
+9 -2
View File
@@ -1,5 +1,6 @@
const { verifyPoW, verifySignature, createPublicKey } = require("../core/security");
const { MAX_RELAY_HOPS } = require("../config/constants");
const { BloomFilterManager } = require("../state/bloom");
class MessageHandler {
constructor(peerManager, diagnostics, relayCallback, broadcastCallback) {
@@ -7,6 +8,8 @@ class MessageHandler {
this.diagnostics = diagnostics;
this.relayCallback = relayCallback;
this.broadcastCallback = broadcastCallback;
this.bloomFilter = new BloomFilterManager();
this.bloomFilter.start();
}
handleMessage(msg, sourceSocket) {
@@ -63,7 +66,9 @@ class MessageHandler {
this.broadcastCallback();
}
if (hops < MAX_RELAY_HOPS) {
// Only relay if we haven't already relayed this message (bloom filter check)
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) {
this.bloomFilter.markRelayed(id, seq);
this.diagnostics.increment("heartbeatsRelayed");
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
}
@@ -90,7 +95,9 @@ class MessageHandler {
this.peerManager.removePeer(id);
this.broadcastCallback();
if (hops < MAX_RELAY_HOPS) {
// Use id:leave as key for LEAVE messages
if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) {
this.bloomFilter.markRelayed(id, "leave");
this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket);
}
}
+78
View File
@@ -0,0 +1,78 @@
/**
* Simple Bloom filter for message deduplication
* Prevents re-relaying messages we've already seen
*/
class BloomFilter {
constructor(size = 10000, hashCount = 3) {
this.size = size;
this.hashCount = hashCount;
this.bits = new Uint8Array(Math.ceil(size / 8));
}
_hash(str, seed) {
let h = seed;
for (let i = 0; i < str.length; i++) {
h = (h * 31 + str.charCodeAt(i)) >>> 0;
}
return h % this.size;
}
add(item) {
for (let i = 0; i < this.hashCount; i++) {
const idx = this._hash(item, i * 0x9e3779b9);
this.bits[idx >>> 3] |= (1 << (idx & 7));
}
}
has(item) {
for (let i = 0; i < this.hashCount; i++) {
const idx = this._hash(item, i * 0x9e3779b9);
if ((this.bits[idx >>> 3] & (1 << (idx & 7))) === 0) {
return false;
}
}
return true;
}
clear() {
this.bits.fill(0);
}
}
/**
* Time-bucketed bloom filter manager
* Rotates every 30 seconds to prevent unbounded growth
*/
class BloomFilterManager {
constructor() {
this.currentBloom = new BloomFilter();
this.previousBloom = new BloomFilter();
this.rotationInterval = null;
}
start() {
this.rotationInterval = setInterval(() => {
this.previousBloom = this.currentBloom;
this.currentBloom = new BloomFilter();
}, 30000);
}
stop() {
if (this.rotationInterval) {
clearInterval(this.rotationInterval);
this.rotationInterval = null;
}
}
hasRelayed(id, seq) {
const key = `${id}:${seq}`;
return this.currentBloom.has(key) || this.previousBloom.has(key);
}
markRelayed(id, seq) {
const key = `${id}:${seq}`;
this.currentBloom.add(key);
}
}
module.exports = { BloomFilter, BloomFilterManager };
+66
View File
@@ -0,0 +1,66 @@
class HyperLogLog {
constructor(precision = 10) {
this.precision = precision;
this.registerCount = 1 << precision;
this.registers = new Uint8Array(this.registerCount);
this.alphaMM = this._getAlpha() * this.registerCount * this.registerCount;
}
_getAlpha() {
switch (this.precision) {
case 4: return 0.673;
case 5: return 0.697;
case 6: return 0.709;
default: return 0.7213 / (1 + 1.079 / this.registerCount);
}
}
_hash(str) {
let h = 0x811c9dc5;
for (let i = 0; i < str.length; i++) {
h ^= str.charCodeAt(i);
h = (h * 0x01000193) >>> 0;
}
return h;
}
_countLeadingZeros(value, maxBits) {
if (value === 0) return maxBits;
let count = 0;
while ((value & (1 << (maxBits - 1 - count))) === 0 && count < maxBits) {
count++;
}
return count;
}
add(item) {
const hash = this._hash(item);
const registerIndex = hash >>> (32 - this.precision);
const remainingBits = hash << this.precision;
const leadingZeros = this._countLeadingZeros(remainingBits, 32 - this.precision) + 1;
if (leadingZeros > this.registers[registerIndex]) {
this.registers[registerIndex] = leadingZeros;
}
}
count() {
let harmonicSum = 0;
let zeroRegisters = 0;
for (let i = 0; i < this.registerCount; i++) {
harmonicSum += Math.pow(2, -this.registers[i]);
if (this.registers[i] === 0) zeroRegisters++;
}
let estimate = this.alphaMM / harmonicSum;
if (estimate <= 2.5 * this.registerCount && zeroRegisters > 0) {
estimate = this.registerCount * Math.log(this.registerCount / zeroRegisters);
}
return Math.round(estimate);
}
}
module.exports = { HyperLogLog };