Compare commits

...

3 Commits

Author SHA1 Message Date
dgtlmoon
b5b5e8d3e4 API - Import - Ability to set any watch value as HTTP URL Query value, for example ?processor=restock_diff&time_between_check={'hours':24} Re #3845 2026-02-11 07:07:02 +01:00
dgtlmoon
6e90a0bbd1 UI - Bulk checkbox operations modal confirmation fix Re #3853 2026-02-11 06:29:59 +01:00
dgtlmoon
987789425d Tags update fix (#3849)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2026-02-07 17:13:41 +01:00
9 changed files with 338 additions and 98 deletions

View File

@@ -605,7 +605,7 @@ def main():
return dict(right_sticky="v{}".format(datastore.data['version_tag']),
new_version_available=app.config['NEW_VERSION_AVAILABLE'],
has_password=datastore.data['settings']['application']['password'] != False,
socket_io_enabled=datastore.data['settings']['application']['ui'].get('socket_io_enabled', True),
socket_io_enabled=datastore.data['settings']['application'].get('ui', {}).get('socket_io_enabled', True),
all_paused=datastore.data['settings']['application'].get('all_paused', False),
all_muted=datastore.data['settings']['application'].get('all_muted', False)
)

View File

@@ -2,8 +2,9 @@ from changedetectionio.strtobool import strtobool
from flask_restful import abort, Resource
from flask import request
from functools import wraps
from . import auth, validate_openapi_request
from . import auth, validate_openapi_request, schema_create_watch
from ..validate_url import is_safe_valid_url
import json
def default_content_type(content_type='text/plain'):
@@ -19,6 +20,62 @@ def default_content_type(content_type='text/plain'):
return decorator
def convert_query_param_to_type(value, schema_property):
"""
Convert a query parameter string to the appropriate type based on schema definition.
Args:
value: String value from query parameter
schema_property: Schema property definition with 'type' or 'anyOf' field
Returns:
Converted value in the appropriate type
"""
# Handle anyOf schemas (extract the first type)
if 'anyOf' in schema_property:
# Use the first non-null type from anyOf
for option in schema_property['anyOf']:
if option.get('type') and option.get('type') != 'null':
prop_type = option.get('type')
break
else:
prop_type = None
else:
prop_type = schema_property.get('type')
# Handle array type (e.g., notification_urls)
if prop_type == 'array':
# Support both comma-separated and JSON array format
if value.startswith('['):
try:
return json.loads(value)
except json.JSONDecodeError:
return [v.strip() for v in value.split(',')]
return [v.strip() for v in value.split(',')]
# Handle object type (e.g., time_between_check, headers)
elif prop_type == 'object':
try:
return json.loads(value)
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON object for field: {value}")
# Handle boolean type
elif prop_type == 'boolean':
return strtobool(value)
# Handle integer type
elif prop_type == 'integer':
return int(value)
# Handle number type (float)
elif prop_type == 'number':
return float(value)
# Default: return as string
return value
class Import(Resource):
def __init__(self, **kwargs):
# datastore is a black box dependency
@@ -28,25 +85,79 @@ class Import(Resource):
@default_content_type('text/plain') #3547 #3542
@validate_openapi_request('importWatches')
def post(self):
"""Import a list of watched URLs."""
"""Import a list of watched URLs with optional watch configuration."""
# Special parameters that are NOT watch configuration
special_params = {'tag', 'tag_uuids', 'dedupe', 'proxy'}
extras = {}
# Handle special 'proxy' parameter
if request.args.get('proxy'):
plist = self.datastore.proxy_list
if not request.args.get('proxy') in plist:
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
proxy_list_str = ', '.join(plist) if plist else 'none configured'
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
else:
extras['proxy'] = request.args.get('proxy')
# Handle special 'dedupe' parameter
dedupe = strtobool(request.args.get('dedupe', 'true'))
# Handle special 'tag' and 'tag_uuids' parameters
tags = request.args.get('tag')
tag_uuids = request.args.get('tag_uuids')
if tag_uuids:
tag_uuids = tag_uuids.split(',')
# Extract ALL other query parameters as watch configuration
schema_properties = schema_create_watch.get('properties', {})
for param_name, param_value in request.args.items():
# Skip special parameters
if param_name in special_params:
continue
# Skip if not in schema (unknown parameter)
if param_name not in schema_properties:
return f"Unknown watch configuration parameter: {param_name}", 400
# Convert to appropriate type based on schema
try:
converted_value = convert_query_param_to_type(param_value, schema_properties[param_name])
extras[param_name] = converted_value
except (ValueError, json.JSONDecodeError) as e:
return f"Invalid value for parameter '{param_name}': {str(e)}", 400
# Validate processor if provided
if 'processor' in extras:
from changedetectionio.processors import available_processors
available = [p[0] for p in available_processors()]
if extras['processor'] not in available:
return f"Invalid processor '{extras['processor']}'. Available processors: {', '.join(available)}", 400
# Validate fetch_backend if provided
if 'fetch_backend' in extras:
from changedetectionio.content_fetchers import available_fetchers
available = [f[0] for f in available_fetchers()]
# Also allow 'system' and extra_browser_* patterns
is_valid = (
extras['fetch_backend'] == 'system' or
extras['fetch_backend'] in available or
extras['fetch_backend'].startswith('extra_browser_')
)
if not is_valid:
return f"Invalid fetch_backend '{extras['fetch_backend']}'. Available: system, {', '.join(available)}", 400
# Validate notification_urls if provided
if 'notification_urls' in extras:
from wtforms import ValidationError
from changedetectionio.api.Notifications import validate_notification_urls
try:
validate_notification_urls(extras['notification_urls'])
except ValidationError as e:
return f"Invalid notification_urls: {str(e)}", 400
urls = request.get_data().decode('utf8').splitlines()
added = []
for url in urls:
@@ -54,13 +165,15 @@ class Import(Resource):
if not len(url):
continue
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
# Validate URL
if not is_safe_valid_url(url):
return f"Invalid or unsupported URL - {url}", 400
# Check for duplicates if dedupe is enabled
if dedupe and self.datastore.url_exists(url):
continue
# Create watch with extras configuration
new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags, tag_uuids=tag_uuids)
added.append(new_uuid)

View File

@@ -848,7 +848,7 @@ def changedetection_app(config=None, datastore_o=None):
app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')
# Initialize Socket.IO server conditionally based on settings
socket_io_enabled = datastore.data['settings']['application']['ui'].get('socket_io_enabled', True)
socket_io_enabled = datastore.data['settings']['application'].get('ui', {}).get('socket_io_enabled', True)
if socket_io_enabled and app.config.get('batch_mode'):
socket_io_enabled = False
if socket_io_enabled:

View File

@@ -510,7 +510,12 @@ class watch_base(dict):
# Save to disk via subclass implementation
try:
# Determine entity type from module name (Watch.py -> watch, Tag.py -> tag)
from changedetectionio.model.persistence import _determine_entity_type
entity_type = _determine_entity_type(self.__class__)
filename = f"{entity_type}.json"
self._save_to_disk(data_dict, uuid)
logger.debug(f"Committed {self.__class__.__name__.lower()} {uuid}")
logger.debug(f"Committed {entity_type} {uuid} to {uuid}/{filename}")
except Exception as e:
logger.error(f"Failed to commit {uuid}: {e}")

View File

@@ -184,7 +184,8 @@ $(document).ready(function() {
}
// If it's a button in a form, submit the form
else if ($element.is('button')) {
$element.closest('form').submit();
// Use requestSubmit() to include the button's name/value in the form data
$element.closest('form')[0].requestSubmit($element[0]);
}
}
};

View File

@@ -123,10 +123,17 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
self.__data['settings']['application'].update(settings_data['settings']['application'])
def _rehydrate_tags(self):
"""Rehydrate tag entities from stored data."""
"""Rehydrate tag entities from stored data into Tag objects with restock_diff processor."""
from ..model import Tag
for uuid, tag in self.__data['settings']['application']['tags'].items():
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(
uuid, tag, processor_override='restock_diff'
# Force processor to restock_diff for override functionality (technical debt)
tag['processor'] = 'restock_diff'
self.__data['settings']['application']['tags'][uuid] = Tag.model(
datastore_path=self.datastore_path,
__datastore=self.__data,
default=tag
)
logger.info(f"Tag: {uuid} {tag['title']}")
@@ -236,8 +243,32 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
if not legacy_data:
raise Exception("Failed to load legacy datastore from url-watches.json")
# Store the loaded data
self.__data = legacy_data
# Merge legacy data with base_config defaults (preserves new fields like 'ui')
# self.__data already has App.model() defaults from line 190
logger.info("Merging legacy data with base_config defaults...")
# Apply top-level fields from legacy data
if 'app_guid' in legacy_data:
self.__data['app_guid'] = legacy_data['app_guid']
if 'build_sha' in legacy_data:
self.__data['build_sha'] = legacy_data['build_sha']
if 'version_tag' in legacy_data:
self.__data['version_tag'] = legacy_data['version_tag']
# Apply watching data (complete replacement as these are user's watches)
if 'watching' in legacy_data:
self.__data['watching'] = legacy_data['watching']
# Merge settings sections (preserves base_config defaults for missing fields)
if 'settings' in legacy_data:
if 'headers' in legacy_data['settings']:
self.__data['settings']['headers'].update(legacy_data['settings']['headers'])
if 'requests' in legacy_data['settings']:
self.__data['settings']['requests'].update(legacy_data['settings']['requests'])
if 'application' in legacy_data['settings']:
# CRITICAL: Use .update() to merge, not replace
# This preserves new fields like 'ui' that exist in base_config
self.__data['settings']['application'].update(legacy_data['settings']['application'])
# CRITICAL: Rehydrate watches from dicts into Watch objects
# This ensures watches have their methods available during migration
@@ -340,20 +371,25 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
"""
Build settings data structure for saving.
Tags are excluded - they are stored in individual {uuid}/tag.json files.
This keeps changedetection.json small and allows atomic tag updates.
Tags behavior depends on schema version:
- Before update_28 (schema < 28): Tags saved in settings for migration
- After update_28 (schema >= 28): Tags excluded from settings (in individual files)
Returns:
dict: Settings data ready for serialization (without tags)
dict: Settings data ready for serialization
"""
import copy
# Deep copy settings to avoid modifying the original
settings_copy = copy.deepcopy(self.__data['settings'])
# Replace tags dict with empty dict (tags are in individual tag.json files)
# We keep the empty dict for backwards compatibility and clear structure
settings_copy['application']['tags'] = {}
# Only exclude tags if we've already migrated them to individual files (schema >= 28)
# This ensures update_28 can migrate tags from settings
schema_version = self.__data['settings']['application'].get('schema_version', 0)
if schema_version >= 28:
# Tags are in individual tag.json files, don't save to settings
settings_copy['application']['tags'] = {}
# else: keep tags in settings for update_28 migration
return {
'note': 'Settings file - watches are in {uuid}/watch.json, tags are in {uuid}/tag.json',
@@ -403,9 +439,22 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
File backend implementation: reads individual tag.json files.
Tags loaded from files override any tags in settings (migration path).
"""
from ..model import Tag
def rehydrate_tag(uuid, entity_dict):
"""Rehydrate tag as Tag object with forced restock_diff processor."""
entity_dict['uuid'] = uuid
entity_dict['processor'] = 'restock_diff' # Force processor for override functionality
return Tag.model(
datastore_path=self.datastore_path,
__datastore=self.__data,
default=entity_dict
)
tags = load_all_tags(
self.datastore_path,
self.rehydrate_entity
rehydrate_tag
)
# Override settings tags with loaded tags

View File

@@ -227,8 +227,7 @@ def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
rehydrate_entity_func: Function to convert dict to Watch object
Returns:
Tuple of (Watch object, raw_data_dict) or (None, None) if failed
The raw_data_dict is needed to compute the hash before rehydration
Watch object or None if failed
"""
try:
# Check file size before reading
@@ -241,7 +240,7 @@ def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
f"File: {watch_json}. This indicates a bug or data corruption. "
f"Watch will be skipped."
)
return None, None
return None
if HAS_ORJSON:
with open(watch_json, 'rb') as f:
@@ -250,10 +249,9 @@ def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
with open(watch_json, 'r', encoding='utf-8') as f:
watch_data = json.load(f)
# Return both the raw data and the rehydrated watch
# Raw data is needed to compute hash before rehydration changes anything
# Rehydrate and return watch object
watch_obj = rehydrate_entity_func(uuid, watch_data)
return watch_obj, watch_data
return watch_obj
except json.JSONDecodeError as e:
logger.critical(
@@ -261,7 +259,7 @@ def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
f"File: {watch_json}. Error: {e}. "
f"Watch will be skipped and may need manual recovery from backup."
)
return None, None
return None
except ValueError as e:
# orjson raises ValueError for invalid JSON
if "invalid json" in str(e).lower() or HAS_ORJSON:
@@ -270,15 +268,15 @@ def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
f"File: {watch_json}. Error: {e}. "
f"Watch will be skipped and may need manual recovery from backup."
)
return None, None
return None
# Re-raise if it's not a JSON parsing error
raise
except FileNotFoundError:
logger.error(f"Watch file not found: {watch_json} for watch {uuid}")
return None, None
return None
except Exception as e:
logger.error(f"Failed to load watch {uuid} from {watch_json}: {e}")
return None, None
return None
def load_all_watches(datastore_path, rehydrate_entity_func):
@@ -318,8 +316,8 @@ def load_all_watches(datastore_path, rehydrate_entity_func):
for watch_json in watch_files:
# Extract UUID from path: /datastore/{uuid}/watch.json
uuid_dir = os.path.basename(os.path.dirname(watch_json))
watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
if watch and raw_data:
watch = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
if watch:
watching[uuid_dir] = watch
loaded += 1
@@ -375,8 +373,10 @@ def load_tag_from_file(tag_json, uuid, rehydrate_entity_func):
with open(tag_json, 'r', encoding='utf-8') as f:
tag_data = json.load(f)
tag_data['processor'] = 'restock_diff'
# Rehydrate tag (convert dict to Tag object)
tag_obj = rehydrate_entity_func(uuid, tag_data, processor_override='restock_diff')
# processor_override is set inside the rehydration function
tag_obj = rehydrate_entity_func(uuid, tag_data)
return tag_obj
except json.JSONDecodeError as e:

View File

@@ -154,10 +154,10 @@ class DatastoreUpdatesMixin:
2. For each update > current schema version:
- Create backup of datastore
- Run update method
- Update schema version
- Mark settings and watches dirty
- Update schema version and commit settings
- Commit all watches and tags
3. If any update fails, stop processing
4. Save all changes immediately
4. All changes saved via individual .commit() calls
"""
updates_available = self.get_updates_available()
@@ -206,39 +206,11 @@ class DatastoreUpdatesMixin:
# Don't run any more updates
return
else:
# Bump the version, important
# Bump the version
self.data['settings']['application']['schema_version'] = update_n
self.commit()
# CRITICAL: Save all watches so changes are persisted
# Most updates modify watches, and in the new individual watch.json structure,
# we need to ensure those changes are saved
logger.info(f"Saving all {len(self.data['watching'])} watches after update_{update_n} (so that it saves them to disk)")
for uuid in self.data['watching'].keys():
self.data['watching'][uuid].commit()
# CRITICAL: Save all tags so changes are persisted
# After update_27, tags have individual tag.json files
# For updates before update_27, this will fail silently (tags don't have commit() yet)
tags = self.data['settings']['application'].get('tags', {})
if tags and update_n >= 27:
logger.info(f"Saving all {len(tags)} tags after update_{update_n}")
for uuid in tags.keys():
try:
tags[uuid].commit()
except AttributeError:
# Tag doesn't have commit() method yet (pre-update_27)
pass
# Save changes immediately after each update (more resilient than batching)
logger.critical(f"Saving all changes after update_{update_n}")
try:
self._save_dirty_items()
logger.success(f"Update {update_n} changes saved successfully")
except Exception as e:
logger.error(f"Failed to save update_{update_n} changes: {e}")
# Don't raise - update already ran, but changes might not be persisted
# The update will try to run again on next startup
logger.success(f"Update {update_n} completed")
# Track which updates ran
updates_ran.append(update_n)
@@ -488,6 +460,14 @@ class DatastoreUpdatesMixin:
del self.data['watching'][uuid]['extract_title_as_title']
if self.data['settings']['application'].get('extract_title_as_title'):
# Ensure 'ui' key exists (defensive for edge cases where base_config merge didn't happen)
if 'ui' not in self.data['settings']['application']:
self.data['settings']['application']['ui'] = {
'use_page_title_in_list': True,
'open_diff_in_new_tab': True,
'socket_io_enabled': True,
'favicons_enabled': True
}
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
def update_21(self):
@@ -668,23 +648,6 @@ class DatastoreUpdatesMixin:
logger.critical("Reloading datastore from new format...")
self._load_state() # Includes load_watches
logger.success("Datastore reloaded from new format successfully")
# Verify all watches have hashes after migration
missing_hashes = [uuid for uuid in self.data['watching'].keys() if uuid not in self._watch_hashes]
if missing_hashes:
logger.error(f"WARNING: {len(missing_hashes)} watches missing hashes after migration: {missing_hashes[:5]}")
else:
logger.success(f"All {len(self.data['watching'])} watches have valid hashes after migration")
# Set schema version to latest available update
# This prevents re-running updates and re-marking all watches as dirty
updates_available = self.get_updates_available()
latest_schema = updates_available[-1] if updates_available else 26
self.data['settings']['application']['schema_version'] = latest_schema
self.commit()
logger.info(f"Set schema_version to {latest_schema} (migration complete, all watches already saved)")
logger.critical("=" * 80)
logger.critical("MIGRATION COMPLETED SUCCESSFULLY!")
logger.critical("=" * 80)
@@ -705,22 +668,22 @@ class DatastoreUpdatesMixin:
def update_26(self):
self.migrate_legacy_db_format()
def update_27(self):
def update_28(self):
"""
Migrate tags to individual tag.json files.
Tags are currently saved as part of changedetection.json (settings).
This migration moves them to individual {uuid}/tag.json files,
similar to how watches are stored.
Tags are currently saved only in changedetection.json (settings).
This migration ALSO saves them to individual {uuid}/tag.json files,
similar to how watches are stored (dual storage).
Benefits:
- Reduces changedetection.json size
- Allows atomic tag updates without rewriting entire settings
- Enables independent tag versioning/backup
- Maintains backwards compatibility (tags stay in settings too)
"""
logger.critical("=" * 80)
logger.critical("Running migration: Individual tag persistence (update_27)")
logger.critical("Moving tags from settings to individual tag.json files")
logger.critical("Running migration: Individual tag persistence (update_28)")
logger.critical("Creating individual tag.json files (tags remain in settings too)")
logger.critical("=" * 80)
tags = self.data['settings']['application'].get('tags', {})
@@ -735,17 +698,34 @@ class DatastoreUpdatesMixin:
saved_count = 0
failed_count = 0
for uuid, tag in tags.items():
for uuid, tag_data in tags.items():
try:
# Save tag to its own file
tag.commit()
# Force save as tag.json (not watch.json) even if object is corrupted
from changedetectionio.store.file_saving_datastore import save_entity_atomic
import os
tag_dir = os.path.join(self.datastore_path, uuid)
os.makedirs(tag_dir, exist_ok=True)
# Convert to dict if it's an object
tag_dict = dict(tag_data) if hasattr(tag_data, '__iter__') else tag_data
# Save explicitly as tag.json
save_entity_atomic(
tag_dir,
uuid,
tag_dict,
filename='tag.json',
entity_type='tag',
max_size_mb=1
)
saved_count += 1
if saved_count % 10 == 0:
logger.info(f" Progress: {saved_count}/{tag_count} tags migrated...")
except Exception as e:
logger.error(f"Failed to save tag {uuid} ({tag.get('title', 'unknown')}): {e}")
logger.error(f"Failed to save tag {uuid} ({tag_data.get('title', 'unknown')}): {e}")
failed_count += 1
if failed_count > 0:
@@ -753,9 +733,9 @@ class DatastoreUpdatesMixin:
else:
logger.success(f"Migration complete: {saved_count} tags saved to individual tag.json files")
# Tags remain in settings for backwards compatibility
# On next load, _load_tags() will read from tag.json files and override settings
logger.info("Tags remain in settings for backwards compatibility")
logger.info("Future tag edits will save to tag.json files only")
# Tags remain in settings for backwards compatibility AND easy access
# On next load, _load_tags() will read from tag.json files and merge with settings
logger.info("Tags saved to both settings AND individual tag.json files")
logger.info("Future tag edits will update both locations (dual storage)")
logger.critical("=" * 80)

View File

@@ -489,6 +489,7 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Test 1: Basic import with tag
res = client.post(
url_for("import") + "?tag=import-test",
data='https://website1.com\r\nhttps://website2.com',
@@ -507,6 +508,97 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):
res = client.get(url_for('tags.tags_overview_page'))
assert b'import-test' in res.data
# Test 2: Import with watch configuration fields (issue #3845)
# Test string field (include_filters), boolean (paused), and processor
import urllib.parse
params = urllib.parse.urlencode({
'tag': 'config-test',
'include_filters': 'div.content',
'paused': 'true',
'processor': 'text_json_diff',
'title': 'Imported with Config'
})
res = client.post(
url_for("import") + "?" + params,
data='https://website3.com',
headers={'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 200
assert len(res.json) == 1
uuid = res.json[0]
# Verify the configuration was applied
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
assert watch['include_filters'] == ['div.content'], "include_filters should be set as array"
assert watch['paused'] == True, "paused should be True"
assert watch['processor'] == 'text_json_diff', "processor should be set"
assert watch['title'] == 'Imported with Config', "title should be set"
# Test 3: Import with array field (notification_urls) - using valid Apprise format
params = urllib.parse.urlencode({
'tag': 'notification-test',
'notification_urls': 'mailto://test@example.com,mailto://admin@example.com'
})
res = client.post(
url_for("import") + "?" + params,
data='https://website4.com',
headers={'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 200
uuid = res.json[0]
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
assert 'mailto://test@example.com' in watch['notification_urls'], "notification_urls should contain first email"
assert 'mailto://admin@example.com' in watch['notification_urls'], "notification_urls should contain second email"
# Test 4: Import with object field (time_between_check)
import json
time_config = json.dumps({"hours": 2, "minutes": 30})
params = urllib.parse.urlencode({
'tag': 'schedule-test',
'time_between_check': time_config
})
res = client.post(
url_for("import") + "?" + params,
data='https://website5.com',
headers={'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 200
uuid = res.json[0]
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
assert watch['time_between_check']['hours'] == 2, "time_between_check hours should be 2"
assert watch['time_between_check']['minutes'] == 30, "time_between_check minutes should be 30"
# Test 5: Import with invalid processor (should fail)
res = client.post(
url_for("import") + "?processor=invalid_processor",
data='https://website6.com',
headers={'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Should reject invalid processor"
assert b"Invalid processor" in res.data, "Error message should mention invalid processor"
# Test 6: Import with invalid field (should fail)
res = client.post(
url_for("import") + "?unknown_field=value",
data='https://website7.com',
headers={'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Should reject unknown field"
assert b"Unknown watch configuration parameter" in res.data, "Error message should mention unknown parameter"
def test_api_conflict_UI_password(client, live_server, measure_memory_usage, datastore_path):