依赖导入

<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.4.0</version>
</dependency>

创建表

package icu.wzk.kudu;
/**
 * Creates the {@code student} table on a local three-master Kudu cluster.
 *
 * <p>The table has an INT32 primary-key column {@code id} and a STRING column
 * {@code name}. It is hash-partitioned on {@code id} into 3 buckets and uses a
 * single replica.
 */
public class KuduCreateTable {
    /**
     * Builds the schema and partitioning options, then issues the create-table request.
     *
     * @param args unused
     * @throws KuduException if the table cannot be created or the client fails to close
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        String tableName = "student";

        // Column definitions: "id" is the primary key, "name" is a regular column.
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        columnSchemas.add(
                new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
        columnSchemas.add(
                new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(false).build());
        Schema schema = new Schema(columnSchemas);

        // One replica, rows spread over 3 hash buckets keyed on "id".
        CreateTableOptions options = new CreateTableOptions();
        options.setNumReplicas(1);
        List<String> hashColumns = new ArrayList<>();
        hashColumns.add("id");
        options.addHashPartitions(hashColumns, 3);

        // try-with-resources guarantees the client is closed even when createTable throws.
        try (KuduClient kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build()) {
            kuduClient.createTable(tableName, schema, options);
        }
    }
}

删除表

package icu.wzk.kudu;
/**
 * Drops the {@code student} table from a local three-master Kudu cluster.
 */
public class KuduDeleteTable {
    /**
     * Connects to the masters and deletes the table.
     *
     * @param args unused
     * @throws KuduException if the table cannot be deleted or the client fails to close
     */
    public static void main(String[] args) throws KuduException {
        // No trailing comma: the address list must contain only real host:port entries.
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";

        // try-with-resources guarantees the client is closed even when deleteTable throws.
        try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress)
                .defaultAdminOperationTimeoutMs(5000)
                .build()) {
            client.deleteTable("student");
        }
    }
}

插入数据

package icu.wzk.kudu;
/**
 * Inserts a single row ({@code id=1, name="wzk"}) into the {@code student} table
 * using a MANUAL_FLUSH session.
 */
public class KuduInsert {
    /**
     * Applies one insert, flushes it explicitly and fails loudly on a row error.
     *
     * @param args unused
     * @throws KuduException if the table cannot be opened or the write cannot be sent
     * @throws IllegalStateException if the server reports a row error for the insert
     */
    public static void main(String[] args) throws KuduException {
        String masterAddr = "localhost:7051,localhost:7151,localhost:7251";

        try (KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddr)
                .defaultAdminOperationTimeoutMs(5000)
                .build()) {
            KuduTable stuTable = client.openTable("student");
            KuduSession kuduSession = client.newSession();
            try {
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                Insert insert = stuTable.newInsert();
                insert.getRow().addInt("id", 1);
                insert.getRow().addString("name", "wzk");

                // In MANUAL_FLUSH mode apply() only buffers the operation, so it
                // must come BEFORE flush(); flushing first sends an empty buffer.
                kuduSession.apply(insert);

                // flush() returns one response per buffered operation; surface row errors
                // instead of silently dropping them.
                kuduSession.flush().forEach(response -> {
                    if (response.hasRowError()) {
                        throw new IllegalStateException(
                                "Insert failed: " + response.getRowError());
                    }
                });
            } finally {
                kuduSession.close();
            }
        }
    }
}

KuduSession 刷新模式说明:

  • AUTO_FLUSH_SYNC(默认):调用 apply 方法后,客户端刷新到服务器后再返回,不支持批量插入
  • AUTO_FLUSH_BACKGROUND:调用 apply 后立即返回,后台发送,可能乱序(KUDU-1767)
  • MANUAL_FLUSH:调用 apply 后快速返回,写操作不发送,需用户调用 flush 函数

查询数据

package icu.wzk.kudu;
/**
 * Scans the {@code student} table and prints every row's {@code id} and {@code name}.
 */
public class KuduSelect {
    /**
     * Runs an unfiltered scan and writes each row to standard output.
     *
     * @param args unused
     * @throws KuduException if the table cannot be opened or the scan fails
     */
    public static void main(String[] args) throws KuduException {
        String masterAddr = "localhost:7051,localhost:7151,localhost:7251";

        try (KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddr)
                .build()) {
            // The tutorial creates the table as "student"; querying any other name
            // fails with "Not found".
            KuduTable kuduTable = client.openTable("student");
            KuduScanner kuduScanner = client.newScannerBuilder(kuduTable).build();

            while (kuduScanner.hasMoreRows()) {
                for (RowResult result : kuduScanner.nextRows()) {
                    // Only read columns that exist in the schema (id, name).
                    int id = result.getInt("id");
                    String name = result.getString("name");
                    System.out.println("id: " + id + ", name: " + name);
                }
            }
        }
    }
}

更新数据

package icu.wzk.kudu;
/**
 * Updates the row with {@code id=1} in the {@code student} table, setting
 * {@code name} to {@code "wzk_icu"}, using a MANUAL_FLUSH session.
 */
public class KuduUpdate {
    /**
     * Applies one update, flushes it explicitly and fails loudly on a row error.
     *
     * @param args unused
     * @throws KuduException if the table cannot be opened or the write cannot be sent
     * @throws IllegalStateException if the server reports a row error for the update
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";

        try (KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddress)
                .build()) {
            KuduTable stuTable = client.openTable("student");
            KuduSession kuduSession = client.newSession();
            try {
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                Update update = stuTable.newUpdate();
                update.getRow().addInt("id", 1);
                update.getRow().addString("name", "wzk_icu");
                kuduSession.apply(update);

                // MANUAL_FLUSH never sends on its own: flush explicitly and inspect
                // the per-operation responses so a failed update is not lost.
                kuduSession.flush().forEach(response -> {
                    if (response.hasRowError()) {
                        throw new IllegalStateException(
                                "Update failed: " + response.getRowError());
                    }
                });
            } finally {
                kuduSession.close();
            }
        }
    }
}

删除数据

package icu.wzk.kudu;
/**
 * Deletes the row with {@code id=1} from the {@code student} table using a
 * MANUAL_FLUSH session.
 */
public class KuduDelete {
    /**
     * Applies one delete, flushes it explicitly and fails loudly on a row error.
     *
     * @param args unused
     * @throws KuduException if the table cannot be opened or the write cannot be sent
     * @throws IllegalStateException if the server reports a row error for the delete
     */
    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";

        try (KuduClient client = new KuduClient
                .KuduClientBuilder(masterAddress)
                .build()) {
            KuduSession kuduSession = client.newSession();
            try {
                KuduTable stuTable = client.openTable("student");
                kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

                Delete delete = stuTable.newDelete();
                PartialRow row = delete.getRow();
                row.addInt("id", 1);

                // In MANUAL_FLUSH mode apply() only buffers the operation, so it
                // must come BEFORE flush(); flushing first sends an empty buffer.
                kuduSession.apply(delete);

                // Surface per-row errors returned by the flush.
                kuduSession.flush().forEach(response -> {
                    if (response.hasRowError()) {
                        throw new IllegalStateException(
                                "Delete failed: " + response.getRowError());
                    }
                });
            } finally {
                kuduSession.close();
            }
        }
    }
}

错误速查

| 症状 | 根因定位 | 修复 |
| --- | --- | --- |
| 控制台无报错但表中无数据 | MANUAL_FLUSH 先 flush 再 apply,顺序反了 | 改为先 apply 后 flush |
| UnknownHostException/连接异常 | Master 地址末尾多了逗号 | 移除尾逗号 |
| Not found: table user | 查询表名与创建不一致 | 统一表名 |
| 读取列报错 | Schema 不含该列 | 补齐 Schema |
| 读取更新/删除无效 | MANUAL_FLUSH 未 flush | apply 后显式 flush |
| 乱序 | AUTO_FLUSH_BACKGROUND 并发发送 | 改用同步/手动批处理 |
| 无法连接 Leader Master | Master 未启动/端口错误 | 检查 Master 状态 |
| 插入/更新丢写 | 未检查错误 | 每次 flush 后检查 getPendingErrors() |