From 072dbe8db5b065a5ead473c45237f374d239b6b3 Mon Sep 17 00:00:00 2001 From: Sam Atkins Date: Thu, 18 Apr 2024 17:39:13 +0100 Subject: [PATCH] Make BetterReader buffer and cancel, to fix stdin data loss BetterReader.read_with_cancel() returns both the read promise, and a function that can be used to cancel the read. A cancelled read is placed back into the BetterReader's chunk buffer, to be consumed by the next user that requests a read. This is used by Coupler so that when the coupler is closed, its pending read() call does not consume the next batch of input. This fixes the problem we were having with child applications consuming one chunk of stdin after they are closed, meaning the first key you press after an app exits would disappear. Co-authored-by: KernelDeimos --- packages/phoenix/packages/pty/exports.js | 120 ++++++++++++++++-- .../src/ansi-shell/pipeline/Coupler.js | 21 ++- packages/phoenix/src/promise.js | 14 ++ 3 files changed, 137 insertions(+), 18 deletions(-) diff --git a/packages/phoenix/packages/pty/exports.js b/packages/phoenix/packages/pty/exports.js index bbab56eff..56c87f3a2 100644 --- a/packages/phoenix/packages/pty/exports.js +++ b/packages/phoenix/packages/pty/exports.js @@ -16,6 +16,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +import { raceCase } from '../../src/promise.js'; + const encoder = new TextEncoder(); const CHAR_LF = '\n'.charCodeAt(0); @@ -27,29 +29,67 @@ export class BetterReader { this.chunks_ = []; } - async read (opt_buffer) { + _create_cancel_response () { + return { + chunk: null, + n_read: 0, + debug_meta: { + source: 'delegate', + returning: 'cancelled', + this_value_should_not_be_used: true, + }, + }; + } + + async read_and_get_info (opt_buffer, cancel_state) { if ( ! opt_buffer && this.chunks_.length === 0 ) { - return await this.delegate.read(); + const chunk = await this.delegate.read(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(chunk.value); + return this._create_cancel_response(); + } + return { + chunk, + debug_meta: { source: 'delegate' }, + }; } const chunk = await this.getChunk_(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(chunk); + return this._create_cancel_response(); + } if ( ! opt_buffer ) { - return chunk; + return { chunk, debug_meta: { source: 'stored chunks', returning: 'chunk' } }; + } + + if ( ! chunk ) { + return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } }; } this.chunks_.push(chunk); while ( this.getTotalBytesReady_() < opt_buffer.length ) { - this.chunks_.push(await this.getChunk_()) + const read_chunk = await this.getChunk_(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(read_chunk); + return this._create_cancel_response(); + } + if ( ! read_chunk ) { + break; + } + this.chunks_.push(read_chunk); } - // TODO: need to handle EOT condition in this loop let offset = 0; - for (;;) { + while ( this.chunks_.length > 0 && offset < opt_buffer.length ) { let item = this.chunks_.shift(); if ( item === undefined ) { - throw new Error('calculation is wrong') + break; } if ( offset + item.length > opt_buffer.length ) { const diff = opt_buffer.length - offset; @@ -58,17 +98,49 @@ export class BetterReader { } opt_buffer.set(item, offset); offset += item.length; - - if ( offset == opt_buffer.length ) break; } - // return opt_buffer.length; + return { + n_read: offset, + debug_meta: { source: 'stored chunks', returning: 'byte count' }, + }; + } + + read_with_cancel (opt_buffer) { + const cancel_state = { cancelled: false }; + const promise = (async () => { + const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); + return opt_buffer ? n_read : chunk; + })(); + return { + canceller: () => { + cancel_state.cancelled = true; + }, + promise, + }; + } + + async read (opt_buffer) { + const { chunk, n_read } = await this.read_and_get_info(opt_buffer); + return opt_buffer ? n_read : chunk; } async getChunk_() { if ( this.chunks_.length === 0 ) { - const { value } = await this.delegate.read(); - return value; + // Wait for either a delegate read to happen, or for a chunk to be added to the buffer from a cancelled read. + const delegate_read = this.delegate.read(); + const [which, result] = await raceCase({ + delegate: delegate_read, + buffer_not_empty: this.waitUntilDataAvailable(), + }); + if (which === 'delegate') { + return result.value; + } + // There's a chunk in the buffer now, so we can use the regular path. + // But first, make sure that once the delegate read completes, we save the chunk. + delegate_read.then((chunk) => { + this.chunks_.push(chunk.value); + }) } const len = this.getTotalBytesReady_(); @@ -87,6 +159,30 @@ export class BetterReader { getTotalBytesReady_ () { return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0); } + + canRead() { + return this.getTotalBytesReady_() > 0; + } + + async waitUntilDataAvailable() { + let resolve_promise; + let reject_promise; + const promise = new Promise((resolve, reject) => { + resolve_promise = resolve; + reject_promise = reject; + }); + + const check = () => { + if (this.canRead()) { + resolve_promise(); + } else { + setTimeout(check, 0); + } + }; + setTimeout(check, 0); + + await promise; + } } /** diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index 2845be3aa..3182c4835 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -import { TeePromise } from "../../promise.js"; +import { TeePromise, raceCase } from "../../promise.js"; export class Coupler { static description = ` @@ -40,7 +40,6 @@ export class Coupler { close () { this.closed_.resolve({ - value: undefined, done: true, }); } @@ -48,11 +47,21 @@ export class Coupler { async listenLoop_ () { this.active = true; for (;;) { - const { value, done } = await Promise.race([ - this.closed_, - this.source.read(), - ]); + let canceller = () => {}; + let promise; + if ( this.source.read_with_cancel !== undefined ) { + ({ canceller, promise } = this.source.read_with_cancel()); + } else { + promise = this.source.read(); + } + const [which, { value, done }] = await raceCase({ + source: promise, + closed: this.closed_, + }); if ( done ) { + if ( which === 'closed' ) { + canceller(); + } this.source = null; this.target = null; this.active = false; diff --git a/packages/phoenix/src/promise.js b/packages/phoenix/src/promise.js index 20d080ae1..5c47d2bd1 100644 --- a/packages/phoenix/src/promise.js +++ b/packages/phoenix/src/promise.js @@ -41,3 +41,17 @@ export class TeePromise { return this.then(fn); } } + +/** + * raceCase is like Promise.race except it takes an object instead of + * an array, and returns the key of the promise that resolves first + * as well as the value that it resolved to. + * + * @param {Object.} promise_map + * + * @returns {Promise.<[string, any]>} + */ +export const raceCase = async (promise_map) => { + return Promise.race(Object.entries(promise_map).map( + ([key, promise]) => promise.then(value => [key, value]))); +};