
https://juejin.im/post/5ddf5659518825782d599641#heading-1)https://github.com/Snailclimb/JavaGuide/blob/master/docs/system-design/data-communication/Kafka%E5%85%A5%E9%97%A8%E7%9C%8B%E8%BF%99%E4%B8%80%E7%AF%87%E5%B0%B1%E5%A4%9F%E4%BA%86.md多线程消费kafka的时候开发、测试环境都能每秒10w,但是正式环境只能1w/s正式环境不能重启看怎么调试参考答案基准测试。观察 网络和磁盘的读写实时与历史曲线观察文件句柄/内存的使用情况。观察系统patch 基础库/运行时状态。kafka的架构kafka在zookeeper中的存储结构参考这位大神https://blog.csdn.net/zzhongcy/article/details/1288491781、启动bin/kafka-server-start.sh config/server.properties2、创建Topicbin/kafka-topics.sh--create--zookeeperlocalhost:2181 --replication-factor3--partitions3--topictopic1如果出现zookeeper is not a recognized option则代表kafka版本过高将命令换成bin/kafka-topics.sh--create--bootstrap-server localhost:9092 --replication-factor3--partitions3--topictopic1查看topic副本信息bin/kafka-topics.sh--describe--bootstrap-server localhost:9092 --replication-factor3--partitions3--topictopic1生成者发送消息bin/kafka-console-producer.sh --broker-list localhost:9092--topictopic1带keybin/kafka-console-producer.sh --broker-list localhost:9092--topictopic1--propertyparse.keytrue消费者消费消息bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic1kafka-console-consumer.sh 脚本是一个简易的消费者控制台。该 shell 脚本的功能通过调用 kafka.tools 包下的ConsoleConsumer类并将提供的命令行参数全部传给该类实现。1.手动插入数据kafka-console-producer.sh --broker-list 192.168.104.91:9092,192.168.104.92:9092,192.168.104.93:9092 --topic capture_test2. 消息消费kafka-console-consumer.sh --bootstrap-server 192.168.104.91:9092,192.168.104.92:9092,192.168.104.93:9092 --topic topicName kafka-console-consumer.sh --zookeeper 192.168.104.91:2181,192.168.104.92:2181,192.168.104.93:2181 --topic topicName表示从 latest 位移位置开始消费该主题的所有分区消息即仅消费正在写入的消息。3.从开始位置消费kafka-console-consumer.sh --bootstrap-server 192.168.104.91:9092,192.168.104.92:9092,192.168.104.93:9092 --from-beginning --topic topicNamekafka-console-consumer.sh --zookeeper 192.168.104.91:2181,192.168.104.92:2181,192.168.104.93:2181 --from-beginning --topic topicName4.显示key消费kafka-console-consumer.sh --bootstrap-server 192.168.104.91:9092,192.168.104.92:9092,192.168.104.93:9092 --property print.keytrue --topic topicName kafka-console-consumer.sh --zookeeper 192.168.104.91:2181,192.168.104.92:2181,192.168.104.93:2181 --property print.keytrue --topic topicName消费出的消息结果将打印出消息体的 key 和 value。参数值类型说明有效值–topicstring被消费的topic–whiteliststring正则表达式指定要包含以供使用的主题的白名单–partitioninteger指定分区 除非指定’–offset’否则从分区结束(latest)开始消费–offsetstring执行消费的起始offset位置 默认值:latestlatest earliest–consumer-propertystring将用户定义的属性以keyvalue的形式传递给使用者–consumer.configstring消费者配置属性文件 请注意[consumer-property]优先于此配置–formatterstring用于格式化kafka消息以供显示的类的名称 默认值:kafka.tools.DefaultMessageFormatterkafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter–propertystring初始化消息格式化程序的属性print.timestamptrue|false print.keytrue|false print.valuetrue|false key.separatorkey.separator line.separatorline.separator key.deserializerkey.deserializer value.deserializervalue.deserializer–from-beginning从存在的最早消息开始而不是从最新消息开始–max-messagesinteger消费的最大数据量若不指定则持续消费下去–timeout-msinteger在指定时间间隔内没有消息可用时退出–skip-message-on-error如果处理消息时出错请跳过它而不是暂停–bootstrap-serverstring必需(除非使用旧版本的消费者)要连接的服务器–key-deserializerstring–value-deserializerstring–enable-systest-events除记录消费的消息外还记录消费者的生命周期 (用于系统测试)–isolation-levelstring设置为read_committed以过滤掉未提交的事务性消息 设置为read_uncommitted以读取所有消息 默认值:read_uncommitted–groupstring指定消费者所属组的ID–blackliststring要从消费中排除的主题黑名单–csv-reporter-enabled如果设置将启用csv metrics报告器–delete-consumer-offsets如果指定则启动时删除zookeeper中的消费者信息–metrics-dirstring输出csv度量值 需与[csv-reporter-enable]配合使用–zookeeperstring必需(仅当使用旧的使用者时)连接zookeeper的字符串。 可以给出多个URL以允许故障转移springboot整合kafka产生者server.port8080#制定kafka代理地址 spring.kafka.bootstrap-serverslocalhost:9092#消息发送失败重试次数 spring.kafka.producer.retries0#每次批量发送消息的数量 spring.kafka.producer.batch-size16384#每次批量发送消息的缓冲区大小 spring.kafka.producer.buffer-memory335554432# 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializerAutowiredprivateKafkaTemplatekafkaTemplate;publicvoidsendMsg(){for(inti0;i20;i){Stringmsg[ i ]hello, local Thread.currentThread().getName();// kafkaTemplate.send(topic1, msg);kafkaTemplate.send(topic1,i,msg);}}还可以取到commit的回调publicvoidsendMsg(){for(inti0;i20;i){Stringmsg[ i ]hello, local Thread.currentThread().getName();// kafkaTemplate.send(topic1, msg);// kafkaTemplate.send(topic1, i, msg);ListenableFuturelistenableFuturekafkaTemplate.send(topic1,i,msg);listenableFuture.addCallback(newListenableFutureCallback(){OverridepublicvoidonFailure(Throwablethrowable){log.info(error send);}OverridepublicvoidonSuccess(Objecto){log.info(succ send:{},o.toString());}});}}消费者# 指定默认消费者group id spring.kafka.consumer.group-iduser-log-group spring.kafka.consumer.auto-offset-resetearliest spring.kafka.consumer.enable-auto-committruespring.kafka.consumer.auto-commit-interval100# 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializerComponentpublicclassUserLogConsumer{KafkaListener(topics{userLog})publicvoidconsumer(ConsumerRecordconsumerRecord){OptionalkafkaMsgOptional.ofNullable(consumerRecord.value());if(kafkaMsg.isPresent()){ObjectmsgkafkaMsg.get();System.err.println(msg);}}}生成者详细说明#################consumer的配置参数开始################# #如果enable.auto.commit为true则消费者偏移自动提交给Kafka的频率以毫秒为单位默认值为5000。 spring.kafka.consumer.auto-commit-interval;#当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办默认值为latest表示自动将偏移重置为最新的偏移量 #可选的值为latest,earliest,none spring.kafka.consumer.auto-offset-resetlatest;#以逗号分隔的主机端口对列表用于建立与Kafka群集的初始连接。 spring.kafka.consumer.bootstrap-servers;#ID在发出请求时传递给服务器;用于服务器端日志记录。 spring.kafka.consumer.client-id;#如果为true则消费者的偏移量将在后台定期提交默认值为truespring.kafka.consumer.enable-auto-committrue;#如果没有足够的数据立即满足“fetch.min.bytes”给出的要求服务器在回答获取请求之前将阻塞的最长时间以毫秒为单位 #默认值为500spring.kafka.consumer.fetch-max-wait;#服务器应以字节为单位返回获取请求的最小数据量默认值为1对应的kafka的参数为fetch.min.bytes。 spring.kafka.consumer.fetch-min-size;#用于标识此使用者所属的使用者组的唯一字符串。 spring.kafka.consumer.group-id;#心跳与消费者协调员之间的预期时间以毫秒为单位默认值为3000spring.kafka.consumer.heartbeat-interval;#密钥的反序列化器类实现类实现了接口org.apache.kafka.common.serialization.Deserializerspring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer#值的反序列化器类实现类实现了接口org.apache.kafka.common.serialization.Deserializerspring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer#一次调用poll()操作时返回的最大记录数默认值为500spring.kafka.consumer.max-poll-records;#################consumer的配置参数结束################# #################producer的配置参数开始################# #procedure要求leader在考虑完成请求之前收到的确认数用于控制发送记录在服务端的持久化其值可以为如下 #acks0如果设置为零则生产者将不会等待来自服务器的任何确认该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下无法保证服务器已收到记录并且重试配置将不会生效因为客户端通常不会知道任何故障为每条记录返回的偏移量始终设置为-1。 #acks1这意味着leader会将记录写入其本地日志但无需等待所有副本服务器的完全确认即可做出回应在这种情况下如果leader在确认记录后立即失败但在将数据复制到所有的副本服务器之前则记录将会丢失。 #acksall 这意味着leader将等待完整的同步副本集以确认记录这保证了只要至少一个同步副本服务器仍然存活记录就不会丢失这是最强有力的保证这相当于acks-1的设置。 #可以设置的值为all,-1,0,1spring.kafka.producer.acks1#每当多个记录被发送到同一分区时生产者将尝试将记录一起批量处理为更少的请求 #这有助于提升客户端和服务器上的性能此配置控制默认批量大小以字节为单位默认值为16384spring.kafka.producer.batch-size16384#以逗号分隔的主机端口对列表用于建立与Kafka群集的初始连接 spring.kafka.producer.bootstrap-servers #生产者可用于缓冲等待发送到服务器的记录的内存总字节数默认值为33554432spring.kafka.producer.buffer-memory33554432#ID在发出请求时传递给服务器用于服务器端日志记录 spring.kafka.producer.client-id #生产者生成的所有数据的压缩类型此配置接受标准压缩编解码器gzipsnappylz4 #它还接受uncompressed以及producer分别表示没有压缩以及保留生产者设置的原始压缩编解码器 #默认值为producer spring.kafka.producer.compression-typeproducer #key的Serializer类实现类实现了接口org.apache.kafka.common.serialization.Serializerspring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer#值的Serializer类实现类实现了接口org.apache.kafka.common.serialization.Serializerspring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer#如果该值大于零时表示启用重试失败的发送次数 spring.kafka.producer.retries #################producer的配置参数结束################# #################listener的配置参数结束################# #侦听器的AckMode,参见https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets #当enable.auto.commit的值设置为false时该值会生效为true时不会生效 spring.kafka.listener.ack-mode;#在侦听器容器中运行的线程数 spring.kafka.listener.concurrency;#轮询消费者时使用的超时以毫秒为单位 spring.kafka.listener.poll-timeout;#当ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数 spring.kafka.listener.ack-count;#当ackMode为“TIME”或“COUNT_TIME”时偏移提交之间的时间以毫秒为单位 spring.kafka.listener.ack-time;#################listener的配置参数结束#################为了方便管理partition还可以在broker上装个 Kafka Manager可视化界面Kafka Partition分区是什么一句话Partition 是 Kafka 中 Topic 的最小存储单元用来实现分布式、高并发和高可用。最通俗的理解一个 Topic主题 就像一个 “消息队列 / 文件夹”这个文件夹会被拆分成多个小文件这些小文件就是 Partition分区消息会被分散存到不同分区里不同分区可以放在不同机器上分区有什么用提高并发生产者、消费者可以同时读写不同分区速度成倍提升。分布式存储一个 Topic 数据太大一台机器放不下分到多台机器。高可用每个分区可以有多个副本Replica一台挂了不影响使用。消息怎么进分区默认策略带 Key → 对 Key 哈希取模分到固定分区不带 Key → 轮询分发到各个分区同一个 Key 的消息永远在同一个 Partition 里保证顺序。简单总结Topic消息分类PartitionTopic 的分片分布式存储单元作用快、大、稳高吞吐、大容量、高可用