美文网首页
AtomicXXX以及LongAdder底层原理

AtomicXXX以及LongAdder底层原理

作者: 王侦 | 来源:发表于2022-11-02 20:46 被阅读0次

要搞懂AtomicXXX等的原理,首先就要了解CAS的原理。

1.CAS

1.1 什么是 CAS?

CAS(Compare And Swap,比较并交换),通常指的是这样一种原子操作:针对一个变量,首先比较它的内存值与某个期望值是否相同,如果相同,就给它赋一个新值。

在 Java 中,CAS 操作是由 Unsafe 类提供支持的,该类定义了三种针对不同类型变量的 CAS 操作


1.2 CAS源码分析

Hotspot 虚拟机对compareAndSwapInt 方法的实现如下:

#unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  // 根据偏移量,计算value的地址
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  // Atomic::cmpxchg(x, addr, e) cas逻辑 x:要交换的值   e:要比较的值
  //cas成功,返回期望值e,等于e,此方法返回true 
  //cas失败,返回内存中的value值,不等于e,此方法返回false
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;

核心逻辑在Atomic::cmpxchg方法中,这个根据不同操作系统和不同CPU会有不同的实现。这里我们以linux_64x的为例,查看Atomic::cmpxchg的实现

#atomic_linux_x86.inline.hpp
inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  //判断当前执行环境是否为多处理器环境
  int mp = os::is_MP();
  //LOCK_IF_MP(%4) 在多处理器环境下,为 cmpxchgl 指令添加 lock 前缀,以达到内存屏障的效果
  //cmpxchgl 指令是包含在 x86 架构及 IA-64 架构中的一个原子条件指令,
  //它会首先比较 dest 指针指向的内存值是否和 compare_value 的值相等,
  //如果相等,则双向交换 dest 与 exchange_value,否则就单方面地将 dest 指向的内存值交给exchange_value。
  //这条指令完成了整个 CAS 操作,因此它也被称为 CAS 指令。
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;

现代处理器指令集架构基本上都会提供 CAS 指令,例如 x86 和 IA-64 架构中的 cmpxchgl 指令和 comxchgq 指令,sparc 架构中的 cas 指令和 casx 指令。

不管是 Hotspot 中的 Atomic::cmpxchg 方法,还是 Java 中的 compareAndSwapInt 方法,它们本质上都是对相应平台的 CAS 指令的一层简单封装。CAS 指令作为一种硬件原语,有着天然的原子性,这也正是 CAS 的价值所在。

1.3 CAS缺陷

CAS 虽然高效地解决了原子操作,但是还是存在一些缺陷的,主要表现在三个方面:

  • 自旋 CAS 长时间地不成功,则会给 CPU 带来非常大的开销
  • 只能保证一个共享变量原子操作, AtomicInteger等
  • ABA 问题

ABA问题及其解决方案

  • 增加版本号,每次修改可以通过+1保证版本唯一性。这样就可以保证每次修改后的版本也会往上递增。

2.AtomicXXX底层原理

以AtomicInteger为例,看看核心方法:

//getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;
public final boolean getAndSet(boolean newValue) {
    boolean prev;
    do {
        prev = get();
    } while (!compareAndSet(prev, newValue));
    return prev;
}

其核心原理都是通过while循环 + CAS实现增加的,直到成功增加为止。

3.LongAdder原理

AtomicXXX增加逻辑是采用自旋的方式不断更新目标值,直到更新成功。

在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。

这就是LongAdder引入的初衷——解决高并发环境下AtomicInteger,AtomicLong的自旋瓶颈问题。

3.1 原理

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

3.2 源码

LongAdder内部有一个base变量,一个Cell[]数组:

  • base变量:非竞态条件下,直接累加到该变量上
  • Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中

着重来看一下LongAdder#add方法:

    public void add(long x) {
        //as 表示cells引用
        //b 表示获取的base值
        //v 表示 期望值
        //m 表示 cells 数组的长度
        //a 表示当前线程命中的cell单元格
        Cell[] as; long b, v; int m; Cell a;

        //条件一:true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
        //       false->表示cells未初始化,当前所有线程应该将数据写到base中

        //条件二:false->表示当前线程cas替换数据成功,
        //       true->表示发生竞争了,可能需要重试 或者 扩容
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //什么时候会进来?
            //1.true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
            //2.true->表示发生竞争了,可能需要重试 或者 扩容



            //true -> 未竞争  false->发生竞争
            boolean uncontended = true;

            //条件一:true->说明 cells 未初始化,也就是多线程写base发生竞争了
            //       false->说明 cells 已经初始化了,当前线程应该是 找自己的cell 写值

            //条件二:getProbe() 获取当前线程的hash值   m表示cells长度-1 cells长度 一定是2的次方数   15= b1111
            //       true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
            //       false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值 添加到cell中。

            //条件三:true->表示cas失败,意味着当前线程对应的cell 有竞争
            //       false->表示cas成功
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                //都有哪些情况会调用?
                //1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
                //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持
                //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]
                longAccumulate(x, null, uncontended);
        }
    }

4.优秀设计总结

互斥锁,在多线程竞争激烈的情况下,伴随着大量线程上下文切换和独占,严重降低吞吐量。

如果使用CAS + Volatile + 自旋这种乐观锁的机制,将会大幅度提升吞吐量。

但是在严重竞争的情况下,大量的CAS失败,反复的自旋,会严重消耗CPU资源。

在这种情况下,需要弄清楚竞争的资源是什么,是否可以将竞争点拆分,分摊压力。这种方式本质上是一种锁的粒度控制,将锁的粒度控制的更细,锁的范围更小。这种做法很多地方都有:

  • ConcurrentHashMap中的实现,将对整个Map的锁,细化到对每个槽位上
  • 如使用Synchronized或者ReentrantLock,尽量控制锁的范围,避免锁的范围
  • Mysql的表锁和行锁设计,从MyISAM表锁到InnoDB支持行锁,将锁的粒度控制在行,从而提高存储引擎的并发

通过以上的优秀设计,可以总结出一套锁的优化思路:

  • 互斥锁 -> 乐观锁 -> 锁的粒度控制

在Java中对应的实现方式:

  • ReentrantLock或者Syschronized -> CAS + Volatile + 自旋 -> 拆分竞争点

相关文章

网友评论

      本文标题:AtomicXXX以及LongAdder底层原理

      本文链接:https://www.haomeiwen.com/subject/xngjtdtx.html