1. Deep Dive into How the Producer Loads Metadata
1.1 Overall Flow of Producer Metadata Loading
- Earlier we walked through the Producer's core flow at a high level; now let's dig into the details of how the Producer loads metadata.
- Recall the core code that synchronously waits for the metadata fetch:

```java
/**
 * todo: Step 1: synchronously wait for the metadata fetch.
 * maxBlockTimeMs is the longest we are allowed to block, 1 minute by default.
 */
ClusterAndWaitTime clusterAndWaitTime =
        waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
```
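For context, the producer config behind this bound is `max.block.ms`; lowering it makes metadata problems surface faster as a `TimeoutException` from `send()`. A minimal sketch of tuning it (broker address and topic name are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MaxBlockDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Bounds how long send() may block in waitOnMetadata (default: 60000 ms).
        props.put("max.block.ms", "10000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // If metadata for "demo-topic" cannot be fetched within 10 s,
            // send() fails with a TimeoutException.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```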
- Now let's look at the underlying implementation of the `waitOnMetadata` method:

```java
// todo: connect to the Kafka cluster and fetch the metadata for the given topic
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    // todo: add the current topic to the metadata object
    metadata.add(topic);
    // todo: following our scenario-driven walkthrough, at this point the producer has just
    // finished initializing, so this cluster object holds no real metadata yet,
    // only the bootstrap addresses we configured in our code.
    Cluster cluster = metadata.fetch();
    // todo: look up the partition info for this topic in the cached cluster metadata.
    // Since this is the first time through this code, there is no partition info yet.
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    // todo: if the partition info was found in the cached metadata, return immediately.
    // In our first-pass scenario this branch is not taken.
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        // todo: return the cached cluster metadata; the fetch took 0 ms.
        return new ClusterAndWaitTime(cluster, 0);

    // todo: reaching here means we really do need to fetch metadata from the broker.
    // record the current time
    long begin = time.milliseconds();
    // remaining time; starts at the maximum wait time
    long remainingWaitMs = maxWaitMs;
    // time already spent
    long elapsed;
    // Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
    // In case we already have cached metadata for the topic, but the requested partition is greater
    // than expected, issue an update request only once. This is necessary in case the metadata
    // is stale and the number of partitions for this topic has increased in the meantime.
    do {
        log.trace("Requesting metadata update for topic {}.", topic);
        // todo: 1) read the current metadata version.
        // The Producer versions its metadata; every successful update increments the version.
        // todo: 2) set the needUpdate flag to true
        int version = metadata.requestUpdate();
        // todo: wake up the Sender thread; the actual metadata fetch is done by the Sender thread.
        // This relies on Java threading and concurrency knowledge, which is worth brushing up on.
        sender.wakeup();
        try {
            // todo: synchronously wait for the Sender thread to fetch the metadata
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        // todo: re-read the cluster metadata (it may have just been updated above)
        cluster = metadata.fetch();
        // todo: compute how long the metadata fetch has taken so far
        elapsed = time.milliseconds() - begin;
        // todo: if we have exceeded the maximum wait time, time out
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        // todo: metadata was fetched, but this topic is not authorized
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        // compute the remaining usable time
        remainingWaitMs = maxWaitMs - elapsed;
        // read the partition count for this topic;
        // a non-null value means the Sender thread has fetched the metadata.
        partitionsCount = cluster.partitionCountForTopic(topic);
        // todo: this is a do..while loop; it exits once the metadata has been obtained
    } while (partitionsCount == null);

    // todo: metadata obtained, but the requested partition number is out of range
    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).",
                        partition, partitionsCount));
    }
    // todo: return the cluster metadata plus the time it took to fetch it:
    // first argument: the cluster metadata
    // second argument: the time spent fetching the metadata
    return new ClusterAndWaitTime(cluster, elapsed);
}
```
1.2 Producer Metadata Loading: Waiting for the Metadata Update
- Next, let's analyze the `awaitUpdate` method used in the metadata-loading flow.
- The call site:

```java
// todo: synchronously wait for the Sender thread to fetch the metadata
metadata.awaitUpdate(version, remainingWaitMs);
```
- The implementation logic of `awaitUpdate`:

```java
/**
 * Wait for metadata update until the current version is larger than the last version we know of
 */
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    // record the current time
    long begin = System.currentTimeMillis();
    // remaining usable time; starts at the maximum wait time
    long remainingWaitMs = maxWaitMs;
    // If the metadata version is still <= the version we started from, the update has not
    // succeeded yet, because the Sender thread increments this version every time it
    // successfully updates the metadata.
    while (this.version <= lastVersion) {
        // if there is time left
        if (remainingWaitMs != 0)
            // block the current thread.
            // We have not read the Sender thread's code yet, but we can already guess that
            // it must wake this thread once the metadata update succeeds.
            wait(remainingWaitMs);
        // reaching here means we were either woken up or timed out;
        // compute how much time has passed.
        long elapsed = System.currentTimeMillis() - begin;
        // timed out
        if (elapsed >= maxWaitMs)
            // throw a timeout exception
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        // not timed out yet; compute the remaining time
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
```
1.3 The Sender Thread Fetches the Metadata
- By now we know that the metadata fetch is, in essence, delegated to the Sender thread, so let's analyze how the Sender thread actually pulls the metadata.
- Sender is a thread, so it is enough to analyze its `run` method:

```java
/**
 * Run a single iteration of sending
 *
 * @param now The current POSIX time in milliseconds
 */
void run(long now) {
    // todo: 1. fetch the cached metadata.
    // Following our scenario-driven walkthrough, this is the first pass, so no metadata has
    // been fetched yet and this Cluster object is empty.
    // todo: note: without metadata, the logic below is effectively a no-op, because all of it
    // depends on the metadata.
    // todo: for metadata loading, the line that matters is the very last one of this method;
    // that is the call that actually pulls the metadata.
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
            result.readyNodes, this.maxRequestSize, now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    // todo: this is the key call: it is what actually pulls the cluster metadata
    this.client.poll(pollTimeout, now);
}
```
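Note how the producer thread's `sender.wakeup()` interacts with the blocking `this.client.poll(...)` at the bottom of `run`: in Kafka this chain bottoms out in `java.nio.channels.Selector#wakeup()`, which forces a blocked `select()` to return immediately. A minimal sketch of just that NIO mechanism (not Kafka code):

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class WakeupDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        Selector selector = Selector.open();

        // Plays the role of the Sender thread blocked inside client.poll(...).
        Thread ioThread = new Thread(() -> {
            try {
                int ready = selector.select(30_000); // would block for up to 30 s
                System.out.println("select() returned early, ready keys: " + ready);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        ioThread.start();

        Thread.sleep(100);   // give the I/O thread time to enter select()
        selector.wakeup();   // the producer thread's sender.wakeup() works like this
        ioThread.join();
        selector.close();
    }
}
```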
- Back in `run`, the metadata fetch itself is carried out by the `this.client.poll(pollTimeout, now)` call.
- Analysis of the `NetworkClient.poll` method:

```java
public List<ClientResponse> poll(long timeout, long now) {
    // todo: 1. build (if an update is due) a metadata fetch request
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // todo: 2. send requests and perform the actual (fairly involved) network I/O.
        /**
         * We have not covered Kafka's networking layer yet, so for now it is enough
         * to know that this is where network requests are sent.
         */
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    // handle completed sends
    handleCompletedSends(responses, updatedNow);
    /**
     * Step 3: handle responses; the response carries the metadata we need.
     *
     * We are reading this while tracing how the producer obtains metadata.
     * Fetching metadata actually follows exactly the same path as sending a message:
     * request metadata -> check whether the connection is established -> establish the connection
     * -> send the request (a metadata request) -> the broker sends back a response (carrying the cluster metadata).
     */
    // handle completed receives
    handleCompletedReceives(responses, updatedNow);
    // handle disconnections
    handleDisconnections(responses, updatedNow);
    // handle connection state changes
    handleConnections();
    // handle timed-out requests
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }
    return responses;
}
```
- Analysis of `metadataUpdater.maybeUpdate`: when an update is due, it picks a target node (the least-loaded one) and delegates to a private two-argument overload, which is where the request gets built:

```java
// todo: core call: this is where the metadata request gets built
maybeUpdate(now, node);
```
- Analysis of the `maybeUpdate(long now, Node node)` overload:

```java
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();

    // check whether the network connection has been established
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            // fetch the metadata for all topics;
            // normally, however, we only fetch metadata for the topics we send to
            metadataRequest = MetadataRequest.allTopics();
        else
            // the default path: fetch metadata only for the topics we are producing to
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        // todo: build the metadata fetch request
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        // todo: send the metadata request.
        // How the send works internally is covered later with Kafka's networking layer.
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else { // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}
```
- Analysis of how `handleCompletedReceives(responses, updatedNow)` processes responses:

```java
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        // the node the response came from
        String source = receive.source();
        // take the matching ClientRequest out of inFlightRequests
        ClientRequest req = inFlightRequests.completeNext(source);
        // parse the response; the data is wrapped in a Struct object
        Struct body = parseResponse(receive.payload(), req.request().header());
        // todo: if this is a metadata response, it is consumed here
        // and not added to the 'responses' list
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}
```
- Analysis of how `metadataUpdater.maybeHandleCompletedReceive` handles a metadata response:

```java
@Override
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        // todo: handle the response
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}
```
- Analysis of how `handleResponse` parses the response:

```java
private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false;
    // todo: deserialize the response Struct into a MetadataResponse
    MetadataResponse response = new MetadataResponse(body);
    // todo: the response carries the metadata;
    // extract the cluster metadata fetched from the broker
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    // error information carried in the response
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);

    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    // todo: if we did obtain cluster metadata,
    if (cluster.nodes().size() > 0) {
        // todo: update the metadata; this takes us back into the update method analyzed earlier
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}
```
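Once `handleResponse` has stored the new `Cluster` via `metadata.update`, the producer thread's next `metadata.fetch()` returns real cluster state. A small sketch (not Kafka source) of the kind of queries then made against it, using the public `Cluster` API:

```java
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

public class ClusterQueries {
    // 'cluster' is what metadata.fetch() returns after a successful update.
    static void inspect(Cluster cluster, String topic) {
        // The value waitOnMetadata's do..while loop polls for:
        Integer partitions = cluster.partitionCountForTopic(topic);
        // The broker that produce requests for partition 0 must be sent to:
        Node leader = cluster.leaderFor(new TopicPartition(topic, 0));
        System.out.println(topic + " has " + partitions
                + " partitions; partition 0 leader: " + leader);
    }
}
```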
- If metadata was obtained, the program calls `metadata.update`, which brings us back into the `update` method we analyzed earlier:

```java
// todo: get or update the metadata
public synchronized void update(Cluster cluster, long now) {
    Objects.requireNonNull(cluster, "cluster should not be null");

    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    // bump the metadata version
    this.version += 1;

    // todo: true by default
    if (topicExpiryEnabled) {
        // Handle expiry of topics from the metadata refresh set.
        // todo: on the first pass 'topics' is empty, so this loop does nothing.
        // todo: on the second pass (entered via the Producer's send path) the topic
        // metadata is present and the logic below does run.
        for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, Long> entry = it.next();
            long expireMs = entry.getValue();
            if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
                entry.setValue(now + TOPIC_EXPIRY_MS);
            else if (expireMs <= now) {
                it.remove();
                log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}",
                        entry.getKey(), expireMs, now);
            }
        }
    }

    for (Listener listener : listeners)
        listener.onMetadataUpdate(cluster);

    String previousClusterId = cluster.clusterResource().clusterId();

    // todo: false by default, so this branch is not taken
    if (this.needMetadataForAllTopics) {
        // the listener may change the interested topics, which could cause another metadata refresh.
        // If we have already fetched all topics, however, another fetch should be unnecessary.
        this.needUpdate = false;
        this.cluster = getClusterForCurrentTopics(cluster);
    } else {
        // todo: assign the incoming Cluster object directly.
        // During initialization that object only contains the bootstrap addresses,
        // which is why update() does not reach out to the broker at startup.
        this.cluster = cluster;
    }

    // The bootstrap cluster is guaranteed not to have any useful information
    if (!cluster.isBootstrapConfigured()) {
        String clusterId = cluster.clusterResource().clusterId();
        if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
            log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
        clusterResourceListeners.onUpdate(cluster.clusterResource());
    }

    // todo: the most important job of this method: wake up the thread blocked in
    // metadata.awaitUpdate(version, remainingWaitMs) from the previous section
    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
```
This time `update` is entered through the Producer's send path, so the topic metadata is already present and the branches that were skipped on the first (metadata-less) pass now execute. The final important step is the call to `notifyAll()`, which wakes the thread that was blocked waiting for the metadata.
1.4 Overall Flow Diagram of KafkaProducer Metadata Loading
[Flow diagram: main thread blocks in waitOnMetadata/awaitUpdate -> sender.wakeup() -> Sender.run -> NetworkClient.poll builds and sends a MetadataRequest -> handleCompletedReceives -> handleResponse parses the MetadataResponse -> Metadata.update bumps the version and calls notifyAll() -> main thread resumes]
To Be Continued
Next up: how Kafka chooses a partition.
[Producer Analysis, Part 3] Kafka Partitioner Selection