GLM-4.7-Flash实战案例:用Python requests流式解析response.iter_lines()

发布时间:2026/5/17 14:42:17

GLM-4.7-Flash实战案例:用Python requests流式解析response.iter_lines() GLM-4.7-Flash实战案例用Python requests流式解析response.iter_lines()你是不是也遇到过这种情况调用一个大模型API问了一个稍微复杂点的问题然后就开始盯着屏幕发呆看着那个转圈圈的加载图标心里默默数着秒不知道它到底是在思考还是在摸鱼特别是当你想做一个实时对话应用或者需要处理长文本生成时这种“等待-响应”的模式简直让人抓狂。用户可能等了几十秒最后只看到一段完整的文字“啪”一下弹出来中间没有任何反馈体验感直接降到冰点。今天我就来分享一个实战技巧用Python的requests库配合GLM-4.7-Flash的流式API实现真正的实时响应。你问完问题模型一边思考一边回答文字像打字一样逐个蹦出来那种感觉用过就回不去了。1. 为什么需要流式响应在深入代码之前我们先搞清楚一个问题流式响应到底解决了什么痛点1.1 传统API调用的局限性传统的API调用是这样的import requests response requests.post(http://127.0.0.1:8000/v1/chat/completions, json{ model: GLM-4.7-Flash, messages: [{role: user, content: 请写一篇关于人工智能未来发展的文章不少于1000字}], stream: False # 注意这里 }) # 等待...等待...等待... # 可能10秒、20秒甚至更久 result response.json() print(result[choices][0][message][content])这种模式有几个明显的问题用户体验差用户看着空白屏幕干等不知道程序是卡了还是在工作内存压力大如果生成的内容很长需要等全部生成完才能返回服务器和客户端都要缓存完整响应无法中途停止一旦开始生成即使发现回答方向不对也无法中途取消响应时间不可控用户不知道要等多久可能失去耐心直接关闭页面1.2 流式响应的优势流式响应Streaming Response就像打开了一个水龙头数据一点一点地流出来实时反馈模型生成一个字客户端就收到一个字降低延迟第一个token几乎立即返回用户马上能看到响应内存友好不需要缓存完整响应边生成边处理可交互性可以中途停止生成或者根据已生成内容调整后续问题GLM-4.7-Flash的API原生支持流式输出我们只需要学会怎么“接住”这些流出来的数据就行了。2. 基础流式调用从简单开始我们先从一个最简单的例子开始看看流式API的基本用法。2.1 最基本的流式调用import requests import json def basic_streaming(): 最基本的流式调用示例 url http://127.0.0.1:8000/v1/chat/completions # 注意stream参数设置为True response requests.post( url, json{ model: /root/.cache/huggingface/ZhipuAI/GLM-4.7-Flash, messages: [{role: user, content: 用一句话介绍Python语言的优点}], temperature: 0.7, max_tokens: 100, stream: True # 关键参数 }, streamTrue # requests库的stream参数也要设置为True ) # 逐行读取流式响应 for line in response.iter_lines(): if line: # 每行数据格式data: {...} line_text line.decode(utf-8) # 跳过心跳包和结束标记 if line_text.startswith(data: ): data_str line_text[6:] # 去掉data: 前缀 # 如果是[DONE]表示流结束 if data_str [DONE]: print(\n[流式响应结束]) break try: data json.loads(data_str) # 提取生成的文本内容 if choices in data and len(data[choices]) 0: delta data[choices][0].get(delta, {}) content delta.get(content, ) if content: print(content, end, flushTrue) # 实时打印 except json.JSONDecodeError: print(f解析失败: {data_str}) print() # 最后换行 if __name__ __main__: basic_streaming()运行这个代码你会看到文字一个一个地打印出来就像有人在打字一样。这就是流式响应的魅力所在。2.2 理解响应格式流式API返回的数据格式比较特殊我们需要理解它的结构data: {id:chatcmpl-xxx,object:chat.completion.chunk,created:1234567890,model:GLM-4.7-Flash,choices:[{index:0,delta:{content:你},finish_reason:null}]} data: {id:chatcmpl-xxx,object:chat.completion.chunk,created:1234567890,model:GLM-4.7-Flash,choices:[{index:0,delta:{content:好},finish_reason:null}]} data: {id:chatcmpl-xxx,object:chat.completion.chunk,created:1234567890,model:GLM-4.7-Flash,choices:[{index:0,delta:{content:},finish_reason:null}]} data: [DONE]每个数据块chunk包含id: 本次对话的唯一IDchoices[0].delta.content: 本次生成的文本内容可能是一个字、一个词或一个标点choices[0].finish_reason: 结束原因当生成完成时不为null最后会有一个特殊的data: [DONE]标记表示流式响应结束。3. 实战进阶构建完整的流式对话系统现在我们来构建一个更实用的流式对话系统包含错误处理、超时控制、上下文管理等功能。3.1 带错误处理的流式客户端import requests import json import time from typing import Generator, Optional, Dict, Any class GLMStreamingClient: GLM-4.7-Flash流式API客户端 def __init__(self, base_url: str http://127.0.0.1:8000): 初始化客户端 Args: base_url: API服务器地址 self.base_url base_url.rstrip(/) self.api_url f{self.base_url}/v1/chat/completions self.session requests.Session() # 配置会话 self.session.headers.update({ Content-Type: application/json, Accept: text/event-stream # 重要告诉服务器我们需要流式响应 }) def stream_chat( self, messages: list, model: str /root/.cache/huggingface/ZhipuAI/GLM-4.7-Flash, temperature: float 0.7, max_tokens: int 2048, timeout: int 60 ) - Generator[str, None, None]: 流式对话生成器 Args: messages: 对话消息列表格式[{role: user, content: ...}] model: 模型名称 temperature: 温度参数控制随机性 max_tokens: 最大生成token数 timeout: 超时时间秒 Yields: 每次生成的文本片段 Raises: requests.exceptions.RequestException: 网络或API错误 ValueError: 参数错误或响应解析错误 payload { model: model, messages: messages, temperature: temperature, max_tokens: max_tokens, stream: True } try: # 发送请求设置streamTrue启用流式响应 response self.session.post( self.api_url, jsonpayload, streamTrue, timeouttimeout ) # 检查HTTP状态码 response.raise_for_status() # 检查Content-Type确保是流式响应 content_type response.headers.get(Content-Type, ) if text/event-stream not in content_type: print(f警告服务器返回的不是流式响应Content-Type: {content_type}) # 逐行处理流式响应 buffer # 用于处理跨行的数据 for line in response.iter_lines(decode_unicodeTrue, chunk_size1): if line: # 处理可能的行缓冲 buffer line # 检查是否是一个完整的数据行 if buffer.endswith(\n) or \n in buffer: lines buffer.split(\n) for single_line in lines: if single_line.strip(): yield from self._process_line(single_line.strip()) buffer else: # 继续累积 continue # 处理最后可能剩余的数据 if buffer: yield from self._process_line(buffer) except requests.exceptions.Timeout: raise Exception(f请求超时{timeout}秒) except requests.exceptions.RequestException as e: raise Exception(f请求失败: {str(e)}) def _process_line(self, line: str) - Generator[str, None, None]: 处理单行流式数据 # 跳过空行和注释 if not line or line.startswith(:): return # 解析SSE格式data: {...} if line.startswith(data: ): data_str line[6:] # 去掉data: 前缀 # 结束标记 if data_str [DONE]: return try: data json.loads(data_str) # 提取生成的文本 if choices in data and data[choices]: choice data[choices][0] delta choice.get(delta, {}) # 返回文本内容 if content in delta and delta[content]: yield delta[content] # 检查是否结束 finish_reason choice.get(finish_reason) if finish_reason: # 可以在这里处理结束逻辑 pass except json.JSONDecodeError as e: print(fJSON解析错误: {e}, 原始数据: {data_str}) except KeyError as e: print(f数据格式错误缺少键: {e}) def close(self): 关闭会话 self.session.close() # 使用示例 def example_conversation(): 完整的对话示例 client GLMStreamingClient() try: # 第一轮对话 print(用户: 你好请介绍一下你自己) print(AI: , end, flushTrue) messages [ {role: user, content: 你好请介绍一下你自己} ] full_response for chunk in client.stream_chat(messages, temperature0.8): print(chunk, end, flushTrue) full_response chunk print(\n *50) # 第二轮对话保持上下文 messages.append({role: assistant, content: full_response}) messages.append({role: user, content: 你刚才说你是AI助手那你能帮我做什么呢}) print(用户: 你刚才说你是AI助手那你能帮我做什么呢) print(AI: , end, flushTrue) full_response for chunk in client.stream_chat(messages): print(chunk, end, flushTrue) full_response chunk print() finally: client.close() if __name__ __main__: example_conversation()这个客户端类提供了完整的错误处理、超时控制和上下文管理可以直接用在生产环境中。3.2 添加速率限制和进度显示在实际应用中我们可能还需要控制生成速度或者显示生成进度import threading import queue class EnhancedGLMClient(GLMStreamingClient): 增强版客户端支持速率限制和进度显示 def stream_chat_with_progress( self, messages: list, max_tokens: int 2048, words_per_minute: Optional[int] None, show_progress: bool True ) - Generator[str, None, None]: 带进度显示的流式对话 Args: words_per_minute: 每分钟生成字数模拟打字速度 show_progress: 是否显示进度条 start_time time.time() token_count 0 # 计算延迟模拟打字效果 delay_per_char 0 if words_per_minute and words_per_minute 0: # 假设平均每个中文字符2个英文字符 chars_per_minute words_per_minute * 2 delay_per_char 60.0 / chars_per_minute for chunk in self.stream_chat(messages, max_tokensmax_tokens): # 应用速率限制 if delay_per_char 0: time.sleep(delay_per_char * len(chunk)) # 更新计数 token_count len(chunk) # 显示进度 if show_progress: elapsed time.time() - start_time speed token_count / elapsed if elapsed 0 else 0 progress min(100, (token_count / max_tokens) * 100) # 简单的进度显示 print(f\r生成进度: {progress:.1f}% | 速度: {speed:.1f}字/秒, end) yield chunk if show_progress: print() # 换行 # 使用示例 def example_with_progress(): 带进度显示的示例 client EnhancedGLMClient() messages [ {role: user, content: 写一篇关于机器学习在医疗领域应用的短文约300字} ] print(开始生成...) print(- * 50) full_response for chunk in client.stream_chat_with_progress( messages, max_tokens500, words_per_minute120, # 模拟120字/分钟的打字速度 show_progressTrue ): print(chunk, end, flushTrue) full_response chunk print(\n - * 50) print(f生成完成总字数: {len(full_response)}) client.close()4. 实际应用场景流式响应不仅仅是为了好看在实际应用中有很多重要用途。4.1 实时聊天应用from flask import Flask, Response, render_template_string import json app Flask(__name__) # 简单的HTML页面 CHAT_HTML !DOCTYPE html html head titleGLM-4.7-Flash 实时聊天/title style body { font-family: Arial; max-width: 800px; margin: 0 auto; padding: 20px; } #chat { border: 1px solid #ccc; height: 400px; overflow-y: scroll; padding: 10px; margin-bottom: 10px; } .user { color: blue; margin: 5px 0; } .ai { color: green; margin: 5px 0; } #input { width: 70%; padding: 5px; } button { padding: 5px 15px; } /style /head body h2GLM-4.7-Flash 实时聊天/h2 div idchat/div input idinput typetext placeholder输入消息... button onclicksendMessage()发送/button script const chatDiv document.getElementById(chat); const input document.getElementById(input); function addMessage(role, content) { const div document.createElement(div); div.className role; div.innerHTML strong${role}:/strong ${content}; chatDiv.appendChild(div); chatDiv.scrollTop chatDiv.scrollHeight; } async function sendMessage() { const message input.value.trim(); if (!message) return; input.value ; addMessage(user, message); // 创建AI消息容器 const aiMessageDiv document.createElement(div); aiMessageDiv.className ai; aiMessageDiv.innerHTML strongAI:/strong ; chatDiv.appendChild(aiMessageDiv); try { const response await fetch(/chat, { method: POST, headers: { Content-Type: application/json }, body: JSON.stringify({ message: message }) }); if (!response.ok) throw new Error(请求失败); const reader response.body.getReader(); const decoder new TextDecoder(); let aiText ; while (true) { const { done, value } await reader.read(); if (done) break; const chunk decoder.decode(value); const lines chunk.split(\n); for (const line of lines) { if (line.startsWith(data: )) { const dataStr line.slice(6); if (dataStr [DONE]) break; try { const data JSON.parse(dataStr); if (data.choices data.choices[0].delta.content) { aiText data.choices[0].delta.content; aiMessageDiv.innerHTML strongAI:/strong ${aiText}; chatDiv.scrollTop chatDiv.scrollHeight; } } catch (e) { console.error(解析错误:, e); } } } } } catch (error) { aiMessageDiv.innerHTML span stylecolor:red错误: ${error.message}/span; } } // 回车发送 input.addEventListener(keypress, (e) { if (e.key Enter) sendMessage(); }); /script /body /html app.route(/) def index(): return render_template_string(CHAT_HTML) app.route(/chat, methods[POST]) def chat(): 流式聊天API from GLMStreamingClient import GLMStreamingClient data request.json user_message data.get(message, ) client GLMStreamingClient() messages [{role: user, content: user_message}] def generate(): try: for chunk in client.stream_chat(messages): # 格式化为Server-Sent Events格式 data { choices: [{ delta: {content: chunk}, finish_reason: None }] } yield fdata: {json.dumps(data, ensure_asciiFalse)}\n\n # 发送结束标记 yield data: [DONE]\n\n finally: client.close() return Response(generate(), mimetypetext/event-stream) if __name__ __main__: app.run(debugTrue, port5000)这个简单的Web应用展示了如何将流式API集成到实际产品中用户可以看到实时的打字效果。4.2 长文本生成与处理对于长文本生成流式响应可以边生成边处理避免内存溢出def process_long_document(prompt: str, output_file: str): 生成长文档并实时保存 Args: prompt: 生成提示 output_file: 输出文件路径 client GLMStreamingClient() messages [ {role: user, content: f请生成以下主题的长文档{prompt}\n\n要求结构清晰内容详实不少于2000字。} ] print(f开始生成文档: {prompt}) print(- * 50) # 实时写入文件 with open(output_file, w, encodingutf-8) as f: total_chars 0 start_time time.time() for chunk in client.stream_chat(messages, max_tokens4000): # 实时写入 f.write(chunk) f.flush() # 确保立即写入磁盘 # 实时统计 total_chars len(chunk) elapsed time.time() - start_time # 进度显示 progress (total_chars / 2000) * 100 # 基于目标2000字 speed total_chars / elapsed if elapsed 0 else 0 print(f\r进度: {min(progress, 100):.1f}% | 已生成: {total_chars}字 | 速度: {speed:.1f}字/秒, end) # 可以在这里添加实时处理逻辑 # 比如关键词提取、情感分析、格式检查等 print(f\n生成完成文档已保存到: {output_file}) print(f总字数: {total_chars}耗时: {time.time() - start_time:.1f}秒) client.close() # 使用示例 process_long_document(人工智能在医疗诊断中的应用, 医疗AI报告.txt)4.3 批量处理与管道流式响应还可以用于构建数据处理管道import asyncio import aiohttp from concurrent.futures import ThreadPoolExecutor class AsyncGLMClient: 异步流式客户端支持并发处理 def __init__(self, base_url: str http://127.0.0.1:8000, max_workers: int 5): self.base_url base_url.rstrip(/) self.api_url f{self.base_url}/v1/chat/completions self.executor ThreadPoolExecutor(max_workersmax_workers) async def process_batch_streaming(self, prompts: list, callbackNone): 批量处理提示词使用流式响应 Args: prompts: 提示词列表 callback: 每个提示词处理完成后的回调函数 async with aiohttp.ClientSession() as session: tasks [] for i, prompt in enumerate(prompts): task self._process_single_streaming(session, prompt, i, callback) tasks.append(task) await asyncio.gather(*tasks) async def _process_single_streaming(self, session, prompt: str, index: int, callback): 处理单个提示词的流式响应 payload { model: /root/.cache/huggingface/ZhipuAI/GLM-4.7-Flash, messages: [{role: user, content: prompt}], temperature: 0.7, max_tokens: 1000, stream: True } print(f[任务{index}] 开始处理: {prompt[:50]}...) try: full_response async with session.post(self.api_url, jsonpayload) as response: response.raise_for_status() async for line in response.content: if line: line_text line.decode(utf-8).strip() if line_text.startswith(data: ): data_str line_text[6:] if data_str [DONE]: break try: data json.loads(data_str) if choices in data and data[choices]: delta data[choices][0].get(delta, {}) content delta.get(content, ) if content: full_response content # 可以在这里实时处理每个chunk # print(f[任务{index}] 收到: {content}) except: pass print(f[任务{index}] 处理完成生成{len(full_response)}字) if callback: await callback(index, prompt, full_response) except Exception as e: print(f[任务{index}] 处理失败: {str(e)}) # 使用示例 async def main(): prompts [ 写一首关于春天的诗, 解释什么是机器学习, 用Python写一个快速排序算法, 介绍深度学习的基本概念, 写一段产品推广文案 ] client AsyncGLMClient() async def process_callback(index, prompt, response): 处理完成后的回调 print(f\n[回调] 任务{index}完成:) print(f提示: {prompt[:30]}...) print(f响应: {response[:100]}...\n) await client.process_batch_streaming(prompts, process_callback) # 运行 asyncio.run(main())5. 性能优化与最佳实践在实际使用中我们还需要考虑一些性能优化和最佳实践。5.1 连接池与超时设置import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry class OptimizedGLMClient: 优化版的GLM客户端包含连接池和重试机制 def __init__(self, base_url: str http://127.0.0.1:8000): self.base_url base_url.rstrip(/) self.api_url f{self.base_url}/v1/chat/completions # 创建带连接池的Session self.session requests.Session() # 配置重试策略 retry_strategy Retry( total3, # 最大重试次数 backoff_factor1, # 重试间隔 status_forcelist[429, 500, 502, 503, 504], # 需要重试的状态码 allowed_methods[POST] # 只对POST方法重试 ) # 创建适配器 adapter HTTPAdapter( max_retriesretry_strategy, pool_connections10, # 连接池大小 pool_maxsize10 ) # 挂载适配器 self.session.mount(http://, adapter) self.session.mount(https://, adapter) # 设置超时 self.timeout (10, 60) # (连接超时, 读取超时) # 设置请求头 self.session.headers.update({ Content-Type: application/json, Accept: text/event-stream, User-Agent: GLM-Streaming-Client/1.0 }) def stream_with_timeout(self, messages: list, **kwargs): 带超时控制的流式请求 payload { model: /root/.cache/huggingface/ZhipuAI/GLM-4.7-Flash, messages: messages, stream: True, **kwargs } try: response self.session.post( self.api_url, jsonpayload, streamTrue, timeoutself.timeout ) # 设置流式读取超时 response.raw.read self._timeout_wrapper(response.raw.read) return response except requests.exceptions.Timeout: raise Exception(请求超时请检查网络连接或服务器状态) except requests.exceptions.RequestException as e: raise Exception(f请求失败: {str(e)}) def _timeout_wrapper(self, read_method): 包装read方法添加超时控制 def wrapper(*args, **kwargs): import socket # 设置socket超时 socket.setdefaulttimeout(5) # 5秒读取超时 try: return read_method(*args, **kwargs) except socket.timeout: raise requests.exceptions.Timeout(读取数据超时) finally: socket.setdefaulttimeout(None) return wrapper5.2 错误处理与重试def robust_streaming_request(client, messages, max_retries3): 健壮的流式请求包含错误重试 for attempt in range(max_retries): try: response client.stream_with_timeout(messages) # 检查响应状态 if response.status_code ! 200: if response.status_code 429: print(请求过于频繁等待后重试...) time.sleep(2 ** attempt) # 指数退避 continue elif response.status_code 500: print(f服务器错误({response.status_code})重试中...) time.sleep(1) continue else: response.raise_for_status() return response except requests.exceptions.Timeout: print(f超时 (尝试 {attempt 1}/{max_retries})) if attempt max_retries - 1: wait_time 2 ** attempt # 指数退避 print(f等待 {wait_time} 秒后重试...) time.sleep(wait_time) else: raise Exception(多次重试后仍然超时) except Exception as e: print(f请求失败: {str(e)} (尝试 {attempt 1}/{max_retries})) if attempt max_retries - 1: time.sleep(1) else: raise raise Exception(所有重试尝试都失败了)5.3 内存优化技巧对于长时间运行的流式请求内存管理很重要def memory_efficient_streaming(client, messages, chunk_size1024): 内存高效的流式处理 Args: chunk_size: 每次处理的数据块大小 response robust_streaming_request(client, messages) # 使用生成器逐步处理避免一次性加载所有数据 buffer b for chunk in response.iter_content(chunk_sizechunk_size): buffer chunk # 按行分割处理 while b\n in buffer: line, buffer buffer.split(b\n, 1) if line: yield from process_chunk(line.decode(utf-8)) # 处理剩余数据 if buffer: yield from process_chunk(buffer.decode(utf-8)) def process_chunk(chunk_data): 处理单个数据块 lines chunk_data.strip().split(\n) for line in lines: if line.startswith(data: ): data_str line[6:] if data_str [DONE]: return try: data json.loads(data_str) if choices in data and data[choices]: content data[choices][0].get(delta, {}).get(content, ) if content: yield content except json.JSONDecodeError: # 忽略解析错误继续处理下一个 pass6. 总结通过本文的实战案例我们深入探讨了如何使用Python的requests库配合GLM-4.7-Flash的流式API。从最基础的流式调用到完整的生产级客户端再到各种实际应用场景我希望你能掌握这项实用的技术。6.1 关键要点回顾流式响应的核心价值实时反馈、降低延迟、内存友好、可交互性强技术实现要点设置streamTrue参数使用response.iter_lines()逐行读取解析SSEServer-Sent Events格式数据正确处理[DONE]结束标记生产环境考虑完善的错误处理和重试机制连接池和超时控制内存优化和性能监控异步处理和并发控制6.2 实际应用建议在实际项目中应用流式API时我建议根据场景选择不是所有场景都需要流式响应简单的问答可以直接用普通API用户体验优先对于需要实时反馈的场景如聊天、长文本生成流式响应能显著提升体验监控和优化监控响应时间、错误率、内存使用等指标持续优化兼容性考虑确保前端能正确处理流式数据有降级方案6.3 进一步探索掌握了基础之后你还可以进一步探索结合WebSocket对于需要双向通信的场景可以结合WebSocket实现更实时的交互流式处理管道将流式响应集成到数据处理管道中实现边生成边处理自定义协议根据业务需求设计更高效的流式传输协议性能优化针对大规模并发场景进行深度优化流式API技术正在成为大模型应用的标配掌握这项技术能让你构建出更流畅、更智能的应用。希望本文的实战案例能帮助你在实际项目中更好地应用GLM-4.7-Flash的流式能力。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。

相关新闻