Kafka Streams实战:从入门到精通

发布时间:2026/5/27 4:59:00

Kafka Streams实战：从入门到精通

引言

Kafka Streams是Apache Kafka生态系统中用于构建实时流处理应用的核心库。它提供了轻量级、高效的流处理能力，使得开发者能够像编写普通应用程序一样编写复杂的流处理逻辑，而无需额外部署独立的流处理集群。本文将深入探讨Kafka Streams的架构、API使用、状态管理以及实际应用场景。

1. Kafka Streams基础

1.1 什么是Kafka Streams

Kafka Streams是一个用于构建实时流处理应用的客户端库，具有以下特点：

- 轻量级：无需独立集群，直接嵌入应用程序
- 高扩展性：自动处理负载均衡和故障转移
- 低延迟：毫秒级处理延迟
- Exactly-Once语义：保证端到端的数据一致性
- 简化开发：使用Kafka原生API，易于学习和使用

<!-- Maven依赖 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.0</version>
</dependency>

1.2 核心概念

Kafka Streams有几个需要理解的核心概念：KStream（事件流）、KTable（状态表）和处理器拓扑（Topology）。下面的单词计数示例展示了它们的基本用法：

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Arrays;
import java.util.Properties;

public class KafkaStreamsConcepts {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 从输入主题读取
        KStream<String, String> source = builder.stream("streams-plaintext-input");

        // 转换操作：转为小写，并按非单词字符切分为单词
        KStream<String, String> words = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")));

        // 聚合统计
        KTable<String, Long> wordCounts = words
            .groupBy((key, value) -> value)
            .count(Materialized.as("counts"));

        // 输出到目标主题（计数值为Long，需显式指定值的Serde，不能沿用默认的String Serde）
        wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        // 构建拓扑并启动
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // 进程退出时关闭，释放处理线程与状态存储
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}

2. Kafka Streams架构

2.1 拓扑结构

Kafka Streams应用程序由处理器拓扑（Topology）组成，拓扑定义了数据流的处理逻辑：

┌─────────────────────────────────────────────────────┐
│                 Processor Topology                  │
│                                                     │
│   Source Processor                                  │
│          │                                          │
│          ▼                                          │
│   Processor 1 ────────────────────► Processor 2     │
│          │                               │          │
│          ▼                               ▼          │
│   State Store                     Sink Processor    │
│                                                     │
└─────────────────────────────────────────────────────┘

public class TopologyExample {
    public static
Topology createWordCountTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // 源处理器：读取输入主题
        KStream<String, String> source = builder.stream("word-count-input",
            Consumed.with(Serdes.String(), Serdes.String()));

        // 处理器1：转换为小写并分词
        KStream<String, String> lowerCased = source
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")));

        // 处理器2：过滤空字符串
        KStream<String, String> filtered = lowerCased
            .filter((key, word) -> !word.isEmpty());

        // 状态处理器：聚合统计（KStream.groupBy的选择器只返回新的key，而不是KeyValue）
        KTable<String, Long> wordCounts = filtered
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            .count(Materialized.as("word-counts-store"));

        // 处理器3：格式化输出
        KStream<String, String> output = wordCounts.toStream()
            .map((word, count) -> KeyValue.pair(word, word + " -> " + count));

        // Sink处理器：写入输出主题
        output.to("word-count-output", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}

2.2 处理模式

Kafka Streams支持两种处理保证（processing guarantee）：至少一次（at-least-once，默认）和精确一次（exactly-once v2）。

public class ProcessingModes {
    // 至少一次（默认）：故障恢复时，记录可能被重复处理
    public static void AT_LEAST_ONCE() {
        Properties props = new Properties();
        // application.id、bootstrap.servers等必需配置同1.2节，此处省略
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);

        StreamsBuilder builder = new StreamsBuilder();
        // 处理逻辑

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

    // 精确一次（v2）：基于Kafka事务实现，要求Broker版本不低于2.5
    public static void EXACTLY_ONCE_V2() {
        Properties props = new Properties();
        // application.id、bootstrap.servers等必需配置同1.2节，此处省略
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        // 处理逻辑

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

3. KStream与KTable

3.1 KStream：事件流

KStream表示无界的、连续的事件流，其中每个事件都是独立的。

public class KStreamOperations {
    public static void demonstrateKStream() {
        StreamsBuilder builder = new StreamsBuilder();

        // 创建KStream
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), new JsonSerde<>(Order.class)));

        // filter操作
        KStream<String, Order> vipOrders = orders
            .filter((key, order) -> order.isVip());

        // map操作
        KStream<String, OrderConfirm> confirmations = orders
            .map((key, order) -> KeyValue.pair(key, new OrderConfirm(order)));

        // flatMap操作
        KStream<String, OrderItem> items = orders
            .flatMap((key, order) -> order.getItems().stream()
                .map(item -> KeyValue.pair(item.getId(), item))
                .collect(Collectors.toList()));

        // branch操作：分流（branch()自2.8起已弃用，新代码建议使用split()配合Branched）
        KStream<String, Order>[] branches = orders.branch(
            (key, order) -> order.getStatus().equals("PENDING"),
            (key, order) -> order.getStatus().equals("PROCESSING"),
            (key, order) -> order.getStatus().equals("COMPLETED")
        );
        KStream<String, Order> pendingOrders = branches[0];
        KStream<String, Order> processingOrders = branches[1];
        KStream<String, Order> completedOrders = branches[2];

        // merge操作：合流（这里合并的是待处理和处理中的订单）
        KStream<String, Order> allOrders = pendingOrders.merge(processingOrders);

        // 输出（Order需显式指定值的Serde）
        allOrders.to("processed-orders", Produced.with(Serdes.String(), new JsonSerde<>(Order.class)));
    }
}

3.2 KTable：状态表

KTable表示按Key维护最新值的状态视图：同一Key的新记录会覆盖旧值。

public class KTableOperations {
    public static void demonstrateKTable() {
        StreamsBuilder builder = new StreamsBuilder();

        // 创建KTable
        KTable<String, User> users = builder.table("users",
            Consumed.with(Serdes.String(), new JsonSerde<>(User.class)));

        // filter操作
        KTable<String, User> activeUsers = users
            .filter((key, user) -> user.isActive());

        // mapValues操作
        KTable<String, String> userNames = users
            .mapValues(user -> user.getName());

        // groupBy操作：对KTable重新分组后，reduce需要同时提供加法器（adder）和减法器（subtractor），
        // 因为某个用户更新或迁出城市时，旧值必须从原分组中撤销
        KTable<String, String> userByCity = users
            .groupBy((key, user) -> KeyValue.pair(user.getCity(), user.getName()))
            .reduce(
                (names, name) -> names + ", " + name,
                (names, name) -> {
                    List<String> remaining = new ArrayList<>(Arrays.asList(names.split(", ")));
                    remaining.remove(name); // 只移除一次出现
                    return String.join(", ", remaining);
                });

        // 聚合操作：按用户统计订单数（KStream.groupBy的选择器只返回新的key）
        KTable<String, Long> orderCounts = builder.stream("orders",
                Consumed.with(Serdes.String(), new JsonSerde<>(Order.class)))
            .groupBy((key, order) -> order.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
            .count(Materialized.as("order-counts"));

        // 连接操作
        KTable<String, UserOrderSummary> userSummaries = users
            .join(orderCounts, (user, count) -> new UserOrderSummary(user, count));

        userSummaries.toStream().to("user-summaries",
            Produced.with(Serdes.String(), new JsonSerde<>(UserOrderSummary.class)));
    }
}

3.3 KStream与KTable交互

public class KStreamKTableInteraction {
    public static void demonstrateInteraction() {
        StreamsBuilder builder = new StreamsBuilder();

        // 创建KTable：用户信息表
        KTable<String, User> users = builder.table("users");

        // 创建KStream：订单事件流
        KStream<String, Order> orders = builder.stream("orders");

        // KStream与KTable的join（要求订单流以用户ID为key，且两个主题的分区数相同）
        KStream<String, EnrichedOrder> enrichedOrders = orders
            .join(users,
                (order, user) -> new EnrichedOrder(order, user),
                Joined.with(Serdes.String(), new JsonSerde<>(Order.class), new JsonSerde<>(User.class)));

        enrichedOrders.to("enriched-orders");

        // KTable与KTable的join
        KTable<String, UserProfile> profiles = builder.table("profiles");
        KTable<String, UserFullInfo> userFullInfo = users
            .join(profiles, (user, profile) -> new UserFullInfo(user, profile));

        userFullInfo.toStream().to("user-full-info");
    }
}

4. 状态管理

4.1 状态存储

Kafka Streams使用状态存储（State Store）来保存中间处理结果。

public class StateStoreExample {
    public static void demonstrateStateStore() {
        StreamsBuilder builder = new StreamsBuilder();

        // 创建源KStream
        KStream<String, Transaction> transactions = builder.stream("transactions");

        // 使用状态存储进行聚合（KStream.groupBy的选择器只返回新的key）
        KTable<String, AccountBalance> balances = transactions
            .groupBy((key, tx) -> tx.getAccountId())
            .aggregate(
                () -> new AccountBalance(),
                (key, tx, balance) -> balance.update(tx),
                Materialized.<String, AccountBalance, KeyValueStore<Bytes, byte[]>>as("account-balances")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(AccountBalance.class))
                    .withCachingEnabled()
                    // 注意：关闭changelog后，实例故障时该状态无法从Kafka恢复，生产环境通常应保持开启
                    .withLoggingDisabled()
            );

        // 从状态存储查询：需要一个已启动的KafkaStreams实例，见下方的queryBalance方法
    }

    // 交互式查询：在KafkaStreams实例启动并进入RUNNING状态后调用
    public static AccountBalance queryBalance(KafkaStreams streams, String accountId) {
        ReadOnlyKeyValueStore<String, AccountBalance> store = streams.store(
            StoreQueryParameters.fromNameAndType("account-balances", QueryableStoreTypes.keyValueStore()));
        return store.get(accountId);
    }

    public static class AccountBalance {
        private String accountId;
        // 初始化为0，否则第一次update时会抛出NullPointerException
        private BigDecimal totalCredit = BigDecimal.ZERO;
        private BigDecimal totalDebit = BigDecimal.ZERO;

        public AccountBalance update(Transaction tx) {
            if (tx.isCredit()) {
                totalCredit = totalCredit.add(tx.getAmount());
            } else {
                totalDebit = totalDebit.add(tx.getAmount());
            }
            return this;
        }

        public BigDecimal getBalance() {
            return totalCredit.subtract(totalDebit);
        }
    }
}

4.2 窗口计算

public class WindowOperations {
    public static void demonstrateWindows() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Event> events = builder.stream("events");

        // 滚动窗口（Tumbling Window）
        // 注：TimeWindows.of自3.0起已弃用，新代码可改用TimeWindows.ofSizeWithNoGrace()或ofSizeAndGrace()
        KTable<Windowed<String>, Long> countByTumblingWindow = events
            .groupBy((key, event) -> event.getUserId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();

        // 跳跃窗口（Hopping Window）：窗口大小5分钟，每1分钟前进一次
        KTable<Windowed<String>, Long> countByHoppingWindow = events
            .groupBy((key, event) -> event.getUserId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5))
                .advanceBy(Duration.ofMinutes(1)))
            .count();

        // 会话窗口（Session Window）：10分钟无活动即结束会话，另允许2分钟的迟到宽限期
        KTable<Windowed<String>, Long> sessionCounts = events
            .groupBy((key, event) -> event.getUserId())
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(2)))
            .count();

        // 滑动窗口（Sliding Window）：时间戳相差不超过5分钟的记录落在同一窗口
        // 时间窗口的aggregate只接受initializer、aggregator和materialized，合并器（merger）仅用于会话窗口
        KTable<Windowed<String>, MetricAggregate> slidingMetrics = events
            .groupBy((key, event) -> event.getMetricName())
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> new MetricAggregate(),
                (key, event, agg) -> agg.add(event),
                Materialized.as("sliding-metrics")
            );
    }
}

5. 时间语义

5.1 事件时间处理

public class EventTimeProcessing {
    public static void demonstrateEventTime() {
        Properties props = new Properties();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // 全局默认时间戳提取器：WallclockTimestampExtractor使用的是处理时间（系统时钟），而不是事件时间；
        // 下面的源主题通过自定义提取器覆盖这一默认值，从而改用事件时间
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            WallclockTimestampExtractor.class.getName());

        StreamsBuilder builder = new StreamsBuilder();

        // 从主题读取，并使用事件自带的时间戳
        KStream<String, Event> events = builder.stream("events",
            Consumed.with(Serdes.String(), new JsonSerde<>(Event.class))
                .withTimestampExtractor((record, previousTimestamp) -> {
                    Event event = (Event) record.value();
                    return event.getEventTimestamp();
                }));

        // 窗口计算使用事件时间
        KTable<Windowed<String>, Long> counts = events
            .groupBy((key, event) -> event.getType())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();

        counts.toStream()
            .foreach((windowedKey, count) ->
                System.out.println(windowedKey.key() + " [" + windowedKey.window().start()
                    + " - " + windowedKey.window().end() + "] " + count));
    }
}

5.2 迟到数据处理

public class LateDataHandling {
    public static void
demonstrateLateDataHandling() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Event> events = builder.stream("events");

        // 定义5分钟窗口，允许1分钟迟到（宽限期）
        TimeWindows windowSpec = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));

        // 在窗口关闭（窗口结束时间+宽限期）之前抑制中间结果，每个窗口只输出一次最终结果
        KTable<Windowed<String>, Long> counts = events
            .groupBy((key, event) -> event.getUserId())
            .windowedBy(windowSpec)
            .count()
            .suppress(Suppressed.untilWindowCloses(
                Suppressed.BufferConfig.unbounded()
                    .shutDownWhenFull()));

        // 近似记录可能被丢弃的迟到数据：这里用系统时钟估算延迟是否超过“窗口大小+宽限期”（共6分钟）。
        // Kafka Streams实际依据流时间（stream time）判断窗口是否已关闭，真正被丢弃的记录可通过dropped-records指标观察
        events.filter((key, event) -> {
            long now = System.currentTimeMillis();
            long eventTime = event.getEventTimestamp();
            return now - eventTime > Duration.ofMinutes(6).toMillis();
        }).foreach((key, event) -> {
            System.out.println("丢弃迟到数据: " + event);
        });
    }
}

6. 实际应用场景

6.1 实时数据统计

public class RealTimeStatistics {
    public static void main(String[] args) {
        Properties props = createStreamsConfig();
        StreamsBuilder builder = new StreamsBuilder();

        // 读取原始数据
        KStream<String, MetricEvent> metrics = builder.stream("metrics",
            Consumed.with(Serdes.String(), new JsonSerde<>(MetricEvent.class)));

        // 实时统计：每分钟每个服务的请求数
        KTable<Windowed<String>, Long> requestsPerMinute = metrics
            .filter((key, metric) -> "request".equals(metric.getType()))
            .groupBy((key, metric) -> metric.getServiceName(),
                Grouped.with(Serdes.String(), new JsonSerde<>(MetricEvent.class)))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .count();

        // 计算每分钟平均响应时间
        // 时间窗口的aggregate只接受initializer、aggregator和materialized，合并器（merger）仅用于会话窗口
        KTable<Windowed<String>, Double> avgResponseTime = metrics
            .filter((key, metric) -> metric.getResponseTime() > 0)
            .groupBy((key, metric) -> metric.getServiceName(),
                Grouped.with(Serdes.String(), new JsonSerde<>(MetricEvent.class)))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .aggregate(
                () -> new ResponseTimeAccumulator(),
                (key, metric, acc) -> acc.add(metric.getResponseTime()),
                Materialized.<String, ResponseTimeAccumulator, WindowStore<Bytes, byte[]>>as("avg-response-time")
                    .withValueSerde(new JsonSerde<>(ResponseTimeAccumulator.class))
            )
            .mapValues(acc -> acc.getAverage());

        // 百分位数统计
        KTable<Windowed<String>, Percentiles> percentiles = metrics
            .groupBy((key, metric) -> metric.getServiceName(),
                Grouped.with(Serdes.String(), new JsonSerde<>(MetricEvent.class)))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .aggregate(
                () -> new PercentileAccumulator(),
                (key, metric, acc) -> acc.add(metric.getResponseTime()),
                Materialized.<String, PercentileAccumulator, WindowStore<Bytes, byte[]>>as("percentiles")
                    .withValueSerde(new JsonSerde<>(PercentileAccumulator.class))
            )
            .mapValues(acc -> acc.calculate());

        // 输出到目标主题，值的格式为：服务名,窗口开始,窗口结束,请求数
        requestsPerMinute.toStream()
            .map((key, value) -> KeyValue.pair(key.key(),
                String.format("%s,%d,%d,%d",
                    key.key(), key.window().start(), key.window().end(), value)))
            .to("stats-requests-per-minute", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // 进程退出时关闭，释放处理线程与状态存储
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    static class MetricEvent {
        private String serviceName;
        private String type;
        private long responseTime;
        private long eventTimestamp;

        public String getServiceName() { return serviceName; }
        public String getType() { return type; }
        public long getResponseTime() { return responseTime; }
        public long getEventTimestamp() { return eventTimestamp; }
    }

    static class ResponseTimeAccumulator {
        private long sum;
        private long count;

        public ResponseTimeAccumulator add(long value) {
            sum += value;
            count++;
            return this;
        }

        public ResponseTimeAccumulator merge(ResponseTimeAccumulator other) {
            sum += other.sum;
            count += other.count;
            return this;
        }

        public double getAverage() {
            return count > 0 ?
(double) sum / count : 0;
        }
    }

    // 注意：该累加器保存窗口内的全部响应时间，数据量大时应改用直方图等近似算法
    static class PercentileAccumulator {
        private List<Long> values = new ArrayList<>();

        // 只追加，不在每次add时排序，避免每条记录都付出一次排序开销
        public PercentileAccumulator add(long value) {
            values.add(value);
            return this;
        }

        public PercentileAccumulator merge(PercentileAccumulator other) {
            values.addAll(other.values);
            return this;
        }

        // 计算百分位之前统一排序一次
        public Percentiles calculate() {
            Collections.sort(values);
            return new Percentiles(
                percentile(50),
                percentile(90),
                percentile(95),
                percentile(99)
            );
        }

        // 最近秩（nearest-rank）法，要求values已排序
        private long percentile(double p) {
            if (values.isEmpty()) return 0;
            int index = (int) Math.ceil(p / 100.0 * values.size()) - 1;
            return values.get(Math.max(0, index));
        }
    }

    static class Percentiles {
        private long p50, p90, p95, p99;

        public Percentiles(long p50, long p90, long p95, long p99) {
            this.p50 = p50;
            this.p90 = p90;
            this.p95 = p95;
            this.p99 = p99;
        }
    }
}

6.2 实时告警系统

public class RealTimeAlerting {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, SensorReading> readings = builder.stream("sensors");

        // 检测异常值：按传感器统计5分钟窗口内的异常次数（窗口聚合的key类型为Windowed<String>）
        KTable<Windowed<String>, Long> anomalyCount = readings
            .filter((key, reading) -> isAnomaly(reading))
            .groupBy((key, reading) -> reading.getSensorId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();

        // 触发告警：窗口内异常超过3次
        // 注意：未使用suppress时每次计数更新都会向下游输出，同一窗口可能重复告警
        anomalyCount.toStream()
            .filter((key, count) -> count > 3)
            .map((key, count) -> KeyValue.pair(key.key(),
                new Alert(key.key(), "连续5分钟内检测到" + count + "次异常", key.window().start())))
            .to("alerts");

        // 此处省略KafkaStreams实例的创建与启动，方式同6.1节
    }

    private static boolean isAnomaly(SensorReading reading) {
        // 简单的异常检测逻辑：读数超出[0, 100]范围即视为异常
        return reading.getValue() > 100 || reading.getValue() < 0;
    }

    static class SensorReading {
        private String sensorId;
        private double value;
        private long timestamp;

        public String getSensorId() { return sensorId; }
        public double getValue() { return value; }
        public long getTimestamp() { return timestamp; }
    }

    static class Alert {
        private String sensorId;
        private String message;
        private long timestamp;

        public Alert(String sensorId, String message, long timestamp) {
            this.sensorId = sensorId;
            this.message = message;
            this.timestamp = timestamp;
        }
    }
}

7. 性能优化

7.1 配置优化

public class PerformanceOptimization {
    public static Properties createOptimizedConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");

        // 序列化配置
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 并行度配置：每个实例的处理线程数
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        // 注意：StreamsConfig中没有单独的“并行度”配置项，应用的最大并行度由输入主题的分区数（即任务数）决定

        // 缓存配置：同一实例内所有线程共享的状态存储缓存大小
        // （自3.4起，STATESTORE_CACHE_MAX_BYTES_CONFIG取代已弃用的CACHE_MAX_BYTES_BUFFERING_CONFIG）
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 104857600L); // 100MB

        // 提交配置
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

        // 高可用配置：为每个有状态任务维护1个备用副本，以加快故障转移
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

        // 状态存储配置（生产环境应使用持久化目录，/tmp可能被系统清理）
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        // 处理保证
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        return props;
    }
}

7.2 状态存储优化

public class StateStoreOptimization {
    public static void optimizeStateStores() {
        StreamsBuilder builder = new StreamsBuilder();

        // 启用缓存
        KTable<String, Long> counts = builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy((key, value) -> value)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts")
                .withCachingEnabled()
                // withRetention只对窗口存储和会话存储生效，对这里的键值存储会被忽略
                .withRetention(Duration.ofHours(24)));

        // RocksDB配置：Materialized没有withRocksDBConfigSetter方法，
        // 需要实现RocksDBConfigSetter接口，并通过rocksdb.config.setter配置项注册（对应用内所有RocksDB存储生效）
        Properties props = new Properties();
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);

        KTable<String, String> userProfiles = builder.table("profiles",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("profiles-store"));
    }

    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
        @Override
        public void setConfig(String storeName, org.rocksdb.Options options, Map<String, Object> configs) {
            options.setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION);
            options.setWriteBufferSize(2 * 1024 * 1024);
            options.setMaxWriteBufferNumber(3);
            options.setMaxTotalWalSize(64 * 1024 * 1024);
        }

        @Override
        public void close(String storeName, org.rocksdb.Options options) {
            // setConfig中没有创建需要手动释放的RocksDB对象，因此无需清理
        }
    }
}

总结

Kafka Streams是一个功能强大的流处理框架，能够帮助开发者构建高性能、可靠的实时流处理应用。本文详细介绍了Kafka Streams的核心概念、架构、KStream与KTable操作、状态管理、时间语义以及实际应用场景。通过深入理解这些内容，开发者可以更好地应用Kafka Streams，构建满足生产环境需求的流处理系统。

相关新闻