当前位置: 首页 > news >正文

最新网站信息/促销活动推广方案

最新网站信息,促销活动推广方案,自助做网站,wordpress 首页模板修改点击箭头处“蓝色字”,关注我们哦!!Flink中JobMaster、ResourceManager、Dispatcher、WebMonitorEndpoint提供了基于zookeeper高可用,涉及到leader选举与监听, leader选举基于zookeeper开源客户端CuratorFramework 的L…

5e1c5bccdea7d8458fc28c8f39d52630.gif

点击箭头处“蓝色字”,关注我们哦!!


Flink中JobMaster、ResourceManager、Dispatcher、WebMonitorEndpoint提供了基于zookeeper高可用,涉及到leader选举与监听, leader选举基于zookeeper开源客户端CuratorFramework 的LeaderLatch方式实现,监听则通过NodeCache实现。基于此Flink提供了zookeeper高可用ZooKeeperHaServices, 通过该工具类可以创建LeaderElectionService与LeaderRetrievalService,包含了对应与zookeeper的znode节点分别是

/resource_manager_lock/dispatcher_lock/job_manager_lock/rest_server_lock
LeaderElectionService用于leader选举服务,基于zk的实现类是ZooKeeperLeaderElectionService,LeaderRetrievalService用于监听leader恢复服务,基于zk的实现类是ZooKeeperLeaderRetrievalService。涉及到两个重要的角色:LeaderContender接口,在leader选举中使用,代表了参与leader竞争的角色,其实现类有JobManagerRunner、ResourceManager、Dispatcher、WebMonitorEndpoint,该接口中包含了两个重要的方法:1.  grantLeadership,表示leader竞选成功的回调方法2.  revokeLeadership,表示由leader变为非leader的回调方法LeaderRetrievalListener接口,在监听leader恢复中使用,代表的是一个监听者,其实现类有ResourceManagerLeaderListener、JobManagerLeaderListener等,该接口包含一个重要的方法:1. notifyLeaderAddress,leader发生变化时回调方法以JobMaster与TaskExecutor之间的交互为例,JobMaster需要通过LeaderElectionService完成leader的选举,而TaskExecutor则需要监听jobMaster leader的变化,通过LeaderRetrievalService完成。
JobMaster选举JobManagerRunner负责JobMaster的选举与启动,内部包含了LeaderElectionService对象属性leaderElectionService,也就是ZooKeeperLeaderElectionService的对象,在调用JobManagerRunner的start方法时会调用leaderElectionService的start方法,传入参数this也就是当前的JobManagerRunner对象,
public void start() throws Exception {    try {      leaderElectionService.start(this);    } catch (Exception e) {      log.error("Could not start the JobManager because the leader election service did not start.", e);      throw new Exception("Could not start the leader election service.", e);    }  }
在ZooKeeperLeaderElectionService 的start方法里面
public void start(LeaderContender contender) throws Exception {    Preconditions.checkNotNull(contender, "Contender must not be null.");    Preconditions.checkState(leaderContender == null, "Contender was already set.");    LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);    synchronized (lock) {      client.getUnhandledErrorListenable().addListener(this);      leaderContender = contender;      leaderLatch.addListener(this);      leaderLatch.start();      cache.getListenable().addListener(this);      cache.start();      client.getConnectionStateListenable().addListener(listener);      running = true;    }  }
leaderLatch 表示的是CuratorFramework里面参与leader选举的LeaderLatch对象,添加了一个this的listener, 也就是LeaderLatchListener,选举成功会调用isLeader方法,由leader变为非leader调用notLeader方法;并且还要通过NodeCache方式添加了监控当前节点变化的listener,也就是NodeCacheListener,当监听的节点发生变化则调用nodeChanged方法。若参与者竞选leader成功就会调用isLeader方法
public void isLeader() {    synchronized (lock) {      if (running) {        issuedLeaderSessionID = UUID.randomUUID();        confirmedLeaderSessionID = null;        if (LOG.isDebugEnabled()) {          LOG.debug(            "Grant leadership to contender {} with session ID {}.",            leaderContender.getAddress(),            issuedLeaderSessionID);        }        leaderContender.grantLeadership(issuedLeaderSessionID);      } else {        LOG.debug("Ignoring the grant leadership notification since the service has " +          "already been stopped.");      }    }  }
在里面会调用leaderContender的grantLeadership方法,也就是JobManagerRunner的grantLeadership,就会执行JobMaster的启动并且执行任务的调度。在检测到选择leader所用的path发生变化就会调用nodeChanged方法,在nodeChanged方法里面判断当前角色是leader就将leader信息写入到zookeeper路径里面去。
TaskExecutor监听在TaskExecutor中包含了一个JobLeaderService的对象属性jobLeaderService,在jobLeaderService中维护了所有的jobmaster leader,并且监听它,监听正是通过ZooKeeperLeaderRetrievalService完成。在申请slot时会调用TaskExecutor的requestSlot方法,在该方法里面调用jobLeaderService.add方法,
public void addJob(final JobID jobId, final String defaultTargetAddress) throws Exception {    Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");    LOG.info("Add job {} for job leader monitoring.", jobId);    final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(      jobId,      defaultTargetAddress);    JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);    final Tuple2 oldEntry = jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));    if (oldEntry != null) {      oldEntry.f0.stop();      oldEntry.f1.stop();    }    leaderRetrievalService.start(jobManagerLeaderListener);  }
在该方法里面开始对jobmaster leader的监听,创建一个LeaderRetrievalService对象,并且调用其start方法,传入参数是JobManagerLeaderListener对象,也就是LeaderRetrievalListener对象,在ZooKeeperLeaderRetrievalService的start方法里面,通过NodeCache方式添加了监控当前节点变化的listener,也就是NodeCacheListener,当监听的节点发生变化则调用nodeChanged方法,而在该方法里面会调用LeaderRetrievalListener的notifyLeaderAddress方法,也就是JobManagerLeaderListener的notifyLeaderAddress方法,会重新与新的leader建立连接。5e258db9d7878b5d98007c803dba7a69.png

往期精彩回顾

510e80562d751eefa4a4b8cd06011ea5.png

StreamOperator源码简析

StreamTask源码分析

Flink心跳机制分析

Flink中延时调用设计与实现

a337a91fbd73030665cfd8d9d4439cc2.png

关注回复Flink

获取更多系列

好看,就要点个"在看"

2ec5f5fcdd11518f6f6beff82278f3ef.gif
http://www.jmfq.cn/news/5169043.html

相关文章:

  • 静态网站设计/公司网站策划宣传
  • 地方型旅游网站/百度推广登录入口官网网址
  • 电脑手机网站建设/搭建网站的步骤和顺序
  • 济源市建设管理处网站/成人英语培训
  • 濮阳网站建设哪家好/长沙网站开发
  • wordpress知更鸟模板/关键词优化
  • 做视频网站赚钱/厦门网站推广优化哪家好
  • 网站设计的技能要求/app注册拉新平台
  • 美食网站建设实施方案/seo搜索引擎优化怎么优化
  • 网站内容图片怎么做的/大连网站推广
  • 凡科能上传自己做的网站/seo关键词
  • 加盟广告/企业网站seo推广方案
  • wordpress删除评论框/seo快速排名服务
  • 自架服务器建设网站/电商网店
  • 网站交互主要做什么/广告推广方案
  • 网站建设项目规划审批/seo搜索引擎优化工程师招聘
  • 网站建设公司如何找客户/东莞网站定制开发
  • 商城微信网站开发/常用的网络营销平台有哪些
  • 卖花网站源码/上海最新新闻热点事件
  • 番禺人才网服务社会/seo01网站
  • 手机网站做seo/2022最新引流推广平台
  • 二级域名ip查询/金华百度seo
  • 网站关键字 怎么设置/实时热搜
  • HTML建网站/域名注册需要哪些条件
  • 微网站怎么建设/昆明优化网站公司
  • 本地建站工具/怎样申请网站注册
  • 网站换dns/新手怎么做seo优化
  • 杭州网站开发与设计/武汉大学人民医院精神科
  • 如何拥有自己的私人网站平台/网络优化大师下载
  • 给自己女朋友做的网站/佛山网站建设