Apache KafkaSpring Boot

Spring Boot ile Apache Kafka Uygulaması

Spring Boot Dersleri’ne devam ediyorum. Bu yazıda ise Spring Boot kullanarak Apache Kafka örnek bir uygulama yapacağım.

Spring Boot uygulamasına geçmeden önce Apache Kafka’yı tanıyalım.

Apache Kafka’yı Tanıyalım


Apache Kafka mesajlaşma sistemidir. Bu sistem sayesinde verileri platform ve uygulama bağımsız olarak tutan, yazan ve aktaran sistemdir. Özellikler büyük dataların güvenli şekilde aktarılmasını sağlayan bu yapı fazla kullanılmaktadır.

Apache Kafka için bazı terimleri bilmemiz gerekmektedir.

  • Publisher : Mesajı gönderen yerdir.
  • Topic, Mesajların tutulduğu yerdir.
  • Producer, Topic’lere veriyi yazan yerdir.
  • Partition, Topiclerden bir araya gelip oluşturduğu yapı,
  • Broker, Partitionlar bir araya gelip oluşturduğu yapı,
  • Consumer, mesajları okuyanlara denir.
  • Subscriber, Mesajı alan yerdir.

Spring Boot & Apache Kafka Uygulama Örneği


Spring initializer kullanarak maven uygulamamızı oluşturalım. Bağlımlılık olarak sadece;

  • Web
  • Kafka

bağımlıklarını seçiyoruz.

Örneğimizde Ogrenci modelimizdeki verileri gönderen ve alan bir rest uygulama yapacağız.

application.yaml

Apache kafka kongigürasyonu için application.yaml dosyamıza aşağıdakileri yazalım ve inceleyelim.

spring:
   kafka:
     consumer:
        bootstrap-servers: localhost:9090
        group-id: group_id
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     producer:
        bootstrap-servers: localhost:9090
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

Mesaj yazan (producer) ve Mesaj okuyan (consumer) ayarlarını yapıyoruz.

  • bootstrap-servers: Apache Kafka servisinin çalışacağı portu belirliyoruz.
  • key – value serializer: Verinin key, value alanlarının serializer’i hangi sınıf üzerinden implemente edilmesi gerektiğini belirtiyoruz.
model

Modelimizi oluşturalım.

public class Ogrenci {
    private String adi;
    private int numarasi;

    public Ogrenci(String adi, int numarasi) {
        this.adi = adi;
        this.numarasi = numarasi;
    }

    public String getAdi() {
        return adi;
    }

    public void setAdi(String adi) {
        this.adi = adi;
    }

    public int getNumarasi() {
        returProducer Servisin numarasi;
    }

    public void setNumarasi(int numarasi) {
        this.numarasi = numarasi;
    }
}
Producer Servisi

Mesaj verisini yazacak olan servisimiz oluşturalım.

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "ogrenciler";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        logger.info(String.format("# -> Producer mesajı -> %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

Producer servisimizde TOPIC’den gelen ogrenci verisini kafka templatemize mesajımızla birlikte yazıyoruz. Bu yazılan veriyi ise Consumer servisimizden okuyacağız.

Consumer Servisi

Mesaj verisini okuyacak olan servimizi oluşturalım.

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Producer.class);

    @KafkaListener(topics = "ogrenciler", groupId = "group_id")
    public void consume(String message) throws IOException {
        logger.info(String.format("# -> Consumer mesajı -> %s", message));
    }
}

Kafka listenelerimiz topiclerin değeri ogrenciler olan verileri okumaktadır. Servisimiz konuya abone olarak logger’ımıza yazmasını sağlamakatayız.

Rest Controller

Kafkamız çalışıyor. Bu çalışmayı görüp ve değer göndererek yakalamak için rest servisimizi yazaım.

@RestController
@RequestMapping(value = "/kafkaController")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/yayin")
    public void sendMessageToKafkaTopic(@RequestParam("mesaj") String mesaj) {
        this.producer.sendMessage(mesaj);
    }

}

yayin urlsinden mesaj değeri gönderek kafka servisine değerimizi gönderip logger’dan okuyabiliriz.


Projenin Kaynak Kodlarına Ulaş


Önceki Ders: Uygulama Yerelleştirme
Spring Boot Dersleri
Sonraki Ders: Java Bean Validation Kullanmak

DAHA FAZLASI:Apache Kafka

İlgini Çekebilir

YORUM YAP

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir