@@ -127,15 +127,14 @@ async def process_chat_message(self, request: ChatRequest) -> AsyncGenerator[ChatResponse, None]:
             yield ChatResponse(error="Agent components not properly initialized")
             return
 
-        # Get or create session
-        session_id = request.session_id or "default"
-        try:
-            # Create a span for this chat request
-            message_preview = request.message[:50] + "..." if len(request.message) > 50 else request.message
-            span_name = f"Zava Agent Chat Request: {message_preview}"
-
-            with tracer.start_as_current_span(span_name) as span:
+        # Create a span for this chat request
+        message_preview = request.message[:50] + "..." if len(request.message) > 50 else request.message
+        span_name = f"Zava Agent Chat Request: {message_preview}"
+
+        with tracer.start_as_current_span(span_name) as span:
+            try:
                 # Get or create thread for this session
+                session_id = request.session_id or "default"
                 session_thread = await self.get_or_create_thread(session_id)
 
                 web_handler = None
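The net effect of this hunk is that the OpenTelemetry span now opens before the `try`, so failures in thread creation or streaming happen while the span is current and can be attached to the trace. A minimal sketch of the pattern, assuming the module-level `tracer` comes from the stock OpenTelemetry API (the `preview` helper and the `record_exception` call are illustrative, not part of this commit):

```python
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def preview(message: str, limit: int = 50) -> str:
    """Truncate a message for a readable span name, as the code above does."""
    return message[:limit] + "..." if len(message) > limit else message

async def handle(message: str) -> None:
    # Open the span first, then guard the work inside it: any exception is
    # raised while the span is still current, so it can be recorded on the trace.
    with tracer.start_as_current_span(f"Zava Agent Chat Request: {preview(message)}") as span:
        try:
            ...  # get or create the session thread, run the agent, stream tokens
        except Exception as exc:
            span.record_exception(exc)  # standard OpenTelemetry Span API
            raise
```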
@@ -223,65 +222,67 @@ async def run_stream() -> None:
                 # Start the stream task
                 stream_task = asyncio.create_task(run_stream())
 
-            # Stream tokens as they arrive
-            tokens_processed = 0
-            try:
-                while True:
-                    try:
-                        # Monitor queue health
-                        queue_size = web_handler.get_queue_size()
-                        if queue_size > 100:  # Warn if queue gets too large
-                            logger.warning("⚠️ Token queue size is large: %d", queue_size)
-
-                        # Wait for next token with timeout
-                        item = await asyncio.wait_for(
-                            web_handler.token_queue.get(), timeout=config.response_timeout_seconds
-                        )
-                        if item is None:  # End of stream signal
+                # Stream tokens as they arrive
+                tokens_processed = 0
+                try:
+                    while True:
+                        try:
+                            # Monitor queue health
+                            queue_size = web_handler.get_queue_size()
+                            if queue_size > 100:  # Warn if queue gets too large
+                                logger.warning("⚠️ Token queue size is large: %d", queue_size)
+
+                            # Wait for next token with timeout
+                            item = await asyncio.wait_for(
+                                web_handler.token_queue.get(), timeout=config.response_timeout_seconds
+                            )
+                            if item is None:  # End of stream signal
+                                break
+
+                            tokens_processed += 1
+
+                            # Yield response based on type
+                            if isinstance(item, dict):
+                                if item.get("type") == "text":
+                                    yield ChatResponse(content=item["content"])
+                                elif item.get("type") == "file":
+                                    yield ChatResponse(file_info=item["file_info"])
+                                elif item.get("type") == "error":
+                                    yield ChatResponse(error=item["error"])
+                            else:
+                                # Backwards compatibility for plain text
+                                yield ChatResponse(content=str(item))
+
+                        except asyncio.TimeoutError:
+                            yield ChatResponse(
+                                error=f"Response timeout after {config.response_timeout_seconds} seconds"
+                            )
                             break
+                finally:
+                    # Ensure the stream task is properly cleaned up
+                    if stream_task and not stream_task.done():
+                        stream_task.cancel()
+                        with contextlib.suppress(asyncio.CancelledError):
+                            await stream_task
+
+                    # Clean up any remaining items in the queue to prevent memory leaks
+                    if web_handler:
+                        remaining_items = web_handler.get_queue_size()
+                        if remaining_items > 0:
+                            logger.info("🧹 Cleaning up %d remaining items in token queue", remaining_items)
+                        await web_handler.cleanup()
+
+                # Send completion signal
+                if usage:
+                    yield ChatResponse(
+                        content=f"</br></br>Token usage: Prompt: {usage.prompt_tokens}, Completion: {usage.completion_tokens}, Total: {usage.total_tokens}"
+                    )
+                if incomplete_details:
+                    yield ChatResponse(content=f"</br>{incomplete_details.reason}")
 
-                        tokens_processed += 1
-
-                        # Yield response based on type
-                        if isinstance(item, dict):
-                            if item.get("type") == "text":
-                                yield ChatResponse(content=item["content"])
-                            elif item.get("type") == "file":
-                                yield ChatResponse(file_info=item["file_info"])
-                            elif item.get("type") == "error":
-                                yield ChatResponse(error=item["error"])
-                        else:
-                            # Backwards compatibility for plain text
-                            yield ChatResponse(content=str(item))
-
-                    except asyncio.TimeoutError:
-                        yield ChatResponse(error=f"Response timeout after {config.response_timeout_seconds} seconds")
-                        break
-            finally:
-                # Ensure the stream task is properly cleaned up
-                if stream_task and not stream_task.done():
-                    stream_task.cancel()
-                    with contextlib.suppress(asyncio.CancelledError):
-                        await stream_task
-
-                # Clean up any remaining items in the queue to prevent memory leaks
-                if web_handler:
-                    remaining_items = web_handler.get_queue_size()
-                    if remaining_items > 0:
-                        logger.info("🧹 Cleaning up %d remaining items in token queue", remaining_items)
-                    await web_handler.cleanup()
-
-            # Send completion signal
-            if usage:
-                yield ChatResponse(
-                    content=f"</br></br>Token usage: Prompt: {usage.prompt_tokens}, Completion: {usage.completion_tokens}, Total: {usage.total_tokens}"
-                )
-            if incomplete_details:
-                yield ChatResponse(content=f"</br>{incomplete_details.reason}")
-
-            yield ChatResponse(done=True)
-            logger.info("✅ Processed %d tokens successfully", tokens_processed)
+                yield ChatResponse(done=True)
+                logger.info("✅ Processed %d tokens successfully", tokens_processed)
 
-        except Exception as e:
-            logger.error("❌ Processing chat message: %s", e)
-            yield ChatResponse(error=f"Streaming error: {e!s}")
+            except Exception as e:
+                logger.error("❌ Processing chat message: %s", e)
+                yield ChatResponse(error=f"Streaming error: {e!s}")