mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-14 13:36:09 +00:00
Compare commits
23 Commits
history-fi
...
RSS-per-wa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ff9b4fc32c | ||
|
|
9a6a131985 | ||
|
|
629f939224 | ||
|
|
48299e5738 | ||
|
|
5b1b70b8ab | ||
|
|
678d568b37 | ||
|
|
fb15b62fb9 | ||
|
|
8dc39d4a3d | ||
|
|
805cd618d4 | ||
|
|
4ba5fcce8f | ||
|
|
b9305faf21 | ||
|
|
3d3b53831e | ||
|
|
2ae29ab78f | ||
|
|
caffd804fe | ||
|
|
c58a97f69d | ||
|
|
e2b407c6f3 | ||
|
|
d65a2c784d | ||
|
|
9bc812a167 | ||
|
|
fd2080567d | ||
|
|
969c75e7be | ||
|
|
4b14cec5f4 | ||
|
|
a8d5ea067d | ||
|
|
2f6873f7d5 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -21,6 +21,7 @@ venv/
|
||||
# IDEs
|
||||
.idea
|
||||
.vscode/settings.json
|
||||
*~
|
||||
|
||||
# Datastore files
|
||||
datastore/
|
||||
|
||||
@@ -14,7 +14,7 @@ Ideal for monitoring price changes, content edits, conditional changes and more.
|
||||
[<img src="https://raw.githubusercontent.com/dgtlmoon/changedetection.io/master/docs/screenshot.png" style="max-width:100%;" alt="Self-hosted web page change monitoring, list of websites with changes" title="Self-hosted web page change monitoring, list of websites with changes" />](https://changedetection.io)
|
||||
|
||||
|
||||
[**Don't have time? Try our extremely affordable subscription use our proxies and support!**](https://changedetection.io)
|
||||
[**Don't have time? Try our extremely affordable subscription use our proxies and support!**](https://changedetection.io)
|
||||
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ Available when connected to a <a href="https://github.com/dgtlmoon/changedetecti
|
||||
|
||||
### Perform interactive browser steps
|
||||
|
||||
Fill in text boxes, click buttons and more, setup your changedetection scenario.
|
||||
Fill in text boxes, click buttons and more, setup your changedetection scenario.
|
||||
|
||||
Using the **Browser Steps** configuration, add basic steps before performing change detection, such as logging into websites, adding a product to a cart, accept cookie logins, entering dates and refining searches.
|
||||
|
||||
@@ -54,7 +54,7 @@ Requires Playwright to be enabled.
|
||||
- Know when your favourite whiskey is on sale, or other special deals are announced before anyone else
|
||||
- COVID related news from government websites
|
||||
- University/organisation news from their website
|
||||
- Detect and monitor changes in JSON API responses
|
||||
- Detect and monitor changes in JSON API responses
|
||||
- JSON API monitoring and alerting
|
||||
- Changes in legal and other documents
|
||||
- Trigger API calls via notifications when text appears on a website
|
||||
@@ -86,7 +86,7 @@ _Need an actual Chrome runner with Javascript support? We support fetching via W
|
||||
|
||||
We [recommend and use Bright Data](https://brightdata.grsm.io/n0r16zf7eivq) global proxy services, Bright Data will match any first deposit up to $100 using our signup link.
|
||||
|
||||
[Oxylabs](https://oxylabs.go2cloud.org/SH2d) is also an excellent proxy provider and well worth using, they offer Residental, ISP, Rotating and many other proxy types to suit your project.
|
||||
[Oxylabs](https://oxylabs.go2cloud.org/SH2d) is also an excellent proxy provider and well worth using, they offer Residential, ISP, Rotating and many other proxy types to suit your project.
|
||||
|
||||
Please :star: star :star: this project and help it grow! https://github.com/dgtlmoon/changedetection.io/
|
||||
|
||||
@@ -106,4 +106,3 @@ $ changedetection.io -d /path/to/empty/data/dir -p 5000
|
||||
Then visit http://127.0.0.1:5000 , You should now be able to access the UI.
|
||||
|
||||
See https://changedetection.io for more information.
|
||||
|
||||
|
||||
@@ -64,7 +64,7 @@ def count_words_in_history(watch):
|
||||
return 0
|
||||
|
||||
latest_key = list(watch.history.keys())[-1]
|
||||
latest_content = watch.get_history_snapshot(latest_key)
|
||||
latest_content = watch.get_history_snapshot(timestamp=latest_key)
|
||||
return len(latest_content.split())
|
||||
except Exception as e:
|
||||
logger.error(f"Error counting words: {str(e)}")
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
|
||||
__version__ = '0.50.39'
|
||||
__version__ = '0.50.43'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
@@ -74,6 +74,12 @@ def main():
|
||||
|
||||
datastore_path = None
|
||||
do_cleanup = False
|
||||
# Optional URL to watch since start
|
||||
default_url = None
|
||||
# Set a default logger level
|
||||
logger_level = 'DEBUG'
|
||||
include_default_watches = True
|
||||
|
||||
host = os.environ.get("LISTEN_HOST", "0.0.0.0").strip()
|
||||
port = int(os.environ.get('PORT', 5000))
|
||||
ssl_mode = False
|
||||
@@ -87,15 +93,13 @@ def main():
|
||||
datastore_path = os.path.join(os.getcwd(), "../datastore")
|
||||
|
||||
try:
|
||||
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:", "port")
|
||||
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:u:", "port")
|
||||
except getopt.GetoptError:
|
||||
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
|
||||
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -u [default URL to watch] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
|
||||
sys.exit(2)
|
||||
|
||||
create_datastore_dir = False
|
||||
|
||||
# Set a default logger level
|
||||
logger_level = 'DEBUG'
|
||||
# Set a logger level via shell env variable
|
||||
# Used: Dockerfile for CICD
|
||||
# To set logger level for pytest, see the app function in tests/conftest.py
|
||||
@@ -116,6 +120,10 @@ def main():
|
||||
if opt == '-d':
|
||||
datastore_path = arg
|
||||
|
||||
if opt == '-u':
|
||||
default_url = arg
|
||||
include_default_watches = False
|
||||
|
||||
# Cleanup (remove text files that arent in the index)
|
||||
if opt == '-c':
|
||||
do_cleanup = True
|
||||
@@ -172,13 +180,16 @@ def main():
|
||||
sys.exit(2)
|
||||
|
||||
try:
|
||||
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__)
|
||||
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__, include_default_watches=include_default_watches)
|
||||
except JSONDecodeError as e:
|
||||
# Dont' start if the JSON DB looks corrupt
|
||||
logger.critical(f"ERROR: JSON DB or Proxy List JSON at '{app_config['datastore_path']}' appears to be corrupt, aborting.")
|
||||
logger.critical(str(e))
|
||||
return
|
||||
|
||||
if default_url:
|
||||
datastore.add_watch(url = default_url)
|
||||
|
||||
app = changedetection_app(app_config, datastore)
|
||||
|
||||
# Get the SocketIO instance from the Flask app (created in flask_app.py)
|
||||
|
||||
@@ -175,7 +175,7 @@ class WatchSingleHistory(Resource):
|
||||
response = make_response("No content found", 404)
|
||||
response.mimetype = "text/plain"
|
||||
else:
|
||||
content = watch.get_history_snapshot(timestamp)
|
||||
content = watch.get_history_snapshot(timestamp=timestamp)
|
||||
response = make_response(content, 200)
|
||||
response.mimetype = "text/plain"
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ def get_openapi_spec():
|
||||
# Possibly for pip3 packages
|
||||
spec_path = os.path.join(os.path.dirname(__file__), '../docs/api-spec.yaml')
|
||||
|
||||
with open(spec_path, 'r') as f:
|
||||
with open(spec_path, 'r', encoding='utf-8') as f:
|
||||
spec_dict = yaml.safe_load(f)
|
||||
_openapi_spec = OpenAPI.from_dict(spec_dict)
|
||||
return _openapi_spec
|
||||
|
||||
@@ -353,12 +353,15 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
count = watch.get('check_count', 0) + 1
|
||||
|
||||
# Always record page title (used in notifications, and can change even when the content is the same)
|
||||
try:
|
||||
page_title = html_tools.extract_title(data=update_handler.fetcher.content)
|
||||
logger.debug(f"UUID: {uuid} Page <title> is '{page_title}'")
|
||||
datastore.update_watch(uuid=uuid, update_obj={'page_title': page_title})
|
||||
except Exception as e:
|
||||
logger.warning(f"UUID: {uuid} Exception when extracting <title> - {str(e)}")
|
||||
if update_obj.get('content-type') and 'html' in update_obj.get('content-type'):
|
||||
try:
|
||||
page_title = html_tools.extract_title(data=update_handler.fetcher.content)
|
||||
if page_title:
|
||||
page_title = page_title.strip()[:2000]
|
||||
logger.debug(f"UUID: {uuid} Page <title> is '{page_title}'")
|
||||
datastore.update_watch(uuid=uuid, update_obj={'page_title': page_title})
|
||||
except Exception as e:
|
||||
logger.warning(f"UUID: {uuid} Exception when extracting <title> - {str(e)}")
|
||||
|
||||
# Record server header
|
||||
try:
|
||||
|
||||
@@ -1 +1,17 @@
|
||||
RSS_FORMAT_TYPES = [('plaintext', 'Plain text'), ('html', 'HTML Color')]
|
||||
from copy import deepcopy
|
||||
from loguru import logger
|
||||
|
||||
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
from changedetectionio.notification import valid_notification_formats
|
||||
RSS_CONTENT_FORMAT_DEFAULT = 'text'
|
||||
|
||||
# Some stuff not related
|
||||
RSS_FORMAT_TYPES = deepcopy(valid_notification_formats)
|
||||
if RSS_FORMAT_TYPES.get('markdown'):
|
||||
del RSS_FORMAT_TYPES['markdown']
|
||||
|
||||
if RSS_FORMAT_TYPES.get(USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH):
|
||||
del RSS_FORMAT_TYPES[USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH]
|
||||
|
||||
if not RSS_FORMAT_TYPES.get(RSS_CONTENT_FORMAT_DEFAULT):
|
||||
logger.critical(f"RSS_CONTENT_FORMAT_DEFAULT not in the acceptable list {RSS_CONTENT_FORMAT_DEFAULT}")
|
||||
|
||||
97
changedetectionio/blueprint/rss/_util.py
Normal file
97
changedetectionio/blueprint/rss/_util.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""
|
||||
Utility functions for RSS feed generation.
|
||||
"""
|
||||
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from changedetectionio.notification.handler import apply_service_tweaks
|
||||
from loguru import logger
|
||||
import re
|
||||
|
||||
|
||||
BAD_CHARS_REGEX = r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
|
||||
|
||||
|
||||
def scan_invalid_chars_in_rss(content):
|
||||
"""
|
||||
Scan for invalid characters in RSS content.
|
||||
Returns True if invalid characters are found.
|
||||
"""
|
||||
for match in re.finditer(BAD_CHARS_REGEX, content):
|
||||
i = match.start()
|
||||
bad_char = content[i]
|
||||
hex_value = f"0x{ord(bad_char):02x}"
|
||||
# Grab context
|
||||
start = max(0, i - 20)
|
||||
end = min(len(content), i + 21)
|
||||
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
|
||||
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
|
||||
# First match is enough
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def clean_entry_content(content):
|
||||
"""
|
||||
Remove invalid characters from RSS content.
|
||||
"""
|
||||
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
|
||||
return cleaned
|
||||
|
||||
|
||||
def generate_watch_guid(watch):
|
||||
"""
|
||||
Generate a unique GUID for a watch RSS entry.
|
||||
"""
|
||||
return f"{watch['uuid']}/{watch.last_changed}"
|
||||
|
||||
|
||||
def generate_watch_diff_content(watch, dates, rss_content_format, datastore, date_index_from=-2, date_index_to=-1):
|
||||
"""
|
||||
Generate HTML diff content for a watch given its history dates.
|
||||
Returns tuple of (content, watch_label).
|
||||
|
||||
Args:
|
||||
watch: The watch object
|
||||
dates: List of history snapshot dates
|
||||
rss_content_format: Format for RSS content (html or text)
|
||||
datastore: The ChangeDetectionStore instance
|
||||
date_index_from: Index of the "from" date in the dates list (default: -2)
|
||||
date_index_to: Index of the "to" date in the dates list (default: -1)
|
||||
|
||||
Returns:
|
||||
Tuple of (content, watch_label) - the rendered HTML content and watch label
|
||||
"""
|
||||
from changedetectionio import diff
|
||||
|
||||
# Same logic as watch-overview.html
|
||||
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
|
||||
watch_label = watch.label
|
||||
else:
|
||||
watch_label = watch.get('url')
|
||||
|
||||
try:
|
||||
html_diff = diff.render_diff(
|
||||
previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[date_index_from]),
|
||||
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[date_index_to]),
|
||||
include_equal=False
|
||||
)
|
||||
|
||||
requested_output_format = datastore.data['settings']['application'].get('rss_content_format')
|
||||
url, html_diff, n_title = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
|
||||
|
||||
except FileNotFoundError as e:
|
||||
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
|
||||
|
||||
# @note: We use <pre> because nearly all RSS readers render only HTML (Thunderbird for example cant do just plaintext)
|
||||
rss_template = "<pre>{{watch_label}} had a change.\n\n{{html_diff}}\n</pre>"
|
||||
if 'html' in rss_content_format:
|
||||
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_label}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
|
||||
|
||||
content = jinja_render(template_str=rss_template, watch_label=watch_label, html_diff=html_diff, watch_url=watch.link)
|
||||
|
||||
# Out of range chars could also break feedgen
|
||||
if scan_invalid_chars_in_rss(content):
|
||||
content = clean_entry_content(content)
|
||||
|
||||
return content, watch_label
|
||||
@@ -1,155 +1,26 @@
|
||||
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from changedetectionio.notification.handler import apply_service_tweaks
|
||||
from changedetectionio.store import ChangeDetectionStore
|
||||
from feedgen.feed import FeedGenerator
|
||||
from flask import Blueprint, make_response, request, url_for, redirect
|
||||
from loguru import logger
|
||||
import datetime
|
||||
import pytz
|
||||
import re
|
||||
import time
|
||||
from flask import Blueprint
|
||||
|
||||
|
||||
BAD_CHARS_REGEX=r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
|
||||
|
||||
# Anything that is not text/UTF-8 should be stripped before it breaks feedgen (such as binary data etc)
|
||||
def scan_invalid_chars_in_rss(content):
|
||||
for match in re.finditer(BAD_CHARS_REGEX, content):
|
||||
i = match.start()
|
||||
bad_char = content[i]
|
||||
hex_value = f"0x{ord(bad_char):02x}"
|
||||
# Grab context
|
||||
start = max(0, i - 20)
|
||||
end = min(len(content), i + 21)
|
||||
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
|
||||
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
|
||||
# First match is enough
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def clean_entry_content(content):
|
||||
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
|
||||
return cleaned
|
||||
from . import tag as tag_routes
|
||||
from . import main_feed
|
||||
from . import single_watch
|
||||
|
||||
def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
"""
|
||||
Construct and configure the RSS blueprint with all routes.
|
||||
|
||||
Args:
|
||||
datastore: The ChangeDetectionStore instance
|
||||
|
||||
Returns:
|
||||
The configured Flask blueprint
|
||||
"""
|
||||
rss_blueprint = Blueprint('rss', __name__)
|
||||
|
||||
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
|
||||
# to some earlier blueprint rerouting work, it should goto feed.
|
||||
@rss_blueprint.route("/", methods=['GET'])
|
||||
def extraslash():
|
||||
return redirect(url_for('rss.feed'))
|
||||
|
||||
# Import the login decorator if needed
|
||||
# from changedetectionio.auth_decorator import login_optionally_required
|
||||
@rss_blueprint.route("", methods=['GET'])
|
||||
def feed():
|
||||
now = time.time()
|
||||
# Always requires token set
|
||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||
rss_url_token = request.args.get('token')
|
||||
if rss_url_token != app_rss_token:
|
||||
return "Access denied, bad token", 403
|
||||
|
||||
from changedetectionio import diff
|
||||
limit_tag = request.args.get('tag', '').lower().strip()
|
||||
# Be sure limit_tag is a uuid
|
||||
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
|
||||
if limit_tag == tag.get('title', '').lower().strip():
|
||||
limit_tag = uuid
|
||||
|
||||
# Sort by last_changed and add the uuid which is usually the key..
|
||||
sorted_watches = []
|
||||
|
||||
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
|
||||
for uuid, watch in datastore.data['watching'].items():
|
||||
# @todo tag notification_muted skip also (improve Watch model)
|
||||
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
|
||||
continue
|
||||
if limit_tag and not limit_tag in watch['tags']:
|
||||
continue
|
||||
watch['uuid'] = uuid
|
||||
sorted_watches.append(watch)
|
||||
|
||||
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
|
||||
|
||||
fg = FeedGenerator()
|
||||
fg.title('changedetection.io')
|
||||
fg.description('Feed description')
|
||||
fg.link(href='https://changedetection.io')
|
||||
|
||||
html_colour_enable = False
|
||||
if datastore.data['settings']['application'].get('rss_content_format') == 'html':
|
||||
html_colour_enable = True
|
||||
|
||||
for watch in sorted_watches:
|
||||
|
||||
dates = list(watch.history.keys())
|
||||
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
|
||||
if len(dates) < 2:
|
||||
continue
|
||||
|
||||
if not watch.viewed:
|
||||
# Re #239 - GUID needs to be individual for each event
|
||||
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
|
||||
guid = "{}/{}".format(watch['uuid'], watch.last_changed)
|
||||
fe = fg.add_entry()
|
||||
|
||||
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
|
||||
# Description is the page you watch, link takes you to the diff JS UI page
|
||||
# Dict val base_url will get overriden with the env var if it is set.
|
||||
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
|
||||
# @todo fix
|
||||
|
||||
# Because we are called via whatever web server, flask should figure out the right path (
|
||||
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||
|
||||
fe.link(link=diff_link)
|
||||
|
||||
# Same logic as watch-overview.html
|
||||
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
|
||||
watch_label = watch.label
|
||||
else:
|
||||
watch_label = watch.get('url')
|
||||
|
||||
fe.title(title=watch_label)
|
||||
try:
|
||||
|
||||
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(dates[-2]),
|
||||
newest_version_file_contents=watch.get_history_snapshot(dates[-1]),
|
||||
include_equal=False,
|
||||
line_feed_sep="<br>"
|
||||
)
|
||||
|
||||
|
||||
requested_output_format = 'htmlcolor' if html_colour_enable else 'html'
|
||||
html_diff = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
|
||||
|
||||
except FileNotFoundError as e:
|
||||
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
|
||||
|
||||
# @todo Make this configurable and also consider html-colored markup
|
||||
# @todo User could decide if <link> goes to the diff page, or to the watch link
|
||||
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
|
||||
|
||||
content = jinja_render(template_str=rss_template, watch_title=watch_label, html_diff=html_diff, watch_url=watch.link)
|
||||
|
||||
# Out of range chars could also break feedgen
|
||||
if scan_invalid_chars_in_rss(content):
|
||||
content = clean_entry_content(content)
|
||||
|
||||
fe.content(content=content, type='CDATA')
|
||||
fe.guid(guid, permalink=False)
|
||||
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
||||
dt = dt.replace(tzinfo=pytz.UTC)
|
||||
fe.pubDate(dt)
|
||||
|
||||
response = make_response(fg.rss_str())
|
||||
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
|
||||
return response
|
||||
# Register all route modules
|
||||
main_feed.construct_main_feed_routes(rss_blueprint, datastore)
|
||||
single_watch.construct_single_watch_routes(rss_blueprint, datastore)
|
||||
tag_routes.construct_tag_routes(rss_blueprint, datastore)
|
||||
|
||||
return rss_blueprint
|
||||
109
changedetectionio/blueprint/rss/main_feed.py
Normal file
109
changedetectionio/blueprint/rss/main_feed.py
Normal file
@@ -0,0 +1,109 @@
|
||||
from flask import make_response, request, url_for, redirect
|
||||
from feedgen.feed import FeedGenerator
|
||||
from loguru import logger
|
||||
import datetime
|
||||
import pytz
|
||||
import time
|
||||
|
||||
from ._util import generate_watch_guid, generate_watch_diff_content
|
||||
|
||||
|
||||
def construct_main_feed_routes(rss_blueprint, datastore):
|
||||
"""
|
||||
Construct the main RSS feed routes.
|
||||
|
||||
Args:
|
||||
rss_blueprint: The Flask blueprint to add routes to
|
||||
datastore: The ChangeDetectionStore instance
|
||||
"""
|
||||
|
||||
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
|
||||
# to some earlier blueprint rerouting work, it should goto feed.
|
||||
@rss_blueprint.route("/", methods=['GET'])
|
||||
def extraslash():
|
||||
return redirect(url_for('rss.feed'))
|
||||
|
||||
# Import the login decorator if needed
|
||||
# from changedetectionio.auth_decorator import login_optionally_required
|
||||
@rss_blueprint.route("", methods=['GET'])
|
||||
def feed():
|
||||
now = time.time()
|
||||
# Always requires token set
|
||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||
rss_url_token = request.args.get('token')
|
||||
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
|
||||
|
||||
if rss_url_token != app_rss_token:
|
||||
return "Access denied, bad token", 403
|
||||
|
||||
limit_tag = request.args.get('tag', '').lower().strip()
|
||||
# Be sure limit_tag is a uuid
|
||||
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
|
||||
if limit_tag == tag.get('title', '').lower().strip():
|
||||
limit_tag = uuid
|
||||
|
||||
# Sort by last_changed and add the uuid which is usually the key..
|
||||
sorted_watches = []
|
||||
|
||||
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
|
||||
for uuid, watch in datastore.data['watching'].items():
|
||||
# @todo tag notification_muted skip also (improve Watch model)
|
||||
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
|
||||
continue
|
||||
if limit_tag and not limit_tag in watch['tags']:
|
||||
continue
|
||||
watch['uuid'] = uuid
|
||||
sorted_watches.append(watch)
|
||||
|
||||
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
|
||||
|
||||
fg = FeedGenerator()
|
||||
fg.title('changedetection.io')
|
||||
fg.description('Feed description')
|
||||
fg.link(href='https://changedetection.io')
|
||||
|
||||
for watch in sorted_watches:
|
||||
|
||||
dates = list(watch.history.keys())
|
||||
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
|
||||
if len(dates) < 2:
|
||||
continue
|
||||
|
||||
if not watch.viewed:
|
||||
# Re #239 - GUID needs to be individual for each event
|
||||
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
|
||||
guid = generate_watch_guid(watch)
|
||||
fe = fg.add_entry()
|
||||
|
||||
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
|
||||
# Description is the page you watch, link takes you to the diff JS UI page
|
||||
# Dict val base_url will get overriden with the env var if it is set.
|
||||
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
|
||||
# @todo fix
|
||||
|
||||
# Because we are called via whatever web server, flask should figure out the right path (
|
||||
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||
|
||||
fe.link(link=diff_link)
|
||||
|
||||
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
|
||||
|
||||
fe.title(title=watch_label)
|
||||
fe.content(content=content, type='CDATA')
|
||||
fe.guid(guid, permalink=False)
|
||||
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
||||
dt = dt.replace(tzinfo=pytz.UTC)
|
||||
fe.pubDate(dt)
|
||||
|
||||
# Add categories based on watch tags
|
||||
for tag_uuid in watch.get('tags', []):
|
||||
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
|
||||
if tag:
|
||||
tag_title = tag.get('title', '')
|
||||
if tag_title:
|
||||
fe.category(term=tag_title)
|
||||
|
||||
response = make_response(fg.rss_str())
|
||||
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
|
||||
return response
|
||||
159
changedetectionio/blueprint/rss/single_watch.py
Normal file
159
changedetectionio/blueprint/rss/single_watch.py
Normal file
@@ -0,0 +1,159 @@
|
||||
from flask import make_response, request, url_for
|
||||
from feedgen.feed import FeedGenerator
|
||||
import datetime
|
||||
import pytz
|
||||
import locale
|
||||
|
||||
from ._util import generate_watch_guid, generate_watch_diff_content
|
||||
|
||||
|
||||
def construct_single_watch_routes(rss_blueprint, datastore):
|
||||
"""
|
||||
Construct RSS feed routes for single watches.
|
||||
|
||||
Args:
|
||||
rss_blueprint: The Flask blueprint to add routes to
|
||||
datastore: The ChangeDetectionStore instance
|
||||
"""
|
||||
|
||||
@rss_blueprint.route("/watch/<string:uuid>", methods=['GET'])
|
||||
def rss_single_watch(uuid):
|
||||
"""
|
||||
Display the most recent changes for a single watch as RSS feed.
|
||||
Returns RSS XML with multiple entries showing diffs between consecutive snapshots.
|
||||
The number of entries is controlled by the rss_diff_length setting.
|
||||
"""
|
||||
# Always requires token set
|
||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||
rss_url_token = request.args.get('token')
|
||||
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
|
||||
|
||||
if rss_url_token != app_rss_token:
|
||||
return "Access denied, bad token", 403
|
||||
|
||||
# Get the watch by UUID
|
||||
watch = datastore.data['watching'].get(uuid)
|
||||
if not watch:
|
||||
return f"Watch with UUID {uuid} not found", 404
|
||||
|
||||
# Check if watch has at least 2 history snapshots
|
||||
dates = list(watch.history.keys())
|
||||
if len(dates) < 2:
|
||||
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
|
||||
|
||||
# Add uuid to watch for proper functioning
|
||||
watch['uuid'] = uuid
|
||||
|
||||
# Get the number of diffs to include (default: 5)
|
||||
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
|
||||
|
||||
# Calculate how many diffs we can actually show (limited by available history)
|
||||
# We need at least 2 snapshots to create 1 diff
|
||||
max_possible_diffs = len(dates) - 1
|
||||
num_diffs = min(rss_diff_length, max_possible_diffs) if rss_diff_length > 0 else max_possible_diffs
|
||||
|
||||
# Create RSS feed
|
||||
fg = FeedGenerator()
|
||||
|
||||
# Set title: use "label (url)" if label differs from url, otherwise just url
|
||||
watch_url = watch.get('url', '')
|
||||
watch_label = watch.label
|
||||
if watch_label and watch_label != watch_url:
|
||||
feed_title = f'changedetection.io - {watch_label} ({watch_url})'
|
||||
else:
|
||||
feed_title = f'changedetection.io - {watch_url}'
|
||||
|
||||
fg.title(feed_title)
|
||||
fg.description('Changes')
|
||||
fg.link(href='https://changedetection.io')
|
||||
|
||||
# Loop through history and create RSS entries for each diff
|
||||
# Add entries in reverse order because feedgen reverses them
|
||||
# This way, the newest change appears first in the final RSS
|
||||
for i in range(num_diffs - 1, -1, -1):
|
||||
# Calculate indices for this diff (working backwards from newest)
|
||||
# i=0: compare dates[-2] to dates[-1] (most recent change)
|
||||
# i=1: compare dates[-3] to dates[-2] (previous change)
|
||||
# etc.
|
||||
date_index_to = -(i + 1)
|
||||
date_index_from = -(i + 2)
|
||||
|
||||
try:
|
||||
# Generate the diff content for this pair of snapshots
|
||||
timestamp_to = dates[date_index_to]
|
||||
timestamp_from = dates[date_index_from]
|
||||
|
||||
content, watch_label = generate_watch_diff_content(
|
||||
watch, dates, rss_content_format, datastore,
|
||||
date_index_from=date_index_from,
|
||||
date_index_to=date_index_to
|
||||
)
|
||||
|
||||
# Generate edit watch link and add to content
|
||||
edit_watch_url = url_for('ui.ui_edit.edit_page',
|
||||
uuid=watch['uuid'],
|
||||
_external=True)
|
||||
|
||||
# Add edit watch links at top and bottom of content
|
||||
if 'html' in rss_content_format:
|
||||
edit_link_html = f'<p><a href="{edit_watch_url}">[edit watch]</a></p>'
|
||||
# Insert after <body> and before </body>
|
||||
content = content.replace('<body>', f'<body>\n{edit_link_html}', 1)
|
||||
content = content.replace('</body>', f'{edit_link_html}\n</body>', 1)
|
||||
else:
|
||||
# For plain text format, add plain text links in separate <pre> blocks
|
||||
edit_link_top = f'<pre>[edit watch] {edit_watch_url}</pre>\n'
|
||||
edit_link_bottom = f'\n<pre>[edit watch] {edit_watch_url}</pre>'
|
||||
content = edit_link_top + content + edit_link_bottom
|
||||
|
||||
# Create a unique GUID for this specific diff
|
||||
guid = f"{watch['uuid']}/{timestamp_to}"
|
||||
|
||||
fe = fg.add_entry()
|
||||
|
||||
# Include a link to the diff page with specific versions
|
||||
diff_link = {'href': url_for('ui.ui_views.diff_history_page',
|
||||
uuid=watch['uuid'],
|
||||
from_version=timestamp_from,
|
||||
to_version=timestamp_to,
|
||||
_external=True)}
|
||||
fe.link(link=diff_link)
|
||||
|
||||
# Format the date using locale-aware formatting with timezone
|
||||
dt = datetime.datetime.fromtimestamp(int(timestamp_to))
|
||||
dt = dt.replace(tzinfo=pytz.UTC)
|
||||
|
||||
# Get local timezone-aware datetime
|
||||
local_tz = datetime.datetime.now().astimezone().tzinfo
|
||||
local_dt = dt.astimezone(local_tz)
|
||||
|
||||
# Format date with timezone - using strftime for locale awareness
|
||||
try:
|
||||
formatted_date = local_dt.strftime('%Y-%m-%d %H:%M:%S %Z')
|
||||
except:
|
||||
# Fallback if locale issues
|
||||
formatted_date = local_dt.isoformat()
|
||||
|
||||
# Use formatted date in title instead of "Change 1, 2, 3"
|
||||
fe.title(title=f"{watch_label} - Change @ {formatted_date}")
|
||||
fe.content(content=content, type='CDATA')
|
||||
fe.guid(guid, permalink=False)
|
||||
|
||||
# Use the timestamp of the "to" snapshot for pubDate
|
||||
fe.pubDate(dt)
|
||||
|
||||
# Add categories based on watch tags
|
||||
for tag_uuid in watch.get('tags', []):
|
||||
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
|
||||
if tag:
|
||||
tag_title = tag.get('title', '')
|
||||
if tag_title:
|
||||
fe.category(term=tag_title)
|
||||
|
||||
except (IndexError, FileNotFoundError) as e:
|
||||
# Skip this diff if we can't generate it
|
||||
continue
|
||||
|
||||
response = make_response(fg.rss_str())
|
||||
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||
return response
|
||||
94
changedetectionio/blueprint/rss/tag.py
Normal file
94
changedetectionio/blueprint/rss/tag.py
Normal file
@@ -0,0 +1,94 @@
|
||||
from flask import make_response, request, url_for
|
||||
from feedgen.feed import FeedGenerator
|
||||
import datetime
|
||||
import pytz
|
||||
|
||||
|
||||
from ._util import generate_watch_guid, generate_watch_diff_content
|
||||
|
||||
|
||||
def construct_tag_routes(rss_blueprint, datastore):
|
||||
"""
|
||||
Construct RSS feed routes for tags.
|
||||
|
||||
Args:
|
||||
rss_blueprint: The Flask blueprint to add routes to
|
||||
datastore: The ChangeDetectionStore instance
|
||||
"""
|
||||
|
||||
@rss_blueprint.route("/tag/<string:tag_uuid>", methods=['GET'])
|
||||
def rss_tag_feed(tag_uuid):
|
||||
"""
|
||||
Display an RSS feed for all unviewed watches that belong to a specific tag.
|
||||
Returns RSS XML with entries for each unviewed watch with sufficient history.
|
||||
"""
|
||||
# Always requires token set
|
||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||
rss_url_token = request.args.get('token')
|
||||
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
|
||||
|
||||
if rss_url_token != app_rss_token:
|
||||
return "Access denied, bad token", 403
|
||||
|
||||
# Verify tag exists
|
||||
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
|
||||
if not tag:
|
||||
return f"Tag with UUID {tag_uuid} not found", 404
|
||||
|
||||
tag_title = tag.get('title', 'Unknown Tag')
|
||||
|
||||
# Create RSS feed
|
||||
fg = FeedGenerator()
|
||||
fg.title(f'changedetection.io - {tag_title}')
|
||||
fg.description(f'Changes for watches tagged with {tag_title}')
|
||||
fg.link(href='https://changedetection.io')
|
||||
|
||||
# Find all watches with this tag
|
||||
for uuid, watch in datastore.data['watching'].items():
|
||||
# Skip if watch doesn't have this tag
|
||||
if tag_uuid not in watch.get('tags', []):
|
||||
continue
|
||||
|
||||
# Skip muted watches if configured
|
||||
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
|
||||
continue
|
||||
|
||||
# Check if watch has at least 2 history snapshots
|
||||
dates = list(watch.history.keys())
|
||||
if len(dates) < 2:
|
||||
continue
|
||||
|
||||
# Only include unviewed watches
|
||||
if not watch.viewed:
|
||||
# Add uuid to watch for proper functioning
|
||||
watch['uuid'] = uuid
|
||||
|
||||
# Generate GUID for this entry
|
||||
guid = generate_watch_guid(watch)
|
||||
fe = fg.add_entry()
|
||||
|
||||
# Include a link to the diff page
|
||||
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||
fe.link(link=diff_link)
|
||||
|
||||
# Generate diff content
|
||||
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
|
||||
|
||||
fe.title(title=watch_label)
|
||||
fe.content(content=content, type='CDATA')
|
||||
fe.guid(guid, permalink=False)
|
||||
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
||||
dt = dt.replace(tzinfo=pytz.UTC)
|
||||
fe.pubDate(dt)
|
||||
|
||||
# Add categories based on watch tags
|
||||
for tag_uuid in watch.get('tags', []):
|
||||
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
|
||||
if tag:
|
||||
tag_title = tag.get('title', '')
|
||||
if tag_title:
|
||||
fe.category(term=tag_title)
|
||||
|
||||
response = make_response(fg.rss_str())
|
||||
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||
return response
|
||||
@@ -24,6 +24,7 @@
|
||||
<li class="tab"><a href="#filters">Global Filters</a></li>
|
||||
<li class="tab"><a href="#ui-options">UI Options</a></li>
|
||||
<li class="tab"><a href="#api">API</a></li>
|
||||
<li class="tab"><a href="#rss">RSS</a></li>
|
||||
<li class="tab"><a href="#timedate">Time & Date</a></li>
|
||||
<li class="tab"><a href="#proxies">CAPTCHA & Proxies</a></li>
|
||||
</ul>
|
||||
@@ -43,10 +44,6 @@
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
|
||||
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.application.form.filter_failure_notification_threshold_attempts, class="filter_failure_notification_threshold_attempts") }}
|
||||
<span class="pure-form-message-inline">After this many consecutive times that the CSS/xPath filter is missing, send a notification
|
||||
@@ -76,19 +73,6 @@
|
||||
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
|
||||
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
|
||||
</div>
|
||||
<div class="grey-form-border">
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.application.form.rss_content_format) }}
|
||||
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
|
||||
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
|
||||
</div>
|
||||
</div>
|
||||
</fieldset>
|
||||
</div>
|
||||
|
||||
@@ -131,6 +115,10 @@
|
||||
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br>
|
||||
Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
|
||||
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.requests.form.timeout) }}
|
||||
<span class="pure-form-message-inline">For regular plain requests (not chrome based), maximum number of seconds until timeout, 1-999.<br>
|
||||
@@ -230,6 +218,24 @@ nav
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
<div class="tab-pane-inner" id="rss">
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.application.form.rss_content_format) }}
|
||||
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.application.form.rss_diff_length) }}
|
||||
<span class="pure-form-message-inline">Maximum number of history snapshots to include in the watch specific RSS feed.</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
|
||||
<span class="pure-form-message-inline">For watching other RSS feeds - When watching RSS/Atom feeds, convert them into clean text for better change detection.</span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="tab-pane-inner" id="timedate">
|
||||
<div class="pure-control-group">
|
||||
Ensure the settings below are correct, they are used to manage the time schedule for checking your web page watches.
|
||||
|
||||
@@ -21,9 +21,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
tag_count = Counter(tag for watch in datastore.data['watching'].values() if watch.get('tags') for tag in watch['tags'])
|
||||
|
||||
output = render_template("groups-overview.html",
|
||||
app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
|
||||
available_tags=sorted_tags,
|
||||
form=add_form,
|
||||
tag_count=tag_count
|
||||
tag_count=tag_count,
|
||||
)
|
||||
|
||||
return output
|
||||
@@ -149,9 +150,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
included_content = template.render(**template_args)
|
||||
|
||||
output = render_template("edit-tag.html",
|
||||
settings_application=datastore.data['settings']['application'],
|
||||
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
|
||||
extra_form_content=included_content,
|
||||
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
|
||||
settings_application=datastore.data['settings']['application'],
|
||||
**template_args
|
||||
)
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@
|
||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">Edit</a>
|
||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.delete', uuid=uuid) }}" title="Deletes and removes tag">Delete</a>
|
||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.unlink', uuid=uuid) }}" title="Keep the tag but unlink any watches">Unlink</a>
|
||||
<a href="{{ url_for('rss.rss_tag_feed', tag_uuid=uuid, token=app_rss_token)}}"><img alt="RSS Feed for this watch" style="padding-left: 1em;" src="{{url_for('static_content', group='images', filename='generic_feed-icon.svg')}}" height="15"></a>
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
|
||||
@@ -236,7 +236,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
|
||||
# Import the global plugin system
|
||||
from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras
|
||||
|
||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token'),
|
||||
template_args = {
|
||||
'available_processors': processors.available_processors(),
|
||||
'available_timezones': sorted(available_timezones()),
|
||||
@@ -252,6 +252,11 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
'has_special_tag_options': _watch_has_tag_options_set(watch=watch),
|
||||
'jq_support': jq_support,
|
||||
'playwright_enabled': os.getenv('PLAYWRIGHT_DRIVER_URL', False),
|
||||
'app_rss_token': app_rss_token,
|
||||
'rss_uuid_feed' : {
|
||||
'label': watch.label,
|
||||
'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
|
||||
},
|
||||
'settings_application': datastore.data['settings']['application'],
|
||||
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
|
||||
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
||||
|
||||
@@ -106,7 +106,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
trigger_text = watch.get('trigger_text', [])
|
||||
# Add text that was triggered
|
||||
if len(dates):
|
||||
snapshot_contents = watch.get_history_snapshot(dates[-1])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
|
||||
else:
|
||||
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
||||
|
||||
@@ -123,8 +123,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
|
||||
|
||||
if len(dates) > 1:
|
||||
prev_snapshot = watch.get_history_snapshot(dates[-2])
|
||||
current_snapshot = watch.get_history_snapshot(dates[-1])
|
||||
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
|
||||
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
|
||||
|
||||
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
|
||||
current_snapshot=current_snapshot,
|
||||
|
||||
@@ -476,6 +476,7 @@ Math: {{ 1 + 1 }}") }}
|
||||
class="pure-button button-error">Clear History</a>{% endif %}
|
||||
<a href="{{url_for('ui.form_clone', uuid=uuid)}}"
|
||||
class="pure-button">Clone & Edit</a>
|
||||
<a href="{{ url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token)}}"><img alt="RSS Feed for this watch" style="padding: .5em 1em;" src="{{url_for('static_content', group='images', filename='generic_feed-icon.svg')}}" height="15"></a>
|
||||
</div>
|
||||
</div>
|
||||
</form>
|
||||
|
||||
@@ -47,7 +47,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
|
||||
try:
|
||||
versions = list(watch.history.keys())
|
||||
content = watch.get_history_snapshot(timestamp)
|
||||
content = watch.get_history_snapshot(timestamp=timestamp)
|
||||
|
||||
triggered_line_numbers = html_tools.strip_ignore_text(content=content,
|
||||
wordlist=watch['trigger_text'],
|
||||
|
||||
@@ -14,7 +14,7 @@ def count_words_in_history(watch, incoming_text=None):
|
||||
elif watch.history.keys():
|
||||
# When called from UI extras to count latest snapshot
|
||||
latest_key = list(watch.history.keys())[-1]
|
||||
latest_content = watch.get_history_snapshot(latest_key)
|
||||
latest_content = watch.get_history_snapshot(timestamp=latest_key)
|
||||
return len(latest_content.split())
|
||||
return 0
|
||||
except Exception as e:
|
||||
|
||||
@@ -139,7 +139,7 @@ class fetcher(Fetcher):
|
||||
content = await self.page.content()
|
||||
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
|
||||
logger.debug(f"Saving step HTML to {destination}")
|
||||
with open(destination, 'w') as f:
|
||||
with open(destination, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
|
||||
async def run(self,
|
||||
|
||||
@@ -101,12 +101,12 @@ def init_app_secret(datastore_path):
|
||||
path = os.path.join(datastore_path, "secret.txt")
|
||||
|
||||
try:
|
||||
with open(path, "r") as f:
|
||||
with open(path, "r", encoding='utf-8') as f:
|
||||
secret = f.read()
|
||||
|
||||
except FileNotFoundError:
|
||||
import secrets
|
||||
with open(path, "w") as f:
|
||||
with open(path, "w", encoding='utf-8') as f:
|
||||
secret = secrets.token_hex(32)
|
||||
f.write(secret)
|
||||
|
||||
|
||||
@@ -503,7 +503,9 @@ class ValidateJinja2Template(object):
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
|
||||
# Add notification tokens for validation
|
||||
jinja2_env.globals.update(NotificationContextData())
|
||||
static_token_placeholders = NotificationContextData()
|
||||
static_token_placeholders.set_random_for_validation()
|
||||
jinja2_env.globals.update(static_token_placeholders)
|
||||
if hasattr(field, 'extra_notification_tokens'):
|
||||
jinja2_env.globals.update(field.extra_notification_tokens)
|
||||
|
||||
@@ -998,7 +1000,7 @@ class globalSettingsApplicationForm(commonSettingsForm):
|
||||
validators=[validators.NumberRange(min=0,
|
||||
message="Should be atleast zero (disabled)")])
|
||||
|
||||
rss_content_format = SelectField('RSS Content format', choices=RSS_FORMAT_TYPES)
|
||||
rss_content_format = SelectField('RSS Content format', choices=list(RSS_FORMAT_TYPES.items()))
|
||||
|
||||
removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
|
||||
render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
|
||||
@@ -1007,8 +1009,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
|
||||
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
|
||||
validators=[validators.Optional()])
|
||||
|
||||
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
|
||||
validators=[validators.Optional()])
|
||||
rss_reader_mode = BooleanField('RSS reader mode ', default=False, validators=[validators.Optional()])
|
||||
rss_diff_length = IntegerField(label='Number of changes to show in watch RSS feed',
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=0, message="Should contain zero or more attempts")])
|
||||
|
||||
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
|
||||
render_kw={"style": "width: 5em;"},
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from os import getenv
|
||||
from copy import deepcopy
|
||||
|
||||
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES
|
||||
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES, RSS_CONTENT_FORMAT_DEFAULT
|
||||
|
||||
from changedetectionio.notification import (
|
||||
default_notification_body,
|
||||
@@ -54,7 +54,8 @@ class model(dict):
|
||||
'password': False,
|
||||
'render_anchor_tag_content': False,
|
||||
'rss_access_token': None,
|
||||
'rss_content_format': RSS_FORMAT_TYPES[0][0],
|
||||
'rss_content_format': RSS_CONTENT_FORMAT_DEFAULT,
|
||||
'rss_diff_length': 5,
|
||||
'rss_hide_muted_watches': True,
|
||||
'rss_reader_mode': False,
|
||||
'scheduler_timezone_default': None, # Default IANA timezone name
|
||||
@@ -81,7 +82,7 @@ class model(dict):
|
||||
|
||||
def parse_headers_from_text_file(filepath):
|
||||
headers = {}
|
||||
with open(filepath, 'r') as f:
|
||||
with open(filepath, 'r', encoding='utf-8') as f:
|
||||
for l in f.readlines():
|
||||
l = l.strip()
|
||||
if not l.startswith('#') and ':' in l:
|
||||
|
||||
@@ -188,7 +188,7 @@ class model(watch_base):
|
||||
fname = os.path.join(self.watch_data_dir, "history.txt")
|
||||
if os.path.isfile(fname):
|
||||
logger.debug(f"Reading watch history index for {self.get('uuid')}")
|
||||
with open(fname, "r") as f:
|
||||
with open(fname, "r", encoding='utf-8') as f:
|
||||
for i in f.readlines():
|
||||
if ',' in i:
|
||||
k, v = i.strip().split(',', 2)
|
||||
@@ -276,9 +276,17 @@ class model(watch_base):
|
||||
# When the 'last viewed' timestamp is less than the oldest snapshot, return oldest
|
||||
return sorted_keys[-1]
|
||||
|
||||
def get_history_snapshot(self, timestamp):
|
||||
def get_history_snapshot(self, timestamp=None, filepath=None):
|
||||
"""
|
||||
Accepts either timestamp or filepath
|
||||
:param timestamp:
|
||||
:param filepath:
|
||||
:return:
|
||||
"""
|
||||
import brotli
|
||||
filepath = self.history[timestamp]
|
||||
|
||||
if not filepath:
|
||||
filepath = self.history[timestamp]
|
||||
|
||||
# See if a brotli versions exists and switch to that
|
||||
if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
|
||||
@@ -382,7 +390,7 @@ class model(watch_base):
|
||||
# Compare each lines (set) against each history text file (set) looking for something new..
|
||||
existing_history = set({})
|
||||
for k, v in self.history.items():
|
||||
content = self.get_history_snapshot(k)
|
||||
content = self.get_history_snapshot(filepath=v)
|
||||
|
||||
if ignore_whitespace:
|
||||
alist = set([line.translate(TRANSLATE_WHITESPACE_TABLE).lower() for line in content.splitlines()])
|
||||
@@ -586,7 +594,7 @@ class model(watch_base):
|
||||
"""Return the text saved from a previous request that resulted in a non-200 error"""
|
||||
fname = os.path.join(self.watch_data_dir, "last-error.txt")
|
||||
if os.path.isfile(fname):
|
||||
with open(fname, 'r') as f:
|
||||
with open(fname, 'r', encoding='utf-8') as f:
|
||||
return f.read()
|
||||
return False
|
||||
|
||||
@@ -639,7 +647,7 @@ class model(watch_base):
|
||||
for k, fname in self.history.items():
|
||||
if os.path.isfile(fname):
|
||||
if True:
|
||||
contents = self.get_history_snapshot(k)
|
||||
contents = self.get_history_snapshot(timestamp=k)
|
||||
res = re.findall(regex, contents, re.MULTILINE)
|
||||
if res:
|
||||
if not csv_writer:
|
||||
@@ -732,7 +740,7 @@ class model(watch_base):
|
||||
# If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
|
||||
dates = list(self.history.keys())
|
||||
if len(dates):
|
||||
return self.get_history_snapshot(dates[-1])
|
||||
return self.get_history_snapshot(timestamp=dates[-1])
|
||||
else:
|
||||
return ''
|
||||
|
||||
|
||||
@@ -187,6 +187,8 @@ def replace_placemarkers_in_text(text, url, requested_output_format):
|
||||
|
||||
def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
|
||||
logger.debug(f"Applying markup in '{requested_output_format}' mode")
|
||||
|
||||
# Re 323 - Limit discord length to their 2000 char limit total or it wont send.
|
||||
# Because different notifications may require different pre-processing, run each sequentially :(
|
||||
# 2000 bytes minus -
|
||||
|
||||
@@ -133,7 +133,7 @@ class NotificationService:
|
||||
|
||||
# Add text that was triggered
|
||||
if len(dates):
|
||||
snapshot_contents = watch.get_history_snapshot(dates[-1])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
|
||||
else:
|
||||
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
||||
|
||||
@@ -154,8 +154,8 @@ class NotificationService:
|
||||
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
|
||||
|
||||
if len(dates) > 1:
|
||||
prev_snapshot = watch.get_history_snapshot(dates[-2])
|
||||
current_snapshot = watch.get_history_snapshot(dates[-1])
|
||||
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
|
||||
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
|
||||
|
||||
|
||||
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
|
||||
|
||||
@@ -280,7 +280,7 @@ class ContentProcessor:
|
||||
|
||||
# Sort JSON to avoid false alerts from reordering
|
||||
try:
|
||||
content = json.dumps(json.loads(content), sort_keys=True, indent=4)
|
||||
content = json.dumps(json.loads(content), sort_keys=True, indent=2, ensure_ascii=False)
|
||||
except Exception:
|
||||
# Might be malformed JSON, continue anyway
|
||||
pass
|
||||
|
||||
@@ -37,18 +37,6 @@ class SignalHandler:
|
||||
notification_event_signal.connect(self.handle_notification_event, weak=False)
|
||||
logger.info("SignalHandler: Connected to notification_event signal")
|
||||
|
||||
# Create and start the queue update thread using standard threading
|
||||
import threading
|
||||
self.polling_emitter_thread = threading.Thread(
|
||||
target=self.polling_emit_running_or_queued_watches_threaded,
|
||||
daemon=True
|
||||
)
|
||||
self.polling_emitter_thread.start()
|
||||
logger.info("Started polling thread using threading (eventlet-free)")
|
||||
|
||||
# Store the thread reference in socketio for clean shutdown
|
||||
self.socketio_instance.polling_emitter_thread = self.polling_emitter_thread
|
||||
|
||||
def handle_signal(self, *args, **kwargs):
|
||||
logger.trace(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs")
|
||||
# Safely extract the watch UUID from kwargs
|
||||
@@ -124,74 +112,6 @@ class SignalHandler:
|
||||
except Exception as e:
|
||||
logger.error(f"Socket.IO error in handle_notification_event: {str(e)}")
|
||||
|
||||
def polling_emit_running_or_queued_watches_threaded(self):
|
||||
"""Threading version of polling for Windows compatibility"""
|
||||
import time
|
||||
import threading
|
||||
logger.info("Queue update thread started (threading mode)")
|
||||
|
||||
# Import here to avoid circular imports
|
||||
from changedetectionio.flask_app import app
|
||||
from changedetectionio import worker_handler
|
||||
watch_check_update = signal('watch_check_update')
|
||||
|
||||
# Track previous state to avoid unnecessary emissions
|
||||
previous_running_uuids = set()
|
||||
|
||||
# Run until app shutdown - check exit flag more frequently for fast shutdown
|
||||
exit_event = getattr(app.config, 'exit', threading.Event())
|
||||
|
||||
while not exit_event.is_set():
|
||||
try:
|
||||
# Get current running UUIDs from async workers
|
||||
running_uuids = set(worker_handler.get_running_uuids())
|
||||
|
||||
# Only send updates for UUIDs that changed state
|
||||
newly_running = running_uuids - previous_running_uuids
|
||||
no_longer_running = previous_running_uuids - running_uuids
|
||||
|
||||
# Send updates for newly running UUIDs (but exit fast if shutdown requested)
|
||||
for uuid in newly_running:
|
||||
if exit_event.is_set():
|
||||
break
|
||||
logger.trace(f"Threading polling: UUID {uuid} started processing")
|
||||
with app.app_context():
|
||||
watch_check_update.send(app_context=app, watch_uuid=uuid)
|
||||
time.sleep(0.01) # Small yield
|
||||
|
||||
# Send updates for UUIDs that finished processing (but exit fast if shutdown requested)
|
||||
if not exit_event.is_set():
|
||||
for uuid in no_longer_running:
|
||||
if exit_event.is_set():
|
||||
break
|
||||
logger.trace(f"Threading polling: UUID {uuid} finished processing")
|
||||
with app.app_context():
|
||||
watch_check_update.send(app_context=app, watch_uuid=uuid)
|
||||
time.sleep(0.01) # Small yield
|
||||
|
||||
# Update tracking for next iteration
|
||||
previous_running_uuids = running_uuids
|
||||
|
||||
# Sleep between polling cycles, but check exit flag every 0.5 seconds for fast shutdown
|
||||
for _ in range(20): # 20 * 0.5 = 10 seconds total
|
||||
if exit_event.is_set():
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in threading polling: {str(e)}")
|
||||
# Even during error recovery, check for exit quickly
|
||||
for _ in range(1): # 1 * 0.5 = 0.5 seconds
|
||||
if exit_event.is_set():
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||
import sys
|
||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
if not in_pytest:
|
||||
logger.info("Queue update thread stopped (threading mode)")
|
||||
|
||||
|
||||
def handle_watch_update(socketio, **kwargs):
|
||||
@@ -383,19 +303,6 @@ def init_socketio(app, datastore):
|
||||
"""Shutdown the SocketIO server fast and aggressively"""
|
||||
try:
|
||||
logger.info("Socket.IO: Fast shutdown initiated...")
|
||||
|
||||
# For threading mode, give the thread a very short time to exit gracefully
|
||||
if hasattr(socketio, 'polling_emitter_thread'):
|
||||
if socketio.polling_emitter_thread.is_alive():
|
||||
logger.info("Socket.IO: Waiting 1 second for polling thread to stop...")
|
||||
socketio.polling_emitter_thread.join(timeout=1.0) # Only 1 second timeout
|
||||
if socketio.polling_emitter_thread.is_alive():
|
||||
logger.info("Socket.IO: Polling thread still running after timeout - continuing with shutdown")
|
||||
else:
|
||||
logger.info("Socket.IO: Polling thread stopped quickly")
|
||||
else:
|
||||
logger.info("Socket.IO: Polling thread already stopped")
|
||||
|
||||
logger.info("Socket.IO: Fast shutdown complete")
|
||||
except Exception as e:
|
||||
logger.error(f"Socket.IO error during shutdown: {str(e)}")
|
||||
|
||||
@@ -11,6 +11,56 @@ set -e
|
||||
|
||||
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
|
||||
|
||||
# Since theres no curl installed lets roll with python3
|
||||
check_sanity() {
|
||||
local port="$1"
|
||||
if [ -z "$port" ]; then
|
||||
echo "Usage: check_sanity <port>" >&2
|
||||
return 1
|
||||
fi
|
||||
|
||||
python3 - "$port" <<'PYCODE'
|
||||
import sys, time, urllib.request, socket
|
||||
|
||||
port = sys.argv[1]
|
||||
url = f'http://localhost:{port}'
|
||||
ok = False
|
||||
|
||||
for _ in range(6): # --retry 6
|
||||
try:
|
||||
r = urllib.request.urlopen(url, timeout=3).read().decode()
|
||||
if 'est-url-is-sanity' in r:
|
||||
ok = True
|
||||
break
|
||||
except (urllib.error.URLError, ConnectionRefusedError, socket.error):
|
||||
time.sleep(1)
|
||||
sys.exit(0 if ok else 1)
|
||||
PYCODE
|
||||
}
|
||||
|
||||
data_sanity_test () {
|
||||
# Restart data sanity test
|
||||
cd ..
|
||||
TMPDIR=$(mktemp -d)
|
||||
PORT_N=$((5000 + RANDOM % (6501 - 5000)))
|
||||
./changedetection.py -p $PORT_N -d $TMPDIR -u "https://localhost?test-url-is-sanity=1" &
|
||||
PID=$!
|
||||
sleep 5
|
||||
kill $PID
|
||||
sleep 2
|
||||
./changedetection.py -p $PORT_N -d $TMPDIR &
|
||||
PID=$!
|
||||
sleep 5
|
||||
# On a restart the URL should still be there
|
||||
check_sanity $PORT_N || exit 1
|
||||
kill $PID
|
||||
cd $OLDPWD
|
||||
|
||||
# datastore looks alright, continue
|
||||
}
|
||||
|
||||
data_sanity_test
|
||||
|
||||
# REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
|
||||
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -n 30 --dist load tests/test_*.py
|
||||
|
||||
@@ -41,3 +91,6 @@ FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
|
||||
# Check file:// will pickup a file when enabled
|
||||
echo "Hello world" > /tmp/test-file.txt
|
||||
ALLOW_FILE_URI=yes pytest tests/test_security.py
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ from flask import (
|
||||
flash
|
||||
)
|
||||
|
||||
from .blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
|
||||
from .html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
from .model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
from copy import deepcopy, copy
|
||||
@@ -22,6 +23,13 @@ import uuid as uuid_builder
|
||||
from loguru import logger
|
||||
from blinker import signal
|
||||
|
||||
# Try to import orjson for faster JSON serialization
|
||||
try:
|
||||
import orjson
|
||||
HAS_ORJSON = True
|
||||
except ImportError:
|
||||
HAS_ORJSON = False
|
||||
|
||||
from .processors import get_custom_watch_obj_for_processor
|
||||
from .processors.restock_diff import Restock
|
||||
|
||||
@@ -37,7 +45,7 @@ class ChangeDetectionStore:
|
||||
lock = Lock()
|
||||
# For general updates/writes that can wait a few seconds
|
||||
needs_write = False
|
||||
|
||||
datastore_path = None
|
||||
# For when we edit, we should write to disk
|
||||
needs_write_urgent = False
|
||||
|
||||
@@ -47,18 +55,30 @@ class ChangeDetectionStore:
|
||||
def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
|
||||
# Should only be active for docker
|
||||
# logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
|
||||
|
||||
self.datastore_path = datastore_path
|
||||
self.needs_write = False
|
||||
self.start_time = time.time()
|
||||
self.stop_thread = False
|
||||
self.save_version_copy_json_db(version_tag)
|
||||
self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag)
|
||||
|
||||
def save_version_copy_json_db(self, version_tag):
|
||||
import re
|
||||
|
||||
version_text = re.sub(r'\D+', '-', version_tag)
|
||||
db_path = os.path.join(self.datastore_path, "url-watches.json")
|
||||
db_path_version_backup = os.path.join(self.datastore_path, f"url-watches-{version_text}.json")
|
||||
|
||||
if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path):
|
||||
from shutil import copyfile
|
||||
logger.info(f"Backing up JSON DB due to new version to '{db_path_version_backup}'.")
|
||||
copyfile(db_path, db_path_version_backup)
|
||||
|
||||
|
||||
def reload_state(self, datastore_path, include_default_watches, version_tag):
|
||||
logger.info(f"Datastore path is '{datastore_path}'")
|
||||
|
||||
self.__data = App.model()
|
||||
self.datastore_path = datastore_path
|
||||
self.json_store_path = os.path.join(self.datastore_path, "url-watches.json")
|
||||
# Base definition for all watchers
|
||||
# deepcopy part of #569 - not sure why its needed exactly
|
||||
@@ -71,37 +91,46 @@ class ChangeDetectionStore:
|
||||
self.__data['build_sha'] = f.read()
|
||||
|
||||
try:
|
||||
# @todo retest with ", encoding='utf-8'"
|
||||
with open(self.json_store_path) as json_file:
|
||||
from_disk = json.load(json_file)
|
||||
if HAS_ORJSON:
|
||||
# orjson.loads() expects UTF-8 encoded bytes #3611
|
||||
with open(self.json_store_path, 'rb') as json_file:
|
||||
from_disk = orjson.loads(json_file.read())
|
||||
else:
|
||||
with open(self.json_store_path, encoding='utf-8') as json_file:
|
||||
from_disk = json.load(json_file)
|
||||
|
||||
# @todo isnt there a way todo this dict.update recursively?
|
||||
# Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
|
||||
if 'watching' in from_disk:
|
||||
self.__data['watching'].update(from_disk['watching'])
|
||||
if not from_disk:
|
||||
# No FileNotFound exception was thrown but somehow the JSON was empty - abort for safety.
|
||||
logger.critical(f"JSON DB existed but was empty on load - empty JSON file? '{self.json_store_path}' Aborting")
|
||||
raise Exception('JSON DB existed but was empty on load - Aborting')
|
||||
|
||||
if 'app_guid' in from_disk:
|
||||
self.__data['app_guid'] = from_disk['app_guid']
|
||||
# @todo isnt there a way todo this dict.update recursively?
|
||||
# Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
|
||||
if 'watching' in from_disk:
|
||||
self.__data['watching'].update(from_disk['watching'])
|
||||
|
||||
if 'settings' in from_disk:
|
||||
if 'headers' in from_disk['settings']:
|
||||
self.__data['settings']['headers'].update(from_disk['settings']['headers'])
|
||||
if 'app_guid' in from_disk:
|
||||
self.__data['app_guid'] = from_disk['app_guid']
|
||||
|
||||
if 'requests' in from_disk['settings']:
|
||||
self.__data['settings']['requests'].update(from_disk['settings']['requests'])
|
||||
if 'settings' in from_disk:
|
||||
if 'headers' in from_disk['settings']:
|
||||
self.__data['settings']['headers'].update(from_disk['settings']['headers'])
|
||||
|
||||
if 'application' in from_disk['settings']:
|
||||
self.__data['settings']['application'].update(from_disk['settings']['application'])
|
||||
if 'requests' in from_disk['settings']:
|
||||
self.__data['settings']['requests'].update(from_disk['settings']['requests'])
|
||||
|
||||
# Convert each existing watch back to the Watch.model object
|
||||
for uuid, watch in self.__data['watching'].items():
|
||||
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
|
||||
logger.info(f"Watching: {uuid} {watch['url']}")
|
||||
if 'application' in from_disk['settings']:
|
||||
self.__data['settings']['application'].update(from_disk['settings']['application'])
|
||||
|
||||
# And for Tags also, should be Restock type because it has extra settings
|
||||
for uuid, tag in self.__data['settings']['application']['tags'].items():
|
||||
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
|
||||
logger.info(f"Tag: {uuid} {tag['title']}")
|
||||
# Convert each existing watch back to the Watch.model object
|
||||
for uuid, watch in self.__data['watching'].items():
|
||||
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
|
||||
logger.info(f"Watching: {uuid} {watch['url']}")
|
||||
|
||||
# And for Tags also, should be Restock type because it has extra settings
|
||||
for uuid, tag in self.__data['settings']['application']['tags'].items():
|
||||
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
|
||||
logger.info(f"Tag: {uuid} {tag['title']}")
|
||||
|
||||
# First time ran, Create the datastore.
|
||||
except (FileNotFoundError):
|
||||
@@ -426,9 +455,15 @@ class ChangeDetectionStore:
|
||||
# Re #286 - First write to a temp file, then confirm it looks OK and rename it
|
||||
# This is a fairly basic strategy to deal with the case that the file is corrupted,
|
||||
# system was out of memory, out of RAM etc
|
||||
with open(self.json_store_path+".tmp", 'w') as json_file:
|
||||
# Use compact JSON in production for better performance
|
||||
json.dump(data, json_file, indent=2)
|
||||
if HAS_ORJSON:
|
||||
# Use orjson for faster serialization
|
||||
# orjson.dumps() always returns UTF-8 encoded bytes #3611
|
||||
with open(self.json_store_path+".tmp", 'wb') as json_file:
|
||||
json_file.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
|
||||
else:
|
||||
# Fallback to standard json module
|
||||
with open(self.json_store_path+".tmp", 'w', encoding='utf-8') as json_file:
|
||||
json.dump(data, json_file, indent=2, ensure_ascii=False)
|
||||
os.replace(self.json_store_path+".tmp", self.json_store_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
|
||||
@@ -490,8 +525,13 @@ class ChangeDetectionStore:
|
||||
|
||||
# Load from external config file
|
||||
if path.isfile(proxy_list_file):
|
||||
with open(os.path.join(self.datastore_path, "proxies.json")) as f:
|
||||
proxy_list = json.load(f)
|
||||
if HAS_ORJSON:
|
||||
# orjson.loads() expects UTF-8 encoded bytes #3611
|
||||
with open(os.path.join(self.datastore_path, "proxies.json"), 'rb') as f:
|
||||
proxy_list = orjson.loads(f.read())
|
||||
else:
|
||||
with open(os.path.join(self.datastore_path, "proxies.json"), encoding='utf-8') as f:
|
||||
proxy_list = json.load(f)
|
||||
|
||||
# Mapping from UI config if available
|
||||
extras = self.data['settings']['requests'].get('extra_proxies')
|
||||
@@ -736,6 +776,28 @@ class ChangeDetectionStore:
|
||||
|
||||
return updates_available
|
||||
|
||||
def add_notification_url(self, notification_url):
|
||||
|
||||
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
|
||||
|
||||
notification_urls = self.data['settings']['application'].get('notification_urls', [])
|
||||
|
||||
if notification_url in notification_urls:
|
||||
return notification_url
|
||||
|
||||
with self.lock:
|
||||
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
|
||||
|
||||
if notification_url in notification_urls:
|
||||
return notification_url
|
||||
|
||||
# Append and update the datastore
|
||||
notification_urls.append(notification_url)
|
||||
self.__data['settings']['application']['notification_urls'] = notification_urls
|
||||
self.needs_write = True
|
||||
|
||||
return notification_url
|
||||
|
||||
# Run all updates
|
||||
# IMPORTANT - Each update could be run even when they have a new install and the schema is correct
|
||||
# So therefor - each `update_n` should be very careful about checking if it needs to actually run
|
||||
@@ -748,7 +810,16 @@ class ChangeDetectionStore:
|
||||
logger.critical(f"Applying update_{update_n}")
|
||||
# Wont exist on fresh installs
|
||||
if os.path.exists(self.json_store_path):
|
||||
shutil.copyfile(self.json_store_path, os.path.join(self.datastore_path, f"url-watches-before-{update_n}.json"))
|
||||
i = 0
|
||||
while True:
|
||||
i+=1
|
||||
dest = os.path.join(self.datastore_path, f"url-watches-before-{update_n}-{i}.json")
|
||||
if not os.path.exists(dest):
|
||||
logger.debug(f"Copying url-watches.json DB to '{dest}' backup.")
|
||||
shutil.copyfile(self.json_store_path, dest)
|
||||
break
|
||||
else:
|
||||
logger.warning(f"Backup of url-watches.json '{dest}', DB already exists, trying {i+1}.. ")
|
||||
|
||||
try:
|
||||
update_method = getattr(self, f"update_{update_n}")()
|
||||
@@ -1039,25 +1110,15 @@ class ChangeDetectionStore:
|
||||
formats['markdown'] = 'Markdown'
|
||||
re_run(formats)
|
||||
|
||||
def add_notification_url(self, notification_url):
|
||||
|
||||
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
|
||||
|
||||
notification_urls = self.data['settings']['application'].get('notification_urls', [])
|
||||
|
||||
if notification_url in notification_urls:
|
||||
return notification_url
|
||||
|
||||
with self.lock:
|
||||
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
|
||||
|
||||
if notification_url in notification_urls:
|
||||
return notification_url
|
||||
|
||||
# Append and update the datastore
|
||||
notification_urls.append(notification_url)
|
||||
self.__data['settings']['application']['notification_urls'] = notification_urls
|
||||
self.needs_write = True
|
||||
|
||||
return notification_url
|
||||
|
||||
# RSS types should be inline with the same names as notification types
|
||||
def update_24(self):
|
||||
rss_format = self.data['settings']['application'].get('rss_content_format')
|
||||
if not rss_format or 'text' in rss_format:
|
||||
# might have been 'plaintext, 'plain text' or something
|
||||
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
|
||||
elif 'html' in rss_format:
|
||||
self.data['settings']['application']['rss_content_format'] = 'htmlcolor'
|
||||
else:
|
||||
# safe fallback to text
|
||||
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
|
||||
|
||||
@@ -8,8 +8,13 @@
|
||||
<meta name="robots" content="noindex">
|
||||
<title>Change Detection{{extra_title}}</title>
|
||||
{% if app_rss_token %}
|
||||
<link rel="alternate" type="application/rss+xml" title="Changedetection.io » Feed{% if active_tag_uuid %}- {{active_tag.title}}{% endif %}" href="{{ url_for('rss.feed', tag=active_tag_uuid , token=app_rss_token)}}" >
|
||||
{% endif %}
|
||||
<link rel="alternate" type="application/rss+xml" title="Changedetection.io » Feed{% if active_tag_uuid %}- {{active_tag.title}}{% endif %}" href="{{ url_for('rss.feed', tag=active_tag_uuid, token=app_rss_token, _external=True )}}" >
|
||||
|
||||
{% if rss_uuid_feed %}
|
||||
<link rel="alternate" type="application/rss+xml" title="Feed » {{ rss_uuid_feed['label'] }}" href="{{ rss_uuid_feed['url'] }}" >
|
||||
|
||||
{%- endif -%}
|
||||
{%- endif -%}
|
||||
<link rel="stylesheet" href="{{url_for('static_content', group='styles', filename='pure-min.css')}}" >
|
||||
<link rel="stylesheet" href="{{url_for('static_content', group='styles', filename='styles.css')}}?v={{ get_css_version() }}" >
|
||||
{% if extra_stylesheets %}
|
||||
|
||||
@@ -19,18 +19,9 @@ def test_inscriptus():
|
||||
|
||||
def test_check_basic_change_detection_functionality(client, live_server, measure_memory_usage, datastore_path):
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
# live_server_setup(live_server) # Setup on conftest per function
|
||||
|
||||
# Add our URL to the import page
|
||||
res = client.post(
|
||||
url_for("imports.import_page"),
|
||||
data={"urls": url_for('test_endpoint', _external=True)},
|
||||
follow_redirects=True
|
||||
)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True))
|
||||
|
||||
assert b"1 Imported" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Do this a few times.. ensures we dont accidently set the status
|
||||
for n in range(3):
|
||||
@@ -86,10 +77,9 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
|
||||
assert b'<rss' in res.data
|
||||
|
||||
# re #16 should have the diff in here too
|
||||
assert b'(into) which has this one new line' in res.data
|
||||
assert b'which has this one new line' in res.data
|
||||
assert b'CDATA' in res.data
|
||||
|
||||
assert expected_url.encode('utf-8') in res.data
|
||||
#
|
||||
# Following the 'diff' link, it should no longer display as 'has-unread-changes' even after we recheck it a few times
|
||||
res = client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
|
||||
@@ -115,7 +105,6 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
|
||||
# It should report nothing found (no new 'has-unread-changes' class)
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
|
||||
|
||||
assert b'has-unread-changes' not in res.data
|
||||
assert b'class="has-unread-changes' not in res.data
|
||||
assert b'head title' in res.data # Should be ON by default
|
||||
@@ -129,23 +118,6 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'head title and more' in res.data
|
||||
|
||||
# disable <title> pickup
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-ui-use_page_title_in_list": "", "requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'has-unread-changes' in res.data
|
||||
assert b'class="has-unread-changes' in res.data
|
||||
assert b'head title' not in res.data # should now be off
|
||||
|
||||
|
||||
# Be sure the last_viewed is going to be greater than the last snapshot
|
||||
time.sleep(1)
|
||||
|
||||
@@ -166,6 +138,63 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
|
||||
# Cleanup everything
|
||||
delete_all_watches(client)
|
||||
|
||||
def test_title_scraper(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True))
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks()
|
||||
|
||||
# It should report nothing found (no new 'has-unread-changes' class)
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
|
||||
assert b'head title' in res.data # Should be ON by default
|
||||
|
||||
# Recheck it but only with a title change, content wasnt changed
|
||||
set_original_response(datastore_path=datastore_path, extra_title=" and more")
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'head title and more' in res.data
|
||||
|
||||
# disable <title> pickup
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-ui-use_page_title_in_list": "",
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
set_original_response(datastore_path=datastore_path, extra_title=" SHOULD NOT APPEAR")
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'SHOULD NOT APPEAR' not in res.data
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
def test_title_scraper_html_only(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write('"My text document\nWhere I talk about <title>\nwhich should not get registered\n</title>')
|
||||
|
||||
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
|
||||
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks()
|
||||
|
||||
# It should report nothing found (no new 'has-unread-changes' class)
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
|
||||
assert b'which should not get registered' not in res.data # Should be ON by default
|
||||
assert not live_server.app.config['DATASTORE'].data['watching'][uuid].get('title')
|
||||
|
||||
|
||||
|
||||
|
||||
# Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that
|
||||
def test_requests_timeout(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
@@ -40,7 +40,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
||||
|
||||
json_obj = None
|
||||
with open(json_db_file, 'r') as f:
|
||||
with open(json_db_file, 'r', encoding='utf-8') as f:
|
||||
json_obj = json.load(f)
|
||||
|
||||
# assert the right amount of watches was found in the JSON
|
||||
@@ -76,7 +76,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
|
||||
|
||||
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
||||
with open(json_db_file, 'r') as f:
|
||||
with open(json_db_file, 'r', encoding='utf-8') as f:
|
||||
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
|
||||
|
||||
|
||||
|
||||
@@ -353,7 +353,7 @@ def check_json_ext_filter(json_filter, client, live_server, datastore_path):
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
|
||||
assert snapshot_contents[0] == '['
|
||||
|
||||
@@ -439,16 +439,15 @@ def test_correct_header_detect(client, live_server, measure_memory_usage, datast
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
|
||||
assert b'"hello": 123,' in res.data # properly html escaped in the front end
|
||||
|
||||
import json
|
||||
data = json.loads(snapshot_contents)
|
||||
keys = list(data.keys())
|
||||
# Should be correctly formatted and sorted, ("world" goes to end)
|
||||
assert snapshot_contents == """{
|
||||
"hello": 123,
|
||||
"world": 123
|
||||
}"""
|
||||
|
||||
assert keys == ["hello", "world"]
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
def test_check_jsonpath_ext_filter(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
@@ -302,15 +302,20 @@ def test_notification_urls_jinja2_apprise_integration(client, live_server, measu
|
||||
data={
|
||||
"application-fetch_backend": "html_requests",
|
||||
"application-minutes_between_check": 180,
|
||||
"application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444, "somebug": "网站监测 内容更新了" }',
|
||||
"application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444, "somebug": "网站监测 内容更新了", "another": "{{diff|truncate(1500)}}" }',
|
||||
"application-notification_format": default_notification_format,
|
||||
"application-notification_urls": test_notification_url,
|
||||
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
|
||||
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }} ",
|
||||
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }} {{diff|truncate(200)}} ",
|
||||
},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b'Settings updated' in res.data
|
||||
assert '网站监测'.encode() in res.data
|
||||
assert b'{{diff|truncate(1500)}}' in res.data
|
||||
assert b'{{diff|truncate(200)}}' in res.data
|
||||
|
||||
|
||||
|
||||
|
||||
def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
@@ -22,7 +22,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
|
||||
# PDF header should not be there (it was converted to text)
|
||||
assert 'PDF' not in snapshot_contents
|
||||
@@ -75,7 +75,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
dates = list(watch.history.keys())
|
||||
# new snapshot was also OK, no HTML
|
||||
snapshot_contents = watch.get_history_snapshot(dates[1])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[1])
|
||||
assert 'html' not in snapshot_contents.lower()
|
||||
assert f'Original file size - {os.path.getsize(os.path.join(datastore_path, "endpoint-test.pdf"))}' in snapshot_contents
|
||||
assert f'here is a change' in snapshot_contents
|
||||
|
||||
@@ -142,7 +142,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
watches_with_body = 0
|
||||
with open(os.path.join(datastore_path, 'url-watches.json')) as f:
|
||||
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
|
||||
app_struct = json.load(f)
|
||||
for uuid in app_struct['watching']:
|
||||
if app_struct['watching'][uuid]['body']==body_value:
|
||||
@@ -225,7 +225,7 @@ def test_method_in_request(client, live_server, measure_memory_usage, datastore_
|
||||
wait_for_all_checks(client)
|
||||
|
||||
watches_with_method = 0
|
||||
with open(os.path.join(datastore_path, 'url-watches.json')) as f:
|
||||
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
|
||||
app_struct = json.load(f)
|
||||
for uuid in app_struct['watching']:
|
||||
if app_struct['watching'][uuid]['method'] == 'PATCH':
|
||||
|
||||
@@ -4,6 +4,8 @@ import time
|
||||
from flask import url_for
|
||||
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
|
||||
extract_UUID_from_client, delete_all_watches
|
||||
from loguru import logger
|
||||
from ..blueprint.rss import RSS_FORMAT_TYPES
|
||||
|
||||
|
||||
def set_original_cdata_xml(datastore_path):
|
||||
@@ -238,6 +240,114 @@ def test_rss_bad_chars_breaking(client, live_server, measure_memory_usage, datas
|
||||
#assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n == 2
|
||||
|
||||
|
||||
def test_rss_single_watch_feed(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
app_rss_token = live_server.app.config['DATASTORE'].data['settings']['application'].get('rss_access_token')
|
||||
rss_content_format = live_server.app.config['DATASTORE'].data['settings']['application'].get('rss_content_format')
|
||||
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
|
||||
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
res = client.get(
|
||||
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
|
||||
follow_redirects=False
|
||||
)
|
||||
|
||||
assert res.status_code == 400
|
||||
assert b'not have enough history' in res.data
|
||||
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
res = client.get(
|
||||
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
import xml.etree.ElementTree as ET
|
||||
root = ET.fromstring(res.data)
|
||||
|
||||
def check_formatting(expected_type, content, url):
|
||||
logger.debug(f"Checking formatting type {expected_type}")
|
||||
if expected_type == 'text':
|
||||
assert '<p>' not in content
|
||||
assert 'body' not in content
|
||||
assert '(changed) Which is across multiple lines\n'
|
||||
assert 'modified head title had a change.' # Because it picked it up <title> as watch_title in default template
|
||||
elif expected_type == 'html':
|
||||
assert '<p>' in content
|
||||
assert '<body>' in content
|
||||
assert '<p>(changed) Which is across multiple lines<br>' in content
|
||||
assert f'href="{url}">modified head title had a change.</a>'
|
||||
elif expected_type == 'htmlcolor':
|
||||
assert '<body>' in content
|
||||
assert ' role="note" aria-label="Changed text" title="Changed text">Which is across multiple lines</span>' in content
|
||||
assert f'href="{url}">modified head title had a change.</a>'
|
||||
else:
|
||||
raise Exception(f"Unknown type {expected_type}")
|
||||
|
||||
|
||||
item = root.findall('.//item')[0].findtext('description')
|
||||
check_formatting(expected_type=rss_content_format, content=item, url=test_url)
|
||||
|
||||
# Now the default one is over, lets try all the others
|
||||
for k in list(RSS_FORMAT_TYPES.keys()):
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-rss_content_format": k},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b'Settings updated' in res.data
|
||||
|
||||
res = client.get(
|
||||
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
root = ET.fromstring(res.data)
|
||||
item = root.findall('.//item')[0].findtext('description')
|
||||
check_formatting(expected_type=k, content=item, url=test_url)
|
||||
|
||||
# Test RSS entry order: Create multiple versions and verify newest appears first
|
||||
for version in range(3, 6): # Create versions 3, 4, 5
|
||||
set_html_content(datastore_path, f"Version {version} content")
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
time.sleep(0.5) # Small delay to ensure different timestamps
|
||||
|
||||
# Fetch RSS feed again to verify order
|
||||
res = client.get(
|
||||
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
|
||||
# Parse RSS and check order (newest first)
|
||||
root = ET.fromstring(res.data)
|
||||
items = root.findall('.//item')
|
||||
assert len(items) >= 3, f"Expected at least 3 items, got {len(items)}"
|
||||
|
||||
# Get descriptions from first 3 items
|
||||
descriptions = []
|
||||
for item in items[:3]:
|
||||
desc = item.findtext('description')
|
||||
descriptions.append(desc if desc else "")
|
||||
|
||||
# First item should contain newest change (Version 5)
|
||||
assert b"Version 5" in descriptions[0].encode() or "Version 5" in descriptions[0], \
|
||||
f"First item should show newest change (Version 5), but got: {descriptions[0][:200]}"
|
||||
|
||||
# Second item should contain Version 4
|
||||
assert b"Version 4" in descriptions[1].encode() or "Version 4" in descriptions[1], \
|
||||
f"Second item should show Version 4, but got: {descriptions[1][:200]}"
|
||||
|
||||
# Third item should contain Version 3
|
||||
assert b"Version 3" in descriptions[2].encode() or "Version 3" in descriptions[2], \
|
||||
f"Third item should show Version 3, but got: {descriptions[2][:200]}"
|
||||
|
||||
|
||||
254
changedetectionio/tests/test_rss_group.py
Normal file
254
changedetectionio/tests/test_rss_group.py
Normal file
@@ -0,0 +1,254 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import time
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||
import os
|
||||
|
||||
|
||||
def set_original_response(datastore_path):
|
||||
test_return_data = """<html>
|
||||
<body>
|
||||
Some initial text<br>
|
||||
<p>Watch 1 content</p>
|
||||
<p>Watch 2 content</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(test_return_data)
|
||||
return None
|
||||
|
||||
|
||||
def set_modified_response(datastore_path):
|
||||
test_return_data = """<html>
|
||||
<body>
|
||||
Some initial text<br>
|
||||
<p>Watch 1 content MODIFIED</p>
|
||||
<p>Watch 2 content CHANGED</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(test_return_data)
|
||||
return None
|
||||
|
||||
|
||||
def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that RSS feed for a specific tag/group shows only watches in that group
|
||||
and displays changes correctly.
|
||||
"""
|
||||
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
|
||||
# Create a tag/group
|
||||
res = client.post(
|
||||
url_for("tags.form_tag_add"),
|
||||
data={"name": "test-rss-group"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Tag added" in res.data
|
||||
assert b"test-rss-group" in res.data
|
||||
|
||||
# Get the tag UUID
|
||||
tag_uuid = get_UUID_for_tag_name(client, name="test-rss-group")
|
||||
assert tag_uuid is not None
|
||||
|
||||
# Add first watch with the tag
|
||||
test_url_1 = url_for('test_endpoint', _external=True) + "?watch=1"
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url_1, "tags": 'test-rss-group'},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
# Add second watch with the tag
|
||||
test_url_2 = url_for('test_endpoint', _external=True) + "?watch=2"
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url_2, "tags": 'test-rss-group'},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
# Add a third watch WITHOUT the tag (should not appear in RSS)
|
||||
test_url_3 = url_for('test_endpoint', _external=True) + "?watch=3"
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url_3, "tags": 'other-tag'},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
# Wait for initial checks to complete
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Trigger a change
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
|
||||
# Recheck all watches
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
assert rss_token is not None
|
||||
|
||||
# Request RSS feed for the specific tag/group using the new endpoint
|
||||
res = client.get(
|
||||
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
# Verify response is successful
|
||||
assert res.status_code == 200
|
||||
assert b"<?xml" in res.data or b"<rss" in res.data
|
||||
|
||||
# Verify the RSS feed contains the tag name in the title
|
||||
assert b"test-rss-group" in res.data
|
||||
|
||||
# Verify watch 1 and watch 2 are in the RSS feed (they have the tag)
|
||||
assert b"watch=1" in res.data
|
||||
assert b"watch=2" in res.data
|
||||
|
||||
# Verify watch 3 is NOT in the RSS feed (it doesn't have the tag)
|
||||
assert b"watch=3" not in res.data
|
||||
|
||||
# Verify the changes are shown in the RSS feed
|
||||
assert b"MODIFIED" in res.data or b"CHANGED" in res.data
|
||||
|
||||
# Verify it's actual RSS/XML format
|
||||
assert b"<rss" in res.data or b"<feed" in res.data
|
||||
|
||||
# Test with invalid tag UUID - should return 404
|
||||
res = client.get(
|
||||
url_for("rss.rss_tag_feed", tag_uuid="invalid-uuid-12345", token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 404
|
||||
assert b"not found" in res.data
|
||||
|
||||
# Test with invalid token - should return 403
|
||||
res = client.get(
|
||||
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token="wrong-token", _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 403
|
||||
assert b"Access denied" in res.data
|
||||
|
||||
# Clean up
|
||||
delete_all_watches(client)
|
||||
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||
assert b'All tags deleted' in res.data
|
||||
|
||||
|
||||
def test_rss_group_empty_tag(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that RSS feed for a tag with no watches returns valid but empty RSS.
|
||||
"""
|
||||
|
||||
# Create a tag with no watches
|
||||
res = client.post(
|
||||
url_for("tags.form_tag_add"),
|
||||
data={"name": "empty-tag"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Tag added" in res.data
|
||||
|
||||
tag_uuid = get_UUID_for_tag_name(client, name="empty-tag")
|
||||
assert tag_uuid is not None
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
|
||||
# Request RSS feed for empty tag
|
||||
res = client.get(
|
||||
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
# Should still return 200 with valid RSS
|
||||
assert res.status_code == 200
|
||||
assert b"<?xml" in res.data or b"<rss" in res.data
|
||||
assert b"empty-tag" in res.data
|
||||
|
||||
# Clean up
|
||||
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||
assert b'All tags deleted' in res.data
|
||||
|
||||
|
||||
def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that RSS feed for a tag only shows unviewed watches.
|
||||
"""
|
||||
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
|
||||
# Create a tag
|
||||
res = client.post(
|
||||
url_for("tags.form_tag_add"),
|
||||
data={"name": "unviewed-test"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Tag added" in res.data
|
||||
|
||||
tag_uuid = get_UUID_for_tag_name(client, name="unviewed-test")
|
||||
|
||||
# Add two watches with the tag
|
||||
test_url_1 = url_for('test_endpoint', _external=True) + "?unviewed=1"
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url_1, "tags": 'unviewed-test'},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
test_url_2 = url_for('test_endpoint', _external=True) + "?unviewed=2"
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url_2, "tags": 'unviewed-test'},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Trigger changes
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
|
||||
# Request RSS feed - should show both watches (both unviewed)
|
||||
res = client.get(
|
||||
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
assert b"unviewed=1" in res.data
|
||||
assert b"unviewed=2" in res.data
|
||||
|
||||
# Mark all as viewed
|
||||
res = client.get(url_for('ui.mark_all_viewed'), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Request RSS feed again - should be empty now (no unviewed watches)
|
||||
res = client.get(
|
||||
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
# Should not contain the watch URLs anymore since they're viewed
|
||||
assert b"unviewed=1" not in res.data
|
||||
assert b"unviewed=2" not in res.data
|
||||
|
||||
# Clean up
|
||||
delete_all_watches(client)
|
||||
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||
assert b'All tags deleted' in res.data
|
||||
@@ -65,7 +65,7 @@ def test_rss_reader_mode(client, live_server, measure_memory_usage, datastore_pa
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
assert 'Wet noodles escape' in snapshot_contents
|
||||
assert '<br>' not in snapshot_contents
|
||||
assert '<' not in snapshot_contents
|
||||
@@ -91,7 +91,7 @@ def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_us
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
assert 'Wet noodles escape' not in snapshot_contents
|
||||
assert '<br>' not in snapshot_contents
|
||||
assert '<' not in snapshot_contents
|
||||
|
||||
245
changedetectionio/tests/test_rss_single_watch.py
Normal file
245
changedetectionio/tests/test_rss_single_watch.py
Normal file
@@ -0,0 +1,245 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import time
|
||||
import os
|
||||
import xml.etree.ElementTree as ET
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, extract_UUID_from_client, delete_all_watches
|
||||
|
||||
|
||||
def test_rss_single_watch_order(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that single watch RSS feed shows changes in correct order (newest first).
|
||||
"""
|
||||
|
||||
# Create initial content
|
||||
def set_response(datastore_path, version):
|
||||
test_return_data = f"""<html>
|
||||
<body>
|
||||
<p>Version {version} content</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(test_return_data)
|
||||
|
||||
# Start with version 1
|
||||
set_response(datastore_path, 1)
|
||||
|
||||
# Add a watch
|
||||
test_url = url_for('test_endpoint', _external=True) + "?order_test=1"
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": 'test-tag'},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
# Get the watch UUID
|
||||
watch_uuid = extract_UUID_from_client(client)
|
||||
|
||||
# Wait for initial check
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Create multiple versions by triggering changes
|
||||
for version in range(2, 6): # Create versions 2, 3, 4, 5
|
||||
set_response(datastore_path, version)
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
time.sleep(0.5) # Small delay to ensure different timestamps
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
|
||||
# Request RSS feed for the single watch
|
||||
res = client.get(
|
||||
url_for("rss.rss_single_watch", uuid=watch_uuid, token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
# Should return valid RSS
|
||||
assert res.status_code == 200
|
||||
assert b"<?xml" in res.data or b"<rss" in res.data
|
||||
|
||||
# Parse the RSS/XML
|
||||
root = ET.fromstring(res.data)
|
||||
|
||||
# Find all items (RSS 2.0) or entries (Atom)
|
||||
items = root.findall('.//item')
|
||||
if not items:
|
||||
items = root.findall('.//{http://www.w3.org/2005/Atom}entry')
|
||||
|
||||
# Should have multiple items
|
||||
assert len(items) >= 3, f"Expected at least 3 items, got {len(items)}"
|
||||
|
||||
# Get the descriptions/content from first 3 items
|
||||
descriptions = []
|
||||
for item in items[:3]:
|
||||
# Try RSS format first
|
||||
desc = item.findtext('description')
|
||||
if not desc:
|
||||
# Try Atom format
|
||||
content_elem = item.find('{http://www.w3.org/2005/Atom}content')
|
||||
if content_elem is not None:
|
||||
desc = content_elem.text
|
||||
descriptions.append(desc if desc else "")
|
||||
|
||||
print(f"First item content: {descriptions[0][:100] if descriptions[0] else 'None'}")
|
||||
print(f"Second item content: {descriptions[1][:100] if descriptions[1] else 'None'}")
|
||||
print(f"Third item content: {descriptions[2][:100] if descriptions[2] else 'None'}")
|
||||
|
||||
# The FIRST item should contain the NEWEST change (Version 5)
|
||||
# The SECOND item should contain Version 4
|
||||
# The THIRD item should contain Version 3
|
||||
assert b"Version 5" in descriptions[0].encode() or "Version 5" in descriptions[0], \
|
||||
f"First item should show newest change (Version 5), but got: {descriptions[0][:200]}"
|
||||
|
||||
# Verify the order is correct
|
||||
assert b"Version 4" in descriptions[1].encode() or "Version 4" in descriptions[1], \
|
||||
f"Second item should show Version 4, but got: {descriptions[1][:200]}"
|
||||
|
||||
assert b"Version 3" in descriptions[2].encode() or "Version 3" in descriptions[2], \
|
||||
f"Third item should show Version 3, but got: {descriptions[2][:200]}"
|
||||
|
||||
# Clean up
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_rss_categories_from_tags(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that RSS feeds include category tags from watch tags.
|
||||
"""
|
||||
|
||||
# Create initial content
|
||||
test_return_data = """<html>
|
||||
<body>
|
||||
<p>Test content for RSS categories</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(test_return_data)
|
||||
|
||||
# Create some tags first
|
||||
res = client.post(
|
||||
url_for("tags.form_tag_add"),
|
||||
data={"name": "Security"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
res = client.post(
|
||||
url_for("tags.form_tag_add"),
|
||||
data={"name": "Python"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
res = client.post(
|
||||
url_for("tags.form_tag_add"),
|
||||
data={"name": "Tech News"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
# Add a watch with tags
|
||||
test_url = url_for('test_endpoint', _external=True) + "?category_test=1"
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": "Security, Python, Tech News"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
# Get the watch UUID
|
||||
watch_uuid = extract_UUID_from_client(client)
|
||||
|
||||
# Wait for initial check
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Trigger one change
|
||||
test_return_data_v2 = """<html>
|
||||
<body>
|
||||
<p>Updated content for RSS categories</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(test_return_data_v2)
|
||||
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
|
||||
# Test 1: Check single watch RSS feed
|
||||
res = client.get(
|
||||
url_for("rss.rss_single_watch", uuid=watch_uuid, token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
assert b"<?xml" in res.data or b"<rss" in res.data
|
||||
|
||||
# Parse the RSS/XML
|
||||
root = ET.fromstring(res.data)
|
||||
|
||||
# Find all items
|
||||
items = root.findall('.//item')
|
||||
assert len(items) >= 1, "Expected at least 1 item in RSS feed"
|
||||
|
||||
# Get categories from first item
|
||||
categories = [cat.text for cat in items[0].findall('category')]
|
||||
|
||||
print(f"Found categories in single watch RSS: {categories}")
|
||||
|
||||
# Should have all three categories
|
||||
assert "Security" in categories, f"Expected 'Security' category, got: {categories}"
|
||||
assert "Python" in categories, f"Expected 'Python' category, got: {categories}"
|
||||
assert "Tech News" in categories, f"Expected 'Tech News' category, got: {categories}"
|
||||
assert len(categories) == 3, f"Expected 3 categories, got {len(categories)}: {categories}"
|
||||
|
||||
# Test 2: Check main RSS feed
|
||||
res = client.get(
|
||||
url_for("rss.feed", token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
|
||||
root = ET.fromstring(res.data)
|
||||
items = root.findall('.//item')
|
||||
assert len(items) >= 1, "Expected at least 1 item in main RSS feed"
|
||||
|
||||
# Get categories from first item in main feed
|
||||
categories = [cat.text for cat in items[0].findall('category')]
|
||||
|
||||
print(f"Found categories in main RSS feed: {categories}")
|
||||
|
||||
# Should have all three categories
|
||||
assert "Security" in categories, f"Expected 'Security' category in main feed, got: {categories}"
|
||||
assert "Python" in categories, f"Expected 'Python' category in main feed, got: {categories}"
|
||||
assert "Tech News" in categories, f"Expected 'Tech News' category in main feed, got: {categories}"
|
||||
|
||||
# Test 3: Check tag-specific RSS feed (should also have categories)
|
||||
# Get the tag UUID for "Security" and verify the tag feed also has categories
|
||||
from .util import get_UUID_for_tag_name
|
||||
security_tag_uuid = get_UUID_for_tag_name(client, name="Security")
|
||||
|
||||
if security_tag_uuid:
|
||||
res = client.get(
|
||||
url_for("rss.rss_tag_feed", tag_uuid=security_tag_uuid, token=rss_token, _external=True),
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
|
||||
root = ET.fromstring(res.data)
|
||||
items = root.findall('.//item')
|
||||
|
||||
if len(items) >= 1:
|
||||
categories = [cat.text for cat in items[0].findall('category')]
|
||||
print(f"Found categories in tag RSS feed: {categories}")
|
||||
|
||||
# Should still have all three categories
|
||||
assert "Security" in categories, f"Expected 'Security' category in tag feed, got: {categories}"
|
||||
assert "Python" in categories, f"Expected 'Python' category in tag feed, got: {categories}"
|
||||
assert "Tech News" in categories, f"Expected 'Tech News' category in tag feed, got: {categories}"
|
||||
|
||||
# Clean up
|
||||
delete_all_watches(client)
|
||||
@@ -55,8 +55,8 @@ class TestTriggerConditions(unittest.TestCase):
|
||||
self.assertEqual(len(history), 2)
|
||||
|
||||
# Retrieve and check snapshots
|
||||
#snapshot1 = watch.get_history_snapshot(str(timestamp1))
|
||||
#snapshot2 = watch.get_history_snapshot(str(timestamp2))
|
||||
#snapshot1 = watch.get_history_snapshot(timestamp=str(timestamp1))
|
||||
#snapshot2 = watch.get_history_snapshot(timestamp=str(timestamp2))
|
||||
|
||||
self.store.data['watching'][self.watch_uuid].update(
|
||||
{
|
||||
|
||||
@@ -33,6 +33,9 @@ chardet>2.3.0
|
||||
wtforms~=3.2
|
||||
jsonpath-ng~=1.7.0
|
||||
|
||||
# Fast JSON serialization for better performance
|
||||
orjson~=3.11
|
||||
|
||||
# dnspython - Used by paho-mqtt for MQTT broker resolution
|
||||
# Version pin removed since eventlet (which required the specific 2.6.1 pin) has been eliminated
|
||||
# paho-mqtt will install compatible dnspython version automatically
|
||||
|
||||
Reference in New Issue
Block a user