Dive into Debezium

March 11, 2022

Debezium


Debezium (dee-BEE-zee-uhm) Debeziumは、データベースの行レベルの変更をキャプチャし、 アプリケーションがその変更を取得できるようにした分散型サービスです。

発音のカタカナ表記は、ディビージウムが近いと思います。

例えば、基幹業務でPostgreSQLを使っているとして、レコードが変更されたらNeo4jのノードやリレーションも変更したいとします。

単純な例ですが、意外と多いのが一定時間毎にcsvなどで差分のデータを取得し反映することではないでしょうか。 これでも良い場合がありますが、データの削除がある場合はどうするのでしょうか? 物理削除をしてしまうと習得できないので、論理削除という形でレコードの中に削除フラグを付けて対応でしょうか。 1日に1回とかの更新だと、PostgreSQLのデータは今日でも、Neo4jのデータは昨日のデータになるのは困りますね。 なので、1時間に1回とか、5分に1回とかの処理にするわけです。

こう考えると、更新された時だけにリアルタイムに情報をキューに出して、受け取り側が取り込む仕組みが作れれば 良いのでは無いかと思うわけです。

ここのサイトを参考にしました https://debezium.io/documentation/reference/1.8/tutorial.html

準備


以下の材料を用意します。

なお、今回の作業はmacOS/Intelで行っています。

  1. PC
    • ターミナル (Terminal)
    • Docker Desktop
      • 2GBの空き容量(ちょっと大きめです)
  2. インターネット回線
    • ダウンロードするモジュールが大きいのでそれなりの回線

Debeziumに必要なプロダクト


Debeziumは、ZooKeeper、Kafka、Debeziumの3つのサービスで構成されます。 これに、接続するデータベース(今回は、MySQL)が必要になります。

  1. Apache ZooKeeper (分散システムの分散構成や同期、ネーミングレジストリを提供)
  2. Apache Kafka (分散型イベントストリーミングプラットフォーム)
  3. Debezium connector service
  4. MySQL (データーベース)

Docker Pull


では、Docker Pullしましょう。

https://hub.docker.com/u/debezium/

% docker pull debezium/zookeeper:1.8.0.Final
% docker pull debezium/kafka:1.8.0.Final
% docker pull debezium/connect:1.8.0.Final
% docker pull debezium/example-mysql:1.8.0.Final

Docker Run


次に、Docker Runです。

Start Zookeeper


% docker run -it --rm \
--name zookeeper \
-p 2181:2181 -p 2888:2888 -p 3888:3888 \
debezium/zookeeper:1.8.0.Final

Start Kafka


% docker run -it --rm \
--name kafka \
-p 9092:9092 \
--link zookeeper:zookeeper \
debezium/kafka:1.8.0.Final

Start a MySQL database


% docker run -it --rm \
--name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
debezium/example-mysql:1.8.0.Final

Starting a MySQL command line client


% docker run -it --rm \
--name mysqlterm \
--link mysql \
--rm mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

inventory database

mysql> use inventory;
Database changed
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Start Kafka Connect


% docker run -it --rm \
--name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
debezium/connect:1.8.0.Final

Use the Kafka Connect REST API to check the status of the Kafka Connect service.

$ curl -H "Accept:application/json" localhost:8083/

{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"Ib7tp7dTSs2rm5V4EAYKgw"}%

Check the list of connectors

$ curl -H "Accept:application/json" localhost:8083/connectors/

[]%  

Deploying the MySQL connector

Register the Debezium MySQL connector.

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

HTTP/1.1 201 Created
Date: Mon, 24 Jan 2022 10:40:04 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 490
Server: Jetty(9.4.43.v20210629)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","database.server.name":"dbserver1","database.include.list":"inventory","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}%   

Check the list of connectors

$ curl -H "Accept:application/json" localhost:8083/connectors/

["inventory-connector"]% 

Review the connector’s tasks

$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

HTTP/1.1 200 OK
Date: Mon, 24 Jan 2022 10:46:41 GMT
Content-Type: application/json
Content-Length: 534
Server: Jetty(9.4.43.v20210629)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"debezium","database.server.id":"184054","tasks.max":"1","database.hostname":"mysql","database.password":"dbz","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.inventory","name":"inventory-connector","database.server.name":"dbserver1","database.port":"3306","database.include.list":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}%  

Profile picture

Written by Koji Annoura who lives and works in Fukuoka Japan. You should follow them on Twitter