美文网首页
netty源码分析(六) - ByteBuf - 3Pooled

netty源码分析(六) - ByteBuf - 3Pooled

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-16 07:08 被阅读0次

概述

PooledByteBufAllocator.png

主要的静态变量

//堆缓冲区个数,大部分情况默认CPU核数 * 2
private static final int DEFAULT_NUM_HEAP_ARENA;
//直接缓冲区个数,大部分情况默认CPU核数 * 2
private static final int DEFAULT_NUM_DIRECT_ARENA;
//每页大小,默认8192
private static final int DEFAULT_PAGE_SIZE;
//page组成的arena满二叉树最大深度,默认11(4096个节点,2048个叶子节点)
private static final int DEFAULT_MAX_ORDER; 
//tiny个数,默认512
private static final int DEFAULT_TINY_CACHE_SIZE;
//small个数,默认256
private static final int DEFAULT_SMALL_CACHE_SIZE;
//normal个数,默认64
private static final int DEFAULT_NORMAL_CACHE_SIZE;
//最大缓存容量,默认32K
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
//缓存回收分配次数阈值,默认8192
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
//暂时没研究,默认0不会用到
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
//是否使用缓存,默认true
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
//直接内存对齐,默认0
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
//缓存ByteBuf对象的ArrayDeque长度阈值,默认1023
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
//pegasize最小值,小于该值会报错,写死4096
private static final int MIN_PAGE_SIZE = 4096;
//chunksize最大值,大于该值/2,会报错,写死
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
  • DEFAULT_NUM_HEAP_ARENA/DEFAULT_NUM_DIRECT_ARENA
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numHeapArenas",
                (int) Math.min(defaultMinNumArena,runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numDirectArenas",
                (int) Math.min(defaultMinNumArena,PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
  • DEFAULT_MAX_ORDER
    每个chunk默认大小是16M,由若干个page组成,page结构为一颗满二叉树,满二叉树最大深度默认11,则有4096个节点,存储数据的叶子节点为2048,2048 * 8K(pagesize) = 16M

构造方法

public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
  • DEFAULT:默认分配器,static字段方便使用

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
    //directByDefault = false    emptyBuf = new EmptyByteBuf(this);
    super(preferDirect);
    //默认0
    threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
    //默认512
    this.tinyCacheSize = tinyCacheSize;
    //默认256
    this.smallCacheSize = smallCacheSize;
    //默认64
    this.normalCacheSize = normalCacheSize;
    //pageSize:8192     maxOrder:11    chunkSize:16M
    chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
    //nHeapArena:cpu核数 * 2
    checkPositiveOrZero(nHeapArena, "nHeapArena");
    //nDirectArena:cpu核数 * 2
    checkPositiveOrZero(nDirectArena, "nDirectArena");
    //directMemoryCacheAlignment:0
    checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
    if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
        throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
    }

    if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
        throw new IllegalArgumentException("directMemoryCacheAlignment: "
                + directMemoryCacheAlignment + " (expected: power of two)");
    }
    //pageShifts默认13
    int pageShifts = validateAndCalculatePageShifts(pageSize);
    //创建堆内存缓冲区
    if (nHeapArena > 0) {
        heapArenas = newArenaArray(nHeapArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
        for (int i = 0; i < heapArenas.length; i ++) {
            PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                    pageSize, maxOrder, pageShifts, chunkSize,
                    directMemoryCacheAlignment);
            heapArenas[i] = arena;
            metrics.add(arena);
        }
        heapArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        heapArenas = null;
        heapArenaMetrics = Collections.emptyList();
    }
    //创建直接内存缓冲区
    if (nDirectArena > 0) {
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }
    metric = new PooledByteBufAllocatorMetric(this);
}
  • preferDirect:默认false,即alloc.buffer();默认是分配HeapByteBuf,alloc.directBuffer();才会分配DirectByteBuf
  • threadCache:创建PoolThreadLocalCache(PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>),useCacheForAllThreads为true默认所有线程都使用Cache;
  • heapArenas/directArenasnew PoolArena[size];创建CPU核数*2大小的PoolArena数组,跟NioEventLoop一样都是2倍cpu核数大小,说明每一个NioEventLoop都有一个独享的Arean,因此在每一个Arean分配内存时都是不用加锁的,PoolArena初始化先不分析

newDirectBuffer/newHeapBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    //获取线程本地变量的PoolArena进行分配
    PoolThreadCache cache = threadCache.get();
    //拿到cache中arena
    PoolArena<ByteBuffer> directArena = cache.directArena;
    final ByteBuf buf;
    if (directArena != null) {
        //进行内存分配
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    //资源泄露检测
    return toLeakAwareBuffer(buf);
}

FastThreadLocal中容器是延迟初始化,在PoolThreadCache cache = threadCache.get();时会触发其initialValue方法创建PoolThreadCache并放到FastThreadLocal的容器中;

protected synchronized PoolThreadCache initialValue() {
    //从arenas数组中选择被最少线程使用的arena
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    final Thread current = Thread.currentThread();
    //useCacheForAllThreads默认为true
    if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
        //创建PoolThreadCache
        final PoolThreadCache cache = new PoolThreadCache(
                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
        //DEFAULT_CACHE_TRIM_INTERVAL_MILLIS默认为0,不走该逻辑
        if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
            final EventExecutor executor = ThreadExecutorMap.currentExecutor();
            if (executor != null) {
                executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
                        DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            }
        }
        //返回PoolThreadCache,放到FastThreadLocal的容器中,跟当前线程进行绑定
        return cache;
    }
    // 如果指定了不使用缓存,或者线程换粗对象不是FastThreadLocalThread类型的,则创建一个PoolThreadCache
  // 对象,该对象中是不做任何缓存的,因为初始化数据都是0
    return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
  1. 通过leastUsedArena方法从arenas数组中选择被最少线程使用的arena
  2. heapArena/directArena作为构造参数创建PoolThreadCache
  3. 返回PoolThreadCache,放到FastThreadLocal的容器中,跟当前线程进行绑定
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
    if (arenas == null || arenas.length == 0) {
        return null;
    }
    //取数组中第一个arena元素作为基准
    PoolArena<T> minArena = arenas[0];
    for (int i = 1; i < arenas.length; i++) {
        PoolArena<T> arena = arenas[i];
        //numThreadCaches在PoolThreadCache构造方法中自增,用于记录每个arena上被创建的PoolThreadCache个数
        if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
            //获取到PoolThreadCache个数最少的,即被最少线程使用的arena
            minArena = arena;
        }
    }
    return minArena;
}
  • numThreadCaches:在PoolThreadCache构造方法中自增,用于记录每个arena上被创建的PoolThreadCache个数

相关文章

网友评论

      本文标题:netty源码分析(六) - ByteBuf - 3Pooled

      本文链接:https://www.haomeiwen.com/subject/yjpwpktx.html