Debezium
Debezium (dee-BEE-zee-uhm) Debeziumは、データベースの行レベルの変更をキャプチャし、 アプリケーションがその変更を取得できるようにした分散型サービスです。
発音のカタカナ表記は、ディビージウムが近いと思います。
例えば、基幹業務でPostgreSQLを使っているとして、レコードが変更されたらNeo4jのノードやリレーションも変更したいとします。
単純な例ですが、意外と多いのが一定時間毎にcsvなどで差分のデータを取得し反映することではないでしょうか。 これでも良い場合がありますが、データの削除がある場合はどうするのでしょうか? 物理削除をしてしまうと習得できないので、論理削除という形でレコードの中に削除フラグを付けて対応でしょうか。 1日に1回とかの更新だと、PostgreSQLのデータは今日でも、Neo4jのデータは昨日のデータになるのは困りますね。 なので、1時間に1回とか、5分に1回とかの処理にするわけです。
こう考えると、更新された時だけにリアルタイムに情報をキューに出して、受け取り側が取り込む仕組みが作れれば 良いのでは無いかと思うわけです。
ここのサイトを参考にしました https://debezium.io/documentation/reference/1.8/tutorial.html
準備
以下の材料を用意します。
なお、今回の作業はmacOS/Intelで行っています。
- PC
- ターミナル (Terminal)
- Docker Desktop
- 2GBの空き容量(ちょっと大きめです)
- インターネット回線
- ダウンロードするモジュールが大きいのでそれなりの回線
Debeziumに必要なプロダクト
Debeziumは、ZooKeeper、Kafka、Debeziumの3つのサービスで構成されます。 これに、接続するデータベース(今回は、MySQL)が必要になります。
- Apache ZooKeeper (分散システムの分散構成や同期、ネーミングレジストリを提供)
- Apache Kafka (分散型イベントストリーミングプラットフォーム)
- Debezium connector service
- MySQL (データーベース)
Docker Pull
では、Docker Pullしましょう。
https://hub.docker.com/u/debezium/
% docker pull debezium/zookeeper:1.8.0.Final
% docker pull debezium/kafka:1.8.0.Final
% docker pull debezium/connect:1.8.0.Final
% docker pull debezium/example-mysql:1.8.0.Final
Docker Run
次に、Docker Runです。
Start Zookeeper
% docker run -it --rm \
--name zookeeper \
-p 2181:2181 -p 2888:2888 -p 3888:3888 \
debezium/zookeeper:1.8.0.Final
Start Kafka
% docker run -it --rm \
--name kafka \
-p 9092:9092 \
--link zookeeper:zookeeper \
debezium/kafka:1.8.0.Final
Start a MySQL database
% docker run -it --rm \
--name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
debezium/example-mysql:1.8.0.Final
Starting a MySQL command line client
% docker run -it --rm \
--name mysqlterm \
--link mysql \
--rm mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
inventory database
mysql> use inventory;
Database changed
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
Start Kafka Connect
% docker run -it --rm \
--name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
debezium/connect:1.8.0.Final
Use the Kafka Connect REST API to check the status of the Kafka Connect service.
$ curl -H "Accept:application/json" localhost:8083/
{"version":"3.0.0","commit":"8cb0a5e9d3441962","kafka_cluster_id":"Ib7tp7dTSs2rm5V4EAYKgw"}%
Check the list of connectors
$ curl -H "Accept:application/json" localhost:8083/connectors/
[]%
Deploying the MySQL connector
Register the Debezium MySQL connector.
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
HTTP/1.1 201 Created
Date: Mon, 24 Jan 2022 10:40:04 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 490
Server: Jetty(9.4.43.v20210629)
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","database.server.name":"dbserver1","database.include.list":"inventory","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}%
Check the list of connectors
$ curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]%
Review the connector’s tasks
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
HTTP/1.1 200 OK
Date: Mon, 24 Jan 2022 10:46:41 GMT
Content-Type: application/json
Content-Length: 534
Server: Jetty(9.4.43.v20210629)
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"debezium","database.server.id":"184054","tasks.max":"1","database.hostname":"mysql","database.password":"dbz","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"dbhistory.inventory","name":"inventory-connector","database.server.name":"dbserver1","database.port":"3306","database.include.list":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}%