概述

主要的静态变量
//堆缓冲区个数,大部分情况默认CPU核数 * 2
private static final int DEFAULT_NUM_HEAP_ARENA;
//直接缓冲区个数,大部分情况默认CPU核数 * 2
private static final int DEFAULT_NUM_DIRECT_ARENA;
//每页大小,默认8192
private static final int DEFAULT_PAGE_SIZE;
//page组成的arena满二叉树最大深度,默认11(4096个节点,2048个叶子节点)
private static final int DEFAULT_MAX_ORDER;
//tiny个数,默认512
private static final int DEFAULT_TINY_CACHE_SIZE;
//small个数,默认256
private static final int DEFAULT_SMALL_CACHE_SIZE;
//normal个数,默认64
private static final int DEFAULT_NORMAL_CACHE_SIZE;
//最大缓存容量,默认32K
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
//缓存回收分配次数阈值,默认8192
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
//暂时没研究,默认0不会用到
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
//是否使用缓存,默认true
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
//直接内存对齐,默认0
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
//缓存ByteBuf对象的ArrayDeque长度阈值,默认1023
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
//pegasize最小值,小于该值会报错,写死4096
private static final int MIN_PAGE_SIZE = 4096;
//chunksize最大值,大于该值/2,会报错,写死
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
- DEFAULT_NUM_HEAP_ARENA/DEFAULT_NUM_DIRECT_ARENA
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(defaultMinNumArena,runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(defaultMinNumArena,PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
-
DEFAULT_MAX_ORDER
每个chunk默认大小是16M,由若干个page组成,page结构为一颗满二叉树,满二叉树最大深度默认11,则有4096个节点,存储数据的叶子节点为2048,2048 * 8K(pagesize) = 16M
构造方法
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
- DEFAULT:默认分配器,static字段方便使用
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
//directByDefault = false emptyBuf = new EmptyByteBuf(this);
super(preferDirect);
//默认0
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
//默认512
this.tinyCacheSize = tinyCacheSize;
//默认256
this.smallCacheSize = smallCacheSize;
//默认64
this.normalCacheSize = normalCacheSize;
//pageSize:8192 maxOrder:11 chunkSize:16M
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
//nHeapArena:cpu核数 * 2
checkPositiveOrZero(nHeapArena, "nHeapArena");
//nDirectArena:cpu核数 * 2
checkPositiveOrZero(nDirectArena, "nDirectArena");
//directMemoryCacheAlignment:0
checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
}
if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}
//pageShifts默认13
int pageShifts = validateAndCalculatePageShifts(pageSize);
//创建堆内存缓冲区
if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
//创建直接内存缓冲区
if (nDirectArena > 0) {
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}
-
preferDirect:默认false,即
alloc.buffer();
默认是分配HeapByteBuf,alloc.directBuffer();
才会分配DirectByteBuf -
threadCache:创建PoolThreadLocalCache(
PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>
),useCacheForAllThreads为true默认所有线程都使用Cache; -
heapArenas/directArenas:
new PoolArena[size];
创建CPU核数*2大小的PoolArena数组,跟NioEventLoop一样都是2倍cpu核数大小,说明每一个NioEventLoop都有一个独享的Arean,因此在每一个Arean分配内存时都是不用加锁的,PoolArena初始化先不分析
newDirectBuffer/newHeapBuffer
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
//获取线程本地变量的PoolArena进行分配
PoolThreadCache cache = threadCache.get();
//拿到cache中arena
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
//进行内存分配
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
//资源泄露检测
return toLeakAwareBuffer(buf);
}
FastThreadLocal中容器是延迟初始化,在PoolThreadCache cache = threadCache.get();
时会触发其initialValue
方法创建PoolThreadCache并放到FastThreadLocal的容器中;
protected synchronized PoolThreadCache initialValue() {
//从arenas数组中选择被最少线程使用的arena
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
final Thread current = Thread.currentThread();
//useCacheForAllThreads默认为true
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
//创建PoolThreadCache
final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
//DEFAULT_CACHE_TRIM_INTERVAL_MILLIS默认为0,不走该逻辑
if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
final EventExecutor executor = ThreadExecutorMap.currentExecutor();
if (executor != null) {
executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
}
//返回PoolThreadCache,放到FastThreadLocal的容器中,跟当前线程进行绑定
return cache;
}
// 如果指定了不使用缓存,或者线程换粗对象不是FastThreadLocalThread类型的,则创建一个PoolThreadCache
// 对象,该对象中是不做任何缓存的,因为初始化数据都是0
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
- 通过
leastUsedArena
方法从arenas数组中选择被最少线程使用的arena - heapArena/directArena作为构造参数创建PoolThreadCache
- 返回PoolThreadCache,放到FastThreadLocal的容器中,跟当前线程进行绑定
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
//取数组中第一个arena元素作为基准
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
//numThreadCaches在PoolThreadCache构造方法中自增,用于记录每个arena上被创建的PoolThreadCache个数
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
//获取到PoolThreadCache个数最少的,即被最少线程使用的arena
minArena = arena;
}
}
return minArena;
}
- numThreadCaches:在PoolThreadCache构造方法中自增,用于记录每个arena上被创建的PoolThreadCache个数
网友评论