Compare commits

...

8 Commits

Author SHA1 Message Date
dgtlmoon
3767a2d5b9 0.51.1 Fixing semver version number
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-14 10:21:32 +01:00
dgtlmoon
71c8d8b1b1 0.51.01 2025-11-14 10:11:45 +01:00
dgtlmoon
20cbe6f510 0.51.00 2025-11-14 10:10:40 +01:00
dgtlmoon
3a6e1f908f UI - Minor text fix for anon history access 2025-11-14 10:01:03 +01:00
dgtlmoon
73fdbf24e3 RSS per watch tweaks (#3635)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-13 20:37:11 +01:00
dgtlmoon
629f939224 RSS Feed per watch - Setting order (newest changes first) (#3634) 2025-11-13 19:44:22 +01:00
dgtlmoon
48299e5738 UI - Moving 'RSS' options to its own settings tab, RSS - Adding watch history length (#3633) 2025-11-13 19:20:03 +01:00
dgtlmoon
5b1b70b8ab RSS per group! (#3632) 2025-11-13 18:56:04 +01:00
14 changed files with 1044 additions and 236 deletions

View File

@@ -1,8 +1,8 @@
#!/usr/bin/env python3
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.43'
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.51.1'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -0,0 +1,97 @@
"""
Utility functions for RSS feed generation.
"""
from changedetectionio.jinja2_custom import render as jinja_render
from changedetectionio.notification.handler import apply_service_tweaks
from loguru import logger
import re
BAD_CHARS_REGEX = r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
def scan_invalid_chars_in_rss(content):
"""
Scan for invalid characters in RSS content.
Returns True if invalid characters are found.
"""
for match in re.finditer(BAD_CHARS_REGEX, content):
i = match.start()
bad_char = content[i]
hex_value = f"0x{ord(bad_char):02x}"
# Grab context
start = max(0, i - 20)
end = min(len(content), i + 21)
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
# First match is enough
return True
return False
def clean_entry_content(content):
"""
Remove invalid characters from RSS content.
"""
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
return cleaned
def generate_watch_guid(watch):
"""
Generate a unique GUID for a watch RSS entry.
"""
return f"{watch['uuid']}/{watch.last_changed}"
def generate_watch_diff_content(watch, dates, rss_content_format, datastore, date_index_from=-2, date_index_to=-1):
"""
Generate HTML diff content for a watch given its history dates.
Returns tuple of (content, watch_label).
Args:
watch: The watch object
dates: List of history snapshot dates
rss_content_format: Format for RSS content (html or text)
datastore: The ChangeDetectionStore instance
date_index_from: Index of the "from" date in the dates list (default: -2)
date_index_to: Index of the "to" date in the dates list (default: -1)
Returns:
Tuple of (content, watch_label) - the rendered HTML content and watch label
"""
from changedetectionio import diff
# Same logic as watch-overview.html
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
watch_label = watch.label
else:
watch_label = watch.get('url')
try:
html_diff = diff.render_diff(
previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[date_index_from]),
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[date_index_to]),
include_equal=False
)
requested_output_format = datastore.data['settings']['application'].get('rss_content_format')
url, html_diff, n_title = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
except FileNotFoundError as e:
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
# @note: We use <pre> because nearly all RSS readers render only HTML (Thunderbird for example cant do just plaintext)
rss_template = "<pre>{{watch_label}} had a change.\n\n{{html_diff}}\n</pre>"
if 'html' in rss_content_format:
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_label}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
content = jinja_render(template_str=rss_template, watch_label=watch_label, html_diff=html_diff, watch_url=watch.link)
# Out of range chars could also break feedgen
if scan_invalid_chars_in_rss(content):
content = clean_entry_content(content)
return content, watch_label

View File

@@ -1,224 +1,26 @@
from changedetectionio.jinja2_custom import render as jinja_render
from changedetectionio.notification.handler import apply_service_tweaks
from changedetectionio.store import ChangeDetectionStore
from feedgen.feed import FeedGenerator
from flask import Blueprint, make_response, request, url_for, redirect
from loguru import logger
import datetime
import pytz
import re
import time
from flask import Blueprint
BAD_CHARS_REGEX=r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
# Anything that is not text/UTF-8 should be stripped before it breaks feedgen (such as binary data etc)
def scan_invalid_chars_in_rss(content):
for match in re.finditer(BAD_CHARS_REGEX, content):
i = match.start()
bad_char = content[i]
hex_value = f"0x{ord(bad_char):02x}"
# Grab context
start = max(0, i - 20)
end = min(len(content), i + 21)
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
# First match is enough
return True
return False
def clean_entry_content(content):
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
return cleaned
from . import tag as tag_routes
from . import main_feed
from . import single_watch
def construct_blueprint(datastore: ChangeDetectionStore):
"""
Construct and configure the RSS blueprint with all routes.
Args:
datastore: The ChangeDetectionStore instance
Returns:
The configured Flask blueprint
"""
rss_blueprint = Blueprint('rss', __name__)
# Helper function to generate GUID for RSS entries
def generate_watch_guid(watch):
"""Generate a unique GUID for a watch RSS entry."""
return f"{watch['uuid']}/{watch.last_changed}"
# Helper function to generate diff content for a watch
def generate_watch_diff_content(watch, dates, rss_content_format):
"""
Generate HTML diff content for a watch given its history dates.
Returns the rendered HTML content ready for RSS/display.
"""
from changedetectionio import diff
# Same logic as watch-overview.html
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
watch_label = watch.label
else:
watch_label = watch.get('url')
try:
html_diff = diff.render_diff(
previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[-2]),
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[-1]),
include_equal=False
)
requested_output_format = datastore.data['settings']['application'].get('rss_content_format')
url, html_diff, n_title = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
except FileNotFoundError as e:
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
# @note: We use <pre> because nearly all RSS readers render only HTML (Thunderbird for example cant do just plaintext)
rss_template = "<pre>{{watch_label}} had a change.\n\n{{html_diff}}\n</pre>"
if 'html' in rss_content_format:
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_label}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
content = jinja_render(template_str=rss_template, watch_label=watch_label, html_diff=html_diff, watch_url=watch.link)
# Out of range chars could also break feedgen
if scan_invalid_chars_in_rss(content):
content = clean_entry_content(content)
return content, watch_label
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
# to some earlier blueprint rerouting work, it should goto feed.
@rss_blueprint.route("/", methods=['GET'])
def extraslash():
return redirect(url_for('rss.feed'))
# Import the login decorator if needed
# from changedetectionio.auth_decorator import login_optionally_required
@rss_blueprint.route("", methods=['GET'])
def feed():
now = time.time()
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
from changedetectionio import diff
limit_tag = request.args.get('tag', '').lower().strip()
# Be sure limit_tag is a uuid
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
if limit_tag == tag.get('title', '').lower().strip():
limit_tag = uuid
# Sort by last_changed and add the uuid which is usually the key..
sorted_watches = []
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
for uuid, watch in datastore.data['watching'].items():
# @todo tag notification_muted skip also (improve Watch model)
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
if limit_tag and not limit_tag in watch['tags']:
continue
watch['uuid'] = uuid
sorted_watches.append(watch)
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
fg = FeedGenerator()
fg.title('changedetection.io')
fg.description('Feed description')
fg.link(href='https://changedetection.io')
for watch in sorted_watches:
dates = list(watch.history.keys())
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
if len(dates) < 2:
continue
if not watch.viewed:
# Re #239 - GUID needs to be individual for each event
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
guid = generate_watch_guid(watch)
fe = fg.add_entry()
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
# Description is the page you watch, link takes you to the diff JS UI page
# Dict val base_url will get overriden with the env var if it is set.
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
# @todo fix
# Because we are called via whatever web server, flask should figure out the right path (
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
fe.link(link=diff_link)
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format)
fe.title(title=watch_label)
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
return response
@rss_blueprint.route("/watch/<string:uuid>", methods=['GET'])
def rss_single_watch(uuid):
"""
Display the most recent change for a single watch as RSS feed.
Returns RSS XML with a single entry showing the diff between the last two snapshots.
"""
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
# Get the watch by UUID
watch = datastore.data['watching'].get(uuid)
if not watch:
return f"Watch with UUID {uuid} not found", 404
# Check if watch has at least 2 history snapshots
dates = list(watch.history.keys())
if len(dates) < 2:
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
# Add uuid to watch for proper functioning
watch['uuid'] = uuid
# Generate the diff content using the shared helper function
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format)
# Create RSS feed with single entry
fg = FeedGenerator()
fg.title(f'changedetection.io - {watch.label}')
fg.description('Changes')
fg.link(href='https://changedetection.io')
# Add single entry for this watch
guid = generate_watch_guid(watch)
fe = fg.add_entry()
# Include a link to the diff page
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
fe.link(link=diff_link)
fe.title(title=watch_label)
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
return response
# Register all route modules
main_feed.construct_main_feed_routes(rss_blueprint, datastore)
single_watch.construct_single_watch_routes(rss_blueprint, datastore)
tag_routes.construct_tag_routes(rss_blueprint, datastore)
return rss_blueprint

View File

@@ -0,0 +1,109 @@
from flask import make_response, request, url_for, redirect
from feedgen.feed import FeedGenerator
from loguru import logger
import datetime
import pytz
import time
from ._util import generate_watch_guid, generate_watch_diff_content
def construct_main_feed_routes(rss_blueprint, datastore):
"""
Construct the main RSS feed routes.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
# to some earlier blueprint rerouting work, it should goto feed.
@rss_blueprint.route("/", methods=['GET'])
def extraslash():
return redirect(url_for('rss.feed'))
# Import the login decorator if needed
# from changedetectionio.auth_decorator import login_optionally_required
@rss_blueprint.route("", methods=['GET'])
def feed():
now = time.time()
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
limit_tag = request.args.get('tag', '').lower().strip()
# Be sure limit_tag is a uuid
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
if limit_tag == tag.get('title', '').lower().strip():
limit_tag = uuid
# Sort by last_changed and add the uuid which is usually the key..
sorted_watches = []
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
for uuid, watch in datastore.data['watching'].items():
# @todo tag notification_muted skip also (improve Watch model)
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
if limit_tag and not limit_tag in watch['tags']:
continue
watch['uuid'] = uuid
sorted_watches.append(watch)
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
fg = FeedGenerator()
fg.title('changedetection.io')
fg.description('Feed description')
fg.link(href='https://changedetection.io')
for watch in sorted_watches:
dates = list(watch.history.keys())
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
if len(dates) < 2:
continue
if not watch.viewed:
# Re #239 - GUID needs to be individual for each event
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
guid = generate_watch_guid(watch)
fe = fg.add_entry()
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
# Description is the page you watch, link takes you to the diff JS UI page
# Dict val base_url will get overriden with the env var if it is set.
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
# @todo fix
# Because we are called via whatever web server, flask should figure out the right path (
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
fe.link(link=diff_link)
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
fe.title(title=watch_label)
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)
# Add categories based on watch tags
for tag_uuid in watch.get('tags', []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if tag:
tag_title = tag.get('title', '')
if tag_title:
fe.category(term=tag_title)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
return response

View File

@@ -0,0 +1,159 @@
from flask import make_response, request, url_for
from feedgen.feed import FeedGenerator
import datetime
import pytz
import locale
from ._util import generate_watch_guid, generate_watch_diff_content
def construct_single_watch_routes(rss_blueprint, datastore):
"""
Construct RSS feed routes for single watches.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
@rss_blueprint.route("/watch/<string:uuid>", methods=['GET'])
def rss_single_watch(uuid):
"""
Display the most recent changes for a single watch as RSS feed.
Returns RSS XML with multiple entries showing diffs between consecutive snapshots.
The number of entries is controlled by the rss_diff_length setting.
"""
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
# Get the watch by UUID
watch = datastore.data['watching'].get(uuid)
if not watch:
return f"Watch with UUID {uuid} not found", 404
# Check if watch has at least 2 history snapshots
dates = list(watch.history.keys())
if len(dates) < 2:
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
# Add uuid to watch for proper functioning
watch['uuid'] = uuid
# Get the number of diffs to include (default: 5)
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
# Calculate how many diffs we can actually show (limited by available history)
# We need at least 2 snapshots to create 1 diff
max_possible_diffs = len(dates) - 1
num_diffs = min(rss_diff_length, max_possible_diffs) if rss_diff_length > 0 else max_possible_diffs
# Create RSS feed
fg = FeedGenerator()
# Set title: use "label (url)" if label differs from url, otherwise just url
watch_url = watch.get('url', '')
watch_label = watch.label
if watch_label and watch_label != watch_url:
feed_title = f'changedetection.io - {watch_label} ({watch_url})'
else:
feed_title = f'changedetection.io - {watch_url}'
fg.title(feed_title)
fg.description('Changes')
fg.link(href='https://changedetection.io')
# Loop through history and create RSS entries for each diff
# Add entries in reverse order because feedgen reverses them
# This way, the newest change appears first in the final RSS
for i in range(num_diffs - 1, -1, -1):
# Calculate indices for this diff (working backwards from newest)
# i=0: compare dates[-2] to dates[-1] (most recent change)
# i=1: compare dates[-3] to dates[-2] (previous change)
# etc.
date_index_to = -(i + 1)
date_index_from = -(i + 2)
try:
# Generate the diff content for this pair of snapshots
timestamp_to = dates[date_index_to]
timestamp_from = dates[date_index_from]
content, watch_label = generate_watch_diff_content(
watch, dates, rss_content_format, datastore,
date_index_from=date_index_from,
date_index_to=date_index_to
)
# Generate edit watch link and add to content
edit_watch_url = url_for('ui.ui_edit.edit_page',
uuid=watch['uuid'],
_external=True)
# Add edit watch links at top and bottom of content
if 'html' in rss_content_format:
edit_link_html = f'<p><a href="{edit_watch_url}">[edit watch]</a></p>'
# Insert after <body> and before </body>
content = content.replace('<body>', f'<body>\n{edit_link_html}', 1)
content = content.replace('</body>', f'{edit_link_html}\n</body>', 1)
else:
# For plain text format, add plain text links in separate <pre> blocks
edit_link_top = f'<pre>[edit watch] {edit_watch_url}</pre>\n'
edit_link_bottom = f'\n<pre>[edit watch] {edit_watch_url}</pre>'
content = edit_link_top + content + edit_link_bottom
# Create a unique GUID for this specific diff
guid = f"{watch['uuid']}/{timestamp_to}"
fe = fg.add_entry()
# Include a link to the diff page with specific versions
diff_link = {'href': url_for('ui.ui_views.diff_history_page',
uuid=watch['uuid'],
from_version=timestamp_from,
to_version=timestamp_to,
_external=True)}
fe.link(link=diff_link)
# Format the date using locale-aware formatting with timezone
dt = datetime.datetime.fromtimestamp(int(timestamp_to))
dt = dt.replace(tzinfo=pytz.UTC)
# Get local timezone-aware datetime
local_tz = datetime.datetime.now().astimezone().tzinfo
local_dt = dt.astimezone(local_tz)
# Format date with timezone - using strftime for locale awareness
try:
formatted_date = local_dt.strftime('%Y-%m-%d %H:%M:%S %Z')
except:
# Fallback if locale issues
formatted_date = local_dt.isoformat()
# Use formatted date in title instead of "Change 1, 2, 3"
fe.title(title=f"{watch_label} - Change @ {formatted_date}")
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
# Use the timestamp of the "to" snapshot for pubDate
fe.pubDate(dt)
# Add categories based on watch tags
for tag_uuid in watch.get('tags', []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if tag:
tag_title = tag.get('title', '')
if tag_title:
fe.category(term=tag_title)
except (IndexError, FileNotFoundError) as e:
# Skip this diff if we can't generate it
continue
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
return response

View File

@@ -0,0 +1,94 @@
from flask import make_response, request, url_for
from feedgen.feed import FeedGenerator
import datetime
import pytz
from ._util import generate_watch_guid, generate_watch_diff_content
def construct_tag_routes(rss_blueprint, datastore):
"""
Construct RSS feed routes for tags.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
@rss_blueprint.route("/tag/<string:tag_uuid>", methods=['GET'])
def rss_tag_feed(tag_uuid):
"""
Display an RSS feed for all unviewed watches that belong to a specific tag.
Returns RSS XML with entries for each unviewed watch with sufficient history.
"""
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
# Verify tag exists
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if not tag:
return f"Tag with UUID {tag_uuid} not found", 404
tag_title = tag.get('title', 'Unknown Tag')
# Create RSS feed
fg = FeedGenerator()
fg.title(f'changedetection.io - {tag_title}')
fg.description(f'Changes for watches tagged with {tag_title}')
fg.link(href='https://changedetection.io')
# Find all watches with this tag
for uuid, watch in datastore.data['watching'].items():
# Skip if watch doesn't have this tag
if tag_uuid not in watch.get('tags', []):
continue
# Skip muted watches if configured
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
# Check if watch has at least 2 history snapshots
dates = list(watch.history.keys())
if len(dates) < 2:
continue
# Only include unviewed watches
if not watch.viewed:
# Add uuid to watch for proper functioning
watch['uuid'] = uuid
# Generate GUID for this entry
guid = generate_watch_guid(watch)
fe = fg.add_entry()
# Include a link to the diff page
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
fe.link(link=diff_link)
# Generate diff content
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
fe.title(title=watch_label)
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)
# Add categories based on watch tags
for tag_uuid in watch.get('tags', []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if tag:
tag_title = tag.get('title', '')
if tag_title:
fe.category(term=tag_title)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
return response

View File

@@ -24,6 +24,7 @@
<li class="tab"><a href="#filters">Global Filters</a></li>
<li class="tab"><a href="#ui-options">UI Options</a></li>
<li class="tab"><a href="#api">API</a></li>
<li class="tab"><a href="#rss">RSS</a></li>
<li class="tab"><a href="#timedate">Time &amp Date</a></li>
<li class="tab"><a href="#proxies">CAPTCHA &amp; Proxies</a></li>
</ul>
@@ -65,26 +66,13 @@
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.shared_diff_access, class="shared_diff_access") }}
<span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page)
<span class="pure-form-message-inline">Allow access to the watch change history page when password is enabled (Good for sharing the diff page)
</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
</div>
<div class="grey-form-border">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">When watching RSS/Atom feeds, convert them into clean text for better change detection.</span>
</div>
</div>
</fieldset>
</div>
@@ -230,6 +218,24 @@ nav
</p>
</div>
</div>
<div class="tab-pane-inner" id="rss">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_diff_length) }}
<span class="pure-form-message-inline">Maximum number of history snapshots to include in the watch specific RSS feed.</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">For watching other RSS feeds - When watching RSS/Atom feeds, convert them into clean text for better change detection.</span>
</div>
</div>
<div class="tab-pane-inner" id="timedate">
<div class="pure-control-group">
Ensure the settings below are correct, they are used to manage the time schedule for checking your web page watches.

View File

@@ -21,9 +21,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
tag_count = Counter(tag for watch in datastore.data['watching'].values() if watch.get('tags') for tag in watch['tags'])
output = render_template("groups-overview.html",
app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
available_tags=sorted_tags,
form=add_form,
tag_count=tag_count
tag_count=tag_count,
)
return output
@@ -149,9 +150,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
included_content = template.render(**template_args)
output = render_template("edit-tag.html",
settings_application=datastore.data['settings']['application'],
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
extra_form_content=included_content,
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
settings_application=datastore.data['settings']['application'],
**template_args
)

View File

@@ -52,6 +52,7 @@
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">Edit</a>&nbsp;
<a class="pure-button pure-button-primary" href="{{ url_for('tags.delete', uuid=uuid) }}" title="Deletes and removes tag">Delete</a>
<a class="pure-button pure-button-primary" href="{{ url_for('tags.unlink', uuid=uuid) }}" title="Keep the tag but unlink any watches">Unlink</a>
<a href="{{ url_for('rss.rss_tag_feed', tag_uuid=uuid, token=app_rss_token)}}"><img alt="RSS Feed for this watch" style="padding-left: 1em;" src="{{url_for('static_content', group='images', filename='generic_feed-icon.svg')}}" height="15"></a>
</td>
</tr>
{% endfor %}

View File

@@ -1009,8 +1009,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('RSS reader mode ', default=False, validators=[validators.Optional()])
rss_diff_length = IntegerField(label='Number of changes to show in watch RSS feed',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0, message="Should contain zero or more attempts")])
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
render_kw={"style": "width: 5em;"},

View File

@@ -55,6 +55,7 @@ class model(dict):
'render_anchor_tag_content': False,
'rss_access_token': None,
'rss_content_format': RSS_CONTENT_FORMAT_DEFAULT,
'rss_diff_length': 5,
'rss_hide_muted_watches': True,
'rss_reader_mode': False,
'scheduler_timezone_default': None, # Default IANA timezone name

View File

@@ -314,3 +314,40 @@ def test_rss_single_watch_feed(client, live_server, measure_memory_usage, datast
item = root.findall('.//item')[0].findtext('description')
check_formatting(expected_type=k, content=item, url=test_url)
# Test RSS entry order: Create multiple versions and verify newest appears first
for version in range(3, 6): # Create versions 3, 4, 5
set_html_content(datastore_path, f"Version {version} content")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(0.5) # Small delay to ensure different timestamps
# Fetch RSS feed again to verify order
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 200
# Parse RSS and check order (newest first)
root = ET.fromstring(res.data)
items = root.findall('.//item')
assert len(items) >= 3, f"Expected at least 3 items, got {len(items)}"
# Get descriptions from first 3 items
descriptions = []
for item in items[:3]:
desc = item.findtext('description')
descriptions.append(desc if desc else "")
# First item should contain newest change (Version 5)
assert b"Version 5" in descriptions[0].encode() or "Version 5" in descriptions[0], \
f"First item should show newest change (Version 5), but got: {descriptions[0][:200]}"
# Second item should contain Version 4
assert b"Version 4" in descriptions[1].encode() or "Version 4" in descriptions[1], \
f"Second item should show Version 4, but got: {descriptions[1][:200]}"
# Third item should contain Version 3
assert b"Version 3" in descriptions[2].encode() or "Version 3" in descriptions[2], \
f"Third item should show Version 3, but got: {descriptions[2][:200]}"

View File

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
import os
def set_original_response(datastore_path):
test_return_data = """<html>
<body>
Some initial text<br>
<p>Watch 1 content</p>
<p>Watch 2 content</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def set_modified_response(datastore_path):
test_return_data = """<html>
<body>
Some initial text<br>
<p>Watch 1 content MODIFIED</p>
<p>Watch 2 content CHANGED</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a specific tag/group shows only watches in that group
and displays changes correctly.
"""
set_original_response(datastore_path=datastore_path)
# Create a tag/group
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "test-rss-group"},
follow_redirects=True
)
assert b"Tag added" in res.data
assert b"test-rss-group" in res.data
# Get the tag UUID
tag_uuid = get_UUID_for_tag_name(client, name="test-rss-group")
assert tag_uuid is not None
# Add first watch with the tag
test_url_1 = url_for('test_endpoint', _external=True) + "?watch=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_1, "tags": 'test-rss-group'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Add second watch with the tag
test_url_2 = url_for('test_endpoint', _external=True) + "?watch=2"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_2, "tags": 'test-rss-group'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Add a third watch WITHOUT the tag (should not appear in RSS)
test_url_3 = url_for('test_endpoint', _external=True) + "?watch=3"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_3, "tags": 'other-tag'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Wait for initial checks to complete
wait_for_all_checks(client)
# Trigger a change
set_modified_response(datastore_path=datastore_path)
# Recheck all watches
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
assert rss_token is not None
# Request RSS feed for the specific tag/group using the new endpoint
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Verify response is successful
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Verify the RSS feed contains the tag name in the title
assert b"test-rss-group" in res.data
# Verify watch 1 and watch 2 are in the RSS feed (they have the tag)
assert b"watch=1" in res.data
assert b"watch=2" in res.data
# Verify watch 3 is NOT in the RSS feed (it doesn't have the tag)
assert b"watch=3" not in res.data
# Verify the changes are shown in the RSS feed
assert b"MODIFIED" in res.data or b"CHANGED" in res.data
# Verify it's actual RSS/XML format
assert b"<rss" in res.data or b"<feed" in res.data
# Test with invalid tag UUID - should return 404
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid="invalid-uuid-12345", token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 404
assert b"not found" in res.data
# Test with invalid token - should return 403
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token="wrong-token", _external=True),
follow_redirects=True
)
assert res.status_code == 403
assert b"Access denied" in res.data
# Clean up
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
def test_rss_group_empty_tag(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a tag with no watches returns valid but empty RSS.
"""
# Create a tag with no watches
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "empty-tag"},
follow_redirects=True
)
assert b"Tag added" in res.data
tag_uuid = get_UUID_for_tag_name(client, name="empty-tag")
assert tag_uuid is not None
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed for empty tag
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Should still return 200 with valid RSS
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
assert b"empty-tag" in res.data
# Clean up
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a tag only shows unviewed watches.
"""
set_original_response(datastore_path=datastore_path)
# Create a tag
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "unviewed-test"},
follow_redirects=True
)
assert b"Tag added" in res.data
tag_uuid = get_UUID_for_tag_name(client, name="unviewed-test")
# Add two watches with the tag
test_url_1 = url_for('test_endpoint', _external=True) + "?unviewed=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_1, "tags": 'unviewed-test'},
follow_redirects=True
)
assert b"Watch added" in res.data
test_url_2 = url_for('test_endpoint', _external=True) + "?unviewed=2"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_2, "tags": 'unviewed-test'},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
# Trigger changes
set_modified_response(datastore_path=datastore_path)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed - should show both watches (both unviewed)
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
assert b"unviewed=1" in res.data
assert b"unviewed=2" in res.data
# Mark all as viewed
res = client.get(url_for('ui.mark_all_viewed'), follow_redirects=True)
wait_for_all_checks(client)
# Request RSS feed again - should be empty now (no unviewed watches)
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
# Should not contain the watch URLs anymore since they're viewed
assert b"unviewed=1" not in res.data
assert b"unviewed=2" not in res.data
# Clean up
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data

View File

@@ -0,0 +1,245 @@
#!/usr/bin/env python3
import time
import os
import xml.etree.ElementTree as ET
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, extract_UUID_from_client, delete_all_watches
def test_rss_single_watch_order(client, live_server, measure_memory_usage, datastore_path):
"""
Test that single watch RSS feed shows changes in correct order (newest first).
"""
# Create initial content
def set_response(datastore_path, version):
test_return_data = f"""<html>
<body>
<p>Version {version} content</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
# Start with version 1
set_response(datastore_path, 1)
# Add a watch
test_url = url_for('test_endpoint', _external=True) + "?order_test=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": 'test-tag'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Get the watch UUID
watch_uuid = extract_UUID_from_client(client)
# Wait for initial check
wait_for_all_checks(client)
# Create multiple versions by triggering changes
for version in range(2, 6): # Create versions 2, 3, 4, 5
set_response(datastore_path, version)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(0.5) # Small delay to ensure different timestamps
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed for the single watch
res = client.get(
url_for("rss.rss_single_watch", uuid=watch_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Should return valid RSS
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Parse the RSS/XML
root = ET.fromstring(res.data)
# Find all items (RSS 2.0) or entries (Atom)
items = root.findall('.//item')
if not items:
items = root.findall('.//{http://www.w3.org/2005/Atom}entry')
# Should have multiple items
assert len(items) >= 3, f"Expected at least 3 items, got {len(items)}"
# Get the descriptions/content from first 3 items
descriptions = []
for item in items[:3]:
# Try RSS format first
desc = item.findtext('description')
if not desc:
# Try Atom format
content_elem = item.find('{http://www.w3.org/2005/Atom}content')
if content_elem is not None:
desc = content_elem.text
descriptions.append(desc if desc else "")
print(f"First item content: {descriptions[0][:100] if descriptions[0] else 'None'}")
print(f"Second item content: {descriptions[1][:100] if descriptions[1] else 'None'}")
print(f"Third item content: {descriptions[2][:100] if descriptions[2] else 'None'}")
# The FIRST item should contain the NEWEST change (Version 5)
# The SECOND item should contain Version 4
# The THIRD item should contain Version 3
assert b"Version 5" in descriptions[0].encode() or "Version 5" in descriptions[0], \
f"First item should show newest change (Version 5), but got: {descriptions[0][:200]}"
# Verify the order is correct
assert b"Version 4" in descriptions[1].encode() or "Version 4" in descriptions[1], \
f"Second item should show Version 4, but got: {descriptions[1][:200]}"
assert b"Version 3" in descriptions[2].encode() or "Version 3" in descriptions[2], \
f"Third item should show Version 3, but got: {descriptions[2][:200]}"
# Clean up
delete_all_watches(client)
def test_rss_categories_from_tags(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feeds include category tags from watch tags.
"""
# Create initial content
test_return_data = """<html>
<body>
<p>Test content for RSS categories</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
# Create some tags first
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Security"},
follow_redirects=True
)
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Python"},
follow_redirects=True
)
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Tech News"},
follow_redirects=True
)
# Add a watch with tags
test_url = url_for('test_endpoint', _external=True) + "?category_test=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": "Security, Python, Tech News"},
follow_redirects=True
)
assert b"Watch added" in res.data
# Get the watch UUID
watch_uuid = extract_UUID_from_client(client)
# Wait for initial check
wait_for_all_checks(client)
# Trigger one change
test_return_data_v2 = """<html>
<body>
<p>Updated content for RSS categories</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data_v2)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Test 1: Check single watch RSS feed
res = client.get(
url_for("rss.rss_single_watch", uuid=watch_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Parse the RSS/XML
root = ET.fromstring(res.data)
# Find all items
items = root.findall('.//item')
assert len(items) >= 1, "Expected at least 1 item in RSS feed"
# Get categories from first item
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in single watch RSS: {categories}")
# Should have all three categories
assert "Security" in categories, f"Expected 'Security' category, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category, got: {categories}"
assert len(categories) == 3, f"Expected 3 categories, got {len(categories)}: {categories}"
# Test 2: Check main RSS feed
res = client.get(
url_for("rss.feed", token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
root = ET.fromstring(res.data)
items = root.findall('.//item')
assert len(items) >= 1, "Expected at least 1 item in main RSS feed"
# Get categories from first item in main feed
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in main RSS feed: {categories}")
# Should have all three categories
assert "Security" in categories, f"Expected 'Security' category in main feed, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category in main feed, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category in main feed, got: {categories}"
# Test 3: Check tag-specific RSS feed (should also have categories)
# Get the tag UUID for "Security" and verify the tag feed also has categories
from .util import get_UUID_for_tag_name
security_tag_uuid = get_UUID_for_tag_name(client, name="Security")
if security_tag_uuid:
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=security_tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
root = ET.fromstring(res.data)
items = root.findall('.//item')
if len(items) >= 1:
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in tag RSS feed: {categories}")
# Should still have all three categories
assert "Security" in categories, f"Expected 'Security' category in tag feed, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category in tag feed, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category in tag feed, got: {categories}"
# Clean up
delete_all_watches(client)