mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-12-02 14:22:34 +00:00
Compare commits
7 Commits
UI-eta-tim
...
2782-clone
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b3d054a4a | ||
|
|
3d17a85c79 | ||
|
|
694a8e2fe7 | ||
|
|
6c1b687cd1 | ||
|
|
e850540a91 | ||
|
|
d4bc9dfc50 | ||
|
|
f26ea55e9c |
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
|
||||
__version__ = '0.49.8'
|
||||
__version__ = '0.49.9'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -1,102 +1 @@
|
||||
import time
|
||||
import datetime
|
||||
import pytz
|
||||
from flask import Blueprint, make_response, request, url_for
|
||||
from loguru import logger
|
||||
from feedgen.feed import FeedGenerator
|
||||
|
||||
from changedetectionio.store import ChangeDetectionStore
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
|
||||
def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
rss_blueprint = Blueprint('rss', __name__)
|
||||
|
||||
# Import the login decorator if needed
|
||||
# from changedetectionio.auth_decorator import login_optionally_required
|
||||
@rss_blueprint.route("", methods=['GET'])
|
||||
def feed():
|
||||
now = time.time()
|
||||
# Always requires token set
|
||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||
rss_url_token = request.args.get('token')
|
||||
if rss_url_token != app_rss_token:
|
||||
return "Access denied, bad token", 403
|
||||
|
||||
from changedetectionio import diff
|
||||
limit_tag = request.args.get('tag', '').lower().strip()
|
||||
# Be sure limit_tag is a uuid
|
||||
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
|
||||
if limit_tag == tag.get('title', '').lower().strip():
|
||||
limit_tag = uuid
|
||||
|
||||
# Sort by last_changed and add the uuid which is usually the key..
|
||||
sorted_watches = []
|
||||
|
||||
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
|
||||
for uuid, watch in datastore.data['watching'].items():
|
||||
# @todo tag notification_muted skip also (improve Watch model)
|
||||
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
|
||||
continue
|
||||
if limit_tag and not limit_tag in watch['tags']:
|
||||
continue
|
||||
watch['uuid'] = uuid
|
||||
sorted_watches.append(watch)
|
||||
|
||||
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
|
||||
|
||||
fg = FeedGenerator()
|
||||
fg.title('changedetection.io')
|
||||
fg.description('Feed description')
|
||||
fg.link(href='https://changedetection.io')
|
||||
|
||||
for watch in sorted_watches:
|
||||
|
||||
dates = list(watch.history.keys())
|
||||
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
|
||||
if len(dates) < 2:
|
||||
continue
|
||||
|
||||
if not watch.viewed:
|
||||
# Re #239 - GUID needs to be individual for each event
|
||||
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
|
||||
guid = "{}/{}".format(watch['uuid'], watch.last_changed)
|
||||
fe = fg.add_entry()
|
||||
|
||||
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
|
||||
# Description is the page you watch, link takes you to the diff JS UI page
|
||||
# Dict val base_url will get overriden with the env var if it is set.
|
||||
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
|
||||
|
||||
# Because we are called via whatever web server, flask should figure out the right path (
|
||||
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||
|
||||
fe.link(link=diff_link)
|
||||
|
||||
# @todo watch should be a getter - watch.get('title') (internally if URL else..)
|
||||
|
||||
watch_title = watch.get('title') if watch.get('title') else watch.get('url')
|
||||
fe.title(title=watch_title)
|
||||
|
||||
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(dates[-2]),
|
||||
newest_version_file_contents=watch.get_history_snapshot(dates[-1]),
|
||||
include_equal=False,
|
||||
line_feed_sep="<br>")
|
||||
|
||||
# @todo Make this configurable and also consider html-colored markup
|
||||
# @todo User could decide if <link> goes to the diff page, or to the watch link
|
||||
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
|
||||
content = jinja_render(template_str=rss_template, watch_title=watch_title, html_diff=html_diff, watch_url=watch.link)
|
||||
|
||||
fe.content(content=content, type='CDATA')
|
||||
|
||||
fe.guid(guid, permalink=False)
|
||||
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
||||
dt = dt.replace(tzinfo=pytz.UTC)
|
||||
fe.pubDate(dt)
|
||||
|
||||
response = make_response(fg.rss_str())
|
||||
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
|
||||
return response
|
||||
|
||||
return rss_blueprint
|
||||
RSS_FORMAT_TYPES = [('plaintext', 'Plain text'), ('html', 'HTML Color')]
|
||||
|
||||
147
changedetectionio/blueprint/rss/blueprint.py
Normal file
147
changedetectionio/blueprint/rss/blueprint.py
Normal file
@@ -0,0 +1,147 @@
|
||||
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.store import ChangeDetectionStore
|
||||
from feedgen.feed import FeedGenerator
|
||||
from flask import Blueprint, make_response, request, url_for, redirect
|
||||
from loguru import logger
|
||||
import datetime
|
||||
import pytz
|
||||
import re
|
||||
import time
|
||||
|
||||
|
||||
BAD_CHARS_REGEX=r'[\x00-\x08\x0B\x0C\x0E-\x1F]'
|
||||
|
||||
# Anything that is not text/UTF-8 should be stripped before it breaks feedgen (such as binary data etc)
|
||||
def scan_invalid_chars_in_rss(content):
|
||||
for match in re.finditer(BAD_CHARS_REGEX, content):
|
||||
i = match.start()
|
||||
bad_char = content[i]
|
||||
hex_value = f"0x{ord(bad_char):02x}"
|
||||
# Grab context
|
||||
start = max(0, i - 20)
|
||||
end = min(len(content), i + 21)
|
||||
context = content[start:end].replace('\n', '\\n').replace('\r', '\\r')
|
||||
logger.warning(f"Invalid char {hex_value} at pos {i}: ...{context}...")
|
||||
# First match is enough
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def clean_entry_content(content):
|
||||
cleaned = re.sub(BAD_CHARS_REGEX, '', content)
|
||||
return cleaned
|
||||
|
||||
def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
rss_blueprint = Blueprint('rss', __name__)
|
||||
|
||||
# Some RSS reader situations ended up with rss/ (forward slash after RSS) due
|
||||
# to some earlier blueprint rerouting work, it should goto feed.
|
||||
@rss_blueprint.route("/", methods=['GET'])
|
||||
def extraslash():
|
||||
return redirect(url_for('rss.feed'))
|
||||
|
||||
# Import the login decorator if needed
|
||||
# from changedetectionio.auth_decorator import login_optionally_required
|
||||
@rss_blueprint.route("", methods=['GET'])
|
||||
def feed():
|
||||
now = time.time()
|
||||
# Always requires token set
|
||||
app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
|
||||
rss_url_token = request.args.get('token')
|
||||
if rss_url_token != app_rss_token:
|
||||
return "Access denied, bad token", 403
|
||||
|
||||
from changedetectionio import diff
|
||||
limit_tag = request.args.get('tag', '').lower().strip()
|
||||
# Be sure limit_tag is a uuid
|
||||
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
|
||||
if limit_tag == tag.get('title', '').lower().strip():
|
||||
limit_tag = uuid
|
||||
|
||||
# Sort by last_changed and add the uuid which is usually the key..
|
||||
sorted_watches = []
|
||||
|
||||
# @todo needs a .itemsWithTag() or something - then we can use that in Jinaj2 and throw this away
|
||||
for uuid, watch in datastore.data['watching'].items():
|
||||
# @todo tag notification_muted skip also (improve Watch model)
|
||||
if datastore.data['settings']['application'].get('rss_hide_muted_watches') and watch.get('notification_muted'):
|
||||
continue
|
||||
if limit_tag and not limit_tag in watch['tags']:
|
||||
continue
|
||||
watch['uuid'] = uuid
|
||||
sorted_watches.append(watch)
|
||||
|
||||
sorted_watches.sort(key=lambda x: x.last_changed, reverse=False)
|
||||
|
||||
fg = FeedGenerator()
|
||||
fg.title('changedetection.io')
|
||||
fg.description('Feed description')
|
||||
fg.link(href='https://changedetection.io')
|
||||
|
||||
html_colour_enable = False
|
||||
if datastore.data['settings']['application'].get('rss_content_format') == 'html':
|
||||
html_colour_enable = True
|
||||
|
||||
for watch in sorted_watches:
|
||||
|
||||
dates = list(watch.history.keys())
|
||||
# Re #521 - Don't bother processing this one if theres less than 2 snapshots, means we never had a change detected.
|
||||
if len(dates) < 2:
|
||||
continue
|
||||
|
||||
if not watch.viewed:
|
||||
# Re #239 - GUID needs to be individual for each event
|
||||
# @todo In the future make this a configurable link back (see work on BASE_URL https://github.com/dgtlmoon/changedetection.io/pull/228)
|
||||
guid = "{}/{}".format(watch['uuid'], watch.last_changed)
|
||||
fe = fg.add_entry()
|
||||
|
||||
# Include a link to the diff page, they will have to login here to see if password protection is enabled.
|
||||
# Description is the page you watch, link takes you to the diff JS UI page
|
||||
# Dict val base_url will get overriden with the env var if it is set.
|
||||
ext_base_url = datastore.data['settings']['application'].get('active_base_url')
|
||||
# @todo fix
|
||||
|
||||
# Because we are called via whatever web server, flask should figure out the right path (
|
||||
diff_link = {'href': url_for('ui.ui_views.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||
|
||||
fe.link(link=diff_link)
|
||||
|
||||
# @todo watch should be a getter - watch.get('title') (internally if URL else..)
|
||||
|
||||
watch_title = watch.get('title') if watch.get('title') else watch.get('url')
|
||||
fe.title(title=watch_title)
|
||||
try:
|
||||
|
||||
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(dates[-2]),
|
||||
newest_version_file_contents=watch.get_history_snapshot(dates[-1]),
|
||||
include_equal=False,
|
||||
line_feed_sep="<br>",
|
||||
html_colour=html_colour_enable
|
||||
)
|
||||
except FileNotFoundError as e:
|
||||
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
|
||||
|
||||
# @todo Make this configurable and also consider html-colored markup
|
||||
# @todo User could decide if <link> goes to the diff page, or to the watch link
|
||||
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
|
||||
|
||||
content = jinja_render(template_str=rss_template, watch_title=watch_title, html_diff=html_diff, watch_url=watch.link)
|
||||
|
||||
# Out of range chars could also break feedgen
|
||||
if scan_invalid_chars_in_rss(content):
|
||||
content = clean_entry_content(content)
|
||||
|
||||
fe.content(content=content, type='CDATA')
|
||||
fe.guid(guid, permalink=False)
|
||||
dt = datetime.datetime.fromtimestamp(int(watch.newest_history_key))
|
||||
dt = dt.replace(tzinfo=pytz.UTC)
|
||||
fe.pubDate(dt)
|
||||
|
||||
response = make_response(fg.rss_str())
|
||||
response.headers.set('Content-Type', 'application/rss+xml;charset=utf-8')
|
||||
logger.trace(f"RSS generated in {time.time() - now:.3f}s")
|
||||
return response
|
||||
|
||||
return rss_blueprint
|
||||
@@ -78,7 +78,10 @@
|
||||
{{ render_field(form.application.form.pager_size) }}
|
||||
<span class="pure-form-message-inline">Number of items per page in the watch overview list, 0 to disable.</span>
|
||||
</div>
|
||||
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.application.form.rss_content_format) }}
|
||||
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.extract_title_as_title) }}
|
||||
<span class="pure-form-message-inline">Note: This will automatically apply to all existing watches.</span>
|
||||
|
||||
@@ -96,12 +96,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, running_updat
|
||||
uuid = list(datastore.data['watching'].keys()).pop()
|
||||
|
||||
new_uuid = datastore.clone(uuid)
|
||||
if new_uuid:
|
||||
if not datastore.data['watching'].get(uuid).get('paused'):
|
||||
update_q.put(queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))
|
||||
flash('Cloned.')
|
||||
|
||||
return redirect(url_for('watchlist.index'))
|
||||
if not datastore.data['watching'].get(uuid).get('paused'):
|
||||
update_q.put(queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))
|
||||
|
||||
flash('Cloned, you are editing the new watch.')
|
||||
|
||||
return redirect(url_for("ui.ui_edit.edit_page", uuid=new_uuid))
|
||||
|
||||
@ui_blueprint.route("/checknow", methods=['GET'])
|
||||
@login_optionally_required
|
||||
|
||||
@@ -433,7 +433,7 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
import changedetectionio.conditions.blueprint as conditions
|
||||
app.register_blueprint(conditions.construct_blueprint(datastore), url_prefix='/conditions')
|
||||
|
||||
import changedetectionio.blueprint.rss as rss
|
||||
import changedetectionio.blueprint.rss.blueprint as rss
|
||||
app.register_blueprint(rss.construct_blueprint(datastore), url_prefix='/rss')
|
||||
|
||||
# watchlist UI buttons etc
|
||||
|
||||
@@ -3,6 +3,7 @@ import re
|
||||
from loguru import logger
|
||||
from wtforms.widgets.core import TimeInput
|
||||
|
||||
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES
|
||||
from changedetectionio.conditions.form import ConditionFormRow
|
||||
from changedetectionio.strtobool import strtobool
|
||||
|
||||
@@ -739,6 +740,9 @@ class globalSettingsApplicationForm(commonSettingsForm):
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=0,
|
||||
message="Should be atleast zero (disabled)")])
|
||||
|
||||
rss_content_format = SelectField('RSS Content format', choices=RSS_FORMAT_TYPES)
|
||||
|
||||
removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
|
||||
render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
|
||||
shared_diff_access = BooleanField('Allow access to view diff page when password is enabled', default=False, validators=[validators.Optional()])
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
from os import getenv
|
||||
|
||||
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES
|
||||
|
||||
from changedetectionio.notification import (
|
||||
default_notification_body,
|
||||
default_notification_format,
|
||||
@@ -9,6 +12,8 @@ from changedetectionio.notification import (
|
||||
_FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT = 6
|
||||
DEFAULT_SETTINGS_HEADERS_USERAGENT='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.66 Safari/537.36'
|
||||
|
||||
|
||||
|
||||
class model(dict):
|
||||
base_config = {
|
||||
'note': "Hello! If you change this file manually, please be sure to restart your changedetection.io instance!",
|
||||
@@ -48,6 +53,7 @@ class model(dict):
|
||||
'password': False,
|
||||
'render_anchor_tag_content': False,
|
||||
'rss_access_token': None,
|
||||
'rss_content_format': RSS_FORMAT_TYPES[0][0],
|
||||
'rss_hide_muted_watches': True,
|
||||
'schema_version' : 0,
|
||||
'shared_diff_access': False,
|
||||
|
||||
@@ -251,8 +251,14 @@ class ChangeDetectionStore:
|
||||
# Clone a watch by UUID
|
||||
def clone(self, uuid):
|
||||
url = self.data['watching'][uuid].get('url')
|
||||
extras = self.data['watching'][uuid]
|
||||
extras = deepcopy(self.data['watching'][uuid])
|
||||
new_uuid = self.add_watch(url=url, extras=extras)
|
||||
watch = self.data['watching'][new_uuid]
|
||||
|
||||
if self.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
|
||||
# Because it will be recalculated on the next fetch
|
||||
self.data['watching'][new_uuid]['title'] = None
|
||||
|
||||
return new_uuid
|
||||
|
||||
def url_exists(self, url):
|
||||
@@ -363,7 +369,6 @@ class ChangeDetectionStore:
|
||||
new_watch.ensure_data_dir_exists()
|
||||
self.__data['watching'][new_uuid] = new_watch
|
||||
|
||||
|
||||
if write_to_disk_now:
|
||||
self.sync_to_json()
|
||||
|
||||
|
||||
@@ -157,15 +157,13 @@
|
||||
<h4>Try our Chrome extension</h4>
|
||||
<p>
|
||||
<a id="chrome-extension-link"
|
||||
title="Try our new Chrome Extension!"
|
||||
title="Chrome Extension - Web Page Change Detection with changedetection.io!"
|
||||
href="https://chromewebstore.google.com/detail/changedetectionio-website/kefcfmgmlhmankjmnbijimhofdjekbop">
|
||||
<img alt="Chrome store icon" src="{{url_for('static_content', group='images', filename='Google-Chrome-icon.png')}}">
|
||||
Chrome Webstore
|
||||
</a>
|
||||
</p>
|
||||
|
||||
Easily add the current web-page from your browser directly into your changedetection.io tool, more great features coming soon!
|
||||
|
||||
<h4>Changedetection.io needs your support!</h4>
|
||||
<p>
|
||||
You can help us by supporting changedetection.io on these platforms;
|
||||
@@ -173,17 +171,20 @@
|
||||
<p>
|
||||
<ul>
|
||||
<li>
|
||||
<a href="https://alternativeto.net/software/changedetection-io/about/">Rate us at
|
||||
<a href="https://alternativeto.net/software/changedetection-io/about/" title="Web page change detection at alternativeto.net">Rate us at
|
||||
AlternativeTo.net</a>
|
||||
</li>
|
||||
<li>
|
||||
<a href="https://github.com/dgtlmoon/changedetection.io">Star us on GitHub</a>
|
||||
<a href="https://github.com/dgtlmoon/changedetection.io" title="Web page change detection on GitHub">Star us on GitHub</a>
|
||||
</li>
|
||||
<li>
|
||||
<a href="https://twitter.com/change_det_io">Follow us at Twitter/X</a>
|
||||
<a rel="nofollow" href="https://twitter.com/change_det_io" title="Web page change detection on Twitter">Follow us at Twitter/X</a>
|
||||
</li>
|
||||
<li>
|
||||
<a href="https://www.linkedin.com/company/changedetection-io">Check us out on LinkedIn</a>
|
||||
<a rel="nofollow" href="https://www.g2.com/products/changedetection-io/reviews" title="Web page change detection reviews at G2">G2 Software reviews</a>
|
||||
</li>
|
||||
<li>
|
||||
<a rel="nofollow" href="https://www.linkedin.com/company/changedetection-io" title="Visit web page change detection at LinkedIn">Check us out on LinkedIn</a>
|
||||
</li>
|
||||
<li>
|
||||
And tell your friends and colleagues :)
|
||||
|
||||
@@ -588,10 +588,10 @@ keyword") }}
|
||||
{{ render_button(form.save_button) }}
|
||||
<a href="{{url_for('ui.form_delete', uuid=uuid)}}"
|
||||
class="pure-button button-small button-error ">Delete</a>
|
||||
<a href="{{url_for('ui.clear_watch_history', uuid=uuid)}}"
|
||||
class="pure-button button-small button-error ">Clear History</a>
|
||||
{% if watch.history_n %}<a href="{{url_for('ui.clear_watch_history', uuid=uuid)}}"
|
||||
class="pure-button button-small button-error ">Clear History</a>{% endif %}
|
||||
<a href="{{url_for('ui.form_clone', uuid=uuid)}}"
|
||||
class="pure-button button-small ">Create Copy</a>
|
||||
class="pure-button button-small ">Clone & Edit</a>
|
||||
</div>
|
||||
</div>
|
||||
</form>
|
||||
|
||||
@@ -2,29 +2,39 @@
|
||||
|
||||
import time
|
||||
from flask import url_for
|
||||
from . util import live_server_setup
|
||||
from .util import live_server_setup, wait_for_all_checks
|
||||
|
||||
|
||||
|
||||
def test_trigger_functionality(client, live_server, measure_memory_usage):
|
||||
def test_clone_functionality(client, live_server, measure_memory_usage):
|
||||
|
||||
live_server_setup(live_server)
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write("<html><body>Some content</body></html>")
|
||||
|
||||
# Give the endpoint time to spin up
|
||||
time.sleep(1)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Add our URL to the import page
|
||||
res = client.post(
|
||||
url_for("imports.import_page"),
|
||||
data={"urls": "https://changedetection.io"},
|
||||
data={"urls": test_url},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"1 Imported" in res.data
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# So that we can be sure the same history doesnt carry over
|
||||
time.sleep(1)
|
||||
|
||||
res = client.get(
|
||||
url_for("ui.form_clone", uuid="first"),
|
||||
follow_redirects=True
|
||||
)
|
||||
existing_uuids = set()
|
||||
|
||||
assert b"Cloned." in res.data
|
||||
for uuid, watch in live_server.app.config['DATASTORE'].data['watching'].items():
|
||||
new_uuids = set(watch.history.keys())
|
||||
duplicates = existing_uuids.intersection(new_uuids)
|
||||
assert len(duplicates) == 0
|
||||
existing_uuids.update(new_uuids)
|
||||
|
||||
assert b"Cloned" in res.data
|
||||
|
||||
@@ -273,6 +273,7 @@ def test_limit_tag_ui(client, live_server, measure_memory_usage):
|
||||
assert b'Deleted' in res.data
|
||||
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
|
||||
assert b'All tags deleted' in res.data
|
||||
|
||||
def test_clone_tag_on_import(client, live_server, measure_memory_usage):
|
||||
#live_server_setup(live_server)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
@@ -292,6 +293,7 @@ def test_clone_tag_on_import(client, live_server, measure_memory_usage):
|
||||
res = client.get(url_for("ui.form_clone", uuid=watch_uuid), follow_redirects=True)
|
||||
|
||||
assert b'Cloned' in res.data
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
# 2 times plus the top link to tag
|
||||
assert res.data.count(b'test-tag') == 3
|
||||
assert res.data.count(b'another-tag') == 3
|
||||
@@ -317,8 +319,9 @@ def test_clone_tag_on_quickwatchform_add(client, live_server, measure_memory_usa
|
||||
|
||||
watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
||||
res = client.get(url_for("ui.form_clone", uuid=watch_uuid), follow_redirects=True)
|
||||
|
||||
assert b'Cloned' in res.data
|
||||
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
# 2 times plus the top link to tag
|
||||
assert res.data.count(b'test-tag') == 3
|
||||
assert res.data.count(b'another-tag') == 3
|
||||
|
||||
@@ -49,6 +49,22 @@ def set_original_cdata_xml():
|
||||
f.write(test_return_data)
|
||||
|
||||
|
||||
|
||||
def set_html_content(content):
|
||||
test_return_data = f"""<html>
|
||||
<body>
|
||||
Some initial text<br>
|
||||
<p>{content}</p>
|
||||
<br>
|
||||
So let's see what happens. <br>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
# Write as UTF-8 encoded bytes
|
||||
with open("test-datastore/endpoint-content.txt", "wb") as f:
|
||||
f.write(test_return_data.encode('utf-8'))
|
||||
|
||||
def test_setup(client, live_server, measure_memory_usage):
|
||||
live_server_setup(live_server)
|
||||
|
||||
@@ -164,3 +180,58 @@ def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
|
||||
assert b'Some other description' not in res.data # Should NOT be selected by the xpath
|
||||
|
||||
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
||||
|
||||
|
||||
def test_rss_bad_chars_breaking(client, live_server):
|
||||
"""This should absolutely trigger the RSS builder to go into worst state mode
|
||||
|
||||
- source: prefix means no html conversion (which kinda filters out the bad stuff)
|
||||
- Binary data
|
||||
- Very long so that the saving is performed by Brotli (and decoded back to bytes)
|
||||
|
||||
Otherwise feedgen should support regular unicode
|
||||
"""
|
||||
#live_server_setup(live_server)
|
||||
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
ten_kb_string = "A" * 10_000
|
||||
f.write(ten_kb_string)
|
||||
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
res = client.post(
|
||||
url_for("imports.import_page"),
|
||||
data={"urls": "source:"+test_url},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"1 Imported" in res.data
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Set the bad content
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
jpeg_bytes = "\xff\xd8\xff\xe0\x00\x10XXXXXXXX\x00\x01\x02\x00\x00\x01\x00\x01\x00\x00" # JPEG header
|
||||
jpeg_bytes += "A" * 10_000
|
||||
|
||||
f.write(jpeg_bytes)
|
||||
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
assert b'Queued 1 watch for rechecking.' in res.data
|
||||
wait_for_all_checks(client)
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
|
||||
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
||||
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n == 2
|
||||
|
||||
# Check RSS feed is still working
|
||||
res = client.get(
|
||||
url_for("rss.feed", uuid=uuid, token=rss_token),
|
||||
follow_redirects=False # Important! leave this off! it should not redirect
|
||||
)
|
||||
assert res.status_code == 200
|
||||
|
||||
#assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n == 2
|
||||
#assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n == 2
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -173,7 +173,7 @@ def live_server_setup(live_server):
|
||||
return resp
|
||||
|
||||
# Tried using a global var here but didn't seem to work, so reading from a file instead.
|
||||
with open("test-datastore/endpoint-content.txt", "r") as f:
|
||||
with open("test-datastore/endpoint-content.txt", "rb") as f:
|
||||
resp = make_response(f.read(), status_code)
|
||||
if uppercase_headers:
|
||||
resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
|
||||
|
||||
Reference in New Issue
Block a user