一、chapter2:线程安全性
1、安全性概念
当对象在多线程并发条件下进行访问的时候,能保证自身状态的正确性。
当数据被多线程访问并且执行并发读写操作的时候,如果没有足够的可靠性保证,对象本身状态容易发生不安全的变化。
2、原子性
- 概念:当一个操作不能单次完成,需要由多个操作组合而成,是可以进行分割的,对外而言,这个操作就非原子性质的。比如执行i++的操作,需要由读取数据、修改数据、写入数据的操作序列组合而成,并不是一个原子性的操作。
当多个线程执行i++的操作,比如A线程读取i的值为1,修改i为2的时候让出CPU的执行权,由线程B进行执行,此时B读取到的还是1,然后分别执行操作,就会导致双方操作后少加了一次数的问题,此时线程并不安全。
- 竞态条件:当某个计算的正确性与否取决于多个线程的交替执行时序,就会发生竞态条件。比如如下代码:
1 | public class LazyInitInstance { |
3、加锁机制(如何避免被同时访问)
为什么需要加锁:上面已经说到原子性是实现多线程并发访问的时候的一个必要条件,Java提供了一个内置的锁支持原子性,也就是同步代码块
同步代码块:同步代码块的使用姿势很简单,在方法块加上synchronized修饰即可,而synchronized实际上是一个内置锁,线程进入同步代码块的时候先获取锁,在推出的时候释放锁。因为Java的内置锁实际上是一个互斥锁(也是一个可重入锁),因为当某个线程已经获取锁的情况下,如果另外的线程尝试获取锁会进行阻塞或者等待的操作。而因为每次只能有一个线程获取锁并执行,因此同步代码块中的执行实际上就变成了一个原子操作,因此是线程安全的。
可重入锁:当一个线程请求另外的线程持有的锁的时候会进行阻塞,当时当持有锁的线程请求自己的锁的时候,是不会有问题的,否则会引起死锁。代码如下:
1 | public class SynchronizedDemo { |
二、chapter3:对象共享
1、可见性
上面的加锁实际上是避免了对象被同时访问从而保证了线程安全性。接下来如何保证对象在可被同时访问的情况下线程的安全性变得至关重要。
在Java的内存模型中,具体可以参考Java内存模型,在Java的内存模型中定义了主内存以及工作内存的概念,因此如果主内存跟工作内存中的数据无法进行实时同步的一个机制保证的情况下,在不同的线程中的数据是彼此不一致的,也就是不可见的现象,比如A线程修改了数据,但是因为CPU缓存的原因,无法被其他线程看到,那么其他线程的数据就会失效,而对失效数据的操作也就失去了意义。
- synchronized:通过在工作线程的数据修改完成后需要同步到主内存以及在进入同步代码块的时候数据需要从主内存同步的约定保证数据的可见性。
- volatile: 通过每个原子性操作的数据变更都会同步到主内存的操作保证数据的可靠性。
2、线程封闭
既然多线程访问共享的数据会导致数据的不可靠,那如果单个线程的数据都封闭在单个线程中呢?
- 栈封闭:线程内部使用,也就是使用局部变量,对象不进行发布,不逸出。
- ThreadLocal:ThreadLocal用于线程隔离,当不同线程的数据不进行相互影响的情况下,使用ThreadLocal是一个非常规范的方式。ThreadLocal的实现大概是
3、不变性
那如果访问的数据不可变,不也是安全的么?
- 对象创建之后便不能修改
- 对象的所有域都是final类型
- 对象是正确创建的,没有产生逸出的情况
三、chapter5:基础构建模块
1、同步容器类
- 概念:同步容器类是通过将他们的状态封装起来,并对所有的公有方法使用synchronized进行同步,使得每次只能有一个线程持有锁进行访问。
- 常见的同步容器类:Vector、HashTable或者Collections的synchronizedXXX等产生的同步类。
- 同样存在的问题:同步容器类是线程安全的,但是当同步容器类被不同的线程并发访问执行读写操作的时候依然存在相互影响的可能。比如获取size的之后去get(size)的同时另外的线程删除了容器中的某个值的时候,就会发生角标越界的case。那只能在修改的过程中进行加锁解决线程安全问题,但是在高并发情况下,加锁之后使得系统的并发能力下降,因为比较折中的解决办法就是将容器进行克隆,并在副本上进行迭代。
2、并发容器
- 同步容器类的问题:同步容器类是通过将所有对容器状态的访问进行串行化,实现线程安全性,这种实现方式的代价就是严重降低并发性,当多个线程竞争容器的锁的时候,吞吐量会急剧下降。
- 类型
- ConcurrentHashMap:通过使用分段锁的方式进行加锁,任意数量的读取线程可以并发的访问Map,执行读取操作的线程以及执行写入操作的线程可以并发的访问Map。因为CouncurrentHashMap无法通过加锁进行独占访问,因为客户端是无法进行加锁创建新的原子操作的。所以ConcurrentHashMap提供了一些常见的原子性的复合操作,比如putIfAbsent等等。
- CopyOnWriteArrayList:CopyOnWirteArrayList的实现方式是在写操作的时候进行加锁,同时将自身的数据复制一份,同时对复制的数据进行修改,修改完成之后,再进行同步,因此适合读多写少的情况,存在一定的并发情况下,读取的数据不是最新的可能性,因此如果需要保证读取的数据都是最新的,那么不推荐使用CopyOnWrite容器。
3、同步工具类
闭锁:闭锁的作用在于相当于一个等待开关,在闭锁未满足结束条件的时候会执行阻塞式等待结果完成。
- CountDownLatch:CoutDownLatch是一种灵活的闭锁实现,它可以使一个或者多个线程在等待一组事件的完成。闭锁状态包括一个计数器,该计数器表示需要等待的事件数量,countDown方法递减计数器,表示有一个事件发生,而await方法是一个阻塞操作,它会等待所有需要发生的事件完成才会继续向下执行。闭锁的好处在于如果以前我们是单线程串行执行,然后将所有结果整合在一起的过程可以变成并发的操作再整合,可以大幅提高程序的性能。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class CountDownLatchLock {
private static CountDownLatch countDownLatch = new CountDownLatch(4);
public static void main(String[] args) {
Thread thread1 = new Thread(new MyRunnable());
Thread thread2 = new Thread(new MyRunnable());
Thread thread3 = new Thread(new MyRunnable());
Thread thread4 = new Thread(new MyRunnable());
thread1.start();
thread2.start();
thread3.start();
thread4.start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("count down latch left :" + countDownLatch.getCount() + " and all task is done");
}
static class MyRunnable implements Runnable {
@Override
public void run() {
try {
Integer time = new Random().nextInt(10) * 1000;
Thread.sleep(time);
System.out.println(Thread.currentThread().getName() + " sleep " + time + "ms");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("count down latch left :" + countDownLatch.getCount());
countDownLatch.countDown();
}
}
}
}
FutureTask:FutureTask跟CountDownLatch类似,FutureTask是通过调用get方法的时候进行阻塞,实现方式如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public class FutureTaskLock {
private static final FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(new Random().nextInt(5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task1 is finished");
return "task1 is finished";
}
});
private static final FutureTask<String> futureTask2 = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(new Random().nextInt(5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task2 is finished");
return "task2 is finished";
}
});
public static void main(String[] args) {
Thread thread = new Thread(futureTask);
Thread thread1 = new Thread(futureTask2);
thread.start();
thread1.start();
System.out.println("start time is " + System.currentTimeMillis() + "ms");
try {
futureTask.get();
futureTask2.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("task is over,end time is " + System.currentTimeMillis() + "ms");
}
信号量Semaphore
信号量用于控制同时访问某个特定资源的操作数量。Semaphore存放的是许可permit,在执行操作前先获取许可,在执行操作后释放许可,如果没有许可,Semaphore的acquire就会一直阻塞到有许可,release表示执行完毕返回许可给信号量。(注意:看起来是不是又是另外一种互斥锁的实现方式),使用姿势如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class SemaphoreLock {
static Semaphore semaphore = new Semaphore(3);
static class MyRunnable implements Runnable {
@Override
public void run() {
try {
//从信号量中获取一个机会 如果没有则阻塞等待操作 如果获取得到就执行
System.out.println("left change:"+ semaphore.availablePermits());
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " start at " + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " stop at " + System.currentTimeMillis());
//释放机会
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
}
四、chapter6:任务执行
1、单线程以及无限线程的问题
- 单线程:资源利用率低
- 无线线程:资源耗尽、线程开销
2、Executor框架
- 优点:Executor框架是一个典型的BlockingQueue的生产者消费者模型,提交任务的操作是生产者,执行任务的操作是消费者。
- 生命周期:因为Executor是一个异步执行的框架,任务一旦提交,何时执行是取决于有无空闲的线程,而不能被控制的,换句话而言,Executor何时结束是不可知的。
- 运行:创建成功就处于运行等待任务提交状态
- 关闭:调用shutdown表示处理完当前已经提交的任务后关闭,新来的任务不会接受。shutdownNow是暴力终止。
- 终止
实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59public class TaskExecutorWebServer {
public static final int MAX_THREAD_COUNT = 10;
private static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD_COUNT);
public static void main(String[] args) throws Exception {
withReturnAndDo();
}
/**
* 需要返回值操作 使用callble+future 其实使用futuretask一样可以实现 使用Executor节省资源,降低不必要开支,增加管理
*/
private static void withReturnAndDo() {
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
return "i am well done";
}
};
Future<String> future = executor.submit(callable);
try {
String s = future.get();
//记得关掉
executor.shutdown();
System.out.println(s);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
/**
* 不需要返回值 直接使用runnable的方式
*/
private static void withNoReturn() {
for (int i = 0; i < 100; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("do the thing");
} catch (Exception e) {
System.out.println("do the thing error");
e.printStackTrace();
}
}
};
executor.execute(runnable);
}
}
private void shutdown() {
executor.shutdown();
}
}
思考:如果当有一个需求,页面是瀑布流,展示非常多的图片,任务分为两部分,一部分是渲染文本,一个是下载图片并渲染的过程。
- 方案:先渲染文本、再执行下载图片的操作,这样文本先显示,图片等下载完显示。
- 改进step1:渲染文本的同时看开启另外的线程,下载完所有图片并显示。
- 改进step2:step1的作用微乎其微,因为渲染文本是一个很快的操作,所以效果将差不多。因此如果增加Future以及Callable的方式将请求提交到Executor中,比如设置为3个线程,那么速度能提高不少。当时又带来新的问题,每个submit提交的future的get返回值的操作是阻塞操作,当第一个future阻塞的时候,其他的future就算已经有返回值,那还是无法进行渲染。
- 改进step3:如果想要提交给线程池中的task能够及时返回结果,while循环看起来是一个不错的主意,不过代码看起来会比较难受,因此可以使用CompletionService, CompletionService将Executor与BlockingQueue结合在一起,当调用take的时候,能返回已经执行完的future,这样就能做到及时返回结果,渲染图片。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70public class CompletionServiceWeb {
static List<Task> taskList = new ArrayList<>();
public static void init() {
taskList.add(new Task2());
taskList.add(new Task2());
taskList.add(new Task1());
taskList.add(new Task1());
}
public static void testCompletionService(ExecutorService executor) {
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
for (final Task task : taskList) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
// System.out.println(task.testTask());
return task.testTask();
}
});
}
for (int i = 0; i < taskList.size(); i++) {
try {
Future<String> future = completionService.take();
String s = future.get();
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}
executor.shutdown();
}
interface Task {
String testTask();
}
static class Task1 implements Task {
@Override
public String testTask() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread:" + Thread.currentThread().getName() + " task1 is start");
return "Thread:" + Thread.currentThread().getName() + " task1 is end";
}
}
static class Task2 implements Task {
@Override
public String testTask() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread:" + Thread.currentThread().getName() + " task2 is start");
return "Thread:" + Thread.currentThread().getName() + " task2 is end";
}
}
public static void main(String[] args) {
init();
ExecutorService executorService = Executors.newFixedThreadPool(5);
testCompletionService(executorService);
}
}
- 总结
- 如果使用Runnable请转成Callable,Callable是一个具有返回值的特殊的Runnable
- 如果大量使用Thread(Runnable).start(),可以换成Executor框架代替
- 如果各个任务之间互相独立,返回值一致的情况下,可以使用CompletionService
- 如果需要控制多线程顺序,可以使用FutureTask、CountDownLatch、Semaphore等等闭锁方式实现
五、chapter8:线程池的使用
1、问题
线程池可以方便的将提交的task以及线程池的调度执行策略向分离,但是这种分离并不是特别彻底:
- 依赖任务(可能导致饥饿死锁):大多数提交给线程池的任务是独立的,比如在一个App下载首页的瀑布流图片的时候,每张图片的下载都是互相独立的。当任务是独立的时候,修改线程池的配置以及执行策略,这部分修改只对执行的性能比如负载、吞吐量等等有关系,但是如果task之间彼此依赖,那么执行策略以及提交的任务之间便存在隐性耦合。
- 线程封闭:单线程的Executor能对并发下的数据安全做出保证,同时也把任务以及执行策略进行了隐性耦合,一旦改变执行策略,处理结果就会出现问题。
任务执行时间不均匀:如下图所示
task1跟task2如果长时间占据线程池的资源工作,其他的线程会处于堵塞状态。
2、配置ThreadPoolExecutor
- 线程创建销毁:线程的创建销毁的损耗由线程池的大小,最大值以及线程的超时时间共同决定。
- 基本大小:线程池无任务的大小
- 最大值:同时活动的线程数量上限
- 超时时间:如果线程的空闲时间超过了超时时间,则会被标记为回收线程
- 队列策略
- 无界队列:例如Executors#newFixedThreadPool或者Executors.newSingleThreadExecutor等
- 有界队列
- 同步移交:如果此时线程容量以及达到上限,将请求直接拒绝加入队列直接交给工作线程处理
六、chapter13:显式锁
1、概念
与synchronized的内置锁不同,Lock定义了一个无条件的、可轮询的、可定时的、可中断的锁机制。
2、意义
- 提供了可轮询的锁:避免了顺序死锁(哲学家问题)
- 提供了定时锁:超时放弃
- 提供了公平与否的选择
- 性能高
3、读写锁
附上一个思维导图