超时事件提取:当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃,为了能够处理这些超时的部分匹配,select和flatSelectAPI调用允许制定超时处理程序。
FlinkCEP开发流程:
- 数据源转换:从数据源(Kafka、文件、Socket等)获取原始数据流,将数据转换为DataStream
- 模式定义与模式流创建:定义事件模式(Pattern),将DataStream与Pattern组合转换为PatternStream
- 模式流处理:使用Select/Process算子处理匹配事件
- 结果输出:输出到目标库
案例:恶意登录检测 找出5秒内,连续登录失败的账号。
测试数据:
new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)
完整Java代码包含:
- 创建执行环境,设置事件时间语义
- 模拟输入数据流
- 分配时间戳与水位线
- 按用户ID分组
- 定义CEP模式:连续两次失败且间隔不超过5秒
- 应用模式并处理匹配结果
运行结果:检测到用户2在5秒内连续两次登录失败,输出匹配事件。