mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-04-10 13:07:57 +00:00
Compare commits
1 Commits
python-314
...
download-w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cdabdfeef1 |
@@ -354,6 +354,59 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
# Return a 500 error
|
# Return a 500 error
|
||||||
abort(500)
|
abort(500)
|
||||||
|
|
||||||
|
@edit_blueprint.route("/edit/<string:uuid>/get-data-package", methods=['GET'])
|
||||||
|
@login_optionally_required
|
||||||
|
def watch_get_data_package(uuid):
|
||||||
|
"""Download all data for a single watch as a zip file"""
|
||||||
|
from io import BytesIO
|
||||||
|
from flask import send_file
|
||||||
|
import zipfile
|
||||||
|
from pathlib import Path
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
if uuid == 'first':
|
||||||
|
uuid = list(datastore.data['watching'].keys()).pop()
|
||||||
|
|
||||||
|
watch = datastore.data['watching'].get(uuid)
|
||||||
|
if not watch:
|
||||||
|
abort(404)
|
||||||
|
|
||||||
|
# Create zip in memory
|
||||||
|
memory_file = BytesIO()
|
||||||
|
|
||||||
|
with zipfile.ZipFile(memory_file, 'w',
|
||||||
|
compression=zipfile.ZIP_DEFLATED,
|
||||||
|
compresslevel=8) as zipObj:
|
||||||
|
|
||||||
|
# Add the watch's JSON file if it exists
|
||||||
|
watch_json_path = os.path.join(watch.data_dir, 'watch.json')
|
||||||
|
if os.path.isfile(watch_json_path):
|
||||||
|
zipObj.write(watch_json_path,
|
||||||
|
arcname=os.path.join(uuid, 'watch.json'),
|
||||||
|
compress_type=zipfile.ZIP_DEFLATED,
|
||||||
|
compresslevel=8)
|
||||||
|
|
||||||
|
# Add all files in the watch data directory
|
||||||
|
if os.path.isdir(watch.data_dir):
|
||||||
|
for f in Path(watch.data_dir).glob('*'):
|
||||||
|
if f.is_file() and f.name != 'watch.json': # Skip watch.json since we already added it
|
||||||
|
zipObj.write(f,
|
||||||
|
arcname=os.path.join(uuid, f.name),
|
||||||
|
compress_type=zipfile.ZIP_DEFLATED,
|
||||||
|
compresslevel=8)
|
||||||
|
|
||||||
|
# Seek to beginning of file
|
||||||
|
memory_file.seek(0)
|
||||||
|
|
||||||
|
# Generate filename with timestamp
|
||||||
|
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||||
|
filename = f"watch-data-{uuid[:8]}-{timestamp}.zip"
|
||||||
|
|
||||||
|
return send_file(memory_file,
|
||||||
|
as_attachment=True,
|
||||||
|
download_name=filename,
|
||||||
|
mimetype='application/zip')
|
||||||
|
|
||||||
# Ajax callback
|
# Ajax callback
|
||||||
@edit_blueprint.route("/edit/<string:uuid>/preview-rendered", methods=['POST'])
|
@edit_blueprint.route("/edit/<string:uuid>/preview-rendered", methods=['POST'])
|
||||||
@login_optionally_required
|
@login_optionally_required
|
||||||
|
|||||||
@@ -488,6 +488,7 @@ Math: {{ 1 + 1 }}") }}
|
|||||||
{% if watch.history_n %}
|
{% if watch.history_n %}
|
||||||
<p>
|
<p>
|
||||||
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">{{ _('Download latest HTML snapshot') }}</a>
|
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">{{ _('Download latest HTML snapshot') }}</a>
|
||||||
|
<a href="{{url_for('ui.ui_edit.watch_get_data_package', uuid=uuid)}}" class="pure-button button-small">{{ _('Export watch data') }}</a>
|
||||||
</p>
|
</p>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
||||||
|
|||||||
@@ -75,4 +75,62 @@ def test_backup(client, live_server, measure_memory_usage, datastore_path):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
|
|
||||||
assert b'No backups found.' in res.data
|
assert b'No backups found.' in res.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_watch_data_package_download(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""Test downloading a single watch's data as a zip package"""
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
|
||||||
|
set_original_response(datastore_path=datastore_path)
|
||||||
|
|
||||||
|
# Add a watch
|
||||||
|
res = client.post(
|
||||||
|
url_for("imports.import_page"),
|
||||||
|
data={"urls": url_for('test_endpoint', _external=True)},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert b"1 Imported" in res.data
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Get the UUID directly from the datastore
|
||||||
|
# Find the watch directories
|
||||||
|
uuid = None
|
||||||
|
for item in os.listdir(datastore_path):
|
||||||
|
item_path = os.path.join(datastore_path, item)
|
||||||
|
if os.path.isdir(item_path) and len(item) == 36: # UUID format
|
||||||
|
uuid = item
|
||||||
|
break
|
||||||
|
|
||||||
|
assert uuid is not None, "Could not find watch UUID in datastore"
|
||||||
|
|
||||||
|
# Download the watch data package
|
||||||
|
res = client.get(
|
||||||
|
url_for("ui.ui_edit.watch_get_data_package", uuid=uuid),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should get the right zip content type
|
||||||
|
assert res.content_type == "application/zip"
|
||||||
|
|
||||||
|
# Should be PK/ZIP stream (PKzip header)
|
||||||
|
assert res.data[:2] == b'PK', "File should start with PK (PKzip header)"
|
||||||
|
assert res.data.count(b'PK') >= 2, "Should have multiple PK markers (zip file structure)"
|
||||||
|
|
||||||
|
# Verify zip contents
|
||||||
|
backup = ZipFile(io.BytesIO(res.data))
|
||||||
|
files = backup.namelist()
|
||||||
|
|
||||||
|
# Should have files in a UUID directory
|
||||||
|
assert any(uuid in f for f in files), f"Files should be in UUID directory: {files}"
|
||||||
|
|
||||||
|
# Should contain watch.json
|
||||||
|
watch_json_path = f"{uuid}/watch.json"
|
||||||
|
assert watch_json_path in files, f"Should contain watch.json, got: {files}"
|
||||||
|
|
||||||
|
# Should contain history/snapshot files
|
||||||
|
uuid4hex_txt = re.compile(f'^{re.escape(uuid)}/.*\\.txt', re.I)
|
||||||
|
txt_files = list(filter(uuid4hex_txt.match, files))
|
||||||
|
assert len(txt_files) > 0, f"Should have at least one .txt file (history/snapshot), got: {files}"
|
||||||
Reference in New Issue
Block a user