
告别低效排查用ZoomEye API Python脚本自动化你的资产发现与监控流程在网络安全和运维领域资产发现与监控是日常工作中最基础却最耗时的环节之一。传统的手动搜索方式不仅效率低下还容易遗漏关键信息。想象一下当你需要监控公司数百个IP段中的新开放端口或服务变更时手动操作几乎是一场噩梦。这正是自动化工具链的价值所在——将重复性劳动交给机器让工程师专注于真正需要人类智慧的决策环节。ZoomEye作为专业的网络空间测绘引擎其API接口为自动化流程提供了强大支持。结合Python的灵活性和丰富的生态库我们可以构建一套完整的资产监控系统实现从发现、分析到告警的全流程自动化。本文将带你从零开始开发一套可集成到现有运维体系中的自动化工具链。1. 环境准备与ZoomEye API基础1.1 获取API访问权限首先需要注册ZoomEye账号并获取API Key访问ZoomEye官网完成注册进入个人中心找到API选项卡生成专属API Key并妥善保存注意免费账号有API调用次数限制企业级需求建议考虑商业授权1.2 安装必要的Python库推荐使用Python 3.8环境安装以下核心依赖pip install requests pandas schedule python-dotenv各库的作用requests处理HTTP API请求pandas数据分析与报表生成schedule定时任务管理python-dotenv环境变量管理1.3 API基础请求示例下面是一个简单的API测试脚本验证环境配置是否正确import requests from dotenv import load_dotenv import os load_dotenv() API_KEY os.getenv(ZOOMEYE_API_KEY) headers { API-KEY: API_KEY } response requests.get( https://api.zoomeye.org/resources-info, headersheaders ) print(response.json())将API Key保存在项目根目录的.env文件中ZOOMEYE_API_KEYyour_api_key_here2. 构建自动化资产发现系统2.1 设计资产搜索策略根据不同的监控需求可以设计多种搜索策略搜索类型适用场景示例查询IP段监控企业内网资产发现cidr:192.168.1.0/24服务发现特定服务版本监控app:nginx port:80漏洞扫描CVE相关设备发现app:Apache Tomcat ver:9.0.02.2 实现基础搜索功能以下代码实现了基本的资产搜索功能def search_assets(query, page1): url https://api.zoomeye.org/host/search params { query: query, page: page } try: response requests.get(url, headersheaders, paramsparams) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(fAPI请求失败: {e}) return None2.3 结果解析与存储将API返回的JSON数据转换为结构化DataFrameimport pandas as pd def parse_results(data): records [] for item in data.get(matches, []): record { ip: item.get(ip), port: item.get(portinfo, {}).get(port), service: item.get(portinfo, {}).get(service), app: item.get(portinfo, {}).get(app), version: item.get(portinfo, {}).get(version), timestamp: item.get(timestamp) } records.append(record) return pd.DataFrame(records)3. 实现资产变更监控系统3.1 设计变更检测算法核心思路是通过定期扫描对比历史数据检测以下变更类型新增资产之前不存在的IP:Port组合服务变更同一IP端口上的服务类型或版本变化资产下线之前存在的服务不再响应3.2 实现差异检测功能def detect_changes(current_df, previous_df): # 合并两期数据 merged pd.merge( current_df, previous_df, on[ip, port], howouter, suffixes(_current, _previous) ) # 识别变更类型 changes [] for _, row in merged.iterrows(): if pd.isna(row[service_previous]): changes.append({ type: 新增, ip: row[ip], port: row[port], detail: f新增服务: {row[service_current]} }) elif pd.isna(row[service_current]): changes.append({ type: 下线, ip: row[ip], port: row[port], detail: f服务下线: {row[service_previous]} }) elif row[service_current] ! row[service_previous]: changes.append({ type: 变更, ip: row[ip], port: row[port], detail: f{row[service_previous]} → {row[service_current]} }) return pd.DataFrame(changes)3.3 定时任务集成使用schedule库实现定时扫描import schedule import time def monitoring_job(): print(f开始执行监控任务: {time.ctime()}) current_data search_assets(cidr:192.168.1.0/24) current_df parse_results(current_data) try: previous_df pd.read_csv(last_scan.csv) except FileNotFoundError: previous_df pd.DataFrame() if not previous_df.empty: changes detect_changes(current_df, previous_df) if not changes.empty: alert_on_changes(changes) current_df.to_csv(last_scan.csv, indexFalse) # 每天凌晨2点执行 schedule.every().day.at(02:00).do(monitoring_job) while True: schedule.run_pending() time.sleep(60)4. 告警与报表系统集成4.1 多通道告警实现根据企业环境选择适当的告警方式邮件告警适合非紧急通知即时通讯工具如企业微信、钉钉机器人SIEM系统集成通过Webhook对接安全事件管理系统示例邮件告警实现import smtplib from email.mime.text import MIMEText def send_alert_email(changes_df): msg MIMEText(changes_df.to_html(), html) msg[Subject] 资产变更告警 msg[From] monitorexample.com msg[To] security-teamexample.com with smtplib.SMTP(smtp.example.com) as server: server.send_message(msg)4.2 自动化报表生成使用pandas的样式功能创建专业报表def generate_report(changes_df): report changes_df.style\ .applymap(highlight_critical, subset[type])\ .set_properties(**{text-align: left})\ .set_table_styles([ {selector: th, props: [(background-color, #f5f5f5)]} ]) with open(report.html, w) as f: f.write(report.render()) def highlight_critical(val): color red if val 新增 else orange if val 变更 else gray return fcolor: {color}5. 高级应用与优化技巧5.1 性能优化策略当监控大量资产时需要考虑API调用效率并行请求使用多线程/协程并发处理结果缓存减少重复查询增量检查只检查可能发生变更的IP段5.2 与企业系统集成将监控系统融入现有DevOps工具链# Jenkins集成示例 def jenkins_callback(build_url): payload { text: f资产扫描完成查看报告: {build_url} } requests.post(JENKINS_WEBHOOK, jsonpayload) # Prometheus监控指标导出 from prometheus_client import start_http_server, Gauge assets_gauge Gauge(zoomeye_assets, Discovered assets count) def export_metrics(): data search_assets(cidr:192.168.1.0/24) assets_gauge.set(len(data.get(matches, [])))5.3 异常处理与日志健壮的生产级代码需要完善的错误处理import logging logging.basicConfig( filenamemonitor.log, levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s ) def safe_search(query): try: return search_assets(query) except Exception as e: logging.error(f搜索失败: {str(e)}) notify_admin(f搜索失败: {query}) return None在实际项目中这套系统帮助我们将资产发现时间从原来的数小时缩短到几分钟变更检测的实时性也从天级别提升到小时级别。一个特别有用的技巧是为不同类型的资产设置不同的扫描频率——对核心业务系统提高扫描频率而对相对静态的基础设施则降低频率以节省API配额。