Compare commits

1 Commit

Author:  dgtlmoon
SHA1:    65897f1a74
Message: 1.56 https://github.com/truenas/apps/issues/3587
Date:    2025-11-19 13:00:13 +01:00
42 changed files with 282 additions and 1392 deletions

View File

@@ -30,7 +30,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v6
uses: actions/checkout@v5
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL

View File

@@ -39,7 +39,7 @@ jobs:
# Or if we are in a tagged release scenario.
if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != ''
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Set up Python 3.11
uses: actions/setup-python@v6
with:

View File

@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:

View File

@@ -44,7 +44,7 @@ jobs:
- platform: linux/arm64
dockerfile: ./.github/test/Dockerfile-alpine
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Set up Python 3.11
uses: actions/setup-python@v6
with:

View File

@@ -7,7 +7,7 @@ jobs:
lint-code:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Lint with Ruff
run: |
pip install ruff

View File

@@ -21,7 +21,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Set up Python ${{ env.PYTHON_VERSION }}
uses: actions/setup-python@v6
@@ -66,7 +66,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -93,7 +93,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -132,7 +132,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -174,7 +174,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -214,7 +214,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -250,7 +250,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -279,7 +279,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -319,7 +319,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -350,7 +350,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6
@@ -395,7 +395,7 @@ jobs:
env:
PYTHON_VERSION: ${{ inputs.python-version }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Download Docker image artifact
uses: actions/download-artifact@v6

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.51.4'
__version__ = '0.51.1'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError
@@ -187,10 +187,6 @@ def main():
logger.critical(str(e))
return
# Inject datastore into plugins that need access to settings
from changedetectionio.pluggy_interface import inject_datastore_into_plugins
inject_datastore_into_plugins(datastore)
if default_url:
datastore.add_watch(url = default_url)

View File

@@ -1,5 +1,3 @@
from blinker import signal
from .processors.exceptions import ProcessorException
import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
@@ -99,9 +97,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
update_handler = processor_module.perform_site_check(datastore=datastore,
watch_uuid=uuid)
update_signal = signal('watch_small_status_comment')
update_signal.send(watch_uuid=uuid, status="Fetching page..")
# All fetchers are now async, so call directly
await update_handler.call_browser()
@@ -314,7 +309,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
if not datastore.data['watching'].get(uuid):
continue
logger.debug(f"Processing watch UUID: {uuid} - xpath_data length returned {len(update_handler.xpath_data) if update_handler.xpath_data else 'empty.'}")
if process_changedetection_results:
try:
datastore.update_watch(uuid=uuid, update_obj=update_obj)
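
The removed worker lines above emitted a blinker signal with keyword arguments only. A minimal, hypothetical receiver for that signal (assuming nothing beyond the blinker package the worker already imported):

```python
# Hypothetical receiver sketch for the 'watch_small_status_comment' signal
# emitted by the removed worker lines above.
from blinker import signal

status_signal = signal('watch_small_status_comment')

def on_status_update(sender, **kwargs):
    # The worker called send() with keyword arguments only, so sender is None here.
    print(f"watch {kwargs.get('watch_uuid')}: {kwargs.get('status')}")

status_signal.connect(on_status_update)

# Emitting it the same way the removed worker code did:
status_signal.send(watch_uuid="example-uuid", status="Fetching page..")
```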

View File

@@ -439,7 +439,7 @@ class browsersteps_live_ui(steppable_browser_interface):
logger.warning("Attempted to get current state after cleanup")
return (None, None)
xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text(encoding="utf-8")
xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text()
now = time.time()
await self.page.wait_for_timeout(1 * 1000)

View File

@@ -17,12 +17,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
@login_optionally_required
def settings_page():
from changedetectionio import forms
from changedetectionio.pluggy_interface import (
get_plugin_settings_tabs,
load_plugin_settings,
save_plugin_settings
)
default = deepcopy(datastore.data['settings'])
if datastore.proxy_list is not None:
@@ -108,20 +102,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
return redirect(url_for('watchlist.index'))
datastore.needs_write_urgent = True
# Also save plugin settings from the same form submission
plugin_tabs_list = get_plugin_settings_tabs()
for tab in plugin_tabs_list:
plugin_id = tab['plugin_id']
form_class = tab['form_class']
# Instantiate plugin form with POST data
plugin_form = form_class(formdata=request.form)
# Save plugin settings (validation is optional for plugins)
if plugin_form.data:
save_plugin_settings(datastore.datastore_path, plugin_id, plugin_form.data)
flash("Settings updated.")
else:
@@ -130,30 +110,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Convert to ISO 8601 format, all date/time relative events stored as UTC time
utc_time = datetime.now(ZoneInfo("UTC")).isoformat()
# Get active plugins
from changedetectionio.pluggy_interface import get_active_plugins
import sys
active_plugins = get_active_plugins()
python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
# Get plugin settings tabs and instantiate forms
plugin_tabs = get_plugin_settings_tabs()
plugin_forms = {}
for tab in plugin_tabs:
plugin_id = tab['plugin_id']
form_class = tab['form_class']
# Load existing settings
settings = load_plugin_settings(datastore.datastore_path, plugin_id)
# Instantiate the form with existing settings
plugin_forms[plugin_id] = form_class(data=settings)
output = render_template("settings.html",
active_plugins=active_plugins,
api_key=datastore.data['settings']['application'].get('api_access_token'),
python_version=python_version,
available_timezones=sorted(available_timezones()),
emailprefix=os.getenv('NOTIFICATION_MAIL_BUTTON_PREFIX', False),
extra_notification_token_placeholder_info=datastore.get_unique_notification_token_placeholders_available(),
@@ -163,8 +121,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
settings_application=datastore.data['settings']['application'],
timezone_default_config=datastore.data['settings']['application'].get('scheduler_timezone_default'),
utc_time=utc_time,
plugin_tabs=plugin_tabs,
plugin_forms=plugin_forms,
)
return output
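
The removed blueprint code above drives the (also removed) plugin_settings_tab hook and persists each plugin's settings as JSON in the datastore directory. A hypothetical plugin-side implementation, with illustrative class and field names, might have looked like this:

```python
# Hypothetical plugin-side sketch for the removed plugin_settings_tab hook.
# Names are illustrative; the returned dict keys follow the hookspec.
from wtforms import Form, BooleanField, StringField
from changedetectionio.pluggy_interface import hookimpl


class ExamplePluginSettingsForm(Form):
    api_key = StringField('API key')
    enabled = BooleanField('Enable this plugin')


class ExamplePlugin:
    @hookimpl
    def plugin_settings_tab(self):
        # Settings submitted through this form were persisted by
        # save_plugin_settings() to <datastore>/example_plugin.json.
        return {
            'plugin_id': 'example_plugin',
            'tab_label': 'Example Plugin',
            'form_class': ExamplePluginSettingsForm,
        }
```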

View File

@@ -27,12 +27,6 @@
<li class="tab"><a href="#rss">RSS</a></li>
<li class="tab"><a href="#timedate">Time &amp Date</a></li>
<li class="tab"><a href="#proxies">CAPTCHA &amp; Proxies</a></li>
{% if plugin_tabs %}
{% for tab in plugin_tabs %}
<li class="tab"><a href="#plugin-{{ tab.plugin_id }}">{{ tab.tab_label }}</a></li>
{% endfor %}
{% endif %}
<li class="tab"><a href="#info">Info</a></li>
</ul>
</div>
<div class="box-wrap inner">
@@ -358,45 +352,7 @@ nav
</p>
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_browsers) }}
</div>
</div>
{% if plugin_tabs %}
{% for tab in plugin_tabs %}
<div class="tab-pane-inner" id="plugin-{{ tab.plugin_id }}">
{% set plugin_form = plugin_forms[tab.plugin_id] %}
{% if tab.template_path %}
{# Plugin provides custom template - include it directly (no separate form) #}
{% include tab.template_path with context %}
{% else %}
{# Default form rendering - fields only, no submit button #}
<fieldset>
{% for field in plugin_form %}
{% if field.type != 'CSRFToken' and field.type != 'SubmitField' %}
<div class="pure-control-group">
{% if field.type == 'BooleanField' %}
{{ render_checkbox_field(field) }}
{% else %}
{{ render_field(field) }}
{% endif %}
</div>
{% endif %}
{% endfor %}
</fieldset>
{% endif %}
</div>
{% endfor %}
{% endif %}
<div class="tab-pane-inner" id="info">
<p><strong>Python version:</strong> {{ python_version }}</p>
<p><strong>Plugins active:</strong></p>
{% if active_plugins %}
<ul>
{% for plugin in active_plugins %}
<li><strong>{{ plugin.name }}</strong> - {{ plugin.description }}</li>
{% endfor %}
</ul>
{% else %}
<p>No plugins active</p>
{% endif %}
</div>
<div id="actions">
<div class="pure-control-group">

View File

@@ -223,13 +223,19 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
watch = datastore.data['watching'].get(uuid)
# if system or watch is configured to need a chrome type browser
system_uses_webdriver = datastore.data['settings']['application']['fetch_backend'] == 'html_webdriver'
watch_needs_selenium_or_playwright = False
if (watch.get('fetch_backend') == 'system' and system_uses_webdriver) or watch.get('fetch_backend') == 'html_webdriver' or watch.get('fetch_backend', '').startswith('extra_browser_'):
watch_needs_selenium_or_playwright = True
from zoneinfo import available_timezones
# Import the global plugin system
from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras, get_fetcher_capabilities
# Only works reliably with Playwright
# Get fetcher capabilities instead of hardcoded logic
capabilities = get_fetcher_capabilities(watch, datastore)
# Import the global plugin system
from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras
app_rss_token = datastore.data['settings']['application'].get('rss_access_token'),
template_args = {
'available_processors': processors.available_processors(),
@@ -260,7 +266,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
'using_global_webdriver_wait': not default['webdriver_delay'],
'uuid': uuid,
'watch': watch,
'capabilities': capabilities
'watch_needs_selenium_or_playwright': watch_needs_selenium_or_playwright,
}
included_content = None

View File

@@ -206,8 +206,9 @@ Math: {{ 1 + 1 }}") }}
</div>
<div class="tab-pane-inner" id="browser-steps">
{% if capabilities.supports_browser_steps %}
{% if visual_selector_data_ready %}
{% if watch_needs_selenium_or_playwright %}
{# Only works with playwright #}
{% if system_has_playwright_configured %}
<img class="beta-logo" src="{{url_for('static_content', group='images', filename='beta-logo.png')}}" alt="New beta functionality">
<fieldset>
<div class="pure-control-group">
@@ -247,13 +248,15 @@ Math: {{ 1 + 1 }}") }}
</div>
</fieldset>
{% else %}
<strong>Visual Selector data is not ready, watch needs to be checked atleast once.</strong>
{# it's configured to use selenium or chrome but system says its not configured #}
{{ playwright_warning() }}
{% if system_has_webdriver_configured %}
<strong>Selenium/Webdriver cant be used here because it wont fetch screenshots reliably.</strong>
{% endif %}
{% endif %}
{% else %}
<p>
<strong>Sorry, this functionality only works with fetchers that support interactive Javascript (so far only Playwright based fetchers)<br>
You need to <a href="#request">Set the fetch method</a> to one that supports interactive Javascript.</strong>
</p>
{# "This functionality needs chrome.." #}
{{ only_playwright_type_watches_warning() }}
{% endif %}
</div>
@@ -263,7 +266,7 @@ Math: {{ 1 + 1 }}") }}
<div class="pure-control-group inline-radio">
{{ render_ternary_field(form.notification_muted, BooleanField=true) }}
</div>
{% if capabilities.supports_screenshots %}
{% if watch_needs_selenium_or_playwright %}
<div class="pure-control-group inline-radio">
{{ render_checkbox_field(form.notification_screenshot) }}
<span class="pure-form-message-inline">
@@ -380,33 +383,35 @@ Math: {{ 1 + 1 }}") }}
<fieldset>
<div class="pure-control-group">
{% if capabilities.supports_screenshots and capabilities.supports_xpath_element_data %}
{% if visual_selector_data_ready %}
<span class="pure-form-message-inline" id="visual-selector-heading">
The Visual Selector tool lets you select the <i>text</i> elements that will be used for the change detection. It automatically fills-in the filters in the "CSS/JSONPath/JQ/XPath Filters" box of the <a href="#filters-and-triggers">Filters & Triggers</a> tab. Use <strong>Shift+Click</strong> to select multiple items.
</span>
{% if watch_needs_selenium_or_playwright %}
{% if system_has_playwright_configured %}
<span class="pure-form-message-inline" id="visual-selector-heading">
The Visual Selector tool lets you select the <i>text</i> elements that will be used for the change detection. It automatically fills-in the filters in the "CSS/JSONPath/JQ/XPath Filters" box of the <a href="#filters-and-triggers">Filters & Triggers</a> tab. Use <strong>Shift+Click</strong> to select multiple items.
</span>
<div id="selector-header">
<a id="clear-selector" class="pure-button button-secondary button-xsmall" style="font-size: 70%">Clear selection</a>
<!-- visual selector IMG will try to load, it will either replace this or on error replace it with some handy text -->
<i class="fetching-update-notice" style="font-size: 80%;">One moment, fetching screenshot and element information..</i>
</div>
<div id="selector-wrapper" style="display: none">
<!-- request the screenshot and get the element offset info ready -->
<!-- use img src ready load to know everything is ready to map out -->
<!-- @todo: maybe something interesting like a field to select 'elements that contain text... and their parents n' -->
<img id="selector-background" >
<canvas id="selector-canvas"></canvas>
</div>
<div id="selector-current-xpath" style="overflow-x: hidden"><strong>Currently:</strong>&nbsp;<span class="text">Loading...</span></div>
{% else %}
<strong>Visual Selector data is not ready, watch needs to be checked atleast once.</strong>
{% endif %}
<div id="selector-header">
<a id="clear-selector" class="pure-button button-secondary button-xsmall" style="font-size: 70%">Clear selection</a>
<!-- visual selector IMG will try to load, it will either replace this or on error replace it with some handy text -->
<i class="fetching-update-notice" style="font-size: 80%;">One moment, fetching screenshot and element information..</i>
</div>
<div id="selector-wrapper" style="display: none">
<!-- request the screenshot and get the element offset info ready -->
<!-- use img src ready load to know everything is ready to map out -->
<!-- @todo: maybe something interesting like a field to select 'elements that contain text... and their parents n' -->
<img id="selector-background" >
<canvas id="selector-canvas"></canvas>
</div>
<div id="selector-current-xpath" style="overflow-x: hidden"><strong>Currently:</strong>&nbsp;<span class="text">Loading...</span></div>
{% else %}
{# The watch needed chrome but system says that playwright is not ready #}
{{ playwright_warning() }}
{% endif %}
{% if system_has_webdriver_configured %}
<strong>Selenium/Webdriver cant be used here because it wont fetch screenshots reliably.</strong>
{% endif %}
{% else %}
<p>
<strong>Sorry, this functionality only works with fetchers that support Javascript and screenshots (such as playwright etc).<br>
You need to <a href="#request">Set the fetch method</a> to one that supports Javascript and screenshots.</strong>
</p>
{# "This functionality needs chrome.." #}
{{ only_playwright_type_watches_warning() }}
{% endif %}
</div>
</fieldset>

View File

@@ -57,26 +57,22 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
except Exception as e:
content.append({'line': f"File doesnt exist or unable to read timestamp {timestamp}", 'classes': ''})
from changedetectionio.pluggy_interface import get_fetcher_capabilities
capabilities = get_fetcher_capabilities(watch, datastore)
output = render_template("preview.html",
capabilities=capabilities,
content=content,
current_diff_url=watch['url'],
current_version=timestamp,
history_n=watch.history_n,
extra_stylesheets=extra_stylesheets,
extra_title=f" - Diff - {watch.label} @ {timestamp}",
history_n=watch.history_n,
triggered_line_numbers=triggered_line_numbers,
current_diff_url=watch['url'],
screenshot=watch.get_screenshot(),
watch=watch,
uuid=uuid,
is_html_webdriver=is_html_webdriver,
last_error=watch['last_error'],
last_error_screenshot=watch.get_error_snapshot(),
last_error_text=watch.get_error_text(),
screenshot=watch.get_screenshot(),
triggered_line_numbers=triggered_line_numbers,
uuid=uuid,
versions=versions,
watch=watch,
last_error_screenshot=watch.get_error_snapshot(),
versions=versions
)
return output
@@ -178,31 +174,29 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
password_enabled_and_share_is_off = not datastore.data['settings']['application'].get('shared_diff_access')
datastore.set_last_viewed(uuid, time.time())
from changedetectionio.pluggy_interface import get_fetcher_capabilities
capabilities = get_fetcher_capabilities(watch, datastore)
return render_template("diff.html",
capabilities=capabilities,
current_diff_url=watch['url'],
extra_stylesheets=extra_stylesheets,
extra_title=f" - Diff - {watch.label}",
extract_form=extract_form,
from_version=str(from_version),
from_version_file_contents=from_version_file_contents,
is_html_webdriver=is_html_webdriver,
last_error=watch['last_error'],
last_error_screenshot=watch.get_error_snapshot(),
last_error_text=watch.get_error_text(),
left_sticky=True,
newest=to_version_file_contents,
newest_version_timestamp=dates[-1],
password_enabled_and_share_is_off=password_enabled_and_share_is_off,
screenshot=screenshot_url,
to_version=str(to_version),
to_version_file_contents=to_version_file_contents,
uuid=uuid,
versions=dates, # All except current/last
watch_a=watch
)
current_diff_url=watch['url'],
from_version=str(from_version),
to_version=str(to_version),
extra_stylesheets=extra_stylesheets,
extra_title=f" - Diff - {watch.label}",
extract_form=extract_form,
is_html_webdriver=is_html_webdriver,
last_error=watch['last_error'],
last_error_screenshot=watch.get_error_snapshot(),
last_error_text=watch.get_error_text(),
left_sticky=True,
newest=to_version_file_contents,
newest_version_timestamp=dates[-1],
password_enabled_and_share_is_off=password_enabled_and_share_is_off,
from_version_file_contents=from_version_file_contents,
to_version_file_contents=to_version_file_contents,
screenshot=screenshot_url,
uuid=uuid,
versions=dates, # All except current/last
watch_a=watch
)
@views_blueprint.route("/diff/<string:uuid>", methods=['GET'])
@login_optionally_required

View File

@@ -182,9 +182,11 @@ document.addEventListener('DOMContentLoaded', function() {
</div>
<div class="status-icons">
<a class="link-spread" href="{{url_for('ui.form_share_put_watch', uuid=watch.uuid)}}"><img src="{{url_for('static_content', group='images', filename='spread.svg')}}" class="status-icon icon icon-spread" title="Create a link to share watch config with others" ></a>
{%- set effective_fetcher = watch.get_fetch_backend if watch.get_fetch_backend != "system" else system_default_fetcher -%}
{%- if effective_fetcher and ("html_webdriver" in effective_fetcher or "html_" in effective_fetcher or "extra_browser_" in effective_fetcher) -%}
{{ effective_fetcher|fetcher_status_icons }}
{%- if watch.get_fetch_backend == "html_webdriver"
or ( watch.get_fetch_backend == "system" and system_default_fetcher == 'html_webdriver' )
or "extra_browser_" in watch.get_fetch_backend
-%}
<img class="status-icon" src="{{url_for('static_content', group='images', filename='google-chrome-icon.png')}}" alt="Using a Chrome browser" title="Using a Chrome browser" >
{%- endif -%}
{%- if watch.is_pdf -%}<img class="status-icon" src="{{url_for('static_content', group='images', filename='pdf-icon.svg')}}" alt="Converting PDF to text" >{%- endif -%}
{%- if watch.has_browser_steps -%}<img class="status-icon status-browsersteps" src="{{url_for('static_content', group='images', filename='steps.svg')}}" alt="Browser Steps is enabled" >{%- endif -%}
@@ -205,7 +207,7 @@ document.addEventListener('DOMContentLoaded', function() {
{%- if watch.get('restock') and watch['restock']['price'] != None -%}
{%- if watch['restock']['price'] != None -%}
<span class="restock-label price" title="Price">
{{ watch['restock']['price']|format_number_locale if watch['restock'].get('price') else '' }} {{ watch['restock'].get('currency','') }}
{{ watch['restock']['price']|format_number_locale }} {{ watch['restock']['currency'] }}
</span>
{%- endif -%}
{%- elif not watch.has_restock_info -%}
@@ -217,7 +219,7 @@ document.addEventListener('DOMContentLoaded', function() {
{#last_checked becomes fetch-start-time#}
<td class="last-checked" data-timestamp="{{ watch.last_checked }}" data-fetchduration={{ watch.fetch_time }} data-eta_complete="{{ watch.last_checked+watch.fetch_time }}" >
<div class="spinner-wrapper" style="display:none;" >
<span class="spinner"></span><span class="status-text">&nbsp;Checking now</span>
<span class="spinner"></span><span>&nbsp;Checking now</span>
</div>
<span class="innertext">{{watch|format_last_checked_time|safe}}</span>
</td>

View File

@@ -1,7 +1,3 @@
"""
Levenshtein distance and similarity plugin for text change detection.
Provides metrics for measuring text similarity between snapshots.
"""
import pluggy
from loguru import logger

View File

@@ -1,7 +1,3 @@
"""
Word count plugin for content analysis.
Provides word count metrics for snapshot content.
"""
import pluggy
from loguru import logger

View File

@@ -7,9 +7,6 @@ import os
# Visual Selector scraper - 'Button' is there because some sites have <button>OUT OF STOCK</button>.
visualselector_xpath_selectors = 'div,span,form,table,tbody,tr,td,a,p,ul,li,h1,h2,h3,h4,header,footer,section,article,aside,details,main,nav,section,summary,button'
# Import hookimpl from centralized pluggy interface
from changedetectionio.pluggy_interface import hookimpl
SCREENSHOT_MAX_HEIGHT_DEFAULT = 20000
SCREENSHOT_DEFAULT_QUALITY = 40
@@ -38,54 +35,17 @@ def available_fetchers():
# See the if statement at the bottom of this file for how we switch between playwright and webdriver
import inspect
p = []
# Get built-in fetchers (but skip plugin fetchers that were added via setattr)
for name, obj in inspect.getmembers(sys.modules[__name__], inspect.isclass):
if inspect.isclass(obj):
# @todo html_ is maybe better as fetcher_ or something
# In this case, make sure to edit the default one in store.py and fetch_site_status.py
if name.startswith('html_'):
# Skip plugin fetchers that were already registered
if name not in _plugin_fetchers:
t = tuple([name, obj.fetcher_description])
p.append(t)
# Get plugin fetchers from cache (already loaded at module init)
for name, fetcher_class in _plugin_fetchers.items():
if hasattr(fetcher_class, 'fetcher_description'):
t = tuple([name, fetcher_class.fetcher_description])
p.append(t)
else:
logger.warning(f"Plugin fetcher '{name}' does not have fetcher_description attribute")
t = tuple([name, obj.fetcher_description])
p.append(t)
return p
def get_plugin_fetchers():
"""Load and return all plugin fetchers from the centralized plugin manager."""
from changedetectionio.pluggy_interface import plugin_manager
fetchers = {}
try:
# Call the register_content_fetcher hook from all registered plugins
results = plugin_manager.hook.register_content_fetcher()
for result in results:
if result:
name, fetcher_class = result
fetchers[name] = fetcher_class
# Register in current module so hasattr() checks work
setattr(sys.modules[__name__], name, fetcher_class)
logger.info(f"Registered plugin fetcher: {name} - {getattr(fetcher_class, 'fetcher_description', 'No description')}")
except Exception as e:
logger.error(f"Error loading plugin fetchers: {e}")
return fetchers
# Initialize plugins at module load time
_plugin_fetchers = get_plugin_fetchers()
# Decide which is the 'real' HTML webdriver, this is more a system wide config
# rather than site-specific.
use_playwright_as_chrome_fetcher = os.getenv('PLAYWRIGHT_DRIVER_URL', False)
@@ -102,8 +62,3 @@ else:
logger.debug("Falling back to selenium as fetcher")
from .webdriver_selenium import fetcher as html_webdriver
# Register built-in fetchers as plugins after all imports are complete
from changedetectionio.pluggy_interface import register_builtin_fetchers
register_builtin_fetchers()

View File

@@ -64,30 +64,6 @@ class Fetcher():
# Time ONTOP of the system defined env minimum time
render_extract_delay = 0
# Fetcher capability flags - subclasses should override these
# These indicate what features the fetcher supports
supports_browser_steps = False # Can execute browser automation steps
supports_screenshots = False # Can capture page screenshots
supports_xpath_element_data = False # Can extract xpath element positions/data for visual selector
@classmethod
def get_status_icon_data(cls):
"""Return data for status icon to display in the watch overview.
This method can be overridden by subclasses to provide custom status icons.
Returns:
dict or None: Dictionary with icon data:
{
'filename': 'icon-name.svg', # Icon filename
'alt': 'Alt text', # Alt attribute
'title': 'Tooltip text', # Title attribute
'style': 'height: 1em;' # Optional inline CSS
}
Or None if no icon
"""
return None
def clear_content(self):
"""
Explicitly clear all content from memory to free up heap space.
@@ -116,7 +92,6 @@ class Fetcher():
request_method=None,
timeout=None,
url=None,
watch_uuid=None,
):
# Should set self.error, self.status_code and self.content
pass
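
The base-class lines removed above defined per-fetcher capability flags and a get_status_icon_data() classmethod. A hedged sketch of a subclass using them (the base module path is assumed from the project layout):

```python
# Hedged sketch: a fetcher subclass using the capability flags and
# get_status_icon_data() hook removed from the base class above.
# The import path for the base class is an assumption.
from changedetectionio.content_fetchers.base import Fetcher


class html_example(Fetcher):
    fetcher_description = "Example JS-capable fetcher (illustrative only)"

    # Capability flags as defined by the removed base-class attributes
    supports_browser_steps = True
    supports_screenshots = True
    supports_xpath_element_data = True

    @classmethod
    def get_status_icon_data(cls):
        # Shape follows the removed docstring: filename/alt/title, optional style
        return {
            'filename': 'google-chrome-icon.png',
            'alt': 'Using a Chrome browser',
            'title': 'Using a Chrome browser',
        }
```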

View File

@@ -89,20 +89,6 @@ class fetcher(Fetcher):
proxy = None
# Capability flags
supports_browser_steps = True
supports_screenshots = True
supports_xpath_element_data = True
@classmethod
def get_status_icon_data(cls):
"""Return Chrome browser icon data for Playwright fetcher."""
return {
'filename': 'google-chrome-icon.png',
'alt': 'Using a Chrome browser',
'title': 'Using a Chrome browser'
}
def __init__(self, proxy_override=None, custom_browser_connection_url=None):
super().__init__()
@@ -167,7 +153,6 @@ class fetcher(Fetcher):
request_method=None,
timeout=None,
url=None,
watch_uuid=None,
):
from playwright.async_api import async_playwright
@@ -345,17 +330,4 @@ class fetcher(Fetcher):
browser = None
# Plugin registration for built-in fetcher
class PlaywrightFetcherPlugin:
"""Plugin class that registers the Playwright fetcher as a built-in plugin."""
def register_content_fetcher(self):
"""Register the Playwright fetcher"""
return ('html_webdriver', fetcher)
# Create module-level instance for plugin registration
playwright_plugin = PlaywrightFetcherPlugin()

View File

@@ -98,20 +98,6 @@ class fetcher(Fetcher):
proxy = None
# Capability flags
supports_browser_steps = True
supports_screenshots = True
supports_xpath_element_data = True
@classmethod
def get_status_icon_data(cls):
"""Return Chrome browser icon data for Puppeteer fetcher."""
return {
'filename': 'google-chrome-icon.png',
'alt': 'Using a Chrome browser',
'title': 'Using a Chrome browser'
}
def __init__(self, proxy_override=None, custom_browser_connection_url=None):
super().__init__()
@@ -169,7 +155,6 @@ class fetcher(Fetcher):
request_method,
timeout,
url,
watch_uuid
):
import re
self.delete_browser_steps_screenshots()
@@ -377,7 +362,6 @@ class fetcher(Fetcher):
request_method=None,
timeout=None,
url=None,
watch_uuid=None,
):
#@todo make update_worker async which could run any of these content_fetchers within memory and time constraints
@@ -396,21 +380,7 @@ class fetcher(Fetcher):
request_method=request_method,
timeout=timeout,
url=url,
watch_uuid=watch_uuid,
), timeout=max_time
)
except asyncio.TimeoutError:
raise (BrowserFetchTimedOut(msg=f"Browser connected but was unable to process the page in {max_time} seconds."))
# Plugin registration for built-in fetcher
class PuppeteerFetcherPlugin:
"""Plugin class that registers the Puppeteer fetcher as a built-in plugin."""
def register_content_fetcher(self):
"""Register the Puppeteer fetcher"""
return ('html_webdriver', fetcher)
# Create module-level instance for plugin registration
puppeteer_plugin = PuppeteerFetcherPlugin()

View File

@@ -1,7 +1,6 @@
from loguru import logger
import hashlib
import os
import re
import asyncio
from changedetectionio import strtobool
from changedetectionio.content_fetchers.exceptions import BrowserStepsInUnsupportedFetcher, EmptyReply, Non200ErrorCodeReceived
@@ -26,9 +25,7 @@ class fetcher(Fetcher):
ignore_status_codes=False,
current_include_filters=None,
is_binary=False,
empty_pages_are_a_change=False,
watch_uuid=None,
):
empty_pages_are_a_change=False):
"""Synchronous version of run - the original requests implementation"""
import chardet
@@ -79,22 +76,9 @@ class fetcher(Fetcher):
if not is_binary:
# Don't run this for PDF (and requests identified as binary) takes a _long_ time
if not r.headers.get('content-type') or not 'charset=' in r.headers.get('content-type'):
# For XML/RSS feeds, check the XML declaration for encoding attribute
# This is more reliable than chardet which can misdetect UTF-8 as MacRoman
content_type = r.headers.get('content-type', '').lower()
if 'xml' in content_type or 'rss' in content_type:
# Look for <?xml version="1.0" encoding="UTF-8"?>
xml_encoding_match = re.search(rb'<\?xml[^>]+encoding=["\']([^"\']+)["\']', r.content[:200])
if xml_encoding_match:
r.encoding = xml_encoding_match.group(1).decode('ascii')
else:
# Default to UTF-8 for XML if no encoding found
r.encoding = 'utf-8'
else:
# For other content types, use chardet
encoding = chardet.detect(r.content)['encoding']
if encoding:
r.encoding = encoding
encoding = chardet.detect(r.content)['encoding']
if encoding:
r.encoding = encoding
self.headers = r.headers
@@ -131,7 +115,6 @@ class fetcher(Fetcher):
request_method=None,
timeout=None,
url=None,
watch_uuid=None,
):
"""Async wrapper that runs the synchronous requests code in a thread pool"""
@@ -149,8 +132,7 @@ class fetcher(Fetcher):
ignore_status_codes=ignore_status_codes,
current_include_filters=current_include_filters,
is_binary=is_binary,
empty_pages_are_a_change=empty_pages_are_a_change,
watch_uuid=watch_uuid,
empty_pages_are_a_change=empty_pages_are_a_change
)
)
@@ -167,15 +149,3 @@ class fetcher(Fetcher):
except Exception as e:
logger.warning(f"Failed to unlink screenshot: {screenshot} - {e}")
# Plugin registration for built-in fetcher
class RequestsFetcherPlugin:
"""Plugin class that registers the requests fetcher as a built-in plugin."""
def register_content_fetcher(self):
"""Register the requests fetcher"""
return ('html_requests', fetcher)
# Create module-level instance for plugin registration
requests_plugin = RequestsFetcherPlugin()
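
The branch removed above sniffs the XML declaration for an encoding attribute before falling back to chardet. A standalone sketch of that check:

```python
# Standalone sketch of the XML-declaration encoding sniff removed above,
# usable before falling back to chardet for non-XML responses.
import re

def detect_xml_encoding(raw: bytes, default: str = 'utf-8') -> str:
    # Look for e.g. <?xml version="1.0" encoding="ISO-8859-1"?> near the start
    match = re.search(rb'<\?xml[^>]+encoding=["\']([^"\']+)["\']', raw[:200])
    return match.group(1).decode('ascii') if match else default

print(detect_xml_encoding(b'<?xml version="1.0" encoding="ISO-8859-1"?>\n<rss/>'))  # ISO-8859-1
```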

View File

@@ -14,20 +14,6 @@ class fetcher(Fetcher):
proxy = None
proxy_url = None
# Capability flags
supports_browser_steps = True
supports_screenshots = True
supports_xpath_element_data = True
@classmethod
def get_status_icon_data(cls):
"""Return Chrome browser icon data for WebDriver fetcher."""
return {
'filename': 'google-chrome-icon.png',
'alt': 'Using a Chrome browser',
'title': 'Using a Chrome browser'
}
def __init__(self, proxy_override=None, custom_browser_connection_url=None):
super().__init__()
from urllib.parse import urlparse
@@ -71,7 +57,6 @@ class fetcher(Fetcher):
request_method=None,
timeout=None,
url=None,
watch_uuid=None,
):
import asyncio
@@ -156,16 +141,3 @@ class fetcher(Fetcher):
# Run the selenium operations in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _run_sync)
# Plugin registration for built-in fetcher
class WebDriverSeleniumFetcherPlugin:
"""Plugin class that registers the WebDriver Selenium fetcher as a built-in plugin."""
def register_content_fetcher(self):
"""Register the WebDriver Selenium fetcher"""
return ('html_webdriver', fetcher)
# Create module-level instance for plugin registration
webdriver_selenium_plugin = WebDriverSeleniumFetcherPlugin()

View File

@@ -81,28 +81,6 @@ if os.getenv('FLASK_SERVER_NAME'):
# Disables caching of the templates
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.jinja_env.add_extension('jinja2.ext.loopcontrols')
# Configure Jinja2 to search for templates in plugin directories
def _configure_plugin_templates():
"""Configure Jinja2 loader to include plugin template directories."""
from jinja2 import ChoiceLoader, FileSystemLoader
from changedetectionio.pluggy_interface import get_plugin_template_paths
# Get plugin template paths
plugin_template_paths = get_plugin_template_paths()
if plugin_template_paths:
# Create a ChoiceLoader that searches app templates first, then plugin templates
loaders = [app.jinja_loader] # Keep the default app loader first
for path in plugin_template_paths:
loaders.append(FileSystemLoader(path))
app.jinja_loader = ChoiceLoader(loaders)
logger.info(f"Configured Jinja2 to search {len(plugin_template_paths)} plugin template directories")
# Configure plugin templates (called after plugins are loaded)
_configure_plugin_templates()
csrf = CSRFProtect()
csrf.init_app(app)
notification_debug_log=[]
@@ -232,55 +210,6 @@ def _jinja2_filter_seconds_precise(timestamp):
return format(int(time.time()-timestamp), ',d')
@app.template_filter('fetcher_status_icons')
def _jinja2_filter_fetcher_status_icons(fetcher_name):
"""Get status icon HTML for a given fetcher.
This filter checks both built-in fetchers and plugin fetchers for status icons.
Args:
fetcher_name: The fetcher name (e.g., 'html_webdriver', 'html_js_zyte')
Returns:
str: HTML string containing status icon elements
"""
from changedetectionio import content_fetchers
from changedetectionio.pluggy_interface import collect_fetcher_status_icons
from markupsafe import Markup
from flask import url_for
icon_data = None
# First check if it's a plugin fetcher (plugins have priority)
plugin_icon_data = collect_fetcher_status_icons(fetcher_name)
if plugin_icon_data:
icon_data = plugin_icon_data
# Check if it's a built-in fetcher
elif hasattr(content_fetchers, fetcher_name):
fetcher_class = getattr(content_fetchers, fetcher_name)
if hasattr(fetcher_class, 'get_status_icon_data'):
icon_data = fetcher_class.get_status_icon_data()
# Build HTML from icon data
if icon_data and isinstance(icon_data, dict):
# Use 'group' from icon_data if specified, otherwise default to 'images'
group = icon_data.get('group', 'images')
# Try to use url_for, but fall back to manual URL building if endpoint not registered yet
try:
icon_url = url_for('static_content', group=group, filename=icon_data['filename'])
except:
# Fallback: build URL manually respecting APPLICATION_ROOT
from flask import request
app_root = request.script_root if hasattr(request, 'script_root') else ''
icon_url = f"{app_root}/static/{group}/{icon_data['filename']}"
style_attr = f' style="{icon_data["style"]}"' if icon_data.get('style') else ''
html = f'<img class="status-icon" src="{icon_url}" alt="{icon_data["alt"]}" title="{icon_data["title"]}"{style_attr}>'
return Markup(html)
return ''
# Import login_optionally_required from auth_decorator
from changedetectionio.auth_decorator import login_optionally_required
@@ -559,31 +488,6 @@ def changedetection_app(config=None, datastore_o=None):
except FileNotFoundError:
abort(404)
# Handle plugin group specially
if group == 'plugin':
# Serve files from plugin static directories
from changedetectionio.pluggy_interface import plugin_manager
import os as os_check
for plugin_name, plugin_obj in plugin_manager.list_name_plugin():
if hasattr(plugin_obj, 'plugin_static_path'):
try:
static_path = plugin_obj.plugin_static_path()
if static_path and os_check.path.isdir(static_path):
# Check if file exists in plugin's static directory
plugin_file_path = os_check.path.join(static_path, filename)
if os_check.path.isfile(plugin_file_path):
# Found the file in a plugin
response = make_response(send_from_directory(static_path, filename))
response.headers['Cache-Control'] = 'max-age=3600, public' # Cache for 1 hour
return response
except Exception as e:
logger.debug(f"Error checking plugin {plugin_name} for static file: {e}")
pass
# File not found in any plugin
abort(404)
# These files should be in our subdirectory
try:
return send_from_directory(f"static/{group}", path=filename)
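
The removed _configure_plugin_templates() helper chained the Flask app's template loader with plugin template directories. A standalone sketch of the same pattern, with hypothetical paths:

```python
# Standalone sketch of the removed _configure_plugin_templates() pattern:
# chain the Flask app's loader with per-plugin FileSystemLoaders.
from flask import Flask
from jinja2 import ChoiceLoader, FileSystemLoader

app = Flask(__name__)
plugin_template_paths = ['/opt/plugins/example/templates']  # hypothetical

if plugin_template_paths:
    loaders = [app.jinja_loader]  # keep the default app loader first so app templates win
    loaders.extend(FileSystemLoader(path) for path in plugin_template_paths)
    app.jinja_loader = ChoiceLoader(loaders)
```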

View File

@@ -172,131 +172,99 @@ def elementpath_tostring(obj):
return str(obj)
# Return str Utf-8 of matched rules
def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_xml=False):
"""
:param xpath_filter:
:param html_content:
:param append_pretty_line_formatting:
:param is_xml: set to true if is XML or is RSS (RSS is XML)
:return:
"""
def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_rss=False):
from lxml import etree, html
import elementpath
# xpath 2.0-3.1
from elementpath.xpath3 import XPath3Parser
parser = etree.HTMLParser()
tree = None
try:
if is_xml:
# So that we can keep CDATA for cdata_in_document_to_text() to process
parser = etree.XMLParser(strip_cdata=False)
# For XML/RSS content, use etree.fromstring to properly handle XML declarations
tree = etree.fromstring(html_content.encode('utf-8') if isinstance(html_content, str) else html_content, parser=parser)
if is_rss:
# So that we can keep CDATA for cdata_in_document_to_text() to process
parser = etree.XMLParser(strip_cdata=False)
tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
html_block = ""
# Build namespace map for XPath queries
namespaces = {'re': 'http://exslt.org/regular-expressions'}
# Handle default namespace in documents (common in RSS/Atom feeds, but can occur in any XML)
# XPath spec: unprefixed element names have no namespace, not the default namespace
# Solution: Register the default namespace with empty string prefix in elementpath
# This is primarily for RSS/Atom feeds but works for any XML with default namespace
if hasattr(tree, 'nsmap') and tree.nsmap and None in tree.nsmap:
# Register the default namespace with empty string prefix for elementpath
# This allows //title to match elements in the default namespace
namespaces[''] = tree.nsmap[None]
r = elementpath.select(tree, xpath_filter.strip(), namespaces=namespaces, parser=XPath3Parser)
#@note: //title/text() now works with default namespaces (fixed by registering '' prefix)
#@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first)
if type(r) != list:
r = [r]
for element in r:
# When there's more than 1 match, then add the suffix to separate each line
# And where the matched result doesn't include something that will cause Inscriptis to add a newline
# (This way each 'match' reliably has a new-line in the diff)
# Divs are converted to 4 whitespaces by inscriptis
if append_pretty_line_formatting and len(html_block) and (not hasattr( element, 'tag' ) or not element.tag in (['br', 'hr', 'div', 'p'])):
html_block += TEXT_FILTER_LIST_LINE_SUFFIX
if type(element) == str:
html_block += element
elif issubclass(type(element), etree._Element) or issubclass(type(element), etree._ElementTree):
html_block += etree.tostring(element, pretty_print=True).decode('utf-8')
else:
tree = html.fromstring(html_content, parser=parser)
html_block = ""
html_block += elementpath_tostring(element)
# Build namespace map for XPath queries
namespaces = {'re': 'http://exslt.org/regular-expressions'}
# Handle default namespace in documents (common in RSS/Atom feeds, but can occur in any XML)
# XPath spec: unprefixed element names have no namespace, not the default namespace
# Solution: Register the default namespace with empty string prefix in elementpath
# This is primarily for RSS/Atom feeds but works for any XML with default namespace
if hasattr(tree, 'nsmap') and tree.nsmap and None in tree.nsmap:
# Register the default namespace with empty string prefix for elementpath
# This allows //title to match elements in the default namespace
namespaces[''] = tree.nsmap[None]
r = elementpath.select(tree, xpath_filter.strip(), namespaces=namespaces, parser=XPath3Parser)
#@note: //title/text() now works with default namespaces (fixed by registering '' prefix)
#@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first)
if type(r) != list:
r = [r]
for element in r:
# When there's more than 1 match, then add the suffix to separate each line
# And where the matched result doesn't include something that will cause Inscriptis to add a newline
# (This way each 'match' reliably has a new-line in the diff)
# Divs are converted to 4 whitespaces by inscriptis
if append_pretty_line_formatting and len(html_block) and (not hasattr( element, 'tag' ) or not element.tag in (['br', 'hr', 'div', 'p'])):
html_block += TEXT_FILTER_LIST_LINE_SUFFIX
if type(element) == str:
html_block += element
elif issubclass(type(element), etree._Element) or issubclass(type(element), etree._ElementTree):
# Use 'xml' method for RSS/XML content, 'html' for HTML content
# parser will be XMLParser if we detected XML content
method = 'xml' if (is_xml or isinstance(parser, etree.XMLParser)) else 'html'
html_block += etree.tostring(element, pretty_print=True, method=method, encoding='unicode')
else:
html_block += elementpath_tostring(element)
return html_block
finally:
# Explicitly clear the tree to free memory
# lxml trees can hold significant memory, especially with large documents
if tree is not None:
tree.clear()
return html_block
# Return str Utf-8 of matched rules
# 'xpath1:'
def xpath1_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_xml=False):
def xpath1_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_rss=False):
from lxml import etree, html
parser = None
tree = None
try:
if is_xml:
# So that we can keep CDATA for cdata_in_document_to_text() to process
parser = etree.XMLParser(strip_cdata=False)
# For XML/RSS content, use etree.fromstring to properly handle XML declarations
tree = etree.fromstring(html_content.encode('utf-8') if isinstance(html_content, str) else html_content, parser=parser)
if is_rss:
# So that we can keep CDATA for cdata_in_document_to_text() to process
parser = etree.XMLParser(strip_cdata=False)
tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
html_block = ""
# Build namespace map for XPath queries
namespaces = {'re': 'http://exslt.org/regular-expressions'}
# NOTE: lxml's native xpath() does NOT support empty string prefix for default namespace
# For documents with default namespace (RSS/Atom feeds), users must use:
# - local-name(): //*[local-name()='title']/text()
# - Or use xpath_filter (not xpath1_filter) which supports default namespaces
# XPath spec: unprefixed element names have no namespace, not the default namespace
r = tree.xpath(xpath_filter.strip(), namespaces=namespaces)
#@note: xpath1 (lxml) does NOT automatically handle default namespaces
#@note: Use //*[local-name()='element'] or switch to xpath_filter for default namespace support
#@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first)
for element in r:
# When there's more than 1 match, then add the suffix to separate each line
# And where the matched result doesn't include something that will cause Inscriptis to add a newline
# (This way each 'match' reliably has a new-line in the diff)
# Divs are converted to 4 whitespaces by inscriptis
if append_pretty_line_formatting and len(html_block) and (not hasattr(element, 'tag') or not element.tag in (['br', 'hr', 'div', 'p'])):
html_block += TEXT_FILTER_LIST_LINE_SUFFIX
# Some kind of text, UTF-8 or other
if isinstance(element, (str, bytes)):
html_block += element
else:
tree = html.fromstring(html_content, parser=parser)
html_block = ""
# Return the HTML which will get parsed as text
html_block += etree.tostring(element, pretty_print=True).decode('utf-8')
# Build namespace map for XPath queries
namespaces = {'re': 'http://exslt.org/regular-expressions'}
# NOTE: lxml's native xpath() does NOT support empty string prefix for default namespace
# For documents with default namespace (RSS/Atom feeds), users must use:
# - local-name(): //*[local-name()='title']/text()
# - Or use xpath_filter (not xpath1_filter) which supports default namespaces
# XPath spec: unprefixed element names have no namespace, not the default namespace
r = tree.xpath(xpath_filter.strip(), namespaces=namespaces)
#@note: xpath1 (lxml) does NOT automatically handle default namespaces
#@note: Use //*[local-name()='element'] or switch to xpath_filter for default namespace support
#@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first)
for element in r:
# When there's more than 1 match, then add the suffix to separate each line
# And where the matched result doesn't include something that will cause Inscriptis to add a newline
# (This way each 'match' reliably has a new-line in the diff)
# Divs are converted to 4 whitespaces by inscriptis
if append_pretty_line_formatting and len(html_block) and (not hasattr(element, 'tag') or not element.tag in (['br', 'hr', 'div', 'p'])):
html_block += TEXT_FILTER_LIST_LINE_SUFFIX
# Some kind of text, UTF-8 or other
if isinstance(element, (str, bytes)):
html_block += element
else:
# Return the HTML/XML which will get parsed as text
# Use 'xml' method for RSS/XML content, 'html' for HTML content
# parser will be XMLParser if we detected XML content
method = 'xml' if (is_xml or isinstance(parser, etree.XMLParser)) else 'html'
html_block += etree.tostring(element, pretty_print=True, method=method, encoding='unicode')
return html_block
finally:
# Explicitly clear the tree to free memory
# lxml trees can hold significant memory, especially with large documents
if tree is not None:
tree.clear()
return html_block
# Extract/find element
def extract_element(find='title', html_content=''):
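
Both xpath_filter() variants above register a document's default namespace under the empty-string prefix so that unprefixed XPath steps match RSS/Atom elements. A standalone sketch of that technique with elementpath:

```python
# Sketch of the default-namespace handling kept in xpath_filter(): bind the
# document's default namespace to the '' prefix so //title matches Atom elements.
from lxml import etree
import elementpath
from elementpath.xpath3 import XPath3Parser

atom = b'<feed xmlns="http://www.w3.org/2005/Atom"><title>Example</title></feed>'
tree = etree.fromstring(atom)

namespaces = {'re': 'http://exslt.org/regular-expressions'}
if tree.nsmap and None in tree.nsmap:
    # Register the default namespace with the empty-string prefix for elementpath
    namespaces[''] = tree.nsmap[None]

print(elementpath.select(tree, '//title/text()', namespaces=namespaces, parser=XPath3Parser))
# expected: ['Example']
```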

View File

@@ -2,7 +2,6 @@ import pluggy
import os
import importlib
import sys
from loguru import logger
# Global plugin namespace for changedetection.io
PLUGIN_NAMESPACE = "changedetectionio"
@@ -17,94 +16,15 @@ class ChangeDetectionSpec:
@hookspec
def ui_edit_stats_extras(watch):
"""Return HTML content to add to the stats tab in the edit view.
Args:
watch: The watch object being edited
Returns:
str: HTML content to be inserted in the stats tab
"""
pass
@hookspec
def register_content_fetcher(self):
"""Return a tuple of (fetcher_name, fetcher_class) for content fetcher plugins.
The fetcher_name should start with 'html_' and the fetcher_class
should inherit from changedetectionio.content_fetchers.base.Fetcher
Returns:
tuple: (str: fetcher_name, class: fetcher_class)
"""
pass
@hookspec
def fetcher_status_icon(fetcher_name):
"""Return status icon HTML attributes for a content fetcher.
Args:
fetcher_name: The name of the fetcher (e.g., 'html_webdriver', 'html_js_zyte')
Returns:
str: HTML string containing <img> tags or other status icon elements
Empty string if no custom status icon is needed
"""
pass
@hookspec
def plugin_static_path(self):
"""Return the path to the plugin's static files directory.
Returns:
str: Absolute path to the plugin's static directory, or None if no static files
"""
pass
@hookspec
def get_itemprop_availability_override(self, content, fetcher_name, fetcher_instance, url):
"""Provide custom implementation of get_itemprop_availability for a specific fetcher.
This hook allows plugins to provide their own product availability detection
when their fetcher is being used. This is called as a fallback when the built-in
method doesn't find good data.
Args:
content: The HTML/text content to parse
fetcher_name: The name of the fetcher being used (e.g., 'html_js_zyte')
fetcher_instance: The fetcher instance that generated the content
url: The URL being watched/checked
Returns:
dict or None: Dictionary with availability data:
{
'price': float or None,
'availability': str or None, # e.g., 'in stock', 'out of stock'
'currency': str or None, # e.g., 'USD', 'EUR'
}
Or None if this plugin doesn't handle this fetcher or couldn't extract data
"""
pass
@hookspec
def plugin_settings_tab(self):
"""Return settings tab information for this plugin.
This hook allows plugins to add their own settings tab to the settings page.
Settings will be saved to a separate JSON file in the datastore directory.
Returns:
dict or None: Dictionary with settings tab information:
{
'plugin_id': str, # Unique identifier (e.g., 'zyte_fetcher')
'tab_label': str, # Display name for tab (e.g., 'Zyte Fetcher')
'form_class': Form, # WTForms Form class for the settings
'template_path': str, # Optional: path to Jinja2 template (relative to plugin)
# If not provided, a default form renderer will be used
}
Or None if this plugin doesn't provide settings
"""
pass
# Set up Plugin Manager
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
@@ -145,311 +65,18 @@ load_plugins_from_directories()
# Discover installed plugins from external packages (if any)
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)
# Function to inject datastore into plugins that need it
def inject_datastore_into_plugins(datastore):
"""Inject the global datastore into plugins that need access to settings.
This should be called after plugins are loaded and datastore is initialized.
Args:
datastore: The global ChangeDetectionStore instance
"""
for plugin_name, plugin_obj in plugin_manager.list_name_plugin():
# Check if plugin has datastore attribute and it's not set
if hasattr(plugin_obj, 'datastore'):
if plugin_obj.datastore is None:
plugin_obj.datastore = datastore
logger.debug(f"Injected datastore into plugin: {plugin_name}")
# Function to register built-in fetchers - called later from content_fetchers/__init__.py
def register_builtin_fetchers():
"""Register built-in content fetchers as internal plugins
This is called from content_fetchers/__init__.py after all fetchers are imported
to avoid circular import issues.
"""
from changedetectionio.content_fetchers import requests, playwright, puppeteer, webdriver_selenium
# Register each built-in fetcher plugin
if hasattr(requests, 'requests_plugin'):
plugin_manager.register(requests.requests_plugin, 'builtin_requests')
if hasattr(playwright, 'playwright_plugin'):
plugin_manager.register(playwright.playwright_plugin, 'builtin_playwright')
if hasattr(puppeteer, 'puppeteer_plugin'):
plugin_manager.register(puppeteer.puppeteer_plugin, 'builtin_puppeteer')
if hasattr(webdriver_selenium, 'webdriver_selenium_plugin'):
plugin_manager.register(webdriver_selenium.webdriver_selenium_plugin, 'builtin_webdriver_selenium')
# Helper function to collect UI stats extras from all plugins
def collect_ui_edit_stats_extras(watch):
"""Collect and combine HTML content from all plugins that implement ui_edit_stats_extras"""
extras_content = []
# Get all plugins that implement the ui_edit_stats_extras hook
results = plugin_manager.hook.ui_edit_stats_extras(watch=watch)
# If we have results, add them to our content
if results:
for result in results:
if result: # Skip empty results
extras_content.append(result)
return "\n".join(extras_content) if extras_content else ""
def collect_fetcher_status_icons(fetcher_name):
"""Collect status icon data from all plugins
Args:
fetcher_name: The name of the fetcher (e.g., 'html_webdriver', 'html_js_zyte')
Returns:
dict or None: Icon data dictionary from first matching plugin, or None
"""
# Get status icon data from plugins
results = plugin_manager.hook.fetcher_status_icon(fetcher_name=fetcher_name)
# Return first non-None result
if results:
for result in results:
if result and isinstance(result, dict):
return result
return None
def get_itemprop_availability_from_plugin(content, fetcher_name, fetcher_instance, url):
"""Get itemprop availability data from plugins as a fallback.
This is called when the built-in get_itemprop_availability doesn't find good data.
Args:
content: The HTML/text content to parse
fetcher_name: The name of the fetcher being used (e.g., 'html_js_zyte')
fetcher_instance: The fetcher instance that generated the content
url: The URL being watched (watch.link - includes Jinja2 evaluation)
Returns:
dict or None: Availability data dictionary from first matching plugin, or None
"""
# Get availability data from plugins
results = plugin_manager.hook.get_itemprop_availability_override(
content=content,
fetcher_name=fetcher_name,
fetcher_instance=fetcher_instance,
url=url
)
# Return first non-None result with actual data
if results:
for result in results:
if result and isinstance(result, dict):
# Check if the result has any meaningful data
if result.get('price') is not None or result.get('availability'):
return result
return None
def get_active_plugins():
"""Get a list of active plugins with their descriptions.
Returns:
list: List of dictionaries with plugin information:
[
{'name': 'plugin_name', 'description': 'Plugin description'},
...
]
"""
active_plugins = []
# Get all registered plugins
for plugin_name, plugin_obj in plugin_manager.list_name_plugin():
# Skip built-in plugins (they start with 'builtin_')
if plugin_name.startswith('builtin_'):
continue
# Get plugin description if available
description = None
if hasattr(plugin_obj, '__doc__') and plugin_obj.__doc__:
description = plugin_obj.__doc__.strip().split('\n')[0] # First line only
elif hasattr(plugin_obj, 'description'):
description = plugin_obj.description
# Try to get a friendly name from the plugin
friendly_name = plugin_name
if hasattr(plugin_obj, 'name'):
friendly_name = plugin_obj.name
active_plugins.append({
'name': friendly_name,
'description': description or 'No description available'
})
return active_plugins
def get_fetcher_capabilities(watch, datastore):
"""Get capability flags for a watch's fetcher.
Args:
watch: The watch object/dict
datastore: The datastore to resolve 'system' fetcher
Returns:
dict: Dictionary with capability flags:
{
'supports_browser_steps': bool,
'supports_screenshots': bool,
'supports_xpath_element_data': bool
}
"""
# Get the fetcher name from watch
fetcher_name = watch.get('fetch_backend', 'system')
# Resolve 'system' to actual fetcher
if fetcher_name == 'system':
fetcher_name = datastore.data['settings']['application'].get('fetch_backend', 'html_requests')
# Get the fetcher class
from changedetectionio import content_fetchers
# Try to get from built-in fetchers first
if hasattr(content_fetchers, fetcher_name):
fetcher_class = getattr(content_fetchers, fetcher_name)
return {
'supports_browser_steps': getattr(fetcher_class, 'supports_browser_steps', False),
'supports_screenshots': getattr(fetcher_class, 'supports_screenshots', False),
'supports_xpath_element_data': getattr(fetcher_class, 'supports_xpath_element_data', False)
}
# Try to get from plugin-provided fetchers
# Query all plugins for registered fetchers
plugin_fetchers = plugin_manager.hook.register_content_fetcher()
for fetcher_registration in plugin_fetchers:
if fetcher_registration:
name, fetcher_class = fetcher_registration
if name == fetcher_name:
return {
'supports_browser_steps': getattr(fetcher_class, 'supports_browser_steps', False),
'supports_screenshots': getattr(fetcher_class, 'supports_screenshots', False),
'supports_xpath_element_data': getattr(fetcher_class, 'supports_xpath_element_data', False)
}
# Default: no capabilities
return {
'supports_browser_steps': False,
'supports_screenshots': False,
'supports_xpath_element_data': False
}
def get_plugin_settings_tabs():
"""Get all plugin settings tabs.
Returns:
list: List of dictionaries with plugin settings tab information:
[
{
'plugin_id': str,
'tab_label': str,
'form_class': Form,
'description': str
},
...
]
"""
tabs = []
results = plugin_manager.hook.plugin_settings_tab()
for result in results:
if result and isinstance(result, dict):
# Validate required fields
if 'plugin_id' in result and 'tab_label' in result and 'form_class' in result:
tabs.append(result)
else:
logger.warning(f"Invalid plugin settings tab spec: {result}")
return tabs
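# Sketch of instantiating each registered settings form, assuming `form_class`
# is a WTForms-style class as the spec above implies; purely illustrative.
def _example_build_plugin_settings_forms():
    return {tab['plugin_id']: tab['form_class']() for tab in get_plugin_settings_tabs()}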
def load_plugin_settings(datastore_path, plugin_id):
"""Load settings for a specific plugin from JSON file.
Args:
datastore_path: Path to the datastore directory
plugin_id: Unique identifier for the plugin (e.g., 'zyte_fetcher')
Returns:
dict: Plugin settings, or empty dict if file doesn't exist
"""
import json
settings_file = os.path.join(datastore_path, f"{plugin_id}.json")
if not os.path.exists(settings_file):
return {}
try:
with open(settings_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to load settings for plugin '{plugin_id}': {e}")
return {}
def save_plugin_settings(datastore_path, plugin_id, settings):
"""Save settings for a specific plugin to JSON file.
Args:
datastore_path: Path to the datastore directory
plugin_id: Unique identifier for the plugin (e.g., 'zyte_fetcher')
settings: Dictionary of settings to save
Returns:
bool: True if save was successful, False otherwise
"""
import json
settings_file = os.path.join(datastore_path, f"{plugin_id}.json")
try:
with open(settings_file, 'w', encoding='utf-8') as f:
json.dump(settings, f, indent=2, ensure_ascii=False)
logger.info(f"Saved settings for plugin '{plugin_id}' to {settings_file}")
return True
except Exception as e:
logger.error(f"Failed to save settings for plugin '{plugin_id}': {e}")
return False
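# Round-trip sketch for the two helpers above; 'zyte_fetcher' comes from the
# docstrings, while 'api_key' is a hypothetical setting key used only for illustration.
def _example_settings_roundtrip(datastore_path):
    settings = load_plugin_settings(datastore_path, 'zyte_fetcher')
    settings['api_key'] = 'changeme'  # hypothetical setting key
    return save_plugin_settings(datastore_path, 'zyte_fetcher', settings)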
def get_plugin_template_paths():
"""Get list of plugin template directories for Jinja2 loader.
Returns:
list: List of absolute paths to plugin template directories
"""
template_paths = []
# Get all registered plugins
for plugin_name, plugin_obj in plugin_manager.list_name_plugin():
# Check if plugin has a templates directory
if hasattr(plugin_obj, '__file__'):
plugin_file = plugin_obj.__file__
elif hasattr(plugin_obj, '__module__'):
# Get the module file
module = sys.modules.get(plugin_obj.__module__)
if module and hasattr(module, '__file__'):
plugin_file = module.__file__
else:
continue
else:
continue
if plugin_file:
plugin_dir = os.path.dirname(os.path.abspath(plugin_file))
templates_dir = os.path.join(plugin_dir, 'templates')
if os.path.isdir(templates_dir):
template_paths.append(templates_dir)
logger.debug(f"Added plugin template path: {templates_dir}")
return template_paths
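# Sketch of wiring these paths into Jinja2 via a ChoiceLoader, assuming an
# existing application loader; the actual Flask wiring will differ.
def _example_jinja_loader(app_loader):
    from jinja2 import ChoiceLoader, FileSystemLoader
    return ChoiceLoader([app_loader, FileSystemLoader(get_plugin_template_paths())])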
return "\n".join(extras_content) if extras_content else ""

View File

@@ -23,7 +23,6 @@ class difference_detection_processor():
def __init__(self, *args, datastore, watch_uuid, **kwargs):
super().__init__(*args, **kwargs)
self.datastore = datastore
self.watch_uuid = watch_uuid
self.watch = deepcopy(self.datastore.data['watching'].get(watch_uuid))
# Generic fetcher that should be extended (requests, playwright etc)
self.fetcher = Fetcher()
@@ -161,7 +160,6 @@ class difference_detection_processor():
request_method=request_method,
timeout=timeout,
url=url,
watch_uuid=self.watch_uuid,
)
#@todo .quit here could go on close object, so we can run JS if change-detected

View File

@@ -103,15 +103,15 @@ class guess_stream_type():
self.is_json = True
elif 'pdf' in magic_content_header:
self.is_pdf = True
elif has_html_patterns or http_content_header == 'text/html':
self.is_html = True
elif any(s in magic_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
# magic will call an RSS document 'xml'
# Endpoints rarely give the right header (usually just text/xml), so we also check for <rss
# This also triggers the automatic CDATA text parser so the RSS comes back as a nice content list
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES) or '<rdf:' in test_content_normalized:
self.is_rss = True
elif has_html_patterns or http_content_header == 'text/html':
self.is_html = True
elif any(s in magic_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
elif any(s in http_content_header for s in XML_CONTENT_TYPES):
# Only mark as generic XML if not already detected as RSS
if not self.is_rss:

View File

@@ -187,8 +187,6 @@ class perform_site_check(difference_detection_processor):
itemprop_availability = {}
# Try built-in extraction first, this will scan metadata in the HTML
try:
itemprop_availability = get_itemprop_availability(self.fetcher.content)
except MoreThanOnePriceFound as e:
@@ -200,33 +198,6 @@ class perform_site_check(difference_detection_processor):
xpath_data=self.fetcher.xpath_data
)
# If built-in extraction didn't get both price AND availability, try plugin override
# Only check plugin if this watch is using a fetcher that might provide better data
has_price = itemprop_availability.get('price') is not None
has_availability = itemprop_availability.get('availability') is not None
# @TODO !!! some setting like "Use as fallback" or "always use", "t
if not (has_price and has_availability) or True:
from changedetectionio.pluggy_interface import get_itemprop_availability_from_plugin
fetcher_name = watch.get('fetch_backend', 'html_requests')
# Only try plugin override if not using system default (which might be anything)
if fetcher_name and fetcher_name != 'system':
logger.debug("Calling extra plugins for getting item price/availability")
plugin_availability = get_itemprop_availability_from_plugin(self.fetcher.content, fetcher_name, self.fetcher, watch.link)
if plugin_availability:
# Plugin provided better data, use it
plugin_has_price = plugin_availability.get('price') is not None
plugin_has_availability = plugin_availability.get('availability') is not None
# Only use plugin data if it's actually better than what we have
if plugin_has_price or plugin_has_availability:
itemprop_availability = plugin_availability
logger.info(f"Using plugin-provided availability data for fetcher '{fetcher_name}' (built-in had price={has_price}, availability={has_availability}; plugin has price={plugin_has_price}, availability={plugin_has_availability})")
if not plugin_availability:
logger.debug("No item price/availability from plugins")
# Something valid in get_itemprop_availability() by scraping metadata ?
if itemprop_availability.get('price') or itemprop_availability.get('availability'):
# Store for other usage

View File

@@ -298,7 +298,7 @@ class ContentProcessor:
xpath_filter=filter_rule.replace('xpath:', ''),
html_content=content,
append_pretty_line_formatting=not self.watch.is_source_type_url,
is_xml=stream_content_type.is_rss or stream_content_type.is_xml
is_rss=stream_content_type.is_rss
)
# XPath1 filters (first match only)
@@ -307,7 +307,7 @@ class ContentProcessor:
xpath_filter=filter_rule.replace('xpath1:', ''),
html_content=content,
append_pretty_line_formatting=not self.watch.is_source_type_url,
is_xml=stream_content_type.is_rss or stream_content_type.is_xml
is_rss=stream_content_type.is_rss
)
# JSON filters

View File

@@ -5,7 +5,7 @@ from blinker import signal
def register_watch_operation_handlers(socketio, datastore):
"""Register Socket.IO event handlers for watch operations"""
@socketio.on('watch_operation')
def handle_watch_operation(data):
"""Handle watch operations like pause, mute, recheck via Socket.IO"""

View File

@@ -32,31 +32,11 @@ class SignalHandler:
watch_favicon_bumped_signal = signal('watch_favicon_bump')
watch_favicon_bumped_signal.connect(self.handle_watch_bumped_favicon_signal, weak=False)
watch_small_status_comment_signal = signal('watch_small_status_comment')
watch_small_status_comment_signal.connect(self.handle_watch_small_status_update, weak=False)
# Connect to the notification_event signal
notification_event_signal = signal('notification_event')
notification_event_signal.connect(self.handle_notification_event, weak=False)
logger.info("SignalHandler: Connected to notification_event signal")
def handle_watch_small_status_update(self, *args, **kwargs):
"""Small simple status update, for example 'Connecting...'"""
watch_uuid = kwargs.get('watch_uuid')
status = kwargs.get('status')
if watch_uuid and status:
logger.debug(f"Socket.IO: Received watch small status update '{status}' for UUID {watch_uuid}")
# Emit the status update to all connected clients
self.socketio_instance.emit("watch_small_status_comment", {
"uuid": watch_uuid,
"status": status,
"event_timestamp": time.time()
})
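# Producing side (sketch only): any worker can emit this status update through
# blinker; the status text here is just an example.
def _example_emit_small_status(watch_uuid):
    from blinker import signal
    signal('watch_small_status_comment').send(None, watch_uuid=watch_uuid, status='Connecting...')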
def handle_signal(self, *args, **kwargs):
logger.trace(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs")
# Safely extract the watch UUID from kwargs

View File

@@ -29,135 +29,16 @@ def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False
return re.sub(pattern, repl, html_content)
# Jinja2 template for formatting RSS/Atom feed entries
# Covers all common feedparser entry fields including namespaced elements
# Outputs HTML that will be converted to text via html_to_text
# @todo - This could be a UI setting in the future
RSS_ENTRY_TEMPLATE = """<article class="rss-item" id="{{ entry.id|replace('"', '')|replace(' ', '-') }}">{%- if entry.title -%}Title: {{ entry.title }}<br>{%- endif -%}
{%- if entry.link -%}<strong>Link:</strong> <a href="{{ entry.link }}">{{ entry.link }}</a><br>
{%- endif -%}
{%- if entry.id -%}
<strong>Guid:</strong> {{ entry.id }}<br>
{%- endif -%}
{%- if entry.published -%}
<strong>PubDate:</strong> {{ entry.published }}<br>
{%- endif -%}
{%- if entry.updated and entry.updated != entry.published -%}
<strong>Updated:</strong> {{ entry.updated }}<br>
{%- endif -%}
{%- if entry.author -%}
<strong>Author:</strong> {{ entry.author }}<br>
{%- elif entry.author_detail and entry.author_detail.name -%}
<strong>Author:</strong> {{ entry.author_detail.name }}
{%- if entry.author_detail.email %} ({{ entry.author_detail.email }}){% endif -%}
<br>
{%- endif -%}
{%- if entry.contributors -%}
<strong>Contributors:</strong> {% for contributor in entry.contributors -%}
{{ contributor.name if contributor.name else contributor }}
{%- if not loop.last %}, {% endif -%}
{%- endfor %}<br>
{%- endif -%}
{%- if entry.publisher -%}
<strong>Publisher:</strong> {{ entry.publisher }}<br>
{%- endif -%}
{%- if entry.rights -%}
<strong>Rights:</strong> {{ entry.rights }}<br>
{%- endif -%}
{%- if entry.license -%}
<strong>License:</strong> {{ entry.license }}<br>
{%- endif -%}
{%- if entry.language -%}
<strong>Language:</strong> {{ entry.language }}<br>
{%- endif -%}
{%- if entry.tags -%}
<strong>Tags:</strong> {% for tag in entry.tags -%}
{{ tag.term if tag.term else tag }}
{%- if not loop.last %}, {% endif -%}
{%- endfor %}<br>
{%- endif -%}
{%- if entry.category -%}
<strong>Category:</strong> {{ entry.category }}<br>
{%- endif -%}
{%- if entry.comments -%}
<strong>Comments:</strong> <a href="{{ entry.comments }}">{{ entry.comments }}</a><br>
{%- endif -%}
{%- if entry.slash_comments -%}
<strong>Comment Count:</strong> {{ entry.slash_comments }}<br>
{%- endif -%}
{%- if entry.enclosures -%}
<strong>Enclosures:</strong><br>
{%- for enclosure in entry.enclosures %}
- <a href="{{ enclosure.href }}">{{ enclosure.href }}</a> ({{ enclosure.type if enclosure.type else 'unknown type' }}
{%- if enclosure.length %}, {{ enclosure.length }} bytes{% endif -%}
)<br>
{%- endfor -%}
{%- endif -%}
{%- if entry.media_content -%}
<strong>Media:</strong><br>
{%- for media in entry.media_content %}
- <a href="{{ media.url }}">{{ media.url }}</a>
{%- if media.type %} ({{ media.type }}){% endif -%}
{%- if media.width and media.height %} {{ media.width }}x{{ media.height }}{% endif -%}
<br>
{%- endfor -%}
{%- endif -%}
{%- if entry.media_thumbnail -%}
<strong>Thumbnail:</strong> <a href="{{ entry.media_thumbnail[0].url if entry.media_thumbnail[0].url else entry.media_thumbnail[0] }}">{{ entry.media_thumbnail[0].url if entry.media_thumbnail[0].url else entry.media_thumbnail[0] }}</a><br>
{%- endif -%}
{%- if entry.media_description -%}
<strong>Media Description:</strong> {{ entry.media_description }}<br>
{%- endif -%}
{%- if entry.itunes_duration -%}
<strong>Duration:</strong> {{ entry.itunes_duration }}<br>
{%- endif -%}
{%- if entry.itunes_author -%}
<strong>Podcast Author:</strong> {{ entry.itunes_author }}<br>
{%- endif -%}
{%- if entry.dc_identifier -%}
<strong>Identifier:</strong> {{ entry.dc_identifier }}<br>
{%- endif -%}
{%- if entry.dc_source -%}
<strong>DC Source:</strong> {{ entry.dc_source }}<br>
{%- endif -%}
{%- if entry.dc_type -%}
<strong>Type:</strong> {{ entry.dc_type }}<br>
{%- endif -%}
{%- if entry.dc_format -%}
<strong>Format:</strong> {{ entry.dc_format }}<br>
{%- endif -%}
{%- if entry.dc_relation -%}
<strong>Related:</strong> {{ entry.dc_relation }}<br>
{%- endif -%}
{%- if entry.dc_coverage -%}
<strong>Coverage:</strong> {{ entry.dc_coverage }}<br>
{%- endif -%}
{%- if entry.source and entry.source.title -%}
<strong>Source:</strong> {{ entry.source.title }}
{%- if entry.source.link %} (<a href="{{ entry.source.link }}">{{ entry.source.link }}</a>){% endif -%}
<br>
{%- endif -%}
{%- if entry.dc_content -%}
<strong>Content:</strong> {{ entry.dc_content | safe }}
{%- elif entry.content and entry.content[0].value -%}
<strong>Content:</strong> {{ entry.content[0].value | safe }}
{%- elif entry.summary -%}
<strong>Summary:</strong> {{ entry.summary | safe }}
{%- endif -%}</article>
"""
def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
"""
Format RSS/Atom feed items in a readable text format using feedparser and Jinja2.
Format RSS/Atom feed items in a readable text format using feedparser.
Converts RSS <item> or Atom <entry> elements to formatted text with all available fields:
- Basic fields: title, link, id/guid, published date, updated date
- Author fields: author, author_detail, contributors, publisher
- Content fields: content, summary, description
- Metadata: tags, category, rights, license
- Media: enclosures, media_content, media_thumbnail
- Dublin Core elements: dc:creator, dc:date, dc:publisher, etc. (mapped by feedparser)
Converts RSS <item> or Atom <entry> elements to formatted text with:
- <title> → <h1>Title</h1>
- <link> → Link: [url]
- <guid> → Guid: [id]
- <pubDate> → PubDate: [date]
- <description> or <content> → Raw HTML content (CDATA and entities automatically handled)
Args:
rss_content: The RSS/Atom feed content
@@ -168,19 +49,65 @@ def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
"""
try:
import feedparser
from changedetectionio.jinja2_custom import safe_jinja
from xml.sax.saxutils import escape as xml_escape
# Parse the feed - feedparser handles all RSS/Atom variants, CDATA, entity unescaping, etc.
feed = feedparser.parse(rss_content)
# Determine feed type for appropriate labels
formatted_items = []
# Determine feed type for appropriate labels when fields are missing
# feedparser sets feed.version to things like 'rss20', 'atom10', etc.
is_atom = feed.version and 'atom' in feed.version
formatted_items = []
for entry in feed.entries:
# Render the entry using Jinja2 template
rendered = safe_jinja.render(RSS_ENTRY_TEMPLATE, entry=entry, is_atom=is_atom)
formatted_items.append(rendered.strip())
item_parts = []
# Title - feedparser handles CDATA and entity unescaping automatically
if hasattr(entry, 'title') and entry.title:
item_parts.append(f'<h1>{xml_escape(entry.title)}</h1>')
# Link
if hasattr(entry, 'link') and entry.link:
item_parts.append(f'Link: {xml_escape(entry.link)}<br>')
# GUID/ID
if hasattr(entry, 'id') and entry.id:
item_parts.append(f'Guid: {xml_escape(entry.id)}<br>')
# Date - feedparser normalizes all date field names to 'published'
if hasattr(entry, 'published') and entry.published:
item_parts.append(f'PubDate: {xml_escape(entry.published)}<br>')
# Description/Content - feedparser handles CDATA and entity unescaping automatically
# Only add "Summary:" label for Atom <summary> tags
content = None
add_label = False
if hasattr(entry, 'content') and entry.content:
# Atom <content> - no label, just content
content = entry.content[0].value if entry.content[0].value else None
elif hasattr(entry, 'summary'):
# Could be RSS <description> or Atom <summary>
# feedparser maps both to entry.summary
content = entry.summary if entry.summary else None
# Only add "Summary:" label for Atom feeds (which use <summary> tag)
if is_atom:
add_label = True
# Add content with or without label
if content:
if add_label:
item_parts.append(f'Summary:<br>{content}')
else:
item_parts.append(content)
else:
# No content - just show <none>
item_parts.append('&lt;none&gt;')
# Join all parts of this item
if item_parts:
formatted_items.append('\n'.join(item_parts))
# Wrap each item in a div with classes (first, last, item-N)
items_html = []
@@ -195,8 +122,7 @@ def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
class_str = ' '.join(classes)
items_html.append(f'<div class="{class_str}">{item}</div>')
return '<html><body>\n' + "\n<br>".join(items_html) + '\n</body></html>'
return '<html><body>\n'+"\n<br><br>".join(items_html)+'\n</body></html>'
except Exception as e:
logger.warning(f"Error formatting RSS items: {str(e)}")

View File

@@ -101,11 +101,6 @@ $(document).ready(function () {
}
});
socket.on('watch_small_status_comment', function (data) {
console.log(`Socket.IO: Operation watch_small_status_comment '${data.uuid}' status ${data.status}`);
$('tr[data-watch-uuid="' + data.uuid + '"] td.last-checked .status-text').html("&nbsp;").text(data.status);
});
socket.on('notification_event', function (data) {
console.log(`Stub handler for notification_event ${data.watch_uuid}`)
});

View File

@@ -186,6 +186,10 @@
<br>
{% endmacro %}
{% macro only_playwright_type_watches_warning() %}
<p><strong>Sorry, this functionality only works with Playwright/Chrome enabled watches.<br>You need to <a href="#request">set the fetch method to Playwright/Chrome mode and re-save</a>, and have SockpuppetBrowser/Playwright or Selenium enabled.</strong></p><br>
{% endmacro %}
{% macro render_time_schedule_form(form, available_timezones, timezone_default_config) %}
<style>
.day-schedule *, .day-schedule select {

View File

@@ -241,7 +241,7 @@
</section>
<script src="{{url_for('static_content', group='js', filename='toggle-theme.js')}}" defer></script>
<div id="checking-now-fixed-tab" style="display: none;"><span class="spinner"></span><span class="status-text">&nbsp;Checking now</span></div>
<div id="checking-now-fixed-tab" style="display: none;"><span class="spinner"></span><span>&nbsp;Checking now</span></div>
<div id="realtime-conn-error" style="display:none">Real-time updates offline</div>
</body>

View File

@@ -112,7 +112,7 @@
<div class="tip">
For now, Differences are performed on text, not graphically, only the latest screenshot is available.
</div>
{% if capabilities.get('supports_screenshots') %}
{% if is_html_webdriver %}
{% if screenshot %}
<div class="snapshot-age">{{watch_a.snapshot_screenshot_ctime|format_timestamp_timeago}}</div>
<img style="max-width: 80%" id="screenshot-img" alt="Current screenshot from most recent request" >
@@ -120,7 +120,7 @@
No screenshot available just yet! Try rechecking the page.
{% endif %}
{% else %}
<strong>Screenshot requires a Content Fetcher (Chrome, Zyte, etc.) that supports screenshots.</strong>
<strong>Screenshot requires Playwright/WebDriver enabled</strong>
{% endif %}
</div>
<div class="tab-pane-inner" id="extract">

View File

@@ -89,7 +89,7 @@
For now, Differences are performed on text, not graphically, only the latest screenshot is available.
</div>
<br>
{% if capabilities.supports_screenshots %}
{% if is_html_webdriver %}
{% if screenshot %}
<div class="snapshot-age">{{ watch.snapshot_screenshot_ctime|format_timestamp_timeago }}</div>
<img style="max-width: 80%" id="screenshot-img" alt="Current screenshot from most recent request">
@@ -97,7 +97,7 @@
No screenshot available just yet! Try rechecking the page.
{% endif %}
{% else %}
<strong>Screenshot requires a Content Fetcher (Chrome, Zyte, etc.) that supports screenshots.</strong>
<strong>Screenshot requires Playwright/WebDriver enabled</strong>
{% endif %}
</div>
</div>

View File

@@ -405,10 +405,7 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server
follow_redirects=True
)
# Check that the string element with the correct name attribute is present
# Note: namespace declarations may be included when extracting elements, which is correct XML behavior
assert b'feed_update_receiver_name' in res.data
assert b'Abonnementen bijwerken' in res.data
assert b'&lt;string name=&#34;feed_update_receiver_name&#34;' in res.data
assert b'&lt;foobar' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

View File

@@ -7,61 +7,6 @@ from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches
def set_xmlns_purl_content(datastore_path, extra=""):
data=f"""<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="https://purl.org/dc/elements/1.1/" xmlns:media="http://search.yahoo.com/mrss/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<atom:link href="https://www.xxxxxxxtechxxxxx.com/feeds.xml" rel="self" type="application/rss+xml"/>
<title>
<![CDATA[ Latest from xxxxxxxtechxxxxx ]]>
</title>
<link>https://www.xxxxx.com</link>
<description>
<![CDATA[ All the latest content from the xxxxxxxtechxxxxx team ]]>
</description>
<lastBuildDate>Wed, 19 Nov 2025 15:00:00 +0000</lastBuildDate>
<language>en</language>
<item>
<title>
<![CDATA[ Sony Xperia 1 VII review: has Sony's long-standing Xperia family lost what it takes to compete? ]]>
</title>
<dc:content>
<![CDATA[ {{extra}} a little harder, dc-content. blue often quite tough and purple usually very difficult.</p><p>On the plus side, you don't technically need to solve the final one, as you'll be able to answer that one by a process of elimination. What's more, you can make up to four mistakes, which gives you a little bit of breathing room.</p><p>It's a little more involved than something like Wordle, however, and there are plenty of opportunities for the game to trip you up with tricks. For instance, watch out for homophones and other word games that could disguise the answers.</p><p>It's playable for free via the <a href="https://www.nytimes.com/games/strands" target="_blank">NYT Games site</a> on desktop or mobile.</p></article></section> ]]>
</dc:content>
<link>https://www.xxxxxxx.com/gaming/nyt-connections-today-answers-hints-20-november-2025</link>
<description>
<![CDATA[ Looking for NYT Connections answers and hints? Here's all you need to know to solve today's game, plus my commentary on the puzzles. ]]>
</description>
<guid isPermaLink="false">N2C2T6DztpWdxSdKpSUx89</guid>
<enclosure url="https://cdn.mos.cms.futurecdn.net/RCGfdf3yhQ9W3MHbTRT6yk-1280-80.jpg" type="image/jpeg" length="0"/>
<pubDate>Wed, 19 Nov 2025 15:00:00 +0000</pubDate>
<category>
<![CDATA[ Gaming ]]>
</category>
<dc:creator>
<![CDATA[ Johnny Dee ]]>
</dc:creator>
<media:content type="image/jpeg" url="https://cdn.mos.cms.futurecdn.net/RCGfdf3yhQ9W3MHbTRT6yk-1280-80.jpg">
<media:credit>
<![CDATA[ New York Times ]]>
</media:credit>
<media:text>
<![CDATA[ NYT Connections homescreen on a phone, on a purple background ]]>
</media:text>
<media:title type="plain">
<![CDATA[ NYT Connections homescreen on a phone, on a purple background ]]>
</media:title>
</media:content>
<media:thumbnail url="https://cdn.mos.cms.futurecdn.net/RCGfdf3yhQ9W3MHbTRT6yk-1280-80.jpg"/>
</item>
</channel>
</rss>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(data)
def set_original_cdata_xml(datastore_path):
test_return_data = """<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
@@ -153,26 +98,3 @@ def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_us
assert 'The days of Terminator and The Matrix' in snapshot_contents
delete_all_watches(client)
def test_xmlns_purl_content(client, live_server, measure_memory_usage, datastore_path):
set_xmlns_purl_content(datastore_path=datastore_path)
# Endpoints rarely give the right header (usually just text/xml), so we also check for <rss
# This also triggers the automatic CDATA text parser so the RSS comes back as a nice content list
#test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
# Because NO utf-8 was specified here, we should be able to recover it in requests or by some other means.
test_url = url_for('test_endpoint', content_type="text/xml;", _external=True)
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'include_filters': [".last"]})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
assert "Title: Sony Xperia 1 VII review: has Sonys long-standing Xperia family lost what it takes to compete?" in snapshot_contents
assert "dc-content" in snapshot_contents

View File

@@ -84,14 +84,14 @@ class TestXPathDefaultNamespace:
def test_atom_feed_simple_xpath_with_xpath_filter(self):
"""Test that //title/text() works on Atom feed with default namespace using xpath_filter."""
result = html_tools.xpath_filter('//title/text()', atom_feed_with_default_ns, is_xml=True)
result = html_tools.xpath_filter('//title/text()', atom_feed_with_default_ns, is_rss=True)
assert 'Release notes from PowerToys' in result
assert 'Release 0.95.1' in result
assert 'Release v0.95.0' in result
def test_atom_feed_nested_xpath_with_xpath_filter(self):
"""Test nested XPath like //entry/title/text() on Atom feed."""
result = html_tools.xpath_filter('//entry/title/text()', atom_feed_with_default_ns, is_xml=True)
result = html_tools.xpath_filter('//entry/title/text()', atom_feed_with_default_ns, is_rss=True)
assert 'Release 0.95.1' in result
assert 'Release v0.95.0' in result
# Should NOT include the feed title
@@ -99,20 +99,20 @@ class TestXPathDefaultNamespace:
def test_atom_feed_other_elements_with_xpath_filter(self):
"""Test that other elements like //updated/text() work on Atom feed."""
result = html_tools.xpath_filter('//updated/text()', atom_feed_with_default_ns, is_xml=True)
result = html_tools.xpath_filter('//updated/text()', atom_feed_with_default_ns, is_rss=True)
assert '2025-10-23T08:53:12Z' in result
assert '2025-10-24T14:20:14Z' in result
def test_rss_feed_without_namespace(self):
"""Test that //title/text() works on RSS feed without default namespace."""
result = html_tools.xpath_filter('//title/text()', rss_feed_no_default_ns, is_xml=True)
result = html_tools.xpath_filter('//title/text()', rss_feed_no_default_ns, is_rss=True)
assert 'Channel Title' in result
assert 'Item 1 Title' in result
assert 'Item 2 Title' in result
def test_rss_feed_nested_xpath(self):
"""Test nested XPath on RSS feed without default namespace."""
result = html_tools.xpath_filter('//item/title/text()', rss_feed_no_default_ns, is_xml=True)
result = html_tools.xpath_filter('//item/title/text()', rss_feed_no_default_ns, is_rss=True)
assert 'Item 1 Title' in result
assert 'Item 2 Title' in result
# Should NOT include channel title
@@ -120,31 +120,31 @@ class TestXPathDefaultNamespace:
def test_rss_feed_with_prefixed_namespaces(self):
"""Test that feeds with namespace prefixes (not default) still work."""
result = html_tools.xpath_filter('//title/text()', rss_feed_with_ns_prefix, is_xml=True)
result = html_tools.xpath_filter('//title/text()', rss_feed_with_ns_prefix, is_rss=True)
assert 'Channel Title' in result
assert 'Item Title' in result
def test_local_name_workaround_still_works(self):
"""Test that local-name() workaround still works for Atom feeds."""
result = html_tools.xpath_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_xml=True)
result = html_tools.xpath_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_rss=True)
assert 'Release notes from PowerToys' in result
assert 'Release 0.95.1' in result
def test_xpath1_filter_without_default_namespace(self):
"""Test xpath1_filter works on RSS without default namespace."""
result = html_tools.xpath1_filter('//title/text()', rss_feed_no_default_ns, is_xml=True)
result = html_tools.xpath1_filter('//title/text()', rss_feed_no_default_ns, is_rss=True)
assert 'Channel Title' in result
assert 'Item 1 Title' in result
def test_xpath1_filter_with_default_namespace_returns_empty(self):
"""Test that xpath1_filter returns empty on Atom with default namespace (known limitation)."""
result = html_tools.xpath1_filter('//title/text()', atom_feed_with_default_ns, is_xml=True)
result = html_tools.xpath1_filter('//title/text()', atom_feed_with_default_ns, is_rss=True)
# xpath1_filter (lxml) doesn't support default namespaces, so this returns empty
assert result == ''
def test_xpath1_filter_local_name_workaround(self):
"""Test that xpath1_filter works with local-name() workaround on Atom feeds."""
result = html_tools.xpath1_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_xml=True)
result = html_tools.xpath1_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_rss=True)
assert 'Release notes from PowerToys' in result
assert 'Release 0.95.1' in result

View File

@@ -201,120 +201,3 @@ def test_trips(html_content, xpath, answer):
html_content = html_tools.xpath_filter(xpath, html_content, append_pretty_line_formatting=True)
assert type(html_content) == str
assert answer in html_content
# Test for UTF-8 encoding bug fix (issue #3658)
# Polish and other UTF-8 characters should be preserved correctly
polish_html = """<!DOCTYPE html>
<html>
<head><meta charset="utf-8"></head>
<body>
<div class="index--s-headline-link">
<a class="index--s-headline-link" href="#">
Naukowcy potwierdzają: oglądanie krótkich filmików prowadzi do "zgnilizny mózgu"
</a>
</div>
<div>
<a class="other-class" href="#">
Test with Polish chars: żółć ąę śń
</a>
</div>
<div>
<p class="unicode-test">Cyrillic: Привет мир</p>
<p class="unicode-test">Greek: Γειά σου κόσμε</p>
<p class="unicode-test">Arabic: مرحبا بالعالم</p>
<p class="unicode-test">Chinese: 你好世界</p>
<p class="unicode-test">Japanese: こんにちは世界</p>
<p class="unicode-test">Emoji: 🌍🎉✨</p>
</div>
</body>
</html>
"""
@pytest.mark.parametrize("html_content", [polish_html])
@pytest.mark.parametrize("xpath, expected_text", [
# Test Polish characters in xpath_filter
('//a[(contains(@class,"index--s-headline-link"))]', 'Naukowcy potwierdzają'),
('//a[(contains(@class,"index--s-headline-link"))]', 'oglądanie krótkich filmików'),
('//a[(contains(@class,"index--s-headline-link"))]', 'zgnilizny mózgu'),
('//a[@class="other-class"]', 'żółć ąę śń'),
# Test various Unicode scripts
('//p[@class="unicode-test"]', 'Привет мир'),
('//p[@class="unicode-test"]', 'Γειά σου κόσμε'),
('//p[@class="unicode-test"]', 'مرحبا بالعالم'),
('//p[@class="unicode-test"]', '你好世界'),
('//p[@class="unicode-test"]', 'こんにちは世界'),
('//p[@class="unicode-test"]', '🌍🎉✨'),
# Test with text() extraction
('//a[@class="other-class"]/text()', 'żółć'),
])
def test_xpath_utf8_encoding(html_content, xpath, expected_text):
"""Test that XPath filters preserve UTF-8 characters correctly (issue #3658)"""
result = html_tools.xpath_filter(xpath, html_content, append_pretty_line_formatting=False)
assert type(result) == str
assert expected_text in result
# Ensure characters are NOT HTML-entity encoded
# For example, 'ą' should NOT become '&#261;'
assert '&#' not in result or expected_text in result
@pytest.mark.parametrize("html_content", [polish_html])
@pytest.mark.parametrize("xpath, expected_text", [
# Test Polish characters in xpath1_filter
('//a[(contains(@class,"index--s-headline-link"))]', 'Naukowcy potwierdzają'),
('//a[(contains(@class,"index--s-headline-link"))]', 'mózgu'),
('//a[@class="other-class"]', 'żółć ąę śń'),
# Test various Unicode scripts with xpath1
('//p[@class="unicode-test" and contains(text(), "Cyrillic")]', 'Привет мир'),
('//p[@class="unicode-test" and contains(text(), "Greek")]', 'Γειά σου'),
('//p[@class="unicode-test" and contains(text(), "Chinese")]', '你好世界'),
])
def test_xpath1_utf8_encoding(html_content, xpath, expected_text):
"""Test that XPath1 filters preserve UTF-8 characters correctly"""
result = html_tools.xpath1_filter(xpath, html_content, append_pretty_line_formatting=False)
assert type(result) == str
assert expected_text in result
# Ensure characters are NOT HTML-entity encoded
assert '&#' not in result or expected_text in result
# Test with real-world example from wyborcza.pl (issue #3658)
wyborcza_style_html = """<!DOCTYPE html>
<html lang="pl">
<head><meta charset="utf-8"></head>
<body>
<div class="article-list">
<a class="index--s-headline-link" href="/article1">
Naukowcy potwierdzają: oglądanie krótkich filmików prowadzi do "zgnilizny mózgu"
</a>
<a class="index--s-headline-link" href="/article2">
Zmiany klimatyczne wpływają na życie w miastach
</a>
<a class="index--s-headline-link" href="/article3">
Łódź: Nowe inwestycje w infrastrukturę miejską
</a>
</div>
</body>
</html>
"""
def test_wyborcza_real_world_example():
"""Test real-world case from wyborcza.pl that was failing (issue #3658)"""
xpath = '//a[(contains(@class,"index--s-headline-link"))]'
result = html_tools.xpath_filter(xpath, wyborcza_style_html, append_pretty_line_formatting=False)
# These exact strings should appear in the result
assert 'Naukowcy potwierdzają' in result
assert 'oglądanie krótkich filmików' in result
assert 'zgnilizny mózgu' in result
assert 'Łódź' in result
# Make sure they're NOT corrupted to mojibake like "potwierdzajÄ"
assert 'potwierdzajÄ' not in result
assert 'ogl&#261;danie' not in result
assert 'm&#243;zgu' not in result