mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-17 23:16:10 +00:00
Compare commits
41 Commits
email-noti
...
conditions
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79166c0307 | ||
|
|
9dbe91e470 | ||
|
|
51bd8cd2d7 | ||
|
|
35455e7dd6 | ||
|
|
aaa038f082 | ||
|
|
57eeb221cb | ||
|
|
8187b9ce4c | ||
|
|
cc70b65bfa | ||
|
|
42099f1fff | ||
|
|
408864d346 | ||
|
|
02b8660bf3 | ||
|
|
947a60af89 | ||
|
|
a0f4cb4d65 | ||
|
|
71ea8d80f3 | ||
|
|
4f48958187 | ||
|
|
2608980b1d | ||
|
|
c982395d72 | ||
|
|
ee7e43ea87 | ||
|
|
da5585b53c | ||
|
|
76062c9419 | ||
|
|
675953797c | ||
|
|
b202652a93 | ||
|
|
617dc721bf | ||
|
|
ec13720694 | ||
|
|
ddacb0bcbc | ||
|
|
f67d98b839 | ||
|
|
beee93d528 | ||
|
|
987ab3e494 | ||
|
|
0c68cfffb1 | ||
|
|
e93a9244fe | ||
|
|
e56eec41c1 | ||
|
|
31f4bb7cc3 | ||
|
|
f08efde110 | ||
|
|
9b39b2853b | ||
|
|
892d38ba42 | ||
|
|
b170e191d4 | ||
|
|
edb78efcca | ||
|
|
383f90b70c | ||
|
|
6948418865 | ||
|
|
cd80e317f3 | ||
|
|
8c26210804 |
@@ -712,12 +712,52 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
# Does it use some custom form? does one exist?
|
# Does it use some custom form? does one exist?
|
||||||
processor_name = datastore.data['watching'][uuid].get('processor', '')
|
processor_name = datastore.data['watching'][uuid].get('processor', '')
|
||||||
processor_classes = next((tpl for tpl in find_processors() if tpl[1] == processor_name), None)
|
processor_classes = next((tpl for tpl in find_processors() if tpl[1] == processor_name), None)
|
||||||
|
|
||||||
|
# If it's not found in traditional processors, check if it's a pluggy plugin
|
||||||
if not processor_classes:
|
if not processor_classes:
|
||||||
flash(f"Cannot load the edit form for processor/plugin '{processor_classes[1]}', plugin missing?", 'error')
|
try:
|
||||||
|
from changedetectionio.processors.processor_registry import get_processor_form, _get_plugin_name_map
|
||||||
|
|
||||||
|
# Get all available plugins for debugging
|
||||||
|
available_plugins = list(_get_plugin_name_map().keys())
|
||||||
|
logger.debug(f"Available processor plugins: {available_plugins}")
|
||||||
|
|
||||||
|
# Try to get the processor form
|
||||||
|
plugin_form_class = get_processor_form(processor_name)
|
||||||
|
|
||||||
|
if plugin_form_class:
|
||||||
|
# Use default text_json_diff_form as parent module for plugins
|
||||||
|
from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor
|
||||||
|
form_class = forms.processor_text_json_diff_form
|
||||||
|
parent_module = get_parent_module(text_json_diff_processor)
|
||||||
|
|
||||||
|
# Skip the normal form loading code path
|
||||||
|
use_plugin_form = True
|
||||||
|
logger.debug(f"Successfully loaded form for plugin '{processor_name}'")
|
||||||
|
else:
|
||||||
|
# Check if the plugin is registered but doesn't have a form
|
||||||
|
if processor_name in available_plugins:
|
||||||
|
logger.error(f"Plugin '{processor_name}' is registered but has no form class")
|
||||||
|
flash(f"Plugin '{processor_name}' is registered but has no form class", 'error')
|
||||||
|
else:
|
||||||
|
logger.error(f"Cannot find plugin '{processor_name}'. Available plugins: {available_plugins}")
|
||||||
|
flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin missing?", 'error')
|
||||||
return redirect(url_for('index'))
|
return redirect(url_for('index'))
|
||||||
|
except ImportError as e:
|
||||||
|
logger.error(f"Import error when loading plugin form: {str(e)}")
|
||||||
|
flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin system not available?", 'error')
|
||||||
|
return redirect(url_for('index'))
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Unexpected error loading plugin form: {str(e)}")
|
||||||
|
flash(f"Error loading plugin form: {str(e)}", 'error')
|
||||||
|
return redirect(url_for('index'))
|
||||||
|
else:
|
||||||
|
# Traditional processor - continue with normal flow
|
||||||
parent_module = get_parent_module(processor_classes[0])
|
parent_module = get_parent_module(processor_classes[0])
|
||||||
|
use_plugin_form = False
|
||||||
|
|
||||||
|
# Only follow this path for traditional processors
|
||||||
|
if not use_plugin_form:
|
||||||
try:
|
try:
|
||||||
# Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code)
|
# Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code)
|
||||||
forms_module = importlib.import_module(f"{parent_module.__name__}.forms")
|
forms_module = importlib.import_module(f"{parent_module.__name__}.forms")
|
||||||
|
|||||||
@@ -67,7 +67,6 @@ class model(watch_base):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def link(self):
|
def link(self):
|
||||||
|
|
||||||
url = self.get('url', '')
|
url = self.get('url', '')
|
||||||
if not is_safe_url(url):
|
if not is_safe_url(url):
|
||||||
return 'DISABLED'
|
return 'DISABLED'
|
||||||
@@ -93,6 +92,19 @@ class model(watch_base):
|
|||||||
# Also double check it after any Jinja2 formatting just incase
|
# Also double check it after any Jinja2 formatting just incase
|
||||||
if not is_safe_url(ready_url):
|
if not is_safe_url(ready_url):
|
||||||
return 'DISABLED'
|
return 'DISABLED'
|
||||||
|
|
||||||
|
# Check if a processor wants to customize the display link
|
||||||
|
processor_name = self.get('processor')
|
||||||
|
if processor_name:
|
||||||
|
try:
|
||||||
|
# Import here to avoid circular imports
|
||||||
|
from changedetectionio.processors.processor_registry import get_display_link
|
||||||
|
custom_link = get_display_link(url=ready_url, processor_name=processor_name)
|
||||||
|
if custom_link:
|
||||||
|
return custom_link
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting custom display link for processor {processor_name}: {str(e)}")
|
||||||
|
|
||||||
return ready_url
|
return ready_url
|
||||||
|
|
||||||
def clear_watch(self):
|
def clear_watch(self):
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ from changedetectionio.content_fetchers.base import Fetcher
|
|||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import importlib
|
import importlib
|
||||||
import inspect
|
import inspect
|
||||||
@@ -10,6 +11,10 @@ import os
|
|||||||
import pkgutil
|
import pkgutil
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
# Import the plugin manager
|
||||||
|
from .pluggy_interface import plugin_manager
|
||||||
|
|
||||||
|
|
||||||
class difference_detection_processor():
|
class difference_detection_processor():
|
||||||
|
|
||||||
browser_steps = None
|
browser_steps = None
|
||||||
@@ -27,8 +32,94 @@ class difference_detection_processor():
|
|||||||
# Generic fetcher that should be extended (requests, playwright etc)
|
# Generic fetcher that should be extended (requests, playwright etc)
|
||||||
self.fetcher = Fetcher()
|
self.fetcher = Fetcher()
|
||||||
|
|
||||||
def call_browser(self, preferred_proxy_id=None):
|
def _get_proxy_for_watch(self, preferred_proxy_id=None):
|
||||||
|
"""Get proxy configuration based on watch settings and preferred proxy ID
|
||||||
|
|
||||||
|
Args:
|
||||||
|
preferred_proxy_id: Optional explicit proxy ID to use
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: Proxy configuration or None if no proxy should be used
|
||||||
|
str: Proxy URL or None if no proxy should be used
|
||||||
|
"""
|
||||||
|
# Default to no proxy config
|
||||||
|
proxy_config = None
|
||||||
|
proxy_url = None
|
||||||
|
|
||||||
|
# Check if datastore is available and has get_preferred_proxy_for_watch method
|
||||||
|
if hasattr(self, 'datastore') and self.datastore:
|
||||||
|
try:
|
||||||
|
# Get preferred proxy ID if not provided
|
||||||
|
if not preferred_proxy_id and hasattr(self.datastore, 'get_preferred_proxy_for_watch'):
|
||||||
|
# Get the watch UUID if available
|
||||||
|
watch_uuid = None
|
||||||
|
if hasattr(self.watch, 'get'):
|
||||||
|
watch_uuid = self.watch.get('uuid')
|
||||||
|
elif hasattr(self.watch, 'uuid'):
|
||||||
|
watch_uuid = self.watch.uuid
|
||||||
|
|
||||||
|
if watch_uuid:
|
||||||
|
preferred_proxy_id = self.datastore.get_preferred_proxy_for_watch(uuid=watch_uuid)
|
||||||
|
|
||||||
|
# Check if we have a proxy list and a valid proxy ID
|
||||||
|
if preferred_proxy_id and hasattr(self.datastore, 'proxy_list') and self.datastore.proxy_list:
|
||||||
|
proxy_info = self.datastore.proxy_list.get(preferred_proxy_id)
|
||||||
|
|
||||||
|
if proxy_info and 'url' in proxy_info:
|
||||||
|
proxy_url = proxy_info.get('url')
|
||||||
|
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}'")
|
||||||
|
|
||||||
|
# Parse the proxy URL to build a proxy dict for requests
|
||||||
|
import urllib.parse
|
||||||
|
parsed_proxy = urllib.parse.urlparse(proxy_url)
|
||||||
|
proxy_type = parsed_proxy.scheme
|
||||||
|
|
||||||
|
# Extract credentials if present
|
||||||
|
username = None
|
||||||
|
password = None
|
||||||
|
if parsed_proxy.username:
|
||||||
|
username = parsed_proxy.username
|
||||||
|
if parsed_proxy.password:
|
||||||
|
password = parsed_proxy.password
|
||||||
|
|
||||||
|
# Build the proxy URL without credentials for the proxy dict
|
||||||
|
netloc = parsed_proxy.netloc
|
||||||
|
if '@' in netloc:
|
||||||
|
netloc = netloc.split('@')[1]
|
||||||
|
|
||||||
|
proxy_addr = f"{proxy_type}://{netloc}"
|
||||||
|
|
||||||
|
# Create the proxy configuration
|
||||||
|
proxy_config = {
|
||||||
|
'http': proxy_addr,
|
||||||
|
'https': proxy_addr
|
||||||
|
}
|
||||||
|
|
||||||
|
# Add credentials if present
|
||||||
|
if username:
|
||||||
|
proxy_config['username'] = username
|
||||||
|
if password:
|
||||||
|
proxy_config['password'] = password
|
||||||
|
except Exception as e:
|
||||||
|
# Log the error but continue without a proxy
|
||||||
|
logger.error(f"Error setting up proxy: {str(e)}")
|
||||||
|
proxy_config = None
|
||||||
|
proxy_url = None
|
||||||
|
|
||||||
|
return proxy_config, proxy_url
|
||||||
|
|
||||||
|
def call_browser(self, preferred_proxy_id=None):
|
||||||
|
"""Fetch content using the appropriate browser/fetcher
|
||||||
|
|
||||||
|
This method will:
|
||||||
|
1. Determine the appropriate fetcher to use based on watch settings
|
||||||
|
2. Set up proxy configuration if needed
|
||||||
|
3. Initialize the fetcher with the correct parameters
|
||||||
|
4. Configure any browser steps if needed
|
||||||
|
|
||||||
|
Args:
|
||||||
|
preferred_proxy_id: Optional explicit proxy ID to use
|
||||||
|
"""
|
||||||
from requests.structures import CaseInsensitiveDict
|
from requests.structures import CaseInsensitiveDict
|
||||||
|
|
||||||
url = self.watch.link
|
url = self.watch.link
|
||||||
@@ -43,8 +134,8 @@ class difference_detection_processor():
|
|||||||
# Requests, playwright, other browser via wss:// etc, fetch_extra_something
|
# Requests, playwright, other browser via wss:// etc, fetch_extra_something
|
||||||
prefer_fetch_backend = self.watch.get('fetch_backend', 'system')
|
prefer_fetch_backend = self.watch.get('fetch_backend', 'system')
|
||||||
|
|
||||||
# Proxy ID "key"
|
# Get proxy configuration
|
||||||
preferred_proxy_id = preferred_proxy_id if preferred_proxy_id else self.datastore.get_preferred_proxy_for_watch(uuid=self.watch.get('uuid'))
|
proxy_config, proxy_url = self._get_proxy_for_watch(preferred_proxy_id)
|
||||||
|
|
||||||
# Pluggable content self.fetcher
|
# Pluggable content self.fetcher
|
||||||
if not prefer_fetch_backend or prefer_fetch_backend == 'system':
|
if not prefer_fetch_backend or prefer_fetch_backend == 'system':
|
||||||
@@ -82,14 +173,10 @@ class difference_detection_processor():
|
|||||||
# What it referenced doesnt exist, Just use a default
|
# What it referenced doesnt exist, Just use a default
|
||||||
fetcher_obj = getattr(content_fetchers, "html_requests")
|
fetcher_obj = getattr(content_fetchers, "html_requests")
|
||||||
|
|
||||||
proxy_url = None
|
|
||||||
if preferred_proxy_id:
|
|
||||||
# Custom browser endpoints should NOT have a proxy added
|
# Custom browser endpoints should NOT have a proxy added
|
||||||
if not prefer_fetch_backend.startswith('extra_browser_'):
|
if proxy_url and prefer_fetch_backend.startswith('extra_browser_'):
|
||||||
proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url')
|
logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified.")
|
||||||
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}")
|
proxy_url = None
|
||||||
else:
|
|
||||||
logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified. ")
|
|
||||||
|
|
||||||
# Now call the fetcher (playwright/requests/etc) with arguments that only a fetcher would need.
|
# Now call the fetcher (playwright/requests/etc) with arguments that only a fetcher would need.
|
||||||
# When browser_connection_url is None, it method should default to working out whats the best defaults (os env vars etc)
|
# When browser_connection_url is None, it method should default to working out whats the best defaults (os env vars etc)
|
||||||
@@ -185,9 +272,9 @@ def find_sub_packages(package_name):
|
|||||||
|
|
||||||
def find_processors():
|
def find_processors():
|
||||||
"""
|
"""
|
||||||
Find all subclasses of DifferenceDetectionProcessor in the specified package.
|
Find all subclasses of DifferenceDetectionProcessor in the specified package
|
||||||
|
and also include processors from the plugin system.
|
||||||
|
|
||||||
:param package_name: The name of the package to scan for processor modules.
|
|
||||||
:return: A list of (module, class) tuples.
|
:return: A list of (module, class) tuples.
|
||||||
"""
|
"""
|
||||||
package_name = "changedetectionio.processors" # Name of the current package/module
|
package_name = "changedetectionio.processors" # Name of the current package/module
|
||||||
@@ -195,6 +282,7 @@ def find_processors():
|
|||||||
processors = []
|
processors = []
|
||||||
sub_packages = find_sub_packages(package_name)
|
sub_packages = find_sub_packages(package_name)
|
||||||
|
|
||||||
|
# Find traditional processors
|
||||||
for sub_package in sub_packages:
|
for sub_package in sub_packages:
|
||||||
module_name = f"{package_name}.{sub_package}.processor"
|
module_name = f"{package_name}.{sub_package}.processor"
|
||||||
try:
|
try:
|
||||||
@@ -207,6 +295,15 @@ def find_processors():
|
|||||||
except (ModuleNotFoundError, ImportError) as e:
|
except (ModuleNotFoundError, ImportError) as e:
|
||||||
logger.warning(f"Failed to import module {module_name}: {e} (find_processors())")
|
logger.warning(f"Failed to import module {module_name}: {e} (find_processors())")
|
||||||
|
|
||||||
|
# Also include processors from the plugin system
|
||||||
|
try:
|
||||||
|
from .processor_registry import get_plugin_processor_modules
|
||||||
|
plugin_modules = get_plugin_processor_modules()
|
||||||
|
if plugin_modules:
|
||||||
|
processors.extend(plugin_modules)
|
||||||
|
except (ImportError, ModuleNotFoundError) as e:
|
||||||
|
logger.warning(f"Failed to import plugin modules: {e} (find_processors())")
|
||||||
|
|
||||||
return processors
|
return processors
|
||||||
|
|
||||||
|
|
||||||
@@ -223,8 +320,22 @@ def get_parent_module(module):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_custom_watch_obj_for_processor(processor_name):
|
def get_custom_watch_obj_for_processor(processor_name):
|
||||||
|
"""
|
||||||
|
Get the custom watch object for a processor
|
||||||
|
:param processor_name: Name of the processor
|
||||||
|
:return: Watch class or None
|
||||||
|
"""
|
||||||
|
# First, try to get the watch model from the pluggy system
|
||||||
|
try:
|
||||||
|
from .processor_registry import get_processor_watch_model
|
||||||
|
watch_model = get_processor_watch_model(processor_name)
|
||||||
|
if watch_model:
|
||||||
|
return watch_model
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error getting processor watch model from pluggy: {e}")
|
||||||
|
|
||||||
|
# Fall back to the traditional approach
|
||||||
from changedetectionio.model import Watch
|
from changedetectionio.model import Watch
|
||||||
watch_class = Watch.model
|
watch_class = Watch.model
|
||||||
processor_classes = find_processors()
|
processor_classes = find_processors()
|
||||||
@@ -241,14 +352,47 @@ def get_custom_watch_obj_for_processor(processor_name):
|
|||||||
def available_processors():
|
def available_processors():
|
||||||
"""
|
"""
|
||||||
Get a list of processors by name and description for the UI elements
|
Get a list of processors by name and description for the UI elements
|
||||||
:return: A list :)
|
:return: A list of tuples (processor_name, description)
|
||||||
"""
|
"""
|
||||||
|
# Get processors from the pluggy system
|
||||||
|
pluggy_processors = []
|
||||||
|
try:
|
||||||
|
from .processor_registry import get_all_processors
|
||||||
|
pluggy_processors = get_all_processors()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting processors from pluggy: {str(e)}")
|
||||||
|
|
||||||
processor_classes = find_processors()
|
# Get processors from the traditional file-based system
|
||||||
|
traditional_processors = []
|
||||||
|
try:
|
||||||
|
# Let's not use find_processors() directly since it now also includes pluggy processors
|
||||||
|
package_name = "changedetectionio.processors"
|
||||||
|
sub_packages = find_sub_packages(package_name)
|
||||||
|
|
||||||
available = []
|
for sub_package in sub_packages:
|
||||||
for package, processor_class in processor_classes:
|
module_name = f"{package_name}.{sub_package}.processor"
|
||||||
available.append((processor_class, package.name))
|
try:
|
||||||
|
module = importlib.import_module(module_name)
|
||||||
|
# Get the name and description from the module if available
|
||||||
|
name = getattr(module, 'name', f"Traditional processor: {sub_package}")
|
||||||
|
description = getattr(module, 'description', sub_package)
|
||||||
|
traditional_processors.append((sub_package, name))
|
||||||
|
except (ModuleNotFoundError, ImportError, AttributeError) as e:
|
||||||
|
logger.warning(f"Failed to import module {module_name}: {e} (available_processors())")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting traditional processors: {str(e)}")
|
||||||
|
|
||||||
return available
|
# Combine the lists, ensuring no duplicates
|
||||||
|
# Pluggy processors take precedence
|
||||||
|
all_processors = []
|
||||||
|
|
||||||
|
# Add all pluggy processors
|
||||||
|
all_processors.extend(pluggy_processors)
|
||||||
|
|
||||||
|
# Add traditional processors that aren't already registered via pluggy
|
||||||
|
pluggy_processor_names = [name for name, _ in pluggy_processors]
|
||||||
|
for processor_class, name in traditional_processors:
|
||||||
|
if processor_class not in pluggy_processor_names:
|
||||||
|
all_processors.append((processor_class, name))
|
||||||
|
|
||||||
|
return all_processors
|
||||||
17
changedetectionio/processors/form.py
Normal file
17
changedetectionio/processors/form.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
from wtforms import (
|
||||||
|
BooleanField,
|
||||||
|
validators,
|
||||||
|
RadioField
|
||||||
|
)
|
||||||
|
from wtforms.fields.choices import SelectField
|
||||||
|
from wtforms.fields.form import FormField
|
||||||
|
from wtforms.form import Form
|
||||||
|
|
||||||
|
class BaseProcessorForm(Form):
|
||||||
|
"""Base class for processor forms"""
|
||||||
|
|
||||||
|
def extra_tab_content(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def extra_form_content(self):
|
||||||
|
return None
|
||||||
4
changedetectionio/processors/forms.py
Normal file
4
changedetectionio/processors/forms.py
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
"""
|
||||||
|
Forms for processors
|
||||||
|
"""
|
||||||
|
from changedetectionio.forms import processor_text_json_diff_form
|
||||||
69
changedetectionio/processors/pluggy_interface.py
Normal file
69
changedetectionio/processors/pluggy_interface.py
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
import pluggy
|
||||||
|
|
||||||
|
# Define the plugin namespace for processors
|
||||||
|
PLUGIN_NAMESPACE = "changedetectionio_processors"
|
||||||
|
|
||||||
|
hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE)
|
||||||
|
hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE)
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessorSpec:
|
||||||
|
"""Hook specifications for processor plugins."""
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def get_processor_name():
|
||||||
|
"""Return the name of the processor."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def get_processor_description():
|
||||||
|
"""Return the description of the processor."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def get_processor_class():
|
||||||
|
"""Return the processor class."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def get_processor_form():
|
||||||
|
"""Return the processor form class."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def get_processor_watch_model():
|
||||||
|
"""Return the watch model class for this processor (if any)."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def get_display_link(url, processor_name):
|
||||||
|
"""Return a custom display link for the given processor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url: The original URL from the watch
|
||||||
|
processor_name: The name of the processor
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A string with the custom display link or None to use the default
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def perform_site_check(datastore, watch_uuid):
|
||||||
|
"""Create and return a processor instance ready to perform site check.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
datastore: The application datastore
|
||||||
|
watch_uuid: The UUID of the watch to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A processor instance ready to perform site check
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Set up the plugin manager
|
||||||
|
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
|
||||||
|
|
||||||
|
# Register hook specifications
|
||||||
|
plugin_manager.add_hookspecs(ProcessorSpec)
|
||||||
222
changedetectionio/processors/processor_registry.py
Normal file
222
changedetectionio/processors/processor_registry.py
Normal file
@@ -0,0 +1,222 @@
|
|||||||
|
from loguru import logger
|
||||||
|
from changedetectionio.model import Watch
|
||||||
|
from .pluggy_interface import plugin_manager
|
||||||
|
from typing import Dict, Any, List, Tuple, Optional, TypeVar, Type
|
||||||
|
import functools
|
||||||
|
|
||||||
|
# Import and register internal plugins
|
||||||
|
from . import whois_plugin
|
||||||
|
from . import test_plugin
|
||||||
|
|
||||||
|
# Register plugins
|
||||||
|
plugin_manager.register(whois_plugin)
|
||||||
|
plugin_manager.register(test_plugin)
|
||||||
|
|
||||||
|
# Load any setuptools entrypoints
|
||||||
|
plugin_manager.load_setuptools_entrypoints("changedetectionio_processors")
|
||||||
|
|
||||||
|
# Type definitions for better type hinting
|
||||||
|
T = TypeVar('T')
|
||||||
|
ProcessorClass = TypeVar('ProcessorClass')
|
||||||
|
ProcessorForm = TypeVar('ProcessorForm')
|
||||||
|
ProcessorWatchModel = TypeVar('ProcessorWatchModel')
|
||||||
|
ProcessorInstance = TypeVar('ProcessorInstance')
|
||||||
|
|
||||||
|
# Cache for plugin name mapping to improve performance
|
||||||
|
_plugin_name_map: Dict[str, Any] = {}
|
||||||
|
|
||||||
|
def register_plugin(plugin_module):
|
||||||
|
"""Register a processor plugin"""
|
||||||
|
plugin_manager.register(plugin_module)
|
||||||
|
# Clear the plugin name map cache when a new plugin is registered
|
||||||
|
global _plugin_name_map
|
||||||
|
_plugin_name_map = {}
|
||||||
|
|
||||||
|
def _get_plugin_name_map() -> Dict[str, Any]:
|
||||||
|
"""Get a mapping of processor names to plugins
|
||||||
|
:return: Dictionary mapping processor names to plugins
|
||||||
|
"""
|
||||||
|
global _plugin_name_map
|
||||||
|
|
||||||
|
# Return cached map if available
|
||||||
|
if _plugin_name_map:
|
||||||
|
return _plugin_name_map
|
||||||
|
|
||||||
|
# Build the map
|
||||||
|
result = {}
|
||||||
|
|
||||||
|
# Get all plugins from the plugin manager
|
||||||
|
all_plugins = list(plugin_manager.get_plugins())
|
||||||
|
|
||||||
|
# First register known internal plugins by name for reliability
|
||||||
|
known_plugins = {
|
||||||
|
'whois': whois_plugin,
|
||||||
|
'test': test_plugin
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, plugin in known_plugins.items():
|
||||||
|
if plugin in all_plugins:
|
||||||
|
result[name] = plugin
|
||||||
|
|
||||||
|
# Then process remaining plugins through the hook system
|
||||||
|
for plugin in all_plugins:
|
||||||
|
if plugin in known_plugins.values():
|
||||||
|
continue # Skip plugins we've already registered
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get the processor name from this plugin
|
||||||
|
name_results = plugin_manager.hook.get_processor_name(plugin=plugin)
|
||||||
|
|
||||||
|
if name_results:
|
||||||
|
plugin_name = name_results[0]
|
||||||
|
|
||||||
|
# Check for name collisions
|
||||||
|
if plugin_name in result:
|
||||||
|
logger.warning(f"Plugin name collision: '{plugin_name}' is already registered")
|
||||||
|
continue
|
||||||
|
|
||||||
|
result[plugin_name] = plugin
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting processor name from plugin: {str(e)}")
|
||||||
|
|
||||||
|
# Cache the map
|
||||||
|
_plugin_name_map = result
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _get_plugin_by_name(processor_name: str) -> Optional[Any]:
|
||||||
|
"""Get a plugin by its processor name
|
||||||
|
:param processor_name: Name of the processor
|
||||||
|
:return: Plugin object or None
|
||||||
|
"""
|
||||||
|
return _get_plugin_name_map().get(processor_name)
|
||||||
|
|
||||||
|
def _call_hook_for_plugin(plugin: Any, hook_name: str, default_value: T = None, **kwargs) -> Optional[T]:
|
||||||
|
"""Call a hook for a specific plugin and handle exceptions
|
||||||
|
:param plugin: The plugin to call the hook for
|
||||||
|
:param hook_name: Name of the hook to call
|
||||||
|
:param default_value: Default value to return if the hook call fails
|
||||||
|
:param kwargs: Additional arguments to pass to the hook
|
||||||
|
:return: Result of the hook call or default value
|
||||||
|
"""
|
||||||
|
if not plugin:
|
||||||
|
return default_value
|
||||||
|
|
||||||
|
try:
|
||||||
|
hook = getattr(plugin_manager.hook, hook_name)
|
||||||
|
results = hook(plugin=plugin, **kwargs)
|
||||||
|
|
||||||
|
if results:
|
||||||
|
return results[0]
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error calling {hook_name} for plugin: {str(e)}")
|
||||||
|
|
||||||
|
return default_value
|
||||||
|
|
||||||
|
def get_all_processors() -> List[Tuple[str, str]]:
|
||||||
|
"""Get all processors
|
||||||
|
:return: List of tuples (processor_name, processor_description)
|
||||||
|
"""
|
||||||
|
processors = []
|
||||||
|
|
||||||
|
for processor_name, plugin in _get_plugin_name_map().items():
|
||||||
|
description = _call_hook_for_plugin(plugin, 'get_processor_description')
|
||||||
|
if description:
|
||||||
|
processors.append((processor_name, description))
|
||||||
|
|
||||||
|
return processors
|
||||||
|
|
||||||
|
def get_processor_class(processor_name: str) -> Optional[Type[ProcessorClass]]:
|
||||||
|
"""Get processor class by name
|
||||||
|
:param processor_name: Name of the processor
|
||||||
|
:return: Processor class or None
|
||||||
|
"""
|
||||||
|
plugin = _get_plugin_by_name(processor_name)
|
||||||
|
return _call_hook_for_plugin(plugin, 'get_processor_class')
|
||||||
|
|
||||||
|
def get_processor_form(processor_name: str) -> Optional[Type[ProcessorForm]]:
|
||||||
|
"""Get processor form by name
|
||||||
|
:param processor_name: Name of the processor
|
||||||
|
:return: Processor form class or None
|
||||||
|
"""
|
||||||
|
plugin = _get_plugin_by_name(processor_name)
|
||||||
|
return _call_hook_for_plugin(plugin, 'get_processor_form')
|
||||||
|
|
||||||
|
def get_processor_watch_model(processor_name: str) -> Type[ProcessorWatchModel]:
|
||||||
|
"""Get processor watch model by name
|
||||||
|
:param processor_name: Name of the processor
|
||||||
|
:return: Watch model class or default Watch model
|
||||||
|
"""
|
||||||
|
plugin = _get_plugin_by_name(processor_name)
|
||||||
|
return _call_hook_for_plugin(plugin, 'get_processor_watch_model', default_value=Watch.model)
|
||||||
|
|
||||||
|
def get_processor_site_check(processor_name: str, datastore: Any, watch_uuid: str) -> Optional[ProcessorInstance]:
|
||||||
|
"""Get a processor instance ready to perform site check
|
||||||
|
:param processor_name: Name of the processor
|
||||||
|
:param datastore: The application datastore
|
||||||
|
:param watch_uuid: The UUID of the watch to check
|
||||||
|
:return: A processor instance ready to perform site check, or None
|
||||||
|
"""
|
||||||
|
plugin = _get_plugin_by_name(processor_name)
|
||||||
|
if not plugin:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Try to get the perform_site_check implementation
|
||||||
|
try:
|
||||||
|
processor = _call_hook_for_plugin(
|
||||||
|
plugin,
|
||||||
|
'perform_site_check',
|
||||||
|
datastore=datastore,
|
||||||
|
watch_uuid=watch_uuid
|
||||||
|
)
|
||||||
|
if processor:
|
||||||
|
return processor
|
||||||
|
|
||||||
|
# If no perform_site_check hook implementation, try getting the class and instantiating it
|
||||||
|
processor_class = _call_hook_for_plugin(plugin, 'get_processor_class')
|
||||||
|
if processor_class:
|
||||||
|
return processor_class(datastore=datastore, watch_uuid=watch_uuid)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting processor site check for {processor_name}: {str(e)}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_display_link(url: str, processor_name: str) -> Optional[str]:
|
||||||
|
"""Get a custom display link for the given processor
|
||||||
|
:param url: The original URL from the watch
|
||||||
|
:param processor_name: Name of the processor
|
||||||
|
:return: A string with the custom display link or None to use the default
|
||||||
|
"""
|
||||||
|
plugin = _get_plugin_by_name(processor_name)
|
||||||
|
return _call_hook_for_plugin(
|
||||||
|
plugin,
|
||||||
|
'get_display_link',
|
||||||
|
url=url,
|
||||||
|
processor_name=processor_name
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_plugin_processor_modules() -> List[Tuple[Any, str]]:
|
||||||
|
"""Get processor modules for all plugins that can be used with the find_processors function
|
||||||
|
|
||||||
|
This function adapts pluggy plugins to be compatible with the traditional find_processors system
|
||||||
|
|
||||||
|
:return: A list of (module, processor_name) tuples
|
||||||
|
"""
|
||||||
|
result = []
|
||||||
|
|
||||||
|
# Import base modules once to avoid repeated imports
|
||||||
|
from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor
|
||||||
|
|
||||||
|
# For each plugin, map to a suitable module for find_processors
|
||||||
|
for processor_name, plugin in _get_plugin_name_map().items():
|
||||||
|
try:
|
||||||
|
processor_class = _call_hook_for_plugin(plugin, 'get_processor_class')
|
||||||
|
|
||||||
|
if processor_class:
|
||||||
|
# Check if this processor extends the text_json_diff processor
|
||||||
|
base_class_name = str(processor_class.__bases__[0].__name__)
|
||||||
|
if base_class_name == 'perform_site_check' or 'TextJsonDiffProcessor' in base_class_name:
|
||||||
|
result.append((text_json_diff_processor, processor_name))
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error mapping processor module for {processor_name}: {str(e)}")
|
||||||
|
|
||||||
|
return result
|
||||||
169
changedetectionio/processors/whois_plugin.py
Normal file
169
changedetectionio/processors/whois_plugin.py
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
from loguru import logger
|
||||||
|
import re
|
||||||
|
import urllib.parse
|
||||||
|
from .pluggy_interface import hookimpl
|
||||||
|
from requests.structures import CaseInsensitiveDict
|
||||||
|
from changedetectionio.content_fetchers.base import Fetcher
|
||||||
|
|
||||||
|
# Import the text_json_diff processor
|
||||||
|
from changedetectionio.processors.text_json_diff.processor import perform_site_check as TextJsonDiffProcessor
|
||||||
|
|
||||||
|
# WHOIS Processor implementation that extends TextJsonDiffProcessor
|
||||||
|
class WhoisProcessor(TextJsonDiffProcessor):
    """Processor that watches the WHOIS record of a watch's domain.

    ``call_browser`` is overridden to run a WHOIS lookup and store the result
    on ``self.fetcher``; the change detection itself is delegated to the
    parent class via ``run_changedetection``.
    """

    def _extract_domain_from_url(self, url):
        """Extract the domain from a URL, removing a leading ``www.``.

        ``hostname`` is used instead of ``netloc`` so that credentials and a
        port (``user:pw@example.com:8080``) are never passed to the WHOIS
        lookup. ``hostname`` is lower-cased by urllib.

        :param url: URL taken from the watch
        :return: The bare host name, or '' when the URL has no host
        """
        host = urllib.parse.urlparse(url).hostname or ''

        # Remove www. prefix if present
        return re.sub(r'^www\.', '', host)

    def _set_fetcher_result(self, content, status_code):
        """Store a plain-text result on ``self.fetcher``.

        Sets content, status code and headers, and stubs the accessor methods
        so the fetcher can be queried without a real browser/HTTP session.

        :param content: Text to expose as the fetched content
        :param status_code: HTTP-like status code describing the outcome
        """
        fetcher = self.fetcher
        fetcher.content = content
        fetcher.status_code = status_code
        fetcher.headers = CaseInsensitiveDict({
            'content-type': 'text/plain',
            'server': 'whois-processor'
        })

        # Getters for headers/status, plus a no-op quit
        fetcher.get_all_headers = lambda: fetcher.headers
        fetcher.get_last_status_code = lambda: fetcher.status_code
        fetcher.quit = lambda: None

    def call_browser(self, preferred_proxy_id=None):
        """Override call_browser to perform a WHOIS lookup instead of using a browser.

        The outcome is stored on ``self.fetcher``: status 200 with the WHOIS
        text on success, 400 when no domain can be extracted from the watch
        URL, 500 when the lookup raises.

        Note: The python-whois library doesn't directly support proxies. For real proxy support,
        we would need to implement a custom socket connection that routes through the proxy.
        This is a TODO for a future enhancement.

        :param preferred_proxy_id: Passed to the parent's ``_get_proxy_for_watch``
        :raises Exception: for ``file:`` URLs when ``allow_file_uri`` is not enabled
        """
        # Initialize a basic fetcher - this is used by the parent class
        self.fetcher = Fetcher()

        # Extract URL from watch
        url = self.watch.link

        # Check for file:// access
        if re.search(r'^file:', url.strip(), re.IGNORECASE):
            if not self.datastore.data.get('settings', {}).get('application', {}).get('allow_file_uri', False):
                raise Exception("file:// type access is denied for security reasons.")

        domain = self._extract_domain_from_url(url)

        # Ensure we have a valid domain
        if not domain:
            error_msg = f"Could not extract domain from URL: '{url}'"
            self._set_fetcher_result(error_msg, 400)
            logger.error(error_msg)
            return

        # Get proxy configuration using the common method from parent class
        proxy_config, proxy_url = super()._get_proxy_for_watch(preferred_proxy_id)

        try:
            # Use python-whois to get domain information
            import whois

            # The proxy configuration is only logged here; the lookup below is
            # made directly because python-whois has no proxy support.
            if proxy_config:
                logger.info(f"Using proxy for WHOIS lookup: {proxy_config}")

            whois_info = whois.whois(domain)

            if hasattr(whois_info, 'text'):
                # Some whois implementations store raw text in .text attribute
                whois_text = whois_info.text
            else:
                # Otherwise, format it as key-value pairs, skipping empty values
                pairs = [f"{key}: {value}\n" for key, value in whois_info.items() if value]
                whois_text = f"WHOIS Information for domain: {domain}\n\n" + "".join(pairs)

            self._set_fetcher_result(whois_text, 200)

        except Exception as e:
            # Report lookup failures through the fetcher instead of raising
            error_msg = f"Error fetching WHOIS data for domain {domain}: {str(e)}"
            self._set_fetcher_result(error_msg, 500)
            logger.error(error_msg)

        return

    def run_changedetection(self, watch):
        """Run the parent's change detection, which uses our overridden call_browser.

        :param watch: The watch object being checked
        :return: ``(changed_detected, update_obj, filtered_text)``; on any
                 exception returns ``(False, {...'last_error': msg}, msg as bytes)``
        """
        try:
            # Let the parent class handle everything now that we've overridden call_browser
            changed_detected, update_obj, filtered_text = super().run_changedetection(watch)
            return changed_detected, update_obj, filtered_text

        except Exception as e:
            error_msg = f"Error in WHOIS processor: {str(e)}"
            update_obj = {'last_notification_error': False, 'last_error': error_msg}
            logger.error(error_msg)
            return False, update_obj, error_msg.encode('utf-8')

    @staticmethod
    def perform_site_check(datastore, watch_uuid):
        """Factory method to create a WhoisProcessor instance - for compatibility with legacy code.

        :param datastore: Passed to the constructor
        :param watch_uuid: Passed to the constructor
        :return: A new WhoisProcessor
        """
        return WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
|
||||||
|
|
||||||
|
@hookimpl
def perform_site_check(datastore, watch_uuid):
    """Hook: build a WhoisProcessor for the given watch.

    :param datastore: Passed through to the WhoisProcessor constructor
    :param watch_uuid: Passed through to the WhoisProcessor constructor
    :return: A new WhoisProcessor instance
    """
    whois_processor = WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
    return whois_processor
|
||||||
|
|
||||||
|
@hookimpl(trylast=True)  # Use trylast to ensure this runs last in case of conflicts
def get_processor_name():
    """Hook: return the name under which this processor is registered.

    :return: The string "whois"
    """
    # Uses the module-level loguru logger; no function-scope re-import needed
    logger.debug("whois_plugin.get_processor_name() called")
    return "whois"
|
||||||
|
|
||||||
|
@hookimpl
def get_processor_description():
    """Hook: return the description string for this processor."""
    description = "WHOIS Domain Information Changes Detector"
    return description
|
||||||
|
|
||||||
|
@hookimpl
def get_processor_class():
    """Hook: return the class implementing this processor (WhoisProcessor)."""
    processor_class = WhoisProcessor
    return processor_class
|
||||||
|
|
||||||
|
@hookimpl
def get_processor_form():
    """Hook: return the form class used to edit watches of this processor.

    :return: ``processor_text_json_diff_form``, or None when it cannot be imported
    """
    # Import here to avoid circular imports
    try:
        from changedetectionio.forms import processor_text_json_diff_form
    except Exception as e:
        # Best effort: log with the module-level logger and report "no form"
        logger.error(f"Error importing form for whois plugin: {str(e)}")
        return None
    else:
        return processor_text_json_diff_form
|
||||||
|
|
||||||
|
@hookimpl
def get_processor_watch_model():
    """Hook: return the watch model class for this processor.

    :return: None, meaning the default watch model is used
    """
    default_watch_model = None  # Use default watch model
    return default_watch_model
|
||||||
59
changedetectionio/tests/test_processor_registry.py
Normal file
59
changedetectionio/tests/test_processor_registry.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import pytest
|
||||||
|
from changedetectionio.processors.processor_registry import get_processor_class, get_all_processors
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_all_processors():
    """Test that get_all_processors returns a non-empty list of (name, description) tuples."""
    processors = get_all_processors()
    assert isinstance(processors, list)
    assert len(processors) > 0

    # Each item should be a tuple of (name, description)
    for processor in processors:
        assert isinstance(processor, tuple)
        assert len(processor) == 2
        assert isinstance(processor[0], str)
        assert isinstance(processor[1], str)

    # Check that our WHOIS processor is included
    whois_processor = next((p for p in processors if p[0] == "whois"), None)
    assert whois_processor is not None
    # Must match the string returned by whois_plugin.get_processor_description()
    assert whois_processor[1] == "WHOIS Domain Information Changes Detector"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_processor_class():
    """get_processor_class resolves "whois" and yields None for an unknown name."""
    whois_cls = get_processor_class("whois")

    # A class must be found and must expose the perform_site_check method
    assert whois_cls is not None
    assert hasattr(whois_cls, 'perform_site_check')

    # An unregistered name resolves to None
    assert get_processor_class("non_existent_processor") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_processor_site_check():
    """get_processor_site_check builds a processor for "whois" and None for unknown names."""
    from unittest.mock import MagicMock
    from changedetectionio.processors.processor_registry import get_processor_site_check

    datastore_stub = MagicMock()
    uuid = "test-uuid"

    # A WHOIS processor instance must come back...
    whois_handler = get_processor_site_check("whois", datastore_stub, uuid)
    assert whois_handler is not None

    # ...exposing the two methods this test requires of a processor
    for required_method in ('run_changedetection', 'call_browser'):
        assert hasattr(whois_handler, required_method)

    # Check for non-existent processor
    missing_handler = get_processor_site_check("non_existent_processor", datastore_stub, uuid)
    assert missing_handler is None
|
||||||
182
changedetectionio/tests/test_whois_processor.py
Normal file
182
changedetectionio/tests/test_whois_processor.py
Normal file
@@ -0,0 +1,182 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from changedetectionio.processors.whois_plugin import WhoisProcessor
|
||||||
|
|
||||||
|
|
||||||
|
class MockWatch:
    """Minimal stand-in for a watch object used by the WHOIS processor tests."""

    def __init__(self, url, previous_md5=None, include_filters=None, ignore_text=None):
        """Store the given values; missing filter/ignore lists become empty lists."""
        self.url = url
        self._previous_md5 = previous_md5
        self._include_filters = include_filters if include_filters else []
        self._ignore_text = ignore_text if ignore_text else []
        self.history = {}

    def get(self, key, default=None):
        """Dict-style lookup for the four supported keys; anything else yields *default*."""
        supported = (
            ('previous_md5', '_previous_md5'),
            ('include_filters', '_include_filters'),
            ('ignore_text', '_ignore_text'),
            ('url', 'url'),
        )
        for known_key, attribute in supported:
            if key == known_key:
                return getattr(self, attribute)
        return default

    def has_special_diff_filter_options_set(self):
        """Always False for this mock."""
        return False
|
||||||
|
|
||||||
|
|
||||||
|
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
@patch('changedetectionio.processors.text_json_diff.processor.perform_site_check.run_changedetection')
def test_whois_processor_basic_functionality(mock_super_run, mock_base_init, mock_whois):
    """call_browser performs the lookup for the watch's domain; run_changedetection defers to the parent."""
    # Base-class init is mocked so the full watch structure is not needed
    mock_base_init.return_value = None

    # The parent's run_changedetection returns a simple canned result
    mock_super_run.return_value = (False, {'previous_md5': 'some-md5'}, b'Some filtered text')

    # Canned WHOIS response exposing raw text
    mock_whois.return_value = MagicMock(
        text="Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
    )

    # Datastore without any proxies and with file:// access disabled
    mock_datastore = MagicMock()
    mock_datastore.proxy_list = None
    mock_datastore.get_preferred_proxy_for_watch.return_value = None
    mock_datastore.data = {'settings': {'application': {'allow_file_uri': False}}}

    processor = WhoisProcessor(datastore=mock_datastore, watch_uuid='test-uuid')
    watch = MockWatch(url="https://example.com")

    # Simulate link access in the watch
    processor.watch = MagicMock()
    processor.watch.link = "https://example.com"
    processor.watch.get.return_value = "uuid-123"

    processor.call_browser()

    # The fetcher must exist and carry the result attributes
    assert processor.fetcher is not None
    for attribute in ('content', 'headers', 'status_code'):
        assert hasattr(processor.fetcher, attribute)

    # whois must have been queried with the bare domain
    assert mock_whois.called
    assert mock_whois.call_args[0][0] == 'example.com'

    # Now run the processor and confirm the parent implementation was used
    result = processor.run_changedetection(watch)
    assert mock_super_run.called
|
||||||
|
|
||||||
|
|
||||||
|
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_call_browser_with_proxy(mock_base_init, mock_whois):
    """call_browser still performs the lookup when the datastore has a proxy configured."""
    mock_base_init.return_value = None

    # Canned WHOIS response exposing raw text
    mock_whois.return_value = MagicMock(
        text="Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
    )

    # Datastore with a single proxy that is also the preferred one
    mock_datastore = MagicMock()
    mock_datastore.proxy_list = {
        'test-proxy': {
            'url': 'http://proxy.example.com:8080',
            'label': 'Test Proxy',
        }
    }
    mock_datastore.get_preferred_proxy_for_watch.return_value = 'test-proxy'
    mock_datastore.data = {'settings': {'application': {'allow_file_uri': False}}}

    processor = WhoisProcessor(datastore=mock_datastore, watch_uuid='test-uuid')

    # Set up watch
    processor.watch = MagicMock()
    processor.watch.link = "https://example.com"
    processor.watch.get.return_value = "uuid-123"

    processor.call_browser()

    # whois was queried with the bare domain
    assert mock_whois.called
    assert mock_whois.call_args[0][0] == 'example.com'

    # The fetcher was created and has content
    fetcher = processor.fetcher
    assert fetcher is not None
    assert fetcher.content is not None
|
||||||
|
|
||||||
|
|
||||||
|
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_perform_site_check(mock_base_init):
    """The static WhoisProcessor.perform_site_check forwards its arguments to the constructor."""
    mock_base_init.return_value = None

    with patch.object(WhoisProcessor, '__init__', return_value=None) as mock_init:
        datastore, watch_uuid = MagicMock(), "test-uuid"

        # Call the static method
        processor = WhoisProcessor.perform_site_check(datastore=datastore, watch_uuid=watch_uuid)

        # Constructor received exactly the arguments given to the factory
        mock_init.assert_called_once_with(datastore=datastore, watch_uuid=watch_uuid)

        # And the factory returned an instance of the class
        assert isinstance(processor, WhoisProcessor)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_display_link():
    """Test the get_display_link hook implementation of the whois plugin."""
    from changedetectionio.processors.whois_plugin import get_display_link

    # (url, processor_name, expected link): regular URL, subdomain, www prefix removed
    whois_cases = (
        ("https://example.com/some/path?param=value", "whois", "WHOIS - example.com"),
        ("https://subdomain.example.com/", "whois", "WHOIS - subdomain.example.com"),
        ("https://www.example.com/", "whois", "WHOIS - example.com"),
    )
    for url, processor_name, expected in whois_cases:
        link = get_display_link(url=url, processor_name=processor_name)
        assert link == expected

    # A different processor should return None
    link = get_display_link(url="https://example.com/", processor_name="text_json_diff")
    assert link is None
|
||||||
@@ -61,5 +61,22 @@ class TestDiffBuilder(unittest.TestCase):
|
|||||||
p = watch.get_from_version_based_on_last_viewed
|
p = watch.get_from_version_based_on_last_viewed
|
||||||
assert p == "100", "Correct with only one history snapshot"
|
assert p == "100", "Correct with only one history snapshot"
|
||||||
|
|
||||||
|
def test_watch_link_property_with_processor(self):
|
||||||
|
"""Test the link property with a processor that customizes the link"""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
watch = Watch.model(datastore_path='/tmp', default={})
|
||||||
|
watch['url'] = 'https://example.com'
|
||||||
|
watch['processor'] = 'whois'
|
||||||
|
|
||||||
|
# Mock the processor registry's get_display_link function
|
||||||
|
with patch('changedetectionio.processors.processor_registry.get_display_link') as mock_get_display_link:
|
||||||
|
mock_get_display_link.return_value = "WHOIS - example.com"
|
||||||
|
|
||||||
|
# The link property should use the customized link from the processor
|
||||||
|
assert watch.link == "WHOIS - example.com"
|
||||||
|
mock_get_display_link.assert_called_once_with(url='https://example.com', processor_name='whois')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@@ -271,19 +271,38 @@ class update_worker(threading.Thread):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# Processor is what we are using for detecting the "Change"
|
# Processor is what we are using for detecting the "Change"
|
||||||
processor = watch.get('processor', 'text_json_diff')
|
processor_name = watch.get('processor', 'text_json_diff')
|
||||||
|
|
||||||
# Init a new 'difference_detection_processor', first look in processors
|
|
||||||
processor_module_name = f"changedetectionio.processors.{processor}.processor"
|
|
||||||
|
# First, try to get the processor from our plugin registry
|
||||||
|
try:
|
||||||
|
from changedetectionio.processors.processor_registry import get_processor_site_check
|
||||||
|
update_handler = get_processor_site_check(processor_name, self.datastore, uuid)
|
||||||
|
|
||||||
|
if update_handler:
|
||||||
|
# We found the processor in our plugin registry
|
||||||
|
logger.info(f"Using processor '{processor_name}' from plugin registry")
|
||||||
|
else:
|
||||||
|
# Fall back to the traditional file-based approach
|
||||||
|
processor_module_name = f"changedetectionio.processors.{processor_name}.processor"
|
||||||
try:
|
try:
|
||||||
processor_module = importlib.import_module(processor_module_name)
|
processor_module = importlib.import_module(processor_module_name)
|
||||||
except ModuleNotFoundError as e:
|
|
||||||
print(f"Processor module '{processor}' not found.")
|
|
||||||
raise e
|
|
||||||
|
|
||||||
update_handler = processor_module.perform_site_check(datastore=self.datastore,
|
update_handler = processor_module.perform_site_check(datastore=self.datastore,
|
||||||
watch_uuid=uuid
|
watch_uuid=uuid)
|
||||||
)
|
except ModuleNotFoundError as e:
|
||||||
|
print(f"Processor module '{processor_name}' not found in both plugin registry and file system.")
|
||||||
|
raise e
|
||||||
|
except ImportError as e:
|
||||||
|
# If processor_registry.py cannot be imported, fall back to the traditional approach
|
||||||
|
processor_module_name = f"changedetectionio.processors.{processor_name}.processor"
|
||||||
|
try:
|
||||||
|
processor_module = importlib.import_module(processor_module_name)
|
||||||
|
update_handler = processor_module.perform_site_check(datastore=self.datastore,
|
||||||
|
watch_uuid=uuid)
|
||||||
|
except ModuleNotFoundError as e:
|
||||||
|
print(f"Processor module '{processor_name}' not found.")
|
||||||
|
raise e
|
||||||
|
|
||||||
update_handler.call_browser()
|
update_handler.call_browser()
|
||||||
|
|
||||||
|
|||||||
31
test_processor_registration.py
Normal file
31
test_processor_registration.py
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from changedetectionio.processors import available_processors
|
||||||
|
from changedetectionio.processors.processor_registry import get_processor_class, get_processor_form
|
||||||
|
|
||||||
|
# Test processor registration: list every available processor, then look up
# the WHOIS plugin's class and form through the registry.
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print(f"Processor: {name} - {description}")

# Check if our WHOIS processor is registered.
# The plugin's get_processor_name() hook returns "whois", so that is the name to look up.
whois_processor_name = "whois"
whois_found = any(name == whois_processor_name for name, _ in processors)

if whois_found:
    print("\nWHOIS Processor found! Getting processor class and form...")

    # Get the processor class
    processor_class = get_processor_class(whois_processor_name)
    print(f"Processor class: {processor_class}")
    print(f"Processor class name: {processor_class.__name__ if processor_class else None}")
    print(f"Processor class module: {processor_class.__module__ if processor_class else None}")

    # Get the processor form
    processor_form = get_processor_form(whois_processor_name)
    print(f"Processor form: {processor_form}")

    print("\nWHOIS Processor successfully registered")
else:
    print("\nWHOIS Processor not found in available processors")
|
||||||
16
test_processors.py
Executable file
16
test_processors.py
Executable file
@@ -0,0 +1,16 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from changedetectionio.processors import available_processors
|
||||||
|
from changedetectionio.processors import find_processors
|
||||||
|
|
||||||
|
# Traditional discovery: find_processors() yields (module, name) pairs
print("=== Traditional Processor Discovery ===")
traditional_processors = find_processors()
for processor_module, processor_name in traditional_processors:
    print(f"Found processor: {processor_name} in {processor_module.__name__}")

# Combined discovery (traditional + pluggy): available_processors() yields (name, description) pairs
print("\n=== Combined Processor Discovery ===")
combined_processors = available_processors()
for processor_name, processor_description in combined_processors:
    print(f"Processor: {processor_name} - {processor_description}")
|
||||||
53
test_whois_extraction.py
Normal file
53
test_whois_extraction.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import urllib.parse
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
|
||||||
|
def extract_domain_from_url(url):
    """Extract the domain from a URL, without credentials, port or a leading ``www.``.

    ``hostname`` is used instead of ``netloc`` so that
    ``http://user:pw@www.example.com:8080/`` yields ``example.com`` rather
    than ``user:pw@www.example.com:8080``. ``hostname`` is lower-cased by urllib.

    :param url: The URL to inspect
    :return: The bare host name, or '' when the URL has no host part
    """
    host = urllib.parse.urlparse(url).hostname or ''

    # Remove www. prefix if present
    return re.sub(r'^www\.', '', host)
|
||||||
|
|
||||||
|
# Test domain extraction over a handful of representative URLs
test_urls = [
    "https://changedetection.io",
    "http://www.example.com/page",
    "https://subdomain.domain.co.uk/path?query=1",
    "ftp://ftp.example.org",
    "https://www.changedetection.io/page/subpage",
]

print("=== Domain Extraction Test ===")
for url in test_urls:
    domain = extract_domain_from_url(url)
    print(f"URL: {url} -> Domain: {domain}")

# Test WHOIS lookup for changedetection.io (performs a real lookup)
try:
    import whois

    domain = extract_domain_from_url("https://changedetection.io")
    print(f"\n=== WHOIS lookup for {domain} ===")

    whois_info = whois.whois(domain)

    # Print key information, one "Label: value" line per field
    key_fields = (
        ("Domain Name", 'domain_name'),
        ("Registrar", 'registrar'),
        ("Creation Date", 'creation_date'),
        ("Expiration Date", 'expiration_date'),
    )
    for label, field in key_fields:
        print(f"{label}: {whois_info.get(field, '')}")

    print("\nWHOIS lookup successful!")

except ImportError:
    print("python-whois module not installed. Run: pip install python-whois")
    sys.exit(1)
except Exception as e:
    print(f"Error performing WHOIS lookup: {str(e)}")
    sys.exit(1)
|
||||||
47
test_whois_processor.py
Normal file
47
test_whois_processor.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from changedetectionio.processors import available_processors
|
||||||
|
from changedetectionio.processors.processor_registry import get_processor_class
|
||||||
|
import urllib.parse
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# First, verify our processor is available
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print(f"Processor: {name} - {description}")

# Get the WHOIS processor class.
# The plugin's get_processor_name() hook returns "whois", so that is the name to look up.
whois_processor_class = get_processor_class("whois")
if not whois_processor_class:
    print("ERROR: WHOIS processor not found in available processors.")
    sys.exit(1)

print(f"\nFound WHOIS processor class: {whois_processor_class}")

# Test a WHOIS lookup directly (performs a real lookup)
try:
    # Parse a domain from a URL
    url = "https://changedetection.io"
    parsed_url = urllib.parse.urlparse(url)
    domain = parsed_url.netloc

    # Import whois and fetch information
    import whois
    whois_info = whois.whois(domain)

    print(f"\n=== WHOIS Information for {domain} ===")

    # Print the raw text when available, otherwise the non-empty key/value pairs
    if hasattr(whois_info, 'text'):
        print(whois_info.text)
    else:
        for key, value in whois_info.items():
            if value:
                print(f"{key}: {value}")

    print("\nSuccessfully retrieved WHOIS data!")

except Exception as e:
    print(f"Error fetching WHOIS data: {str(e)}")
    sys.exit(1)
|
||||||
136
test_whois_processor_full.py
Normal file
136
test_whois_processor_full.py
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from changedetectionio.processors import available_processors
|
||||||
|
from changedetectionio.processors.processor_registry import get_processor_class
|
||||||
|
import unittest
|
||||||
|
import sys
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
|
# First, verify our processor is available
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print(f"Processor: {name} - {description}")

# Get the WHOIS processor class.
# The plugin's get_processor_name() hook returns "whois", so that is the name to look up.
whois_processor_class = get_processor_class("whois")
if not whois_processor_class:
    print("ERROR: WHOIS processor not found in available processors.")
    sys.exit(1)

print(f"\nFound WHOIS processor class: {whois_processor_class}")
|
||||||
|
|
||||||
|
# Create a test for our WHOIS processor
|
||||||
|
class TestWhoisProcessor(unittest.TestCase):
|
||||||
|
|
||||||
|
# Use the real whois function - tests will actually make network requests
|
||||||
|
def test_whois_processor_real(self):
|
||||||
|
# Extract the domain from the URL
|
||||||
|
test_url = "https://changedetection.io"
|
||||||
|
parsed_url = urllib.parse.urlparse(test_url)
|
||||||
|
domain = parsed_url.netloc
|
||||||
|
|
||||||
|
# Create a minimal mock datastore
|
||||||
|
mock_datastore = MagicMock()
|
||||||
|
mock_datastore.data = {
|
||||||
|
'watching': {'test-uuid': {'url': test_url}},
|
||||||
|
'settings': {
|
||||||
|
'application': {'empty_pages_are_a_change': False},
|
||||||
|
'requests': {'timeout': 30}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mock_datastore.get_all_base_headers.return_value = {}
|
||||||
|
mock_datastore.get_all_headers_in_textfile_for_watch.return_value = {}
|
||||||
|
mock_datastore.get_preferred_proxy_for_watch.return_value = None
|
||||||
|
mock_datastore.get_tag_overrides_for_watch.return_value = []
|
||||||
|
|
||||||
|
# Create a minimal mock watch that mimics the real Watch class
|
||||||
|
class MockWatch:
    """Minimal stand-in for the real Watch class used by this test.

    It carries the attributes set in ``__init__`` and answers a fixed set of
    dictionary-style lookups with canned values.
    """

    def __init__(self, url):
        """Store *url* as ``link`` and initialise flags and history state."""
        self.link = url
        # Feature flags: all off.
        self.is_pdf = self.has_browser_steps = self.is_source_type_url = False
        # History bookkeeping: empty, with zeroed counters.
        self.history = {}
        self.history_n = self.last_viewed = self.newest_history_key = 0

    def get(self, key, default=None):
        """Return the canned value for *key*, or *default* for unknown keys."""
        # Rebuilt on every call so the list/dict answers are fresh objects
        # that a caller may mutate without affecting later lookups.
        canned_answers = {
            'uuid': 'test-uuid',
            'include_filters': [],
            'body': None,
            'method': 'GET',
            'headers': {},
            'browser_steps': [],
        }
        return canned_answers[key] if key in canned_answers else default

    def __getitem__(self, key):
        """Subscript access delegates to ``get``; unknown keys give ``None``."""
        return self.get(key)

    def get_last_fetched_text_before_filters(self):
        """Return an empty string as the previously fetched text."""
        return ""

    def save_last_text_fetched_before_filters(self, content):
        """Accept *content* and discard it."""
        return None

    def has_special_diff_filter_options_set(self):
        """Always return ``False``."""
        return False

    def lines_contain_something_unique_compared_to_history(self, lines, ignore_whitespace):
        """Always return ``True``, whatever *lines* contains."""
        return True
|
||||||
|
|
||||||
|
mock_watch = MockWatch(test_url)
|
||||||
|
|
||||||
|
# Create a more complete mock fetcher
|
||||||
|
class MockFetcher:
    """Fetcher stub: empty content, fixed headers, and no-op ``run``/``quit``."""

    def __init__(self):
        """Start with empty payloads and no captured extras."""
        # Payload: empty text and empty bytes.
        self.content, self.raw_content = "", b""
        self.headers = {'Content-Type': 'text/plain'}
        # Optional extras are absent.
        self.screenshot = self.xpath_data = self.instock_data = None
        self.browser_steps = []

    def get_last_status_code(self):
        """Return 200."""
        return 200

    def get_all_headers(self):
        """Return a new lower-cased header mapping on every call."""
        return dict([('content-type', 'text/plain')])

    def quit(self):
        """Do nothing."""
        return None

    def run(self, **kwargs):
        """Accept any keyword arguments and do nothing."""
        return None
|
||||||
|
|
||||||
|
# Create the processor and set the mock fetcher
|
||||||
|
processor = whois_processor_class(datastore=mock_datastore, watch_uuid='test-uuid')
|
||||||
|
processor.fetcher = MockFetcher()
|
||||||
|
|
||||||
|
# Run the processor - this will make an actual WHOIS request
|
||||||
|
changed, update_obj, content = processor.run_changedetection(mock_watch)
|
||||||
|
|
||||||
|
# Print the content for debugging
|
||||||
|
content_str = content.decode('utf-8')
|
||||||
|
print(f"\n=== WHOIS Content from processor (first 200 chars) ===")
|
||||||
|
print(content_str[:200] + "...")
|
||||||
|
|
||||||
|
# Verify the content contains domain information
|
||||||
|
self.assertIn(domain, content_str)
|
||||||
|
self.assertIn("Domain Name", content_str)
|
||||||
|
self.assertIn("Creation Date", content_str)
|
||||||
|
|
||||||
|
print("\nWHOIS processor test with real data PASSED!")
|
||||||
|
|
||||||
|
# Run the test
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(argv=['first-arg-is-ignored'], exit=False)
|
||||||
39
test_whois_simple.py
Normal file
39
test_whois_simple.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import urllib.parse
|
||||||
|
import re
|
||||||
|
import whois
|
||||||
|
|
||||||
|
# Test with changedetection.io domain
url = "https://changedetection.io"

# (label, WHOIS field) pairs printed in the "key information" summary.
KEY_FIELDS = (
    ("Domain Name", "domain_name"),
    ("Registrar", "registrar"),
    ("Creation Date", "creation_date"),
    ("Expiration Date", "expiration_date"),
    ("Updated Date", "updated_date"),
)


def extract_domain(target_url):
    """Return the host name of *target_url* with a leading ``www.`` removed.

    ``hostname`` is used instead of ``netloc`` so that a port or
    ``user:password@`` part never ends up in the WHOIS query. Note that
    ``hostname`` is lower-cased by ``urllib.parse``.

    Raises:
        ValueError: if *target_url* contains no host.
    """
    host = urllib.parse.urlparse(target_url).hostname
    if not host:
        raise ValueError(f"No host found in URL: {target_url!r}")
    # Remove www. prefix if present
    return re.sub(r'^www\.', '', host)


def format_whois_text(domain, whois_info):
    """Render *whois_info* as ``key: value`` lines under a header for *domain*.

    Entries whose value is falsy (``None``, empty list, empty string) are
    omitted.
    """
    header = f"WHOIS Information for domain: {domain}\n\n"
    body = "".join(f"{key}: {value}\n" for key, value in whois_info.items() if value)
    return header + body


def main():
    """Look up WHOIS data for ``url`` and print a summary and a text preview.

    This performs a real network request, so it only runs when the file is
    executed as a script, never on import (for example during pytest
    collection, which matches this file's ``test_*.py`` name).
    """
    domain = extract_domain(url)

    # Fetch WHOIS information
    print(f"Looking up WHOIS data for domain: {domain}")
    whois_info = whois.whois(domain)

    # Print key WHOIS data
    print("\nKey WHOIS information:")
    for label, field in KEY_FIELDS:
        print(f"{label}: {whois_info.get(field, 'Unknown')}")

    # Format as text
    whois_text = format_whois_text(domain, whois_info)

    # Print the first 200 characters
    print("\nFormatted WHOIS data (first 200 chars):")
    print(whois_text[:200] + "...")

    print("\nWHOIS lookup successful!")


if __name__ == "__main__":
    main()
|
||||||
Reference in New Issue
Block a user