Spring Boot y Apache Kafka: Plataforma de streaming distribuido. Envío y recepción de mensajes

Prerequisitos

- Tener instalado y corriendo Apache Kafka

- Apache Kafka utiliza un servidor de Zookeeper, así que:

if (tienes un servidor Zookeeper instalado)

     utiliza ese servidor;

else

     Apache Kafka trae un script para levantar uno. /*Te indico abajo como arrancarlo desde la distribución de Kafka*/

 

Consulta la documentación de Apache Kafka para mayor detalle, pero la forma de levantar el servicio rápidamente es:

1. Cambiarse al directorio donde instalaste Apache Kafka.

NOTA: Si no estás en windows quítale la palabra windows y la extensión .bat 

2. Arranca Zookeeper (en caso de que no tengas uno previamente instalado)

     bin/windows/zookeeper-server-start.bat config/zookeeper.properties

3. Arranca Apache Kafka

     bin/windows/kafka-server-start.bat config/server.properties

 

IMPORTANTE

Te puedes clonar el código directamente del repositorio en github o descárgatelo en formato zip.

 

1. Configuración Maven

<groupId>com.ledzedev</groupId>
<artifactId>ledze-spring-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>ledze-spring-kafka</name>
<description>Ledze Demo project for Spring Boot with Apache Kafka</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.3.RELEASE</version>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

 

2. Código Java

2.1 Recibir mensajes

/**
* Source code generated by Gerado Pucheta Figueroa
* Twitter: @ledzedev
* http://ledze.mx
* 25/01/2017
*/
@Test
public void pruebaReceptorDeMensajes() throws Exception {

logger.info("\n\n\n");
logger.info("Inicia contenedor de mensajes...");

ContainerProperties containerProps = new ContainerProperties("topicoLedze", "topic2");
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
logger.info("recibido: " + message);
});

KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
//container.stop();
}


private KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {
Map<String, Object> props = consumerProps();
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
}

private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grupoLedze");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

Al ejecutar el consumidor de mensajes se pueden ver todos los parámetros de configuración.



2.2 Enviar mensajes

/**
* Source code generated by Gerado Pucheta Figueroa
* Twitter: @ledzedev
* http://ledze.mx
* 25/01/2017
*/
@Test
public void pruebaEnviaMensajes(){

logger.info("\n\n\n");
logger.info("Inicia el envío de mensajes...");

KafkaTemplate<Integer, String> template = createTemplate();
template.setDefaultTopic("topicoLedze");

ListenableFuture<SendResult<Integer, String>> future1 = template.sendDefault(0, "foo");
this.addListenableFutureCallback(future1);

ListenableFuture<SendResult<Integer, String>> future2 = template.sendDefault(2, "bar");
this.addListenableFutureCallback(future2);

ListenableFuture<SendResult<Integer, String>> future3 = template.sendDefault(0, "baz");
this.addListenableFutureCallback(future3);

ListenableFuture<SendResult<Integer, String>> future4 = template.sendDefault(2, "qux");
this.addListenableFutureCallback(future4);

template.flush();

}
private void addListenableFutureCallback(ListenableFuture<SendResult<Integer, String>> future){
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable motivoFalla) {
logger.info("EL ENVÍO DEL MENSAJE FALLÓ.", motivoFalla);
}
@Override
public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
logger.info("ENVÍO DE MENSAJE EXITOSO! Mensaje=" + integerStringSendResult.getProducerRecord().value());
}
});
}

private
KafkaTemplate<Integer, String> createTemplate() {

Map<String, Object> senderProps = senderProps();
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
return template;
}

private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

Al ejecutar el envío de mensajes se pueden ver los parámetros de configuración del sender.

 

 

3. Resultados

 Después de ejecutar los tests varias veces alcanzamos a verificar que la recepción de las respuestas son ASÍNCRONAS, justo como configuramos nuestro callback en el envío de mensajes.