Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2026-02-11 08:46:14 +00:00)

Compare commits: tags-updat ... 3845-impor (3 Commits)
| Author | SHA1 | Date |
|---|---|---|
| | b5b5e8d3e4 | |
| | 6e90a0bbd1 | |
| | 987789425d | |
@@ -2,8 +2,9 @@ from changedetectionio.strtobool import strtobool
 from flask_restful import abort, Resource
 from flask import request
 from functools import wraps
-from . import auth, validate_openapi_request
+from . import auth, validate_openapi_request, schema_create_watch
 from ..validate_url import is_safe_valid_url
+import json


 def default_content_type(content_type='text/plain'):
@@ -19,6 +20,62 @@ def default_content_type(content_type='text/plain'):
     return decorator


+def convert_query_param_to_type(value, schema_property):
+    """
+    Convert a query parameter string to the appropriate type based on schema definition.
+
+    Args:
+        value: String value from query parameter
+        schema_property: Schema property definition with 'type' or 'anyOf' field
+
+    Returns:
+        Converted value in the appropriate type
+    """
+    # Handle anyOf schemas (extract the first type)
+    if 'anyOf' in schema_property:
+        # Use the first non-null type from anyOf
+        for option in schema_property['anyOf']:
+            if option.get('type') and option.get('type') != 'null':
+                prop_type = option.get('type')
+                break
+        else:
+            prop_type = None
+    else:
+        prop_type = schema_property.get('type')
+
+    # Handle array type (e.g., notification_urls)
+    if prop_type == 'array':
+        # Support both comma-separated and JSON array format
+        if value.startswith('['):
+            try:
+                return json.loads(value)
+            except json.JSONDecodeError:
+                return [v.strip() for v in value.split(',')]
+        return [v.strip() for v in value.split(',')]
+
+    # Handle object type (e.g., time_between_check, headers)
+    elif prop_type == 'object':
+        try:
+            return json.loads(value)
+        except json.JSONDecodeError:
+            raise ValueError(f"Invalid JSON object for field: {value}")
+
+    # Handle boolean type
+    elif prop_type == 'boolean':
+        return strtobool(value)
+
+    # Handle integer type
+    elif prop_type == 'integer':
+        return int(value)
+
+    # Handle number type (float)
+    elif prop_type == 'number':
+        return float(value)
+
+    # Default: return as string
+    return value
+
+
 class Import(Resource):
     def __init__(self, **kwargs):
         # datastore is a black box dependency
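For reference, a rough sketch of how this converter behaves for the main schema shapes. The schema fragments below are hypothetical stand-ins for illustration, not excerpts from `schema_create_watch`:

```python
# Illustrative only: hypothetical schema fragments, expected results shown as comments
convert_query_param_to_type('div.content, div.main', {'type': 'array'})   # ['div.content', 'div.main']
convert_query_param_to_type('["a", "b"]', {'type': 'array'})              # ['a', 'b'] (JSON array form)
convert_query_param_to_type('{"hours": 2}', {'type': 'object'})           # {'hours': 2}
convert_query_param_to_type('true', {'type': 'boolean'})                  # truthy value via strtobool
convert_query_param_to_type('42', {'type': 'integer'})                    # 42
convert_query_param_to_type('5', {'anyOf': [{'type': 'null'}, {'type': 'integer'}]})  # 5, first non-null type wins
convert_query_param_to_type('hello', {'type': 'string'})                  # 'hello', passed through unchanged
```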
@@ -28,25 +85,79 @@ class Import(Resource):
     @default_content_type('text/plain') #3547 #3542
     @validate_openapi_request('importWatches')
     def post(self):
-        """Import a list of watched URLs."""
+        """Import a list of watched URLs with optional watch configuration."""
+
+        # Special parameters that are NOT watch configuration
+        special_params = {'tag', 'tag_uuids', 'dedupe', 'proxy'}

         extras = {}
+
+        # Handle special 'proxy' parameter
         if request.args.get('proxy'):
             plist = self.datastore.proxy_list
             if not request.args.get('proxy') in plist:
-                return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
+                proxy_list_str = ', '.join(plist) if plist else 'none configured'
+                return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
             else:
                 extras['proxy'] = request.args.get('proxy')

+        # Handle special 'dedupe' parameter
         dedupe = strtobool(request.args.get('dedupe', 'true'))
+
+        # Handle special 'tag' and 'tag_uuids' parameters
         tags = request.args.get('tag')
         tag_uuids = request.args.get('tag_uuids')

         if tag_uuids:
             tag_uuids = tag_uuids.split(',')

+        # Extract ALL other query parameters as watch configuration
+        schema_properties = schema_create_watch.get('properties', {})
+        for param_name, param_value in request.args.items():
+            # Skip special parameters
+            if param_name in special_params:
+                continue
+
+            # Skip if not in schema (unknown parameter)
+            if param_name not in schema_properties:
+                return f"Unknown watch configuration parameter: {param_name}", 400
+
+            # Convert to appropriate type based on schema
+            try:
+                converted_value = convert_query_param_to_type(param_value, schema_properties[param_name])
+                extras[param_name] = converted_value
+            except (ValueError, json.JSONDecodeError) as e:
+                return f"Invalid value for parameter '{param_name}': {str(e)}", 400
+
+        # Validate processor if provided
+        if 'processor' in extras:
+            from changedetectionio.processors import available_processors
+            available = [p[0] for p in available_processors()]
+            if extras['processor'] not in available:
+                return f"Invalid processor '{extras['processor']}'. Available processors: {', '.join(available)}", 400
+
+        # Validate fetch_backend if provided
+        if 'fetch_backend' in extras:
+            from changedetectionio.content_fetchers import available_fetchers
+            available = [f[0] for f in available_fetchers()]
+            # Also allow 'system' and extra_browser_* patterns
+            is_valid = (
+                extras['fetch_backend'] == 'system' or
+                extras['fetch_backend'] in available or
+                extras['fetch_backend'].startswith('extra_browser_')
+            )
+            if not is_valid:
+                return f"Invalid fetch_backend '{extras['fetch_backend']}'. Available: system, {', '.join(available)}", 400
+
+        # Validate notification_urls if provided
+        if 'notification_urls' in extras:
+            from wtforms import ValidationError
+            from changedetectionio.api.Notifications import validate_notification_urls
+            try:
+                validate_notification_urls(extras['notification_urls'])
+            except ValidationError as e:
+                return f"Invalid notification_urls: {str(e)}", 400
+
         urls = request.get_data().decode('utf8').splitlines()
         added = []
         for url in urls:
@@ -54,13 +165,15 @@ class Import(Resource):
             if not len(url):
                 continue

-            # If hosts that only contain alphanumerics are allowed ("localhost" for example)
+            # Validate URL
             if not is_safe_valid_url(url):
                 return f"Invalid or unsupported URL - {url}", 400

+            # Check for duplicates if dedupe is enabled
             if dedupe and self.datastore.url_exists(url):
                 continue

+            # Create watch with extras configuration
             new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags, tag_uuids=tag_uuids)
             added.append(new_uuid)
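Taken together, these changes let the import endpoint accept any create-watch schema field as a query parameter. A minimal usage sketch, assuming the endpoint is exposed at `/api/v1/import` on a local instance and that `YOUR_API_KEY` stands in for the instance's API access token (both are placeholders, adjust for your deployment):

```python
import requests
from urllib.parse import urlencode

# Hypothetical instance URL and API key, for illustration only
base_url = 'http://localhost:5000/api/v1/import'
params = urlencode({
    'tag': 'news',                                        # special parameter, applied as a tag
    'paused': 'true',                                     # boolean field, converted via strtobool
    'include_filters': 'div.content',                     # array field, comma-separated or JSON list
    'time_between_check': '{"hours": 2, "minutes": 30}',  # object field, passed as a JSON string
})

res = requests.post(
    f'{base_url}?{params}',
    headers={'x-api-key': 'YOUR_API_KEY', 'Content-Type': 'text/plain'},
    data='https://example.com/page1\nhttps://example.com/page2',  # one URL per line in the body
)
print(res.status_code, res.json())  # 200 and a list of created watch UUIDs on success
```

Unknown parameters, or values that fail type conversion or validation, come back as a 400 with a descriptive message, as the tests further down verify.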
@@ -184,7 +184,8 @@ $(document).ready(function() {
         }
         // If it's a button in a form, submit the form
         else if ($element.is('button')) {
-            $element.closest('form').submit();
+            // Use requestSubmit() to include the button's name/value in the form data
+            $element.closest('form')[0].requestSubmit($element[0]);
         }
     }
 };
@@ -489,6 +489,7 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):

     api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')

+    # Test 1: Basic import with tag
     res = client.post(
         url_for("import") + "?tag=import-test",
         data='https://website1.com\r\nhttps://website2.com',
@@ -507,6 +508,97 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):
     res = client.get(url_for('tags.tags_overview_page'))
     assert b'import-test' in res.data

+    # Test 2: Import with watch configuration fields (issue #3845)
+    # Test string field (include_filters), boolean (paused), and processor
+    import urllib.parse
+    params = urllib.parse.urlencode({
+        'tag': 'config-test',
+        'include_filters': 'div.content',
+        'paused': 'true',
+        'processor': 'text_json_diff',
+        'title': 'Imported with Config'
+    })
+
+    res = client.post(
+        url_for("import") + "?" + params,
+        data='https://website3.com',
+        headers={'x-api-key': api_key},
+        follow_redirects=True
+    )
+
+    assert res.status_code == 200
+    assert len(res.json) == 1
+    uuid = res.json[0]
+
+    # Verify the configuration was applied
+    watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
+    assert watch['include_filters'] == ['div.content'], "include_filters should be set as array"
+    assert watch['paused'] == True, "paused should be True"
+    assert watch['processor'] == 'text_json_diff', "processor should be set"
+    assert watch['title'] == 'Imported with Config', "title should be set"
+
+    # Test 3: Import with array field (notification_urls) - using valid Apprise format
+    params = urllib.parse.urlencode({
+        'tag': 'notification-test',
+        'notification_urls': 'mailto://test@example.com,mailto://admin@example.com'
+    })
+
+    res = client.post(
+        url_for("import") + "?" + params,
+        data='https://website4.com',
+        headers={'x-api-key': api_key},
+        follow_redirects=True
+    )
+
+    assert res.status_code == 200
+    uuid = res.json[0]
+    watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
+    assert 'mailto://test@example.com' in watch['notification_urls'], "notification_urls should contain first email"
+    assert 'mailto://admin@example.com' in watch['notification_urls'], "notification_urls should contain second email"
+
+    # Test 4: Import with object field (time_between_check)
+    import json
+    time_config = json.dumps({"hours": 2, "minutes": 30})
+    params = urllib.parse.urlencode({
+        'tag': 'schedule-test',
+        'time_between_check': time_config
+    })
+
+    res = client.post(
+        url_for("import") + "?" + params,
+        data='https://website5.com',
+        headers={'x-api-key': api_key},
+        follow_redirects=True
+    )
+
+    assert res.status_code == 200
+    uuid = res.json[0]
+    watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
+    assert watch['time_between_check']['hours'] == 2, "time_between_check hours should be 2"
+    assert watch['time_between_check']['minutes'] == 30, "time_between_check minutes should be 30"
+
+    # Test 5: Import with invalid processor (should fail)
+    res = client.post(
+        url_for("import") + "?processor=invalid_processor",
+        data='https://website6.com',
+        headers={'x-api-key': api_key},
+        follow_redirects=True
+    )
+
+    assert res.status_code == 400, "Should reject invalid processor"
+    assert b"Invalid processor" in res.data, "Error message should mention invalid processor"
+
+    # Test 6: Import with invalid field (should fail)
+    res = client.post(
+        url_for("import") + "?unknown_field=value",
+        data='https://website7.com',
+        headers={'x-api-key': api_key},
+        follow_redirects=True
+    )
+
+    assert res.status_code == 400, "Should reject unknown field"
+    assert b"Unknown watch configuration parameter" in res.data, "Error message should mention unknown parameter"
+
+
 def test_api_conflict_UI_password(client, live_server, measure_memory_usage, datastore_path):
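The tests above exercise only a handful of fields; since the endpoint accepts any property defined in the create-watch schema, one rough way to enumerate the accepted query parameters is to inspect `schema_create_watch` directly. A sketch, assuming the schema object is importable from the `changedetectionio.api` package as the diff's relative import suggests:

```python
# Sketch: list the watch fields the import endpoint would accept as query parameters.
# Assumes schema_create_watch is importable from changedetectionio.api (per the import added above).
from changedetectionio.api import schema_create_watch

for name, prop in sorted(schema_create_watch.get('properties', {}).items()):
    # Properties may declare a plain 'type' or an 'anyOf' list of alternatives
    prop_type = prop.get('type') or [o.get('type') for o in prop.get('anyOf', [])]
    print(f"{name}: {prop_type}")
```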