mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-15 11:40:21 +00:00
Compare commits
10 Commits
brotli-mor
...
0.52.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51d531d732 | ||
|
|
e40c4ca97d | ||
|
|
b8ede70f3a | ||
|
|
50b349b464 | ||
|
|
67d097cca7 | ||
|
|
494385a379 | ||
|
|
c2ee84b753 | ||
|
|
c1e0296cda | ||
|
|
f041223c38 | ||
|
|
d36738d7ef |
@@ -11,6 +11,7 @@ recursive-include changedetectionio/realtime *
|
||||
recursive-include changedetectionio/static *
|
||||
recursive-include changedetectionio/templates *
|
||||
recursive-include changedetectionio/tests *
|
||||
recursive-include changedetectionio/translations *
|
||||
recursive-include changedetectionio/widgets *
|
||||
prune changedetectionio/static/package-lock.json
|
||||
prune changedetectionio/static/styles/node_modules
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
# Semver means never use .01, or 00. Should be .1.
|
||||
__version__ = '0.52.1'
|
||||
__version__ = '0.52.4'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
from blinker import signal
|
||||
|
||||
from .processors.exceptions import ProcessorException
|
||||
import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
|
||||
from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
|
||||
@@ -9,7 +8,6 @@ from changedetectionio.flask_app import watch_check_update
|
||||
import asyncio
|
||||
import importlib
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
|
||||
from loguru import logger
|
||||
@@ -17,36 +15,48 @@ from loguru import logger
|
||||
# Async version of update_worker
|
||||
# Processes jobs from AsyncSignalPriorityQueue instead of threaded queue
|
||||
|
||||
async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
async def async_update_worker(worker_id, q, notification_q, app, datastore, executor=None):
|
||||
"""
|
||||
Async worker function that processes watch check jobs from the queue.
|
||||
|
||||
|
||||
Args:
|
||||
worker_id: Unique identifier for this worker
|
||||
q: AsyncSignalPriorityQueue containing jobs to process
|
||||
notification_q: Standard queue for notifications
|
||||
app: Flask application instance
|
||||
datastore: Application datastore
|
||||
executor: ThreadPoolExecutor for queue operations (optional)
|
||||
"""
|
||||
# Set a descriptive name for this task
|
||||
task = asyncio.current_task()
|
||||
if task:
|
||||
task.set_name(f"async-worker-{worker_id}")
|
||||
|
||||
|
||||
logger.info(f"Starting async worker {worker_id}")
|
||||
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
update_handler = None
|
||||
watch = None
|
||||
|
||||
try:
|
||||
# Use native janus async interface - no threads needed!
|
||||
queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
|
||||
# Use sync interface via run_in_executor since each worker has its own event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
queued_item_data = await asyncio.wait_for(
|
||||
loop.run_in_executor(executor, q.get, True, 1.0), # block=True, timeout=1.0
|
||||
timeout=1.5
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# No jobs available, continue loop
|
||||
continue
|
||||
except Exception as e:
|
||||
# Handle expected Empty exception from queue timeout
|
||||
import queue
|
||||
if isinstance(e, queue.Empty):
|
||||
# Queue is empty, normal behavior - just continue
|
||||
continue
|
||||
|
||||
# Unexpected exception - log as critical
|
||||
logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
|
||||
|
||||
# Log queue health for debugging
|
||||
@@ -414,14 +424,13 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})
|
||||
|
||||
finally:
|
||||
|
||||
try:
|
||||
await update_handler.fetcher.quit(watch=watch)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||
|
||||
# Always cleanup - this runs whether there was an exception or not
|
||||
if uuid:
|
||||
try:
|
||||
if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
|
||||
await update_handler.fetcher.quit(watch=watch)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||
try:
|
||||
# Mark UUID as no longer being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
||||
@@ -460,7 +469,9 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
||||
except Exception as cleanup_error:
|
||||
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
||||
|
||||
|
||||
del(uuid)
|
||||
|
||||
# Brief pause before continuing to avoid tight error loops (only on error)
|
||||
if 'e' in locals():
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
@@ -92,7 +92,12 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
|
||||
# Be sure we're written fresh
|
||||
datastore.sync_to_json()
|
||||
zip_thread = threading.Thread(target=create_backup, args=(datastore.datastore_path, datastore.data.get("watching")))
|
||||
zip_thread = threading.Thread(
|
||||
target=create_backup,
|
||||
args=(datastore.datastore_path, datastore.data.get("watching")),
|
||||
daemon=True,
|
||||
name="BackupCreator"
|
||||
)
|
||||
zip_thread.start()
|
||||
backup_threads.append(zip_thread)
|
||||
flash(gettext("Backup building in background, check back in a few minutes."))
|
||||
|
||||
@@ -21,31 +21,154 @@ from changedetectionio.flask_app import login_optionally_required
|
||||
from loguru import logger
|
||||
|
||||
browsersteps_sessions = {}
|
||||
browsersteps_watch_to_session = {} # Maps watch_uuid -> browsersteps_session_id
|
||||
io_interface_context = None
|
||||
import json
|
||||
import hashlib
|
||||
from flask import Response
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
|
||||
def run_async_in_browser_loop(coro):
|
||||
"""Run async coroutine using the existing async worker event loop"""
|
||||
from changedetectionio import worker_handler
|
||||
|
||||
# Use the existing async worker event loop instead of creating a new one
|
||||
if worker_handler.USE_ASYNC_WORKERS and worker_handler.async_loop and not worker_handler.async_loop.is_closed():
|
||||
logger.debug("Browser steps using existing async worker event loop")
|
||||
future = asyncio.run_coroutine_threadsafe(coro, worker_handler.async_loop)
|
||||
return future.result()
|
||||
else:
|
||||
# Fallback: create a new event loop (for sync workers or if async loop not available)
|
||||
logger.debug("Browser steps creating temporary event loop")
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
# Dedicated event loop for ALL browser steps sessions
|
||||
_browser_steps_loop = None
|
||||
_browser_steps_thread = None
|
||||
_browser_steps_loop_lock = threading.Lock()
|
||||
|
||||
def _start_browser_steps_loop():
|
||||
"""Start a dedicated event loop for browser steps in its own thread"""
|
||||
global _browser_steps_loop
|
||||
|
||||
# Create and set the event loop for this thread
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
_browser_steps_loop = loop
|
||||
|
||||
logger.debug("Browser steps event loop started")
|
||||
|
||||
try:
|
||||
# Run the loop forever - handles all browsersteps sessions
|
||||
loop.run_forever()
|
||||
except Exception as e:
|
||||
logger.error(f"Browser steps event loop error: {e}")
|
||||
finally:
|
||||
try:
|
||||
return loop.run_until_complete(coro)
|
||||
# Cancel all remaining tasks
|
||||
pending = asyncio.all_tasks(loop)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
|
||||
# Wait for tasks to finish cancellation
|
||||
if pending:
|
||||
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||
except Exception as e:
|
||||
logger.debug(f"Error during browser steps loop cleanup: {e}")
|
||||
finally:
|
||||
loop.close()
|
||||
logger.debug("Browser steps event loop closed")
|
||||
|
||||
def _ensure_browser_steps_loop():
|
||||
"""Ensure the browser steps event loop is running"""
|
||||
global _browser_steps_loop, _browser_steps_thread
|
||||
|
||||
with _browser_steps_loop_lock:
|
||||
if _browser_steps_thread is None or not _browser_steps_thread.is_alive():
|
||||
logger.debug("Starting browser steps event loop thread")
|
||||
_browser_steps_thread = threading.Thread(
|
||||
target=_start_browser_steps_loop,
|
||||
daemon=True,
|
||||
name="BrowserStepsEventLoop"
|
||||
)
|
||||
_browser_steps_thread.start()
|
||||
|
||||
# Wait for the loop to be ready
|
||||
timeout = 5.0
|
||||
start_time = time.time()
|
||||
while _browser_steps_loop is None:
|
||||
if time.time() - start_time > timeout:
|
||||
raise RuntimeError("Browser steps event loop failed to start")
|
||||
time.sleep(0.01)
|
||||
|
||||
logger.debug("Browser steps event loop thread started and ready")
|
||||
|
||||
def run_async_in_browser_loop(coro):
|
||||
"""Run async coroutine using the dedicated browser steps event loop"""
|
||||
_ensure_browser_steps_loop()
|
||||
|
||||
if _browser_steps_loop and not _browser_steps_loop.is_closed():
|
||||
logger.debug("Browser steps using dedicated event loop")
|
||||
future = asyncio.run_coroutine_threadsafe(coro, _browser_steps_loop)
|
||||
return future.result()
|
||||
else:
|
||||
raise RuntimeError("Browser steps event loop is not available")
|
||||
|
||||
def cleanup_expired_sessions():
|
||||
"""Remove expired browsersteps sessions and cleanup their resources"""
|
||||
global browsersteps_sessions, browsersteps_watch_to_session
|
||||
|
||||
expired_session_ids = []
|
||||
|
||||
# Find expired sessions
|
||||
for session_id, session_data in browsersteps_sessions.items():
|
||||
browserstepper = session_data.get('browserstepper')
|
||||
if browserstepper and browserstepper.has_expired:
|
||||
expired_session_ids.append(session_id)
|
||||
|
||||
# Cleanup expired sessions
|
||||
for session_id in expired_session_ids:
|
||||
logger.debug(f"Cleaning up expired browsersteps session {session_id}")
|
||||
session_data = browsersteps_sessions[session_id]
|
||||
|
||||
# Cleanup playwright resources asynchronously
|
||||
browserstepper = session_data.get('browserstepper')
|
||||
if browserstepper:
|
||||
try:
|
||||
run_async_in_browser_loop(browserstepper.cleanup())
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up session {session_id}: {e}")
|
||||
|
||||
# Remove from sessions dict
|
||||
del browsersteps_sessions[session_id]
|
||||
|
||||
# Remove from watch mapping
|
||||
for watch_uuid, mapped_session_id in list(browsersteps_watch_to_session.items()):
|
||||
if mapped_session_id == session_id:
|
||||
del browsersteps_watch_to_session[watch_uuid]
|
||||
break
|
||||
|
||||
if expired_session_ids:
|
||||
logger.info(f"Cleaned up {len(expired_session_ids)} expired browsersteps session(s)")
|
||||
|
||||
def cleanup_session_for_watch(watch_uuid):
|
||||
"""Cleanup a specific browsersteps session for a watch UUID"""
|
||||
global browsersteps_sessions, browsersteps_watch_to_session
|
||||
|
||||
session_id = browsersteps_watch_to_session.get(watch_uuid)
|
||||
if not session_id:
|
||||
logger.debug(f"No browsersteps session found for watch {watch_uuid}")
|
||||
return
|
||||
|
||||
logger.debug(f"Cleaning up browsersteps session {session_id} for watch {watch_uuid}")
|
||||
|
||||
session_data = browsersteps_sessions.get(session_id)
|
||||
if session_data:
|
||||
browserstepper = session_data.get('browserstepper')
|
||||
if browserstepper:
|
||||
try:
|
||||
run_async_in_browser_loop(browserstepper.cleanup())
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up session {session_id} for watch {watch_uuid}: {e}")
|
||||
|
||||
# Remove from sessions dict
|
||||
del browsersteps_sessions[session_id]
|
||||
|
||||
# Remove from watch mapping
|
||||
del browsersteps_watch_to_session[watch_uuid]
|
||||
|
||||
logger.debug(f"Cleaned up session for watch {watch_uuid}")
|
||||
|
||||
# Opportunistically cleanup any other expired sessions
|
||||
cleanup_expired_sessions()
|
||||
|
||||
def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates")
|
||||
@@ -123,6 +246,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
if not watch_uuid:
|
||||
return make_response('No Watch UUID specified', 500)
|
||||
|
||||
# Cleanup any existing session for this watch
|
||||
cleanup_session_for_watch(watch_uuid)
|
||||
|
||||
logger.debug("Starting connection with playwright")
|
||||
logger.debug("browser_steps.py connecting")
|
||||
|
||||
@@ -131,6 +257,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
browsersteps_sessions[browsersteps_session_id] = run_async_in_browser_loop(
|
||||
start_browsersteps_session(watch_uuid)
|
||||
)
|
||||
|
||||
# Store the mapping of watch_uuid -> browsersteps_session_id
|
||||
browsersteps_watch_to_session[watch_uuid] = browsersteps_session_id
|
||||
|
||||
except Exception as e:
|
||||
if 'ECONNREFUSED' in str(e):
|
||||
return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)
|
||||
|
||||
@@ -47,9 +47,6 @@ def construct_single_watch_routes(rss_blueprint, datastore):
|
||||
if len(dates) < 2:
|
||||
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
|
||||
|
||||
# Add uuid to watch for proper functioning
|
||||
watch['uuid'] = uuid
|
||||
|
||||
# Get the number of diffs to include (default: 5)
|
||||
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
|
||||
|
||||
@@ -101,7 +98,7 @@ def construct_single_watch_routes(rss_blueprint, datastore):
|
||||
date_index_from, date_index_to)
|
||||
|
||||
# Create and populate feed entry
|
||||
guid = f"{watch['uuid']}/{timestamp_to}"
|
||||
guid = f"{uuid}/{timestamp_to}"
|
||||
fe = fg.add_entry()
|
||||
title_suffix = f"Change @ {res['original_context']['change_datetime']}"
|
||||
populate_feed_entry(fe, watch, res.get('body', ''), guid, timestamp_to,
|
||||
|
||||
@@ -63,11 +63,8 @@ def construct_tag_routes(rss_blueprint, datastore):
|
||||
|
||||
# Only include unviewed watches
|
||||
if not watch.viewed:
|
||||
# Add uuid to watch for proper functioning
|
||||
watch['uuid'] = uuid
|
||||
|
||||
# Include a link to the diff page
|
||||
diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||
# Include a link to the diff page (use uuid from loop, don't modify watch dict)
|
||||
diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=uuid, _external=True)}
|
||||
|
||||
# Get watch label
|
||||
watch_label = get_watch_label(datastore, watch)
|
||||
|
||||
@@ -50,7 +50,8 @@
|
||||
<td>{{ "{:,}".format(tag_count[uuid]) if uuid in tag_count else 0 }}</td>
|
||||
<td class="title-col inline"> <a href="{{url_for('watchlist.index', tag=uuid) }}">{{ tag.title }}</a></td>
|
||||
<td>
|
||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>
|
||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>
|
||||
<a href="{{ url_for('ui.form_watch_checknow', tag=uuid) }}" class="pure-button pure-button-primary" >{{ _('Recheck') }}</a>
|
||||
<a class="pure-button button-error"
|
||||
href="{{ url_for('tags.delete', uuid=uuid) }}"
|
||||
data-requires-confirm
|
||||
|
||||
@@ -238,6 +238,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid])
|
||||
flash(gettext("Updated watch - unpaused!") if request.args.get('unpause_on_save') else gettext("Updated watch."))
|
||||
|
||||
# Cleanup any browsersteps session for this watch
|
||||
try:
|
||||
from changedetectionio.blueprint.browser_steps import cleanup_session_for_watch
|
||||
cleanup_session_for_watch(uuid)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error cleaning up browsersteps session: {e}")
|
||||
|
||||
# Re #286 - We wait for syncing new data to disk in another thread every 60 seconds
|
||||
# But in the case something is added we should save straight away
|
||||
datastore.needs_write_urgent = True
|
||||
@@ -325,8 +332,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
|
||||
},
|
||||
'settings_application': datastore.data['settings']['application'],
|
||||
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
|
||||
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
||||
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
|
||||
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
||||
'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),
|
||||
|
||||
@@ -206,7 +206,7 @@ Math: {{ 1 + 1 }}") }}
|
||||
|
||||
<div class="tab-pane-inner" id="browser-steps">
|
||||
{% if capabilities.supports_browser_steps %}
|
||||
{% if visual_selector_data_ready %}
|
||||
{% if true %}
|
||||
<img class="beta-logo" src="{{url_for('static_content', group='images', filename='beta-logo.png')}}" alt="New beta functionality">
|
||||
<fieldset>
|
||||
<div class="pure-control-group">
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import gc
|
||||
import json
|
||||
import os
|
||||
from urllib.parse import urlparse
|
||||
@@ -185,20 +186,33 @@ class fetcher(Fetcher):
|
||||
super().screenshot_step(step_n=step_n)
|
||||
screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
|
||||
|
||||
# Request GC immediately after screenshot to free memory
|
||||
# Screenshots can be large and browser steps take many of them
|
||||
await self.page.request_gc()
|
||||
|
||||
if self.browser_steps_screenshot_path is not None:
|
||||
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.jpeg'.format(step_n))
|
||||
logger.debug(f"Saving step screenshot to {destination}")
|
||||
with open(destination, 'wb') as f:
|
||||
f.write(screenshot)
|
||||
# Clear local reference to allow screenshot bytes to be collected
|
||||
del screenshot
|
||||
gc.collect()
|
||||
|
||||
async def save_step_html(self, step_n):
|
||||
super().save_step_html(step_n=step_n)
|
||||
content = await self.page.content()
|
||||
|
||||
# Request GC after getting page content
|
||||
await self.page.request_gc()
|
||||
|
||||
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
|
||||
logger.debug(f"Saving step HTML to {destination}")
|
||||
with open(destination, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
# Clear local reference
|
||||
del content
|
||||
gc.collect()
|
||||
|
||||
async def run(self,
|
||||
fetch_favicon=True,
|
||||
@@ -305,6 +319,12 @@ class fetcher(Fetcher):
|
||||
|
||||
if self.status_code != 200 and not ignore_status_codes:
|
||||
screenshot = await capture_full_page_async(self.page, screenshot_format=self.screenshot_format)
|
||||
# Cleanup before raising to prevent memory leak
|
||||
await self.page.close()
|
||||
await context.close()
|
||||
await browser.close()
|
||||
# Force garbage collection to release Playwright resources immediately
|
||||
gc.collect()
|
||||
raise Non200ErrorCodeReceived(url=url, status_code=self.status_code, screenshot=screenshot)
|
||||
|
||||
if not empty_pages_are_a_change and len((await self.page.content()).strip()) == 0:
|
||||
@@ -313,48 +333,52 @@ class fetcher(Fetcher):
|
||||
await browser.close()
|
||||
raise EmptyReply(url=url, status_code=response.status)
|
||||
|
||||
# Run Browser Steps here
|
||||
if self.browser_steps_get_valid_steps():
|
||||
await self.iterate_browser_steps(start_url=url)
|
||||
|
||||
await self.page.wait_for_timeout(extra_wait * 1000)
|
||||
|
||||
now = time.time()
|
||||
# So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
|
||||
if current_include_filters is not None:
|
||||
await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
|
||||
else:
|
||||
await self.page.evaluate("var include_filters=''")
|
||||
await self.page.request_gc()
|
||||
|
||||
# request_gc before and after evaluate to free up memory
|
||||
# @todo browsersteps etc
|
||||
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
|
||||
self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
|
||||
"visualselector_xpath_selectors": visualselector_xpath_selectors,
|
||||
"max_height": MAX_TOTAL_HEIGHT
|
||||
})
|
||||
await self.page.request_gc()
|
||||
|
||||
self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
|
||||
await self.page.request_gc()
|
||||
|
||||
self.content = await self.page.content()
|
||||
await self.page.request_gc()
|
||||
logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
|
||||
|
||||
|
||||
# Bug 3 in Playwright screenshot handling
|
||||
# Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
|
||||
# JPEG is better here because the screenshots can be very very large
|
||||
|
||||
# Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
|
||||
# which will significantly increase the IO size between the server and client, it's recommended to use the lowest
|
||||
# acceptable screenshot quality here
|
||||
# Wrap remaining operations in try/finally to ensure cleanup
|
||||
try:
|
||||
# Run Browser Steps here
|
||||
if self.browser_steps_get_valid_steps():
|
||||
await self.iterate_browser_steps(start_url=url)
|
||||
|
||||
await self.page.wait_for_timeout(extra_wait * 1000)
|
||||
|
||||
now = time.time()
|
||||
# So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
|
||||
if current_include_filters is not None:
|
||||
await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
|
||||
else:
|
||||
await self.page.evaluate("var include_filters=''")
|
||||
await self.page.request_gc()
|
||||
|
||||
# request_gc before and after evaluate to free up memory
|
||||
# @todo browsersteps etc
|
||||
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
|
||||
self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
|
||||
"visualselector_xpath_selectors": visualselector_xpath_selectors,
|
||||
"max_height": MAX_TOTAL_HEIGHT
|
||||
})
|
||||
await self.page.request_gc()
|
||||
|
||||
self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
|
||||
await self.page.request_gc()
|
||||
|
||||
self.content = await self.page.content()
|
||||
await self.page.request_gc()
|
||||
logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
|
||||
|
||||
|
||||
# Bug 3 in Playwright screenshot handling
|
||||
# Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
|
||||
# JPEG is better here because the screenshots can be very very large
|
||||
|
||||
# Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
|
||||
# which will significantly increase the IO size between the server and client, it's recommended to use the lowest
|
||||
# acceptable screenshot quality here
|
||||
# The actual screenshot - this always base64 and needs decoding! horrible! huge CPU usage
|
||||
self.screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
|
||||
|
||||
except ScreenshotUnavailable:
|
||||
# Re-raise screenshot unavailable exceptions
|
||||
raise
|
||||
except Exception as e:
|
||||
# It's likely the screenshot was too long/big and something crashed
|
||||
raise ScreenshotUnavailable(url=url, status_code=self.status_code)
|
||||
@@ -389,6 +413,10 @@ class fetcher(Fetcher):
|
||||
pass
|
||||
browser = None
|
||||
|
||||
# Force Python GC to release Playwright resources immediately
|
||||
# Playwright objects can have circular references that delay cleanup
|
||||
gc.collect()
|
||||
|
||||
|
||||
# Plugin registration for built-in fetcher
|
||||
class PlaywrightFetcherPlugin:
|
||||
|
||||
@@ -15,7 +15,7 @@ class fetcher(Fetcher):
|
||||
proxy_url = None
|
||||
|
||||
# Capability flags
|
||||
supports_browser_steps = True
|
||||
supports_browser_steps = False
|
||||
supports_screenshots = True
|
||||
supports_xpath_element_data = True
|
||||
|
||||
|
||||
@@ -57,14 +57,15 @@ class SignalPriorityQueue(queue.PriorityQueue):
|
||||
def put(self, item, block=True, timeout=None):
|
||||
# Call the parent's put method first
|
||||
super().put(item, block, timeout)
|
||||
|
||||
|
||||
# After putting the item in the queue, check if it has a UUID and emit signal
|
||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
||||
uuid = item.item['uuid']
|
||||
# Get the signal and send it if it exists
|
||||
watch_check_update = signal('watch_check_update')
|
||||
if watch_check_update:
|
||||
# Send the watch_uuid parameter
|
||||
# NOTE: This would block other workers from .put/.get while this signal sends
|
||||
# Signal handlers may iterate the queue/datastore while holding locks
|
||||
watch_check_update.send(watch_uuid=uuid)
|
||||
|
||||
# Send queue_length signal with current queue size
|
||||
@@ -312,14 +313,15 @@ class AsyncSignalPriorityQueue(asyncio.PriorityQueue):
|
||||
async def put(self, item):
|
||||
# Call the parent's put method first
|
||||
await super().put(item)
|
||||
|
||||
|
||||
# After putting the item in the queue, check if it has a UUID and emit signal
|
||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
||||
uuid = item.item['uuid']
|
||||
# Get the signal and send it if it exists
|
||||
watch_check_update = signal('watch_check_update')
|
||||
if watch_check_update:
|
||||
# Send the watch_uuid parameter
|
||||
# NOTE: This would block other workers from .put/.get while this signal sends
|
||||
# Signal handlers may iterate the queue/datastore while holding locks
|
||||
watch_check_update.send(watch_uuid=uuid)
|
||||
|
||||
# Send queue_length signal with current queue size
|
||||
|
||||
@@ -863,13 +863,13 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
|
||||
|
||||
# @todo handle ctrl break
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
||||
threading.Thread(target=notification_runner).start()
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
|
||||
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
|
||||
|
||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
# Check for new release version, but not when running in test/build or pytest
|
||||
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
||||
threading.Thread(target=check_for_new_version).start()
|
||||
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
|
||||
|
||||
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
|
||||
# This avoids circular dependencies
|
||||
|
||||
@@ -29,6 +29,7 @@ def get_timeago_locale(flask_locale):
|
||||
"""
|
||||
locale_map = {
|
||||
'zh': 'zh_CN', # Chinese Simplified
|
||||
'zh_Hant_TW': 'zh_TW', # Flask-Babel normalizes zh_TW to zh_Hant_TW
|
||||
'pt': 'pt_PT', # Portuguese (Portugal)
|
||||
'sv': 'sv_SE', # Swedish
|
||||
'no': 'nb_NO', # Norwegian Bokmål
|
||||
|
||||
@@ -10,9 +10,13 @@ from pathlib import Path
|
||||
from loguru import logger
|
||||
|
||||
from .. import jinja2_custom as safe_jinja
|
||||
from ..diff import ADDED_PLACEMARKER_OPEN
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
|
||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
|
||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||
|
||||
def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
"""
|
||||
@@ -29,6 +33,7 @@ def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
try:
|
||||
# Receive data from parent process via pipe (avoids pickle overhead)
|
||||
contents = conn.recv()
|
||||
logger.debug(f"Starting brotli compression of {len(contents)} bytes.")
|
||||
|
||||
if mode is not None:
|
||||
compressed_data = brotli.compress(contents, mode=mode)
|
||||
@@ -40,9 +45,10 @@ def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
|
||||
# Send success status back
|
||||
conn.send(True)
|
||||
logger.debug(f"Finished brotli compression - From {len(contents)} to {len(compressed_data)} bytes.")
|
||||
# No need for explicit cleanup - process exit frees all memory
|
||||
except Exception as e:
|
||||
logger.error(f"Brotli compression worker failed: {e}")
|
||||
logger.critical(f"Brotli compression worker failed: {e}")
|
||||
conn.send(False)
|
||||
finally:
|
||||
conn.close()
|
||||
@@ -66,7 +72,6 @@ def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_
|
||||
Raises:
|
||||
Exception: if compression fails and fallback_uncompressed is False
|
||||
"""
|
||||
import brotli
|
||||
import multiprocessing
|
||||
import sys
|
||||
|
||||
@@ -144,11 +149,6 @@ def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_
|
||||
else:
|
||||
raise Exception(f"Brotli compression subprocess failed for {filepath}")
|
||||
|
||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
|
||||
|
||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||
|
||||
class model(watch_base):
|
||||
__newest_history_key = None
|
||||
@@ -492,7 +492,6 @@ class model(watch_base):
|
||||
|
||||
self.ensure_data_dir_exists()
|
||||
|
||||
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
|
||||
|
||||
# Binary data - detect file type and save without compression
|
||||
@@ -516,7 +515,7 @@ class model(watch_base):
|
||||
|
||||
# Text data - use brotli compression if enabled and above threshold
|
||||
else:
|
||||
if not skip_brotli and len(contents) > threshold:
|
||||
if not skip_brotli and len(contents) > BROTLI_COMPRESS_SIZE_THRESHOLD:
|
||||
# Compressed text
|
||||
import brotli
|
||||
snapshot_fname = f"{snapshot_id}.txt.br"
|
||||
|
||||
@@ -86,6 +86,7 @@ class RecheckPriorityQueue:
|
||||
|
||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
||||
"""Thread-safe sync get with priority ordering"""
|
||||
import queue
|
||||
try:
|
||||
# Wait for notification
|
||||
self.sync_q.get(block=block, timeout=timeout)
|
||||
@@ -103,8 +104,11 @@ class RecheckPriorityQueue:
|
||||
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
|
||||
return item
|
||||
|
||||
except queue.Empty:
|
||||
# Queue is empty with timeout - expected behavior, re-raise without logging
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
|
||||
# Re-raise without logging - caller (worker) will handle and log appropriately
|
||||
raise
|
||||
|
||||
# ASYNC INTERFACE (for workers)
|
||||
|
||||
@@ -98,11 +98,12 @@ pytest -vv -s --maxfail=1 tests/test_rss.py
|
||||
pytest -vv -s --maxfail=1 tests/test_unique_lines.py
|
||||
|
||||
# Try high concurrency
|
||||
FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
|
||||
FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s
|
||||
|
||||
# Check file:// will pickup a file when enabled
|
||||
echo "Hello world" > /tmp/test-file.txt
|
||||
ALLOW_FILE_URI=yes pytest -vv -s tests/test_security.py
|
||||
|
||||
|
||||
|
||||
# Run it again so that brotli kicks in
|
||||
TEST_WITH_BROTLI=1 SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=100 FETCH_WORKERS=20 pytest tests/test_history_consistency.py -vv -l -s
|
||||
|
||||
@@ -186,7 +186,7 @@ class ChangeDetectionStore:
|
||||
# Finally start the thread that will manage periodic data saves to JSON
|
||||
# Only start if thread is not already running (reload_state might be called multiple times)
|
||||
if not self.save_data_thread or not self.save_data_thread.is_alive():
|
||||
self.save_data_thread = threading.Thread(target=self.save_datastore)
|
||||
self.save_data_thread = threading.Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
|
||||
self.save_data_thread.start()
|
||||
|
||||
def rehydrate_entity(self, uuid, entity, processor_override=None):
|
||||
|
||||
@@ -17,6 +17,8 @@ _MAP = {
|
||||
|
||||
|
||||
def strtobool(value):
|
||||
if not value:
|
||||
return False
|
||||
try:
|
||||
return _MAP[str(value).lower()]
|
||||
except KeyError:
|
||||
|
||||
@@ -270,3 +270,6 @@ def app(request, datastore_path):
|
||||
|
||||
request.addfinalizer(teardown)
|
||||
yield app
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -206,11 +206,10 @@ def test_regex_error_handling(client, live_server, measure_memory_usage, datasto
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
time.sleep(0.2)
|
||||
### test regex error handling
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={"extract_text": '/something bad\d{3/XYZ',
|
||||
"url": test_url,
|
||||
"fetch_backend": "html_requests",
|
||||
|
||||
@@ -4,25 +4,47 @@ import time
|
||||
import os
|
||||
import json
|
||||
from flask import url_for
|
||||
from loguru import logger
|
||||
from .. import strtobool
|
||||
from .util import wait_for_all_checks, delete_all_watches
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import brotli
|
||||
|
||||
|
||||
def test_consistent_history(client, live_server, measure_memory_usage, datastore_path):
|
||||
# live_server_setup(live_server) # Setup on conftest per function
|
||||
workers = int(os.getenv("FETCH_WORKERS", 10))
|
||||
r = range(1, 10+workers)
|
||||
|
||||
for one in r:
|
||||
test_url = url_for('test_endpoint', content_type="text/html", content=str(one), _external=True)
|
||||
res = client.post(
|
||||
url_for("imports.import_page"),
|
||||
data={"urls": test_url},
|
||||
follow_redirects=True
|
||||
)
|
||||
uuids = set()
|
||||
sys_fetch_workers = int(os.getenv("FETCH_WORKERS", 10))
|
||||
workers = range(1, sys_fetch_workers)
|
||||
now = time.time()
|
||||
|
||||
assert b"1 Imported" in res.data
|
||||
for one in workers:
|
||||
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||
# A very long string that WILL trigger Brotli compression of the snapshot
|
||||
# BROTLI_COMPRESS_SIZE_THRESHOLD should be set to say 200
|
||||
from ..model.Watch import BROTLI_COMPRESS_SIZE_THRESHOLD
|
||||
content = str(one) + "x" + str(one) * (BROTLI_COMPRESS_SIZE_THRESHOLD + 10)
|
||||
else:
|
||||
# Just enough to test datastore
|
||||
content = str(one)+'x'
|
||||
|
||||
test_url = url_for('test_endpoint', content_type="text/html", content=content, _external=True)
|
||||
uuids.add(client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'title': str(one)}))
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
wait_for_all_checks(client)
|
||||
duration = time.time() - now
|
||||
per_worker = duration/sys_fetch_workers
|
||||
if sys_fetch_workers < 20:
|
||||
per_worker_threshold=0.6
|
||||
elif sys_fetch_workers < 50:
|
||||
per_worker_threshold = 0.8
|
||||
else:
|
||||
per_worker_threshold = 1.5
|
||||
|
||||
logger.debug(f"All fetched in {duration:.2f}s, {per_worker}s per worker")
|
||||
# Problematic on github
|
||||
#assert per_worker < per_worker_threshold, f"If concurrency is working good, no blocking async problems, each worker ({sys_fetch_workers} workers) should have done his job in under {per_worker_threshold}s, got {per_worker:.2f}s per worker, total duration was {duration:.2f}s"
|
||||
|
||||
# Essentially just triggers the DB write/update
|
||||
res = client.post(
|
||||
@@ -34,7 +56,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
)
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
|
||||
# Wait for the sync DB save to happen
|
||||
time.sleep(2)
|
||||
|
||||
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
||||
@@ -44,14 +66,18 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
json_obj = json.load(f)
|
||||
|
||||
# assert the right amount of watches was found in the JSON
|
||||
assert len(json_obj['watching']) == len(r), "Correct number of watches was found in the JSON"
|
||||
i=0
|
||||
assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON"
|
||||
|
||||
i = 0
|
||||
# each one should have a history.txt containing just one line
|
||||
for w in json_obj['watching'].keys():
|
||||
i+=1
|
||||
i += 1
|
||||
history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt')
|
||||
assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}"
|
||||
|
||||
# Should be no errors (could be from brotli etc)
|
||||
assert not live_server.app.config['DATASTORE'].data['watching'][w].get('last_error')
|
||||
|
||||
# Same like in model.Watch
|
||||
with open(history_txt_index_file, "r") as f:
|
||||
tmp_history = dict(i.strip().split(',', 2) for i in f.readlines())
|
||||
@@ -63,15 +89,21 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
# Find the snapshot one
|
||||
for fname in files_in_watch_dir:
|
||||
if fname != 'history.txt' and 'html' not in fname:
|
||||
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||
assert fname.endswith('.br'), "Forced TEST_WITH_BROTLI then it should be a .br filename"
|
||||
|
||||
full_snapshot_history_path = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname)
|
||||
# contents should match what we requested as content returned from the test url
|
||||
with open(os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname), 'r') as snapshot_f:
|
||||
contents = snapshot_f.read()
|
||||
watch_url = json_obj['watching'][w]['url']
|
||||
u = urlparse(watch_url)
|
||||
q = parse_qs(u[4])
|
||||
assert q['content'][0] == contents.strip(), f"Snapshot file {fname} should contain {q['content'][0]}"
|
||||
|
||||
if fname.endswith('.br'):
|
||||
with open(full_snapshot_history_path, 'rb') as f:
|
||||
contents = brotli.decompress(f.read()).decode('utf-8')
|
||||
else:
|
||||
with open(full_snapshot_history_path, 'r') as snapshot_f:
|
||||
contents = snapshot_f.read()
|
||||
|
||||
watch_title = json_obj['watching'][w]['title']
|
||||
assert json_obj['watching'][w]['title'], "Watch should have a title set"
|
||||
assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'"
|
||||
|
||||
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
|
||||
|
||||
|
||||
@@ -25,12 +25,13 @@ def test_content_filter_live_preview(client, live_server, measure_memory_usage,
|
||||
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": ''},
|
||||
follow_redirects=True
|
||||
)
|
||||
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
||||
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
assert b'Queued 1 watch for rechecking.' in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import time
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||
from .util import live_server_setup, wait_for_all_checks, wait_for_watch_history, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||
import os
|
||||
|
||||
|
||||
@@ -87,6 +87,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
||||
# Wait for initial checks to complete
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Ensure initial snapshots are saved
|
||||
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Watches did not save initial snapshots"
|
||||
|
||||
# Trigger a change
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
|
||||
@@ -94,6 +97,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Ensure all watches have sufficient history for RSS generation
|
||||
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "Watches did not accumulate sufficient history"
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
assert rss_token is not None
|
||||
@@ -216,11 +222,13 @@ def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, data
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Initial snapshots not saved"
|
||||
|
||||
# Trigger changes
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "History not accumulated"
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import sys
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from changedetectionio import html_tools
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import html_tools
|
||||
|
||||
# test generation guide.
|
||||
# 1. Do not include encoding in the xml declaration if the test object is a str type.
|
||||
|
||||
@@ -164,14 +164,45 @@ def wait_for_all_checks(client=None):
|
||||
if q_length == 0 and not any_workers_busy:
|
||||
if empty_since is None:
|
||||
empty_since = time.time()
|
||||
elif time.time() - empty_since >= 0.15: # Shorter wait
|
||||
# Brief stabilization period for async workers
|
||||
elif time.time() - empty_since >= 0.3:
|
||||
break
|
||||
else:
|
||||
empty_since = None
|
||||
|
||||
|
||||
attempt += 1
|
||||
time.sleep(0.3)
|
||||
|
||||
def wait_for_watch_history(client, min_history_count=2, timeout=10):
|
||||
"""
|
||||
Wait for watches to have sufficient history entries.
|
||||
Useful after wait_for_all_checks() when you need to ensure history is populated.
|
||||
|
||||
Args:
|
||||
client: Test client with access to datastore
|
||||
min_history_count: Minimum number of history entries required
|
||||
timeout: Maximum time to wait in seconds
|
||||
"""
|
||||
datastore = client.application.config.get('DATASTORE')
|
||||
start_time = time.time()
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
all_have_history = True
|
||||
for uuid, watch in datastore.data['watching'].items():
|
||||
history_count = len(watch.history.keys())
|
||||
if history_count < min_history_count:
|
||||
all_have_history = False
|
||||
break
|
||||
|
||||
if all_have_history:
|
||||
return True
|
||||
|
||||
time.sleep(0.2)
|
||||
|
||||
# Timeout - return False
|
||||
return False
|
||||
|
||||
|
||||
# Replaced by new_live_server_setup and calling per function scope in conftest.py
|
||||
def live_server_setup(live_server):
|
||||
return True
|
||||
@@ -189,6 +220,8 @@ def new_live_server_setup(live_server):
|
||||
|
||||
@live_server.app.route('/test-endpoint')
|
||||
def test_endpoint():
|
||||
from loguru import logger
|
||||
logger.debug(f"/test-endpoint hit {request}")
|
||||
ctype = request.args.get('content_type')
|
||||
status_code = request.args.get('status_code')
|
||||
content = request.args.get('content') or None
|
||||
|
||||
@@ -144,7 +144,6 @@ def test_basic_browserstep(client, live_server, measure_memory_usage, datastore_
|
||||
|
||||
def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)
|
||||
four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio')
|
||||
four_o_four_url = four_o_four_url.replace('localhost', 'cdio')
|
||||
@@ -186,3 +185,65 @@ def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_
|
||||
url_for("ui.form_delete", uuid="all"),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def test_browsersteps_edit_UI_startsession(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
|
||||
|
||||
# Add a watch first
|
||||
test_url = url_for('test_interactive_html_endpoint', _external=True)
|
||||
test_url = test_url.replace('localhost.localdomain', 'cdio')
|
||||
test_url = test_url.replace('localhost', 'cdio')
|
||||
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'fetch_backend': 'html_webdriver', 'paused': True})
|
||||
|
||||
# Test starting a browsersteps session
|
||||
res = client.get(
|
||||
url_for("browser_steps.browsersteps_start_session", uuid=uuid),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
assert res.is_json
|
||||
json_data = res.get_json()
|
||||
assert 'browsersteps_session_id' in json_data
|
||||
assert json_data['browsersteps_session_id'] # Not empty
|
||||
|
||||
browsersteps_session_id = json_data['browsersteps_session_id']
|
||||
|
||||
# Verify the session exists in browsersteps_sessions
|
||||
from changedetectionio.blueprint.browser_steps import browsersteps_sessions, browsersteps_watch_to_session
|
||||
assert browsersteps_session_id in browsersteps_sessions
|
||||
assert uuid in browsersteps_watch_to_session
|
||||
assert browsersteps_watch_to_session[uuid] == browsersteps_session_id
|
||||
|
||||
# Verify browsersteps UI shows up on edit page
|
||||
res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
|
||||
assert b'browsersteps-click-start' in res.data, "Browsersteps manual UI shows up"
|
||||
|
||||
# Session should still exist after GET (not cleaned up yet)
|
||||
assert browsersteps_session_id in browsersteps_sessions
|
||||
assert uuid in browsersteps_watch_to_session
|
||||
|
||||
# Test cleanup happens on save (POST)
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={
|
||||
"url": test_url,
|
||||
"tags": "",
|
||||
'fetch_backend': "html_webdriver",
|
||||
"time_between_check_use_default": "y",
|
||||
},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Updated watch" in res.data
|
||||
|
||||
# NOW verify the session was cleaned up after save
|
||||
assert browsersteps_session_id not in browsersteps_sessions
|
||||
assert uuid not in browsersteps_watch_to_session
|
||||
|
||||
# Cleanup
|
||||
client.get(
|
||||
url_for("ui.form_delete", uuid="all"),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
@@ -2,19 +2,18 @@
|
||||
Worker management module for changedetection.io
|
||||
|
||||
Handles asynchronous workers for dynamic worker scaling.
|
||||
Sync worker support has been removed in favor of async-only architecture.
|
||||
Each worker runs in its own thread with its own event loop for isolation.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from loguru import logger
|
||||
|
||||
# Global worker state
|
||||
running_async_tasks = []
|
||||
async_loop = None
|
||||
async_loop_thread = None
|
||||
# Global worker state - each worker has its own thread and event loop
|
||||
worker_threads = [] # List of WorkerThread objects
|
||||
|
||||
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
||||
currently_processing_uuids = {}
|
||||
@@ -22,89 +21,118 @@ currently_processing_uuids = {}
|
||||
# Configuration - async workers only
|
||||
USE_ASYNC_WORKERS = True
|
||||
|
||||
# Custom ThreadPoolExecutor for queue operations with named threads
|
||||
# Scale executor threads with FETCH_WORKERS to avoid bottleneck at high concurrency
|
||||
_max_executor_workers = max(50, int(os.getenv("FETCH_WORKERS", "10")))
|
||||
queue_executor = ThreadPoolExecutor(
|
||||
max_workers=_max_executor_workers,
|
||||
thread_name_prefix="QueueGetter-"
|
||||
)
|
||||
|
||||
def start_async_event_loop():
|
||||
"""Start a dedicated event loop for async workers in a separate thread"""
|
||||
global async_loop
|
||||
logger.info("Starting async event loop for workers")
|
||||
|
||||
try:
|
||||
# Create a new event loop for this thread
|
||||
async_loop = asyncio.new_event_loop()
|
||||
# Set it as the event loop for this thread
|
||||
asyncio.set_event_loop(async_loop)
|
||||
|
||||
logger.debug(f"Event loop created and set: {async_loop}")
|
||||
|
||||
# Run the event loop forever
|
||||
async_loop.run_forever()
|
||||
except Exception as e:
|
||||
logger.error(f"Async event loop error: {e}")
|
||||
finally:
|
||||
# Clean up
|
||||
if async_loop and not async_loop.is_closed():
|
||||
async_loop.close()
|
||||
async_loop = None
|
||||
logger.info("Async event loop stopped")
|
||||
|
||||
class WorkerThread:
|
||||
"""Container for a worker thread with its own event loop"""
|
||||
def __init__(self, worker_id, update_q, notification_q, app, datastore):
|
||||
self.worker_id = worker_id
|
||||
self.update_q = update_q
|
||||
self.notification_q = notification_q
|
||||
self.app = app
|
||||
self.datastore = datastore
|
||||
self.thread = None
|
||||
self.loop = None
|
||||
self.running = False
|
||||
|
||||
def run(self):
|
||||
"""Run the worker in its own event loop"""
|
||||
try:
|
||||
# Create a new event loop for this thread
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
self.running = True
|
||||
|
||||
# Run the worker coroutine
|
||||
self.loop.run_until_complete(
|
||||
start_single_async_worker(
|
||||
self.worker_id,
|
||||
self.update_q,
|
||||
self.notification_q,
|
||||
self.app,
|
||||
self.datastore,
|
||||
queue_executor
|
||||
)
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
# Normal shutdown - worker was cancelled
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
if not in_pytest:
|
||||
logger.info(f"Worker {self.worker_id} shutting down gracefully")
|
||||
except RuntimeError as e:
|
||||
# Ignore expected shutdown errors
|
||||
if "Event loop stopped" not in str(e) and "Event loop is closed" not in str(e):
|
||||
logger.error(f"Worker {self.worker_id} runtime error: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Worker {self.worker_id} thread error: {e}")
|
||||
finally:
|
||||
# Clean up
|
||||
if self.loop and not self.loop.is_closed():
|
||||
self.loop.close()
|
||||
self.running = False
|
||||
self.loop = None
|
||||
|
||||
def start(self):
|
||||
"""Start the worker thread"""
|
||||
self.thread = threading.Thread(
|
||||
target=self.run,
|
||||
daemon=True,
|
||||
name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"
|
||||
)
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the worker thread"""
|
||||
if self.loop and self.running:
|
||||
try:
|
||||
# Signal the loop to stop
|
||||
self.loop.call_soon_threadsafe(self.loop.stop)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
if self.thread and self.thread.is_alive():
|
||||
self.thread.join(timeout=2.0)
|
||||
|
||||
|
||||
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
|
||||
"""Start the async worker management system"""
|
||||
global async_loop_thread, async_loop, running_async_tasks, currently_processing_uuids
|
||||
|
||||
# Clear any stale UUID tracking state
|
||||
"""Start async workers, each with its own thread and event loop for isolation"""
|
||||
global worker_threads, currently_processing_uuids
|
||||
|
||||
# Clear any stale state
|
||||
currently_processing_uuids.clear()
|
||||
|
||||
# Start the event loop in a separate thread
|
||||
async_loop_thread = threading.Thread(target=start_async_event_loop, daemon=True)
|
||||
async_loop_thread.start()
|
||||
|
||||
# Wait for the loop to be available (with timeout for safety)
|
||||
max_wait_time = 5.0
|
||||
wait_start = time.time()
|
||||
while async_loop is None and (time.time() - wait_start) < max_wait_time:
|
||||
time.sleep(0.1)
|
||||
|
||||
if async_loop is None:
|
||||
logger.error("Failed to start async event loop within timeout")
|
||||
return
|
||||
|
||||
# Additional brief wait to ensure loop is running
|
||||
time.sleep(0.2)
|
||||
|
||||
# Start async workers
|
||||
logger.info(f"Starting {n_workers} async workers")
|
||||
|
||||
# Start each worker in its own thread with its own event loop
|
||||
logger.info(f"Starting {n_workers} async workers (isolated threads)")
|
||||
for i in range(n_workers):
|
||||
try:
|
||||
# Use a factory function to create named worker coroutines
|
||||
def create_named_worker(worker_id):
|
||||
async def named_worker():
|
||||
task = asyncio.current_task()
|
||||
if task:
|
||||
task.set_name(f"async-worker-{worker_id}")
|
||||
return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
|
||||
return named_worker()
|
||||
|
||||
task_future = asyncio.run_coroutine_threadsafe(create_named_worker(i), async_loop)
|
||||
running_async_tasks.append(task_future)
|
||||
except RuntimeError as e:
|
||||
worker = WorkerThread(i, update_q, notification_q, app, datastore)
|
||||
worker.start()
|
||||
worker_threads.append(worker)
|
||||
# No sleep needed - threads start independently and asynchronously
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start async worker {i}: {e}")
|
||||
continue
|
||||
|
||||
|
||||
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore):
|
||||
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
|
||||
"""Start a single async worker with auto-restart capability"""
|
||||
from changedetectionio.async_update_worker import async_update_worker
|
||||
|
||||
|
||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
try:
|
||||
if not in_pytest:
|
||||
logger.info(f"Starting async worker {worker_id}")
|
||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore)
|
||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
|
||||
# If we reach here, worker exited cleanly
|
||||
if not in_pytest:
|
||||
logger.info(f"Async worker {worker_id} exited cleanly")
|
||||
@@ -131,39 +159,38 @@ def start_workers(n_workers, update_q, notification_q, app, datastore):
|
||||
|
||||
def add_worker(update_q, notification_q, app, datastore):
|
||||
"""Add a new async worker (for dynamic scaling)"""
|
||||
global running_async_tasks
|
||||
|
||||
if not async_loop:
|
||||
logger.error("Async loop not running, cannot add worker")
|
||||
return False
|
||||
|
||||
worker_id = len(running_async_tasks)
|
||||
global worker_threads
|
||||
|
||||
worker_id = len(worker_threads)
|
||||
logger.info(f"Adding async worker {worker_id}")
|
||||
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
return True
|
||||
|
||||
try:
|
||||
worker = WorkerThread(worker_id, update_q, notification_q, app, datastore)
|
||||
worker.start()
|
||||
worker_threads.append(worker)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add worker {worker_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def remove_worker():
|
||||
"""Remove an async worker (for dynamic scaling)"""
|
||||
global running_async_tasks
|
||||
|
||||
if not running_async_tasks:
|
||||
global worker_threads
|
||||
|
||||
if not worker_threads:
|
||||
return False
|
||||
|
||||
# Cancel the last worker
|
||||
task_future = running_async_tasks.pop()
|
||||
task_future.cancel()
|
||||
logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining")
|
||||
|
||||
# Stop the last worker
|
||||
worker = worker_threads.pop()
|
||||
worker.stop()
|
||||
logger.info(f"Removed async worker, {len(worker_threads)} workers remaining")
|
||||
return True
|
||||
|
||||
|
||||
def get_worker_count():
|
||||
"""Get current number of async workers"""
|
||||
return len(running_async_tasks)
|
||||
return len(worker_threads)
|
||||
|
||||
|
||||
def get_running_uuids():
|
||||
@@ -249,38 +276,21 @@ def queue_item_async_safe(update_q, item, silent=False):
|
||||
|
||||
def shutdown_workers():
|
||||
"""Shutdown all async workers fast and aggressively"""
|
||||
global async_loop, async_loop_thread, running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
|
||||
if not in_pytest:
|
||||
logger.info("Fast shutdown of async workers initiated...")
|
||||
|
||||
# Cancel all async tasks immediately
|
||||
for task_future in running_async_tasks:
|
||||
if not task_future.done():
|
||||
task_future.cancel()
|
||||
|
||||
# Stop the async event loop immediately
|
||||
if async_loop and not async_loop.is_closed():
|
||||
try:
|
||||
async_loop.call_soon_threadsafe(async_loop.stop)
|
||||
except RuntimeError:
|
||||
# Loop might already be stopped
|
||||
pass
|
||||
|
||||
running_async_tasks.clear()
|
||||
async_loop = None
|
||||
|
||||
# Give async thread minimal time to finish, then continue
|
||||
if async_loop_thread and async_loop_thread.is_alive():
|
||||
async_loop_thread.join(timeout=1.0) # Only 1 second timeout
|
||||
if async_loop_thread.is_alive() and not in_pytest:
|
||||
logger.info("Async thread still running after timeout - continuing with shutdown")
|
||||
async_loop_thread = None
|
||||
|
||||
|
||||
# Stop all worker threads
|
||||
for worker in worker_threads:
|
||||
worker.stop()
|
||||
|
||||
worker_threads.clear()
|
||||
|
||||
if not in_pytest:
|
||||
logger.info("Async workers fast shutdown complete")
|
||||
|
||||
@@ -290,69 +300,57 @@ def shutdown_workers():
|
||||
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||
"""
|
||||
Dynamically adjust the number of async workers.
|
||||
|
||||
|
||||
Args:
|
||||
new_count: Target number of workers
|
||||
update_q, notification_q, app, datastore: Required for adding new workers
|
||||
|
||||
|
||||
Returns:
|
||||
dict: Status of the adjustment operation
|
||||
"""
|
||||
global running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
current_count = get_worker_count()
|
||||
|
||||
|
||||
if new_count == current_count:
|
||||
return {
|
||||
'status': 'no_change',
|
||||
'message': f'Worker count already at {current_count}',
|
||||
'current_count': current_count
|
||||
}
|
||||
|
||||
|
||||
if new_count > current_count:
|
||||
# Add workers
|
||||
workers_to_add = new_count - current_count
|
||||
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
|
||||
|
||||
|
||||
if not all([update_q, notification_q, app, datastore]):
|
||||
return {
|
||||
'status': 'error',
|
||||
'message': 'Missing required parameters to add workers',
|
||||
'current_count': current_count
|
||||
}
|
||||
|
||||
|
||||
for i in range(workers_to_add):
|
||||
worker_id = len(running_async_tasks)
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
||||
async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
|
||||
add_worker(update_q, notification_q, app, datastore)
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'message': f'Added {workers_to_add} workers',
|
||||
'previous_count': current_count,
|
||||
'current_count': new_count
|
||||
'current_count': len(worker_threads)
|
||||
}
|
||||
|
||||
|
||||
else:
|
||||
# Remove workers
|
||||
workers_to_remove = current_count - new_count
|
||||
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
|
||||
|
||||
|
||||
removed_count = 0
|
||||
for _ in range(workers_to_remove):
|
||||
if running_async_tasks:
|
||||
task_future = running_async_tasks.pop()
|
||||
task_future.cancel()
|
||||
# Wait for the task to actually stop
|
||||
try:
|
||||
task_future.result(timeout=5) # 5 second timeout
|
||||
except Exception:
|
||||
pass # Task was cancelled, which is expected
|
||||
if remove_worker():
|
||||
removed_count += 1
|
||||
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'message': f'Removed {removed_count} workers',
|
||||
@@ -367,72 +365,58 @@ def get_worker_status():
|
||||
'worker_type': 'async',
|
||||
'worker_count': get_worker_count(),
|
||||
'running_uuids': get_running_uuids(),
|
||||
'async_loop_running': async_loop is not None,
|
||||
'active_threads': sum(1 for w in worker_threads if w.thread and w.thread.is_alive()),
|
||||
}
|
||||
|
||||
|
||||
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||
"""
|
||||
Check if the expected number of async workers are running and restart any missing ones.
|
||||
|
||||
|
||||
Args:
|
||||
expected_count: Expected number of workers
|
||||
update_q, notification_q, app, datastore: Required for restarting workers
|
||||
|
||||
|
||||
Returns:
|
||||
dict: Health check results
|
||||
"""
|
||||
global running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
current_count = get_worker_count()
|
||||
|
||||
if current_count == expected_count:
|
||||
|
||||
# Check which workers are actually alive
|
||||
alive_count = sum(1 for w in worker_threads if w.thread and w.thread.is_alive())
|
||||
|
||||
if alive_count == expected_count:
|
||||
return {
|
||||
'status': 'healthy',
|
||||
'expected_count': expected_count,
|
||||
'actual_count': current_count,
|
||||
'actual_count': alive_count,
|
||||
'message': f'All {expected_count} async workers running'
|
||||
}
|
||||
|
||||
# Check for crashed async workers
|
||||
|
||||
# Find dead workers
|
||||
dead_workers = []
|
||||
alive_count = 0
|
||||
|
||||
for i, task_future in enumerate(running_async_tasks[:]):
|
||||
if task_future.done():
|
||||
try:
|
||||
result = task_future.result()
|
||||
dead_workers.append(i)
|
||||
logger.warning(f"Async worker {i} completed unexpectedly")
|
||||
except Exception as e:
|
||||
dead_workers.append(i)
|
||||
logger.error(f"Async worker {i} crashed: {e}")
|
||||
else:
|
||||
alive_count += 1
|
||||
|
||||
for i, worker in enumerate(worker_threads[:]):
|
||||
if not worker.thread or not worker.thread.is_alive():
|
||||
dead_workers.append(i)
|
||||
logger.warning(f"Async worker {worker.worker_id} thread is dead")
|
||||
|
||||
# Remove dead workers from tracking
|
||||
for i in reversed(dead_workers):
|
||||
if i < len(running_async_tasks):
|
||||
running_async_tasks.pop(i)
|
||||
|
||||
if i < len(worker_threads):
|
||||
worker_threads.pop(i)
|
||||
|
||||
missing_workers = expected_count - alive_count
|
||||
restarted_count = 0
|
||||
|
||||
|
||||
if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
|
||||
logger.info(f"Restarting {missing_workers} crashed async workers")
|
||||
|
||||
|
||||
for i in range(missing_workers):
|
||||
worker_id = alive_count + i
|
||||
try:
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
||||
async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
if add_worker(update_q, notification_q, app, datastore):
|
||||
restarted_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to restart worker {worker_id}: {e}")
|
||||
|
||||
|
||||
return {
|
||||
'status': 'repaired' if restarted_count > 0 else 'degraded',
|
||||
'expected_count': expected_count,
|
||||
|
||||
Reference in New Issue
Block a user