德州做网站的/软文
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—
不温不火
,本意是希望自己性情温和
。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/
本片博文为大家带来的是Spark集群启动流程的简单分析。
目录
- 1. `start-master.sh Master` 启动脚本
- 2. `start-slaves.sh Worker` 启动脚本
- 3. `Master` 启动源码
- 4. `Worker` 启动源码
本片博文主要分析的是Standalone
模式下 Spark 集群(Master, work
)启动流程
- start-all.sh脚本,实际是执行java -cp Master和java -cp Worker;
- Master启动时首先创建一个RpcEnv对象,负责管理所有通信逻辑;
- Master通过RpcEnv对象创建一个Endpoint,Master就是一个Endpoint,Worker可以与其进行通信;
- Worker启动时也是创建一个RpcEnv对象;
- Worker通过RpcEnv对象创建一个Endpoint;
- Worker通过RpcEnv对象建立到Master的连接,获取到一个RpcEndpointRef对象,通过该对象可以与Master通信;
- Worker向Master注册,注册内容包括主机名、端口、CPU Core数量、内存数量;
- Master接收到Worker的注册,将注册信息维护在内存中的Table中,其中还包含了一个到Worker的RpcEndpointRef对象引用;
- Master回复Worker已经接收到注册,告知Worker已经注册成功;
- Worker端收到成功注册响应后,开始周期性向Master发送心跳。
各启动脚本及源码分析
1. start-master.sh Master
启动脚本
启动 Master 的主要 shell
流程
- 1. start-master.sh
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \$ORIGINAL_ARGS
- 2. spark-daemon.sh
case $option in(start)run_command class "$@";;esacrun_command() {mode="$1"case "$mode" in(class)execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@";;esac}execute_command() {if [ -z ${SPARK_NO_DAEMONIZE+set} ]; then# 最终以后台守护进程的方式启动 Masternohup -- "$@" >> $log 2>&1 < /dev/null &fi}
- 3. 启动类
/opt/module/spark-standalone/bin/spark-class org.apache.spark.deploy.master.Master --host hadoop002--port 7077 --webui-port 8080bin/spark-class
- 4. 启动命令
/opt/module/jdk1.8.0_172/bin/java -cp /opt/module/spark-standalone/conf/:/opt/module/spark-standalone/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host hadoop002 --port 7077 --webui-port 8080
2. start-slaves.sh Worker
启动脚本
启动 Worker 的主要 shell
流程
- 1. start-slaves.sh
"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
- 2. start-slave.sh
# worker类CLASS="org.apache.spark.deploy.worker.Worker"if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then# worker webui 端口号SPARK_WORKER_WEBUI_PORT=8081 fiif [ "$SPARK_WORKER_INSTANCES" = "" ]; thenstart_instance 1 "$@"fi# 启动worker实例 spark-daemon.sh在启动Master的时候已经使用过一次了function start_instance {"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"}
- 3. 最终启动类
/opt/module/spark-standalone/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://hadoop002:7077
bin/spark-class
- 4. 启动命令
opt/module/jdk1.8.0_172/bin/java -cp /opt/module/spark-standalone/conf/:/opt/module/spark-standalone/jars/* -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://hadoop002:7077
3. Master
启动源码
- 1. Master 源码
org.apache.spark.deploy.master.Master
- 2. Master伴生对象
启动Master的入口为Master伴生对象的main方法:
private[deploy] object Master extends Logging {val SYSTEM_NAME = "sparkMaster"val ENDPOINT_NAME = "Master"// 启动 Master 的入口函数def main(argStrings: Array[String]) {Utils.initDaemon(log)val conf = new SparkConf// 构建用于参数解析的实例 --host hadoop201 --port 7077 --webui-port 8080val args = new MasterArguments(argStrings, conf)// 启动 RPC 通信环境和 MasterEndPoint(通信终端)val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)rpcEnv.awaitTermination()}/*** Start the Master and return a three tuple of:* 启动 Master 并返回一个三元组* (1) The Master RpcEnv* (2) The web UI bound port* (3) The REST server bound port, if any*/def startRpcEnvAndEndpoint(host: String,port: Int,webUiPort: Int,conf: SparkConf): (RpcEnv, Int, Option[Int]) = {val securityMgr = new SecurityManager(conf)// 创建 Master 端的 RpcEnv 环境 参数: sparkMaster hadoop201 7077 conf securityMgr// 实际类型是: NettyRpcEnvval rpcEnv: RpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)// 创建 Master对象, 该对象就是一个 RpcEndpoint, 在 RpcEnv中注册这个RpcEndpoint// 返回该 RpcEndpoint 的引用, 使用该引用来接收信息和发送信息val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))// 向 Master 的通信终端发送请求,获取 BoundPortsResponse 对象// BoundPortsResponse 是一个样例类包含三个属性: rpcEndpointPort webUIPort restPortval portsResponse: BoundPortsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)}
}
- 3.
RpcEnv
的创建
真正的创建是调用NettyRpcEnvFactory的create方法创建的.
创建 NettyRpcEnv的时候, 会创建消息分发器, 收件箱和存储远程地址与发件箱的 Map
RpcEnv.scala
def create(name: String,bindAddress: String,advertiseAddress: String,port: Int,conf: SparkConf,securityManager: SecurityManager,clientMode: Boolean): RpcEnv = {// 保存 RpcEnv 的配置信息val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,clientMode)// 创建 NettyRpcEvnnew NettyRpcEnvFactory().create(config)
}
NettyRpcEnvFactory
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {def create(config: RpcEnvConfig): RpcEnv = {val sparkConf = config.conf// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance// 用于 Rpc传输对象时的序列化val javaSerializerInstance: JavaSerializerInstance = new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]// 实例化 NettyRpcEnvval nettyEnv = new NettyRpcEnv(sparkConf,javaSerializerInstance,config.advertiseAddress,config.securityManager)if (!config.clientMode) {// 定义 NettyRpcEnv 的启动函数val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>nettyEnv.startServer(config.bindAddress, actualPort)(nettyEnv, nettyEnv.address.port)}try {// 启动 NettyRpcEnvUtils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1} catch {case NonFatal(e) =>nettyEnv.shutdown()throw e}}nettyEnv}
}
- 4. Master伴生类(Master 端的 RpcEndpoint 启动)
Master是一个RpcEndpoint
.
他的生命周期方法是:
constructor -> onStart -> receive* -> onStop
onStart 主要代码片段
// 创建 WebUI 服务器
webUi = new MasterWebUI(this, webUiPort)
// 按照固定的频率去启动线程来检查 Worker 是否超时. 其实就是给自己发信息: CheckForWorkerTimeOut
// 默认是每分钟检查一次.
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {// 在 receive 方法中对 CheckForWorkerTimeOut 进行处理self.send(CheckForWorkerTimeOut)}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
处理Worker是否超时的方法
/** Check for, and remove, any timed-out workers */
private def timeOutDeadWorkers() {// Copy the workers into an array so we don't modify the hashset while iterating through itval currentTime = System.currentTimeMillis()// 把超时的 Worker 从 Workers 中移除val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArrayfor (worker <- toRemove) {if (worker.state != WorkerState.DEAD) {logWarning("Removing %s because we got no heartbeat in %d seconds".format(worker.id, WORKER_TIMEOUT_MS / 1000))removeWorker(worker)} else {if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it}}}
}
到此, Master
启动完成.
4. Worker
启动源码
- 1. Worker 源码
org.apache.spark.deploy.worker.Worker
- 2. Worker伴生对象
启动流程基本和 Master 一致.
private[deploy] object Worker extends Logging {val SYSTEM_NAME = "sparkWorker"val ENDPOINT_NAME = "Worker"def main(argStrings: Array[String]) {Utils.initDaemon(log)val conf = new SparkConf// 构建解析参数的实例val args = new WorkerArguments(argStrings, conf)// 启动 Rpc 环境和 Rpc 终端val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,args.memory, args.masters, args.workDir, conf = conf)rpcEnv.awaitTermination()}def startRpcEnvAndEndpoint(host: String,port: Int,webUiPort: Int,cores: Int,memory: Int,masterUrls: Array[String],workDir: String,workerNumber: Option[Int] = None,conf: SparkConf = new SparkConf): RpcEnv = {// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environmentsval systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")val securityMgr = new SecurityManager(conf)// 创建 RpcEnv 实例 参数: "sparkWorker", "hadoop201", 8081, conf, securityMgrval rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)// 根据传入 masterUrls 得到 masterAddresses. 就是从命令行中传递过来的 Master 地址val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))// 最终实例化 Worker 得到 Worker 的 RpcEndpointrpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))rpcEnv}
}
- 3. Worker伴生类
onStart 方法
override def onStart() {// 第一次启动断言 Worker 未注册assert(!registered)// 创建工作目录createWorkDir()// 启动 shuffle 服务shuffleService.startIfEnabled()// Worker的 WebUIwebUi = new WorkerWebUI(this, workDir, webUiPort)webUi.bind()workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"// 向 Master 注册 WorkerregisterWithMaster()
}
registerWithMaster 方法
关键代码:
// 向所有的 Master 注册
registerMasterFutures = tryRegisterAllMasters()
tryRegisterAllMasters() 方法
private def tryRegisterAllMasters(): Array[JFuture[_]] = {masterRpcAddresses.map { masterAddress =>// 从线程池中启动线程来执行 Worker 向 Master 注册registerMasterThreadPool.submit(new Runnable {override def run(): Unit = {try {// 根据 Master 的地址得到一个 Master 的 RpcEndpointRef, 然后就可以和 Master 进行通讯了.val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)// 向 Master 注册registerWithMaster(masterEndpoint)} catch {}}})}
}
registerWithMaster 方法
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {// 向 Master 对应的 receiveAndReply 方法发送信息// 信息的类型是 RegisterWorker, 包括 Worker 的一些信息: id, 主机地址, 端口号, 内存, webUimasterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl)).onComplete {// This is a very fast action so we can use "ThreadUtils.sameThread"// 注册成功case Success(msg) =>Utils.tryLogNonFatalError {handleRegisterResponse(msg)}// 注册失败case Failure(e) =>logError(s"Cannot register with master: ${masterEndpoint.address}", e)System.exit(1)}(ThreadUtils.sameThread)
}
Master的receiveAndReply方法
// 处理 Worker 的注册信息
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>if (state == RecoveryState.STANDBY) {// 给发送者回应消息. 对方的 receive 方法会收到这个信息context.reply(MasterInStandby)} else if (idToWorker.contains(id)) { // 如果要注册的 Worker 已经存在context.reply(RegisterWorkerFailed("Duplicate worker ID"))} else {// 根据传来的信息封装 WorkerInfoval worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,workerRef, workerWebUiUrl)if (registerWorker(worker)) { // 注册成功persistenceEngine.addWorker(worker)// 响应信息context.reply(RegisteredWorker(self, masterWebUiUrl))schedule()} else {val workerAddress = worker.endpoint.addresscontext.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "+ workerAddress))}}
worker的handleRegisterResponse方法
case RegisteredWorker(masterRef, masterWebUiUrl) =>logInfo("Successfully registered with master " + masterRef.address.toSparkURL)// 已经注册过了registered = true// 更新 MasterchangeMaster(masterRef, masterWebUiUrl)// 通知自己给 Master 发送心跳信息 默认 1 分钟 4 次forwordMessageScheduler.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {self.send(SendHeartbeat)}}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
Worker的receive方法
case SendHeartbeat =>if (connected) {// 给 Master 发送心跳sendToMaster(Heartbeat(workerId, self))}
Master的receive方法:
case Heartbeat(workerId, worker) =>idToWorker.get(workerId) match {case Some(workerInfo) =>// 记录该 Worker 的最新心跳workerInfo.lastHeartbeat = System.currentTimeMillis()}
到此, Worker启动完成
本次的分享就到这里了,
好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”
一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注
我哦!