Debezium Kafka Mysql CDC Uygulama Örneği
Dokcer imajı oluşturuyoruz. docker-compose.yml
dosyamızda olan servislerimiz şöyle;
- MySql
- Zookeper
- Kafka
- Debezium Connector
Debezium Docker Dosyası Oluşturmak
version: '3.1'
services:
db:
image: mysql
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: test_db
ports:
- "3306:3306"
volumes:
- ./mysql:/data/mysql
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
- db
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
KAFKA_BROKER_ID: 1
KAFKA_MIN_INSYNC_REPLICAS: 1
connector:
image: debezium/connect:latest
ports:
- "8083:8083"
environment:
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
BOOTSTRAP_SERVERS: kafka:9092
depends_on:
- zookeeper
- db
- kafka
Code language: JavaScript (javascript)
Docker dosyamızın içerisinde veritabanı bilgileri, kafka, kafka connect ve zookeeper bilgileri mevcut.
Docker dosyamızın içerisindeki environment kısmı oldukça önemli. Önemli olmasının nedeni ise Kafka Connect ayarlarımızı belirtiyoruz
- BOOTSTRAP_SERVERS
- Veritabanımızla Kafka arasında bağlantıyı kuracak olan Kafka Connectin, Kafka bağlantı bilgisini veriyoruz.
- CONFIG_STORAGE_TOPIC
- Veritabanı bağlantı bilgilerinin olduğu konfigürasyon parametresidir.
- OFFSET_STORAGE_TOPIC
- Veritabanı içerisinde oluşan transaction bilgilerinin olduğu parametredir.
Yukarıdaki environment bilgisini belirtmek zorunludur.
Docker imajımızı çalıştıralım.
docker-compose up
Imajımız kurulduktan ve ayağa kaldırdık.
Mysql Veritabanına Bağlanma, Veritabanı ve Tablo Oluşturma İşlemleri
Mysql veritabanımıza bir veritabanı oluşturalım ve içerisine tablo sonrasında bir kayıt ekleyelim.
Mysql veritabanımıza girelim;
docker exec -it imageId bash
Veritabanımıza bağlanalım.
mysql -uroot -proot
Veritabanı ve tablomuzu oluşturalım.
CREATE DATABASE testdb;
CREATE TABLE testdb.person(id SERIAL PRIMARY KEY, age int, name varchar(50));
Code language: CSS (css)
Veritabanımıza bilgilerimizi girdik.
Debezium Connector Oluşturmak
Debezium connector bilgilerini gireceğiz. Debezium connector bilgilerinin amacı hangi veritabanına bağlacağını ve tespit edilen değişiklikler sonucunda hangi kafka servisine göndereceğini söylüyoruz.
Bunun için debeziumun çalışan portundaki connectors urline istek göndererek bu bilgileri yolluyoruz.
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"name": "testdb-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "db",
"database.port": "3306",
"database.user": "root",
"database.allowPublicKeyRetrieval":"true",
"database.password": "root",
"database.dbname" : "testdb",
"database.server.name": "dbserver1",
"database.whitelist": "testdb.person",
"heartbeat.interval.ms": "5000",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.testdb",
"include.schema.changes": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
}
}'
Code language: PHP (php)
Gönderdiğimiz bilgiler içerisinde dikkat etmemiz gereken bazı parametreler mevcut.
- database.whitelist
- Bu alana bir tablo ismi girdiğimizde sadece o tabloda olan değişikleri dinleyemeye başlarız, eğer bir değer girmezsek bütün veritabanımız içerisindeki tabloları dinlemiş olacağız.
- heartbeat.interval.ms
- Veritabanının durup durmadığı kontrol etmemiz için gerekli olan parametredir. Veritabanının ayakta olup olmadığını bu topic bilgisi sayesinde anlamamız mümkün olmaktadır.
- shemas
- Dinlediğimiz tablodaki alanlarımızın tip bilgilerinin gönderilip gönderilmediği belirleyebiliyoruz.
- “database.allowPublicKeyRetrieval”:”true”,
- Değeri belirtilmezse connectoru Debezium’a bağlayamıyoruz. Bu değer MYSQL’de olmalıdır. Postgressql’de yaptığım denemede bu parametreye ihtiyacım olmadı.
Bu debezium connectorumuzu gönderdik peki Debezium bu connector bilgilerini kabul edip çalıştırmaya başladımı? Bunu öğrenmek için aşağıdaki urli yazarak Debezium içerisinde var olan connector bilgilerini görebiliriz.
curl -X GET -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/
Code language: JavaScript (javascript)
Gelen sonuç
"testdb-connector"
Code language: JSON / JSON with Comments (json)
Kafka ile Debeziumu Dinlemek
Docker imajımızın içerisindeki Kafkamızı inceleyelim ve sonrasında ise Veritabanımızı CDC ile dinlemeye başlayalım.
docker exec -it imageId bash
kafka-topics --zookeeper zookeeper:2181 --list
Code language: CSS (css)
Topiclerimizin listesini görelim;
__consumer_offset
__testdb-heartbeat.testdb
connect-status
dbserver1
my_connect_configs
my_connect_offsets
schema-changes.testdb
Code language: CSS (css)
buraya kadar herşey hazır. Topic’imiz oluştu ve debeziumdan gelecek veriler bu topic’te toplanacak.
Veritabanı işlemi yapalım ve Debezium bu değişikliği algılayalıp kafkamıza depolayacak.
Kafka topiğimize consume olalım ve payloadlara bakalım.
kafka-console-consumer --bootstrap-server kafka:9092 --from-beginning --topic dbserver1.testdb --property print.key=true --property key.separator="-"
Code language: JavaScript (javascript)
Yukarıdaki kod ile birlikte kafmıza bağlanmış olduk.
Şimdi bir insert işlemi yapalım;
insert into person(id, age,name) values(1, 30,'burak');
Code language: JavaScript (javascript)
şimdi ise payloadımıza bakalım.
{
"payload":{
"before":null,
"after":{
"id":1,
"age":32,
"name":"burak"
},
"source":{
"version":"1.0.2.Final",
"connector":"mysql",
"name":"dbserver1.testdb.person.Envelope",
"ts_ms":11244567345,
"snapshot":"false",
"db":"testdb",
"schema":"public",
"table":"person",
"txId":538,
"lsn":423313,
"xmin":null
},
"op":"c",
"ts_ms":11244567345
}
}
Code language: JSON / JSON with Comments (json)
update işlemi yapalım.
update person set age=99 where id=1
Code language: JavaScript (javascript)
payloadımıza tekrar bakalım.
{
"payload":{
"before":null,
"after":{
"id":1,
"age":99,
"name":"burak"
},
"source":{
"version":"1.0.2.Final",
"connector":"mysql",
"name":"dbserver1.testdb.person",
"ts_ms":354318484,
"snapshot":"false",
"db":"testdb",
"schema":"public",
"table":"person",
"txId":545,
"lsn":2341412,
"xmin":null
},
"op":"u",
"ts_ms":54654876
}
}
Code language: JSON / JSON with Comments (json)
Insert işlemi için: payload -> “op”:”c”
Update işlemi için: payload -> “op”:”u”
olarak gözükür.
Dikkat ettiyseniz Update işleminde before alanı null geldi. Önceki değişklik bilgilerini göremedik. Bunun için tablomuzun trigger özelliğini ayarlamamız gerekmektedir.
Debezium ile Mysql veritabanımızı CDC ettik ve Kafka ile dinledik.
İyi kodlamalar dilerim.
No Comment! Be the first one.