JUC高并发编程-共享模型之管程

共享模型之管程

共享模型之管程

共享带来的问题

Java的体现

两个线程对初始值为0的静态变量一个做自增,一个自减,各做5000次,结果是0 吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int counter = 0;

public static void main(String[] args){
Thread t1 = new Thread(()->{
for(int i = 0;i < 5000;i++){
counter++;
}
},"t1");

Thread t2 = new Thread(()->{
for(int i = 0;i < 5000;i++){
counter--;
}
},"t2");

t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",counter);
}
问题分析

以上结果可能是正数、负数、零。因为java中对静态变量的自增,自减并不是原子操作,要彻底理解,必须从字节码来进行分析。

例如对于i++而言(i为静态变量),实际会产生如下的JVM字节码指令:

1
2
3
4
getstatic	i	//准备静态变量i的值
iconst_1 //准备常量1
iadd //自增
putstatic i //将修改后的值存入静态变量i

i–也类似

1
2
3
4
getstatic	i	//准备静态变量i的值
iconst_1 //准备常量1
isub //自减
putstatic i //将修改后的值存入静态变量i

java的内存模型如下,完成静态变量的自增,自减需要在主存和工作内存中进行数据交换:

img52

如果是单线程以上8行代码是顺序执行(不会交错)没有问题:

img53

但是多线程下这8行代码可能交错运行:

出现负数的情况:

img54

出现正数的情况:

img55

临界区(Critical Section)
  • 一个程序运行多个线程本身是没有问题的
  • 问题出在多个线程访问共享资源
    • 多个线程共享资源其实也没有问题
    • 在多个线程对共享资源读写操作时发生指令交错,就会出现问题
  • 一段代码块如果存在对共享资源的多线程读写操作,称这段代码块为临界区
1
2
3
4
5
6
7
8
9
10
11
12
13
static int counter = 0;

static void increment()
//临界区
{
counter++;
}

static void decrement()
//临界区
{
counter--;
}
竞态条件(Race Condition)

多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件。

synchronized解决方法

应用之互斥

为了避免临界区的竞态条件发生,有多种手段可以达到目的。

  • 阻塞式的解决方案:synchronized,Lock
  • 非阻塞式的解决方案:原子变量

使用synchronized来解决上述问题,即俗称的【对象锁】,它采用互斥的方式让同一时刻至多只有一个线程能持有【对象锁】,其它线程再想获取这个【对象锁】时就会阻塞住。这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。

注意:虽然java中互斥和同步都可以采用synchronized关键字来完成,但它们还是有区别的:

  • 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
  • 同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点

synchronized语法

1
2
3
4
synchronized(对象)
{
临界区
}

解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int counter = 0;
static final Object room = new Object();

public static void main(String[] args) throws InterruptedException{
Thread t1 = new Thread(()->{
for(int i = 0;i < 5000;i++){
synchronized(room){
counter++;
}
}
},"t1");

Thread t2 = new Thread(()->{
for(int i = 0;i < 5000;i++){
synchronized(room){
counter--;
}
}
},"t2");

t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",counter);
}

理解:

  • synchronized中的对象,可以想象为一个房间(room),有唯一入口(门)房间只能一次进入一人进行计算,线程t1,t2想象成两个人。
  • 当线程t1执行到synchronized(room)时就好比t1进入了这个房间,并锁住了门拿走了钥匙,在门内执行count++代码。
  • 这时候如果t2也运行到了synchronized(room)时,它发现门被锁住了,只能在门外等待,发生了上下文切换,阻塞住了。
  • 这中间即使t1的cpu时间片不幸用完,被踢出了门外(不要错误理解为锁住了对象就能一直执行下去),这时门还是锁住的,t1仍拿着钥匙,t2线程还在阻塞状态进不来,只有下次轮到t1自己再次获得时间片时才能开门进入。
  • 当t1执行完synchronized块内的代码,这时候才会从obj房间出来并解开门上的锁,唤醒t2线程把钥匙给他。t2线程这时才可以进入obj房间,锁住了门拿上钥匙,执行它的count–代码。

synchronized实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。

面向对象改进

把需要保护的共享变量放入一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static void main(String[] args) throws InterruptedException{
Room room = new Room();
Thread t1 = new Thread(()->{
for(int i = 0;i < 5000;i++){
room.increment();
}
},"t1");

Thread t2 = new Thread(()->{
for(int i = 0;i < 5000;i++){
room.decrement();
}
},"t2");

t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",room.getValue());
}

class Room{
int value = 0;

public void increment(){
synchronized(this){
value++;
}
}
public void decrement(){
synchronized(this){
value--;
}
}
public int getValue(){
synchronized(this){
return value;
}
}
}
方法上的synchronized
1
2
3
4
5
6
7
8
9
10
11
12
13
class Test{
public synchronized void test(){

}
}
//等价于
class Test{
public void test(){
synchronized(this){

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
class Test{
public synchronized static void test(){

}
}
//等价于
class Test{
public static void test(){
synchronized(Test.class){

}
}
}
不加synchronized的方法

不加synchronized的方法就好比不遵守规则的人,不去老实排队。

所谓的“线程八锁”

其实就是考察synchronized锁住的是哪个对象

情况1:12或21

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Number{
public synchronized void a(){
log.debug("1");
}
public synchronized void b(){
log.debug("2");
}
}

public static void main(String[] args){
Number n1 = new Number();
new Thread(()->{
n1.a();
}).start();
new Thread(()->{
n1.b();
}).start();
}

情况2:一秒后,12或者2,一秒后1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Number{
public synchronized void a(){
sleep(1);
log.debug("1");
}
public synchronized void b(){
log.debug("2");
}
}

public static void main(String[] args){
Number n1 = new Number();
new Thread(()->{
n1.a();
}).start();
new Thread(()->{
n1.b();
}).start();
}

情况3://3 1秒后 12 或者 //23 1秒后 1或者//32 1秒后1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Number{
public synchronized void a(){
sleep(1);
log.debug("1");
}
public synchronized void b(){
log.debug("2");
}
public void c(){
log.debug("3");
}
}

public static void main(String[] args){
Number n1 = new Number();
new Thread(()->{
n1.a();
}).start();
new Thread(()->{
n1.b();
}).start();
new Thread(()->{
n1.c();
}).start();
}

情况4:2 -(1秒后)-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Number{
public synchronized void a(){
sleep(1);
log.debug("1");
}
public synchronized void b(){
log.debug("2");
}
}
public static void main(String[] args){
Number n1 = new Number();
new Thread(()->{
n1.a();
}).start();
new Thread(()->{
n1.b();
}).start();
}

情况5:2 -(1秒后)-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Number{
public static synchronized void a(){
sleep(1);
log.debug("1");
}
public synchronized void b(){
log.debug("2");
}
}
public static void main(String[] args){
Number n1 = new Number();
new Thread(()->{
n1.a();
}).start();
new Thread(()->{
n1.b();
}).start();
}

情况6:1s后-12,或者2-1秒后-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Number{
public static synchronized void a(){
sleep(1);
log.debug("1");
}
public static synchronized void b(){
log.debug("2");
}
}
public static void main(String[] args){
Number n1 = new Number();
new Thread(()->{
n1.a();
}).start();
new Thread(()->{
n1.b();
}).start();
}

情况7:2 -(1秒后)-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Number{
public static synchronized void a(){
sleep(1);
log.debug("1");
}
public synchronized void b(){
log.debug("2");
}
}
public static void main(String[] args){
Number n1 = new Number();
Number n2 = new Number();
new Thread(()->{
n1.a();
}).start();
new Thread(()->{
n2.b();
}).start();
}

情况8:1s后-12,或者2-1秒后-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Number{
public static synchronized void a(){
sleep(1);
log.debug("1");
}
public static synchronized void b(){
log.debug("2");
}
}
public static void main(String[] args){
Number n1 = new Number();
Number n2 = new Number();
new Thread(()->{
n1.a();
}).start();
new Thread(()->{
n2.b();
}).start();
}

变量的线程安全分析

成员变量和静态变量是否线程安全?
  • 如果它们没有共享,则线程安全
  • 如果它们被共享了,根据它们的状态是否能够改变,又分为两种情况
    • 如果只有读操作,则线程安全
    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全
局部变量是否线程安全?
  • 局部变量是线程安全的
  • 但局部变量引用的对象则未必
    • 如果该对象没有逃离方法的作用访问,它是线程安全的
    • 如果该对象逃离方法的作用范围,需要考虑线程安全
局部变量线程安全分析
1
2
3
4
public static void test1(){
int i = 10;
i++;
}

每个线程调用test1()方法时局部变量i,会在每个线程的栈帧内存中被创建多份,因此不存在共享。

img56

img57

局部变量的引用稍有不同

成员变量的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class ThreadUnsafe{
ArrayList<String> list = new ArrayList<>();
public void method1(int loopNumber){
for(int i = 0;i < loopNumber;i++){
//{临界区,会产生竞态条件
method2();
method3();
//}临界区
}
}

private void method2(){
list.add("1");
}

private void method3(){
list.remove(0);
}
}

public class Test{
static final int THREAD_NUMBER = 2;
static final int LOOP_NUMBER = 200;
public static void main(String[] args){
ThreadUnsafe test = new ThreadUnsafe();
for(int i = 0;i < THREAD_NUMBER;i++){
new Thread(()->{
test.method1(LOOP_NUMBER);
},"Thread"+(i+1)).start();
}
}
}

其中一种情况是,如果线程2还未add,线程 1 remove就会报错:java.lang.IndexOutOfBoundsException

分析:

  • 无论哪个线程中的method2引用的都是同一个对象中的成员变量
  • method3与method2分析相同

img58

将list修改为局部变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class ThreadSafe{
public void method1(int loopNumber){
ArrayList<String> list = new ArrayList<>();
for(int i = 0;i < loopNumber;i++){
//{临界区,会产生竞态条件
method2(list);
method3(list);
//}临界区
}
}

private void method2(){
list.add("1");
}

private void method3(){
list.remove(0);
}
}

public class Test{
static final int THREAD_NUMBER = 2;
static final int LOOP_NUMBER = 200;
public static void main(String[] args){
ThreadUnsafe test = new ThreadUnsafe();
for(int i = 0;i < THREAD_NUMBER;i++){
new Thread(()->{
test.method1(LOOP_NUMBER);
},"Thread"+(i+1)).start();
}
}
}

修改为局部变量就不会出现上述问题

分析:

  • list是局部变量,每个线程调用时会创建其不同实例,没有共享
  • 而method2的参数是从method1中传过来的,与method1中引用同一个对象
  • method3的参数分析与method2相同

img59

方法访问修饰符带来的思考,如果把method2和method3的方法修改为public会不会带来线程安全问题?

  • 情况1:有其它线程调用method2和method3
  • 情况2:在情况1的基础上,为ThreadSafe类添加子类,子类覆盖method2或method3方法,即
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ThreadSafe{
public final void method1(int loopNumber){
ArrayList<String> list = new ArrayList<>();
for(int i = 0;i < loopNumber;i++){
//{临界区,会产生竞态条件
method2(list);
method3(list);
//}临界区
}
}

private void method2(ArrayList<String> list){
list.add("1");
}

private void method3(ArrayList<String> list){
list.remove(0);
}
}

class ThreadSafeSubClass extends ThreadSafe{
public void method3(ArrayList<String> list){
new Thread(()->{
list.remove(0);
}).start();
}
}
常见线程安全类
  • String
  • Integer
  • StringBuffer
  • Random
  • Vector
  • Hashtable
  • java.util.concurrent包下的类

这里说它们是线程安全指的是,多个线程调用它们同一个实例的某个方法时,是线程安全的,也可以理解为:

1
2
3
4
5
6
7
8
Hashtable table = new Hashtable();

new Thread(()->{
table.put("key","value1");
}).start();
new Thread(()->{
table.put("key","value2");
}).start();
  • 它们的每个方法是原子的
  • 但注意它们多个方法的组合不是原子的。
线程安全类方法的组合

分析下面代码是否安全?

1
2
3
4
5
Hashtable table = new Hashtable();
//线程1,线程2
if(table.get("key") == null){
table.put("key",value);
}

img60

不可变类线程安全性

String、Integer等都是不可变类,因为其内部的状态不可以改变,因此它们的方法是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
public class Immutable{
private int value = 0;

public Immutable(int value){
this.value = value;
}

public int getValue(){
return this.value;
}
}
实例分析

例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
publc class MyServlet extends HttpServlet{
//是否线程安全?
Map<String,Object> map = new HashMap<>();
//是否线程安全?
String s1 = "...";
//是否线程安全?
final String s2 = "...";
//是否线程安全?
Date d1 = new Date();
//是否线程安全?
final Date d2 = new Date();

public void doGet(HttpServletRequest request,HttpServletResponse response){
//使用上述变量
}
}

例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyServlet extends HttpServlet{
//是否线程安全
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request,HttpServletResponse response){
userService.update(...);
}
}

public class UserServiceImpl implements UserService{
//记录调用次数
private int count = 0;

public void update(){
//...
count++;
}
}

例3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Aspect
@Component
public class MyAspect{
//是否线程安全
private long start = 0L;

@Before("execution(* *(..))")
public void before(){
start = System.nanoTime();
}

@After("execution(* *(..))")
public void after(){
long end = System.nanoTime();
System.out.println("cost time:" + (end-start));
}
}

例4:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyServlet extends HttpServlet{
//是否线程安全
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request,HttpServletResponse response){
userService.update(...);
}
}

public class UserServiceImpl implements UserService{
//是否安全
private UserDao userDao = new UserDaoImpl();

public void update(){
//...
userDao.update();
}
}

public class UserDaoImpl implements UserDao{
publci void update(){
String sql = "update user set password = ? where username = ?";
//是否安全
try(Connection conn = DriverManager.getConnection("","","")){
//...
}catch(Exception e){
//...
}
}
}

例5:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MyServlet extends HttpServlet{
//是否线程安全
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request,HttpServletResponse response){
userService.update(...);
}
}

public class UserServiceImpl implements UserService{
//是否安全
private UserDao userDao = new UserDaoImpl();

public void update(){
//...
userDao.update();
}
}

public class UserDaoImpl implements UserDao{
//是否线程安全
private Connection conn = null;
publci void update()throws SQLException{
String sql = "update user set password = ? where username = ?";
conn = DriverManager.getConnection("","","");
//...
conn.close();
}
}

例6:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MyServlet extends HttpServlet{
//是否线程安全
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request,HttpServletResponse response){
userService.update(...);
}
}

public class UserServiceImpl implements UserService{

public void update(){
//...
UserDao userDao = new UserDaoImpl();
userDao.update();
}
}

public class UserDaoImpl implements UserDao{
//是否线程安全
private Connection conn = null;
publci void update()throws SQLException{
String sql = "update user set password = ? where username = ?";
conn = DriverManager.getConnection("","","");
//...
conn.close();
}
}

例7:

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class Test{
public void bar(){
SimpleDateFormate sdf = new SimpleDateFormate("yyyy-MM-dd HH:mm:ss");
foo(sdf);
}

public abstract foo(SimpleDateFormate sdf);

public static void main(String[] args){
new Test().bar();
}
}

Monitor概念

Java对象头

以32位虚拟机为例

普通对象

img61

数组对象

img62

其中Mark World结构为

img64

Monitor(锁)

Monitor被翻译为监视器或管程

每个java对象都可以关联一个Monitor对象,如果使用synchronized给对象锁(重量级)之后,该对象头的Mark Word中就被设置指向Monitor对象的指针。

Monitor结构如下:

img63

  • 刚开始Monitor中Owner为null
  • 当Thread-2执行synchronized(obj)就会将Monitor的所有者Owner置为Thread-2,Monitor中只能有一个Owner。
  • 在Thread-2上锁过程中,如果Thread-3,Thread-4,Thread-5也来执行synchronized(obj),就会进入EntryList BLOCKED.
  • Thread-2执行完同步代码块的内容,然后唤醒EntryList中等待的线程来竞争锁,竞争时是非公平的。
  • 图中WaitSet中的Thread-0,Thread-1是之前获得过锁,但条件不满足进入WAITING状态的线程,后面讲wait-notify分析

注意:

  • synchronized必须是进入同一对象的monitor才有上述的效果
  • 不加synchronized的对象不会关联监听器,不遵从以上规则。
synchronized原理进阶
轻量级锁

轻量级锁的使用场景:如果一个对象虽然有多线程访问,但多线程访问的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。

轻量级锁对使用者是透明的,即语法仍是synchronized

假设有两个方法同步块,利用同一对象加锁

1
2
3
4
5
6
7
8
9
10
11
12
static final Object obj = new Object();
public static void method1(){
synchronized(obj){
//同步块A
method2();
}
}
public static void method2(){
synchronized(obj){
//同步块B
}
}
  • 创建锁记录对象,每个线程的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的Mark Word。

img65

  • 让锁记录中Object Reference指向锁对象,并尝试用cas替换Object的Mark Word,将Mark Word的值存入锁记录。

img66

  • 如果cas替换成功,对象头中存储了锁记录地址和状态00,表示由该线程给对象加锁,这时图示如下:

img67

  • 如果cas失败,有两种情况
    • 如果其它线程已经持有了该Object的轻量级锁,这时表明有竞争,进入锁膨胀过程
    • 如果是自己执行了synchronized锁重入,那么再添加一条Lock Record作为重入的计数

img68

  • 当退出synchronized代码块(解锁时)如果有取值为null的锁新纪录,表示有重入,这时重置锁记录,表示重入计数减一。

img69

  • 当退出synchronized代码块(解锁时)锁记录的值不为null,这时使用cas将Mark Word的值恢复给对象头
    • 成功,则解锁成功
    • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
锁膨胀

如果在尝试加轻量级锁的过程中,CAS操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。

1
2
3
4
5
6
static Object obj = new Object();
public static void method1(){
synchronized(obj){
//同步块
}
}
  • 当Thread-1进行轻量级加锁时,Thread-0已经对该对象加了轻量级锁

img70

  • 这时Thread-1加轻量级锁失败,进入锁膨胀流程

    • 即为Object对象申请Monitor锁,让Object指向重量级锁地址
    • 然后自己进入Monitor的EntryList Blocked

    img71

  • 当Thread-0退出同步块解锁时,使用cas将Mark Word的值恢复给对象头,失败。这时会进入重量级解锁流程,即按照Monitor地址找到Monitor对象,设置Owner为null,唤醒EntryList中Blocked线程。

自旋优化

重量级锁竞争时,还可以使用自旋来进行优化,如果当前线程自旋成功(即这个时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。

自旋重试成功的情况:

img72

自旋重试失败的情况:

img73

  • 在java6之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋。
  • 自旋会占用CPU时间,单核CPU自旋就是浪费时间,多核CPU自旋才能发挥优势。
  • Java7之后不能控制是否开启自旋功能。
偏向锁

轻量级锁在没有竞争时(就自己这个进程),每次重入仍然需要执行CAS操作。

Java6中引入了偏向锁来做进一步优化:只有第一次使用CAS将线程ID设置到对象的Mark Word头,之后发现这个线程ID是自己的就表示没有竞争,不用重新CAS。以后只要不发生竞争,这个对象就归该线程所有。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final Object obj = new Object();
public static void m1(){
synchronized(obj){
//同步块A
m2();
}
}
public static void m2(){
synchronized(obj){
//同步块B
m3();
}
}
public static void m3(){
synchronized(obj){
//同步块C
}
}

偏向状态

img74

一个对象创建时:

  • 如果开启了偏向锁(默认开启),那么对象创建后,markword值为0x05即最后3位为101,这时它的thread,epoch,age都为0。
  • 偏向锁是默认延迟的,不会再程序启动时立即生效,如果想避免延迟,可以加VM参数:-XX:BaisedLockingStarupDelay=0来禁用延迟。
  • 如果没有开启偏向锁,那么对象创建后,markword值为0x01即最后3位为001,这时hashcode、age都为0,第一次用到hashcode时才会赋值。

wait/notify

原理之wait/notify

img75

  • Owner线程发现条件不足,调用wait方法,即可进入WaitSet变为WAITING状态
  • BLOCKED和WAITING的线程都处于阻塞状态,不占用CPU时间片
  • BLOCKED线程会在Owner线程释放锁时唤醒
  • WAITING线程会在Owner线程调用notify或notifyAll时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入EntryList重新竞争
API介绍
  • obj.wait()让进入object监视器的线程到waitSet等待
  • obj.notify()在object上正在waitSet等待的线程中挑一个唤醒
  • obj.notifyAll()让object上正在waitSet等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于Object对象的方法。必须获得此对象的锁,才能调用这几个方法。

1
2
3
4
5
6
7
8
9
10
11
12
public class Test{
static final Object lock = new Object();
public static void main(String[] args){
synchronized(lock){
try{
lock.wait();
}catch(InterruptedException){
e.printStackTrace();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static final Object obj = new Object();
public static void main(String[] args){
new Thread(()->{
synchronized(obj){
log.debug("执行.....");
try{
obj.wait();//让线程在obj上一直等待下去
}catch(InterruptedException e){
e.prinStackTrace();
}
log.debug("其它代码....");
}
},"t1").start();

new Thread(()->{
synchronized(obj){
log.debug("执行.....");
try{
obj.wait();//让线程在obj上一直等待下去
}catch(InterruptedException e){
e.prinStackTrace();
}
log.debug("其它代码....");
}
},"t2").start();

//主线程2秒后执行
sleep(2);
log.debug("唤醒obj上其它线程");
synchronized(obj){
obj.notify();//唤醒obj上一个线程
obj.notifyAll();//唤醒obj上所有等待线程
}
}
wait/notify的正确姿势
sleep(long n)和wait(long n)的区别
  • sleep是Thread方法,而wait是Object方法
  • sleep不需要强制和synchronized配合使用,但wait需要和synchronized一起用
  • sleep在睡眠的同时,不会释放对象锁的,但wait在等待的时候会释放对象锁
  • 它们状态TIMED_WAITING
step1
1
2
3
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeOut = false;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
new Thread(()->{
synchronized(room){
log.debug("有烟没?[{}]",hasCigarette);
if(!hasCigarette){
log.debug("没烟,先歇会!");
sleep(2);
}
log.debug("有烟没?[{}]",hasCigarette);
if(hasCigarette){
log.debug("可以开始干活了!");
}
}
},"小南").start();

for(int i = 0;i < 5;i++){
new Thread(()->{
synchronized(room){
log.debug("可以开始干活了!");
}
},"其它人").start();
}

sleep(1);
new Thread(()->{
hasCigarette = true;
log.debug("烟到了哦!");
},"送烟的").start();

缺点:

  • 其它干活的线程,都要一直阻塞,效率太低
  • 小南线程必须睡足2s后才能醒来,就算烟提前送到,也无法立刻醒来
  • 加了synchronized(room)后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main没加synchronized就好像main线程里翻窗户进来的
  • 解决方法,使用wait-notify机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
new Thread(()->{
synchronized(room){
log.debug("有烟没?[{}]",hasCigarette);
if(!hasCigarette){
log.debug("没烟,先歇会!");
//sleep(2);
try{
room.wait();
}catch(InterruptedException e){
e.prinStackTrace();
}
}
log.debug("有烟没?[{}]",hasCigarette);
if(hasCigarette){
log.debug("可以开始干活了!");
}
}
},"小南").start();

for(int i = 0;i < 5;i++){
new Thread(()->{
synchronized(room){
log.debug("可以开始干活了!");
}
},"其它人").start();
}

sleep(1);
new Thread(()->{
hasCigarette = true;
log.debug("烟到了哦!");
room.notify();
},"送烟的").start();
1
2
3
4
5
6
7
8
9
10
synchronized(lock){
while(条件不成立){
lock.wait();
}
//干活
}
//另一个线程
synchronized(lock){
lock.notifyAll();
}

同步模式之保护性暂停

即Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列
  • JDK中,join的实现,Future的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归纳到同步模式

img76

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Test{
//线程1等待线程2的下载结果
public static void main(String[] args){
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
//等待结果
log.debug("等待结果");
List<String> list = (List<String>)guardedObject.get();
},"t1").start();

new Thread(()->{
log.debug("执行下载");
try{
List<String>list = DownLoader.download();
guardedObject.complete(list);
log.debug("结果大小:{}",list.size());
}catch(InterruptedException e){
e.printStackTrace();
}
},"t2").start();
}
}

class GuardedObject{
//结果
private Object response;

//获取结果
public Object get(){
synchronized(this){
while(response == null){
try{
this.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
return response;
}
}
//产生结果
public void complete(){
synchronized(this){
this.response = response;
this.notifyAll();
}
}
}
join原理

调用者轮询检查线程 alive 状态。

1
2
3
4
5
6
7
8
t1.join();
//等价于下面的代码
synchronized (t1) {
// 调用者线程进入 t1 的 waitSet 等待, 直到 t1 运行结束
while (t1.isAlive()) {
t1.wait(0);
}
}

异步模式之生产者/消费者

要点

  • 与前面的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是由容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK中各种阻塞队列,采用的就是这种模式

img77

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class Test{
public static void main(String[] args){
MessageQueue queue = new MessageQueue(2);
for(int i = 0;i < 3;i++){
int id = i;
new Thread(()->{
queue.put(new Message(i,"值"+i));
},"生产者" + i).start();
}

new Thread(()->{
while(true){
sleep(1);
Message message = queue.take();
}
},"消费者");
}
}

//消息队列,java线程间通信
class MessageQueue{
//消息的队列集合
private LinkedList<Message> list = new LinkedList<>();
//队列容量
private int capcity;

public MessageQueue(int capcity){
this.capcity = capcity;
}
//获取消息
public Message take(){
//检查队列是否为空
synchronized(list){
while(list.isEmpty()){
try{
log.debug("队列为空,消费者线程等待");
list.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
//从队列的头部获取消息并返回
Message message = list.removeFirst();
log.debug("已消费消息{}",message);
list.notifyAll();
return message;
}
}

//存入消息
public void put(Message message){
synchronized(list){
//检查队列是否已满
while(list.size() == capcity){
try{
log.debug("队列已满,生产者线程等待");
list.wait();
}catch(InterruptedException e){
e.prinStackTrace();
}
}
//将消息加入队列尾部
list.addLast(message);
log.debug("已生产消息{}",message);
list.notifyAll();
}
}
}

final class Message{
private int id;
private Object value;

public Message(int id,Object value){
this.id = id;
this.value = value;
}

public int getId(){
return id;
}

public Object getValue(){
return value;
}

@override
public String toString(){
return "Message{" +
"id=" + id +
",value=" + value +
'}';
}
}

终止模式之两阶段终止模式

Two Phase Termination

在一个线程T1中如何优雅终止线程T2?这里优雅指的是给T2一个料理后事的机会。

错误思路
  • 使用线程对象的stop()方法停止线程
    • stop方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,其它线程将永远无法获取锁
  • 使用System.exit(init)方法停止线程
    • 目的仅是停止一个线程,但这种做法会让整个程序都停止

park&upark

基本使用

它们是LockSupport类中的方法

1
2
3
4
//暂停当前线程
LockSupport.park();
//恢复某个线程的运行
LockSupport.unpark(暂停线程对象);

先park再unpark

1
2
3
4
5
6
7
8
9
10
11
12
Thread t1 = new Thread(()->{
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();

sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

特点

与Object的wait&notify相比

  • wait,notify和notifyAll必须配合Object Monitor一起使用,而park,unpark不必
  • park & unpark是以线程为单位来阻塞和唤醒线程,而notify只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么精确
  • park & unpark可以先unpark,而wait & notify不能先notify
原理之park & unpark

每个线程都有自己的一个Parker对象,由三部分组成_counter,_cond和_mutex打个比喻

  • 线程就像一个旅人,Parker就像他随身携带的背包,条件变量就好比背包中的帐篷。_counter就好比背包中的备用干粮(0为耗尽,1为充足)
  • 调用park就是要看需不需要停下来歇息
    • 如果备用干粮耗尽,那么钻进帐篷歇息
    • 如果备用干粮充足,那么不需要停留,继续前进
  • 调用unpark,就好比令干粮充足
    • 如果这时线程还在帐篷,就唤醒让他继续前进
    • 如果这时线程还在运行,那么下次他调用park时,仅是消耗备用干粮,不需要停留继续前进
    • 因为背包空间有限,多次调用unpark仅会补充一份备用干粮

img78

  1. 当线程调用Unsafe.park()方法
  2. 检查_counter,本情况为0,这时,获得_mutex互斥锁
  3. 线程进入_cond条件变量阻塞
  4. 设置_counter = 0

img79

  1. 调用Unsafe.unpark(Thread_0)方法,设置_counter为1
  2. 唤醒_cond条件变量中的Thread_0
  3. Thread_0恢复运行
  4. 设置_counter为0

img80

  1. 调用Unsafe.unpark(Thread_0)方法,设置_counter为1
  2. 当前线程调用Unsafe.park()方法
  3. 检查_counter,本情况为1,这时线程无需阻塞,继续运行
  4. 设置_counter为0

重新理解线程状态转换

img81假设有线程Thread t

情况1:NEW—>RUNNABLE

  • 当调用t.start()方法时,由NEW—>RUNNABLE

情况2:RUNNABLE<—>WAITING

t线程用synchronized(obj)获取了对象锁后

  • 调用obj.wait()方法时,t线程从RUNNABLE—>WAITING
  • 调用obj.notify(),obj.notifyAll(),t.interrupt()时
    • 竞争锁成功,t线程从WAITING—>RUNNABLE
    • 竞争锁失败,t线程从WAITING—>BLOCKED

情况3:RUNNABLE <–> WAITING

  • 当前线程调用 t.join () 方法时,当前线程从 RUNNABLE –> WAITING,注意是当前线程在 t 线程对象的监视器上等待。
  • t 线程运行结束,或调用了当前线程的 interrupt () 时,当前线程从 WAITING –> RUNNABLE。

情况4:RUNNABLE <–> WAITING

  • 当前线程调用 LockSupport.park () 方法会让当前线程从 RUNNABLE –> WAITING。
  • 调用 LockSupport.unpark (目标线程) 或调用了线程 的 interrupt () ,会让目标线程从 WAITING –> RUNNABLE。

情况5:RUNNABLE <–> TIMED_WAITING

  • t 线程用 synchronized (obj) 获取了对象锁后,调用 obj.wait (long n) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING。
  • t 线程等待时间超过了 n 毫秒,或调用 obj.notify () , obj.notifyAll () , t.interrupt () 时:
    • 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE
    • 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED

情况6:RUNNABLE <–> TIMED_WAITING

当前线程调用 t.join (long n) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING。

当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt () 时,当前线程从 TIMED_WAITING –> RUNNABLE。

情况7:RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 Thread.sleep (long n) ,当前线程从 RUNNABLE –> TIMED_WAITING。
  • 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE。

情况8:RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 LockSupport.parkNanos (long nanos) 或 LockSupport.parkUntil (long millis) 时,当前线 程从 RUNNABLE –> TIMED_WAITING。
  • 调用 LockSupport.unpark (目标线程) 或调用了线程 的 interrupt () ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE。

情况9:RUNNABLE <–> BLOCKED

  • t 线程用 synchronized (obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED。
  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED。

情况10:RUNNABLE <–> TERMINATED

  • 当前线程所有代码运行完毕,进入 TERMINATED。

多把锁

多把不相干的锁

一间大屋子有两个功能:睡觉、学习,互不相干。

现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低

解决方法是准备多个房间(多个对象锁)

例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args){
BigRoom bigRoom = new BigRoom();
new Thread(()->{
bigRoom.study();
},"小南").start();
new Thread(()->{
bigRoom.sleep();
},"小女").start();
}

class BigRoom{
public void sleep(){
synchronized(this){
log.debug("sleeping 2 小时");
Sleeper.sleep(1);
}
}

public void study(){
synchronized(this){
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}

改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args){
BigRoom bigRoom = new BigRoom();
new Thread(()->{
bigRoom.study();
},"小南").start();
new Thread(()->{
bigRoom.sleep();
},"小女").start();
}

class BigRoom{
private final Object studyRoom = new Object();
private final Object sleepRoom = new Object();
public void sleep(){
synchronized(sleepRoom){
log.debug("sleeping 2 小时");
Sleeper.sleep(1);
}
}

public void study(){
synchronized(studyRoom){
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}

将锁的粒度细分

  • 好处,可以增强并发度
  • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁

活跃性

死锁

有这样情况:一个线程需要同时获取多把锁,这时就容易发生死锁

t1线程获得A对象锁,接下来想获取B对象的锁

t2线程获得B对象锁,接下来想获取A对象的锁

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(()->{
synchronized(A){
log.debug("lock A");
sleep(1);
synchronized(B){
log.debug("lock B");
log.debug("操作...");
}
}
},"t1");

Thread t2 = new Thread(()->{
synchronized(B){
log.debug("lock B");
sleep(0.5);
synchronized(A){
log.debug("lock A");
log.debug("操作...");
}
}
},"t2");
t1.start();
t2.start();
定位死锁

检测死锁可以使用jconsole工具,或者使用jps定位进程id,再用jstack定位死锁。

哲学家就餐问题

img82

有五位哲学家,围坐在圆桌旁

  • 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考
  • 吃饭时要用两根筷子,桌上共有5根筷子,每位哲学家左右手边各有一根筷子
  • 如果筷子被身边的人拿着,自己就得等待

筷子类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class TestDeadLock{
public static void main(String[] args){
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底",c1,c2).start();
new Philosopher("柏拉图",c2,c3).start();
new Philosopher("亚里士多德",c3,c4).start();
new Philosopher("赫拉克利特",c4,c5).start();
new Philosopher("阿基米德",c5,c1).start();
}
}

//哲学家类
class Philosopher extends Thread{
Chopstick left;
Chopstick right;

public Philosopher(String name,Chopstick left,Chopstick right){
super(name);
this.left = left;
this.right = right;
}

@override
public void run(){
while(true){
//尝试获得左手筷子
synchronized(left){
//尝试获得右手筷子
synchronized(right){
eat();
}
}
}
}

public void eat(){
log.debug("eating...");
Sleeper.sleep(1);
}
}

//筷子类
class Chopstick{
String name;
public Chopstick(String name){
this.name = name;
}
@override
public String toString(){
return "筷子{" + name +'}';
}
}
活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestLiveLock{
static volatile int count = 10;
static final Object lock = new Object();

public static void main(String[] args){
new Thread(()->{
//期望减到0退出循环
while(count > 0){
sleep(0.2);
count--;
log.debug("count:{}",count);
}
},"t1").start();
new Thread(()->{
//期望超过20退出循环
while(count < 20){
sleep(0.2);
count++;
log.debug("count:{}",count);
}
},"t2").start();
}
}
饥饿

很多教程把饥饿定义为,一个线程由于优先级太低,始终得不到CPU调度执行,也不能够结束,饥饿的情况不易演示,讲读写锁时会涉及到饥饿问题。

使用顺序加锁的方式解决之前的死锁问题

img83

ReentrantLock

相对于synchronized它具备如下特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量

与synchronized一样,都支持可重入。

基本语法

1
2
3
4
5
6
7
8
//获取锁
reentrantLock.lock();
try{
//临界区
}finally{
//释放锁
reentrantLock.unlock();
}
可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁

如果是不可重入,那么第二次获得锁时,自己也会被锁挡住

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    private static ReentrantLock lock = new ReentrantLock();

public static void main(String[] args){
method1();
}

public static void method1(){
lock.lock();
try{
log.debug("execute method1");
method2();
}finally{
lock.unlock();
}
}

public static void method2(){
lock.lock();
try{
log.debug("execute method2");
method3();
}finally{
lock.unlock();
}
}

public static void method3(){
lock.lock();
try{
log.debug("execute method2");
method3();
}finally{
lock.unlock();
}
}
可打断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args){
new Thread(()->{
try{
//如果没有竞争那么此方法就会获取lock对象锁
//如果有竞争就进入阻塞队列,可以被其它线程用interrupt方法打断
log.debug("尝试获取锁");
lock.lockInterruptibly();
}catch(InterruptedException e){
e.printStackTrace();
log.debug("没有获得锁,返回");
return;
}
try{
log.debug("获取锁");
}finally{
lock.unlock();
}
},"t1");

lock.lock();
t1.start();

sleep(1);
log.debug("打断t1");
t1.interrupt();
}
锁超时

立刻失效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ReentrantLock lock = new ReentrantLock(); 
Thread t1 = new Thread(()->{
log.debug("启动...");
//尝试获得锁
if(!lock.tryLock()){
log.debug("获取立刻失败,返回");
return;
}
try{
log.debug("获得锁");
}finally{
lock.unlock();
}
},"t1");

lock.lock();
log.debug("获得锁");
t1.start();
try{
sleep(2);
}finally{
lock.unlock();
}
解决哲学家就餐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class TestDeadLock{
public static void main(String[] args){
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底",c1,c2).start();
new Philosopher("柏拉图",c2,c3).start();
new Philosopher("亚里士多德",c3,c4).start();
new Philosopher("赫拉克利特",c4,c5).start();
new Philosopher("阿基米德",c5,c1).start();
}
}

//哲学家类
class Philosopher extends Thread{
Chopstick left;
Chopstick right;

public Philosopher(String name,Chopstick left,Chopstick right){
super(name);
this.left = left;
this.right = right;
}

@override
public void run(){
while(true){
//尝试获得左手筷子
if(left.tryLock())
try{
//尝试获得右手筷子
if(right.tryLock()){
try{
eat();
}finally{
right.unlock();
}
}
}finally{
left.unlock();//释放自己手里的筷子
}
}
}
}

public void eat(){
log.debug("eating...");
Sleeper.sleep(1);
}
}

//筷子类
class Chopstick extends ReentrantLock{
String name;
public Chopstick(String name){
this.name = name;
}
@override
public String toString(){
return "筷子{" + name +'}';
}
}
公平锁

ReentrantLock默认是不公平的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ReentrantLock lock = new ReentrantLock(false);

lock.lock();
for(int i = 0;i < 500;i++){
new Thread(()->{
lock.lock();
try{
System.out.println(Thread.currentThread().getName + "running...");
}finally{
lock.unlock();
}
},"t" + i).start();
}

//1秒之后去争夺锁
Thread.sleep(1000);
new Thread(()->{
System.out.println(Thread.currrentThread().getName() + "start...");
lock.lock();
try{
System.out.println(Thread.currentThread().getName() + "running...");
}finally{
lock.unlock();
}
},"强行插入").start();
条件变量

synchronized中也有条件变量,就是我们讲原理时那个waitSet休息室,当条件不满足时进入waitSet等待

ReentrantLock的条件变量比synchronized强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized是那些不满足条件的线程都在一间休息室等消息
  • 而ReentrantLock支持多间休息室,有专门等烟的休息室、专门等早餐的休息室,唤醒时,也是按休息室来唤醒

使用流程

  • await前需要获得锁
  • await执行后,会释放锁,进入conditionObject
  • await的线程被唤醒(或打断、或超时)取重新竞争lock锁
  • 竞争lock锁成功后,从await后继续执行
1
2
3
4
5
6
7
8
9
10
11
12
13
static ReentrantLock lock = new ReentrantLock();
static Condition waitcigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hascigrette = false;
static volatile boolean hasBreakfast = false;

public static void main(String[] args){
new Thread(()->{
try{
lock.lock();
}
});
}

同步模式之顺序控制

固定顺序运行

wait & notify版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//用来同步的对象
static Object obj = new Object();
//t2运行标记,代表t2是否执行过
static boolean t2runed = false;

public static void main(String[] args){
Thread t1 = new Thread(()->{
synchronized(obj){
while(!t2runed){
try{
//t1先等一会
obj.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
log.debug("1");
}
},"t1");

Thread t2 = new Thread(()->{
synchronized(obj){
log.debug("2");
t2runed = true;
lock.notify();
}
},"t2");

t1.start();
t2.start();
}

park & unpark版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Test{
public static void main(String[] args){
new Thread(()->{
LockSupport.park();
log.debug("1");
},"t1");
t1.start();

new Thread(()->{
log.debug("2");
LockSupport.unpark(t1);
},"t2").start();
}
}
交替输出

线程1输出a5次,线程2输出b5次,线程3输出c5次。现在要求输出abcabcabcabcabc。

wait & notify版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
 public static void main(String[] args) {
WaitAndNotify waitAndNotify = new WaitAndNotify(1, 5);

new Thread(()->{
waitAndNotify.run("a", 1, 2);
}).start();
new Thread(()->{
waitAndNotify.run("b", 2, 3);
}).start();
new Thread(()->{
waitAndNotify.run("c", 3, 1);
}).start();
}
}

class WaitAndNotify {
public void run(String str, int flag, int nextFlag) {
for(int i = 0; i < loopNumber; i++) {
synchronized(this) {
while (flag != this.flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
// 设置下一个运行的线程标记
this.flag = nextFlag;
// 唤醒所有线程
this.notifyAll();
}
}
}

private int flag;
private int loopNumber;

public WaitAndNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}

park & unpark版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static Thread t1, t2, t3;
public static void main(String[] args) {
ParkAndUnPark obj = new ParkAndUnPark(5);
t1 = new Thread(() -> {
obj.run("a", t2);
});

t2 = new Thread(() -> {
obj.run("b", t3);
});

t3 = new Thread(() -> {
obj.run("c", t1);
});
t1.start();
t2.start();
t3.start();

LockSupport.unpark(t1);
}
}

class ParkAndUnPark {
public void run(String str, Thread nextThread) {
for(int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(nextThread);
}
}

private int loopNumber;

public ParkAndUnPark(int loopNumber) {
this.loopNumber = loopNumber;
}

await & signal版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public static void main(String[] args) {
AwaitAndSignal lock = new AwaitAndSignal(5);
Condition a = lock.newCondition();
Condition b = lock.newCondition();
Condition c = lock.newCondition();
new Thread(() -> {
lock.run("a", a, b);
}).start();

new Thread(() -> {
lock.run("b", b, c);
}).start();

new Thread(() -> {
lock.run("c", c, a);
}).start();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

lock.lock();
try {
a.signal();
}finally {
lock.unlock();
}
}
}

class AwaitAndSignal extends ReentrantLock {
public void run(String str, Condition current, Condition nextCondition) {
for(int i = 0; i < loopNumber; i++) {
lock();
try {
current.await();
System.out.print(str);
nextCondition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}

private int loopNumber;

public AwaitAndSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
-------------本文结束感谢您的阅读-------------