一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

前言

信赖大部分开发人员,或多或少都看过或写过并发编程的代码。并发关键字除了Synchronized,另有另一大分支Atomic。若是人人没听过没用过先看基础篇,若是听过用过,请滑至底部看进阶篇,深入源码剖析。

提出问题:int线程平安吗?

看过Synchronized相关文章的小伙伴应该知道其是不平安的,再次用代码应验下其不平安性:

public class testInt {
    static int number = 0;

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    number = number+1;
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();
        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();
        System.out.println("number:" + number);
    }
}

 

 

运行效果:

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

在上面的例子中,我们界说一个初始值为0的静态变量number,再新建并运行两个线程让其各执行10万次的自增操作,若是他是线程平安的,应该两个线程执行后效果为20万,然则我们发现最终的效果是小于20万的,即说明他是不平安的。

在之前Synchronized那篇文章中说过,可以在number=number+1这句代码上下加Synchronized关键字实现线程平安。然则其对资源的开销较大,以是我们今天再看下另外一种实现线程平安的方式Atomic。

Atomic基础篇分界线

原子整数(基础类型)

整体先容

Atomic是jdk提供的一系列包的总称,这个人人族包罗原子整数(AtomicInteger,AtomicLong,AtomicBoolean),原子引用(AtomicReference,AtomicStampedReference,AtomicMarkableReference),原子数组(AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray),更新器(AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater)。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

AtomicInteger

AtomicInteger,AtomicBoolean,AtomicLong三者功效类似,咱就以AtomicInteger为主剖析原子类。

先看下有哪些API,及其他们详细啥功效:

public class testInt {

    public static void main(String[] args) {
        //界说AtomicInteger类型的变量,值为1
        AtomicInteger i = new AtomicInteger(1);
        //incrementAndGet方式先新增1再返回,以是打印2,此时i为2
        System.out.println(i.incrementAndGet());
        //getAndIncrement方式先返回值再新增1,以是打印2,此时i为3
        System.out.println(i.getAndIncrement());
        //get方式返回当前i值,以是打印3,此时i为3
        System.out.println(i.get());
        //参数为正数即新增,getAndAdd方式先返回值再新增666,以是打印3,此时i为669
        System.out.println(i.getAndAdd(666));
        //参数为负数即减去,getAndAdd方式先返回值再减去1,以是打印669,此时i为668
        System.out.println(i.getAndAdd(-1));
        //参数为正数即新增,addAndGet方式先新增666再返回值,以是打印1334,此时i为1334
        System.out.println(i.addAndGet(666));
        //参数为负数即减去,addAndGet方式先减去-1再返回值,以是打印1333,此时i为1333
        System.out.println(i.addAndGet(-1));
        //getAndUpdate方式IntUnaryOperator参数是一个箭头函数,后面可以写任何操作,以是打印1333,此时i为13331
        System.out.println(i.getAndUpdate(x -> (x * 10 + 1)));
        //最终打印i为13331
        System.out.println(i.get());
    }
} 

 

 

执行效果:

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

对上述int类型的例子改善

public class testInt {
    //1.界说初始值为0的AtomicInteger类型变量number
    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    //2.挪用incrementAndGet方式,实现加1操作
                    number.incrementAndGet();
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();
        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();
        System.out.println("number:" + number.get());
    }
}

 

 

我们可以看到运行效果是准确的20万,说明AtomicInteger简直保证了线程平安性,即在多线程的历程中,运行效果照样准确的。然则这存在一个ABA问题,下面将原子引用的时刻再说,先立个flag。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

源码剖析

我们以incrementAndGet方式为例,看下底层是若何实现的,AtomicInteger类中的incrementAndGet方式挪用了Unsafe类的getAndAddInt方式。

public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

 

我们看下getAndAddInt方式,内里有个循环,直接值为compareAndSwapInt返回值为true,才竣事循环。这里就不得不提CAS,这就是多线程平安性问题的解决方式。

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
}

 

CAS

线程1和线程2同事获取了主内存变量值0,线程1加1并写入主内存,现在主内存变量值1,线程2也加2并实验写入主内存,这个时刻是不能写入主内存的,由于会笼罩掉线程1的操作,详细历程如下图。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

CAS是在线程2实验写入内存的时刻,通过对照并设置(CompareAndSet)发现现在主内存当前值为1,和他刚最先读取的值0不一样,以是他会放弃本次修改,重新读取主内存的最新值,然后再重试下线程2的详细逻辑操作,再次实验写入主内存。若是这时刻线程1,再次对主内存举行了修改,线程2发现现在主内存的值又和预期不一样,以是将放弃本次修改,再次读取主内存最新值,再次重试并实验写入主内存。我们可以发现这是一个重复对照的历程,即直到和预期初始值一样,才会写入主内存,否则将一直读取重试的循环。这就是上面for循环的意义。

CAS的实现实际上利用了CPU指令来实现的,若是操作系统不支持CAS,照样会加锁的,若是操作系统支持CAS,则使用原子性的CPU指令。

原子引用

在一样平常使用中,我们不止对上述基本类型举行原子操作,而是需要对一些庞大类型举行原子操作,以是需要AtomicReference。

不平安实现

先看不平安的BigDecimal类型:

public class testReference {
    static BigDecimal number = BigDecimal.ZERO;

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                   number=number.add(BigDecimal.ONE);
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();

        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();

        System.out.println(number);
    }
} 

 

 

运行效果如下图,我们可以看到两个线程,自循环1000次加1操作,最终效果应该是2000,可是效果小于2000。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

平安实现-使用CAS

public class testReference {
    //界说AtomicReference类型BigDecimal变量
    static AtomicReference<BigDecimal> number = new AtomicReference<BigDecimal>(BigDecimal.ZERO);

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    //手动写循环+CAS判断
                    while(true){
                        BigDecimal pre=number.get();
                        BigDecimal next=number.get().add(BigDecimal.ONE);
                      if(number.compareAndSet(pre,next))  {
                          break;
                      }
                    }
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();

        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();

        System.out.println(number.get());

    }
}

 

 

运行效果如下:

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

ABA问题及解决

在上面CAS历程中,是通过值对照来知晓是不是能够更新乐成,那若是线程1先加1再减1,这样主内存照样原来的值,即线程2照样可以更新乐成的。然则这样逻辑错了,线程1已经发生了修改,线程2不能直接更新乐成。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

「持续集成实践系列 」Jenkins 2.x 构建CI自动化流水线常见技巧

代码:

public class testInt {

    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.get();
                System.out.println("最先number:" + a);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(number.compareAndSet(a, a++));


            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("最先增添操作");
                int a = number.incrementAndGet();
                System.out.println("当前number:" + a);
                int b = number.decrementAndGet();
                System.out.println("当前number:" + b);
            }
        });
        t2.start();

        t1.join();
        t2.join();
    }
} 

 

 

我们看线程2对其举行了一系列操作,然则最后打印了照样true,示意可以更新乐成的。这显然纰谬。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

那我们可以使用AtomicStampedReference,为其添加一个版本号。线程1在刚最先读取主内存的时刻,获取到值为0,版本为1,线程2也获取到这两个值,线程1举行加1,减1的操作的时刻,版本各加1,现在主内存的值为0,版本为2,而线程2还拿着预计值为0,版本为1的数据实验写入主内存,这个时刻因版本差别而更新失败。详细我们用代码试下:

public class testInt {

    static AtomicStampedReference<Integer> number = new AtomicStampedReference<Integer>(0, 0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.getReference();
                int s = number.getStamp();
                System.out.println("最先number:" + a + ",stamp:" + s);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(number.compareAndSet(a, a + 1, s, s + 1));


            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("最先增添操作");
                int a = number.getReference();
                int s = number.getStamp();
                number.compareAndSet(a, a + 1, s, s + 1);
                System.out.println("当前number:" + a + ",stamp:" + (s + 1));
                a = number.getReference();
                s = number.getStamp();
                number.compareAndSet(a, a - 1, s, s + 1);
                System.out.println("当前number:" + a + ",stamp:" + (s+1));
            }
        });
        t2.start();

        t1.join();
        t2.join();
    }
} 

 

我们可以看到每次操作都市更新stamp(版本号),在最后对比的时刻不仅对照值,还对照版本号,以是是不能更新乐成的,false.

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

原子数组

AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray三者类似,以是以AtomicIntegerArray为例,我们可以将下面AtomicIntegerArray看做是AtomicInteger类型的数组,其底层很类似,就不详细写了。

AtomicIntegerArray  array = new AtomicIntegerArray(10);
array.getAndIncrement(0);   // 将第0个元素原子地增添1

 

AtomicInteger[]  array = new AtomicInteger[10];
array[0].getAndIncrement();  // 将第0个元素原子地增添1

 

字段更新器和原子累加器对照简单,这里就不说了。 

Atomic进阶篇分界线

LongAdder源码剖析

LongAdder使用

LongAdder是jdk1.8之后新加的,那为什么要加他?这个问题,下面将回覆,我们先看下若何使用。

public class testLongAdder {
    public static void main(String[] args) throws Exception {
        LongAdder number = new LongAdder();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    number.add(1L);
                }
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("number:" + number);
    }
}

 

我们可以看到LongAdder的使用和AtomicLong大致相同,使用两个线程Thread1,Thread2对number值各举行一万次的自增操作,最后的number是准确的两万。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

与Atomic的对比优势

那问题来了,既然AtomicLong能够完成对多线程下的number举行线程平安的操作,那为什么还要LongAdder?我们先来段代码对照下,两个在效果都是准确的前提下,性能方面的差距。

public class testLongAdder {
    public static void main(String[] args) {
       //1个线程,举行100万次自增操作
        test1(1,1000000);
      //10个线程,举行100万次自增操作
        test1(10,1000000);
     //100个线程,举行100万次自增操作
        test1(100,1000000);
    }

    static void test1(int threadCount,int times){
        long startTime=System.currentTimeMillis();
        AtomicLong number1=new AtomicLong();
        List<Thread> threads1=new ArrayList<>();
        for(int i=0;i<threadCount;i++) {
            threads1.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        number1.incrementAndGet();
                    }
                }
            }));
        }
        threads1.forEach(thread -> thread.start());
        threads1.forEach(thread ->{
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } );

        long endTime=System.currentTimeMillis();
        System.out.println("AtomicLong:"+number1+",time:"+(endTime-startTime));

        LongAdder number2=new LongAdder();
        List<Thread> threads2=new ArrayList<>();
        for(int i=0;i<threadCount;i++) {
            threads2.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        number2.add(1);
                    }
                }
            }));
        }
        threads2.forEach(thread -> thread.start());
        threads2.forEach(thread ->{
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } );

        System.out.println("LongAdder:"+number2+",time:"+(System.currentTimeMillis()-endTime));

    }
} 

 

上述代码对比了1个线程,10个线程,100个线程在举行100百次自增操作后,AtomicLong和LongAdder所破费的时间。通过打印语句,我们发现在最终number1和number2都准确的基础上,LongAdder破费的时间比AtomicLong少了一个量级。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

源码剖析

那为什么会导致这种情形,我们就要从源码层面剖析。AtomicLong为什么效率低?由于若是线程数目一多,尤其在高并发的情形下,好比有100个线程同时想要对工具举行操作,一定只有一个线程会获取到锁,其他99个线程可能空转,一直循环知道线程释放锁。若是该线程操作完毕释放了锁,其他99个线程再次竞争,也只有一个线程获取锁,另外98个线程照样空转,直到锁被释放。这样CAS操作会虚耗大量资源在空转上,从而使得AtomicLong在线程数越来越多的情形下越来越慢。

AtomicLong是多个线程对同一个value值举行操作,导致多个线程自旋次数太多,性能降低。而LongAdder在无竞争的情形,跟AtomicLong一样,对同一个base举行操作,当泛起竞争关系时则是接纳化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value举行操作时刻,可以对线程id举行hash获得hash值,再凭据hash值映射到这个数组cells的某个下标,再对该下标所对应的值举行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终效果。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

我们先看下LongAdder内里的字段,发现其内里没有,主要是在其继续的Stripped64类中,有下面四个主要变量。

 /** CPU数目,即cells数组的最大长度*/
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     *cells数组,为2的幂,2,4,8,16.....,利便以后位运算
     */
    transient volatile Cell[] cells;

    /**
     * 基值,主要用于没有竞争的情形,通过CAS更新。
     */
    transient volatile long base;

    /**
     * 调整单元格巨细(扩容),建立单元格时使用的锁。
     */
    transient volatile int cellsBusy;

 

 

下面是add方式最先。

 public void add(long x) {
        //as:cells数组的引用
        //b:base的基础值
        //v:期望值
        //m:cells数组巨细
        //a:当前数组掷中的单元
        Cell[] as; long b, v; int m; Cell a;
        //as不为空(cells已经初始化过,说明之前有其他线程对初始化)或者CAS操作不乐成(线程间泛起竞争)
        if ((as = cells) != null || !casBase(b = base, b + x)) {
        //初始化uncontented,true示意未竞争(由于有两个情形,这里先初始化,后面临其修改,就能区分这两种情形)
        boolean uncontended = true;
        //as即是null(cells未初始化)
        //或者线程id哈希出来的下标所对应的值为空(cell即是空),getProbe() & m功效是获取下标,底层逻辑是位运算
        //或者更新失败为false,即发生竞争,取非就为ture
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
        //进入到if内里,说明更新case失败,或者更新某个cell也失败了,或者cell为空,或者cells为空
                longAccumulate(x, null, uncontended);
        }
    }

 

从LongAdder挪用Stripped64的longAccumulate方式,主要是初始化cellscells的扩容多个线程同时掷中一个cell的竞争操作。

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //x:add的值,fn:为null,wasUncontended:是否发生竞争,true为发生竞争,false为不发生竞争
        int h;//线程的hash值
        //若是该线程为0,即第一次进来,以是ThreadLocalRandom强制初始化线程id,再对其hash
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); 
            h = getProbe();
            wasUncontended = true;
        }
        //扩容意向,为false一定不扩容,为true可能扩容
        boolean collide = false; 
        //死循环               
        for (;;) {
            //as:cells数组的引用
            //a:当前线程掷中的cell
            //n:cells的长度
            //v:当前线程掷中的cell所拥有的value值
            Cell[] as; Cell a; int n; long v;
            //cells不为空
            if ((as = cells) != null && (n = as.length) > 0) {
                //当前线程掷中的cell为空,下面逻辑是新增cell
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //发生竞争
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //没有竞争,实验修改当前线程对应的cell值,乐成跳出循环
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //若是n大于CPU最大数目,不能扩容
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                //获取到了锁,举行扩容,为2的幂,
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];//左移一位运算符,数目加倍
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //cells即是空,而且获取到锁,最先初始化事情,建立竣事释放锁,继续循环
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

 

 

结语

竣事了,撒花。这篇主要说了Atomic的一些使用,包罗Atomic原子类(AtomicInteger,AtomicLong,AtomicBoolean),Atomic原子引用(AtomicReference,AtomicStampedReference),以及1.8之后LongAdder的优势,源码剖析。历程还穿插了一些CAS,ABA问题引入和解决方式。

一篇文章快速搞懂 Atomic(原子整数/CAS/ABA/原子引用/原子数组/LongAdder)

 

参考资料

Java多线程进阶(十七)—— J.U.C之atomic框架:LongAdder

CAS原理

Java 8 Performance Improvements: LongAdder vs AtomicLong

AtomicInteger深入明白

原子操作类AtomicInteger详解

玩转Java并发工具,醒目JUC,成为并发多面手

原创文章,作者:28x0新闻网,如若转载,请注明出处:https://www.28x0.com/archives/14230.html