mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-11-04 08:34:57 +00:00 
			
		
		
		
	Compare commits
	
		
			41 Commits
		
	
	
		
			API-OpenAP
			...
			conditions
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					79166c0307 | ||
| 
						 | 
					9dbe91e470 | ||
| 
						 | 
					51bd8cd2d7 | ||
| 
						 | 
					35455e7dd6 | ||
| 
						 | 
					aaa038f082 | ||
| 
						 | 
					57eeb221cb | ||
| 
						 | 
					8187b9ce4c | ||
| 
						 | 
					cc70b65bfa | ||
| 
						 | 
					42099f1fff | ||
| 
						 | 
					408864d346 | ||
| 
						 | 
					02b8660bf3 | ||
| 
						 | 
					947a60af89 | ||
| 
						 | 
					a0f4cb4d65 | ||
| 
						 | 
					71ea8d80f3 | ||
| 
						 | 
					4f48958187 | ||
| 
						 | 
					2608980b1d | ||
| 
						 | 
					c982395d72 | ||
| 
						 | 
					ee7e43ea87 | ||
| 
						 | 
					da5585b53c | ||
| 
						 | 
					76062c9419 | ||
| 
						 | 
					675953797c | ||
| 
						 | 
					b202652a93 | ||
| 
						 | 
					617dc721bf | ||
| 
						 | 
					ec13720694 | ||
| 
						 | 
					ddacb0bcbc | ||
| 
						 | 
					f67d98b839 | ||
| 
						 | 
					beee93d528 | ||
| 
						 | 
					987ab3e494 | ||
| 
						 | 
					0c68cfffb1 | ||
| 
						 | 
					e93a9244fe | ||
| 
						 | 
					e56eec41c1 | ||
| 
						 | 
					31f4bb7cc3 | ||
| 
						 | 
					f08efde110 | ||
| 
						 | 
					9b39b2853b | ||
| 
						 | 
					892d38ba42 | ||
| 
						 | 
					b170e191d4 | ||
| 
						 | 
					edb78efcca | ||
| 
						 | 
					383f90b70c | ||
| 
						 | 
					6948418865 | ||
| 
						 | 
					cd80e317f3 | ||
| 
						 | 
					8c26210804 | 
@@ -712,23 +712,63 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
        # Does it use some custom form? does one exist?
 | 
			
		||||
        processor_name = datastore.data['watching'][uuid].get('processor', '')
 | 
			
		||||
        processor_classes = next((tpl for tpl in find_processors() if tpl[1] == processor_name), None)
 | 
			
		||||
        
 | 
			
		||||
        # If it's not found in traditional processors, check if it's a pluggy plugin
 | 
			
		||||
        if not processor_classes:
 | 
			
		||||
            flash(f"Cannot load the edit form for processor/plugin '{processor_classes[1]}', plugin missing?", 'error')
 | 
			
		||||
            return redirect(url_for('index'))
 | 
			
		||||
 | 
			
		||||
        parent_module = get_parent_module(processor_classes[0])
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            # Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code)
 | 
			
		||||
            forms_module = importlib.import_module(f"{parent_module.__name__}.forms")
 | 
			
		||||
            # Access the 'processor_settings_form' class from the 'forms' module
 | 
			
		||||
            form_class = getattr(forms_module, 'processor_settings_form')
 | 
			
		||||
        except ModuleNotFoundError as e:
 | 
			
		||||
            # .forms didnt exist
 | 
			
		||||
            form_class = forms.processor_text_json_diff_form
 | 
			
		||||
        except AttributeError as e:
 | 
			
		||||
            # .forms exists but no useful form
 | 
			
		||||
            form_class = forms.processor_text_json_diff_form
 | 
			
		||||
            try:
 | 
			
		||||
                from changedetectionio.processors.processor_registry import get_processor_form, _get_plugin_name_map
 | 
			
		||||
                
 | 
			
		||||
                # Get all available plugins for debugging
 | 
			
		||||
                available_plugins = list(_get_plugin_name_map().keys())
 | 
			
		||||
                logger.debug(f"Available processor plugins: {available_plugins}")
 | 
			
		||||
                
 | 
			
		||||
                # Try to get the processor form
 | 
			
		||||
                plugin_form_class = get_processor_form(processor_name)
 | 
			
		||||
                
 | 
			
		||||
                if plugin_form_class:
 | 
			
		||||
                    # Use default text_json_diff_form as parent module for plugins
 | 
			
		||||
                    from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor
 | 
			
		||||
                    form_class = forms.processor_text_json_diff_form
 | 
			
		||||
                    parent_module = get_parent_module(text_json_diff_processor)
 | 
			
		||||
                    
 | 
			
		||||
                    # Skip the normal form loading code path
 | 
			
		||||
                    use_plugin_form = True
 | 
			
		||||
                    logger.debug(f"Successfully loaded form for plugin '{processor_name}'")
 | 
			
		||||
                else:
 | 
			
		||||
                    # Check if the plugin is registered but doesn't have a form
 | 
			
		||||
                    if processor_name in available_plugins:
 | 
			
		||||
                        logger.error(f"Plugin '{processor_name}' is registered but has no form class")
 | 
			
		||||
                        flash(f"Plugin '{processor_name}' is registered but has no form class", 'error')
 | 
			
		||||
                    else:
 | 
			
		||||
                        logger.error(f"Cannot find plugin '{processor_name}'. Available plugins: {available_plugins}")
 | 
			
		||||
                        flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin missing?", 'error')
 | 
			
		||||
                    return redirect(url_for('index'))
 | 
			
		||||
            except ImportError as e:
 | 
			
		||||
                logger.error(f"Import error when loading plugin form: {str(e)}")
 | 
			
		||||
                flash(f"Cannot load the edit form for processor/plugin '{processor_name}', plugin system not available?", 'error')
 | 
			
		||||
                return redirect(url_for('index'))
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logger.error(f"Unexpected error loading plugin form: {str(e)}")
 | 
			
		||||
                flash(f"Error loading plugin form: {str(e)}", 'error')
 | 
			
		||||
                return redirect(url_for('index'))
 | 
			
		||||
        else:
 | 
			
		||||
            # Traditional processor - continue with normal flow
 | 
			
		||||
            parent_module = get_parent_module(processor_classes[0])
 | 
			
		||||
            use_plugin_form = False
 | 
			
		||||
        
 | 
			
		||||
        # Only follow this path for traditional processors
 | 
			
		||||
        if not use_plugin_form:
 | 
			
		||||
            try:
 | 
			
		||||
                # Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code)
 | 
			
		||||
                forms_module = importlib.import_module(f"{parent_module.__name__}.forms")
 | 
			
		||||
                # Access the 'processor_settings_form' class from the 'forms' module
 | 
			
		||||
                form_class = getattr(forms_module, 'processor_settings_form')
 | 
			
		||||
            except ModuleNotFoundError as e:
 | 
			
		||||
                # .forms didnt exist
 | 
			
		||||
                form_class = forms.processor_text_json_diff_form
 | 
			
		||||
            except AttributeError as e:
 | 
			
		||||
                # .forms exists but no useful form
 | 
			
		||||
                form_class = forms.processor_text_json_diff_form
 | 
			
		||||
 | 
			
		||||
        form = form_class(formdata=request.form if request.method == 'POST' else None,
 | 
			
		||||
                          data=default,
 | 
			
		||||
 
 | 
			
		||||
@@ -67,7 +67,6 @@ class model(watch_base):
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def link(self):
 | 
			
		||||
 | 
			
		||||
        url = self.get('url', '')
 | 
			
		||||
        if not is_safe_url(url):
 | 
			
		||||
            return 'DISABLED'
 | 
			
		||||
@@ -93,6 +92,19 @@ class model(watch_base):
 | 
			
		||||
        # Also double check it after any Jinja2 formatting just incase
 | 
			
		||||
        if not is_safe_url(ready_url):
 | 
			
		||||
            return 'DISABLED'
 | 
			
		||||
            
 | 
			
		||||
        # Check if a processor wants to customize the display link
 | 
			
		||||
        processor_name = self.get('processor')
 | 
			
		||||
        if processor_name:
 | 
			
		||||
            try:
 | 
			
		||||
                # Import here to avoid circular imports
 | 
			
		||||
                from changedetectionio.processors.processor_registry import get_display_link
 | 
			
		||||
                custom_link = get_display_link(url=ready_url, processor_name=processor_name)
 | 
			
		||||
                if custom_link:
 | 
			
		||||
                    return custom_link
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logger.error(f"Error getting custom display link for processor {processor_name}: {str(e)}")
 | 
			
		||||
                
 | 
			
		||||
        return ready_url
 | 
			
		||||
 | 
			
		||||
    def clear_watch(self):
 | 
			
		||||
 
 | 
			
		||||
@@ -3,6 +3,7 @@ from changedetectionio.content_fetchers.base import Fetcher
 | 
			
		||||
from changedetectionio.strtobool import strtobool
 | 
			
		||||
from copy import deepcopy
 | 
			
		||||
from loguru import logger
 | 
			
		||||
 | 
			
		||||
import hashlib
 | 
			
		||||
import importlib
 | 
			
		||||
import inspect
 | 
			
		||||
@@ -10,6 +11,10 @@ import os
 | 
			
		||||
import pkgutil
 | 
			
		||||
import re
 | 
			
		||||
 | 
			
		||||
# Import the plugin manager
 | 
			
		||||
from .pluggy_interface import plugin_manager
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class difference_detection_processor():
 | 
			
		||||
 | 
			
		||||
    browser_steps = None
 | 
			
		||||
@@ -26,9 +31,95 @@ class difference_detection_processor():
 | 
			
		||||
        self.watch = deepcopy(self.datastore.data['watching'].get(watch_uuid))
 | 
			
		||||
        # Generic fetcher that should be extended (requests, playwright etc)
 | 
			
		||||
        self.fetcher = Fetcher()
 | 
			
		||||
        
 | 
			
		||||
    def _get_proxy_for_watch(self, preferred_proxy_id=None):
 | 
			
		||||
        """Get proxy configuration based on watch settings and preferred proxy ID
 | 
			
		||||
        
 | 
			
		||||
        Args:
 | 
			
		||||
            preferred_proxy_id: Optional explicit proxy ID to use
 | 
			
		||||
            
 | 
			
		||||
        Returns:
 | 
			
		||||
            dict: Proxy configuration or None if no proxy should be used
 | 
			
		||||
            str: Proxy URL or None if no proxy should be used
 | 
			
		||||
        """
 | 
			
		||||
        # Default to no proxy config
 | 
			
		||||
        proxy_config = None
 | 
			
		||||
        proxy_url = None
 | 
			
		||||
        
 | 
			
		||||
        # Check if datastore is available and has get_preferred_proxy_for_watch method
 | 
			
		||||
        if hasattr(self, 'datastore') and self.datastore:
 | 
			
		||||
            try:
 | 
			
		||||
                # Get preferred proxy ID if not provided
 | 
			
		||||
                if not preferred_proxy_id and hasattr(self.datastore, 'get_preferred_proxy_for_watch'):
 | 
			
		||||
                    # Get the watch UUID if available
 | 
			
		||||
                    watch_uuid = None
 | 
			
		||||
                    if hasattr(self.watch, 'get'):
 | 
			
		||||
                        watch_uuid = self.watch.get('uuid')
 | 
			
		||||
                    elif hasattr(self.watch, 'uuid'):
 | 
			
		||||
                        watch_uuid = self.watch.uuid
 | 
			
		||||
                    
 | 
			
		||||
                    if watch_uuid:
 | 
			
		||||
                        preferred_proxy_id = self.datastore.get_preferred_proxy_for_watch(uuid=watch_uuid)
 | 
			
		||||
                
 | 
			
		||||
                # Check if we have a proxy list and a valid proxy ID
 | 
			
		||||
                if preferred_proxy_id and hasattr(self.datastore, 'proxy_list') and self.datastore.proxy_list:
 | 
			
		||||
                    proxy_info = self.datastore.proxy_list.get(preferred_proxy_id)
 | 
			
		||||
                    
 | 
			
		||||
                    if proxy_info and 'url' in proxy_info:
 | 
			
		||||
                        proxy_url = proxy_info.get('url')
 | 
			
		||||
                        logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}'")
 | 
			
		||||
                        
 | 
			
		||||
                        # Parse the proxy URL to build a proxy dict for requests
 | 
			
		||||
                        import urllib.parse
 | 
			
		||||
                        parsed_proxy = urllib.parse.urlparse(proxy_url)
 | 
			
		||||
                        proxy_type = parsed_proxy.scheme
 | 
			
		||||
                        
 | 
			
		||||
                        # Extract credentials if present
 | 
			
		||||
                        username = None
 | 
			
		||||
                        password = None
 | 
			
		||||
                        if parsed_proxy.username:
 | 
			
		||||
                            username = parsed_proxy.username
 | 
			
		||||
                            if parsed_proxy.password:
 | 
			
		||||
                                password = parsed_proxy.password
 | 
			
		||||
                        
 | 
			
		||||
                        # Build the proxy URL without credentials for the proxy dict
 | 
			
		||||
                        netloc = parsed_proxy.netloc
 | 
			
		||||
                        if '@' in netloc:
 | 
			
		||||
                            netloc = netloc.split('@')[1]
 | 
			
		||||
                        
 | 
			
		||||
                        proxy_addr = f"{proxy_type}://{netloc}"
 | 
			
		||||
                        
 | 
			
		||||
                        # Create the proxy configuration
 | 
			
		||||
                        proxy_config = {
 | 
			
		||||
                            'http': proxy_addr,
 | 
			
		||||
                            'https': proxy_addr
 | 
			
		||||
                        }
 | 
			
		||||
                        
 | 
			
		||||
                        # Add credentials if present
 | 
			
		||||
                        if username:
 | 
			
		||||
                            proxy_config['username'] = username
 | 
			
		||||
                            if password:
 | 
			
		||||
                                proxy_config['password'] = password
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                # Log the error but continue without a proxy
 | 
			
		||||
                logger.error(f"Error setting up proxy: {str(e)}")
 | 
			
		||||
                proxy_config = None
 | 
			
		||||
                proxy_url = None
 | 
			
		||||
                
 | 
			
		||||
        return proxy_config, proxy_url
 | 
			
		||||
 | 
			
		||||
    def call_browser(self, preferred_proxy_id=None):
 | 
			
		||||
 | 
			
		||||
        """Fetch content using the appropriate browser/fetcher
 | 
			
		||||
        
 | 
			
		||||
        This method will:
 | 
			
		||||
        1. Determine the appropriate fetcher to use based on watch settings
 | 
			
		||||
        2. Set up proxy configuration if needed
 | 
			
		||||
        3. Initialize the fetcher with the correct parameters
 | 
			
		||||
        4. Configure any browser steps if needed
 | 
			
		||||
        
 | 
			
		||||
        Args:
 | 
			
		||||
            preferred_proxy_id: Optional explicit proxy ID to use
 | 
			
		||||
        """
 | 
			
		||||
        from requests.structures import CaseInsensitiveDict
 | 
			
		||||
 | 
			
		||||
        url = self.watch.link
 | 
			
		||||
@@ -43,8 +134,8 @@ class difference_detection_processor():
 | 
			
		||||
        # Requests, playwright, other browser via wss:// etc, fetch_extra_something
 | 
			
		||||
        prefer_fetch_backend = self.watch.get('fetch_backend', 'system')
 | 
			
		||||
 | 
			
		||||
        # Proxy ID "key"
 | 
			
		||||
        preferred_proxy_id = preferred_proxy_id if preferred_proxy_id else self.datastore.get_preferred_proxy_for_watch(uuid=self.watch.get('uuid'))
 | 
			
		||||
        # Get proxy configuration
 | 
			
		||||
        proxy_config, proxy_url = self._get_proxy_for_watch(preferred_proxy_id)
 | 
			
		||||
 | 
			
		||||
        # Pluggable content self.fetcher
 | 
			
		||||
        if not prefer_fetch_backend or prefer_fetch_backend == 'system':
 | 
			
		||||
@@ -82,14 +173,10 @@ class difference_detection_processor():
 | 
			
		||||
            # What it referenced doesnt exist, Just use a default
 | 
			
		||||
            fetcher_obj = getattr(content_fetchers, "html_requests")
 | 
			
		||||
 | 
			
		||||
        proxy_url = None
 | 
			
		||||
        if preferred_proxy_id:
 | 
			
		||||
            # Custom browser endpoints should NOT have a proxy added
 | 
			
		||||
            if not prefer_fetch_backend.startswith('extra_browser_'):
 | 
			
		||||
                proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url')
 | 
			
		||||
                logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}")
 | 
			
		||||
            else:
 | 
			
		||||
                logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified. ")
 | 
			
		||||
        # Custom browser endpoints should NOT have a proxy added
 | 
			
		||||
        if proxy_url and prefer_fetch_backend.startswith('extra_browser_'):
 | 
			
		||||
            logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified.")
 | 
			
		||||
            proxy_url = None
 | 
			
		||||
 | 
			
		||||
        # Now call the fetcher (playwright/requests/etc) with arguments that only a fetcher would need.
 | 
			
		||||
        # When browser_connection_url is None, it method should default to working out whats the best defaults (os env vars etc)
 | 
			
		||||
@@ -185,9 +272,9 @@ def find_sub_packages(package_name):
 | 
			
		||||
 | 
			
		||||
def find_processors():
 | 
			
		||||
    """
 | 
			
		||||
    Find all subclasses of DifferenceDetectionProcessor in the specified package.
 | 
			
		||||
    Find all subclasses of DifferenceDetectionProcessor in the specified package
 | 
			
		||||
    and also include processors from the plugin system.
 | 
			
		||||
 | 
			
		||||
    :param package_name: The name of the package to scan for processor modules.
 | 
			
		||||
    :return: A list of (module, class) tuples.
 | 
			
		||||
    """
 | 
			
		||||
    package_name = "changedetectionio.processors"  # Name of the current package/module
 | 
			
		||||
@@ -195,6 +282,7 @@ def find_processors():
 | 
			
		||||
    processors = []
 | 
			
		||||
    sub_packages = find_sub_packages(package_name)
 | 
			
		||||
 | 
			
		||||
    # Find traditional processors
 | 
			
		||||
    for sub_package in sub_packages:
 | 
			
		||||
        module_name = f"{package_name}.{sub_package}.processor"
 | 
			
		||||
        try:
 | 
			
		||||
@@ -207,6 +295,15 @@ def find_processors():
 | 
			
		||||
        except (ModuleNotFoundError, ImportError) as e:
 | 
			
		||||
            logger.warning(f"Failed to import module {module_name}: {e} (find_processors())")
 | 
			
		||||
 | 
			
		||||
    # Also include processors from the plugin system
 | 
			
		||||
    try:
 | 
			
		||||
        from .processor_registry import get_plugin_processor_modules
 | 
			
		||||
        plugin_modules = get_plugin_processor_modules()
 | 
			
		||||
        if plugin_modules:
 | 
			
		||||
            processors.extend(plugin_modules)
 | 
			
		||||
    except (ImportError, ModuleNotFoundError) as e:
 | 
			
		||||
        logger.warning(f"Failed to import plugin modules: {e} (find_processors())")
 | 
			
		||||
 | 
			
		||||
    return processors
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -223,8 +320,22 @@ def get_parent_module(module):
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_custom_watch_obj_for_processor(processor_name):
 | 
			
		||||
    """
 | 
			
		||||
    Get the custom watch object for a processor
 | 
			
		||||
    :param processor_name: Name of the processor
 | 
			
		||||
    :return: Watch class or None
 | 
			
		||||
    """
 | 
			
		||||
    # First, try to get the watch model from the pluggy system
 | 
			
		||||
    try:
 | 
			
		||||
        from .processor_registry import get_processor_watch_model
 | 
			
		||||
        watch_model = get_processor_watch_model(processor_name)
 | 
			
		||||
        if watch_model:
 | 
			
		||||
            return watch_model
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        logger.warning(f"Error getting processor watch model from pluggy: {e}")
 | 
			
		||||
 | 
			
		||||
    # Fall back to the traditional approach
 | 
			
		||||
    from changedetectionio.model import Watch
 | 
			
		||||
    watch_class = Watch.model
 | 
			
		||||
    processor_classes = find_processors()
 | 
			
		||||
@@ -241,14 +352,47 @@ def get_custom_watch_obj_for_processor(processor_name):
 | 
			
		||||
def available_processors():
    """
    Get a list of processors by name and description for the UI elements.

    Processors reported by the pluggy registry are listed first. File-based
    ("traditional") processors found under ``changedetectionio.processors`` are
    appended only when no pluggy processor is registered under the same name.
    Failures in either source are logged and that source is treated as empty.

    :return: A list of tuples (processor_name, description)
    """
    # Get processors from the pluggy system
    pluggy_processors = []
    try:
        from .processor_registry import get_all_processors
        pluggy_processors = get_all_processors()
    except Exception as e:
        logger.error(f"Error getting processors from pluggy: {str(e)}")

    # Get processors from the traditional file-based system
    traditional_processors = []
    try:
        # Let's not use find_processors() directly since it now also includes pluggy processors
        package_name = "changedetectionio.processors"

        for sub_package in find_sub_packages(package_name):
            module_name = f"{package_name}.{sub_package}.processor"
            try:
                module = importlib.import_module(module_name)
                # Use the module's own 'name' attribute when it has one
                name = getattr(module, 'name', f"Traditional processor: {sub_package}")
                traditional_processors.append((sub_package, name))
            except (ModuleNotFoundError, ImportError, AttributeError) as e:
                logger.warning(f"Failed to import module {module_name}: {e} (available_processors())")
    except Exception as e:
        logger.error(f"Error getting traditional processors: {str(e)}")

    # Combine the lists, ensuring no duplicates - pluggy processors take precedence
    all_processors = list(pluggy_processors)

    # Entries are unpacked as (name, description); only the name is used for the duplicate check
    pluggy_processor_names = {name for name, _ in pluggy_processors}
    all_processors.extend(
        (processor_class, name)
        for processor_class, name in traditional_processors
        if processor_class not in pluggy_processor_names
    )

    return all_processors
 | 
			
		||||
							
								
								
									
										17
									
								
								changedetectionio/processors/form.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								changedetectionio/processors/form.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,17 @@
 | 
			
		||||
from wtforms import (
 | 
			
		||||
    BooleanField,
 | 
			
		||||
    validators,
 | 
			
		||||
    RadioField
 | 
			
		||||
)
 | 
			
		||||
from wtforms.fields.choices import SelectField
 | 
			
		||||
from wtforms.fields.form import FormField
 | 
			
		||||
from wtforms.form import Form
 | 
			
		||||
 | 
			
		||||
class BaseProcessorForm(Form):
    """Common parent class for processor forms.

    Both hooks below return ``None`` in this base class.
    """

    def extra_form_content(self):
        # The base form contributes no extra form content
        return None

    def extra_tab_content(self):
        # The base form contributes no extra tab content
        return None
 | 
			
		||||
							
								
								
									
										4
									
								
								changedetectionio/processors/forms.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								changedetectionio/processors/forms.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,4 @@
 | 
			
		||||
"""
 | 
			
		||||
Forms for processors
 | 
			
		||||
"""
 | 
			
		||||
from changedetectionio.forms import processor_text_json_diff_form
 | 
			
		||||
							
								
								
									
										69
									
								
								changedetectionio/processors/pluggy_interface.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								changedetectionio/processors/pluggy_interface.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,69 @@
 | 
			
		||||
import pluggy
 | 
			
		||||
 | 
			
		||||
# Define the plugin namespace for processors
 | 
			
		||||
PLUGIN_NAMESPACE = "changedetectionio_processors"
 | 
			
		||||
 | 
			
		||||
hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE)
 | 
			
		||||
hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ProcessorSpec:
    """Hook specifications that processor plugins may implement.

    Every hook is declared with ``@hookspec`` and has no body of its own; the
    functions are written without a ``self`` parameter.
    """

    @hookspec
    def get_processor_name():
        """Return the name of the processor."""

    @hookspec
    def get_processor_description():
        """Return the description of the processor."""

    @hookspec
    def get_processor_class():
        """Return the processor class."""

    @hookspec
    def get_processor_form():
        """Return the processor form class."""

    @hookspec
    def get_processor_watch_model():
        """Return the watch model class for this processor (if any)."""

    @hookspec
    def get_display_link(url, processor_name):
        """Return a custom display link for the given processor.

        Args:
            url: The original URL from the watch
            processor_name: The name of the processor

        Returns:
            A string with the custom display link or None to use the default
        """

    @hookspec
    def perform_site_check(datastore, watch_uuid):
        """Create and return a processor instance ready to perform site check.

        Args:
            datastore: The application datastore
            watch_uuid: The UUID of the watch to check

        Returns:
            A processor instance ready to perform site check
        """
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Set up the plugin manager
 | 
			
		||||
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
 | 
			
		||||
 | 
			
		||||
# Register hook specifications
 | 
			
		||||
plugin_manager.add_hookspecs(ProcessorSpec)
 | 
			
		||||
							
								
								
									
										222
									
								
								changedetectionio/processors/processor_registry.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										222
									
								
								changedetectionio/processors/processor_registry.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,222 @@
 | 
			
		||||
from loguru import logger
 | 
			
		||||
from changedetectionio.model import Watch
 | 
			
		||||
from .pluggy_interface import plugin_manager
 | 
			
		||||
from typing import Dict, Any, List, Tuple, Optional, TypeVar, Type
 | 
			
		||||
import functools
 | 
			
		||||
 | 
			
		||||
# Import and register internal plugins
 | 
			
		||||
from . import whois_plugin
 | 
			
		||||
from . import test_plugin
 | 
			
		||||
 | 
			
		||||
# Register plugins
 | 
			
		||||
plugin_manager.register(whois_plugin)
 | 
			
		||||
plugin_manager.register(test_plugin)
 | 
			
		||||
 | 
			
		||||
# Load any setuptools entrypoints
 | 
			
		||||
plugin_manager.load_setuptools_entrypoints("changedetectionio_processors")
 | 
			
		||||
 | 
			
		||||
# Type definitions for better type hinting
 | 
			
		||||
T = TypeVar('T')
 | 
			
		||||
ProcessorClass = TypeVar('ProcessorClass')
 | 
			
		||||
ProcessorForm = TypeVar('ProcessorForm')
 | 
			
		||||
ProcessorWatchModel = TypeVar('ProcessorWatchModel')
 | 
			
		||||
ProcessorInstance = TypeVar('ProcessorInstance')
 | 
			
		||||
 | 
			
		||||
# Cache for plugin name mapping to improve performance
 | 
			
		||||
_plugin_name_map: Dict[str, Any] = {}
 | 
			
		||||
 | 
			
		||||
def register_plugin(plugin_module):
    """Register a processor plugin with the plugin manager.

    After a successful registration the module-level ``_plugin_name_map`` cache
    is rebound to an empty dict.
    """
    global _plugin_name_map

    plugin_manager.register(plugin_module)
    # Drop the cached name map now that the set of registered plugins has changed
    _plugin_name_map = {}
 | 
			
		||||
 | 
			
		||||
def _get_plugin_name_map() -> Dict[str, Any]:
    """Get a mapping of processor names to plugins

    The map is built on first use and cached in the module-level
    ``_plugin_name_map``. An empty map is never treated as cached, so it is
    rebuilt on the next call. Built-in plugins are stored under fixed names;
    every other plugin is asked for its name through the
    ``get_processor_name`` hook.

    :return: Dictionary mapping processor names to plugins
    """
    global _plugin_name_map

    # Return cached map if available
    if _plugin_name_map:
        return _plugin_name_map

    result = {}

    # Get all plugins from the plugin manager
    all_plugins = list(plugin_manager.get_plugins())

    # First register known internal plugins by name for reliability
    known_plugins = {
        'whois': whois_plugin,
        'test': test_plugin
    }

    for name, plugin in known_plugins.items():
        if plugin in all_plugins:
            result[name] = plugin

    # Then process remaining plugins through the hook system
    for plugin in all_plugins:
        if plugin in known_plugins.values():
            continue  # Skip plugins we've already registered

        try:
            # A plain ``plugin_manager.hook.<name>(...)`` call runs the
            # implementations of *every* registered plugin, so its first
            # result is not necessarily this plugin's name. Restrict the call
            # to this plugin by removing all the others from the hook caller.
            other_plugins = [p for p in all_plugins if p is not plugin]
            scoped_hook = plugin_manager.subset_hook_caller(
                'get_processor_name',
                remove_plugins=other_plugins
            )
            # NOTE(review): assumes the hook is not declared firstresult=True,
            # i.e. the call returns a list of results - confirm against the hookspec
            name_results = scoped_hook(plugin=plugin)

            if not name_results:
                continue

            plugin_name = name_results[0]

            # Check for name collisions
            if plugin_name in result:
                logger.warning(f"Plugin name collision: '{plugin_name}' is already registered")
                continue

            result[plugin_name] = plugin
        except Exception as e:
            logger.error(f"Error getting processor name from plugin: {str(e)}")

    # Cache the map
    _plugin_name_map = result
    return result
 | 
			
		||||
 | 
			
		||||
def _get_plugin_by_name(processor_name: str) -> Optional[Any]:
    """Look up the plugin registered under a processor name

    :param processor_name: Name of the processor
    :return: The matching plugin object, or None when no plugin uses that name
    """
    name_to_plugin = _get_plugin_name_map()
    return name_to_plugin.get(processor_name)
 | 
			
		||||
 | 
			
		||||
def _call_hook_for_plugin(plugin: Any, hook_name: str, default_value: T = None, **kwargs) -> Optional[T]:
    """Call a hook for a specific plugin and handle exceptions

    Only the implementation provided by ``plugin`` is invoked. Any exception
    raised while resolving or calling the hook is logged and ``default_value``
    is returned instead.

    :param plugin: The plugin to call the hook for
    :param hook_name: Name of the hook to call
    :param default_value: Default value to return if the hook call fails or yields no result
    :param kwargs: Additional arguments to pass to the hook
    :return: Result of the hook call or default value
    """
    if not plugin:
        return default_value

    try:
        # ``plugin_manager.hook.<name>(...)`` dispatches to every registered
        # plugin, so taking its first result could return another plugin's
        # answer. Build a caller that excludes all plugins except this one.
        other_plugins = [p for p in plugin_manager.get_plugins() if p is not plugin]
        hook = plugin_manager.subset_hook_caller(hook_name, remove_plugins=other_plugins)
        results = hook(plugin=plugin, **kwargs)

        # NOTE(review): assumes the hook returns a list of results
        # (not firstresult=True) - confirm against the hookspec
        if results:
            return results[0]
    except Exception as e:
        logger.error(f"Error calling {hook_name} for plugin: {str(e)}")

    return default_value
 | 
			
		||||
 | 
			
		||||
def get_all_processors() -> List[Tuple[str, str]]:
    """List every processor that provides a description

    :return: List of tuples (processor_name, processor_description); processors
             whose description hook yields nothing are left out
    """
    # Pair each processor name with whatever its description hook returns
    named_descriptions = (
        (processor_name, _call_hook_for_plugin(plugin, 'get_processor_description'))
        for processor_name, plugin in _get_plugin_name_map().items()
    )

    # Keep only the entries that actually have a description
    return [entry for entry in named_descriptions if entry[1]]
 | 
			
		||||
 | 
			
		||||
def get_processor_class(processor_name: str) -> Optional[Type[ProcessorClass]]:
    """Resolve the processor class registered under a name

    :param processor_name: Name of the processor
    :return: Processor class, or None when the name is unknown or the hook yields nothing
    """
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_processor_class'
    )
 | 
			
		||||
 | 
			
		||||
def get_processor_form(processor_name: str) -> Optional[Type[ProcessorForm]]:
    """Resolve the form class registered under a processor name

    :param processor_name: Name of the processor
    :return: Processor form class, or None when the name is unknown or the hook yields nothing
    """
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_processor_form'
    )
 | 
			
		||||
 | 
			
		||||
def get_processor_watch_model(processor_name: str) -> Type[ProcessorWatchModel]:
    """Resolve the watch model class for a processor

    :param processor_name: Name of the processor
    :return: The plugin's watch model class, falling back to ``Watch.model``
    """
    fallback_model = Watch.model
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_processor_watch_model',
        default_value=fallback_model
    )
 | 
			
		||||
 | 
			
		||||
def get_processor_site_check(processor_name: str, datastore: Any, watch_uuid: str) -> Optional[ProcessorInstance]:
    """Build a processor instance that is ready to perform a site check

    The plugin's ``perform_site_check`` hook is tried first; when it yields
    nothing, the class from ``get_processor_class`` is instantiated instead.

    :param processor_name: Name of the processor
    :param datastore: The application datastore
    :param watch_uuid: The UUID of the watch to check
    :return: A processor instance ready to perform site check, or None
    """
    plugin = _get_plugin_by_name(processor_name)

    if plugin:
        try:
            # Preferred route: the plugin builds the instance itself
            ready_instance = _call_hook_for_plugin(
                plugin,
                'perform_site_check',
                datastore=datastore,
                watch_uuid=watch_uuid
            )
            if ready_instance:
                return ready_instance

            # Fallback route: ask for the class and construct it here
            factory = _call_hook_for_plugin(plugin, 'get_processor_class')
            if factory:
                return factory(datastore=datastore, watch_uuid=watch_uuid)
        except Exception as e:
            logger.error(f"Error getting processor site check for {processor_name}: {str(e)}")

    return None
 | 
			
		||||
 | 
			
		||||
def get_display_link(url: str, processor_name: str) -> Optional[str]:
    """Ask a processor's plugin for a custom display link

    :param url: The original URL from the watch
    :param processor_name: Name of the processor
    :return: A string with the custom display link or None to use the default
    """
    hook_arguments = {'url': url, 'processor_name': processor_name}
    return _call_hook_for_plugin(
        _get_plugin_by_name(processor_name),
        'get_display_link',
        **hook_arguments
    )
 | 
			
		||||
 | 
			
		||||
def get_plugin_processor_modules() -> List[Tuple[Any, str]]:
    """Get processor modules for all plugins that can be used with the find_processors function

    This function adapts pluggy plugins to be compatible with the traditional find_processors system.
    A plugin is included when its processor class has an ancestor class named
    ``perform_site_check`` or containing ``TextJsonDiffProcessor``; it is then
    paired with the text_json_diff processor module.

    :return: A list of (module, processor_name) tuples
    """
    result = []

    # Imported inside the function, as in the rest of this module's lazy imports
    from changedetectionio.processors.text_json_diff import processor as text_json_diff_processor

    # For each plugin, map to a suitable module for find_processors
    for processor_name, plugin in _get_plugin_name_map().items():
        try:
            processor_class = _call_hook_for_plugin(plugin, 'get_processor_class')
            if not processor_class:
                continue

            # Look at every ancestor (not only the first direct base) so that
            # processors inheriting indirectly, or listing the text_json_diff
            # base after a mixin, are still recognised
            ancestor_names = [str(cls.__name__) for cls in processor_class.__mro__[1:]]
            extends_text_json_diff = any(
                ancestor == 'perform_site_check' or 'TextJsonDiffProcessor' in ancestor
                for ancestor in ancestor_names
            )
            if extends_text_json_diff:
                result.append((text_json_diff_processor, processor_name))
        except Exception as e:
            logger.error(f"Error mapping processor module for {processor_name}: {str(e)}")

    return result
 | 
			
		||||
							
								
								
									
										169
									
								
								changedetectionio/processors/whois_plugin.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										169
									
								
								changedetectionio/processors/whois_plugin.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,169 @@
 | 
			
		||||
from loguru import logger
 | 
			
		||||
import re
 | 
			
		||||
import urllib.parse
 | 
			
		||||
from .pluggy_interface import hookimpl
 | 
			
		||||
from requests.structures import CaseInsensitiveDict
 | 
			
		||||
from changedetectionio.content_fetchers.base import Fetcher
 | 
			
		||||
 | 
			
		||||
# Import the text_json_diff processor
 | 
			
		||||
from changedetectionio.processors.text_json_diff.processor import perform_site_check as TextJsonDiffProcessor
 | 
			
		||||
 | 
			
		||||
# WHOIS Processor implementation that extends TextJsonDiffProcessor
 | 
			
		||||
class WhoisProcessor(TextJsonDiffProcessor):
    """Processor that watches a domain's WHOIS record instead of a fetched page

    It extends the text_json_diff processor and overrides ``call_browser`` so
    that the "fetch" step performs a WHOIS lookup and stores the outcome on
    ``self.fetcher``.
    """

    def _extract_domain_from_url(self, url):
        """Extract domain from URL, removing www. prefix if present

        :param url: The URL to take the host name from
        :return: The host name without credentials, port or a leading ``www.``;
                 an empty string when the URL has no host part
        """
        # ``hostname`` (unlike ``netloc``) leaves out any ``user:pass@`` prefix
        # and ``:port`` suffix, neither of which belongs in a WHOIS query
        domain = urllib.parse.urlparse(url).hostname or ''

        # Remove www. prefix if present
        return re.sub(r'^www\.', '', domain)

    def _set_fetcher_response(self, content, status_code):
        """Store a plain-text result on ``self.fetcher`` together with its headers and accessors

        :param content: Text to expose as the fetched content
        :param status_code: HTTP-like status code describing the outcome
        """
        self.fetcher.content = content
        self.fetcher.status_code = status_code

        # Setup headers dictionary for the fetcher
        self.fetcher.headers = CaseInsensitiveDict({
            'content-type': 'text/plain',
            'server': 'whois-processor'
        })

        # Accessors attached to the fetcher instance; they read the values set above
        self.fetcher.get_all_headers = lambda: self.fetcher.headers
        self.fetcher.get_last_status_code = lambda: self.fetcher.status_code

        # Nothing to shut down for a WHOIS lookup
        self.fetcher.quit = lambda: None

    def call_browser(self, preferred_proxy_id=None):
        """Override call_browser to perform WHOIS lookup instead of using a browser

        The result is stored on ``self.fetcher``: status 200 with the WHOIS text
        on success, 400 when no domain can be extracted from the watch URL, and
        500 when the lookup raises.

        Note: The python-whois library doesn't directly support proxies. For real proxy support,
        we would need to implement a custom socket connection that routes through the proxy.
        This is a TODO for a future enhancement.

        :param preferred_proxy_id: Proxy identifier passed on to ``_get_proxy_for_watch``
        :raises Exception: For ``file:`` URLs when ``allow_file_uri`` is not enabled
        """
        # Initialize a basic fetcher - this is used by the parent class
        self.fetcher = Fetcher()

        # Extract URL from watch
        url = self.watch.link

        # Check for file:// access
        if re.search(r'^file:', url.strip(), re.IGNORECASE):
            if not self.datastore.data.get('settings', {}).get('application', {}).get('allow_file_uri', False):
                raise Exception("file:// type access is denied for security reasons.")

        domain = self._extract_domain_from_url(url)

        # Ensure we have a valid domain
        if not domain:
            error_msg = f"Could not extract domain from URL: '{url}'"
            # Same fetcher shape as the other outcomes (headers and accessors included)
            self._set_fetcher_response(error_msg, 400)
            logger.error(error_msg)
            return

        # Get proxy configuration using the common method from parent class
        proxy_config, _ = super()._get_proxy_for_watch(preferred_proxy_id)

        try:
            # Use python-whois to get domain information
            import whois

            # Note: The python-whois library doesn't directly support proxies,
            # so for now the configured proxy is only logged, not used
            if proxy_config:
                logger.info(f"Using proxy for WHOIS lookup: {proxy_config}")

            # Perform the WHOIS lookup
            whois_info = whois.whois(domain)

            # Convert whois_info object to text
            if hasattr(whois_info, 'text'):
                # Some whois implementations store raw text in .text attribute
                whois_text = whois_info.text
            else:
                # Otherwise, format it nicely as key-value pairs (empty values skipped)
                detail_lines = [f"{key}: {value}\n" for key, value in whois_info.items() if value]
                whois_text = f"WHOIS Information for domain: {domain}\n\n" + "".join(detail_lines)

            self._set_fetcher_response(whois_text, 200)

        except Exception as e:
            error_msg = f"Error fetching WHOIS data for domain {domain}: {str(e)}"
            self._set_fetcher_response(error_msg, 500)
            logger.error(error_msg)

        return

    def run_changedetection(self, watch):
        """Use the parent's run_changedetection which will use our overridden call_browser method

        :param watch: The watch to run change detection for
        :return: Tuple of (changed_detected, update_obj, filtered_text); on any
                 exception the tuple is (False, dict carrying ``last_error``, the
                 error message encoded as UTF-8)
        """
        try:
            # Let the parent class handle everything now that we've overridden call_browser
            changed_detected, update_obj, filtered_text = super().run_changedetection(watch)
            return changed_detected, update_obj, filtered_text

        except Exception as e:
            # NOTE(review): this also intercepts any exception the parent raises
            # on purpose for its caller - confirm that is intended
            error_msg = f"Error in WHOIS processor: {str(e)}"
            update_obj = {'last_notification_error': False, 'last_error': error_msg}
            logger.error(error_msg)
            return False, update_obj, error_msg.encode('utf-8')

    @staticmethod
    def perform_site_check(datastore, watch_uuid):
        """Factory method to create a WhoisProcessor instance - for compatibility with legacy code

        :param datastore: The application datastore
        :param watch_uuid: The UUID of the watch to check
        :return: A new WhoisProcessor
        """
        processor = WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
        return processor
 | 
			
		||||
 | 
			
		||||
@hookimpl
def perform_site_check(datastore, watch_uuid):
    """Hook: build the WHOIS processor instance for a watch

    :param datastore: The application datastore
    :param watch_uuid: The UUID of the watch to check
    :return: A WhoisProcessor constructed with the given datastore and watch UUID
    """
    whois_processor = WhoisProcessor(datastore=datastore, watch_uuid=watch_uuid)
    return whois_processor
 | 
			
		||||
 | 
			
		||||
@hookimpl(trylast=True)  # Use trylast to ensure this runs last in case of conflicts
def get_processor_name():
    """Return the name of this processor

    :return: The fixed processor name ``"whois"``
    """
    # ``logger`` is already imported at the top of this module, so no
    # function-scope import is needed here
    logger.debug("whois_plugin.get_processor_name() called")
    return "whois"
 | 
			
		||||
 | 
			
		||||
@hookimpl
def get_processor_description():
    """Hook: human-readable description of the WHOIS processor

    :return: The description string for this processor
    """
    description = "WHOIS Domain Information Changes Detector"
    return description
 | 
			
		||||
 | 
			
		||||
@hookimpl
def get_processor_class():
    """Hook: expose the class that implements this processor

    :return: The WhoisProcessor class itself (not an instance)
    """
    processor_class = WhoisProcessor
    return processor_class
 | 
			
		||||
 | 
			
		||||
@hookimpl
def get_processor_form():
    """Return the processor form class

    The WHOIS processor reuses the text_json_diff form.

    :return: ``processor_text_json_diff_form``, or None when it cannot be imported
    """
    # Import here to avoid circular imports
    try:
        from changedetectionio.forms import processor_text_json_diff_form
        return processor_text_json_diff_form
    except Exception as e:
        # ``logger`` comes from the module-level import; re-importing it here was redundant
        logger.error(f"Error importing form for whois plugin: {str(e)}")
        return None
 | 
			
		||||
 | 
			
		||||
@hookimpl
def get_processor_watch_model():
    """Hook: watch model class for this processor

    :return: Always None, meaning the default watch model is used
    """
    # This plugin does not supply a watch model of its own
    return None
 | 
			
		||||
							
								
								
									
										59
									
								
								changedetectionio/tests/test_processor_registry.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								changedetectionio/tests/test_processor_registry.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,59 @@
 | 
			
		||||
import pytest
 | 
			
		||||
from changedetectionio.processors.processor_registry import get_processor_class, get_all_processors
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_all_processors():
    """Test that get_all_processors returns a list of processor tuples"""
    processors = get_all_processors()
    assert isinstance(processors, list)
    assert len(processors) > 0

    # Each item should be a tuple of (name, description)
    for processor in processors:
        assert isinstance(processor, tuple)
        assert len(processor) == 2
        assert isinstance(processor[0], str)
        assert isinstance(processor[1], str)

    # Check that our WHOIS processor is included
    whois_processor = next((p for p in processors if p[0] == "whois"), None)
    assert whois_processor is not None
    # Must equal the string returned by whois_plugin.get_processor_description()
    assert whois_processor[1] == "WHOIS Domain Information Changes Detector"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_processor_class():
    """get_processor_class resolves known names and returns None for unknown ones"""
    # A registered processor resolves to a class exposing perform_site_check
    whois_class = get_processor_class("whois")
    assert whois_class is not None
    assert hasattr(whois_class, 'perform_site_check')

    # An unregistered name resolves to nothing
    assert get_processor_class("non_existent_processor") is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_processor_site_check():
    """get_processor_site_check builds an instance for known names and None otherwise"""
    from unittest.mock import MagicMock
    from changedetectionio.processors.processor_registry import get_processor_site_check

    mock_datastore = MagicMock()
    watch_uuid = "test-uuid"

    # A registered processor yields an instance with the expected methods
    processor = get_processor_site_check("whois", mock_datastore, watch_uuid)
    assert processor is not None
    for expected_method in ('run_changedetection', 'call_browser'):
        assert hasattr(processor, expected_method)

    # An unregistered name yields nothing
    assert get_processor_site_check("non_existent_processor", mock_datastore, watch_uuid) is None
 | 
			
		||||
							
								
								
									
										182
									
								
								changedetectionio/tests/test_whois_processor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										182
									
								
								changedetectionio/tests/test_whois_processor.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,182 @@
 | 
			
		||||
import pytest
 | 
			
		||||
from unittest.mock import MagicMock, patch
 | 
			
		||||
from changedetectionio.processors.whois_plugin import WhoisProcessor
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MockWatch:
    """Minimal stand-in for a watch object, exposing only what these tests read"""

    # (key accepted by ``get``, name of the attribute that holds its value)
    _KEY_TO_ATTRIBUTE = (
        ('previous_md5', '_previous_md5'),
        ('include_filters', '_include_filters'),
        ('ignore_text', '_ignore_text'),
        ('url', 'url'),
    )

    def __init__(self, url, previous_md5=None, include_filters=None, ignore_text=None):
        """Store the given values; falsy filter/ignore arguments become empty lists"""
        self.url = url
        self._previous_md5 = previous_md5
        self._include_filters = include_filters if include_filters else []
        self._ignore_text = ignore_text if ignore_text else []
        self.history = {}

    def get(self, key, default=None):
        """Return the stored value for a known key, otherwise ``default``

        For a known key the stored value is returned even when it is None.
        """
        for known_key, attribute_name in self._KEY_TO_ATTRIBUTE:
            if key == known_key:
                return getattr(self, attribute_name)
        return default

    def has_special_diff_filter_options_set(self):
        """Always report that no special diff filter options are set"""
        return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
@patch('changedetectionio.processors.text_json_diff.processor.perform_site_check.run_changedetection')
def test_whois_processor_basic_functionality(mock_super_run, mock_base_init, mock_whois):
    """WhoisProcessor looks up the watch's domain and delegates change detection to its parent"""
    # Base class init is stubbed out so no full watch structure is needed
    mock_base_init.return_value = None

    # The parent's run_changedetection returns a canned result
    mock_super_run.return_value = (False, {'previous_md5': 'some-md5'}, b'Some filtered text')

    # Canned WHOIS reply carrying raw text
    whois_reply = MagicMock()
    whois_reply.text = "Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
    mock_whois.return_value = whois_reply

    # Datastore without any proxies and with file:// access disabled
    datastore = MagicMock()
    datastore.proxy_list = None
    datastore.get_preferred_proxy_for_watch.return_value = None
    datastore.data = {'settings': {'application': {'allow_file_uri': False}}}

    processor = WhoisProcessor(datastore=datastore, watch_uuid='test-uuid')

    # The processor reads the URL through watch.link
    linked_watch = MagicMock()
    linked_watch.link = "https://example.com"
    linked_watch.get.return_value = "uuid-123"
    processor.watch = linked_watch

    # Fetch step
    processor.call_browser()

    fetcher = processor.fetcher
    assert fetcher is not None
    for attribute in ('content', 'headers', 'status_code'):
        assert hasattr(fetcher, attribute)

    # whois must have been queried for the bare domain
    assert mock_whois.called
    assert mock_whois.call_args[0][0] == 'example.com'

    # Detection step, using a minimal watch object
    processor.run_changedetection(MockWatch(url="https://example.com"))

    assert mock_super_run.called
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@patch('whois.whois')
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_call_browser_with_proxy(mock_base_init, mock_whois):
    """call_browser still performs the lookup when the datastore has a proxy configured"""
    # Base class init is stubbed out
    mock_base_init.return_value = None

    # Canned WHOIS reply carrying raw text
    whois_reply = MagicMock()
    whois_reply.text = "Domain Name: example.com\nRegistrar: Example Registrar\nCreation Date: 2020-01-01\n"
    mock_whois.return_value = whois_reply

    # Datastore exposing one proxy, which is also the preferred one
    datastore = MagicMock()
    datastore.proxy_list = {
        'test-proxy': {
            'url': 'http://proxy.example.com:8080',
            'label': 'Test Proxy'
        }
    }
    datastore.get_preferred_proxy_for_watch.return_value = 'test-proxy'
    datastore.data = {'settings': {'application': {'allow_file_uri': False}}}

    processor = WhoisProcessor(datastore=datastore, watch_uuid='test-uuid')

    # The processor reads the URL through watch.link
    linked_watch = MagicMock()
    linked_watch.link = "https://example.com"
    linked_watch.get.return_value = "uuid-123"
    processor.watch = linked_watch

    processor.call_browser()

    # whois must have been queried for the bare domain
    assert mock_whois.called
    assert mock_whois.call_args[0][0] == 'example.com'

    # The fetcher must exist and carry content
    assert processor.fetcher is not None
    assert processor.fetcher.content is not None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@patch('changedetectionio.processors.difference_detection_processor.__init__')
def test_whois_processor_perform_site_check(mock_base_init):
    """WhoisProcessor.perform_site_check constructs a WhoisProcessor from its arguments"""
    mock_base_init.return_value = None

    datastore = MagicMock()
    watch_uuid = "test-uuid"

    # Replace the constructor so the call can be inspected
    with patch.object(WhoisProcessor, '__init__', return_value=None) as mock_init:
        processor = WhoisProcessor.perform_site_check(datastore=datastore, watch_uuid=watch_uuid)

        # The constructor receives exactly the arguments given to the factory
        mock_init.assert_called_once_with(datastore=datastore, watch_uuid=watch_uuid)

        # And the factory hands back an instance of the class
        assert isinstance(processor, WhoisProcessor)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_display_link():
    """Exercise the get_display_link hook for whois and non-whois processors."""
    from changedetectionio.processors.whois_plugin import get_display_link

    # (input URL, expected label) pairs for the "whois" processor: a regular
    # URL with path and query, a subdomain, and a www-prefixed host (the
    # prefix should be removed).
    whois_expectations = (
        ("https://example.com/some/path?param=value", "WHOIS - example.com"),
        ("https://subdomain.example.com/", "WHOIS - subdomain.example.com"),
        ("https://www.example.com/", "WHOIS - example.com"),
    )
    for candidate_url, expected_label in whois_expectations:
        assert get_display_link(url=candidate_url, processor_name="whois") == expected_label

    # A different processor should not get a custom link (returns None).
    assert get_display_link(url="https://example.com/", processor_name="text_json_diff") is None
 | 
			
		||||
@@ -61,5 +61,22 @@ class TestDiffBuilder(unittest.TestCase):
 | 
			
		||||
        p = watch.get_from_version_based_on_last_viewed
 | 
			
		||||
        assert p == "100", "Correct with only one history snapshot"
 | 
			
		||||
 | 
			
		||||
    def test_watch_link_property_with_processor(self):
        """Verify that Watch.link returns the display link supplied by the processor registry."""
        from unittest.mock import patch

        watched = Watch.model(datastore_path='/tmp', default={})
        watched['url'] = 'https://example.com'
        watched['processor'] = 'whois'

        # Stand in for the processor registry's get_display_link function.
        registry_hook = 'changedetectionio.processors.processor_registry.get_display_link'
        with patch(registry_hook, return_value="WHOIS - example.com") as fake_display_link:
            # The link property should use the customized link from the processor
            assert watched.link == "WHOIS - example.com"
            fake_display_link.assert_called_once_with(url='https://example.com', processor_name='whois')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    unittest.main()
 | 
			
		||||
 
 | 
			
		||||
@@ -271,19 +271,38 @@ class update_worker(threading.Thread):
 | 
			
		||||
 | 
			
		||||
                    try:
 | 
			
		||||
                        # Processor is what we are using for detecting the "Change"
 | 
			
		||||
                        processor = watch.get('processor', 'text_json_diff')
 | 
			
		||||
                        processor_name = watch.get('processor', 'text_json_diff')
 | 
			
		||||
 | 
			
		||||
                        # Init a new 'difference_detection_processor', first look in processors
 | 
			
		||||
                        processor_module_name = f"changedetectionio.processors.{processor}.processor"
 | 
			
		||||
                        
 | 
			
		||||
 | 
			
		||||
                        # First, try to get the processor from our plugin registry
 | 
			
		||||
                        try:
 | 
			
		||||
                            processor_module = importlib.import_module(processor_module_name)
 | 
			
		||||
                        except ModuleNotFoundError as e:
 | 
			
		||||
                            print(f"Processor module '{processor}' not found.")
 | 
			
		||||
                            raise e
 | 
			
		||||
 | 
			
		||||
                        update_handler = processor_module.perform_site_check(datastore=self.datastore,
 | 
			
		||||
                                                                             watch_uuid=uuid
 | 
			
		||||
                                                                             )
 | 
			
		||||
                            from changedetectionio.processors.processor_registry import get_processor_site_check
 | 
			
		||||
                            update_handler = get_processor_site_check(processor_name, self.datastore, uuid)
 | 
			
		||||
                            
 | 
			
		||||
                            if update_handler:
 | 
			
		||||
                                # We found the processor in our plugin registry
 | 
			
		||||
                                logger.info(f"Using processor '{processor_name}' from plugin registry")
 | 
			
		||||
                            else:
 | 
			
		||||
                                # Fall back to the traditional file-based approach
 | 
			
		||||
                                processor_module_name = f"changedetectionio.processors.{processor_name}.processor"
 | 
			
		||||
                                try:
 | 
			
		||||
                                    processor_module = importlib.import_module(processor_module_name)
 | 
			
		||||
                                    update_handler = processor_module.perform_site_check(datastore=self.datastore,
 | 
			
		||||
                                                                                        watch_uuid=uuid)
 | 
			
		||||
                                except ModuleNotFoundError as e:
 | 
			
		||||
                                    print(f"Processor module '{processor_name}' not found in both plugin registry and file system.")
 | 
			
		||||
                                    raise e
 | 
			
		||||
                        except ImportError as e:
 | 
			
		||||
                            # If processor_registry.py cannot be imported, fall back to the traditional approach
 | 
			
		||||
                            processor_module_name = f"changedetectionio.processors.{processor_name}.processor"
 | 
			
		||||
                            try:
 | 
			
		||||
                                processor_module = importlib.import_module(processor_module_name)
 | 
			
		||||
                                update_handler = processor_module.perform_site_check(datastore=self.datastore,
 | 
			
		||||
                                                                                    watch_uuid=uuid)
 | 
			
		||||
                            except ModuleNotFoundError as e:
 | 
			
		||||
                                print(f"Processor module '{processor_name}' not found.")
 | 
			
		||||
                                raise e
 | 
			
		||||
 | 
			
		||||
                        update_handler.call_browser()
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										31
									
								
								test_processor_registration.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								test_processor_registration.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,31 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from changedetectionio.processors import available_processors
 | 
			
		||||
from changedetectionio.processors.processor_registry import get_processor_class, get_processor_form
 | 
			
		||||
 | 
			
		||||
# Smoke check: list every available processor, then look up the WHOIS one.
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print("Processor: {} - {}".format(name, description))

# Check if our WHOIS processor is among the (name, description) pairs
whois_processor_name = "whois_processor"
whois_found = any(name == whois_processor_name for name, _ in processors)

if not whois_found:
    print("\nWHOIS Processor not found in available processors")
else:
    print("\nWHOIS Processor found! Getting processor class and form...")

    # Resolve the processor class; it may come back falsy, hence the guards.
    processor_class = get_processor_class(whois_processor_name)
    print(f"Processor class: {processor_class}")
    print(f"Processor class name: {processor_class.__name__ if processor_class else None}")
    print(f"Processor class module: {processor_class.__module__ if processor_class else None}")

    # Resolve the processor form
    processor_form = get_processor_form(whois_processor_name)
    print(f"Processor form: {processor_form}")

    print("\nWHOIS Processor successfully registered")
 | 
			
		||||
							
								
								
									
										16
									
								
								test_processors.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										16
									
								
								test_processors.py
									
									
									
									
									
										Executable file
									
								
							@@ -0,0 +1,16 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from changedetectionio.processors import available_processors
 | 
			
		||||
from changedetectionio.processors import find_processors
 | 
			
		||||
 | 
			
		||||
# Traditional discovery: find_processors() yields (module, name) pairs.
print("=== Traditional Processor Discovery ===")
traditional_processors = find_processors()
for found_module, found_name in traditional_processors:
    print("Found processor: {} in {}".format(found_name, found_module.__name__))

# Combined discovery (traditional + pluggy): (name, description) pairs.
print("\n=== Combined Processor Discovery ===")
combined_processors = available_processors()
for proc_name, proc_description in combined_processors:
    print("Processor: {} - {}".format(proc_name, proc_description))
 | 
			
		||||
							
								
								
									
										53
									
								
								test_whois_extraction.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								test_whois_extraction.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,53 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
import urllib.parse
 | 
			
		||||
import re
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
def extract_domain_from_url(url):
    """Extract the bare host name from a URL.

    Uses the parsed ``hostname`` rather than ``netloc`` so that embedded
    credentials (``user:pass@``) and an explicit port (``:8080``) are not
    returned as part of the domain. ``hostname`` is lower-cased by
    ``urllib.parse``, so the ``www.`` prefix is also stripped regardless of
    the case used in the URL.

    Args:
        url: The URL to inspect.

    Returns:
        The host name without a leading ``www.``, or an empty string when the
        URL has no network location (for example a string without a scheme).
    """
    parsed_url = urllib.parse.urlparse(url)

    # hostname is None when there is no network location; keep returning ''
    # in that case, as the netloc-based version did.
    domain = parsed_url.hostname or ''

    # Remove www. prefix if present
    return re.sub(r'^www\.', '', domain)
 | 
			
		||||
 | 
			
		||||
# Sample URLs covering plain, www-prefixed, subdomain and non-HTTP schemes.
test_urls = [
    "https://changedetection.io",
    "http://www.example.com/page",
    "https://subdomain.domain.co.uk/path?query=1",
    "ftp://ftp.example.org",
    "https://www.changedetection.io/page/subpage",
]

print("=== Domain Extraction Test ===")
for url in test_urls:
    domain = extract_domain_from_url(url)
    print("URL: {} -> Domain: {}".format(url, domain))

# Live WHOIS lookup for changedetection.io; the script exits with status 1
# when the whois module is missing or the lookup raises.
try:
    import whois

    domain = extract_domain_from_url("https://changedetection.io")
    print(f"\n=== WHOIS lookup for {domain} ===")

    whois_info = whois.whois(domain)

    # Print key information, in a fixed order, defaulting to an empty string.
    for label, field in (
        ("Domain Name", "domain_name"),
        ("Registrar", "registrar"),
        ("Creation Date", "creation_date"),
        ("Expiration Date", "expiration_date"),
    ):
        print(f"{label}: {whois_info.get(field, '')}")

    print("\nWHOIS lookup successful!")

except ImportError:
    print("python-whois module not installed. Run: pip install python-whois")
    sys.exit(1)
except Exception as e:
    print(f"Error performing WHOIS lookup: {e!s}")
    sys.exit(1)
 | 
			
		||||
							
								
								
									
										47
									
								
								test_whois_processor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								test_whois_processor.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,47 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from changedetectionio.processors import available_processors
 | 
			
		||||
from changedetectionio.processors.processor_registry import get_processor_class
 | 
			
		||||
import urllib.parse
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
# List every available processor as (name, description).
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print("Processor: {} - {}".format(name, description))

# Look up the WHOIS processor class; abort with status 1 when it is missing.
whois_processor_class = get_processor_class("whois_processor")
if not whois_processor_class:
    print("ERROR: WHOIS processor not found in available processors.")
    sys.exit(1)

print(f"\nFound WHOIS processor class: {whois_processor_class}")

# Perform a WHOIS lookup directly with the whois module.
try:
    # Take the network location of the URL as the domain to query.
    url = "https://changedetection.io"
    parsed_url = urllib.parse.urlparse(url)
    domain = parsed_url.netloc

    # Import whois and fetch information
    import whois
    whois_info = whois.whois(domain)

    print(f"\n=== WHOIS Information for {domain} ===")

    # Prefer the raw text when the result exposes it; otherwise dump every
    # truthy key/value pair.
    if not hasattr(whois_info, 'text'):
        for key, value in whois_info.items():
            if value:
                print(f"{key}: {value}")
    else:
        print(whois_info.text)

    print("\nSuccessfully retrieved WHOIS data!")

except Exception as e:
    print(f"Error fetching WHOIS data: {e!s}")
    sys.exit(1)
 | 
			
		||||
							
								
								
									
										136
									
								
								test_whois_processor_full.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										136
									
								
								test_whois_processor_full.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,136 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from changedetectionio.processors import available_processors
 | 
			
		||||
from changedetectionio.processors.processor_registry import get_processor_class
 | 
			
		||||
import unittest
 | 
			
		||||
import sys
 | 
			
		||||
from unittest.mock import MagicMock, patch
 | 
			
		||||
import urllib.parse
 | 
			
		||||
 | 
			
		||||
# List every available processor as (name, description).
print("=== Available Processors ===")
processors = available_processors()
for name, description in processors:
    print("Processor: {} - {}".format(name, description))

# Look up the WHOIS processor class; abort with status 1 when it is missing.
whois_processor_class = get_processor_class("whois_processor")
if not whois_processor_class:
    print("ERROR: WHOIS processor not found in available processors.")
    sys.exit(1)

print(f"\nFound WHOIS processor class: {whois_processor_class}")
 | 
			
		||||
 | 
			
		||||
# Create a test for our WHOIS processor
 | 
			
		||||
class TestWhoisProcessor(unittest.TestCase):
    """Run the registered WHOIS processor against hand-rolled test doubles."""

    # Use the real whois function - tests will actually make network requests
    def test_whois_processor_real(self):
        """Call run_changedetection() for changedetection.io and check the returned text."""
        test_url = "https://changedetection.io"
        # The network location of the URL is later searched for in the output.
        domain = urllib.parse.urlparse(test_url).netloc

        # Minimal datastore double: only the pieces configured here are canned.
        mock_datastore = MagicMock()
        mock_datastore.data = {
            'watching': {'test-uuid': {'url': test_url}},
            'settings': {
                'application': {'empty_pages_are_a_change': False},
                'requests': {'timeout': 30},
            },
        }
        mock_datastore.get_all_base_headers.return_value = {}
        mock_datastore.get_all_headers_in_textfile_for_watch.return_value = {}
        mock_datastore.get_preferred_proxy_for_watch.return_value = None
        mock_datastore.get_tag_overrides_for_watch.return_value = []

        class MockWatch:
            """Minimal stand-in that mimics the real Watch class."""

            # Factories rather than values, so each get() call hands out a
            # fresh list/dict instead of a shared mutable object.
            _canned = {
                'uuid': lambda: 'test-uuid',
                'include_filters': list,
                'body': lambda: None,
                'method': lambda: 'GET',
                'headers': dict,
                'browser_steps': list,
            }

            def __init__(self, url):
                self.link = url
                self.is_pdf = False
                self.has_browser_steps = False
                self.is_source_type_url = False
                self.history = {}
                self.history_n = 0
                self.last_viewed = 0
                self.newest_history_key = 0

            def get(self, key, default=None):
                factory = self._canned.get(key)
                if factory is None:
                    return default
                return factory()

            def __getitem__(self, key):
                return self.get(key)

            def get_last_fetched_text_before_filters(self):
                return ""

            def save_last_text_fetched_before_filters(self, content):
                pass

            def has_special_diff_filter_options_set(self):
                return False

            def lines_contain_something_unique_compared_to_history(self, lines, ignore_whitespace):
                return True

        class MockFetcher:
            """Fetcher double with empty content and no-op run()/quit()."""

            def __init__(self):
                self.content = ""
                self.raw_content = b""
                self.headers = {'Content-Type': 'text/plain'}
                self.screenshot = None
                self.xpath_data = None
                self.instock_data = None
                self.browser_steps = []

            def get_last_status_code(self):
                return 200

            def get_all_headers(self):
                return {'content-type': 'text/plain'}

            def quit(self):
                pass

            def run(self, **kwargs):
                pass

        mock_watch = MockWatch(test_url)

        # Build the processor and swap in the fetcher double.
        processor = whois_processor_class(datastore=mock_datastore, watch_uuid='test-uuid')
        processor.fetcher = MockFetcher()

        # Run the processor - this will make an actual WHOIS request
        _changed, _update_obj, raw_content = processor.run_changedetection(mock_watch)

        # Show the beginning of the content for debugging.
        content_str = raw_content.decode('utf-8')
        print("\n=== WHOIS Content from processor (first 200 chars) ===")
        print(content_str[:200] + "...")

        # The content must mention the domain and the expected field labels.
        for expected_fragment in (domain, "Domain Name", "Creation Date"):
            self.assertIn(expected_fragment, content_str)

        print("\nWHOIS processor test with real data PASSED!")
 | 
			
		||||
 | 
			
		||||
# Run the test
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    unittest.main(argv=['first-arg-is-ignored'], exit=False)
 | 
			
		||||
							
								
								
									
										39
									
								
								test_whois_simple.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								test_whois_simple.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,39 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
import urllib.parse
 | 
			
		||||
import re
 | 
			
		||||
import whois
 | 
			
		||||
 | 
			
		||||
# Test with changedetection.io domain
url = "https://changedetection.io"

# Take the network location of the URL and drop a leading "www." if present
parsed_url = urllib.parse.urlparse(url)
domain = re.sub(r'^www\.', '', parsed_url.netloc)

# Fetch WHOIS information (performs a live lookup)
print(f"Looking up WHOIS data for domain: {domain}")
whois_info = whois.whois(domain)

# Print key WHOIS data in a fixed order, falling back to 'Unknown'.
print("\nKey WHOIS information:")
for label, field in (
    ("Domain Name", "domain_name"),
    ("Registrar", "registrar"),
    ("Creation Date", "creation_date"),
    ("Expiration Date", "expiration_date"),
    ("Updated Date", "updated_date"),
):
    print(f"{label}: {whois_info.get(field, 'Unknown')}")

# Format as text: a header followed by one "key: value" line per truthy entry.
whois_text = f"WHOIS Information for domain: {domain}\n\n" + "".join(
    f"{key}: {value}\n" for key, value in whois_info.items() if value
)

# Print the first 200 characters
print("\nFormatted WHOIS data (first 200 chars):")
print(whois_text[:200] + "...")

print("\nWHOIS lookup successful!")
 | 
			
		||||
		Reference in New Issue
	
	Block a user