Compare commits

...

1 Commits

Author SHA1 Message Date
dgtlmoon
2b96e0a2fe API & UI - Recheck all - Dont requeue existing queued or processing watches. 2026-01-17 18:09:05 +01:00
2 changed files with 73 additions and 12 deletions

View File

@@ -475,17 +475,46 @@ class CreateWatch(Resource):
# If less than 20 watches, queue synchronously for immediate feedback
if len(watches_to_queue) < 20:
for uuid in watches_to_queue:
# Get already queued/running UUIDs once (efficient)
queued_uuids = set(self.update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids())
# Filter out watches that are already queued or running
watches_to_queue_filtered = [
uuid for uuid in watches_to_queue
if uuid not in queued_uuids and uuid not in running_uuids
]
# Queue only the filtered watches
for uuid in watches_to_queue_filtered:
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
return {'status': f'OK, queued {len(watches_to_queue)} watches for rechecking'}, 200
# Provide feedback about skipped watches
skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
if skipped_count > 0:
return {'status': f'OK, queued {len(watches_to_queue_filtered)} watches for rechecking ({skipped_count} already queued or running)'}, 200
else:
return {'status': f'OK, queued {len(watches_to_queue_filtered)} watches for rechecking'}, 200
else:
# 20+ watches - queue in background thread to avoid blocking API response
# Capture queued/running state before background thread
queued_uuids = set(self.update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids())
def queue_all_watches_background():
"""Background thread to queue all watches - discarded after completion."""
try:
queued_count = 0
skipped_count = 0
for uuid in watches_to_queue:
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
logger.info(f"Background queueing complete: {len(watches_to_queue)} watches queued")
# Check if already queued or running (state captured at start)
if uuid not in queued_uuids and uuid not in running_uuids:
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
queued_count += 1
else:
skipped_count += 1
logger.info(f"Background queueing complete: {queued_count} watches queued, {skipped_count} skipped (already queued/running)")
except Exception as e:
logger.error(f"Error in background queueing all watches: {e}")

View File

@@ -256,9 +256,12 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
with_errors = request.args.get('with_errors') == "1"
if uuid:
# Single watch - queue immediately
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
flash(gettext("Queued 1 watch for rechecking."))
# Single watch - check if already queued or running
if worker_handler.is_watch_running(uuid) or uuid in update_q.get_queued_uuids():
flash(gettext("Watch is already queued or being checked."))
else:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
flash(gettext("Queued 1 watch for rechecking."))
else:
# Multiple watches - first count how many need to be queued
watches_to_queue = []
@@ -274,21 +277,50 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
# If less than 20 watches, queue synchronously for immediate feedback
if len(watches_to_queue) < 20:
# Get already queued/running UUIDs once (efficient)
queued_uuids = set(update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids())
# Filter out watches that are already queued or running
watches_to_queue_filtered = []
for watch_uuid in watches_to_queue:
if watch_uuid not in queued_uuids and watch_uuid not in running_uuids:
watches_to_queue_filtered.append(watch_uuid)
# Queue only the filtered watches
for watch_uuid in watches_to_queue_filtered:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
if len(watches_to_queue) == 1:
flash(gettext("Queued 1 watch for rechecking."))
# Provide feedback about skipped watches
skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
if skipped_count > 0:
flash(gettext("Queued {} watches for rechecking ({} already queued or running).").format(
len(watches_to_queue_filtered), skipped_count))
else:
flash(gettext("Queued {} watches for rechecking.").format(len(watches_to_queue)))
if len(watches_to_queue_filtered) == 1:
flash(gettext("Queued 1 watch for rechecking."))
else:
flash(gettext("Queued {} watches for rechecking.").format(len(watches_to_queue_filtered)))
else:
# 20+ watches - queue in background thread to avoid blocking HTTP response
# Capture queued/running state before background thread
queued_uuids = set(update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids())
def queue_watches_background():
"""Background thread to queue watches - discarded after completion."""
try:
queued_count = 0
skipped_count = 0
for watch_uuid in watches_to_queue:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
logger.info(f"Background queueing complete: {len(watches_to_queue)} watches queued")
# Check if already queued or running (state captured at start)
if watch_uuid not in queued_uuids and watch_uuid not in running_uuids:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
queued_count += 1
else:
skipped_count += 1
logger.info(f"Background queueing complete: {queued_count} watches queued, {skipped_count} skipped (already queued/running)")
except Exception as e:
logger.error(f"Error in background queueing: {e}")