
事件流中带时间窗口的事件合成1、问题概述2、核心设计3、代码实现3、测试1、问题概述引入时间窗口Time Window是分布式事件流处理如 Flink/Spark Streaming中非常经典的场景。加入时间窗口意味着子事件不仅要满足逻辑表达式而且所有参与合成的子事件其发生的时间差不能超过规定的范围例如 5 分钟内。一旦超出这个窗口之前收到的过期事件应当被“清空”或“失效”状态机需要重新等待新的事件流。要在我们现有的解析器和逻辑树架构上优雅地实现它最有效的方法是引入一个“带有生命周期的事件注册表Registry”并让叶子节点去这个注册表里查询事件是否有效。2、核心设计带时间戳的事件传入的不再仅仅是一个 eventId而是一个包含了 timestamp 的事件对象。全局事件时间中心EventRegistry统一维护当前窗口内活跃的事件及其发生时间。每次新事件进来时自动清理掉那些已经和当前最新事件时差超过窗口大小的“老事件”。主状态机逻辑当一个新事件导致窗口滑动、老事件失效时状态机能实时感知并让逻辑树重新评估。3、代码实现第一步定义事件与事件注册表带滑窗清理// 1. 定义事件对象包含 ID 和发生的时间戳publicclassTimeEvent{finalintid;finallongtimestamp;// 毫秒publicTimeEvent(intid,longtimestamp){this.idid;this.timestamptimestamp;}}// 2. 时间窗口注册表负责管理哪些事件在窗口内有效classTimeWindowRegistry{privatefinallongwindowSizeMs;// 窗口大小毫秒// 记录窗口内每个事件 ID 对应的最新发生时间privatefinalMapInteger,LongactiveEventsnewHashMap();publicTimeWindowRegistry(longwindowSizeMs){this.windowSizeMswindowSizeMs;}// 激活一个事件并根据当前最新时间滑窗清理过期事件publicvoidregisterEvent(TimeEventevent){longcurrentLatestTimeevent.timestamp;activeEvents.put(event.id,currentLatestTime);// 滑动窗口的核心移除所有与当前最新事件时间差超过 windowSizeMs 的旧事件activeEvents.entrySet().removeIf(entry-(currentLatestTime-entry.getValue())windowSizeMs);}// 检查某个事件当前是否在窗口内有效publicbooleanisEventActive(inteventId){returnactiveEvents.containsKey(eventId);}publicvoidclear(){activeEvents.clear();}}第二步改造叶子节点从静态标记改为动态查询以前的 EventLeafNode 是自己内部存一个 triggered true。现在它应该去 TimeWindowRegistry 里动态查询自己关心的事件当前是否还在窗口内。publicinterfaceLogicNode{booleanisSatisfied();// 当前节点是否已满足voidtrigger(inteventId);// 触发子事件voidreset();// 重置状态}// 改造后的叶子节点classTimeEventLeafNodeimplementsLogicNode{privatefinalinteventId;privatefinalTimeWindowRegistryregistry;// 注入注册表的引用publicTimeEventLeafNode(inteventId,TimeWindowRegistryregistry){this.eventIdeventId;this.registryregistry;}OverridepublicbooleanisSatisfied(){// 动态判断只有在时间窗口内活跃才算满足returnregistry.isEventActive(eventId);}Overridepublicvoidtrigger(intid){// 触发逻辑交由全局注册表通过 feedEvent 统一调度这里无需单独处理}Overridepublicvoidreset(){// 注册表清空即可}}// 2. 与节点And所有子节点必须全部满足classAndNodeimplementsLogicNode{privatefinalListLogicNodechildrennewArrayList();publicAndNode(LogicNode...nodes){children.addAll(Arrays.asList(nodes));}OverridepublicbooleanisSatisfied(){returnchildren.stream().allMatch(LogicNode::isSatisfied);}Overridepublicvoidtrigger(inteventId){children.forEach(child-child.trigger(eventId));}Overridepublicvoidreset(){children.forEach(LogicNode::reset);}}// 3. 或节点Or子节点满足其一即可classOrNodeimplementsLogicNode{privatefinalListLogicNodechildrennewArrayList();publicOrNode(LogicNode...nodes){children.addAll(Arrays.asList(nodes));}OverridepublicbooleanisSatisfied(){returnchildren.stream().anyMatch(LogicNode::isSatisfied);}Overridepublicvoidtrigger(inteventId){children.forEach(child-child.trigger(eventId));}Overridepublicvoidreset(){children.forEach(LogicNode::reset);}}第三步升级解析器支持注入注册表为了配合新的叶子节点解析器在生成树时要把 TimeWindowRegistry 传给每一个叶子节点。publicclassTimeRuleParser{publicstaticLogicNodeparse(Stringexpression,TimeWindowRegistryregistry){expressionexpression.replaceAll(\\s,);StackLogicNodenodesnewStack();StackCharacteroperatorsnewStack();inti0;while(iexpression.length()){charcexpression.charAt(i);if(Character.isDigit(c)){StringBuildersbnewStringBuilder();while(iexpression.length()Character.isDigit(expression.charAt(i))){sb.append(expression.charAt(i));i;}// 【核心改动】创建叶子节点时注入时间窗口注册表nodes.push(newTimeEventLeafNode(Integer.parseInt(sb.toString()),registry));continue;}elseif(c(){operators.push(c);}elseif(c)){while(!operators.isEmpty()operators.peek()!(){executeOperator(nodes,operators.pop());}operators.pop();}elseif(c||c|){while(!operators.isEmpty()precedence(operators.peek())precedence(c)){executeOperator(nodes,operators.pop());}operators.push(c);}i;}while(!operators.isEmpty()){executeOperator(nodes,operators.pop());}returnnodes.pop();}privatestaticintprecedence(charop){if(op)return2;if(op|)return1;return0;}privatestaticvoidexecuteOperator(StackLogicNodenodes,charop){LogicNoderightnodes.pop();LogicNodeleftnodes.pop();if(op)nodes.push(newAndNode(left,right));elseif(op|)nodes.push(newOrNode(left,right));}}第四步时间窗口状态机上下文publicclassTimeStateMachine{//主状态privateMainStatecurrentStateMainState.INIT;//抽象语法树根节点privatefinalLogicNoderootCondition;//时间窗口privatefinalTimeWindowRegistryregistry;publicTimeStateMachine(LogicNoderootCondition,TimeWindowRegistryregistry){this.rootConditionrootCondition;this.registryregistry;}// 接收带时间戳的事件publicsynchronizedvoidfeedEvent(TimeEventevent){if(currentStateMainState.COMPLETED)return;currentStateMainState.PROCESSING;// 1. 将新事件注册进去内部会自动触发滑窗淘汰超时事件registry.registerEvent(event);System.out.println(String.format(收到事件 [%d]发生时间: %dms,event.id,event.timestamp));// 2. 让逻辑树基于当前窗口内的有效事件重新进行整体评估if(rootCondition.isSatisfied()){this.currentStateMainState.COMPLETED;System.out.println( 【成功】在时间窗口内满足公式流转至 - currentState);}else{System.out.println(- 当前窗口内未满足公式继续等待...);}}publicMainStategetCurrentState(){returncurrentState;}}3、测试我们设置时间窗口为 5000毫秒5秒规则依然是 (1 2 (3 | 4)) 5。测试场景 A由于事件 1 超时导致合成失败publicstaticvoidmain(String[]args){StringruleStr(1 2 (3 | 4)) 5;// 1. 定义一个 5 秒的时间窗口注册表TimeWindowRegistryregistrynewTimeWindowRegistry(5000);LogicNoderootTimeRuleParser.parse(ruleStr,registry);TimeStateMachinestateMachinenewTimeStateMachine(root,registry);System.out.println(--- 模拟场景 A事件 1 超时被滑出窗口 ---);stateMachine.feedEvent(newTimeEvent(1,1000));// 1 秒时发生事件 1stateMachine.feedEvent(newTimeEvent(5,2000));// 2 秒时发生事件 5stateMachine.feedEvent(newTimeEvent(4,3000));// 3 秒时发生事件 4// 关键点等到第 7 秒才发生事件 2。// 此时最新时间是 7000窗口范围是 [2000, 7000]1000ms 发生的事件 1 已经被无情滑出stateMachine.feedEvent(newTimeEvent(2,7000));System.out.println(最终状态: stateMachine.getCurrentState());// 依然是 PROCESSING}--- 模拟场景 A事件 1 超时被滑出窗口 --- 收到事件 [1]发生时间: 1000ms - 当前窗口内未满足公式继续等待... 收到事件 [5]发生时间: 2000ms - 当前窗口内未满足公式继续等待... 收到事件 [4]发生时间: 3000ms - 当前窗口内未满足公式继续等待... 收到事件 [2]发生时间: 7000ms - 当前窗口内未满足公式继续等待... 最终状态: PROCESSING测试场景 B所有事件紧凑发生成功合成System.out.println(\n--- 模拟场景 B5秒窗口内紧凑发生合成成功 ---);// 重置状态机进行新测试TimeWindowRegistryregistryBnewTimeWindowRegistry(5000);TimeStateMachinestateMachineBnewTimeStateMachine(TimeRuleParser.parse(ruleStr,registryB),registryB);stateMachineB.feedEvent(newTimeEvent(1,1000));// 1sstateMachineB.feedEvent(newTimeEvent(5,2000));// 2sstateMachineB.feedEvent(newTimeEvent(4,3000));// 3sstateMachineB.feedEvent(newTimeEvent(2,4500));// 4.5s - 最新窗口 [0, 4500]事件 1 还在--- 模拟场景 B5秒窗口内紧凑发生合成成功 --- 收到事件 [1]发生时间: 1000ms - 当前窗口内未满足公式继续等待... 收到事件 [5]发生时间: 2000ms - 当前窗口内未满足公式继续等待... 收到事件 [4]发生时间: 3000ms - 当前窗口内未满足公式继续等待... 收到事件 [2]发生时间: 4500ms 【成功】在时间窗口内满足公式流转至 - COMPLETED