依赖导入
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.4.0</version>
</dependency>
创建表
package icu.wzk.kudu;
/**
 * Creates the {@code student} table: an INT32 key column {@code id} and a
 * STRING column {@code name}, hash-partitioned on {@code id} into 3 buckets
 * with a replication factor of 1.
 */
public class KuduCreateTable {
    /**
     * Connects to the Kudu masters, creates the table and closes the client.
     *
     * @param args unused
     * @throws KuduException if a Kudu client call fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build();
        // Release the client even when table creation fails.
        try {
            String tableName = "student";

            List<ColumnSchema> columnSchemas = new ArrayList<>();
            // Primary-key column.
            columnSchemas.add(new ColumnSchema
                    .ColumnSchemaBuilder("id", Type.INT32)
                    .key(true)
                    .build());
            // Regular (non-key) column.
            columnSchemas.add(new ColumnSchema
                    .ColumnSchemaBuilder("name", Type.STRING)
                    .key(false)
                    .build());
            Schema schema = new Schema(columnSchemas);

            CreateTableOptions options = new CreateTableOptions();
            options.setNumReplicas(1);
            // Hash-partition on the key column into 3 buckets.
            List<String> hashColumns = new ArrayList<>();
            hashColumns.add("id");
            options.addHashPartitions(hashColumns, 3);

            kuduClient.createTable(tableName, schema, options);
        } finally {
            kuduClient.close();
        }
    }
}
删除表
package icu.wzk.kudu;
/**
 * Drops the {@code student} table.
 */
public class KuduDeleteTable {
    /**
     * Connects to the Kudu masters with a 5000 ms admin-operation timeout,
     * deletes the table and closes the client.
     *
     * @param args unused
     * @throws KuduException if a Kudu client call fails
     */
    public static void main(String[] args) throws KuduException {
        // Comma-separated host:port list; no trailing comma, which would leave
        // an empty entry at the end of the address list.
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient client = new KuduClient.KuduClientBuilder(masterAddress)
                .defaultAdminOperationTimeoutMs(5000)
                .build();
        // Release the client even when the delete fails.
        try {
            client.deleteTable("student");
        } finally {
            client.close();
        }
    }
}
插入数据
package icu.wzk.kudu;
/**
 * Inserts one row ({@code id = 1}, {@code name = "wzk"}) into the
 * {@code student} table using a manually flushed session.
 */
public class KuduInsert {
    /**
     * Opens the table, buffers a single insert and flushes it.
     *
     * @param args unused
     * @throws KuduException if a Kudu client call fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddr = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddr)
                .defaultAdminOperationTimeoutMs(5000)
                .build();
        try {
            KuduTable stuTable = client.openTable("student");
            KuduSession kuduSession = client.newSession();
            try {
                // MANUAL_FLUSH: apply() only buffers the write; nothing is
                // sent until flush() is called.
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                Insert insert = stuTable.newInsert();
                insert.getRow().addInt("id", 1);
                insert.getRow().addString("name", "wzk");

                // Order matters: buffer the operation first, then flush it.
                kuduSession.apply(insert);
                kuduSession.flush();
            } finally {
                kuduSession.close();
            }
        } finally {
            client.close();
        }
    }
}
KuduSession 刷新模式说明:
- AUTO_FLUSH_SYNC(默认):调用 apply 方法后,客户端刷新到服务器后再返回,不支持批量插入
- AUTO_FLUSH_BACKGROUND:调用 apply 后立即返回,后台发送,可能乱序(KUDU-1767)
- MANUAL_FLUSH:调用 apply 后快速返回,写操作不发送,需用户调用 flush 函数
查询数据
package icu.wzk.kudu;
/**
 * Scans the {@code student} table and prints the {@code id} and {@code name}
 * of every row.
 */
public class KuduSelect {
    /**
     * Opens the table, runs an unfiltered scan and prints each row.
     *
     * @param args unused
     * @throws KuduException if a Kudu client call fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddr = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddr)
                .build();
        // Release the client even when the scan fails.
        try {
            // Same table name as the create/insert/update/delete examples.
            KuduTable kuduTable = client.openTable("student");
            KuduScanner kuduScanner = client.newScannerBuilder(kuduTable).build();
            while (kuduScanner.hasMoreRows()) {
                for (RowResult result : kuduScanner.nextRows()) {
                    // Only read columns that the table schema defines (id, name).
                    int id = result.getInt("id");
                    String name = result.getString("name");
                    System.out.println("id: " + id + ", name: " + name);
                }
            }
        } finally {
            client.close();
        }
    }
}
更新数据
package icu.wzk.kudu;
/**
 * Updates the row with {@code id = 1} in the {@code student} table, setting
 * {@code name} to {@code "wzk_icu"}, using a manually flushed session.
 */
public class KuduUpdate {
    /**
     * Opens the table, buffers a single update and flushes it.
     *
     * @param args unused
     * @throws KuduException if a Kudu client call fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddress)
                .build();
        try {
            KuduTable stuTable = client.openTable("student");
            KuduSession kuduSession = client.newSession();
            try {
                // MANUAL_FLUSH: apply() only buffers the write; nothing is
                // sent until flush() is called.
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                Update update = stuTable.newUpdate();
                update.getRow().addInt("id", 1);
                update.getRow().addString("name", "wzk_icu");

                kuduSession.apply(update);
                // Send the buffered update explicitly instead of relying on close().
                kuduSession.flush();
            } finally {
                kuduSession.close();
            }
        } finally {
            client.close();
        }
    }
}
删除数据
package icu.wzk.kudu;
/**
 * Deletes the row with {@code id = 1} from the {@code student} table using a
 * manually flushed session.
 */
public class KuduDelete {
    /**
     * Opens the table, buffers a single delete and flushes it.
     *
     * @param args unused
     * @throws KuduException if a Kudu client call fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddress)
                .build();
        try {
            KuduSession kuduSession = client.newSession();
            try {
                KuduTable stuTable = client.openTable("student");
                // MANUAL_FLUSH: apply() only buffers the write; nothing is
                // sent until flush() is called.
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                Delete delete = stuTable.newDelete();
                PartialRow row = delete.getRow();
                row.addInt("id", 1);

                // Order matters: buffer the operation first, then flush it.
                kuduSession.apply(delete);
                kuduSession.flush();
            } finally {
                kuduSession.close();
            }
        } finally {
            client.close();
        }
    }
}
错误速查
| 症状 | 根因定位 | 修复 |
|---|---|---|
| 控制台无报错但表中无数据 | MANUAL_FLUSH 先 flush 再 apply,顺序反了 | 改为先 apply 后 flush |
| UnknownHostException/连接异常 | Master 地址末尾多了逗号 | 移除尾逗号 |
| Not found: table user | 查询表名与创建不一致 | 统一表名 |
| 读取列报错 | Schema 不含该列 | 补齐 Schema |
| 更新/删除后读取不到变更 | MANUAL_FLUSH 未 flush | apply 后显式 flush |
| 乱序 | AUTO_FLUSH_BACKGROUND 并发发送 | 改用同步/手动批处理 |
| 无法连接 Leader Master | Master 未启动/端口错误 | 检查 Master 状态 |
| 插入/更新丢写 | 未检查错误 | 检查 apply()/flush() 返回的 OperationResponse 是否 hasRowError();AUTO_FLUSH_BACKGROUND 模式下再检查 getPendingErrors() |