mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-31 14:47:21 +00:00 
			
		
		
		
	Compare commits
	
		
			29 Commits
		
	
	
		
			0.50.29
			...
			small-func
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 5fc37dfff4 | ||
|   | 2116b2cb93 | ||
|   | acee89c0a9 | ||
|   | 552e98519b | ||
|   | 8f580ac96b | ||
|   | a8cadc3d16 | ||
|   | c9290d73e0 | ||
|   | 2db5e906e9 | ||
|   | 0751bd371a | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | 3ffa0805e9 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | 3335270692 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | a7573b10ec | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | df945ad743 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | 4536e95205 | ||
|   | 1479d7bd46 | ||
|   | 9ba2094f75 | ||
|   | 8aa012ba8e | ||
|   | 8bc6b10db1 | ||
|   | 76d799c95b | ||
|   | 7c8bdfcc9f | ||
|   | 01a938d7ce | ||
|   | e44853c439 | ||
|   | 3830bec891 | ||
|   | 88ab663330 | ||
|   | 68335b95c3 | ||
|   | 7bbfa0ef32 | ||
|   | e233d52931 | ||
|   | 181d32e82a | ||
|   | a51614f83d | 
							
								
								
									
										6
									
								
								.github/workflows/pypi-release.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										6
									
								
								.github/workflows/pypi-release.yml
									
									
									
									
										vendored
									
									
								
							| @@ -21,7 +21,7 @@ jobs: | ||||
|     - name: Build a binary wheel and a source tarball | ||||
|       run: python3 -m build | ||||
|     - name: Store the distribution packages | ||||
|       uses: actions/upload-artifact@v4 | ||||
|       uses: actions/upload-artifact@v5 | ||||
|       with: | ||||
|         name: python-package-distributions | ||||
|         path: dist/ | ||||
| @@ -34,7 +34,7 @@ jobs: | ||||
|     - build | ||||
|     steps: | ||||
|     - name: Download all the dists | ||||
|       uses: actions/download-artifact@v5 | ||||
|       uses: actions/download-artifact@v6 | ||||
|       with: | ||||
|         name: python-package-distributions | ||||
|         path: dist/ | ||||
| @@ -93,7 +93,7 @@ jobs: | ||||
|  | ||||
|     steps: | ||||
|     - name: Download all the dists | ||||
|       uses: actions/download-artifact@v5 | ||||
|       uses: actions/download-artifact@v6 | ||||
|       with: | ||||
|         name: python-package-distributions | ||||
|         path: dist/ | ||||
|   | ||||
| @@ -282,7 +282,7 @@ jobs: | ||||
|  | ||||
|       - name: Store everything including test-datastore | ||||
|         if: always() | ||||
|         uses: actions/upload-artifact@v4 | ||||
|         uses: actions/upload-artifact@v5 | ||||
|         with: | ||||
|           name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }} | ||||
|           path: . | ||||
|   | ||||
| @@ -2,7 +2,7 @@ | ||||
|  | ||||
| # Read more https://github.com/dgtlmoon/changedetection.io/wiki | ||||
|  | ||||
| __version__ = '0.50.29' | ||||
| __version__ = '0.50.33' | ||||
|  | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from json.decoder import JSONDecodeError | ||||
|   | ||||
| @@ -3,15 +3,30 @@ from changedetectionio.strtobool import strtobool | ||||
| from flask_restful import abort, Resource | ||||
| from flask import request | ||||
| import validators | ||||
| from functools import wraps | ||||
| from . import auth, validate_openapi_request | ||||
|  | ||||
|  | ||||
| def default_content_type(content_type='text/plain'): | ||||
|     """Decorator to set a default Content-Type header if none is provided.""" | ||||
|     def decorator(f): | ||||
|         @wraps(f) | ||||
|         def wrapper(*args, **kwargs): | ||||
|             if not request.content_type: | ||||
|                 # Set default content type in the request environment | ||||
|                 request.environ['CONTENT_TYPE'] = content_type | ||||
|             return f(*args, **kwargs) | ||||
|         return wrapper | ||||
|     return decorator | ||||
|  | ||||
|  | ||||
| class Import(Resource): | ||||
|     def __init__(self, **kwargs): | ||||
|         # datastore is a black box dependency | ||||
|         self.datastore = kwargs['datastore'] | ||||
|  | ||||
|     @auth.check_token | ||||
|     @default_content_type('text/plain') #3547 #3542 | ||||
|     @validate_openapi_request('importWatches') | ||||
|     def post(self): | ||||
|         """Import a list of watched URLs.""" | ||||
|   | ||||
| @@ -1,5 +1,7 @@ | ||||
| import os | ||||
|  | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from changedetectionio.html_tools import is_safe_url | ||||
|  | ||||
| from flask_expects_json import expects_json | ||||
| from changedetectionio import queuedWatchMetaData | ||||
| @@ -121,6 +123,10 @@ class Watch(Resource): | ||||
|         if validation_error: | ||||
|             return validation_error, 400 | ||||
|  | ||||
|         # XSS etc protection | ||||
|         if request.json.get('url') and not is_safe_url(request.json.get('url')): | ||||
|             return "Invalid URL", 400 | ||||
|  | ||||
|         watch.update(request.json) | ||||
|  | ||||
|         return "OK", 200 | ||||
|   | ||||
| @@ -240,9 +240,7 @@ nav | ||||
|                     <p> | ||||
|                        {{ render_field(form.application.form.scheduler_timezone_default) }} | ||||
|                         <datalist id="timezones" style="display: none;"> | ||||
|                             {% for tz_name in available_timezones %} | ||||
|                                 <option value="{{ tz_name }}">{{ tz_name }}</option> | ||||
|                             {% endfor %} | ||||
|                             {%- for timezone in available_timezones -%}<option value="{{ timezone }}">{{ timezone }}</option>{%- endfor -%} | ||||
|                         </datalist> | ||||
|                     </p> | ||||
|                 </div> | ||||
|   | ||||
| @@ -76,14 +76,14 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat | ||||
|  | ||||
|     elif (op == 'notification-default'): | ||||
|         from changedetectionio.notification import ( | ||||
|             default_notification_format_for_watch | ||||
|             USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
|         ) | ||||
|         for uuid in uuids: | ||||
|             if datastore.data['watching'].get(uuid): | ||||
|                 datastore.data['watching'][uuid]['notification_title'] = None | ||||
|                 datastore.data['watching'][uuid]['notification_body'] = None | ||||
|                 datastore.data['watching'][uuid]['notification_urls'] = [] | ||||
|                 datastore.data['watching'][uuid]['notification_format'] = default_notification_format_for_watch | ||||
|                 datastore.data['watching'][uuid]['notification_format'] = USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
|         if emit_flash: | ||||
|             flash(f"{len(uuids)} watches set to use default notification settings") | ||||
|  | ||||
|   | ||||
| @@ -75,7 +75,6 @@ class Fetcher(): | ||||
|         self.screenshot = None | ||||
|         self.xpath_data = None | ||||
|         # Keep headers and status_code as they're small | ||||
|         logger.trace("Fetcher content cleared from memory") | ||||
|  | ||||
|     @abstractmethod | ||||
|     def get_error(self): | ||||
|   | ||||
| @@ -1,21 +1,32 @@ | ||||
| import difflib | ||||
| from typing import List, Iterator, Union | ||||
|  | ||||
| HTML_REMOVED_STYLE = "background-color: #fadad7; color: #b30000;" | ||||
| HTML_ADDED_STYLE = "background-color: #eaf2c2; color: #406619;" | ||||
| # https://github.com/dgtlmoon/changedetection.io/issues/821#issuecomment-1241837050 | ||||
| #HTML_ADDED_STYLE = "background-color: #d2f7c2; color: #255d00;" | ||||
| #HTML_CHANGED_INTO_STYLE = "background-color: #dafbe1; color: #116329;" | ||||
| #HTML_CHANGED_STYLE = "background-color: #ffd6cc; color: #7a2000;" | ||||
| #HTML_REMOVED_STYLE = "background-color: #ffebe9; color: #82071e;" | ||||
|  | ||||
| # @todo - In the future we can make this configurable | ||||
| HTML_ADDED_STYLE = "background-color: #eaf2c2; color: #406619" | ||||
| HTML_REMOVED_STYLE = "background-color: #fadad7; color: #b30000" | ||||
| HTML_CHANGED_STYLE = HTML_REMOVED_STYLE | ||||
| HTML_CHANGED_INTO_STYLE = HTML_ADDED_STYLE | ||||
|  | ||||
|  | ||||
| # These get set to html or telegram type or discord compatible or whatever in handler.py | ||||
| REMOVED_PLACEMARKER_OPEN = '<<<removed_PLACEMARKER_OPEN' | ||||
| REMOVED_PLACEMARKER_CLOSED = '<<<removed_PLACEMARKER_CLOSED' | ||||
| # Something that cant get escaped to HTML by accident | ||||
| REMOVED_PLACEMARKER_OPEN = '@removed_PLACEMARKER_OPEN' | ||||
| REMOVED_PLACEMARKER_CLOSED = '@removed_PLACEMARKER_CLOSED' | ||||
|  | ||||
| ADDED_PLACEMARKER_OPEN = '<<<added_PLACEMARKER_OPEN' | ||||
| ADDED_PLACEMARKER_CLOSED = '<<<added_PLACEMARKER_CLOSED' | ||||
| ADDED_PLACEMARKER_OPEN = '@added_PLACEMARKER_OPEN' | ||||
| ADDED_PLACEMARKER_CLOSED = '@added_PLACEMARKER_CLOSED' | ||||
|  | ||||
| CHANGED_PLACEMARKER_OPEN = '<<<changed_PLACEMARKER_OPEN' | ||||
| CHANGED_PLACEMARKER_CLOSED = '<<<changed_PLACEMARKER_CLOSED' | ||||
| CHANGED_PLACEMARKER_OPEN = '@changed_PLACEMARKER_OPEN' | ||||
| CHANGED_PLACEMARKER_CLOSED = '@changed_PLACEMARKER_CLOSED' | ||||
|  | ||||
| CHANGED_INTO_PLACEMARKER_OPEN = '<<<changed_into_PLACEMARKER_OPEN' | ||||
| CHANGED_INTO_PLACEMARKER_CLOSED = '<<<changed_into_PLACEMARKER_CLOSED' | ||||
| CHANGED_INTO_PLACEMARKER_OPEN = '@changed_into_PLACEMARKER_OPEN' | ||||
| CHANGED_INTO_PLACEMARKER_CLOSED = '@changed_into_PLACEMARKER_CLOSED' | ||||
|  | ||||
| def same_slicer(lst: List[str], start: int, end: int) -> List[str]: | ||||
|     """Return a slice of the list, or a single element if start == end.""" | ||||
|   | ||||
| @@ -133,6 +133,11 @@ def get_socketio_path(): | ||||
|     # Socket.IO will be available at {prefix}/socket.io/ | ||||
|     return prefix | ||||
|  | ||||
| @app.template_global('is_safe_url') | ||||
| def _is_safe_url(test_url): | ||||
|     from .html_tools import is_safe_url | ||||
|     return is_safe_url(test_url) | ||||
|  | ||||
|  | ||||
| @app.template_filter('format_number_locale') | ||||
| def _jinja2_filter_format_number_locale(value: float) -> str: | ||||
|   | ||||
| @@ -550,7 +550,7 @@ def validate_url(test_url): | ||||
|         # This should be wtforms.validators. | ||||
|         raise ValidationError(message) | ||||
|  | ||||
|     from .model.Watch import is_safe_url | ||||
|     from changedetectionio.html_tools import is_safe_url | ||||
|     if not is_safe_url(test_url): | ||||
|         # This should be wtforms.validators. | ||||
|         raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format') | ||||
| @@ -741,7 +741,6 @@ class quickWatchForm(Form): | ||||
|     edit_and_watch_submit_button = SubmitField('Edit > Watch', render_kw={"class": "pure-button pure-button-primary"}) | ||||
|  | ||||
|  | ||||
|  | ||||
| # Common to a single watch and the global settings | ||||
| class commonSettingsForm(Form): | ||||
|     from . import processors | ||||
| @@ -754,7 +753,7 @@ class commonSettingsForm(Form): | ||||
|  | ||||
|     fetch_backend = RadioField(u'Fetch Method', choices=content_fetchers.available_fetchers(), validators=[ValidateContentFetcherIsReady()]) | ||||
|     notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()]) | ||||
|     notification_format = SelectField('Notification format', choices=valid_notification_formats.keys()) | ||||
|     notification_format = SelectField('Notification format', choices=list(valid_notification_formats.items())) | ||||
|     notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()]) | ||||
|     notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()]) | ||||
|     processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff") | ||||
|   | ||||
| @@ -1,3 +1,5 @@ | ||||
| from functools import lru_cache | ||||
|  | ||||
| from loguru import logger | ||||
| from typing import List | ||||
| import html | ||||
| @@ -13,6 +15,7 @@ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S) | ||||
| META_CS  = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I) | ||||
| META_CT  = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I) | ||||
|  | ||||
| SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):' | ||||
|  | ||||
| # 'price' , 'lowPrice', 'highPrice' are usually under here | ||||
| # All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here | ||||
| @@ -22,9 +25,25 @@ class JSONNotFound(ValueError): | ||||
|     def __init__(self, msg): | ||||
|         ValueError.__init__(self, msg) | ||||
|  | ||||
| def is_safe_url(test_url): | ||||
|     import os | ||||
|     # See https://github.com/dgtlmoon/changedetection.io/issues/1358 | ||||
|  | ||||
|     # Remove 'source:' prefix so we dont get 'source:javascript:' etc | ||||
|     # 'source:' is a valid way to tell us to return the source | ||||
|  | ||||
|     r = re.compile(re.escape('source:'), re.IGNORECASE) | ||||
|     test_url = r.sub('', test_url) | ||||
|  | ||||
|     pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE) | ||||
|     if not pattern.match(test_url.strip()): | ||||
|         return False | ||||
|  | ||||
|     return True | ||||
|  | ||||
| # Doesn't look like python supports forward slash auto enclosure in re.findall | ||||
| # So convert it to inline flag "(?i)foobar" type configuration | ||||
| @lru_cache(maxsize=100) | ||||
| def perl_style_slash_enclosed_regex_to_options(regex): | ||||
|  | ||||
|     res = re.search(PERL_STYLE_REGEX, regex, re.IGNORECASE) | ||||
| @@ -185,8 +204,21 @@ def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False | ||||
|     tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser) | ||||
|     html_block = "" | ||||
|  | ||||
|     r = elementpath.select(tree, xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}, parser=XPath3Parser) | ||||
|     #@note: //title/text() wont work where <title>CDATA.. | ||||
|     # Build namespace map for XPath queries | ||||
|     namespaces = {'re': 'http://exslt.org/regular-expressions'} | ||||
|  | ||||
|     # Handle default namespace in documents (common in RSS/Atom feeds, but can occur in any XML) | ||||
|     # XPath spec: unprefixed element names have no namespace, not the default namespace | ||||
|     # Solution: Register the default namespace with empty string prefix in elementpath | ||||
|     # This is primarily for RSS/Atom feeds but works for any XML with default namespace | ||||
|     if hasattr(tree, 'nsmap') and tree.nsmap and None in tree.nsmap: | ||||
|         # Register the default namespace with empty string prefix for elementpath | ||||
|         # This allows //title to match elements in the default namespace | ||||
|         namespaces[''] = tree.nsmap[None] | ||||
|  | ||||
|     r = elementpath.select(tree, xpath_filter.strip(), namespaces=namespaces, parser=XPath3Parser) | ||||
|     #@note: //title/text() now works with default namespaces (fixed by registering '' prefix) | ||||
|     #@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first) | ||||
|  | ||||
|     if type(r) != list: | ||||
|         r = [r] | ||||
| @@ -221,8 +253,19 @@ def xpath1_filter(xpath_filter, html_content, append_pretty_line_formatting=Fals | ||||
|     tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser) | ||||
|     html_block = "" | ||||
|  | ||||
|     r = tree.xpath(xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}) | ||||
|     #@note: //title/text() wont work where <title>CDATA.. | ||||
|     # Build namespace map for XPath queries | ||||
|     namespaces = {'re': 'http://exslt.org/regular-expressions'} | ||||
|  | ||||
|     # NOTE: lxml's native xpath() does NOT support empty string prefix for default namespace | ||||
|     # For documents with default namespace (RSS/Atom feeds), users must use: | ||||
|     #   - local-name(): //*[local-name()='title']/text() | ||||
|     #   - Or use xpath_filter (not xpath1_filter) which supports default namespaces | ||||
|     # XPath spec: unprefixed element names have no namespace, not the default namespace | ||||
|  | ||||
|     r = tree.xpath(xpath_filter.strip(), namespaces=namespaces) | ||||
|     #@note: xpath1 (lxml) does NOT automatically handle default namespaces | ||||
|     #@note: Use //*[local-name()='element'] or switch to xpath_filter for default namespace support | ||||
|     #@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first) | ||||
|  | ||||
|     for element in r: | ||||
|         # When there's more than 1 match, then add the suffix to separate each line | ||||
|   | ||||
| @@ -9,6 +9,7 @@ from .safe_jinja import ( | ||||
|     JINJA2_MAX_RETURN_PAYLOAD_SIZE, | ||||
|     DEFAULT_JINJA2_EXTENSIONS, | ||||
| ) | ||||
| from .plugins.regex import regex_replace | ||||
|  | ||||
| __all__ = [ | ||||
|     'TimeExtension', | ||||
| @@ -17,4 +18,5 @@ __all__ = [ | ||||
|     'create_jinja_env', | ||||
|     'JINJA2_MAX_RETURN_PAYLOAD_SIZE', | ||||
|     'DEFAULT_JINJA2_EXTENSIONS', | ||||
|     'regex_replace', | ||||
| ] | ||||
|   | ||||
							
								
								
									
										6
									
								
								changedetectionio/jinja2_custom/plugins/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								changedetectionio/jinja2_custom/plugins/__init__.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,6 @@ | ||||
| """ | ||||
| Jinja2 custom filter plugins for changedetection.io | ||||
| """ | ||||
| from .regex import regex_replace | ||||
|  | ||||
| __all__ = ['regex_replace'] | ||||
							
								
								
									
										98
									
								
								changedetectionio/jinja2_custom/plugins/regex.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										98
									
								
								changedetectionio/jinja2_custom/plugins/regex.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,98 @@ | ||||
| """ | ||||
| Regex filter plugin for Jinja2 templates. | ||||
|  | ||||
| Provides regex_replace filter for pattern-based string replacements in templates. | ||||
| """ | ||||
| import re | ||||
| import signal | ||||
| from loguru import logger | ||||
|  | ||||
|  | ||||
| def regex_replace(value: str, pattern: str, replacement: str = '', count: int = 0) -> str: | ||||
|     """ | ||||
|     Replace occurrences of a regex pattern in a string. | ||||
|  | ||||
|     Security: Protected against ReDoS (Regular Expression Denial of Service) attacks: | ||||
|     - Limits input value size to prevent excessive processing | ||||
|     - Uses timeout mechanism to prevent runaway regex operations | ||||
|     - Validates pattern complexity to prevent catastrophic backtracking | ||||
|  | ||||
|     Args: | ||||
|         value: The input string to perform replacements on | ||||
|         pattern: The regex pattern to search for | ||||
|         replacement: The replacement string (default: '') | ||||
|         count: Maximum number of replacements (0 = replace all, default: 0) | ||||
|  | ||||
|     Returns: | ||||
|         String with replacements applied, or original value on error | ||||
|  | ||||
|     Example: | ||||
|         {{ "hello world" | regex_replace("world", "universe") }} | ||||
|         {{ diff | regex_replace("<td>([^<]+)</td><td>([^<]+)</td>", "Label1: \\1\\nLabel2: \\2") }} | ||||
|  | ||||
|     Security limits: | ||||
|         - Maximum input size: 10MB | ||||
|         - Maximum pattern length: 500 characters | ||||
|         - Operation timeout: 10 seconds | ||||
|         - Dangerous nested quantifier patterns are rejected | ||||
|     """ | ||||
|     # Security limits | ||||
|     MAX_INPUT_SIZE = 1024 * 1024 * 10 # 10MB max input size | ||||
|     MAX_PATTERN_LENGTH = 500  # Maximum regex pattern length | ||||
|     REGEX_TIMEOUT_SECONDS = 10  # Maximum time for regex operation | ||||
|  | ||||
|     # Validate input sizes | ||||
|     value_str = str(value) | ||||
|     if len(value_str) > MAX_INPUT_SIZE: | ||||
|         logger.warning(f"regex_replace: Input too large ({len(value_str)} bytes), truncating") | ||||
|         value_str = value_str[:MAX_INPUT_SIZE] | ||||
|  | ||||
|     if len(pattern) > MAX_PATTERN_LENGTH: | ||||
|         logger.warning(f"regex_replace: Pattern too long ({len(pattern)} chars), rejecting") | ||||
|         return value_str | ||||
|  | ||||
|     # Check for potentially dangerous patterns (basic checks) | ||||
|     # Nested quantifiers like (a+)+ can cause catastrophic backtracking | ||||
|     dangerous_patterns = [ | ||||
|         r'\([^)]*\+[^)]*\)\+',  # (x+)+ | ||||
|         r'\([^)]*\*[^)]*\)\+',  # (x*)+ | ||||
|         r'\([^)]*\+[^)]*\)\*',  # (x+)* | ||||
|         r'\([^)]*\*[^)]*\)\*',  # (x*)* | ||||
|     ] | ||||
|  | ||||
|     for dangerous in dangerous_patterns: | ||||
|         if re.search(dangerous, pattern): | ||||
|             logger.warning(f"regex_replace: Potentially dangerous pattern detected: {pattern}") | ||||
|             return value_str | ||||
|  | ||||
|     def timeout_handler(signum, frame): | ||||
|         raise TimeoutError("Regex operation timed out") | ||||
|  | ||||
|     try: | ||||
|         # Set up timeout for regex operation (Unix-like systems only) | ||||
|         # This prevents ReDoS attacks | ||||
|         old_handler = None | ||||
|         if hasattr(signal, 'SIGALRM'): | ||||
|             old_handler = signal.signal(signal.SIGALRM, timeout_handler) | ||||
|             signal.alarm(REGEX_TIMEOUT_SECONDS) | ||||
|  | ||||
|         try: | ||||
|             result = re.sub(pattern, replacement, value_str, count=count) | ||||
|         finally: | ||||
|             # Cancel the alarm | ||||
|             if hasattr(signal, 'SIGALRM'): | ||||
|                 signal.alarm(0) | ||||
|                 if old_handler is not None: | ||||
|                     signal.signal(signal.SIGALRM, old_handler) | ||||
|  | ||||
|         return result | ||||
|  | ||||
|     except TimeoutError: | ||||
|         logger.error(f"regex_replace: Regex operation timed out - possible ReDoS attack. Pattern: {pattern}") | ||||
|         return value_str | ||||
|     except re.error as e: | ||||
|         logger.warning(f"regex_replace: Invalid regex pattern: {e}") | ||||
|         return value_str | ||||
|     except Exception as e: | ||||
|         logger.error(f"regex_replace: Unexpected error: {e}") | ||||
|         return value_str | ||||
| @@ -8,13 +8,13 @@ import jinja2.sandbox | ||||
| import typing as t | ||||
| import os | ||||
| from .extensions.TimeExtension import TimeExtension | ||||
| from .plugins import regex_replace | ||||
|  | ||||
| JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10)) | ||||
|  | ||||
| # Default extensions - can be overridden in create_jinja_env() | ||||
| DEFAULT_JINJA2_EXTENSIONS = [TimeExtension] | ||||
|  | ||||
|  | ||||
| def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandboxedEnvironment: | ||||
|     """ | ||||
|     Create a sandboxed Jinja2 environment with our custom extensions and default timezone. | ||||
| @@ -38,6 +38,9 @@ def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandb | ||||
|     default_timezone = os.getenv('TZ', 'UTC').strip() | ||||
|     jinja2_env.default_timezone = default_timezone | ||||
|  | ||||
|     # Register custom filters | ||||
|     jinja2_env.filters['regex_replace'] = regex_replace | ||||
|  | ||||
|     return jinja2_env | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| from blinker import signal | ||||
|  | ||||
| from changedetectionio.html_tools import is_safe_url | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from changedetectionio.jinja2_custom import render as jinja_render | ||||
| from . import watch_base | ||||
| @@ -21,23 +21,6 @@ FAVICON_RESAVE_THRESHOLD_SECONDS=86400 | ||||
| minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)) | ||||
| mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7} | ||||
|  | ||||
|  | ||||
| def is_safe_url(test_url): | ||||
|     # See https://github.com/dgtlmoon/changedetection.io/issues/1358 | ||||
|  | ||||
|     # Remove 'source:' prefix so we dont get 'source:javascript:' etc | ||||
|     # 'source:' is a valid way to tell us to return the source | ||||
|  | ||||
|     r = re.compile(re.escape('source:'), re.IGNORECASE) | ||||
|     test_url = r.sub('', test_url) | ||||
|  | ||||
|     pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE) | ||||
|     if not pattern.match(test_url.strip()): | ||||
|         return False | ||||
|  | ||||
|     return True | ||||
|  | ||||
|  | ||||
| class model(watch_base): | ||||
|     __newest_history_key = None | ||||
|     __history_n = 0 | ||||
|   | ||||
| @@ -2,7 +2,7 @@ import os | ||||
| import uuid | ||||
|  | ||||
| from changedetectionio import strtobool | ||||
| default_notification_format_for_watch = 'System default' | ||||
| USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH = 'System default' | ||||
| CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL' | ||||
|  | ||||
| class watch_base(dict): | ||||
| @@ -44,7 +44,7 @@ class watch_base(dict): | ||||
|             'method': 'GET', | ||||
|             'notification_alert_count': 0, | ||||
|             'notification_body': None, | ||||
|             'notification_format': default_notification_format_for_watch, | ||||
|             'notification_format': USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH, | ||||
|             'notification_muted': False, | ||||
|             'notification_screenshot': False,  # Include the latest screenshot if available and supported by the apprise URL | ||||
|             'notification_title': None, | ||||
|   | ||||
| @@ -1,17 +1,16 @@ | ||||
| from changedetectionio.model import default_notification_format_for_watch | ||||
| from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
|  | ||||
| default_notification_format = 'HTML Color' | ||||
| default_notification_format = 'htmlcolor' | ||||
| default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n' | ||||
| default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}' | ||||
|  | ||||
| # The values (markdown etc) are from apprise NotifyFormat, | ||||
| # But to avoid importing the whole heavy module just use the same strings here. | ||||
| valid_notification_formats = { | ||||
|     'Plain Text': 'text', | ||||
|     'HTML': 'html', | ||||
|     'HTML Color': 'htmlcolor', | ||||
|     'Markdown to HTML': 'markdown', | ||||
|     'text': 'Plain Text', | ||||
|     'html': 'HTML', | ||||
|     'htmlcolor': 'HTML Color', | ||||
|     'markdown': 'Markdown to HTML', | ||||
|     # Used only for editing a watch (not for global) | ||||
|     default_notification_format_for_watch: default_notification_format_for_watch | ||||
|     USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -1,10 +1,61 @@ | ||||
| """ | ||||
| Custom Apprise HTTP Handlers with format= Parameter Support | ||||
|  | ||||
| IMPORTANT: This module works around a limitation in Apprise's @notify decorator. | ||||
|  | ||||
| THE PROBLEM: | ||||
| ------------- | ||||
| When using Apprise's @notify decorator to create custom notification handlers, the | ||||
| decorator creates a CustomNotifyPlugin that uses parse_url(..., simple=True) to parse | ||||
| URLs. This simple parsing mode does NOT extract the format= query parameter from the URL | ||||
| and set it as a top-level parameter that NotifyBase.__init__ can use to set notify_format. | ||||
|  | ||||
| As a result: | ||||
| 1. URL: post://example.com/webhook?format=html | ||||
| 2. Apprise parses this and sees format=html in qsd (query string dictionary) | ||||
| 3. But it does NOT extract it and pass it to NotifyBase.__init__ | ||||
| 4. NotifyBase defaults to notify_format=TEXT | ||||
| 5. When you call apobj.notify(body="<html>...", body_format="html"): | ||||
|    - Apprise sees: input format = html, output format (notify_format) = text | ||||
|    - Apprise calls convert_between("html", "text", body) | ||||
|    - This strips all HTML tags, leaving only plain text | ||||
| 6. Your custom handler receives stripped plain text instead of HTML | ||||
|  | ||||
| THE SOLUTION: | ||||
| ------------- | ||||
| Instead of using the @notify decorator directly, we: | ||||
| 1. Manually register custom plugins using plugins.N_MGR.add() | ||||
| 2. Create a CustomHTTPHandler class that extends CustomNotifyPlugin | ||||
| 3. Override __init__ to extract format= from qsd and set it as kwargs['format'] | ||||
| 4. Call NotifyBase.__init__ which properly sets notify_format from kwargs['format'] | ||||
| 5. Set up _default_args like CustomNotifyPlugin does for compatibility | ||||
|  | ||||
| This ensures that when format=html is in the URL: | ||||
| - notify_format is set to HTML | ||||
| - Apprise sees: input format = html, output format = html | ||||
| - No conversion happens (convert_between returns content unchanged) | ||||
| - Your custom handler receives the original HTML intact | ||||
|  | ||||
| TESTING: | ||||
| -------- | ||||
| To verify this works: | ||||
| >>> apobj = apprise.Apprise() | ||||
| >>> apobj.add('post://localhost:5005/test?format=html') | ||||
| >>> for server in apobj: | ||||
| ...     print(server.notify_format)  # Should print: html (not text) | ||||
| >>> apobj.notify(body='<span>Test</span>', body_format='html') | ||||
| # Your handler should receive '<span>Test</span>' not 'Test' | ||||
| """ | ||||
|  | ||||
| import json | ||||
| import re | ||||
| from urllib.parse import unquote_plus | ||||
|  | ||||
| import requests | ||||
| from apprise.decorators import notify | ||||
| from apprise.utils.parse import parse_url as apprise_parse_url | ||||
| from apprise import plugins | ||||
| from apprise.decorators.base import CustomNotifyPlugin | ||||
| from apprise.utils.parse import parse_url as apprise_parse_url, url_assembly | ||||
| from apprise.utils.logic import dict_full_update | ||||
| from loguru import logger | ||||
| from requests.structures import CaseInsensitiveDict | ||||
|  | ||||
| @@ -12,13 +63,66 @@ SUPPORTED_HTTP_METHODS = {"get", "post", "put", "delete", "patch", "head"} | ||||
|  | ||||
|  | ||||
| def notify_supported_methods(func): | ||||
|     """Register custom HTTP method handlers that properly support format= parameter.""" | ||||
|     for method in SUPPORTED_HTTP_METHODS: | ||||
|         func = notify(on=method)(func) | ||||
|         # Add support for https, for each supported http method | ||||
|         func = notify(on=f"{method}s")(func) | ||||
|         _register_http_handler(method, func) | ||||
|         _register_http_handler(f"{method}s", func) | ||||
|     return func | ||||
|  | ||||
|  | ||||
| def _register_http_handler(schema, send_func): | ||||
|     """Register a custom HTTP handler that extracts format= from URL query parameters.""" | ||||
|  | ||||
|     # Parse base URL | ||||
|     base_url = f"{schema}://" | ||||
|     base_args = apprise_parse_url(base_url, default_schema=schema, verify_host=False, simple=True) | ||||
|  | ||||
|     class CustomHTTPHandler(CustomNotifyPlugin): | ||||
|         secure_protocol = schema | ||||
|         service_name = f"Custom HTTP - {schema.upper()}" | ||||
|         _base_args = base_args | ||||
|  | ||||
|         def __init__(self, **kwargs): | ||||
|             # Extract format from qsd and set it as a top-level kwarg | ||||
|             # This allows NotifyBase.__init__ to properly set notify_format | ||||
|             if 'qsd' in kwargs and 'format' in kwargs['qsd']: | ||||
|                 kwargs['format'] = kwargs['qsd']['format'] | ||||
|  | ||||
|             # Call NotifyBase.__init__ (skip CustomNotifyPlugin.__init__) | ||||
|             super(CustomNotifyPlugin, self).__init__(**kwargs) | ||||
|  | ||||
|             # Set up _default_args like CustomNotifyPlugin does | ||||
|             self._default_args = {} | ||||
|             kwargs.pop("secure", None) | ||||
|             dict_full_update(self._default_args, self._base_args) | ||||
|             dict_full_update(self._default_args, kwargs) | ||||
|             self._default_args["url"] = url_assembly(**self._default_args) | ||||
|  | ||||
|         __send = staticmethod(send_func) | ||||
|  | ||||
|         def send(self, body, title="", notify_type="info", *args, **kwargs): | ||||
|             """Call the custom send function.""" | ||||
|             try: | ||||
|                 result = self.__send( | ||||
|                     body, title, notify_type, | ||||
|                     *args, | ||||
|                     meta=self._default_args, | ||||
|                     **kwargs | ||||
|                 ) | ||||
|                 return True if result is None else bool(result) | ||||
|             except Exception as e: | ||||
|                 self.logger.warning(f"Exception in custom HTTP handler: {e}") | ||||
|                 return False | ||||
|  | ||||
|     # Register the plugin | ||||
|     plugins.N_MGR.add( | ||||
|         plugin=CustomHTTPHandler, | ||||
|         schemas=schema, | ||||
|         send_func=send_func, | ||||
|         url=base_url, | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def _get_auth(parsed_url: dict) -> str | tuple[str, str]: | ||||
|     user: str | None = parsed_url.get("user") | ||||
|     password: str | None = parsed_url.get("password") | ||||
| @@ -74,6 +178,8 @@ def apprise_http_custom_handler( | ||||
|     *args, | ||||
|     **kwargs, | ||||
| ) -> bool: | ||||
|  | ||||
|  | ||||
|     url: str = meta.get("url") | ||||
|     schema: str = meta.get("schema") | ||||
|     method: str = re.sub(r"s$", "", schema).upper() | ||||
| @@ -89,25 +195,16 @@ def apprise_http_custom_handler( | ||||
|  | ||||
|     url = re.sub(rf"^{schema}", "https" if schema.endswith("s") else "http", parsed_url.get("url")) | ||||
|  | ||||
|     try: | ||||
|         response = requests.request( | ||||
|             method=method, | ||||
|             url=url, | ||||
|             auth=auth, | ||||
|             headers=headers, | ||||
|             params=params, | ||||
|             data=body.encode("utf-8") if isinstance(body, str) else body, | ||||
|         ) | ||||
|     response = requests.request( | ||||
|         method=method, | ||||
|         url=url, | ||||
|         auth=auth, | ||||
|         headers=headers, | ||||
|         params=params, | ||||
|         data=body.encode("utf-8") if isinstance(body, str) else body, | ||||
|     ) | ||||
|  | ||||
|         response.raise_for_status() | ||||
|     response.raise_for_status() | ||||
|  | ||||
|         logger.info(f"Successfully sent custom notification to {url}") | ||||
|         return True | ||||
|  | ||||
|     except requests.RequestException as e: | ||||
|         logger.error(f"Remote host error while sending custom notification to {url}: {e}") | ||||
|         return False | ||||
|  | ||||
|     except Exception as e: | ||||
|         logger.error(f"Unexpected error occurred while sending custom notification to {url}: {e}") | ||||
|         return False | ||||
|     logger.info(f"Successfully sent custom notification to {url}") | ||||
|     return True | ||||
|   | ||||
							
								
								
									
										42
									
								
								changedetectionio/notification/email_helpers.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								changedetectionio/notification/email_helpers.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,42 @@ | ||||
| def as_monospaced_html_email(content: str, title: str) -> str: | ||||
|     """ | ||||
|     Wraps `content` in a minimal, email-safe HTML template | ||||
|     that forces monospace rendering across Gmail, Hotmail, Apple Mail, etc. | ||||
|  | ||||
|     Args: | ||||
|         content: The body text (plain text or HTML-like). | ||||
|         title: The title plaintext | ||||
|     Returns: | ||||
|         A complete HTML document string suitable for sending as an email body. | ||||
|     """ | ||||
|  | ||||
|     # All line feed types should be removed and then this function should only be fed <br>'s | ||||
|     # Then it works with our <pre> styling without double linefeeds | ||||
|     content = content.translate(str.maketrans('', '', '\r\n')) | ||||
|  | ||||
|     if title: | ||||
|         import html | ||||
|         title = html.escape(title) | ||||
|     else: | ||||
|         title = '' | ||||
|     # 2. Full email-safe HTML | ||||
|     html_email = f"""<!DOCTYPE html> | ||||
| <html lang="en"> | ||||
| <head> | ||||
|   <meta charset="UTF-8"> | ||||
|   <meta name="x-apple-disable-message-reformatting"> | ||||
|   <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||||
|   <!--[if mso]> | ||||
|     <style> | ||||
|       body, div, pre, td {{ font-family: "Courier New", Courier, monospace !important; }} | ||||
|     </style> | ||||
|   <![endif]--> | ||||
|   <title>{title}</title> | ||||
| </head> | ||||
| <body style="-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%;"> | ||||
|   <pre role="article" aria-roledescription="email" lang="en" | ||||
|        style="font-family: monospace, 'Courier New', Courier; font-size: 0.8em; | ||||
|               white-space: pre-wrap; word-break: break-word;">{content}</pre> | ||||
| </body> | ||||
| </html>""" | ||||
|     return html_email | ||||
| @@ -6,10 +6,12 @@ from loguru import logger | ||||
| from urllib.parse import urlparse | ||||
| from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL | ||||
| from .apprise_plugin.custom_handlers import SUPPORTED_HTTP_METHODS | ||||
| from .email_helpers import as_monospaced_html_email | ||||
| from ..diff import HTML_REMOVED_STYLE, REMOVED_PLACEMARKER_OPEN, REMOVED_PLACEMARKER_CLOSED, ADDED_PLACEMARKER_OPEN, HTML_ADDED_STYLE, \ | ||||
|     ADDED_PLACEMARKER_CLOSED, CHANGED_INTO_PLACEMARKER_OPEN, CHANGED_INTO_PLACEMARKER_CLOSED, CHANGED_PLACEMARKER_OPEN, \ | ||||
|     CHANGED_PLACEMARKER_CLOSED | ||||
| from ..notification_service import NotificationContextData | ||||
|     CHANGED_PLACEMARKER_CLOSED, HTML_CHANGED_STYLE, HTML_CHANGED_INTO_STYLE | ||||
| from ..notification_service import NotificationContextData, CUSTOM_LINEBREAK_PLACEHOLDER | ||||
|  | ||||
|  | ||||
|  | ||||
| def markup_text_links_to_html(body): | ||||
| @@ -61,13 +63,13 @@ def notification_format_align_with_apprise(n_format : str): | ||||
|     :return: | ||||
|     """ | ||||
|  | ||||
|     if n_format.lower().startswith('html'): | ||||
|     if n_format.startswith('html'): | ||||
|         # Apprise only knows 'html' not 'htmlcolor' etc, which shouldnt matter here | ||||
|         n_format = NotifyFormat.HTML.value | ||||
|     elif n_format.lower().startswith('markdown'): | ||||
|     elif n_format.startswith('markdown'): | ||||
|         # probably the same but just to be safe | ||||
|         n_format = NotifyFormat.MARKDOWN.value | ||||
|     elif n_format.lower().startswith('text'): | ||||
|     elif n_format.startswith('text'): | ||||
|         # probably the same but just to be safe | ||||
|         n_format = NotifyFormat.TEXT.value | ||||
|     else: | ||||
| @@ -75,6 +77,55 @@ def notification_format_align_with_apprise(n_format : str): | ||||
|  | ||||
|     return n_format | ||||
|  | ||||
| def apply_discord_markdown_to_body(n_body): | ||||
|     """ | ||||
|     Discord does not support <del> but it supports non-standard ~~strikethrough~~ | ||||
|     :param n_body: | ||||
|     :return: | ||||
|     """ | ||||
|     import re | ||||
|     # Define the mapping between your placeholders and markdown markers | ||||
|     replacements = [ | ||||
|         (REMOVED_PLACEMARKER_OPEN, '~~', REMOVED_PLACEMARKER_CLOSED, '~~'), | ||||
|         (ADDED_PLACEMARKER_OPEN, '**', ADDED_PLACEMARKER_CLOSED, '**'), | ||||
|         (CHANGED_PLACEMARKER_OPEN, '~~', CHANGED_PLACEMARKER_CLOSED, '~~'), | ||||
|         (CHANGED_INTO_PLACEMARKER_OPEN, '**', CHANGED_INTO_PLACEMARKER_CLOSED, '**'), | ||||
|     ] | ||||
|     # So that the markdown gets added without any whitespace following it which would break it | ||||
|     for open_tag, open_md, close_tag, close_md in replacements: | ||||
|         # Regex: match opening tag, optional whitespace, capture the content, optional whitespace, then closing tag | ||||
|         pattern = re.compile( | ||||
|             re.escape(open_tag) + r'(\s*)(.*?)?(\s*)' + re.escape(close_tag), | ||||
|             flags=re.DOTALL | ||||
|         ) | ||||
|         n_body = pattern.sub(lambda m: f"{m.group(1)}{open_md}{m.group(2)}{close_md}{m.group(3)}", n_body) | ||||
|     return n_body | ||||
|  | ||||
| def apply_standard_markdown_to_body(n_body): | ||||
|     """ | ||||
|     Apprise does not support ~~strikethrough~~ but it will convert <del> to HTML strikethrough. | ||||
|     :param n_body: | ||||
|     :return: | ||||
|     """ | ||||
|     import re | ||||
|     # Define the mapping between your placeholders and markdown markers | ||||
|     replacements = [ | ||||
|         (REMOVED_PLACEMARKER_OPEN, '<del>', REMOVED_PLACEMARKER_CLOSED, '</del>'), | ||||
|         (ADDED_PLACEMARKER_OPEN, '**', ADDED_PLACEMARKER_CLOSED, '**'), | ||||
|         (CHANGED_PLACEMARKER_OPEN, '<del>', CHANGED_PLACEMARKER_CLOSED, '</del>'), | ||||
|         (CHANGED_INTO_PLACEMARKER_OPEN, '**', CHANGED_INTO_PLACEMARKER_CLOSED, '**'), | ||||
|     ] | ||||
|  | ||||
|     # So that the markdown gets added without any whitespace following it which would break it | ||||
|     for open_tag, open_md, close_tag, close_md in replacements: | ||||
|         # Regex: match opening tag, optional whitespace, capture the content, optional whitespace, then closing tag | ||||
|         pattern = re.compile( | ||||
|             re.escape(open_tag) + r'(\s*)(.*?)?(\s*)' + re.escape(close_tag), | ||||
|             flags=re.DOTALL | ||||
|         ) | ||||
|         n_body = pattern.sub(lambda m: f"{m.group(1)}{open_md}{m.group(2)}{close_md}{m.group(3)}", n_body) | ||||
|     return n_body | ||||
|  | ||||
|  | ||||
| def apply_service_tweaks(url, n_body, n_title, requested_output_format): | ||||
|  | ||||
| @@ -104,6 +155,7 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format): | ||||
|         # @todo re-use an existing library we have already imported to strip all non-allowed tags | ||||
|         n_body = n_body.replace('<br>', '\n') | ||||
|         n_body = n_body.replace('</br>', '\n') | ||||
|         n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\n') | ||||
|  | ||||
|         # Use strikethrough for removed content, bold for added content | ||||
|         n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '<s>') | ||||
| @@ -128,6 +180,7 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format): | ||||
|         # Discord doesn't support HTML, replace <br> with newlines | ||||
|         n_body = n_body.strip().replace('<br>', '\n') | ||||
|         n_body = n_body.replace('</br>', '\n') | ||||
|         n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\n') | ||||
|  | ||||
|         # Don't replace placeholders or truncate here - let the custom Discord plugin handle it | ||||
|         # The plugin will use embeds (6000 char limit across all embeds) if placeholders are present, | ||||
| @@ -137,15 +190,7 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format): | ||||
|         if requested_output_format == 'html': | ||||
|             # No diff placeholders, use Discord markdown for any other formatting | ||||
|             # Use Discord markdown: strikethrough for removed, bold for added | ||||
|             n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '~~') | ||||
|             n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '~~') | ||||
|             n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, '**') | ||||
|             n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, '**') | ||||
|             # Handle changed/replaced lines (old → new) | ||||
|             n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, '~~') | ||||
|             n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, '~~') | ||||
|             n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, '**') | ||||
|             n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, '**') | ||||
|             n_body = apply_discord_markdown_to_body(n_body=n_body) | ||||
|  | ||||
|             # Apply 2000 char limit for plain content | ||||
|             payload_max_size = 1700 | ||||
| @@ -156,16 +201,17 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format): | ||||
|  | ||||
|     # Is not discord/tgram and they want htmlcolor | ||||
|     elif requested_output_format == 'htmlcolor': | ||||
|         n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, f'<span style="{HTML_REMOVED_STYLE}">') | ||||
|         # https://github.com/dgtlmoon/changedetection.io/issues/821#issuecomment-1241837050 | ||||
|         n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, f'<span style="{HTML_REMOVED_STYLE}" role="deletion" aria-label="Removed text" title="Removed text">') | ||||
|         n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, f'</span>') | ||||
|         n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, f'<span style="{HTML_ADDED_STYLE}">') | ||||
|         n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, f'<span style="{HTML_ADDED_STYLE}" role="insertion" aria-label="Added text" title="Added text">') | ||||
|         n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, f'</span>') | ||||
|         # Handle changed/replaced lines (old → new) | ||||
|         n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, f'<span style="{HTML_REMOVED_STYLE}">') | ||||
|         n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, f'<span style="{HTML_CHANGED_STYLE}" role="note" aria-label="Changed text" title="Changed text">') | ||||
|         n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, f'</span>') | ||||
|         n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'<span style="{HTML_ADDED_STYLE}">') | ||||
|         n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'<span style="{HTML_CHANGED_INTO_STYLE}" role="note" aria-label="Changed into" title="Changed into">') | ||||
|         n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'</span>') | ||||
|         n_body = n_body.replace("\n", '<br>') | ||||
|         n_body = n_body.replace('\n', f'{CUSTOM_LINEBREAK_PLACEHOLDER}\n') | ||||
|     elif requested_output_format == 'html': | ||||
|         n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '(removed) ') | ||||
|         n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '') | ||||
| @@ -175,7 +221,10 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format): | ||||
|         n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, f'') | ||||
|         n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'(into) ') | ||||
|         n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'') | ||||
|         n_body = n_body.replace("\n", '<br>') | ||||
|         n_body = n_body.replace('\n', f'{CUSTOM_LINEBREAK_PLACEHOLDER}\n') | ||||
|     elif requested_output_format == 'markdown': | ||||
|         # Markdown to HTML - Apprise will convert this to HTML | ||||
|         n_body = apply_standard_markdown_to_body(n_body=n_body) | ||||
|  | ||||
|     else: #plaintext etc default | ||||
|         n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '(removed) ') | ||||
| @@ -192,21 +241,12 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format): | ||||
|  | ||||
| def process_notification(n_object: NotificationContextData, datastore): | ||||
|     from changedetectionio.jinja2_custom import render as jinja_render | ||||
|     from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats | ||||
|     from . import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH, default_notification_format, valid_notification_formats | ||||
|     # be sure its registered | ||||
|     from .apprise_plugin.custom_handlers import apprise_http_custom_handler | ||||
|     # Register custom Discord plugin | ||||
|     from .apprise_plugin.discord import NotifyDiscordCustom | ||||
|  | ||||
|     # Create list of custom handler protocols (both http and https versions) | ||||
|     custom_handler_protocols = [f"{method}://" for method in SUPPORTED_HTTP_METHODS] | ||||
|     custom_handler_protocols += [f"{method}s://" for method in SUPPORTED_HTTP_METHODS] | ||||
|  | ||||
|     has_custom_handler = any( | ||||
|         url.startswith(tuple(custom_handler_protocols)) | ||||
|         for url in n_object['notification_urls'] | ||||
|     ) | ||||
|  | ||||
|     if not isinstance(n_object, NotificationContextData): | ||||
|         raise TypeError(f"Expected NotificationContextData, got {type(n_object)}") | ||||
|  | ||||
| @@ -217,27 +257,21 @@ def process_notification(n_object: NotificationContextData, datastore): | ||||
|     # Insert variables into the notification content | ||||
|     notification_parameters = create_notification_parameters(n_object, datastore) | ||||
|  | ||||
|     requested_output_format = valid_notification_formats.get( | ||||
|         n_object.get('notification_format', default_notification_format), | ||||
|         valid_notification_formats[default_notification_format], | ||||
|     ) | ||||
|     requested_output_format = n_object.get('notification_format', default_notification_format) | ||||
|     logger.debug(f"Requested notification output format: '{requested_output_format}'") | ||||
|  | ||||
|     # If we arrived with 'System default' then look it up | ||||
|     if requested_output_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch: | ||||
|     if requested_output_format == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: | ||||
|         # Initially text or whatever | ||||
|         requested_output_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format]).lower() | ||||
|         requested_output_format = datastore.data['settings']['application'].get('notification_format', default_notification_format) | ||||
|  | ||||
|     requested_output_format_original = requested_output_format | ||||
|  | ||||
|     # Now clean it up so it fits perfectly with apprise | ||||
|     requested_output_format = notification_format_align_with_apprise(n_format=requested_output_format) | ||||
|  | ||||
|     logger.trace(f"Complete notification body including Jinja and placeholders calculated in  {time.time() - now:.2f}s") | ||||
|  | ||||
|     # If we have custom handlers, use invalid format to prevent conversion | ||||
|     # Otherwise use the proper format | ||||
|     if has_custom_handler: | ||||
|         input_format = 'raw-no-convert' | ||||
|  | ||||
|     # https://github.com/caronc/apprise/wiki/Development_LogCapture | ||||
|     # Anything higher than or equal to WARNING (which covers things like Connection errors) | ||||
|     # raise it as an exception | ||||
| @@ -258,43 +292,15 @@ def process_notification(n_object: NotificationContextData, datastore): | ||||
|     if not n_object.get('notification_urls'): | ||||
|         return None | ||||
|  | ||||
|     with apprise.LogCapture(level=apprise.logging.DEBUG) as logs: | ||||
|     with (apprise.LogCapture(level=apprise.logging.DEBUG) as logs): | ||||
|         for url in n_object['notification_urls']: | ||||
|             parsed_url = urlparse(url) | ||||
|             prefix_add_to_url = '?' if not parsed_url.query else '&' | ||||
|  | ||||
|             # Get the notification body from datastore | ||||
|             n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters) | ||||
|  | ||||
|             if n_object.get('markup_text_to_html'): | ||||
|             if n_object.get('markup_text_links_to_html_links'): | ||||
|                 n_body = markup_text_links_to_html(body=n_body) | ||||
|  | ||||
|             # This actually means we request "Markdown to HTML" | ||||
|             if requested_output_format == NotifyFormat.MARKDOWN.value: | ||||
|                 output_format = NotifyFormat.HTML.value | ||||
|                 input_format = NotifyFormat.MARKDOWN.value | ||||
|                 if not 'format=' in url.lower(): | ||||
|                     url = f"{url}{prefix_add_to_url}format={output_format}" | ||||
|  | ||||
|             # Deviation from apprise. | ||||
|             # No conversion, its like they want to send raw HTML but we add linebreaks | ||||
|             elif requested_output_format == NotifyFormat.HTML.value: | ||||
|                 # same in and out means apprise wont try to convert | ||||
|                 input_format = output_format = NotifyFormat.HTML.value | ||||
|                 if not 'format=' in url.lower(): | ||||
|                     url = f"{url}{prefix_add_to_url}format={output_format}" | ||||
|  | ||||
|             else: | ||||
|                 # Nothing to be done, leave it as plaintext | ||||
|                 # `body_format` Tell apprise what format the INPUT is in | ||||
|                 # &format= in URL Tell apprise what format the OUTPUT should be in (it can convert between) | ||||
|                 input_format = output_format = NotifyFormat.TEXT.value | ||||
|                 if not 'format=' in url.lower(): | ||||
|                     url = f"{url}{prefix_add_to_url}format={output_format}" | ||||
|  | ||||
|             if has_custom_handler: | ||||
|                 input_format='raw-no-convert' | ||||
|  | ||||
|             n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters) | ||||
|  | ||||
|             url = url.strip() | ||||
| @@ -309,20 +315,68 @@ def process_notification(n_object: NotificationContextData, datastore): | ||||
|             logger.info(f">> Process Notification: AppRise notifying {url}") | ||||
|             url = jinja_render(template_str=url, **notification_parameters) | ||||
|  | ||||
|             # If it's a plaintext document, and they want HTML type email/alerts, so it needs to be escaped | ||||
|             watch_mime_type = n_object.get('watch_mime_type') | ||||
|             if watch_mime_type and 'text/' in watch_mime_type.lower() and not 'html' in watch_mime_type.lower(): | ||||
|                 if 'html' in requested_output_format: | ||||
|                     from markupsafe import escape | ||||
|                     n_body = str(escape(n_body)) | ||||
|  | ||||
|             if 'html' in requested_output_format: | ||||
|                 # Since the n_body is always some kind of text from the 'diff' engine, attempt to preserve whitespaces that get sent to the HTML output | ||||
|                 # But only where its more than 1 consecutive whitespace, otherwise "and this" becomes "and this" etc which is too much. | ||||
|                 n_body = n_body.replace('  ', '  ') | ||||
|  | ||||
|             (url, n_body, n_title) = apply_service_tweaks(url=url, n_body=n_body, n_title=n_title, requested_output_format=requested_output_format_original) | ||||
|  | ||||
|             apobj.add(url) | ||||
|             apprise_input_format = "NO-THANKS-WE-WILL-MANAGE-ALL-OF-THIS" | ||||
|  | ||||
|             if not 'format=' in url: | ||||
|                 parsed_url = urlparse(url) | ||||
|                 prefix_add_to_url = '?' if not parsed_url.query else '&' | ||||
|  | ||||
|                 # THIS IS THE TRICK HOW TO DISABLE APPRISE DOING WEIRD AUTO-CONVERSION WITH BREAKING BR TAGS ETC | ||||
|                 if 'html' in requested_output_format: | ||||
|                     url = f"{url}{prefix_add_to_url}format={NotifyFormat.HTML.value}" | ||||
|                     apprise_input_format = NotifyFormat.HTML.value | ||||
|                 elif 'text' in requested_output_format: | ||||
|                     url = f"{url}{prefix_add_to_url}format={NotifyFormat.TEXT.value}" | ||||
|                     apprise_input_format = NotifyFormat.TEXT.value | ||||
|  | ||||
|                 elif requested_output_format == NotifyFormat.MARKDOWN.value: | ||||
|                     # Convert markdown to HTML ourselves since not all plugins do this | ||||
|                     from apprise.conversion import markdown_to_html | ||||
|                     # Make sure there are paragraph breaks around horizontal rules | ||||
|                     n_body = n_body.replace('---', '\n\n---\n\n') | ||||
|                     n_body = markdown_to_html(n_body) | ||||
|                     url = f"{url}{prefix_add_to_url}format={NotifyFormat.HTML.value}" | ||||
|                     requested_output_format = NotifyFormat.HTML.value | ||||
|                     apprise_input_format = NotifyFormat.HTML.value  # Changed from MARKDOWN to HTML | ||||
|  | ||||
|                 # Could have arrived at any stage, so we dont end up running .escape on it | ||||
|                 if 'html' in requested_output_format: | ||||
|                     n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '<br>\r\n') | ||||
|                 else: | ||||
|                     # texty types | ||||
|                     n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n') | ||||
|  | ||||
|             sent_objs.append({'title': n_title, | ||||
|                               'body': n_body, | ||||
|                               'url': url}) | ||||
|             apobj.add(url) | ||||
|  | ||||
|             # Since the output is always based on the plaintext of the 'diff' engine, wrap it nicely. | ||||
|             # It should always be similar to the 'history' part of the UI. | ||||
|             if url.startswith('mail') and 'html' in requested_output_format: | ||||
|                 if not '<pre' in n_body and not '<body' in n_body: # No custom HTML-ish body was setup already | ||||
|                     n_body = as_monospaced_html_email(content=n_body, title=n_title) | ||||
|  | ||||
|         apobj.notify( | ||||
|             title=n_title, | ||||
|             body=n_body, | ||||
|             # `body_format` Tell apprise what format the INPUT is in | ||||
|             # `body_format` Tell apprise what format the INPUT is in, specify a wrong/bad type and it will force skip conversion in apprise | ||||
|             # &format= in URL Tell apprise what format the OUTPUT should be in (it can convert between) | ||||
|             body_format=input_format, | ||||
|             body_format=apprise_input_format, | ||||
|             # False is not an option for AppRise, must be type None | ||||
|             attach=n_object.get('screenshot', None) | ||||
|         ) | ||||
|   | ||||
| @@ -9,29 +9,35 @@ for both sync and async workers | ||||
| from loguru import logger | ||||
| import time | ||||
|  | ||||
| from changedetectionio.notification import default_notification_format | ||||
| from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
| from changedetectionio.notification import default_notification_format, valid_notification_formats | ||||
|  | ||||
| # This gets modified on notification time (handler.py) depending on the required notification output | ||||
| CUSTOM_LINEBREAK_PLACEHOLDER='@BR@' | ||||
|  | ||||
|  | ||||
| # What is passed around as notification context, also used as the complete list of valid {{ tokens }} | ||||
| class NotificationContextData(dict): | ||||
|     def __init__(self, initial_data=None, **kwargs): | ||||
|         super().__init__({ | ||||
|             'base_url': None, | ||||
|             'current_snapshot': None, | ||||
|             'diff': None, | ||||
|             'diff_added': None, | ||||
|             'diff_full': None, | ||||
|             'diff_patch': None, | ||||
|             'diff_removed': None, | ||||
|             'diff_url': None, | ||||
|             'markup_text_links_to_html_links': False, # If automatic conversion of plaintext to HTML should happen | ||||
|             'notification_timestamp': time.time(), | ||||
|             'preview_url': None, | ||||
|             'screenshot': None, | ||||
|             'triggered_text': None, | ||||
|             'uuid': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',  # Converted to 'watch_uuid' in create_notification_parameters | ||||
|             'watch_url': 'https://WATCH-PLACE-HOLDER/', | ||||
|             'base_url': None, | ||||
|             'diff_url': None, | ||||
|             'preview_url': None, | ||||
|             'watch_mime_type': None, | ||||
|             'watch_tag': None, | ||||
|             'watch_title': None, | ||||
|             'markup_text_to_html': False, # If automatic conversion of plaintext to HTML should happen | ||||
|             'watch_url': 'https://WATCH-PLACE-HOLDER/', | ||||
|         }) | ||||
|  | ||||
|         # Apply any initial data passed in | ||||
| @@ -43,15 +49,28 @@ class NotificationContextData(dict): | ||||
|         if kwargs: | ||||
|             self.update(kwargs) | ||||
|  | ||||
|         n_format = self.get('notification_format') | ||||
|         if n_format and not valid_notification_formats.get(n_format): | ||||
|             raise ValueError(f'Invalid notification format: "{n_format}"') | ||||
|  | ||||
|     def set_random_for_validation(self): | ||||
|         import random, string | ||||
|         """Randomly fills all dict keys with random strings (for validation/testing).""" | ||||
|         """Randomly fills all dict keys with random strings (for validation/testing).  | ||||
|         So we can test the output in the notification body | ||||
|         """ | ||||
|         for key in self.keys(): | ||||
|             if key in ['uuid', 'time', 'watch_uuid']: | ||||
|                 continue | ||||
|             rand_str = 'RANDOM-PLACEHOLDER-'+''.join(random.choices(string.ascii_letters + string.digits, k=12)) | ||||
|             self[key] = rand_str | ||||
|  | ||||
|     def __setitem__(self, key, value): | ||||
|         if key == 'notification_format' and isinstance(value, str) and not value.startswith('RANDOM-PLACEHOLDER-'): | ||||
|             if not valid_notification_formats.get(value): | ||||
|                 raise ValueError(f'Invalid notification format: "{value}"') | ||||
|  | ||||
|         super().__setitem__(key, value) | ||||
|  | ||||
| class NotificationService: | ||||
|     """ | ||||
|     Standalone notification service that handles all notification functionality | ||||
| @@ -67,7 +86,7 @@ class NotificationService: | ||||
|         Queue a notification for a watch with full diff rendering and template variables | ||||
|         """ | ||||
|         from changedetectionio import diff | ||||
|         from changedetectionio.notification import default_notification_format_for_watch | ||||
|         from changedetectionio.notification import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
|  | ||||
|         if not isinstance(n_object, NotificationContextData): | ||||
|             raise TypeError(f"Expected NotificationContextData, got {type(n_object)}") | ||||
| @@ -89,27 +108,16 @@ class NotificationService: | ||||
|             snapshot_contents = "No snapshot/history available, the watch should fetch atleast once." | ||||
|  | ||||
|         # If we ended up here with "System default" | ||||
|         if n_object.get('notification_format') == default_notification_format_for_watch: | ||||
|         if n_object.get('notification_format') == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: | ||||
|             n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format') | ||||
|  | ||||
|         # HTML needs linebreak, but MarkDown and Text can use a linefeed | ||||
|         if n_object.get('notification_format') == 'HTML': | ||||
|             line_feed_sep = "<br>" | ||||
|             # Snapshot will be plaintext on the disk, convert to some kind of HTML | ||||
|             snapshot_contents = snapshot_contents.replace('\n', line_feed_sep) | ||||
|         elif n_object.get('notification_format') == 'HTML Color': | ||||
|             line_feed_sep = "<br>" | ||||
|             # Snapshot will be plaintext on the disk, convert to some kind of HTML | ||||
|             snapshot_contents = snapshot_contents.replace('\n', line_feed_sep) | ||||
|         else: | ||||
|             line_feed_sep = "\n" | ||||
|  | ||||
|         triggered_text = '' | ||||
|         if len(trigger_text): | ||||
|             from . import html_tools | ||||
|             triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text) | ||||
|             if triggered_text: | ||||
|                 triggered_text = line_feed_sep.join(triggered_text) | ||||
|                 triggered_text = CUSTOM_LINEBREAK_PLACEHOLDER.join(triggered_text) | ||||
|  | ||||
|         # Could be called as a 'test notification' with only 1 snapshot available | ||||
|         prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n" | ||||
| @@ -121,16 +129,17 @@ class NotificationService: | ||||
|  | ||||
|         n_object.update({ | ||||
|             'current_snapshot': snapshot_contents, | ||||
|             'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep), | ||||
|             'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep), | ||||
|             'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep), | ||||
|             'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True), | ||||
|             'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep), | ||||
|             'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER), | ||||
|             'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER), | ||||
|             'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER), | ||||
|             'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER, patch_format=True), | ||||
|             'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER), | ||||
|             'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None, | ||||
|             'triggered_text': triggered_text, | ||||
|             'uuid': watch.get('uuid') if watch else None, | ||||
|             'watch_url': watch.get('url') if watch else None, | ||||
|             'watch_uuid': watch.get('uuid') if watch else None, | ||||
|             'watch_mime_type': watch.get('content-type') | ||||
|         }) | ||||
|  | ||||
|         if watch: | ||||
| @@ -146,7 +155,7 @@ class NotificationService: | ||||
|         Individual watch settings > Tag settings > Global settings | ||||
|         """ | ||||
|         from changedetectionio.notification import ( | ||||
|             default_notification_format_for_watch, | ||||
|             USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH, | ||||
|             default_notification_body, | ||||
|             default_notification_title | ||||
|         ) | ||||
| @@ -154,7 +163,7 @@ class NotificationService: | ||||
|         # Would be better if this was some kind of Object where Watch can reference the parent datastore etc | ||||
|         v = watch.get(var_name) | ||||
|         if v and not watch.get('notification_muted'): | ||||
|             if var_name == 'notification_format' and v == default_notification_format_for_watch: | ||||
|             if var_name == 'notification_format' and v == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: | ||||
|                 return self.datastore.data['settings']['application'].get('notification_format') | ||||
|  | ||||
|             return v | ||||
| @@ -171,7 +180,7 @@ class NotificationService: | ||||
|  | ||||
|         # Otherwise could be defaults | ||||
|         if var_name == 'notification_format': | ||||
|             return default_notification_format_for_watch | ||||
|             return USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
|         if var_name == 'notification_body': | ||||
|             return default_notification_body | ||||
|         if var_name == 'notification_title': | ||||
| @@ -226,9 +235,8 @@ class NotificationService: | ||||
|         if not watch: | ||||
|             return | ||||
|  | ||||
|         n_format = self.datastore.data['settings']['application'].get('notification_format', default_notification_format) | ||||
|         filter_list = ", ".join(watch['include_filters']) | ||||
|         # @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_to_html' is not needed | ||||
|         # @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_links_to_html_links' is not needed | ||||
|         body = f"""Hello, | ||||
|  | ||||
| Your configured CSS/xPath filters of '{filter_list}' for {{{{watch_url}}}} did not appear on the page after {threshold} attempts. | ||||
| @@ -243,9 +251,9 @@ Thanks - Your omniscient changedetection.io installation. | ||||
|         n_object = NotificationContextData({ | ||||
|             'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page', | ||||
|             'notification_body': body, | ||||
|             'notification_format': n_format, | ||||
|             'markup_text_to_html': n_format.lower().startswith('html') | ||||
|             'notification_format': self._check_cascading_vars('notification_format', watch), | ||||
|         }) | ||||
|         n_object['markup_text_links_to_html_links'] = n_object.get('notification_format').startswith('html') | ||||
|  | ||||
|         if len(watch['notification_urls']): | ||||
|             n_object['notification_urls'] = watch['notification_urls'] | ||||
| @@ -273,9 +281,9 @@ Thanks - Your omniscient changedetection.io installation. | ||||
|         if not watch: | ||||
|             return | ||||
|         threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts') | ||||
|         n_format = self.datastore.data['settings']['application'].get('notification_format', default_notification_format).lower() | ||||
|  | ||||
|         step = step_n + 1 | ||||
|         # @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_to_html' is not needed | ||||
|         # @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_links_to_html_links' is not needed | ||||
|  | ||||
|         # {{{{ }}}} because this will be Jinja2 {{ }} tokens | ||||
|         body = f"""Hello, | ||||
| @@ -292,9 +300,9 @@ Thanks - Your omniscient changedetection.io installation. | ||||
|         n_object = NotificationContextData({ | ||||
|             'notification_title': f"Changedetection.io - Alert - Browser step at position {step} could not be run", | ||||
|             'notification_body': body, | ||||
|             'notification_format': n_format, | ||||
|             'markup_text_to_html': n_format.lower().startswith('html') | ||||
|             'notification_format': self._check_cascading_vars('notification_format', watch), | ||||
|         }) | ||||
|         n_object['markup_text_links_to_html_links'] = n_object.get('notification_format').startswith('html') | ||||
|  | ||||
|         if len(watch['notification_urls']): | ||||
|             n_object['notification_urls'] = watch['notification_urls'] | ||||
|   | ||||
| @@ -1,11 +1,12 @@ | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from changedetectionio.html_tools import is_safe_url | ||||
|  | ||||
| from flask import ( | ||||
|     flash | ||||
| ) | ||||
|  | ||||
| from .html_tools import TRANSLATE_WHITESPACE_TABLE | ||||
| from . model import App, Watch | ||||
| from .model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
| from copy import deepcopy, copy | ||||
| from os import path, unlink | ||||
| from threading import Lock | ||||
| @@ -340,7 +341,6 @@ class ChangeDetectionStore: | ||||
|                 logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}") | ||||
|                 flash("Error fetching metadata for {}".format(url), 'error') | ||||
|                 return False | ||||
|         from .model.Watch import is_safe_url | ||||
|         if not is_safe_url(url): | ||||
|             flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error') | ||||
|             return None | ||||
| @@ -987,10 +987,35 @@ class ChangeDetectionStore: | ||||
|             self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title') | ||||
|  | ||||
|     def update_21(self): | ||||
|         self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone') | ||||
|         del self.data['settings']['application']['timezone'] | ||||
|         if self.data['settings']['application'].get('timezone'): | ||||
|             self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone') | ||||
|             del self.data['settings']['application']['timezone'] | ||||
|  | ||||
|  | ||||
|     # Some notification formats got the wrong name type | ||||
|     def update_22(self): | ||||
|         from .notification import valid_notification_formats | ||||
|  | ||||
|         sys_n_format = self.data['settings']['application'].get('notification_format') | ||||
|         key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == sys_n_format), None) | ||||
|         if key_exists_as_value: # key of "Plain text" | ||||
|             logger.success(f"['settings']['application']['notification_format'] '{sys_n_format}' -> '{key_exists_as_value}'") | ||||
|             self.data['settings']['application']['notification_format'] = key_exists_as_value | ||||
|  | ||||
|         for uuid, watch in self.data['watching'].items(): | ||||
|             n_format = self.data['watching'][uuid].get('notification_format') | ||||
|             key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == n_format), None) | ||||
|             if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:  # key of "Plain text" | ||||
|                 logger.success(f"['watching'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'") | ||||
|                 self.data['watching'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever | ||||
|  | ||||
|         for uuid, tag in self.data['settings']['application']['tags'].items(): | ||||
|             n_format = self.data['settings']['application']['tags'][uuid].get('notification_format') | ||||
|             key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == n_format), None) | ||||
|             if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:  # key of "Plain text" | ||||
|                 logger.success(f"['settings']['application']['tags'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'") | ||||
|                 self.data['settings']['application']['tags'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever | ||||
|  | ||||
|     def add_notification_url(self, notification_url): | ||||
|          | ||||
|         logger.debug(f">>> Adding new notification_url - '{notification_url}'") | ||||
|   | ||||
| @@ -134,6 +134,12 @@ | ||||
|                                     <p> | ||||
|                                         URL encoding, use <strong>|urlencode</strong>, for example - <code>gets://hook-website.com/test.php?title={{ '{{ watch_title|urlencode }}' }}</code> | ||||
|                                     </p> | ||||
|                                     <p> | ||||
|                                         Regular-expression replace, use <strong>|regex_replace</strong>, for example -   <code>{{ "{{ \"hello world 123\" | regex_replace('[0-9]+', 'no-more-numbers') }}" }}</code> | ||||
|                                     </p> | ||||
|                                     <p> | ||||
|                                         For a complete reference of all Jinja2 built-in filters, users can refer to the <a href="https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters">https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters</a> | ||||
|                                     </p> | ||||
|                                 </div> | ||||
|                             </div> | ||||
|                             <div class="pure-control-group"> | ||||
|   | ||||
| @@ -266,9 +266,7 @@ | ||||
|             <li id="timezone-info"> | ||||
|                 {{ render_field(form.time_schedule_limit.timezone, placeholder=timezone_default_config) }} <span id="local-time-in-tz"></span> | ||||
|                 <datalist id="timezones" style="display: none;"> | ||||
|                     {% for timezone in available_timezones %} | ||||
|                         <option value="{{ timezone }}">{{ timezone }}</option> | ||||
|                     {% endfor %} | ||||
|                     {%- for timezone in available_timezones -%}<option value="{{ timezone }}">{{ timezone }}</option>{%- endfor -%} | ||||
|                 </datalist> | ||||
|             </li> | ||||
|         </ul> | ||||
|   | ||||
| @@ -53,7 +53,7 @@ | ||||
|           <a class="pure-menu-heading" href="{{url_for('watchlist.index')}}"> | ||||
|             <strong>Change</strong>Detection.io</a> | ||||
|         {% endif %} | ||||
|         {% if current_diff_url %} | ||||
|         {% if current_diff_url and is_safe_url(current_diff_url) %} | ||||
|           <a class="current-diff-url" href="{{ current_diff_url }}"> | ||||
|             <span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a> | ||||
|         {% else %} | ||||
|   | ||||
| @@ -1,51 +1,110 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import asyncio | ||||
| import threading | ||||
| import time | ||||
| from aiosmtpd.controller import Controller | ||||
| from aiosmtpd.smtp import SMTP | ||||
| from flask import Flask, Response | ||||
| from email import message_from_bytes | ||||
| from email.policy import default | ||||
|  | ||||
| # Accept a SMTP message and offer a way to retrieve the last message via TCP Socket | ||||
| # Accept a SMTP message and offer a way to retrieve the last message via HTTP | ||||
|  | ||||
| last_received_message = b"Nothing" | ||||
| last_received_message = b"Nothing received yet." | ||||
| active_smtp_connections = 0 | ||||
| smtp_lock = threading.Lock() | ||||
|  | ||||
|  | ||||
| class CustomSMTPHandler: | ||||
|     async def handle_DATA(self, server, session, envelope): | ||||
|         global last_received_message | ||||
|         last_received_message = envelope.content | ||||
|         print('Receiving message from:', session.peer) | ||||
|         print('Message addressed from:', envelope.mail_from) | ||||
|         print('Message addressed to  :', envelope.rcpt_tos) | ||||
|         print('Message length        :', len(envelope.content)) | ||||
|         print(envelope.content.decode('utf8')) | ||||
|         return '250 Message accepted for delivery' | ||||
|         global last_received_message, active_smtp_connections | ||||
|  | ||||
|         with smtp_lock: | ||||
|             active_smtp_connections += 1 | ||||
|  | ||||
|         try: | ||||
|             last_received_message = envelope.content | ||||
|             print('Receiving message from:', session.peer) | ||||
|             print('Message addressed from:', envelope.mail_from) | ||||
|             print('Message addressed to  :', envelope.rcpt_tos) | ||||
|             print('Message length        :', len(envelope.content)) | ||||
|             print('*******************************') | ||||
|             print(envelope.content.decode('utf8')) | ||||
|             print('*******************************') | ||||
|  | ||||
|             # Parse the email message | ||||
|             msg = message_from_bytes(envelope.content, policy=default) | ||||
|             with open('/tmp/last.eml', 'wb') as f: | ||||
|                 f.write(envelope.content) | ||||
|  | ||||
|             # Write parts to files based on content type | ||||
|             if msg.is_multipart(): | ||||
|                 for part in msg.walk(): | ||||
|                     content_type = part.get_content_type() | ||||
|                     payload = part.get_payload(decode=True) | ||||
|  | ||||
|                     if payload: | ||||
|                         if content_type == 'text/plain': | ||||
|                             with open('/tmp/last.txt', 'wb') as f: | ||||
|                                 f.write(payload) | ||||
|                             print(f'Written text/plain part to /tmp/last.txt') | ||||
|                         elif content_type == 'text/html': | ||||
|                             with open('/tmp/last.html', 'wb') as f: | ||||
|                                 f.write(payload) | ||||
|                             print(f'Written text/html part to /tmp/last.html') | ||||
|             else: | ||||
|                 # Single part message | ||||
|                 content_type = msg.get_content_type() | ||||
|                 payload = msg.get_payload(decode=True) | ||||
|  | ||||
|                 if payload: | ||||
|                     if content_type == 'text/plain' or content_type.startswith('text/'): | ||||
|                         with open('/tmp/last.txt', 'wb') as f: | ||||
|                             f.write(payload) | ||||
|                         print(f'Written single part message to /tmp/last.txt') | ||||
|  | ||||
|             return '250 Message accepted for delivery' | ||||
|         finally: | ||||
|             with smtp_lock: | ||||
|                 active_smtp_connections -= 1 | ||||
|  | ||||
|  | ||||
| class EchoServerProtocol(asyncio.Protocol): | ||||
|     def connection_made(self, transport): | ||||
|         global last_received_message | ||||
|         self.transport = transport | ||||
|         peername = transport.get_extra_info('peername') | ||||
|         print('Incoming connection from {}'.format(peername)) | ||||
|         self.transport.write(last_received_message) | ||||
|  | ||||
|         last_received_message = b'' | ||||
|         self.transport.close() | ||||
| # Simple Flask HTTP server to echo back the last SMTP message | ||||
| app = Flask(__name__) | ||||
|  | ||||
|  | ||||
| async def main(): | ||||
| @app.route('/') | ||||
| def echo_last_message(): | ||||
|     global last_received_message, active_smtp_connections | ||||
|  | ||||
|     # Wait for any in-progress SMTP connections to complete | ||||
|     max_wait = 5  # Maximum 5 seconds | ||||
|     wait_interval = 0.05  # Check every 50ms | ||||
|     elapsed = 0 | ||||
|  | ||||
|     while elapsed < max_wait: | ||||
|         with smtp_lock: | ||||
|             if active_smtp_connections == 0: | ||||
|                 break | ||||
|         time.sleep(wait_interval) | ||||
|         elapsed += wait_interval | ||||
|  | ||||
|     return Response(last_received_message, mimetype='text/plain') | ||||
|  | ||||
|  | ||||
| def run_flask(): | ||||
|     app.run(host='0.0.0.0', port=11080, debug=False, use_reloader=False) | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     # Start the SMTP server | ||||
|     controller = Controller(CustomSMTPHandler(), hostname='0.0.0.0', port=11025) | ||||
|     controller.start() | ||||
|  | ||||
|     # Start the TCP Echo server | ||||
|     loop = asyncio.get_running_loop() | ||||
|     server = await loop.create_server( | ||||
|         lambda: EchoServerProtocol(), | ||||
|         '0.0.0.0', 11080 | ||||
|     ) | ||||
|     async with server: | ||||
|         await server.serve_forever() | ||||
|     # Start the HTTP server in a separate thread | ||||
|     flask_thread = threading.Thread(target=run_flask, daemon=True) | ||||
|     flask_thread.start() | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     asyncio.run(main()) | ||||
|     # Keep the main thread alive | ||||
|     try: | ||||
|         flask_thread.join() | ||||
|     except KeyboardInterrupt: | ||||
|         print("Shutting down...") | ||||
|   | ||||
| @@ -3,7 +3,8 @@ from flask import url_for | ||||
| from email import message_from_string | ||||
| from email.policy import default as email_policy | ||||
|  | ||||
| from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE | ||||
| from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE, HTML_CHANGED_STYLE | ||||
| from changedetectionio.notification_service import NotificationContextData | ||||
| from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \ | ||||
|     wait_for_all_checks, \ | ||||
|     set_longer_modified_response, delete_all_watches | ||||
| @@ -14,6 +15,8 @@ import logging | ||||
| # NOTE - RELIES ON mailserver as hostname running, see github build recipes | ||||
| smtp_test_server = 'mailserver' | ||||
|  | ||||
| ALL_MARKUP_TOKENS = ''.join(f"TOKEN: '{t}'\n{{{{{t}}}}}\n" for t in NotificationContextData().keys()) | ||||
|  | ||||
| from changedetectionio.notification import ( | ||||
|     default_notification_body, | ||||
|     default_notification_format, | ||||
| @@ -24,16 +27,14 @@ from changedetectionio.notification import ( | ||||
|  | ||||
|  | ||||
| def get_last_message_from_smtp_server(): | ||||
|     import socket | ||||
|     port = 11080  # socket server port number | ||||
|  | ||||
|     client_socket = socket.socket()  # instantiate | ||||
|     client_socket.connect((smtp_test_server, port))  # connect to the server | ||||
|  | ||||
|     data = client_socket.recv(50024).decode()  # receive response | ||||
|     import requests | ||||
|     time.sleep(1) # wait for any smtp connects to die off | ||||
|     port = 11080  # HTTP server port number | ||||
|     # Make HTTP GET request to Flask server | ||||
|     response = requests.get(f'http://{smtp_test_server}:{port}/') | ||||
|     data = response.text | ||||
|     logging.info("get_last_message_from_smtp_server..") | ||||
|     logging.info(data) | ||||
|     client_socket.close()  # close the connection | ||||
|     return data | ||||
|  | ||||
|  | ||||
| @@ -52,7 +53,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": "some text\nfallback-body<br> " + default_notification_body, | ||||
|               "application-notification_format": 'HTML', | ||||
|               "application-notification_format": 'html', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
| @@ -121,7 +122,7 @@ def test_check_notification_plaintext_format(client, live_server, measure_memory | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": "some text\n" + default_notification_body, | ||||
|               "application-notification_format": 'Plain Text', | ||||
|               "application-notification_format": 'text', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
| @@ -172,8 +173,8 @@ def test_check_notification_html_color_format(client, live_server, measure_memor | ||||
|         url_for("settings.settings_page"), | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": "some text\n" + default_notification_body, | ||||
|               "application-notification_format": 'HTML Color', | ||||
|               "application-notification_body": f"some text\n{default_notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", | ||||
|               "application-notification_format": 'htmlcolor', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
| @@ -225,8 +226,9 @@ def test_check_notification_html_color_format(client, live_server, measure_memor | ||||
|     html_part = parts[1] | ||||
|     assert html_part.get_content_type() == 'text/html' | ||||
|     html_content = html_part.get_content() | ||||
|     assert HTML_REMOVED_STYLE in html_content | ||||
|     assert HTML_CHANGED_STYLE or HTML_REMOVED_STYLE in html_content | ||||
|     assert HTML_ADDED_STYLE in html_content | ||||
|     assert '<' not in html_content | ||||
|  | ||||
|     assert 'some text<br>' in html_content | ||||
|     delete_all_watches(client) | ||||
| @@ -243,7 +245,7 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_ | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": "*header*\n\nsome text\n" + default_notification_body, | ||||
|               "application-notification_format": 'Markdown to HTML', | ||||
|               "application-notification_format": 'markdown', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
| @@ -288,7 +290,8 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_ | ||||
|     text_part = parts[0] | ||||
|     assert text_part.get_content_type() == 'text/plain' | ||||
|     text_content = text_part.get_content() | ||||
|     assert '(added) So let\'s see what happens.\r\n' in text_content  # The plaintext part | ||||
|     # We wont see anything in the "FALLBACK" text but that's OK (no added/strikethrough etc) | ||||
|     assert 'So let\'s see what happens.\r\n' in text_content  # The plaintext part | ||||
|  | ||||
|  | ||||
|     # Second part should be text/html and roughly converted from markdown to HTML | ||||
| @@ -296,10 +299,10 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_ | ||||
|     assert html_part.get_content_type() == 'text/html' | ||||
|     html_content = html_part.get_content() | ||||
|     assert '<p><em>header</em></p>' in html_content | ||||
|     assert '(added) So let\'s see what happens.<br' in html_content | ||||
|     assert '<strong>So let\'s see what happens.</strong><br>' in html_content # Additions are <strong> in markdown | ||||
|     delete_all_watches(client) | ||||
|  | ||||
|  | ||||
| # Custom notification body with HTML, that is either sent as HTML or rendered to plaintext and sent | ||||
| def test_check_notification_email_formats_default_Text_override_HTML(client, live_server, measure_memory_usage): | ||||
|  | ||||
|     # HTML problems? see this | ||||
| @@ -326,7 +329,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": notification_body, | ||||
|               "application-notification_format": 'Plain Text', | ||||
|               "application-notification_format": 'text', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
| @@ -334,7 +337,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv | ||||
|     assert b"Settings updated." in res.data | ||||
|  | ||||
|     # Add a watch and trigger a HTTP POST | ||||
|     test_url = url_for('test_endpoint', _external=True) | ||||
|     test_url = url_for('test_endpoint',content_type="text/html", _external=True) | ||||
|     res = client.post( | ||||
|         url_for("ui.ui_views.form_quick_watch_add"), | ||||
|         data={"url": test_url, "tags": 'nice one'}, | ||||
| @@ -343,6 +346,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv | ||||
|  | ||||
|     assert b"Watch added" in res.data | ||||
|  | ||||
|     #################################### FIRST SITUATION, PLAIN TEXT NOTIFICATION IS WANTED BUT WE HAVE HTML IN OUR TEMPLATE AND CONTENT ########## | ||||
|     wait_for_all_checks(client) | ||||
|     set_longer_modified_response() | ||||
|     time.sleep(2) | ||||
| @@ -365,14 +369,17 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv | ||||
|     # Get the plain text content | ||||
|     text_content = msg.get_content() | ||||
|     assert '(added) So let\'s see what happens.\r\n' in text_content  # The plaintext part | ||||
|     assert '<!DOCTYPE html>' in text_content # even tho they added html, they selected plaintext so it should have not got converted | ||||
|  | ||||
|  | ||||
|     #################################### SECOND SITUATION, HTML IS CORRECTLY PASSED THROUGH TO THE EMAIL #################### | ||||
|     set_original_response() | ||||
|     # Now override as HTML format | ||||
|     res = client.post( | ||||
|         url_for("ui.ui_edit.edit_page", uuid="first"), | ||||
|         data={ | ||||
|             "url": test_url, | ||||
|             "notification_format": 'HTML', | ||||
|             "notification_format": 'html', | ||||
|             'fetch_backend': "html_requests", | ||||
|             "time_between_check_use_default": "y"}, | ||||
|         follow_redirects=True | ||||
| @@ -405,10 +412,271 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv | ||||
|     html_part = parts[1] | ||||
|     assert html_part.get_content_type() == 'text/html' | ||||
|     html_content = html_part.get_content() | ||||
|     assert '(removed) So let\'s see what happens.<br>' in html_content  # the html part | ||||
|     assert '(removed) So let\'s see what happens.' in html_content  # the html part | ||||
|     assert '<!DOCTYPE html' not in html_content | ||||
|     assert '<!DOCTYPE html' in html_content # Our original template is working correctly | ||||
|  | ||||
|     # https://github.com/dgtlmoon/changedetection.io/issues/2103 | ||||
|     assert '<h1>Test</h1>' in html_content | ||||
|     assert '<' not in html_content | ||||
|  | ||||
|     delete_all_watches(client) | ||||
|  | ||||
| def test_check_plaintext_document_plaintext_notification_smtp(client, live_server, measure_memory_usage): | ||||
|     """When following a plaintext document, notification in Plain Text format is sent correctly""" | ||||
|  | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("Some nice plain text\nwhich we add some extra data\nover here\n") | ||||
|  | ||||
|     notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com' | ||||
|     notification_body = f"""{default_notification_body}""" | ||||
|  | ||||
|     ##################### | ||||
|     # Set this up for when we remove the notification from the watch, it should fallback with these details | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", | ||||
|               "application-notification_format": 'text', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     assert b"Settings updated." in res.data | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     test_url = url_for('test_endpoint', content_type="text/plain", _external=True) | ||||
|     uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     # Change the content | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("Some nice plain text\nwhich we add some extra data\nAnd let's talk about <title> tags\nover here\n") | ||||
|  | ||||
|  | ||||
|     time.sleep(1) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     # Parse the email properly using Python's email library | ||||
|     msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy) | ||||
|  | ||||
|     assert not msg.is_multipart() | ||||
|     assert msg.get_content_type() == 'text/plain' | ||||
|     body = msg.get_content() | ||||
|     # nothing is escaped, raw html stuff in text/plain | ||||
|     assert 'talk about <title> tags' in body | ||||
|     assert '(added)' in body | ||||
|     assert '<br' not in body | ||||
|     assert '<' not in body | ||||
|     assert '<pre' not in body | ||||
|     delete_all_watches(client) | ||||
|  | ||||
| def test_check_plaintext_document_html_notifications(client, live_server, measure_memory_usage): | ||||
|     """When following a plaintext document, notification in Plain Text format is sent correctly""" | ||||
|  | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("    Some nice plain text\nwhich we add some extra data\nover here\n") | ||||
|  | ||||
|     notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com' | ||||
|     notification_body = f"""{default_notification_body}""" | ||||
|  | ||||
|     ##################### | ||||
|     # Set this up for when we remove the notification from the watch, it should fallback with these details | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", | ||||
|               "application-notification_format": 'html', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     assert b"Settings updated." in res.data | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     test_url = url_for('test_endpoint', content_type="text/plain", _external=True) | ||||
|     uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     # Change the content | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("    Some nice plain text\nwhich we add some extra data\nAnd let's talk about <title> tags\nover here\n") | ||||
|  | ||||
|  | ||||
|     time.sleep(2) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     # Parse the email properly using Python's email library | ||||
|     msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy) | ||||
|  | ||||
|  | ||||
|     # The email should have two bodies (multipart/alternative) | ||||
|     assert msg.is_multipart() | ||||
|     assert msg.get_content_type() == 'multipart/alternative' | ||||
|  | ||||
|     # Get the parts | ||||
|     parts = list(msg.iter_parts()) | ||||
|     assert len(parts) == 2 | ||||
|  | ||||
|     # First part should be text/plain | ||||
|     text_part = parts[0] | ||||
|     assert text_part.get_content_type() == 'text/plain' | ||||
|     text_content = text_part.get_content() | ||||
|     html_part = parts[1] | ||||
|     assert html_part.get_content_type() == 'text/html' | ||||
|     html_content = html_part.get_content() | ||||
|  | ||||
|  | ||||
|     assert 'And let\'s talk about <title> tags\r\n' in text_content | ||||
|     assert '<br' not in text_content | ||||
|     assert '<span' not in text_content | ||||
|  | ||||
|  | ||||
|     assert 'talk about <title>' not in html_content  # the html part, should have got marked up to < etc | ||||
|     assert 'talk about <title>' in html_content | ||||
|     # Should be the HTML, but not HTML Color | ||||
|     assert 'background-color' not in html_content | ||||
|     assert '<br>(added) And let's talk about <title> tags<br>' in html_content | ||||
|     assert '<br' not in html_content | ||||
|     assert '<pre role="article"' in html_content # Should have got wrapped nicely in email_helpers.py | ||||
|  | ||||
|     # And now for the whitespace retention | ||||
|     assert '    Some nice plain text' in html_content | ||||
|     assert '(added) And let' in html_content # just to show a single whitespace didnt get touched | ||||
|     delete_all_watches(client) | ||||
|  | ||||
|  | ||||
| def test_check_plaintext_document_html_color_notifications(client, live_server, measure_memory_usage): | ||||
|     """When following a plaintext document, notification in Plain Text format is sent correctly""" | ||||
|  | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("Some nice plain text\nwhich we add some extra data\nover here\n") | ||||
|  | ||||
|     notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com' | ||||
|     notification_body = f"""{default_notification_body}""" | ||||
|  | ||||
|     ##################### | ||||
|     # Set this up for when we remove the notification from the watch, it should fallback with these details | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", | ||||
|               "application-notification_format": 'htmlcolor', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b"Settings updated." in res.data | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     test_url = url_for('test_endpoint', content_type="text/plain", _external=True) | ||||
|     uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     # Change the content | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("Some nice plain text\nwhich we add some extra data\nAnd let's talk about <title> tags\nover here\n") | ||||
|  | ||||
|     time.sleep(1) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     # Parse the email properly using Python's email library | ||||
|     msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy) | ||||
|  | ||||
|     # The email should have two bodies (multipart/alternative) | ||||
|     assert msg.is_multipart() | ||||
|     assert msg.get_content_type() == 'multipart/alternative' | ||||
|  | ||||
|     # Get the parts | ||||
|     parts = list(msg.iter_parts()) | ||||
|     assert len(parts) == 2 | ||||
|  | ||||
|     # First part should be text/plain | ||||
|     text_part = parts[0] | ||||
|     assert text_part.get_content_type() == 'text/plain' | ||||
|     text_content = text_part.get_content() | ||||
|     html_part = parts[1] | ||||
|     assert html_part.get_content_type() == 'text/html' | ||||
|     html_content = html_part.get_content() | ||||
|  | ||||
|  | ||||
|     assert 'And let\'s talk about <title> tags\r\n' in text_content | ||||
|     assert '<br' not in text_content | ||||
|     assert '<span' not in text_content | ||||
|  | ||||
|     assert 'talk about <title>' not in html_content  # the html part, should have got marked up to < etc | ||||
|     assert 'talk about <title>' in html_content | ||||
|     # Should be the HTML, but not HTML Color | ||||
|     assert 'background-color' in html_content | ||||
|     assert '(added) And let' not in html_content | ||||
|     assert '<br' not in html_content | ||||
|     assert '<br>' in html_content | ||||
|     assert '<pre role="article"' in html_content # Should have got wrapped nicely in email_helpers.py | ||||
|     delete_all_watches(client) | ||||
|  | ||||
| def test_check_html_document_plaintext_notification(client, live_server, measure_memory_usage): | ||||
|     """When following a HTML document, notification in Plain Text format is sent correctly""" | ||||
|  | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("<html><body>some stuff<br>and more stuff<br>and even more stuff<br></body></html>") | ||||
|  | ||||
|     notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com' | ||||
|     notification_body = f"""{default_notification_body}""" | ||||
|  | ||||
|     ##################### | ||||
|     # Set this up for when we remove the notification from the watch, it should fallback with these details | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={"application-notification_urls": notification_url, | ||||
|               "application-notification_title": "fallback-title " + default_notification_title, | ||||
|               "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", | ||||
|               "application-notification_format": 'text', | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b"Settings updated." in res.data | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     test_url = url_for('test_endpoint', content_type="text/html", _external=True) | ||||
|     uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("<html><body>sxome stuff<br>and more stuff<br>lets slip this in<br>and this in<br>and even more stuff<br><tag></body></html>") | ||||
|  | ||||
|     time.sleep(0.1) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|  | ||||
|     # Parse the email properly using Python's email library | ||||
|     msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy) | ||||
|  | ||||
|     assert not msg.is_multipart() | ||||
|     assert msg.get_content_type() == 'text/plain' | ||||
|     body = msg.get_content() | ||||
|  | ||||
|     assert '<tag>' in body # Should have got converted from original HTML to plaintext | ||||
|     assert '(changed) some stuff\r\n' in body | ||||
|     assert '(into) sxome stuff\r\n' in body | ||||
|     assert '(added) lets slip this in\r\n' in body | ||||
|     assert '(added) and this in\r\n' in body | ||||
|     assert ' ' not in body | ||||
|  | ||||
|  | ||||
|     delete_all_watches(client) | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -124,7 +124,7 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa | ||||
|               "application-notification_body": 'triggered text was -{{triggered_text}}- ### 网站监测 内容更新了 ####', | ||||
|               # https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation | ||||
|               "application-notification_urls": test_notification_url, | ||||
|               "application-notification_format": 'Plain Text', | ||||
|               "application-notification_format": 'text', | ||||
|               "application-minutes_between_check": 180, | ||||
|               "application-fetch_backend": "html_requests" | ||||
|               }, | ||||
|   | ||||
| @@ -370,7 +370,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage): | ||||
|  | ||||
|     ###################################################### | ||||
|  | ||||
|     # HTTP PUT try a field that doenst exist | ||||
|     # HTTP PUT try a field that doesn't exist | ||||
|  | ||||
|     # HTTP PUT an update | ||||
|     res = client.put( | ||||
| @@ -383,6 +383,17 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage): | ||||
|     # Message will come from `flask_expects_json` | ||||
|     assert b'Additional properties are not allowed' in res.data | ||||
|  | ||||
|  | ||||
|     # Try a XSS URL | ||||
|     res = client.put( | ||||
|         url_for("watch", uuid=watch_uuid), | ||||
|         headers={'x-api-key': api_key, 'content-type': 'application/json'}, | ||||
|         data=json.dumps({ | ||||
|             'url': 'javascript:alert(document.domain)' | ||||
|         }), | ||||
|     ) | ||||
|     assert res.status_code == 400 | ||||
|  | ||||
|     # Cleanup everything | ||||
|     delete_all_watches(client) | ||||
|  | ||||
| @@ -394,7 +405,8 @@ def test_api_import(client, live_server, measure_memory_usage): | ||||
|     res = client.post( | ||||
|         url_for("import") + "?tag=import-test", | ||||
|         data='https://website1.com\r\nhttps://website2.com', | ||||
|         headers={'x-api-key': api_key, 'content-type': 'text/plain'}, | ||||
|         # We removed  'content-type': 'text/plain', the Import API should assume this if none is set #3547 #3542 | ||||
|         headers={'x-api-key': api_key}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|   | ||||
| @@ -86,7 +86,7 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se | ||||
|                                                    "Diff Full: {{diff_full}}\n" | ||||
|                                                    "Diff as Patch: {{diff_patch}}\n" | ||||
|                                                    ":-)", | ||||
|                               "notification_format": 'Plain Text'} | ||||
|                               "notification_format": 'text'} | ||||
|  | ||||
|     notification_form_data.update({ | ||||
|         "url": test_url, | ||||
|   | ||||
| @@ -63,7 +63,7 @@ def run_filter_test(client, live_server, content_filter, app_notification_format | ||||
|                                        "Diff Full: {{diff_full}}\n" | ||||
|                                        "Diff as Patch: {{diff_patch}}\n" | ||||
|                                        ":-)", | ||||
|                   "notification_format": 'Plain Text', | ||||
|                   "notification_format": 'text', | ||||
|                   "fetch_backend": "html_requests", | ||||
|                   "filter_failure_notification_send": 'y', | ||||
|                   "time_between_check_use_default": "y", | ||||
| @@ -175,13 +175,13 @@ def run_filter_test(client, live_server, content_filter, app_notification_format | ||||
|  | ||||
| def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage): | ||||
|     #   #  live_server_setup(live_server) # Setup on conftest per function | ||||
|     run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('HTML Color')) | ||||
|     run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('htmlcolor')) | ||||
|     # Check markup send conversion didnt affect plaintext preference | ||||
|     run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('Plain Text')) | ||||
|     run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('text')) | ||||
|  | ||||
| def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage): | ||||
|     #   #  live_server_setup(live_server) # Setup on conftest per function | ||||
|     run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('HTML Color')) | ||||
|     run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('htmlcolor')) | ||||
|  | ||||
| # Test that notification is never sent | ||||
|  | ||||
|   | ||||
| @@ -195,7 +195,7 @@ def test_group_tag_notification(client, live_server, measure_memory_usage): | ||||
|                                                    "Diff as Patch: {{diff_patch}}\n" | ||||
|                                                    ":-)", | ||||
|                               "notification_screenshot": True, | ||||
|                               "notification_format": 'Plain Text', | ||||
|                               "notification_format": 'text', | ||||
|                               "title": "test-tag"} | ||||
|  | ||||
|     res = client.post( | ||||
|   | ||||
| @@ -169,4 +169,161 @@ def test_default_timezone_subtraction(environment): | ||||
|  | ||||
|     finalRender = render("{% now '' - 'minutes=11' %}") | ||||
|  | ||||
|     assert finalRender == "Wed, 09 Dec 2015 23:22:01" | ||||
|     assert finalRender == "Wed, 09 Dec 2015 23:22:01" | ||||
|  | ||||
| def test_regex_replace_basic(): | ||||
|     """Test basic regex_replace functionality.""" | ||||
|  | ||||
|     # Simple word replacement | ||||
|     finalRender = render("{{ 'hello world' | regex_replace('world', 'universe') }}") | ||||
|     assert finalRender == "hello universe" | ||||
|  | ||||
| def test_regex_replace_with_groups(): | ||||
|     """Test regex_replace with capture groups (issue #3501 use case).""" | ||||
|  | ||||
|     # Transform HTML table data as described in the issue | ||||
|     template = "{{ '<td>thing</td><td>other</td>' | regex_replace('<td>([^<]+)</td><td>([^<]+)</td>', 'ThingLabel: \\\\1\\nOtherLabel: \\\\2') }}" | ||||
|     finalRender = render(template) | ||||
|     assert "ThingLabel: thing" in finalRender | ||||
|     assert "OtherLabel: other" in finalRender | ||||
|  | ||||
| def test_regex_replace_multiple_matches(): | ||||
|     """Test regex_replace replacing multiple occurrences.""" | ||||
|  | ||||
|     finalRender = render("{{ 'foo bar foo baz' | regex_replace('foo', 'qux') }}") | ||||
|     assert finalRender == "qux bar qux baz" | ||||
|  | ||||
| def test_regex_replace_count_parameter(): | ||||
|     """Test regex_replace with count parameter to limit replacements.""" | ||||
|  | ||||
|     finalRender = render("{{ 'foo bar foo baz' | regex_replace('foo', 'qux', 1) }}") | ||||
|     assert finalRender == "qux bar foo baz" | ||||
|  | ||||
| def test_regex_replace_empty_replacement(): | ||||
|     """Test regex_replace with empty replacement (removal).""" | ||||
|  | ||||
|     finalRender = render("{{ 'hello world 123' | regex_replace('[0-9]+', '') }}") | ||||
|     assert finalRender == "hello world " | ||||
|  | ||||
| def test_regex_replace_no_match(): | ||||
|     """Test regex_replace when pattern doesn't match.""" | ||||
|  | ||||
|     finalRender = render("{{ 'hello world' | regex_replace('xyz', 'abc') }}") | ||||
|     assert finalRender == "hello world" | ||||
|  | ||||
| def test_regex_replace_invalid_regex(): | ||||
|     """Test regex_replace with invalid regex pattern returns original value.""" | ||||
|  | ||||
|     # Invalid regex (unmatched parenthesis) | ||||
|     finalRender = render("{{ 'hello world' | regex_replace('(invalid', 'replacement') }}") | ||||
|     assert finalRender == "hello world" | ||||
|  | ||||
| def test_regex_replace_special_characters(): | ||||
|     """Test regex_replace with special regex characters.""" | ||||
|  | ||||
|     finalRender = render("{{ 'Price: $50.00' | regex_replace('\\\\$([0-9.]+)', 'USD \\\\1') }}") | ||||
|     assert finalRender == "Price: USD 50.00" | ||||
|  | ||||
| def test_regex_replace_multiline(): | ||||
|     """Test regex_replace on multiline text.""" | ||||
|  | ||||
|     template = "{{ 'line1\\nline2\\nline3' | regex_replace('^line', 'row') }}" | ||||
|     finalRender = render(template) | ||||
|     # By default re.sub doesn't use MULTILINE flag, so only first line matches with ^ | ||||
|     assert finalRender == "row1\nline2\nline3" | ||||
|  | ||||
| def test_regex_replace_with_notification_context(): | ||||
|     """Test regex_replace with notification diff variable.""" | ||||
|  | ||||
|     # Simulate how it would be used in notifications with diff variable | ||||
|     from changedetectionio.notification_service import NotificationContextData | ||||
|  | ||||
|     context = NotificationContextData() | ||||
|     context['diff'] = '<td>value1</td><td>value2</td>' | ||||
|  | ||||
|     template = "{{ diff | regex_replace('<td>([^<]+)</td>', '\\\\1 ') }}" | ||||
|  | ||||
|     from changedetectionio.jinja2_custom import create_jinja_env | ||||
|     from jinja2 import BaseLoader | ||||
|  | ||||
|     jinja2_env = create_jinja_env(loader=BaseLoader) | ||||
|     jinja2_env.globals.update(context) | ||||
|     finalRender = jinja2_env.from_string(template).render() | ||||
|  | ||||
|     assert "value1 value2 " in finalRender | ||||
|  | ||||
| def test_regex_replace_security_large_input(): | ||||
|     """Test regex_replace handles large input safely.""" | ||||
|  | ||||
|     # Create a large input string (over 10MB) | ||||
|     large_input = "x" * (1024 * 1024 * 10 + 1000) | ||||
|     template = "{{ large_input | regex_replace('x', 'y') }}" | ||||
|  | ||||
|     from changedetectionio.jinja2_custom import create_jinja_env | ||||
|     from jinja2 import BaseLoader | ||||
|  | ||||
|     jinja2_env = create_jinja_env(loader=BaseLoader) | ||||
|     jinja2_env.globals['large_input'] = large_input | ||||
|     finalRender = jinja2_env.from_string(template).render() | ||||
|  | ||||
|     # Should be truncated to 10MB | ||||
|     assert len(finalRender) == 1024 * 1024 * 10 | ||||
|  | ||||
| def test_regex_replace_security_long_pattern(): | ||||
|     """Test regex_replace rejects very long patterns.""" | ||||
|  | ||||
|     # Pattern longer than 500 chars should be rejected | ||||
|     long_pattern = "a" * 501 | ||||
|     finalRender = render("{{ 'test' | regex_replace('" + long_pattern + "', 'replacement') }}") | ||||
|  | ||||
|     # Should return original value when pattern is too long | ||||
|     assert finalRender == "test" | ||||
|  | ||||
| def test_regex_replace_security_dangerous_pattern(): | ||||
|     """Test regex_replace detects and rejects dangerous nested quantifiers.""" | ||||
|  | ||||
|     # Patterns that could cause catastrophic backtracking | ||||
|     dangerous_patterns = [ | ||||
|         "(a+)+", | ||||
|         "(a*)+", | ||||
|         "(a+)*", | ||||
|         "(a*)*", | ||||
|     ] | ||||
|  | ||||
|     for dangerous in dangerous_patterns: | ||||
|         # Create a template with the dangerous pattern | ||||
|         # Using single quotes to avoid escaping issues | ||||
|         from changedetectionio.jinja2_custom import create_jinja_env | ||||
|         from jinja2 import BaseLoader | ||||
|  | ||||
|         jinja2_env = create_jinja_env(loader=BaseLoader) | ||||
|         jinja2_env.globals['pattern'] = dangerous | ||||
|         template = "{{ 'aaaaaaaaaa' | regex_replace(pattern, 'x') }}" | ||||
|         finalRender = jinja2_env.from_string(template).render() | ||||
|  | ||||
|         # Should return original value when dangerous pattern is detected | ||||
|         assert finalRender == "aaaaaaaaaa" | ||||
|  | ||||
| def test_regex_replace_security_timeout_protection(): | ||||
|     """Test that regex_replace has timeout protection (if SIGALRM available).""" | ||||
|     import signal | ||||
|  | ||||
|     # Only test on systems that support SIGALRM | ||||
|     if not hasattr(signal, 'SIGALRM'): | ||||
|         # Skip test on Windows and other systems without SIGALRM | ||||
|         return | ||||
|  | ||||
|     # This pattern is known to cause exponential backtracking on certain inputs | ||||
|     # but should be caught by our dangerous pattern detector | ||||
|     # We're mainly testing that the timeout mechanism works | ||||
|  | ||||
|     from changedetectionio.jinja2_custom import regex_replace | ||||
|  | ||||
|     # Create input that could trigger slow regex | ||||
|     test_input = "a" * 50 + "b" | ||||
|  | ||||
|     # This shouldn't take long due to our protections | ||||
|     result = regex_replace(test_input, "a+b", "x") | ||||
|  | ||||
|     # Should complete and return a result | ||||
|     assert result is not None | ||||
| @@ -13,10 +13,10 @@ import base64 | ||||
| from changedetectionio.notification import ( | ||||
|     default_notification_body, | ||||
|     default_notification_format, | ||||
|     default_notification_title, | ||||
|     valid_notification_formats, | ||||
|     default_notification_title, valid_notification_formats | ||||
| ) | ||||
|  | ||||
| from ..diff import HTML_CHANGED_STYLE | ||||
| from ..model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
|  | ||||
|  | ||||
| # Hard to just add more live server URLs when one test is already running (I think) | ||||
| @@ -47,6 +47,14 @@ def test_check_notification(client, live_server, measure_memory_usage): | ||||
|  | ||||
|     assert b"Settings updated." in res.data | ||||
|  | ||||
|     res = client.get(url_for("settings.settings_page")) | ||||
|     for k,v in valid_notification_formats.items(): | ||||
|         if k == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: | ||||
|             continue | ||||
|         assert f'value="{k}"'.encode() in res.data # Should be by key NOT value | ||||
|         assert f'value="{v}"'.encode() not in res.data # Should be by key NOT value | ||||
|  | ||||
|  | ||||
|     # When test mode is in BASE_URL env mode, we should see this already configured | ||||
|     env_base_url = os.getenv('BASE_URL', '').strip() | ||||
|     if len(env_base_url): | ||||
| @@ -101,7 +109,7 @@ def test_check_notification(client, live_server, measure_memory_usage): | ||||
|                                                    "Diff as Patch: {{diff_patch}}\n" | ||||
|                                                    ":-)", | ||||
|                               "notification_screenshot": True, | ||||
|                               "notification_format": 'Plain Text'} | ||||
|                               "notification_format": 'text'} | ||||
|  | ||||
|     notification_form_data.update({ | ||||
|         "url": test_url, | ||||
| @@ -267,7 +275,7 @@ def test_notification_validation(client, live_server, measure_memory_usage): | ||||
| #        data={"notification_urls": 'json://localhost/foobar', | ||||
| #              "notification_title": "", | ||||
| #              "notification_body": "", | ||||
| #              "notification_format": 'Plain Text', | ||||
| #              "notification_format": 'text', | ||||
| #              "url": test_url, | ||||
| #              "tag": "my tag", | ||||
| #              "title": "my title", | ||||
| @@ -467,6 +475,25 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage | ||||
|         # Should come from notification.py default handler when there is no notification body to pull from | ||||
|         assert 'change detection is cool 网站监测 内容更新了' in x | ||||
|  | ||||
|     ## Check that 'test' catches errors | ||||
|     test_notification_url = 'post://akjsdfkjasdkfjasdkfjasdkjfas232323/should-error' | ||||
|  | ||||
|     ######### Test global/system settings | ||||
|     res = client.post( | ||||
|         url_for("ui.ui_notification.ajax_callback_send_notification_test")+"?mode=global-settings", | ||||
|         data={"notification_urls": test_notification_url}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     assert res.status_code == 400 | ||||
|     assert ( | ||||
|         b"No address found" in res.data or | ||||
|         b"Name or service not known" in res.data or | ||||
|         b"nodename nor servname provided" in res.data or | ||||
|         b"Temporary failure in name resolution" in res.data or | ||||
|         b"Failed to establish a new connection" in res.data or | ||||
|         b"Connection error occurred" in res.data | ||||
|     ) | ||||
|      | ||||
|     client.get( | ||||
|         url_for("ui.form_delete", uuid="all"), | ||||
|         follow_redirects=True | ||||
| @@ -483,9 +510,8 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage | ||||
|     assert b"Error: You must have atleast one watch configured for 'test notification' to work" in res.data | ||||
|  | ||||
|  | ||||
| def _test_color_notifications(client, notification_body_token): | ||||
|  | ||||
|     from changedetectionio.diff import HTML_ADDED_STYLE, HTML_REMOVED_STYLE | ||||
| def _test_color_notifications(client, notification_body_token): | ||||
|  | ||||
|     set_original_response() | ||||
|  | ||||
| @@ -503,7 +529,7 @@ def _test_color_notifications(client, notification_body_token): | ||||
|             "application-fetch_backend": "html_requests", | ||||
|             "application-minutes_between_check": 180, | ||||
|             "application-notification_body": notification_body_token, | ||||
|             "application-notification_format": "HTML Color", | ||||
|             "application-notification_format": "htmlcolor", | ||||
|             "application-notification_urls": test_notification_url, | ||||
|             "application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }}", | ||||
|         }, | ||||
| @@ -533,7 +559,8 @@ def _test_color_notifications(client, notification_body_token): | ||||
|  | ||||
|     with open("test-datastore/notification.txt", 'r') as f: | ||||
|         x = f.read() | ||||
|         assert f'<span style="{HTML_REMOVED_STYLE}">Which is across multiple lines' in x | ||||
|         s =  f'<span style="{HTML_CHANGED_STYLE}" role="note" aria-label="Changed text" title="Changed text">Which is across multiple lines' | ||||
|         assert s in x | ||||
|  | ||||
|  | ||||
|     client.get( | ||||
|   | ||||
| @@ -30,7 +30,7 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u | ||||
|         data={"notification_urls": f"{broken_notification_url}\r\n{working_notification_url}", | ||||
|               "notification_title": "xxx", | ||||
|               "notification_body": "xxxxx", | ||||
|               "notification_format": 'Plain Text', | ||||
|               "notification_format": 'text', | ||||
|               "url": test_url, | ||||
|               "tags": "", | ||||
|               "title": "", | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| import os | ||||
|  | ||||
| from flask import url_for | ||||
|  | ||||
| from changedetectionio.tests.util import set_modified_response | ||||
| from .util import live_server_setup, wait_for_all_checks, delete_all_watches | ||||
| from .. import strtobool | ||||
|  | ||||
| @@ -132,6 +134,26 @@ def test_xss(client, live_server, measure_memory_usage): | ||||
|     assert b"<img src=x onerror=alert(" not in res.data | ||||
|     assert b"<img" in res.data | ||||
|  | ||||
|     # Check that even forcing an update directly still doesnt get to the frontend | ||||
|     set_original_response() | ||||
|     XSS_HACK = 'javascript:alert(document.domain)' | ||||
|     uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True)) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|     set_modified_response() | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     live_server.app.config['DATASTORE'].data['watching'][uuid]['url']=XSS_HACK | ||||
|  | ||||
|  | ||||
|     res = client.get(url_for("ui.ui_views.preview_page", uuid=uuid)) | ||||
|     assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200 | ||||
|     client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid)) | ||||
|     assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200 | ||||
|     res = client.get(url_for("watchlist.index")) | ||||
|     assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200 | ||||
|  | ||||
|  | ||||
| def test_xss_watch_last_error(client, live_server, measure_memory_usage): | ||||
|     set_original_response() | ||||
|   | ||||
							
								
								
									
										153
									
								
								changedetectionio/tests/test_xpath_default_namespace.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										153
									
								
								changedetectionio/tests/test_xpath_default_namespace.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,153 @@ | ||||
| #!/usr/bin/env python3 | ||||
| """ | ||||
| Unit tests for XPath default namespace handling in RSS/Atom feeds. | ||||
| Tests the fix for issue where //title/text() returns empty on feeds with default namespaces. | ||||
|  | ||||
| Real-world test data from https://github.com/microsoft/PowerToys/releases.atom | ||||
| """ | ||||
|  | ||||
| import sys | ||||
| import os | ||||
| import pytest | ||||
|  | ||||
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | ||||
| import html_tools | ||||
|  | ||||
|  | ||||
| # Real-world Atom feed with default namespace from GitHub PowerToys releases | ||||
| # This is the actual format that was failing before the fix | ||||
| atom_feed_with_default_ns = """<?xml version="1.0" encoding="UTF-8"?> | ||||
| <feed xmlns="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" xml:lang="en-US"> | ||||
|   <id>tag:github.com,2008:https://github.com/microsoft/PowerToys/releases</id> | ||||
|   <link type="text/html" rel="alternate" href="https://github.com/microsoft/PowerToys/releases"/> | ||||
|   <link type="application/atom+xml" rel="self" href="https://github.com/microsoft/PowerToys/releases.atom"/> | ||||
|   <title>Release notes from PowerToys</title> | ||||
|   <updated>2025-10-23T08:53:12Z</updated> | ||||
|   <entry> | ||||
|     <id>tag:github.com,2008:Repository/184456251/v0.95.1</id> | ||||
|     <updated>2025-10-24T14:20:14Z</updated> | ||||
|     <link rel="alternate" type="text/html" href="https://github.com/microsoft/PowerToys/releases/tag/v0.95.1"/> | ||||
|     <title>Release 0.95.1</title> | ||||
|     <content type="html"><p>This patch release fixes several important stability issues.</p></content> | ||||
|     <author> | ||||
|       <name>Jaylyn-Barbee</name> | ||||
|     </author> | ||||
|   </entry> | ||||
|   <entry> | ||||
|     <id>tag:github.com,2008:Repository/184456251/v0.95.0</id> | ||||
|     <updated>2025-10-17T12:51:21Z</updated> | ||||
|     <link rel="alternate" type="text/html" href="https://github.com/microsoft/PowerToys/releases/tag/v0.95.0"/> | ||||
|     <title>Release v0.95.0</title> | ||||
|     <content type="html"><p>New features, stability, optimization improvements.</p></content> | ||||
|     <author> | ||||
|       <name>Jaylyn-Barbee</name> | ||||
|     </author> | ||||
|   </entry> | ||||
| </feed>""" | ||||
|  | ||||
| # RSS feed without default namespace | ||||
| rss_feed_no_default_ns = """<?xml version="1.0" encoding="UTF-8"?> | ||||
| <rss version="2.0"> | ||||
|   <channel> | ||||
|     <title>Channel Title</title> | ||||
|     <description>Channel Description</description> | ||||
|     <item> | ||||
|       <title>Item 1 Title</title> | ||||
|       <description>Item 1 Description</description> | ||||
|     </item> | ||||
|     <item> | ||||
|       <title>Item 2 Title</title> | ||||
|       <description>Item 2 Description</description> | ||||
|     </item> | ||||
|   </channel> | ||||
| </rss>""" | ||||
|  | ||||
| # RSS 2.0 feed with namespace prefix (not default) | ||||
| rss_feed_with_ns_prefix = """<?xml version="1.0" encoding="UTF-8"?> | ||||
| <rss xmlns:dc="http://purl.org/dc/elements/1.1/" | ||||
|      xmlns:content="http://purl.org/rss/1.0/modules/content/" | ||||
|      xmlns:atom="http://www.w3.org/2005/Atom" | ||||
|      version="2.0"> | ||||
|   <channel> | ||||
|     <title>Channel Title</title> | ||||
|     <atom:link href="http://example.com/feed" rel="self" type="application/rss+xml"/> | ||||
|     <item> | ||||
|       <title>Item Title</title> | ||||
|       <dc:creator>Author Name</dc:creator> | ||||
|     </item> | ||||
|   </channel> | ||||
| </rss>""" | ||||
|  | ||||
|  | ||||
| class TestXPathDefaultNamespace: | ||||
|     """Test XPath queries on feeds with and without default namespaces.""" | ||||
|  | ||||
|     def test_atom_feed_simple_xpath_with_xpath_filter(self): | ||||
|         """Test that //title/text() works on Atom feed with default namespace using xpath_filter.""" | ||||
|         result = html_tools.xpath_filter('//title/text()', atom_feed_with_default_ns, is_rss=True) | ||||
|         assert 'Release notes from PowerToys' in result | ||||
|         assert 'Release 0.95.1' in result | ||||
|         assert 'Release v0.95.0' in result | ||||
|  | ||||
|     def test_atom_feed_nested_xpath_with_xpath_filter(self): | ||||
|         """Test nested XPath like //entry/title/text() on Atom feed.""" | ||||
|         result = html_tools.xpath_filter('//entry/title/text()', atom_feed_with_default_ns, is_rss=True) | ||||
|         assert 'Release 0.95.1' in result | ||||
|         assert 'Release v0.95.0' in result | ||||
|         # Should NOT include the feed title | ||||
|         assert 'Release notes from PowerToys' not in result | ||||
|  | ||||
|     def test_atom_feed_other_elements_with_xpath_filter(self): | ||||
|         """Test that other elements like //updated/text() work on Atom feed.""" | ||||
|         result = html_tools.xpath_filter('//updated/text()', atom_feed_with_default_ns, is_rss=True) | ||||
|         assert '2025-10-23T08:53:12Z' in result | ||||
|         assert '2025-10-24T14:20:14Z' in result | ||||
|  | ||||
|     def test_rss_feed_without_namespace(self): | ||||
|         """Test that //title/text() works on RSS feed without default namespace.""" | ||||
|         result = html_tools.xpath_filter('//title/text()', rss_feed_no_default_ns, is_rss=True) | ||||
|         assert 'Channel Title' in result | ||||
|         assert 'Item 1 Title' in result | ||||
|         assert 'Item 2 Title' in result | ||||
|  | ||||
|     def test_rss_feed_nested_xpath(self): | ||||
|         """Test nested XPath on RSS feed without default namespace.""" | ||||
|         result = html_tools.xpath_filter('//item/title/text()', rss_feed_no_default_ns, is_rss=True) | ||||
|         assert 'Item 1 Title' in result | ||||
|         assert 'Item 2 Title' in result | ||||
|         # Should NOT include channel title | ||||
|         assert 'Channel Title' not in result | ||||
|  | ||||
|     def test_rss_feed_with_prefixed_namespaces(self): | ||||
|         """Test that feeds with namespace prefixes (not default) still work.""" | ||||
|         result = html_tools.xpath_filter('//title/text()', rss_feed_with_ns_prefix, is_rss=True) | ||||
|         assert 'Channel Title' in result | ||||
|         assert 'Item Title' in result | ||||
|  | ||||
|     def test_local_name_workaround_still_works(self): | ||||
|         """Test that local-name() workaround still works for Atom feeds.""" | ||||
|         result = html_tools.xpath_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_rss=True) | ||||
|         assert 'Release notes from PowerToys' in result | ||||
|         assert 'Release 0.95.1' in result | ||||
|  | ||||
|     def test_xpath1_filter_without_default_namespace(self): | ||||
|         """Test xpath1_filter works on RSS without default namespace.""" | ||||
|         result = html_tools.xpath1_filter('//title/text()', rss_feed_no_default_ns, is_rss=True) | ||||
|         assert 'Channel Title' in result | ||||
|         assert 'Item 1 Title' in result | ||||
|  | ||||
|     def test_xpath1_filter_with_default_namespace_returns_empty(self): | ||||
|         """Test that xpath1_filter returns empty on Atom with default namespace (known limitation).""" | ||||
|         result = html_tools.xpath1_filter('//title/text()', atom_feed_with_default_ns, is_rss=True) | ||||
|         # xpath1_filter (lxml) doesn't support default namespaces, so this returns empty | ||||
|         assert result == '' | ||||
|  | ||||
|     def test_xpath1_filter_local_name_workaround(self): | ||||
|         """Test that xpath1_filter works with local-name() workaround on Atom feeds.""" | ||||
|         result = html_tools.xpath1_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_rss=True) | ||||
|         assert 'Release notes from PowerToys' in result | ||||
|         assert 'Release 0.95.1' in result | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     pytest.main([__file__, '-v']) | ||||
| @@ -244,7 +244,7 @@ def new_live_server_setup(live_server): | ||||
|         return request.method | ||||
|  | ||||
|     # Where we POST to as a notification, also use a space here to test URL escaping is OK across all tests that use this. ( #2868 ) | ||||
|     @live_server.app.route('/test_notification endpoint', methods=['POST', 'GET']) | ||||
|     @live_server.app.route('/test_notification_endpoint', methods=['POST', 'GET']) | ||||
|     def test_notification_endpoint(): | ||||
|  | ||||
|         with open("test-datastore/notification.txt", "wb") as f: | ||||
|   | ||||
| @@ -1,3 +1,5 @@ | ||||
| from functools import lru_cache | ||||
|  | ||||
| import arrow | ||||
| from enum import IntEnum | ||||
|  | ||||
| @@ -12,7 +14,7 @@ class Weekday(IntEnum): | ||||
|     Saturday = 5 | ||||
|     Sunday = 6 | ||||
|  | ||||
|  | ||||
| @lru_cache(maxsize=100) | ||||
| def am_i_inside_time( | ||||
|         day_of_week: str, | ||||
|         time_str: str, | ||||
|   | ||||
| @@ -2,6 +2,9 @@ | ||||
|  | ||||
| import sys | ||||
| import os | ||||
|  | ||||
| from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH | ||||
|  | ||||
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..')) | ||||
|  | ||||
| from changedetectionio.widgets import TernaryNoneBooleanField | ||||
| @@ -93,7 +96,7 @@ def test_custom_text(): | ||||
|     print(f"Does NOT contain 'System default': {'System default' not in boolean_html}") | ||||
|     print(f"Does NOT contain 'Default': {'Default' not in boolean_html}") | ||||
|     assert 'Enabled' in boolean_html and 'Disabled' in boolean_html | ||||
|     assert 'System default' not in boolean_html and 'Default' not in boolean_html | ||||
|     assert USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH not in boolean_html and 'Default' not in boolean_html | ||||
|      | ||||
|     # Test FontAwesome field | ||||
|     print("\n--- FontAwesome Icons Field ---") | ||||
|   | ||||
| @@ -28,7 +28,7 @@ info: | ||||
|      | ||||
|     For example: `x-api-key: YOUR_API_KEY` | ||||
|      | ||||
|   version: 0.1.1 | ||||
|   version: 0.1.2 | ||||
|   contact: | ||||
|     name: ChangeDetection.io | ||||
|     url: https://github.com/dgtlmoon/changedetection.io | ||||
| @@ -143,7 +143,7 @@ components: | ||||
|         paused: | ||||
|           type: boolean | ||||
|           description: Whether the web page change monitor (watch) is paused | ||||
|         muted: | ||||
|         notification_muted: | ||||
|           type: boolean | ||||
|           description: Whether notifications are muted | ||||
|         method: | ||||
| @@ -207,7 +207,7 @@ components: | ||||
|           maxLength: 5000 | ||||
|         notification_format: | ||||
|           type: string | ||||
|           enum: [Text, HTML, Markdown] | ||||
|           enum: ['text', 'html', 'htmlcolor', 'markdown', 'System default'] | ||||
|           description: Format for notifications | ||||
|         track_ldjson_price_data: | ||||
|           type: boolean | ||||
| @@ -406,7 +406,7 @@ paths: | ||||
|                   page_title: "The HTML <title> from the page" | ||||
|                   tags: ["550e8400-e29b-41d4-a716-446655440000"] | ||||
|                   paused: false | ||||
|                   muted: false | ||||
|                   notification_muted: false | ||||
|                   method: "GET" | ||||
|                   fetch_backend: "html_requests" | ||||
|                   last_checked: 1640995200 | ||||
| @@ -419,7 +419,7 @@ paths: | ||||
|                   page_title: "The HTML <title> from the page" | ||||
|                   tags: ["330e8400-e29b-41d4-a716-446655440001"] | ||||
|                   paused: false | ||||
|                   muted: true | ||||
|                   notification_muted: true | ||||
|                   method: "GET" | ||||
|                   fetch_backend: "html_webdriver" | ||||
|                   last_checked: 1640998800 | ||||
| @@ -1224,7 +1224,7 @@ paths: | ||||
|                     title: "Example Website Monitor" | ||||
|                     tags: ["550e8400-e29b-41d4-a716-446655440000"] | ||||
|                     paused: false | ||||
|                     muted: false | ||||
|                     notification_muted: false | ||||
|  | ||||
|   /import: | ||||
|     post: | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| # eventlet>=0.38.0  # Removed - replaced with threading mode for better Python 3.12+ compatibility | ||||
| feedgen~=0.9 | ||||
| feedgen~=1.0 | ||||
| feedparser~=6.0  # For parsing RSS/Atom feeds | ||||
| flask-compress | ||||
| # 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers) | ||||
| @@ -12,7 +12,7 @@ janus # Thread-safe async/sync queue bridge | ||||
| flask_wtf~=1.2 | ||||
| flask~=3.1 | ||||
| flask-socketio~=5.5.1 | ||||
| python-socketio~=5.13.0 | ||||
| python-socketio~=5.14.2 | ||||
| python-engineio~=4.12.3 | ||||
| inscriptis~=2.2 | ||||
| pytz | ||||
| @@ -22,7 +22,7 @@ validators~=0.35 | ||||
|  | ||||
| # Set these versions together to avoid a RequestsDependencyWarning | ||||
| # >= 2.26 also adds Brotli support if brotli is installed | ||||
| brotli~=1.0 | ||||
| brotli~=1.1 | ||||
| requests[socks] | ||||
| requests-file | ||||
|  | ||||
| @@ -30,7 +30,7 @@ requests-file | ||||
| # If specific version needed for security, use urllib3>=1.26.19,<3.0 | ||||
| chardet>2.3.0 | ||||
|  | ||||
| wtforms~=3.0 | ||||
| wtforms~=3.2 | ||||
| jsonpath-ng~=1.5.3 | ||||
|  | ||||
| # dnspython - Used by paho-mqtt for MQTT broker resolution   | ||||
|   | ||||
		Reference in New Issue
	
	Block a user