
REST API and Kafka Message Publishing

Estimated time: 40 minutes

What you’ll learn

How to set up your application for:

  • connecting to Kafka and publishing messages to its topic,
  • getting data from a REST API,
  • providing data to a REST API.

In this tutorial, we will create a simple Java component with the Java Spring Boot scaffolder. We want to expose a single REST endpoint for getting client data. Client data is provided by another REST component, client-data-db, so we need to configure a Spring REST call to it. Every access to client data should be logged to a Kafka topic, so we need a Kafka client configuration as well.

Project source

This example project can be cloned from: [email protected]:innobank/client-data-service.git

Prerequisites

Steps

Open your IDE, import the created component, and start coding:

  • Define the message payload. Here is an example of Client, a simple POJO with basic client data:
    • generate the getters and setters with your IDE (a sketch of what they look like follows the class below)
package io.codenow.client.data.service.model;

import java.time.LocalDate;

public class Client {
    private String username;
    private String firstname;
    private String surname;
    private LocalDate birthdate;

}
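Since getter/setter generation is left to the IDE, here is a sketch of what the generated accessors look like, to be added inside the Client class. The toString() override is an optional extra (not part of the original class) so that the controller's log line later in this tutorial prints readable client data instead of an object reference:

    // IDE-generated accessors for Client.
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getFirstname() { return firstname; }
    public void setFirstname(String firstname) { this.firstname = firstname; }

    public String getSurname() { return surname; }
    public void setSurname(String surname) { this.surname = surname; }

    public LocalDate getBirthdate() { return birthdate; }
    public void setBirthdate(LocalDate birthdate) { this.birthdate = birthdate; }

    // Optional: makes logged clients human-readable.
    @Override
    public String toString() {
        return "Client{username=" + username + ", firstname=" + firstname
                + ", surname=" + surname + ", birthdate=" + birthdate + "}";
    }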
  • Next, prepare the configuration for the Kafka logging client. First, add the spring-kafka dependency to your pom.xml:
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
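If your build inherits its dependency management from Spring Boot (via spring-boot-starter-parent or the Spring Boot BOM), the spring-kafka version is resolved automatically, so no explicit <version> tag is needed.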
	
Then declare the producer beans in a Spring configuration class (the package and class name below are illustrative):

package io.codenow.client.data.service.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.broker.url}")
    private String kafkaBrokerUrl;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // See https://kafka.apache.org/documentation/#producerconfigs for more properties
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
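If you want to verify broker connectivity before wiring up the controller, you can send one message on startup. This is an optional sketch (the KafkaStartupCheck class name is hypothetical); it assumes the kafka.topic.name property defined in application.yaml later in this tutorial:

package io.codenow.client.data.service.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

// Optional smoke test: publish one message at startup so a broker misconfiguration
// shows up immediately in the logs rather than on the first real request.
@Configuration
public class KafkaStartupCheck {

    @Bean
    public CommandLineRunner kafkaStartupCheck(KafkaTemplate<String, String> kafkaTemplate,
                                               @Value("${kafka.topic.name}") String topicName) {
        return args -> kafkaTemplate.send(topicName, "startup-check", "producer initialized");
    }
}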
  • Create a REST controller that exposes the endpoint, sends a log message to the Kafka topic on every access, and fetches the client data from the client-data-db component:
package io.codenow.client.data.service.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;

import io.codenow.client.data.service.model.Client;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/data")
public class ClientDataController {
    private static final Logger LOG = LoggerFactory.getLogger(ClientDataController.class);

    private String clientDataDBURL;
    private String kafkaTopicName;
    private String kafkaTopicKey;
    private KafkaTemplate<String, String> kafkaTemplate;


    public ClientDataController(@Value("${endpoint.client.data.db}") String clientDataDBURL,
                                @Value("${kafka.topic.name}") String kafkaTopicName,
                                KafkaTemplate<String, String> kafkaTemplate,
                                @Value("${kafka.topic.key}") String kafkaTopicKey) {
        this.clientDataDBURL = clientDataDBURL;
        this.kafkaTopicName = kafkaTopicName;
        this.kafkaTemplate = kafkaTemplate;
        this.kafkaTopicKey = kafkaTopicKey;
    }

    @GetMapping("/{username}")
    public Flux<Client> getClientData(@PathVariable String username) {
        LOG.info("Get data for username: {}", username);
        kafkaTemplate.send(kafkaTopicName, kafkaTopicKey, username);

        // Log each client as a side effect of the returned Flux; subscribing to it
        // separately would trigger a second request to client-data-db.
        return WebClient.create().get().uri(clientDataDBURL + "/db/clients/" + username).retrieve()
                .bodyToFlux(Client.class)
                .doOnNext(client -> LOG.info(client.toString()));
    }
}
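Once the application is running, you can exercise the endpoint directly. Here is a hypothetical manual check using the JDK's built-in HTTP client (Java 11+); the username jdoe is an arbitrary sample:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ClientDataEndpointCheck {
    public static void main(String[] args) throws Exception {
        // Call GET /data/{username} on the locally running service.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/data/jdoe"))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}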
  • Last but not least, append the configuration for Kafka to config/application.yaml:
    • Note that this configuration depends on your local development setup for Kafka and can differ case by case.
    • Make sure you follow YAML syntax (indent with spaces, never tabs).
endpoint:
  client:
    data:
      db: http://client-data-db
kafka:
  broker:
    url: client-logging-kafka-kafka-brokers.managed-components:9092
  topic:
    name: client-logging
    key: client-data-service
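For local development, the broker URL will typically point at a Kafka instance on your own machine instead of the managed broker above. A hedged example override, assuming a local broker listening on Kafka's default port:

kafka:
  broker:
    url: localhost:9092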
  • Try to build and run the application in your IDE. After startup, you should be able to access your new controller’s Swagger UI at http://localhost:8080/swagger/index.html

What’s next?

If your code works in local development, you are ready to push your changes to Git and to build and deploy your new component version to the CodeNOW environment. For more information, see Application Deployment and Monitoring; just make sure to change the application.yaml properties from the local to the production setup.
