JUC详解

基础知识

一、什么是JUC

JUC指的是:Java里的三个包
java.util.concurrent
java.util.concurrent.atomic:原子性
java.util.concurrent.locks:lock锁

二、线程和进程

  • 进程

程序执行的一次过程,一个进程包含一个或多个线程。进程是资源分配的单位。

  • 线程

可以指程序执行过程中,负责实现某个功能的单位。线程是CPU调度和执行的单位。

三 并发和并行

  • 并发

同一时刻,多个线程交替执行。(一个CPU交替执行线程)

  • 并行

同一时刻,多个线程同时执行。(多个CPU同时执行多个线程)

四 线程的几种状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public enum State {
/**
* 尚未启动的线程的线程状态。新生
*/
NEW,
/**
* 可运行线程的线程状态。
* 处于可运行状态的线程,正在Java虚拟机中执行,
* 但它可能正在等待来自操作系统的其他资源,例如处理器。运行
*/
RUNNABLE,
/**
* 等待监视器锁而阻塞的线程的线程状态。
* 处于阻塞状态的线程正在等待监视器锁进入同步块/方法,或者在调用后重新进入同步块/方法。阻塞
*/
BLOCKED,
/**
* 等待线程的线程状态。
* 线程处于等待状态,因为调用了一个以下方法:
* Object.wait、Thread.join、LockSupport.park。等待,死死的等待
*/
WAITING,
/**
* 具有指定等待时间的等待线程的线程状态。
* 线程处于定时等待状态,因为调用了以下方法与指定的正等待时间:
* Thread.sleep、Object.wait(long)、Thread.join(long)、LockSupport.parkNanos、LockSupport.parkUntil
* 超时等待
*/
TIMED_WAITING,
/**
* 终止线程的线程状态。
* 线程已完成执行。终止
*/
TERMINATED;
}

五.sleep和wait的区别

  1. sleep是Thread类的本地方法;wait是Object类的方法。
  2. sleep不释放锁;wait释放锁。
  3. sleep不需要和synchronized关键字一起使用;wait必须和synchronized代码块一起使用。
  4. sleep不需要被唤醒(时间到了自动退出阻塞);wait需要被唤醒。
  5. sleep一般用于当前线程休眠,或者轮循暂停操作;wait则多用于多线程之间的通信。
  6. sleep必须捕获异常,wait也需要捕获异常

tips:

  • 并发;多线程操作同一个资源类,把资源丢到线程中就行了
  • @FuncationalIterface 函数式接口 用lambda表达式 (参数)->{ 代码}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.yxq;

public class hhh {
public static void main(String[] args) {
Ticek ticek=new Ticek();
new Thread(()->{
for (int i = 0; i < 50; i++) {
ticek.sale();
}
},"A").start();

new Thread(()->{
for (int i = 0; i < 50; i++) {
ticek.sale();
}
},"B").start();

new Thread(()->{
for (int i = 0; i < 50; i++) {
ticek.sale();
}
},"C").start();
}
}
//资源类
class Ticek{
//属性,方法
private int number=50;

//给该方法加锁
public synchronized void sale(){
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了第"+
(number--)+"张票,还剩"+number+"张票");
}
}
}

一、Lock锁

公平锁和非公平锁(锁的底层)

公平锁:十分公平,不能插队。
非公平锁:十分不公平,可以插队。(默认非公平锁)

Lock锁是一个接口,他有三个实现类:

  • ReentrantLock类
  • ReentrantReadWriteLock.ReadLock
  • ReentrantReadWriteLock.WriteLock

什么是可重入锁

可重入锁是指同一个线程可以多次获得同一把锁而不会出现死锁。在Java中,synchronized关键字和ReentrantLock都是可重入锁的实现。

可重入性的意义:

当一个线程获得了一个锁,再次请求这个锁时仍然会得到该锁。这样的机制防止了死锁,也使得同一个线程可以多次进入它已经拥有的锁所同步的代码块。

示例代码:

使用synchronized关键字实现可重入锁:

1
2
3
4
5
6
7
8
9
javapublic class ReentrantExample {
public synchronized void outer() {
inner();
}

public synchronized void inner() {
// 这里可以访问同步的代码块
}
}

在这个例子中,outer() 方法和 inner() 方法都是同步方法,它们使用的是同一把锁,即this。如果一个线程已经获得了 outer() 方法的锁,那么在调用 inner() 方法时,由于它也是同步方法,所以不会因为同一把锁而发生阻塞。

Lock锁和synchronized的区别

  1. Synchronized是内置Java关键字;Lock是一个Java类。
  2. Synchronized无法判断获取锁的状态;Lock可以判断是否获取到了锁。(boolean b = lock.tryLock();)
  3. Synchronized会自动释放锁;Lock必须要手动释放锁,如果不释放锁,死锁。
  4. Synchronized线程1获得锁阻塞时,线程2会一直等待下去;Lock锁线程1获得锁阻塞时,线程2等待足够长的时间后中断等待,去做其他的事。
  5. Synchronized可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置)。
    lock.lockInterruptibly();方法:当两个线程同时通过该方法想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。
  6. Synchronized适合锁少量的代码同步问题;Lock适合锁大量的同步代码。

1)生产者消费者

生产者和消费者问题:synchronized版

如果使用if 代替while 会产生什么问题?

A、B、C、D四个线程!虚假唤醒问题(用if的时候)

虚假唤醒在下面的例子中就是:

  • A(生产者) 让货物加一,C(生产者) 让货物加一,A先获得该锁,C只能等待,当A释放锁时会通知BCD,如果是C拿到锁,他if判断已经结束了,所以会让货物再加一。如果是用while的话,就会再判断物品数量是不是等于0,就不会给C获取锁的机会
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class Demo04 {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
// 判断等待,业务,通知
class Data {
//i表示生产的物品的数量(数量为一,消费者可以消费,数量为0,生产者可以生产)
private int i = 0;
// +1
public synchronized void increment() throws InterruptedException {
while (i != 0) {
//这里表示现在货物不为0 暂时不需要生产者生产
this.wait();
}
i++;
System.out.println(Thread.currentThread().getName() + "=>" + i);
// 通知其他线程我+1完成
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
while (i==0){
//这里表示货物为0,需要等待生产者生产
this.wait();
}
i--;
System.out.println(Thread.currentThread().getName() + "=>" + i);
// 通知其他线程,我-1完毕
this.notifyAll();
}
}

生产者和消费者问题:JUC版(用lock来实现)

用**condition.await()**来 替换 this.wait()

用 **condition.signalAll()**来替换 this.notifyAll()

  • 还可以用condition来执行通知谁来接收锁 eg: A释放的锁,指定通知C(前提是C在等待通知),让C来获取该锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class Demo04 {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
// 判断等待,业务,通知
class Data {
private int i = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment() throws InterruptedException {
lock.lock();
try {
while (i != 0) {
condition.await();
}
i++;
System.out.println(Thread.currentThread().getName() + "=>" + i);
// 通知其他线程我+1完成
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (i==0){
condition.await();
}
i--;
System.out.println(Thread.currentThread().getName() + "=>" + i);
// 通知其他线程,我-1完毕
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

2)线程八锁问题

  1. synchronized作用于成员变量和非静态方法时,锁住的是对象的实例,即this对象
  2. synchronized作用于静态方法时,锁住的是Class实例
  3. synchronized作用于一个代码块时,锁住的是所有代码块中配置的对象,当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。

所谓的“线程八锁”其实就是考察synchronized锁住的是哪个对象

总结:

对象锁和类锁(this/class)是两个不同的对象,所以静态同步方法和非静态同步方法之间是不会有竞争条件的。但一旦静态方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁。

结论

  • 当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
  • 也就是说如果一个实例对象的普通同步方法获取锁后,该实例对象的其他普通同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的普通同步方法因为跟该实例对象的普通同步方法用的是不同的锁,所以无需等待该实例对象已获取锁的普通同步方法释放锁就可以获取他们自己的锁。
  • 静态同步锁 与 普通同步锁 互相不干扰
  • 所有的静态同步方法用的也是同一把锁–类对象本身,
  • 这两把锁(this/class)是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有静态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁。而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们是同一个类的实例对象

3)集合类不安全问题

1. ArryList集合

1
2
3
4
5
6
7
8
9
10
11
12
// java.util.ConcurrentModificationException:并发修改异常
public class Test11 {
public static void main(String[] args) {
List<String> strings = new ArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
strings.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(strings);
}).start();
}
}
}

出现并发修改异常原因

  1. 一个线程正在遍历列表,而另一个线程在同时修改列表,这可能导致遍历线程感知到列表的结构发生了变化,从而抛出 ConcurrentModificationException 异常。
  2. 多个线程同时在列表的末尾添加元素,这可能导致两个线程尝试修改列表的结构,从而发生竞态条件。

解决方案:

1
2
3
4
List<String> list = new Vector<>();//底层add()用了synchronized,是安全的
List<String> strings = Collections.synchronizedList(new ArrayList<>()); //和上面的这个类似
//推荐使用的是CopyOnWrite
List<String> strings = new CopyOnWriteArrayList<>(); //

概念:CopyOnWrite写入时复制,计算机程序设计语言的一种优化策略。(保证效率和性能问题)

  1. 初始化:
    • 初始时,CopyOnWriteArrayList 内部使用一个可变数组来存储元素。
  2. 写时复制:
    • 当有写操作(添加、修改、删除)发生时,不直接在当前数组上进行修改,而是先复制一份当前数组的副本。
    • 所有修改操作都在这个副本上进行,而原始数组保持不变。
  3. 修改副本:
    • 在副本上进行修改操作是非常高效的,因为它是一个普通的可变数组,没有锁或其他同步机制。
    • 所以,修改操作的复杂度是 O(1)。
  4. 替换原始数组:
    • 当修改完成后,将新的数组替换原始数组。
    • 这个替换操作是原子的,确保在替换过程中没有其他线程能够同时访问原始数组。
  5. 读操作:
    • 由于读操作不涉及数组的修改,因此可以直接在原始数组上进行,而不需要加锁。
    • 这使得读操作的性能很高,复杂度是 O(1)。

总结:CopyOnWriteArrayList 更适用于读多写少的场景,因为读操作不会堵塞,多个线程可以同时读取数据,而写操作则通过复制数组来避免并发修改的问题。

2. HashSet集合

多线程下不安全;可能会报错:java.util.ConcurrentModificationException(并发修改异常)

1
2
3
4
5
6
7
8
9
10
11
12
13
// java.util.ConcurrentModificationException:并发修改异常
public class Test11 {
public static void main(String[] args) {
// Set<String> strings = Collections.synchronizedSet(new HashSet<>());
HashSet<String> strings = new HashSet<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
strings.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(strings);
}).start();
}
}
}

解决方案:

1
2
1. Set<String> strings = Collections.synchronizedSet(new HashSet<>());
2. Set<String> strings = new CopyOnWriteArraySet<>();

hashset集合的底层是hashmap的key,因为hashmap的key是不可重复的。

1
2
3
public HashSet() {
map = new HashMap<>();
}

3.HashMap集合

多线程下不安全;可能会报错:java.util.ConcurrentModificationException(并发修改异常)

1
2
3
4
5
6
7
8
9
10
11
12
13
// java.util.ConcurrentModificationException:并发修改异常
public class Test11 {
public static void main(String[] args) {
// 默认相当于
Map<String, String> map = new HashMap<>(16, 0.75F);
for (int i = 0; i < 10; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
}).start();
}
}
}

什么是负载因子,什么是初始容量

  • 负载因子是一个表示哈希表占用程度的参数,通常用于衡量已经存储的元素数量与哈希表容量的比率,负载因子默认为 (0.75)
  • HashSet 在创建时需要指定一个初始容量,这是哈希表的大小。如果不提供初始容量,将使用默认的初始容量。在 HashSet 中,如果使用默认构造方法创建对象,**初始容量默认为 (16)**。

负载因子和初始容量的作用

  • 减少哈希冲突:
  • 动态调整哈希表大小:
  • 性能平衡

解决方案:

1
Map<String, String> concurrentHashMap = new ConcurrentHashMap<>();

ConcurrentHashMap 底层原理

  1. 分段锁设计(Segmentation):
    • ConcurrentHashMap 内部维护了一个分段数组(Segment),每个分段就是一个小的 HashMap,拥有独立的锁。这样,在大多数情况下,多个线程可以同时访问不同的分段,从而提高并发性能。
  2. 并发度(Concurrency Level):
    • ConcurrentHashMap 允许通过构造方法指定并发度,即分段的数量。这个并发度实际上就是 ConcurrentHashMap 内部分段数组的大小。在多线程环境下,不同的线程可以同时访问不同的分段,提高了并发性。
  3. 锁粒度降低:
    • 相比于使用单一锁的传统 HashMapConcurrentHashMap 的分段设计降低了锁的粒度。只有在对同一个分段进行修改操作时,才需要加锁,这样就减小了锁的竞争范围,提高了并发度。
  4. put 操作的原子性:
    • ConcurrentHashMapput 操作是原子性的,但不是整个表的锁,而是在单个分段上进行的。这使得多个线程可以在不同的分段上并发执行 put 操作,而不会相互阻塞。
  5. get 操作的无锁并发:
    • get 操作是无锁的,即可以在没有加锁的情况下并发执行。这是因为在获取元素时,并不涉及到对分段的修改操作,因此多个线程可以同时执行 get 操作。
  6. 同步机制:
    • ConcurrentHashMap 使用一种类似于读写锁的机制来保证在对数据进行修改时的同步,以及在读取数据时的并发性。这种机制在读操作上具有较低的开销,允许多个线程同时读取,而写操作会涉及到锁的竞争。
  7. 扩容机制:
    • ConcurrentHashMap 在扩容时,并不会对整个表进行扩容,而是只扩容其中的一部分分段。这样,扩容的开销相对较小,而且不会影响整个表的并发性能。

用自己的话简述就是:

ConcurrentHashMap 底层维护了一个分段数组,每个分段就是一个hashmap,拥有自己的独立的锁,所以安全,get操作是无锁的,多线程可以同时访问,多个线程可以在不同的分段上并发执行 put 操作,而不会相互阻塞。允许多个线程同时读取,而写操作会涉及到锁的竞争。

二 、Callable接口

Callable接口类似于Runnable接口,线程第三种创建方式。

  1. 可以抛出异常。
  2. 可以有返回值。
  3. 方法不同与Runnable接口。Call方法
  4. call()返回值类型就是callable的泛型
1
2
3
4
5
6
//Callable接口源码
@FunctionalInterface
public interface Callable<V> {

V call() throws Exception;
}
  • Thread()不能直接接受callable的实现类,只能接受Runable的实现类

  • FutureTask是Runable的实现类

  • FutureTask()可以由Callable接口的实现类来构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(new MyThread());// 适配类
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();// 打印一个Call,结果会被缓存,提高效率
Integer s = (Integer) futureTask.get();// get方法可能会产生阻塞
System.out.println(s);
}
}
class MyThread implements Callable<Integer>{
@Override
public Integer call(){
System.out.println("Call");
return 1024;
}
}

为什么上述代码只会输出一次call,明明有两个线程一起执行了MyThread

  • 这段代码只打印一个 “Call” 是因为 FutureTask 只会执行一次 MyThread 中的 call() 方法。虽然你在两个不同的线程中启动了同一个 FutureTask,但是 FutureTask 内部只会执行一次 call() 方法。
    • 当你创建了两个线程并分别使用同一个 FutureTask 时,这两个线程都会尝试执行 FutureTask 中 call() 方法,但实际上只有第一个线程成功执行了 call() 方法,而第二个线程并不会执行,因为 FutureTask 内部已经缓存了 call() 方法的结果。
    • 这是因为 FutureTask 内部的状态在执行一次后就被设定为 “Done”,结果被缓存起来了。因此,尽管你尝试启动多个线程去执行同一个 FutureTask,但实际上只有第一个线程会执行 call() 方法,而后续线程会直接获取缓存的结果,不会再次执行 call() 方法,因此只会打印一个 “Call”。

简述:

  • 多个线程启动同一个FutureTask,FutureTask 内部只会执行一次Callable实现类的里面的方法,并且将返回值缓存起来,下次在调用FutureTask,直接返回返回值。

三、常用辅助类(必须掌握)

1.CountDownLatch

应用场景:

  1. 多线程任务汇总。

  2. 多线程任务阻塞住,等待发令枪响,一起执行。

每次有线程调用,数量-1,当计数器归零,countDownLatch.await()就会被唤醒向下执行。

如果计数器没有归零,就是一直阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6,必须要是执行任务的时候使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"=>Go Out");
countDownLatch.countDown();// 数量-1
}).start();
}
countDownLatch.await();// 等待计数器归零,然后再往下执行
System.out.println("关门");
}
}

image-20231206171336224

2.CyclicBarrier

应用场景:比如LOL类游戏,满10人一组,开始游戏。 少一个都不行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 相当于加法计数器
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 集齐七颗龙珠召唤神龙
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {// 如果计数器为7,线程只有6个,则会等待,不进行召唤神龙
System.out.println("召唤神龙");
});
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集" + temp + "个龙珠!");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}

image-20231206171558811

3.Semaphore (信号量)

image-20231208160348688

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量:停车位!限流
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
semaphore.acquire();// 得到
System.out.println(Thread.currentThread().getName()+"抢到车位!");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();// 释放
}
}).start();
}
}
}

image-20231208161146225

原理:
semaphore.acquire();获得,假设已经满了则等待,等待其他线程释放。
semaphore.release();释放,会将当前的信号量释放+1,然后唤醒等待的线程。

四.读写锁ReadWriteLock

ReadWriteLock接口有一个实现类ReentrantReadWriteLock类。

作用:

读可以被多个线程同时读,写的时候只能有一个线程去写

image-20231208170739895

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 独占锁(写锁):一次只能被一个线程占有
* 共享锁(读锁):多个线程可以同时占有
* ReentrantLock:
* 读-读:可以共存
* 读-写:不可以共存
* 写-写:不可以共存
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
// 5个线程写
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(temp + "", temp + "");
}, String.valueOf(i)).start();
}
// 5个线程读
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp + "");
}, String.valueOf(i)).start();
}
}
}
class MyCacheLock {
private volatile Map<String, Object> map = new HashMap<>();
// 读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 写,同时只有一个线程写
public void put(String key, Object obj) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入");
map.put(key, obj);
System.out.println(Thread.currentThread().getName() + "写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
// 读,所有线程都可以读
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}

读写锁ReadWriteLock和lock的区别

  1. 用途和粒度:
    • Lock接口提供了一般的锁定机制,支持对资源的独占式访问。这意味着在任意时刻,只有一个线程可以持有该锁。
    • ReadWriteLock接口扩展了 接口,提供了更细粒度的控制。它引入了读锁和写锁的概念,允许多个线程同时持有读锁,但只允许一个线程持有写锁。Lock
  2. 并发性:
    • Lock是互斥锁,同一时刻只能有一个线程持有该锁,其他线程需要等待。这可能导致性能瓶颈,特别是在读多写少的场景下。
    • ReadWriteLock在读多写少的场景中性能更好,因为多个线程可以同时持有读锁,只有在写操作时才需要互斥。
  3. 锁的类型:
    • Lock可以被实现为独占锁( 就是一个独占锁的实现)。ReentrantLock
    • ReadWriteLock包含两种锁:读锁和写锁。读锁可以由多个线程同时持有,但写锁是独占的。
  4. 锁的获取:
    • Lock使用 和 方法来获取和释放锁。lock()``unlock()
    • ReadWriteLock使用``readLock().lock(),writeLock().lock() 和 来获取和释放锁readLock().unlock(),writeLock().unlock()`。

五.队列

1.BlockingQueue阻塞队列

image-20231208175427657

四组API

方式 抛出异常 有返回值,不抛出异常 阻塞,一直等待 阻塞,超时等待
添加 add() offer() put() offer(,,)
移除 remove() pull() take() pull(,)
检测队首元素 element() peek() - -

注意

  • 只有element()为空会抛异常,peak()为空就返回null
  • 超时阻塞的话pull()会返回会返回null
  • 用BlockingQueue要设置阻塞队列的大小
1
BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3)//设置长度为3的阻塞队列

2.SynchronousQueue同步队列

进去一个元素,必须等待取出这个元素后,才能放下一个元素。

  • take()取元素
  • put()放入元素

六. 线程池

3大方法、7大参数、4大拒绝策略

1. 线程池的优势

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2 线程池的使用

线程池的真正实现类是 ThreadPoolExecutor

  • 线程使用完毕,必须关闭
  • threadPool.execute()执行线程池

线程池的使用流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE,
TimeUnit.SECONDS,
sPoolWorkQueue,
sThreadFactory);
// 向线程池提交任务
threadPool.execute(new Runnable() {
@Override
public void run() {
... // 线程执行的任务
}
});
// 关闭线程池
threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
threadPool.shutdownNow(); // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
  • corePoolSize(必需):核心线程数。默认情况下,核心线程会一直存活,但是当将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
  • maximumPoolSize(必需):线程池所能容纳的最大线程数。当活跃线程数达到该数值后,后续的新任务将会阻塞
  • keepAliveTime(必需):线程闲置超时时长。如果超过该时长,非核心线程就会被回收。如果将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
  • unit(必需):指定 keepAliveTime 参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
  • workQueue(必需):任务队列。通过线程池的 execute() 方法提交的 Runnable 对象将存储在该参数中。其采用阻塞队列实现。
  • threadFactory(可选):线程工厂。用于指定为线程池创建新线程的方式。(有一个默认的线程池工厂)
  • handler(可选):拒绝策略。当达到最大线程数时需要执行的饱和策略。AbortPolicy(默认)

3.线程池的工作原理

img

4 线程池的参数

4.1 任务队列(workQueue)

任务队列是基于阻塞队列实现的,即采用生产者消费者模式,在 Java 中需要实现 BlockingQueue 接口。但 Java 已经为我们提供了 7 种阻塞队列的实现:

  1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。
  2. LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,在未指明容量时,容量默认为 Integer.MAX_VALUE
  3. PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现 Comparable 接口也可以提供 Comparator 来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。
  4. DelayQueue:类似于PriorityBlockingQueue,是二叉堆实现的无界优先级阻塞队列。要求元素都实现 Delayed 接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
  5. SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用 take() 方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用 put() 方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
  6. LinkedBlockingDeque: 使用双向队列实现的有界双端阻塞队列。双端意味着可以像普通队列一样 FIFO(先进先出),也可以像栈一样 FILO(先进后出)。
  7. LinkedTransferQueue: 它是ConcurrentLinkedQueue、LinkedBlockingQueue 和 SynchronousQueue 的结合体,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行为一致,但是是无界的阻塞队列。

注意有界队列和无界队列的区别:

如果使用有界队列,当队列饱和时并超过最大线程数时就会执行拒绝策略;而如果使用无界队列,因为任务队列永远都可以添加任务,所以设置 maximumPoolSize 没有任何意义。

4.2 线程工厂(threadFactory)

线程工厂指定创建线程的方式,需要实现 ThreadFactory 接口,并实现 newThread(Runnable r) 方法。该参数可以不用指定,Executors 框架已经为我们实现了一个默认的线程工厂。

1
Executors.defaultThreadFactory()//工厂参数要传参就用这个就行,

4.3 拒绝策略 (handler)

当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现 RejectedExecutionHandler 接口,并实现 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法。不过 Executors 框架已经为我们实现了 4 种拒绝策略

  • AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:由调用线程处理该任务。(哪来的回哪去)
  • DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。
  • DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。

eg:(其他拒绝策略类似)

CallerRunsPolicy使用用例:

1
new ThreadPoolExecutor.CallerRunsPolicy()//创建一个新的对象

结果就是:

main线程创建的线程池,在线程池中被拒绝以后该线程被退回了main线程

image-20231209153846785

5. 功能线程池

img

6.线程池总结

Executors 的 4 个功能线程池虽然方便,但现在已经不建议使用了,而是建议直接通过使用ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

其实 Executors 的 4 个功能线程有如下弊端:

  • FixedThreadPoolSingleThreadExecutor:主要问题是堆积的请求处理队列均采用LinkedBlockingQueue,可能会耗费非常大的内存,甚至 OOM
  • CachedThreadPoolScheduledThreadPool:主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM
  • OOM是Out Of Memory的缩写,来源于java.lang.OutOfMemoryError这个常见的报错,一般来说是Java堆中分配对象的空间不足

线程池所能容纳的最大线程数应该如何设置?

  1. CPU密集型:最大线程数,CPU几核的就是几,可以保持CPU效率最高。
1
Runtime.getRuntime().availableProcessors();//获取当前电脑的核数
  1. IO密集型:判断程序中十分耗IO的线程数量,大于这个数,一般是这个数的两倍。

七、四大函数式接口

函数式接口:只有一个方法的接口。

  1. Function<T, R>:
    • 方法:R apply(T t)
    • 描述:接收一个类型为T的参数,返回一个类型为R的结果。
  2. Predicate
    • 方法:boolean test(T t)
    • 描述:接收一个类型为T的参数,返回一个boolean类型的结果。常用于过滤、判断等场景。
  3. Consumer
    • 方法:void accept(T t)
    • 描述:接收一个类型为T的参数,没有返回值。常用于对元素的处理,如输出、修改等。
  4. Supplier
    • 方法:T get()
    • 描述:不接收任何参数,返回一个类型为T的结果。常用于提供数据的场景,如工厂方法。

eg:

Function<T, R>:接口使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

public static void main(String[] args) {

/**
* 这是用函数式接口的实现
* new Function<String, Integer>()
* String是传入参数类型,Integer是返回值类型
*/
Function<String, Integer> function = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
System.out.println(s);
return 2023;
}
};

//这是lamda表达式优化版本
Function function = s -> {
System.out.println(s);
return 2023;
};
System.out.println(function.apply("str")) };

八. Stream流运算实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ajva45 {
public static void main(String[] args) {
//题目要求:
//有五个用户
//筛选
//1. id为偶数
//2. 年龄必须大于23岁
//3. 用户名转为大写字母
//4. 逆序排序
// 5. 只输出一个用户
User u1=new User(1,"a",21);
User u2=new User(2,"b",22);
User u3=new User(3,"c",23);
User u4=new User(4,"d",24);
User u5=new User(5,"e",26);
List<User> userList = Arrays.asList(u1, u2, u3, u4, u5);

//stream流
userList.stream()
.filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>23;})
.map(u->{u.setName(u.getName().toUpperCase());return u;})
.sorted((uu1,uu2)->{return uu2.getName().compareTo(uu1.getName());})
.limit(1)
.forEach(u->{System.out.println(u);});
}
}

九. Fork/Join 框架

1、什么是Fork/Join 框架?

Fork/Join 是一种并行计算模式,用于解决可以被分解成更小的可并行任务的问题。该模式包含两个关键操作

  • Fork(分解)

  • Join(合并)

2. Fork/Join 框架的工作原理

  1. 任务分解(Fork):如果大任务到达设定的阈值,就分解为符合条件的若干个小任务
  2. 任务执行。
  3. 任务合并(Join):父任务会收集子任务的结果汇总。
  4. 工作窃取(Work Stealing):如果有线程执行完自己的任务,会偷窃其他线程的任务队列里面的任务。

4、工作窃取是如何实现的?

(1)工作线程维护本地任务队列:// 维护线程本地队列

(2)空闲工作线程窃取任务:窃取其他工作线程的队列里面的任务,(一般是窃取队列末尾的)

(3)工作线程之间共享任务队列:(每个工作线程维护的队列都是双端队列)

(4)工作线程调度与任务窃取:

5、Java 中 Fork/Join 框架工作窃取核心类分析

Fork/Join 框架的工作窃取是通过一些关键类和算法实现的,包括 ForkJoinTask、ForkJoinPool、WorkQueue 和 Deque。下面是对这些类和算法的简要源码分析:从核心类可以探究实现原理

(1)ForkJoinTask:定义线程池执行的任务,自定义实现很重要

ForkJoinTask 是 Fork/Join 框架中任务的基类。它定义了任务的执行和拆分方法,以及任务的状态和结果等信息。
ForkJoinTask 的子类可以通过实现 compute() 方法来定义任务的具体逻辑
ForkJoinTask 还提供了 fork() 和 join() 方法来实现任务的拆分和合并操作。

(2)ForkJoinPool:

ForkJoinPool 是 Fork/Join 框架的核心类,用于管理工作线程和任务的执行。
ForkJoinPool 内部维护了一个池化的工作线程集合,每个工作线程都是 ForkJoinWorkerThread 的实例。
ForkJoinPool 提供了 execute() 和 invoke() 等方法来提交任务,并将任务分配给工作线程执行。
ForkJoinPool 还使用了一些调度算法和策略,如工作窃取和工作线程的扩展。

(3)WorkQueue:

WorkQueue 是 ForkJoinWorkerThread 的内部类,代表工作线程的任务队列。
每个工作线程都有自己的 WorkQueue,用于存储待执行的任务。
WorkQueue 使用了一种双端队列(Deque)的数据结构,通过数组来实现任务的存储和窃取操作。

6、Fork/Join 框架的使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// 继承 RecursiveTask 来创建可分解的任务
class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10; // 任务的阈值
private int[] array;
private int start;
private int end;

public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算结果
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 否则,将任务拆分为更小的子任务
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);

// 并行执行子任务
leftTask.fork();
rightTask.fork();

// 合并子任务的结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();

// 返回最终结果
return leftResult + rightResult;
}
}
}

public class ForkJoinExample {
public static void main(String[] args) {
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

// 创建 ForkJoinPool 实例
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

// 创建任务实例
SumTask task = new SumTask(array, 0, array.length);

// 提交任务给 ForkJoinPool 执行
int result = forkJoinPool.invoke(task);
System.out.println("Sum: " + result);
}
}

小结

这个框架就是分治策略的体现,分而治之。将一个大问题拆分为一个个小问题来解决,充分利用多核的优势。Fork/Join 框架最适合处理计算密集型的任务,其中任务的执行时间较长,可以充分利用并行性提高计算效率。对于I/O密集型任务或需要频繁进行I/O操作的任务,Fork/Join 框架可能不是最佳选择,因为它的主要优势在于处理并行计算。

十. CompletableFuture

1.使用原因

  • CompletableFuture是由Java 8引入的,在Java8之前我们一般通过Future实现异步。Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,不支持设置回调方法,Java 8之前若要设置回调一般会使用 Google guava的 ListenableFuture,回调的引入又会导致臭名昭著的回调地狱。
  • CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理异步计算结果,同时还支持通过函数式编程的方式对各类操作进行组合编排,并且一定程度上解决了回调地狱的问题。

回调地狱:简单来说就是回调函数中嵌套回调函数的情况就叫做回调地狱,回调地狱就是为是实现代码顺序执行而出现的一种操作,它会造成我们的代码可读性非常差,后期不好维护。

  CompletableFuture实现了两个接口:Future、CompletionStage。Future表示异步计算的结果,CompletionStage 用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。

注意:

已下的测试用例里面的线程池为自定义的线程池。通过测试方法传参传入测试方法。

如果不指定线程池的话会使用默认的线程池。就是forkjoin维护的forkjoincommon 线程池

1
2
3
4
5
6
7
8
//自定义的线程池
ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(
3,
5,
4, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

2.异步回调

1
2
3
4
5
6
7
8
public static void testCompleteCallback() throws Exception{
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 开始执行");
return 1;
}).whenComplete((result, exception) -> {
System.out.println("所有任务执行完成,返回结果: " + result);
});
}

3.多任务串行化执行

串行化执行多个异步任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void testCompleteCallback( ThreadPoolExecutor threadPoolExecutor) throws Exception{

CompletableFuture.supplyAsync(() -> { //
System.out.println(Thread.currentThread().getName()+" 开始执行");
return 1;
},threadPoolExecutor).thenApply(integer -> {
System.out.println(Thread.currentThread().getName()+" 接收到上一个任务的处理结果为:"+integer);
return integer+1;
}).thenApply(integer -> {
System.out.println(Thread.currentThread().getName()+" 接收到上一个任务的处理结果为:"+integer);
return integer+10;
}).whenComplete((result,exception)->{
System.out.println("所有任务执行完成,返回结果: "+result);
}).exceptionally(e-> {
e.printStackTrace();
return null;
});
}

执行结果:

这里卖个关子,至于这个为什么会出现main线程和线程池线程都出现的情况。看看Async异步执行与线程池这里我会说。

image-20231210112317450

说明:

  • supplyAsync:开始异步任务,方法需要一个 Supplier函数,无输入,有输出
  • thenApply:上一个异步任务执行完之后,拿到返回结果作为入参,继续执行新任务,方法需要Function函数,有输入,有输出
  • whenComplete: 所有任务执行完成之后的回调,入参中有返回结果和异常信息;
  • exceptionally:专门处理异常信息的回调方法;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void testSerialExe2()  throws Exception{

CompletableFuture.runAsync(() -> { //runAsync: 无输入、无输出
System.out.println(Thread.currentThread().getName()+" exe completableFuture1...");
},executorService).thenAccept(integer -> { //thenAccept: 有输入、无输出
System.out.println(Thread.currentThread().getName()+" 接收到上一个任务的处理结果为:"+integer);
}).thenRun(() -> { //thenRun: 无输入、无输出
System.out.println(Thread.currentThread().getName()+" 执行");
});


//thenCompose :连接两个 CompletableFuture
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+" cf1...");
return 1;
}).thenCompose(c -> {
return CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+" cf3...");
return c+3;
});
}).whenComplete((r,e)-> {
System.out.println(r);
});
}

说明:

  • runAsync:开始异步任务,无输入,无输出;
  • thenAccept:继续执行异步任务,可以拿到上一个任务的执行结果。有输入、无输出
  • thenRun:无输入,无输出
  • thenCompose:与thenAccept 类似,区别在于thenCompose用于连接两个CompletableFuture

4. Async异步执行与线程池

1) 线程情况

我们看一下CompletableFuture的执行线程情况。

CompletableFuture实现了CompletionStage接口,通过丰富的回调方法,支持各种组合操作,每种组合场景都有同步和异步两种方法。

同步方法(即不带Async后缀的方法)有两种情况:

  • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。情况一
  • 如果注册时被依赖的操作还未执行完,则由回调线程执行。情况二

异步方法(即带Async后缀的方法)

  • 可以选择是否传递线程池参数Executor运行在指定线程池中;当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。

同步方法(即不带Async后缀的方法)有两种情况:代码演示

(这里只是展示了部分代码)

代码1->>>演示情况一

1
2
3
4
5
6
7
CompletableFuture.supplyAsync(() -> {	//
System.out.println(Thread.currentThread().getName()+" 开始执行");
return 1;
},threadPoolExecutor).thenApply(integer -> {
System.out.println(Thread.currentThread().getName()+" 接收到上一个任务的处理结果为:"+integer);
return integer+1;
});

结果:

image-20231210113502752

代码2->>>演示情况二

1
2
3
4
5
6
7
8
9
10
11
12
CompletableFuture.supplyAsync(() -> {	//
System.out.println(Thread.currentThread().getName()+" 开始执行");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
},threadPoolExecutor).thenApply(integer -> {
System.out.println(Thread.currentThread().getName()+" 接收到上一个任务的处理结果为:"+integer);
return integer+1;
})

结果

image-20231210113719007

2) 异步回调要传线程池

  前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离

5.多任务并行化执行

首先看下两个异步任务并行化执行的场景:

1)thenAcceptBoth 和 thenCombine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void testTwoTaskExe()  throws Exception{

CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+"cf1...");
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+"cf2...");
return 2;
});
CompletableFuture<Void> voidCompletableFuture = cf1.thenAcceptBoth(cf2, (resultA, resultB) -> {
System.out.println();
});
CompletableFuture<Integer> stringCompletableFuture = cf1.thenCombine(cf2, (resultA, resultB) -> {
System.out.println(resultA+resultB);
return resultA+resultB;
});
}

thenAcceptBoththenCombine 都是等待两个CompletionStage 任务执行完成后,拿到两个返回结果后统一处理;唯一的区别是thenAcceptBoth 本身无有返回值,thenCombine 有返回值;

2)anyof 和allof

多个异步任务 并行化执行: anyof, allof

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void testParallelExe()  throws Exception{
//准备两个异步任务 CompletableFuture
CompletableFuture<Integer> completableFuture0 = CompletableFuture.supplyAsync(() -> {
doWork();
System.out.println("exe completableFuture0....");
return 0;
});
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
doWork();
System.out.println("exe completableFuture1....");
return 1;
});
//anyOf:任意一个任务执行成功就返回,并且可以拿到返回结果
CompletableFuture.anyOf(completableFuture0, completableFuture1)
.thenAccept(integer -> {
System.out.println("其中一个任务执行成功,执行结果为:"+integer);
});
//allOf:所有任务都执行成功才能继续执行 (返回值没有直接提供所有异步结果)
CompletableFuture.allOf(completableFuture0,completableFuture1)
.thenAccept(r->{
System.out.println("所有异步任务执行成功");
});
}

  anyOf()表示只要任意一个异步任务执行成功,就进行下一步,allOf() 表示必须全部的异步任务执行成功,才能进行下一步,这些组合操作可以实现非常复杂的异步流程控制。

  上面 allOf 的示例,在thenAccept 的方法入参中并不能拿到所有异步任务的执行结果,CompletableFuture 的设计中并没有直接提供,要想获取所有异步任务的返回结果,还需要编写一些额外代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void testParallelExe()  throws Exception{
//获取所有异步任务的返回结果
List<CompletableFuture<Integer>> completableFutures =
Arrays.asList(completableFuture0, completableFuture1);

CompletableFuture
.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
.thenApply(v->{//所有任务执行完毕,获取返回结果
Stream<Integer> integerStream = completableFutures.stream().map(CompletableFuture::join);
return integerStream.collect(Collectors.toList());
}).thenAccept(v->{ //拿到所有返回结果
v.forEach(System.out::println);
});
}

注:Java9中的 CompletableFuture还添加了completeOnTimeoutorTimeout 等关于超时的方法,方便对超时任务的处理。

6.超时处理

Java9开始支持CompletableFuture 的超时处理操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void testTimeout() throws Exception{
//completeOnTimeout:超过指定时间后,返回给定的值,不抛异常
CompletableFuture.supplyAsync(() -> {
doWork();
System.out.println(Thread.currentThread().getName() + " 开始执行");
return 1;
},executorService).completeOnTimeout(-1, 100L, TimeUnit.MILLISECONDS)
.whenComplete((result, exception) -> {
System.out.println("所有任务执行完成,返回结果: " + result);
});
//orTimeout:超过指定时间后,抛异常 TimeoutException
CompletableFuture.supplyAsync(() -> {
doWork();
System.out.println(Thread.currentThread().getName() + " 开始执行");
return 1;
},executorService).orTimeout(10L, TimeUnit.MILLISECONDS)
.whenComplete((result, exception) -> {
System.out.println("所有任务执行完成,返回结果: " + result);
if(exception!=null) exception.printStackTrace();
});
}

7.总结

常用API方法总结:

方法 描述 输入输出
supplyAsync 表示开始执行异步任务 无输入,有输出
runAsync 表示开始执行异步任务 无输入,无输出
thenAccept 上一个任务执行完成后继续执行新任务,可接收上一个任务的处理结果 有输入,无输出
thenRun 上一个任务执行完成后继续执行新任务 无输入,无输出
thenApply 上一个任务执行完成后继续执行新任务 有输入,有输出
thenCompose 同上,与thenApply的区别在于他的入参是CompletableFuture, 用于串行连接两个CompletableFuture 有输入,有输出
thenAcceptBoth 两个任务都执行完成,可以得到两个返回结果,本身无返回值 有输入,无输出
thenCombine 同上,区别在于它本身有返回值; 有输入,有输出
whenComplete 所有任务执行完成后或发生异常时调用 有输入,无输出
exceptionally 当发生异常时调用 有输入,有输出
anyof 用于并行执行异步任务,任意一个任务执行成功就返回,并且可以拿到返回结果
allOf 用于并行执行异步任务,所有任务都执行成功才能继续执行
completeOnTimeout 超过指定时间后,返回给定的值,不抛异常
orTimeout 超过指定时间后,抛异常

十一.JMM

什么是JMM

内存模型可以理解为在特定的操作协议下,对特定的内存或者高速缓存进行读写访问的过程抽象描述,不同架构下的物理机拥有不一样的内存模型,Java虚拟机是一个实现了跨平台的虚拟系统,因此它也有自己的内存模型,即Java内存模型(Java Memory Model, JMM)

JMM结构规范

JMM规定了所有的变量都存储在主内存(Main Memory)中。每个线程还有自己的工作内存(Working Memory),线程的工作内存中保存了该线程使用到的变量的主内存的副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的变量(volatile变量仍然有工作内存的拷贝,但是由于它特殊的操作顺序性规定,所以看起来如同直接在主内存中读写访问一般)。不同的线程之间也无法直接访问对方工作内存中的变量,不同线程之间值的传递都需要通过主内存来完成。

JMM的作用范围

在java中,所有实例域、静态域和数组元素存储在堆内存中,堆内存在线程之间共享(本文使用“共享变量”这个术语代指实例域,静态域和数组元素)。局部变量(Local variables),方法定义参数(java语言规范称之为formal method parameters)和异常处理器参数(exception handler parameters)不会在线程之间共享,它们不会有内存可见性问题,也不受内存模型的影响。

image-20231211203709344

1)JMM三大特性

1. 原子性

一个操作不能被打断,要么全部执行完毕,要么不执行。在这点上有点类似于事务操作,要么全部执行成功,要么回退到执行该操作之前的状态。

基本类型数据的访问大都是原子操作,long 和double类型的变量是64位,但是在32位JVM中,32位的JVM会将64位数据的读写操作分为2次32位的读写操作来进行,这就导致了long、double类型的变量在32位虚拟机中是非原子操作,数据有可能会被破坏,也就意味着多个线程在并发访问的时候是线程非安全的。

如果需要原子性

可以考虑使用 synchronized 关键字或 java.util.concurrent 包中提供的原子类(例如 AtomicInteger)。这些机制可以确保一系列操作作为一个原子单元执行。

2. 可见性

一个线程对共享变量做了修改之后,其他的线程立即能够看到(感知到)该变量的这种修改(变化)。

Java内存模型是通过将在工作内存中的变量修改后的值同步到主内存,在读取变量前从主内存刷新最新值到工作内存中,这种依赖主内存的方式来实现可见性的。

无论是普通变量还是volatile变量都是如此,区别在于:volatile的特殊规则保证了volatile变量值修改后的新值立刻同步到主内存,每次使用volatile变量前立即从主内存中刷新,因此volatile保证了多线程之间的操作变量的可见性,而普通变量则不能保证这一点。

除了volatile关键字能实现可见性之外,还有synchronized,Lock,final也是可以的。

3.有序性

在本线程内观察,操作都是有序的;如果在一个线程中观察另外一个线程,所有的操作都是无序的。前半句是指“线程内表现为串行语义(WithIn Thread As-if-Serial Semantics)”,后半句是指“指令重排”现象和“工作内存和主内存同步延迟”现象。

Java提供了两个关键字volatilesynchronized来保证多线程之间操作的有序性**,volatile关键字本身通过加入内存屏障来禁止指令的重排序,而synchronized关键字通过一个变量在同一时间只允许有一个线程对其进行加锁**的规则来实

一个最经典的例子

银行汇款问题,一个银行账户存款100,这时一个人从该账户取10元,同时另一个人向该账户汇10元,那么余额应该还是100。那么此时可能发生这种情况,A线程负责取款,B线程负责汇款,A从主内存读到100,B从主内存读到100,A执行减10操作,并将数据刷新到主内存,这时主内存数据100-10=90,而B内存执行加10操作,并将数据刷新到主内存,最后主内存数据100+10=110,显然这是一个严重的问题,我们要保证A线程和B线程有序执行,先取款后汇款或者先汇款后取款,此为有序性。

2)volatile关键字

  1. 可见性。(可见性和JMM挂钩)
  2. 不保证原子性。
  3. 禁止指令重排。

volatile原理

“观察加入volatile关键字和没有加入volatile关键字时所生成的汇编代码发现,加入volatile关键字时,会多出一个lock前缀指令”

lock前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供3个功能:

1)它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;

2)它会强制将对缓存的修改操作立即写入主存;

3)如果是写操作,它会导致其他CPU中对应的缓存行无效。

使用volatile关键字的场景

synchronized关键字是防止多个线程同时执行一段代码,那么就会很影响程序执行效率,而volatile关键字在某些情况下性能要优于synchronized,但是要注意volatile关键字是无法替代synchronized关键字的,因为volatile关键字无法保证操作的原子性。

我的理解就是需要保证操作是原子性操作,才能保证使用volatile关键字的程序在并发时能够正确执行。

3)重排序

在执行程序时为了提高性能,编译器和处理器常常会对指令做重排序。重排序分三种类型:

image-20231211211057848

十二. 单例模式

1.饿汉式单例模式

1
2
3
4
5
6
7
8
9
10
11
/**
* 饿汉式单例模式,一加载就创建单例对象
*/
public class Hungry {
private Hungry(){
}
private static final Hungry hungry = new Hungry();
public static Hungry getInstance(){
return hungry;
}
}

2.懒汉式单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
这种方式下,单例实例在第一次被请求时才会被创建。
需要注意多线程情况下的线程安全性问题。示例代码如下:
*/

public class Singleton {
private static Singleton instance;

private Singleton() {
// 私有构造函数
}

public static synchronized Singleton getInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
}

3.双重检测锁单例模式DCL

相对比较安全创建单例的模式,但是反射可以破坏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 懒汉式单例模式
*/
public class LazyMan01 {
private LazyMan01(){

}
// 加上volatile关键字禁止指令重排
private volatile static LazyMan01 lazyMan01;
/**
* 双重检测锁模式,DCL懒汉式
* @return
*/
public static LazyMan01 getInstance(){
if (lazyMan01==null){
synchronized(LazyMan01.class){
if (lazyMan01==null){
lazyMan01 = new LazyMan01();// 不是原子性操作,可能会有指令重排
/*
* 1.分配内存空间
* 2.执行构造器,初始化对象
* 3.把对象指向内存空间
* 正常情况下:按照123执行;发生指令重排,可能会先执行132
* 多线程情况下:如果第一个线程执行了13,此时第二个线程过来可能就会判断lazyMan01不为空,直接就返回了lazyMan01
* 此时,lazyMan01对象内存是空的。
* */
}
}
}
return lazyMan01;
}
}

4.静态内部类(Static Inner Class)

这种方式利用了类加载机制和类初始化的线程安全性,保证了在需要时才加载并初始化单例实例。示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Singleton {
private Singleton() {
// 私有构造函数
}

private static class SingletonHolder {
private static final Singleton instance = new Singleton();
}

public static Singleton getInstance() {
return SingletonHolder.instance;
}
}

5.枚举来创建单例

使用枚举创建单例是一种简单且线程安全的方式。在Java中,枚举类型的实例在加载时就会被初始化,而且由于枚举类的特殊性,能够防止通过反射和序列化破坏单例。以下是使用枚举创建单例的示例代码:

1
2
3
4
5
6
7
8
9
public enum Singleton {
INSTANCE;
// 这里可以添加其他方法和属性

public void doSomething() {
System.out.println("Singleton instance is doing something.");
}
}

在这个例子中,Singleton 是一个枚举类型,其中 INSTANCE 是该枚举的一个实例。这个实例在枚举加载时被创建,保证了线程安全性。
使用时,你可以通过 Singleton.INSTANCE 来访问这个单例实例,并调用其方法:

1
Singleton.INSTANCE.doSomething();

优势和注意事项:

1.线程安全: 由于枚举的特性,创建的单例实例是线程安全的。
2.防止反射攻击: 枚举类型的实例在类加载时就被创建,因此不容易受到反射攻击。
3.防止序列化问题: 默认的枚举序列化机制是安全的,防止了通过序列化和反序列化破坏单例的问题。
4.简洁明了: 枚举方式创建单例非常简洁,不需要手动编写大量的代码。

请注意,虽然枚举方式创建单例在许多情况下是一种很好的选择,但并不适用于所有情况。例如,如果你的单例需要延迟初始化或者具有复杂的构造函数,可能需要考虑其他实现方式。

十三. CAS

1.CAS是什么

比较并交换:compare and swap !在多线程环境下尽量不要用i++,要用getAndIncrement()

1
2
3
4
5
6
7
8
9
10
11
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger=new AtomicInteger(5);
// 期望,更新:public final boolean compareAndSet(int expect, int update)
// 如果我期望的值达到了(5)就更新,否则不更新。
System.out.println(atomicInteger.compareAndSet(5, 2019));//true
System.out.println(atomicInteger);//2019
System.out.println(atomicInteger.compareAndSet(5, 1024));//false
System.out.println(atomicInteger);//2019
}
}

用图解释一下

两个工作线程(其实都是main线程的)凑合看看

img

2. CAS底层原理

CAS全称是Compare-And-Swap,它是一条CPU并发原语,它的功能是判断内存某个位置是否为预期值如果是则更改为新的值,这个过程是原子的。原语是操作系统的语言是由若干条指令组成的,用于完成某一个功能的过程,原语执行必须是连续的不能被打断也就是说CAS操作是原子的所以atomicInteger.getAndIncrement()能保证线程安全。那为什么CAS原子操作就能保证不被中断呢这涉及到了底层的一些东西Unsafe类中的compareAndSwapInt,是一个本地方法该方法位于unsafe.cpp中其中的Atomic::cmpxchg(x,addr,e)只要有Atomic那他就不可被中断一定是原子的,x是新值和addr上之前的值e作比较如果相等则替换不然就不换。

3.CAS的缺点

  1. 循环会耗时。
  2. 一次性只能保证一个共享变量的原子性。
  3. 存在ABA问题。

什么是ABA问题

简而言之就是(狸猫换太子)

“ABA”问题:假设t1线程工作时间为10秒,t2线程工作时间为2秒,那么可能在A的工作期间,主内存中的共享变量 A已经被t2线程修改了多次,只是恰好最后一次修改的值是该变量的初始值,虽然用CAS判定出来的结果是期望值,但是却不是原来那个了—->“狸猫换太子”

4.原子引用解决ABA问题

(乐观锁)版本号+1

只要发现版本号不对,就说明数据修改过,我就不用。

十四 . 死锁

死锁:线程A持有锁A,想要获得锁B;线程B持有锁B,想要获得锁A。

解决方法:查看堆栈信息

  1. 使用jps -l命令查看进程号。(该命令在JDK的bin目录下)
  2. 使用jstack+进程号,找到死锁问题。