kafka
前言
最近在做接口交互, 要推数据到另一个系统, 然后考虑到数据的大小和对边项目中的情况, 就决定用kafka. 原本是打算用rabbitmq的, 组件服务是对面提供, 结果他们没有, 就采用他们现有的消息中间件kafka了, 这里记录一下kafka的原理和使用情况
正文
参考文章:1.大白话 kafka 架构原理 , 2.卡夫卡快速入门指南 , 3.从面试角度一文学完 Kafka 4. Kafka到底会不会丢消息? 5. SpringBoot集成kafka全面实战
介绍
Kafka最初由Linkedin公司开发,是一个分布式的、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常用于web/nginx日志、访问日志、消息服务等等
为什么kafka这么多人用呢? 可以看看他的一些特点
- 分布式架构:Kafka采用分布式架构,将数据分散存储在多个Broker节点上。这样可以将负载分摊到多个节点上,提高整体的处理能力和吞吐量
- 分区机制:Kafka将每个主题划分为多个分区,每个分区可以在不同的Broker上进行复制和存储。这样可以实现数据的并行处理和负载均衡,提高读写吞吐量
- 副本机制:Kafka支持将每个分区的数据复制到多个副本中,以提供数据冗余和高可用性。副本可以在不同的Broker上进行存储,如果某个Broker发生故障,仍然可以从其他副本中读取数据
- 批量处理:Kafka支持批量处理消息,可以将多个消息一起发送或读取,减少网络开销和提高效率。通过调整批量处理的大小,可以根据需求平衡延迟和吞吐量
- 零拷贝技术:Kafka使用零拷贝技术来提高数据的传输效率。它避免了将数据从内核空间复制到用户空间的开销,直接在内核空间进行数据传输,减少了CPU和内存的开销。
- 磁盘顺序写:Kafka将数据以追加方式写入磁盘,而不是随机写入。这种磁盘顺序写的方式可以提高写入性能,甚至比操作内存的速度还快, 同时也有利于磁盘的顺序读取,提高读取性能
kafka是一个消息组件, 因为它自身的特性 , 很适合大量数据推送的情况 . 不过要想它高性能, 高可用 ,最好是部署集群环境, 而集群环境的情况下就得结合上zookeeper 了( 它是一个分布式协调服务,用于管理和同步分布式系统中的各个进程 ), 简单的说就是管理kafka集群每个节点都是健康的, 如果挂了一个就需要安排其他的顶上. 所以看到kafka基本都是配合zookeeper 在一起用
消息中间件无非都是这样的结构, 生产者->消息中间件->消费者, 不同的只是中间件的结构
kafka主体结构
要了解kafka , 就需要先知道它自身的结构
- broker: 上图可以看出, kafka 集群包含一个或多个服务器,每个服务器节点称为一个broker
- topic: 每条发布到kafka集群的消息都有一个类别,这个类别称为topic,同一个topic的数据既可以在同一个broker上也可以在不同的broker结点上. 一个broker可以有任意数量个topic
- partition: 分区
- 每个topic被物理划分为一个或多个分区,每个分区在物理上对应一个文件夹(就像一个大文件夹下面的再次分类),该文件夹里面存储了这个分区的所有消息和索引文件。在创建topic时可指定parition数量,生产者将消息发送到topic时,消息会根据 分区策略 追加到分区文件的末尾,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)
- 以上图为例: 在集群的情况下, 创建了partition1 和2 , 系统会根据规则将partition1,2分配, 这里是分配到了节点上boreker1和2 中, 而所有的节点中只有一个是 leader , 其余都是 follower 副本, 读写操作都是 leader ,follower在leader未不可用之前, 都只负责数据备份, 以便 leader 不可用后快速成为新的 leader
- kafka中, 被消费的消息也不会立马删除,在kafka的server.propertise配置文件中定义了数据的保存时间,当文件到设定的保存时间时才会删除 ,log.retention.hours=168 (7天)
其实结构主要就是这三个,但是里面的特性有什么, 重点介绍一下partition的结构:
Kafka 存储在文件系统上,每一个 Partition 最终对应一个目录 , 默认情况下,每一个 Topic 在创建时如果不指定 Partition 数量时只会创建 1 个 Partition。比如,我创建了一个 Topic 名字为 test ,没有指定 Partition 的数量,那么会默认创建一个 test-0 的文件夹, 每一条消息被发送到 Broker 中,会根据 Partition 规则( 自己设置的 )选择被存储到哪一个 Partition (按顺序写入, 同一partition中的数据是有序的,但topic下的多个partition之间在消费数据时不能保证有序性)
这里顺序写入就引出了新的概念: offset
在Kafka中,,offset是一个重要的概念,它是为每条消息分配的一个唯一的编号,表示消息在分区中的顺序位置。Offset是从0开始的,每当有新的消息写入分区时,offset就会加1。Offset是不可变的,即使消息被删除或过期,offset也不会改变或重用
Offset的主要作用有两个:一是用来定位消息。通过指定offset,消费者可以准确地找到分区中的某条消息,或者从某个位置开始消费消息。二是用来记录消费进度。消费者在消费完一条消息后,需要提交offset来告诉Kafka broker自己消费到哪里了。这样,如果消费者发生故障或重启,它可以根据保存的offset来恢复消费状态
offset在Kafka中有两个重要的概念:commit offset和fetch offset
- Commit offset:消费者将已经消费的消息的offset提交给Kafka,以便Kafka知道消费者已经消费了哪些消息。当消费者提交offset后,Kafka就会将该offset标记为已提交,并且在重启后仍然有效
- Fetch offset:消费者从Kafka中获取消息时,会指定一个fetch offset,表示从哪个offset开始获取消息。消费者可以通过不断更新fetch offset来实现消息的持续消费
通过使用offset,Kafka可以实现高效的消息存储和消费。消费者可以根据自己的需求灵活地控制消费的起始位置,并且可以保证在重启后不会丢失已经消费的消息
现在看这个结构图, 应该是能看懂一点了
生产者(Producer)
Producer 发送消息的过程
在生产者中需要注意如下参数, 部分也可以代码中配置:
#Kafka集群的地址,多个地址用逗号分隔
spring.kafka.bootstrap-servers=112.126.xx.xxx:9090,112.126.xx.xxx:9091
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=3
# 应答级别:默认值:0,生产者不会等待任何确认,消息将立即被发送出去,1 代表需要 leader 确认写入它的本地 log 并立即确认,-1 代表 leader 及所有的备份节点都收到消息后确认)(会增加消息的延迟,因为生产者需要等待所有副本节点的确认)。只对 async 模式起作用,这个参数的调整是数据不丢失和发送效率的 tradeoff,如果对数据丢失不敏感而在乎效率的场景可以考虑设置为 0,这样可以大大提高 producer 发送数据的效率(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小,默认值:200
spring.kafka.producer.batch-size=16384
# 生产者在发送批次之前等待更多消息的时间,(以毫秒为单位)
spring.kafka.producer.properties.linger.ms=0
# 如果设置了这两个参数,就开启了批量发送模式,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产者用于缓存批次数据的内存大小
spring.kafka.producer.buffer-memory = 33554432
#启用的消息压缩类型。可选值有none、gzip、snappy或lz4
compression-type:gzip
#生产者可以发送的最大消息大小(以字节为单位)
max-request-size:10485760
#生产者客户端的唯一标识符
client-id:my-producer
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
#开启事务,每个事务将使用一个随机生成的事务ID,这里就指定Kafka事务ID的前缀
transaction-id-prefix: tx_
#用于启用Kafka生产者的幂等性
enable-idempotence: true
代码中使用如下:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
//可选配置,如果提前已经配置好就不需要自行创建
@Configuration
public class KafkaInitialConfiguration {
// 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
@Bean
public NewTopic initialTopic() {
return new NewTopic("testtopic",8, (short) 2 );
}
// 如果要修改分区数,只需修改配置值重启项目即可
// 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
@Bean
public NewTopic updateTopic() {
return new NewTopic("testtopic",10, (short) 2 );
}
}
//简单的使用生成者
@RestController
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// 发送消息
public void sendMessage1(@PathVariable("message") String normalMessage) {
//send方法有多个重载,说几个里面的参数:
//1. topic:消息发送的目标主题名称。
//2. key:消息的键,解析后根据键的哈希值将消息路由到正确的分区。
//3. message:要发送的消息对象。
//4. partition:要发送的分区的索引,如果未指定,则使用默认分区策略。
//5. timestamp:消息的时间戳,如果未指定,则使用当前时间戳。
//6. headers:消息的头信息,可以用于存储元数据。
//7. callback:一个回调函数,用于在消息发送完成时执行。它是一个实现了 Callback 接口的对象,通常使用 Lambda 表达式或匿名内部类实现
kafkaTemplate.send("topic1", normalMessage);
}
}
//简单的消费者
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
//有回调的发送,确定消息是否成功发送到集群, 方便做后续的补偿处理, 有两种方式
//方式一
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
//方式二
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
事务的使用, 其实事务还是比较简单的, 可以用本身自带的事务, 也可以用spring的事务
方式一:
@Autowired
private TransactionTemplate transactionTemplate;
//方法被调用时,如果发送消息成功,事务将自动提交;如果发送消息失败,事务将自动回滚
public void sendMessageWithTransaction() {
transactionTemplate.execute(status -> {
// 发送消息的逻辑
kafkaTemplate.send("test-topic", "key", "value");
return null;
});
}
方式二:
public void sendMessage7(){
// 声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations -> {
operations.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
});
// 不声明事务:后面报错但前面消息已经发送成功了
kafkaTemplate.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
}
消费者
- 消费者会有一个消费组(consumer group) 的概念, 在上面讲过, kafka中的消息不管有没有被消费过, 都不会消失(文件默认保持是168小时), 那就说明了消息是可以被多次消费的.
消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组, 共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition. 每一个分区只能被一个消费组中的一个消费者所消费,所以同一个消费组中消费者的数量如果超过了分区的数量,将会出现有些消费者分配不到消费的分区。所以不建议group中的成员数量比partition多
这也是kafka用来实现一个topic消息的广播和单播的手段,如果需要实现广播,一个consumer group内只放一个消费者即可,要实现单播,将所有的消费者放到同一个consumer group即可
如图:
有两个消费组A和B, 去消费一个topic下的4个partition, 消费组B刚好有四个消费者, 所以能每个消费者消费一个partition, 而消费组A, 原本有两个消费者, 可以平分每个消费两个partition, 但突然C2消费者挂了, 那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区. 触发重平衡(rebalance)操作
-
消费者是一种pull的消费方式, 是主动去拉取, 而非kafka主动推送
-
零拷贝优化
正常的读取过程是包含4次copy操作和2次系统上下文切换,而上下文切换是CPU密集型的工作,数据拷贝是I/O密集型的工作,性能其实非常低效。
零拷贝就是使用了一个名为sendfile()的系统调用方法,将数据从page cache直接发送到Socket缓冲区,避免了系统上下文的切换,消除了从内核空间到用户空间的来回复制。从上图可以看出,"零拷贝"并不是说整个过程完全不发生拷贝,而是站在内核的角度来说的,避免了内核空间到用户空间的来回拷贝
实操,那我们就来看看项目中使用的配置情况
#Kafka集群的地址,多个地址用逗号分隔
spring.kafka.bootstrap-servers=112.126.xx.xxx:9090,112.126.xx.xxx:9091
###########【初始化消费者配置】###########
# 默认的消费组ID,同一个消费者组内的消费者共同消费主题中的消息,从而实现负载均衡。如果不设置,默认使用随机字符串作为消费者组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset,默认值是true
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
参考:[https://blog.csdn.net/lishuangzhe7047/article/details/74530417]
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费;
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据;
# none:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发重平衡rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间, 超时则抛出异常
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 开启设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=5
# 设置并发数
concurrency: 3
- 使用的话也简单,打开监听器即可 ,配置好相关信息后, 消费组会自动接收kafka消息
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
如果开启了批量消费,每次消费条数则会根据配置的数量消费(要注意,批量的话就得改一下参数了,用List接收:List<ConsumerRecord> record)
而如何开启了并发消费, 需要注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态
设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition
- 指定topic、partition、offset消费
前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供
/**
* @Title 指定topic、partition、offset消费
* @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
* id:消费者ID;
* groupId:消费组ID;
* topics:监听的topic,可监听多个;
* topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
* onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。
* 注意:topics和topicPartitions不能同时使用;
**/
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0" }),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
3.批量消费
@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
}
4.ConsumerAwareListenerErrorHandler 异常处理器
通过异常处理器,我们可以处理consumer在消费时发生的异常。
新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器
// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("消费异常:"+message.getPayload());
return null;
};
}
// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
throw new Exception("简单消费-模拟异常");
}
// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
System.out.println("批量消费一次...");
throw new Exception("批量消费-模拟异常");
}
5.消息过滤器
消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器
如下实现了一个”过滤奇数、接收偶数”的过滤策略,我们向topic1发送0-99总共100条消息,监听器只消费了偶数
@Component
public class KafkaConsumer {
@Autowired
ConsumerFactory consumerFactory;
// 消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}
// 消息过滤监听
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
}
6.消息转发
在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:
/**
* @Title 消息转发
* @Description 从topic1接收到的消息经过处理后转发到topic2
**/
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
return record.value()+"-forward message";
}
7.定时启动、停止监听器
默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:
① 禁止监听器自启动;
② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动
@EnableScheduling
@Component
public class CronTimer {
/**
* @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
* 而是会被注册在KafkaListenerEndpointRegistry中,
* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private ConsumerFactory consumerFactory;
// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}
// 监听器
@KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
// 定时启动监听器
@Scheduled(cron = "0 42 11 * * ? ")
public void startListener() {
System.out.println("启动监听器...");
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("timingConsumer").isRunning()) {
registry.getListenerContainer("timingConsumer").start();
}
//registry.getListenerContainer("timingConsumer").resume();
}
// 定时停止监听器
@Scheduled(cron = "0 45 11 * * ? ")
public void shutDownListener() {
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();
}
}
总结
可以看到, 消息中间件大致模型都类似, 但模式, 过程 , 功能却各有不同, 所以选择合适的才是最重要的. 对于必须严谨的数据交互,并且要实时保证数据的动向, rabbitmq更合适. 而对于大数据量交互, 没有太苛刻的数据监护要求, kafka会稍微合适一点