Compare commits

...

25 Commits

Author SHA1 Message Date
dgtlmoon
78647d308d Adding test 2025-11-19 16:33:02 +01:00
dgtlmoon
00d28c6c40 Add some metadata 2025-11-19 15:44:38 +01:00
dgtlmoon
91729ae724 Template tweaks 2025-11-19 15:33:01 +01:00
dgtlmoon
e09c86dd13 Tempalte update 2025-11-19 15:25:17 +01:00
dgtlmoon
90d68f7ca7 RSS encoding fixes 2025-11-19 15:19:50 +01:00
dgtlmoon
b6b733a1fa RSS Reader mode - Improve parser, dc:content, etc 2025-11-19 15:08:23 +01:00
dgtlmoon
0be5005776 0.51.2 2025-11-19 13:08:26 +01:00
dgtlmoon
12ce03c0bb RSS - New Settings option for making RSS follow the format of Notification Body across watch/group/etc, or system default and override the format with your own as you like. 2025-11-19 12:58:12 +01:00
dgtlmoon
3767a2d5b9 0.51.1 Fixing semver version number
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-14 10:21:32 +01:00
dgtlmoon
71c8d8b1b1 0.51.01 2025-11-14 10:11:45 +01:00
dgtlmoon
20cbe6f510 0.51.00 2025-11-14 10:10:40 +01:00
dgtlmoon
3a6e1f908f UI - Minor text fix for anon history access 2025-11-14 10:01:03 +01:00
dgtlmoon
73fdbf24e3 RSS per watch tweaks (#3635)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-13 20:37:11 +01:00
dgtlmoon
629f939224 RSS Feed per watch - Setting order (newest changes first) (#3634) 2025-11-13 19:44:22 +01:00
dgtlmoon
48299e5738 UI - Moving 'RSS' options to its own settings tab, RSS - Adding watch history length (#3633) 2025-11-13 19:20:03 +01:00
dgtlmoon
5b1b70b8ab RSS per group! (#3632) 2025-11-13 18:56:04 +01:00
dgtlmoon
678d568b37 UI - Move 'Jitter seconds' settings tab from "General" to "Fetching" global Settings. 2025-11-13 17:34:57 +01:00
John Eismeier
fb15b62fb9 README typo fix and ignore files for emacs style backups
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
2025-11-12 21:51:06 +01:00
dgtlmoon
8dc39d4a3d RSS feeds for a single watches!
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-12 17:38:18 +01:00
dgtlmoon
805cd618d4 Always backup JSON DB on new versions as well as the existing between updates. 2025-11-12 17:37:12 +01:00
dgtlmoon
4ba5fcce8f 0.50.43
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-12 13:00:42 +01:00
dgtlmoon
b9305faf21 Forcing UTF-8 when reading JSON DB (Fixes data not loaded for some platforms #3622 #3611 #3628), Always create new versions of the backup DB if one exists for that step when running updates, Adding extra sanity checks on DB load 2025-11-12 12:58:59 +01:00
dgtlmoon
3d3b53831e Adding data sanity checks across restarts (#3629) 2025-11-12 12:19:16 +01:00
dgtlmoon
2ae29ab78f 0.50.42
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-10 13:32:08 +01:00
dgtlmoon
caffd804fe Revert "Windows - JSON DB fixes - Forcing utf-8 for json DB read/writes should solve windows saving/loading problems. (#3615 #3611)"
This reverts commit e2b407c6f3.
2025-11-10 13:31:51 +01:00
33 changed files with 1962 additions and 565 deletions

1
.gitignore vendored
View File

@@ -21,6 +21,7 @@ venv/
# IDEs
.idea
.vscode/settings.json
*~
# Datastore files
datastore/

View File

@@ -14,7 +14,7 @@ Ideal for monitoring price changes, content edits, conditional changes and more.
[<img src="https://raw.githubusercontent.com/dgtlmoon/changedetection.io/master/docs/screenshot.png" style="max-width:100%;" alt="Self-hosted web page change monitoring, list of websites with changes" title="Self-hosted web page change monitoring, list of websites with changes" />](https://changedetection.io)
[**Don't have time? Try our extremely affordable subscription use our proxies and support!**](https://changedetection.io)
[**Don't have time? Try our extremely affordable subscription use our proxies and support!**](https://changedetection.io)
@@ -31,7 +31,7 @@ Available when connected to a <a href="https://github.com/dgtlmoon/changedetecti
### Perform interactive browser steps
Fill in text boxes, click buttons and more, setup your changedetection scenario.
Fill in text boxes, click buttons and more, setup your changedetection scenario.
Using the **Browser Steps** configuration, add basic steps before performing change detection, such as logging into websites, adding a product to a cart, accept cookie logins, entering dates and refining searches.
@@ -54,7 +54,7 @@ Requires Playwright to be enabled.
- Know when your favourite whiskey is on sale, or other special deals are announced before anyone else
- COVID related news from government websites
- University/organisation news from their website
- Detect and monitor changes in JSON API responses
- Detect and monitor changes in JSON API responses
- JSON API monitoring and alerting
- Changes in legal and other documents
- Trigger API calls via notifications when text appears on a website
@@ -86,7 +86,7 @@ _Need an actual Chrome runner with Javascript support? We support fetching via W
We [recommend and use Bright Data](https://brightdata.grsm.io/n0r16zf7eivq) global proxy services, Bright Data will match any first deposit up to $100 using our signup link.
[Oxylabs](https://oxylabs.go2cloud.org/SH2d) is also an excellent proxy provider and well worth using, they offer Residental, ISP, Rotating and many other proxy types to suit your project.
[Oxylabs](https://oxylabs.go2cloud.org/SH2d) is also an excellent proxy provider and well worth using, they offer Residential, ISP, Rotating and many other proxy types to suit your project.
Please :star: star :star: this project and help it grow! https://github.com/dgtlmoon/changedetection.io/
@@ -106,4 +106,3 @@ $ changedetection.io -d /path/to/empty/data/dir -p 5000
Then visit http://127.0.0.1:5000 , You should now be able to access the UI.
See https://changedetection.io for more information.

View File

@@ -1,8 +1,8 @@
#!/usr/bin/env python3
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.41'
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.51.2'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError
@@ -74,6 +74,12 @@ def main():
datastore_path = None
do_cleanup = False
# Optional URL to watch since start
default_url = None
# Set a default logger level
logger_level = 'DEBUG'
include_default_watches = True
host = os.environ.get("LISTEN_HOST", "0.0.0.0").strip()
port = int(os.environ.get('PORT', 5000))
ssl_mode = False
@@ -87,15 +93,13 @@ def main():
datastore_path = os.path.join(os.getcwd(), "../datastore")
try:
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:", "port")
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:u:", "port")
except getopt.GetoptError:
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -u [default URL to watch] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
sys.exit(2)
create_datastore_dir = False
# Set a default logger level
logger_level = 'DEBUG'
# Set a logger level via shell env variable
# Used: Dockerfile for CICD
# To set logger level for pytest, see the app function in tests/conftest.py
@@ -116,6 +120,10 @@ def main():
if opt == '-d':
datastore_path = arg
if opt == '-u':
default_url = arg
include_default_watches = False
# Cleanup (remove text files that arent in the index)
if opt == '-c':
do_cleanup = True
@@ -172,13 +180,16 @@ def main():
sys.exit(2)
try:
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__)
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__, include_default_watches=include_default_watches)
except JSONDecodeError as e:
# Dont' start if the JSON DB looks corrupt
logger.critical(f"ERROR: JSON DB or Proxy List JSON at '{app_config['datastore_path']}' appears to be corrupt, aborting.")
logger.critical(str(e))
return
if default_url:
datastore.add_watch(url = default_url)
app = changedetection_app(app_config, datastore)
# Get the SocketIO instance from the Flask app (created in flask_app.py)

View File

@@ -1 +1,27 @@
RSS_FORMAT_TYPES = [('plaintext', 'Plain text'), ('html', 'HTML Color')]
from copy import deepcopy
from loguru import logger
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
from changedetectionio.notification import valid_notification_formats
RSS_CONTENT_FORMAT_DEFAULT = 'text'
# Some stuff not related
RSS_FORMAT_TYPES = deepcopy(valid_notification_formats)
if RSS_FORMAT_TYPES.get('markdown'):
del RSS_FORMAT_TYPES['markdown']
if RSS_FORMAT_TYPES.get(USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH):
del RSS_FORMAT_TYPES[USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH]
if not RSS_FORMAT_TYPES.get(RSS_CONTENT_FORMAT_DEFAULT):
logger.critical(f"RSS_CONTENT_FORMAT_DEFAULT not in the acceptable list {RSS_CONTENT_FORMAT_DEFAULT}")
RSS_TEMPLATE_TYPE_OPTIONS = {'system_default': 'System default', 'notification_body': 'Notification body'}
# @note: We use <pre> because nearly all RSS readers render only HTML (Thunderbird for example cant do just plaintext)
RSS_TEMPLATE_PLAINTEXT_DEFAULT = "<pre>{{watch_label}} had a change.\n\n{{diff}}\n</pre>"
# @todo add some [edit]/[history]/[goto] etc links
# @todo need {{watch_edit_link}} + delete + history link token
RSS_TEMPLATE_HTML_DEFAULT = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_label}}</a></h4>\n<p>{{diff}}</p>\n</body></html>\n"

View File

@@ -0,0 +1,156 @@
"""
Utility functions for RSS feed generation.
"""
from changedetectionio.notification.handler import process_notification
from changedetectionio.notification_service import NotificationContextData, _check_cascading_vars
from loguru import logger
import datetime
import pytz
import re
BAD_CHARS_REGEX = r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
def scan_invalid_chars_in_rss(content):
"""
Scan for invalid characters in RSS content.
Returns True if invalid characters are found.
"""
for match in re.finditer(BAD_CHARS_REGEX, content):
i = match.start()
bad_char = content[i]
hex_value = f"0x{ord(bad_char):02x}"
# Grab context
start = max(0, i - 20)
end = min(len(content), i + 21)
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
# First match is enough
return True
return False
def clean_entry_content(content):
"""
Remove invalid characters from RSS content.
"""
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
return cleaned
def generate_watch_guid(watch, timestamp):
"""
Generate a unique GUID for a watch RSS entry.
Args:
watch: The watch object
timestamp: The timestamp of the specific change this entry represents
"""
return f"{watch['uuid']}/{timestamp}"
def validate_rss_token(datastore, request):
"""
Validate the RSS access token from the request.
Returns:
tuple: (is_valid, error_response) where error_response is None if valid
"""
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
if rss_url_token != app_rss_token:
return False, ("Access denied, bad token", 403)
return True, None
def get_rss_template(datastore, watch, rss_content_format, default_html, default_plaintext):
"""Get the appropriate template for RSS content."""
if datastore.data['settings']['application'].get('rss_template_type') == 'notification_body':
return _check_cascading_vars(datastore=datastore, var_name='notification_body', watch=watch)
override = datastore.data['settings']['application'].get('rss_template_override')
if override and override.strip():
return override
elif 'text' in rss_content_format:
return default_plaintext
else:
return default_html
def get_watch_label(datastore, watch):
"""Get the label for a watch based on settings."""
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
return watch.label
else:
return watch.get('url')
def add_watch_categories(fe, watch, datastore):
"""Add category tags to a feed entry based on watch tags."""
for tag_uuid in watch.get('tags', []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if tag and tag.get('title'):
fe.category(term=tag.get('title'))
def build_notification_context(watch, timestamp_from, timestamp_to, watch_label,
n_body_template, rss_content_format):
"""Build the notification context object."""
return NotificationContextData(initial_data={
'notification_urls': ['null://just-sending-a-null-test-for-the-render-in-RSS'],
'notification_body': n_body_template,
'timestamp_to': timestamp_to,
'timestamp_from': timestamp_from,
'watch_label': watch_label,
'notification_format': rss_content_format
})
def render_notification(n_object, notification_service, watch, datastore,
date_index_from=None, date_index_to=None):
"""Process and render the notification content."""
kwargs = {'n_object': n_object, 'watch': watch}
if date_index_from is not None and date_index_to is not None:
kwargs['date_index_from'] = date_index_from
kwargs['date_index_to'] = date_index_to
n_object = notification_service.queue_notification_for_watch(**kwargs)
n_object['watch_mime_type'] = None
res = process_notification(n_object=n_object, datastore=datastore)
return res[0]
def populate_feed_entry(fe, watch, content, guid, timestamp, link=None, title_suffix=None):
"""Populate a feed entry with content and metadata."""
watch_label = watch.get('url') # Already determined by caller
# Set link
if link:
fe.link(link=link)
# Set title
if title_suffix:
fe.title(title=f"{watch_label} - {title_suffix}")
else:
fe.title(title=watch_label)
# Clean and set content
if scan_invalid_chars_in_rss(content):
content = clean_entry_content(content)
fe.content(content=content, type='CDATA')
# Set GUID
fe.guid(guid, permalink=False)
# Set pubDate using the timestamp of this specific change
dt = datetime.datetime.fromtimestamp(int(timestamp))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)

View File

@@ -1,155 +1,26 @@
from changedetectionio.jinja2_custom import render as jinja_render
from changedetectionio.notification.handler import apply_service_tweaks
from changedetectionio.store import ChangeDetectionStore
from feedgen.feed import FeedGenerator
from flask import Blueprint, make_response, request, url_for, redirect
from loguru import logger
import datetime
import pytz
import re
import time
from flask import Blueprint
BAD_CHARS_REGEX=r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
# Anything that is not text/UTF-8 should be stripped before it breaks feedgen (such as binary data etc)
def scan_invalid_chars_in_rss(content):
for match in re.finditer(BAD_CHARS_REGEX, content):
i = match.start()
bad_char = content[i]
hex_value = f"0x{ord(bad_char):02x}"
# Grab context
start = max(0, i - 20)
end = min(len(content), i + 21)
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
# First match is enough
return True
return False
def clean_entry_content(content):
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
return cleaned
from . import tag as tag_routes
from . import main_feed
from . import single_watch
def construct_blueprint(datastore: ChangeDetectionStore):
"""
Construct and configure the RSS blueprint with all routes.
Args:
datastore: The ChangeDetectionStore instance
Returns:
The configured Flask blueprint
"""
rss_blueprint = Blueprint('rss', __name__)
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
# to some earlier blueprint rerouting work, it should goto feed.
@rss_blueprint.route("/", methods=['GET'])
def extraslash():
return redirect(url_for('rss.feed'))
# Import the login decorator if needed
# from changedetectionio.auth_decorator import login_optionally_required
@rss_blueprint.route("", methods=['GET'])
def feed():
now = time.time()
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
from changedetectionio import diff
limit_tag = request.args.get('tag', '').lower().strip()
# Be sure limit_tag is a uuid
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
if limit_tag == tag.get('title', '').lower().strip():
limit_tag = uuid
# Sort by last_changed and add the uuid which is usually the key..
sorted_watches = []
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
for uuid, watch in datastore.data['watching'].items():
# @todo tag notification_muted skip also (improve Watch model)
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
if limit_tag and not limit_tag in watch['tags']:
continue
watch['uuid'] = uuid
sorted_watches.append(watch)
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
fg = FeedGenerator()
fg.title('changedetection.io')
fg.description('Feed description')
fg.link(href='https://changedetection.io')
html_colour_enable = False
if datastore.data['settings']['application'].get('rss_content_format') == 'html':
html_colour_enable = True
for watch in sorted_watches:
dates = list(watch.history.keys())
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
if len(dates) < 2:
continue
if not watch.viewed:
# Re #239 - GUID needs to be individual for each event
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
guid = "{}/{}".format(watch['uuid'], watch.last_changed)
fe = fg.add_entry()
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
# Description is the page you watch, link takes you to the diff JS UI page
# Dict val base_url will get overriden with the env var if it is set.
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
# @todo fix
# Because we are called via whatever web server, flask should figure out the right path (
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
fe.link(link=diff_link)
# Same logic as watch-overview.html
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
watch_label = watch.label
else:
watch_label = watch.get('url')
fe.title(title=watch_label)
try:
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[-2]),
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[-1]),
include_equal=False,
line_feed_sep="<br>"
)
requested_output_format = 'htmlcolor' if html_colour_enable else 'html'
html_diff = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
except FileNotFoundError as e:
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
# @todo Make this configurable and also consider html-colored markup
# @todo User could decide if <link> goes to the diff page, or to the watch link
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
content = jinja_render(template_str=rss_template, watch_title=watch_label, html_diff=html_diff, watch_url=watch.link)
# Out of range chars could also break feedgen
if scan_invalid_chars_in_rss(content):
content = clean_entry_content(content)
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
return response
# Register all route modules
main_feed.construct_main_feed_routes(rss_blueprint, datastore)
single_watch.construct_single_watch_routes(rss_blueprint, datastore)
tag_routes.construct_tag_routes(rss_blueprint, datastore)
return rss_blueprint

View File

@@ -0,0 +1,105 @@
from flask import make_response, request, url_for, redirect
def construct_main_feed_routes(rss_blueprint, datastore):
"""
Construct the main RSS feed routes.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
# to some earlier blueprint rerouting work, it should goto feed.
@rss_blueprint.route("/", methods=['GET'])
def extraslash():
return redirect(url_for('rss.feed'))
# Import the login decorator if needed
# from changedetectionio.auth_decorator import login_optionally_required
@rss_blueprint.route("", methods=['GET'])
def feed():
from feedgen.feed import FeedGenerator
from loguru import logger
import time
from . import RSS_TEMPLATE_HTML_DEFAULT, RSS_TEMPLATE_PLAINTEXT_DEFAULT
from ._util import (validate_rss_token, generate_watch_guid, get_rss_template,
get_watch_label, build_notification_context, render_notification,
populate_feed_entry, add_watch_categories)
from ...notification_service import NotificationService
now = time.time()
# Validate token
is_valid, error = validate_rss_token(datastore, request)
if not is_valid:
return error
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
limit_tag = request.args.get('tag', '').lower().strip()
# Be sure limit_tag is a uuid
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
if limit_tag == tag.get('title', '').lower().strip():
limit_tag = uuid
# Sort by last_changed and add the uuid which is usually the key..
sorted_watches = []
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
for uuid, watch in datastore.data['watching'].items():
# @todo tag notification_muted skip also (improve Watch model)
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
if limit_tag and not limit_tag in watch['tags']:
continue
sorted_watches.append(watch)
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
fg = FeedGenerator()
fg.title('changedetection.io')
fg.description('Feed description')
fg.link(href='https://changedetection.io')
notification_service = NotificationService(datastore=datastore, notification_q=False)
for watch in sorted_watches:
dates = list(watch.history.keys())
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
if len(dates) < 2:
continue
if not watch.viewed:
# Re #239 - GUID needs to be individual for each event
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
watch_label = get_watch_label(datastore, watch)
timestamp_to = dates[-1]
timestamp_from = dates[-2]
guid = generate_watch_guid(watch, timestamp_to)
# Because we are called via whatever web server, flask should figure out the right path
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
# Get template and build notification context
n_body_template = get_rss_template(datastore, watch, rss_content_format,
RSS_TEMPLATE_HTML_DEFAULT, RSS_TEMPLATE_PLAINTEXT_DEFAULT)
n_object = build_notification_context(watch, timestamp_from, timestamp_to,
watch_label, n_body_template, rss_content_format)
# Render notification
res = render_notification(n_object, notification_service, watch, datastore)
# Create and populate feed entry
fe = fg.add_entry()
populate_feed_entry(fe, watch, res['body'], guid, timestamp_to, link=diff_link)
fe.title(title=watch_label) # Override title to not include suffix
add_watch_categories(fe, watch, datastore)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
return response

View File

@@ -0,0 +1,115 @@
def construct_single_watch_routes(rss_blueprint, datastore):
"""
Construct RSS feed routes for single watches.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
@rss_blueprint.route("/watch/<string:uuid>", methods=['GET'])
def rss_single_watch(uuid):
import time
from flask import make_response, request
from feedgen.feed import FeedGenerator
from loguru import logger
from . import RSS_TEMPLATE_HTML_DEFAULT, RSS_TEMPLATE_PLAINTEXT_DEFAULT
from ._util import (validate_rss_token, get_rss_template, get_watch_label,
build_notification_context, render_notification,
populate_feed_entry, add_watch_categories)
from ...notification_service import NotificationService
"""
Display the most recent changes for a single watch as RSS feed.
Returns RSS XML with multiple entries showing diffs between consecutive snapshots.
The number of entries is controlled by the rss_diff_length setting.
"""
now = time.time()
# Validate token
is_valid, error = validate_rss_token(datastore, request)
if not is_valid:
return error
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
# Get the watch by UUID
watch = datastore.data['watching'].get(uuid)
if not watch:
return f"Watch with UUID {uuid} not found", 404
# Check if watch has at least 2 history snapshots
dates = list(watch.history.keys())
if len(dates) < 2:
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
# Add uuid to watch for proper functioning
watch['uuid'] = uuid
# Get the number of diffs to include (default: 5)
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
# Calculate how many diffs we can actually show (limited by available history)
# We need at least 2 snapshots to create 1 diff
max_possible_diffs = len(dates) - 1
num_diffs = min(rss_diff_length, max_possible_diffs) if rss_diff_length > 0 else max_possible_diffs
# Create RSS feed
fg = FeedGenerator()
# Set title: use "label (url)" if label differs from url, otherwise just url
watch_url = watch.get('url', '')
watch_label = get_watch_label(datastore, watch)
if watch_label != watch_url:
feed_title = f'changedetection.io - {watch_label} ({watch_url})'
else:
feed_title = f'changedetection.io - {watch_url}'
fg.title(feed_title)
fg.description('Changes')
fg.link(href='https://changedetection.io')
# Loop through history and create RSS entries for each diff
# Add entries in reverse order because feedgen reverses them
# This way, the newest change appears first in the final RSS
notification_service = NotificationService(datastore=datastore, notification_q=False)
for i in range(num_diffs - 1, -1, -1):
# Calculate indices for this diff (working backwards from newest)
# i=0: compare dates[-2] to dates[-1] (most recent change)
# i=1: compare dates[-3] to dates[-2] (previous change)
# etc.
date_index_to = -(i + 1)
date_index_from = -(i + 2)
timestamp_to = dates[date_index_to]
timestamp_from = dates[date_index_from]
# Get template and build notification context
n_body_template = get_rss_template(datastore, watch, rss_content_format,
RSS_TEMPLATE_HTML_DEFAULT, RSS_TEMPLATE_PLAINTEXT_DEFAULT)
n_object = build_notification_context(watch, timestamp_from, timestamp_to,
watch_label, n_body_template, rss_content_format)
# Render notification with date indices
res = render_notification(n_object, notification_service, watch, datastore,
date_index_from, date_index_to)
# Create and populate feed entry
guid = f"{watch['uuid']}/{timestamp_to}"
fe = fg.add_entry()
title_suffix = f"Change @ {res['original_context']['change_datetime']}"
populate_feed_entry(fe, watch, res.get('body', ''), guid, timestamp_to,
link={'href': watch.get('url')}, title_suffix=title_suffix)
add_watch_categories(fe, watch, datastore)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
logger.debug(f"RSS Single watch built in {time.time()-now:.2f}s")
return response

View File

@@ -0,0 +1,98 @@
def construct_tag_routes(rss_blueprint, datastore):
"""
Construct RSS feed routes for tags.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
@rss_blueprint.route("/tag/<string:tag_uuid>", methods=['GET'])
def rss_tag_feed(tag_uuid):
from flask import make_response, request, url_for
from feedgen.feed import FeedGenerator
from . import RSS_TEMPLATE_HTML_DEFAULT, RSS_TEMPLATE_PLAINTEXT_DEFAULT
from ._util import (validate_rss_token, generate_watch_guid, get_rss_template,
get_watch_label, build_notification_context, render_notification,
populate_feed_entry, add_watch_categories)
from ...notification_service import NotificationService
"""
Display an RSS feed for all unviewed watches that belong to a specific tag.
Returns RSS XML with entries for each unviewed watch with sufficient history.
"""
# Validate token
is_valid, error = validate_rss_token(datastore, request)
if not is_valid:
return error
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
# Verify tag exists
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if not tag:
return f"Tag with UUID {tag_uuid} not found", 404
tag_title = tag.get('title', 'Unknown Tag')
# Create RSS feed
fg = FeedGenerator()
fg.title(f'changedetection.io - {tag_title}')
fg.description(f'Changes for watches tagged with {tag_title}')
fg.link(href='https://changedetection.io')
notification_service = NotificationService(datastore=datastore, notification_q=False)
# Find all watches with this tag
for uuid, watch in datastore.data['watching'].items():
#@todo This is wrong, it needs to sort by most recently changed and then limit it datastore.data['watching'].items().sorted(?)
# So get all watches in this tag then sort
# Skip if watch doesn't have this tag
if tag_uuid not in watch.get('tags', []):
continue
# Skip muted watches if configured
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
# Check if watch has at least 2 history snapshots
dates = list(watch.history.keys())
if len(dates) < 2:
continue
# Only include unviewed watches
if not watch.viewed:
# Add uuid to watch for proper functioning
watch['uuid'] = uuid
# Include a link to the diff page
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
# Get watch label
watch_label = get_watch_label(datastore, watch)
# Get template and build notification context
timestamp_to = dates[-1]
timestamp_from = dates[-2]
# Generate GUID for this entry
guid = generate_watch_guid(watch, timestamp_to)
n_body_template = get_rss_template(datastore, watch, rss_content_format,
RSS_TEMPLATE_HTML_DEFAULT, RSS_TEMPLATE_PLAINTEXT_DEFAULT)
n_object = build_notification_context(watch, timestamp_from, timestamp_to,
watch_label, n_body_template, rss_content_format)
# Render notification
res = render_notification(n_object, notification_service, watch, datastore)
# Create and populate feed entry
fe = fg.add_entry()
title_suffix = f"Change @ {res['original_context']['change_datetime']}"
populate_feed_entry(fe, watch, res['body'], guid, timestamp_to, link=diff_link, title_suffix=title_suffix)
add_watch_categories(fe, watch, datastore)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
return response

View File

@@ -2,7 +2,7 @@
{% block content %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field, render_fieldlist_with_inline_errors %}
{% from '_common_fields.html' import render_common_settings_form %}
{% from '_common_fields.html' import render_common_settings_form, show_token_placeholders %}
<script>
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}";
{% if emailprefix %}
@@ -24,6 +24,7 @@
<li class="tab"><a href="#filters">Global Filters</a></li>
<li class="tab"><a href="#ui-options">UI Options</a></li>
<li class="tab"><a href="#api">API</a></li>
<li class="tab"><a href="#rss">RSS</a></li>
<li class="tab"><a href="#timedate">Time &amp Date</a></li>
<li class="tab"><a href="#proxies">CAPTCHA &amp; Proxies</a></li>
</ul>
@@ -43,10 +44,6 @@
</div>
</div>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.filter_failure_notification_threshold_attempts, class="filter_failure_notification_threshold_attempts") }}
<span class="pure-form-message-inline">After this many consecutive times that the CSS/xPath filter is missing, send a notification
@@ -69,26 +66,13 @@
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.shared_diff_access, class="shared_diff_access") }}
<span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page)
<span class="pure-form-message-inline">Allow access to the watch change history page when password is enabled (Good for sharing the diff page)
</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
</div>
<div class="grey-form-border">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
</div>
</div>
</fieldset>
</div>
@@ -131,6 +115,10 @@
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br>
Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.timeout) }}
<span class="pure-form-message-inline">For regular plain requests (not chrome based), maximum number of seconds until timeout, 1-999.<br>
@@ -230,7 +218,37 @@ nav
</p>
</div>
</div>
<div class="tab-pane-inner" id="timedate">
<div class="tab-pane-inner" id="rss">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_diff_length) }}
<span class="pure-form-message-inline">Maximum number of history snapshots to include in the watch specific RSS feed.</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">For watching other RSS feeds - When watching RSS/Atom feeds, convert them into clean text for better change detection.</span>
</div>
<div class="pure-control-group grey-form-border">
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_template_type) }}
<span class="pure-form-message-inline">'System default' for the same template for all items, or re-use your "Notification Body" as the template.</span>
</div>
<div>
{{ render_field(form.application.form.rss_template_override) }}
{{ show_token_placeholders(extra_notification_token_placeholder_info=extra_notification_token_placeholder_info, suffix="-rss") }}
</div>
</div>
<br>
</div>
<div class="tab-pane-inner" id="timedate">
<div class="pure-control-group">
Ensure the settings below are correct, they are used to manage the time schedule for checking your web page watches.
</div>

View File

@@ -21,9 +21,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
tag_count = Counter(tag for watch in datastore.data['watching'].values() if watch.get('tags') for tag in watch['tags'])
output = render_template("groups-overview.html",
app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
available_tags=sorted_tags,
form=add_form,
tag_count=tag_count
tag_count=tag_count,
)
return output
@@ -149,9 +150,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
included_content = template.render(**template_args)
output = render_template("edit-tag.html",
settings_application=datastore.data['settings']['application'],
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
extra_form_content=included_content,
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
settings_application=datastore.data['settings']['application'],
**template_args
)

View File

@@ -52,6 +52,7 @@
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">Edit</a>&nbsp;
<a class="pure-button pure-button-primary" href="{{ url_for('tags.delete', uuid=uuid) }}" title="Deletes and removes tag">Delete</a>
<a class="pure-button pure-button-primary" href="{{ url_for('tags.unlink', uuid=uuid) }}" title="Keep the tag but unlink any watches">Unlink</a>
<a href="{{ url_for('rss.rss_tag_feed', tag_uuid=uuid, token=app_rss_token)}}"><img alt="RSS Feed for this watch" style="padding-left: 1em;" src="{{url_for('static_content', group='images', filename='generic_feed-icon.svg')}}" height="15"></a>
</td>
</tr>
{% endfor %}

View File

@@ -236,7 +236,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# Import the global plugin system
from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras
app_rss_token = datastore.data['settings']['application'].get('rss_access_token'),
template_args = {
'available_processors': processors.available_processors(),
'available_timezones': sorted(available_timezones()),
@@ -252,6 +252,11 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
'has_special_tag_options': _watch_has_tag_options_set(watch=watch),
'jq_support': jq_support,
'playwright_enabled': os.getenv('PLAYWRIGHT_DRIVER_URL', False),
'app_rss_token': app_rss_token,
'rss_uuid_feed' : {
'label': watch.label,
'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
},
'settings_application': datastore.data['settings']['application'],
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),

View File

@@ -2,7 +2,6 @@ from flask import Blueprint, request, make_response
import random
from loguru import logger
from changedetectionio.notification_service import NotificationContextData, set_basic_notification_vars
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
@@ -15,7 +14,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
@notification_blueprint.route("/notification/send-test/", methods=['POST'])
@login_optionally_required
def ajax_callback_send_notification_test(watch_uuid=None):
from changedetectionio.notification_service import NotificationContextData, set_basic_notification_vars
# Watch_uuid could be unset in the case it`s used in tag editor, global settings
import apprise
from changedetectionio.notification.handler import process_notification
@@ -97,31 +96,14 @@ def construct_blueprint(datastore: ChangeDetectionStore):
n_object['as_async'] = False
# Same like in notification service, should be refactored
dates = []
dates = list(watch.history.keys())
trigger_text = ''
snapshot_contents = ''
if watch:
watch_history = watch.history
dates = list(watch_history.keys())
trigger_text = watch.get('trigger_text', [])
# Add text that was triggered
if len(dates):
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
else:
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
if len(trigger_text):
from . import html_tools
triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
if triggered_text:
triggered_text = '\n'.join(triggered_text)
# Could be called as a 'test notification' with only 1 snapshot available
prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
if len(dates) > 1:
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
@@ -130,7 +112,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
current_snapshot=current_snapshot,
prev_snapshot=prev_snapshot,
watch=watch,
triggered_text=trigger_text))
triggered_text=trigger_text,
timestamp_changed=dates[-1] if dates else None))
sent_obj = process_notification(n_object, datastore)

View File

@@ -476,6 +476,7 @@ Math: {{ 1 + 1 }}") }}
class="pure-button button-error">Clear History</a>{% endif %}
<a href="{{url_for('ui.form_clone', uuid=uuid)}}"
class="pure-button">Clone &amp; Edit</a>
<a href="{{ url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token)}}"><img alt="RSS Feed for this watch" style="padding: .5em 1em;" src="{{url_for('static_content', group='images', filename='generic_feed-icon.svg')}}" height="15"></a>
</div>
</div>
</form>

View File

@@ -1,6 +1,7 @@
from loguru import logger
import hashlib
import os
import re
import asyncio
from changedetectionio import strtobool
from changedetectionio.content_fetchers.exceptions import BrowserStepsInUnsupportedFetcher, EmptyReply, Non200ErrorCodeReceived
@@ -76,9 +77,22 @@ class fetcher(Fetcher):
if not is_binary:
# Don't run this for PDF (and requests identified as binary) takes a _long_ time
if not r.headers.get('content-type') or not 'charset=' in r.headers.get('content-type'):
encoding = chardet.detect(r.content)['encoding']
if encoding:
r.encoding = encoding
# For XML/RSS feeds, check the XML declaration for encoding attribute
# This is more reliable than chardet which can misdetect UTF-8 as MacRoman
content_type = r.headers.get('content-type', '').lower()
if 'xml' in content_type or 'rss' in content_type:
# Look for <?xml version="1.0" encoding="UTF-8"?>
xml_encoding_match = re.search(rb'<\?xml[^>]+encoding=["\']([^"\']+)["\']', r.content[:200])
if xml_encoding_match:
r.encoding = xml_encoding_match.group(1).decode('ascii')
else:
# Default to UTF-8 for XML if no encoding found
r.encoding = 'utf-8'
else:
# For other content types, use chardet
encoding = chardet.detect(r.content)['encoding']
if encoding:
r.encoding = encoding
self.headers = r.headers

View File

@@ -3,7 +3,7 @@ import re
from loguru import logger
from wtforms.widgets.core import TimeInput
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES, RSS_TEMPLATE_TYPE_OPTIONS, RSS_TEMPLATE_HTML_DEFAULT
from changedetectionio.conditions.form import ConditionFormRow
from changedetectionio.notification_service import NotificationContextData
from changedetectionio.strtobool import strtobool
@@ -1000,7 +1000,9 @@ class globalSettingsApplicationForm(commonSettingsForm):
validators=[validators.NumberRange(min=0,
message="Should be atleast zero (disabled)")])
rss_content_format = SelectField('RSS Content format', choices=RSS_FORMAT_TYPES)
rss_content_format = SelectField('RSS Content format', choices=list(RSS_FORMAT_TYPES.items()))
rss_template_type = SelectField('RSS <description> body built from', choices=list(RSS_TEMPLATE_TYPE_OPTIONS.items()))
rss_template_override = TextAreaField('RSS "System default" template override', render_kw={"rows": "5", "placeholder": RSS_TEMPLATE_HTML_DEFAULT}, validators=[validators.Optional(), ValidateJinja2Template()])
removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
@@ -1009,8 +1011,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('Enable RSS reader mode ', default=False, validators=[validators.Optional()])
rss_diff_length = IntegerField(label='Number of changes to show in watch RSS feed',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0, message="Should contain zero or more attempts")])
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
render_kw={"style": "width: 5em;"},

View File

@@ -1,7 +1,7 @@
from os import getenv
from copy import deepcopy
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES, RSS_CONTENT_FORMAT_DEFAULT
from changedetectionio.notification import (
default_notification_body,
@@ -54,7 +54,10 @@ class model(dict):
'password': False,
'render_anchor_tag_content': False,
'rss_access_token': None,
'rss_content_format': RSS_FORMAT_TYPES[0][0],
'rss_content_format': RSS_CONTENT_FORMAT_DEFAULT,
'rss_template_type': 'system_default',
'rss_template_override': None,
'rss_diff_length': 5,
'rss_hide_muted_watches': True,
'rss_reader_mode': False,
'scheduler_timezone_default': None, # Default IANA timezone name

View File

@@ -187,6 +187,8 @@ def replace_placemarkers_in_text(text, url, requested_output_format):
def apply_service_tweaks(url, n_body, n_title, requested_output_format):
logger.debug(f"Applying markup in '{requested_output_format}' mode")
# Re 323 - Limit discord length to their 2000 char limit total or it wont send.
# Because different notifications may require different pre-processing, run each sequentially :(
# 2000 bytes minus -
@@ -397,11 +399,15 @@ def process_notification(n_object: NotificationContextData, datastore):
apprise_input_format = NotifyFormat.TEXT.value
requested_output_format = NotifyFormat.TEXT.value
#@todo on null:// (only if its a 1 url with null) probably doesnt need to actually .add/setup/etc
sent_objs.append({'title': n_title,
'body': n_body,
'url': url})
apobj.add(url)
'url': url,
# So that we can do a null:// call and get back exactly what would have been sent
'original_context': n_object })
if not url.startswith('null://'):
apobj.add(url)
# Since the output is always based on the plaintext of the 'diff' engine, wrap it nicely.
# It should always be similar to the 'history' part of the UI.
@@ -409,15 +415,16 @@ def process_notification(n_object: NotificationContextData, datastore):
if not '<pre' in n_body and not '<body' in n_body: # No custom HTML-ish body was setup already
n_body = as_monospaced_html_email(content=n_body, title=n_title)
apobj.notify(
title=n_title,
body=n_body,
# `body_format` Tell apprise what format the INPUT is in, specify a wrong/bad type and it will force skip conversion in apprise
# &format= in URL Tell apprise what format the OUTPUT should be in (it can convert between)
body_format=apprise_input_format,
# False is not an option for AppRise, must be type None
attach=n_object.get('screenshot', None)
)
if not url.startswith('null://'):
apobj.notify(
title=n_title,
body=n_body,
# `body_format` Tell apprise what format the INPUT is in, specify a wrong/bad type and it will force skip conversion in apprise
# &format= in URL Tell apprise what format the OUTPUT should be in (it can convert between)
body_format=apprise_input_format,
# False is not an option for AppRise, must be type None
attach=n_object.get('screenshot', None)
)
# Returns empty string if nothing found, multi-line string otherwise
log_value = logs.getvalue()
@@ -436,6 +443,8 @@ def create_notification_parameters(n_object: NotificationContextData, datastore)
if not isinstance(n_object, NotificationContextData):
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
ext_base_url = datastore.data['settings']['application'].get('active_base_url').strip('/')+'/'
watch = datastore.data['watching'].get(n_object['uuid'])
if watch:
watch_title = datastore.data['watching'][n_object['uuid']].label
@@ -449,20 +458,29 @@ def create_notification_parameters(n_object: NotificationContextData, datastore)
watch_title = 'Change Detection'
watch_tag = ''
# Create URLs to customise the notification with
# active_base_url - set in store.py data property
base_url = datastore.data['settings']['application'].get('active_base_url')
watch_url = n_object['watch_url']
diff_url = "{}/diff/{}".format(base_url, n_object['uuid'])
preview_url = "{}/preview/{}".format(base_url, n_object['uuid'])
# Build URLs manually instead of using url_for() to avoid requiring a request context
# This allows notifications to be processed in background threads
uuid = n_object['uuid']
if n_object.get('timestamp_from') and n_object.get('timestamp_to'):
# Include a link to the diff page with specific versions
diff_url = f"{ext_base_url}diff/{uuid}?from_version={n_object['timestamp_from']}&to_version={n_object['timestamp_to']}"
else:
diff_url = f"{ext_base_url}diff/{uuid}"
preview_url = f"{ext_base_url}preview/{uuid}"
edit_url = f"{ext_base_url}edit/{uuid}"
# @todo test that preview_url is correct when running in not-null mode?
# if not, first time app loads i think it can set a flask context
n_object.update(
{
'base_url': base_url,
'base_url': ext_base_url,
'diff_url': diff_url,
'preview_url': preview_url,
'preview_url': preview_url, #@todo include 'version='
'edit_url': edit_url, #@todo also pause, also mute link
'watch_tag': watch_tag if watch_tag is not None else '',
'watch_title': watch_title if watch_title is not None else '',
'watch_url': watch_url,

View File

@@ -5,13 +5,54 @@ Notification Service Module
Extracted from update_worker.py to provide standalone notification functionality
for both sync and async workers
"""
import datetime
import pytz
from loguru import logger
import time
from changedetectionio.notification import default_notification_format, valid_notification_formats
def _check_cascading_vars(datastore, var_name, watch):
"""
Check notification variables in cascading priority:
Individual watch settings > Tag settings > Global settings
"""
from changedetectionio.notification import (
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH,
default_notification_body,
default_notification_title
)
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc
v = watch.get(var_name)
if v and not watch.get('notification_muted'):
if var_name == 'notification_format' and v == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:
return datastore.data['settings']['application'].get('notification_format')
return v
tags = datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))
if tags:
for tag_uuid, tag in tags.items():
v = tag.get(var_name)
if v and not tag.get('notification_muted'):
return v
if datastore.data['settings']['application'].get(var_name):
return datastore.data['settings']['application'].get(var_name)
# Otherwise could be defaults
if var_name == 'notification_format':
return USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
if var_name == 'notification_body':
return default_notification_body
if var_name == 'notification_title':
return default_notification_title
return None
# What is passed around as notification context, also used as the complete list of valid {{ tokens }}
class NotificationContextData(dict):
@@ -34,6 +75,8 @@ class NotificationContextData(dict):
'preview_url': None,
'screenshot': None,
'triggered_text': None,
'timestamp_from': None,
'timestamp_to': None,
'uuid': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', # Converted to 'watch_uuid' in create_notification_parameters
'watch_mime_type': None,
'watch_tag': None,
@@ -73,7 +116,25 @@ class NotificationContextData(dict):
super().__setitem__(key, value)
def set_basic_notification_vars(snapshot_contents, current_snapshot, prev_snapshot, watch, triggered_text):
def timestamp_to_localtime(timestamp):
# Format the date using locale-aware formatting with timezone
dt = datetime.datetime.fromtimestamp(int(timestamp))
dt = dt.replace(tzinfo=pytz.UTC)
# Get local timezone-aware datetime
local_tz = datetime.datetime.now().astimezone().tzinfo
local_dt = dt.astimezone(local_tz)
# Format date with timezone - using strftime for locale awareness
try:
formatted_date = local_dt.strftime('%Y-%m-%d %H:%M:%S %Z')
except:
# Fallback if locale issues
formatted_date = local_dt.isoformat()
return formatted_date
def set_basic_notification_vars(snapshot_contents, current_snapshot, prev_snapshot, watch, triggered_text, timestamp_changed=None):
now = time.time()
from changedetectionio import diff
@@ -89,6 +150,7 @@ def set_basic_notification_vars(snapshot_contents, current_snapshot, prev_snapsh
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False),
'diff_removed_clean': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, include_change_type_prefix=False),
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
'change_datetime': timestamp_to_localtime(timestamp_changed) if timestamp_changed else None,
'triggered_text': triggered_text,
'uuid': watch.get('uuid') if watch else None,
'watch_url': watch.get('url') if watch else None,
@@ -114,7 +176,7 @@ class NotificationService:
self.datastore = datastore
self.notification_q = notification_q
def queue_notification_for_watch(self, n_object: NotificationContextData, watch):
def queue_notification_for_watch(self, n_object: NotificationContextData, watch, date_index_from=-2, date_index_to=-1):
"""
Queue a notification for a watch with full diff rendering and template variables
"""
@@ -154,57 +216,23 @@ class NotificationService:
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
if len(dates) > 1:
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
prev_snapshot = watch.get_history_snapshot(timestamp=dates[date_index_from])
current_snapshot = watch.get_history_snapshot(timestamp=dates[date_index_to])
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
current_snapshot=current_snapshot,
prev_snapshot=prev_snapshot,
watch=watch,
triggered_text=triggered_text))
triggered_text=triggered_text,
timestamp_changed=dates[date_index_to]))
logger.debug("Queued notification for sending")
self.notification_q.put(n_object)
def _check_cascading_vars(self, var_name, watch):
"""
Check notification variables in cascading priority:
Individual watch settings > Tag settings > Global settings
"""
from changedetectionio.notification import (
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH,
default_notification_body,
default_notification_title
)
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc
v = watch.get(var_name)
if v and not watch.get('notification_muted'):
if var_name == 'notification_format' and v == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:
return self.datastore.data['settings']['application'].get('notification_format')
return v
tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))
if tags:
for tag_uuid, tag in tags.items():
v = tag.get(var_name)
if v and not tag.get('notification_muted'):
return v
if self.datastore.data['settings']['application'].get(var_name):
return self.datastore.data['settings']['application'].get(var_name)
# Otherwise could be defaults
if var_name == 'notification_format':
return USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
if var_name == 'notification_body':
return default_notification_body
if var_name == 'notification_title':
return default_notification_title
return None
if self.notification_q:
logger.debug("Queued notification for sending")
self.notification_q.put(n_object)
else:
logger.debug("Not queued, no queue defined. Just returning processed data")
return n_object
def send_content_changed_notification(self, watch_uuid):
"""
@@ -227,10 +255,11 @@ class NotificationService:
# Should be a better parent getter in the model object
# Prefer - Individual watch settings > Tag settings > Global settings (in that order)
n_object['notification_urls'] = self._check_cascading_vars('notification_urls', watch)
n_object['notification_title'] = self._check_cascading_vars('notification_title', watch)
n_object['notification_body'] = self._check_cascading_vars('notification_body', watch)
n_object['notification_format'] = self._check_cascading_vars('notification_format', watch)
# this change probably not needed?
n_object['notification_urls'] = _check_cascading_vars(self.datastore, 'notification_urls', watch)
n_object['notification_title'] = _check_cascading_vars(self.datastore,'notification_title', watch)
n_object['notification_body'] = _check_cascading_vars(self.datastore,'notification_body', watch)
n_object['notification_format'] = _check_cascading_vars(self.datastore,'notification_format', watch)
# (Individual watch) Only prepare to notify if the rules above matched
queued = False
@@ -269,7 +298,7 @@ Thanks - Your omniscient changedetection.io installation.
n_object = NotificationContextData({
'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
'notification_body': body,
'notification_format': self._check_cascading_vars('notification_format', watch),
'notification_format': _check_cascading_vars(self.datastore, 'notification_format', watch),
})
n_object['markup_text_links_to_html_links'] = n_object.get('notification_format').startswith('html')

View File

@@ -29,16 +29,135 @@ def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False
return re.sub(pattern, repl, html_content)
# Jinja2 template for formatting RSS/Atom feed entries
# Covers all common feedparser entry fields including namespaced elements
# Outputs HTML that will be converted to text via html_to_text
# @todo - This could be a UI setting in the future
RSS_ENTRY_TEMPLATE = """<article class="rss-item" id="{{ entry.id|replace('"', '')|replace(' ', '-') }}">{%- if entry.title -%}Title: {{ entry.title }}<br>{%- endif -%}
{%- if entry.link -%}<strong>Link:</strong> <a href="{{ entry.link }}">{{ entry.link }}</a><br>
{%- endif -%}
{%- if entry.id -%}
<strong>Guid:</strong> {{ entry.id }}<br>
{%- endif -%}
{%- if entry.published -%}
<strong>PubDate:</strong> {{ entry.published }}<br>
{%- endif -%}
{%- if entry.updated and entry.updated != entry.published -%}
<strong>Updated:</strong> {{ entry.updated }}<br>
{%- endif -%}
{%- if entry.author -%}
<strong>Author:</strong> {{ entry.author }}<br>
{%- elif entry.author_detail and entry.author_detail.name -%}
<strong>Author:</strong> {{ entry.author_detail.name }}
{%- if entry.author_detail.email %} ({{ entry.author_detail.email }}){% endif -%}
<br>
{%- endif -%}
{%- if entry.contributors -%}
<strong>Contributors:</strong> {% for contributor in entry.contributors -%}
{{ contributor.name if contributor.name else contributor }}
{%- if not loop.last %}, {% endif -%}
{%- endfor %}<br>
{%- endif -%}
{%- if entry.publisher -%}
<strong>Publisher:</strong> {{ entry.publisher }}<br>
{%- endif -%}
{%- if entry.rights -%}
<strong>Rights:</strong> {{ entry.rights }}<br>
{%- endif -%}
{%- if entry.license -%}
<strong>License:</strong> {{ entry.license }}<br>
{%- endif -%}
{%- if entry.language -%}
<strong>Language:</strong> {{ entry.language }}<br>
{%- endif -%}
{%- if entry.tags -%}
<strong>Tags:</strong> {% for tag in entry.tags -%}
{{ tag.term if tag.term else tag }}
{%- if not loop.last %}, {% endif -%}
{%- endfor %}<br>
{%- endif -%}
{%- if entry.category -%}
<strong>Category:</strong> {{ entry.category }}<br>
{%- endif -%}
{%- if entry.comments -%}
<strong>Comments:</strong> <a href="{{ entry.comments }}">{{ entry.comments }}</a><br>
{%- endif -%}
{%- if entry.slash_comments -%}
<strong>Comment Count:</strong> {{ entry.slash_comments }}<br>
{%- endif -%}
{%- if entry.enclosures -%}
<strong>Enclosures:</strong><br>
{%- for enclosure in entry.enclosures %}
- <a href="{{ enclosure.href }}">{{ enclosure.href }}</a> ({{ enclosure.type if enclosure.type else 'unknown type' }}
{%- if enclosure.length %}, {{ enclosure.length }} bytes{% endif -%}
)<br>
{%- endfor -%}
{%- endif -%}
{%- if entry.media_content -%}
<strong>Media:</strong><br>
{%- for media in entry.media_content %}
- <a href="{{ media.url }}">{{ media.url }}</a>
{%- if media.type %} ({{ media.type }}){% endif -%}
{%- if media.width and media.height %} {{ media.width }}x{{ media.height }}{% endif -%}
<br>
{%- endfor -%}
{%- endif -%}
{%- if entry.media_thumbnail -%}
<strong>Thumbnail:</strong> <a href="{{ entry.media_thumbnail[0].url if entry.media_thumbnail[0].url else entry.media_thumbnail[0] }}">{{ entry.media_thumbnail[0].url if entry.media_thumbnail[0].url else entry.media_thumbnail[0] }}</a><br>
{%- endif -%}
{%- if entry.media_description -%}
<strong>Media Description:</strong> {{ entry.media_description }}<br>
{%- endif -%}
{%- if entry.itunes_duration -%}
<strong>Duration:</strong> {{ entry.itunes_duration }}<br>
{%- endif -%}
{%- if entry.itunes_author -%}
<strong>Podcast Author:</strong> {{ entry.itunes_author }}<br>
{%- endif -%}
{%- if entry.dc_identifier -%}
<strong>Identifier:</strong> {{ entry.dc_identifier }}<br>
{%- endif -%}
{%- if entry.dc_source -%}
<strong>DC Source:</strong> {{ entry.dc_source }}<br>
{%- endif -%}
{%- if entry.dc_type -%}
<strong>Type:</strong> {{ entry.dc_type }}<br>
{%- endif -%}
{%- if entry.dc_format -%}
<strong>Format:</strong> {{ entry.dc_format }}<br>
{%- endif -%}
{%- if entry.dc_relation -%}
<strong>Related:</strong> {{ entry.dc_relation }}<br>
{%- endif -%}
{%- if entry.dc_coverage -%}
<strong>Coverage:</strong> {{ entry.dc_coverage }}<br>
{%- endif -%}
{%- if entry.source and entry.source.title -%}
<strong>Source:</strong> {{ entry.source.title }}
{%- if entry.source.link %} (<a href="{{ entry.source.link }}">{{ entry.source.link }}</a>){% endif -%}
<br>
{%- endif -%}
{%- if entry.dc_content -%}
<strong>Content:</strong> {{ entry.dc_content | safe }}
{%- elif entry.content and entry.content[0].value -%}
<strong>Content:</strong> {{ entry.content[0].value | safe }}
{%- elif entry.summary -%}
<strong>Summary:</strong> {{ entry.summary | safe }}
{%- endif -%}</article>
"""
def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
"""
Format RSS/Atom feed items in a readable text format using feedparser.
Format RSS/Atom feed items in a readable text format using feedparser and Jinja2.
Converts RSS <item> or Atom <entry> elements to formatted text with:
- <title> → <h1>Title</h1>
- <link> → Link: [url]
- <guid> → Guid: [id]
- <pubDate> → PubDate: [date]
- <description> or <content> → Raw HTML content (CDATA and entities automatically handled)
Converts RSS <item> or Atom <entry> elements to formatted text with all available fields:
- Basic fields: title, link, id/guid, published date, updated date
- Author fields: author, author_detail, contributors, publisher
- Content fields: content, summary, description
- Metadata: tags, category, rights, license
- Media: enclosures, media_content, media_thumbnail
- Dublin Core elements: dc:creator, dc:date, dc:publisher, etc. (mapped by feedparser)
Args:
rss_content: The RSS/Atom feed content
@@ -49,65 +168,19 @@ def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
"""
try:
import feedparser
from xml.sax.saxutils import escape as xml_escape
from changedetectionio.jinja2_custom import safe_jinja
# Parse the feed - feedparser handles all RSS/Atom variants, CDATA, entity unescaping, etc.
feed = feedparser.parse(rss_content)
formatted_items = []
# Determine feed type for appropriate labels when fields are missing
# feedparser sets feed.version to things like 'rss20', 'atom10', etc.
# Determine feed type for appropriate labels
is_atom = feed.version and 'atom' in feed.version
formatted_items = []
for entry in feed.entries:
item_parts = []
# Title - feedparser handles CDATA and entity unescaping automatically
if hasattr(entry, 'title') and entry.title:
item_parts.append(f'<h1>{xml_escape(entry.title)}</h1>')
# Link
if hasattr(entry, 'link') and entry.link:
item_parts.append(f'Link: {xml_escape(entry.link)}<br>')
# GUID/ID
if hasattr(entry, 'id') and entry.id:
item_parts.append(f'Guid: {xml_escape(entry.id)}<br>')
# Date - feedparser normalizes all date field names to 'published'
if hasattr(entry, 'published') and entry.published:
item_parts.append(f'PubDate: {xml_escape(entry.published)}<br>')
# Description/Content - feedparser handles CDATA and entity unescaping automatically
# Only add "Summary:" label for Atom <summary> tags
content = None
add_label = False
if hasattr(entry, 'content') and entry.content:
# Atom <content> - no label, just content
content = entry.content[0].value if entry.content[0].value else None
elif hasattr(entry, 'summary'):
# Could be RSS <description> or Atom <summary>
# feedparser maps both to entry.summary
content = entry.summary if entry.summary else None
# Only add "Summary:" label for Atom feeds (which use <summary> tag)
if is_atom:
add_label = True
# Add content with or without label
if content:
if add_label:
item_parts.append(f'Summary:<br>{content}')
else:
item_parts.append(content)
else:
# No content - just show <none>
item_parts.append('&lt;none&gt;')
# Join all parts of this item
if item_parts:
formatted_items.append('\n'.join(item_parts))
# Render the entry using Jinja2 template
rendered = safe_jinja.render(RSS_ENTRY_TEMPLATE, entry=entry, is_atom=is_atom)
formatted_items.append(rendered.strip())
# Wrap each item in a div with classes (first, last, item-N)
items_html = []
@@ -122,7 +195,8 @@ def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
class_str = ' '.join(classes)
items_html.append(f'<div class="{class_str}">{item}</div>')
return '<html><body>\n'+"\n<br><br>".join(items_html)+'\n</body></html>'
return '<html><body>\n' + "\n<br>".join(items_html) + '\n</body></html>'
except Exception as e:
logger.warning(f"Error formatting RSS items: {str(e)}")

View File

@@ -11,6 +11,56 @@ set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
# Since theres no curl installed lets roll with python3
check_sanity() {
local port="$1"
if [ -z "$port" ]; then
echo "Usage: check_sanity <port>" >&2
return 1
fi
python3 - "$port" <<'PYCODE'
import sys, time, urllib.request, socket
port = sys.argv[1]
url = f'http://localhost:{port}'
ok = False
for _ in range(6): # --retry 6
try:
r = urllib.request.urlopen(url, timeout=3).read().decode()
if 'est-url-is-sanity' in r:
ok = True
break
except (urllib.error.URLError, ConnectionRefusedError, socket.error):
time.sleep(1)
sys.exit(0 if ok else 1)
PYCODE
}
data_sanity_test () {
# Restart data sanity test
cd ..
TMPDIR=$(mktemp -d)
PORT_N=$((5000 + RANDOM % (6501 - 5000)))
./changedetection.py -p $PORT_N -d $TMPDIR -u "https://localhost?test-url-is-sanity=1" &
PID=$!
sleep 5
kill $PID
sleep 2
./changedetection.py -p $PORT_N -d $TMPDIR &
PID=$!
sleep 5
# On a restart the URL should still be there
check_sanity $PORT_N || exit 1
kill $PID
cd $OLDPWD
# datastore looks alright, continue
}
data_sanity_test
# REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -n 30 --dist load tests/test_*.py
@@ -20,7 +70,7 @@ echo "RUNNING WITH BASE_URL SET"
# Now re-run some tests with BASE_URL enabled
# Re #65 - Ability to include a link back to the installation, in the notification.
export BASE_URL="https://really-unique-domain.io"
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv --maxfail=1 tests/test_notification.py
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 tests/test_notification.py
# Re-run with HIDE_REFERER set - could affect login
@@ -40,4 +90,7 @@ FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
# Check file:// will pickup a file when enabled
echo "Hello world" > /tmp/test-file.txt
ALLOW_FILE_URI=yes pytest tests/test_security.py
ALLOW_FILE_URI=yes pytest -vv -s tests/test_security.py

View File

@@ -6,6 +6,7 @@ from flask import (
flash
)
from .blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
from .html_tools import TRANSLATE_WHITESPACE_TABLE
from .model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
from copy import deepcopy, copy
@@ -44,7 +45,7 @@ class ChangeDetectionStore:
lock = Lock()
# For general updates/writes that can wait a few seconds
needs_write = False
datastore_path = None
# For when we edit, we should write to disk
needs_write_urgent = False
@@ -54,18 +55,30 @@ class ChangeDetectionStore:
def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
# Should only be active for docker
# logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
self.datastore_path = datastore_path
self.needs_write = False
self.start_time = time.time()
self.stop_thread = False
self.save_version_copy_json_db(version_tag)
self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag)
def save_version_copy_json_db(self, version_tag):
import re
version_text = re.sub(r'\D+', '-', version_tag)
db_path = os.path.join(self.datastore_path, "url-watches.json")
db_path_version_backup = os.path.join(self.datastore_path, f"url-watches-{version_text}.json")
if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path):
from shutil import copyfile
logger.info(f"Backing up JSON DB due to new version to '{db_path_version_backup}'.")
copyfile(db_path, db_path_version_backup)
def reload_state(self, datastore_path, include_default_watches, version_tag):
logger.info(f"Datastore path is '{datastore_path}'")
self.__data = App.model()
self.datastore_path = datastore_path
self.json_store_path = os.path.join(self.datastore_path, "url-watches.json")
# Base definition for all watchers
# deepcopy part of #569 - not sure why its needed exactly
@@ -86,33 +99,38 @@ class ChangeDetectionStore:
with open(self.json_store_path, encoding='utf-8') as json_file:
from_disk = json.load(json_file)
# @todo isnt there a way todo this dict.update recursively?
# Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
if 'watching' in from_disk:
self.__data['watching'].update(from_disk['watching'])
if not from_disk:
# No FileNotFound exception was thrown but somehow the JSON was empty - abort for safety.
logger.critical(f"JSON DB existed but was empty on load - empty JSON file? '{self.json_store_path}' Aborting")
raise Exception('JSON DB existed but was empty on load - Aborting')
if 'app_guid' in from_disk:
self.__data['app_guid'] = from_disk['app_guid']
# @todo isnt there a way todo this dict.update recursively?
# Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
if 'watching' in from_disk:
self.__data['watching'].update(from_disk['watching'])
if 'settings' in from_disk:
if 'headers' in from_disk['settings']:
self.__data['settings']['headers'].update(from_disk['settings']['headers'])
if 'app_guid' in from_disk:
self.__data['app_guid'] = from_disk['app_guid']
if 'requests' in from_disk['settings']:
self.__data['settings']['requests'].update(from_disk['settings']['requests'])
if 'settings' in from_disk:
if 'headers' in from_disk['settings']:
self.__data['settings']['headers'].update(from_disk['settings']['headers'])
if 'application' in from_disk['settings']:
self.__data['settings']['application'].update(from_disk['settings']['application'])
if 'requests' in from_disk['settings']:
self.__data['settings']['requests'].update(from_disk['settings']['requests'])
# Convert each existing watch back to the Watch.model object
for uuid, watch in self.__data['watching'].items():
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
logger.info(f"Watching: {uuid} {watch['url']}")
if 'application' in from_disk['settings']:
self.__data['settings']['application'].update(from_disk['settings']['application'])
# And for Tags also, should be Restock type because it has extra settings
for uuid, tag in self.__data['settings']['application']['tags'].items():
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
logger.info(f"Tag: {uuid} {tag['title']}")
# Convert each existing watch back to the Watch.model object
for uuid, watch in self.__data['watching'].items():
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
logger.info(f"Watching: {uuid} {watch['url']}")
# And for Tags also, should be Restock type because it has extra settings
for uuid, tag in self.__data['settings']['application']['tags'].items():
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
logger.info(f"Tag: {uuid} {tag['title']}")
# First time ran, Create the datastore.
except (FileNotFoundError):
@@ -758,6 +776,28 @@ class ChangeDetectionStore:
return updates_available
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
notification_urls = self.data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
with self.lock:
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
# Append and update the datastore
notification_urls.append(notification_url)
self.__data['settings']['application']['notification_urls'] = notification_urls
self.needs_write = True
return notification_url
# Run all updates
# IMPORTANT - Each update could be run even when they have a new install and the schema is correct
# So therefor - each `update_n` should be very careful about checking if it needs to actually run
@@ -770,7 +810,16 @@ class ChangeDetectionStore:
logger.critical(f"Applying update_{update_n}")
# Wont exist on fresh installs
if os.path.exists(self.json_store_path):
shutil.copyfile(self.json_store_path, os.path.join(self.datastore_path, f"url-watches-before-{update_n}.json"))
i = 0
while True:
i+=1
dest = os.path.join(self.datastore_path, f"url-watches-before-{update_n}-{i}.json")
if not os.path.exists(dest):
logger.debug(f"Copying url-watches.json DB to '{dest}' backup.")
shutil.copyfile(self.json_store_path, dest)
break
else:
logger.warning(f"Backup of url-watches.json '{dest}', DB already exists, trying {i+1}.. ")
try:
update_method = getattr(self, f"update_{update_n}")()
@@ -1061,25 +1110,15 @@ class ChangeDetectionStore:
formats['markdown'] = 'Markdown'
re_run(formats)
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
notification_urls = self.data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
with self.lock:
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
# Append and update the datastore
notification_urls.append(notification_url)
self.__data['settings']['application']['notification_urls'] = notification_urls
self.needs_write = True
return notification_url
# RSS types should be inline with the same names as notification types
def update_24(self):
rss_format = self.data['settings']['application'].get('rss_content_format')
if not rss_format or 'text' in rss_format:
# might have been 'plaintext, 'plain text' or something
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
elif 'html' in rss_format:
self.data['settings']['application']['rss_content_format'] = 'htmlcolor'
else:
# safe fallback to text
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT

View File

@@ -1,6 +1,118 @@
{% from '_helpers.html' import render_field %}
{% macro show_token_placeholders(extra_notification_token_placeholder_info, suffix="") %}
<div class="pure-controls">
<span class="pure-form-message-inline">
Body for all notifications &dash; You can use <a target="newwindow" href="https://jinja.palletsprojects.com/en/3.0.x/templates/">Jinja2</a> templating in the notification title, body and URL, and tokens from below.
</span><br>
<div data-target="#notification-tokens-info{{ suffix }}" class="toggle-show pure-button button-tag button-xsmall">Show
token/placeholders
</div>
</div>
<div class="pure-controls" style="display: none;" id="notification-tokens-info{{ suffix }}">
<table class="pure-table" id="token-table">
<thead>
<tr>
<th>Token</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>{{ '{{base_url}}' }}</code></td>
<td>The URL of the changedetection.io instance you are running.</td>
</tr>
<tr>
<td><code>{{ '{{watch_url}}' }}</code></td>
<td>The URL being watched.</td>
</tr>
<tr>
<td><code>{{ '{{watch_uuid}}' }}</code></td>
<td>The UUID of the watch.</td>
</tr>
<tr>
<td><code>{{ '{{watch_title}}' }}</code></td>
<td>The page title of the watch, uses &lt;title&gt; if not set, falls back to URL</td>
</tr>
<tr>
<td><code>{{ '{{watch_tag}}' }}</code></td>
<td>The watch group / tag</td>
</tr>
<tr>
<td><code>{{ '{{preview_url}}' }}</code></td>
<td>The URL of the preview page generated by changedetection.io.</td>
</tr>
<tr>
<td><code>{{ '{{diff_url}}' }}</code></td>
<td>The URL of the diff output for the watch.</td>
</tr>
<tr>
<td><code>{{ '{{diff}}' }}</code></td>
<td>The diff output - only changes, additions, and removals</td>
</tr>
<tr>
<td><code>{{ '{{diff_clean}}' }}</code></td>
<td>The diff output - only changes, additions, and removals &dash; <i>Without (added) prefix or colors</i>
</td>
</tr>
<tr>
<td><code>{{ '{{diff_added}}' }}</code></td>
<td>The diff output - only changes and additions</td>
</tr>
<tr>
<td><code>{{ '{{diff_added_clean}}' }}</code></td>
<td>The diff output - only changes and additions &dash; <i>Without (added) prefix or colors</i></td>
</tr>
<tr>
<td><code>{{ '{{diff_removed}}' }}</code></td>
<td>The diff output - only changes and removals</td>
</tr>
<tr>
<td><code>{{ '{{diff_removed_clean}}' }}</code></td>
<td>The diff output - only changes and removals &dash; <i>Without (added) prefix or colors</i></td>
</tr>
<tr>
<td><code>{{ '{{diff_full}}' }}</code></td>
<td>The diff output - full difference output</td>
</tr>
<tr>
<td><code>{{ '{{diff_full_clean}}' }}</code></td>
<td>The diff output - full difference output &dash; <i>Without (added) prefix or colors</i></td>
</tr>
<tr>
<td><code>{{ '{{diff_patch}}' }}</code></td>
<td>The diff output - patch in unified format</td>
</tr>
<tr>
<td><code>{{ '{{current_snapshot}}' }}</code></td>
<td>The current snapshot text contents value, useful when combined with JSON or CSS filters
</td>
</tr>
<tr>
<td><code>{{ '{{triggered_text}}' }}</code></td>
<td>Text that tripped the trigger from filters</td>
{% if extra_notification_token_placeholder_info %}
{% for token in extra_notification_token_placeholder_info %}
<tr>
<td><code>{{ '{{' }}{{ token[0] }}{{ '}}' }}</code></td>
<td>{{ token[1] }}</td>
</tr>
{% endfor %}
{% endif %}
</tbody>
</table>
<span class="pure-form-message-inline">
Warning: Contents of <code>{{ '{{diff}}' }}</code>, <code>{{ '{{diff_removed}}' }}</code>, and <code>{{ '{{diff_added}}' }}</code> depend on how the difference algorithm perceives the change. <br>
For example, an addition or removal could be perceived as a change in some cases. <a target="newwindow" href="https://github.com/dgtlmoon/changedetection.io/wiki/Using-the-%7B%7Bdiff%7D%7D,-%7B%7Bdiff_added%7D%7D,-and-%7B%7Bdiff_removed%7D%7D-notification-tokens">More Here</a> <br>
</span>
</div>
{% endmacro %}
{% macro render_common_settings_form(form, emailprefix, settings_application, extra_notification_token_placeholder_info) %}
<div class="pure-control-group">
{{ render_field(form.notification_urls, rows=5, placeholder="Examples:
@@ -40,125 +152,25 @@
</div>
<div class="pure-control-group">
{{ render_field(form.notification_body , rows=5, class="notification-body", placeholder=settings_application['notification_body']) }}
<span class="pure-form-message-inline">Body for all notifications &dash; You can use <a target="newwindow" href="https://jinja.palletsprojects.com/en/3.0.x/templates/">Jinja2</a> templating in the notification title, body and URL, and tokens from below.
</span>
</div>
<div class="pure-controls">
<div data-target="#notification-tokens-info" class="toggle-show pure-button button-tag button-xsmall">Show token/placeholders</div>
</div>
<div class="pure-controls" style="display: none;" id="notification-tokens-info">
<table class="pure-table" id="token-table">
<thead>
<tr>
<th>Token</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>{{ '{{base_url}}' }}</code></td>
<td>The URL of the changedetection.io instance you are running.</td>
</tr>
<tr>
<td><code>{{ '{{watch_url}}' }}</code></td>
<td>The URL being watched.</td>
</tr>
<tr>
<td><code>{{ '{{watch_uuid}}' }}</code></td>
<td>The UUID of the watch.</td>
</tr>
<tr>
<td><code>{{ '{{watch_title}}' }}</code></td>
<td>The page title of the watch, uses &lt;title&gt; if not set, falls back to URL</td>
</tr>
<tr>
<td><code>{{ '{{watch_tag}}' }}</code></td>
<td>The watch group / tag</td>
</tr>
<tr>
<td><code>{{ '{{preview_url}}' }}</code></td>
<td>The URL of the preview page generated by changedetection.io.</td>
</tr>
<tr>
<td><code>{{ '{{diff_url}}' }}</code></td>
<td>The URL of the diff output for the watch.</td>
</tr>
<tr>
<td><code>{{ '{{diff}}' }}</code></td>
<td>The diff output - only changes, additions, and removals</td>
</tr>
<tr>
<td><code>{{ '{{diff_clean}}' }}</code></td>
<td>The diff output - only changes, additions, and removals &dash; <i>Without (added) prefix or colors</i></td>
</tr>
<tr>
<td><code>{{ '{{diff_added}}' }}</code></td>
<td>The diff output - only changes and additions</td>
</tr>
<tr>
<td><code>{{ '{{diff_added_clean}}' }}</code></td>
<td>The diff output - only changes and additions &dash; <i>Without (added) prefix or colors</i></td>
</tr>
<tr>
<td><code>{{ '{{diff_removed}}' }}</code></td>
<td>The diff output - only changes and removals</td>
</tr>
<tr>
<td><code>{{ '{{diff_removed_clean}}' }}</code></td>
<td>The diff output - only changes and removals &dash; <i>Without (added) prefix or colors</i></td>
</tr>
<tr>
<td><code>{{ '{{diff_full}}' }}</code></td>
<td>The diff output - full difference output</td>
</tr>
<tr>
<td><code>{{ '{{diff_full_clean}}' }}</code></td>
<td>The diff output - full difference output &dash; <i>Without (added) prefix or colors</i></td>
</tr>
<tr>
<td><code>{{ '{{diff_patch}}' }}</code></td>
<td>The diff output - patch in unified format</td>
</tr>
<tr>
<td><code>{{ '{{current_snapshot}}' }}</code></td>
<td>The current snapshot text contents value, useful when combined with JSON or CSS filters
</td>
</tr>
<tr>
<td><code>{{ '{{triggered_text}}' }}</code></td>
<td>Text that tripped the trigger from filters</td>
{% if extra_notification_token_placeholder_info %}
{% for token in extra_notification_token_placeholder_info %}
<tr>
<td><code>{{ '{{' }}{{ token[0] }}{{ '}}' }}</code></td>
<td>{{ token[1] }}</td>
</tr>
{% endfor %}
{% endif %}
</tbody>
</table>
{{ show_token_placeholders(extra_notification_token_placeholder_info=extra_notification_token_placeholder_info) }}
<div class="pure-form-message-inline">
<p>
Warning: Contents of <code>{{ '{{diff}}' }}</code>, <code>{{ '{{diff_removed}}' }}</code>, and <code>{{ '{{diff_added}}' }}</code> depend on how the difference algorithm perceives the change. <br>
For example, an addition or removal could be perceived as a change in some cases. <a target="newwindow" href="https://github.com/dgtlmoon/changedetection.io/wiki/Using-the-%7B%7Bdiff%7D%7D,-%7B%7Bdiff_added%7D%7D,-and-%7B%7Bdiff_removed%7D%7D-notification-tokens">More Here</a> <br>
</p>
<p>
<ul>
<li><span class="pure-form-message-inline">
For JSON payloads, use <strong>|tojson</strong> without quotes for automatic escaping, for example - <code>{ "name": {{ '{{ watch_title|tojson }}' }} }</code>
</p>
<p>
</span></li>
<li><span class="pure-form-message-inline">
URL encoding, use <strong>|urlencode</strong>, for example - <code>gets://hook-website.com/test.php?title={{ '{{ watch_title|urlencode }}' }}</code>
</p>
<p>
</span></li>
<li><span class="pure-form-message-inline">
Regular-expression replace, use <strong>|regex_replace</strong>, for example - <code>{{ "{{ \"hello world 123\" | regex_replace('[0-9]+', 'no-more-numbers') }}" }}</code>
</p>
<p>
</span></li>
<li><span class="pure-form-message-inline">
For a complete reference of all Jinja2 built-in filters, users can refer to the <a href="https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters">https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters</a>
</p>
</div>
</span></li>
</ul>
<br>
</div>
<div class="pure-control-group">
<div class="">
{{ render_field(form.notification_format , class="notification-format") }}
<span class="pure-form-message-inline">Format for all notifications</span>
</div>

View File

@@ -8,8 +8,13 @@
<meta name="robots" content="noindex">
<title>Change Detection{{extra_title}}</title>
{% if app_rss_token %}
<link rel="alternate" type="application/rss+xml" title="Changedetection.io » Feed{% if active_tag_uuid %}- {{active_tag.title}}{% endif %}" href="{{ url_for('rss.feed', tag=active_tag_uuid , token=app_rss_token)}}" >
{% endif %}
<link rel="alternate" type="application/rss+xml" title="Changedetection.io » Feed{% if active_tag_uuid %}- {{active_tag.title}}{% endif %}" href="{{ url_for('rss.feed', tag=active_tag_uuid, token=app_rss_token, _external=True )}}" >
{% if rss_uuid_feed %}
<link rel="alternate" type="application/rss+xml" title="Feed » {{ rss_uuid_feed['label'] }}" href="{{ rss_uuid_feed['url'] }}" >
{%- endif -%}
{%- endif -%}
<link rel="stylesheet" href="{{url_for('static_content', group='styles', filename='pure-min.css')}}" >
<link rel="stylesheet" href="{{url_for('static_content', group='styles', filename='styles.css')}}?v={{ get_css_version() }}" >
{% if extra_stylesheets %}

View File

@@ -77,10 +77,9 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
assert b'<rss' in res.data
# re #16 should have the diff in here too
assert b'(into) which has this one new line' in res.data
assert b'which has this one new line' in res.data
assert b'CDATA' in res.data
assert expected_url.encode('utf-8') in res.data
#
# Following the 'diff' link, it should no longer display as 'has-unread-changes' even after we recheck it a few times
res = client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))

View File

@@ -253,43 +253,6 @@ def test_check_notification(client, live_server, measure_memory_usage, datastore
follow_redirects=True
)
def test_notification_validation(client, live_server, measure_memory_usage, datastore_path):
time.sleep(1)
# re #242 - when you edited an existing new entry, it would not correctly show the notification settings
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": 'nice one'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Re #360 some validation
# res = client.post(
# url_for("ui.ui_edit.edit_page", uuid="first"),
# data={"notification_urls": 'json://localhost/foobar',
# "notification_title": "",
# "notification_body": "",
# "notification_format": 'text',
# "url": test_url,
# "tag": "my tag",
# "title": "my title",
# "headers": "",
# "fetch_backend": "html_requests"},
# follow_redirects=True
# )
# assert b"Notification Body and Title is required when a Notification URL is used" in res.data
# cleanup for the next
client.get(
url_for("ui.form_delete", uuid="all"),
follow_redirects=True
)
def test_notification_urls_jinja2_apprise_integration(client, live_server, measure_memory_usage, datastore_path):
@@ -532,6 +495,7 @@ def test_single_send_test_notification_on_watch(client, live_server, measure_mem
test_url = url_for('test_endpoint', _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://')+"?xxx={{ watch_url }}&+custom-header=123"
# 1995 UTF-8 content should be encoded

View File

@@ -329,6 +329,7 @@ def test_change_with_notification_values(client, live_server, measure_memory_usa
# Should see new tokens register
res = client.get(url_for("settings.settings_page"))
assert b'{{restock.original_price}}' in res.data
assert b'Original price at first check' in res.data

View File

@@ -4,6 +4,8 @@ import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches
from loguru import logger
from ..blueprint.rss import RSS_FORMAT_TYPES
def set_original_cdata_xml(datastore_path):
@@ -65,8 +67,17 @@ def set_html_content(datastore_path, content):
with open(os.path.join(datastore_path, "endpoint-content.txt"), "wb") as f:
f.write(test_return_data.encode('utf-8'))
# def test_setup(client, live_server, measure_memory_usage, datastore_path):
# live_server_setup(live_server) # Setup on conftest per function
def test_rss_feed_empty(client, live_server, measure_memory_usage, datastore_path):
rss_token = extract_rss_token_from_UI(client)
res = client.get(
url_for("rss.feed", token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
assert b'xml' in res.data
def test_rss_and_token(client, live_server, measure_memory_usage, datastore_path):
# # live_server_setup(live_server) # Setup on conftest per function
@@ -74,18 +85,10 @@ def test_rss_and_token(client, live_server, measure_memory_usage, datastore_path
set_original_response(datastore_path=datastore_path)
rss_token = extract_rss_token_from_UI(client)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": url_for('test_random_content_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_random_content_endpoint', _external=True))
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response(datastore_path=datastore_path)
time.sleep(1)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -238,6 +241,114 @@ def test_rss_bad_chars_breaking(client, live_server, measure_memory_usage, datas
#assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n == 2
def test_rss_single_watch_feed(client, live_server, measure_memory_usage, datastore_path):
app_rss_token = live_server.app.config['DATASTORE'].data['settings']['application'].get('rss_access_token')
rss_content_format = live_server.app.config['DATASTORE'].data['settings']['application'].get('rss_content_format')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 400
assert b'not have enough history' in res.data
set_modified_response(datastore_path=datastore_path)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 200
import xml.etree.ElementTree as ET
root = ET.fromstring(res.data)
def check_formatting(expected_type, content, url):
logger.debug(f"Checking formatting type {expected_type}")
if expected_type == 'text':
assert '<p>' not in content
assert 'body' not in content
assert '(changed) Which is across multiple lines\n'
assert 'modified head title had a change.' # Because it picked it up <title> as watch_title in default template
elif expected_type == 'html':
assert '<p>' in content
assert '<body>' in content
assert '<p>(changed) Which is across multiple lines<br>' in content
assert f'href="{url}">modified head title had a change.</a>'
elif expected_type == 'htmlcolor':
assert '<body>' in content
assert ' role="note" aria-label="Changed text" title="Changed text">Which is across multiple lines</span>' in content
assert f'href="{url}">modified head title had a change.</a>'
else:
raise Exception(f"Unknown type {expected_type}")
item = root.findall('.//item')[0].findtext('description')
check_formatting(expected_type=rss_content_format, content=item, url=test_url)
# Now the default one is over, lets try all the others
for k in list(RSS_FORMAT_TYPES.keys()):
res = client.post(
url_for("settings.settings_page"),
data={"application-rss_content_format": k},
follow_redirects=True
)
assert b'Settings updated' in res.data
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 200
root = ET.fromstring(res.data)
item = root.findall('.//item')[0].findtext('description')
check_formatting(expected_type=k, content=item, url=test_url)
# Test RSS entry order: Create multiple versions and verify newest appears first
for version in range(3, 6): # Create versions 3, 4, 5
set_html_content(datastore_path, f"Version {version} content")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(0.5) # Small delay to ensure different timestamps
# Fetch RSS feed again to verify order
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 200
# Parse RSS and check order (newest first)
root = ET.fromstring(res.data)
items = root.findall('.//item')
assert len(items) >= 3, f"Expected at least 3 items, got {len(items)}"
# Get descriptions from first 3 items
descriptions = []
for item in items[:3]:
desc = item.findtext('description')
descriptions.append(desc if desc else "")
# First item should contain newest change (Version 5)
assert b"Version 5" in descriptions[0].encode() or "Version 5" in descriptions[0], \
f"First item should show newest change (Version 5), but got: {descriptions[0][:200]}"
# Second item should contain Version 4
assert b"Version 4" in descriptions[1].encode() or "Version 4" in descriptions[1], \
f"Second item should show Version 4, but got: {descriptions[1][:200]}"
# Third item should contain Version 3
assert b"Version 3" in descriptions[2].encode() or "Version 3" in descriptions[2], \
f"Third item should show Version 3, but got: {descriptions[2][:200]}"

View File

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
import os
def set_original_response(datastore_path):
test_return_data = """<html>
<body>
Some initial text<br>
<p>Watch 1 content</p>
<p>Watch 2 content</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def set_modified_response(datastore_path):
test_return_data = """<html>
<body>
Some initial text<br>
<p>Watch 1 content MODIFIED</p>
<p>Watch 2 content CHANGED</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a specific tag/group shows only watches in that group
and displays changes correctly.
"""
set_original_response(datastore_path=datastore_path)
# Create a tag/group
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "test-rss-group"},
follow_redirects=True
)
assert b"Tag added" in res.data
assert b"test-rss-group" in res.data
# Get the tag UUID
tag_uuid = get_UUID_for_tag_name(client, name="test-rss-group")
assert tag_uuid is not None
# Add first watch with the tag
test_url_1 = url_for('test_endpoint', _external=True) + "?watch=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_1, "tags": 'test-rss-group'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Add second watch with the tag
test_url_2 = url_for('test_endpoint', _external=True) + "?watch=2"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_2, "tags": 'test-rss-group'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Add a third watch WITHOUT the tag (should not appear in RSS)
test_url_3 = url_for('test_endpoint', _external=True) + "?watch=3"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_3, "tags": 'other-tag'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Wait for initial checks to complete
wait_for_all_checks(client)
# Trigger a change
set_modified_response(datastore_path=datastore_path)
# Recheck all watches
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
assert rss_token is not None
# Request RSS feed for the specific tag/group using the new endpoint
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Verify response is successful
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Verify the RSS feed contains the tag name in the title
assert b"test-rss-group" in res.data
# Verify watch 1 and watch 2 are in the RSS feed (they have the tag)
assert b"watch=1" in res.data
assert b"watch=2" in res.data
# Verify watch 3 is NOT in the RSS feed (it doesn't have the tag)
assert b"watch=3" not in res.data
# Verify the changes are shown in the RSS feed
assert b"MODIFIED" in res.data or b"CHANGED" in res.data
# Verify it's actual RSS/XML format
assert b"<rss" in res.data or b"<feed" in res.data
# Test with invalid tag UUID - should return 404
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid="invalid-uuid-12345", token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 404
assert b"not found" in res.data
# Test with invalid token - should return 403
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token="wrong-token", _external=True),
follow_redirects=True
)
assert res.status_code == 403
assert b"Access denied" in res.data
# Clean up
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
def test_rss_group_empty_tag(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a tag with no watches returns valid but empty RSS.
"""
# Create a tag with no watches
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "empty-tag"},
follow_redirects=True
)
assert b"Tag added" in res.data
tag_uuid = get_UUID_for_tag_name(client, name="empty-tag")
assert tag_uuid is not None
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed for empty tag
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Should still return 200 with valid RSS
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
assert b"empty-tag" in res.data
# Clean up
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a tag only shows unviewed watches.
"""
set_original_response(datastore_path=datastore_path)
# Create a tag
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "unviewed-test"},
follow_redirects=True
)
assert b"Tag added" in res.data
tag_uuid = get_UUID_for_tag_name(client, name="unviewed-test")
# Add two watches with the tag
test_url_1 = url_for('test_endpoint', _external=True) + "?unviewed=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_1, "tags": 'unviewed-test'},
follow_redirects=True
)
assert b"Watch added" in res.data
test_url_2 = url_for('test_endpoint', _external=True) + "?unviewed=2"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_2, "tags": 'unviewed-test'},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
# Trigger changes
set_modified_response(datastore_path=datastore_path)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed - should show both watches (both unviewed)
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
assert b"unviewed=1" in res.data
assert b"unviewed=2" in res.data
# Mark all as viewed
res = client.get(url_for('ui.mark_all_viewed'), follow_redirects=True)
wait_for_all_checks(client)
# Request RSS feed again - should be empty now (no unviewed watches)
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
# Should not contain the watch URLs anymore since they're viewed
assert b"unviewed=1" not in res.data
assert b"unviewed=2" not in res.data
# Clean up
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data

View File

@@ -7,6 +7,61 @@ from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches
def set_xmlns_purl_content(datastore_path, extra=""):
data=f"""<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="https://purl.org/dc/elements/1.1/" xmlns:media="http://search.yahoo.com/mrss/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<atom:link href="https://www.xxxxxxxtechxxxxx.com/feeds.xml" rel="self" type="application/rss+xml"/>
<title>
<![CDATA[ Latest from xxxxxxxtechxxxxx ]]>
</title>
<link>https://www.xxxxx.com</link>
<description>
<![CDATA[ All the latest content from the xxxxxxxtechxxxxx team ]]>
</description>
<lastBuildDate>Wed, 19 Nov 2025 15:00:00 +0000</lastBuildDate>
<language>en</language>
<item>
<title>
<![CDATA[ Sony Xperia 1 VII review: has Sonys long-standing Xperia family lost what it takes to compete? ]]>
</title>
<dc:content>
<![CDATA[ {{extra}} a little harder, dc-content. blue often quite tough and purple usually very difficult.</p><p>On the plus side, you don't technically need to solve the final one, as you'll be able to answer that one by a process of elimination. What's more, you can make up to four mistakes, which gives you a little bit of breathing room.</p><p>It's a little more involved than something like Wordle, however, and there are plenty of opportunities for the game to trip you up with tricks. For instance, watch out for homophones and other word games that could disguise the answers.</p><p>It's playable for free via the <a href="https://www.nytimes.com/games/strands" target="_blank">NYT Games site</a> on desktop or mobile.</p></article></section> ]]>
</dc:content>
<link>https://www.xxxxxxx.com/gaming/nyt-connections-today-answers-hints-20-november-2025</link>
<description>
<![CDATA[ Looking for NYT Connections answers and hints? Here's all you need to know to solve today's game, plus my commentary on the puzzles. ]]>
</description>
<guid isPermaLink="false">N2C2T6DztpWdxSdKpSUx89</guid>
<enclosure url="https://cdn.mos.cms.futurecdn.net/RCGfdf3yhQ9W3MHbTRT6yk-1280-80.jpg" type="image/jpeg" length="0"/>
<pubDate>Wed, 19 Nov 2025 15:00:00 +0000</pubDate>
<category>
<![CDATA[ Gaming ]]>
</category>
<dc:creator>
<![CDATA[ Johnny Dee ]]>
</dc:creator>
<media:content type="image/jpeg" url="https://cdn.mos.cms.futurecdn.net/RCGfdf3yhQ9W3MHbTRT6yk-1280-80.jpg">
<media:credit>
<![CDATA[ New York Times ]]>
</media:credit>
<media:text>
<![CDATA[ NYT Connections homescreen on a phone, on a purple background ]]>
</media:text>
<media:title type="plain">
<![CDATA[ NYT Connections homescreen on a phone, on a purple background ]]>
</media:title>
</media:content>
<media:thumbnail url="https://cdn.mos.cms.futurecdn.net/RCGfdf3yhQ9W3MHbTRT6yk-1280-80.jpg"/>
</item>
</channel>
</rss>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(data)
def set_original_cdata_xml(datastore_path):
test_return_data = """<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
@@ -98,3 +153,26 @@ def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_us
assert 'The days of Terminator and The Matrix' in snapshot_contents
delete_all_watches(client)
def test_xmlns_purl_content(client, live_server, measure_memory_usage, datastore_path):
set_xmlns_purl_content(datastore_path=datastore_path)
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
#test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
# Because NO utf-8 was specified here, we should be able to recover it in requests or other somehow.
test_url = url_for('test_endpoint', content_type="text/xml;", _external=True)
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'include_filters': [".last"]})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
assert "Title: Sony Xperia 1 VII review: has Sonys long-standing Xperia family lost what it takes to compete?" in snapshot_contents
assert "dc-content" in snapshot_contents

View File

@@ -0,0 +1,347 @@
#!/usr/bin/env python3
import time
import os
import xml.etree.ElementTree as ET
from flask import url_for
from .restock.test_restock import set_original_response
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, extract_UUID_from_client, delete_all_watches, set_modified_response
from ..notification import default_notification_format
# Watch with no change should not break the output
def test_rss_feed_empty(client, live_server, measure_memory_usage, datastore_path):
set_original_response(datastore_path=datastore_path)
rss_token = extract_rss_token_from_UI(client)
test_url = url_for('test_endpoint', _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
# Request RSS feed for the single watch
res = client.get(
url_for("rss.rss_single_watch", uuid=uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 400
assert b'does not have enough history snapshots to show' in res.data
def test_rss_single_watch_order(client, live_server, measure_memory_usage, datastore_path):
"""
Test that single watch RSS feed shows changes in correct order (newest first).
"""
# Create initial content
def set_response(datastore_path, version):
test_return_data = f"""<html>
<body>
<p>Version {version} content</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
# Start with version 1
set_response(datastore_path, 1)
# Add a watch
test_url = url_for('test_endpoint', _external=True) + "?order_test=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": 'test-tag'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Get the watch UUID
watch_uuid = extract_UUID_from_client(client)
# Wait for initial check
wait_for_all_checks(client)
# Create multiple versions by triggering changes
for version in range(2, 6): # Create versions 2, 3, 4, 5
set_response(datastore_path, version)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(0.5) # Small delay to ensure different timestamps
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed for the single watch
res = client.get(
url_for("rss.rss_single_watch", uuid=watch_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Should return valid RSS
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Parse the RSS/XML
root = ET.fromstring(res.data)
# Find all items (RSS 2.0) or entries (Atom)
items = root.findall('.//item')
if not items:
items = root.findall('.//{http://www.w3.org/2005/Atom}entry')
# Should have multiple items
assert len(items) >= 3, f"Expected at least 3 items, got {len(items)}"
# Get the descriptions/content from first 3 items
descriptions = []
for item in items[:3]:
# Try RSS format first
desc = item.findtext('description')
if not desc:
# Try Atom format
content_elem = item.find('{http://www.w3.org/2005/Atom}content')
if content_elem is not None:
desc = content_elem.text
descriptions.append(desc if desc else "")
print(f"First item content: {descriptions[0][:100] if descriptions[0] else 'None'}")
print(f"Second item content: {descriptions[1][:100] if descriptions[1] else 'None'}")
print(f"Third item content: {descriptions[2][:100] if descriptions[2] else 'None'}")
# The FIRST item should contain the NEWEST change (Version 5)
# The SECOND item should contain Version 4
# The THIRD item should contain Version 3
assert b"Version 5" in descriptions[0].encode() or "Version 5" in descriptions[0], \
f"First item should show newest change (Version 5), but got: {descriptions[0][:200]}"
# Verify the order is correct
assert b"Version 4" in descriptions[1].encode() or "Version 4" in descriptions[1], \
f"Second item should show Version 4, but got: {descriptions[1][:200]}"
assert b"Version 3" in descriptions[2].encode() or "Version 3" in descriptions[2], \
f"Third item should show Version 3, but got: {descriptions[2][:200]}"
# Clean up
delete_all_watches(client)
def test_rss_categories_from_tags(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feeds include category tags from watch tags.
"""
# Create initial content
test_return_data = """<html>
<body>
<p>Test content for RSS categories</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
# Create some tags first
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Security"},
follow_redirects=True
)
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Python"},
follow_redirects=True
)
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Tech News"},
follow_redirects=True
)
# Add a watch with tags
test_url = url_for('test_endpoint', _external=True) + "?category_test=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": "Security, Python, Tech News"},
follow_redirects=True
)
assert b"Watch added" in res.data
# Get the watch UUID
watch_uuid = extract_UUID_from_client(client)
# Wait for initial check
wait_for_all_checks(client)
# Trigger one change
test_return_data_v2 = """<html>
<body>
<p>Updated content for RSS categories</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data_v2)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Test 1: Check single watch RSS feed
res = client.get(
url_for("rss.rss_single_watch", uuid=watch_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Parse the RSS/XML
root = ET.fromstring(res.data)
# Find all items
items = root.findall('.//item')
assert len(items) >= 1, "Expected at least 1 item in RSS feed"
# Get categories from first item
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in single watch RSS: {categories}")
# Should have all three categories
assert "Security" in categories, f"Expected 'Security' category, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category, got: {categories}"
assert len(categories) == 3, f"Expected 3 categories, got {len(categories)}: {categories}"
# Test 2: Check main RSS feed
res = client.get(
url_for("rss.feed", token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
root = ET.fromstring(res.data)
items = root.findall('.//item')
assert len(items) >= 1, "Expected at least 1 item in main RSS feed"
# Get categories from first item in main feed
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in main RSS feed: {categories}")
# Should have all three categories
assert "Security" in categories, f"Expected 'Security' category in main feed, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category in main feed, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category in main feed, got: {categories}"
# Test 3: Check tag-specific RSS feed (should also have categories)
# Get the tag UUID for "Security" and verify the tag feed also has categories
from .util import get_UUID_for_tag_name
security_tag_uuid = get_UUID_for_tag_name(client, name="Security")
if security_tag_uuid:
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=security_tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
root = ET.fromstring(res.data)
items = root.findall('.//item')
if len(items) >= 1:
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in tag RSS feed: {categories}")
# Should still have all three categories
assert "Security" in categories, f"Expected 'Security' category in tag feed, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category in tag feed, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category in tag feed, got: {categories}"
# Clean up
delete_all_watches(client)
# RSS <description> should follow Main Settings -> Tag/Group -> Watch in that order of priority if set.
def test_rss_single_watch_follow_notification_body(client, live_server, measure_memory_usage, datastore_path):
rss_token = extract_rss_token_from_UI(client)
res = client.post(
url_for("settings.settings_page"),
data={
"application-fetch_backend": "html_requests",
"application-minutes_between_check": 180,
"application-notification_body": 'Boo yeah hello from main settings notification body<br>\nTitle: {{ watch_title }} changed',
"application-notification_format": default_notification_format,
"application-rss_template_type" : 'notification_body',
"application-notification_urls": "",
},
follow_redirects=True
)
assert b'Settings updated' in res.data
set_original_response(datastore_path=datastore_path)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, tag="RSS-Custom")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response(datastore_path=datastore_path)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Request RSS feed for the single watch
res = client.get(
url_for("rss.rss_single_watch", uuid=uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Should return valid RSS
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Check it took the notification body from main settings ####
item_description = ET.fromstring(res.data).findall('.//item')[0].findtext('description')
assert "Boo yeah hello from main settings notification body" in item_description
assert "Title: modified head" in item_description
## Edit the tag notification_body, it should cascade up and become the RSS output
res = client.post(
url_for("tags.form_tag_edit_submit", uuid="first"),
data={"name": "rss-custom",
"notification_body": 'Hello from the group/tag level'},
follow_redirects=True
)
assert b"Updated" in res.data
res = client.get(
url_for("rss.rss_single_watch", uuid=uuid, token=rss_token, _external=True),
follow_redirects=True
)
item_description = ET.fromstring(res.data).findall('.//item')[0].findtext('description')
assert 'Hello from the group/tag level' in item_description
# Override notification body at watch level and check ####
res = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"notification_body": "RSS body description set from watch level at notification body - {{ watch_title }}",
"url": test_url,
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
res = client.get(
url_for("rss.rss_single_watch", uuid=uuid, token=rss_token, _external=True),
follow_redirects=True
)
item_description = ET.fromstring(res.data).findall('.//item')[0].findtext('description')
assert 'RSS body description set from watch level at notification body - modified head title' in item_description
delete_all_watches(client)

View File

@@ -110,13 +110,14 @@ def get_UUID_for_tag_name(client, name):
# kinda funky, but works for now
def extract_rss_token_from_UI(client):
import re
res = client.get(
url_for("watchlist.index"),
)
m = re.search('token=(.+?)"', str(res.data))
token_key = m.group(1)
return token_key.strip()
return client.application.config.get('DATASTORE').data['settings']['application'].get('rss_access_token')
# import re
# res = client.get(
# url_for("watchlist.index"),
# )
# m = re.search('token=(.+?)"', str(res.data))
# token_key = m.group(1)
# return token_key.strip()
# kinda funky, but works for now
def extract_UUID_from_client(client):