Compare commits

...

23 Commits

Author SHA1 Message Date
dgtlmoon
ff9b4fc32c Adding tests, adding group/tag name to RSS <category> 2025-11-13 20:07:03 +01:00
dgtlmoon
9a6a131985 Tidyups for single watch feed 2025-11-13 19:59:54 +01:00
dgtlmoon
629f939224 RSS Feed per watch - Setting order (newest changes first) (#3634) 2025-11-13 19:44:22 +01:00
dgtlmoon
48299e5738 UI - Moving 'RSS' options to its own settings tab, RSS - Adding watch history length (#3633) 2025-11-13 19:20:03 +01:00
dgtlmoon
5b1b70b8ab RSS per group! (#3632) 2025-11-13 18:56:04 +01:00
dgtlmoon
678d568b37 UI - Move 'Jitter seconds' settings tab from "General" to "Fetching" global Settings. 2025-11-13 17:34:57 +01:00
John Eismeier
fb15b62fb9 README typo fix and ignore files for emacs style backups
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
2025-11-12 21:51:06 +01:00
dgtlmoon
8dc39d4a3d RSS feeds for a single watches!
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-12 17:38:18 +01:00
dgtlmoon
805cd618d4 Always backup JSON DB on new versions as well as the existing between updates. 2025-11-12 17:37:12 +01:00
dgtlmoon
4ba5fcce8f 0.50.43
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-12 13:00:42 +01:00
dgtlmoon
b9305faf21 Forcing UTF-8 when reading JSON DB (Fixes data not loaded for some platforms #3622 #3611 #3628), Always create new versions of the backup DB if one exists for that step when running updates, Adding extra sanity checks on DB load 2025-11-12 12:58:59 +01:00
dgtlmoon
3d3b53831e Adding data sanity checks across restarts (#3629) 2025-11-12 12:19:16 +01:00
dgtlmoon
2ae29ab78f 0.50.42
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-10 13:32:08 +01:00
dgtlmoon
caffd804fe Revert "Windows - JSON DB fixes - Forcing utf-8 for json DB read/writes should solve windows saving/loading problems. (#3615 #3611)"
This reverts commit e2b407c6f3.
2025-11-10 13:31:51 +01:00
dgtlmoon
c58a97f69d 0.50.41
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
2025-11-10 11:59:41 +01:00
dgtlmoon
e2b407c6f3 Windows - JSON DB fixes - Forcing utf-8 for json DB read/writes should solve windows saving/loading problems. (#3615 #3611) 2025-11-10 11:59:17 +01:00
dependabot[bot]
d65a2c784d Update orjson requirement from ~=3.10 to ~=3.11 (#3617) 2025-11-10 11:26:48 +01:00
dgtlmoon
9bc812a167 0.50.40
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-11-07 13:21:22 +01:00
dgtlmoon
fd2080567d Page <title> should only be captured on HTML documents (#3608) 2025-11-07 11:51:10 +01:00
dgtlmoon
969c75e7be Notification body/title - Fixing validation on empty strings #3606 (#3607) 2025-11-07 11:42:57 +01:00
dgtlmoon
4b14cec5f4 Real time UI - Remove polling thread for updates - it's all done realtime by signals (#3603)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
2025-11-05 21:49:17 +01:00
dgtlmoon
a8d5ea067d Watch history - Don't rescan whole history.txt when looking up a timestamp <->filepath (#3602) 2025-11-05 18:50:27 +01:00
dgtlmoon
2f6873f7d5 Datastore - Use orjson for faster saves (#3601)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
2025-11-05 15:12:11 +01:00
45 changed files with 1473 additions and 413 deletions

1
.gitignore vendored
View File

@@ -21,6 +21,7 @@ venv/
# IDEs
.idea
.vscode/settings.json
*~
# Datastore files
datastore/

View File

@@ -14,7 +14,7 @@ Ideal for monitoring price changes, content edits, conditional changes and more.
[<img src="https://raw.githubusercontent.com/dgtlmoon/changedetection.io/master/docs/screenshot.png" style="max-width:100%;" alt="Self-hosted web page change monitoring, list of websites with changes" title="Self-hosted web page change monitoring, list of websites with changes" />](https://changedetection.io)
[**Don't have time? Try our extremely affordable subscription use our proxies and support!**](https://changedetection.io)
[**Don't have time? Try our extremely affordable subscription use our proxies and support!**](https://changedetection.io)
@@ -31,7 +31,7 @@ Available when connected to a <a href="https://github.com/dgtlmoon/changedetecti
### Perform interactive browser steps
Fill in text boxes, click buttons and more, setup your changedetection scenario.
Fill in text boxes, click buttons and more, setup your changedetection scenario.
Using the **Browser Steps** configuration, add basic steps before performing change detection, such as logging into websites, adding a product to a cart, accept cookie logins, entering dates and refining searches.
@@ -54,7 +54,7 @@ Requires Playwright to be enabled.
- Know when your favourite whiskey is on sale, or other special deals are announced before anyone else
- COVID related news from government websites
- University/organisation news from their website
- Detect and monitor changes in JSON API responses
- Detect and monitor changes in JSON API responses
- JSON API monitoring and alerting
- Changes in legal and other documents
- Trigger API calls via notifications when text appears on a website
@@ -86,7 +86,7 @@ _Need an actual Chrome runner with Javascript support? We support fetching via W
We [recommend and use Bright Data](https://brightdata.grsm.io/n0r16zf7eivq) global proxy services, Bright Data will match any first deposit up to $100 using our signup link.
[Oxylabs](https://oxylabs.go2cloud.org/SH2d) is also an excellent proxy provider and well worth using, they offer Residental, ISP, Rotating and many other proxy types to suit your project.
[Oxylabs](https://oxylabs.go2cloud.org/SH2d) is also an excellent proxy provider and well worth using, they offer Residential, ISP, Rotating and many other proxy types to suit your project.
Please :star: star :star: this project and help it grow! https://github.com/dgtlmoon/changedetection.io/
@@ -106,4 +106,3 @@ $ changedetection.io -d /path/to/empty/data/dir -p 5000
Then visit http://127.0.0.1:5000 , You should now be able to access the UI.
See https://changedetection.io for more information.

View File

@@ -64,7 +64,7 @@ def count_words_in_history(watch):
return 0
latest_key = list(watch.history.keys())[-1]
latest_content = watch.get_history_snapshot(latest_key)
latest_content = watch.get_history_snapshot(timestamp=latest_key)
return len(latest_content.split())
except Exception as e:
logger.error(f"Error counting words: {str(e)}")

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.39'
__version__ = '0.50.43'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError
@@ -74,6 +74,12 @@ def main():
datastore_path = None
do_cleanup = False
# Optional URL to watch since start
default_url = None
# Set a default logger level
logger_level = 'DEBUG'
include_default_watches = True
host = os.environ.get("LISTEN_HOST", "0.0.0.0").strip()
port = int(os.environ.get('PORT', 5000))
ssl_mode = False
@@ -87,15 +93,13 @@ def main():
datastore_path = os.path.join(os.getcwd(), "../datastore")
try:
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:", "port")
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:u:", "port")
except getopt.GetoptError:
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -u [default URL to watch] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
sys.exit(2)
create_datastore_dir = False
# Set a default logger level
logger_level = 'DEBUG'
# Set a logger level via shell env variable
# Used: Dockerfile for CICD
# To set logger level for pytest, see the app function in tests/conftest.py
@@ -116,6 +120,10 @@ def main():
if opt == '-d':
datastore_path = arg
if opt == '-u':
default_url = arg
include_default_watches = False
# Cleanup (remove text files that arent in the index)
if opt == '-c':
do_cleanup = True
@@ -172,13 +180,16 @@ def main():
sys.exit(2)
try:
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__)
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__, include_default_watches=include_default_watches)
except JSONDecodeError as e:
# Dont' start if the JSON DB looks corrupt
logger.critical(f"ERROR: JSON DB or Proxy List JSON at '{app_config['datastore_path']}' appears to be corrupt, aborting.")
logger.critical(str(e))
return
if default_url:
datastore.add_watch(url = default_url)
app = changedetection_app(app_config, datastore)
# Get the SocketIO instance from the Flask app (created in flask_app.py)

View File

@@ -175,7 +175,7 @@ class WatchSingleHistory(Resource):
response = make_response("No content found", 404)
response.mimetype = "text/plain"
else:
content = watch.get_history_snapshot(timestamp)
content = watch.get_history_snapshot(timestamp=timestamp)
response = make_response(content, 200)
response.mimetype = "text/plain"

View File

@@ -41,7 +41,7 @@ def get_openapi_spec():
# Possibly for pip3 packages
spec_path = os.path.join(os.path.dirname(__file__), '../docs/api-spec.yaml')
with open(spec_path, 'r') as f:
with open(spec_path, 'r', encoding='utf-8') as f:
spec_dict = yaml.safe_load(f)
_openapi_spec = OpenAPI.from_dict(spec_dict)
return _openapi_spec

View File

@@ -353,12 +353,15 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
count = watch.get('check_count', 0) + 1
# Always record page title (used in notifications, and can change even when the content is the same)
try:
page_title = html_tools.extract_title(data=update_handler.fetcher.content)
logger.debug(f"UUID: {uuid} Page <title> is '{page_title}'")
datastore.update_watch(uuid=uuid, update_obj={'page_title': page_title})
except Exception as e:
logger.warning(f"UUID: {uuid} Exception when extracting <title> - {str(e)}")
if update_obj.get('content-type') and 'html' in update_obj.get('content-type'):
try:
page_title = html_tools.extract_title(data=update_handler.fetcher.content)
if page_title:
page_title = page_title.strip()[:2000]
logger.debug(f"UUID: {uuid} Page <title> is '{page_title}'")
datastore.update_watch(uuid=uuid, update_obj={'page_title': page_title})
except Exception as e:
logger.warning(f"UUID: {uuid} Exception when extracting <title> - {str(e)}")
# Record server header
try:

View File

@@ -1 +1,17 @@
RSS_FORMAT_TYPES = [('plaintext', 'Plain text'), ('html', 'HTML Color')]
from copy import deepcopy
from loguru import logger
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
from changedetectionio.notification import valid_notification_formats
RSS_CONTENT_FORMAT_DEFAULT = 'text'
# Some stuff not related
RSS_FORMAT_TYPES = deepcopy(valid_notification_formats)
if RSS_FORMAT_TYPES.get('markdown'):
del RSS_FORMAT_TYPES['markdown']
if RSS_FORMAT_TYPES.get(USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH):
del RSS_FORMAT_TYPES[USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH]
if not RSS_FORMAT_TYPES.get(RSS_CONTENT_FORMAT_DEFAULT):
logger.critical(f"RSS_CONTENT_FORMAT_DEFAULT not in the acceptable list {RSS_CONTENT_FORMAT_DEFAULT}")

View File

@@ -0,0 +1,97 @@
"""
Utility functions for RSS feed generation.
"""
from changedetectionio.jinja2_custom import render as jinja_render
from changedetectionio.notification.handler import apply_service_tweaks
from loguru import logger
import re
BAD_CHARS_REGEX = r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
def scan_invalid_chars_in_rss(content):
"""
Scan for invalid characters in RSS content.
Returns True if invalid characters are found.
"""
for match in re.finditer(BAD_CHARS_REGEX, content):
i = match.start()
bad_char = content[i]
hex_value = f"0x{ord(bad_char):02x}"
# Grab context
start = max(0, i - 20)
end = min(len(content), i + 21)
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
# First match is enough
return True
return False
def clean_entry_content(content):
"""
Remove invalid characters from RSS content.
"""
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
return cleaned
def generate_watch_guid(watch):
"""
Generate a unique GUID for a watch RSS entry.
"""
return f"{watch['uuid']}/{watch.last_changed}"
def generate_watch_diff_content(watch, dates, rss_content_format, datastore, date_index_from=-2, date_index_to=-1):
"""
Generate HTML diff content for a watch given its history dates.
Returns tuple of (content, watch_label).
Args:
watch: The watch object
dates: List of history snapshot dates
rss_content_format: Format for RSS content (html or text)
datastore: The ChangeDetectionStore instance
date_index_from: Index of the "from" date in the dates list (default: -2)
date_index_to: Index of the "to" date in the dates list (default: -1)
Returns:
Tuple of (content, watch_label) - the rendered HTML content and watch label
"""
from changedetectionio import diff
# Same logic as watch-overview.html
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
watch_label = watch.label
else:
watch_label = watch.get('url')
try:
html_diff = diff.render_diff(
previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[date_index_from]),
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[date_index_to]),
include_equal=False
)
requested_output_format = datastore.data['settings']['application'].get('rss_content_format')
url, html_diff, n_title = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
except FileNotFoundError as e:
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
# @note: We use <pre> because nearly all RSS readers render only HTML (Thunderbird for example cant do just plaintext)
rss_template = "<pre>{{watch_label}} had a change.\n\n{{html_diff}}\n</pre>"
if 'html' in rss_content_format:
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_label}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
content = jinja_render(template_str=rss_template, watch_label=watch_label, html_diff=html_diff, watch_url=watch.link)
# Out of range chars could also break feedgen
if scan_invalid_chars_in_rss(content):
content = clean_entry_content(content)
return content, watch_label

View File

@@ -1,155 +1,26 @@
from changedetectionio.jinja2_custom import render as jinja_render
from changedetectionio.notification.handler import apply_service_tweaks
from changedetectionio.store import ChangeDetectionStore
from feedgen.feed import FeedGenerator
from flask import Blueprint, make_response, request, url_for, redirect
from loguru import logger
import datetime
import pytz
import re
import time
from flask import Blueprint
BAD_CHARS_REGEX=r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
# Anything that is not text/UTF-8 should be stripped before it breaks feedgen (such as binary data etc)
def scan_invalid_chars_in_rss(content):
for match in re.finditer(BAD_CHARS_REGEX, content):
i = match.start()
bad_char = content[i]
hex_value = f"0x{ord(bad_char):02x}"
# Grab context
start = max(0, i - 20)
end = min(len(content), i + 21)
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
# First match is enough
return True
return False
def clean_entry_content(content):
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
return cleaned
from . import tag as tag_routes
from . import main_feed
from . import single_watch
def construct_blueprint(datastore: ChangeDetectionStore):
"""
Construct and configure the RSS blueprint with all routes.
Args:
datastore: The ChangeDetectionStore instance
Returns:
The configured Flask blueprint
"""
rss_blueprint = Blueprint('rss', __name__)
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
# to some earlier blueprint rerouting work, it should goto feed.
@rss_blueprint.route("/", methods=['GET'])
def extraslash():
return redirect(url_for('rss.feed'))
# Import the login decorator if needed
# from changedetectionio.auth_decorator import login_optionally_required
@rss_blueprint.route("", methods=['GET'])
def feed():
now = time.time()
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
from changedetectionio import diff
limit_tag = request.args.get('tag', '').lower().strip()
# Be sure limit_tag is a uuid
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
if limit_tag == tag.get('title', '').lower().strip():
limit_tag = uuid
# Sort by last_changed and add the uuid which is usually the key..
sorted_watches = []
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
for uuid, watch in datastore.data['watching'].items():
# @todo tag notification_muted skip also (improve Watch model)
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
if limit_tag and not limit_tag in watch['tags']:
continue
watch['uuid'] = uuid
sorted_watches.append(watch)
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
fg = FeedGenerator()
fg.title('changedetection.io')
fg.description('Feed description')
fg.link(href='https://changedetection.io')
html_colour_enable = False
if datastore.data['settings']['application'].get('rss_content_format') == 'html':
html_colour_enable = True
for watch in sorted_watches:
dates = list(watch.history.keys())
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
if len(dates) < 2:
continue
if not watch.viewed:
# Re #239 - GUID needs to be individual for each event
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
guid = "{}/{}".format(watch['uuid'], watch.last_changed)
fe = fg.add_entry()
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
# Description is the page you watch, link takes you to the diff JS UI page
# Dict val base_url will get overriden with the env var if it is set.
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
# @todo fix
# Because we are called via whatever web server, flask should figure out the right path (
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
fe.link(link=diff_link)
# Same logic as watch-overview.html
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
watch_label = watch.label
else:
watch_label = watch.get('url')
fe.title(title=watch_label)
try:
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(dates[-2]),
newest_version_file_contents=watch.get_history_snapshot(dates[-1]),
include_equal=False,
line_feed_sep="<br>"
)
requested_output_format = 'htmlcolor' if html_colour_enable else 'html'
html_diff = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
except FileNotFoundError as e:
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
# @todo Make this configurable and also consider html-colored markup
# @todo User could decide if <link> goes to the diff page, or to the watch link
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
content = jinja_render(template_str=rss_template, watch_title=watch_label, html_diff=html_diff, watch_url=watch.link)
# Out of range chars could also break feedgen
if scan_invalid_chars_in_rss(content):
content = clean_entry_content(content)
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
return response
# Register all route modules
main_feed.construct_main_feed_routes(rss_blueprint, datastore)
single_watch.construct_single_watch_routes(rss_blueprint, datastore)
tag_routes.construct_tag_routes(rss_blueprint, datastore)
return rss_blueprint

View File

@@ -0,0 +1,109 @@
from flask import make_response, request, url_for, redirect
from feedgen.feed import FeedGenerator
from loguru import logger
import datetime
import pytz
import time
from ._util import generate_watch_guid, generate_watch_diff_content
def construct_main_feed_routes(rss_blueprint, datastore):
"""
Construct the main RSS feed routes.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
# to some earlier blueprint rerouting work, it should goto feed.
@rss_blueprint.route("/", methods=['GET'])
def extraslash():
return redirect(url_for('rss.feed'))
# Import the login decorator if needed
# from changedetectionio.auth_decorator import login_optionally_required
@rss_blueprint.route("", methods=['GET'])
def feed():
now = time.time()
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
limit_tag = request.args.get('tag', '').lower().strip()
# Be sure limit_tag is a uuid
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
if limit_tag == tag.get('title', '').lower().strip():
limit_tag = uuid
# Sort by last_changed and add the uuid which is usually the key..
sorted_watches = []
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
for uuid, watch in datastore.data['watching'].items():
# @todo tag notification_muted skip also (improve Watch model)
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
if limit_tag and not limit_tag in watch['tags']:
continue
watch['uuid'] = uuid
sorted_watches.append(watch)
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
fg = FeedGenerator()
fg.title('changedetection.io')
fg.description('Feed description')
fg.link(href='https://changedetection.io')
for watch in sorted_watches:
dates = list(watch.history.keys())
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
if len(dates) < 2:
continue
if not watch.viewed:
# Re #239 - GUID needs to be individual for each event
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
guid = generate_watch_guid(watch)
fe = fg.add_entry()
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
# Description is the page you watch, link takes you to the diff JS UI page
# Dict val base_url will get overriden with the env var if it is set.
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
# @todo fix
# Because we are called via whatever web server, flask should figure out the right path (
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
fe.link(link=diff_link)
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
fe.title(title=watch_label)
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)
# Add categories based on watch tags
for tag_uuid in watch.get('tags', []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if tag:
tag_title = tag.get('title', '')
if tag_title:
fe.category(term=tag_title)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
return response

View File

@@ -0,0 +1,159 @@
from flask import make_response, request, url_for
from feedgen.feed import FeedGenerator
import datetime
import pytz
import locale
from ._util import generate_watch_guid, generate_watch_diff_content
def construct_single_watch_routes(rss_blueprint, datastore):
"""
Construct RSS feed routes for single watches.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
@rss_blueprint.route("/watch/<string:uuid>", methods=['GET'])
def rss_single_watch(uuid):
"""
Display the most recent changes for a single watch as RSS feed.
Returns RSS XML with multiple entries showing diffs between consecutive snapshots.
The number of entries is controlled by the rss_diff_length setting.
"""
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
# Get the watch by UUID
watch = datastore.data['watching'].get(uuid)
if not watch:
return f"Watch with UUID {uuid} not found", 404
# Check if watch has at least 2 history snapshots
dates = list(watch.history.keys())
if len(dates) < 2:
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
# Add uuid to watch for proper functioning
watch['uuid'] = uuid
# Get the number of diffs to include (default: 5)
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
# Calculate how many diffs we can actually show (limited by available history)
# We need at least 2 snapshots to create 1 diff
max_possible_diffs = len(dates) - 1
num_diffs = min(rss_diff_length, max_possible_diffs) if rss_diff_length > 0 else max_possible_diffs
# Create RSS feed
fg = FeedGenerator()
# Set title: use "label (url)" if label differs from url, otherwise just url
watch_url = watch.get('url', '')
watch_label = watch.label
if watch_label and watch_label != watch_url:
feed_title = f'changedetection.io - {watch_label} ({watch_url})'
else:
feed_title = f'changedetection.io - {watch_url}'
fg.title(feed_title)
fg.description('Changes')
fg.link(href='https://changedetection.io')
# Loop through history and create RSS entries for each diff
# Add entries in reverse order because feedgen reverses them
# This way, the newest change appears first in the final RSS
for i in range(num_diffs - 1, -1, -1):
# Calculate indices for this diff (working backwards from newest)
# i=0: compare dates[-2] to dates[-1] (most recent change)
# i=1: compare dates[-3] to dates[-2] (previous change)
# etc.
date_index_to = -(i + 1)
date_index_from = -(i + 2)
try:
# Generate the diff content for this pair of snapshots
timestamp_to = dates[date_index_to]
timestamp_from = dates[date_index_from]
content, watch_label = generate_watch_diff_content(
watch, dates, rss_content_format, datastore,
date_index_from=date_index_from,
date_index_to=date_index_to
)
# Generate edit watch link and add to content
edit_watch_url = url_for('ui.ui_edit.edit_page',
uuid=watch['uuid'],
_external=True)
# Add edit watch links at top and bottom of content
if 'html' in rss_content_format:
edit_link_html = f'<p><a href="{edit_watch_url}">[edit watch]</a></p>'
# Insert after <body> and before </body>
content = content.replace('<body>', f'<body>\n{edit_link_html}', 1)
content = content.replace('</body>', f'{edit_link_html}\n</body>', 1)
else:
# For plain text format, add plain text links in separate <pre> blocks
edit_link_top = f'<pre>[edit watch] {edit_watch_url}</pre>\n'
edit_link_bottom = f'\n<pre>[edit watch] {edit_watch_url}</pre>'
content = edit_link_top + content + edit_link_bottom
# Create a unique GUID for this specific diff
guid = f"{watch['uuid']}/{timestamp_to}"
fe = fg.add_entry()
# Include a link to the diff page with specific versions
diff_link = {'href': url_for('ui.ui_views.diff_history_page',
uuid=watch['uuid'],
from_version=timestamp_from,
to_version=timestamp_to,
_external=True)}
fe.link(link=diff_link)
# Format the date using locale-aware formatting with timezone
dt = datetime.datetime.fromtimestamp(int(timestamp_to))
dt = dt.replace(tzinfo=pytz.UTC)
# Get local timezone-aware datetime
local_tz = datetime.datetime.now().astimezone().tzinfo
local_dt = dt.astimezone(local_tz)
# Format date with timezone - using strftime for locale awareness
try:
formatted_date = local_dt.strftime('%Y-%m-%d %H:%M:%S %Z')
except:
# Fallback if locale issues
formatted_date = local_dt.isoformat()
# Use formatted date in title instead of "Change 1, 2, 3"
fe.title(title=f"{watch_label} - Change @ {formatted_date}")
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
# Use the timestamp of the "to" snapshot for pubDate
fe.pubDate(dt)
# Add categories based on watch tags
for tag_uuid in watch.get('tags', []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if tag:
tag_title = tag.get('title', '')
if tag_title:
fe.category(term=tag_title)
except (IndexError, FileNotFoundError) as e:
# Skip this diff if we can't generate it
continue
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
return response

View File

@@ -0,0 +1,94 @@
from flask import make_response, request, url_for
from feedgen.feed import FeedGenerator
import datetime
import pytz
from ._util import generate_watch_guid, generate_watch_diff_content
def construct_tag_routes(rss_blueprint, datastore):
"""
Construct RSS feed routes for tags.
Args:
rss_blueprint: The Flask blueprint to add routes to
datastore: The ChangeDetectionStore instance
"""
@rss_blueprint.route("/tag/<string:tag_uuid>", methods=['GET'])
def rss_tag_feed(tag_uuid):
"""
Display an RSS feed for all unviewed watches that belong to a specific tag.
Returns RSS XML with entries for each unviewed watch with sufficient history.
"""
# Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token')
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if rss_url_token != app_rss_token:
return "Access denied, bad token", 403
# Verify tag exists
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if not tag:
return f"Tag with UUID {tag_uuid} not found", 404
tag_title = tag.get('title', 'Unknown Tag')
# Create RSS feed
fg = FeedGenerator()
fg.title(f'changedetection.io - {tag_title}')
fg.description(f'Changes for watches tagged with {tag_title}')
fg.link(href='https://changedetection.io')
# Find all watches with this tag
for uuid, watch in datastore.data['watching'].items():
# Skip if watch doesn't have this tag
if tag_uuid not in watch.get('tags', []):
continue
# Skip muted watches if configured
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
continue
# Check if watch has at least 2 history snapshots
dates = list(watch.history.keys())
if len(dates) < 2:
continue
# Only include unviewed watches
if not watch.viewed:
# Add uuid to watch for proper functioning
watch['uuid'] = uuid
# Generate GUID for this entry
guid = generate_watch_guid(watch)
fe = fg.add_entry()
# Include a link to the diff page
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
fe.link(link=diff_link)
# Generate diff content
content, watch_label = generate_watch_diff_content(watch, dates, rss_content_format, datastore)
fe.title(title=watch_label)
fe.content(content=content, type='CDATA')
fe.guid(guid, permalink=False)
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
dt = dt.replace(tzinfo=pytz.UTC)
fe.pubDate(dt)
# Add categories based on watch tags
for tag_uuid in watch.get('tags', []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid)
if tag:
tag_title = tag.get('title', '')
if tag_title:
fe.category(term=tag_title)
response = make_response(fg.rss_str())
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
return response

View File

@@ -24,6 +24,7 @@
<li class="tab"><a href="#filters">Global Filters</a></li>
<li class="tab"><a href="#ui-options">UI Options</a></li>
<li class="tab"><a href="#api">API</a></li>
<li class="tab"><a href="#rss">RSS</a></li>
<li class="tab"><a href="#timedate">Time &amp Date</a></li>
<li class="tab"><a href="#proxies">CAPTCHA &amp; Proxies</a></li>
</ul>
@@ -43,10 +44,6 @@
</div>
</div>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.filter_failure_notification_threshold_attempts, class="filter_failure_notification_threshold_attempts") }}
<span class="pure-form-message-inline">After this many consecutive times that the CSS/xPath filter is missing, send a notification
@@ -76,19 +73,6 @@
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
</div>
<div class="grey-form-border">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
</div>
</div>
</fieldset>
</div>
@@ -131,6 +115,10 @@
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br>
Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.timeout) }}
<span class="pure-form-message-inline">For regular plain requests (not chrome based), maximum number of seconds until timeout, 1-999.<br>
@@ -230,6 +218,24 @@ nav
</p>
</div>
</div>
<div class="tab-pane-inner" id="rss">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_diff_length) }}
<span class="pure-form-message-inline">Maximum number of history snapshots to include in the watch specific RSS feed.</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">For watching other RSS feeds - When watching RSS/Atom feeds, convert them into clean text for better change detection.</span>
</div>
</div>
<div class="tab-pane-inner" id="timedate">
<div class="pure-control-group">
Ensure the settings below are correct, they are used to manage the time schedule for checking your web page watches.

View File

@@ -21,9 +21,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
tag_count = Counter(tag for watch in datastore.data['watching'].values() if watch.get('tags') for tag in watch['tags'])
output = render_template("groups-overview.html",
app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
available_tags=sorted_tags,
form=add_form,
tag_count=tag_count
tag_count=tag_count,
)
return output
@@ -149,9 +150,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
included_content = template.render(**template_args)
output = render_template("edit-tag.html",
settings_application=datastore.data['settings']['application'],
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
extra_form_content=included_content,
extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None,
settings_application=datastore.data['settings']['application'],
**template_args
)

View File

@@ -52,6 +52,7 @@
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">Edit</a>&nbsp;
<a class="pure-button pure-button-primary" href="{{ url_for('tags.delete', uuid=uuid) }}" title="Deletes and removes tag">Delete</a>
<a class="pure-button pure-button-primary" href="{{ url_for('tags.unlink', uuid=uuid) }}" title="Keep the tag but unlink any watches">Unlink</a>
<a href="{{ url_for('rss.rss_tag_feed', tag_uuid=uuid, token=app_rss_token)}}"><img alt="RSS Feed for this watch" style="padding-left: 1em;" src="{{url_for('static_content', group='images', filename='generic_feed-icon.svg')}}" height="15"></a>
</td>
</tr>
{% endfor %}

View File

@@ -236,7 +236,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# Import the global plugin system
from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras
app_rss_token = datastore.data['settings']['application'].get('rss_access_token'),
template_args = {
'available_processors': processors.available_processors(),
'available_timezones': sorted(available_timezones()),
@@ -252,6 +252,11 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
'has_special_tag_options': _watch_has_tag_options_set(watch=watch),
'jq_support': jq_support,
'playwright_enabled': os.getenv('PLAYWRIGHT_DRIVER_URL', False),
'app_rss_token': app_rss_token,
'rss_uuid_feed' : {
'label': watch.label,
'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
},
'settings_application': datastore.data['settings']['application'],
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),

View File

@@ -106,7 +106,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
trigger_text = watch.get('trigger_text', [])
# Add text that was triggered
if len(dates):
snapshot_contents = watch.get_history_snapshot(dates[-1])
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
else:
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
@@ -123,8 +123,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
if len(dates) > 1:
prev_snapshot = watch.get_history_snapshot(dates[-2])
current_snapshot = watch.get_history_snapshot(dates[-1])
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
current_snapshot=current_snapshot,

View File

@@ -476,6 +476,7 @@ Math: {{ 1 + 1 }}") }}
class="pure-button button-error">Clear History</a>{% endif %}
<a href="{{url_for('ui.form_clone', uuid=uuid)}}"
class="pure-button">Clone &amp; Edit</a>
<a href="{{ url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token)}}"><img alt="RSS Feed for this watch" style="padding: .5em 1em;" src="{{url_for('static_content', group='images', filename='generic_feed-icon.svg')}}" height="15"></a>
</div>
</div>
</form>

View File

@@ -47,7 +47,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
try:
versions = list(watch.history.keys())
content = watch.get_history_snapshot(timestamp)
content = watch.get_history_snapshot(timestamp=timestamp)
triggered_line_numbers = html_tools.strip_ignore_text(content=content,
wordlist=watch['trigger_text'],

View File

@@ -14,7 +14,7 @@ def count_words_in_history(watch, incoming_text=None):
elif watch.history.keys():
# When called from UI extras to count latest snapshot
latest_key = list(watch.history.keys())[-1]
latest_content = watch.get_history_snapshot(latest_key)
latest_content = watch.get_history_snapshot(timestamp=latest_key)
return len(latest_content.split())
return 0
except Exception as e:

View File

@@ -139,7 +139,7 @@ class fetcher(Fetcher):
content = await self.page.content()
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
logger.debug(f"Saving step HTML to {destination}")
with open(destination, 'w') as f:
with open(destination, 'w', encoding='utf-8') as f:
f.write(content)
async def run(self,

View File

@@ -101,12 +101,12 @@ def init_app_secret(datastore_path):
path = os.path.join(datastore_path, "secret.txt")
try:
with open(path, "r") as f:
with open(path, "r", encoding='utf-8') as f:
secret = f.read()
except FileNotFoundError:
import secrets
with open(path, "w") as f:
with open(path, "w", encoding='utf-8') as f:
secret = secrets.token_hex(32)
f.write(secret)

View File

@@ -503,7 +503,9 @@ class ValidateJinja2Template(object):
jinja2_env = create_jinja_env(loader=BaseLoader)
# Add notification tokens for validation
jinja2_env.globals.update(NotificationContextData())
static_token_placeholders = NotificationContextData()
static_token_placeholders.set_random_for_validation()
jinja2_env.globals.update(static_token_placeholders)
if hasattr(field, 'extra_notification_tokens'):
jinja2_env.globals.update(field.extra_notification_tokens)
@@ -998,7 +1000,7 @@ class globalSettingsApplicationForm(commonSettingsForm):
validators=[validators.NumberRange(min=0,
message="Should be atleast zero (disabled)")])
rss_content_format = SelectField('RSS Content format', choices=RSS_FORMAT_TYPES)
rss_content_format = SelectField('RSS Content format', choices=list(RSS_FORMAT_TYPES.items()))
removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
@@ -1007,8 +1009,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('RSS reader mode ', default=False, validators=[validators.Optional()])
rss_diff_length = IntegerField(label='Number of changes to show in watch RSS feed',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0, message="Should contain zero or more attempts")])
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
render_kw={"style": "width: 5em;"},

View File

@@ -1,7 +1,7 @@
from os import getenv
from copy import deepcopy
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES, RSS_CONTENT_FORMAT_DEFAULT
from changedetectionio.notification import (
default_notification_body,
@@ -54,7 +54,8 @@ class model(dict):
'password': False,
'render_anchor_tag_content': False,
'rss_access_token': None,
'rss_content_format': RSS_FORMAT_TYPES[0][0],
'rss_content_format': RSS_CONTENT_FORMAT_DEFAULT,
'rss_diff_length': 5,
'rss_hide_muted_watches': True,
'rss_reader_mode': False,
'scheduler_timezone_default': None, # Default IANA timezone name
@@ -81,7 +82,7 @@ class model(dict):
def parse_headers_from_text_file(filepath):
headers = {}
with open(filepath, 'r') as f:
with open(filepath, 'r', encoding='utf-8') as f:
for l in f.readlines():
l = l.strip()
if not l.startswith('#') and ':' in l:

View File

@@ -188,7 +188,7 @@ class model(watch_base):
fname = os.path.join(self.watch_data_dir, "history.txt")
if os.path.isfile(fname):
logger.debug(f"Reading watch history index for {self.get('uuid')}")
with open(fname, "r") as f:
with open(fname, "r", encoding='utf-8') as f:
for i in f.readlines():
if ',' in i:
k, v = i.strip().split(',', 2)
@@ -276,9 +276,17 @@ class model(watch_base):
# When the 'last viewed' timestamp is less than the oldest snapshot, return oldest
return sorted_keys[-1]
def get_history_snapshot(self, timestamp):
def get_history_snapshot(self, timestamp=None, filepath=None):
"""
Accepts either timestamp or filepath
:param timestamp:
:param filepath:
:return:
"""
import brotli
filepath = self.history[timestamp]
if not filepath:
filepath = self.history[timestamp]
# See if a brotli versions exists and switch to that
if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
@@ -382,7 +390,7 @@ class model(watch_base):
# Compare each lines (set) against each history text file (set) looking for something new..
existing_history = set({})
for k, v in self.history.items():
content = self.get_history_snapshot(k)
content = self.get_history_snapshot(filepath=v)
if ignore_whitespace:
alist = set([line.translate(TRANSLATE_WHITESPACE_TABLE).lower() for line in content.splitlines()])
@@ -586,7 +594,7 @@ class model(watch_base):
"""Return the text saved from a previous request that resulted in a non-200 error"""
fname = os.path.join(self.watch_data_dir, "last-error.txt")
if os.path.isfile(fname):
with open(fname, 'r') as f:
with open(fname, 'r', encoding='utf-8') as f:
return f.read()
return False
@@ -639,7 +647,7 @@ class model(watch_base):
for k, fname in self.history.items():
if os.path.isfile(fname):
if True:
contents = self.get_history_snapshot(k)
contents = self.get_history_snapshot(timestamp=k)
res = re.findall(regex, contents, re.MULTILINE)
if res:
if not csv_writer:
@@ -732,7 +740,7 @@ class model(watch_base):
# If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
dates = list(self.history.keys())
if len(dates):
return self.get_history_snapshot(dates[-1])
return self.get_history_snapshot(timestamp=dates[-1])
else:
return ''

View File

@@ -187,6 +187,8 @@ def replace_placemarkers_in_text(text, url, requested_output_format):
def apply_service_tweaks(url, n_body, n_title, requested_output_format):
logger.debug(f"Applying markup in '{requested_output_format}' mode")
# Re 323 - Limit discord length to their 2000 char limit total or it wont send.
# Because different notifications may require different pre-processing, run each sequentially :(
# 2000 bytes minus -

View File

@@ -133,7 +133,7 @@ class NotificationService:
# Add text that was triggered
if len(dates):
snapshot_contents = watch.get_history_snapshot(dates[-1])
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
else:
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
@@ -154,8 +154,8 @@ class NotificationService:
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
if len(dates) > 1:
prev_snapshot = watch.get_history_snapshot(dates[-2])
current_snapshot = watch.get_history_snapshot(dates[-1])
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,

View File

@@ -280,7 +280,7 @@ class ContentProcessor:
# Sort JSON to avoid false alerts from reordering
try:
content = json.dumps(json.loads(content), sort_keys=True, indent=4)
content = json.dumps(json.loads(content), sort_keys=True, indent=2, ensure_ascii=False)
except Exception:
# Might be malformed JSON, continue anyway
pass

View File

@@ -37,18 +37,6 @@ class SignalHandler:
notification_event_signal.connect(self.handle_notification_event, weak=False)
logger.info("SignalHandler: Connected to notification_event signal")
# Create and start the queue update thread using standard threading
import threading
self.polling_emitter_thread = threading.Thread(
target=self.polling_emit_running_or_queued_watches_threaded,
daemon=True
)
self.polling_emitter_thread.start()
logger.info("Started polling thread using threading (eventlet-free)")
# Store the thread reference in socketio for clean shutdown
self.socketio_instance.polling_emitter_thread = self.polling_emitter_thread
def handle_signal(self, *args, **kwargs):
logger.trace(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs")
# Safely extract the watch UUID from kwargs
@@ -124,74 +112,6 @@ class SignalHandler:
except Exception as e:
logger.error(f"Socket.IO error in handle_notification_event: {str(e)}")
def polling_emit_running_or_queued_watches_threaded(self):
"""Threading version of polling for Windows compatibility"""
import time
import threading
logger.info("Queue update thread started (threading mode)")
# Import here to avoid circular imports
from changedetectionio.flask_app import app
from changedetectionio import worker_handler
watch_check_update = signal('watch_check_update')
# Track previous state to avoid unnecessary emissions
previous_running_uuids = set()
# Run until app shutdown - check exit flag more frequently for fast shutdown
exit_event = getattr(app.config, 'exit', threading.Event())
while not exit_event.is_set():
try:
# Get current running UUIDs from async workers
running_uuids = set(worker_handler.get_running_uuids())
# Only send updates for UUIDs that changed state
newly_running = running_uuids - previous_running_uuids
no_longer_running = previous_running_uuids - running_uuids
# Send updates for newly running UUIDs (but exit fast if shutdown requested)
for uuid in newly_running:
if exit_event.is_set():
break
logger.trace(f"Threading polling: UUID {uuid} started processing")
with app.app_context():
watch_check_update.send(app_context=app, watch_uuid=uuid)
time.sleep(0.01) # Small yield
# Send updates for UUIDs that finished processing (but exit fast if shutdown requested)
if not exit_event.is_set():
for uuid in no_longer_running:
if exit_event.is_set():
break
logger.trace(f"Threading polling: UUID {uuid} finished processing")
with app.app_context():
watch_check_update.send(app_context=app, watch_uuid=uuid)
time.sleep(0.01) # Small yield
# Update tracking for next iteration
previous_running_uuids = running_uuids
# Sleep between polling cycles, but check exit flag every 0.5 seconds for fast shutdown
for _ in range(20): # 20 * 0.5 = 10 seconds total
if exit_event.is_set():
break
time.sleep(0.5)
except Exception as e:
logger.error(f"Error in threading polling: {str(e)}")
# Even during error recovery, check for exit quickly
for _ in range(1): # 1 * 0.5 = 0.5 seconds
if exit_event.is_set():
break
time.sleep(0.5)
# Check if we're in pytest environment - if so, be more gentle with logging
import sys
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info("Queue update thread stopped (threading mode)")
def handle_watch_update(socketio, **kwargs):
@@ -383,19 +303,6 @@ def init_socketio(app, datastore):
"""Shutdown the SocketIO server fast and aggressively"""
try:
logger.info("Socket.IO: Fast shutdown initiated...")
# For threading mode, give the thread a very short time to exit gracefully
if hasattr(socketio, 'polling_emitter_thread'):
if socketio.polling_emitter_thread.is_alive():
logger.info("Socket.IO: Waiting 1 second for polling thread to stop...")
socketio.polling_emitter_thread.join(timeout=1.0) # Only 1 second timeout
if socketio.polling_emitter_thread.is_alive():
logger.info("Socket.IO: Polling thread still running after timeout - continuing with shutdown")
else:
logger.info("Socket.IO: Polling thread stopped quickly")
else:
logger.info("Socket.IO: Polling thread already stopped")
logger.info("Socket.IO: Fast shutdown complete")
except Exception as e:
logger.error(f"Socket.IO error during shutdown: {str(e)}")

View File

@@ -11,6 +11,56 @@ set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
# Since theres no curl installed lets roll with python3
check_sanity() {
local port="$1"
if [ -z "$port" ]; then
echo "Usage: check_sanity <port>" >&2
return 1
fi
python3 - "$port" <<'PYCODE'
import sys, time, urllib.request, socket
port = sys.argv[1]
url = f'http://localhost:{port}'
ok = False
for _ in range(6): # --retry 6
try:
r = urllib.request.urlopen(url, timeout=3).read().decode()
if 'est-url-is-sanity' in r:
ok = True
break
except (urllib.error.URLError, ConnectionRefusedError, socket.error):
time.sleep(1)
sys.exit(0 if ok else 1)
PYCODE
}
data_sanity_test () {
# Restart data sanity test
cd ..
TMPDIR=$(mktemp -d)
PORT_N=$((5000 + RANDOM % (6501 - 5000)))
./changedetection.py -p $PORT_N -d $TMPDIR -u "https://localhost?test-url-is-sanity=1" &
PID=$!
sleep 5
kill $PID
sleep 2
./changedetection.py -p $PORT_N -d $TMPDIR &
PID=$!
sleep 5
# On a restart the URL should still be there
check_sanity $PORT_N || exit 1
kill $PID
cd $OLDPWD
# datastore looks alright, continue
}
data_sanity_test
# REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -n 30 --dist load tests/test_*.py
@@ -41,3 +91,6 @@ FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
# Check file:// will pickup a file when enabled
echo "Hello world" > /tmp/test-file.txt
ALLOW_FILE_URI=yes pytest tests/test_security.py

View File

@@ -6,6 +6,7 @@ from flask import (
flash
)
from .blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
from .html_tools import TRANSLATE_WHITESPACE_TABLE
from .model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
from copy import deepcopy, copy
@@ -22,6 +23,13 @@ import uuid as uuid_builder
from loguru import logger
from blinker import signal
# Try to import orjson for faster JSON serialization
try:
import orjson
HAS_ORJSON = True
except ImportError:
HAS_ORJSON = False
from .processors import get_custom_watch_obj_for_processor
from .processors.restock_diff import Restock
@@ -37,7 +45,7 @@ class ChangeDetectionStore:
lock = Lock()
# For general updates/writes that can wait a few seconds
needs_write = False
datastore_path = None
# For when we edit, we should write to disk
needs_write_urgent = False
@@ -47,18 +55,30 @@ class ChangeDetectionStore:
def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
# Should only be active for docker
# logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
self.datastore_path = datastore_path
self.needs_write = False
self.start_time = time.time()
self.stop_thread = False
self.save_version_copy_json_db(version_tag)
self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag)
def save_version_copy_json_db(self, version_tag):
import re
version_text = re.sub(r'\D+', '-', version_tag)
db_path = os.path.join(self.datastore_path, "url-watches.json")
db_path_version_backup = os.path.join(self.datastore_path, f"url-watches-{version_text}.json")
if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path):
from shutil import copyfile
logger.info(f"Backing up JSON DB due to new version to '{db_path_version_backup}'.")
copyfile(db_path, db_path_version_backup)
def reload_state(self, datastore_path, include_default_watches, version_tag):
logger.info(f"Datastore path is '{datastore_path}'")
self.__data = App.model()
self.datastore_path = datastore_path
self.json_store_path = os.path.join(self.datastore_path, "url-watches.json")
# Base definition for all watchers
# deepcopy part of #569 - not sure why its needed exactly
@@ -71,37 +91,46 @@ class ChangeDetectionStore:
self.__data['build_sha'] = f.read()
try:
# @todo retest with ", encoding='utf-8'"
with open(self.json_store_path) as json_file:
from_disk = json.load(json_file)
if HAS_ORJSON:
# orjson.loads() expects UTF-8 encoded bytes #3611
with open(self.json_store_path, 'rb') as json_file:
from_disk = orjson.loads(json_file.read())
else:
with open(self.json_store_path, encoding='utf-8') as json_file:
from_disk = json.load(json_file)
# @todo isnt there a way todo this dict.update recursively?
# Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
if 'watching' in from_disk:
self.__data['watching'].update(from_disk['watching'])
if not from_disk:
# No FileNotFound exception was thrown but somehow the JSON was empty - abort for safety.
logger.critical(f"JSON DB existed but was empty on load - empty JSON file? '{self.json_store_path}' Aborting")
raise Exception('JSON DB existed but was empty on load - Aborting')
if 'app_guid' in from_disk:
self.__data['app_guid'] = from_disk['app_guid']
# @todo isnt there a way todo this dict.update recursively?
# Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
if 'watching' in from_disk:
self.__data['watching'].update(from_disk['watching'])
if 'settings' in from_disk:
if 'headers' in from_disk['settings']:
self.__data['settings']['headers'].update(from_disk['settings']['headers'])
if 'app_guid' in from_disk:
self.__data['app_guid'] = from_disk['app_guid']
if 'requests' in from_disk['settings']:
self.__data['settings']['requests'].update(from_disk['settings']['requests'])
if 'settings' in from_disk:
if 'headers' in from_disk['settings']:
self.__data['settings']['headers'].update(from_disk['settings']['headers'])
if 'application' in from_disk['settings']:
self.__data['settings']['application'].update(from_disk['settings']['application'])
if 'requests' in from_disk['settings']:
self.__data['settings']['requests'].update(from_disk['settings']['requests'])
# Convert each existing watch back to the Watch.model object
for uuid, watch in self.__data['watching'].items():
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
logger.info(f"Watching: {uuid} {watch['url']}")
if 'application' in from_disk['settings']:
self.__data['settings']['application'].update(from_disk['settings']['application'])
# And for Tags also, should be Restock type because it has extra settings
for uuid, tag in self.__data['settings']['application']['tags'].items():
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
logger.info(f"Tag: {uuid} {tag['title']}")
# Convert each existing watch back to the Watch.model object
for uuid, watch in self.__data['watching'].items():
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
logger.info(f"Watching: {uuid} {watch['url']}")
# And for Tags also, should be Restock type because it has extra settings
for uuid, tag in self.__data['settings']['application']['tags'].items():
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
logger.info(f"Tag: {uuid} {tag['title']}")
# First time ran, Create the datastore.
except (FileNotFoundError):
@@ -426,9 +455,15 @@ class ChangeDetectionStore:
# Re #286 - First write to a temp file, then confirm it looks OK and rename it
# This is a fairly basic strategy to deal with the case that the file is corrupted,
# system was out of memory, out of RAM etc
with open(self.json_store_path+".tmp", 'w') as json_file:
# Use compact JSON in production for better performance
json.dump(data, json_file, indent=2)
if HAS_ORJSON:
# Use orjson for faster serialization
# orjson.dumps() always returns UTF-8 encoded bytes #3611
with open(self.json_store_path+".tmp", 'wb') as json_file:
json_file.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
else:
# Fallback to standard json module
with open(self.json_store_path+".tmp", 'w', encoding='utf-8') as json_file:
json.dump(data, json_file, indent=2, ensure_ascii=False)
os.replace(self.json_store_path+".tmp", self.json_store_path)
except Exception as e:
logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
@@ -490,8 +525,13 @@ class ChangeDetectionStore:
# Load from external config file
if path.isfile(proxy_list_file):
with open(os.path.join(self.datastore_path, "proxies.json")) as f:
proxy_list = json.load(f)
if HAS_ORJSON:
# orjson.loads() expects UTF-8 encoded bytes #3611
with open(os.path.join(self.datastore_path, "proxies.json"), 'rb') as f:
proxy_list = orjson.loads(f.read())
else:
with open(os.path.join(self.datastore_path, "proxies.json"), encoding='utf-8') as f:
proxy_list = json.load(f)
# Mapping from UI config if available
extras = self.data['settings']['requests'].get('extra_proxies')
@@ -736,6 +776,28 @@ class ChangeDetectionStore:
return updates_available
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
notification_urls = self.data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
with self.lock:
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
# Append and update the datastore
notification_urls.append(notification_url)
self.__data['settings']['application']['notification_urls'] = notification_urls
self.needs_write = True
return notification_url
# Run all updates
# IMPORTANT - Each update could be run even when they have a new install and the schema is correct
# So therefor - each `update_n` should be very careful about checking if it needs to actually run
@@ -748,7 +810,16 @@ class ChangeDetectionStore:
logger.critical(f"Applying update_{update_n}")
# Wont exist on fresh installs
if os.path.exists(self.json_store_path):
shutil.copyfile(self.json_store_path, os.path.join(self.datastore_path, f"url-watches-before-{update_n}.json"))
i = 0
while True:
i+=1
dest = os.path.join(self.datastore_path, f"url-watches-before-{update_n}-{i}.json")
if not os.path.exists(dest):
logger.debug(f"Copying url-watches.json DB to '{dest}' backup.")
shutil.copyfile(self.json_store_path, dest)
break
else:
logger.warning(f"Backup of url-watches.json '{dest}', DB already exists, trying {i+1}.. ")
try:
update_method = getattr(self, f"update_{update_n}")()
@@ -1039,25 +1110,15 @@ class ChangeDetectionStore:
formats['markdown'] = 'Markdown'
re_run(formats)
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
notification_urls = self.data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
with self.lock:
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
# Append and update the datastore
notification_urls.append(notification_url)
self.__data['settings']['application']['notification_urls'] = notification_urls
self.needs_write = True
return notification_url
# RSS types should be inline with the same names as notification types
def update_24(self):
rss_format = self.data['settings']['application'].get('rss_content_format')
if not rss_format or 'text' in rss_format:
# might have been 'plaintext, 'plain text' or something
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
elif 'html' in rss_format:
self.data['settings']['application']['rss_content_format'] = 'htmlcolor'
else:
# safe fallback to text
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT

View File

@@ -8,8 +8,13 @@
<meta name="robots" content="noindex">
<title>Change Detection{{extra_title}}</title>
{% if app_rss_token %}
<link rel="alternate" type="application/rss+xml" title="Changedetection.io » Feed{% if active_tag_uuid %}- {{active_tag.title}}{% endif %}" href="{{ url_for('rss.feed', tag=active_tag_uuid , token=app_rss_token)}}" >
{% endif %}
<link rel="alternate" type="application/rss+xml" title="Changedetection.io » Feed{% if active_tag_uuid %}- {{active_tag.title}}{% endif %}" href="{{ url_for('rss.feed', tag=active_tag_uuid, token=app_rss_token, _external=True )}}" >
{% if rss_uuid_feed %}
<link rel="alternate" type="application/rss+xml" title="Feed » {{ rss_uuid_feed['label'] }}" href="{{ rss_uuid_feed['url'] }}" >
{%- endif -%}
{%- endif -%}
<link rel="stylesheet" href="{{url_for('static_content', group='styles', filename='pure-min.css')}}" >
<link rel="stylesheet" href="{{url_for('static_content', group='styles', filename='styles.css')}}?v={{ get_css_version() }}" >
{% if extra_stylesheets %}

View File

@@ -19,18 +19,9 @@ def test_inscriptus():
def test_check_basic_change_detection_functionality(client, live_server, measure_memory_usage, datastore_path):
set_original_response(datastore_path=datastore_path)
# live_server_setup(live_server) # Setup on conftest per function
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True))
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# Do this a few times.. ensures we dont accidently set the status
for n in range(3):
@@ -86,10 +77,9 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
assert b'<rss' in res.data
# re #16 should have the diff in here too
assert b'(into) which has this one new line' in res.data
assert b'which has this one new line' in res.data
assert b'CDATA' in res.data
assert expected_url.encode('utf-8') in res.data
#
# Following the 'diff' link, it should no longer display as 'has-unread-changes' even after we recheck it a few times
res = client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
@@ -115,7 +105,6 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
# It should report nothing found (no new 'has-unread-changes' class)
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' not in res.data
assert b'class="has-unread-changes' not in res.data
assert b'head title' in res.data # Should be ON by default
@@ -129,23 +118,6 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
res = client.get(url_for("watchlist.index"))
assert b'head title and more' in res.data
# disable <title> pickup
res = client.post(
url_for("settings.settings_page"),
data={"application-ui-use_page_title_in_list": "", "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
assert b'class="has-unread-changes' in res.data
assert b'head title' not in res.data # should now be off
# Be sure the last_viewed is going to be greater than the last snapshot
time.sleep(1)
@@ -166,6 +138,63 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
# Cleanup everything
delete_all_watches(client)
def test_title_scraper(client, live_server, measure_memory_usage, datastore_path):
set_original_response(datastore_path=datastore_path)
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True))
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks()
# It should report nothing found (no new 'has-unread-changes' class)
res = client.get(url_for("watchlist.index"))
assert b'head title' in res.data # Should be ON by default
# Recheck it but only with a title change, content wasnt changed
set_original_response(datastore_path=datastore_path, extra_title=" and more")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'head title and more' in res.data
# disable <title> pickup
res = client.post(
url_for("settings.settings_page"),
data={"application-ui-use_page_title_in_list": "",
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
set_original_response(datastore_path=datastore_path, extra_title=" SHOULD NOT APPEAR")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'SHOULD NOT APPEAR' not in res.data
delete_all_watches(client)
def test_title_scraper_html_only(client, live_server, measure_memory_usage, datastore_path):
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write('"My text document\nWhere I talk about <title>\nwhich should not get registered\n</title>')
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks()
# It should report nothing found (no new 'has-unread-changes' class)
res = client.get(url_for("watchlist.index"))
assert b'which should not get registered' not in res.data # Should be ON by default
assert not live_server.app.config['DATASTORE'].data['watching'][uuid].get('title')
# Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that
def test_requests_timeout(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -40,7 +40,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
json_obj = None
with open(json_db_file, 'r') as f:
with open(json_db_file, 'r', encoding='utf-8') as f:
json_obj = json.load(f)
# assert the right amount of watches was found in the JSON
@@ -76,7 +76,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
with open(json_db_file, 'r') as f:
with open(json_db_file, 'r', encoding='utf-8') as f:
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"

View File

@@ -353,7 +353,7 @@ def check_json_ext_filter(json_filter, client, live_server, datastore_path):
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
assert snapshot_contents[0] == '['
@@ -439,16 +439,15 @@ def test_correct_header_detect(client, live_server, measure_memory_usage, datast
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
assert b'&#34;hello&#34;: 123,' in res.data # properly html escaped in the front end
import json
data = json.loads(snapshot_contents)
keys = list(data.keys())
# Should be correctly formatted and sorted, ("world" goes to end)
assert snapshot_contents == """{
"hello": 123,
"world": 123
}"""
assert keys == ["hello", "world"]
delete_all_watches(client)
def test_check_jsonpath_ext_filter(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -302,15 +302,20 @@ def test_notification_urls_jinja2_apprise_integration(client, live_server, measu
data={
"application-fetch_backend": "html_requests",
"application-minutes_between_check": 180,
"application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444, "somebug": "网站监测 内容更新了" }',
"application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444, "somebug": "网站监测 内容更新了", "another": "{{diff|truncate(1500)}}" }',
"application-notification_format": default_notification_format,
"application-notification_urls": test_notification_url,
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }} ",
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }} {{diff|truncate(200)}} ",
},
follow_redirects=True
)
assert b'Settings updated' in res.data
assert '网站监测'.encode() in res.data
assert b'{{diff|truncate(1500)}}' in res.data
assert b'{{diff|truncate(200)}}' in res.data
def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -22,7 +22,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage, datastore_path):
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
# PDF header should not be there (it was converted to text)
assert 'PDF' not in snapshot_contents
@@ -75,7 +75,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage, datastore_path):
dates = list(watch.history.keys())
# new snapshot was also OK, no HTML
snapshot_contents = watch.get_history_snapshot(dates[1])
snapshot_contents = watch.get_history_snapshot(timestamp=dates[1])
assert 'html' not in snapshot_contents.lower()
assert f'Original file size - {os.path.getsize(os.path.join(datastore_path, "endpoint-test.pdf"))}' in snapshot_contents
assert f'here is a change' in snapshot_contents

View File

@@ -142,7 +142,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watches_with_body = 0
with open(os.path.join(datastore_path, 'url-watches.json')) as f:
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
app_struct = json.load(f)
for uuid in app_struct['watching']:
if app_struct['watching'][uuid]['body']==body_value:
@@ -225,7 +225,7 @@ def test_method_in_request(client, live_server, measure_memory_usage, datastore_
wait_for_all_checks(client)
watches_with_method = 0
with open(os.path.join(datastore_path, 'url-watches.json')) as f:
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
app_struct = json.load(f)
for uuid in app_struct['watching']:
if app_struct['watching'][uuid]['method'] == 'PATCH':

View File

@@ -4,6 +4,8 @@ import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches
from loguru import logger
from ..blueprint.rss import RSS_FORMAT_TYPES
def set_original_cdata_xml(datastore_path):
@@ -238,6 +240,114 @@ def test_rss_bad_chars_breaking(client, live_server, measure_memory_usage, datas
#assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n == 2
def test_rss_single_watch_feed(client, live_server, measure_memory_usage, datastore_path):
app_rss_token = live_server.app.config['DATASTORE'].data['settings']['application'].get('rss_access_token')
rss_content_format = live_server.app.config['DATASTORE'].data['settings']['application'].get('rss_content_format')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 400
assert b'not have enough history' in res.data
set_modified_response(datastore_path=datastore_path)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 200
import xml.etree.ElementTree as ET
root = ET.fromstring(res.data)
def check_formatting(expected_type, content, url):
logger.debug(f"Checking formatting type {expected_type}")
if expected_type == 'text':
assert '<p>' not in content
assert 'body' not in content
assert '(changed) Which is across multiple lines\n'
assert 'modified head title had a change.' # Because it picked it up <title> as watch_title in default template
elif expected_type == 'html':
assert '<p>' in content
assert '<body>' in content
assert '<p>(changed) Which is across multiple lines<br>' in content
assert f'href="{url}">modified head title had a change.</a>'
elif expected_type == 'htmlcolor':
assert '<body>' in content
assert ' role="note" aria-label="Changed text" title="Changed text">Which is across multiple lines</span>' in content
assert f'href="{url}">modified head title had a change.</a>'
else:
raise Exception(f"Unknown type {expected_type}")
item = root.findall('.//item')[0].findtext('description')
check_formatting(expected_type=rss_content_format, content=item, url=test_url)
# Now the default one is over, lets try all the others
for k in list(RSS_FORMAT_TYPES.keys()):
res = client.post(
url_for("settings.settings_page"),
data={"application-rss_content_format": k},
follow_redirects=True
)
assert b'Settings updated' in res.data
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 200
root = ET.fromstring(res.data)
item = root.findall('.//item')[0].findtext('description')
check_formatting(expected_type=k, content=item, url=test_url)
# Test RSS entry order: Create multiple versions and verify newest appears first
for version in range(3, 6): # Create versions 3, 4, 5
set_html_content(datastore_path, f"Version {version} content")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(0.5) # Small delay to ensure different timestamps
# Fetch RSS feed again to verify order
res = client.get(
url_for('rss.rss_single_watch', uuid=uuid, token=app_rss_token),
follow_redirects=False
)
assert res.status_code == 200
# Parse RSS and check order (newest first)
root = ET.fromstring(res.data)
items = root.findall('.//item')
assert len(items) >= 3, f"Expected at least 3 items, got {len(items)}"
# Get descriptions from first 3 items
descriptions = []
for item in items[:3]:
desc = item.findtext('description')
descriptions.append(desc if desc else "")
# First item should contain newest change (Version 5)
assert b"Version 5" in descriptions[0].encode() or "Version 5" in descriptions[0], \
f"First item should show newest change (Version 5), but got: {descriptions[0][:200]}"
# Second item should contain Version 4
assert b"Version 4" in descriptions[1].encode() or "Version 4" in descriptions[1], \
f"Second item should show Version 4, but got: {descriptions[1][:200]}"
# Third item should contain Version 3
assert b"Version 3" in descriptions[2].encode() or "Version 3" in descriptions[2], \
f"Third item should show Version 3, but got: {descriptions[2][:200]}"

View File

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
import os
def set_original_response(datastore_path):
test_return_data = """<html>
<body>
Some initial text<br>
<p>Watch 1 content</p>
<p>Watch 2 content</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def set_modified_response(datastore_path):
test_return_data = """<html>
<body>
Some initial text<br>
<p>Watch 1 content MODIFIED</p>
<p>Watch 2 content CHANGED</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a specific tag/group shows only watches in that group
and displays changes correctly.
"""
set_original_response(datastore_path=datastore_path)
# Create a tag/group
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "test-rss-group"},
follow_redirects=True
)
assert b"Tag added" in res.data
assert b"test-rss-group" in res.data
# Get the tag UUID
tag_uuid = get_UUID_for_tag_name(client, name="test-rss-group")
assert tag_uuid is not None
# Add first watch with the tag
test_url_1 = url_for('test_endpoint', _external=True) + "?watch=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_1, "tags": 'test-rss-group'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Add second watch with the tag
test_url_2 = url_for('test_endpoint', _external=True) + "?watch=2"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_2, "tags": 'test-rss-group'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Add a third watch WITHOUT the tag (should not appear in RSS)
test_url_3 = url_for('test_endpoint', _external=True) + "?watch=3"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_3, "tags": 'other-tag'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Wait for initial checks to complete
wait_for_all_checks(client)
# Trigger a change
set_modified_response(datastore_path=datastore_path)
# Recheck all watches
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
assert rss_token is not None
# Request RSS feed for the specific tag/group using the new endpoint
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Verify response is successful
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Verify the RSS feed contains the tag name in the title
assert b"test-rss-group" in res.data
# Verify watch 1 and watch 2 are in the RSS feed (they have the tag)
assert b"watch=1" in res.data
assert b"watch=2" in res.data
# Verify watch 3 is NOT in the RSS feed (it doesn't have the tag)
assert b"watch=3" not in res.data
# Verify the changes are shown in the RSS feed
assert b"MODIFIED" in res.data or b"CHANGED" in res.data
# Verify it's actual RSS/XML format
assert b"<rss" in res.data or b"<feed" in res.data
# Test with invalid tag UUID - should return 404
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid="invalid-uuid-12345", token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 404
assert b"not found" in res.data
# Test with invalid token - should return 403
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token="wrong-token", _external=True),
follow_redirects=True
)
assert res.status_code == 403
assert b"Access denied" in res.data
# Clean up
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
def test_rss_group_empty_tag(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a tag with no watches returns valid but empty RSS.
"""
# Create a tag with no watches
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "empty-tag"},
follow_redirects=True
)
assert b"Tag added" in res.data
tag_uuid = get_UUID_for_tag_name(client, name="empty-tag")
assert tag_uuid is not None
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed for empty tag
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Should still return 200 with valid RSS
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
assert b"empty-tag" in res.data
# Clean up
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feed for a tag only shows unviewed watches.
"""
set_original_response(datastore_path=datastore_path)
# Create a tag
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "unviewed-test"},
follow_redirects=True
)
assert b"Tag added" in res.data
tag_uuid = get_UUID_for_tag_name(client, name="unviewed-test")
# Add two watches with the tag
test_url_1 = url_for('test_endpoint', _external=True) + "?unviewed=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_1, "tags": 'unviewed-test'},
follow_redirects=True
)
assert b"Watch added" in res.data
test_url_2 = url_for('test_endpoint', _external=True) + "?unviewed=2"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url_2, "tags": 'unviewed-test'},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
# Trigger changes
set_modified_response(datastore_path=datastore_path)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed - should show both watches (both unviewed)
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
assert b"unviewed=1" in res.data
assert b"unviewed=2" in res.data
# Mark all as viewed
res = client.get(url_for('ui.mark_all_viewed'), follow_redirects=True)
wait_for_all_checks(client)
# Request RSS feed again - should be empty now (no unviewed watches)
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
# Should not contain the watch URLs anymore since they're viewed
assert b"unviewed=1" not in res.data
assert b"unviewed=2" not in res.data
# Clean up
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data

View File

@@ -65,7 +65,7 @@ def test_rss_reader_mode(client, live_server, measure_memory_usage, datastore_pa
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
assert 'Wet noodles escape' in snapshot_contents
assert '<br>' not in snapshot_contents
assert '&lt;' not in snapshot_contents
@@ -91,7 +91,7 @@ def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_us
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
assert 'Wet noodles escape' not in snapshot_contents
assert '<br>' not in snapshot_contents
assert '&lt;' not in snapshot_contents

View File

@@ -0,0 +1,245 @@
#!/usr/bin/env python3
import time
import os
import xml.etree.ElementTree as ET
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, extract_UUID_from_client, delete_all_watches
def test_rss_single_watch_order(client, live_server, measure_memory_usage, datastore_path):
"""
Test that single watch RSS feed shows changes in correct order (newest first).
"""
# Create initial content
def set_response(datastore_path, version):
test_return_data = f"""<html>
<body>
<p>Version {version} content</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
# Start with version 1
set_response(datastore_path, 1)
# Add a watch
test_url = url_for('test_endpoint', _external=True) + "?order_test=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": 'test-tag'},
follow_redirects=True
)
assert b"Watch added" in res.data
# Get the watch UUID
watch_uuid = extract_UUID_from_client(client)
# Wait for initial check
wait_for_all_checks(client)
# Create multiple versions by triggering changes
for version in range(2, 6): # Create versions 2, 3, 4, 5
set_response(datastore_path, version)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(0.5) # Small delay to ensure different timestamps
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Request RSS feed for the single watch
res = client.get(
url_for("rss.rss_single_watch", uuid=watch_uuid, token=rss_token, _external=True),
follow_redirects=True
)
# Should return valid RSS
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Parse the RSS/XML
root = ET.fromstring(res.data)
# Find all items (RSS 2.0) or entries (Atom)
items = root.findall('.//item')
if not items:
items = root.findall('.//{http://www.w3.org/2005/Atom}entry')
# Should have multiple items
assert len(items) >= 3, f"Expected at least 3 items, got {len(items)}"
# Get the descriptions/content from first 3 items
descriptions = []
for item in items[:3]:
# Try RSS format first
desc = item.findtext('description')
if not desc:
# Try Atom format
content_elem = item.find('{http://www.w3.org/2005/Atom}content')
if content_elem is not None:
desc = content_elem.text
descriptions.append(desc if desc else "")
print(f"First item content: {descriptions[0][:100] if descriptions[0] else 'None'}")
print(f"Second item content: {descriptions[1][:100] if descriptions[1] else 'None'}")
print(f"Third item content: {descriptions[2][:100] if descriptions[2] else 'None'}")
# The FIRST item should contain the NEWEST change (Version 5)
# The SECOND item should contain Version 4
# The THIRD item should contain Version 3
assert b"Version 5" in descriptions[0].encode() or "Version 5" in descriptions[0], \
f"First item should show newest change (Version 5), but got: {descriptions[0][:200]}"
# Verify the order is correct
assert b"Version 4" in descriptions[1].encode() or "Version 4" in descriptions[1], \
f"Second item should show Version 4, but got: {descriptions[1][:200]}"
assert b"Version 3" in descriptions[2].encode() or "Version 3" in descriptions[2], \
f"Third item should show Version 3, but got: {descriptions[2][:200]}"
# Clean up
delete_all_watches(client)
def test_rss_categories_from_tags(client, live_server, measure_memory_usage, datastore_path):
"""
Test that RSS feeds include category tags from watch tags.
"""
# Create initial content
test_return_data = """<html>
<body>
<p>Test content for RSS categories</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
# Create some tags first
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Security"},
follow_redirects=True
)
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Python"},
follow_redirects=True
)
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "Tech News"},
follow_redirects=True
)
# Add a watch with tags
test_url = url_for('test_endpoint', _external=True) + "?category_test=1"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": "Security, Python, Tech News"},
follow_redirects=True
)
assert b"Watch added" in res.data
# Get the watch UUID
watch_uuid = extract_UUID_from_client(client)
# Wait for initial check
wait_for_all_checks(client)
# Trigger one change
test_return_data_v2 = """<html>
<body>
<p>Updated content for RSS categories</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data_v2)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Get RSS token
rss_token = extract_rss_token_from_UI(client)
# Test 1: Check single watch RSS feed
res = client.get(
url_for("rss.rss_single_watch", uuid=watch_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
assert b"<?xml" in res.data or b"<rss" in res.data
# Parse the RSS/XML
root = ET.fromstring(res.data)
# Find all items
items = root.findall('.//item')
assert len(items) >= 1, "Expected at least 1 item in RSS feed"
# Get categories from first item
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in single watch RSS: {categories}")
# Should have all three categories
assert "Security" in categories, f"Expected 'Security' category, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category, got: {categories}"
assert len(categories) == 3, f"Expected 3 categories, got {len(categories)}: {categories}"
# Test 2: Check main RSS feed
res = client.get(
url_for("rss.feed", token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
root = ET.fromstring(res.data)
items = root.findall('.//item')
assert len(items) >= 1, "Expected at least 1 item in main RSS feed"
# Get categories from first item in main feed
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in main RSS feed: {categories}")
# Should have all three categories
assert "Security" in categories, f"Expected 'Security' category in main feed, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category in main feed, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category in main feed, got: {categories}"
# Test 3: Check tag-specific RSS feed (should also have categories)
# Get the tag UUID for "Security" and verify the tag feed also has categories
from .util import get_UUID_for_tag_name
security_tag_uuid = get_UUID_for_tag_name(client, name="Security")
if security_tag_uuid:
res = client.get(
url_for("rss.rss_tag_feed", tag_uuid=security_tag_uuid, token=rss_token, _external=True),
follow_redirects=True
)
assert res.status_code == 200
root = ET.fromstring(res.data)
items = root.findall('.//item')
if len(items) >= 1:
categories = [cat.text for cat in items[0].findall('category')]
print(f"Found categories in tag RSS feed: {categories}")
# Should still have all three categories
assert "Security" in categories, f"Expected 'Security' category in tag feed, got: {categories}"
assert "Python" in categories, f"Expected 'Python' category in tag feed, got: {categories}"
assert "Tech News" in categories, f"Expected 'Tech News' category in tag feed, got: {categories}"
# Clean up
delete_all_watches(client)

View File

@@ -55,8 +55,8 @@ class TestTriggerConditions(unittest.TestCase):
self.assertEqual(len(history), 2)
# Retrieve and check snapshots
#snapshot1 = watch.get_history_snapshot(str(timestamp1))
#snapshot2 = watch.get_history_snapshot(str(timestamp2))
#snapshot1 = watch.get_history_snapshot(timestamp=str(timestamp1))
#snapshot2 = watch.get_history_snapshot(timestamp=str(timestamp2))
self.store.data['watching'][self.watch_uuid].update(
{

View File

@@ -33,6 +33,9 @@ chardet>2.3.0
wtforms~=3.2
jsonpath-ng~=1.7.0
# Fast JSON serialization for better performance
orjson~=3.11
# dnspython - Used by paho-mqtt for MQTT broker resolution
# Version pin removed since eventlet (which required the specific 2.6.1 pin) has been eliminated
# paho-mqtt will install compatible dnspython version automatically