From dfdda4f3b9bd8e07e8af973936b03a9243f31c72 Mon Sep 17 00:00:00 2001 From: Sam Atkins Date: Wed, 17 Apr 2024 10:08:42 +0100 Subject: [PATCH 01/11] Extract some helper functions for accessing a window/iframe by uuid --- src/IPC.js | 8 -------- src/helpers.js | 12 +++++++++++- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/IPC.js b/src/IPC.js index f8aad6602..dbe18eb13 100644 --- a/src/IPC.js +++ b/src/IPC.js @@ -74,14 +74,6 @@ window.addEventListener('message', async (event) => { return; } - const window_for_app_instance = (instance_id) => { - return $(`.window[data-element_uuid="${instance_id}"]`).get(0); - }; - - const iframe_for_app_instance = (instance_id) => { - return $(window_for_app_instance(instance_id)).find('.window-app-iframe').get(0); - }; - const $el_parent_window = $(window_for_app_instance(event.data.appInstanceID)); const parent_window_id = $el_parent_window.attr('data-id'); const $el_parent_disable_mask = $el_parent_window.find('.window-disable-mask'); diff --git a/src/helpers.js b/src/helpers.js index f9504ca48..5c0ff66a8 100644 --- a/src/helpers.js +++ b/src/helpers.js @@ -3512,4 +3512,14 @@ window.change_clock_visible = (clock_visible) => { } $('select.change-clock-visible').val(window.user_preferences.clock_visible); -} \ No newline at end of file +} + +// Finds the `.window` element for the given app instance ID +window.window_for_app_instance = (instance_id) => { + return $(`.window[data-element_uuid="${instance_id}"]`).get(0); +}; + +// Finds the `iframe` element for the given app instance ID +window.iframe_for_app_instance = (instance_id) => { + return $(window_for_app_instance(instance_id)).find('.window-app-iframe').get(0); +}; From 0aa5543397a1a65b04f9cba11a1e9be0406342ea Mon Sep 17 00:00:00 2001 From: Sam Atkins Date: Wed, 17 Apr 2024 11:14:33 +0100 Subject: [PATCH 02/11] Let AppConnection know if its target app uses the Puter SDK Apps are not required to use the Puter SDK. If they don't, then we can still launch them, close them, and listen to their close event, but are unable to send messages to them. --- packages/puter-js/src/modules/UI.js | 20 +++++++++++++++++--- src/IPC.js | 12 +----------- src/helpers.js | 16 ++++++++++++++++ 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/packages/puter-js/src/modules/UI.js b/packages/puter-js/src/modules/UI.js index 849d4cad8..abfd43c29 100644 --- a/packages/puter-js/src/modules/UI.js +++ b/packages/puter-js/src/modules/UI.js @@ -14,7 +14,11 @@ class AppConnection extends EventListener { // Whether the target app is open #isOpen; - constructor(messageTarget, appInstanceID, targetAppInstanceID) { + // Whether the target app uses the Puter SDK, and so accepts messages + // (Closing and close events will still function.) + #usesSDK; + + constructor(messageTarget, appInstanceID, targetAppInstanceID, usesSDK) { super([ 'message', // The target sent us something with postMessage() 'close', // The target app was closed @@ -23,6 +27,7 @@ class AppConnection extends EventListener { this.appInstanceID = appInstanceID; this.targetAppInstanceID = targetAppInstanceID; this.#isOpen = true; + this.#usesSDK = usesSDK; // TODO: Set this.#puterOrigin to the puter origin @@ -54,12 +59,21 @@ class AppConnection extends EventListener { }); } + // Does the target app use the Puter SDK? If not, certain features will be unavailable. + get usesSDK() { return this.#usesSDK; } + + // Send a message to the target app. Requires the target to use the Puter SDK. postMessage(message) { if (!this.#isOpen) { console.warn('Trying to post message on a closed AppConnection'); return; } + if (!this.#usesSDK) { + console.warn('Trying to post message to a non-SDK app'); + return; + } + this.messageTarget.postMessage({ msg: 'messageToApp', appInstanceID: this.appInstanceID, @@ -155,7 +169,7 @@ class UI extends EventListener { } if (this.parentInstanceID) { - this.#parentAppConnection = new AppConnection(this.messageTarget, this.appInstanceID, this.parentInstanceID); + this.#parentAppConnection = new AppConnection(this.messageTarget, this.appInstanceID, this.parentInstanceID, true); } // Tell the host environment that this app is using the Puter SDK and is ready to receive messages, @@ -374,7 +388,7 @@ class UI extends EventListener { } else if (e.data.msg === 'childAppLaunched') { // execute callback with a new AppConnection to the child - const connection = new AppConnection(this.messageTarget, this.appInstanceID, e.data.child_instance_id); + const connection = new AppConnection(this.messageTarget, this.appInstanceID, e.data.child_instance_id, e.data.uses_sdk); this.#callbackFunctions[e.data.original_msg_id](connection); } else{ diff --git a/src/IPC.js b/src/IPC.js index dbe18eb13..6e9b4cc3b 100644 --- a/src/IPC.js +++ b/src/IPC.js @@ -90,17 +90,7 @@ window.addEventListener('message', async (event) => { $(target_iframe).attr('data-appUsesSDK', 'true'); // If we were waiting to launch this as a child app, report to the parent that it succeeded. - const child_launch_callback = window.child_launch_callbacks[event.data.appInstanceID]; - if (child_launch_callback) { - const parent_iframe = iframe_for_app_instance(child_launch_callback.parent_instance_id); - // send confirmation to requester window - parent_iframe.contentWindow.postMessage({ - msg: 'childAppLaunched', - original_msg_id: child_launch_callback.launch_msg_id, - child_instance_id: event.data.appInstanceID, - }, '*'); - delete window.child_launch_callbacks[event.data.appInstanceID]; - } + window.report_app_launched(event.data.appInstanceID, { uses_sdk: true }); // Send any saved broadcasts to the new app globalThis.services.get('broadcast').sendSavedBroadcastsTo(event.data.appInstanceID); diff --git a/src/helpers.js b/src/helpers.js index 5c0ff66a8..1273bb2b2 100644 --- a/src/helpers.js +++ b/src/helpers.js @@ -3523,3 +3523,19 @@ window.window_for_app_instance = (instance_id) => { window.iframe_for_app_instance = (instance_id) => { return $(window_for_app_instance(instance_id)).find('.window-app-iframe').get(0); }; + +// Run any callbacks to say that the app has launched +window.report_app_launched = (instance_id, { uses_sdk = true }) => { + const child_launch_callback = window.child_launch_callbacks[instance_id]; + if (child_launch_callback) { + const parent_iframe = iframe_for_app_instance(child_launch_callback.parent_instance_id); + // send confirmation to requester window + parent_iframe.contentWindow.postMessage({ + msg: 'childAppLaunched', + original_msg_id: child_launch_callback.launch_msg_id, + child_instance_id: instance_id, + uses_sdk: uses_sdk, + }, '*'); + delete window.child_launch_callbacks[instance_id]; + } +} From 639653dac2f468b0a3e55b761ddc6a005cc22704 Mon Sep 17 00:00:00 2001 From: Sam Atkins Date: Wed, 17 Apr 2024 11:47:30 +0100 Subject: [PATCH 03/11] Report when a non-SDK app closes puter.ui.launchApp() returns a Promise that needs to resolve whether the app uses the Puter SDK or not. Non-SDK apps are tricky because they don't send a READY message on startup, and we don't know in advance whether an app will use the SDK or not. This is a workaround to ensure that launchApp() always resolves. When an app is closed, if it wasn't using the SDK, we send an artificial notification that it launched, followed by an extra notification that it has closed (because the original close notification was sent before this point). This means any users of launchApp() can await it, and get an AppConnection, and listen to the close event. They can't otherwise interact with a non-SDK app because it will have closed already, but we can improve this in the future without breaking the API. --- src/UI/UIWindow.js | 22 +--------------------- src/helpers.js | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/UI/UIWindow.js b/src/UI/UIWindow.js index 812959f13..d4c4e9367 100644 --- a/src/UI/UIWindow.js +++ b/src/UI/UIWindow.js @@ -2853,27 +2853,7 @@ $.fn.close = async function(options) { $(`.window[data-parent_uuid="${window_uuid}"]`).close(); // notify other apps that we're closing - if (app_uses_sdk) { - // notify parent app, if we have one, that we're closing - const parent_id = this.dataset['parent_instance_id']; - const parent = $(`.window[data-element_uuid="${parent_id}"] .window-app-iframe`).get(0); - if (parent) { - parent.contentWindow.postMessage({ - msg: 'appClosed', - appInstanceID: window_uuid, - }, '*'); - } - - // notify child apps, if we have them, that we're closing - const children = $(`.window[data-parent_instance_id="${window_uuid}"] .window-app-iframe`); - children.each((_, child) => { - child.contentWindow.postMessage({ - msg: 'appClosed', - appInstanceID: window_uuid, - }, '*'); - }); - // TODO: Once other AppConnections exist, those will need notifying too. - } + window.report_app_closed(window_uuid); // remove backdrop $(this).closest('.window-backdrop').remove(); diff --git a/src/helpers.js b/src/helpers.js index 1273bb2b2..07db8e5aa 100644 --- a/src/helpers.js +++ b/src/helpers.js @@ -1899,6 +1899,15 @@ window.launch_app = async (options)=>{ $(el).on('remove', () => { const svc_process = globalThis.services.get('process'); svc_process.unregister(process.uuid); + + // If it's a non-sdk app, report that it launched and closed. + // FIXME: This is awkward. Really, we want some way of knowing when it's launched and reporting that immediately instead. + const $app_iframe = $(el).find('.window-app-iframe'); + if ($app_iframe.attr('data-appUsesSdk') !== 'true') { + window.report_app_launched(process.uuid, { uses_sdk: false }); + // We also have to report an extra close event because the real one was sent already + window.report_app_closed(process.uuid); + } }); process.references.el_win = el; @@ -3538,4 +3547,30 @@ window.report_app_launched = (instance_id, { uses_sdk = true }) => { }, '*'); delete window.child_launch_callbacks[instance_id]; } -} +}; + +// Run any callbacks to say that the app has closed +window.report_app_closed = (instance_id) => { + const el_window = window_for_app_instance(instance_id); + + // notify parent app, if we have one, that we're closing + const parent_id = el_window.dataset['parent_instance_id']; + const parent = $(`.window[data-element_uuid="${parent_id}"] .window-app-iframe`).get(0); + if (parent) { + parent.contentWindow.postMessage({ + msg: 'appClosed', + appInstanceID: instance_id, + }, '*'); + } + + // notify child apps, if we have them, that we're closing + const children = $(`.window[data-parent_instance_id="${instance_id}"] .window-app-iframe`); + children.each((_, child) => { + child.contentWindow.postMessage({ + msg: 'appClosed', + appInstanceID: instance_id, + }, '*'); + }); + + // TODO: Once other AppConnections exist, those will need notifying too. +}; From e355c77a4aa477f45e721feca6ea5e636ba67b13 Mon Sep 17 00:00:00 2001 From: Sam Atkins Date: Wed, 17 Apr 2024 16:31:31 +0100 Subject: [PATCH 04/11] Phoenix: Wait for apps to finish executing, and connect stdio to them After launching an app, if successful, we connect stdio streams to it, and wait for it to exit before we return to the prompt. stdio is implemented as regular AppConnection messages: - stdin: `{ $: 'stdin', data: Uint8Array }` from phoenix -> child - stdout: `{ $: 'stdout', data: Uint8Array }` from child -> phoenix Terminal and Phoenix now communicate with each other using the same style, instead of 'input' and 'output' messages. This will help with eventually running subshells. SIGINT currently is not sent. We also suffer from the same "one more read from stdin happens after app exits" bug that's in PathCommandProvider where I copied the stdin code from. --- packages/phoenix/src/pty/XDocumentPTT.js | 4 +- .../providers/PuterAppCommandProvider.js | 57 +++++++++++++++++-- .../terminal/src/pty/XDocumentANSIShell.js | 4 +- 3 files changed, 57 insertions(+), 8 deletions(-) diff --git a/packages/phoenix/src/pty/XDocumentPTT.js b/packages/phoenix/src/pty/XDocumentPTT.js index 13ea657a2..64435d2d2 100644 --- a/packages/phoenix/src/pty/XDocumentPTT.js +++ b/packages/phoenix/src/pty/XDocumentPTT.js @@ -38,7 +38,7 @@ export class XDocumentPTT { chunk = encoder.encode(chunk); } terminalConnection.postMessage({ - $: 'output', + $: 'stdout', data: chunk, }); } @@ -52,7 +52,7 @@ export class XDocumentPTT { this.emit('ioctl.set', message); return; } - if (message.$ === 'input') { + if (message.$ === 'stdin') { this.readController.enqueue(message.data); return; } diff --git a/packages/phoenix/src/puter-shell/providers/PuterAppCommandProvider.js b/packages/phoenix/src/puter-shell/providers/PuterAppCommandProvider.js index 44bf6cbbe..8eb678f08 100644 --- a/packages/phoenix/src/puter-shell/providers/PuterAppCommandProvider.js +++ b/packages/phoenix/src/puter-shell/providers/PuterAppCommandProvider.js @@ -16,6 +16,9 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +import { Exit } from '../coreutils/coreutil_lib/exit.js'; +import { signals } from '../../ansi-shell/signals.js'; + const BUILT_IN_APPS = [ 'explorer', ]; @@ -31,8 +34,7 @@ export class PuterAppCommandProvider { // TODO: Parameters and options? async execute(ctx) { const args = {}; // TODO: Passed-in parameters and options would go here - // NOTE: No await here, because launchApp() currently only resolves for Puter SDK apps. - puter.ui.launchApp(id, args); + await puter.ui.launchApp(id, args); } }; } @@ -57,8 +59,55 @@ export class PuterAppCommandProvider { // TODO: Parameters and options? async execute(ctx) { const args = {}; // TODO: Passed-in parameters and options would go here - // NOTE: No await here, yet, because launchApp() currently only resolves for Puter SDK apps. - puter.ui.launchApp(name, args); + const child = await puter.ui.launchApp(name, args); + + // Wait for app to close. + const app_close_promise = new Promise((resolve, reject) => { + child.on('close', () => { + // TODO: Exit codes for apps + resolve({ done: true }); + }); + }); + + // Wait for SIGINT + const sigint_promise = new Promise((resolve, reject) => { + ctx.externs.sig.on((signal) => { + if (signal === signals.SIGINT) { + child.close(); + reject(new Exit(130)); + } + }); + }); + + // We don't connect stdio to non-SDK apps, because they won't make use of it. + if (child.usesSDK) { + const decoder = new TextDecoder(); + child.on('message', message => { + if (message.$ === 'stdout') { + ctx.externs.out.write(decoder.decode(message.data)); + } + }); + + // Repeatedly copy data from stdin to the child, while it's running. + // DRY: Initially copied from PathCommandProvider + let data, done; + const next_data = async () => { + // FIXME: This waits for one more read() after we finish. + ({ value: data, done } = await Promise.race([ + app_close_promise, sigint_promise, ctx.externs.in_.read(), + ])); + if (data) { + child.postMessage({ + $: 'stdin', + data: data, + }); + if (!done) setTimeout(next_data, 0); + } + }; + setTimeout(next_data, 0); + } + + return Promise.race([ app_close_promise, sigint_promise ]); } }; } diff --git a/packages/terminal/src/pty/XDocumentANSIShell.js b/packages/terminal/src/pty/XDocumentANSIShell.js index 8df371e1e..886383411 100644 --- a/packages/terminal/src/pty/XDocumentANSIShell.js +++ b/packages/terminal/src/pty/XDocumentANSIShell.js @@ -53,7 +53,7 @@ export class XDocumentANSIShell { return; } - if (message.$ === 'output') { + if (message.$ === 'stdout') { ptt.out.write(message.data); return; } @@ -69,7 +69,7 @@ export class XDocumentANSIShell { for ( ;; ) { const chunk = (await ptt.in.read()).value; shell.postMessage({ - $: 'input', + $: 'stdin', data: chunk, }); } From 222a617c443812cfd4673075807298bb5df994fc Mon Sep 17 00:00:00 2001 From: Sam Atkins Date: Wed, 17 Apr 2024 13:37:49 +0100 Subject: [PATCH 05/11] Phoenix: Use regular code path to run built-in apps Now launchApp() can always be awaited, we can run built-in apps using the same code path for other apps, and eventually have SIGINT close them. --- .../providers/PuterAppCommandProvider.js | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/packages/phoenix/src/puter-shell/providers/PuterAppCommandProvider.js b/packages/phoenix/src/puter-shell/providers/PuterAppCommandProvider.js index 8eb678f08..983293b30 100644 --- a/packages/phoenix/src/puter-shell/providers/PuterAppCommandProvider.js +++ b/packages/phoenix/src/puter-shell/providers/PuterAppCommandProvider.js @@ -23,43 +23,37 @@ const BUILT_IN_APPS = [ 'explorer', ]; +const lookup_app = async (id) => { + if (BUILT_IN_APPS.includes(id)) { + return { success: true, path: null }; + } + + const request = await fetch(`${puter.APIOrigin}/drivers/call`, { + "headers": { + "Content-Type": "application/json", + "Authorization": `Bearer ${puter.authToken}`, + }, + "body": JSON.stringify({ interface: 'puter-apps', method: 'read', args: { id: { name: id } } }), + "method": "POST", + }); + + const { success, result } = await request.json(); + return { success, path: result?.index_url }; +}; + export class PuterAppCommandProvider { async lookup (id) { - // Built-in apps will not be returned by the fetch query below, so we handle them separately. - if (BUILT_IN_APPS.includes(id)) { - return { - name: id, - path: 'Built-in Puter app', - // TODO: Parameters and options? - async execute(ctx) { - const args = {}; // TODO: Passed-in parameters and options would go here - await puter.ui.launchApp(id, args); - } - }; - } - - const request = await fetch(`${puter.APIOrigin}/drivers/call`, { - "headers": { - "Content-Type": "application/json", - "Authorization": `Bearer ${puter.authToken}`, - }, - "body": JSON.stringify({ interface: 'puter-apps', method: 'read', args: { id: { name: id } } }), - "method": "POST", - }); - - const { success, result } = await request.json(); - + const { success, path } = await lookup_app(id); if (!success) return; - const { name, index_url } = result; return { - name, - path: index_url, + name: id, + path: path ?? 'Built-in Puter app', // TODO: Parameters and options? async execute(ctx) { const args = {}; // TODO: Passed-in parameters and options would go here - const child = await puter.ui.launchApp(name, args); + const child = await puter.ui.launchApp(id, args); // Wait for app to close. const app_close_promise = new Promise((resolve, reject) => { From da208e23f541e3b92bbf0a9acd1d7962666d3a10 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Wed, 17 Apr 2024 12:22:53 -0400 Subject: [PATCH 06/11] Add a valve and internal pipe to commands --- .../src/ansi-shell/pipeline/Coupler.js | 15 ++++++- .../src/ansi-shell/pipeline/Pipeline.js | 5 +++ packages/phoenix/src/promise.js | 43 +++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 packages/phoenix/src/promise.js diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index 43ffe4233..2845be3aa 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -16,6 +16,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +import { TeePromise } from "../../promise.js"; + export class Coupler { static description = ` Connects a read stream to a write stream. @@ -26,6 +28,7 @@ export class Coupler { this.source = source; this.target = target; this.on_ = true; + this.closed_ = new TeePromise(); this.isDone = new Promise(rslv => { this.resolveIsDone = rslv; }) @@ -35,10 +38,20 @@ export class Coupler { off () { this.on_ = false; } on () { this.on_ = true; } + close () { + this.closed_.resolve({ + value: undefined, + done: true, + }); + } + async listenLoop_ () { this.active = true; for (;;) { - const { value, done } = await this.source.read(); + const { value, done } = await Promise.race([ + this.closed_, + this.source.read(), + ]); if ( done ) { this.source = null; this.target = null; diff --git a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js index 2ad938646..27ae2c56e 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js @@ -202,6 +202,10 @@ export class PreparedCommand { in_ = new MemReader(response); } + const internal_input_pipe = new Pipe(); + const valve = new Coupler(in_, internal_input_pipe.in); + in_ = internal_input_pipe.out; + // simple naive implementation for now const sig = { listeners_: [], @@ -297,6 +301,7 @@ export class PreparedCommand { let exit_code = 0; try { await execute(ctx); + valve.close(); } catch (e) { if ( e instanceof Exit ) { exit_code = e.code; diff --git a/packages/phoenix/src/promise.js b/packages/phoenix/src/promise.js new file mode 100644 index 000000000..20d080ae1 --- /dev/null +++ b/packages/phoenix/src/promise.js @@ -0,0 +1,43 @@ +export class TeePromise { + static STATUS_PENDING = Symbol('pending'); + static STATUS_RUNNING = {}; + static STATUS_DONE = Symbol('done'); + constructor () { + this.status_ = this.constructor.STATUS_PENDING; + this.donePromise = new Promise((resolve, reject) => { + this.doneResolve = resolve; + this.doneReject = reject; + }); + } + get status () { + return this.status_; + } + set status (status) { + this.status_ = status; + if ( status === this.constructor.STATUS_DONE ) { + this.doneResolve(); + } + } + resolve (value) { + this.status_ = this.constructor.STATUS_DONE; + this.doneResolve(value); + } + awaitDone () { + return this.donePromise; + } + then (fn, ...a) { + return this.donePromise.then(fn, ...a); + } + + reject (err) { + this.status_ = this.constructor.STATUS_DONE; + this.doneReject(err); + } + + /** + * @deprecated use then() instead + */ + onComplete(fn) { + return this.then(fn); + } +} From 072dbe8db5b065a5ead473c45237f374d239b6b3 Mon Sep 17 00:00:00 2001 From: Sam Atkins Date: Thu, 18 Apr 2024 17:39:13 +0100 Subject: [PATCH 07/11] Make BetterReader buffer and cancel, to fix stdin data loss BetterReader.read_with_cancel() returns both the read promise, and a function that can be used to cancel the read. A cancelled read is placed back into the BetterReader's chunk buffer, to be consumed by the next user that requests a read. This is used by Coupler so that when the coupler is closed, its pending read() call does not consume the next batch of input. This fixes the problem we were having with child applications consuming one chunk of stdin after they are closed, meaning the first key you press after an app exits would disappear. Co-authored-by: KernelDeimos --- packages/phoenix/packages/pty/exports.js | 120 ++++++++++++++++-- .../src/ansi-shell/pipeline/Coupler.js | 21 ++- packages/phoenix/src/promise.js | 14 ++ 3 files changed, 137 insertions(+), 18 deletions(-) diff --git a/packages/phoenix/packages/pty/exports.js b/packages/phoenix/packages/pty/exports.js index bbab56eff..56c87f3a2 100644 --- a/packages/phoenix/packages/pty/exports.js +++ b/packages/phoenix/packages/pty/exports.js @@ -16,6 +16,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +import { raceCase } from '../../src/promise.js'; + const encoder = new TextEncoder(); const CHAR_LF = '\n'.charCodeAt(0); @@ -27,29 +29,67 @@ export class BetterReader { this.chunks_ = []; } - async read (opt_buffer) { + _create_cancel_response () { + return { + chunk: null, + n_read: 0, + debug_meta: { + source: 'delegate', + returning: 'cancelled', + this_value_should_not_be_used: true, + }, + }; + } + + async read_and_get_info (opt_buffer, cancel_state) { if ( ! opt_buffer && this.chunks_.length === 0 ) { - return await this.delegate.read(); + const chunk = await this.delegate.read(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(chunk.value); + return this._create_cancel_response(); + } + return { + chunk, + debug_meta: { source: 'delegate' }, + }; } const chunk = await this.getChunk_(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(chunk); + return this._create_cancel_response(); + } if ( ! opt_buffer ) { - return chunk; + return { chunk, debug_meta: { source: 'stored chunks', returning: 'chunk' } }; + } + + if ( ! chunk ) { + return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } }; } this.chunks_.push(chunk); while ( this.getTotalBytesReady_() < opt_buffer.length ) { - this.chunks_.push(await this.getChunk_()) + const read_chunk = await this.getChunk_(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(read_chunk); + return this._create_cancel_response(); + } + if ( ! read_chunk ) { + break; + } + this.chunks_.push(read_chunk); } - // TODO: need to handle EOT condition in this loop let offset = 0; - for (;;) { + while ( this.chunks_.length > 0 && offset < opt_buffer.length ) { let item = this.chunks_.shift(); if ( item === undefined ) { - throw new Error('calculation is wrong') + break; } if ( offset + item.length > opt_buffer.length ) { const diff = opt_buffer.length - offset; @@ -58,17 +98,49 @@ export class BetterReader { } opt_buffer.set(item, offset); offset += item.length; - - if ( offset == opt_buffer.length ) break; } - // return opt_buffer.length; + return { + n_read: offset, + debug_meta: { source: 'stored chunks', returning: 'byte count' }, + }; + } + + read_with_cancel (opt_buffer) { + const cancel_state = { cancelled: false }; + const promise = (async () => { + const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); + return opt_buffer ? n_read : chunk; + })(); + return { + canceller: () => { + cancel_state.cancelled = true; + }, + promise, + }; + } + + async read (opt_buffer) { + const { chunk, n_read } = await this.read_and_get_info(opt_buffer); + return opt_buffer ? n_read : chunk; } async getChunk_() { if ( this.chunks_.length === 0 ) { - const { value } = await this.delegate.read(); - return value; + // Wait for either a delegate read to happen, or for a chunk to be added to the buffer from a cancelled read. + const delegate_read = this.delegate.read(); + const [which, result] = await raceCase({ + delegate: delegate_read, + buffer_not_empty: this.waitUntilDataAvailable(), + }); + if (which === 'delegate') { + return result.value; + } + // There's a chunk in the buffer now, so we can use the regular path. + // But first, make sure that once the delegate read completes, we save the chunk. + delegate_read.then((chunk) => { + this.chunks_.push(chunk.value); + }) } const len = this.getTotalBytesReady_(); @@ -87,6 +159,30 @@ export class BetterReader { getTotalBytesReady_ () { return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0); } + + canRead() { + return this.getTotalBytesReady_() > 0; + } + + async waitUntilDataAvailable() { + let resolve_promise; + let reject_promise; + const promise = new Promise((resolve, reject) => { + resolve_promise = resolve; + reject_promise = reject; + }); + + const check = () => { + if (this.canRead()) { + resolve_promise(); + } else { + setTimeout(check, 0); + } + }; + setTimeout(check, 0); + + await promise; + } } /** diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index 2845be3aa..3182c4835 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -import { TeePromise } from "../../promise.js"; +import { TeePromise, raceCase } from "../../promise.js"; export class Coupler { static description = ` @@ -40,7 +40,6 @@ export class Coupler { close () { this.closed_.resolve({ - value: undefined, done: true, }); } @@ -48,11 +47,21 @@ export class Coupler { async listenLoop_ () { this.active = true; for (;;) { - const { value, done } = await Promise.race([ - this.closed_, - this.source.read(), - ]); + let canceller = () => {}; + let promise; + if ( this.source.read_with_cancel !== undefined ) { + ({ canceller, promise } = this.source.read_with_cancel()); + } else { + promise = this.source.read(); + } + const [which, { value, done }] = await raceCase({ + source: promise, + closed: this.closed_, + }); if ( done ) { + if ( which === 'closed' ) { + canceller(); + } this.source = null; this.target = null; this.active = false; diff --git a/packages/phoenix/src/promise.js b/packages/phoenix/src/promise.js index 20d080ae1..5c47d2bd1 100644 --- a/packages/phoenix/src/promise.js +++ b/packages/phoenix/src/promise.js @@ -41,3 +41,17 @@ export class TeePromise { return this.then(fn); } } + +/** + * raceCase is like Promise.race except it takes an object instead of + * an array, and returns the key of the promise that resolves first + * as well as the value that it resolved to. + * + * @param {Object.} promise_map + * + * @returns {Promise.<[string, any]>} + */ +export const raceCase = async (promise_map) => { + return Promise.race(Object.entries(promise_map).map( + ([key, promise]) => promise.then(value => [key, value]))); +}; From 2f49c1c9b0ca7616d442a2fa4a027266b4817b8e Mon Sep 17 00:00:00 2001 From: Sam Atkins Date: Fri, 19 Apr 2024 17:29:26 +0100 Subject: [PATCH 08/11] Remove a whole load of noisy log messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These can be added back if and when we need them, but right now, it's hard to follow console output with all this chatter. 😅 --- .../parse_impls/StrUntilParserImpl.js | 3 --- .../ContextSwitchingPStratumImpl.js | 8 ------- .../ansi-shell/arg-parsers/simple-parser.js | 5 ---- .../ansi-shell/parsing/PuterShellParser.js | 1 - .../parsing/buildParserSecondHalf.js | 11 --------- .../src/ansi-shell/pipeline/Pipeline.js | 23 ------------------- .../src/ansi-shell/readline/readline.js | 4 ---- packages/terminal/src/main.js | 13 ----------- src/UI/UIWindow.js | 1 - src/helpers.js | 3 --- 10 files changed, 72 deletions(-) diff --git a/packages/phoenix/packages/strataparse/parse_impls/StrUntilParserImpl.js b/packages/phoenix/packages/strataparse/parse_impls/StrUntilParserImpl.js index 4ca5b81ab..9f627faf1 100644 --- a/packages/phoenix/packages/strataparse/parse_impls/StrUntilParserImpl.js +++ b/packages/phoenix/packages/strataparse/parse_impls/StrUntilParserImpl.js @@ -23,7 +23,6 @@ export default class StrUntilParserImpl { parse (lexer) { let text = ''; for ( ;; ) { - console.log('B') let { done, value } = lexer.look(); if ( done ) break; @@ -41,8 +40,6 @@ export default class StrUntilParserImpl { if ( text.length === 0 ) return; - console.log('test?', text) - return { $: 'until', text }; } } diff --git a/packages/phoenix/packages/strataparse/strata_impls/ContextSwitchingPStratumImpl.js b/packages/phoenix/packages/strataparse/strata_impls/ContextSwitchingPStratumImpl.js index 68205931a..de8190728 100644 --- a/packages/phoenix/packages/strataparse/strata_impls/ContextSwitchingPStratumImpl.js +++ b/packages/phoenix/packages/strataparse/strata_impls/ContextSwitchingPStratumImpl.js @@ -22,7 +22,6 @@ export default class ContextSwitchingPStratumImpl { constructor ({ contexts, entry }) { this.contexts = { ...contexts }; for ( const key in this.contexts ) { - console.log('parsers?', this.contexts[key]); const new_array = []; for ( const parser of this.contexts[key] ) { if ( parser.hasOwnProperty('transition') ) { @@ -44,7 +43,6 @@ export default class ContextSwitchingPStratumImpl { this.lastvalue = null; } get stack_top () { - console.log('stack top?', this.stack[this.stack.length - 1]) return this.stack[this.stack.length - 1]; } get current_context () { @@ -55,7 +53,6 @@ export default class ContextSwitchingPStratumImpl { const lexer = api.delegate; const context = this.current_context; - console.log('context?', context); for ( const spec of context ) { { const { done, value } = lexer.look(); @@ -64,7 +61,6 @@ export default class ContextSwitchingPStratumImpl { throw new Error('infinite loop'); } this.lastvalue = value; - console.log('last value?', value, done); if ( done ) return { done }; } @@ -76,7 +72,6 @@ export default class ContextSwitchingPStratumImpl { } const subLexer = lexer.fork(); - // console.log('spec?', spec); const result = parser.parse(subLexer); if ( result.status === ParseResult.UNRECOGNIZED ) { continue; @@ -84,11 +79,9 @@ export default class ContextSwitchingPStratumImpl { if ( result.status === ParseResult.INVALID ) { return { done: true, value: result }; } - console.log('RESULT', result, spec) if ( ! peek ) lexer.join(subLexer); if ( transition ) { - console.log('GOT A TRANSITION') if ( transition.pop ) this.stack.pop(); if ( transition.to ) this.stack.push({ context_name: transition.to, @@ -97,7 +90,6 @@ export default class ContextSwitchingPStratumImpl { if ( result.value.$discard || peek ) return this.next(api); - console.log('PROVIDING VALUE', result.value); return { done: false, value: result.value }; } diff --git a/packages/phoenix/src/ansi-shell/arg-parsers/simple-parser.js b/packages/phoenix/src/ansi-shell/arg-parsers/simple-parser.js index c8f4832a0..a0fc8ff4d 100644 --- a/packages/phoenix/src/ansi-shell/arg-parsers/simple-parser.js +++ b/packages/phoenix/src/ansi-shell/arg-parsers/simple-parser.js @@ -22,11 +22,6 @@ import { DEFAULT_OPTIONS } from '../../puter-shell/coreutils/coreutil_lib/help.j export default { name: 'simple-parser', async process (ctx, spec) { - console.log({ - ...spec, - args: ctx.locals.args - }); - // Insert standard options spec.options = Object.assign(spec.options || {}, DEFAULT_OPTIONS); diff --git a/packages/phoenix/src/ansi-shell/parsing/PuterShellParser.js b/packages/phoenix/src/ansi-shell/parsing/PuterShellParser.js index b38ffa6de..8dc2626c9 100644 --- a/packages/phoenix/src/ansi-shell/parsing/PuterShellParser.js +++ b/packages/phoenix/src/ansi-shell/parsing/PuterShellParser.js @@ -37,7 +37,6 @@ export class PuterShellParser { if ( sp.error ) { throw new Error(sp.error); } - console.log('PARSER RESULT', result); return result; } parseScript (input) { diff --git a/packages/phoenix/src/ansi-shell/parsing/buildParserSecondHalf.js b/packages/phoenix/src/ansi-shell/parsing/buildParserSecondHalf.js index 4bd4d82a2..1fa6e4ae0 100644 --- a/packages/phoenix/src/ansi-shell/parsing/buildParserSecondHalf.js +++ b/packages/phoenix/src/ansi-shell/parsing/buildParserSecondHalf.js @@ -54,7 +54,6 @@ class ReducePrimitivesPStratumImpl { let text = ''; for ( const item of contents.results ) { if ( item.$ === 'string.segment' ) { - // console.log('segment?', item.text) text += item.text; continue; } @@ -86,7 +85,6 @@ class ShellConstructsPStratumImpl { node.commands = []; }, exit ({ node }) { - console.log('!!!!!',this.stack_top.node) if ( this.stack_top?.node?.$ === 'script' ) { this.stack_top.node.statements.push(node); } @@ -96,7 +94,6 @@ class ShellConstructsPStratumImpl { }, next ({ value, lexer }) { if ( value.$ === 'op.line-terminator' ) { - console.log('the stack??', this.stack) this.pop(); return; } @@ -189,7 +186,6 @@ class ShellConstructsPStratumImpl { }, next ({ value, lexer }) { if ( value.$ === 'op.line-terminator' ) { - console.log('well, got here') this.pop(); return; } @@ -223,9 +219,7 @@ class ShellConstructsPStratumImpl { this.stack_top.node.components.push(...node.components); }, next ({ node, value, lexer }) { - console.log('WHAT THO', node) if ( value.$ === 'op.line-terminator' && node.quote === null ) { - console.log('well, got here') this.pop(); return; } @@ -292,7 +286,6 @@ class ShellConstructsPStratumImpl { const lexer = api.delegate; - console.log('THE NODE', this.stack[0].node); // return { done: true, value: { $: 'test' } }; for ( let i=0 ; i < 500 ; i++ ) { @@ -306,15 +299,12 @@ class ShellConstructsPStratumImpl { } const { state, node } = this.stack_top; - console.log('value?', value, done) - console.log('state?', state.name); state.next.call(this, { lexer, value, node, state }); // if ( done ) break; } - console.log('THE NODE', this.stack[0]); this.done_ = true; return { done: false, value: this.stack[0].node }; @@ -433,7 +423,6 @@ export const buildParserSecondHalf = (sp, { multiline } = {}) => { // sp.add(new ReducePrimitivesPStratumImpl()); if ( multiline ) { - console.log('USING MULTILINE'); sp.add(new MultilinePStratumImpl()); } else { sp.add(new ShellConstructsPStratumImpl()); diff --git a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js index 27ae2c56e..bda57096d 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js @@ -38,11 +38,6 @@ class Token { throw new Error('expected token node'); } - console.log('ast has cst?', - ast, - ast.components?.[0]?.$cst - ) - return new Token(ast); } constructor (ast) { @@ -53,20 +48,15 @@ class Token { // If the only components are of type 'symbol' and 'string.segment' // then we can statically resolve the value of the token. - console.log('checking viability of static resolve', this.ast) - const isStatic = this.ast.components.every(c => { return c.$ === 'symbol' || c.$ === 'string.segment'; }); if ( ! isStatic ) return; - console.log('doing static thing', this.ast) - // TODO: Variables can also be statically resolved, I think... let value = ''; for ( const component of this.ast.components ) { - console.log('component', component); value += component.text; } @@ -113,7 +103,6 @@ export class PreparedCommand { // TODO: check that node for command name is of a // supported type - maybe use adapt pattern - console.log('ast?', ast); const cmd = command_token.maybeStaticallyResolve(ctx); const { commands } = ctx.registries; @@ -124,16 +113,13 @@ export class PreparedCommand { : command_token; if ( command === undefined ) { - console.log('command token?', command_token); throw new ConcreteSyntaxError( `no command: ${JSON.stringify(cmd)}`, command_token.$cst, ); - throw new Error('no command: ' + JSON.stringify(cmd)); } // TODO: test this - console.log('ast?', ast); const inputRedirect = ast.inputRedirects.length > 0 ? (() => { const token = Token.createFromAST(ctx, ast.inputRedirects[0]); return token.maybeStaticallyResolve(ctx) ?? token; @@ -172,7 +158,6 @@ export class PreparedCommand { // command to run. if ( command instanceof Token ) { const cmd = await command.resolve(this.ctx); - console.log('RUNNING CMD?', cmd) const { commandProvider } = this.ctx.externs; command = await commandProvider.lookup(cmd, { ctx: this.ctx }); if ( command === undefined ) { @@ -327,23 +312,16 @@ export class PreparedCommand { // TODO: need write command from puter-shell before this can be done for ( let i=0 ; i < this.outputRedirects.length ; i++ ) { - console.log('output redirect??', this.outputRedirects[i]); const { filesystem } = this.ctx.platform; const outputRedirect = this.outputRedirects[i]; const dest_path = outputRedirect instanceof Token ? await outputRedirect.resolve(this.ctx) : outputRedirect; const path = resolveRelativePath(ctx.vars, dest_path); - console.log('it should work?', { - path, - outputMemWriters, - }) // TODO: error handling here await filesystem.write(path, outputMemWriters[i].getAsBlob()); } - - console.log('OUTPUT WRITERS', outputMemWriters); } } @@ -405,7 +383,6 @@ export class Pipeline { commandPromises.push(command.execute()); } await Promise.all(commandPromises); - console.log('PIPELINE DONE'); await coupler.isDone; } diff --git a/packages/phoenix/src/ansi-shell/readline/readline.js b/packages/phoenix/src/ansi-shell/readline/readline.js index e7f816d22..0a9c04901 100644 --- a/packages/phoenix/src/ansi-shell/readline/readline.js +++ b/packages/phoenix/src/ansi-shell/readline/readline.js @@ -96,7 +96,6 @@ const ReadlineProcessorBuilder = builder => builder externs.out.write(externs.prompt); externs.out.write(vars.result); const invCurPos = vars.result.length - vars.cursor; - console.log(invCurPos) if ( invCurPos !== 0 ) { externs.out.write(`\x1B[${invCurPos}D`); } @@ -111,8 +110,6 @@ const ReadlineProcessorBuilder = builder => builder } })); // NEXT: get tab completer for input state - console.log('input state', inputState); - let completer = null; if ( inputState.$ === 'redirect' ) { completer = new FileCompleter(); @@ -141,7 +138,6 @@ const ReadlineProcessorBuilder = builder => builder const applyCompletion = txt => { const p1 = vars.result.slice(0, vars.cursor); const p2 = vars.result.slice(vars.cursor); - console.log({ p1, p2 }); vars.result = p1 + txt + p2; vars.cursor += txt.length; externs.out.write(txt); diff --git a/packages/terminal/src/main.js b/packages/terminal/src/main.js index 785d4c709..0935fad88 100644 --- a/packages/terminal/src/main.js +++ b/packages/terminal/src/main.js @@ -41,27 +41,14 @@ class XTermIO { } async handleKeyBeforeProcess (evt) { - console.log( - 'right this event might be up or down so it\'s necessary to determine which', - evt, - ); if ( evt.key === 'V' && evt.ctrlKey && evt.shiftKey && evt.type === 'keydown' ) { const clipboard = navigator.clipboard; const text = await clipboard.readText(); - console.log( - 'this is the relevant text for this thing that is the thing that is the one that is here', - text, - ); this.pty.out.write(text); } } handleKey ({ key, domEvent }) { - console.log( - 'key event happened', - key, - domEvent, - ); const pty = this.pty; const handlers = { diff --git a/src/UI/UIWindow.js b/src/UI/UIWindow.js index d4c4e9367..725278e1d 100644 --- a/src/UI/UIWindow.js +++ b/src/UI/UIWindow.js @@ -2773,7 +2773,6 @@ window.sidebar_item_droppable = (el_window)=>{ // closes a window $.fn.close = async function(options) { options = options || {}; - console.log(options); $(this).each(async function() { const el_iframe = $(this).find('.window-app-iframe'); const app_uses_sdk = el_iframe.length > 0 && el_iframe.attr('data-appUsesSDK') === 'true'; diff --git a/src/helpers.js b/src/helpers.js index 07db8e5aa..0f43ca7a0 100644 --- a/src/helpers.js +++ b/src/helpers.js @@ -1839,8 +1839,6 @@ window.launch_app = async (options)=>{ // ...and finally append urm_source=puter.com to the URL iframe_url.searchParams.append('urm_source', 'puter.com'); - console.log('backgrounded??', app_info.background); - el_win = UIWindow({ element_uuid: uuid, title: title, @@ -1895,7 +1893,6 @@ window.launch_app = async (options)=>{ (async () => { const el = await el_win; - console.log('RESOV', el); $(el).on('remove', () => { const svc_process = globalThis.services.get('process'); svc_process.unregister(process.uuid); From c3654ab148e42ec084bcefeb9c3a97583a6e7e37 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Fri, 19 Apr 2024 23:24:32 -0400 Subject: [PATCH 09/11] Add more streaming fixes --- packages/phoenix/packages/pty/exports.js | 255 ++++++++++++++++-- .../src/ansi-shell/pipeline/Coupler.js | 11 +- .../src/ansi-shell/pipeline/Pipeline.js | 7 + .../src/ansi-shell/readline/readline.js | 1 + 4 files changed, 252 insertions(+), 22 deletions(-) diff --git a/packages/phoenix/packages/pty/exports.js b/packages/phoenix/packages/pty/exports.js index 56c87f3a2..ff62d4e9f 100644 --- a/packages/phoenix/packages/pty/exports.js +++ b/packages/phoenix/packages/pty/exports.js @@ -16,19 +16,145 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -import { raceCase } from '../../src/promise.js'; +import { TeePromise, raceCase } from '../../src/promise.js'; const encoder = new TextEncoder(); const CHAR_LF = '\n'.charCodeAt(0); const CHAR_CR = '\r'.charCodeAt(0); +const DONE = Symbol('done'); + +class Channel { + constructor () { + this.chunks_ = []; + + globalThis.chnl = this; + + const events = ['write','consume','change']; + for ( const event of events ) { + this[`on_${event}_`] = []; + this[`emit_${event}_`] = () => { + for ( const listener of this[`on_${event}_`] ) { + listener(); + } + }; + } + + this.on('write', () => { this.emit_change_(); }); + this.on('consume', () => { this.emit_change_(); }); + } + + on (event, listener) { + this[`on_${event}_`].push(listener); + } + + off (event, listener) { + const index = this[`on_${event}_`].indexOf(listener); + if ( index !== -1 ) { + this[`on_${event}_`].splice(index, 1); + } + } + + get () { + const cancel = new TeePromise(); + const data = new TeePromise(); + const done = new TeePromise(); + + let called = 0; + + const on_data = () => { + if ( this.chunks_.length > 0 ) { + if ( called > 0 ) { + throw new Error('called more than once'); + } + called++; + const chunk = this.chunks_.shift(); + console.log('shifted off chunk', chunk); + ( chunk === DONE ? done : data ).resolve(chunk); + this.off('write', on_data); + this.emit_consume_(); + } + }; + + console.log('this case', this.chunks_.length); + + this.on('write', on_data); + on_data(); + + const to_return = { + cancel: () => { + console.log('cancel called'); + this.off('write', on_data); + cancel.resolve(); + }, + promise: raceCase({ + cancel, + data, + done, + }), + }; + + console.log('to_return?', to_return); + return to_return; + } + + write (chunk) { + this.chunks_.push(chunk); + this.emit_write_(); + } + + pushback (...chunks) { + console.log('pushing back...', chunks) + for ( let i = chunks.length - 1; i >= 0; i-- ) { + console.log('unshifting ', i, chunks[i]); + console.log('chunks_ before unshift', this.chunks_.length); + this.chunks_.unshift(chunks[i]); + console.log('chunks_ after unshift', this.chunks_.length); + + } + this.emit_write_(); + } + + is_empty () { + return this.chunks_.length === 0; + } +} + export class BetterReader { constructor ({ delegate }) { this.delegate = delegate; this.chunks_ = []; + this.channel_ = new Channel(); + + this._init(); } + _init () { + let working = Promise.resolve(); + this.channel_.on('consume', async () => { + await working; + working = new TeePromise(); + if ( this.channel_.is_empty() ) { + await this.intake_(); + } + working.resolve(); + }); + this.intake_(); + } + + async intake_ () { + const { value, done } = await this.delegate.read(); + if ( done ) { + console.log('writing to channel '); + this.channel_.write(DONE); + return; + } + console.log('writing to channel', value); + this.channel_.write(value); + } + + _create_cancel_response () { return { chunk: null, @@ -41,12 +167,75 @@ export class BetterReader { }; } - async read_and_get_info (opt_buffer, cancel_state) { + read_and_get_info (opt_buffer, cancel_state) { + if ( ! opt_buffer ) { + const { promise, cancel } = this.channel_.get(); + return { + cancel, + promise: promise.then(([which, chunk]) => { + if ( which !== 'data' ) { + return { done: true, value: null }; + } + return { value: chunk }; + }), + + }; + } + console.log('!!!'); + + const final_promise = new TeePromise(); + let current_cancel_ = () => {}; + + (async () => { + console.log('STARTING BUFFER READ'); + let n_read = 0; + const chunks = []; + while ( n_read < opt_buffer.length ) { + const { promise, cancel } = this.channel_.get(); + current_cancel_ = cancel; + + let [which, chunk] = await promise; + console.log('which', which, 'chunk', chunk) + if ( which === 'done' ) { + break; + } + if ( which === 'cancel' ) { + this.channel_.pushback(...chunks); + return + } + if ( n_read + chunk.length > opt_buffer.length ) { + const diff = opt_buffer.length - n_read; + this.channel_.pushback(chunk.subarray(diff)); + chunk = chunk.subarray(0, diff); + } + chunks.push(chunk); + console.log('calling set', chunk, n_read, opt_buffer.length); + opt_buffer.set(chunk, n_read); + n_read += chunk.length; + } + + console.log('RESOLVING', opt_buffer); + console.log('-- and channel?', this.channel_.chunks_.length); + + final_promise.resolve({ n_read }); + })(); + + return { + cancel: () => { + current_cancel_(); + }, + promise: final_promise, + }; + + // --- everything below this line is being removed --- + + /* if ( ! opt_buffer && this.chunks_.length === 0 ) { const chunk = await this.delegate.read(); if ( cancel_state?.cancelled ) { // push the chunk back onto the queue - this.chunks_.push(chunk.value); + console.log('aaa', chunk); + this.chunks_.push(chunk); return this._create_cancel_response(); } return { @@ -56,8 +245,10 @@ export class BetterReader { } const chunk = await this.getChunk_(); + console.log('what we got', chunk); if ( cancel_state?.cancelled ) { // push the chunk back onto the queue + console.log('bbb', chunk); this.chunks_.push(chunk); return this._create_cancel_response(); } @@ -70,30 +261,49 @@ export class BetterReader { return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } }; } + console.log('ccc', chunk); this.chunks_.push(chunk); + let itermax = 20; while ( this.getTotalBytesReady_() < opt_buffer.length ) { + if ( --itermax < 0 ) { + throw new Error('too many iterations'); + } + console.log('iter b'); const read_chunk = await this.getChunk_(); if ( cancel_state?.cancelled ) { // push the chunk back onto the queue + console.log('ddd', chunk); this.chunks_.push(read_chunk); return this._create_cancel_response(); } if ( ! read_chunk ) { break; } + console.log('adding chunk', read_chunk) this.chunks_.push(read_chunk); } let offset = 0; while ( this.chunks_.length > 0 && offset < opt_buffer.length ) { + console.log('iter a') let item = this.chunks_.shift(); if ( item === undefined ) { + console.log('undefined ', this.chunks_); break; } + if ( item.value === undefined ) { + console.log('undefined ', item, this.chunks_); + break; + } + const is_done = item.done; + item = item.value; if ( offset + item.length > opt_buffer.length ) { const diff = opt_buffer.length - offset; - this.chunks_.unshift(item.subarray(diff)); + this.chunks_.unshift({ + done: is_done, + value: item.subarray(diff), + }); item = item.subarray(0, diff); } opt_buffer.set(item, offset); @@ -104,24 +314,28 @@ export class BetterReader { n_read: offset, debug_meta: { source: 'stored chunks', returning: 'byte count' }, }; + /**/ } read_with_cancel (opt_buffer) { - const cancel_state = { cancelled: false }; - const promise = (async () => { - const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); - return opt_buffer ? n_read : chunk; - })(); + console.log('read with cancel called'); + const o = this.read_and_get_info(opt_buffer); + const { cancel, promise } = o; + promise.then(v => { + console.log('promise resolved', v); + }); + // const promise = (async () => { + // const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); + // return opt_buffer ? n_read : chunk; + // })(); return { - canceller: () => { - cancel_state.cancelled = true; - }, + cancel, promise, }; } async read (opt_buffer) { - const { chunk, n_read } = await this.read_and_get_info(opt_buffer); + const { chunk, n_read } = await this.read_and_get_info(opt_buffer).promise; return opt_buffer ? n_read : chunk; } @@ -133,17 +347,19 @@ export class BetterReader { delegate: delegate_read, buffer_not_empty: this.waitUntilDataAvailable(), }); + console.log('which?', which); if (which === 'delegate') { - return result.value; + return result; } + // There's a chunk in the buffer now, so we can use the regular path. // But first, make sure that once the delegate read completes, we save the chunk. - delegate_read.then((chunk) => { - this.chunks_.push(chunk.value); - }) + console.log('result', result) + this.chunks_.push(result); } const len = this.getTotalBytesReady_(); + console.log('len', len); const merged = new Uint8Array(len); let offset = 0; for ( const item of this.chunks_ ) { @@ -157,7 +373,10 @@ export class BetterReader { } getTotalBytesReady_ () { - return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0); + return this.chunks_.reduce((sum, chunk) => { + console.log('sum', sum, 'chunk', chunk); + return sum + chunk.value.length + }, 0); } canRead() { diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index 3182c4835..d7679d496 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -47,20 +47,22 @@ export class Coupler { async listenLoop_ () { this.active = true; for (;;) { - let canceller = () => {}; + let cancel = () => {}; let promise; if ( this.source.read_with_cancel !== undefined ) { - ({ canceller, promise } = this.source.read_with_cancel()); + ({ cancel, promise } = this.source.read_with_cancel()); } else { promise = this.source.read(); } - const [which, { value, done }] = await raceCase({ + const [which, result] = await raceCase({ source: promise, closed: this.closed_, }); + console.log('result?', which, result); + const { value, done } = result; if ( done ) { if ( which === 'closed' ) { - canceller(); + cancel(); } this.source = null; this.target = null; @@ -69,6 +71,7 @@ export class Coupler { break; } if ( this.on_ ) { + if ( ! value ) debugger; await this.target.write(value); } } diff --git a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js index bda57096d..b67dec868 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js @@ -285,7 +285,9 @@ export class PreparedCommand { let exit_code = 0; try { + console.log(`awaiting execute for ${command.name}`) await execute(ctx); + console.log(`DONE execute for ${command.name}`) valve.close(); } catch (e) { if ( e instanceof Exit ) { @@ -304,10 +306,12 @@ export class PreparedCommand { ); ctx.locals.exit = -1; } + if ( ! (e instanceof Exit) ) console.error(e); } // ctx.externs.in?.close?.(); // ctx.externs.out?.close?.(); + console.log(`calling close for ${command.name}`, ctx.externs.out); await ctx.externs.out.close(); // TODO: need write command from puter-shell before this can be done @@ -382,8 +386,11 @@ export class Pipeline { const command = preparedCommands[i]; commandPromises.push(command.execute()); } + console.log('command promises', commandPromises); await Promise.all(commandPromises); + console.log('|AWAIT COUPLER'); await coupler.isDone; + console.log('|DONE AWAIT COUPLER'); } } \ No newline at end of file diff --git a/packages/phoenix/src/ansi-shell/readline/readline.js b/packages/phoenix/src/ansi-shell/readline/readline.js index 0a9c04901..64d78ef00 100644 --- a/packages/phoenix/src/ansi-shell/readline/readline.js +++ b/packages/phoenix/src/ansi-shell/readline/readline.js @@ -58,6 +58,7 @@ const ReadlineProcessorBuilder = builder => builder const byteBuffer = new Uint8Array(1); await externs.in_.read(byteBuffer); + console.log('got a byte!', byteBuffer[0]); locals.byteBuffer = byteBuffer; locals.byte = byteBuffer[0]; }) From 3f249fcc89ba37be1eadfa1e808db1f03656ebe9 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Fri, 19 Apr 2024 23:31:23 -0400 Subject: [PATCH 10/11] Fix pipes --- packages/phoenix/src/ansi-shell/pipeline/Coupler.js | 4 +--- packages/phoenix/src/ansi-shell/pipeline/Pipeline.js | 12 +++++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index d7679d496..bb0efe68a 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -61,9 +61,7 @@ export class Coupler { console.log('result?', which, result); const { value, done } = result; if ( done ) { - if ( which === 'closed' ) { - cancel(); - } + cancel(); this.source = null; this.target = null; this.active = false; diff --git a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js index b67dec868..910108e4a 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js @@ -187,10 +187,6 @@ export class PreparedCommand { in_ = new MemReader(response); } - const internal_input_pipe = new Pipe(); - const valve = new Coupler(in_, internal_input_pipe.in); - in_ = internal_input_pipe.out; - // simple naive implementation for now const sig = { listeners_: [], @@ -288,7 +284,6 @@ export class PreparedCommand { console.log(`awaiting execute for ${command.name}`) await execute(ctx); console.log(`DONE execute for ${command.name}`) - valve.close(); } catch (e) { if ( e instanceof Exit ) { exit_code = e.code; @@ -353,6 +348,11 @@ export class Pipeline { let nextIn = ctx.externs.in; let lastPipe = null; + // Create valve to close input pipe when done + const pipeline_input_pipe = new Pipe(); + const valve = new Coupler(nextIn, pipeline_input_pipe.in); + nextIn = pipeline_input_pipe.out; + // TOOD: this will eventually defer piping of certain // sub-pipelines to the Puter Shell. @@ -392,5 +392,7 @@ export class Pipeline { console.log('|AWAIT COUPLER'); await coupler.isDone; console.log('|DONE AWAIT COUPLER'); + + valve.close(); } } \ No newline at end of file From 27553ef926555cb7deaa8bf87862e7bcdfe57890 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Fri, 19 Apr 2024 23:35:56 -0400 Subject: [PATCH 11/11] Cleanup --- packages/phoenix/packages/pty/exports.js | 115 ------------------ .../src/ansi-shell/ioutil/SignalReader.js | 1 - .../src/ansi-shell/pipeline/Coupler.js | 1 - .../src/ansi-shell/pipeline/Pipeline.js | 9 -- .../src/ansi-shell/readline/readline.js | 1 - 5 files changed, 127 deletions(-) diff --git a/packages/phoenix/packages/pty/exports.js b/packages/phoenix/packages/pty/exports.js index ff62d4e9f..3f45abf0c 100644 --- a/packages/phoenix/packages/pty/exports.js +++ b/packages/phoenix/packages/pty/exports.js @@ -70,21 +70,17 @@ class Channel { } called++; const chunk = this.chunks_.shift(); - console.log('shifted off chunk', chunk); ( chunk === DONE ? done : data ).resolve(chunk); this.off('write', on_data); this.emit_consume_(); } }; - console.log('this case', this.chunks_.length); - this.on('write', on_data); on_data(); const to_return = { cancel: () => { - console.log('cancel called'); this.off('write', on_data); cancel.resolve(); }, @@ -95,7 +91,6 @@ class Channel { }), }; - console.log('to_return?', to_return); return to_return; } @@ -105,12 +100,8 @@ class Channel { } pushback (...chunks) { - console.log('pushing back...', chunks) for ( let i = chunks.length - 1; i >= 0; i-- ) { - console.log('unshifting ', i, chunks[i]); - console.log('chunks_ before unshift', this.chunks_.length); this.chunks_.unshift(chunks[i]); - console.log('chunks_ after unshift', this.chunks_.length); } this.emit_write_(); @@ -146,11 +137,9 @@ export class BetterReader { async intake_ () { const { value, done } = await this.delegate.read(); if ( done ) { - console.log('writing to channel '); this.channel_.write(DONE); return; } - console.log('writing to channel', value); this.channel_.write(value); } @@ -181,13 +170,11 @@ export class BetterReader { }; } - console.log('!!!'); const final_promise = new TeePromise(); let current_cancel_ = () => {}; (async () => { - console.log('STARTING BUFFER READ'); let n_read = 0; const chunks = []; while ( n_read < opt_buffer.length ) { @@ -195,7 +182,6 @@ export class BetterReader { current_cancel_ = cancel; let [which, chunk] = await promise; - console.log('which', which, 'chunk', chunk) if ( which === 'done' ) { break; } @@ -209,14 +195,10 @@ export class BetterReader { chunk = chunk.subarray(0, diff); } chunks.push(chunk); - console.log('calling set', chunk, n_read, opt_buffer.length); opt_buffer.set(chunk, n_read); n_read += chunk.length; } - console.log('RESOLVING', opt_buffer); - console.log('-- and channel?', this.channel_.chunks_.length); - final_promise.resolve({ n_read }); })(); @@ -226,104 +208,11 @@ export class BetterReader { }, promise: final_promise, }; - - // --- everything below this line is being removed --- - - /* - if ( ! opt_buffer && this.chunks_.length === 0 ) { - const chunk = await this.delegate.read(); - if ( cancel_state?.cancelled ) { - // push the chunk back onto the queue - console.log('aaa', chunk); - this.chunks_.push(chunk); - return this._create_cancel_response(); - } - return { - chunk, - debug_meta: { source: 'delegate' }, - }; - } - - const chunk = await this.getChunk_(); - console.log('what we got', chunk); - if ( cancel_state?.cancelled ) { - // push the chunk back onto the queue - console.log('bbb', chunk); - this.chunks_.push(chunk); - return this._create_cancel_response(); - } - - if ( ! opt_buffer ) { - return { chunk, debug_meta: { source: 'stored chunks', returning: 'chunk' } }; - } - - if ( ! chunk ) { - return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } }; - } - - console.log('ccc', chunk); - this.chunks_.push(chunk); - - let itermax = 20; - while ( this.getTotalBytesReady_() < opt_buffer.length ) { - if ( --itermax < 0 ) { - throw new Error('too many iterations'); - } - console.log('iter b'); - const read_chunk = await this.getChunk_(); - if ( cancel_state?.cancelled ) { - // push the chunk back onto the queue - console.log('ddd', chunk); - this.chunks_.push(read_chunk); - return this._create_cancel_response(); - } - if ( ! read_chunk ) { - break; - } - console.log('adding chunk', read_chunk) - this.chunks_.push(read_chunk); - } - - let offset = 0; - while ( this.chunks_.length > 0 && offset < opt_buffer.length ) { - console.log('iter a') - let item = this.chunks_.shift(); - if ( item === undefined ) { - console.log('undefined ', this.chunks_); - break; - } - if ( item.value === undefined ) { - console.log('undefined ', item, this.chunks_); - break; - } - const is_done = item.done; - item = item.value; - if ( offset + item.length > opt_buffer.length ) { - const diff = opt_buffer.length - offset; - this.chunks_.unshift({ - done: is_done, - value: item.subarray(diff), - }); - item = item.subarray(0, diff); - } - opt_buffer.set(item, offset); - offset += item.length; - } - - return { - n_read: offset, - debug_meta: { source: 'stored chunks', returning: 'byte count' }, - }; - /**/ } read_with_cancel (opt_buffer) { - console.log('read with cancel called'); const o = this.read_and_get_info(opt_buffer); const { cancel, promise } = o; - promise.then(v => { - console.log('promise resolved', v); - }); // const promise = (async () => { // const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); // return opt_buffer ? n_read : chunk; @@ -347,19 +236,16 @@ export class BetterReader { delegate: delegate_read, buffer_not_empty: this.waitUntilDataAvailable(), }); - console.log('which?', which); if (which === 'delegate') { return result; } // There's a chunk in the buffer now, so we can use the regular path. // But first, make sure that once the delegate read completes, we save the chunk. - console.log('result', result) this.chunks_.push(result); } const len = this.getTotalBytesReady_(); - console.log('len', len); const merged = new Uint8Array(len); let offset = 0; for ( const item of this.chunks_ ) { @@ -374,7 +260,6 @@ export class BetterReader { getTotalBytesReady_ () { return this.chunks_.reduce((sum, chunk) => { - console.log('sum', sum, 'chunk', chunk); return sum + chunk.value.length }, 0); } diff --git a/packages/phoenix/src/ansi-shell/ioutil/SignalReader.js b/packages/phoenix/src/ansi-shell/ioutil/SignalReader.js index 59d69a0a3..b7eafc86e 100644 --- a/packages/phoenix/src/ansi-shell/ioutil/SignalReader.js +++ b/packages/phoenix/src/ansi-shell/ioutil/SignalReader.js @@ -48,7 +48,6 @@ export class SignalReader extends ProxyReader { // show hex for debugging // console.log(value.split('').map(c => c.charCodeAt(0).toString(16)).join(' ')); - console.log('value??', value) for ( const [key, signal] of mapping ) { if ( tmp_value.includes(key) ) { diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index bb0efe68a..7ad454a74 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -58,7 +58,6 @@ export class Coupler { source: promise, closed: this.closed_, }); - console.log('result?', which, result); const { value, done } = result; if ( done ) { cancel(); diff --git a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js index 910108e4a..186c8cab9 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js @@ -281,9 +281,7 @@ export class PreparedCommand { let exit_code = 0; try { - console.log(`awaiting execute for ${command.name}`) await execute(ctx); - console.log(`DONE execute for ${command.name}`) } catch (e) { if ( e instanceof Exit ) { exit_code = e.code; @@ -304,9 +302,6 @@ export class PreparedCommand { if ( ! (e instanceof Exit) ) console.error(e); } - // ctx.externs.in?.close?.(); - // ctx.externs.out?.close?.(); - console.log(`calling close for ${command.name}`, ctx.externs.out); await ctx.externs.out.close(); // TODO: need write command from puter-shell before this can be done @@ -386,12 +381,8 @@ export class Pipeline { const command = preparedCommands[i]; commandPromises.push(command.execute()); } - console.log('command promises', commandPromises); await Promise.all(commandPromises); - - console.log('|AWAIT COUPLER'); await coupler.isDone; - console.log('|DONE AWAIT COUPLER'); valve.close(); } diff --git a/packages/phoenix/src/ansi-shell/readline/readline.js b/packages/phoenix/src/ansi-shell/readline/readline.js index 64d78ef00..0a9c04901 100644 --- a/packages/phoenix/src/ansi-shell/readline/readline.js +++ b/packages/phoenix/src/ansi-shell/readline/readline.js @@ -58,7 +58,6 @@ const ReadlineProcessorBuilder = builder => builder const byteBuffer = new Uint8Array(1); await externs.in_.read(byteBuffer); - console.log('got a byte!', byteBuffer[0]); locals.byteBuffer = byteBuffer; locals.byte = byteBuffer[0]; })