1+ import {
2+ insertMessage as insertMessageIntoBigquery ,
3+ setupBigQuery ,
4+ } from '@codebuff/bigquery'
15import { env } from '@codebuff/internal/env'
26
3- export async function handleOpenrouterStream ( { body } : { body : any } ) {
7+ import { OpenRouterStreamChatCompletionChunkSchema } from './type/openrouter'
8+
9+ import type { OpenRouterStreamChatCompletionChunk } from './type/openrouter'
10+
11+ import { errorToObject } from '@/util/error'
12+ import { logger } from '@/util/logger'
13+
14+ type StreamState = { responseText : string }
15+
16+ export async function handleOpenRouterStream ( {
17+ body,
18+ userId,
19+ } : {
20+ body : any
21+ userId : string
22+ } ) {
423 // Ensure usage tracking is enabled
524 if ( body . usage === undefined ) {
625 body . usage = { }
@@ -30,7 +49,8 @@ export async function handleOpenrouterStream({ body }: { body: any }) {
3049 throw new Error ( 'Failed to get response reader' )
3150 }
3251
33- let heartbeatInterval : ReturnType < typeof setInterval >
52+ let heartbeatInterval : NodeJS . Timeout
53+ let state : StreamState = { responseText : '' }
3454
3555 // Create a ReadableStream that Next.js can handle
3656 const stream = new ReadableStream ( {
@@ -67,6 +87,8 @@ export async function handleOpenrouterStream({ body }: { body: any }) {
6787 const line = buffer . slice ( 0 , lineEnd + 1 )
6888 buffer = buffer . slice ( lineEnd + 1 )
6989
90+ state = await handleLine ( { userId, request : body , line, state } )
91+
7092 // Forward the line to the client
7193 controller . enqueue ( new TextEncoder ( ) . encode ( line ) )
7294
@@ -90,3 +112,109 @@ export async function handleOpenrouterStream({ body }: { body: any }) {
90112
91113 return stream
92114}
115+
116+ async function handleLine ( {
117+ userId,
118+ request,
119+ line,
120+ state,
121+ } : {
122+ userId : string
123+ request : unknown
124+ line : string
125+ state : StreamState
126+ } ) : Promise < StreamState > {
127+ if ( ! line . startsWith ( 'data: ' ) ) {
128+ return state
129+ }
130+
131+ const raw = line . slice ( 'data: ' . length )
132+ if ( raw === '[DONE]\n' ) {
133+ return state
134+ }
135+
136+ // Parse the string into an object
137+ let obj
138+ try {
139+ obj = JSON . parse ( raw )
140+ } catch ( error ) {
141+ logger . warn (
142+ `Received non-JSON OpenRouter response: ${ JSON . stringify ( errorToObject ( error ) , null , 2 ) } `
143+ )
144+ return state
145+ }
146+
147+ // Extract usage
148+ const parsed = OpenRouterStreamChatCompletionChunkSchema . safeParse ( obj )
149+ if ( ! parsed . success ) {
150+ logger . warn (
151+ `Unable to parse OpenRotuer response: ${ JSON . stringify ( errorToObject ( parsed . error ) , null , 2 ) } `
152+ )
153+ return state
154+ }
155+
156+ return await handleResponse ( { userId, request, data : parsed . data , state } )
157+ }
158+
159+ async function handleResponse ( {
160+ userId,
161+ request,
162+ data,
163+ state,
164+ } : {
165+ userId : string
166+ request : unknown
167+ data : OpenRouterStreamChatCompletionChunk
168+ state : StreamState
169+ } ) : Promise < StreamState > {
170+ state = await handleStreamChunk ( { data, state } )
171+
172+ if ( 'error' in data || ! data . usage ) {
173+ // Stream not finished
174+ return state
175+ }
176+ const usage = data . usage
177+
178+ // do not await this
179+ setupBigQuery ( ) . then ( ( ) =>
180+ insertMessageIntoBigquery ( {
181+ id : data . id ,
182+ user_id : userId ,
183+ finished_at : new Date ( ) ,
184+ created_at : new Date ( data . created * 1000 ) ,
185+ request,
186+ response : state . responseText ,
187+ output_tokens : usage . completion_tokens ,
188+ reasoning_tokens : usage . completion_tokens_details ?. reasoning_tokens ,
189+ cost : usage . cost ,
190+ upstream_inference_cost : usage . cost_details ?. upstream_inference_cost ,
191+ input_tokens : usage . prompt_tokens ,
192+ cache_read_input_tokens : usage . prompt_tokens_details ?. cached_tokens ,
193+ } )
194+ )
195+ const openRouterCost = usage . cost ?? 0
196+ const upstreamCost = usage . cost_details ?. upstream_inference_cost ?? 0
197+ const cost = openRouterCost + upstreamCost
198+ // asdf todo: charge user
199+ return state
200+ }
201+
202+ async function handleStreamChunk ( {
203+ data,
204+ state,
205+ } : {
206+ data : OpenRouterStreamChatCompletionChunk
207+ state : StreamState
208+ } ) : Promise < StreamState > {
209+ if ( 'error' in data ) {
210+ logger . warn ( { streamChunk : data } , 'Received error from OpenRouter' )
211+ return state
212+ }
213+
214+ if ( ! data . choices . length ) {
215+ logger . warn ( { streamChunk : data } , 'Received empty choices from OpenRouter' )
216+ }
217+ const choice = data . choices [ 0 ]
218+ state . responseText += choice . delta ?. content ?? ''
219+ return state
220+ }
0 commit comments