mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-12-13 03:25:45 +00:00
Merge branch 'master' into jinja2-notifications
This commit is contained in:
@@ -1,39 +1,75 @@
|
||||
from wtforms import Form, SelectField, RadioField, BooleanField, StringField, PasswordField, validators, IntegerField, fields, TextAreaField, \
|
||||
Field
|
||||
from wtforms import widgets
|
||||
from wtforms.validators import ValidationError
|
||||
from wtforms.fields import html5
|
||||
from changedetectionio import content_fetcher
|
||||
import os
|
||||
import re
|
||||
|
||||
from wtforms import (
|
||||
BooleanField,
|
||||
Form,
|
||||
IntegerField,
|
||||
RadioField,
|
||||
SelectField,
|
||||
StringField,
|
||||
SubmitField,
|
||||
TextAreaField,
|
||||
fields,
|
||||
validators,
|
||||
widgets
|
||||
)
|
||||
from wtforms.fields import FieldList
|
||||
from wtforms.validators import ValidationError
|
||||
|
||||
# default
|
||||
# each select <option data-enabled="enabled-0-0"
|
||||
from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config
|
||||
|
||||
from changedetectionio import content_fetcher
|
||||
from changedetectionio.notification import (
|
||||
valid_notification_formats,
|
||||
)
|
||||
|
||||
from wtforms.fields import FormField
|
||||
|
||||
valid_method = {
|
||||
'GET',
|
||||
'POST',
|
||||
'PUT',
|
||||
'PATCH',
|
||||
'DELETE',
|
||||
}
|
||||
|
||||
default_method = 'GET'
|
||||
|
||||
|
||||
class StringListField(StringField):
|
||||
widget = widgets.TextArea()
|
||||
|
||||
def _value(self):
|
||||
if self.data:
|
||||
return "\r\n".join(self.data)
|
||||
# ignore empty lines in the storage
|
||||
data = list(filter(lambda x: len(x.strip()), self.data))
|
||||
# Apply strip to each line
|
||||
data = list(map(lambda x: x.strip(), data))
|
||||
return "\r\n".join(data)
|
||||
else:
|
||||
return u''
|
||||
|
||||
# incoming
|
||||
def process_formdata(self, valuelist):
|
||||
if valuelist:
|
||||
# Remove empty strings
|
||||
cleaned = list(filter(None, valuelist[0].split("\n")))
|
||||
self.data = [x.strip() for x in cleaned]
|
||||
p = 1
|
||||
if valuelist and len(valuelist[0].strip()):
|
||||
# Remove empty strings, stripping and splitting \r\n, only \n etc.
|
||||
self.data = valuelist[0].splitlines()
|
||||
# Remove empty lines from the final data
|
||||
self.data = list(filter(lambda x: len(x.strip()), self.data))
|
||||
else:
|
||||
self.data = []
|
||||
|
||||
|
||||
|
||||
class SaltyPasswordField(StringField):
|
||||
widget = widgets.PasswordInput()
|
||||
encrypted_password = ""
|
||||
|
||||
def build_password(self, password):
|
||||
import hashlib
|
||||
import base64
|
||||
import hashlib
|
||||
import secrets
|
||||
|
||||
# Make a new salt on every new password and store it with the password
|
||||
@@ -54,6 +90,13 @@ class SaltyPasswordField(StringField):
|
||||
else:
|
||||
self.data = False
|
||||
|
||||
class TimeBetweenCheckForm(Form):
|
||||
weeks = IntegerField('Weeks', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
||||
days = IntegerField('Days', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
||||
hours = IntegerField('Hours', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
||||
minutes = IntegerField('Minutes', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
||||
seconds = IntegerField('Seconds', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
||||
# @todo add total seconds minimum validatior = minimum_seconds_recheck_time
|
||||
|
||||
# Separated by key:value
|
||||
class StringDictKeyValue(StringField):
|
||||
@@ -91,8 +134,8 @@ class ValidateContentFetcherIsReady(object):
|
||||
self.message = message
|
||||
|
||||
def __call__(self, form, field):
|
||||
from changedetectionio import content_fetcher
|
||||
import urllib3.exceptions
|
||||
from changedetectionio import content_fetcher
|
||||
|
||||
# Better would be a radiohandler that keeps a reference to each class
|
||||
if field.data is not None:
|
||||
@@ -104,10 +147,12 @@ class ValidateContentFetcherIsReady(object):
|
||||
except urllib3.exceptions.MaxRetryError as e:
|
||||
driver_url = some_object.command_executor
|
||||
message = field.gettext('Content fetcher \'%s\' did not respond.' % (field.data))
|
||||
message += '<br/>'+field.gettext('Be sure that the selenium/webdriver runner is running and accessible via network from this container/host.')
|
||||
message += '<br/>' + field.gettext(
|
||||
'Be sure that the selenium/webdriver runner is running and accessible via network from this container/host.')
|
||||
message += '<br/>' + field.gettext('Did you follow the instructions in the wiki?')
|
||||
message += '<br/><br/>' + field.gettext('WebDriver Host: %s' % (driver_url))
|
||||
message += '<br/><a href="https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver">Go here for more information</a>'
|
||||
message += '<br/>'+field.gettext('Content fetcher did not respond properly, unable to use it.\n %s' % (str(e)))
|
||||
|
||||
raise ValidationError(message)
|
||||
|
||||
@@ -116,6 +161,21 @@ class ValidateContentFetcherIsReady(object):
|
||||
raise ValidationError(message % (field.data, e))
|
||||
|
||||
|
||||
class ValidateNotificationBodyAndTitleWhenURLisSet(object):
|
||||
"""
|
||||
Validates that they entered something in both notification title+body when the URL is set
|
||||
Due to https://github.com/dgtlmoon/changedetection.io/issues/360
|
||||
"""
|
||||
|
||||
def __init__(self, message=None):
|
||||
self.message = message
|
||||
|
||||
def __call__(self, form, field):
|
||||
if len(field.data):
|
||||
if not len(form.notification_title.data) or not len(form.notification_body.data):
|
||||
message = field.gettext('Notification Body and Title is required when a Notification URL is used')
|
||||
raise ValidationError(message)
|
||||
|
||||
class ValidateAppRiseServers(object):
|
||||
"""
|
||||
Validates that each URL given is compatible with AppRise
|
||||
@@ -142,6 +202,7 @@ class ValidateJinja2Template(object):
|
||||
|
||||
def __call__(self, form, field):
|
||||
from changedetectionio import notification
|
||||
|
||||
from jinja2 import Environment, BaseLoader, TemplateSyntaxError
|
||||
from jinja2.meta import find_undeclared_variables
|
||||
|
||||
@@ -159,6 +220,30 @@ class ValidateJinja2Template(object):
|
||||
f"The following tokens used in the notification are not valid: {undefined}"
|
||||
)
|
||||
|
||||
regex = re.compile('{.*?}')
|
||||
for p in re.findall(regex, field.data):
|
||||
if not p.strip('{}') in notification.valid_tokens:
|
||||
message = field.gettext('Token \'%s\' is not a valid token.')
|
||||
raise ValidationError(message % (p))
|
||||
|
||||
class validateURL(object):
|
||||
|
||||
"""
|
||||
Flask wtform validators wont work with basic auth
|
||||
"""
|
||||
|
||||
def __init__(self, message=None):
|
||||
self.message = message
|
||||
|
||||
def __call__(self, form, field):
|
||||
import validators
|
||||
try:
|
||||
validators.url(field.data.strip())
|
||||
except validators.ValidationFailure:
|
||||
message = field.gettext('\'%s\' is not a valid URL.' % (field.data.strip()))
|
||||
raise ValidationError(message)
|
||||
|
||||
|
||||
class ValidateListRegex(object):
|
||||
"""
|
||||
Validates that anything that looks like a regex passes as a regex
|
||||
@@ -178,65 +263,207 @@ class ValidateListRegex(object):
|
||||
message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
|
||||
raise ValidationError(message % (line))
|
||||
|
||||
class ValidateCSSJSONInput(object):
|
||||
class ValidateCSSJSONXPATHInput(object):
|
||||
"""
|
||||
Filter validation
|
||||
@todo CSS validator ;)
|
||||
"""
|
||||
|
||||
def __init__(self, message=None):
|
||||
def __init__(self, message=None, allow_xpath=True, allow_json=True):
|
||||
self.message = message
|
||||
self.allow_xpath = allow_xpath
|
||||
self.allow_json = allow_json
|
||||
|
||||
def __call__(self, form, field):
|
||||
if 'json:' in field.data:
|
||||
from jsonpath_ng.exceptions import JsonPathParserError, JsonPathLexerError
|
||||
from jsonpath_ng.ext import parse
|
||||
|
||||
input = field.data.replace('json:', '')
|
||||
if isinstance(field.data, str):
|
||||
data = [field.data]
|
||||
else:
|
||||
data = field.data
|
||||
|
||||
try:
|
||||
parse(input)
|
||||
except (JsonPathParserError, JsonPathLexerError) as e:
|
||||
message = field.gettext('\'%s\' is not a valid JSONPath expression. (%s)')
|
||||
raise ValidationError(message % (input, str(e)))
|
||||
for line in data:
|
||||
# Nothing to see here
|
||||
if not len(line.strip()):
|
||||
return
|
||||
|
||||
# Re #265 - maybe in the future fetch the page and offer a
|
||||
# warning/notice that its possible the rule doesnt yet match anything?
|
||||
# Does it look like XPath?
|
||||
if line.strip()[0] == '/':
|
||||
if not self.allow_xpath:
|
||||
raise ValidationError("XPath not permitted in this field!")
|
||||
from lxml import etree, html
|
||||
tree = html.fromstring("<html></html>")
|
||||
|
||||
try:
|
||||
tree.xpath(line.strip())
|
||||
except etree.XPathEvalError as e:
|
||||
message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
|
||||
raise ValidationError(message % (line, str(e)))
|
||||
except:
|
||||
raise ValidationError("A system-error occurred when validating your XPath expression")
|
||||
|
||||
if 'json:' in line:
|
||||
if not self.allow_json:
|
||||
raise ValidationError("JSONPath not permitted in this field!")
|
||||
|
||||
from jsonpath_ng.exceptions import (
|
||||
JsonPathLexerError,
|
||||
JsonPathParserError,
|
||||
)
|
||||
from jsonpath_ng.ext import parse
|
||||
|
||||
input = line.replace('json:', '')
|
||||
|
||||
try:
|
||||
parse(input)
|
||||
except (JsonPathParserError, JsonPathLexerError) as e:
|
||||
message = field.gettext('\'%s\' is not a valid JSONPath expression. (%s)')
|
||||
raise ValidationError(message % (input, str(e)))
|
||||
except:
|
||||
raise ValidationError("A system-error occurred when validating your JSONPath expression")
|
||||
|
||||
# Re #265 - maybe in the future fetch the page and offer a
|
||||
# warning/notice that its possible the rule doesnt yet match anything?
|
||||
if not self.allow_json:
|
||||
raise ValidationError("jq not permitted in this field!")
|
||||
|
||||
if 'jq:' in line:
|
||||
try:
|
||||
import jq
|
||||
except ModuleNotFoundError:
|
||||
# `jq` requires full compilation in windows and so isn't generally available
|
||||
raise ValidationError("jq not support not found")
|
||||
|
||||
input = line.replace('jq:', '')
|
||||
|
||||
try:
|
||||
jq.compile(input)
|
||||
except (ValueError) as e:
|
||||
message = field.gettext('\'%s\' is not a valid jq expression. (%s)')
|
||||
raise ValidationError(message % (input, str(e)))
|
||||
except:
|
||||
raise ValidationError("A system-error occurred when validating your jq expression")
|
||||
|
||||
class quickWatchForm(Form):
|
||||
# https://wtforms.readthedocs.io/en/2.3.x/fields/#module-wtforms.fields.html5
|
||||
# `require_tld` = False is needed even for the test harness "http://localhost:5005.." to run
|
||||
url = html5.URLField('URL', [validators.URL(require_tld=False)])
|
||||
tag = StringField('Group tag', [validators.Optional(), validators.Length(max=35)])
|
||||
url = fields.URLField('URL', validators=[validateURL()])
|
||||
tag = StringField('Group tag', [validators.Optional()])
|
||||
watch_submit_button = SubmitField('Watch', render_kw={"class": "pure-button pure-button-primary"})
|
||||
edit_and_watch_submit_button = SubmitField('Edit > Watch', render_kw={"class": "pure-button pure-button-primary"})
|
||||
|
||||
|
||||
# Common to a single watch and the global settings
|
||||
class commonSettingsForm(Form):
|
||||
|
||||
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers()])
|
||||
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
|
||||
notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()])
|
||||
trigger_check = BooleanField('Send test notification on save')
|
||||
notification_format = SelectField('Notification format', choices=valid_notification_formats.keys())
|
||||
fetch_backend = RadioField(u'Fetch Method', choices=content_fetcher.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
|
||||
extract_title_as_title = BooleanField('Extract <title> from document and use as watch title', default=False)
|
||||
webdriver_delay = IntegerField('Wait seconds before extracting text', validators=[validators.Optional(), validators.NumberRange(min=1,
|
||||
message="Should contain one or more seconds")])
|
||||
|
||||
class SingleBrowserStep(Form):
|
||||
|
||||
operation = SelectField('Operation', [validators.Optional()], choices=browser_step_ui_config.keys())
|
||||
|
||||
# maybe better to set some <script>var..
|
||||
selector = StringField('Selector', [validators.Optional()], render_kw={"placeholder": "CSS or xPath selector"})
|
||||
optional_value = StringField('value', [validators.Optional()], render_kw={"placeholder": "Value"})
|
||||
# @todo move to JS? ajax fetch new field?
|
||||
# remove_button = SubmitField('-', render_kw={"type": "button", "class": "pure-button pure-button-primary", 'title': 'Remove'})
|
||||
# add_button = SubmitField('+', render_kw={"type": "button", "class": "pure-button pure-button-primary", 'title': 'Add new step after'})
|
||||
|
||||
class watchForm(commonSettingsForm):
|
||||
|
||||
url = html5.URLField('URL', [validators.URL(require_tld=False)])
|
||||
tag = StringField('Group tag', [validators.Optional(), validators.Length(max=35)])
|
||||
url = fields.URLField('URL', validators=[validateURL()])
|
||||
tag = StringField('Group tag', [validators.Optional()], default='')
|
||||
|
||||
minutes_between_check = html5.IntegerField('Maximum time in minutes until recheck',
|
||||
[validators.Optional(), validators.NumberRange(min=1)])
|
||||
css_filter = StringField('CSS/JSON Filter', [ValidateCSSJSONInput()])
|
||||
title = StringField('Title')
|
||||
time_between_check = FormField(TimeBetweenCheckForm)
|
||||
|
||||
ignore_text = StringListField('Ignore Text', [ValidateListRegex()])
|
||||
headers = StringDictKeyValue('Request Headers')
|
||||
include_filters = StringListField('CSS/JSONPath/JQ/XPath Filters', [ValidateCSSJSONXPATHInput()], default='')
|
||||
|
||||
subtractive_selectors = StringListField('Remove elements', [ValidateCSSJSONXPATHInput(allow_xpath=False, allow_json=False)])
|
||||
|
||||
extract_text = StringListField('Extract text', [ValidateListRegex()])
|
||||
|
||||
title = StringField('Title', default='')
|
||||
|
||||
ignore_text = StringListField('Ignore text', [ValidateListRegex()])
|
||||
headers = StringDictKeyValue('Request headers')
|
||||
body = TextAreaField('Request body', [validators.Optional()])
|
||||
method = SelectField('Request method', choices=valid_method, default=default_method)
|
||||
ignore_status_codes = BooleanField('Ignore status codes (process non-2xx status codes as normal)', default=False)
|
||||
check_unique_lines = BooleanField('Only trigger when new lines appear', default=False)
|
||||
trigger_text = StringListField('Trigger/wait for text', [validators.Optional(), ValidateListRegex()])
|
||||
if os.getenv("PLAYWRIGHT_DRIVER_URL"):
|
||||
browser_steps = FieldList(FormField(SingleBrowserStep), min_entries=10)
|
||||
text_should_not_be_present = StringListField('Block change-detection if text matches', [validators.Optional(), ValidateListRegex()])
|
||||
webdriver_js_execute_code = TextAreaField('Execute JavaScript before change detection', render_kw={"rows": "5"}, validators=[validators.Optional()])
|
||||
|
||||
save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
|
||||
|
||||
proxy = RadioField('Proxy')
|
||||
filter_failure_notification_send = BooleanField(
|
||||
'Send a notification when the filter can no longer be found on the page', default=False)
|
||||
|
||||
notification_muted = BooleanField('Notifications Muted / Off', default=False)
|
||||
notification_screenshot = BooleanField('Attach screenshot to notification (where possible)', default=False)
|
||||
|
||||
def validate(self, **kwargs):
|
||||
if not super().validate():
|
||||
return False
|
||||
|
||||
result = True
|
||||
|
||||
# Fail form validation when a body is set for a GET
|
||||
if self.method.data == 'GET' and self.body.data:
|
||||
self.body.errors.append('Body must be empty when Request Method is set to GET')
|
||||
result = False
|
||||
|
||||
# Attempt to validate jinja2 templates in the URL
|
||||
from jinja2 import Environment
|
||||
# Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/
|
||||
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
|
||||
try:
|
||||
ready_url = str(jinja2_env.from_string(self.url.data).render())
|
||||
except Exception as e:
|
||||
self.url.errors.append('Invalid template syntax')
|
||||
result = False
|
||||
return result
|
||||
|
||||
|
||||
class globalSettingsForm(commonSettingsForm):
|
||||
# datastore.data['settings']['requests']..
|
||||
class globalSettingsRequestForm(Form):
|
||||
time_between_check = FormField(TimeBetweenCheckForm)
|
||||
proxy = RadioField('Proxy')
|
||||
jitter_seconds = IntegerField('Random jitter seconds ± check',
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
||||
|
||||
# datastore.data['settings']['application']..
|
||||
class globalSettingsApplicationForm(commonSettingsForm):
|
||||
|
||||
password = SaltyPasswordField()
|
||||
minutes_between_check = html5.IntegerField('Maximum time in minutes until recheck',
|
||||
[validators.NumberRange(min=1)])
|
||||
extract_title_as_title = BooleanField('Extract <title> from document and use as watch title')
|
||||
base_url = StringField('Base URL', validators=[validators.Optional()])
|
||||
global_subtractive_selectors = StringListField('Remove elements', [ValidateCSSJSONXPATHInput(allow_xpath=False, allow_json=False)])
|
||||
global_ignore_text = StringListField('Ignore Text', [ValidateListRegex()])
|
||||
ignore_whitespace = BooleanField('Ignore whitespace')
|
||||
removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
|
||||
empty_pages_are_a_change = BooleanField('Treat empty pages as a change?', default=False)
|
||||
render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
|
||||
fetch_backend = RadioField('Fetch Method', default="html_requests", choices=content_fetcher.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
|
||||
api_access_token_enabled = BooleanField('API access token security check enabled', default=True, validators=[validators.Optional()])
|
||||
password = SaltyPasswordField()
|
||||
|
||||
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=0,
|
||||
message="Should contain zero or more attempts")])
|
||||
|
||||
|
||||
class globalSettingsForm(Form):
|
||||
# Define these as FormFields/"sub forms", this way it matches the JSON storage
|
||||
# datastore.data['settings']['application']..
|
||||
# datastore.data['settings']['requests']..
|
||||
|
||||
requests = FormField(globalSettingsRequestForm)
|
||||
application = FormField(globalSettingsApplicationForm)
|
||||
save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
|
||||
|
||||
Reference in New Issue
Block a user