
1. 项目概述与核心价值最近在整理自己的安全运营工具箱发现威胁情报的获取和利用一直是个痛点。公开的威胁情报源TI Feed虽然多但要么更新不及时要么与自己实际面临的威胁画像不匹配。花钱买商业情报固然好但对于个人或小团队来说成本又太高。于是我就琢磨着能不能自己动手低成本地构建一个自动化、可持续的威胁情报源。这个项目的核心思路就是利用HFish蜜罐作为“传感器”来捕获原始攻击数据通过Python脚本进行自动化处理和提炼最后将结构化的威胁情报如恶意IP、攻击指纹发布到GitHub Pages上形成一个可公开或内部订阅的、持续更新的情报源。这不仅仅是一个技术拼装更是一种主动防御思维的实践。蜜罐捕获的是最真实、最“热乎”的攻击行为从中提取的IP、攻击路径、漏洞利用手法对于你所在的网络环境具有极高的参考价值。通过自动化流程你可以将零散的告警日志转化为标准化的威胁情报指标IOCs并对外提供服务。无论是用于丰富自己防火墙的阻断名单还是分享给社区都能显著提升对新兴威胁的感知速度和响应能力。接下来我就把搭建这个“自动化威胁情报生产线”的上半部分——从环境准备到数据采集与处理——的详细步骤和踩过的坑毫无保留地分享出来。2. 核心组件选型与架构设计2.1 为什么是HFishPythonGitHub Pages在开始动手之前我们需要理解每个组件在体系中的角色和选型理由。整个系统的设计目标是自动化、低成本、可持续、易维护。HFish我们的“前线哨所”选择HFish而非其他开源蜜罐如T-Pot、Cowrie主要基于以下几点实战考量部署与维护极简HFish提供一键部署脚本和清晰的Web管理界面从下载到上线可能只需要10分钟。这对于需要长期稳定运行的生产环境至关重要避免了复杂的依赖和环境配置问题。协议仿真丰富它内置了90多种蜜罐服务覆盖了从SSH、RDP、MySQL到各类OA、Web服务仿真。这意味着它能吸引更多样化的攻击流量捕获的攻击数据维度更广有利于生产更全面的情报。数据输出友好HFish支持将告警日志通过Webhook方式推送出来这为我们用Python脚本实时接收和处理数据提供了完美的接口。相比去解析复杂的本地日志文件Webhook是更优雅、解耦的集成方式。资源消耗极低官方称其节点端内存占用可低至50MB。我实际部署在1核1G的云服务器上运行数月依然稳定这对控制成本非常有利。Python自动化流程的“大脑”Python是这个项目的粘合剂和处理器。我们需要它来完成以下几项核心任务接收与解析编写一个Flask或FastAPI服务接收HFish通过Webhook推送过来的JSON格式攻击告警数据。数据清洗与富化从原始告警中提取关键字段如攻击源IP、攻击时间、利用的协议/服务、攻击载荷片段。此外还可以调用外部API如IP地理位置查询、威胁情报平台查询对攻击源IP进行富化增加情报的价值。情报格式化与生成将处理后的数据按照标准的威胁情报格式如STIX/TAXII或更简单的CSV、JSON列表进行组织并生成静态文件。调度与推送通过APScheduler等库设置定时任务定期将新生成的情报文件更新到GitHub仓库。选择Python是因为其生态中有大量成熟的网络、数据处理和任务调度库requests,pandas,flask,apscheduler代码编写快速社区支持强大非常适合构建这类自动化流水线。GitHub Pages情报的“发布平台”为什么选择GitHub Pages来托管最终的情报文件完全免费与稳定它提供免费的静态网站托管服务带宽和可靠性都有保障非常适合发布更新频率不高如每小时或每天更新一次的文本类情报数据。天然的版本控制情报的每一次更新都会在Git仓库中留下记录你可以清晰地回溯某个恶意IP是何时首次出现的这对于分析攻击活动的时间线非常有帮助。易于订阅与集成其他系统可以通过直接访问一个固定的URL如https://yourname.github.io/threat-intel-feed/latest_iocs.json来拉取最新的情报。很多安全设备或SIEM系统都支持通过HTTP URL定期获取阻断列表。访问控制灵活你可以选择仓库公开分享给社区或私有仅自己团队使用通过GitHub的访问令牌进行自动化更新安全又方便。整个架构的工作流如下图所示概念性描述攻击流量触发部署在公网或内网的HFish蜜罐 - HFish将攻击日志通过Webhook实时推送到我们的Python处理服务 - Python服务解析日志提取并富化IOCs生成标准格式文件 - Python服务通过Git API将新文件提交到指定的GitHub仓库 - GitHub Pages自动从仓库更新对外提供最新的情报文件。2.2 系统架构与数据流设计基于上述组件我们需要设计一个松耦合、易于扩展的系统架构。核心思想是事件驱动和模块化。数据流分阶段阐述第一阶段数据采集HFish管理端部署在一台具有公网IP的服务器上VPS即可。在HFish的Web管理后台配置“告警设置”添加一个“Webhook告警”。这里的URL就填写我们即将搭建的Python数据处理服务的接收端点例如http://你的处理服务器IP:5000/hfish/webhook。这样每当蜜罐捕获到一次攻击就会将一条包含攻击详情的JSON消息POST到这个地址。第二阶段数据处理服务这是Python核心脚本所在。我建议将其分为几个模块Webhook接收器一个简单的HTTP服务器使用Flask专门接收HFish的数据。它只做两件事验证请求来源可选通过Token和将数据放入一个消息队列如Redis的List或者直接使用Python的queue.Queue做内存队列。这样做是为了避免因为处理速度慢而阻塞HFish的推送。日志解析器从队列中取出原始JSON进行解析。HFish的Webhook数据格式包含agent_id节点ID、agent_name、type事件类型如攻击、登录、info详细信息等字段。我们需要重点关注info字段里面通常有src_ip攻击源IP、dst_port目标端口、honeypot蜜罐类型、data攻击载荷等黄金信息。情报提炼器这是赋予数据价值的关键步骤。解析器提取出src_ip后提炼器可以去重检查该IP是否在最近一段时间内如24小时已经报告过避免情报源被重复条目淹没。富化调用免费的IP情报API例如ip-api.com查询地理位置virustotal.com的API查询历史恶意评分注意免费API有速率限制。将富化后的信息如国家、城市、ISP、威胁评分附加到该IP记录上。格式化将这条记录转换为一个标准的字典或对象准备输出。第三阶段情报发布文件生成器定时例如每30分钟或在积累一定数量新IOC后将处理好的情报列表写入文件。格式可以选择JSON最通用结构清晰。例如一个包含IP列表及元数据的JSON对象。CSV某些防火墙或系统更容易导入。STIX 2.1 JSON如果你想做得非常专业可以生成符合STIX标准的情报包但这会复杂很多。 我通常生成两个文件latest_iocs.json只包含最近24小时的新增IOCs和full_iocs.json包含所有历史IOCs。GitHub同步器使用PyGithub或gitpython库将生成的新文件提交到GitHub仓库的特定分支通常是gh-pages或main。提交后GitHub Pages会自动部署你的情报URL内容就更新了。这个架构的优点是每个模块职责单一可以通过增加队列消费者来横向扩展处理能力未来如果想增加数据源如其他蜜罐的日志或输出格式如同时推送至SIEM也非常容易。3. 实战部署HFish蜜罐搭建与配置理论讲完我们进入实战环节。第一步是把我们的“哨所”HFish给立起来。3.1 服务器准备与基础环境你需要一台云服务器VPS建议选择国内外主流厂商的基础款如1核CPU、1GB内存、20GB SSD硬盘就完全足够。操作系统推荐Ubuntu 22.04 LTS或CentOS 7/8社区支持好问题容易排查。安全基线配置非常重要蜜罐本身是诱饵但部署蜜罐的服务器必须是坚固的堡垒。在安装任何东西之前请务必完成以下操作更新系统sudo apt update sudo apt upgrade -y(Ubuntu) 或sudo yum update -y(CentOS)。修改SSH端口编辑/etc/ssh/sshd_config找到#Port 22改为Port 你的新端口号例如 2222。然后重启SSH服务sudo systemctl restart sshd。务必确保新端口在防火墙中已放行并且使用密钥登录而非密码否则你可能会把自己锁在外面。配置防火墙使用ufw(Ubuntu) 或firewalld(CentOS) 严格限制入站流量。只开放以下端口你修改后的SSH端口如TCP 2222。HFish管理端Web界面端口默认是4433建议修改。HFish节点端需要暴露的蜜罐服务端口例如你想仿真一个SSH蜜罐就需要开放TCP 22端口给公网。这部分端口规划需要在部署节点时仔细考虑。 例如用ufwsudo ufw allow 2222/tcp,sudo ufw allow 4433/tcp,sudo ufw enable。创建非root用户避免一直使用root操作。sudo adduser hfishadmin然后将其加入sudo组。3.2 HFish管理端安装与初始化HFish提供了非常方便的安装脚本。我们以Linux系统为例# 切换到有sudo权限的用户如刚创建的hfishadmin su - hfishadmin # 下载安装脚本 wget https://hfish.io/install.sh # 给脚本执行权限并运行 chmod x install.sh sudo ./install.sh运行脚本后它会交互式地询问你几个问题安装类型选择管理端。绑定IP通常填写0.0.0.0以便任何网络接口都能访问Web界面。Web端口默认是4433。强烈建议修改为一个不常见的端口比如 54433以减少被批量扫描发现管理界面的风险。用户名/密码设置一个强密码用于登录Web管理后台。安装完成后脚本会提示你访问https://你的服务器IP:54433进行初始化。由于是自签名证书浏览器会提示不安全点击“高级”-“继续前往”即可。初始化注意事项首次登录会要求你修改初始密码请务必设置一个复杂且唯一的密码。在“系统设置”中建议开启“仅允许管理端IP访问API”功能并将你的办公网络IP或处理服务器的IP加入白名单增加安全性。备份你的登录凭证和安装目录。HFish的配置和数据默认在/opt/hfish目录下。3.3 蜜罐节点部署与服务配置管理端装好后它本身就是一个“本地节点”但为了更灵活我们通常会在管理端上添加新的节点甚至可以在其他服务器上部署独立节点。在管理端上添加节点登录HFish管理端Web界面。导航到“节点管理” - “新增节点”。填写节点信息如节点名称例如public-web-honeypot。关键步骤是“服务配置”。点击“添加服务”你会看到一个长长的列表包含了所有可仿真的服务。策略选择对于威胁情报收集我们的目标是“广撒网”。建议选择一些高交互频率的协议和服务例如SSH(22端口)永恒的热点爆破攻击不断。Redis(6379端口)因配置不当导致入侵的案例非常多攻击活跃。MySQL(3306端口)数据库爆破常见。HTTP/HTTPS(80/443端口)可以配置一个仿真的Web登录页面或API端点捕获扫描器和漏洞利用尝试。Tomcat AJP(8009端口)过去重大漏洞的利用依然存在。端口设置你可以使用服务默认端口也可以修改为其他端口。使用非常用端口有时能过滤掉一部分无目标的扫描噪音吸引到更有针对性的攻击者。例如将SSH服务开在22222端口。添加完服务后这个节点就创建好了。HFish会自动在本机启动这些蜜罐服务进程。分布式节点部署可选如果你有多个IP或想在内部网络部署可以在其他机器上安装“节点端”。在“节点管理”页面点击“安装节点”会生成一个针对该节点的安装命令包含连接管理端的令牌。到目标机器上执行该命令即可。这对于收集内网横移攻击情报特别有用。重要心得蜜罐服务不要开得太多太杂初期精选5-8个高价值服务即可。开太多会分散日志增加分析负担也可能因资源占用影响稳定性。重点在于服务的“仿真度”和日志的“质量”而非数量。3.4 告警配置打通与Python服务的桥梁这是连接HFish和后续自动化流程的关键一步。我们要让HFish在受到攻击时主动通知我们的Python处理服务。在HFish管理端进入“告警设置”。选择“Webhook告警”。点击“添加”配置以下参数Webhook名称例如Python Intel Processor。Webhook地址填写你即将运行的Python数据接收服务的URL。假设你的处理服务器IP是10.0.0.100服务端口是5000接收路径是/webhook那么地址就是http://10.0.0.100:5000/webhook。如果处理服务在公网务必使用HTTPS并配置好证书否则令牌可能泄露。密钥这是一个可选但强烈建议填写的字段。你可以生成一个随机的长字符串如YourSuperSecretToken123!。Python服务在收到请求时会校验HTTP头中是否携带正确的密钥以此验证请求是否来自合法的HFish。这能防止任何人向你的端点乱发数据。告警模板保持默认即可HFish会推送结构化的JSON数据。保存后可以点击“测试”按钮发送一条测试告警确保地址可达。至此你的HFish蜜罐已经部署完毕并配置好了数据输出的通道。它已经开始安静地等待“鱼儿”上钩并将所有攻击行为通过Webhook实时推送出来。接下来我们要构建接收和处理这些数据的“大脑”——Python自动化服务。4. Python数据处理服务开发上Webhook接收与解析我们的Python服务是情报生产线的核心。我将分模块详细讲解如何构建一个健壮、高效的处理服务。首先从接收端开始。4.1 项目环境搭建与依赖管理创建一个新的项目目录并使用虚拟环境来隔离依赖这是Python项目的最佳实践。mkdir auto-threat-intel cd auto-threat-intel python3 -m venv venv # 创建虚拟环境 source venv/bin/activate # 激活虚拟环境 (Linux/macOS) # 对于Windows: venv\Scripts\activate安装核心依赖库。我们使用pip并生成requirements.txt文件。pip install flask requests python-dotenv redis apscheduler pygithub pip freeze requirements.txtflask: 轻量级Web框架用于构建Webhook接收接口。requests: 用于调用外部API进行IP富化。python-dotenv: 管理环境变量避免将密钥硬编码在代码中。redis(可选): 如果处理量大使用Redis作为消息队列实现异步处理和解耦。对于小规模可以使用内存队列。apscheduler: 强大的定时任务库用于调度情报文件生成和Git推送任务。pygithub: 官方推荐的GitHub API库用于操作GitHub仓库。4.2 构建安全的Webhook接收端点创建一个app.py文件作为服务的主入口。我们先构建接收端点。# app.py import hmac import hashlib import json import logging from flask import Flask, request, jsonify, abort from queue import Queue import threading # 配置日志 logging.basicConfig(levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s) logger logging.getLogger(__name__) app Flask(__name__) # 从环境变量读取配置增强安全性 WEBHOOK_SECRET os.environ.get(HFISH_WEBHOOK_SECRET, YourDefaultSecretChangeMe) # 使用一个内存队列来缓冲消息 (生产环境建议用Redis) message_queue Queue(maxsize1000) def verify_signature(data, signature): 验证Webhook签名确保请求来自可信的HFish实例 if not WEBHOOK_SECRET: logger.warning(Webhook secret not set, skipping signature verification.) return True expected_signature hmac.new( WEBHOOK_SECRET.encode(utf-8), data, hashlib.sha256 ).hexdigest() return hmac.compare_digest(expected_signature, signature) app.route(/webhook, methods[POST]) def hfish_webhook(): 接收HFish Webhook的主端点 # 1. 获取签名和原始数据 received_signature request.headers.get(X-HFish-Signature, ) raw_data request.get_data() # 2. 验证签名 if not verify_signature(raw_data, received_signature): logger.warning(fInvalid webhook signature from {request.remote_addr}) abort(403, descriptionInvalid signature) # 3. 解析JSON数据 try: event_data request.get_json() if not event_data: raise ValueError(Empty JSON payload) except Exception as e: logger.error(fFailed to parse JSON: {e}) abort(400, descriptionInvalid JSON) # 4. 将事件放入处理队列 try: # 这里可以加入一些简单的过滤比如只处理‘攻击’类型事件 if event_data.get(type) in [攻击, 登录]: # HFish的事件类型可能是中文 message_queue.put(event_data) logger.info(fEvent queued. Type: {event_data.get(type)}, SrcIP: {event_data.get(info, {}).get(src_ip, N/A)}) else: logger.debug(fIgnored event type: {event_data.get(type)}) except Exception as e: logger.error(fFailed to queue event: {e}) abort(500, descriptionInternal server error) # 5. 立即返回成功响应避免HFish重试 return jsonify({status: success, message: Event received}), 200 if __name__ __main__: # 在生产环境中应使用Gunicorn或uWSGI来运行Flask应用 app.run(host0.0.0.0, port5000, debugFalse) # debug务必设为False关键点解析与避坑指南签名验证代码中的verify_signature函数模拟了HFish Webhook的签名机制。你需要确保HFish后台配置的“密钥”与代码中WEBHOOK_SECRET环境变量的值一致。这是防止恶意伪造攻击数据的第一道防线。异步处理Webhook端点接收到数据后只做最基本的验证和解析然后立刻放入队列 (message_queue)并立即返回HTTP 200响应。绝对不要在Webhook处理函数中进行耗时的操作如IP查询、文件写入、网络请求否则会导致HFish端因超时而重发消息甚至阻塞后续请求。错误处理对JSON解析失败、队列满等情况做了处理并返回恰当的HTTP状态码。良好的错误处理能让日志更清晰也便于排查问题。运行方式直接运行python app.py仅用于开发测试。生产环境务必使用WSGI服务器如Gunicorn。例如gunicorn -w 4 -b 0.0.0.0:5000 app:app。-w 4表示启动4个worker进程可以并发处理请求。4.3 设计高效的消息队列与消费者现在我们需要一个后台线程或进程持续地从队列中取出消息进行处理。我们在app.py中增加消费者逻辑。# 在app.py中继续添加 import time from datetime import datetime class ThreatIntelProcessor(threading.Thread): def __init__(self, queue): super().__init__(daemonTrue) # 设置为守护线程主程序退出时自动结束 self.queue queue self.running True self.ioc_buffer [] # 临时缓冲区积累一定数量的IOC再批量处理 def run(self): logger.info(Threat intelligence processor thread started.) while self.running: try: # 阻塞获取最多等待5秒 event self.queue.get(timeout5) self.process_event(event) self.queue.task_done() except Exception as e: # 主要是队列超时异常属于正常情况 pass # 每隔一段时间检查缓冲区是否需要持久化例如每30秒或满100条 if len(self.ioc_buffer) 100 or (self.ioc_buffer and time.time() - self.last_flush 30): self.flush_buffer_to_disk() def process_event(self, event): 处理单个HFish事件提取IOC try: info event.get(info, {}) src_ip info.get(src_ip) if not src_ip or src_ip 0.0.0.0: return # 忽略无效IP honeypot info.get(honeypot, unknown) dst_port info.get(dst_port) timestamp event.get(time, datetime.utcnow().isoformat()) attack_payload info.get(data, )[:500] # 截取部分载荷避免过大 # 构建基础IOC记录 ioc_record { source_ip: src_ip, timestamp: timestamp, honeypot_type: honeypot, target_port: dst_port, event_type: event.get(type), raw_payload_preview: attack_payload, geo_info: {}, # 留待富化 threat_score: 0 } # 将记录加入缓冲区 self.ioc_buffer.append(ioc_record) logger.debug(fProcessed IOC for IP: {src_ip}) except Exception as e: logger.error(fError processing event {event.get(id)}: {e}) def flush_buffer_to_disk(self): 将缓冲区中的IOC写入临时文件或数据库 if not self.ioc_buffer: return # 这里先简单打印后续替换为写入文件或数据库 logger.info(fFlushing {len(self.ioc_buffer)} IOCs to storage.) # 示例写入一个临时JSON文件 import json temp_filename fioc_buffer_{int(time.time())}.json with open(temp_filename, w) as f: json.dump(self.ioc_buffer, f, indent2) logger.info(fBuffer saved to {temp_filename}) # 清空缓冲区并更新时间戳 self.ioc_buffer.clear() self.last_flush time.time() def stop(self): self.running False self.flush_buffer_to_disk() # 停止前保存剩余数据 # 在Flask app启动前启动处理器线程 processor ThreatIntelProcessor(message_queue) processor.start()设计思路与优化建议线程与守护进程使用threading.Thread创建一个后台守护线程专门处理队列。daemonTrue确保当主程序退出时线程也会结束避免僵尸进程。批量处理ioc_buffer缓冲区用于积累IOC记录。频繁地写入文件或调用外部API是低效的。积累到一定数量如100条或经过一定时间如30秒再批量处理能显著提升性能也符合外部API的调用频率限制。分离关注点process_event方法只负责从原始事件中提取关键字段构建一个结构化的IOC记录。复杂的富化逻辑IP查询和持久化逻辑写入文件、推送到Git可以放在flush_buffer_to_disk方法中或者拆分成更细的模块。临时存储当前示例将缓冲区数据写入一个临时JSON文件。在实际生产中你可以考虑SQLite数据库轻量适合单机服务方便查询去重。Redis速度快支持列表或有序集合天然适合做队列和临时存储。直接写入一个待处理的目录由另一个定时任务来读取和处理。至此我们已经搭建了一个能够安全接收HFish告警、并初步解析和缓冲攻击数据的Python服务。这个服务运行后HFish捕获的每一次攻击都会转化为一条结构化的IOC记录等待下一步处理。在下一部分我们将深入讲解如何为这些IOC记录“增值”——通过IP富化来补充地理位置、ISP、威胁评分等信息并最终实现定时生成情报文件和自动同步到GitHub Pages的完整闭环。