大数据开发学习Day38

发布时间:2026/5/22 12:54:09

大数据开发学习Day38 一、Linux1. 查看 HDFS 目录层级及文件大小hdfs dfs-du-h/user/hive/warehouse/dwd.db-du -h 人性化单位展示 HDFS 目录占用空间快速判断数仓分层表数据量大小判断是否需要归档清理日常数仓存储容量巡检必备2. 筛选日志中指定时间段报错日志grep2026-05-21 10:/opt/spark/logs/spark.log|grep-ierror先限定时间范围再过滤错误信息精准定位指定时段 Spark、YARN 任务报错比全量检索效率高数十倍排错最快用法3. 后台运行 Python 脚本并输出日志nohuppython3 etl_task.pytask_run.log21nohup 离线后台运行断开终端不终止进程task_run.log 正常日志写入文件21 错误日志同步写入日志文件大数据离线 Python 清洗脚本通用启动方式二、SQL通用表结构dwd_traffic_log 流量日志user_id,net_type,visit_time,stay_time,dtdwd_finance_trade 金融交易表trade_id,user_id,trade_money,trade_type,trade_timedwd_product_flow 商品流量表goods_id,expose_num,click_num,sale_num,dt1. 统计不同网络类型用户平均停留时长SELECTnet_type,ROUND(AVG(stay_time),2)avg_stay_time,COUNT(DISTINCTuser_id)user_cntFROMdwd_traffic_logWHEREdt2026-05-21GROUPBYnet_type;net_type 区分 4G、5G、WiFi、有线网络AVG(stay_time) 计算用户页面平均停留时长同时统计对应网络下活跃用户数用途移动端 APP 体验优化、流量场景分析2. 金融统计用户单日最大单笔交易、累计交易金额SELECTuser_id,MAX(trade_money)max_single_money,SUM(trade_money)total_trade_moneyFROMdwd_finance_tradeWHEREdt2026-05-21GROUPBYuser_id;MAX 取出单日单笔最高交易额SUM 汇总用户全天交易总额金融风控、用户资产评级、大额交易监控核心 SQL可延伸用于识别异常大额转账用户3. 计算商品点击率、转化率SELECTgoods_id,expose_num,click_num,sale_num,ROUND(click_num/expose_num,4)click_rate,ROUND(sale_num/click_num,4)convert_rateFROMdwd_product_flowWHEREdt2026-05-21;曝光量→点击量→成交量三层漏斗点击率 点击数 / 曝光数转化率 成交量 / 点击数电商运营商品爆款筛选、投放效果核心指标三、Pyspark今日重点PySpark Structured Streaming 实时流读取 Kafka 完整实战业务场景实时消费用户行为、订单、设备上报数据流做实时统计、实时预警核心依赖提交任务必须携带 Kafka 连接依赖包完整可运行代码frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportfrom_json,colfrompyspark.sql.typesimportStringType,StructType,StructField# 1. 初始化Spark会话开启流处理依赖sparkSparkSession.builder \.appName(KafkaStreamRead)\.getOrCreate()# 2. 读取Kafka实时数据流kafka_dfspark.readStream \.format(kafka)\.option(kafka.bootstrap.servers,192.168.1.100:9092)\.option(subscribe,user_behavior_topic)\.option(startingOffsets,latest)\.load()# 3. 定义JSON数据结构schemaStructType([StructField(user_id,StringType()),StructField(event_type,StringType()),StructField(event_time,StringType())])# 4. 解析Kafka JSON数据stream_dfkafka_df.select(from_json(col(value).cast(string),schema).alias(data))\.select(data.*)# 5. 控制台输出测试querystream_df.writeStream \.outputMode(append)\.format(console)\.start()query.awaitTermination()format(“kafka”) 指定数据源为 Kafka 流subscribe 订阅指定主题支持多个主题逗号分隔startingOffsets latest 读取最新数据earliest 从头读取历史数据Kafka 原始数据为二进制必须转为 string 再解析 JSONfrom_json 按照预定义结构拆分字段append 追加模式只输出新增数据流处理最常用输出模式生产实战要点线上禁止console输出写入 ClickHouse/Hive/Kafka流处理必须设置水印处理乱序数据实时任务开启checkpoint断点续跑防止丢数.option(checkpointLocation,/hdfs/checkpoint/stream_task)三种输出模式append、complete、update 适用场景Kafka 流消费丢数、重复消费解决方案实时任务断点续传原理四、算法合并两个有序数组defmerge(nums1,m,nums2,n):i,j,km-1,n-1,mn-1whilei0andj0:ifnums1[i]nums2[j]:nums1[k]nums1[i]i-1else:nums1[k]nums2[j]j-1k-1nums1[:j1]nums2[:j1]思路精讲倒序双指针合并避免数组元素覆盖从两个数组末尾开始比较取值放入末尾剩余元素直接批量填充大数据用途离线分片有序数据合并日志有序时间数据合并排序数据分区有序重组高频思想

相关新闻