当前位置: 首页 > news >正文

泰安放心的企业建站公司/新闻稿件代发平台

泰安放心的企业建站公司,新闻稿件代发平台,咸阳网站建设专业公司哪家好,域名和网站建设转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/6238029.html 最近需要详细研究下kafka reblance过程中分区计算的算法细节,网上搜了部分说法,感觉比较晦涩且不太易懂,还是自己抠源码比较简便一点。 kafka reblance计算部分代码…

转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/6238029.html

     最近需要详细研究下kafka reblance过程中分区计算的算法细节,网上搜了部分说法,感觉比较晦涩且不太易懂,还是自己抠源码比较简便一点。

kafka reblance计算部分代码如下:

class RangeAssignor() extends PartitionAssignor with Logging {def assign(ctx: AssignmentContext) = {val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]val partitionAssignment =new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))for (topic <- ctx.myTopicThreadIds.keySet) {
      val curConsumers = ctx.consumersForTopic(topic)val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)val nPartsPerConsumer = curPartitions.size / curConsumers.sizeval nConsumersWithExtraPart = curPartitions.size % curConsumers.sizeinfo("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +" for topic " + topic + " with consumers: " + curConsumers)for (consumerThreadId <- curConsumers) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)assert(myConsumerPosition >= 0)val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)/***   Range-partition the sorted partitions to consumers for better locality.*  The first few consumers pick up an extra partition, if any.*/if (nParts <= 0)warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)else {for (i <- startPart until startPart + nParts) {val partition = curPartitions(i)info(consumerThreadId + " attempting to claim partition " + partition)// record the partition ownership decisionval assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)}}}}

 

  def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>val topic = topicAndPartitionMap._1val partitionMap = topicAndPartitionMap._2debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))    (topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))}}

 

  def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {val dirs = new ZKGroupDirs(group)val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]for (consumer <- consumers) {val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {for (consumerThreadId <- consumerThreadIdSet)consumersPerTopicMap.get(topic) match {case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))}}}for ( (topic, consumerList) <- consumersPerTopicMap )    consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))consumersPerTopicMap}

 

 

计算过程主要由上述高亮代码部分实现,举例说明,一个拥有十个分区的topic,相同group拥有三个consumerid为aaa,ccc,bbb的消费者

1 由后两段代码可知,获取consumerid列表和partition分区列表都是已经排好序的,所以

curConsumers=(aaa,bbb,ccc)

curPartitions=(0,1,2,3,4,5,6,7,8,9)

2

nPartsPerConsumer=10/3  =3

nConsumersWithExtraPart=10%3  =1

3 假设当前客户端id为aaa

myConsumerPosition= curConsumers.indexof(aaa) =0

4 计算分区范围

startPart= 3*0+0.min(1) = 0

nParts = 3+(if (0 + 1 > 1) 0 else 1)=3+1=4

所以aaa对应的分区号为[0,4),即0,1,2,3前面四个分区

同理可得bbb对应myConsumerPosition=1,对应分区4,5,6中间三个分区

ccc对应myConsumerPosition=2,对应7,8,9最后三个分区。

 

转载于:https://www.cnblogs.com/dongxiao-yang/p/6238029.html

http://www.jmfq.cn/news/5180581.html

相关文章:

  • 汽车网站建设的目的/100个免费推广网站
  • 网站背景动图怎么做/云南疫情最新消息
  • 什么网站做兼职可靠/女教师遭网课入侵视频大全
  • 心理咨询中心网站模板/2345纯净版推广包
  • 网站集约化建设情况汇报/如何写软文赚钱
  • 永年县网站/seo优化网络公司排名
  • 建站宝盒成品网站演示/乐陵seo优化
  • 做公务员题目的网站/河北网站优化公司
  • 做国际物流需要自己的网站吗/优书网首页
  • 北京网站百度推广/长春刚刚最新消息今天
  • dedecms微电影网站模板/搜狗引擎
  • 淄博网站制作服务/域名关键词排名查询
  • 阳春网站建设/广州seo全网营销
  • 南京网站制作哪家好/网店推广方法有哪些
  • 百度给企业做网站吗/广州抖音推广公司
  • 网站建设找盖亚科技/在线识别图片来源
  • 做网站的收益在哪/王通seo教程
  • 武汉做网站哪里好/优化疫情防控措施
  • 杭州企业网站建设/知乎关键词排名
  • 平度网站建设ld4/推介网
  • 阿里妈妈 该网站的域名已经被其他人绑定/seo网站推广经理
  • 面试建设单位在哪个网站/网络推广工作内容
  • 万网服务器网站建设/网站友链查询源码
  • 阜阳网站建设价格/竞价排名点击器
  • 苏州市建设安全监督局网站/排名优化方案
  • 襄州区城乡建设局网站/江门seo
  • 芸志建站怎么建立网站/网络公关公司收费
  • 设计网站会员/海外营销
  • 餐饮企业网站建设方案书/sem竞价推广怎么做
  • 网站使用说明书/免费的行情网站app