mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-31 06:37:41 +00:00 
			
		
		
		
	Compare commits
	
		
			41 Commits
		
	
	
		
			socketio-t
			...
			conditions
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 79166c0307 | ||
|   | 9dbe91e470 | ||
|   | 51bd8cd2d7 | ||
|   | 35455e7dd6 | ||
|   | aaa038f082 | ||
|   | 57eeb221cb | ||
|   | 8187b9ce4c | ||
|   | cc70b65bfa | ||
|   | 42099f1fff | ||
|   | 408864d346 | ||
|   | 02b8660bf3 | ||
|   | 947a60af89 | ||
|   | a0f4cb4d65 | ||
|   | 71ea8d80f3 | ||
|   | 4f48958187 | ||
|   | 2608980b1d | ||
|   | c982395d72 | ||
|   | ee7e43ea87 | ||
|   | da5585b53c | ||
|   | 76062c9419 | ||
|   | 675953797c | ||
|   | b202652a93 | ||
|   | 617dc721bf | ||
|   | ec13720694 | ||
|   | ddacb0bcbc | ||
|   | f67d98b839 | ||
|   | beee93d528 | ||
|   | 987ab3e494 | ||
|   | 0c68cfffb1 | ||
|   | e93a9244fe | ||
|   | e56eec41c1 | ||
|   | 31f4bb7cc3 | ||
|   | f08efde110 | ||
|   | 9b39b2853b | ||
|   | 892d38ba42 | ||
|   | b170e191d4 | ||
|   | edb78efcca | ||
|   | 383f90b70c | ||
|   | 6948418865 | ||
|   | cd80e317f3 | ||
|   | 8c26210804 | 
| @@ -712,23 +712,63 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|         # Does it use some custom form? does one exist? | ||||
|         processor_name = datastore.data['watching'][uuid].get('processor', '') | ||||
|         processor_classes = next((tpl for tpl in find_processors() if tpl[1] == processor_name), None) | ||||
|          | ||||
|         # If it's not found in traditional processors, check if it's a pluggy plugin | ||||
|         if not processor_classes: | ||||
|             flash(f"Cannot load the edit form for processor/plugin '{processor_classes[1]}', plugin missing?", 'error') | ||||
|             return redirect(url_for('index')) | ||||
|  | ||||
|         parent_module = get_parent_module(processor_classes[0]) | ||||
|  | ||||
|         try: | ||||
|             # Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code) | ||||
|             forms_module = importlib.import_module(f"{parent_module.__name__}.forms") | ||||
|             # Access the 'processor_settings_form' class from the 'forms' module | ||||
|             form_class = getattr(forms_module, 'processor_settings_form') | ||||
|         except ModuleNotFoundError as e: | ||||
|             # .forms didnt exist | ||||
|             form_class = forms.processor_text_json_diff_form | ||||
|         except AttributeError as e: | ||||
|             # .forms exists but no useful form | ||||
|             form_class = forms.processor_text_json_diff_form | ||||
|             try: | ||||
|                 from changedetectionio.processors.processor_registry import get_processor_form, _get_plugin_name_map | ||||
|                  | ||||
|                 # Get all available plugins for debugging | ||||
|                 available_plugins = list(_get_plugin_name_map().keys()) | ||||
|                 logger.debug(f"Available processor plugins: {available_plugins}") | ||||
|                  | ||||
|                 # Try to get the processor form | ||||
|                 plugin_form_class = get_processor_form(processor_name) | ||||
|                  | ||||
|                 if plugin_form_class: | ||||
|                     # Use default text_json_diff_form as parent module for plugins | ||||
|                     from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor | ||||
|                     form_class = forms.processor_text_json_diff_form | ||||
|                     parent_module = get_parent_module(text_json_diff_processor) | ||||
|                      | ||||
|                     # Skip the normal form loading code path | ||||
|                     use_plugin_form = True | ||||
|                     logger.debug(f"Successfully loaded form for plugin '{processor_name}'") | ||||
|                 else: | ||||
|                     # Check if the plugin is registered but doesn't have a form | ||||
|                     if processor_name in available_plugins: | ||||
|                         logger.error(f"Plugin '{processor_name}' is registered but has no form class") | ||||
|                         flash(f"Plugin '{processor_name}' is registered but has no form class", 'error') | ||||
|                     else: | ||||
|                         logger.error(f"Cannot find plugin '{processor_name}'. Available plugins: {available_plugins}") | ||||
|                         flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin missing?", 'error') | ||||
|                     return redirect(url_for('index')) | ||||
|             except ImportError as e: | ||||
|                 logger.error(f"Import error when loading plugin form: {str(e)}") | ||||
|                 flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin system not available?", 'error') | ||||
|                 return redirect(url_for('index')) | ||||
|             except Exception as e: | ||||
|                 logger.error(f"Unexpected error loading plugin form: {str(e)}") | ||||
|                 flash(f"Error loading plugin form: {str(e)}", 'error') | ||||
|                 return redirect(url_for('index')) | ||||
|         else: | ||||
|             # Traditional processor - continue with normal flow | ||||
|             parent_module = get_parent_module(processor_classes[0]) | ||||
|             use_plugin_form = False | ||||
|          | ||||
|         # Only follow this path for traditional processors | ||||
|         if not use_plugin_form: | ||||
|             try: | ||||
|                 # Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code) | ||||
|                 forms_module = importlib.import_module(f"{parent_module.__name__}.forms") | ||||
|                 # Access the 'processor_settings_form' class from the 'forms' module | ||||
|                 form_class = getattr(forms_module, 'processor_settings_form') | ||||
|             except ModuleNotFoundError as e: | ||||
|                 # .forms didnt exist | ||||
|                 form_class = forms.processor_text_json_diff_form | ||||
|             except AttributeError as e: | ||||
|                 # .forms exists but no useful form | ||||
|                 form_class = forms.processor_text_json_diff_form | ||||
|  | ||||
|         form = form_class(formdata=request.form if request.method == 'POST' else None, | ||||
|                           data=default, | ||||
|   | ||||
| @@ -67,7 +67,6 @@ class model(watch_base): | ||||
|  | ||||
|     @property | ||||
|     def link(self): | ||||
|  | ||||
|         url = self.get('url', '') | ||||
|         if not is_safe_url(url): | ||||
|             return 'DISABLED' | ||||
| @@ -93,6 +92,19 @@ class model(watch_base): | ||||
|         # Also double-check it after any Jinja2 formatting, just in case | ||||
|         if not is_safe_url(ready_url): | ||||
|             return 'DISABLED' | ||||
|              | ||||
|         # Check if a processor wants to customize the display link | ||||
|         processor_name = self.get('processor') | ||||
|         if processor_name: | ||||
|             try: | ||||
|                 # Import here to avoid circular imports | ||||
|                 from changedetectionio.processors.processor_registry import get_display_link | ||||
|                 custom_link = get_display_link(url=ready_url, processor_name=processor_name) | ||||
|                 if custom_link: | ||||
|                     return custom_link | ||||
|             except Exception as e: | ||||
|                 logger.error(f"Error getting custom display link for processor {processor_name}: {str(e)}") | ||||
|                  | ||||
|         return ready_url | ||||
|  | ||||
|     def clear_watch(self): | ||||
|   | ||||
| @@ -3,6 +3,7 @@ from changedetectionio.content_fetchers.base import Fetcher | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from copy import deepcopy | ||||
| from loguru import logger | ||||
|  | ||||
| import hashlib | ||||
| import importlib | ||||
| import inspect | ||||
| @@ -10,6 +11,10 @@ import os | ||||
| import pkgutil | ||||
| import re | ||||
|  | ||||
| # Import the plugin manager | ||||
| from .pluggy_interface import plugin_manager | ||||
|  | ||||
|  | ||||
| class difference_detection_processor(): | ||||
|  | ||||
|     browser_steps = None | ||||
| @@ -26,9 +31,95 @@ class difference_detection_processor(): | ||||
|         self.watch = deepcopy(self.datastore.data['watching'].get(watch_uuid)) | ||||
|         # Generic fetcher that should be extended (requests, playwright etc) | ||||
|         self.fetcher = Fetcher() | ||||
|          | ||||
|     def _get_proxy_for_watch(self, preferred_proxy_id=None): | ||||
|         """Get proxy configuration based on watch settings and preferred proxy ID | ||||
|          | ||||
|         Args: | ||||
|             preferred_proxy_id: Optional explicit proxy ID to use | ||||
|              | ||||
|         Returns: | ||||
|             dict: Proxy configuration or None if no proxy should be used | ||||
|             str: Proxy URL or None if no proxy should be used | ||||
|         """ | ||||
|         # Default to no proxy config | ||||
|         proxy_config = None | ||||
|         proxy_url = None | ||||
|          | ||||
|         # Check if datastore is available and has get_preferred_proxy_for_watch method | ||||
|         if hasattr(self, 'datastore') and self.datastore: | ||||
|             try: | ||||
|                 # Get preferred proxy ID if not provided | ||||
|                 if not preferred_proxy_id and hasattr(self.datastore, 'get_preferred_proxy_for_watch'): | ||||
|                     # Get the watch UUID if available | ||||
|                     watch_uuid = None | ||||
|                     if hasattr(self.watch, 'get'): | ||||
|                         watch_uuid = self.watch.get('uuid') | ||||
|                     elif hasattr(self.watch, 'uuid'): | ||||
|                         watch_uuid = self.watch.uuid | ||||
|                      | ||||
|                     if watch_uuid: | ||||
|                         preferred_proxy_id = self.datastore.get_preferred_proxy_for_watch(uuid=watch_uuid) | ||||
|                  | ||||
|                 # Check if we have a proxy list and a valid proxy ID | ||||
|                 if preferred_proxy_id and hasattr(self.datastore, 'proxy_list') and self.datastore.proxy_list: | ||||
|                     proxy_info = self.datastore.proxy_list.get(preferred_proxy_id) | ||||
|                      | ||||
|                     if proxy_info and 'url' in proxy_info: | ||||
|                         proxy_url = proxy_info.get('url') | ||||
|                         logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}'") | ||||
|                          | ||||
|                         # Parse the proxy URL to build a proxy dict for requests | ||||
|                         import urllib.parse | ||||
|                         parsed_proxy = urllib.parse.urlparse(proxy_url) | ||||
|                         proxy_type = parsed_proxy.scheme | ||||
|                          | ||||
|                         # Extract credentials if present | ||||
|                         username = None | ||||
|                         password = None | ||||
|                         if parsed_proxy.username: | ||||
|                             username = parsed_proxy.username | ||||
|                             if parsed_proxy.password: | ||||
|                                 password = parsed_proxy.password | ||||
|                          | ||||
|                         # Build the proxy URL without credentials for the proxy dict | ||||
|                         netloc = parsed_proxy.netloc | ||||
|                         if '@' in netloc: | ||||
|                             netloc = netloc.split('@')[1] | ||||
|                          | ||||
|                         proxy_addr = f"{proxy_type}://{netloc}" | ||||
|                          | ||||
|                         # Create the proxy configuration | ||||
|                         proxy_config = { | ||||
|                             'http': proxy_addr, | ||||
|                             'https': proxy_addr | ||||
|                         } | ||||
|                          | ||||
|                         # Add credentials if present | ||||
|                         if username: | ||||
|                             proxy_config['username'] = username | ||||
|                             if password: | ||||
|                                 proxy_config['password'] = password | ||||
|             except Exception as e: | ||||
|                 # Log the error but continue without a proxy | ||||
|                 logger.error(f"Error setting up proxy: {str(e)}") | ||||
|                 proxy_config = None | ||||
|                 proxy_url = None | ||||
|                  | ||||
|         return proxy_config, proxy_url | ||||
|  | ||||
|     def call_browser(self, preferred_proxy_id=None): | ||||
|  | ||||
|         """Fetch content using the appropriate browser/fetcher | ||||
|          | ||||
|         This method will: | ||||
|         1. Determine the appropriate fetcher to use based on watch settings | ||||
|         2. Set up proxy configuration if needed | ||||
|         3. Initialize the fetcher with the correct parameters | ||||
|         4. Configure any browser steps if needed | ||||
|          | ||||
|         Args: | ||||
|             preferred_proxy_id: Optional explicit proxy ID to use | ||||
|         """ | ||||
|         from requests.structures import CaseInsensitiveDict | ||||
|  | ||||
|         url = self.watch.link | ||||
| @@ -43,8 +134,8 @@ class difference_detection_processor(): | ||||
|         # Requests, playwright, other browser via wss:// etc, fetch_extra_something | ||||
|         prefer_fetch_backend = self.watch.get('fetch_backend', 'system') | ||||
|  | ||||
|         # Proxy ID "key" | ||||
|         preferred_proxy_id = preferred_proxy_id if preferred_proxy_id else self.datastore.get_preferred_proxy_for_watch(uuid=self.watch.get('uuid')) | ||||
|         # Get proxy configuration | ||||
|         proxy_config, proxy_url = self._get_proxy_for_watch(preferred_proxy_id) | ||||
|  | ||||
|         # Pluggable content self.fetcher | ||||
|         if not prefer_fetch_backend or prefer_fetch_backend == 'system': | ||||
| @@ -82,14 +173,10 @@ class difference_detection_processor(): | ||||
|             # What it referenced doesn't exist, so just use a default | ||||
|             fetcher_obj = getattr(content_fetchers, "html_requests") | ||||
|  | ||||
|         proxy_url = None | ||||
|         if preferred_proxy_id: | ||||
|             # Custom browser endpoints should NOT have a proxy added | ||||
|             if not prefer_fetch_backend.startswith('extra_browser_'): | ||||
|                 proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url') | ||||
|                 logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}") | ||||
|             else: | ||||
|                 logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified. ") | ||||
|         # Custom browser endpoints should NOT have a proxy added | ||||
|         if proxy_url and prefer_fetch_backend.startswith('extra_browser_'): | ||||
|             logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified.") | ||||
|             proxy_url = None | ||||
|  | ||||
|         # Now call the fetcher (playwright/requests/etc) with arguments that only a fetcher would need. | ||||
|         # When browser_connection_url is None, the method should work out the best defaults itself (OS env vars etc.) | ||||
| @@ -185,9 +272,9 @@ def find_sub_packages(package_name): | ||||
|  | ||||
| def find_processors(): | ||||
|     """ | ||||
|     Find all subclasses of DifferenceDetectionProcessor in the specified package. | ||||
|     Find all subclasses of DifferenceDetectionProcessor in the specified package | ||||
|     and also include processors from the plugin system. | ||||
|  | ||||
|     :param package_name: The name of the package to scan for processor modules. | ||||
|     :return: A list of (module, class) tuples. | ||||
|     """ | ||||
|     package_name = "changedetectionio.processors"  # Name of the current package/module | ||||
| @@ -195,6 +282,7 @@ def find_processors(): | ||||
|     processors = [] | ||||
|     sub_packages = find_sub_packages(package_name) | ||||
|  | ||||
|     # Find traditional processors | ||||
|     for sub_package in sub_packages: | ||||
|         module_name = f"{package_name}.{sub_package}.processor" | ||||
|         try: | ||||
| @@ -207,6 +295,15 @@ def find_processors(): | ||||
|         except (ModuleNotFoundError, ImportError) as e: | ||||
|             logger.warning(f"Failed to import module {module_name}: {e} (find_processors())") | ||||
|  | ||||
|     # Also include processors from the plugin system | ||||
|     try: | ||||
|         from .processor_registry import get_plugin_processor_modules | ||||
|         plugin_modules = get_plugin_processor_modules() | ||||
|         if plugin_modules: | ||||
|             processors.extend(plugin_modules) | ||||
|     except (ImportError, ModuleNotFoundError) as e: | ||||
|         logger.warning(f"Failed to import plugin modules: {e} (find_processors())") | ||||
|  | ||||
|     return processors | ||||
|  | ||||
|  | ||||
| @@ -223,8 +320,22 @@ def get_parent_module(module): | ||||
|     return False | ||||
|  | ||||
|  | ||||
|  | ||||
| def get_custom_watch_obj_for_processor(processor_name): | ||||
|     """ | ||||
|     Get the custom watch object for a processor | ||||
|     :param processor_name: Name of the processor | ||||
|     :return: Watch class or None | ||||
|     """ | ||||
|     # First, try to get the watch model from the pluggy system | ||||
|     try: | ||||
|         from .processor_registry import get_processor_watch_model | ||||
|         watch_model = get_processor_watch_model(processor_name) | ||||
|         if watch_model: | ||||
|             return watch_model | ||||
|     except Exception as e: | ||||
|         logger.warning(f"Error getting processor watch model from pluggy: {e}") | ||||
|  | ||||
|     # Fall back to the traditional approach | ||||
|     from changedetectionio.model import Watch | ||||
|     watch_class = Watch.model | ||||
|     processor_classes = find_processors() | ||||
| @@ -241,14 +352,47 @@ def get_custom_watch_obj_for_processor(processor_name): | ||||
| def available_processors(): | ||||
|     """ | ||||
|     Get a list of processors by name and description for the UI elements | ||||
|     :return: A list :) | ||||
|     :return: A list of tuples (processor_name, description) | ||||
|     """ | ||||
|  | ||||
|     processor_classes = find_processors() | ||||
|  | ||||
|     available = [] | ||||
|     for package, processor_class in processor_classes: | ||||
|         available.append((processor_class, package.name)) | ||||
|  | ||||
|     return available | ||||
|  | ||||
|     # Get processors from the pluggy system | ||||
|     pluggy_processors = [] | ||||
|     try: | ||||
|         from .processor_registry import get_all_processors | ||||
|         pluggy_processors = get_all_processors() | ||||
|     except Exception as e: | ||||
|         logger.error(f"Error getting processors from pluggy: {str(e)}") | ||||
|      | ||||
|     # Get processors from the traditional file-based system | ||||
|     traditional_processors = [] | ||||
|     try: | ||||
|         # Let's not use find_processors() directly since it now also includes pluggy processors | ||||
|         package_name = "changedetectionio.processors" | ||||
|         sub_packages = find_sub_packages(package_name) | ||||
|          | ||||
|         for sub_package in sub_packages: | ||||
|             module_name = f"{package_name}.{sub_package}.processor" | ||||
|             try: | ||||
|                 module = importlib.import_module(module_name) | ||||
|                 # Get the name and description from the module if available | ||||
|                 name = getattr(module, 'name', f"Traditional processor: {sub_package}") | ||||
|                 description = getattr(module, 'description', sub_package) | ||||
|                 traditional_processors.append((sub_package, name)) | ||||
|             except (ModuleNotFoundError, ImportError, AttributeError) as e: | ||||
|                 logger.warning(f"Failed to import module {module_name}: {e} (available_processors())") | ||||
|     except Exception as e: | ||||
|         logger.error(f"Error getting traditional processors: {str(e)}") | ||||
|      | ||||
|     # Combine the lists, ensuring no duplicates | ||||
|     # Pluggy processors take precedence | ||||
|     all_processors = [] | ||||
|      | ||||
|     # Add all pluggy processors | ||||
|     all_processors.extend(pluggy_processors) | ||||
|      | ||||
|     # Add traditional processors that aren't already registered via pluggy | ||||
|     pluggy_processor_names = [name for name, _ in pluggy_processors] | ||||
|     for processor_class, name in traditional_processors: | ||||
|         if processor_class not in pluggy_processor_names: | ||||
|             all_processors.append((processor_class, name)) | ||||
|      | ||||
|     return all_processors | ||||
							
								
								
									
										17
									
								
								changedetectionio/processors/form.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								changedetectionio/processors/form.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,17 @@ | ||||
| from wtforms import ( | ||||
|     BooleanField, | ||||
|     validators, | ||||
|     RadioField | ||||
| ) | ||||
| from wtforms.fields.choices import SelectField | ||||
| from wtforms.fields.form import FormField | ||||
| from wtforms.form import Form | ||||
|  | ||||
class BaseProcessorForm(Form):
    """Shared parent class for processor forms.

    Both hooks below return ``None`` in this base class.
    """

    def extra_tab_content(self):
        """Return extra tab content; the base form provides none."""
        return

    def extra_form_content(self):
        """Return extra form content; the base form provides none."""
        return
							
								
								
									
										4
									
								
								changedetectionio/processors/forms.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								changedetectionio/processors/forms.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | ||||
| """ | ||||
| Forms for processors | ||||
| """ | ||||
| from changedetectionio.forms import processor_text_json_diff_form | ||||
							
								
								
									
										69
									
								
								changedetectionio/processors/pluggy_interface.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								changedetectionio/processors/pluggy_interface.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,69 @@ | ||||
| import pluggy | ||||
|  | ||||
| # Define the plugin namespace for processors | ||||
| PLUGIN_NAMESPACE = "changedetectionio_processors" | ||||
|  | ||||
| hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE) | ||||
| hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE) | ||||
|  | ||||
|  | ||||
class ProcessorSpec:
    """Hook specifications that processor plugins can implement.

    Each function only declares a hook's name and arguments; the bodies are
    intentionally empty.
    """

    @hookspec
    def get_processor_name():
        """Return the processor's name."""

    @hookspec
    def get_processor_description():
        """Return the processor's description."""

    @hookspec
    def get_processor_class():
        """Return the class implementing the processor."""

    @hookspec
    def get_processor_form():
        """Return the form class used by the processor."""

    @hookspec
    def get_processor_watch_model():
        """Return the watch model class for this processor, if it has one."""

    @hookspec
    def get_display_link(url, processor_name):
        """Return a custom display link for the given processor.

        Args:
            url: The original URL from the watch
            processor_name: The name of the processor

        Returns:
            The custom display link as a string, or None to keep the default
        """

    @hookspec
    def perform_site_check(datastore, watch_uuid):
        """Create and return a processor instance ready to perform a site check.

        Args:
            datastore: The application datastore
            watch_uuid: The UUID of the watch to check

        Returns:
            A processor instance ready to perform the site check
        """
|  | ||||
|  | ||||
| # Set up the plugin manager | ||||
| plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE) | ||||
|  | ||||
| # Register hook specifications | ||||
| plugin_manager.add_hookspecs(ProcessorSpec) | ||||
							
								
								
									
										222
									
								
								changedetectionio/processors/processor_registry.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										222
									
								
								changedetectionio/processors/processor_registry.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,222 @@ | ||||
| from loguru import logger | ||||
| from changedetectionio.model import Watch | ||||
| from .pluggy_interface import plugin_manager | ||||
| from typing import Dict, Any, List, Tuple, Optional, TypeVar, Type | ||||
| import functools | ||||
|  | ||||
| # Import and register internal plugins | ||||
| from . import whois_plugin | ||||
| from . import test_plugin | ||||
|  | ||||
| # Register plugins | ||||
| plugin_manager.register(whois_plugin) | ||||
| plugin_manager.register(test_plugin) | ||||
|  | ||||
| # Load any setuptools entrypoints | ||||
| plugin_manager.load_setuptools_entrypoints("changedetectionio_processors") | ||||
|  | ||||
| # Type definitions for better type hinting | ||||
| T = TypeVar('T') | ||||
| ProcessorClass = TypeVar('ProcessorClass') | ||||
| ProcessorForm = TypeVar('ProcessorForm') | ||||
| ProcessorWatchModel = TypeVar('ProcessorWatchModel') | ||||
| ProcessorInstance = TypeVar('ProcessorInstance') | ||||
|  | ||||
| # Cache for plugin name mapping to improve performance | ||||
| _plugin_name_map: Dict[str, Any] = {} | ||||
|  | ||||
def register_plugin(plugin_module):
    """Register a processor plugin with the shared plugin manager.

    The cached name-to-plugin map is reset afterwards so that the next lookup
    rebuilds it with the newly registered plugin included.

    :param plugin_module: The plugin object/module to register
    """
    global _plugin_name_map

    plugin_manager.register(plugin_module)
    # Invalidate the cache; it is rebuilt on demand
    _plugin_name_map = {}
|  | ||||
def _get_plugin_name_map() -> Dict[str, Any]:
    """Get a mapping of processor names to plugins.

    The map is built once and cached in the module-level ``_plugin_name_map``;
    an empty map is never treated as cached, so it is rebuilt on the next call.

    :return: Dictionary mapping processor names to plugins
    """
    global _plugin_name_map

    # Return cached map if available
    if _plugin_name_map:
        return _plugin_name_map

    result = {}
    all_plugins = list(plugin_manager.get_plugins())

    # Internal plugins are registered under fixed names first
    known_plugins = {
        'whois': whois_plugin,
        'test': test_plugin
    }

    for name, plugin in known_plugins.items():
        if plugin in all_plugins:
            result[name] = plugin

    # Every remaining plugin is asked for its own name through the hook system
    for plugin in all_plugins:
        if plugin in known_plugins.values():
            continue  # Already registered above

        try:
            # A plain ``plugin_manager.hook.get_processor_name(...)`` call runs the
            # implementations of ALL registered plugins, so its first result is not
            # necessarily this plugin's name. Restrict the caller to this plugin
            # by excluding every other one.
            other_plugins = [p for p in all_plugins if p is not plugin]
            name_hook = plugin_manager.subset_hook_caller('get_processor_name', remove_plugins=other_plugins)
            name_results = name_hook()

            if name_results:
                plugin_name = name_results[0]

                # First registration wins on a name collision
                if plugin_name in result:
                    logger.warning(f"Plugin name collision: '{plugin_name}' is already registered")
                    continue

                result[plugin_name] = plugin
        except Exception as e:
            logger.error(f"Error getting processor name from plugin: {str(e)}")

    # Cache the map
    _plugin_name_map = result
    return result
|  | ||||
def _get_plugin_by_name(processor_name: str) -> Optional[Any]:
    """Look up the plugin registered under a processor name.

    :param processor_name: Name of the processor
    :return: The plugin object, or None when no plugin has that name
    """
    name_map = _get_plugin_name_map()
    return name_map.get(processor_name)
|  | ||||
def _call_hook_for_plugin(plugin: Any, hook_name: str, default_value: T = None, **kwargs) -> Optional[T]:
    """Call a hook on one specific plugin and handle exceptions.

    :param plugin: The plugin to call the hook for
    :param hook_name: Name of the hook to call
    :param default_value: Value returned when there is no plugin, the hook
        yields no result, or the call raises
    :param kwargs: Additional arguments to pass to the hook
    :return: First result of the hook call or the default value
    """
    if not plugin:
        return default_value

    try:
        # ``plugin_manager.hook.<name>(...)`` invokes the implementations of every
        # registered plugin, so its first result may belong to a different plugin.
        # Build a caller limited to the requested plugin by excluding the others.
        other_plugins = [p for p in plugin_manager.get_plugins() if p is not plugin]
        hook = plugin_manager.subset_hook_caller(hook_name, remove_plugins=other_plugins)
        results = hook(**kwargs)

        if results:
            return results[0]
    except Exception as e:
        # Best effort: a failing or unknown hook falls back to the default
        logger.error(f"Error calling {hook_name} for plugin: {str(e)}")

    return default_value
|  | ||||
def get_all_processors() -> List[Tuple[str, str]]:
    """List every plugin processor that provides a description.

    :return: List of tuples (processor_name, processor_description); plugins
        whose description hook returns nothing are left out
    """
    described = (
        (name, _call_hook_for_plugin(plugin, 'get_processor_description'))
        for name, plugin in _get_plugin_name_map().items()
    )
    return [(name, description) for name, description in described if description]
|  | ||||
def get_processor_class(processor_name: str) -> Optional[Type[ProcessorClass]]:
    """Resolve the processor class provided by the plugin with the given name.

    :param processor_name: Name of the processor
    :return: Result of the plugin's ``get_processor_class`` hook, or None
    """
    return _call_hook_for_plugin(_get_plugin_by_name(processor_name), 'get_processor_class')
|  | ||||
def get_processor_form(processor_name: str) -> Optional[Type[ProcessorForm]]:
    """Resolve the form class provided by the plugin with the given name.

    :param processor_name: Name of the processor
    :return: Result of the plugin's ``get_processor_form`` hook, or None
    """
    return _call_hook_for_plugin(_get_plugin_by_name(processor_name), 'get_processor_form')
|  | ||||
def get_processor_watch_model(processor_name: str) -> Type[ProcessorWatchModel]:
    """Resolve the watch model class to use for the given processor.

    :param processor_name: Name of the processor
    :return: Result of the plugin's ``get_processor_watch_model`` hook, falling back to
             ``Watch.model`` when there is no plugin or the hook gives no result
    """
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_processor_watch_model',
        default_value=Watch.model,
    )
|  | ||||
def get_processor_site_check(processor_name: str, datastore: Any, watch_uuid: str) -> Optional[ProcessorInstance]:
    """Build a processor instance that is ready to perform a site check.

    :param processor_name: Name of the processor
    :param datastore: The application datastore
    :param watch_uuid: The UUID of the watch to check
    :return: A processor instance ready to perform site check, or None when no plugin
             is registered under that name or the instance could not be created
    """
    plugin = _get_plugin_by_name(processor_name)
    if not plugin:
        return None

    try:
        # Preferred route: the plugin builds the instance itself
        instance = _call_hook_for_plugin(
            plugin,
            'perform_site_check',
            datastore=datastore,
            watch_uuid=watch_uuid
        )
        if instance:
            return instance

        # Otherwise ask for the class and instantiate it here
        klass = _call_hook_for_plugin(plugin, 'get_processor_class')
        return klass(datastore=datastore, watch_uuid=watch_uuid) if klass else None
    except Exception as e:
        logger.error(f"Error getting processor site check for {processor_name}: {str(e)}")
        return None
|  | ||||
def get_display_link(url: str, processor_name: str) -> Optional[str]:
    """Ask the processor's plugin for a custom display link.

    :param url: The original URL from the watch
    :param processor_name: Name of the processor
    :return: Result of the plugin's ``get_display_link`` hook, or None to use the default
    """
    hook_arguments = {'url': url, 'processor_name': processor_name}
    return _call_hook_for_plugin(_get_plugin_by_name(processor_name), 'get_display_link', **hook_arguments)
|  | ||||
def get_plugin_processor_modules() -> List[Tuple[Any, str]]:
    """Get processor modules for all plugins that can be used with the find_processors function

    This function adapts pluggy plugins to be compatible with the traditional find_processors system.
    A plugin is included when its processor class has an ancestor named ``perform_site_check``
    or containing ``TextJsonDiffProcessor``; it is then mapped to the text_json_diff
    processor module.

    :return: A list of (module, processor_name) tuples
    """
    result = []

    # Import base modules once to avoid repeated imports
    from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor

    for processor_name, plugin in _get_plugin_name_map().items():
        try:
            processor_class = _call_hook_for_plugin(plugin, 'get_processor_class')
            if not processor_class:
                continue

            # Inspect every ancestor, not just the first direct base, so processors that
            # inherit indirectly or list a mixin first are still recognised.
            # __mro__[0] is the class itself and is skipped.
            ancestor_names = (klass.__name__ for klass in processor_class.__mro__[1:])
            if any(name == 'perform_site_check' or 'TextJsonDiffProcessor' in name for name in ancestor_names):
                result.append((text_json_diff_processor, processor_name))
        except Exception as e:
            # One broken plugin must not prevent the others from being mapped
            logger.error(f"Error mapping processor module for {processor_name}: {str(e)}")

    return result
							
								
								
									
										169
									
								
								changedetectionio/processors/whois_plugin.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										169
									
								
								changedetectionio/processors/whois_plugin.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,169 @@ | ||||
| from loguru import logger | ||||
| import re | ||||
| import urllib.parse | ||||
| from .pluggy_interface import hookimpl | ||||
| from requests.structures import CaseInsensitiveDict | ||||
| from changedetectionio.content_fetchers.base import Fetcher | ||||
|  | ||||
| # Import the text_json_diff processor | ||||
| from changedetectionio.processors.text_json_diff.processor import perform_site_check as TextJsonDiffProcessor | ||||
|  | ||||
| # WHOIS Processor implementation that extends TextJsonDiffProcessor | ||||
class WhoisProcessor(TextJsonDiffProcessor):
    """Processor that runs change detection on the WHOIS record of a watch's domain.

    It extends the text_json_diff processor and overrides ``call_browser`` so that the
    "fetched" content is the result of a WHOIS lookup, exposed through a ``Fetcher`` object.
    """

    # Headers attached to every WHOIS result stored on the fetcher
    _RESPONSE_HEADERS = {
        'content-type': 'text/plain',
        'server': 'whois-processor',
    }

    def _extract_domain_from_url(self, url):
        """Extract the domain from a URL, without credentials, port or a leading ``www.``

        :param url: URL taken from the watch
        :return: The host name, or an empty string when the URL has no host part
        """
        # .hostname (unlike .netloc) leaves out "user:pass@" and ":port"; those are not
        # part of the domain and would be sent verbatim to the WHOIS lookup otherwise
        domain = urllib.parse.urlparse(url).hostname or ''

        # Remove www. prefix if present
        return re.sub(r'^www\.', '', domain)

    def _set_fetcher_response(self, content, status_code):
        """Store a lookup outcome on ``self.fetcher`` together with headers and accessors.

        :param content: Text to expose as the fetched content
        :param status_code: Status code to report for this result
        """
        fetcher = self.fetcher
        fetcher.content = content
        fetcher.status_code = status_code
        fetcher.headers = CaseInsensitiveDict(self._RESPONSE_HEADERS)

        # Accessors and no-op quit(), set on the instance for every outcome
        fetcher.get_all_headers = lambda: fetcher.headers
        fetcher.get_last_status_code = lambda: fetcher.status_code
        fetcher.quit = lambda: None

    def call_browser(self, preferred_proxy_id=None):
        """Override call_browser to perform WHOIS lookup instead of using a browser

        The outcome is stored on ``self.fetcher``: status 200 with the WHOIS text on success,
        400 when no domain can be extracted from the watch URL, 500 when the lookup fails.

        Note: The python-whois library doesn't directly support proxies. For real proxy support,
        we would need to implement a custom socket connection that routes through the proxy.
        This is a TODO for a future enhancement.

        :param preferred_proxy_id: Passed on to the parent's ``_get_proxy_for_watch``
        :raises Exception: For ``file:`` URLs when ``allow_file_uri`` is not enabled in the
                           application settings
        """
        # Initialize a basic fetcher - this is used by the parent class
        self.fetcher = Fetcher()

        url = self.watch.link

        # Check for file:// access
        if re.search(r'^file:', url.strip(), re.IGNORECASE):
            if not self.datastore.data.get('settings', {}).get('application', {}).get('allow_file_uri', False):
                raise Exception("file:// type access is denied for security reasons.")

        domain = self._extract_domain_from_url(url)

        # Ensure we have a valid domain
        if not domain:
            error_msg = f"Could not extract domain from URL: '{url}'"
            self._set_fetcher_response(error_msg, 400)
            logger.error(error_msg)
            return

        # Get proxy configuration using the common method from parent class
        proxy_config, _proxy_url = super()._get_proxy_for_watch(preferred_proxy_id)

        try:
            # Imported inside the try block so a failing import is reported as a 500 result
            import whois

            if proxy_config:
                # The lookup below does not route through the proxy; say so instead of
                # claiming that it is being used
                logger.warning(
                    f"Proxy configured ({proxy_config}) but WHOIS lookups do not support proxies yet; connecting directly"
                )

            whois_info = whois.whois(domain)

            if hasattr(whois_info, 'text'):
                # Some whois implementations store raw text in .text attribute
                whois_text = whois_info.text
            else:
                # Otherwise, format the non-empty fields as key-value lines
                whois_text = f"WHOIS Information for domain: {domain}\n\n" + "".join(
                    f"{key}: {value}\n" for key, value in whois_info.items() if value
                )

            self._set_fetcher_response(whois_text, 200)

        except Exception as e:
            error_msg = f"Error fetching WHOIS data for domain {domain}: {str(e)}"
            self._set_fetcher_response(error_msg, 500)
            logger.error(error_msg)

        return

    def run_changedetection(self, watch):
        """Use the parent's run_changedetection which will use our overridden call_browser method

        :param watch: The watch to run change detection for
        :return: Tuple ``(changed_detected, update_obj, filtered_text)``; when the parent raises,
                 ``(False, update_obj_with_last_error, error_message_bytes)``
        """
        try:
            # Let the parent class handle everything now that we've overridden call_browser
            changed_detected, update_obj, filtered_text = super().run_changedetection(watch)
            return changed_detected, update_obj, filtered_text

        except Exception as e:
            # NOTE(review): this also catches any exception the parent may raise as a
            # control-flow signal for its caller - confirm that is intended.
            error_msg = f"Error in WHOIS processor: {str(e)}"
            update_obj = {'last_notification_error': False, 'last_error': error_msg}
            logger.error(error_msg)
            return False, update_obj, error_msg.encode('utf-8')

    @staticmethod
    def perform_site_check(datastore, watch_uuid):
        """Factory method to create a WhoisProcessor instance - for compatibility with legacy code

        :param datastore: The application datastore
        :param watch_uuid: The UUID of the watch to check
        :return: A new WhoisProcessor
        """
        return WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
|  | ||||
@hookimpl
def perform_site_check(datastore, watch_uuid):
    """Hook: build the WhoisProcessor that will perform the site check for a watch."""
    processor = WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
    return processor
|  | ||||
@hookimpl(trylast=True)  # Use trylast to ensure this runs last in case of conflicts
def get_processor_name():
    """Hook: return the name this processor is registered under."""
    # Uses the module-level logger; a function-scope re-import is not needed
    logger.debug("whois_plugin.get_processor_name() called")
    return "whois"
|  | ||||
@hookimpl
def get_processor_description():
    """Hook: return the human-readable description of this processor."""
    description = "WHOIS Domain Information Changes Detector"
    return description
|  | ||||
@hookimpl
def get_processor_class():
    """Hook: return the class implementing this processor (``WhoisProcessor``)."""
    processor_class = WhoisProcessor
    return processor_class
|  | ||||
@hookimpl
def get_processor_form():
    """Hook: return the form class for this processor.

    The text_json_diff form is reused; None is returned when it cannot be imported.
    """
    # Import here to avoid circular imports
    try:
        from changedetectionio.forms import processor_text_json_diff_form
    except Exception as e:
        logger.error(f"Error importing form for whois plugin: {str(e)}")
        return None

    return processor_text_json_diff_form
|  | ||||
@hookimpl
def get_processor_watch_model():
    """Hook: return the watch model class for this processor.

    None means this plugin supplies no model of its own (use the default watch model).
    """
    watch_model = None  # Use default watch model
    return watch_model
							
								
								
									
										59
									
								
								changedetectionio/tests/test_processor_registry.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								changedetectionio/tests/test_processor_registry.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,59 @@ | ||||
| import pytest | ||||
| from changedetectionio.processors.processor_registry import get_processor_class, get_all_processors | ||||
|  | ||||
|  | ||||
def test_get_all_processors():
    """Test that get_all_processors returns a list of (name, description) string tuples
    and that the WHOIS processor is listed with the description its plugin reports"""
    processors = get_all_processors()
    assert isinstance(processors, list)
    assert len(processors) > 0

    # Each item should be a tuple of (name, description)
    for processor in processors:
        assert isinstance(processor, tuple)
        assert len(processor) == 2
        assert isinstance(processor[0], str)
        assert isinstance(processor[1], str)

    # Check that our WHOIS processor is included
    whois_processor = next((p for p in processors if p[0] == "whois"), None)
    assert whois_processor is not None
    # Must match the string returned by whois_plugin.get_processor_description()
    assert whois_processor[1] == "WHOIS Domain Information Changes Detector"
|  | ||||
|  | ||||
def test_get_processor_class():
    """get_processor_class resolves the WHOIS class and yields None for unknown names"""
    whois_class = get_processor_class("whois")

    # A class must come back, and it must expose perform_site_check
    assert whois_class is not None
    assert hasattr(whois_class, 'perform_site_check')

    # Unknown processor names resolve to None
    assert get_processor_class("non_existent_processor") is None
|  | ||||
|  | ||||
def test_get_processor_site_check():
    """get_processor_site_check builds a WHOIS processor instance and yields None for unknown names"""
    from unittest.mock import MagicMock
    from changedetectionio.processors.processor_registry import get_processor_site_check

    datastore_stub = MagicMock()
    uuid = "test-uuid"

    whois_instance = get_processor_site_check("whois", datastore_stub, uuid)
    assert whois_instance is not None

    # The instance must offer the two entry points used to run a check
    for method_name in ('run_changedetection', 'call_browser'):
        assert hasattr(whois_instance, method_name)

    # Unknown processor names give no instance
    assert get_processor_site_check("non_existent_processor", datastore_stub, uuid) is None
							
								
								
									
										182
									
								
								changedetectionio/tests/test_whois_processor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										182
									
								
								changedetectionio/tests/test_whois_processor.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,182 @@ | ||||
| import pytest | ||||
| from unittest.mock import MagicMock, patch | ||||
| from changedetectionio.processors.whois_plugin import WhoisProcessor | ||||
|  | ||||
|  | ||||
class MockWatch:
    """Minimal stand-in for a watch: a URL, a few dict-style keys and an empty history."""

    # (key accepted by get(), attribute holding its value)
    _KEY_ATTRIBUTES = (
        ('previous_md5', '_previous_md5'),
        ('include_filters', '_include_filters'),
        ('ignore_text', '_ignore_text'),
        ('url', 'url'),
    )

    def __init__(self, url, previous_md5=None, include_filters=None, ignore_text=None):
        self.url = url
        self._previous_md5 = previous_md5
        # Falsy filter arguments are replaced by a fresh empty list
        self._include_filters = include_filters if include_filters else []
        self._ignore_text = ignore_text if ignore_text else []
        self.history = {}

    def get(self, key, default=None):
        """Return the stored value for a known key; ``default`` only applies to unknown keys."""
        for known_key, attribute in self._KEY_ATTRIBUTES:
            if key == known_key:
                return getattr(self, attribute)
        return default

    def has_special_diff_filter_options_set(self):
        """Always False for this mock."""
        return False
|  | ||||
|  | ||||
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
@patch('changedetectionio.processors.text_json_diff.processor.perform_site_check.run_changedetection')
def test_whois_processor_basic_functionality(mock_super_run, mock_base_init, mock_whois):
    """call_browser performs the WHOIS lookup for the watch's domain and
    run_changedetection delegates to the parent implementation"""
    # Stub the collaborators: base-class init, parent change detection, and the lookup itself
    mock_base_init.return_value = None
    mock_super_run.return_value = (False, {'previous_md5': 'some-md5'}, b'Some filtered text')
    mock_whois.return_value = MagicMock(
        text="Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
    )

    # Datastore without proxies and with file:// access disabled
    datastore = MagicMock(
        proxy_list=None,
        data={'settings': {'application': {'allow_file_uri': False}}},
    )
    datastore.get_preferred_proxy_for_watch.return_value = None

    processor = WhoisProcessor(datastore=datastore, watch_uuid='test-uuid')

    # The processor reads the URL from watch.link
    processor.watch = MagicMock()
    processor.watch.link = "https://example.com"
    processor.watch.get.return_value = "uuid-123"

    processor.call_browser()

    # The fetcher must be populated after the lookup
    assert processor.fetcher is not None
    for attribute in ('content', 'headers', 'status_code'):
        assert hasattr(processor.fetcher, attribute)

    # The lookup must have been made for the bare domain
    assert mock_whois.called
    assert mock_whois.call_args[0][0] == 'example.com'

    # Running change detection must reach the (mocked) parent implementation
    processor.run_changedetection(MockWatch(url="https://example.com"))
    assert mock_super_run.called
|  | ||||
|  | ||||
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_call_browser_with_proxy(mock_base_init, mock_whois):
    """call_browser still performs the lookup when the datastore has a proxy configured"""
    mock_base_init.return_value = None
    mock_whois.return_value = MagicMock(
        text="Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
    )

    # Datastore advertising one proxy, preferred for this watch
    datastore = MagicMock(
        proxy_list={
            'test-proxy': {
                'url': 'http://proxy.example.com:8080',
                'label': 'Test Proxy',
            },
        },
        data={'settings': {'application': {'allow_file_uri': False}}},
    )
    datastore.get_preferred_proxy_for_watch.return_value = 'test-proxy'

    processor = WhoisProcessor(datastore=datastore, watch_uuid='test-uuid')
    processor.watch = MagicMock()
    processor.watch.link = "https://example.com"
    processor.watch.get.return_value = "uuid-123"

    processor.call_browser()

    # The lookup must have been made for the bare domain
    assert mock_whois.called
    assert mock_whois.call_args[0][0] == 'example.com'

    # ...and its result stored on the fetcher
    assert processor.fetcher is not None
    assert processor.fetcher.content is not None
|  | ||||
|  | ||||
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_perform_site_check(mock_base_init):
    """WhoisProcessor.perform_site_check constructs a WhoisProcessor from its two arguments"""
    mock_base_init.return_value = None

    datastore = MagicMock()
    watch_uuid = "test-uuid"

    # Replace the constructor so only the factory's call is observed
    with patch.object(WhoisProcessor, '__init__', return_value=None) as mock_init:
        processor = WhoisProcessor.perform_site_check(datastore=datastore, watch_uuid=watch_uuid)

        mock_init.assert_called_once_with(datastore=datastore, watch_uuid=watch_uuid)
        assert isinstance(processor, WhoisProcessor)
|  | ||||
|  | ||||
def test_get_display_link():
    """Test the get_display_link hook implementation of the WHOIS plugin

    The test is skipped when the plugin does not provide the hook.
    """
    from changedetectionio.processors import whois_plugin

    # Importing the name directly would abort the test with an ImportError when the
    # plugin has no get_display_link; report that case as a skip instead.
    get_display_link = getattr(whois_plugin, 'get_display_link', None)
    if get_display_link is None:
        pytest.skip("whois_plugin does not implement the get_display_link hook")

    # Test with a regular URL
    url = "https://example.com/some/path?param=value"
    processor_name = "whois"
    link = get_display_link(url=url, processor_name=processor_name)
    assert link == "WHOIS - example.com"

    # Test with a subdomain
    url = "https://subdomain.example.com/"
    link = get_display_link(url=url, processor_name=processor_name)
    assert link == "WHOIS - subdomain.example.com"

    # Test with www prefix (should be removed)
    url = "https://www.example.com/"
    link = get_display_link(url=url, processor_name=processor_name)
    assert link == "WHOIS - example.com"

    # Test with a different processor (should return None)
    url = "https://example.com/"
    processor_name = "text_json_diff"
    link = get_display_link(url=url, processor_name=processor_name)
    assert link is None
| @@ -61,5 +61,22 @@ class TestDiffBuilder(unittest.TestCase): | ||||
|         p = watch.get_from_version_based_on_last_viewed | ||||
|         assert p == "100", "Correct with only one history snapshot" | ||||
|  | ||||
|     def test_watch_link_property_with_processor(self): | ||||
|         """Test the link property with a processor that customizes the link""" | ||||
|         from unittest.mock import patch | ||||
|          | ||||
|         watch = Watch.model(datastore_path='/tmp', default={}) | ||||
|         watch['url'] = 'https://example.com' | ||||
|         watch['processor'] = 'whois' | ||||
|          | ||||
|         # Mock the processor registry's get_display_link function | ||||
|         with patch('changedetectionio.processors.processor_registry.get_display_link') as mock_get_display_link: | ||||
|             mock_get_display_link.return_value = "WHOIS - example.com" | ||||
|              | ||||
|             # The link property should use the customized link from the processor | ||||
|             assert watch.link == "WHOIS - example.com" | ||||
|             mock_get_display_link.assert_called_once_with(url='https://example.com', processor_name='whois') | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     unittest.main() | ||||
|   | ||||
| @@ -271,19 +271,38 @@ class update_worker(threading.Thread): | ||||
|  | ||||
|                     try: | ||||
|                         # Processor is what we are using for detecting the "Change" | ||||
|                         processor = watch.get('processor', 'text_json_diff') | ||||
|                         processor_name = watch.get('processor', 'text_json_diff') | ||||
|  | ||||
|                         # Init a new 'difference_detection_processor', first look in processors | ||||
|                         processor_module_name = f"changedetectionio.processors.{processor}.processor" | ||||
|                          | ||||
|  | ||||
|                         # First, try to get the processor from our plugin registry | ||||
|                         try: | ||||
|                             processor_module = importlib.import_module(processor_module_name) | ||||
|                         except ModuleNotFoundError as e: | ||||
|                             print(f"Processor module '{processor}' not found.") | ||||
|                             raise e | ||||
|  | ||||
|                         update_handler = processor_module.perform_site_check(datastore=self.datastore, | ||||
|                                                                              watch_uuid=uuid | ||||
|                                                                              ) | ||||
|                             from changedetectionio.processors.processor_registry import get_processor_site_check | ||||
|                             update_handler = get_processor_site_check(processor_name, self.datastore, uuid) | ||||
|                              | ||||
|                             if update_handler: | ||||
|                                 # We found the processor in our plugin registry | ||||
|                                 logger.info(f"Using processor '{processor_name}' from plugin registry") | ||||
|                             else: | ||||
|                                 # Fall back to the traditional file-based approach | ||||
|                                 processor_module_name = f"changedetectionio.processors.{processor_name}.processor" | ||||
|                                 try: | ||||
|                                     processor_module = importlib.import_module(processor_module_name) | ||||
|                                     update_handler = processor_module.perform_site_check(datastore=self.datastore, | ||||
|                                                                                         watch_uuid=uuid) | ||||
|                                 except ModuleNotFoundError as e: | ||||
|                                     print(f"Processor module '{processor_name}' not found in both plugin registry and file system.") | ||||
|                                     raise e | ||||
|                         except ImportError as e: | ||||
|                             # If processor_registry.py cannot be imported, fall back to the traditional approach | ||||
|                             processor_module_name = f"changedetectionio.processors.{processor_name}.processor" | ||||
|                             try: | ||||
|                                 processor_module = importlib.import_module(processor_module_name) | ||||
|                                 update_handler = processor_module.perform_site_check(datastore=self.datastore, | ||||
|                                                                                     watch_uuid=uuid) | ||||
|                             except ModuleNotFoundError as e: | ||||
|                                 print(f"Processor module '{processor_name}' not found.") | ||||
|                                 raise e | ||||
|  | ||||
|                         update_handler.call_browser() | ||||
|  | ||||
|   | ||||
							
								
								
									
										31
									
								
								test_processor_registration.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								test_processor_registration.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,31 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| from changedetectionio.processors import available_processors | ||||
| from changedetectionio.processors.processor_registry import get_processor_class, get_processor_form | ||||
|  | ||||
# Test processor registration: list what is available, then inspect the WHOIS processor
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print(f"Processor: {name} - {description}")

# Check if our WHOIS processor is registered.
# The plugin's get_processor_name hook returns "whois", so that is the name to look for.
whois_processor_name = "whois"
whois_found = any(name == whois_processor_name for name, _ in processors)

if whois_found:
    print("\nWHOIS Processor found! Getting processor class and form...")

    # Get the processor class
    processor_class = get_processor_class(whois_processor_name)
    print(f"Processor class: {processor_class}")
    print(f"Processor class name: {processor_class.__name__ if processor_class else None}")
    print(f"Processor class module: {processor_class.__module__ if processor_class else None}")

    # Get the processor form
    processor_form = get_processor_form(whois_processor_name)
    print(f"Processor form: {processor_form}")

    print("\nWHOIS Processor successfully registered")
else:
    print("\nWHOIS Processor not found in available processors")
							
								
								
									
										16
									
								
								test_processors.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										16
									
								
								test_processors.py
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,16 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| from changedetectionio.processors import available_processors | ||||
| from changedetectionio.processors import find_processors | ||||
|  | ||||
| # Test traditional processor discovery | ||||
| print("=== Traditional Processor Discovery ===") | ||||
| traditional_processors = find_processors() | ||||
| for module, name in traditional_processors: | ||||
|     print(f"Found processor: {name} in {module.__name__}") | ||||
|  | ||||
| # Test combined processor discovery (traditional + pluggy) | ||||
| print("\n=== Combined Processor Discovery ===") | ||||
| combined_processors = available_processors() | ||||
| for name, description in combined_processors: | ||||
|     print(f"Processor: {name} - {description}") | ||||
							
								
								
									
										53
									
								
								test_whois_extraction.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								test_whois_extraction.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,53 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| import urllib.parse | ||||
| import re | ||||
| import sys | ||||
|  | ||||
| def extract_domain_from_url(url): | ||||
|     """Extract domain from a URL""" | ||||
|     parsed_url = urllib.parse.urlparse(url) | ||||
|     domain = parsed_url.netloc | ||||
|      | ||||
|     # Remove www. prefix if present | ||||
|     domain = re.sub(r'^www\.', '', domain) | ||||
|      | ||||
|     return domain | ||||
|  | ||||
| # Test domain extraction | ||||
| test_urls = [ | ||||
|     "https://changedetection.io", | ||||
|     "http://www.example.com/page", | ||||
|     "https://subdomain.domain.co.uk/path?query=1", | ||||
|     "ftp://ftp.example.org", | ||||
|     "https://www.changedetection.io/page/subpage", | ||||
| ] | ||||
|  | ||||
| print("=== Domain Extraction Test ===") | ||||
| for url in test_urls: | ||||
|     domain = extract_domain_from_url(url) | ||||
|     print(f"URL: {url} -> Domain: {domain}") | ||||
|  | ||||
| # Test WHOIS lookup for changedetection.io | ||||
| try: | ||||
|     import whois | ||||
|      | ||||
|     domain = extract_domain_from_url("https://changedetection.io") | ||||
|     print(f"\n=== WHOIS lookup for {domain} ===") | ||||
|      | ||||
|     whois_info = whois.whois(domain) | ||||
|      | ||||
|     # Print key information | ||||
|     print(f"Domain Name: {whois_info.get('domain_name', '')}") | ||||
|     print(f"Registrar: {whois_info.get('registrar', '')}") | ||||
|     print(f"Creation Date: {whois_info.get('creation_date', '')}") | ||||
|     print(f"Expiration Date: {whois_info.get('expiration_date', '')}") | ||||
|      | ||||
|     print("\nWHOIS lookup successful!") | ||||
|      | ||||
| except ImportError: | ||||
|     print("python-whois module not installed. Run: pip install python-whois") | ||||
|     sys.exit(1) | ||||
| except Exception as e: | ||||
|     print(f"Error performing WHOIS lookup: {str(e)}") | ||||
|     sys.exit(1) | ||||
							
								
								
									
										47
									
								
								test_whois_processor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								test_whois_processor.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,47 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| from changedetectionio.processors import available_processors | ||||
| from changedetectionio.processors.processor_registry import get_processor_class | ||||
| import urllib.parse | ||||
| import sys | ||||
|  | ||||
| # First, verify our processor is available | ||||
| print("=== Available Processors ===") | ||||
| processors = available_processors() | ||||
| for name, description in processors: | ||||
|     print(f"Processor: {name} - {description}") | ||||
|  | ||||
| # Get the WHOIS processor class | ||||
| whois_processor_class = get_processor_class("whois_processor") | ||||
| if not whois_processor_class: | ||||
|     print("ERROR: WHOIS processor not found in available processors.") | ||||
|     sys.exit(1) | ||||
|  | ||||
| print(f"\nFound WHOIS processor class: {whois_processor_class}") | ||||
|  | ||||
| # Test the WHOIS processor directly | ||||
| try: | ||||
|     # Parse a domain from a URL | ||||
|     url = "https://changedetection.io" | ||||
|     parsed_url = urllib.parse.urlparse(url) | ||||
|     domain = parsed_url.netloc | ||||
|      | ||||
|     # Import whois and fetch information | ||||
|     import whois | ||||
|     whois_info = whois.whois(domain) | ||||
|      | ||||
|     print(f"\n=== WHOIS Information for {domain} ===") | ||||
|      | ||||
|     # Print the information | ||||
|     if hasattr(whois_info, 'text'): | ||||
|         print(whois_info.text) | ||||
|     else: | ||||
|         for key, value in whois_info.items(): | ||||
|             if value: | ||||
|                 print(f"{key}: {value}") | ||||
|                  | ||||
|     print("\nSuccessfully retrieved WHOIS data!") | ||||
|      | ||||
| except Exception as e: | ||||
|     print(f"Error fetching WHOIS data: {str(e)}") | ||||
|     sys.exit(1) | ||||
							
								
								
									
										136
									
								
								test_whois_processor_full.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										136
									
								
								test_whois_processor_full.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,136 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| from changedetectionio.processors import available_processors | ||||
| from changedetectionio.processors.processor_registry import get_processor_class | ||||
| import unittest | ||||
| import sys | ||||
| from unittest.mock import MagicMock, patch | ||||
| import urllib.parse | ||||
|  | ||||
| # First, verify our processor is available | ||||
| print("=== Available Processors ===") | ||||
| processors = available_processors() | ||||
| for name, description in processors: | ||||
|     print(f"Processor: {name} - {description}") | ||||
|  | ||||
| # Get the WHOIS processor class | ||||
| whois_processor_class = get_processor_class("whois_processor") | ||||
| if not whois_processor_class: | ||||
|     print("ERROR: WHOIS processor not found in available processors.") | ||||
|     sys.exit(1) | ||||
|  | ||||
| print(f"\nFound WHOIS processor class: {whois_processor_class}") | ||||
|  | ||||
| # Create a test for our WHOIS processor | ||||
| class TestWhoisProcessor(unittest.TestCase): | ||||
|      | ||||
|     # Use the real whois function - tests will actually make network requests | ||||
|     def test_whois_processor_real(self): | ||||
|         # Extract the domain from the URL | ||||
|         test_url = "https://changedetection.io" | ||||
|         parsed_url = urllib.parse.urlparse(test_url) | ||||
|         domain = parsed_url.netloc | ||||
|          | ||||
|         # Create a minimal mock datastore | ||||
|         mock_datastore = MagicMock() | ||||
|         mock_datastore.data = { | ||||
|             'watching': {'test-uuid': {'url': test_url}}, | ||||
|             'settings': { | ||||
|                 'application': {'empty_pages_are_a_change': False}, | ||||
|                 'requests': {'timeout': 30} | ||||
|             } | ||||
|         } | ||||
|         mock_datastore.get_all_base_headers.return_value = {} | ||||
|         mock_datastore.get_all_headers_in_textfile_for_watch.return_value = {} | ||||
|         mock_datastore.get_preferred_proxy_for_watch.return_value = None | ||||
|         mock_datastore.get_tag_overrides_for_watch.return_value = [] | ||||
|          | ||||
|         # Create a minimal mock watch that mimics the real Watch class | ||||
|         class MockWatch: | ||||
|             def __init__(self, url): | ||||
|                 self.link = url | ||||
|                 self.is_pdf = False | ||||
|                 self.has_browser_steps = False | ||||
|                 self.is_source_type_url = False | ||||
|                 self.history = {} | ||||
|                 self.history_n = 0 | ||||
|                 self.last_viewed = 0 | ||||
|                 self.newest_history_key = 0 | ||||
|                  | ||||
|             def get(self, key, default=None): | ||||
|                 if key == 'uuid': | ||||
|                     return 'test-uuid' | ||||
|                 elif key == 'include_filters': | ||||
|                     return [] | ||||
|                 elif key == 'body': | ||||
|                     return None | ||||
|                 elif key == 'method': | ||||
|                     return 'GET' | ||||
|                 elif key == 'headers': | ||||
|                     return {} | ||||
|                 elif key == 'browser_steps': | ||||
|                     return [] | ||||
|                 return default | ||||
|                  | ||||
|             def __getitem__(self, key): | ||||
|                 return self.get(key) | ||||
|                  | ||||
|             def get_last_fetched_text_before_filters(self): | ||||
|                 return "" | ||||
|              | ||||
|             def save_last_text_fetched_before_filters(self, content): | ||||
|                 pass | ||||
|                  | ||||
|             def has_special_diff_filter_options_set(self): | ||||
|                 return False | ||||
|                  | ||||
|             def lines_contain_something_unique_compared_to_history(self, lines, ignore_whitespace): | ||||
|                 return True | ||||
|                  | ||||
|         mock_watch = MockWatch(test_url) | ||||
|          | ||||
|         # Create a more complete mock fetcher | ||||
|         class MockFetcher: | ||||
|             def __init__(self): | ||||
|                 self.content = "" | ||||
|                 self.raw_content = b"" | ||||
|                 self.headers = {'Content-Type': 'text/plain'} | ||||
|                 self.screenshot = None | ||||
|                 self.xpath_data = None | ||||
|                 self.instock_data = None | ||||
|                 self.browser_steps = [] | ||||
|              | ||||
|             def get_last_status_code(self): | ||||
|                 return 200 | ||||
|                  | ||||
|             def get_all_headers(self): | ||||
|                 return {'content-type': 'text/plain'} | ||||
|                  | ||||
|             def quit(self): | ||||
|                 pass | ||||
|                  | ||||
|             def run(self, **kwargs): | ||||
|                 pass | ||||
|                  | ||||
|         # Create the processor and set the mock fetcher | ||||
|         processor = whois_processor_class(datastore=mock_datastore, watch_uuid='test-uuid') | ||||
|         processor.fetcher = MockFetcher() | ||||
|          | ||||
|         # Run the processor - this will make an actual WHOIS request | ||||
|         changed, update_obj, content = processor.run_changedetection(mock_watch) | ||||
|          | ||||
|         # Print the content for debugging | ||||
|         content_str = content.decode('utf-8') | ||||
|         print(f"\n=== WHOIS Content from processor (first 200 chars) ===") | ||||
|         print(content_str[:200] + "...") | ||||
|          | ||||
|         # Verify the content contains domain information | ||||
|         self.assertIn(domain, content_str) | ||||
|         self.assertIn("Domain Name", content_str) | ||||
|         self.assertIn("Creation Date", content_str) | ||||
|          | ||||
|         print("\nWHOIS processor test with real data PASSED!") | ||||
|  | ||||
| # Run the test | ||||
| if __name__ == "__main__": | ||||
|     unittest.main(argv=['first-arg-is-ignored'], exit=False) | ||||
							
								
								
									
										39
									
								
								test_whois_simple.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								test_whois_simple.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,39 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| import urllib.parse | ||||
| import re | ||||
| import whois | ||||
|  | ||||
| # Test with changedetection.io domain | ||||
| url = "https://changedetection.io" | ||||
|  | ||||
| # Extract domain from URL | ||||
| parsed_url = urllib.parse.urlparse(url) | ||||
| domain = parsed_url.netloc | ||||
|  | ||||
| # Remove www. prefix if present | ||||
| domain = re.sub(r'^www\.', '', domain) | ||||
|  | ||||
| # Fetch WHOIS information | ||||
| print(f"Looking up WHOIS data for domain: {domain}") | ||||
| whois_info = whois.whois(domain) | ||||
|  | ||||
| # Print key WHOIS data | ||||
| print("\nKey WHOIS information:") | ||||
| print(f"Domain Name: {whois_info.get('domain_name', 'Unknown')}") | ||||
| print(f"Registrar: {whois_info.get('registrar', 'Unknown')}") | ||||
| print(f"Creation Date: {whois_info.get('creation_date', 'Unknown')}") | ||||
| print(f"Expiration Date: {whois_info.get('expiration_date', 'Unknown')}") | ||||
| print(f"Updated Date: {whois_info.get('updated_date', 'Unknown')}") | ||||
|  | ||||
| # Format as text | ||||
| whois_text = f"WHOIS Information for domain: {domain}\n\n" | ||||
| for key, value in whois_info.items(): | ||||
|     if value: | ||||
|         whois_text += f"{key}: {value}\n" | ||||
|  | ||||
| # Print the first 200 characters | ||||
| print("\nFormatted WHOIS data (first 200 chars):") | ||||
| print(whois_text[:200] + "...") | ||||
|  | ||||
| print("\nWHOIS lookup successful!") | ||||
		Reference in New Issue
	
	Block a user