forked from wuyouzhuguli/SpringAll
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
adc5715
commit e1a7c76
Showing
8 changed files
with
307 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
<parent> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-starter-parent</artifactId> | ||
<version>2.1.3.RELEASE</version> | ||
<relativePath/> <!-- lookup parent from repository --> | ||
</parent> | ||
<groupId>com.example</groupId> | ||
<artifactId>demo</artifactId> | ||
<version>0.0.1-SNAPSHOT</version> | ||
<name>demo</name> | ||
<description>Demo project for Spring Boot</description> | ||
|
||
<properties> | ||
<java.version>1.8</java.version> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-starter-web</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.kafka</groupId> | ||
<artifactId>spring-kafka</artifactId> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-maven-plugin</artifactId> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
</project> |
13 changes: 13 additions & 0 deletions
13
54.Spring-Boot-Kafka/src/main/java/com/example/demo/KafkaApplication.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package com.example.demo; | ||
|
||
import org.springframework.boot.SpringApplication; | ||
import org.springframework.boot.autoconfigure.SpringBootApplication; | ||
|
||
@SpringBootApplication | ||
public class KafkaApplication { | ||
|
||
public static void main(String[] args) { | ||
SpringApplication.run(KafkaApplication.class, args); | ||
} | ||
|
||
} |
69 changes: 69 additions & 0 deletions
69
54.Spring-Boot-Kafka/src/main/java/com/example/demo/config/KafkaConsumerConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
package com.example.demo.config; | ||
|
||
import com.example.demo.domain.Message; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.common.serialization.StringDeserializer; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.kafka.annotation.EnableKafka; | ||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | ||
import org.springframework.kafka.core.ConsumerFactory; | ||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | ||
import org.springframework.kafka.support.serializer.JsonDeserializer; | ||
import org.springframework.kafka.support.serializer.JsonSerializer; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** | ||
* @author MrBird | ||
*/ | ||
@EnableKafka | ||
@Configuration | ||
public class KafkaConsumerConfig { | ||
|
||
@Value("${spring.kafka.bootstrap-servers}") | ||
private String bootstrapServers; | ||
|
||
@Value("${spring.kafka.consumer.group-id}") | ||
private String consumerGroupId; | ||
|
||
@Value("${spring.kafka.consumer.auto-offset-reset}") | ||
private String autoOffsetReset; | ||
|
||
@Bean | ||
public ConsumerFactory<String, Message> consumerFactory() { | ||
Map<String, Object> props = new HashMap<>(); | ||
props.put( | ||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, | ||
bootstrapServers); | ||
props.put( | ||
ConsumerConfig.GROUP_ID_CONFIG, | ||
consumerGroupId); | ||
props.put( | ||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, | ||
autoOffsetReset); | ||
// props.put( | ||
// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, | ||
// StringDeserializer.class); | ||
// props.put( | ||
// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, | ||
// StringDeserializer.class); | ||
return new DefaultKafkaConsumerFactory<>( | ||
props, | ||
new StringDeserializer(), | ||
new JsonDeserializer<>(Message.class)); | ||
} | ||
|
||
@Bean | ||
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() { | ||
ConcurrentKafkaListenerContainerFactory<String, Message> factory | ||
= new ConcurrentKafkaListenerContainerFactory<>(); | ||
factory.setConsumerFactory(consumerFactory()); | ||
// factory.setRecordFilterStrategy( | ||
// r -> r.value().contains("fuck") | ||
// ); | ||
return factory; | ||
} | ||
} |
45 changes: 45 additions & 0 deletions
45
54.Spring-Boot-Kafka/src/main/java/com/example/demo/config/KafkaProducerConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package com.example.demo.config; | ||
|
||
import com.example.demo.domain.Message; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.kafka.core.DefaultKafkaProducerFactory; | ||
import org.springframework.kafka.core.KafkaTemplate; | ||
import org.springframework.kafka.core.ProducerFactory; | ||
import org.springframework.kafka.support.serializer.JsonSerializer; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** | ||
* @author MrBird | ||
*/ | ||
@Configuration | ||
public class KafkaProducerConfig { | ||
|
||
@Value("${spring.kafka.bootstrap-servers}") | ||
private String bootstrapServers; | ||
|
||
@Bean | ||
public ProducerFactory<String, Message> producerFactory() { | ||
Map<String, Object> configProps = new HashMap<>(); | ||
configProps.put( | ||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, | ||
bootstrapServers); | ||
configProps.put( | ||
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, | ||
StringSerializer.class); | ||
configProps.put( | ||
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, | ||
JsonSerializer.class); | ||
return new DefaultKafkaProducerFactory<>(configProps); | ||
} | ||
|
||
@Bean | ||
public KafkaTemplate<String, Message> kafkaTemplate() { | ||
return new KafkaTemplate<>(producerFactory()); | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
54.Spring-Boot-Kafka/src/main/java/com/example/demo/controller/SendMessageController.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package com.example.demo.controller; | ||
|
||
import com.example.demo.domain.Message; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.kafka.core.KafkaTemplate; | ||
import org.springframework.kafka.support.SendResult; | ||
import org.springframework.util.concurrent.ListenableFuture; | ||
import org.springframework.util.concurrent.ListenableFutureCallback; | ||
import org.springframework.web.bind.annotation.GetMapping; | ||
import org.springframework.web.bind.annotation.PathVariable; | ||
import org.springframework.web.bind.annotation.RestController; | ||
|
||
/** | ||
* @author MrBird | ||
*/ | ||
@RestController | ||
public class SendMessageController { | ||
|
||
private Logger logger = LoggerFactory.getLogger(this.getClass()); | ||
|
||
@Autowired | ||
// private KafkaTemplate<String, String> kafkaTemplate; | ||
private KafkaTemplate<String, Message> kafkaTemplate; | ||
|
||
// @GetMapping("send/{message}") | ||
// public void send(@PathVariable String message) { | ||
// this.kafkaTemplate.send("test", message); | ||
// ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send("test", message); | ||
// future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { | ||
// @Override | ||
// public void onSuccess(SendResult<String, String> result) { | ||
// logger.info("成功发送消息:{},offset=[{}]", message, result.getRecordMetadata().offset()); | ||
// } | ||
// | ||
// @Override | ||
// public void onFailure(Throwable ex) { | ||
// logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage()); | ||
// } | ||
// }); | ||
// } | ||
|
||
@GetMapping("send/{message}") | ||
public void sendMessage(@PathVariable String message) { | ||
this.kafkaTemplate.send("test", new Message("mrbird", message)); | ||
} | ||
} |
47 changes: 47 additions & 0 deletions
47
54.Spring-Boot-Kafka/src/main/java/com/example/demo/domain/Message.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package com.example.demo.domain; | ||
|
||
import java.io.Serializable; | ||
import java.time.LocalTime; | ||
|
||
/** | ||
* @author MrBird | ||
*/ | ||
public class Message implements Serializable { | ||
private static final long serialVersionUID = 6678420965611108427L; | ||
|
||
private String from; | ||
|
||
private String message; | ||
|
||
public Message() { | ||
} | ||
|
||
public Message(String from, String message) { | ||
this.from = from; | ||
this.message = message; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "Message{" + | ||
"from='" + from + '\'' + | ||
", message='" + message + '\'' + | ||
'}'; | ||
} | ||
|
||
public String getFrom() { | ||
return from; | ||
} | ||
|
||
public void setFrom(String from) { | ||
this.from = from; | ||
} | ||
|
||
public String getMessage() { | ||
return message; | ||
} | ||
|
||
public void setMessage(String message) { | ||
this.message = message; | ||
} | ||
} |
38 changes: 38 additions & 0 deletions
38
54.Spring-Boot-Kafka/src/main/java/com/example/demo/listener/KafkaMessageListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package com.example.demo.listener; | ||
|
||
import com.example.demo.domain.Message; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.kafka.annotation.KafkaListener; | ||
import org.springframework.kafka.annotation.PartitionOffset; | ||
import org.springframework.kafka.annotation.TopicPartition; | ||
import org.springframework.kafka.support.KafkaHeaders; | ||
import org.springframework.messaging.handler.annotation.Header; | ||
import org.springframework.messaging.handler.annotation.Payload; | ||
import org.springframework.stereotype.Component; | ||
|
||
/** | ||
* @author MrBird | ||
*/ | ||
@Component | ||
public class KafkaMessageListener { | ||
|
||
private Logger logger = LoggerFactory.getLogger(this.getClass()); | ||
|
||
// @KafkaListener(topics = "test", groupId = "test-consumer") | ||
// @KafkaListener(groupId = "test-consumer", | ||
// topicPartitions = @TopicPartition(topic = "test", | ||
// partitionOffsets = { | ||
// @PartitionOffset(partition = "0", initialOffset = "0") | ||
// })) | ||
// public void listen(@Payload String message, | ||
// @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { | ||
// logger.info("接收消息: {},partition:{}", message, partition); | ||
// } | ||
|
||
@KafkaListener(topics = "test", groupId = "test-consumer") | ||
public void listen(Message message) { | ||
logger.info("接收消息: {}", message); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
spring: | ||
kafka: | ||
bootstrap-servers: localhost:9092 | ||
consumer: | ||
group-id: test-consumer | ||
auto-offset-reset: latest |