mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-13 18:50:22 +00:00
Compare commits
19 Commits
0.52.1
...
brotli-mor
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3a967db57 | ||
|
|
13f18dc7ea | ||
|
|
8bc99eb0ce | ||
|
|
4f2f1e094a | ||
|
|
c58710bf4c | ||
|
|
4552ff25b5 | ||
|
|
87bce81d5a | ||
|
|
170d6652c8 | ||
|
|
819af84db6 | ||
|
|
96dfd67633 | ||
|
|
e48ab5afc2 | ||
|
|
70a0ee77f0 | ||
|
|
aabe818024 | ||
|
|
95ed02d99e | ||
|
|
380d5862f7 | ||
|
|
5f6e346a35 | ||
|
|
41321889bb | ||
|
|
6f37efb0ca | ||
|
|
e51ff34c89 |
@@ -17,36 +17,48 @@ from loguru import logger
|
||||
# Async version of update_worker
|
||||
# Processes jobs from AsyncSignalPriorityQueue instead of threaded queue
|
||||
|
||||
async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
async def async_update_worker(worker_id, q, notification_q, app, datastore, executor=None):
|
||||
"""
|
||||
Async worker function that processes watch check jobs from the queue.
|
||||
|
||||
|
||||
Args:
|
||||
worker_id: Unique identifier for this worker
|
||||
q: AsyncSignalPriorityQueue containing jobs to process
|
||||
notification_q: Standard queue for notifications
|
||||
app: Flask application instance
|
||||
datastore: Application datastore
|
||||
executor: ThreadPoolExecutor for queue operations (optional)
|
||||
"""
|
||||
# Set a descriptive name for this task
|
||||
task = asyncio.current_task()
|
||||
if task:
|
||||
task.set_name(f"async-worker-{worker_id}")
|
||||
|
||||
|
||||
logger.info(f"Starting async worker {worker_id}")
|
||||
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
update_handler = None
|
||||
watch = None
|
||||
|
||||
try:
|
||||
# Use native janus async interface - no threads needed!
|
||||
queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
|
||||
# Use sync interface via run_in_executor since each worker has its own event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
queued_item_data = await asyncio.wait_for(
|
||||
loop.run_in_executor(executor, q.get, True, 1.0), # block=True, timeout=1.0
|
||||
timeout=1.5
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# No jobs available, continue loop
|
||||
continue
|
||||
except Exception as e:
|
||||
# Handle expected Empty exception from queue timeout
|
||||
import queue
|
||||
if isinstance(e, queue.Empty):
|
||||
# Queue is empty, normal behavior - just continue
|
||||
continue
|
||||
|
||||
# Unexpected exception - log as critical
|
||||
logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
|
||||
|
||||
# Log queue health for debugging
|
||||
@@ -414,14 +426,13 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})
|
||||
|
||||
finally:
|
||||
|
||||
try:
|
||||
await update_handler.fetcher.quit(watch=watch)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||
|
||||
# Always cleanup - this runs whether there was an exception or not
|
||||
if uuid:
|
||||
try:
|
||||
if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
|
||||
await update_handler.fetcher.quit(watch=watch)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||
try:
|
||||
# Mark UUID as no longer being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
||||
@@ -460,7 +471,9 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
||||
except Exception as cleanup_error:
|
||||
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
||||
|
||||
|
||||
del(uuid)
|
||||
|
||||
# Brief pause before continuing to avoid tight error loops (only on error)
|
||||
if 'e' in locals():
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
@@ -92,7 +92,12 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
|
||||
# Be sure we're written fresh
|
||||
datastore.sync_to_json()
|
||||
zip_thread = threading.Thread(target=create_backup, args=(datastore.datastore_path, datastore.data.get("watching")))
|
||||
zip_thread = threading.Thread(
|
||||
target=create_backup,
|
||||
args=(datastore.datastore_path, datastore.data.get("watching")),
|
||||
daemon=True,
|
||||
name="BackupCreator"
|
||||
)
|
||||
zip_thread.start()
|
||||
backup_threads.append(zip_thread)
|
||||
flash(gettext("Backup building in background, check back in a few minutes."))
|
||||
|
||||
@@ -47,9 +47,6 @@ def construct_single_watch_routes(rss_blueprint, datastore):
|
||||
if len(dates) < 2:
|
||||
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
|
||||
|
||||
# Add uuid to watch for proper functioning
|
||||
watch['uuid'] = uuid
|
||||
|
||||
# Get the number of diffs to include (default: 5)
|
||||
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
|
||||
|
||||
@@ -101,7 +98,7 @@ def construct_single_watch_routes(rss_blueprint, datastore):
|
||||
date_index_from, date_index_to)
|
||||
|
||||
# Create and populate feed entry
|
||||
guid = f"{watch['uuid']}/{timestamp_to}"
|
||||
guid = f"{uuid}/{timestamp_to}"
|
||||
fe = fg.add_entry()
|
||||
title_suffix = f"Change @ {res['original_context']['change_datetime']}"
|
||||
populate_feed_entry(fe, watch, res.get('body', ''), guid, timestamp_to,
|
||||
|
||||
@@ -63,11 +63,8 @@ def construct_tag_routes(rss_blueprint, datastore):
|
||||
|
||||
# Only include unviewed watches
|
||||
if not watch.viewed:
|
||||
# Add uuid to watch for proper functioning
|
||||
watch['uuid'] = uuid
|
||||
|
||||
# Include a link to the diff page
|
||||
diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||
# Include a link to the diff page (use uuid from loop, don't modify watch dict)
|
||||
diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=uuid, _external=True)}
|
||||
|
||||
# Get watch label
|
||||
watch_label = get_watch_label(datastore, watch)
|
||||
|
||||
@@ -57,14 +57,15 @@ class SignalPriorityQueue(queue.PriorityQueue):
|
||||
def put(self, item, block=True, timeout=None):
|
||||
# Call the parent's put method first
|
||||
super().put(item, block, timeout)
|
||||
|
||||
|
||||
# After putting the item in the queue, check if it has a UUID and emit signal
|
||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
||||
uuid = item.item['uuid']
|
||||
# Get the signal and send it if it exists
|
||||
watch_check_update = signal('watch_check_update')
|
||||
if watch_check_update:
|
||||
# Send the watch_uuid parameter
|
||||
# NOTE: This would block other workers from .put/.get while this signal sends
|
||||
# Signal handlers may iterate the queue/datastore while holding locks
|
||||
watch_check_update.send(watch_uuid=uuid)
|
||||
|
||||
# Send queue_length signal with current queue size
|
||||
@@ -312,14 +313,15 @@ class AsyncSignalPriorityQueue(asyncio.PriorityQueue):
|
||||
async def put(self, item):
|
||||
# Call the parent's put method first
|
||||
await super().put(item)
|
||||
|
||||
|
||||
# After putting the item in the queue, check if it has a UUID and emit signal
|
||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
||||
uuid = item.item['uuid']
|
||||
# Get the signal and send it if it exists
|
||||
watch_check_update = signal('watch_check_update')
|
||||
if watch_check_update:
|
||||
# Send the watch_uuid parameter
|
||||
# NOTE: This would block other workers from .put/.get while this signal sends
|
||||
# Signal handlers may iterate the queue/datastore while holding locks
|
||||
watch_check_update.send(watch_uuid=uuid)
|
||||
|
||||
# Send queue_length signal with current queue size
|
||||
|
||||
@@ -863,13 +863,13 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
|
||||
|
||||
# @todo handle ctrl break
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
||||
threading.Thread(target=notification_runner).start()
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
|
||||
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
|
||||
|
||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
# Check for new release version, but not when running in test/build or pytest
|
||||
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
||||
threading.Thread(target=check_for_new_version).start()
|
||||
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
|
||||
|
||||
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
|
||||
# This avoids circular dependencies
|
||||
|
||||
@@ -10,9 +10,13 @@ from pathlib import Path
|
||||
from loguru import logger
|
||||
|
||||
from .. import jinja2_custom as safe_jinja
|
||||
from ..diff import ADDED_PLACEMARKER_OPEN
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
|
||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
|
||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||
|
||||
def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
"""
|
||||
@@ -29,6 +33,7 @@ def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
try:
|
||||
# Receive data from parent process via pipe (avoids pickle overhead)
|
||||
contents = conn.recv()
|
||||
logger.debug(f"Starting brotli compression of {len(contents)} bytes.")
|
||||
|
||||
if mode is not None:
|
||||
compressed_data = brotli.compress(contents, mode=mode)
|
||||
@@ -40,9 +45,10 @@ def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
|
||||
# Send success status back
|
||||
conn.send(True)
|
||||
logger.debug(f"Finished brotli compression - From {len(contents)} to {len(compressed_data)} bytes.")
|
||||
# No need for explicit cleanup - process exit frees all memory
|
||||
except Exception as e:
|
||||
logger.error(f"Brotli compression worker failed: {e}")
|
||||
logger.critical(f"Brotli compression worker failed: {e}")
|
||||
conn.send(False)
|
||||
finally:
|
||||
conn.close()
|
||||
@@ -66,7 +72,6 @@ def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_
|
||||
Raises:
|
||||
Exception: if compression fails and fallback_uncompressed is False
|
||||
"""
|
||||
import brotli
|
||||
import multiprocessing
|
||||
import sys
|
||||
|
||||
@@ -144,11 +149,6 @@ def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_
|
||||
else:
|
||||
raise Exception(f"Brotli compression subprocess failed for {filepath}")
|
||||
|
||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
|
||||
|
||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||
|
||||
class model(watch_base):
|
||||
__newest_history_key = None
|
||||
@@ -492,7 +492,6 @@ class model(watch_base):
|
||||
|
||||
self.ensure_data_dir_exists()
|
||||
|
||||
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
|
||||
|
||||
# Binary data - detect file type and save without compression
|
||||
@@ -516,7 +515,7 @@ class model(watch_base):
|
||||
|
||||
# Text data - use brotli compression if enabled and above threshold
|
||||
else:
|
||||
if not skip_brotli and len(contents) > threshold:
|
||||
if not skip_brotli and len(contents) > BROTLI_COMPRESS_SIZE_THRESHOLD:
|
||||
# Compressed text
|
||||
import brotli
|
||||
snapshot_fname = f"{snapshot_id}.txt.br"
|
||||
|
||||
@@ -86,6 +86,7 @@ class RecheckPriorityQueue:
|
||||
|
||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
||||
"""Thread-safe sync get with priority ordering"""
|
||||
import queue
|
||||
try:
|
||||
# Wait for notification
|
||||
self.sync_q.get(block=block, timeout=timeout)
|
||||
@@ -103,8 +104,11 @@ class RecheckPriorityQueue:
|
||||
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
|
||||
return item
|
||||
|
||||
except queue.Empty:
|
||||
# Queue is empty with timeout - expected behavior, re-raise without logging
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
|
||||
# Re-raise without logging - caller (worker) will handle and log appropriately
|
||||
raise
|
||||
|
||||
# ASYNC INTERFACE (for workers)
|
||||
|
||||
@@ -98,11 +98,12 @@ pytest -vv -s --maxfail=1 tests/test_rss.py
|
||||
pytest -vv -s --maxfail=1 tests/test_unique_lines.py
|
||||
|
||||
# Try high concurrency
|
||||
FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
|
||||
FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s
|
||||
|
||||
# Check file:// will pickup a file when enabled
|
||||
echo "Hello world" > /tmp/test-file.txt
|
||||
ALLOW_FILE_URI=yes pytest -vv -s tests/test_security.py
|
||||
|
||||
|
||||
|
||||
# Run it again so that brotli kicks in
|
||||
TEST_WITH_BROTLI=1 SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=100 FETCH_WORKERS=20 pytest tests/test_history_consistency.py -vv -l -s
|
||||
|
||||
@@ -53,7 +53,7 @@
|
||||
}
|
||||
}
|
||||
|
||||
.language-modal {
|
||||
#language-modal {
|
||||
.language-list {
|
||||
.lang-option {
|
||||
display: inline-block;
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -186,7 +186,7 @@ class ChangeDetectionStore:
|
||||
# Finally start the thread that will manage periodic data saves to JSON
|
||||
# Only start if thread is not already running (reload_state might be called multiple times)
|
||||
if not self.save_data_thread or not self.save_data_thread.is_alive():
|
||||
self.save_data_thread = threading.Thread(target=self.save_datastore)
|
||||
self.save_data_thread = threading.Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
|
||||
self.save_data_thread.start()
|
||||
|
||||
def rehydrate_entity(self, uuid, entity, processor_override=None):
|
||||
|
||||
@@ -17,6 +17,8 @@ _MAP = {
|
||||
|
||||
|
||||
def strtobool(value):
|
||||
if not value:
|
||||
return False
|
||||
try:
|
||||
return _MAP[str(value).lower()]
|
||||
except KeyError:
|
||||
|
||||
@@ -270,3 +270,6 @@ def app(request, datastore_path):
|
||||
|
||||
request.addfinalizer(teardown)
|
||||
yield app
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -206,11 +206,10 @@ def test_regex_error_handling(client, live_server, measure_memory_usage, datasto
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
time.sleep(0.2)
|
||||
### test regex error handling
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={"extract_text": '/something bad\d{3/XYZ',
|
||||
"url": test_url,
|
||||
"fetch_backend": "html_requests",
|
||||
|
||||
@@ -4,25 +4,47 @@ import time
|
||||
import os
|
||||
import json
|
||||
from flask import url_for
|
||||
from loguru import logger
|
||||
from .. import strtobool
|
||||
from .util import wait_for_all_checks, delete_all_watches
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import brotli
|
||||
|
||||
|
||||
def test_consistent_history(client, live_server, measure_memory_usage, datastore_path):
|
||||
# live_server_setup(live_server) # Setup on conftest per function
|
||||
workers = int(os.getenv("FETCH_WORKERS", 10))
|
||||
r = range(1, 10+workers)
|
||||
|
||||
for one in r:
|
||||
test_url = url_for('test_endpoint', content_type="text/html", content=str(one), _external=True)
|
||||
res = client.post(
|
||||
url_for("imports.import_page"),
|
||||
data={"urls": test_url},
|
||||
follow_redirects=True
|
||||
)
|
||||
uuids = set()
|
||||
sys_fetch_workers = int(os.getenv("FETCH_WORKERS", 10))
|
||||
workers = range(1, sys_fetch_workers)
|
||||
now = time.time()
|
||||
|
||||
assert b"1 Imported" in res.data
|
||||
for one in workers:
|
||||
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||
# A very long string that WILL trigger Brotli compression of the snapshot
|
||||
# BROTLI_COMPRESS_SIZE_THRESHOLD should be set to say 200
|
||||
from ..model.Watch import BROTLI_COMPRESS_SIZE_THRESHOLD
|
||||
content = str(one) + "x" + str(one) * (BROTLI_COMPRESS_SIZE_THRESHOLD + 10)
|
||||
else:
|
||||
# Just enough to test datastore
|
||||
content = str(one)+'x'
|
||||
|
||||
test_url = url_for('test_endpoint', content_type="text/html", content=content, _external=True)
|
||||
uuids.add(client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'title': str(one)}))
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
wait_for_all_checks(client)
|
||||
duration = time.time() - now
|
||||
per_worker = duration/sys_fetch_workers
|
||||
if sys_fetch_workers < 20:
|
||||
per_worker_threshold=0.6
|
||||
elif sys_fetch_workers < 50:
|
||||
per_worker_threshold = 0.8
|
||||
else:
|
||||
per_worker_threshold = 1.5
|
||||
|
||||
logger.debug(f"All fetched in {duration:.2f}s, {per_worker}s per worker")
|
||||
# Problematic on github
|
||||
#assert per_worker < per_worker_threshold, f"If concurrency is working good, no blocking async problems, each worker ({sys_fetch_workers} workers) should have done his job in under {per_worker_threshold}s, got {per_worker:.2f}s per worker, total duration was {duration:.2f}s"
|
||||
|
||||
# Essentially just triggers the DB write/update
|
||||
res = client.post(
|
||||
@@ -34,7 +56,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
)
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
|
||||
# Wait for the sync DB save to happen
|
||||
time.sleep(2)
|
||||
|
||||
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
||||
@@ -44,14 +66,18 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
json_obj = json.load(f)
|
||||
|
||||
# assert the right amount of watches was found in the JSON
|
||||
assert len(json_obj['watching']) == len(r), "Correct number of watches was found in the JSON"
|
||||
i=0
|
||||
assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON"
|
||||
|
||||
i = 0
|
||||
# each one should have a history.txt containing just one line
|
||||
for w in json_obj['watching'].keys():
|
||||
i+=1
|
||||
i += 1
|
||||
history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt')
|
||||
assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}"
|
||||
|
||||
# Should be no errors (could be from brotli etc)
|
||||
assert not live_server.app.config['DATASTORE'].data['watching'][w].get('last_error')
|
||||
|
||||
# Same like in model.Watch
|
||||
with open(history_txt_index_file, "r") as f:
|
||||
tmp_history = dict(i.strip().split(',', 2) for i in f.readlines())
|
||||
@@ -63,15 +89,21 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
# Find the snapshot one
|
||||
for fname in files_in_watch_dir:
|
||||
if fname != 'history.txt' and 'html' not in fname:
|
||||
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||
assert fname.endswith('.br'), "Forced TEST_WITH_BROTLI then it should be a .br filename"
|
||||
|
||||
full_snapshot_history_path = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname)
|
||||
# contents should match what we requested as content returned from the test url
|
||||
with open(os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname), 'r') as snapshot_f:
|
||||
contents = snapshot_f.read()
|
||||
watch_url = json_obj['watching'][w]['url']
|
||||
u = urlparse(watch_url)
|
||||
q = parse_qs(u[4])
|
||||
assert q['content'][0] == contents.strip(), f"Snapshot file {fname} should contain {q['content'][0]}"
|
||||
|
||||
if fname.endswith('.br'):
|
||||
with open(full_snapshot_history_path, 'rb') as f:
|
||||
contents = brotli.decompress(f.read()).decode('utf-8')
|
||||
else:
|
||||
with open(full_snapshot_history_path, 'r') as snapshot_f:
|
||||
contents = snapshot_f.read()
|
||||
|
||||
watch_title = json_obj['watching'][w]['title']
|
||||
assert json_obj['watching'][w]['title'], "Watch should have a title set"
|
||||
assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'"
|
||||
|
||||
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import time
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||
from .util import live_server_setup, wait_for_all_checks, wait_for_watch_history, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||
import os
|
||||
|
||||
|
||||
@@ -87,6 +87,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
||||
# Wait for initial checks to complete
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Ensure initial snapshots are saved
|
||||
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Watches did not save initial snapshots"
|
||||
|
||||
# Trigger a change
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
|
||||
@@ -94,6 +97,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Ensure all watches have sufficient history for RSS generation
|
||||
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "Watches did not accumulate sufficient history"
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
assert rss_token is not None
|
||||
@@ -216,11 +222,13 @@ def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, data
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Initial snapshots not saved"
|
||||
|
||||
# Trigger changes
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "History not accumulated"
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import sys
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from changedetectionio import html_tools
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import html_tools
|
||||
|
||||
# test generation guide.
|
||||
# 1. Do not include encoding in the xml declaration if the test object is a str type.
|
||||
|
||||
@@ -164,14 +164,45 @@ def wait_for_all_checks(client=None):
|
||||
if q_length == 0 and not any_workers_busy:
|
||||
if empty_since is None:
|
||||
empty_since = time.time()
|
||||
elif time.time() - empty_since >= 0.15: # Shorter wait
|
||||
# Brief stabilization period for async workers
|
||||
elif time.time() - empty_since >= 0.3:
|
||||
break
|
||||
else:
|
||||
empty_since = None
|
||||
|
||||
|
||||
attempt += 1
|
||||
time.sleep(0.3)
|
||||
|
||||
def wait_for_watch_history(client, min_history_count=2, timeout=10):
|
||||
"""
|
||||
Wait for watches to have sufficient history entries.
|
||||
Useful after wait_for_all_checks() when you need to ensure history is populated.
|
||||
|
||||
Args:
|
||||
client: Test client with access to datastore
|
||||
min_history_count: Minimum number of history entries required
|
||||
timeout: Maximum time to wait in seconds
|
||||
"""
|
||||
datastore = client.application.config.get('DATASTORE')
|
||||
start_time = time.time()
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
all_have_history = True
|
||||
for uuid, watch in datastore.data['watching'].items():
|
||||
history_count = len(watch.history.keys())
|
||||
if history_count < min_history_count:
|
||||
all_have_history = False
|
||||
break
|
||||
|
||||
if all_have_history:
|
||||
return True
|
||||
|
||||
time.sleep(0.2)
|
||||
|
||||
# Timeout - return False
|
||||
return False
|
||||
|
||||
|
||||
# Replaced by new_live_server_setup and calling per function scope in conftest.py
|
||||
def live_server_setup(live_server):
|
||||
return True
|
||||
@@ -189,6 +220,8 @@ def new_live_server_setup(live_server):
|
||||
|
||||
@live_server.app.route('/test-endpoint')
|
||||
def test_endpoint():
|
||||
from loguru import logger
|
||||
logger.debug(f"/test-endpoint hit {request}")
|
||||
ctype = request.args.get('content_type')
|
||||
status_code = request.args.get('status_code')
|
||||
content = request.args.get('content') or None
|
||||
|
||||
@@ -2,19 +2,18 @@
|
||||
Worker management module for changedetection.io
|
||||
|
||||
Handles asynchronous workers for dynamic worker scaling.
|
||||
Sync worker support has been removed in favor of async-only architecture.
|
||||
Each worker runs in its own thread with its own event loop for isolation.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from loguru import logger
|
||||
|
||||
# Global worker state
|
||||
running_async_tasks = []
|
||||
async_loop = None
|
||||
async_loop_thread = None
|
||||
# Global worker state - each worker has its own thread and event loop
|
||||
worker_threads = [] # List of WorkerThread objects
|
||||
|
||||
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
||||
currently_processing_uuids = {}
|
||||
@@ -22,89 +21,118 @@ currently_processing_uuids = {}
|
||||
# Configuration - async workers only
|
||||
USE_ASYNC_WORKERS = True
|
||||
|
||||
# Custom ThreadPoolExecutor for queue operations with named threads
|
||||
# Scale executor threads with FETCH_WORKERS to avoid bottleneck at high concurrency
|
||||
_max_executor_workers = max(50, int(os.getenv("FETCH_WORKERS", "10")))
|
||||
queue_executor = ThreadPoolExecutor(
|
||||
max_workers=_max_executor_workers,
|
||||
thread_name_prefix="QueueGetter-"
|
||||
)
|
||||
|
||||
def start_async_event_loop():
|
||||
"""Start a dedicated event loop for async workers in a separate thread"""
|
||||
global async_loop
|
||||
logger.info("Starting async event loop for workers")
|
||||
|
||||
try:
|
||||
# Create a new event loop for this thread
|
||||
async_loop = asyncio.new_event_loop()
|
||||
# Set it as the event loop for this thread
|
||||
asyncio.set_event_loop(async_loop)
|
||||
|
||||
logger.debug(f"Event loop created and set: {async_loop}")
|
||||
|
||||
# Run the event loop forever
|
||||
async_loop.run_forever()
|
||||
except Exception as e:
|
||||
logger.error(f"Async event loop error: {e}")
|
||||
finally:
|
||||
# Clean up
|
||||
if async_loop and not async_loop.is_closed():
|
||||
async_loop.close()
|
||||
async_loop = None
|
||||
logger.info("Async event loop stopped")
|
||||
|
||||
class WorkerThread:
|
||||
"""Container for a worker thread with its own event loop"""
|
||||
def __init__(self, worker_id, update_q, notification_q, app, datastore):
|
||||
self.worker_id = worker_id
|
||||
self.update_q = update_q
|
||||
self.notification_q = notification_q
|
||||
self.app = app
|
||||
self.datastore = datastore
|
||||
self.thread = None
|
||||
self.loop = None
|
||||
self.running = False
|
||||
|
||||
def run(self):
|
||||
"""Run the worker in its own event loop"""
|
||||
try:
|
||||
# Create a new event loop for this thread
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
self.running = True
|
||||
|
||||
# Run the worker coroutine
|
||||
self.loop.run_until_complete(
|
||||
start_single_async_worker(
|
||||
self.worker_id,
|
||||
self.update_q,
|
||||
self.notification_q,
|
||||
self.app,
|
||||
self.datastore,
|
||||
queue_executor
|
||||
)
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
# Normal shutdown - worker was cancelled
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
if not in_pytest:
|
||||
logger.info(f"Worker {self.worker_id} shutting down gracefully")
|
||||
except RuntimeError as e:
|
||||
# Ignore expected shutdown errors
|
||||
if "Event loop stopped" not in str(e) and "Event loop is closed" not in str(e):
|
||||
logger.error(f"Worker {self.worker_id} runtime error: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Worker {self.worker_id} thread error: {e}")
|
||||
finally:
|
||||
# Clean up
|
||||
if self.loop and not self.loop.is_closed():
|
||||
self.loop.close()
|
||||
self.running = False
|
||||
self.loop = None
|
||||
|
||||
def start(self):
|
||||
"""Start the worker thread"""
|
||||
self.thread = threading.Thread(
|
||||
target=self.run,
|
||||
daemon=True,
|
||||
name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"
|
||||
)
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the worker thread"""
|
||||
if self.loop and self.running:
|
||||
try:
|
||||
# Signal the loop to stop
|
||||
self.loop.call_soon_threadsafe(self.loop.stop)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
if self.thread and self.thread.is_alive():
|
||||
self.thread.join(timeout=2.0)
|
||||
|
||||
|
||||
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
|
||||
"""Start the async worker management system"""
|
||||
global async_loop_thread, async_loop, running_async_tasks, currently_processing_uuids
|
||||
|
||||
# Clear any stale UUID tracking state
|
||||
"""Start async workers, each with its own thread and event loop for isolation"""
|
||||
global worker_threads, currently_processing_uuids
|
||||
|
||||
# Clear any stale state
|
||||
currently_processing_uuids.clear()
|
||||
|
||||
# Start the event loop in a separate thread
|
||||
async_loop_thread = threading.Thread(target=start_async_event_loop, daemon=True)
|
||||
async_loop_thread.start()
|
||||
|
||||
# Wait for the loop to be available (with timeout for safety)
|
||||
max_wait_time = 5.0
|
||||
wait_start = time.time()
|
||||
while async_loop is None and (time.time() - wait_start) < max_wait_time:
|
||||
time.sleep(0.1)
|
||||
|
||||
if async_loop is None:
|
||||
logger.error("Failed to start async event loop within timeout")
|
||||
return
|
||||
|
||||
# Additional brief wait to ensure loop is running
|
||||
time.sleep(0.2)
|
||||
|
||||
# Start async workers
|
||||
logger.info(f"Starting {n_workers} async workers")
|
||||
|
||||
# Start each worker in its own thread with its own event loop
|
||||
logger.info(f"Starting {n_workers} async workers (isolated threads)")
|
||||
for i in range(n_workers):
|
||||
try:
|
||||
# Use a factory function to create named worker coroutines
|
||||
def create_named_worker(worker_id):
|
||||
async def named_worker():
|
||||
task = asyncio.current_task()
|
||||
if task:
|
||||
task.set_name(f"async-worker-{worker_id}")
|
||||
return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
|
||||
return named_worker()
|
||||
|
||||
task_future = asyncio.run_coroutine_threadsafe(create_named_worker(i), async_loop)
|
||||
running_async_tasks.append(task_future)
|
||||
except RuntimeError as e:
|
||||
worker = WorkerThread(i, update_q, notification_q, app, datastore)
|
||||
worker.start()
|
||||
worker_threads.append(worker)
|
||||
# No sleep needed - threads start independently and asynchronously
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start async worker {i}: {e}")
|
||||
continue
|
||||
|
||||
|
||||
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore):
|
||||
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
|
||||
"""Start a single async worker with auto-restart capability"""
|
||||
from changedetectionio.async_update_worker import async_update_worker
|
||||
|
||||
|
||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
try:
|
||||
if not in_pytest:
|
||||
logger.info(f"Starting async worker {worker_id}")
|
||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore)
|
||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
|
||||
# If we reach here, worker exited cleanly
|
||||
if not in_pytest:
|
||||
logger.info(f"Async worker {worker_id} exited cleanly")
|
||||
@@ -131,39 +159,38 @@ def start_workers(n_workers, update_q, notification_q, app, datastore):
|
||||
|
||||
def add_worker(update_q, notification_q, app, datastore):
|
||||
"""Add a new async worker (for dynamic scaling)"""
|
||||
global running_async_tasks
|
||||
|
||||
if not async_loop:
|
||||
logger.error("Async loop not running, cannot add worker")
|
||||
return False
|
||||
|
||||
worker_id = len(running_async_tasks)
|
||||
global worker_threads
|
||||
|
||||
worker_id = len(worker_threads)
|
||||
logger.info(f"Adding async worker {worker_id}")
|
||||
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
return True
|
||||
|
||||
try:
|
||||
worker = WorkerThread(worker_id, update_q, notification_q, app, datastore)
|
||||
worker.start()
|
||||
worker_threads.append(worker)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add worker {worker_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def remove_worker():
|
||||
"""Remove an async worker (for dynamic scaling)"""
|
||||
global running_async_tasks
|
||||
|
||||
if not running_async_tasks:
|
||||
global worker_threads
|
||||
|
||||
if not worker_threads:
|
||||
return False
|
||||
|
||||
# Cancel the last worker
|
||||
task_future = running_async_tasks.pop()
|
||||
task_future.cancel()
|
||||
logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining")
|
||||
|
||||
# Stop the last worker
|
||||
worker = worker_threads.pop()
|
||||
worker.stop()
|
||||
logger.info(f"Removed async worker, {len(worker_threads)} workers remaining")
|
||||
return True
|
||||
|
||||
|
||||
def get_worker_count():
|
||||
"""Get current number of async workers"""
|
||||
return len(running_async_tasks)
|
||||
return len(worker_threads)
|
||||
|
||||
|
||||
def get_running_uuids():
|
||||
@@ -249,38 +276,21 @@ def queue_item_async_safe(update_q, item, silent=False):
|
||||
|
||||
def shutdown_workers():
|
||||
"""Shutdown all async workers fast and aggressively"""
|
||||
global async_loop, async_loop_thread, running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
|
||||
if not in_pytest:
|
||||
logger.info("Fast shutdown of async workers initiated...")
|
||||
|
||||
# Cancel all async tasks immediately
|
||||
for task_future in running_async_tasks:
|
||||
if not task_future.done():
|
||||
task_future.cancel()
|
||||
|
||||
# Stop the async event loop immediately
|
||||
if async_loop and not async_loop.is_closed():
|
||||
try:
|
||||
async_loop.call_soon_threadsafe(async_loop.stop)
|
||||
except RuntimeError:
|
||||
# Loop might already be stopped
|
||||
pass
|
||||
|
||||
running_async_tasks.clear()
|
||||
async_loop = None
|
||||
|
||||
# Give async thread minimal time to finish, then continue
|
||||
if async_loop_thread and async_loop_thread.is_alive():
|
||||
async_loop_thread.join(timeout=1.0) # Only 1 second timeout
|
||||
if async_loop_thread.is_alive() and not in_pytest:
|
||||
logger.info("Async thread still running after timeout - continuing with shutdown")
|
||||
async_loop_thread = None
|
||||
|
||||
|
||||
# Stop all worker threads
|
||||
for worker in worker_threads:
|
||||
worker.stop()
|
||||
|
||||
worker_threads.clear()
|
||||
|
||||
if not in_pytest:
|
||||
logger.info("Async workers fast shutdown complete")
|
||||
|
||||
@@ -290,69 +300,57 @@ def shutdown_workers():
|
||||
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||
"""
|
||||
Dynamically adjust the number of async workers.
|
||||
|
||||
|
||||
Args:
|
||||
new_count: Target number of workers
|
||||
update_q, notification_q, app, datastore: Required for adding new workers
|
||||
|
||||
|
||||
Returns:
|
||||
dict: Status of the adjustment operation
|
||||
"""
|
||||
global running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
current_count = get_worker_count()
|
||||
|
||||
|
||||
if new_count == current_count:
|
||||
return {
|
||||
'status': 'no_change',
|
||||
'message': f'Worker count already at {current_count}',
|
||||
'current_count': current_count
|
||||
}
|
||||
|
||||
|
||||
if new_count > current_count:
|
||||
# Add workers
|
||||
workers_to_add = new_count - current_count
|
||||
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
|
||||
|
||||
|
||||
if not all([update_q, notification_q, app, datastore]):
|
||||
return {
|
||||
'status': 'error',
|
||||
'message': 'Missing required parameters to add workers',
|
||||
'current_count': current_count
|
||||
}
|
||||
|
||||
|
||||
for i in range(workers_to_add):
|
||||
worker_id = len(running_async_tasks)
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
||||
async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
|
||||
add_worker(update_q, notification_q, app, datastore)
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'message': f'Added {workers_to_add} workers',
|
||||
'previous_count': current_count,
|
||||
'current_count': new_count
|
||||
'current_count': len(worker_threads)
|
||||
}
|
||||
|
||||
|
||||
else:
|
||||
# Remove workers
|
||||
workers_to_remove = current_count - new_count
|
||||
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
|
||||
|
||||
|
||||
removed_count = 0
|
||||
for _ in range(workers_to_remove):
|
||||
if running_async_tasks:
|
||||
task_future = running_async_tasks.pop()
|
||||
task_future.cancel()
|
||||
# Wait for the task to actually stop
|
||||
try:
|
||||
task_future.result(timeout=5) # 5 second timeout
|
||||
except Exception:
|
||||
pass # Task was cancelled, which is expected
|
||||
if remove_worker():
|
||||
removed_count += 1
|
||||
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'message': f'Removed {removed_count} workers',
|
||||
@@ -367,72 +365,58 @@ def get_worker_status():
|
||||
'worker_type': 'async',
|
||||
'worker_count': get_worker_count(),
|
||||
'running_uuids': get_running_uuids(),
|
||||
'async_loop_running': async_loop is not None,
|
||||
'active_threads': sum(1 for w in worker_threads if w.thread and w.thread.is_alive()),
|
||||
}
|
||||
|
||||
|
||||
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||
"""
|
||||
Check if the expected number of async workers are running and restart any missing ones.
|
||||
|
||||
|
||||
Args:
|
||||
expected_count: Expected number of workers
|
||||
update_q, notification_q, app, datastore: Required for restarting workers
|
||||
|
||||
|
||||
Returns:
|
||||
dict: Health check results
|
||||
"""
|
||||
global running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
current_count = get_worker_count()
|
||||
|
||||
if current_count == expected_count:
|
||||
|
||||
# Check which workers are actually alive
|
||||
alive_count = sum(1 for w in worker_threads if w.thread and w.thread.is_alive())
|
||||
|
||||
if alive_count == expected_count:
|
||||
return {
|
||||
'status': 'healthy',
|
||||
'expected_count': expected_count,
|
||||
'actual_count': current_count,
|
||||
'actual_count': alive_count,
|
||||
'message': f'All {expected_count} async workers running'
|
||||
}
|
||||
|
||||
# Check for crashed async workers
|
||||
|
||||
# Find dead workers
|
||||
dead_workers = []
|
||||
alive_count = 0
|
||||
|
||||
for i, task_future in enumerate(running_async_tasks[:]):
|
||||
if task_future.done():
|
||||
try:
|
||||
result = task_future.result()
|
||||
dead_workers.append(i)
|
||||
logger.warning(f"Async worker {i} completed unexpectedly")
|
||||
except Exception as e:
|
||||
dead_workers.append(i)
|
||||
logger.error(f"Async worker {i} crashed: {e}")
|
||||
else:
|
||||
alive_count += 1
|
||||
|
||||
for i, worker in enumerate(worker_threads[:]):
|
||||
if not worker.thread or not worker.thread.is_alive():
|
||||
dead_workers.append(i)
|
||||
logger.warning(f"Async worker {worker.worker_id} thread is dead")
|
||||
|
||||
# Remove dead workers from tracking
|
||||
for i in reversed(dead_workers):
|
||||
if i < len(running_async_tasks):
|
||||
running_async_tasks.pop(i)
|
||||
|
||||
if i < len(worker_threads):
|
||||
worker_threads.pop(i)
|
||||
|
||||
missing_workers = expected_count - alive_count
|
||||
restarted_count = 0
|
||||
|
||||
|
||||
if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
|
||||
logger.info(f"Restarting {missing_workers} crashed async workers")
|
||||
|
||||
|
||||
for i in range(missing_workers):
|
||||
worker_id = alive_count + i
|
||||
try:
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
||||
async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
if add_worker(update_q, notification_q, app, datastore):
|
||||
restarted_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to restart worker {worker_id}: {e}")
|
||||
|
||||
|
||||
return {
|
||||
'status': 'repaired' if restarted_count > 0 else 'degraded',
|
||||
'expected_count': expected_count,
|
||||
|
||||
Reference in New Issue
Block a user