Dependency Import
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.4.0</version>
</dependency>
Create Table
package icu.wzk.kudu;
/**
 * Example: creates the {@code student} table with an INT32 primary-key column
 * {@code id} and a STRING column {@code name}.
 */
public class KuduCreateTable {
    /**
     * Builds the schema and partitioning options, then issues the create-table request.
     *
     * @param args unused
     * @throws KuduException if the create-table request or closing the client fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        String tableName = "student";

        // Column definitions: "id" is the primary key, "name" is a regular column.
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32)
                .key(true)
                .build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING)
                .key(false)
                .build());
        Schema schema = new Schema(columnSchemas);

        // Single replica, hash-partitioned on "id" into 3 buckets.
        CreateTableOptions options = new CreateTableOptions();
        options.setNumReplicas(1);
        List<String> hashColumns = new ArrayList<>();
        hashColumns.add("id");
        options.addHashPartitions(hashColumns, 3);

        // try-with-resources closes the client even when createTable throws.
        try (KuduClient kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build()) {
            kuduClient.createTable(tableName, schema, options);
        }
    }
}
Delete Table
package icu.wzk.kudu;
/**
 * Example: drops the {@code student} table.
 */
public class KuduDeleteTable {
    /**
     * Connects to the masters and deletes the table.
     *
     * @param args unused
     * @throws KuduException if the delete-table request or closing the client fails
     */
    public static void main(String[] args) throws KuduException {
        // No trailing comma: the list must contain only well-formed host:port entries.
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";

        // try-with-resources closes the client even when deleteTable throws.
        try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress)
                .defaultAdminOperationTimeoutMs(5000)
                .build()) {
            client.deleteTable("student");
        }
    }
}
Insert Data
package icu.wzk.kudu;
/**
 * Example: inserts one row ({@code id=1, name="wzk"}) into the {@code student} table
 * using a MANUAL_FLUSH session.
 */
public class KuduInsert {
    /**
     * Opens the table, buffers a single insert and flushes it to the server.
     *
     * @param args unused
     * @throws KuduException if opening the table, applying, flushing or closing fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddr = "localhost:7051,localhost:7151,localhost:7251";

        // try-with-resources closes the client on every exit path.
        try (KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddr)
                .defaultAdminOperationTimeoutMs(5000)
                .build()) {
            KuduTable stuTable = client.openTable("student");
            KuduSession kuduSession = client.newSession();
            try {
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                Insert insert = stuTable.newInsert();
                insert.getRow().addInt("id", 1);
                insert.getRow().addString("name", "wzk");

                // Under MANUAL_FLUSH, apply() only buffers the operation, so it must
                // come BEFORE flush(); flushing first sends an empty buffer.
                kuduSession.apply(insert);
                // NOTE(review): flush() returns per-operation responses; production code
                // should inspect them for row errors instead of discarding them.
                kuduSession.flush();
            } finally {
                kuduSession.close();
            }
        }
    }
}
KuduSession Flush Mode Notes:
- AUTO_FLUSH_SYNC (default): Each apply call flushes the operation to the server before returning; operations are not batched
- AUTO_FLUSH_BACKGROUND: Returns immediately after calling apply, sends in background, may be out of order (KUDU-1767)
- MANUAL_FLUSH: Returns quickly after calling apply, write operations not sent, requires user to call flush function
Query Data
package icu.wzk.kudu;
/**
 * Example: scans every row of the {@code student} table and prints its
 * {@code id} and {@code name} columns.
 */
public class KuduSelect {
    /**
     * Opens the table, runs an unfiltered scan and prints each row to standard output.
     *
     * @param args unused
     * @throws KuduException if opening the table, scanning or closing the client fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddr = "localhost:7051,localhost:7151,localhost:7251";

        // try-with-resources closes the client even when the scan throws.
        try (KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddr)
                .build()) {
            // Same table name the create/insert examples use.
            KuduTable kuduTable = client.openTable("student");
            KuduScanner kuduScanner = client.newScannerBuilder(kuduTable).build();

            // Rows arrive in batches; keep fetching until the scanner is exhausted.
            while (kuduScanner.hasMoreRows()) {
                for (RowResult result : kuduScanner.nextRows()) {
                    // Only read columns defined by the table schema (id, name).
                    int id = result.getInt("id");
                    String name = result.getString("name");
                    System.out.println("id: " + id + ", name: " + name);
                }
            }
        }
    }
}
Update Data
package icu.wzk.kudu;
/**
 * Example: updates the row with {@code id=1} in the {@code student} table,
 * setting {@code name} to {@code "wzk_icu"}, using a MANUAL_FLUSH session.
 */
public class KuduUpdate {
    /**
     * Opens the table, buffers a single update and flushes it to the server.
     *
     * @param args unused
     * @throws KuduException if opening the table, applying, flushing or closing fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";

        // try-with-resources closes the client on every exit path.
        try (KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddress)
                .build()) {
            KuduTable stuTable = client.openTable("student");
            KuduSession kuduSession = client.newSession();
            try {
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                Update update = stuTable.newUpdate();
                update.getRow().addInt("id", 1);
                update.getRow().addString("name", "wzk_icu");

                kuduSession.apply(update);
                // MANUAL_FLUSH never sends buffered operations on its own: flush explicitly
                // instead of depending on whatever close() happens to do.
                // NOTE(review): flush() returns per-operation responses; production code
                // should inspect them for row errors instead of discarding them.
                kuduSession.flush();
            } finally {
                kuduSession.close();
            }
        }
    }
}
Delete Data
package icu.wzk.kudu;
/**
 * Example: deletes the row with {@code id=1} from the {@code student} table
 * using a MANUAL_FLUSH session.
 */
public class KuduDelete {
    /**
     * Opens the table, buffers a single delete keyed on {@code id} and flushes it.
     *
     * @param args unused
     * @throws KuduException if opening the table, applying, flushing or closing fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";

        // try-with-resources closes the client on every exit path.
        try (KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddress)
                .build()) {
            KuduSession kuduSession = client.newSession();
            try {
                KuduTable stuTable = client.openTable("student");
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                // A delete only needs the primary-key column(s) of the target row.
                Delete delete = stuTable.newDelete();
                PartialRow row = delete.getRow();
                row.addInt("id", 1);

                // Under MANUAL_FLUSH, apply() only buffers the operation, so it must
                // come BEFORE flush(); flushing first sends an empty buffer.
                kuduSession.apply(delete);
                // NOTE(review): flush() returns per-operation responses; production code
                // should inspect them for row errors instead of discarding them.
                kuduSession.flush();
            } finally {
                kuduSession.close();
            }
        }
    }
}
Error Quick Reference
| Symptom | Root Cause Location | Fix |
|---|---|---|
| No error in console but data not in table | MANUAL_FLUSH called flush before apply, order reversed | Change to apply before flush |
| UnknownHostException/connection exception | Extra comma at end of Master address | Remove trailing comma |
| Not found: table user | Query table name different from creation | Unify table names |
| Column read error | Schema doesn’t contain that column | Add to Schema |
| Update/delete has no effect | MANUAL_FLUSH session never flushed | Call flush() explicitly after apply |
| Out of order | AUTO_FLUSH_BACKGROUND concurrent send | Change to sync/manual batch |
| Cannot connect to Leader Master | Master not started/wrong port | Check Master status |
| Insert/update lost writes | Not checking errors | Check the responses returned by flush() for row errors (in AUTO_FLUSH_BACKGROUND mode, check getPendingErrors()) |