原创

Delta Lake 快速入门

环境准备

Delta基于Spark构建所以对Scala和spark有版本要求

require:

  1. Apache Spark version 2.4.2 或更新
  2. Scala version 2.12 且支持2.11

Delta支持两种使用方式

  1. 交互式运行:使用Spark Shell运行
  2. 项目中运行:使用Maven依赖的方式在项目中使用

本文主要介绍在项目中使用的方式

添加依赖

在项目中的pom添加如下依赖,Delta提供了两种Scala的编译版本, 2.11 和 2.12 ,若你的项目使用Scala开发,那么需要配置对应的版本,若使用Java开发,则两种都可以使用。

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.12</artifactId>
  <version>0.1.0</version>
</dependency>

我的项目使用scala verison 2.12所以配置如上版本

代码实现

处理批数据

Delta完全兼容Spark API使用时不需要多余的代码

object QuickStart {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DeltaQuickStart")
      .master("local")
      .getOrCreate()
    val data = spark.range(0, 5)

    data.write
      .format("delta")
      .mode(saveMode = "overwrite")
    data.show()

  }
}

直接使用DataFrame就可以创建Delta的table,这里Detla会根据你的DataFrame的Schema自行推断出table的结构。我们还可以使用更多的选项使用到创建table的过程中。

可以对数据进行分区,以加快查询或具有涉及分区列的速度。要在创建delta lake表时对数据进行分区,请按列指定分区。常见的模式是按日期划分,这里我们使用time进行划分。

val deviceFrame = spark
      .read
      .json("src/main/resources/sparkresource/device.json")

deviceFrame.write
      .format(source = "delta")
      .partitionBy(colNames = "time")
      .mode(saveMode = "overwrite")
      .save(path = "temp/delta-table/device")
deviceFrame.show()

Delta支持使用标准的DataFrame API,也就说明你可以读取一个指定路径的Delta文件创建DataFrame。

val readDeltaDF = spark.read
      .format(source = "delta")
      .load("temp/delta-table/device")
readDeltaDF.show(2)

Delta支持快照,并且支持你在查询时查询指定的快照版本,该功能是有Delta中的实现。如你打算查询快照版本为0的数据,只需在read数据时指定option("versionAsOf", 0)即可。

val readSpieclVersion = spark.read
      .format(source = "delta")
      .option("versionAsOf", 0)
      .load("temp/delta-table/device")
readSpieclVersion.show(3)

Time Travel 是一个非常强大的功能,它利用Delta Lake事务日志的功能访问不再在表中的数据。删除版本0选项(或指定版本1)将使您再次看到更新的数据。

处理流数据

在Delta Lake上同样你也可以使用Structrue Streaming,Delta Lake的事务日志保证了exactly-once,即使有其他流或批处理查询针对表并发运行。

默认情况下,所有流的新的记录都是以append模式进行处理,关于流的几种处理方式可以参考链接内容。

val streamingDf = spark.readStream
      .format("rate")
      .load()
val query = streamingDf.selectExpr("value as id")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "temp/checkpoint")
      .option("mergeSchema", "true")
      .start("temp/delta-table/device")
query.awaitTermination()

同时在stream向Delta Lake table写数据时,同时可以从写入的table读取数据。您可以启动另一个流式查询,打印对delta lake表所做的所有更改。

spark.readStream.format("delta")
      .load("temp/delta-table/device")
      .writeStream
      .format("console")
      .start()

本文相关源码都已经push到我的GitHub

sev7e0
Write by sev7e0
end
本文目录