mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-06 14:26:07 +00:00
Compare commits
2 Commits
python-314
...
rss-per-gr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb8b2b98c0 | ||
|
|
ab9774cf2d |
95
changedetectionio/blueprint/rss/_util.py
Normal file
95
changedetectionio/blueprint/rss/_util.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
"""
|
||||||
|
Utility functions for RSS feed generation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
|
from changedetectionio.notification.handler import apply_service_tweaks
|
||||||
|
from loguru import logger
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
BAD_CHARS_REGEX = r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
|
||||||
|
|
||||||
|
|
||||||
|
def scan_invalid_chars_in_rss(content):
|
||||||
|
"""
|
||||||
|
Scan for invalid characters in RSS content.
|
||||||
|
Returns True if invalid characters are found.
|
||||||
|
"""
|
||||||
|
for match in re.finditer(BAD_CHARS_REGEX, content):
|
||||||
|
i = match.start()
|
||||||
|
bad_char = content[i]
|
||||||
|
hex_value = f"0x{ord(bad_char):02x}"
|
||||||
|
# Grab context
|
||||||
|
start = max(0, i - 20)
|
||||||
|
end = min(len(content), i + 21)
|
||||||
|
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
|
||||||
|
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
|
||||||
|
# First match is enough
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def clean_entry_content(content):
|
||||||
|
"""
|
||||||
|
Remove invalid characters from RSS content.
|
||||||
|
"""
|
||||||
|
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
|
||||||
|
return cleaned
|
||||||
|
|
||||||
|
|
||||||
|
def generate_watch_guid(watch):
|
||||||
|
"""
|
||||||
|
Generate a unique GUID for a watch RSS entry.
|
||||||
|
"""
|
||||||
|
return f"{watch['uuid']}/{watch.last_changed}"
|
||||||
|
|
||||||
|
|
||||||
|
def generate_watch_diff_content(watch, dates, rss_content_format, datastore):
|
||||||
|
"""
|
||||||
|
Generate HTML diff content for a watch given its history dates.
|
||||||
|
Returns tuple of (content, watch_label).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
watch: The watch object
|
||||||
|
dates: List of history snapshot dates
|
||||||
|
rss_content_format: Format for RSS content (html or text)
|
||||||
|
datastore: The ChangeDetectionStore instance
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (content, watch_label) - the rendered HTML content and watch label
|
||||||
|
"""
|
||||||
|
from changedetectionio import diff
|
||||||
|
|
||||||
|
# Same logic as watch-overview.html
|
||||||
|
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
|
||||||
|
watch_label = watch.label
|
||||||
|
else:
|
||||||
|
watch_label = watch.get('url')
|
||||||
|
|
||||||
|
try:
|
||||||
|
html_diff = diff.render_diff(
|
||||||
|
previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[-2]),
|
||||||
|
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[-1]),
|
||||||
|
include_equal=False
|
||||||
|
)
|
||||||
|
|
||||||
|
requested_output_format = datastore.data['settings']['application'].get('rss_content_format')
|
||||||
|
url, html_diff, n_title = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
|
||||||
|
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
|
||||||
|
|
||||||
|
# @note: We use <pre> because nearly all RSS readers render only HTML (Thunderbird for example cant do just plaintext)
|
||||||
|
rss_template = "<pre>{{watch_label}} had a change.\n\n{{html_diff}}\n</pre>"
|
||||||
|
if 'html' in rss_content_format:
|
||||||
|
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_label}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
|
||||||
|
|
||||||
|
content = jinja_render(template_str=rss_template, watch_label=watch_label, html_diff=html_diff, watch_url=watch.link)
|
||||||
|
|
||||||
|
# Out of range chars could also break feedgen
|
||||||
|
if scan_invalid_chars_in_rss(content):
|
||||||
|
content = clean_entry_content(content)
|
||||||
|
|
||||||
|
return content, watch_label
|
||||||
@@ -1,224 +1,26 @@
|
|||||||
|
|
||||||
from changedetectionio.jinja2_custom import render as jinja_render
|
|
||||||
from changedetectionio.notification.handler import apply_service_tweaks
|
|
||||||
from changedetectionio.store import ChangeDetectionStore
|
from changedetectionio.store import ChangeDetectionStore
|
||||||
from feedgen.feed import FeedGenerator
|
from flask import Blueprint
|
||||||
from flask import Blueprint, make_response, request, url_for, redirect
|
|
||||||
from loguru import logger
|
|
||||||
import datetime
|
|
||||||
import pytz
|
|
||||||
import re
|
|
||||||
import time
|
|
||||||
|
|
||||||
|
from . import tag as tag_routes
|
||||||
BAD_CHARS_REGEX=r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
|
from . import main_feed
|
||||||
|
from . import single_watch
|
||||||
# Anything that is not text/UTF-8 should be stripped before it breaks feedgen (such as binary data etc)
|
|
||||||
def scan_invalid_chars_in_rss(content):
|
|
||||||
for match in re.finditer(BAD_CHARS_REGEX, content):
|
|
||||||
i = match.start()
|
|
||||||
bad_char = content[i]
|
|
||||||
hex_value = f"0x{ord(bad_char):02x}"
|
|
||||||
# Grab context
|
|
||||||
start = max(0, i - 20)
|
|
||||||
end = min(len(content), i + 21)
|
|
||||||
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
|
|
||||||
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
|
|
||||||
# First match is enough
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def clean_entry_content(content):
|
|
||||||
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
|
|
||||||
return cleaned
|
|
||||||
|
|
||||||
def construct_blueprint(datastore: ChangeDetectionStore):
|
def construct_blueprint(datastore: ChangeDetectionStore):
|
||||||
|
"""
|
||||||
|
Construct and configure the RSS blueprint with all routes.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
datastore: The ChangeDetectionStore instance
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The configured Flask blueprint
|
||||||
|
"""
|
||||||
rss_blueprint = Blueprint('rss', __name__)
|
rss_blueprint = Blueprint('rss', __name__)
|
||||||
|
|
||||||
# Helper function to generate GUID for RSS entries
|
# Register all route modules
|
||||||
def generate_watch_guid(watch):
|
main_feed.construct_main_feed_routes(rss_blueprint, datastore)
|
||||||
"""Generate a unique GUID for a watch RSS entry."""
|
single_watch.construct_single_watch_routes(rss_blueprint, datastore)
|
||||||
return f"{watch['uuid']}/{watch.last_changed}"
|
tag_routes.construct_tag_routes(rss_blueprint, datastore)
|
||||||
|
|
||||||
# Helper function to generate diff content for a watch
|
|
||||||
def generate_watch_diff_content(watch, dates, rss_content_format):
|
|
||||||
"""
|
|
||||||
Generate HTML diff content for a watch given its history dates.
|
|
||||||
Returns the rendered HTML content ready for RSS/display.
|
|
||||||
"""
|
|
||||||
from changedetectionio import diff
|
|
||||||
|
|
||||||
# Same logic as watch-overview.html
|
|
||||||
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
|
|
||||||
watch_label = watch.label
|
|
||||||
else:
|
|
||||||
watch_label = watch.get('url')
|
|
||||||
|
|
||||||
try:
|
|
||||||
html_diff = diff.render_diff(
|
|
||||||
previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[-2]),
|
|
||||||
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[-1]),
|
|
||||||
include_equal=False
|
|
||||||
)
|
|
||||||
|
|
||||||
requested_output_format = datastore.data['settings']['application'].get('rss_content_format')
|
|
||||||
url, html_diff, n_title = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
|
|
||||||
|
|
||||||
except FileNotFoundError as e:
|
|
||||||
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
|
|
||||||
|
|
||||||
# @note: We use <pre> because nearly all RSS readers render only HTML (Thunderbird for example cant do just plaintext)
|
|
||||||
rss_template = "<pre>{{watch_label}} had a change.\n\n{{html_diff}}\n</pre>"
|
|
||||||
if 'html' in rss_content_format:
|
|
||||||
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_label}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
|
|
||||||
|
|
||||||
content = jinja_render(template_str=rss_template, watch_label=watch_label, html_diff=html_diff, watch_url=watch.link)
|
|
||||||
|
|
||||||
# Out of range chars could also break feedgen
|
|
||||||
if scan_invalid_chars_in_rss(content):
|
|
||||||
content = clean_entry_content(content)
|
|
||||||
|
|
||||||
return content, watch_label
|
|
||||||
|
|
||||||
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
|
|
||||||
# to some earlier blueprint rerouting work, it should goto feed.
|
|
||||||
@rss_blueprint.route("/", methods=['GET'])
|
|
||||||
def extraslash():
|
|
||||||
return redirect(url_for('rss.feed'))
|
|
||||||
|
|
||||||
# Import the login decorator if needed
|
|
||||||
# from changedetectionio.auth_decorator import login_optionally_required
|
|
||||||
@rss_blueprint.route("", methods=['GET'])
|
|
||||||
def feed():
|
|
||||||
now = time.time()
|
|
||||||
# Always requires token set
|
|
||||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
|
||||||
rss_url_token = request.args.get('token')
|
|
||||||
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
|
|
||||||
|
|
||||||
if rss_url_token != app_rss_token:
|
|
||||||
return "Access denied, bad token", 403
|
|
||||||
|
|
||||||
from changedetectionio import diff
|
|
||||||
limit_tag = request.args.get('tag', '').lower().strip()
|
|
||||||
# Be sure limit_tag is a uuid
|
|
||||||
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
|
|
||||||
if limit_tag == tag.get('title', '').lower().strip():
|
|
||||||
limit_tag = uuid
|
|
||||||
|
|
||||||
# Sort by last_changed and add the uuid which is usually the key..
|
|
||||||
sorted_watches = []
|
|
||||||
|
|
||||||
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
|
|
||||||
for uuid, watch in datastore.data['watching'].items():
|
|
||||||
# @todo tag notification_muted skip also (improve Watch model)
|
|
||||||
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
|
|
||||||
continue
|
|
||||||
if limit_tag and not limit_tag in watch['tags']:
|
|
||||||
continue
|
|
||||||
watch['uuid'] = uuid
|
|
||||||
sorted_watches.append(watch)
|
|
||||||
|
|
||||||
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
|
|
||||||
|
|
||||||
fg = FeedGenerator()
|
|
||||||
fg.title('changedetection.io')
|
|
||||||
fg.description('Feed description')
|
|
||||||
fg.link(href='https://changedetection.io')
|
|
||||||
|
|
||||||
for watch in sorted_watches:
|
|
||||||
|
|
||||||
dates = list(watch.history.keys())
|
|
||||||
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
|
|
||||||
if len(dates) < 2:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not watch.viewed:
|
|
||||||
# Re #239 - GUID needs to be individual for each event
|
|
||||||
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
|
|
||||||
guid = generate_watch_guid(watch)
|
|
||||||
fe = fg.add_entry()
|
|
||||||
|
|
||||||
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
|
|
||||||
# Description is the page you watch, link takes you to the diff JS UI page
|
|
||||||
# Dict val base_url will get overriden with the env var if it is set.
|
|
||||||
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
|
|
||||||
# @todo fix
|
|
||||||
|
|
||||||
# Because we are called via whatever web server, flask should figure out the right path (
|
|
||||||
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
|
||||||
|
|
||||||
fe.link(link=diff_link)
|
|
||||||
|
|
||||||
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format)
|
|
||||||
|
|
||||||
fe.title(title=watch_label)
|
|
||||||
fe.content(content=content, type='CDATA')
|
|
||||||
fe.guid(guid, permalink=False)
|
|
||||||
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
|
||||||
dt = dt.replace(tzinfo=pytz.UTC)
|
|
||||||
fe.pubDate(dt)
|
|
||||||
|
|
||||||
response = make_response(fg.rss_str())
|
|
||||||
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
|
||||||
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
|
|
||||||
return response
|
|
||||||
|
|
||||||
@rss_blueprint.route("/watch/<string:uuid>", methods=['GET'])
|
|
||||||
def rss_single_watch(uuid):
|
|
||||||
"""
|
|
||||||
Display the most recent change for a single watch as RSS feed.
|
|
||||||
Returns RSS XML with a single entry showing the diff between the last two snapshots.
|
|
||||||
"""
|
|
||||||
# Always requires token set
|
|
||||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
|
||||||
rss_url_token = request.args.get('token')
|
|
||||||
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
|
|
||||||
|
|
||||||
if rss_url_token != app_rss_token:
|
|
||||||
return "Access denied, bad token", 403
|
|
||||||
|
|
||||||
# Get the watch by UUID
|
|
||||||
watch = datastore.data['watching'].get(uuid)
|
|
||||||
if not watch:
|
|
||||||
return f"Watch with UUID {uuid} not found", 404
|
|
||||||
|
|
||||||
# Check if watch has at least 2 history snapshots
|
|
||||||
dates = list(watch.history.keys())
|
|
||||||
if len(dates) < 2:
|
|
||||||
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
|
|
||||||
|
|
||||||
# Add uuid to watch for proper functioning
|
|
||||||
watch['uuid'] = uuid
|
|
||||||
|
|
||||||
# Generate the diff content using the shared helper function
|
|
||||||
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format)
|
|
||||||
|
|
||||||
# Create RSS feed with single entry
|
|
||||||
fg = FeedGenerator()
|
|
||||||
fg.title(f'changedetection.io - {watch.label}')
|
|
||||||
fg.description('Changes')
|
|
||||||
fg.link(href='https://changedetection.io')
|
|
||||||
|
|
||||||
# Add single entry for this watch
|
|
||||||
guid = generate_watch_guid(watch)
|
|
||||||
fe = fg.add_entry()
|
|
||||||
|
|
||||||
# Include a link to the diff page
|
|
||||||
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
|
||||||
fe.link(link=diff_link)
|
|
||||||
|
|
||||||
fe.title(title=watch_label)
|
|
||||||
fe.content(content=content, type='CDATA')
|
|
||||||
fe.guid(guid, permalink=False)
|
|
||||||
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
|
||||||
dt = dt.replace(tzinfo=pytz.UTC)
|
|
||||||
fe.pubDate(dt)
|
|
||||||
|
|
||||||
response = make_response(fg.rss_str())
|
|
||||||
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
|
||||||
return response
|
|
||||||
|
|
||||||
return rss_blueprint
|
return rss_blueprint
|
||||||
101
changedetectionio/blueprint/rss/main_feed.py
Normal file
101
changedetectionio/blueprint/rss/main_feed.py
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
from flask import make_response, request, url_for, redirect
|
||||||
|
from feedgen.feed import FeedGenerator
|
||||||
|
from loguru import logger
|
||||||
|
import datetime
|
||||||
|
import pytz
|
||||||
|
import time
|
||||||
|
|
||||||
|
from ._util import generate_watch_guid, generate_watch_diff_content
|
||||||
|
|
||||||
|
|
||||||
|
def construct_main_feed_routes(rss_blueprint, datastore):
|
||||||
|
"""
|
||||||
|
Construct the main RSS feed routes.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rss_blueprint: The Flask blueprint to add routes to
|
||||||
|
datastore: The ChangeDetectionStore instance
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
|
||||||
|
# to some earlier blueprint rerouting work, it should goto feed.
|
||||||
|
@rss_blueprint.route("/", methods=['GET'])
|
||||||
|
def extraslash():
|
||||||
|
return redirect(url_for('rss.feed'))
|
||||||
|
|
||||||
|
# Import the login decorator if needed
|
||||||
|
# from changedetectionio.auth_decorator import login_optionally_required
|
||||||
|
@rss_blueprint.route("", methods=['GET'])
|
||||||
|
def feed():
|
||||||
|
now = time.time()
|
||||||
|
# Always requires token set
|
||||||
|
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||||
|
rss_url_token = request.args.get('token')
|
||||||
|
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
|
||||||
|
|
||||||
|
if rss_url_token != app_rss_token:
|
||||||
|
return "Access denied, bad token", 403
|
||||||
|
|
||||||
|
limit_tag = request.args.get('tag', '').lower().strip()
|
||||||
|
# Be sure limit_tag is a uuid
|
||||||
|
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
|
||||||
|
if limit_tag == tag.get('title', '').lower().strip():
|
||||||
|
limit_tag = uuid
|
||||||
|
|
||||||
|
# Sort by last_changed and add the uuid which is usually the key..
|
||||||
|
sorted_watches = []
|
||||||
|
|
||||||
|
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
|
||||||
|
for uuid, watch in datastore.data['watching'].items():
|
||||||
|
# @todo tag notification_muted skip also (improve Watch model)
|
||||||
|
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
|
||||||
|
continue
|
||||||
|
if limit_tag and not limit_tag in watch['tags']:
|
||||||
|
continue
|
||||||
|
watch['uuid'] = uuid
|
||||||
|
sorted_watches.append(watch)
|
||||||
|
|
||||||
|
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
|
||||||
|
|
||||||
|
fg = FeedGenerator()
|
||||||
|
fg.title('changedetection.io')
|
||||||
|
fg.description('Feed description')
|
||||||
|
fg.link(href='https://changedetection.io')
|
||||||
|
|
||||||
|
for watch in sorted_watches:
|
||||||
|
|
||||||
|
dates = list(watch.history.keys())
|
||||||
|
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
|
||||||
|
if len(dates) < 2:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not watch.viewed:
|
||||||
|
# Re #239 - GUID needs to be individual for each event
|
||||||
|
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
|
||||||
|
guid = generate_watch_guid(watch)
|
||||||
|
fe = fg.add_entry()
|
||||||
|
|
||||||
|
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
|
||||||
|
# Description is the page you watch, link takes you to the diff JS UI page
|
||||||
|
# Dict val base_url will get overriden with the env var if it is set.
|
||||||
|
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
|
||||||
|
# @todo fix
|
||||||
|
|
||||||
|
# Because we are called via whatever web server, flask should figure out the right path (
|
||||||
|
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||||
|
|
||||||
|
fe.link(link=diff_link)
|
||||||
|
|
||||||
|
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
|
||||||
|
|
||||||
|
fe.title(title=watch_label)
|
||||||
|
fe.content(content=content, type='CDATA')
|
||||||
|
fe.guid(guid, permalink=False)
|
||||||
|
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
||||||
|
dt = dt.replace(tzinfo=pytz.UTC)
|
||||||
|
fe.pubDate(dt)
|
||||||
|
|
||||||
|
response = make_response(fg.rss_str())
|
||||||
|
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||||
|
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
|
||||||
|
return response
|
||||||
71
changedetectionio/blueprint/rss/single_watch.py
Normal file
71
changedetectionio/blueprint/rss/single_watch.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
from flask import make_response, request, url_for
|
||||||
|
from feedgen.feed import FeedGenerator
|
||||||
|
import datetime
|
||||||
|
import pytz
|
||||||
|
|
||||||
|
from ._util import generate_watch_guid, generate_watch_diff_content
|
||||||
|
|
||||||
|
|
||||||
|
def construct_single_watch_routes(rss_blueprint, datastore):
|
||||||
|
"""
|
||||||
|
Construct RSS feed routes for single watches.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rss_blueprint: The Flask blueprint to add routes to
|
||||||
|
datastore: The ChangeDetectionStore instance
|
||||||
|
"""
|
||||||
|
|
||||||
|
@rss_blueprint.route("/watch/<string:uuid>", methods=['GET'])
|
||||||
|
def rss_single_watch(uuid):
|
||||||
|
"""
|
||||||
|
Display the most recent change for a single watch as RSS feed.
|
||||||
|
Returns RSS XML with a single entry showing the diff between the last two snapshots.
|
||||||
|
"""
|
||||||
|
# Always requires token set
|
||||||
|
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||||
|
rss_url_token = request.args.get('token')
|
||||||
|
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
|
||||||
|
|
||||||
|
if rss_url_token != app_rss_token:
|
||||||
|
return "Access denied, bad token", 403
|
||||||
|
|
||||||
|
# Get the watch by UUID
|
||||||
|
watch = datastore.data['watching'].get(uuid)
|
||||||
|
if not watch:
|
||||||
|
return f"Watch with UUID {uuid} not found", 404
|
||||||
|
|
||||||
|
# Check if watch has at least 2 history snapshots
|
||||||
|
dates = list(watch.history.keys())
|
||||||
|
if len(dates) < 2:
|
||||||
|
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
|
||||||
|
|
||||||
|
# Add uuid to watch for proper functioning
|
||||||
|
watch['uuid'] = uuid
|
||||||
|
|
||||||
|
# Generate the diff content using the shared helper function
|
||||||
|
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
|
||||||
|
|
||||||
|
# Create RSS feed with single entry
|
||||||
|
fg = FeedGenerator()
|
||||||
|
fg.title(f'changedetection.io - {watch.label}')
|
||||||
|
fg.description('Changes')
|
||||||
|
fg.link(href='https://changedetection.io')
|
||||||
|
|
||||||
|
# Add single entry for this watch
|
||||||
|
guid = generate_watch_guid(watch)
|
||||||
|
fe = fg.add_entry()
|
||||||
|
|
||||||
|
# Include a link to the diff page
|
||||||
|
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||||
|
fe.link(link=diff_link)
|
||||||
|
|
||||||
|
fe.title(title=watch_label)
|
||||||
|
fe.content(content=content, type='CDATA')
|
||||||
|
fe.guid(guid, permalink=False)
|
||||||
|
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
||||||
|
dt = dt.replace(tzinfo=pytz.UTC)
|
||||||
|
fe.pubDate(dt)
|
||||||
|
|
||||||
|
response = make_response(fg.rss_str())
|
||||||
|
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||||
|
return response
|
||||||
86
changedetectionio/blueprint/rss/tag.py
Normal file
86
changedetectionio/blueprint/rss/tag.py
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
from flask import make_response, request, url_for
|
||||||
|
from feedgen.feed import FeedGenerator
|
||||||
|
import datetime
|
||||||
|
import pytz
|
||||||
|
|
||||||
|
|
||||||
|
from ._util import generate_watch_guid, generate_watch_diff_content
|
||||||
|
|
||||||
|
|
||||||
|
def construct_tag_routes(rss_blueprint, datastore):
|
||||||
|
"""
|
||||||
|
Construct RSS feed routes for tags.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rss_blueprint: The Flask blueprint to add routes to
|
||||||
|
datastore: The ChangeDetectionStore instance
|
||||||
|
"""
|
||||||
|
|
||||||
|
@rss_blueprint.route("/tag/<string:tag_uuid>", methods=['GET'])
|
||||||
|
def rss_tag_feed(tag_uuid):
|
||||||
|
"""
|
||||||
|
Display an RSS feed for all unviewed watches that belong to a specific tag.
|
||||||
|
Returns RSS XML with entries for each unviewed watch with sufficient history.
|
||||||
|
"""
|
||||||
|
# Always requires token set
|
||||||
|
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||||
|
rss_url_token = request.args.get('token')
|
||||||
|
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
|
||||||
|
|
||||||
|
if rss_url_token != app_rss_token:
|
||||||
|
return "Access denied, bad token", 403
|
||||||
|
|
||||||
|
# Verify tag exists
|
||||||
|
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
|
||||||
|
if not tag:
|
||||||
|
return f"Tag with UUID {tag_uuid} not found", 404
|
||||||
|
|
||||||
|
tag_title = tag.get('title', 'Unknown Tag')
|
||||||
|
|
||||||
|
# Create RSS feed
|
||||||
|
fg = FeedGenerator()
|
||||||
|
fg.title(f'changedetection.io - {tag_title}')
|
||||||
|
fg.description(f'Changes for watches tagged with {tag_title}')
|
||||||
|
fg.link(href='https://changedetection.io')
|
||||||
|
|
||||||
|
# Find all watches with this tag
|
||||||
|
for uuid, watch in datastore.data['watching'].items():
|
||||||
|
# Skip if watch doesn't have this tag
|
||||||
|
if tag_uuid not in watch.get('tags', []):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Skip muted watches if configured
|
||||||
|
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check if watch has at least 2 history snapshots
|
||||||
|
dates = list(watch.history.keys())
|
||||||
|
if len(dates) < 2:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Only include unviewed watches
|
||||||
|
if not watch.viewed:
|
||||||
|
# Add uuid to watch for proper functioning
|
||||||
|
watch['uuid'] = uuid
|
||||||
|
|
||||||
|
# Generate GUID for this entry
|
||||||
|
guid = generate_watch_guid(watch)
|
||||||
|
fe = fg.add_entry()
|
||||||
|
|
||||||
|
# Include a link to the diff page
|
||||||
|
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||||
|
fe.link(link=diff_link)
|
||||||
|
|
||||||
|
# Generate diff content
|
||||||
|
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
|
||||||
|
|
||||||
|
fe.title(title=watch_label)
|
||||||
|
fe.content(content=content, type='CDATA')
|
||||||
|
fe.guid(guid, permalink=False)
|
||||||
|
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
||||||
|
dt = dt.replace(tzinfo=pytz.UTC)
|
||||||
|
fe.pubDate(dt)
|
||||||
|
|
||||||
|
response = make_response(fg.rss_str())
|
||||||
|
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||||
|
return response
|
||||||
@@ -21,9 +21,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
tag_count = Counter(tag for watch in datastore.data['watching'].values() if watch.get('tags') for tag in watch['tags'])
|
tag_count = Counter(tag for watch in datastore.data['watching'].values() if watch.get('tags') for tag in watch['tags'])
|
||||||
|
|
||||||
output = render_template("groups-overview.html",
|
output = render_template("groups-overview.html",
|
||||||
|
app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
|
||||||
available_tags=sorted_tags,
|
available_tags=sorted_tags,
|
||||||
form=add_form,
|
form=add_form,
|
||||||
tag_count=tag_count
|
tag_count=tag_count,
|
||||||
)
|
)
|
||||||
|
|
||||||
return output
|
return output
|
||||||
@@ -149,9 +150,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
included_content = template.render(**template_args)
|
included_content = template.render(**template_args)
|
||||||
|
|
||||||
output = render_template("edit-tag.html",
|
output = render_template("edit-tag.html",
|
||||||
settings_application=datastore.data['settings']['application'],
|
|
||||||
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
|
|
||||||
extra_form_content=included_content,
|
extra_form_content=included_content,
|
||||||
|
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
|
||||||
|
settings_application=datastore.data['settings']['application'],
|
||||||
**template_args
|
**template_args
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -52,6 +52,7 @@
|
|||||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">Edit</a>
|
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">Edit</a>
|
||||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.delete', uuid=uuid) }}" title="Deletes and removes tag">Delete</a>
|
<a class="pure-button pure-button-primary" href="{{ url_for('tags.delete', uuid=uuid) }}" title="Deletes and removes tag">Delete</a>
|
||||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.unlink', uuid=uuid) }}" title="Keep the tag but unlink any watches">Unlink</a>
|
<a class="pure-button pure-button-primary" href="{{ url_for('tags.unlink', uuid=uuid) }}" title="Keep the tag but unlink any watches">Unlink</a>
|
||||||
|
<a href="{{ url_for('rss.rss_tag_feed', tag_uuid=uuid, token=app_rss_token)}}"><img alt="RSS Feed for this watch" style="padding-left: 1em;" src="{{url_for('static_content', group='images', filename='generic_feed-icon.svg')}}" height="15"></a>
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
|||||||
254
changedetectionio/tests/test_rss_group.py
Normal file
254
changedetectionio/tests/test_rss_group.py
Normal file
@@ -0,0 +1,254 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import time
|
||||||
|
from flask import url_for
|
||||||
|
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def set_original_response(datastore_path):
|
||||||
|
test_return_data = """<html>
|
||||||
|
<body>
|
||||||
|
Some initial text<br>
|
||||||
|
<p>Watch 1 content</p>
|
||||||
|
<p>Watch 2 content</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||||
|
f.write(test_return_data)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def set_modified_response(datastore_path):
|
||||||
|
test_return_data = """<html>
|
||||||
|
<body>
|
||||||
|
Some initial text<br>
|
||||||
|
<p>Watch 1 content MODIFIED</p>
|
||||||
|
<p>Watch 2 content CHANGED</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||||
|
f.write(test_return_data)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""
|
||||||
|
Test that RSS feed for a specific tag/group shows only watches in that group
|
||||||
|
and displays changes correctly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
set_original_response(datastore_path=datastore_path)
|
||||||
|
|
||||||
|
# Create a tag/group
|
||||||
|
res = client.post(
|
||||||
|
url_for("tags.form_tag_add"),
|
||||||
|
data={"name": "test-rss-group"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Tag added" in res.data
|
||||||
|
assert b"test-rss-group" in res.data
|
||||||
|
|
||||||
|
# Get the tag UUID
|
||||||
|
tag_uuid = get_UUID_for_tag_name(client, name="test-rss-group")
|
||||||
|
assert tag_uuid is not None
|
||||||
|
|
||||||
|
# Add first watch with the tag
|
||||||
|
test_url_1 = url_for('test_endpoint', _external=True) + "?watch=1"
|
||||||
|
res = client.post(
|
||||||
|
url_for("ui.ui_views.form_quick_watch_add"),
|
||||||
|
data={"url": test_url_1, "tags": 'test-rss-group'},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
|
# Add second watch with the tag
|
||||||
|
test_url_2 = url_for('test_endpoint', _external=True) + "?watch=2"
|
||||||
|
res = client.post(
|
||||||
|
url_for("ui.ui_views.form_quick_watch_add"),
|
||||||
|
data={"url": test_url_2, "tags": 'test-rss-group'},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
|
# Add a third watch WITHOUT the tag (should not appear in RSS)
|
||||||
|
test_url_3 = url_for('test_endpoint', _external=True) + "?watch=3"
|
||||||
|
res = client.post(
|
||||||
|
url_for("ui.ui_views.form_quick_watch_add"),
|
||||||
|
data={"url": test_url_3, "tags": 'other-tag'},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
|
# Wait for initial checks to complete
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Trigger a change
|
||||||
|
set_modified_response(datastore_path=datastore_path)
|
||||||
|
|
||||||
|
# Recheck all watches
|
||||||
|
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Get RSS token
|
||||||
|
rss_token = extract_rss_token_from_UI(client)
|
||||||
|
assert rss_token is not None
|
||||||
|
|
||||||
|
# Request RSS feed for the specific tag/group using the new endpoint
|
||||||
|
res = client.get(
|
||||||
|
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify response is successful
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert b"<?xml" in res.data or b"<rss" in res.data
|
||||||
|
|
||||||
|
# Verify the RSS feed contains the tag name in the title
|
||||||
|
assert b"test-rss-group" in res.data
|
||||||
|
|
||||||
|
# Verify watch 1 and watch 2 are in the RSS feed (they have the tag)
|
||||||
|
assert b"watch=1" in res.data
|
||||||
|
assert b"watch=2" in res.data
|
||||||
|
|
||||||
|
# Verify watch 3 is NOT in the RSS feed (it doesn't have the tag)
|
||||||
|
assert b"watch=3" not in res.data
|
||||||
|
|
||||||
|
# Verify the changes are shown in the RSS feed
|
||||||
|
assert b"MODIFIED" in res.data or b"CHANGED" in res.data
|
||||||
|
|
||||||
|
# Verify it's actual RSS/XML format
|
||||||
|
assert b"<rss" in res.data or b"<feed" in res.data
|
||||||
|
|
||||||
|
# Test with invalid tag UUID - should return 404
|
||||||
|
res = client.get(
|
||||||
|
url_for("rss.rss_tag_feed", tag_uuid="invalid-uuid-12345", token=rss_token, _external=True),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert res.status_code == 404
|
||||||
|
assert b"not found" in res.data
|
||||||
|
|
||||||
|
# Test with invalid token - should return 403
|
||||||
|
res = client.get(
|
||||||
|
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token="wrong-token", _external=True),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert res.status_code == 403
|
||||||
|
assert b"Access denied" in res.data
|
||||||
|
|
||||||
|
# Clean up
|
||||||
|
delete_all_watches(client)
|
||||||
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||||
|
assert b'All tags deleted' in res.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_rss_group_empty_tag(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""
|
||||||
|
Test that RSS feed for a tag with no watches returns valid but empty RSS.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create a tag with no watches
|
||||||
|
res = client.post(
|
||||||
|
url_for("tags.form_tag_add"),
|
||||||
|
data={"name": "empty-tag"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Tag added" in res.data
|
||||||
|
|
||||||
|
tag_uuid = get_UUID_for_tag_name(client, name="empty-tag")
|
||||||
|
assert tag_uuid is not None
|
||||||
|
|
||||||
|
# Get RSS token
|
||||||
|
rss_token = extract_rss_token_from_UI(client)
|
||||||
|
|
||||||
|
# Request RSS feed for empty tag
|
||||||
|
res = client.get(
|
||||||
|
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should still return 200 with valid RSS
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert b"<?xml" in res.data or b"<rss" in res.data
|
||||||
|
assert b"empty-tag" in res.data
|
||||||
|
|
||||||
|
# Clean up
|
||||||
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||||
|
assert b'All tags deleted' in res.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""
|
||||||
|
Test that RSS feed for a tag only shows unviewed watches.
|
||||||
|
"""
|
||||||
|
|
||||||
|
set_original_response(datastore_path=datastore_path)
|
||||||
|
|
||||||
|
# Create a tag
|
||||||
|
res = client.post(
|
||||||
|
url_for("tags.form_tag_add"),
|
||||||
|
data={"name": "unviewed-test"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Tag added" in res.data
|
||||||
|
|
||||||
|
tag_uuid = get_UUID_for_tag_name(client, name="unviewed-test")
|
||||||
|
|
||||||
|
# Add two watches with the tag
|
||||||
|
test_url_1 = url_for('test_endpoint', _external=True) + "?unviewed=1"
|
||||||
|
res = client.post(
|
||||||
|
url_for("ui.ui_views.form_quick_watch_add"),
|
||||||
|
data={"url": test_url_1, "tags": 'unviewed-test'},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
|
test_url_2 = url_for('test_endpoint', _external=True) + "?unviewed=2"
|
||||||
|
res = client.post(
|
||||||
|
url_for("ui.ui_views.form_quick_watch_add"),
|
||||||
|
data={"url": test_url_2, "tags": 'unviewed-test'},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Trigger changes
|
||||||
|
set_modified_response(datastore_path=datastore_path)
|
||||||
|
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Get RSS token
|
||||||
|
rss_token = extract_rss_token_from_UI(client)
|
||||||
|
|
||||||
|
# Request RSS feed - should show both watches (both unviewed)
|
||||||
|
res = client.get(
|
||||||
|
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert b"unviewed=1" in res.data
|
||||||
|
assert b"unviewed=2" in res.data
|
||||||
|
|
||||||
|
# Mark all as viewed
|
||||||
|
res = client.get(url_for('ui.mark_all_viewed'), follow_redirects=True)
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Request RSS feed again - should be empty now (no unviewed watches)
|
||||||
|
res = client.get(
|
||||||
|
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Should not contain the watch URLs anymore since they're viewed
|
||||||
|
assert b"unviewed=1" not in res.data
|
||||||
|
assert b"unviewed=2" not in res.data
|
||||||
|
|
||||||
|
# Clean up
|
||||||
|
delete_all_watches(client)
|
||||||
|
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||||
|
assert b'All tags deleted' in res.data
|
||||||
Reference in New Issue
Block a user