空间大数据处理框架

文章目录
  1. 1. 大数据计算框架简介
    1. 1.1. 1.1. Hadoop
    2. 1.2. 1.2. Spark
    3. 1.3. 1.3. Flink
  2. 2. 2. Spark简介
    1. 2.1. 2.1. Saprk相关概念
    2. 2.2. 2.2. Spark核心模块
    3. 2.3. 2.3. Spark集群架构
      1. 2.3.1. 2.3.1. Driver
      2. 2.3.2. 2.3.2. Cluster Manager
      3. 2.3.3. 2.3.3. Worker
      4. 2.3.4. 2.3.4. Executor
    4. 2.4. 2.4. Spark任务提交过程
    5. 2.5. 2.5. Spark性能调优
      1. 2.5.1. 2.5.1. 参数调优
        1. 2.5.1.1. 2.5.1.1. num-executors
        2. 2.5.1.2. 2.5.1.2. executor-memory
        3. 2.5.1.3. 2.5.1.3. executor-core
        4. 2.5.1.4. 2.5.1.4. driver-memory
        5. 2.5.1.5. 2.5.1.5. Spark.default.parallelism
        6. 2.5.1.6. 2.5.1.6. Spark.Shuffle.memoryFraction
      2. 2.5.2. 2.5.2. 代码重构调优
        1. 2.5.2.1. 2.5.2.1. 尽可能复用一个RDD
        2. 2.5.2.2. 2.5.2.2. 对多次使用的RDD进行持久化
        3. 2.5.2.3. 2.5.2.3. 尽量避免使用Shuffle算子
        4. 2.5.2.4. 2.5.2.4. 使用高性能算子
        5. 2.5.2.5. 2.5.2.5. 使用Kryo优化序列化性能
        6. 2.5.2.6. 2.5.2.6. 优化数据结构
        7. 2.5.2.7. 2.5.2.7. 广播大变量
  3. 3. 3. 空间大数据计算框架简介
    1. 3.1. 3.1. Sedona(GeoSpark)简介
    2. 3.2. 3.2. 扩展——SQL/MM简介

大数据计算框架简介

Hadoop,Spark、Flink是目前重要的三大分布式计算系统

  • Hadoop用于离线复杂大数据处理
  • Spark用于离线快速的大数据处理
  • Flink用于在线实时的大数据处理

1.1. Hadoop

Hadoop是一个由Apache基金会所开发的分布式系统基础架构,主要解决海量数据的存储和分析计算问题。

广义上来说,Hadoop通常是指一个更广泛的概念——Haddop生态圈。

img

1) Hadoop分布式文件系统(HDFS)

HDFS (Hadoop Distributed File System) ,它是一个文件系统,用来存储文件,通过目录树来定位文件,其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。

使用场景:HDFS适合一次写入,多次读出的场景,且不支持文件的修改,适合用来做数据分析,并不适合用来做网盘应用。

2) Hadoop MapReduce引擎

MapReduce是一种并行程序设计模型与方法,用于大规模数据集的并行运算。概念”Map(映射)”和”Reduce(归约)”是它们的主要思想。

img

1.2. Spark

Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是类Hadoop MapReduce的通用并行框架。Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是,Job中间的输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

img

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

流数据:在自然环境中,数据的产生原本就是流式的。但是当你分析数据时,可以围绕 有界流(bounded)或 无界流(unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。

离线计算的代表spark和flink的对比

在spark中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。而在flink中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流。

有界流\:****数据有时间的界限,比如我们长说某天的数据、某月的数据。

无界流:数据没有时间的界限,所处理的数据是持续不断的输入的。因此程序必须持续不断地对到达的数据进行处理。

2. Spark简介

2.1. Saprk相关概念

RDD(Resilient Distributed Dataset):弹性分布式数据集,是Spark的核心部分,可以理解为一个分布式数据结构的封装,并在此基础上提供了各种数据操作的API。其中,数据操作可以分成转换(**transformation**行动(**action**两类,转换构建各**RDD**之间的依赖关系,但不会实际执行,只有遇到行动操作后才会实际提交作业触发执行。RDD之间的依赖关系构成了RDD的血缘,当RDD丢失时可以通过血缘关系重新执行生成,保证了RDD的容错性。

举例:

数据集 map reduce
{x: x∈N+, x<10 } {x: f(x)} {x: ∑f(x)}、{x: Πf(x)}等
类别 函数名 解释
转换算子 map 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
mapPartitions 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,包括过滤
mapPartitionsWithindex 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,包括过滤,在处理同时可以获取当前分区的索引
flatMap 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
glom 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
groupBy 将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle,极限情况下,数据可能被分到同一个分区中
filter 将数据根据执行的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能出现数据倾斜
sample 根据指定的规则聪数据中抽取数据
coalesce 根据数据量进行缩减分区,用于大数据集过滤后,提高小数据集的执行效率,减少任务调度成本
distinct 将数据集中的重复数据进行去重
repartitions 调整分区数量
sortBy 可以根据指定的规则对数据源中的数据进行排序
intersection 对源RDD和参数RDD求交集后返回一个新的RDD
union 对源RDD和参数RDD求并集后返回一个新的RDD
subtract 对源RDD和参数RDD求差集后返回一个新的RDD
zip 相同位置的数据拉取到一块
partitionBy 将数据按照指定Partitioner重新进行分区
reduceByKey 将相同的key进行分组,将value聚合
groupByKey 将分区的数据直接转换为相同类型的内存数组进行后续处理
aggregateByKey 将数据根据不同规则进行分区内计算和分区间计算
flodByKey foldByKey就是aggregateByKey的简化版本,当aggregateByKey的第二个参数计算逻辑相同时就可以用foldByKey
combineByKey 需要三个参数 1.将相同key的第一个数据进行结构的转换,实现操作 2.分区内计算规则 3.分区间计算规则
join 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同的key对应的所有元素连接在一起的(K,(v,w))的RDD
leftOuterJoin 类似于sql的左外连接
rightOuterJoin 类似于sql的右外连接
cogroup 在类型(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
行动算子 reduce 聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数
collect 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
count 数据源中数据的个数
first 数据源中数据的第一个
take 获取n个数据
takeOrdered 数据排序后取n个数据
aggregate 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
fold 折叠操作,aggregate的简化版操作
countByKey 统计每种key的个数

Partition:数据分区,即RDD进行数据划分的单元,也是Spark数据处理的单元,任务的数量由分区数量确定,一个任务处理一个分区的数据。

Dependency:RDD之间的依赖关系。分为窄依赖(NarrowDependency)和Shuffle依赖。前者表示子RDD中的分区依赖于父RDD中固定数量的分区,包括依赖一个分区的OneToOneDependency和一定范围内多个分区的RangeDependency;后者也称为宽依赖,子RDD中的分区可能对父RDD中所有的分区产生依赖,具体依赖关系取决于分区器partitioner。

Job:Spark中的作业,对**RDD**的一个行动操作对应一个作业。每个作业包括了该RDD的行动操作以及构成当前RDD血缘关系的所有转换操作,一个Spark应用可以包含多个作业。

Stage:在每个作业中,Spark会根据**shuffle**依赖将作业划分成多个阶段。从最后一个触发行动操作的RDD开始(最后一个属于ResultStage),对RDD的依赖进行回溯,如果遇到shuffle依赖,则将shuffle之前的RDD视为一个阶段(ShuffleMapStage),shuffle之后的RDD到回溯起点之间所有的RDD视为一个阶段,接着从shuffle之前的RDD开始继续回溯,直到回溯至与外部数据源相关的RDD为止,如图1所示。

Task:每个阶段中,Spark会根据**RDD**的分区数量,创建相同数量的任务,每个任务处理一个分区的数据。其中,ShuffleMapStage中的任务为ShuffleMapTask,ResultStage中的任务为ResultTask。

下图为Spark中Stage的划分方式

img

2.2. Spark核心模块

Spark的模块组成包括核心模块Spark Core以及建立在核心模块之上的扩展模块Spark SQL、Spark Streaming、GraphX、MLlib。Spark核心模块提供了Spark中最基础最核心的功能,包括:

基础设施:分为负责参数配置的SparkConf,负责Spark跨节点组件之间通信的基于netty实现的RPC框架,负责SparkContext内部组件间通信的事件总线ListenerBus,负责监控各个组件运行期状态的度量系统;

SparkContext:作为Spark所有功能的集成者和用户程序开发的接口,其内部封装了网络通信、消息通信、存储体系、计算引擎、度量系统、WebUI等内容,这些功能对开发人员来说都是透明的;

SparkEnv:是Spark执行环境,任务运行所必需的组件,由RpcEnv、序列化管理器、BroadcastManager、MapOutputTracker、存储体系、度量系统、OutputCommitCoordinator等组件组成;

存储体系:分为内存存储管理和磁盘存储管理,Spark的内存空间提供了Tungsten的实现,对堆外内存进行管理。此外,Spark的内存存储空间和计算存储空间之间的边界是“软”边界,当一方资源不够时可向另一方借用,提高了资源利用率;

调度体系:分为DAGScheduler和TaskScheduler,内置在SparkContext中。DAGScheduler负责解析RDD之间的依赖,按照shuffle操作划分阶段,根据阶段创建TaskSet,TaskScheduler负责按照FAIR或FIFO调度算法对TaskSet进行调度,为每个任务分配可执行的Executor,并将序列化后的任务发送到指定的Executor上执行;

计算引擎:由内存管理器、Tungsten、任务内存管理器、外部排序器、Shuffle管理器等组成,其中外部排序器用于在map端或者reduce端对ShuffleMapTask计算得到的中间结果进行排序、聚合等操作,Shuffle管理器用于将各个分区对应的ShuffleMapTask产生的中间结果持久化到磁盘,并在reduce端按照分区远程拉取生成的中间结果。

2.3. Spark集群架构

从集群部署的角度来看,Spark集群由集群管理器(Cluster Manager)、工作节点(Worker)、执行器(Executor)、驱动器(Driver)、应用程序(Application)等部分组成。
img

2.3.1. Driver

用于运行Spark程序的 main 方法,创建SparkContext对象

Driver在spark作业执行时主要负责:

1)启动SparkContext或SparkSession,将用户程序转化为作业(Job);

2)在Executor之间调度任务(Task);

3)跟踪Executor的执行情况;

4)通过UI展示查询运行情况。

2.3.2. Cluster Manager

在YARN部署模式下为ResourceManager

负责对整个集群资源的分配与管理,不负责对Executor的资源的分配。

常见的集群管理工具有Yarn、Mesos、Kubernetes、Standalone。

2.3.3. Worker

在YARN部署模式下为NodeManager

负责启动和管理Executor,将Executor状态信息同步给Cluster Manager。

2.3.4. Executor

负责执行Spark作业中具体的任务,任务彼此之间相互独立。

Executor 有两个核心功能:

1)向Driver认领属于自己的任务,接收任务后负责运行Spark Task,并将结果返回给Driver。

2)通过自身的Block Manager为用户程序中要求缓存的RDD提供内存式存储。RDD 的数据是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

2.4. Spark任务提交过程

当用户通过SparkContext提交应用程序时,分为以下几步:

1)首先通过RpcEnv向ClusterManager注册应用并申请资源

2)Cluster Manager根据应用的需求,将对应的Executor分配给应用程序,并在Worker上启动Executor后台进程,该进程通过RpcEnv向Driver注册Executor资源信息,Driver中的TaskScheduler保存Executor的地址、内存以及核数等信息;

3)SparkContext根据RDD的转换操作,构建RDD的血缘关系,并在遇到行动操作的时候,将构建的依赖关系提交给DAGScheduler,DAGScheduler对依赖关系进行解析,生成TaskSet,将TaskSet保存至任务池,TaskScheduler从任务池中取出TaskSet后按照FAIR或FIFO算法进行调度,给任务分配Executor资源,并将序列化后的任务发送至Executor执行。

2.5. Spark性能调优

2.5.1. 参数调优

2.5.1.1. num-executors

该参数用于设计Spark作业总的Executor进程的个数。YARN集群管理器会尽可能根据num-executor设置在工作节点上启动Executor。

2.5.1.2. executor-memory

该参数用于设置每个Executor 进程的内存,Executor内存的大小,很多程度上直接决定了Spark作业的性能,而且跟很常见的Java中的虚拟机内存溢出异常(OOM)也有关系。

2.5.1.3. executor-core

该参数用于设置每个Executor进程的CPU core 数量。

2.5.1.4. driver-memory

该参数用于设置Driver进程的内存,这个参数通常不设置。但是要注意的一点是,使用collect算子时,一定要保证Driver内存足够大,否则会出现内存溢出的错误。

2.5.1.5. Spark.default.parallelism

该参数用于设置每个Stage默认的task数量。

2.5.1.6. Spark.Shuffle.memoryFraction

该参数用于设置Shuffle过程中一个task拉取到上个Stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

2.5.2. 代码重构调优

2.5.2.1. 尽可能复用一个RDD

在对不同的数据执行算子操作时应该尽量复用一个RDD。例如,当RDD A的数据格式是key-value类型的,RDD B的数据格式是value类型的,但是这两个RDD的value数据完全相同;那么,RDD A包含了RDD B中的所有信息,理论上来说RDD B可以被替代,而实际开发中也应该尽量减少多个RDD数据有重复或者包含的情况,这样可以尽可能减少RDD的数量从而减少算子执行的次数。

2.5.2.2. 对多次使用的RDD进行持久化

RDD的持有化有几种不同的级别,分别是:MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY、MEMORY_ONLY_2 等,这几种持久化级别使用的优先级排序如下:

  1. MEMORY_ONLY性能最高,直接将RDD存储在内存中,省去了序列化及反序列化、从磁盘读取的时间,但是对于内存的容量有较高的要求;
  2. MEMORY_ONLY_SER会将数据序列化后保存在内存中,通过序列化压缩了RDD的大小,但是相较于MEMORY_ONLY多出了序列化及反序列化的时间;
  3. MEMORY_AND_DISK_SER优先将RDD缓存在内存中,内存缓存不下时才会存在磁盘中;
  4. DISK_ONLY和后缀为_2的级别通常不建议使用,完全基于磁盘文件的读写会导致性能的极具降低;后缀为2的级别会将所有数据都复制一份副本到其他节点上,数据复制及网络传输会导致较大的性能开销。

2.5.2.3. 尽量避免使用Shuffle算子

Spark作业最消耗性能的部分就是Shuffle过程,应尽量避免使用Shuffle算子。Shuffle过程就是将分布在集群中多个节点上的同一个 key,拉取到同一个节点上,进行聚合或者join操作,在操作过程中可能会因为一个节点上处理的key过多导致数据溢出到磁盘。由此可见,Shuffle过程可能会发生大量的磁盘文件读写的 IO 操作,以及数据的网络传输操作。

Shuffle类算子有:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等,编写Spark作业程序时,应该尽量使用map类算子替代Shuffle算子

2.5.2.4. 使用高性能算子

  1. 使用reduceByKey/aggregateByKey替代groupByKey
  2. 使用mapPartitions替代普通map Transformation算子
  3. 使用foreachPartitions替代foreach Action算子
  4. 使用filter之后进行coalesce操作
  5. repartition:coalesce(numPartitions,true)增多分区使用这个
  6. coalesce(numPartitions,false)减少分区,没有shuffle只是合并partition

2.5.2.5. 使用Kryo优化序列化性能

Spark支持使用Kryo序列化机制。这种序列化机制,比默认的Java序列化机制速度要快,序列化后的数据更小。所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。

2.5.2.6. 优化数据结构

在Java中有三种类型比较耗费内存

  • 对象:每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
  • 字符串:每个字符串内部都有一个字符数组以及长度等额外信息。
  • 集合类型:比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此Spark编码时应尽量不要使用以上三种数据结构,尽量使用字符串代替对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,降低垃圾回收的频率提高性能。

2.5.2.7. 广播大变量

开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能;函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能; 如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率。

img

3. 空间大数据计算框架简介

3.1. Sedona(GeoSpark)简介

4.1.2.3 sedona(geospark)

3.2. 扩展——SQL/MM简介

1.4.2 SQL/MM