puter/extensions/legacyFileSystem/storage/S3StorageController.js

import { AbortMultipartUploadCommand, CompleteMultipartUploadCommand, CopyObjectCommand, CreateMultipartUploadCommand, DeleteObjectCommand, GetObjectCommand, PutObjectCommand, UploadPartCommand, UploadPartCopyCommand } from '@aws-sdk/client-s3';
import { Readable } from 'stream';
import { TeePromise } from 'teepromise';
const { s3ClientProvider } = extension.import('data');
const {
    chunk_stream,
} = extension.import('core').util.streamutil;
const {
    simple_retry,
} = extension.import('core').util.retryutil;
const {
    EWMA,
} = extension.import('core').util.opmath;
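
/**
 * S3-backed storage controller for the legacy filesystem extension.
 * Handles uploads (buffered PutObject and streaming multipart), copies
 * (simple and multipart), deletes, and ranged reads against
 * region-scoped S3 clients.
 *
 * A hypothetical caller (argument shape inferred from this file):
 *
 *   const storage = new S3StorageController();
 *   await storage.init();
 *   await storage.upload({
 *       uid,                               // S3 object key
 *       file,                              // { buffer, size } or { stream, size }
 *       storage_meta: { bucket, bucket_region },
 *       storage_api: { progress_tracker }, // exposes set_total() and set()
 *   });
 */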
export default class S3StorageController {
    forceDefault = true;

    async init () {
        this.clients_ = {};
        // NOTE: global_config is expected to be provided by the hosting
        // runtime; it is not defined in this file.
        this.config = global_config;

        // Moving average of per-part upload time; updated after each part
        // upload and passed to chunk_stream to inform chunk sizing.
        this.global_average_S3_part_time = new EWMA({
            initial: 4000, // average from local testing
            alpha: 0.1,
        });
    }

    #get_client (region) {
        return s3ClientProvider.get(region);
    }
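
    /**
     * Upload a file to S3. Buffered files go up in a single PutObject;
     * streamed files go through a multipart upload. Both paths retry via
     * simple_retry(fn, 3, 200) and report progress through
     * storage_api.progress_tracker.
     */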
    async upload ({ uid, file, storage_meta, storage_api }) {
        const { progress_tracker } = storage_api;
        const {
            bucket_region,
            bucket,
        } = storage_meta;
        const client = this.#get_client(bucket_region);

        // Buffered path: a single PutObject is sufficient.
        if ( file.buffer ) {
            const [s3_error, s3_eventual_success, _s3_resp] = await simple_retry(async () => {
                const ret = await client.send(new PutObjectCommand({
                    Bucket: bucket,
                    Key: uid,
                    Body: file.buffer,
                }));
                progress_tracker.set_total(file.size);
                progress_tracker.set(file.size);
                return ret;
            }, 3, 200);
            if ( ! s3_eventual_success ) {
                throw s3_error;
            }
            return; // buffered path done; streaming path follows
        }

        // Streaming path: multipart upload.
        const [s3_error, s3_eventual_success, _s3_resp] = await simple_retry(async () => {
            return await this.#upload_stream({
                bucket_region,
                bucket,
                key: uid,
                stream: file.stream,
                on_progress: evt => {
                    progress_tracker.set_total(file.size);
                    progress_tracker.set(evt.uploaded);
                },
            });
        }, 3, 200);
        if ( ! s3_eventual_success ) {
            throw s3_error;
        }
    }
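
    /**
     * Copy an object between S3 locations. Objects safely below S3's
     * 5 GB CopyObject limit are copied in one request; larger objects
     * use a multipart copy with progress reporting.
     */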
    async copy ({ src_node, dst_storage, storage_api }) {
        const {
            progress_tracker,
        } = storage_api;
        const src_storage = await src_node.get('s3:location');
        const size = await src_node.get('size');

        // Single-request copy for objects comfortably under the
        // CopyObject size limit (threshold: ~4 GB, decimal).
        if ( size < 4 * 1000 ** 3 - 100 ) {
            const ret = await this.#copy_simple({
                src_key: src_storage.key,
                src_bucket: src_storage.bucket,
                dst_key: dst_storage.key,
                dst_bucket_region: dst_storage.bucket_region,
                dst_bucket: dst_storage.bucket,
            });
            progress_tracker.set_total(size);
            progress_tracker.set(size);
            return ret;
        }

        return await this.#copy_multipart({
            src_key: src_storage.key,
            src_bucket: src_storage.bucket,
            dst_key: dst_storage.key,
            dst_bucket_region: dst_storage.bucket_region,
            dst_bucket: dst_storage.bucket,
            size,
            on_progress: evt => {
                progress_tracker.set_total(size);
                progress_tracker.set(evt.uploaded);
            },
        });
    }
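
    /**
     * Delete the object at the node's stored S3 location.
     */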
    async delete ({ node }) {
        const node_storage = await node.get('s3:location');
        const client = this.#get_client(node_storage.bucket_region);
        return await client.send(new DeleteObjectCommand({
            Bucket: node_storage.bucket,
            Key: node_storage.key,
        }));
    }
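
    /**
     * Read an object as a Node.js Readable stream, optionally limited to
     * a byte range (e.g. 'bytes=0-1023') and/or a specific version.
     */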
    async read ({ location, range, version_id }) {
        const { bucket_region, bucket, key } = location;
        const client = this.#get_client(bucket_region);
        const response = await client.send(new GetObjectCommand({
            Bucket: bucket,
            Key: key,
            ...(range ? { Range: range } : {}),
            ...(version_id ? { VersionId: version_id } : {}),
        }));
        const stream = Readable.from(response.Body);
        return stream;
    }
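
    /**
     * Stream a multipart upload to S3 in 5MB parts, with at most 4 parts
     * in flight at once. Each part's upload time feeds the EWMA used for
     * adaptive chunk sizing; empty streams still upload one zero-byte part
     * so the multipart upload can be completed. On failure, the multipart
     * upload is aborted and the error is rethrown.
     */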
    async #upload_stream ({ bucket_region, bucket, key, stream, on_progress }) {
        const client = this.#get_client(bucket_region);
        const multipart_upload = await client.send(new CreateMultipartUploadCommand({
            Bucket: bucket,
            Key: key,
        }));
        let ret; // return value
        try {
            const part_size = 1024 * 1024 * 5; // 5MB

            // get each part while streaming
            const chunk_iterator = chunk_stream(
                stream,
                part_size,
                this.global_average_S3_part_time,
            );

            let i = 0;
            let uploaded_bytes = 0;
            const upload_promises = [];
            const upload_results = [];
            let tp;
            let count_parts_being_uploaded = 0;
            let check_queue;
            let queue_empty_promise = null;

            const upload_part = async part => {
                // Backpressure: wait for a slot before starting a fifth
                // concurrent part upload.
                if ( count_parts_being_uploaded >= 4 ) {
                    console.log('too many concurrent part uploads; halting');
                    tp = new TeePromise();
                    await tp;
                }
                const part_number = ++i;
                count_parts_being_uploaded++;
                const upload_promise = (async () => {
                    const ts_start = Date.now();
                    const [err, success, result] = await simple_retry(async () => {
                        return await client.send(new UploadPartCommand({
                            Bucket: bucket,
                            Key: key,
                            PartNumber: part_number,
                            UploadId: multipart_upload.UploadId,
                            Body: part,
                        }));
                    }, 3, 50);
                    if ( err || !success ) {
                        throw err;
                    }
                    const ts_end = Date.now();
                    const elapsed = ts_end - ts_start;
                    // Normalize elapsed time to a full-size part before
                    // feeding the moving average.
                    const elapsed_per_part_size = elapsed * (part.length / part_size);
                    this.global_average_S3_part_time.put(elapsed_per_part_size);
                    uploaded_bytes += part.length;
                    on_progress({ uploaded: uploaded_bytes });
                    count_parts_being_uploaded--;
                    // Wake a waiter blocked on the concurrency cap, if any.
                    if ( tp ) {
                        const p = tp;
                        tp = null;
                        p.resolve();
                    }
                    check_queue();
                    return result;
                })();
                upload_promises.push(upload_promise);
            };

            // Deferred-part queue, drained as uploads complete. (Nothing in
            // this version enqueues to it, so the drain path is effectively
            // inert.)
            const part_queue = [];
            check_queue = () => {
                if ( part_queue.length > 0 ) {
                    const part = part_queue.shift();
                    upload_part(part);
                    if ( part_queue.length == 0 ) {
                        if ( queue_empty_promise ) {
                            const p = queue_empty_promise;
                            queue_empty_promise = null;
                            p.resolve();
                        }
                    }
                }
            };

            for await ( const chunk of chunk_iterator ) {
                await upload_part(chunk);
            }

            // If the file is empty we still need to upload a part
            if ( i === 0 ) {
                const upload_promise = (async () => {
                    const [err, success, result] = await simple_retry(async () => {
                        return await client.send(new UploadPartCommand({
                            Bucket: bucket,
                            Key: key,
                            PartNumber: 1,
                            UploadId: multipart_upload.UploadId,
                            Body: Buffer.alloc(0),
                        }));
                    }, 3, 50);
                    if ( err || !success ) {
                        throw err;
                    }
                    on_progress({ uploaded: uploaded_bytes });
                    return result;
                })();
                upload_promises.push(upload_promise);
            }

            if ( part_queue.length > 0 ) {
                queue_empty_promise = new TeePromise();
                await queue_empty_promise;
            }

            const some_results = await Promise.all(upload_promises);
            upload_results.push(...some_results);

            try {
                // complete the upload
                ret = await client.send(new CompleteMultipartUploadCommand({
                    Bucket: bucket,
                    Key: key,
                    UploadId: multipart_upload.UploadId,
                    MultipartUpload: {
                        Parts: upload_results.map((_, i) => ({
                            PartNumber: i + 1,
                            ETag: _.ETag,
                        })),
                    },
                }));
            } catch ( e ) {
                console.warn(`CompleteMultipartUpload failed: ${e.message}`);
            }
        } catch ( e ) {
            console.error(`error: ${e.message}`);
            // abort the upload so orphaned parts don't accrue storage
            await client.send(new AbortMultipartUploadCommand({
                Bucket: bucket,
                Key: key,
                UploadId: multipart_upload.UploadId,
            }));
            throw e;
        }
        return ret;
    }
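
    /**
     * Copy a small object with a single CopyObject request.
     */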
    async #copy_simple ({
        dst_bucket_region,
        dst_bucket,
        src_bucket,
        src_key,
        dst_key,
    }) {
        const client = this.#get_client(dst_bucket_region);
        const ret = await client.send(new CopyObjectCommand({
            Bucket: dst_bucket,
            Key: dst_key,
            CopySource: `${src_bucket}/${src_key}`,
        }));
        return ret;
    }
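
    /**
     * Copy a large object with a multipart copy: the source is copied
     * server-side in 4GiB ranges via UploadPartCopy, then the parts are
     * assembled with CompleteMultipartUpload. Progress is reported after
     * each part.
     */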
    async #copy_multipart ({
        dst_bucket_region,
        dst_bucket,
        src_bucket,
        src_key,
        dst_key,
        on_progress,
        size,
    }) {
        const client = this.#get_client(dst_bucket_region);
        const multipart_upload = await client.send(new CreateMultipartUploadCommand({
            Bucket: dst_bucket,
            Key: dst_key,
        }));
        const part_size = 4 * 1024 * 1024 * 1024; // 4GiB
        const results = [];
        let part_number_i = 0;
        for ( let byte_start = 0 ; byte_start < size ; byte_start += part_size ) {
            const part_number = ++part_number_i;
            // CopySourceRange byte ranges are inclusive on both ends
            const byte_end = Math.min(byte_start + part_size, size) - 1;
            const [err, success, result] = await simple_retry(async () => {
                const params = {
                    Bucket: dst_bucket,
                    Key: dst_key,
                    PartNumber: part_number,
                    UploadId: multipart_upload.UploadId,
                    CopySource: `${src_bucket}/${src_key}`,
                    CopySourceRange: `bytes=${byte_start}-${byte_end}`,
                };
                return await client.send(new UploadPartCopyCommand(params));
            }, 3, 50);
            if ( err || !success ) {
                throw err;
            }
            results.push(result);
            on_progress({ uploaded: byte_end + 1 });
        }
        const ret = await client.send(new CompleteMultipartUploadCommand({
            Bucket: dst_bucket,
            Key: dst_key,
            UploadId: multipart_upload.UploadId,
            MultipartUpload: {
                Parts: results.map((_, i) => ({
                    PartNumber: i + 1,
                    ETag: _.CopyPartResult.ETag,
                })),
            },
        }));
        return ret;
    }
}