mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-11-04 08:34:57 +00:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			0.43.2
			...
			elementpat
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					b72ecfa852 | 
@@ -1,18 +1,2 @@
 | 
			
		||||
.git
 | 
			
		||||
.github
 | 
			
		||||
changedetectionio/processors/__pycache__
 | 
			
		||||
changedetectionio/api/__pycache__
 | 
			
		||||
changedetectionio/model/__pycache__
 | 
			
		||||
changedetectionio/blueprint/price_data_follower/__pycache__
 | 
			
		||||
changedetectionio/blueprint/tags/__pycache__
 | 
			
		||||
changedetectionio/blueprint/__pycache__
 | 
			
		||||
changedetectionio/blueprint/browser_steps/__pycache__
 | 
			
		||||
changedetectionio/fetchers/__pycache__
 | 
			
		||||
changedetectionio/tests/visualselector/__pycache__
 | 
			
		||||
changedetectionio/tests/restock/__pycache__
 | 
			
		||||
changedetectionio/tests/__pycache__
 | 
			
		||||
changedetectionio/tests/fetchers/__pycache__
 | 
			
		||||
changedetectionio/tests/unit/__pycache__
 | 
			
		||||
changedetectionio/tests/proxy_list/__pycache__
 | 
			
		||||
changedetectionio/__pycache__
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										12
									
								
								.github/workflows/test-only.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										12
									
								
								.github/workflows/test-only.yml
									
									
									
									
										vendored
									
									
								
							@@ -37,11 +37,6 @@ jobs:
 | 
			
		||||
          # Build a changedetection.io container and start testing inside
 | 
			
		||||
          docker build . -t test-changedetectionio
 | 
			
		||||
 | 
			
		||||
      - name: Spin up ancillary SMTP+Echo message test server
 | 
			
		||||
        run: |
 | 
			
		||||
          # Debug SMTP server/echo message back server
 | 
			
		||||
          docker run --network changedet-network -d -p 11025:11025 -p 11080:11080  --hostname mailserver test-changedetectionio  bash -c 'python changedetectionio/tests/smtp/smtp-test-server.py' 
 | 
			
		||||
 | 
			
		||||
      - name: Test built container with pytest
 | 
			
		||||
        run: |
 | 
			
		||||
          
 | 
			
		||||
@@ -63,16 +58,11 @@ jobs:
 | 
			
		||||
          # Settings headers playwright tests - Call back in from Browserless, check headers
 | 
			
		||||
          docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000?dumpio=true" --network changedet-network test-changedetectionio  bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0  --live-server-port=5004 tests/test_request.py'
 | 
			
		||||
          docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "WEBDRIVER_URL=http://selenium:4444/wd/hub" --network changedet-network test-changedetectionio  bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0  --live-server-port=5004 tests/test_request.py'
 | 
			
		||||
          docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "USE_EXPERIMENTAL_PUPPETEER_FETCH=yes" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000?dumpio=true" --network changedet-network test-changedetectionio  bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0  --live-server-port=5004 tests/test_request.py'          
 | 
			
		||||
          docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "USE_EXPERIMENTAL_PUPPETEER_FETCH=yes" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000?dumpio=true" --network changedet-network test-changedetectionio  bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0  --live-server-port=5004 tests/test_request.py'
 | 
			
		||||
          
 | 
			
		||||
          # restock detection via playwright - added name=changedet here so that playwright/browserless can connect to it
 | 
			
		||||
          docker run --rm --name "changedet" -e "FLASK_SERVER_NAME=changedet" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000" --network changedet-network test-changedetectionio  bash -c 'cd changedetectionio;pytest --live-server-port=5004 --live-server-host=0.0.0.0 tests/restock/test_restock.py'
 | 
			
		||||
 | 
			
		||||
      - name: Test SMTP notification mime types
 | 
			
		||||
        run: |
 | 
			
		||||
          # SMTP content types - needs the 'Debug SMTP server/echo message back server' container from above
 | 
			
		||||
          docker run --rm  --network changedet-network test-changedetectionio bash -c 'cd changedetectionio;pytest tests/smtp/test_notification_smtp.py'
 | 
			
		||||
 | 
			
		||||
      - name: Test with puppeteer fetcher and disk cache
 | 
			
		||||
        run: |
 | 
			
		||||
          docker run --rm -e "PUPPETEER_DISK_CACHE=/tmp/data/" -e "USE_EXPERIMENTAL_PUPPETEER_FETCH=yes" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000" --network changedet-network test-changedetectionio  bash -c 'cd changedetectionio;pytest tests/fetchers/test_content.py && pytest tests/test_errorhandling.py && pytest tests/visualselector/test_fetch_data.py'
 | 
			
		||||
 
 | 
			
		||||
@@ -38,7 +38,7 @@ from flask_paginate import Pagination, get_page_parameter
 | 
			
		||||
from changedetectionio import html_tools
 | 
			
		||||
from changedetectionio.api import api_v1
 | 
			
		||||
 | 
			
		||||
__version__ = '0.43.2'
 | 
			
		||||
__version__ = '0.43'
 | 
			
		||||
 | 
			
		||||
datastore = None
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -219,15 +219,13 @@ class CreateWatch(Resource):
 | 
			
		||||
 | 
			
		||||
        extras = copy.deepcopy(json_data)
 | 
			
		||||
 | 
			
		||||
        # Because we renamed 'tag' to 'tags' but don't want to change the API (can do this in v2 of the API)
 | 
			
		||||
        tags = None
 | 
			
		||||
        # Because we renamed 'tag' to 'tags' but dont want to change the API (can do this in v2 of the API)
 | 
			
		||||
        if extras.get('tag'):
 | 
			
		||||
            tags = extras.get('tag')
 | 
			
		||||
            del extras['tag']
 | 
			
		||||
            extras['tags'] = extras.get('tag')
 | 
			
		||||
 | 
			
		||||
        del extras['url']
 | 
			
		||||
 | 
			
		||||
        new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
 | 
			
		||||
        new_uuid = self.datastore.add_watch(url=url, extras=extras)
 | 
			
		||||
        if new_uuid:
 | 
			
		||||
            self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True}))
 | 
			
		||||
            return {'uuid': new_uuid}, 201
 | 
			
		||||
 
 | 
			
		||||
@@ -76,16 +76,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
 | 
			
		||||
        flash(f"Tag unlinked removed from {unlinked} watches")
 | 
			
		||||
        return redirect(url_for('tags.tags_overview_page'))
 | 
			
		||||
 | 
			
		||||
    @tags_blueprint.route("/delete_all", methods=['GET'])
 | 
			
		||||
    @login_optionally_required
 | 
			
		||||
    def delete_all():
 | 
			
		||||
        for watch_uuid, watch in datastore.data['watching'].items():
 | 
			
		||||
            watch['tags'] = []
 | 
			
		||||
        datastore.data['settings']['application']['tags'] = {}
 | 
			
		||||
 | 
			
		||||
        flash(f"All tags deleted")
 | 
			
		||||
        return redirect(url_for('tags.tags_overview_page'))
 | 
			
		||||
 | 
			
		||||
    @tags_blueprint.route("/edit/<string:uuid>", methods=['GET'])
 | 
			
		||||
    @login_optionally_required
 | 
			
		||||
    def form_tag_edit(uuid):
 | 
			
		||||
 
 | 
			
		||||
@@ -54,5 +54,4 @@ def render_diff(previous_version_file_contents, newest_version_file_contents, in
 | 
			
		||||
 | 
			
		||||
    # Recursively join lists
 | 
			
		||||
    f = lambda L: line_feed_sep.join([f(x) if type(x) is list else x for x in L])
 | 
			
		||||
    p= f(rendered_diff)
 | 
			
		||||
    return p
 | 
			
		||||
    return f(rendered_diff)
 | 
			
		||||
 
 | 
			
		||||
@@ -316,10 +316,11 @@ class ValidateCSSJSONXPATHInput(object):
 | 
			
		||||
                if not self.allow_xpath:
 | 
			
		||||
                    raise ValidationError("XPath not permitted in this field!")
 | 
			
		||||
                from lxml import etree, html
 | 
			
		||||
                import elementpath
 | 
			
		||||
                tree = html.fromstring("<html></html>")
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    tree.xpath(line.strip())
 | 
			
		||||
                    elementpath.select(tree, line)
 | 
			
		||||
                except etree.XPathEvalError as e:
 | 
			
		||||
                    message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
 | 
			
		||||
                    raise ValidationError(message % (line, str(e)))
 | 
			
		||||
 
 | 
			
		||||
@@ -51,12 +51,13 @@ def element_removal(selectors: List[str], html_content):
 | 
			
		||||
 | 
			
		||||
# Return str Utf-8 of matched rules
 | 
			
		||||
def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False):
 | 
			
		||||
    import elementpath
 | 
			
		||||
    from lxml import etree, html
 | 
			
		||||
 | 
			
		||||
    tree = html.fromstring(bytes(html_content, encoding='utf-8'))
 | 
			
		||||
    html_block = ""
 | 
			
		||||
 | 
			
		||||
    r = tree.xpath(xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'})
 | 
			
		||||
    r = elementpath.select(tree, xpath_filter.strip())
 | 
			
		||||
    #@note: //title/text() wont work where <title>CDATA..
 | 
			
		||||
 | 
			
		||||
    for element in r:
 | 
			
		||||
 
 | 
			
		||||
@@ -85,8 +85,7 @@ class import_distill_io_json(Importer):
 | 
			
		||||
        now = time.time()
 | 
			
		||||
        self.new_uuids=[]
 | 
			
		||||
 | 
			
		||||
        # @todo Use JSONSchema like in the API to validate here.
 | 
			
		||||
        
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            data = json.loads(data.strip())
 | 
			
		||||
        except json.decoder.JSONDecodeError:
 | 
			
		||||
@@ -121,8 +120,11 @@ class import_distill_io_json(Importer):
 | 
			
		||||
                except IndexError:
 | 
			
		||||
                    pass
 | 
			
		||||
 | 
			
		||||
# Does this need to be here anymore?
 | 
			
		||||
                if d.get('tags', False):
 | 
			
		||||
                    extras['tags'] = ", ".join(d['tags'])
 | 
			
		||||
 | 
			
		||||
                new_uuid = datastore.add_watch(url=d['uri'].strip(),
 | 
			
		||||
                                               tag=",".join(d.get('tags', [])),
 | 
			
		||||
                                               extras=extras,
 | 
			
		||||
                                               write_to_disk_now=False)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -92,12 +92,6 @@ def process_notification(n_object, datastore):
 | 
			
		||||
        n_object.get('notification_format', default_notification_format),
 | 
			
		||||
        valid_notification_formats[default_notification_format],
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # If we arrived with 'System default' then look it up
 | 
			
		||||
    if n_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
 | 
			
		||||
        # Initially text or whatever
 | 
			
		||||
        n_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format])
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
    # https://github.com/caronc/apprise/wiki/Development_LogCapture
 | 
			
		||||
    # Anything higher than or equal to WARNING (which covers things like Connection errors)
 | 
			
		||||
@@ -151,12 +145,9 @@ def process_notification(n_object, datastore):
 | 
			
		||||
                    # Apprise will default to HTML, so we need to override it
 | 
			
		||||
                    # So that whats' generated in n_body is in line with what is going to be sent.
 | 
			
		||||
                    # https://github.com/caronc/apprise/issues/633#issuecomment-1191449321
 | 
			
		||||
                    if not 'format=' in url and (n_format == 'Text' or n_format == 'Markdown'):
 | 
			
		||||
                    if not 'format=' in url and (n_format == 'text' or n_format == 'markdown'):
 | 
			
		||||
                        prefix = '?' if not '?' in url else '&'
 | 
			
		||||
                        # Apprise format is lowercase text https://github.com/caronc/apprise/issues/633
 | 
			
		||||
                        n_format = n_format.tolower()
 | 
			
		||||
                        url = "{}{}format={}".format(url, prefix, n_format)
 | 
			
		||||
                    # If n_format == HTML, then apprise email should default to text/html and we should be sending HTML only
 | 
			
		||||
 | 
			
		||||
                apobj.add(url)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -205,9 +205,10 @@ class ChangeDetectionStore:
 | 
			
		||||
 | 
			
		||||
    # Clone a watch by UUID
 | 
			
		||||
    def clone(self, uuid):
 | 
			
		||||
        url = self.data['watching'][uuid].get('url')
 | 
			
		||||
        url = self.data['watching'][uuid]['url']
 | 
			
		||||
        tag = self.data['watching'][uuid].get('tags',[])
 | 
			
		||||
        extras = self.data['watching'][uuid]
 | 
			
		||||
        new_uuid = self.add_watch(url=url, extras=extras)
 | 
			
		||||
        new_uuid = self.add_watch(url=url, tag_uuids=tag, extras=extras)
 | 
			
		||||
        return new_uuid
 | 
			
		||||
 | 
			
		||||
    def url_exists(self, url):
 | 
			
		||||
@@ -247,9 +248,12 @@ class ChangeDetectionStore:
 | 
			
		||||
        if extras is None:
 | 
			
		||||
            extras = {}
 | 
			
		||||
 | 
			
		||||
        # should always be str
 | 
			
		||||
        if tag is None or not tag:
 | 
			
		||||
            tag = ''
 | 
			
		||||
 | 
			
		||||
        # Incase these are copied across, assume it's a reference and deepcopy()
 | 
			
		||||
        apply_extras = deepcopy(extras)
 | 
			
		||||
        apply_extras['tags'] = [] if not apply_extras.get('tags') else apply_extras.get('tags')
 | 
			
		||||
 | 
			
		||||
        # Was it a share link? try to fetch the data
 | 
			
		||||
        if (url.startswith("https://changedetection.io/share/")):
 | 
			
		||||
@@ -299,22 +303,20 @@ class ChangeDetectionStore:
 | 
			
		||||
            flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        if tag and type(tag) == str:
 | 
			
		||||
            # Then it's probably a string of the actual tag by name, split and add it
 | 
			
		||||
            for t in tag.split(','):
 | 
			
		||||
 | 
			
		||||
        # #Re 569
 | 
			
		||||
        # Could be in 'tags',  var or extras, smash them together and strip
 | 
			
		||||
        apply_extras['tags'] = []
 | 
			
		||||
        if tag or extras.get('tags'):
 | 
			
		||||
            tags = list(filter(None, list(set().union(tag.split(','), extras.get('tags', '').split(',')))))
 | 
			
		||||
            for t in list(map(str.strip, tags)):
 | 
			
		||||
                # for each stripped tag, add tag as UUID
 | 
			
		||||
                for a_t in t.split(','):
 | 
			
		||||
                    tag_uuid = self.add_tag(a_t)
 | 
			
		||||
                    apply_extras['tags'].append(tag_uuid)
 | 
			
		||||
                apply_extras['tags'].append(self.add_tag(t))
 | 
			
		||||
 | 
			
		||||
        # Or if UUIDs given directly
 | 
			
		||||
        if tag_uuids:
 | 
			
		||||
            apply_extras['tags'] = list(set(apply_extras['tags'] + tag_uuids))
 | 
			
		||||
 | 
			
		||||
        # Make any uuids unique
 | 
			
		||||
        if apply_extras.get('tags'):
 | 
			
		||||
            apply_extras['tags'] = list(set(apply_extras.get('tags')))
 | 
			
		||||
 | 
			
		||||
        new_watch = Watch.model(datastore_path=self.datastore_path, url=url)
 | 
			
		||||
 | 
			
		||||
        new_uuid = new_watch.get('uuid')
 | 
			
		||||
@@ -566,7 +568,7 @@ class ChangeDetectionStore:
 | 
			
		||||
    def add_tag(self, name):
 | 
			
		||||
        # If name exists, return that
 | 
			
		||||
        n = name.strip().lower()
 | 
			
		||||
        print (f">>> Adding new tag - '{n}'")
 | 
			
		||||
        print (f">>> Adding new tag - '{n}")
 | 
			
		||||
        if not n:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,42 +0,0 @@
 | 
			
		||||
#!/usr/bin/python3
 | 
			
		||||
import smtpd
 | 
			
		||||
import asyncore
 | 
			
		||||
 | 
			
		||||
# Accept a SMTP message and offer a way to retrieve the last message via TCP Socket
 | 
			
		||||
 | 
			
		||||
last_received_message = b"Nothing"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CustomSMTPServer(smtpd.SMTPServer):
 | 
			
		||||
 | 
			
		||||
    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
 | 
			
		||||
        global last_received_message
 | 
			
		||||
        last_received_message = data
 | 
			
		||||
        print('Receiving message from:', peer)
 | 
			
		||||
        print('Message addressed from:', mailfrom)
 | 
			
		||||
        print('Message addressed to  :', rcpttos)
 | 
			
		||||
        print('Message length        :', len(data))
 | 
			
		||||
        print(data.decode('utf8'))
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Just print out the last message received on plain TCP socket server
 | 
			
		||||
class EchoServer(asyncore.dispatcher):
 | 
			
		||||
 | 
			
		||||
    def __init__(self, host, port):
 | 
			
		||||
        asyncore.dispatcher.__init__(self)
 | 
			
		||||
        self.create_socket()
 | 
			
		||||
        self.set_reuse_addr()
 | 
			
		||||
        self.bind((host, port))
 | 
			
		||||
        self.listen(5)
 | 
			
		||||
 | 
			
		||||
    def handle_accepted(self, sock, addr):
 | 
			
		||||
        global last_received_message
 | 
			
		||||
        print('Incoming connection from %s' % repr(addr))
 | 
			
		||||
        sock.send(last_received_message)
 | 
			
		||||
        last_received_message = b''
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
server = CustomSMTPServer(('0.0.0.0', 11025), None)  # SMTP mail goes here
 | 
			
		||||
server2 = EchoServer('0.0.0.0', 11080)  # Echo back last message received
 | 
			
		||||
asyncore.loop()
 | 
			
		||||
@@ -1,165 +0,0 @@
 | 
			
		||||
import json
 | 
			
		||||
import os
 | 
			
		||||
import time
 | 
			
		||||
import re
 | 
			
		||||
from flask import url_for
 | 
			
		||||
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
 | 
			
		||||
    wait_for_all_checks, \
 | 
			
		||||
    set_longer_modified_response
 | 
			
		||||
from changedetectionio.tests.util import extract_UUID_from_client
 | 
			
		||||
import logging
 | 
			
		||||
import base64
 | 
			
		||||
 | 
			
		||||
# NOTE - RELIES ON mailserver as hostname running, see github build recipes
 | 
			
		||||
smtp_test_server = 'mailserver'
 | 
			
		||||
 | 
			
		||||
from changedetectionio.notification import (
 | 
			
		||||
    default_notification_body,
 | 
			
		||||
    default_notification_format,
 | 
			
		||||
    default_notification_title,
 | 
			
		||||
    valid_notification_formats,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
def test_setup(live_server):
 | 
			
		||||
    live_server_setup(live_server)
 | 
			
		||||
 | 
			
		||||
def get_last_message_from_smtp_server():
 | 
			
		||||
    import socket
 | 
			
		||||
    global smtp_test_server
 | 
			
		||||
    port = 11080  # socket server port number
 | 
			
		||||
 | 
			
		||||
    client_socket = socket.socket()  # instantiate
 | 
			
		||||
    client_socket.connect((smtp_test_server, port))  # connect to the server
 | 
			
		||||
 | 
			
		||||
    data = client_socket.recv(50024).decode()  # receive response
 | 
			
		||||
    client_socket.close()  # close the connection
 | 
			
		||||
    return data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Requires running the test SMTP server
 | 
			
		||||
 | 
			
		||||
def test_check_notification_email_formats_default_HTML(client, live_server):
 | 
			
		||||
    # live_server_setup(live_server)
 | 
			
		||||
    set_original_response()
 | 
			
		||||
 | 
			
		||||
    global smtp_test_server
 | 
			
		||||
    notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
 | 
			
		||||
 | 
			
		||||
    #####################
 | 
			
		||||
    # Set this up for when we remove the notification from the watch, it should fallback with these details
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("settings_page"),
 | 
			
		||||
        data={"application-notification_urls": notification_url,
 | 
			
		||||
              "application-notification_title": "fallback-title " + default_notification_title,
 | 
			
		||||
              "application-notification_body": "fallback-body<br> " + default_notification_body,
 | 
			
		||||
              "application-notification_format": 'HTML',
 | 
			
		||||
              "requests-time_between_check-minutes": 180,
 | 
			
		||||
              'application-fetch_backend': "html_requests"},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
    assert b"Settings updated." in res.data
 | 
			
		||||
 | 
			
		||||
    # Add a watch and trigger a HTTP POST
 | 
			
		||||
    test_url = url_for('test_endpoint', _external=True)
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("form_quick_watch_add"),
 | 
			
		||||
        data={"url": test_url, "tags": 'nice one'},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b"Watch added" in res.data
 | 
			
		||||
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
    set_longer_modified_response()
 | 
			
		||||
    client.get(url_for("form_watch_checknow"), follow_redirects=True)
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
 | 
			
		||||
    time.sleep(3)
 | 
			
		||||
 | 
			
		||||
    msg = get_last_message_from_smtp_server()
 | 
			
		||||
    assert len(msg) >= 1
 | 
			
		||||
 | 
			
		||||
    # The email should have two bodies, and the text/html part should be <br>
 | 
			
		||||
    assert 'Content-Type: text/plain' in msg
 | 
			
		||||
    assert '(added) So let\'s see what happens.\n' in msg  # The plaintext part with \n
 | 
			
		||||
    assert 'Content-Type: text/html' in msg
 | 
			
		||||
    assert '(added) So let\'s see what happens.<br>' in msg  # the html part
 | 
			
		||||
    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
 | 
			
		||||
    assert b'Deleted' in res.data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_check_notification_email_formats_default_Text_override_HTML(client, live_server):
 | 
			
		||||
    # live_server_setup(live_server)
 | 
			
		||||
 | 
			
		||||
    # HTML problems? see this
 | 
			
		||||
    # https://github.com/caronc/apprise/issues/633
 | 
			
		||||
 | 
			
		||||
    set_original_response()
 | 
			
		||||
    global smtp_test_server
 | 
			
		||||
    notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
 | 
			
		||||
 | 
			
		||||
    #####################
 | 
			
		||||
    # Set this up for when we remove the notification from the watch, it should fallback with these details
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("settings_page"),
 | 
			
		||||
        data={"application-notification_urls": notification_url,
 | 
			
		||||
              "application-notification_title": "fallback-title " + default_notification_title,
 | 
			
		||||
              "application-notification_body": default_notification_body,
 | 
			
		||||
              "application-notification_format": 'Text',
 | 
			
		||||
              "requests-time_between_check-minutes": 180,
 | 
			
		||||
              'application-fetch_backend': "html_requests"},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
    assert b"Settings updated." in res.data
 | 
			
		||||
 | 
			
		||||
    # Add a watch and trigger a HTTP POST
 | 
			
		||||
    test_url = url_for('test_endpoint', _external=True)
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("form_quick_watch_add"),
 | 
			
		||||
        data={"url": test_url, "tags": 'nice one'},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b"Watch added" in res.data
 | 
			
		||||
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
    set_longer_modified_response()
 | 
			
		||||
    client.get(url_for("form_watch_checknow"), follow_redirects=True)
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
 | 
			
		||||
    time.sleep(3)
 | 
			
		||||
    msg = get_last_message_from_smtp_server()
 | 
			
		||||
    assert len(msg) >= 1
 | 
			
		||||
    #    with open('/tmp/m.txt', 'w') as f:
 | 
			
		||||
    #        f.write(msg)
 | 
			
		||||
 | 
			
		||||
    # The email should not have two bodies, should be TEXT only
 | 
			
		||||
 | 
			
		||||
    assert 'Content-Type: text/plain' in msg
 | 
			
		||||
    assert '(added) So let\'s see what happens.\n' in msg  # The plaintext part with \n
 | 
			
		||||
 | 
			
		||||
    set_original_response()
 | 
			
		||||
    # Now override as HTML format
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("edit_page", uuid="first"),
 | 
			
		||||
        data={
 | 
			
		||||
            "url": test_url,
 | 
			
		||||
            "notification_format": 'HTML',
 | 
			
		||||
            'fetch_backend': "html_requests"},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
    assert b"Updated watch." in res.data
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
 | 
			
		||||
    time.sleep(3)
 | 
			
		||||
    msg = get_last_message_from_smtp_server()
 | 
			
		||||
    assert len(msg) >= 1
 | 
			
		||||
 | 
			
		||||
    # The email should have two bodies, and the text/html part should be <br>
 | 
			
		||||
    assert 'Content-Type: text/plain' in msg
 | 
			
		||||
    assert '(removed) So let\'s see what happens.\n' in msg  # The plaintext part with \n
 | 
			
		||||
    assert 'Content-Type: text/html' in msg
 | 
			
		||||
    assert '(removed) So let\'s see what happens.<br>' in msg  # the html part
 | 
			
		||||
 | 
			
		||||
    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
 | 
			
		||||
    assert b'Deleted' in res.data
 | 
			
		||||
@@ -267,7 +267,7 @@ def test_api_watch_PUT_update(client, live_server):
 | 
			
		||||
 | 
			
		||||
    #live_server_setup(live_server)
 | 
			
		||||
    api_key = extract_api_key_from_UI(client)
 | 
			
		||||
 | 
			
		||||
    time.sleep(1)
 | 
			
		||||
    # Create a watch
 | 
			
		||||
    set_original_response()
 | 
			
		||||
    test_url = url_for('test_endpoint', _external=True,
 | 
			
		||||
@@ -283,6 +283,7 @@ def test_api_watch_PUT_update(client, live_server):
 | 
			
		||||
 | 
			
		||||
    assert res.status_code == 201
 | 
			
		||||
 | 
			
		||||
    time.sleep(1)
 | 
			
		||||
 | 
			
		||||
    # Get a listing, it will be the first one
 | 
			
		||||
    res = client.get(
 | 
			
		||||
 
 | 
			
		||||
@@ -2,7 +2,7 @@
 | 
			
		||||
 | 
			
		||||
import time
 | 
			
		||||
from flask import url_for
 | 
			
		||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, extract_UUID_from_client
 | 
			
		||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -154,10 +154,6 @@ def test_tag_add_in_ui(client, live_server):
 | 
			
		||||
    )
 | 
			
		||||
    assert b"Tag added" in res.data
 | 
			
		||||
    assert b"new-test-tag" in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.get(url_for("tags.delete_all"), follow_redirects=True)
 | 
			
		||||
    assert b'All tags deleted' in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
 | 
			
		||||
    assert b'Deleted' in res.data
 | 
			
		||||
 | 
			
		||||
@@ -223,11 +219,13 @@ def test_group_tag_notification(client, live_server):
 | 
			
		||||
    assert "test-tag" in notification_submission
 | 
			
		||||
    assert "other-tag" in notification_submission
 | 
			
		||||
 | 
			
		||||
    #@todo Test that multiple notifications fired
 | 
			
		||||
    #@todo Test that each of multiple notifications with different settings
 | 
			
		||||
    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
 | 
			
		||||
    assert b'Deleted' in res.data
 | 
			
		||||
 | 
			
		||||
    #@todo Test that multiple notifications fired
 | 
			
		||||
    #@todo Test that each of multiple notifications with different settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_limit_tag_ui(client, live_server):
 | 
			
		||||
    #live_server_setup(live_server)
 | 
			
		||||
 | 
			
		||||
@@ -262,61 +260,3 @@ def test_limit_tag_ui(client, live_server):
 | 
			
		||||
    assert b'test-tag' in res.data
 | 
			
		||||
    assert res.data.count(b'processor-text_json_diff') == 20
 | 
			
		||||
    assert b"object at" not in res.data
 | 
			
		||||
    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
 | 
			
		||||
    assert b'Deleted' in res.data
 | 
			
		||||
    res = client.get(url_for("tags.delete_all"), follow_redirects=True)
 | 
			
		||||
    assert b'All tags deleted' in res.data
 | 
			
		||||
def test_clone_tag_on_import(client, live_server):
 | 
			
		||||
    #live_server_setup(live_server)
 | 
			
		||||
    test_url = url_for('test_endpoint', _external=True)
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("import_page"),
 | 
			
		||||
        data={"urls": test_url + " test-tag, another-tag\r\n"},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b"1 Imported" in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.get(url_for("index"))
 | 
			
		||||
    assert b'test-tag' in res.data
 | 
			
		||||
    assert b'another-tag' in res.data
 | 
			
		||||
 | 
			
		||||
    watch_uuid = extract_UUID_from_client(client)
 | 
			
		||||
    res = client.get(url_for("form_clone", uuid=watch_uuid), follow_redirects=True)
 | 
			
		||||
 | 
			
		||||
    assert b'Cloned' in res.data
 | 
			
		||||
    # 2 times plus the top link to tag
 | 
			
		||||
    assert res.data.count(b'test-tag') == 3
 | 
			
		||||
    assert res.data.count(b'another-tag') == 3
 | 
			
		||||
    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
 | 
			
		||||
    assert b'Deleted' in res.data
 | 
			
		||||
 | 
			
		||||
def test_clone_tag_on_quickwatchform_add(client, live_server):
 | 
			
		||||
    #live_server_setup(live_server)
 | 
			
		||||
 | 
			
		||||
    test_url = url_for('test_endpoint', _external=True)
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("form_quick_watch_add"),
 | 
			
		||||
        data={"url": test_url, "tags": ' test-tag, another-tag      '},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b"Watch added" in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.get(url_for("index"))
 | 
			
		||||
    assert b'test-tag' in res.data
 | 
			
		||||
    assert b'another-tag' in res.data
 | 
			
		||||
 | 
			
		||||
    watch_uuid = extract_UUID_from_client(client)
 | 
			
		||||
    res = client.get(url_for("form_clone", uuid=watch_uuid), follow_redirects=True)
 | 
			
		||||
 | 
			
		||||
    assert b'Cloned' in res.data
 | 
			
		||||
    # 2 times plus the top link to tag
 | 
			
		||||
    assert res.data.count(b'test-tag') == 3
 | 
			
		||||
    assert res.data.count(b'another-tag') == 3
 | 
			
		||||
    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
 | 
			
		||||
    assert b'Deleted' in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.get(url_for("tags.delete_all"), follow_redirects=True)
 | 
			
		||||
    assert b'All tags deleted' in res.data
 | 
			
		||||
@@ -112,7 +112,6 @@ def test_import_distillio(client, live_server):
 | 
			
		||||
    # did the tags work?
 | 
			
		||||
    res = client.get( url_for("index"))
 | 
			
		||||
 | 
			
		||||
    # check tags
 | 
			
		||||
    assert b"nice stuff" in res.data
 | 
			
		||||
    assert b"nerd-news" in res.data
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -3,8 +3,7 @@ import os
 | 
			
		||||
import time
 | 
			
		||||
import re
 | 
			
		||||
from flask import url_for
 | 
			
		||||
from .util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, wait_for_all_checks, \
 | 
			
		||||
    set_longer_modified_response
 | 
			
		||||
from .util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, wait_for_all_checks
 | 
			
		||||
from . util import  extract_UUID_from_client
 | 
			
		||||
import logging
 | 
			
		||||
import base64
 | 
			
		||||
@@ -25,6 +24,9 @@ def test_check_notification(client, live_server):
 | 
			
		||||
    #live_server_setup(live_server)
 | 
			
		||||
    set_original_response()
 | 
			
		||||
 | 
			
		||||
    # Give the endpoint time to spin up
 | 
			
		||||
    time.sleep(1)
 | 
			
		||||
 | 
			
		||||
    # Re 360 - new install should have defaults set
 | 
			
		||||
    res = client.get(url_for("settings_page"))
 | 
			
		||||
    notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'json')
 | 
			
		||||
@@ -140,7 +142,8 @@ def test_check_notification(client, live_server):
 | 
			
		||||
 | 
			
		||||
    # Did we see the URL that had a change, in the notification?
 | 
			
		||||
    # Diff was correctly executed
 | 
			
		||||
 | 
			
		||||
    assert test_url in notification_submission
 | 
			
		||||
    assert ':-)' in notification_submission
 | 
			
		||||
    assert "Diff Full: Some initial text" in notification_submission
 | 
			
		||||
    assert "Diff: (changed) Which is across multiple lines" in notification_submission
 | 
			
		||||
    assert "(into) which has this one new line" in notification_submission
 | 
			
		||||
@@ -153,8 +156,7 @@ def test_check_notification(client, live_server):
 | 
			
		||||
    assert "preview/" in notification_submission
 | 
			
		||||
    assert ":-)" in notification_submission
 | 
			
		||||
    assert "New ChangeDetection.io Notification - {}".format(test_url) in notification_submission
 | 
			
		||||
    assert test_url in notification_submission
 | 
			
		||||
    assert ':-)' in notification_submission
 | 
			
		||||
 | 
			
		||||
    # Check the attachment was added, and that it is a JPEG from the original PNG
 | 
			
		||||
    notification_submission_object = json.loads(notification_submission)
 | 
			
		||||
    # We keep PNG screenshots for now
 | 
			
		||||
@@ -273,7 +275,7 @@ def test_notification_validation(client, live_server):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_notification_custom_endpoint_and_jinja2(client, live_server):
 | 
			
		||||
    #live_server_setup(live_server)
 | 
			
		||||
    time.sleep(1)
 | 
			
		||||
 | 
			
		||||
    # test_endpoint - that sends the contents of a file
 | 
			
		||||
    # test_notification_endpoint - that takes a POST and writes it to file (test-datastore/notification.txt)
 | 
			
		||||
@@ -284,14 +286,12 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server):
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("settings_page"),
 | 
			
		||||
        data={
 | 
			
		||||
              "application-fetch_backend": "html_requests",
 | 
			
		||||
              "application-minutes_between_check": 180,
 | 
			
		||||
        data={"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }}",
 | 
			
		||||
              "application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444 }',
 | 
			
		||||
              "application-notification_format": default_notification_format,
 | 
			
		||||
              "application-notification_urls": test_notification_url,
 | 
			
		||||
              # https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
 | 
			
		||||
              "application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }}",
 | 
			
		||||
              "application-notification_urls": test_notification_url,
 | 
			
		||||
              "application-minutes_between_check": 180,
 | 
			
		||||
              "application-fetch_backend": "html_requests"
 | 
			
		||||
              },
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
@@ -316,8 +316,9 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server):
 | 
			
		||||
    client.get(url_for("form_watch_checknow"), follow_redirects=True)
 | 
			
		||||
    time.sleep(2)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    with open("test-datastore/notification.txt", 'r') as f:
 | 
			
		||||
        x = f.read()
 | 
			
		||||
        x=f.read()
 | 
			
		||||
        j = json.loads(x)
 | 
			
		||||
        assert j['url'].startswith('http://localhost')
 | 
			
		||||
        assert j['secret'] == 444
 | 
			
		||||
@@ -328,9 +329,5 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server):
 | 
			
		||||
        notification_url = f.read()
 | 
			
		||||
        assert 'xxx=http' in notification_url
 | 
			
		||||
 | 
			
		||||
    # Should always be automatically detected as JSON content type even when we set it as 'Text' (default)
 | 
			
		||||
    assert os.path.isfile("test-datastore/notification-content-type.txt")
 | 
			
		||||
    with open("test-datastore/notification-content-type.txt", 'r') as f:
 | 
			
		||||
        assert 'application/json' in f.read()
 | 
			
		||||
 | 
			
		||||
    os.unlink("test-datastore/notification-url.txt")
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -2,7 +2,7 @@
 | 
			
		||||
 | 
			
		||||
import time
 | 
			
		||||
from flask import url_for
 | 
			
		||||
from . util import live_server_setup
 | 
			
		||||
from .util import live_server_setup, wait_for_all_checks
 | 
			
		||||
 | 
			
		||||
from ..html_tools import *
 | 
			
		||||
 | 
			
		||||
@@ -164,6 +164,7 @@ def test_check_xpath_text_function_utf8(client, live_server):
 | 
			
		||||
    assert b'Deleted' in res.data
 | 
			
		||||
 | 
			
		||||
def test_check_markup_xpath_filter_restriction(client, live_server):
 | 
			
		||||
    live_server_setup(live_server)
 | 
			
		||||
    sleep_time_for_fetch_thread = 3
 | 
			
		||||
 | 
			
		||||
    xpath_filter = "//*[contains(@class, 'sametext')]"
 | 
			
		||||
@@ -183,7 +184,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
 | 
			
		||||
    assert b"1 Imported" in res.data
 | 
			
		||||
 | 
			
		||||
    # Give the thread time to pick it up
 | 
			
		||||
    time.sleep(sleep_time_for_fetch_thread)
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
 | 
			
		||||
    # Goto the edit page, add our ignore text
 | 
			
		||||
    # Add our URL to the import page
 | 
			
		||||
@@ -195,7 +196,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
 | 
			
		||||
    assert b"Updated watch." in res.data
 | 
			
		||||
 | 
			
		||||
    # Give the thread time to pick it up
 | 
			
		||||
    time.sleep(sleep_time_for_fetch_thread)
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
 | 
			
		||||
    # view it/reset state back to viewed
 | 
			
		||||
    client.get(url_for("diff_history_page", uuid="first"), follow_redirects=True)
 | 
			
		||||
@@ -206,7 +207,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
 | 
			
		||||
    # Trigger a check
 | 
			
		||||
    client.get(url_for("form_watch_checknow"), follow_redirects=True)
 | 
			
		||||
    # Give the thread time to pick it up
 | 
			
		||||
    time.sleep(sleep_time_for_fetch_thread)
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
 | 
			
		||||
    res = client.get(url_for("index"))
 | 
			
		||||
    assert b'unviewed' not in res.data
 | 
			
		||||
 
 | 
			
		||||
@@ -38,25 +38,7 @@ def set_modified_response():
 | 
			
		||||
        f.write(test_return_data)
 | 
			
		||||
 | 
			
		||||
    return None
 | 
			
		||||
def set_longer_modified_response():
 | 
			
		||||
    test_return_data = """<html>
 | 
			
		||||
    <head><title>modified head title</title></head>
 | 
			
		||||
    <body>
 | 
			
		||||
     Some initial text<br>
 | 
			
		||||
     <p>which has this one new line</p>
 | 
			
		||||
     <br>
 | 
			
		||||
     So let's see what happens.  <br>
 | 
			
		||||
     So let's see what happens.  <br>
 | 
			
		||||
      So let's see what happens.  <br>
 | 
			
		||||
     So let's see what happens.  <br>           
 | 
			
		||||
     </body>
 | 
			
		||||
     </html>
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    with open("test-datastore/endpoint-content.txt", "w") as f:
 | 
			
		||||
        f.write(test_return_data)
 | 
			
		||||
 | 
			
		||||
    return None
 | 
			
		||||
def set_more_modified_response():
 | 
			
		||||
    test_return_data = """<html>
 | 
			
		||||
    <head><title>modified head title</title></head>
 | 
			
		||||
@@ -205,10 +187,6 @@ def live_server_setup(live_server):
 | 
			
		||||
        with open("test-datastore/notification-url.txt", "w") as f:
 | 
			
		||||
            f.write(request.url)
 | 
			
		||||
 | 
			
		||||
        if request.content_type:
 | 
			
		||||
            with open("test-datastore/notification-content-type.txt", "w") as f:
 | 
			
		||||
                f.write(request.content_type)
 | 
			
		||||
 | 
			
		||||
        print("\n>> Test notification endpoint was hit.\n", data)
 | 
			
		||||
        return "Text was set"
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -32,17 +32,15 @@ class update_worker(threading.Thread):
 | 
			
		||||
 | 
			
		||||
        watch_history = watch.history
 | 
			
		||||
        dates = list(watch_history.keys())
 | 
			
		||||
        # Add text that was triggered
 | 
			
		||||
        snapshot_contents = watch.get_history_snapshot(dates[-1])
 | 
			
		||||
 | 
			
		||||
        # HTML needs linebreak, but MarkDown and Text can use a linefeed
 | 
			
		||||
        if n_object['notification_format'] == 'HTML':
 | 
			
		||||
            line_feed_sep = "<br>"
 | 
			
		||||
            # Snapshot will be plaintext on the disk, convert to some kind of HTML
 | 
			
		||||
            snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
 | 
			
		||||
        else:
 | 
			
		||||
            line_feed_sep = "\n"
 | 
			
		||||
 | 
			
		||||
        # Add text that was triggered
 | 
			
		||||
        snapshot_contents = watch.get_history_snapshot(dates[-1])
 | 
			
		||||
        trigger_text = watch.get('trigger_text', [])
 | 
			
		||||
        triggered_text = ''
 | 
			
		||||
 | 
			
		||||
@@ -67,48 +65,15 @@ class update_worker(threading.Thread):
 | 
			
		||||
        logging.info (">> SENDING NOTIFICATION")
 | 
			
		||||
        self.notification_q.put(n_object)
 | 
			
		||||
 | 
			
		||||
    # Prefer - Individual watch settings > Tag settings >  Global settings (in that order)
 | 
			
		||||
    def _check_cascading_vars(self, var_name, watch):
 | 
			
		||||
 | 
			
		||||
        from changedetectionio.notification import (
 | 
			
		||||
            default_notification_format_for_watch,
 | 
			
		||||
            default_notification_body,
 | 
			
		||||
            default_notification_title
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        # Would be better if this was some kind of Object where Watch can reference the parent datastore etc
 | 
			
		||||
        v = watch.get(var_name)
 | 
			
		||||
        if v and not watch.get('notification_muted'):
 | 
			
		||||
            if var_name == 'notification_format' and v == default_notification_format_for_watch:
 | 
			
		||||
                return self.datastore.data['settings']['application'].get('notification_format')
 | 
			
		||||
 | 
			
		||||
            return v
 | 
			
		||||
 | 
			
		||||
        tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))
 | 
			
		||||
        if tags:
 | 
			
		||||
            for tag_uuid, tag in tags.items():
 | 
			
		||||
                v = tag.get(var_name)
 | 
			
		||||
                if v and not tag.get('notification_muted'):
 | 
			
		||||
                    return v
 | 
			
		||||
 | 
			
		||||
        if self.datastore.data['settings']['application'].get(var_name):
 | 
			
		||||
            return self.datastore.data['settings']['application'].get(var_name)
 | 
			
		||||
 | 
			
		||||
        # Otherwise could be defaults
 | 
			
		||||
        if var_name == 'notification_format':
 | 
			
		||||
            return default_notification_format_for_watch
 | 
			
		||||
        if var_name == 'notification_body':
 | 
			
		||||
            return default_notification_body
 | 
			
		||||
        if var_name == 'notification_title':
 | 
			
		||||
            return default_notification_title
 | 
			
		||||
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def send_content_changed_notification(self, watch_uuid):
 | 
			
		||||
 | 
			
		||||
        from changedetectionio.notification import (
 | 
			
		||||
            default_notification_format_for_watch
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        n_object = {}
 | 
			
		||||
        watch = self.datastore.data['watching'].get(watch_uuid)
 | 
			
		||||
        watch = self.datastore.data['watching'].get(watch_uuid, False)
 | 
			
		||||
        if not watch:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
@@ -122,20 +87,57 @@ class update_worker(threading.Thread):
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Should be a better parent getter in the model object
 | 
			
		||||
 | 
			
		||||
        # Prefer - Individual watch settings > Tag settings >  Global settings (in that order)
 | 
			
		||||
        n_object['notification_urls'] = self._check_cascading_vars('notification_urls', watch)
 | 
			
		||||
        n_object['notification_title'] = self._check_cascading_vars('notification_title', watch)
 | 
			
		||||
        n_object['notification_body'] = self._check_cascading_vars('notification_body', watch)
 | 
			
		||||
        n_object['notification_format'] = self._check_cascading_vars('notification_format', watch)
 | 
			
		||||
        n_object['notification_urls'] = watch.get('notification_urls')
 | 
			
		||||
 | 
			
		||||
        n_object['notification_title'] = watch['notification_title'] if watch['notification_title'] else \
 | 
			
		||||
            self.datastore.data['settings']['application']['notification_title']
 | 
			
		||||
 | 
			
		||||
        n_object['notification_body'] = watch['notification_body'] if watch['notification_body'] else \
 | 
			
		||||
            self.datastore.data['settings']['application']['notification_body']
 | 
			
		||||
 | 
			
		||||
        n_object['notification_format'] = watch['notification_format'] if watch['notification_format'] != default_notification_format_for_watch else \
 | 
			
		||||
            self.datastore.data['settings']['application']['notification_format']
 | 
			
		||||
 | 
			
		||||
        # (Individual watch) Only prepare to notify if the rules above matched
 | 
			
		||||
        queued = False
 | 
			
		||||
        if n_object and n_object.get('notification_urls'):
 | 
			
		||||
            queued = True
 | 
			
		||||
        sent = False
 | 
			
		||||
        if 'notification_urls' in n_object and n_object['notification_urls']:
 | 
			
		||||
            sent = True
 | 
			
		||||
            self.queue_notification_for_watch(n_object, watch)
 | 
			
		||||
 | 
			
		||||
        return queued
 | 
			
		||||
        # (Group tags) try by group tag
 | 
			
		||||
        if not sent:
 | 
			
		||||
            # Else, Try by tag, and use system default vars for format, body etc as fallback
 | 
			
		||||
            tags = self.datastore.get_all_tags_for_watch(uuid=watch_uuid)
 | 
			
		||||
            for tag_uuid, tag in tags.items():
 | 
			
		||||
                n_object = {}
 | 
			
		||||
                n_object['notification_urls'] = tag.get('notification_urls')
 | 
			
		||||
 | 
			
		||||
                n_object['notification_title'] = tag.get('notification_title') if tag.get('notification_title') else \
 | 
			
		||||
                    self.datastore.data['settings']['application']['notification_title']
 | 
			
		||||
 | 
			
		||||
                n_object['notification_body'] = tag.get('notification_body') if tag.get('notification_body') else \
 | 
			
		||||
                    self.datastore.data['settings']['application']['notification_body']
 | 
			
		||||
 | 
			
		||||
                n_object['notification_format'] = tag.get('notification_format') if tag.get('notification_format') != default_notification_format_for_watch else \
 | 
			
		||||
                    self.datastore.data['settings']['application']['notification_format']
 | 
			
		||||
 | 
			
		||||
                if 'notification_urls' in n_object and n_object.get('notification_urls') and not tag.get('notification_muted'):
 | 
			
		||||
                    sent = True
 | 
			
		||||
                    self.queue_notification_for_watch(n_object, watch)
 | 
			
		||||
 | 
			
		||||
        # (Group tags) try by global
 | 
			
		||||
        if not sent:
 | 
			
		||||
            # leave this as is, but repeat in a loop for each tag also
 | 
			
		||||
            n_object['notification_urls'] = self.datastore.data['settings']['application'].get('notification_urls')
 | 
			
		||||
            n_object['notification_title'] = self.datastore.data['settings']['application'].get('notification_title')
 | 
			
		||||
            n_object['notification_body'] = self.datastore.data['settings']['application'].get('notification_body')
 | 
			
		||||
            n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')
 | 
			
		||||
            if n_object.get('notification_urls') and n_object.get('notification_body') and n_object.get('notification_title'):
 | 
			
		||||
                sent = True
 | 
			
		||||
                self.queue_notification_for_watch(n_object, watch)
 | 
			
		||||
 | 
			
		||||
        return sent
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def send_filter_failure_notification(self, watch_uuid):
 | 
			
		||||
 
 | 
			
		||||
@@ -42,6 +42,8 @@ paho-mqtt
 | 
			
		||||
# (introduced once apprise became a dep)
 | 
			
		||||
cryptography~=3.4
 | 
			
		||||
 | 
			
		||||
elementpath
 | 
			
		||||
 | 
			
		||||
# Used for CSS filtering
 | 
			
		||||
beautifulsoup4
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user