TL;DR
- Scenario: use Flink's DataStream API with a custom RichSinkFunction to write user data into an Apache Kudu table in real time.
- Conclusion: the example runs, but it carries engineering risks (per-column apply, uncollected async errors, primary-key conflicts) that need minor fixes.
- Output: a runnable example, a version matrix, and an error quick-reference card.
Version Matrix
| Status | Component Version/Scenario | Verification Conclusion |
|---|---|---|
| ✅ | Flink 1.11.1 (Scala 2.12) + Java 11 + kudu-client 1.17.0 | Local example runs (verified 2025) |
| ⚠️ | Flink 1.14–1.18 + Java 11/17 + kudu-client 1.17 | API largely compatible; check dependency coordinates and packaging/shading (keep the Scala version consistent) |
| ⚠️ | YARN/K8s multi-parallelism deployment | Set parallelism, checkpointing, and an idempotency strategy (Upsert or a deduplication key recommended) |
| ✅ | Kudu table user | Columns: id INT32 primary key, name STRING, age INT32; partitioning: hash(id, 3); replicas: 1 |
| ⚠️ | Security and observability | AUTO_FLUSH_BACKGROUND requires explicitly collecting pending errors; integrate metrics and alerting |
Implementation Idea
The basic flow for sinking data from Flink into Kudu:
- Environment preparation: make sure the Flink and Kudu environments are running, and configure the required dependencies.
- Create the Kudu table: define the table to be stored in Kudu, including the primary key and column types.
- DataStream design: use Flink's DataStream API to read the input stream and apply any necessary processing and transformation.
- Write to Kudu: use the Kudu client (wrapped in a custom sink here) to write the processed data into the Kudu table; this requires configuring the Kudu client and table information.
- Execute the job: start the Flink job so data flows from the stream into Kudu in real time, ready for subsequent queries and analysis.
Add Dependency
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink.version>1.11.1</flink.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.17.0</version>
</dependency>
</dependencies>
</project>
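The version matrix above flags packaging and shading for cluster deployment. A minimal maven-shade-plugin sketch that could be added to this POM's build section is shown below; the plugin version 3.4.1 is an assumption (align it with your Maven setup), and on a real cluster the flink-* dependencies are usually marked provided so only kudu-client ends up bundled in the job jar:

```xml
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
```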
Data Source
new UserInfo("001", "Jack", 18),
new UserInfo("002", "Rose", 20),
new UserInfo("003", "Cris", 22),
new UserInfo("004", "Lily", 19),
new UserInfo("005", "Lucy", 21),
new UserInfo("006", "Json", 24),
Custom Sink
package icu.wzk.kudu;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.log4j.Logger;
public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {
private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");
private KuduClient kuduClient;
private KuduTable kuduTable;
private String kuduMasterAddr;
private String tableName;
private Schema schema;
private KuduSession kuduSession;
private ByteArrayOutputStream out;
private ObjectOutputStream os;
public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) {
this.kuduMasterAddr = kuduMasterAddr;
this.tableName = tableName;
}
@Override
public void open(Configuration parameters) throws Exception {
out = new ByteArrayOutputStream();
os = new ObjectOutputStream(out);
kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build();
kuduTable = kuduClient.openTable(tableName);
schema = kuduTable.getSchema();
kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
}
@Override
public void invoke(Map<String, Object> map, Context context) throws Exception {
if (null == map) {
return;
}
try {
int columnCount = schema.getColumnCount();
Insert insert = kuduTable.newInsert();
PartialRow row = insert.getRow();
// Fill every column first; the row is applied exactly once after the loop
for (int i = 0; i < columnCount; i++) {
ColumnSchema column = schema.getColumnByIndex(i);
insertData(row, column.getType(), column.getName(), map.get(column.getName()));
}
// Under AUTO_FLUSH_BACKGROUND, apply() normally returns null and errors
// settle in the session's pending-error queue, so collect them explicitly
OperationResponse response = kuduSession.apply(insert);
if (null != response && response.hasRowError()) {
logger.error(response.getRowError().toString());
}
if (kuduSession.countPendingErrors() > 0) {
for (RowError rowError : kuduSession.getPendingErrors().getRowErrors()) {
logger.error(rowError.toString());
}
}
} catch (Exception e) {
logger.error("Failed to write row to Kudu", e);
}
}
@Override
public void close() throws Exception {
try {
kuduSession.close();
kuduClient.close();
os.close();
out.close();
} catch (Exception e) {
logger.error(e);
}
}
private void insertData(PartialRow row, Type type, String columnName, Object value) {
try {
switch (type) {
case STRING:
row.addString(columnName, value.toString());
return;
case INT32:
row.addInt(columnName, Integer.valueOf(value.toString()));
return;
case INT64:
row.addLong(columnName, Long.valueOf(value.toString()));
return;
case DOUBLE:
row.addDouble(columnName, Double.valueOf(value.toString()));
return;
case BOOL:
row.addBoolean(columnName, Boolean.valueOf(value.toString()));
return;
case BINARY: {
// Serialize with fresh streams so bytes from previous rows
// do not accumulate in a shared buffer
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
try (ObjectOutputStream objectOut = new ObjectOutputStream(buffer)) {
objectOut.writeObject(value);
}
row.addBinary(columnName, buffer.toByteArray());
return;
}
case FLOAT:
row.addFloat(columnName, Float.valueOf(value.toString()));
return;
default:
throw new UnsupportedOperationException("Unknown Type: " + type);
}
} catch (Exception e) {
logger.error("Insert data exception: " + e);
}
}
}
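The BINARY branch is risky when it reuses one ByteArrayOutputStream/ObjectOutputStream pair across rows: without a reset, every row's binary cell carries all previously serialized objects. A small stdlib-only sketch of the effect (class and method names are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class BinaryAccumulationDemo {
    // Serialize the same value twice through one shared stream pair and
    // report the buffer size after each write: the second snapshot still
    // contains the first row's bytes.
    public static int[] sharedStreamLengths(String value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ObjectOutputStream os = new ObjectOutputStream(out);
        os.writeObject(value);
        os.flush();
        int first = out.toByteArray().length;   // bytes for row 1 only
        os.writeObject(value);
        os.flush();
        int second = out.toByteArray().length;  // row 1 + row 2: accumulation
        return new int[]{first, second};
    }

    // Fresh streams per row: every row gets only its own bytes.
    public static int freshStreamLength(String value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream os = new ObjectOutputStream(out)) {
            os.writeObject(value);
        }
        return out.toByteArray().length;
    }

    public static void main(String[] args) throws IOException {
        int[] shared = sharedStreamLengths("payload");
        System.out.println("shared stream: " + shared[0] + " then " + shared[1]);
        System.out.println("fresh streams: " + freshStreamLength("payload"));
    }
}
```

This is why the sink's BINARY case creates fresh streams per value instead of reusing the fields built in open().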
Create Entity
package icu.wzk.kudu;
public class UserInfo {
private String id;
private String name;
private Integer age;
// Constructor, getters, and setters omitted
}
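For completeness, here is the omitted boilerplate written out. Flink only treats a class as a POJO (and serializes it efficiently) when it has a public no-arg constructor and public getters/setters, so the full class would look roughly like this:

```java
public class UserInfo {
    private String id;
    private String name;
    private Integer age;

    // Flink's POJO serializer requires a public no-arg constructor
    public UserInfo() { }

    public UserInfo(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
}
```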
Execute Table Creation
package icu.wzk.kudu;
import java.util.ArrayList;
import java.util.List;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
public class KuduCreateTable {
public static void main(String[] args) throws KuduException {
String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
KuduClient kuduClient = kuduClientBuilder.build();
String tableName = "user";
List<ColumnSchema> columnSchemas = new ArrayList<>();
ColumnSchema id = new ColumnSchema
.ColumnSchemaBuilder("id", Type.INT32)
.key(true)
.build();
columnSchemas.add(id);
ColumnSchema name = new ColumnSchema
.ColumnSchemaBuilder("name", Type.STRING)
.key(false)
.build();
columnSchemas.add(name);
ColumnSchema age = new ColumnSchema
.ColumnSchemaBuilder("age", Type.INT32)
.key(false)
.build();
columnSchemas.add(age);
Schema schema = new Schema(columnSchemas);
CreateTableOptions options = new CreateTableOptions();
// Replica count is 1
options.setNumReplicas(1);
List<String> colrule = new ArrayList<>();
colrule.add("id");
options.addHashPartitions(colrule, 3);
kuduClient.createTable(tableName, schema, options);
kuduClient.close();
}
}
Main Logic Code
package icu.wzk.kudu;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinkToKuduTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserInfo> dataSource = env.fromElements(
new UserInfo("001", "Jack", 18),
new UserInfo("002", "Rose", 20),
new UserInfo("003", "Cris", 22),
new UserInfo("004", "Lily", 19),
new UserInfo("005", "Lucy", 21),
new UserInfo("006", "Json", 24)
);
SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource
.map(new MapFunction<UserInfo, Map<String, Object>>() {
@Override
public Map<String, Object> map(UserInfo value) throws Exception {
Map<String, Object> map = new HashMap<>();
map.put("id", value.getId());
map.put("name", value.getName());
map.put("age", value.getAge());
return map;
}
});
String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251";
String tableInfo = "user";
mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));
env.execute("SinkToKuduTest");
}
}
Analysis
Environment Setup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(): initializes the Flink execution environment, the entry point of every Flink application.
Data Source Creation
DataStreamSource<UserInfo> dataSource = env.fromElements(…): creates a data source containing several UserInfo objects, simulating an input stream.
Data Transformation
SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…): uses a map function to convert each UserInfo object into a Map<String, Object>, which is convenient for the downstream write to Kudu. Each UserInfo's properties are put into a HashMap.
Kudu Configuration Info
String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251" and String tableInfo = "user": define the Kudu master addresses and the target table name.
Data Sink
mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo)): attaches the transformed stream to the custom Kudu sink; the MyFlinkSinkToKudu class implements the actual write logic.
Execute Job
env.execute("SinkToKuduTest"): starts the Flink job and runs the whole pipeline.
Test Run
- First, run the table-creation program (KuduCreateTable) and confirm in Kudu that the user table exists.
- Then run the Flink program (SinkToKuduTest) to write the sample data into Kudu.
Notes
- Parallelism: adjust the Flink job's parallelism according to the Kudu cluster's scale and configuration to improve write throughput.
- Batch writes: Kudu supports batch inserts; configuring the sink's flush mode and buffer size appropriately can improve performance.
- Fault handling: implement exception handling and retry logic in the job so that data is not lost.
- Monitoring & debugging: use Flink's monitoring tools and Kudu tooling (such as the Kudu web UI) to observe data flow and performance.
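One part of fault handling is validating fields before writing, so that map keys that do not match the table's columns fail fast instead of surfacing as confusing Kudu errors. A minimal stdlib sketch (class and method names are illustrative; a real implementation would read the column list from Kudu's Schema rather than a hard-coded list):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldMappingValidator {
    // Keep only keys that match known schema columns, filling missing
    // columns with a default value. The column list stands in for Kudu's
    // Schema, which would require the kudu-client jar.
    public static Map<String, Object> validate(Map<String, Object> row,
                                               List<String> columns,
                                               Object defaultValue) {
        Map<String, Object> cleaned = new LinkedHashMap<>();
        for (String column : columns) {
            cleaned.put(column, row.getOrDefault(column, defaultValue));
        }
        return cleaned; // keys in `row` unknown to the schema are dropped
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "Jack");
        row.put("nickname", "J"); // not a schema column: dropped
        Map<String, Object> cleaned =
                validate(row, List.of("id", "name", "age"), null);
        System.out.println(cleaned); // {id=1, name=Jack, age=null}
    }
}
```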
Error Quick Reference
| Symptom | Root Cause | Fix |
|---|---|---|
| "missing required column" / duplicate submissions / confusing RowErrors | kuduSession.apply(insert) called inside the per-column loop, so the same row is submitted multiple times | In the sink's invoke, move apply out of the column loop: fill the whole row first, then submit it once; call session.flush() if needed |
| Writes fail but nothing is logged | Under AUTO_FLUSH_BACKGROUND, apply() returns null and errors settle in the pending-error queue | Periodically pull session.getPendingErrors() and report them; or switch to MANUAL_FLUSH with batch submission and explicit error handling |
| Primary-key conflict ("Already present") | Insert fails outright on a duplicate primary key | Check the RowError in the Kudu UI/logs; if the business allows, switch to Upsert, or use setIgnoreAllDuplicateRows(true) plus a compensating log |
| NumberFormatException / type mismatch | Source id is a String ("001") while the Kudu column is INT32 | Unify the types guided by the exception stack and field mapping: either use int at the source or change the Kudu column to STRING; beware of leading-zero loss |
| Binary column bloat / dirty data | The shared ByteArrayOutputStream is never reset, so serialized objects accumulate | Spot-check row and byte lengths; reset or recreate the streams for each write; avoid ObjectOutputStream serialization of unnecessary fields on the hot path |
| Writes blocked / heavy backpressure | Session buffer, network bottleneck, or a single-tablet hotspot | Check the Flink Web UI and Kudu tablet load; pick a reasonable parallelism and scatter writes by partition key; tune setMutationBufferSpace and batch size; improve the hash-partition strategy |
| "Table/Column not found" | Map keys do not match the schema's column names | Print schema.getColumns() and the map keys; validate the field mapping before writing, adding defaults or skipping unknown columns; generate the mapping list during orchestration |
| Connection failure / timeout | Wrong master address/port, or the network is unreachable | Check the Kudu master logs and confirm the master:7051 address list, DNS, and security policy; if necessary connect directly by IP and ensure bidirectional reachability |
| Dependency conflict (NoSuchMethodError / Scala version mismatch) | Flink and its dependencies use inconsistent Scala versions | Use _2.12 artifacts consistently; unify scala.version in Maven; shade to isolate if needed |
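The NumberFormatException row comes down to the sink parsing String ids into INT32 via Integer.valueOf(value.toString()). A tiny stdlib demonstration of the leading-zero loss and the hard failure it warns about:

```java
public class LeadingZeroLoss {
    public static void main(String[] args) {
        // A String id like "001" silently becomes 1; the zeros are gone
        String sourceId = "001";
        int stored = Integer.valueOf(sourceId);
        String readBack = String.valueOf(stored);
        System.out.println(sourceId.equals(readBack)); // false: "001" vs "1"

        // Non-numeric ids do not degrade gracefully, they throw
        try {
            Integer.valueOf("user-001");
        } catch (NumberFormatException e) {
            System.out.println("NumberFormatException: " + e.getMessage());
        }
    }
}
```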