Stop Memorizing Commands! Set Up a Single-Node Kafka Environment in 5 Minutes with Docker Compose (with Java Producer/Consumer Code)

Published: 2026/5/16 18:30:26

**Set up a Kafka development environment in 5 minutes: a hands-on guide to Docker Compose and Java.**

Does testing a Kafka feature always mean wrestling with Zookeeper configuration first? Still struggling with Kafka's complicated command-line flags? As developers, our time should go into core business logic, not into re-reading installation docs over and over. This article walks you through deploying a complete single-node Kafka environment with one Docker Compose command, along with ready-to-run Java producer and consumer examples.

## 1. Why deploy Kafka with Docker Compose?

Deploying Kafka the traditional way involves a series of tedious steps:

- Install and configure Zookeeper separately
- Download and extract the Kafka binary package
- Edit `server.properties` and other configuration files
- Start Zookeeper, then Kafka, in order
- Test that the services are available

With Docker Compose, all you need is:

```yaml
version: "3"
services:
  zookeeper:
    image: zookeeper:3.7
    ports:
      - "2181:2181"
  kafka:
    image: bitnami/kafka:3.1
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
```

Side by side, the advantages of the Docker approach are obvious:

| Dimension | Traditional deployment | Docker Compose |
| --- | --- | --- |
| Deployment time | 15-30 minutes | 5 minutes |
| Configuration complexity | High | Low |
| Environment consistency | Poor | High |
| Resource usage | High | Controllable |
| Ease of cleanup | Difficult | One command |

> Tip: for development and test environments, deploying Kafka with Docker saves a lot of time and leaves the host machine untouched.

## 2. Full setup walkthrough

### 2.1 Prerequisites

Make sure your system has:

- Docker Engine 20.10+
- Docker Compose 2.0+
- JDK 8+ (to run the Java examples)

Check that Docker is working:

```bash
docker --version
docker-compose --version
```

### 2.2 Write docker-compose.yml

Create a project directory and add the following:

```yaml
version: "3.8"
services:
  zookeeper:
    image: zookeeper:3.7
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
    networks:
      - kafka-net
  kafka:
    image: bitnami/kafka:3.1
    container_name: kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: bridge
```

Key configuration notes:

- `KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true` lets topics be created automatically
- Two listeners are configured so the broker is reachable both inside and outside the container
- A custom `kafka-net` network ensures the services can talk to each other

### 2.3 Start the services

In the directory containing `docker-compose.yml`, run:

```bash
docker-compose up -d
```

Verify the service status:

```bash
docker-compose ps
```

You should see output similar to:

```
Name        Command                             State   Ports
---------------------------------------------------------------------
kafka       /opt/bitnami/scripts/kafka/run.sh   Up      0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp
zookeeper   /docker-entrypoint.sh zkSe ...      Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
```

## 3. Java client in practice

### 3.1 Producer example

Create a Maven project and add the dependency:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
```

Full producer code:

```java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Send succeeded: topic=%s, partition=%d, offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
        producer.close();
    }
}
```

### 3.2 Consumer example

```java
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Start consuming from the earliest message
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Disable auto-commit; commit offsets manually instead
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value());
                }
                // Manually commit offsets, asynchronously
                consumer.commitAsync();
            }
        } finally {
            consumer.close();
        }
    }
}
```

## 4. Troubleshooting

### 4.1 Connection problems

If you run into connection problems, check the following.

Port mappings:

```bash
docker-compose ps
netstat -tuln | grep 29092
```

Firewall settings:

```bash
sudo ufw allow 29092
sudo ufw allow 9092
sudo ufw allow 2181
```

Kafka logs:

```bash
docker logs kafka
```

### 4.2 Performance tuning

For a development environment you can adjust the following parameters:

```yaml
environment:
  - KAFKA_CFG_NUM_PARTITIONS=3
  - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=1
  - KAFKA_CFG_LOG_RETENTION_HOURS=72
  - KAFKA_CFG_LOG_SEGMENT_BYTES=1073741824
```

### 4.3 Data persistence

To keep Kafka's data across restarts, add volume mounts:

```yaml
kafka:
  volumes:
    - ./kafka_data:/bitnami/kafka
zookeeper:
  volumes:
    - ./zookeeper_data:/data
```

In real projects, I've found `KAFKA_CFG_ADVERTISED_LISTENERS` to be the setting that most often goes wrong. If the Java client cannot connect, it is usually because the advertised address is incorrect. A good first step is to enter the container with `docker exec -it kafka bash` and use `kafka-console-producer.sh` to check that basic functionality works.
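Before digging into listener configuration at all, it helps to confirm the mapped ports are reachable from the host. As a companion to the `netstat` check above, here is a minimal JDK-only sketch (no Kafka dependency; the `PortCheck` class name and port list are my own choices matching the compose file in this article):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // Note: this only proves the port is open, not that the Kafka handshake
    // will succeed -- advertised-listener problems can still break clients.
    public static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 = Zookeeper, 9092/29092 = the two Kafka listeners from the compose file
        for (int port : new int[] {2181, 9092, 29092}) {
            System.out.println("localhost:" + port + " open: "
                    + isPortOpen("localhost", port, 2000));
        }
    }
}
```

If `29092` shows as closed here, the problem is the port mapping or firewall, not `KAFKA_CFG_ADVERTISED_LISTENERS`; if it is open but the client still fails, the advertised address is the next suspect.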
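A related pitfall is that the correct bootstrap address differs depending on where the client runs: `localhost:29092` from the host, `kafka:9092` from inside the compose network. One way to avoid hard-coding it, sketched here with only the JDK (the `ClientConfig` class and the `client.properties` filename are illustrative, not part of the article's setup), is to load client settings from a properties file:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

public class ClientConfig {
    // Loads Kafka client settings from a properties file so the same jar can
    // run on the host (bootstrap.servers=localhost:29092) or inside the
    // compose network (bootstrap.servers=kafka:9092) without recompiling.
    public static Properties load(Path path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical client.properties next to the jar, e.g.:
        //   bootstrap.servers=localhost:29092
        //   group.id=test-group
        Path path = Paths.get("client.properties");
        if (Files.exists(path)) {
            System.out.println("bootstrap.servers = "
                    + load(path).getProperty("bootstrap.servers"));
        }
    }
}
```

The returned `Properties` can be passed straight to `new KafkaProducer<>(...)` or `new KafkaConsumer<>(...)`, since both constructors accept a `Properties` object.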
