Spring Boot ile Apache Kafka Uygulaması
Spring Boot Dersleri’ne devam ediyorum. Bu yazıda ise Spring Boot kullanarak Apache Kafka örnek bir uygulama yapacağım.
Spring Boot uygulamasına geçmeden önce Apache Kafka’yı tanıyalım.
Apache Kafka’yı Tanıyalım
Apache Kafka mesajlaşma sistemidir. Bu sistem sayesinde verileri platform ve uygulama bağımsız olarak tutan, yazan ve aktaran sistemdir. Özellikler büyük dataların güvenli şekilde aktarılmasını sağlayan bu yapı fazla kullanılmaktadır.
Apache Kafka için bazı terimleri bilmemiz gerekmektedir.
- Publisher : Mesajı gönderen yerdir.
- Topic, Mesajların tutulduğu yerdir.
- Producer, Topic’lere veriyi yazan yerdir.
- Partition, Topiclerden bir araya gelip oluşturduğu yapı,
- Broker, Partitionlar bir araya gelip oluşturduğu yapı,
- Consumer, mesajları okuyanlara denir.
- Subscriber, Mesajı alan yerdir.
Spring Boot & Apache Kafka Uygulama Örneği
Spring initializer kullanarak maven uygulamamızı oluşturalım. Bağlımlılık olarak sadece;
- Web
- Kafka
bağımlıklarını seçiyoruz.
Örneğimizde Ogrenci modelimizdeki verileri gönderen ve alan bir rest uygulama yapacağız.
application.yaml
Apache kafka kongigürasyonu için application.yaml dosyamıza aşağıdakileri yazalım ve inceleyelim.
spring:
kafka:
consumer:
bootstrap-servers: localhost:9090
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9090
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Code language: CSS (css)
Mesaj yazan (producer) ve Mesaj okuyan (consumer) ayarlarını yapıyoruz.
- bootstrap-servers: Apache Kafka servisinin çalışacağı portu belirliyoruz.
- key – value serializer: Verinin key, value alanlarının serializer’i hangi sınıf üzerinden implemente edilmesi gerektiğini belirtiyoruz.
model
Modelimizi oluşturalım.
public class Ogrenci {
private String adi;
private int numarasi;
public Ogrenci(String adi, int numarasi) {
this.adi = adi;
this.numarasi = numarasi;
}
public String getAdi() {
return adi;
}
public void setAdi(String adi) {
this.adi = adi;
}
public int getNumarasi() {
returProducer Servisin numarasi;
}
public void setNumarasi(int numarasi) {
this.numarasi = numarasi;
}
}
Code language: JavaScript (javascript)
Producer Servisi
Mesaj verisini yazacak olan servisimiz oluşturalım.
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "ogrenciler";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
logger.info(String.format("# -> Producer mesajı -> %s", message));
this.kafkaTemplate.send(TOPIC, message);
}
}
Code language: JavaScript (javascript)
Producer servisimizde TOPIC’den gelen ogrenci verisini kafka templatemize mesajımızla birlikte yazıyoruz. Bu yazılan veriyi ise Consumer servisimizden okuyacağız.
Consumer Servisi
Mesaj verisini okuyacak olan servimizi oluşturalım.
@Service
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Producer.class);
@KafkaListener(topics = "ogrenciler", groupId = "group_id")
public void consume(String message) throws IOException {
logger.info(String.format("# -> Consumer mesajı -> %s", message));
}
}
Code language: JavaScript (javascript)
Kafka listenelerimiz topiclerin değeri ogrenciler olan verileri okumaktadır. Servisimiz konuya abone olarak logger’ımıza yazmasını sağlamakatayız.
Rest Controller
Kafkamız çalışıyor. Bu çalışmayı görüp ve değer göndererek yakalamak için rest servisimizi yazaım.
@RestController
@RequestMapping(value = "/kafkaController")
public class KafkaController {
private final Producer producer;
@Autowired
KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping(value = "/yayin")
public void sendMessageToKafkaTopic(@RequestParam("mesaj") String mesaj) {
this.producer.sendMessage(mesaj);
}
}
Code language: JavaScript (javascript)
yayin urlsinden mesaj değeri gönderek kafka servisine değerimizi gönderip logger’dan okuyabiliriz.
No Comment! Be the first one.