BGE Reranker-v2-m3 API开发指南:基于FastAPI构建高性能服务

发布时间:2026/6/21 23:46:09

BGE Reranker-v2-m3 API开发指南:基于FastAPI构建高性能服务 BGE Reranker-v2-m3 API开发指南基于FastAPI构建高性能服务1. 引言如果你正在构建搜索系统或推荐引擎可能会遇到这样的问题初步检索到的结果很多但相关性参差不齐用户体验不佳。BGE Reranker-v2-m3作为北京智源研究院开发的轻量级重排序模型能够精准计算查询与文档之间的相关性得分帮你优化检索系统的排序结果。本文将手把手教你如何使用FastAPI框架将BGE Reranker-v2-m3封装为生产级API服务。无论你是Python初学者还是有经验的开发者都能跟着教程快速搭建一个高性能的重排序服务。2. 环境准备与模型部署2.1 安装必要依赖首先创建并激活虚拟环境然后安装所需包python -m venv reranker_env source reranker_env/bin/activate # Linux/Mac # 或 reranker_env\Scripts\activate # Windows pip install fastapi uvicorn transformers torch pip install python-multipart pydantic-settings2.2 下载并加载模型创建模型加载脚本确保高效初始化from transformers import AutoModelForSequenceClassification, AutoTokenizer import torch class RerankerModel: def __init__(self, model_nameBAAI/bge-reranker-v2-m3): self.device torch.device(cuda if torch.cuda.is_available() else cpu) self.tokenizer AutoTokenizer.from_pretrained(model_name) self.model AutoModelForSequenceClassification.from_pretrained(model_name) self.model.to(self.device) self.model.eval() def predict(self, query, documents): 计算查询与文档的相关性得分 scores [] for doc in documents: inputs self.tokenizer.encode_plus( query, doc, max_length512, truncationTrue, return_tensorspt ) inputs {k: v.to(self.device) for k, v in inputs.items()} with torch.no_grad(): outputs self.model(**inputs) score outputs.logits.item() scores.append(score) return scores # 全局模型实例 reranker_model RerankerModel()3. FastAPI服务核心实现3.1 基础API接口设计创建主应用文件定义核心路由from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import List, Optional import asyncio import concurrent.futures app FastAPI(titleBGE Reranker API, version1.0.0) class RerankRequest(BaseModel): query: str documents: List[str] top_n: Optional[int] None class RerankResult(BaseModel): document: str score: float rank: int app.post(/rerank, response_modelList[RerankResult]) async def rerank_documents(request: RerankRequest): 对文档进行重排序 - query: 查询文本 - documents: 待排序文档列表 - top_n: 返回前N个结果可选 if not request.documents: raise HTTPException(status_code400, detail文档列表不能为空) # 使用线程池执行CPU密集型任务 loop asyncio.get_event_loop() with concurrent.futures.ThreadPoolExecutor() as pool: scores await loop.run_in_executor( pool, lambda: reranker_model.predict(request.query, request.documents) ) # 组合结果并排序 results [] for i, (doc, score) in enumerate(zip(request.documents, scores)): results.append({document: doc, score: score, index: i}) # 按分数降序排序 results.sort(keylambda x: x[score], reverseTrue) # 处理top_n参数 if request.top_n is not None: results results[:request.top_n] return [RerankResult( documentresult[document], scoreresult[score], ranki1 ) for i, result in enumerate(results)] app.get(/health) async def health_check(): 健康检查端点 return {status: healthy, model_loaded: True}3.2 并发处理优化为了提高吞吐量我们需要优化并发处理from fastapi import BackgroundTasks import time from functools import lru_cache # 添加批量处理端点 app.post(/batch_rerank) async def batch_rerank(requests: List[RerankRequest], background_tasks: BackgroundTasks): 批量重排序处理 results [] for request in requests: result await rerank_documents(request) results.append(result) return results # 添加缓存机制可选 lru_cache(maxsize1000) def cached_predict(query: str, document: str) - float: 带缓存的预测函数 return reranker_model.predict_single(query, document)4. 企业级功能实现4.1 监控与日志记录添加监控中间件和日志记录import logging from fastapi import Request import time # 配置日志 logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) app.middleware(http) async def log_requests(request: Request, call_next): 请求日志中间件 start_time time.time() response await call_next(request) process_time time.time() - start_time logger.info(f{request.method} {request.url} - 耗时: {process_time:.2f}s) return response # 添加性能监控端点 app.get(/metrics) async def get_metrics(): 获取服务性能指标 return { uptime: time.time() - app.start_time, request_count: app.state.request_count, avg_processing_time: app.state.total_processing_time / app.state.request_count }4.2 速率限制和认证添加基础的安全措施from fastapi import Depends from fastapi.security import APIKeyHeader from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded # 速率限制 limiter Limiter(key_funcget_remote_address) app.state.limiter limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # API密钥认证 api_key_header APIKeyHeader(nameX-API-Key) async def get_api_key(api_key: str Depends(api_key_header)): 简单的API密钥验证 if api_key ! your-secret-key: # 实际使用时应该从配置或数据库读取 raise HTTPException(status_code401, detail无效的API密钥) return api_key app.post(/secure/rerank) limiter.limit(10/minute) async def secure_rerank( request: RerankRequest, api_key: str Depends(get_api_key) ): 需要认证的重排序端点 return await rerank_documents(request)5. 部署与运行5.1 创建启动脚本创建启动文件main.pyimport uvicorn from app import app # 假设上面的代码保存在app.py中 if __name__ __main__: uvicorn.run( app, host0.0.0.0, port8000, workers4, # 根据CPU核心数调整 timeout_keep_alive60 )5.2 使用Docker容器化创建DockerfileFROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . EXPOSE 8000 CMD [uvicorn, main:app, --host, 0.0.0.0, --port, 8000, --workers, 4]5.3 测试API服务启动服务后使用以下代码测试import requests import json url http://localhost:8000/rerank headers {Content-Type: application/json} data { query: 如何预防感冒, documents: [ 感冒是一种常见的呼吸道疾病主要通过飞沫传播, 预防感冒的方法包括勤洗手、戴口罩、保持社交距离, 新冠病毒的预防措施包括接种疫苗和保持室内通风, 冬季是感冒高发季节需要注意保暖 ], top_n: 3 } response requests.post(url, headersheaders, datajson.dumps(data)) print(json.dumps(response.json(), indent2, ensure_asciiFalse))6. 总结通过本教程我们成功搭建了一个基于FastAPI的BGE Reranker-v2-m3生产级API服务。这个服务不仅提供了基本的重排序功能还包含了并发处理、监控日志、速率限制等企业级特性。实际使用中你可以根据具体需求进一步优化比如添加模型热更新、更复杂的缓存策略、或者集成到现有的微服务架构中。这个API服务可以轻松集成到搜索引擎、推荐系统或问答系统中显著提升结果的相关性和用户体验。记得在生产环境中适当调整配置参数比如worker数量、超时设置等以确保服务的稳定性和性能。如果你遇到性能瓶颈可以考虑使用模型量化、ONNX转换等技术进一步优化推理速度。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。

相关新闻