Apache KafkaDebezium

Debezium Kafka Mysql CDC Uygulama Örneği

Debezium Nedir?

Dokcer imajı oluşturuyoruz. docker-compose.yml dosyamızda olan servislerimiz şöyle;

  • MySql
  • Zookeper
  • Kafka
  • Debezium Connector

Debezium Docker Dosyası Oluşturmak


version: '3.1'
services:
    db:
      image: mysql
      environment:
        MYSQL_ROOT_PASSWORD: root
        MYSQL_DATABASE: test_db
      ports:
        - "3306:3306"
      volumes:
       - ./mysql:/data/mysql
    zookeeper:
        image: confluentinc/cp-zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
    kafka:
        image: confluentinc/cp-kafka
        depends_on:
          - zookeeper
          - db
        ports:
          - "9092:9092"
        environment:
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
          KAFKA_BROKER_ID: 1
          KAFKA_MIN_INSYNC_REPLICAS: 1
    connector:
        image: debezium/connect:latest
        ports:
          - "8083:8083"
        environment:
          GROUP_ID: 1
          CONFIG_STORAGE_TOPIC: my_connect_configs
          OFFSET_STORAGE_TOPIC: my_connect_offsets
          BOOTSTRAP_SERVERS: kafka:9092
        depends_on:
          - zookeeper
          - db
          - kafka

Docker dosyamızın içerisinde veritabanı bilgileri, kafka, kafka connect ve zookeeper bilgileri mevcut.

Docker dosyamızın içerisindeki environment kısmı oldukça önemli. Önemli olmasının nedeni ise Kafka Connect ayarlarımızı belirtiyoruz

  • BOOTSTRAP_SERVERS
    • Veritabanımızla Kafka arasında bağlantıyı kuracak olan Kafka Connectin, Kafka bağlantı bilgisini veriyoruz.
  • CONFIG_STORAGE_TOPIC
    • Veritabanı bağlantı bilgilerinin olduğu konfigürasyon parametresidir.
  • OFFSET_STORAGE_TOPIC
    • Veritabanı içerisinde oluşan transaction bilgilerinin olduğu parametredir.

Yukarıdaki environment bilgisini belirtmek zorunludur.

Docker imajımızı çalıştıralım.

docker-compose up

Imajımız kurulduktan ve ayağa kaldırdık.

Mysql Veritabanına Bağlanma, Veritabanı ve Tablo Oluşturma İşlemleri


Mysql veritabanımıza bir veritabanı oluşturalım ve içerisine tablo sonrasında bir kayıt ekleyelim.

Mysql veritabanımıza girelim;

docker exec -it imageId bash

Veritabanımıza bağlanalım.

mysql -uroot -proot

Veritabanı ve tablomuzu oluşturalım.

CREATE DATABASE testdb;

CREATE TABLE testdb.person(id SERIAL PRIMARY KEY, age int, name varchar(50));

Veritabanımıza bilgilerimizi girdik.

Debezium Connector Oluşturmak


Debezium connector bilgilerini gireceğiz. Debezium connector bilgilerinin amacı hangi veritabanına bağlacağını ve tespit edilen değişiklikler sonucunda hangi kafka servisine göndereceğini söylüyoruz.

 Bunun için debeziumun çalışan portundaki connectors urline istek göndererek bu bilgileri yolluyoruz.

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
 "name": "testdb-connector",
 "config": {
 "connector.class": "io.debezium.connector.mysql.MySqlConnector",
 "database.hostname": "db",
 "database.port": "3306",
 "database.user": "root",
 "database.allowPublicKeyRetrieval":"true",
 "database.password": "root",
 "database.dbname" : "testdb",
 "database.server.name": "dbserver1",
 "database.whitelist": "testdb.person",
 "heartbeat.interval.ms": "5000",
 "database.history.kafka.bootstrap.servers": "kafka:9092",
 "database.history.kafka.topic": "schema-changes.testdb",
 "include.schema.changes": "true",
 "key.converter": "org.apache.kafka.connect.json.JsonConverter",
 "key.converter.schemas.enable": "false",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable": "false",
 }
}'

Gönderdiğimiz bilgiler içerisinde dikkat etmemiz gereken bazı parametreler mevcut.

  • database.whitelist
    • Bu alana bir tablo ismi girdiğimizde sadece o tabloda olan değişikleri dinleyemeye başlarız, eğer bir değer girmezsek bütün veritabanımız içerisindeki tabloları dinlemiş olacağız.
  • heartbeat.interval.ms
    • Veritabanının durup durmadığı kontrol etmemiz için gerekli olan parametredir. Veritabanının ayakta olup olmadığını bu topic bilgisi sayesinde anlamamız mümkün olmaktadır.
  • shemas
    • Dinlediğimiz tablodaki alanlarımızın tip bilgilerinin gönderilip gönderilmediği belirleyebiliyoruz.
  •  “database.allowPublicKeyRetrieval”:”true”, 
    • Değeri belirtilmezse connectoru Debezium’a bağlayamıyoruz. Bu değer MYSQL’de olmalıdır. Postgressql’de yaptığım denemede bu parametreye ihtiyacım olmadı.

Bu debezium connectorumuzu gönderdik peki Debezium bu connector bilgilerini kabul edip çalıştırmaya başladımı? Bunu öğrenmek için aşağıdaki urli yazarak Debezium içerisinde var olan connector bilgilerini görebiliriz.

curl -X GET -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ 

Gelen sonuç

"testdb-connector"

Kafka ile Debeziumu Dinlemek


Docker imajımızın içerisindeki Kafkamızı inceleyelim ve sonrasında ise Veritabanımızı CDC ile dinlemeye başlayalım.

docker exec -it imageId bash

kafka-topics --zookeeper zookeeper:2181 --list

Topiclerimizin listesini görelim;

__consumer_offset
__testdb-heartbeat.testdb
connect-status
dbserver1
my_connect_configs
my_connect_offsets
schema-changes.testdb

buraya kadar herşey hazır. Topic’imiz oluştu ve debeziumdan gelecek veriler bu topic’te toplanacak.

Veritabanı işlemi yapalım ve Debezium bu değişikliği algılayalıp kafkamıza depolayacak.

Kafka topiğimize consume olalım ve payloadlara bakalım.

kafka-console-consumer --bootstrap-server kafka:9092 --from-beginning --topic dbserver1.testdb --property print.key=true --property key.separator="-"

Yukarıdaki kod ile birlikte kafmıza bağlanmış olduk.

Şimdi bir insert işlemi yapalım;

insert into person(id, age,name) values(1, 30,'burak');

şimdi ise payloadımıza bakalım.

{
   "payload":{
      "before":null,
      "after":{
         "id":1,
         "age":32,
         "name":"burak"
      },
      "source":{
         "version":"1.0.2.Final",
         "connector":"mysql",
         "name":"dbserver1.testdb.person.Envelope",
         "ts_ms":11244567345,
         "snapshot":"false",
         "db":"testdb",
         "schema":"public",
         "table":"person",
         "txId":538,
         "lsn":423313,
         "xmin":null
      },
      "op":"c",
      "ts_ms":11244567345
   }
}

update işlemi yapalım.

update person set age=99 where id=1

payloadımıza tekrar bakalım.

{
   "payload":{
      "before":null,
      "after":{
         "id":1,
         "age":99,
         "name":"burak"
      },
      "source":{
         "version":"1.0.2.Final",
         "connector":"mysql",
         "name":"dbserver1.testdb.person",
         "ts_ms":354318484,
         "snapshot":"false",
         "db":"testdb",
         "schema":"public",
         "table":"person",
         "txId":545,
         "lsn":2341412,
         "xmin":null
      },
      "op":"u",
      "ts_ms":54654876
   }
}

Insert işlemi için: payload -> “op”:”c”

Update işlemi için: payload -> “op”:”u”

olarak gözükür.

Dikkat ettiyseniz Update işleminde before alanı null geldi. Önceki değişklik bilgilerini göremedik. Bunun için tablomuzun trigger özelliğini ayarlamamız gerekmektedir.

Debezium ile Mysql veritabanımızı CDC ettik ve Kafka ile dinledik.

İyi kodlamalar dilerim.

DAHA FAZLASI:Apache Kafka

YORUM YAP

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir