流批一体在京东的探索与实践

01 整体思考

提到流批一体,不得不提传统的大数据平台 —— Lambda 架构。它能够有效地支撑离线和实时的数据开发需求,但它流和批两条数据链路割裂所导致的高开发维护成本以及数据口径不一致是无法忽视的缺陷。

通过一套数据链路来同时满足流和批的数据处理需求是最理想的情况,即流批一体。此外我们认为流批一体还存在一些中间阶段,比如只实现计算的统一或者只实现存储的统一也是有重大意义的。

以只实现计算统一为例,有一些数据应用的实时性要求比较高,比如希望端到端的数据处理延时不超过一秒钟,这对目前开源的、适合作为流批统一的存储来说是一个很大的挑战。以数据湖为例,它的数据可见性与 commit 的间隔相关,进而与 Flink 做 checkpoint 的时间间隔相关,此特性结合数据处理链路的长度,可见做到端到端一秒钟的处理并不容易。因此对于这类需求,只实现计算统一也是可行的。通过计算统一去降低用户的开发及维护成本,解决数据口径不一致的问题。

在流批一体技术落地的过程中,面临的挑战可以总结为以下 4 个方面:

  • 首先是数据实时性。如何把端到端的数据时延降低到秒级别是一个很大的挑战,因为它同时涉及到计算引擎及存储技术。它本质上属于性能问题,也是一个长期目标。
  • 第二个挑战是如何兼容好在数据处理领域已经广泛应用的离线批处理能力。此处涉及开发和调度两个层面的问题,开发层面主要是复用的问题,比如如何复用已经存在的离线表的数据模型,如何复用用户已经在使用的自定义开发的 Hive UDF 等。调度层面的问题主要是如何合理地与调度系统进行集成。
  • 第三个挑战是资源及部署问题。比如通过不同类型的流、批应用的混合部署来提高资源利用率,以及如何基于 metrics 来构建弹性伸缩能力,进一步提高资源利用率。
  • 最后一个挑战也是最困难的一个:用户观念。大多数用户对于比较新的技术理念通常仅限于技术交流或者验证,即使验证之后觉得可以解决实际问题,也需要等待合适的业务来试水。这个问题也催生了一些思考,平台侧一定要多站在用户的视角看待问题,合理地评估对用户的现有技术架构的改动成本以及用户收益、业务迁移的潜在风险等。

上图是京东实时计算平台的全景图,也是我们实现流批一体能力的载体。中间的 Flink 基于开源社区版本深度定制。基于该版本构建的集群,外部依赖包含三个部分,JDOS、HDFS/CFS 和 Zookeeper。

  • JDOS 是京东的 Kubernetes 平台,目前我们所有 Flink 计算任务容器化的,都运行在这套平台之上;
  • Flink 的状态后端有 HDFS 和 CFS 两种选择,其中 CFS 是京东自研的对象存储;
  • Flink 集群的高可用是基于 Zookeeper 构建的。

在应用开发方式方面,平台提供 SQL 和 Jar 包两种方式,其中 Jar 的方式支持用户直接上传 Flink 应用 Jar 包或者提供 Git 地址由平台来负责打包。除此之外我们平台化的功能也相对比较完善,比如基础的元数据服务、SQL 调试功能,产品端支持所有的参数配置,以及基于 metrics 的监控、任务日志查询等。

连接数据源方面,平台通过 connector 支持了丰富的数据源类型,其中 JDQ 基于开源 Kafka 定制,主要应用于大数据场景的消息队列;JMQ 是京东自研,主要应用于在线系统的消息队列;JimDB 是京东自研的分布式 KV 存储。

在当前 Lambda 架构中,假设实时链路的数据存储在 JDQ,离线链路的数据存在 Hive 表中,即便计算的是同一业务模型,元数据的定义也常常是存在差异的,因此我们引入统一的逻辑模型来兼容实时离线两边的元数据。

在计算环节,通过 FlinkSQL 结合 UDF 的方式来实现业务逻辑的流批统一计算,此外平台会提供大量的公用 UDF,同时也支持用户上传自定义 UDF。针对计算结果的输出,我们同样引入统一的逻辑模型来屏蔽流批两端的差异。对于只实现计算统一的场景,可以将计算结果分别写入流批各自对应的存储,以保证数据的实时性与先前保持一致。

对于同时实现计算统一和存储统一的场景,我们可以将计算的结果直接写入到流批统一的存储。我们选择了 Iceberg 作为流批统一的存储,因为它拥有良好的架构设计,比如不会绑定到某一个特定的引擎等。

在兼容批处理能力方面,我们主要进行了以下三个方面的工作:

第一,复用离线数仓中的 Hive 表。

以数据源端为例,为了屏蔽上图左侧图中流、批两端元数据的差异,我们定义了逻辑模型 gdm_order_m 表,并且需要用户显示地指定 Hive 表和 Topic 中的字段与这张逻辑表中字段的映射关系。这里映射关系的定义非常重要,因为基于 FlinkSQL 的计算只需面向这张逻辑表,而无需关心实际的 Hive 表与 Topic 中的字段信息。在运行时通过 connector 创建流表和批表的时候,逻辑表中的字段会通过映射关系被替换成实际的字段。

在产品端,我们可以给逻辑表分别绑定流表和批表,通过拖拽的方式来指定字段之间的映射关系。这种模式使得我们的开发方式与之前有所差异,之前的方式是先新建一个任务并指定是流任务还是批任务,然后进行 SQL 开发,再去指定任务相关的配置,最后发布任务。而在流批一体模式下,开发模式变为了首先完成 SQL 的开发,其中包括逻辑的、物理的 DDL 的定义,以及它们之间的字段映射关系的指定,DML 的编写等,然后分别指定流批任务相关的配置,最后发布成流批两个任务。

第二,与调度系统打通。

离线数仓的数据加工基本是以 Hive/Spark 结合调度的模式,以上图中居中的图为例,数据的加工被分为 4 个阶段,分别对应数仓的 BDM、FDM、GDM 和 ADM 层。随着 Flink 能力的增强,用户希望把 GDM 层的数据加工任务替换为 FlinkSQL 的批任务,这就需要把 FlinkSQL 批任务嵌入到当前的数据加工过程中,作为中间的一个环节。

为了解决这个问题,除了任务本身支持配置调度规则,我们还打通了调度系统,从中继承了父任务的依赖关系,并将任务自身的信息同步到调度系统中,支持作为下游任务的父任务,从而实现了将 FlinkSQL 的批任务作为原数据加工的其中一个环节。

第三,对用户自定义的 Hive UDF、UDAF 及 UDTF 的复用。

对于现存的基于 Hive 的离线加工任务,如果用户已经开发了 UDF 函数,那么最理想的方式是在迁移 Flink 时对这些 UDF 进行直接复用,而不是按照 Flink UDF 定义重新实现。

在 UDF 的兼容问题上,针对使用 Hive 内置函数的场景,社区提供了 load hive modules 方案。如果用户希望使用自己开发的 Hive UDF,可以通过使用 create catalog、use catalog、create function,最后在 DML 中调用的方式来实现, 这个过程会将 Function 的信息注册到 Hive 的 Metastore 中。从平台管理的角度,我们希望用户的 UDF 具备一定的隔离性,限制用户 Job 的粒度,减少与 Hive Metastore 交互以及产生脏函数元数据的风险。

此外,当元信息已经被注册过,希望下次能在 Flink 平台端正常使用,如果不使用 if not exist 语法,通常需要先 drop function,再进行 create 操作。但是这种方式不够优雅,同时也对用户的使用方式有限制。另一种解决方法是用户可以注册临时的 Hive UDF,在 Flink1.12 中注册临时 UDF 的方式是 create temporary function,但是该 Function 需要实现 UserDefinedFunction 接口后才能通过后面的校验,否则会注册失败。

所以我们并没有使用 create temporary function,而是对 create function 做了一些调整,扩展了 ExtFunctionModule,将解析出来的 FunctionDefinition 注册到 ExtFunctionModule 中,做了一次 Job 级别的临时注册。这样的好处就是不会污染 Hive Metastore,提供了良好的隔离性,同时也没有对用户的使用习惯产生限制,提供了良好的体验。

不过这个问题在社区 1.13 的版本已经得到了综合的解决。通过引入 Hive 解析器等扩展,已经可以把实现 UDF、GenericUDF 接口的自定义 Hive 函数通过 create temporary function 语法进行注册和使用。

资源占用方面,流处理和批处理是天然错峰的。对于批处理,离线数仓每天 0 点开始计算过去一整天的数据,所有的离线报表的数据加工会在第二天上班前全部完成,所以通常 00:00 到 8:00 是批计算任务大量占用资源的时间段,而这个时间段通常在线的流量都比较低。流处理的负载与在线的流量是正相关的,所以这个时间段流处理的资源需求是比较低的。上午 8 点到晚上 0 点,在线的流量比较高,而这个时间段批处理的任务大部分都不会被触发执行。

基于这种天然的错峰,我们可以通过在专属的 JDOS Zone 中进行不同类型的流批应用的混部来提升资源的使用率,并且如果统一使用 Flink 引擎来处理流批应用,资源的使用率会更高。

同时为了使应用可以基于流量进行动态调整,我们还开发了自动弹性伸缩的服务 (Auto-Scaling Service)。它的工作原理如下:运行在平台上的 Flink 任务上报 metrics 信息到 metrics 系统,Auto-Scaling Service 会基于 metrics 系统中的一些关键指标,比如 TaskManager 的 CPU 使用率、任务的背压情况等来判定任务是否需要增减计算资源,并把调整的结果反馈给 JRC 平台,JRC 平台通过内嵌的 fabric 客户端将调整的结果同步到 JDOS 平台,从而完成对 TaskManager Pod 个数的调整。此外,用户可以在 JRC 平台上通过配置来决定是否为任务开启此功能。

上图右侧图表是我们在 JDOS Zone 中进行流批混部并结合弹性伸缩服务试点测试时的 CPU 使用情况。可以看到 0 点流任务进行了缩容,将资源释放给批任务。我们设置的新任务在 2 点开始执行,所以从 2 点开始直到早上批任务结束这段时间,CPU 的使用率都比较高,最高到 80% 以上。批任务运行结束后,在线流量开始增长时,流任务进行了扩容,CPU 的使用率也随之上升。

02 技术方案及优化

流批一体是以 FlinkSQL 为核心载体,所以我们对于 FlinkSQL 的底层能力也做了一些优化,主要分为维表优化、join 优化、window 优化和 Iceberg connector 优化几个方面。

首先是维表相关的几个优化。目前社区版本的 FlinkSQL 只支持部分数据源 sink 算子并行度的修改,并不支持 source 以及中间处理算子的并行度修改。

假设一个 FlinkSQL 任务消费的 topic 有 5 个分区,那么下游算子的实际并行度是 5,算子之间是 forward 的关系。对于数据量比较大的维表 join 场景,为了提高效率,我们希望并行度高一些,希望可以灵活设置它的并行度而不与上游的分区数绑定。

基于此,我们开发了预览拓扑的功能,不论是 Jar 包、SQL 任务都可以解析并生成 StreamGraph 进行预览,进一步还能支持修改分组、算子 chain 的策略、并行度、设置 uid 等。

借助这个功能,我们还可以调整维表 join 算子的并行度,并且将分区策略由 forward 调整为 rebalance,然后把这些调整后的信息更新到 StreamGraph。此外我们还实现了动态 rebalance 策略,可以基于 backLog 去判断下游分区中的负载情况,从而选择最优的分区进行数据分发。

为了提升维表 join 的性能,我们对所有平台支持的维表数据源类型都实现了异步 IO 并支持在内存中做缓存。不论是原生的 forward 方式还是 rebalance 方式,都存在缓存失效和替换的问题。那么,如何提高维表缓存的命中率以及如何降低维表缓存淘汰的操作?

以原生的 forward 方式为例,forward 意味着每个 subtask 缓存着随机的维表数据,与 joinkey 的值有关。对维表的 joinkey 做哈希,就能保证下游每一个算子缓存着与 joinkey 相关的、不同的维表数据,从而有效地提升缓存的命中率。

在实现层面我们新增了一条叫 StreamExecLookupHashJoinRule 的优化规则,并且把它添加到物理 rewrite 的阶段。在最底层的扫描数据 StreamExecTableSourceScan 和维表 join StreamExecLookupJoin 之间增加了一个 StreamExecChange 节点,由它来完成对维表数据的哈希操作。可以通过在定义维表 DDL 时指定 lookup.hash.enable=true 来开启这个功能。

我们对于 forward、rebalance、哈希三种方式开启缓存,进行了相同场景的性能测试。主表一亿条数据去 join 维表的 1 万条数据,在不同的计算资源下,rebalance 相较于原生的 forward 方式有数倍的性能提升,而哈希相较于 rebalance 的方式又有数倍的性能提升,整体效果是比较可观的。

针对维表 join 单条查询效率比较低的问题,解决思路也很简单,就是攒批,按照微批的方式去访问 (mini-batch)。可以在 DDL 的定义中通过设置 lookup.async.batch.size 的值来指定批次的大小。除此之外,我们还在时间维度上引入了 Linger 机制来做限制,防止极端场景出现迟迟无法攒够一批数据而导致时延比较高的情况,可以通过在 DDL 的定义中设置 lookup.async.batch.linger 的值来指定等待时间。

经过测试,mini-batch 的方式能够带来 15% ~ 50% 的性能提升。

Interval join 也是生产上一个使用比较频繁的场景,这类业务的特点是流量非常大,比如 10 分钟百 GB 级别。Interval join 两条流的数据都会缓存在内部 state 中,任意一边的数据到达都会获取对面流相应时间范围的数据去执行 join function,所以这种大流量的任务会有非常大的状态。

对此我们选用了 RocksDB 来做状态后端,但是进行了调参优化后效果仍不理想,任务运行一段时间之后会出现背压,导致 RocksDB 的性能下降,CPU 的使用率也比较高。

通过分析我们发现,根本原因与 Flink 底层扫描 RocksDB 是基于前缀的扫描方式有关。因此解决思路也很简单,根据查询条件,精确地构建查询的上下界,把前缀查询变为范围查询。查询条件依赖的具体上下界的 key 变为了 keyGroup+joinKey+namespace+timestamp[lower,upper],可以精确地只查询某些 timestamp 之间的数据,任务的背压问题也得到了解决。而且数据量越大,这种优化带来的性能提升越明显。

Regular join 使用状态来保存所有历史数据,所以如果流量大也会导致状态数据比较大。而它保存状态是依赖 table.exec.state.ttl 参数,这个参数值比较大也会导致状态大。

针对这种场景,我们改为使用外部存储JimDB存储状态数据。目前只做了 inner join 的实现,实现机制如下:两边的流对 join 到的数据进行下发的同时,将所有数据以 mini-batch 的方式写入到 JimDB,join 时会同时扫描内存中以及 JimDB 中对应的数据。此外,可以通过 JimDB ttl 的机制来实现 table.exec.state.ttl 功能,从而完成对过期数据的清理。

上述实现方式优缺点都比较明显,优点是可以支持非常大的状态,缺点是目前无法被 Flink checkpoint 覆盖到。

对于 window 的优化,首先是窗口偏移量。需求最早来源于一个线上场景,比如我们想统计某个指标 2021 年 12 月 4 日 0 点 ~ 2021 年 12 月 5 日 0 点的结果, 但由于线上集群是东 8 区时间,所以实际统计的结果是 2021 年 12 月 4 日早上 8 点 ~ 2021 年 12 月 5 日早上 8 点的结果,这显然不符合预期。因此这个功能最早是为了修复非本地时区跨天级别的窗口统计错误的问题。

在我们增加了窗口偏移量参数后,可以非常灵活地设置窗口的起始时间,能够支持的需求也更广泛。

其次,还存在另外一个场景:虽然用户设定了窗口大小,但是他希望更早看到窗口当前的计算结果,便于更早地去做决策。因此我们新增了增量窗口的功能,它可以根据设置的增量间隔,触发执行输出窗口的当前计算结果。

对于端到端实时性要求不高的应用,可以选择 Iceberg 作为下游的统一存储。但是鉴于计算本身的特性、用户 checkpoint 间隔的配置等原因,可能导致产生大量的小文件。Iceberg 的底层我们选用 HDFS 作为存储,大量的小文件会对 Namenode 产生较大的压力,所以就有了合并小文件的需求。

Flink 社区本身提供了基于 Flink batch job 的合并小文件的工具可以解决这个问题,但这种方式有点重,所以我们开发了算子级别的小文件合并的实现。思路是这样的,在原生的 global commit 之后,我们新增了三个算子 compactCoordinator、 compactOperator 和 compactCommitter,其中 compactCoordinator 负责获取待合并的 snapshot 并下发,compactOperator 负责 snapshot 的合并操作的执行,并且可以多个 compactOperator 并发执行,compactCommitter 负责合并后 datafiles 的提交。

我们在 DDL 的定义中新增了两个参数,auto-compact 指定是否开启合并文件的功能,compact.delta.commits 指定每提交多少次 commit 来触发一次 compaction。

在实际的业务需求中,用户可能会从 Iceberg 中读取嵌套数据,虽然可以在 SQL 中指定读取嵌套字段内部的数据,但是在实际读取数据时是会将包含当前嵌套字段的所有字段都读取到,再去获取用户需要的字段,而这会直接导致 CPU 和网络带宽负载的增高,所以就产生了如下需求:如何只读取到用户真正需要的字段?

解决这个问题,要满足两个条件,第一个条件是读取 Iceberg 的数据结构 schema 只包含用户需要的字段,第二个条件是 Iceberg 支持按列名去读取数据,而这个本身已经满足了,所以我们只需要实现第一个条件即可。

如上图右侧所示,结合之前的 tableSchema 和 projectFields 信息重构,生成了一个只包含用户需要字段的新的数据结构 PruningTableSchema,并且作为 Iceberg schema 的输入,通过这样的操作实现了根据用户的实际使用情况对嵌套结构进行列裁剪。图中左下部的示例展示了用户优化前后读取嵌套字段的对比,可以看到基于 PruningTablesSchema 能够对无用的字段进行有效的裁剪。

经过上述优化,CPU 使用率降低了 20%~30%。而且,在相同的数据量下,批任务的执行时间缩短了 20%~30%。

此外,我们还实现了一些其他优化,比如修复了 interval outer join 数据晚于 watermark 下发、且下游有时间算子时会导致的数据丢失问题,UDF 的复用问题,FlinkSQL 扩展 KeyBy 语法,维表数据预加载以及 Iceberg connector 从指定的 snapshot 去读取等功能。

03 落地案例

京东目前 FlinkSQL 线上任务 700+,占Flink总任务数的 15% 左右,FlinkSQL 任务累计峰值处理能力超过 1.1 亿条/秒。目前主要基于社区的 1.12 版本进行了一些定制优化。

3.1 案例一

实时通用数据层 RDDM 流批一体化的建设。RDDM 全称是 real-time detail data model - 实时明细数据模型,它涉及订单、流量、商品、用户等,是京东实时数仓的重要一环,服务了非常多的核心业务,例如黄金眼/商智、JDV、广告算法、搜推算法等。

RDDM 层的实时业务模型与离线数据中 ADM 和 GDM 层的业务加工逻辑一致。基于此,我们希望通过 FlinkSQL 来实现业务模型的流批计算统一。同时这些业务也具备非常鲜明的特点,比如订单相关的业务模型都涉及大状态的处理,流量相关的业务模型对于端到端的实时性要求比较高。此外,某些特殊场景也需要一些定制化的开发来支持。

RDDM 的实现主要有两个核心诉求:首先它的计算需要关联的数据比较多的,大量的维度数据都存储在 HBase 中;此外部分维度数据的查询存在二级索引,需要先查询索引表,从中取出符合条件的 key 再去维度表中获取真正的数据。

针对上述需求,我们通过结合维表数据预加载的功能与维表 keyby 的功能来提升 join 的效率。针对二级索引的查询需求,我们定制了 connector 来实现。

维表数据预加载的功能指在初始化的阶段就将维表数据加载到内存中,这个功能结合 keyby 使用可以非常有效地减少缓存的数量,提高命中率。

部分业务模型关联的历史数据比较多,导致状态数据比较大,目前我们是根据场景进行定制的优化。我们认为根本的解决方案是实现一套高效的基于 KV 的 statebackend,对于此功能的实现正在规划中。

**3.2 案例二
**

流量买卖黑产的舆情分析。它的主要流程如下:源端通过爬虫获取相关信息并写入到 JMQ,数据同步到 JDQ 以后,通过 Flink 处理然后继续写下游的 JDQ。与此同时,通过 DTS 数据传输服务,将上游 JDQ 的数据同步到 HDFS,然后通过 Hive 表进行离线的数据加工。

此业务有两个特点:首先,端到端的实时性要求不高,可以接受分钟级别的延时;第二,离线和实时的加工逻辑一致。因此,可以直接把中间环节的存储从 JDQ 换成 Iceberg,然后通过 Flink 去增量读取,并通过 FlinkSQL 实现业务逻辑加工,即完成了流批两套链路的完全统一。其中 Iceberg 表中的数据也可以供 OLAP 查询或离线做进一步的加工。

上述链路端到端的时延在一分钟左右,基于算子的小文件合并功能有效地提升了性能,存储计算成本有了显著的降低,综合评估开发维护成本降低了 30% 以上。

**04 未来规划
**

未来规划主要分为以下两个方面:

首先,业务拓展方面。我们会加大 FlinkSQL 任务的推广,探索更多流批一体的业务场景,同时对产品形态进行打磨,加速用户向 SQL 的转型。同时,将平台元数据与离线元数据做更深度的融合,提供更好的元数据服务。

其次,平台能力方面。我们会继续深挖 join 场景和大状态场景,同时探索高效 KV 类型的状态后端实现,并在统一计算和统一存储的框架下不断优化设计,以降低端到端时延。