Compare commits

..

2 Commits

Author SHA1 Message Date
dgtlmoon
20d65cdd26 0.43.2 2023-06-30 22:57:05 +02:00
dgtlmoon
37ff5f6d37 Bug - SMTP mailto:// Notification content-type (HTML/Text) fix and add CI tests (#1660) 2023-06-30 21:35:35 +02:00
43 changed files with 408 additions and 152 deletions

View File

@@ -1,2 +1,18 @@
.git
.github
changedetectionio/processors/__pycache__
changedetectionio/api/__pycache__
changedetectionio/model/__pycache__
changedetectionio/blueprint/price_data_follower/__pycache__
changedetectionio/blueprint/tags/__pycache__
changedetectionio/blueprint/__pycache__
changedetectionio/blueprint/browser_steps/__pycache__
changedetectionio/fetchers/__pycache__
changedetectionio/tests/visualselector/__pycache__
changedetectionio/tests/restock/__pycache__
changedetectionio/tests/__pycache__
changedetectionio/tests/fetchers/__pycache__
changedetectionio/tests/unit/__pycache__
changedetectionio/tests/proxy_list/__pycache__
changedetectionio/__pycache__

View File

@@ -37,6 +37,11 @@ jobs:
# Build a changedetection.io container and start testing inside
docker build . -t test-changedetectionio
- name: Spin up ancillary SMTP+Echo message test server
run: |
# Debug SMTP server/echo message back server
docker run --network changedet-network -d -p 11025:11025 -p 11080:11080 --hostname mailserver test-changedetectionio bash -c 'python changedetectionio/tests/smtp/smtp-test-server.py'
- name: Test built container with pytest
run: |
@@ -58,11 +63,16 @@ jobs:
# Settings headers playwright tests - Call back in from Browserless, check headers
docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000?dumpio=true" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_request.py'
docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "WEBDRIVER_URL=http://selenium:4444/wd/hub" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_request.py'
docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "USE_EXPERIMENTAL_PUPPETEER_FETCH=yes" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000?dumpio=true" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_request.py'
docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "USE_EXPERIMENTAL_PUPPETEER_FETCH=yes" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000?dumpio=true" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_request.py'
# restock detection via playwright - added name=changedet here so that playwright/browserless can connect to it
docker run --rm --name "changedet" -e "FLASK_SERVER_NAME=changedet" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-port=5004 --live-server-host=0.0.0.0 tests/restock/test_restock.py'
- name: Test SMTP notification mime types
run: |
# SMTP content types - needs the 'Debug SMTP server/echo message back server' container from above
docker run --rm --network changedet-network test-changedetectionio bash -c 'cd changedetectionio;pytest tests/smtp/test_notification_smtp.py'
- name: Test with puppeteer fetcher and disk cache
run: |
docker run --rm -e "PUPPETEER_DISK_CACHE=/tmp/data/" -e "USE_EXPERIMENTAL_PUPPETEER_FETCH=yes" -e "PLAYWRIGHT_DRIVER_URL=ws://browserless:3000" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio;pytest tests/fetchers/test_content.py && pytest tests/test_errorhandling.py && pytest tests/visualselector/test_fetch_data.py'

View File

@@ -38,7 +38,7 @@ from flask_paginate import Pagination, get_page_parameter
from changedetectionio import html_tools
from changedetectionio.api import api_v1
__version__ = '0.43.1'
__version__ = '0.43.2'
datastore = None

View File

@@ -54,4 +54,5 @@ def render_diff(previous_version_file_contents, newest_version_file_contents, in
# Recursively join lists
f = lambda L: line_feed_sep.join([f(x) if type(x) is list else x for x in L])
return f(rendered_diff)
p= f(rendered_diff)
return p

View File

@@ -151,9 +151,12 @@ def process_notification(n_object, datastore):
# Apprise will default to HTML, so we need to override it
# So that whats' generated in n_body is in line with what is going to be sent.
# https://github.com/caronc/apprise/issues/633#issuecomment-1191449321
if not 'format=' in url and (n_format == 'text' or n_format == 'markdown'):
if not 'format=' in url and (n_format == 'Text' or n_format == 'Markdown'):
prefix = '?' if not '?' in url else '&'
# Apprise format is lowercase text https://github.com/caronc/apprise/issues/633
n_format = n_format.tolower()
url = "{}{}format={}".format(url, prefix, n_format)
# If n_format == HTML, then apprise email should default to text/html and we should be sending HTML only
apobj.add(url)

View File

@@ -28,6 +28,8 @@ def test_fetch_webdriver_content(client, live_server):
)
assert b"1 Imported" in res.data
time.sleep(3)
wait_for_all_checks(client)

View File

@@ -2,11 +2,10 @@
import time
from flask import url_for
from ..util import live_server_setup, wait_for_all_checks
from ..util import live_server_setup
def test_preferred_proxy(client, live_server):
time.sleep(1)
live_server_setup(live_server)
time.sleep(1)
url = "http://chosen.changedetection.io"
@@ -21,8 +20,7 @@ def test_preferred_proxy(client, live_server):
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(2)
res = client.post(
url_for("edit_page", uuid="first"),
data={
@@ -36,6 +34,5 @@ def test_preferred_proxy(client, live_server):
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(2)
# Now the request should appear in the second-squid logs

View File

@@ -16,4 +16,4 @@ def test_check_basic_change_detection_functionality(client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(3)

View File

@@ -94,7 +94,7 @@ def test_restock_detection(client, live_server):
assert b'not-in-stock' not in res.data
# We should have a notification
wait_for_all_checks(client)
time.sleep(2)
assert os.path.isfile("test-datastore/notification.txt")
os.unlink("test-datastore/notification.txt")

View File

@@ -0,0 +1,42 @@
#!/usr/bin/python3
import smtpd
import asyncore
# Accept a SMTP message and offer a way to retrieve the last message via TCP Socket
last_received_message = b"Nothing"
class CustomSMTPServer(smtpd.SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
global last_received_message
last_received_message = data
print('Receiving message from:', peer)
print('Message addressed from:', mailfrom)
print('Message addressed to :', rcpttos)
print('Message length :', len(data))
print(data.decode('utf8'))
return
# Just print out the last message received on plain TCP socket server
class EchoServer(asyncore.dispatcher):
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket()
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
def handle_accepted(self, sock, addr):
global last_received_message
print('Incoming connection from %s' % repr(addr))
sock.send(last_received_message)
last_received_message = b''
server = CustomSMTPServer(('0.0.0.0', 11025), None) # SMTP mail goes here
server2 = EchoServer('0.0.0.0', 11080) # Echo back last message received
asyncore.loop()

View File

@@ -0,0 +1,165 @@
import json
import os
import time
import re
from flask import url_for
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
wait_for_all_checks, \
set_longer_modified_response
from changedetectionio.tests.util import extract_UUID_from_client
import logging
import base64
# NOTE - RELIES ON mailserver as hostname running, see github build recipes
smtp_test_server = 'mailserver'
from changedetectionio.notification import (
default_notification_body,
default_notification_format,
default_notification_title,
valid_notification_formats,
)
def test_setup(live_server):
live_server_setup(live_server)
def get_last_message_from_smtp_server():
import socket
global smtp_test_server
port = 11080 # socket server port number
client_socket = socket.socket() # instantiate
client_socket.connect((smtp_test_server, port)) # connect to the server
data = client_socket.recv(50024).decode() # receive response
client_socket.close() # close the connection
return data
# Requires running the test SMTP server
def test_check_notification_email_formats_default_HTML(client, live_server):
# live_server_setup(live_server)
set_original_response()
global smtp_test_server
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
#####################
# Set this up for when we remove the notification from the watch, it should fallback with these details
res = client.post(
url_for("settings_page"),
data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": "fallback-body<br> " + default_notification_body,
"application-notification_format": 'HTML',
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Add a watch and trigger a HTTP POST
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("form_quick_watch_add"),
data={"url": test_url, "tags": 'nice one'},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
set_longer_modified_response()
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(3)
msg = get_last_message_from_smtp_server()
assert len(msg) >= 1
# The email should have two bodies, and the text/html part should be <br>
assert 'Content-Type: text/plain' in msg
assert '(added) So let\'s see what happens.\n' in msg # The plaintext part with \n
assert 'Content-Type: text/html' in msg
assert '(added) So let\'s see what happens.<br>' in msg # the html part
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_check_notification_email_formats_default_Text_override_HTML(client, live_server):
# live_server_setup(live_server)
# HTML problems? see this
# https://github.com/caronc/apprise/issues/633
set_original_response()
global smtp_test_server
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
#####################
# Set this up for when we remove the notification from the watch, it should fallback with these details
res = client.post(
url_for("settings_page"),
data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": default_notification_body,
"application-notification_format": 'Text',
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Add a watch and trigger a HTTP POST
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("form_quick_watch_add"),
data={"url": test_url, "tags": 'nice one'},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
set_longer_modified_response()
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(3)
msg = get_last_message_from_smtp_server()
assert len(msg) >= 1
# with open('/tmp/m.txt', 'w') as f:
# f.write(msg)
# The email should not have two bodies, should be TEXT only
assert 'Content-Type: text/plain' in msg
assert '(added) So let\'s see what happens.\n' in msg # The plaintext part with \n
set_original_response()
# Now override as HTML format
res = client.post(
url_for("edit_page", uuid="first"),
data={
"url": test_url,
"notification_format": 'HTML',
'fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(3)
msg = get_last_message_from_smtp_server()
assert len(msg) >= 1
# The email should have two bodies, and the text/html part should be <br>
assert 'Content-Type: text/plain' in msg
assert '(removed) So let\'s see what happens.\n' in msg # The plaintext part with \n
assert 'Content-Type: text/html' in msg
assert '(removed) So let\'s see what happens.<br>' in msg # the html part
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

View File

@@ -1,4 +1,4 @@
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks
from . util import live_server_setup, extract_UUID_from_client
from flask import url_for
import time
@@ -19,10 +19,10 @@ def test_check_access_control(app, client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(2)
res = client.get(url_for("form_watch_checknow"), follow_redirects=True)
assert b'1 watches queued for rechecking.' in res.data
wait_for_all_checks(client)
time.sleep(2)
# Enable password check and diff page access bypass
res = c.post(

View File

@@ -41,6 +41,7 @@ def test_setup(client, live_server):
def test_check_removed_line_contains_trigger(client, live_server):
# Give the endpoint time to spin up
time.sleep(1)
set_original()
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)

View File

@@ -268,8 +268,6 @@ def test_api_watch_PUT_update(client, live_server):
#live_server_setup(live_server)
api_key = extract_api_key_from_UI(client)
wait_for_all_checks(client)
# Create a watch
set_original_response()
test_url = url_for('test_endpoint', _external=True,
@@ -286,9 +284,6 @@ def test_api_watch_PUT_update(client, live_server):
assert res.status_code == 201
wait_for_all_checks(client)
# Get a listing, it will be the first one
res = client.get(
url_for("createwatch"),

View File

@@ -2,8 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from . util import live_server_setup
def test_basic_auth(client, live_server):
@@ -20,7 +19,7 @@ def test_basic_auth(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(1)
# Check form validation
res = client.post(
@@ -30,7 +29,7 @@ def test_basic_auth(client, live_server):
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(1)
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True

View File

@@ -2,8 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, extract_UUID_from_client, extract_api_key_from_UI, wait_for_all_checks
from .util import live_server_setup, extract_UUID_from_client, extract_api_key_from_UI
def set_response_with_ldjson():
test_return_data = """<html>
@@ -93,7 +92,7 @@ def test_check_ldjson_price_autodetect(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(3)
# Should get a notice that it's available
res = client.get(url_for("index"))
@@ -103,11 +102,11 @@ def test_check_ldjson_price_autodetect(client, live_server):
uuid = extract_UUID_from_client(client)
client.get(url_for('price_data_follower.accept', uuid=uuid, follow_redirects=True))
wait_for_all_checks(client)
time.sleep(2)
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(2)
# Offer should be gone
res = client.get(url_for("index"))
assert b'Embedded price data' not in res.data
@@ -139,7 +138,7 @@ def test_check_ldjson_price_autodetect(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(3)
res = client.get(url_for("index"))
assert b'ldjson-price-track-offer' not in res.data

View File

@@ -28,7 +28,8 @@ def test_check_basic_change_detection_functionality(client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Do this a few times.. ensures we dont accidently set the status
for n in range(3):

View File

@@ -1,6 +1,6 @@
#!/usr/bin/python3
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, live_server_setup
from flask import url_for
from urllib.request import urlopen
from zipfile import ZipFile
@@ -24,7 +24,7 @@ def test_backup(client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(3)
res = client.get(
url_for("get_backup"),

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from . util import live_server_setup
from ..html_tools import *
@@ -90,7 +90,7 @@ def test_check_markup_include_filters_restriction(client, live_server):
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Goto the edit page, add our ignore text
# Add our URL to the import page
@@ -100,7 +100,7 @@ def test_check_markup_include_filters_restriction(client, live_server):
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(1)
# Check it saved
res = client.get(
url_for("edit_page", uuid="first"),
@@ -108,14 +108,14 @@ def test_check_markup_include_filters_restriction(client, live_server):
assert bytes(include_filters.encode('utf-8')) in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Make a change
set_modified_response()
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should have 'unviewed' still
# Because it should be looking at only that 'sametext' id
@@ -139,7 +139,7 @@ def test_check_multiple_filters(client, live_server):
""")
# Give the endpoint time to spin up
wait_for_all_checks(client)
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
@@ -149,7 +149,7 @@ def test_check_multiple_filters(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(1)
# Goto the edit page, add our ignore text
# Add our URL to the import page
@@ -165,7 +165,7 @@ def test_check_multiple_filters(client, live_server):
assert b"Updated watch." in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(
url_for("preview_page", uuid="first"),

View File

@@ -5,7 +5,7 @@ import time
from flask import url_for
from ..html_tools import *
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup
def test_setup(live_server):
@@ -120,7 +120,7 @@ def test_element_removal_full(client, live_server):
url_for("import_page"), data={"urls": test_url}, follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(1)
# Goto the edit page, add the filter data
# Not sure why \r needs to be added - absent of the #changetext this is not necessary
subtractive_selectors_data = "header\r\nfooter\r\nnav\r\n#changetext"
@@ -147,7 +147,7 @@ def test_element_removal_full(client, live_server):
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# so that we set the state to 'unviewed' after all the edits
client.get(url_for("diff_history_page", uuid="first"))
@@ -159,7 +159,7 @@ def test_element_removal_full(client, live_server):
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# There should not be an unviewed change, as changes should be removed
res = client.get(url_for("index"))

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup
import pytest
@@ -39,7 +39,7 @@ def test_check_encoding_detection(client, live_server):
)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(2)
res = client.get(
url_for("preview_page", uuid="first"),
@@ -68,7 +68,7 @@ def test_check_encoding_detection_missing_content_type_header(client, live_serve
)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(2)
res = client.get(
url_for("preview_page", uuid="first"),

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from . util import live_server_setup
from ..html_tools import *
@@ -30,7 +30,7 @@ def _runner_test_http_errors(client, live_server, http_code, expected_text):
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(2)
res = client.get(url_for("index"))
# no change
@@ -76,7 +76,7 @@ def test_DNS_errors(client, live_server):
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(3)
res = client.get(url_for("index"))
found_name_resolution_error = b"Temporary failure in name resolution" in res.data or b"Name or service not known" in res.data
@@ -90,7 +90,7 @@ def test_DNS_errors(client, live_server):
def test_low_level_errors_clear_correctly(client, live_server):
#live_server_setup(live_server)
# Give the endpoint time to spin up
#time.sleep(1)
time.sleep(1)
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("<html><body><div id=here>Hello world</div></body></html>")
@@ -104,7 +104,7 @@ def test_low_level_errors_clear_correctly(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(2)
# We should see the DNS error
res = client.get(url_for("index"))
@@ -121,7 +121,7 @@ def test_low_level_errors_clear_correctly(client, live_server):
)
# Now the error should be gone
wait_for_all_checks(client)
time.sleep(2)
res = client.get(url_for("index"))
found_name_resolution_error = b"Temporary failure in name resolution" in res.data or b"Name or service not known" in res.data
assert not found_name_resolution_error

View File

@@ -24,7 +24,7 @@ def test_check_extract_text_from_diff(client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks
time.sleep(1)
# Load in 5 different numbers/changes
last_date=""

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup
from ..html_tools import *
@@ -82,7 +82,7 @@ def test_check_filter_multiline(client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(3)
# Goto the edit page, add our ignore text
# Add our URL to the import page
@@ -99,7 +99,7 @@ def test_check_filter_multiline(client, live_server):
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(3)
res = client.get(
url_for("preview_page", uuid="first"),
@@ -132,12 +132,12 @@ def test_check_filter_and_regex_extract(client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(1)
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Goto the edit page, add our ignore text
# Add our URL to the import page
@@ -156,7 +156,7 @@ def test_check_filter_and_regex_extract(client, live_server):
assert b"Updated watch." in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Make a change
set_modified_response()
@@ -164,7 +164,7 @@ def test_check_filter_and_regex_extract(client, live_server):
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should have 'unviewed' still
# Because it should be looking at only that 'sametext' id

View File

@@ -4,7 +4,7 @@
import os
import time
from flask import url_for
from .util import set_original_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, live_server_setup
from changedetectionio.model import App
@@ -62,7 +62,7 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
assert b"Watch added" in res.data
# Give the thread time to pick up the first version
wait_for_all_checks(client)
time.sleep(3)
# Goto the edit page, add our ignore text
# Add our URL to the import page
@@ -101,9 +101,6 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
# apprise takes a moment to fire
time.sleep(3)
# Shouldn't exist, shouldn't have fired
@@ -111,11 +108,8 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
# Now the filter should exist
set_response_with_filter()
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# apprise takes a moment to fire
time.sleep(3)
assert os.path.isfile("test-datastore/notification.txt")
with open("test-datastore/notification.txt", 'r') as f:

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup
def test_setup(live_server):
@@ -70,19 +70,19 @@ def test_render_anchor_tag_content_true(client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# set a new html text with a modified link
set_modified_ignore_response()
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# We should not see the rendered anchor tag
res = client.get(url_for("preview_page", uuid="first"))
@@ -104,7 +104,7 @@ def test_render_anchor_tag_content_true(client, live_server):
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)

View File

@@ -2,8 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from . util import live_server_setup
def test_setup(live_server):
live_server_setup(live_server)
@@ -51,6 +50,7 @@ def set_original_ignore_response():
# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_check_ignore_whitespace(client, live_server):
sleep_time_for_fetch_thread = 3
# Give the endpoint time to spin up
time.sleep(1)
@@ -78,17 +78,17 @@ def test_check_ignore_whitespace(client, live_server):
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
set_original_ignore_response_but_with_whitespace()
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup
# If there was only a change in the whitespacing, then we shouldnt have a change detected
@@ -24,7 +24,7 @@ def test_jinja2_in_url_query(client, live_server):
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
time.sleep(3)
# It should report nothing found (no new 'unviewed' class)
res = client.get(
url_for("preview_page", uuid="first"),

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, live_server_setup
sleep_time_for_fetch_thread = 3
@@ -35,14 +35,14 @@ def test_check_basic_change_detection_functionality(client, live_server):
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Do this a few times.. ensures we dont accidently set the status
for n in range(3):
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
@@ -64,7 +64,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
@@ -86,7 +86,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))

View File

@@ -3,7 +3,8 @@ import os
import time
import re
from flask import url_for
from .util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, wait_for_all_checks, \
set_longer_modified_response
from . util import extract_UUID_from_client
import logging
import base64
@@ -272,7 +273,7 @@ def test_notification_validation(client, live_server):
def test_notification_custom_endpoint_and_jinja2(client, live_server):
time.sleep(1)
#live_server_setup(live_server)
# test_endpoint - that sends the contents of a file
# test_notification_endpoint - that takes a POST and writes it to file (test-datastore/notification.txt)
@@ -283,12 +284,14 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server):
res = client.post(
url_for("settings_page"),
data={"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }}",
"application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444 }',
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
"application-notification_urls": test_notification_url,
data={
"application-fetch_backend": "html_requests",
"application-minutes_between_check": 180,
"application-fetch_backend": "html_requests"
"application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444 }',
"application-notification_format": default_notification_format,
"application-notification_urls": test_notification_url,
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }}",
},
follow_redirects=True
)
@@ -313,9 +316,8 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server):
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(2)
with open("test-datastore/notification.txt", 'r') as f:
x=f.read()
x = f.read()
j = json.loads(x)
assert j['url'].startswith('http://localhost')
assert j['secret'] == 444
@@ -326,5 +328,9 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server):
notification_url = f.read()
assert 'xxx=http' in notification_url
os.unlink("test-datastore/notification-url.txt")
# Should always be automatically detected as JSON content type even when we set it as 'Text' (default)
assert os.path.isfile("test-datastore/notification-content-type.txt")
with open("test-datastore/notification-content-type.txt", 'r') as f:
assert 'application/json' in f.read()
os.unlink("test-datastore/notification-url.txt")

View File

@@ -2,7 +2,7 @@ import os
import time
import re
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from . util import set_original_response, set_modified_response, live_server_setup
import logging
def test_check_notification_error_handling(client, live_server):
@@ -11,7 +11,7 @@ def test_check_notification_error_handling(client, live_server):
set_original_response()
# Give the endpoint time to spin up
time.sleep(1)
time.sleep(2)
# Set a URL and fetch it, then set a notification URL which is going to give errors
test_url = url_for('test_endpoint', _external=True)
@@ -22,7 +22,7 @@ def test_check_notification_error_handling(client, live_server):
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
time.sleep(2)
set_modified_response()
res = client.post(

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup
def set_original_ignore_response():
@@ -32,7 +32,8 @@ def test_obfuscations(client, live_server):
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(3)
# Check HTML conversion detected and workd
res = client.get(
url_for("preview_page", uuid="first"),

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, live_server_setup
sleep_time_for_fetch_thread = 3
@@ -22,7 +22,7 @@ def test_fetch_pdf(client, live_server):
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True

View File

@@ -19,9 +19,9 @@ def test_rss_and_token(client, live_server):
assert b"1 Imported" in res.data
rss_token = extract_rss_token_from_UI(client)
wait_for_all_checks(client)
time.sleep(2)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(2)
# Add our URL to the import page
res = client.get(

View File

@@ -1,5 +1,5 @@
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from . util import set_original_response, set_modified_response, live_server_setup
import time
@@ -59,7 +59,7 @@ def test_bad_access(client, live_server):
data={"url": 'file:///tasty/disk/drive', "tags": ''},
follow_redirects=True
)
wait_for_all_checks(client)
time.sleep(1)
res = client.get(url_for("index"))
assert b'file:// type access is denied for security reasons.' in res.data

View File

@@ -22,7 +22,7 @@ def test_check_basic_change_detection_functionality_source(client, live_server):
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
#####################
@@ -82,7 +82,7 @@ def test_check_ignore_elements(client, live_server):
follow_redirects=True
)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(
url_for("preview_page", uuid="first"),

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from . util import live_server_setup
def set_original_ignore_response():
@@ -64,7 +64,7 @@ def test_trigger_functionality(client, live_server):
set_original_ignore_response()
# Give the endpoint time to spin up
wait_for_all_checks(client)
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
@@ -96,7 +96,7 @@ def test_trigger_functionality(client, live_server):
assert bytes(trigger_text.encode('utf-8')) in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# so that we set the state to 'unviewed' after all the edits
client.get(url_for("diff_history_page", uuid="first"))
@@ -105,7 +105,7 @@ def test_trigger_functionality(client, live_server):
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
@@ -118,19 +118,18 @@ def test_trigger_functionality(client, live_server):
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
# Now set the content which contains the trigger text
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
set_modified_with_trigger_text_response()
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from . util import live_server_setup
def set_original_ignore_response():
@@ -43,7 +43,7 @@ def test_trigger_regex_functionality(client, live_server):
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (just a new one shouldnt have anything)
res = client.get(url_for("index"))
@@ -57,8 +57,7 @@ def test_trigger_regex_functionality(client, live_server):
"fetch_backend": "html_requests"},
follow_redirects=True
)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# so that we set the state to 'unviewed' after all the edits
client.get(url_for("diff_history_page", uuid="first"))
@@ -66,7 +65,7 @@ def test_trigger_regex_functionality(client, live_server):
f.write("some new noise")
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (nothing should match the regex)
res = client.get(url_for("index"))
@@ -76,8 +75,7 @@ def test_trigger_regex_functionality(client, live_server):
f.write("regex test123<br>\nsomething 123")
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from . util import live_server_setup
def set_original_ignore_response():
@@ -42,7 +42,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
assert b"1 Imported" in res.data
# it needs time to save the original version
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
### test regex with filter
res = client.post(
@@ -55,7 +55,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
client.get(url_for("diff_history_page", uuid="first"))
@@ -64,7 +64,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
f.write("<html>some new noise with cool stuff2 ok</html>")
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (nothing should match the regex and filter)
res = client.get(url_for("index"))
@@ -75,8 +75,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
f.write("<html>some new noise with <span id=in-here>cool stuff6</span> ok</html>")
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup
def set_original_ignore_response():
@@ -67,7 +67,7 @@ def test_unique_lines_functionality(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Add our URL to the import page
res = client.post(
@@ -83,13 +83,12 @@ def test_unique_lines_functionality(client, live_server):
# Make a change
set_modified_swapped_lines()
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from . util import live_server_setup
from ..html_tools import *
@@ -86,15 +86,14 @@ def test_check_xpath_filter_utf8(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(1)
res = client.post(
url_for("edit_page", uuid="first"),
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(3)
res = client.get(url_for("index"))
assert b'Unicode strings with encoding declaration are not supported.' not in res.data
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
@@ -141,14 +140,14 @@ def test_check_xpath_text_function_utf8(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(1)
res = client.post(
url_for("edit_page", uuid="first"),
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(3)
res = client.get(url_for("index"))
assert b'Unicode strings with encoding declaration are not supported.' not in res.data
@@ -184,7 +183,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Goto the edit page, add our ignore text
# Add our URL to the import page
@@ -196,7 +195,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
assert b"Updated watch." in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# view it/reset state back to viewed
client.get(url_for("diff_history_page", uuid="first"), follow_redirects=True)
@@ -207,7 +206,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
# Trigger a check
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
@@ -217,6 +216,9 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
def test_xpath_validation(client, live_server):
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
@@ -225,7 +227,7 @@ def test_xpath_validation(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(2)
res = client.post(
url_for("edit_page", uuid="first"),
@@ -255,7 +257,7 @@ def test_check_with_prefix_include_filters(client, live_server):
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
time.sleep(3)
res = client.post(
url_for("edit_page", uuid="first"),
@@ -264,7 +266,7 @@ def test_check_with_prefix_include_filters(client, live_server):
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(3)
res = client.get(
url_for("preview_page", uuid="first"),

View File

@@ -38,7 +38,25 @@ def set_modified_response():
f.write(test_return_data)
return None
def set_longer_modified_response():
test_return_data = """<html>
<head><title>modified head title</title></head>
<body>
Some initial text<br>
<p>which has this one new line</p>
<br>
So let's see what happens. <br>
So let's see what happens. <br>
So let's see what happens. <br>
So let's see what happens. <br>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
return None
def set_more_modified_response():
test_return_data = """<html>
<head><title>modified head title</title></head>
@@ -187,6 +205,10 @@ def live_server_setup(live_server):
with open("test-datastore/notification-url.txt", "w") as f:
f.write(request.url)
if request.content_type:
with open("test-datastore/notification-content-type.txt", "w") as f:
f.write(request.content_type)
print("\n>> Test notification endpoint was hit.\n", data)
return "Text was set"

View File

@@ -32,15 +32,17 @@ class update_worker(threading.Thread):
watch_history = watch.history
dates = list(watch_history.keys())
# Add text that was triggered
snapshot_contents = watch.get_history_snapshot(dates[-1])
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object['notification_format'] == 'HTML':
line_feed_sep = "<br>"
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
else:
line_feed_sep = "\n"
# Add text that was triggered
snapshot_contents = watch.get_history_snapshot(dates[-1])
trigger_text = watch.get('trigger_text', [])
triggered_text = ''
@@ -78,6 +80,9 @@ class update_worker(threading.Thread):
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc
v = watch.get(var_name)
if v and not watch.get('notification_muted'):
if var_name == 'notification_format' and v == default_notification_format_for_watch:
return self.datastore.data['settings']['application'].get('notification_format')
return v
tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))