1、JUC是什么
java.util.concurrent在并发编程中使用的工具类
进程/线程回顾
1.1 进程/线程是什么?
进程:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。
线程:通常在一个进程中可以包含若干个线程,当然一个进程中至少有一个线程,不然没有存在的意义。线程可以利用进程所拥有的资源,在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位,由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效的提高系统多个程序间并发执行的程度。
1.2 进程/线程例子
进程:一个程序,QQ.exe Music.exe 程序的集合;
一个进程往往可以包含多个线程,至少包含一个!
Java默认有几个线程? 2 个 mian、GC
线程:开了一个进程 Typora,写字,自动保存(线程负责的)
对于Java而言:Thread、Runnable、Callable
Java 真的可以开启线程吗? 开不了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
* *
A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
// 本地方法,底层的C++ ,Java 无法直接操作硬件
private native void start0();
1.3 线程状态
1 | //Thread.State |
1.4 wait/sleep的区别
- 来自不同的类
wait => Object
sleep => Thread - 关于锁的释放
wait 会释放锁,sleep 睡觉了,抱着锁睡觉,不会释放! - 使用的范围是不同的
wait 必须在同步代码块中使用
sleep 可以再任何地方睡 - 是否需要捕获异常
wait 不需要捕获异常
sleep 必须要捕获异常
1.5 什么是并发 什么是并行
并发:同一时刻多个线程在访问同一个资源,多个线程对一个点
例子:小米9今天上午10点,限量抢购
春运抢票
电商秒杀…
并行:多项工作一起执行,之后再汇总
例子:泡方便面,电水壶烧水,一边撕调料倒入桶中
2、Lock接口
传统 Synchronized
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// 基本的卖票例子
import java.time.OffsetDateTime;
/**
* 多线程编程 企业级开发套路 模板
* 在高内聚低耦合前提下,线程 操作(对外暴露的调用方法) 资源类
*/
public class SaleTicketDemo01 {
public static void main(String[] args) {
// 并发:多线程操作同一个资源类, 把资源类丢入线程
Ticket ticket = new Ticket();
// @FunctionalInterface 函数式接口,jdk1.8 lambda表达式 (参数)->{ 代码 }
new Thread(()->{
for (int i = 1; i < 40 ; i++) {
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i = 1; i < 40 ; i++) {
ticket.sale();
}
},"B").start();
new Thread(()->{
for (int i = 1; i < 40 ; i++) {
ticket.sale();
}
},"C").start();
}
}
// 资源类 OOP
class Ticket {
// 属性、方法
private int number = 30;
// 卖票的方式
// synchronized 本质: 队列,锁
public synchronized void sale(){
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"票,剩余:"+number);
}
}
}Lock 接口
锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。
Lock接口的实现 ReentrantLock可重入锁
公平锁:十分公平:可以先来后到
非公平锁:十分不公平:可以插队 (默认)
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SaleTicketDemo02 {
public static void main(String[] args) {
// 并发:多线程操作同一个资源类, 把资源类丢入线程
Ticket2 ticket = new Ticket2();
// @FunctionalInterface 函数式接口,jdk1.8 lambda表达式 (参数)->{ 代码 }
new Thread(()->{for (int i = 1; i < 40 ; i++)
ticket.sale();},"A").start();
new Thread(()->{for (int i = 1; i < 40 ; i++)
ticket.sale();},"B").start();
new Thread(()->{for (int i = 1; i < 40 ; i++)
ticket.sale();},"C").start();
}
}
// Lock三部曲
// 1、 new ReentrantLock();
// 2、 lock.lock(); // 加锁
// 3、 finally=> lock.unlock(); // 解锁
class Ticket2 {
// 属性、方法
private int number = 30;
private Lock lock = new ReentrantLock();
public void sale(){
lock.lock(); // 加锁
try {
// 业务代码
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"票,剩余:"+number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 解锁
}
}
}
Synchronized 和 Lock 区别
- Synchronized 内置的Java关键字, Lock 是一个Java类
- Synchronized 无法判断获取锁的状态,Lock 可以判断是否获取到了锁
- Synchronized 会自动释放锁,lock 必须要手动释放锁!如果不释放锁,死锁
- Synchronized 线程 1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去;
- Synchronized 可重入锁,不可以中断的,非公平;Lock ,可重入锁,可以 判断锁,非公平(可以自己设置);
- Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码!
3、Java8之lambda表达式复习
函数式接口
lambda表达式,必须是函数式接口,必须只有一个方法
如果接口只有一个方法java默认它为函数式接口。
为了正确使用Lambda表达式,需要给接口加个注解:@FunctionalInterface
如有两个方法,立刻报错
接口里是否能有实现方法?
default方法
1
2
3
4
5//接口里在java8后容许有接口的实现,default方法默认实现
default int div(int x,int y) {
return x/y;
}
//接口里default方法可以有几个? 多个静态方法实现
1
2
3
4
5
6//静态方法实现:接口新增
public static int sub(int x,int y){
return x-y;
}
//可以有几个? 多个
//注意静态的叫类方法,能用foo去调吗? 要改成aFoo
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
interface Foo{
// public void sayHello() ;
// public void say886() ;
public int add(int x,int y);
default int div(int x,int y) {
return x/y;
}
public static int sub(int x,int y) {
return x-y;
}
}
/**
*
* @Description: Lambda Express-----> 函数式编程
* 1 拷贝小括号(形参列表),写死右箭头 ->,落地大括号 {方法实现}
* 2 有且只有一个public方法@FunctionalInterface注解增强定义
* 3 default方法默认实现
* 4 静态方法实现
*/
public class LambdaDemo
{
public static void main(String[] args)
{
// Foo foo = new Foo() {
// @Override
// public void sayHello() {
// System.out.println("Hello!!");
// }
//
// @Override
// public void say886() {
// // TODO Auto-generated method stub
//
// }
// };
// foo.sayHello();
// System.out.println("============");
// foo = ()->{System.out.println("Hello!! lambda !!");};
// foo.sayHello();
Foo foo = (x,y)->{
System.out.println("Hello!! lambda !!");
return x+y;
};
int result = foo.add(3,5);
System.out.println("******result="+result);
System.out.println("******result div="+foo.div(10, 2));
System.out.println("******result sub="+Foo.sub(10, 2));
}
}
4、线程间通信(生产者和消费者问题)
生产者和消费者问题 Synchronized 版
虚假唤醒 (if 改为 while 判断) :
换成4个线程会导致错误
原因:在java多线程判断时,不能用if,程序出事出在了判断上面,突然有一添加的线程进到if了,突然中断了交出控制权,没有进行验证,而是直接走下去了,加了两次,甚至多次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78package com.ming.pc;
/**
* 线程之间的通信问题 生产者和消费者问题 等待唤醒 通知唤醒
* 线程交替执行 A B 操作同一个变量
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
//判断等待 业务 通知
class Data{//数字 资源类
private int number = 0;
//+1
public synchronized void increment() throws InterruptedException {
//防止虚假唤醒
while (number>=1){
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"===>"+number);
//通知
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
while (number<=0){
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"===>"+number);
//通知
this.notifyAll();
}
}JUC版的生产者和消费者问题
通过Lock 找到 Condition
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97package com.ming.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 线程之间的通信问题 生产者和消费者问题 等待唤醒 通知唤醒
* 线程交替执行 A B 操作同一个变量
*/
public class B {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
//判断等待 业务 通知
class Data2 {//数字 资源类
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
//+1
public void increment() throws InterruptedException {
lock.lock();
try {
//防止虚假唤醒
while (number >= 1) {
//等待
condition.await();//判断等待
}
number++;
System.out.println(Thread.currentThread().getName() + "===>" + number);
//通知
condition.signalAll();//唤醒
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//-1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number <= 0) {
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "===>" + number);
//通知
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
5、线程间定制化调用通信
Condition 精准的通知和唤醒线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85package com.ming.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* A执行完调用B再调用C再调用A
*/
public class C {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(() -> {
for (int i = 0; i < 10; i++) data.printA();
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.printC();
}
}, "C").start();
}
}
//判断等待 业务 通知
class Data3 {//数字 资源类
private int number = 1; //标识位
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();//用于同步的信号量
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void printA() {
lock.lock();
try {
while (number != 1) { //判断标志位
condition1.await();
}
//干活
System.out.println(Thread.currentThread().getName() + "AAAAAAAAAAAA");
number = 2; //修改标志位,通知下一个
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (number != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "BBBBBBBBBBB");
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (number != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "CCCCCCCCCCCCC");
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
6:多线程锁(8锁解析)
8锁,就是关于锁的8个问题
标准情况下,两个线程先打印 发短信还是 打电话? 发短信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34package com.ming.lock8;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
//锁的存在
new Thread(() -> {
phone.sendSms();
}, "A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread (() -> {
phone.call();
}, "B").start();
}
}
class Phone {
// synchronized 锁的对象是方法的调用者!、
// 两个方法用的是同一个锁,谁先拿到谁执行!
public synchronized void sendSms() {
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}sendSms延迟4秒,两个线程先打印 发短信还是 打电话? 发短信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class Phone {
// synchronized 锁的对象是方法的调用者!、
// 两个方法用的是同一个锁,谁先拿到谁执行!
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}增加了一个普通方法后!先执行发短信还是Hello? 普通方法(先Hello)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.ming.lock8;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
//锁的存在
new Thread(() -> {
phone.sendSms();
}, "A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone.hello();
}, "B").start();
}
}
class Phone2 {
// synchronized 锁的对象是方法的调用者!
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
//这里没有锁!不是同步方法,不受锁的影响
public void hello() {
System.out.println("hello");
}
}两个对象,两个同步方法, 发短信还是 打电话? 打电话
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45package com.ming.lock8;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
// 两个对象,两个调用者,两把锁!
Phone2 phone1 = new Phone2();
Phone2 phone2 = new Phone2();
//锁的存在
new Thread(() -> {
phone1.sendSms();
}, "A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone2 {
// synchronized 锁的对象是方法的调用者!
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
//这里没有锁!不是同步方法,不受锁的影响
public void hello() {
System.out.println("hello");
}
}增加两个静态的同步方法,只有一个对象,先打印 发短信?打电话? 发短信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class Test3 {
public static void main(String[] args) {
// 两个对象的Class类模板只有一个,static,锁的是Class
Phone3 phone = new Phone3();
//锁的存在
new Thread(() -> {
phone.sendSms();
}, "A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone.call();
}, "B").start();
}
}
// Phone3唯一的一个 Class 对象
class Phone3 {
// synchronized 锁的对象是方法的调用者!
// static 静态方法
// 类一加载就有了!锁的是Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}两个对象!增加两个静态的同步方法, 先打印 发短信?打电话? 发短信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class Test3 {
public static void main(String[] args) {
// 两个对象的Class类模板只有一个,static,锁的是Class
Phone3 phone1 = new Phone3();
Phone3 phone2 = new Phone3();
//锁的存在
new Thread(() -> {
phone1.sendSms();
}, "A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
// Phone3唯一的一个 Class 对象
class Phone3 {
// synchronized 锁的对象是方法的调用者!
// static 静态方法
// 类一加载就有了!锁的是Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}1个静态的同步方法,1个普通的同步方法 ,一个对象,先打印 发短信?打电话? 打电话
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class Test4 {
public static void main(String[] args) {
// 两个对象的Class类模板只有一个,static,锁的是Class
Phone4 phone = new Phone4();
//锁的存在
new Thread(() -> {
phone.sendSms();
}, "A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone.call();
}, "B").start();
}
}
// Phone3唯一的一个 Class 对象
class Phone4 {
// 静态的同步方法 锁的是 Class 类模板
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}1个静态的同步方法,1个普通的同步方法 ,两个对象,先打印 发短信?打电话? 打电话
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class Test4 {
public static void main(String[] args) {
// 两个对象的Class类模板只有一个,static,锁的是Class
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
//锁的存在
new Thread(() -> {
phone1.sendSms();
}, "A").start();
// 捕获
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
// Phone3唯一的一个 Class 对象
class Phone4 {
// 静态的同步方法 锁的是 Class 类模板
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
7、NotSafeDemo
7.1 List 不安全
1 | package com.ming.unsafe; |
CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。
添加元素后,再将原容器的引用指向新的容器setArray(newElements)。这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
1 | //CopyOnWriteArrayList 的add方法 |
7.2 Set 不安全
1 | package com.ming.unsafe; |
hashSet 底层是什么?
1
2
3
4
5
6
7
8
9
10public HashSet() {
map = new HashMap<>();
}
// add set 本质就是 map key是无法重复的!
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
private static final Object PRESENT = new Object(); // 不变的值!
7.3 Map 不安全
回顾Map基本操作 (Node类型的 数组 链表 红黑树)
1 | package com.ming.unsafe; |
8、Callable接口
- 可以有返回值
- 可以抛出异常
- 方法不同,run()/ call()
代码测试
1 | package com.ming.callable; |
细节:
- 有缓存
- 结果可能需要等待,会阻塞!
9、JUC强大的辅助类
9.1 CountDownLatch
1 | package com.ming.add; |
原理:
countDownLatch.countDown();
// 数量-1countDownLatch.await();
// 等待计数器归零,然后再向下执行每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续执行!
9.2 CyclicBarrier(加法计数器)
1 | package com.ming.add; |
9.3 Semaphore
Semaphore:信号量
抢车位! 6车—3个停车位置
1 | package com.ming.add; |
原理:
semaphore.acquire()
获得,假设如果已经满了,等待,等待被释放为止!semaphore.release()
释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!- 作用: 多个共享资源互斥的使用!并发限流,控制最大的线程数!
10、ReadWriteLock 读写锁
1 | package com.ming.rwlock; |
11、BlockingQueue 阻塞队列
四组API
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞 等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer() | put() | offer(,,) |
移除 | remove | poll() | take() | poll(,) |
检测队首元素 | element | peek | - | - |
1 | /** |
1 | /** |
1 | /** |
1 | /** |
SynchronousQueue 同步队列
没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
put、take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.ming.bq;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 同步队列
* 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
* put了一个元素,必须从里面先take取出来,否则不能在put进去值!
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "put 1");
queue.put("1");
System.out.println(Thread.currentThread().getName() + "put 2");
queue.put("2");
System.out.println(Thread.currentThread().getName() + "put 3");
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "get 1");
queue.take();
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "get 2");
queue.take();
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "get 3");
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}
12、ThreadPool线程池
线程池:三大方法、7大参数、4种拒绝策略
程序的运行,本质:占用系统的资源! 优化资源的使用!=>池化技术
线程池、连接池、内存池、对象池///….. 创建、销毁。十分浪费资源
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我。
线程池的好处:
- 降低资源的消耗
- 提高响应的速度
- 方便管理。
线程复用、可以控制最大并发数、管理线程
线程池:三大方法
1 | package com.ming.pool; |
7大参数
源码分析
1 | public static ExecutorService newSingleThreadExecutor() { |
手动创建一个线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public static void main(String[] args) {
// 自定义线程池!工作 ThreadPoolExecutor
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3), //这里一定要写参数 不然默认Integer.MAX_VALUE
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,也不会抛出异常!
);
try {
// 最大承载:Deque + max
// 超过 RejectedExecutionException
for (int i = 1; i <= 9; i++) {
// 使用了线程池之后,使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
4种拒绝策略
1 | /** |
小结和拓展
1 | 1、在创建了线程池后,开始等待请求。 |
池的最大的大小如何去设置? 了解:IO密集型,CPU密集型:(调优)
CPU密集型
尽量使用较小的线程池,一般Cpu核心数+1
因为CPU密集型任务CPU的使用率很高,若开过多的线程,只能增加线程上下文的切换次数,带来额外的开销
IO密集型
方法一:可以使用较大的线程池,一般CPU核心数 * 2
IO密集型CPU使用率不高,可以让CPU等待IO的时候处理别的任务,充分利用cpu时间
方法二:线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
混合型
可以将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,按情况而定
1 | public static void main(String[] args) { |
(附加)四大函数式接口(必需掌握)
新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口: 只有一个方法的接口
1
2
3
4
5
6
7
8
9
public interface Runnable {
public abstract void run();
}
// 泛型、枚举、反射
// lambda表达式、链式编程、函数式接口、Stream流式计算
// 超级多FunctionalInterface
// 简化编程模型,在新版本的框架底层大量应用!
// foreach(消费者类的函数式接口)Function函数式接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20package com.ming.function;
import java.util.function.Function;
/**
* Function Function 函数型接口, 有一个输入参数,有一个输出
* 只要是 函数型接口 可以 用 lambda表达式简化
*/
public class Demo01 {
public static void main(String[] args) {
// Function function = new Function<String,String>() {
// @Override
// public String apply(String str) {
// return str;
// }
// };
Function<String,String> function=(str)->{return str;};
System.out.println(function.apply("123"));
}
}断定型接口:有一个输入参数,返回值只能是 布尔值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18package com.ming.function;
import java.util.function.Predicate;
public class Demo02 {
public static void main(String[] args) {
// Predicate<String> predicate = new Predicate<String>(){
// @Override
// public boolean test(String s) {
// return s.isEmpty();
// }
// };
Predicate<String> predicate =(s)->{
return s.isEmpty();
};
System.out.println(predicate.test("12"));
}
}Consumer 消费型接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21package com.ming.function;
import java.util.function.Consumer;
/**
* Consumer 消费型接口: 只有输入,没有返回值
*/
public class Demo03 {
public static void main(String[] args) {
// Consumer<String> consumer = new Consumer<String>() {
// @Override
// public void accept(String str) {
// System.out.println(str);
// }
// };
Consumer<String> consumer = (str) -> {
System.out.println(str);
};
consumer.accept("sdadasd");
}
}Supplier 供给型接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22package com.ming.function;
import java.util.function.Supplier;
/**
* Supplier 供给型接口 没有参数,只有返回值
*/
public class Demo04 {
public static void main(String[] args) {
// Supplier supplier = new Supplier<Integer>() {
// @Override
// public Integer get() {
// System.out.println("get()");
// return 1024;
// }
// };
Supplier supplier = () -> {
return 1024;
};
System.out.println(supplier.get());
}
}
13、Java8之Stream流式计算
流(Stream) 是什么
是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。
“集合讲的是数据,流讲的是计算!”
特点
- Stream 自己不会存储元素
- Stream 不会改变源对象。相反,他们会返回一个持有结果的新Stream。
- Stream 操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。
阶段
- 创建一个Stream:一个数据源(数组、集合)
- 中间操作:一个中间操作,处理数据源数据
- 终止操作:一个终止操作,执行中间操作链,产生结果
大数据:存储 + 计算
集合、MySQL 本质就是存储东西的;
计算都应该交给流来操作!
1 | package com.ming.stream; |
14、分支合并框架(ForkJoin)
什么是 ForkJoin
ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量! 大数据:Map Reduce (把大任务拆分为小任务)
ForkJoin 特点:工作窃取
这个里面维护的都是双端队列
ForkJoin
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45package com.ming.forkjoin;
import java.util.concurrent.RecursiveTask;
/**
* 求和计算的任务!
* 3000 6000(ForkJoin) 9000(Stream并行流)
* // 如何使用 forkjoin
* // 1、forkjoinPool 通过它来执行
* // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
* // 3. 计算类要继承 ForkJoinTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start; // 1
private Long end; // 1990900000
// 临界值
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
protected Long compute() {
if ((end - start) < temp) {
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else { // forkjoin 递归
long middle = (start + end) / 2; // 中间值
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork(); // 拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
task2.fork(); // 拆分任务,把任务压入线程队列
return task1.join() + task2.join();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50package com.ming.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
/**
* 同一个任务,别人效率高你几十倍!
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1(); // 4906
// test2(); // 3509
test3(); // 157
}
//普通程序员
public static void test1() {
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + " 时间:" + (end - start));
}
//会使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + " 时间:" + (end - start));
}
public static void test3() {
long start = System.currentTimeMillis();
// Stream并行流 () ()
long sum = LongStream.rangeClosed(0L,
10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + "时间:" + (end - start));
}
}
15、异步回调
Future 设计的初衷: 对将来的某个事件的结果进行建模
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50package com.ming.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 异步调用: CompletableFuture
* // 异步执行
* // 成功回调
* // 失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值的 runAsync 异步回调
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
// });
//
// System.out.println("1111");
//
// completableFuture.get(); // 获取阻塞执行结果
// 有返回值的 supplyAsync 异步回调
// ajax,成功和失败的回调
// 返回的是错误信息
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
int i = 10 / 0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
System.out.println("t=>" + t); // 正常的返回结果
System.out.println("u=>" + u); // 错误信息:java.util.concurrent.CompletionException: java.lang.ArithmeticException: /by zero
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233; // 可以获取到错误的返回结果
}).get());
/**
* succee Code 200
* error Code 404 500
*/
}
}
16、JMM
JMM : Java内存模型,不存在的东西,概念!约定!
关于JMM的一些同步的约定:
- 线程解锁前,必须把共享变量立刻刷回主存。
- 线程加锁前,必须读取主存中的最新值到工作内存中!
- 加锁和解锁是同一把锁
线程 工作内存 、主内存
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
17、Volatile
Volatile 是 Java 虚拟机提供轻量级的同步机制
1、保证可见性
2、不保证原子性
3、禁止指令重排
如果不加 lock 和 synchronized ,怎么样保证原子性
使用原子类,解决 原子性问题
1 | package com.ming.volatilet; |
见之前笔记
18、单例模式
见之前笔记
19、深入理解CAS
什么是CAS
compareAndSwap : 比较并交换!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
// CAS compareAndSet : 比较并交换!
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
// 期望、更新
// public final boolean compareAndSet(int expect, int update)
// 如果我期望的值达到了,那么就更新,否则,就不更新, CAS 是CPU的并发原语!
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
atomicInteger.getAndIncrement()
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}Unsafe 类
CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环!
缺点:
- 循环会耗时
- 一次性只能保证一个共享变量的原子性
- ABA问题
ABA问题:狸猫换太子
1 | import java.util.concurrent.atomic.AtomicInteger; |
20、原子引用
解决ABA 问题,引入原子引用! 对应的思想:乐观锁!
带版本号 的原子操作!
1 | package com.ming.cas; |
注意:
Integer 使用了对象缓存机制,默认范围是 -128 ~ 127 ,推荐使用静态工厂方法 valueOf 获取对象实例,而不是 new,因为 valueOf 使用缓存,而 new 一定会创建新的对象分配新的内存空间;
21、各种锁的理解
1、公平锁、非公平锁
公平锁: 非常公平, 不能够插队,必须先来后到!
非公平锁:非常不公平,可以插队 (默认都是非公平)
1 | public ReentrantLock() { |
2、可重入锁(递归锁)
Synchronized
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// 可重入锁
// Synchronized
public class Demo01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone{
public synchronized void sms(){
System.out.println(Thread.currentThread().getName() + "sms");
call(); // 这里也有锁
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName() + "call");
}
}Lock 版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49package com.ming.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// 可重入锁
public class Demo02 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(() -> {
phone.sms();
}, "A").start();
new Thread(() -> {
phone.sms();
}, "B").start();
}
}
class Phone2 {
Lock lock = new ReentrantLock();
public void sms() {
lock.lock(); // 细节问题:lock.lock(); lock.unlock(); // lock 锁必须配对,否则就会死在里面
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "sms");
call(); // 这里也有锁
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
lock.unlock();
}
}
public void call() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "call");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
3、自旋锁
spinlock
1 | package com.ming.lock; |
1 | public static void main(String[] args) throws InterruptedException { |
4、死锁
代码测试
1 | package com.kuang.lock; |
解决问题
1、使用 jps -l
定位进程号
2、使用jstack 进程号
找到死锁问题