Compare commits

...

9 Commits

Author SHA1 Message Date
dgtlmoon   a89c30f882   adding notes                 2026-02-07 03:41:02 +01:00
dgtlmoon   c6744f6969   fix test                     2026-02-07 03:29:19 +01:00
dgtlmoon   01eb8f629a   Adding tests and comments    2026-02-07 03:20:23 +01:00
dgtlmoon   faa7fa88cd   lock fixes                   2026-02-07 02:48:37 +01:00
dgtlmoon   3123bf0016   processor config fixes       2026-02-07 02:34:29 +01:00
dgtlmoon   fcadda5f09   cross platform safety        2026-02-07 02:27:02 +01:00
dgtlmoon   dc157cccd5   remove old code              2026-02-07 02:20:51 +01:00
dgtlmoon   8018742c67   remove old calls             2026-02-07 02:15:53 +01:00
dgtlmoon   e41b33269f   Refactor                     2026-02-07 02:01:58 +01:00
24 changed files with 1163 additions and 645 deletions

View File

@@ -112,9 +112,9 @@ def sigshutdown_handler(_signo, _stack_frame):
         from changedetectionio.flask_app import update_q, notification_q
         update_q.close()
         notification_q.close()
-        logger.debug("Janus queues closed successfully")
+        logger.debug("Queues closed successfully")
     except Exception as e:
-        logger.critical(f"CRITICAL: Failed to close janus queues: {e}")
+        logger.critical(f"CRITICAL: Failed to close queues: {e}")

     # Shutdown socketio server fast
     from changedetectionio.flask_app import socketio_server
@@ -124,13 +124,9 @@ def sigshutdown_handler(_signo, _stack_frame):
     except Exception as e:
         logger.error(f"Error shutting down Socket.IO server: {str(e)}")

-    # Save data quickly - force immediate save using abstract method
-    try:
-        datastore.force_save_all()
-        logger.success('Fast sync to storage complete.')
-    except Exception as e:
-        logger.error(f"Error syncing to storage: {str(e)}")
+    # With immediate persistence, all data is already saved
+    logger.success('All data already persisted (immediate commits enabled).')

     sys.exit()

 def print_help():

View File

@@ -67,7 +67,7 @@ class Notifications(Resource):
         clean_urls = [url.strip() for url in notification_urls if isinstance(url, str)]
         self.datastore.data['settings']['application']['notification_urls'] = clean_urls
-        self.datastore.needs_write = True
+        self.datastore.commit()

         return {'notification_urls': clean_urls}, 200
@@ -95,7 +95,7 @@ class Notifications(Resource):
             abort(400, message="No matching notification URLs found.")

         self.datastore.data['settings']['application']['notification_urls'] = notification_urls
-        self.datastore.needs_write = True
+        self.datastore.commit()

         return 'OK', 204

View File

@@ -63,9 +63,11 @@ class Tag(Resource):
         if request.args.get('muted', '') == 'muted':
             self.datastore.data['settings']['application']['tags'][uuid]['notification_muted'] = True
+            self.datastore.commit()
             return "OK", 200
         elif request.args.get('muted', '') == 'unmuted':
             self.datastore.data['settings']['application']['tags'][uuid]['notification_muted'] = False
+            self.datastore.commit()
             return "OK", 200

         return tag
@@ -79,11 +81,13 @@ class Tag(Resource):
         # Delete the tag, and any tag reference
         del self.datastore.data['settings']['application']['tags'][uuid]
+        self.datastore.commit()

         # Remove tag from all watches
         for watch_uuid, watch in self.datastore.data['watching'].items():
             if watch.get('tags') and uuid in watch['tags']:
                 watch['tags'].remove(uuid)
+                watch.commit()

         return 'OK', 204
@@ -107,7 +111,7 @@ class Tag(Resource):
             return str(e), 400

         tag.update(request.json)
-        self.datastore.needs_write_urgent = True
+        self.datastore.commit()
         return "OK", 200

View File

@@ -84,15 +84,19 @@ class Watch(Resource):
return "OK", 200 return "OK", 200
if request.args.get('paused', '') == 'paused': if request.args.get('paused', '') == 'paused':
watch_obj.pause() watch_obj.pause()
watch_obj.commit()
return "OK", 200 return "OK", 200
elif request.args.get('paused', '') == 'unpaused': elif request.args.get('paused', '') == 'unpaused':
watch_obj.unpause() watch_obj.unpause()
watch_obj.commit()
return "OK", 200 return "OK", 200
if request.args.get('muted', '') == 'muted': if request.args.get('muted', '') == 'muted':
watch_obj.mute() watch_obj.mute()
watch_obj.commit()
return "OK", 200 return "OK", 200
elif request.args.get('muted', '') == 'unmuted': elif request.args.get('muted', '') == 'unmuted':
watch_obj.unmute() watch_obj.unmute()
watch_obj.commit()
return "OK", 200 return "OK", 200
# Return without history, get that via another API call # Return without history, get that via another API call
@@ -173,6 +177,7 @@ class Watch(Resource):
         # Update watch with regular (non-processor-config) fields
         watch.update(json_data)
+        watch.commit()

         # Save processor config to JSON file
         processors.save_processor_config(self.datastore, uuid, processor_config_data)
@@ -419,8 +424,14 @@ class CreateWatch(Resource):
         except ValidationError as e:
             return str(e), 400

+        # Handle processor-config-* fields separately (save to JSON, not watch)
+        from changedetectionio import processors
+
         extras = copy.deepcopy(json_data)
+
+        # Extract and remove processor config fields from extras
+        processor_config_data = processors.extract_processor_config_from_form_data(extras)

         # Because we renamed 'tag' to 'tags' but don't want to change the API (can do this in v2 of the API)
         tags = None
         if extras.get('tag'):
@@ -430,6 +441,10 @@ class CreateWatch(Resource):
             del extras['url']

         new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
+
+        # Save processor config to separate JSON file
+        if new_uuid and processor_config_data:
+            processors.save_processor_config(self.datastore, new_uuid, processor_config_data)

         if new_uuid:
             # Dont queue because the scheduler will check that it hasnt been checked before anyway
             # worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))

View File

@@ -12,9 +12,17 @@ schema = api_schema.build_watch_json_schema(watch_base_config)
 schema_create_watch = copy.deepcopy(schema)
 schema_create_watch['required'] = ['url']
 del schema_create_watch['properties']['last_viewed']
+# Allow processor_config_* fields (handled separately in endpoint)
+schema_create_watch['patternProperties'] = {
+    '^processor_config_': {'type': ['string', 'number', 'boolean', 'object', 'array', 'null']}
+}

 schema_update_watch = copy.deepcopy(schema)
 schema_update_watch['additionalProperties'] = False
+# Allow processor_config_* fields (handled separately in endpoint)
+schema_update_watch['patternProperties'] = {
+    '^processor_config_': {'type': ['string', 'number', 'boolean', 'object', 'array', 'null']}
+}

 # Tag schema is also based on watch_base since Tag inherits from it
 schema_tag = copy.deepcopy(schema)
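For context, `patternProperties` is what lets dynamic `processor_config_*` keys through even though `additionalProperties` is `False`: keys matched by the pattern are validated by the pattern schema and are not treated as "additional". A minimal standalone sketch of that behaviour, assuming the standard `jsonschema` package (the field name `processor_config_threshold` is made up for illustration):

```python
# Illustrative only - mirrors the schema change above, not the project's full schema.
import jsonschema

schema_update_watch = {
    "type": "object",
    "properties": {"url": {"type": "string"}},
    "additionalProperties": False,
    # Keys matching this pattern are allowed and only loosely typed
    "patternProperties": {
        "^processor_config_": {"type": ["string", "number", "boolean", "object", "array", "null"]}
    },
}

# Passes: processor_config_* keys are matched by patternProperties
jsonschema.validate({"url": "https://example.com", "processor_config_threshold": 5}, schema_update_watch)

# Raises ValidationError: unknown key not covered by properties or patternProperties
try:
    jsonschema.validate({"url": "https://example.com", "bogus": 1}, schema_update_watch)
except jsonschema.ValidationError as e:
    print("rejected:", e.message)
```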

View File

@@ -102,8 +102,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
flash(gettext("Maximum number of backups reached, please remove some"), "error") flash(gettext("Maximum number of backups reached, please remove some"), "error")
return redirect(url_for('backups.index')) return redirect(url_for('backups.index'))
# Be sure we're written fresh - force immediate save using abstract method # With immediate persistence, all data is already saved
datastore.force_save_all()
zip_thread = threading.Thread( zip_thread = threading.Thread(
target=create_backup, target=create_backup,
args=(datastore.datastore_path, datastore.data.get("watching")), args=(datastore.datastore_path, datastore.data.get("watching")),

View File

@@ -20,6 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue
         datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT
         datastore.data['watching'][uuid]['processor'] = 'restock_diff'
         datastore.data['watching'][uuid].clear_watch()
+        datastore.data['watching'][uuid].commit()
         worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
         return redirect(url_for("watchlist.index"))
@@ -27,6 +28,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue
     @price_data_follower_blueprint.route("/<string:uuid>/reject", methods=['GET'])
     def reject(uuid):
         datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_REJECT
+        datastore.data['watching'][uuid].commit()
         return redirect(url_for("watchlist.index"))

View File

@@ -74,12 +74,13 @@ def construct_blueprint(datastore: ChangeDetectionStore):
                 del (app_update['password'])

             datastore.data['settings']['application'].update(app_update)

             # Handle dynamic worker count adjustment
             old_worker_count = datastore.data['settings']['requests'].get('workers', 1)
             new_worker_count = form.data['requests'].get('workers', 1)

             datastore.data['settings']['requests'].update(form.data['requests'])
+            datastore.commit()

             # Adjust worker count if it changed
             if new_worker_count != old_worker_count:
@@ -109,13 +110,11 @@ def construct_blueprint(datastore: ChangeDetectionStore):
if not os.getenv("SALTED_PASS", False) and len(form.application.form.password.encrypted_password): if not os.getenv("SALTED_PASS", False) and len(form.application.form.password.encrypted_password):
datastore.data['settings']['application']['password'] = form.application.form.password.encrypted_password datastore.data['settings']['application']['password'] = form.application.form.password.encrypted_password
datastore.needs_write_urgent = True datastore.commit()
flash(gettext("Password protection enabled."), 'notice') flash(gettext("Password protection enabled."), 'notice')
flask_login.logout_user() flask_login.logout_user()
return redirect(url_for('watchlist.index')) return redirect(url_for('watchlist.index'))
datastore.needs_write_urgent = True
# Also save plugin settings from the same form submission # Also save plugin settings from the same form submission
plugin_tabs_list = get_plugin_settings_tabs() plugin_tabs_list = get_plugin_settings_tabs()
for tab in plugin_tabs_list: for tab in plugin_tabs_list:
@@ -181,7 +180,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     def settings_reset_api_key():
         secret = secrets.token_hex(16)
         datastore.data['settings']['application']['api_access_token'] = secret
-        datastore.needs_write_urgent = True
+        datastore.commit()
         flash(gettext("API Key was regenerated."))
         return redirect(url_for('settings.settings_page')+'#api')
@@ -198,7 +197,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     def toggle_all_paused():
         current_state = datastore.data['settings']['application'].get('all_paused', False)
         datastore.data['settings']['application']['all_paused'] = not current_state
-        datastore.needs_write_urgent = True
+        datastore.commit()

         if datastore.data['settings']['application']['all_paused']:
             flash(gettext("Automatic scheduling paused - checks will not be queued."), 'notice')
@@ -212,7 +211,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     def toggle_all_muted():
         current_state = datastore.data['settings']['application'].get('all_muted', False)
         datastore.data['settings']['application']['all_muted'] = not current_state
-        datastore.needs_write_urgent = True
+        datastore.commit()

         if datastore.data['settings']['application']['all_muted']:
             flash(gettext("All notifications muted."), 'notice')

View File

@@ -59,6 +59,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     def mute(uuid):
         if datastore.data['settings']['application']['tags'].get(uuid):
             datastore.data['settings']['application']['tags'][uuid]['notification_muted'] = not datastore.data['settings']['application']['tags'][uuid]['notification_muted']
+            datastore.commit()
         return redirect(url_for('tags.tags_overview_page'))

     @tags_blueprint.route("/delete/<string:uuid>", methods=['GET'])
@@ -76,6 +77,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
                 for watch_uuid, watch in datastore.data['watching'].items():
                     if watch.get('tags') and tag_uuid in watch['tags']:
                         watch['tags'].remove(tag_uuid)
+                        watch.commit()
                         removed_count += 1
                 logger.info(f"Background: Tag {tag_uuid} removed from {removed_count} watches")
             except Exception as e:
@@ -98,6 +100,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
                 for watch_uuid, watch in datastore.data['watching'].items():
                     if watch.get('tags') and tag_uuid in watch['tags']:
                         watch['tags'].remove(tag_uuid)
+                        watch.commit()
                         unlinked_count += 1
                 logger.info(f"Background: Tag {tag_uuid} unlinked from {unlinked_count} watches")
             except Exception as e:
@@ -114,6 +117,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     def delete_all():
         # Clear all tags from settings immediately
         datastore.data['settings']['application']['tags'] = {}
+        datastore.commit()

         # Clear tags from all watches in background thread to avoid blocking
         def clear_all_tags_background():
@@ -122,6 +126,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
             try:
                 for watch_uuid, watch in datastore.data['watching'].items():
                     watch['tags'] = []
+                    watch.commit()
                     cleared_count += 1
                 logger.info(f"Background: Cleared tags from {cleared_count} watches")
             except Exception as e:
@@ -216,7 +221,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
         datastore.data['settings']['application']['tags'][uuid].update(form.data)
         datastore.data['settings']['application']['tags'][uuid]['processor'] = 'restock_diff'
-        datastore.needs_write_urgent = True
+        datastore.commit()
         flash(gettext("Updated"))
         return redirect(url_for('tags.tags_overview_page'))

View File

@@ -24,7 +24,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
         for uuid in uuids:
             if datastore.data['watching'].get(uuid):
                 datastore.data['watching'][uuid]['paused'] = True
-                datastore.mark_watch_dirty(uuid)
+                datastore.data['watching'][uuid].commit()
         if emit_flash:
             flash(gettext("{} watches paused").format(len(uuids)))
@@ -32,7 +32,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
         for uuid in uuids:
             if datastore.data['watching'].get(uuid):
                 datastore.data['watching'][uuid.strip()]['paused'] = False
-                datastore.mark_watch_dirty(uuid)
+                datastore.data['watching'][uuid].commit()
         if emit_flash:
             flash(gettext("{} watches unpaused").format(len(uuids)))
@@ -47,7 +47,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
         for uuid in uuids:
             if datastore.data['watching'].get(uuid):
                 datastore.data['watching'][uuid]['notification_muted'] = True
-                datastore.mark_watch_dirty(uuid)
+                datastore.data['watching'][uuid].commit()
         if emit_flash:
             flash(gettext("{} watches muted").format(len(uuids)))
@@ -55,7 +55,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
         for uuid in uuids:
             if datastore.data['watching'].get(uuid):
                 datastore.data['watching'][uuid]['notification_muted'] = False
-                datastore.mark_watch_dirty(uuid)
+                datastore.data['watching'][uuid].commit()
         if emit_flash:
             flash(gettext("{} watches un-muted").format(len(uuids)))
@@ -71,7 +71,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
         for uuid in uuids:
             if datastore.data['watching'].get(uuid):
                 datastore.data['watching'][uuid]["last_error"] = False
-                datastore.mark_watch_dirty(uuid)
+                datastore.data['watching'][uuid].commit()
         if emit_flash:
             flash(gettext("{} watches errors cleared").format(len(uuids)))
@@ -92,6 +92,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
                 datastore.data['watching'][uuid]['notification_body'] = None
                 datastore.data['watching'][uuid]['notification_urls'] = []
                 datastore.data['watching'][uuid]['notification_format'] = USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
+                datastore.data['watching'][uuid].commit()
         if emit_flash:
             flash(gettext("{} watches set to use default notification settings").format(len(uuids)))
@@ -107,6 +108,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
                     datastore.data['watching'][uuid]['tags'] = []
                 datastore.data['watching'][uuid]['tags'].append(tag_uuid)
+                datastore.data['watching'][uuid].commit()
         if emit_flash:
             flash(gettext("{} watches were tagged").format(len(uuids)))

View File

@@ -198,6 +198,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
                 # Recast it if need be to right data Watch handler
                 watch_class = processors.get_custom_watch_obj_for_processor(form.data.get('processor'))
                 datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, __datastore=datastore.data, default=datastore.data['watching'][uuid])
+
+            # Save the watch immediately
+            datastore.data['watching'][uuid].commit()

             flash(gettext("Updated watch - unpaused!") if request.args.get('unpause_on_save') else gettext("Updated watch."))

             # Cleanup any browsersteps session for this watch
@@ -207,10 +211,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             except Exception as e:
                 logger.debug(f"Error cleaning up browsersteps session: {e}")

-            # Re #286 - We wait for syncing new data to disk in another thread every 60 seconds
-            # But in the case something is added we should save straight away
-            datastore.needs_write_urgent = True
-
             # Do not queue on edit if its not within the time range
             # @todo maybe it should never queue anyway on edit...
@@ -386,6 +386,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             s = re.sub(r'[0-9]+', r'\\d+', s)
         datastore.data["watching"][uuid]['ignore_text'].append('/' + s + '/')

+        # Save the updated ignore_text
+        datastore.data["watching"][uuid].commit()

         return f"<a href={url_for('ui.ui_preview.preview_page', uuid=uuid)}>Click to preview</a>"

     return edit_blueprint

View File

@@ -39,7 +39,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
         elif op == 'mute':
             datastore.data['watching'][uuid].toggle_mute()
-            datastore.needs_write = True
+            datastore.data['watching'][uuid].commit()

         return redirect(url_for('watchlist.index', tag = active_tag_uuid))

     # Sort by last_changed and add the uuid which is usually the key..

View File

@@ -1,8 +1,45 @@
"""
Tag/Group domain model for organizing and overriding watch settings.
ARCHITECTURE NOTE: Configuration Override Hierarchy
===================================================
Tags can override Watch settings when overrides_watch=True.
Current implementation requires manual checking in processors:
for tag_uuid in watch.get('tags'):
tag = datastore['settings']['application']['tags'][tag_uuid]
if tag.get('overrides_watch'):
restock_settings = tag.get('restock_settings', {})
break
With Pydantic, this would be automatic via chain resolution:
Watch → Tag (first with overrides_watch) → Global
See: Watch.py model docstring for full Pydantic architecture explanation
See: processors/restock_diff/processor.py:184-192 for current manual implementation
"""
from changedetectionio.model import watch_base from changedetectionio.model import watch_base
class model(watch_base): class model(watch_base):
"""
Tag domain model - groups watches and can override their settings.
Tags inherit from watch_base to reuse all the same fields as Watch.
When overrides_watch=True, tag settings take precedence over watch settings
for all watches in this tag/group.
Fields:
overrides_watch (bool): If True, this tag's settings override watch settings
title (str): Display name for this tag/group
uuid (str): Unique identifier
... (all fields from watch_base can be set as tag-level overrides)
Resolution order when overrides_watch=True:
Watch.field → Tag.field (if overrides_watch) → Global.field
"""
def __init__(self, *arg, **kw): def __init__(self, *arg, **kw):
# Store datastore reference (optional for Tags, but good for consistency) # Store datastore reference (optional for Tags, but good for consistency)

View File

@@ -1,3 +1,29 @@
"""
Watch domain model for change detection monitoring.
ARCHITECTURE NOTE: Configuration Override Hierarchy
===================================================
This module implements Watch objects that inherit from dict (technical debt).
The dream architecture would use Pydantic for:
1. CHAIN RESOLUTION (Watch → Tag → Global Settings)
- Current: Manual resolution scattered across codebase
- Future: @computed_field properties with automatic resolution
- Examples: resolved_fetch_backend, resolved_restock_settings, etc.
2. DATABASE BACKEND ABSTRACTION
- Current: Domain model tightly coupled to file-based JSON storage
- Future: Domain model (Pydantic) separate from persistence layer
- Enables: Easy migration to PostgreSQL, MongoDB, etc.
3. TYPE SAFETY & VALIDATION
- Current: Dict access with no compile-time checks
- Future: Type hints, IDE autocomplete, validation at boundaries
See class model docstring for detailed explanation and examples.
See: processors/restock_diff/processor.py:184-192 for manual resolution example
"""
import gc import gc
from copy import copy from copy import copy
@@ -104,6 +130,99 @@ def _brotli_save(contents, filepath, mode=None, fallback_uncompressed=False):
 class model(watch_base):
+    """
+    Watch domain model for monitoring URL changes.
+
+    Inherits from watch_base (which inherits dict) - see watch_base docstring for field documentation.
+
+    ## Configuration Override Hierarchy (Chain Resolution)
+
+    The dream architecture uses a 3-level resolution chain:
+        Watch settings → Tag/Group settings → Global settings
+
+    Current implementation is MANUAL (see processor.py:184-192 for example):
+    - Processors manually check watch.get('field')
+    - Then loop through watch.tags to find first tag with overrides_watch=True
+    - Finally fall back to datastore['settings']['application']['field']
+
+    FUTURE: Pydantic-based chain resolution would enable:
+
+    ```python
+    # Instead of manual resolution in every processor:
+    restock_settings = watch.get('restock_settings', {})
+    for tag_uuid in watch.get('tags'):
+        tag = datastore['settings']['application']['tags'][tag_uuid]
+        if tag.get('overrides_watch'):
+            restock_settings = tag.get('restock_settings', {})
+            break
+
+    # Clean computed properties with automatic resolution:
+    @computed_field
+    def resolved_restock_settings(self) -> dict:
+        if self.restock_settings:
+            return self.restock_settings
+        for tag_uuid in self.tags:
+            tag = self._datastore.get_tag(tag_uuid)
+            if tag.overrides_watch and tag.restock_settings:
+                return tag.restock_settings
+        return self._datastore.settings.restock_settings or {}
+
+    # Usage: watch.resolved_restock_settings (automatic, type-safe, tested once)
+    ```
+
+    Benefits of Pydantic migration:
+    1. Single source of truth for resolution logic (not scattered across processors)
+    2. Type safety + IDE autocomplete (watch.resolved_fetch_backend vs dict navigation)
+    3. Database backend abstraction (domain model separate from persistence)
+    4. Automatic validation at boundaries
+    5. Self-documenting via type hints
+    6. Easy to test resolution independently
+
+    Resolution chain examples that would benefit:
+    - fetch_backend: watch → tag → global (see get_fetch_backend property)
+    - notification_urls: watch → tag → global
+    - time_between_check: watch → global (see threshold_seconds)
+    - restock_settings: watch → tag (see processors/restock_diff/processor.py:184-192)
+    - history_snapshot_max_length: watch → global (see save_history_blob:550-556)
+    - All processor_config_* settings could use tag overrides
+
+    ## Database Backend Abstraction with Pydantic
+
+    Current: Watch inherits dict, tightly coupled to file-based JSON storage
+    Future: Domain model (Watch) separate from persistence layer
+
+    ```python
+    # Domain model (database-agnostic)
+    class Watch(BaseModel):
+        uuid: str
+        url: str
+        # ... validation, business logic
+
+    # Pluggable backends
+    class DataStoreBackend(ABC):
+        def save_watch(self, watch: Watch): ...
+        def load_watch(self, uuid: str) -> Watch: ...
+
+    # Implementations: FileBackend, MongoBackend, PostgresBackend, etc.
+    ```
+
+    This would enable:
+    - Easy migration between storage backends (file → postgres → mongodb)
+    - Pydantic handles serialization/deserialization automatically
+    - Domain logic stays clean (no storage concerns in Watch methods)
+
+    ## Migration Path
+
+    Given existing codebase, incremental migration recommended:
+    1. Create Pydantic models alongside existing dict-based models
+    2. Add .to_pydantic() / .from_pydantic() bridge methods
+    3. Gradually migrate code to use Pydantic models
+    4. Remove dict inheritance once migration complete
+
+    See: watch_base docstring for technical debt discussion
+    See: processors/restock_diff/processor.py:184-192 for manual resolution example
+    See: Watch.py:550-556 for nested dict navigation that would become watch.resolved_*
+    """
+
     __newest_history_key = None
     __history_n = 0
     jitter_seconds = 0
@@ -243,8 +362,30 @@ class model(watch_base):
     @property
     def get_fetch_backend(self):
         """
-        Like just using the `fetch_backend` key but there could be some logic
-        :return:
+        Get the fetch backend for this watch with special case handling.
+
+        CHAIN RESOLUTION OPPORTUNITY:
+        Currently returns watch.fetch_backend directly, but doesn't implement
+        Watch → Tag → Global resolution chain. With Pydantic:
+
+            @computed_field
+            def resolved_fetch_backend(self) -> str:
+                # Special case: PDFs always use html_requests
+                if self.is_pdf:
+                    return 'html_requests'
+
+                # Watch override
+                if self.fetch_backend and self.fetch_backend != 'system':
+                    return self.fetch_backend
+
+                # Tag override (first tag with overrides_watch=True wins)
+                for tag_uuid in self.tags:
+                    tag = self._datastore.get_tag(tag_uuid)
+                    if tag.overrides_watch and tag.fetch_backend:
+                        return tag.fetch_backend
+
+                # Global default
+                return self._datastore.settings.fetch_backend
         """
         # Maybe also if is_image etc?
         # This is because chrome/playwright wont render the PDF in the browser and we will just fetch it and use pdf2html to see the text.
@@ -546,7 +687,12 @@ class model(watch_base):
         self.__newest_history_key = timestamp
         self.__history_n += 1

+        # MANUAL CHAIN RESOLUTION: Watch → Global
+        # With Pydantic, this would become: maxlen = watch.resolved_history_snapshot_max_length
+        #   @computed_field def resolved_history_snapshot_max_length(self) -> Optional[int]:
+        #       if self.history_snapshot_max_length: return self.history_snapshot_max_length
+        #       if tag := self._get_override_tag(): return tag.history_snapshot_max_length
+        #       return self._datastore.settings.history_snapshot_max_length
         maxlen = (
             self.get('history_snapshot_max_length')
             or (self.__datastore and self.__datastore['settings']['application'].get('history_snapshot_max_length'))
@@ -844,6 +990,57 @@ class model(watch_base):
     def toggle_mute(self):
         self['notification_muted'] ^= True

+    def commit(self):
+        """
+        Save this watch immediately to disk using atomic write.
+
+        Replaces the old dirty-tracking system with immediate persistence.
+        Uses atomic write pattern (temp file + rename) for crash safety.
+
+        Fire-and-forget: Logs errors but does not raise exceptions.
+        Watch data remains in memory even if save fails, so next commit will retry.
+        """
+        from loguru import logger
+
+        if not self.__datastore:
+            logger.error(f"Cannot commit watch {self.get('uuid')} without datastore reference")
+            return
+
+        if not self.watch_data_dir:
+            logger.error(f"Cannot commit watch {self.get('uuid')} without datastore_path")
+            return
+
+        # Convert to dict for serialization, excluding processor config keys
+        # Processor configs are stored separately in processor-specific JSON files
+        # Use deepcopy to prevent mutations from affecting the original Watch object
+        import copy
+
+        # Acquire datastore lock to prevent concurrent modifications during copy
+        # Take a quick shallow snapshot under lock, then deep copy outside lock
+        lock = self.__datastore.lock if self.__datastore and hasattr(self.__datastore, 'lock') else None
+        if lock:
+            with lock:
+                snapshot = dict(self)
+        else:
+            snapshot = dict(self)
+
+        # Deep copy snapshot (slower, but done outside lock to minimize contention)
+        watch_dict = {k: copy.deepcopy(v) for k, v in snapshot.items() if not k.startswith('processor_config_')}
+
+        # Normalize browser_steps: if no meaningful steps, save as empty list
+        if not self.has_browser_steps:
+            watch_dict['browser_steps'] = []
+
+        # Use existing atomic write helper
+        from changedetectionio.store.file_saving_datastore import save_watch_atomic
+        try:
+            save_watch_atomic(self.watch_data_dir, self.get('uuid'), watch_dict)
+            logger.debug(f"Committed watch {self.get('uuid')}")
+        except Exception as e:
+            logger.error(f"Failed to commit watch {self.get('uuid')}: {e}")
+
     def extra_notification_token_values(self):
         # Used for providing extra tokens
         # return {'widget': 555}
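In rough terms, the persistence model this changeset introduces is "mutate in memory, then commit that one object immediately" instead of "set a dirty flag and let a background thread save later". A minimal, self-contained sketch of the commit() flow described in the docstring above (not the project's actual class; it only illustrates the snapshot-under-lock, deep-copy-outside-lock, processor_config_* filtering and atomic-write steps):

```python
# Illustrative sketch only - class and file names here are hypothetical.
import copy
import json
import os
import tempfile
import threading

class WatchSketch(dict):
    def __init__(self, data_dir, lock, **kw):
        super().__init__(**kw)
        self.data_dir = data_dir
        self.lock = lock

    def commit(self):
        # Quick shallow snapshot while holding the lock, deep copy afterwards
        with self.lock:
            snapshot = dict(self)
        watch_dict = {k: copy.deepcopy(v) for k, v in snapshot.items()
                      if not k.startswith('processor_config_')}
        # Atomic write: temp file in the same directory, then rename over the target
        fd, tmp = tempfile.mkstemp(dir=self.data_dir, suffix='.tmp')
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(watch_dict, f, indent=2)
        os.replace(tmp, os.path.join(self.data_dir, 'watch.json'))

# Usage: mutate in memory, then persist that one watch right away
w = WatchSketch(data_dir='.', lock=threading.Lock(), uuid='example', paused=False)
w['paused'] = True
w.commit()
```

Settings-level changes follow the same convention through the datastore-level commit() added further down in this changeset.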

View File

@@ -6,6 +6,147 @@ USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH = 'System default'
 CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL'

 class watch_base(dict):
+    """
+    Base watch domain model (inherits from dict for backward compatibility).
+
+    WARNING: This class inherits from dict, which violates proper encapsulation.
+    Dict inheritance is legacy technical debt that should be refactored to a proper
+    domain model (e.g., Pydantic BaseModel) for better type safety and validation.
+
+    TODO: Migrate to Pydantic BaseModel for:
+    - Type safety and IDE autocomplete
+    - Automatic validation
+    - Clear separation between domain model and serialization
+    - Database backend abstraction (file → postgres → mongodb)
+    - Configuration override chain resolution (Watch → Tag → Global)
+    - Immutability options
+    - Better testing
+
+    CHAIN RESOLUTION ARCHITECTURE:
+    The dream is a 3-level override hierarchy:
+        Watch settings → Tag/Group settings → Global settings
+
+    Current implementation: MANUAL resolution scattered across codebase
+    - Processors manually check watch.get('field')
+    - Loop through tags to find overrides_watch=True
+    - Fall back to datastore['settings']['application']['field']
+
+    Pydantic implementation: AUTOMATIC resolution via @computed_field
+    - Single source of truth for each setting's resolution logic
+    - Type-safe, testable, self-documenting
+    - Example: watch.resolved_fetch_backend (instead of nested dict navigation)
+
+    See: Watch.py model docstring for detailed Pydantic architecture plan
+    See: Tag.py model docstring for tag override explanation
+    See: processors/restock_diff/processor.py:184-192 for current manual example
+
+    Core Fields:
+        uuid (str): Unique identifier for this watch (auto-generated)
+        url (str): Target URL to monitor for changes
+        title (str|None): Custom display name (overrides page_title if set)
+        page_title (str|None): Title extracted from <title> tag of monitored page
+        tags (List[str]): List of tag UUIDs for categorization
+        tag (str): DEPRECATED - Old single-tag system, use tags instead
+
+    Check Configuration:
+        processor (str): Processor type ('text_json_diff', 'restock_diff', etc.)
+        fetch_backend (str): Fetcher to use ('system', 'html_requests', 'playwright', etc.)
+        method (str): HTTP method ('GET', 'POST', etc.)
+        headers (dict): Custom HTTP headers to send
+        proxy (str|None): Preferred proxy server
+        paused (bool): Whether change detection is paused
+
+    Scheduling:
+        time_between_check (dict): Check interval {'weeks': int, 'days': int, 'hours': int, 'minutes': int, 'seconds': int}
+        time_between_check_use_default (bool): Use global default interval if True
+        time_schedule_limit (dict): Weekly schedule limiting when checks can run
+            Structure: {
+                'enabled': bool,
+                'monday/tuesday/.../sunday': {
+                    'enabled': bool,
+                    'start_time': str ('HH:MM'),
+                    'duration': {'hours': str, 'minutes': str}
+                }
+            }
+
+    Content Filtering:
+        include_filters (List[str]): CSS/XPath selectors to extract content
+        subtractive_selectors (List[str]): Selectors to remove from content
+        ignore_text (List[str]): Text patterns to ignore in change detection
+        trigger_text (List[str]): Text/regex that must be present to trigger change
+        text_should_not_be_present (List[str]): Text that should NOT be present
+        extract_text (List[str]): Regex patterns to extract specific text after filtering
+
+    Text Processing:
+        trim_text_whitespace (bool): Strip leading/trailing whitespace
+        sort_text_alphabetically (bool): Sort lines alphabetically before comparison
+        remove_duplicate_lines (bool): Remove duplicate lines
+        check_unique_lines (bool): Compare against all history for unique lines
+        strip_ignored_lines (bool|None): Remove lines matching ignore patterns
+
+    Change Detection Filters:
+        filter_text_added (bool): Include added text in change detection
+        filter_text_removed (bool): Include removed text in change detection
+        filter_text_replaced (bool): Include replaced text in change detection
+
+    Browser Automation:
+        browser_steps (List[dict]): Browser automation steps for JS-heavy sites
+        browser_steps_last_error_step (int|None): Last step that caused error
+        webdriver_delay (int|None): Seconds to wait after page load
+        webdriver_js_execute_code (str|None): JavaScript to execute before extraction
+
+    Restock Detection:
+        in_stock_only (bool): Only trigger on in-stock transitions
+        follow_price_changes (bool): Monitor price changes
+        has_ldjson_price_data (bool|None): Whether page has LD-JSON price data
+        track_ldjson_price_data (str|None): Track LD-JSON price data ('ACCEPT', 'REJECT', None)
+        price_change_threshold_percent (float|None): Minimum price change % to trigger
+
+    Notifications:
+        notification_urls (List[str]): Apprise URLs for notifications
+        notification_title (str|None): Custom notification title template
+        notification_body (str|None): Custom notification body template
+        notification_format (str): Notification format (e.g., 'System default', 'Text', 'HTML')
+        notification_muted (bool): Disable notifications for this watch
+        notification_screenshot (bool): Include screenshot in notifications
+        notification_alert_count (int): Number of notifications sent
+        last_notification_error (str|None): Last notification error message
+        body (str|None): DEPRECATED? Legacy notification body field
+        filter_failure_notification_send (bool): Send notification on filter failures
+
+    History & State:
+        date_created (int|None): Unix timestamp of watch creation
+        last_checked (int): Unix timestamp of last check
+        last_viewed (int): History snapshot key of last user view
+        last_error (str|bool): Last error message or False if no error
+        check_count (int): Total number of checks performed
+        fetch_time (float): Duration of last fetch in seconds
+        consecutive_filter_failures (int): Counter for consecutive filter match failures
+        previous_md5 (str|bool): MD5 hash of previous content
+        previous_md5_before_filters (str|bool): MD5 hash before filters applied
+        history_snapshot_max_length (int|None): Max history snapshots to keep (None = use global)
+
+    Conditions:
+        conditions (dict): Custom conditions for change detection logic
+        conditions_match_logic (str): Logic operator ('ALL', 'ANY') for conditions
+
+    Metadata:
+        content-type (str|None): Content-Type from last fetch
+        remote_server_reply (str|None): Server header from last response
+        ignore_status_codes (List[int]|None): HTTP status codes to ignore
+        use_page_title_in_list (bool|None): Display page title in watch list (None = use system default)
+
+    Instance Attributes (not serialized):
+        __datastore: Reference to parent DataStore (set externally after creation)
+        watch_data_dir: Filesystem path for this watch's data directory
+
+    Notes:
+    - Many fields default to None to distinguish "not set" from "set to default"
+    - When field is None, system-level defaults are used
+    - Processor-specific configs (e.g., processor_config_*) are NOT stored in watch.json
+      They are stored in separate {processor_name}.json files
+    - This class is used for both Watch and Tag objects (tags reuse the structure)
+    """
+
     def __init__(self, *arg, **kw):
         self.update({

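The scheduling fields documented in the docstring above are plain dicts. Purely hypothetical example values, shown only to make the documented shapes concrete (the real defaults live in watch_base.__init__):

```python
# Hypothetical illustration of the documented structures, not values from the codebase.
example_time_between_check = {'weeks': 0, 'days': 0, 'hours': 3, 'minutes': 0, 'seconds': 0}

example_time_schedule_limit = {
    'enabled': True,
    'monday': {
        'enabled': True,
        'start_time': '09:00',
        'duration': {'hours': '8', 'minutes': '00'},
    },
    # tuesday .. sunday follow the same shape
}
```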
View File

@@ -56,9 +56,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
         # Should only be active for docker
         # logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
         self.datastore_path = datastore_path
-        self.needs_write = False
         self.start_time = time.time()
-        self.stop_thread = False

         self.save_version_copy_json_db(version_tag)
         self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag)
@@ -286,19 +284,19 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4()) self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4())
else: else:
self.__data['app_guid'] = str(uuid_builder.uuid4()) self.__data['app_guid'] = str(uuid_builder.uuid4())
self.mark_settings_dirty() self.commit()
# Ensure RSS access token exists # Ensure RSS access token exists
if not self.__data['settings']['application'].get('rss_access_token'): if not self.__data['settings']['application'].get('rss_access_token'):
secret = secrets.token_hex(16) secret = secrets.token_hex(16)
self.__data['settings']['application']['rss_access_token'] = secret self.__data['settings']['application']['rss_access_token'] = secret
self.mark_settings_dirty() self.commit()
# Ensure API access token exists # Ensure API access token exists
if not self.__data['settings']['application'].get('api_access_token'): if not self.__data['settings']['application'].get('api_access_token'):
secret = secrets.token_hex(16) secret = secrets.token_hex(16)
self.__data['settings']['application']['api_access_token'] = secret self.__data['settings']['application']['api_access_token'] = secret
self.mark_settings_dirty() self.commit()
# Handle password reset lockfile # Handle password reset lockfile
password_reset_lockfile = os.path.join(self.datastore_path, "removepassword.lock") password_reset_lockfile = os.path.join(self.datastore_path, "removepassword.lock")
@@ -306,9 +304,6 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
             self.remove_password()
             unlink(password_reset_lockfile)

-        # Start the background save thread
-        self.start_save_thread()

     def rehydrate_entity(self, uuid, entity, processor_override=None):
         """Set the dict back to the dict Watch object"""
         entity['uuid'] = uuid
@@ -375,22 +370,15 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
         Implementation of abstract method from FileSavingDataStore.
         Delegates to helper function and stores results in internal data structure.
         """
-        watching, watch_hashes = load_all_watches(
+        watching = load_all_watches(
             self.datastore_path,
-            self.rehydrate_entity,
-            self._compute_hash
+            self.rehydrate_entity
         )

         # Store loaded data
         self.__data['watching'] = watching
-        self._watch_hashes = watch_hashes

-        # Verify all watches have hashes
-        missing_hashes = [uuid for uuid in watching.keys() if uuid not in watch_hashes]
-        if missing_hashes:
-            logger.error(f"WARNING: {len(missing_hashes)} watches missing hashes after load: {missing_hashes[:5]}")
-        else:
-            logger.debug(f"All {len(watching)} watches have valid hashes")
+        logger.debug(f"Loaded {len(watching)} watches")

     def _delete_watch(self, uuid):
         """
@@ -414,7 +402,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
     def set_last_viewed(self, uuid, timestamp):
         logger.debug(f"Setting watch UUID: {uuid} last viewed to {int(timestamp)}")
         self.data['watching'][uuid].update({'last_viewed': int(timestamp)})
-        self.mark_watch_dirty(uuid)
+        self.data['watching'][uuid].commit()

         watch_check_update = signal('watch_check_update')
         if watch_check_update:
@@ -422,7 +410,22 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
     def remove_password(self):
         self.__data['settings']['application']['password'] = False
-        self.mark_settings_dirty()
+        self.commit()
+
+    def commit(self):
+        """
+        Save settings immediately to disk using atomic write.
+
+        Uses atomic write pattern (temp file + rename) for crash safety.
+
+        Fire-and-forget: Logs errors but does not raise exceptions.
+        Settings data remains in memory even if save fails, so next commit will retry.
+        """
+        try:
+            self._save_settings()
+            logger.debug("Committed settings")
+        except Exception as e:
+            logger.error(f"Failed to commit settings: {e}")

     def update_watch(self, uuid, update_obj):
@@ -441,7 +444,8 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
         self.__data['watching'][uuid].update(update_obj)
-        self.mark_watch_dirty(uuid)
+        # Immediate save
+        self.__data['watching'][uuid].commit()

     @property
     def threshold_seconds(self):
@@ -502,10 +506,6 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
         except Exception as e:
             logger.error(f"Failed to delete watch {watch_uuid} from storage: {e}")

-        # Clean up tracking data
-        self._watch_hashes.pop(watch_uuid, None)
-        self._dirty_watches.discard(watch_uuid)

         # Send delete signal
         watch_delete_signal = signal('watch_deleted')
         if watch_delete_signal:
@@ -527,17 +527,11 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
         # Remove from watching dict
         del self.data['watching'][uuid]

-        # Clean up tracking data
-        self._watch_hashes.pop(uuid, None)
-        self._dirty_watches.discard(uuid)

         # Send delete signal
         watch_delete_signal = signal('watch_deleted')
         if watch_delete_signal:
             watch_delete_signal.send(watch_uuid=uuid)

-        self.needs_write_urgent = True

     # Clone a watch by UUID
     def clone(self, uuid):
         url = self.data['watching'][uuid].get('url')
@@ -562,7 +556,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
     # Remove a watchs data but keep the entry (URL etc)
     def clear_watch_history(self, uuid):
         self.__data['watching'][uuid].clear_watch()
-        self.needs_write_urgent = True
+        self.__data['watching'][uuid].commit()

     def add_watch(self, url, tag='', extras=None, tag_uuids=None, save_immediately=True):
@@ -675,16 +669,9 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
         self.__data['watching'][new_uuid] = new_watch

         if save_immediately:
-            # Save immediately using polymorphic method
-            try:
-                self.save_watch(new_uuid, force=True)
-                logger.debug(f"Saved new watch {new_uuid}")
-            except Exception as e:
-                logger.error(f"Failed to save new watch {new_uuid}: {e}")
-                # Mark dirty for retry
-                self.mark_watch_dirty(new_uuid)
-        else:
-            self.mark_watch_dirty(new_uuid)
+            # Save immediately using commit
+            new_watch.commit()
+            logger.debug(f"Saved new watch {new_uuid}")

         logger.debug(f"Added '{url}'")
@@ -889,7 +876,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
         self.__data['settings']['application']['tags'][new_uuid] = new_tag
-        self.mark_settings_dirty()
+        self.commit()

         return new_uuid

     def get_all_tags_for_watch(self, uuid):
@@ -1006,7 +993,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
             notification_urls.append(notification_url)

         self.__data['settings']['application']['notification_urls'] = notification_urls
-        self.mark_settings_dirty()
+        self.commit()
         return notification_url

     # Schema update methods moved to store/updates.py (DatastoreUpdatesMixin)

View File

@@ -81,20 +81,3 @@ class DataStore(ABC):
""" """
pass pass
@abstractmethod
def force_save_all(self):
"""
Force immediate synchronous save of all data to storage.
This is the abstract method for forcing a complete save.
Different backends implement this differently:
- File backend: Mark all watches/settings dirty, then save
- Redis backend: SAVE command or pipeline flush
- SQL backend: COMMIT transaction
Used by:
- Backup creation (ensure everything is saved before backup)
- Shutdown (ensure all changes are persisted)
- Manual save operations
"""
pass

View File

@@ -1,22 +1,17 @@
""" """
File-based datastore with individual watch persistence and dirty tracking. File-based datastore with individual watch persistence and immediate commits.
This module provides the FileSavingDataStore abstract class that implements: This module provides the FileSavingDataStore abstract class that implements:
- Individual watch.json file persistence - Individual watch.json file persistence
- Hash-based change detection (only save what changed) - Immediate commit-based persistence (watch.commit(), datastore.commit())
- Periodic audit scan (catches unmarked changes)
- Background save thread with batched parallel saves
- Atomic file writes safe for NFS/NAS - Atomic file writes safe for NFS/NAS
""" """
import glob import glob
import hashlib
import json import json
import os import os
import tempfile import tempfile
import time import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread
from loguru import logger from loguru import logger
from .base import DataStore from .base import DataStore
@@ -34,19 +29,6 @@ except ImportError:
 # Set to True for mission-critical deployments requiring crash consistency
 FORCE_FSYNC_DATA_IS_CRITICAL = bool(strtobool(os.getenv('FORCE_FSYNC_DATA_IS_CRITICAL', 'False')))

-# Save interval configuration: How often the background thread saves dirty items
-# Default 10 seconds - increase for less frequent saves, decrease for more frequent
-DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS = int(os.getenv('DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS', '10'))
-
-# Rolling audit configuration: Scans a fraction of watches each cycle
-# Default: Run audit every 10s, split into 5 shards
-# Full audit completes every 50s (10s × 5 shards)
-# With 56k watches: 56k / 5 = ~11k watches per cycle (~60ms vs 316ms for all)
-# Handles dynamic watch count - recalculates shard boundaries each cycle
-DATASTORE_AUDIT_INTERVAL_SECONDS = int(os.getenv('DATASTORE_AUDIT_INTERVAL_SECONDS', '10'))
-DATASTORE_AUDIT_SHARDS = int(os.getenv('DATASTORE_AUDIT_SHARDS', '5'))

 # ============================================================================
 # Helper Functions for Atomic File Operations
 # ============================================================================
@@ -61,6 +43,9 @@ def save_json_atomic(file_path, data_dict, label="file", max_size_mb=10):
     - Size validation
     - Proper error handling

+    Thread safety: Caller must hold datastore.lock to prevent concurrent modifications.
+    Multi-process safety: Not supported - run only one app instance per datastore.
+
     Args:
         file_path: Full path to target JSON file
         data_dict: Dictionary to serialize
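The atomic-write helper documented above relies on the usual temp-file-then-rename idiom, with fsync as an opt-in durability barrier. A minimal sketch of that pattern under those assumptions (not the project's exact implementation; only `FORCE_FSYNC_DATA_IS_CRITICAL` is a name taken from this diff):

```python
# Illustrative sketch of "write temp file, then rename over the target".
import json
import os
import tempfile

def save_json_atomic_sketch(file_path, data_dict, fsync=False):
    """Write JSON so readers never observe a half-written file."""
    dir_name = os.path.dirname(file_path) or '.'
    # Temp file lives in the same directory so the final rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(data_dict, f, indent=2)
            if fsync:
                # Optional durability barrier, akin to the FORCE_FSYNC_DATA_IS_CRITICAL switch
                f.flush()
                os.fsync(f.fileno())
        os.replace(tmp_path, file_path)  # atomic replacement of the previous file
    except Exception:
        # Leave the previous file untouched if anything went wrong
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```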
@@ -242,11 +227,6 @@ def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
with open(watch_json, 'r', encoding='utf-8') as f: with open(watch_json, 'r', encoding='utf-8') as f:
watch_data = json.load(f) watch_data = json.load(f)
if watch_data.get('time_schedule_limit'):
del watch_data['time_schedule_limit']
if watch_data.get('time_between_check'):
del watch_data['time_between_check']
# Return both the raw data and the rehydrated watch # Return both the raw data and the rehydrated watch
# Raw data is needed to compute hash before rehydration changes anything # Raw data is needed to compute hash before rehydration changes anything
watch_obj = rehydrate_entity_func(uuid, watch_data) watch_obj = rehydrate_entity_func(uuid, watch_data)
@@ -278,7 +258,7 @@ def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
return None, None return None, None
def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func): def load_all_watches(datastore_path, rehydrate_entity_func):
""" """
Load all watches from individual watch.json files. Load all watches from individual watch.json files.
@@ -289,21 +269,17 @@ def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
Args: Args:
datastore_path: Path to the datastore directory datastore_path: Path to the datastore directory
rehydrate_entity_func: Function to convert dict to Watch object rehydrate_entity_func: Function to convert dict to Watch object
compute_hash_func: Function to compute hash from raw watch dict
Returns: Returns:
Tuple of (watching_dict, hashes_dict) Dictionary of uuid -> Watch object
- watching_dict: uuid -> Watch object
- hashes_dict: uuid -> hash string (computed from raw data)
""" """
start_time = time.time() start_time = time.time()
logger.info("Loading watches from individual watch.json files...") logger.info("Loading watches from individual watch.json files...")
watching = {} watching = {}
watch_hashes = {}
if not os.path.exists(datastore_path): if not os.path.exists(datastore_path):
return watching, watch_hashes return watching
# Find all watch.json files using glob (faster than manual directory traversal) # Find all watch.json files using glob (faster than manual directory traversal)
glob_start = time.time() glob_start = time.time()
@@ -322,9 +298,6 @@ def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func) watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
if watch and raw_data: if watch and raw_data:
watching[uuid_dir] = watch watching[uuid_dir] = watch
# Compute hash from rehydrated Watch object (as dict) to match how we compute on save
# This ensures hash matches what audit will compute from dict(watch)
watch_hashes[uuid_dir] = compute_hash_func(dict(watch))
loaded += 1 loaded += 1
if loaded % 100 == 0: if loaded % 100 == 0:
@@ -344,7 +317,7 @@ def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
else: else:
logger.info(f"Loaded {loaded} watches from disk in {elapsed:.2f}s ({loaded/elapsed:.0f} watches/sec)") logger.info(f"Loaded {loaded} watches from disk in {elapsed:.2f}s ({loaded/elapsed:.0f} watches/sec)")
return watching, watch_hashes return watching
# ============================================================================ # ============================================================================
@@ -353,151 +326,20 @@ def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
class FileSavingDataStore(DataStore): class FileSavingDataStore(DataStore):
""" """
Abstract datastore that provides file persistence with change tracking. Abstract datastore that provides file persistence with immediate commits.
Features: Features:
- Individual watch.json files (one per watch) - Individual watch.json files (one per watch)
- Dirty tracking: Only saves items that have changed - Immediate persistence via watch.commit() and datastore.commit()
- Hash-based change detection: Prevents unnecessary writes - Atomic file writes for crash safety
- Background save thread: Non-blocking persistence
- Two-tier urgency: Standard (60s) and urgent (immediate) saves
Subclasses must implement: Subclasses must implement:
- rehydrate_entity(): Convert dict to Watch object - rehydrate_entity(): Convert dict to Watch object
- Access to internal __data structure for watch management - Access to internal __data structure for watch management
""" """
needs_write = False
needs_write_urgent = False
stop_thread = False
# Change tracking
_dirty_watches = set() # Watch UUIDs that need saving
_dirty_settings = False # Settings changed
_watch_hashes = {} # UUID -> SHA256 hash for change detection
# Health monitoring
_last_save_time = 0 # Timestamp of last successful save
_last_audit_time = 0 # Timestamp of last audit scan
_save_cycle_count = 0 # Number of save cycles completed
_total_saves = 0 # Total watches saved (lifetime)
_save_errors = 0 # Total save errors (lifetime)
_audit_count = 0 # Number of audit scans completed
_audit_found_changes = 0 # Total unmarked changes found by audits
_audit_shard_index = 0 # Current shard being audited (rolling audit)
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.save_data_thread = None
self._last_save_time = time.time()
self._last_audit_time = time.time()
def mark_watch_dirty(self, uuid):
"""
Mark a watch as needing save.
Args:
uuid: Watch UUID
"""
with self.lock:
self._dirty_watches.add(uuid)
dirty_count = len(self._dirty_watches)
# Backpressure detection - warn if dirty set grows too large
if dirty_count > 1000:
logger.critical(
f"BACKPRESSURE WARNING: {dirty_count} watches pending save! "
f"Save thread may not be keeping up with write rate. "
f"This could indicate disk I/O bottleneck or save thread failure."
)
elif dirty_count > 500:
logger.warning(
f"Dirty watch count high: {dirty_count} watches pending save. "
f"Monitoring for potential backpressure."
)
self.needs_write = True
def mark_settings_dirty(self):
"""Mark settings as needing save."""
with self.lock:
self._dirty_settings = True
self.needs_write = True
def _compute_hash(self, watch_dict):
"""
Compute SHA256 hash of watch for change detection.
Args:
watch_dict: Dictionary representation of watch
Returns:
Hex string of SHA256 hash
"""
# Use orjson for deterministic serialization if available
if HAS_ORJSON:
json_bytes = orjson.dumps(watch_dict, option=orjson.OPT_SORT_KEYS)
else:
json_str = json.dumps(watch_dict, sort_keys=True, ensure_ascii=False)
json_bytes = json_str.encode('utf-8')
return hashlib.sha256(json_bytes).hexdigest()
def save_watch(self, uuid, force=False, watch_dict=None, current_hash=None):
"""
Save a single watch if it has changed (polymorphic method).
Args:
uuid: Watch UUID
force: If True, skip hash check and save anyway
watch_dict: Pre-computed watch dictionary (optimization)
current_hash: Pre-computed hash (optimization)
Returns:
True if saved, False if skipped (unchanged)
"""
if not self._watch_exists(uuid):
logger.warning(f"Cannot save watch {uuid} - does not exist")
return False
# Get watch dict if not provided
if watch_dict is None:
watch_dict = self._get_watch_dict(uuid)
# Compute hash if not provided
if current_hash is None:
current_hash = self._compute_hash(watch_dict)
# Skip save if unchanged (unless forced)
if not force and current_hash == self._watch_hashes.get(uuid):
return False
try:
self._save_watch(uuid, watch_dict)
self._watch_hashes[uuid] = current_hash
logger.debug(f"Saved watch {uuid}")
return True
except Exception as e:
logger.error(f"Failed to save watch {uuid}: {e}")
raise
def _save_watch(self, uuid, watch_dict):
"""
Save a single watch to storage (polymorphic).
Backend-specific implementation. Subclasses override for different storage:
- File backend: Writes to {uuid}/watch.json
- Redis backend: SET watch:{uuid}
- SQL backend: UPDATE watches WHERE uuid=?
Args:
uuid: Watch UUID
watch_dict: Dictionary representation of watch
"""
# Default file implementation
watch_dir = os.path.join(self.datastore_path, uuid)
save_watch_atomic(watch_dir, uuid, watch_dict)
def _save_settings(self): def _save_settings(self):
""" """
@@ -510,6 +352,7 @@ class FileSavingDataStore(DataStore):
""" """
raise NotImplementedError("Subclass must implement _save_settings") raise NotImplementedError("Subclass must implement _save_settings")
def _load_watches(self): def _load_watches(self):
""" """
Load all watches from storage (polymorphic). Load all watches from storage (polymorphic).
@@ -535,364 +378,4 @@ class FileSavingDataStore(DataStore):
""" """
raise NotImplementedError("Subclass must implement _delete_watch") raise NotImplementedError("Subclass must implement _delete_watch")
def _save_dirty_items(self):
"""
Save dirty watches and settings.
This is the core optimization: instead of saving the entire datastore,
we only save watches that were marked dirty and settings if changed.
"""
start_time = time.time()
# Capture dirty sets under lock
with self.lock:
dirty_watches = list(self._dirty_watches)
dirty_settings = self._dirty_settings
self._dirty_watches.clear()
self._dirty_settings = False
if not dirty_watches and not dirty_settings:
return
logger.trace(f"Saving {len(dirty_watches)} dirty watches, settings_dirty={dirty_settings}")
# Save each dirty watch using the polymorphic save method
saved_count = 0
error_count = 0
skipped_unchanged = 0
# Process in batches of 50, using thread pool for parallel saves
BATCH_SIZE = 50
MAX_WORKERS = 20 # Number of parallel save threads
def save_single_watch(uuid):
"""Helper function for thread pool execution."""
try:
# Check if watch still exists (might have been deleted)
if not self._watch_exists(uuid):
# Watch was deleted, remove hash
self._watch_hashes.pop(uuid, None)
return {'status': 'deleted', 'uuid': uuid}
# Pre-check hash to avoid unnecessary save_watch() calls
watch_dict = self._get_watch_dict(uuid)
current_hash = self._compute_hash(watch_dict)
if current_hash == self._watch_hashes.get(uuid):
# Watch hasn't actually changed, skip
return {'status': 'unchanged', 'uuid': uuid}
# Pass pre-computed values to avoid redundant serialization/hashing
if self.save_watch(uuid, force=True, watch_dict=watch_dict, current_hash=current_hash):
return {'status': 'saved', 'uuid': uuid}
else:
return {'status': 'skipped', 'uuid': uuid}
except Exception as e:
logger.error(f"Error saving watch {uuid}: {e}")
return {'status': 'error', 'uuid': uuid, 'error': e}
# Process dirty watches in batches
for batch_start in range(0, len(dirty_watches), BATCH_SIZE):
batch = dirty_watches[batch_start:batch_start + BATCH_SIZE]
batch_num = (batch_start // BATCH_SIZE) + 1
total_batches = (len(dirty_watches) + BATCH_SIZE - 1) // BATCH_SIZE
if len(dirty_watches) > BATCH_SIZE:
logger.trace(f"Save batch {batch_num}/{total_batches} ({len(batch)} watches)")
# Use thread pool to save watches in parallel
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Submit all save tasks
future_to_uuid = {executor.submit(save_single_watch, uuid): uuid for uuid in batch}
# Collect results as they complete
for future in as_completed(future_to_uuid):
result = future.result()
status = result['status']
if status == 'saved':
saved_count += 1
elif status == 'unchanged':
skipped_unchanged += 1
elif status == 'error':
error_count += 1
# Re-mark for retry
with self.lock:
self._dirty_watches.add(result['uuid'])
# 'deleted' and 'skipped' don't need special handling
# Save settings if changed
if dirty_settings:
try:
self._save_settings()
logger.debug("Saved settings")
except Exception as e:
logger.error(f"Failed to save settings: {e}")
error_count += 1
with self.lock:
self._dirty_settings = True
# Update metrics
elapsed = time.time() - start_time
self._save_cycle_count += 1
self._total_saves += saved_count
self._save_errors += error_count
self._last_save_time = time.time()
# Log performance metrics
if saved_count > 0:
avg_time_per_watch = (elapsed / saved_count) * 1000 # milliseconds
skipped_msg = f", {skipped_unchanged} unchanged" if skipped_unchanged > 0 else ""
parallel_msg = f" [parallel: {MAX_WORKERS} workers]" if saved_count > 1 else ""
logger.info(
f"Successfully saved {saved_count} watches in {elapsed:.2f}s "
f"(avg {avg_time_per_watch:.1f}ms per watch{skipped_msg}){parallel_msg}. "
f"Total: {self._total_saves} saves, {self._save_errors} errors (lifetime)"
)
elif skipped_unchanged > 0:
logger.debug(f"Save cycle: {skipped_unchanged} watches verified unchanged (hash match), nothing saved")
if error_count > 0:
logger.error(f"Save cycle completed with {error_count} errors")
self.needs_write = False
self.needs_write_urgent = False
def _watch_exists(self, uuid):
"""
Check if watch exists. Subclass must implement.
Args:
uuid: Watch UUID
Returns:
bool
"""
raise NotImplementedError("Subclass must implement _watch_exists")
def _get_watch_dict(self, uuid):
"""
Get watch as dictionary. Subclass must implement.
Args:
uuid: Watch UUID
Returns:
Dictionary representation of watch
"""
raise NotImplementedError("Subclass must implement _get_watch_dict")
def _audit_all_watches(self):
"""
Rolling audit: Scans a fraction of watches to detect unmarked changes.
Instead of scanning ALL watches at once, this scans one shard (1/N of the watches) per cycle.
The shard rotates each cycle, completing a full audit every N cycles.
Handles dynamic watch count - recalculates shard boundaries each cycle,
so newly added watches will be audited in subsequent cycles.
Benefits:
- Lower CPU per cycle (56k / 5 = ~11k watches vs all 56k)
- More frequent audits overall (every 50s vs every 10s)
- Spreads load evenly across time
"""
audit_start = time.time()
# Get list of all watch UUIDs (read-only, no lock needed)
try:
all_uuids = list(self.data['watching'].keys())
except (KeyError, AttributeError, RuntimeError):
# Data structure not ready or being modified
return
if not all_uuids:
return
total_watches = len(all_uuids)
# Calculate this cycle's shard boundaries
# Example: 56,278 watches / 5 shards = 11,255 watches per shard
# Shard 0: [0:11255], Shard 1: [11255:22510], etc.
shard_size = (total_watches + DATASTORE_AUDIT_SHARDS - 1) // DATASTORE_AUDIT_SHARDS
start_idx = self._audit_shard_index * shard_size
end_idx = min(start_idx + shard_size, total_watches)
# Handle wrap-around (shouldn't happen normally, but defensive)
if start_idx >= total_watches:
self._audit_shard_index = 0
start_idx = 0
end_idx = min(shard_size, total_watches)
# Audit only this shard's watches
shard_uuids = all_uuids[start_idx:end_idx]
changes_found = 0
errors = 0
for uuid in shard_uuids:
try:
# Get current watch dict and compute hash
watch_dict = self._get_watch_dict(uuid)
current_hash = self._compute_hash(watch_dict)
stored_hash = self._watch_hashes.get(uuid)
# If hash changed and not already marked dirty, mark it
if current_hash != stored_hash:
with self.lock:
if uuid not in self._dirty_watches:
self._dirty_watches.add(uuid)
changes_found += 1
logger.warning(
f"Audit detected unmarked change in watch {uuid[:8]}... current {current_hash:8} stored hash {stored_hash[:8]}"
f"(hash changed but not marked dirty)"
)
self.needs_write = True
except Exception as e:
errors += 1
logger.trace(f"Audit error for watch {uuid[:8]}...: {e}")
audit_elapsed = (time.time() - audit_start) * 1000 # milliseconds
# Advance to next shard (wrap around after last shard)
self._audit_shard_index = (self._audit_shard_index + 1) % DATASTORE_AUDIT_SHARDS
# Update metrics
self._audit_count += 1
self._audit_found_changes += changes_found
self._last_audit_time = time.time()
if changes_found > 0:
logger.warning(
f"Audit shard {self._audit_shard_index}/{DATASTORE_AUDIT_SHARDS} found {changes_found} "
f"unmarked changes in {len(shard_uuids)}/{total_watches} watches ({audit_elapsed:.1f}ms)"
)
else:
logger.trace(
f"Audit shard {self._audit_shard_index}/{DATASTORE_AUDIT_SHARDS}: "
f"{len(shard_uuids)}/{total_watches} watches checked, 0 changes ({audit_elapsed:.1f}ms)"
)
def save_datastore(self):
"""
Background thread that periodically saves dirty items and audits watches.
Runs two independent cycles:
1. Save dirty items every DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS (default 10s)
2. Rolling audit: every DATASTORE_AUDIT_INTERVAL_SECONDS (default 10s)
- Scans 1/DATASTORE_AUDIT_SHARDS watches per cycle (default 1/5)
- Full audit completes every 50s (10s × 5 shards)
- Automatically handles new/deleted watches
Uses 0.5s sleep intervals for responsiveness to urgent saves.
"""
while True:
if self.stop_thread:
# Graceful shutdown: flush any remaining dirty items before stopping
if self.needs_write or self._dirty_watches or self._dirty_settings:
logger.warning("Datastore save thread stopping - flushing remaining dirty items...")
try:
self._save_dirty_items()
logger.info("Graceful shutdown complete - all data saved")
except Exception as e:
logger.critical(f"FAILED to save dirty items during shutdown: {e}")
else:
logger.info("Datastore save thread stopping - no dirty items")
return
# Check if it's time to run audit scan (every N seconds)
if time.time() - self._last_audit_time >= DATASTORE_AUDIT_INTERVAL_SECONDS:
try:
self._audit_all_watches()
except Exception as e:
logger.error(f"Error in audit cycle: {e}")
# Save dirty items if needed
if self.needs_write or self.needs_write_urgent:
try:
self._save_dirty_items()
except Exception as e:
logger.error(f"Error in save cycle: {e}")
# Timer with early break for urgent saves
# Each iteration is 0.5 seconds, so iterations = DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS * 2
for i in range(DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS * 2):
time.sleep(0.5)
if self.stop_thread or self.needs_write_urgent:
break
def start_save_thread(self):
"""Start the background save thread."""
if not self.save_data_thread or not self.save_data_thread.is_alive():
self.save_data_thread = Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
self.save_data_thread.start()
logger.info("Datastore save thread started")
def force_save_all(self):
"""
Force immediate synchronous save of all changes to storage.
File backend implementation of the abstract force_save_all() method.
Marks all watches and settings as dirty, then saves immediately.
Used by:
- Backup creation (ensure everything is saved before backup)
- Shutdown (ensure all changes are persisted)
- Manual save operations
"""
logger.info("Force saving all data to storage...")
# Mark everything as dirty to ensure complete save
for uuid in self.data['watching'].keys():
self.mark_watch_dirty(uuid)
self.mark_settings_dirty()
# Save immediately (synchronous)
self._save_dirty_items()
logger.success("All data saved to storage")
def get_health_status(self):
"""
Get datastore health status for monitoring.
Returns:
dict with health metrics and status
"""
now = time.time()
time_since_last_save = now - self._last_save_time
with self.lock:
dirty_count = len(self._dirty_watches)
is_thread_alive = self.save_data_thread and self.save_data_thread.is_alive()
# Determine health status
if not is_thread_alive:
status = "CRITICAL"
message = "Save thread is DEAD"
elif time_since_last_save > 300: # 5 minutes
status = "WARNING"
message = f"No save activity for {time_since_last_save:.0f}s"
elif dirty_count > 1000:
status = "WARNING"
message = f"High backpressure: {dirty_count} watches pending"
elif self._save_errors > 0 and (self._save_errors / max(self._total_saves, 1)) > 0.01:
status = "WARNING"
message = f"High error rate: {self._save_errors} errors"
else:
status = "HEALTHY"
message = "Operating normally"
return {
"status": status,
"message": message,
"thread_alive": is_thread_alive,
"dirty_watches": dirty_count,
"dirty_settings": self._dirty_settings,
"last_save_seconds_ago": int(time_since_last_save),
"save_cycles": self._save_cycle_count,
"total_saves": self._total_saves,
"total_errors": self._save_errors,
"error_rate_percent": round((self._save_errors / max(self._total_saves, 1)) * 100, 2)
}
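Everything removed above (dirty sets, hash auditing, the background save thread, force_save_all) is replaced by explicit commits at the point of change. A rough sketch of the replacement calling pattern, pieced together from the tests added further down in this compare view; assume datastore is an already-loaded store instance:
# Watch-level change: mutate the in-memory watch, then persist just that watch
uuid = datastore.add_watch(url='http://example.com')
watch = datastore.data['watching'][uuid]
watch['title'] = 'New title'
watch['paused'] = True
watch.commit()   # writes this watch's watch.json atomically, right now
# Settings-level change: mutate settings, then persist the settings file
datastore.data['settings']['application']['empty_pages_are_a_change'] = True
datastore.commit()   # writes application settings immediately, no background thread involved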

View File

@@ -168,7 +168,7 @@ class DatastoreUpdatesMixin:
latest_update = updates_available[-1] if updates_available else 0 latest_update = updates_available[-1] if updates_available else 0
logger.info(f"No schema version found and no watches exist - assuming fresh install, setting schema_version to {latest_update}") logger.info(f"No schema version found and no watches exist - assuming fresh install, setting schema_version to {latest_update}")
self.data['settings']['application']['schema_version'] = latest_update self.data['settings']['application']['schema_version'] = latest_update
self.mark_settings_dirty() self.commit()
return # No updates needed for fresh install return # No updates needed for fresh install
else: else:
# Has watches but no schema version - likely old datastore, run all updates # Has watches but no schema version - likely old datastore, run all updates
@@ -201,14 +201,14 @@ class DatastoreUpdatesMixin:
else: else:
# Bump the version, important # Bump the version, important
self.data['settings']['application']['schema_version'] = update_n self.data['settings']['application']['schema_version'] = update_n
self.mark_settings_dirty() self.commit()
# CRITICAL: Mark all watches as dirty so changes are persisted # CRITICAL: Save all watches so changes are persisted
# Most updates modify watches, and in the new individual watch.json structure, # Most updates modify watches, and in the new individual watch.json structure,
# we need to ensure those changes are saved # we need to ensure those changes are saved
logger.info(f"Marking all {len(self.data['watching'])} watches as dirty after update_{update_n} (so that it saves them to disk)") logger.info(f"Saving all {len(self.data['watching'])} watches after update_{update_n} (so that it saves them to disk)")
for uuid in self.data['watching'].keys(): for uuid in self.data['watching'].keys():
self.mark_watch_dirty(uuid) self.data['watching'][uuid].commit()
# Save changes immediately after each update (more resilient than batching) # Save changes immediately after each update (more resilient than batching)
logger.critical(f"Saving all changes after update_{update_n}") logger.critical(f"Saving all changes after update_{update_n}")
@@ -662,7 +662,7 @@ class DatastoreUpdatesMixin:
updates_available = self.get_updates_available() updates_available = self.get_updates_available()
latest_schema = updates_available[-1] if updates_available else 26 latest_schema = updates_available[-1] if updates_available else 26
self.data['settings']['application']['schema_version'] = latest_schema self.data['settings']['application']['schema_version'] = latest_schema
self.mark_settings_dirty() self.commit()
logger.info(f"Set schema_version to {latest_schema} (migration complete, all watches already saved)") logger.info(f"Set schema_version to {latest_schema} (migration complete, all watches already saved)")
logger.critical("=" * 80) logger.critical("=" * 80)

View File

@@ -308,10 +308,6 @@ def prepare_test_function(live_server, datastore_path):
# Prevent background thread from writing during cleanup/reload
datastore.needs_write = False
datastore.needs_write_urgent = False
# CRITICAL: Clean up any files from previous tests # CRITICAL: Clean up any files from previous tests
# This ensures a completely clean directory # This ensures a completely clean directory
cleanup(datastore_path) cleanup(datastore_path)
@@ -344,7 +340,6 @@ def prepare_test_function(live_server, datastore_path):
break break
datastore.data['watching'] = {} datastore.data['watching'] = {}
datastore.needs_write = True
except Exception as e: except Exception as e:
logger.warning(f"Error during datastore cleanup: {e}") logger.warning(f"Error during datastore cleanup: {e}")

View File

@@ -465,7 +465,10 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage, datasto
assert res.status_code == 400, "Should get error 400 when we give a field that doesn't exist" assert res.status_code == 400, "Should get error 400 when we give a field that doesn't exist"
# Message will come from `flask_expects_json` # Message will come from `flask_expects_json`
assert b'Additional properties are not allowed' in res.data # With patternProperties for processor_config_*, the error message format changed slightly
assert (b'Additional properties are not allowed' in res.data or
b'does not match any of the regexes' in res.data), \
"Should reject unknown fields with schema validation error"
# Try a XSS URL # Try a XSS URL

View File

@@ -80,7 +80,10 @@ def test_openapi_validation_invalid_field_in_request_body(client, live_server, m
# Should get 400 error due to invalid field (this will be caught by internal validation) # Should get 400 error due to invalid field (this will be caught by internal validation)
# Note: This tests the flow where OpenAPI validation passes but internal validation catches it # Note: This tests the flow where OpenAPI validation passes but internal validation catches it
assert res.status_code == 400, f"Expected 400 but got {res.status_code}" assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Additional properties are not allowed" in res.data, "Should contain validation error about additional properties" # With patternProperties for processor_config_*, the error message format changed slightly
assert (b"Additional properties are not allowed" in res.data or
b"does not match any of the regexes" in res.data), \
"Should contain validation error about additional/invalid properties"
def test_openapi_validation_import_wrong_content_type(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_import_wrong_content_type(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -0,0 +1,661 @@
#!/usr/bin/env python3
"""
Tests for immediate commit-based persistence system.
Tests cover:
- Watch.commit() persistence to disk
- Concurrent commit safety (race conditions)
- Processor config separation
- Data loss prevention (settings, tags, watch modifications)
"""
import json
import os
import threading
import time
from flask import url_for
from .util import wait_for_all_checks
# ==============================================================================
# 2. Commit() Persistence Tests
# ==============================================================================
def test_watch_commit_persists_to_disk(client, live_server):
"""Test that watch.commit() actually writes to watch.json immediately"""
datastore = client.application.config.get('DATASTORE')
# Create a watch
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Original Title'})
watch = datastore.data['watching'][uuid]
# Modify and commit
watch['title'] = 'Modified Title'
watch['paused'] = True
watch.commit()
# Read directly from disk (bypass datastore cache)
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
assert os.path.exists(watch_json_path), "watch.json should exist on disk"
with open(watch_json_path, 'r') as f:
disk_data = json.load(f)
assert disk_data['title'] == 'Modified Title', "Title should be persisted to disk"
assert disk_data['paused'] == True, "Paused state should be persisted to disk"
assert disk_data['uuid'] == uuid, "UUID should match"
def test_watch_commit_survives_reload(client, live_server):
"""Test that committed changes survive datastore reload"""
from changedetectionio.store import ChangeDetectionStore
datastore = client.application.config.get('DATASTORE')
datastore_path = datastore.datastore_path
# Create and modify a watch
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Test Watch'})
watch = datastore.data['watching'][uuid]
watch['title'] = 'Persisted Title'
watch['paused'] = True
watch['tags'] = ['tag-1', 'tag-2']
watch.commit()
# Simulate app restart - create new datastore instance
datastore2 = ChangeDetectionStore(datastore_path=datastore_path)
datastore2.reload_state(
datastore_path=datastore_path,
include_default_watches=False,
version_tag='test'
)
# Check data survived
assert uuid in datastore2.data['watching'], "Watch should exist after reload"
reloaded_watch = datastore2.data['watching'][uuid]
assert reloaded_watch['title'] == 'Persisted Title', "Title should survive reload"
assert reloaded_watch['paused'] == True, "Paused state should survive reload"
assert reloaded_watch['tags'] == ['tag-1', 'tag-2'], "Tags should survive reload"
def test_watch_commit_atomic_on_crash(client, live_server):
"""Test that atomic writes prevent corruption (temp file pattern)"""
datastore = client.application.config.get('DATASTORE')
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Original'})
watch = datastore.data['watching'][uuid]
# First successful commit
watch['title'] = 'First Save'
watch.commit()
# Verify watch.json exists and is valid
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
data = json.load(f) # Should not raise JSONDecodeError
assert data['title'] == 'First Save'
# Second commit - even if interrupted, original file should be intact
# (atomic write uses temp file + rename, so original is never corrupted)
watch['title'] = 'Second Save'
watch.commit()
with open(watch_json_path, 'r') as f:
data = json.load(f)
assert data['title'] == 'Second Save'
def test_multiple_watches_commit_independently(client, live_server):
"""Test that committing one watch doesn't affect others"""
datastore = client.application.config.get('DATASTORE')
# Create multiple watches
uuid1 = datastore.add_watch(url='http://example1.com', extras={'title': 'Watch 1'})
uuid2 = datastore.add_watch(url='http://example2.com', extras={'title': 'Watch 2'})
uuid3 = datastore.add_watch(url='http://example3.com', extras={'title': 'Watch 3'})
watch1 = datastore.data['watching'][uuid1]
watch2 = datastore.data['watching'][uuid2]
watch3 = datastore.data['watching'][uuid3]
# Modify and commit only watch2
watch2['title'] = 'Modified Watch 2'
watch2['paused'] = True
watch2.commit()
# Read all from disk
def read_watch_json(uuid):
watch = datastore.data['watching'][uuid]
path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(path, 'r') as f:
return json.load(f)
data1 = read_watch_json(uuid1)
data2 = read_watch_json(uuid2)
data3 = read_watch_json(uuid3)
# Only watch2 should have changes
assert data1['title'] == 'Watch 1', "Watch 1 should be unchanged"
assert data1['paused'] == False, "Watch 1 should not be paused"
assert data2['title'] == 'Modified Watch 2', "Watch 2 should be modified"
assert data2['paused'] == True, "Watch 2 should be paused"
assert data3['title'] == 'Watch 3', "Watch 3 should be unchanged"
assert data3['paused'] == False, "Watch 3 should not be paused"
# ==============================================================================
# 3. Concurrency/Race Condition Tests
# ==============================================================================
def test_concurrent_watch_commits_dont_corrupt(client, live_server):
"""Test that simultaneous commits to same watch don't corrupt JSON"""
datastore = client.application.config.get('DATASTORE')
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Test'})
watch = datastore.data['watching'][uuid]
errors = []
def modify_and_commit(field, value):
try:
watch[field] = value
watch.commit()
except Exception as e:
errors.append(e)
# Run 10 concurrent commits
threads = []
for i in range(10):
t = threading.Thread(target=modify_and_commit, args=('title', f'Title {i}'))
threads.append(t)
t.start()
for t in threads:
t.join()
# Should not have any errors
assert len(errors) == 0, f"Expected no errors, got: {errors}"
# JSON file should still be valid (not corrupted)
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
data = json.load(f) # Should not raise JSONDecodeError
assert data['uuid'] == uuid, "UUID should still be correct"
assert 'Title' in data['title'], "Title should contain 'Title'"
def test_concurrent_modifications_during_commit(client, live_server):
"""Test that modifying watch during commit doesn't cause RuntimeError"""
datastore = client.application.config.get('DATASTORE')
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Test'})
watch = datastore.data['watching'][uuid]
errors = []
stop_flag = threading.Event()
def keep_modifying():
"""Continuously modify watch"""
try:
i = 0
while not stop_flag.is_set():
watch['title'] = f'Title {i}'
watch['paused'] = i % 2 == 0
i += 1
time.sleep(0.001)
except Exception as e:
errors.append(('modifier', e))
def keep_committing():
"""Continuously commit watch"""
try:
for _ in range(20):
watch.commit()
time.sleep(0.005)
except Exception as e:
errors.append(('committer', e))
# Start concurrent modification and commits
modifier = threading.Thread(target=keep_modifying)
committer = threading.Thread(target=keep_committing)
modifier.start()
committer.start()
committer.join()
stop_flag.set()
modifier.join()
# Should not have RuntimeError from dict changing during iteration
runtime_errors = [e for source, e in errors if isinstance(e, RuntimeError)]
assert len(runtime_errors) == 0, f"Should not have RuntimeError, got: {runtime_errors}"
def test_datastore_lock_protects_commit_snapshot(client, live_server):
"""Test that datastore.lock prevents race conditions during deepcopy"""
datastore = client.application.config.get('DATASTORE')
uuid = datastore.add_watch(url='http://example.com', extras={'title': 'Test'})
watch = datastore.data['watching'][uuid]
# Add some complex nested data
watch['browser_steps'] = [
{'operation': 'click', 'selector': '#foo'},
{'operation': 'wait', 'seconds': 5}
]
errors = []
commits_succeeded = [0]
def rapid_commits():
try:
for i in range(50):
watch['title'] = f'Title {i}'
watch.commit()
commits_succeeded[0] += 1
time.sleep(0.001)
except Exception as e:
errors.append(e)
# Multiple threads doing rapid commits
threads = [threading.Thread(target=rapid_commits) for _ in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0, f"Expected no errors, got: {errors}"
assert commits_succeeded[0] == 150, f"Expected 150 commits, got {commits_succeeded[0]}"
# Final JSON should be valid
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
data = json.load(f)
assert data['uuid'] == uuid
# ==============================================================================
# 4. Processor Config Separation Tests
# ==============================================================================
def test_processor_config_never_in_watch_json(client, live_server):
"""Test that processor_config_* fields are filtered out of watch.json"""
datastore = client.application.config.get('DATASTORE')
uuid = datastore.add_watch(
url='http://example.com',
extras={
'title': 'Test Watch',
'processor': 'restock_diff'
}
)
watch = datastore.data['watching'][uuid]
# Try to set processor config fields (these should be filtered during commit)
watch['processor_config_price_threshold'] = 10.0
watch['processor_config_some_setting'] = 'value'
watch['processor_config_another'] = {'nested': 'data'}
watch.commit()
# Read watch.json from disk
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
data = json.load(f)
# Verify processor_config_* fields are NOT in watch.json
for key in data.keys():
assert not key.startswith('processor_config_'), \
f"Found {key} in watch.json - processor configs should be in separate file!"
# Normal fields should still be there
assert data['title'] == 'Test Watch'
assert data['processor'] == 'restock_diff'
def test_api_post_saves_processor_config_separately(client, live_server):
"""Test that API POST saves processor configs to {processor}.json"""
import json
# Get API key
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Create watch via API with processor config
response = client.post(
url_for("createwatch"),
data=json.dumps({
'url': 'http://example.com',
'processor': 'restock_diff',
'processor_config_price_threshold': 10.0,
'processor_config_in_stock_only': True
}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert response.status_code in (200, 201), f"Expected 200/201, got {response.status_code}"
uuid = response.json.get('uuid')
assert uuid, "Should return UUID"
datastore = client.application.config.get('DATASTORE')
watch = datastore.data['watching'][uuid]
# Check that processor config file exists
processor_config_path = os.path.join(watch.watch_data_dir, 'restock_diff.json')
assert os.path.exists(processor_config_path), "Processor config file should exist"
with open(processor_config_path, 'r') as f:
config = json.load(f)
# Verify fields are saved WITHOUT processor_config_ prefix
assert config.get('price_threshold') == 10.0, "Should have price_threshold (no prefix)"
assert config.get('in_stock_only') == True, "Should have in_stock_only (no prefix)"
assert 'processor_config_price_threshold' not in config, "Should NOT have prefixed keys"
def test_api_put_saves_processor_config_separately(client, live_server):
"""Test that API PUT updates processor configs in {processor}.json"""
import json
datastore = client.application.config.get('DATASTORE')
# Get API key
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Create watch
uuid = datastore.add_watch(
url='http://example.com',
extras={'processor': 'restock_diff'}
)
# Update via API with processor config
response = client.put(
url_for("watch", uuid=uuid),
data=json.dumps({
'processor_config_price_threshold': 15.0,
'processor_config_min_stock': 5
}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
# PUT might return different status codes, 200 or 204 are both OK
assert response.status_code in (200, 204), f"Expected 200/204, got {response.status_code}: {response.data}"
watch = datastore.data['watching'][uuid]
# Check processor config file
processor_config_path = os.path.join(watch.watch_data_dir, 'restock_diff.json')
assert os.path.exists(processor_config_path), "Processor config file should exist"
with open(processor_config_path, 'r') as f:
config = json.load(f)
assert config.get('price_threshold') == 15.0, "Should have updated price_threshold"
assert config.get('min_stock') == 5, "Should have min_stock"
def test_ui_edit_saves_processor_config_separately(client, live_server):
"""Test that processor_config_* fields never appear in watch.json (even from UI)"""
datastore = client.application.config.get('DATASTORE')
# Create watch
uuid = datastore.add_watch(
url='http://example.com',
extras={'processor': 'text_json_diff', 'title': 'Test'}
)
watch = datastore.data['watching'][uuid]
# Simulate someone accidentally trying to set processor_config fields directly
watch['processor_config_should_not_save'] = 'test_value'
watch['processor_config_another_field'] = 123
watch['normal_field'] = 'this_should_save'
watch.commit()
# Check watch.json has NO processor_config_* fields (main point of this test)
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
watch_data = json.load(f)
for key in watch_data.keys():
assert not key.startswith('processor_config_'), \
f"Found {key} in watch.json - processor configs should be filtered during commit"
# Verify normal fields still save
assert watch_data['normal_field'] == 'this_should_save', "Normal fields should save"
assert watch_data['title'] == 'Test', "Original fields should still be there"
def test_browser_steps_normalized_to_empty_list(client, live_server):
"""Test that meaningless browser_steps are normalized to [] during commit"""
datastore = client.application.config.get('DATASTORE')
uuid = datastore.add_watch(url='http://example.com')
watch = datastore.data['watching'][uuid]
# Set browser_steps to meaningless values
watch['browser_steps'] = [
{'operation': 'Choose one', 'selector': ''},
{'operation': 'Goto site', 'selector': ''},
{'operation': '', 'selector': '#foo'}
]
watch.commit()
# Read from disk
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
data = json.load(f)
# Should be normalized to empty list
assert data['browser_steps'] == [], "Meaningless browser_steps should be normalized to []"
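The processor-config tests above pin down the contract: any processor_config_* key must never be written into watch.json; it is stripped of its prefix and stored in a per-processor JSON file (for example restock_diff.json) inside the same watch data directory. A self-contained sketch of that split, with a hypothetical helper name (the real filtering happens inside commit() and the API handlers):
PREFIX = 'processor_config_'
def split_processor_config_sketch(watch_dict):
    """Illustrative only: separate plain watch fields from de-prefixed processor config."""
    plain = {k: v for k, v in watch_dict.items() if not k.startswith(PREFIX)}
    config = {k[len(PREFIX):]: v for k, v in watch_dict.items() if k.startswith(PREFIX)}
    return plain, config
plain, config = split_processor_config_sketch({
    'title': 'Test Watch',
    'processor': 'restock_diff',
    'processor_config_price_threshold': 10.0,
})
assert 'processor_config_price_threshold' not in plain    # stays out of watch.json
assert config == {'price_threshold': 10.0}                 # goes into restock_diff.json, prefix stripped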
# ==============================================================================
# 5. Data Loss Prevention Tests
# ==============================================================================
def test_settings_persist_after_update(client, live_server):
"""Test that settings updates are committed and survive restart"""
from changedetectionio.store import ChangeDetectionStore
datastore = client.application.config.get('DATASTORE')
datastore_path = datastore.datastore_path
# Update settings directly (bypass form validation issues)
datastore.data['settings']['application']['empty_pages_are_a_change'] = True
datastore.data['settings']['application']['fetch_backend'] = 'html_requests'
datastore.data['settings']['requests']['time_between_check']['minutes'] = 120
datastore.commit()
# Simulate restart
datastore2 = ChangeDetectionStore(datastore_path=datastore_path)
datastore2.reload_state(
datastore_path=datastore_path,
include_default_watches=False,
version_tag='test'
)
# Verify settings survived
assert datastore2.data['settings']['application']['empty_pages_are_a_change'] == True, "empty_pages_are_a_change should persist"
assert datastore2.data['settings']['application']['fetch_backend'] == 'html_requests', "fetch_backend should persist"
assert datastore2.data['settings']['requests']['time_between_check']['minutes'] == 120, "time_between_check should persist"
def test_tag_mute_persists(client, live_server):
"""Test that tag mute/unmute operations persist"""
from changedetectionio.store import ChangeDetectionStore
datastore = client.application.config.get('DATASTORE')
datastore_path = datastore.datastore_path
# Add a tag
tag_uuid = datastore.add_tag('Test Tag')
# Mute the tag
response = client.get(url_for("tags.mute", uuid=tag_uuid))
assert response.status_code == 302 # Redirect
# Verify muted in memory
assert datastore.data['settings']['application']['tags'][tag_uuid]['notification_muted'] == True
# Simulate restart
datastore2 = ChangeDetectionStore(datastore_path=datastore_path)
datastore2.reload_state(
datastore_path=datastore_path,
include_default_watches=False,
version_tag='test'
)
# Verify mute state survived
assert tag_uuid in datastore2.data['settings']['application']['tags']
assert datastore2.data['settings']['application']['tags'][tag_uuid]['notification_muted'] == True
def test_tag_delete_removes_from_watches(client, live_server):
"""Test that deleting a tag removes it from all watches"""
datastore = client.application.config.get('DATASTORE')
# Create a tag
tag_uuid = datastore.add_tag('Test Tag')
# Create watches with this tag
uuid1 = datastore.add_watch(url='http://example1.com')
uuid2 = datastore.add_watch(url='http://example2.com')
uuid3 = datastore.add_watch(url='http://example3.com')
watch1 = datastore.data['watching'][uuid1]
watch2 = datastore.data['watching'][uuid2]
watch3 = datastore.data['watching'][uuid3]
watch1['tags'] = [tag_uuid]
watch1.commit()
watch2['tags'] = [tag_uuid, 'other-tag']
watch2.commit()
# watch3 has no tags
# Delete the tag
response = client.get(url_for("tags.delete", uuid=tag_uuid))
assert response.status_code == 302
# Wait for background thread to complete
time.sleep(1)
# Tag should be removed from settings
assert tag_uuid not in datastore.data['settings']['application']['tags']
# Tag should be removed from watches and persisted
def check_watch_tags(uuid):
watch = datastore.data['watching'][uuid]
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
return json.load(f)['tags']
assert tag_uuid not in check_watch_tags(uuid1), "Tag should be removed from watch1"
assert tag_uuid not in check_watch_tags(uuid2), "Tag should be removed from watch2"
assert 'other-tag' in check_watch_tags(uuid2), "Other tags should remain in watch2"
assert check_watch_tags(uuid3) == [], "Watch3 should still have empty tags"
def test_watch_pause_unpause_persists(client, live_server):
"""Test that pause/unpause operations commit and persist"""
datastore = client.application.config.get('DATASTORE')
# Get API key
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
uuid = datastore.add_watch(url='http://example.com')
watch = datastore.data['watching'][uuid]
# Pause via API
response = client.get(url_for("watch", uuid=uuid, paused='paused'), headers={'x-api-key': api_key})
assert response.status_code == 200
# Check persisted to disk
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
data = json.load(f)
assert data['paused'] == True, "Pause should be persisted"
# Unpause
response = client.get(url_for("watch", uuid=uuid, paused='unpaused'), headers={'x-api-key': api_key})
assert response.status_code == 200
with open(watch_json_path, 'r') as f:
data = json.load(f)
assert data['paused'] == False, "Unpause should be persisted"
def test_watch_mute_unmute_persists(client, live_server):
"""Test that mute/unmute operations commit and persist"""
datastore = client.application.config.get('DATASTORE')
# Get API key
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
uuid = datastore.add_watch(url='http://example.com')
watch = datastore.data['watching'][uuid]
# Mute via API
response = client.get(url_for("watch", uuid=uuid, muted='muted'), headers={'x-api-key': api_key})
assert response.status_code == 200
# Check persisted to disk
watch_json_path = os.path.join(watch.watch_data_dir, 'watch.json')
with open(watch_json_path, 'r') as f:
data = json.load(f)
assert data['notification_muted'] == True, "Mute should be persisted"
# Unmute
response = client.get(url_for("watch", uuid=uuid, muted='unmuted'), headers={'x-api-key': api_key})
assert response.status_code == 200
with open(watch_json_path, 'r') as f:
data = json.load(f)
assert data['notification_muted'] == False, "Unmute should be persisted"
def test_ui_watch_edit_persists_all_fields(client, live_server):
"""Test that UI watch edit form persists all modified fields"""
from changedetectionio.store import ChangeDetectionStore
datastore = client.application.config.get('DATASTORE')
datastore_path = datastore.datastore_path
# Create watch
uuid = datastore.add_watch(url='http://example.com')
# Edit via UI with multiple field changes
response = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
'url': 'http://updated-example.com',
'title': 'Updated Watch Title',
'time_between_check-hours': '2',
'time_between_check-minutes': '30',
'include_filters': '#content',
'fetch_backend': 'html_requests',
'method': 'POST',
'ignore_text': 'Advertisement\nTracking'
},
follow_redirects=True
)
assert b"Updated watch" in response.data or b"Saved" in response.data
# Simulate restart
datastore2 = ChangeDetectionStore(datastore_path=datastore_path)
datastore2.reload_state(
datastore_path=datastore_path,
include_default_watches=False,
version_tag='test'
)
# Verify all fields survived
watch = datastore2.data['watching'][uuid]
assert watch['url'] == 'http://updated-example.com'
assert watch['title'] == 'Updated Watch Title'
assert watch['time_between_check']['hours'] == 2
assert watch['time_between_check']['minutes'] == 30
assert watch['fetch_backend'] == 'html_requests'
assert watch['method'] == 'POST'

View File

@@ -161,11 +161,6 @@ def extract_UUID_from_client(client):
def delete_all_watches(client=None): def delete_all_watches(client=None):
# Change tracking
client.application.config.get('DATASTORE')._dirty_watches = set() # Watch UUIDs that need saving
client.application.config.get('DATASTORE')._dirty_settings = False # Settings changed
client.application.config.get('DATASTORE')._watch_hashes = {} # UUID -> SHA256 hash for change detection
uuids = list(client.application.config.get('DATASTORE').data['watching']) uuids = list(client.application.config.get('DATASTORE').data['watching'])
for uuid in uuids: for uuid in uuids:
client.application.config.get('DATASTORE').delete(uuid) client.application.config.get('DATASTORE').delete(uuid)