1 Spark基础篇

1.1 介绍一下Spark

Apache Spark是一个分布式、内存级计算框架。起初为加州大学伯克利分校AMPLab的实验性项目,后经过开源,在2014年成为Apache基金会顶级项目之一,现已更新至3.2.0版本。

1.2 谈一谈Spark的生态体系

Spark体系包含Spark Core、Spark SQL、Spark Streaming、Spark MLlib及 Spark Graphx。其中Spark Core为核心组件,提供RDD计算模型。在其基础上的众组件分别提供查询分析、实时计算、机器学、图计算等功能。

1.3说说Spark的工作流程

主要考察对Spark运行机制的理解,需要掌握Spark任务提交、资源申请、任务分配等阶段中各组件的协作机制,这里放上Spark官网的工作流程示意图。

Tips: 可结合4、5点运行模式原理展开细说

  • 客户端提交任务,创建Driver进程并初始化SparkContext
  • SparkContext向Cluster Manager申请资源
  • Cluster Manager选择合适的worker节点创建executor进程
  • Executor向Driver端注册,并等待其分配task任务
  • SparkContext构建DAG图(有向无环图)、划分stage并分配taskset至Executor
  • Executor启动Task线程执行具体任务

1.4 Spark运行模式有哪些?说说你最熟悉的一种

Spark的运行模式包括Local、Standalone、Yarn及Mesos几种。其中Local模式仅用于本地开发,Mesos模式国内几乎不用。在公司中因为大数据服务基本搭载Yarn集群调度,因此Spark On Yarn模式会用的比较多。

Standalone模式是Spark内置的运行模式,常用于小型测试集群。这里我就拿Standalone模式来举例:

  • Master为资源调度器,负责executors资源调度
  • Worker负责Executor进程的启动和监控
  • Driver在客户端启动,负责SparkContext初始化

1.5 谈谈Yarn ClusterYarn Client模式的区别

这是Spark中最普遍的一道面试题,主要是考察对Spark On Yarn 原理掌握的扎实程度。yarn-cluster 和 yarn-client 模式的区别其实就是 Application Master 进程的区别。

Yarn Cluster模式的driver进程托管给Yarn(AppMaster)管理,通过yarn UI或者Yarn logs命令查看日志。yarn-cluster 模式下,driver 运行在 AM(Application Master)中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client,作业会继续在 YARN 上运行。然而 yarn-cluster 模式不适合运行交互类型的作业。

Yarn Client模式的driver进程运行在本地客户端,因资源调度、任务分发会和Yarn集群产生大量网络通信,出现网络激增现象,适合本地调试,不建议生产上使用。Application Master 仅仅向 YARN 请求 executor,Client 会和请求的 container 通信来调度他们工作,也就是说 Client 不能离开。

两者具体执行流程整理如下:

  • Yarn Cluster模式

  • Yarn Client模式

1.6 简单讲下RDD的特性

RDD(分布式弹性数据集)是Spark的基础数据单元,和Mysql数据库中的视图view概念类似,其本身不存储数据,仅作为数据访问的一种虚拟结构。Spark通过对RDD的相互转换操作完成整个计算过程

  • 分布式:RDD本质上可以看成是一组只读的、可分区的分布式数据集,支持跨节点多台机器上进行并行计算。
  • 弹性:数据优先内存存储,当计算节点内存不够时,可以把数据刷到磁盘等外部存储,且支持手动设定存储级别。
  • 容错性:RDD的血脉机制保存RDD的依赖关系,同时支持Checkpoint容错机制,当RDD结构更新或数据丢失时可对RDD进行重建。

RDD的创建支持从集合List中parallelize()、外部Text|JSON|JDBC等数据源读取、RDD的Transformation转换等方式。

以Scala代码为例:

//从集合中创建

val inputRDD = sc.parallelize(List("hello", "world"))

// 从数据源创建

val inputRDD = sc.textFile('/filePath/test.txt')

// rdd转换

val filterRDD = inputRDD.filter(x != 'a')

1.7 RDD的宽依赖和窄依赖了解吗

这又是一道经典的面试题,切记不要忽视细节!Spark中的RDD血脉机制,当RDD数据丢失时,可以根据记录的血脉依赖关系重新计算。而DAG调度中对计算过程划分stage,划分的依据也是RDD的依赖关系。

针对不同的函数转换,RDD之间的依赖关系分为宽依赖和窄依赖。宽依赖会产生shuffle行为,经历map输出、中间文件落地和reduce聚合等过程。

首先,我们看一下Spark官网中对于宽依赖和窄依赖的定义:

  • 宽依赖: 父RDD每个分区被多个子RDD分区使用
  • 窄依赖: 父RDD每个分区被子RDD的一个分区使用

这里需要注意的是,网上有些论调是不正确的,只各自考虑了一种情况:

  • 窄依赖就是一个父分区对应一个子分区(错误)
  • 宽依赖就是一个父分区对应所有子分区(错误)

下面我们结合示意图,分别列出宽依赖和窄依赖存在的四种情况:

  • 窄依赖(一个父RDD对应一个子RDD:map/filter、union算子)

  • 窄依赖(多个父RDD对应一个子RDD:co-partioned join算子)

  • 宽依赖(一个父RDD对应多个非全部子RDD: groupByKey算子等)

  • 宽依赖(一个父RDD对应全部子RDD: not co-partioned join算子)

1.8 你用过的TransformationAction算子有哪些

Spark中的Transformation操作会生成一个新的RDD,且具有Lazy特性,不触发任务的实际执行。常见的算子有map、filter、flatMap、groupByKey、join等。一般聚合类算子多数会导致shuffle。

  • map: 遍历RDD中元素,转换成新元素, 然后用新元素组成一个新的RDD
  • filter: 遍历RDD中元素进行判断,结果为真则保留,否则删除
  • flatMap: 与map类似,不过每个元素可返回多个元素
  • groupByKey: 聚合类算子,根据元素key分组(会产生shuffle)
  • join: 对包含<key, value>键值对的多个RDD join操作

Action操作是对RDD结果进行聚合或输出,此过程会触发Spark Job任务执行,从而执行之前所有的Transformation操作,结果可返回至Driver端。常见的算子有foreach、reduce、count、saveAsTextFile等。

  • foreach: 遍历RDD中元素
  • reduce: 将RDD中的所有元素依次聚合
  • count: 遍历RDD元素,进行累加计数
  • saveAsTextFile: 将RDD结果保存到目标源TextFile中

1.9 说说jobstagetask的关系

Job、stage和task是spark任务执行流程中的三个基本单位。其中job是最大的单位,也是Spark Application任务执行的基本单元,由action算子划分触发生成。

stage隶属于单个job,根据shuffle算子(宽依赖)拆分。单个stage内部可根据数据分区数划分成多个task,由TaskScheduler分发到各个Executor上的task线程中执行。

1.10 Spark为什么这么快

Spark是一个基于内存的,用于大规模数据处理的统一分析引擎,其运算速度可以达到Mapreduce的10-100倍。具有如下特点:

  • 内存计算。Spark优先将数据加载到内存中,数据可以被快速处理,并可启用缓存。
  • shuffle过程优化。和Mapreduce的shuffle过程中间文件频繁落盘不同,Spark对Shuffle机制进行了优化,降低中间文件的数量并保证内存优先。
  • RDD计算模型。Spark具有高效的DAG调度算法,同时将RDD计算结果存储在内存中,避免重复计算。

2 Spark进阶篇

2.1如何理解DAG Scheduler的Stage划分算法

首先放上官网的RDD执行流程图:

针对一段应用代码(如上),Driver会以Action算子为边界生成DAG调度图。DAGScheduler从DAG末端开始遍历划分Stage,封装成一系列的tasksets移交TaskScheduler,后者根据调度算法, 将taskset分发到相应worker上的Executor中执行。

1. DAGSchduler的工作原理

  • DAGScheduler是一个面向stage调度机制的高级调度器,为每个job计算stage的DAG(有向无环图),划分stage并提交taskset给TaskScheduler。
  • 追踪每个RDD和stage的物化情况,处理因shuffle过程丢失的RDD,重新计算和提交。
  • 查找rdd partition 是否cache/checkpoint。提供优先位置给TaskScheduler,等待后续TaskScheduler的最佳位置划分

2. Stage划分算法

  • 从触发action操作的算子开始,从后往前遍历DAG。
  • 为最后一个rdd创建finalStage。
  • 遍历过程中如果发现该rdd是宽依赖,则为其生成一个新的stage,与旧stage分隔而开,此时该rdd是新stage的最后一个rdd。
  • 如果该rdd是窄依赖,将该rdd划分为旧stage内,继续遍历,以此类推,继续遍历直至DAG完成。

2.2 如何理解TaskScheduler的Task分配算法

TaskScheduler负责Spark中的task任务调度工作。TaskScheduler内部使用TasksetPool调度池机制存放task任务。TasksetPool分为FIFO(先进先出调度)和FAIR(公平调度)。

  • FIFO调度: 基于队列思想,使用先进先出原则顺序调度taskset
  • FAIR调度: 根据权重值调度,一般选取资源占用率作为标准,可人为设定

1. TaskScheduler的工作原理

  • 负责Application在Cluster Manager上的注册
  • 根据不同策略创建TasksetPool资源调度池,初始化pool大小
  • 根据task分配算法发送Task到Executor上执行
  1. Task分配算法
  • 首先获取所有的executors,包含executors的ip和port等信息
  • 将所有的executors根据shuffle算法进行打散
  • 遍历executors。在程序中依次尝试本地化级别,最终选择每个task的最优位置(结合DAGScheduler优化位置策略)
  • 序列化task分配结果,并发送RPC消息等待Executor响应

2.3 Spark的本地化级别有哪几种?怎么调优

移动计算 or 移动数据?这是一个问题。在分布式计算的核心思想中,移动计算永远比移动数据要合算得多,如何合理利用本地化数据计算是值得思考的一个问题。

TaskScheduler在进行task任务分配时,需要根据本地化级别计算最优位置,一般是遵循就近原则,选择最近位置和缓存。Spark中的本地化级别在TaskManager中定义,分为五个级别。

1. Spark**本地化级别**

  • PROCESS_LOCAL(进程本地化) partition和task在同一个executor中,task分配到本地Executor进程。

  • NODE_LOCAL(节点本地化) partition和task在同一个节点的不同Executor进程中,可能发生跨进程数据传输

  • NO_PREF(无位置) 没有最佳位置的要求,比如Spark读取JDBC的数据

  • RACK_LOCAL(机架本地化) partition和task在同一个机架的不同worker节点上,可能需要跨机器数据传输

  • ANY(跨机架): 数据在不同机架上,速度最慢

2. Spark本地化调优

在task最佳位置的选择上,DAGScheduler先判断RDD是否有cache/checkpoint,即缓存优先;否则TaskScheduler进行本地级别选择等待发送task。

TaskScheduler首先会根据最高本地化级别发送task,如果在尝试5次并等待3s内还是无法执行,则认为当前资源不足,即降低本地化级别,按照PROCESS->NODE->RACK等顺序。

  • 调优1:加大spark.locality.wait 全局等待时长
  • 调优2:加大spark.locality.wait.xx等待时长(进程、节点、机架)
  • 调优3:加大重试次数(根据实际情况微调)

2.4 说说SparkMapreduceShuffle的区别

Spark中的shuffle很多过程与MapReduce的shuffle类似,都有Map输出端、Reduce端,shuffle过程通过将Map端计算结果分区、排序并发送到Reducer端。

下面将对Spark和Mapreduce中shuffle过程分开叙述,Mapreduce的shuffle大家都不陌生了,主要重点突出Spark的Shuffle机制做了哪些优化工作。

1. Hadoop Mapreduce Shuffle

MapReduce的shuffle需要依赖大量磁盘操作,数据会频繁落盘产生大量IO,同时产生大量小文件冗余。虽然缓存buffer区中启用了缓存机制,但是阈值较低且内存空间小。

  • 读取输入数据,并根据split大小切分为map任务
  • map任务在分布式节点中执行map()计算
  • 每个map task维护一个环形的buffer缓存区,存储map输出结果,分区且排序
  • 当buffer区域达到阈值时,开始溢写到临时文件中。map task任务结束时进行临时文件合并。此时,整合shuffle map端执行完成
  • mapreduce根据partition数启动reduce任务,copy拉取数据
  • merge合并拉取的文件
  • reduce()函数聚合计算,整个过程完成

2. SparkShuffle机制

Spark1.2以前,默认的shuffle计算引擎是HashShuffleManager,此种Shuffle产生大量的中间磁盘文件,消耗磁盘IO性能。在Spark1.2后续版本中,默认的ShuffleManager改成了SortShuffleManager,通过索引机制和合并临时文件的优化操作,大幅提高shuffle性能。

  • HashShuffleManager

HashShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是合并的运行机制。合并机制主要是通过复用buffer来优化Shuffle过程中产生的小文件的数量,Hash shuffle本身不排序。开启合并机制后,同一个Executor共用一组core,文件个数为cores * reduces。

  • SortShuffleManager

SortShuffleManager的运行机制分成两种,普通运行机制和bypass运行机制。当shuffletask的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认200),会启用bypass机制。

SortShuffleManager机制采用了一个特殊的内存数据结构(Map),数据优先写入此结构中,当达到阈值时溢写到磁盘中并清空内存数据结构。在过程中对数据进行排序并合并,减少最终的临时文件数量。ByPass机制下在其基础上加了一个索引机制,将数据存放位置记录hash索引值,相同hash的数据合并到同一个文件中。

2.5 Spark的内存是怎么管理的

Spark内存分为堆内内存和堆外内存,其中堆内内存基于JVM实现,堆外内存则是通过调用JDK Unsafe API管理。在Spark1.6版本前后内存管理模式分为: 静态管理(Static Memory)和统一管理(Unified Memory)。

两种内存管理方式存在很大的差别,内存计算占比也不同,具体细节查看我的Spark内存管理相关文章~

2.6 Spark的广播变量和累加器的作用是什么

Executor接收到TaskScheduler的taskset分发命令,根据rdd分区数在ThreadPool中创建对应的Task线程,每个Task线程拉取并序列化代码,启动分布式计算。

Spark在计算过程中的算子函数、变量都会由Driver分发到每台机器中,每个Task持有该变量的一个副本拷贝。可是这样会存在两个问题:

  1. 是否可以只在Executor中存放一次变量,所有Task共享?
  2. 分布式计算场景下怎么可以做到全局计数

1. 广播变量(Broadcast)

在Driver端使用broadcast()将一些大变量(List、Array)持久化,Executor根据broadcastid拉取本地缓存中的Broadcast对象,如果不存在,则尝试远程拉取Driver端持久化的那份Broadcast变量

这样所有的Executor均存储了一份变量的备份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。注意不能广播RDD,因为RDD不存储数据;同时广播变量只能在Driver端定义和修改,Executor端只能读取。

val sc = new SparkContext(conf)
val list = List('hello world')

//定义broadcast变量
val broadcastVal = sc.broadcast(list)
val dataRDD = sc.textFile('./test.txt')

//broadcast变量读取
dataRDD.filter{x => broadcastVal.value
   .contains(x)}.foreach{println}

2. 累加器(Accumulator)

Spark累加器支持在Driver端进行全局汇总的计算需求,实现分布式计数的功能。累加器在Driver端定义赋初始值,在Excutor端更新,最终在Driver端读取最后的汇总值。

val sc = new SparkContext(conf)

// 定义累加器
val accumulator = sc.accumulator(0)

// 累加器计算
sc.textFile('./test.txt').foreach{x =>
   {accumulator.add(1)}}

// 累加器读数
println(accumulator.value)

2.7 Spark SQLHive SQL的区别

Hive SQL是Hive提供的SQL查询引擎,底层由MapReduce实现。Hive根据输入的SQL语句执行词法分析、语法树构建、编译、逻辑计划、优化逻辑计划以及物理计划等过程,转化为Map Task和Reduce Task最终交由Mapreduce引擎执行。

  • 执行引擎。具有mapreduce的一切特性,适合大批量数据离线处理,相较于Spark而言,速度较慢且IO操作频繁
  • 有完整的hql语法,支持基本sql语法、函数和udf
  • 对表数据存储格式有要求,不同存储、压缩格式性能不同

Spark SQL底层基于Spark引擎,使用Antlr解析语法,编译生成逻辑计划和物理计划,过程和Hive SQL执行过程类似,只不过Spark SQL产生的物理计划为Spark程序。

  • 执行引擎。背靠Spark计算模型,基于内存计算快速高效。
  • 可支持SQL和DataFrame等形式,底层转化为Spark算子参与计算。
  • 集成了HiveContext接口,基本实现Hive功能

2.8 说下Spark SQL的执行流程

可以参考Hive SQL的执行流程展开叙述,大致过程一致,具体执行流程如下:

  • 输入编写的Spark SQL
  • SqlParser分析器。进行语法检查、词义分析,生成未绑定的Logical Plan逻辑计划(未绑定查询数据的元数据信息,比如查询什么文件,查询那些列等)
  • Analyzer解析器。查询元数据信息并绑定,生成完整的逻辑计划。此时可以知道具体的数据位置和对象,Logical Plan 形如from table -> filter column -> select 形式的树结构
  • Optimizer优化器。选择最好的一个Logical Plan,并优化其中的不合理的地方。常见的例如谓词下推、剪枝、合并等优化操作
  • Planner使用Planing Strategies将逻辑计划转化为物理计划,并根据最佳策略选择出的物理计划作为最终的执行计划
  • 调用Spark Plan Execution执行引擎执行Spark RDD任务

2.9 RDDDataFrameDataSet的区别

1. RDDDataFrameDataset的共性

三者均为Spark分布式弹性数据集,Spark 2.x 的DataFrame被Dataset合并,现在只有DataSet和RDD。三者有许多相同的算子如filter、map等,且均具有惰性执行机制。

2. DataFrame**DataSet的区别**

DataFrame是分布式Row对象的集合,所有record类型均为Row。Dataset可以认为是DataFrame的特例,每个record存储的是强类型值而不是Row,同理Dataframe可以看作Dataset[Row]。

3. RDDDataFrameDataset转换

  • DataFrame/DataSet转换为RDD

    val rdd1 = myDF.rdd
  • RDD转换为DataFrame/Dataset (spark低版)

import spark.implicits._
val myDF = rdd.map {
 line=> (line._1,line._2)}
 .toDF("col1","col2")
  • RDD转换为Dataset
import spark.implicits._
case class ColSet(
 col1:String,col2:Int) extends Serializable 
val myDS = rdd.map {row=>
 ColSet(row._1,row._2)
}.toDS

4. Spark SQL中的RDDDataset

RDD无法支持Spark sql操作,而dataframe和dataset均支持。

2.10 groupbyKeyreduceBykey的区别

在介绍groupByKey和reduceByKey的区别之前,首先介绍一下什么是聚合算子:

根据Key进行分组聚合,解决<K, V>类型的数据计算问题

在Spark中存在很多聚合算子,常用于处理分类统计等计算场景。

  • 分组:groupByKey算子
  • 聚合:reduceByKey算子
  • 本地聚合:CombineByKey算子

1. CombineByKey算子

聚合算子内部调用的基础算子之一,程序调用CombineByKey算子时会在本地预先进行规约计算,类似于Mapreduce Shuffle中Map阶段的Combine阶段,先看一下执行原理:

  • 为各分区内所有Key创建累加器对象并赋值
  • 每次计算时分区内相同Key累加器值加一
  • 合并各分区内相同Key的值
val input = sc.parallelize(
 Array(1,1),(1,2),(2,3),(2,4),2)
val result = input.combineByKey(
 //# 初始化(k,v) 将v置换为c(1)
 (v) => (v, 1)
 //#调用mergeKey结果 将v累加到聚合对象
 (arr: (Int, Int), v)
  => (arr._1 +v, arr._2+1),
 //# 每个分区结果聚合
 (arr1:(Int,Int),arr2:(Int,Int))
  =>(arr1._1+arr2._1, arr1._2+arr2._2)
).map{
 case(k,v)=>(k, v._1/v._2)
}

2. ReduceByKey算子

内部调用CombineByKey算子实现。即先在本地预聚合,随后在分布式节点聚合,最终返回(K, V) 数据类型的计算结果。通过第一步本地聚合,大幅度减少跨节点shuffle计算的数据量,提高聚合计算的效率。

3. GroupByKey算子

GroupByKey内部禁用CombineByKey算子,将分区内相同Key元素进行组合,不参与聚合计算。此过程会和ReduceByKey一致均会产生Shuffle过程,但是ReduceByKey存在本地预聚合,效率高于GroupByKey。

  • 在聚合计算场景下,计算效率低于ReduceBykey
  • 可以搭配mapValues算子实现ReduceByKey的聚合计算

2.11 coalescerepartition的区别

两个算子都可以解决Spark的小文件过多和分区数据倾斜问题。举个例子,在使用Spark进行数据处理的过程中,常常会调用filter方法进行数据预处理,频繁的过滤操作会导致分区数据产生大量小文件碎片,当shuffle过程读取分区文件时极容易产生数据倾斜现象。

Spark通过repartition和coalesce算子来控制分区数量,通过合并小分区的方式保持数据紧凑型,提高分区的利用率。

1. 内部实现机制

首先打开repartition的源码,可以看到方法仅存在一个参数: numPartitions(分区数),这里表示需要合并的分区数量。再细看内部调用的是coalesce(shuffle=true)函数,即核心逻辑还是由coalesce()实现,且过程会产生shuffle操作。

再次定位到coalesce()方法内部,可以看到根据shuffle的条件判断,先通过生成随机数将partition重新组合,随后生成CoalesceRDD进行后续的逻辑处理。

2. 分区重分配原则

  • 当分区数大于原分区时,类型为宽依赖(shuffle过程),需要把coalesce的shuffle参数设为true,执行HashPartition重新扩大分区,这时调用repartition()
  • 当分区数两者相差不大时,类型为窄依赖,可以进行分区合并,这时调用coalesce()
  • 当分区数远远小于原分区时,需要综合考虑不同场景的使用

2.12 说说cachepersist的异同

  • cache()方法内部调用了persist()
  • persist()方法存在多种缓存级别,默认为Momory
  • cache()只有一个默认的缓存级别MEMORY_ONLY
  • persist()可以根据情况设置其它的缓存级别

2.13 连续登陆问题SQL

这是一个经典的SQL面试题,例如计算平台连续登陆3天以上的用户统计,诸如此类网上存在很多种答案,这里放上其中一种解题思路的SQL实现和DataFrame实现版本。

1. 实现思路

  • 将用户分组并按照时间排序,并记录rank排名
  • 计算dt-rank的差值,差值与用户共同分组
  • 统计count并找出 count > 3的用户

2. Spark DataFrame**实现**

val df: DataFrame = ...
val partionWindow = 
 Window.partitionBy('userid)
  .orderBy('dt')
df.select('userid, 'dt, 
 row_number() over(
  partitionWindow) as 'rn' 
)
.select('userid, 'dt, expr(
 "data_sub(dt,rn)") as 'date_diff')
.groupBy('userid, 'date_diff)
.agg(min('dt), max('dt),
 count('userid) as 'counts'
)
.where("counts >=3").show()

3. Spark SQL**实现**

select
 userid
 ,min(dt) as start_date
 ,max(dt) as end_date
 ,count(1) as times
from
(
 select 
  userid
  ,dt
  ,date_sub(dt, rn) as date_diff
  from
  (
   select 
userid
,dt
 ,row_number() over(partition by
  userid order by dt) as rn
   from 
  user_tables
  )
)
group by 
 userid, date_diff
having times >= 3

2.14 SparkStreaming怎么保证精准一次消费

实时场景下的Spark Streaming流处理,通过Receiver组件实时接收数据,最终将连续的Dstream数据流转换为微批RDD在Spark引擎中执行。Spark Streaming实时场景中最通用数据源是Kafka,一个高性能、分布式的实时消息队列。Spark Streaming最大化实时消费Kafka分区数据,提供秒级响应计算服务。

Spark Streaming保证精确一次消费,需要整个实时系统的各环节均保持强一致性。即可靠的Kafka端(数据可重复读取、不丢失)、可靠的消费端(Spark内部精确一次消费)、可靠的输出端(幂等性、事务)。

2.15 对于 Spark 中的数据倾斜问题你有什么好的方案?

简单一句: Spark 数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义 Partitioner,使用 Map 侧 Join 代替 Reduce 侧 Join(内存表合并),给倾斜 Key 加上随机前缀等。

什么是数据倾斜 对 Spark/Hadoop 这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。数据倾斜指的是,并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈(木桶效应)。

数据倾斜是如何造成的 在 Spark 中,同一个 Stage 的不同 Partition 可以并行处理,而具有依赖关系的不同 Stage 之间是串行处理的。假设某个 Spark Job 分为 Stage 0和 Stage 1两个 Stage,且 Stage 1依赖于 Stage 0,那 Stage 0完全处理结束之前不会处理Stage 1。而 Stage 0可能包含 N 个 Task,这 N 个 Task 可以并行进行。如果其中 N-1个 Task 都在10秒内完成,而另外一个 Task 却耗时1分钟,那该 Stage 的总时间至少为1分钟。换句话说,一个 Stage 所耗费的时间,主要由最慢的那个 Task 决定。由于同一个 Stage 内的所有 Task 执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同 Task 之间耗时的差异主要由该 Task 所处理的数据量决定。

具体解决方案

  1. 调整并行度分散同一个 Task 的不同 Key
    Spark 在做 Shuffle 时,默认使用 HashPartitioner(非 Hash Shuffle )对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,造成该 Task 所处理的数据远大于其它 Task,从而造成数据倾斜。如果调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。图中左边绿色框表示 kv 样式的数据,key 可以理解成 name。可以看到 Task0 分配了许多的 key,调整并行度,多了几个 Task,那么每个 Task 处理的数据量就分散了。

适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大。

解决方案
调整并行度。一般是增大并行度,但有时如本例减小并行度也可达到效果。

优势
实现简单,可在需要Shuffle的操作算子上直接设置并行度或者使用spark.default.parallelism设置。如果是Spark SQL,还可通过SET spark.sql.shuffle.partitions=[num_tasks]设置并行度。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。

劣势
适用场景少,只能将分配到同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。

2.自定义Partitioner
使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task,可以拿上图继续想象一下,通过自定义 Partitioner 可以把原本分到 Task0 的 Key 分到 Task1,那么 Task0 的要处理的数据量就少了。

适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大。

解决方案
使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。

优势
不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。

劣势
适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。

3.将 Reduce side(侧)Join 转变为 Map side(侧) Join
通过 Spark 的 Broadcast 机制,将 Reduce 侧 Join 转化为 Map 侧 Join,避免 Shuffle 从而完全消除 Shuffle 带来的数据倾斜。可以看到 RDD2 被加载到内存中了。

适用场景
参与Join的一边数据集足够小,可被加载进Driver并通过Broadcast方法广播到各个Executor中。

解决方案
在Java/Scala代码中将小数据集数据拉取到Driver,然后通过broadcast方案将小数据集的数据广播到各Executor。或者在使用SQL前,将broadcast的阈值调整得足够多,从而使用broadcast生效。进而将Reduce侧Join替换为Map侧Join。

优势
避免了Shuffle,彻底消除了数据倾斜产生的条件,可极大提升性能。

劣势
要求参与Join的一侧数据集足够小,并且主要适用于Join的场景,不适合聚合的场景,适用条件有限。

4. skew key 增加随机前**/**后缀
为数据量特别大的 Key 增加随机前/后缀,使得原来 Key 相同的数据变为 Key 不相同的数据,从而使倾斜的数据集分散到不同的 Task 中,彻底解决数据倾斜问题。Join 另一则的数据中,与倾斜 Key 对应的部分数据,与随机前缀集作笛卡尔乘积,从而保证无论数据倾斜侧倾斜 Key 如何加前缀,都能与之正常 Join。

适用场景
两张表都比较大,无法使用Map则Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。

解决方案
将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。

优势
相对于Map则Join,更能适应大数据集的Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。

劣势
如果倾斜Key非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理,需要扫描数据集两遍,增加了开销。

5.大表随机添加 N 种随机前缀,小表扩大 N 倍
如果出现数据倾斜的 Key 比较多,上一种方法将这些大量的倾斜 Key 分拆出来,意义不大(很难一个 Key 一个 Key 都加上后缀)。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍),可以看到 RDD2 扩大了 N 倍了,再和加完前缀的大数据做笛卡尔积。

适用场景
一个数据集存在的倾斜Key比较多,另外一个数据集数据分布比较均匀。

优势
对大部分场景都适用,效果不错。

劣势
需要将一个数据集整体扩大N倍,会增加资源消耗。

2.16 Spark为什么快,Spark SQL 一定比 Hive 快吗

Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。

  1. 消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 cache 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。

  2. 消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中。

  3. JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。

记住一种反例 考虑一种极端查询:

Select month_id, sum(sales) from T group by month_id;

这个查询只有一次 shuffle 操作,此时,也许 Hive HQL 的运行时间也许比 Spark 还快,反正 shuffle 完了都会落一次盘,或者都不落盘。

结论 Spark 快不是绝对的,但是绝大多数,Spark 都比 Hadoop 计算要快。这主要得益于其对 mapreduce 操作的优化以及对 JVM 使用的优

2.17 RDD, DAG, Stage怎么理解?

DAG Spark 中使用 DAG 对 RDD 的关系进行建模,描述了 RDD 的依赖关系,这种关系也被称之为 lineage(血缘),RDD 的依赖关系使用 Dependency 维护。DAG 在 Spark 中的对应的实现为 DAGScheduler。

RDD RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。Rdd的五个特征: 1. dependencies: 建立 RDD 的依赖关系,主要 RDD 之间是宽窄依赖的关系,具有窄依赖关系的 RDD 可以在同一个 stage 中进行计算。 2. partition: 一个 RDD 会有若干个分区,分区的大小决定了对这个 RDD 计算的粒度,每个 RDD 的分区的计算都在一个单独的任务中进行。 3. preferedlocations: 按照“移动数据不如移动计算”原则,在 Spark 进行任务调度的时候,优先将任务分配到数据块存储的位置。 4. compute: Spark 中的计算都是以分区为基本单位的,compute 函数只是对迭代器进行复合,并不保存单次计算的结果。 5. partitioner: 只存在于(K,V)类型的 RDD 中,非(K,V)类型的 partitioner 的值就是 None。

RDD 的算子主要分成2类,action 和 transformation。这里的算子概念,可以理解成就是对数据集的变换。action 会触发真正的作业提交,而 transformation 算子是不会立即触发作业提交的。每一个 transformation 方法返回一个新的 RDD。只是某些 transformation 比较复杂,会包含多个子 transformation,因而会生成多个 RDD。这就是实际 RDD 个数比我们想象的多一些 的原因。通常是,当遇到 action 算子时会触发一个job的提交,然后反推回去看前面的 transformation 算子,进而形成一张有向无环图。

Stage 在 DAG 中又进行 stage 的划分,划分的依据是依赖是否是 shuffle 的,每个 stage 又可以划分成若干 task。接下来的事情就是 driver 发送 task 到 executor,executor 自己的线程池去执行这些 task,完成之后将结果返回给 driver。action 算子是划分不同 job 的依据。

2.18 Spark-Job-Stage-Task之间的关系

基本概念

在开始之前需要先了解Spark中Application,Job,Stage等基本概念,官方给出的解释如下表:

Term Meaning
Application 用户编写的Spark应用程序,包括一个Driver和多个executors
Application jar 包含用户程序的Jar包
Driver Program 运行main()函数并创建SparkContext进程
Cluster manager 在集群上获取资源的外部服务,如standalone manager,yarn,Mesos
deploy mode 部署模式,区别在于driver process运行的位置
worker node 集群中可以运行程序代码的节点(机器)
Executor 运行在worker node上执行具体的计算任务,存储数据的进程
Task 被分配到一个Executor上的计算单元
Job 由多个任务组成的并行计算阶段,因RDD的Action产生
Stage 每个Job被分为小的计算任务组,每组称为一个stage
DAGScheduler 根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler
TaskScheduler 将TaskSet提交给worker运行,每个executor运行什么task在此分配

Job-Stage-Task之间的关系

如下图所示,一个Spark程序可以被划分为一个或多个Job,划分的依据是RDD的Action算子,每遇到一个RDD的Action操作就生成一个新的Job。
每个spark Job在具体执行过程中因为shuffle的存在,需要将其划分为一个或多个可以并行计算的stage,划分的依据是RDD间的Dependency关系,当遇到Wide Dependency时因需要进行shuffle操作,这涉及到了不同Partition之间进行数据合并,故以此为界划分不同的Stage。
Stage是由Task组组成的并行计算,因此每个stage中可能存在多个Task,这些Task执行相同的程序逻辑,只是它们操作的数据不同。一般RDD的一个Partition对应一个Task,Task可以分为ResultTask和ShuffleMapTask。

RDD-Partition-Records-Task之间的关系

通常一个RDD被划分为一个或多个Partition,Partition是Spark进行数据处理的基本单位,一般来说一个Partition对应一个Task,而一个Partition中通常包含数据集中的多条记录(Record)。
注意不同Partition中包含的记录数可能不同。Partition的数目可以在创建RDD时指定,也可以通过reparation和coalesce等算子重新进行划分。
通常在进行shuffle的时候也会重新进行分区,这是对于key-valueRDD,Spark通常根据RDD中的Partitioner来进行分区,目前Spark中实现的Partitioner有两种:HashPartitioner和RangePartitioner,当然也可以实现自定义的Partitioner,只需要继承抽象类Partitioner并实现numPartitions and getPartition(key: Any)即可。

将上面的概念串联起来,可以得到下面的运行层次图:

2.19 RDD 如何通过记录更新的方式容错

RDD 的容错机制实现分布式数据集容错方法有两种: 1. 数据检查点 2. 记录更新。

RDD 采用记录更新的方式:记录所有更新点的成本很高。所以,RDD只支持粗颗粒变换,即只记录单个块(分区)上执行的单个操作,然后创建某个 RDD 的变换序列(血统 lineage)存储下来;变换序列指,每个 RDD 都包含了它是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称“血统”容错。

在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用Checkpoint算子来做检查点,不仅要考虑Lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加Checkpoint是最物有所值的。

2.20 Spark 血统的概念

RDD 的 lineage 记录的是粗颗粒度的特定数据转换(transformation)操作(filter, map, join etc.)行为。当这个 RDD 的部分分区数据丢失时,它可以通过 lineage 获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了 Spark 的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

2.21 容错方法

Spark 选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建 RDD 的一系列变换序列(每个 RDD 都包含了他是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称血统容错)记录下来,以便恢复丢失的分区。lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。

相比其他系统的细颗粒度的内存数据更新级别的备份或者 LOG 机制,RDD 的 lineage 记录的是粗颗粒度的特定数据 transformation 操作行为。当这个 RDD 的部分分区数据丢失时,它可以通过 lineage 获取足够的信息来重新运算和恢复丢失的数据分区。

2.22 Spark 粗粒度和细粒度

如果问的是操作的粗细粒度,应该是,Spark 在错误恢复的时候,只需要粗粒度的记住 lineage,就可实现容错。

关于 Mesos 1. 粗粒度模式(Coarse-grained Mode): 每个应用程序的运行环境由一个 dirver 和若干个 executor 组成,其中,每个 executor 占用若干资源,内部可运行多个 task(对应多少个 slot)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个 executor 运行你的应用程序,每个 executor 占用5GB内存和5个 CPU,每个 executor 内部设置了5个 slot,则 Mesos 需要先为 executor 分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,Mesos 的 master 和 slave 并不知道 executor 内部各个 task 的运行情况,executor 直接将任务状态通过内部的通信机制汇报给 driver,从一定程度上可以认为,每个应用程序利用 Mesos 搭建了一个虚拟集群自己使用。 2. 细粒度模式(Fine-grained Mode): 鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos 还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动 executor,但每个 executor 占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,Mesos 会为每个 executor 动态分配资源,每分配一些,便可以运行一个新任务,单个 Task 运行完之后可以马上释放对应的资源。每个 Task 会汇报状态给 Mesos slave 和 Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于 MapReduce 调度模式,每个 task 完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。

Spark中,每个 application 对应一个 SparkContext。对于 SparkContext 之间的调度关系,取决于 Spark 的运行模式。对 Standalone 模式而言,Spark Master 节点先计算集群内的计算资源能否满足等待队列中的应用对内存和 CPU 资源的需求,如果可以,则 Master 创建 Spark Driver,启动应用的执行。宏观上来讲,这种对应用的调度类似于 FIFO 策略。在 Mesos 和 Yarn 模式下,底层的资源调度系统的调度策略都是由 Mesos 和 Yarn 决定的。具体分类描述如下:

  1. Standalone 模式: 默认以用户提交 Applicaiton 的顺序来调度,即 FIFO 策略。每个应用执行时独占所有资源。如果有多个用户要共享集群资源,则可以使用参数 spark.cores.max 来配置应用在集群中可以使用的最大 CPU 核的数量。如果不配置,则采用默认参数 spark.deploy.defaultCore 的值来确定。
  1. Mesos 模式: 如果在 Mesos 上运行 Spark,用户想要静态配置资源的话,可以设置 spark.mesos.coarse 为 true,这样 Mesos 变为粗粒度调度模式。然后可以设置 spark.cores.max 指定集群中可以使用的最大核数,与上面 Standalone 模式类似。同时,在 Mesos 模式下,用户还可以设置参数 spark.executor.memory 来配置每个 executor 的内存使用量。如果想使 Mesos 在细粒度模式下运行,可以通过 mesos://<url-info> 设置动态共享 CPU core 的执行模式。在这种模式下,应用不执行时的空闲 CPU 资源得以被其他用户使用,提升了 CPU 使用率。

2.23 Worker 和 Executor 的区别是什么?

Worker 是指每个工作节点,启动的一个进程,负责管理本节点,jps 可以看到 Worker 进程在运行,对应的概念是 Master 节点。

Executor 每个 Spark 程序在每个节点上启动的一个进程,专属于一个 Spark 程序,与 Spark 程序有相同的生命周期,负责 Spark 在节点上启动的 Task,管理内存和磁盘。如果一个节点上有多个 Spark 程序,那么相应就会启动多个执行器。所以说一个 Worker 节点可以有多个 Executor 进程。

2.24 说说Spark提供的两种共享变量

Spark 程序的大部分操作都是 RDD 操作,通过传入函数给 RDD 操作函数来计算,这些函数在不同的节点上并发执行,内部的变量有不同的作用域,不能相互访问,有些情况下不太方便。

  1. 广播变量,是一个只读对象,在所有节点上都有一份缓存,创建方法是 SparkContext.broadcast()。创建之后再更新它的值是没有意义的,一般用 val 来修改定义。

  2. 计数器,只能增加,可以用计数或求和,支持自定义类型。创建方法是 SparkContext.accumulator(V, name)。只有 Driver 程序可以读这个计算器的变量,RDD 操作中读取计数器变量是无意义的。但节点可以对该计算器进行增加(???

以上两种类型都是 Spark 的共享变量。

2.25 Master容错 和 Slave节点容错

Master节点失效

Spark Master的容错分为两种情况:Standalone集群模式和单点模式。

Standalone集群模式下的Master容错是通过ZooKeeper来完成的,即有多个Master,一个角色是Active,其他的角色是Standby。当处于Active的Master异常时,需要重新选择新的Master,通过ZooKeeper的ElectLeader功能实现。关于ZooKeeper的实现,这里就不展开了,感兴趣的朋友可以参考Paxos。

要使用ZooKeeper模式,你需要在conf/spark-env.sh中为SPARK_DAEMON_JAVA_OPTS添加一些选项,详见下表。

系统属性 说明
spark.deploy.recoveryMode 默认值为NONE。设置为ZOOKEEPER后,可以在Active Master异常之后重新选择一个Active Master
spark.deploy.zookeeper.url ZooKeeper集群地址(比如192.168.1.100:2181,192.168.1.101:2181)
spark.deploy.zookeeper.dir 用于恢复的ZooKeeper目录,默认值为/spark

设置SPARK_DAEMON_JAVA_OPTS的实际例子如下:

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS
    -Dspark.deploy.recoveryMode =ZOOKEEPER"

应用程序启动运行时,指定多个Master地址,它们之间用逗号分开,如下所示:

MASTER=spark://192.168.100.101:7077,spark://192.168.100.102:7077 bin/spark-shell

在ZooKeeper模式下,恢复期间新任务无法提交,已经运行的任务不受影响。

此外,Spark Master还支持一种更简单的单点模式下的错误恢复,即当Master进程异常时,重启Master进程并从错误中恢复。具体方法是设置spark.deploy.recoveryMode属性的值为FILESYSTEM,并为spark.deploy.recoveryDirectory属性设置一个本地目录,用于存储必要的信息来进行错误恢复。

Slave节点失效

Slave节点运行着Worker、执行器和Driver程序,所以我们分三种情况讨论下3个角色分别退出的容错过程。

  • Worker异常停止时,会先将自己启动的执行器停止,Driver需要有相应的程序来重启Worker进程。
  • 执行器异常退出时,Driver没有在规定时间内收到执行器的StatusUpdate,于是Driver会将注册的执行器移除,Worker收到LaunchExecutor指令,再次启动执行器。
  • Driver异常退出时,一般要使用检查点重启Driver,重新构造上下文并重启接收器。第一步,恢复检查点记录的元数据块。第二步,未完成作业的重新形成。由于失败而没有处理完成的RDD,将使用恢复的元数据重新生成RDD,然后运行后续的Job重新计算后恢复。

2.26 Transformationaction是什么?区别?举几个常用方法

RDD 创建后就可以在 RDD 上进行数据处理。RDD 支持两种操作: 1. 转换(transformation): 即从现有的数据集创建一个新的数据集 2. 动作(action): 即在数据集上进行计算后,返回一个值给 Driver 程序

RDD 的转化操作是返回一个新的 RDD 的操作,比如 map() 和 filter() ,而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first() 。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。

RDD 中所有的 Transformation 都是惰性的,也就是说,它们并不会直接计算结果。相反的它们只是记住了这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给 Driver 的 Action 时,这些 Transformation 才会真正运行。

这个设计让 Spark 更加有效的运行

2.27 Spark作业提交流程是怎么样的

  1. spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。

  2. TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。

  3. Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executer。

  4. Executor 启动后,会自己反向注册到 TaskScheduler 中。 所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。

  5. 每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。

  6. DAGScheduler 会将 Job划分为多个 stage,然后每个 stage 创建一个 TaskSet。

  7. TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。

  8. Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)

2.28 Spark streamning工作流程是怎么样的,和Storm比有什么区别

Spark Streaming 与 Storm 都可以用于进行实时流计算。但是他们两者的区别是非常大的。其中区别之一,就是,Spark Streaming 和 Storm 的计算模型完全不一样,Spark Streaming 是基于 RDD 的,因此需要将一小段时间内的,比如1秒内的数据,收集起来,作为一个 RDD,然后再针对这个 batch 的数据进行处理。而 Storm 却可以做到每来一条数据,都可以立即进行处理和计算。因此,Spark Streaming 实际上严格意义上来说,只能称作准实时的流计算框架;而 Storm 是真正意义上的实时计算框架。 此外,Storm 支持的一项高级特性,是 Spark Streaming 暂时不具备的,即 Storm 支持在分布式流式计算程序(Topology)在运行过程中,可以动态地调整并行度,从而动态提高并发处理能力。而 Spark Streaming 是无法动态调整并行度的。 但是 Spark Streaming 也有其优点,首先 Spark Streaming 由于是基于 batch 进行处理的,因此相较于 Storm 基于单条数据进行处理,具有数倍甚至数十倍的吞吐量。 此外,Spark Streaming 由于也身处于 Spark 生态圈内,因此Spark Streaming可以与Spark Core、Spark SQL,甚至是Spark MLlib、Spark GraphX进行无缝整合。流式处理完的数据,可以立即进行各种map、reduce转换操作,可以立即使用sql进行查询,甚至可以立即使用machine learning或者图计算算法进行处理。这种一站式的大数据处理功能和优势,是Storm无法匹敌的。 因此,综合上述来看,通常在对实时性要求特别高,而且实时数据量不稳定,比如在白天有高峰期的情况下,可以选择使用Storm。但是如果是对实时性要求一般,允许1秒的准实时处理,而且不要求动态调整并行度的话,选择Spark Streaming是更好的选择。

补充一个比较好的比较文章: https://juejin.im/post/5a40b76a6fb9a045263bd279

2.29 Spark ML MLLib 两个包区别和联系

  1. 技术角度上,面向的数据集类型不一样: ML 的 API 是面向 Dataset 的(Dataframe 是 Dataset 的子集,也就是 Dataset[Row]), mllib 是面对 RDD 的。Dataset 和 RDD 有啥不一样呢?Dataset 的底端是 RDD。Dataset 对 RDD 进行了更深一层的优化,比如说有 sql 语言类似的黑魔法,Dataset 支持静态类型分析所以在 compile time 就能报错,各种 combinators(map,foreach 等)性能会更好,等等。

  2. 编程过程上,构建机器学习算法的过程不一样: ML 提倡使用 pipelines,把数据想成水,水从管道的一段流入,从另一端流出。ML 是1.4比 Mllib 更高抽象的库,它解决如果简洁的设计一个机器学习工作流的问题,而不是具体的某种机器学习算法。未来这两个库会并行发展。

2.30 groupByKey、reduceByKey和reduceByKeyLocally是用于对RDD进行聚合操作的函数。

groupByKey函数将键值对RDD中具有相同键的所有值进行分组。它返回一个新的键值对RDD,其中每个键关联一个由原RDD中具有该键的所有值组成的可迭代器。这个函数在数据量较小的情况下使用较为合适,因为它将所有值都放在内存中进行操作。

reduceByKey函数将具有相同键的所有值进行合并。它返回一个新的键值对RDD,其中每个键关联一个经过合并的值。这个函数在数据量较大的情况下使用较为合适,因为它会在分布式计算中对每个分区的数据进行合并,然后再将结果合并到最终的结果中。

reduceByKeyLocally函数与reduceByKey函数类似,但它不会在分布式计算中进行合并操作,而是将结果返回到驱动程序中。它返回一个新的键值对的Map,其中每个键关联一个经过合并的值。这个函数适用于数据量较小且需要将结果返回到驱动程序进行后续处理的情况。

需要注意的是,reduceByKey和reduceByKeyLocally函数都是基于reduce操作进行聚合的,而groupByKey函数是基于分组操作进行聚合的。在处理大数据量时,推荐使用reduceByKey函数,因为它可以在分布式计算中进行合并操作,提高计算效率。

2.31 解释 persist() 和 cache() 的异同

在Spark源码中,persist()和cache()都是用于将RDD持久化到内存或磁盘的函数。它们的主要区别在于持久化级别和默认行为。

persist()函数可以指定持久化级别,它接受一个StorageLevel参数,用于指定数据的持久化级别。这个参数可以是Memory-only、Disk-only、Memory and Disk等不同级别的组合。通过persist()函数持久化的RDD可以在后续的操作中重用,从而避免了重复计算。但是需要注意的是,persist()函数只是将RDD标记为持久化,具体的持久化操作是惰性执行的,也就是说在第一次使用RDD时才会将其持久化到内存或磁盘。

cache()函数是persist()函数的一个特例,它默认将RDD持久化到内存中,并且使用的是默认的持久化级别。默认情况下,cache()函数将RDD持久化到内存中,使用的是MEMORY_ONLY级别。与persist()函数一样,cache()函数也是惰性执行的,只有在第一次使用RDD时才会将其持久化到内存。

总结来说,persist()函数和cache()函数的主要区别在于持久化级别和默认行为。persist()函数可以指定不同的持久化级别,而cache()函数默认将RDD持久化到内存中,并且使用的是默认的持久化级别。在使用时,可以根据需求选择使用persist()函数指定不同的持久化级别,或者直接使用cache()函数进行默认的内存持久化。

2.32 解释如下两个代码的差异

#代码1
val counter = 0
val data = Seq(1, 2, 3)
data.foreach(x => counter += x)
println("Counter value: " + counter)

#代码2
val counter = 0
val data = Seq(1, 2, 3)
var rdd = sc.parallelizze(data)
rdd.foreach(x => counter += x)
println("Counter value: " + counter)

在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark 提供了两种类型的变量:

累加器 accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。

广播变量 broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。

  1. 累加器
  • 1.1不使用累加器
    var counter = 0
    val data = Seq(1, 2, 3)
    data.foreach(x => counter += x)
    println("Counter value: "+ counter)
    运行结果:
    Counter value: 6
    如果我们将 data 转换成 RDD,再来重新计算:
    var counter = 0
    val data = Seq(1, 2, 3)
    var rdd = sc.parallelize(data)
    rdd.foreach(x => counter += x)
    println("Counter value: "+ counter)
    运行结果:
    Counter value: 0
  • 1.2. 使用累加器

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果:

val xx: Accumulator[Int] = sc.accumulator(0)
  1. 代码示例:
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //使用scala集合完成累加
    var counter1: Int = 0;
    var data = Seq(1,2,3)
    data.foreach(x => counter1 += x )
    println(counter1)//6

    println("+++++++++++++++++++++++++")

    //使用RDD进行累加
    var counter2: Int = 0;
    val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
    dataRDD.foreach(x => counter2 += x)
    println(counter2)//0
    //注意:上面的RDD操作运行结果是0
    //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量
    //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
    //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系

    //那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!
    //如果解决?---使用累加器
    val counter3: Accumulator[Int] = sc.accumulator(0)
    dataRDD.foreach(x => counter3 += x)
    println(counter3)//6
  }
}
  1. 广播变量
  • 2.1. 不使用广播变量

    1. 使用广播变量
    1. 代码示例:

关键词:sc.broadcast()

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastVariablesTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //不使用广播变量
    val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
    val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
    //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
    val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
    //根据水果编号取水果名称
    val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
    fruitNames.foreach(println)
    //注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,
    //那么会导致,被各个Task共用到的fruitMap会被多次传输
    //应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可
    //如何做到?---使用广播变量
    //注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redis
    println("=====================")
    val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
    val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
    fruitNames2.foreach(println)

  }
}

2.32 说说mapmapPartitions的区别

map 中的 func 作用的是 RDD 中每一个元素,而 mapPartitioons 中的 func 作用的对象是 RDD 的一整个分区。所以 func 的类型是Iterator<T> => Iterator<T>,其中 T 是输入 RDD 的元素类型。

2.33 groupByKey和reduceByKey是属于Transformation还是 Action?

groupByKey和reduceByKey都属于Spark中的Transformation操作,而不是Action操作。

Transformation操作是指对RDD进行转换,生成一个新的RDD,而不会立即触发计算。在执行Transformation操作时,Spark会记录下转换的步骤和依赖关系,以便在需要时进行计算。

Action操作是指对RDD进行实际计算并返回结果,触发Spark执行计算。Action操作会导致Spark将转换操作应用于数据集,并返回结果或将结果写入到外部存储系统。

2.34 Spark Streaming小文件问题

使用 Spark Streaming 时,如果实时计算结果要写入到 HDFS,那么不可避免的会遇到一个问题,那就是在默认情况下会产生非常多的小文件,这是由 Spark Streaming 的微批处理模式和 DStream(RDD) 的分布式(partition)特性导致的,Spark Streaming 为每个 Partition 启动一个独立的线程(一个 task/partition 一个线程)来处理数据,一旦文件输出到 HDFS,那么这个文件流就关闭了,再来一个 batch 的 parttition 任务,就再使用一个新的文件流,那么假设,一个 batch 为10s,每个输出的 DStream 有32(最大???)个 partition,那么一个小时产生的文件数将会达到(3600/10)*32=11520个之多。众多小文件带来的结果是有大量的文件元信息,比如文件的 location、文件大小、block number 等需要 NameNode 来维护,NameNode 会因此鸭梨山大。不管是什么格式的文件,parquet、text、JSON 或者 Avro,都会遇到这种小文件问题,这里讨论几种处理 Spark Streaming 小文件的典型方法。

  1. 增加 batch 大小: 这种方法很容易理解,batch 越大,从外部接收的 event 就越多,内存积累的数据也就越多,那么输出的文件数也就会变少,比如上边的时间从10s增加为100s,那么一个小时的文件数量就会减少到1152个。但别高兴太早,实时业务能等那么久吗,本来人家10s看到结果更新一次,现在要等快两分钟,是人都会骂娘。所以这种方法适用的场景是消息实时到达,但不想挤压在一起处理,因为挤压在一起处理的话,批处理任务在干等,这时就可以采用这种方法。

  2. Coalesce大法好: 文章开头讲了,小文件的基数是 batch_number * partition_number,而第一种方法是减少 batch_number,那么这种方法就是减少 partition_number 了,这个 api 不细说,就是减少初始的分区个数。看过 spark 源码的童鞋都知道,对于窄依赖,一个子 RDD 的 partition 规则继承父 RDD,对于宽依赖(就是那些个叉叉叉ByKey操作),如果没有特殊指定分区个数,也继承自父 rdd。那么初始的 SourceDstream 是几个 partiion,最终的输出就是几个 partition。所以 Coalesce 大法的好处就是,可以在最终要输出的时候,来减少一把 partition 个数。但是这个方法的缺点也很明显,本来是32个线程在写256M数据,现在可能变成了4个线程在写256M数据,而没有写完成这256M数据,这个 batch 是不算结束的。那么一个 batch 的处理时延必定增长,batch 挤压会逐渐增大。

  3. Spark Streaming 外部来处理: 我们既然把数据输出到 hdfs,那么说明肯定是要用 Hive 或者 Spark Sql 这样的“sql on hadoop”系统类进一步进行数据分析,而这些表一般都是按照半小时或者一小时、一天,这样来分区的(注意不要和 Spark Streaming 的分区混淆,这里的分区,是用来做分区裁剪优化的),那么我们可以考虑在 Spark Streaming 外再启动定时的批处理任务来合并 Spark Streaming 产生的小文件。这种方法不是很直接,但是却比较有用,“性价比”较高,唯一要注意的是,批处理的合并任务在时间切割上要把握好,搞不好就可能会去合并一个还在写入的 Spark Streaming 小文件。

自己调用 foreach 去 append: Spark Streaming 提供的 foreach 这个 outout 类 api (一种 Action 操作),可以让我们自定义输出计算结果的方法。那么我们其实也可以利用这个特性,那就是每个 batch 在要写文件时,并不是去生成一个新的文件流,而是把之前的文件打开。考虑这种方法的可行性,首先,HDFS 上的文件不支持修改,但是很多都支持追加,那么每个 batch 的每个 partition 就对应一个输出文件,每次都去追加这个 partition 对应的输出文件,这样也可以实现减少文件数量的目的。这种方法要注意的就是不能无限制的追加,当判断一个文件已经达到某一个阈值时,就要产生一个新的文件进行追加了。所以大概就是一直32个文件咯。

2.35 说说Spark的预写日志功能

也叫 WriteAheadLogs,通常被用于数据库和文件系统中,保证数据操作的持久性。预写日志通常是先将操作写入到一个持久可靠的日志文件中,然后才对数据施加该操作,当加入施加该操作中出现异常,可以通过读取日志文件并重新施加该操作,从而恢复系统。

当 WAL 开启后,所有收到的数据同时保存到了容错文件系统的日志文件中,当 Spark Streaming 失败,这些接受到的数据也不会丢失。另外,接收数据的正确性只在数据被预写到日志以后接收器才会确认。已经缓存但还没有保存的数据可以在 Driver 重新启动之后由数据源再发送一次(经常问)。

这两个机制保证了数据的零丢失,即所有的数据要么从日志中恢复,要么由数据源重发。

2.36 Mesos粗粒度和细粒度对比

  1. 粗粒度运行模式: Spark 应用程序在注册到 Mesos 时会分配对应系统资源,在执行过程中由 SparkContext 和 Executor 直接交互,该模式优点是由于资源长期持有减少了资源调度的时间开销,缺点是该模式下 Mesos 无法感知资源使用的变化,容易造成系统资源的闲置,无法被 Mesos 其他框架使用,造成资源浪费。

  2. 细粒度的运行模式: Spark 应用程序是以单个任务的粒度发送到 Mesos 中执行,在执行过程中 SparkContext 并不能和 Executor 直接交互,而是由 Mesos Master 进行统一的调度管理,这样能够根据整个 Mesos 集群资源使用的情况动态调整。该模式的优点是系统资源能够得到充分利用,缺点是该模式中每个人物都需要从 Mesos 获取资源,调度延迟较大,对于 Mesos Master 开销较大。

2.37 Spark经常说的Repartition是个什么玩意

coalesce函数:
返回一个经过简化到numPartitions个分区的新RDD。这会导致一个窄依赖,例如:你将1000个分区转换成100个分区,这个过程不会发生shuffle,相反如果10个分区转换成100个分区将会发生shuffle。然而如果你想大幅度合并分区,例如合并成一个分区,这会导致你的计算在少数几个集群节点上计算(言外之意:并行度不够)。为了避免这种情况,你可以将第二个shuffle参数传递一个true,这样会在重新分区过程中多一步shuffle,这意味着上游的分区可以并行运行。
第二个参数shuffle=true,将会产生多于之前的分区数目,例如你有一个个数较少的分区,假如是100,调用coalesce(1000, shuffle = true)将会使用一个 HashPartitioner产生1000个分区分布在集群节点上。这个(对于提高并行度)是非常有用的。

repartition函数:

返回一个恰好有numPartitions个分区的RDD,可以增加或者减少此RDD的并行度。内部,这将使用shuffle重新分布数据,如果你减少分区数,考虑使用coalesce,这样可以避免执行shuffle

作用:
1.避免小文件
2.减少 Task 个数
3.但是会增加每个 Task 处理的数据量

2.38 了解Spark的架构吗

从集群部署的角度来看,Spark 集群由集群管理器 Cluster Manager、工作节点 Worker、执行器 Executor、驱动器 Driver、应用程序 Application 等部分组成。

  1. Cluster Manager: 主要负责对集群资源的分配和管理,Cluster Manager 在 YARN 部署模式下为 RM,在 Mesos 下为 Mesos Master,Standalone 模式下为 Master。CM 分配的资源属于一级分配,它将各个 Worker 上的内存、CPU 等资源分配给 Application,但是不负责对 Executor 的资源分类。Standalone 模式下的 Master 会直接给 Application 分配内存、CPU 及 Executor 等资源。

  2. Worker: Spark 的工作节点。在 YARN 部署模式下实际由 NodeManager 替代。Worker 节点主要负责,把自己的内存、CPU 等资源通过注册机制告知 CM,创建 Executor,把资源和任务进一步分配给 Executor,同步资源信息,Executor 状态信息给 CM 等等。Standalone 部署模式下,Master 将 Worker 上的内存、CPU 以及 Executor 等资源分配给 Application 后,将命令 Worker 启动 CoarseGrainedExecutorBackend 进程(此进程会创建 Executor 实例)。

  3. Executor: 执行计算任务的一线组件,主要负责任务的执行及与 Worker Driver 信息同步。

  4. Driver: Application 的驱动程序,Application 通过 Driver 与 CM、Executor 进行通信。Driver 可以运行在 Application 中,也可以由 Application 提交给 CM 并由 CM 安排 Worker 运行。

Application: 用户使用 Spark 提供的 API 编写的应用程序,Application 通过 Spark API 将进行 RDD 的转换和 DAG 的创建,并通过 Driver 将 Application 注册到 CM,CM 将会根据 Application 的资源需求,通过一级资源分配将 Excutor、内存、CPU 等资源分配给 Application。Drvier 通过二级资源分配将 Executor 等资源分配给每一个任务,Application 最后通过 Driver 告诉 Executor 运行任务。

2.39了解StreamingContext启动时序图吗

  1. 初始化 StreamingContext 中的 DStreamGraph 和 JobScheduler,进而启动 JobScheduler 的 ReceiveTracker 和 JobGenerator。

  2. 初始化阶段会进行成员变量的初始化,重要的包括 DStreamGraph(包含 DStream 之间相互依赖的有向无环图),JobScheduler(定时查看 DStreamGraph,然后根据流入的数据生成运行作业),StreamingTab(在 Spark Streaming 运行的时候对流数据处理的监控)。

  3. 然后就是创建 InputDStream,接着就是对 InputDStream 进行 flatMap, map, reduceByKey, print 等操作,类似于 RDD 的转换操作。

  4. 启动 JobScheduler,实例化并启动 ReceiveTracker 和 JobGenerator。

  5. 启动 JobGenerator

  6. 启动 ReceiverTracker

2.40 再谈Spark Streaming的容错性

实时流处理系统需要长时间接收并处理数据,这个过程中出现异常是难以避免的,需要流程系统具备高容错性。Spark Streaming 一开始就考虑了两个方面。

  1. 利用 Spark 自身的容错设计、存储级别和 RDD 抽象设计能够处理集群中任何 Worker 节点的故障

  2. Spark 运行多种运行模式,其 Driver 端可能运行在 Master 节点或者集群中的任意节点,这样让 Driver 端具备容错能力是很大的挑战,但是由于其接收的数据是按照批进行存储和处理,这些批次数据的元数据可以通过执行检查点的方式定期写入到可靠的存储中,在 Driver 端重新启动中恢复这些状态

当接收到的数据缓存在 Executor 内存中的丢失风险要怎么处理呢?

如果是独立运行模式/Yarn/Mesos 模式,当 Driver 端失败的时候,该 Driver 端所管理的 Executor 以及内存中数据将终止,即时 Driver 端重新启动这些缓存的数据也不能被恢复。为了避免这种数据损失,就需要预写日志功能了。

当 Spark Streaming 应用开始的时候,也就是 Driver 开始的时候,接收器成为长驻运行任务,这些接收器接收并保存流数据到 Spark 内存以供处理。

  1. 接收器将数据分成一系列小块,存储到 Executor 内存或磁盘中,如果启动预写日志,数据同时还写入到容错文件系统的预写日志文件。

  2. 通知 StreamingContext,接收块中的元数据被发送到 Driver 的 StreamingContext,这个元数据包括两种,一是定位其 Executor 内存或磁盘中数据位置的块编号,二是块数据在日志中的偏移信息(如果启用 WAL 的话)。

2.41 谈谈Spark Streaming Driver端重启会发生什么

  1. 恢复计算: 使用检查点信息重启 Driver 端,重构上下文并重启接收器

  2. 恢复元数据块: 为了保证能够继续下去所必备的全部元数据块都被恢复

  3. 未完成作业的重新形成: 由于失败而没有处理完成的批处理,将使用恢复的元数据再次产生 RDD 和对应的作业

  4. 读取保存在日志中的块数据: 在这些作业执行的时候,块数据直接从预写日志中读出,这将恢复在日志中可靠地保存所有必要的数据

  5. 重发尚未确认的数据: 失败时没有保存到日志中的缓存数据将由数据源再次发送

2.42 说说RDDDataFrameDataSet的关系

这里主要对比 Dataset 和 DataFrame,因为 Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。

DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。

而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息。

2.43 Spark 的运行流程?

具体运行流程如下:
SparkContext 向资源管理器注册并向资源管理器申请运行 Executor
资源管理器分配 Executor,然后资源管理器启动 Executor
Executor 发送心跳至资源管理器
SparkContext 构建 DAG 有向无环图
将 DAG 分解成 Stage(TaskSet)
把 Stage 发送给 TaskScheduler
Executor 向 SparkContext 申请 Task
TaskScheduler 将 Task 发送给 Executor 运行
同时 SparkContext 将应用程序代码发放给 Executor
Task 在 Executor 上运行,运行完毕释放所有资源

2.44 RDDreduceBykey groupByKey 哪个性能好,为什么?

reduceByKey:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在本地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于,在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保证 reduce 端能够更快的进行结果计算。

groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列(Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。

所以在进行大量数据的 reduce 操作时候建议使用 reduceByKey。不仅可以提高速度,还可以防止使用 groupByKey 造成的内存溢出问题。

2.45 介绍一下 cogroup rdd 实现原理,你在什么场景下用过这个 rdd

cogroup:对多个(2~4)RDD 中的 KV 元素,每个 RDD 中相同 key 中的元素分别聚合成一个集合。

与 reduceByKey 不同的是:reduceByKey 针对一个 RDD中相同的 key 进行合并。而 cogroup 针对多个 RDD中相同的 key 的元素进行合并。

cogroup 的函数实现:这个实现根据要进行合并的两个 RDD 操作,生成一个 CoGroupedRDD 的实例,这个 RDD 的返回结果是把相同的 key 中两个 RDD 分别进行合并操作,最后返回的 RDD 的 value 是一个 Pair 的实例,这个实例包含两个 Iterable 的值,第一个值表示的是 RDD1 中相同 KEY 的值,第二个值表示的是 RDD2 中相同 key 的值。

由于做 cogroup 的操作,需要通过 partitioner 进行重新分区的操作,因此,执行这个流程时,需要执行一次 shuffle 的操作(如果要进行合并的两个 RDD 的都已经是 shuffle 后的 rdd,同时他们对应的 partitioner 相同时,就不需要执行 shuffle)。

场景:表关联查询或者处理重复的 key。

2.46 DAG划分为Stage 的算法了解吗?

核心算法:回溯算法
从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。
Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。
然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止。

2.47 对于Spark 中的数据倾斜问题你有什么好的方案?

前提是定位数据倾斜,是 OOM 了,还是任务执行缓慢,看日志,看 WebUI
解决方法,有多个方面:
避免不必要的 shuffle,如使用广播小表的方式,将 reduce-side-join 提升为 map-side-join
分拆发生数据倾斜的记录,分成几个部分进行,然后合并 join 后的结果
改变并行度,可能并行度太少了,导致个别 task 数据压力大
两阶段聚合,先局部聚合,再全局聚合
自定义 paritioner,分散 key 的分布,使其更加均匀

2.48 Spark 中的 OOM 问题?

map 类型的算子执行中内存溢出如 flatMap,mapPatitions
原因:map 端过程产生大量对象导致内存溢出:这种溢出的原因是在单个 map 中产生了大量的对象导致的针对这种问题。

解决方案:

增加堆内内存。

在不增加内存的情况下,可以减少每个 Task 处理数据量,使每个 Task 产生大量的对象时,Executor 的内存也能够装得下。具体做法可以在会产生大量对象的 map 操作之前调用 repartition 方法,分区成更小的块传入 map。

shuffle 后内存溢出如 join,reduceByKey,repartition。

shuffle 内存溢出的情况可以说都是 shuffle 后,单个文件过大导致的。在 shuffle 的使用,需要传入一个 partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是 HashPatitioner,默认值是父 RDD 中最大的分区数.这个参数 spark.default.parallelism 只对 HashPartitioner 有效.如果是别的 partitioner 导致的 shuffle 内存溢出就需要重写 partitioner 代码了.

driver 内存溢出

用户在 Dirver 端口生成大对象,比如创建了一个大的集合数据结构。解决方案:将大对象转换成 Executor 端加载,比如调用 sc.textfile 或者评估大对象占用的内存,增加 dirver 端的内存

从 Executor 端收集数据(collect)回 Dirver 端,建议将 driver 端对 collect 回来的数据所作的操作,转换成 executor 端 rdd 操作。

2.49 Spark 中数据的位置是被谁管理的?

每个数据分片都对应具体物理位置,数据的位置是被blockManager管理,无论数据是在磁盘,内存还是 tacyan,都是由 blockManager 管理。

2.50 Spak 程序执行,有时候默认为什么会产生很多 task,怎么修改默认 task 执行个数?

输入数据有很多 task,尤其是有很多小文件的时候,有多少个输入
block 就会有多少个 task 启动;

spark 中有 partition 的概念,每个 partition 都会对应一个 task,task 越多,在处理大规模数据的时候,就会越有效率。不过 task 并不是越多越好,如果平时测试,或者数据量没有那么大,则没有必要 task 数量太多。

参数可以通过 spark_home/conf/spark-default.conf 配置文件设置:
针对 spark sql 的 task 数量:spark.sql.shuffle.partitions=50
非 spark sql 程序设置生效:spark.default.parallelism=10

2.51 介绍一下 join 操作优化经验?

这道题常考,这里只是给大家一个思路,简单说下!面试之前还需做更多准备。
join 其实常见的就分为两类: map-side join 和 reduce-side join。
当大表和小表 join 时,用 map-side join 能显著提高效率。

将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘 IO 消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。

如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。

在大数据量的情况下,join 是一中非常昂贵的操作,需要在 join 之前应尽可能的先缩小数据量。

对于缩小数据量,有以下几条建议:

若两个 RDD 都有重复的 key,join 操作会使得数据量会急剧的扩大。所有,最好先使用 distinct 或者 combineByKey 操作来减少 key 空间或者用 cogroup 来处理重复的 key,而不是产生所有的交叉结果。在 combine 时,进行机智的分区,可以避免第二次 shuffle。

如果只在一个 RDD 出现,那你将在无意中丢失你的数据。所以使用外连接会更加安全,这样你就能确保左边的 RDD 或者右边的 RDD 的数据完整性,在 join 之后再过滤数据。

如果我们容易得到 RDD 的可以的有用的子集合,那么我们可以先用 filter 或者 reduce,如何在再用 join。

2.52 SparkMapReduceShuffle 的区别?

相同点:都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)

不同点:

MapReduce 默认是排序的,spark 默认不排序,除非使用 sortByKey 算子。

MapReduce 可以划分成 split,map()、spill、merge、shuffle、sort、reduce()等阶段,spark 没有明显的阶段划分,只有不同的 stage 和算子操作。

MR 落盘,Spark 不落盘,spark 可以解决 mr 落盘导致效率低下的问题。

2.53 Spark SQL 执行的流程?

这个问题如果深挖还挺复杂的,这里简单介绍下总体流程:

parser:基于 antlr 框架对 sql 解析,生成抽象语法树。

变量替换:通过正则表达式找出符合规则的字符串,替换成系统缓存环境的变量

SQLConf 中的spark.sql.variable.substitute,默认是可用的;参考SparkSqlParser

parser:将 antlr 的 tree 转成 spark catalyst 的 LogicPlan,也就是 未解析的逻辑计划;详细参考AstBuild, ParseDriver

analyzer:通过分析器,结合 catalog,把 logical plan 和实际的数据绑定起来,将 未解析的逻辑计划 生成 逻辑计划;详细参考QureyExecution

缓存替换:通过 CacheManager,替换有相同结果的 logical plan(逻辑计划)

logical plan 优化,基于规则的优化;优化规则参考 Optimizer,优化执行器 RuleExecutor

生成 spark plan,也就是物理计划;参考QueryPlanner和SparkStrategies

spark plan 准备阶段

构造 RDD 执行,涉及 spark 的 wholeStageCodegenExec 机制,基于 janino 框架生成 java 代码并编译

2.54 Spark SQL 是如何将数据写到 Hive 表的?

方式一:是利用 Spark RDD 的 API 将数据写入 hdfs 形成 hdfs 文件,之后再将 hdfs 文件和 hive 表做加载映射。

方式二:利用 Spark SQL 将获取的数据 RDD 转换成 DataFrame,再将 DataFrame 写成缓存表,最后利用 Spark SQL 直接插入 hive 表中。而对于利用 Spark SQL 写 hive 表官方有两种常见的 API,第一种是利用 JavaBean 做映射,第二种是利用 StructType 创建 Schema 做映射。

2.55 通常来说,SparkMapReduce 相比,Spark 运行效率更高。请说明效率更高来源于 Spark 内置的哪些机制?

基于内存计算,减少低效的磁盘交互;
高效的调度算法,基于 DAG;
容错机制 Linage。
重点部分就是 DAG 和 Lingae

2.56 Hadoop Spark 的相同点和不同点?

Hadoop 底层使用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能力比较欠缺,而且在 MR 过程中会重复的读写 hdfs,造成大量的磁盘 io 读写操作,所以适合高时延环境下批处理计算的应用;

Spark 是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,数据分析更加快速,所以适合低时延环境下计算的应用;

spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。

但是 spark 也有劣势,由于 spark 基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的情况下,可能会出现各种各样的问题,比如 OOM 内存溢出等情况,导致 spark 程序可能无法运行起来,而 mapreduce 虽然运行缓慢,但是至少可以慢慢运行完。

2.57 Spark 如何保证宕机迅速恢复**?**

适当增加 spark standby master
编写 shell 脚本,定期检测 master 状态,出现宕机后对 master 进行重启操作

2.58 RDD 持久化原理?

spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。

调用 cache()和 persist()方法即可。cache()和 persist()的区别在于,cache()是 persist()的一种简化方式,cache()的底层就是调用 persist()的无参版本 persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,可以使用 unpersist()方法。RDD 持久化是可以手动选择不同的策略的。在调用 persist()时传入对应的 StorageLevel 即可。

2.59 Checkpoint 检查点机制?

应用场景:当 spark 应用程序特别复杂,从初始的 RDD 开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用 checkpoint 功能。

原因:对于特别复杂的 Spark 应用,会出现某个反复使用的 RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说 HDFS;然后对 RDD 调用 checkpoint()方法。之后在 RDD 所处的 job 运行结束之后,会启动一个单独的 job,来将 checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在 spark streaming 中用来保障容错性的主要机制,它可以使 spark streaming 阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

控制发生失败时需要重算的状态数。Spark streaming 可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。

提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样 spark streaming 就可以读取之前运行的程序处理数据的进度,并从那里继续。

2.60 Checkpoint 和持久化机制的区别?

最主要的区别在于持久化只是将数据保存在 BlockManager 中,但是 RDD 的 lineage(血缘关系,依赖关系)是不变的。但是 checkpoint 执行完之后,rdd 已经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是 checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低

2.61 Spark Streaming 以及基本工作原理?

Spark streaming 是 spark core API 的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。

它支持从多种数据源读取数据,比如 Kafka、Flume、Twitter 和 TCP Socket,并且能够使用算子比如 map、reduce、join 和 window 等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。

Spark streaming 内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成 batch,比如每收集一秒的数据封装成一个 batch,然后将每个 batch 交给 spark 的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的 batch 组成的。

2.62 DStream 以及基本工作原理?

DStream 是 spark streaming 提供的一种高级抽象,代表了一个持续不断的数据流。

DStream 可以通过输入数据源来创建,比如 Kafka、flume 等,也可以通过其他 DStream 的高阶函数来创建,比如 map、reduce、join 和 window 等。

DStream 内部其实不断产生 RDD,每个 RDD 包含了一个时间段的数据。

Spark streaming 一定是有一个输入的 DStream 接收数据,按照时间划分成一个一个的 batch,并转化为一个 RDD,RDD 的数据是分散在各个子节点的 partition 中。

2.63 Spark Streaming 整合 Kafka 的两种模式?

receiver 方式:将数据拉取到 executor 中做操作,若数据量大,内存存储不下,可以通过 WAL,设置了本地存储,保证数据不丢失,然后使用 Kafka 高级 API 通过 zk 来维护偏移量,保证消费数据。receiver 消费的数据偏移量是在 zk 获取的,此方式效率低,容易出现数据丢失。

receiver 方式的容错性:在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用 Spark Streaming 的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的 Kafka 数据写入分布式文件系统(比如 HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

Kafka 中的 topic 的 partition,与 Spark 中的 RDD 的 partition 是没有关系的。在 1、KafkaUtils.createStream()中,提高 partition 的数量,只会增加 Receiver 方式中读取 partition 的线程的数量。不会增加 Spark 处理数据的并行度。 可以创建多个 Kafka 输入 DStream,使用不同的 consumer group 和 topic,来通过多个 receiver 并行接收数据。

基于 Direct 方式:使用 Kafka 底层 Api,其消费者直接连接 kafka 的分区上,因为 createDirectStream 创建的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的分区与 kafka 分区一一对应,但是需要自己维护偏移量,即用即取,不会给内存造成太大的压力,效率高。

优点:简化并行读取:如果要读取多个 partition,不需要创建多个输入 DStream 然后对它们进行 union 操作。Spark 会创建跟 Kafka partition 一样多的 RDD partition,并且会并行从 Kafka 中读取数据。所以在 Kafka partition 和 RDD partition 之间,有一个一对一的映射关系。

高性能:如果要保证零数据丢失,在基于 receiver 的方式中,需要开启 WAL 机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka 自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到 WAL 中。而基于 direct 的方式,不依赖 Receiver,不需要开启 WAL 机制,只要 Kafka 中作了数据的复制,那么就可以通过 Kafka 的副本进行恢复。

receiver 与和 direct 的比较:

基于 receiver 的方式,是使用 Kafka 的高阶 API 来在 ZooKeeper 中保存消费过的 offset 的。这是消费 Kafka 数据的传统方式。这种方式配合着 WAL 机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为 Spark 和 ZooKeeper 之间可能是不同步的。

基于 direct 的方式,使用 Kafka 的低阶 API,Spark Streaming 自己就负责追踪消费的 offset,并保存在 checkpoint 中。Spark 自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

Receiver 方式是通过 zookeeper 来连接 kafka 队列,Direct 方式是直接连接到 kafka 的节点上获取数据。

2.64 Spark 主备切换机制原理知道吗?

Master 实际上可以配置两个,Spark 原生的 standalone 模式是支持 Master 主备切换的。当 Active Master 节点挂掉以后,我们可以将 Standby Master 切换为 Active Master。

Spark Master 主备切换可以基于两种机制,一种是基于文件系统的,一种是基于 ZooKeeper 的。

基于文件系统的主备切换机制,需要在 Active Master 挂掉之后手动切换到 Standby Master 上;

而基于 Zookeeper 的主备切换机制,可以实现自动切换 Master。

2.65 Spark 解决了 Hadoop 的哪些问题?

MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;

Spark:Spark 采用 RDD 计算模型,简单容易上手。

MR:只提供 map 和 reduce 两个操作,表达能力欠缺;

Spark:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;

MR:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,这些 job 之间的管理以来需要开发者自己进行管理;

Spark:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行;

MR:中间结果存放在 hdfs 中;

Spark:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs;

MR:只有等到所有的 map task 执行完毕后才能执行 reduce task;

Spark:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。

MR:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;

Spark:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。

2.66 数据倾斜的产生和解决办法?

数据倾斜以为着某一个或者某几个 partition 的数据特别大,导致这几个 partition 上的计算需要耗费相当长的时间。

在 spark 中同一个应用程序划分成多个 stage,这些 stage 之间是串行执行的,而一个 stage 里面的多个 task 是可以并行执行,task 数目由 partition 数目决定,如果一个 partition 的数目特别大,那么导致这个 task 执行时间很长,导致接下来的 stage 无法执行,从而导致整个 job 执行变慢。

避免数据倾斜,一般是要选用合适的 key,或者自己定义相关的 partitioner,通过加盐或者哈希值来拆分这些 key,从而将这些数据分散到不同的 partition 去执行。

如下算子会导致 shuffle 操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;

2.67 你用 Spark Sql 处理的时候, 处理过程中用的 DataFrame 还是直接写的 Sql?为什么?

这个问题的宗旨是问你 spark sql 中 dataframe 和 sql 的区别,从执行原理、操作方便程度和自定义程度来分析

这个问题。

2.68 Spark Master HA 主从切换过程不会影响到集群已有作业的运行,为什么?

不会的。

因为程序在运行之前,已经申请过资源了,driver 和 Executors 通讯,不需要和 master 进行通讯的。

2.69 .Spark Master 使用 Zookeeper 进行 HA**,有哪些源数据保存到** Zookeeper 里面?

spark 通过这个参数 spark.deploy.zookeeper.dir 指定 master 元数据在 zookeeper 中保存的位置,包括 Worker,Driver 和 Application 以及 Executors。standby 节点要从 zk 中,获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的。

注:Master 切换需要注意 2 点:
1、在 Master 切换的过程中,所有的已经在运行的程序皆正常运行!
因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源,所以在运行时 Job 本身的
调度和处理和 Master 是没有任何关系。

2、在 Master 的切换过程中唯一的影响是不能提交新的 Job:一方面不能够提交新的应用程序给集群,
因为只有 Active Master 才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因
Action 操作触发新的 Job 的提交请求。

2.70 如何实现Spark Streaming读取Flume中的数据?

可以这样说:
前期经过技术调研,查看官网相关资料,发现sparkStreaming整合flume有2种模式,一种是拉模式,一种是推模式,然后在简单的聊聊这2种模式的特点,以及如何部署实现,需要做哪些事情,最后对比两种模式的特点,选择那种模式更好。

推模式:Flume将数据Push推给Spark Streaming
拉模式:Spark Streaming从flume 中Poll拉取数据

2.71 在实际开发的时候是如何保证数据不丢失的?

可以这样说:

flume那边采用的channel是将数据落地到磁盘中,保证数据源端安全性(可以在补充一下,flume在这里的channel可以设置为memory内存中,提高数据接收处理的效率,但是由于数据在内存中,安全机制保证不了,故选择channel为磁盘存储。整个流程运行有一点的延迟性)

sparkStreaming通过拉模式整合的时候,使用了FlumeUtils这样一个类,该类是需要依赖一个额外的jar包(spark-streaming-flume_2.10)

要想保证数据不丢失,数据的准确性,可以在构建StreamingConext的时候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)来创建一个StreamingContext,使用StreamingContext.getOrCreate来创建StreamingContext对象,传入的第一个参数是checkpoint的存放目录,第二参数是生成StreamingContext对象的用户自定义函数。如果checkpoint的存放目录存在,则从这个目录中生成StreamingContext对象;如果不存在,才会调用第二个函数来生成新的StreamingContext对象。在creatingFunc函数中,除了生成一个新的StreamingContext操作,还需要完成各种操作,然后调用ssc.checkpoint(checkpointDirectory)来初始化checkpoint功能,最后再返回StreamingContext对象。

这样,在StreamingContext.getOrCreate之后,就可以直接调用start()函数来启动(或者是从中断点继续运行)流式应用了。如果有其他在启动或继续运行都要做的工作,可以在start()调用前执行。

2.72 RDD有哪些缺陷?

不支持细粒度的写和更新操作,Spark写数据是粗粒度的,所谓粗粒度,就是批量写入数据,目的是为了提高效率。但是Spark读数据是细粒度的,也就是说可以一条条的读。

不支持增量迭代计算,如果对Flink熟悉,可以说下Flink支持增量迭代计算。

2.73 讲解spark shuffle原理和特性?shuffle write和shuffle read过程做些什么?

https://blog.csdn.net/zhanglh046/article/details/78360762

2.74 Shuffle数据块有多少种不同的存储方式?分别是什么

  1. RDD数据块:用来存储所缓存的RDD数据。
  2. Shuffle数据块:用来存储持久化的Shuffle数据。
  3. 广播变量数据块:用来存储所存储的广播变量数据。
  4. 任务返回结果数据块:用来存储在存储管理模块内部的任务返回结果。通常情况下任务返回结果随任务一起通过Akka返回到Driver端。但是当任务返回结果很大时,会引起Akka帧溢出,这时的另一种方案是将返回结果以块的形式放入存储管理模块,然后在Driver端获取该数据块即可,因为存储管理模块内部数据块的传输是通过Socket连接的,因此就不会出现Akka帧溢出了。
  5. 流式数据块:只用在Spark Streaming中,用来存储所接收到的流式数据块

2.75 哪些spark算子会有shuffle?

  1. 去重,distinct
  2. 排序,groupByKey,reduceByKey等
  3. 重分区,repartition,coalesce
  4. 集合或者表操作,interection,join

https://kuncle.github.io/spark/2017/03/13/Spark%E7%9A%84shuffle%E7%AE%97%E5%AD%90.html

2.76 讲解spark schedule(任务调度)?

在这里插入图片描述
https://www.cnblogs.com/missmzt/p/6734078.html

2.77 Spark stage是如何划分的?

  1. 从hdfs中读取文件后,创建 RDD 对象
  2. DAGScheduler模块介入运算,计算RDD之间的依赖关系。RDD之间的依赖关系就形成了DAG
  3. 每一个JOB被分为多个Stage,划分Stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个Stage,避免多个Stage之间的消息传递开销。

因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。

2.78 Spark cache一定能提升计算性能么?说明原因?

不一定啊,cache是将数据缓存到内存里,当小数据量的时候是能提升效率,但数据大的时候内存放不下就会报溢出。

2.79 RDD是弹性数据集,“弹性”体现在哪里呢?你觉得RDD有哪些缺陷?

  1. 自动进行内存和磁盘切换
  2. 基于lineage的高效容错
  3. task如果失败会特定次数的重试
  4. stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
  5. checkpoint【每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中】和persist 【内存或磁盘中对数据进行复用】(检查点、持久化)
  6. 数据调度弹性:DAG TASK 和资源管理无关
  7. 数据分片的高度弹性repartion

缺陷:
惰性计算的缺陷也是明显的:中间数据默认不会保存,每次动作操作都会对数据重复计算,某些计算量比较大的操作可能会影响到系统的运算效率

2.80 RDD有多少种持久化方式?memory_only如果内存存储不了,会怎么操作?

cache和persist
memory_and_disk,放一部分到磁盘
MEMORY_ONLY_SER:同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。
MEMORY_AND_DSK_SER:同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象。
DISK_ONLY:使用非序列化Java对象的方式持久化,完全存储到磁盘上。
MEMORY_ONLY_2或者MEMORY_AND_DISK_2等:如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。

2.81 RDD分区和数据块有啥联系?

2.82 当GC时间占比很大可能的原因有哪些?对应的优化方法是?

垃圾回收的开销和对象合数成正比,所以减少对象的个数,就能大大减少垃圾回收的开销。序列化存储数据,每个RDD就是一个对象。缓存RDD占用的内存可能跟工作所需的内存打架,需要控制好

2.83 Spark中repartition和coalesce异同?coalesce什么时候效果更高,为什么

repartition(numPartitions:Int):RDD[T]
coalesce(numPartitions:Int, shuffle:Boolean=false):RDD[T]

以上为他们的定义,区别就是repartition一定会触发shuffle,而coalesce默认是不触发shuffle的。

他们两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)

减少分区提高效率

2.84 Groupbykey和reducebykey哪个性能更高,为什么?

reduceByKey性能高,更适合大数据集
https://www.jianshu.com/p/0c6705724cff

2.85 你是如何理解caseclass的?

https://blog.csdn.net/hellojoy/article/details/81034528

2.86 Scala里trait有什么功能,与class有何异同?什么时候用trait什么时候该用class

它可以被继承,而且支持多重继承,其实它更像我们熟悉的接口(interface),但它与接口又有不同之处是:
trait中可以写方法的实现,interface不可以(java8开始支持接口中允许写方法实现代码了),这样看起来trait又很像抽象类

2.87 Scala语法中to和until有啥区别

to 包含上界,until不包含上界

2.88 讲解Scala伴生对象和伴生类

单例对象与类同名时,这个单例对象被称为这个类的伴生对象,而这个类被称为这个单例对象的伴生类。伴生类和伴生对象要在同一个源文件中定义,伴生对象和伴生类可以互相访问其私有成员。不与伴生类同名的单例对象称为孤立对象。

import scala.collection.mutable.Map
class ChecksumAccumulator {
  private var sum = 0
  def add(b: Byte) {
    sum += b
  }
  def checksum(): Int = ~(sum & 0xFF) + 1
}

object ChecksumAccumulator {
  private val cache = Map[String, Int]()
  def calculate(s: String): Int =
    if (cache.contains(s))
    cache(s)
  else {
      val acc = new ChecksumAccumulator
      for (c <- s)
        acc.add(c.toByte)
      val cs = acc.checksum()
      cache += (s -> cs)
      println("s:"+s+" cs:"+cs)
      cs
    }

  def main(args: Array[String]) {
    println("Java 1:"+calculate("Java"))
    println("Java 2:"+calculate("Java"))
    println("Scala :"+calculate("Scala"))
  }
}

2.89 Spark相比MapReduce的计算模型有哪些区别?

  1. spark处理数据是基于内存的,而MapReduce是基于磁盘处理数据的。
  2. Spark在处理数据时构建了DAG有向无环图,减少了shuffle和数据落地磁盘的次数
  3. Spark是粗粒度资源申请,而MapReduce是细粒度资源申请

2.90 一个RDD的partition数量是由什么决定的?它和原始输入数据的文件数量是怎么样的对应关系?

https://blog.csdn.net/mzqadl/article/details/104217828

ChatGpt回答

1 spark堆外内存主要是哪些地方在用?

Spark堆外内存主要用于以下方面:

  1. 存储RDD序列化后的数据;
  2. 存储Shuffle过程中的数据;
  3. 存储Broadcast变量的数据;
  4. 存储Task运行时的数据结构,如Task的输入输出数据、Task的运行状态等。

2 spark中executor间相互fetch通讯是如何通讯的

在Spark中,当一个Executor需要从另一个Executor获取数据时,它可以使用两种不同的通信方式:

  1. 通过网络传输:该方式涉及将数据序列化并通过网络传输到请求方Executor。这种方式可以通过Spark的BlockTransferService实现,BlockTransferService提供了一种可靠的、基于NIO的数据传输机制,可以高效地传输大量数据。
  2. 通过本地磁盘:当数据量较大时,通过网络传输可能会导致性能瓶颈,因此Spark还提供了一种本地磁盘通信的方式。在这种情况下,Executor将数据写入本地磁盘,并将磁盘路径发送给请求方Executor。请求方Executor然后可以读取该路径中的数据。这种方式可以通过Spark的DiskBlockManager实现,它管理着本地磁盘上的Block数据,并提供了一些API来读写本地磁盘上的Block数据。

3 Spark中Executor是什么意思?作用是干啥的?

在Spark中,Executor是运行在集群节点上的一个进程,它们负责在Spark应用程序中执行任务(Tasks)。每个Executor都有自己的JVM进程,它们在运行时会加载应用程序代码和数据,并执行应用程序中的任务。

Executor的主要作用包括:

  1. 执行Task:Spark应用程序中所有的计算任务都是运行在Executor上的,Executor会根据Task的类型创建对应的计算上下文(如RDD、DataFrame等),然后在上面执行具体的计算。
  2. 管理内存:Executor负责管理内存,包括堆内内存和堆外内存。堆内内存主要用于存储Executor运行时的对象,而堆外内存主要用于存储RDD序列化后的数据、Shuffle数据、Broadcast变量等。
  3. 与Driver通信:Executor会与Driver程序进行通信,接收Driver发送的任务请求,并将计算结果返回给Driver。
  4. 处理数据本地化:Executor会根据数据本地性原则,在本地执行尽可能多的任务,从而避免数据的网络传输,提高计算效率。
  5. 处理任务失败和重试:Executor会监控任务的运行状态,如果任务失败,会向Driver报告错误,并进行任务重试或者重新启动Executor进程。

4 spark的stage是什么意思?作用是什么?

在Spark中,一个Stage是一个任务的逻辑分组,该任务可以在一个或多个Executor上并行执行。每个Stage都会根据数据依赖关系将任务划分为一组可以并行执行的任务,这些任务被称为Tasks。

Spark中的Stages有两种类型:

  1. ShuffleMapStage:这种类型的Stage是由输入数据的分区数决定的,每个分区都会被映射到一个Task。在这种Stage中,每个Task都会对数据进行处理,并生成一组键值对,这些键值对会被发送到下一个Stage中的Task进行处理。 ShuffleMapStage通常是Spark作业的第一个Stage,它的输出数据会被分区、排序、合并,然后传递给下一个Stage。
  2. ResultStage:这种类型的Stage是最终输出结果的Stage,它不需要进行数据Shuffle,因为所有的数据都已经被处理完成了。在ResultStage中,每个Task会从ShuffleMapStage或者其他ResultStage中获取数据,并将数据进行计算、聚合之后最终输出结果。

Spark中的Stage有以下作用:

  1. 提高并行度:将一个大任务拆分成多个Stage,可以提高任务的并行度,从而加速任务的执行。
  2. 减少数据Shuffle:通过将Map和Reduce操作划分到不同的Stage中,可以减少数据Shuffle的数据量和成本。
  3. 增强容错性:在Spark中,每个Stage都是一个独立的任务单元,因此在任务失败时,可以只重试失败的Stage,而不需要重试整个作业。这种设计提高了Spark的容错性,可以显著减少任务失败的影响。

5 spark的sql以Cluster模式提交,执行过程是什么样子的

当在Spark的SQL应用程序中以Cluster模式提交任务时,整个任务执行过程通常包括以下步骤:

  1. 提交应用程序:首先,用户需要使用Spark提交工具将应用程序提交到集群上运行。
  2. 创建Driver进程:Spark Driver进程是应用程序的主进程,它负责解析SQL语句,生成执行计划,调度任务,并将结果返回给客户端。当应用程序被提交时,Driver进程会在集群中的一个节点上启动。
  3. 解析SQL语句:Driver进程首先会解析SQL语句,并将其转换为Spark的逻辑执行计划。
  4. 生成物理执行计划:Spark会根据逻辑执行计划生成物理执行计划,该计划将SQL操作转换为一系列Spark任务。
  5. 提交任务到集群:Driver进程会将生成的任务提交到集群中的节点上执行。
  6. 执行任务:一旦任务被提交到节点上,Spark会启动Executor进程,并将任务分配给Executor。Executor进程会执行任务并将结果返回给Driver进程。
  7. 返回结果:一旦任务完成,Executor进程会将结果返回给Driver进程。Driver进程将结果汇总并返回给客户端。

总的来说,Spark的SQL执行过程涉及多个组件和步骤,包括Driver进程、Executor进程、任务调度和数据传输等。在Cluster模式下,Spark会自动将任务分配到集群中的多个节点上执行,从而提高任务的并行度和性能。

6 spark sql在k8s上,执行过程是什么样子的

在Kubernetes(K8s)上运行Spark SQL应用程序时,执行过程通常包括以下步骤:

  1. 创建Kubernetes集群:首先,用户需要在Kubernetes上创建一个集群,该集群包括一组工作节点和一个主节点(通常称为Kubernetes Master)。
  2. 准备Spark镜像:为了在Kubernetes上运行Spark应用程序,我们需要使用Docker将Spark打包成一个可运行的镜像,并将其上传到Docker Hub或者私有的Docker Registry中。
  3. 安装Spark Operator:Spark Operator是一个用于在Kubernetes上运行Spark应用程序的工具,它提供了自动化的部署、管理和监控功能。用户需要在Kubernetes上安装Spark Operator并创建一个SparkApplication资源,该资源描述了Spark应用程序的配置和参数。
  4. 提交Spark SQL应用程序:用户可以使用kubectl命令或者Kubernetes Dashboard将Spark SQL应用程序提交到Kubernetes上运行。Spark Operator会自动在集群中分配资源,并启动一组Spark Driver和Executor容器来执行任务。
  5. 执行Spark任务:一旦Spark SQL应用程序被提交到Kubernetes上,Spark Operator会自动启动一组Driver和Executor容器,并将任务分配给它们执行。在执行过程中,Spark会通过网络传输将数据加载到内存中,并在Executor容器中执行SQL操作。最终结果会被汇总到Driver容器中,然后返回给客户端。

总的来说,Spark SQL在Kubernetes上的执行过程与在其他环境中的执行过程类似,但需要通过Spark Operator来管理Spark应用程序的运行和资源分配。Spark Operator可以自动化地管理Spark应用程序的生命周期,并提供可靠的容错和弹性功能。

7 spark 内存分配原理

在Spark中,内存分配主要分为两部分:堆内存和堆外内存。堆内存主要用于存储Spark的元数据和Java对象,而堆外内存则主要用于存储Spark的数据和缓存。

堆内存分配:

Spark将JVM的堆内存分为两部分:JVM堆和Spark堆。JVM堆用于存储Java对象,而Spark堆用于存储Spark的元数据。Spark堆的大小由spark.memory.fraction参数控制,默认为0.6,表示将JVM堆的60%用于Spark堆。

在Spark中,元数据主要包括RDD的依赖关系、分区信息、序列化器等信息。这些元数据需要在任务执行期间一直保留在内存中,以便Spark可以快速地访问和处理数据。由于元数据通常比较小,因此Spark可以使用较小的内存空间来存储它们。

堆外内存分配:

Spark使用堆外内存来存储数据和缓存,以避免JVM堆内存的限制。堆外内存由两个部分组成:存储内存和执行内存。

存储内存用于存储数据,包括缓存的RDD数据和Shuffle数据。存储内存的大小由spark.memory.storageFraction参数控制,默认为0.5,表示将Spark堆的一半用于存储内存。执行内存用于存储任务执行期间的数据和中间结果,包括Shuffle数据的中间结果和任务计算过程中产生的临时变量。执行内存的大小由spark.memory.executionFraction参数控制,默认为0.2,表示将Spark堆的20%用于执行内存。

Spark还支持使用外部存储系统(如HDFS或S3)来存储数据,从而避免内存限制。外部存储系统可以将数据存

8 spark 调优参数讲解

在Spark中,有许多参数可以用来进行调优,以下是一些常用的参数及其作用:

  1. spark.driver.memory:用于设置Driver进程的内存大小,默认为1g。
  2. spark.executor.memory:用于设置Executor进程的内存大小,默认为1g。
  3. spark.executor.instances:用于设置Executor的数量,默认为2。
  4. spark.executor.cores:用于设置每个Executor可用的CPU核数,默认为1。
  5. spark.task.cpus:用于设置每个任务可用的CPU核数,默认为1。
  6. spark.driver.maxResultSize:用于设置Driver进程接收的最大结果集大小,默认为1g。
  7. spark.default.parallelism:用于设置并行度,默认为CPU核数。
  8. spark.shuffle.manager:用于设置Shuffle数据的管理方式,默认为SortShuffleManager。
  9. spark.shuffle.compress:用于设置Shuffle数据的压缩方式,默认为true。
  10. spark.shuffle.spill.compress:用于设置Spill数据的压缩方式,默认为true。
  11. spark.serializer:用于设置序列化器,默认为JavaSerializer。
  12. spark.memory.fraction:用于设置Spark堆内存占JVM堆的比例,默认为0.6。
  13. spark.memory.storageFraction:用于设置存储内存占Spark堆内存的比例,默认为0.5。
  14. spark.memory.executionFraction:用于设置执行内存占Spark堆内存的比例,默认为0.2。
  15. spark.sql.shuffle.partitions:用于设置SQL操作中的Shuffle分区数,默认为200。

以上是一些常用的Spark调优参数及其作用,通过调整这些参数能够提高Spark应用程序的性能和稳定性。需要注意的是,调优不应该只依赖于单个参数,而应该综合考虑多个参数的影响,以达到最佳的性能和稳定性。

9 spark中driver原理

在Spark中,Driver是Spark应用程序的主要进程,负责协调任务的执行,将任务分配给Executor进行计算,并将结果汇总。

Driver的主要功能包括:

  1. 解析Spark应用程序的代码,将其转换为RDD和DAG(有向无环图)。
  2. 将DAG分解为多个阶段(Stage),并将任务分配给Executor进行计算。
  3. 监控任务的执行情况,收集Executor的执行状态和结果,并将结果汇总。
  4. 在任务出错时进行恢复,并在任务完成后释放资源。

在Spark中,Driver运行在应用程序的客户端进程中,通常是一个JVM进程。客户端进程向Spark集群提交应用程序时,会启动Driver进程,该进程会在集群中分配资源,并启动Executor进程来执行任务。

在Spark中,Driver和Executor是相互独立的进程,它们之间通过网络通信来交换任务和数据。Driver会将任务分解为多个阶段,每个阶段都包括若干个任务,每个任务都会分配给一个Executor进行计算。Executor会从Driver获取任务,执行计算,并将结果返回给Driver。

需要注意的是,Driver进程通常需要具备较高的内存和计算能力,因为它需要负责处理整个应用程序的元数据和控制流程。如果Driver进程过于繁忙或过于消耗资源,可能会影响整个应用程序的性能和稳定性。因此,在设计Spark应用程序时,需要合理设置Driver的资源和并发度,以避免出现瓶颈和性能问题。

10 spark中的名词

Spark是一个分布式计算框架,其中有一些常用的名词:

  1. RDD(Resilient Distributed Datasets):弹性分布式数据集,是Spark的核心数据抽象,它是一组被分区的不可变对象的集合,支持并行操作。
  2. DataFrame:类似于关系型数据库的表格,是一种以列为主导的数据结构,支持SQL查询和数据操作。
  3. Dataset:是DataFrame的类型安全版本,支持编译期类型检查和面向对象的API。
  4. Executor:在Spark集群中执行任务的进程,每个Executor会分配一定数量的CPU核和内存资源来执行任务。
  5. Driver:Spark应用程序的主进程,负责协调任务的执行,将任务分配给Executor进行计算,并将结果汇总。
  6. DAG(Directed Acyclic Graph):有向无环图,用于表示Spark应用程序的任务依赖关系。
  7. Stage:Spark应用程序的任务分解后生成的一组阶段,每个阶段包含若干个任务,可以并行执行。
  8. Shuffle:在Spark应用程序中,将数据从一个节点传输到另一个节点的过程,通常是由于任务需要进行数据聚合或排序而引起的。
  9. Partition:将RDD分割成多个逻辑块,每个块都可以在不同的节点上并行处理。

以上是Spark中一些常用的名词,了解它们的含义对于理解Spark的工作机制和编程模型都非常重要。

11 spark堆外内存和堆内内存分配原理

在Spark中,堆内内存指的是Java虚拟机(JVM)的堆内存,而堆外内存指的是JVM之外的内存空间。Spark使用堆外内存来存储一些临时数据,例如排序缓冲区、聚合缓冲区等,这些数据通常比较大,不适合存储在JVM的堆内存中。

Spark中的堆外内存分配和回收是由Netty组件负责的,Netty是一个高性能的网络通信框架,可以支持堆外内存的分配和管理。在Spark中,每个Executor进程启动时,都会预留一定数量的堆外内存空间,用于存储临时数据。当任务需要使用堆外内存时,Spark会将该内存空间分配给任务,任务完成后,Spark会将内存空间释放回Netty的内存池中。

Spark中的堆内内存分配和回收是由JVM负责的,JVM会将堆内存分成多个区域,例如新生代和老年代。当Spark应用程序需要创建对象时,JVM会将对象存储在堆内存中,当对象不再被引用时,JVM会将其标记为可回收的,等待垃圾回收器回收。

需要注意的是,由于堆内内存的分配和回收需要涉及到JVM的垃圾回收机制,因此会产生一定的性能开销和延迟。而堆外内存则可以避免这些问题,因为它是由Netty组件直接管理的,可以更加高效地分配和回收内存空间。在Spark应用程序中,适当使用堆外内存可以提高性能和稳定性,尤其是在处理大规模数据时。

12 Spark中堆外内存来存储哪些临时数据

在Spark中,堆外内存主要用于存储一些临时数据,包括:

  1. 排序缓冲区:在进行排序操作时,需要将数据加载到内存中,并对其进行排序。为了避免将大量数据存储在JVM的堆内存中,Spark会使用堆外内存来存储排序缓冲区。
  2. 聚合缓冲区:在进行聚合操作时,需要将数据加载到内存中,并对其进行聚合计算。为了避免将大量数据存储在JVM的堆内存中,Spark会使用堆外内存来存储聚合缓冲区。
  3. 网络缓冲区:在进行网络通信时,需要将数据从一个节点传输到另一个节点。为了避免将大量数据存储在JVM的堆内存中,Spark会使用堆外内存来存储网络缓冲区。
  4. 内存映射文件:在处理大文件时,可以使用内存映射文件的方式将文件读取到内存中。为了避免将大量数据存储在JVM的堆内存中,Spark会使用堆外内存来存储内存映射文件。
  5. 其他临时数据:例如序列化/反序列化缓冲区、压缩/解压缩缓冲区等。

需要注意的是,堆外内存的分配和回收需要涉及到Netty组件,因此在使用堆外内存时,需要根据具体的场景和需求来配置堆外内存的大小和参数,以便达到最佳的性能和稳定性。

13 spark的排序缓冲区如何优化

在Spark中,排序是一个常见的操作,例如在进行groupByKey、reduceByKey和sortByKey等操作时,都需要进行数据的排序。由于排序操作需要将数据加载到内存中,并对其进行排序,因此在处理大规模数据时,排序的性能和效率是很重要的。

以下是一些优化Spark排序缓冲区的方法:

  1. 使用堆外内存:堆外内存可以避免将大量数据存储在JVM的堆内存中,从而提高排序的性能和效率。
  2. 使用内存序列化:内存序列化可以避免将数据序列化到磁盘上,从而提高排序的性能和效率。
  3. 调整缓冲区大小:Spark的排序操作默认使用的缓冲区大小是64KB,可以根据具体的场景和需求来调整缓冲区大小,以便达到最佳的性能和稳定性。
  4. 使用本地化存储:将数据本地化存储在每个Executor的本地磁盘上,可以减少数据的网络传输,从而提高排序的性能和效率。
  5. 使用基数排序:基数排序是一种高效的排序算法,可以在排序时减少比较次数和交换次数,从而提高排序的性能和效率。
  6. 使用分布式排序:在处理大规模数据时,可以使用分布式排序的方式,将数据分散到多个节点上进行排序,从而提高排序的性能和效率。

需要注意的是,优化Spark排序缓冲区需要根据具体的场景和需求来选择合适的方法和策略,以便达到最佳的性能和稳定性。

14 spark是如何创建Driver进程的

在Spark中,Driver进程是Spark应用程序的主进程,负责协调和管理整个应用程序的执行过程。Spark创建Driver进程的过程如下:

  1. 用户提交Spark应用程序:用户将Spark应用程序提交到Spark集群中,可以使用spark-submit脚本或者通过Spark API进行提交。

  2. Spark启动Driver进程:当Spark集群接收到用户提交的Spark应用程序时,会启动一个Driver进程,该进程运行在集群的某个节点上,并负责协调和管理整个应用程序的执行过程。

  3. Driver进程创建SparkContext对象:在启动Driver进程后,Spark会创建一个SparkContext对象,该对象是Spark应用程序的入口点,负责连接到Spark集群,并提供对Spark API的访问。

  4. Driver进程分配Executor进程:在Spark应用程序运行过程中,Driver进程会根据需要向集群申请Executor进程,用于执行具体的任务和作业。

  5. Executor进程执行任务:一旦Driver进程分配了Executor进程,Executor进程就可以开始执行具体的任务和作业,例如map、reduce、join等操作。

需要注意的是,Spark应用程序运行过程中,Driver进程是单点故障,如果Driver进程出现故障或崩溃,整个应用程序将会停止运行。因此,在实际应用中,需要使用一些高可用性的技术和策略来保证Driver进程的可靠性和稳定性,例如使用备份Driver进程、使用ZooKeeper等。

15 spark中Driver进程向集群申请Executor进程的过程是什么样子的?

在Spark中,Driver进程向集群申请Executor进程的过程如下:

  1. Driver进程向集群管理器申请资源:Driver进程会向集群管理器(例如YARN、Mesos或Standalone)申请资源,请求分配一定数量的Executor进程用于执行具体的任务和作业。

  2. 集群管理器分配资源:集群管理器会根据Driver进程的请求,分配一定数量的资源(例如CPU、内存、磁盘等)给Driver进程。

  3. Driver进程启动Executor进程:一旦集群管理器分配了资源,Driver进程就会启动相应数量的Executor进程,每个Executor进程都运行在集群中的一个节点上,并负责执行具体的任务和作业。

  4. Executor进程向Driver进程注册:在启动Executor进程后,Executor进程会向Driver进程注册自己,告诉Driver进程自己已经准备好接收任务和作业。

  5. Driver进程向Executor进程发送任务和作业:一旦Executor进程向Driver进程注册成功,Driver进程就可以向Executor进程发送具体的任务和作业,例如map、reduce、join等操作。

需要注意的是,Spark的资源调度和分配是一个动态的过程,集群管理器会根据集群的负载情况和资源使用情况,动态调整资源的分配和释放。因此,在实际应用中,需要根据具体的场景和需求来配置Spark的资源调度和分配策略,以便达到最佳的性能和稳定性。

16 spark中Driver进程的个数 和 Executor进程个数分别是多少个?

在Spark中,Driver进程的个数通常是1个,而Executor进程的个数则可以根据具体的需求和场景来配置,通常情况下建议根据集群的配置和资源情况来设置Executor进程的个数。

在实际应用中,可以通过以下方式来配置Executor进程的个数:

  1. 静态配置:可以通过在Spark应用程序中指定Executor进程的个数来进行配置,例如使用spark-submit命令时,可以通过–num-executors参数来指定Executor进程的个数。

  2. 动态调整:可以通过动态调整Executor进程的个数来适应不同的负载和资源需求,例如使用Spark的动态资源调整功能(Dynamic Allocation),根据当前的负载和资源使用情况来动态调整Executor进程的个数。

需要注意的是,Executor进程的个数不是越多越好,过多的Executor进程可能会导致资源的浪费和性能的下降,因此需要根据具体的场景和需求来选择合适的Executor进程个数。同时,由于Driver进程是整个应用程序的控制中心,因此通常情况下只需要一个Driver进程来协调和管理整个应用程序的执行过程。

17 spark 的client模式有Driver进程吗

在Spark中,Client模式和Cluster模式是两种不同的运行模式。在Client模式下,Driver进程运行在提交Spark应用程序的客户端机器上,而在Cluster模式下,Driver进程运行在集群的某个节点上。

因此,在Spark的Client模式下,会有一个Driver进程运行在客户端机器上,负责协调和管理整个应用程序的执行过程。客户端可以使用Spark API或者spark-submit命令来提交Spark应用程序到集群中,并且可以通过Driver进程的日志和控制台输出来监控和调试应用程序的执行过程。

需要注意的是,在Spark的Client模式下,客户端机器需要满足一定的硬件和软件要求,例如需要安装Spark的运行环境、配置正确的环境变量、具有足够的内存和CPU资源等。同时,由于Driver进程运行在客户端机器上,因此可能会受到网络延迟、带宽和稳定性等问题的影响,需要根据具体的场景和需求来选择合适的运行模式。

作者:admin  创建时间:2023-06-27 22:48
最后编辑:admin  更新时间:2024-04-07 15:40