Spring Boot y Apache Kafka: Plataforma de streaming distribuido. Envío y recepción de mensajes
- Details
- Created: Wednesday, 25 January 2017 19:34
- Written by Gerardo Pucheta Figueroa
- Hits: 4000
Prerequisitos
- Tener instalado y corriendo Apache Kafka
- Apache Kafka utiliza un servidor de Zookeeper, así que:
if (tienes un servidor Zookeeper instalado)
utiliza ese servidor;
else
Apache Kafka trae un script para levantar uno. /*Te indico abajo como arrancarlo desde la distribución de Kafka*/
Consulta la documentación de Apache Kafka para mayor detalle, pero la forma de levantar el servicio rápidamente es:
1. Cambiarse al directorio donde instalaste Apache Kafka.
NOTA: Si no estás en windows quítale la palabra windows y la extensión .bat
2. Arranca Zookeeper (en caso de que no tengas uno previamente instalado)
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
3. Arranca Apache Kafka
bin/windows/kafka-server-start.bat config/server.properties
IMPORTANTE
Te puedes clonar el código directamente del repositorio en github o descárgatelo en formato zip.
1. Configuración Maven
<groupId>com.ledzedev</groupId>
<artifactId>ledze-spring-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>ledze-spring-kafka</name>
<description>Ledze Demo project for Spring Boot with Apache Kafka</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.3.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2. Código Java
2.1 Recibir mensajes
/**
* Source code generated by Gerado Pucheta Figueroa
* Twitter: @ledzedev
* http://ledze.mx
* 25/01/2017
*/
@Test
public void pruebaReceptorDeMensajes() throws Exception {logger.info("\n\n\n");
logger.info("Inicia contenedor de mensajes...");
ContainerProperties containerProps = new ContainerProperties("topicoLedze", "topic2");
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
logger.info("recibido: " + message);
});
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
//container.stop();
}
private KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {
Map<String, Object> props = consumerProps();
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grupoLedze");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
Al ejecutar el consumidor de mensajes se pueden ver todos los parámetros de configuración.
2.2 Enviar mensajes
/**
* Source code generated by Gerado Pucheta Figueroa
* Twitter: @ledzedev
* http://ledze.mx
* 25/01/2017
*/
@Test
public void pruebaEnviaMensajes(){logger.info("\n\n\n");
logger.info("Inicia el envío de mensajes...");
KafkaTemplate<Integer, String> template = createTemplate();
template.setDefaultTopic("topicoLedze");
ListenableFuture<SendResult<Integer, String>> future1 = template.sendDefault(0, "foo");
this.addListenableFutureCallback(future1);
ListenableFuture<SendResult<Integer, String>> future2 = template.sendDefault(2, "bar");
this.addListenableFutureCallback(future2);
ListenableFuture<SendResult<Integer, String>> future3 = template.sendDefault(0, "baz");
this.addListenableFutureCallback(future3);
ListenableFuture<SendResult<Integer, String>> future4 = template.sendDefault(2, "qux");
this.addListenableFutureCallback(future4);
template.flush();
}
private void addListenableFutureCallback(ListenableFuture<SendResult<Integer, String>> future){
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable motivoFalla) {
logger.info("EL ENVÍO DEL MENSAJE FALLÓ.", motivoFalla);
}
@Override
public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
logger.info("ENVÍO DE MENSAJE EXITOSO! Mensaje=" + integerStringSendResult.getProducerRecord().value());
}
});
}
private KafkaTemplate<Integer, String> createTemplate() {Map<String, Object> senderProps = senderProps();
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
return template;
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
Al ejecutar el envío de mensajes se pueden ver los parámetros de configuración del sender.
3. Resultados
Después de ejecutar los tests varias veces alcanzamos a verificar que la recepción de las respuestas son ASÍNCRONAS, justo como configuramos nuestro callback en el envío de mensajes.