Canal Overview

Canal is Alibaba's open-source platform for incremental subscription to and consumption of MySQL binlogs. It simulates MySQL's master-slave replication protocol: Canal registers with MySQL as a replica, receives the binary log (binlog) stream, and parses it, thereby implementing Change Data Capture (CDC).

  • Data synchronization: Supports synchronizing database change data to other data sources or message systems like Kafka, RocketMQ, Elasticsearch, etc.
  • Real-time capability: Based on binlog parsing and subscription, enables millisecond-level data change capture.
  • Distributed architecture: Supports cluster deployment, meeting high availability and high throughput requirements.
  • Multi-database support: Besides MySQL, also supports MariaDB and some databases compatible with MySQL protocol.

Kafka Overview

Kafka is a distributed event streaming platform built for high-throughput, low-latency data streams. It supports message persistence, publish-subscribe consumption, and stream processing, and is commonly used for log collection, event stream processing, big data analysis, and message queuing.

  • Producer: Responsible for writing data to Kafka.
  • Consumer: Reads data from Kafka.
  • Broker: Server in Kafka cluster, responsible for storing and distributing messages.
  • Topic: Logical category under which messages are published and stored.
  • Partition: Each topic is split into partitions, enabling parallel processing and load balancing.

Canal and Kafka Integration Principle

Canal and Kafka are typically used together to build efficient data synchronization pipelines, enabling real-time push of database changes to message queues. The process is as follows:

  1. Data source capture: Canal monitors MySQL binlog data change events.
  2. Data parsing: Canal parses binlog data into JSON format or other structured data.
  3. Message push: Canal sends parsed data to specified Kafka Topic.
  4. Message consumption and processing: Kafka Consumer consumes data and further distributes to other services or storage like Hadoop, Elasticsearch, Redis.
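
The consumer side of step 4 can be sketched without a live cluster. The function below parses one Canal "flat message" (the JSON format shown later under Data Operations) into its key parts; in a real pipeline the string would come from a Kafka consumer client, which is assumed here rather than shown.

```python
import json

def parse_canal_message(value: str) -> dict:
    """Split one Canal flat-message JSON payload into its key parts."""
    msg = json.loads(value)
    return {
        "type": msg["type"],            # INSERT / UPDATE / DELETE
        "database": msg["database"],
        "table": msg["table"],
        "rows": msg.get("data") or [],  # latest row images
        "old": msg.get("old"),          # pre-update values, UPDATE only
    }

# Trimmed version of the INSERT sample shown later in this document
sample = ('{"data":[{"id":"6","payMethod":"meituan"}],'
          '"database":"dwshow","table":"wzk_payments",'
          '"old":null,"type":"INSERT"}')
event = parse_canal_message(sample)
print(event["type"], event["database"], event["table"], event["rows"][0]["id"])
# INSERT dwshow wzk_payments 6
```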

Use Cases

Data Synchronization and Distribution

Achieve data consistency across multiple heterogeneous systems. For example, synchronize MySQL data changes to Elasticsearch for real-time search.

Log Analysis and Monitoring

Push database operation events to Kafka for log analysis or real-time monitoring systems.

Real-time Data Stream Processing

Data flows through Kafka into Flink, Spark Streaming and other stream processing frameworks to meet complex data processing needs.

Cache Refresh

After database updates, push change messages to Kafka, then consumers update Redis cache to improve consistency and access performance.
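
As a minimal sketch of that flow (a plain dict stands in for Redis, and the db:table:pk cache-key scheme is an illustrative assumption):

```python
import json

cache = {"dwshow:wzk_payments:6": "stale value"}  # dict standing in for Redis

def invalidate_from_event(event_json: str, pk_field: str = "id") -> None:
    """Drop the cached entry for every row touched by a Canal event."""
    msg = json.loads(event_json)
    if msg.get("isDdl"):
        return  # schema changes would be handled elsewhere
    for row in msg.get("data") or []:
        cache.pop(f"{msg['database']}:{msg['table']}:{row[pk_field]}", None)

evt = ('{"data":[{"id":"6"}],"database":"dwshow",'
       '"table":"wzk_payments","isDdl":false,"type":"UPDATE"}')
invalidate_from_event(evt)
print(cache)  # {}
```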

Notes and Optimization

Data Consistency Assurance

Track binlog positions and consumer offsets reliably so that changes are neither missed nor applied twice; because delivery along the pipeline is effectively at-least-once, downstream consumers should be made idempotent.
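
One common safeguard is deduplication on the consumer side. A minimal in-memory sketch (a durable store would be used in practice, and the dedup key chosen here is an assumption based on the message fields shown later in this document):

```python
import json

seen = set()  # in production: a durable store (e.g., a database or Redis)

def apply_once(event_json: str) -> bool:
    """Apply an event at most once; returns False for duplicates."""
    msg = json.loads(event_json)
    key = (msg["database"], msg["table"], msg["id"], msg["ts"])
    if key in seen:
        return False
    seen.add(key)
    # ... apply the change downstream here ...
    return True

evt = ('{"database":"dwshow","table":"wzk_payments",'
       '"id":6,"ts":1604461572297,"type":"INSERT"}')
print(apply_once(evt), apply_once(evt))  # True False
```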

Partition and Load Balancing

Use Kafka’s partitioning mechanism to distribute different tables or business traffic, improving parallel consumption capability.
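
A common scheme is to hash on database.table, so that all changes to one table land in one partition and keep their order (Canal's MQ settings such as canal.mq.partitionsNum and canal.mq.partitionHash implement this idea; the CRC32 hash below is just an illustration):

```python
import zlib

def pick_partition(database: str, table: str, num_partitions: int) -> int:
    """Route every event of one table to the same partition so that
    per-table ordering is preserved across parallel consumers."""
    key = f"{database}.{table}".encode("utf-8")
    return zlib.crc32(key) % num_partitions

p = pick_partition("dwshow", "wzk_product_info", 3)
print(p, pick_partition("dwshow", "wzk_product_info", 3))  # same value twice
```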

Message Format Optimization

Prefer a flat JSON message format for transmission (Canal's canal.mq.flatMessage option emits one), which makes parsing and processing easier for consumers.

Fault Tolerance and Recovery

Configure retry mechanisms between Canal and Kafka to avoid data loss from temporary network failures.
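
Kafka producers have their own retry and acknowledgment settings, and Canal has retry configuration of its own; the sketch below only illustrates the general retry-with-backoff pattern, with all names hypothetical:

```python
import time

def send_with_retry(send, payload, retries=3, base_delay=0.01):
    """Call send(payload), retrying with exponential backoff on
    transient connection errors; re-raises after the last attempt."""
    for attempt in range(retries + 1):
        try:
            return send(payload)
        except ConnectionError:
            if attempt == retries:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Simulate a producer that fails twice, then succeeds
calls = []
def flaky_send(payload):
    calls.append(payload)
    if len(calls) < 3:
        raise ConnectionError("transient network failure")
    return "acked"

print(send_with_retry(flaky_send, "binlog-event"))  # acked
```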

Security

Give Canal a least-privilege MySQL account, granting only the REPLICATION SLAVE and REPLICATION CLIENT privileges (plus SELECT, which Canal typically needs to read table metadata).
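
A minimal grant for such an account might look like the following (username, host, and password are placeholders, not values from this setup):

```sql
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```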

Environment Requirements

  • MySQL, with binlog enabled in ROW format
  • Canal, to capture and parse the binlog
  • Kafka, to receive and verify the messages

Create Topic

kafka-topics.sh --zookeeper h123.wzk.icu:2181 --create --replication-factor 3 --partitions 1 --topic dwshow

(On Kafka 2.2 and later, kafka-topics.sh can take --bootstrap-server h123.wzk.icu:9092 instead of --zookeeper; the --zookeeper flag was removed in Kafka 3.0.)

View Topics

kafka-topics.sh --zookeeper h123.wzk.icu:2181 --list

Start Producer

kafka-console-producer.sh --broker-list h123.wzk.icu:9092 --topic dwshow

Start Consumer

kafka-console-consumer.sh --bootstrap-server h123.wzk.icu:9092 --topic dwshow --from-beginning

Data Operations

At this point, any change to a MySQL table will be written to Kafka as a JSON message like the examples below. Note that Canal requires the server's binlog_format to be ROW so that row-level change events are available.

INSERT Operation

{
    "data": [
        {
            "id": "6",
            "payMethod": "meituan",
            "payName": "美团支付",
            "description": "美团支付",
            "payOrder": "0",
            "online": "-1"
        }
    ],
    "database": "dwshow",
    "es": 1604461572000,
    "id": 6,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "payMethod": "varchar(20)",
        "payName": "varchar(255)",
        "description": "varchar(255)",
        "payOrder": "int(11)",
        "online": "tinyint(4)"
    },
    "old": null,
    "pkNames": null,
    "sql": "",
    "sqlType": {
        "id": 4,
        "payMethod": 12,
        "payName": 12,
        "description": 12,
        "payOrder": 4,
        "online": -6
    },
    "table": "wzk_payments",
    "ts": 1604461572297,
    "type": "INSERT"
}

UPDATE Operation

{
  "data": [
    {
      "productId": "115908",
      "productName": "索尼 xxx10",
      "shopId": "100365",
      "price": "300.0",
      "isSale": "1",
      "status": "0",
      "categoryId": "10395",
      "createTime": "2020-07-12 13:22:22",
      "modifyTime": "2020-09-27 02:51:16"
    }
  ],
  "database": "dwshow",
  "es": 1601189476000,
  "id": 456,
  "isDdl": false,
  "mysqlType": {
    "productId": "bigint(11)",
    "productName": "varchar(200)",
    "shopId": "bigint(11)",
    "price": "decimal(11,2)",
    "isSale": "tinyint(4)",
    "status": "tinyint(4)",
    "categoryId": "int(11)",
    "createTime": "varchar(25)",
    "modifyTime": "datetime"
  },
  "old": [
    {
      "price": "597.80",
      "modifyTime": "2020-07-12 13:22:22"
    }
  ],
  "pkNames": null,
  "sql": "",
  "sqlType": {
    "productId": -5,
    "productName": 12,
    "shopId": -5,
    "price": 3,
    "isSale": -6,
    "status": -6,
    "categoryId": 4,
    "createTime": 12,
    "modifyTime": 93
  },
  "table": "wzk_product_info",
  "ts": 1601189477116,
  "type": "UPDATE"
}

DELETE Operation

{
  "data": [
    {
      "productId": "115908",
      "productName": "索尼 xxx10",
      "shopId": "100365",
      "price": "300.0",
      "isSale": "1",
      "status": "0",
      "categoryId": "10395",
      "createTime": "2020-07-12 13:22:22",
      "modifyTime": "2020-09-27 02:51:16"
    }
  ],
  "database": "dwshow",
  "es": 1601189576000,
  "id": 457,
  "isDdl": false,
  "mysqlType": {
    "productId": "bigint(11)",
    "productName": "varchar(200)",
    "shopId": "bigint(11)",
    "price": "decimal(11,2)",
    "isSale": "tinyint(4)",
    "status": "tinyint(4)",
    "categoryId": "int(11)",
    "createTime": "varchar(25)",
    "modifyTime": "datetime"
  },
  "old": null,
  "pkNames": null,
  "sql": "",
  "sqlType": {
    "productId": -5,
    "productName": 12,
    "shopId": -5,
    "price": 3,
    "isSale": -6,
    "status": -6,
    "categoryId": 4,
    "createTime": 12,
    "modifyTime": 93
  },
  "table": "wzk_product_info",
  "ts": 1601189576594,
  "type": "DELETE"
}

JSON Format Description

  • data: The latest row data, as a JSON array. For INSERT it holds the newly inserted rows; for UPDATE, the rows after the update; for DELETE, the deleted rows
  • database: Database name
  • es: Time the statement was executed on MySQL, a 13-digit millisecond timestamp
  • id: Sequence number of the Canal event (1, 2, 3, …)
  • isDdl: Whether the event is a DDL operation
  • mysqlType: MySQL column type of each field
  • old: Previous values of only the changed fields, as a JSON array; populated for UPDATE (see the example above), null otherwise
  • pkNames: Primary key column names
  • sql: SQL statement; empty for row events, populated for DDL
  • sqlType: The java.sql.Types code of each field (e.g., 4 = INTEGER, 12 = VARCHAR, -5 = BIGINT, -6 = TINYINT, 3 = DECIMAL, 93 = TIMESTAMP); Canal also widens some types during conversion, e.g., unsigned int to Long and unsigned bigint to BigDecimal
  • table: Table name
  • ts: Time Canal processed the event, a 13-digit millisecond timestamp
  • type: Operation type, such as INSERT, UPDATE, or DELETE
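
Because values in data arrive as strings, consumers often cast them back using the sqlType codes. A hedged sketch using only the codes observed in the samples above:

```python
import json
from decimal import Decimal

# java.sql.Types codes from the samples: INTEGER, BIGINT, TINYINT, DECIMAL
CASTS = {4: int, -5: int, -6: int, 3: Decimal}

def typed_rows(event_json: str) -> list:
    """Cast each string value in 'data' according to its sqlType code;
    anything without a registered cast stays a string."""
    msg = json.loads(event_json)
    sql_types = msg.get("sqlType") or {}
    rows = []
    for row in msg.get("data") or []:
        rows.append({
            k: CASTS.get(sql_types.get(k), str)(v) if v is not None else None
            for k, v in row.items()
        })
    return rows

evt = ('{"data":[{"id":"6","payOrder":"0","payMethod":"meituan"}],'
       '"sqlType":{"id":4,"payOrder":4,"payMethod":12}}')
print(typed_rows(evt))  # [{'id': 6, 'payOrder': 0, 'payMethod': 'meituan'}]
```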