目录
- 1.开发环境
- 2.几句话先概括
- 3.例子说明
-
- 3.1.参数配置
- 3.2.窗口是如何产生的
- 3.3.Trigger触发机制
- 3.4.迟到的消息数据
最近做了个实时大数据分析的项目,发现很多东西都忘记了,属实没有好好整理笔记之过,趁眼下闲暇,做个回忆和记录。
1.开发环境
这次环境采用Java17+,Scala2.13,Spark的版本为4.0.0,且基于Kafka创建读取流。其它环境可参考以下maven pom。
1<?xml version="1.0" encoding="UTF-8"?> 2<project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 6 <modelVersion>4.0.0</modelVersion> 7 <groupId>com.edata.bigdata</groupId> 8 <artifactId>edata</artifactId> 9 <packaging>pom</packaging> 10 <version>1.0</version> 11 <properties> 12 <java.version>17</java.version> 13 <spark.version>4.0.0</spark.version> 14 <hadoop.version>3.4.1</hadoop.version> 15 <flink.version>2.0.0</flink.version> 16 <scala.version>2.13</scala.version> 17 <zookeeper.version>3.9.4</zookeeper.version> 18 <maven.compiler.version>3.8.1</maven.compiler.version> 19 </properties> 20 <!--其它配置--> 21</project> 22
该pom所在工程是我的私人仓库,读者可以根据自身情况调整版本号
2.几句话先概括
Spark Streaming的滑动窗口和允许消息迟到机制是由以下四个参数决定,严格来说是三个,trigger不算。
- windowDuration 控制“聚合粒度”。
- slideDuration 控制“聚合频率”。
- watermark控制“迟到的数据还能否被计算”。
- trigger控制“何时处理一个微批”。
3.例子说明
实际上每一条消息都自带时间字段,该字段称为事件时间(Event Time),一般是在数据产生时就被打上的时间戳,与Spark,kafka无关,是Spark是否将其纳入窗口计算的依据。
假设连续的消息组成了时间序列为(每一秒产生一条数据):
1事件时间(Event Time)00:00 00:01 00:02 ... 00:09 00:10 00:11 ... 00:19 00:20 ... 2
Spark处理消息时也会产生连续的处理时间(Processing Time),假设为:
1数据到达 (Processing Time) → 09:00 09:01 09:02 ... 09:09 09:10 09:11 ... 09:19 09:20 ... 2
事件时间一开始可能是由其他字段名进行标识,例如以下代码。
1Dataset<Row> windowed = data 2 .withColumn("event_time", col("timestamp").cast("timestamp")) 3 .withColumn("data", col("value").cast("string")) 4 .select("data", "event_time"); 5
原本的时间字段为timestamp,且为字符串,我转成了timestamp类型,且更改了字段名为event_time,真正的消息体原本是value字段,我改成了data,最终仅选择data和event_time进行后续的计算。
3.1.参数配置
假设在本例子中的各项重要参数的配置如下所示。
| 参数 | 值 | 说明 |
|---|---|---|
| windowDuration | 10 seconds | 每个窗口覆盖 10秒分钟事件时间 |
| slideDuration | 5 seconds | 每 5 秒产生一个新窗口(重叠) |
| trigger | ProcessingTime(“1 second”) | 每 1 秒 Spark 处理一次微批 |
| watermark | 20 seconds | 迟到 ≤20秒的数据仍可进入窗口 |
参数的设置方法如下所示
1public Dataset<Row> applyWindowAndWatermark(Dataset<Row> data, 2 String windowDuration, 3 String slideDuration, 4 String watermarkDelay) { 5 //将timestamp列的字符串转成timestamp格式,并改名为“event_time” 6 //将value列的数据解码,转成字符串,并改名为“data” 7 //选择data,event_time两列,其他列丢掉 8 Dataset<Row> windowed = data 9 .withColumn("event_time", col("timestamp").cast("timestamp")) 10 .withColumn("data", col("value").cast("string")) 11 .select("data", "event_time"); 12 if (watermarkDelay != null && !watermarkDelay.isBlank()) { 13 windowed = windowed.withWatermark("event_time", watermarkDelay); 14 } 15 Column windowCol = window( 16 col("event_time"), 17 windowDuration, 18 slideDuration != null ? slideDuration : windowDuration 19 ); 20 return windowed.withColumn("window", windowCol); 21} 22
3.2.窗口是如何产生的
根据参数配置, Spark每5秒(slideDuration)产生一个10秒(windowDuration)。通过示意图来表达,大概如下所示
1事件时间(Event Time) 00:00 ... 00:05 ... 00:10 ... 00:15 ... 00:20 ... 00:25 ... 2数据到达(Processing Time) 09:00 ... 09:05 ... 09:10 ... 09:15 ... 09:20 ... 09:25 ... 3处理窗口(Processing windows): [--------10s--------) 4 ----5s----[--------10s--------) 5 ----5s----[--------10s--------) 6 ----5s----[--------10s--------) 7
通过上图可以知道,生成的窗口区间分别为W1:[9:00,9:10),W2:[9:05,9:15),W3:[9:10,9:20),W4:[9:15,9:25),在这些窗口内,会获取对应的消息进行计算。窗口生成的过程也叫做窗口生成时间轴。
通过观察可以发现,窗口之间有重叠的部分,如果不希望窗口内计算的消息有所重叠,则将slideDuration和windowDuration设置为相等即可。
3.3.Trigger触发机制
Trigger的作用是多久“输出”一次计算结果,这个“输出”可能是多方面的,可能是输出到console,可能是写入文件,可能是写入数据库。假设是输出到console,且按照触发时间是3s来算,我们可以得到触发器的触发时间序列如下
1事件时间(Event Time) :00:00 ... 00:05 ... 00:10 ... 00:15 ... 00:20 ... 00:25 ... 2数据到达(Processing Time) :09:00 ... 09:05 ... 09:10 ... 09:15 ... 09:20 ... 09:25 ... 3处理窗口(Processing windows) :[--------10s--------) 4 | ----5s----[--------10s--------) 5 | ----5s----[--------10s--------) 6 | ----5s----[--------10s--------) 7触发序列(Trigger Time) : 09:03 09:06 09:09 09:12 09:15 09:18 09:21...... 8
从以上时间轴我们可以看到,触发器分别在09:03,09:06,09:09,09:12,09:15,09:18等时间点开始触发。在这些时间点,有以下结论
- 在09:03,输出W1的部分计算结果。
- 在09:06,输出W1,W2的部分计算结果。
- 在09:09,输出W1,W2的部分计算结果。
- 在09:12,输出W1的完整计算结果(关闭),输出W2,W3的部分计算结果
- 在09:15,输出W2的完整计算结果(关闭),输出W3,W4的部分计算结果
- 在09:18,输出W3,W4的部分计算结果
- 在09:21,输出W3的完整计算结果,输出W4的部分计算结果。
- … …
3.4.迟到的消息数据
在实时计算中,有时会遇到某些消息的事件时间与数据到达时间相差很远,此时可以通过设置watermask来决定是否让该数据回到自己的窗口进行计算。
假设允许迟到的时间为10s,并且某条迟到消息的事件时间为00:02,到达时间为09:15。则
(1)Spark先判断该消息的所属窗口W1:[00:00,00:10),数据尝试回到W1进行计算。
(2)由于09:02+10s=09:12,说明该消息最晚应该在09:12分到达,但实际上它09:15才到达,所以丢弃。
如果该条数据的事件时间为00:07,到达时间为09:15,则
(1)该条数据所属窗口为W1:[00:00,00:10)和W2:[00:05,00:15)。
(2)由于09:07+10s=09:17,说明该消息最晚可以在09:17分到达,09:15分仍在许可范围内。
《实时大数据计算中,Spark的滑动窗口和允许消息迟到机制》 是转载文章,点击查看原文。

