Mirror of https://github.com/dgtlmoon/changedetection.io.git, synced 2025-11-12 20:46:17 +00:00

Compare commits: 12 commits, default-fa ... janus-queu
| SHA1 |
|---|
| 213a18c061 |
| 1633b94511 |
| 928b97e6e5 |
| ea09168650 |
| 4f6e9dcc56 |
| f0588a9dd1 |
| aa4e182549 |
| fe1f7c30e1 |
| e5ed1ae349 |
| d1b1dd70f4 |
| 93b14c9fc8 |
| c9c5de20d8 |
.github/test/Dockerfile-alpine (vendored, 5 changes)

@@ -2,7 +2,7 @@
 # Test that we can still build on Alpine (musl modified libc https://musl.libc.org/)
 # Some packages wont install via pypi because they dont have a wheel available under this architecture.

-FROM ghcr.io/linuxserver/baseimage-alpine:3.21
+FROM ghcr.io/linuxserver/baseimage-alpine:3.22
 ENV PYTHONUNBUFFERED=1

 COPY requirements.txt /requirements.txt
@@ -24,12 +24,13 @@ RUN \
   apk add --update --no-cache \
     libjpeg \
     libxslt \
+    file \
     nodejs \
     poppler-utils \
     python3 && \
   echo "**** pip3 install test of changedetection.io ****" && \
   python3 -m venv /lsiopy && \
   pip install -U pip wheel setuptools && \
-  pip install -U --no-cache-dir --find-links https://wheel-index.linuxserver.io/alpine-3.21/ -r /requirements.txt && \
+  pip install -U --no-cache-dir --find-links https://wheel-index.linuxserver.io/alpine-3.22/ -r /requirements.txt && \
   apk del --purge \
     build-dependencies
.github/workflows/pypi-release.yml (vendored, 4 changes)

@@ -34,7 +34,7 @@ jobs:
       - build
     steps:
       - name: Download all the dists
-        uses: actions/download-artifact@v4
+        uses: actions/download-artifact@v5
         with:
           name: python-package-distributions
           path: dist/
@@ -72,7 +72,7 @@ jobs:

     steps:
       - name: Download all the dists
-        uses: actions/download-artifact@v4
+        uses: actions/download-artifact@v5
         with:
           name: python-package-distributions
           path: dist/
@@ -84,6 +84,9 @@ COPY changedetection.py /app/changedetection.py
 ARG LOGGER_LEVEL=''
 ENV LOGGER_LEVEL="$LOGGER_LEVEL"

+# Default
+ENV LC_ALL=en_US.UTF-8
+
 WORKDIR /app
 CMD ["python", "./changedetection.py", "-d", "/datastore"]
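Pinning `LC_ALL` in the image is about text processing: Python derives its preferred encoding from the locale, and an unset or POSIX locale inside a container can surface as encoding errors on non-ASCII pages. A quick check, runnable anywhere:

```python
# Under LC_ALL=en_US.UTF-8 the preferred encoding is UTF-8; an unset or
# POSIX locale may report ASCII instead.
import locale
import os

print(os.environ.get("LC_ALL", "<unset>"))
print(locale.getpreferredencoding())
```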
@@ -2,7 +2,7 @@

 # Read more https://github.com/dgtlmoon/changedetection.io/wiki

-__version__ = '0.50.7'
+__version__ = '0.50.8'

 from changedetectionio.strtobool import strtobool
 from json.decoder import JSONDecodeError
@@ -35,13 +35,22 @@ def sigshutdown_handler(_signo, _stack_frame):
     app.config.exit.set()
     datastore.stop_thread = True

-    # Shutdown workers immediately
+    # Shutdown workers and queues immediately
     try:
         from changedetectionio import worker_handler
         worker_handler.shutdown_workers()
     except Exception as e:
         logger.error(f"Error shutting down workers: {str(e)}")

+    # Close janus queues properly
+    try:
+        from changedetectionio.flask_app import update_q, notification_q
+        update_q.close()
+        notification_q.close()
+        logger.debug("Janus queues closed successfully")
+    except Exception as e:
+        logger.critical(f"CRITICAL: Failed to close janus queues: {e}")
+
     # Shutdown socketio server fast
     from changedetectionio.flask_app import socketio_server
     if socketio_server and hasattr(socketio_server, 'shutdown'):
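The ordering in this handler matters: workers are stopped before the janus queues are closed, so no consumer is left blocked on a queue that will never produce again. A minimal, self-contained sketch of that ordering (QueueStub and the print calls stand in for the project's worker_handler and loguru calls):

```python
# Stop consumers first, then close the queues, so nothing re-enqueues into a
# closed queue. QueueStub stands in for the janus-backed queues.
import signal
import sys

class QueueStub:
    def close(self):
        print("queue closed")

update_q, notification_q = QueueStub(), QueueStub()

def sigshutdown_handler(_signo, _stack_frame):
    print("stopping workers")             # worker_handler.shutdown_workers() in the real code
    for q in (update_q, notification_q):  # then close the janus queues
        try:
            q.close()
        except Exception as e:
            print(f"CRITICAL: failed to close queue: {e}", file=sys.stderr)
    sys.exit(0)

signal.signal(signal.SIGTERM, sigshutdown_handler)
signal.signal(signal.SIGINT, sigshutdown_handler)
```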
@@ -7,6 +7,7 @@ from changedetectionio.flask_app import watch_check_update
 import asyncio
 import importlib
 import os
+import queue
 import time

 from loguru import logger
@@ -37,13 +38,23 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
         watch = None

         try:
-            # Use asyncio wait_for to make queue.get() cancellable
-            queued_item_data = await asyncio.wait_for(q.get(), timeout=1.0)
+            # Use native janus async interface - no threads needed!
+            queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)

         except asyncio.TimeoutError:
             # No jobs available, continue loop
             continue
         except Exception as e:
-            logger.error(f"Worker {worker_id} error getting queue item: {e}")
+            logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
+
+            # Log queue health for debugging
+            try:
+                queue_size = q.qsize()
+                is_empty = q.empty()
+                logger.critical(f"CRITICAL: Worker {worker_id} queue health - size: {queue_size}, empty: {is_empty}")
+            except Exception as health_e:
+                logger.critical(f"CRITICAL: Worker {worker_id} queue health check failed: {health_e}")
+
             await asyncio.sleep(0.1)
             continue
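The worker loop's timeout is what keeps it responsive: `asyncio.wait_for(..., timeout=1.0)` bounds each get, so the loop regains control about once a second to re-check shutdown state instead of blocking indefinitely. A runnable sketch of the same pattern using a plain `asyncio.Queue` (the real loop awaits `q.async_get()` on the janus-backed queue):

```python
# Cancellable-get pattern: each get has a 1s budget, then the stop flag is
# re-checked. asyncio.Queue stands in for the janus queue here.
import asyncio

async def worker(worker_id: int, q: asyncio.Queue, stop: asyncio.Event):
    while not stop.is_set():
        try:
            item = await asyncio.wait_for(q.get(), timeout=1.0)
        except asyncio.TimeoutError:
            continue  # no jobs available, re-check the stop flag
        print(f"worker {worker_id} processing {item}")

async def main():
    q, stop = asyncio.Queue(), asyncio.Event()
    task = asyncio.create_task(worker(1, q, stop))
    await q.put({'uuid': 'abc-123'})
    await asyncio.sleep(1.5)
    stop.set()
    await task

asyncio.run(main())
```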
@@ -93,12 +93,15 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             return redirect(url_for('watchlist.index'))

         # For submission of requesting an extract
-        extract_form = forms.extractDataForm(request.form)
+        extract_form = forms.extractDataForm(formdata=request.form,
+                                             data={'extract_regex': request.form.get('extract_regex', '')}
+                                             )
         if not extract_form.validate():
             flash("An error occurred, please see below.", "error")
+            return _render_diff_template(uuid, extract_form)

         else:
-            extract_regex = request.form.get('extract_regex').strip()
+            extract_regex = request.form.get('extract_regex', '').strip()
             output = watch.extract_regex_from_all_history(extract_regex)
             if output:
                 watch_dir = os.path.join(datastore.datastore_path, uuid)
@@ -109,12 +112,11 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
                 response.headers['Expires'] = "0"
                 return response

-        flash('Nothing matches that RegEx', 'error')
-        redirect(url_for('ui_views.diff_history_page', uuid=uuid) + '#extract')
+        flash('No matches found while scanning all of the watch history for that RegEx.', 'error')
+        return redirect(url_for('ui.ui_views.diff_history_page', uuid=uuid) + '#extract')

-    @views_blueprint.route("/diff/<string:uuid>", methods=['GET'])
-    @login_optionally_required
-    def diff_history_page(uuid):
+    def _render_diff_template(uuid, extract_form=None):
+        """Helper function to render the diff template with all required data"""
         from changedetectionio import forms

         # More for testing, possible to return the first/only
@@ -128,8 +130,11 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             flash("No history found for the specified link, bad link?", "error")
             return redirect(url_for('watchlist.index'))

-        # For submission of requesting an extract
-        extract_form = forms.extractDataForm(request.form)
+        # Use provided form or create a new one
+        if extract_form is None:
+            extract_form = forms.extractDataForm(formdata=request.form,
+                                                 data={'extract_regex': request.form.get('extract_regex', '')}
+                                                 )

         history = watch.history
         dates = list(history.keys())
@@ -170,7 +175,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe

         datastore.set_last_viewed(uuid, time.time())

-        output = render_template("diff.html",
+        return render_template("diff.html",
                                current_diff_url=watch['url'],
                                from_version=str(from_version),
                                to_version=str(to_version),
@@ -193,7 +198,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
                                watch_a=watch
                                )

-        return output
+    @views_blueprint.route("/diff/<string:uuid>", methods=['GET'])
+    @login_optionally_required
+    def diff_history_page(uuid):
+        return _render_diff_template(uuid)

     @views_blueprint.route("/form/add/quickwatch", methods=['POST'])
     @login_optionally_required
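The two-keyword constructor is the crux of the form change: in WTForms, `formdata` carries the submitted values that get validated, while `data` supplies a fallback when the field is missing from the submission. A small sketch, assuming only stock WTForms and Werkzeug:

```python
# formdata vs data in WTForms: submitted values win, `data` fills gaps.
from werkzeug.datastructures import MultiDict
from wtforms import Form, StringField, validators

class ExtractForm(Form):
    extract_regex = StringField('RegEx to extract', validators=[validators.DataRequired()])

submitted = MultiDict({'extract_regex': r'\d+'})
form = ExtractForm(formdata=submitted, data={'extract_regex': ''})
print(form.validate(), form.extract_regex.data)  # True \d+
```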
@@ -12,7 +12,7 @@ from blinker import signal

 from changedetectionio.strtobool import strtobool
 from threading import Event
-from changedetectionio.custom_queue import SignalPriorityQueue, AsyncSignalPriorityQueue, NotificationQueue
+from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
 from changedetectionio import worker_handler

 from flask import (
@@ -48,8 +48,8 @@ datastore = None
 ticker_thread = None
 extra_stylesheets = []

-# Use async queue by default, keep sync for backward compatibility
-update_q = AsyncSignalPriorityQueue() if worker_handler.USE_ASYNC_WORKERS else SignalPriorityQueue()
+# Use bulletproof janus-based queues for sync/async reliability
+update_q = RecheckPriorityQueue()
 notification_q = NotificationQueue()
 MAX_QUEUE_SIZE = 2000
@@ -844,15 +844,21 @@ def ticker_thread_check_time_launch_checks():

                 # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
                 priority = int(time.time())

-                logger.debug(
-                    f"> Queued watch UUID {uuid} "
-                    f"last checked at {watch['last_checked']} "
-                    f"queued at {now:0.2f} priority {priority} "
-                    f"jitter {watch.jitter_seconds:0.2f}s, "
-                    f"{now - watch['last_checked']:0.2f}s since last checked")
-
-                # Into the queue with you
-                worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid}))
+                # Into the queue with you
+                queued_successfully = worker_handler.queue_item_async_safe(update_q,
+                                                                           queuedWatchMetaData.PrioritizedItem(priority=priority,
+                                                                                                               item={'uuid': uuid})
+                                                                           )
+                if queued_successfully:
+                    logger.debug(
+                        f"> Queued watch UUID {uuid} "
+                        f"last checked at {watch['last_checked']} "
+                        f"queued at {now:0.2f} priority {priority} "
+                        f"jitter {watch.jitter_seconds:0.2f}s, "
+                        f"{now - watch['last_checked']:0.2f}s since last checked")
+                else:
+                    logger.critical(f"CRITICAL: Failed to queue watch UUID {uuid} in ticker thread!")

                 # Reset for next time
                 watch.jitter_seconds = 0
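The priority scheme is worth spelling out: using epoch seconds as the priority makes the queue drain oldest-due-first, while small hand-assigned values (such as 1 for a user-triggered recheck) always sort ahead of any timestamp. A sketch with a stand-in `PrioritizedItem` (modelled on how `queuedWatchMetaData.PrioritizedItem` is called above, not the project's exact class):

```python
# Epoch-seconds priority: scheduled checks sort by due time; priority=1 jumps
# the whole queue.
import heapq
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any = field(compare=False)  # payload excluded from ordering

heap = []
heapq.heappush(heap, PrioritizedItem(priority=int(time.time()), item={'uuid': 'scheduled-check'}))
heapq.heappush(heap, PrioritizedItem(priority=1, item={'uuid': 'user-requested-recheck'}))

print(heapq.heappop(heap).item)  # {'uuid': 'user-requested-recheck'}
```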
@@ -396,6 +396,19 @@ def validate_url(test_url):
         # This should be wtforms.validators.
         raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')

+
+class ValidateSinglePythonRegexString(object):
+    def __init__(self, message=None):
+        self.message = message
+
+    def __call__(self, form, field):
+        try:
+            re.compile(field.data)
+        except re.error:
+            message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
+            raise ValidationError(message % (field.data))
+
+
 class ValidateListRegex(object):
     """
     Validates that anything that looks like a regex passes as a regex
@@ -414,6 +427,7 @@ class ValidateListRegex(object):
                 message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
                 raise ValidationError(message % (line))

+
 class ValidateCSSJSONXPATHInput(object):
     """
     Filter validation
@@ -791,5 +805,5 @@ class globalSettingsForm(Form):


 class extractDataForm(Form):
-    extract_regex = StringField('RegEx to extract', validators=[validators.Length(min=1, message="Needs a RegEx")])
+    extract_regex = StringField('RegEx to extract', validators=[validators.DataRequired(), ValidateSinglePythonRegexString()])
     extract_submit_button = SubmitField('Extract as CSV', render_kw={"class": "pure-button pure-button-primary"})
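`ValidateSinglePythonRegexString` hinges on a single fact: `re.compile()` either accepts a pattern or raises `re.error`, which the validator maps to a wtforms `ValidationError`. The check itself reduces to:

```python
# The core of the new validator: compile succeeds, or re.error signals an
# invalid pattern.
import re

for pattern in (r'\d{2,4}', r'([unclosed'):
    try:
        re.compile(pattern)
        print(f"{pattern!r}: valid")
    except re.error as e:
        print(f"{pattern!r}: not a valid regular expression ({e})")
```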
@@ -639,7 +639,7 @@ class model(watch_base):
         if res:
             if not csv_writer:
                 # A file on the disk can be transferred much faster via flask than a string reply
-                csv_output_filename = 'report.csv'
+                csv_output_filename = f"report-{self.get('uuid')}.csv"
                 f = open(os.path.join(self.watch_data_dir, csv_output_filename), 'w')
                 # @todo some headers in the future
                 #fieldnames = ['Epoch seconds', 'Date']
@@ -3,6 +3,7 @@ import uuid

 from changedetectionio import strtobool
 default_notification_format_for_watch = 'System default'
+CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL'

 class watch_base(dict):

@@ -15,6 +16,8 @@ class watch_base(dict):
             'body': None,
             'browser_steps': [],
             'browser_steps_last_error_step': None,
+            'conditions' : {},
+            'conditions_match_logic': CONDITIONS_MATCH_LOGIC_DEFAULT,
             'check_count': 0,
             'check_unique_lines': False, # On change-detected, compare against all history if its something new
             'consecutive_filter_failures': 0, # Every time the CSS/xPath filter cannot be located, reset when all is fine.
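Since `watch_base` is a dict subclass that seeds defaults before applying stored values, every existing watch loaded from the datastore gains empty conditions and the 'ALL' match logic without a migration. A minimal sketch of that pattern (class body abbreviated from the diff):

```python
# Dict-subclass defaults: stored values override, missing keys fall back.
CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL'

class WatchBase(dict):
    def __init__(self, *args, **kwargs):
        super().__init__()
        self.update({
            'conditions': {},
            'conditions_match_logic': CONDITIONS_MATCH_LOGIC_DEFAULT,
        })
        self.update(*args, **kwargs)  # stored values override the defaults

w = WatchBase({'url': 'https://example.com'})
print(w['conditions_match_logic'])  # ALL
```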
changedetectionio/queue_handlers.py (new file, 430 lines)

@@ -0,0 +1,430 @@
+import heapq
+import threading
+from typing import Dict, List, Any, Optional
+from blinker import signal
+from loguru import logger
+
+try:
+    import janus
+except ImportError:
+    logger.critical("CRITICAL: janus library is required. Install with: pip install janus")
+    raise
+
+
+class RecheckPriorityQueue:
+    """
+    Ultra-reliable priority queue using janus for async/sync bridging.
+
+    CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
+    - sync_q: Used by Flask routes, ticker threads, and other synchronous code
+    - async_q: Used by async workers (the actual fetchers/processors) and coroutines
+
+    DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts:
+    - Synchronous code (Flask, threads) cannot use async methods without blocking
+    - Async code cannot use sync methods without blocking the event loop
+    - janus provides the only safe bridge between these two worlds
+
+    Attempting to unify to async-only would require:
+    - Converting all Flask routes to async (major breaking change)
+    - Using asyncio.run() in sync contexts (causes deadlocks)
+    - Thread-pool wrapping (adds complexity and overhead)
+
+    Minimal implementation focused on reliability:
+    - Pure janus for sync/async bridge
+    - Thread-safe priority ordering
+    - Bulletproof error handling with critical logging
+    """
+
+    def __init__(self, maxsize: int = 0):
+        try:
+            self._janus_queue = janus.Queue(maxsize=maxsize)
+            # BOTH interfaces required - see class docstring for why
+            self.sync_q = self._janus_queue.sync_q    # Flask routes, ticker thread
+            self.async_q = self._janus_queue.async_q  # Async workers
+
+            # Priority storage - thread-safe
+            self._priority_items = []
+            self._lock = threading.RLock()
+
+            # Signals for UI updates
+            self.queue_length_signal = signal('queue_length')
+
+            logger.debug("RecheckPriorityQueue initialized successfully")
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {e}")
+            raise
+
+    # SYNC INTERFACE (for ticker thread)
+    def put(self, item, block: bool = True, timeout: Optional[float] = None):
+        """Thread-safe sync put with priority ordering"""
+        try:
+            # Add to priority storage
+            with self._lock:
+                heapq.heappush(self._priority_items, item)
+
+            # Notify via janus sync queue
+            self.sync_q.put(True, block=block, timeout=timeout)
+
+            # Emit signals
+            self._emit_put_signals(item)
+
+            logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}")
+            return True
+
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}")
+            # Remove from priority storage if janus put failed
+            try:
+                with self._lock:
+                    if item in self._priority_items:
+                        self._priority_items.remove(item)
+                        heapq.heapify(self._priority_items)
+            except Exception as cleanup_e:
+                logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}")
+            return False
+
+    def get(self, block: bool = True, timeout: Optional[float] = None):
+        """Thread-safe sync get with priority ordering"""
+        try:
+            # Wait for notification
+            self.sync_q.get(block=block, timeout=timeout)
+
+            # Get highest priority item
+            with self._lock:
+                if not self._priority_items:
+                    logger.critical("CRITICAL: Queue notification received but no priority items available")
+                    raise Exception("Priority queue inconsistency")
+                item = heapq.heappop(self._priority_items)
+
+            # Emit signals
+            self._emit_get_signals()
+
+            logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
+            return item
+
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to get item from queue: {e}")
+            raise
+
+    # ASYNC INTERFACE (for workers)
+    async def async_put(self, item):
+        """Pure async put with priority ordering"""
+        try:
+            # Add to priority storage
+            with self._lock:
+                heapq.heappush(self._priority_items, item)
+
+            # Notify via janus async queue
+            await self.async_q.put(True)
+
+            # Emit signals
+            self._emit_put_signals(item)
+
+            logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}")
+            return True
+
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}")
+            # Remove from priority storage if janus put failed
+            try:
+                with self._lock:
+                    if item in self._priority_items:
+                        self._priority_items.remove(item)
+                        heapq.heapify(self._priority_items)
+            except Exception as cleanup_e:
+                logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}")
+            return False
+
+    async def async_get(self):
+        """Pure async get with priority ordering"""
+        try:
+            # Wait for notification
+            await self.async_q.get()
+
+            # Get highest priority item
+            with self._lock:
+                if not self._priority_items:
+                    logger.critical("CRITICAL: Async queue notification received but no priority items available")
+                    raise Exception("Priority queue inconsistency")
+                item = heapq.heappop(self._priority_items)
+
+            # Emit signals
+            self._emit_get_signals()
+
+            logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}")
+            return item
+
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to async get item from queue: {e}")
+            raise
+
+    # UTILITY METHODS
+    def qsize(self) -> int:
+        """Get current queue size"""
+        try:
+            with self._lock:
+                return len(self._priority_items)
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to get queue size: {e}")
+            return 0
+
+    def empty(self) -> bool:
+        """Check if queue is empty"""
+        return self.qsize() == 0
+
+    def close(self):
+        """Close the janus queue"""
+        try:
+            self._janus_queue.close()
+            logger.debug("RecheckPriorityQueue closed successfully")
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {e}")
+
+    # COMPATIBILITY METHODS (from original implementation)
+    @property
+    def queue(self):
+        """Provide compatibility with original queue access"""
+        try:
+            with self._lock:
+                return list(self._priority_items)
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to get queue list: {e}")
+            return []
+
+    def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
+        """Find position of UUID in queue"""
+        try:
+            with self._lock:
+                queue_list = list(self._priority_items)
+                total_items = len(queue_list)
+
+                if total_items == 0:
+                    return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
+
+                # Find target item
+                for item in queue_list:
+                    if (hasattr(item, 'item') and isinstance(item.item, dict) and
+                            item.item.get('uuid') == target_uuid):
+
+                        # Count items with higher priority
+                        position = sum(1 for other in queue_list if other.priority < item.priority)
+                        return {
+                            'position': position,
+                            'total_items': total_items,
+                            'priority': item.priority,
+                            'found': True
+                        }
+
+                return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
+
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}")
+            return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
+
+    def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
+        """Get all queued UUIDs with pagination"""
+        try:
+            with self._lock:
+                queue_list = sorted(self._priority_items)  # Sort by priority
+                total_items = len(queue_list)
+
+                if total_items == 0:
+                    return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
+
+                # Apply pagination
+                end_idx = min(offset + limit, total_items) if limit else total_items
+                items_to_process = queue_list[offset:end_idx]
+
+                result = []
+                for position, item in enumerate(items_to_process, start=offset):
+                    if (hasattr(item, 'item') and isinstance(item.item, dict) and
+                            'uuid' in item.item):
+                        result.append({
+                            'uuid': item.item['uuid'],
+                            'position': position,
+                            'priority': item.priority
+                        })
+
+                return {
+                    'items': result,
+                    'total_items': total_items,
+                    'returned_items': len(result),
+                    'has_more': (offset + len(result)) < total_items
+                }
+
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}")
+            return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
+
+    def get_queue_summary(self) -> Dict[str, Any]:
+        """Get queue summary statistics"""
+        try:
+            with self._lock:
+                queue_list = list(self._priority_items)
+                total_items = len(queue_list)
+
+                if total_items == 0:
+                    return {
+                        'total_items': 0, 'priority_breakdown': {},
+                        'immediate_items': 0, 'clone_items': 0, 'scheduled_items': 0
+                    }
+
+                immediate_items = clone_items = scheduled_items = 0
+                priority_counts = {}
+
+                for item in queue_list:
+                    priority = item.priority
+                    priority_counts[priority] = priority_counts.get(priority, 0) + 1
+
+                    if priority == 1:
+                        immediate_items += 1
+                    elif priority == 5:
+                        clone_items += 1
+                    elif priority > 100:
+                        scheduled_items += 1
+
+                return {
+                    'total_items': total_items,
+                    'priority_breakdown': priority_counts,
+                    'immediate_items': immediate_items,
+                    'clone_items': clone_items,
+                    'scheduled_items': scheduled_items,
+                    'min_priority': min(priority_counts.keys()) if priority_counts else None,
+                    'max_priority': max(priority_counts.keys()) if priority_counts else None
+                }
+
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to get queue summary: {e}")
+            return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
+                    'clone_items': 0, 'scheduled_items': 0}
+
+    # PRIVATE METHODS
+    def _get_item_uuid(self, item) -> str:
+        """Safely extract UUID from item for logging"""
+        try:
+            if hasattr(item, 'item') and isinstance(item.item, dict):
+                return item.item.get('uuid', 'unknown')
+        except Exception:
+            pass
+        return 'unknown'
+
+    def _emit_put_signals(self, item):
+        """Emit signals when item is added"""
+        try:
+            # Watch update signal
+            if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
+                watch_check_update = signal('watch_check_update')
+                if watch_check_update:
+                    watch_check_update.send(watch_uuid=item.item['uuid'])
+
+            # Queue length signal
+            if self.queue_length_signal:
+                self.queue_length_signal.send(length=self.qsize())
+
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to emit put signals: {e}")
+
+    def _emit_get_signals(self):
+        """Emit signals when item is removed"""
+        try:
+            if self.queue_length_signal:
+                self.queue_length_signal.send(length=self.qsize())
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to emit get signals: {e}")
+
+
+class NotificationQueue:
+    """
+    Ultra-reliable notification queue using pure janus.
+
+    CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
+    - sync_q: Used by Flask routes, ticker threads, and other synchronous code
+    - async_q: Used by async workers and coroutines
+
+    DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts.
+    See RecheckPriorityQueue docstring above for detailed explanation.
+
+    Simple wrapper around janus with bulletproof error handling.
+    """
+
+    def __init__(self, maxsize: int = 0):
+        try:
+            self._janus_queue = janus.Queue(maxsize=maxsize)
+            # BOTH interfaces required - see class docstring for why
+            self.sync_q = self._janus_queue.sync_q    # Flask routes, threads
+            self.async_q = self._janus_queue.async_q  # Async workers
+            self.notification_event_signal = signal('notification_event')
+            logger.debug("NotificationQueue initialized successfully")
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {e}")
+            raise
+
+    def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
+        """Thread-safe sync put with signal emission"""
+        try:
+            self.sync_q.put(item, block=block, timeout=timeout)
+            self._emit_notification_signal(item)
+            logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
+            return True
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}")
+            return False
+
+    async def async_put(self, item: Dict[str, Any]):
+        """Pure async put with signal emission"""
+        try:
+            await self.async_q.put(item)
+            self._emit_notification_signal(item)
+            logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
+            return True
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}")
+            return False
+
+    def get(self, block: bool = True, timeout: Optional[float] = None):
+        """Thread-safe sync get"""
+        try:
+            return self.sync_q.get(block=block, timeout=timeout)
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to get notification: {e}")
+            raise
+
+    async def async_get(self):
+        """Pure async get"""
+        try:
+            return await self.async_q.get()
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to async get notification: {e}")
+            raise
+
+    def qsize(self) -> int:
+        """Get current queue size"""
+        try:
+            return self.sync_q.qsize()
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to get notification queue size: {e}")
+            return 0
+
+    def empty(self) -> bool:
+        """Check if queue is empty"""
+        return self.qsize() == 0
+
+    def close(self):
+        """Close the janus queue"""
+        try:
+            self._janus_queue.close()
+            logger.debug("NotificationQueue closed successfully")
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to close NotificationQueue: {e}")
+
+    def _emit_notification_signal(self, item: Dict[str, Any]):
+        """Emit notification signal"""
+        try:
+            if self.notification_event_signal and isinstance(item, dict):
+                watch_uuid = item.get('uuid')
+                if watch_uuid:
+                    self.notification_event_signal.send(watch_uuid=watch_uuid)
+                else:
+                    self.notification_event_signal.send()
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to emit notification signal: {e}")
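Both new classes are thin layers over the same janus idiom: create one `janus.Queue` inside a running event loop, then hand `sync_q` to threads and `async_q` to coroutines. A self-contained sketch of that bridge, independent of the classes above:

```python
# One queue, two interfaces: blocking sync puts from a thread, awaitable gets
# on the event loop.
import asyncio
import threading

import janus

def producer(sync_q) -> None:
    # Plays the role of sync code (Flask routes, the ticker thread)
    for i in range(3):
        sync_q.put(i)  # blocking, thread-safe put
    sync_q.put(None)   # sentinel: tell the consumer to stop

async def consumer(async_q) -> None:
    # Plays the role of the async update workers
    while True:
        item = await async_q.get()  # never blocks the event loop
        if item is None:
            break
        print(f"got {item}")

async def main() -> None:
    q = janus.Queue()  # must be created while a loop is running
    t = threading.Thread(target=producer, args=(q.sync_q,))
    t.start()
    await consumer(q.async_q)
    t.join()
    q.close()
    await q.wait_closed()

asyncio.run(main())
```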
@@ -6,7 +6,7 @@
         }
       }
     }
-    &.favicon-enabled {
+
     tr {
       /* make the icons and the text inline-ish */
       td.inline.title-col {
@@ -17,7 +17,7 @@
        }
      }
    }
-    }
+

    td,
    th {
File diff suppressed because one or more lines are too long
@@ -292,9 +292,7 @@ def test_access_denied(client, live_server, measure_memory_usage):


 def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
-
-
     api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')

     # Create a watch
     set_original_response()
     test_url = url_for('test_endpoint', _external=True)
@@ -302,14 +300,27 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
     # Create new
     res = client.post(
         url_for("createwatch"),
-        data=json.dumps({"url": test_url, 'tag': "One, Two", "title": "My test URL", 'headers': {'cookie': 'yum'} }),
+        data=json.dumps({"url": test_url,
+                         'tag': "One, Two",
+                         "title": "My test URL",
+                         'headers': {'cookie': 'yum'},
+                         "conditions": [
+                             {
+                                 "field": "page_filtered_text",
+                                 "operator": "contains_regex",
+                                 "value": "."  # contains anything
+                             }
+                         ],
+                         "conditions_match_logic": "ALL"
+                         }
+                        ),
         headers={'content-type': 'application/json', 'x-api-key': api_key},
         follow_redirects=True
     )

     assert res.status_code == 201

+    wait_for_all_checks(client)
     # Get a listing, it will be the first one
     res = client.get(
         url_for("createwatch"),
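For reference, the same create-watch call can be made from outside the test suite. A hedged sketch, assuming a local instance and that the `createwatch` endpoint is served at /api/v1/watch (the instance URL and API key below are placeholders):

```python
# Create a watch with conditions via the HTTP API; 201 is the success status
# the test above asserts.
import json
import requests

res = requests.post(
    "http://localhost:5000/api/v1/watch",
    headers={'content-type': 'application/json', 'x-api-key': 'YOUR_API_KEY'},
    data=json.dumps({
        "url": "https://example.com",
        "conditions": [
            {"field": "page_filtered_text", "operator": "contains_regex", "value": "."}  # contains anything
        ],
        "conditions_match_logic": "ALL",
    }),
)
print(res.status_code)
```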
@@ -4,6 +4,8 @@ import time

 from flask import url_for
 from .util import live_server_setup, wait_for_all_checks
+from ..model import CONDITIONS_MATCH_LOGIC_DEFAULT
+

 def set_original_response(number="50"):
     test_return_data = f"""<html>
@@ -76,7 +78,7 @@ def test_conditions_with_text_and_number(client, live_server):
             "fetch_backend": "html_requests",
             "include_filters": ".number-container",
             "title": "Number AND Text Condition Test",
-            "conditions_match_logic": "ALL",  # ALL = AND logic
+            "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT,  # ALL = AND logic
             "conditions-0-operator": "in",
             "conditions-0-field": "page_filtered_text",
             "conditions-0-value": "5",
@@ -283,7 +285,7 @@ def test_lev_conditions_plugin(client, live_server, measure_memory_usage):
         data={
             "url": test_url,
             "fetch_backend": "html_requests",
-            "conditions_match_logic": "ALL",  # ALL = AND logic
+            "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT,  # ALL = AND logic
             "conditions-0-field": "levenshtein_ratio",
             "conditions-0-operator": "<",
             "conditions-0-value": "0.8"  # needs to be more of a diff to trigger a change
@@ -46,7 +46,7 @@ def test_check_extract_text_from_diff(client, live_server, measure_memory_usage)
         follow_redirects=False
     )

-    assert b'Nothing matches that RegEx' not in res.data
+    assert b'No matches found while scanning all of the watch history for that RegEx.' not in res.data
     assert res.content_type == 'text/csv'

     # Read the csv reply as stringio
@@ -1,4 +1,5 @@
 from changedetectionio.conditions import execute_ruleset_against_all_plugins
+from changedetectionio.model import CONDITIONS_MATCH_LOGIC_DEFAULT
 from changedetectionio.store import ChangeDetectionStore
 import shutil
 import tempfile
@@ -59,7 +60,7 @@ class TestTriggerConditions(unittest.TestCase):

         self.store.data['watching'][self.watch_uuid].update(
             {
-                "conditions_match_logic": "ALL",
+                "conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT,
                 "conditions": [
                     {"operator": ">=", "field": "extracted_number", "value": "10"},
                     {"operator": "<=", "field": "extracted_number", "value": "5000"},
@@ -188,15 +188,54 @@ def is_watch_running(watch_uuid):


 def queue_item_async_safe(update_q, item):
-    """Queue an item for async queue processing"""
-    if async_loop and not async_loop.is_closed():
-        try:
-            # For async queue, schedule the put operation
-            asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop)
-        except RuntimeError as e:
-            logger.error(f"Failed to queue item: {e}")
-    else:
-        logger.error("Async loop not available or closed for queueing item")
+    """Bulletproof queue operation with comprehensive error handling"""
+    item_uuid = 'unknown'
+
+    try:
+        # Safely extract UUID for logging
+        if hasattr(item, 'item') and isinstance(item.item, dict):
+            item_uuid = item.item.get('uuid', 'unknown')
+    except Exception as uuid_e:
+        logger.critical(f"CRITICAL: Failed to extract UUID from queue item: {uuid_e}")
+
+    # Validate inputs
+    if not update_q:
+        logger.critical(f"CRITICAL: Queue is None/invalid for item {item_uuid}")
+        return False
+
+    if not item:
+        logger.critical(f"CRITICAL: Item is None/invalid")
+        return False
+
+    # Attempt queue operation with multiple fallbacks
+    try:
+        # Primary: Use sync interface (thread-safe)
+        success = update_q.put(item, block=True, timeout=5.0)
+        if success is False:  # Explicit False return means failure
+            logger.critical(f"CRITICAL: Queue.put() returned False for item {item_uuid}")
+            return False
+
+        logger.debug(f"Successfully queued item: {item_uuid}")
+        return True
+
+    except Exception as e:
+        logger.critical(f"CRITICAL: Exception during queue operation for item {item_uuid}: {type(e).__name__}: {e}")
+
+        # Secondary: Attempt queue health check
+        try:
+            queue_size = update_q.qsize()
+            is_empty = update_q.empty()
+            logger.critical(f"CRITICAL: Queue health - size: {queue_size}, empty: {is_empty}")
+        except Exception as health_e:
+            logger.critical(f"CRITICAL: Queue health check failed: {health_e}")
+
+        # Log queue type for debugging
+        try:
+            logger.critical(f"CRITICAL: Queue type: {type(update_q)}, has sync_q: {hasattr(update_q, 'sync_q')}")
+        except Exception:
+            logger.critical(f"CRITICAL: Cannot determine queue type")
+
+        return False


 def shutdown_workers():
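A hedged sketch of the caller contract this rewrite establishes: the helper never raises, it returns True/False, so call sites (like the ticker thread earlier) branch on the result instead of wrapping every enqueue in try/except. The helper below is a simplified stand-in using the stdlib queue, not the project's function:

```python
# Bounded, non-raising enqueue: a timeout turns "queue stuck" into a False
# return the caller can log and act on.
import queue

def queue_item_safe(q: queue.Queue, item) -> bool:
    try:
        q.put(item, block=True, timeout=5.0)  # bounded wait, never hangs forever
        return True
    except queue.Full:
        return False

q = queue.Queue(maxsize=1)
print(queue_item_safe(q, "a"))  # True
print(queue_item_safe(q, "b"))  # False, after waiting out the 5s timeout
```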
@@ -66,6 +66,9 @@ services:
     #   A valid timezone name to run as (for scheduling watch checking) see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
     # - TZ=America/Los_Angeles
     #
+    #   Text processing locale, en_US.UTF-8 used by default unless defined as something else here, UTF-8 should cover 99.99% of cases.
+    # - LC_ALL=en_US.UTF-8
+    #
     #   Maximum height of screenshots, default is 16000 px, screenshots will be clipped to this if exceeded.
     #   RAM usage will be higher if you increase this.
     # - SCREENSHOT_MAX_HEIGHT=16000
@@ -7,6 +7,7 @@ flask-paginate
 flask_expects_json~=1.7
 flask_restful
 flask_cors  # For the Chrome extension to operate
+janus  # Thread-safe async/sync queue bridge
 flask_wtf~=1.2
 flask~=2.3
 flask-socketio~=5.5.1