Skip to content

Commit

Permalink
Spring Boot整合Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
wuyouzhuguli committed Mar 26, 2019
1 parent adc5715 commit e1a7c76
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 0 deletions.
41 changes: 41 additions & 0 deletions 54.Spring-Boot-Kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.example.demo.config;

import com.example.demo.domain.Message;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
* @author MrBird
*/
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;

@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;

@Bean
public ConsumerFactory<String, Message> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
consumerGroupId);
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
autoOffsetReset);
// props.put(
// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
// StringDeserializer.class);
// props.put(
// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
// StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Message.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// factory.setRecordFilterStrategy(
// r -> r.value().contains("fuck")
// );
return factory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.example.demo.config;

import com.example.demo.domain.Message;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
* @author MrBird
*/
@Configuration
public class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.example.demo.controller;

import com.example.demo.domain.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
* @author MrBird
*/
@RestController
public class SendMessageController {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
// private KafkaTemplate<String, String> kafkaTemplate;
private KafkaTemplate<String, Message> kafkaTemplate;

// @GetMapping("send/{message}")
// public void send(@PathVariable String message) {
// this.kafkaTemplate.send("test", message);
// ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send("test", message);
// future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
// @Override
// public void onSuccess(SendResult<String, String> result) {
// logger.info("成功发送消息:{},offset=[{}]", message, result.getRecordMetadata().offset());
// }
//
// @Override
// public void onFailure(Throwable ex) {
// logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage());
// }
// });
// }

@GetMapping("send/{message}")
public void sendMessage(@PathVariable String message) {
this.kafkaTemplate.send("test", new Message("mrbird", message));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.example.demo.domain;

import java.io.Serializable;
import java.time.LocalTime;

/**
* @author MrBird
*/
public class Message implements Serializable {
private static final long serialVersionUID = 6678420965611108427L;

private String from;

private String message;

public Message() {
}

public Message(String from, String message) {
this.from = from;
this.message = message;
}

@Override
public String toString() {
return "Message{" +
"from='" + from + '\'' +
", message='" + message + '\'' +
'}';
}

public String getFrom() {
return from;
}

public void setFrom(String from) {
this.from = from;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.example.demo.listener;

import com.example.demo.domain.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
* @author MrBird
*/
@Component
public class KafkaMessageListener {

private Logger logger = LoggerFactory.getLogger(this.getClass());

// @KafkaListener(topics = "test", groupId = "test-consumer")
// @KafkaListener(groupId = "test-consumer",
// topicPartitions = @TopicPartition(topic = "test",
// partitionOffsets = {
// @PartitionOffset(partition = "0", initialOffset = "0")
// }))
// public void listen(@Payload String message,
// @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
// logger.info("接收消息: {},partition:{}", message, partition);
// }

@KafkaListener(topics = "test", groupId = "test-consumer")
public void listen(Message message) {
logger.info("接收消息: {}", message);
}

}
6 changes: 6 additions & 0 deletions 54.Spring-Boot-Kafka/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-consumer
auto-offset-reset: latest

0 comments on commit e1a7c76

Please sign in to comment.