Compare commits

..

20 Commits

Author SHA1 Message Date
dgtlmoon
9b036d7b19 Simple UI to see the difference and the two images 2022-02-12 23:48:10 +01:00
dgtlmoon
0761984bcd tweaks to image diff highlighter 2022-02-12 23:37:02 +01:00
dgtlmoon
e73721a3f0 tweaking 2022-02-12 23:03:31 +01:00
dgtlmoon
86fc9d669f Basic handler for diff rendering 2022-02-12 22:58:43 +01:00
dgtlmoon
7a66b69158 Some work around diff viewing 2022-02-12 22:48:29 +01:00
dgtlmoon
ddd7b2772d for now dont bother renaming snapshot 2022-02-12 22:48:15 +01:00
dgtlmoon
305060f79c Exceptions around saving snapshot were not being tracked 2022-02-12 22:46:50 +01:00
dgtlmoon
cfcf59d009 Switch store filename depending on type 2022-02-12 22:22:14 +01:00
dgtlmoon
af25b824a0 small tidyup 2022-02-12 22:13:53 +01:00
dgtlmoon
a29085fa18 check preview page shows what we expect 2022-02-12 22:13:33 +01:00
dgtlmoon
d7832d735d Check preview page is working 2022-02-12 22:11:36 +01:00
dgtlmoon
7d1c4d7673 Allow 'trigger text' on JSON docs 2022-02-12 21:53:02 +01:00
dgtlmoon
6e00f0e025 tidy up checksum check ara 2022-02-12 21:46:23 +01:00
dgtlmoon
4f536bb559 Fix json detect bug 2022-02-12 21:40:35 +01:00
dgtlmoon
38d8aa8d28 encode to str/bytes 2022-02-12 18:26:43 +01:00
dgtlmoon
dec47d5c43 trying to resolve json cast issue 2022-02-12 18:25:25 +01:00
dgtlmoon
cec24fe2c1 Check if 'application/json; charset=utf-8' 2022-02-12 18:22:11 +01:00
dgtlmoon
f4bc0aa2ba Not needed 2022-02-12 18:08:38 +01:00
dgtlmoon
499c4797da More works and tests 2022-02-12 18:08:18 +01:00
dgtlmoon
9bc71d187e Split out content type methods 2022-02-12 17:21:25 +01:00
53 changed files with 779 additions and 1762 deletions

View File

@@ -2,20 +2,16 @@ name: Build and push containers
on:
# Automatically triggered by a testing workflow passing, but this is only checked when it lands in the `master`/default branch
# workflow_run:
# workflows: ["ChangeDetection.io Test"]
# branches: [master]
# tags: ['0.*']
# types: [completed]
workflow_run:
workflows: ["ChangeDetection.io Test"]
branches: [master]
tags: ['0.*']
types: [completed]
# Or a new tagged release
release:
types: [published, edited]
push:
branches:
- master
jobs:
metadata:
runs-on: ubuntu-latest
@@ -95,7 +91,8 @@ jobs:
file: ./Dockerfile
push: true
tags: |
${{ secrets.DOCKER_HUB_USERNAME }}/changedetection.io:latest,ghcr.io/${{ github.repository }}:latest
${{ secrets.DOCKER_HUB_USERNAME }}/changedetection.io:latest
ghcr.io/${{ github.repository }}:latest
platforms: linux/amd64,linux/arm64,linux/arm/v6,linux/arm/v7
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
@@ -110,7 +107,8 @@ jobs:
file: ./Dockerfile
push: true
tags: |
${{ secrets.DOCKER_HUB_USERNAME }}/changedetection.io:${{ github.event.release.tag_name }},ghcr.io/dgtlmoon/changedetection.io:${{ github.event.release.tag_name }}
${{ secrets.DOCKER_HUB_USERNAME }}/changedetection.io:${{ github.event.release.tag_name }}
ghcr.io/dgtlmoon/changedetection.io:${{ github.event.release.tag_name }}
platforms: linux/amd64,linux/arm64,linux/arm/v6,linux/arm/v7
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache

2
.gitignore vendored
View File

@@ -7,6 +7,4 @@ __pycache__
.pytest_cache
build
dist
venv
*.egg-info*
.vscode/settings.json

View File

@@ -2,5 +2,5 @@ recursive-include changedetectionio/templates *
recursive-include changedetectionio/static *
include changedetection.py
global-exclude *.pyc
global-exclude node_modules
global-exclude *node_modules*
global-exclude venv

View File

@@ -15,19 +15,13 @@ Open source web page monitoring, notification and change detection.
<img src="https://raw.githubusercontent.com/dgtlmoon/changedetection.io/master/screenshot.png" style="max-width:100%;" alt="Self-hosted web page change monitoring" title="Self-hosted web page change monitoring" />
**Get your own private instance now! Let us host it for you!**
**Get your own instance now on Lemonade!**
[![Deploy to Lemonade](https://lemonade.changedetection.io/static/images/lemonade.svg)](https://lemonade.changedetection.io/start)
[_Let us host your own private instance - We accept PayPal and Bitcoin, Support the further development of changedetection.io!_](https://lemonade.changedetection.io/start)
- Automatic Updates, Automatic Backups, No Heroku "paused application", don't miss a change!
- Javascript browser included
- Unlimited checks and watches!
- Pay with Bitcoin
#### Example use cases
@@ -105,8 +99,6 @@ See the wiki for more information https://github.com/dgtlmoon/changedetection.io
## Filters
XPath, JSONPath and CSS support comes baked in! You can be as specific as you need, use XPath exported from various XPath element query creation tools.
(We support LXML re:test, re:match and re:replace.)
## Notifications
ChangeDetection.io supports a massive amount of notifications (including email, office365, custom APIs, etc) when a web-page has a change detected thanks to the <a href="https://github.com/caronc/apprise">apprise</a> library.
@@ -163,9 +155,9 @@ See the wiki https://github.com/dgtlmoon/changedetection.io/wiki/Proxy-configura
Raspberry Pi and linux/arm/v6 linux/arm/v7 arm64 devices are supported! See the wiki for [details](https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver)
## Windows support?
## Windows native support?
YES! See the wiki https://github.com/dgtlmoon/changedetection.io/wiki/Microsoft-Windows
Sorry not yet :( https://github.com/dgtlmoon/changedetection.io/labels/windows
## Support us

View File

@@ -1,11 +1,110 @@
#!/usr/bin/python3
# Entry-point for running from the CLI when not installed via Pip, Pip will handle the console_scripts entry_points's from setup.py
# It's recommended to use `pip3 install changedetection.io` and start with `changedetection.py` instead, it will be linked to your global path.
# or Docker.
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
# Launch as a eventlet.wsgi server instance.
import getopt
import os
import sys
import eventlet
import eventlet.wsgi
import changedetectionio
from changedetectionio import store
def main():
    """Parse the CLI options, build the datastore and app, then serve it with eventlet.

    Options (getopt spec "Ccsd:h:p:"):
      -s         enable SSL (uses 'cert.pem' / 'privkey.pem' relative to the working directory)
      -h [host]  address to listen on
      -p [port]  port to listen on (otherwise $PORT, otherwise 5000)
      -d [path]  datastore directory
      -c         call datastore.remove_unused_snapshots() on startup
      -C         create the datastore directory if it does not exist

    Exits with status 2 on unknown options or when the datastore directory is missing
    and -C was not given.
    """
    ssl_mode = False
    host = ''
    # May be a string when it comes from the environment; cast to int before listening.
    port = os.environ.get('PORT') or 5000
    do_cleanup = False

    # Must be absolute so that send_from_directory doesnt try to make it relative to backend/
    datastore_path = os.path.join(os.getcwd(), "datastore")

    try:
        opts, args = getopt.getopt(sys.argv[1:], "Ccsd:h:p:", "port")
    except getopt.GetoptError:
        print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path]')
        sys.exit(2)

    create_datastore_dir = False

    for opt, arg in opts:
        # if opt == '--purge':
        # Remove history, the actual files you need to delete manually.
        # for uuid, watch in datastore.data['watching'].items():
        # watch.update({'history': {}, 'last_checked': 0, 'last_changed': 0, 'previous_md5': None})

        if opt == '-s':
            ssl_mode = True

        if opt == '-h':
            host = arg

        if opt == '-p':
            port = int(arg)

        if opt == '-d':
            datastore_path = arg

        # Cleanup (remove text files that arent in the index)
        if opt == '-c':
            do_cleanup = True

        # Create the datadir if it doesnt exist
        if opt == '-C':
            create_datastore_dir = True

    # isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
    app_config = {'datastore_path': datastore_path}

    if not os.path.isdir(app_config['datastore_path']):
        if create_datastore_dir:
            os.mkdir(app_config['datastore_path'])
        else:
            print ("ERROR: Directory path for the datastore '{}' does not exist, cannot start, please make sure the directory exists.\n"
                   "Alternatively, use the -C parameter.".format(app_config['datastore_path']),file=sys.stderr)
            sys.exit(2)

    datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=changedetectionio.__version__)
    app = changedetectionio.changedetection_app(app_config, datastore)

    # Go into cleanup mode
    if do_cleanup:
        datastore.remove_unused_snapshots()

    app.config['datastore_path'] = datastore_path

    @app.context_processor
    def inject_version():
        # Values made available to every rendered template
        return dict(right_sticky="v{}".format(datastore.data['version_tag']),
                    new_version_available=app.config['NEW_VERSION_AVAILABLE'],
                    has_password=datastore.data['settings']['application']['password'] != False
                    )

    # Proxy sub-directory support
    # Set environment var USE_X_SETTINGS=1 on this script
    # And then in your proxy_pass settings
    #
    # proxy_set_header Host "localhost";
    # proxy_set_header X-Forwarded-Prefix /app;
    if os.getenv('USE_X_SETTINGS'):
        print ("USE_X_SETTINGS is ENABLED\n")
        from werkzeug.middleware.proxy_fix import ProxyFix
        app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)

    # Both listeners need an integer port; previously only the non-SSL branch cast it,
    # so a $PORT value from the environment (a string) broke SSL mode.
    listen_address = (host, int(port))

    if ssl_mode:
        # @todo finalise SSL config, but this should get you in the right direction if you need it.
        eventlet.wsgi.server(eventlet.wrap_ssl(eventlet.listen(listen_address),
                                               certfile='cert.pem',
                                               keyfile='privkey.pem',
                                               server_side=True), app)
    else:
        eventlet.wsgi.server(eventlet.listen(listen_address), app)
from changedetectionio import changedetection
if __name__ == '__main__':
changedetection.main()
main()

View File

@@ -1 +0,0 @@
test-datastore

View File

@@ -35,11 +35,8 @@ from flask import (
url_for,
)
from flask_login import login_required
from flask_wtf import CSRFProtect
from changedetectionio import html_tools
__version__ = '0.39.11'
__version__ = '0.39.8'
datastore = None
@@ -53,10 +50,11 @@ update_q = queue.Queue()
notification_q = queue.Queue()
# Needs to be set this way because we also build and publish via pip
base_path = os.path.dirname(os.path.realpath(__file__))
app = Flask(__name__,
static_url_path="",
static_folder="static",
template_folder="templates")
static_url_path="{}/static".format(base_path),
template_folder="{}/templates".format(base_path))
# Stop browser caching of assets
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
@@ -72,9 +70,6 @@ app.config['LOGIN_DISABLED'] = False
# Disables caching of the templates
app.config['TEMPLATES_AUTO_RELOAD'] = True
csrf = CSRFProtect()
csrf.init_app(app)
notification_debug_log=[]
def init_app_secret(datastore_path):
@@ -131,7 +126,7 @@ def _jinja2_filter_datetimestamp(timestamp, format="%Y-%m-%d %H:%M:%S"):
# return timeago.format(timestamp, time.time())
# return datetime.datetime.utcfromtimestamp(timestamp).strftime(format)
# When nobody is logged in Flask-Login's current_user is set to an AnonymousUser object.
class User(flask_login.UserMixin):
id=None
@@ -140,6 +135,7 @@ class User(flask_login.UserMixin):
def get_user(self, email="defaultuser@changedetection.io"):
return self
def is_authenticated(self):
return True
def is_active(self):
return True
@@ -218,10 +214,6 @@ def changedetection_app(config=None, datastore_o=None):
return redirect(url_for('index'))
if request.method == 'GET':
if flask_login.current_user.is_authenticated:
flash("Already logged in")
return redirect(url_for("index"))
output = render_template("login.html")
return output
@@ -257,11 +249,6 @@ def changedetection_app(config=None, datastore_o=None):
# (No password in settings or env var)
app.config['LOGIN_DISABLED'] = datastore.data['settings']['application']['password'] == False and os.getenv("SALTED_PASS", False) == False
# Set the auth cookie path if we're running as X-settings/X-Forwarded-Prefix
if os.getenv('USE_X_SETTINGS') and 'X-Forwarded-Prefix' in request.headers:
app.config['REMEMBER_COOKIE_PATH'] = request.headers['X-Forwarded-Prefix']
app.config['SESSION_COOKIE_PATH'] = request.headers['X-Forwarded-Prefix']
# For the RSS path, allow access via a token
if request.path == '/rss' and request.args.get('token'):
app_rss_token = datastore.data['settings']['application']['rss_access_token']
@@ -380,10 +367,7 @@ def changedetection_app(config=None, datastore_o=None):
tags=existing_tags,
active_tag=limit_tag,
app_rss_token=datastore.data['settings']['application']['rss_access_token'],
has_unviewed=datastore.data['has_unviewed'],
# Don't link to hosting when we're on the hosting environment
hosted_sticky=os.getenv("SALTED_PASS", False) == False,
guid=datastore.data['app_guid'])
has_unviewed=datastore.data['has_unviewed'])
return output
@@ -457,7 +441,7 @@ def changedetection_app(config=None, datastore_o=None):
raw_content = file.read()
handler = fetch_site_status.perform_site_check(datastore=datastore)
stripped_content = html_tools.strip_ignore_text(raw_content,
stripped_content = handler.strip_ignore_text(raw_content,
datastore.data['watching'][uuid]['ignore_text'])
if datastore.data['settings']['application'].get('ignore_whitespace', False):
@@ -507,13 +491,13 @@ def changedetection_app(config=None, datastore_o=None):
'headers': form.headers.data,
'body': form.body.data,
'method': form.method.data,
'ignore_status_codes': form.ignore_status_codes.data,
'fetch_backend': form.fetch_backend.data,
'trigger_text': form.trigger_text.data,
'notification_title': form.notification_title.data,
'notification_body': form.notification_body.data,
'notification_format': form.notification_format.data,
'extract_title_as_title': form.extract_title_as_title.data,
'extract_title_as_title': form.extract_title_as_title.data
}
# Notification URLs
@@ -530,7 +514,6 @@ def changedetection_app(config=None, datastore_o=None):
datastore.data['watching'][uuid]['css_filter'] = form.css_filter.data.strip()
datastore.data['watching'][uuid]['subtractive_selectors'] = form.subtractive_selectors.data
# Reset the previous_md5 so we process a new snapshot including stripping ignore text.
if form.css_filter.data.strip() != datastore.data['watching'][uuid]['css_filter']:
@@ -563,14 +546,10 @@ def changedetection_app(config=None, datastore_o=None):
flash('No notification URLs set, cannot send test.', 'error')
# Diff page [edit] link should go back to diff page
if request.args.get("next") and request.args.get("next") == 'diff' and not form.save_and_preview_button.data:
if request.args.get("next") and request.args.get("next") == 'diff':
return redirect(url_for('diff_history_page', uuid=uuid))
else:
if form.save_and_preview_button.data:
flash('You may need to reload this page to see the new content.')
return redirect(url_for('preview_page', uuid=uuid))
else:
return redirect(url_for('index'))
return redirect(url_for('index'))
else:
if request.method == 'POST' and not form.validate():
@@ -603,7 +582,6 @@ def changedetection_app(config=None, datastore_o=None):
if request.method == 'GET':
form.minutes_between_check.data = int(datastore.data['settings']['requests']['minutes_between_check'])
form.notification_urls.data = datastore.data['settings']['application']['notification_urls']
form.global_subtractive_selectors.data = datastore.data['settings']['application']['global_subtractive_selectors']
form.global_ignore_text.data = datastore.data['settings']['application']['global_ignore_text']
form.ignore_whitespace.data = datastore.data['settings']['application']['ignore_whitespace']
form.extract_title_as_title.data = datastore.data['settings']['application']['extract_title_as_title']
@@ -613,15 +591,16 @@ def changedetection_app(config=None, datastore_o=None):
form.notification_format.data = datastore.data['settings']['application']['notification_format']
form.base_url.data = datastore.data['settings']['application']['base_url']
if request.method == 'POST' and form.data.get('removepassword_button') == True:
# Password unset is a GET, but we can lock the session to a salted env password to always need the password
if not os.getenv("SALTED_PASS", False):
# Password unset is a GET, but we can lock the session to always need the password
if not os.getenv("SALTED_PASS", False) and request.values.get('removepassword') == 'yes':
from pathlib import Path
datastore.data['settings']['application']['password'] = False
flash("Password protection removed.", 'notice')
flask_login.logout_user()
return redirect(url_for('settings_page'))
if request.method == 'POST' and form.validate():
datastore.data['settings']['application']['notification_urls'] = form.notification_urls.data
datastore.data['settings']['requests']['minutes_between_check'] = form.minutes_between_check.data
datastore.data['settings']['application']['extract_title_as_title'] = form.extract_title_as_title.data
@@ -631,7 +610,6 @@ def changedetection_app(config=None, datastore_o=None):
datastore.data['settings']['application']['notification_format'] = form.notification_format.data
datastore.data['settings']['application']['notification_urls'] = form.notification_urls.data
datastore.data['settings']['application']['base_url'] = form.base_url.data
datastore.data['settings']['application']['global_subtractive_selectors'] = form.global_subtractive_selectors.data
datastore.data['settings']['application']['global_ignore_text'] = form.global_ignore_text.data
datastore.data['settings']['application']['ignore_whitespace'] = form.ignore_whitespace.data
@@ -717,6 +695,10 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/diff/<string:uuid>", methods=['GET'])
@login_required
def diff_history_page(uuid):
from changedetectionio import content_fetcher
newest_version_file_contents = ""
previous_version_file_contents = ""
# More for testing, possible to return the first/only
if uuid == 'first':
@@ -742,28 +724,28 @@ def changedetection_app(config=None, datastore_o=None):
# Save the current newest history as the most recently viewed
datastore.set_last_viewed(uuid, dates[0])
newest_file = watch['history'][dates[0]]
try:
with open(newest_file, 'r') as f:
newest_version_file_contents = f.read()
except Exception as e:
newest_version_file_contents = "Unable to read {}.\n".format(newest_file)
previous_version = request.args.get('previous_version')
try:
previous_file = watch['history'][previous_version]
except KeyError:
# Not present, use a default value, the second one in the sorted list.
previous_file = watch['history'][dates[1]]
if ('content-type' in watch and content_fetcher.supported_binary_type(watch['content-type'])):
template = "diff-image.html"
else:
newest_file = watch['history'][dates[0]]
with open(newest_file, 'r') as f:
newest_version_file_contents = f.read()
try:
previous_file = watch['history'][previous_version]
except KeyError:
# Not present, use a default value, the second one in the sorted list.
previous_file = watch['history'][dates[1]]
try:
with open(previous_file, 'r') as f:
previous_version_file_contents = f.read()
except Exception as e:
previous_version_file_contents = "Unable to read {}.\n".format(previous_file)
output = render_template("diff.html", watch_a=watch,
template = "diff.html"
output = render_template(template,
watch_a=watch,
newest=newest_version_file_contents,
previous=previous_version_file_contents,
extra_stylesheets=extra_stylesheets,
@@ -773,16 +755,14 @@ def changedetection_app(config=None, datastore_o=None):
current_previous_version=str(previous_version),
current_diff_url=watch['url'],
extra_title=" - Diff - {}".format(watch['title'] if watch['title'] else watch['url']),
left_sticky=True)
left_sticky= True )
return output
@app.route("/preview/<string:uuid>", methods=['GET'])
@login_required
def preview_page(uuid):
content = []
ignored_line_numbers = []
trigger_line_numbers = []
from changedetectionio import content_fetcher
# More for testing, possible to return the first/only
if uuid == 'first':
@@ -796,52 +776,26 @@ def changedetection_app(config=None, datastore_o=None):
flash("No history found for the specified link, bad link?", "error")
return redirect(url_for('index'))
if len(watch['history']):
timestamps = sorted(watch['history'].keys(), key=lambda x: int(x))
filename = watch['history'][timestamps[-1]]
try:
with open(filename, 'r') as f:
tmp = f.readlines()
newest = list(watch['history'].keys())[-1]
fname = watch['history'][newest]
# Get what needs to be highlighted
ignore_rules = watch.get('ignore_text', []) + datastore.data['settings']['application']['global_ignore_text']
# .readlines will keep the \n, but we will parse it here again, in the future tidy this up
ignored_line_numbers = html_tools.strip_ignore_text(content="".join(tmp),
wordlist=ignore_rules,
mode='line numbers'
)
trigger_line_numbers = html_tools.strip_ignore_text(content="".join(tmp),
wordlist=watch['trigger_text'],
mode='line numbers'
)
# Prepare the classes and lines used in the template
i=0
for l in tmp:
classes=[]
i+=1
if i in ignored_line_numbers:
classes.append('ignored')
if i in trigger_line_numbers:
classes.append('triggered')
content.append({'line': l, 'classes': ' '.join(classes)})
except Exception as e:
content.append({'line': "File doesnt exist or unable to read file {}".format(filename), 'classes': ''})
if ('content-type' in watch and content_fetcher.supported_binary_type(watch['content-type'])):
template = "preview-image.html"
content = fname
else:
content.append({'line': "No history found", 'classes': ''})
template = "preview.html"
try:
with open(fname, 'r') as f:
content = f.read()
except:
content = "Cant read {}".format(fname)
output = render_template("preview.html",
content=content,
extra_stylesheets=extra_stylesheets,
ignored_line_numbers=ignored_line_numbers,
triggered_line_numbers=trigger_line_numbers,
current_diff_url=watch['url'],
watch=watch,
uuid=uuid)
uuid=uuid,
watch=watch)
return output
@app.route("/settings/notification-logs", methods=['GET'])
@@ -853,6 +807,49 @@ def changedetection_app(config=None, datastore_o=None):
return output
# Serve a single stored snapshot image of a watch, as-is.
# `datestr` selects the history entry; the literal 'None' falls back to the first history key.
@app.route("/diff/show-image/<string:uuid>/<string:datestr>")
@login_required
def show_single_image(uuid, datestr):
    """Return the snapshot file stored under `datestr` for watch `uuid`.

    Requires login like the other diff/preview routes. Responds with 404 when the
    watch, the history entry, or the file on disk cannot be found.
    """
    from flask import abort, make_response

    try:
        watch = datastore.data['watching'][uuid]
        if datestr == 'None' or datestr is None:
            datestr = list(watch['history'].keys())[0]
        fname = watch['history'][datestr]
    except (KeyError, IndexError):
        # Unknown watch, unknown history key, or a watch without any history
        abort(404)

    try:
        with open(fname, 'rb') as f:
            resp = make_response(f.read())
    except OSError:
        # Snapshot is listed in the history but is not readable on disk
        abort(404)

    # @todo assumption here about the type, re-encode? detect?
    resp.headers['Content-Type'] = 'image/jpeg'
    return resp
# Render an image which contains the diff of two snapshot images.
# We always compare the newest against whatever compare_date we are given.
@app.route("/diff/image/<string:uuid>/<string:compare_date>")
@login_required
def render_diff_image(uuid, compare_date):
    """Return the rendered diff between the newest snapshot and the `compare_date` snapshot.

    Requires login like the other diff/preview routes. Responds with 404 when the
    watch or the requested history entry does not exist, or the watch has no history.
    """
    from changedetectionio import image_diff
    from flask import abort, make_response

    try:
        watch = datastore.data['watching'][uuid]
        history_keys = list(watch['history'].keys())
        newest = history_keys[-1]

        # @todo this is weird
        if compare_date == 'None' or compare_date is None:
            compare_date = history_keys[0]

        new_img = watch['history'][newest]
        prev_img = watch['history'][compare_date]
    except (KeyError, IndexError):
        # Unknown watch, unknown history key, or a watch without any history
        abort(404)

    img = image_diff.render_diff(new_img, prev_img)

    resp = make_response(img)
    resp.headers['Content-Type'] = 'image/jpeg'
    return resp
@app.route("/api/<string:uuid>/snapshot/current", methods=['GET'])
@login_required
def api_snapshot(uuid):
@@ -1136,42 +1133,22 @@ def ticker_thread_check_time_launch_checks():
running_uuids.append(t.current_uuid)
# Re #232 - Deepcopy the data incase it changes while we're iterating through it all
while True:
try:
copied_datastore = deepcopy(datastore)
except RuntimeError as e:
# RuntimeError: dictionary changed size during iteration
time.sleep(0.1)
else:
break
# Re #438 - Don't place more watches in the queue to be checked if the queue is already large
while update_q.qsize() >= 2000:
time.sleep(1)
copied_datastore = deepcopy(datastore)
# Check for watches outside of the time threshold to put in the thread queue.
now = time.time()
max_system_wide = int(copied_datastore.data['settings']['requests']['minutes_between_check']) * 60
for uuid, watch in copied_datastore.data['watching'].items():
# No need todo further processing if it's paused
if watch['paused']:
continue
# If they supplied an individual entry minutes to threshold.
watch_minutes_between_check = watch.get('minutes_between_check', None)
if watch_minutes_between_check is not None:
if 'minutes_between_check' in watch and watch['minutes_between_check'] is not None:
# Cast to int just incase
max_time = int(watch_minutes_between_check) * 60
max_time = int(watch['minutes_between_check']) * 60
else:
# Default system wide.
max_time = max_system_wide
max_time = int(copied_datastore.data['settings']['requests']['minutes_between_check']) * 60
threshold = now - max_time
threshold = time.time() - max_time
# Yeah, put it in the queue, it's more than time
if watch['last_checked'] <= threshold:
# Yeah, put it in the queue, it's more than time.
if not watch['paused'] and watch['last_checked'] <= threshold:
if not uuid in running_uuids and uuid not in update_q.queue:
update_q.put(uuid)

View File

@@ -1,114 +0,0 @@
#!/usr/bin/python3
# Launch as a eventlet.wsgi server instance.
import getopt
import os
import sys
import eventlet
import eventlet.wsgi
from . import store, changedetection_app
from . import __version__
def main():
    """Parse the CLI options, build the datastore and app, then serve it with eventlet.

    Options (getopt spec "Ccsd:h:p:"):
      -s         enable SSL (uses 'cert.pem' / 'privkey.pem' relative to the working directory)
      -h [host]  address to listen on
      -p [port]  port to listen on (otherwise $PORT, otherwise 5000)
      -d [path]  datastore directory
      -c         call datastore.remove_unused_snapshots() on startup
      -C         create the datastore directory if it does not exist

    Exits with status 2 on unknown options or when the datastore directory is missing
    and -C was not given.
    """
    ssl_mode = False
    host = ''
    # May be a string when it comes from the environment; cast to int before listening.
    port = os.environ.get('PORT') or 5000
    do_cleanup = False
    datastore_path = None

    # On Windows, create and use a default path.
    if os.name == 'nt':
        datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
        os.makedirs(datastore_path, exist_ok=True)
    else:
        # Must be absolute so that send_from_directory doesnt try to make it relative to backend/
        datastore_path = os.path.join(os.getcwd(), "../datastore")

    try:
        opts, args = getopt.getopt(sys.argv[1:], "Ccsd:h:p:", "port")
    except getopt.GetoptError:
        print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path]')
        sys.exit(2)

    create_datastore_dir = False

    for opt, arg in opts:
        # if opt == '--purge':
        # Remove history, the actual files you need to delete manually.
        # for uuid, watch in datastore.data['watching'].items():
        # watch.update({'history': {}, 'last_checked': 0, 'last_changed': 0, 'previous_md5': None})

        if opt == '-s':
            ssl_mode = True

        if opt == '-h':
            host = arg

        if opt == '-p':
            port = int(arg)

        if opt == '-d':
            datastore_path = arg

        # Cleanup (remove text files that arent in the index)
        if opt == '-c':
            do_cleanup = True

        # Create the datadir if it doesnt exist
        if opt == '-C':
            create_datastore_dir = True

    # isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
    app_config = {'datastore_path': datastore_path}

    if not os.path.isdir(app_config['datastore_path']):
        if create_datastore_dir:
            os.mkdir(app_config['datastore_path'])
        else:
            print(
                "ERROR: Directory path for the datastore '{}' does not exist, cannot start, please make sure the directory exists or specify a directory with the -d option.\n"
                "Or use the -C parameter to create the directory.".format(app_config['datastore_path']), file=sys.stderr)
            sys.exit(2)

    datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__)
    app = changedetection_app(app_config, datastore)

    # Go into cleanup mode
    if do_cleanup:
        datastore.remove_unused_snapshots()

    app.config['datastore_path'] = datastore_path

    @app.context_processor
    def inject_version():
        # Values made available to every rendered template
        return dict(right_sticky="v{}".format(datastore.data['version_tag']),
                    new_version_available=app.config['NEW_VERSION_AVAILABLE'],
                    has_password=datastore.data['settings']['application']['password'] != False
                    )

    # Proxy sub-directory support
    # Set environment var USE_X_SETTINGS=1 on this script
    # And then in your proxy_pass settings
    #
    # proxy_set_header Host "localhost";
    # proxy_set_header X-Forwarded-Prefix /app;
    if os.getenv('USE_X_SETTINGS'):
        print ("USE_X_SETTINGS is ENABLED\n")
        from werkzeug.middleware.proxy_fix import ProxyFix
        app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)

    # Both listeners need an integer port; previously only the non-SSL branch cast it,
    # so a $PORT value from the environment (a string) broke SSL mode.
    listen_address = (host, int(port))

    if ssl_mode:
        # @todo finalise SSL config, but this should get you in the right direction if you need it.
        eventlet.wsgi.server(eventlet.wrap_ssl(eventlet.listen(listen_address),
                                               certfile='cert.pem',
                                               keyfile='privkey.pem',
                                               server_side=True), app)
    else:
        eventlet.wsgi.server(eventlet.listen(listen_address), app)

View File

@@ -1,14 +1,13 @@
from abc import ABC, abstractmethod
import chardet
import os
import time
from abc import ABC, abstractmethod
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.proxy import Proxy as SeleniumProxy
from selenium.common.exceptions import WebDriverException
import requests
import time
import urllib3.exceptions
# image/jpeg etc
supported_binary_types = ['image']
class EmptyReply(Exception):
def __init__(self, status_code, url):
@@ -22,7 +21,7 @@ class EmptyReply(Exception):
class Fetcher():
error = None
status_code = None
content = None
content = None # Should always be bytes.
headers = None
fetcher_description ="No description"
@@ -32,13 +31,7 @@ class Fetcher():
return self.error
@abstractmethod
def run(self,
url,
timeout,
request_headers,
request_body,
request_method,
ignore_status_codes=False):
def run(self, url, timeout, request_headers, request_body, request_method):
# Should set self.error, self.status_code and self.content
pass
@@ -59,6 +52,15 @@ class Fetcher():
# def return_diff(self, stream_a, stream_b):
# return
# Assume we dont support it as binary if its not in our list
def supported_binary_type(content_type):
    """Tell whether `content_type` belongs to one of the `supported_binary_types`.

    Only the part before the first '/' is compared, case-insensitively, so
    'image/jpeg' matches a list entry of 'image'.

    :param content_type: Content-Type value, e.g. 'image/png'; may be empty or None.
    :return: True when the major type is in `supported_binary_types`, otherwise False.
             An empty or missing content type is not treated as binary.
    """
    # A missing Content-Type used to fall through to `return True`, which made
    # callers handle unknown content as an image; treat it as text instead.
    if not content_type:
        return False

    # Not a binary thing we support? then use text (also used for JSON/XML etc)
    # @todo - future - use regex for matching
    major_type = content_type.lower().strip().split('/')[0]
    return major_type in (string.lower() for string in supported_binary_types)
def available_fetchers():
import inspect
from changedetectionio import content_fetcher
@@ -105,13 +107,7 @@ class html_webdriver(Fetcher):
if proxy_args:
self.proxy = SeleniumProxy(raw=proxy_args)
def run(self,
url,
timeout,
request_headers,
request_body,
request_method,
ignore_status_codes=False):
def run(self, url, timeout, request_headers, request_body, request_method):
# request_body, request_method unused for now, until some magic in the future happens.
@@ -159,13 +155,8 @@ class html_webdriver(Fetcher):
class html_requests(Fetcher):
fetcher_description = "Basic fast Plaintext/HTTP Client"
def run(self,
url,
timeout,
request_headers,
request_body,
request_method,
ignore_status_codes=False):
def run(self, url, timeout, request_headers, request_body, request_method):
import requests
r = requests.request(method=request_method,
data=request_body,
@@ -174,21 +165,19 @@ class html_requests(Fetcher):
timeout=timeout,
verify=False)
# If the response did not tell us what encoding format to expect, Then use chardet to override what `requests` thinks.
# For example - some sites don't tell us it's utf-8, but return utf-8 content
# This seems to not occur when using webdriver/selenium, it seems to detect the text encoding more reliably.
# https://github.com/psf/requests/issues/1604 good info about requests encoding detection
if not r.headers.get('content-type') or not 'charset=' in r.headers.get('content-type'):
encoding = chardet.detect(r.content)['encoding']
if encoding:
r.encoding = encoding
# https://stackoverflow.com/questions/44203397/python-requests-get-returns-improperly-decoded-text-instead-of-utf-8
if not supported_binary_type(r.headers.get('Content-Type', '')):
content = r.text
else:
content = r.content
# @todo test this
# @todo maybe you really want to test zero-byte return pages?
if (not ignore_status_codes and not r) or not r.content or not len(r.content):
if not r or not content or not len(content):
raise EmptyReply(url=url, status_code=r.status_code)
self.status_code = r.status_code
self.content = r.text
self.content = content
self.headers = r.headers

View File

@@ -1,11 +1,10 @@
import hashlib
import os
import re
import time
import urllib3
from changedetectionio import content_fetcher
import hashlib
from inscriptis import get_text
from changedetectionio import content_fetcher, html_tools
import urllib3
from . import html_tools
import re
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -17,20 +16,51 @@ class perform_site_check():
super().__init__(*args, **kwargs)
self.datastore = datastore
def strip_ignore_text(self, content, list_ignore_text):
    """Remove blank lines and lines matching any ignore rule from `content`.

    :param content: text to filter (str); it is split with str.splitlines().
    :param list_ignore_text: iterable of rules. A rule starting with '/' is a
        case-insensitive regex (surrounding '/' and spaces are stripped); any
        other rule is a plain substring match. Empty rules and regexes that
        fail to compile are skipped.
    :return: the remaining lines, UTF-8 encoded and joined with b"\\n" (bytes).
    """
    import re

    plain_rules = []
    compiled_rules = []

    for rule in (list_ignore_text or []):
        # An empty rule used to raise IndexError on rule[0]; as a substring it
        # would also match every line, so it is skipped.
        if not rule:
            continue

        # Is it a regex?
        if rule[0] == '/':
            pattern = rule.strip(" /")
            # A bare '/' leaves an empty pattern that matches everything
            if not pattern:
                continue
            try:
                compiled_rules.append(re.compile(pattern, re.IGNORECASE))
            except re.error:
                # Invalid expressions never matched before either; just skip them
                continue
        else:
            plain_rules.append(rule)

    output = []
    for line in content.splitlines():
        # Always ignore blank lines in this mode. (when this function gets called)
        if not line.strip():
            continue

        if any(regex.search(line) for regex in compiled_rules):
            continue

        if any(skip_text in line for skip_text in plain_rules):
            continue

        output.append(line.encode('utf8'))

    return "\n".encode('utf8').join(output)
def run(self, uuid):
timestamp = int(time.time()) # used for storage etc too
changed_detected = False
stripped_text_from_html = ""
fetched_md5 = ""
original_content_before_filters = False
watch = self.datastore.data['watching'][uuid]
# Protect against file:// access
if re.search(r'^file', watch['url'], re.IGNORECASE) and not os.getenv('ALLOW_FILE_URI', False):
raise Exception(
"file:// type access is denied for security reasons."
)
# Unset any existing notification error
update_obj = {'last_notification_error': False, 'last_error': False}
@@ -53,7 +83,6 @@ class perform_site_check():
url = self.datastore.get_val(uuid, 'url')
request_body = self.datastore.get_val(uuid, 'body')
request_method = self.datastore.get_val(uuid, 'method')
ignore_status_code = self.datastore.get_val(uuid, 'ignore_status_codes')
# Pluggable content fetcher
prefer_backend = watch['fetch_backend']
@@ -65,7 +94,8 @@ class perform_site_check():
fetcher = klass()
fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_code)
fetcher.run(url, timeout, request_headers, request_body, request_method)
# Fetching complete, now filters
# @todo move to class / maybe inside of fetcher abstract base?
@@ -75,38 +105,39 @@ class perform_site_check():
# - Do we convert to JSON?
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ?
is_json = 'application/json' in fetcher.headers.get('Content-Type', '')
is_html = not is_json
css_filter_rule = watch['css_filter']
subtractive_selectors = watch.get(
"subtractive_selectors", []
) + self.datastore.data["settings"]["application"].get(
"global_subtractive_selectors", []
)
has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
has_subtractive_selectors = subtractive_selectors and len(subtractive_selectors[0].strip())
update_obj['content-type'] = fetcher.headers.get('Content-Type', '').lower().strip()
# Could be 'application/json; charset=utf-8' etc
is_json = 'application/json' in update_obj['content-type']
is_text_or_html = 'text/' in update_obj['content-type'] # text/plain , text/html etc
is_binary = not is_text_or_html and content_fetcher.supported_binary_type(update_obj['content-type'])
css_filter_rule = watch['css_filter']
has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
# Auto-detect application/json, make it reformat the JSON to something nice
if is_json and not has_filter_rule:
css_filter_rule = "json:$"
has_filter_rule = True
if has_filter_rule:
if 'json:' in css_filter_rule:
stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
is_html = False
##### CONVERT THE INPUT TO TEXT, EXTRACT THE PARTS THAT NEED TO BE FILTERED
if is_html:
# Dont depend on the content-type header here, maybe it's not present
if 'json:' in css_filter_rule:
is_json = True
rule = css_filter_rule.replace('json:', '')
stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content,
jsonpath_filter=rule).encode('utf-8')
is_text_or_html = False
original_content_before_filters = stripped_text_from_html
if is_text_or_html:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = fetcher.content
# If not JSON, and if it's not text/plain..
if 'text/plain' in fetcher.headers.get('Content-Type', '').lower():
# Don't run get_text or xpath/css filters on plaintext
if 'text/plain' in update_obj['content-type']:
stripped_text_from_html = html_content
else:
# Then we assume HTML
# Assume it's HTML if it's not text/plain
if not 'text/plain' in update_obj['content-type']:
if has_filter_rule:
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
if css_filter_rule[0] == '/':
@@ -114,33 +145,52 @@ class perform_site_check():
else:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
if has_subtractive_selectors:
html_content = html_tools.element_removal(subtractive_selectors, html_content)
# get_text() via inscriptis
stripped_text_from_html = get_text(html_content)
# Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
# Extract title as title
if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:
if not watch['title'] or not len(watch['title']):
update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)
# Re #340 - return the content before the 'ignore text' was applied
original_content_before_filters = stripped_text_from_html.encode('utf-8')
# We rely on the actual text in the html output.. many sites have random script vars etc,
# in the future we'll implement other mechanisms.
update_obj["last_check_status"] = fetcher.get_last_status_code()
# If there's text to skip
# @todo we could abstract out the get_text() to handle this cleaner
text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
if len(text_to_ignore):
stripped_text_from_html = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)
else:
stripped_text_from_html = stripped_text_from_html.encode('utf8')
######## AFTER FILTERING, STRIP OUT IGNORE TEXT
if is_text_or_html:
text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
if len(text_to_ignore):
stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html, text_to_ignore)
else:
stripped_text_from_html = stripped_text_from_html.encode('utf8')
######## CALCULATE CHECKSUM FOR DIFF DETECTION
# Re #133 - if we should strip whitespaces from triggering the change detected comparison
if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
else:
if is_text_or_html:
if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
else:
fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()
if is_json:
fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()
# Goal here in the future is to be able to abstract out different content type checks into their own class
if is_binary:
# @todo - use some actual image hash here where possible, audio hash, etc etc
m = hashlib.sha256()
m.update(fetcher.content)
fetched_md5 = m.hexdigest()
original_content_before_filters = fetcher.content
# On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one.
if not len(watch['previous_md5']):
watch['previous_md5'] = fetched_md5
@@ -148,16 +198,24 @@ class perform_site_check():
blocked_by_not_found_trigger_text = False
if len(watch['trigger_text']):
# Yeah, lets block first until something matches
blocked_by_not_found_trigger_text = True
# Filter and trigger works the same, so reuse it
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
wordlist=watch['trigger_text'],
mode="line numbers")
if result:
blocked_by_not_found_trigger_text = False
# Trigger text can apply to JSON parsed documents too
if is_text_or_html or is_json:
if len(watch['trigger_text']):
blocked_by_not_found_trigger_text = True
for line in watch['trigger_text']:
# Because JSON wont serialize a re.compile object
if line[0] == '/' and line[-1] == '/':
regex = re.compile(line.strip('/'), re.IGNORECASE)
# Found it? so we don't wait for it anymore
r = re.search(regex, str(stripped_text_from_html))
if r:
blocked_by_not_found_trigger_text = False
break
elif line.lower() in str(stripped_text_from_html).lower():
# We found it don't wait for it.
blocked_by_not_found_trigger_text = False
break
if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5:
changed_detected = True
@@ -165,11 +223,5 @@ class perform_site_check():
update_obj["last_changed"] = timestamp
# Extract title as title
if is_html:
if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:
if not watch['title'] or not len(watch['title']):
update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)
return changed_detected, update_obj, text_content_before_ignored_filter
# original_content_before_filters is returned for saving the data to disk
return changed_detected, update_obj, original_content_before_filters

View File

@@ -1,30 +1,12 @@
from wtforms import Form, SelectField, RadioField, BooleanField, StringField, PasswordField, validators, IntegerField, fields, TextAreaField, \
Field
from wtforms import widgets
from wtforms.validators import ValidationError
from wtforms.fields import html5
from changedetectionio import content_fetcher
import re
from wtforms import (
BooleanField,
Field,
Form,
IntegerField,
PasswordField,
RadioField,
SelectField,
StringField,
SubmitField,
TextAreaField,
fields,
validators,
widgets,
)
from wtforms.fields import html5
from wtforms.validators import ValidationError
from changedetectionio import content_fetcher
from changedetectionio.notification import (
default_notification_body,
default_notification_format,
default_notification_title,
valid_notification_formats,
)
from changedetectionio.notification import default_notification_format, valid_notification_formats, default_notification_body, default_notification_title
valid_method = {
'GET',
@@ -62,8 +44,8 @@ class SaltyPasswordField(StringField):
encrypted_password = ""
def build_password(self, password):
import base64
import hashlib
import base64
import secrets
# Make a new salt on every new password and store it with the password
@@ -121,9 +103,8 @@ class ValidateContentFetcherIsReady(object):
self.message = message
def __call__(self, form, field):
import urllib3.exceptions
from changedetectionio import content_fetcher
import urllib3.exceptions
# Better would be a radiohandler that keeps a reference to each class
if field.data is not None:
@@ -231,69 +212,52 @@ class ValidateListRegex(object):
except re.error:
message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
raise ValidationError(message % (line))
class ValidateCSSJSONXPATHInput(object):
"""
Filter validation
@todo CSS validator ;)
"""
def __init__(self, message=None, allow_xpath=True, allow_json=True):
def __init__(self, message=None):
self.message = message
self.allow_xpath = allow_xpath
self.allow_json = allow_json
def __call__(self, form, field):
if isinstance(field.data, str):
data = [field.data]
else:
data = field.data
for line in data:
# Nothing to see here
if not len(line.strip()):
return
if not len(field.data.strip()):
return
# Does it look like XPath?
if line.strip()[0] == '/':
if not self.allow_xpath:
raise ValidationError("XPath not permitted in this field!")
from lxml import etree, html
tree = html.fromstring("<html></html>")
# Does it look like XPath?
if field.data.strip()[0] == '/':
from lxml import html, etree
tree = html.fromstring("<html></html>")
try:
tree.xpath(line.strip())
except etree.XPathEvalError as e:
message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
raise ValidationError(message % (line, str(e)))
except:
raise ValidationError("A system-error occurred when validating your XPath expression")
try:
tree.xpath(field.data.strip())
except etree.XPathEvalError as e:
message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
raise ValidationError(message % (field.data, str(e)))
except:
raise ValidationError("A system-error occurred when validating your XPath expression")
if 'json:' in line:
if not self.allow_json:
raise ValidationError("JSONPath not permitted in this field!")
if 'json:' in field.data:
from jsonpath_ng.exceptions import JsonPathParserError, JsonPathLexerError
from jsonpath_ng.ext import parse
from jsonpath_ng.exceptions import (
JsonPathLexerError,
JsonPathParserError,
)
from jsonpath_ng.ext import parse
input = field.data.replace('json:', '')
input = line.replace('json:', '')
try:
parse(input)
except (JsonPathParserError, JsonPathLexerError) as e:
message = field.gettext('\'%s\' is not a valid JSONPath expression. (%s)')
raise ValidationError(message % (input, str(e)))
except:
raise ValidationError("A system-error occurred when validating your JSONPath expression")
try:
parse(input)
except (JsonPathParserError, JsonPathLexerError) as e:
message = field.gettext('\'%s\' is not a valid JSONPath expression. (%s)')
raise ValidationError(message % (input, str(e)))
except:
raise ValidationError("A system-error occurred when validating your JSONPath expression")
# Re #265 - maybe in the future fetch the page and offer a
# warning/notice that it's possible the rule doesn't yet match anything?
# Re #265 - maybe in the future fetch the page and offer a
# warning/notice that it's possible the rule doesn't yet match anything?
class quickWatchForm(Form):
# https://wtforms.readthedocs.io/en/2.3.x/fields/#module-wtforms.fields.html5
# `require_tld` = False is needed even for the test harness "http://localhost:5005.." to run
@@ -318,19 +282,14 @@ class watchForm(commonSettingsForm):
minutes_between_check = html5.IntegerField('Maximum time in minutes until recheck',
[validators.Optional(), validators.NumberRange(min=1)])
css_filter = StringField('CSS/JSON/XPATH Filter', [ValidateCSSJSONXPATHInput()])
subtractive_selectors = StringListField('Remove elements', [ValidateCSSJSONXPATHInput(allow_xpath=False, allow_json=False)])
title = StringField('Title')
ignore_text = StringListField('Ignore Text', [ValidateListRegex()])
headers = StringDictKeyValue('Request Headers')
body = TextAreaField('Request Body', [validators.Optional()])
method = SelectField('Request Method', choices=valid_method, default=default_method)
ignore_status_codes = BooleanField('Ignore Status Codes (process non-2xx status codes as normal)', default=False)
trigger_text = StringListField('Trigger/wait for text', [validators.Optional(), ValidateListRegex()])
save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
save_and_preview_button = SubmitField('Save & Preview', render_kw={"class": "pure-button pure-button-primary"})
def validate(self, **kwargs):
if not super().validate():
return False
@@ -351,8 +310,5 @@ class globalSettingsForm(commonSettingsForm):
[validators.NumberRange(min=1)])
extract_title_as_title = BooleanField('Extract <title> from document and use as watch title')
base_url = StringField('Base URL', validators=[validators.Optional()])
global_subtractive_selectors = StringListField('Remove elements', [ValidateCSSJSONXPATHInput(allow_xpath=False, allow_json=False)])
global_ignore_text = StringListField('Ignore Text', [ValidateListRegex()])
ignore_whitespace = BooleanField('Ignore whitespace')
save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
ignore_whitespace = BooleanField('Ignore whitespace')

View File

@@ -1,7 +1,4 @@
import json
import re
from typing import List
from bs4 import BeautifulSoup
from jsonpath_ng.ext import parse
@@ -19,27 +16,16 @@ def css_filter(css_filter, html_content):
return html_block + "\n"
def subtractive_css_selector(css_selector, html_content):
soup = BeautifulSoup(html_content, "html.parser")
for item in soup.select(css_selector):
item.decompose()
return str(soup)
def element_removal(selectors: List[str], html_content):
"""Joins individual filters into one css filter."""
selector = ",".join(selectors)
return subtractive_css_selector(selector, html_content)
# Return str Utf-8 of matched rules
def xpath_filter(xpath_filter, html_content):
from lxml import etree, html
from lxml import html
from lxml import etree
tree = html.fromstring(html_content)
html_block = ""
for item in tree.xpath(xpath_filter.strip(), namespaces={'re':'http://exslt.org/regular-expressions'}):
for item in tree.xpath(xpath_filter.strip()):
html_block+= etree.tostring(item, pretty_print=True).decode('utf-8')+"<br/>"
return html_block
@@ -78,8 +64,7 @@ def _parse_json(json_data, jsonpath_filter):
# Re 265 - Just return an empty string when filter not found
return ''
# Ticket #462 - allow the original encoding through, usually it's UTF-8 or similar
stripped_text_from_html = json.dumps(s, indent=4, ensure_ascii=False)
stripped_text_from_html = json.dumps(s, indent=4)
return stripped_text_from_html
@@ -120,50 +105,3 @@ def extract_json_as_string(content, jsonpath_filter):
return ''
return stripped_text_from_html
# Mode - "content" return the content without the matches (default)
# - "line numbers" return a list of line numbers that match (int list)
#
# wordlist - list of regex's (str) or words (str)
def strip_ignore_text(content, wordlist, mode="content"):
    """Filter ``content`` line by line against a list of words / regexes.

    Args:
        content: Text (str) to filter.
        wordlist: Iterable of rules (str). A rule whose first character is '/'
            is treated as a regular expression (surrounding spaces and slashes
            are stripped); any other rule is a plain substring. Both kinds are
            matched case-insensitively.
        mode: "content" (default) returns the text without the matching lines;
            "line numbers" returns the 1-based numbers of the matching lines
            (used for finding out what to highlight).

    Returns:
        bytes for mode "content": the non-matching, non-blank lines, UTF-8
        encoded and joined with newlines.
        list[int] for mode "line numbers".
    """
    substrings = []
    patterns = []
    for rule in wordlist:
        # An empty rule would raise IndexError on rule[0] (and, as a substring,
        # would match every line), so it is skipped.
        if not rule:
            continue

        if rule[0] == '/':
            expression = rule.strip(" /")
            # A rule such as "/" strips down to an empty expression, which
            # would match every single line.
            if not expression:
                continue
            try:
                patterns.append(re.compile(expression, re.IGNORECASE))
            except (re.error, RecursionError, OverflowError):
                # Best effort: a rule that is not a valid regex is ignored.
                continue
        else:
            # Lower-cased once here; lines are lower-cased when compared.
            substrings.append(rule.lower())

    output = []
    ignored_line_numbers = []
    for line_number, line in enumerate(content.splitlines(), start=1):
        # Blank lines are always dropped: they are neither output nor reported
        # as ignored, but they still count towards the line numbering.
        if not line.strip():
            continue

        lowered = line.lower()
        if any(pattern.search(line) for pattern in patterns) or \
                any(skip_text in lowered for skip_text in substrings):
            ignored_line_numbers.append(line_number)
        else:
            output.append(line.encode('utf8'))

    # Used for finding out what to highlight
    if mode == "line numbers":
        return ignored_line_numbers

    return b"\n".join(output)

View File

@@ -0,0 +1,41 @@
# import the necessary packages
from skimage.metrics import structural_similarity as compare_ssim
import argparse
import imutils
import cv2
# From https://www.pyimagesearch.com/2017/06/19/image-difference-with-opencv-and-python/
def render_diff(fpath_imageA, fpath_imageB):
    """Render image A as a JPEG with red boxes around regions that differ from image B.

    Based on https://www.pyimagesearch.com/2017/06/19/image-difference-with-opencv-and-python/

    Args:
        fpath_imageA: Path of the first image; this is the one that is returned.
        fpath_imageB: Path of the image to compare against.

    Returns:
        bytes: JPEG-encoded copy of image A with the differing regions outlined.

    Raises:
        ValueError: If either file cannot be read as an image, or the result
            cannot be JPEG-encoded.
    """
    imageA = cv2.imread(fpath_imageA)
    imageB = cv2.imread(fpath_imageB)

    # cv2.imread() does not raise on a missing/undecodable file, it returns None,
    # which would otherwise surface as an opaque error inside cvtColor().
    for fpath, image in ((fpath_imageA, imageA), (fpath_imageB, imageB)):
        if image is None:
            raise ValueError("Could not read image file: {}".format(fpath))

    # SSIM needs both inputs to have identical dimensions. When they differ,
    # compare only the overlapping top-left region.
    # .copy() gives contiguous arrays that OpenCV can draw on.
    if imageA.shape[:2] != imageB.shape[:2]:
        height = min(imageA.shape[0], imageB.shape[0])
        width = min(imageA.shape[1], imageB.shape[1])
        imageA = imageA[:height, :width].copy()
        imageB = imageB[:height, :width].copy()

    # convert the images to grayscale
    grayA = cv2.cvtColor(imageA, cv2.COLOR_BGR2GRAY)
    grayB = cv2.cvtColor(imageB, cv2.COLOR_BGR2GRAY)

    # compute the Structural Similarity Index (SSIM) between the two
    # images, ensuring that the difference image is returned
    (score, diff) = compare_ssim(grayA, grayB, full=True)
    diff = (diff * 255).astype("uint8")
    print("SSIM: {}".format(score))

    # threshold the difference image, followed by finding contours to
    # obtain the regions of the two input images that differ
    thresh = cv2.threshold(diff, 0, 255,
                           cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1]
    cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
                            cv2.CHAIN_APPROX_SIMPLE)
    cnts = imutils.grab_contours(cnts)

    # loop over the contours
    for c in cnts:
        # compute the bounding box of the contour and then draw the
        # bounding box on both input images to represent where the two
        # images differ
        (x, y, w, h) = cv2.boundingRect(c)
        cv2.rectangle(imageA, (x, y), (x + w, y + h), (0, 0, 255), 2)
        cv2.rectangle(imageB, (x, y), (x + w, y + h), (0, 0, 255), 2)

    # Only the annotated image A is returned; image B is annotated too but unused.
    #return cv2.imencode('.jpg', imageB)[1].tobytes()
    encoded_ok, encoded = cv2.imencode('.jpg', imageA)
    if not encoded_ok:
        raise ValueError("Could not encode the diff image as JPEG")
    return encoded.tobytes()

View File

@@ -54,19 +54,3 @@ ins {
body {
height: 99%;
/* Hide scroll bar in Firefox */ } }
td#diff-col div {
text-align: justify;
white-space: pre-wrap; }
.ignored {
background-color: #ccc;
/* border: #0d91fa 1px solid; */
opacity: 0.7; }
.triggered {
background-color: #1b98f8; }
/* ignored and triggered? make it obvious error */
.ignored.triggered {
background-color: #ff0000; }

View File

@@ -66,23 +66,3 @@ ins {
height: 99%; /* Hide scroll bar in Firefox */
}
}
td#diff-col div {
text-align: justify;
white-space: pre-wrap;
}
.ignored {
background-color: #ccc;
/* border: #0d91fa 1px solid; */
opacity: 0.7;
}
.triggered {
background-color: #1b98f8;
}
/* ignored and triggered? make it obvious error */
.ignored.triggered {
background-color: #ff0000;
}

View File

@@ -4,7 +4,8 @@
"description": "",
"main": "index.js",
"scripts": {
"build": "node-sass styles.scss -o .;node-sass diff.scss -o ."
"build": "node-sass styles.scss diff.scss -o .",
"watch": "node-sass --watch styles.scss diff.scss -o ."
},
"author": "",
"license": "ISC",

File diff suppressed because one or more lines are too long

View File

@@ -42,14 +42,9 @@ section.content {
justify-content: center;
}
code {
background: #eee;
}
/* table related */
.watch-table {
width: 100%;
font-size: 80%;
tr.unviewed {
font-weight: bold;
@@ -60,6 +55,7 @@ code {
}
td {
font-size: 80%;
white-space: nowrap;
}
@@ -111,12 +107,12 @@ code {
body:after {
content: "";
background: linear-gradient(130deg, #5ad8f7, #2f50af 41.07%, #9150bf 84.05%);
background: linear-gradient(130deg, #ff7a18, #af002d 41.07%, #319197 76.05%)
}
body:after, body:before {
display: block;
height: 650px;
height: 600px;
position: absolute;
top: 0;
left: 0;
@@ -129,8 +125,11 @@ body::after {
}
body::before {
// background-image set in base.html so it works with reverse proxies etc
content: "";
background-image: url(/static/images/gradient-border.png);
}
body:before {
background-size: cover
}
@@ -266,7 +265,6 @@ body:after, body:before {
}
legend {
color: #fff;
font-weight: bold;
}
}
@@ -319,9 +317,11 @@ footer {
*/
}
.sticky-tab {
position: absolute;
top: 60px;
top: 80px;
font-size: 8px;
background: #fff;
padding: 10px;
@@ -331,11 +331,6 @@ footer {
&#right-sticky {
right: 0px;
}
&#hosted-sticky {
right: 0px;
top: 100px;
font-weight: bold;
}
}
#new-version-text a {
@@ -547,16 +542,6 @@ $form-edge-padding: 20px;
display: block;
}
}
.login-form {
.inner {
background: #fff;;
padding: $form-edge-padding;
border-radius: 5px;
}
}
.edit-form {
min-width: 70%;
.tab-pane-inner {
@@ -580,14 +565,5 @@ $form-edge-padding: 20px;
display: block;
background: #fff;
}
.pure-form-message-inline {
padding-left: 0;
}
}
ul {
padding-left: 1em;
padding-top: 0px;
margin-top: 4px;
}

View File

@@ -1,19 +1,15 @@
from os import unlink, path, mkdir
import json
import logging
import os
import threading
import time
import uuid as uuid_builder
from copy import deepcopy
from os import mkdir, path, unlink
from threading import Lock
from copy import deepcopy
from changedetectionio.notification import (
default_notification_body,
default_notification_format,
default_notification_title,
)
import logging
import time
import threading
import os
from changedetectionio.notification import default_notification_format, default_notification_body, default_notification_title
# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods?
# Open a github issue if you know something :)
@@ -50,7 +46,6 @@ class ChangeDetectionStore:
'extract_title_as_title': False,
'fetch_backend': 'html_requests',
'global_ignore_text': [], # List of text to ignore when calculating the comparison checksum
'global_subtractive_selectors': [],
'ignore_whitespace': False,
'notification_urls': [], # Apprise URL list
# Custom notification content
@@ -87,7 +82,6 @@ class ChangeDetectionStore:
'notification_body': default_notification_body,
'notification_format': default_notification_format,
'css_filter': "",
'subtractive_selectors': [],
'trigger_text': [], # List of text or regex to wait for until a change is detected
'fetch_backend': None,
'extract_title_as_title': False
@@ -150,8 +144,8 @@ class ChangeDetectionStore:
unlink(password_reset_lockfile)
if not 'app_guid' in self.__data:
import os
import sys
import os
if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ:
self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4())
else:
@@ -190,6 +184,10 @@ class ChangeDetectionStore:
def update_watch(self, uuid, update_obj):
# Skip if 'paused' state
if self.__data['watching'][uuid]['paused']:
return
with self.lock:
# In python 3.9 we have the |= dict operator, but that still will lose data on nested structures...
@@ -374,7 +372,9 @@ class ChangeDetectionStore:
if not os.path.isdir(output_path):
mkdir(output_path)
fname = "{}/{}.stripped.txt".format(output_path, uuid.uuid4())
suffix = "stripped.txt"
fname = "{}/{}.{}".format(output_path, uuid.uuid4(), suffix)
with open(fname, 'wb') as f:
f.write(contents)
f.close()
@@ -400,10 +400,13 @@ class ChangeDetectionStore:
# system was out of memory, out of RAM etc
with open(self.json_store_path+".tmp", 'w') as json_file:
json.dump(data, json_file, indent=4)
os.replace(self.json_store_path+".tmp", self.json_store_path)
except Exception as e:
logging.error("Error writing JSON!! (Main JSON file save was skipped) : %s", str(e))
else:
os.rename(self.json_store_path+".tmp", self.json_store_path)
self.needs_write = False
# Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON
@@ -436,7 +439,6 @@ class ChangeDetectionStore:
index.append(self.data['watching'][uuid]['history'][str(id)])
import pathlib
# Only in the sub-directories
for item in pathlib.Path(self.datastore_path).rglob("*/*txt"):
if not str(item) in index:

View File

@@ -34,8 +34,9 @@
</div>
<div class="pure-controls">
<span class="pure-form-message-inline">
These tokens can be used in the notification body and title to customise the notification text.
These tokens can be used in the notification body and title to
customise the notification text.
</span>
<table class="pure-table" id="token-table">
<thead>
<tr>
@@ -87,7 +88,7 @@
</tr>
</tbody>
</table>
<br/>
<span class="pure-form-message-inline">
URLs generated by changedetection.io (such as <code>{diff_url}</code>) require the <code>BASE_URL</code> environment variable set.<br/>
Your <code>BASE_URL</code> var is currently "{{current_base_url}}"
</span>

View File

@@ -25,6 +25,3 @@
{% endmacro %}
{% macro render_button(field) %}
{{ field(**kwargs)|safe }}
{% endmacro %}

View File

@@ -12,13 +12,7 @@
<link rel="stylesheet" href="{{ m }}?ver=1000">
{% endfor %}
{% endif %}
<style>
body::before {
background-image: url({{url_for('static_content', group='images', filename='gradient-border.png')}});
}
</style>
</head>
<body>
<div class="header">
@@ -41,13 +35,13 @@
{% if current_user.is_authenticated or not has_password %}
{% if not current_diff_url %}
<li class="pure-menu-item">
<a href="{{ url_for('settings_page')}}" class="pure-menu-link">SETTINGS</a>
<a href="{{ url_for('get_backup')}}" class="pure-menu-link">BACKUP</a>
</li>
<li class="pure-menu-item">
<a href="{{ url_for('import_page')}}" class="pure-menu-link">IMPORT</a>
</li>
<li class="pure-menu-item">
<a href="{{ url_for('get_backup')}}" class="pure-menu-link">BACKUP</a>
<a href="{{ url_for('settings_page')}}" class="pure-menu-link">SETTINGS</a>
</li>
{% else %}
<li class="pure-menu-item">
@@ -74,7 +68,7 @@
</ul>
</div>
</div>
{% if hosted_sticky %}<div class="sticky-tab" id="hosted-sticky"><a href="https://lemonade.changedetection.io/start?ref={{guid}}">Let us host your instance!</a></div>{% endif %}
{% if left_sticky %}<div class="sticky-tab" id="left-sticky"><a href="{{url_for('preview_page', uuid=uuid)}}">Show current snapshot</a></div> {% endif %}
{% if right_sticky %}<div class="sticky-tab" id="right-sticky">{{ right_sticky }}</div> {% endif %}
<section class="content">

View File

@@ -0,0 +1,59 @@
{% extends 'base.html' %}
{% block content %}
<div id="settings">
<h1>Differences</h1>
<form class="pure-form " action="" method="GET">
<fieldset>
{% if versions|length >= 1 %}
<label for="diff-version">Compare newest (<span id="current-v-date"></span>) with</label>
<select id="diff-version" name="previous_version">
{% for version in versions %}
<option value="{{version}}" {% if version== current_previous_version %} selected="" {% endif %}>
{{version}}
</option>
{% endfor %}
</select>
<button type="submit" class="pure-button pure-button-primary">Go</button>
{% endif %}
</fieldset>
</form>
</div>
<div id="diff-ui">
<img style="max-width: 100%" src="{{ url_for('render_diff_image', uuid=uuid, compare_date=current_previous_version) }}" />
<div>
<span style="width: 50%">
<img style="max-width: 100%" src="{{ url_for('show_single_image', uuid=uuid, datestr=newest_version_timestamp) }}" />
</span>
<span style="width: 50%">
<img style="max-width: 100%" src="{{ url_for('show_single_image', uuid=uuid, datestr=current_previous_version) }}" />
</span>
</div>
</div>
<script type="text/javascript" src="{{url_for('static_content', group='js', filename='diff.js')}}"></script>
<script defer="">
    /* addEventListener is used so this does not replace a load handler assigned elsewhere. */
    window.addEventListener("load", function() {
        /* Set current version date as local time in the browser also.
           The #current-v-date element only exists when there is at least one
           version to compare with, so guard against it being absent. */
        var currentVersionLabel = document.getElementById("current-v-date");
        if (currentVersionLabel) {
            var newestDate = new Date({{ newest_version_timestamp }}*1000);
            /* Plain text only, so textContent rather than innerHTML. */
            currentVersionLabel.textContent = newestDate.toLocaleString();
        }

        /* Convert what is options from UTC time.time() to local browser time */
        var diffList = document.getElementById("diff-version");
        if (diffList) {
            for (var option of diffList.options) {
                var optionDate = new Date(option.value*1000);
                option.label = optionDate.toLocaleString();
            }
        }
    });
</script>
{% endblock %}

View File

@@ -36,7 +36,6 @@
<a onclick="next_diff();">Jump</a>
</div>
<div id="diff-ui">
<div class="tip">Pro-tip: Use <strong>show current snapshot</strong> tab to visualise what will be ignored.</div>
<table>
<tbody>
<tr>

View File

@@ -1,7 +1,6 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.jinja' import render_field %}
{% from '_helpers.jinja' import render_button %}
{% from '_common_fields.jinja' import render_common_settings_form %}
<script type="text/javascript" src="{{url_for('static_content', group='js', filename='tabs.js')}}" defer></script>
@@ -19,7 +18,6 @@
<div class="box-wrap inner">
<form class="pure-form pure-form-stacked"
action="{{ url_for('edit_page', uuid=uuid, next = request.args.get('next') ) }}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="tab-pane-inner" id="general">
<fieldset>
@@ -59,33 +57,24 @@
</span>
</div>
<hr/>
<fieldset class="pure-group">
<span class="pure-form-message-inline">
<strong>Request override is currently only used by the <i>Basic fast Plaintext/HTTP Client</i> method.</strong>
</span>
<div class="pure-control-group">
{{ render_field(form.method) }}
</div>
<div class="pure-control-group">
{{ render_field(form.headers, rows=5, placeholder="Example
<div class="pure-control-group">
{{ render_field(form.method) }}
</div>
<strong>Note: <i>Request Headers and Body settings are ONLY used by Basic fast Plaintext/HTTP Client fetch method.</i></strong>
{{ render_field(form.headers, rows=5, placeholder="Example
Cookie: foobar
User-Agent: wonderbra 1.0") }}
</div>
<div class="pure-control-group">
{{ render_field(form.body, rows=5, placeholder="Example
</fieldset>
<div class="pure-control-group">
{{ render_field(form.body, rows=5, placeholder="Example
{
\"name\":\"John\",
\"age\":30,
\"car\":null
}") }}
</div>
<div>
{{ render_field(form.ignore_status_codes) }}
</div>
</fieldset>
<br/>
</div>
</div>
<div class="tab-pane-inner" id="notifications">
@@ -99,45 +88,22 @@ User-Agent: wonderbra 1.0") }}
<div class="tab-pane-inner" id="filters-and-triggers">
<fieldset>
<div class="pure-control-group">
<strong>Pro-tips:</strong><br/>
<ul>
<li>
Use the preview page to see your filters and triggers highlighted.
</li>
<li>
Some sites use JavaScript to create the content, for this you should <a href="https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver">use the Chrome/WebDriver Fetcher</a>
</li>
</ul>
</div>
<div class="pure-control-group">
{{ render_field(form.css_filter, placeholder=".class-name or #some-id, or other CSS selector rule.",
class="m-d") }}
<span class="pure-form-message-inline">
<ul>
<li>CSS - Limit text to this CSS rule, only text matching this CSS rule is included.</li>
<li>JSON - Limit text to this JSON rule, using <a href="https://pypi.org/project/jsonpath-ng/">JSONPath</a>, prefix with <code>"json:"</code>, use <code>json:$</code> to force re-formatting if required, <a
<li>JSON - Limit text to this JSON rule, using <a href="https://pypi.org/project/jsonpath-ng/">JSONPath</a>, prefix with <b>"json:"</b>, <a
href="https://jsonpath.com/" target="new">test your JSONPath here</a></li>
<li>XPath - Limit text to this XPath rule, simply start with a forward-slash, example <code>//*[contains(@class, 'sametext')]</code>, <a
<li>XPATH - Limit text to this XPath rule, simply start with a forward-slash, example <b>//*[contains(@class, 'sametext')]</b>, <a
href="http://xpather.com/" target="new">test your XPath here</a></li>
</ul>
Please be sure that you thoroughly understand how to write CSS or JSONPath, XPath selector rules before filing an issue on GitHub! <a
href="https://github.com/dgtlmoon/changedetection.io/wiki/CSS-Selector-help">here for more CSS selector help</a>.<br/>
</span>
</div>
<fieldset class="pure-group">
{{ render_field(form.subtractive_selectors, rows=5, placeholder="header
footer
nav
.stockticker") }}
<span class="pure-form-message-inline">
<ul>
<li> Remove HTML element(s) by CSS selector before text conversion. </li>
<li> Add multiple elements or CSS selectors per line to ignore multiple parts of the HTML. </li>
</ul>
</span>
</fieldset>
</fieldset>
<fieldset class="pure-group">
{{ render_field(form.ignore_text, rows=5, placeholder="Some text to ignore in a line
@@ -146,9 +112,8 @@ nav
<span class="pure-form-message-inline">
<ul>
<li>Each line processed separately, any line matching will be ignored (removed before creating the checksum)</li>
<li>Regular Expression support, wrap the line in forward slash <code>/regex/</code></li>
<li>Regular Expression support, wrap the line in forward slash <b>/regex/</b></li>
<li>Changing this will affect the comparison checksum which may trigger an alert</li>
<li>Use the preview/show current tab to see ignores</li>
</ul>
</span>
@@ -163,7 +128,7 @@ nav
<li>Text to wait for before triggering a change/notification, all text and regex are tested <i>case-insensitive</i>.</li>
<li>Trigger text is processed from the result-text that comes out of any CSS/JSON Filters for this watch</li>
<li>Each line is process separately (think of each line as "OR")</li>
<li>Note: Wrap in forward slash / to use regex example: <code>/foo\d/</code></li>
<li>Note: Wrap in forward slash / to use regex example: <span style="font-family: monospace; background: #eee">/foo\d/</span></li>
</ul>
</span>
</div>
@@ -173,8 +138,7 @@ nav
<div id="actions">
<div class="pure-control-group">
{{ render_button(form.save_button) }} {{ render_button(form.save_and_preview_button) }}
<button type="submit" class="pure-button pure-button-primary">Save</button>
<a href="{{url_for('api_delete', uuid=uuid)}}"
class="pure-button button-small button-error ">Delete</a>
<a href="{{url_for('api_clone', uuid=uuid)}}"

View File

@@ -4,7 +4,6 @@
<div class="edit-form">
<div class="inner">
<form class="pure-form pure-form-aligned" action="{{url_for('import_page')}}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<fieldset class="pure-group">
<legend>
Enter one URL per line, and optionally add tags for each URL after a space, delineated by comma (,):

View File

@@ -1,10 +1,10 @@
{% extends 'base.html' %}
{% block content %}
<div class="login-form">
<div class="edit-form">
<div class="inner">
<form class="pure-form pure-form-stacked" action="{{url_for('login')}}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<fieldset>
<div class="pure-control-group">
<label for="password">Password</label>

View File

@@ -0,0 +1,13 @@
{% extends 'base.html' %}
{% block content %}
{# Stub page: shows a "Current" heading and a #diff-ui container holding placeholder text. #}
<div id="settings">
<h1>Current</h1>
</div>
<div id="diff-ui">
{# NOTE(review): "image goes here" looks like a stand-in for real image markup - confirm. #}
image goes here
</div>
{% endblock %}

View File

@@ -3,21 +3,19 @@
{% block content %}
<div id="settings">
<h1>Current - {{watch.last_checked|format_timestamp_timeago}}</h1>
<h1>Current</h1>
</div>
<div id="diff-ui">
<span class="ignored">Grey lines are ignored</span> <span class="triggered">Blue lines are triggers</span>
<table>
<tbody>
<tr>
<td id="diff-col">
{% for row in content %}
<div class="{{row.classes}}">{{row.line}}</div>
{% endfor %}
<span id="result">{{content}}</span>
</td>
</tr>
</tbody>
</table>
</div>
{% endblock %}

View File

@@ -4,7 +4,6 @@
<div class="edit-form">
<div class="box-wrap inner">
<form class="pure-form pure-form-stacked" action="{{url_for('scrub_page')}}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<fieldset>
<div class="pure-control-group">
This will remove all version snapshots/data, but keep your list of URLs. <br/>

View File

@@ -1,7 +1,7 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.jinja' import render_field, render_button %}
{% from '_helpers.jinja' import render_field %}
{% from '_common_fields.jinja' import render_common_settings_form %}
<script type="text/javascript" src="{{url_for('static_content', group='js', filename='settings.js')}}" defer></script>
@@ -18,7 +18,6 @@
</div>
<div class="box-wrap inner">
<form class="pure-form pure-form-stacked settings" action="{{url_for('settings_page')}}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<div class="tab-pane-inner" id="general">
<fieldset>
<div class="pure-control-group">
@@ -28,7 +27,8 @@
<div class="pure-control-group">
{% if not hide_remove_pass %}
{% if current_user.is_authenticated %}
{{ render_button(form.removepassword_button) }}
<a href="{{url_for('settings_page', removepassword='yes')}}"
class="pure-button pure-button-primary">Remove password</a>
{% else %}
{{ render_field(form.password) }}
<span class="pure-form-message-inline">Password protection for your changedetection.io application.</span>
@@ -83,18 +83,7 @@
</span>
</fieldset>
<fieldset class="pure-group">
{{ render_field(form.global_subtractive_selectors, rows=5, placeholder="header
footer
nav
.stockticker") }}
<span class="pure-form-message-inline">
<ul>
<li> Remove HTML element(s) by CSS selector before text conversion. </li>
<li> Add multiple elements or CSS selectors per line to ignore multiple parts of the HTML. </li>
</ul>
</span>
</fieldset>
<fieldset class="pure-group">
{{ render_field(form.global_ignore_text, rows=5, placeholder="Some text to ignore in a line
/some.regex\d{2}/ for case-INsensitive regex
@@ -104,9 +93,8 @@ nav
<ul>
<li>Note: This is applied globally in addition to the per-watch rules.</li>
<li>Each line processed separately, any line matching will be ignored (removed before creating the checksum)</li>
<li>Regular Expression support, wrap the line in forward slash <code>/regex/</code></li>
<li>Regular Expression support, wrap the line in forward slash <b>/regex/</b></li>
<li>Changing this will affect the comparison checksum which may trigger an alert</li>
<li>Use the preview/show current tab to see ignores</li>
</ul>
</span>
</fieldset>
@@ -114,9 +102,11 @@ nav
<div id="actions">
<div class="pure-control-group">
{{ render_button(form.save_button) }}
<a href="{{url_for('index')}}" class="pure-button button-small button-cancel">Back</a>
<a href="{{url_for('scrub_page')}}" class="pure-button button-small button-cancel">Delete History Snapshot Data</a>
<button type="submit" class="pure-button pure-button-primary">Save</button>
<a href="{{url_for('index')}}" class="pure-button button-small button-cancel">Back</a>
<a href="{{url_for('scrub_page')}}" class="pure-button button-small button-cancel">Delete
History
Snapshot Data</a>
</div>
</div>
</form>

View File

@@ -5,7 +5,6 @@
<div class="box">
<form class="pure-form" action="{{ url_for('api_watch_add') }}" method="POST" id="new-watch-form">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<fieldset>
<legend>Add a new change detection watch</legend>
{{ render_simple_field(form.url, placeholder="https://...", required=true) }}

View File

@@ -42,9 +42,6 @@ def app(request):
cleanup(app_config['datastore_path'])
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], include_default_watches=False)
app = changedetection_app(app_config, datastore)
# Disable CSRF while running tests
app.config['WTF_CSRF_ENABLED'] = False
app.config['STOP_THREADS'] = True
def teardown():

View File

@@ -4,8 +4,8 @@ from flask import url_for
def test_check_access_control(app, client):
# Still doesnt work, but this is closer.
with app.test_client(use_cookies=True) as c:
# Check we don't have any password protection enabled yet.
with app.test_client() as c:
# Check we dont have any password protection enabled yet.
res = c.get(url_for("settings_page"))
assert b"Remove password" not in res.data
@@ -46,20 +46,15 @@ def test_check_access_control(app, client):
assert b"BACKUP" in res.data
assert b"IMPORT" in res.data
assert b"LOG OUT" in res.data
assert b"minutes_between_check" in res.data
assert b"fetch_backend" in res.data
res = c.post(
url_for("settings_page"),
data={
"minutes_between_check": 180,
"tag": "",
"headers": "",
"fetch_backend": "html_webdriver",
"removepassword_button": "Remove password"
},
follow_redirects=True,
)
# Now remove the password so other tests function, @todo this should happen before each test automatically
res = c.get(url_for("settings_page", removepassword="yes"),
follow_redirects=True)
assert b"Password protection removed." in res.data
res = c.get(url_for("index"))
assert b"LOG OUT" not in res.data
# There was a bug where saving the settings form would submit a blank password
def test_check_access_control_no_blank_password(app, client):
@@ -76,7 +71,8 @@ def test_check_access_control_no_blank_password(app, client):
data={"password": "",
"minutes_between_check": 180,
'fetch_backend': "html_requests"},
follow_redirects=True
follow_redirects=True
)
assert b"Password protection enabled." not in res.data
@@ -95,8 +91,7 @@ def test_check_access_no_remote_access_to_remove_password(app, client):
# Enable password check.
res = c.post(
url_for("settings_page"),
data={"password": "password",
"minutes_between_check": 180,
data={"password": "password", "minutes_between_check": 180,
'fetch_backend': "html_requests"},
follow_redirects=True
)
@@ -104,17 +99,8 @@ def test_check_access_no_remote_access_to_remove_password(app, client):
assert b"Password protection enabled." in res.data
assert b"Login" in res.data
res = c.post(
url_for("settings_page"),
data={
"minutes_between_check": 180,
"tag": "",
"headers": "",
"fetch_backend": "html_webdriver",
"removepassword_button": "Remove password"
},
follow_redirects=True,
)
res = c.get(url_for("settings_page", removepassword="yes"),
follow_redirects=True)
assert b"Password protection removed." not in res.data
res = c.get(url_for("index"),

View File

@@ -14,6 +14,7 @@ def set_response_data(test_return_data):
def test_snapshot_api_detects_change(client, live_server):
test_return_data = "Some initial text"
test_return_data_modified = "Some NEW nice initial text"
@@ -26,7 +27,7 @@ def test_snapshot_api_detects_change(client, live_server):
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},

View File

@@ -7,13 +7,6 @@ from . util import set_original_response, set_modified_response, live_server_set
sleep_time_for_fetch_thread = 3
# Basic test to check inscriptus is not adding return line chars, basically works etc
def test_inscriptus():
from inscriptis import get_text
html_content="<html><body>test!<br/>ok man</body></html>"
stripped_text_from_html = get_text(html_content)
assert stripped_text_from_html == 'test!\nok man'
def test_check_basic_change_detection_functionality(client, live_server):
set_original_response()
@@ -25,7 +18,6 @@ def test_check_basic_change_detection_functionality(client, live_server):
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
time.sleep(sleep_time_for_fetch_thread)
@@ -108,6 +100,14 @@ def test_check_basic_change_detection_functionality(client, live_server):
# It should have picked up the <title>
assert b'head title' in res.data
# be sure the HTML converter worked
res = client.get(url_for("preview_page", uuid="first"))
assert b'<html>' not in res.data
res = client.get(url_for("preview_page", uuid="first"))
assert b'Some initial text' in res.data
#
# Cleanup everything
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)

View File

@@ -0,0 +1,56 @@
#!/usr/bin/python3
import time
import secrets
from flask import url_for
from . util import live_server_setup
def test_binary_file_change(client, live_server):
    """Check that a watch on the binary endpoint is only flagged when the file changes.

    Steps:
      1. Write random bytes to ``test-datastore/test.bin`` (presumably what
         ``test_binaryfile_endpoint`` serves - confirm against the test server).
      2. Import the endpoint URL as a watch and run a first check.
      3. Re-check with the file untouched; the index page must not contain
         ``unviewed``.
      4. Rewrite the file with fresh random bytes, re-check, and expect
         ``unviewed`` on the index page.
    """
    with open("test-datastore/test.bin", "wb") as f:
        f.write(secrets.token_bytes())

    live_server_setup(live_server)

    sleep_time_for_fetch_thread = 3

    # Give the endpoint time to spin up
    time.sleep(1)

    # Add our URL to the import page
    test_url = url_for('test_binaryfile_endpoint', _external=True)
    res = client.post(
        url_for("import_page"),
        data={"urls": test_url},
        follow_redirects=True
    )
    assert b"1 Imported" in res.data

    # Trigger a check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)

    # Give the thread time to pick it up
    time.sleep(sleep_time_for_fetch_thread)

    # Trigger a second check against the unchanged file
    client.get(url_for("api_watch_checknow"), follow_redirects=True)

    # Wait for that re-check to finish; without this the assertion below could
    # pass simply because the check had not run yet.
    time.sleep(sleep_time_for_fetch_thread)

    # It should report nothing found (no new 'unviewed' class)
    res = client.get(url_for("index"))
    assert b'unviewed' not in res.data
    assert b'/test-binary-endpoint' in res.data

    # Make a change
    with open("test-datastore/test.bin", "wb") as f:
        f.write(secrets.token_bytes())

    # Trigger a check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)

    # Give the thread time to pick it up
    time.sleep(sleep_time_for_fetch_thread)

    # The bytes differ now, so the watch should be flagged as 'unviewed'
    res = client.get(url_for("index"))
    assert b'unviewed' in res.data

View File

@@ -1,168 +0,0 @@
#!/usr/bin/python3
import time
from flask import url_for
from ..html_tools import *
from .util import live_server_setup
def test_setup(live_server):
    """Pass the ``live_server`` fixture to the shared ``live_server_setup`` helper from ``.util``."""
    live_server_setup(live_server)
def set_original_response():
    """Write the baseline HTML page to ``test-datastore/endpoint-content.txt``.

    The page has header, nav and footer sections plus a ``#changetext`` div.
    """
    baseline_html = """<html>
<header>
<h2>Header</h2>
</header>
<nav>
<ul>
<li><a href="#">A</a></li>
<li><a href="#">B</a></li>
<li><a href="#">C</a></li>
</ul>
</nav>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
<div id="changetext">Some text that will change</div>
</body>
<footer>
<p>Footer</p>
</footer>
</html>
"""
    with open("test-datastore/endpoint-content.txt", "w") as out:
        out.write(baseline_html)
def set_modified_response():
    """Write the modified HTML page to ``test-datastore/endpoint-content.txt``.

    Compared with the baseline page, only the header, the first nav item,
    the ``#changetext`` div and the footer carry different text.
    """
    modified_html = """<html>
<header>
<h2>Header changed</h2>
</header>
<nav>
<ul>
<li><a href="#">A changed</a></li>
<li><a href="#">B</a></li>
<li><a href="#">C</a></li>
</ul>
</nav>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
<div id="changetext">Some text that changes</div>
</body>
<footer>
<p>Footer changed</p>
</footer>
</html>
"""
    with open("test-datastore/endpoint-content.txt", "w") as out:
        out.write(modified_html)
def test_element_removal_output():
    """Check the text rendered after ``element_removal`` drops the selected elements."""
    from changedetectionio import fetch_site_status
    from inscriptis import get_text

    # Page containing the sub-parts (header/nav/footer/#changetext) to be removed
    content = """<html>
<header>
<h2>Header</h2>
</header>
<nav>
<ul>
<li><a href="#">A</a></li>
</ul>
</nav>
<body>
Some initial text</br>
<p>across multiple lines</p>
<div id="changetext">Some text that changes</div>
</body>
<footer>
<p>Footer</p>
</footer>
</html>
"""
    selectors = ["header", "footer", "nav", "#changetext"]
    cleaned_html = element_removal(selectors, html_content=content)
    rendered = get_text(cleaned_html)

    expected = """Some initial text
across multiple lines
"""
    assert rendered == expected
def test_element_removal_full(client, live_server):
    """End-to-end: edits confined to elements listed in ``subtractive_selectors`` must not flag a change."""
    fetch_wait = 3

    set_original_response()

    # Let the endpoint come up first
    time.sleep(1)

    # Register the test endpoint through the import page
    watch_url = url_for("test_endpoint", _external=True)
    import_form = {"urls": watch_url}
    resp = client.post(url_for("import_page"), data=import_form, follow_redirects=True)
    assert b"1 Imported" in resp.data

    # Save the subtractive selectors on the watch's edit page.
    # (Original author's note: not sure why the carriage returns are needed -
    # without the #changetext entry they are not necessary.)
    subtractive_selectors_data = "header\r\nfooter\r\nnav\r\n#changetext"
    edit_form = {
        "subtractive_selectors": subtractive_selectors_data,
        "url": watch_url,
        "tag": "",
        "headers": "",
        "fetch_backend": "html_requests",
    }
    resp = client.post(
        url_for("edit_page", uuid="first"),
        data=edit_form,
        follow_redirects=True,
    )
    assert b"Updated watch." in resp.data

    # The edit page should show the saved selectors
    resp = client.get(url_for("edit_page", uuid="first"))
    assert bytes(subtractive_selectors_data.encode("utf-8")) in resp.data

    # First check - establishes the baseline snapshot
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # Nothing to report after the first check
    resp = client.get(url_for("index"))
    assert b"unviewed" not in resp.data

    # Change only the header/footer/nav/#changetext parts of the page
    set_modified_response()

    # Second check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # Every modification sits inside a removed element, so still no 'unviewed'
    resp = client.get(url_for("index"))
    assert b"unviewed" not in resp.data

View File

@@ -1,87 +0,0 @@
#!/usr/bin/python3
# coding=utf-8
import time
from flask import url_for
from .util import live_server_setup
import pytest
def test_setup(live_server):
    """Pass the ``live_server`` fixture to the shared ``live_server_setup`` helper from ``.util``."""
    live_server_setup(live_server)
def set_html_response():
    """Write an HTML page containing Chinese text to ``test-datastore/endpoint-content.txt``.

    The file is written explicitly as UTF-8: the tests in this module assert
    on the UTF-8 bytes of the text, so the on-disk encoding must not depend on
    the platform's default locale encoding.

    Returns:
        None
    """
    test_return_data = """
<html><body><span class="nav_second_img_text">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;铸大国重器,挺制造脊梁,致力能源未来,赋能美好生活。
</span>
</body></html>
"""
    # encoding="utf-8" avoids UnicodeEncodeError / wrong bytes under non-UTF-8 locales
    with open("test-datastore/endpoint-content.txt", "w", encoding="utf-8") as f:
        f.write(test_return_data)
    return None
# Endpoint is requested with content_type="text/html", i.e. no charset= is asked for
def test_check_encoding_detection(client, live_server):
    """The preview page must show the Chinese text as UTF-8 when the content type is plain ``text/html``."""
    set_html_response()

    # Let the endpoint come up first
    time.sleep(1)

    # Register the test endpoint through the import page
    watch_url = url_for('test_endpoint', content_type="text/html", _external=True)
    client.post(url_for("import_page"), data={"urls": watch_url}, follow_redirects=True)

    # Kick off a check and wait for the fetch thread
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(2)

    preview = client.get(url_for("preview_page", uuid="first"), follow_redirects=True)

    # Correctly decoded text is present ...
    assert "铸大国重".encode('utf-8') in preview.data
    # ... and the bytes of a failed decode are not
    assert b'\xc2\xa7' not in preview.data
# Endpoint is requested without any content_type argument
def test_check_encoding_detection_missing_content_type_header(client, live_server):
    """The preview page must show the Chinese text as UTF-8 when no content type is requested from the endpoint."""
    set_html_response()

    # Let the endpoint come up first
    time.sleep(1)

    # Register the test endpoint through the import page
    watch_url = url_for('test_endpoint', _external=True)
    client.post(url_for("import_page"), data={"urls": watch_url}, follow_redirects=True)

    # Kick off a check and wait for the fetch thread
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(2)

    preview = client.get(url_for("preview_page", uuid="first"), follow_redirects=True)

    # Correctly decoded text is present ...
    assert "铸大国重".encode('utf-8') in preview.data
    # ... and the bytes of a failed decode are not
    assert b'\xc2\xa7' not in preview.data

View File

@@ -1,7 +1,6 @@
#!/usr/bin/python3
import time
from flask import url_for
from . util import live_server_setup
@@ -18,9 +17,7 @@ def test_error_handler(client, live_server):
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint',
status_code=403,
_external=True)
test_url = url_for('test_endpoint_403_error', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},

View File

@@ -3,7 +3,6 @@
import time
from flask import url_for
from . util import live_server_setup
from changedetectionio import html_tools
def test_setup(live_server):
live_server_setup(live_server)
@@ -24,7 +23,7 @@ def test_strip_regex_text_func():
ignore_lines = ["sometimes", "/\s\d{2,3}\s/", "/ignore-case text/"]
fetcher = fetch_site_status.perform_site_check(datastore=False)
stripped_content = html_tools.strip_ignore_text(test_content, ignore_lines)
stripped_content = fetcher.strip_ignore_text(test_content, ignore_lines)
assert b"but 1 lines" in stripped_content
assert b"igNORe-cAse text" not in stripped_content

View File

@@ -3,7 +3,6 @@
import time
from flask import url_for
from . util import live_server_setup
from changedetectionio import html_tools
def test_setup(live_server):
live_server_setup(live_server)
@@ -24,7 +23,7 @@ def test_strip_text_func():
ignore_lines = ["sometimes"]
fetcher = fetch_site_status.perform_site_check(datastore=False)
stripped_content = html_tools.strip_ignore_text(test_content, ignore_lines)
stripped_content = fetcher.strip_ignore_text(test_content, ignore_lines)
assert b"sometimes" not in stripped_content
assert b"Some content" in stripped_content
@@ -53,8 +52,6 @@ def set_modified_original_ignore_response():
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
<p>new ignore stuff</p>
<p>blah</p>
</body>
</html>
@@ -70,7 +67,7 @@ def set_modified_ignore_response():
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
<P>ZZZZz</P>
<P>ZZZZZ</P>
</br>
So let's see what happens. </br>
</body>
@@ -85,8 +82,7 @@ def set_modified_ignore_response():
def test_check_ignore_text_functionality(client, live_server):
sleep_time_for_fetch_thread = 3
# Use a mix of case in ZzZ to prove it works case-insensitive.
ignore_text = "XXXXX\r\nYYYYY\r\nzZzZZ\r\nnew ignore stuff"
ignore_text = "XXXXX\r\nYYYYY\r\nZZZZZ"
set_original_ignore_response()
# Give the endpoint time to spin up
@@ -146,25 +142,13 @@ def test_check_ignore_text_functionality(client, live_server):
assert b'unviewed' not in res.data
assert b'/test-endpoint' in res.data
# Just to be sure.. set a regular modified change..
set_modified_original_ignore_response()
client.get(url_for("api_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data
# Check the preview/highlighter, we should be able to see what we ignored, but it should be highlighted
# We only introduce the "modified" content that includes what we ignore so we can prove the newest version also displays
# at /preview
res = client.get(url_for("preview_page", uuid="first"))
# We should be able to see what we ignored
assert b'<div class="ignored">new ignore stuff' in res.data
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

View File

@@ -1,190 +0,0 @@
#!/usr/bin/python3
import time
from flask import url_for
from . util import live_server_setup
def test_setup(live_server):
    """Pass the ``live_server`` fixture to the shared ``live_server_setup`` helper from ``.util``."""
    live_server_setup(live_server)
def set_original_response():
    """Write the baseline HTML page to ``test-datastore/endpoint-content.txt``."""
    baseline_html = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
    with open("test-datastore/endpoint-content.txt", "w") as out:
        out.write(baseline_html)
def set_some_changed_response():
    """Write the changed HTML page (extra text in the paragraph) to ``test-datastore/endpoint-content.txt``."""
    changed_html = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines, and a new thing too.</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
    with open("test-datastore/endpoint-content.txt", "w") as out:
        out.write(changed_html)
def test_normal_page_check_works_with_ignore_status_code(client, live_server):
    """With ``ignore_status_codes`` saved in the global settings, a watch on the plain test endpoint must still be flagged when its content changes."""
    fetch_wait = 3

    # Let the endpoint come up first
    time.sleep(1)

    set_original_response()

    # Switch on ignore_status_codes through the global settings form
    settings_form = {
        "minutes_between_check": 180,
        "ignore_status_codes": "y",
        'fetch_backend': "html_requests",
    }
    resp = client.post(url_for("settings_page"), data=settings_form, follow_redirects=True)
    assert b"Settings updated." in resp.data

    # Register the test endpoint through the import page
    watch_url = url_for('test_endpoint', _external=True)
    resp = client.post(url_for("import_page"), data={"urls": watch_url}, follow_redirects=True)
    assert b"1 Imported" in resp.data

    time.sleep(fetch_wait)

    # First manual check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)

    set_some_changed_response()

    time.sleep(fetch_wait)

    # Second manual check, then wait for the fetch thread
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # The content changed, so the watch should carry the 'unviewed' class
    resp = client.get(url_for("index"))
    assert b'unviewed' in resp.data
    assert b'/test-endpoint' in resp.data
# Whole-stack test: a 403 endpoint with ignore_status_codes enabled on the watch
def test_403_page_check_works_with_ignore_status_code(client, live_server):
    """A watch on an endpoint requested with ``status_code=403`` must still be flagged as changed once ``ignore_status_codes`` is saved on it."""
    fetch_wait = 3

    set_original_response()

    # Let the endpoint come up first
    time.sleep(1)

    # Register the 403 test endpoint through the import page
    watch_url = url_for('test_endpoint', status_code=403, _external=True)
    resp = client.post(url_for("import_page"), data={"urls": watch_url}, follow_redirects=True)
    assert b"1 Imported" in resp.data

    # Initial check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # Enable ignore_status_codes on the watch's edit page
    edit_form = {
        "ignore_status_codes": "y",
        "url": watch_url,
        "tag": "",
        "headers": "",
        'fetch_backend': "html_requests",
    }
    resp = client.post(url_for("edit_page", uuid="first"), data=edit_form, follow_redirects=True)
    assert b"Updated watch." in resp.data

    # Check again with the option saved
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # Change the served content
    set_some_changed_response()

    # Check once more
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # The change should have been detected, so the watch is marked 'unviewed'
    resp = client.get(url_for("index"))
    assert b'unviewed' in resp.data
# Whole-stack test: a 403 endpoint WITHOUT ignore_status_codes on the watch
def test_403_page_check_fails_without_ignore_status_code(client, live_server):
    """A watch on an endpoint requested with ``status_code=403`` must show ``Status Code 403`` on the index page when ``ignore_status_codes`` is not submitted."""
    fetch_wait = 3

    set_original_response()

    # Let the endpoint come up first
    time.sleep(1)

    # Register the 403 test endpoint through the import page
    watch_url = url_for('test_endpoint', status_code=403, _external=True)
    resp = client.post(url_for("import_page"), data={"urls": watch_url}, follow_redirects=True)
    assert b"1 Imported" in resp.data

    # Initial check
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # Save the watch from the edit page, leaving ignore_status_codes out of the form
    edit_form = {
        "url": watch_url,
        "tag": "",
        "headers": "",
        'fetch_backend': "html_requests",
    }
    resp = client.post(url_for("edit_page", uuid="first"), data=edit_form, follow_redirects=True)
    assert b"Updated watch." in resp.data

    # Check again
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # Change the served content
    set_some_changed_response()

    # Check once more
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(fetch_wait)

    # The index page should report the 403 status for the watch
    resp = client.get(url_for("index"))
    assert b'Status Code 403' in resp.data

View File

@@ -1,5 +1,4 @@
#!/usr/bin/python3
# coding=utf-8
import time
from flask import url_for
@@ -143,7 +142,7 @@ def set_modified_response():
}
],
"boss": {
"name": "Örnsköldsvik"
"name": "Foobar"
},
"available": false
}
@@ -163,7 +162,7 @@ def test_check_json_without_filter(client, live_server):
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
test_url = url_for('test_endpoint_json', _external=True)
client.post(
url_for("import_page"),
data={"urls": test_url},
@@ -194,7 +193,7 @@ def test_check_json_filter(client, live_server):
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
@@ -247,10 +246,8 @@ def test_check_json_filter(client, live_server):
# Should not see this, because its not in the JSONPath we entered
res = client.get(url_for("diff_history_page", uuid="first"))
# But the change should be there, tho its hard to test the change was detected because it will show old and new versions
# And #462 - check we see the proper utf-8 string there
assert "Örnsköldsvik".encode('utf-8') in res.data
assert b'Foobar' in res.data
def test_check_json_filter_bool_val(client, live_server):
@@ -261,7 +258,7 @@ def test_check_json_filter_bool_val(client, live_server):
# Give the endpoint time to spin up
time.sleep(1)
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
@@ -316,7 +313,7 @@ def test_check_json_ext_filter(client, live_server):
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},

View File

@@ -77,42 +77,6 @@ def test_body_in_request(client, live_server):
# Add our URL to the import page
test_url = url_for('test_body', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
body_value = 'Test Body Value'
# Add a properly formatted body with a proper method
res = client.post(
url_for("edit_page", uuid="first"),
data={
"url": test_url,
"tag": "",
"method": "POST",
"fetch_backend": "html_requests",
"body": body_value},
follow_redirects=True
)
assert b"Updated watch." in res.data
time.sleep(3)
# The service should echo back the body
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
# If this gets stuck something is wrong, something should always be there
assert b"No history found" not in res.data
# We should see what we sent in the reply
assert str.encode(body_value) in res.data
####### data sanity checks
# Add the test URL twice, we will check
res = client.post(
url_for("import_page"),
@@ -121,15 +85,14 @@ def test_body_in_request(client, live_server):
)
assert b"1 Imported" in res.data
watches_with_body = 0
with open('test-datastore/url-watches.json') as f:
app_struct = json.load(f)
for uuid in app_struct['watching']:
if app_struct['watching'][uuid]['body']==body_value:
watches_with_body += 1
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Should be only one with body set
assert watches_with_body==1
body_value = 'Test Body Value'
# Attempt to add a body with a GET method
res = client.post(
@@ -144,6 +107,40 @@ def test_body_in_request(client, live_server):
)
assert b"Body must be empty when Request Method is set to GET" in res.data
# Add a properly formatted body with a proper method
res = client.post(
url_for("edit_page", uuid="first"),
data={
"url": test_url,
"tag": "",
"method": "POST",
"fetch_backend": "html_requests",
"body": body_value},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Give the thread time to pick up the first version
time.sleep(5)
# The service should echo back the body
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
# Check if body returned contains the specified data
assert str.encode(body_value) in res.data
watches_with_body = 0
with open('test-datastore/url-watches.json') as f:
app_struct = json.load(f)
for uuid in app_struct['watching']:
if app_struct['watching'][uuid]['body']==body_value:
watches_with_body += 1
# Should be only one with body set
assert watches_with_body==1
def test_method_in_request(client, live_server):
# Add our URL to the import page

View File

@@ -1,36 +0,0 @@
from flask import url_for
from . util import set_original_response, set_modified_response, live_server_setup
import time
def test_setup(live_server):
    """Install the helper routes from util.live_server_setup on the live test server."""
    live_server_setup(live_server)
def test_file_access(client, live_server):
    """Check that a watch edited to use a file:// URL shows a security denial on the index page."""
    # Seed a single watch through the import form so there is something to edit
    import_response = client.post(
        url_for("import_page"),
        data={"urls": 'https://localhost'},
        follow_redirects=True
    )
    assert b"1 Imported" in import_response.data

    # Re-point that watch at a local file through the edit form
    edit_form = {
        "url": 'file:///etc/passwd',
        "tag": "",
        "method": "GET",
        "fetch_backend": "html_requests",
        "body": ""}
    client.post(
        url_for("edit_page", uuid="first"),
        data=edit_form,
        follow_redirects=True
    )

    # Pause before reading the index; presumably this lets a background
    # fetch of the watch run first (TODO confirm against the worker code)
    time.sleep(3)

    index_response = client.get(
        url_for("index", uuid="first"),
        follow_redirects=True
    )
    # The file:// fetch must have been refused, with the refusal visible on the index page
    assert b'denied for security reasons' in index_response.data

View File

@@ -129,8 +129,3 @@ def test_trigger_functionality(client, live_server):
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data
# Check the preview/highlighter, we should be able to see what we triggered on, but it should be highlighted
res = client.get(url_for("preview_page", uuid="first"))
# The text that triggered should appear inside the "triggered" highlight markup
assert b'<div class="triggered">foobar' in res.data

View File

@@ -96,7 +96,6 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
def test_xpath_validation(client, live_server):
# Give the endpoint time to spin up

View File

@@ -1,6 +1,5 @@
#!/usr/bin/python3
from flask import make_response, request
def set_original_response():
test_return_data = """<html>
@@ -38,24 +37,45 @@ def set_modified_response():
def live_server_setup(live_server):
@live_server.app.route('/test-binary-endpoint')
def test_binaryfile_endpoint():
from flask import make_response
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/test.bin", "rb") as f:
resp = make_response(f.read())
resp.headers['Content-Type'] = 'image/jpeg'
return resp
@live_server.app.route('/test-endpoint')
def test_endpoint():
ctype = request.args.get('content_type')
status_code = request.args.get('status_code')
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/endpoint-content.txt", "r") as f:
return f.read()
try:
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/endpoint-content.txt", "r") as f:
resp = make_response(f.read(), status_code)
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
return resp
except FileNotFoundError:
return make_response('', status_code)
@live_server.app.route('/test-endpoint-json')
def test_endpoint_json():
from flask import make_response
with open("test-datastore/endpoint-content.txt", "r") as f:
resp = make_response(f.read())
resp.headers['Content-Type'] = 'application/json'
return resp
@live_server.app.route('/test-403')
def test_endpoint_403_error():
from flask import make_response
resp = make_response('', 403)
return resp
# Just return the headers in the request
@live_server.app.route('/test-headers')
def test_headers():
from flask import request
output= []
for header in request.headers:
@@ -66,16 +86,24 @@ def live_server_setup(live_server):
# Just return the body in the request
@live_server.app.route('/test-body', methods=['POST', 'GET'])
def test_body():
from flask import request
return request.data
# Just return the verb in the request
@live_server.app.route('/test-method', methods=['POST', 'GET', 'PATCH'])
def test_method():
from flask import request
return request.method
# Where we POST to as a notification
@live_server.app.route('/test_notification_endpoint', methods=['POST', 'GET'])
def test_notification_endpoint():
from flask import request
with open("test-datastore/notification.txt", "wb") as f:
# Debug method, dump all POST to file also, used to prove #65
data = request.stream.read()
@@ -89,6 +117,8 @@ def live_server_setup(live_server):
# Just return the basic-auth username, password and type from the request
@live_server.app.route('/test-basicauth', methods=['GET'])
def test_basicauth_method():
from flask import request
auth = request.authorization
ret = " ".join([auth.username, auth.password, auth.type])
return ret

View File

@@ -49,6 +49,8 @@ class update_worker(threading.Thread):
# We then convert/.decode('utf-8') for the notification etc
if not isinstance(contents, (bytes, bytearray)):
raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
except PermissionError as e:
self.app.logger.error("File permission error updating", uuid, str(e))
except content_fetcher.EmptyReply as e:
@@ -132,10 +134,8 @@ class update_worker(threading.Thread):
except Exception as e:
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
print("!!!! Exception in update_worker !!!\n", e)
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
finally:
# Always record that we at least tried
self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
@@ -144,7 +144,4 @@ class update_worker(threading.Thread):
self.current_uuid = None # Done
self.q.task_done()
# Give the CPU time to interrupt
time.sleep(0.1)
self.app.config.exit.wait(1)

View File

@@ -1,9 +1,9 @@
version: '2'
services:
changedetection:
changedetection.io:
image: ghcr.io/dgtlmoon/changedetection.io
container_name: changedetection
hostname: changedetection
container_name: changedetection.io
hostname: changedetection.io
volumes:
- changedetection-data:/datastore

View File

@@ -1,9 +1,9 @@
flask~= 2.0
flask_wtf
eventlet>=0.31.0
validators
timeago ~=1.0
inscriptis ~= 2.2
inscriptis ~= 1.2
feedgen ~= 0.9
flask-login ~= 0.5
pytz
@@ -17,7 +17,7 @@ wtforms ~= 2.3.3
jsonpath-ng ~= 1.5.3
# Notification library
apprise ~= 0.9.7
apprise ~= 0.9.6
# apprise mqtt https://github.com/dgtlmoon/changedetection.io/issues/315
paho-mqtt
@@ -34,4 +34,5 @@ lxml
# 3.141 was missing socksVersion, 3.150 was not in pypi, so we try 4.1.0
selenium ~= 4.1.0
pytest ~=6.2
pytest-flask ~=1.2

View File

@@ -32,11 +32,11 @@ setup(
long_description_content_type='text/markdown',
keywords='website change monitor for changes notification change detection '
'alerts tracking website tracker change alert website and monitoring',
entry_points={"console_scripts": ["changedetection.io=changedetectionio.changedetection:main"]},
zip_safe=True,
scripts=["changedetection.py"],
zip_safe=False,
entry_points={"console_scripts": ["changedetection.io=changedetection:main"]},
author='dgtlmoon',
url='https://changedetection.io',
scripts=['changedetection.py'],
packages=['changedetectionio'],
include_package_data=True,
install_requires=install_requires,