一、概念
并行与并发
单核 cpu 下,线程实际还是 串行执行 的。操作系统中有一个组件叫做任务调度器,将 cpu 的时间片(windows下时间片最小约为 15 毫秒)分给不同的程序使用,只是由于 cpu 在线程间(时间片很短)的切换非常快,人类感觉是 同时运行的 。总结为一句话就是: 微观串行,宏观并行 ,
一般会将这种 线程轮流使用 CPU 的做法称为并发, concurrent
多核 cpu下,每个 核(core) 都可以调度运行线程,这时候线程可以是并行的。
并行:指两个或多个事件在同一时刻发生(同时发生)。
并发:指两个或多个事件在同一个时间段内发生。
进程与线程
程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的
当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
进程是在内存中正在运行的程序(程序 是在磁盘中的静态文件),是动态的划分的内存区域
进程状态反映进程执行过程的变化。这些状态随着进程的执行和外界条件的变化而转换。在三态模型中,进程状态 分为三个基本状态,即运行态,就绪态,阻塞态。在五态模型中,进程分为新建态、终止态,运行态,就绪态,阻 塞态。
线程是依赖于进程执行的最小单元
一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行
多线程是一个进程,多个线程执行,多个线程共享堆内存方法区,栈内存独立。一个线程一个栈
对比:
进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
进程拥有共享的资源,如内存空间等,供其内部的线程共享
进程间通信较为复杂
同一台计算机的进程通信称为 IPC(Inter-process communication)
不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
线程更轻量,线程上下文切换成本一般上要比进程上下文切换低查看进程线程方法
windows
任务管理器可以查看进程和线程数,也可以用来杀死进程
tasklist 查看进程
taskkill 杀死进程
linux
ps -fe 查看所有进程
ps -fT -p <PID> 查看某个进程(PID)的所有线程
kill 杀死进程
top 按大写 H 切换是否显示线程
top -H -p <PID> 查看某个进程(PID)的所有线程
Java
jps 命令查看所有 Java 进程
jstack <PID> 查看某个 Java 进程(PID)的所有线程状态
jconsole 来查看某个 Java 进程中线程的运行情况(图形界面)
三态模型
就绪态(ready):进程具备了运行条件,等待CPU分配才能运行。
运行态(running):进行正在执行,CPU正在处理该程序。
等待态(waiting):又称为阻塞态(blocked),指进程正在等待某件事情完成,不具备运行的状态。
存在的:
就绪态---->运行态:其他进程时间片用完,CPU空闲时被调度选中一个就绪进程执行。
运行态---->就绪态:分配给每个进程的时间片是有限的,运行时间片到了就进入到就绪状态,或出现有更高优
先权进程。
- 运行态---->等待(阻塞)态:正在执行的进程因发生某等待事件而无法执行,则进程由执行状态变为阻塞状态,
如发生了I/O请求(等待外设传输)。
- 等待(阻塞)态---->就绪态:进程所等待的事件已经发生,就进入就绪队列。
还有两种不可能存在的转换:
- 等待(阻塞)态---->运行态:即使给阻塞进程分配CPU,也无法执行,操作系统在进行调度时不会从阻塞队列进
行挑选,而是从就绪队列中选取。
- 就绪态---->等待(阻塞)态:就绪态根本就没有执行,谈不上进入等待态。
五态模型
新建状态:进程在创建时需要申请一个空白进程管理块,向其中填写控制和管理进程的信息,完成资源分配。如果创建工作无法完成,比如资源无法满足,就无法被调度运行,把此时进程所处状态称为创建状态。 创建进程时分为两个阶段,第一个阶段为一个新进程创建必要的管理信息,第二个阶段让该进程进入就绪状 态。由于有了新建态,操作系统往往可以根据系统的性能和主存容量的限制推迟新建态进程的提交。
就绪状态:进程已经准备好,已分配到所需资源,只要分配到CPU就能够立即运行。进程这时的状态称为就 绪状态。在一个系统中处于就绪状态的进程可能有多个,通常将它们排成一个队列,称为就绪队列。例如, 当一个进程由于时间片用完而进入就绪状态时,排入低优先级队列;当进程由I/O操作完成而进入就绪状态 时,排入高优先级队列。
运行状态:进程已获得CPU,其程序正在执行。在单核系统中,只有一个进程处于执行状态; 在多核机系统 中,则有多个进程处于执行状态。在没有其他进程可以执行时(如所有进程都在阻塞状态),通常会自动执 行系统的空闲进程。
等待状态:正在执行的进程由于某些事件而暂时无法运行,CPU便暂时不再处理,进程处于暂停状态,也就 是进程受到阻塞,这种暂停状态称为阻塞状态,有时也称为等待状态。致使暂停的事件有申请缓冲空间失 败,I/O请求等。通常将这种处于阻塞状态的进程也排成一个队列。有的系统则根据阻塞原因的不同而把处于 阻塞状态的进程排成多个队列。在再满足请求时再进入就绪状态。
终止状态:进程结束,或出现错误,或被系统终止,进入终止状态。等待操作系统进行善后处理,然后将其 进程管理块清零,就无法再执行。类似的,进程的终止也可分为两个阶段,第一个阶段等待操作系统进行善 后处理,第二个阶段释放主存。
主方法入栈过程
java命令---启动jvm---启动一个进程----启动主线程(main thread)---调用main方法----main方法入栈(主线程的栈)java线程调度模型为抢占式,根据线程优先级。优先级越高,抢到的几率越大
计算机通常只有一个CPU时,在任意时刻只能执行一条计算机指令,每一个进程只有获得CPU的使用权才能执行指 令。所谓多进程并发运行,从宏观上看,其实是各个进程轮流获得CPU的使用权,分别执行各自的任务。那么,在可运行 池中,会有多个线程处于就绪状态等到CPU,JVM就负责了线程的调度。JVM采用的是抢占式调度,没有采用分时调度, 因此可以能造成多线程执行结果的的随机性。
线程的状态
- new
NEW 尚未启动的线程处于此状态。
l new关键字创建了一个线程之后,该线程就处于新建状态
l JVM为线程分配内存,初始化成员变量值
- 就绪 运行
RUNNABLE 在Java虚拟机中执行的线程处于此状态。 线程可以在java虚拟机中运行的状态,可能正在运行自己代码,也可能没有,这取决于操 作系统处理器
l 当线程对象调用了start()方法之后,该线程处于就绪状态
l JVM为线程创建方法栈和程序计数器,等待线程调度器调度
l 就绪状态的线程获得CPU资源,开始运行run()方法,该线程进入运行状态
- 阻塞
BLOCKED 被阻塞等待监视器锁定的线程处于此状态。 当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入Blocked状态;当该线程持有锁时,该线程将变成Runnable状态。
当发生如下情况时,线程将会进入阻塞状态
l 线程调用sleep()方法主动放弃所占用的处理器资源
l 线程调用了一个阻塞式IO方法,在该方法返回之前,该线程被阻塞
l 线程试图获得一个同步锁(同步监视器),但该同步锁正被其他线程所持有。
l 线程在等待某个通知(notify)
l 程序调用了线程的suspend()方法将该线程挂起。但这个方法容易导致死锁,所以应该尽量避免使用该方法
WAITING 正在等待另一个线程执行特定动作的线程处于此状态。 一个线程在等待另一个线程执行一个(唤醒)动作时,该线程进入Waiting状态。进入这个状态后是不能自动唤醒的,必须等待另一个线程调用notify或者notifyAll方法才能够唤醒。
TIMED_WAITING 正在等待另一个线程执行动作达到指定等待时间的线程处于此状态 同waiting状态,有几个方法有超时参数,调用他们将进入Timed Waiting状态。这一状态 将一直保持到超时期满或者接收到唤醒通知。带有超时参数的常用方法有Thread.sleep 、Object.wait。
- 死亡
TERMINATED 已退出的线程处于此状态 因为run方法正常退出而死亡,或者因为没有捕获的异常终止了run方法而死亡。
线程会以如下3种方式结束,结束后就处于死亡状态:
l run()或call()方法执行完成,线程正常结束。
l 线程抛出一个未捕获的Exception或Error。
l 调用该线程stop()方法来结束该线程,该方法容易导致死锁,不推荐使用。
Thread.getState() 方法可以获取线程的状态。 一个线程可以在给定时间点处于一个状态。 这些状态是不反映任何操作系统线程状态的虚拟机状态。
多线程实现方式
- 实现Runable接口和继承Thread的对比:
l 接口更适合多个相同的程序代码的线程去共享同一个资源。
l 接口可以避免java中的单继承的局限性。
l 接口代码可以被多个线程共享,代码和线程独立。
l 线程池只能放入实现Runable或Callable接口的线程,不能直接放入继承Thread的类。
- Runnable和Callable接口比较
相同:
l 两者都是接口;
l 两者都可用来编写多线程程序;
l 两者都需要调用Thread.start()启动线程;
不同:
l 实现Callable接口的线程能返回执行结果;而实现Runnable接口的线程不能返回结果;
l Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的不允许抛异常;
l 实现Callable接口的线程可以调用Future.cancel取消执行 ,而实现Runnable接口的线程不能
//Thread方式
package com.note.plan.dynamicCompiling;
/**
* @Classname ThreadType
* @Description 多线程Thread实现
*/
public class ThreadType extends Thread {
@Override
public void run() {
// 多线程逻辑
}
// 启动线程
public static void main(String[] args) {
ThreadType threadType = new ThreadType();
// 使用start为开启一个线程执行run方法的逻辑。run方法是直接调用
threadType.start();
}
}//Runnable方式
package com.note.plan.dynamicCompiling;
/**
* @Classname RunnableType
* @Description Runnable方式
*/
public class RunnableType implements Runnable {
@Override
public void run() {
}
public static void main(String[] args) {
Thread thread = new Thread(new RunnableType());
thread.start();
}
}// 实现Callable接口
// Callable需要使用FutureTask类帮助执行
//判断任务是否完成:isDone()
//能够中断任务:cancel()
//能够获取任务执行结果:get()
import java.util.Date;
import java.util.concurrent.Callable;
public class MyCallable implements Callable<String> {
public String call() throws Exception {
for (int i=0; i<10; i++){
System.out.println("MyCallable正在执行:"+new Date().getTime());
}
return "MyCallable执行完毕!";
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask task = new FutureTask(new MyCallable());
Thread thread = new Thread(task);
thread.start();
for (int i=0; i<10; i++){
System.out.println("main主线程正在执行:"+new Date().getTime());
}
System.out.println(task.get());
}
}// 线程池-Executor
import java.util.Date;
import java.util.concurrent.*;
public class MyRunable implements Runnable {
public void run() {
for (int i=0; i<10; i++){
System.out.println("MyRunnable线程正在执行:"+new Date().getTime());
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.使用Executors创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//2.通过线程池执行线程
executorService.execute(new MyRunable());
//3.主线程循环打印
for (int i=0; i<10; i++){
System.out.println("main主线程正在执行:"+new Date().getTime());
}
}
}线程运行原理
栈与栈帧
Java Virtual Machine Stacks (Java 虚拟机栈)
我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟 机就会为其分配一块栈内存。
每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
线程上下文切换(Thread Context Switch)
因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码
线程的 cpu 时间片用完
垃圾回收
有更高优先级的线程需要运行
线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的
- 状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
- Context Switch 频繁发生会影响性能
Thread主要方法
//返回对当前正在执行的线程对象的引用
public static Thread currentThread();
//返回该线程的名称
String getName();
// 返回线程的优先级 分配资源的概率比较高
// 优先级高的执行概率较高,并不是一定最先执行
int getPriority();
// 更改线程的优先级
void setPriority(int newPriority);
// 在指定的毫秒数内让当前正在执行的线程休眠(暂停执行)
public static void sleep(long millis)
// 中断线程,
// 中断线程的休眠
public void interrupt()
// 等待该线程终止。
// 代码所在的线程 等待 调用 调用方法的线程的结束再执行
// 即在线程a插入中使用该方法join()线程b a线程需要等到线程b执行完成后再执行
public final void join()
// 暂停当前正在执行的线程对象,并执行其他线程。
// 执行的其他线程也包括这个正在执行的线程
public static void yield()
// 将该线程标记为守护线程或用户线程(true为守护线程,false为用户线程)
// 当程序中都是守护线程,虚拟机停止
public final void setDaemon(boolean on)
// 状态
getState();
// 打断 sleep,wait,join 的线程 这几个方法都会让线程进入阻塞状态
// 打断 sleep 的线程, 会清空打断状态
interrupt();同步方法解决多线程数据安全问题
// 同步代码快
// 锁对象:可以为任意对象,但若要完成多个线程的同步操作,这多个锁对象必须使用相同的锁对象,否则,将无法完成多个线程的同步操作
synchronized (锁对象){
}
// 同步方法
// 同步方法的锁对象是this(当前对象,谁调用为谁)
// 若是静态方法,其锁对象为静态方法所属的类的对象
public synchronized void syncDemo(){
}Lock锁
Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。
l Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;
主要方法
java// 通过实现该接口的子类新建对象 // 构造函数参数为线程是否公平获取锁true-公平;false-不公平,即由某个线程独占,默认是false Lock l = new ReentrantLock(true); // 获取锁。 void lock(); //释放锁 void unlock(); //lock()相当于synchronized代码块的开始,unlock()相当于代码块的结束,lock可在更大的范围使用,必须解锁的一般配合try finally使用
Synchronized和Lock区别
l synchronized是java内置关键字,在jvm层面,Lock是个java类;
l synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁;
l synchronized会自动释放锁(a 线程执行完同步代码会释放锁 ;b 线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;
l 用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了;
l synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)
l Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题。
死锁
即多个线程之间彼此锁定占用对方所需要的资源。只能解除一方占用处理。或者避免.
产生条件:
互斥条件
进程要求对所分配的资源(如打印机)进行排他性控制,即在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。
不可剥夺条件
进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能由获得该资源的进程自己来释放(只能是主动释放)。
请求与保持条件
进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。
循环等待条件
存在一种进程资源的循环等待链,链中每一个进程已获得的资源同时被 链中下一个进程所请求。即存在一个处于等待状态的进程集合{Pl, P2, …, pn},其中Pi等 待的资源被P(i+1)占有(i=0, 1, …, n-1),Pn等待的资源被P0占有
处理:
l 预防死锁:通过设置某些限制条件,去破坏产生死锁的四个必要条件中的一个或几个条件,来防止死锁的发生。
l 避免死锁:在资源的动态分配过程中,用某种方法去防止系统进入不安全状态,从而避免死锁的发生。
l 检测死锁:允许系统在运行过程中发生死锁,但可设置检测机构及时检测死锁的发生,并采取适当措施加以清除。
l 解除死锁:当检测出死锁后,便采取适当措施将进程从死锁状态中解脱出来。
死锁预防:
- 破坏“互斥”条件
“互斥”条件是无法破坏的。因此,在死锁预防里主要是破坏其他几个必要条件,而不去涉及破坏“互斥”条件。
- 破坏“占有并等待”条件
破坏“占有并等待”条件,就是在系统中不允许进程在已获得某种资源的情况下,申请其他资源。即要想出一个办法,阻止进程在持有资源的同时申请其他资源。
l 方法一:一次性分配资源,即创建进程时,要求它申请所需的全部资源,系统或满足其所有要求,或什么也不给它。
l 方法二:要求每个进程提出新的资源申请前,释放它所占有的资源。这样,一个进程在需要资源S时,须先把它先前占有的资源R释放掉,然后才能提出对S的申请,即使它可能很快又要用到资源R。
破坏“不可抢占”条件
破坏“不可抢占”条件就是允许对资源实行抢夺。
l 方法一:如果占有某些资源的一个进程进行进一步资源请求被拒绝,则该进程必须释放它最初占有的资源,如果有必要,可再次请求这些资源和另外的资源。
l 方法二:如果一个进程请求当前被另一个进程占有的一个资源,则操作系统可以抢占另一个进程,要求它释放资源。只有在任意两个进程的优先级都不相同的条件下,方法二才能预防死锁。
破坏“循环等待”条件
破坏“循环等待”条件的一种方法,是将系统中的所有资源统一编号,进程可在任何时刻提出资源申请,但所有申请必须按照资源的编号顺序(升序)提出。这样做就能保证系统不出现死锁。
死锁避免:
避免死锁不严格限制产生死锁的必要条件的存在,因为即使死锁的必要条件存在,也不一定发生死锁。
有序资源分配法
l 必须为所有资源统一编号,例如打印机为1、传真机为2、磁盘为3等
l 同类资源必须一次申请完,例如打印机和传真机一般为同一个机器,必须同时申请
l 不同类资源必须按顺序申请
银行家算法
银行家算法(Banker's Algorithm)是一个避免死锁(Deadlock)的著名算法,是由艾兹格·迪杰斯特拉在1965年为T.H.E系统设计的一种避免死锁产生的算法。
银行家算法的基本思想是分配资源之前,判断系统是否是安全的;若是,才分配。它是最具有代表性的避免死锁的算法。
设进程i提出请求REQUEST [i],则银行家算法按如下规则进行判断。
如果
REQUEST [i]<= NEED[i,j],则转(2);否则,出错。如果
REQUEST [i]<= AVAILABLE[i],则转(3);否则,等待。系统试探分配资源,修改相关数据:
AVAILABLE[i]-=REQUEST[i];//可用资源数-请求资源数
ALLOCATION[i]+=REQUEST[i];//已分配资源数+请求资源数
NEED[i]-=REQUEST[i];//需要资源数-请求资源数
- 系统执行安全性检查,如安全,则分配成立;否则试探险性分配作废,系统恢复原状,进程等待。
顺序枷锁
当多个线程需要相同的一些锁,但是按照不同的顺序加锁,死锁就很容易发生。
按照顺序加锁是一种有效的死锁预防机制。但是,这种方式需要事先知道所有可能会用到的锁,但总有些时候是无法预知的,所以该种方式只适合特定场景。
限时加锁
限时加锁是线程在尝试获取锁的时候加一个超时时间,若超过这个时间则放弃对该锁请求,并回退并释放所有已经获得的锁,然后等待一段随机的时间再重试
这种方式有两个缺点:
当线程数量少时,该种方式可避免死锁,但当线程数量过多,这些线程的加锁时限相同的概率就高很多,可能会导致超时后重试的死循环。Java中不能对synchronized同步块设置超时时间。你需要创建一个自定义锁,或使用Java5中java.util.concurrent包下的工具。
死锁检测
预防和避免死锁系统开销大且不能充分利用资源,更好的方法是不采取任何限制性措施,而是提供检测和解脱死锁的手段,这就是死锁检测和恢复。
死锁检测步骤:
寻找一个没有结束标记的进程Pi,对于它而言R矩阵的第i行向量小于或等于A。如果找到了这样一个进程,执行该进程,然后将C矩阵的第i行向量加到A中,标记该进程,并转到第1步如果没有这样的进程,那么算法终止算法结束时,所有没有标记过的进程都是死锁进程。
死锁恢复
利用抢占恢复。
临时将某个资源从它的当前所属进程转移到另一个进程。
这种做法很可能需要人工干预,主要做法是否可行需取决于资源本身的特性。
利用回滚恢复
周期性的将进程的状态进行备份,当发现进程死锁后,根据备份将该进程复位到一个更早的,还没有取得所需的资源的状态,接着就把这些资源分配给其他死锁进程。
通过杀死进程恢复
最直接简单的方式就是杀死一个或若干个进程。
尽可能保证杀死的进程可以从头再来而不带来副作用。
定位死锁
检测死锁可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁:
Timer:定时器
package com.note.plan.dynamicCompiling;
import java.util.Timer;
import java.util.TimerTask;
/**
* @Classname TimerDemo
* @Description Timer的demo
*/
public class TimerDemo {
public static void main(String[] args) {
//新建一个Timer对象
Timer timer = new Timer();
//在延迟delay时间后执行
timer.schedule(new TimerTask() {
@Override
public void run() {
}
}, 0);
//在指定delay延迟事件后,以固定period间隔执行
timer.schedule(new TimerTask() {
@Override
public void run() {
}
},0,100);
}
}线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。
使用步骤:
1:利用Executors工厂类的静态方法,创建线程池对象;
2:编写Runnable或Callable实现类的实例对象;
3:利用ExecutorService的submit方法或ScheduledExecutorService的schedule方 法提交并执行线程任务
4:如果有执行结果,则处理异步执行结果(Future)
5:调用shutdown()方法,关闭线程池Executor接口:声明了execute(Runnable runnable)方法,执行任务代码
ExecutorService接口:继承Executor接口,声明方法:submit、invokeAll、invokeAny以及shutDown等
AbstractExecutorService抽象类:实现ExecutorService接口,基本实现ExecutorService中声明的所有方法
ScheduledExecutorService接口:继承ExecutorService接口,声明定时执行任务方法
ThreadPoolExecutor类:继承类AbstractExecutorService,实现execute、submit、shutdown、shutdownNow方法
ScheduledThreadPoolExecutor类:继承ThreadPoolExecutor类,实现ScheduledExecutorService接口并实现其中的方法
Executors类:提供快速创建线程池的方法
主要方法
java// 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程 static ExecutorService newFixedThreadPool(int nThreads); //ExecutorService es = Executors.newFixedThreadPool(3); // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 // 可调用 Future 的 get 方法在成功 完成时将会返回 null。?表泛型 Future<?> submit(Runnable task) // 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。 void shutdown() // 停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 List<Runnable> shutdownNow() // 执行带返回值的任务,返回一个Future对象。 <T> Future<T> submit(Callable<T> task) // 执行 Runnable 任务,并返回一个表示该任务的 Future。 <T> Future<T> submit(Runnable task, T result)
Future 的常用方法:
// 试图取消对此任务的执行。
boolean cancel(boolean mayInterruptIfRunning)
// 如有必要,等待计算完成,然后获取其结果。
V get()
// 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
V get(long timeout, TimeUnit unit)
// 如果在任务正常完成前将其取消,则返回 true。
boolean isCancelled()
// 如果任务已完成,则返回 true。
boolean isDone()线程通信
多个线程并发执行时,在默认情况下CPU是随机切换线程的,有时我们希望CPU按我们的规律执行线程,此时就需要线程之间协调通信。
通信方式
线程间通信常用方式如下:
l 休眠唤醒方式:
Object的wait、notify、notifyAll
LockSupport的.park、unpark
Condition的await、signal、signalAll
l CountDownLatch:用于某个线程A等待若干个其他线程执行完之后,它才执行
l CyclicBarrier:一组线程等待至某个状态之后再全部同时执行
l Semaphore:用于控制对某组资源的访问权限
Object和Condition休眠唤醒区别
l object wait()必须在synchronized(同步锁)下使用,
l object wait()必须要通过Nodify()方法进行唤醒
l condition await() 必须和Lock(互斥锁/共享锁)配合使用
l condition await() 必须通过 signal() 方法进行唤醒
LockSupport与 Object休眠唤醒的区别
wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
park & unpark 可以先 unpark,而 wait & notify 不能先 notify
CountDownLatch方式
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。
每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
import java.util.concurrent.CountDownLatch;
public class CountDown {
private Integer i = 0;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public void odd(){
while(i < 10){
if(i%2 == 1){
System.out.println("奇数:"+i);
i++;
countDownLatch.countDown();
} else {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void even(){
while(i < 10){
if(i%2 == 0){
System.out.println("偶数:"+i);
i++;
countDownLatch.countDown();
} else {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args){
final CountDown countDown = new CountDown();
Thread t1 = new Thread(new Runnable() {
public void run() {
countDown.odd();
}
},"奇数");
Thread t2 = new Thread(new Runnable() {
public void run() {
countDown.even();
}
},"偶数");
t1.start();
t2.start();
}
}CyclicBarrier方式
CyclicBarrier实现让一组线程等待至某个状态之后再全部同时执行。
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args){
final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName()+":准备...");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"启动完毕:"+new Date().getTime());
}
},"线程1").start();
new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName()+":准备...");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"启动完毕:"+new Date().getTime());
}
},"线程2").start();
new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName()+":准备...");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"启动完毕:"+new Date().getTime());
}
},"线程3").start();
}
}Semaphore方式
Semaphore用于控制对某组资源的访问权限。
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
static class Machine implements Runnable{
private int num;
private Semaphore semaphore;
public Machine(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
public void run() {
try {
semaphore.acquire();//请求机器
System.out.println("工人"+this.num+"请求机器,正在使用机器");
Thread.sleep(1000);
System.out.println("工人"+this.num+"使用完毕,已经释放机器");
semaphore.release();//释放机器
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args){
int worker = 8;//工人数
Semaphore semaphore = new Semaphore(3);//机器数
for (int i=0; i< worker; i++){
new Thread(new Machine(i, semaphore)).start();
}
}
}线程唤醒示例
package com.lly.javabasic.Thread;
/**
* @Classname ThreadCommunication
* @Description 多线程通信 休眠 唤醒
*/
public class ThreadCommunication {
public static void main(String[] args) {
Communication communication = new Communication();
new Thread() {
@Override
public void run() {
while (true){
communication.produce();
}
}
}.start();
new Thread() {
@Override
public void run() {
while (true) {
communication.consumer();
}
}
}.start();
}
}
class Communication {
//是否生产
volatile boolean falg = false;
int i = 0;
/**
* 生产者
*/
public void produce() {
synchronized (this) {
//已生产则进行等待,没有则生产
if (!falg) {
i++;
System.out.println(" 生产 : " + i);
falg= true;
//唤醒 消费者去消费
this.notify();
} else {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者
*/
public void consumer() {
synchronized (this){
// 已生产则进行消费,没有则等待
if(falg){
System.out.println(" 消费 : " + i);
falg= false;
//唤醒生产
this.notify();
}else {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}sleep和wait区别
| wait | sleep | |
|---|---|---|
| 同步 | 只能再同步上下文中调用wait方法,否则或抛出异常 | 不需要再同步方法或同步块中使用 |
| 作用对象 | 定义在Object类中,作用于对象本身 | 定义在java.lang.Thread中,作用于当前线程 |
| 释放锁资源 | 是 | 否 |
| 唤醒条件 | 其他现场调用对象的notify()或notifyAll()方法 | 超时或调用interrupt()方法体 |
| 方法属性 | wait是实例方法 | 静态方法 |
wait和notify区别
wait和notify都是Object中的方法
wait和notify执行前线程都必须获得对象锁
wait的作用是使当前线程进行等待
notify的作用是通知其他等待当前线程的对象锁的线程
多个线程轮流打印数字
package com.lly.javabasic.Thread;
/**
* @Classname TakeTurnsPrint
* @Description 轮流打印数字
*/
public class TakeTurnsPrint {
// 打印的数字
int num = 1;
// 线程数
int max = 4;
// 锁对象
final Object lock = new Object();
class PrintThread extends Thread {
// 线程索引标识
private int index;
public void setLock(int index) {
this.index = index;
// 线程名称
this.setName(""+index);
}
public void run() {
while (num < 100) {
synchronized (lock) {
lock.notify();// 唤醒lock上的其他的线程,唤醒是随机的
// 根据数字取余判断抢到执行的线程是否是正确的打印的线程
if (num % max == index) {
System.out.println(this.getName() + "num = " + num++);
}
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public void startJob() {
for (int i = 0; i < max; i++) {
PrintThread changeThread = new PrintThread();
changeThread.setLock(i);
changeThread.start();
}
}
public static void main(String[] args) {
TakeTurnsPrint changeThreadTest = new TakeTurnsPrint();
changeThreadTest.startJob();
}
}二、java内存模型
Java源代码文件(.java后缀)会被Java编译器编译为字节码文件(.class后缀),
然后由JVM中的类加载器加载各个类的字节码文件,
加载完毕之后,交由JVM执行引擎执行。
Java内存模型指的就是Runtime Data Area(运行时数据区),即程序执行期间用到的数据和相关信息保存区。
JVM 内存共分为虚拟机栈、堆、方法区、程序计数器、本地方法栈五个部分。结构如下图:

程序计数器
l 每个线程对应有一个程序计数器。
l 各线程的程序计数器是线程私有的,互不影响,是线程安全的。
l 程序计数器记录线程正在执行的内存地址,以便被中断线程恢复执行时再次按照中断时的指令地址继续执行
java栈(虚拟机栈 jvm stack)
l 每个线程会对应一个Java栈;
l 每个Java栈由若干栈帧组成;
l 每个方法对应一个栈帧;
l 栈帧在方法运行时,创建并入栈;方法执行完,该栈帧弹出栈帧中的元素作为该方法返回值,该栈帧被清除;
l 栈顶的栈帧叫活动栈,表示当前执行的方法,才可以被CPU执行;
l 线程请求的栈深度大于虚拟机所允许的深度,将抛出StackOverflowError异常;
l 栈扩展时无法申请到足够的内存,就会抛出OutOfMemoryError异常;
方法区
l 方法区是Java堆的永久区(PermanetGeneration)
l 方法区存放了要加载的类的信息(名称、修饰符等)、类中的静态常量、类中定义为final类型的常量、类中的Field信息、类中的方法信息,
l 方法区是被Java线程共享的
l 方法区要使用的内存超过其允许的大小时,会抛出OutOfMemoryError: PremGen space的错误信息。
常量池ConstantPool
l 常量池是方法区的一部分。
l 常量池中存储两类数据:字面量和引用量。
字面量:字符串、final变量等。
引用量:类/接口、方法和字段的名称和描述符,
l 常量池在编译期间就被确定,并保存在已编译的.class文件中
本地方法栈Native Method Stack
l 本地方法栈和Java栈所发挥的作用非常相似,区别不过是Java栈为JVM执行Java方法服务,而本地方法栈为JVM执行Native方法服务。
l 本地方法栈也会抛出StackOverflowError和OutOfMemoryError异常。
Java内存模型工作示意图

首先类加载器将Java代码加载到方法区然后执行引擎从方法区找到main方法为方法创建栈帧放入方法栈,同时创建该栈帧的程序计数器执行引擎请求CPU执行该方法CPU将方法栈数据加载到工作内存(寄存器和高速缓存),执行该方法CPU执行完之后将执行结果从工作内存同步到主内存
线程计算的时候,原始的数据来自内存,在计算过程中,有些数据可能被频繁读取,这些数据被存储在寄存器和高速缓存中,当线程计算完后,这些缓存的数据在适当的时候应该写回内存。
当个多个线程同时读写某个内存数据时,就会产生多线程并发问题,要解决这些问题就涉及到多线程编程三个特性:原子性,有序性,可见性。
多线程特性
多线程编程要保证满足三个特性:原子性、可见性、有序性。
原子性
即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。
可见性
可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。显然,对于单线程来说,可见性问题是不存在的。
有序性
有序性即程序执行的顺序按照代码的先后顺序执行。
多线程控制
ThreadLocal
ThreadLocal提供线程局部变量,即为使用相同变量的每一个线程维护一个该变量的副本。
当某些数据是以线程为作用域并且不同线程具有不同的数据副本的时候,就可以考虑采用ThreadLocal,比如数据库连接Connection,每个请求处理线程都需要,但又不相互影响,就是用ThreadLocal实现。
public class Bank {
ThreadLocal<Integer> t = new ThreadLocal<Integer>(){
@Override
protected Integer initialValue(){
return 0;
}
};
public Integer get(){
return t.get();
}
public void set(){
t.set(t.get()+10);
}
public static void main(String[] args){
Bank bank = new Bank();
Transfer transfer = new Transfer(bank);
Thread t1 = new Thread(transfer);
Thread t2 = new Thread(transfer);
t1.start();
t2.start();
}
}
class Transfer implements Runnable{
Bank bank;
public Transfer(Bank bank){
this.bank = bank;
}
public void run() {
for (int i=0;i<10;i++){
bank.set();
System.out.println(Thread.currentThread()+""+bank.get());
}
}
}l 在ThreadLocal类中定义了一个ThreadLocalMap,
l 每一个Thread都有一个ThreadLocalMap类型的变量threadLocals
l threadLocals内部有一个Entry,Entry的key是ThreadLocal对象实例,value就是共享变量副本
l ThreadLocal的get方法就是根据ThreadLocal对象实例获取共享变量副本
l ThreadLocal的set方法就是根据ThreadLocal对象实例保存共享变量副本
原子类
Java的java.util.concurrent.atomic包里面提供了很多可以进行原子操作的类,分为以下四类:
l 原子更新基本类型:AtomicInteger、AtomicBoolean、AtomicLong
l 原子更新数组:AtomicIntegerArray、AtomicLongArray
l 原子更新引用:AtomicReference、AtomicStampedReference等
l 原子更新属性:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater
原子操作问题示例,使用静态变量时,最终值可能不是2000。
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicClass {
static int n = 0; // static AtomicInteger n;
public static void main(String[] args) throws InterruptedException {
int j = 0;
while(j<100){
n = 0;//n = new AtomicInteger(0);
Thread t1 = new Thread(){
public void run(){
for(int i=0; i<1000; i++){
n++;//n.getAndIncrement();
}
}
};
Thread t2 = new Thread(){
public void run(){
for(int i=0; i<1000; i++){
n++;//n.getAndIncrement();
}
}
};
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("n的最终值是:"+n);
j++;
}
}
}原子类CAS分析
CAS的ABA问题及解决
当前内存的值一开始是A,被另外一个线程先改为B然后再改为A,那么当前线程访问的时候发现是A,则认为它没有被其他线程访问过。在某些场景下这样是存在错误风险的。
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
public class AtomicClass {
static AtomicStampedReference<Integer> n;
public static void main(String[] args) throws InterruptedException {
int j = 0;
while(j<100){
n = new AtomicStampedReference<Integer>(0,0);
Thread t1 = new Thread(){
public void run(){
for(int i=0; i<1000; i++){
int stamp;
Integer reference;
do{
stamp = n.getStamp();
reference = n.getReference();
} while(!n.compareAndSet(reference, reference+1, stamp, stamp+1));
}
}
};
Thread t2 = new Thread(){
public void run(){
for(int i=0; i<1000; i++){
int stamp;
Integer reference;
do{
stamp = n.getStamp();
reference = n.getReference();
} while(!n.compareAndSet(reference, reference+1, stamp, stamp+1));
}
}
};
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("n的最终值是:"+n.getReference());
j++;
}
}
}Lock类
Lock和ReadWriteLock是两大锁的根接口
Lock 接口支持重入、公平等的锁规则:实现类 ReentrantLock、ReadLock和WriteLock。 ReadWriteLock 接口定义读取者共享而写入者独占的锁,实现类:ReentrantReadWriteLock。
Lock接口关系图:

可重入锁
不可重入锁,即线程请求它已经拥有的锁时会阻塞。
可重入锁,即线程可以进入它已经拥有的锁的同步代码块。
public class ReentrantLockTest {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
for (int i = 1; i <= 3; i++) {
lock.lock();
}
for(int i=1;i<=3;i++){
try {
} finally {
lock.unlock();
}
}
}
}读写锁
读写锁,即可以同时读,读的时候不能写;不能同时写,写的时候不能读。
读锁不支持条件变量。重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 读写操作类
*/
public class ReadWriteLockDemo {
private Map<String, Object> map = new HashMap<String, Object>();
//创建一个读写锁实例
private ReadWriteLock rw = new ReentrantReadWriteLock();
//创建一个读锁
private Lock r = rw.readLock();
//创建一个写锁
private Lock w = rw.writeLock();
/**
* 读操作
*
* @param key
* @return
*/
public Object get(String key) {
r.lock();
System.out.println(Thread.currentThread().getName() + "读操作开始执行......");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
return map.get(key);
} finally {
r.unlock();
System.out.println(Thread.currentThread().getName() + "读操作执行完成......");
}
}
/**
* 写操作
*
* @param key
* @param value
*/
public void put(String key, Object value) {
try {
w.lock();
System.out.println(Thread.currentThread().getName() + "写操作开始执行......");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
} finally {
w.unlock();
System.out.println(Thread.currentThread().getName() + "写操作执行完成......");
}
}
public static void main(String[] args) {
final ReadWriteLockDemo d = new ReadWriteLockDemo();
d.put("key1", "value1");
new Thread(new Runnable() {
public void run() {
d.get("key1");
}
}).start();
new Thread(new Runnable() {
public void run() {
d.get("key1");
}
}).start();
new Thread(new Runnable() {
public void run() {
d.get("key1");
}
}).start();
}
}StampedLock
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
// 加解读锁
long stamp = lock.readLock();
lock.unlockRead(stamp);
// 加解写锁
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
// 乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}
// 提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法Volatile关键字
一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:
l 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。(注意:不保证原子性)
l 禁止进行指令重排序。(保证变量所在行的有序性)
当程序执行到volatile变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见;在其后面的操作肯定还没有进行;
在进行指令优化时,不能将在对volatile变量访问的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行。
应用场景:
基于volatile的作用,使用volatile必须满足以下两个条件:
l 对变量的写操作不依赖于当前值
l 该变量没有包含在具有其他变量的不变式中
状态量标记:
volatile boolean flag = false;
while(!flag){
doSomething();
}
public void setFlag() {
flag = true;
}volatile boolean inited = false;
//线程1:
context = loadContext();
inited = true;
//线程2:
while(!inited ){
sleep()
}
doSomethingwithconfig(context);双重校验:
class Singleton{
private volatile static Singleton instance = null;
private Singleton() {
}
public static Singleton getInstance() {
if(instance==null) {
synchronized (Singleton.class) {
if(instance==null)
instance = new Singleton();
}
}
return instance;
}
}三、容器
容器类关系图

Collection 接口的接口 对象的集合 ├ List 子接口 按进入先后有序保存 可重复 │├ LinkedList 接口实现类 链表 插入删除 没有同步 线程不安全 │├ ArrayList 接口实现类 数组 随机访问 没有同步 线程不安全 │└ Vector 接口实现类 数组 同步 线程安全 │ └ Stack └ Set 子接口 不可重复
├ HashSet
│ └ LinkedHashSet └ TreeSet
Map 接口 键值对的集合 ├ Hashtable 接口实现类 同步 线程安全 ├ HashMap 接口实现类 没有同步 线程不安全
│├ LinkedHashMap
│└ WeakHashMap
└ TreeMap
线程安全集合类
线程安全集合类可以分为三大类:
遗留的线程安全集合如 Hashtable , Vector
使用 Collections 装饰的线程安全集合,如:
- Collections.synchronizedCollection
- Collections.synchronizedList
- Collections.synchronizedMap
- Collections.synchronizedSet
- Collections.synchronizedNavigableMap
- Collections.synchronizedNavigableSet
- Collections.synchronizedSortedMap
- Collections.synchronizedSortedSet
java.util.concurrent.*
重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:Blocking、CopyOnWrite、Concurrent
- Blocking 大部分实现基于锁,并提供用来阻塞的方法
- CopyOnWrite 之类容器修改开销相对较重
- Concurrent 类型的容器
- 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
- 弱一致性
- 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
- 求大小弱一致性,size 操作未必是 100% 准确
- 读取弱一致性
HashMap实现分析
HashMap实际上是一个“链表散列”的数据结构,即数组和链表的结合体。
数组:存储区间连续,占用内存严重,寻址容易,插入删除困难;
链表:存储区间离散,占用内存比较宽松,寻址困难,插入删除容易; HashMap综合应用了这两种数据结构,实现了寻址容易,插入删除也容易。
JDK1.8之前并发问题
在hashmap做put操作的时候会调用下面方法
// 新增Entry。将“key-value”插入指定位置,bucketIndex是位置索引。
void addEntry(int hash, K key, V value, int bucketIndex) {
// 保存“bucketIndex”位置的值到“e”中
Entry<K,V> e = table[bucketIndex];
// 设置“bucketIndex”位置的元素为“新Entry”,
// 设置“e”为“新Entry的下一个节点”
table[bucketIndex] = new Entry<K,V>(hash, key, value, e);
// 若HashMap的实际大小 不小于 “阈值”,则调整HashMap的大小
if (size++ >= threshold)
resize(2 * table.length);
}在hashmap做put操作的时候会调用到以上的方法。现在假如A线程和B线程同时对同一个数组位置调用addEntry,两个线程会同时得到现在的头结点,然后A写入新的头结点之后,B也写入新的头结点,那B的写入操作就会覆盖A的写入操作造成A的写入操作丢失
删除键值对会调用以下代码
final Entry<K,V> removeEntryForKey(Object key) {
// 获取哈希值。若key为null,则哈希值为0;否则调用hash()进行计算
int hash = (key == null) ? 0 : hash(key.hashCode());
int i = indexFor(hash, table.length);
Entry<K,V> prev = table[i];
Entry<K,V> e = prev;
// 删除链表中“键为key”的元素
// 本质是“删除单向链表中的节点”
while (e != null) {
Entry<K,V> next = e.next;
Object k;
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k)))) {
modCount++;
size--;
if (prev == e)
table[i] = next;
else
prev.next = next;
e.recordRemoval(this);
return e;
}
prev = e;
e = next;
}
return e;
}当多个线程同时操作同一个数组位置的时候,也都会先取得现在状态下该位置存储的头结点,然后各自去进行计算操作,之后再把结果写会到该数组位置去,其实写回的时候可能其他的线程已经就把这个位置给修改过了,就会覆盖其他线程的修改。
addEntry中当加入新的键值对后键值对总数量超过门限值的时候会调用一个resize操作
// 重新调整HashMap的大小,newCapacity是调整后的容量
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
//如果就容量已经达到了最大值,则不能再扩容,直接返回
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}
// 新建一个HashMap,将“旧HashMap”的全部元素添加到“新HashMap”中,
// 然后,将“新HashMap”赋值给“旧HashMap”。
Entry[] newTable = new Entry[newCapacity];
transfer(newTable);
table = newTable;
threshold = (int)(newCapacity * loadFactor);
}这个操作会新生成一个新的容量的数组,然后对原数组的所有键值对重新进行计算和写入新的数组,之后指向新生成的数组。当多个线程同时检测到总数量超过门限值的时候就会同时调用resize操作,各自生成新的数组并rehash后赋给该map底层的数组table,结果最终只有最后一个线程生成的新数组被赋给table变量,其他线程的均会丢失。而且当某些线程已经完成赋值而其他线程刚开始的时候,就会用已经被赋值的table作为原始数组,这样也会有问题。
JDK1.8并发问题
abstract class HashIterator {
Node<K,V> next; // next entry to return
Node<K,V> current; // current entry
int expectedModCount; // for fast-fail
int index; // current slot
HashIterator() {
expectedModCount = modCount;
Node<K,V>[] t = table;
current = next = null;
index = 0;
if (t != null && size > 0) { // advance to first entry
do {} while (index < t.length && (next = t[index++]) == null);
}
}
public final boolean hasNext() {
return next != null;
}
final Node<K,V> nextNode() {
Node<K,V>[] t;
Node<K,V> e = next;
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
if (e == null)
throw new NoSuchElementException();
if ((next = (current = e).next) == null && (t = table) != null) {
do {} while (index < t.length && (next = t[index++]) == null);
}
return e;
}
public final void remove() {
Node<K,V> p = current;
if (p == null)
throw new IllegalStateException();
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
current = null;
K key = p.key;
removeNode(hash(key), key, null, false, false);
expectedModCount = modCount;
}
}l modCount是hashmap中的成员变量。
l 在调用put(),remove(),clear(),ensureCapacity()这些会修改数据结构的方法中都会使modCount++。
l 在获取迭代器的时候会把modCount赋值给迭代器的expectedModCount变量。此时modCount与expectedModCount肯定相等。
l 在迭代元素的过程中如果hashmap调用自身方法使集合发生变化,那么modCount肯定会变,此时modCount与expectedModCount肯定会不相等。
l 在迭代过程中,只要发现modCount!=expectedModCount,则说明结构发生了变化也就没有必要继续迭代元素了。此时会抛出ConcurrentModificationException,终止迭代操作。
HashMap并发问题解决方案
HashMap并发问题解决方案有如下几种:
l Synchronized关键字
l Lock锁
l 同步类容器
l 并发类容器
同步容器
在Java中,同步容器主要包括2类:
1)Vector、Stack、HashTable(可以独立创建)
2)Collections类中提供的静态工厂方法创建的类(借助工具类创建)
Vector
实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。
Stack
也是一个同步容器,它的方法也用synchronized进行了同步,它实际上是继承于Vector类。
HashTable
实现了Map接口,它和HashMap很相似,但是HashTable进行了同步处理,而HashMap没有。
Collections
Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类
HashTable
Hashtable是线程安全的,因为它的remove,put,get等public方法做成了同步方法,保证了HashTable的线程安全性。
并发容器
同步容器将几乎所有方法添加的synchronized进行同步,这样保证了线程的安全性,但代价就是严重降低了并发性能,当多个线程竞争容器时,吞吐量严重降低。
Java5.0开始针对多线程并发访问重新设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。
并发容器如下:
ConcurrentHashMap
l 对应的非并发容器:HashMap
l 目标:代替Hashtable、synchronizedMap,支持复合操作
l 原理:JDK6中采用一种更加细粒度的加锁机制Segment“分段锁”,JDK8中采用CAS无锁算法。
CopyOnWriteArrayList
l 对应的非并发容器:ArrayList
l 目标:代替Vector、synchronizedList
l 原理:利用高并发往往是读多写少的特性,对读操作不加锁,对写操作,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性,当然写操作的锁是必不可少的了。
CopyOnWriteArraySet
l 对应的费并发容器:HashSet
l 目标:代替synchronizedSet
l 原理:基于CopyOnWriteArrayList实现,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法,其遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。
ConcurrentSkipListMap
l 对应的非并发容器:TreeMap
l 目标:代替synchronizedSortedMap(TreeMap)
l 原理:Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照Key值升序的。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过”空间来换取时间”的一个算法。ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。
ConcurrentSkipListSet
l 对应的非并发容器:TreeSet
l 目标:代替synchronizedSortedSet
l 原理:内部基于ConcurrentSkipListMap实现
ConcurrentLinkedQueue
l 不会阻塞的队列
l 对应的非并发容器:Queue
l 原理:基于链表实现的FIFO队列(LinkedList的并发版本)
LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue
l 对应的非并发容器:BlockingQueue
l 特点:拓展了Queue,增加了可阻塞的插入和获取等操作
l 原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒
l 实现类:
LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
PriorityBlockingQueue:按优先级排序的队列
ConcurrentHashMap数据结构
Java7基于分段的数据结构

java7实现分析
// put方法
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 计算 key 的 hash 值
int hash = hash(key);
// 2. 根据 hash 值找到 Segment 数组中的位置 j
// hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
// 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
// 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,
// ensureSegment(j) 对 segment[j] 进行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}// Segment内部的put方法(对应上方s.put(key, hash, value, false))
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 在往该 segment 写入前,需要先获取该 segment 的独占锁
// 先看主流程,后面还会具体介绍这部分内容
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 这个是 segment 内部的数组
HashEntry<K,V>[] tab = table;
// 再利用 hash 值,求应该放置的数组下标
int index = (tab.length - 1) & hash;
// first 是数组该位置处的链表的表头
HashEntry<K,V> first = entryAt(tab, index);
// 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 覆盖旧值
e.value = value;
++modCount;
}
break;
}
// 继续顺着链表走
e = e.next;
}
else {
// node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
// 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果超过了该 segment 的阈值,这个 segment 需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 扩容后面也会具体分析
else
// 没有达到阈值,将 node 放到数组 tab 的 index 位置,
// 其实就是将新的节点设置成原链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解锁
unlock();
}
return oldValue;
}// scanAndLockForPut方法获取锁:对应上边scanAndLockForPut(key, hash, value);
// tryLock() 成功了,循环终止;
// 重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循环获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
// 进到这里说明数组该位置的链表是空的,没有任何元素
// 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 顺着链表往下走
e = e.next;
}
// 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
// lock() 是阻塞方法,直到获取锁后返回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
// 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}// ensureSegment方法初始化分片中指定位置的元素(槽):使用CAS保证线程安全
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 这里看到为什么之前要初始化 segment[0] 了,
// 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
// 为什么要用“当前”,因为 segment[0] 可能早就扩容过了
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// 初始化 segment[k] 内部的数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // 再次检查一遍该槽是否被其他线程初始化了。
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}Java8基于CAS的数据结构

java8 同步实现分析
// put主流程
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 得到 hash 值
int hash = spread(key.hashCode());
// 用于记录相应链表的长度
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组"空",进行数组初始化
if (tab == null || (n = tab.length) == 0)
// 初始化数组,后面会详细介绍
tab = initTable();
// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组该位置为空,
// 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
// 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
tab = helpTransfer(tab, f);
else { // 到这里就是说,f 是该位置的头结点,而且不为空
V oldVal = null;
// 获取数组该位置的头结点的监视器锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将这个新值放到链表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount != 0 说明上面在做链表操作
if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
if (binCount >= TREEIFY_THRESHOLD)
// 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
// 具体源码我们就不看了,扩容部分后面说
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//
addCount(1L, binCount);
return null;
}// 初始化方法 initTable():该方法通过sizeCtl实现CAS初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 初始化的"功劳"被其他线程"抢去"了
if ((sc = sizeCtl) < 0)
Thread.yield(); //放弃执行权
// CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// DEFAULT_CAPACITY 默认初始容量是 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始化数组,长度为 16 或初始化时提供的长度
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将这个数组赋值给 table,table 是 volatile 的
table = tab = nt;
// 如果 n 为 16 的话,那么这里 sc = 12
// 其实就是 0.75 * n
sc = n - (n >>> 2);
}
} finally {
// 设置 sizeCtl 为 sc,我们就当是 12 吧
sizeCtl = sc;
}
break;
}
}
return tab;
}// 扩容方法 tryPresize:该方法通过sizeCtl实现CAS初始化
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) {
// c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 0.75 * n
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法
// 此时 nextTab 不为 null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)
// 调用 transfer 方法,此时 nextTab 参数为 null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}// 数据迁移方法transfer:该方法通过CAS和synchronized关键字实现同步
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
// stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,
// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果 nextTab 为 null,先进行一次初始化
// 前面我们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
// 之后参与迁移的线程调用此方法时,nextTab 不会为 null
if (nextTab == null) {
try {
// 容量翻倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// nextTable 是 ConcurrentHashMap 中的属性
nextTable = nextTab;
// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
transferIndex = n;
}
int nextn = nextTab.length;
// ForwardingNode 翻译过来就是正在被迁移的 Node
// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
// 所以它其实相当于是一个标志。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
/*
* 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
*
*/
// i 是位置索引,bound 是边界,注意是从后往前
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 下面这个 while 真的是不好理解
// advance 为 true 表示可以进行下一个位置的迁移了
// 简单理解结局:i 指向了 transferIndex,bound 指向了 transferIndex-stride
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// 将 transferIndex 值赋给 nextIndex
// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 所有的迁移操作已经完成
nextTable = null;
// 将新的 nextTab 赋值给 table 属性,完成迁移
table = nextTab;
// 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 任务结束,方法退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 头结点的 hash 大于 0,说明是链表的 Node 节点
if (fh >= 0) {
// 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,
// 需要将链表一分为二,
// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
// lastRun 之前的节点需要进行克隆,然后分到两个链表中
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其中的一个链表放在新数组的位置 i
setTabAt(nextTab, i, ln);
// 另一个链表放在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
else if (f instanceof TreeBin) {
// 红黑树的迁移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果一分为二后,节点数少于 8,那么将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 将 ln 放置在新数组的位置 i
setTabAt(nextTab, i, ln);
// 将 hn 放置在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
}
}
}
}
}四、线程池
多线程的缺点:
处理任务的线程创建和销毁都非常耗时并消耗资源。
多线程之间的切换也会非常耗时并消耗资源。
解决方法:采用线程池
使用时线程已存在,消除了线程创建的时耗
通过设置线程数目,防止资源不足
构造函数
在Java中创建线程池常用的类是ThreadPoolExecutor,该类的全参构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {参数介绍:
l corePoolSize:线程池中核心线程数的最大值
核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定,例如:执行一个任务需要0.1秒,系统百分之80的时间每秒都会产生100个任务,那么要想在1秒内处理完这100个任务,就需要10个线程,此时我们就可以设计核心线程数为10;当然实际情况不可能这么平均,所以我们一般按照8020原则设计即可,既按照百分之80的情况设计核心线程数,剩下的百分之20可以利用最大线程数处理;
l maximumPoolSize:线程池中能拥有最多线程数
最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定:例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么,最大线程数=(最大任务数-任务队列长度)*单个任务执行时间;既: 最大线程数=(1000-200)*0.1=80个;
l workQueue:
任务队列长度一般设计为:核心线程数/单个任务执行时间*2即可;例如上面的场景中,核心线程数设计为10,单个任务执行时间为0.1秒,则队列长度可以设计为200;
用于缓存任务的阻塞队列,对于不同的应用场景我们可能会采取不同的排队策略,这就需要不同类型的阻塞队列,在线程池中常用的阻塞队列有以下2种:
Ø SynchronousQueue<Runnable>:此队列中不缓存任何一个任务。向线程池提交任务时,如果没有空闲线程来运行任务,则入列操作会阻塞。当有线程来获取任务时,出列操作会唤醒执行入列操作的线程。从这个特性来看,SynchronousQueue是一个无界队列,因此当使用SynchronousQueue作为线程池的阻塞队列时,参数maximumPoolSizes没有任何作用。
Ø LinkedBlockingQueue<Runnable>:顾名思义是用链表实现的队列,可以是有界的,也可以是无界的,但在Executors中默认使用无界的。
以上三个参数之间的关系如下:
如果没有空闲的线程执行该任务且当前运行的线程数少于corePoolSize,则添加新的线程执行该任务。如果没有空闲的线程执行该任务且当前的线程数等于corePoolSize同时阻塞队列未满,则将任务入队列,而不添加新的线程。如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数小于maximumPoolSize,则创建新的线程执行任务。如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize,则根据构造函数中的handler指定的策略来拒绝新的任务。
l keepAliveTime:表示空闲线程的存活时间。
这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可;
l unit:表示keepAliveTime的单位。
l handler:表示当workQueue已满,且池中的线程数达到maximumPoolSize时,线程池拒绝添加新任务时采取的策略。一般可以采取以下四种取值。
| ThreadPoolExecutor.AbortPolicy() | 抛出RejectedExecutionException异常 |
|---|---|
| ThreadPoolExecutor.CallerRunsPolicy() | 由向线程池提交任务的线程来执行该任务 |
| ThreadPoolExecutor.DiscardOldestPolicy() | 抛弃最旧的任务(最先提交而没有得到执行的任务) |
| ThreadPoolExecutor.DiscardPolicy() | 抛弃当前的任务 |
l threadFactory:指定创建线程的工厂
常用的线程池
Eexcutors类提供了四个创建线程池的方法,分别如下
l newCachedThreadPool
l newFixedThreadPool
l newSingleThreadExecutor
l newScheduleThreadPool
newCachedThreadPool
该方法创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
此类型线程池特点是:
工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE)空闲的工作线程会自动销毁,有新任务会重新创建在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}newFixedThreadPool
该方法创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
优点:具有线程池提高程序效率和节省创建线程时所耗的开销。
缺点:在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}newSingleThreadExecutor
该方法创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO,优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。
单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}newScheduleThreadPool
该方法创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 10; i++) {
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}常用方法
// 延迟时间单位是unit,数量是delay的时间后执行callable。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
// 延迟时间单位是unit,数量是delay的时间后执行command。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 延迟时间单位是unit,数量是initialDelay的时间后,每间隔period时间重复执行一次command。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)Fork/Join
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)
创建多少线程池合适
过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存
CPU 密集型运算
通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式4 * 100% * 100% / 50% = 8 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式4 * 100% * 100% / 10% = 40
自定义线程池
package cn.itcast.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增 worker{}, {}", worker, task);
workers.add(worker);
worker.start();
} else {
taskQueue.tryPut(task, rejectPolicy);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {
while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker 被移除{}", this);
workers.remove(this);
}
}
}
}
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
// 1. 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 带超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转换为 纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回值是剩余时间
if (nanos <= 0) {
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if(nanos <= 0) {
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(T task, RejectPolicy rejectPolicy) {
lock.lock();
try {
if(queue.size() == capcity) {
rejectPolicy.reject(this, task);
} else {
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}测试
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,
1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
// 1. 死等
// queue.put(task);
// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
// log.debug("放弃{}", task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
task.run();
});
for (int i = 0; i < 4; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j);
});
}
}五、秒杀案例
业务流程
1),从redis服务器中获取入库的秒杀商品
2),判断商品是否存在,或是是商品库存是否小于等于0
3),如果秒杀商品存在,创建秒杀订单
4),把新增订单存储在redis服务器中
5),把存储在redis中入库的商品库存减一
6),判断库存是否小于0,卖完需要同步数据库
7),否则同步redis购物车数量
超卖问题
秒杀一般是具备大量并发,并发时就有可能出现超卖问题。可使用Redis队列实现。
通过定时任务或消息通知将秒杀物品加入到redis队列中,秒杀是从队列中获取商品,如果能获取到,则正常下单,如果没有,则表明已售空。
并发问题解决
通过redis可以解决拆卖问题,但需要很长等待时间。因此可以通过异步和多线程来实现。
用户一下单的时候确认用户是否符合下单条件,如果符合,则开启线程执行,执行完毕之后,用户等待查询结果即可。

代码示例
@Resource
private RedisTemplate redisTemplate;
@Resource
private ThreadPoolTaskExecutor executor;
@Resource
private CreateOrderThread createOrderThread;
@Override
public void saveOrder(Long id, String userid) {
//1.判断用户是否在排队队列
Boolean isMember = redisTemplate.boundSetOps(SysConsts.SECKILL_USER+id).isMember(userid);
if(isMember){
TbSeckillOrder seckillOrder = (TbSeckillOrder) redisTemplate.boundHashOps(TbSeckillOrder.class.getSimpleName()).get(userid);
//1.1在排队,判断用户是否在订单队列中
if(null != seckillOrder){
//1.1.1在订单队列,“您已抢购成功,请支付订单!”异常
return new Result(false, "您已抢购成功,请支付订单!");
}
//1.1.2不在订单队列,“您正在排队...”
return new Result(false, "您正在排队,请耐心等待。。。");
}
//2.判断商品是否售罄
Long goodsId = (Long) redisTemplate.boundListOps(SysConsts.SECKILL_PREFIX+id).rightPop();
if(null == goodsId ){
//2.售罄
return new Result(false, "对不起,商品已售罄,请查看其他商品!");
}
redisTemplate.boundSetOps(SysConsts.SECKILL_USER+id).add(userid);
redisTemplate.boundListOps(OrderRecord.class.getSimpleName()).leftPush(new OrderRecord(userid, id));
executor.execute(createOrderThread);
return new Result(true, "秒杀成功,请您尽快支付!");
}@Component
public class CreateOrderThread implements Runnable {
@Resource
private RedisTemplate redisTemplate;
@Resource
private IdWorker idWorker;
@Resource
private TbSeckillGoodsMapper seckillGoodsMapper;
@Override
public void run() {
OrderRecord orderRecord = (OrderRecord) redisTemplate.boundListOps(OrderRecord.class.getSimpleName()).rightPop();
if(null != orderRecord){
Long id = orderRecord.getSeckillid();
String userid = orderRecord.getUserid();
TbSeckillGoods seckillGoods = (TbSeckillGoods) redisTemplate.boundHashOps(TbSeckillGoods.class.getSimpleName()).get(id);
//3.未售罄,创建订单,以用户id为key存入redis
TbSeckillOrder seckillOrder = new TbSeckillOrder();
seckillOrder.setId(idWorker.nextId());
seckillOrder.setSeckillId(id);
seckillOrder.setMoney(seckillGoods.getCostPrice()); //秒杀价格
seckillOrder.setUserId(userid);
seckillOrder.setSellerId(seckillGoods.getSellerId());
seckillOrder.setCreateTime(new Date());
seckillOrder.setStatus("0");
redisTemplate.boundHashOps(TbSeckillOrder.class.getSimpleName()).put(userid, seckillOrder);
synchronized (CreateOrderThread.class){
seckillGoods = (TbSeckillGoods) redisTemplate.boundHashOps(TbSeckillGoods.class.getSimpleName()).get(id);
//4.更新库存,判断库存是否售罄
seckillGoods.setStockCount(seckillGoods.getStockCount() - 1);
if(seckillGoods.getStockCount() <= 0){
//5.售罄,同步秒杀商品数据库(seckillGoods),将秒杀商品从redis中删除
seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
redisTemplate.boundHashOps(TbSeckillGoods.class.getSimpleName()).delete(seckillGoods.getId());
} else {
//6.未售罄,更新redis中秒杀商品库存
redisTemplate.boundHashOps(TbSeckillGoods.class.getSimpleName()).put(seckillGoods.getId(), seckillGoods);
}
}
}
}
}