PoolThreadCache
Published: 2019-06-09


Overview (Motivation)

If Netty maintains one shared memory pool (the arena), then this is a thread's own private memory cache built on top of it. Roughly, its job is this: memory that a thread obtained from the shared pool is not handed straight back once the thread is done with it; instead it is parked in the thread's own cache, so the next request of the same size can be served without going back to the arena.

Implementation Details (Modification)

Key member fields

private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

// The arenas this cache is attached to
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

// Hold the caches for the different size classes, which are tiny, small and normal.
// One array per combination of size class (tiny/small/normal) and memory kind (heap/direct)
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

// Used for bitshifting when calculate the index of normal caches later
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;

// Allocation-count threshold; once it is exceeded, a trim is performed
private final int freeSweepAllocationThreshold;

// Number of allocations so far; incremented by one on every allocation
private int allocations;

Constructor

The constructor uses the three xxCacheSize values passed in from the allocator; they record how many entries each kind of cache may hold (the naming is not particularly intuitive). The numxxSubpagePools values are what determine how many distinct sizes each kind of cache is split into. Both are explained below.

// tiny 512, small 256, normal 64
//   -> these three values (xxCacheSize) are the maximum queue lengths
// tiny 32, small 4, normal 16
//   -> these three values (numxxSubpagePools) are the number of distinct sizes per class
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
    ...
    // Initialize the arrays and bump the thread-reference count of the corresponding arena
    if (directArena != null) {
        ...
    }
    if (heapArena != null) {
        // Create the caches for the heap allocations
        tinySubPageHeapCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        // numSmallSubpagePools = pageShifts - 9; pageShifts is the page shift,
        // computed in PooledByteBufAllocator.validateAndCalculatePageShifts
        smallSubPageHeapCaches = createSubPageCaches(
                smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

        numShiftsNormalHeap = log2(heapArena.pageSize);
        normalHeapCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, heapArena);

        // Increment the reference count
        heapArena.numThreadCaches.getAndIncrement();
    } else {
        // No heapArea is configured so just null out all caches
        tinySubPageHeapCaches = null;
        smallSubPageHeapCaches = null;
        normalHeapCaches = null;
        numShiftsNormalHeap = -1;
    }

    // Only check if there are caches in use.
    if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
            || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
            && freeSweepAllocationThreshold < 1) {
        throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                + freeSweepAllocationThreshold + " (expected: > 0)");
    }
}

Allocation functions

MemoryRegionCache

Take allocateSmall as an example. Internally there are six MemoryRegionCache arrays, one per combination of memory kind (heap or direct) and size class (tiny, small, normal). The length of each MemoryRegionCache array is determined by numxxSubpagePools; for the Small class that value is 4, so it is split into 4 distinct sizes. xxCacheSize in turn determines the number of entries, i.e. how many buffers can be cached per size.

Rough overall view of the class

16B  -- TinyCache[1]  -- (Buf512-...-Buf3-Buf2-Buf1)
32B  -- TinyCache[2]  -- ()
496B -- TinyCache[31] -- (Buf2-Buf1)
512B -- SmallCache[0] -- (Buf256-...-Buf3-Buf2-Buf1)
8KB  -- NormalCache[0] - (Buf64 -...-Buf3-Buf2-Buf1)

For tiny, for example, the default TinyCache array length is 32, so the cacheable sizes run from 16 B to 496 B in 16 B steps. Each array element contains an MPSC queue; for tiny the maximum queue length defaults to 512, for small it defaults to 256, and for normal it defaults to 64.
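
To make the indexing concrete, here is a small self-contained sketch of how a normalized request size maps to a slot in those arrays. The constants mirror the defaults described above (16 B tiny granularity, 512 B small floor); the class and method names are illustrative, not Netty's own (Netty does the equivalent in PoolArena.tinyIdx / smallIdx).

// Illustrative sketch only: maps a normalized capacity to a cache-array slot,
// mirroring the defaults described above. Not Netty source code.
public final class SizeIndexSketch {

    // tiny: 16 B .. 496 B in 16 B steps -> index 1..31 (index 0 is unused)
    static int tinyIndex(int normCapacity) {
        return normCapacity >>> 4;          // e.g. 496 >>> 4 == 31
    }

    // small: 512 B, 1 KiB, 2 KiB, 4 KiB -> index 0..3
    static int smallIndex(int normCapacity) {
        int idx = 0;
        int i = normCapacity >>> 10;        // 512 B -> 0, 1 KiB -> 1, ...
        while (i != 0) {
            i >>>= 1;
            idx++;
        }
        return idx;
    }

    public static void main(String[] args) {
        System.out.println(tinyIndex(496));   // 31
        System.out.println(smallIndex(4096)); // 3
    }
}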

The specific allocate entry points are listed below (a sketch of the common call path follows the list).

  • allocateTiny, for requests smaller than 512 B
  • allocateSmall, for requests smaller than 8 KiB (pageSize)
  • allocateNormal, for requests smaller than 16 MiB (chunkSize)
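
All three entry points share the same shape: pick the MemoryRegionCache for the requested size class and index, poll its queue, and initialise the PooledByteBuf from the cached chunk/handle on a hit; on a miss, report failure so the arena performs a real allocation. The toy model below is illustrative only (names and types are made up, handles are plain Longs); it is not Netty's API, but it shows the hit-or-miss flow.

import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the thread-cache allocation path (illustrative only; not Netty's API).
public final class ThreadCacheAllocSketch {

    // One queue of cached "handles" per size slot, like the MemoryRegionCache[] arrays above.
    private final Deque<Long>[] tinyCaches;

    @SuppressWarnings("unchecked")
    ThreadCacheAllocSketch(int slots) {
        tinyCaches = new Deque[slots];
        for (int i = 0; i < slots; i++) {
            tinyCaches[i] = new ArrayDeque<Long>();
        }
    }

    // allocateTiny: look up the per-size queue by index, then try to reuse a cached handle.
    // A null return means a cache miss, in which case the arena does a real allocation.
    Long allocateTiny(int normCapacity) {
        return tinyCaches[normCapacity >>> 4].poll();   // same index math as the sketch above
    }

    // The free path parks the handle in the same queue instead of returning it to the arena.
    boolean addTiny(int normCapacity, long handle) {
        return tinyCaches[normCapacity >>> 4].offer(handle);
    }

    public static void main(String[] args) {
        ThreadCacheAllocSketch cache = new ThreadCacheAllocSketch(32);
        System.out.println(cache.allocateTiny(32)); // null: nothing cached yet -> go to the arena
        cache.addTiny(32, 42L);                     // a 32 B region freed by this thread is parked
        System.out.println(cache.allocateTiny(32)); // 42: served from the thread cache
    }
}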

Caching function

MemoryRegionCache.add is what puts a region that should be cached onto the queue. The flow is:

normalize the capacity -- compute the corresponding index -- locate the designated MemoryRegionCache -- offer the entry to its queue

/**
 * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
 * Returns {@code true} if it fit into the cache {@code false} otherwise.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {
        return false;
    }
    return cache.add(chunk, handle);
}

private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
    switch (sizeClass) {
    case Normal:
        return cacheForNormal(area, normCapacity);
    case Small:
        return cacheForSmall(area, normCapacity);
    case Tiny:
        return cacheForTiny(area, normCapacity);
    default:
        throw new Error();
    }
}

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

/**
 * Add to cache if not already full.
 */
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, long handle) {
    Entry<T> entry = newEntry(chunk, handle);
    boolean queued = queue.offer(entry);
    if (!queued) {
        // If it was not possible to cache the chunk, immediately recycle the entry
        entry.recycle();
    }
    return queued;
}

Release function

The free function: because MemoryRegionCache only stores the basic location information of the cached memory (the chunk and the handle), releasing is just a matter of using that information to call back into the owning arena to reclaim the space; the cache itself does no actual reclamation work.

MemoryRegionCache

  • The queue is an MpscArrayQueue.
  • An allocation-count threshold gauges how well the cached space is actually being used and triggers the release (trim) mechanism; see the sketch after this list.
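
The second bullet works like a sweep counter: every allocation through the thread cache bumps a counter, and once it reaches freeSweepAllocationThreshold the counter is reset and trim() is invoked on every MemoryRegionCache, freeing the part of each queue that was not re-used since the last sweep. Below is a self-contained sketch of that logic, with illustrative names and a single queue standing in for the MemoryRegionCache arrays; it is not Netty code.

import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the sweep/trim bookkeeping described above (illustrative names, not Netty's).
final class TrimSketch {
    private final Deque<Long> queue = new ArrayDeque<Long>();
    private final int capacity;                  // max cached entries (cf. xxCacheSize)
    private final int freeSweepAllocationThreshold;
    private int allocationsSinceSweep;           // cf. PoolThreadCache.allocations
    private int allocationsFromQueue;            // cf. MemoryRegionCache.allocations

    TrimSketch(int capacity, int threshold) {
        this.capacity = capacity;
        this.freeSweepAllocationThreshold = threshold;
    }

    Long allocate() {
        Long handle = queue.poll();
        if (handle != null) {
            allocationsFromQueue++;
        }
        // Every allocation attempt counts towards the sweep threshold.
        if (++allocationsSinceSweep >= freeSweepAllocationThreshold) {
            allocationsSinceSweep = 0;
            trim();
        }
        return handle;
    }

    boolean cache(long handle) {
        return queue.size() < capacity && queue.offer(handle);
    }

    private void trim() {
        // Free the entries the thread did not re-use since the last sweep (capacity - allocations).
        int free = capacity - allocationsFromQueue;
        allocationsFromQueue = 0;
        for (; free > 0 && !queue.isEmpty(); free--) {
            queue.poll();   // in the real class this hands the region back to the arena
        }
    }
}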

Data structure

private abstract static class MemoryRegionCache<T> {
    // Maximum length of the queue
    private final int size;
    // The internal MPSC queue that holds the cached entries
    private final Queue<Entry<T>> queue;
    // Size class of the cached memory: Tiny/Small/Normal
    private final SizeClass sizeClass;
    // Number of allocations served from this cache
    private int allocations;

    MemoryRegionCache(int size, SizeClass sizeClass) {
        ...
    }

    // Initialize the given PooledByteBuf with the cached allocation information
    protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity);

    // Cache a region; the arena calls this first when memory is freed
    /**
     * Add to cache if not already full.
     */
    @SuppressWarnings("unchecked")
    public final boolean add(PoolChunk<T> chunk, long handle) {
        Entry<T> entry = newEntry(chunk, handle);
        boolean queued = queue.offer(entry);
        if (!queued) {
            // If it was not possible to cache the chunk, immediately recycle the entry
            entry.recycle();
        }
        return queued;
    }

    // Allocate by taking an entry from the queue
    /**
     * Allocate something out of the cache if possible and remove the entry from the cache.
     */
    public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
        Entry<T> entry = queue.poll();
        if (entry == null) {
            return false;
        }
        initBuf(entry.chunk, entry.handle, buf, reqCapacity);
        entry.recycle();

        // allocations is not thread-safe which is fine as this is only called from the same thread all time.
        ++ allocations;
        return true;
    }

    // Reclaim cached entries that are not being used enough
    /**
     * Free up cached {@link PoolChunk}s if not allocated frequently enough.
     */
    public final void trim() {
        int free = size - allocations;
        allocations = 0;

        // We not even allocated all the number that are
        if (free > 0) {
            free(free);
        }
    }

    /**
     * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
     */
    public final int free() {
        return free(Integer.MAX_VALUE);
    }

    private int free(int max) {
        int numFreed = 0;
        for (; numFreed < max; numFreed++) {
            Entry<T> entry = queue.poll();
            if (entry != null) {
                freeEntry(entry);
            } else {
                // all cleared
                return numFreed;
            }
        }
        return numFreed;
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private void freeEntry(Entry entry) {
        PoolChunk chunk = entry.chunk;
        long handle = entry.handle;

        // Recycle the entry now so the PoolChunk can be GC'ed.
        entry.recycle();

        // Release the memory back to the arena
        chunk.arena.freeChunk(chunk, handle, sizeClass);
    }
}

Inner class Entry

  • Stores the basic information about an allocated region (its chunk and handle) so that initBuf can later be re-run from it.
// The object stored in the queue. It holds the chunk the allocated memory belongs to
// and its handle (the offset of the region within the chunk); recycle() returns the entry to the Recycler.
static final class Entry<T> {
    final Handle<Entry<?>> recyclerHandle;
    PoolChunk<T> chunk;
    long handle = -1;

    Entry(Handle<Entry<?>> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    void recycle() {
        chunk = null;
        handle = -1;
        recyclerHandle.recycle(this);
    }
}

@SuppressWarnings("rawtypes")
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
    Entry entry = RECYCLER.get();
    entry.chunk = chunk;
    entry.handle = handle;
    return entry;
}

@SuppressWarnings("rawtypes")
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
    @SuppressWarnings("unchecked")
    @Override
    protected Entry newObject(Handle<Entry> handle) {
        return new Entry(handle);
    }
};

Memory allocation sizes

The discussion below assumes the default settings.

If the requested size is smaller than pageSize = 8 KiB, it is treated as tiny or small.

If the requested size is smaller than 512 B, it is treated as tiny.

That is only the coarse classification; each class is further subdivided into finer sizes (multiples of 16 B for tiny, powers of two above that).

The tiny class is subdivided into 32 groups, small into 4, normal into 16, and huge has no subdivision (huge allocations are never cached).
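
As a quick reference, the coarse classification comes down to a couple of comparisons against the normalized request size. The sketch below hard-codes the default pageSize (8 KiB) and chunkSize (16 MiB) mentioned above; it is illustrative, not Netty source.

// Illustrative size-class decision for the default configuration
// (pageSize = 8 KiB, chunkSize = 16 MiB); not Netty source code.
enum SizeClassSketch {
    TINY, SMALL, NORMAL, HUGE;

    static final int PAGE_SIZE = 8 * 1024;
    static final int CHUNK_SIZE = 16 * 1024 * 1024;

    static SizeClassSketch classify(int normCapacity) {
        if (normCapacity < 512) {
            return TINY;              // 16 B .. 496 B
        }
        if (normCapacity < PAGE_SIZE) {
            return SMALL;             // 512 B .. 4 KiB
        }
        if (normCapacity <= CHUNK_SIZE) {
            return NORMAL;            // 8 KiB .. 16 MiB
        }
        return HUGE;                  // allocated outside the pool, never cached
    }
}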


Summary (Result)

The thread's own cache mirrors the allocation structure of the shared pool, with tiny, small, and normal classes. The difference is that it only keeps the basic bookkeeping for the hot regions the thread itself has released, independently of the shared pool's allocation machinery. Those regions sit in the thread's own queues, and when the same size is requested again the thread simply takes one back from its own queue.

The road to better cache cleanup

ThreadDeathWatcher

This is the approach from an earlier version: a dedicated thread-death watcher thread is kept running to supervise the threads being watched.

The core idea is to maintain a queue of tasks. When a task arrives it is added to the queue and a daemon thread is started to poll it; when there are no tasks left the daemon thread shuts itself down, and it is started again when the next task arrives.

  • A task unit pairs the watched thread with the Runnable to run when that thread dies (see the usage sketch after this list).

  • There is only a single daemon watcher thread looping over and dispatching the tasks; when there is no work left it shuts itself down. This is coordinated through a started flag: since only the atomicity of the start/stop transition matters, it is flipped with CAS (an AtomicBoolean) rather than relying on a separate volatile field.

  • The pending-task queue is MPMC, so that multiple threads can safely add tasks and check whether work is pending.

  • Internally the watcher keeps the active tasks in an ArrayList and polls it. When a thread unwatches a task, an "unwatch" entry is also pushed onto the MPMC queue; while draining the queue the watcher sees that entry and removes the matching task from the ArrayList, which is why Entry overrides equals (and hashCode).
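
For context, in that era PoolThreadCache registered its cleanup with the watcher roughly as follows. This is a hedged reconstruction from memory of the old source, so the exact field names and the free call may differ.

// Hedged reconstruction of the old cleanup hook inside PoolThreadCache; names are illustrative.
private final Thread deathWatchThread = Thread.currentThread();
private final Runnable freeTask = new Runnable() {
    @Override
    public void run() {
        free();   // release all cached regions back to the arenas
    }
};

// At the end of the constructor: run freeTask once the owning thread dies.
// ThreadDeathWatcher.watch(deathWatchThread, freeTask);

A matching unwatch call was presumably made when the cache was freed explicitly, so the task did not linger once the regions had already been returned.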

/**
 * Checks if a thread is alive periodically and runs a task when a thread dies.
 * <p>
 * This thread starts a daemon thread to check the state of the threads being watched and to invoke their
 * associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread
 * will terminate itself, and a new daemon thread will be started again when a new watch is added.
 * </p>
 *
 * @deprecated will be removed in the next major release
 */
@Deprecated
public final class ThreadDeathWatcher {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
    // visible for testing
    static final ThreadFactory threadFactory;

    // Use a MPMC queue as we may end up checking isEmpty() from multiple threads which may not be allowed to do
    // concurrently depending on the implementation of it in a MPSC queue.
    private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
    private static final Watcher watcher = new Watcher();
    private static final AtomicBoolean started = new AtomicBoolean();
    private static volatile Thread watcherThread;

    static {
        String poolName = "threadDeathWatcher";
        String serviceThreadPrefix = SystemPropertyUtil.get("io.netty.serviceThreadPrefix");
        if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
            poolName = serviceThreadPrefix + poolName;
        }
        // because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
        // this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
        // must not be sticky about its thread group
        threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
    }

    /**
     * Schedules the specified {@code task} to run when the specified {@code thread} dies.
     *
     * @param thread the {@link Thread} to watch
     * @param task the {@link Runnable} to run when the {@code thread} dies
     *
     * @throws IllegalArgumentException if the specified {@code thread} is not alive
     */
    public static void watch(Thread thread, Runnable task) {
        if (thread == null) {
            throw new NullPointerException("thread");
        }
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!thread.isAlive()) {
            throw new IllegalArgumentException("thread must be alive.");
        }
        schedule(thread, task, true);
    }

    /**
     * Cancels the task scheduled via {@link #watch(Thread, Runnable)}.
     */
    public static void unwatch(Thread thread, Runnable task) {
        if (thread == null) {
            throw new NullPointerException("thread");
        }
        if (task == null) {
            throw new NullPointerException("task");
        }
        schedule(thread, task, false);
    }

    private static void schedule(Thread thread, Runnable task, boolean isWatch) {
        pendingEntries.add(new Entry(thread, task, isWatch));

        if (started.compareAndSet(false, true)) {
            final Thread watcherThread = threadFactory.newThread(watcher);
            // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
            // classloader.
            // See:
            // - https://github.com/netty/netty/issues/7290
            // - https://bugs.openjdk.java.net/browse/JDK-7008595
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    watcherThread.setContextClassLoader(null);
                    return null;
                }
            });

            watcherThread.start();
            ThreadDeathWatcher.watcherThread = watcherThread;
        }
    }

    /**
     * Waits until the thread of this watcher has no threads to watch and terminates itself.
     * Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)},
     * this operation is only useful when you want to ensure that the watcher thread is terminated
     * after your application is shut down and there's no chance of calling
     * {@link #watch(Thread, Runnable)} afterwards.
     *
     * @return {@code true} if and only if the watcher thread has been terminated
     */
    public static boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        Thread watcherThread = ThreadDeathWatcher.watcherThread;
        if (watcherThread != null) {
            watcherThread.join(unit.toMillis(timeout));
            return !watcherThread.isAlive();
        } else {
            return true;
        }
    }

    private ThreadDeathWatcher() { }

    private static final class Watcher implements Runnable {

        private final List<Entry> watchees = new ArrayList<Entry>();

        @Override
        public void run() {
            for (;;) {
                fetchWatchees();
                notifyWatchees();

                // Try once again just in case notifyWatchees() triggered watch() or unwatch().
                fetchWatchees();
                notifyWatchees();

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignore) {
                    // Ignore the interrupt; do not terminate until all tasks are run.
                }

                if (watchees.isEmpty() && pendingEntries.isEmpty()) {

                    // Mark the current worker thread as stopped.
                    // The following CAS must always success and must be uncontended,
                    // because only one watcher thread should be running at the same time.
                    boolean stopped = started.compareAndSet(true, false);
                    assert stopped;

                    // Check if there are pending entries added by watch() while we do CAS above.
                    if (pendingEntries.isEmpty()) {
                        // A) watch() was not invoked and thus there's nothing to handle
                        //    -> safe to terminate because there's nothing left to do
                        // B) a new watcher thread started and handled them all
                        //    -> safe to terminate the new watcher thread will take care the rest
                        break;
                    }

                    // There are pending entries again, added by watch()
                    if (!started.compareAndSet(false, true)) {
                        // watch() started a new watcher thread and set 'started' to true.
                        // -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
                        break;
                    }

                    // watch() added an entry, but this worker was faster to set 'started' to true.
                    // i.e. a new watcher thread was not started
                    // -> keep this thread alive to handle the newly added entries.
                }
            }
        }

        private void fetchWatchees() {
            for (;;) {
                Entry e = pendingEntries.poll();
                if (e == null) {
                    break;
                }

                if (e.isWatch) {
                    watchees.add(e);
                } else {
                    watchees.remove(e);
                }
            }
        }

        private void notifyWatchees() {
            List<Entry> watchees = this.watchees;
            for (int i = 0; i < watchees.size();) {
                Entry e = watchees.get(i);
                if (!e.thread.isAlive()) {
                    watchees.remove(i);
                    try {
                        e.task.run();
                    } catch (Throwable t) {
                        logger.warn("Thread death watcher task raised an exception:", t);
                    }
                } else {
                    i ++;
                }
            }
        }
    }

    private static final class Entry {
        final Thread thread;
        final Runnable task;
        final boolean isWatch;

        Entry(Thread thread, Runnable task, boolean isWatch) {
            this.thread = thread;
            this.task = task;
            this.isWatch = isWatch;
        }

        @Override
        public int hashCode() {
            return thread.hashCode() ^ task.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }

            if (!(obj instanceof Entry)) {
                return false;
            }

            Entry that = (Entry) obj;
            return thread == that.thread && task == that.task;
        }
    }
}

ObjectCleaner

Since PoolThreadCache is stored in a FastThreadLocal, the FastThreadLocal's onRemoval callback is what guarantees the cleanup when the thread exits.
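
Concretely, the holder is the allocator's PoolThreadLocalCache, a FastThreadLocal subclass whose onRemoval hands the cached regions back. Below is a simplified sketch, not the verbatim Netty source: the real initialValue picks the least-used heap/direct arenas before constructing the cache, and the signature of the free call has changed across versions.

// Hedged sketch of the thread-local holder; simplified and not verbatim Netty source.
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {

    @Override
    protected synchronized PoolThreadCache initialValue() {
        // In the real code: pick the heap/direct arenas with the fewest attached
        // thread caches, then construct the PoolThreadCache with the configured
        // tiny/small/normal cache sizes.
        return createThreadCache();
    }

    @Override
    protected void onRemoval(PoolThreadCache threadCache) {
        // Invoked when the thread-local value is removed (typically on thread exit):
        // release every cached region back to its arena.
        threadCache.free();
    }

    private PoolThreadCache createThreadCache() {
        throw new UnsupportedOperationException("illustrative placeholder");
    }
}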

Finalize

ObjectCleaner does start a Thread to handle the cleaning of resources which leaks into the users application. We should not use it in netty itself to make things more predictable.

/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
@Override
protected void finalize() throws Throwable {
    try {
        super.finalize();
    } finally {
        free();
    }
}

Reposted from: https://www.cnblogs.com/GrimReaper/p/10385185.html
