mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-12-11 18:45:34 +00:00
* Re #265 - Use extended JSONpath support, Allow a JSONPath selector to not match anything (yet) Adding test Correctly capture invalid JSONPath query error
232 lines
8.7 KiB
Python
232 lines
8.7 KiB
Python
from wtforms import Form, SelectField, RadioField, BooleanField, StringField, PasswordField, validators, IntegerField, fields, TextAreaField, \
|
|
Field
|
|
from wtforms import widgets
|
|
from wtforms.validators import ValidationError
|
|
from wtforms.fields import html5
|
|
from changedetectionio import content_fetcher
|
|
import re
|
|
|
|
class StringListField(StringField):
    """
    Text-area backed field whose data is a list of strings, one per line
    """
    widget = widgets.TextArea()

    def _value(self):
        """Return the stored list joined with CRLF for display, or '' when empty."""
        if self.data:
            return "\r\n".join(self.data)
        return u''

    # incoming
    def process_formdata(self, valuelist):
        """
        Split the first submitted value on newlines into trimmed, non-empty entries.

        :param valuelist: raw submitted values; only the first item is read
        """
        if valuelist:
            # Trim each line BEFORE dropping blanks: with CRLF line endings a
            # blank line arrives as a lone carriage return, which is truthy and
            # would otherwise survive the filter as an empty entry.
            trimmed = (line.strip() for line in valuelist[0].split("\n"))
            self.data = [line for line in trimmed if line]
        else:
            self.data = []
|
|
|
|
|
|
|
|
class SaltyPasswordField(StringField):
    """
    Password input that keeps a salted PBKDF2 hash of the submitted value
    """
    widget = widgets.PasswordInput()
    # base64 text of (salt + derived key) for the last non-blank submission
    encrypted_password = ""

    def build_password(self, password):
        """Return base64(salt + PBKDF2-HMAC-SHA256(password)) as ASCII text."""
        import base64
        import hashlib
        import secrets

        # Make a new salt on every new password and store it with the password
        fresh_salt = secrets.token_bytes(32)
        derived_key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), fresh_salt, 100000)

        return base64.b64encode(fresh_salt + derived_key).decode('ascii')

    # incoming
    def process_formdata(self, valuelist):
        """
        Hash a non-blank submission; set data to False when nothing was submitted.

        A submitted but blank value leaves both `data` and `encrypted_password`
        untouched.
        """
        if not valuelist:
            self.data = False
            return

        submitted = valuelist[0]
        # Be really sure it's non-zero in length
        if submitted.strip():
            self.encrypted_password = self.build_password(submitted)
            self.data = ""
|
|
|
|
|
|
# Separated by key:value
|
|
class StringDictKeyValue(StringField):
    """
    Text-area backed field whose data is a dict, written as one `key: value` per line
    """
    widget = widgets.TextArea()

    def _value(self):
        """Return the dict rendered as CRLF-terminated `key: value` lines, or ''."""
        if not self.data:
            return u''

        return "".join("{}: {}\r\n".format(name, value) for name, value in self.data.items())

    # incoming
    def process_formdata(self, valuelist):
        """
        Parse the first submitted value into a dict.

        Each line is split on its first ':'; lines without a ':' are ignored.
        """
        self.data = {}
        if not valuelist:
            return

        for raw_line in valuelist[0].split("\n"):
            # Remove empty strings
            if not raw_line:
                continue

            name, separator, value = raw_line.strip().partition(':')
            if separator:
                self.data[name.strip()] = value.strip()
|
|
|
|
class ValidateContentFetcherIsReady(object):
    """
    Validates that the selected content fetcher answers its is_ready() call without raising
    """
    def __init__(self, message=None):
        self.message = message

    def __call__(self, form, field):
        """
        Instantiate the fetcher class named by `field.data` and call is_ready() on it.

        The return value of is_ready() is ignored; only exceptions are treated
        as failure.

        :raises ValidationError: when is_ready() raises. The message for a
            urllib3 MaxRetryError contains HTML markup (<br/>, <a>).
        """
        # Nothing selected, nothing to check
        if field.data is None:
            return

        from changedetectionio import content_fetcher
        import urllib3.exceptions

        # Better would be a radiohandler that keeps a reference to each class
        klass = getattr(content_fetcher, field.data)
        some_object = klass()
        try:
            some_object.is_ready()

        except urllib3.exceptions.MaxRetryError as e:
            driver_url = some_object.command_executor
            # Look the template up first and interpolate afterwards, same as the
            # generic branch below, so a translation of the template can match.
            message = field.gettext('Content fetcher \'%s\' did not respond.') % (field.data,)
            message += '<br/>' + field.gettext('Be sure that the selenium/webdriver runner is running and accessible via network from this container/host.')
            message += '<br/>' + field.gettext('Did you follow the instructions in the wiki?')
            message += '<br/><br/>' + field.gettext('WebDriver Host: %s') % (driver_url,)
            message += '<br/><a href="https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver">Go here for more information</a>'

            raise ValidationError(message) from e

        except Exception as e:
            message = field.gettext('Content fetcher \'%s\' did not respond properly, unable to use it.\n %s')
            raise ValidationError(message % (field.data, e)) from e
|
|
|
|
|
|
class ValidateAppRiseServers(object):
    """
    Validates that each URL given is compatible with AppRise
    """

    def __init__(self, message=None):
        self.message = message

    def __call__(self, form, field):
        """
        Try to add every entry of `field.data` to a fresh Apprise instance.

        :raises ValidationError: for the first URL that Apprise.add() rejects
        """
        # No URLs (empty list or no data at all): nothing to validate
        if not field.data:
            return

        import apprise
        apobj = apprise.Apprise()

        for server_url in field.data:
            if not apobj.add(server_url):
                # Look the template up first, interpolate afterwards, so a
                # translation of the template can match.
                message = field.gettext('\'%s\' is not a valid AppRise URL.')
                raise ValidationError(message % (server_url,))
|
|
|
|
class ValidateTokensList(object):
    """
    Checks every {token} placeholder in the field against notification.valid_tokens
    """
    def __init__(self, message=None):
        self.message = message

    def __call__(self, form, field):
        """Raise ValidationError for the first {…} placeholder that is not a known token."""
        from changedetectionio import notification

        placeholder_pattern = re.compile('{.*?}')
        for found in placeholder_pattern.finditer(field.data):
            placeholder = found.group(0)
            token_name = placeholder.strip('{}')
            if token_name not in notification.valid_tokens:
                template = field.gettext('Token \'%s\' is not a valid token.')
                raise ValidationError(template % (placeholder))
|
|
|
|
class ValidateListRegex(object):
    """
    Validates that anything that looks like a regex passes as a regex
    """
    def __init__(self, message=None):
        self.message = message

    def __call__(self, form, field):
        """
        Compile every `/.../`-wrapped entry of `field.data` as a regular expression.

        Entries that are not wrapped in slashes are not checked.

        :raises ValidationError: for the first wrapped entry that fails to compile
        """
        for line in field.data:
            # startswith/endswith instead of line[0]/line[-1]: indexing an
            # empty entry would raise IndexError.
            if not (line.startswith('/') and line.endswith('/')):
                continue

            # Because internally we dont wrap in /
            pattern = line.strip('/')
            try:
                re.compile(pattern)
            except re.error:
                message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
                raise ValidationError(message % (pattern,))
|
|
|
|
class ValidateCSSJSONInput(object):
    """
    Filter validation
    @todo CSS validator ;)
    """

    def __init__(self, message=None):
        self.message = message

    def __call__(self, form, field):
        """
        When the value contains 'json:', check the remainder parses as a JSONPath.

        Every occurrence of 'json:' is removed before parsing. Values without
        'json:' are not validated.

        :raises ValidationError: when jsonpath_ng cannot parse the expression
        """
        # Guard against a missing value as well as one with no 'json:' marker;
        # the `in` test below would raise TypeError on None.
        if not field.data or 'json:' not in field.data:
            return

        from jsonpath_ng.exceptions import JsonPathParserError, JsonPathLexerError
        from jsonpath_ng.ext import parse

        json_path = field.data.replace('json:', '')

        try:
            parse(json_path)
        except (JsonPathParserError, JsonPathLexerError) as e:
            message = field.gettext('\'%s\' is not a valid JSONPath expression. (%s)')
            raise ValidationError(message % (json_path, str(e))) from e

        # Re #265 - maybe in the future fetch the page and offer a
        # warning/notice that its possible the rule doesnt yet match anything?
|
|
|
|
class quickWatchForm(Form):
    """
    Form with a required URL and an optional group tag
    """
    # https://wtforms.readthedocs.io/en/2.3.x/fields/#module-wtforms.fields.html5
    # `require_tld` = False is needed even for the test harness "http://localhost:5005.." to run
    url = html5.URLField(
        'URL',
        validators=[validators.URL(require_tld=False)],
    )
    # Tag may be left empty; when given it is limited to 35 characters
    tag = StringField(
        'Group tag',
        validators=[validators.Optional(), validators.Length(max=35)],
    )
|
|
|
|
class commonSettingsForm(Form):
    """
    Notification and fetch settings shared by the forms that subclass this one
    """

    # One notification URL per line, each checked by ValidateAppRiseServers
    notification_urls = StringListField(
        'Notification URL List',
        validators=[validators.Optional(), ValidateAppRiseServers()],
    )
    # Title and body may contain {token} placeholders, checked by ValidateTokensList
    notification_title = StringField(
        'Notification Title',
        default='ChangeDetection.io Notification - {watch_url}',
        validators=[validators.Optional(), ValidateTokensList()],
    )
    notification_body = TextAreaField(
        'Notification Body',
        default='{watch_url} had a change.',
        validators=[validators.Optional(), ValidateTokensList()],
    )
    trigger_check = BooleanField('Send test notification on save')
    # Choices come from content_fetcher.available_fetchers() at class-definition time
    fetch_backend = RadioField(
        u'Fetch Method',
        choices=content_fetcher.available_fetchers(),
        validators=[ValidateContentFetcherIsReady()],
    )
    extract_title_as_title = BooleanField(
        'Extract <title> from document and use as watch title',
        default=False,
    )
|
|
|
|
class watchForm(commonSettingsForm):
    """
    Per-watch form: the common settings plus URL, tag, recheck interval, filter and text rules
    """

    url = html5.URLField(
        'URL',
        validators=[validators.URL(require_tld=False)],
    )
    tag = StringField(
        'Group tag',
        validators=[validators.Optional(), validators.Length(max=35)],
    )

    # Optional here; when given it must be at least 1
    minutes_between_check = html5.IntegerField(
        'Maximum time in minutes until recheck',
        validators=[validators.Optional(), validators.NumberRange(min=1)],
    )
    css_filter = StringField(
        'CSS/JSON Filter',
        validators=[ValidateCSSJSONInput()],
    )
    title = StringField('Title')

    # One entry per line; /wrapped/ entries are checked by ValidateListRegex
    ignore_text = StringListField(
        'Ignore Text',
        validators=[ValidateListRegex()],
    )
    headers = StringDictKeyValue('Request Headers')
    trigger_text = StringListField(
        'Trigger/wait for text',
        validators=[validators.Optional(), ValidateListRegex()],
    )
|
|
|
|
|
|
class globalSettingsForm(commonSettingsForm):
    """
    Global form: the common settings plus password, recheck interval and base URL
    """

    password = SaltyPasswordField()
    # No Optional() validator here, unlike the per-watch form
    minutes_between_check = html5.IntegerField(
        'Maximum time in minutes until recheck',
        validators=[validators.NumberRange(min=1)],
    )
    # Redeclared without the explicit default used in commonSettingsForm
    extract_title_as_title = BooleanField(
        'Extract <title> from document and use as watch title',
    )
    base_url = StringField(
        'Base URL',
        validators=[validators.Optional()],
    )
|