mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-11-04 00:27:48 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			625 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			625 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import re
 | 
						|
from loguru import logger
 | 
						|
 | 
						|
from changedetectionio.strtobool import strtobool
 | 
						|
 | 
						|
from wtforms import (
 | 
						|
    BooleanField,
 | 
						|
    Form,
 | 
						|
    IntegerField,
 | 
						|
    RadioField,
 | 
						|
    SelectField,
 | 
						|
    StringField,
 | 
						|
    SubmitField,
 | 
						|
    TextAreaField,
 | 
						|
    fields,
 | 
						|
    validators,
 | 
						|
    widgets
 | 
						|
)
 | 
						|
from flask_wtf.file import FileField, FileAllowed
 | 
						|
from wtforms.fields import FieldList
 | 
						|
 | 
						|
from wtforms.validators import ValidationError
 | 
						|
 | 
						|
from validators.url import url as url_validator
 | 
						|
 | 
						|
 | 
						|
# default
 | 
						|
# each select <option data-enabled="enabled-0-0"
 | 
						|
from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config
 | 
						|
 | 
						|
from changedetectionio import html_tools, content_fetchers
 | 
						|
 | 
						|
from changedetectionio.notification import (
 | 
						|
    valid_notification_formats,
 | 
						|
)
 | 
						|
 | 
						|
from wtforms.fields import FormField
 | 
						|
 | 
						|
dictfilt = lambda x, y: dict([ (i,x[i]) for i in x if i in set(y) ])
 | 
						|
 | 
						|
valid_method = {
 | 
						|
    'GET',
 | 
						|
    'POST',
 | 
						|
    'PUT',
 | 
						|
    'PATCH',
 | 
						|
    'DELETE',
 | 
						|
    'OPTIONS',
 | 
						|
}
 | 
						|
 | 
						|
default_method = 'GET'
 | 
						|
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
 | 
						|
 | 
						|
class StringListField(StringField):
 | 
						|
    widget = widgets.TextArea()
 | 
						|
 | 
						|
    def _value(self):
 | 
						|
        if self.data:
 | 
						|
            # ignore empty lines in the storage
 | 
						|
            data = list(filter(lambda x: len(x.strip()), self.data))
 | 
						|
            # Apply strip to each line
 | 
						|
            data = list(map(lambda x: x.strip(), data))
 | 
						|
            return "\r\n".join(data)
 | 
						|
        else:
 | 
						|
            return u''
 | 
						|
 | 
						|
    # incoming
 | 
						|
    def process_formdata(self, valuelist):
 | 
						|
        if valuelist and len(valuelist[0].strip()):
 | 
						|
            # Remove empty strings, stripping and splitting \r\n, only \n etc.
 | 
						|
            self.data = valuelist[0].splitlines()
 | 
						|
            # Remove empty lines from the final data
 | 
						|
            self.data = list(filter(lambda x: len(x.strip()), self.data))
 | 
						|
        else:
 | 
						|
            self.data = []
 | 
						|
 | 
						|
 | 
						|
class SaltyPasswordField(StringField):
 | 
						|
    widget = widgets.PasswordInput()
 | 
						|
    encrypted_password = ""
 | 
						|
 | 
						|
    def build_password(self, password):
 | 
						|
        import base64
 | 
						|
        import hashlib
 | 
						|
        import secrets
 | 
						|
 | 
						|
        # Make a new salt on every new password and store it with the password
 | 
						|
        salt = secrets.token_bytes(32)
 | 
						|
 | 
						|
        key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
 | 
						|
        store = base64.b64encode(salt + key).decode('ascii')
 | 
						|
 | 
						|
        return store
 | 
						|
 | 
						|
    # incoming
 | 
						|
    def process_formdata(self, valuelist):
 | 
						|
        if valuelist:
 | 
						|
            # Be really sure it's non-zero in length
 | 
						|
            if len(valuelist[0].strip()) > 0:
 | 
						|
                self.encrypted_password = self.build_password(valuelist[0])
 | 
						|
                self.data = ""
 | 
						|
        else:
 | 
						|
            self.data = False
 | 
						|
 | 
						|
class StringTagUUID(StringField):
 | 
						|
 | 
						|
   # process_formdata(self, valuelist) handled manually in POST handler
 | 
						|
 | 
						|
    # Is what is shown when field <input> is rendered
 | 
						|
    def _value(self):
 | 
						|
        # Tag UUID to name, on submit it will convert it back (in the submit handler of init.py)
 | 
						|
        if self.data and type(self.data) is list:
 | 
						|
            tag_titles = []
 | 
						|
            for i in self.data:
 | 
						|
                tag = self.datastore.data['settings']['application']['tags'].get(i)
 | 
						|
                if tag:
 | 
						|
                    tag_title = tag.get('title')
 | 
						|
                    if tag_title:
 | 
						|
                        tag_titles.append(tag_title)
 | 
						|
 | 
						|
            return ', '.join(tag_titles)
 | 
						|
 | 
						|
        if not self.data:
 | 
						|
            return ''
 | 
						|
 | 
						|
        return 'error'
 | 
						|
 | 
						|
class TimeBetweenCheckForm(Form):
 | 
						|
    weeks = IntegerField('Weeks', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
 | 
						|
    days = IntegerField('Days', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
 | 
						|
    hours = IntegerField('Hours', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
 | 
						|
    minutes = IntegerField('Minutes', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
 | 
						|
    seconds = IntegerField('Seconds', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
 | 
						|
    # @todo add total seconds minimum validatior = minimum_seconds_recheck_time
 | 
						|
 | 
						|
# Separated by  key:value
 | 
						|
class StringDictKeyValue(StringField):
 | 
						|
    widget = widgets.TextArea()
 | 
						|
 | 
						|
    def _value(self):
 | 
						|
        if self.data:
 | 
						|
            output = u''
 | 
						|
            for k in self.data.keys():
 | 
						|
                output += "{}: {}\r\n".format(k, self.data[k])
 | 
						|
 | 
						|
            return output
 | 
						|
        else:
 | 
						|
            return u''
 | 
						|
 | 
						|
    # incoming
 | 
						|
    def process_formdata(self, valuelist):
 | 
						|
        if valuelist:
 | 
						|
            self.data = {}
 | 
						|
            # Remove empty strings
 | 
						|
            cleaned = list(filter(None, valuelist[0].split("\n")))
 | 
						|
            for s in cleaned:
 | 
						|
                parts = s.strip().split(':', 1)
 | 
						|
                if len(parts) == 2:
 | 
						|
                    self.data.update({parts[0].strip(): parts[1].strip()})
 | 
						|
 | 
						|
        else:
 | 
						|
            self.data = {}
 | 
						|
 | 
						|
class ValidateContentFetcherIsReady(object):
 | 
						|
    """
 | 
						|
    Validates that anything that looks like a regex passes as a regex
 | 
						|
    """
 | 
						|
    def __init__(self, message=None):
 | 
						|
        self.message = message
 | 
						|
 | 
						|
    def __call__(self, form, field):
 | 
						|
        return
 | 
						|
 | 
						|
# AttributeError: module 'changedetectionio.content_fetcher' has no attribute 'extra_browser_unlocked<>ASDF213r123r'
 | 
						|
        # Better would be a radiohandler that keeps a reference to each class
 | 
						|
        # if field.data is not None and field.data != 'system':
 | 
						|
        #     klass = getattr(content_fetcher, field.data)
 | 
						|
        #     some_object = klass()
 | 
						|
        #     try:
 | 
						|
        #         ready = some_object.is_ready()
 | 
						|
        #
 | 
						|
        #     except urllib3.exceptions.MaxRetryError as e:
 | 
						|
        #         driver_url = some_object.command_executor
 | 
						|
        #         message = field.gettext('Content fetcher \'%s\' did not respond.' % (field.data))
 | 
						|
        #         message += '<br>' + field.gettext(
 | 
						|
        #             'Be sure that the selenium/webdriver runner is running and accessible via network from this container/host.')
 | 
						|
        #         message += '<br>' + field.gettext('Did you follow the instructions in the wiki?')
 | 
						|
        #         message += '<br><br>' + field.gettext('WebDriver Host: %s' % (driver_url))
 | 
						|
        #         message += '<br><a href="https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver">Go here for more information</a>'
 | 
						|
        #         message += '<br>'+field.gettext('Content fetcher did not respond properly, unable to use it.\n %s' % (str(e)))
 | 
						|
        #
 | 
						|
        #         raise ValidationError(message)
 | 
						|
        #
 | 
						|
        #     except Exception as e:
 | 
						|
        #         message = field.gettext('Content fetcher \'%s\' did not respond properly, unable to use it.\n %s')
 | 
						|
        #         raise ValidationError(message % (field.data, e))
 | 
						|
 | 
						|
 | 
						|
class ValidateNotificationBodyAndTitleWhenURLisSet(object):
 | 
						|
    """
 | 
						|
       Validates that they entered something in both notification title+body when the URL is set
 | 
						|
       Due to https://github.com/dgtlmoon/changedetection.io/issues/360
 | 
						|
       """
 | 
						|
 | 
						|
    def __init__(self, message=None):
 | 
						|
        self.message = message
 | 
						|
 | 
						|
    def __call__(self, form, field):
 | 
						|
        if len(field.data):
 | 
						|
            if not len(form.notification_title.data) or not len(form.notification_body.data):
 | 
						|
                message = field.gettext('Notification Body and Title is required when a Notification URL is used')
 | 
						|
                raise ValidationError(message)
 | 
						|
 | 
						|
class ValidateAppRiseServers(object):
 | 
						|
    """
 | 
						|
       Validates that each URL given is compatible with AppRise
 | 
						|
       """
 | 
						|
 | 
						|
    def __init__(self, message=None):
 | 
						|
        self.message = message
 | 
						|
 | 
						|
    def __call__(self, form, field):
 | 
						|
        import apprise
 | 
						|
        apobj = apprise.Apprise()
 | 
						|
        # so that the custom endpoints are registered
 | 
						|
        from changedetectionio.apprise_plugin import apprise_custom_api_call_wrapper
 | 
						|
        for server_url in field.data:
 | 
						|
            if not apobj.add(server_url):
 | 
						|
                message = field.gettext('\'%s\' is not a valid AppRise URL.' % (server_url))
 | 
						|
                raise ValidationError(message)
 | 
						|
 | 
						|
class ValidateJinja2Template(object):
 | 
						|
    """
 | 
						|
    Validates that a {token} is from a valid set
 | 
						|
    """
 | 
						|
    def __call__(self, form, field):
 | 
						|
        from changedetectionio import notification
 | 
						|
 | 
						|
        from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError
 | 
						|
        from jinja2.sandbox import ImmutableSandboxedEnvironment
 | 
						|
        from jinja2.meta import find_undeclared_variables
 | 
						|
        import jinja2.exceptions
 | 
						|
 | 
						|
        # Might be a list of text, or might be just text (like from the apprise url list)
 | 
						|
        joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}"
 | 
						|
 | 
						|
        try:
 | 
						|
            jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader)
 | 
						|
            jinja2_env.globals.update(notification.valid_tokens)
 | 
						|
            # Extra validation tokens provided on the form_class(... extra_tokens={}) setup
 | 
						|
            if hasattr(field, 'extra_notification_tokens'):
 | 
						|
                jinja2_env.globals.update(field.extra_notification_tokens)
 | 
						|
 | 
						|
            jinja2_env.from_string(joined_data).render()
 | 
						|
        except TemplateSyntaxError as e:
 | 
						|
            raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
 | 
						|
        except UndefinedError as e:
 | 
						|
            raise ValidationError(f"A variable or function is not defined: {e}") from e
 | 
						|
        except jinja2.exceptions.SecurityError as e:
 | 
						|
            raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
 | 
						|
 | 
						|
        ast = jinja2_env.parse(joined_data)
 | 
						|
        undefined = ", ".join(find_undeclared_variables(ast))
 | 
						|
        if undefined:
 | 
						|
            raise ValidationError(
 | 
						|
                f"The following tokens used in the notification are not valid: {undefined}"
 | 
						|
            )
 | 
						|
 | 
						|
class validateURL(object):
 | 
						|
 | 
						|
    """
 | 
						|
       Flask wtform validators wont work with basic auth
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, message=None):
 | 
						|
        self.message = message
 | 
						|
 | 
						|
    def __call__(self, form, field):
 | 
						|
        # This should raise a ValidationError() or not
 | 
						|
        validate_url(field.data)
 | 
						|
 | 
						|
def validate_url(test_url):
 | 
						|
    # If hosts that only contain alphanumerics are allowed ("localhost" for example)
 | 
						|
    try:
 | 
						|
        url_validator(test_url, simple_host=allow_simplehost)
 | 
						|
    except validators.ValidationError:
 | 
						|
        #@todo check for xss
 | 
						|
        message = f"'{test_url}' is not a valid URL."
 | 
						|
        # This should be wtforms.validators.
 | 
						|
        raise ValidationError(message)
 | 
						|
 | 
						|
    from .model.Watch import is_safe_url
 | 
						|
    if not is_safe_url(test_url):
 | 
						|
        # This should be wtforms.validators.
 | 
						|
        raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')
 | 
						|
 | 
						|
class ValidateListRegex(object):
 | 
						|
    """
 | 
						|
    Validates that anything that looks like a regex passes as a regex
 | 
						|
    """
 | 
						|
    def __init__(self, message=None):
 | 
						|
        self.message = message
 | 
						|
 | 
						|
    def __call__(self, form, field):
 | 
						|
 | 
						|
        for line in field.data:
 | 
						|
            if re.search(html_tools.PERL_STYLE_REGEX, line, re.IGNORECASE):
 | 
						|
                try:
 | 
						|
                    regex = html_tools.perl_style_slash_enclosed_regex_to_options(line)
 | 
						|
                    re.compile(regex)
 | 
						|
                except re.error:
 | 
						|
                    message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
 | 
						|
                    raise ValidationError(message % (line))
 | 
						|
 | 
						|
class ValidateCSSJSONXPATHInput(object):
 | 
						|
    """
 | 
						|
    Filter validation
 | 
						|
    @todo CSS validator ;)
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, message=None, allow_xpath=True, allow_json=True):
 | 
						|
        self.message = message
 | 
						|
        self.allow_xpath = allow_xpath
 | 
						|
        self.allow_json = allow_json
 | 
						|
 | 
						|
    def __call__(self, form, field):
 | 
						|
 | 
						|
        if isinstance(field.data, str):
 | 
						|
            data = [field.data]
 | 
						|
        else:
 | 
						|
            data = field.data
 | 
						|
 | 
						|
        for line in data:
 | 
						|
        # Nothing to see here
 | 
						|
            if not len(line.strip()):
 | 
						|
                return
 | 
						|
 | 
						|
            # Does it look like XPath?
 | 
						|
            if line.strip()[0] == '/' or line.strip().startswith('xpath:'):
 | 
						|
                if not self.allow_xpath:
 | 
						|
                    raise ValidationError("XPath not permitted in this field!")
 | 
						|
                from lxml import etree, html
 | 
						|
                import elementpath
 | 
						|
                # xpath 2.0-3.1
 | 
						|
                from elementpath.xpath3 import XPath3Parser
 | 
						|
                tree = html.fromstring("<html></html>")
 | 
						|
                line = line.replace('xpath:', '')
 | 
						|
 | 
						|
                try:
 | 
						|
                    elementpath.select(tree, line.strip(), parser=XPath3Parser)
 | 
						|
                except elementpath.ElementPathError as e:
 | 
						|
                    message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
 | 
						|
                    raise ValidationError(message % (line, str(e)))
 | 
						|
                except:
 | 
						|
                    raise ValidationError("A system-error occurred when validating your XPath expression")
 | 
						|
 | 
						|
            if line.strip().startswith('xpath1:'):
 | 
						|
                if not self.allow_xpath:
 | 
						|
                    raise ValidationError("XPath not permitted in this field!")
 | 
						|
                from lxml import etree, html
 | 
						|
                tree = html.fromstring("<html></html>")
 | 
						|
                line = re.sub(r'^xpath1:', '', line)
 | 
						|
 | 
						|
                try:
 | 
						|
                    tree.xpath(line.strip())
 | 
						|
                except etree.XPathEvalError as e:
 | 
						|
                    message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
 | 
						|
                    raise ValidationError(message % (line, str(e)))
 | 
						|
                except:
 | 
						|
                    raise ValidationError("A system-error occurred when validating your XPath expression")
 | 
						|
 | 
						|
            if 'json:' in line:
 | 
						|
                if not self.allow_json:
 | 
						|
                    raise ValidationError("JSONPath not permitted in this field!")
 | 
						|
 | 
						|
                from jsonpath_ng.exceptions import (
 | 
						|
                    JsonPathLexerError,
 | 
						|
                    JsonPathParserError,
 | 
						|
                )
 | 
						|
                from jsonpath_ng.ext import parse
 | 
						|
 | 
						|
                input = line.replace('json:', '')
 | 
						|
 | 
						|
                try:
 | 
						|
                    parse(input)
 | 
						|
                except (JsonPathParserError, JsonPathLexerError) as e:
 | 
						|
                    message = field.gettext('\'%s\' is not a valid JSONPath expression. (%s)')
 | 
						|
                    raise ValidationError(message % (input, str(e)))
 | 
						|
                except:
 | 
						|
                    raise ValidationError("A system-error occurred when validating your JSONPath expression")
 | 
						|
 | 
						|
                # Re #265 - maybe in the future fetch the page and offer a
 | 
						|
                # warning/notice that its possible the rule doesnt yet match anything?
 | 
						|
                if not self.allow_json:
 | 
						|
                    raise ValidationError("jq not permitted in this field!")
 | 
						|
 | 
						|
            if 'jq:' in line:
 | 
						|
                try:
 | 
						|
                    import jq
 | 
						|
                except ModuleNotFoundError:
 | 
						|
                    # `jq` requires full compilation in windows and so isn't generally available
 | 
						|
                    raise ValidationError("jq not support not found")
 | 
						|
 | 
						|
                input = line.replace('jq:', '')
 | 
						|
 | 
						|
                try:
 | 
						|
                    jq.compile(input)
 | 
						|
                except (ValueError) as e:
 | 
						|
                    message = field.gettext('\'%s\' is not a valid jq expression. (%s)')
 | 
						|
                    raise ValidationError(message % (input, str(e)))
 | 
						|
                except:
 | 
						|
                    raise ValidationError("A system-error occurred when validating your jq expression")
 | 
						|
 | 
						|
class quickWatchForm(Form):
 | 
						|
    from . import processors
 | 
						|
 | 
						|
    url = fields.URLField('URL', validators=[validateURL()])
 | 
						|
    tags = StringTagUUID('Group tag', [validators.Optional()])
 | 
						|
    watch_submit_button = SubmitField('Watch', render_kw={"class": "pure-button pure-button-primary"})
 | 
						|
    processor = RadioField(u'Processor', choices=processors.available_processors(), default="text_json_diff")
 | 
						|
    edit_and_watch_submit_button = SubmitField('Edit > Watch', render_kw={"class": "pure-button pure-button-primary"})
 | 
						|
 | 
						|
 | 
						|
# Common to a single watch and the global settings
 | 
						|
class commonSettingsForm(Form):
 | 
						|
    from . import processors
 | 
						|
 | 
						|
    def __init__(self, formdata=None, obj=None, prefix="", data=None, meta=None, **kwargs):
 | 
						|
        super().__init__(formdata, obj, prefix, data, meta, **kwargs)
 | 
						|
        self.notification_body.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
 | 
						|
        self.notification_title.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
 | 
						|
        self.notification_urls.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
 | 
						|
 | 
						|
    extract_title_as_title = BooleanField('Extract <title> from document and use as watch title', default=False)
 | 
						|
    fetch_backend = RadioField(u'Fetch Method', choices=content_fetchers.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
 | 
						|
    notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()])
 | 
						|
    notification_format = SelectField('Notification format', choices=valid_notification_formats.keys())
 | 
						|
    notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
 | 
						|
    notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
 | 
						|
    processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff")
 | 
						|
    webdriver_delay = IntegerField('Wait seconds before extracting text', validators=[validators.Optional(), validators.NumberRange(min=1, message="Should contain one or more seconds")])
 | 
						|
 | 
						|
 | 
						|
class importForm(Form):
 | 
						|
    from . import processors
 | 
						|
    processor = RadioField(u'Processor', choices=processors.available_processors(), default="text_json_diff")
 | 
						|
    urls = TextAreaField('URLs')
 | 
						|
    xlsx_file = FileField('Upload .xlsx file', validators=[FileAllowed(['xlsx'], 'Must be .xlsx file!')])
 | 
						|
    file_mapping = SelectField('File mapping', [validators.DataRequired()], choices={('wachete', 'Wachete mapping'), ('custom','Custom mapping')})
 | 
						|
 | 
						|
 | 
						|
class SingleBrowserStep(Form):
 | 
						|
 | 
						|
    operation = SelectField('Operation', [validators.Optional()], choices=browser_step_ui_config.keys())
 | 
						|
 | 
						|
    # maybe better to set some <script>var..
 | 
						|
    selector = StringField('Selector', [validators.Optional()], render_kw={"placeholder": "CSS or xPath selector"})
 | 
						|
    optional_value = StringField('value', [validators.Optional()], render_kw={"placeholder": "Value"})
 | 
						|
#   @todo move to JS? ajax fetch new field?
 | 
						|
#    remove_button = SubmitField('-', render_kw={"type": "button", "class": "pure-button pure-button-primary", 'title': 'Remove'})
 | 
						|
#    add_button = SubmitField('+', render_kw={"type": "button", "class": "pure-button pure-button-primary", 'title': 'Add new step after'})
 | 
						|
 | 
						|
class processor_text_json_diff_form(commonSettingsForm):
 | 
						|
 | 
						|
    url = fields.URLField('URL', validators=[validateURL()])
 | 
						|
    tags = StringTagUUID('Group tag', [validators.Optional()], default='')
 | 
						|
 | 
						|
    time_between_check = FormField(TimeBetweenCheckForm)
 | 
						|
    time_between_check_use_default = BooleanField('Use global settings for time between check', default=False)
 | 
						|
 | 
						|
    include_filters = StringListField('CSS/JSONPath/JQ/XPath Filters', [ValidateCSSJSONXPATHInput()], default='')
 | 
						|
 | 
						|
    subtractive_selectors = StringListField('Remove elements', [ValidateCSSJSONXPATHInput(allow_json=False)])
 | 
						|
 | 
						|
    extract_text = StringListField('Extract text', [ValidateListRegex()])
 | 
						|
 | 
						|
    title = StringField('Title', default='')
 | 
						|
 | 
						|
    ignore_text = StringListField('Ignore lines containing', [ValidateListRegex()])
 | 
						|
    headers = StringDictKeyValue('Request headers')
 | 
						|
    body = TextAreaField('Request body', [validators.Optional()])
 | 
						|
    method = SelectField('Request method', choices=valid_method, default=default_method)
 | 
						|
    ignore_status_codes = BooleanField('Ignore status codes (process non-2xx status codes as normal)', default=False)
 | 
						|
    check_unique_lines = BooleanField('Only trigger when unique lines appear in all history', default=False)
 | 
						|
    remove_duplicate_lines = BooleanField('Remove duplicate lines of text', default=False)
 | 
						|
    sort_text_alphabetically =  BooleanField('Sort text alphabetically', default=False)
 | 
						|
    trim_text_whitespace = BooleanField('Trim whitespace before and after text', default=False)
 | 
						|
 | 
						|
    filter_text_added = BooleanField('Added lines', default=True)
 | 
						|
    filter_text_replaced = BooleanField('Replaced/changed lines', default=True)
 | 
						|
    filter_text_removed = BooleanField('Removed lines', default=True)
 | 
						|
 | 
						|
    trigger_text = StringListField('Trigger/wait for text', [validators.Optional(), ValidateListRegex()])
 | 
						|
    if os.getenv("PLAYWRIGHT_DRIVER_URL"):
 | 
						|
        browser_steps = FieldList(FormField(SingleBrowserStep), min_entries=10)
 | 
						|
    text_should_not_be_present = StringListField('Block change-detection while text matches', [validators.Optional(), ValidateListRegex()])
 | 
						|
    webdriver_js_execute_code = TextAreaField('Execute JavaScript before change detection', render_kw={"rows": "5"}, validators=[validators.Optional()])
 | 
						|
 | 
						|
    save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
 | 
						|
 | 
						|
    proxy = RadioField('Proxy')
 | 
						|
    filter_failure_notification_send = BooleanField(
 | 
						|
        'Send a notification when the filter can no longer be found on the page', default=False)
 | 
						|
 | 
						|
    notification_muted = BooleanField('Notifications Muted / Off', default=False)
 | 
						|
    notification_screenshot = BooleanField('Attach screenshot to notification (where possible)', default=False)
 | 
						|
 | 
						|
    def extra_tab_content(self):
 | 
						|
        return None
 | 
						|
 | 
						|
    def extra_form_content(self):
 | 
						|
        return None
 | 
						|
 | 
						|
    def validate(self, **kwargs):
 | 
						|
        if not super().validate():
 | 
						|
            return False
 | 
						|
 | 
						|
        result = True
 | 
						|
 | 
						|
        # Fail form validation when a body is set for a GET
 | 
						|
        if self.method.data == 'GET' and self.body.data:
 | 
						|
            self.body.errors.append('Body must be empty when Request Method is set to GET')
 | 
						|
            result = False
 | 
						|
 | 
						|
        # Attempt to validate jinja2 templates in the URL
 | 
						|
        try:
 | 
						|
            from changedetectionio.safe_jinja import render as jinja_render
 | 
						|
            jinja_render(template_str=self.url.data)
 | 
						|
        except ModuleNotFoundError as e:
 | 
						|
            # incase jinja2_time or others is missing
 | 
						|
            logger.error(e)
 | 
						|
            self.url.errors.append(e)
 | 
						|
            result = False
 | 
						|
        except Exception as e:
 | 
						|
            logger.error(e)
 | 
						|
            self.url.errors.append('Invalid template syntax')
 | 
						|
            result = False
 | 
						|
 | 
						|
        return result
 | 
						|
 | 
						|
class SingleExtraProxy(Form):
 | 
						|
 | 
						|
    # maybe better to set some <script>var..
 | 
						|
    proxy_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
 | 
						|
    proxy_url = StringField('Proxy URL', [validators.Optional()], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
 | 
						|
    # @todo do the validation here instead
 | 
						|
 | 
						|
class SingleExtraBrowser(Form):
 | 
						|
    browser_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
 | 
						|
    browser_connection_url = StringField('Browser connection URL', [validators.Optional()], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
 | 
						|
    # @todo do the validation here instead
 | 
						|
 | 
						|
class DefaultUAInputForm(Form):
 | 
						|
    html_requests = StringField('Plaintext requests', validators=[validators.Optional()], render_kw={"placeholder": "<default>"})
 | 
						|
    if os.getenv("PLAYWRIGHT_DRIVER_URL") or os.getenv("WEBDRIVER_URL"):
 | 
						|
        html_webdriver = StringField('Chrome requests', validators=[validators.Optional()], render_kw={"placeholder": "<default>"})
 | 
						|
 | 
						|
# datastore.data['settings']['requests']..
 | 
						|
class globalSettingsRequestForm(Form):
 | 
						|
    time_between_check = FormField(TimeBetweenCheckForm)
 | 
						|
    proxy = RadioField('Proxy')
 | 
						|
    jitter_seconds = IntegerField('Random jitter seconds ± check',
 | 
						|
                                  render_kw={"style": "width: 5em;"},
 | 
						|
                                  validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")])
 | 
						|
    extra_proxies = FieldList(FormField(SingleExtraProxy), min_entries=5)
 | 
						|
    extra_browsers = FieldList(FormField(SingleExtraBrowser), min_entries=5)
 | 
						|
 | 
						|
    default_ua = FormField(DefaultUAInputForm, label="Default User-Agent overrides")
 | 
						|
 | 
						|
    def validate_extra_proxies(self, extra_validators=None):
 | 
						|
        for e in self.data['extra_proxies']:
 | 
						|
            if e.get('proxy_name') or e.get('proxy_url'):
 | 
						|
                if not e.get('proxy_name','').strip() or not e.get('proxy_url','').strip():
 | 
						|
                    self.extra_proxies.errors.append('Both a name, and a Proxy URL is required.')
 | 
						|
                    return False
 | 
						|
 | 
						|
 | 
						|
# datastore.data['settings']['application']..
 | 
						|
class globalSettingsApplicationForm(commonSettingsForm):
 | 
						|
 | 
						|
    api_access_token_enabled = BooleanField('API access token security check enabled', default=True, validators=[validators.Optional()])
 | 
						|
    base_url = StringField('Notification base URL override',
 | 
						|
                           validators=[validators.Optional()],
 | 
						|
                           render_kw={"placeholder": os.getenv('BASE_URL', 'Not set')}
 | 
						|
                           )
 | 
						|
    empty_pages_are_a_change =  BooleanField('Treat empty pages as a change?', default=False)
 | 
						|
    fetch_backend = RadioField('Fetch Method', default="html_requests", choices=content_fetchers.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
 | 
						|
    global_ignore_text = StringListField('Ignore Text', [ValidateListRegex()])
 | 
						|
    global_subtractive_selectors = StringListField('Remove elements', [ValidateCSSJSONXPATHInput(allow_json=False)])
 | 
						|
    ignore_whitespace = BooleanField('Ignore whitespace')
 | 
						|
    password = SaltyPasswordField()
 | 
						|
    pager_size = IntegerField('Pager size',
 | 
						|
                              render_kw={"style": "width: 5em;"},
 | 
						|
                              validators=[validators.NumberRange(min=0,
 | 
						|
                                                                 message="Should be atleast zero (disabled)")])
 | 
						|
    removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
 | 
						|
    render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
 | 
						|
    shared_diff_access = BooleanField('Allow access to view diff page when password is enabled', default=False, validators=[validators.Optional()])
 | 
						|
    rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
 | 
						|
                                      validators=[validators.Optional()])
 | 
						|
    filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
 | 
						|
                                                                  render_kw={"style": "width: 5em;"},
 | 
						|
                                                                  validators=[validators.NumberRange(min=0,
 | 
						|
                                                                                                     message="Should contain zero or more attempts")])
 | 
						|
 | 
						|
 | 
						|
class globalSettingsForm(Form):
 | 
						|
    # Define these as FormFields/"sub forms", this way it matches the JSON storage
 | 
						|
    # datastore.data['settings']['application']..
 | 
						|
    # datastore.data['settings']['requests']..
 | 
						|
    def __init__(self, formdata=None, obj=None, prefix="", data=None, meta=None, **kwargs):
 | 
						|
        super().__init__(formdata, obj, prefix, data, meta, **kwargs)
 | 
						|
        self.application.notification_body.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
 | 
						|
        self.application.notification_title.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
 | 
						|
        self.application.notification_urls.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
 | 
						|
 | 
						|
    requests = FormField(globalSettingsRequestForm)
 | 
						|
    application = FormField(globalSettingsApplicationForm)
 | 
						|
    save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
 | 
						|
 | 
						|
 | 
						|
class extractDataForm(Form):
 | 
						|
    extract_regex = StringField('RegEx to extract', validators=[validators.Length(min=1, message="Needs a RegEx")])
 | 
						|
    extract_submit_button = SubmitField('Extract as CSV', render_kw={"class": "pure-button pure-button-primary"})
 |