Twitter Youtube Github
Burak Kutbay'ın Kişisel Blog'u |

Type and hit Enter to search

  • Eğitim
  • Quarkus
  • Spring Cloud
  • Spring Boot
  • Spring JDBC Template
  • Spring MVC
  • Spring Core
  • Spring Örnekleri
  • Hibernate
  • Java Server Pages
  • Java Server Faces
  • PrimeFaces
  • Servlet
  • JDBC
  • JSTL
  • Java 8
  • / Diğer
    • Ünlü Bilişimciler
    • C Sharp
    • Kütüphane
    • Makale
Apache KafkaDebezium

Debezium Kafka Mysql CDC Uygulama Örneği

Paylaş
Debezium Nedir?

Dokcer imajı oluşturuyoruz. docker-compose.yml dosyamızda olan servislerimiz şöyle;

İçindekiler

  • Debezium Docker Dosyası Oluşturmak
  • Mysql Veritabanına Bağlanma, Veritabanı ve Tablo Oluşturma İşlemleri
  • Debezium Connector Oluşturmak
  • Kafka ile Debeziumu Dinlemek
  • MySql
  • Zookeper
  • Kafka
  • Debezium Connector

Debezium Docker Dosyası Oluşturmak


version: '3.1' services: db: image: mysql environment: MYSQL_ROOT_PASSWORD: root MYSQL_DATABASE: test_db ports: - "3306:3306" volumes: - ./mysql:/data/mysql zookeeper: image: confluentinc/cp-zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka depends_on: - zookeeper - db ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000 KAFKA_BROKER_ID: 1 KAFKA_MIN_INSYNC_REPLICAS: 1 connector: image: debezium/connect:latest ports: - "8083:8083" environment: GROUP_ID: 1 CONFIG_STORAGE_TOPIC: my_connect_configs OFFSET_STORAGE_TOPIC: my_connect_offsets BOOTSTRAP_SERVERS: kafka:9092 depends_on: - zookeeper - db - kafka
Code language: JavaScript (javascript)

Docker dosyamızın içerisinde veritabanı bilgileri, kafka, kafka connect ve zookeeper bilgileri mevcut.

Docker dosyamızın içerisindeki environment kısmı oldukça önemli. Önemli olmasının nedeni ise Kafka Connect ayarlarımızı belirtiyoruz

  • BOOTSTRAP_SERVERS
    • Veritabanımızla Kafka arasında bağlantıyı kuracak olan Kafka Connectin, Kafka bağlantı bilgisini veriyoruz.
  • CONFIG_STORAGE_TOPIC
    • Veritabanı bağlantı bilgilerinin olduğu konfigürasyon parametresidir.
  • OFFSET_STORAGE_TOPIC
    • Veritabanı içerisinde oluşan transaction bilgilerinin olduğu parametredir.

Yukarıdaki environment bilgisini belirtmek zorunludur.

Docker imajımızı çalıştıralım.

docker-compose up

Imajımız kurulduktan ve ayağa kaldırdık.

Mysql Veritabanına Bağlanma, Veritabanı ve Tablo Oluşturma İşlemleri


Mysql veritabanımıza bir veritabanı oluşturalım ve içerisine tablo sonrasında bir kayıt ekleyelim.

Mysql veritabanımıza girelim;

docker exec -it imageId bash

Veritabanımıza bağlanalım.

mysql -uroot -proot

Veritabanı ve tablomuzu oluşturalım.

CREATE DATABASE testdb; CREATE TABLE testdb.person(id SERIAL PRIMARY KEY, age int, name varchar(50));
Code language: CSS (css)

Veritabanımıza bilgilerimizi girdik.

Debezium Connector Oluşturmak


Debezium connector bilgilerini gireceğiz. Debezium connector bilgilerinin amacı hangi veritabanına bağlacağını ve tespit edilen değişiklikler sonucunda hangi kafka servisine göndereceğini söylüyoruz.

 Bunun için debeziumun çalışan portundaki connectors urline istek göndererek bu bilgileri yolluyoruz.

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d ' { "name": "testdb-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "db", "database.port": "3306", "database.user": "root", "database.allowPublicKeyRetrieval":"true", "database.password": "root", "database.dbname" : "testdb", "database.server.name": "dbserver1", "database.whitelist": "testdb.person", "heartbeat.interval.ms": "5000", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.testdb", "include.schema.changes": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", } }'
Code language: PHP (php)

Gönderdiğimiz bilgiler içerisinde dikkat etmemiz gereken bazı parametreler mevcut.

  • database.whitelist
    • Bu alana bir tablo ismi girdiğimizde sadece o tabloda olan değişikleri dinleyemeye başlarız, eğer bir değer girmezsek bütün veritabanımız içerisindeki tabloları dinlemiş olacağız.
  • heartbeat.interval.ms
    • Veritabanının durup durmadığı kontrol etmemiz için gerekli olan parametredir. Veritabanının ayakta olup olmadığını bu topic bilgisi sayesinde anlamamız mümkün olmaktadır.
  • shemas
    • Dinlediğimiz tablodaki alanlarımızın tip bilgilerinin gönderilip gönderilmediği belirleyebiliyoruz.
  •  “database.allowPublicKeyRetrieval”:”true”, 
    • Değeri belirtilmezse connectoru Debezium’a bağlayamıyoruz. Bu değer MYSQL’de olmalıdır. Postgressql’de yaptığım denemede bu parametreye ihtiyacım olmadı.

Bu debezium connectorumuzu gönderdik peki Debezium bu connector bilgilerini kabul edip çalıştırmaya başladımı? Bunu öğrenmek için aşağıdaki urli yazarak Debezium içerisinde var olan connector bilgilerini görebiliriz.

curl -X GET -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/
Code language: JavaScript (javascript)

Gelen sonuç

"testdb-connector"
Code language: JSON / JSON with Comments (json)

Kafka ile Debeziumu Dinlemek


Docker imajımızın içerisindeki Kafkamızı inceleyelim ve sonrasında ise Veritabanımızı CDC ile dinlemeye başlayalım.

docker exec -it imageId bash kafka-topics --zookeeper zookeeper:2181 --list
Code language: CSS (css)

Topiclerimizin listesini görelim;

__consumer_offset __testdb-heartbeat.testdb connect-status dbserver1 my_connect_configs my_connect_offsets schema-changes.testdb
Code language: CSS (css)

buraya kadar herşey hazır. Topic’imiz oluştu ve debeziumdan gelecek veriler bu topic’te toplanacak.

Veritabanı işlemi yapalım ve Debezium bu değişikliği algılayalıp kafkamıza depolayacak.

Kafka topiğimize consume olalım ve payloadlara bakalım.

kafka-console-consumer --bootstrap-server kafka:9092 --from-beginning --topic dbserver1.testdb --property print.key=true --property key.separator="-"
Code language: JavaScript (javascript)

Yukarıdaki kod ile birlikte kafmıza bağlanmış olduk.

Şimdi bir insert işlemi yapalım;

insert into person(id, age,name) values(1, 30,'burak');
Code language: JavaScript (javascript)

şimdi ise payloadımıza bakalım.

{ "payload":{ "before":null, "after":{ "id":1, "age":32, "name":"burak" }, "source":{ "version":"1.0.2.Final", "connector":"mysql", "name":"dbserver1.testdb.person.Envelope", "ts_ms":11244567345, "snapshot":"false", "db":"testdb", "schema":"public", "table":"person", "txId":538, "lsn":423313, "xmin":null }, "op":"c", "ts_ms":11244567345 } }
Code language: JSON / JSON with Comments (json)

update işlemi yapalım.

update person set age=99 where id=1
Code language: JavaScript (javascript)

payloadımıza tekrar bakalım.

{ "payload":{ "before":null, "after":{ "id":1, "age":99, "name":"burak" }, "source":{ "version":"1.0.2.Final", "connector":"mysql", "name":"dbserver1.testdb.person", "ts_ms":354318484, "snapshot":"false", "db":"testdb", "schema":"public", "table":"person", "txId":545, "lsn":2341412, "xmin":null }, "op":"u", "ts_ms":54654876 } }
Code language: JSON / JSON with Comments (json)

Insert işlemi için: payload -> “op”:”c”

Update işlemi için: payload -> “op”:”u”

olarak gözükür.

Dikkat ettiyseniz Update işleminde before alanı null geldi. Önceki değişklik bilgilerini göremedik. Bunun için tablomuzun trigger özelliğini ayarlamamız gerekmektedir.

Debezium ile Mysql veritabanımızı CDC ettik ve Kafka ile dinledik.

İyi kodlamalar dilerim.

Tags:

apache kafka örnekcdc debeziumdebezium connectordebezium kafkadebezium mimaridebezium nedirdebezium örnektransaction log

Paylaş

Diğer Yazılar

Previous

Debezium Nedir Mimarisi ve Apache Kafka Connector

Quarkus Dersleri Uygulama Örnekleri
Next

Quarkus Projesi Konfigüre Etmek

Next
Quarkus Dersleri Uygulama Örnekleri
22 Ocak 2021

Quarkus Projesi Konfigüre Etmek

Previews
19 Ocak 2021

Debezium Nedir Mimarisi ve Apache Kafka Connector

blank

No Comment! Be the first one.

Bir cevap yazın Cevabı iptal et

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

İlgini Çekebilir

blank

Debezium Nedir Mimarisi ve Apache Kafka Connector

SPRİNG BOOT DERSLERİ UYGULAMA ÖRNEKLERİ

Spring Boot ile Apache Kafka Uygulaması

Burak Kutbay'ın Kişisel Blog'u |

© 2008 - ∞, Her hakkı saklıdır.

Link

  • Hakkımda
  • İletişim
  • Arşiv

Kategori

Cloud
Amazon Web Services
Vue.js
Gradle
Node.js
Android
Struts
Redis
Röportaj
Spring Data
Spring Cloud Stream
XCode
Debezium
Mikroservis Mimarisi
Video
Spring Native
iPhone Uygulama Geliştirme
Objective C
JPA
Spring Security
PostgreSQL
Apache Kafka
Git
Servlet
Yaptığım Projeler
Maven
Design Patterns
DevOps
Linux
RabbitMQ
PrimeFaces
JDBC
Spring Jdbc Template
Spring
Spring MVC
Manset
Google
Spring Cloud
OCA Java SE 8
Spring Core
Quarkus
Microsoft
Veritabanı
Tanıyalım
Java SE
Hibernate
Teknoloji
Okuduğum Kitaplar
Java Server Faces
Yazılım Mühendisliği
C Sharp
Spring Boot
Java
Günlüğüm
Java Server Page
Makale

Takip Et

Twitter Youtube Github
  • Eğitim
  • Quarkus
  • Spring Cloud
  • Spring Boot
  • Spring JDBC Template
  • Spring MVC
  • Spring Core
  • Spring Örnekleri
  • Hibernate
  • Java Server Pages
  • Java Server Faces
  • PrimeFaces
  • Servlet
  • JDBC
  • JSTL
  • Java 8
  • / Diğer
    • Ünlü Bilişimciler
    • C Sharp
    • Kütüphane
    • Makale