博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
和朱晔一起复习Java并发(四):Atomic
阅读量:5262 次
发布时间:2019-06-14

本文共 15357 字,大约阅读时间需要 51 分钟。

本节我们来研究下并发包中的Atomic类型。

AtomicXXX和XXXAdder以及XXXAccumulator性能测试

先来一把性能测试,对比一下AtomicLong(1.5出来的)、LongAdder(1.8出来的)和LongAccumulator(1.8出来的)用于简单累加的性能。

程序逻辑比较简单,可以看到我们在最大并发10的情况下,去做10亿次操作测试:

@Slf4jpublic class AccumulatorBenchmark {    private StopWatch stopWatch = new StopWatch();    static final int threadCount = 100;    static final int taskCount = 1000000000;    static final AtomicLong atomicLong = new AtomicLong();    static final LongAdder longAdder = new LongAdder();    static final LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0L);    @Test    public void test() {        Map
tasks = new HashMap<>(); tasks.put("atomicLong", i -> atomicLong.incrementAndGet()); tasks.put("longAdder", i -> longAdder.increment()); tasks.put("longAccumulator", i -> longAccumulator.accumulate(1L)); tasks.entrySet().forEach(item -> benchmark(threadCount, taskCount, item.getValue(), item.getKey())); log.info(stopWatch.prettyPrint()); Assert.assertEquals(taskCount, atomicLong.get()); Assert.assertEquals(taskCount, longAdder.longValue()); Assert.assertEquals(taskCount, longAccumulator.longValue()); } private void benchmark(int threadCount, int taskCount, IntConsumer task, String name) { stopWatch.start(name); ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount); forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(task)); forkJoinPool.shutdown(); try { forkJoinPool.awaitTermination(1, TimeUnit.HOURS); } catch (InterruptedException e) { e.printStackTrace(); } stopWatch.stop(); }}

结果如下:

image_1dg6kqfra1unpqomj3v1v3l1ktu9.png-78.3kB

和官网说的差不多,在高并发的情况下LongAdder性能会比AtomicLong好很多。

AtomicReference可见性问题测试

在很多开源代码中我们有看到AtomicReference的身影,它究竟是干什么的呢?我们来写一段测试程序,在这个程序中我们定一了一个Switch类型,作为一个开关,然后写三个死循环的线程来测试,当开关有效的时候会持续死循环,在2秒后关闭所有的三个开关:

  • 第一个是普通的Switch
  • 第二个是使用了volatile声明的Switch
  • 第三个是AtomicReference包装的Switch
@Slf4jpublic class AtomicReferenceTest {    private Switch rawValue = new Switch();    private volatile Switch volatileValue = new Switch();    private AtomicReference
atomicValue = new AtomicReference<>(new Switch()); @Test public void test() throws InterruptedException { new Thread(() -> { log.info("Start:rawValue"); while (rawValue.get()) { } log.info("Done:rawValue"); }).start(); new Thread(() -> { log.info("Start:volatileValue"); while (volatileValue.get()) { } log.info("Done:volatileValue"); }).start(); new Thread(() -> { log.info("Start:atomicValue"); while (atomicValue.get().get()) { } log.info("Done:atomicValue"); }).start(); Executors.newSingleThreadScheduledExecutor().schedule(rawValue::off, 2, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().schedule(volatileValue::off, 2, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().schedule(atomicValue.get()::off, 2, TimeUnit.SECONDS); TimeUnit.HOURS.sleep(1); } class Switch { private boolean enable = true; public boolean get() { return enable; } public void off() { enable = false; } }}

运行程序:

image_1dg6ll5s41iaim9u1ijk56j1n2tm.png-125.5kB
可以看到2秒后有一个开关卡住了,线程没有退出。这是一个可见性的问题,AtomicReference以及volatile可以确保线程对数据的更新刷新到内存。因为我们对于开关的关闭是在另一个定时任务线程做的,如果我们不使用volatile或AtomicReference来定义对象,那么对象的操作可能无法被其它线程感知到。当然,AtomicReference除了解决可见性问题还有更多AtomicXXX提供的其它功能。

AtomicInteger测试

下面我们来看一下AtomicInteger的compareAndSet()功能。首先说明这个程序没有任何意义,只是测试一下功能。在这个程序里,我们乱序开启10个线程,每一个线程的任务就是按照次序来累加数字。我们使用AtomicInteger的compareAndSet()来确保乱序的线程也能按照我们要的顺序操作累加。

@Slf4jpublic class AtomicIntegerTest {    @Test    public void test() throws InterruptedException {        AtomicInteger atomicInteger = new AtomicInteger(0);        List
threadList = IntStream.range(0,10).mapToObj(i-> { Thread thread = new Thread(() -> { log.debug("Wait {}->{}", i, i+1); while (!atomicInteger.compareAndSet(i, i + 1)) { try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("Done {}->{}", i, i+1); }); thread.setName(UUID.randomUUID().toString()); return thread; }).sorted(Comparator.comparing(Thread::getName)).collect(Collectors.toList()); for (Thread thread : threadList) { thread.start(); } for (Thread thread : threadList) { thread.join(); } log.info("result:{}", atomicInteger.get()); }}

执行结果如下:

11:46:30.611 [2c80b367-d80e-46b5-94f5-b7b172e79dad] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 4->511:46:30.611 [7bccbb54-4573-4b77-979b-840613406428] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 5->611:46:30.612 [c0792831-6201-4f6c-b702-79c1b798c3aa] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 9->1011:46:30.612 [949b0c26-febb-4830-ad98-f43521ce4382] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 7->811:46:30.613 [ccc05b0f-11da-41fa-b8fc-59a90dfc2250] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 6->711:46:30.611 [037e9595-73cb-4aa1-afee-4250347746c8] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 3->411:46:30.611 [4f15d9ce-044e-4657-b418-4874d03e5d22] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 1->211:46:30.611 [3a96c35c-bc4e-45f4-aae4-9fd8611acaea] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 8->911:46:30.611 [94465214-27bf-4543-80e2-dbaeeb6ddc94] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 0->111:46:30.611 [60f9cb50-21e6-45bc-9b4d-867783ab033b] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 2->311:46:30.627 [94465214-27bf-4543-80e2-dbaeeb6ddc94] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 0->111:46:30.681 [4f15d9ce-044e-4657-b418-4874d03e5d22] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 1->211:46:30.681 [60f9cb50-21e6-45bc-9b4d-867783ab033b] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 2->311:46:30.734 [037e9595-73cb-4aa1-afee-4250347746c8] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 3->411:46:30.780 [2c80b367-d80e-46b5-94f5-b7b172e79dad] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 4->511:46:30.785 [7bccbb54-4573-4b77-979b-840613406428] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 5->611:46:30.785 [ccc05b0f-11da-41fa-b8fc-59a90dfc2250] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 6->711:46:30.787 [949b0c26-febb-4830-ad98-f43521ce4382] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 7->811:46:30.838 [3a96c35c-bc4e-45f4-aae4-9fd8611acaea] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 8->911:46:30.890 [c0792831-6201-4f6c-b702-79c1b798c3aa] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 9->1011:46:30.890 [main] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - result:10

可以看到,Wait的输出是乱序的,最后Done的输出是顺序的。

AtomicStampedReference测试

AtomicStampedReference可以用来解决ABA问题,什么是ABA问题我们看这个例子:

线程1读取了数字之后,等待1秒,然后尝试把1修改为3。
线程2后启动,读取到数字1后修改2,稍等一下又修改回1。
虽然AtomicInteger确保多个线程的原子性操作,但是无法确保1就是原先读取到的那个1,没有经过别人修改。
可以再换一个例子来说,如果我们现在账上有100元,要修改为200元,在修改之前账户已经被操作过了从100元充值到了150然后提现到了100,虽然最后还是回到了100,但是这个时候严格一点的话,我们应该认为这个100不是原先的100,这个账户的版本发生了变化,如果我们使用乐观行锁的话,虽然余额都是100但是行锁的版本肯定不一致,AtomicStampedReference就是类似行乐观锁的概念。

@Test    public void test() throws InterruptedException {        AtomicInteger atomicInteger = new AtomicInteger(1);        Thread thread1 = new Thread(() -> {            int value = atomicInteger.get();            log.info("thread 1 read value: " + value);            try {                TimeUnit.SECONDS.sleep(1);            } catch (InterruptedException e) {                e.printStackTrace();            }            if (atomicInteger.compareAndSet(value, 3)) {                log.info("thread 1 update from " + value + " to 3");            } else {                log.info("thread 1 update fail!");            }        });        thread1.start();        Thread thread2 = new Thread(() -> {            int value = atomicInteger.get();            log.info("thread 2 read value: " + value);            if (atomicInteger.compareAndSet(value, 2)) {                log.info("thread 2 update from " + value + " to 2");                try {                    TimeUnit.MILLISECONDS.sleep(100);                } catch (InterruptedException e) {                    e.printStackTrace();                }                value = atomicInteger.get();                log.info("thread 2 read value: " + value);                if (atomicInteger.compareAndSet(value, 1)) {                    log.info("thread 2 update from " + value + " to 1");                }            }        });        thread2.start();        thread1.join();        thread2.join();    }

看下运行结果:

11:56:20.373 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 111:56:20.381 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 1 to 211:56:20.373 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 read value: 111:56:20.483 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 211:56:20.484 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 2 to 111:56:21.386 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 update from 1 to 3

下面我们使用AtomicStampedReference来修复这个问题:

@Testpublic void test2() throws InterruptedException {    AtomicStampedReference
atomicStampedReference = new AtomicStampedReference<>(1, 1); Thread thread1 = new Thread(() -> { int[] stampHolder = new int[1]; int value = atomicStampedReference.get(stampHolder); int stamp = stampHolder[0]; log.info("thread 1 read value: " + value + ", stamp: " + stamp); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (atomicStampedReference.compareAndSet(value, 3, stamp, stamp + 1)) { log.info("thread 1 update from " + value + " to 3"); } else { log.info("thread 1 update fail!"); } }); thread1.start(); Thread thread2 = new Thread(() -> { int[] stampHolder = new int[1]; int value = atomicStampedReference.get(stampHolder); int stamp = stampHolder[0]; log.info("thread 2 read value: " + value + ", stamp: " + stamp); if (atomicStampedReference.compareAndSet(value, 2, stamp, stamp + 1)) { log.info("thread 2 update from " + value + " to 2"); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } value = atomicStampedReference.get(stampHolder); stamp = stampHolder[0]; log.info("thread 2 read value: " + value + ", stamp: " + stamp); if (atomicStampedReference.compareAndSet(value, 1, stamp, stamp + 1)) { log.info("thread 2 update from " + value + " to 1"); } value = atomicStampedReference.get(stampHolder); stamp = stampHolder[0]; log.info("thread 2 read value: " + value + ", stamp: " + stamp); } }); thread2.start(); thread1.join(); thread2.join();}

运行结果如下:

11:59:11.946 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1, stamp: 111:59:11.951 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 1 to 211:59:11.946 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 read value: 1, stamp: 111:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 2, stamp: 211:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 2 to 111:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1, stamp: 311:59:12.954 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 update fail!

可以看到,现在我们修改数据的时候不仅仅是拿着值来修改了,还要提供版本号,读取数据的时候可以读取到数据以及版本号。这样的话,虽然数值不变,但是线程2经过两次修改后数据的版本从1变为了3,回过头来线程1再要拿着版本号1来修改数据的话必然失败。

一个有趣的问题

本文比较短,我们再来看网友之前问的一个有意思的问题,程序如下。

@Slf4jpublic class InterestingProblem {    int a = 1;    int b = 1;    void add() {        a++;        b++;    }    void compare() {        if (a < b)            log.info("a:{},b:{},{}", a, b, a>b);    }    @Test    public void test() throws InterruptedException {        new Thread(() -> {            while (true)                add();        }).start();        new Thread(() -> {            while (true)                compare();        }).start();        TimeUnit.MILLISECONDS.sleep(100);    }}

这位网友是这么问的,他说见鬼了,不但能看到日志输出,而且我发现之前判断过一次a<b,之后输出a>b居然是成立的,结果里可以看到true,JVM出现Bug了可能:

image_1dg6ptemuabi6le5hbjugtv61g.png-222.3kB

他觉得a和b不是静态的,为啥会出现并发问题呢于是问了同事:

  • 同事A说是肯定多线程问题,加volatile可以解决,但是他发现为a和b加上volatile也不行
  • 同事B说是AtomicInteger可以解决并发性问题,但是把a和b都用上了AtomicInteger也没用
  • 同事C貌似看出了问题说需要锁,为add()方法增加synchronized关键字锁一下,但是也没用

这位网友其实是没有搞清楚多线程情况下,可见性问题、原子性问题解决的事情,同事也把各种并发的概念混淆在一起了。

我们这么来看这段代码,这段代码里一个线程不断操作a和b进行累加操作,一个线程判断a和b,然后输出结果。出现这个问题的原因本质上是因为a<b是三步操作,取a,取b以及比较,不是原子性的,在整个过程中可能穿插了add线程的操作a和b。如果先获取a,然后a++ b++,然后获取b,这个时候a<b,如果先a++,然后获取a,获取b,最后b++,这个时候a>b。我们来看一下compare()方法的字节码,可以很明显看到ab的比较分明是4行指令,我们不能以代码行数来判断操作是否是原子的,不是原子意味着操作过程中可能被穿插了其它线程的其它代码:

0 aload_0 1 getfield #2 
4 aload_0 5 getfield #3
8 if_icmpge 67 (+59)11 getstatic #4
14 ldc #5
16 iconst_317 anewarray #6
20 dup21 iconst_022 aload_023 getfield #2
26 invokestatic #7
29 aastore30 dup31 iconst_132 aload_033 getfield #3
36 invokestatic #7
39 aastore40 dup41 iconst_242 aload_043 getfield #2
46 aload_047 getfield #3
50 if_icmple 57 (+7)53 iconst_154 goto 58 (+4)57 iconst_058 invokestatic #8
61 aastore62 invokeinterface #9
count 367 return

所以这位网友的理解有几个问题:

  • 多线程操作的对象安全不安全和对象是否静态没关系,即使不是static的也可能会并发被多个线程来操作
  • 不能根据代码行数或代码是否简单来判断代码是否原子的,别说Java代码了,就是字节码也不行

我们再来看看他三位同事的说法:

  • 同事A可能没有彻底理解volatile的作用,现在是两个线程操作a和b相互交错干扰,加上了volatile只会让问题更严重(你可以写一段代码对比下加上和不加上volatile最后出现true的概率),这不是可见性问题
  • 同事B可能也没仔细考虑AtomicInteger的作用,AtomicInteger是用来实现多线程情况下原子性操作Integer,现在并没有多个线程来并发修改a和b,使用AtomicInteger不能解决问题
  • 同事C貌似是看到了问题的所在,但是他也没理清楚,add()方法仅仅只有一个线程在执行为这个方法加上锁是没有用的,现在的问题在于add()和compare()的干扰,它们需要串行执行才能确保a和b整体的完整

所以要进行简单修复这个问题的话就是为add()和compare()都加上synchronized关键字,除了这个锁的方式有没有其它方式呢?你可以想想。

小结

本文简单测试了一下java.util.concurrent.atomic包下面的一些常用Atomic操作类,最后分享了一个网友的问题和疑惑,希望文本对你有用。

同样,代码见,欢迎clone后自己把玩,欢迎点赞。

欢迎关注我的微信公众号:随缘主人的园子

image_1dfvp8d55spm14t7erkr3mdbscf.png-45kB

转载于:https://www.cnblogs.com/lovecindywang/p/11220228.html

你可能感兴趣的文章
程序员究竟该如何提高效率zt
查看>>
希尔排序法(缩小增量法)
查看>>
PHP编程基础学习(一)——数据类型
查看>>
MongoDB-JAVA-Driver 3.2版本常用代码全整理(2) - 查询
查看>>
NPOI处理Word文本中上下角标
查看>>
Android笔记 Handler
查看>>
如何阅读大型前端开源项目的源码(转)
查看>>
java.util.Arrays类详解
查看>>
idea搭建tocmat
查看>>
NYOJ-626-intersection set(二分查找)
查看>>
项目管理之路(1):初步踏入项目管理
查看>>
Java 中 静态方法与非静态方法的区别
查看>>
echarts饼图显示百分比
查看>>
JMS消息
查看>>
Jenkins+ProGet+Windows Batch搭建全自动的内部包(NuGet)打包和推送及管理平台
查看>>
php上传文件及头像预览
查看>>
大四java实习生的一些经历
查看>>
线程池的概念
查看>>
Oracle_Statspack性能诊断工具
查看>>
转获取sql维护的表关系
查看>>