Compare commits

...

41 Commits

Author SHA1 Message Date
dgtlmoon
79166c0307 Merge branch 'master' into conditions-then-plugins
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-03-17 19:22:48 +01:00
dgtlmoon
9dbe91e470 WIP 2025-03-17 19:17:18 +01:00
dgtlmoon
51bd8cd2d7 WIP 2025-03-17 19:08:08 +01:00
dgtlmoon
35455e7dd6 plugins refactor 2025-03-17 18:51:01 +01:00
dgtlmoon
aaa038f082 Move operation 2025-03-17 18:28:53 +01:00
dgtlmoon
57eeb221cb Merge branch 'conditions' into conditions-then-plugins 2025-03-17 18:24:16 +01:00
dgtlmoon
8187b9ce4c add test, mov operators 2025-03-17 18:19:16 +01:00
dgtlmoon
cc70b65bfa length min/max 2025-03-17 18:03:28 +01:00
dgtlmoon
42099f1fff tweak tests 2025-03-17 17:56:51 +01:00
dgtlmoon
408864d346 move conditions blueprint 2025-03-17 17:48:41 +01:00
dgtlmoon
02b8660bf3 Fix imported operations 2025-03-17 17:37:07 +01:00
dgtlmoon
947a60af89 test tweak
Some checks are pending
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Waiting to run
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Blocked by required conditions
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Blocked by required conditions
ChangeDetection.io App Test / lint-code (push) Waiting to run
ChangeDetection.io App Test / test-application-3-10 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-11 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-12 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-13 (push) Blocked by required conditions
2025-03-17 17:28:01 +01:00
dgtlmoon
a0f4cb4d65 Add helper text 2025-03-17 17:24:33 +01:00
dgtlmoon
71ea8d80f3 WIP 2025-03-17 17:21:56 +01:00
dgtlmoon
4f48958187 WIP 2025-03-17 11:48:35 +01:00
dgtlmoon
2608980b1d test fixup 2025-03-17 11:20:23 +01:00
dgtlmoon
c982395d72 WIP 2025-03-17 11:00:29 +01:00
dgtlmoon
ee7e43ea87 Logic fix 2025-03-17 10:47:41 +01:00
dgtlmoon
da5585b53c Include conditions module 2025-03-17 10:15:45 +01:00
dgtlmoon
76062c9419 Merge branch 'conditions' into conditions-then-plugins 2025-03-17 10:10:23 +01:00
dgtlmoon
675953797c Fix up logic 2025-03-17 10:09:36 +01:00
dgtlmoon
b202652a93 Refactoring conditions and plugins 2025-03-17 09:38:24 +01:00
dgtlmoon
617dc721bf WIP
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-03-16 09:14:54 +01:00
dgtlmoon
ec13720694 safety checks
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-03-14 18:15:05 +01:00
dgtlmoon
ddacb0bcbc small tidyups 2025-03-14 17:48:31 +01:00
dgtlmoon
f67d98b839 WIP 2025-03-14 17:41:48 +01:00
dgtlmoon
beee93d528 WIP 2025-03-14 17:18:48 +01:00
dgtlmoon
987ab3e494 Some tidyup
Some checks are pending
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Waiting to run
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Blocked by required conditions
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Blocked by required conditions
ChangeDetection.io App Test / lint-code (push) Waiting to run
ChangeDetection.io App Test / test-application-3-10 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-11 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-12 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-13 (push) Blocked by required conditions
2025-03-13 23:58:18 +01:00
dgtlmoon
0c68cfffb1 Bumping namespace 2025-03-13 23:45:20 +01:00
dgtlmoon
e93a9244fe WIP
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Waiting to run
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Blocked by required conditions
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Blocked by required conditions
ChangeDetection.io App Test / lint-code (push) Waiting to run
ChangeDetection.io App Test / test-application-3-10 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-11 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-12 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-13 (push) Blocked by required conditions
ChangeDetection.io Container Build Test / test-container-build (push) Has been cancelled
2025-03-13 23:19:09 +01:00
dgtlmoon
e56eec41c1 move to own plugin, tidyup 2025-03-13 17:48:52 +01:00
dgtlmoon
31f4bb7cc3 Merge branch 'master' into conditions
Some checks are pending
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Waiting to run
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Blocked by required conditions
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Blocked by required conditions
ChangeDetection.io Container Build Test / test-container-build (push) Waiting to run
ChangeDetection.io App Test / lint-code (push) Waiting to run
ChangeDetection.io App Test / test-application-3-10 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-11 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-12 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-13 (push) Blocked by required conditions
2025-03-13 13:39:01 +01:00
dgtlmoon
f08efde110 Merge branch 'master' into conditions
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-02-10 10:35:50 +01:00
dgtlmoon
9b39b2853b Adding pluggy
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Waiting to run
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Blocked by required conditions
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Blocked by required conditions
ChangeDetection.io App Test / lint-code (push) Waiting to run
ChangeDetection.io App Test / test-application-3-10 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-11 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-12 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-13 (push) Blocked by required conditions
ChangeDetection.io Container Build Test / test-container-build (push) Has been cancelled
2025-02-09 20:50:57 +01:00
dgtlmoon
892d38ba42 experiment with default plugins 2025-02-09 16:52:58 +01:00
dgtlmoon
b170e191d4 fix validators
Some checks are pending
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Waiting to run
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Blocked by required conditions
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Blocked by required conditions
ChangeDetection.io App Test / lint-code (push) Waiting to run
ChangeDetection.io App Test / test-application-3-10 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-11 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-12 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-13 (push) Blocked by required conditions
2025-02-09 16:33:11 +01:00
dgtlmoon
edb78efcca handle non set condition 2025-02-09 16:25:09 +01:00
dgtlmoon
383f90b70c nah 2025-02-09 00:14:47 +01:00
dgtlmoon
6948418865 WIP 2025-02-09 00:13:47 +01:00
dgtlmoon
cd80e317f3 Some more ideas 2025-02-09 00:08:23 +01:00
dgtlmoon
8c26210804 Initial WIP for adding support for CONDITIONS 2025-02-09 00:03:01 +01:00
18 changed files with 1328 additions and 52 deletions

View File

@@ -712,23 +712,63 @@ def changedetection_app(config=None, datastore_o=None):
# Does it use some custom form? does one exist?
processor_name = datastore.data['watching'][uuid].get('processor', '')
processor_classes = next((tpl for tpl in find_processors() if tpl[1] == processor_name), None)
# If it's not found in traditional processors, check if it's a pluggy plugin
if not processor_classes:
flash(f"Cannot load the edit form for processor/plugin '{processor_classes[1]}', plugin missing?", 'error')
return redirect(url_for('index'))
parent_module = get_parent_module(processor_classes[0])
try:
# Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code)
forms_module = importlib.import_module(f"{parent_module.__name__}.forms")
# Access the 'processor_settings_form' class from the 'forms' module
form_class = getattr(forms_module, 'processor_settings_form')
except ModuleNotFoundError as e:
# .forms didnt exist
form_class = forms.processor_text_json_diff_form
except AttributeError as e:
# .forms exists but no useful form
form_class = forms.processor_text_json_diff_form
try:
from changedetectionio.processors.processor_registry import get_processor_form, _get_plugin_name_map
# Get all available plugins for debugging
available_plugins = list(_get_plugin_name_map().keys())
logger.debug(f"Available processor plugins: {available_plugins}")
# Try to get the processor form
plugin_form_class = get_processor_form(processor_name)
if plugin_form_class:
# Use default text_json_diff_form as parent module for plugins
from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor
form_class = forms.processor_text_json_diff_form
parent_module = get_parent_module(text_json_diff_processor)
# Skip the normal form loading code path
use_plugin_form = True
logger.debug(f"Successfully loaded form for plugin '{processor_name}'")
else:
# Check if the plugin is registered but doesn't have a form
if processor_name in available_plugins:
logger.error(f"Plugin '{processor_name}' is registered but has no form class")
flash(f"Plugin '{processor_name}' is registered but has no form class", 'error')
else:
logger.error(f"Cannot find plugin '{processor_name}'. Available plugins: {available_plugins}")
flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin missing?", 'error')
return redirect(url_for('index'))
except ImportError as e:
logger.error(f"Import error when loading plugin form: {str(e)}")
flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin system not available?", 'error')
return redirect(url_for('index'))
except Exception as e:
logger.error(f"Unexpected error loading plugin form: {str(e)}")
flash(f"Error loading plugin form: {str(e)}", 'error')
return redirect(url_for('index'))
else:
# Traditional processor - continue with normal flow
parent_module = get_parent_module(processor_classes[0])
use_plugin_form = False
# Only follow this path for traditional processors
if not use_plugin_form:
try:
# Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code)
forms_module = importlib.import_module(f"{parent_module.__name__}.forms")
# Access the 'processor_settings_form' class from the 'forms' module
form_class = getattr(forms_module, 'processor_settings_form')
except ModuleNotFoundError as e:
# .forms didnt exist
form_class = forms.processor_text_json_diff_form
except AttributeError as e:
# .forms exists but no useful form
form_class = forms.processor_text_json_diff_form
form = form_class(formdata=request.form if request.method == 'POST' else None,
data=default,

View File

@@ -67,7 +67,6 @@ class model(watch_base):
@property
def link(self):
url = self.get('url', '')
if not is_safe_url(url):
return 'DISABLED'
@@ -93,6 +92,19 @@ class model(watch_base):
# Also double check it after any Jinja2 formatting just incase
if not is_safe_url(ready_url):
return 'DISABLED'
# Check if a processor wants to customize the display link
processor_name = self.get('processor')
if processor_name:
try:
# Import here to avoid circular imports
from changedetectionio.processors.processor_registry import get_display_link
custom_link = get_display_link(url=ready_url, processor_name=processor_name)
if custom_link:
return custom_link
except Exception as e:
logger.error(f"Error getting custom display link for processor {processor_name}: {str(e)}")
return ready_url
def clear_watch(self):

View File

@@ -3,6 +3,7 @@ from changedetectionio.content_fetchers.base import Fetcher
from changedetectionio.strtobool import strtobool
from copy import deepcopy
from loguru import logger
import hashlib
import importlib
import inspect
@@ -10,6 +11,10 @@ import os
import pkgutil
import re
# Import the plugin manager
from .pluggy_interface import plugin_manager
class difference_detection_processor():
browser_steps = None
@@ -26,9 +31,95 @@ class difference_detection_processor():
self.watch = deepcopy(self.datastore.data['watching'].get(watch_uuid))
# Generic fetcher that should be extended (requests, playwright etc)
self.fetcher = Fetcher()
def _get_proxy_for_watch(self, preferred_proxy_id=None):
"""Get proxy configuration based on watch settings and preferred proxy ID
Args:
preferred_proxy_id: Optional explicit proxy ID to use
Returns:
dict: Proxy configuration or None if no proxy should be used
str: Proxy URL or None if no proxy should be used
"""
# Default to no proxy config
proxy_config = None
proxy_url = None
# Check if datastore is available and has get_preferred_proxy_for_watch method
if hasattr(self, 'datastore') and self.datastore:
try:
# Get preferred proxy ID if not provided
if not preferred_proxy_id and hasattr(self.datastore, 'get_preferred_proxy_for_watch'):
# Get the watch UUID if available
watch_uuid = None
if hasattr(self.watch, 'get'):
watch_uuid = self.watch.get('uuid')
elif hasattr(self.watch, 'uuid'):
watch_uuid = self.watch.uuid
if watch_uuid:
preferred_proxy_id = self.datastore.get_preferred_proxy_for_watch(uuid=watch_uuid)
# Check if we have a proxy list and a valid proxy ID
if preferred_proxy_id and hasattr(self.datastore, 'proxy_list') and self.datastore.proxy_list:
proxy_info = self.datastore.proxy_list.get(preferred_proxy_id)
if proxy_info and 'url' in proxy_info:
proxy_url = proxy_info.get('url')
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}'")
# Parse the proxy URL to build a proxy dict for requests
import urllib.parse
parsed_proxy = urllib.parse.urlparse(proxy_url)
proxy_type = parsed_proxy.scheme
# Extract credentials if present
username = None
password = None
if parsed_proxy.username:
username = parsed_proxy.username
if parsed_proxy.password:
password = parsed_proxy.password
# Build the proxy URL without credentials for the proxy dict
netloc = parsed_proxy.netloc
if '@' in netloc:
netloc = netloc.split('@')[1]
proxy_addr = f"{proxy_type}://{netloc}"
# Create the proxy configuration
proxy_config = {
'http': proxy_addr,
'https': proxy_addr
}
# Add credentials if present
if username:
proxy_config['username'] = username
if password:
proxy_config['password'] = password
except Exception as e:
# Log the error but continue without a proxy
logger.error(f"Error setting up proxy: {str(e)}")
proxy_config = None
proxy_url = None
return proxy_config, proxy_url
def call_browser(self, preferred_proxy_id=None):
"""Fetch content using the appropriate browser/fetcher
This method will:
1. Determine the appropriate fetcher to use based on watch settings
2. Set up proxy configuration if needed
3. Initialize the fetcher with the correct parameters
4. Configure any browser steps if needed
Args:
preferred_proxy_id: Optional explicit proxy ID to use
"""
from requests.structures import CaseInsensitiveDict
url = self.watch.link
@@ -43,8 +134,8 @@ class difference_detection_processor():
# Requests, playwright, other browser via wss:// etc, fetch_extra_something
prefer_fetch_backend = self.watch.get('fetch_backend', 'system')
# Proxy ID "key"
preferred_proxy_id = preferred_proxy_id if preferred_proxy_id else self.datastore.get_preferred_proxy_for_watch(uuid=self.watch.get('uuid'))
# Get proxy configuration
proxy_config, proxy_url = self._get_proxy_for_watch(preferred_proxy_id)
# Pluggable content self.fetcher
if not prefer_fetch_backend or prefer_fetch_backend == 'system':
@@ -82,14 +173,10 @@ class difference_detection_processor():
# What it referenced doesnt exist, Just use a default
fetcher_obj = getattr(content_fetchers, "html_requests")
proxy_url = None
if preferred_proxy_id:
# Custom browser endpoints should NOT have a proxy added
if not prefer_fetch_backend.startswith('extra_browser_'):
proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url')
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}")
else:
logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified. ")
# Custom browser endpoints should NOT have a proxy added
if proxy_url and prefer_fetch_backend.startswith('extra_browser_'):
logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified.")
proxy_url = None
# Now call the fetcher (playwright/requests/etc) with arguments that only a fetcher would need.
# When browser_connection_url is None, it method should default to working out whats the best defaults (os env vars etc)
@@ -185,9 +272,9 @@ def find_sub_packages(package_name):
def find_processors():
"""
Find all subclasses of DifferenceDetectionProcessor in the specified package.
Find all subclasses of DifferenceDetectionProcessor in the specified package
and also include processors from the plugin system.
:param package_name: The name of the package to scan for processor modules.
:return: A list of (module, class) tuples.
"""
package_name = "changedetectionio.processors" # Name of the current package/module
@@ -195,6 +282,7 @@ def find_processors():
processors = []
sub_packages = find_sub_packages(package_name)
# Find traditional processors
for sub_package in sub_packages:
module_name = f"{package_name}.{sub_package}.processor"
try:
@@ -207,6 +295,15 @@ def find_processors():
except (ModuleNotFoundError, ImportError) as e:
logger.warning(f"Failed to import module {module_name}: {e} (find_processors())")
# Also include processors from the plugin system
try:
from .processor_registry import get_plugin_processor_modules
plugin_modules = get_plugin_processor_modules()
if plugin_modules:
processors.extend(plugin_modules)
except (ImportError, ModuleNotFoundError) as e:
logger.warning(f"Failed to import plugin modules: {e} (find_processors())")
return processors
@@ -223,8 +320,22 @@ def get_parent_module(module):
return False
def get_custom_watch_obj_for_processor(processor_name):
"""
Get the custom watch object for a processor
:param processor_name: Name of the processor
:return: Watch class or None
"""
# First, try to get the watch model from the pluggy system
try:
from .processor_registry import get_processor_watch_model
watch_model = get_processor_watch_model(processor_name)
if watch_model:
return watch_model
except Exception as e:
logger.warning(f"Error getting processor watch model from pluggy: {e}")
# Fall back to the traditional approach
from changedetectionio.model import Watch
watch_class = Watch.model
processor_classes = find_processors()
@@ -241,14 +352,47 @@ def get_custom_watch_obj_for_processor(processor_name):
def available_processors():
"""
Get a list of processors by name and description for the UI elements
:return: A list :)
:return: A list of tuples (processor_name, description)
"""
processor_classes = find_processors()
available = []
for package, processor_class in processor_classes:
available.append((processor_class, package.name))
return available
# Get processors from the pluggy system
pluggy_processors = []
try:
from .processor_registry import get_all_processors
pluggy_processors = get_all_processors()
except Exception as e:
logger.error(f"Error getting processors from pluggy: {str(e)}")
# Get processors from the traditional file-based system
traditional_processors = []
try:
# Let's not use find_processors() directly since it now also includes pluggy processors
package_name = "changedetectionio.processors"
sub_packages = find_sub_packages(package_name)
for sub_package in sub_packages:
module_name = f"{package_name}.{sub_package}.processor"
try:
module = importlib.import_module(module_name)
# Get the name and description from the module if available
name = getattr(module, 'name', f"Traditional processor: {sub_package}")
description = getattr(module, 'description', sub_package)
traditional_processors.append((sub_package, name))
except (ModuleNotFoundError, ImportError, AttributeError) as e:
logger.warning(f"Failed to import module {module_name}: {e} (available_processors())")
except Exception as e:
logger.error(f"Error getting traditional processors: {str(e)}")
# Combine the lists, ensuring no duplicates
# Pluggy processors take precedence
all_processors = []
# Add all pluggy processors
all_processors.extend(pluggy_processors)
# Add traditional processors that aren't already registered via pluggy
pluggy_processor_names = [name for name, _ in pluggy_processors]
for processor_class, name in traditional_processors:
if processor_class not in pluggy_processor_names:
all_processors.append((processor_class, name))
return all_processors

View File

@@ -0,0 +1,17 @@
from wtforms import (
BooleanField,
validators,
RadioField
)
from wtforms.fields.choices import SelectField
from wtforms.fields.form import FormField
from wtforms.form import Form
class BaseProcessorForm(Form):
"""Base class for processor forms"""
def extra_tab_content(self):
return None
def extra_form_content(self):
return None

View File

@@ -0,0 +1,4 @@
"""
Forms for processors
"""
from changedetectionio.forms import processor_text_json_diff_form

View File

@@ -0,0 +1,69 @@
import pluggy
# Define the plugin namespace for processors
PLUGIN_NAMESPACE = "changedetectionio_processors"
hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE)
hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE)
class ProcessorSpec:
"""Hook specifications for processor plugins."""
@hookspec
def get_processor_name():
"""Return the name of the processor."""
pass
@hookspec
def get_processor_description():
"""Return the description of the processor."""
pass
@hookspec
def get_processor_class():
"""Return the processor class."""
pass
@hookspec
def get_processor_form():
"""Return the processor form class."""
pass
@hookspec
def get_processor_watch_model():
"""Return the watch model class for this processor (if any)."""
pass
@hookspec
def get_display_link(url, processor_name):
"""Return a custom display link for the given processor.
Args:
url: The original URL from the watch
processor_name: The name of the processor
Returns:
A string with the custom display link or None to use the default
"""
pass
@hookspec
def perform_site_check(datastore, watch_uuid):
"""Create and return a processor instance ready to perform site check.
Args:
datastore: The application datastore
watch_uuid: The UUID of the watch to check
Returns:
A processor instance ready to perform site check
"""
pass
# Set up the plugin manager
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
# Register hook specifications
plugin_manager.add_hookspecs(ProcessorSpec)

View File

@@ -0,0 +1,222 @@
from loguru import logger
from changedetectionio.model import Watch
from .pluggy_interface import plugin_manager
from typing import Dict, Any, List, Tuple, Optional, TypeVar, Type
import functools
# Import and register internal plugins
from . import whois_plugin
from . import test_plugin
# Register plugins
plugin_manager.register(whois_plugin)
plugin_manager.register(test_plugin)
# Load any setuptools entrypoints
plugin_manager.load_setuptools_entrypoints("changedetectionio_processors")
# Type definitions for better type hinting
T = TypeVar('T')
ProcessorClass = TypeVar('ProcessorClass')
ProcessorForm = TypeVar('ProcessorForm')
ProcessorWatchModel = TypeVar('ProcessorWatchModel')
ProcessorInstance = TypeVar('ProcessorInstance')
# Cache for plugin name mapping to improve performance
_plugin_name_map: Dict[str, Any] = {}
def register_plugin(plugin_module):
"""Register a processor plugin"""
plugin_manager.register(plugin_module)
# Clear the plugin name map cache when a new plugin is registered
global _plugin_name_map
_plugin_name_map = {}
def _get_plugin_name_map() -> Dict[str, Any]:
"""Get a mapping of processor names to plugins
:return: Dictionary mapping processor names to plugins
"""
global _plugin_name_map
# Return cached map if available
if _plugin_name_map:
return _plugin_name_map
# Build the map
result = {}
# Get all plugins from the plugin manager
all_plugins = list(plugin_manager.get_plugins())
# First register known internal plugins by name for reliability
known_plugins = {
'whois': whois_plugin,
'test': test_plugin
}
for name, plugin in known_plugins.items():
if plugin in all_plugins:
result[name] = plugin
# Then process remaining plugins through the hook system
for plugin in all_plugins:
if plugin in known_plugins.values():
continue # Skip plugins we've already registered
try:
# Get the processor name from this plugin
name_results = plugin_manager.hook.get_processor_name(plugin=plugin)
if name_results:
plugin_name = name_results[0]
# Check for name collisions
if plugin_name in result:
logger.warning(f"Plugin name collision: '{plugin_name}' is already registered")
continue
result[plugin_name] = plugin
except Exception as e:
logger.error(f"Error getting processor name from plugin: {str(e)}")
# Cache the map
_plugin_name_map = result
return result
def _get_plugin_by_name(processor_name: str) -> Optional[Any]:
"""Get a plugin by its processor name
:param processor_name: Name of the processor
:return: Plugin object or None
"""
return _get_plugin_name_map().get(processor_name)
def _call_hook_for_plugin(plugin: Any, hook_name: str, default_value: T = None, **kwargs) -> Optional[T]:
"""Call a hook for a specific plugin and handle exceptions
:param plugin: The plugin to call the hook for
:param hook_name: Name of the hook to call
:param default_value: Default value to return if the hook call fails
:param kwargs: Additional arguments to pass to the hook
:return: Result of the hook call or default value
"""
if not plugin:
return default_value
try:
hook = getattr(plugin_manager.hook, hook_name)
results = hook(plugin=plugin, **kwargs)
if results:
return results[0]
except Exception as e:
logger.error(f"Error calling {hook_name} for plugin: {str(e)}")
return default_value
def get_all_processors() -> List[Tuple[str, str]]:
"""Get all processors
:return: List of tuples (processor_name, processor_description)
"""
processors = []
for processor_name, plugin in _get_plugin_name_map().items():
description = _call_hook_for_plugin(plugin, 'get_processor_description')
if description:
processors.append((processor_name, description))
return processors
def get_processor_class(processor_name: str) -> Optional[Type[ProcessorClass]]:
"""Get processor class by name
:param processor_name: Name of the processor
:return: Processor class or None
"""
plugin = _get_plugin_by_name(processor_name)
return _call_hook_for_plugin(plugin, 'get_processor_class')
def get_processor_form(processor_name: str) -> Optional[Type[ProcessorForm]]:
"""Get processor form by name
:param processor_name: Name of the processor
:return: Processor form class or None
"""
plugin = _get_plugin_by_name(processor_name)
return _call_hook_for_plugin(plugin, 'get_processor_form')
def get_processor_watch_model(processor_name: str) -> Type[ProcessorWatchModel]:
"""Get processor watch model by name
:param processor_name: Name of the processor
:return: Watch model class or default Watch model
"""
plugin = _get_plugin_by_name(processor_name)
return _call_hook_for_plugin(plugin, 'get_processor_watch_model', default_value=Watch.model)
def get_processor_site_check(processor_name: str, datastore: Any, watch_uuid: str) -> Optional[ProcessorInstance]:
"""Get a processor instance ready to perform site check
:param processor_name: Name of the processor
:param datastore: The application datastore
:param watch_uuid: The UUID of the watch to check
:return: A processor instance ready to perform site check, or None
"""
plugin = _get_plugin_by_name(processor_name)
if not plugin:
return None
# Try to get the perform_site_check implementation
try:
processor = _call_hook_for_plugin(
plugin,
'perform_site_check',
datastore=datastore,
watch_uuid=watch_uuid
)
if processor:
return processor
# If no perform_site_check hook implementation, try getting the class and instantiating it
processor_class = _call_hook_for_plugin(plugin, 'get_processor_class')
if processor_class:
return processor_class(datastore=datastore, watch_uuid=watch_uuid)
except Exception as e:
logger.error(f"Error getting processor site check for {processor_name}: {str(e)}")
return None
def get_display_link(url: str, processor_name: str) -> Optional[str]:
"""Get a custom display link for the given processor
:param url: The original URL from the watch
:param processor_name: Name of the processor
:return: A string with the custom display link or None to use the default
"""
plugin = _get_plugin_by_name(processor_name)
return _call_hook_for_plugin(
plugin,
'get_display_link',
url=url,
processor_name=processor_name
)
def get_plugin_processor_modules() -> List[Tuple[Any, str]]:
"""Get processor modules for all plugins that can be used with the find_processors function
This function adapts pluggy plugins to be compatible with the traditional find_processors system
:return: A list of (module, processor_name) tuples
"""
result = []
# Import base modules once to avoid repeated imports
from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor
# For each plugin, map to a suitable module for find_processors
for processor_name, plugin in _get_plugin_name_map().items():
try:
processor_class = _call_hook_for_plugin(plugin, 'get_processor_class')
if processor_class:
# Check if this processor extends the text_json_diff processor
base_class_name = str(processor_class.__bases__[0].__name__)
if base_class_name == 'perform_site_check' or 'TextJsonDiffProcessor' in base_class_name:
result.append((text_json_diff_processor, processor_name))
except Exception as e:
logger.error(f"Error mapping processor module for {processor_name}: {str(e)}")
return result

View File

@@ -0,0 +1,169 @@
from loguru import logger
import re
import urllib.parse
from .pluggy_interface import hookimpl
from requests.structures import CaseInsensitiveDict
from changedetectionio.content_fetchers.base import Fetcher
# Import the text_json_diff processor
from changedetectionio.processors.text_json_diff.processor import perform_site_check as TextJsonDiffProcessor
# WHOIS Processor implementation that extends TextJsonDiffProcessor
class WhoisProcessor(TextJsonDiffProcessor):
def _extract_domain_from_url(self, url):
"""Extract domain from URL, removing www. prefix if present"""
parsed_url = urllib.parse.urlparse(url)
domain = parsed_url.netloc
# Remove www. prefix if present
domain = re.sub(r'^www\.', '', domain)
return domain
def call_browser(self, preferred_proxy_id=None):
"""Override call_browser to perform WHOIS lookup instead of using a browser
Note: The python-whois library doesn't directly support proxies. For real proxy support,
we would need to implement a custom socket connection that routes through the proxy.
This is a TODO for a future enhancement.
"""
# Initialize a basic fetcher - this is used by the parent class
self.fetcher = Fetcher()
# Extract URL from watch
url = self.watch.link
# Check for file:// access
if re.search(r'^file:', url.strip(), re.IGNORECASE):
if not self.datastore.data.get('settings', {}).get('application', {}).get('allow_file_uri', False):
raise Exception("file:// type access is denied for security reasons.")
# Extract domain from URL
domain = self._extract_domain_from_url(url)
# Ensure we have a valid domain
if not domain:
error_msg = f"Could not extract domain from URL: '{url}'"
self.fetcher.content = error_msg
self.fetcher.status_code = 400
logger.error(error_msg)
return
# Get proxy configuration using the common method from parent class
proxy_config, proxy_url = super()._get_proxy_for_watch(preferred_proxy_id)
try:
# Use python-whois to get domain information
import whois
# If we have proxy config, use it for the WHOIS lookup
# Note: The python-whois library doesn't directly support proxies,
# but we can implement proxy support if necessary using custom socket code
if proxy_config:
# For now, just log that we would use a proxy
logger.info(f"Using proxy for WHOIS lookup: {proxy_config}")
# Perform the WHOIS lookup
whois_info = whois.whois(domain)
# Convert whois_info object to text
if hasattr(whois_info, 'text'):
# Some whois implementations store raw text in .text attribute
whois_text = whois_info.text
else:
# Otherwise, format it nicely as key-value pairs
whois_text = f"WHOIS Information for domain: {domain}\n\n"
for key, value in whois_info.items():
if value:
whois_text += f"{key}: {value}\n"
# Set the content and status for the fetcher
self.fetcher.content = whois_text
self.fetcher.status_code = 200
# Setup headers dictionary for the fetcher
self.fetcher.headers = CaseInsensitiveDict({
'content-type': 'text/plain',
'server': 'whois-processor'
})
# Add getters for headers
self.fetcher.get_all_headers = lambda: self.fetcher.headers
self.fetcher.get_last_status_code = lambda: self.fetcher.status_code
# Implement necessary methods
self.fetcher.quit = lambda: None
except Exception as e:
error_msg = f"Error fetching WHOIS data for domain {domain}: {str(e)}"
self.fetcher.content = error_msg
self.fetcher.status_code = 500
self.fetcher.headers = CaseInsensitiveDict({
'content-type': 'text/plain',
'server': 'whois-processor'
})
self.fetcher.get_all_headers = lambda: self.fetcher.headers
self.fetcher.get_last_status_code = lambda: self.fetcher.status_code
self.fetcher.quit = lambda: None
logger.error(error_msg)
return
def run_changedetection(self, watch):
"""Use the parent's run_changedetection which will use our overridden call_browser method"""
try:
# Let the parent class handle everything now that we've overridden call_browser
changed_detected, update_obj, filtered_text = super().run_changedetection(watch)
return changed_detected, update_obj, filtered_text
except Exception as e:
error_msg = f"Error in WHOIS processor: {str(e)}"
update_obj = {'last_notification_error': False, 'last_error': error_msg}
logger.error(error_msg)
return False, update_obj, error_msg.encode('utf-8')
@staticmethod
def perform_site_check(datastore, watch_uuid):
"""Factory method to create a WhoisProcessor instance - for compatibility with legacy code"""
processor = WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
return processor
@hookimpl
def perform_site_check(datastore, watch_uuid):
"""Create and return a processor instance ready to perform site check"""
return WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
@hookimpl(trylast=True) # Use trylast to ensure this runs last in case of conflicts
def get_processor_name():
"""Return the name of this processor"""
from loguru import logger
logger.debug("whois_plugin.get_processor_name() called")
return "whois"
@hookimpl
def get_processor_description():
"""Return the description of this processor"""
return "WHOIS Domain Information Changes Detector"
@hookimpl
def get_processor_class():
"""Return the processor class"""
return WhoisProcessor
@hookimpl
def get_processor_form():
"""Return the processor form class"""
# Import here to avoid circular imports
try:
from changedetectionio.forms import processor_text_json_diff_form
return processor_text_json_diff_form
except Exception as e:
from loguru import logger
logger.error(f"Error importing form for whois plugin: {str(e)}")
return None
@hookimpl
def get_processor_watch_model():
"""Return the watch model class for this processor"""
return None # Use default watch model

View File

@@ -0,0 +1,59 @@
import pytest
from changedetectionio.processors.processor_registry import get_processor_class, get_all_processors
def test_get_all_processors():
"""Test that get_all_processors returns a list of processor tuples"""
processors = get_all_processors()
assert isinstance(processors, list)
assert len(processors) > 0
# Each item should be a tuple of (name, description)
for processor in processors:
assert isinstance(processor, tuple)
assert len(processor) == 2
assert isinstance(processor[0], str)
assert isinstance(processor[1], str)
# Check that our WHOIS processor is included
whois_processor = next((p for p in processors if p[0] == "whois"), None)
assert whois_processor is not None
assert whois_processor[1] == "WHOIS Domain Information Changes"
def test_get_processor_class():
"""Test that get_processor_class returns the right class"""
# Get the WHOIS processor class
processor_class = get_processor_class("whois")
assert processor_class is not None
# It should have perform_site_check method
assert hasattr(processor_class, 'perform_site_check')
# Check for non-existent processor
non_existent = get_processor_class("non_existent_processor")
assert non_existent is None
def test_get_processor_site_check():
"""Test that get_processor_site_check returns a processor instance"""
from unittest.mock import MagicMock
from changedetectionio.processors.processor_registry import get_processor_site_check
# Get a WHOIS processor instance
mock_datastore = MagicMock()
watch_uuid = "test-uuid"
processor = get_processor_site_check("whois", mock_datastore, watch_uuid)
# It should be a processor instance
assert processor is not None
# It should have the run_changedetection method
assert hasattr(processor, 'run_changedetection')
# It should have the call_browser method
assert hasattr(processor, 'call_browser')
# Check for non-existent processor
non_existent = get_processor_site_check("non_existent_processor", mock_datastore, watch_uuid)
assert non_existent is None

View File

@@ -0,0 +1,182 @@
import pytest
from unittest.mock import MagicMock, patch
from changedetectionio.processors.whois_plugin import WhoisProcessor
class MockWatch:
def __init__(self, url, previous_md5=None, include_filters=None, ignore_text=None):
self.url = url
self._previous_md5 = previous_md5
self._include_filters = include_filters or []
self._ignore_text = ignore_text or []
self.history = {}
def get(self, key, default=None):
if key == 'previous_md5':
return self._previous_md5
elif key == 'include_filters':
return self._include_filters
elif key == 'ignore_text':
return self._ignore_text
elif key == 'url':
return self.url
return default
def has_special_diff_filter_options_set(self):
return False
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
@patch('changedetectionio.processors.text_json_diff.processor.perform_site_check.run_changedetection')
def test_whois_processor_basic_functionality(mock_super_run, mock_base_init, mock_whois):
"""Test the basic functionality of the WhoisProcessor"""
# Mock the base class init so we don't need to set up the full watch structure
mock_base_init.return_value = None
# Mock super().run_changedetection to return a simple result
mock_super_run.return_value = (False, {'previous_md5': 'some-md5'}, b'Some filtered text')
# Mock the whois response
mock_whois_result = MagicMock()
mock_whois_result.text = "Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
mock_whois.return_value = mock_whois_result
# Create mock datastore
mock_datastore = MagicMock()
mock_datastore.proxy_list = None # No proxies
mock_datastore.get_preferred_proxy_for_watch.return_value = None
mock_datastore.data = {
'settings': {
'application': {
'allow_file_uri': False
}
}
}
# Create a processor instance and setup minimal required attributes
processor = WhoisProcessor(datastore=mock_datastore, watch_uuid='test-uuid')
# Create a minimal watch object
watch = MockWatch(url="https://example.com")
# Simulate link access in the watch
processor.watch = MagicMock()
processor.watch.link = "https://example.com"
processor.watch.get.return_value = "uuid-123"
# Run the processor's run_changedetection method by first using call_browser
processor.call_browser()
# Check that the fetcher was set up correctly
assert processor.fetcher is not None
assert hasattr(processor.fetcher, 'content')
assert hasattr(processor.fetcher, 'headers')
assert hasattr(processor.fetcher, 'status_code')
# Verify that whois was called with the right domain
assert mock_whois.called
assert mock_whois.call_args[0][0] == 'example.com'
# Now run the processor
result = processor.run_changedetection(watch)
# Check that the parent run_changedetection was called
assert mock_super_run.called
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_call_browser_with_proxy(mock_base_init, mock_whois):
"""Test the call_browser method with proxy configuration"""
# Mock the base class init
mock_base_init.return_value = None
# Mock the whois response
mock_whois_result = MagicMock()
mock_whois_result.text = "Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
mock_whois.return_value = mock_whois_result
# Create mock datastore
mock_datastore = MagicMock()
mock_proxy = {
'test-proxy': {
'url': 'http://proxy.example.com:8080',
'label': 'Test Proxy'
}
}
mock_datastore.proxy_list = mock_proxy
mock_datastore.get_preferred_proxy_for_watch.return_value = 'test-proxy'
mock_datastore.data = {
'settings': {
'application': {
'allow_file_uri': False
}
}
}
# Create a processor instance with our mock datastore
processor = WhoisProcessor(datastore=mock_datastore, watch_uuid='test-uuid')
# Set up watch
processor.watch = MagicMock()
processor.watch.link = "https://example.com"
processor.watch.get.return_value = "uuid-123"
# Call the method with a proxy
processor.call_browser()
# Verify whois was called
assert mock_whois.called
assert mock_whois.call_args[0][0] == 'example.com'
# Check that the fetcher was set up correctly
assert processor.fetcher is not None
assert processor.fetcher.content is not None
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_perform_site_check(mock_base_init):
"""Test the WhoisProcessor.perform_site_check static method"""
mock_base_init.return_value = None
# Test the static method
with patch.object(WhoisProcessor, '__init__', return_value=None) as mock_init:
datastore = MagicMock()
watch_uuid = "test-uuid"
# Call the static method
processor = WhoisProcessor.perform_site_check(datastore=datastore, watch_uuid=watch_uuid)
# Check that constructor was called with expected args
mock_init.assert_called_once_with(datastore=datastore, watch_uuid=watch_uuid)
# Check it returns the right type
assert isinstance(processor, WhoisProcessor)
def test_get_display_link():
"""Test the get_display_link hook implementation"""
from changedetectionio.processors.whois_plugin import get_display_link
# Test with a regular URL
url = "https://example.com/some/path?param=value"
processor_name = "whois"
link = get_display_link(url=url, processor_name=processor_name)
assert link == "WHOIS - example.com"
# Test with a subdomain
url = "https://subdomain.example.com/"
link = get_display_link(url=url, processor_name=processor_name)
assert link == "WHOIS - subdomain.example.com"
# Test with www prefix (should be removed)
url = "https://www.example.com/"
link = get_display_link(url=url, processor_name=processor_name)
assert link == "WHOIS - example.com"
# Test with a different processor (should return None)
url = "https://example.com/"
processor_name = "text_json_diff"
link = get_display_link(url=url, processor_name=processor_name)
assert link is None

View File

@@ -61,5 +61,22 @@ class TestDiffBuilder(unittest.TestCase):
p = watch.get_from_version_based_on_last_viewed
assert p == "100", "Correct with only one history snapshot"
def test_watch_link_property_with_processor(self):
"""Test the link property with a processor that customizes the link"""
from unittest.mock import patch
watch = Watch.model(datastore_path='/tmp', default={})
watch['url'] = 'https://example.com'
watch['processor'] = 'whois'
# Mock the processor registry's get_display_link function
with patch('changedetectionio.processors.processor_registry.get_display_link') as mock_get_display_link:
mock_get_display_link.return_value = "WHOIS - example.com"
# The link property should use the customized link from the processor
assert watch.link == "WHOIS - example.com"
mock_get_display_link.assert_called_once_with(url='https://example.com', processor_name='whois')
if __name__ == '__main__':
unittest.main()

View File

@@ -271,19 +271,38 @@ class update_worker(threading.Thread):
try:
# Processor is what we are using for detecting the "Change"
processor = watch.get('processor', 'text_json_diff')
processor_name = watch.get('processor', 'text_json_diff')
# Init a new 'difference_detection_processor', first look in processors
processor_module_name = f"changedetectionio.processors.{processor}.processor"
# First, try to get the processor from our plugin registry
try:
processor_module = importlib.import_module(processor_module_name)
except ModuleNotFoundError as e:
print(f"Processor module '{processor}' not found.")
raise e
update_handler = processor_module.perform_site_check(datastore=self.datastore,
watch_uuid=uuid
)
from changedetectionio.processors.processor_registry import get_processor_site_check
update_handler = get_processor_site_check(processor_name, self.datastore, uuid)
if update_handler:
# We found the processor in our plugin registry
logger.info(f"Using processor '{processor_name}' from plugin registry")
else:
# Fall back to the traditional file-based approach
processor_module_name = f"changedetectionio.processors.{processor_name}.processor"
try:
processor_module = importlib.import_module(processor_module_name)
update_handler = processor_module.perform_site_check(datastore=self.datastore,
watch_uuid=uuid)
except ModuleNotFoundError as e:
print(f"Processor module '{processor_name}' not found in both plugin registry and file system.")
raise e
except ImportError as e:
# If processor_registry.py cannot be imported, fall back to the traditional approach
processor_module_name = f"changedetectionio.processors.{processor_name}.processor"
try:
processor_module = importlib.import_module(processor_module_name)
update_handler = processor_module.perform_site_check(datastore=self.datastore,
watch_uuid=uuid)
except ModuleNotFoundError as e:
print(f"Processor module '{processor_name}' not found.")
raise e
update_handler.call_browser()

View File

@@ -0,0 +1,31 @@
#!/usr/bin/env python3
from changedetectionio.processors import available_processors
from changedetectionio.processors.processor_registry import get_processor_class, get_processor_form
# Test processor registration
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
print(f"Processor: {name} - {description}")
# Check if our WHOIS processor is registered
whois_processor_name = "whois_processor"
whois_found = any(name == whois_processor_name for name, _ in processors)
if whois_found:
print(f"\nWHOIS Processor found! Getting processor class and form...")
# Get the processor class
processor_class = get_processor_class(whois_processor_name)
print(f"Processor class: {processor_class}")
print(f"Processor class name: {processor_class.__name__ if processor_class else None}")
print(f"Processor class module: {processor_class.__module__ if processor_class else None}")
# Get the processor form
processor_form = get_processor_form(whois_processor_name)
print(f"Processor form: {processor_form}")
print("\nWHOIS Processor successfully registered")
else:
print(f"\nWHOIS Processor not found in available processors")

16
test_processors.py Executable file
View File

@@ -0,0 +1,16 @@
#!/usr/bin/env python3
from changedetectionio.processors import available_processors
from changedetectionio.processors import find_processors
# Test traditional processor discovery
print("=== Traditional Processor Discovery ===")
traditional_processors = find_processors()
for module, name in traditional_processors:
print(f"Found processor: {name} in {module.__name__}")
# Test combined processor discovery (traditional + pluggy)
print("\n=== Combined Processor Discovery ===")
combined_processors = available_processors()
for name, description in combined_processors:
print(f"Processor: {name} - {description}")

53
test_whois_extraction.py Normal file
View File

@@ -0,0 +1,53 @@
#!/usr/bin/env python3
import urllib.parse
import re
import sys
def extract_domain_from_url(url):
"""Extract domain from a URL"""
parsed_url = urllib.parse.urlparse(url)
domain = parsed_url.netloc
# Remove www. prefix if present
domain = re.sub(r'^www\.', '', domain)
return domain
# Test domain extraction
test_urls = [
"https://changedetection.io",
"http://www.example.com/page",
"https://subdomain.domain.co.uk/path?query=1",
"ftp://ftp.example.org",
"https://www.changedetection.io/page/subpage",
]
print("=== Domain Extraction Test ===")
for url in test_urls:
domain = extract_domain_from_url(url)
print(f"URL: {url} -> Domain: {domain}")
# Test WHOIS lookup for changedetection.io
try:
import whois
domain = extract_domain_from_url("https://changedetection.io")
print(f"\n=== WHOIS lookup for {domain} ===")
whois_info = whois.whois(domain)
# Print key information
print(f"Domain Name: {whois_info.get('domain_name', '')}")
print(f"Registrar: {whois_info.get('registrar', '')}")
print(f"Creation Date: {whois_info.get('creation_date', '')}")
print(f"Expiration Date: {whois_info.get('expiration_date', '')}")
print("\nWHOIS lookup successful!")
except ImportError:
print("python-whois module not installed. Run: pip install python-whois")
sys.exit(1)
except Exception as e:
print(f"Error performing WHOIS lookup: {str(e)}")
sys.exit(1)

47
test_whois_processor.py Normal file
View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
from changedetectionio.processors import available_processors
from changedetectionio.processors.processor_registry import get_processor_class
import urllib.parse
import sys
# First, verify our processor is available
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
print(f"Processor: {name} - {description}")
# Get the WHOIS processor class
whois_processor_class = get_processor_class("whois_processor")
if not whois_processor_class:
print("ERROR: WHOIS processor not found in available processors.")
sys.exit(1)
print(f"\nFound WHOIS processor class: {whois_processor_class}")
# Test the WHOIS processor directly
try:
# Parse a domain from a URL
url = "https://changedetection.io"
parsed_url = urllib.parse.urlparse(url)
domain = parsed_url.netloc
# Import whois and fetch information
import whois
whois_info = whois.whois(domain)
print(f"\n=== WHOIS Information for {domain} ===")
# Print the information
if hasattr(whois_info, 'text'):
print(whois_info.text)
else:
for key, value in whois_info.items():
if value:
print(f"{key}: {value}")
print("\nSuccessfully retrieved WHOIS data!")
except Exception as e:
print(f"Error fetching WHOIS data: {str(e)}")
sys.exit(1)

View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
from changedetectionio.processors import available_processors
from changedetectionio.processors.processor_registry import get_processor_class
import unittest
import sys
from unittest.mock import MagicMock, patch
import urllib.parse
# First, verify our processor is available
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
print(f"Processor: {name} - {description}")
# Get the WHOIS processor class
whois_processor_class = get_processor_class("whois_processor")
if not whois_processor_class:
print("ERROR: WHOIS processor not found in available processors.")
sys.exit(1)
print(f"\nFound WHOIS processor class: {whois_processor_class}")
# Create a test for our WHOIS processor
class TestWhoisProcessor(unittest.TestCase):
# Use the real whois function - tests will actually make network requests
def test_whois_processor_real(self):
# Extract the domain from the URL
test_url = "https://changedetection.io"
parsed_url = urllib.parse.urlparse(test_url)
domain = parsed_url.netloc
# Create a minimal mock datastore
mock_datastore = MagicMock()
mock_datastore.data = {
'watching': {'test-uuid': {'url': test_url}},
'settings': {
'application': {'empty_pages_are_a_change': False},
'requests': {'timeout': 30}
}
}
mock_datastore.get_all_base_headers.return_value = {}
mock_datastore.get_all_headers_in_textfile_for_watch.return_value = {}
mock_datastore.get_preferred_proxy_for_watch.return_value = None
mock_datastore.get_tag_overrides_for_watch.return_value = []
# Create a minimal mock watch that mimics the real Watch class
class MockWatch:
def __init__(self, url):
self.link = url
self.is_pdf = False
self.has_browser_steps = False
self.is_source_type_url = False
self.history = {}
self.history_n = 0
self.last_viewed = 0
self.newest_history_key = 0
def get(self, key, default=None):
if key == 'uuid':
return 'test-uuid'
elif key == 'include_filters':
return []
elif key == 'body':
return None
elif key == 'method':
return 'GET'
elif key == 'headers':
return {}
elif key == 'browser_steps':
return []
return default
def __getitem__(self, key):
return self.get(key)
def get_last_fetched_text_before_filters(self):
return ""
def save_last_text_fetched_before_filters(self, content):
pass
def has_special_diff_filter_options_set(self):
return False
def lines_contain_something_unique_compared_to_history(self, lines, ignore_whitespace):
return True
mock_watch = MockWatch(test_url)
# Create a more complete mock fetcher
class MockFetcher:
def __init__(self):
self.content = ""
self.raw_content = b""
self.headers = {'Content-Type': 'text/plain'}
self.screenshot = None
self.xpath_data = None
self.instock_data = None
self.browser_steps = []
def get_last_status_code(self):
return 200
def get_all_headers(self):
return {'content-type': 'text/plain'}
def quit(self):
pass
def run(self, **kwargs):
pass
# Create the processor and set the mock fetcher
processor = whois_processor_class(datastore=mock_datastore, watch_uuid='test-uuid')
processor.fetcher = MockFetcher()
# Run the processor - this will make an actual WHOIS request
changed, update_obj, content = processor.run_changedetection(mock_watch)
# Print the content for debugging
content_str = content.decode('utf-8')
print(f"\n=== WHOIS Content from processor (first 200 chars) ===")
print(content_str[:200] + "...")
# Verify the content contains domain information
self.assertIn(domain, content_str)
self.assertIn("Domain Name", content_str)
self.assertIn("Creation Date", content_str)
print("\nWHOIS processor test with real data PASSED!")
# Run the test
if __name__ == "__main__":
unittest.main(argv=['first-arg-is-ignored'], exit=False)

39
test_whois_simple.py Normal file
View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
import urllib.parse
import re
import whois
# Test with changedetection.io domain
url = "https://changedetection.io"
# Extract domain from URL
parsed_url = urllib.parse.urlparse(url)
domain = parsed_url.netloc
# Remove www. prefix if present
domain = re.sub(r'^www\.', '', domain)
# Fetch WHOIS information
print(f"Looking up WHOIS data for domain: {domain}")
whois_info = whois.whois(domain)
# Print key WHOIS data
print("\nKey WHOIS information:")
print(f"Domain Name: {whois_info.get('domain_name', 'Unknown')}")
print(f"Registrar: {whois_info.get('registrar', 'Unknown')}")
print(f"Creation Date: {whois_info.get('creation_date', 'Unknown')}")
print(f"Expiration Date: {whois_info.get('expiration_date', 'Unknown')}")
print(f"Updated Date: {whois_info.get('updated_date', 'Unknown')}")
# Format as text
whois_text = f"WHOIS Information for domain: {domain}\n\n"
for key, value in whois_info.items():
if value:
whois_text += f"{key}: {value}\n"
# Print the first 200 characters
print("\nFormatted WHOIS data (first 200 chars):")
print(whois_text[:200] + "...")
print("\nWHOIS lookup successful!")