mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-11-04 00:27:48 +00:00 
			
		
		
		
	Compare commits
	
		
			2 Commits
		
	
	
		
			0.50.10
			...
			default-fa
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					b8e8cc1d20 | ||
| 
						 | 
					405990be0f | 
							
								
								
									
										5
									
								
								.github/test/Dockerfile-alpine
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										5
									
								
								.github/test/Dockerfile-alpine
									
									
									
									
										vendored
									
									
								
							@@ -2,7 +2,7 @@
 | 
			
		||||
# Test that we can still build on Alpine (musl modified libc https://musl.libc.org/)
 | 
			
		||||
# Some packages wont install via pypi because they dont have a wheel available under this architecture.
 | 
			
		||||
 | 
			
		||||
FROM ghcr.io/linuxserver/baseimage-alpine:3.22
 | 
			
		||||
FROM ghcr.io/linuxserver/baseimage-alpine:3.21
 | 
			
		||||
ENV PYTHONUNBUFFERED=1
 | 
			
		||||
 | 
			
		||||
COPY requirements.txt /requirements.txt
 | 
			
		||||
@@ -24,13 +24,12 @@ RUN \
 | 
			
		||||
  apk add --update --no-cache \
 | 
			
		||||
    libjpeg \
 | 
			
		||||
    libxslt \
 | 
			
		||||
    file \
 | 
			
		||||
    nodejs \
 | 
			
		||||
    poppler-utils \
 | 
			
		||||
    python3 && \
 | 
			
		||||
  echo "**** pip3 install test of changedetection.io ****" && \
 | 
			
		||||
  python3 -m venv /lsiopy  && \
 | 
			
		||||
  pip install -U pip wheel setuptools && \
 | 
			
		||||
  pip install -U --no-cache-dir --find-links https://wheel-index.linuxserver.io/alpine-3.22/ -r /requirements.txt && \
 | 
			
		||||
  pip install -U --no-cache-dir --find-links https://wheel-index.linuxserver.io/alpine-3.21/ -r /requirements.txt && \
 | 
			
		||||
  apk del --purge \
 | 
			
		||||
    build-dependencies
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										2
									
								
								.github/workflows/codeql-analysis.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/codeql-analysis.yml
									
									
									
									
										vendored
									
									
								
							@@ -30,7 +30,7 @@ jobs:
 | 
			
		||||
 | 
			
		||||
    steps:
 | 
			
		||||
    - name: Checkout repository
 | 
			
		||||
      uses: actions/checkout@v5
 | 
			
		||||
      uses: actions/checkout@v4
 | 
			
		||||
 | 
			
		||||
    # Initializes the CodeQL tools for scanning.
 | 
			
		||||
    - name: Initialize CodeQL
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										2
									
								
								.github/workflows/containers.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/containers.yml
									
									
									
									
										vendored
									
									
								
							@@ -39,7 +39,7 @@ jobs:
 | 
			
		||||
    # Or if we are in a tagged release scenario.
 | 
			
		||||
    if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != ''
 | 
			
		||||
    steps:
 | 
			
		||||
      - uses: actions/checkout@v5
 | 
			
		||||
      - uses: actions/checkout@v4
 | 
			
		||||
      - name: Set up Python 3.11
 | 
			
		||||
        uses: actions/setup-python@v5
 | 
			
		||||
        with:
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										6
									
								
								.github/workflows/pypi-release.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										6
									
								
								.github/workflows/pypi-release.yml
									
									
									
									
										vendored
									
									
								
							@@ -7,7 +7,7 @@ jobs:
 | 
			
		||||
    runs-on: ubuntu-latest
 | 
			
		||||
 | 
			
		||||
    steps:
 | 
			
		||||
    - uses: actions/checkout@v5
 | 
			
		||||
    - uses: actions/checkout@v4
 | 
			
		||||
    - name: Set up Python
 | 
			
		||||
      uses: actions/setup-python@v5
 | 
			
		||||
      with:
 | 
			
		||||
@@ -34,7 +34,7 @@ jobs:
 | 
			
		||||
    - build
 | 
			
		||||
    steps:
 | 
			
		||||
    - name: Download all the dists
 | 
			
		||||
      uses: actions/download-artifact@v5
 | 
			
		||||
      uses: actions/download-artifact@v4
 | 
			
		||||
      with:
 | 
			
		||||
        name: python-package-distributions
 | 
			
		||||
        path: dist/
 | 
			
		||||
@@ -72,7 +72,7 @@ jobs:
 | 
			
		||||
 | 
			
		||||
    steps:
 | 
			
		||||
    - name: Download all the dists
 | 
			
		||||
      uses: actions/download-artifact@v5
 | 
			
		||||
      uses: actions/download-artifact@v4
 | 
			
		||||
      with:
 | 
			
		||||
        name: python-package-distributions
 | 
			
		||||
        path: dist/
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										2
									
								
								.github/workflows/test-container-build.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/test-container-build.yml
									
									
									
									
										vendored
									
									
								
							@@ -46,7 +46,7 @@ jobs:
 | 
			
		||||
          - platform: linux/arm64
 | 
			
		||||
            dockerfile: ./.github/test/Dockerfile-alpine
 | 
			
		||||
    steps:
 | 
			
		||||
        - uses: actions/checkout@v5
 | 
			
		||||
        - uses: actions/checkout@v4
 | 
			
		||||
        - name: Set up Python 3.11
 | 
			
		||||
          uses: actions/setup-python@v5
 | 
			
		||||
          with:
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										2
									
								
								.github/workflows/test-only.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/test-only.yml
									
									
									
									
										vendored
									
									
								
							@@ -7,7 +7,7 @@ jobs:
 | 
			
		||||
  lint-code:
 | 
			
		||||
    runs-on: ubuntu-latest
 | 
			
		||||
    steps:
 | 
			
		||||
      - uses: actions/checkout@v5
 | 
			
		||||
      - uses: actions/checkout@v4
 | 
			
		||||
      - name: Lint with Ruff
 | 
			
		||||
        run: |
 | 
			
		||||
          pip install ruff
 | 
			
		||||
 
 | 
			
		||||
@@ -20,7 +20,7 @@ jobs:
 | 
			
		||||
    env:
 | 
			
		||||
      PYTHON_VERSION: ${{ inputs.python-version }}
 | 
			
		||||
    steps:
 | 
			
		||||
      - uses: actions/checkout@v5
 | 
			
		||||
      - uses: actions/checkout@v4
 | 
			
		||||
 | 
			
		||||
      # Mainly just for link/flake8
 | 
			
		||||
      - name: Set up Python ${{ env.PYTHON_VERSION }}
 | 
			
		||||
 
 | 
			
		||||
@@ -84,9 +84,6 @@ COPY changedetection.py /app/changedetection.py
 | 
			
		||||
ARG LOGGER_LEVEL=''
 | 
			
		||||
ENV LOGGER_LEVEL="$LOGGER_LEVEL"
 | 
			
		||||
 | 
			
		||||
# Default
 | 
			
		||||
ENV LC_ALL=en_US.UTF-8
 | 
			
		||||
 | 
			
		||||
WORKDIR /app
 | 
			
		||||
CMD ["python", "./changedetection.py", "-d", "/datastore"]
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -2,7 +2,7 @@
 | 
			
		||||
 | 
			
		||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
 | 
			
		||||
 | 
			
		||||
__version__ = '0.50.10'
 | 
			
		||||
__version__ = '0.50.7'
 | 
			
		||||
 | 
			
		||||
from changedetectionio.strtobool import strtobool
 | 
			
		||||
from json.decoder import JSONDecodeError
 | 
			
		||||
@@ -35,22 +35,13 @@ def sigshutdown_handler(_signo, _stack_frame):
 | 
			
		||||
    app.config.exit.set()
 | 
			
		||||
    datastore.stop_thread = True
 | 
			
		||||
    
 | 
			
		||||
    # Shutdown workers and queues immediately
 | 
			
		||||
    # Shutdown workers immediately
 | 
			
		||||
    try:
 | 
			
		||||
        from changedetectionio import worker_handler
 | 
			
		||||
        worker_handler.shutdown_workers()
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        logger.error(f"Error shutting down workers: {str(e)}")
 | 
			
		||||
    
 | 
			
		||||
    # Close janus queues properly
 | 
			
		||||
    try:
 | 
			
		||||
        from changedetectionio.flask_app import update_q, notification_q
 | 
			
		||||
        update_q.close()
 | 
			
		||||
        notification_q.close()
 | 
			
		||||
        logger.debug("Janus queues closed successfully")
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        logger.critical(f"CRITICAL: Failed to close janus queues: {e}")
 | 
			
		||||
    
 | 
			
		||||
    # Shutdown socketio server fast
 | 
			
		||||
    from changedetectionio.flask_app import socketio_server
 | 
			
		||||
    if socketio_server and hasattr(socketio_server, 'shutdown'):
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,5 @@
 | 
			
		||||
from changedetectionio import queuedWatchMetaData
 | 
			
		||||
from changedetectionio import worker_handler
 | 
			
		||||
from flask_expects_json import expects_json
 | 
			
		||||
from flask_restful import abort, Resource
 | 
			
		||||
 | 
			
		||||
from flask import request
 | 
			
		||||
from . import auth
 | 
			
		||||
 | 
			
		||||
@@ -14,24 +11,21 @@ class Tag(Resource):
 | 
			
		||||
    def __init__(self, **kwargs):
 | 
			
		||||
        # datastore is a black box dependency
 | 
			
		||||
        self.datastore = kwargs['datastore']
 | 
			
		||||
        self.update_q = kwargs['update_q']
 | 
			
		||||
 | 
			
		||||
    # Get information about a single tag
 | 
			
		||||
    # curl http://localhost:5000/api/v1/tag/<string:uuid>
 | 
			
		||||
    @auth.check_token
 | 
			
		||||
    def get(self, uuid):
 | 
			
		||||
        """
 | 
			
		||||
        @api {get} /api/v1/tag/:uuid Single tag - Get data, toggle notification muting, recheck all.
 | 
			
		||||
        @apiDescription Retrieve tag information, set notification_muted status, recheck all in tag.
 | 
			
		||||
        @api {get} /api/v1/tag/:uuid Single tag - get data or toggle notification muting.
 | 
			
		||||
        @apiDescription Retrieve tag information and set notification_muted status
 | 
			
		||||
        @apiExample {curl} Example usage:
 | 
			
		||||
            curl http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091 -H"x-api-key:813031b16330fe25e3780cf0325daa45"
 | 
			
		||||
            curl "http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091?muted=muted" -H"x-api-key:813031b16330fe25e3780cf0325daa45"
 | 
			
		||||
            curl "http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091?recheck=true" -H"x-api-key:813031b16330fe25e3780cf0325daa45"
 | 
			
		||||
        @apiName Tag
 | 
			
		||||
        @apiGroup Tag
 | 
			
		||||
        @apiParam {uuid} uuid Tag unique ID.
 | 
			
		||||
        @apiQuery {String} [muted] =`muted` or =`unmuted` , Sets the MUTE NOTIFICATIONS state
 | 
			
		||||
        @apiQuery {String} [recheck] = True, Queue all watches with this tag for recheck
 | 
			
		||||
        @apiSuccess (200) {String} OK When muted operation OR full JSON object of the tag
 | 
			
		||||
        @apiSuccess (200) {JSON} TagJSON JSON Full JSON object of the tag
 | 
			
		||||
        """
 | 
			
		||||
@@ -40,20 +34,6 @@ class Tag(Resource):
 | 
			
		||||
        if not tag:
 | 
			
		||||
            abort(404, message=f'No tag exists with the UUID of {uuid}')
 | 
			
		||||
 | 
			
		||||
        if request.args.get('recheck'):
 | 
			
		||||
            # Recheck all, including muted
 | 
			
		||||
            # Get most overdue first
 | 
			
		||||
            i=0
 | 
			
		||||
            for k in sorted(self.datastore.data['watching'].items(), key=lambda item: item[1].get('last_checked', 0)):
 | 
			
		||||
                watch_uuid = k[0]
 | 
			
		||||
                watch = k[1]
 | 
			
		||||
                if not watch['paused'] and tag['uuid'] not in watch['tags']:
 | 
			
		||||
                    continue
 | 
			
		||||
                worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
 | 
			
		||||
                i+=1
 | 
			
		||||
 | 
			
		||||
            return f"OK, {i} watches queued", 200
 | 
			
		||||
 | 
			
		||||
        if request.args.get('muted', '') == 'muted':
 | 
			
		||||
            self.datastore.data['settings']['application']['tags'][uuid]['notification_muted'] = True
 | 
			
		||||
            return "OK", 200
 | 
			
		||||
 
 | 
			
		||||
@@ -7,7 +7,6 @@ from changedetectionio.flask_app import watch_check_update
 | 
			
		||||
import asyncio
 | 
			
		||||
import importlib
 | 
			
		||||
import os
 | 
			
		||||
import queue
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from loguru import logger
 | 
			
		||||
@@ -38,23 +37,13 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
 | 
			
		||||
        watch = None
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            # Use native janus async interface - no threads needed!
 | 
			
		||||
            queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
 | 
			
		||||
            
 | 
			
		||||
            # Use asyncio wait_for to make queue.get() cancellable
 | 
			
		||||
            queued_item_data = await asyncio.wait_for(q.get(), timeout=1.0)
 | 
			
		||||
        except asyncio.TimeoutError:
 | 
			
		||||
            # No jobs available, continue loop
 | 
			
		||||
            continue
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
 | 
			
		||||
            
 | 
			
		||||
            # Log queue health for debugging
 | 
			
		||||
            try:
 | 
			
		||||
                queue_size = q.qsize()
 | 
			
		||||
                is_empty = q.empty()
 | 
			
		||||
                logger.critical(f"CRITICAL: Worker {worker_id} queue health - size: {queue_size}, empty: {is_empty}")
 | 
			
		||||
            except Exception as health_e:
 | 
			
		||||
                logger.critical(f"CRITICAL: Worker {worker_id} queue health check failed: {health_e}")
 | 
			
		||||
            
 | 
			
		||||
            logger.error(f"Worker {worker_id} error getting queue item: {e}")
 | 
			
		||||
            await asyncio.sleep(0.1)
 | 
			
		||||
            continue
 | 
			
		||||
        
 | 
			
		||||
 
 | 
			
		||||
@@ -93,15 +93,12 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 | 
			
		||||
            return redirect(url_for('watchlist.index'))
 | 
			
		||||
 | 
			
		||||
        # For submission of requesting an extract
 | 
			
		||||
        extract_form = forms.extractDataForm(formdata=request.form,
 | 
			
		||||
                                             data={'extract_regex': request.form.get('extract_regex', '')}
 | 
			
		||||
                                             )
 | 
			
		||||
        extract_form = forms.extractDataForm(request.form)
 | 
			
		||||
        if not extract_form.validate():
 | 
			
		||||
            flash("An error occurred, please see below.", "error")
 | 
			
		||||
            return _render_diff_template(uuid, extract_form)
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            extract_regex = request.form.get('extract_regex', '').strip()
 | 
			
		||||
            extract_regex = request.form.get('extract_regex').strip()
 | 
			
		||||
            output = watch.extract_regex_from_all_history(extract_regex)
 | 
			
		||||
            if output:
 | 
			
		||||
                watch_dir = os.path.join(datastore.datastore_path, uuid)
 | 
			
		||||
@@ -112,11 +109,12 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 | 
			
		||||
                response.headers['Expires'] = "0"
 | 
			
		||||
                return response
 | 
			
		||||
 | 
			
		||||
            flash('No matches found while scanning all of the watch history for that RegEx.', 'error')
 | 
			
		||||
        return redirect(url_for('ui.ui_views.diff_history_page', uuid=uuid) + '#extract')
 | 
			
		||||
            flash('Nothing matches that RegEx', 'error')
 | 
			
		||||
        redirect(url_for('ui_views.diff_history_page', uuid=uuid) + '#extract')
 | 
			
		||||
 | 
			
		||||
    def _render_diff_template(uuid, extract_form=None):
 | 
			
		||||
        """Helper function to render the diff template with all required data"""
 | 
			
		||||
    @views_blueprint.route("/diff/<string:uuid>", methods=['GET'])
 | 
			
		||||
    @login_optionally_required
 | 
			
		||||
    def diff_history_page(uuid):
 | 
			
		||||
        from changedetectionio import forms
 | 
			
		||||
 | 
			
		||||
        # More for testing, possible to return the first/only
 | 
			
		||||
@@ -130,11 +128,8 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 | 
			
		||||
            flash("No history found for the specified link, bad link?", "error")
 | 
			
		||||
            return redirect(url_for('watchlist.index'))
 | 
			
		||||
 | 
			
		||||
        # Use provided form or create a new one
 | 
			
		||||
        if extract_form is None:
 | 
			
		||||
            extract_form = forms.extractDataForm(formdata=request.form,
 | 
			
		||||
                                                 data={'extract_regex': request.form.get('extract_regex', '')}
 | 
			
		||||
                                                 )
 | 
			
		||||
        # For submission of requesting an extract
 | 
			
		||||
        extract_form = forms.extractDataForm(request.form)
 | 
			
		||||
 | 
			
		||||
        history = watch.history
 | 
			
		||||
        dates = list(history.keys())
 | 
			
		||||
@@ -175,7 +170,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 | 
			
		||||
 | 
			
		||||
        datastore.set_last_viewed(uuid, time.time())
 | 
			
		||||
 | 
			
		||||
        return render_template("diff.html",
 | 
			
		||||
        output = render_template("diff.html",
 | 
			
		||||
                                 current_diff_url=watch['url'],
 | 
			
		||||
                                 from_version=str(from_version),
 | 
			
		||||
                                 to_version=str(to_version),
 | 
			
		||||
@@ -198,10 +193,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 | 
			
		||||
                                 watch_a=watch
 | 
			
		||||
                                 )
 | 
			
		||||
 | 
			
		||||
    @views_blueprint.route("/diff/<string:uuid>", methods=['GET'])
 | 
			
		||||
    @login_optionally_required
 | 
			
		||||
    def diff_history_page(uuid):
 | 
			
		||||
        return _render_diff_template(uuid)
 | 
			
		||||
        return output
 | 
			
		||||
 | 
			
		||||
    @views_blueprint.route("/form/add/quickwatch", methods=['POST'])
 | 
			
		||||
    @login_optionally_required
 | 
			
		||||
 
 | 
			
		||||
@@ -12,7 +12,7 @@ from blinker import signal
 | 
			
		||||
 | 
			
		||||
from changedetectionio.strtobool import strtobool
 | 
			
		||||
from threading import Event
 | 
			
		||||
from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
 | 
			
		||||
from changedetectionio.custom_queue import SignalPriorityQueue, AsyncSignalPriorityQueue, NotificationQueue
 | 
			
		||||
from changedetectionio import worker_handler
 | 
			
		||||
 | 
			
		||||
from flask import (
 | 
			
		||||
@@ -48,8 +48,8 @@ datastore = None
 | 
			
		||||
ticker_thread = None
 | 
			
		||||
extra_stylesheets = []
 | 
			
		||||
 | 
			
		||||
# Use bulletproof janus-based queues for sync/async reliability  
 | 
			
		||||
update_q = RecheckPriorityQueue()
 | 
			
		||||
# Use async queue by default, keep sync for backward compatibility  
 | 
			
		||||
update_q = AsyncSignalPriorityQueue() if worker_handler.USE_ASYNC_WORKERS else SignalPriorityQueue()
 | 
			
		||||
notification_q = NotificationQueue()
 | 
			
		||||
MAX_QUEUE_SIZE = 2000
 | 
			
		||||
 | 
			
		||||
@@ -329,7 +329,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore})
 | 
			
		||||
 | 
			
		||||
    watch_api.add_resource(Tag, '/api/v1/tag', '/api/v1/tag/<string:uuid>',
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore})
 | 
			
		||||
                           
 | 
			
		||||
    watch_api.add_resource(Search, '/api/v1/search',
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore})
 | 
			
		||||
@@ -844,21 +844,15 @@ def ticker_thread_check_time_launch_checks():
 | 
			
		||||
 | 
			
		||||
                    # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
 | 
			
		||||
                    priority = int(time.time())
 | 
			
		||||
                    logger.debug(
 | 
			
		||||
                        f"> Queued watch UUID {uuid} "
 | 
			
		||||
                        f"last checked at {watch['last_checked']} "
 | 
			
		||||
                        f"queued at {now:0.2f} priority {priority} "
 | 
			
		||||
                        f"jitter {watch.jitter_seconds:0.2f}s, "
 | 
			
		||||
                        f"{now - watch['last_checked']:0.2f}s since last checked")
 | 
			
		||||
 | 
			
		||||
                    # Into the queue with you
 | 
			
		||||
                    queued_successfully = worker_handler.queue_item_async_safe(update_q,
 | 
			
		||||
                                                                               queuedWatchMetaData.PrioritizedItem(priority=priority,
 | 
			
		||||
                                                                                                                   item={'uuid': uuid})
 | 
			
		||||
                                                                               )
 | 
			
		||||
                    if queued_successfully:
 | 
			
		||||
                        logger.debug(
 | 
			
		||||
                            f"> Queued watch UUID {uuid} "
 | 
			
		||||
                            f"last checked at {watch['last_checked']} "
 | 
			
		||||
                            f"queued at {now:0.2f} priority {priority} "
 | 
			
		||||
                            f"jitter {watch.jitter_seconds:0.2f}s, "
 | 
			
		||||
                            f"{now - watch['last_checked']:0.2f}s since last checked")
 | 
			
		||||
                    else:
 | 
			
		||||
                        logger.critical(f"CRITICAL: Failed to queue watch UUID {uuid} in ticker thread!")
 | 
			
		||||
                    worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid}))
 | 
			
		||||
 | 
			
		||||
                    # Reset for next time
 | 
			
		||||
                    watch.jitter_seconds = 0
 | 
			
		||||
 
 | 
			
		||||
@@ -396,19 +396,6 @@ def validate_url(test_url):
 | 
			
		||||
        # This should be wtforms.validators.
 | 
			
		||||
        raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ValidateSinglePythonRegexString(object):
 | 
			
		||||
    def __init__(self, message=None):
 | 
			
		||||
        self.message = message
 | 
			
		||||
 | 
			
		||||
    def __call__(self, form, field):
 | 
			
		||||
        try:
 | 
			
		||||
            re.compile(field.data)
 | 
			
		||||
        except re.error:
 | 
			
		||||
            message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
 | 
			
		||||
            raise ValidationError(message % (field.data))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ValidateListRegex(object):
 | 
			
		||||
    """
 | 
			
		||||
    Validates that anything that looks like a regex passes as a regex
 | 
			
		||||
@@ -427,7 +414,6 @@ class ValidateListRegex(object):
 | 
			
		||||
                    message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
 | 
			
		||||
                    raise ValidationError(message % (line))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ValidateCSSJSONXPATHInput(object):
 | 
			
		||||
    """
 | 
			
		||||
    Filter validation
 | 
			
		||||
@@ -805,5 +791,5 @@ class globalSettingsForm(Form):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class extractDataForm(Form):
 | 
			
		||||
    extract_regex = StringField('RegEx to extract', validators=[validators.DataRequired(), ValidateSinglePythonRegexString()])
 | 
			
		||||
    extract_regex = StringField('RegEx to extract', validators=[validators.Length(min=1, message="Needs a RegEx")])
 | 
			
		||||
    extract_submit_button = SubmitField('Extract as CSV', render_kw={"class": "pure-button pure-button-primary"})
 | 
			
		||||
 
 | 
			
		||||
@@ -639,7 +639,7 @@ class model(watch_base):
 | 
			
		||||
                    if res:
 | 
			
		||||
                        if not csv_writer:
 | 
			
		||||
                            # A file on the disk can be transferred much faster via flask than a string reply
 | 
			
		||||
                            csv_output_filename = f"report-{self.get('uuid')}.csv"
 | 
			
		||||
                            csv_output_filename = 'report.csv'
 | 
			
		||||
                            f = open(os.path.join(self.watch_data_dir, csv_output_filename), 'w')
 | 
			
		||||
                            # @todo some headers in the future
 | 
			
		||||
                            #fieldnames = ['Epoch seconds', 'Date']
 | 
			
		||||
 
 | 
			
		||||
@@ -3,7 +3,6 @@ import uuid
 | 
			
		||||
 | 
			
		||||
from changedetectionio import strtobool
 | 
			
		||||
default_notification_format_for_watch = 'System default'
 | 
			
		||||
CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL'
 | 
			
		||||
 | 
			
		||||
class watch_base(dict):
 | 
			
		||||
 | 
			
		||||
@@ -16,8 +15,6 @@ class watch_base(dict):
 | 
			
		||||
            'body': None,
 | 
			
		||||
            'browser_steps': [],
 | 
			
		||||
            'browser_steps_last_error_step': None,
 | 
			
		||||
            'conditions' : {},
 | 
			
		||||
            'conditions_match_logic': CONDITIONS_MATCH_LOGIC_DEFAULT,
 | 
			
		||||
            'check_count': 0,
 | 
			
		||||
            'check_unique_lines': False,  # On change-detected, compare against all history if its something new
 | 
			
		||||
            'consecutive_filter_failures': 0,  # Every time the CSS/xPath filter cannot be located, reset when all is fine.
 | 
			
		||||
 
 | 
			
		||||
@@ -1,435 +0,0 @@
 | 
			
		||||
from blinker import signal
 | 
			
		||||
from loguru import logger
 | 
			
		||||
from typing import Dict, List, Any, Optional
 | 
			
		||||
import heapq
 | 
			
		||||
import queue
 | 
			
		||||
import threading
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    import janus
 | 
			
		||||
except ImportError:
 | 
			
		||||
    logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
 | 
			
		||||
    raise
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RecheckPriorityQueue:
 | 
			
		||||
    """
 | 
			
		||||
    Ultra-reliable priority queue using janus for async/sync bridging.
 | 
			
		||||
    
 | 
			
		||||
    CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
 | 
			
		||||
    - sync_q: Used by Flask routes, ticker threads, and other synchronous code
 | 
			
		||||
    - async_q: Used by async workers (the actual fetchers/processors) and coroutines
 | 
			
		||||
    
 | 
			
		||||
    DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts:
 | 
			
		||||
    - Synchronous code (Flask, threads) cannot use async methods without blocking
 | 
			
		||||
    - Async code cannot use sync methods without blocking the event loop
 | 
			
		||||
    - janus provides the only safe bridge between these two worlds
 | 
			
		||||
    
 | 
			
		||||
    Attempting to unify to async-only would require:
 | 
			
		||||
    - Converting all Flask routes to async (major breaking change)
 | 
			
		||||
    - Using asyncio.run() in sync contexts (causes deadlocks)
 | 
			
		||||
    - Thread-pool wrapping (adds complexity and overhead)
 | 
			
		||||
    
 | 
			
		||||
    Minimal implementation focused on reliability:
 | 
			
		||||
    - Pure janus for sync/async bridge
 | 
			
		||||
    - Thread-safe priority ordering  
 | 
			
		||||
    - Bulletproof error handling with critical logging
 | 
			
		||||
    """
 | 
			
		||||
    
 | 
			
		||||
    def __init__(self, maxsize: int = 0):
 | 
			
		||||
        try:
 | 
			
		||||
            self._janus_queue = janus.Queue(maxsize=maxsize)
 | 
			
		||||
            # BOTH interfaces required - see class docstring for why
 | 
			
		||||
            self.sync_q = self._janus_queue.sync_q   # Flask routes, ticker thread
 | 
			
		||||
            self.async_q = self._janus_queue.async_q # Async workers
 | 
			
		||||
            
 | 
			
		||||
            # Priority storage - thread-safe
 | 
			
		||||
            self._priority_items = []
 | 
			
		||||
            self._lock = threading.RLock()
 | 
			
		||||
            
 | 
			
		||||
            # Signals for UI updates
 | 
			
		||||
            self.queue_length_signal = signal('queue_length')
 | 
			
		||||
            
 | 
			
		||||
            logger.debug("RecheckPriorityQueue initialized successfully")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
 | 
			
		||||
            raise
 | 
			
		||||
    
 | 
			
		||||
    # SYNC INTERFACE (for ticker thread)
 | 
			
		||||
    def put(self, item, block: bool = True, timeout: Optional[float] = None):
 | 
			
		||||
        """Thread-safe sync put with priority ordering"""
 | 
			
		||||
        try:
 | 
			
		||||
            # Add to priority storage
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                heapq.heappush(self._priority_items, item)
 | 
			
		||||
            
 | 
			
		||||
            # Notify via janus sync queue
 | 
			
		||||
            self.sync_q.put(True, block=block, timeout=timeout)
 | 
			
		||||
            
 | 
			
		||||
            # Emit signals
 | 
			
		||||
            self._emit_put_signals(item)
 | 
			
		||||
            
 | 
			
		||||
            logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}")
 | 
			
		||||
            return True
 | 
			
		||||
            
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
 | 
			
		||||
            # Remove from priority storage if janus put failed
 | 
			
		||||
            try:
 | 
			
		||||
                with self._lock:
 | 
			
		||||
                    if item in self._priority_items:
 | 
			
		||||
                        self._priority_items.remove(item)
 | 
			
		||||
                        heapq.heapify(self._priority_items)
 | 
			
		||||
            except Exception as cleanup_e:
 | 
			
		||||
                logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
 | 
			
		||||
            return False
 | 
			
		||||
    
 | 
			
		||||
    def get(self, block: bool = True, timeout: Optional[float] = None):
 | 
			
		||||
        """Thread-safe sync get with priority ordering"""
 | 
			
		||||
        try:
 | 
			
		||||
            # Wait for notification
 | 
			
		||||
            self.sync_q.get(block=block, timeout=timeout)
 | 
			
		||||
            
 | 
			
		||||
            # Get highest priority item
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                if not self._priority_items:
 | 
			
		||||
                    logger.critical(f"CRITICAL: Queue notification received but no priority items available")
 | 
			
		||||
                    raise Exception("Priority queue inconsistency")
 | 
			
		||||
                item = heapq.heappop(self._priority_items)
 | 
			
		||||
            
 | 
			
		||||
            # Emit signals
 | 
			
		||||
            self._emit_get_signals()
 | 
			
		||||
            
 | 
			
		||||
            logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
 | 
			
		||||
            return item
 | 
			
		||||
            
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
 | 
			
		||||
            raise
 | 
			
		||||
    
 | 
			
		||||
    # ASYNC INTERFACE (for workers)
 | 
			
		||||
    async def async_put(self, item):
 | 
			
		||||
        """Pure async put with priority ordering"""
 | 
			
		||||
        try:
 | 
			
		||||
            # Add to priority storage
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                heapq.heappush(self._priority_items, item)
 | 
			
		||||
            
 | 
			
		||||
            # Notify via janus async queue
 | 
			
		||||
            await self.async_q.put(True)
 | 
			
		||||
            
 | 
			
		||||
            # Emit signals
 | 
			
		||||
            self._emit_put_signals(item)
 | 
			
		||||
            
 | 
			
		||||
            logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}")
 | 
			
		||||
            return True
 | 
			
		||||
            
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
 | 
			
		||||
            # Remove from priority storage if janus put failed
 | 
			
		||||
            try:
 | 
			
		||||
                with self._lock:
 | 
			
		||||
                    if item in self._priority_items:
 | 
			
		||||
                        self._priority_items.remove(item)
 | 
			
		||||
                        heapq.heapify(self._priority_items)
 | 
			
		||||
            except Exception as cleanup_e:
 | 
			
		||||
                logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
 | 
			
		||||
            return False
 | 
			
		||||
    
 | 
			
		||||
    async def async_get(self):
 | 
			
		||||
        """Pure async get with priority ordering"""
 | 
			
		||||
        try:
 | 
			
		||||
            # Wait for notification
 | 
			
		||||
            await self.async_q.get()
 | 
			
		||||
            
 | 
			
		||||
            # Get highest priority item
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                if not self._priority_items:
 | 
			
		||||
                    logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
 | 
			
		||||
                    raise Exception("Priority queue inconsistency")
 | 
			
		||||
                item = heapq.heappop(self._priority_items)
 | 
			
		||||
            
 | 
			
		||||
            # Emit signals
 | 
			
		||||
            self._emit_get_signals()
 | 
			
		||||
            
 | 
			
		||||
            logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}")
 | 
			
		||||
            return item
 | 
			
		||||
            
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
 | 
			
		||||
            raise
 | 
			
		||||
    
 | 
			
		||||
    # UTILITY METHODS
 | 
			
		||||
    def qsize(self) -> int:
 | 
			
		||||
        """Get current queue size"""
 | 
			
		||||
        try:
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                return len(self._priority_items)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to get queue size: {str(e)}")
 | 
			
		||||
            return 0
 | 
			
		||||
    
 | 
			
		||||
    def empty(self) -> bool:
 | 
			
		||||
        """Check if queue is empty"""
 | 
			
		||||
        return self.qsize() == 0
 | 
			
		||||
    
 | 
			
		||||
    def close(self):
 | 
			
		||||
        """Close the janus queue"""
 | 
			
		||||
        try:
 | 
			
		||||
            self._janus_queue.close()
 | 
			
		||||
            logger.debug("RecheckPriorityQueue closed successfully")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
 | 
			
		||||
    
 | 
			
		||||
    # COMPATIBILITY METHODS (from original implementation)
 | 
			
		||||
    @property
 | 
			
		||||
    def queue(self):
 | 
			
		||||
        """Provide compatibility with original queue access"""
 | 
			
		||||
        try:
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                return list(self._priority_items)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to get queue list: {str(e)}")
 | 
			
		||||
            return []
 | 
			
		||||
    
 | 
			
		||||
    def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
 | 
			
		||||
        """Find position of UUID in queue"""
 | 
			
		||||
        try:
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                queue_list = list(self._priority_items)
 | 
			
		||||
                total_items = len(queue_list)
 | 
			
		||||
                
 | 
			
		||||
                if total_items == 0:
 | 
			
		||||
                    return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
 | 
			
		||||
                
 | 
			
		||||
                # Find target item
 | 
			
		||||
                for item in queue_list:
 | 
			
		||||
                    if (hasattr(item, 'item') and isinstance(item.item, dict) and 
 | 
			
		||||
                        item.item.get('uuid') == target_uuid):
 | 
			
		||||
                        
 | 
			
		||||
                        # Count items with higher priority
 | 
			
		||||
                        position = sum(1 for other in queue_list if other.priority < item.priority)
 | 
			
		||||
                        return {
 | 
			
		||||
                            'position': position,
 | 
			
		||||
                            'total_items': total_items, 
 | 
			
		||||
                            'priority': item.priority,
 | 
			
		||||
                            'found': True
 | 
			
		||||
                        }
 | 
			
		||||
                
 | 
			
		||||
                return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
 | 
			
		||||
                
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {str(e)}")
 | 
			
		||||
            return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
 | 
			
		||||
    
 | 
			
		||||
    def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
 | 
			
		||||
        """Get all queued UUIDs with pagination"""
 | 
			
		||||
        try:
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                queue_list = sorted(self._priority_items)  # Sort by priority
 | 
			
		||||
                total_items = len(queue_list)
 | 
			
		||||
                
 | 
			
		||||
                if total_items == 0:
 | 
			
		||||
                    return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
 | 
			
		||||
                
 | 
			
		||||
                # Apply pagination
 | 
			
		||||
                end_idx = min(offset + limit, total_items) if limit else total_items
 | 
			
		||||
                items_to_process = queue_list[offset:end_idx]
 | 
			
		||||
                
 | 
			
		||||
                result = []
 | 
			
		||||
                for position, item in enumerate(items_to_process, start=offset):
 | 
			
		||||
                    if (hasattr(item, 'item') and isinstance(item.item, dict) and 
 | 
			
		||||
                        'uuid' in item.item):
 | 
			
		||||
                        result.append({
 | 
			
		||||
                            'uuid': item.item['uuid'],
 | 
			
		||||
                            'position': position,
 | 
			
		||||
                            'priority': item.priority
 | 
			
		||||
                        })
 | 
			
		||||
                
 | 
			
		||||
                return {
 | 
			
		||||
                    'items': result,
 | 
			
		||||
                    'total_items': total_items,
 | 
			
		||||
                    'returned_items': len(result),
 | 
			
		||||
                    'has_more': (offset + len(result)) < total_items
 | 
			
		||||
                }
 | 
			
		||||
                
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {str(e)}")
 | 
			
		||||
            return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
 | 
			
		||||
    
 | 
			
		||||
    def get_queue_summary(self) -> Dict[str, Any]:
 | 
			
		||||
        """Get queue summary statistics"""
 | 
			
		||||
        try:
 | 
			
		||||
            with self._lock:
 | 
			
		||||
                queue_list = list(self._priority_items)
 | 
			
		||||
                total_items = len(queue_list)
 | 
			
		||||
                
 | 
			
		||||
                if total_items == 0:
 | 
			
		||||
                    return {
 | 
			
		||||
                        'total_items': 0, 'priority_breakdown': {},
 | 
			
		||||
                        'immediate_items': 0, 'clone_items': 0, 'scheduled_items': 0
 | 
			
		||||
                    }
 | 
			
		||||
                
 | 
			
		||||
                immediate_items = clone_items = scheduled_items = 0
 | 
			
		||||
                priority_counts = {}
 | 
			
		||||
                
 | 
			
		||||
                for item in queue_list:
 | 
			
		||||
                    priority = item.priority
 | 
			
		||||
                    priority_counts[priority] = priority_counts.get(priority, 0) + 1
 | 
			
		||||
                    
 | 
			
		||||
                    if priority == 1:
 | 
			
		||||
                        immediate_items += 1
 | 
			
		||||
                    elif priority == 5:
 | 
			
		||||
                        clone_items += 1
 | 
			
		||||
                    elif priority > 100:
 | 
			
		||||
                        scheduled_items += 1
 | 
			
		||||
                
 | 
			
		||||
                return {
 | 
			
		||||
                    'total_items': total_items,
 | 
			
		||||
                    'priority_breakdown': priority_counts,
 | 
			
		||||
                    'immediate_items': immediate_items,
 | 
			
		||||
                    'clone_items': clone_items,
 | 
			
		||||
                    'scheduled_items': scheduled_items,
 | 
			
		||||
                    'min_priority': min(priority_counts.keys()) if priority_counts else None,
 | 
			
		||||
                    'max_priority': max(priority_counts.keys()) if priority_counts else None
 | 
			
		||||
                }
 | 
			
		||||
                
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to get queue summary: {str(e)}")
 | 
			
		||||
            return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0, 
 | 
			
		||||
                   'clone_items': 0, 'scheduled_items': 0}
 | 
			
		||||
    
 | 
			
		||||
    # PRIVATE METHODS
 | 
			
		||||
    def _get_item_uuid(self, item) -> str:
 | 
			
		||||
        """Safely extract UUID from item for logging"""
 | 
			
		||||
        try:
 | 
			
		||||
            if hasattr(item, 'item') and isinstance(item.item, dict):
 | 
			
		||||
                return item.item.get('uuid', 'unknown')
 | 
			
		||||
        except Exception:
 | 
			
		||||
            pass
 | 
			
		||||
        return 'unknown'
 | 
			
		||||
    
 | 
			
		||||
    def _emit_put_signals(self, item):
 | 
			
		||||
        """Emit signals when item is added"""
 | 
			
		||||
        try:
 | 
			
		||||
            # Watch update signal
 | 
			
		||||
            if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
 | 
			
		||||
                watch_check_update = signal('watch_check_update')
 | 
			
		||||
                if watch_check_update:
 | 
			
		||||
                    watch_check_update.send(watch_uuid=item.item['uuid'])
 | 
			
		||||
            
 | 
			
		||||
            # Queue length signal
 | 
			
		||||
            if self.queue_length_signal:
 | 
			
		||||
                self.queue_length_signal.send(length=self.qsize())
 | 
			
		||||
                
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")
 | 
			
		||||
    
 | 
			
		||||
    def _emit_get_signals(self):
 | 
			
		||||
        """Emit signals when item is removed"""
 | 
			
		||||
        try:
 | 
			
		||||
            if self.queue_length_signal:
 | 
			
		||||
                self.queue_length_signal.send(length=self.qsize())
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to emit get signals: {str(e)}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class NotificationQueue:
 | 
			
		||||
    """
 | 
			
		||||
    Ultra-reliable notification queue using pure janus.
 | 
			
		||||
    
 | 
			
		||||
    CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
 | 
			
		||||
    - sync_q: Used by Flask routes, ticker threads, and other synchronous code
 | 
			
		||||
    - async_q: Used by async workers and coroutines
 | 
			
		||||
    
 | 
			
		||||
    DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts.
 | 
			
		||||
    See RecheckPriorityQueue docstring above for detailed explanation.
 | 
			
		||||
    
 | 
			
		||||
    Simple wrapper around janus with bulletproof error handling.
 | 
			
		||||
    """
 | 
			
		||||
    
 | 
			
		||||
    def __init__(self, maxsize: int = 0):
 | 
			
		||||
        try:
 | 
			
		||||
            self._janus_queue = janus.Queue(maxsize=maxsize)
 | 
			
		||||
            # BOTH interfaces required - see class docstring for why
 | 
			
		||||
            self.sync_q = self._janus_queue.sync_q   # Flask routes, threads
 | 
			
		||||
            self.async_q = self._janus_queue.async_q # Async workers
 | 
			
		||||
            self.notification_event_signal = signal('notification_event')
 | 
			
		||||
            logger.debug("NotificationQueue initialized successfully")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
 | 
			
		||||
            raise
 | 
			
		||||
    
 | 
			
		||||
    def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
 | 
			
		||||
        """Thread-safe sync put with signal emission"""
 | 
			
		||||
        try:
 | 
			
		||||
            self.sync_q.put(item, block=block, timeout=timeout)
 | 
			
		||||
            self._emit_notification_signal(item)
 | 
			
		||||
            logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
 | 
			
		||||
            return True
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
 | 
			
		||||
            return False
 | 
			
		||||
    
 | 
			
		||||
    async def async_put(self, item: Dict[str, Any]):
 | 
			
		||||
        """Pure async put with signal emission"""
 | 
			
		||||
        try:
 | 
			
		||||
            await self.async_q.put(item)
 | 
			
		||||
            self._emit_notification_signal(item)
 | 
			
		||||
            logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
 | 
			
		||||
            return True
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
 | 
			
		||||
            return False
 | 
			
		||||
    
 | 
			
		||||
    def get(self, block: bool = True, timeout: Optional[float] = None):
 | 
			
		||||
        """Thread-safe sync get"""
 | 
			
		||||
        try:
 | 
			
		||||
            return self.sync_q.get(block=block, timeout=timeout)
 | 
			
		||||
        except queue.Empty as e:
 | 
			
		||||
            raise e
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
 | 
			
		||||
            raise e
 | 
			
		||||
    
 | 
			
		||||
    async def async_get(self):
 | 
			
		||||
        """Pure async get"""
 | 
			
		||||
        try:
 | 
			
		||||
            return await self.async_q.get()
 | 
			
		||||
        except queue.Empty as e:
 | 
			
		||||
            raise e
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
 | 
			
		||||
            raise e
 | 
			
		||||
    
 | 
			
		||||
    def qsize(self) -> int:
 | 
			
		||||
        """Get current queue size"""
 | 
			
		||||
        try:
 | 
			
		||||
            return self.sync_q.qsize()
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
 | 
			
		||||
            return 0
 | 
			
		||||
    
 | 
			
		||||
    def empty(self) -> bool:
 | 
			
		||||
        """Check if queue is empty"""
 | 
			
		||||
        return self.qsize() == 0
 | 
			
		||||
    
 | 
			
		||||
    def close(self):
 | 
			
		||||
        """Close the janus queue"""
 | 
			
		||||
        try:
 | 
			
		||||
            self._janus_queue.close()
 | 
			
		||||
            logger.debug("NotificationQueue closed successfully")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
 | 
			
		||||
    
 | 
			
		||||
    def _emit_notification_signal(self, item: Dict[str, Any]):
 | 
			
		||||
        """Emit notification signal"""
 | 
			
		||||
        try:
 | 
			
		||||
            if self.notification_event_signal and isinstance(item, dict):
 | 
			
		||||
                watch_uuid = item.get('uuid')
 | 
			
		||||
                if watch_uuid:
 | 
			
		||||
                    self.notification_event_signal.send(watch_uuid=watch_uuid)
 | 
			
		||||
                else:
 | 
			
		||||
                    self.notification_event_signal.send()
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Failed to emit notification signal: {str(e)}")
 | 
			
		||||
@@ -6,19 +6,19 @@
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  tr {
 | 
			
		||||
    /* make the icons and the text inline-ish */
 | 
			
		||||
    td.inline.title-col {
 | 
			
		||||
      .flex-wrapper {
 | 
			
		||||
        display: flex;
 | 
			
		||||
        align-items: center;
 | 
			
		||||
        gap: 4px;
 | 
			
		||||
  &.favicon-enabled {
 | 
			
		||||
    tr {
 | 
			
		||||
      /* make the icons and the text inline-ish */
 | 
			
		||||
      td.inline.title-col {
 | 
			
		||||
        .flex-wrapper {
 | 
			
		||||
          display: flex;
 | 
			
		||||
          align-items: center;
 | 
			
		||||
          gap: 4px;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  td,
 | 
			
		||||
  th {
 | 
			
		||||
    vertical-align: middle;
 | 
			
		||||
 
 | 
			
		||||
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							@@ -292,7 +292,9 @@ def test_access_denied(client, live_server, measure_memory_usage):
 | 
			
		||||
 | 
			
		||||
def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
    api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
 | 
			
		||||
 | 
			
		||||
    # Create a watch
 | 
			
		||||
    set_original_response()
 | 
			
		||||
    test_url = url_for('test_endpoint', _external=True)
 | 
			
		||||
@@ -300,27 +302,14 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
 | 
			
		||||
    # Create new
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("createwatch"),
 | 
			
		||||
        data=json.dumps({"url": test_url,
 | 
			
		||||
                         'tag': "One, Two",
 | 
			
		||||
                         "title": "My test URL",
 | 
			
		||||
                         'headers': {'cookie': 'yum'},
 | 
			
		||||
                         "conditions": [
 | 
			
		||||
                             {
 | 
			
		||||
                                 "field": "page_filtered_text",
 | 
			
		||||
                                 "operator": "contains_regex",
 | 
			
		||||
                                 "value": "."  # contains anything
 | 
			
		||||
                             }
 | 
			
		||||
                         ],
 | 
			
		||||
                         "conditions_match_logic": "ALL"
 | 
			
		||||
                         }
 | 
			
		||||
                        ),
 | 
			
		||||
        data=json.dumps({"url": test_url, 'tag': "One, Two", "title": "My test URL", 'headers': {'cookie': 'yum'} }),
 | 
			
		||||
        headers={'content-type': 'application/json', 'x-api-key': api_key},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert res.status_code == 201
 | 
			
		||||
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
 | 
			
		||||
    # Get a listing, it will be the first one
 | 
			
		||||
    res = client.get(
 | 
			
		||||
        url_for("createwatch"),
 | 
			
		||||
 
 | 
			
		||||
@@ -1,18 +1,15 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from flask import url_for
 | 
			
		||||
from .util import live_server_setup, wait_for_all_checks, set_original_response
 | 
			
		||||
from .util import live_server_setup, wait_for_all_checks
 | 
			
		||||
import json
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
def test_api_tags_listing(client, live_server, measure_memory_usage):
 | 
			
		||||
   #  live_server_setup(live_server) # Setup on conftest per function
 | 
			
		||||
    api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
 | 
			
		||||
    tag_title = 'Test Tag'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    set_original_response()
 | 
			
		||||
 | 
			
		||||
    # Get a listing
 | 
			
		||||
    res = client.get(
 | 
			
		||||
        url_for("tags"),
 | 
			
		||||
        headers={'x-api-key': api_key}
 | 
			
		||||
@@ -107,8 +104,6 @@ def test_api_tags_listing(client, live_server, measure_memory_usage):
 | 
			
		||||
    assert res.status_code == 201
 | 
			
		||||
    watch_uuid = res.json.get('uuid')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    wait_for_all_checks()
 | 
			
		||||
    # Verify tag is associated with watch by name if need be
 | 
			
		||||
    res = client.get(
 | 
			
		||||
        url_for("watch", uuid=watch_uuid),
 | 
			
		||||
@@ -117,21 +112,6 @@ def test_api_tags_listing(client, live_server, measure_memory_usage):
 | 
			
		||||
    assert res.status_code == 200
 | 
			
		||||
    assert new_tag_uuid in res.json.get('tags', [])
 | 
			
		||||
 | 
			
		||||
    # Check recheck by tag
 | 
			
		||||
    before_check_time = live_server.app.config['DATASTORE'].data['watching'][watch_uuid].get('last_checked')
 | 
			
		||||
    time.sleep(1)
 | 
			
		||||
    res = client.get(
 | 
			
		||||
       url_for("tag", uuid=new_tag_uuid) + "?recheck=true",
 | 
			
		||||
       headers={'x-api-key': api_key}
 | 
			
		||||
    )
 | 
			
		||||
    wait_for_all_checks()
 | 
			
		||||
    assert res.status_code == 200
 | 
			
		||||
    assert b'OK, 1 watches' in res.data
 | 
			
		||||
 | 
			
		||||
    after_check_time = live_server.app.config['DATASTORE'].data['watching'][watch_uuid].get('last_checked')
 | 
			
		||||
 | 
			
		||||
    assert before_check_time != after_check_time
 | 
			
		||||
 | 
			
		||||
    # Delete tag
 | 
			
		||||
    res = client.delete(
 | 
			
		||||
        url_for("tag", uuid=new_tag_uuid),
 | 
			
		||||
@@ -161,6 +141,3 @@ def test_api_tags_listing(client, live_server, measure_memory_usage):
 | 
			
		||||
        headers={'x-api-key': api_key},
 | 
			
		||||
    )
 | 
			
		||||
    assert res.status_code == 204
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -4,8 +4,6 @@ import time
 | 
			
		||||
 | 
			
		||||
from flask import url_for
 | 
			
		||||
from .util import live_server_setup, wait_for_all_checks
 | 
			
		||||
from ..model import CONDITIONS_MATCH_LOGIC_DEFAULT
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def set_original_response(number="50"):
 | 
			
		||||
    test_return_data = f"""<html>
 | 
			
		||||
@@ -78,7 +76,7 @@ def test_conditions_with_text_and_number(client, live_server):
 | 
			
		||||
            "fetch_backend": "html_requests",
 | 
			
		||||
            "include_filters": ".number-container",
 | 
			
		||||
            "title": "Number AND Text Condition Test",
 | 
			
		||||
            "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT,  # ALL = AND logic
 | 
			
		||||
            "conditions_match_logic": "ALL",  # ALL = AND logic
 | 
			
		||||
            "conditions-0-operator": "in",
 | 
			
		||||
            "conditions-0-field": "page_filtered_text",
 | 
			
		||||
            "conditions-0-value": "5",
 | 
			
		||||
@@ -285,7 +283,7 @@ def test_lev_conditions_plugin(client, live_server, measure_memory_usage):
 | 
			
		||||
        data={
 | 
			
		||||
            "url": test_url,
 | 
			
		||||
            "fetch_backend": "html_requests",
 | 
			
		||||
            "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT,  # ALL = AND logic
 | 
			
		||||
            "conditions_match_logic": "ALL",  # ALL = AND logic
 | 
			
		||||
            "conditions-0-field": "levenshtein_ratio",
 | 
			
		||||
            "conditions-0-operator": "<",
 | 
			
		||||
            "conditions-0-value": "0.8" # needs to be more of a diff to trigger a change
 | 
			
		||||
 
 | 
			
		||||
@@ -46,7 +46,7 @@ def test_check_extract_text_from_diff(client, live_server, measure_memory_usage)
 | 
			
		||||
        follow_redirects=False
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b'No matches found while scanning all of the watch history for that RegEx.' not in res.data
 | 
			
		||||
    assert b'Nothing matches that RegEx' not in res.data
 | 
			
		||||
    assert res.content_type == 'text/csv'
 | 
			
		||||
 | 
			
		||||
    # Read the csv reply as stringio
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,4 @@
 | 
			
		||||
from changedetectionio.conditions import execute_ruleset_against_all_plugins
 | 
			
		||||
from changedetectionio.model import CONDITIONS_MATCH_LOGIC_DEFAULT
 | 
			
		||||
from changedetectionio.store import ChangeDetectionStore
 | 
			
		||||
import shutil
 | 
			
		||||
import tempfile
 | 
			
		||||
@@ -60,7 +59,7 @@ class TestTriggerConditions(unittest.TestCase):
 | 
			
		||||
 | 
			
		||||
        self.store.data['watching'][self.watch_uuid].update(
 | 
			
		||||
            {
 | 
			
		||||
                "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT,
 | 
			
		||||
                "conditions_match_logic": "ALL",
 | 
			
		||||
                "conditions": [
 | 
			
		||||
                    {"operator": ">=", "field": "extracted_number", "value": "10"},
 | 
			
		||||
                    {"operator": "<=", "field": "extracted_number", "value": "5000"},
 | 
			
		||||
 
 | 
			
		||||
@@ -188,54 +188,15 @@ def is_watch_running(watch_uuid):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def queue_item_async_safe(update_q, item):
 | 
			
		||||
    """Bulletproof queue operation with comprehensive error handling"""
 | 
			
		||||
    item_uuid = 'unknown'
 | 
			
		||||
    
 | 
			
		||||
    try:
 | 
			
		||||
        # Safely extract UUID for logging
 | 
			
		||||
        if hasattr(item, 'item') and isinstance(item.item, dict):
 | 
			
		||||
            item_uuid = item.item.get('uuid', 'unknown')
 | 
			
		||||
    except Exception as uuid_e:
 | 
			
		||||
        logger.critical(f"CRITICAL: Failed to extract UUID from queue item: {uuid_e}")
 | 
			
		||||
    
 | 
			
		||||
    # Validate inputs
 | 
			
		||||
    if not update_q:
 | 
			
		||||
        logger.critical(f"CRITICAL: Queue is None/invalid for item {item_uuid}")
 | 
			
		||||
        return False
 | 
			
		||||
    
 | 
			
		||||
    if not item:
 | 
			
		||||
        logger.critical(f"CRITICAL: Item is None/invalid")
 | 
			
		||||
        return False
 | 
			
		||||
    
 | 
			
		||||
    # Attempt queue operation with multiple fallbacks
 | 
			
		||||
    try:
 | 
			
		||||
        # Primary: Use sync interface (thread-safe)
 | 
			
		||||
        success = update_q.put(item, block=True, timeout=5.0)
 | 
			
		||||
        if success is False:  # Explicit False return means failure
 | 
			
		||||
            logger.critical(f"CRITICAL: Queue.put() returned False for item {item_uuid}")
 | 
			
		||||
            return False
 | 
			
		||||
        
 | 
			
		||||
        logger.debug(f"Successfully queued item: {item_uuid}")
 | 
			
		||||
        return True
 | 
			
		||||
        
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        logger.critical(f"CRITICAL: Exception during queue operation for item {item_uuid}: {type(e).__name__}: {e}")
 | 
			
		||||
        
 | 
			
		||||
        # Secondary: Attempt queue health check
 | 
			
		||||
    """Queue an item for async queue processing"""
 | 
			
		||||
    if async_loop and not async_loop.is_closed():
 | 
			
		||||
        try:
 | 
			
		||||
            queue_size = update_q.qsize()
 | 
			
		||||
            is_empty = update_q.empty()
 | 
			
		||||
            logger.critical(f"CRITICAL: Queue health - size: {queue_size}, empty: {is_empty}")
 | 
			
		||||
        except Exception as health_e:
 | 
			
		||||
            logger.critical(f"CRITICAL: Queue health check failed: {health_e}")
 | 
			
		||||
        
 | 
			
		||||
        # Log queue type for debugging
 | 
			
		||||
        try:
 | 
			
		||||
            logger.critical(f"CRITICAL: Queue type: {type(update_q)}, has sync_q: {hasattr(update_q, 'sync_q')}")
 | 
			
		||||
        except Exception:
 | 
			
		||||
            logger.critical(f"CRITICAL: Cannot determine queue type")
 | 
			
		||||
        
 | 
			
		||||
        return False
 | 
			
		||||
            # For async queue, schedule the put operation
 | 
			
		||||
            asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop)
 | 
			
		||||
        except RuntimeError as e:
 | 
			
		||||
            logger.error(f"Failed to queue item: {e}")
 | 
			
		||||
    else:
 | 
			
		||||
        logger.error("Async loop not available or closed for queueing item")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def shutdown_workers():
 | 
			
		||||
 
 | 
			
		||||
@@ -66,9 +66,6 @@ services:
 | 
			
		||||
  #        A valid timezone name to run as (for scheduling watch checking) see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
 | 
			
		||||
  #      - TZ=America/Los_Angeles
 | 
			
		||||
  #
 | 
			
		||||
  #        Text processing locale, en_US.UTF-8 used by default unless defined as something else here, UTF-8 should cover 99.99% of cases.
 | 
			
		||||
  #      - LC_ALL=en_US.UTF-8
 | 
			
		||||
  #
 | 
			
		||||
  #        Maximum height of screenshots, default is 16000 px, screenshots will be clipped to this if exceeded.
 | 
			
		||||
  #        RAM usage will be higher if you increase this.
 | 
			
		||||
  #      - SCREENSHOT_MAX_HEIGHT=16000
 | 
			
		||||
 
 | 
			
		||||
@@ -7,7 +7,6 @@ flask-paginate
 | 
			
		||||
flask_expects_json~=1.7
 | 
			
		||||
flask_restful
 | 
			
		||||
flask_cors # For the Chrome extension to operate
 | 
			
		||||
janus # Thread-safe async/sync queue bridge
 | 
			
		||||
flask_wtf~=1.2
 | 
			
		||||
flask~=2.3
 | 
			
		||||
flask-socketio~=5.5.1
 | 
			
		||||
@@ -46,7 +45,7 @@ apprise==1.9.3
 | 
			
		||||
paho-mqtt!=2.0.*
 | 
			
		||||
 | 
			
		||||
# Requires extra wheel for rPi
 | 
			
		||||
#cryptography~=42.0.8
 | 
			
		||||
cryptography~=42.0.8
 | 
			
		||||
 | 
			
		||||
# Used for CSS filtering
 | 
			
		||||
beautifulsoup4>=4.0.0
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user