diff --git a/changedetectionio/api/__init__.py b/changedetectionio/api/__init__.py
index 4004e019..cb607c8f 100644
--- a/changedetectionio/api/__init__.py
+++ b/changedetectionio/api/__init__.py
@@ -1,10 +1,7 @@
 import copy
-import yaml
 import functools
 from flask import request, abort
 from loguru import logger
-from openapi_core import OpenAPI
-from openapi_core.contrib.flask import FlaskOpenAPIRequest

 from . import api_schema
 from ..model import watch_base
@@ -34,7 +31,11 @@ schema_delete_notification_urls['required'] = ['notification_urls']

 @functools.cache
 def get_openapi_spec():
+    """Lazy load OpenAPI spec and dependencies only when validation is needed."""
     import os
+    import yaml  # Lazy import - only loaded when API validation is actually used
+    from openapi_core import OpenAPI  # Lazy import - saves ~10.7 MB on startup
+
     spec_path = os.path.join(os.path.dirname(__file__), '../../docs/api-spec.yaml')
     with open(spec_path, 'r') as f:
         spec_dict = yaml.safe_load(f)
@@ -49,6 +50,9 @@ def validate_openapi_request(operation_id):
             try:
                 # Skip OpenAPI validation for GET requests since they don't have request bodies
                 if request.method.upper() != 'GET':
+                    # Lazy import - only loaded when actually validating a request
+                    from openapi_core.contrib.flask import FlaskOpenAPIRequest
+
                     spec = get_openapi_spec()
                     openapi_request = FlaskOpenAPIRequest(request)
                     result = spec.unmarshal_request(openapi_request)
diff --git a/changedetectionio/html_tools.py b/changedetectionio/html_tools.py
index e39ecdf9..3800d372 100644
--- a/changedetectionio/html_tools.py
+++ b/changedetectionio/html_tools.py
@@ -100,7 +100,7 @@ def element_removal(selectors: List[str], html_content):
     xpath_selectors = []

     for selector in selectors:
-        if selector.startswith(('xpath:', 'xpath1:', '//')):
+        if selector.strip().startswith(('xpath:', 'xpath1:', '//')):
             # Handle XPath selectors separately
             xpath_selector = selector.removeprefix('xpath:').removeprefix('xpath1:')
             xpath_selectors.append(xpath_selector)
diff --git a/changedetectionio/processors/magic.py b/changedetectionio/processors/magic.py
index 7768ce08..a742b9f8 100644
--- a/changedetectionio/processors/magic.py
+++ b/changedetectionio/processors/magic.py
@@ -44,11 +44,8 @@ XML_CONTENT_TYPES = [
 HTML_PATTERNS = ['
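The api/__init__.py change above defers the yaml and openapi_core imports until a request actually needs validation, while functools.cache keeps the parsed spec around after the first call. A minimal standalone sketch of the same pattern (illustrative only; the function name and the stand-in dependency below are not the project's actual code, and the ~10.7 MB figure comes from the patch, not this sketch):

import functools

@functools.cache
def get_spec_loader():
    # Deferred import: the dependency is only loaded on the first call,
    # not at process start-up, so importing this module stays cheap.
    import json  # stand-in for a heavier dependency such as openapi_core
    return json.JSONDecoder()

decoder = get_spec_loader()           # first call pays the import cost once
assert decoder is get_spec_loader()   # functools.cache returns the same object afterwards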
diff --git a/changedetectionio/processors/text_json_diff/processor.py b/changedetectionio/processors/text_json_diff/processor.py
+            "Added by changedetection.io: Document checksum - " + f"{hashlib.md5(raw_content).hexdigest().upper()} " + f"Filesize - {len(html_content)} bytes"
+        )
+        return html_content.replace('</body>', metadata + '</body>')
+
+    def preprocess_json(self, content, has_filters):
+        """Format and sort JSON content."""
+        # Force reformat if no filters specified
+        if not has_filters:
+            content = html_tools.extract_json_as_string(content=content, json_filter="json:$")
+
+        # Sort JSON to avoid false alerts from reordering
+        try:
+            content = json.dumps(json.loads(content), sort_keys=True)
+        except Exception:
+            # Might be malformed JSON, continue anyway
+            pass
+
+        return content
+
+    def apply_include_filters(self, content, stream_content_type):
+        """Apply CSS, XPath, or JSON filters to extract specific content."""
+        filtered_content = ""
+
+        for filter_rule in self.filter_config.include_filters:
+            # XPath filters
+            if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
+                filtered_content += html_tools.xpath_filter(
+                    xpath_filter=filter_rule.replace('xpath:', ''),
+                    html_content=content,
+                    append_pretty_line_formatting=not self.watch.is_source_type_url,
+                    is_rss=stream_content_type.is_rss
+                )
+
+            # XPath1 filters (first match only)
+            elif filter_rule.startswith('xpath1:'):
+                filtered_content += html_tools.xpath1_filter(
+                    xpath_filter=filter_rule.replace('xpath1:', ''),
+                    html_content=content,
+                    append_pretty_line_formatting=not self.watch.is_source_type_url,
+                    is_rss=stream_content_type.is_rss
+                )
+
+            # JSON filters
+            elif any(filter_rule.startswith(prefix) for prefix in json_filter_prefixes):
+                filtered_content += html_tools.extract_json_as_string(
+                    content=content,
+                    json_filter=filter_rule
+                )
+
+            # CSS selectors, default fallback
+            else:
+                filtered_content += html_tools.include_filters(
+                    include_filters=filter_rule,
+                    html_content=content,
+                    append_pretty_line_formatting=not self.watch.is_source_type_url
+                )
+
+        # Raise error if filter returned nothing
+        if not filtered_content.strip():
+            raise FilterNotFoundInResponse(
+                msg=self.filter_config.include_filters,
+                screenshot=self.fetcher.screenshot,
+                xpath_data=self.fetcher.xpath_data
+            )
+
+        return filtered_content
+
+    def apply_subtractive_selectors(self, content):
+        """Remove elements matching subtractive selectors."""
+        return html_tools.element_removal(self.filter_config.subtractive_selectors, content)
+
+    def extract_text_from_html(self, html_content, stream_content_type):
+        """Convert HTML to plain text."""
+        do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
+        return html_tools.html_to_text(
+            html_content=html_content,
+            render_anchor_tag_content=do_anchor,
+            is_rss=stream_content_type.is_rss
+        )
+
+
+class ChecksumCalculator:
+    """Calculates checksums with various options."""
+
+    @staticmethod
+    def calculate(text, ignore_whitespace=False):
+        """Calculate MD5 checksum of text content."""
+        if ignore_whitespace:
+            text = text.translate(TRANSLATE_WHITESPACE_TABLE)
+        return hashlib.md5(text.encode('utf-8')).hexdigest()
+
+
 # Some common stuff here that can be moved to a base class
 # (set_proxy_from_list)
 class perform_site_check(difference_detection_processor):

     def run_changedetection(self, watch):
         changed_detected = False
-        html_content = ""
-        screenshot = False  # as bytes
-        stripped_text_from_html = ""

         if not watch:
             raise Exception("Watch no longer exists.")

+        # Initialize components
+        filter_config = FilterConfig(watch, self.datastore)
+        content_processor = ContentProcessor(self.fetcher, watch, filter_config, self.datastore)
+        transformer = ContentTransformer()
+        rule_engine = RuleEngine()
+
+        # Get content type and stream info
         ctype_header = self.fetcher.get_all_headers().get('content-type', DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER).lower()
         stream_content_type = guess_stream_type(http_content_header=ctype_header, content=self.fetcher.content)

         # Unset any existing notification error
         update_obj = {'last_notification_error': False, 'last_error': False}

-        url = watch.link
         self.screenshot = self.fetcher.screenshot
         self.xpath_data = self.fetcher.xpath_data

-        # Track the content type
+        # Track the content type and checksum before filters
         update_obj['content_type'] = ctype_header
-
-        # Watches added automatically in the queue manager will skip if its the same checksum as the previous run
-        # Saves a lot of CPU
         update_obj['previous_md5_before_filters'] = hashlib.md5(self.fetcher.content.encode('utf-8')).hexdigest()

-        # Fetching complete, now filters
+        # === CONTENT PREPROCESSING ===
+        # Avoid creating unnecessary intermediate string copies by reassigning only when needed
+        content = self.fetcher.content

-        # @note: I feel like the following should be in a more obvious chain system
-        #  - Check filter text
-        #  - Is the checksum different?
-        #  - Do we convert to JSON?
-        # https://stackoverflow.com/questions/41817578/basic-method-chaining ?
-        # return content().textfilter().jsonextract().checksumcompare() ?
-
-
-        # Go into RSS preprocess for converting CDATA/comment to usable text
+        # RSS preprocessing
         if stream_content_type.is_rss:
-            self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
+            content = content_processor.preprocess_rss(content)

+        # PDF preprocessing
         if watch.is_pdf or stream_content_type.is_pdf:
-            from shutil import which
-            tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
-            if not which(tool):
-                raise PDFToHTMLToolNotFound("Command-line `{}` tool was not found in system PATH, was it installed?".format(tool))
-
-            import subprocess
-            proc = subprocess.Popen(
-                [tool, '-stdout', '-', '-s', 'out.pdf', '-i'],
-                stdout=subprocess.PIPE,
-                stdin=subprocess.PIPE)
-            proc.stdin.write(self.fetcher.raw_content)
-            proc.stdin.close()
-            self.fetcher.content = proc.stdout.read().decode('utf-8')
-            proc.wait(timeout=60)
-
-            # Add a little metadata so we know if the file changes (like if an image changes, but the text is the same
-            # @todo may cause problems with non-UTF8?
-            metadata = "Added by changedetection.io: Document checksum - {} Filesize - {} bytes".format(
-                hashlib.md5(self.fetcher.raw_content).hexdigest().upper(),
-                len(self.fetcher.content))
-
-            self.fetcher.content = self.fetcher.content.replace('</body>', metadata + '</body>')
-
-        # Better would be if Watch.model could access the global data also
-        # and then use getattr https://docs.python.org/3/reference/datamodel.html#object.__getitem__
-        # https://realpython.com/inherit-python-dict/ instead of doing it procedurely
-        include_filters_from_tags = self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='include_filters')
-
-        # 1845 - remove duplicated filters in both group and watch include filter
-        include_filters_rule = list(dict.fromkeys(watch.get('include_filters', []) + include_filters_from_tags))
-
-        subtractive_selectors = [*self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='subtractive_selectors'),
-                                 *watch.get("subtractive_selectors", []),
-                                 *self.datastore.data["settings"]["application"].get("global_subtractive_selectors", [])
-                                 ]
-
-        # Inject a virtual LD+JSON price tracker rule
-        if watch.get('track_ldjson_price_data', '') == PRICE_DATA_TRACK_ACCEPT:
-            include_filters_rule += html_tools.LD_JSON_PRODUCT_OFFER_SELECTORS
-
-        has_filter_rule = len(include_filters_rule) and len(include_filters_rule[0].strip())
-        has_subtractive_selectors = len(subtractive_selectors) and len(subtractive_selectors[0].strip())
+            content = content_processor.preprocess_pdf(content, self.fetcher.raw_content)

+        # JSON preprocessing
         if stream_content_type.is_json:
-            if not has_filter_rule:
-                # Force a reformat
-                include_filters_rule.append("json:$")
-                has_filter_rule = True
+            content = content_processor.preprocess_json(content, filter_config.has_include_filters)

-            # Sort the JSON so we dont get false alerts when the content is just re-ordered
-            try:
-                self.fetcher.content = json.dumps(json.loads(self.fetcher.content), sort_keys=True)
-            except Exception as e:
-                # Might have just been a snippet, or otherwise bad JSON, continue
-                pass
+        # HTML obfuscation workarounds
+        if stream_content_type.is_html:
+            content = html_tools.workarounds_for_obfuscations(content)

-        if has_filter_rule:
-            for filter in include_filters_rule:
-                if any(prefix in filter for prefix in json_filter_prefixes):
-                    stripped_text_from_html += html_tools.extract_json_as_string(content=self.fetcher.content, json_filter=filter)
-            if stripped_text_from_html:
-                stream_content_type.is_json = True
-                stream_content_type.is_html = False
+        # Check for LD+JSON price data (for HTML content)
+        if stream_content_type.is_html:
+            update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(content)

-        # We have 'watch.is_source_type_url' because we should be able to use selectors on the raw HTML but return just that selected HTML
-        if stream_content_type.is_html or watch.is_source_type_url or stream_content_type.is_plaintext or stream_content_type.is_rss or stream_content_type.is_xml or stream_content_type.is_pdf:
+        # === FILTER APPLICATION ===
+        # Start with content reference, avoid copy until modification
+        html_content = content

-            # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
-            self.fetcher.content = html_tools.workarounds_for_obfuscations(self.fetcher.content)
-            html_content = self.fetcher.content
+        # Apply include filters (CSS, XPath, JSON)
+        if filter_config.has_include_filters:
+            html_content = content_processor.apply_include_filters(content, stream_content_type)

-            # Some kind of "text" but definitely not RSS looking
-            if stream_content_type.is_plaintext:
-                # Don't run get_text or xpath/css filters on plaintext
-                # We are not HTML, we are not any kind of RSS, doesnt even look like HTML
-                stripped_text_from_html = html_content
+        # Apply subtractive selectors
+        if filter_config.has_subtractive_selectors:
+            html_content = content_processor.apply_subtractive_selectors(html_content)
+
+        # === TEXT EXTRACTION ===
+        if watch.is_source_type_url:
+            # For source URLs, keep raw content
+            stripped_text = html_content
+        else:
+            # Extract text from HTML/RSS content (not generic XML)
+            if stream_content_type.is_html or stream_content_type.is_rss:
+                stripped_text = content_processor.extract_text_from_html(html_content, stream_content_type)
             else:
-                # If not JSON, and if it's not text/plain..
-                # Does it have some ld+json price data? used for easier monitoring
-                update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(self.fetcher.content)
-
-                # Then we assume HTML
-                if has_filter_rule:
-                    html_content = ""
-
-                    for filter_rule in include_filters_rule:
-                        # For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
-                        if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
-                            html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''),
-                                                                    html_content=self.fetcher.content,
-                                                                    append_pretty_line_formatting=not watch.is_source_type_url,
-                                                                    is_rss=stream_content_type.is_rss)
-
-                        elif filter_rule.startswith('xpath1:'):
-                            html_content += html_tools.xpath1_filter(xpath_filter=filter_rule.replace('xpath1:', ''),
-                                                                     html_content=self.fetcher.content,
-                                                                     append_pretty_line_formatting=not watch.is_source_type_url,
-                                                                     is_rss=stream_content_type.is_rss)
-                        else:
-                            html_content += html_tools.include_filters(include_filters=filter_rule,
-                                                                       html_content=self.fetcher.content,
-                                                                       append_pretty_line_formatting=not watch.is_source_type_url)
-
-                    if not html_content.strip():
-                        raise FilterNotFoundInResponse(msg=include_filters_rule, screenshot=self.fetcher.screenshot, xpath_data=self.fetcher.xpath_data)
-
-                if has_subtractive_selectors:
-                    html_content = html_tools.element_removal(subtractive_selectors, html_content)
-
-                if watch.is_source_type_url:
-                    stripped_text_from_html = html_content
-                else:
-                    # extract text
-                    do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
-                    stripped_text_from_html = html_tools.html_to_text(html_content=html_content,
-                                                                      render_anchor_tag_content=do_anchor,
-                                                                      is_rss=stream_content_type.is_rss)

-        # 1874 activate the
-        if len(extract_text) > 0:
-            regex_matched_output = []
-            for s_re in extract_text:
-                # incase they specified something in '/.../x'
-                if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE):
-                    regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re)
-                    result = re.findall(regex, stripped_text_from_html)
-
-                    for l in result:
-                        if type(l) is tuple:
-                            # @todo - some formatter option default (between groups)
-                            regex_matched_output += list(l) + ['\n']
-                        else:
-                            # @todo - some formatter option default (between each ungrouped result)
-                            regex_matched_output += [l] + ['\n']
-                else:
-                    # Doesnt look like regex, just hunt for plaintext and return that which matches
-                    # `stripped_text_from_html` will be bytes, so we must encode s_re also to bytes
-                    r = re.compile(re.escape(s_re), re.IGNORECASE)
-                    res = r.findall(stripped_text_from_html)
-                    if res:
-                        for match in res:
-                            regex_matched_output += [match] + ['\n']
-
-            ##########################################################
-            stripped_text_from_html = ''
-
-            if regex_matched_output:
-                # @todo some formatter for presentation?
-                stripped_text_from_html = ''.join(regex_matched_output)

+        # === REGEX EXTRACTION ===
+        if filter_config.extract_text:
+            extracted = transformer.extract_by_regex(stripped_text, filter_config.extract_text)
+            stripped_text = extracted
+
+        # === MORE TEXT TRANSFORMATIONS ===
         if watch.get('remove_duplicate_lines'):
-            stripped_text_from_html = '\n'.join(dict.fromkeys(line for line in stripped_text_from_html.replace("\n\n", "\n").splitlines()))
-
+            stripped_text = transformer.remove_duplicate_lines(stripped_text)

         if watch.get('sort_text_alphabetically'):
-            # Note: Because a <p>something</p> will add an extra line feed to signify the paragraph gap
-            # we end up with 'Some text\n\n', sorting will add all those extra \n at the start, so we remove them here.
-            stripped_text_from_html = stripped_text_from_html.replace("\n\n", "\n")
-            stripped_text_from_html = '\n'.join(sorted(stripped_text_from_html.splitlines(), key=lambda x: x.lower()))
+            stripped_text = transformer.sort_alphabetically(stripped_text)

-### CALCULATE MD5
-        # If there's text to ignore
-        text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
-        text_to_ignore += self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='ignore_text')
+        # === CHECKSUM CALCULATION ===
+        text_for_checksuming = stripped_text

-        text_for_checksuming = stripped_text_from_html
-        if text_to_ignore:
-            text_for_checksuming = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)

-        # Some people prefer to also completely remove it
-        strip_ignored_lines = watch.get('strip_ignored_lines') if watch.get('strip_ignored_lines') is not None else self.datastore.data['settings']['application'].get('strip_ignored_lines')
+        # Apply ignore_text for checksum calculation
+        if filter_config.ignore_text:
+            text_for_checksuming = html_tools.strip_ignore_text(stripped_text, filter_config.ignore_text)
+
+        # Optionally remove ignored lines from output
+        strip_ignored_lines = watch.get('strip_ignored_lines')
+        if strip_ignored_lines is None:
+            strip_ignored_lines = self.datastore.data['settings']['application'].get('strip_ignored_lines')
         if strip_ignored_lines:
-            # @todo add test in the 'preview' mode, check the widget works? compare to datastruct
-            stripped_text_from_html = text_for_checksuming
+            stripped_text = text_for_checksuming

-        # Re #133 - if we should strip whitespaces from triggering the change detected comparison
-        if text_for_checksuming and self.datastore.data['settings']['application'].get('ignore_whitespace', False):
-            fetched_md5 = hashlib.md5(text_for_checksuming.translate(TRANSLATE_WHITESPACE_TABLE).encode('utf-8')).hexdigest()
-        else:
-            fetched_md5 = hashlib.md5(text_for_checksuming.encode('utf-8')).hexdigest()
+        # Calculate checksum
+        ignore_whitespace = self.datastore.data['settings']['application'].get('ignore_whitespace', False)
+        fetched_md5 = ChecksumCalculator.calculate(text_for_checksuming, ignore_whitespace=ignore_whitespace)

-        ############ Blocking rules, after checksum #################
+        # === BLOCKING RULES EVALUATION ===
         blocked = False
-        trigger_text = list(dict.fromkeys(watch.get('trigger_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='trigger_text')))
-        if len(trigger_text):
-            # Assume blocked
+
+        # Check trigger_text
+        if rule_engine.evaluate_trigger_text(stripped_text, filter_config.trigger_text):
            blocked = True
-            # Filter and trigger works the same, so reuse it
-            # It should return the line numbers that match
-            # Unblock flow if the trigger was found (some text remained after stripped what didnt match)
-            result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
-                                                  wordlist=trigger_text,
-                                                  mode="line numbers")
-            # Unblock if the trigger was found
-            if result:
-                blocked = False

-        text_should_not_be_present = list(dict.fromkeys(watch.get('text_should_not_be_present', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='text_should_not_be_present')))
-        if len(text_should_not_be_present):
-            # If anything matched, then we should block a change from happening
-            result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
-                                                  wordlist=text_should_not_be_present,
-                                                  mode="line numbers")
-            if result:
-                blocked = True
+        # Check text_should_not_be_present
+        if rule_engine.evaluate_text_should_not_be_present(stripped_text, filter_config.text_should_not_be_present):
+            blocked = True

-        # And check if 'conditions' will let this pass through
-        if watch.get('conditions') and watch.get('conditions_match_logic'):
-            conditions_result = execute_ruleset_against_all_plugins(current_watch_uuid=watch.get('uuid'),
-                                                                    application_datastruct=self.datastore.data,
-                                                                    ephemeral_data={
-                                                                        'text': stripped_text_from_html
-                                                                    }
-                                                                    )
+        # Check custom conditions
+        if rule_engine.evaluate_conditions(watch, self.datastore, stripped_text):
+            blocked = True

-            if not conditions_result.get('result'):
-                # Conditions say "Condition not met" so we block it.
-                blocked = True
-
-        # Looks like something changed, but did it match all the rules?
+        # === CHANGE DETECTION ===
         if blocked:
             changed_detected = False
         else:
-            # The main thing that all this at the moment comes down to :)
+            # Compare checksums
             if watch.get('previous_md5') != fetched_md5:
                 changed_detected = True

             # Always record the new checksum
             update_obj["previous_md5"] = fetched_md5

-            # On the first run of a site, watch['previous_md5'] will be None, set it the current one.
+            # On first run, initialize previous_md5
             if not watch.get('previous_md5'):
                 watch['previous_md5'] = fetched_md5

         logger.debug(f"Watch UUID {watch.get('uuid')} content check - Previous MD5: {watch.get('previous_md5')}, Fetched MD5 {fetched_md5}")

-        if changed_detected:
-            if watch.get('check_unique_lines', False):
-                ignore_whitespace = self.datastore.data['settings']['application'].get('ignore_whitespace')
+        # === UNIQUE LINES CHECK ===
+        if changed_detected and watch.get('check_unique_lines', False):
+            has_unique_lines = watch.lines_contain_something_unique_compared_to_history(
+                lines=stripped_text.splitlines(),
+                ignore_whitespace=ignore_whitespace
+            )
-                has_unique_lines = watch.lines_contain_something_unique_compared_to_history(
-                    lines=stripped_text_from_html.splitlines(),
-                    ignore_whitespace=ignore_whitespace
-                )
+            if not has_unique_lines:
+                logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} didnt have anything new setting change_detected=False")
+                changed_detected = False
+            else:
+                logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
-                # One or more lines? unsure?
-                if not has_unique_lines:
-                    logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} didnt have anything new setting change_detected=False")
-                    changed_detected = False
-                else:
-                    logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")

+        return changed_detected, update_obj, stripped_text

+    def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):
+        """Apply user's diff filtering preferences (show only added/removed/replaced lines)."""
+        from changedetectionio import diff

-        # stripped_text_from_html - Everything after filters and NO 'ignored' content
-        return changed_detected, update_obj, stripped_text_from_html
+        rendered_diff = diff.render_diff(
+            previous_version_file_contents=watch.get_last_fetched_text_before_filters(),
+            newest_version_file_contents=stripped_text,
+            include_equal=False,
+            include_added=watch.get('filter_text_added', True),
+            include_removed=watch.get('filter_text_removed', True),
+            include_replaced=watch.get('filter_text_replaced', True),
+            line_feed_sep="\n",
+            include_change_type_prefix=False
+        )
+
+        watch.save_last_text_fetched_before_filters(text_before_filter.encode('utf-8'))
+
+        if not rendered_diff and stripped_text:
+            # No differences found
+            return None
+
+        return rendered_diff
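The new ChecksumCalculator above centralises the whitespace-insensitive MD5 logic that previously lived inline in run_changedetection(). A rough standalone equivalent, as a sketch only: it assumes a simple translation table for common whitespace rather than the project's TRANSLATE_WHITESPACE_TABLE.

import hashlib

# Characters dropped when ignore_whitespace is enabled (assumption for this sketch).
WHITESPACE_TABLE = str.maketrans('', '', ' \t\r\n')

def checksum(text: str, ignore_whitespace: bool = False) -> str:
    # Same idea as ChecksumCalculator.calculate(): optionally strip whitespace, then MD5.
    if ignore_whitespace:
        text = text.translate(WHITESPACE_TABLE)
    return hashlib.md5(text.encode('utf-8')).hexdigest()

# Whitespace-only differences no longer change the checksum:
assert checksum("a b\nc", ignore_whitespace=True) == checksum("abc")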
diff --git a/changedetectionio/tests/conftest.py b/changedetectionio/tests/conftest.py
index 3e7069ca..bf9b071a 100644
--- a/changedetectionio/tests/conftest.py
+++ b/changedetectionio/tests/conftest.py
@@ -49,7 +49,7 @@ def measure_memory_usage(request):
     # Note: ru_maxrss is in kilobytes on Unix-based systems
     max_memory_used = memory_usage["peak"] / 1024  # Convert to MB

-    s = f"Peak memory used by the test {request.node.fspath} - '{request.node.name}': {max_memory_used:.2f} MB"
+    s = f"{time.time()} Peak memory used by the test {request.node.fspath} - '{request.node.name}': {max_memory_used:.2f} MB"
     logger.debug(s)

     with open("test-memory.log", 'a') as f:
diff --git a/changedetectionio/tests/test_backend.py b/changedetectionio/tests/test_backend.py
index a15a97a9..c945004c 100644
--- a/changedetectionio/tests/test_backend.py
+++ b/changedetectionio/tests/test_backend.py
@@ -312,3 +312,30 @@ def test_plaintext_even_if_xml_content(client, live_server, measure_memory_usage):
     delete_all_watches(client)


+# Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that
+def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server, measure_memory_usage):
+
+
+    with open("test-datastore/endpoint-content.txt", "w") as f:
+        f.write("""<?xml version="1.0" encoding="utf-8"?>
+<resources xmlns:tools="http://schemas.android.com/tools">
+    <!--Activity and fragment titles-->
+    <string name="feed_update_receiver_name">Abonnementen bijwerken</string>
+    <foobar>ok man</foobar>
+</resources>
+""")
+
+    test_url=url_for('test_endpoint', content_type="text/plain", _external=True)
+    uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": ['//string']})
+    client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
+    wait_for_all_checks(client)
+
+    res = client.get(
+        url_for("ui.ui_views.preview_page", uuid="first"),
+        follow_redirects=True
+    )
+
+    assert b'<string name="feed_update_receiver_name"' in res.data
+    assert b'<foobar' not in res.data
+
+    res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
\ No newline at end of file
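The new test above serves XML under a text/plain content type and still expects the //string include filter to be applied. Roughly what that filter should pick out, sketched directly with lxml rather than the project's html_tools.xpath_filter() (a simplified, illustrative stand-in, not the real code path):

from lxml import etree

xml = b'''<?xml version="1.0" encoding="utf-8"?>
<resources>
  <string name="feed_update_receiver_name">Abonnementen bijwerken</string>
  <foobar>ok man</foobar>
</resources>'''

tree = etree.fromstring(xml)
# Serialize only the nodes matched by //string, much like an include filter would.
matches = [etree.tostring(node, encoding='unicode') for node in tree.xpath('//string')]
assert 'feed_update_receiver_name' in matches[0]
assert not any('foobar' in m for m in matches)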
diff --git a/changedetectionio/tests/test_element_removal.py b/changedetectionio/tests/test_element_removal.py
index 69f1230f..d06c468c 100644
--- a/changedetectionio/tests/test_element_removal.py
+++ b/changedetectionio/tests/test_element_removal.py
@@ -211,44 +211,30 @@ def test_element_removal_full(client, live_server, measure_memory_usage):
 def test_element_removal_nth_offset_no_shift(client, live_server, measure_memory_usage):
     set_response_with_multiple_index()
-    subtractive_selectors_data = ["""
-body > table > tr:nth-child(1) > th:nth-child(2)
+    subtractive_selectors_data = [
+### css style ###
+"""body > table > tr:nth-child(1) > th:nth-child(2)
 body > table > tr:nth-child(2) > td:nth-child(2)
 body > table > tr:nth-child(3) > td:nth-child(2)
 body > table > tr:nth-child(1) > th:nth-child(3)
 body > table > tr:nth-child(2) > td:nth-child(3)
 body > table > tr:nth-child(3) > td:nth-child(3)""",
+### second type, xpath ###
 """//body/table/tr[1]/th[2]
 //body/table/tr[2]/td[2]
 //body/table/tr[3]/td[2]
 //body/table/tr[1]/th[3]
 //body/table/tr[2]/td[3]
 //body/table/tr[3]/td[3]"""]
+
+    test_url = url_for("test_endpoint", _external=True)
     for selector_list in subtractive_selectors_data:
         delete_all_watches(client)
-        # Add our URL to the import page
-        test_url = url_for("test_endpoint", _external=True)
-        res = client.post(
-            url_for("imports.import_page"), data={"urls": test_url}, follow_redirects=True
-        )
-        assert b"1 Imported" in res.data
-        wait_for_all_checks(client)
-
-        res = client.post(
-            url_for("ui.ui_edit.edit_page", uuid="first"),
-            data={
-                "subtractive_selectors": selector_list,
-                "url": test_url,
-                "tags": "",
-                "fetch_backend": "html_requests",
-                "time_between_check_use_default": "y",
-            },
-            follow_redirects=True,
-        )
-        assert b"Updated watch." in res.data
+        uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"subtractive_selectors": selector_list.splitlines()})
+        client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
         wait_for_all_checks(client)

         res = client.get(
@@ -256,6 +242,7 @@ body > table > tr:nth-child(3) > td:nth-child(3)""",
             follow_redirects=True
         )

+        # the filters above should have removed this but they never say to remove the "emil" column
         assert b"Tobias" not in res.data
         assert b"Linus" not in res.data
         assert b"Person 2" not in res.data
diff --git a/changedetectionio/tests/test_jsonpath_jq_selector.py b/changedetectionio/tests/test_jsonpath_jq_selector.py
index fbe1efb5..64d392d2 100644
--- a/changedetectionio/tests/test_jsonpath_jq_selector.py
+++ b/changedetectionio/tests/test_jsonpath_jq_selector.py
@@ -205,9 +205,6 @@ def test_check_json_without_filter(client, live_server, measure_memory_usage):
     # and be sure it doesn't get chewed up by instriptis
     set_json_response_with_html()

-    # Give the endpoint time to spin up
-    time.sleep(1)
-
     # Add our URL to the import page
     test_url = url_for('test_endpoint', content_type="application/json", _external=True)
     uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
@@ -230,8 +227,6 @@ def check_json_filter(json_filter, client, live_server):
     set_original_response()

-    # Give the endpoint time to spin up
-    time.sleep(1)

     # Add our URL to the import page
     test_url = url_for('test_endpoint', content_type="application/json", _external=True)
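For reference, the selector.strip() change to element_removal() earlier in this patch is what lets the mixed CSS/XPath subtractive selector lists used by these tests tolerate leading whitespace. A simplified sketch of that routing decision (illustrative only, not the project's actual element_removal()):

def classify(selectors):
    # Split a list of subtractive selectors into XPath and CSS buckets,
    # stripping whitespace before deciding which branch handles each one.
    xpath, css = [], []
    for selector in selectors:
        if selector.strip().startswith(('xpath:', 'xpath1:', '//')):
            xpath.append(selector.strip().removeprefix('xpath:').removeprefix('xpath1:'))
        else:
            css.append(selector.strip())
    return xpath, css

# A leading space no longer pushes an XPath expression down the CSS branch:
assert classify(['  //body/table/tr[1]/th[2]']) == (['//body/table/tr[1]/th[2]'], [])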