超时事件提取:当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃,为了能够处理这些超时的部分匹配,select和flatSelectAPI调用允许制定超时处理程序。

FlinkCEP开发流程

  1. 数据源转换:从数据源(Kafka、文件、Socket等)获取原始数据流,将数据转换为DataStream
  2. 模式定义与模式流创建:定义事件模式(Pattern),将DataStream与Pattern组合转换为PatternStream
  3. 模式流处理:使用Select/Process算子处理匹配事件
  4. 结果输出:输出到目标库

案例:恶意登录检测 找出5秒内,连续登录失败的账号。

测试数据:

new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)

完整Java代码包含:

  • 创建执行环境,设置事件时间语义
  • 模拟输入数据流
  • 分配时间戳与水位线
  • 按用户ID分组
  • 定义CEP模式:连续两次失败且间隔不超过5秒
  • 应用模式并处理匹配结果

运行结果:检测到用户2在5秒内连续两次登录失败,输出匹配事件。