dev: get streaming token counts on everything (except xAI)

This commit is contained in:
KernelDeimos
2024-11-26 16:26:05 -05:00
parent 2dd9417234
commit 0d90b588aa
6 changed files with 132 additions and 9 deletions
@@ -1,4 +1,6 @@
const BaseService = require("../../services/BaseService");
const { TypeSpec } = require("../../services/drivers/meta/Construct");
const { TypedValue } = require("../../services/drivers/meta/Runtime");
const { Context } = require("../../util/context");
const MAX_FALLBACKS = 3 + 1; // includes first attempt
@@ -18,6 +20,17 @@ class AIChatService extends BaseService {
}
// Service initialization hook: allocates a per-instance cache key and
// subscribes to usage reports emitted by the AI provider services.
_init () {
// Random UUID used to namespace this service's cache/KV entries.
this.kvkey = this.modules.uuidv4();
const svc_event = this.services.get('event');
// 'ai.prompt.report-usage' is emitted after each prompt completes;
// streaming providers deliver token counts via a promise that resolves
// once the stream ends (see provider services in this commit).
svc_event.on('ai.prompt.report-usage', async (_, details) => {
// Actor may be a user or an app, so both paths are optional-chained.
const user_id = details.actor?.type?.user?.id;
const app_id = details.actor?.type?.app?.id;
const service_name = details.service_used;
const model_name = details.model_used;
// value_uint_1 = input token count, value_uint_2 = output token count.
const value_uint_1 = details.usage?.input_tokens;
const value_uint_2 = details.usage?.output_tokens;
// NOTE(review): handler only logs for now — presumably persistence of
// these counts lands in a later commit; confirm before relying on it.
console.log('reporting usage', { user_id, app_id, service_name, model_name, value_uint_1, value_uint_2 }); });
}
async ['__on_boot.consolidation'] () {
@@ -168,6 +181,7 @@ class AIChatService extends BaseService {
error = e;
errors.push(e);
console.error(e);
this.log.error('error calling service', {
intended_service,
model,
@@ -228,6 +242,34 @@ class AIChatService extends BaseService {
response_metadata.service_used = service_used;
const username = Context.get('actor').type?.user?.username;
if (
// Check if we have 'ai-chat-intermediate' response type;
// this means we're streaming and usage comes from a promise.
(ret.result instanceof TypedValue) &&
TypeSpec.adapt({ $: 'ai-chat-intermediate' })
.equals(ret.result.type)
) {
(async () => {
const usage_promise = ret.result.value.usage_promise;
const usage = await usage_promise;
await svc_event.emit('ai.prompt.report-usage', {
actor: Context.get('actor'),
service_used,
model_used,
usage,
});
})();
return ret.result.value.response;
} else {
await svc_event.emit('ai.prompt.report-usage', {
actor: Context.get('actor'),
username,
service_used,
model_used,
usage: ret.result.usage,
});
}
console.log('emitting ai.prompt.complete');
await svc_event.emit('ai.prompt.complete', {
@@ -4,6 +4,7 @@ const { whatis } = require("../../util/langutil");
const { PassThrough } = require("stream");
const { TypedValue } = require("../../services/drivers/meta/Runtime");
const APIError = require("../../api/APIError");
const { TeePromise } = require("../../util/promise");
const PUTER_PROMPT = `
You are running on an open-source platform called Puter,
@@ -102,6 +103,8 @@ class ClaudeService extends BaseService {
}
if ( stream ) {
let usage_promise = new TeePromise();
const stream = new PassThrough();
const retval = new TypedValue({
$: 'stream',
@@ -116,7 +119,16 @@ class ClaudeService extends BaseService {
system: PUTER_PROMPT + JSON.stringify(system_prompts),
messages: adapted_messages,
});
const counts = { input_tokens: 0, output_tokens: 0 };
for await ( const event of completion ) {
const input_tokens =
(event?.usage ?? event?.message?.usage)?.input_tokens;
const output_tokens =
(event?.usage ?? event?.message?.usage)?.output_tokens;
if ( input_tokens ) counts.input_tokens += input_tokens;
if ( output_tokens ) counts.output_tokens += output_tokens;
if (
event.type !== 'content_block_delta' ||
event.delta.type !== 'text_delta'
@@ -127,9 +139,14 @@ class ClaudeService extends BaseService {
stream.write(str + '\n');
}
stream.end();
usage_promise.resolve(counts);
})();
return retval;
return new TypedValue({ $: 'ai-chat-intermediate' }, {
stream: true,
response: retval,
usage_promise: usage_promise,
});
}
const msg = await this.anthropic.messages.create({
@@ -2,6 +2,7 @@ const { PassThrough } = require("stream");
const BaseService = require("../../services/BaseService");
const { TypedValue } = require("../../services/drivers/meta/Runtime");
const { nou } = require("../../util/langutil");
const { TeePromise } = require("../../util/promise");
class GroqAIService extends BaseService {
static MODULES = {
@@ -50,6 +51,8 @@ class GroqAIService extends BaseService {
});
if ( stream ) {
const usage_promise = new TeePromise();
const stream = new PassThrough();
const retval = new TypedValue({
$: 'stream',
@@ -58,6 +61,15 @@ class GroqAIService extends BaseService {
}, stream);
(async () => {
for await ( const chunk of completion ) {
let usage = chunk?.x_groq?.usage ?? chunk.usage;
if ( usage ) {
usage_promise.resolve({
input_tokens: usage.prompt_tokens,
output_tokens: usage.completion_tokens,
});
continue;
}
if ( chunk.choices.length < 1 ) continue;
if ( chunk.choices[0].finish_reason ) {
stream.end();
@@ -71,7 +83,12 @@ class GroqAIService extends BaseService {
}
stream.end();
})();
return retval;
return new TypedValue({ $: 'ai-chat-intermediate' }, {
stream: true,
response: retval,
usage_promise: usage_promise,
});
}
const ret = completion.choices[0];
@@ -4,6 +4,7 @@ const { TypedValue } = require("../../services/drivers/meta/Runtime");
const { nou } = require("../../util/langutil");
const axios = require('axios');
const { TeePromise } = require("../../util/promise");
class MistralAIService extends BaseService {
static MODULES = {
@@ -149,6 +150,8 @@ class MistralAIService extends BaseService {
}
if ( stream ) {
let usage_promise = new TeePromise();
const stream = new PassThrough();
const retval = new TypedValue({
$: 'stream',
@@ -164,6 +167,14 @@ class MistralAIService extends BaseService {
// just because Mistral wants to be different
chunk = chunk.data;
if ( chunk.usage ) {
usage_promise.resolve({
input_tokens: chunk.usage.promptTokens,
output_tokens: chunk.usage.completionTokens,
});
continue;
}
if ( chunk.choices.length < 1 ) continue;
if ( chunk.choices[0].finish_reason ) {
stream.end();
@@ -177,7 +188,12 @@ class MistralAIService extends BaseService {
}
stream.end();
})();
return retval;
return new TypedValue({ $: 'ai-chat-intermediate' }, {
stream: true,
response: retval,
usage_promise: usage_promise,
});
}
const completion = await this.client.chat.complete({
@@ -5,6 +5,7 @@ const { TypedValue } = require('../../services/drivers/meta/Runtime');
const { Context } = require('../../util/context');
const SmolUtil = require('../../util/smolutil');
const { nou } = require('../../util/langutil');
const { TeePromise } = require('../../util/promise');
class OpenAICompletionService extends BaseService {
static MODULES = {
@@ -287,9 +288,14 @@ class OpenAICompletionService extends BaseService {
model: model,
max_tokens,
stream,
...(stream ? {
stream_options: { include_usage: true },
} : {}),
});
if ( stream ) {
let usage_promise = new TeePromise();
const entire = [];
const stream = new PassThrough();
const retval = new TypedValue({
@@ -300,11 +306,14 @@ class OpenAICompletionService extends BaseService {
(async () => {
for await ( const chunk of completion ) {
entire.push(chunk);
if ( chunk.choices.length < 1 ) continue;
if ( chunk.choices[0].finish_reason ) {
stream.end();
break;
if ( chunk.usage ) {
usage_promise.resolve({
input_tokens: chunk.usage.prompt_tokens,
output_tokens: chunk.usage.completion_tokens,
});
continue;
}
if ( chunk.choices.length < 1 ) continue;
if ( nou(chunk.choices[0].delta.content) ) continue;
const str = JSON.stringify({
text: chunk.choices[0].delta.content
@@ -313,6 +322,12 @@ class OpenAICompletionService extends BaseService {
}
stream.end();
})();
return new TypedValue({ $: 'ai-chat-intermediate' }, {
stream: true,
response: retval,
usage_promise: usage_promise,
});
return retval;
}
@@ -2,6 +2,7 @@ const { PassThrough } = require("stream");
const BaseService = require("../../services/BaseService");
const { TypedValue } = require("../../services/drivers/meta/Runtime");
const { nou } = require("../../util/langutil");
const { TeePromise } = require("../../util/promise");
class TogetherAIService extends BaseService {
static MODULES = {
@@ -52,6 +53,8 @@ class TogetherAIService extends BaseService {
});
if ( stream ) {
let usage_promise = new TeePromise();
const stream = new PassThrough();
const retval = new TypedValue({
$: 'stream',
@@ -60,6 +63,14 @@ class TogetherAIService extends BaseService {
}, stream);
(async () => {
for await ( const chunk of completion ) {
// DRY: same as openai
if ( chunk.usage ) {
usage_promise.resolve({
input_tokens: chunk.usage.prompt_tokens,
output_tokens: chunk.usage.completion_tokens,
});
}
if ( chunk.choices.length < 1 ) continue;
if ( chunk.choices[0].finish_reason ) {
stream.end();
@@ -73,7 +84,12 @@ class TogetherAIService extends BaseService {
}
stream.end();
})();
return retval;
return new TypedValue({ $: 'ai-chat-intermediate' }, {
stream: true,
response: retval,
usage_promise: usage_promise,
});
}
// return completion.choices[0];