实时大数据计算中,Spark的滑动窗口和允许消息迟到机制

作者:sword_csdn日期:2025/11/13

目录

  • 1.开发环境
  • 2.几句话先概括
  • 3.例子说明
    • 3.1.参数配置
    • 3.2.窗口是如何产生的
    • 3.3.Trigger触发机制
    • 3.4.迟到的消息数据

最近做了个实时大数据分析的项目,发现很多东西都忘记了,属实没有好好整理笔记之过,趁眼下闲暇,做个回忆和记录。

1.开发环境

这次环境采用Java17+,Scala2.13,Spark的版本为4.0.0,且基于Kafka创建读取流。其它环境可参考以下maven pom。

1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0"
3         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5
6    <modelVersion>4.0.0</modelVersion>
7    <groupId>com.edata.bigdata</groupId>
8    <artifactId>edata</artifactId>
9    <packaging>pom</packaging>
10    <version>1.0</version>
11    <properties>
12        <java.version>17</java.version>
13        <spark.version>4.0.0</spark.version>
14        <hadoop.version>3.4.1</hadoop.version>
15        <flink.version>2.0.0</flink.version>
16        <scala.version>2.13</scala.version>
17        <zookeeper.version>3.9.4</zookeeper.version>
18        <maven.compiler.version>3.8.1</maven.compiler.version>
19    </properties>
20    <!--其它配置-->
21</project>
22

该pom所在工程是我的私人仓库,读者可以根据自身情况调整版本号

2.几句话先概括

Spark Streaming的滑动窗口和允许消息迟到机制是由以下四个参数决定,严格来说是三个,trigger不算。

  1. windowDuration 控制“聚合粒度”。
  2. slideDuration 控制“聚合频率”。
  3. watermark控制“迟到的数据还能否被计算”。
  4. trigger控制“何时处理一个微批”。

3.例子说明

实际上每一条消息都自带时间字段,该字段称为事件时间(Event Time),一般是在数据产生时就被打上的时间戳,与Spark,kafka无关,是Spark是否将其纳入窗口计算的依据。
假设连续的消息组成了时间序列为(每一秒产生一条数据):

1事件时间(Event Time)00:00  00:01  00:02  ...  00:09  00:10  00:11  ...  00:19  00:20  ...
2

Spark处理消息时也会产生连续的处理时间(Processing Time),假设为:

1数据到达 (Processing Time) →  09:00  09:01  09:02  ...  09:09  09:10  09:11  ...  09:19  09:20  ...
2

事件时间一开始可能是由其他字段名进行标识,例如以下代码。

1Dataset<Row> windowed = data
2                .withColumn("event_time", col("timestamp").cast("timestamp"))
3                .withColumn("data", col("value").cast("string"))
4                .select("data", "event_time");
5

原本的时间字段为timestamp,且为字符串,我转成了timestamp类型,且更改了字段名为event_time,真正的消息体原本是value字段,我改成了data,最终仅选择data和event_time进行后续的计算。

3.1.参数配置

假设在本例子中的各项重要参数的配置如下所示。

参数说明
windowDuration10 seconds每个窗口覆盖 10秒分钟事件时间
slideDuration5 seconds每 5 秒产生一个新窗口(重叠)
triggerProcessingTime(“1 second”)每 1 秒 Spark 处理一次微批
watermark20 seconds迟到 ≤20秒的数据仍可进入窗口

参数的设置方法如下所示

1public Dataset<Row> applyWindowAndWatermark(Dataset<Row> data,
2                                                    String windowDuration,
3                                                    String slideDuration,
4                                                    String watermarkDelay) {
5    //将timestamp列的字符串转成timestamp格式,并改名为“event_time”
6    //将value列的数据解码,转成字符串,并改名为“data”
7    //选择data,event_time两列,其他列丢掉
8    Dataset<Row> windowed = data
9            .withColumn("event_time", col("timestamp").cast("timestamp"))
10            .withColumn("data", col("value").cast("string"))
11            .select("data", "event_time");
12    if (watermarkDelay != null && !watermarkDelay.isBlank()) {
13        windowed = windowed.withWatermark("event_time", watermarkDelay);
14    }
15    Column windowCol = window(
16            col("event_time"),
17            windowDuration,
18            slideDuration != null ? slideDuration : windowDuration
19    );
20    return windowed.withColumn("window", windowCol);
21}
22

3.2.窗口是如何产生的

根据参数配置, Spark每5秒(slideDuration)产生一个10秒(windowDuration)。通过示意图来表达,大概如下所示

1事件时间(Event Time)            00:00 ... 00:05 ... 00:10 ... 00:15 ... 00:20 ... 00:25 ...
2数据到达(Processing Time)       09:00 ... 09:05 ... 09:10 ... 09:15 ... 09:20 ... 09:25 ...
3处理窗口(Processing windows):     [--------10s--------)
4                                   ----5s----[--------10s--------)
5                                             ----5s----[--------10s--------)
6                                                       ----5s----[--------10s--------)
7

通过上图可以知道,生成的窗口区间分别为W1:[9:00,9:10),W2:[9:05,9:15),W3:[9:10,9:20),W4:[9:15,9:25),在这些窗口内,会获取对应的消息进行计算。窗口生成的过程也叫做窗口生成时间轴。
通过观察可以发现,窗口之间有重叠的部分,如果不希望窗口内计算的消息有所重叠,则将slideDuration和windowDuration设置为相等即可。

3.3.Trigger触发机制

Trigger的作用是多久“输出”一次计算结果,这个“输出”可能是多方面的,可能是输出到console,可能是写入文件,可能是写入数据库。假设是输出到console,且按照触发时间是3s来算,我们可以得到触发器的触发时间序列如下

1事件时间(Event Time)           :00:00 ... 00:05 ... 00:10 ... 00:15 ... 00:20 ... 00:25 ...
2数据到达(Processing Time)      :09:00 ... 09:05 ... 09:10 ... 09:15 ... 09:20 ... 09:25 ...
3处理窗口(Processing windows)   :[--------10s--------)
4                                |  ----5s----[--------10s--------)
5                                |            ----5s----[--------10s--------)
6                                |                      ----5s----[--------10s--------)
7触发序列(Trigger Time)         :  09:03  09:06  09:09  09:12  09:15  09:18  09:21......
8

从以上时间轴我们可以看到,触发器分别在09:03,09:06,09:09,09:12,09:15,09:18等时间点开始触发。在这些时间点,有以下结论

  • 在09:03,输出W1的部分计算结果。
  • 在09:06,输出W1,W2的部分计算结果。
  • 在09:09,输出W1,W2的部分计算结果。
  • 在09:12,输出W1的完整计算结果(关闭),输出W2,W3的部分计算结果
  • 在09:15,输出W2的完整计算结果(关闭),输出W3,W4的部分计算结果
  • 在09:18,输出W3,W4的部分计算结果
  • 在09:21,输出W3的完整计算结果,输出W4的部分计算结果。
  • … …

3.4.迟到的消息数据

在实时计算中,有时会遇到某些消息的事件时间与数据到达时间相差很远,此时可以通过设置watermask来决定是否让该数据回到自己的窗口进行计算。
假设允许迟到的时间为10s,并且某条迟到消息的事件时间为00:02,到达时间为09:15。则
(1)Spark先判断该消息的所属窗口W1:[00:00,00:10),数据尝试回到W1进行计算。
(2)由于09:02+10s=09:12,说明该消息最晚应该在09:12分到达,但实际上它09:15才到达,所以丢弃。
如果该条数据的事件时间为00:07,到达时间为09:15,则
(1)该条数据所属窗口为W1:[00:00,00:10)和W2:[00:05,00:15)。
(2)由于09:07+10s=09:17,说明该消息最晚可以在09:17分到达,09:15分仍在许可范围内。


实时大数据计算中,Spark的滑动窗口和允许消息迟到机制》 是转载文章,点击查看原文


相关推荐


centos运维常用命令
KV_T2025/11/12

CentOS 服务器运维中,以下是按场景分类的常用命令,涵盖系统监控、用户管理、服务管理、文件操作等核心场景,适合日常运维参考: 一、系统状态监控 查看系统负载 uptime # 显示系统运行时间、用户数、1/5/15分钟负载 w # 更详细的负载信息,包括登录用户和进程 CPU 监控 top # 实时查看CPU、内存占用(按q退出) htop # 交互式CPU/内存监控(需安装:yum install htop) lscpu


C++中实现多线程编程
Alex艾力的IT数字空间2025/11/10

一、基于POSIX线程库(pthreads) 适用场景:Linux/Unix系统、需要底层线程控制或兼容旧代码。 核心步骤: 包含头文件:#include <pthread.h> 定义线程函数:返回类型为void*,参数为void*指针。 创建线程:使用pthread_create函数。 等待线程结束:使用pthread_join回收资源。 示例代码: #include <iostream> #include <pthread.h> void* thread


微信小程序开发案例 | 个人相册小程序(上)
志昂张呀2025/11/8

阶段案例-个人相册小程序 01、准备工作 1 导入代码包 为了节约时间,这里我们直接把完成的小程序空白模板代码包templateDemo复制一份并重命名为demo07_myAlbum, 导入开发工具等待改造。 2 启动服务器 这里我们使用本地电脑安装phpStudy v8.1套件来模拟服务器效果,本次阶段案例不需要使用MySQL数据库,因此直接启动Apache或者Nginx来模拟Web服务器即可。 以Nginx为例,启动效果如图7-8所示。 ■ 图7-8  ph


Python 的内置函数 getattr
IMPYLH2025/11/6

Python 内建函数列表 > Python 的内置函数 getattr def getattr(obj, name:str): ''' 获取属性的值 :param obj: 一个对象 :param name: 属性的名字 ''' Python 的内置函数 getattr 是一个非常有用的反射工具,主要用于动态获取对象的属性或方法。其基本语法为:getattr(object, name[, default]),其中 object 是目标对象,nam


Flash游戏破解参考
FD_20132025/11/1

编 者按:工作、学习之余,玩一会Flash小游戏,放松一下紧绷的神经,是不少朋友的最爱。不过,大部分Flash小游戏并不提供SWF文件的下载,想玩游 戏就必需打开网页,也给我们带来了不小的麻烦。当然,小小的问题难不倒我们,通过各种途径,我们依然可以获取各种被加密的SWF文件,从而无需联网,便能 在本地运行Flash小游戏。 对症下药,玩转Flash游戏下载 Flash游戏下载,关键便在于获取SWF文件的真实URL地址。由于不同类型的Flash游戏,采用的加密、运行方式各不相同,因此必需采


南京大学LLM开发基础(四)MoE, LoRA, 数的精度 + MLP层实验
nju_spy2025/10/30

https://njudeepengine.github.io/llm-course-lecture/2025/lecture8.html#1 目录 1. Mixture-of-experts (MoE) 1.1 优势 1.2 结构 1.3 训练 2. Low-rank adaptation (LoRA) 3. 数的精度 -- 混合精度 + 量化操作 Task1:DenseMLPWithLoRA 一、任务背景 二、任务要求 Task2:Sparse MLP 1.


【C++list】底层结构、迭代器核心原理与常用接口实现全解析
m0_748233642025/10/27

一、官方源码的探究 在实现list的底层前,我们先看下官方的核心成员变量,link_type node,其中link_type是list_node*,也就是说是节点的指针 在这里插入图片描述 下面我们看下其的初始化,在空初始化中,链表为空并不是把节点的指针给成空,而是给了个节点,让其的前驱指针和后继指针均指向自己,在C语言阶段的数据结构中我们便知道这个节点是哨兵位头节点 注意: 这里创捷新的节点不是new的,而是使用get_node出来的,这里是由于内存池的原因,后续再介绍


从复杂到高效:QtitanNavigation助力金融系统界面优化升级
Aevget2025/10/24

QtitanNavigation 组件模拟Microsoft Dynamics CRM-2016 / Office 365导航界面和一组控件,来改善Qt.C ++应用程序的用户体验。QtitanNavigation结合用户界面构建“Ribbon UI”和“Side Bar”的各种示例,可以更好地在您的应用程序中导航,使用户更直观地访问应用程序的某些部分。因此,它允许同时显示更多的信息,并让您高效地查看所有实体(工作区域,网格或其他项目),滚动次数更少,点击次数更少。与我们的其他解决方案一样,Qt


Rust 与 Go – 比较以及每个如何满足您的需求
std78792025/10/22

Rust 和 Go 是新的编程语言。每个都解决了以前编程语言(例如 C 和 C++)固有的问题。 如果您不确定哪一个适合您的项目,请查看这篇比较文章,我们将在其中更深入地研究 Rust 与 Go。 在比较结束时,您将清楚地了解 Rust 和 Go 提供的功能。我们将介绍它们的主要特点、优缺点、异同,并根据您的要求讨论正确的选择。 除此之外,我们还将争辩说,大多数团队可能能够同时使用这两种语言来支持他们的应用程序,并且比只坚持使用一种编程语言获得好处。 那么,为什么还要等呢?让我们


node.js上传图片接口
郏国上2025/10/21

node.js需要使用koa-multer库来实现上传图片接口。 实际上先通过koa-multer下载到本地指定目录,然后上传到阿里云(部分格式图片需要转换成网络格式图片jgp再上传)。 首先在系统启动文件引入注册路由: app.use(BodyParser({ 'formLimit':'3mb', 'jsonLimit':'3mb', 'textLimit':'3mb' })); // 注意顺序,必须body parser在前, router在后 app.use(rou

首页编辑器站点地图

Copyright © 2025 聚合阅读

License: CC BY-SA 4.0