mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-06 23:30:21 +00:00
Compare commits
10 Commits
elementpat
...
test-clean
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc79c60873 | ||
|
|
2cef3cb77f | ||
|
|
91f0d02129 | ||
|
|
2f777ea3bb | ||
|
|
e709201955 | ||
|
|
572f71299f | ||
|
|
5f150c4f03 | ||
|
|
7d43539eda | ||
|
|
6bdd5ab20a | ||
|
|
b29fec0d95 |
@@ -38,7 +38,7 @@ from flask_paginate import Pagination, get_page_parameter
|
|||||||
from changedetectionio import html_tools
|
from changedetectionio import html_tools
|
||||||
from changedetectionio.api import api_v1
|
from changedetectionio.api import api_v1
|
||||||
|
|
||||||
__version__ = '0.43'
|
__version__ = '0.43.1'
|
||||||
|
|
||||||
datastore = None
|
datastore = None
|
||||||
|
|
||||||
|
|||||||
@@ -219,13 +219,15 @@ class CreateWatch(Resource):
|
|||||||
|
|
||||||
extras = copy.deepcopy(json_data)
|
extras = copy.deepcopy(json_data)
|
||||||
|
|
||||||
# Because we renamed 'tag' to 'tags' but dont want to change the API (can do this in v2 of the API)
|
# Because we renamed 'tag' to 'tags' but don't want to change the API (can do this in v2 of the API)
|
||||||
|
tags = None
|
||||||
if extras.get('tag'):
|
if extras.get('tag'):
|
||||||
extras['tags'] = extras.get('tag')
|
tags = extras.get('tag')
|
||||||
|
del extras['tag']
|
||||||
|
|
||||||
del extras['url']
|
del extras['url']
|
||||||
|
|
||||||
new_uuid = self.datastore.add_watch(url=url, extras=extras)
|
new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
|
||||||
if new_uuid:
|
if new_uuid:
|
||||||
self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True}))
|
self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True}))
|
||||||
return {'uuid': new_uuid}, 201
|
return {'uuid': new_uuid}, 201
|
||||||
|
|||||||
@@ -76,6 +76,16 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
flash(f"Tag unlinked removed from {unlinked} watches")
|
flash(f"Tag unlinked removed from {unlinked} watches")
|
||||||
return redirect(url_for('tags.tags_overview_page'))
|
return redirect(url_for('tags.tags_overview_page'))
|
||||||
|
|
||||||
|
@tags_blueprint.route("/delete_all", methods=['GET'])
|
||||||
|
@login_optionally_required
|
||||||
|
def delete_all():
|
||||||
|
for watch_uuid, watch in datastore.data['watching'].items():
|
||||||
|
watch['tags'] = []
|
||||||
|
datastore.data['settings']['application']['tags'] = {}
|
||||||
|
|
||||||
|
flash(f"All tags deleted")
|
||||||
|
return redirect(url_for('tags.tags_overview_page'))
|
||||||
|
|
||||||
@tags_blueprint.route("/edit/<string:uuid>", methods=['GET'])
|
@tags_blueprint.route("/edit/<string:uuid>", methods=['GET'])
|
||||||
@login_optionally_required
|
@login_optionally_required
|
||||||
def form_tag_edit(uuid):
|
def form_tag_edit(uuid):
|
||||||
|
|||||||
@@ -85,6 +85,7 @@ class import_distill_io_json(Importer):
|
|||||||
now = time.time()
|
now = time.time()
|
||||||
self.new_uuids=[]
|
self.new_uuids=[]
|
||||||
|
|
||||||
|
# @todo Use JSONSchema like in the API to validate here.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = json.loads(data.strip())
|
data = json.loads(data.strip())
|
||||||
@@ -120,11 +121,8 @@ class import_distill_io_json(Importer):
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Does this need to be here anymore?
|
|
||||||
if d.get('tags', False):
|
|
||||||
extras['tags'] = ", ".join(d['tags'])
|
|
||||||
|
|
||||||
new_uuid = datastore.add_watch(url=d['uri'].strip(),
|
new_uuid = datastore.add_watch(url=d['uri'].strip(),
|
||||||
|
tag=",".join(d.get('tags', [])),
|
||||||
extras=extras,
|
extras=extras,
|
||||||
write_to_disk_now=False)
|
write_to_disk_now=False)
|
||||||
|
|
||||||
|
|||||||
@@ -93,6 +93,12 @@ def process_notification(n_object, datastore):
|
|||||||
valid_notification_formats[default_notification_format],
|
valid_notification_formats[default_notification_format],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# If we arrived with 'System default' then look it up
|
||||||
|
if n_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
|
||||||
|
# Initially text or whatever
|
||||||
|
n_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format])
|
||||||
|
|
||||||
|
|
||||||
# https://github.com/caronc/apprise/wiki/Development_LogCapture
|
# https://github.com/caronc/apprise/wiki/Development_LogCapture
|
||||||
# Anything higher than or equal to WARNING (which covers things like Connection errors)
|
# Anything higher than or equal to WARNING (which covers things like Connection errors)
|
||||||
# raise it as an exception
|
# raise it as an exception
|
||||||
|
|||||||
@@ -205,10 +205,9 @@ class ChangeDetectionStore:
|
|||||||
|
|
||||||
# Clone a watch by UUID
|
# Clone a watch by UUID
|
||||||
def clone(self, uuid):
|
def clone(self, uuid):
|
||||||
url = self.data['watching'][uuid]['url']
|
url = self.data['watching'][uuid].get('url')
|
||||||
tag = self.data['watching'][uuid].get('tags',[])
|
|
||||||
extras = self.data['watching'][uuid]
|
extras = self.data['watching'][uuid]
|
||||||
new_uuid = self.add_watch(url=url, tag_uuids=tag, extras=extras)
|
new_uuid = self.add_watch(url=url, extras=extras)
|
||||||
return new_uuid
|
return new_uuid
|
||||||
|
|
||||||
def url_exists(self, url):
|
def url_exists(self, url):
|
||||||
@@ -248,12 +247,9 @@ class ChangeDetectionStore:
|
|||||||
if extras is None:
|
if extras is None:
|
||||||
extras = {}
|
extras = {}
|
||||||
|
|
||||||
# should always be str
|
|
||||||
if tag is None or not tag:
|
|
||||||
tag = ''
|
|
||||||
|
|
||||||
# Incase these are copied across, assume it's a reference and deepcopy()
|
# Incase these are copied across, assume it's a reference and deepcopy()
|
||||||
apply_extras = deepcopy(extras)
|
apply_extras = deepcopy(extras)
|
||||||
|
apply_extras['tags'] = [] if not apply_extras.get('tags') else apply_extras.get('tags')
|
||||||
|
|
||||||
# Was it a share link? try to fetch the data
|
# Was it a share link? try to fetch the data
|
||||||
if (url.startswith("https://changedetection.io/share/")):
|
if (url.startswith("https://changedetection.io/share/")):
|
||||||
@@ -303,20 +299,22 @@ class ChangeDetectionStore:
|
|||||||
flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
|
flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
if tag and type(tag) == str:
|
||||||
# #Re 569
|
# Then it's probably a string of the actual tag by name, split and add it
|
||||||
# Could be in 'tags', var or extras, smash them together and strip
|
for t in tag.split(','):
|
||||||
apply_extras['tags'] = []
|
|
||||||
if tag or extras.get('tags'):
|
|
||||||
tags = list(filter(None, list(set().union(tag.split(','), extras.get('tags', '').split(',')))))
|
|
||||||
for t in list(map(str.strip, tags)):
|
|
||||||
# for each stripped tag, add tag as UUID
|
# for each stripped tag, add tag as UUID
|
||||||
apply_extras['tags'].append(self.add_tag(t))
|
for a_t in t.split(','):
|
||||||
|
tag_uuid = self.add_tag(a_t)
|
||||||
|
apply_extras['tags'].append(tag_uuid)
|
||||||
|
|
||||||
# Or if UUIDs given directly
|
# Or if UUIDs given directly
|
||||||
if tag_uuids:
|
if tag_uuids:
|
||||||
apply_extras['tags'] = list(set(apply_extras['tags'] + tag_uuids))
|
apply_extras['tags'] = list(set(apply_extras['tags'] + tag_uuids))
|
||||||
|
|
||||||
|
# Make any uuids unique
|
||||||
|
if apply_extras.get('tags'):
|
||||||
|
apply_extras['tags'] = list(set(apply_extras.get('tags')))
|
||||||
|
|
||||||
new_watch = Watch.model(datastore_path=self.datastore_path, url=url)
|
new_watch = Watch.model(datastore_path=self.datastore_path, url=url)
|
||||||
|
|
||||||
new_uuid = new_watch.get('uuid')
|
new_uuid = new_watch.get('uuid')
|
||||||
@@ -568,7 +566,7 @@ class ChangeDetectionStore:
|
|||||||
def add_tag(self, name):
|
def add_tag(self, name):
|
||||||
# If name exists, return that
|
# If name exists, return that
|
||||||
n = name.strip().lower()
|
n = name.strip().lower()
|
||||||
print (f">>> Adding new tag - '{n}")
|
print (f">>> Adding new tag - '{n}'")
|
||||||
if not n:
|
if not n:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|||||||
@@ -28,8 +28,6 @@ def test_fetch_webdriver_content(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -2,10 +2,11 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from ..util import live_server_setup
|
from ..util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def test_preferred_proxy(client, live_server):
|
def test_preferred_proxy(client, live_server):
|
||||||
time.sleep(1)
|
|
||||||
live_server_setup(live_server)
|
live_server_setup(live_server)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
url = "http://chosen.changedetection.io"
|
url = "http://chosen.changedetection.io"
|
||||||
@@ -20,7 +21,8 @@ def test_preferred_proxy(client, live_server):
|
|||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("edit_page", uuid="first"),
|
url_for("edit_page", uuid="first"),
|
||||||
data={
|
data={
|
||||||
@@ -34,5 +36,6 @@ def test_preferred_proxy(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Now the request should appear in the second-squid logs
|
# Now the request should appear in the second-squid logs
|
||||||
|
|||||||
@@ -16,4 +16,4 @@ def test_check_basic_change_detection_functionality(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|||||||
@@ -94,7 +94,7 @@ def test_restock_detection(client, live_server):
|
|||||||
assert b'not-in-stock' not in res.data
|
assert b'not-in-stock' not in res.data
|
||||||
|
|
||||||
# We should have a notification
|
# We should have a notification
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
assert os.path.isfile("test-datastore/notification.txt")
|
assert os.path.isfile("test-datastore/notification.txt")
|
||||||
os.unlink("test-datastore/notification.txt")
|
os.unlink("test-datastore/notification.txt")
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from . util import live_server_setup, extract_UUID_from_client
|
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
import time
|
import time
|
||||||
|
|
||||||
@@ -19,10 +19,10 @@ def test_check_access_control(app, client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
res = client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
res = client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
assert b'1 watches queued for rechecking.' in res.data
|
assert b'1 watches queued for rechecking.' in res.data
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Enable password check and diff page access bypass
|
# Enable password check and diff page access bypass
|
||||||
res = c.post(
|
res = c.post(
|
||||||
|
|||||||
@@ -41,7 +41,6 @@ def test_setup(client, live_server):
|
|||||||
def test_check_removed_line_contains_trigger(client, live_server):
|
def test_check_removed_line_contains_trigger(client, live_server):
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
# Give the endpoint time to spin up
|
||||||
time.sleep(1)
|
|
||||||
set_original()
|
set_original()
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
|||||||
@@ -267,7 +267,9 @@ def test_api_watch_PUT_update(client, live_server):
|
|||||||
|
|
||||||
#live_server_setup(live_server)
|
#live_server_setup(live_server)
|
||||||
api_key = extract_api_key_from_UI(client)
|
api_key = extract_api_key_from_UI(client)
|
||||||
time.sleep(1)
|
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Create a watch
|
# Create a watch
|
||||||
set_original_response()
|
set_original_response()
|
||||||
test_url = url_for('test_endpoint', _external=True,
|
test_url = url_for('test_endpoint', _external=True,
|
||||||
@@ -283,7 +285,9 @@ def test_api_watch_PUT_update(client, live_server):
|
|||||||
|
|
||||||
assert res.status_code == 201
|
assert res.status_code == 201
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
|
||||||
# Get a listing, it will be the first one
|
# Get a listing, it will be the first one
|
||||||
res = client.get(
|
res = client.get(
|
||||||
|
|||||||
@@ -2,7 +2,8 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def test_basic_auth(client, live_server):
|
def test_basic_auth(client, live_server):
|
||||||
|
|
||||||
@@ -19,7 +20,7 @@ def test_basic_auth(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Check form validation
|
# Check form validation
|
||||||
res = client.post(
|
res = client.post(
|
||||||
@@ -29,7 +30,7 @@ def test_basic_auth(client, live_server):
|
|||||||
)
|
)
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
|
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
|
|||||||
@@ -2,7 +2,8 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup, extract_UUID_from_client, extract_api_key_from_UI
|
from .util import live_server_setup, extract_UUID_from_client, extract_api_key_from_UI, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def set_response_with_ldjson():
|
def set_response_with_ldjson():
|
||||||
test_return_data = """<html>
|
test_return_data = """<html>
|
||||||
@@ -92,7 +93,7 @@ def test_check_ldjson_price_autodetect(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Should get a notice that it's available
|
# Should get a notice that it's available
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
@@ -102,11 +103,11 @@ def test_check_ldjson_price_autodetect(client, live_server):
|
|||||||
uuid = extract_UUID_from_client(client)
|
uuid = extract_UUID_from_client(client)
|
||||||
|
|
||||||
client.get(url_for('price_data_follower.accept', uuid=uuid, follow_redirects=True))
|
client.get(url_for('price_data_follower.accept', uuid=uuid, follow_redirects=True))
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
# Offer should be gone
|
# Offer should be gone
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'Embedded price data' not in res.data
|
assert b'Embedded price data' not in res.data
|
||||||
@@ -138,7 +139,7 @@ def test_check_ldjson_price_autodetect(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'ldjson-price-track-offer' not in res.data
|
assert b'ldjson-price-track-offer' not in res.data
|
||||||
|
|
||||||
|
|||||||
@@ -28,8 +28,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
wait_for_all_checks(client)
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
|
||||||
|
|
||||||
# Do this a few times.. ensures we dont accidently set the status
|
# Do this a few times.. ensures we dont accidently set the status
|
||||||
for n in range(3):
|
for n in range(3):
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
|
|
||||||
from .util import set_original_response, set_modified_response, live_server_setup
|
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from urllib.request import urlopen
|
from urllib.request import urlopen
|
||||||
from zipfile import ZipFile
|
from zipfile import ZipFile
|
||||||
@@ -24,7 +24,7 @@ def test_backup(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("get_backup"),
|
url_for("get_backup"),
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
from ..html_tools import *
|
from ..html_tools import *
|
||||||
|
|
||||||
@@ -90,7 +90,7 @@ def test_check_markup_include_filters_restriction(client, live_server):
|
|||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Goto the edit page, add our ignore text
|
# Goto the edit page, add our ignore text
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
@@ -100,7 +100,7 @@ def test_check_markup_include_filters_restriction(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
# Check it saved
|
# Check it saved
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("edit_page", uuid="first"),
|
url_for("edit_page", uuid="first"),
|
||||||
@@ -108,14 +108,14 @@ def test_check_markup_include_filters_restriction(client, live_server):
|
|||||||
assert bytes(include_filters.encode('utf-8')) in res.data
|
assert bytes(include_filters.encode('utf-8')) in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
# Make a change
|
# Make a change
|
||||||
set_modified_response()
|
set_modified_response()
|
||||||
|
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should have 'unviewed' still
|
# It should have 'unviewed' still
|
||||||
# Because it should be looking at only that 'sametext' id
|
# Because it should be looking at only that 'sametext' id
|
||||||
@@ -139,7 +139,7 @@ def test_check_multiple_filters(client, live_server):
|
|||||||
""")
|
""")
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
# Give the endpoint time to spin up
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
@@ -149,7 +149,7 @@ def test_check_multiple_filters(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Goto the edit page, add our ignore text
|
# Goto the edit page, add our ignore text
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
@@ -165,7 +165,7 @@ def test_check_multiple_filters(client, live_server):
|
|||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import time
|
|||||||
from flask import url_for
|
from flask import url_for
|
||||||
|
|
||||||
from ..html_tools import *
|
from ..html_tools import *
|
||||||
from .util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def test_setup(live_server):
|
def test_setup(live_server):
|
||||||
@@ -120,7 +120,7 @@ def test_element_removal_full(client, live_server):
|
|||||||
url_for("import_page"), data={"urls": test_url}, follow_redirects=True
|
url_for("import_page"), data={"urls": test_url}, follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
# Goto the edit page, add the filter data
|
# Goto the edit page, add the filter data
|
||||||
# Not sure why \r needs to be added - absent of the #changetext this is not necessary
|
# Not sure why \r needs to be added - absent of the #changetext this is not necessary
|
||||||
subtractive_selectors_data = "header\r\nfooter\r\nnav\r\n#changetext"
|
subtractive_selectors_data = "header\r\nfooter\r\nnav\r\n#changetext"
|
||||||
@@ -147,7 +147,7 @@ def test_element_removal_full(client, live_server):
|
|||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# so that we set the state to 'unviewed' after all the edits
|
# so that we set the state to 'unviewed' after all the edits
|
||||||
client.get(url_for("diff_history_page", uuid="first"))
|
client.get(url_for("diff_history_page", uuid="first"))
|
||||||
@@ -159,7 +159,7 @@ def test_element_removal_full(client, live_server):
|
|||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# There should not be an unviewed change, as changes should be removed
|
# There should not be an unviewed change, as changes should be removed
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
@@ -39,7 +39,7 @@ def test_check_encoding_detection(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
@@ -68,7 +68,7 @@ def test_check_encoding_detection_missing_content_type_header(client, live_serve
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
from ..html_tools import *
|
from ..html_tools import *
|
||||||
|
|
||||||
@@ -30,7 +30,7 @@ def _runner_test_http_errors(client, live_server, http_code, expected_text):
|
|||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
# no change
|
# no change
|
||||||
@@ -76,7 +76,7 @@ def test_DNS_errors(client, live_server):
|
|||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
found_name_resolution_error = b"Temporary failure in name resolution" in res.data or b"Name or service not known" in res.data
|
found_name_resolution_error = b"Temporary failure in name resolution" in res.data or b"Name or service not known" in res.data
|
||||||
@@ -90,7 +90,7 @@ def test_DNS_errors(client, live_server):
|
|||||||
def test_low_level_errors_clear_correctly(client, live_server):
|
def test_low_level_errors_clear_correctly(client, live_server):
|
||||||
#live_server_setup(live_server)
|
#live_server_setup(live_server)
|
||||||
# Give the endpoint time to spin up
|
# Give the endpoint time to spin up
|
||||||
time.sleep(1)
|
#time.sleep(1)
|
||||||
|
|
||||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||||
f.write("<html><body><div id=here>Hello world</div></body></html>")
|
f.write("<html><body><div id=here>Hello world</div></body></html>")
|
||||||
@@ -104,7 +104,7 @@ def test_low_level_errors_clear_correctly(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# We should see the DNS error
|
# We should see the DNS error
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
@@ -121,7 +121,7 @@ def test_low_level_errors_clear_correctly(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Now the error should be gone
|
# Now the error should be gone
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
found_name_resolution_error = b"Temporary failure in name resolution" in res.data or b"Name or service not known" in res.data
|
found_name_resolution_error = b"Temporary failure in name resolution" in res.data or b"Name or service not known" in res.data
|
||||||
assert not found_name_resolution_error
|
assert not found_name_resolution_error
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ def test_check_extract_text_from_diff(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(1)
|
wait_for_all_checks
|
||||||
|
|
||||||
# Load in 5 different numbers/changes
|
# Load in 5 different numbers/changes
|
||||||
last_date=""
|
last_date=""
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
from ..html_tools import *
|
from ..html_tools import *
|
||||||
|
|
||||||
@@ -82,7 +82,7 @@ def test_check_filter_multiline(client, live_server):
|
|||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Goto the edit page, add our ignore text
|
# Goto the edit page, add our ignore text
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
@@ -99,7 +99,7 @@ def test_check_filter_multiline(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
@@ -132,12 +132,12 @@ def test_check_filter_and_regex_extract(client, live_server):
|
|||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Goto the edit page, add our ignore text
|
# Goto the edit page, add our ignore text
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
@@ -156,7 +156,7 @@ def test_check_filter_and_regex_extract(client, live_server):
|
|||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Make a change
|
# Make a change
|
||||||
set_modified_response()
|
set_modified_response()
|
||||||
@@ -164,7 +164,7 @@ def test_check_filter_and_regex_extract(client, live_server):
|
|||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should have 'unviewed' still
|
# It should have 'unviewed' still
|
||||||
# Because it should be looking at only that 'sametext' id
|
# Because it should be looking at only that 'sametext' id
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import set_original_response, live_server_setup
|
from .util import set_original_response, live_server_setup, wait_for_all_checks
|
||||||
from changedetectionio.model import App
|
from changedetectionio.model import App
|
||||||
|
|
||||||
|
|
||||||
@@ -62,7 +62,7 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
|
|||||||
assert b"Watch added" in res.data
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
# Give the thread time to pick up the first version
|
# Give the thread time to pick up the first version
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Goto the edit page, add our ignore text
|
# Goto the edit page, add our ignore text
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
@@ -101,6 +101,9 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# apprise takes a moment to fire
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
|
|
||||||
# Shouldn't exist, shouldn't have fired
|
# Shouldn't exist, shouldn't have fired
|
||||||
@@ -108,8 +111,11 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
|
|||||||
# Now the filter should exist
|
# Now the filter should exist
|
||||||
set_response_with_filter()
|
set_response_with_filter()
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
|
||||||
|
# apprise takes a moment to fire
|
||||||
|
time.sleep(3)
|
||||||
assert os.path.isfile("test-datastore/notification.txt")
|
assert os.path.isfile("test-datastore/notification.txt")
|
||||||
|
|
||||||
with open("test-datastore/notification.txt", 'r') as f:
|
with open("test-datastore/notification.txt", 'r') as f:
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name
|
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, extract_UUID_from_client
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
@@ -154,6 +154,10 @@ def test_tag_add_in_ui(client, live_server):
|
|||||||
)
|
)
|
||||||
assert b"Tag added" in res.data
|
assert b"Tag added" in res.data
|
||||||
assert b"new-test-tag" in res.data
|
assert b"new-test-tag" in res.data
|
||||||
|
|
||||||
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||||
|
assert b'All tags deleted' in res.data
|
||||||
|
|
||||||
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
||||||
assert b'Deleted' in res.data
|
assert b'Deleted' in res.data
|
||||||
|
|
||||||
@@ -219,12 +223,10 @@ def test_group_tag_notification(client, live_server):
|
|||||||
assert "test-tag" in notification_submission
|
assert "test-tag" in notification_submission
|
||||||
assert "other-tag" in notification_submission
|
assert "other-tag" in notification_submission
|
||||||
|
|
||||||
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
|
||||||
assert b'Deleted' in res.data
|
|
||||||
|
|
||||||
#@todo Test that multiple notifications fired
|
#@todo Test that multiple notifications fired
|
||||||
#@todo Test that each of multiple notifications with different settings
|
#@todo Test that each of multiple notifications with different settings
|
||||||
|
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
||||||
|
assert b'Deleted' in res.data
|
||||||
|
|
||||||
def test_limit_tag_ui(client, live_server):
|
def test_limit_tag_ui(client, live_server):
|
||||||
#live_server_setup(live_server)
|
#live_server_setup(live_server)
|
||||||
@@ -260,3 +262,61 @@ def test_limit_tag_ui(client, live_server):
|
|||||||
assert b'test-tag' in res.data
|
assert b'test-tag' in res.data
|
||||||
assert res.data.count(b'processor-text_json_diff') == 20
|
assert res.data.count(b'processor-text_json_diff') == 20
|
||||||
assert b"object at" not in res.data
|
assert b"object at" not in res.data
|
||||||
|
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
||||||
|
assert b'Deleted' in res.data
|
||||||
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||||
|
assert b'All tags deleted' in res.data
|
||||||
|
def test_clone_tag_on_import(client, live_server):
|
||||||
|
#live_server_setup(live_server)
|
||||||
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
res = client.post(
|
||||||
|
url_for("import_page"),
|
||||||
|
data={"urls": test_url + " test-tag, another-tag\r\n"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
|
res = client.get(url_for("index"))
|
||||||
|
assert b'test-tag' in res.data
|
||||||
|
assert b'another-tag' in res.data
|
||||||
|
|
||||||
|
watch_uuid = extract_UUID_from_client(client)
|
||||||
|
res = client.get(url_for("form_clone", uuid=watch_uuid), follow_redirects=True)
|
||||||
|
|
||||||
|
assert b'Cloned' in res.data
|
||||||
|
# 2 times plus the top link to tag
|
||||||
|
assert res.data.count(b'test-tag') == 3
|
||||||
|
assert res.data.count(b'another-tag') == 3
|
||||||
|
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
||||||
|
assert b'Deleted' in res.data
|
||||||
|
|
||||||
|
def test_clone_tag_on_quickwatchform_add(client, live_server):
|
||||||
|
#live_server_setup(live_server)
|
||||||
|
|
||||||
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
|
||||||
|
res = client.post(
|
||||||
|
url_for("form_quick_watch_add"),
|
||||||
|
data={"url": test_url, "tags": ' test-tag, another-tag '},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
|
res = client.get(url_for("index"))
|
||||||
|
assert b'test-tag' in res.data
|
||||||
|
assert b'another-tag' in res.data
|
||||||
|
|
||||||
|
watch_uuid = extract_UUID_from_client(client)
|
||||||
|
res = client.get(url_for("form_clone", uuid=watch_uuid), follow_redirects=True)
|
||||||
|
|
||||||
|
assert b'Cloned' in res.data
|
||||||
|
# 2 times plus the top link to tag
|
||||||
|
assert res.data.count(b'test-tag') == 3
|
||||||
|
assert res.data.count(b'another-tag') == 3
|
||||||
|
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
||||||
|
assert b'Deleted' in res.data
|
||||||
|
|
||||||
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||||
|
assert b'All tags deleted' in res.data
|
||||||
@@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def test_setup(live_server):
|
def test_setup(live_server):
|
||||||
@@ -70,19 +70,19 @@ def test_render_anchor_tag_content_true(client, live_server):
|
|||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# set a new html text with a modified link
|
# set a new html text with a modified link
|
||||||
set_modified_ignore_response()
|
set_modified_ignore_response()
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# We should not see the rendered anchor tag
|
# We should not see the rendered anchor tag
|
||||||
res = client.get(url_for("preview_page", uuid="first"))
|
res = client.get(url_for("preview_page", uuid="first"))
|
||||||
@@ -104,7 +104,7 @@ def test_render_anchor_tag_content_true(client, live_server):
|
|||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,8 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def test_setup(live_server):
|
def test_setup(live_server):
|
||||||
live_server_setup(live_server)
|
live_server_setup(live_server)
|
||||||
@@ -50,7 +51,6 @@ def set_original_ignore_response():
|
|||||||
|
|
||||||
# If there was only a change in the whitespacing, then we shouldnt have a change detected
|
# If there was only a change in the whitespacing, then we shouldnt have a change detected
|
||||||
def test_check_ignore_whitespace(client, live_server):
|
def test_check_ignore_whitespace(client, live_server):
|
||||||
sleep_time_for_fetch_thread = 3
|
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
# Give the endpoint time to spin up
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
@@ -78,17 +78,17 @@ def test_check_ignore_whitespace(client, live_server):
|
|||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
set_original_ignore_response_but_with_whitespace()
|
set_original_ignore_response_but_with_whitespace()
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (no new 'unviewed' class)
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
|
|||||||
@@ -112,6 +112,7 @@ def test_import_distillio(client, live_server):
|
|||||||
# did the tags work?
|
# did the tags work?
|
||||||
res = client.get( url_for("index"))
|
res = client.get( url_for("index"))
|
||||||
|
|
||||||
|
# check tags
|
||||||
assert b"nice stuff" in res.data
|
assert b"nice stuff" in res.data
|
||||||
assert b"nerd-news" in res.data
|
assert b"nerd-news" in res.data
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
# If there was only a change in the whitespacing, then we shouldnt have a change detected
|
# If there was only a change in the whitespacing, then we shouldnt have a change detected
|
||||||
@@ -24,7 +24,7 @@ def test_jinja2_in_url_query(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"Watch added" in res.data
|
assert b"Watch added" in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
# It should report nothing found (no new 'unviewed' class)
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from urllib.request import urlopen
|
from urllib.request import urlopen
|
||||||
from .util import set_original_response, set_modified_response, live_server_setup
|
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
sleep_time_for_fetch_thread = 3
|
sleep_time_for_fetch_thread = 3
|
||||||
|
|
||||||
@@ -35,14 +35,14 @@ def test_check_basic_change_detection_functionality(client, live_server):
|
|||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Do this a few times.. ensures we dont accidently set the status
|
# Do this a few times.. ensures we dont accidently set the status
|
||||||
for n in range(3):
|
for n in range(3):
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (no new 'unviewed' class)
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
@@ -64,7 +64,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
|
|||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (no new 'unviewed' class)
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
@@ -86,7 +86,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
|
|||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (no new 'unviewed' class)
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
|
|||||||
@@ -24,9 +24,6 @@ def test_check_notification(client, live_server):
|
|||||||
#live_server_setup(live_server)
|
#live_server_setup(live_server)
|
||||||
set_original_response()
|
set_original_response()
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Re 360 - new install should have defaults set
|
# Re 360 - new install should have defaults set
|
||||||
res = client.get(url_for("settings_page"))
|
res = client.get(url_for("settings_page"))
|
||||||
notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'json')
|
notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'json')
|
||||||
@@ -142,8 +139,7 @@ def test_check_notification(client, live_server):
|
|||||||
|
|
||||||
# Did we see the URL that had a change, in the notification?
|
# Did we see the URL that had a change, in the notification?
|
||||||
# Diff was correctly executed
|
# Diff was correctly executed
|
||||||
assert test_url in notification_submission
|
|
||||||
assert ':-)' in notification_submission
|
|
||||||
assert "Diff Full: Some initial text" in notification_submission
|
assert "Diff Full: Some initial text" in notification_submission
|
||||||
assert "Diff: (changed) Which is across multiple lines" in notification_submission
|
assert "Diff: (changed) Which is across multiple lines" in notification_submission
|
||||||
assert "(into) which has this one new line" in notification_submission
|
assert "(into) which has this one new line" in notification_submission
|
||||||
@@ -156,7 +152,8 @@ def test_check_notification(client, live_server):
|
|||||||
assert "preview/" in notification_submission
|
assert "preview/" in notification_submission
|
||||||
assert ":-)" in notification_submission
|
assert ":-)" in notification_submission
|
||||||
assert "New ChangeDetection.io Notification - {}".format(test_url) in notification_submission
|
assert "New ChangeDetection.io Notification - {}".format(test_url) in notification_submission
|
||||||
|
assert test_url in notification_submission
|
||||||
|
assert ':-)' in notification_submission
|
||||||
# Check the attachment was added, and that it is a JPEG from the original PNG
|
# Check the attachment was added, and that it is a JPEG from the original PNG
|
||||||
notification_submission_object = json.loads(notification_submission)
|
notification_submission_object = json.loads(notification_submission)
|
||||||
# We keep PNG screenshots for now
|
# We keep PNG screenshots for now
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import os
|
|||||||
import time
|
import time
|
||||||
import re
|
import re
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import set_original_response, set_modified_response, live_server_setup
|
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
def test_check_notification_error_handling(client, live_server):
|
def test_check_notification_error_handling(client, live_server):
|
||||||
@@ -11,7 +11,7 @@ def test_check_notification_error_handling(client, live_server):
|
|||||||
set_original_response()
|
set_original_response()
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
# Give the endpoint time to spin up
|
||||||
time.sleep(2)
|
time.sleep(1)
|
||||||
|
|
||||||
# Set a URL and fetch it, then set a notification URL which is going to give errors
|
# Set a URL and fetch it, then set a notification URL which is going to give errors
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
@@ -22,7 +22,7 @@ def test_check_notification_error_handling(client, live_server):
|
|||||||
)
|
)
|
||||||
assert b"Watch added" in res.data
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
set_modified_response()
|
set_modified_response()
|
||||||
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def set_original_ignore_response():
|
def set_original_ignore_response():
|
||||||
@@ -32,8 +32,7 @@ def test_obfuscations(client, live_server):
|
|||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Check HTML conversion detected and workd
|
# Check HTML conversion detected and workd
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import set_original_response, set_modified_response, live_server_setup
|
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
sleep_time_for_fetch_thread = 3
|
sleep_time_for_fetch_thread = 3
|
||||||
|
|
||||||
@@ -22,7 +22,7 @@ def test_fetch_pdf(client, live_server):
|
|||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
|
|||||||
@@ -19,9 +19,9 @@ def test_rss_and_token(client, live_server):
|
|||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
rss_token = extract_rss_token_from_UI(client)
|
rss_token = extract_rss_token_from_UI(client)
|
||||||
|
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
res = client.get(
|
res = client.get(
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import set_original_response, set_modified_response, live_server_setup
|
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
@@ -59,7 +59,7 @@ def test_bad_access(client, live_server):
|
|||||||
data={"url": 'file:///tasty/disk/drive', "tags": ''},
|
data={"url": 'file:///tasty/disk/drive', "tags": ''},
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
|
|
||||||
assert b'file:// type access is denied for security reasons.' in res.data
|
assert b'file:// type access is denied for security reasons.' in res.data
|
||||||
@@ -22,7 +22,7 @@ def test_check_basic_change_detection_functionality_source(client, live_server):
|
|||||||
|
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
#####################
|
#####################
|
||||||
|
|
||||||
@@ -82,7 +82,7 @@ def test_check_ignore_elements(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
|
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def set_original_ignore_response():
|
def set_original_ignore_response():
|
||||||
@@ -64,7 +64,7 @@ def test_trigger_functionality(client, live_server):
|
|||||||
set_original_ignore_response()
|
set_original_ignore_response()
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
# Give the endpoint time to spin up
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
@@ -96,7 +96,7 @@ def test_trigger_functionality(client, live_server):
|
|||||||
assert bytes(trigger_text.encode('utf-8')) in res.data
|
assert bytes(trigger_text.encode('utf-8')) in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# so that we set the state to 'unviewed' after all the edits
|
# so that we set the state to 'unviewed' after all the edits
|
||||||
client.get(url_for("diff_history_page", uuid="first"))
|
client.get(url_for("diff_history_page", uuid="first"))
|
||||||
@@ -105,7 +105,7 @@ def test_trigger_functionality(client, live_server):
|
|||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (no new 'unviewed' class)
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
@@ -118,18 +118,19 @@ def test_trigger_functionality(client, live_server):
|
|||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (no new 'unviewed' class)
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'unviewed' not in res.data
|
assert b'unviewed' not in res.data
|
||||||
|
|
||||||
# Now set the content which contains the trigger text
|
# Now set the content which contains the trigger text
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
set_modified_with_trigger_text_response()
|
set_modified_with_trigger_text_response()
|
||||||
|
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'unviewed' in res.data
|
assert b'unviewed' in res.data
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def set_original_ignore_response():
|
def set_original_ignore_response():
|
||||||
@@ -43,7 +43,7 @@ def test_trigger_regex_functionality(client, live_server):
|
|||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (just a new one shouldnt have anything)
|
# It should report nothing found (just a new one shouldnt have anything)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
@@ -57,7 +57,8 @@ def test_trigger_regex_functionality(client, live_server):
|
|||||||
"fetch_backend": "html_requests"},
|
"fetch_backend": "html_requests"},
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# so that we set the state to 'unviewed' after all the edits
|
# so that we set the state to 'unviewed' after all the edits
|
||||||
client.get(url_for("diff_history_page", uuid="first"))
|
client.get(url_for("diff_history_page", uuid="first"))
|
||||||
|
|
||||||
@@ -65,7 +66,7 @@ def test_trigger_regex_functionality(client, live_server):
|
|||||||
f.write("some new noise")
|
f.write("some new noise")
|
||||||
|
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (nothing should match the regex)
|
# It should report nothing found (nothing should match the regex)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
@@ -75,7 +76,8 @@ def test_trigger_regex_functionality(client, live_server):
|
|||||||
f.write("regex test123<br>\nsomething 123")
|
f.write("regex test123<br>\nsomething 123")
|
||||||
|
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'unviewed' in res.data
|
assert b'unviewed' in res.data
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def set_original_ignore_response():
|
def set_original_ignore_response():
|
||||||
@@ -42,7 +42,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
|
|||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
# it needs time to save the original version
|
# it needs time to save the original version
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
### test regex with filter
|
### test regex with filter
|
||||||
res = client.post(
|
res = client.post(
|
||||||
@@ -55,7 +55,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
client.get(url_for("diff_history_page", uuid="first"))
|
client.get(url_for("diff_history_page", uuid="first"))
|
||||||
|
|
||||||
@@ -64,7 +64,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
|
|||||||
f.write("<html>some new noise with cool stuff2 ok</html>")
|
f.write("<html>some new noise with cool stuff2 ok</html>")
|
||||||
|
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (nothing should match the regex and filter)
|
# It should report nothing found (nothing should match the regex and filter)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
@@ -75,7 +75,8 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
|
|||||||
f.write("<html>some new noise with <span id=in-here>cool stuff6</span> ok</html>")
|
f.write("<html>some new noise with <span id=in-here>cool stuff6</span> ok</html>")
|
||||||
|
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'unviewed' in res.data
|
assert b'unviewed' in res.data
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def set_original_ignore_response():
|
def set_original_ignore_response():
|
||||||
@@ -67,7 +67,7 @@ def test_unique_lines_functionality(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
res = client.post(
|
res = client.post(
|
||||||
@@ -83,12 +83,13 @@ def test_unique_lines_functionality(client, live_server):
|
|||||||
# Make a change
|
# Make a change
|
||||||
set_modified_swapped_lines()
|
set_modified_swapped_lines()
|
||||||
|
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# It should report nothing found (no new 'unviewed' class)
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
from ..html_tools import *
|
from ..html_tools import *
|
||||||
|
|
||||||
@@ -86,14 +86,15 @@ def test_check_xpath_filter_utf8(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("edit_page", uuid="first"),
|
url_for("edit_page", uuid="first"),
|
||||||
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
|
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'Unicode strings with encoding declaration are not supported.' not in res.data
|
assert b'Unicode strings with encoding declaration are not supported.' not in res.data
|
||||||
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
||||||
@@ -140,14 +141,14 @@ def test_check_xpath_text_function_utf8(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(1)
|
wait_for_all_checks(client)
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("edit_page", uuid="first"),
|
url_for("edit_page", uuid="first"),
|
||||||
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
|
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'Unicode strings with encoding declaration are not supported.' not in res.data
|
assert b'Unicode strings with encoding declaration are not supported.' not in res.data
|
||||||
|
|
||||||
@@ -183,7 +184,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
|
|||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Goto the edit page, add our ignore text
|
# Goto the edit page, add our ignore text
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
@@ -195,7 +196,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
|
|||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# view it/reset state back to viewed
|
# view it/reset state back to viewed
|
||||||
client.get(url_for("diff_history_page", uuid="first"), follow_redirects=True)
|
client.get(url_for("diff_history_page", uuid="first"), follow_redirects=True)
|
||||||
@@ -206,7 +207,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
|
|||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("form_watch_checknow"), follow_redirects=True)
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
time.sleep(sleep_time_for_fetch_thread)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(url_for("index"))
|
res = client.get(url_for("index"))
|
||||||
assert b'unviewed' not in res.data
|
assert b'unviewed' not in res.data
|
||||||
@@ -216,9 +217,6 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
|
|||||||
|
|
||||||
def test_xpath_validation(client, live_server):
|
def test_xpath_validation(client, live_server):
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
res = client.post(
|
res = client.post(
|
||||||
@@ -227,7 +225,7 @@ def test_xpath_validation(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(2)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("edit_page", uuid="first"),
|
url_for("edit_page", uuid="first"),
|
||||||
@@ -257,7 +255,7 @@ def test_check_with_prefix_include_filters(client, live_server):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"1 Imported" in res.data
|
assert b"1 Imported" in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("edit_page", uuid="first"),
|
url_for("edit_page", uuid="first"),
|
||||||
@@ -266,7 +264,7 @@ def test_check_with_prefix_include_filters(client, live_server):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
time.sleep(3)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("preview_page", uuid="first"),
|
url_for("preview_page", uuid="first"),
|
||||||
|
|||||||
@@ -65,15 +65,45 @@ class update_worker(threading.Thread):
|
|||||||
logging.info (">> SENDING NOTIFICATION")
|
logging.info (">> SENDING NOTIFICATION")
|
||||||
self.notification_q.put(n_object)
|
self.notification_q.put(n_object)
|
||||||
|
|
||||||
|
# Prefer - Individual watch settings > Tag settings > Global settings (in that order)
|
||||||
|
def _check_cascading_vars(self, var_name, watch):
|
||||||
|
|
||||||
|
from changedetectionio.notification import (
|
||||||
|
default_notification_format_for_watch,
|
||||||
|
default_notification_body,
|
||||||
|
default_notification_title
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc
|
||||||
|
v = watch.get(var_name)
|
||||||
|
if v and not watch.get('notification_muted'):
|
||||||
|
return v
|
||||||
|
|
||||||
|
tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))
|
||||||
|
if tags:
|
||||||
|
for tag_uuid, tag in tags.items():
|
||||||
|
v = tag.get(var_name)
|
||||||
|
if v and not tag.get('notification_muted'):
|
||||||
|
return v
|
||||||
|
|
||||||
|
if self.datastore.data['settings']['application'].get(var_name):
|
||||||
|
return self.datastore.data['settings']['application'].get(var_name)
|
||||||
|
|
||||||
|
# Otherwise could be defaults
|
||||||
|
if var_name == 'notification_format':
|
||||||
|
return default_notification_format_for_watch
|
||||||
|
if var_name == 'notification_body':
|
||||||
|
return default_notification_body
|
||||||
|
if var_name == 'notification_title':
|
||||||
|
return default_notification_title
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
def send_content_changed_notification(self, watch_uuid):
|
def send_content_changed_notification(self, watch_uuid):
|
||||||
|
|
||||||
from changedetectionio.notification import (
|
|
||||||
default_notification_format_for_watch
|
|
||||||
)
|
|
||||||
|
|
||||||
n_object = {}
|
n_object = {}
|
||||||
watch = self.datastore.data['watching'].get(watch_uuid, False)
|
watch = self.datastore.data['watching'].get(watch_uuid)
|
||||||
if not watch:
|
if not watch:
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -87,57 +117,20 @@ class update_worker(threading.Thread):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Should be a better parent getter in the model object
|
# Should be a better parent getter in the model object
|
||||||
|
|
||||||
# Prefer - Individual watch settings > Tag settings > Global settings (in that order)
|
# Prefer - Individual watch settings > Tag settings > Global settings (in that order)
|
||||||
n_object['notification_urls'] = watch.get('notification_urls')
|
n_object['notification_urls'] = self._check_cascading_vars('notification_urls', watch)
|
||||||
|
n_object['notification_title'] = self._check_cascading_vars('notification_title', watch)
|
||||||
n_object['notification_title'] = watch['notification_title'] if watch['notification_title'] else \
|
n_object['notification_body'] = self._check_cascading_vars('notification_body', watch)
|
||||||
self.datastore.data['settings']['application']['notification_title']
|
n_object['notification_format'] = self._check_cascading_vars('notification_format', watch)
|
||||||
|
|
||||||
n_object['notification_body'] = watch['notification_body'] if watch['notification_body'] else \
|
|
||||||
self.datastore.data['settings']['application']['notification_body']
|
|
||||||
|
|
||||||
n_object['notification_format'] = watch['notification_format'] if watch['notification_format'] != default_notification_format_for_watch else \
|
|
||||||
self.datastore.data['settings']['application']['notification_format']
|
|
||||||
|
|
||||||
# (Individual watch) Only prepare to notify if the rules above matched
|
# (Individual watch) Only prepare to notify if the rules above matched
|
||||||
sent = False
|
queued = False
|
||||||
if 'notification_urls' in n_object and n_object['notification_urls']:
|
if n_object and n_object.get('notification_urls'):
|
||||||
sent = True
|
queued = True
|
||||||
self.queue_notification_for_watch(n_object, watch)
|
self.queue_notification_for_watch(n_object, watch)
|
||||||
|
|
||||||
# (Group tags) try by group tag
|
return queued
|
||||||
if not sent:
|
|
||||||
# Else, Try by tag, and use system default vars for format, body etc as fallback
|
|
||||||
tags = self.datastore.get_all_tags_for_watch(uuid=watch_uuid)
|
|
||||||
for tag_uuid, tag in tags.items():
|
|
||||||
n_object = {}
|
|
||||||
n_object['notification_urls'] = tag.get('notification_urls')
|
|
||||||
|
|
||||||
n_object['notification_title'] = tag.get('notification_title') if tag.get('notification_title') else \
|
|
||||||
self.datastore.data['settings']['application']['notification_title']
|
|
||||||
|
|
||||||
n_object['notification_body'] = tag.get('notification_body') if tag.get('notification_body') else \
|
|
||||||
self.datastore.data['settings']['application']['notification_body']
|
|
||||||
|
|
||||||
n_object['notification_format'] = tag.get('notification_format') if tag.get('notification_format') != default_notification_format_for_watch else \
|
|
||||||
self.datastore.data['settings']['application']['notification_format']
|
|
||||||
|
|
||||||
if 'notification_urls' in n_object and n_object.get('notification_urls') and not tag.get('notification_muted'):
|
|
||||||
sent = True
|
|
||||||
self.queue_notification_for_watch(n_object, watch)
|
|
||||||
|
|
||||||
# (Group tags) try by global
|
|
||||||
if not sent:
|
|
||||||
# leave this as is, but repeat in a loop for each tag also
|
|
||||||
n_object['notification_urls'] = self.datastore.data['settings']['application'].get('notification_urls')
|
|
||||||
n_object['notification_title'] = self.datastore.data['settings']['application'].get('notification_title')
|
|
||||||
n_object['notification_body'] = self.datastore.data['settings']['application'].get('notification_body')
|
|
||||||
n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')
|
|
||||||
if n_object.get('notification_urls') and n_object.get('notification_body') and n_object.get('notification_title'):
|
|
||||||
sent = True
|
|
||||||
self.queue_notification_for_watch(n_object, watch)
|
|
||||||
|
|
||||||
return sent
|
|
||||||
|
|
||||||
|
|
||||||
def send_filter_failure_notification(self, watch_uuid):
|
def send_filter_failure_notification(self, watch_uuid):
|
||||||
|
|||||||
Reference in New Issue
Block a user