Java高并发编程详解:深入理解并发核心库
上QQ阅读APP看书,第一时间看更新

2.4 AtomicReference详解

AtomicReference类提供了对象引用的非阻塞原子性读写操作,并且提供了其他一些高级的用法。众所周知,对象的引用其实是一个4字节的数字,代表着在JVM堆内存中的引用地址,对一个4字节数字的读取操作和写入操作本身就是原子性的,通常情况下,我们对对象引用的操作一般都是获取该引用或者重新赋值(写入操作),我们也没有办法对对象引用的4字节数字进行加减乘除运算,那么为什么JDK要提供AtomicReference类用于支持引用类型的原子性操作呢?

本节将结合实例为大家解释AtomicReference的用途,在某些场合下该类可以完美地替代synchronized关键字和显式锁,实现在多线程下的非阻塞操作。

2.4.1 AtomicReference的应用场景

这里通过设计一个个人银行账号资金变化的场景,逐渐引入AtomicReference的使用,该实例有些特殊,需要满足如下几点要求。

▪ 个人账号被设计为不可变对象,一旦创建就无法进行修改。

▪ 个人账号类只包含两个字段:账号名、现金数字。

▪ 为了便于验证,我们约定个人账号的现金只能增多而不能减少。

根据前两个要求,我们简单设计一个代表个人银行账号的Java类DebitCard,该类将被设计为不可变。

程序代码:DebitCard.java


package com.wangwenjun.concurrent.juc.automic;

public class DebitCard
{
    private final String account;
    private final int amount;

    public DebitCard(String account, int amount)
    {
        this.account = account;
        this.amount = amount;
    }

    public String getAccount()
    {
        return account;
    }

    public int getAmount()
    {
        return amount;
    }

    @Override
    public String toString()
    {
        return "DebitCard{" +
                "account='" + account + '\'' +
                ", amount=" + amount +
                '}';
    }
}

1. 多线程下增加账号金额

假设有10个人不断地向这个银行账号里打钱,每次都存入10元,因此这个个人账号在每次被别人存入钱之后都会多10元。下面用多线程代码实现一下这样的场景。

程序代码:AtomicReferenceExample1.java


package com.wangwenjun.concurrent.juc.automic;

import java.util.concurrent.TimeUnit;

import static java.util.concurrent.ThreadLocalRandom.current;

public class AtomicReferenceExample1
{
// volatile关键字修饰,每次对DebitCard对象引用的写入操作都会被其他线程看到
// 创建初始DebitCard,账号金额为0元
static volatile DebitCard debitCard = new DebitCard("Alex", 0);

    public static void main(String[] args)
    {
        for (int i = 0; i < 10; i++)
        {
            new Thread("T-" + i)
            {
                @Override
                public void run()
                {
                    while (true)
                    {
                        // 读取全局DebitCard对象的引用
                        final DebitCard dc = debitCard;
                        // 基于全局DebitCard的金额增加10元并且产生一个新的DebitCard
                        DebitCard newDC = new DebitCard(dc.getAccount(),
                                                    dc.getAmount() + 10);
                        // 输出全新的DebitCard
                        System.out.println(newDC);
                        // 修改全局DebitCard对象的引用
                        debitCard = newDC;

                        try
                        {
                            TimeUnit.MILLISECONDS.sleep(current().nextInt(20));
                        } catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }
    }
}

在上面的代码中,我们声明了一个全局的DebitCard对象的引用,并且用volatile关键字进行了修饰,其目的主要是为了使DebitCard对象引用的变化对其他线程立即可见,在每个线程中都会基于全局的DebitCard金额创建一个新的DebitCard,并且用新的DebitCard对象引用更新全局DebitCard对象的引用。运行上面的程序,我们能够看到控制台输出存在的问题。

程序输出:AtomicReferenceExample1.java


...省略
DebitCard{account='Alex', amount=10}
DebitCard{account='Alex', amount=10}
DebitCard{account='Alex', amount=10}
DebitCard{account='Alex', amount=20}
DebitCard{account='Alex', amount=30}
DebitCard{account='Alex', amount=40}
DebitCard{account='Alex', amount=50}
DebitCard{account='Alex', amount=60}
DebitCard{account='Alex', amount=70}
DebitCard{account='Alex', amount=80}
...省略

分明已有3个人向这个账号存入了10元钱,为什么账号的金额却少于30元呢?不明白的读者可以参考笔者在《Java高并发编程详解:多线程与架构设计》一书第4章中介绍的方法自行分析,这里给点小提示:虽然被volatile关键字修饰的变量每次更改都可以立即被其他线程看到,但是我们针对对象引用的修改其实至少包含了如下两个步骤,获取该引用和改变该引用(每一个步骤都是原子性的操作,但组合起来就无法保证原子性了)。

2. 多线程下加锁增加账号金额

那么我们该如何解决第1小节中出现的问题呢?相信很多人的第一反应是提出为共享数据加锁的解决方案,没错,通过加锁确实能够保证对DebitCard对象引用的原子性操作。下面简单修改一下第1小节中程序。


synchronized (AtomicReferenceExample2.class)
{
    final DebitCard dc = debitCard;
    DebitCard newDC = new DebitCard(dc.getAccount(), dc.getAmount() + 10);
    System.out.println(newDC);
    debitCard = newDC;
}
try
{
    TimeUnit.MILLISECONDS.sleep(current().nextInt(20));
} catch (InterruptedException e)
{
    e.printStackTrace();
}

相比较AtomicReferenceExample1.java,我们在AtomicReferenceExample2. Java中增加了同步代码块,用于确保同一时刻只能由一个线程对全局DebitCard的对象引用进行修改。运行修改之后的程序,我们会看到Alex的银行账号在以10作为步长逐渐递增。


...省略
DebitCard{account='Alex', amount=310}
DebitCard{account='Alex', amount=320}
DebitCard{account='Alex', amount=330}
DebitCard{account='Alex', amount=340}
DebitCard{account='Alex', amount=350}
DebitCard{account='Alex', amount=360}
DebitCard{account='Alex', amount=370}
DebitCard{account='Alex', amount=380}
DebitCard{account='Alex', amount=390}
DebitCard{account='Alex', amount=400}
DebitCard{account='Alex', amount=410}
DebitCard{account='Alex', amount=420}
DebitCard{account='Alex', amount=430}
DebitCard{account='Alex', amount=440}
...省略

3. AtomicReference的非阻塞解决方案

第2小节中的方案似乎满足了我们的需求,但是它却是一种阻塞式的解决方案,同一时刻只能有一个线程真正在工作,其他线程都将陷入阻塞,因此这并不是一种效率很高的解决方案,这个时候就可以利用AtomicReference的非阻塞原子性解决方案提供更加高效的方式了。

基于AtomicReferenceExample1.java创建一个新的java文件,并且用Atomic Reference代替volatile关键字,代码如下所示。

程序代码:AtomicReferenceExample3.java


package com.wangwenjun.concurrent.juc.automic;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.ThreadLocalRandom.current;

public class AtomicReferenceExample3
{
// 定义AtomicReference并且初始值为DebitCard("Alex", 0)
    private static AtomicReference<DebitCard> debitCardRef
            = new AtomicReference<>(new DebitCard("Alex", 0));

    public static void main(String[] args)
    {
        // 启动10个线程
        for (int i = 0; i < 10; i++)
        {
            new Thread("T-" + i)
            {
                @Override
                public void run()
                {
                    while (true)
                    {
                        // 获取AtomicReference的当前值
                        final DebitCard dc = debitCardRef.get();
                        // 基于AtomicReference的当前值创建一个新的DebitCard
                        DebitCard newDC = new DebitCard(dc.getAccount(),                         dc.getAmount() + 10);
                        // 基于CAS算法更新AtomicReference的当前值
                        if (debitCardRef.compareAndSet(dc, newDC))
                        {
                            System.out.println(newDC);
                        }

                        try
                        {
                            TimeUnit.MILLISECONDS.sleep(current().nextInt(20));
                        } catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }
    }
}

在上面的程序代码中,我们使用了AtomicReference封装DebitCard的对象引用,每一次对AtomicReference的更新操作,我们都采用CAS这一乐观非阻塞的方式进行,因此也会存在对DebitCard对象引用更改失败的问题(更新时所持有的期望值引用有可能并不是AtomicReference所持有的当前引用,这也是第1小节中程序运行出现错误的根本原因。比如,A线程获得了DebitCard引用R1,在进行修改之前B线程已经将全局引用更新为R2,A线程仍然基于引用R1进行计算并且最终将全局引用更新为R1)。

CAS算法在此处就是要确保接下来要修改的对象引用是基于当前线程刚才获取的对象引用,否则更新将直接失败。运行上面的程序,我们再来分析一下控制台的输出。

程序输出:AtomicReferenceExample3.java


...省略
DebitCard{account='Alex', amount=20}
DebitCard{account='Alex', amount=10}
DebitCard{account='Alex', amount=30}
DebitCard{account='Alex', amount=40}
DebitCard{account='Alex', amount=50}
DebitCard{account='Alex', amount=60}
DebitCard{account='Alex', amount=70}
DebitCard{account='Alex', amount=80}
DebitCard{account='Alex', amount=90}
DebitCard{account='Alex', amount=100}
DebitCard{account='Alex', amount=120}
DebitCard{account='Alex', amount=130}
DebitCard{account='Alex', amount=110}
DebitCard{account='Alex', amount=140}
...省略

控制台的输出显示账号的金额按照10的步长在增长,由于非阻塞的缘故,数值20的输出有可能会出现在数值10的前面,数值130的输出则出现在了数值110的前面,但这并不妨碍amount的数值是按照10的步长增长的。

4. 性能大PK

AtomicReference所提供的非阻塞原子性对象引用读写解决方案,被应用在很多高并发容器中,比如ConcurrentHashMap。为了让读者更加直观地看到阻塞与非阻塞的性能对比,本节将使用JMH工具对比两者的性能,参赛双方分别是synchronized关键字和AtomicReference。

程序代码:AtomicReferenceExample4.java


package com.wangwenjun.concurrent.juc.automic;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.profile.StackProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

@Measurement(iterations = 20)
@Warmup(iterations = 20)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class AtomicReferenceExample4
{
    @State(Scope.Group)
    public static class MonitorRace
    {
        private DebitCard debitCard = new DebitCard("Alex", 0);

        public void syncInc()
        {
            synchronized (AtomicReferenceExample4.class)
            {
                final DebitCard dc = debitCard;
                final DebitCard newDC = new DebitCard(dc.getAccount(),                                                           dc.getAmount() + 10);
                this.debitCard = newDC;
            }
        }
    }

    @State(Scope.Group)
    public static class AtomicReferenceRace
    {
        private AtomicReference<DebitCard> ref
                = new AtomicReference<>(new DebitCard("Alex", 0));
        public void casInc()
        {
            final DebitCard dc = ref.get();
            final DebitCard newDC = new DebitCard(dc.getAccount(), dc.getAmount() + 10);
            ref.compareAndSet(dc, newDC);
        }
    }

    @GroupThreads(10)
    @Group("sync")
    @Benchmark
    public void syncInc(MonitorRace monitor)
    {
        monitor.syncInc();
    }

    @GroupThreads(10)
    @Group("cas")
    @Benchmark
    public void casInc(AtomicReferenceRace casRace)
    {
        casRace.casInc();
    }

    public static void main(String[] args) throws RunnerException
    {
        Options opts = new OptionsBuilder()
             .include(AtomicReferenceExample4.class.getSimpleName())
             .forks(1)
             .timeout(TimeValue.seconds(10))
             .addProfiler(StackProfiler.class)
             .build();
        new Runner(opts).run();
    }
}

对于基准测试的代码,此处不做过多解释,第1章已经非常详细地讲解了JMH的使用。执行上面的基准测试代码,会看到两者之间的性能差异。


Benchmark                        Mode  Cnt  Score   Error  Units
AtomicReferenceExample4.cas      avgt   20  0.638 ± 0.029  us/op
AtomicReferenceExample4.sync     avgt   20  0.980 ± 0.020  us/op

通过基准测试,我们可以看到AtomicReference的性能要高出synchronized关键字30%以上。下面进一步分析线程堆栈情况。


Synchronized关键字的线程堆栈
 70.5%         BLOCKED
 28.5%         RUNNABLE
  1.1%         WAITING
AtomicReference的线程堆栈
  92.0%        RUNNABLE
  8.0%         WAITING

2.4.2 AtomicReference的基本用法

掌握了AtomicReference的使用场景之后,本节将详细介绍AtomicReference的其他方法。

AtomicReference的构造:AtomicReference是一个泛型类,它的构造与其他原子类型的构造一样,也提供了无参和一个有参的构造函数。

AtomicReference():当使用无参构造函数创建AtomicReference对象的时候,需要再次调用set()方法为AtomicReference内部的value指定初始值。

AtomicReference(V initialValue):创建AtomicReference对象时顺便指定初始值。

compareAndSet(V expect, V update):原子性地更新AtomicReference内部的value值,其中expect代表当前AtomicReference的value值,update则是需要设置的新引用值。该方法会返回一个boolean的结果,当expect和AtomicReference的当前值不相等时,修改会失败,返回值为false,若修改成功则会返回true。

getAndSet(V newValue):原子性地更新AtomicReference内部的value值,并且返回AtomicReference的旧值。

getAndUpdate(UnaryOperator<V> updateFunction):原子性地更新value值,并且返回AtomicReference的旧值,该方法需要传入一个Function接口。


AtomicReference<DebitCard> debitCardRef =
                new AtomicReference<>(new DebitCard("Alex", 0));
DebitCard preDC = debitCardRef.get();
DebitCard result = debitCardRef.getAndUpdate(dc -> new DebitCard(dc.getAccount(),                                           dc.getAmount() + 10));
// 返回之前的旧值
assert preDC == result;
// debitCardRef更新成功
assert result != debitCardRef.get();

updateAndGet(UnaryOperator<V> updateFunction):原子性地更新value值,并且返回AtomicReference更新后的新值,该方法需要传入一个Function接口。


AtomicReference<DebitCard> debitCardRef =
                new AtomicReference<>(new DebitCard("Alex", 0));
// 原子性地更新DebitCard
DebitCard newDC = debitCardRef.updateAndGet(dc -> new DebitCard(dc.getAccount(),                                          dc.getAmount() + 10));
// 更新成功
assert newDC == debitCardRef.get();
assert newDC.getAmount() == 10;

getAndAccumulate(V x, BinaryOperator<V> accumulatorFunction):原子性地更新value值,并且返回AtomicReference更新前的旧值。该方法需要传入两个参数,第一个是更新后的新值,第二个是BinaryOperator接口。


DebitCard initialVal = new DebitCard("Alex", 0);
AtomicReference<DebitCard> debitCardRef =
                new AtomicReference<>(initialVal);
DebitCard newValue = new DebitCard("Alex2", 10);
DebitCard result = debitCardRef.getAndAccumulate(newValue,                                                      (prev, newVal) -> newVal);
assert initialVal == result;
assert newValue == debitCardRef.get();

accumulateAndGet(V x, BinaryOperator<V> accumulatorFunction):原子性地更新value值,并且返回AtomicReference更新后的值。该方法需要传入两个参数,第一个是更新的新值,第二个是BinaryOperator接口。


DebitCard initialVal = new DebitCard("Alex", 0);
AtomicReference<DebitCard> debitCardRef =
                new AtomicReference<>(initialVal);
DebitCard newValue = new DebitCard("Alex2", 10);
DebitCard result = debitCardRef.accumulateAndGet(newValue,                                                      (prev, newVal) -> newVal);
assert newValue == result;
assert newValue == debitCardRef.get();

get():获取AtomicReference的当前对象引用值。

set(V newValue):设置AtomicReference最新的对象引用值,该新值的更新对其他线程立即可见。

lazySet(V newValue):设置AtomicReference的对象引用值。lazySet方法的原理已经在AtomicInteger中介绍过了,这里不再赘述。

2.4.3 AtomicReference的内幕

在AtomicReference类中,最关键的方法为compareAndSet(),下面来一探该方法的内幕。


// AtomicReference.java中的compareAndSet方法
public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
// 对应于Unsafe.java中的compareAndSwapObject方法
public final native boolean compareAndSwapObject(Object obj, long offset,     Object exceptRef, Object newRef);

打开openjdk的unsafe.cpp文件,具体路径为openjdk-jdk8u/hotspot/src/share/vm/prims/unsafe.cpp。


UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject               unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h))
    UnsafeWrapper("Unsafe_CompareAndSwapObject");
    oop x = JNIHandles::resolve(x_h);
    oop e = JNIHandles::resolve(e_h);
    oop p = JNIHandles::resolve(obj);
    HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);
    oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);
    jboolean success  = (res == e);
    if (success)
        update_barrier_set((void*)addr, x);
    return success;
UNSAFE_END

在unsafe.cpp中,我们找到了对应的Unsafe_CompareAndSwapObject方法,该方法调用了另外一个C++方法oopDesc::atomic_compare_exchange_oop。

打开另外一个C++文件,我们会发现在内联函数中,当UseCompressedOops为true时将会调用执行与AtomicInteger一样的CAS函数Atomic::cmpxchg(),文件路径为openjdk-jdk8u/hotspot/src/share/vm/oops/oop.inline.hpp。


inline oop oopDesc::atomic_compare_exchange_oop(oop exchange_value,
                                                volatile HeapWord *dest,
                                                oop compare_value,
                                                bool prebarrier) {
    if (UseCompressedOops) {
        if (prebarrier) {
            update_barrier_set_pre((narrowOop*)dest, exchange_value);
        }
        // encode exchange and compare value from oop to T
        narrowOop val = encode_heap_oop(exchange_value);
        narrowOop cmp = encode_heap_oop(compare_value);

        narrowOop old = (narrowOop) Atomic::cmpxchg(val, (narrowOop*)dest, cmp);
        // decode old from T to oop
        return decode_heap_oop(old);
    } else {
        if (prebarrier) {
            update_barrier_set_pre((oop*)dest, exchange_value);
        }
        return (oop)Atomic::cmpxchg_ptr(exchange_value, (oop*)dest, compare_value);
  }
}

UseCompressedOops参数是JVM用于控制指针压缩的参数,一般情况下,64位的JDK版本基本上都是默认打开的(对于32位JDK的版本,该参数无效),大家可以根据jinfo -JPID查看你自己运行的JVM参数。关于Atomic::cmpxchg方法,AtomicInteger和AtomicLong中已经做过了介绍,这里不再赘述。

如果想要了解更多JVM参数,可以阅读JDK官网文档,地址如下:

https://www.oracle.com/technetwork/java/javase/tech/vmoptions-jsp-140102.html

2.4.4 AtomicReference总结

虽然AtomicReference的使用非常简单,但是很多人依然很难理解它的使用场景,网上大量的文章也只是讲述API如何使用,容易让人疑惑它存在的价值。因此在本节的一开始,我们便通过一个应用场景的演进为大家展示AtomicReference原子性操作对象引用(在并发的场景之下)所带来的性能提升,进而说明AtomicReference存在的价值和意义,紧接着我们又详细介绍了AtomicReference API方法的使用,并重点介绍了CAS算法的底层C++实现(其实在64位JDK版本中使用的汇编指令与AtomicInteger是完全一致的)。