Mirror of https://github.com/dgtlmoon/changedetection.io.git
Synced 2026-01-14 11:10:19 +00:00

Compare commits (4 commits):

- 50b349b464
- 67d097cca7
- 494385a379
- c2ee84b753
@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki

# Semver means never use .01, or 00. Should be .1.
__version__ = '0.52.2'
__version__ = '0.52.3'

from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

@@ -21,31 +21,154 @@ from changedetectionio.flask_app import login_optionally_required
from loguru import logger

browsersteps_sessions = {}
browsersteps_watch_to_session = {}  # Maps watch_uuid -> browsersteps_session_id
io_interface_context = None
import json
import hashlib
from flask import Response
import asyncio
import threading
import time

def run_async_in_browser_loop(coro):
    """Run async coroutine using the existing async worker event loop"""
    from changedetectionio import worker_handler

    # Use the existing async worker event loop instead of creating a new one
    if worker_handler.USE_ASYNC_WORKERS and worker_handler.async_loop and not worker_handler.async_loop.is_closed():
        logger.debug("Browser steps using existing async worker event loop")
        future = asyncio.run_coroutine_threadsafe(coro, worker_handler.async_loop)
        return future.result()
    else:
        # Fallback: create a new event loop (for sync workers or if async loop not available)
        logger.debug("Browser steps creating temporary event loop")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)

# Dedicated event loop for ALL browser steps sessions
_browser_steps_loop = None
_browser_steps_thread = None
_browser_steps_loop_lock = threading.Lock()


def _start_browser_steps_loop():
    """Start a dedicated event loop for browser steps in its own thread"""
    global _browser_steps_loop

    # Create and set the event loop for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _browser_steps_loop = loop

    logger.debug("Browser steps event loop started")

    try:
        # Run the loop forever - handles all browsersteps sessions
        loop.run_forever()
    except Exception as e:
        logger.error(f"Browser steps event loop error: {e}")
    finally:
        try:
            # Cancel all remaining tasks
            pending = asyncio.all_tasks(loop)
            for task in pending:
                task.cancel()

            # Wait for tasks to finish cancellation
            if pending:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        except Exception as e:
            logger.debug(f"Error during browser steps loop cleanup: {e}")
        finally:
            loop.close()
            logger.debug("Browser steps event loop closed")

def _ensure_browser_steps_loop():
    """Ensure the browser steps event loop is running"""
    global _browser_steps_loop, _browser_steps_thread

    with _browser_steps_loop_lock:
        if _browser_steps_thread is None or not _browser_steps_thread.is_alive():
            logger.debug("Starting browser steps event loop thread")
            _browser_steps_thread = threading.Thread(
                target=_start_browser_steps_loop,
                daemon=True,
                name="BrowserStepsEventLoop"
            )
            _browser_steps_thread.start()

            # Wait for the loop to be ready
            timeout = 5.0
            start_time = time.time()
            while _browser_steps_loop is None:
                if time.time() - start_time > timeout:
                    raise RuntimeError("Browser steps event loop failed to start")
                time.sleep(0.01)

            logger.debug("Browser steps event loop thread started and ready")


def run_async_in_browser_loop(coro):
    """Run async coroutine using the dedicated browser steps event loop"""
    _ensure_browser_steps_loop()

    if _browser_steps_loop and not _browser_steps_loop.is_closed():
        logger.debug("Browser steps using dedicated event loop")
        future = asyncio.run_coroutine_threadsafe(coro, _browser_steps_loop)
        return future.result()
    else:
        raise RuntimeError("Browser steps event loop is not available")
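
For orientation (not part of the diff): a minimal, self-contained sketch of the pattern the new `run_async_in_browser_loop()` is built on, namely one long-lived event loop in a daemon thread, fed from synchronous code via `asyncio.run_coroutine_threadsafe()`. The names `_demo_loop`, `_loop_worker` and `fetch_title` are illustrative only; the real code polls for readiness instead of using a `threading.Event`.

```python
import asyncio
import threading

_demo_loop = None
_loop_ready = threading.Event()

def _loop_worker():
    # Runs in a daemon thread and owns the only event loop
    global _demo_loop
    _demo_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(_demo_loop)
    _loop_ready.set()
    _demo_loop.run_forever()

threading.Thread(target=_loop_worker, daemon=True, name="DemoEventLoop").start()
_loop_ready.wait(timeout=5)

async def fetch_title():
    await asyncio.sleep(0.1)   # stands in for Playwright work
    return "example title"

# Any synchronous thread can hand a coroutine to the background loop and block
# on the returned concurrent.futures.Future, which is what the blueprint does
# with start_browsersteps_session() and browserstepper.cleanup().
future = asyncio.run_coroutine_threadsafe(fetch_title(), _demo_loop)
print(future.result(timeout=10))
```

Keeping every browser-steps coroutine on one loop helps avoid the "attached to a different loop" failures that tend to appear when each request spins up its own temporary loop.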

def cleanup_expired_sessions():
    """Remove expired browsersteps sessions and cleanup their resources"""
    global browsersteps_sessions, browsersteps_watch_to_session

    expired_session_ids = []

    # Find expired sessions
    for session_id, session_data in browsersteps_sessions.items():
        browserstepper = session_data.get('browserstepper')
        if browserstepper and browserstepper.has_expired:
            expired_session_ids.append(session_id)

    # Cleanup expired sessions
    for session_id in expired_session_ids:
        logger.debug(f"Cleaning up expired browsersteps session {session_id}")
        session_data = browsersteps_sessions[session_id]

        # Cleanup playwright resources asynchronously
        browserstepper = session_data.get('browserstepper')
        if browserstepper:
            try:
                run_async_in_browser_loop(browserstepper.cleanup())
            except Exception as e:
                logger.error(f"Error cleaning up session {session_id}: {e}")

        # Remove from sessions dict
        del browsersteps_sessions[session_id]

        # Remove from watch mapping
        for watch_uuid, mapped_session_id in list(browsersteps_watch_to_session.items()):
            if mapped_session_id == session_id:
                del browsersteps_watch_to_session[watch_uuid]
                break

    if expired_session_ids:
        logger.info(f"Cleaned up {len(expired_session_ids)} expired browsersteps session(s)")


def cleanup_session_for_watch(watch_uuid):
    """Cleanup a specific browsersteps session for a watch UUID"""
    global browsersteps_sessions, browsersteps_watch_to_session

    session_id = browsersteps_watch_to_session.get(watch_uuid)
    if not session_id:
        logger.debug(f"No browsersteps session found for watch {watch_uuid}")
        return

    logger.debug(f"Cleaning up browsersteps session {session_id} for watch {watch_uuid}")

    session_data = browsersteps_sessions.get(session_id)
    if session_data:
        browserstepper = session_data.get('browserstepper')
        if browserstepper:
            try:
                run_async_in_browser_loop(browserstepper.cleanup())
            except Exception as e:
                logger.error(f"Error cleaning up session {session_id} for watch {watch_uuid}: {e}")

        # Remove from sessions dict
        del browsersteps_sessions[session_id]

    # Remove from watch mapping
    del browsersteps_watch_to_session[watch_uuid]

    logger.debug(f"Cleaned up session for watch {watch_uuid}")

    # Opportunistically cleanup any other expired sessions
    cleanup_expired_sessions()
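
The expiry check above relies on a `has_expired` property on the browser-stepper object, which this diff does not show. A hypothetical sketch of what such a time-based check typically looks like; the class name, timeout value and attributes are assumptions, not the real browser-stepper code:

```python
import time

class BrowserStepperSketch:
    # Hypothetical stand-in for the object stored under session_data['browserstepper']
    SESSION_MAX_AGE_SECONDS = 5 * 60   # illustrative value only

    def __init__(self):
        self._started_at = time.time()

    @property
    def has_expired(self) -> bool:
        # True once the interactive session has been open longer than allowed
        return (time.time() - self._started_at) > self.SESSION_MAX_AGE_SECONDS

    async def cleanup(self):
        # The real implementation would close the Playwright page/context here
        pass
```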

def construct_blueprint(datastore: ChangeDetectionStore):
    browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates")

@@ -123,6 +246,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
        if not watch_uuid:
            return make_response('No Watch UUID specified', 500)

        # Cleanup any existing session for this watch
        cleanup_session_for_watch(watch_uuid)

        logger.debug("Starting connection with playwright")
        logger.debug("browser_steps.py connecting")

@@ -131,6 +257,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
            browsersteps_sessions[browsersteps_session_id] = run_async_in_browser_loop(
                start_browsersteps_session(watch_uuid)
            )

            # Store the mapping of watch_uuid -> browsersteps_session_id
            browsersteps_watch_to_session[watch_uuid] = browsersteps_session_id

        except Exception as e:
            if 'ECONNREFUSED' in str(e):
                return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)

@@ -50,7 +50,8 @@
<td>{{ "{:,}".format(tag_count[uuid]) if uuid in tag_count else 0 }}</td>
<td class="title-col inline"> <a href="{{url_for('watchlist.index', tag=uuid) }}">{{ tag.title }}</a></td>
<td>
    <a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>
    <a href="{{ url_for('ui.form_watch_checknow', tag=uuid) }}" class="pure-button pure-button-primary" >{{ _('Recheck') }}</a>
    <a class="pure-button button-error"
       href="{{ url_for('tags.delete', uuid=uuid) }}"
       data-requires-confirm

@@ -238,6 +238,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
        datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid])
        flash(gettext("Updated watch - unpaused!") if request.args.get('unpause_on_save') else gettext("Updated watch."))

        # Cleanup any browsersteps session for this watch
        try:
            from changedetectionio.blueprint.browser_steps import cleanup_session_for_watch
            cleanup_session_for_watch(uuid)
        except Exception as e:
            logger.debug(f"Error cleaning up browsersteps session: {e}")

        # Re #286 - We wait for syncing new data to disk in another thread every 60 seconds
        # But in the case something is added we should save straight away
        datastore.needs_write_urgent = True

@@ -325,8 +332,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
            'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
        },
        'settings_application': datastore.data['settings']['application'],
        'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
        'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
        'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
        'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
        'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),

@@ -206,7 +206,7 @@ Math: {{ 1 + 1 }}") }}

<div class="tab-pane-inner" id="browser-steps">
    {% if capabilities.supports_browser_steps %}
        {% if visual_selector_data_ready %}
        {% if true %}
            <img class="beta-logo" src="{{url_for('static_content', group='images', filename='beta-logo.png')}}" alt="New beta functionality">
            <fieldset>
                <div class="pure-control-group">

@@ -1,3 +1,4 @@
import gc
import json
import os
from urllib.parse import urlparse

@@ -185,20 +186,33 @@ class fetcher(Fetcher):
        super().screenshot_step(step_n=step_n)
        screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)

        # Request GC immediately after screenshot to free memory
        # Screenshots can be large and browser steps take many of them
        await self.page.request_gc()

        if self.browser_steps_screenshot_path is not None:
            destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.jpeg'.format(step_n))
            logger.debug(f"Saving step screenshot to {destination}")
            with open(destination, 'wb') as f:
                f.write(screenshot)
        # Clear local reference to allow screenshot bytes to be collected
        del screenshot
        gc.collect()

    async def save_step_html(self, step_n):
        super().save_step_html(step_n=step_n)
        content = await self.page.content()

        # Request GC after getting page content
        await self.page.request_gc()

        destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
        logger.debug(f"Saving step HTML to {destination}")
        with open(destination, 'w', encoding='utf-8') as f:
            f.write(content)
        # Clear local reference
        del content
        gc.collect()
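
The pairing of `del` with `gc.collect()` above is about reclaiming multi-megabyte screenshot and HTML buffers as soon as they are written out, rather than whenever the surrounding coroutine eventually returns. A tiny standalone illustration of the same idea; the function name, buffer and sizes are made up, not from the fetcher:

```python
import gc
import os

def store_step_artifact(directory: str, step_n: int) -> str:
    """Write a large in-memory buffer to disk and release it promptly."""
    data = os.urandom(5 * 1024 * 1024)        # stands in for a large JPEG screenshot
    destination = os.path.join(directory, f"step_{step_n}.bin")
    with open(destination, "wb") as f:
        f.write(data)
    del data                                  # drop the only strong reference
    gc.collect()                              # reclaim now instead of "eventually"
    return destination
```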

    async def run(self,
                  fetch_favicon=True,

@@ -305,6 +319,12 @@ class fetcher(Fetcher):

            if self.status_code != 200 and not ignore_status_codes:
                screenshot = await capture_full_page_async(self.page, screenshot_format=self.screenshot_format)
                # Cleanup before raising to prevent memory leak
                await self.page.close()
                await context.close()
                await browser.close()
                # Force garbage collection to release Playwright resources immediately
                gc.collect()
                raise Non200ErrorCodeReceived(url=url, status_code=self.status_code, screenshot=screenshot)

            if not empty_pages_are_a_change and len((await self.page.content()).strip()) == 0:

@@ -313,48 +333,52 @@ class fetcher(Fetcher):
                await browser.close()
                raise EmptyReply(url=url, status_code=response.status)

            # Run Browser Steps here
            if self.browser_steps_get_valid_steps():
                await self.iterate_browser_steps(start_url=url)

            await self.page.wait_for_timeout(extra_wait * 1000)

            now = time.time()
            # So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
            if current_include_filters is not None:
                await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
            else:
                await self.page.evaluate("var include_filters=''")
            await self.page.request_gc()

            # request_gc before and after evaluate to free up memory
            # @todo browsersteps etc
            MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
            self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
                "visualselector_xpath_selectors": visualselector_xpath_selectors,
                "max_height": MAX_TOTAL_HEIGHT
            })
            await self.page.request_gc()

            self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
            await self.page.request_gc()

            self.content = await self.page.content()
            await self.page.request_gc()
            logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")


            # Bug 3 in Playwright screenshot handling
            # Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
            # JPEG is better here because the screenshots can be very very large

            # Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
            # which will significantly increase the IO size between the server and client, it's recommended to use the lowest
            # acceptable screenshot quality here

            # Wrap remaining operations in try/finally to ensure cleanup
            try:
                # Run Browser Steps here
                if self.browser_steps_get_valid_steps():
                    await self.iterate_browser_steps(start_url=url)

                await self.page.wait_for_timeout(extra_wait * 1000)

                now = time.time()
                # So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
                if current_include_filters is not None:
                    await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
                else:
                    await self.page.evaluate("var include_filters=''")
                await self.page.request_gc()

                # request_gc before and after evaluate to free up memory
                # @todo browsersteps etc
                MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
                self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
                    "visualselector_xpath_selectors": visualselector_xpath_selectors,
                    "max_height": MAX_TOTAL_HEIGHT
                })
                await self.page.request_gc()

                self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
                await self.page.request_gc()

                self.content = await self.page.content()
                await self.page.request_gc()
                logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")


                # Bug 3 in Playwright screenshot handling
                # Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
                # JPEG is better here because the screenshots can be very very large

                # Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
                # which will significantly increase the IO size between the server and client, it's recommended to use the lowest
                # acceptable screenshot quality here

                # The actual screenshot - this always base64 and needs decoding! horrible! huge CPU usage
                self.screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)

            except ScreenshotUnavailable:
                # Re-raise screenshot unavailable exceptions
                raise
            except Exception as e:
                # It's likely the screenshot was too long/big and something crashed
                raise ScreenshotUnavailable(url=url, status_code=self.status_code)

@@ -389,6 +413,10 @@ class fetcher(Fetcher):
                pass
            browser = None

            # Force Python GC to release Playwright resources immediately
            # Playwright objects can have circular references that delay cleanup
            gc.collect()
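
Taken together, the non-200 branch and this tail of `run()` converge on the same shape: close page, context and browser on every exit path, then nudge the Python GC. A condensed sketch of that shape, using a locally launched browser purely for brevity (the real fetcher connects to a remote sockpuppetbrowser via `PLAYWRIGHT_DRIVER_URL`; names here are placeholders):

```python
import gc
from playwright.async_api import async_playwright

async def fetch_once(url: str) -> str:
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url)
            return await page.content()
        finally:
            # Close child-to-parent on success *and* failure, then collect so
            # circular references inside Playwright objects are released now.
            await page.close()
            await context.close()
            await browser.close()
            gc.collect()
```

Something like `asyncio.run(fetch_once("https://example.com"))` would exercise it end to end.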

# Plugin registration for built-in fetcher
class PlaywrightFetcherPlugin:

@@ -15,7 +15,7 @@ class fetcher(Fetcher):
    proxy_url = None

    # Capability flags
    supports_browser_steps = True
    supports_browser_steps = False
    supports_screenshots = True
    supports_xpath_element_data = True
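
These class-level flags are what the `capabilities.supports_browser_steps` check in the edit template keys off. A rough, hypothetical sketch of how such flags could be gathered into a dict for a template context; the real wiring in changedetection.io is not shown in this diff:

```python
def collect_capabilities(fetcher_cls) -> dict:
    # Read the class-level capability flags with safe defaults
    return {
        "supports_browser_steps": getattr(fetcher_cls, "supports_browser_steps", False),
        "supports_screenshots": getattr(fetcher_cls, "supports_screenshots", False),
        "supports_xpath_element_data": getattr(fetcher_cls, "supports_xpath_element_data", False),
    }
```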
@@ -144,7 +144,6 @@ def test_basic_browserstep(client, live_server, measure_memory_usage, datastore_path):

def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage, datastore_path):

    four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)
    four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio')
    four_o_four_url = four_o_four_url.replace('localhost', 'cdio')

@@ -186,3 +185,65 @@ def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage, datastore_path):
        url_for("ui.form_delete", uuid="all"),
        follow_redirects=True
    )

def test_browsersteps_edit_UI_startsession(client, live_server, measure_memory_usage, datastore_path):

    assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"

    # Add a watch first
    test_url = url_for('test_interactive_html_endpoint', _external=True)
    test_url = test_url.replace('localhost.localdomain', 'cdio')
    test_url = test_url.replace('localhost', 'cdio')

    uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'fetch_backend': 'html_webdriver', 'paused': True})

    # Test starting a browsersteps session
    res = client.get(
        url_for("browser_steps.browsersteps_start_session", uuid=uuid),
        follow_redirects=True
    )

    assert res.status_code == 200
    assert res.is_json
    json_data = res.get_json()
    assert 'browsersteps_session_id' in json_data
    assert json_data['browsersteps_session_id']  # Not empty

    browsersteps_session_id = json_data['browsersteps_session_id']

    # Verify the session exists in browsersteps_sessions
    from changedetectionio.blueprint.browser_steps import browsersteps_sessions, browsersteps_watch_to_session
    assert browsersteps_session_id in browsersteps_sessions
    assert uuid in browsersteps_watch_to_session
    assert browsersteps_watch_to_session[uuid] == browsersteps_session_id

    # Verify browsersteps UI shows up on edit page
    res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
    assert b'browsersteps-click-start' in res.data, "Browsersteps manual UI shows up"

    # Session should still exist after GET (not cleaned up yet)
    assert browsersteps_session_id in browsersteps_sessions
    assert uuid in browsersteps_watch_to_session

    # Test cleanup happens on save (POST)
    res = client.post(
        url_for("ui.ui_edit.edit_page", uuid=uuid),
        data={
            "url": test_url,
            "tags": "",
            'fetch_backend': "html_webdriver",
            "time_between_check_use_default": "y",
        },
        follow_redirects=True
    )
    assert b"Updated watch" in res.data

    # NOW verify the session was cleaned up after save
    assert browsersteps_session_id not in browsersteps_sessions
    assert uuid not in browsersteps_watch_to_session

    # Cleanup
    client.get(
        url_for("ui.form_delete", uuid="all"),
        follow_redirects=True
    )