From c3654ab148e42ec084bcefeb9c3a97583a6e7e37 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Fri, 19 Apr 2024 23:24:32 -0400 Subject: [PATCH] Add more streaming fixes --- packages/phoenix/packages/pty/exports.js | 255 ++++++++++++++++-- .../src/ansi-shell/pipeline/Coupler.js | 11 +- .../src/ansi-shell/pipeline/Pipeline.js | 7 + .../src/ansi-shell/readline/readline.js | 1 + 4 files changed, 252 insertions(+), 22 deletions(-) diff --git a/packages/phoenix/packages/pty/exports.js b/packages/phoenix/packages/pty/exports.js index 56c87f3a2..ff62d4e9f 100644 --- a/packages/phoenix/packages/pty/exports.js +++ b/packages/phoenix/packages/pty/exports.js @@ -16,19 +16,145 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -import { raceCase } from '../../src/promise.js'; +import { TeePromise, raceCase } from '../../src/promise.js'; const encoder = new TextEncoder(); const CHAR_LF = '\n'.charCodeAt(0); const CHAR_CR = '\r'.charCodeAt(0); +const DONE = Symbol('done'); + +class Channel { + constructor () { + this.chunks_ = []; + + globalThis.chnl = this; + + const events = ['write','consume','change']; + for ( const event of events ) { + this[`on_${event}_`] = []; + this[`emit_${event}_`] = () => { + for ( const listener of this[`on_${event}_`] ) { + listener(); + } + }; + } + + this.on('write', () => { this.emit_change_(); }); + this.on('consume', () => { this.emit_change_(); }); + } + + on (event, listener) { + this[`on_${event}_`].push(listener); + } + + off (event, listener) { + const index = this[`on_${event}_`].indexOf(listener); + if ( index !== -1 ) { + this[`on_${event}_`].splice(index, 1); + } + } + + get () { + const cancel = new TeePromise(); + const data = new TeePromise(); + const done = new TeePromise(); + + let called = 0; + + const on_data = () => { + if ( this.chunks_.length > 0 ) { + if ( called > 0 ) { + throw new Error('called more than once'); + } + called++; + const chunk = this.chunks_.shift(); + console.log('shifted off chunk', chunk); + ( chunk === DONE ? done : data ).resolve(chunk); + this.off('write', on_data); + this.emit_consume_(); + } + }; + + console.log('this case', this.chunks_.length); + + this.on('write', on_data); + on_data(); + + const to_return = { + cancel: () => { + console.log('cancel called'); + this.off('write', on_data); + cancel.resolve(); + }, + promise: raceCase({ + cancel, + data, + done, + }), + }; + + console.log('to_return?', to_return); + return to_return; + } + + write (chunk) { + this.chunks_.push(chunk); + this.emit_write_(); + } + + pushback (...chunks) { + console.log('pushing back...', chunks) + for ( let i = chunks.length - 1; i >= 0; i-- ) { + console.log('unshifting ', i, chunks[i]); + console.log('chunks_ before unshift', this.chunks_.length); + this.chunks_.unshift(chunks[i]); + console.log('chunks_ after unshift', this.chunks_.length); + + } + this.emit_write_(); + } + + is_empty () { + return this.chunks_.length === 0; + } +} + export class BetterReader { constructor ({ delegate }) { this.delegate = delegate; this.chunks_ = []; + this.channel_ = new Channel(); + + this._init(); } + _init () { + let working = Promise.resolve(); + this.channel_.on('consume', async () => { + await working; + working = new TeePromise(); + if ( this.channel_.is_empty() ) { + await this.intake_(); + } + working.resolve(); + }); + this.intake_(); + } + + async intake_ () { + const { value, done } = await this.delegate.read(); + if ( done ) { + console.log('writing to channel '); + this.channel_.write(DONE); + return; + } + console.log('writing to channel', value); + this.channel_.write(value); + } + + _create_cancel_response () { return { chunk: null, @@ -41,12 +167,75 @@ export class BetterReader { }; } - async read_and_get_info (opt_buffer, cancel_state) { + read_and_get_info (opt_buffer, cancel_state) { + if ( ! opt_buffer ) { + const { promise, cancel } = this.channel_.get(); + return { + cancel, + promise: promise.then(([which, chunk]) => { + if ( which !== 'data' ) { + return { done: true, value: null }; + } + return { value: chunk }; + }), + + }; + } + console.log('!!!'); + + const final_promise = new TeePromise(); + let current_cancel_ = () => {}; + + (async () => { + console.log('STARTING BUFFER READ'); + let n_read = 0; + const chunks = []; + while ( n_read < opt_buffer.length ) { + const { promise, cancel } = this.channel_.get(); + current_cancel_ = cancel; + + let [which, chunk] = await promise; + console.log('which', which, 'chunk', chunk) + if ( which === 'done' ) { + break; + } + if ( which === 'cancel' ) { + this.channel_.pushback(...chunks); + return + } + if ( n_read + chunk.length > opt_buffer.length ) { + const diff = opt_buffer.length - n_read; + this.channel_.pushback(chunk.subarray(diff)); + chunk = chunk.subarray(0, diff); + } + chunks.push(chunk); + console.log('calling set', chunk, n_read, opt_buffer.length); + opt_buffer.set(chunk, n_read); + n_read += chunk.length; + } + + console.log('RESOLVING', opt_buffer); + console.log('-- and channel?', this.channel_.chunks_.length); + + final_promise.resolve({ n_read }); + })(); + + return { + cancel: () => { + current_cancel_(); + }, + promise: final_promise, + }; + + // --- everything below this line is being removed --- + + /* if ( ! opt_buffer && this.chunks_.length === 0 ) { const chunk = await this.delegate.read(); if ( cancel_state?.cancelled ) { // push the chunk back onto the queue - this.chunks_.push(chunk.value); + console.log('aaa', chunk); + this.chunks_.push(chunk); return this._create_cancel_response(); } return { @@ -56,8 +245,10 @@ export class BetterReader { } const chunk = await this.getChunk_(); + console.log('what we got', chunk); if ( cancel_state?.cancelled ) { // push the chunk back onto the queue + console.log('bbb', chunk); this.chunks_.push(chunk); return this._create_cancel_response(); } @@ -70,30 +261,49 @@ export class BetterReader { return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } }; } + console.log('ccc', chunk); this.chunks_.push(chunk); + let itermax = 20; while ( this.getTotalBytesReady_() < opt_buffer.length ) { + if ( --itermax < 0 ) { + throw new Error('too many iterations'); + } + console.log('iter b'); const read_chunk = await this.getChunk_(); if ( cancel_state?.cancelled ) { // push the chunk back onto the queue + console.log('ddd', chunk); this.chunks_.push(read_chunk); return this._create_cancel_response(); } if ( ! read_chunk ) { break; } + console.log('adding chunk', read_chunk) this.chunks_.push(read_chunk); } let offset = 0; while ( this.chunks_.length > 0 && offset < opt_buffer.length ) { + console.log('iter a') let item = this.chunks_.shift(); if ( item === undefined ) { + console.log('undefined ', this.chunks_); break; } + if ( item.value === undefined ) { + console.log('undefined ', item, this.chunks_); + break; + } + const is_done = item.done; + item = item.value; if ( offset + item.length > opt_buffer.length ) { const diff = opt_buffer.length - offset; - this.chunks_.unshift(item.subarray(diff)); + this.chunks_.unshift({ + done: is_done, + value: item.subarray(diff), + }); item = item.subarray(0, diff); } opt_buffer.set(item, offset); @@ -104,24 +314,28 @@ export class BetterReader { n_read: offset, debug_meta: { source: 'stored chunks', returning: 'byte count' }, }; + /**/ } read_with_cancel (opt_buffer) { - const cancel_state = { cancelled: false }; - const promise = (async () => { - const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); - return opt_buffer ? n_read : chunk; - })(); + console.log('read with cancel called'); + const o = this.read_and_get_info(opt_buffer); + const { cancel, promise } = o; + promise.then(v => { + console.log('promise resolved', v); + }); + // const promise = (async () => { + // const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state); + // return opt_buffer ? n_read : chunk; + // })(); return { - canceller: () => { - cancel_state.cancelled = true; - }, + cancel, promise, }; } async read (opt_buffer) { - const { chunk, n_read } = await this.read_and_get_info(opt_buffer); + const { chunk, n_read } = await this.read_and_get_info(opt_buffer).promise; return opt_buffer ? n_read : chunk; } @@ -133,17 +347,19 @@ export class BetterReader { delegate: delegate_read, buffer_not_empty: this.waitUntilDataAvailable(), }); + console.log('which?', which); if (which === 'delegate') { - return result.value; + return result; } + // There's a chunk in the buffer now, so we can use the regular path. // But first, make sure that once the delegate read completes, we save the chunk. - delegate_read.then((chunk) => { - this.chunks_.push(chunk.value); - }) + console.log('result', result) + this.chunks_.push(result); } const len = this.getTotalBytesReady_(); + console.log('len', len); const merged = new Uint8Array(len); let offset = 0; for ( const item of this.chunks_ ) { @@ -157,7 +373,10 @@ export class BetterReader { } getTotalBytesReady_ () { - return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0); + return this.chunks_.reduce((sum, chunk) => { + console.log('sum', sum, 'chunk', chunk); + return sum + chunk.value.length + }, 0); } canRead() { diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js index 3182c4835..d7679d496 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js @@ -47,20 +47,22 @@ export class Coupler { async listenLoop_ () { this.active = true; for (;;) { - let canceller = () => {}; + let cancel = () => {}; let promise; if ( this.source.read_with_cancel !== undefined ) { - ({ canceller, promise } = this.source.read_with_cancel()); + ({ cancel, promise } = this.source.read_with_cancel()); } else { promise = this.source.read(); } - const [which, { value, done }] = await raceCase({ + const [which, result] = await raceCase({ source: promise, closed: this.closed_, }); + console.log('result?', which, result); + const { value, done } = result; if ( done ) { if ( which === 'closed' ) { - canceller(); + cancel(); } this.source = null; this.target = null; @@ -69,6 +71,7 @@ export class Coupler { break; } if ( this.on_ ) { + if ( ! value ) debugger; await this.target.write(value); } } diff --git a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js index bda57096d..b67dec868 100644 --- a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js +++ b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js @@ -285,7 +285,9 @@ export class PreparedCommand { let exit_code = 0; try { + console.log(`awaiting execute for ${command.name}`) await execute(ctx); + console.log(`DONE execute for ${command.name}`) valve.close(); } catch (e) { if ( e instanceof Exit ) { @@ -304,10 +306,12 @@ export class PreparedCommand { ); ctx.locals.exit = -1; } + if ( ! (e instanceof Exit) ) console.error(e); } // ctx.externs.in?.close?.(); // ctx.externs.out?.close?.(); + console.log(`calling close for ${command.name}`, ctx.externs.out); await ctx.externs.out.close(); // TODO: need write command from puter-shell before this can be done @@ -382,8 +386,11 @@ export class Pipeline { const command = preparedCommands[i]; commandPromises.push(command.execute()); } + console.log('command promises', commandPromises); await Promise.all(commandPromises); + console.log('|AWAIT COUPLER'); await coupler.isDone; + console.log('|DONE AWAIT COUPLER'); } } \ No newline at end of file diff --git a/packages/phoenix/src/ansi-shell/readline/readline.js b/packages/phoenix/src/ansi-shell/readline/readline.js index 0a9c04901..64d78ef00 100644 --- a/packages/phoenix/src/ansi-shell/readline/readline.js +++ b/packages/phoenix/src/ansi-shell/readline/readline.js @@ -58,6 +58,7 @@ const ReadlineProcessorBuilder = builder => builder const byteBuffer = new Uint8Array(1); await externs.in_.read(byteBuffer); + console.log('got a byte!', byteBuffer[0]); locals.byteBuffer = byteBuffer; locals.byte = byteBuffer[0]; })