diff --git a/packages/phoenix/packages/pty/exports.js b/packages/phoenix/packages/pty/exports.js
index 56c87f3a2..ff62d4e9f 100644
--- a/packages/phoenix/packages/pty/exports.js
+++ b/packages/phoenix/packages/pty/exports.js
@@ -16,19 +16,145 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-import { raceCase } from '../../src/promise.js';
+import { TeePromise, raceCase } from '../../src/promise.js';
const encoder = new TextEncoder();
const CHAR_LF = '\n'.charCodeAt(0);
const CHAR_CR = '\r'.charCodeAt(0);
+const DONE = Symbol('done');
+
+class Channel {
+ constructor () {
+ this.chunks_ = [];
+
+ globalThis.chnl = this;
+
+ const events = ['write','consume','change'];
+ for ( const event of events ) {
+ this[`on_${event}_`] = [];
+ this[`emit_${event}_`] = () => {
+ for ( const listener of this[`on_${event}_`] ) {
+ listener();
+ }
+ };
+ }
+
+ this.on('write', () => { this.emit_change_(); });
+ this.on('consume', () => { this.emit_change_(); });
+ }
+
+ on (event, listener) {
+ this[`on_${event}_`].push(listener);
+ }
+
+ off (event, listener) {
+ const index = this[`on_${event}_`].indexOf(listener);
+ if ( index !== -1 ) {
+ this[`on_${event}_`].splice(index, 1);
+ }
+ }
+
+ get () {
+ const cancel = new TeePromise();
+ const data = new TeePromise();
+ const done = new TeePromise();
+
+ let called = 0;
+
+ const on_data = () => {
+ if ( this.chunks_.length > 0 ) {
+ if ( called > 0 ) {
+ throw new Error('called more than once');
+ }
+ called++;
+ const chunk = this.chunks_.shift();
+ console.log('shifted off chunk', chunk);
+ ( chunk === DONE ? done : data ).resolve(chunk);
+ this.off('write', on_data);
+ this.emit_consume_();
+ }
+ };
+
+ console.log('this case', this.chunks_.length);
+
+ this.on('write', on_data);
+ on_data();
+
+ const to_return = {
+ cancel: () => {
+ console.log('cancel called');
+ this.off('write', on_data);
+ cancel.resolve();
+ },
+ promise: raceCase({
+ cancel,
+ data,
+ done,
+ }),
+ };
+
+ console.log('to_return?', to_return);
+ return to_return;
+ }
+
+ write (chunk) {
+ this.chunks_.push(chunk);
+ this.emit_write_();
+ }
+
+ pushback (...chunks) {
+ console.log('pushing back...', chunks)
+ for ( let i = chunks.length - 1; i >= 0; i-- ) {
+ console.log('unshifting ', i, chunks[i]);
+ console.log('chunks_ before unshift', this.chunks_.length);
+ this.chunks_.unshift(chunks[i]);
+ console.log('chunks_ after unshift', this.chunks_.length);
+
+ }
+ this.emit_write_();
+ }
+
+ is_empty () {
+ return this.chunks_.length === 0;
+ }
+}
+
export class BetterReader {
constructor ({ delegate }) {
this.delegate = delegate;
this.chunks_ = [];
+ this.channel_ = new Channel();
+
+ this._init();
}
+ _init () {
+ let working = Promise.resolve();
+ this.channel_.on('consume', async () => {
+ await working;
+ working = new TeePromise();
+ if ( this.channel_.is_empty() ) {
+ await this.intake_();
+ }
+ working.resolve();
+ });
+ this.intake_();
+ }
+
+ async intake_ () {
+ const { value, done } = await this.delegate.read();
+ if ( done ) {
+ console.log('writing to channel ');
+ this.channel_.write(DONE);
+ return;
+ }
+ console.log('writing to channel', value);
+ this.channel_.write(value);
+ }
+
+
_create_cancel_response () {
return {
chunk: null,
@@ -41,12 +167,75 @@ export class BetterReader {
};
}
- async read_and_get_info (opt_buffer, cancel_state) {
+ read_and_get_info (opt_buffer, cancel_state) {
+ if ( ! opt_buffer ) {
+ const { promise, cancel } = this.channel_.get();
+ return {
+ cancel,
+ promise: promise.then(([which, chunk]) => {
+ if ( which !== 'data' ) {
+ return { done: true, value: null };
+ }
+ return { value: chunk };
+ }),
+
+ };
+ }
+ console.log('!!!');
+
+ const final_promise = new TeePromise();
+ let current_cancel_ = () => {};
+
+ (async () => {
+ console.log('STARTING BUFFER READ');
+ let n_read = 0;
+ const chunks = [];
+ while ( n_read < opt_buffer.length ) {
+ const { promise, cancel } = this.channel_.get();
+ current_cancel_ = cancel;
+
+ let [which, chunk] = await promise;
+ console.log('which', which, 'chunk', chunk)
+ if ( which === 'done' ) {
+ break;
+ }
+ if ( which === 'cancel' ) {
+ this.channel_.pushback(...chunks);
+ return
+ }
+ if ( n_read + chunk.length > opt_buffer.length ) {
+ const diff = opt_buffer.length - n_read;
+ this.channel_.pushback(chunk.subarray(diff));
+ chunk = chunk.subarray(0, diff);
+ }
+ chunks.push(chunk);
+ console.log('calling set', chunk, n_read, opt_buffer.length);
+ opt_buffer.set(chunk, n_read);
+ n_read += chunk.length;
+ }
+
+ console.log('RESOLVING', opt_buffer);
+ console.log('-- and channel?', this.channel_.chunks_.length);
+
+ final_promise.resolve({ n_read });
+ })();
+
+ return {
+ cancel: () => {
+ current_cancel_();
+ },
+ promise: final_promise,
+ };
+
+ // --- everything below this line is being removed ---
+
+ /*
if ( ! opt_buffer && this.chunks_.length === 0 ) {
const chunk = await this.delegate.read();
if ( cancel_state?.cancelled ) {
// push the chunk back onto the queue
- this.chunks_.push(chunk.value);
+ console.log('aaa', chunk);
+ this.chunks_.push(chunk);
return this._create_cancel_response();
}
return {
@@ -56,8 +245,10 @@ export class BetterReader {
}
const chunk = await this.getChunk_();
+ console.log('what we got', chunk);
if ( cancel_state?.cancelled ) {
// push the chunk back onto the queue
+ console.log('bbb', chunk);
this.chunks_.push(chunk);
return this._create_cancel_response();
}
@@ -70,30 +261,49 @@ export class BetterReader {
return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } };
}
+ console.log('ccc', chunk);
this.chunks_.push(chunk);
+ let itermax = 20;
while ( this.getTotalBytesReady_() < opt_buffer.length ) {
+ if ( --itermax < 0 ) {
+ throw new Error('too many iterations');
+ }
+ console.log('iter b');
const read_chunk = await this.getChunk_();
if ( cancel_state?.cancelled ) {
// push the chunk back onto the queue
+ console.log('ddd', chunk);
this.chunks_.push(read_chunk);
return this._create_cancel_response();
}
if ( ! read_chunk ) {
break;
}
+ console.log('adding chunk', read_chunk)
this.chunks_.push(read_chunk);
}
let offset = 0;
while ( this.chunks_.length > 0 && offset < opt_buffer.length ) {
+ console.log('iter a')
let item = this.chunks_.shift();
if ( item === undefined ) {
+ console.log('undefined ', this.chunks_);
break;
}
+ if ( item.value === undefined ) {
+ console.log('undefined ', item, this.chunks_);
+ break;
+ }
+ const is_done = item.done;
+ item = item.value;
if ( offset + item.length > opt_buffer.length ) {
const diff = opt_buffer.length - offset;
- this.chunks_.unshift(item.subarray(diff));
+ this.chunks_.unshift({
+ done: is_done,
+ value: item.subarray(diff),
+ });
item = item.subarray(0, diff);
}
opt_buffer.set(item, offset);
@@ -104,24 +314,28 @@ export class BetterReader {
n_read: offset,
debug_meta: { source: 'stored chunks', returning: 'byte count' },
};
+ /**/
}
read_with_cancel (opt_buffer) {
- const cancel_state = { cancelled: false };
- const promise = (async () => {
- const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state);
- return opt_buffer ? n_read : chunk;
- })();
+ console.log('read with cancel called');
+ const o = this.read_and_get_info(opt_buffer);
+ const { cancel, promise } = o;
+ promise.then(v => {
+ console.log('promise resolved', v);
+ });
+ // const promise = (async () => {
+ // const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state);
+ // return opt_buffer ? n_read : chunk;
+ // })();
return {
- canceller: () => {
- cancel_state.cancelled = true;
- },
+ cancel,
promise,
};
}
async read (opt_buffer) {
- const { chunk, n_read } = await this.read_and_get_info(opt_buffer);
+ const { chunk, n_read } = await this.read_and_get_info(opt_buffer).promise;
return opt_buffer ? n_read : chunk;
}
@@ -133,17 +347,19 @@ export class BetterReader {
delegate: delegate_read,
buffer_not_empty: this.waitUntilDataAvailable(),
});
+ console.log('which?', which);
if (which === 'delegate') {
- return result.value;
+ return result;
}
+
// There's a chunk in the buffer now, so we can use the regular path.
// But first, make sure that once the delegate read completes, we save the chunk.
- delegate_read.then((chunk) => {
- this.chunks_.push(chunk.value);
- })
+ console.log('result', result)
+ this.chunks_.push(result);
}
const len = this.getTotalBytesReady_();
+ console.log('len', len);
const merged = new Uint8Array(len);
let offset = 0;
for ( const item of this.chunks_ ) {
@@ -157,7 +373,10 @@ export class BetterReader {
}
getTotalBytesReady_ () {
- return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0);
+ return this.chunks_.reduce((sum, chunk) => {
+ console.log('sum', sum, 'chunk', chunk);
+ return sum + chunk.value.length
+ }, 0);
}
canRead() {
diff --git a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js
index 3182c4835..d7679d496 100644
--- a/packages/phoenix/src/ansi-shell/pipeline/Coupler.js
+++ b/packages/phoenix/src/ansi-shell/pipeline/Coupler.js
@@ -47,20 +47,22 @@ export class Coupler {
async listenLoop_ () {
this.active = true;
for (;;) {
- let canceller = () => {};
+ let cancel = () => {};
let promise;
if ( this.source.read_with_cancel !== undefined ) {
- ({ canceller, promise } = this.source.read_with_cancel());
+ ({ cancel, promise } = this.source.read_with_cancel());
} else {
promise = this.source.read();
}
- const [which, { value, done }] = await raceCase({
+ const [which, result] = await raceCase({
source: promise,
closed: this.closed_,
});
+ console.log('result?', which, result);
+ const { value, done } = result;
if ( done ) {
if ( which === 'closed' ) {
- canceller();
+ cancel();
}
this.source = null;
this.target = null;
@@ -69,6 +71,7 @@ export class Coupler {
break;
}
if ( this.on_ ) {
+ if ( ! value ) debugger;
await this.target.write(value);
}
}
diff --git a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js
index bda57096d..b67dec868 100644
--- a/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js
+++ b/packages/phoenix/src/ansi-shell/pipeline/Pipeline.js
@@ -285,7 +285,9 @@ export class PreparedCommand {
let exit_code = 0;
try {
+ console.log(`awaiting execute for ${command.name}`)
await execute(ctx);
+ console.log(`DONE execute for ${command.name}`)
valve.close();
} catch (e) {
if ( e instanceof Exit ) {
@@ -304,10 +306,12 @@ export class PreparedCommand {
);
ctx.locals.exit = -1;
}
+ if ( ! (e instanceof Exit) ) console.error(e);
}
// ctx.externs.in?.close?.();
// ctx.externs.out?.close?.();
+ console.log(`calling close for ${command.name}`, ctx.externs.out);
await ctx.externs.out.close();
// TODO: need write command from puter-shell before this can be done
@@ -382,8 +386,11 @@ export class Pipeline {
const command = preparedCommands[i];
commandPromises.push(command.execute());
}
+ console.log('command promises', commandPromises);
await Promise.all(commandPromises);
+ console.log('|AWAIT COUPLER');
await coupler.isDone;
+ console.log('|DONE AWAIT COUPLER');
}
}
\ No newline at end of file
diff --git a/packages/phoenix/src/ansi-shell/readline/readline.js b/packages/phoenix/src/ansi-shell/readline/readline.js
index 0a9c04901..64d78ef00 100644
--- a/packages/phoenix/src/ansi-shell/readline/readline.js
+++ b/packages/phoenix/src/ansi-shell/readline/readline.js
@@ -58,6 +58,7 @@ const ReadlineProcessorBuilder = builder => builder
const byteBuffer = new Uint8Array(1);
await externs.in_.read(byteBuffer);
+ console.log('got a byte!', byteBuffer[0]);
locals.byteBuffer = byteBuffer;
locals.byte = byteBuffer[0];
})