JUC
1、什么是JUC?
JUC是java.util.concurrent包的简称,在Java5.0添加,目的就是为了更好的支持高并发任务。在 Java 5.0 提供了 java.util.concurrent
(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文中的 Collection 实现等。
看一下api中的包:
2、线程与进程
一个进程包含多个线程(至少包含一个)
java默认有几个线程?答案是2个,一个是main一个是GC垃圾回收。
Java开启线程的三种方式:
Thread,继承Thread类
start()方法的作用是启动一个分支线程,在JVM中开辟一个新的栈空间。只要新的栈空间,myThread.start()瞬间就结束了,线程就启动成功了。启动成功的线程会自动调用run方法,并且run方法在分支线的栈底部。
创建一个新的类继承自Thread类,并重写run()方法。
通过调用新类的实例的start()方法来启动线程。
实现Runnable接口
创建一个实现了Runnable接口的新类,并实现其run()方法。
将此类的实例作为参数传递给Thread类的构造函数,然后调用Thread对象的start()方法来启动线程。
Callable
Callable接口类似于Runnable,不同之处在于Callable的方法可以返回结果并且可以抛出异常。
实现Callable接口的类必须实现call()方法而不是run()方法。
使用FutureTask包装Callable对象,然后将FutureTask作为参数传递给Thread类的构造函数,最后调用Thread对象的start()方法来启动线程。
可以通过FutureTask的get()方法获取异步计算的结果。
所以,Java真的能开启线程吗?
当然不行,Java只不过是调用本地方法了,真正是操作系统开的
public synchronized void start() {//同步方法
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
//把当前线程加入到线程组
group.add(this);
boolean started = false;
try {
start0();//最终调用的是这个方法
started = true;//线程启动把状态变成true
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//调用的是本地方法,是底层的C++,Java无法直接操作硬件
private native void start0();
3、并发和并行
并发(Concurrency) 并发是指在同一时间段内处理多个任务的能力。这些任务可能看起来像是同时执行的,但实际上它们是由操作系统或运行时环境调度的。在一个核心上处理多个任务,但是是一个个执行的,只要我切换任务处理的快就像是同时执行,快速交替(只要我快你就发现不了)。并发编程的本质就是最大化利用CPU资源。 特点:
多个任务可以在同一时间段内交替执行。
操作系统负责调度和切换任务。
主要目的是提高系统的响应性和效率。
并行(Parallelism) 并行是指同时执行多个任务的能力。这些任务真正地同时运行在不同的处理器核心上。正儿八经的多核多处理任务。 特点:
多个任务真正地同时执行。
需要多核处理器的支持。
主要目的是提高计算速度和性能。
来看一下你的电脑cpu多少核心,输出多少就是多少
public class Test1 {
public static void main(String[] args) {
//获取本机cpu核心数
//CPU密集型和IO密集型来看线程池大小的设置
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
4、线程的几个状态
1、新建状态(New) 线程对象被创建但尚未启动。例如,当使用new Thread()创建一个新的线程对象时,这个线程就处于新建状态。 2、可运行状态(Runnable) 线程对象的start()方法被调用之后,线程进入可运行状态。这意味着线程已经准备好运行,但是它何时运行取决于操作系统的线程调度器。 3、运行状态(Running) 线程获得了CPU的时间片并正在执行。需要注意的是,由于操作系统调度的原因,线程可能很快又回到可运行状态。和可运行态可以算是一个。 4、阻塞状态(Blocked) 线程因为某种原因放弃了CPU使用权而暂时停止运行。阻塞状态可以分为多种情况: 等待阻塞:线程执行了wait()方法或其他等待方法,如join(),等待其他线程唤醒。 同步阻塞:线程试图获取一个已经被其他线程锁定的对象锁。 其他阻塞:如执行sleep()方法,或者发出了I/O请求等。 5、等待状态(Waiting) 线程处于无限期等待状态,等待其他线程执行特定的动作。例如,调用Object.wait()方法会使线程进入等待状态,直到另一个线程调用notify()或notifyAll()方法。 6、计时等待状态(Timed Waiting) 线程处于有限期等待状态。例如,调用Thread.sleep(long millis)或Object.wait(long timeout)方法会使线程进入计时等待状态,在指定的时间过后自动恢复到可运行状态。 7、终止状态(Terminated) 线程执行完毕或者因为异常退出了run()方法,该线程的生命期结束。
源码中的
5、wait和sleep
对比一下不同:
Thread.sleep(long millis)
所属类:Thread类中的方法。
作用:使当前正在执行的线程暂停执行指定的时间(毫秒),并将线程的状态设置为TIMED_WAITING。
锁状态:调用sleep()方法不会释放对象的锁,即使当前线程持有锁也不会释放。抱着锁睡就不放。
使用范围:可以在任何地方调用。
异常:如果当前线程被中断,sleep()方法会抛出InterruptedException,它必须要捕获异常。
Object.wait()
所属类:Object类中的方法。
作用:使当前线程等待,直到其他线程调用notify()或notifyAll()方法唤醒它,或者等待指定的时间后自动唤醒。
锁状态:调用wait()方法会释放对象的锁。
使用范围:只能在同步方法或同步代码块中调用,否则会抛出IllegalMonitorStateException。
异常:同样,如果当前线程被中断,wait()方法也会抛出InterruptedException。
6、Lock锁
6.1、Lock 接口
Lock接口位于java.util.concurrent.locks包中,它提供了一种显式的锁机制,允许更细粒度的锁控制。它有三个实现类分别是:ReentrantLock (可重入锁),ReentrantLockReadWriteLock(可重入读写锁),ReentrantLockReadWriteLock.WriteLock 方法:
void lock():获取锁,如果锁已被其他线程持有,则当前线程将等待,直到它能够获取锁。
void unlock():释放锁。
boolean tryLock():尝试获取锁,如果锁不可用,则立即返回false,而不是让线程等待。
boolean tryLock(long time, TimeUnit unit):尝试获取锁,如果锁不可用,则等待一段时间,超过指定等待时间后仍未获取到锁则返回false。
ReentrantLock 类
ReentrantLock是Lock接口的一个实现,它支持重入性,即一个线程可以多次获取同一个锁。
公平锁与非公平锁:ReentrantLock可以配置为公平锁或非公平锁。公平锁按照线程请求锁的顺序来分配锁,而非公平锁则可能会让后来的线程先获得锁。
可中断的锁获取:使用lockInterruptibly()方法可以使得线程在等待锁的过程中可以被中断。
6.2、可重入锁ReentrantLock
可重入锁支持重入性,能对共享资源重复加锁,即当前线程获得该锁之后再次获取不会被阻塞。
需要保证:
在线程获取锁的时候,如果已经获得锁的线程就是当前线程的话直接再次获取成功;
锁被获取n次,那么只能被释放n次之后才能算是完全释放。
ReentrantLock 源码分析(非公平锁)
先创建一个ReentrantLock 对象
Lock lock = new ReentrantLock();
查看一下ReentrantLock 的构造器,看到这个是非公平锁的实现
public ReentrantLock() {
sync = new NonfairSync();
}
再点开NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 执行锁操作。尝试立即抢占锁,如果失败则退回到正常的获取过程。
*/
final void lock() {
if (compareAndSetState(0, 1))// 尝试将状态从 0 改为 1,即尝试获取锁
// 如果成功,设置当前线程为独占所有者
setExclusiveOwnerThread(Thread.currentThread());
else//如果失败,调用 acquire(1) 继续尝试获取锁
acquire(1);
}
//tryAcquire这个受保护的最终方法,返回一个布尔值,表示是否成功获取锁。参数 acquires 表示获取锁的次数
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//acquire方法用于在独占模式下获取锁,并且忽略中断。
public final void acquire(int arg) {
//1.尝试获取锁:调用 tryAcquire(arg) 尝试获取锁。如果成功,则方法结束。
//2.加入等待队列:如果 tryAcquire 失败,则调用 addWaiter(Node.EXCLUSIVE) 将当前线程加入等待队列。
//3.排队等待:调用 acquireQueued 方法,使当前线程在队列中等待,直到获取锁成功。
//4.中断处理:如果在等待过程中被中断,则调用 selfInterrupt 恢复中断状态。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
再看一下tryAcquire方法中调用的有nonfairTryAcquire方法(这里是非公平锁的实现方法),用于处理实现非公平锁的获取逻辑。
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取锁的状态
int c = getState();
if (c == 0) {//为0就表示还没人持有锁
if (compareAndSetState(0, acquires)) {//CAS操作把锁的状态从0设置为传进来的acquires
setExclusiveOwnerThread(current);//设置当前线程为独占线程
return true;
}
}//state不为0就代表有人持有锁
else if (current == getExclusiveOwnerThread()) {//看看是不是自己获得的这个锁
int nextc = c + acquires;//如果是自己的就把state增加acquires个数
if (nextc < 0) // overflow溢出检查
throw new Error("Maximum lock count exceeded");// 抛出异常,表示最大锁计数超出
setState(nextc);//设置当前已经更新过的state
return true;
}
return false;//有人持有锁了,而且锁还不是自己持有的那就不能获得锁返回false
}
获取锁的逻辑是以上这些,下面是释放锁的逻辑
protected final boolean tryRelease(int releases) {
int c = getState() - releases;// 获取当前状态并减去释放次数
if (Thread.currentThread() != getExclusiveOwnerThread()) // 检查当前线程是否是锁的持有者
throw new IllegalMonitorStateException();//如果不是就抛出非法监控异常
boolean free = false;//记录锁是否完全释放
if (c == 0) {//锁数量被减完了
free = true;//设置中间标记变量为true
setExclusiveOwnerThread(null);//设置独占锁线程为空
}
setState(c);//更新state数
return free;
}
需要注意的是锁的同步状态state必须清零才能算是完全释放。
支持公平锁
ReentrantLock 支持公平锁和非公平锁的,刚才是非公平锁的呃获取锁逻辑,下面看一下公平锁的获取锁逻辑。
公平锁和非公平锁的切换就在于ReentrantLock 的构造器上,传参写true就是公平锁,不写或者false就是非公平
来看一下公平锁版的获取锁逻辑
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&// 检查是否有前驱节点在等待队列中
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
和非公平锁的主要区别就是获取的检查中多个是否有前驱节点在等待队列,如果有的话根据FIFO先来后到有比当前线程更早来请求资源的,根据公平性当前线程获取锁就会失败,如果当前节点没有前驱节点才走后面的流程。
ReentrantLock 的使用
ReentrantLock 的使用方式与 Synchronized 关键字类似,都是通过加锁和释放锁来实现同步的。
public class ReentrantLockDemo {
private static final ReentrantLock lock = new ReentrantLock();
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
结果是20000
与 传统的Synchronized 的对比
ReentrantLock 是类,而Synchronized 是Java的关键字
ReentrantLock 可以实现多路选择通知(可以绑定多个Condition),而 synchronized 只能通过 wait 和 notify/notifyAll 方法唤醒一个线程或者唤醒全部线程(单路通知);
可重入性:两者都支持可重入性,即一个线程可以多次获取同一个锁。
非阻塞尝试:Lock接口提供了tryLock()方法,允许非阻塞地尝试获取锁,而synchronized总是阻塞的。
锁的共享:ReentrantLock支持读写锁(通过ReentrantReadWriteLock),而synchronized只支持独占锁。
中断响应:Lock可以通过lockInterruptibly()方法响应中断信号,而synchronized则不能直接响应中断。
锁释放:Lock需要显式地调用unlock()方法来释放锁,而synchronized会在作用域结束时自动释放锁。
ReentrantLock在高竞争环境下的性能更好
演示:先不加任何锁,就三个线程去抢票,30个票
/**
* 现实中的多线程开发,线程是一个单独的资源类,它没有任何的附属操作
* 需要的定义:
* 1、 属性和方法
*/
public class SaleTicketDemo01 {
public static void main(String[] args) {
//并发是多个线程操作同一个资源类,把资源类放到线程里
Ticket ticket = new Ticket();
//多个线程,使用lambda表达式
new Thread(()->{
for (int i = 1; i < 40; i++) {
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i = 1; i < 40; i++) {
ticket.sale();
}
},"B").start();
new Thread(()->{
for (int i = 1; i < 40; i++) {
ticket.sale();
}
},"C").start();
}
}
//资源类
class Ticket{
//属性 方法
private int number = 30;
//方法是卖票
public void sale(){
if (number > 0){
System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"剩余:"+number);
}
}
}
这个时候卖的就不对
使用Synchronized 在卖票的方法上添加关键字,保证同一时间只能有一个线程能执行买票的方法,保证线程安全。
//资源类
class Ticket{
//属性 方法
private int number = 30;
//方法是卖票添加synchronized关键字,本质相当于线程排队
public synchronized void sale(){
if (number > 0){
System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"剩余:"+number);
}
}
}
这样就没有问题
看一下synchronized 的底层实现原理
基本概念锁的就是这俩,一个是对象一个是类 对象锁:用于同步方法或同步代码块。 类锁:用于静态同步方法或同步代码块。
监视器锁(Monitor Lock) synchronized关键字的底层实现基于监视器锁(Monitor Lock)。监视器锁是一种高级锁,它由JVM实现,主要用于实现互斥访问。
锁的状态 无锁状态:对象没有任何锁。 偏向锁:偏向锁是最轻量级的锁,适用于没有竞争的情况。 轻量级锁:适用于有少量竞争的情况。 重量级锁:适用于有大量竞争的情况。
锁升级 偏向锁 → 轻量级锁 → 重量级锁:随着竞争程度的增加,锁会逐步升级。
具体实现 4.1 偏向锁 定义:偏向锁是一种针对无竞争情况下的优化,它假定锁定的大部分情况是没有竞争的。 实现:JVM会在对象头中记录锁偏向的线程ID,如果该线程再次尝试获取锁,则可以直接使用。 4.2 轻量级锁 定义:轻量级锁使用线程栈上的Mark Word来实现。 实现:当线程尝试获取锁时,会将自己的线程ID写入对象头的Mark Word中。如果发现Mark Word中已经有其他线程ID,则尝试使用CAS(Compare and Swap)操作获取锁。 4.3 重量级锁 定义:重量级锁是基于操作系统互斥量实现的锁。 实现:当轻量级锁无法获取时,会升级为重量级锁,此时会挂起当前线程,并将对象头的Mark Word指向一个重量级锁对象(Monitor对象)。
监视器锁(Monitor)的实现 监视器锁(Monitor)的具体实现包括以下几个方面: 1、对象头(Object Header) Mark Word:存储对象的元数据,包括锁状态、线程ID等。 Class Metadata Address:指向类元数据的指针。 Array Length(数组对象才有):数组长度。
当线程尝试获取锁的时候会检查对象头的Mark Word,如果Mark Word为空或者说指向了当前的线程就会直接获取到锁,如果Mark Word使用了其他的线程就尝试使用CAS操作获取锁。
当线程离开同步方法或者同步代码块的时候就释放,然后就会更新对象头的Mark Word。
2、Monitor 对象 Entry List:等待进入锁的线程队列。 Wait Set:等待线程集合,当线程调用wait()方法时会被放入这里。 Owner:持有锁的线程。 Condition Objects:条件对象集合,用于实现wait()和notify()。
使用lock进行操作:
public class SaleTicketDemo02 {
public static void main(String[] args) {
//并发是多个线程操作同一个资源类,把资源类放到线程里
Ticket ticket = new Ticket();
//多个线程,使用lambda表达式
new Thread(()->{
for (int i = 1; i < 40; i++) {
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i = 1; i < 40; i++) {
ticket.sale();
}
},"B").start();
new Thread(()->{
for (int i = 1; i < 40; i++) {
ticket.sale();
}
},"C").start();
}
}
//资源类
class Ticket1{
//属性 方法
private int number = 30;
//方法是卖票
public void sale(){
//1 先创建lock实例
Lock lock = new ReentrantLock();
//2 加锁
lock.lock();
try {
//3 业务逻辑
if (number > 0){
System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"剩余:"+number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4 释放锁
lock.unlock();
}
}
}
总结:
Synchronized 是Java内置的关键字,lock是接口有三种实现类;
Synchronized 无法判断锁的状态,它是自动挡的,一个线程锁用完了就给下一个了,而lock是手动的也能判断锁的状态,你lock了就必须在finally中手动unlock释放,你要是不放那就死锁。
Synchronized 场景,如果有一个线程获得了锁之后噶了,阻塞了,那么后面的线程就会一直等到你这个锁释放才行,就会一直等。而lock就不会,lock可以使用tryLock进行 尝试获取锁,如果锁不可用,则立即返回false,而不是让线程等待,还有带时间参数的tryLock方法,如果要不到锁就等待你传进去的参数时间,超过时间就返回false过期不候。
Synchronized 和lock都有可重入锁,Synchronized 是不能中断的它是非公平锁机制。lock的实例ReentrantLock也是默认用的非公平锁机制,但也能设置为公平锁,传个参数true就开启公平锁了
Synchronized 更适合少量的代码同步问题,lock适合锁定大量同步代码。
7、那么锁到底是什么?锁的又是什么?
问题小例子:
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(phone::sendSms,"A").start();
new Thread(phone::call,"B").start();
}
}
class Phone{
public synchronized void sendSms(){
System.out.println("发消息");
}
public synchronized void call(){
System.out.println("打电话");
}
}
问题1:此时线程A和B哪一个会先执行? 答案是:谁先拿到锁谁就先走,A先拿到锁就是先发消息,再走B就是打电话 锁的对象是方法的调用者,也就是在main方法中new的phone对象
问题2:把发消息方法执行过程中让线程睡眠2秒谁又会先输出?
答案还是,A线程的发消息先输出,因为A先拿到锁,即使它执行过程中睡了一会,锁又没放,B拿不到锁就没法去执行打电话方法。
问题3:在phone类中再定义一个没有加锁的方法,然后让B线程中调用,谁先输出?
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(phone::sendSms,"A").start();
new Thread(phone::hello,"B").start();
}
}
class Phone{
public synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发消息");
}
public synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("hello");
}
}
答案是:B线程的没加锁的方法先输出,它没加锁,当然能为所欲为,又不需要跟A抢这个锁,新赛道。
问题4:在main方法中再new一个新的phone对象,A线程调用的发消息方法有睡眠,B线程使用phone2对象调用打电话方法,谁先输出?
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(phone::sendSms,"A").start();
new Thread(phone2::call,"B").start();
答案不用想的是打电话先走,因为A延时了2秒,但是注意:两个对象就是两把锁了,如果A线程调用的方法没有被睡眠,那么他俩谁先输出还真不一定
问题5:在两个方法前加上static变成静态方法,使用同一个phone对象调用谁会先执行?
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
//Phone phone2 = new Phone();
new Thread(() -> phone.sendSms(), "A").start();
new Thread(() -> phone.call(), "B").start();
}
}
class Phone{
public static synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发消息");
}
public static synchronized void call(){
System.out.println("打电话");
}
}
答案是A的发消息先执行,此时锁的对象锁的是Phone.class这个类,synchronized锁的对象是方法的调用者,static静态方法类一加载就有了,锁的是cLass。
问题6:同样的静态方法,使用两个对象来分别调用,谁先输出?
答案还是先发消息,无论你使用多少个对象调用不同方法,锁的是类,还没到具体的实例对象,俩对象的class类模板只有一个,锁的是class,实例再多都会被锁。
问题7:现在让发消息是静态同步方法,让打电话是普通的同步方法,同一个对象谁快?
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
//Phone phone2 = new Phone();
new Thread(() -> phone.sendSms(), "A").start();
new Thread(() -> phone.call(), "B").start();
}
}
class Phone{
public static synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发消息");
}
public synchronized void call(){
System.out.println("打电话");
}
}
答案是B线程的打电话先出,因为这个时候是两把锁,一个是类锁就是发消息那个,一个是锁的调用者B线程的打电话,你锁class关我具体实例什么事,把class锁了发消息等待2秒,B线程又不需要类锁,它是另一个锁就先输出了。
问题8:还是发消息是静态同步方法,让打电话是普通的同步方法,但是不同对象谁快?
答案不用想的,当然还是打电话先输出,还是一个道理,调用发消息锁定的是class,调用打电话锁的是调用者对象,不一样,当然不睡眠的打电话先输出了。
列出来解释一下:
这里锁的具体作用 静态方法 sendSms 的锁:
所有调用 sendSms 的线程都会锁定 Phone 类的类对象(即 Class 对象)。
如果线程 A 调用 sendSms,则其他线程调用 sendSms 也会被阻塞,直到线程 A 释放锁。
但是,其他线程调用 call 方法(即使是同一个实例)也不会被阻塞,因为 call 方法锁定的是实例对象,而不是类对象。
实例方法 call 的锁:
所有调用同一个 Phone 实例的 call 方法的线程都会锁定该实例。
如果线程 B 调用 phone.call(),则其他线程调用 phone.call() 也会被阻塞,直到线程 B 释放锁。
但是,如果存在另一个 Phone 实例 phone2,线程 C 调用 phone2.call() 不会被阻塞,因为它们锁定的是不同的实例对象。
小结
锁是什么?
在多线程编程中,锁是一种同步机制,用于控制多个线程对共享资源的访问。锁可以防止多个线程同时执行某些关键代码段,从而避免数据竞争和不一致的情况。
锁的是什么?
类或实例对象
8、生产者和消费者
线程之间也需要通讯,一个线程操作结束之后通知给另一个线程,另一个线程才能开始操作资源,线程交替的执行操作同一个资源。
8.1、使用synchronized实现
package com.zm.juc.produceConsumer;
import java.util.concurrent.locks.ReentrantLock;
/*
* 生产者消费者问题
* */
public class T1 {
public static void main(String[] args) {
Resource resource = new Resource();
new Thread(()->{
for (int i = 0; i < 5; i++) {
try {
resource.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
try {
resource.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
//等待 业务 通知
class Resource{
private int num =0;
//+1
public synchronized void increment() throws InterruptedException {
if (num !=0) {
this.wait(); //线程等待,等到num是0为止
}
num++;
System.out.println(Thread.currentThread().getName()+"=>"+num);
this.notify();//通知其他的线程我加1操作结束了
}
//-1
public synchronized void decrement() throws InterruptedException {
if (num ==0) {
this.wait(); //线程等待,等到num是1为止
}
num--;
System.out.println(Thread.currentThread().getName()+"=>"+num);
this.notify();//通知其他的线程我-1操作结束了
}
}
线程AB交替执行
A对num加1操作了之后就唤醒其他线程也就是B,B开始对num-1,-1结束了之后开始唤醒其他线程也就是A,这样交替执行,那么再来俩线程C和D呢?
多执行几次就有离谱的出来了原来该C对0加1的,被D拿去-1了,你说D减一完了按照顺序的话应该是A来加个1,结果A没加一直到D减到-10才被唤醒,,,,这个就是虚假唤醒
怎么防止这个虚假唤醒?把判断条件从if换成while,用if的时候我们判断了当前条件i不等于0,然后线程wait会释放锁,并发情况之下其他的线程就能拿到锁就能操作数据了,假如另一个加一的线程拿到了锁if条件只会判断一次,就能执行下面的加1操作了,执行完了就通知其他人这个时候原本等待的那个线程拿到锁又加1,那数据就不是正确的了。但是使用while就不会,while会进行第二次的判断,条件不符合线程还是会被等待的。
例如:假设当前number==1,C获得了锁,由于number!=0,C wait。后面B获得了锁,将number变为0,A获得了锁,然后number变为1,之后C醒来,不会判断当前number的值,那C就理所当然的往下走加1了,那number就=2了
换成while就正常了按照ABCD轮流加减
8.2、使用Lock实现
lock的实例有一个叫Condition newCondition()的,它有一个方法是await()
是接口Condition中的方法,它能够代替原来的wait,它的另一个方法signal()
就是notify的代替。
Condition
取代了对象监视器方法的使用。
使用lock演示
package com.zm.juc.produceConsumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*
* 生产者消费者问题
* */
public class T2 {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
//等待 业务 通知
class Data {
private int num =0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
/* Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();*/
//+1
public void increment() throws InterruptedException {
lock.lock();
try {
while (num !=0) {
condition.await(); //线程等待,等到num是0为止
}
num++;
System.out.println(Thread.currentThread().getName()+"=>"+num);
condition.signalAll();//通知其他的线程我加1操作结束了
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//-1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (num == 0) {
condition.await(); //线程等待,等到num是0为止
}
num--;
System.out.println(Thread.currentThread().getName() + "=>" + num);
condition.signalAll();//通知其他的线程我加1操作结束了
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
结果虽然没错,但是ABCD不是按照顺序生产消费的,我们想A->B->C->D,可以使用多个条件对象condition
示例:
package com.zm.juc.produceConsumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*
* 生产者消费者问题
* */
public class T3 {
public static void main(String[] args) {
Data1 data = new Data1();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printA();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printB();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printC();
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printD();
}
},"D").start();
}
}
//等待 业务 通知
class Data1 {
//1A 2B 3C 4D
private int num =1;
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
Condition conditionD = lock.newCondition();
public void printA(){
lock.lock();
try {
while (num !=1){
conditionA.await();
}
num = 2;
System.out.println(Thread.currentThread().getName()+"==AAA");
//精准指定唤醒B
conditionB.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
while (num !=2){
conditionB.await();
}
num = 3;
System.out.println(Thread.currentThread().getName()+"==BBB");
//精准指定唤醒C
conditionC.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
while (num !=3){
conditionC.await();
}
num = 4;
System.out.println(Thread.currentThread().getName()+"==CCC");
//精准指定唤醒D
conditionD.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printD(){
lock.lock();
try {
while (num !=4){
conditionD.await();
}
num = 1;
System.out.println(Thread.currentThread().getName()+"==DDD");
//精准指定唤醒A
conditionA.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
按照ABCD顺序进行
9、集合类不安全
9.1、List不安全
先使用最常用的ArrayList
public class ListIsNotSecure {
public static void main(String[] args) {
//先创建一个最常用的ArrayList,在高并发情况下它是不安全的
ArrayList<String> list = new ArrayList<>();
//启动10个线程
for (int i = 1; i <=10; i++) {
new Thread(()->{
//生成uuid后转字符串截取前5个字符添加到list列表中
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i));
}
}
}
结果会报错
会发生并发修改异常,表示在迭代过程中坚持到了结构上的更改(就是我们这里的遍历中又add),一般就是在遍历集合(ArrayList或者HashMap)的同时又进行了集合修改,为了保证迭代过程中数据的一致性抛出这个异常,想要解决可以使用并发集合类。
1 把ArrayList换成Vector
public class ListIsNotSecure {
public static void main(String[] args) {
//先创建一个最常用的ArrayList,在高并发情况下它是不安全的
//ArrayList<String> list = new ArrayList<>();
Vector<String> list = new Vector<>();
//启动10个线程
for (int i = 1; i <=10; i++) {
new Thread(()->{
//生成uuid后转字符串截取前5个字符添加到list列表中
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
这样就没有出现并发修改异常,是因为Vector集合的add方法是使用了synchronized,也就是加锁,多个线程在同一时刻只能有一个线程能修改集合,不会照成数据不一致,但是由于synchronized是独占锁,这会导致性能下降
2 将ArrayList转化为synchronizedList
public class ListIsNotSecure {
public static void main(String[] args) {
//先创建一个最常用的ArrayList,在高并发情况下它是不安全的
// List<String> list = new ArrayList<>();
//1 List<String> list = new Vector<>();
List<String> list = Collections.synchronizedList(new ArrayList<>());
//启动10个线程
for (int i = 1; i <=10; i++) {
new Thread(()->{
//生成uuid后转字符串截取前5个字符添加到list列表中
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
这样一转换也是线程安全的,是因为它有内部加锁机制,Collections.synchronizedList 方法返回一个包装了原始 ArrayList 的同步列表。这个同步列表在每个修改方法调用时都会自动加锁,确保同一时间只有一个线程可以修改列表。这种方式虽然保证了线程安全,但由于每次修改还是都需要获取独占锁,可能会导致性能瓶颈。
3 使用更高效的并发集合 CopyOnWriteArrayList写入时复制
public class ListIsNotSecure {
public static void main(String[] args) {
//先创建一个最常用的ArrayList,在高并发情况下它是不安全的
// List<String> list = new ArrayList<>();
//1 List<String> list = new Vector<>();
//2 List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>();
//启动10个线程
for (int i = 1; i <=10; i++) {
new Thread(()->{
//生成uuid后转字符串截取前5个字符添加到list列表中
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
这样不光是线程安全的,而且效率是更高的,因为它的add方法用的是lock锁,为什么效率会更高?
写时复制(Copy-on-Write)机制:
每次添加元素时,CopyOnWriteArrayList 不直接修改现有的数组,而是创建一个新的数组并将新元素添加进去。
这意味着读操作不会受到写操作的影响,因为读操作总是读取旧的数组。
读操作不加锁:
读操作(比如 get)不需要获取锁,因为它们总是从旧的数组中读取数据。
这使得读操作非常高效,不会因为写操作而被阻塞。
写操作加锁:
写操作(如 add)确实需要加锁,但这种锁只影响写操作本身。
写操作的开销主要在于创建新的数组,而不是锁本身。
9.2、HashSet不安全
普通的HashSet也是线程不安全的
public class SetIsNotSecure {
public static void main(String[] args) {
Set<String> list = new HashSet<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
也会出现并发修改异常
我们new的HashSet其实new的是一个HashMap
这里调用的add方法其实是map的put,list中的元素作为键值,PRESENT是常量对象
如果插入数据时这个键值下没有数据则 map.put(e, PRESENT) 返回 null,表示插入成功。
如果插入数据时这个键值下有数据则map.put(e, PRESENT) 返回PRESENT也就是旧数据。
根据上面的List不安全能够知道,想要线程安全的Set要么转换一下要么使用线程安全的集合。
Set<String> list = Collections.synchronizedSet(new HashSet<>());
//或者
Set<String> list = new CopyOnWriteArraySet<>();
看到CopyOnWriteArraySet里面其实还是new了一个CopyOnWriteArrayList和上面的情况是一样的了。
9.3、Map
原始HashMap也是线程不安全的,new HashMap<>()等价于new HashMap<>(16,0.75);
进入HashMap查看一下构造方法
public HashMap(Map<? extends K, ? extends V> m) {
this.loadFactor = DEFAULT_LOAD_FACTOR;//默认加载因子为0.75f
putMapEntries(m, false);//批量添加条目
}
加载因子解释:
加载因子是衡量哈希表在何时进行扩容操作的标准,具体而言,当哈希表中的元素数量达到容量乘以加载因子时,就会触发扩容。加载因子的值通常在0到1之间,表示哈希表的填充程度。
为什么加载因子默认是0.75?
加载因子的选择是一个权衡。较小的加载因子会导致哈希表更快地达到扩容条件,但可能会浪费空间。较大的加载因子会减少扩容的频率,但可能导致更多的冲突。0.75是在效率和空间利用之间取得的平衡,通常能够提供较好的性能。
MIN_TREEIFY_CAPACITY常量:
MIN_TREEIFY_CAPACITY默认是64,它的作用是确定在什么情况下将链表转换为红黑树。具体来说,当某个桶(bucket)中的链表长度达到一定阈值,并且整个哈希表的容量大于或等于 MIN_TREEIFY_CAPACITY 时,链表会被转换为红黑树。
为什么需要使用这个常量?
性能优化:当链表长度较长时,查找操作的时间复杂度会退化为 O(n),这会影响性能。红黑树的时间复杂度为 O(log n),因此在链表长度较长时,转换为红黑树可以显著提高性能。
内存使用:红黑树比链表占用更多的内存,因此在哈希表容量较小时,不应该轻易转换为红黑树。
MIN_TREEIFY_CAPACITY 确保只有在哈希表容量较大时才会进行转换,以避免不必要的内存开销。
TREEIFY_THRESHOLD常量
TREEIFY_THRESHOLD默认是8,它代表当一个桶中的链表长度达到TREEIFY_THRESHOLD时,就会开始检查整个哈希表的容量是否大于等于上面的MIN_TREEIFY_CAPACITY,如果是,就把链表转换成红黑树。
UNTREEIFY_THRESHOLD:在扩容后,如果链表长度小于此阈值,则将红黑树转换回链表,默认值为 6
public class MapIsNotSecure {
public static void main(String[] args) {
Map<String, String> map = new HashMap<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName().toString(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
还是会发生并发修改异常
使用ConcurrentHashMap
上面的并发修改问题解决方法就是使用ConcurrentHashMap,为什么ConcurrentHashMap能够满足并发修改?
使用CAS + Synchronized
在 Java 8 及之后的版本中,ConcurrentHashMap 的实现进行了改进,不再使用分段锁。而是采用了更细粒度的锁机制,结合了 CAS(Compare and Swap)和 synchronized 关键字。
对于大多数操作(如 put 和 get),ConcurrentHashMap 使用 CAS 操作来保证原子性。
对于一些需要更复杂同步的场景(如扩容),则使用 synchronized 来保证线程安全
对于写操作:
当一个线程尝试插入一个新键值对时,它会首先计算键的哈希值,找到对应的桶(bucket)。
如果桶中只有一个节点或者没有节点,线程可以直接进行 CAS 操作插入新节点。
如果桶中有多个节点形成链表或红黑树,线程会锁定相应的节点进行修改。
对于读操作:
读操作是无锁的,线程可以直接读取数据,因为 ConcurrentHashMap 保证了读取操作的一致性和可见性。
10、Callable
Callable接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而,Runnable不返回结果,也不能抛出被检查的异常。 实现了Callable接口就得在实现类的泛型上标注返回类型,实现方法call返回值就得是标注的类型。
我们想要使用callable在线程中启动它是不能直接启动的,需要借助Runnable接口中的实现类FutureTask来实现。
FutureTask类中有俩构造方法,分别用于实现运行指定的Callable
或者Runnable
我们就是通过FutureTask(Callable<V> callable)
来执行给定的 Callable
为什么Thread类的start方法能直接启动实现了Runnable接口的对象却不能直接启动Callable的?
Callable设计的目的是为了提供能够返回结果的线程执行方式,Callable的call方法的返回值会被Future给捕获到,这样我们就能在主线程中获取到异步执行的结果,Thread本身是不能处理返回值的。当你创建了一个FutureTask实例并传入了Callable对象后,FutureTask会负责调用Callable的call方法并管结果,由于FutureTask实现了Runnable接口,所以能被Thread启动。
这种任务的包装就是“适配器模式”
适配器模式是一种设计模式,它允许现有类的接口与另一个不兼容的接口一起工作。
FutureTask相当于Callable合Thread之间的适配器,FutureTask把Callable的功能转化为了Runnable接口的形式。FutureTask实现了RunnableFuture接口
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread myThread = new MyThread();
FutureTask<String> futureTask = new FutureTask<>(myThread);//适配类
new Thread(futureTask,"A").start();
//获取到返回结果,可能会被阻塞,因为它要等返回结果,如果处理的是一个非常耗时间的就很容易阻塞
String s = futureTask.get();
System.out.println(s);
}
}
//使用callable接口
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("call方法:");
return "callable";
}
}
如果我再启动一个B线程,还是使用的同一个futureTask,call会被执行几次?
MyThread myThread = new MyThread();
FutureTask<String> futureTask = new FutureTask<>(myThread);//适配类
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
//获取到返回结果,可能会被阻塞,因为它要等返回结果,如果处理的是一个非常耗时间的就很容易阻塞
String s = futureTask.get();
System.out.println(s);
结果是只执行1次,因为FutureTask 设计为只能被执行一次。一旦开始执行,再次尝试启动同一个 FutureTask 会导致 IllegalStateException。为什么?看一下FutureTask里面的方法。
我们创建FutureTask对象的时候,它的构造器方法会设置两个变量callable和状态量state
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
state的几个状态:
NEW:初始状态,表示任务尚未开始执行。
RUNNING:任务正在执行中。
CANCELLED:任务已被取消。
DONE:任务已完成(无论成功还是失败)。
NEW代表新建状态,在Thread启动的时候会运行FutureTask的run方法,这个方法会检查state状态是否为NEW,线程第一次调用过了run方法了,state状态已经不是NEW了,所以就不会再执行第二次了。
public void run() {
//检查当前状态是否为NEW,如果不是NEW说明任务已经启动或者已经完成就直接返回
//UNSAFE.compareAndSwapObject是使用Unsafe类的原子操作来设置当前线程为runner,保证只有一个线程能够成为任务的实际执行者
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//调用传进来的callable的call方法
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)//如果状态ran是true就设置返回结果,最后能通过futureTask对象get出来
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
11、常用辅助类
11.1、CountDownLatch
CountDownLatch是一个并发工具类,它允许一个或者多个线程等待直到其他线程完成了一定数量的操作,CountDownLatch通过构造函数设置一个计数器,每次调用一个countDown方法时,计数器减一。所有调用了await()方法的线程会阻塞,直到计数器归零。
使用场景
某个线程需要在其他n个线程执行完毕后再向下执行
下面的例子中,main主线程作为最后一个线程执行,需要等待其他8个线程执行完再走。
多个线程并行执行同一个任务,提高响应速度
public static void main(String[] args) throws InterruptedException {
//总数设置为8
CountDownLatch countDownLatch = new CountDownLatch(8);
for (int i = 1; i <= 8; i++) {
new Thread(()->{
countDownLatch.countDown();
System.out.println("线程:"+Thread.currentThread().getName()+"减一");
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println("关门");
}
等到前面的8个线程执行完了,CountDownLatch计数器归零了,主线程countDownLatch.await();才会被唤醒执行。
11.2、CyclicBarrier
和CountDownLatch对应,是做加法,它可以让一组线程达到一个同步点之后再同时执行,可以用于多线程计算数据,最后合并计算结果的场景。
CyclicBarrier的特点:
可以重复使用:CyclicBarrier可以被重复使用,每次重复使用都需要重新调用reset()方法进行重置。
等待线程数可变:在创建CyclicBarrier的时候可以指定一个等待线程数,但是在等待的时候,如果有新的线程想要加入等待,可以通过await()方法加入,这使得CyclicBarrier的等待线程数可以动态变化。
示例集齐7颗龙珠召唤神龙:
创建一个CyclicBarrier对象,创建的时候由于CyclicBarrier的一个构造方法第二个参数是Runnable函数式接口,所以使用lambda表达式来输出我们集齐龙族的动作:输出召唤神龙
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("集齐了7颗龙珠,召唤神龙成功!");
});
for (int i = 1; i <= 7; i++) {
int temp = i;//由于在创建线程中无法直接获取到外面的循环变量i,所以在这里定义一个变量temp获取i
new Thread(()->{
System.out.println("收集了第:"+temp+"颗龙族");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
每一个线程在执行的时候都等待,等到有7个线程达到计数器的数量时就可以执行最后的结果
11.3、Semaphore
Semaphore是信号量,又叫信号灯,它能在多线程环境下协调各个线程,能够使有限的资源能被正确合理的使用。信号量在创建的初始设置许可值,这个值就代表同时间内能有多少线程能够操作资源。
Semaphore构造方法:
public Semaphore(int permits) {
sync = new NonfairSync(permits);//使用了非公平模式管理信号量,不保证获得信号量的顺序
}
使用非公平模式是为了在并发环境下有更好的吞吐量减少线程竞争的成本,性能更好。
public static void main(String[] args) {
//限制线程数量为3,3个停车位
Semaphore semaphore = new Semaphore(3);
//6个线程6个车
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try {
//抢车位,有空闲就能执行
semaphore.acquire();
System.out.println("第:"+Thread.currentThread().getName()+"号车正在停车<--");
TimeUnit.SECONDS.sleep(2);//停车2秒
System.out.println("第:"+Thread.currentThread().getName()+"号车离开车位-->");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();//释放位置
}
},String.valueOf(i)).start();
}
}
semaphore.acquire();获得信号量,如果没有就会等待
semaphore.release()释放信号量,信号量会恢复1个,那等待的线程就能拿到
这样semaphore就能多个共享资源互斥使用,并发限流,控制最大的线程数。
12、ReadWriteLock
读写锁是允许多个线程同时读取共享资源,但是在写的时候只能是一个线程独占锁,适合于读多写少的场景。
ReentrantReadWriteLock类实现的层次结构图
ReentrantReadWriteLock实现了ReadWriteLock接口,并实现了接口中的两个方法
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
一个是readLock(),一个是writeLock()对应着读锁和写锁,从表面上看读写锁是两把锁,但是他们其实是同一把锁的两个试图而已,线程分为两类:读线程和写线程。读和读不互斥,读和写互斥,写和写互斥。
从ReentrantReadWriteLock的构造方法中能看到读写锁使用的是同一个sync对象,sync会像互斥锁一样,分成非公平锁和公平锁策略。
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();//true就是公平,false就是非公平
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
12.1、state变量,锁的状态
跟互斥锁一样,读写锁也有state表示锁的状态,在可重入锁ReentrantLock 中state同步状态表示的是一个线程重复拿到锁的次数,而读写锁需要表示多个读线程和一个写线程的状态,对于只有一个int类型的同步状态,它是通过“按位切割”的方法把一个int类型的变量分成高16位和低16位两部分,高16位表示读,低16位表示写。
那么为什么只使用一个int变量表示读写状态而不是用两个int变量分别表示读写呢?
因为CAS原子性操作保证的是单个变量的原子性,但是对多个变量不适用。
当state=0的时候表示没有读线程也没有写线程,当state !=0的时候要么是读线程拿到了锁要么是写线程拿到了锁,然后再进一步确认到底是读还是写。
那么读写锁又是如何快速确定读和写各自的状态的呢?答案是通过位运算。
假如当前同步状态值为 X,写状态等于 X & 0x0000FFFF(将高16位全部抹去),读状态等于 X >>> 16 (无符号补0右移16位)。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
//获取读锁数量如果大于0表示当前是读线程获取锁
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
//获取写入锁被重入次数如果大于0表示当前是写线程持有锁
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
...
}
12.2、读写锁的获取与释放
写锁的获取
写锁其实就是一个可重入的排它锁,如果当前线程已经拿到过锁了就增加锁的状态state重入次数
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
/**
* Constructor for use by subclasses
*
* @param lock the outer lock object
* @throws NullPointerException if the lock is null
*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/**
* 获取写锁操作
如果读锁和写锁都没有被一个线程持有,则获取写锁并立即返回,将写锁持有计数设置为1。
如果当前线程已经持有写锁,那么持有计数将增加1,并且该方法立即返回。
如果该锁由另一个线程持有,那么当前线程将出于线程调度目的而被阻塞进入等待,直到获得写锁,此时写锁持有计数被设置为1
*/
public void lock() {
sync.acquire(1);//这里是sync对象调用了实现方法acquire
}
// Sync 实现的 AbstractQueuedSynchronizer 中获取独占锁的方法
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. 如果读计数非零或写计数非零且所有者是不同的线程,则失败。
* 2. 如果计数达到最大值,则失败。(这只有在count已经是非零的情况下才会发生。)
* 3.否则,如果该线程是可重入获取或队列策略允许,则该线程有资格获得锁。如果是,更新状态并设置所有者。
*/
Thread current = Thread.currentThread();//获取当前执行的线程
int c = getState();//获取当前同步状态
int w = exclusiveCount(c);//获取写入锁被重入次数
if (c != 0) {//同步状态不等于0说明锁已经被占有了
// (Note: if c != 0 and w == 0 then shared count != 0)
//c !=0 and w==0 当前锁被占用而且写入锁的重入次数是0,说明当前是读线程正在持有锁,直接返回false,为了保证写对读可见性,当前的写线程必须阻塞
//如果w!=0说明存在写锁,判断当前线程不是已获取写锁的线程,获取锁失败,当前线程被阻塞
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果低 16位满了表示超过了获取锁数量的最大值抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||//判断前面是否存在等待的写线程
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);//如果抢锁成功,把持有锁的线程更新为当前线程
return true;
}
if(c!=0) and w == 0,说明当前一定是读线程拿着锁,写锁一定拿不到,返回false当前线程阻塞。
if(c!=0) and w != 0,说明当前一定是写线程拿着锁,执行current!=getExclusiveOwnerThread()的判断看是不是当前线程获得的,发现ownerThread不是自己,返回false当前线程阻塞。
c!=0,w!=0,且current=getExclusiveOwnerThread()表示就是自己(当前线程)拿的写锁,才会走到if(w+exclusiveCount(acquires)> MAX_COUNT)。判断重入次数,重入次数超过最大值,抛出异常。因为是用state的低16位保存写锁重入次数的,所以MAX_COUNT是216。如果超出这个值,会写到读锁的高16位上。为了避免这种情形,这里做了一个检测。当然,一般不可能重入这么多次。
if(c==0),说明当前既没有读线程,也没有写线程持有该锁。可以通过CAS操作开抢了。
公平实现和非公平实现几乎一模一样,只是 writerShouldBlock() 分别被FairSync 和NonfairSync实现,公平锁会判断是否有在等待的写锁,而非公平锁则会直接抢锁。
写锁的释放
写锁的释放与 Reentrant 的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时写线程的修改之后的内容对后续的读线程可见。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//如果当前线程没有获取到锁就抛出异常
throw new IllegalMonitorStateException();
int nextc = getState() - releases;//计算释放后的锁状态
boolean free = exclusiveCount(nextc) == 0;//当前可重入次数为0就是完全释放
if (free)
setExclusiveOwnerThread(null);//清空独占锁所有者
setState(nextc);//更新锁的状态,因为写锁为排它锁,不会存在其他线程占有写锁或读锁,所以不用使用 CAS操作
return free;
}
读锁的获取
读锁也支持可重入,支持被多个线程同时获取,如果没有写线程占用锁,那么读锁总是能够获取到,拿到锁之后要做的就只是增加读状态。如果当前有写锁占用,则读线程会被阻塞。读状态是所有线程获取读锁的总和,而每一个线程各自获取读锁的次数保存在ThreadLocal里由自己维护。
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. 如果写锁由另一个线程持有,则获取失败。
2.否则,当前线程有资格获取锁,根据队列策略判断是否应阻塞。如果不应阻塞,则尝试通过 CAS 更新状态并增加 计数。
注意:此步骤不检查重入获取,以避免在更常见的非重入情况下检查持有计数。
* 3.如果第 2 步失败(因为线程显然没有资格或 CAS 失败或计数溢出),则转到带有完整重试循环的版本。
*/
Thread current = Thread.currentThread();//获取当前执行的线程
int c = getState();//获取当前同步状态
if (exclusiveCount(c) != 0 && // 1. 如果写锁被其他线程持有,则失败,也就是重入次数不为0并且持有锁的线程不是当前线程
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);//读取锁的数量
// 2. 根据队列策略判断是否应阻塞
if (!readerShouldBlock() && // 如果不应阻塞
r < MAX_COUNT && // 而且共享锁计数未达到最大值
compareAndSetState(c, c + SHARED_UNIT)) { // 尝试通过 CAS 更新状态
if (r == 0) {//如果读取锁数量为0也就是当前线程是第一个读线程
firstReader = current;//设置第一个读者为当前线程
firstReaderHoldCount = 1;//设置第一个读者的计数为1
} else if (firstReader == current) {//如果当前线程就是第一读者
firstReaderHoldCount++;//就增加第一读者计数
} else {
//如果既不是第一读者也不是第一个获取读锁的线程就获取缓存的HoldCounter
HoldCounter rh = cachedHoldCounter;
//如果 rh 为空或者 rh 的线程 ID 不等于当前线程的 ID,则需要获取一个新的 HoldCounter。
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)//如果 HoldCounter 的计数为零,则需要将其设置回存储中。
readHolds.set(rh);//将 HoldCounter 设置回 readHolds。
rh.count++;//增加 HoldCounter 的计数。
}
return 1;
}
return fullTryAcquireShared(current);
}
解释:HoldCounter 是一个用于跟踪线程持有共享锁次数的对象。在读写锁的实现中,HoldCounter 通常用来管理多个线程对共享资源的访问。具体来说,HoldCounter 用于记录每个线程持有的共享锁次数。
读锁的释放
因为读锁是共享锁,所以在释放读锁时是通过 CAS + 自旋 的方式不停的更改锁状态直到更新成功
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread(); // 获取当前线程
if (firstReader == current) { // 如果当前线程是第一个读者
// assert firstReaderHoldCount > 0; // 断言第一个读者的计数大于 0
if (firstReaderHoldCount == 1) { // 如果第一个读者的计数为 1
firstReader = null; // 清空第一个读者
} else { // 如果第一个读者的计数大于 1
firstReaderHoldCount--; // 减少第一个读者的计数
}
} else { // 如果当前线程不是第一个读者
HoldCounter rh = cachedHoldCounter; // 获取缓存的 HoldCounter
if (rh == null || rh.tid != getThreadId(current)) { // 如果 HoldCounter 为空或线程 ID 不匹配
rh = readHolds.get(current); // 从 readHolds 中获取 HoldCounter
}
int count = rh.count; // 获取 HoldCounter 的计数
if (count <= 1) { // 如果计数小于等于 1
readHolds.remove(current); // 从 readHolds 中移除 HoldCounter
if (count <= 0) { // 如果计数小于等于 0
throw unmatchedUnlockException(); // 抛出异常,表示解锁次数不匹配
}
}
--rh.count; // 减少 HoldCounter 的计数
}
for (;;) { // 通过自旋 + CAS 的方式更新读锁状态
int c = getState(); // 获取当前锁状态
int nextc = c - SHARED_UNIT; // 计算释放后的锁状态
if (compareAndSetState(c, nextc)) { // 尝试通过 CAS 更新锁状态
// 释放读锁对其他读者没有影响,
// 但如果读锁和写锁现在都为空,则可能允许等待的写线程继续执行
return nextc == 0; // 如果锁状态为 0,则返回 true
}
}
}
12.3、demo
测试demo无锁版乱套:
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//5个线程写入
for (int i = 1; i <= 5; i++) {
//中间变量
int temp = i;
new Thread(()->{
myCache.put(String.valueOf(temp),temp);
},String.valueOf(i)).start();
}
//5个线程读取
for (int i = 1; i <= 5; i++) {
//中间变量
int temp = i;
new Thread(()->{
myCache.get(String.valueOf(temp));
},String.valueOf(i)).start();
}
}
}
//自定义缓存
class MyCache{
//volatile关键字确保了多线程环境下的可见性和一定程度的原子性,适用于不需要同步就能保证线程安全的场景。
private volatile Map<String,Object> map = new HashMap<String,Object>();
//写入
public void put(String key,Object value){
System.out.println("线程:"+Thread.currentThread().getName()+"写入key:"+key+"值为:"+value);
map.put(key, value);
System.out.println("线程:"+Thread.currentThread().getName()+"写入完成");
}
//读取
public Object get(String key){
System.out.println("线程:"+Thread.currentThread().getName()+"读取key:"+key);
Object o = map.get(key);
System.out.println("线程:"+Thread.currentThread().getName()+"读取完成");
return o;
}
}
运行之后发现就很乱了,线程1写入之间2345全来写了,没有做到同时间写入只能有一个线程来写。
测试demo加锁版:
/*
* 独占锁(排它锁)(写锁)一次只能有一个线程写入
* 共享锁(读锁)允许多线程读取,同时占有
* */
public class ReadWriteLockDemo {
public static void main(String[] args) {
// MyCache myCache = new MyCache();
MyCacheLock myCache = new MyCacheLock();
//5个线程写入
for (int i = 1; i <= 5; i++) {
//中间变量
int temp = i;
new Thread(()->{
myCache.put(String.valueOf(temp),temp);
},String.valueOf(i)).start();
}
//5个线程读取
for (int i = 1; i <= 5; i++) {
//中间变量
int temp = i;
new Thread(()->{
myCache.get(String.valueOf(temp));
},String.valueOf(i)).start();
}
}
}
//自定义缓存读写锁版
class MyCacheLock{
//volatile关键字确保了多线程环境下的可见性和一定程度的原子性,适用于不需要同步就能保证线程安全的场景。
private volatile Map<String,Object> map = new HashMap<String,Object>();
//创建读写锁实例更加细粒度的控制读写
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//写入时只能有一个线程能够写入
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println("线程:"+Thread.currentThread().getName()+"写入key:"+key+"值为:"+value);
map.put(key, value);
System.out.println("线程:"+Thread.currentThread().getName()+"写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//读取允许多线程读取
public Object get(String key){
readWriteLock.readLock().lock();
Object o = null;
try {
System.out.println("线程:"+Thread.currentThread().getName()+"读取key:"+key);
o = map.get(key);
System.out.println("线程:"+Thread.currentThread().getName()+"读取完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
return o;
}
}
这次的写入就是想要的同时间只有一个线程能写入了,读取还是能够多个线程同时读取。
13、阻塞队列
阻塞队列(Blocking Queue)是一种支持两个附加操作的队列:在队列为满时,put 操作将会阻塞;在队列为空时,take 操作将会阻塞。这种数据结构常用于多线程编程中,特别是在生产者-消费者模式的应用场景下,可以有效地控制生产者和消费者的同步问题,避免资源竞争和数据不一致的问题。
并发包中有很多阻塞队列的实现类
ArrayBlockingQueue:基于数组结构的有界阻塞队列。
LinkedBlockingQueue:基于链表结构的阻塞队列,可以选择是否为有界队列。
PriorityBlockingQueue:具有优先级的无界阻塞队列。
SynchronousQueue:不存储元素的阻塞队列,每个 put 操作必须等待一个 take 操作,反之亦然。
DelayQueue:使用延期时间较短的对象会被优先返回。
四组API
方法 | 抛出异常 | 不抛出异常还有返回值 | 阻塞 | 超时等待 |
---|---|---|---|---|
添加队列 | add | offer | put | offer(元素,时间,时间单位) |
移除队列 | remove | poll | take | poll(时间,时间单位) |
判断队首 | element | peek | -- | -- |
1、使用add添加队列会报队列已满异常
public class BlockingQueueAPI {
public static void main(String[] args) {
BlockingQueue lockingQueue = new ArrayBlockingQueue<>(3);
//会出现异常的
System.out.println(lockingQueue.add("a"));
System.out.println(lockingQueue.add("b"));
System.out.println(lockingQueue.add("c"));
System.out.println(lockingQueue.element());//检查队首元素,如果队首元素没有,也会抛异常
//多了一个队列里没位置了就会抛出异常
System.out.println(lockingQueue.add("d"));
}
}
添加时超出队列大小会抛出队列满的异常
移除空队列会抛出无元素异常
2、不想让它报异常,给我一个返回值满了就插入失败就返回false,队列空了取元素就返回null,那就使用offer方法和poll方法
//不抛出异常有返回值的offer和poll
public static void t2(){
BlockingQueue lockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(lockingQueue.offer("a"));
System.out.println(lockingQueue.offer("b"));
System.out.println(lockingQueue.offer("c"));
//System.out.println(lockingQueue.offer("d"));
System.out.println(lockingQueue.poll());
System.out.println(lockingQueue.poll());
System.out.println(lockingQueue.poll());
System.out.println(lockingQueue.peek());//检查队首元素,如果没有就返回null
}
插入不进去了就返回false,拿不出来了就返回null,
3、即不想抛出异常又不想返回结果,就让线程等着,阻塞,使用put会一直阻塞知道有闲的位置空出来再入队put对应着使用take取出队列元素
//一直阻塞
public static void t3() throws InterruptedException {
BlockingQueue lockingQueue = new ArrayBlockingQueue<>(3);
lockingQueue.put("a");
lockingQueue.put("b");
lockingQueue.put("c");
System.out.println(lockingQueue.take());
System.out.println(lockingQueue.take());
System.out.println(lockingQueue.take());
System.out.println(lockingQueue.take());
}
队列中位置不够了但是再塞入的话就一直等,等到有位置插进去,如果取的时候队列中没元素了,使用take就会一直等,等到队列里面有元素了取走。
4、我不想一直等待,等一段时间等不到我就结束返回结果了,就使用offer的重载方法可以加时间的,同样对应的poll重载方法也可以加时间等待,等不到就返回结果结束
//超时阻塞,等待一定时间停止
public static void t4() throws InterruptedException {
BlockingQueue lockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(lockingQueue.offer("a"));
System.out.println(lockingQueue.offer("b"));
System.out.println(lockingQueue.offer("c"));
//System.out.println(lockingQueue.offer("d",2,TimeUnit.SECONDS));
System.out.println(lockingQueue.poll());
System.out.println(lockingQueue.poll());
System.out.println(lockingQueue.poll());
System.out.println(lockingQueue.poll(2,TimeUnit.SECONDS));
}
14、同步队列SynchronousQueue
同步队列SynchronousQueue没有容量本身不存储数据,进去一个出去一个,没有多的,put完了就take
//同步队列,进一个出一个
public class SynchronousQueueTest {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
//入队列线程
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"入=>1");
queue.put("1");
System.out.println(Thread.currentThread().getName()+"入=>2");
queue.put("2");
System.out.println(Thread.currentThread().getName()+"入=>3");
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//出队列线程
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"出<="+ queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"出<="+ queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"出<="+ queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
15、线程池
线程池就是应用程序启动的时候创建一定数量的线程,把这些已经创建好的线程保存在线程池中。当需要执行任务的时候从线程池中拿一个空闲的线程去执行任务,任务执行完之后,线程返回线程池继续被其他任务复用。这样就避免了频繁创建和销毁线程操作带来的性能开销。
线程池的组成部分:
线程池管理器:负责创建、管理和控制线程池。它负责线程的创建、销毁和管理,以及线程池的状态监控和调度任务。
工作队列:用于存储待执行的任务。当线程池中的线程都在执行任务时,新的任务会被放入工作队列中等待执行。
线程池线程:实际执行任务的线程。线程池中会维护一组线程,这些线程可以被重复使用,从而避免了频繁创建和销毁线程的开销。
线程池运行步骤:
有任务时,线程池管理器会检查线程池中有没有空闲的线程,有的话就执行任务,没有就进入第2步。
当前线程池要是没空闲的线程,线程池管理器就看一下排队区也就是阻塞队列中满了没,没满就去阻塞队列中等待执行。
如果阻塞队列也满了,但是最大线程数还没有满,线程池管理器就会再开线程去执行任务。
如果连最大线程都满了,那就只能执行拒绝策略了。
线程池的优点包括重用线程、控制并发度、提供线程管理和监控等。通过适当地配置线程池的大小和任务队列的容量,可以充分利用系统资源,提高程序的性能和响应速度。同时,线程池可以避免线程创建和销毁的开销,减少了资源的浪费。
要注意:使用线程池的时候如果任务之间有依赖关系,有可能会引发死锁。
15.1、三大方法
1、newSingleThreadExecutor单个线程
创建单个线程,同一时间只能有一个线程被创建执行
//三大方法
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();//单个线程
// ExecutorService executor = Executors.newFixedThreadPool(5);//固定数量的线程
// ExecutorService executor = Executors.newCachedThreadPool();//缓存线程,弹性有多少是多少
for (int i = 1; i <= 5; i++) {
executor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
//线程池用完,程序结束需要关闭线程池
executor.shutdown();
}
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
为啥newSingleThreadExecutor方法只能有一个线程,因为本质上它就是核心线程数和最大核心线程数都是1 的线程池
2、newFixedThreadPool固定数量的线程
创建指定数量的线程,同一时间有指定数量的线程同时运行
ExecutorService executor = Executors.newFixedThreadPool(5);//固定数量的线程
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
源码里看到,固定数量的线程池它就是核心线程数和最大核心线程数都是指定数量,线程空闲时间为0代表线程不会因为空闲而被回收,使用LinkedBlockingQueue作为任务队列表示队列长度无限制。
3、newCachedThreadPool缓存线程
缓存线程,要多少有多少,同时间有多少线程取决于CPU性能了
//三大方法
public class ThreadPoolTest {
public static void main(String[] args) {
//ExecutorService executor = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService executor = Executors.newFixedThreadPool(5);//固定数量的线程
ExecutorService executor = Executors.newCachedThreadPool();//缓存线程,弹性有多少是多少
for (int i = 1; i <= 1000; i++) {
executor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
//线程池用完,程序结束需要关闭线程池
executor.shutdown();
}
}
看最大是200多
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool的核心线程数是0,但是最大核心线程数却是最大值 21亿 ,60秒的空闲清理,不能积压太多的线程,阻塞队列使用的是SynchronousQueue作为工作队列,不存储任务,直接提交给线程执行或拒绝。
可以看出的是以上三个方法的本质都是使用 new ThreadPoolExecutor()来创建线程池的,这就有7大参数了
注意:三大方法看到这里就不用看了,因为一般不让用,阿里规范中就不让使用,必须强制使用自定义线程池new ThreadPoolExecutor才更加安全可控
15.2、线程池7大参数
public ThreadPoolExecutor(int corePoolSize,//核心线程池数量大小
int maximumPoolSize,//最大线程池大小
long keepAliveTime,//空闲下来的存活时间
TimeUnit unit,//存活时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列,工作队列,任务列表
ThreadFactory threadFactory,//线程工厂,创建线程使用
RejectedExecutionHandler handler) {//拒绝策略,以上都满了拒绝
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池的7大参数分别是
int corePoolSize:核心线程数,核心线程是不会被垃圾清理掉的,除非给线程池关闭了。
int maximumPoolSize:最大线程数量,线程池最大数量,超过这个数的任务会执行拒绝策略或者阻塞。
long keepAliveTime:非核心线程的空闲存活时间,那没事干的不是核心的线程会活到这个时间后被清理掉。
TimeUnit unit:存活时间的单位。
BlockingQueue:阻塞队列,也叫工作队列workQueue,等待处理的任务会在这里等待。
ThreadFactory:线程工厂,线程池创建线程使用的一般不需要管。
RejectedExecutionHandler:拒绝策略,有4种拒绝策略。
AbortPolicy:默认策略,直接抛出RejectedExecutionException异常,终止任务,一般就是这个默认的
CallerRunsPolicy:由调用线程(提交任务的线程)执行被拒绝的任务,哪来的回哪去;
DiscardPolicy:静默丢弃被拒绝的任务,不进行任何处理;
DiscardOldestPolicy:丢弃队列中最老的一个任务,然后尝试重新提交被拒绝的任务;
为了更好的理解,看一下银行办理业务的窗口场景
平常银行窗口只开2个就可以应付工作了,两个核心线程一个个处理工作队列中的任务
public class ThreadPoolTest {
public static void main(String[] args) {
/* ExecutorService executor = Executors.newSingleThreadExecutor();//单个线程
ExecutorService executor2 = Executors.newFixedThreadPool(5);//固定数量的线程
ExecutorService executor3 = Executors.newCachedThreadPool();//缓存线程,弹性有多少是多少*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()//直接抛出异常
);
for (int i = 1; i <= 7; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
//线程池用完,程序结束需要关闭线程池
threadPoolExecutor.shutdown();
}
}
7个任务,同时会有两个正在执行,5个在阻塞队列中等待,最终结果就是核心1和核心2交替执行这7个任务
但是过年了,来银行办理业务的人多了起来,阻塞队列位置不够了都,银行就会把剩下的创建慢慢打开来处理业务
for (int i = 1; i <= 8; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
这个时候把执行任务增加到8个,同时间就超了1一个不能处理,这个时候再开一个线程出来处理,所以我们会看到结果有第3个线程出现了。
最大是同时处理5个任务,5个在工作队列中等待处理。所以会看到有线程3 4 5都开始工作了。
人越来越多了,开了所有的窗口了,等待区也都坐满了,那就执行拒绝策略了,抛异常,让客户晚点再来拒绝他
当线程全开处理5个,工作队列中还有5个等待处理,再来就没法处理了,执行拒绝策略。
AbortPolicy:默认策略,直接抛出RejectedExecutionException异常,终止任务,一般就是这个默认的
增加任务到11个会抛异常
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()//直接抛出异常
);
for (int i = 1; i <= 7; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
//线程池用完,程序结束需要关闭线程池
threadPoolExecutor.shutdown();
}
}
CallerRunsPolicy:由调用线程(提交任务的线程)执行被拒绝的任务,哪来的回哪去;
还是11个任务,拒绝策略改成CallerRunsPolicy,从哪来回那去,main主线程调用的到线程池执行任务就还还给主线程去处理。
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()//哪来回哪
);
for (int i = 1; i <= 7; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
//线程池用完,程序结束需要关闭线程池
threadPoolExecutor.shutdown();
}
}
可以看到最后main主线程去处理了
DiscardPolicy:静默丢弃被拒绝的任务,不进行任何处理;
还是11个任务,使用DiscardPolicy就会被默默的扔了不会报异常
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()//默默的扔了不做任何的处理
);
for (int i = 1; i <= 11; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
//线程池用完,程序结束需要关闭线程池
threadPoolExecutor.shutdown();
}
}
DiscardOldestPolicy:丢弃队列中最老的一个任务,然后尝试重新提交被拒绝的任务;
还是11个任务,使用DiscardOldestPolicy拒绝策略,多的任务会看看最先被处理的任务处理完了没有,没有处理完那这个多的任务就又会被默默扔掉也不报异常,如果最老的任务处理完成了就处理多的任务。
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()//看看最早处理的任务处理完了没有,处理完了就处理那个多出来的任务
);
for (int i = 1; i <= 11; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
//线程池用完,程序结束需要关闭线程池
threadPoolExecutor.shutdown();
}
}
显然,它还是被扔了
16、CPU密集型和IO密集型
线程池最大线程数到底应该怎么去定义?看业务是CPU密集还是IO密集
线程池大小设置:
需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型
每个任务执行的平均时长大概是多少,这个任务的执行时长可能还跟任务处理逻辑是否涉及到网络传输以及底层系统资源依赖有关系。
如果是 CPU 密集型,主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu 的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,CPU 核心数=最大同时执行线程数
比如说:CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会导致上下文切换反而使得效率降低。那线程池的最大线程 数可以配置为 cpu 核心数再加1
//获取CPU的核心数
int cpuCoreNum = Runtime.getRuntime().availableProcessors();
System.out.println(cpuCoreNum);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
cpuCoreNum+1,//CPU核心数加1
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
如果是 IO 密集型,主要是进行 IO 操作,执行 IO 操作的时间较长,这时 CPU处于空闲状态,导致 CPU的利用率不高,这种情况下可以增加线程池的大小。这种情况下可以结合线程的等待时长来做判断,等待时间越 长,那么线程数也相对越多。一般可以配置 CPU核心数的 2 倍。
public class ThreadPoolTest {
public static void main(String[] args) {
//获取CPU的核心数
int cpuCoreNum = Runtime.getRuntime().availableProcessors();
System.out.println(cpuCoreNum);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
cpuCoreNum*2,//CPU核心数*2
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()//看看最早处理的任务处理完了没有,处理完了就处理那个多出来的任务
);
for (int i = 1; i <= 11; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
//线程池用完,程序结束需要关闭线程池
threadPoolExecutor.shutdown();
}
}
一个公式:线程池设定最佳线程数目 = ((线程池设定的线程等待时间+线程 CPU 时间)/ 线程 CPU 时间 )这个公式的线程 cpu 时间是预估的程序单个线程在 cpu 上运行的时间(通常使用 loadrunner 测试大量运行次数求出平均值)
17、四大函数式接口
简单来说就是接口中只有一个方法的接口叫做函数式接口它们的接口上都有@FunctionalInterface这个注解
看到java.util.function这个包中的接口,原生的就四个
17.1、Function
public class FunctionTest {
public static void main(String[] args) {
//使用匿名内部类写法
/* Function function = new Function<String,String>() {
@Override
public String apply(String o) {
return o;
}
};*/
//使用lambda表达式的方式简化写法
Function<String,String> function = str->{
return str;
};
System.out.println(function.apply("abcd"));
}
}
结果都一样的
17.2、Predicate断定型接口
Predicate断定型接口有输入,返回的是布尔值,ture或false所以叫断定型的接口
下面包装一个判断字符是否为空方法
//使用匿名内部类写法
//判断字符是否为空
/* Predicate<String> predicate = new Predicate<String>() {
@Override
public boolean test(String s) {
return s.isEmpty();
}
};*/
//使用lambda表达式写
Predicate<String> predicate = str ->{
return str.isEmpty();
};
System.out.println(predicate.test("aaa"));
17.3、Supplier供给型接口
Supplier供给型接口只有返回值没有输入
//使用匿名内部类写法
/*Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
System.out.println("输出");
return "abcd";
}
};*/
//使用lambda表达式写法
Supplier<String> supplier = ()->{
System.out.println("输出");
return "abcd";
};
System.out.println(supplier.get());
17.4、Consumer消费型接口
Consumer消费型接口只有输入没有返回
//使用匿名内部类写法
/* Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};*/
//使用lambda表达式写法
Consumer<String> consumer = (str)->{
System.out.println(str);
};
consumer.accept("abcd");
18、Stream流式计算
Stream 流式计算是一种基于数据流的计算方式,它将数据源(如集合、数组等)生成的数据元素序列作为输入,通过一系列中间操作处理数据,最终产生一个或多个结果。与传统的批处理方式相比,Stream 流式计算具有实时性、高效性和可扩展性等优点,因此在大数据处理、云计算等领域得到了广泛应用。
Stream 流式计算的原理可以概括为“惰性求值”,即中间操作不会立即执行,而是等待终止操作时才一次性全部处理。这种方式能够节省内存空间,提高处理效率。Stream 流式计算的单线程串行计算默认使用串行方式,即在一个线程里执行。
简单示例:
/*
* 5个用户需要做以下几个筛选:
* 1、ID必须为偶数;
* 2、年龄必须大于22岁;
* 3、把用户名转化为大写;
* 4、用户名字母倒叙排列;
* 5、只输出一个用户;
* */
public class StreamTest {
public static void main(String[] args) {
User a = new User(1, "a", 21);
User b = new User(2, "b", 22);
User c = new User(3, "c", 23);
User d = new User(4, "d", 24);
User e = new User(6, "e", 25);
List<User> list = Arrays.asList(a, b, c, d, e);
//使用stream流
list.stream()
.filter(user -> user.getId() %2 == 0)
.filter(user -> user.getAge() > 22)
.map(user -> user.getName().toUpperCase())//将流中的元素映射为其他形式
.sorted(Comparator.reverseOrder())//:Comparator.reverseOrder() 返回一个比较器,用于按自然顺序的逆序进行比较。
.limit(1)//只输出一个
.forEach(System.out::println);
}
}
Steam流式计算提供很多链式操作方法,包括了创建stream流的方式、流中间的操作方法以及最后的终结方法
创建Stream流方法:
Stream.of(T... values):从值创建一个 Stream。
Collection.stream():从集合创建一个 Stream。
Arrays.stream(T[] array):从数组创建一个 Stream。
Files.lines(Path path):从文件中的每一行创建一个 Stream。
中间操作(中间方法):
中间方法的特点就是惰性求值,就是中间的操作不会立即执行,而是返回一个新的流,实际的计算操作是遇到终结方法的时候才会执行整个流水线。由于中间操作是延迟执行的,StreamAPI可以在内部进行优化,比如短路操作(如findFirst或者anyMatch)可以在找到满足条件的元素时立即终止,而不需要操作整个数据集。
常见的中间方法:
filter(Predicate<T> predicate):过滤流中的元素,只保留满足条件的元素。
map(Function<T, R> mapper):将流中的每个元素转换为另一个形式。
flatMap(Function<T, Stream<R>> mapper):将流中的每个元素转换为一个流,然后将所有流扁平化为一个流。
distinct():去除流中的重复元素。
sorted():对流中的元素进行排序,默认按自然顺序。
sorted(Comparator<? super T> comparator):对流中的元素进行排序,使用自定义比较器。
peek(Consumer<? super T> action):对流中的每个元素执行操作,并返回一个新的流。
limit(long maxSize):截断流,使其最多包含指定数量的元素。
skip(long n):跳过流中的前 n 个元素。
终结方法:
终结方法是会触发流的计算产生最终结果的方法。终结方法会结束流的操作链并返回一个具体的结果。
forEach(Consumer<? super T> action):对流中的每个元素执行操作。
toArray():将流转换为数组。
reduce(BinaryOperator<T> accumulator):对流中的元素进行归约操作。
collect(Collector<? super T, A, R> collector):将流中的元素收集到一个集合或其他数据结构中。
min(Comparator<? super T> comparator):返回流中的最小元素。
max(Comparator<? super T> comparator):返回流中的最大元素。
count():返回流中的元素数量。
anyMatch(Predicate<? super T> predicate):检查流中是否至少有一个元素满足给定的条件。
allMatch(Predicate<? super T> predicate):检查流中所有元素是否都满足给定的条件。
noneMatch(Predicate<? super T> predicate):检查流中没有任何元素满足给定的条件。
findFirst():返回流中的第一个元素。
findAny():返回流中的任意一个元素。
19、ForkJoin
当我们需要执行大量的小任务的时候会选择使用线程池进行高并发处理,但是,如果数据量很大的计算任务本身它也能并发执行,那么就可以给这个大任务进行动态的拆分,大任务拆分成小任务然后小任务再拆分成更小的任务,最后再将这些小任务的结果给合并得到最终的结果,这种思路就是Fork/Join模型。
19.1、工作窃取法
工作窃取法是指某个线程从其他线程的任务队列中窃取任务来执行。
假如我们有很大的计算任务,我们把这个任务拆分成若干个子任务,把这些子任务分别放到不同的队列里,并为各队列创建一个单独的线程处理队列中的任务。假如A线程中负责的任务都处理完了,B还有很多没有处理,这个时候A线程就可以从B线程负责的队列尾部窃取任务去执行,而不是让A线程干等着白白浪费资源。这个时候A和B线程会同时访问同一队列,这个队列通常是双端队列,被窃取任务的线程从双端队列的头部拿任务执行,而窃取任务的线程从双端队列的尾部去拿任务执行,这样就减少了线程之间的竞争,还充分的利用了线程。
缺点就是,双端队列中只有一个任务了,俩线程就会存在竞争,因为要创建很多线程,这样消耗了系统资源。
19.2、Fork/Join 框架的使用
一:第一步拆分任务。
我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
二:第二步执行任务并合并结果
分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
为了实现上面两个步骤实现任务拆分和合并Fork/Join 使用以下两个类来完成:
ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
演示:
创建一个求和计算任务,该类继承RecursiveTask
/**
* 求和计算任务,通过forkJoinPool来执行
* 想要使用ForkJoin需要继承ForkJoinTask 的子类
* RecursiveAction:用于没有返回结果的任务
* RecursiveTask:用于有返回结果的任务
*/
public class ForkJoinTest extends RecursiveTask<Long> {
private long start;//起始值
private long end;//结束值
private long temp = 10000L;//临界值,这个临界值写死了,可以动态调整的
public ForkJoinTest(long start,long end){
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
//如果数值没有超过临界值就按照正常的方法直接算
if ((end - start) <temp){
long sum = 0L;
for (long i = start; i <= end; i++) {
sum +=i;
}
return sum;
}else {//如果超过了我们设置的临界值就走任务拆分
//找到中间值
long middle = (end + start)/2;
ForkJoinTest task1 = new ForkJoinTest(start, middle);
task1.fork();//拆分任务把任务压入线程队列
ForkJoinTest task2 = new ForkJoinTest(middle+1, end);
task2.fork();//拆分任务把任务压入线程队列
return task1.join()+task2.join();
}
}
}
然后我们使用三种方法进行大数求和计算
第一种:硬算
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
t1();
}
//第一种使用最原始的方式直接硬算
public static void t1(){
long sum = 0L;
long startTime = System.currentTimeMillis();
for (long i = 1L; i <= 10_0000_0000; i++) {
sum +=i;
}
long endTime = System.currentTimeMillis();
System.out.println("t1sum:"+sum+" 时间:"+(endTime-startTime));
}
}
结果是340毫秒
第二种:使用forkJoin方法拆分任务
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
t2();
}
// 第二种是使用forkJoin方法拆分任务
public static void t2() throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
// 创建执行对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
try {
// 创建任务对象
ForkJoinTask<Long> forkJoinTest = new ForkJoinTest(0L, 10_0000_0000L);
// 提交任务到工作队列
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTest);
long sum = submit.get();
long endTime = System.currentTimeMillis();
System.out.println("t2sum: " + sum + " 时间:" + (endTime - startTime));
} finally {
// 释放资源
forkJoinPool.shutdown();
}
}
}
使用ForkJoin是150毫秒
第三种:使用stream并行流方法拆分任务
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
t3();
}
// 第三种是使用stream并行流方法拆分任务
public static void t3(){
long startTime = System.currentTimeMillis();
//使用stream并行流
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long endTime = System.currentTimeMillis();
System.out.println("t3sum:"+sum+"时间:"+(endTime-startTime));
}
}
详细解释一下:
LongStream.rangeClosed(0L, 10_0000_0000L):生成一个包含从0到100,000,000(包括两端点)的长整型流。
parallel():将流转换为并行流,允许流中的操作并行执行,利用多核处理器提高性能。
reduce(0, Long::sum):对流中的元素进行归约操作,初始值为0,使用Long::sum方法作为累加器,即对流中的每个元素求和。
使用Stream并行流的方式计算得到的结果是120毫秒
20、异步回调
Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。
当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,Future的get() 方法会阻塞主线程。
当Future的线程进行了一个非常耗时的操作,那我们的主线程也就阻塞了。 当我们在简单业务上,可以使用Future的另一个重载方法get(long,TimeUnit)来设置超时时间,避免我们的主线程被无穷尽地阻塞。
CompleteFuture
CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。
演示:
没有返回值的runAsync异步回调
/*
* 异步调用:CompleteFuture
* 成功回调
* 失败回调
* */
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的runAsync异步回调
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "runAsync--->无返回值的异步回调");
});
System.out.println("main111");
voidCompletableFuture.get();//获取阻塞执行结果
}
}
有返回值的异步调用结果:
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//有返回值的supplyAsync异步回调
CompletableFuture<Integer> completableFutureSupplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "supplyAsync--->有返回值的异步回调");
return 1024;
});
//成功回调
//成功返回值
//whenComplete编译成功时要做的操作,这是一个增强的消费型接口
System.out.println( completableFutureSupplyAsync.whenComplete((t,u)->{
System.out.println("t=="+t);//t就是正常返回的结果值
System.out.println("u=="+u);//u就是发生错误返回的异常
}).exceptionally((e)->{// 失败返回错误信息,参数就是e异常
System.out.println(e.getMessage());//打印堆栈信息
return 500;//失败了返回一下失败的结果这里写个500
}).get());
}
}
现在是正常的返回结果都是正常的1024
现在故意出错一下
//有返回值的supplyAsync异步回调
CompletableFuture<Integer> completableFutureSupplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "supplyAsync--->有返回值的异步回调");
int i = 10/0;
return 1024;
});
返回结果中的u就是错误信息
21、JMM内存模型
Java内存模型(Java Memory Model,JMM)是一种抽象的概念,用于定义多线程程序如何与内存进行交互。JMM并不真实存在,它是一种规范
,规定了程序中变量在内存中的访问方式。
JMM(Java内存模型)主要是定义了对于一个共享变量,当另一个线程对这个共享变量进行改变之后,当前线程对这个共享变量的可见性。
主内存:所有的变量都存储在主内存中,主内存是共享的。
工作内存:每一个线程都有自己的工作内存,工作内存的变量是从主内存中的副本,线程对这个副本变量进行操作都是在工作内存中,然后在把更改结果同步到主存中。
内存可见性:
既然主内存中的变量是共享的,那么多个线程对主内存中的同一个变量进行修改,一个线程对这个共享变量修改之后的结果能够被其他的线程看到,叫做内存的可见性。JMM规定了线程对变量的读取和写入顺序,确保变量的修改在其他线程中可见。
JMM的三大特性
原子性:一个或多个操作,要么全部执行,要么全部不执行;
可见性:一个线程对共享变量的修改能被其他线程看到。这个要通过volatile关键字实现;
有序性:程序的执行在实际的运行时可能会被重新排序,但JMM提供了一定的保证,使得某些操作操作在多线程环境中还会按照程序顺序执行;
JMM的八种操作
• lock(锁定):作用于主内存的共享变量,把一个变量标识为一条线程独占状态。
• unlock(解锁):作用于主内存共享变量,把一个处于锁定状态的共享变量释放出来,释放后的共享变量才可以被其他线程锁定。
• read(读取):作用于主内存共享变量,把一个共享变量值从主内存传输到线程的工作内存中,以便随后的load动作使用。
• load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
• use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
• assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
• store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
• write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。
演示一个案例,在没有使用volatile关键字时的共享变量修改不可见的情况:
//两个线程对共享变量操作
public class JMMDemo {
private static int num = 0;
public static void main(String[] args) {//main线程
new Thread(()->{
while (num ==0){
}
}).start();
//main线程睡一秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num =1;
System.out.println(num);
}
}
启动程序会发现控制台就输出了一个1,然后就一直不动了程序不停止,因为在第二个线程启动的时候获得的共享变量num是等于0的,而此时 main线程把共享变量num修改成1了,已经刷回主内存输出了,第二个线程拿到的主存共享变量num的值还是等于0,所以会死循环。
如果不想死循环,想让这个共享变量的修改立马可见,你可以直接在循环体里面进行输出,就不会死循环
println输出方法也是有synchronized同步代码块,synchronized也能保证对变量的修改可见性,主线程对num修改之后第二个线程也能立刻感知然后停止循环。
22、Volatile
要想实现对共享变量的修改可见性就需要使用Volatile关键字了。Volatile是 Java虚拟机提供轻量级的同步机制它能保证变量修改的可见性但不保证原子性,它还能禁止指令重排。
private volatile static int num = 0;
直接返回个1就结束了,没有循环
Volatile并不保证原子性
多线程情况下对同一个变量进行修改(这里就直接自增操作了),可以使用synchronized对这个变量或者操作变量的方法修饰,也就是加锁同一时间只能有一个线程做出操作,保证最后的数据正确性,但是Volatile并不像synchronized或者直接lock那样使变量具有原子性。
演示:什么修饰都不加的共享变量高并发情况下的自增
20个线程都对这个变量加1000次最终的结果一定是小于等于20000的
public class VolatileDemo {
private static int num = 0;
public static void add(){
num++;
}
public static void main(String[] args) {
//来20个线程,分别对num加1000次
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 1; j <= 1000; j++) {
add();
}
}).start();
}
//判断线程还有多少存活的
while (Thread.activeCount() >2){//默认的得有一个main线程和gc线程
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+" "+num);
}
}
下面我们在add方法使用synchronized修饰
public class VolatileDemo {
private static int num = 0;
public static synchronized void add(){
num++;
}
public static void main(String[] args) {
//来20个线程,分别对num加1000次
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 1; j <= 1000; j++) {
add();
}
}).start();
}
//判断线程还有多少存活的
while (Thread.activeCount() >2){//默认的得有一个main线程和gc线程
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+" "+num);
}
}
结果一定是20000
那么在num变量上加Volatile修饰idea就直接提示非原子性操作
结果当然不会是20000
还有一种不加锁的方法就是使用原子类,加锁性能会下降,使用原子类就好很多,原子类的底层是直接和操作系统有关的,在内存中修改值
//原子类的Integer——>AtomicInteger
private static AtomicInteger num = new AtomicInteger();
public static void add(){
//num++;
num.getAndIncrement();//AtomicInteger+1的方法CAS
}
23、指令重排
假如我们写了一个Java程序,我们默认会认为程序会以我们写的代码顺序走,但是实际上,编译器、JVM或者CPU都有可能出于优化的目的并不会老老实实的按照我们写的代码顺序执行,它们会进行调整。
那么,重新排序的好处就是能提高处理速度。
举个例子:
上图左边是Java代码,右边是简化的执行指令
a=10这行对应的操作就是加载a,然后设置为10,最后存回主存,后三行也是一样的。
可以发现的是a被加载了两次还被写回了两次,按照我们给定的这个程序顺序a的操作显然有些频繁且多余,很浪费时间。
把a=a+10放在b定义的前面不就行了,也不影响a和b的结果,这就是指令重排做的,不按照你写的顺序,怎么速度快怎么方便怎么来执行,才不会严格按照你写的顺序来执行。
这样a就只被加载1次写回1次就行了,不影响正确结果。
三种重排方式
编译器优化
编译器(包括 JVM、JIT 编译器等)出于优化的目的,例如当前有了数据 a,把对 a 的操作放到一起效率会更高,避免读取 b 后又返回来重新读取 a 的时间开销,此时在编译的过程中会进行一定程度的重排。不过重排序并不意味着可以任意排序,它需要需要保证重排序后,不改变单线程内的语义,否则如果能任意排序的话,程序早就逻辑混乱了。即编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
CPU重排(指令集并行重排)
现代处理器采用了指令级并行技术(Instruction-Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
内存重排
由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。由于内存有缓存的存在,在 JMM 里表现为主存和本地内存,而主存和本地内存的内容可能不一致,所以这也会导致程序表现出乱序的行为。
重排的原则(as-if-serial语义)
编译器和处理指令也并非什么场景都会进行指令重排序的优化,而是会遵循一定的原则,只有在它们认为重排序后不会对程序结果产生影响的时候才会进行重排序的优化,如果重排序会改变程序的结果,那这样的性能优化显然是没有意义的。而遵守as-if-serial 语义规则就是重排序的一个原则,as-if-serial 的意思是说,可以允许编译器和处理器进行重排序,但是有一个条件,就是不管怎么重排序都不能改变单线程执行程序的结果。
重排在多线程情况下的影响
单线程的重排很简单,分析程序语义就能知道前后数据的依赖性。但是,多线程环境下,编译器和CPU指令优化根本不能识别多线程之间存在的数据依赖性。
例如:
int a=0;
boolean flag;
public static void init(){
a=10; //语句1
flag=true; //语句2
}
public static void getValue(){
if(flag){ //语句3
System.out.println(a); //语句4
}
}
在上面的代码中,如果程序是单线程的,那么从上到下走下来即使指令重排了getValue输出的值就是10。init方法中的语句1和语句2没有任何的数据依赖关系,他俩的位置可以调换。
此时在多线程环境下,如果两个线程分别调用init方法和getValue方法,那么可能在指令重排的时候,语句1和语句2交换位置执行,flag此刻为true的时候另一个线程调用了getValue方法,flag此时是true,但是a的值还没有赋值呢还是0,所以最终输出的a就是0了,而不会是10。
所以说,在复杂的多线程环境下,编译器和处理器是根本无法通过语义分析来知道代码指令的依赖关系的,这个问题只有我们才知道,这个时候我们就需要通过一种方式显示的告诉编译器和处理器哪些地方是存在逻辑依赖的,这些地方不能进行重排序。
内存屏障
内存屏障,也称内存栅栏、内存栅障、屏障指令等,是一类同步屏障指令,是 CPU 或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作。
内存屏障可以对编译器(软件)和处理器(硬件)的指令重排做出一定的限制,比如,一条内存屏障指令可以禁止编译器和处理器将其后面的指令移到内存屏障之前。
在多线程编程中,内存屏障是一种非常重要的同步机制。多个线程同时访问同一份数据时,会出现线程安全性问题,需要使用内存屏障来保证线程之间的数据同步。
内存屏障主要有以下几种类型:
Load Barrier:阻止在屏障之前的读操作与在屏障之后的读操作进行重排序。
Store Barrier:阻止在屏障之前的写操作与在屏障之后的写操作进行重排序。
Full Barrier:阻止在屏障之前的读写操作与在屏障之后的读写操作进行重排序。
在编译器和CPU层面上都有一套内存屏障来禁止指令重排,我们需要在有数据依赖的地方加上内存屏障指令就不会被指令重排了。怎么防止指令重排,有三种方法。在Java中,可以使用volatile关键字、synchronized关键字以及显式的内存屏障来防止指令重排序。
使用volatile关键字:volatile关键字可以确保变量的可见性和有序性。它禁止了编译器和处理器对其修饰的变量进行重排序优化。
synchronized关键字可以确保进入同步代码块的每个线程都持有相同的锁,保证了代码块内的指令按顺序执行,并且保证了可见性。
Java提供了Unsafe类的方法来设置显式的内存屏障,如storeFence()和loadFence(),以强制进行内存屏障操作。
24、单例模式
单例模式(Singleton Pattern)是 Java 中最简单的设计模式之一。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。这种模式涉及到一个单一的类,该类负责创建自己的对象,同时确保只有单个对象被创建,这个类提供了一种访问其唯一的对象的方式,可以直接访问,不需要被实例化该类的对象。
单例模式主要解决频繁创建和销毁全局使用的类实例的问题;
当需要控制实力数量,节省系统资源的呃时候就需要使用单例模式了;
单例模式的类中构造器是私有的;
优点:
内存中只有一个实例,减少内存开销,尤其是频繁的创建和销毁实例时
缺点:没有接口也不能继承。与单一原则冲突,一个类只关系内部逻辑,而不关心实例化方法。
基本实现方法
创建一个饿汉式单例类:
//单例模式
public class SingleObject {
//创建SingleObject的一个对象
private static SingleObject instance = new SingleObject();
//让构造函数私有化private,这样这个类就不会被实例化
private SingleObject(){}
//对外提供唯一的访问方法来获取单例对象
public static SingleObject getInstance(){
return instance;
}
public void showMessage(){
System.out.println("单例模式");
}
}
然后使用它
public class SingletonPatternDemo {
public static void main(String[] args) {
//我们不能直接使用new SingleObject();构造函数创建单例类的对象
//SingleObject object = new SingleObject();这样是不行的
SingleObject object = SingleObject.getInstance();
object.showMessage();
}
}
实现单例模式的几种方式
(1)饿汉式单例
饿汉式顾名思义,为了饱腹,在类加载的时候就全部初始化,你用得到或者用不到的资源都给你初始化,所以比较浪费资源。
基于上面的示例
//饿汉式单例
public class SingleObject {
//浪费空间
private byte[] data1 = new byte[1024*1024];
private byte[] data2 = new byte[1024*1024];
private byte[] data3 = new byte[1024*1024];
//创建SingleObject的一个对象
private static SingleObject instance = new SingleObject();
//让构造函数私有化private,这样这个类就不会被实例化
private SingleObject(){}
//对外提供唯一的访问方法来获取单例对象
public static SingleObject getInstance(){
return instance;
}
public void showMessage(){
System.out.println("饿汉式单例");
}
}
里面定义的对象在类加载的时候就初始化占着空间,导致浪费。
(2)懒汉式单例
有需要的时候再加载,这种方式是最基本的实现方式,这种实现最大的问题就是不支持多线程。因为没有加锁 synchronized。这种方式 lazy loading 很明显,不要求线程安全,在多线程不能正常工作。
//懒汉式单例
public class LazyMan {
private static LazyMan instance;
private LazyMan(){
System.out.println(Thread.currentThread().getName()+"OK");
}
public static LazyMan getInstance(){
if (instance == null){
instance = new LazyMan();
}
return instance;
}
public static void main(String[] args) {
//多并发
for (int i = 1; i <= 20; i++) {
new Thread(()->{
LazyMan.getInstance();
}).start();
}
}
}
这样的懒汉式是不安全的,多线程环境下判断的地方可能会导致instance实例被创建多次因为没有加锁
可以看到结果并不是一个实例,好几个
(3)DCL懒汉式单例
想要多线程下安全的实现单例模式,可以使用双检锁来校验也是DCL懒汉式
//DCL懒汉式单例
public class LazyMan {
private volatile static LazyMan instance;
private LazyMan(){
System.out.println(Thread.currentThread().getName()+"OK");
}
//DCL懒汉式双重锁校验
public static LazyMan getInstance(){
if (instance == null){
synchronized (LazyMan.class){//锁这个类
if (instance == null){
//这一步创建实例对象也是非原子性的可能会被指令重排,所以在声明实例的时候就使用volatile保证原子性
instance = new LazyMan();
}
}
}
return instance;
}
public static void main(String[] args) {
//多并发
for (int i = 1; i <= 20; i++) {
new Thread(()->{
LazyMan.getInstance();
}).start();
}
}
}
这样的懒汉式单例是线程安全的
(4)静态内部类实现单例
我们在类的内部再来定义一个内部类,内部类中创建外面类的实例对象,再提供对外方法获得单例对象
//静态内部类实现单例模式
public class Holder {
private Holder(){}
public static Holder getInstance(){
return InnerClass.INSTANCE;
}
public static class InnerClass{
private final static Holder INSTANCE = new Holder();
}
}
但是这样做是线程不安全的,我们能够通过反射来破环这个单例(包括DCL懒汉式的单例也能破坏)
使用反射破坏单例
单例模式通常通过将类的构造器设为私有来防止外部直接实例化。然而,反射机制允许程序在运行时动态地访问和修改类的私有成员,包括私有构造器和私有化的字段等。
LazyMan instance = LazyMan.getInstance();
//通过反射创建一个instance对象
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);//空参构造器
//无视私有的构造器设置构造器可访问
declaredConstructor.setAccessible(true);
//通过反射进行创建LazyMan对象
LazyMan instance2 = declaredConstructor.newInstance();
System.out.println(instance);
System.out.println(instance2);
结果就是这两个对象并不是同一个
由于这种方法是通过构造器来创建的实例,所以我们能在构造器中做限制,在LazyMan类上加锁,第二次创建对象的时候就不能创建了。
//DCL懒汉式单例
public class LazyMan {
private volatile static LazyMan instance;
private LazyMan(){
synchronized (LazyMan.class){
if (instance != null){
throw new RuntimeException("不要使用反射破坏单例");
}
}
}
//DCL懒汉式双重锁校验
public static LazyMan getInstance(){
if (instance == null){
synchronized (LazyMan.class){//锁这个类
if (instance == null){
instance = new LazyMan();//这一步创建实例对象也是非原子性的可能会被指令重排,所以在声明实例的时候就使用volatile保证原子性
}
}
}
return instance;
}
public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
LazyMan instance = LazyMan.getInstance();
//通过反射创建一个instance对象
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);//空参构造器
//无视私有的构造器设置构造器可访问
declaredConstructor.setAccessible(true);
//通过反射进行创建LazyMan对象
LazyMan instance2 = declaredConstructor.newInstance();
System.out.println(instance);
System.out.println(instance2);
}
}
但是如果我两个对象都使用反射创建它又会被破坏,反射调用构造器时,每次调用都是独立的,不会共享同一个同步锁的状态。因此,即使在构造器中使用了同步锁,也无法阻止反射多次调用构造器。
我们再加一层判断,设置一个标志,私有化flag,先看flag的状态是不是初始的false,如果是就代表它是第一次创建的实例,这个时候把flag改成true。这样再有反射创建实例的时候检查flag一看是true,那说明它不是第一次创建,说明有人想创建第二个实例,那就直接抛出异常。
//创建一个flag
private static boolean flag = false;
private volatile static LazyMan instance;
private LazyMan(){
synchronized (LazyMan.class){
if (flag == false){
flag = true;
}else {
throw new RuntimeException("不要使用反射破坏单例");
}
}
}
这样就一定可以防住反射破坏了吗?不可能的,前面说过,反射机制允许程序在运行时动态地访问和修改类的私有成员,包括私有构造器和私有化的字段等。我们尝试给私有字段也绕过去,在第一个实例创建完成之后立马把这个flag再设置回false,那就能骗过判断了。前提是,我们知道这个单例类里设置的有这个字段,且知道这个字段是什么,总有人会解密破解。
public static void main(String[] args) throws Exception {
//LazyMan instance = LazyMan.getInstance();
//通过反射拿到单例类的私有化字段
Field flag = LazyMan.class.getDeclaredField("flag");
//直接也给这个字段的私有权限绕过
flag.setAccessible(true);
//通过反射创建一个instance对象
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);//空参构造器
//无视私有的构造器设置构造器可访问
declaredConstructor.setAccessible(true);
//通过反射进行创建LazyMan对象
LazyMan instance1 = declaredConstructor.newInstance();
//再给新建的实例对象的flag值修改回去就能创建下一个了
flag.set(instance1,false);
LazyMan instance2 = declaredConstructor.newInstance();
//System.out.println(instance);
System.out.println(instance1);
System.out.println(instance2);
}
}
可以看到又创建出来第二个实例了
我们通过看newInstance这个方法发现,它会判断绕如果给定类是枚举类的话就会抛出异常提示不能通过反射创建枚举对象。
(5)使用枚举实现单例
这种方式是实现单例模式的最佳方式,它更为简洁,自动支持序列化机制,绝对防止多次实例化。不仅能避免多线程同步问题,还能自动支持序列化机制,防止反序列化重新创建新的实例对象,防止多次实例化。
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) {
EnumSingle instance = EnumSingle.INSTANCE;
EnumSingle instance1 = EnumSingle.INSTANCE;
System.out.println(instance);
System.out.println(instance1);
}
}
这样创建的两个实例对象一定是一样的
我们再次使用反射去破坏一下这个枚举的单例,这里我们没有直接显示的写构造函数,可以去找一下EnumSingle的构造函数是有参还是无参的。在idea中的target包里有反编译class文件,找到EnumSingle.class看到我们的EnumSingle类其实是有无参的构造函数的。
那既然是无参的,我们就使用反射获取无参构造函数
class Test{
public static void main(String[] args) throws Exception {
EnumSingle instance1 = EnumSingle.INSTANCE;
//获取EnumSingle的构造函数
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);
//设置私有构造函数可访问
declaredConstructor.setAccessible(true);
EnumSingle instance = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance);
}
}
结果却不是我们想要的那个“Cannot reflectively create enum objects”,它说找不到这样的构造函数,没有无参构造函数,我们明明看到了?
反编译工具会尽量还原原始的源代码结构,但有时会有一些不准确的地方。特别是对于枚举类,反编译工具可能会显示一个私有的无参构造函数,但实际上这个构造函数是受 JVM 保护的,不能被外部代码直接调用。
我们再换一个,使用jad工具反编译得到反编译后的源代码。
再看源代码到底是什么
package com.zm.juc.singleton;
public final class EnumSingle extends Enum//可以看到实际上是继承了枚举类的基类Enum
{
public static EnumSingle[] values()
{
//返回一个包含所有枚举常量的数组副本,$VALUES 是一个私有静态数组,存储了所有的枚举实例。
return (EnumSingle[])$VALUES.clone();
}
public static EnumSingle valueOf(String name)
{
return (EnumSingle)Enum.valueOf(com/zm/juc/singleton/EnumSingle, name);
}
//这里的构造函数就是有参的了,所以前面的无参的是想给你看到的这里一般看不到
private EnumSingle(String s, int i)//s是枚举常量名称,i是枚举常量的序号
{
super(s, i);
}
public EnumSingle getInstance()
{
return INSTANCE;
}
public static final EnumSingle INSTANCE;
private static final EnumSingle $VALUES[];
//静态初始化代码块在类加载的时候初始化 INSTANCE 和 $VALUES 数组。
//INSTANCE 被初始化为一个新的 EnumSingle 实例,传入名称 "INSTANCE" 和序号 0。
//$VALUES 数组被初始化为包含 INSTANCE 的数组。
static
{
INSTANCE = new EnumSingle("INSTANCE", 0);
$VALUES = (new EnumSingle[] {
INSTANCE
});
}
}
这里就能看到构造函数其实是有参数的,s就是枚举常量的名称,i就是枚举常量的序号,那这就好搞了,修改一下传参。
public static void main(String[] args) throws Exception {
EnumSingle instance1 = EnumSingle.INSTANCE;
//获取EnumSingle的构造函数
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
//设置私有构造函数可访问
declaredConstructor.setAccessible(true);
EnumSingle instance = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance);
}
这样想要的报错就出来了
那我非得使用反射创建枚举的实例对象呢?也能,使用valueOf方法,将枚举类和枚举常量名称传入就能获取到枚举实例,而且还不报错,这样获取的实例对象还是同一个。
public static void main(String[] args) throws Exception {
EnumSingle instance1 = EnumSingle.INSTANCE;
//获取EnumSingle的构造函数
/*Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
//设置私有构造函数可访问
declaredConstructor.setAccessible(true);
EnumSingle instance = declaredConstructor.newInstance();*/
EnumSingle instance = Enum.valueOf(EnumSingle.class, "INSTANCE");
System.out.println(instance1);
System.out.println(instance);
}
总结为什么不能通过反射创建新的枚举实例?
构造器私有化:尽管反编译之后我们看到构造函数里有俩参数,但是这个构造函数是受到JVM保护的,不能通过反射直接调用。
JVM保护:枚举类的实例化是由JVM在类加载的时候完成的,而不是通过普通的构造函数调用,反射机制在这种情况下会被限制,以防止破坏枚举类的单例特性。
25、CAS
CAS(Compare-and-Swap)是一种乐观锁的实现方式,全称为“比较并交换”,是一种无锁的原子操作。
为了保证并发编程中的原子性,最为常见的方法就是加锁,一般我们使用synchronized关键字和Lock加锁或者使用CAS原子操作,他俩各有各的好,synchronized它是基于悲观锁的一种实现,在干什么事情之前必须获得锁,而一旦一个线程获得锁了其他的线程就会阻塞等待锁。
CAS是乐观锁的实现方式,线程执行的时候不会加锁,他会默认操作不会冲突,如果因为冲突失败了,那就重试直到成功为止。由于乐观锁假想操作中没有锁的存在,因此不太可能出现死锁的情况,所以说,乐观锁天生免疫死锁。
25.1CAS的三个值
V(var):要更新的变量;
E(expected): 预期的值;
N(new):新值
比较交换的过程:
先判断V是否等于E,如果等于就把V的值设置为N,如果不等于就说明已经有其他的线程更新了V,于是当前的线程就需要放弃更新,什么都不做。
举个例子:
假设现在有多个线程对一个共享变量 i 操作,i初始值等于10,在当前的线程A中我想把i设置成20,用CA操作是怎么个流程?
先把i和5对比,如果还是等于10,说明没人动过i,那这个时候就能设置i为20,此次的CAS成功,当前的i就被设置为20;
如果不等于10,假如说等于30,说明有线程改过了i,那么当前的A线程就不做什么,此次的CAS失败,i还是30
其实预期值E就是变量的初始值也就是旧值,新值N是我们想要得到的值,所以这里的例子中的i就是V,E就是10,N就是20.
有没有一种可能,线程A判断了了当前的i就是10,正准备给i变成20 的时候中间被截胡了被其他线程修改掉呢?
不可能的,CAS是一种原子操作,它是系统原语,是一条CPU的原子指令,从CPU方面就已经是原子性的。当多个线程同时使用CAS操作一个变量的时候,只有一个操作会成功更新,其他的都会失败,但是失败的线程不会被挂起,只是会被告知更新失败,还能继续尝试或者放弃。
25.2、CAS原理
原子操作Java不能直接实现,都是交给底层JVM调用C或C++实现。java里有一个Usafe类的实例,提供了低级别的内存操作功能,在sun.mic
包中,里面就是一些native
方法,其中关于CAS的操作是使用C++实现的。
演示一个:
之前说的i++
不是原子性的,我们使用AtomicInteger来进行原子性操作就是调用getAndIncrement();方法
public class CASDemo {
public static void main(String[] args) {
AtomicInteger integer = new AtomicInteger(10);
//比较和设置更新compareAndSet,第一个参数是期望值也就是初始值,第二个参数是要更新成的结果
//期望值达到了就更新
integer.compareAndSet(10, 20);
System.out.println(integer.get());
//再来一遍结果值已经被改变期望是10但实际上是20,所以 不能更新还保留20
integer.compareAndSet(10, 30);
System.out.println(integer.get());
}
}
compareAndSet这个方法点进去看一下
public final boolean weakCompareAndSet(int expect, int update) {
//this是当前对象 valueOffset是当前对象中需要操作的字段的偏移量 expect是期望的当前值 update就是新值
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
可以看到它是使用unsafe类调用了compareAndSwapInt方法比较和交换才是底层方法
回到之前说的i++
的原子版,getAndIncrement方法还是由unsafe实例对象调用更底层的getAndAddInt方法
public final int getAndIncrement() {
//this是当前对象 valueOffset是当前对象中需要操作的字段的偏移量 1就是要加1
return unsafe.getAndAddInt(this, valueOffset, 1);
}
再看方法getAndAddInt
//Object var1,这个参数代表你想要进行操作的对象。
//long var2,这个参数是你想要操作的 var1 对象中的某个字段的偏移量。这个偏移量可以通过 Unsafe 类的 objectFieldOffset 方法获得。
//int var4,这个参数是你想要增加的值。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//先获取var1这个对象的内存地址偏移量赋值给var5
var5 = this.getIntVolatile(var1, var2);
//然后开始比较并交换,如果当前对象var1的内存地址偏移值还是期望值var5,那么就更新新值,在这里就是原来的旧值+1,i++操作
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
//然后返回最新的结果
return var5;
}
getAndAddInt方法执行过程:
首先在do while循环开始,通过this.getIntVolatile获取当前对象var1的内存地址偏移量var2赋值给var5作为期望值。这里的getIntVolatile方法能够保证读操作的可见性,即读取的结果是最新的写入结果,不会因为指令重排等原因导致读取到旧的值;
然后在执行compareAndSwapInt进行CAS操作,如果对象 var1 在内存地址 var2 处的值等于预期值 var5,则将该位置的值更新为 var5 + var4,并返回 true;否则,不做任何操作并返回 false。
如果 CAS 操作成功,说明我们成功地将 var1 对象的 var2 偏移量处的字段的值更新为 var5 + var4,并且这个更新操作是原子性的,因此我们跳出循环并返回原来的值 var5。
如果 CAS 操作失败,说明在我们尝试更新值的时候,有其他线程修改了该字段的值,所以我们继续循环,重新获取该字段的值,然后再次尝试进行 CAS 操作。
这里的do while是为了保证循环体内的语句至少会被执行一次,这样才能保证return的值是我们所期望的值,也是自旋锁实现机制。
CAS的缺点也就是三大问题:
自旋时间太长;
一次只能保证一个共享变量的原子性;
ABA问题
25.3、CAS的三大问题
1、ABA问题
ABA问题就是原来的A要被变成C,结果中间被人变成了B又给变回了A,那么A还是A只不过被更新过了一次了,再更新成C就更新两次了。CAS是检查不到这点的。
ABA问题的解决思路就是在共享变量前面追加版本号或者时间戳,有一个类就能解决AtomicStampedReference类,这个类的compareAndSet方法的作用首先检查当前引用是否等于预期的引用,并且检查当前标志是否等于预期标志,如果二者都相等,才使用CAS操作设置为新的值,否者更新失败。
AtomicStampedReference类的构造方法:
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
Pair这个内部静态类就是用来封装引用对象和时间戳的方便后面构造器中以及compareAndSet方法使用。
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
compareAndSet方法
public boolean compareAndSet(V expectedReference,//预期的当前引用
V newReference,//新的引用
int expectedStamp,//预期的当前时间戳
int newStamp) {//新的时间戳
//这里将当前的 pair 对象赋值给局部变量 current。由于 pair 是 volatile 的,这确保了读取到的是最新的值。
Pair<V> current = pair;
return
expectedReference == current.reference &&//当前引用是否和预期值匹配
expectedStamp == current.stamp &&//当前时间戳和预期时间戳匹配
((newReference == current.reference &&//新的引用是否和当前引用相同
newStamp == current.stamp) ||//当前时间戳是否和新的时间戳相同,都相同就true不相同就下一步
//不相同就更新使用CAS操作
casPair(current, Pair.of(newReference, newStamp)));
}
//cas更新
private boolean casPair(Pair<V> cmp, Pair<V> val) {
//调用底层的compareAndSwapObject方法比较并交换
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
举个例子:
A线程是正常需要把引用对象的值从10改成30,改之前被B线程动力手脚,B把10改成了20,然后又从20改回了10,这个时候A再去修改就会发现版本号已经不是A线程预期的那样,而被人变动了,所以这个时候A的修改就会失败。
public class CASABA {
//AtomicStampedReference类构造器需要初始引用对象和初始时间戳
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(10,1);
public static void main(String[] args) {
//线程B是中间偷偷做出修改的线程,B先给10改成20,再给20改回10
new Thread(()->{
int stamp = atomicStampedReference.getStamp();//获得当前时间戳(版本号)
System.out.println("Bv1:"+stamp);
System.out.println("B1->"+atomicStampedReference.compareAndSet(10,
20,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1));
System.out.println("Bv2:"+atomicStampedReference.getStamp());
//再把引用对象改回去
System.out.println("B2->"+atomicStampedReference.compareAndSet(20,
10,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1));
System.out.println("Bv3:"+atomicStampedReference.getStamp());
},"B").start();
//线程A是正常要更改变量的线程要把10改成30
new Thread(()->{
int stamp = atomicStampedReference.getStamp();//获得当前时间戳(版本号)
System.out.println("Av1:"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A->"+atomicStampedReference.compareAndSet(10,
30,
stamp,//预期的时间戳或版本号应该是最初的1没被人动过手脚
atomicStampedReference.getStamp() + 1));
System.out.println("Av2:"+atomicStampedReference.getStamp());
},"A").start();
}
}
可以看到,B在A修改之前把值修改了,虽然说A的预期值还是10,但预期的版本号已经被改了不是1 而是3了,所以此次A的更新失败。
2、长时间自旋
CAS 多与自旋结合。如果自旋 CAS 长时间不成功,会占用大量的 CPU 资源。
解决方法:
退避机制:在每次 CAS 失败后,让线程短暂休眠一段时间,然后再尝试。这样可以减少 CPU 的占用率,降低自旋的频率。
上锁:在高并发场景下,可以考虑使用锁来替代 CAS 操作。虽然锁会引入额外的开销,但在某些情况下,它可以更有效地管理并发访问。
3、CAS只能单变量
当对一个共享变量执行操作时,CAS 能够保证该变量的原子性。但是对于多个共享变量,CAS 就无法保证操作的原子性,这时通常有两种做法:
使用
AtomicReference
类保证对象之间的原子性,把多个变量放到一个对象里面封装进行 CAS 操作;使用锁。锁内的临界区代码可以保证只有当前线程能操作。
25.3、自旋锁
自旋锁(Spin Lock)是一种同步机制,用于在多线程环境中保护共享资源。与传统的互斥锁(如 synchronized 或 ReentrantLock)不同,自旋锁不会使线程进入阻塞状态,而是让线程在一个循环中不断检查锁的状态,直到锁可用为止。这种机制适用于锁的竞争不激烈且持有时间较短的场景。
自旋锁获取流程:
先尝试获取锁:线程尝试获取锁,如果能获取就执行后面过程;
如果不能获取就自旋等待:获取不到锁也不会被阻塞而是在循环中不断检查锁的状态,直到锁能使用;
释放锁:持有锁的线程完成之后就释放锁。
优点:
低延迟:对于锁竞争不激烈且持有时间比较短的场景,自旋锁能减少上下文切换的开销,提高系统性能;
实现简单:自旋锁实现很简单方便维护。
缺点:
CPU占用率较高:如果锁的竞争非常激烈或者有的任务比较大线程持有锁的时间比较长,自旋会导致CPU占用率比较高,很浪费资源;
不适合长时间持有锁的场景
自定义一个自旋锁:
要点:
原子布尔值来标识锁的状态false就是没人占用,true就是已经被持有
private final AtomicBoolean locked = new AtomicBoolean(false);
lock方法:加锁操作,先来个while无限循环,里面判断锁的状态是不是已经被持有且能够把锁的状态修改成当前线程占用(locked.compareAndSet(false, true)之后再跳出循环,如果长时间没有获得锁的线程可以让它先睡一会,减少CPU占有率。
unlock方法:就是把锁的状态通过compareAndSet再从true占用状态改成false没人占用状态。
自定义自旋锁SpinLockTest
public class SpinLockTest {
private final AtomicBoolean locked = new AtomicBoolean(false);
//上锁
public void lock(){
while (true){
if (!locked.get() && locked.compareAndSet(false,true)){
System.out.println(Thread.currentThread().getName()+"--->lock");
break;//得到锁跳出循环
}
//获取不成功就循环获取,但等1秒再循环
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+"自旋获取锁中...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//解锁
public void unlock(){
if (locked.compareAndSet(true,false)) {
System.out.println(Thread.currentThread().getName()+"--->unlock");
}
}
}
测试一下:
public class SpinLockDemo {
public static void main(String[] args) {
SpinLockTest myLock = new SpinLockTest();
new Thread(()->{
myLock.lock();
try {
TimeUnit.SECONDS.sleep(5);
}catch (Exception e){
e.printStackTrace();
} finally {
myLock.unlock();
}
},"A").start();
new Thread(()->{
myLock.lock();
try {
TimeUnit.SECONDS.sleep(1);
}catch (Exception e){
e.printStackTrace();
} finally {
myLock.unlock();
}
},"B").start();
}
}
A线程完事之后B线程才能拿到锁,A持有锁的时候B线程在自旋等待获得锁。