mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-03 08:07:23 +00:00
Compare commits
41 Commits
0.50.38
...
conditions
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79166c0307 | ||
|
|
9dbe91e470 | ||
|
|
51bd8cd2d7 | ||
|
|
35455e7dd6 | ||
|
|
aaa038f082 | ||
|
|
57eeb221cb | ||
|
|
8187b9ce4c | ||
|
|
cc70b65bfa | ||
|
|
42099f1fff | ||
|
|
408864d346 | ||
|
|
02b8660bf3 | ||
|
|
947a60af89 | ||
|
|
a0f4cb4d65 | ||
|
|
71ea8d80f3 | ||
|
|
4f48958187 | ||
|
|
2608980b1d | ||
|
|
c982395d72 | ||
|
|
ee7e43ea87 | ||
|
|
da5585b53c | ||
|
|
76062c9419 | ||
|
|
675953797c | ||
|
|
b202652a93 | ||
|
|
617dc721bf | ||
|
|
ec13720694 | ||
|
|
ddacb0bcbc | ||
|
|
f67d98b839 | ||
|
|
beee93d528 | ||
|
|
987ab3e494 | ||
|
|
0c68cfffb1 | ||
|
|
e93a9244fe | ||
|
|
e56eec41c1 | ||
|
|
31f4bb7cc3 | ||
|
|
f08efde110 | ||
|
|
9b39b2853b | ||
|
|
892d38ba42 | ||
|
|
b170e191d4 | ||
|
|
edb78efcca | ||
|
|
383f90b70c | ||
|
|
6948418865 | ||
|
|
cd80e317f3 | ||
|
|
8c26210804 |
@@ -712,23 +712,63 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
# Does it use some custom form? does one exist?
|
||||
processor_name = datastore.data['watching'][uuid].get('processor', '')
|
||||
processor_classes = next((tpl for tpl in find_processors() if tpl[1] == processor_name), None)
|
||||
|
||||
# If it's not found in traditional processors, check if it's a pluggy plugin
|
||||
if not processor_classes:
|
||||
flash(f"Cannot load the edit form for processor/plugin '{processor_classes[1]}', plugin missing?", 'error')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
parent_module = get_parent_module(processor_classes[0])
|
||||
|
||||
try:
|
||||
# Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code)
|
||||
forms_module = importlib.import_module(f"{parent_module.__name__}.forms")
|
||||
# Access the 'processor_settings_form' class from the 'forms' module
|
||||
form_class = getattr(forms_module, 'processor_settings_form')
|
||||
except ModuleNotFoundError as e:
|
||||
# .forms didnt exist
|
||||
form_class = forms.processor_text_json_diff_form
|
||||
except AttributeError as e:
|
||||
# .forms exists but no useful form
|
||||
form_class = forms.processor_text_json_diff_form
|
||||
try:
|
||||
from changedetectionio.processors.processor_registry import get_processor_form, _get_plugin_name_map
|
||||
|
||||
# Get all available plugins for debugging
|
||||
available_plugins = list(_get_plugin_name_map().keys())
|
||||
logger.debug(f"Available processor plugins: {available_plugins}")
|
||||
|
||||
# Try to get the processor form
|
||||
plugin_form_class = get_processor_form(processor_name)
|
||||
|
||||
if plugin_form_class:
|
||||
# Use default text_json_diff_form as parent module for plugins
|
||||
from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor
|
||||
form_class = forms.processor_text_json_diff_form
|
||||
parent_module = get_parent_module(text_json_diff_processor)
|
||||
|
||||
# Skip the normal form loading code path
|
||||
use_plugin_form = True
|
||||
logger.debug(f"Successfully loaded form for plugin '{processor_name}'")
|
||||
else:
|
||||
# Check if the plugin is registered but doesn't have a form
|
||||
if processor_name in available_plugins:
|
||||
logger.error(f"Plugin '{processor_name}' is registered but has no form class")
|
||||
flash(f"Plugin '{processor_name}' is registered but has no form class", 'error')
|
||||
else:
|
||||
logger.error(f"Cannot find plugin '{processor_name}'. Available plugins: {available_plugins}")
|
||||
flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin missing?", 'error')
|
||||
return redirect(url_for('index'))
|
||||
except ImportError as e:
|
||||
logger.error(f"Import error when loading plugin form: {str(e)}")
|
||||
flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin system not available?", 'error')
|
||||
return redirect(url_for('index'))
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error loading plugin form: {str(e)}")
|
||||
flash(f"Error loading plugin form: {str(e)}", 'error')
|
||||
return redirect(url_for('index'))
|
||||
else:
|
||||
# Traditional processor - continue with normal flow
|
||||
parent_module = get_parent_module(processor_classes[0])
|
||||
use_plugin_form = False
|
||||
|
||||
# Only follow this path for traditional processors
|
||||
if not use_plugin_form:
|
||||
try:
|
||||
# Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code)
|
||||
forms_module = importlib.import_module(f"{parent_module.__name__}.forms")
|
||||
# Access the 'processor_settings_form' class from the 'forms' module
|
||||
form_class = getattr(forms_module, 'processor_settings_form')
|
||||
except ModuleNotFoundError as e:
|
||||
# .forms didnt exist
|
||||
form_class = forms.processor_text_json_diff_form
|
||||
except AttributeError as e:
|
||||
# .forms exists but no useful form
|
||||
form_class = forms.processor_text_json_diff_form
|
||||
|
||||
form = form_class(formdata=request.form if request.method == 'POST' else None,
|
||||
data=default,
|
||||
|
||||
@@ -67,7 +67,6 @@ class model(watch_base):
|
||||
|
||||
@property
|
||||
def link(self):
|
||||
|
||||
url = self.get('url', '')
|
||||
if not is_safe_url(url):
|
||||
return 'DISABLED'
|
||||
@@ -93,6 +92,19 @@ class model(watch_base):
|
||||
# Also double check it after any Jinja2 formatting, just in case
|
||||
if not is_safe_url(ready_url):
|
||||
return 'DISABLED'
|
||||
|
||||
# Check if a processor wants to customize the display link
|
||||
processor_name = self.get('processor')
|
||||
if processor_name:
|
||||
try:
|
||||
# Import here to avoid circular imports
|
||||
from changedetectionio.processors.processor_registry import get_display_link
|
||||
custom_link = get_display_link(url=ready_url, processor_name=processor_name)
|
||||
if custom_link:
|
||||
return custom_link
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting custom display link for processor {processor_name}: {str(e)}")
|
||||
|
||||
return ready_url
|
||||
|
||||
def clear_watch(self):
|
||||
|
||||
@@ -3,6 +3,7 @@ from changedetectionio.content_fetchers.base import Fetcher
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from copy import deepcopy
|
||||
from loguru import logger
|
||||
|
||||
import hashlib
|
||||
import importlib
|
||||
import inspect
|
||||
@@ -10,6 +11,10 @@ import os
|
||||
import pkgutil
|
||||
import re
|
||||
|
||||
# Import the plugin manager
|
||||
from .pluggy_interface import plugin_manager
|
||||
|
||||
|
||||
class difference_detection_processor():
|
||||
|
||||
browser_steps = None
|
||||
@@ -26,9 +31,95 @@ class difference_detection_processor():
|
||||
self.watch = deepcopy(self.datastore.data['watching'].get(watch_uuid))
|
||||
# Generic fetcher that should be extended (requests, playwright etc)
|
||||
self.fetcher = Fetcher()
|
||||
|
||||
def _get_proxy_for_watch(self, preferred_proxy_id=None):
    """Resolve the proxy to use for this watch.

    Args:
        preferred_proxy_id: Optional explicit proxy ID (a key of
            ``datastore.proxy_list``). When falsy and the datastore offers
            ``get_preferred_proxy_for_watch()``, that method is asked using
            the watch's UUID.

    Returns:
        tuple: ``(proxy_config, proxy_url)``.
        ``proxy_config`` is a dict whose ``'http'`` and ``'https'`` entries
        hold the proxy address without credentials, plus ``'username'`` /
        ``'password'`` entries when the proxy URL carries them.
        ``proxy_url`` is the URL exactly as stored in the proxy list.
        Both are ``None`` when no proxy applies or when resolution fails.
    """
    # Default to no proxy config
    proxy_config = None
    proxy_url = None

    # Nothing to resolve without a datastore
    if hasattr(self, 'datastore') and self.datastore:
        try:
            # Look up the preferred proxy ID if the caller did not provide one
            if not preferred_proxy_id and hasattr(self.datastore, 'get_preferred_proxy_for_watch'):
                # The watch may be dict-like or expose a plain attribute
                watch_uuid = None
                if hasattr(self.watch, 'get'):
                    watch_uuid = self.watch.get('uuid')
                elif hasattr(self.watch, 'uuid'):
                    watch_uuid = self.watch.uuid

                if watch_uuid:
                    preferred_proxy_id = self.datastore.get_preferred_proxy_for_watch(uuid=watch_uuid)

            # Check if we have a proxy list and a valid proxy ID
            if preferred_proxy_id and hasattr(self.datastore, 'proxy_list') and self.datastore.proxy_list:
                proxy_info = self.datastore.proxy_list.get(preferred_proxy_id)

                if proxy_info and 'url' in proxy_info:
                    proxy_url = proxy_info.get('url')
                    logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}'")

                    # Parse the proxy URL to build a proxy dict for requests
                    import urllib.parse
                    parsed_proxy = urllib.parse.urlparse(proxy_url)
                    proxy_type = parsed_proxy.scheme

                    # Extract credentials if present
                    username = parsed_proxy.username or None
                    password = parsed_proxy.password or None

                    # Strip the "user:password@" prefix. The LAST '@' separates
                    # userinfo from host (this is also how urlparse derives
                    # .username/.hostname), so a password containing '@' must
                    # not truncate the host part.
                    netloc = parsed_proxy.netloc.rpartition('@')[2]

                    proxy_addr = f"{proxy_type}://{netloc}"

                    # Create the proxy configuration
                    proxy_config = {
                        'http': proxy_addr,
                        'https': proxy_addr
                    }

                    # Add credentials if present
                    if username:
                        proxy_config['username'] = username
                    if password:
                        proxy_config['password'] = password
        except Exception as e:
            # Log the error but continue without a proxy
            logger.error(f"Error setting up proxy: {str(e)}")
            proxy_config = None
            proxy_url = None

    return proxy_config, proxy_url
|
||||
|
||||
def call_browser(self, preferred_proxy_id=None):
|
||||
|
||||
"""Fetch content using the appropriate browser/fetcher
|
||||
|
||||
This method will:
|
||||
1. Determine the appropriate fetcher to use based on watch settings
|
||||
2. Set up proxy configuration if needed
|
||||
3. Initialize the fetcher with the correct parameters
|
||||
4. Configure any browser steps if needed
|
||||
|
||||
Args:
|
||||
preferred_proxy_id: Optional explicit proxy ID to use
|
||||
"""
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
|
||||
url = self.watch.link
|
||||
@@ -43,8 +134,8 @@ class difference_detection_processor():
|
||||
# Requests, playwright, other browser via wss:// etc, fetch_extra_something
|
||||
prefer_fetch_backend = self.watch.get('fetch_backend', 'system')
|
||||
|
||||
# Proxy ID "key"
|
||||
preferred_proxy_id = preferred_proxy_id if preferred_proxy_id else self.datastore.get_preferred_proxy_for_watch(uuid=self.watch.get('uuid'))
|
||||
# Get proxy configuration
|
||||
proxy_config, proxy_url = self._get_proxy_for_watch(preferred_proxy_id)
|
||||
|
||||
# Pluggable content self.fetcher
|
||||
if not prefer_fetch_backend or prefer_fetch_backend == 'system':
|
||||
@@ -82,14 +173,10 @@ class difference_detection_processor():
|
||||
# What it referenced doesnt exist, Just use a default
|
||||
fetcher_obj = getattr(content_fetchers, "html_requests")
|
||||
|
||||
proxy_url = None
|
||||
if preferred_proxy_id:
|
||||
# Custom browser endpoints should NOT have a proxy added
|
||||
if not prefer_fetch_backend.startswith('extra_browser_'):
|
||||
proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url')
|
||||
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}")
|
||||
else:
|
||||
logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified. ")
|
||||
# Custom browser endpoints should NOT have a proxy added
|
||||
if proxy_url and prefer_fetch_backend.startswith('extra_browser_'):
|
||||
logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified.")
|
||||
proxy_url = None
|
||||
|
||||
# Now call the fetcher (playwright/requests/etc) with arguments that only a fetcher would need.
|
||||
# When browser_connection_url is None, the method should default to working out what the best defaults are (os env vars etc)
|
||||
@@ -185,9 +272,9 @@ def find_sub_packages(package_name):
|
||||
|
||||
def find_processors():
|
||||
"""
|
||||
Find all subclasses of DifferenceDetectionProcessor in the specified package.
|
||||
Find all subclasses of DifferenceDetectionProcessor in the specified package
|
||||
and also include processors from the plugin system.
|
||||
|
||||
:param package_name: The name of the package to scan for processor modules.
|
||||
:return: A list of (module, class) tuples.
|
||||
"""
|
||||
package_name = "changedetectionio.processors" # Name of the current package/module
|
||||
@@ -195,6 +282,7 @@ def find_processors():
|
||||
processors = []
|
||||
sub_packages = find_sub_packages(package_name)
|
||||
|
||||
# Find traditional processors
|
||||
for sub_package in sub_packages:
|
||||
module_name = f"{package_name}.{sub_package}.processor"
|
||||
try:
|
||||
@@ -207,6 +295,15 @@ def find_processors():
|
||||
except (ModuleNotFoundError, ImportError) as e:
|
||||
logger.warning(f"Failed to import module {module_name}: {e} (find_processors())")
|
||||
|
||||
# Also include processors from the plugin system
|
||||
try:
|
||||
from .processor_registry import get_plugin_processor_modules
|
||||
plugin_modules = get_plugin_processor_modules()
|
||||
if plugin_modules:
|
||||
processors.extend(plugin_modules)
|
||||
except (ImportError, ModuleNotFoundError) as e:
|
||||
logger.warning(f"Failed to import plugin modules: {e} (find_processors())")
|
||||
|
||||
return processors
|
||||
|
||||
|
||||
@@ -223,8 +320,22 @@ def get_parent_module(module):
|
||||
return False
|
||||
|
||||
|
||||
|
||||
def get_custom_watch_obj_for_processor(processor_name):
|
||||
"""
|
||||
Get the custom watch object for a processor
|
||||
:param processor_name: Name of the processor
|
||||
:return: Watch class or None
|
||||
"""
|
||||
# First, try to get the watch model from the pluggy system
|
||||
try:
|
||||
from .processor_registry import get_processor_watch_model
|
||||
watch_model = get_processor_watch_model(processor_name)
|
||||
if watch_model:
|
||||
return watch_model
|
||||
except Exception as e:
|
||||
logger.warning(f"Error getting processor watch model from pluggy: {e}")
|
||||
|
||||
# Fall back to the traditional approach
|
||||
from changedetectionio.model import Watch
|
||||
watch_class = Watch.model
|
||||
processor_classes = find_processors()
|
||||
@@ -241,14 +352,47 @@ def get_custom_watch_obj_for_processor(processor_name):
|
||||
def available_processors():
    """
    Get a list of processors by name and description for the UI elements.

    Processors registered through the plugin registry are listed first.
    File-based processors found under ``changedetectionio.processors`` are
    appended afterwards unless a plugin with the same name is already listed.

    :return: A list of tuples (processor_name, description)
    """
    # Get processors from the pluggy system
    pluggy_processors = []
    try:
        from .processor_registry import get_all_processors
        pluggy_processors = get_all_processors()
    except Exception as e:
        logger.error(f"Error getting processors from pluggy: {str(e)}")

    # Get processors from the traditional file-based system
    traditional_processors = []
    try:
        # Let's not use find_processors() directly since it now also includes pluggy processors
        package_name = "changedetectionio.processors"
        sub_packages = find_sub_packages(package_name)

        for sub_package in sub_packages:
            module_name = f"{package_name}.{sub_package}.processor"
            try:
                module = importlib.import_module(module_name)
                # Use the module's own display name when it declares one
                name = getattr(module, 'name', f"Traditional processor: {sub_package}")
                traditional_processors.append((sub_package, name))
            except (ModuleNotFoundError, ImportError, AttributeError) as e:
                logger.warning(f"Failed to import module {module_name}: {e} (available_processors())")
    except Exception as e:
        logger.error(f"Error getting traditional processors: {str(e)}")

    # Combine the lists, ensuring no duplicates; pluggy processors take precedence
    all_processors = list(pluggy_processors)

    # Add traditional processors that aren't already registered via pluggy
    pluggy_processor_names = {name for name, _ in pluggy_processors}
    for processor_class, name in traditional_processors:
        if processor_class not in pluggy_processor_names:
            all_processors.append((processor_class, name))

    return all_processors
|
||||
17
changedetectionio/processors/form.py
Normal file
17
changedetectionio/processors/form.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from wtforms import (
|
||||
BooleanField,
|
||||
validators,
|
||||
RadioField
|
||||
)
|
||||
from wtforms.fields.choices import SelectField
|
||||
from wtforms.fields.form import FormField
|
||||
from wtforms.form import Form
|
||||
|
||||
class BaseProcessorForm(Form):
    """Shared parent class for processor forms.

    Both extension points below yield ``None`` in this base class.
    """

    def extra_tab_content(self):
        """Extra tab content for this form; the base class has none."""

    def extra_form_content(self):
        """Extra form content for this form; the base class has none."""
|
||||
4
changedetectionio/processors/forms.py
Normal file
4
changedetectionio/processors/forms.py
Normal file
@@ -0,0 +1,4 @@
|
||||
"""
|
||||
Forms for processors
|
||||
"""
|
||||
from changedetectionio.forms import processor_text_json_diff_form
|
||||
69
changedetectionio/processors/pluggy_interface.py
Normal file
69
changedetectionio/processors/pluggy_interface.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import pluggy
|
||||
|
||||
# Define the plugin namespace for processors
|
||||
PLUGIN_NAMESPACE = "changedetectionio_processors"
|
||||
|
||||
hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE)
|
||||
hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE)
|
||||
|
||||
|
||||
class ProcessorSpec:
    """Hook specifications that a processor plugin can implement."""

    @hookspec
    def get_processor_name():
        """Give the processor's name."""

    @hookspec
    def get_processor_description():
        """Give the processor's description."""

    @hookspec
    def get_processor_class():
        """Give the class implementing the processor."""

    @hookspec
    def get_processor_form():
        """Give the form class used by the processor."""

    @hookspec
    def get_processor_watch_model():
        """Give the watch model class for the processor, if it has one."""

    @hookspec
    def get_display_link(url, processor_name):
        """Give a custom display link for a processor.

        Args:
            url: The original URL from the watch
            processor_name: The name of the processor

        Returns:
            The custom display link as a string, or None to keep the default
        """

    @hookspec
    def perform_site_check(datastore, watch_uuid):
        """Build a processor instance that is ready to run a site check.

        Args:
            datastore: The application datastore
            watch_uuid: The UUID of the watch to check

        Returns:
            The processor instance, ready to perform the site check
        """
|
||||
|
||||
|
||||
# Set up the plugin manager
|
||||
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
|
||||
|
||||
# Register hook specifications
|
||||
plugin_manager.add_hookspecs(ProcessorSpec)
|
||||
222
changedetectionio/processors/processor_registry.py
Normal file
222
changedetectionio/processors/processor_registry.py
Normal file
@@ -0,0 +1,222 @@
|
||||
from loguru import logger
|
||||
from changedetectionio.model import Watch
|
||||
from .pluggy_interface import plugin_manager
|
||||
from typing import Dict, Any, List, Tuple, Optional, TypeVar, Type
|
||||
import functools
|
||||
|
||||
# Import and register internal plugins
|
||||
from . import whois_plugin
|
||||
from . import test_plugin
|
||||
|
||||
# Register plugins
|
||||
plugin_manager.register(whois_plugin)
|
||||
plugin_manager.register(test_plugin)
|
||||
|
||||
# Load any setuptools entrypoints
|
||||
plugin_manager.load_setuptools_entrypoints("changedetectionio_processors")
|
||||
|
||||
# Type definitions for better type hinting
|
||||
T = TypeVar('T')
|
||||
ProcessorClass = TypeVar('ProcessorClass')
|
||||
ProcessorForm = TypeVar('ProcessorForm')
|
||||
ProcessorWatchModel = TypeVar('ProcessorWatchModel')
|
||||
ProcessorInstance = TypeVar('ProcessorInstance')
|
||||
|
||||
# Cache for plugin name mapping to improve performance
|
||||
_plugin_name_map: Dict[str, Any] = {}
|
||||
|
||||
def register_plugin(plugin_module):
    """Add a processor plugin to the plugin manager and drop the cached name map."""
    global _plugin_name_map

    plugin_manager.register(plugin_module)

    # An empty map is treated as "not built yet" by _get_plugin_name_map(),
    # so the next lookup rebuilds it with the new plugin included.
    _plugin_name_map = dict()
|
||||
|
||||
def _get_plugin_name_map() -> Dict[str, Any]:
    """Get a mapping of processor names to plugins.

    The map is built once and cached in the module-level ``_plugin_name_map``;
    an empty cache is treated as "not built yet".

    :return: Dictionary mapping processor names to plugins
    """
    global _plugin_name_map

    # Return cached map if available
    if _plugin_name_map:
        return _plugin_name_map

    # Build the map
    result = {}

    # Get all plugins from the plugin manager
    all_plugins = list(plugin_manager.get_plugins())

    # First register known internal plugins by name for reliability
    known_plugins = {
        'whois': whois_plugin,
        'test': test_plugin
    }

    for name, plugin in known_plugins.items():
        if plugin in all_plugins:
            result[name] = plugin

    # Then process remaining plugins through the hook system
    for plugin in all_plugins:
        if plugin in known_plugins.values():
            continue  # Skip plugins we've already registered

        try:
            # A plain ``plugin_manager.hook.get_processor_name()`` call is
            # dispatched to every registered implementation, so it cannot
            # tell which name belongs to which plugin. Restrict the call to
            # this plugin by removing all the others from the caller.
            other_plugins = [p for p in all_plugins if p is not plugin]
            name_hook = plugin_manager.subset_hook_caller(
                'get_processor_name', remove_plugins=other_plugins
            )
            name_results = name_hook()

            if name_results:
                plugin_name = name_results[0]

                # Check for name collisions
                if plugin_name in result:
                    logger.warning(f"Plugin name collision: '{plugin_name}' is already registered")
                    continue

                result[plugin_name] = plugin
        except Exception as e:
            logger.error(f"Error getting processor name from plugin: {str(e)}")

    # Cache the map
    _plugin_name_map = result
    return result
|
||||
|
||||
def _get_plugin_by_name(processor_name: str) -> Optional[Any]:
    """Look up the plugin registered under a processor name.

    :param processor_name: Name of the processor
    :return: The matching plugin object, or None when the name is unknown
    """
    name_to_plugin = _get_plugin_name_map()
    return name_to_plugin.get(processor_name)
|
||||
|
||||
def _call_hook_for_plugin(plugin: Any, hook_name: str, default_value: T = None, **kwargs) -> Optional[T]:
    """Call a hook on one specific plugin and handle exceptions.

    :param plugin: The plugin to call the hook for
    :param hook_name: Name of the hook to call
    :param default_value: Value returned when there is no plugin, the plugin
        gives no result for the hook, or the call fails
    :param kwargs: Additional arguments to pass to the hook
    :return: Result of the hook call or default value
    """
    if not plugin:
        return default_value

    try:
        # pluggy dispatches ``plugin_manager.hook.<name>(...)`` to every
        # registered implementation and has no per-call ``plugin=`` filter,
        # so the first result could belong to a different plugin. Build a
        # caller from which all other plugins are removed instead.
        other_plugins = [p for p in plugin_manager.get_plugins() if p is not plugin]
        hook = plugin_manager.subset_hook_caller(hook_name, remove_plugins=other_plugins)
        results = hook(**kwargs)

        if results:
            return results[0]
    except Exception as e:
        # Best effort: an unknown hook name or a failing implementation
        # falls back to the default value.
        logger.error(f"Error calling {hook_name} for plugin: {str(e)}")

    return default_value
|
||||
|
||||
def get_all_processors() -> List[Tuple[str, str]]:
    """List every plugin processor that provides a description.

    :return: List of tuples (processor_name, processor_description)
    """
    # Pair each registered name with whatever its description hook yields
    described = (
        (registered_name, _call_hook_for_plugin(plugin_obj, 'get_processor_description'))
        for registered_name, plugin_obj in _get_plugin_name_map().items()
    )
    # Processors without a (truthy) description are left out
    return [(registered_name, text) for registered_name, text in described if text]
|
||||
|
||||
def get_processor_class(processor_name: str) -> Optional[Type[ProcessorClass]]:
    """Resolve a processor name to its processor class.

    :param processor_name: Name of the processor
    :return: The class from the plugin's get_processor_class hook, or None
    """
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_processor_class',
    )
|
||||
|
||||
def get_processor_form(processor_name: str) -> Optional[Type[ProcessorForm]]:
    """Resolve a processor name to its form class.

    :param processor_name: Name of the processor
    :return: The class from the plugin's get_processor_form hook, or None
    """
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_processor_form',
    )
|
||||
|
||||
def get_processor_watch_model(processor_name: str) -> Type[ProcessorWatchModel]:
    """Resolve a processor name to its watch model class.

    :param processor_name: Name of the processor
    :return: The class from the plugin's get_processor_watch_model hook,
        falling back to Watch.model
    """
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_processor_watch_model',
        default_value=Watch.model,
    )
|
||||
|
||||
def get_processor_site_check(processor_name: str, datastore: Any, watch_uuid: str) -> Optional[ProcessorInstance]:
    """Build a processor instance that is ready to perform a site check.

    :param processor_name: Name of the processor
    :param datastore: The application datastore
    :param watch_uuid: The UUID of the watch to check
    :return: A processor instance ready to perform site check, or None
    """
    plugin = _get_plugin_by_name(processor_name)

    if plugin:
        try:
            # Preferred route: the plugin's own perform_site_check hook
            ready_instance = _call_hook_for_plugin(
                plugin, 'perform_site_check', datastore=datastore, watch_uuid=watch_uuid
            )
            if ready_instance:
                return ready_instance

            # Otherwise fetch the processor class and instantiate it here
            factory = _call_hook_for_plugin(plugin, 'get_processor_class')
            if factory:
                return factory(datastore=datastore, watch_uuid=watch_uuid)
        except Exception as e:
            logger.error(f"Error getting processor site check for {processor_name}: {str(e)}")

    return None
|
||||
|
||||
def get_display_link(url: str, processor_name: str) -> Optional[str]:
    """Ask a processor's plugin for a custom display link.

    :param url: The original URL from the watch
    :param processor_name: Name of the processor
    :return: The custom display link, or None to use the default
    """
    hook_arguments = {'url': url, 'processor_name': processor_name}
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_display_link',
        **hook_arguments,
    )
|
||||
|
||||
def get_plugin_processor_modules() -> List[Tuple[Any, str]]:
    """Get processor modules for all plugins, for use with find_processors().

    This adapts pluggy plugins to the traditional find_processors system: a
    plugin whose processor class descends from the text_json_diff processor
    is paired with the text_json_diff ``processor`` module.

    :return: A list of (module, processor_name) tuples
    """
    result = []

    # Import base modules once to avoid repeated imports
    from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor

    # For each plugin, map to a suitable module for find_processors
    for processor_name, plugin in _get_plugin_name_map().items():
        try:
            processor_class = _call_hook_for_plugin(plugin, 'get_processor_class')
            if not processor_class:
                continue

            # Inspect every ancestor, not only the first direct base, so that
            # indirect subclasses and classes listing a mixin first are still
            # recognised. The match stays name-based as before.
            ancestor_names = [ancestor.__name__ for ancestor in processor_class.__mro__[1:]]
            extends_text_json_diff = any(
                ancestor_name == 'perform_site_check' or 'TextJsonDiffProcessor' in ancestor_name
                for ancestor_name in ancestor_names
            )
            if extends_text_json_diff:
                result.append((text_json_diff_processor, processor_name))
        except Exception as e:
            logger.error(f"Error mapping processor module for {processor_name}: {str(e)}")

    return result
|
||||
169
changedetectionio/processors/whois_plugin.py
Normal file
169
changedetectionio/processors/whois_plugin.py
Normal file
@@ -0,0 +1,169 @@
|
||||
from loguru import logger
|
||||
import re
|
||||
import urllib.parse
|
||||
from .pluggy_interface import hookimpl
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
from changedetectionio.content_fetchers.base import Fetcher
|
||||
|
||||
# Import the text_json_diff processor
|
||||
from changedetectionio.processors.text_json_diff.processor import perform_site_check as TextJsonDiffProcessor
|
||||
|
||||
# WHOIS Processor implementation that extends TextJsonDiffProcessor
|
||||
class WhoisProcessor(TextJsonDiffProcessor):
    """Text/JSON diff processor whose fetch step is a WHOIS lookup.

    Only the fetch step (``call_browser``) is replaced; change detection is
    delegated to the parent class through ``run_changedetection``.
    """

    def _extract_domain_from_url(self, url):
        """Return the host name of ``url`` with a leading ``www.`` removed.

        ``hostname`` is used instead of ``netloc`` so that credentials and a
        port (``user:pw@example.com:8080``) never end up in the WHOIS query.
        ``hostname`` is also lower-cased by ``urllib``. An empty string is
        returned when the URL has no host part.
        """
        parsed_url = urllib.parse.urlparse(url)
        domain = parsed_url.hostname or ''

        # Remove www. prefix if present
        return re.sub(r'^www\.', '', domain)

    def _set_fetcher_result(self, content, status_code):
        """Store a plain-text result on ``self.fetcher`` with the accessors set in one place."""
        self.fetcher.content = content
        self.fetcher.status_code = status_code
        self.fetcher.headers = CaseInsensitiveDict({
            'content-type': 'text/plain',
            'server': 'whois-processor'
        })

        # Accessors and no-op quit() attached directly to the fetcher instance
        self.fetcher.get_all_headers = lambda: self.fetcher.headers
        self.fetcher.get_last_status_code = lambda: self.fetcher.status_code
        self.fetcher.quit = lambda: None

    def call_browser(self, preferred_proxy_id=None):
        """Override call_browser to perform a WHOIS lookup instead of using a browser.

        The outcome is stored on ``self.fetcher``: status 200 with the WHOIS
        text on success, 400 when no domain can be extracted from the watch
        URL, 500 when the lookup (or importing ``whois``) fails.

        Note: The python-whois library doesn't directly support proxies. For
        real proxy support we would need a custom socket connection that
        routes through the proxy. This is a TODO for a future enhancement.

        Args:
            preferred_proxy_id: passed through to the parent's
                ``_get_proxy_for_watch``; the resulting proxy is only logged.

        Raises:
            Exception: if the watch URL is a ``file:`` URI and the
                ``allow_file_uri`` application setting is not enabled.
        """
        # Initialize a basic fetcher - this is used by the parent class
        self.fetcher = Fetcher()

        url = self.watch.link

        # Check for file:// access
        if re.search(r'^file:', url.strip(), re.IGNORECASE):
            if not self.datastore.data.get('settings', {}).get('application', {}).get('allow_file_uri', False):
                raise Exception("file:// type access is denied for security reasons.")

        domain = self._extract_domain_from_url(url)

        # Ensure we have a valid domain
        if not domain:
            error_msg = f"Could not extract domain from URL: '{url}'"
            self._set_fetcher_result(error_msg, 400)
            logger.error(error_msg)
            return

        # Get proxy configuration using the common method from parent class
        proxy_config, _proxy_url = super()._get_proxy_for_watch(preferred_proxy_id)

        try:
            # Imported lazily so a missing python-whois package is reported
            # through the 500 path below instead of breaking module import.
            import whois

            if proxy_config:
                # For now, just log that we would use a proxy
                logger.info(f"Using proxy for WHOIS lookup: {proxy_config}")

            whois_info = whois.whois(domain)

            if hasattr(whois_info, 'text'):
                # Raw WHOIS response text, when the result object carries it
                whois_text = whois_info.text
            else:
                # Otherwise, format the non-empty fields as key-value lines
                whois_text = f"WHOIS Information for domain: {domain}\n\n" + "".join(
                    f"{key}: {value}\n" for key, value in whois_info.items() if value
                )

            self._set_fetcher_result(whois_text, 200)

        except Exception as e:
            error_msg = f"Error fetching WHOIS data for domain {domain}: {str(e)}"
            self._set_fetcher_result(error_msg, 500)
            logger.error(error_msg)

        return

    def run_changedetection(self, watch):
        """Run the parent's change detection on the content set by ``call_browser``.

        Returns:
            The parent's ``(changed_detected, update_obj, filtered_text)``
            tuple, or ``(False, {...'last_error': msg}, msg_bytes)`` when the
            parent raises.
        """
        try:
            # Let the parent class handle everything now that we've overridden call_browser
            changed_detected, update_obj, filtered_text = super().run_changedetection(watch)
            return changed_detected, update_obj, filtered_text

        except Exception as e:
            error_msg = f"Error in WHOIS processor: {str(e)}"
            update_obj = {'last_notification_error': False, 'last_error': error_msg}
            logger.error(error_msg)
            return False, update_obj, error_msg.encode('utf-8')

    @staticmethod
    def perform_site_check(datastore, watch_uuid):
        """Factory method to create a WhoisProcessor instance - for compatibility with legacy code."""
        processor = WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
        return processor
|
||||
|
||||
@hookimpl
def perform_site_check(datastore, watch_uuid):
    """Plugin hook: build the WHOIS processor instance for the given watch."""
    whois_processor = WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
    return whois_processor
|
||||
|
||||
@hookimpl(trylast=True)  # Use trylast to ensure this runs last in case of conflicts
def get_processor_name():
    """Plugin hook: report the name this processor is registered under."""
    from loguru import logger

    processor_name = "whois"
    logger.debug("whois_plugin.get_processor_name() called")
    return processor_name
|
||||
|
||||
@hookimpl
def get_processor_description():
    """Plugin hook: human-readable description of this processor."""
    description = "WHOIS Domain Information Changes Detector"
    return description
|
||||
|
||||
@hookimpl
def get_processor_class():
    """Plugin hook: hand back the class implementing this processor."""
    processor_class = WhoisProcessor
    return processor_class
|
||||
|
||||
@hookimpl
def get_processor_form():
    """Plugin hook: return the settings form class, or None if it cannot be imported.

    The text/JSON diff form is reused. Any failure while importing it is
    logged and turned into ``None``.
    """
    # Import here to avoid circular imports
    try:
        from changedetectionio.forms import processor_text_json_diff_form
    except Exception as import_error:
        from loguru import logger

        logger.error(f"Error importing form for whois plugin: {str(import_error)}")
        return None

    return processor_text_json_diff_form
|
||||
|
||||
@hookimpl
def get_processor_watch_model():
    """Plugin hook: watch model class for this processor (None selects the default)."""
    default_watch_model = None  # Use default watch model
    return default_watch_model
|
||||
59
changedetectionio/tests/test_processor_registry.py
Normal file
59
changedetectionio/tests/test_processor_registry.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import pytest
|
||||
from changedetectionio.processors.processor_registry import get_processor_class, get_all_processors
|
||||
|
||||
|
||||
def test_get_all_processors():
    """Test that get_all_processors returns a non-empty list of (name, description) tuples."""
    processors = get_all_processors()
    assert isinstance(processors, list)
    assert len(processors) > 0

    # Each item should be a tuple of (name, description)
    for processor in processors:
        assert isinstance(processor, tuple)
        assert len(processor) == 2
        assert isinstance(processor[0], str)
        assert isinstance(processor[1], str)

    # Check that our WHOIS processor is included
    whois_processor = next((p for p in processors if p[0] == "whois"), None)
    assert whois_processor is not None
    # Expected text matches what the plugin's get_processor_description hook returns.
    # NOTE(review): confirm the registry takes the description from that hook.
    assert whois_processor[1] == "WHOIS Domain Information Changes Detector"
|
||||
|
||||
|
||||
def test_get_processor_class():
    """Known processor names resolve to a class; unknown names resolve to None."""
    # The WHOIS processor must be registered and expose perform_site_check
    whois_class = get_processor_class("whois")
    assert whois_class is not None
    assert hasattr(whois_class, 'perform_site_check')

    # A name nobody registered yields None
    assert get_processor_class("non_existent_processor") is None
|
||||
|
||||
|
||||
def test_get_processor_site_check():
    """get_processor_site_check builds a processor instance, or None for unknown names."""
    from unittest.mock import MagicMock
    from changedetectionio.processors.processor_registry import get_processor_site_check

    mock_datastore = MagicMock()
    watch_uuid = "test-uuid"

    # A WHOIS processor instance must come back with the processor API present
    processor = get_processor_site_check("whois", mock_datastore, watch_uuid)
    assert processor is not None
    for required_method in ('run_changedetection', 'call_browser'):
        assert hasattr(processor, required_method)

    # Unknown processor names yield None
    missing = get_processor_site_check("non_existent_processor", mock_datastore, watch_uuid)
    assert missing is None
|
||||
182
changedetectionio/tests/test_whois_processor.py
Normal file
182
changedetectionio/tests/test_whois_processor.py
Normal file
@@ -0,0 +1,182 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from changedetectionio.processors.whois_plugin import WhoisProcessor
|
||||
|
||||
|
||||
class MockWatch:
    """Minimal stand-in for a watch: a URL, a few settings and an empty history."""

    def __init__(self, url, previous_md5=None, include_filters=None, ignore_text=None):
        self.url = url
        self._previous_md5 = previous_md5
        # Falsy filter arguments are replaced by fresh empty lists
        self._include_filters = include_filters if include_filters else []
        self._ignore_text = ignore_text if ignore_text else []
        self.history = {}

    def get(self, key, default=None):
        """Dict-style lookup for the known keys; ``default`` for anything else."""
        known_values = {
            'previous_md5': self._previous_md5,
            'include_filters': self._include_filters,
            'ignore_text': self._ignore_text,
            'url': self.url,
        }
        if key in known_values:
            return known_values[key]
        return default

    def has_special_diff_filter_options_set(self):
        """This mock never has special diff filter options."""
        return False
|
||||
|
||||
|
||||
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
@patch('changedetectionio.processors.text_json_diff.processor.perform_site_check.run_changedetection')
def test_whois_processor_basic_functionality(mock_super_run, mock_base_init, mock_whois):
    """call_browser queries WHOIS for the bare domain; run_changedetection defers to the parent."""
    # Patches are injected bottom-up: parent run_changedetection, base __init__, whois.whois.
    # Mocking the base __init__ avoids setting up the full watch structure.
    mock_base_init.return_value = None
    mock_super_run.return_value = (False, {'previous_md5': 'some-md5'}, b'Some filtered text')

    # Canned WHOIS answer exposing the raw .text attribute
    whois_record = MagicMock()
    whois_record.text = "Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
    mock_whois.return_value = whois_record

    # Datastore without proxies and with file:// access disabled
    mock_datastore = MagicMock()
    mock_datastore.proxy_list = None  # No proxies
    mock_datastore.get_preferred_proxy_for_watch.return_value = None
    mock_datastore.data = {'settings': {'application': {'allow_file_uri': False}}}

    processor = WhoisProcessor(datastore=mock_datastore, watch_uuid='test-uuid')

    # Minimal watch object handed to run_changedetection below
    watch = MockWatch(url="https://example.com")

    # call_browser reads the URL from processor.watch.link
    processor.watch = MagicMock()
    processor.watch.link = "https://example.com"
    processor.watch.get.return_value = "uuid-123"

    processor.call_browser()

    # The fetcher must exist and carry the result attributes
    assert processor.fetcher is not None
    for attribute in ('content', 'headers', 'status_code'):
        assert hasattr(processor.fetcher, attribute)

    # whois must have been queried with the bare domain
    assert mock_whois.called
    assert mock_whois.call_args[0][0] == 'example.com'

    # Running change detection must reach the (mocked) parent implementation
    processor.run_changedetection(watch)
    assert mock_super_run.called
|
||||
|
||||
|
||||
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_call_browser_with_proxy(mock_base_init, mock_whois):
    """call_browser still performs the lookup when the datastore offers a proxy."""
    # Mock the base class init
    mock_base_init.return_value = None

    # Canned WHOIS answer
    whois_record = MagicMock()
    whois_record.text = "Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
    mock_whois.return_value = whois_record

    # Datastore exposing a single proxy that is also the preferred one
    proxy_list = {
        'test-proxy': {
            'url': 'http://proxy.example.com:8080',
            'label': 'Test Proxy',
        }
    }
    mock_datastore = MagicMock()
    mock_datastore.proxy_list = proxy_list
    mock_datastore.get_preferred_proxy_for_watch.return_value = 'test-proxy'
    mock_datastore.data = {'settings': {'application': {'allow_file_uri': False}}}

    processor = WhoisProcessor(datastore=mock_datastore, watch_uuid='test-uuid')

    # Set up watch
    processor.watch = MagicMock()
    processor.watch.link = "https://example.com"
    processor.watch.get.return_value = "uuid-123"

    processor.call_browser()

    # Verify whois was called with the bare domain
    assert mock_whois.called
    assert mock_whois.call_args[0][0] == 'example.com'

    # Check that the fetcher was set up and holds content
    assert processor.fetcher is not None
    assert processor.fetcher.content is not None
|
||||
|
||||
|
||||
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_perform_site_check(mock_base_init):
    """WhoisProcessor.perform_site_check builds a WhoisProcessor from its arguments."""
    mock_base_init.return_value = None

    datastore = MagicMock()
    watch_uuid = "test-uuid"

    # Replace the constructor so only the factory's call is observed
    with patch.object(WhoisProcessor, '__init__', return_value=None) as mock_init:
        processor = WhoisProcessor.perform_site_check(datastore=datastore, watch_uuid=watch_uuid)

        # The constructor must have received exactly the factory arguments
        mock_init.assert_called_once_with(datastore=datastore, watch_uuid=watch_uuid)

        # And the factory must return an instance of the processor class
        assert isinstance(processor, WhoisProcessor)
|
||||
|
||||
|
||||
def test_get_display_link():
    """Test the get_display_link hook implementation."""
    from changedetectionio.processors.whois_plugin import get_display_link

    # (url, processor_name, expected link) in the order the cases are checked
    cases = (
        # Regular URL: path and query are dropped
        ("https://example.com/some/path?param=value", "whois", "WHOIS - example.com"),
        # Subdomains are kept
        ("https://subdomain.example.com/", "whois", "WHOIS - subdomain.example.com"),
        # www prefix is removed
        ("https://www.example.com/", "whois", "WHOIS - example.com"),
    )
    for url, processor_name, expected in cases:
        link = get_display_link(url=url, processor_name=processor_name)
        assert link == expected

    # A different processor gets no customised link
    link = get_display_link(url="https://example.com/", processor_name="text_json_diff")
    assert link is None
|
||||
@@ -61,5 +61,22 @@ class TestDiffBuilder(unittest.TestCase):
|
||||
p = watch.get_from_version_based_on_last_viewed
|
||||
assert p == "100", "Correct with only one history snapshot"
|
||||
|
||||
def test_watch_link_property_with_processor(self):
    """The watch's link property uses the display link supplied by the processor registry."""
    from unittest.mock import patch

    watch = Watch.model(datastore_path='/tmp', default={})
    watch['url'] = 'https://example.com'
    watch['processor'] = 'whois'

    # Stand in for the registry's get_display_link with a canned answer
    registry_target = 'changedetectionio.processors.processor_registry.get_display_link'
    with patch(registry_target) as mock_get_display_link:
        mock_get_display_link.return_value = "WHOIS - example.com"

        # The link property should return the customised link ...
        assert watch.link == "WHOIS - example.com"
        # ... after asking the registry once, with this watch's URL and processor
        mock_get_display_link.assert_called_once_with(url='https://example.com', processor_name='whois')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -271,19 +271,38 @@ class update_worker(threading.Thread):
|
||||
|
||||
try:
|
||||
# Processor is what we are using for detecting the "Change"
|
||||
processor = watch.get('processor', 'text_json_diff')
|
||||
processor_name = watch.get('processor', 'text_json_diff')
|
||||
|
||||
# Init a new 'difference_detection_processor', first look in processors
|
||||
processor_module_name = f"changedetectionio.processors.{processor}.processor"
|
||||
|
||||
|
||||
# First, try to get the processor from our plugin registry
|
||||
try:
|
||||
processor_module = importlib.import_module(processor_module_name)
|
||||
except ModuleNotFoundError as e:
|
||||
print(f"Processor module '{processor}' not found.")
|
||||
raise e
|
||||
|
||||
update_handler = processor_module.perform_site_check(datastore=self.datastore,
|
||||
watch_uuid=uuid
|
||||
)
|
||||
from changedetectionio.processors.processor_registry import get_processor_site_check
|
||||
update_handler = get_processor_site_check(processor_name, self.datastore, uuid)
|
||||
|
||||
if update_handler:
|
||||
# We found the processor in our plugin registry
|
||||
logger.info(f"Using processor '{processor_name}' from plugin registry")
|
||||
else:
|
||||
# Fall back to the traditional file-based approach
|
||||
processor_module_name = f"changedetectionio.processors.{processor_name}.processor"
|
||||
try:
|
||||
processor_module = importlib.import_module(processor_module_name)
|
||||
update_handler = processor_module.perform_site_check(datastore=self.datastore,
|
||||
watch_uuid=uuid)
|
||||
except ModuleNotFoundError as e:
|
||||
print(f"Processor module '{processor_name}' not found in both plugin registry and file system.")
|
||||
raise e
|
||||
except ImportError as e:
|
||||
# If processor_registry.py cannot be imported, fall back to the traditional approach
|
||||
processor_module_name = f"changedetectionio.processors.{processor_name}.processor"
|
||||
try:
|
||||
processor_module = importlib.import_module(processor_module_name)
|
||||
update_handler = processor_module.perform_site_check(datastore=self.datastore,
|
||||
watch_uuid=uuid)
|
||||
except ModuleNotFoundError as e:
|
||||
print(f"Processor module '{processor_name}' not found.")
|
||||
raise e
|
||||
|
||||
update_handler.call_browser()
|
||||
|
||||
|
||||
31
test_processor_registration.py
Normal file
31
test_processor_registration.py
Normal file
@@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from changedetectionio.processors import available_processors
|
||||
from changedetectionio.processors.processor_registry import get_processor_class, get_processor_form
|
||||
|
||||
# Manual check: list the registered processors and inspect the WHOIS one.
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print(f"Processor: {name} - {description}")

# Check if our WHOIS processor is registered.
# The plugin's get_processor_name hook returns "whois"; looking for
# "whois_processor" could never match.
whois_processor_name = "whois"
whois_found = any(name == whois_processor_name for name, _ in processors)

if whois_found:
    print("\nWHOIS Processor found! Getting processor class and form...")

    # Get the processor class
    processor_class = get_processor_class(whois_processor_name)
    print(f"Processor class: {processor_class}")
    print(f"Processor class name: {processor_class.__name__ if processor_class else None}")
    print(f"Processor class module: {processor_class.__module__ if processor_class else None}")

    # Get the processor form
    processor_form = get_processor_form(whois_processor_name)
    print(f"Processor form: {processor_form}")

    print("\nWHOIS Processor successfully registered")
else:
    print("\nWHOIS Processor not found in available processors")
|
||||
16
test_processors.py
Executable file
16
test_processors.py
Executable file
@@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from changedetectionio.processors import available_processors
|
||||
from changedetectionio.processors import find_processors
|
||||
|
||||
# Manual check, part 1: processors found by find_processors()
print("=== Traditional Processor Discovery ===")
for processor_module, processor_name in find_processors():
    print(f"Found processor: {processor_name} in {processor_module.__name__}")

# Manual check, part 2: everything available_processors() reports (traditional + pluggy)
print("\n=== Combined Processor Discovery ===")
for processor_name, processor_description in available_processors():
    print(f"Processor: {processor_name} - {processor_description}")
|
||||
53
test_whois_extraction.py
Normal file
53
test_whois_extraction.py
Normal file
@@ -0,0 +1,53 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import urllib.parse
|
||||
import re
|
||||
import sys
|
||||
|
||||
def extract_domain_from_url(url):
    """Extract the domain from a URL, without a leading ``www.``.

    ``hostname`` is used instead of ``netloc`` so that a port or credentials
    (``user:pw@example.com:8080``) are not returned as part of the domain;
    the result is lower-cased by ``urllib``. Returns an empty string when the
    URL has no host part.
    """
    parsed_url = urllib.parse.urlparse(url)
    domain = parsed_url.hostname or ''

    # Remove www. prefix if present
    return re.sub(r'^www\.', '', domain)
|
||||
|
||||
# Manual check 1: print the domain extracted from a handful of sample URLs
test_urls = [
    "https://changedetection.io",
    "http://www.example.com/page",
    "https://subdomain.domain.co.uk/path?query=1",
    "ftp://ftp.example.org",
    "https://www.changedetection.io/page/subpage",
]

print("=== Domain Extraction Test ===")
for sample_url in test_urls:
    print(f"URL: {sample_url} -> Domain: {extract_domain_from_url(sample_url)}")

# Manual check 2: live WHOIS lookup for changedetection.io (needs network access)
try:
    import whois

    domain = extract_domain_from_url("https://changedetection.io")
    print(f"\n=== WHOIS lookup for {domain} ===")

    whois_info = whois.whois(domain)

    # Print key information; fields missing from the record print as empty
    key_fields = (
        ("Domain Name", 'domain_name'),
        ("Registrar", 'registrar'),
        ("Creation Date", 'creation_date'),
        ("Expiration Date", 'expiration_date'),
    )
    for label, field in key_fields:
        print(f"{label}: {whois_info.get(field, '')}")

    print("\nWHOIS lookup successful!")

except ImportError:
    print("python-whois module not installed. Run: pip install python-whois")
    sys.exit(1)
except Exception as lookup_error:
    print(f"Error performing WHOIS lookup: {str(lookup_error)}")
    sys.exit(1)
|
||||
47
test_whois_processor.py
Normal file
47
test_whois_processor.py
Normal file
@@ -0,0 +1,47 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from changedetectionio.processors import available_processors
|
||||
from changedetectionio.processors.processor_registry import get_processor_class
|
||||
import urllib.parse
|
||||
import sys
|
||||
|
||||
# First, verify our processor is available
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print(f"Processor: {name} - {description}")

# Get the WHOIS processor class.
# The plugin's get_processor_name hook returns "whois"; looking up
# "whois_processor" made this script always exit with an error.
whois_processor_class = get_processor_class("whois")
if not whois_processor_class:
    print("ERROR: WHOIS processor not found in available processors.")
    sys.exit(1)

print(f"\nFound WHOIS processor class: {whois_processor_class}")

# Run a live WHOIS lookup directly (needs network access)
try:
    # Parse a domain from a URL
    url = "https://changedetection.io"
    parsed_url = urllib.parse.urlparse(url)
    domain = parsed_url.netloc

    # Import whois and fetch information
    import whois
    whois_info = whois.whois(domain)

    print(f"\n=== WHOIS Information for {domain} ===")

    # Print the raw text when available, otherwise the non-empty fields
    if hasattr(whois_info, 'text'):
        print(whois_info.text)
    else:
        for key, value in whois_info.items():
            if value:
                print(f"{key}: {value}")

    print("\nSuccessfully retrieved WHOIS data!")

except Exception as e:
    print(f"Error fetching WHOIS data: {str(e)}")
    sys.exit(1)
|
||||
136
test_whois_processor_full.py
Normal file
136
test_whois_processor_full.py
Normal file
@@ -0,0 +1,136 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from changedetectionio.processors import available_processors
|
||||
from changedetectionio.processors.processor_registry import get_processor_class
|
||||
import unittest
|
||||
import sys
|
||||
from unittest.mock import MagicMock, patch
|
||||
import urllib.parse
|
||||
|
||||
# First, verify our processor is available
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print(f"Processor: {name} - {description}")

# Get the WHOIS processor class.
# The plugin's get_processor_name hook returns "whois"; looking up
# "whois_processor" made this script exit before any test could run.
whois_processor_class = get_processor_class("whois")
if not whois_processor_class:
    print("ERROR: WHOIS processor not found in available processors.")
    sys.exit(1)

print(f"\nFound WHOIS processor class: {whois_processor_class}")
|
||||
|
||||
# Create a test for our WHOIS processor
|
||||
class TestWhoisProcessor(unittest.TestCase):
    """Exercise the registered WHOIS processor class against mocked collaborators."""

    # Use the real whois function - tests will actually make network requests
    def test_whois_processor_real(self):
        """Run change detection for changedetection.io and inspect the returned content."""
        test_url = "https://changedetection.io"
        domain = urllib.parse.urlparse(test_url).netloc

        # Minimal mock datastore: one watch plus the settings keys listed below
        mock_datastore = MagicMock()
        mock_datastore.data = {
            'watching': {'test-uuid': {'url': test_url}},
            'settings': {
                'application': {'empty_pages_are_a_change': False},
                'requests': {'timeout': 30},
            },
        }
        mock_datastore.get_all_base_headers.return_value = {}
        mock_datastore.get_all_headers_in_textfile_for_watch.return_value = {}
        mock_datastore.get_preferred_proxy_for_watch.return_value = None
        mock_datastore.get_tag_overrides_for_watch.return_value = []

        # Minimal mock watch that mimics the real Watch class
        class MockWatch:
            def __init__(self, url):
                self.link = url
                self.is_pdf = False
                self.has_browser_steps = False
                self.is_source_type_url = False
                self.history = {}
                self.history_n = 0
                self.last_viewed = 0
                self.newest_history_key = 0

            def get(self, key, default=None):
                # Built per call so callers never share the mutable values
                known_values = {
                    'uuid': 'test-uuid',
                    'include_filters': [],
                    'body': None,
                    'method': 'GET',
                    'headers': {},
                    'browser_steps': [],
                }
                if key in known_values:
                    return known_values[key]
                return default

            def __getitem__(self, key):
                return self.get(key)

            def get_last_fetched_text_before_filters(self):
                return ""

            def save_last_text_fetched_before_filters(self, content):
                pass

            def has_special_diff_filter_options_set(self):
                return False

            def lines_contain_something_unique_compared_to_history(self, lines, ignore_whitespace):
                return True

        mock_watch = MockWatch(test_url)

        # A more complete mock fetcher
        class MockFetcher:
            def __init__(self):
                self.content = ""
                self.raw_content = b""
                self.headers = {'Content-Type': 'text/plain'}
                self.screenshot = None
                self.xpath_data = None
                self.instock_data = None
                self.browser_steps = []

            def get_last_status_code(self):
                return 200

            def get_all_headers(self):
                return {'content-type': 'text/plain'}

            def quit(self):
                pass

            def run(self, **kwargs):
                pass

        # Create the processor and set the mock fetcher
        processor = whois_processor_class(datastore=mock_datastore, watch_uuid='test-uuid')
        processor.fetcher = MockFetcher()

        # Run the processor - this will make an actual WHOIS request
        changed, update_obj, content = processor.run_changedetection(mock_watch)

        # Print the content for debugging
        content_str = content.decode('utf-8')
        print("\n=== WHOIS Content from processor (first 200 chars) ===")
        print(content_str[:200] + "...")

        # Verify the content contains domain information
        for expected_fragment in (domain, "Domain Name", "Creation Date"):
            self.assertIn(expected_fragment, content_str)

        print("\nWHOIS processor test with real data PASSED!")
|
||||
|
||||
# Run the test
|
||||
if __name__ == "__main__":
|
||||
unittest.main(argv=['first-arg-is-ignored'], exit=False)
|
||||
39
test_whois_simple.py
Normal file
39
test_whois_simple.py
Normal file
@@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import urllib.parse
|
||||
import re
|
||||
import whois
|
||||
|
||||
# Manual check: live WHOIS lookup for changedetection.io (needs network access)
url = "https://changedetection.io"

# Extract domain from URL and remove www. prefix if present
parsed_url = urllib.parse.urlparse(url)
domain = re.sub(r'^www\.', '', parsed_url.netloc)

# Fetch WHOIS information
print(f"Looking up WHOIS data for domain: {domain}")
whois_info = whois.whois(domain)

# Print key WHOIS data; fields missing from the record print as 'Unknown'
print("\nKey WHOIS information:")
key_fields = (
    ("Domain Name", 'domain_name'),
    ("Registrar", 'registrar'),
    ("Creation Date", 'creation_date'),
    ("Expiration Date", 'expiration_date'),
    ("Updated Date", 'updated_date'),
)
for label, field in key_fields:
    print(f"{label}: {whois_info.get(field, 'Unknown')}")

# Format as text: a header followed by one "key: value" line per non-empty field
whois_text = f"WHOIS Information for domain: {domain}\n\n" + "".join(
    f"{key}: {value}\n" for key, value in whois_info.items() if value
)

# Print the first 200 characters
print("\nFormatted WHOIS data (first 200 chars):")
print(whois_text[:200] + "...")

print("\nWHOIS lookup successful!")
|
||||
Reference in New Issue
Block a user