TL;DR

  • Scenario: write user data to an Apache Kudu table in real time from a Flink DataStream, via a custom RichSinkFunction.
  • Conclusion: the example runs, but carries engineering risks ("per-column apply", "async errors not collected", "primary-key conflicts") that need minor fixes.
  • Output: a runnable example, a version matrix, and an error quick-reference card.

Version Matrix

| Status | Component / Version / Scenario | Verification Conclusion |
| --- | --- | --- |
| ✅ | Flink 1.11.1 (Scala 2.12) + Java 11 + kudu-client 1.17.0 | Local example runs (verified 2025) |
| ⚠️ | Flink 1.14–1.18 + Java 11/17 + kudu-client 1.17 | API largely compatible; check dependency coordinates and shading (especially Scala version consistency) |
| ⚠️ | YARN/K8s multi-parallelism deployment | Requires parallelism, checkpointing, and an idempotence strategy (Upsert or a dedup key recommended) |
| ✅ | Kudu table `user` | Columns: id INT32 primary key; name STRING; age INT32. Partitioning: hash(id, 3); 1 replica |
| ⚠️ | Security and observability | AUTO_FLUSH_BACKGROUND requires explicit pending-error collection; integrate metrics and alerting |

Implementation Idea

The basic idea of sinking data from Flink to Kudu:

  • Environment preparation: make sure Flink and Kudu are running, and configure the required dependencies.
  • Create the Kudu table: define the target table in Kudu, including the primary key and column types.
  • DataStream design: use Flink's DataStream API to read the input stream and apply any necessary processing and transformation.
  • Write to Kudu: write the processed data to the Kudu table via the Kudu client (here, a custom sink). The Kudu client and table information must be configured.
  • Execute the job: start the Flink job to write stream data to Kudu in real time for downstream query and analysis.

Add Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.17.0</version>
        </dependency>
    </dependencies>
</project>
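The version matrix above recommends shading when submitting to a cluster, to keep Scala/dependency versions consistent. A minimal maven-shade-plugin fragment could look like this (the plugin version is an assumption; adjust to your build):

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <!-- Build a fat jar at package time so kudu-client ships with the job -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

For local runs (as in this article) the plain dependency list is enough; shading only matters once the jar is submitted to a cluster whose classpath you do not control.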

Data Source

new UserInfo("001", "Jack", 18),
new UserInfo("002", "Rose", 20),
new UserInfo("003", "Cris", 22),
new UserInfo("004", "Lily", 19),
new UserInfo("005", "Lucy", 21),
new UserInfo("006", "Json", 24),

Custom Sink

package icu.wzk.kudu;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;

public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {

    private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");

    private KuduClient kuduClient;
    private KuduTable kuduTable;

    private String kuduMasterAddr;
    private String tableName;
    private Schema schema;
    private KuduSession kuduSession;
    private ByteArrayOutputStream out;
    private ObjectOutputStream os;

    public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) {
        this.kuduMasterAddr = kuduMasterAddr;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        out = new ByteArrayOutputStream();
        os = new ObjectOutputStream(out);
        kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build();
        kuduTable = kuduClient.openTable(tableName);
        schema = kuduTable.getSchema();
        kuduSession = kuduClient.newSession();
        kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
    }

    @Override
    public void invoke(Map<String, Object> map, Context context) throws Exception {
        if (null == map) {
            return;
        }
        try {
            int columnCount = schema.getColumnCount();
            Insert insert = kuduTable.newInsert();
            PartialRow row = insert.getRow();
            // Fill the whole row first ...
            for (int i = 0; i < columnCount; i++) {
                Object value = map.get(schema.getColumnByIndex(i).getName());
                insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);
            }
            // ... then submit it once. Under AUTO_FLUSH_BACKGROUND apply() returns null;
            // real errors must be drained from the session's pending-error queue.
            OperationResponse response = kuduSession.apply(insert);
            if (null != response && response.hasRowError()) {
                logger.error(response.getRowError().toString());
            }
        } catch (Exception e) {
            logger.error(e);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            kuduSession.close();
            kuduClient.close();
            os.close();
            out.close();
        } catch (Exception e) {
            logger.error(e);
        }
    }

    private void insertData(PartialRow row, Type type, String columnName, Object value) {
        try {
            switch (type) {
                case STRING:
                    row.addString(columnName, value.toString());
                    return;
                case INT32:
                    row.addInt(columnName, Integer.valueOf(value.toString()));
                    return;
                case INT64:
                    row.addLong(columnName, Long.valueOf(value.toString()));
                    return;
                case DOUBLE:
                    row.addDouble(columnName, Double.valueOf(value.toString()));
                    return;
                case BOOL:
                    row.addBoolean(columnName, Boolean.valueOf(value.toString()));
                    return;
                case BINARY:
                    // Reset the buffer so bytes from earlier rows do not accumulate
                    out.reset();
                    os.writeObject(value);
                    os.flush();
                    row.addBinary(columnName, out.toByteArray());
                    return;
                case FLOAT:
                    row.addFloat(columnName, Float.valueOf(value.toString()));
                    return;
                default:
                    throw new UnsupportedOperationException("Unknown Type: " + type);
            }
        } catch (Exception e) {
            logger.error("Insert data exception: " + e);
        }
    }
}

Create Entity

package icu.wzk.kudu;

public class UserInfo {

    private String id;

    private String name;

    private Integer age;

    // Constructor, getters, and setters omitted
}
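Since the class above elides its members, here is a minimal completion sufficient for the example to compile; it only spells out the constructor and accessors the job code actually calls (`getId`, `getName`, `getAge`), with the package declaration left out:

```java
// Minimal completion of UserInfo: the three-argument constructor plus the
// accessors used by SinkToKuduTest. Field names match the Kudu columns.
public class UserInfo {

    private String id;
    private String name;
    private Integer age;

    public UserInfo(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
}
```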

Execute Table Creation

package icu.wzk.kudu;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

import java.util.ArrayList;
import java.util.List;

public class KuduCreateTable {

    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
        KuduClient kuduClient = kuduClientBuilder.build();

        String tableName = "user";
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        ColumnSchema id = new ColumnSchema
                .ColumnSchemaBuilder("id", Type.INT32)
                .key(true)
                .build();
        columnSchemas.add(id);
        ColumnSchema name = new ColumnSchema
                .ColumnSchemaBuilder("name", Type.STRING)
                .key(false)
                .build();
        columnSchemas.add(name);
        ColumnSchema age = new ColumnSchema
                .ColumnSchemaBuilder("age", Type.INT32)
                .key(false)
                .build();
        columnSchemas.add(age);

        Schema schema = new Schema(columnSchemas);
        CreateTableOptions options = new CreateTableOptions();
        // Replica count is 1
        options.setNumReplicas(1);
        List<String> colrule = new ArrayList<>();
        colrule.add("id");
        options.addHashPartitions(colrule, 3);

        kuduClient.createTable(tableName, schema, options);
        kuduClient.close();
    }

}
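Note that `createTable` is not idempotent: running the program a second time throws because the table already exists. A hedged sketch of a guard using the client's `tableExists` check (the helper class name is illustrative, and this requires a live cluster):

```java
import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

public final class KuduTableGuard {

    // Create the table only if it does not exist yet, so the
    // creation program can be re-run safely.
    public static void createIfAbsent(KuduClient client, String tableName,
                                      Schema schema, CreateTableOptions options) throws KuduException {
        if (!client.tableExists(tableName)) {
            client.createTable(tableName, schema, options);
        }
    }
}
```

In `main`, replace the direct `kuduClient.createTable(...)` call with `KuduTableGuard.createIfAbsent(kuduClient, tableName, schema, options)`.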

Main Logic Code

package icu.wzk.kudu;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;

public class SinkToKuduTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserInfo> dataSource = env.fromElements(
                new UserInfo("001", "Jack", 18),
                new UserInfo("002", "Rose", 20),
                new UserInfo("003", "Cris", 22),
                new UserInfo("004", "Lily", 19),
                new UserInfo("005", "Lucy", 21),
                new UserInfo("006", "Json", 24)
        );
        SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource
                .map(new MapFunction<UserInfo, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> map(UserInfo value) throws Exception {
                        Map<String, Object> map = new HashMap<>();
                        map.put("id", value.getId());
                        map.put("name", value.getName());
                        map.put("age", value.getAge());
                        return map;
                    }
                });

        String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251";
        String tableInfo = "user";
        mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));

        env.execute("SinkToKuduTest");
    }

}

Analysis

Environment Setup

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(): initializes the Flink execution environment, the entry point of every Flink application.

Data Source Creation

DataStreamSource<UserInfo> dataSource = env.fromElements(…): creates a data source of several UserInfo objects to simulate an input stream.

Data Transformation

SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…): uses a map function to convert each UserInfo object into a Map<String, Object> for subsequent processing and writing to Kudu. Each UserInfo property is put into a HashMap.

Kudu Configuration Info

String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251" and String tableInfo = "user": define the Kudu master addresses and the target table name.

Data Sink

mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo)): attaches the custom Kudu sink to the transformed stream. The MyFlinkSinkToKudu class implements the logic for writing data to Kudu.

Execute Job

env.execute("SinkToKuduTest"): starts the Flink job and executes the whole pipeline.

Test Run

  • First, run the table-creation program (KuduCreateTable).
  • Then, run the main job (SinkToKuduTest).

After the creation step, confirm that the user table exists, then run the Flink program to write data into Kudu.

Notes

  • Concurrency: adjust the Flink job's parallelism to the Kudu cluster's scale and configuration to improve write throughput.
  • Batch writes: Kudu supports batched inserts; configuring the sink's session appropriately can improve performance.
  • Fault handling: make sure the job has exception-handling and retry logic so data is not lost.
  • Monitoring and debugging: use Flink's monitoring tools and Kudu's tooling (such as the Kudu web UI) to watch data flow and performance.
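The batch-write and fault-handling notes can be made concrete. A hedged sketch of a MANUAL_FLUSH session that batches operations and drains pending errors (the class name, buffer size, and batch threshold are illustrative assumptions; it requires a live cluster):

```java
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;

public final class BatchedKuduWriter {

    private static final int BATCH_SIZE = 500; // illustrative threshold

    private final KuduSession session;
    private int buffered = 0;

    public BatchedKuduWriter(KuduClient client) {
        session = client.newSession();
        // MANUAL_FLUSH: operations are buffered and sent as one batch on flush()
        session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        session.setMutationBufferSpace(5000); // illustrative buffer size
    }

    // Buffer one operation; flush once the batch fills up.
    public void write(Operation op) throws KuduException {
        session.apply(op);
        if (++buffered >= BATCH_SIZE) {
            flush();
        }
    }

    // Flush the batch and surface any row errors instead of losing them.
    public void flush() throws KuduException {
        session.flush();
        buffered = 0;
        if (session.countPendingErrors() > 0) {
            for (RowError error : session.getPendingErrors().getRowErrors()) {
                System.err.println("Kudu row error: " + error);
            }
        }
    }
}
```

The same drain-and-report pattern applies under AUTO_FLUSH_BACKGROUND, where it is the only way to see write failures at all.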

Error Quick Reference

| Symptom | Root Cause | How to Locate | Fix |
| --- | --- | --- | --- |
| Submit errors: "missing required column", duplicate submissions, chaotic RowError output | kuduSession.apply(insert) sits inside the per-column loop, so the same row is submitted multiple times | Inspect the sink's invoke method | Move apply out of the column loop: fill the whole row first, then submit once; call session.flush() if needed |
| Writes fail but nothing is logged | Under AUTO_FLUSH_BACKGROUND apply returns null; errors settle in the pending-error queue | session.getPendingErrors() | Periodically pull and report pending errors; or switch to MANUAL_FLUSH with batch submission and explicit error handling |
| Primary-key conflict ("Already present") | Insert fails outright on a duplicate primary key | Kudu UI / RowError in logs | If the business allows, switch to Upsert; or setIgnoreAllDuplicateRows(true) plus a compensation log |
| NumberFormatException / type mismatch | Source id is a String ("001") while the Kudu column is INT32 | Exception stack / field mapping | Unify the types: use int in the source, or change the Kudu column to STRING; beware of losing leading zeros |
| Binary column bloats / dirty data | The ByteArrayOutputStream is never reset, so serialized objects accumulate | Sample rows and check for abnormal byte lengths | Call out.reset() after each write; avoid ObjectOutputStream serialization of unnecessary fields on the hot path |
| Writes blocked / heavy backpressure | Session buffer, network bottleneck, or a single-tablet hotspot | Flink Web UI, Kudu tablet load | Pick a reasonable parallelism and scatter writes by partition key; tune setMutationBufferSpace and batch size; refine the hash partitioning strategy |
| "Table/Column not found" | Map keys do not match the schema columns | Print schema.getColumns() and the map keys | Validate the field mapping before writing; add defaults or skip unknown columns; generate the mapping list up front |
| Connection failures / timeouts | Wrong master address or port, or the network is unreachable | Check Kudu master logs and ports | Confirm the master:7051 list, DNS, and security policies; if necessary connect directly by IP and verify bidirectional reachability |
| Dependency conflicts (NoSuchMethodError / Scala version mismatch) | Flink and its dependencies use inconsistent Scala versions | Runtime exceptions | Use _2.12 artifacts consistently; unify scala.version in Maven; shade to isolate if needed |
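The "Table/Column not found" row boils down to comparing the sink's map keys against the schema columns before writing. A minimal, Kudu-free sketch of that validation (the class and method names are illustrative):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public final class FieldMappingCheck {

    // Returns the row's map keys that have no matching schema column,
    // so the caller can log, default, or skip them before the write.
    public static Set<String> unknownKeys(Set<String> schemaColumns, Map<String, Object> row) {
        Set<String> unknown = new TreeSet<>(row.keySet());
        unknown.removeAll(schemaColumns);
        return unknown;
    }
}
```

In the sink, the schema column names would come from `schema.getColumns()`; calling this once per record (or once per key set, if the shape is stable) turns a cryptic write failure into an explicit mapping error.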