当前位置: 首页 > news >正文

电子商务网站建设期末考试/正规排名网站推广公司

电子商务网站建设期末考试,正规排名网站推广公司,用mvc做网站的缺点,wordpress 暂停前台访问大家一定都了解Java的线程池,线程池有什么好处呢?如果没有线程池,我们每次创建线程都要新建一个线程,这样对CPU的消耗比较大。那么利用线程池我们可以对已经创建好的线程复用,线程就不用频繁创建和销毁了。 同样&…

大家一定都了解Java的线程池,线程池有什么好处呢?如果没有线程池,我们每次创建线程都要新建一个线程,这样对CPU的消耗比较大。那么利用线程池我们可以对已经创建好的线程复用,线程就不用频繁创建和销毁了。

同样,我们的内存池也是这个原理,producerBatch需要空间存储消息的时候,就去缓存池申请一块内存,而不用频繁地创建和销毁内存,也就避免了频繁地GC。

BufferPool简介

下面的结构图简单说明了BufferPool的组成结构和处理缓存的流程:
在这里插入图片描述
整个BufferPool的大小默认为32M,内部内存区域分为两块:固定大小内存块集合free、非池化缓存nonPooledAvailableMemory。固定大小内存块默认大小为16k。当ProducerBatch向BufferPool申请一个大小为size的内存块时,BufferPool会根据size的大小判断由哪个内存区域分配内存块。同时,free和nonPooledAvailableMemory这两块区域的内存可以交换。

接下来,我们通过代码来学习Kafka底层提供的高效的内存池设计。

类BufferPool

重要字段如下:

public class BufferPool {static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";private final long totalMemory;//默认32Mprivate final int poolableSize;//池化大小16kprivate final ReentrantLock lock;//分配和回收时用的锁。private final Deque<ByteBuffer> free;//池化的内存private final Deque<Condition> waiters;//阻塞线程对应的Condition集合private long nonPooledAvailableMemory;//非池化可使用的内存
}
  • totalMemory:整个BufferPool内存大小,默认是32M。
  • poolableSize:池化缓存区一块内存块的大小,默认是16k。
  • lock:类型是ReentrantLock。因为会有多线程并发和回收ByteBuffer,所以使用锁控制并发,保证了线程的安全。
  • free:类型是Deque。缓存了指定大小的ByteBuffer对象。
  • waiters:类型是Deque队列。因为会有申请不到足够内存的线程,线程为了等待其他线程释放内存而阻塞等待,对应的Condition对象会进入该队列。
  • nonPooledAvailableMemory:非池化可使用的内存。

接下来,我再来介绍下重要的方法。

allocate()方法是向BufferPool申请ByteBuffer。

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {//1.验证申请的内存是否大于总内存if (size > this.totalMemory)throw new IllegalArgumentException("Attempt to allocate " + size+ " bytes, but there is a hard limit of "+ this.totalMemory+ " on memory allocations.");ByteBuffer buffer = null;//2.加锁,保证线程安全。this.lock.lock();if (this.closed) {this.lock.unlock();throw new KafkaException("Producer closed while allocating memory");}try {// check if we have a free buffer of the right size pooled//3.申请内存的大小是否是池化的内存大小,16kif (size == poolableSize && !this.free.isEmpty())//如果是就从池里Bytebufferreturn this.free.pollFirst();// 池化内存空间的大小int freeListSize = freeSize() * this.poolableSize;//4.如果非池化可以空间加池化内存空间大于等于要申请的空间if (this.nonPooledAvailableMemory + freeListSize >= size) {// 如果申请的空间大小小于池化的大小,就从free队列里拿出一个池化的大小的Bytebuffer加到nonPooledAvailableMemory中// 5.如果一个池化的大小的Bytebuffer不满足size,就持续释放池化内存Bytebuffer直到满足为止。freeUp(size);this.nonPooledAvailableMemory -= size;//如果非池化可以空间加池化内存空间大于要申请的空间} else {// we are out of memory and will have to blockint accumulated = 0;//创建对应的ConditionCondition moreMemory = this.lock.newCondition();try {//线程最长阻塞时间long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);//放入waiters集合中this.waiters.addLast(moreMemory);// 没有足够的空间就一直循环while (accumulated < size) {long startWaitNs = time.nanoseconds();long timeNs;boolean waitingTimeElapsed;try {//空间不够就阻塞,并设置超时时间。waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);} finally {long endWaitNs = time.nanoseconds();timeNs = Math.max(0L, endWaitNs - startWaitNs);recordWaitTime(timeNs);}if (this.closed)throw new KafkaException("Producer closed while allocating memory");if (waitingTimeElapsed) {this.metrics.sensor("buffer-exhausted-records").record();throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");}remainingTimeToBlockNs -= timeNs;// check if we can satisfy this request from the free list,// otherwise allocate memory//ByteBuffer池化集合里是否有元素if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {// just grab a buffer from the free listbuffer = this.free.pollFirst();accumulated = size;} else {//尝试给nonPooledAvailableMemory扩容freeUp(size - accumulated);int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);this.nonPooledAvailableMemory -= got;//累计分配了多少空间accumulated += got;}}accumulated = 0;} finally {this.nonPooledAvailableMemory += accumulated;//把已经分配的内存还回nonPooledAvailableMemorythis.waiters.remove(moreMemory);//删除对应的condition}}} finally {// signal any additional waiters if there is more memory left// over for themtry {if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())this.waiters.peekFirst().signal();} finally {// Another finally... otherwise find bugs complainslock.unlock();}}if (buffer == null)//非池化ByteBuffer分配内存return safeAllocateByteBuffer(size);elsereturn buffer;
}

这里先明确三个变量:

  • free:由固定大小ByteBuffer组成的集合。
  • nonPooledAvailableMemory:非池化可利用的内存。
  • size:申请的ByteBuffer大小。

第一步,验证申请的空间大小size是否大于总内存,BufferPool的总内存默认是32M。如果比总内存还大,就抛出异常。

第二步,因为会涉及到Deque的操作,而Deque不是线程安全的,这里要加锁,防止多线程操作引起的问题。

第三步,如果free不为空,而且申请的空间size和free的元素的大小相同,就从free中拿出一个ByteBuffer并返回,ByteBuffer申请成功。

第四步,如果不满足上述条件,free加上nonPooledAvailableMemory比要申请的大,就调用freeUp(size)方法凑齐足够的空间给size。

freeUp(size)方法源码参考下面:

private void freeUp(int size) {while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

只要固定大小ByteBuffer集合不为空且非池化可利用空间小于申请的size,就不断从free里往nonPooledAvailableMemory添加ByteBuffer,直到满足size的大小。

第五步,如果nonPooledAvailableMemory加上free的空间小于size的大小,就意味着现在的剩余空间满足不了size的大小。要随着其他线程对内存的释放一点点累加到满足size大小。那该怎么办呢?

首先定义int型变量accumulated作为标记已经获得了多大的空间。定义这个线程的Condition,并放入Condition的集合waiters中。然后进入到while循环中,当累加的空间和size一样大了才跳出循环。进入while循环后先通过await()阻塞线程,等待其他线程释放内存。当其他线程释放内存时,会唤醒这里的阻塞。

假如有线程释放内存且唤醒这里的阻塞了,那么先看size是否满足释放free里的ByteBuffer的条件,如果满足就从free里取出一个ByteBuffer,否则再调用freeUp()给nonPooledAvailableMemory扩容。如果累计的空间还是不满足size的大小,那就再次await()等待下次有线程释放空间。

我们再来分析下释放空间的代码,deallocate()方法:

public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {//如果是池化ByteBuffer大小的ByteBufferif (size == this.poolableSize && size == buffer.capacity()) {buffer.clear();this.free.add(buffer);} else {//否则释放到nonPooledAvailableMemorythis.nonPooledAvailableMemory += size;}//拿出一个condition,并signal,唤醒阻塞。Condition moreMem = this.waiters.peekFirst();if (moreMem != null)moreMem.signal();} finally {lock.unlock();}
}

先判断size是否和池化ByteBuffer的大小一样,如果满足就把要释放的ByteBuffer放回free里,否则非池化可利用缓存会回收这个ByteBuffer。因为有ByteBuffer回收了,我们就要看阻塞线程的Condition集合waiters是否为空,如果不为空就取第一个Condition并唤醒阻塞。

http://www.jmfq.cn/news/5160835.html

相关文章:

  • p2p金融网站开发方案/seo站长常用工具
  • 门户网站开发公司平台/seo搜索引擎优化步骤
  • 天河网站建设设计/今日国际新闻大事件
  • 制作个简单公司网站要多少钱/南昌百度推广公司
  • 去什么网站发贴做推广/种子库
  • 网站开发 简历/新区seo整站优化公司
  • 上海礼品定制网站/百度竞价返点一般多少
  • 去视频网站做编辑器/线上培训平台
  • 网络营销中网站建设的策略/百度关键词推广一年多少钱
  • wordpress建站好不好/怎么把网站排名排上去
  • 义乌购物网站建设多少钱/关键词在线优化
  • 永久有效的代理ip/seo推广排名
  • 编程代写有哪些平台/上海牛巨微seo
  • 郑州便宜网站建设报价/百度推广账户优化方案
  • 给小说网站做编辑/关键词自助优化
  • 网站制作的前期主要是做好什么工作/网络营销推广方法十种
  • 小公司让我用织梦做网站/自己做网站建设
  • 高端网站制作建设/广告设计与制作
  • 浙江自己如何做网站/色盲测试图及答案大全
  • 太原网站制作费用/凤凰网台湾资讯
  • 徐州企业自助建站/成人英语培训
  • 空间网站链接怎么做/营销软件
  • 2022八月热点新闻摘抄/宝鸡seo
  • 青海住房与城乡建设厅网站/建设网站前的市场分析
  • 做门户网站用什么模板/郑州百度推广公司
  • 怎么做自己的一个网站/bt磁力
  • 动态网站开发语言的优势与不足/打开百度网站
  • 做视频网站要什么软件下载/网页一键生成app软件
  • 做投票的网站赚钱嘛/微信朋友圈推广软文
  • 分销网站建设方案/新闻发布