From 7600d9b07c5b719d529f8a48c38d9178efefa266 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Thu, 22 Aug 2024 14:28:09 -0400 Subject: [PATCH] feat: add streaming to XHR driver client --- src/puter-js/src/lib/utils.js | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/src/puter-js/src/lib/utils.js b/src/puter-js/src/lib/utils.js index 8195da8f0..1d8af33fd 100644 --- a/src/puter-js/src/lib/utils.js +++ b/src/puter-js/src/lib/utils.js @@ -268,9 +268,76 @@ async function driverCall_( if ( settings.responseType ) { xhr.responseType = settings.responseType; } + + // =============================================== + // TO UNDERSTAND THIS CODE, YOU MUST FIRST + // UNDERSTAND THE FOLLOWING TEXT: + // + // Everything between here and the comment reading + // "=== END OF STREAMING ===" is ONLY for handling + // requests with content type "application/x-ndjson" + // =============================================== + + let is_stream = false; + let got_headers = false; + let signal_stream_update = null; + let lastLength = 0; + let response_complete = false; + const parts_received = []; + xhr.onreadystatechange = () => { + if ( got_headers ) return; + got_headers = true; + if ( xhr.readyState >= 2 ) { + if ( xhr.getResponseHeader("Content-Type") !== + 'application/x-ndjson' + ) return; + is_stream = true; + const Stream = async function* Stream () { + while ( ! response_complete ) { + const tp = new TeePromise(); + signal_stream_update = tp.resolve.bind(tp); + await tp; + if ( response_complete ) break; + while ( parts_received.length > 0 ) { + const value = parts_received.pop(); + const parts = value.split('\n'); + for ( const part of parts ) { + if ( part.trim() === '' ) continue; + yield JSON.parse(part); + } + } + } + } + + return resolve_func(Stream()); + } + }; + + xhr.onprogress = function() { + if ( ! signal_stream_update ) return; + + const newText = xhr.responseText.slice(lastLength); + lastLength = xhr.responseText.length; // Update lastLength to the current length + + parts_received.push(newText); + signal_stream_update(); + }; + + xhr.addEventListener('load', () => { + response_complete = true; + }); + + // ======================== + // === END OF STREAMING === + // ======================== // load: success or error xhr.addEventListener('load', async function(response){ + response_complete = true; + if ( is_stream ) { + signal_stream_update?.(); + return; + } const resp = await parseResponse(response.target); // HTTP Error - unauthorized if(response.status === 401 || resp?.code === "token_auth_failed"){