
DeOldify图像批量重试机制网络中断/超时自动续传与状态恢复1. 项目背景与需求在实际的图像上色项目中我们经常需要处理大量黑白照片。但在批量处理过程中难免会遇到网络波动、服务超时、系统负载等问题导致处理中断。传统的手动重试方式效率低下特别是处理成百上千张图片时。基于这个痛点我们为DeOldify图像上色服务设计了智能重试机制让批量处理更加稳定可靠。无论是因为网络问题还是服务暂时不可用系统都能自动恢复并继续处理无需人工干预。这个机制特别适合老照片修复项目的批量处理历史档案数字化的大规模上色个人相册的自动化修复商业项目的稳定生产环境2. 核心重试机制设计2.1 自动重试策略我们的重试机制采用指数退避算法在遇到失败时不会立即放弃而是按照智能策略进行多次尝试import time import requests from requests.exceptions import RequestException def smart_retry_request(url, files, max_retries5, initial_delay1): 智能重试请求函数 retries 0 delay initial_delay while retries max_retries: try: response requests.post(url, filesfiles, timeout30) if response.status_code 200: return response else: raise RequestException(fHTTP错误: {response.status_code}) except (RequestException, TimeoutError) as e: retries 1 if retries max_retries: raise e # 指数退避等待时间逐渐增加 wait_time delay * (2 ** (retries - 1)) print(f第{retries}次重试等待{wait_time}秒后重试...) time.sleep(wait_time) return None2.2 断点续传实现对于批量处理我们设计了状态保存机制确保即使程序中断也能从上次停止的地方继续import json import os class BatchProcessor: def __init__(self, checkpoint_filecheckpoint.json): self.checkpoint_file checkpoint_file self.processed_files self.load_checkpoint() def load_checkpoint(self): 加载处理进度 if os.path.exists(self.checkpoint_file): with open(self.checkpoint_file, r) as f: return set(json.load(f)) return set() def save_checkpoint(self): 保存处理进度 with open(self.checkpoint_file, w) as f: json.dump(list(self.processed_files), f) def process_batch(self, file_list): 批量处理文件支持断点续传 for file_path in file_list: if file_path in self.processed_files: print(f跳过已处理文件: {file_path}) continue try: # 处理文件 result self.process_file(file_path) if result: self.processed_files.add(file_path) self.save_checkpoint() # 每处理成功一个就保存进度 except Exception as e: print(f处理失败 {file_path}: {e}) # 继续处理下一个文件不中断整个批次 def process_file(self, file_path): 处理单个文件 # 实际的文件处理逻辑 pass3. 完整批量处理解决方案3.1 增强型批量处理函数基于重试机制我们提供完整的批量处理方案import requests import base64 import os import time import json from PIL import Image from io import BytesIO class DeOldifyBatchProcessor: def __init__(self, service_urlhttp://localhost:7860, max_retries3): self.service_url service_url self.max_retries max_retries self.checkpoint_file deoldify_checkpoint.json self.processed_files self.load_checkpoint() def load_checkpoint(self): 加载处理进度 if os.path.exists(self.checkpoint_file): try: with open(self.checkpoint_file, r) as f: return set(json.load(f)) except: return set() return set() def save_checkpoint(self): 保存处理进度 with open(self.checkpoint_file, w) as f: json.dump(list(self.processed_files), f) def colorize_with_retry(self, image_path, retry_count0): 带重试机制的上色函数 try: with open(image_path, rb) as f: files {image: f} # 设置超时时间 response requests.post( f{self.service_url}/colorize, filesfiles, timeout30 # 30秒超时 ) if response.status_code 200: result response.json() if result[success]: return result else: raise Exception(fAPI返回失败: {result}) else: raise Exception(fHTTP错误: {response.status_code}) except Exception as e: if retry_count self.max_retries: wait_time 2 ** retry_count # 指数退避 print(f第{retry_count1}次重试等待{wait_time}秒...) time.sleep(wait_time) return self.colorize_with_retry(image_path, retry_count 1) else: raise e def batch_process(self, input_folder, output_folder): 批量处理文件夹中的所有图片 # 创建输出文件夹 os.makedirs(output_folder, exist_okTrue) # 支持的图片格式 valid_extensions [.jpg, .jpeg, .png, .bmp, .tiff, .webp] # 获取所有图片文件 all_files [] for filename in os.listdir(input_folder): ext os.path.splitext(filename)[1].lower() if ext in valid_extensions: all_files.append(filename) # 处理每个文件 for filename in all_files: input_path os.path.join(input_folder, filename) # 检查是否已处理 if input_path in self.processed_files: print(f跳过已处理文件: {filename}) continue print(f正在处理: {filename}) try: # 带重试机制的处理 result self.colorize_with_retry(input_path) # 保存处理结果 if result and result[success]: img_data base64.b64decode(result[output_img_base64]) img Image.open(BytesIO(img_data)) # 生成输出文件名 name, ext os.path.splitext(filename) output_filename f{name}_colored.png output_path os.path.join(output_folder, output_filename) img.save(output_path) print(f ✓ 完成: {output_filename}) # 更新处理进度 self.processed_files.add(input_path) self.save_checkpoint() except Exception as e: print(f ✗ 处理失败: {e}) # 继续处理下一个文件不中断批量处理 print(批量处理完成)3.2 使用示例# 初始化处理器 processor DeOldifyBatchProcessor( service_urlhttp://localhost:7860, max_retries5 # 最多重试5次 ) # 开始批量处理 processor.batch_process( input_folder./old_photos, output_folder./colored_photos ) # 如果处理中断再次运行会从断点继续 print(继续处理剩余文件...) processor.batch_process( input_folder./old_photos, output_folder./colored_photos )4. 高级功能与优化4.1 并行处理加速对于大量图片我们可以使用多线程并行处理import concurrent.futures import threading class ParallelProcessor(DeOldifyBatchProcessor): def __init__(self, max_workers4, **kwargs): super().__init__(**kwargs) self.max_workers max_workers self.lock threading.Lock() def process_single_file(self, args): 处理单个文件用于并行处理 filename, input_folder, output_folder args input_path os.path.join(input_folder, filename) # 检查是否已处理 with self.lock: if input_path in self.processed_files: print(f跳过已处理文件: {filename}) return None try: result self.colorize_with_retry(input_path) if result and result[success]: img_data base64.b64decode(result[output_img_base64]) img Image.open(BytesIO(img_data)) name, ext os.path.splitext(filename) output_filename f{name}_colored.png output_path os.path.join(output_folder, output_filename) img.save(output_path) # 更新处理进度 with self.lock: self.processed_files.add(input_path) self.save_checkpoint() return output_path except Exception as e: print(f处理失败 {filename}: {e}) return None def parallel_batch_process(self, input_folder, output_folder): 并行批量处理 os.makedirs(output_folder, exist_okTrue) valid_extensions [.jpg, .jpeg, .png, .bmp, .tiff, .webp] all_files [ f for f in os.listdir(input_folder) if os.path.splitext(f)[1].lower() in valid_extensions ] # 准备参数 args_list [(f, input_folder, output_folder) for f in all_files] # 使用线程池并行处理 with concurrent.futures.ThreadPoolExecutor(max_workersself.max_workers) as executor: results list(executor.map(self.process_single_file, args_list)) successful sum(1 for r in results if r is not None) print(f处理完成成功: {successful}/{len(all_files)})4.2 状态监控与报告添加处理状态监控和详细报告生成class MonitoringProcessor(DeOldifyBatchProcessor): def __init__(self, **kwargs): super().__init__(**kwargs) self.stats { total_processed: 0, successful: 0, failed: 0, retry_count: 0, start_time: None, end_time: None } def batch_process(self, input_folder, output_folder): 带监控的批量处理 self.stats[start_time] time.time() # 调用父类的处理逻辑 super().batch_process(input_folder, output_folder) self.stats[end_time] time.time() self.generate_report() def colorize_with_retry(self, image_path, retry_count0): 带统计的重试处理 result super().colorize_with_retry(image_path, retry_count) # 更新统计信息 with threading.Lock(): self.stats[retry_count] retry_count self.stats[total_processed] 1 if result and result[success]: self.stats[successful] 1 else: self.stats[failed] 1 return result def generate_report(self): 生成处理报告 total_time self.stats[end_time] - self.stats[start_time] avg_time total_time / self.stats[total_processed] if self.stats[total_processed] 0 else 0 report f DeOldify 批量处理报告 开始时间: {time.ctime(self.stats[start_time])} 结束时间: {time.ctime(self.stats[end_time])} 总耗时: {total_time:.2f} 秒 处理统计: - 总共处理: {self.stats[total_processed]} 个文件 - 成功: {self.stats[successful]} - 失败: {self.stats[failed]} - 成功率: {(self.stats[successful]/self.stats[total_processed]*100):.1f}% 性能指标: - 平均处理时间: {avg_time:.2f} 秒/文件 - 总重试次数: {self.stats[retry_count]} print(report) # 保存报告到文件 with open(processing_report.txt, w) as f: f.write(report)5. 实际应用示例5.1 家庭老照片修复项目# 初始化处理器 processor MonitoringProcessor( service_urlhttp://localhost:7860, max_retries3, max_workers2 # 家庭用户建议2个线程避免过度负载 ) # 处理家庭老照片 processor.parallel_batch_process( input_folder/home/user/old_family_photos, output_folder/home/user/colored_family_photos ) # 查看处理报告 print(处理报告已保存到 processing_report.txt)5.2 历史档案批量处理# 对于重要历史档案使用更保守的设置 archival_processor MonitoringProcessor( service_urlhttp://localhost:7860, max_retries5, # 更多重试次数 max_workers1 # 单线程处理避免服务器过载 ) # 设置检查点备份 archival_processor.checkpoint_file archival_checkpoint.json # 处理历史档案 archival_processor.batch_process( input_folder/archive/historical_photos, output_folder/archive/colored_historical )6. 总结与最佳实践通过实现智能重试机制和状态恢复功能DeOldify图像上色服务的批量处理能力得到了显著提升。这个解决方案提供了自动故障恢复网络中断或服务超时时的自动重试断点续传处理中断后能从上次停止的地方继续并行处理大幅提升批量处理效率详细监控实时统计和报告生成灵活配置可根据不同场景调整参数最佳实践建议对于家庭用户使用2-3个并行线程3次重试对于商业项目根据服务器性能调整线程数建议5次重试对于重要档案使用单线程处理确保数据安全定期清理检查点文件避免积累过多历史数据这个重试机制不仅适用于DeOldify服务其设计思路也可以应用到其他类似的批量处理场景中为各种AI服务提供稳定可靠的处理保障。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。