Compare commits

..

10 Commits

Author SHA1 Message Date
dgtlmoon
9672c92a9b tweak timing
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-10-10 00:27:28 +02:00
dgtlmoon
3ab4b337b6 hmm 2025-10-09 19:13:05 +02:00
dgtlmoon
ead82d373a timing 2025-10-09 18:54:02 +02:00
dgtlmoon
1abb5de0e7 speed up waits 2025-10-09 18:49:09 +02:00
dgtlmoon
830332ff82 tweak stats 2025-10-09 18:44:28 +02:00
dgtlmoon
0bec23bd72 Add memor usage 2025-10-09 18:38:13 +02:00
dgtlmoon
c61ea82816 Merge branch 'master' into test-speedup 2025-10-09 18:33:43 +02:00
dgtlmoon
ab84c6590a Test speedup - remove common calls for function calls 2025-10-09 18:33:15 +02:00
dgtlmoon
cde42c8a49 Reducing memory usage (#3476)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-10-09 18:31:19 +02:00
dgtlmoon
3b9d19df43 Refactoring text/html difference processor (#3475) 2025-10-09 18:30:53 +02:00
53 changed files with 798 additions and 1020 deletions

View File

@@ -100,7 +100,7 @@ def element_removal(selectors: List[str], html_content):
xpath_selectors = []
for selector in selectors:
if selector.startswith(('xpath:', 'xpath1:', '//')):
if selector.strip().startswith(('xpath:', 'xpath1:', '//')):
# Handle XPath selectors separately
xpath_selector = selector.removeprefix('xpath:').removeprefix('xpath1:')
xpath_selectors.append(xpath_selector)

View File

@@ -37,355 +37,516 @@ class PDFToHTMLToolNotFound(ValueError):
ValueError.__init__(self, msg)
class FilterConfig:
"""Consolidates all filter and rule configurations from watch, tags, and global settings."""
def __init__(self, watch, datastore):
self.watch = watch
self.datastore = datastore
self.watch_uuid = watch.get('uuid')
# Cache computed properties to avoid repeated list operations
self._include_filters_cache = None
self._subtractive_selectors_cache = None
def _get_merged_rules(self, attr, include_global=False):
"""Merge rules from watch, tags, and optionally global settings."""
watch_rules = self.watch.get(attr, [])
tag_rules = self.datastore.get_tag_overrides_for_watch(uuid=self.watch_uuid, attr=attr)
rules = list(dict.fromkeys(watch_rules + tag_rules))
if include_global:
global_rules = self.datastore.data['settings']['application'].get(f'global_{attr}', [])
rules = list(dict.fromkeys(rules + global_rules))
return rules
@property
def include_filters(self):
if self._include_filters_cache is None:
filters = self._get_merged_rules('include_filters')
# Inject LD+JSON price tracker rule if enabled
if self.watch.get('track_ldjson_price_data', '') == PRICE_DATA_TRACK_ACCEPT:
filters += html_tools.LD_JSON_PRODUCT_OFFER_SELECTORS
self._include_filters_cache = filters
return self._include_filters_cache
@property
def subtractive_selectors(self):
if self._subtractive_selectors_cache is None:
watch_selectors = self.watch.get("subtractive_selectors", [])
tag_selectors = self.datastore.get_tag_overrides_for_watch(uuid=self.watch_uuid, attr='subtractive_selectors')
global_selectors = self.datastore.data["settings"]["application"].get("global_subtractive_selectors", [])
self._subtractive_selectors_cache = [*tag_selectors, *watch_selectors, *global_selectors]
return self._subtractive_selectors_cache
@property
def extract_text(self):
return self._get_merged_rules('extract_text')
@property
def ignore_text(self):
return self._get_merged_rules('ignore_text', include_global=True)
@property
def trigger_text(self):
return self._get_merged_rules('trigger_text')
@property
def text_should_not_be_present(self):
return self._get_merged_rules('text_should_not_be_present')
@property
def has_include_filters(self):
return bool(self.include_filters) and bool(self.include_filters[0].strip())
@property
def has_subtractive_selectors(self):
return bool(self.subtractive_selectors) and bool(self.subtractive_selectors[0].strip())
class ContentTransformer:
"""Handles text transformations like trimming, sorting, and deduplication."""
@staticmethod
def trim_whitespace(text):
"""Remove leading/trailing whitespace from each line."""
# Use generator expression to avoid building intermediate list
return '\n'.join(line.strip() for line in text.replace("\n\n", "\n").splitlines())
@staticmethod
def remove_duplicate_lines(text):
"""Remove duplicate lines while preserving order."""
return '\n'.join(dict.fromkeys(line for line in text.replace("\n\n", "\n").splitlines()))
@staticmethod
def sort_alphabetically(text):
"""Sort lines alphabetically (case-insensitive)."""
# Remove double line feeds before sorting
text = text.replace("\n\n", "\n")
return '\n'.join(sorted(text.splitlines(), key=lambda x: x.lower()))
@staticmethod
def extract_by_regex(text, regex_patterns):
"""Extract text matching regex patterns."""
# Use list of strings instead of concatenating lists repeatedly (avoids O(n²) behavior)
regex_matched_output = []
for s_re in regex_patterns:
# Check if it's perl-style regex /.../
if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE):
regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re)
result = re.findall(regex, text)
for match in result:
if type(match) is tuple:
regex_matched_output.extend(match)
regex_matched_output.append('\n')
else:
regex_matched_output.append(match)
regex_matched_output.append('\n')
else:
# Plain text search (case-insensitive)
r = re.compile(re.escape(s_re), re.IGNORECASE)
res = r.findall(text)
if res:
for match in res:
regex_matched_output.append(match)
regex_matched_output.append('\n')
return ''.join(regex_matched_output) if regex_matched_output else ''
class RuleEngine:
"""Evaluates blocking rules (triggers, conditions, text_should_not_be_present)."""
@staticmethod
def evaluate_trigger_text(content, trigger_patterns):
"""
Check if trigger text is present. If trigger_text is configured,
content is blocked UNLESS the trigger is found.
Returns True if blocked, False if allowed.
"""
if not trigger_patterns:
return False
# Assume blocked if trigger_text is configured
result = html_tools.strip_ignore_text(
content=str(content),
wordlist=trigger_patterns,
mode="line numbers"
)
# Unblock if trigger was found
return not bool(result)
@staticmethod
def evaluate_text_should_not_be_present(content, patterns):
"""
Check if forbidden text is present. If found, block the change.
Returns True if blocked, False if allowed.
"""
if not patterns:
return False
result = html_tools.strip_ignore_text(
content=str(content),
wordlist=patterns,
mode="line numbers"
)
# Block if forbidden text was found
return bool(result)
@staticmethod
def evaluate_conditions(watch, datastore, content):
"""
Evaluate custom conditions ruleset.
Returns True if blocked, False if allowed.
"""
if not watch.get('conditions') or not watch.get('conditions_match_logic'):
return False
conditions_result = execute_ruleset_against_all_plugins(
current_watch_uuid=watch.get('uuid'),
application_datastruct=datastore.data,
ephemeral_data={'text': content}
)
# Block if conditions not met
return not conditions_result.get('result')
class ContentProcessor:
"""Handles content preprocessing, filtering, and extraction."""
def __init__(self, fetcher, watch, filter_config, datastore):
self.fetcher = fetcher
self.watch = watch
self.filter_config = filter_config
self.datastore = datastore
def preprocess_rss(self, content):
"""Convert CDATA/comments in RSS to usable text."""
return cdata_in_document_to_text(html_content=content)
def preprocess_pdf(self, content, raw_content):
"""Convert PDF to HTML using external tool."""
from shutil import which
tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
if not which(tool):
raise PDFToHTMLToolNotFound(
f"Command-line `{tool}` tool was not found in system PATH, was it installed?"
)
import subprocess
proc = subprocess.Popen(
[tool, '-stdout', '-', '-s', 'out.pdf', '-i'],
stdout=subprocess.PIPE,
stdin=subprocess.PIPE
)
proc.stdin.write(raw_content)
proc.stdin.close()
html_content = proc.stdout.read().decode('utf-8')
proc.wait(timeout=60)
# Add metadata for change detection
metadata = (
f"<p>Added by changedetection.io: Document checksum - "
f"{hashlib.md5(raw_content).hexdigest().upper()} "
f"Filesize - {len(html_content)} bytes</p>"
)
return html_content.replace('</body>', metadata + '</body>')
def preprocess_json(self, content, has_filters):
"""Format and sort JSON content."""
# Force reformat if no filters specified
if not has_filters:
content = html_tools.extract_json_as_string(content=content, json_filter="json:$")
# Sort JSON to avoid false alerts from reordering
try:
content = json.dumps(json.loads(content), sort_keys=True)
except Exception:
# Might be malformed JSON, continue anyway
pass
return content
def apply_include_filters(self, content, stream_content_type):
"""Apply CSS, XPath, or JSON filters to extract specific content."""
filtered_content = ""
for filter_rule in self.filter_config.include_filters:
# XPath filters
if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
filtered_content += html_tools.xpath_filter(
xpath_filter=filter_rule.replace('xpath:', ''),
html_content=content,
append_pretty_line_formatting=not self.watch.is_source_type_url,
is_rss=stream_content_type.is_rss
)
# XPath1 filters (first match only)
elif filter_rule.startswith('xpath1:'):
filtered_content += html_tools.xpath1_filter(
xpath_filter=filter_rule.replace('xpath1:', ''),
html_content=content,
append_pretty_line_formatting=not self.watch.is_source_type_url,
is_rss=stream_content_type.is_rss
)
# JSON filters
elif any(filter_rule.startswith(prefix) for prefix in json_filter_prefixes):
filtered_content += html_tools.extract_json_as_string(
content=content,
json_filter=filter_rule
)
# CSS selectors, default fallback
else:
filtered_content += html_tools.include_filters(
include_filters=filter_rule,
html_content=content,
append_pretty_line_formatting=not self.watch.is_source_type_url
)
# Raise error if filter returned nothing
if not filtered_content.strip():
raise FilterNotFoundInResponse(
msg=self.filter_config.include_filters,
screenshot=self.fetcher.screenshot,
xpath_data=self.fetcher.xpath_data
)
return filtered_content
def apply_subtractive_selectors(self, content):
"""Remove elements matching subtractive selectors."""
return html_tools.element_removal(self.filter_config.subtractive_selectors, content)
def extract_text_from_html(self, html_content, stream_content_type):
"""Convert HTML to plain text."""
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
return html_tools.html_to_text(
html_content=html_content,
render_anchor_tag_content=do_anchor,
is_rss=stream_content_type.is_rss
)
class ChecksumCalculator:
"""Calculates checksums with various options."""
@staticmethod
def calculate(text, ignore_whitespace=False):
"""Calculate MD5 checksum of text content."""
if ignore_whitespace:
text = text.translate(TRANSLATE_WHITESPACE_TABLE)
return hashlib.md5(text.encode('utf-8')).hexdigest()
# Some common stuff here that can be moved to a base class
# (set_proxy_from_list)
class perform_site_check(difference_detection_processor):
def run_changedetection(self, watch):
changed_detected = False
html_content = ""
screenshot = False # as bytes
stripped_text_from_html = ""
if not watch:
raise Exception("Watch no longer exists.")
# Initialize components
filter_config = FilterConfig(watch, self.datastore)
content_processor = ContentProcessor(self.fetcher, watch, filter_config, self.datastore)
transformer = ContentTransformer()
rule_engine = RuleEngine()
# Get content type and stream info
ctype_header = self.fetcher.get_all_headers().get('content-type', DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER).lower()
stream_content_type = guess_stream_type(http_content_header=ctype_header, content=self.fetcher.content)
# Unset any existing notification error
update_obj = {'last_notification_error': False, 'last_error': False}
url = watch.link
self.screenshot = self.fetcher.screenshot
self.xpath_data = self.fetcher.xpath_data
# Track the content type
# Track the content type and checksum before filters
update_obj['content_type'] = ctype_header
# Watches added automatically in the queue manager will skip if its the same checksum as the previous run
# Saves a lot of CPU
update_obj['previous_md5_before_filters'] = hashlib.md5(self.fetcher.content.encode('utf-8')).hexdigest()
# Fetching complete, now filters
# === CONTENT PREPROCESSING ===
# Avoid creating unnecessary intermediate string copies by reassigning only when needed
content = self.fetcher.content
# @note: I feel like the following should be in a more obvious chain system
# - Check filter text
# - Is the checksum different?
# - Do we convert to JSON?
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ?
# Go into RSS preprocess for converting CDATA/comment to usable text
# RSS preprocessing
if stream_content_type.is_rss:
self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
content = content_processor.preprocess_rss(content)
# PDF preprocessing
if watch.is_pdf or stream_content_type.is_pdf:
from shutil import which
tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
if not which(tool):
raise PDFToHTMLToolNotFound("Command-line `{}` tool was not found in system PATH, was it installed?".format(tool))
import subprocess
proc = subprocess.Popen(
[tool, '-stdout', '-', '-s', 'out.pdf', '-i'],
stdout=subprocess.PIPE,
stdin=subprocess.PIPE)
proc.stdin.write(self.fetcher.raw_content)
proc.stdin.close()
self.fetcher.content = proc.stdout.read().decode('utf-8')
proc.wait(timeout=60)
# Add a little metadata so we know if the file changes (like if an image changes, but the text is the same
# @todo may cause problems with non-UTF8?
metadata = "<p>Added by changedetection.io: Document checksum - {} Filesize - {} bytes</p>".format(
hashlib.md5(self.fetcher.raw_content).hexdigest().upper(),
len(self.fetcher.content))
self.fetcher.content = self.fetcher.content.replace('</body>', metadata + '</body>')
# Better would be if Watch.model could access the global data also
# and then use getattr https://docs.python.org/3/reference/datamodel.html#object.__getitem__
# https://realpython.com/inherit-python-dict/ instead of doing it procedurely
include_filters_from_tags = self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='include_filters')
# 1845 - remove duplicated filters in both group and watch include filter
include_filters_rule = list(dict.fromkeys(watch.get('include_filters', []) + include_filters_from_tags))
subtractive_selectors = [*self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='subtractive_selectors'),
*watch.get("subtractive_selectors", []),
*self.datastore.data["settings"]["application"].get("global_subtractive_selectors", [])
]
# Inject a virtual LD+JSON price tracker rule
if watch.get('track_ldjson_price_data', '') == PRICE_DATA_TRACK_ACCEPT:
include_filters_rule += html_tools.LD_JSON_PRODUCT_OFFER_SELECTORS
has_filter_rule = len(include_filters_rule) and len(include_filters_rule[0].strip())
has_subtractive_selectors = len(subtractive_selectors) and len(subtractive_selectors[0].strip())
content = content_processor.preprocess_pdf(content, self.fetcher.raw_content)
# JSON preprocessing
if stream_content_type.is_json:
if not has_filter_rule:
# Force a reformat
include_filters_rule.append("json:$")
has_filter_rule = True
content = content_processor.preprocess_json(content, filter_config.has_include_filters)
# Sort the JSON so we dont get false alerts when the content is just re-ordered
try:
self.fetcher.content = json.dumps(json.loads(self.fetcher.content), sort_keys=True)
except Exception as e:
# Might have just been a snippet, or otherwise bad JSON, continue
pass
# HTML obfuscation workarounds
if stream_content_type.is_html:
content = html_tools.workarounds_for_obfuscations(content)
if has_filter_rule:
for filter in include_filters_rule:
if any(prefix in filter for prefix in json_filter_prefixes):
stripped_text_from_html += html_tools.extract_json_as_string(content=self.fetcher.content, json_filter=filter)
if stripped_text_from_html:
stream_content_type.is_json = True
stream_content_type.is_html = False
# Check for LD+JSON price data (for HTML content)
if stream_content_type.is_html:
update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(content)
# We have 'watch.is_source_type_url' because we should be able to use selectors on the raw HTML but return just that selected HTML
if stream_content_type.is_html or watch.is_source_type_url or stream_content_type.is_plaintext or stream_content_type.is_rss or stream_content_type.is_xml or stream_content_type.is_pdf:
# === FILTER APPLICATION ===
# Start with content reference, avoid copy until modification
html_content = content
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
self.fetcher.content = html_tools.workarounds_for_obfuscations(self.fetcher.content)
html_content = self.fetcher.content
# Apply include filters (CSS, XPath, JSON)
if filter_config.has_include_filters:
html_content = content_processor.apply_include_filters(content, stream_content_type)
# Some kind of "text" but definitely not RSS looking
if stream_content_type.is_plaintext:
# Don't run get_text or xpath/css filters on plaintext
# We are not HTML, we are not any kind of RSS, doesnt even look like HTML
stripped_text_from_html = html_content
# Apply subtractive selectors
if filter_config.has_subtractive_selectors:
html_content = content_processor.apply_subtractive_selectors(html_content)
# === TEXT EXTRACTION ===
if watch.is_source_type_url:
# For source URLs, keep raw content
stripped_text = html_content
else:
# Extract text from HTML/RSS content (not generic XML)
if stream_content_type.is_html or stream_content_type.is_rss:
stripped_text = content_processor.extract_text_from_html(html_content, stream_content_type)
else:
# If not JSON, and if it's not text/plain..
# Does it have some ld+json price data? used for easier monitoring
update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(self.fetcher.content)
# Then we assume HTML
if has_filter_rule:
html_content = ""
for filter_rule in include_filters_rule:
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''),
html_content=self.fetcher.content,
append_pretty_line_formatting=not watch.is_source_type_url,
is_rss=stream_content_type.is_rss)
elif filter_rule.startswith('xpath1:'):
html_content += html_tools.xpath1_filter(xpath_filter=filter_rule.replace('xpath1:', ''),
html_content=self.fetcher.content,
append_pretty_line_formatting=not watch.is_source_type_url,
is_rss=stream_content_type.is_rss)
else:
html_content += html_tools.include_filters(include_filters=filter_rule,
html_content=self.fetcher.content,
append_pretty_line_formatting=not watch.is_source_type_url)
if not html_content.strip():
raise FilterNotFoundInResponse(msg=include_filters_rule, screenshot=self.fetcher.screenshot, xpath_data=self.fetcher.xpath_data)
if has_subtractive_selectors:
html_content = html_tools.element_removal(subtractive_selectors, html_content)
if watch.is_source_type_url:
stripped_text_from_html = html_content
else:
# extract text
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
stripped_text_from_html = html_tools.html_to_text(html_content=html_content,
render_anchor_tag_content=do_anchor,
is_rss=stream_content_type.is_rss) # 1874 activate the <title workaround hack
stripped_text = html_content
# === TEXT TRANSFORMATIONS ===
if watch.get('trim_text_whitespace'):
stripped_text_from_html = '\n'.join(line.strip() for line in stripped_text_from_html.replace("\n\n", "\n").splitlines())
stripped_text = transformer.trim_whitespace(stripped_text)
# Re #340 - return the content before the 'ignore text' was applied
# Also used to calculate/show what was removed
text_content_before_ignored_filter = stripped_text_from_html
# @todo whitespace coming from missing rtrim()?
# stripped_text_from_html could be based on their preferences, replace the processed text with only that which they want to know about.
# Rewrite's the processing text based on only what diff result they want to see
# Save text before ignore filters (for diff calculation)
text_content_before_ignored_filter = stripped_text
# === DIFF FILTERING ===
# If user wants specific diff types (added/removed/replaced only)
if watch.has_special_diff_filter_options_set() and len(watch.history.keys()):
# Now the content comes from the diff-parser and not the returned HTTP traffic, so could be some differences
from changedetectionio import diff
# needs to not include (added) etc or it may get used twice
# Replace the processed text with the preferred result
rendered_diff = diff.render_diff(previous_version_file_contents=watch.get_last_fetched_text_before_filters(),
newest_version_file_contents=stripped_text_from_html,
include_equal=False, # not the same lines
include_added=watch.get('filter_text_added', True),
include_removed=watch.get('filter_text_removed', True),
include_replaced=watch.get('filter_text_replaced', True),
line_feed_sep="\n",
include_change_type_prefix=False)
stripped_text = self._apply_diff_filtering(watch, stripped_text, text_content_before_ignored_filter)
if stripped_text is None:
# No differences found, but content exists
c = ChecksumCalculator.calculate(text_content_before_ignored_filter, ignore_whitespace=True)
return False, {'previous_md5': c}, text_content_before_ignored_filter.encode('utf-8')
watch.save_last_text_fetched_before_filters(text_content_before_ignored_filter.encode('utf-8'))
if not rendered_diff and stripped_text_from_html:
# We had some content, but no differences were found
# Store our new file as the MD5 so it will trigger in the future
c = hashlib.md5(stripped_text_from_html.translate(TRANSLATE_WHITESPACE_TABLE).encode('utf-8')).hexdigest()
return False, {'previous_md5': c}, stripped_text_from_html.encode('utf-8')
else:
stripped_text_from_html = rendered_diff
# Treat pages with no renderable text content as a change? No by default
# === EMPTY PAGE CHECK ===
empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False)
if not stream_content_type.is_json and not empty_pages_are_a_change and len(stripped_text_from_html.strip()) == 0:
raise content_fetchers.exceptions.ReplyWithContentButNoText(url=url,
status_code=self.fetcher.get_last_status_code(),
screenshot=self.fetcher.screenshot,
has_filters=has_filter_rule,
html_content=html_content,
xpath_data=self.fetcher.xpath_data
)
# We rely on the actual text in the html output.. many sites have random script vars etc,
# in the future we'll implement other mechanisms.
if not stream_content_type.is_json and not empty_pages_are_a_change and len(stripped_text.strip()) == 0:
raise content_fetchers.exceptions.ReplyWithContentButNoText(
url=url,
status_code=self.fetcher.get_last_status_code(),
screenshot=self.fetcher.screenshot,
has_filters=filter_config.has_include_filters,
html_content=html_content,
xpath_data=self.fetcher.xpath_data
)
update_obj["last_check_status"] = self.fetcher.get_last_status_code()
# 615 Extract text by regex
extract_text = list(dict.fromkeys(watch.get('extract_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='extract_text')))
if len(extract_text) > 0:
regex_matched_output = []
for s_re in extract_text:
# incase they specified something in '/.../x'
if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE):
regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re)
result = re.findall(regex, stripped_text_from_html)
for l in result:
if type(l) is tuple:
# @todo - some formatter option default (between groups)
regex_matched_output += list(l) + ['\n']
else:
# @todo - some formatter option default (between each ungrouped result)
regex_matched_output += [l] + ['\n']
else:
# Doesnt look like regex, just hunt for plaintext and return that which matches
# `stripped_text_from_html` will be bytes, so we must encode s_re also to bytes
r = re.compile(re.escape(s_re), re.IGNORECASE)
res = r.findall(stripped_text_from_html)
if res:
for match in res:
regex_matched_output += [match] + ['\n']
##########################################################
stripped_text_from_html = ''
if regex_matched_output:
# @todo some formatter for presentation?
stripped_text_from_html = ''.join(regex_matched_output)
# === REGEX EXTRACTION ===
if filter_config.extract_text:
extracted = transformer.extract_by_regex(stripped_text, filter_config.extract_text)
stripped_text = extracted
# === MORE TEXT TRANSFORMATIONS ===
if watch.get('remove_duplicate_lines'):
stripped_text_from_html = '\n'.join(dict.fromkeys(line for line in stripped_text_from_html.replace("\n\n", "\n").splitlines()))
stripped_text = transformer.remove_duplicate_lines(stripped_text)
if watch.get('sort_text_alphabetically'):
# Note: Because a <p>something</p> will add an extra line feed to signify the paragraph gap
# we end up with 'Some text\n\n', sorting will add all those extra \n at the start, so we remove them here.
stripped_text_from_html = stripped_text_from_html.replace("\n\n", "\n")
stripped_text_from_html = '\n'.join(sorted(stripped_text_from_html.splitlines(), key=lambda x: x.lower()))
stripped_text = transformer.sort_alphabetically(stripped_text)
### CALCULATE MD5
# If there's text to ignore
text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
text_to_ignore += self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='ignore_text')
# === CHECKSUM CALCULATION ===
text_for_checksuming = stripped_text
text_for_checksuming = stripped_text_from_html
if text_to_ignore:
text_for_checksuming = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)
# Some people prefer to also completely remove it
strip_ignored_lines = watch.get('strip_ignored_lines') if watch.get('strip_ignored_lines') is not None else self.datastore.data['settings']['application'].get('strip_ignored_lines')
# Apply ignore_text for checksum calculation
if filter_config.ignore_text:
text_for_checksuming = html_tools.strip_ignore_text(stripped_text, filter_config.ignore_text)
# Optionally remove ignored lines from output
strip_ignored_lines = watch.get('strip_ignored_lines')
if strip_ignored_lines is None:
strip_ignored_lines = self.datastore.data['settings']['application'].get('strip_ignored_lines')
if strip_ignored_lines:
# @todo add test in the 'preview' mode, check the widget works? compare to datastruct
stripped_text_from_html = text_for_checksuming
stripped_text = text_for_checksuming
# Re #133 - if we should strip whitespaces from triggering the change detected comparison
if text_for_checksuming and self.datastore.data['settings']['application'].get('ignore_whitespace', False):
fetched_md5 = hashlib.md5(text_for_checksuming.translate(TRANSLATE_WHITESPACE_TABLE).encode('utf-8')).hexdigest()
else:
fetched_md5 = hashlib.md5(text_for_checksuming.encode('utf-8')).hexdigest()
# Calculate checksum
ignore_whitespace = self.datastore.data['settings']['application'].get('ignore_whitespace', False)
fetched_md5 = ChecksumCalculator.calculate(text_for_checksuming, ignore_whitespace=ignore_whitespace)
############ Blocking rules, after checksum #################
# === BLOCKING RULES EVALUATION ===
blocked = False
trigger_text = list(dict.fromkeys(watch.get('trigger_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='trigger_text')))
if len(trigger_text):
# Assume blocked
# Check trigger_text
if rule_engine.evaluate_trigger_text(stripped_text, filter_config.trigger_text):
blocked = True
# Filter and trigger works the same, so reuse it
# It should return the line numbers that match
# Unblock flow if the trigger was found (some text remained after stripped what didnt match)
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
wordlist=trigger_text,
mode="line numbers")
# Unblock if the trigger was found
if result:
blocked = False
text_should_not_be_present = list(dict.fromkeys(watch.get('text_should_not_be_present', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='text_should_not_be_present')))
if len(text_should_not_be_present):
# If anything matched, then we should block a change from happening
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
wordlist=text_should_not_be_present,
mode="line numbers")
if result:
blocked = True
# Check text_should_not_be_present
if rule_engine.evaluate_text_should_not_be_present(stripped_text, filter_config.text_should_not_be_present):
blocked = True
# And check if 'conditions' will let this pass through
if watch.get('conditions') and watch.get('conditions_match_logic'):
conditions_result = execute_ruleset_against_all_plugins(current_watch_uuid=watch.get('uuid'),
application_datastruct=self.datastore.data,
ephemeral_data={
'text': stripped_text_from_html
}
)
# Check custom conditions
if rule_engine.evaluate_conditions(watch, self.datastore, stripped_text):
blocked = True
if not conditions_result.get('result'):
# Conditions say "Condition not met" so we block it.
blocked = True
# Looks like something changed, but did it match all the rules?
# === CHANGE DETECTION ===
if blocked:
changed_detected = False
else:
# The main thing that all this at the moment comes down to :)
# Compare checksums
if watch.get('previous_md5') != fetched_md5:
changed_detected = True
# Always record the new checksum
update_obj["previous_md5"] = fetched_md5
# On the first run of a site, watch['previous_md5'] will be None, set it the current one.
# On first run, initialize previous_md5
if not watch.get('previous_md5'):
watch['previous_md5'] = fetched_md5
logger.debug(f"Watch UUID {watch.get('uuid')} content check - Previous MD5: {watch.get('previous_md5')}, Fetched MD5 {fetched_md5}")
if changed_detected:
if watch.get('check_unique_lines', False):
ignore_whitespace = self.datastore.data['settings']['application'].get('ignore_whitespace')
# === UNIQUE LINES CHECK ===
if changed_detected and watch.get('check_unique_lines', False):
has_unique_lines = watch.lines_contain_something_unique_compared_to_history(
lines=stripped_text.splitlines(),
ignore_whitespace=ignore_whitespace
)
has_unique_lines = watch.lines_contain_something_unique_compared_to_history(
lines=stripped_text_from_html.splitlines(),
ignore_whitespace=ignore_whitespace
)
if not has_unique_lines:
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} didnt have anything new setting change_detected=False")
changed_detected = False
else:
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
# One or more lines? unsure?
if not has_unique_lines:
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} didnt have anything new setting change_detected=False")
changed_detected = False
else:
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
return changed_detected, update_obj, stripped_text
def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):
"""Apply user's diff filtering preferences (show only added/removed/replaced lines)."""
from changedetectionio import diff
# stripped_text_from_html - Everything after filters and NO 'ignored' content
return changed_detected, update_obj, stripped_text_from_html
rendered_diff = diff.render_diff(
previous_version_file_contents=watch.get_last_fetched_text_before_filters(),
newest_version_file_contents=stripped_text,
include_equal=False,
include_added=watch.get('filter_text_added', True),
include_removed=watch.get('filter_text_removed', True),
include_replaced=watch.get('filter_text_replaced', True),
line_feed_sep="\n",
include_change_type_prefix=False
)
watch.save_last_text_fetched_before_filters(text_before_filter.encode('utf-8'))
if not rendered_diff and stripped_text:
# No differences found
return None
return rendered_diff

View File

@@ -29,16 +29,28 @@ def reportlog(pytestconfig):
logger.remove(handler_id)
def format_memory_human(bytes_value):
"""Format memory in human-readable units (KB, MB, GB)"""
if bytes_value < 1024:
return f"{bytes_value} B"
elif bytes_value < 1024 ** 2:
return f"{bytes_value / 1024:.2f} KB"
elif bytes_value < 1024 ** 3:
return f"{bytes_value / (1024 ** 2):.2f} MB"
else:
return f"{bytes_value / (1024 ** 3):.2f} GB"
def track_memory(memory_usage, ):
process = psutil.Process(os.getpid())
while not memory_usage["stop"]:
current_rss = process.memory_info().rss
memory_usage["peak"] = max(memory_usage["peak"], current_rss)
memory_usage["current"] = current_rss # Keep updating current
time.sleep(0.01) # Adjust the sleep time as needed
@pytest.fixture(scope='function')
def measure_memory_usage(request):
memory_usage = {"peak": 0, "stop": False}
memory_usage = {"peak": 0, "current": 0, "stop": False}
tracker_thread = Thread(target=track_memory, args=(memory_usage,))
tracker_thread.start()
@@ -47,16 +59,17 @@ def measure_memory_usage(request):
memory_usage["stop"] = True
tracker_thread.join()
# Note: ru_maxrss is in kilobytes on Unix-based systems
max_memory_used = memory_usage["peak"] / 1024 # Convert to MB
s = f"Peak memory used by the test {request.node.fspath} - '{request.node.name}': {max_memory_used:.2f} MB"
# Note: psutil returns RSS memory in bytes
peak_human = format_memory_human(memory_usage["peak"])
s = f"{time.time()} {request.node.fspath} - '{request.node.name}' - Peak memory: {peak_human}"
logger.debug(s)
with open("test-memory.log", 'a') as f:
f.write(f"{s}\n")
# Assert that the memory usage is less than 200MB
# assert max_memory_used < 150, f"Memory usage exceeded 200MB: {max_memory_used:.2f} MB"
# assert peak_memory_kb < 150 * 1024, f"Memory usage exceeded 150MB: {peak_human}"
def cleanup(datastore_path):

View File

@@ -29,13 +29,8 @@ def do_test(client, live_server, make_test_use_extra_browser=False):
assert b"Settings updated." in res.data
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
if make_test_use_extra_browser:

View File

@@ -2,7 +2,7 @@
import json
import os
from flask import url_for
from changedetectionio.tests.util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
from changedetectionio.tests.util import live_server_setup, wait_for_all_checks, extract_UUID_from_client, delete_all_watches
def set_response():
@@ -98,6 +98,5 @@ def test_socks5(client, live_server, measure_memory_usage):
)
assert b"OK" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -5,7 +5,7 @@ import re
from flask import url_for
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
wait_for_all_checks, \
set_longer_modified_response
set_longer_modified_response, delete_all_watches
from changedetectionio.tests.util import extract_UUID_from_client
import logging
import base64
@@ -85,8 +85,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
assert '(added) So let\'s see what happens.\r\n' in msg # The plaintext part with \r\n
assert 'Content-Type: text/html' in msg
assert '(added) So let\'s see what happens.<br>' in msg # the html part
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_notification_email_formats_default_Text_override_HTML(client, live_server, measure_memory_usage):
@@ -179,5 +178,4 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
assert '&lt;' not in msg
assert 'Content-Type: text/html' in msg
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -2,7 +2,7 @@ from .util import live_server_setup, wait_for_all_checks
from flask import url_for
import time
def test_check_access_control(app, client, live_server):
def test_check_access_control(app, client, live_server, measure_memory_usage):
# Still doesnt work, but this is closer.
# live_server_setup(live_server) # Setup on conftest per function

View File

@@ -3,7 +3,7 @@
import os.path
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output, delete_all_watches
import time
def set_original(excluding=None, add_line=None):
@@ -44,12 +44,8 @@ def test_check_removed_line_contains_trigger(client, live_server, measure_memory
set_original()
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -107,14 +103,12 @@ def test_check_removed_line_contains_trigger(client, live_server, measure_memory
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_add_line_contains_trigger(client, live_server, measure_memory_usage):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
time.sleep(1)
# Give the endpoint time to spin up
@@ -137,12 +131,8 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
set_original()
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -187,5 +177,4 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
assert b'-Oh yes please' in response
assert '网站监测 内容更新了'.encode('utf-8') in response
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
import json
import uuid
@@ -276,8 +276,7 @@ def test_access_denied(client, live_server, measure_memory_usage):
assert res.status_code == 200
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
res = client.post(
url_for("settings.settings_page"),
@@ -385,8 +384,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
assert b'Additional properties are not allowed' in res.data
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_api_import(client, live_server, measure_memory_usage):

View File

@@ -4,7 +4,7 @@ from flask import url_for
from .util import live_server_setup
import json
def test_api_notifications_crud(client, live_server):
def test_api_notifications_crud(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')

View File

@@ -6,7 +6,7 @@ import time
from .util import live_server_setup, wait_for_all_checks
def test_api_search(client, live_server):
def test_api_search(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')

View File

@@ -12,12 +12,8 @@ def test_basic_auth(client, live_server, measure_memory_usage):
# This page will echo back any auth info
test_url = url_for('test_basicauth_method', _external=True).replace("//","//myuser:mypass@")
time.sleep(1)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(1)
# Check form validation

View File

@@ -86,12 +86,8 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Should get a notice that it's available
@@ -129,12 +125,8 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'ldjson-price-track-offer' not in res.data
@@ -146,12 +138,8 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_data):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
for k,v in client.application.config.get('DATASTORE').data['watching'].items():

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client
extract_UUID_from_client, delete_all_watches
sleep_time_for_fetch_thread = 3
@@ -163,8 +163,7 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
#
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage):
"""
@@ -193,13 +192,8 @@ got it\r\n
test_url = url_for('test_endpoint', content_type="application/octet-stream", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -227,7 +221,7 @@ got it\r\n
assert b"some random text that should be split by line\n" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_standard_text_plain(client, live_server, measure_memory_usage):
@@ -258,13 +252,8 @@ got it\r\n
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -293,7 +282,7 @@ got it\r\n
assert b"some random text that should be split by line\n" in res.data
assert b"<title>Even this title should stay because we are just plain text</title>" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Server says its plaintext, we should always treat it as plaintext
def test_plaintext_even_if_xml_content(client, live_server, measure_memory_usage):
@@ -309,13 +298,8 @@ def test_plaintext_even_if_xml_content(client, live_server, measure_memory_usage
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -326,5 +310,32 @@ def test_plaintext_even_if_xml_content(client, live_server, measure_memory_usage
assert b'&lt;string name=&#34;feed_update_receiver_name&#34;' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that
def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server, measure_memory_usage):
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("""<?xml version="1.0" encoding="utf-8"?>
<resources xmlns:tools="http://schemas.android.com/tools">
<!--Activity and fragment titles-->
<string name="feed_update_receiver_name">Abonnementen bijwerken</string>
<foobar>ok man</foobar>
</resources>
""")
test_url=url_for('test_endpoint', content_type="text/plain", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": ['//string']})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
url_for("ui.ui_views.preview_page", uuid="first"),
follow_redirects=True
)
assert b'&lt;string name=&#34;feed_update_receiver_name&#34;' in res.data
assert b'&lt;foobar' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from changedetectionio import html_tools
def set_original_ignore_response():
@@ -70,12 +70,8 @@ def test_check_block_changedetection_text_NOT_present(client, live_server, measu
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -144,5 +140,4 @@ def test_check_block_changedetection_text_NOT_present(client, live_server, measu
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -14,12 +14,8 @@ def test_clone_functionality(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# So that we can be sure the same history doesnt carry over

View File

@@ -3,7 +3,7 @@ import json
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from ..model import CONDITIONS_MATCH_LOGIC_DEFAULT
@@ -47,11 +47,11 @@ def set_number_out_of_range_response(number="150"):
f.write(test_return_data)
# def test_setup(client, live_server):
# def test_setup(client, live_server, measure_memory_usage):
"""Test that both text and number conditions work together with AND logic."""
# live_server_setup(live_server) # Setup on conftest per function
def test_conditions_with_text_and_number(client, live_server):
def test_conditions_with_text_and_number(client, live_server, measure_memory_usage):
"""Test that both text and number conditions work together with AND logic."""
set_original_response("50")
@@ -60,12 +60,8 @@ def test_conditions_with_text_and_number(client, live_server):
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Configure the watch with two conditions connected with AND:
@@ -143,23 +139,18 @@ def test_conditions_with_text_and_number(client, live_server):
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# The 'validate' button next to each rule row
def test_condition_validate_rule_row(client, live_server):
def test_condition_validate_rule_row(client, live_server, measure_memory_usage):
set_original_response("50")
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
@@ -230,12 +221,8 @@ def test_wordcount_conditions_plugin(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)

View File

@@ -81,12 +81,8 @@ def test_check_markup_include_filters_restriction(client, live_server, measure_m
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@@ -138,12 +134,8 @@ def test_check_multiple_filters(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
@@ -193,12 +185,8 @@ def test_filter_is_empty_help_suggestion(client, live_server, measure_memory_usa
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Goto the edit page, add our ignore text

View File

@@ -5,7 +5,7 @@ import time
from flask import url_for
from ..html_tools import *
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
@@ -209,48 +209,32 @@ def test_element_removal_full(client, live_server, measure_memory_usage):
# Re #2752
def test_element_removal_nth_offset_no_shift(client, live_server, measure_memory_usage):
set_response_with_multiple_index()
subtractive_selectors_data = ["""
body > table > tr:nth-child(1) > th:nth-child(2)
subtractive_selectors_data = [
### css style ###
"""body > table > tr:nth-child(1) > th:nth-child(2)
body > table > tr:nth-child(2) > td:nth-child(2)
body > table > tr:nth-child(3) > td:nth-child(2)
body > table > tr:nth-child(1) > th:nth-child(3)
body > table > tr:nth-child(2) > td:nth-child(3)
body > table > tr:nth-child(3) > td:nth-child(3)""",
### second type, xpath ###
"""//body/table/tr[1]/th[2]
//body/table/tr[2]/td[2]
//body/table/tr[3]/td[2]
//body/table/tr[1]/th[3]
//body/table/tr[2]/td[3]
//body/table/tr[3]/td[3]"""]
test_url = url_for("test_endpoint", _external=True)
for selector_list in subtractive_selectors_data:
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Add our URL to the import page
test_url = url_for("test_endpoint", _external=True)
res = client.post(
url_for("imports.import_page"), data={"urls": test_url}, follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"subtractive_selectors": selector_list,
"url": test_url,
"tags": "",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y",
},
follow_redirects=True,
)
assert b"Updated watch." in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"subtractive_selectors": selector_list.splitlines()})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
@@ -258,6 +242,7 @@ body > table > tr:nth-child(3) > td:nth-child(3)""",
follow_redirects=True
)
# the filters above should have removed this but they never say to remove the "emil" column
assert b"Tobias" not in res.data
assert b"Linus" not in res.data
assert b"Person 2" not in res.data

View File

@@ -28,11 +28,8 @@ def test_check_encoding_detection(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="text/html", _external=True)
client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -59,11 +56,8 @@ def test_check_encoding_detection_missing_content_type_header(client, live_serve
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
@@ -19,12 +19,8 @@ def _runner_test_http_errors(client, live_server, http_code, expected_text):
status_code=http_code,
_external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -47,8 +43,7 @@ def _runner_test_http_errors(client, live_server, http_code, expected_text):
#assert b'Error Screenshot' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_http_error_handler(client, live_server, measure_memory_usage):
@@ -56,8 +51,7 @@ def test_http_error_handler(client, live_server, measure_memory_usage):
_runner_test_http_errors(client, live_server, 404, 'Page not found')
_runner_test_http_errors(client, live_server, 500, '(Internal server error) received')
_runner_test_http_errors(client, live_server, 400, 'Error - Request returned a HTTP error code 400')
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Just to be sure error text is properly handled
def test_DNS_errors(client, live_server, measure_memory_usage):
@@ -87,8 +81,7 @@ def test_DNS_errors(client, live_server, measure_memory_usage):
assert found_name_resolution_error
# Should always record that we tried
assert bytes("just now".encode('utf-8')) in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Re 1513
def test_low_level_errors_clear_correctly(client, live_server, measure_memory_usage):
@@ -145,5 +138,4 @@ def test_low_level_errors_clear_correctly(client, live_server, measure_memory_us
)
assert not found_name_resolution_error
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from ..html_tools import *
@@ -76,12 +76,8 @@ def test_check_filter_multiline(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -131,12 +127,8 @@ def test_check_filter_and_regex_extract(client, live_server, measure_memory_usag
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -212,12 +204,8 @@ def test_regex_error_handling(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
### test regex error handling
res = client.post(
@@ -231,5 +219,4 @@ def test_regex_error_handling(client, live_server, measure_memory_usage):
assert b'is not a valid regular expression.' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -42,13 +42,8 @@ def run_filter_test(client, live_server, content_filter):
if os.path.isfile("test-datastore/notification.txt"):
os.unlink("test-datastore/notification.txt")
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, extract_UUID_from_client
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, extract_UUID_from_client, delete_all_watches
import os
@@ -127,8 +127,7 @@ def test_setup_group_tag(client, live_server, measure_memory_usage):
assert b"should-be-excluded" not in res.data
assert res.status_code == 200
assert b"first-imported=1" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_tag_import_singular(client, live_server, measure_memory_usage):
@@ -147,8 +146,7 @@ def test_tag_import_singular(client, live_server, measure_memory_usage):
)
# Should be only 1 tag because they both had the same
assert res.data.count(b'test-tag') == 1
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_tag_add_in_ui(client, live_server, measure_memory_usage):
@@ -164,8 +162,7 @@ def test_tag_add_in_ui(client, live_server, measure_memory_usage):
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_group_tag_notification(client, live_server, measure_memory_usage):
@@ -232,8 +229,7 @@ def test_group_tag_notification(client, live_server, measure_memory_usage):
#@todo Test that multiple notifications fired
#@todo Test that each of multiple notifications with different settings
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_limit_tag_ui(client, live_server, measure_memory_usage):
@@ -269,8 +265,7 @@ def test_limit_tag_ui(client, live_server, measure_memory_usage):
assert res.data.count(b' unviewed ') == 1
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
@@ -297,8 +292,7 @@ def test_clone_tag_on_import(client, live_server, measure_memory_usage):
# 2 times plus the top link to tag
assert res.data.count(b'test-tag') == 3
assert res.data.count(b'another-tag') == 3
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_clone_tag_on_quickwatchform_add(client, live_server, measure_memory_usage):
@@ -325,8 +319,7 @@ def test_clone_tag_on_quickwatchform_add(client, live_server, measure_memory_usa
# 2 times plus the top link to tag
assert res.data.count(b'test-tag') == 3
assert res.data.count(b'another-tag') == 3
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
@@ -389,12 +382,8 @@ def test_order_of_filters_tag_filter_and_watch_filter(client, live_server, measu
f.write(d)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
filters = [
@@ -480,5 +469,4 @@ the {test} appeared before. {test in res.data[:n]=}
"""
n += t_index + len(test)
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -4,7 +4,7 @@ import time
import os
import json
from flask import url_for
from .util import wait_for_all_checks
from .util import wait_for_all_checks, delete_all_watches
from urllib.parse import urlparse, parse_qs
def test_consistent_history(client, live_server, measure_memory_usage):
@@ -80,19 +80,15 @@ def test_consistent_history(client, live_server, measure_memory_usage):
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
def test_check_text_history_view(client, live_server):
def test_check_text_history_view(client, live_server, measure_memory_usage):
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("<html>test-one</html>")
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -121,5 +117,4 @@ def test_check_text_history_view(client, live_server):
assert b'test-two' in res.data
assert b'test-one' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -27,12 +27,8 @@ def test_ignore(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
set_original_ignore_response()
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -79,12 +75,8 @@ def test_strip_ignore_lines(client, live_server, measure_memory_usage):
assert b"Settings updated." in res.data
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from changedetectionio import html_tools
@@ -97,12 +97,8 @@ def test_check_ignore_text_functionality(client, live_server, measure_memory_usa
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -163,8 +159,7 @@ def test_check_ignore_text_functionality(client, live_server, measure_memory_usa
# it is only ignored, it is not removed (it will be highlighted too)
assert b'new ignore stuff' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# When adding some ignore text, it should not trigger a change, even if something else on that line changes
def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
@@ -192,12 +187,8 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
# Switch to source mode so we can test that too!
test_url = "source:"+test_url
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -251,13 +242,12 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_global_ignore_text_functionality(client, live_server):
def test_check_global_ignore_text_functionality(client, live_server, measure_memory_usage):
_run_test_global_ignore(client, as_source=False)
def test_check_global_ignore_text_functionality_as_source(client, live_server):
def test_check_global_ignore_text_functionality_as_source(client, live_server, measure_memory_usage):
_run_test_global_ignore(client, as_source=True, extra_ignore='/\?v=\d/')

View File

@@ -3,9 +3,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
def set_original_ignore_response():
@@ -117,7 +115,5 @@ def test_render_anchor_tag_content_true(client, live_server, measure_memory_usag
assert b"/test-endpoint" in res.data
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"),
follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -60,12 +60,8 @@ def test_normal_page_check_works_with_ignore_status_code(client, live_server, me
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -94,12 +90,8 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server, measu
# Add our URL to the import page
test_url = url_for('test_endpoint', status_code=403, _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)

View File

@@ -70,12 +70,8 @@ def test_check_ignore_whitespace(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check

View File

@@ -5,7 +5,7 @@ import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
# def test_setup(client, live_server, measure_memory_usage):
@@ -28,7 +28,7 @@ https://example.com tag1, other tag"""
assert b"3 Imported" in res.data
assert b"tag1" in res.data
assert b"other tag" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Clear flask alerts
res = client.get( url_for("watchlist.index"))
@@ -53,7 +53,7 @@ def xtest_import_skip_url(client, live_server, measure_memory_usage):
assert b"1 Imported" in res.data
assert b"ht000000broken" in res.data
assert b"1 Skipped" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Clear flask alerts
res = client.get( url_for("watchlist.index"))
@@ -119,7 +119,7 @@ def test_import_distillio(client, live_server, measure_memory_usage):
assert b"nice stuff" in res.data
assert b"nerd-news" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Clear flask alerts
res = client.get(url_for("watchlist.index"))
@@ -169,8 +169,7 @@ def test_import_custom_xlsx(client, live_server, measure_memory_usage):
assert filters[0] == '/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]||//*[@id=\'content\']/div[3]/div[1]/div[1]||//*[@id=\'content\']/div[1]'
assert watch.get('time_between_check') == {'weeks': 0, 'days': 1, 'hours': 6, 'minutes': 24, 'seconds': 0}
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_import_watchete_xlsx(client, live_server, measure_memory_usage):
"""Test can upload a excel spreadsheet and the watches are created correctly"""
@@ -214,5 +213,4 @@ def test_import_watchete_xlsx(client, live_server, measure_memory_usage):
if watch.get('title') == 'system default website':
assert watch.get('fetch_backend') == 'system' # uses default if blank
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for, escape
from . util import live_server_setup, wait_for_all_checks
from . util import live_server_setup, wait_for_all_checks, delete_all_watches
import pytest
jq_support = True
@@ -205,16 +205,10 @@ def test_check_json_without_filter(client, live_server, measure_memory_usage):
# and be sure it doesn't get chewed up by instriptis
set_json_response_with_html()
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -228,23 +222,16 @@ def test_check_json_without_filter(client, live_server, measure_memory_usage):
assert b'&#34;html&#34;: &#34;&lt;b&gt;&#34;' in res.data
assert res.data.count(b'{') >= 2
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def check_json_filter(json_filter, client, live_server):
set_original_response()
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -291,8 +278,7 @@ def check_json_filter(json_filter, client, live_server):
# And #462 - check we see the proper utf-8 string there
assert "Örnsköldsvik".encode('utf-8') in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_jsonpath_filter(client, live_server, measure_memory_usage):
check_json_filter('json:boss.name', client, live_server)
@@ -313,12 +299,8 @@ def check_json_filter_bool_val(json_filter, client, live_server):
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
@@ -350,8 +332,7 @@ def check_json_filter_bool_val(json_filter, client, live_server):
# But the change should be there, tho its hard to test the change was detected because it will show old and new versions
assert b'false' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_jsonpath_filter_bool_val(client, live_server, measure_memory_usage):
check_json_filter_bool_val("json:$['available']", client, live_server)
@@ -377,12 +358,8 @@ def check_json_ext_filter(json_filter, client, live_server):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -436,8 +413,7 @@ def check_json_ext_filter(json_filter, client, live_server):
assert b'ForSale' in res.data
assert b'Sold' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_ignore_json_order(client, live_server, measure_memory_usage):
# A change in order shouldn't trigger a notification
@@ -448,12 +424,8 @@ def test_ignore_json_order(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -478,8 +450,7 @@ def test_ignore_json_order(client, live_server, measure_memory_usage):
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_correct_header_detect(client, live_server, measure_memory_usage):
# Like in https://github.com/dgtlmoon/changedetection.io/pull/1593
@@ -490,12 +461,8 @@ def test_correct_header_detect(client, live_server, measure_memory_usage):
# Add our URL to the import page
# Check weird casing is cleaned up and detected also
test_url = url_for('test_endpoint', content_type="aPPlication/JSon", uppercase_headers=True, _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
@@ -510,8 +477,7 @@ def test_correct_header_detect(client, live_server, measure_memory_usage):
assert b'&#34;hello&#34;: 123,' in res.data
assert b'&#34;world&#34;: 123' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_jsonpath_ext_filter(client, live_server, measure_memory_usage):
check_json_ext_filter('json:$[?(@.status==Sold)]', client, live_server)

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
from flask import url_for
from changedetectionio.tests.util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
from changedetectionio.tests.util import live_server_setup, wait_for_all_checks, extract_UUID_from_client, delete_all_watches
def set_response():
@@ -75,5 +75,4 @@ def test_content_filter_live_preview(client, live_server, measure_memory_usage):
assert reply.get('ignore_line_numbers') == [2] # Ignored - "socks" on line 2
assert reply.get('trigger_line_numbers') == [1] # Triggers "Awesome" in line 1
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, delete_all_watches
import time
@@ -113,6 +113,5 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
#
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -24,12 +24,8 @@ def test_obfuscations(client, live_server, measure_memory_usage):
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)

View File

@@ -13,13 +13,8 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
test_url = url_for('test_pdf_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)

View File

@@ -13,13 +13,8 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
test_url = url_for('test_pdf_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)

View File

@@ -2,7 +2,7 @@ import json
import os
import time
from flask import url_for
from . util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_UUID_from_client
from . util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_UUID_from_client, delete_all_watches
@@ -17,21 +17,13 @@ def test_headers_in_request(client, live_server, measure_memory_usage):
test_url = test_url.replace('localhost', 'changedet')
# Add the test URL twice, we will check
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
cookie_header = '_ga=GA1.2.1022228332; cookie-preferences=analytics:accepted;'
@@ -82,8 +74,7 @@ def test_headers_in_request(client, live_server, measure_memory_usage):
for k, watch in client.application.config.get('DATASTORE').data.get('watching').items():
assert 'custom' in watch.get('remote_server_reply') # added in util.py
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_body_in_request(client, live_server, measure_memory_usage):
@@ -93,12 +84,8 @@ def test_body_in_request(client, live_server, measure_memory_usage):
# Because its no longer calling back to localhost but from the browser container, set in test-only.yml
test_url = test_url.replace('localhost', 'cdio')
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -150,12 +137,8 @@ def test_body_in_request(client, live_server, measure_memory_usage):
####### data sanity checks
# Add the test URL twice, we will check
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watches_with_body = 0
with open('test-datastore/url-watches.json') as f:
@@ -180,8 +163,7 @@ def test_body_in_request(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b"Body must be empty when Request Method is set to GET" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_method_in_request(client, live_server, measure_memory_usage):
# Add our URL to the import page
@@ -191,20 +173,12 @@ def test_method_in_request(client, live_server, measure_memory_usage):
test_url = test_url.replace('localhost', 'cdio')
# Add the test URL twice, we will check
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -258,8 +232,7 @@ def test_method_in_request(client, live_server, measure_memory_usage):
# Should be only one with method set to PATCH
assert watches_with_method == 1
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Re #2408 - user-agent override test, also should handle case-insensitive header deduplication
def test_ua_global_override(client, live_server, measure_memory_usage):
@@ -277,12 +250,8 @@ def test_ua_global_override(client, live_server, measure_memory_usage):
)
assert b'Settings updated' in res.data
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
@@ -315,8 +284,7 @@ def test_ua_global_override(client, live_server, measure_memory_usage):
)
assert b"agent-from-watch" in res.data
assert b"html-requests-user-agent" not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_headers_textfile_in_request(client, live_server, measure_memory_usage):
@@ -356,12 +324,8 @@ def test_headers_textfile_in_request(client, live_server, measure_memory_usage):
assert b"requests-default_ua-html_requests" in res.data
# Add the test URL twice, we will check
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -429,19 +393,14 @@ def test_headers_textfile_in_request(client, live_server, measure_memory_usage):
assert "User-Agent:".encode('utf-8') + requests_ua.encode('utf-8') in res.data
# unlink headers.txt on start/stop
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_headers_validation(client, live_server):
def test_headers_validation(client, live_server, measure_memory_usage):
test_url = url_for('test_headers', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),

View File

@@ -3,7 +3,7 @@ import os
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output, extract_UUID_from_client
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output, extract_UUID_from_client, delete_all_watches
from ..notification import default_notification_format
instock_props = [
@@ -44,11 +44,11 @@ def set_original_response(props_markup='', price="121.95"):
# def test_setup(client, live_server):
# def test_setup(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
def test_restock_itemprop_basic(client, live_server):
def test_restock_itemprop_basic(client, live_server, measure_memory_usage):
@@ -69,8 +69,7 @@ def test_restock_itemprop_basic(client, live_server):
assert b'has-restock-info' in res.data
assert b' in-stock' in res.data
assert b' not-in-stock' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
for p in out_of_stock_props:
@@ -85,10 +84,9 @@ def test_restock_itemprop_basic(client, live_server):
assert b'has-restock-info not-in-stock' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_itemprop_price_change(client, live_server):
def test_itemprop_price_change(client, live_server, measure_memory_usage):
# Out of the box 'Follow price changes' should be ON
@@ -132,13 +130,11 @@ def test_itemprop_price_change(client, live_server):
assert b'has-unread-changes' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def _run_test_minmax_limit(client, extra_watch_edit_form):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
test_url = url_for('test_endpoint', _external=True)
@@ -212,11 +208,10 @@ def _run_test_minmax_limit(client, extra_watch_edit_form):
assert b'1,890.45' in res.data or b'1890.45' in res.data
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_restock_itemprop_minmax(client, live_server):
def test_restock_itemprop_minmax(client, live_server, measure_memory_usage):
extras = {
"restock_settings-follow_price_changes": "y",
@@ -225,7 +220,7 @@ def test_restock_itemprop_minmax(client, live_server):
}
_run_test_minmax_limit(client, extra_watch_edit_form=extras)
def test_restock_itemprop_with_tag(client, live_server):
def test_restock_itemprop_with_tag(client, live_server, measure_memory_usage):
res = client.post(
@@ -254,11 +249,10 @@ def test_restock_itemprop_with_tag(client, live_server):
def test_itemprop_percent_threshold(client, live_server):
def test_itemprop_percent_threshold(client, live_server, measure_memory_usage):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
test_url = url_for('test_endpoint', _external=True)
@@ -317,12 +311,11 @@ def test_itemprop_percent_threshold(client, live_server):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_change_with_notification_values(client, live_server):
def test_change_with_notification_values(client, live_server, measure_memory_usage):
if os.path.isfile("test-datastore/notification.txt"):
@@ -390,11 +383,10 @@ def test_change_with_notification_values(client, live_server):
assert os.path.isfile("test-datastore/notification.txt"), "Notification received"
def test_data_sanity(client, live_server):
def test_data_sanity(client, live_server, measure_memory_usage):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
test_url = url_for('test_endpoint', _external=True)
test_url2 = url_for('test_endpoint2', _external=True)
@@ -421,8 +413,7 @@ def test_data_sanity(client, live_server):
assert str(res.data.decode()).count("950.95") == 1, "Price should only show once (for the watch added, no other watches yet)"
## different test, check the edit page works on an empty request result
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
client.post(
url_for("ui.ui_views.form_quick_watch_add"),
@@ -435,11 +426,10 @@ def test_data_sanity(client, live_server):
url_for("ui.ui_edit.edit_page", uuid="first"))
assert test_url2.encode('utf-8') in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# All examples should give a prive of 666.66
def test_special_prop_examples(client, live_server):
def test_special_prop_examples(client, live_server, measure_memory_usage):
import glob

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client
extract_UUID_from_client, delete_all_watches
def set_original_cdata_xml():
@@ -114,13 +114,8 @@ def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', content_type="application/atom+xml; charset=UTF-8", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -132,7 +127,7 @@ def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage):
assert b'<![' not in res.data
assert b'Hackers can access your computer' in res.data
assert b'The days of Terminator' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
@@ -180,10 +175,10 @@ def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
assert b'The days of Terminator' not in res.data # Should NOT be selected by the xpath
assert b'Some other description' not in res.data # Should NOT be selected by the xpath
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_rss_bad_chars_breaking(client, live_server):
def test_rss_bad_chars_breaking(client, live_server, measure_memory_usage):
"""This should absolutely trigger the RSS builder to go into worst state mode
- source: prefix means no html conversion (which kinda filters out the bad stuff)

View File

@@ -5,11 +5,11 @@ from copy import copy
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
from .util import live_server_setup, wait_for_all_checks, extract_UUID_from_client, delete_all_watches
from ..forms import REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT, REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT
# def test_setup(client, live_server):
# def test_setup(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
def test_check_basic_scheduler_functionality(client, live_server, measure_memory_usage):
@@ -34,13 +34,8 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
res = client.get(url_for("settings.settings_page"))
assert b'Pacific/Kiritimati' in res.data
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
@@ -92,8 +87,7 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['last_checked'] != last_check
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_basic_global_scheduler_functionality(client, live_server, measure_memory_usage):
@@ -101,13 +95,8 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure
days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
test_url = url_for('test_random_content_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
@@ -180,18 +169,13 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['last_checked'] != last_check
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_validation_time_interval_field(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
res = client.post(

View File

@@ -1,7 +1,7 @@
import os
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from .. import strtobool
@@ -100,8 +100,7 @@ def _runner_test_various_file_slash(client, file_uri):
# This will give some error from requests or if it went to chrome, will give some other error :-)
assert any(s in res.data for s in substrings)
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_file_slash_access(client, live_server, measure_memory_usage):

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup
from .util import set_original_response, set_modified_response, live_server_setup, delete_all_watches
import re
sleep_time_for_fetch_thread = 3
@@ -17,13 +17,8 @@ def test_share_watch(client, live_server, measure_memory_usage):
include_filters = ".nice-filter"
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Goto the edit page, add our ignore text
# Add our URL to the import page
@@ -54,8 +49,7 @@ def test_share_watch(client, live_server, measure_memory_usage):
# Now delete what we have, we will try to re-import it
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Add our URL to the import page
res = client.post(

View File

@@ -13,13 +13,8 @@ def test_check_basic_change_detection_functionality_source(client, live_server,
set_original_response()
test_url = 'source:'+url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
@@ -62,13 +57,8 @@ def test_check_ignore_elements(client, live_server, measure_memory_usage):
time.sleep(1)
test_url = 'source:'+url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)

View File

@@ -65,12 +65,8 @@ def test_trigger_functionality(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
def set_original_ignore_response():
@@ -30,12 +30,8 @@ def test_trigger_regex_functionality(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -76,5 +72,4 @@ def test_trigger_regex_functionality(client, live_server, measure_memory_usage):
assert b'has-unread-changes' in res.data
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from . util import live_server_setup
from . util import live_server_setup, delete_all_watches
def set_original_ignore_response():
@@ -34,12 +34,8 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# it needs time to save the original version
time.sleep(sleep_time_for_fetch_thread)
@@ -81,5 +77,4 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
assert b'has-unread-changes' in res.data
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -1,11 +1,11 @@
#!/usr/bin/env python3
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, delete_all_watches
from ..forms import REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT, REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT
def test_recheck_time_field_validation_global_settings(client, live_server):
def test_recheck_time_field_validation_global_settings(client, live_server, measure_memory_usage):
"""
Tests that the global settings time field has atleast one value for week/day/hours/minute/seconds etc entered
class globalSettingsRequestForm(Form):
@@ -27,7 +27,7 @@ def test_recheck_time_field_validation_global_settings(client, live_server):
assert REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT.encode('utf-8') in res.data
def test_recheck_time_field_validation_single_watch(client, live_server):
def test_recheck_time_field_validation_single_watch(client, live_server, measure_memory_usage):
"""
Tests that the global settings time field has atleast one value for week/day/hours/minute/seconds etc entered
class globalSettingsRequestForm(Form):
@@ -36,13 +36,8 @@ def test_recheck_time_field_validation_single_watch(client, live_server):
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
@@ -100,7 +95,7 @@ def test_recheck_time_field_validation_single_watch(client, live_server):
assert b"Updated watch." in res.data
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') not in res.data
def test_checkbox_open_diff_in_new_tab(client, live_server):
def test_checkbox_open_diff_in_new_tab(client, live_server, measure_memory_usage):
set_original_response()
# Add our URL to the import page
@@ -171,10 +166,9 @@ def test_checkbox_open_diff_in_new_tab(client, live_server):
assert 'target=' not in target_line
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_page_title_listing_behaviour(client, live_server):
def test_page_title_listing_behaviour(client, live_server, measure_memory_usage):
set_original_response(extra_title="custom html")
@@ -249,7 +243,7 @@ def test_page_title_listing_behaviour(client, live_server):
assert b"head titlecustom html" in res.data
def test_ui_viewed_unread_flag(client, live_server):
def test_ui_viewed_unread_flag(client, live_server, measure_memory_usage):
import time

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
def set_original_ignore_response():
@@ -79,12 +79,8 @@ def test_unique_lines_functionality(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Add our URL to the import page
@@ -118,8 +114,7 @@ def test_unique_lines_functionality(client, live_server, measure_memory_usage):
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_sort_lines_functionality(client, live_server, measure_memory_usage):
@@ -128,12 +123,8 @@ def test_sort_lines_functionality(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Add our URL to the import page
@@ -168,8 +159,7 @@ def test_sort_lines_functionality(client, live_server, measure_memory_usage):
assert res.data.find(b'A uppercase') < res.data.find(b'Z last')
assert res.data.find(b'Some initial text') < res.data.find(b'Which is across multiple lines')
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_extra_filters(client, live_server, measure_memory_usage):
@@ -179,12 +169,8 @@ def test_extra_filters(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Add our URL to the import page
@@ -216,5 +202,4 @@ def test_extra_filters(client, live_server, measure_memory_usage):
# still should remain unsorted ('A - sortable line') stays at the end
assert res.data.find(b'A - sortable line') > res.data.find(b'Which is across multiple lines')
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -10,12 +10,8 @@ def test_check_watch_field_storage(client, live_server, measure_memory_usage):
test_url = "http://somerandomsitewewatch.com"
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
res = client.post(

View File

@@ -2,7 +2,7 @@
from flask import url_for
from .util import wait_for_all_checks
from .util import wait_for_all_checks, delete_all_watches
from ..processors.magic import RSS_XML_CONTENT_TYPES
@@ -113,12 +113,8 @@ def test_check_xpath_filter_utf8(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True, content_type="application/rss+xml;charset=UTF-8")
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
@@ -129,8 +125,7 @@ def test_check_xpath_filter_utf8(client, live_server, measure_memory_usage):
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'Unicode strings with encoding declaration are not supported.' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Handle utf-8 charset replies https://github.com/dgtlmoon/changedetection.io/pull/613
@@ -167,12 +162,8 @@ def test_check_xpath_text_function_utf8(client, live_server, measure_memory_usag
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True, content_type="application/rss+xml;charset=UTF-8")
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
@@ -193,8 +184,7 @@ def test_check_xpath_text_function_utf8(client, live_server, measure_memory_usag
assert b'Stock Alert (UK): RPi CM4' in res.data
assert b'Stock Alert (UK): Big monitor' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_markup_xpath_filter_restriction(client, live_server, measure_memory_usage):
@@ -204,12 +194,8 @@ def test_check_markup_xpath_filter_restriction(client, live_server, measure_memo
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -239,19 +225,14 @@ def test_check_markup_xpath_filter_restriction(client, live_server, measure_memo
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_xpath_validation(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -260,19 +241,14 @@ def test_xpath_validation(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_xpath23_prefix_validation(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -281,8 +257,7 @@ def test_xpath23_prefix_validation(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_xpath1_lxml(client, live_server, measure_memory_usage):
@@ -317,12 +292,8 @@ def test_xpath1_lxml(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -351,12 +322,8 @@ def test_xpath1_lxml(client, live_server, measure_memory_usage):
def test_xpath1_validation(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -365,25 +332,19 @@ def test_xpath1_validation(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# actually only really used by the distll.io importer, but could be handy too
def test_check_with_prefix_include_filters(client, live_server, measure_memory_usage):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
set_original_response()
wait_for_all_checks(client)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -428,12 +389,8 @@ def test_various_rules(client, live_server, measure_memory_usage):
""")
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
for r in ['//div', '//a', 'xpath://div', 'xpath://a']:
@@ -452,18 +409,13 @@ def test_various_rules(client, live_server, measure_memory_usage):
res = client.get(url_for("watchlist.index"))
assert b'fetch-error' not in res.data, f"Should not see errors after '{r} filter"
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_xpath_20(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_original_response()
@@ -499,12 +451,8 @@ def test_xpath_20_function_count(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -536,12 +484,8 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -573,16 +517,12 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"include_filters": "xpath:string-join(//*[contains(@class, 'sametext')]|//*[matches(@class, 'changetext')], 'specialconjunction')",
"url": test_url,
@@ -597,7 +537,7 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
wait_for_all_checks(client)
res = client.get(
url_for("ui.ui_views.preview_page", uuid="first"),
url_for("ui.ui_views.preview_page", uuid=uuid),
follow_redirects=True
)
@@ -644,7 +584,7 @@ def _subtest_xpath_rss(client, content_type='text/html'):
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
# Be sure all-in-the-wild types of RSS feeds work with xpath
def test_rss_xpath(client, live_server):
def test_rss_xpath(client, live_server, measure_memory_usage):
for feed_header in ['', '<?xml version="1.0" encoding="utf-8"?>']:
set_rss_atom_feed_response(header=feed_header)
for content_type in RSS_XML_CONTENT_TYPES:

View File

@@ -127,6 +127,11 @@ def extract_UUID_from_client(client):
uuid = m.group(1)
return uuid.strip()
def delete_all_watches(client=None):
uuids = list(client.application.config.get('DATASTORE').data['watching'])
for uuid in uuids:
client.application.config.get('DATASTORE').delete(uuid)
def wait_for_all_checks(client=None):
"""
@@ -135,8 +140,6 @@ def wait_for_all_checks(client=None):
"""
from changedetectionio.flask_app import update_q as global_update_q
from changedetectionio import worker_handler
logger = logging.getLogger()
empty_since = None
attempt = 0
max_attempts = 150 # Still reasonable upper bound
@@ -144,9 +147,9 @@ def wait_for_all_checks(client=None):
while attempt < max_attempts:
# Start with fast checks, slow down if needed
if attempt < 10:
time.sleep(0.1) # Very fast initial checks
time.sleep(0.2) # Very fast initial checks
elif attempt < 30:
time.sleep(0.3) # Medium speed
time.sleep(0.4) # Medium speed
else:
time.sleep(0.8) # Slower for persistent issues
@@ -322,4 +325,3 @@ def new_live_server_setup(live_server):
return resp
live_server.start()

View File

@@ -4,7 +4,7 @@ import os
from flask import url_for
from ..util import live_server_setup, wait_for_all_checks
# def test_setup(client, live_server):
# def test_setup(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
@@ -142,7 +142,7 @@ def test_basic_browserstep(client, live_server, measure_memory_usage):
assert b"testheader: yes" in res.data
assert b"user-agent: mycustomagent" in res.data
def test_non_200_errors_report_browsersteps(client, live_server):
def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage):
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)