mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-31 14:47:21 +00:00 
			
		
		
		
	Compare commits
	
		
			14 Commits
		
	
	
		
			openai-int
			...
			3356-API-r
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 0c892387b3 | ||
|   | 83f35e98e4 | ||
|   | 1aae6220e5 | ||
|   | b7da6f0ca7 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | e4a81ebe08 | ||
|   | a4edc46af0 | ||
|   | 767db3b79b | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | 4f6e9dcc56 | ||
|   | aa4e182549 | ||
|   | fe1f7c30e1 | ||
|   | e5ed1ae349 | ||
|   | d1b1dd70f4 | ||
|   | 93b14c9fc8 | ||
|   | c9c5de20d8 | 
							
								
								
									
										5
									
								
								.github/test/Dockerfile-alpine
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										5
									
								
								.github/test/Dockerfile-alpine
									
									
									
									
										vendored
									
									
								
							| @@ -2,7 +2,7 @@ | ||||
| # Test that we can still build on Alpine (musl modified libc https://musl.libc.org/) | ||||
| # Some packages wont install via pypi because they dont have a wheel available under this architecture. | ||||
|  | ||||
| FROM ghcr.io/linuxserver/baseimage-alpine:3.21 | ||||
| FROM ghcr.io/linuxserver/baseimage-alpine:3.22 | ||||
| ENV PYTHONUNBUFFERED=1 | ||||
|  | ||||
| COPY requirements.txt /requirements.txt | ||||
| @@ -24,12 +24,13 @@ RUN \ | ||||
|   apk add --update --no-cache \ | ||||
|     libjpeg \ | ||||
|     libxslt \ | ||||
|     file \ | ||||
|     nodejs \ | ||||
|     poppler-utils \ | ||||
|     python3 && \ | ||||
|   echo "**** pip3 install test of changedetection.io ****" && \ | ||||
|   python3 -m venv /lsiopy  && \ | ||||
|   pip install -U pip wheel setuptools && \ | ||||
|   pip install -U --no-cache-dir --find-links https://wheel-index.linuxserver.io/alpine-3.21/ -r /requirements.txt && \ | ||||
|   pip install -U --no-cache-dir --find-links https://wheel-index.linuxserver.io/alpine-3.22/ -r /requirements.txt && \ | ||||
|   apk del --purge \ | ||||
|     build-dependencies | ||||
|   | ||||
							
								
								
									
										2
									
								
								.github/workflows/codeql-analysis.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/codeql-analysis.yml
									
									
									
									
										vendored
									
									
								
							| @@ -30,7 +30,7 @@ jobs: | ||||
|  | ||||
|     steps: | ||||
|     - name: Checkout repository | ||||
|       uses: actions/checkout@v4 | ||||
|       uses: actions/checkout@v5 | ||||
|  | ||||
|     # Initializes the CodeQL tools for scanning. | ||||
|     - name: Initialize CodeQL | ||||
|   | ||||
							
								
								
									
										2
									
								
								.github/workflows/containers.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/containers.yml
									
									
									
									
										vendored
									
									
								
							| @@ -39,7 +39,7 @@ jobs: | ||||
|     # Or if we are in a tagged release scenario. | ||||
|     if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != '' | ||||
|     steps: | ||||
|       - uses: actions/checkout@v4 | ||||
|       - uses: actions/checkout@v5 | ||||
|       - name: Set up Python 3.11 | ||||
|         uses: actions/setup-python@v5 | ||||
|         with: | ||||
|   | ||||
							
								
								
									
										6
									
								
								.github/workflows/pypi-release.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										6
									
								
								.github/workflows/pypi-release.yml
									
									
									
									
										vendored
									
									
								
							| @@ -7,7 +7,7 @@ jobs: | ||||
|     runs-on: ubuntu-latest | ||||
|  | ||||
|     steps: | ||||
|     - uses: actions/checkout@v4 | ||||
|     - uses: actions/checkout@v5 | ||||
|     - name: Set up Python | ||||
|       uses: actions/setup-python@v5 | ||||
|       with: | ||||
| @@ -34,7 +34,7 @@ jobs: | ||||
|     - build | ||||
|     steps: | ||||
|     - name: Download all the dists | ||||
|       uses: actions/download-artifact@v4 | ||||
|       uses: actions/download-artifact@v5 | ||||
|       with: | ||||
|         name: python-package-distributions | ||||
|         path: dist/ | ||||
| @@ -72,7 +72,7 @@ jobs: | ||||
|  | ||||
|     steps: | ||||
|     - name: Download all the dists | ||||
|       uses: actions/download-artifact@v4 | ||||
|       uses: actions/download-artifact@v5 | ||||
|       with: | ||||
|         name: python-package-distributions | ||||
|         path: dist/ | ||||
|   | ||||
							
								
								
									
										2
									
								
								.github/workflows/test-container-build.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/test-container-build.yml
									
									
									
									
										vendored
									
									
								
							| @@ -46,7 +46,7 @@ jobs: | ||||
|           - platform: linux/arm64 | ||||
|             dockerfile: ./.github/test/Dockerfile-alpine | ||||
|     steps: | ||||
|         - uses: actions/checkout@v4 | ||||
|         - uses: actions/checkout@v5 | ||||
|         - name: Set up Python 3.11 | ||||
|           uses: actions/setup-python@v5 | ||||
|           with: | ||||
|   | ||||
							
								
								
									
										2
									
								
								.github/workflows/test-only.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/test-only.yml
									
									
									
									
										vendored
									
									
								
							| @@ -7,7 +7,7 @@ jobs: | ||||
|   lint-code: | ||||
|     runs-on: ubuntu-latest | ||||
|     steps: | ||||
|       - uses: actions/checkout@v4 | ||||
|       - uses: actions/checkout@v5 | ||||
|       - name: Lint with Ruff | ||||
|         run: | | ||||
|           pip install ruff | ||||
|   | ||||
| @@ -20,7 +20,7 @@ jobs: | ||||
|     env: | ||||
|       PYTHON_VERSION: ${{ inputs.python-version }} | ||||
|     steps: | ||||
|       - uses: actions/checkout@v4 | ||||
|       - uses: actions/checkout@v5 | ||||
|  | ||||
|       # Mainly just for link/flake8 | ||||
|       - name: Set up Python ${{ env.PYTHON_VERSION }} | ||||
|   | ||||
| @@ -84,6 +84,9 @@ COPY changedetection.py /app/changedetection.py | ||||
| ARG LOGGER_LEVEL='' | ||||
| ENV LOGGER_LEVEL="$LOGGER_LEVEL" | ||||
|  | ||||
| # Default | ||||
| ENV LC_ALL=en_US.UTF-8 | ||||
|  | ||||
| WORKDIR /app | ||||
| CMD ["python", "./changedetection.py", "-d", "/datastore"] | ||||
|  | ||||
|   | ||||
| @@ -2,7 +2,7 @@ | ||||
|  | ||||
| # Read more https://github.com/dgtlmoon/changedetection.io/wiki | ||||
|  | ||||
| __version__ = '0.50.7' | ||||
| __version__ = '0.50.9' | ||||
|  | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from json.decoder import JSONDecodeError | ||||
| @@ -35,13 +35,22 @@ def sigshutdown_handler(_signo, _stack_frame): | ||||
|     app.config.exit.set() | ||||
|     datastore.stop_thread = True | ||||
|      | ||||
|     # Shutdown workers immediately | ||||
|     # Shutdown workers and queues immediately | ||||
|     try: | ||||
|         from changedetectionio import worker_handler | ||||
|         worker_handler.shutdown_workers() | ||||
|     except Exception as e: | ||||
|         logger.error(f"Error shutting down workers: {str(e)}") | ||||
|      | ||||
|     # Close janus queues properly | ||||
|     try: | ||||
|         from changedetectionio.flask_app import update_q, notification_q | ||||
|         update_q.close() | ||||
|         notification_q.close() | ||||
|         logger.debug("Janus queues closed successfully") | ||||
|     except Exception as e: | ||||
|         logger.critical(f"CRITICAL: Failed to close janus queues: {e}") | ||||
|      | ||||
|     # Shutdown socketio server fast | ||||
|     from changedetectionio.flask_app import socketio_server | ||||
|     if socketio_server and hasattr(socketio_server, 'shutdown'): | ||||
|   | ||||
| @@ -1,5 +1,8 @@ | ||||
| from changedetectionio import queuedWatchMetaData | ||||
| from changedetectionio import worker_handler | ||||
| from flask_expects_json import expects_json | ||||
| from flask_restful import abort, Resource | ||||
|  | ||||
| from flask import request | ||||
| from . import auth | ||||
|  | ||||
| @@ -11,21 +14,24 @@ class Tag(Resource): | ||||
|     def __init__(self, **kwargs): | ||||
|         # datastore is a black box dependency | ||||
|         self.datastore = kwargs['datastore'] | ||||
|         self.update_q = kwargs['update_q'] | ||||
|  | ||||
|     # Get information about a single tag | ||||
|     # curl http://localhost:5000/api/v1/tag/<string:uuid> | ||||
|     @auth.check_token | ||||
|     def get(self, uuid): | ||||
|         """ | ||||
|         @api {get} /api/v1/tag/:uuid Single tag - get data or toggle notification muting. | ||||
|         @apiDescription Retrieve tag information and set notification_muted status | ||||
|         @api {get} /api/v1/tag/:uuid Single tag - Get data, toggle notification muting, recheck all. | ||||
|         @apiDescription Retrieve tag information, set notification_muted status, recheck all in tag. | ||||
|         @apiExample {curl} Example usage: | ||||
|             curl http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091 -H"x-api-key:813031b16330fe25e3780cf0325daa45" | ||||
|             curl "http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091?muted=muted" -H"x-api-key:813031b16330fe25e3780cf0325daa45" | ||||
|             curl "http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091?recheck=true" -H"x-api-key:813031b16330fe25e3780cf0325daa45" | ||||
|         @apiName Tag | ||||
|         @apiGroup Tag | ||||
|         @apiParam {uuid} uuid Tag unique ID. | ||||
|         @apiQuery {String} [muted] =`muted` or =`unmuted` , Sets the MUTE NOTIFICATIONS state | ||||
|         @apiQuery {String} [recheck] = True, Queue all watches with this tag for recheck | ||||
|         @apiSuccess (200) {String} OK When muted operation OR full JSON object of the tag | ||||
|         @apiSuccess (200) {JSON} TagJSON JSON Full JSON object of the tag | ||||
|         """ | ||||
| @@ -34,6 +40,20 @@ class Tag(Resource): | ||||
|         if not tag: | ||||
|             abort(404, message=f'No tag exists with the UUID of {uuid}') | ||||
|  | ||||
|         if request.args.get('recheck'): | ||||
|             # Recheck all, including muted | ||||
|             # Get most overdue first | ||||
|             i=0 | ||||
|             for k in sorted(self.datastore.data['watching'].items(), key=lambda item: item[1].get('last_checked', 0)): | ||||
|                 watch_uuid = k[0] | ||||
|                 watch = k[1] | ||||
|                 if not watch['paused'] and tag['uuid'] not in watch['tags']: | ||||
|                     continue | ||||
|                 worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) | ||||
|                 i+=1 | ||||
|  | ||||
|             return f"OK, {i} watches queued", 200 | ||||
|  | ||||
|         if request.args.get('muted', '') == 'muted': | ||||
|             self.datastore.data['settings']['application']['tags'][uuid]['notification_muted'] = True | ||||
|             return "OK", 200 | ||||
|   | ||||
| @@ -7,6 +7,7 @@ from changedetectionio.flask_app import watch_check_update | ||||
| import asyncio | ||||
| import importlib | ||||
| import os | ||||
| import queue | ||||
| import time | ||||
|  | ||||
| from loguru import logger | ||||
| @@ -37,13 +38,23 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore): | ||||
|         watch = None | ||||
|  | ||||
|         try: | ||||
|             # Use asyncio wait_for to make queue.get() cancellable | ||||
|             queued_item_data = await asyncio.wait_for(q.get(), timeout=1.0) | ||||
|             # Use native janus async interface - no threads needed! | ||||
|             queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0) | ||||
|              | ||||
|         except asyncio.TimeoutError: | ||||
|             # No jobs available, continue loop | ||||
|             continue | ||||
|         except Exception as e: | ||||
|             logger.error(f"Worker {worker_id} error getting queue item: {e}") | ||||
|             logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}") | ||||
|              | ||||
|             # Log queue health for debugging | ||||
|             try: | ||||
|                 queue_size = q.qsize() | ||||
|                 is_empty = q.empty() | ||||
|                 logger.critical(f"CRITICAL: Worker {worker_id} queue health - size: {queue_size}, empty: {is_empty}") | ||||
|             except Exception as health_e: | ||||
|                 logger.critical(f"CRITICAL: Worker {worker_id} queue health check failed: {health_e}") | ||||
|              | ||||
|             await asyncio.sleep(0.1) | ||||
|             continue | ||||
|          | ||||
|   | ||||
| @@ -93,12 +93,15 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe | ||||
|             return redirect(url_for('watchlist.index')) | ||||
|  | ||||
|         # For submission of requesting an extract | ||||
|         extract_form = forms.extractDataForm(request.form) | ||||
|         extract_form = forms.extractDataForm(formdata=request.form, | ||||
|                                              data={'extract_regex': request.form.get('extract_regex', '')} | ||||
|                                              ) | ||||
|         if not extract_form.validate(): | ||||
|             flash("An error occurred, please see below.", "error") | ||||
|             return _render_diff_template(uuid, extract_form) | ||||
|  | ||||
|         else: | ||||
|             extract_regex = request.form.get('extract_regex').strip() | ||||
|             extract_regex = request.form.get('extract_regex', '').strip() | ||||
|             output = watch.extract_regex_from_all_history(extract_regex) | ||||
|             if output: | ||||
|                 watch_dir = os.path.join(datastore.datastore_path, uuid) | ||||
| @@ -109,12 +112,11 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe | ||||
|                 response.headers['Expires'] = "0" | ||||
|                 return response | ||||
|  | ||||
|             flash('Nothing matches that RegEx', 'error') | ||||
|         redirect(url_for('ui_views.diff_history_page', uuid=uuid) + '#extract') | ||||
|             flash('No matches found while scanning all of the watch history for that RegEx.', 'error') | ||||
|         return redirect(url_for('ui.ui_views.diff_history_page', uuid=uuid) + '#extract') | ||||
|  | ||||
|     @views_blueprint.route("/diff/<string:uuid>", methods=['GET']) | ||||
|     @login_optionally_required | ||||
|     def diff_history_page(uuid): | ||||
|     def _render_diff_template(uuid, extract_form=None): | ||||
|         """Helper function to render the diff template with all required data""" | ||||
|         from changedetectionio import forms | ||||
|  | ||||
|         # More for testing, possible to return the first/only | ||||
| @@ -128,8 +130,11 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe | ||||
|             flash("No history found for the specified link, bad link?", "error") | ||||
|             return redirect(url_for('watchlist.index')) | ||||
|  | ||||
|         # For submission of requesting an extract | ||||
|         extract_form = forms.extractDataForm(request.form) | ||||
|         # Use provided form or create a new one | ||||
|         if extract_form is None: | ||||
|             extract_form = forms.extractDataForm(formdata=request.form, | ||||
|                                                  data={'extract_regex': request.form.get('extract_regex', '')} | ||||
|                                                  ) | ||||
|  | ||||
|         history = watch.history | ||||
|         dates = list(history.keys()) | ||||
| @@ -170,7 +175,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe | ||||
|  | ||||
|         datastore.set_last_viewed(uuid, time.time()) | ||||
|  | ||||
|         output = render_template("diff.html", | ||||
|         return render_template("diff.html", | ||||
|                                  current_diff_url=watch['url'], | ||||
|                                  from_version=str(from_version), | ||||
|                                  to_version=str(to_version), | ||||
| @@ -193,7 +198,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe | ||||
|                                  watch_a=watch | ||||
|                                  ) | ||||
|  | ||||
|         return output | ||||
|     @views_blueprint.route("/diff/<string:uuid>", methods=['GET']) | ||||
|     @login_optionally_required | ||||
|     def diff_history_page(uuid): | ||||
|         return _render_diff_template(uuid) | ||||
|  | ||||
|     @views_blueprint.route("/form/add/quickwatch", methods=['POST']) | ||||
|     @login_optionally_required | ||||
|   | ||||
| @@ -12,7 +12,7 @@ from blinker import signal | ||||
|  | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from threading import Event | ||||
| from changedetectionio.custom_queue import SignalPriorityQueue, AsyncSignalPriorityQueue, NotificationQueue | ||||
| from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue | ||||
| from changedetectionio import worker_handler | ||||
|  | ||||
| from flask import ( | ||||
| @@ -48,8 +48,8 @@ datastore = None | ||||
| ticker_thread = None | ||||
| extra_stylesheets = [] | ||||
|  | ||||
| # Use async queue by default, keep sync for backward compatibility   | ||||
| update_q = AsyncSignalPriorityQueue() if worker_handler.USE_ASYNC_WORKERS else SignalPriorityQueue() | ||||
| # Use bulletproof janus-based queues for sync/async reliability   | ||||
| update_q = RecheckPriorityQueue() | ||||
| notification_q = NotificationQueue() | ||||
| MAX_QUEUE_SIZE = 2000 | ||||
|  | ||||
| @@ -329,7 +329,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|                            resource_class_kwargs={'datastore': datastore}) | ||||
|  | ||||
|     watch_api.add_resource(Tag, '/api/v1/tag', '/api/v1/tag/<string:uuid>', | ||||
|                            resource_class_kwargs={'datastore': datastore}) | ||||
|                            resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) | ||||
|                             | ||||
|     watch_api.add_resource(Search, '/api/v1/search', | ||||
|                            resource_class_kwargs={'datastore': datastore}) | ||||
| @@ -844,16 +844,22 @@ def ticker_thread_check_time_launch_checks(): | ||||
|  | ||||
|                     # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it. | ||||
|                     priority = int(time.time()) | ||||
|                     logger.debug( | ||||
|                         f"> Queued watch UUID {uuid} " | ||||
|                         f"last checked at {watch['last_checked']} " | ||||
|                         f"queued at {now:0.2f} priority {priority} " | ||||
|                         f"jitter {watch.jitter_seconds:0.2f}s, " | ||||
|                         f"{now - watch['last_checked']:0.2f}s since last checked") | ||||
|  | ||||
|                     # Into the queue with you | ||||
|                     worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid})) | ||||
|  | ||||
|                     queued_successfully = worker_handler.queue_item_async_safe(update_q, | ||||
|                                                                                queuedWatchMetaData.PrioritizedItem(priority=priority, | ||||
|                                                                                                                    item={'uuid': uuid}) | ||||
|                                                                                ) | ||||
|                     if queued_successfully: | ||||
|                         logger.debug( | ||||
|                             f"> Queued watch UUID {uuid} " | ||||
|                             f"last checked at {watch['last_checked']} " | ||||
|                             f"queued at {now:0.2f} priority {priority} " | ||||
|                             f"jitter {watch.jitter_seconds:0.2f}s, " | ||||
|                             f"{now - watch['last_checked']:0.2f}s since last checked") | ||||
|                     else: | ||||
|                         logger.critical(f"CRITICAL: Failed to queue watch UUID {uuid} in ticker thread!") | ||||
|                          | ||||
|                     # Reset for next time | ||||
|                     watch.jitter_seconds = 0 | ||||
|  | ||||
|   | ||||
| @@ -396,6 +396,19 @@ def validate_url(test_url): | ||||
|         # This should be wtforms.validators. | ||||
|         raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format') | ||||
|  | ||||
|  | ||||
| class ValidateSinglePythonRegexString(object): | ||||
|     def __init__(self, message=None): | ||||
|         self.message = message | ||||
|  | ||||
|     def __call__(self, form, field): | ||||
|         try: | ||||
|             re.compile(field.data) | ||||
|         except re.error: | ||||
|             message = field.gettext('RegEx \'%s\' is not a valid regular expression.') | ||||
|             raise ValidationError(message % (field.data)) | ||||
|  | ||||
|  | ||||
| class ValidateListRegex(object): | ||||
|     """ | ||||
|     Validates that anything that looks like a regex passes as a regex | ||||
| @@ -414,6 +427,7 @@ class ValidateListRegex(object): | ||||
|                     message = field.gettext('RegEx \'%s\' is not a valid regular expression.') | ||||
|                     raise ValidationError(message % (line)) | ||||
|  | ||||
|  | ||||
| class ValidateCSSJSONXPATHInput(object): | ||||
|     """ | ||||
|     Filter validation | ||||
| @@ -791,5 +805,5 @@ class globalSettingsForm(Form): | ||||
|  | ||||
|  | ||||
| class extractDataForm(Form): | ||||
|     extract_regex = StringField('RegEx to extract', validators=[validators.Length(min=1, message="Needs a RegEx")]) | ||||
|     extract_regex = StringField('RegEx to extract', validators=[validators.DataRequired(), ValidateSinglePythonRegexString()]) | ||||
|     extract_submit_button = SubmitField('Extract as CSV', render_kw={"class": "pure-button pure-button-primary"}) | ||||
|   | ||||
| @@ -639,7 +639,7 @@ class model(watch_base): | ||||
|                     if res: | ||||
|                         if not csv_writer: | ||||
|                             # A file on the disk can be transferred much faster via flask than a string reply | ||||
|                             csv_output_filename = 'report.csv' | ||||
|                             csv_output_filename = f"report-{self.get('uuid')}.csv" | ||||
|                             f = open(os.path.join(self.watch_data_dir, csv_output_filename), 'w') | ||||
|                             # @todo some headers in the future | ||||
|                             #fieldnames = ['Epoch seconds', 'Date'] | ||||
|   | ||||
| @@ -3,6 +3,7 @@ import uuid | ||||
|  | ||||
| from changedetectionio import strtobool | ||||
| default_notification_format_for_watch = 'System default' | ||||
| CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL' | ||||
|  | ||||
| class watch_base(dict): | ||||
|  | ||||
| @@ -15,6 +16,8 @@ class watch_base(dict): | ||||
|             'body': None, | ||||
|             'browser_steps': [], | ||||
|             'browser_steps_last_error_step': None, | ||||
|             'conditions' : {}, | ||||
|             'conditions_match_logic': CONDITIONS_MATCH_LOGIC_DEFAULT, | ||||
|             'check_count': 0, | ||||
|             'check_unique_lines': False,  # On change-detected, compare against all history if its something new | ||||
|             'consecutive_filter_failures': 0,  # Every time the CSS/xPath filter cannot be located, reset when all is fine. | ||||
|   | ||||
							
								
								
									
										430
									
								
								changedetectionio/queue_handlers.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										430
									
								
								changedetectionio/queue_handlers.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,430 @@ | ||||
| import heapq | ||||
| import threading | ||||
| from typing import Dict, List, Any, Optional | ||||
| from blinker import signal | ||||
| from loguru import logger | ||||
|  | ||||
| try: | ||||
|     import janus | ||||
| except ImportError: | ||||
|     logger.critical("CRITICAL: janus library is required. Install with: pip install janus") | ||||
|     raise | ||||
|  | ||||
|  | ||||
| class RecheckPriorityQueue: | ||||
|     """ | ||||
|     Ultra-reliable priority queue using janus for async/sync bridging. | ||||
|      | ||||
|     CRITICAL DESIGN NOTE: Both sync_q and async_q are required because: | ||||
|     - sync_q: Used by Flask routes, ticker threads, and other synchronous code | ||||
|     - async_q: Used by async workers (the actual fetchers/processors) and coroutines | ||||
|      | ||||
|     DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts: | ||||
|     - Synchronous code (Flask, threads) cannot use async methods without blocking | ||||
|     - Async code cannot use sync methods without blocking the event loop | ||||
|     - janus provides the only safe bridge between these two worlds | ||||
|      | ||||
|     Attempting to unify to async-only would require: | ||||
|     - Converting all Flask routes to async (major breaking change) | ||||
|     - Using asyncio.run() in sync contexts (causes deadlocks) | ||||
|     - Thread-pool wrapping (adds complexity and overhead) | ||||
|      | ||||
|     Minimal implementation focused on reliability: | ||||
|     - Pure janus for sync/async bridge | ||||
|     - Thread-safe priority ordering   | ||||
|     - Bulletproof error handling with critical logging | ||||
|     """ | ||||
|      | ||||
|     def __init__(self, maxsize: int = 0): | ||||
|         try: | ||||
|             self._janus_queue = janus.Queue(maxsize=maxsize) | ||||
|             # BOTH interfaces required - see class docstring for why | ||||
|             self.sync_q = self._janus_queue.sync_q   # Flask routes, ticker thread | ||||
|             self.async_q = self._janus_queue.async_q # Async workers | ||||
|              | ||||
|             # Priority storage - thread-safe | ||||
|             self._priority_items = [] | ||||
|             self._lock = threading.RLock() | ||||
|              | ||||
|             # Signals for UI updates | ||||
|             self.queue_length_signal = signal('queue_length') | ||||
|              | ||||
|             logger.debug("RecheckPriorityQueue initialized successfully") | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {e}") | ||||
|             raise | ||||
|      | ||||
|     # SYNC INTERFACE (for ticker thread) | ||||
|     def put(self, item, block: bool = True, timeout: Optional[float] = None): | ||||
|         """Thread-safe sync put with priority ordering""" | ||||
|         try: | ||||
|             # Add to priority storage | ||||
|             with self._lock: | ||||
|                 heapq.heappush(self._priority_items, item) | ||||
|              | ||||
|             # Notify via janus sync queue | ||||
|             self.sync_q.put(True, block=block, timeout=timeout) | ||||
|              | ||||
|             # Emit signals | ||||
|             self._emit_put_signals(item) | ||||
|              | ||||
|             logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}") | ||||
|             return True | ||||
|              | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}") | ||||
|             # Remove from priority storage if janus put failed | ||||
|             try: | ||||
|                 with self._lock: | ||||
|                     if item in self._priority_items: | ||||
|                         self._priority_items.remove(item) | ||||
|                         heapq.heapify(self._priority_items) | ||||
|             except Exception as cleanup_e: | ||||
|                 logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}") | ||||
|             return False | ||||
|      | ||||
|     def get(self, block: bool = True, timeout: Optional[float] = None): | ||||
|         """Thread-safe sync get with priority ordering""" | ||||
|         try: | ||||
|             # Wait for notification | ||||
|             self.sync_q.get(block=block, timeout=timeout) | ||||
|              | ||||
|             # Get highest priority item | ||||
|             with self._lock: | ||||
|                 if not self._priority_items: | ||||
|                     logger.critical("CRITICAL: Queue notification received but no priority items available") | ||||
|                     raise Exception("Priority queue inconsistency") | ||||
|                 item = heapq.heappop(self._priority_items) | ||||
|              | ||||
|             # Emit signals | ||||
|             self._emit_get_signals() | ||||
|              | ||||
|             logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}") | ||||
|             return item | ||||
|              | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to get item from queue: {e}") | ||||
|             raise | ||||
|      | ||||
|     # ASYNC INTERFACE (for workers) | ||||
|     async def async_put(self, item): | ||||
|         """Pure async put with priority ordering""" | ||||
|         try: | ||||
|             # Add to priority storage | ||||
|             with self._lock: | ||||
|                 heapq.heappush(self._priority_items, item) | ||||
|              | ||||
|             # Notify via janus async queue | ||||
|             await self.async_q.put(True) | ||||
|              | ||||
|             # Emit signals | ||||
|             self._emit_put_signals(item) | ||||
|              | ||||
|             logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}") | ||||
|             return True | ||||
|              | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}") | ||||
|             # Remove from priority storage if janus put failed | ||||
|             try: | ||||
|                 with self._lock: | ||||
|                     if item in self._priority_items: | ||||
|                         self._priority_items.remove(item) | ||||
|                         heapq.heapify(self._priority_items) | ||||
|             except Exception as cleanup_e: | ||||
|                 logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}") | ||||
|             return False | ||||
|      | ||||
|     async def async_get(self): | ||||
|         """Pure async get with priority ordering""" | ||||
|         try: | ||||
|             # Wait for notification | ||||
|             await self.async_q.get() | ||||
|              | ||||
|             # Get highest priority item | ||||
|             with self._lock: | ||||
|                 if not self._priority_items: | ||||
|                     logger.critical("CRITICAL: Async queue notification received but no priority items available") | ||||
|                     raise Exception("Priority queue inconsistency") | ||||
|                 item = heapq.heappop(self._priority_items) | ||||
|              | ||||
|             # Emit signals | ||||
|             self._emit_get_signals() | ||||
|              | ||||
|             logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}") | ||||
|             return item | ||||
|              | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to async get item from queue: {e}") | ||||
|             raise | ||||
|      | ||||
|     # UTILITY METHODS | ||||
|     def qsize(self) -> int: | ||||
|         """Get current queue size""" | ||||
|         try: | ||||
|             with self._lock: | ||||
|                 return len(self._priority_items) | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to get queue size: {e}") | ||||
|             return 0 | ||||
|      | ||||
|     def empty(self) -> bool: | ||||
|         """Check if queue is empty""" | ||||
|         return self.qsize() == 0 | ||||
|      | ||||
|     def close(self): | ||||
|         """Close the janus queue""" | ||||
|         try: | ||||
|             self._janus_queue.close() | ||||
|             logger.debug("RecheckPriorityQueue closed successfully") | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {e}") | ||||
|      | ||||
|     # COMPATIBILITY METHODS (from original implementation) | ||||
|     @property | ||||
|     def queue(self): | ||||
|         """Provide compatibility with original queue access""" | ||||
|         try: | ||||
|             with self._lock: | ||||
|                 return list(self._priority_items) | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to get queue list: {e}") | ||||
|             return [] | ||||
|      | ||||
|     def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]: | ||||
|         """Find position of UUID in queue""" | ||||
|         try: | ||||
|             with self._lock: | ||||
|                 queue_list = list(self._priority_items) | ||||
|                 total_items = len(queue_list) | ||||
|                  | ||||
|                 if total_items == 0: | ||||
|                     return {'position': None, 'total_items': 0, 'priority': None, 'found': False} | ||||
|                  | ||||
|                 # Find target item | ||||
|                 for item in queue_list: | ||||
|                     if (hasattr(item, 'item') and isinstance(item.item, dict) and  | ||||
|                         item.item.get('uuid') == target_uuid): | ||||
|                          | ||||
|                         # Count items with higher priority | ||||
|                         position = sum(1 for other in queue_list if other.priority < item.priority) | ||||
|                         return { | ||||
|                             'position': position, | ||||
|                             'total_items': total_items,  | ||||
|                             'priority': item.priority, | ||||
|                             'found': True | ||||
|                         } | ||||
|                  | ||||
|                 return {'position': None, 'total_items': total_items, 'priority': None, 'found': False} | ||||
|                  | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}") | ||||
|             return {'position': None, 'total_items': 0, 'priority': None, 'found': False} | ||||
|      | ||||
|     def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]: | ||||
|         """Get all queued UUIDs with pagination""" | ||||
|         try: | ||||
|             with self._lock: | ||||
|                 queue_list = sorted(self._priority_items)  # Sort by priority | ||||
|                 total_items = len(queue_list) | ||||
|                  | ||||
|                 if total_items == 0: | ||||
|                     return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False} | ||||
|                  | ||||
|                 # Apply pagination | ||||
|                 end_idx = min(offset + limit, total_items) if limit else total_items | ||||
|                 items_to_process = queue_list[offset:end_idx] | ||||
|                  | ||||
|                 result = [] | ||||
|                 for position, item in enumerate(items_to_process, start=offset): | ||||
|                     if (hasattr(item, 'item') and isinstance(item.item, dict) and  | ||||
|                         'uuid' in item.item): | ||||
|                         result.append({ | ||||
|                             'uuid': item.item['uuid'], | ||||
|                             'position': position, | ||||
|                             'priority': item.priority | ||||
|                         }) | ||||
|                  | ||||
|                 return { | ||||
|                     'items': result, | ||||
|                     'total_items': total_items, | ||||
|                     'returned_items': len(result), | ||||
|                     'has_more': (offset + len(result)) < total_items | ||||
|                 } | ||||
|                  | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}") | ||||
|             return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False} | ||||
|      | ||||
|     def get_queue_summary(self) -> Dict[str, Any]: | ||||
|         """Get queue summary statistics""" | ||||
|         try: | ||||
|             with self._lock: | ||||
|                 queue_list = list(self._priority_items) | ||||
|                 total_items = len(queue_list) | ||||
|                  | ||||
|                 if total_items == 0: | ||||
|                     return { | ||||
|                         'total_items': 0, 'priority_breakdown': {}, | ||||
|                         'immediate_items': 0, 'clone_items': 0, 'scheduled_items': 0 | ||||
|                     } | ||||
|                  | ||||
|                 immediate_items = clone_items = scheduled_items = 0 | ||||
|                 priority_counts = {} | ||||
|                  | ||||
|                 for item in queue_list: | ||||
|                     priority = item.priority | ||||
|                     priority_counts[priority] = priority_counts.get(priority, 0) + 1 | ||||
|                      | ||||
|                     if priority == 1: | ||||
|                         immediate_items += 1 | ||||
|                     elif priority == 5: | ||||
|                         clone_items += 1 | ||||
|                     elif priority > 100: | ||||
|                         scheduled_items += 1 | ||||
|                  | ||||
|                 return { | ||||
|                     'total_items': total_items, | ||||
|                     'priority_breakdown': priority_counts, | ||||
|                     'immediate_items': immediate_items, | ||||
|                     'clone_items': clone_items, | ||||
|                     'scheduled_items': scheduled_items, | ||||
|                     'min_priority': min(priority_counts.keys()) if priority_counts else None, | ||||
|                     'max_priority': max(priority_counts.keys()) if priority_counts else None | ||||
|                 } | ||||
|                  | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to get queue summary: {e}") | ||||
|             return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,  | ||||
|                    'clone_items': 0, 'scheduled_items': 0} | ||||
|      | ||||
|     # PRIVATE METHODS | ||||
|     def _get_item_uuid(self, item) -> str: | ||||
|         """Safely extract UUID from item for logging""" | ||||
|         try: | ||||
|             if hasattr(item, 'item') and isinstance(item.item, dict): | ||||
|                 return item.item.get('uuid', 'unknown') | ||||
|         except Exception: | ||||
|             pass | ||||
|         return 'unknown' | ||||
|      | ||||
|     def _emit_put_signals(self, item): | ||||
|         """Emit signals when item is added""" | ||||
|         try: | ||||
|             # Watch update signal | ||||
|             if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item: | ||||
|                 watch_check_update = signal('watch_check_update') | ||||
|                 if watch_check_update: | ||||
|                     watch_check_update.send(watch_uuid=item.item['uuid']) | ||||
|              | ||||
|             # Queue length signal | ||||
|             if self.queue_length_signal: | ||||
|                 self.queue_length_signal.send(length=self.qsize()) | ||||
|                  | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to emit put signals: {e}") | ||||
|      | ||||
|     def _emit_get_signals(self): | ||||
|         """Emit signals when item is removed""" | ||||
|         try: | ||||
|             if self.queue_length_signal: | ||||
|                 self.queue_length_signal.send(length=self.qsize()) | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to emit get signals: {e}") | ||||
|  | ||||
|  | ||||
| class NotificationQueue: | ||||
|     """ | ||||
|     Ultra-reliable notification queue using pure janus. | ||||
|      | ||||
|     CRITICAL DESIGN NOTE: Both sync_q and async_q are required because: | ||||
|     - sync_q: Used by Flask routes, ticker threads, and other synchronous code | ||||
|     - async_q: Used by async workers and coroutines | ||||
|      | ||||
|     DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts. | ||||
|     See RecheckPriorityQueue docstring above for detailed explanation. | ||||
|      | ||||
|     Simple wrapper around janus with bulletproof error handling. | ||||
|     """ | ||||
|      | ||||
|     def __init__(self, maxsize: int = 0): | ||||
|         try: | ||||
|             self._janus_queue = janus.Queue(maxsize=maxsize) | ||||
|             # BOTH interfaces required - see class docstring for why | ||||
|             self.sync_q = self._janus_queue.sync_q   # Flask routes, threads | ||||
|             self.async_q = self._janus_queue.async_q # Async workers | ||||
|             self.notification_event_signal = signal('notification_event') | ||||
|             logger.debug("NotificationQueue initialized successfully") | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {e}") | ||||
|             raise | ||||
|      | ||||
|     def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None): | ||||
|         """Thread-safe sync put with signal emission""" | ||||
|         try: | ||||
|             self.sync_q.put(item, block=block, timeout=timeout) | ||||
|             self._emit_notification_signal(item) | ||||
|             logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}") | ||||
|             return True | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}") | ||||
|             return False | ||||
|      | ||||
|     async def async_put(self, item: Dict[str, Any]): | ||||
|         """Pure async put with signal emission""" | ||||
|         try: | ||||
|             await self.async_q.put(item) | ||||
|             self._emit_notification_signal(item) | ||||
|             logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}") | ||||
|             return True | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}") | ||||
|             return False | ||||
|      | ||||
|     def get(self, block: bool = True, timeout: Optional[float] = None): | ||||
|         """Thread-safe sync get""" | ||||
|         try: | ||||
|             return self.sync_q.get(block=block, timeout=timeout) | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to get notification: {e}") | ||||
|             raise | ||||
|      | ||||
|     async def async_get(self): | ||||
|         """Pure async get""" | ||||
|         try: | ||||
|             return await self.async_q.get() | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to async get notification: {e}") | ||||
|             raise | ||||
|      | ||||
|     def qsize(self) -> int: | ||||
|         """Get current queue size""" | ||||
|         try: | ||||
|             return self.sync_q.qsize() | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to get notification queue size: {e}") | ||||
|             return 0 | ||||
|      | ||||
|     def empty(self) -> bool: | ||||
|         """Check if queue is empty""" | ||||
|         return self.qsize() == 0 | ||||
|      | ||||
|     def close(self): | ||||
|         """Close the janus queue""" | ||||
|         try: | ||||
|             self._janus_queue.close() | ||||
|             logger.debug("NotificationQueue closed successfully") | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to close NotificationQueue: {e}") | ||||
|      | ||||
|     def _emit_notification_signal(self, item: Dict[str, Any]): | ||||
|         """Emit notification signal""" | ||||
|         try: | ||||
|             if self.notification_event_signal and isinstance(item, dict): | ||||
|                 watch_uuid = item.get('uuid') | ||||
|                 if watch_uuid: | ||||
|                     self.notification_event_signal.send(watch_uuid=watch_uuid) | ||||
|                 else: | ||||
|                     self.notification_event_signal.send() | ||||
|         except Exception as e: | ||||
|             logger.critical(f"CRITICAL: Failed to emit notification signal: {e}") | ||||
| @@ -6,19 +6,19 @@ | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   &.favicon-enabled { | ||||
|     tr { | ||||
|       /* make the icons and the text inline-ish */ | ||||
|       td.inline.title-col { | ||||
|         .flex-wrapper { | ||||
|           display: flex; | ||||
|           align-items: center; | ||||
|           gap: 4px; | ||||
|         } | ||||
|  | ||||
|   tr { | ||||
|     /* make the icons and the text inline-ish */ | ||||
|     td.inline.title-col { | ||||
|       .flex-wrapper { | ||||
|         display: flex; | ||||
|         align-items: center; | ||||
|         gap: 4px; | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
|  | ||||
|   td, | ||||
|   th { | ||||
|     vertical-align: middle; | ||||
|   | ||||
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							| @@ -292,9 +292,7 @@ def test_access_denied(client, live_server, measure_memory_usage): | ||||
|  | ||||
| def test_api_watch_PUT_update(client, live_server, measure_memory_usage): | ||||
|  | ||||
|      | ||||
|     api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token') | ||||
|  | ||||
|     # Create a watch | ||||
|     set_original_response() | ||||
|     test_url = url_for('test_endpoint', _external=True) | ||||
| @@ -302,14 +300,27 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage): | ||||
|     # Create new | ||||
|     res = client.post( | ||||
|         url_for("createwatch"), | ||||
|         data=json.dumps({"url": test_url, 'tag': "One, Two", "title": "My test URL", 'headers': {'cookie': 'yum'} }), | ||||
|         data=json.dumps({"url": test_url, | ||||
|                          'tag': "One, Two", | ||||
|                          "title": "My test URL", | ||||
|                          'headers': {'cookie': 'yum'}, | ||||
|                          "conditions": [ | ||||
|                              { | ||||
|                                  "field": "page_filtered_text", | ||||
|                                  "operator": "contains_regex", | ||||
|                                  "value": "."  # contains anything | ||||
|                              } | ||||
|                          ], | ||||
|                          "conditions_match_logic": "ALL" | ||||
|                          } | ||||
|                         ), | ||||
|         headers={'content-type': 'application/json', 'x-api-key': api_key}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert res.status_code == 201 | ||||
|  | ||||
|  | ||||
|     wait_for_all_checks(client) | ||||
|     # Get a listing, it will be the first one | ||||
|     res = client.get( | ||||
|         url_for("createwatch"), | ||||
|   | ||||
| @@ -1,15 +1,18 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| from flask import url_for | ||||
| from .util import live_server_setup, wait_for_all_checks | ||||
| from .util import live_server_setup, wait_for_all_checks, set_original_response | ||||
| import json | ||||
| import time | ||||
|  | ||||
| def test_api_tags_listing(client, live_server, measure_memory_usage): | ||||
|    #  live_server_setup(live_server) # Setup on conftest per function | ||||
|     api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token') | ||||
|     tag_title = 'Test Tag' | ||||
|  | ||||
|     # Get a listing | ||||
|  | ||||
|     set_original_response() | ||||
|  | ||||
|     res = client.get( | ||||
|         url_for("tags"), | ||||
|         headers={'x-api-key': api_key} | ||||
| @@ -104,6 +107,8 @@ def test_api_tags_listing(client, live_server, measure_memory_usage): | ||||
|     assert res.status_code == 201 | ||||
|     watch_uuid = res.json.get('uuid') | ||||
|  | ||||
|  | ||||
|     wait_for_all_checks() | ||||
|     # Verify tag is associated with watch by name if need be | ||||
|     res = client.get( | ||||
|         url_for("watch", uuid=watch_uuid), | ||||
| @@ -112,6 +117,21 @@ def test_api_tags_listing(client, live_server, measure_memory_usage): | ||||
|     assert res.status_code == 200 | ||||
|     assert new_tag_uuid in res.json.get('tags', []) | ||||
|  | ||||
|     # Check recheck by tag | ||||
|     before_check_time = live_server.app.config['DATASTORE'].data['watching'][watch_uuid].get('last_checked') | ||||
|     time.sleep(1) | ||||
|     res = client.get( | ||||
|        url_for("tag", uuid=new_tag_uuid) + "?recheck=true", | ||||
|        headers={'x-api-key': api_key} | ||||
|     ) | ||||
|     wait_for_all_checks() | ||||
|     assert res.status_code == 200 | ||||
|     assert b'OK, 1 watches' in res.data | ||||
|  | ||||
|     after_check_time = live_server.app.config['DATASTORE'].data['watching'][watch_uuid].get('last_checked') | ||||
|  | ||||
|     assert before_check_time != after_check_time | ||||
|  | ||||
|     # Delete tag | ||||
|     res = client.delete( | ||||
|         url_for("tag", uuid=new_tag_uuid), | ||||
| @@ -141,3 +161,6 @@ def test_api_tags_listing(client, live_server, measure_memory_usage): | ||||
|         headers={'x-api-key': api_key}, | ||||
|     ) | ||||
|     assert res.status_code == 204 | ||||
|  | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -4,6 +4,8 @@ import time | ||||
|  | ||||
| from flask import url_for | ||||
| from .util import live_server_setup, wait_for_all_checks | ||||
| from ..model import CONDITIONS_MATCH_LOGIC_DEFAULT | ||||
|  | ||||
|  | ||||
| def set_original_response(number="50"): | ||||
|     test_return_data = f"""<html> | ||||
| @@ -76,7 +78,7 @@ def test_conditions_with_text_and_number(client, live_server): | ||||
|             "fetch_backend": "html_requests", | ||||
|             "include_filters": ".number-container", | ||||
|             "title": "Number AND Text Condition Test", | ||||
|             "conditions_match_logic": "ALL",  # ALL = AND logic | ||||
|             "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT,  # ALL = AND logic | ||||
|             "conditions-0-operator": "in", | ||||
|             "conditions-0-field": "page_filtered_text", | ||||
|             "conditions-0-value": "5", | ||||
| @@ -283,7 +285,7 @@ def test_lev_conditions_plugin(client, live_server, measure_memory_usage): | ||||
|         data={ | ||||
|             "url": test_url, | ||||
|             "fetch_backend": "html_requests", | ||||
|             "conditions_match_logic": "ALL",  # ALL = AND logic | ||||
|             "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT,  # ALL = AND logic | ||||
|             "conditions-0-field": "levenshtein_ratio", | ||||
|             "conditions-0-operator": "<", | ||||
|             "conditions-0-value": "0.8" # needs to be more of a diff to trigger a change | ||||
|   | ||||
| @@ -46,7 +46,7 @@ def test_check_extract_text_from_diff(client, live_server, measure_memory_usage) | ||||
|         follow_redirects=False | ||||
|     ) | ||||
|  | ||||
|     assert b'Nothing matches that RegEx' not in res.data | ||||
|     assert b'No matches found while scanning all of the watch history for that RegEx.' not in res.data | ||||
|     assert res.content_type == 'text/csv' | ||||
|  | ||||
|     # Read the csv reply as stringio | ||||
|   | ||||
| @@ -1,4 +1,5 @@ | ||||
| from changedetectionio.conditions import execute_ruleset_against_all_plugins | ||||
| from changedetectionio.model import CONDITIONS_MATCH_LOGIC_DEFAULT | ||||
| from changedetectionio.store import ChangeDetectionStore | ||||
| import shutil | ||||
| import tempfile | ||||
| @@ -59,7 +60,7 @@ class TestTriggerConditions(unittest.TestCase): | ||||
|  | ||||
|         self.store.data['watching'][self.watch_uuid].update( | ||||
|             { | ||||
|                 "conditions_match_logic": "ALL", | ||||
|                 "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT, | ||||
|                 "conditions": [ | ||||
|                     {"operator": ">=", "field": "extracted_number", "value": "10"}, | ||||
|                     {"operator": "<=", "field": "extracted_number", "value": "5000"}, | ||||
|   | ||||
| @@ -188,15 +188,54 @@ def is_watch_running(watch_uuid): | ||||
|  | ||||
|  | ||||
| def queue_item_async_safe(update_q, item): | ||||
|     """Queue an item for async queue processing""" | ||||
|     if async_loop and not async_loop.is_closed(): | ||||
|     """Bulletproof queue operation with comprehensive error handling""" | ||||
|     item_uuid = 'unknown' | ||||
|      | ||||
|     try: | ||||
|         # Safely extract UUID for logging | ||||
|         if hasattr(item, 'item') and isinstance(item.item, dict): | ||||
|             item_uuid = item.item.get('uuid', 'unknown') | ||||
|     except Exception as uuid_e: | ||||
|         logger.critical(f"CRITICAL: Failed to extract UUID from queue item: {uuid_e}") | ||||
|      | ||||
|     # Validate inputs | ||||
|     if not update_q: | ||||
|         logger.critical(f"CRITICAL: Queue is None/invalid for item {item_uuid}") | ||||
|         return False | ||||
|      | ||||
|     if not item: | ||||
|         logger.critical(f"CRITICAL: Item is None/invalid") | ||||
|         return False | ||||
|      | ||||
|     # Attempt queue operation with multiple fallbacks | ||||
|     try: | ||||
|         # Primary: Use sync interface (thread-safe) | ||||
|         success = update_q.put(item, block=True, timeout=5.0) | ||||
|         if success is False:  # Explicit False return means failure | ||||
|             logger.critical(f"CRITICAL: Queue.put() returned False for item {item_uuid}") | ||||
|             return False | ||||
|          | ||||
|         logger.debug(f"Successfully queued item: {item_uuid}") | ||||
|         return True | ||||
|          | ||||
|     except Exception as e: | ||||
|         logger.critical(f"CRITICAL: Exception during queue operation for item {item_uuid}: {type(e).__name__}: {e}") | ||||
|          | ||||
|         # Secondary: Attempt queue health check | ||||
|         try: | ||||
|             # For async queue, schedule the put operation | ||||
|             asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop) | ||||
|         except RuntimeError as e: | ||||
|             logger.error(f"Failed to queue item: {e}") | ||||
|     else: | ||||
|         logger.error("Async loop not available or closed for queueing item") | ||||
|             queue_size = update_q.qsize() | ||||
|             is_empty = update_q.empty() | ||||
|             logger.critical(f"CRITICAL: Queue health - size: {queue_size}, empty: {is_empty}") | ||||
|         except Exception as health_e: | ||||
|             logger.critical(f"CRITICAL: Queue health check failed: {health_e}") | ||||
|          | ||||
|         # Log queue type for debugging | ||||
|         try: | ||||
|             logger.critical(f"CRITICAL: Queue type: {type(update_q)}, has sync_q: {hasattr(update_q, 'sync_q')}") | ||||
|         except Exception: | ||||
|             logger.critical(f"CRITICAL: Cannot determine queue type") | ||||
|          | ||||
|         return False | ||||
|  | ||||
|  | ||||
| def shutdown_workers(): | ||||
|   | ||||
| @@ -66,6 +66,9 @@ services: | ||||
|   #        A valid timezone name to run as (for scheduling watch checking) see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones | ||||
|   #      - TZ=America/Los_Angeles | ||||
|   # | ||||
|   #        Text processing locale, en_US.UTF-8 used by default unless defined as something else here, UTF-8 should cover 99.99% of cases. | ||||
|   #      - LC_ALL=en_US.UTF-8 | ||||
|   # | ||||
|   #        Maximum height of screenshots, default is 16000 px, screenshots will be clipped to this if exceeded. | ||||
|   #        RAM usage will be higher if you increase this. | ||||
|   #      - SCREENSHOT_MAX_HEIGHT=16000 | ||||
|   | ||||
| @@ -7,6 +7,7 @@ flask-paginate | ||||
| flask_expects_json~=1.7 | ||||
| flask_restful | ||||
| flask_cors # For the Chrome extension to operate | ||||
| janus # Thread-safe async/sync queue bridge | ||||
| flask_wtf~=1.2 | ||||
| flask~=2.3 | ||||
| flask-socketio~=5.5.1 | ||||
| @@ -45,7 +46,7 @@ apprise==1.9.3 | ||||
| paho-mqtt!=2.0.* | ||||
|  | ||||
| # Requires extra wheel for rPi | ||||
| cryptography~=42.0.8 | ||||
| #cryptography~=42.0.8 | ||||
|  | ||||
| # Used for CSS filtering | ||||
| beautifulsoup4>=4.0.0 | ||||
|   | ||||
		Reference in New Issue
	
	Block a user