大数据计算框架简介
Hadoop,Spark、Flink是目前重要的三大分布式计算系统
- Hadoop用于离线复杂大数据处理
- Spark用于离线快速的大数据处理
- Flink用于在线实时的大数据处理
1.1. Hadoop
Hadoop是一个由Apache基金会所开发的分布式系统基础架构,主要解决海量数据的存储和分析计算问题。
广义上来说,Hadoop通常是指一个更广泛的概念——Haddop生态圈。
1) Hadoop分布式文件系统(HDFS)
HDFS (Hadoop Distributed File System) ,它是一个文件系统,用来存储文件,通过目录树来定位文件,其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。
使用场景:HDFS适合一次写入,多次读出的场景,且不支持文件的修改,适合用来做数据分析,并不适合用来做网盘应用。
2) Hadoop MapReduce引擎
MapReduce是一种并行程序设计模型与方法,用于大规模数据集的并行运算。概念”Map(映射)”和”Reduce(归约)”是它们的主要思想。
1.2. Spark
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是类Hadoop MapReduce的通用并行框架。Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是,Job中间的输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
1.3. Flink
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
流数据:在自然环境中,数据的产生原本就是流式的。但是当你分析数据时,可以围绕 有界流(bounded)或 无界流(unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。
离线计算的代表spark和flink的对比
在spark中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。而在flink中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流。
有界流\:****数据有时间的界限,比如我们长说某天的数据、某月的数据。
无界流:数据没有时间的界限,所处理的数据是持续不断的输入的。因此程序必须持续不断地对到达的数据进行处理。
2. Spark简介
2.1. Saprk相关概念
RDD(Resilient Distributed Dataset):弹性分布式数据集,是Spark的核心部分,可以理解为一个分布式数据结构的封装,并在此基础上提供了各种数据操作的API。其中,数据操作可以分成转换(**transformation**)和行动(**action**)两类,转换构建各**RDD**之间的依赖关系,但不会实际执行,只有遇到行动操作后才会实际提交作业触发执行。RDD之间的依赖关系构成了RDD的血缘,当RDD丢失时可以通过血缘关系重新执行生成,保证了RDD的容错性。
举例:
数据集 | map | reduce |
---|---|---|
{x: x∈N+, x<10 } | {x: f(x)} | {x: ∑f(x)}、{x: Πf(x)}等 |
类别 | 函数名 | 解释 |
---|---|---|
转换算子 | map | 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换 |
mapPartitions | 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,包括过滤 | |
mapPartitionsWithindex | 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,包括过滤,在处理同时可以获取当前分区的索引 | |
flatMap | 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射 | |
glom | 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变 | |
groupBy | 将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle,极限情况下,数据可能被分到同一个分区中 | |
filter | 将数据根据执行的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能出现数据倾斜 | |
sample | 根据指定的规则聪数据中抽取数据 | |
coalesce | 根据数据量进行缩减分区,用于大数据集过滤后,提高小数据集的执行效率,减少任务调度成本 | |
distinct | 将数据集中的重复数据进行去重 | |
repartitions | 调整分区数量 | |
sortBy | 可以根据指定的规则对数据源中的数据进行排序 | |
intersection | 对源RDD和参数RDD求交集后返回一个新的RDD | |
union | 对源RDD和参数RDD求并集后返回一个新的RDD | |
subtract | 对源RDD和参数RDD求差集后返回一个新的RDD | |
zip | 相同位置的数据拉取到一块 | |
partitionBy | 将数据按照指定Partitioner重新进行分区 | |
reduceByKey | 将相同的key进行分组,将value聚合 | |
groupByKey | 将分区的数据直接转换为相同类型的内存数组进行后续处理 | |
aggregateByKey | 将数据根据不同规则进行分区内计算和分区间计算 | |
flodByKey | foldByKey就是aggregateByKey的简化版本,当aggregateByKey的第二个参数计算逻辑相同时就可以用foldByKey | |
combineByKey | 需要三个参数 1.将相同key的第一个数据进行结构的转换,实现操作 2.分区内计算规则 3.分区间计算规则 | |
join | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同的key对应的所有元素连接在一起的(K,(v,w))的RDD | |
leftOuterJoin | 类似于sql的左外连接 | |
rightOuterJoin | 类似于sql的右外连接 | |
cogroup | 在类型(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable |
|
… | … | |
行动算子 | reduce | 聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数 |
collect | 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组 | |
count | 数据源中数据的个数 | |
first | 数据源中数据的第一个 | |
take | 获取n个数据 | |
takeOrdered | 数据排序后取n个数据 | |
aggregate | 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合 | |
fold | 折叠操作,aggregate的简化版操作 | |
countByKey | 统计每种key的个数 | |
… | … |
Partition:数据分区,即RDD进行数据划分的单元,也是Spark数据处理的单元,任务的数量由分区数量确定,一个任务处理一个分区的数据。
Dependency:RDD之间的依赖关系。分为窄依赖(NarrowDependency)和Shuffle依赖。前者表示子RDD中的分区依赖于父RDD中固定数量的分区,包括依赖一个分区的OneToOneDependency和一定范围内多个分区的RangeDependency;后者也称为宽依赖,子RDD中的分区可能对父RDD中所有的分区产生依赖,具体依赖关系取决于分区器partitioner。
Job:Spark中的作业,对**RDD**的一个行动操作对应一个作业。每个作业包括了该RDD的行动操作以及构成当前RDD血缘关系的所有转换操作,一个Spark应用可以包含多个作业。
Stage:在每个作业中,Spark会根据**shuffle**依赖将作业划分成多个阶段。从最后一个触发行动操作的RDD开始(最后一个属于ResultStage),对RDD的依赖进行回溯,如果遇到shuffle依赖,则将shuffle之前的RDD视为一个阶段(ShuffleMapStage),shuffle之后的RDD到回溯起点之间所有的RDD视为一个阶段,接着从shuffle之前的RDD开始继续回溯,直到回溯至与外部数据源相关的RDD为止,如图1所示。
Task:每个阶段中,Spark会根据**RDD**的分区数量,创建相同数量的任务,每个任务处理一个分区的数据。其中,ShuffleMapStage中的任务为ShuffleMapTask,ResultStage中的任务为ResultTask。
下图为Spark中Stage的划分方式
2.2. Spark核心模块
Spark的模块组成包括核心模块Spark Core以及建立在核心模块之上的扩展模块Spark SQL、Spark Streaming、GraphX、MLlib。Spark核心模块提供了Spark中最基础最核心的功能,包括:
基础设施:分为负责参数配置的SparkConf,负责Spark跨节点组件之间通信的基于netty实现的RPC框架,负责SparkContext内部组件间通信的事件总线ListenerBus,负责监控各个组件运行期状态的度量系统;
SparkContext:作为Spark所有功能的集成者和用户程序开发的接口,其内部封装了网络通信、消息通信、存储体系、计算引擎、度量系统、WebUI等内容,这些功能对开发人员来说都是透明的;
SparkEnv:是Spark执行环境,任务运行所必需的组件,由RpcEnv、序列化管理器、BroadcastManager、MapOutputTracker、存储体系、度量系统、OutputCommitCoordinator等组件组成;
存储体系:分为内存存储管理和磁盘存储管理,Spark的内存空间提供了Tungsten的实现,对堆外内存进行管理。此外,Spark的内存存储空间和计算存储空间之间的边界是“软”边界,当一方资源不够时可向另一方借用,提高了资源利用率;
调度体系:分为DAGScheduler和TaskScheduler,内置在SparkContext中。DAGScheduler负责解析RDD之间的依赖,按照shuffle操作划分阶段,根据阶段创建TaskSet,TaskScheduler负责按照FAIR或FIFO调度算法对TaskSet进行调度,为每个任务分配可执行的Executor,并将序列化后的任务发送到指定的Executor上执行;
计算引擎:由内存管理器、Tungsten、任务内存管理器、外部排序器、Shuffle管理器等组成,其中外部排序器用于在map端或者reduce端对ShuffleMapTask计算得到的中间结果进行排序、聚合等操作,Shuffle管理器用于将各个分区对应的ShuffleMapTask产生的中间结果持久化到磁盘,并在reduce端按照分区远程拉取生成的中间结果。
2.3. Spark集群架构
从集群部署的角度来看,Spark集群由集群管理器(Cluster Manager)、工作节点(Worker)、执行器(Executor)、驱动器(Driver)、应用程序(Application)等部分组成。
2.3.1. Driver
用于运行Spark程序的 main 方法,创建SparkContext对象
Driver在spark作业执行时主要负责:
1)启动SparkContext或SparkSession,将用户程序转化为作业(Job);
2)在Executor之间调度任务(Task);
3)跟踪Executor的执行情况;
4)通过UI展示查询运行情况。
2.3.2. Cluster Manager
在YARN部署模式下为ResourceManager
负责对整个集群资源的分配与管理,不负责对Executor的资源的分配。
常见的集群管理工具有Yarn、Mesos、Kubernetes、Standalone。
2.3.3. Worker
在YARN部署模式下为NodeManager
负责启动和管理Executor,将Executor状态信息同步给Cluster Manager。
2.3.4. Executor
负责执行Spark作业中具体的任务,任务彼此之间相互独立。
Executor 有两个核心功能:
1)向Driver认领属于自己的任务,接收任务后负责运行Spark Task,并将结果返回给Driver。
2)通过自身的Block Manager为用户程序中要求缓存的RDD提供内存式存储。RDD 的数据是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
2.4. Spark任务提交过程
当用户通过SparkContext提交应用程序时,分为以下几步:
1)首先通过RpcEnv向ClusterManager注册应用并申请资源。
2)Cluster Manager根据应用的需求,将对应的Executor分配给应用程序,并在Worker上启动Executor后台进程,该进程通过RpcEnv向Driver注册Executor资源信息,Driver中的TaskScheduler保存Executor的地址、内存以及核数等信息;
3)SparkContext根据RDD的转换操作,构建RDD的血缘关系,并在遇到行动操作的时候,将构建的依赖关系提交给DAGScheduler,DAGScheduler对依赖关系进行解析,生成TaskSet,将TaskSet保存至任务池,TaskScheduler从任务池中取出TaskSet后按照FAIR或FIFO算法进行调度,给任务分配Executor资源,并将序列化后的任务发送至Executor执行。
2.5. Spark性能调优
2.5.1. 参数调优
2.5.1.1. num-executors
该参数用于设计Spark作业总的Executor进程的个数。YARN集群管理器会尽可能根据num-executor设置在工作节点上启动Executor。
2.5.1.2. executor-memory
该参数用于设置每个Executor 进程的内存,Executor内存的大小,很多程度上直接决定了Spark作业的性能,而且跟很常见的Java中的虚拟机内存溢出异常(OOM)也有关系。
2.5.1.3. executor-core
该参数用于设置每个Executor进程的CPU core 数量。
2.5.1.4. driver-memory
该参数用于设置Driver进程的内存,这个参数通常不设置。但是要注意的一点是,使用collect算子时,一定要保证Driver内存足够大,否则会出现内存溢出的错误。
2.5.1.5. Spark.default.parallelism
该参数用于设置每个Stage默认的task数量。
2.5.1.6. Spark.Shuffle.memoryFraction
该参数用于设置Shuffle过程中一个task拉取到上个Stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
2.5.2. 代码重构调优
2.5.2.1. 尽可能复用一个RDD
在对不同的数据执行算子操作时应该尽量复用一个RDD。例如,当RDD A的数据格式是key-value类型的,RDD B的数据格式是value类型的,但是这两个RDD的value数据完全相同;那么,RDD A包含了RDD B中的所有信息,理论上来说RDD B可以被替代,而实际开发中也应该尽量减少多个RDD数据有重复或者包含的情况,这样可以尽可能减少RDD的数量从而减少算子执行的次数。
2.5.2.2. 对多次使用的RDD进行持久化
RDD的持有化有几种不同的级别,分别是:MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY、MEMORY_ONLY_2 等,这几种持久化级别使用的优先级排序如下:
- MEMORY_ONLY性能最高,直接将RDD存储在内存中,省去了序列化及反序列化、从磁盘读取的时间,但是对于内存的容量有较高的要求;
- MEMORY_ONLY_SER会将数据序列化后保存在内存中,通过序列化压缩了RDD的大小,但是相较于MEMORY_ONLY多出了序列化及反序列化的时间;
- MEMORY_AND_DISK_SER优先将RDD缓存在内存中,内存缓存不下时才会存在磁盘中;
- DISK_ONLY和后缀为_2的级别通常不建议使用,完全基于磁盘文件的读写会导致性能的极具降低;后缀为2的级别会将所有数据都复制一份副本到其他节点上,数据复制及网络传输会导致较大的性能开销。
2.5.2.3. 尽量避免使用Shuffle算子
Spark作业最消耗性能的部分就是Shuffle过程,应尽量避免使用Shuffle算子。Shuffle过程就是将分布在集群中多个节点上的同一个 key,拉取到同一个节点上,进行聚合或者join操作,在操作过程中可能会因为一个节点上处理的key过多导致数据溢出到磁盘。由此可见,Shuffle过程可能会发生大量的磁盘文件读写的 IO 操作,以及数据的网络传输操作。
Shuffle类算子有:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等,编写Spark作业程序时,应该尽量使用map类算子替代Shuffle算子。
2.5.2.4. 使用高性能算子
- 使用reduceByKey/aggregateByKey替代groupByKey
- 使用mapPartitions替代普通map Transformation算子
- 使用foreachPartitions替代foreach Action算子
- 使用filter之后进行coalesce操作
- repartition:coalesce(numPartitions,true)增多分区使用这个
- coalesce(numPartitions,false)减少分区,没有shuffle只是合并partition
2.5.2.5. 使用Kryo优化序列化性能
Spark支持使用Kryo序列化机制。这种序列化机制,比默认的Java序列化机制速度要快,序列化后的数据更小。所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。
2.5.2.6. 优化数据结构
在Java中有三种类型比较耗费内存
- 对象:每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
- 字符串:每个字符串内部都有一个字符数组以及长度等额外信息。
- 集合类型:比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。
因此Spark编码时应尽量不要使用以上三种数据结构,尽量使用字符串代替对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,降低垃圾回收的频率提高性能。
2.5.2.7. 广播大变量
开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能;函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能; 如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率。