mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-09 11:06:47 +00:00
Compare commits
1 Commits
3423-opena
...
adjustable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8aa675cbf0 |
@@ -1547,6 +1547,7 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
# @todo handle ctrl break
|
# @todo handle ctrl break
|
||||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
||||||
threading.Thread(target=notification_runner).start()
|
threading.Thread(target=notification_runner).start()
|
||||||
|
threading.Thread(target=thread_maintain_worker_thread_pool).start()
|
||||||
|
|
||||||
# Check for new release version, but not when running in test/build or pytest
|
# Check for new release version, but not when running in test/build or pytest
|
||||||
if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True:
|
if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True:
|
||||||
@@ -1629,23 +1630,73 @@ def notification_runner():
|
|||||||
# Trim the log length
|
# Trim the log length
|
||||||
notification_debug_log = notification_debug_log[-100:]
|
notification_debug_log = notification_debug_log[-100:]
|
||||||
|
|
||||||
|
|
||||||
|
def thread_maintain_worker_thread_pool():
|
||||||
|
from changedetectionio import update_worker
|
||||||
|
|
||||||
|
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||||
|
|
||||||
|
while not app.config.exit.is_set():
|
||||||
|
needed_threads = n_workers if not running_update_threads else 0
|
||||||
|
how_many_running_now = 0
|
||||||
|
dead_threads = []
|
||||||
|
|
||||||
|
for i, t in enumerate(running_update_threads):
|
||||||
|
if t.is_alive():
|
||||||
|
how_many_running_now += 1
|
||||||
|
else:
|
||||||
|
dead_threads.append(i)
|
||||||
|
|
||||||
|
for i in dead_threads:
|
||||||
|
del running_update_threads[i]
|
||||||
|
|
||||||
|
for _ in range(needed_threads - how_many_running_now):
|
||||||
|
logger.info("Adding new worker thread")
|
||||||
|
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
||||||
|
running_update_threads.append(new_worker)
|
||||||
|
new_worker.start()
|
||||||
|
|
||||||
|
app.config.exit.wait(2)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def thread_maintain_worker_thread_pool():
|
||||||
|
from changedetectionio import update_worker
|
||||||
|
|
||||||
|
logger.info("Starting thread pool worker maintainer thread")
|
||||||
|
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||||
|
|
||||||
|
while not app.config.exit.is_set():
|
||||||
|
needed_threads = n_workers if not running_update_threads else 0
|
||||||
|
how_many_running_now = 0
|
||||||
|
dead_threads = []
|
||||||
|
|
||||||
|
for i, t in enumerate(running_update_threads):
|
||||||
|
if t.is_alive():
|
||||||
|
how_many_running_now += 1
|
||||||
|
else:
|
||||||
|
dead_threads.append(i)
|
||||||
|
|
||||||
|
for i in dead_threads:
|
||||||
|
del running_update_threads[i]
|
||||||
|
|
||||||
|
for _ in range(needed_threads - how_many_running_now):
|
||||||
|
logger.info("Adding new worker thread")
|
||||||
|
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
||||||
|
running_update_threads.append(new_worker)
|
||||||
|
new_worker.start()
|
||||||
|
|
||||||
|
app.config.exit.wait(2)
|
||||||
|
|
||||||
# Thread runner to check every minute, look for new watches to feed into the Queue.
|
# Thread runner to check every minute, look for new watches to feed into the Queue.
|
||||||
def ticker_thread_check_time_launch_checks():
|
def ticker_thread_check_time_launch_checks():
|
||||||
import random
|
import random
|
||||||
from changedetectionio import update_worker
|
|
||||||
|
|
||||||
proxy_last_called_time = {}
|
proxy_last_called_time = {}
|
||||||
|
|
||||||
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
|
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
|
||||||
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
|
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
|
||||||
|
|
||||||
# Spin up Workers that do the fetching
|
|
||||||
# Can be overriden by ENV or use the default settings
|
|
||||||
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
|
||||||
for _ in range(n_workers):
|
|
||||||
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
|
||||||
running_update_threads.append(new_worker)
|
|
||||||
new_worker.start()
|
|
||||||
|
|
||||||
while not app.config.exit.is_set():
|
while not app.config.exit.is_set():
|
||||||
|
|
||||||
@@ -1728,7 +1779,7 @@ def ticker_thread_check_time_launch_checks():
|
|||||||
priority = int(time.time())
|
priority = int(time.time())
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"> Queued watch UUID {uuid} "
|
f"> Queued watch UUID {uuid} "
|
||||||
f"last checked at {watch['last_checked']} "
|
f"last checked at {watch['last_checked']} ({seconds_since_last_recheck} seconds ago!) recheck min was :{recheck_time_minimum_seconds} "
|
||||||
f"queued at {now:0.2f} priority {priority} "
|
f"queued at {now:0.2f} priority {priority} "
|
||||||
f"jitter {watch.jitter_seconds:0.2f}s, "
|
f"jitter {watch.jitter_seconds:0.2f}s, "
|
||||||
f"{now - watch['last_checked']:0.2f}s since last checked")
|
f"{now - watch['last_checked']:0.2f}s since last checked")
|
||||||
|
|||||||
Reference in New Issue
Block a user