:RDD转换DataFrame与Spark SQL读写数据库)
摘要本文深入讲解Spark SQL中RDD与DataFrame的互转机制包括反射推断模式和编程式定义模式两种转换方式。同时系统讲解Spark SQL通过JDBC连接MySQL数据库的完整流程涵盖依赖导入、数据读取、数据写入等实战操作配合完整的Scala代码示例和常见错误排查。一、RDD转换DataFrameSpark提供了两种方法将RDD转换为DataFrame利用反射机制推断RDD模式以及使用编程方式定义RDD模式。两种方法各有适用场景开发者可根据实际情况选择。1.1 方法一利用反射机制推断RDD模式原理通过定义case class利用Spark的隐式转换机制自动将RDD[CaseClass]转换为DataFrame。核心特点简洁高效代码量少自动推断字段名和类型必须使用case class普通class不支持case class必须定义在main方法之外完整代码实现importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,SparkSession}objectRDDToDFByReflection{// case class必须放到main方法之外伴生对象下// 因为隐式转换时会通过 伴生对象名.case类名 来调用caseclassPerson(name:String,age:Long)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().master(local[*]).appName(RDD-To-DF-Reflection).getOrCreate()// 导入隐式转换这里的spark是SparkSession对象不是org.apache.spark包importspark.implicits._// 1. 读取文本文件解析为RDD[Person]valrdd:RDD[Person]spark.sparkContext.textFile(data/sql/people.txt).map(lineline.split(,)).map(tPerson(t(0).trim,t(1).trim.toLong))// 2. 隐式转换RDD[Person] - DataFramevaldf:DataFramerdd.toDF()// 3. 注册临时视图执行SQL查询df.createOrReplaceTempView(people)spark.sql(SELECT * FROM people WHERE age 20).show()spark.stop()}}输入数据people.txtTom, 21 Mike, 25 Andy, 18运行结果------- |name|age| ------- | Tom| 21| |Mike| 25| -------关键注意事项注意点说明错误后果case class位置必须放在main方法之外伴生对象下编译报错找不到case类implicits导入import spark.implicits._中的spark是SparkSession对象导入错误将无法隐式转换数据类型匹配case class字段类型需与数据匹配类型转换异常空值处理数值类型建议用Long/DoubleInt可能溢出数据流转图解文本文件: Tom, 21 Mike, 25 Andy, 18 ↓ textFile map map RDD[Person]: Person(Tom, 21) Person(Mike, 25) Person(Andy, 18) ↓ toDF() (隐式转换) DataFrame: ------- |name|age| ------- | Tom| 21| |Mike| 25| |Andy| 18| -------1.2 方法二使用编程方式定义RDD模式原理通过StructType定义Schema表头通过Row定义每条记录最后调用createDataFrame将两者拼接。核心特点无需定义case class更灵活适合动态Schema场景字段不确定代码稍繁琐但不易出错运行时类型安全完整代码实现importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}objectRDDToDFByProgramming{defmain(args:Array[String]):Unit{valsparkSparkSession.builder().master(local[*]).appName(RDD-To-DF-Programming).getOrCreate()// Step 1: 制作表头 - 定义Schema结构valschema:StructTypeStructType(Array(StructField(name,StringType,nullabletrue),StructField(age,IntegerType,nullabletrue)))// Step 2: 制作表中记录 - 读取文件生成RDD[Row]valrowRDD:RDD[Row]spark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attrRow(attr(0).trim,attr(1).trim.toInt))// Step 3: 拼接表头和记录 - 创建DataFramevalpeopleDF:DataFramespark.createDataFrame(rowRDD,schema)// 注册临时视图并查询peopleDF.createOrReplaceTempView(people)spark.sql(SELECT * FROM people WHERE age 20).show()spark.stop()}}运行结果同上------- |name|age| ------- | Tom| 21| |Mike| 25| -------三个核心步骤详解步骤操作代码作用1制作表头StructType(Array(StructField(...)))定义字段名、类型、可空性2制作记录RDD[Row]将原始数据转换为Row对象3拼接合并spark.createDataFrame(rowRDD, schema)将Schema和RowRDD合并为DataFrameRow对象的创建方式// 方式1按位置传入值需与Schema顺序一致valrow1Row(Tom,21)valrow2Row(Mike,25)// 方式2通过索引访问值valnamerow1.getString(0)// Tomvalagerow1.getInt(1)// 21// 方式3类型安全的获取推荐valnamerow1.getAs[String](0)valagerow1.getAs[Int](1)1.3 两种方法对比特性反射推断模式编程式定义模式代码量少较多灵活性低需预定义case class高动态定义Schema类型安全编译时检查运行时检查适用场景字段固定的结构化数据字段动态变化的数据性能相同底层都转为RDD相同错误排查相对困难相对容易case class必须不需要选择建议字段固定、类型明确 → 反射推断模式代码简洁字段动态、Schema不确定 → 编程式定义模式灵活可控二、Spark SQL读写MySQL数据库Spark SQL通过JDBC连接器可以方便地读写关系型数据库本节以MySQL为例进行讲解。2.1 导入依赖在项目的pom.xml中添加MySQL JDBC驱动依赖dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.31/version/dependency版本注意事项MySQL版本JDBC驱动类URL格式MySQL 5.xcom.mysql.jdbc.Driverjdbc:mysql://host:3306/dbMySQL 8.xcom.mysql.cj.jdbc.Driverjdbc:mysql://host:3306/db?serverTimezoneUTC注意MySQL 8.0必须使用com.mysql.cj.jdbc.Driver使用旧驱动会报错。2.2 读取MySQL数据通过spark.read.format(jdbc)读取数据库表数据。完整代码importorg.apache.spark.sql.{DataFrame,SparkSession}objectReadMySQL{defmain(args:Array[String]):Unit{valsparkSparkSession.builder().master(local[*]).appName(Read-MySQL).getOrCreate()// 方式1使用format(jdbc) option链式配置valmysqlDF:DataFramespark.read.format(jdbc).option(url,jdbc:mysql://localhost:3306/spark).option(driver,com.mysql.cj.jdbc.Driver).option(dbtable,student).option(user,root).option(password,your_password).load()// 方式2使用jdbc()方法更简洁valmysqlDF2spark.read.jdbc(jdbc:mysql://localhost:3306/spark,student,properties)mysqlDF.show()spark.stop()}}JDBC常用配置选项选项必填说明示例url✅JDBC连接URLjdbc:mysql://localhost:3306/sparkdriver✅JDBC驱动类名com.mysql.cj.jdbc.Driverdbtable✅表名或SQL子查询student或(SELECT * FROM student WHERE age20) tmpuser✅数据库用户名rootpassword✅数据库密码123456partitionColumn❌分区列用于并行读取idlowerBound❌分区下界1upperBound❌分区上界10000numPartitions❌并行分区数4fetchsize❌每次获取行数1000运行结果------------- | id|name|age|sex| ------------- | 1| Tom| 21| 男| | 2|Andy| 20| 女| -------------并行读取优化// 通过分区列实现并行读取提升大数据量读取性能valdfspark.read.format(jdbc).option(url,jdbc:mysql://localhost:3306/spark).option(driver,com.mysql.cj.jdbc.Driver).option(dbtable,student).option(user,root).option(password,123456).option(partitionColumn,id)// 按id列分区.option(lowerBound,1)// 最小id.option(upperBound,1000)// 最大id.option(numPartitions,4)// 分4个分区并行读取.load()2.3 向MySQL写入数据通过df.write.mode().jdbc()将DataFrame数据写入数据库。完整代码importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}importjava.util.PropertiesobjectWriteMySQL{defmain(args:Array[String]):Unit{valsparkSparkSession.builder().master(local[*]).appName(Write-MySQL).getOrCreate()// Step 1: 准备要写入的数据从RDD创建valrdd:RDD[Array[String]]spark.sparkContext.parallelize(Array(3 Mike 22 男,4 Cindy 23 女)).map(_.split( ))// Step 2: 定义Schema表头valschema:StructTypeStructType(Array(StructField(id,IntegerType,true),StructField(name,StringType,true),StructField(age,IntegerType,true),StructField(sex,StringType,true)))// Step 3: 创建Row RDD记录valrowRDD:RDD[Row]rdd.map(stuRow(stu(0).toInt,stu(1),stu(2).toInt,stu(3)))// Step 4: 创建DataFramevaldf:DataFramespark.createDataFrame(rowRDD,schema)// Step 5: 配置JDBC连接参数valpropnewProperties()prop.put(user,root)prop.put(password,your_password)prop.put(driver,com.mysql.cj.jdbc.Driver)// Step 6: 写入数据append模式追加df.write.mode(append).jdbc(jdbc:mysql://localhost:3306/spark,spark.student,prop)// 验证写入结果valresultspark.read.format(jdbc).option(url,jdbc:mysql://localhost:3306/spark).option(driver,com.mysql.cj.jdbc.Driver).option(dbtable,spark.student).option(user,root).option(password,your_password).load()result.show()spark.stop()}}写入模式说明模式说明适用场景append追加数据到已有表增量写入overwrite先删除表数据再写入全量覆盖ignore表存在则忽略不写入避免重复写入errorIfExists表存在则报错默认防止误操作写入前数据库表结构CREATETABLEspark.student(idINTPRIMARYKEY,nameVARCHAR(50),ageINT,sexVARCHAR(10));写入后数据-------------- | id| name|age|sex| -------------- | 1| Tom| 21| 男| | 2| Andy| 20| 女| | 3| Mike| 22| 男| | 4|Cindy| 23| 女| --------------2.4 读写数据库完整流程图读取流程: MySQL数据库 ↓ JDBC连接 (url, driver, user, password) ↓ spark.read.format(jdbc).option(...).load() ↓ DataFrame ↓ 数据处理/分析 写入流程: 原始数据 (RDD/集合/文件) ↓ 定义Schema 创建Row RDD ↓ spark.createDataFrame(rowRDD, schema) ↓ DataFrame ↓ df.write.mode(append).jdbc(url, table, properties) ↓ MySQL数据库三、常见问题排查3.1 ClassNotFoundException: com.mysql.cj.jdbc.Driver原因缺少MySQL JDBC驱动依赖或驱动类名错误。解决确认pom.xml中已添加mysql-connector-java依赖确认MySQL 8.x使用com.mysql.cj.jdbc.Driver5.x使用com.mysql.jdbc.Driver提交集群任务时使用--jars参数携带驱动jar包spark-submit--jarsmysql-connector-java-8.0.31.jar your_app.jar3.2 时区错误The server time zone value ‘xxx’ is unrecognized原因MySQL 8.0默认时区与JDBC驱动不匹配。解决在URL中添加时区参数.option(url,jdbc:mysql://localhost:3306/spark?serverTimezoneUTC)3.3 写入时表不存在原因目标表未提前创建。解决方式1提前在MySQL中创建表方式2使用df.write.mode(overwrite).option(createTableOptions, ...).jdbc(...)自动创建3.4 数据类型不匹配原因DataFrame字段类型与数据库表字段类型不兼容。解决检查Schema定义与数据库表结构是否一致注意Spark的IntegerType对应MySQL的INTLongType对应BIGINT字符串长度不足时调整MySQL字段的VARCHAR长度四、总结本文系统讲解了RDD与DataFrame的转换以及Spark SQL的数据库操作核心知识点RDD转DataFrame两种方法反射推断模式定义case class import spark.implicits._rdd.toDF()编程式定义模式StructType定义Schema RDD[Row]创建记录 createDataFrame()拼接Spark SQL读取MySQL导入mysql-connector-java依赖使用spark.read.format(jdbc).option(...).load()关键参数url、driver、dbtable、user、passwordSpark SQL写入MySQL准备数据为DataFrame格式使用df.write.mode(append).jdbc(url, table, properties)支持append/overwrite/ignore/errorIfExists四种模式方法选择指南场景推荐方法原因字段固定、类型明确反射推断模式代码简洁自动推断字段动态、Schema不确定编程式定义模式灵活可控运行时安全读取数据库全表format(“jdbc”)标准JDBC方式大数据量读取JDBC 分区参数并行读取提升性能增量写入数据库write.mode(“append”)不破坏已有数据全量覆盖写入write.mode(“overwrite”)替换旧数据