From 0d90b588aa5b57765c3e698193121ae590c71e97 Mon Sep 17 00:00:00 2001 From: KernelDeimos Date: Tue, 26 Nov 2024 16:26:05 -0500 Subject: [PATCH] dev: get streaming token counts on everything (except xAI) --- .../src/modules/puterai/AIChatService.js | 42 +++++++++++++++++++ .../src/modules/puterai/ClaudeService.js | 19 ++++++++- .../src/modules/puterai/GroqAIService.js | 19 ++++++++- .../src/modules/puterai/MistralAIService.js | 18 +++++++- .../puterai/OpenAICompletionService.js | 25 ++++++++--- .../src/modules/puterai/TogetherAIService.js | 18 +++++++- 6 files changed, 132 insertions(+), 9 deletions(-) diff --git a/src/backend/src/modules/puterai/AIChatService.js b/src/backend/src/modules/puterai/AIChatService.js index cddf07b5c..24df4b955 100644 --- a/src/backend/src/modules/puterai/AIChatService.js +++ b/src/backend/src/modules/puterai/AIChatService.js @@ -1,4 +1,6 @@ const BaseService = require("../../services/BaseService"); +const { TypeSpec } = require("../../services/drivers/meta/Construct"); +const { TypedValue } = require("../../services/drivers/meta/Runtime"); const { Context } = require("../../util/context"); const MAX_FALLBACKS = 3 + 1; // includes first attempt @@ -18,6 +20,17 @@ class AIChatService extends BaseService { } _init () { this.kvkey = this.modules.uuidv4(); + + const svc_event = this.services.get('event'); + svc_event.on('ai.prompt.report-usage', async (_, details) => { + const user_id = details.actor?.type?.user?.id; + const app_id = details.actor?.type?.app?.id; + const service_name = details.service_used; + const model_name = details.model_used; + const value_uint_1 = details.usage?.input_tokens; + const value_uint_2 = details.usage?.output_tokens; + + console.log('reporting usage', { user_id, app_id, service_name, model_name, value_uint_1, value_uint_2 }); }); } async ['__on_boot.consolidation'] () { @@ -168,6 +181,7 @@ class AIChatService extends BaseService { error = e; errors.push(e); + console.error(e); this.log.error('error calling service', { intended_service, model, @@ -228,6 +242,34 @@ class AIChatService extends BaseService { response_metadata.service_used = service_used; const username = Context.get('actor').type?.user?.username; + + if ( + // Check if we have 'ai-chat-intermediate' response type; + // this means we're streaming and usage comes from a promise. + (ret.result instanceof TypedValue) && + TypeSpec.adapt({ $: 'ai-chat-intermediate' }) + .equals(ret.result.type) + ) { + (async () => { + const usage_promise = ret.result.value.usage_promise; + const usage = await usage_promise; + await svc_event.emit('ai.prompt.report-usage', { + actor: Context.get('actor'), + service_used, + model_used, + usage, + }); + })(); + return ret.result.value.response; + } else { + await svc_event.emit('ai.prompt.report-usage', { + actor: Context.get('actor'), + username, + service_used, + model_used, + usage: ret.result.usage, + }); + } console.log('emitting ai.prompt.complete'); await svc_event.emit('ai.prompt.complete', { diff --git a/src/backend/src/modules/puterai/ClaudeService.js b/src/backend/src/modules/puterai/ClaudeService.js index 2299d648e..98b14c18b 100644 --- a/src/backend/src/modules/puterai/ClaudeService.js +++ b/src/backend/src/modules/puterai/ClaudeService.js @@ -4,6 +4,7 @@ const { whatis } = require("../../util/langutil"); const { PassThrough } = require("stream"); const { TypedValue } = require("../../services/drivers/meta/Runtime"); const APIError = require("../../api/APIError"); +const { TeePromise } = require("../../util/promise"); const PUTER_PROMPT = ` You are running on an open-source platform called Puter, @@ -102,6 +103,8 @@ class ClaudeService extends BaseService { } if ( stream ) { + let usage_promise = new TeePromise(); + const stream = new PassThrough(); const retval = new TypedValue({ $: 'stream', @@ -116,7 +119,16 @@ class ClaudeService extends BaseService { system: PUTER_PROMPT + JSON.stringify(system_prompts), messages: adapted_messages, }); + const counts = { input_tokens: 0, output_tokens: 0 }; for await ( const event of completion ) { + const input_tokens = + (event?.usage ?? event?.message?.usage)?.input_tokens; + const output_tokens = + (event?.usage ?? event?.message?.usage)?.output_tokens; + + if ( input_tokens ) counts.input_tokens += input_tokens; + if ( output_tokens ) counts.output_tokens += output_tokens; + if ( event.type !== 'content_block_delta' || event.delta.type !== 'text_delta' @@ -127,9 +139,14 @@ class ClaudeService extends BaseService { stream.write(str + '\n'); } stream.end(); + usage_promise.resolve(counts); })(); - return retval; + return new TypedValue({ $: 'ai-chat-intermediate' }, { + stream: true, + response: retval, + usage_promise: usage_promise, + }); } const msg = await this.anthropic.messages.create({ diff --git a/src/backend/src/modules/puterai/GroqAIService.js b/src/backend/src/modules/puterai/GroqAIService.js index 861e71315..daf7d0f51 100644 --- a/src/backend/src/modules/puterai/GroqAIService.js +++ b/src/backend/src/modules/puterai/GroqAIService.js @@ -2,6 +2,7 @@ const { PassThrough } = require("stream"); const BaseService = require("../../services/BaseService"); const { TypedValue } = require("../../services/drivers/meta/Runtime"); const { nou } = require("../../util/langutil"); +const { TeePromise } = require("../../util/promise"); class GroqAIService extends BaseService { static MODULES = { @@ -50,6 +51,8 @@ class GroqAIService extends BaseService { }); if ( stream ) { + const usage_promise = new TeePromise(); + const stream = new PassThrough(); const retval = new TypedValue({ $: 'stream', @@ -58,6 +61,15 @@ class GroqAIService extends BaseService { }, stream); (async () => { for await ( const chunk of completion ) { + let usage = chunk?.x_groq?.usage ?? chunk.usage; + if ( usage ) { + usage_promise.resolve({ + input_tokens: usage.prompt_tokens, + output_tokens: usage.completion_tokens, + }); + continue; + } + if ( chunk.choices.length < 1 ) continue; if ( chunk.choices[0].finish_reason ) { stream.end(); @@ -71,7 +83,12 @@ class GroqAIService extends BaseService { } stream.end(); })(); - return retval; + + return new TypedValue({ $: 'ai-chat-intermediate' }, { + stream: true, + response: retval, + usage_promise: usage_promise, + }); } const ret = completion.choices[0]; diff --git a/src/backend/src/modules/puterai/MistralAIService.js b/src/backend/src/modules/puterai/MistralAIService.js index b70e45da6..2f9caffb1 100644 --- a/src/backend/src/modules/puterai/MistralAIService.js +++ b/src/backend/src/modules/puterai/MistralAIService.js @@ -4,6 +4,7 @@ const { TypedValue } = require("../../services/drivers/meta/Runtime"); const { nou } = require("../../util/langutil"); const axios = require('axios'); +const { TeePromise } = require("../../util/promise"); class MistralAIService extends BaseService { static MODULES = { @@ -149,6 +150,8 @@ class MistralAIService extends BaseService { } if ( stream ) { + let usage_promise = new TeePromise(); + const stream = new PassThrough(); const retval = new TypedValue({ $: 'stream', @@ -164,6 +167,14 @@ class MistralAIService extends BaseService { // just because Mistral wants to be different chunk = chunk.data; + if ( chunk.usage ) { + usage_promise.resolve({ + input_tokens: chunk.usage.promptTokens, + output_tokens: chunk.usage.completionTokens, + }); + continue; + } + if ( chunk.choices.length < 1 ) continue; if ( chunk.choices[0].finish_reason ) { stream.end(); @@ -177,7 +188,12 @@ class MistralAIService extends BaseService { } stream.end(); })(); - return retval; + + return new TypedValue({ $: 'ai-chat-intermediate' }, { + stream: true, + response: retval, + usage_promise: usage_promise, + }); } const completion = await this.client.chat.complete({ diff --git a/src/backend/src/modules/puterai/OpenAICompletionService.js b/src/backend/src/modules/puterai/OpenAICompletionService.js index 8a81e2409..4735094b4 100644 --- a/src/backend/src/modules/puterai/OpenAICompletionService.js +++ b/src/backend/src/modules/puterai/OpenAICompletionService.js @@ -5,6 +5,7 @@ const { TypedValue } = require('../../services/drivers/meta/Runtime'); const { Context } = require('../../util/context'); const SmolUtil = require('../../util/smolutil'); const { nou } = require('../../util/langutil'); +const { TeePromise } = require('../../util/promise'); class OpenAICompletionService extends BaseService { static MODULES = { @@ -287,9 +288,14 @@ class OpenAICompletionService extends BaseService { model: model, max_tokens, stream, + ...(stream ? { + stream_options: { include_usage: true }, + } : {}), }); - + if ( stream ) { + let usage_promise = new TeePromise(); + const entire = []; const stream = new PassThrough(); const retval = new TypedValue({ @@ -300,11 +306,14 @@ class OpenAICompletionService extends BaseService { (async () => { for await ( const chunk of completion ) { entire.push(chunk); - if ( chunk.choices.length < 1 ) continue; - if ( chunk.choices[0].finish_reason ) { - stream.end(); - break; + if ( chunk.usage ) { + usage_promise.resolve({ + input_tokens: chunk.usage.prompt_tokens, + output_tokens: chunk.usage.completion_tokens, + }); + continue; } + if ( chunk.choices.length < 1 ) continue; if ( nou(chunk.choices[0].delta.content) ) continue; const str = JSON.stringify({ text: chunk.choices[0].delta.content @@ -313,6 +322,12 @@ class OpenAICompletionService extends BaseService { } stream.end(); })(); + + return new TypedValue({ $: 'ai-chat-intermediate' }, { + stream: true, + response: retval, + usage_promise: usage_promise, + }); return retval; } diff --git a/src/backend/src/modules/puterai/TogetherAIService.js b/src/backend/src/modules/puterai/TogetherAIService.js index 2920d1833..81774fe49 100644 --- a/src/backend/src/modules/puterai/TogetherAIService.js +++ b/src/backend/src/modules/puterai/TogetherAIService.js @@ -2,6 +2,7 @@ const { PassThrough } = require("stream"); const BaseService = require("../../services/BaseService"); const { TypedValue } = require("../../services/drivers/meta/Runtime"); const { nou } = require("../../util/langutil"); +const { TeePromise } = require("../../util/promise"); class TogetherAIService extends BaseService { static MODULES = { @@ -52,6 +53,8 @@ class TogetherAIService extends BaseService { }); if ( stream ) { + let usage_promise = new TeePromise(); + const stream = new PassThrough(); const retval = new TypedValue({ $: 'stream', @@ -60,6 +63,14 @@ class TogetherAIService extends BaseService { }, stream); (async () => { for await ( const chunk of completion ) { + // DRY: same as openai + if ( chunk.usage ) { + usage_promise.resolve({ + input_tokens: chunk.usage.prompt_tokens, + output_tokens: chunk.usage.completion_tokens, + }); + } + if ( chunk.choices.length < 1 ) continue; if ( chunk.choices[0].finish_reason ) { stream.end(); @@ -73,7 +84,12 @@ class TogetherAIService extends BaseService { } stream.end(); })(); - return retval; + + return new TypedValue({ $: 'ai-chat-intermediate' }, { + stream: true, + response: retval, + usage_promise: usage_promise, + }); } // return completion.choices[0];