mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-31 14:47:21 +00:00 
			
		
		
		
	Compare commits
	
		
			3 Commits
		
	
	
		
			0.50.37
			...
			regex-filt
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 0e9f01639a | ||
|   | b236c8b24b | ||
|   | 27cedc9fa4 | 
| @@ -9,6 +9,7 @@ from .safe_jinja import ( | ||||
|     JINJA2_MAX_RETURN_PAYLOAD_SIZE, | ||||
|     DEFAULT_JINJA2_EXTENSIONS, | ||||
| ) | ||||
| from .plugins.regex import regex_replace | ||||
|  | ||||
| __all__ = [ | ||||
|     'TimeExtension', | ||||
| @@ -17,4 +18,5 @@ __all__ = [ | ||||
|     'create_jinja_env', | ||||
|     'JINJA2_MAX_RETURN_PAYLOAD_SIZE', | ||||
|     'DEFAULT_JINJA2_EXTENSIONS', | ||||
|     'regex_replace', | ||||
| ] | ||||
|   | ||||
							
								
								
									
										6
									
								
								changedetectionio/jinja2_custom/plugins/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								changedetectionio/jinja2_custom/plugins/__init__.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,6 @@ | ||||
| """ | ||||
| Jinja2 custom filter plugins for changedetection.io | ||||
| """ | ||||
| from .regex import regex_replace | ||||
|  | ||||
| __all__ = ['regex_replace'] | ||||
							
								
								
									
										98
									
								
								changedetectionio/jinja2_custom/plugins/regex.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										98
									
								
								changedetectionio/jinja2_custom/plugins/regex.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,98 @@ | ||||
| """ | ||||
| Regex filter plugin for Jinja2 templates. | ||||
|  | ||||
| Provides regex_replace filter for pattern-based string replacements in templates. | ||||
| """ | ||||
| import re | ||||
| import signal | ||||
| from loguru import logger | ||||
|  | ||||
|  | ||||
def regex_replace(value: str, pattern: str, replacement: str = '', count: int = 0) -> str:
    """
    Replace occurrences of a regex pattern in a string (Jinja2 filter).

    Security: several guards reduce the risk of ReDoS (Regular Expression
    Denial of Service):
    - Input longer than the size limit is truncated before matching
    - Over-long patterns are rejected
    - Patterns containing obvious nested quantifiers are rejected
    - A SIGALRM based timeout aborts runaway matches. The alarm can only be
      armed on platforms that provide SIGALRM and only from the main thread;
      in any other thread the substitution runs without the timeout and
      relies on the other guards.

    Args:
        value: The input to perform replacements on (converted with str())
        pattern: The regex pattern to search for
        replacement: The replacement string (default: '')
        count: Maximum number of replacements (0 = replace all, default: 0)

    Returns:
        String with replacements applied, or the (possibly truncated) input
        string when the pattern is rejected, invalid, times out or any other
        error occurs. This function does not raise.

    Example:
        {{ "hello world" | regex_replace("world", "universe") }}
        {{ diff | regex_replace("<td>([^<]+)</td><td>([^<]+)</td>", "Label1: \\1\\nLabel2: \\2") }}

    Security limits:
        - Maximum input size: 10 * 1024 * 1024 characters
        - Maximum pattern length: 500 characters
        - Operation timeout: 10 seconds (main thread, SIGALRM platforms only)
        - Dangerous nested quantifier patterns are rejected
    """
    # Local import: only needed for the main-thread check further down
    import threading

    # Security limits
    MAX_INPUT_SIZE = 1024 * 1024 * 10  # Maximum input length in characters
    MAX_PATTERN_LENGTH = 500  # Maximum regex pattern length
    REGEX_TIMEOUT_SECONDS = 10  # Maximum time for regex operation

    # Validate input sizes
    value_str = str(value)
    if len(value_str) > MAX_INPUT_SIZE:
        logger.warning(f"regex_replace: Input too large ({len(value_str)} chars), truncating")
        value_str = value_str[:MAX_INPUT_SIZE]

    # A non-string pattern would make len() raise below; honour the
    # "return the original value on error" contract instead.
    if not isinstance(pattern, str):
        logger.warning(f"regex_replace: Pattern must be a string, got {type(pattern).__name__}, rejecting")
        return value_str

    if len(pattern) > MAX_PATTERN_LENGTH:
        logger.warning(f"regex_replace: Pattern too long ({len(pattern)} chars), rejecting")
        return value_str

    # Check for potentially dangerous patterns (basic checks)
    # Nested quantifiers like (a+)+ can cause catastrophic backtracking
    dangerous_patterns = (
        r'\([^)]*\+[^)]*\)\+',  # (x+)+
        r'\([^)]*\*[^)]*\)\+',  # (x*)+
        r'\([^)]*\+[^)]*\)\*',  # (x+)*
        r'\([^)]*\*[^)]*\)\*',  # (x*)*
    )
    if any(re.search(dangerous, pattern) for dangerous in dangerous_patterns):
        logger.warning(f"regex_replace: Potentially dangerous pattern detected: {pattern}")
        return value_str

    # signal.signal() raises ValueError when called outside the main thread.
    # Arming the alarm unconditionally therefore turned this filter into a
    # silent no-op in worker threads, so only use it where it can work.
    use_alarm = (
        hasattr(signal, 'SIGALRM')
        and threading.current_thread() is threading.main_thread()
    )

    def timeout_handler(signum, frame):
        raise TimeoutError("Regex operation timed out")

    try:
        old_handler = None
        if use_alarm:
            old_handler = signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(REGEX_TIMEOUT_SECONDS)

        try:
            result = re.sub(pattern, replacement, value_str, count=count)
        finally:
            if use_alarm:
                # Cancel the alarm and put the previous handler back
                signal.alarm(0)
                if old_handler is not None:
                    signal.signal(signal.SIGALRM, old_handler)

        return result

    except TimeoutError:
        logger.error(f"regex_replace: Regex operation timed out - possible ReDoS attack. Pattern: {pattern}")
        return value_str
    except re.error as e:
        logger.warning(f"regex_replace: Invalid regex pattern: {e}")
        return value_str
    except Exception as e:
        # Deliberate best-effort: a template filter must not break rendering
        logger.error(f"regex_replace: Unexpected error: {e}")
        return value_str
| @@ -8,13 +8,13 @@ import jinja2.sandbox | ||||
| import typing as t | ||||
| import os | ||||
| from .extensions.TimeExtension import TimeExtension | ||||
| from .plugins import regex_replace | ||||
|  | ||||
| JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10)) | ||||
|  | ||||
| # Default extensions - can be overridden in create_jinja_env() | ||||
| DEFAULT_JINJA2_EXTENSIONS = [TimeExtension] | ||||
|  | ||||
|  | ||||
| def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandboxedEnvironment: | ||||
|     """ | ||||
|     Create a sandboxed Jinja2 environment with our custom extensions and default timezone. | ||||
| @@ -38,6 +38,9 @@ def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandb | ||||
|     default_timezone = os.getenv('TZ', 'UTC').strip() | ||||
|     jinja2_env.default_timezone = default_timezone | ||||
|  | ||||
|     # Register custom filters | ||||
|     jinja2_env.filters['regex_replace'] = regex_replace | ||||
|  | ||||
|     return jinja2_env | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -134,6 +134,12 @@ | ||||
|                                     <p> | ||||
|                                         URL encoding, use <strong>|urlencode</strong>, for example - <code>gets://hook-website.com/test.php?title={{ '{{ watch_title|urlencode }}' }}</code> | ||||
|                                     </p> | ||||
|                                     <p> | ||||
|                                         Regular-expression replace, use <strong>|regex_replace</strong>, for example -   <code>{{ "{{ \"hello world 123\" | regex_replace('[0-9]+', 'no-more-numbers') }}" }}</code> | ||||
|                                     </p> | ||||
|                                     <p> | ||||
|                                         For a complete reference of all Jinja2 built-in filters, see <a href="https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters">https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters</a>. | ||||
|                                     </p> | ||||
|                                 </div> | ||||
|                             </div> | ||||
|                             <div class="pure-control-group"> | ||||
|   | ||||
| @@ -169,4 +169,161 @@ def test_default_timezone_subtraction(environment): | ||||
|  | ||||
|     finalRender = render("{% now '' - 'minutes=11' %}") | ||||
|  | ||||
|     assert finalRender == "Wed, 09 Dec 2015 23:22:01" | ||||
|     assert finalRender == "Wed, 09 Dec 2015 23:22:01" | ||||
|  | ||||
def test_regex_replace_basic():
    """A literal word pattern is swapped for the replacement text."""
    rendered = render("{{ 'hello world' | regex_replace('world', 'universe') }}")
    expected = "hello universe"
    assert rendered == expected
|  | ||||
def test_regex_replace_with_groups():
    """Capture groups can be referenced from the replacement (issue #3501 use case)."""
    # Two adjacent table cells are rewritten into labelled lines
    rendered = render(
        "{{ '<td>thing</td><td>other</td>' | regex_replace('<td>([^<]+)</td><td>([^<]+)</td>', 'ThingLabel: \\\\1\\nOtherLabel: \\\\2') }}"
    )
    for fragment in ("ThingLabel: thing", "OtherLabel: other"):
        assert fragment in rendered
|  | ||||
def test_regex_replace_multiple_matches():
    """Every occurrence of the pattern is replaced when no count is given."""
    source = "{{ 'foo bar foo baz' | regex_replace('foo', 'qux') }}"
    assert render(source) == "qux bar qux baz"
|  | ||||
def test_regex_replace_count_parameter():
    """The count argument caps how many matches are replaced."""
    source = "{{ 'foo bar foo baz' | regex_replace('foo', 'qux', 1) }}"
    # Only the first 'foo' may change
    assert render(source) == "qux bar foo baz"
|  | ||||
def test_regex_replace_empty_replacement():
    """An empty replacement removes the matched text."""
    source = "{{ 'hello world 123' | regex_replace('[0-9]+', '') }}"
    # The space before the digits is not part of the match and must survive
    assert render(source) == "hello world "
|  | ||||
def test_regex_replace_no_match():
    """Input is rendered untouched when the pattern matches nothing."""
    source = "{{ 'hello world' | regex_replace('xyz', 'abc') }}"
    assert render(source) == "hello world"
|  | ||||
def test_regex_replace_invalid_regex():
    """A pattern that fails to compile leaves the original value in place."""
    # '(invalid' has an unmatched opening parenthesis
    source = "{{ 'hello world' | regex_replace('(invalid', 'replacement') }}"
    assert render(source) == "hello world"
|  | ||||
def test_regex_replace_special_characters():
    """Escaped regex metacharacters in the pattern are matched literally."""
    source = "{{ 'Price: $50.00' | regex_replace('\\\\$([0-9.]+)', 'USD \\\\1') }}"
    assert render(source) == "Price: USD 50.00"
|  | ||||
def test_regex_replace_multiline():
    """On multiline input, '^' anchors only at the very start of the text."""
    rendered = render("{{ 'line1\\nline2\\nline3' | regex_replace('^line', 'row') }}")
    # re.sub is called without the MULTILINE flag, so later lines stay as they are
    assert rendered == "row1\nline2\nline3"
|  | ||||
def test_regex_replace_with_notification_context():
    """Apply regex_replace to a ``diff`` variable supplied via notification context data."""
    from changedetectionio.notification_service import NotificationContextData
    from changedetectionio.jinja2_custom import create_jinja_env
    from jinja2 import BaseLoader

    # Mimic the variables a notification exposes to its template
    notification_vars = NotificationContextData()
    notification_vars['diff'] = '<td>value1</td><td>value2</td>'

    env = create_jinja_env(loader=BaseLoader)
    env.globals.update(notification_vars)

    compiled = env.from_string("{{ diff | regex_replace('<td>([^<]+)</td>', '\\\\1 ') }}")
    rendered = compiled.render()

    assert "value1 value2 " in rendered
|  | ||||
def test_regex_replace_security_large_input():
    """Input above the size limit is truncated to the limit before replacing."""
    from changedetectionio.jinja2_custom import create_jinja_env
    from jinja2 import BaseLoader

    size_limit = 1024 * 1024 * 10

    env = create_jinja_env(loader=BaseLoader)
    # 1000 characters more than the filter accepts
    env.globals['large_input'] = "x" * (size_limit + 1000)

    rendered = env.from_string("{{ large_input | regex_replace('x', 'y') }}").render()

    # One-for-one replacement, so the output length equals the truncated input length
    assert len(rendered) == size_limit
|  | ||||
def test_regex_replace_security_long_pattern():
    """A pattern longer than 500 characters is rejected and the input returned as-is."""
    oversized = "a" * 501
    source = "{{ 'test' | regex_replace('" + oversized + "', 'replacement') }}"

    # Rejection means the original value comes back unchanged
    assert render(source) == "test"
|  | ||||
def test_regex_replace_security_dangerous_pattern():
    """Test regex_replace detects and rejects dangerous nested quantifiers."""
    # Imported once up front; these do not change between iterations
    from changedetectionio.jinja2_custom import create_jinja_env
    from jinja2 import BaseLoader

    # Patterns that could cause catastrophic backtracking
    dangerous_patterns = [
        "(a+)+",
        "(a*)+",
        "(a+)*",
        "(a*)*",
    ]

    # The pattern is passed as a template variable to avoid quoting/escaping issues
    template = "{{ 'aaaaaaaaaa' | regex_replace(pattern, 'x') }}"

    for dangerous in dangerous_patterns:
        jinja2_env = create_jinja_env(loader=BaseLoader)
        jinja2_env.globals['pattern'] = dangerous
        finalRender = jinja2_env.from_string(template).render()

        # Should return original value when dangerous pattern is detected;
        # the message identifies which pattern was not rejected.
        assert finalRender == "aaaaaaaaaa", f"pattern {dangerous!r} was not rejected"
|  | ||||
def test_regex_replace_security_timeout_protection():
    """Test that a benign pattern still works with timeout protection armed (if SIGALRM available)."""
    import signal

    # Only test on systems that support SIGALRM
    if not hasattr(signal, 'SIGALRM'):
        # Nothing to check on Windows and other systems without SIGALRM
        return

    from changedetectionio.jinja2_custom import regex_replace

    # "a+b" is a harmless pattern: it is not rejected by the nested-quantifier
    # check and matches quickly, so this exercises the alarm set-up/tear-down
    # path rather than an actual timeout.
    test_input = "a" * 50 + "b"

    result = regex_replace(test_input, "a+b", "x")

    # The whole input matches once, so the exact result is known. The previous
    # `is not None` check could never fail because a str is always returned.
    assert result == "x"
		Reference in New Issue
	
	Block a user