diff --git a/packages/phoenix/packages/pty/exports.js b/packages/phoenix/packages/pty/exports.js index bbab56eff..56c87f3a2 100644 --- a/packages/phoenix/packages/pty/exports.js +++ b/packages/phoenix/packages/pty/exports.js @@ -16,6 +16,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +import { raceCase } from '../../src/promise.js'; + const encoder = new TextEncoder(); const CHAR_LF = '\n'.charCodeAt(0); @@ -27,29 +29,67 @@ export class BetterReader { this.chunks_ = []; } - async read (opt_buffer) { + _create_cancel_response () { + return { + chunk: null, + n_read: 0, + debug_meta: { + source: 'delegate', + returning: 'cancelled', + this_value_should_not_be_used: true, + }, + }; + } + + async read_and_get_info (opt_buffer, cancel_state) { if ( ! opt_buffer && this.chunks_.length === 0 ) { - return await this.delegate.read(); + const chunk = await this.delegate.read(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(chunk.value); + return this._create_cancel_response(); + } + return { + chunk, + debug_meta: { source: 'delegate' }, + }; } const chunk = await this.getChunk_(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(chunk); + return this._create_cancel_response(); + } if ( ! opt_buffer ) { - return chunk; + return { chunk, debug_meta: { source: 'stored chunks', returning: 'chunk' } }; + } + + if ( ! chunk ) { + return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } }; } this.chunks_.push(chunk); while ( this.getTotalBytesReady_() < opt_buffer.length ) { - this.chunks_.push(await this.getChunk_()) + const read_chunk = await this.getChunk_(); + if ( cancel_state?.cancelled ) { + // push the chunk back onto the queue + this.chunks_.push(read_chunk); + return this._create_cancel_response(); + } + if ( ! read_chunk ) { + break; + } + this.chunks_.push(read_chunk); } - // TODO: need to handle EOT condition in this loop let offset = 0; - for (;;) { + while ( this.chunks_.length > 0 && offset < opt_buffer.length ) { let item = this.chunks_.shift(); if ( item === undefined ) { - throw new Error('calculation is wrong') + break; } if ( offset + item.length > opt_buffer.length ) { const diff = opt_buffer.length - offset; @@ -58,17 +98,49 @@ export class BetterReader { } opt_buffer.set(item, offset); offset += item.length; - - if ( offset == opt_buffer.length ) break; } - // return opt_buffer.length; + return { + n_read: offset, + debug_meta: { source: 'stored chunks', returning: 'byte count' }, + }; + } + + read_with_cancel (opt_buffer) { + const cancel_state = { cancelled: false }; + const promise = (async () => { + const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); + return opt_buffer ? n_read : chunk; + })(); + return { + canceller: () => { + cancel_state.cancelled = true; + }, + promise, + }; + } + + async read (opt_buffer) { + const { chunk, n_read } = await this.read_and_get_info(opt_buffer); + return opt_buffer ? n_read : chunk; } async getChunk_() { if ( this.chunks_.length === 0 ) { - const { value } = await this.delegate.read(); - return value; + // Wait for either a delegate read to happen, or for a chunk to be added to the buffer from a cancelled read. + const delegate_read = this.delegate.read(); + const [which, result] = await raceCase({ + delegate: delegate_read, + buffer_not_empty: this.waitUntilDataAvailable(), + }); + if (which === 'delegate') { + return result.value; + } + // There's a chunk in the buffer now, so we can use the regular path. + // But first, make sure that once the delegate read completes, we save the chunk. + delegate_read.then((chunk) => { + this.chunks_.push(chunk.value); + }) } const len = this.getTotalBytesReady_(); @@ -87,6 +159,30 @@ export class BetterReader { getTotalBytesReady_ () { return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0); } + + canRead() { + return this.getTotalBytesReady_() > 0; + } + + async waitUntilDataAvailable() { + let resolve_promise; + let reject_promise; + const promise = new Promise((resolve, reject) => { + resolve_promise = resolve; + reject_promise = reject; + }); + + const check = () => { + if (this.canRead()) { + resolve_promise(); + } else { + setTimeout(check, 0); + } + }; + setTimeout(check, 0); + + await promise; + } } /** diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index 2845be3aa..3182c4835 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -import { TeePromise } from "../../promise.js"; +import { TeePromise, raceCase } from "../../promise.js"; export class Coupler { static description = ` @@ -40,7 +40,6 @@ export class Coupler { close () { this.closed_.resolve({ - value: undefined, done: true, }); } @@ -48,11 +47,21 @@ export class Coupler { async listenLoop_ () { this.active = true; for (;;) { - const { value, done } = await Promise.race([ - this.closed_, - this.source.read(), - ]); + let canceller = () => {}; + let promise; + if ( this.source.read_with_cancel !== undefined ) { + ({ canceller, promise } = this.source.read_with_cancel()); + } else { + promise = this.source.read(); + } + const [which, { value, done }] = await raceCase({ + source: promise, + closed: this.closed_, + }); if ( done ) { + if ( which === 'closed' ) { + canceller(); + } this.source = null; this.target = null; this.active = false; diff --git a/packages/phoenix/src/promise.js b/packages/phoenix/src/promise.js index 20d080ae1..5c47d2bd1 100644 --- a/packages/phoenix/src/promise.js +++ b/packages/phoenix/src/promise.js @@ -41,3 +41,17 @@ export class TeePromise { return this.then(fn); } } + +/** + * raceCase is like Promise.race except it takes an object instead of + * an array, and returns the key of the promise that resolves first + * as well as the value that it resolved to. + * + * @param {Object.} promise_map + * + * @returns {Promise.<[string, any]>} + */ +export const raceCase = async (promise_map) => { + return Promise.race(Object.entries(promise_map).map( + ([key, promise]) => promise.then(value => [key, value]))); +};