mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-06 09:35:48 +00:00
Compare commits
1 Commits
docker-bui
...
adjustable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8aa675cbf0 |
@@ -1547,6 +1547,7 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
# @todo handle ctrl break
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
||||
threading.Thread(target=notification_runner).start()
|
||||
threading.Thread(target=thread_maintain_worker_thread_pool).start()
|
||||
|
||||
# Check for new release version, but not when running in test/build or pytest
|
||||
if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True:
|
||||
@@ -1629,23 +1630,73 @@ def notification_runner():
|
||||
# Trim the log length
|
||||
notification_debug_log = notification_debug_log[-100:]
|
||||
|
||||
|
||||
def thread_maintain_worker_thread_pool():
|
||||
from changedetectionio import update_worker
|
||||
|
||||
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
needed_threads = n_workers if not running_update_threads else 0
|
||||
how_many_running_now = 0
|
||||
dead_threads = []
|
||||
|
||||
for i, t in enumerate(running_update_threads):
|
||||
if t.is_alive():
|
||||
how_many_running_now += 1
|
||||
else:
|
||||
dead_threads.append(i)
|
||||
|
||||
for i in dead_threads:
|
||||
del running_update_threads[i]
|
||||
|
||||
for _ in range(needed_threads - how_many_running_now):
|
||||
logger.info("Adding new worker thread")
|
||||
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
||||
running_update_threads.append(new_worker)
|
||||
new_worker.start()
|
||||
|
||||
app.config.exit.wait(2)
|
||||
|
||||
|
||||
|
||||
def thread_maintain_worker_thread_pool():
|
||||
from changedetectionio import update_worker
|
||||
|
||||
logger.info("Starting thread pool worker maintainer thread")
|
||||
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
needed_threads = n_workers if not running_update_threads else 0
|
||||
how_many_running_now = 0
|
||||
dead_threads = []
|
||||
|
||||
for i, t in enumerate(running_update_threads):
|
||||
if t.is_alive():
|
||||
how_many_running_now += 1
|
||||
else:
|
||||
dead_threads.append(i)
|
||||
|
||||
for i in dead_threads:
|
||||
del running_update_threads[i]
|
||||
|
||||
for _ in range(needed_threads - how_many_running_now):
|
||||
logger.info("Adding new worker thread")
|
||||
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
||||
running_update_threads.append(new_worker)
|
||||
new_worker.start()
|
||||
|
||||
app.config.exit.wait(2)
|
||||
|
||||
# Thread runner to check every minute, look for new watches to feed into the Queue.
|
||||
def ticker_thread_check_time_launch_checks():
|
||||
import random
|
||||
from changedetectionio import update_worker
|
||||
|
||||
proxy_last_called_time = {}
|
||||
|
||||
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
|
||||
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
|
||||
|
||||
# Spin up Workers that do the fetching
|
||||
# Can be overriden by ENV or use the default settings
|
||||
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||
for _ in range(n_workers):
|
||||
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
||||
running_update_threads.append(new_worker)
|
||||
new_worker.start()
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
|
||||
@@ -1728,7 +1779,7 @@ def ticker_thread_check_time_launch_checks():
|
||||
priority = int(time.time())
|
||||
logger.debug(
|
||||
f"> Queued watch UUID {uuid} "
|
||||
f"last checked at {watch['last_checked']} "
|
||||
f"last checked at {watch['last_checked']} ({seconds_since_last_recheck} seconds ago!) recheck min was :{recheck_time_minimum_seconds} "
|
||||
f"queued at {now:0.2f} priority {priority} "
|
||||
f"jitter {watch.jitter_seconds:0.2f}s, "
|
||||
f"{now - watch['last_checked']:0.2f}s since last checked")
|
||||
|
||||
Reference in New Issue
Block a user