ApacheFlink在汽车之家的应用与

本文整理自汽车之家实时计算平台负责人邸星星在FlinkForwardAsia分享的议题《ApachFlink在汽车之家的应用及实践》。主要内容包括:

1.背景及现状

2.AutoStram平台

3.基于Flink的实时生态建设

4.后续规划

一、背景及现状

1.第一阶段

在年之前,汽车之家的大部分实时业务都是运行在Storm之上的。Storm作为早期主流的实时计算引擎,凭借简单的Spout和Bolt编程模型以及集群本身的稳定性,俘获了大批用户,我们在年搭建了Storm平台。

随着实时计算的需求日渐增多,数据规模逐步增大,Storm在开发及维护成本上都凸显了不足,这里列举几个痛点:

开发成本高我们一直是用的Lambda架构,会用T+1的离线数据修正实时数据,即最终以离线数据为准,所以计算口径实时要和离线完全保持一致,实时数据开发的需求文档就是离线的SQL,实时开发人员的核心工作就是把离线的SQL翻译成Storm代码,期间虽然封装了一些通用的Bolt来简化开发,但把离线动辄几百行的SQL精准地翻译成代码还是很有挑战的,并且每次运行都要经过打包、上传、重启的一系列的繁琐操作,调试成本很高。计算低效Storm对状态支持的不好,通常需要借助Rdis、HBas这类kv存储维护中间状态,我们之前是强依赖Rdis。比如常见的计算UV的场景,最简单的办法是使用Rdis的sadd命令判断uid是否为已经存在,但这种方法会带来很高的网络IO,同时如果没有提前报备的大促或搞活动导致流量翻倍的情况,很容易把Rdis内存搞满,运维同学也会被杀个措手不及。同时Rdis的吞吐能力也限制了整个作业的吞吐量。难以维护、管理由于采用编写Storm代码方式开发,难以分析元数据及血缘关系,同时可读性差,计算口径不透明,业务交接成本很高。对数仓不友好数据仓库团队是直接对接业务需求的团队,他们更熟悉基于Hiv的SQL开发模式,通常都不擅长Storm作业的开发,这导致一些原本是实时的需求,只能退而求其次选择T+1的方式给出数据。

在这个阶段,我们支持了最基本的实时计算需求,因为开发门槛比较高,很多实时业务都是由我们平台开发来完成,既做平台,又做数据开发,精力分散很严重。

2.第二阶段

我们从年开始调研Flink引擎,其相对完备的SQL支持,天生对状态的支持吸引了我们,在经过学习调研后,年初开始设计开发FlinkSQL平台,并于年中上线了AutoStram1.0平台。平台上线之初就在仓库团队、监控团队和运维团队得以应用,能够快速被用户主要得益于以下几点:

开发、维护成本低:汽车之家大部分的实时任务可以用FlinkSQL+UDF实现。平台提供常用的Sourc和Sink,以及业务开发常用的UDF,同时用户可以自己编写UDF。基于"SQL+配置"的方式完成开发,可以满足大部分需求。对于自定义任务,我们提供方便开发使用的SDK,助力用户快速开发自定义Flink任务。平台面向的用户已经不只是专业的数据开发人员了,普通开发、测试、运维人员经过基本的学习都可以在平台上完成日常的实时数据开发工作,实现平台赋能化。数据资产可管理,SQL语句本身是结构化的,我们通过解析一个作业的SQL,结合sourc、sink的DDL,可以很容易的知道这个作业的上下游,天然保留血缘关系。高性能:Flink可以完全基于状态(内存,磁盘)做计算,对比之前依赖外部存储做计算的场景,性能提升巨。在活动压测期间,改造后的程序可以轻松支持原来几十倍流量的实时计算,且横向扩展性能十分良好。全面的监控报警:用户将任务托管在平台上,任务的存续由平台负责,用户专注于任务本身的逻辑开发本身即可。对于SQL任务,SQL的可读性极高,便于维护;对于自定义任务,基于我们SDK开发,用户可以更专注于梳理业务逻辑上。不论是SQL任务还是SDK,我们都内嵌了大量监控,并与报警平台关联,方便用户快速发现分析定位并修复任务,提高稳定性。赋能业务:支持数仓分层模型,平台提供了良好的SQL支持,数仓人员可以借助SQL,将离线数仓的建设经验应用于实时数仓的建设上,自平台上线后,数仓逐步开始对接实时计算需求。

痛点:

易用性有待提高,比如用户无法自助管理UDF,只能使用平台内置的UDF或者把打好的jar包发给平台管理员,通过人工的方式处理上传问题。随着平台作业量的高速增长,平台on-call成本非常高。首先我们经常面对一些新用户的基础问题:平台的使用问题;开发过程中遇到的问题,比如为什么打包报错;FlinkUI的使用问题;监控图形的含义,如何配置报警。还有一些不太容易快速给出答案的问题:Jar包冲突;为什么消费Kafka延迟;任务为什么报错。尤其是延迟问题,我们常见的数据倾斜,GC,反压问题可以直接引导用户去FlinkUI和监控图表上去查看,但有时候还是需要手动去服务器上查看jmap、jstack等信息,有时候还需要生成火焰图来帮助用户定位性能问题。初期我们没有和运营团队合作,完全是我们开发人员直接对接处理这些问题,虽然期间补充了大量的文档,但是整体上on-call成本还是很高。在Kafka或Yarn出现故障时,没有快速恢复的方案,当面对一些重保业务时,有些捉襟见肘。众所周知,没有永远稳定,不出故障的环境或组件,当有重大故障出现时,需要有快速恢复业务的应对方案。资源没有合理管控,存在比较严重的资源浪费的情况。随着使用平台开发任务的用户不断增加,平台的作业数也不断增加。有些用户不能很好的把控集群资源的使用,经常出现过多申请资源的问题,导致作业运行效率低下甚至空闲,造成了资源的浪费。

在AutoStram1.0平台这个阶段,基于SQL开发的方式极大地降低了实时开发的门槛,各业务方可以自己实现实时业务的开发,同时数仓同学经过简单的学习后,就开始对接实时业务,将我们平台方从大量的业务需求中释放出来,让我们可以专心做平台方面的工作。

3.当前阶段

针对上面的几个方面,我们有针对性行的做了以下几点升级:

引入JarSrvic:支持用户自助上传UDFjar包,并在SQL片段中自助引用,实现自助管理UDF。同时自定义作业也可以配置JarSrvic中的Jar,面对多个作业共用同一个Jar的场景,用户只需要在作业中配置JarSrvic中的jar包路径就可以,避免每次上线都重复上传Jar的繁琐操作;自助诊断:我们开发了动态调整日志级别、自助查看火焰图等功能,方便用户自己定位问题,减轻我们日常on-call成本;作业健康检查功能:从多个维度分析,为每个Flink作业打分,每个低分项都相应的给出建议;Flink作业级别的快速容灾恢复:我们建设了两套YARN环境,每一个YARN对应一个单独的HDFS,两个HDFS之前通过SNAPSHOT方式进行Chckpoint数据的双向复制,同时在平台上增加了切换集群的功能,在一个YARN集群不可用的情况下,用户可以自助在平台上,选择备用集群的Chckpoint;Kafka多集群架构支持:使用我们自研的KafkaSDK,支持快速切换Kafka集群;对接预算系统:每个作业占用的资源都直接对应到预算团队,这样一定程度上保证资源不会被其他团队占用,同时每个团队的预算管理员可以查看预算使用明细,了解自己的预算支持了团队内的哪些业务。

目前用户对平台的使用已经趋于熟悉,同时自助健康检查和自助诊断等功能的上线,我们平台方的日常on-call频率在逐步降低,开始逐渐进入平台建设的良性循环阶段。

4.应用场景

汽车之家用于做实时计算的数据主要分为三类:

客户端日志,也就是我们内部说的点击流日志,包括用户端上报的启动日志、时长日志、PV日志、点击日志以及各类事件日志,这类主要是用户行为日志,是我们建设实时数仓中流量宽表、UAS系统、实时画像的基础,在这之上还支持了智能搜索、智能推荐等在线业务;同时基础的流量数据也用于支持各业务线的流量分析、实时效果统计,支持日常运营决策。服务端日志,包括nginx日志、各类后端应用产生的日志、各种中间件的日志。这些日志数据主要用于后端服务的健康监测、性能监控等场景。业务库的实时变更记录,主要有三种:MySQL的binlog,SQLSrvr的CDC,TiDB的TiCDC数据,基于这些实时的数据变更记录,我们通过对各种内容数据的抽象与规范,建设了内容中台、资源池等基础服务;也有一些做简单逻辑的业务数据实时统计场景,结果数据用于实时大屏、罗盘等,做数据展现。

以上这三类数据都会实时写入Kafka集群,在Flink集群中针对不同场景进行计算,结果数据写入到Rdis、MySQL、Elasticsarch、HBas、Kafka、Kylin等引擎中,用于支持上层应用。

下面列举了一些应用场景:

5.集群规模

目前Flink集群服务器+,部署模式为YARN(80%)和Kubrnts,运行作业数+,日计算量1万亿,峰值每秒处理数据万条。

二、AutoStram平台

1.平台架构

上面是AutoStram平台目前的整体架构,主要是以下几部分内容:

AutoStramcorSystm这是我们平台的核心服务,负责对元数据服务、Flink客户端服务、Jar管理服务及交互结果查询服务进行整合,通过前端页面把平台功能暴露给用户。主要包括SQL和Jar作业的管理、库表信息的管理、UDF管理、操作记录及历史版本的管理、健康检查、自助诊断、报警管理等模块,同时提供对接外部系统的能力,支持其他系统通过接口方式管理库表信息、SQL作业信息及作业启停操作等。基于Akka任务的生命周期管理和调度系统提供了高效,简单,低延迟的操作保障,提升了用户使用的效率和易用性。元数据服务(Catalog-likUnifidMtastor)主要对应FlinkCatalog的后端实现,除了支持基本的库表信息管理外,还支持库表粒度的权限控制,结合我们自身的特点,支持用户组级别的授权。底层我们提供了PluginCatalog机制,既可以用于和Flink已有的Catalog实现做集成,也可以方便我们嵌入自定义的Catalogs,通过Plugin机制可以很容易的重用HivCatalog,JdbcCatalog等,从而保证了库表的周期的一致性。同时元数据服务还负责对用户提交的DML语句进行解析,识别当前作业的依赖的表信息,用于作业的分析及提交过程,同时可以记录血缘关系。JarSrvic平台提供的各类SDK在JarSrvic上进行统一管理,同时用户也可以在平台上把自定义Jar、UDFjar等提交到JarSrvic上统一管理,然后在作业中通过配置或DDL引用。Flink客户端服务(CustomdFlinkJobClint)负责把平台上的作业转化成FlinkJob提交到Yarn或Kubrnts上,我们在这一层针对Yarn和Kubrnts做了抽象,统一两种调度框架的行为,对外暴露统一接口及规范化的参数,弱化Yarn和Kubrnts的差异,为Flink作业在两种框架上无缝切换打下了良好的基础。每个作业的依赖不尽相同,我们除了对基础依赖的管理以外,还需要支持个性化的依赖。比如不同版本的SQLSDK,用户自助上传的Jar、UDF等,所以不同作业的提交阶段需要做隔离。我们采用的是Jarsrvic+进程隔离的方式,通过和JarSrvic对接,根据作业的类型和配置,选用相应的Jar,并且提交单独的进程中执行,实现物理隔离。结果缓存服务(RsultCachSrivc)是一个简易的缓存服务,用于SQL作业开发阶段的在线调试场景。当我们分析出用户的SQL语句,将Slct语句的结果集存入缓存服务中;然后用户可以在平台上通过选择SQL序号(每个完整的SELECT语句对应一个序号),实时查看SQL对应的结果数据,方便用户开发与分析问题。内置Connctors(SourcSink)最右侧的部分主要是各种Sourc、Sink的实现,有一些是重用Flink提供的connctor,有一些是我们自己开发的connctor。针对每一种connctor我们都添加了必要Mtric,并配置成单独的监控图表,方便用户了解作业运行情况,同时也为定位问题提供数据依据。

2.基于SQL的开发流程

在平台提供以上功能的基础上,用户可以快速的实现SQL作业的开发:

创建一个SQL任务;编写DDL声明Sourc和Sink;编写DML,完成主要业务逻辑的实现;在线查看结果,若数据符合预期,添加INSERTINTO语句,写入到指定Sink中即可。

平台默认会保存SQL每一次的变更记录,用户可以在线查看历史版本,同时我们会记录针对作业的各种操作,在作业维护阶段可以帮助用户追溯变更历史,定位问题。

下面是一个Dmo,用于统计当天的PV、UV数据:

3.基于Catalog的元数据管理

元数据管理的主要内容:

支持权限控制:除了支持基本的库表信息管理外,还支持表粒度的权限控制,结合我们自身的特点,支持用户组级别的授权;PluginCatalog机制:可以组合多种其他Catalog实现,复用已有的Catalog;库表生命周期行为统一:用户可以选择平台上的表和底层存储的生命周期统一,避免两边分别维护,重复建表;新老版本完全兼容:由于在AutoStram1.0的时候,我们没有单独引入Mtastor服务,此外1.0时期的DDLSQL解析模块是自研的组件。所以在建设MtaStor服务时,需要考虑历史作业和历史库表信息兼容的问题。对于库表信息,新的MtaStor在底层将新版和旧版的库表信息转换成统一的存储格式,从而保证了库表信息的兼容性。对于作业,这里我们通过抽象接口,并分别提供V1Srvic和V2Srvic两种实现路径,保证了新老作业在用户层面的兼容。

下面是几个模块和Mtastor交互的示意图:

4.UDXF管理

我们引入了JarSrvic服务用来管理各种Jar,包括用户自定义作业、平台内部SDK组件、UDXF等,在JarSrvic基础上我们可以很容易的实现UDXF的自助管理,在Onk8s的场景下,我们提供了统一的镜像,Pod启动后会从JarSrvic下载对应的Jar到容器内部,用于支持作业的启动。

用户提交的SQL中如果包含FunctionDDL,我们会在JobClintSrvic中会解析DDL,下载对应的Jar到本地。

为了避免和其他作业有依赖冲突,我们每次都会单独启动一个子进程来完成作业提交的操作。UDXFJar会被并加入到classpath中,我们对Flink做了一些修改,作业提交时会把这个Jar一并上传到HDFS中;同时AutoSQLSDK会根据函数名称和类名为当前作业注册UDF。

5.监控报警及日志收集

得益于Flink完善的Mtric机制,我们可以方便的添加Mtric,针对Connctor,我们内嵌了丰富的Mtric,并配置了默认的监控看板,通过看板可以查看CPU、内存、JVM、网络传输、Chckpoint、各种Connctor的监控图表。同时平台和公司的云监控系统对接,自动生成默认的报警策略,监控存活状态、消费延迟等关键指标。同时用户可以在云监控系统修改默认的报警策略,添加新的报警项实现个性化监控报警。

日志通过云Filbat组件写入到Elasticsarch集群,同时开放Kibana供用户查询。

整体的监控报警及日志收集架构如下:

6.健康检查机制

随着作业数的高速增长,出现了很多资源使用不合理的情况,比如前面提到的资源浪费的情况。用户大多时候都是在对接新需求,支持新业务,很少回过头来评估作业的资源配置是否合理,优化资源使用。所以平台规划了一版成本评估的模型,也就是现在说的健康检查机制,平台每天会针对作业做多维度的健康评分,用户可以随时在平台上查看单个作业的得分情况及最近30天的得分变化曲线。

低分作业会在用户登录平台时进行提示,并且定期发邮件提醒用户进行优化、整改,在优化作业后用户可以主动触发重新评分,查看优化效果。

我们引入了多维度,基于权重的评分策略,针对CPU、内存使用率、是否存在空闲Slot、GC情况、Kafka消费延迟、单核每秒处理数据量等多个维度的指标结合计算拓补图进行分析评估,最终产生一个综合分。

每个低分项都会显示低分的原因及参考范围,并显示一些指导建议,辅助用户进行优化。

我们新增了一个Mtric,用一个0%~%的数字体现TaskManagnrCPU利用率。这样用户可以直观的评估CPU是否存在浪费的情况。

下面是作业评分的大致流程:首先我们会收集和整理运行作业的基本信息和Mtrics信息。然后应用我们设定好的规则,得到基本评分和基础建议信息。最后将得分信息和建议整合,综合评判,得出综合得分和最终的报告。用户可以通过平台查看报告。对于得分较低的作业,我们会发送报警给作业的归属用户。

7.自助诊断

如之前提到的痛点,用户定位线上问题时,只能求助于我们平台方,造成我们on-call工作量很大,同时用户体验也不好,鉴于此,所以我们上线了以下功能:

动态修改日志级别:我们借鉴了Storm的修改日志级别的方式,在Flink上实现了类似功能,通过扩展RESTAPI和RPC接口的方法,支持修改指定Loggr的到某一日志级别,并支持设置一个过期时间,当过期后,改Loggr的日志会重新恢复为INFO级别;支持自助查看线程栈和堆内存信息:FlinkUI中已经支持在线查看线程栈(jstack),我们直接复用了这个接口;还额外增加了查看堆内存(jmap)的接口,方便用户在线查看;支持在线生成、查看火焰图:火焰图是定位程序性能问题的一大利器,我们利用了阿里的arthas组件,为Flink增加了在线查看火焰图的能力,用户遇到性能问题时,可以快速评估性能瓶颈。

8.基于Chckpoint复制的快速容灾

当实时计算应用在重要业务场景时,单个Yarn集群一旦出现故障且短期内不可恢复,那么可能会对业务造成较大影响。

在此背景下,我们建设了Yarn多集群架构,两个独立的Yarn各自对应一套独立的HDFS环境,chckpoint数据定期在两个HDFS间相互复制。目前chckpoint复制的延迟稳定在20分钟内。

同时,在平台层面,我们把切换集群的功能直接开放给用户,用户可以在线查看chckpoint的复制情况,选择合适的chckpoint后(当然也可以选择不从chckpoint恢复)进行集群切换,然后重启作业,实现作业在集群间的相对平滑的迁移。

三、基于Flink的实时生态建设

AutoStram平台的核心场景是支持实时计算开发人员的使用,使实时计算开发变得简单高效、可监控、易运维。同时随着平台的逐步完善,我们开始摸索如何对AutoStram平台进行重用,如何让Flink应用在更多场景下。重用AutoStram有以下几点优势:

Flink本身是优秀的分布式计算框架,有着较高的计算性能,良好的容错能力和成熟的状态管理机制,社区蓬勃发展,功能及稳定性有保障;AutoStram有着完善的监控和报警机制,作业运行在平台上,无需单独对接监控系统,同时Flink对Mtric支持很友好,可以方便的添加新的Mtric;大量的技术沉淀和运营经验,通过两年多的平台建设,我们在AutoStram上已经实现了较为完善的Flink作业全生命周期的管理,并建设了JarSrvic等基础组件,通过简单的上层接口包装,就可以对接其他系统,让其他系统具备实时计算的能力;支持Yarn和Kubrnts部署。

基于以上几点,我们在建设其他系统时,优先重用AutoStram平台,以接口调用的方式进行对接,将Flink作业全流程的生命周期,完全托管给AutoStram平台,各系统优先考虑实现自身的业务逻辑即可。

我们团队内的AutoDTS(接入及分发任务)和AutoKafka(Kafka集群复制)系统目前就是依托于AutoStram建设的。简单介绍一下集成的方式,以AutoDTS为例:

把任务Flink化,AutoDTS上的接入、分发任务,都是以Flink作业的形式存在;和AutoStram平台对接,调用接口实现Flink作业的创建、修改、启动、停止等操作。这里Flink作业既可以是Jar,也可以是SQL作业;AutoDTS平台根据业务场景,建设个性化的前端页面,个性化的表单数据,表单提交后,可以将表单数据存储到MySQL中;同时需要把作业信息以及Jar包地址等信息组装成AutoStram接口定义的格式,通过接口调用在AutoStram平台自动生成一个Flink任务,同时保存这个Flink任务的ID;启动AutoDTS的一个接入任务,直接调用AutoStram接口就实现了作业的启动。

1.AutoDTS数据接入分发平台

AutoDTS系统主要包含两部分功能:

数据接入:将数据库中的变更数据(Changlog)实时写入到Kafka;数据分发:将接入到Kafka的数据,实时写入到其他存储引擎。

1.1AutoDTS数据接入

下面是数据接入的架构图:

我们维护了基于Flink的数据接入SDK并定义了统一的JSON数据格式,也就是说MySQLBinlog,SQLSrvr、TiDB的变更数据接入到Kafka后,数据格式是一致的,下游业务使用时,基于统一格式做开发,无需


转载请注明:http://www.aierlanlan.com/rzfs/3016.html