从零到自动化:手把手教你用Python脚本调用Redfish API管理服务器(附Postman转Python代码技巧)

发布时间:2026/6/8 2:31:12

从零到自动化:手把手教你用Python脚本调用Redfish API管理服务器(附Postman转Python代码技巧) 从零到自动化手把手教你用Python脚本调用Redfish API管理服务器附Postman转Python代码技巧当你已经熟悉了在Postman中测试Redfish接口的基本操作却苦于无法将这些零散的请求整合成自动化流程时这篇文章正是为你准备的。我们将从Postman的测试场景出发逐步构建一个完整的Python自动化框架让你能够轻松管理服务器生命周期中的各种操作。Redfish作为现代服务器管理的标准接口其RESTful设计让自动化变得可能。但要从手动测试转向生产级脚本我们需要解决认证管理、错误处理、并发控制等一系列问题。下面我将分享如何将Postman中的12个典型用例转化为可靠的Python实现。1. 环境准备与基础架构在开始编码前我们需要搭建好Python环境并理解Redfish的基本工作流程。不同于Postman的一次性测试自动化脚本需要考虑长期运行的稳定性。1.1 安装必要依赖pip install requests urllib3建议使用虚拟环境隔离项目依赖。对于生产环境还可以考虑添加retrying库用于错误重试pip install retrying1.2 构建基础请求类我们将创建一个RedfishClient类来封装所有公共操作避免重复代码。这个类需要处理会话管理认证令牌获取与刷新请求重试机制统一的错误处理响应解析import requests from urllib3.exceptions import InsecureRequestWarning # 禁用SSL警告仅用于测试环境 requests.packages.urllib3.disable_warnings(categoryInsecureRequestWarning) class RedfishClient: def __init__(self, base_url, username, password): self.base_url base_url.rstrip(/) self.session requests.Session() self.session.verify False # 生产环境应配置合法证书 self.auth_token None self._login(username, password) def _login(self, username, password): url f{self.base_url}/redfish/v1/SessionService/Sessions headers {Content-Type: application/json} payload {UserName: username, Password: password} response self.session.post(url, jsonpayload, headersheaders) response.raise_for_status() self.auth_token response.headers.get(X-Auth-Token) self.session.headers.update({X-Auth-Token: self.auth_token})注意生产环境务必配置正确的SSL证书验证禁用验证仅适用于测试环境2. 核心功能实现现在我们可以基于这个基础类实现各种服务器管理操作。让我们从最常用的几个功能开始。2.1 服务器电源管理电源控制是服务器管理中最基础也最重要的操作。Redfish提供了标准的Reset动作类型class RedfishClient: # ... 延续上面的类定义 def power_control(self, system_id1, actionOn): 控制服务器电源状态 :param system_id: 系统ID默认为1 :param action: 操作类型 [On, GracefulShutdown, ForceRestart] valid_actions [On, GracefulShutdown, ForceRestart] if action not in valid_actions: raise ValueError(fInvalid action. Must be one of {valid_actions}) url f{self.base_url}/redfish/v1/Systems/{system_id}/Actions/ComputerSystem.Reset payload {ResetType: action} response self.session.post(url, jsonpayload) response.raise_for_status() return response.json()使用示例client RedfishClient(https://192.168.1.100, admin, password) # 正常关机 client.power_control(actionGracefulShutdown) # 开机 client.power_control(actionOn)2.2 资产管理信息获取获取服务器硬件信息是监控和资产管理的基础。Redfish的Systems端点提供了丰富的资产数据def get_system_info(self, system_id1): 获取服务器详细信息 url f{self.base_url}/redfish/v1/Systems/{system_id} response self.session.get(url) response.raise_for_status() data response.json() return { model: data.get(Model), manufacturer: data.get(Manufacturer), serial_number: data.get(SerialNumber), power_state: data.get(PowerState), bios_version: data.get(BiosVersion), processor_summary: data.get(ProcessorSummary, {}).get(Count), memory_summary: data.get(MemorySummary, {}).get(TotalSystemMemoryGiB) }3. 用户账户管理自动化BMC用户管理是运维中的重要工作特别是当需要批量部署或定期更换凭证时。3.1 创建用户账户def create_user(self, user_id, username, password, roleAdministrator): 创建新的BMC用户 url f{self.base_url}/redfish/v1/AccountService/Accounts payload { Id: str(user_id), UserName: username, Password: password, RoleId: role } response self.session.post(url, jsonpayload) if response.status_code 400: error response.json().get(error, {}) if already exists in error.get(message, ): raise ValueError(fUser ID {user_id} already exists) response.raise_for_status() return response.json()3.2 安全更新用户信息Redfish要求使用ETag机制来防止并发修改冲突这与Postman中的If-Match头对应def update_user(self, user_id, new_usernameNone, new_passwordNone, roleNone): 更新用户信息需要先获取ETag # 先获取当前用户信息和ETag info_url f{self.base_url}/redfish/v1/AccountService/Accounts/{user_id} info_resp self.session.get(info_url) info_resp.raise_for_status() etag info_resp.headers.get(ETag) if not etag: raise RuntimeError(ETag not found in response headers) # 准备更新数据 payload {} if new_username: payload[UserName] new_username if new_password: payload[Password] new_password if role: payload[RoleId] role if not payload: raise ValueError(No update fields provided) # 执行更新 headers {If-Match: etag} update_url f{self.base_url}/redfish/v1/AccountService/Accounts/{user_id} response self.session.patch(update_url, jsonpayload, headersheaders) response.raise_for_status() return response.json()4. 网络配置与BIOS设置4.1 修改BMC网络配置def update_bmc_network(self, interface_id, ipv4_addressNone, ipv6_addressNone): 修改BMC管理网络配置 # 获取当前接口信息和ETag url f{self.base_url}/redfish/v1/Managers/1/EthernetInterfaces/{interface_id} info_resp self.session.get(url) info_resp.raise_for_status() etag info_resp.headers.get(ETag) if not etag: raise RuntimeError(ETag not found in response headers) # 准备更新数据 payload {} if ipv4_address: payload[IPv4Addresses] [{Address: ipv4_address}] if ipv6_address: payload[IPv6Addresses] [{Address: ipv6_address}] headers {If-Match: etag} response self.session.patch(url, jsonpayload, headersheaders) response.raise_for_status() return response.json()4.2 修改BIOS启动顺序def update_bios_boot_order(self, boot_order): 修改BIOS启动顺序 :param boot_order: 启动顺序列表如 [HardDiskDrive, DVDROMDrive, PXE] # 获取当前BIOS设置和ETag url f{self.base_url}/redfish/v1/Systems/1/Bios/Settings info_resp self.session.get(url) info_resp.raise_for_status() etag info_resp.headers.get(ETag) if not etag: raise RuntimeError(ETag not found in response headers) # 构建启动顺序属性 attributes {} for i, device in enumerate(boot_order): attributes[fBootTypeOrder{i}] device payload {Attributes: attributes} headers {If-Match: etag} response self.session.patch(url, jsonpayload, headersheaders) response.raise_for_status() return response.json()5. 从Postman到Python的转换技巧Postman提供了方便的Generate Code Snippet功能可以快速将请求转换为Python代码。但直接使用生成的代码往往不够健壮我们需要进行优化。5.1 利用Postman生成代码骨架在Postman中构造好请求并测试通过点击右侧的Code按钮选择Python - Requests语言复制生成的代码片段生成的代码示例import requests url https://{{deviceip}}/redfish/v1/SessionService/Sessions payload { UserName: 用户名, Password: 密码 } headers { Content-Type: application/json } response requests.request(POST, url, jsonpayload, headersheaders, verifyFalse) print(response.text)5.2 优化生成的代码直接生成的代码有几个问题需要改进硬编码的凭证信息缺乏错误处理没有会话复用忽略重要的响应头信息优化后的版本def login(base_url, username, password): 优化的登录函数 url f{base_url}/redfish/v1/SessionService/Sessions headers {Content-Type: application/json} payload {UserName: username, Password: password} try: response requests.post( url, jsonpayload, headersheaders, verifyFalse, # 生产环境应配置正确证书 timeout10 ) response.raise_for_status() auth_token response.headers.get(X-Auth-Token) if not auth_token: raise ValueError(X-Auth-Token not found in response headers) return auth_token except requests.exceptions.RequestException as e: raise RuntimeError(fLogin failed: {str(e)})6. 错误处理与健壮性设计生产环境的自动化脚本必须具备完善的错误处理机制。以下是几个关键点6.1 重试机制对于临时性网络问题或服务短暂不可用合理的重试可以提高成功率from retrying import retry class RedfishClient: retry(stop_max_attempt_number3, wait_fixed2000) def _make_request(self, method, path, **kwargs): 带重试的请求封装 url f{self.base_url}{path} try: response self.session.request(method, url, **kwargs) if response.status_code 500: raise requests.exceptions.RequestException( fServer error: {response.status_code} ) return response except requests.exceptions.ConnectionError: raise except requests.exceptions.RequestException as e: if self._should_retry(e): raise raise6.2 统一错误处理定义自定义异常类提供更清晰的错误信息class RedfishError(Exception): Redfish操作异常基类 pass class AuthenticationError(RedfishError): 认证失败异常 pass class ResourceNotFoundError(RedfishError): 资源不存在异常 pass class ConcurrentModificationError(RedfishError): 并发修改冲突异常 pass然后在各方法中抛出适当的异常def get_system_info(self, system_id1): try: response self._make_request(GET, f/redfish/v1/Systems/{system_id}) if response.status_code 404: raise ResourceNotFoundError(fSystem {system_id} not found) response.raise_for_status() return response.json() except requests.exceptions.HTTPError as e: raise RedfishError(fFailed to get system info: {str(e)})7. 进阶技巧与最佳实践7.1 并发操作处理当多个系统同时尝试修改同一资源时ETag机制可以防止数据竞争。以下是处理并发修改的完整流程def safe_update_resource(self, resource_path, update_func, max_retries3): 安全更新资源自动处理ETag冲突 :param resource_path: 资源路径如 /redfish/v1/Systems/1 :param update_func: 接收当前资源数据返回更新内容的函数 :param max_retries: 最大重试次数 for attempt in range(max_retries): # 获取当前资源状态 get_response self._make_request(GET, resource_path) if get_response.status_code 404: raise ResourceNotFoundError(fResource {resource_path} not found) get_response.raise_for_status() current_data get_response.json() etag get_response.headers.get(ETag) if not etag: raise RuntimeError(ETag not found in response headers) # 准备更新内容 update_data update_func(current_data) if not update_data: return current_data # 没有需要更新的内容 # 尝试更新 headers {If-Match: etag} try: patch_response self._make_request( PATCH, resource_path, jsonupdate_data, headersheaders ) patch_response.raise_for_status() return patch_response.json() except requests.exceptions.HTTPError as e: if e.response.status_code 412: # Precondition Failed if attempt max_retries - 1: continue # 重试 raise ConcurrentModificationError( fFailed to update after {max_retries} attempts due to concurrent modifications ) raise raise ConcurrentModificationError(Max retries reached for concurrent update)使用示例def update_boot_order(current): new_order [HardDiskDrive, DVDROMDrive, PXE] attributes {} for i, device in enumerate(new_order): attributes[fBootTypeOrder{i}] device return {Attributes: attributes} client.safe_update_resource(/redfish/v1/Systems/1/Bios/Settings, update_boot_order)7.2 批量操作与任务编排对于需要按顺序执行的多个操作可以创建任务链def perform_server_maintenance(self, system_id1): 执行服务器维护任务序列 operations [ {name: GracefulShutdown, func: self.power_control, args: {action: GracefulShutdown}}, {name: UpdateBIOS, func: self.update_bios_settings, args: {}}, {name: PowerOn, func: self.power_control, args: {action: On}} ] results [] for op in operations: try: result op[func](**op[args]) results.append({ operation: op[name], status: success, result: result }) except RedfishError as e: results.append({ operation: op[name], status: failed, error: str(e) }) break # 关键路径失败则中止 return results7.3 日志记录与审计完善的日志记录对于自动化运维至关重要import logging class RedfishClient: def __init__(self, base_url, username, password): self.logger logging.getLogger(RedfishClient) # ... 其他初始化代码 def _log_request(self, method, url, responseNone, errorNone): 记录请求日志 log_data { method: method, url: url, status: getattr(response, status_code, None) if response else None, error: str(error) if error else None } if error: self.logger.error(Request failed, extralog_data) else: self.logger.info(Request completed, extralog_data) def _make_request(self, method, path, **kwargs): 带日志记录的请求封装 url f{self.base_url}{path} try: response self.session.request(method, url, **kwargs) self._log_request(method, url, response) return response except Exception as e: self._log_request(method, url, errore) raise配置日志格式logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(redfish_operations.log), logging.StreamHandler() ] )

相关新闻