Big Data 131 - Flink CEP in Practice: Detecting ≥5 Transactions in 24 Hours and Orders Unpaid After 10 Minutes
Flink CEP Overview
Flink CEP (Complex Event Processing) is Apache Flink's library for detecting patterns of events in real-time streams. Developers describe a pattern, and Flink CEP finds the event sequences in the stream that match it. This is useful in fraud detection, network security, real-time monitoring, IoT, and similar scenarios.
Basic Concepts
The core of Flink CEP is defining event patterns to detect complex event sequences from streams.
CEP allows users to:
- Define event patterns: describe the combinations of events they are interested in, such as consecutive events or delayed events
- Match patterns: Flink CEP searches the stream for event sequences that match the defined patterns
- Process match results: once matching event sequences are found, define how to handle them
Basic Components
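- Pattern: Describes the event sequence to match in the event stream. It can be a single event or a combination of events. Common operations include next() (strict contiguity: the matching event must arrive immediately after the previous one) and followedBy() (relaxed contiguity: non-matching events in between are skipped).
- PatternStream: The stream produced by applying a pattern to an event stream with CEP.pattern(stream, pattern)
- Select/process function: Extracts the matched event sequences from the pattern stream, via PatternStream.select() or PatternStream.process()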
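The difference between next() and followedBy() can be illustrated with a toy model. The sketch below is plain Java, not Flink code: ContiguityDemo, strictMatch and relaxedMatch are hypothetical names, and the model ignores time windows and only checks whether a two-step pattern occurs at all.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model (not Flink code) of the contiguity behind next() and followedBy(). */
class ContiguityDemo {

    // Strict contiguity, as with next(): the second event must immediately
    // follow the first one.
    static boolean strictMatch(List<String> events, String first, String second) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                return true;
            }
        }
        return false;
    }

    // Relaxed contiguity, as with followedBy(): events that do not match are
    // skipped while waiting for the second event.
    static boolean relaxedMatch(List<String> events, String first, String second) {
        boolean seenFirst = false;
        for (String event : events) {
            if (!seenFirst) {
                seenFirst = event.equals(first);
            } else if (event.equals(second)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("create", "browse", "pay");
        System.out.println("next:       " + strictMatch(events, "create", "pay")); // false
        System.out.println("followedBy: " + relaxedMatch(events, "create", "pay")); // true
    }
}
```

The "browse" event breaks a strict create-then-pay sequence but is simply ignored under relaxed contiguity, which is why Case 3 uses followedBy().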
CEP Development Steps
- Define event stream: Create a DataStream representing the original event stream
- Define event pattern: Use Flink CEP API to define event pattern
- Apply the pattern to the stream: call CEP.pattern(stream, pattern) to obtain a PatternStream
- Extract matched events: use select() or process() on the PatternStream to extract the events matching the pattern
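The four steps can be mirrored in plain Java to show the flow from stream to pattern to matches. This is a hypothetical sketch (CepStepsSketch and findMatches are illustrative names, not Flink API): the stream is a List, the pattern is a list of predicates matched in order with non-matching events skipped, and each complete match is collected before the matcher restarts. Flink's NFA-based matcher is far more general (time windows, overlapping matches, fault-tolerant state).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Toy analogue of the CEP development steps; not the Flink API. */
class CepStepsSketch {

    // Steps 3 and 4: walk the stream, advance through the pattern when an event
    // satisfies the next predicate (other events are skipped), and collect
    // every complete match.
    static <T> List<List<T>> findMatches(List<T> stream, List<Predicate<T>> pattern) {
        List<List<T>> matches = new ArrayList<>();
        if (pattern.isEmpty()) {
            return matches;
        }
        List<T> current = new ArrayList<>();
        for (T event : stream) {
            if (pattern.get(current.size()).test(event)) {
                current.add(event);
                if (current.size() == pattern.size()) {
                    matches.add(new ArrayList<>(current));
                    current.clear();
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Step 1: the event stream (a fixed list here)
        List<String> stream = Arrays.asList("login", "create", "browse", "pay", "logout");
        // Step 2: the pattern, "create" followed by "pay"
        List<Predicate<String>> pattern = Arrays.asList(e -> e.equals("create"), e -> e.equals("pay"));
        // Steps 3 and 4: apply the pattern and extract the matches
        System.out.println(findMatches(stream, pattern)); // [[create, pay]]
    }
}
```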
Use Cases
- Fraud detection: Identify consecutively occurring abnormal behaviors
- Network monitoring: Detect specific network attack patterns
- IoT: Analyze sensor data, detect device anomalies
- User behavior analysis: Analyze user behavior sequences
Case 2: Detect Active Trading Users
Business Requirement
Find users with at least 5 valid transactions within 24 hours.
Implementation Steps
- Create the data source
- Assign timestamps and watermarks (event time)
- keyBy username
- Match valid transactions (price > 0) at least 5 times: timesOrMore(5)
- Within 24 hours: within(Time.hours(24))
- Apply the pattern to the keyed stream
- Extract the successfully matched data
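Before wiring this into Flink, the rule itself can be checked offline. The sketch below is a plain-Java batch version (ActiveUserCheck, Txn and activeUsers are hypothetical names) of the same condition the CEP pattern uses: keep transactions with price > 0, group them by user, and flag a user if some 5 of them span less than 24 hours. It assumes the whole input fits in memory and treats "within 24 hours" as strictly less than 24 hours between the first and fifth transaction; it does not reproduce CEP's streaming behavior or the individual matches CEP emits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Batch, in-memory check of the "at least 5 valid transactions within 24 hours" rule. */
class ActiveUserCheck {
    static final long WINDOW_MS = 24L * 60 * 60 * 1000;
    static final int MIN_COUNT = 5;

    /** Hypothetical transaction record: username, price, event time in milliseconds. */
    static final class Txn {
        final String user;
        final double price;
        final long ts;

        Txn(String user, double price, long ts) {
            this.user = user;
            this.price = price;
            this.ts = ts;
        }
    }

    static Set<String> activeUsers(List<Txn> txns) {
        // Keep only valid transactions (price > 0), grouped by user.
        Map<String, List<Long>> validTimes = new HashMap<>();
        for (Txn t : txns) {
            if (t.price > 0) {
                validTimes.computeIfAbsent(t.user, k -> new ArrayList<>()).add(t.ts);
            }
        }
        Set<String> active = new TreeSet<>();
        for (Map.Entry<String, List<Long>> entry : validTimes.entrySet()) {
            List<Long> times = entry.getValue();
            Collections.sort(times);
            // If any 5 valid transactions fit in the window, 5 consecutive ones do too.
            for (int i = 0; i + MIN_COUNT - 1 < times.size(); i++) {
                if (times.get(i + MIN_COUNT - 1) - times.get(i) < WINDOW_MS) {
                    active.add(entry.getKey());
                    break;
                }
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // The sample data used by the Flink job in this case
        List<Txn> txns = Arrays.asList(
                new Txn("100XX", 0.0, 1597905234000L),
                new Txn("100XX", 100.0, 1597905235000L),
                new Txn("100XX", 200.0, 1597905236000L),
                new Txn("100XX", 300.0, 1597905237000L),
                new Txn("100XX", 400.0, 1597905238000L),
                new Txn("100XX", 500.0, 1597905239000L),
                new Txn("101XX", 0.0, 1597905240000L),
                new Txn("101XX", 100.0, 1597905241000L));
        System.out.println(activeUsers(txns)); // [100XX]
    }
}
```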
Code Implementation
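package icu.wzk;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class FlinkCepActiveUser {
    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Deprecated since Flink 1.12, where event time is already the default
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 2. Construct sample data (username, price, eventTimeMillis)
        DataStreamSource<CepActiveUserBean> data = env.fromElements(
                new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
                new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
                new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
                new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
                new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
                new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
                new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
                new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
        );

        // 3. Event time extraction + custom watermark (500 ms out-of-orderness)
        SingleOutputStreamOperator<CepActiveUserBean> watermark = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepActiveUserBean>() {
                    @Override
                    public WatermarkGenerator<CepActiveUserBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepActiveUserBean>() {
                            final long maxOutOfOrderness = 500L;
                            // Start low so the first event raises the maximum (Long.MAX_VALUE
                            // would never be replaced by Math.max); adding maxOutOfOrderness
                            // prevents underflow in onPeriodicEmit
                            long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

                            @Override
                            public void onEvent(CepActiveUserBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(event.getTimestamp(), maxTimestamp);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );

        // 4. Partition by username
        KeyedStream<CepActiveUserBean, String> keyed = watermark
                .keyBy(new KeySelector<CepActiveUserBean, String>() {
                    @Override
                    public String getKey(CepActiveUserBean value) {
                        return value.getUsername();
                    }
                });

        // 5. Define CEP pattern: at least 5 valid (price > 0) transactions within 24 hours
        Pattern<CepActiveUserBean, CepActiveUserBean> pattern = Pattern
                .<CepActiveUserBean>begin("start")
                .where(new SimpleCondition<CepActiveUserBean>() {
                    @Override
                    public boolean filter(CepActiveUserBean value) {
                        return value.getPrice() > 0;
                    }
                })
                .timesOrMore(5)
                .within(Time.hours(24));

        // 6. Construct PatternStream and process matching results
        PatternStream<CepActiveUserBean> patternStream = CEP.pattern(keyed, pattern);
        SingleOutputStreamOperator<CepActiveUserBean> process = patternStream
                .process(new PatternProcessFunction<CepActiveUserBean, CepActiveUserBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepActiveUserBean>> map,
                                             Context context,
                                             Collector<CepActiveUserBean> collector) {
                        // Print the whole match; nothing is emitted to the collector,
                        // so the sink below receives no records
                        System.out.println("map: " + map);
                    }
                });

        // 7. Sink
        process.print();
        env.execute("FlinkCepActiveUser");
    }
}

/** Simple Event Bean */
class CepActiveUserBean {
    private String username;
    private Double price;
    private Long timestamp;

    public CepActiveUserBean(String username, Double price, Long timestamp) {
        this.username = username;
        this.price = price;
        this.timestamp = timestamp;
    }

    public String getUsername() { return username; }
    public Double getPrice() { return price; }
    public Long getTimestamp() { return timestamp; }

    // setters omitted

    @Override
    public String toString() {
        return "CepActiveUserBean{username='" + username + "', price=" + price
                + ", timestamp=" + timestamp + "}";
    }
}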
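The custom WatermarkGenerator above implements a bounded-out-of-orderness watermark: track the largest timestamp seen and emit it minus the allowed out-of-orderness (500 ms). The plain-Java model below (WatermarkModel is a hypothetical name, no Flink dependency) isolates that arithmetic. It also shows why the initial maximum matters: starting from Long.MAX_VALUE, Math.max never changes the value, so the generator would emit a near-maximal watermark regardless of the data; starting from Long.MIN_VALUE + maxOutOfOrderness lets the first event raise it without underflowing on subtraction. Flink also ships a built-in equivalent, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(500)), which avoids writing the generator by hand.

```java
/**
 * Plain-Java model of a bounded-out-of-orderness watermark generator
 * (no Flink dependency).
 */
class WatermarkModel {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    WatermarkModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Lowest starting value that cannot underflow in currentWatermark()
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Corresponds to WatermarkGenerator.onEvent: remember the largest timestamp.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Corresponds to the value emitted in onPeriodicEmit.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        WatermarkModel wm = new WatermarkModel(500L);
        wm.onEvent(1597905234000L);
        wm.onEvent(1597905236000L);
        wm.onEvent(1597905235000L); // out-of-order event: the watermark does not move back
        System.out.println(wm.currentWatermark()); // 1597905235500

        // With Long.MAX_VALUE as the starting maximum, no event can change it.
        System.out.println(Math.max(Long.MAX_VALUE, 1597905236000L) == Long.MAX_VALUE); // true
    }
}
```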
Running Result
map: {start=[CepActiveUserBean{username='100XX', price=100.0, timestamp=1597905235000}...
Case 3: Detect Orders Unpaid After a Timeout
Business Requirement
Find orders that have not been paid within 10 minutes of being placed.
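Implementation Steps
- Create the data source
- Assign timestamps and watermarks
- keyBy userId
- Define the pattern: a create event followed by a pay event within 10 minutes
- Apply the pattern to the keyed stream
- Extract matched (paid) orders, and route timed-out partial matches (unpaid orders) to a side output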
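The timeout rule can be checked the same way with a batch sketch. UnpaidOrderCheck, Op and unpaid are hypothetical names; the sketch assumes one order per user (as in the sample data), processes events in timestamp order, and treats an order as paid only if a pay event from the same user arrives less than 10 minutes after the create event. Unlike Flink CEP, which reports a timeout once event time (the watermark) moves past the 10-minute window, this version simply inspects the complete input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Batch, in-memory check of the "not paid within 10 minutes" rule. */
class UnpaidOrderCheck {
    static final long TIMEOUT_MS = 10L * 60 * 1000;

    /** Hypothetical event record: userId, operation ("create" or "pay"), event time in ms. */
    static final class Op {
        final long userId;
        final String operation;
        final long ts;

        Op(long userId, String operation, long ts) {
            this.userId = userId;
            this.operation = operation;
            this.ts = ts;
        }
    }

    static Set<Long> unpaid(List<Op> ops) {
        List<Op> sorted = new ArrayList<>(ops);
        sorted.sort(Comparator.comparingLong(op -> op.ts));
        // userId -> creation time of an order that has not been paid (in time) yet
        Map<Long, Long> pending = new HashMap<>();
        for (Op op : sorted) {
            if ("create".equals(op.operation)) {
                pending.putIfAbsent(op.userId, op.ts);
            } else if ("pay".equals(op.operation)) {
                Long createdAt = pending.get(op.userId);
                if (createdAt != null && op.ts - createdAt < TIMEOUT_MS) {
                    pending.remove(op.userId); // paid in time
                }
            }
        }
        return new TreeSet<>(pending.keySet());
    }

    public static void main(String[] args) {
        List<Op> ops = Arrays.asList(
                new Op(1L, "create", 1597905234000L),
                new Op(1L, "pay", 1597905235000L),
                new Op(2L, "create", 1597905236000L),
                new Op(2L, "pay", 1597905237000L),
                new Op(3L, "create", 1597905239000L));
        System.out.println(unpaid(ops)); // [3]
    }
}
```

A payment that arrives after the window still leaves the order in the unpaid set, mirroring how a CEP partial match that has already timed out cannot be completed later.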
Code Implementation
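package icu.wzk;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class FlinkCepTimeOutPay {
    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Deprecated since Flink 1.12, where event time is already the default
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 2. Construct sample data: userId, operation, eventTimeMillis
        DataStreamSource<TimeOutPayBean> data = env.fromElements(
                new TimeOutPayBean(1L, "create", 1597905234000L),
                new TimeOutPayBean(1L, "pay", 1597905235000L),
                new TimeOutPayBean(2L, "create", 1597905236000L),
                new TimeOutPayBean(2L, "pay", 1597905237000L),
                new TimeOutPayBean(3L, "create", 1597905239000L) // Unpaid
        );

        // 3. Event time extraction + custom watermark (500 ms out-of-orderness)
        DataStream<TimeOutPayBean> watermark = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<TimeOutPayBean>() {
                    @Override
                    public WatermarkGenerator<TimeOutPayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<TimeOutPayBean>() {
                            final long maxOutOfOrderness = 500L;
                            // Start low so the first event raises the maximum (Long.MAX_VALUE
                            // would never be replaced by Math.max); adding maxOutOfOrderness
                            // prevents underflow in onPeriodicEmit
                            long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

                            @Override
                            public void onEvent(TimeOutPayBean e, long ts, WatermarkOutput out) {
                                maxTimestamp = Math.max(maxTimestamp, e.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput out) {
                                out.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );

        // 4. Partition by userId (the sample has one order per user; real data
        //    would typically be keyed by an order ID)
        KeyedStream<TimeOutPayBean, Long> keyedStream = watermark.keyBy(new KeySelector<TimeOutPayBean, Long>() {
            @Override
            public Long getKey(TimeOutPayBean value) { return value.getUserId(); }
        });

        // 5. Define CEP pattern: "create" followed by "pay" within 10 minutes
        // Explicit type argument: Java 8 does not allow <> with anonymous classes
        OutputTag<TimeOutPayBean> orderTimeoutOutput = new OutputTag<TimeOutPayBean>("orderTimeout") {};
        Pattern<TimeOutPayBean, TimeOutPayBean> pattern = Pattern
                .<TimeOutPayBean>begin("begin")
                .where(new IterativeCondition<TimeOutPayBean>() {
                    @Override
                    public boolean filter(TimeOutPayBean e, Context<TimeOutPayBean> ctx) {
                        return "create".equals(e.getOperation());
                    }
                })
                .followedBy("pay")
                .where(new IterativeCondition<TimeOutPayBean>() {
                    @Override
                    public boolean filter(TimeOutPayBean e, Context<TimeOutPayBean> ctx) {
                        return "pay".equals(e.getOperation());
                    }
                })
                .within(Time.seconds(600)); // 10 minutes

        // 6. Match selection: timed-out partial matches go to the side output
        PatternStream<TimeOutPayBean> patternStream = CEP.pattern(keyedStream, pattern);
        SingleOutputStreamOperator<TimeOutPayBean> result = patternStream.select(
                orderTimeoutOutput,
                new PatternTimeoutFunction<TimeOutPayBean, TimeOutPayBean>() {
                    @Override
                    public TimeOutPayBean timeout(Map<String, List<TimeOutPayBean>> map, long timeoutTs) {
                        // Only the "create" event exists in a timed-out match
                        return map.get("begin").get(0);
                    }
                },
                new PatternSelectFunction<TimeOutPayBean, TimeOutPayBean>() {
                    @Override
                    public TimeOutPayBean select(Map<String, List<TimeOutPayBean>> map) {
                        return map.get("pay").get(0);
                    }
                }
        );

        // 7. Output
        // Main output: orders paid within 10 minutes
        result.print();
        // Side output: orders still unpaid when the 10-minute window expired
        DataStream<TimeOutPayBean> sideOutput = result.getSideOutput(orderTimeoutOutput);
        sideOutput.print();
        env.execute("FlinkCepTimeOutPay");
    }
}

/** Input Event */
class TimeOutPayBean {
    private Long userId;
    private String operation;
    private Long timestamp;

    public TimeOutPayBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public String getOperation() { return operation; }
    public Long getTimestamp() { return timestamp; }

    // setters omitted

    @Override
    public String toString() {
        return "TimeOutPayBean{userId=" + userId + ", operation='" + operation
                + "', timestamp=" + timestamp + "}";
    }
}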
Running Result
TimeOutPayBean{userId=1, operation='pay', timestamp=1597905235000...
Core Points Summary
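| Scenario | Pattern Configuration |
|---|---|
| ≥5 valid transactions within 24 hours | where(price > 0) + timesOrMore(5) + within(Time.hours(24)) |
| Order unpaid 10 minutes after creation | followedBy("pay") + within(Time.seconds(600)), timeouts collected via side output |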
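Key Configuration:
- Watermark: Lets CEP evaluate events in event-time order while tolerating out-of-order arrival (here up to 500 ms); matches and timeouts are only evaluated once the watermark passes the relevant timestamps
- keyBy: Matches the pattern independently for each key (user). CEP also accepts a non-keyed stream, but then the operator runs with parallelism 1 and events from different users can end up in the same match
- Side Output: Collects partial matches that time out (orders created but not paid within 10 minutes) through an OutputTag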