Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2025-11-04 00:27:48 +00:00)

Compare commits: adjustable...1924-dynam (6 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 0e4c39f1ef |  |
|  | 460a7762e5 |  |
|  | 6c44973cfd |  |
|  | 160901a32c |  |
|  | 73c0bd1839 |  |
|  | a15978f578 |  |
@@ -137,6 +137,7 @@ class import_distill_io_json(Importer):

        flash("{} Imported from Distill.io in {:.2f}s, {} Skipped.".format(len(self.new_uuids), time.time() - now, len(self.remaining_data)))


class import_xlsx_wachete(Importer):

    def run(self,
@@ -144,6 +145,7 @@ class import_xlsx_wachete(Importer):
            flash,
            datastore,
            ):

        good = 0
        now = time.time()
        self.new_uuids = []
@@ -153,62 +155,67 @@ class import_xlsx_wachete(Importer):
        try:
            wb = load_workbook(data)
        except Exception as e:
            #@todo correct except
            # @todo correct except
            flash("Unable to read export XLSX file, something wrong with the file?", 'error')
            return

        sheet_obj = wb.active
        row_id = 2
        for row in wb.active.iter_rows(min_row=row_id):
            try:
                extras = {}
                data = {}
                for cell in row:
                    if not cell.value:
                        continue
                    column_title = wb.active.cell(row=1, column=cell.column).value.strip().lower()
                    data[column_title] = cell.value

        i = 1
        row = 2
        while sheet_obj.cell(row=row, column=1).value:
            data = {}
            while sheet_obj.cell(row=row, column=i).value:
                column_title = sheet_obj.cell(row=1, column=i).value.strip().lower()
                column_row_value = sheet_obj.cell(row=row, column=i).value
                data[column_title] = column_row_value
                # Forced switch to webdriver/playwright/etc
                dynamic_wachet = str(data.get('dynamic wachet')).strip().lower()  # Convert bool to str to cover all cases
                # libreoffice and others can have it as =FALSE() =TRUE(), or bool(true)
                if 'true' in dynamic_wachet or dynamic_wachet == '1':
                    extras['fetch_backend'] = 'html_webdriver'

                i += 1
                if data.get('xpath'):
                    # @todo split by || ?
                    extras['include_filters'] = [data.get('xpath')]
                if data.get('name'):
                    extras['title'] = data.get('name').strip()
                if data.get('interval (min)'):
                    minutes = int(data.get('interval (min)'))
                    hours, minutes = divmod(minutes, 60)
                    days, hours = divmod(hours, 24)
                    weeks, days = divmod(days, 7)
                    extras['time_between_check'] = {'weeks': weeks, 'days': days, 'hours': hours, 'minutes': minutes, 'seconds': 0}

            extras = {}
            if data.get('xpath'):
                #@todo split by || ?
                extras['include_filters'] = [data.get('xpath')]
            if data.get('name'):
                extras['title'] = [data.get('name').strip()]
            if data.get('interval (min)'):
                minutes = int(data.get('interval (min)'))
                hours, minutes = divmod(minutes, 60)
                days, hours = divmod(hours, 24)
                weeks, days = divmod(days, 7)
                extras['time_between_check'] = {'weeks': weeks, 'days': days, 'hours': hours, 'minutes': minutes, 'seconds': 0}


            # At minimum a URL is required.
            if data.get('url'):
                try:
                    validate_url(data.get('url'))
                except ValidationError as e:
                    print(">> import URL error", data.get('url'), str(e))
                    # Don't bother processing anything else on this row
                    continue

                new_uuid = datastore.add_watch(url=data['url'].strip(),
                                               extras=extras,
                                               tag=data.get('folder'),
                                               write_to_disk_now=False)
                if new_uuid:
                    # Straight into the queue.
                    self.new_uuids.append(new_uuid)
                    good += 1

            row += 1
            i = 1
                # At minimum a URL is required.
                if data.get('url'):
                    try:
                        validate_url(data.get('url'))
                    except ValidationError as e:
                        print(">> import URL error", data.get('url'), str(e))
                        flash(f"Error processing row number {row_id}, URL value was incorrect, row was skipped.", 'error')
                        # Don't bother processing anything else on this row
                        continue

                    new_uuid = datastore.add_watch(url=data['url'].strip(),
                                                   extras=extras,
                                                   tag=data.get('folder'),
                                                   write_to_disk_now=False)
                    if new_uuid:
                        # Straight into the queue.
                        self.new_uuids.append(new_uuid)
                        good += 1
            except Exception as e:
                print(e)
                flash(f"Error processing row number {row_id}, check all cell data types are correct, row was skipped.", 'error')
            else:
                row_id += 1

        flash(
            "{} imported from Wachete .xlsx in {:.2f}s".format(len(self.new_uuids), time.time() - now))


class import_xlsx_custom(Importer):

    def run(self,
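The new "dynamic wachet" handling in the hunk above has to cope with whatever openpyxl hands back for that column: a real Python bool, a plain string, or a LibreOffice/Excel formula result such as `=TRUE()`. A minimal standalone sketch of that normalisation, with an illustrative helper name that is not part of the diff:

```python
def wants_webdriver(cell_value) -> bool:
    # Stringify first so bool(True), 'TRUE', '=TRUE()' and '1' are all treated alike,
    # mirroring the check in import_xlsx_wachete above.
    value = str(cell_value).strip().lower()
    return 'true' in value or value == '1'

# Values that should force extras['fetch_backend'] = 'html_webdriver'
assert all(wants_webdriver(v) for v in (True, 'TRUE', '=TRUE()', '1'))
# Values that should leave the default fetcher untouched
assert not any(wants_webdriver(v) for v in (False, None, '', '0', '=FALSE()'))
```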
@@ -216,6 +223,7 @@ class import_xlsx_custom(Importer):
            flash,
            datastore,
            ):

        good = 0
        now = time.time()
        self.new_uuids = []
@@ -225,56 +233,68 @@ class import_xlsx_custom(Importer):
        try:
            wb = load_workbook(data)
        except Exception as e:
            #@todo correct except
            # @todo correct except
            flash("Unable to read export XLSX file, something wrong with the file?", 'error')
            return

        # @todo cehck atleast 2 rows, same in other method

        sheet_obj = wb.active
        from .forms import validate_url
        row = 2
        while sheet_obj.cell(row=row, column=1).value:
            url = None
            tags = None
            extras = {}
            for col_i, cell_map in self.import_profile.items():
                cell_val = sheet_obj.cell(row=row, column=col_i).value
                if cell_map == 'url':
                    url = cell_val.strip()
                    try:
                        validate_url(url)
                    except ValidationError as e:
                        print (">> Import URL error",url, str(e))
                        # Don't bother processing anything else on this row
                        url = None
                        break
        row_i = 1

                elif cell_map == 'tag':
                    tags = cell_val.strip()
                elif cell_map == 'include_filters':
                    # @todo validate?
                    extras['include_filters'] = [cell_val.strip()]
                elif cell_map == 'interval_minutes':
                    hours, minutes = divmod(int(cell_val), 60)
                    days, hours = divmod(hours, 24)
                    weeks, days = divmod(days, 7)
                    extras['time_between_check'] = {'weeks': weeks, 'days': days, 'hours': hours, 'minutes': minutes, 'seconds': 0}
                else:
                    extras[cell_map] = cell_val.strip()
        try:
            for row in wb.active.iter_rows():
                url = None
                tags = None
                extras = {}

            # At minimum a URL is required.
            if url:
                new_uuid = datastore.add_watch(url=url,
                                               extras=extras,
                                               tag=tags,
                                               write_to_disk_now=False)
                if new_uuid:
                    # Straight into the queue.
                    self.new_uuids.append(new_uuid)
                    good += 1
                for cell in row:
                    if not self.import_profile.get(cell.col_idx):
                        continue
                    if not cell.value:
                        continue

            row += 1
                    cell_map = self.import_profile.get(cell.col_idx)

                    cell_val = str(cell.value).strip()  # could be bool

                    if cell_map == 'url':
                        url = cell.value.strip()
                        try:
                            validate_url(url)
                        except ValidationError as e:
                            print(">> Import URL error", url, str(e))
                            flash(f"Error processing row number {row_i}, URL value was incorrect, row was skipped.", 'error')
                            # Don't bother processing anything else on this row
                            url = None
                            break
                    elif cell_map == 'tag':
                        tags = cell.value.strip()
                    elif cell_map == 'include_filters':
                        # @todo validate?
                        extras['include_filters'] = [cell.value.strip()]
                    elif cell_map == 'interval_minutes':
                        hours, minutes = divmod(int(cell_val), 60)
                        days, hours = divmod(hours, 24)
                        weeks, days = divmod(days, 7)
                        extras['time_between_check'] = {'weeks': weeks, 'days': days, 'hours': hours, 'minutes': minutes, 'seconds': 0}
                    else:
                        extras[cell_map] = cell_val

                # At minimum a URL is required.
                if url:
                    new_uuid = datastore.add_watch(url=url,
                                                   extras=extras,
                                                   tag=tags,
                                                   write_to_disk_now=False)
                    if new_uuid:
                        # Straight into the queue.
                        self.new_uuids.append(new_uuid)
                        good += 1
        except Exception as e:
            print(e)
            flash(f"Error processing row number {row_i}, check all cell data types are correct, row was skipped.", 'error')
        else:
            row_i += 1

        flash(
            "{} imported from custom .xlsx in {:.2f}s".format(len(self.new_uuids), time.time() - now))
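In `import_xlsx_custom` the column-to-field mapping comes from `self.import_profile`, a dict keyed by 1-based column index (openpyxl's `cell.col_idx`) whose values are the field names handled in the hunk above (`url`, `tag`, `include_filters`, `interval_minutes`, or any other key passed straight through into `extras`). A hedged sketch of the shape that loop expects; the concrete column numbers below are invented for illustration:

```python
# Hypothetical mapping: which spreadsheet column feeds which watch field.
import_profile = {
    1: 'url',               # column A: the URL to watch (required)
    2: 'tag',               # column B: tag / folder name
    3: 'include_filters',   # column C: XPath/CSS filter
    4: 'interval_minutes',  # column D: recheck interval, converted via the divmod chain above
}
```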
										
Binary file not shown.

@@ -127,6 +127,7 @@ def test_import_custom_xlsx(client, live_server):
    """Test can upload a excel spreadsheet and the watches are created correctly"""

    #live_server_setup(live_server)

    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, 'import/spreadsheet.xlsx')
    with open(filename, 'rb') as f:
@@ -150,13 +151,14 @@ def test_import_custom_xlsx(client, live_server):
        follow_redirects=True,
    )

    assert b'2 imported from custom .xlsx' in res.data
    assert b'3 imported from custom .xlsx' in res.data
    # Because this row was actually just a header with no usable URL, we should get an error
    assert b'Error processing row number 1' in res.data

    res = client.get(
        url_for("index")
    )


    assert b'Somesite results ABC' in res.data
    assert b'City news results' in res.data
@@ -167,6 +169,9 @@ def test_import_custom_xlsx(client, live_server):
            assert filters[0] == '/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]||//*[@id=\'content\']/div[3]/div[1]/div[1]||//*[@id=\'content\']/div[1]'
            assert watch.get('time_between_check') == {'weeks': 0, 'days': 1, 'hours': 6, 'minutes': 24, 'seconds': 0}

    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
    assert b'Deleted' in res.data

def test_import_watchete_xlsx(client, live_server):
    """Test can upload a excel spreadsheet and the watches are created correctly"""
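The asserted `time_between_check` value is produced by the chained `divmod` conversion in both importers. A small worked sketch of that arithmetic (the helper name is illustrative, not from the code above): an interval of 1824 minutes yields exactly the dict the assertions expect.

```python
def minutes_to_time_between_check(minutes: int) -> dict:
    # Same arithmetic as the 'interval (min)' / 'interval_minutes' handling in importer.py
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)
    weeks, days = divmod(days, 7)
    return {'weeks': weeks, 'days': days, 'hours': hours, 'minutes': minutes, 'seconds': 0}

# 1824 minutes -> 1 day, 6 hours, 24 minutes
assert minutes_to_time_between_check(1824) == {'weeks': 0, 'days': 1, 'hours': 6, 'minutes': 24, 'seconds': 0}
```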
@@ -186,7 +191,7 @@ def test_import_watchete_xlsx(client, live_server):
        follow_redirects=True,
    )

    assert b'2 imported from Wachete .xlsx' in res.data
    assert b'3 imported from Wachete .xlsx' in res.data

    res = client.get(
        url_for("index")
@@ -201,3 +206,10 @@ def test_import_watchete_xlsx(client, live_server):
            filters = watch.get('include_filters')
            assert filters[0] == '/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]||//*[@id=\'content\']/div[3]/div[1]/div[1]||//*[@id=\'content\']/div[1]'
            assert watch.get('time_between_check') == {'weeks': 0, 'days': 1, 'hours': 6, 'minutes': 24, 'seconds': 0}
            assert watch.get('fetch_backend') == 'system' # always uses default

        if watch.get('title') == 'JS website':
            assert watch.get('fetch_backend') == 'html_webdriver' # Has active 'dynamic wachet'

    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
    assert b'Deleted' in res.data
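Taken together, the new assertions pin down the fetch backend behaviour: rows imported without an active "dynamic wachet" keep the default backend (`system` in the test environment), while the row titled "JS website" is forced onto `html_webdriver`. A tiny sketch of that expectation as a hypothetical helper, not anything from the test file:

```python
def expected_backend(dynamic_wachet_enabled: bool) -> str:
    # Mirrors the assertions above: only an active 'dynamic wachet' switches the fetcher.
    return 'html_webdriver' if dynamic_wachet_enabled else 'system'

assert expected_backend(True) == 'html_webdriver'   # the 'JS website' row
assert expected_backend(False) == 'system'          # other rows use the default
```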
 