
# Python + Flask 自动化处理 m3u8 视频流：从下载到合并，再到 Cloudflare R2 存储的一站式解决方案

每次下载 m3u8 视频后，面对一堆零散的 `.ts` 文件，你是否也感到头疼？手动合并不仅耗时耗力，还容易出错。本文将带你用 Python + Flask 构建一个自动化系统，实现从 m3u8 视频流下载、自动合并到直接上传 Cloudflare R2 存储的全流程解决方案。

## 1. 系统架构设计

我们的目标是一个完整的自动化处理流水线，主要包含三个核心模块：

- **m3u8 解析与下载模块**：负责解析 m3u8 索引文件并下载所有 `.ts` 片段
- **视频合并模块**：在临时目录中自动将 `.ts` 文件合并为单个 MP4（流复制，不重新编码）
- **Cloudflare R2 上传模块**：将最终视频文件直接上传到 R2 存储

技术栈选择：

- Python 3.8+
- Flask 2.0+（轻量级 Web 框架）
- requests（HTTP 请求）
- ffmpeg-python（视频处理）
- boto3（AWS SDK，兼容 R2 的 S3 API）

## 2. 环境准备与依赖安装

首先确保系统已安装 FFmpeg，这是视频处理的核心工具：

```bash
# Ubuntu/Debian
sudo apt-get install ffmpeg
# macOS
brew install ffmpeg
# Windows
choco install ffmpeg
```

然后安装 Python 依赖：

```bash
pip install flask requests ffmpeg-python boto3 python-dotenv
```

> 提示：建议使用虚拟环境隔离项目依赖，避免版本冲突。

## 3. 核心功能实现

### 3.1 m3u8 解析与下载

我们首先实现 m3u8 索引文件的解析和 `.ts` 片段的并行下载。合并时必须保持播放列表中的片段顺序，因此下载函数按播放列表顺序返回本地文件路径，而不是按下载完成的先后顺序。

> 注意：本示例只处理未加密、以 `.ts` 为分片的媒体播放列表（media playlist）。主播放列表（master playlist）、AES-128 加密（`#EXT-X-KEY`）或 fMP4 分片需要额外处理。

```python
import concurrent.futures
import os
from urllib.parse import urljoin, urlparse

import requests


def download_ts_segment(ts_url, output_dir, headers=None, timeout=30):
    """Download one TS segment into ``output_dir``.

    Returns the local file path on success, or ``None`` on failure (the error
    is printed), so one bad segment does not abort the other downloads.
    """
    # Name the file after the URL path only, so query strings such as
    # "?token=..." do not end up in the local file name.
    filename = os.path.basename(urlparse(ts_url).path)
    filepath = os.path.join(output_dir, filename)
    try:
        with requests.get(ts_url, headers=headers, stream=True, timeout=timeout) as response:
            # Without this check an HTTP error page would be saved as a .ts file.
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return filepath
    except (requests.RequestException, OSError) as e:
        print(f"下载失败 {ts_url}: {str(e)}")
        return None


def download_m3u8_playlist(m3u8_url, output_dir, max_workers=5, headers=None, timeout=30):
    """Download every TS segment referenced by a media playlist.

    Returns the local segment paths in playlist order, which is the order
    they must be concatenated in.

    Raises:
        requests.HTTPError: if the playlist itself cannot be fetched.
        RuntimeError: if the playlist lists no .ts segments, or if any segment
            failed (merging an incomplete set would yield a broken video).
    """
    response = requests.get(m3u8_url, headers=headers, timeout=timeout)
    response.raise_for_status()

    # Non-comment lines are segment URIs. Strip whitespace (handles "\r\n"
    # line endings) and test only the URL path so "seg.ts?token=..." is kept.
    ts_urls = []
    for line in response.text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and urlparse(line).path.endswith(".ts"):
            ts_urls.append(line)
    # urljoin resolves relative URIs against the playlist URL and leaves
    # absolute URIs unchanged.
    absolute_ts_urls = [urljoin(m3u8_url, ts) for ts in ts_urls]
    if not absolute_ts_urls:
        raise RuntimeError(f"No .ts segments found in playlist: {m3u8_url}")

    os.makedirs(output_dir, exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order, preserving segment order.
        results = list(executor.map(
            lambda url: download_ts_segment(url, output_dir, headers, timeout),
            absolute_ts_urls,
        ))

    failed = [url for url, path in zip(absolute_ts_urls, results) if path is None]
    if failed:
        raise RuntimeError(f"{len(failed)} segment(s) failed to download, e.g. {failed[0]}")
    for path in results:
        print(f"成功下载: {path}")
    return results
```

### 3.2 TS 文件合并

下载得到的 `.ts` 片段保存在磁盘上的临时目录中。合并时，我们使用 FFmpeg 的 concat 分离器（concat demuxer）按播放顺序读取片段，并以流复制（`-c copy`）方式直接封装为 MP4。这样既不重新编码，也不生成中间的合并 `.ts` 文件，只需要一个很小的文件列表。

函数名 `merge_ts_in_memory` 沿用了原来的命名，但实际上并不是纯内存处理。如果片段编码与 MP4 容器不兼容，可以去掉 `c="copy"` 改为重新编码，但速度会慢很多。

```python
import os
import tempfile

import ffmpeg


def merge_ts_in_memory(ts_file_paths, output_filename):
    """Concatenate TS segments into a single MP4 without re-encoding.

    Despite the historical name, segments are read from disk; only a small
    temporary list file is created (and always removed).

    :param ts_file_paths: TS file paths, in playback order
    :param output_filename: output file name; ".mp4" is appended if missing
    :return: path of the merged MP4 file
    :raises ValueError: if ``ts_file_paths`` is empty
    :raises ffmpeg.Error: if FFmpeg fails
    """
    if not ts_file_paths:
        raise ValueError("ts_file_paths must not be empty")

    output_path = output_filename if output_filename.endswith(".mp4") else f"{output_filename}.mp4"

    # The concat demuxer (format="concat") reads a text file with one
    # "file '<path>'" line per input. Relative entries would be resolved
    # against the list file's directory, hence abspath; safe=0 allows them.
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8") as list_file:
        for path in ts_file_paths:
            # A single quote inside a quoted entry is written as '\''.
            escaped = os.path.abspath(path).replace("'", "'\\''")
            list_file.write(f"file '{escaped}'\n")
        list_path = list_file.name
    try:
        (
            ffmpeg
            .input(list_path, format="concat", safe=0)
            # c="copy" remuxes without re-encoding; faststart moves the moov
            # atom to the front so playback can start before the full download.
            .output(output_path, c="copy", format="mp4", movflags="faststart")
            # Overwrite an existing output instead of blocking on FFmpeg's
            # interactive "Overwrite? [y/N]" prompt.
            .overwrite_output()
            .run(quiet=True)
        )
        return output_path
    finally:
        os.unlink(list_path)
```

### 3.3 Cloudflare R2 上传集成

配置 Cloudflare R2 存储并实现上传功能。R2 兼容 S3 API，因此可以直接使用 boto3：

```python
import os

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError


def init_r2_client():
    """Create an S3-compatible client for Cloudflare R2 from environment variables."""
    return boto3.client(
        "s3",
        endpoint_url=os.getenv("R2_ENDPOINT"),
        aws_access_key_id=os.getenv("R2_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("R2_SECRET_KEY"),
        region_name="auto",
    )


def upload_to_r2(file_path, bucket_name, object_name=None):
    """Upload a file to Cloudflare R2.

    :param object_name: key in the bucket; defaults to the file's base name
    :return: True on success, False if the upload failed (error is printed)
    """
    if object_name is None:
        object_name = os.path.basename(file_path)

    s3_client = init_r2_client()
    try:
        # upload_file returns None; failures surface as exceptions, and
        # S3UploadFailedError is what it raises when the transfer fails.
        s3_client.upload_file(file_path, bucket_name, object_name)
        return True
    except (ClientError, S3UploadFailedError) as e:
        print(f"上传失败: {e}")
        return False
```

## 4. Flask 应用集成

将上述功能集成到 Flask 应用中，提供 RESTful API。每个请求使用独立的临时目录，并在 `finally` 中统一清理，因此处理失败时也不会残留临时文件。出于安全考虑，服务端不再接受客户端指定的 `output_dir`（可能导致路径穿越或并发请求互相覆盖）。对象名附带随机后缀，避免不同视频（例如都叫 `index.m3u8`）在存储桶中互相覆盖。

```python
import os
import shutil
import tempfile
import uuid
from urllib.parse import urlparse

from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename

app = Flask(__name__)


@app.route("/api/process_m3u8", methods=["POST"])
def process_m3u8():
    # get_json(silent=True) returns None (instead of raising) on a missing or invalid JSON body.
    data = request.get_json(silent=True) or {}
    m3u8_url = data.get("m3u8_url")
    bucket_name = data.get("bucket_name", "my-video-bucket")

    if not m3u8_url:
        return jsonify({"error": "m3u8_url is required"}), 400
    # Basic sanity check; see validate_m3u8_url in section 6.1 for a stricter one.
    if urlparse(m3u8_url).scheme not in ("http", "https"):
        return jsonify({"error": "m3u8_url must be an http(s) URL"}), 400

    # A private working directory per request: no collisions between concurrent
    # requests, and clean-up is a single rmtree.
    work_dir = tempfile.mkdtemp(prefix="m3u8_")
    try:
        # 1. Download all TS segments (returned in playlist order).
        ts_files = download_m3u8_playlist(m3u8_url, work_dir)

        # 2. Merge the TS segments. Build the name from the URL path only (no
        #    query string) and add a random suffix so uploads never overwrite each other.
        stem = os.path.splitext(os.path.basename(urlparse(m3u8_url).path))[0]
        output_filename = f"{secure_filename(stem) or 'video'}-{uuid.uuid4().hex[:8]}.mp4"
        merged_path = merge_ts_in_memory(ts_files, os.path.join(work_dir, output_filename))

        # 3. Upload to R2 (the object name defaults to the file's base name).
        if not upload_to_r2(merged_path, bucket_name):
            return jsonify({"error": "Failed to upload to R2"}), 500

        return jsonify({
            "status": "success",
            "message": "Video processed and uploaded successfully",
            "object_name": output_filename,
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        # Remove segments and the merged file whether or not processing succeeded.
        shutil.rmtree(work_dir, ignore_errors=True)


if __name__ == "__main__":
    # debug=True enables the interactive Werkzeug debugger: local development only.
    app.run(debug=True)
```

## 5. 高级优化与错误处理

### 5.1 断点续传实现

对于大视频文件，可以实现断点续传功能：未完成的下载写入 `.temp` 文件，下次调用时通过 HTTP `Range` 请求从断点继续，下载完成后再重命名为最终文件。

```python
def download_with_resume(ts_url, output_path, headers=None, timeout=30):
    """Download ts_url to output_path, resuming from "<output_path>.temp" if present.

    Returns output_path on success, or None on failure (the error is printed).
    """
    temp_path = output_path + ".temp"
    if os.path.exists(output_path):
        return output_path

    # Copy so the caller's headers dict is never mutated with a Range header.
    headers = dict(headers or {})
    downloaded_size = os.path.getsize(temp_path) if os.path.exists(temp_path) else 0
    if downloaded_size:
        headers["Range"] = f"bytes={downloaded_size}-"

    try:
        with requests.get(ts_url, headers=headers, stream=True, timeout=timeout) as response:
            if response.status_code == 416 and downloaded_size:
                # Range Not Satisfiable: presumably the partial file already holds
                # the whole resource (e.g. a crash happened before the rename).
                os.replace(temp_path, output_path)
                return output_path
            if response.status_code == 206:  # Partial Content: append to the partial file
                mode = "ab"
            elif response.status_code == 200:  # Full content (Range ignored): start over
                mode = "wb"
            else:
                raise RuntimeError(f"Unexpected status code: {response.status_code}")

            with open(temp_path, mode) as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Reached only after the whole body was written, so the file is complete.
        os.replace(temp_path, output_path)
        return output_path
    except Exception as e:
        print(f"下载失败 {ts_url}: {str(e)}")
        return None
```

### 5.2 视频质量检查

合并后自动检查视频完整性，要求文件能被 ffprobe 解析，并且时长大于 0：

```python
def check_video_integrity(video_path):
    """Return True if ffprobe can read the file and reports a positive duration."""
    try:
        probe = ffmpeg.probe(video_path)
        duration = float(probe["format"]["duration"])
        return duration > 0
    except (ffmpeg.Error, OSError, KeyError, ValueError) as e:
        print(f"视频检查失败: {str(e)}")
        return False
```

### 5.3 性能优化技巧

**并行处理优化：**

- 使用线程池控制并发下载数量
- 对 TS 文件按大小排序，先下载大文件（文件大小通过并发的 HEAD 请求获取）
- 实现下载速度限制，避免被封禁（下面的示例未包含限速逻辑）

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def _content_length(url):
    """Return the Content-Length reported by a HEAD request, or 0 if unknown."""
    try:
        # requests.head does not follow redirects by default.
        head = requests.head(url, allow_redirects=True, timeout=10)
        return int(head.headers.get("content-length", 0))
    except (requests.RequestException, ValueError):
        return 0


def optimized_download(ts_urls, output_dir, max_workers=3):
    """Download segments largest-first.

    Returns a dict mapping each URL to its local path, or None if it failed.
    """
    os.makedirs(output_dir, exist_ok=True)
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Fetch sizes concurrently, then sort by size, largest first.
        ts_sizes = list(zip(ts_urls, executor.map(_content_length, ts_urls)))
        ts_sizes.sort(key=lambda x: x[1], reverse=True)

        futures = {executor.submit(download_ts_segment, url, output_dir): url for url, _ in ts_sizes}
        for future in as_completed(futures):
            url = futures[future]
            try:
                result = future.result()
            except Exception as e:
                print(f"下载失败 {url}: {str(e)}")
                result = None
            else:
                if result:
                    print(f"下载完成: {url}")
            results[url] = result
    return results
```

## 6. 安全与最佳实践

### 6.1 安全注意事项

- **输入验证**：对所有输入的 URL 进行严格验证
- **临时文件清理**：确保处理完成后（包括失败时）删除所有临时文件
- **错误隔离**：单个 TS 文件下载失败不应中断其他片段的下载；但合并前应确认所有片段均已就绪，避免生成残缺的视频

下面的格式校验不能防止 SSRF。如果服务对外开放，还应拒绝解析到内网或回环地址的主机。

```python
import re
from urllib.parse import urlparse


def validate_m3u8_url(url):
    """Validate that url is an http(s) URL pointing at a .m3u8 resource.

    Returns True, or raises ValueError describing the problem.
    """
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError("只支持HTTP/HTTPS协议")
    if not re.match(r"^https?://[^/]+/.+\.m3u8(?:\?.*)?$", url):
        raise ValueError("无效的m3u8 URL格式")
    return True
```

### 6.2 配置管理

使用环境变量管理敏感信息，并记得把 `.env` 加入 `.gitignore`：

```bash
# .env 文件示例
R2_ENDPOINT=https://your-account-id.r2.cloudflarestorage.com
R2_ACCESS_KEY=your-access-key
R2_SECRET_KEY=your-secret-key
MAX_WORKERS=5
TEMP_DIR=./temp_videos
```

在 Python 中加载配置（在创建 R2 客户端之前调用 `load_dotenv()`）。读取到的配置需要显式传入，例如 `download_m3u8_playlist(..., max_workers=MAX_WORKERS)`：

```python
import os

from dotenv import load_dotenv

load_dotenv()

MAX_WORKERS = int(os.getenv("MAX_WORKERS", 3))
TEMP_DIR = os.getenv("TEMP_DIR", "./temp_videos")
```

## 7. 部署与扩展

### 7.1 Docker 容器化部署

创建 Dockerfile 实现一键部署。注意以下两点：

- `requirements.txt` 中需要包含 `gunicorn`。
- 视频处理是同步完成的，耗时往往超过 gunicorn 默认的 30 秒 worker 超时，因此这里调大了 `--timeout`。更好的做法是参考 7.3 改为异步任务。

```dockerfile
FROM python:3.9-slim

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
        ffmpeg \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENV FLASK_APP=app.py
ENV FLASK_ENV=production

CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--timeout", "600", "app:app"]
```

### 7.2 性能监控

集成 Prometheus 监控指标。处理耗时使用 Histogram 而不是 Gauge，因为 Gauge 只保留最后一次的值，无法统计耗时分布或平均值。

注意：用 gunicorn 启动时不会执行 `if __name__ == "__main__":` 中的 `start_http_server(8000)`，需要在应用初始化代码中启动指标服务。多进程部署时还需使用 prometheus_client 的 multiprocess 模式。

```python
from prometheus_client import start_http_server, Counter, Histogram

# Metric definitions
VIDEOS_PROCESSED = Counter("videos_processed_total", "Total videos processed")
VIDEOS_FAILED = Counter("videos_failed_total", "Total videos that failed processing")
PROCESSING_TIME = Histogram("video_processing_seconds", "Time spent processing video")
# Increment where a TS segment download fails (e.g. download_ts_segment returned None).
FAILED_DOWNLOADS = Counter("ts_download_failures_total", "Total TS download failures")


# This handler replaces the one from section 4: registering the same
# endpoint twice on one app raises an error.
@app.route("/api/process_m3u8", methods=["POST"])
def process_m3u8():
    # Histogram.time() records the elapsed seconds when the block exits,
    # whether processing succeeded or failed.
    with PROCESSING_TIME.time():
        try:
            # ...original processing logic...
            VIDEOS_PROCESSED.inc()
            return jsonify({"status": "success"})
        except Exception as e:
            VIDEOS_FAILED.inc()
            return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
    start_http_server(8000)
    app.run(host="0.0.0.0", port=5000)
```

### 7.3 扩展思路

- **分布式处理**：使用 Celery 将任务分发到多个 worker 节点
- **进度跟踪**：通过 WebSocket 实时报告处理进度
- **格式转换扩展**：支持更多输出格式，如 WebM、AV1 编码
- **CDN 集成**：上传后自动刷新 CDN 缓存

```python
from celery import Celery

celery = Celery("tasks", broker="redis://localhost:6379/0")


@celery.task(bind=True)
def process_m3u8_task(self, m3u8_url, output_dir, bucket_name):
    # Implement the asynchronous processing logic here
    pass
```