图源:
终止任务
公园入园
关于这点,《Thinking in Java》中有一个公园入园的示例:假设有一个公园,存在多个入口可以入园,现在用程序统计每个入口已进入的人数,以及总的已入园人数。
这可以看做是一个简单的模拟程序,显然多个入口入园是一个并发的过程,要用多个线程去模拟。而总的入园计数器需要在多个线程之间共享。
最终的代码如下:
package ch22.end;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import util.Fmt;
class Counter {
private int total;
public synchronized void increase() {
total++;
}
public synchronized int getTotal() {
return total;
}
}
class Entrance implements Runnable {
private static final Counter COUNTER = new Counter();
private int total;
private int id;
private static volatile boolean canceled = false;
private static List<Entrance> entrances = new ArrayList<>();
public Entrance(int id) {
this.id = id;
entrances.add(this);
}
public static void cancel() {
canceled = true;
}
public static boolean isCancel() {
return canceled;
}
public void run() {
while (!canceled) {
synchronized (this) {
total++;
}
COUNTER.increase();
System.out.println(this + " total:" + COUNTER.getTotal());
}
System.out.println(this + " is closed, total:" + COUNTER.getTotal());
}
public synchronized String toString() {
return Fmt.sprintf("Gate#%d,nums:%d", id, total);
}
public synchronized int getTotal() {
return total;
}
public static int getSumTotal() {
int total = 0;
for (Entrance entrance : entrances) {
total += entrance.getTotal();
}
return total;
}
}
public class Main {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 3; i++) {
es.execute(new Entrance(i + 1));
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
Entrance.cancel();
es.shutdown();
try {
if(!es.awaitTermination(3, TimeUnit.SECONDS)){
System.out.println("task is not end all.");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("total: " + Entrance.getSumTotal());
}
}
// ...
// Gate#2,nums:2205 is closed, total:6517
// Gate#1,nums:2083 total:6517
// Gate#3,nums:2229 is closed, total:6517
// Gate#1,nums:2083 is closed, total:6517
// total: 6517
在这个示例中,存在两个共享数据:Entrance.canceled
和Entrance.COUNTER
。前者是boolean
类型,对其赋值和读取操作是原子性的,而后者的相关操作都要经过同步方法increase
和getTotal
完成,因此也是线程安全的。
此外,为了检测Entrance.COUNTER
中的合计是否与当前所有Entrance
实例中的total
总和相等,使用一个额外的Entrance.entrances
来持有所有Entrance
实例的引用,并在必要时候进行遍历汇总结果以进行比较。
需要注意的是,往
Entrance.entrances
添加数据时,是通过Entrance
的构造器完成的,此时是处于主线程(只有run
中的代码才是子线程执行的部分)。而调用Entrance.getSumTotal
方法遍历Entrance.entrances
以获取合计结果同样是发生在主线程。换言之,Entrance.entrances
并不是一个共享数据,所以这里并没有使用Collections.synchronizedList
方法将其转换为线程安全的容器。
主线程最后调用Entrance.cancel
和ExecutorService.shutdown
方法终止子线程后,调用ExecutorService.awaitTermination
方法的作用在于等待若干时间后检查ExecutorService
管理的线程是否全部终止(返回值true
表示肯定)。
这里通过加载Entrance
任务启动的子线程,通过检查Entrance.canceled
来判断是否应当退出。虽然在这个示例中是可行的,但这并非是一般性的停止多线程的方式,下面我们会看到更一般性的方法。
在阻塞时终结
一个线程,可以处于以下四种线程状态之一:
-
新建(new):线程创建后,会短暂处于这种状态。
-
就绪(runnable):在这种状态下,处理器可以将时间片分配给线程以执行。
-
阻塞(blocked):线程可以运行,但因为某种条件不满足无法继续运行。此时处理器将不给该线程分配时间片,直到该线程条件满足,重新处于就绪状态。
-
死亡(dead):线程结束,不再可调度,也不会被处理器分配时间片。
一个线程进入阻塞状态的原因,有以下几种:
-
调用
sleep()
进入休眠。 -
调用
wait()
使线程挂起,直到相应的notify()
或notifyAll
被调用。 -
等待输入/输出(IO)完成。
-
视图在某个对象上调用同步方法,但对象锁不可用。
中断
在上篇有提到过可以通过Thread.interrupt
方法让线程产生一个中断(interrupt),以关闭相应的线程。
实际上ExecutorService.shutdownNow
方法也是通过这种方式关闭所有线程的,它会向所管理的全部线程都调用对应的Thread.interrupt
方法发送中断。你甚至可以通过获取线程的Future
对象,并通过Future.cancel
方法向单个线程发送中断。
需要注意的是,并非所有阻塞的线程都可以“响应”中断并退出:
package ch22.interrupt;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
class SleepInterrupt implements Runnable {
public void run() {
try {
System.out.println("sleeped and block...");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
System.out.println("interrupt is happen.");
}
}
}
class IOInterrupt implements Runnable{
private InputStream input;
public IOInterrupt(InputStream input){
this.input = input;
}
public void run() {
System.out.println("wait for io read and block...");
try {
input.read();
} catch (IOException e) {
if(Thread.interrupted()){
System.out.println("interrupt is happen.");
}
e.printStackTrace();
}
}
}
class SyncMethodInterrupt implements Runnable{
public SyncMethodInterrupt(){
new Thread(){
public void run() {
synchronized(SyncMethodInterrupt.class){
while(true){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}.start();
}
public void run() {
System.out.println("require sync lock and block...");
synchronized(SyncMethodInterrupt.class){
Thread.yield();
}
if(Thread.interrupted()){
System.out.println("interrupt is happen.");
}
}
}
public class Main {
public static void main(String[] args) {
testInterruptTask(new SleepInterrupt());
testInterruptTask(new IOInterrupt(System.in));
testInterruptTask(new SyncMethodInterrupt());
}
private static void testInterruptTask(Runnable task) {
Thread thread = new Thread(task);
thread.start();
try {
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// sleeped and block...
// interrupt is happen.
// wait for io read and block...
// require sync lock and block...
上面的示例分别测试了sleep
调用引发的阻塞、等待IO引发的阻塞、等待同步锁引发的阻塞这三种情况,只有sleep
调用引起的阻塞是可以通过Thread.interrupt
调用发送中断结束的。其它两种情况不会响应中断,自然也不会结束。
对于因为等待IO发生的阻塞,实际上可以通过关闭相应的阻塞来引发异常,继而让线程退出:
package ch22.io_block;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
ServerSocket ss = new ServerSocket(8081);
InputStream in = new Socket("localhost", 8081).getInputStream();
ExecutorService es = Executors.newCachedThreadPool();
es.execute(new IOInterrupt(System.in));
es.execute(new IOInterrupt(in));
TimeUnit.SECONDS.sleep(1);
es.shutdown();
System.out.println("close socket.");
in.close();
TimeUnit.SECONDS.sleep(1);
System.out.println("close system.in");
System.in.close();
}
}
// wait for io read and block...
// wait for io read and block...
// close socket.
// java.net.SocketException: Socket closed
// IOInterrupt.run is exit.
// close system.in
这里使用两种输入流来创建IOInterrupt
实例,并分别用线程运行。主线程稍后分别通过关闭输入流的方式来让线程退出。
奇怪的是这里关闭标准输入(
System.in.close()
)并不能让相应的子线程捕获异常并退出,这点和《Thinking in Java》中的示例结果是完全不同的,或许和javaSE 8对IO相应的改动有关。
不过我们在中介绍过的新IO(nio)可以很好的响应中断:
package ch22.nio_block;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
class NioBlock implements Runnable {
private SocketChannel sc;
public NioBlock(SocketChannel sc) {
this.sc = sc;
}
public void run() {
try {
System.out.println("use nio wait socket and block...");
sc.read(ByteBuffer.allocate(1));
} catch (IOException e) {
if (Thread.interrupted()) {
System.out.println("interrupt is happen.");
}
System.out.println(e);
}
System.out.println(this.getClass().getSimpleName()+".run() is exit.");
}
}
public class Main {
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocket ss = new ServerSocket(8080);
InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
SocketChannel sc1 = SocketChannel.open(isa);
SocketChannel sc2 = SocketChannel.open(isa);
ExecutorService es = Executors.newCachedThreadPool();
Future<?> result = es.submit(new NioBlock(sc1));
es.execute(new NioBlock(sc2));
TimeUnit.SECONDS.sleep(1);
es.shutdown();
result.cancel(true);
TimeUnit.SECONDS.sleep(1);
sc2.close();
}
}
// use nio wait socket and block...
// use nio wait socket and block...
// interrupt is happen.
// java.nio.channels.ClosedByInterruptException
// NioBlock.run() is exit.
// java.nio.channels.AsynchronousCloseException
// NioBlock.run() is exit.
可以看到,这个示例中有两个线程都在使用nio
来等待资源并阻塞,在主线程中,分别使用Future.cancel
以发送中断的方式以及SocketChannel.close
关闭资源的方式让子线程退出。从输出可以很清楚地看到,前者有中断标识,Thread.interrupted
返回true
,并且异常类型是ClosedByInterruptException
。而后者不会产生中断标识,其异常类型是AsynchronousCloseException
。
前边我们看到了使用同步块的方式加锁引发的阻塞不会响应中断。而JavaSE 5引入的“再入锁”ReentrantLock
其功能和特点很像同步块(同一个线程可以多次获得锁),默认情况下调用ReentrantLock.lock
引发的阻塞也同样不会响应中断。但比较特别的是,还有一个ReentrantLock.lockInterruptibly
方法,同样可以尝试获取锁,并且该方法调用引起的阻塞是可以响应中断的:
package ch22.block;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
class ReentrantLockBlock implements Runnable {
ReentrantLock lock = new ReentrantLock();
public ReentrantLockBlock() {
lock.lock();
}
public void run() {
try {
System.out.println("wait lock and blocked.");
lock.lockInterruptibly();
} catch (InterruptedException e) {
if (Thread.interrupted()) {
System.out.println("interrupt is happen.");
}
System.out.println(e);
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new ReentrantLockBlock());
t.start();
TimeUnit.SECONDS.sleep(1);
t.interrupt();
}
}
// wait lock and blocked.
// java.lang.InterruptedException
在ReentrantLockBlock
的构造器中,通过lock.lock
在主线程中对lock
加锁,并且没有释放。在子线程中,通过lock.lockInterruptibly
获取锁并产生阻塞。
在主线程发送中断后,可以看到子线程捕获了InterruptedException
异常并退出。
这里并没有输出
interrupt is happen.
,是因为子线程抛出InterruptedException
异常后,相应的异常标识已经重置,所以Thread.interrupted
返回的是false
。
检查中断
你可能有点迷糊,子线程在什么时候用InterruptedException
捕获中断,而什么时候用Thread.interrupted
检查中断。
实际上,在其它线程给当前线程发送中断后,当前线程的中断标识会被设置。此时如果当前线程处于sleep
调用等可中断阻塞情况,就会产生一个InterruptedException
异常,子线程就可以捕获这个异常并退出。如果子线程并非处于阻塞状态,就会正常执行代码,除非遇到Thread.interrupted
语句检查中断情况并退出。
最常见的情况是子线程通过循环来执行某些任务,此时将循环条件设置为!Thread.interrupted()
是一个习惯用法:
package ch22.check;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class OddWriter implements Runnable {
private int num = 1;
public void run() {
try {
while (!Thread.interrupted()) {
System.out.print(num + " ");
num += 2;
TimeUnit.MILLISECONDS.sleep(500);
}
} catch (InterruptedException e) {
;
}
System.out.println();
System.out.println("OddWriter is exit.");
}
}
class EvenWriter implements Runnable {
private int num = 0;
public void run() {
try {
while (!Thread.interrupted()) {
System.out.print(num + " ");
num += 2;
TimeUnit.MILLISECONDS.sleep(500);
}
} catch (InterruptedException e) {
;
}
System.out.println();
System.out.println("EvenWriter is exit.");
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(new EvenWriter());
TimeUnit.MILLISECONDS.sleep(100);
es.execute(new OddWriter());
TimeUnit.SECONDS.sleep(3);
es.shutdownNow();
}
}
// 0 1 2 3 4 5 6 7 8 9 10 11 12
// OddWriter is exit.
// EvenWriter is exit.
这个示例中两个任务分别用于输出奇数和偶数,并在输出一个数字后用sleep
休眠一会。
为了能确保子线程无论是在休眠时还是在程序正常执行时都能以中断的方式退出,这里在run
中使用try...catch
来捕获可能出现的InterruptedException
并用!Thread.interrupted
作为循环条件。
实际上在这个示例中,仅有
sleep
会产生InterruptedException
异常,并需要用try...catch
捕获,但对整个run
方法内的代码使用try...catch
是一种更方便的做法,可以保证即使后期加入其它可能引发InterruptedException
异常的阻塞调用语句时,程序依然可以正常运行。
线程协作
在多线程编程领域,通常会有需要多个线程协同完成某个任务这类问题,比如上边的示例,一个线程负责产生奇数,一个线程负责产生偶数。上边的示例其实是有问题的,那个示例并不能确保这种数据产生顺序是交叉进行的,这里就需要通过某种方式让线程“知道”自己应该在什么时候工作,什么时候休眠。
我们可以通过wait()
和notifyAll()
来实现这种需要:
package ch22.cooperation;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import ch22.cooperation.Need.Type;
class Need {
enum Type {
EVEN, ODD
};
public Type nextType = Type.EVEN;
}
class OddWriter implements Runnable {
private Need need;
private int num = 1;
public OddWriter(Need need) {
this.need = need;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (need) {
while (need.nextType != Need.Type.ODD) {
need.wait();
}
System.out.print(num + " ");
num += 2;
need.nextType = Need.Type.EVEN;
TimeUnit.MILLISECONDS.sleep(100);
need.notifyAll();
}
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
class EvenWriter implements Runnable {
private Need need;
private int num = 0;
public EvenWriter(Need need) {
this.need = need;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (need) {
while (need.nextType != Type.EVEN) {
need.wait();
}
System.out.print(num + " ");
num += 2;
need.nextType = Type.ODD;
TimeUnit.MILLISECONDS.sleep(100);
need.notifyAll();
}
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
Need need = new Need();
ExecutorService es = Executors.newCachedThreadPool();
es.execute(new EvenWriter(need));
es.execute(new OddWriter(need));
TimeUnit.SECONDS.sleep(3);
es.shutdownNow();
}
}
// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
这里依然是两个任务分别产生奇数和偶数,不同的是,它们都需要通过同步块对Need
实例加锁,并通过Need.nextType
获取下一个应当产生的数据的类型。
在负责产生奇数的OddWriter
任务中,如果成功获取了Need
实例的锁,就通过need.nextType != Need.Type.ODD
条件检查当前是否应该输出奇数,如果不是,就使用wait()
方法挂起。此时所在的线程会阻塞,并且释放Need
实例的锁(这点很重要)。此时另一个线程中的EvenWriter.run
方法会通过同步块获取到Need
实例的锁,并检查need.nextType != Type.EVEN
是否成立(当前这种情况下肯定是成立的),如果成立,就输出偶数并自增,然后将need.nextType
修改为Type.ODD
,并通过need.notifyAll
调用"唤醒"所有因为need.wait
挂起的线程。这些线程会尝试重新获取Need
实例的锁,如果成功,就会重新检查条件并执行。
就像示例中展示的,这些协作任务通常会在条件不满足时挂起,在其它任务改变条件,并通过notify
或notifyAll
唤醒这些挂起的任务后,重新检查条件,如果满足就可以继续执行(如果不满足就继续挂起)。
需要注意的是:
-
wait
调用必须是在一个同步块或者一个同步方法内,换言之必须获取一个对象锁,并且之后的wait
调用主体必须是这个对象锁对应的对象。这是因为调用wait
后对应的对象锁就会释放,以便其他等待同一个对象锁的线程能够获取锁,并执行代码以改变条件。所以这种“获取-挂起”和“获取-唤醒”操作使用的对象和对象锁必须是基于同一个对象的。 -
notify
只会唤醒一个等待对象锁的线程,而notifyAll
可以唤醒所有等待该对象锁的线程。在这个示例中因为一个线程工作时,仅有另一个线程等待Need
实例的锁,因此使用notify
或notifyAll
都是可行的。但因为通常程序中都存在若干个挂起线程,所以使用notifyAll
更加安全和常见。 -
wait
调用通常都会用while
包裹,因为其他线程通过notify
唤醒当前线程,并不意味着当前线程的工作条件一定已经达成。这很容易理解,因为可能不止两种类型,而是更多类型的线程在协作,此时这种唤醒可能是任何一种条件改变后发生的,很可能并不是当前线程需要收到的信号,但当前线程同样会被唤醒。因此必须通过循环不断检查唤醒后的条件是否满足,只有满足后才继续执行下边的操作。
生产者与消费者
再看一个生产者与消费者的例子:一个餐厅中有一个大厨和一个服务员,大厨在吧台为空的情况下生产食物并放在吧台上,服务员在吧台有食物的情况下带走食物,送给顾客。
下面是完整示例:
package ch22.restaurant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import util.Fmt;
class Waiter implements Runnable {
private Restaurant restaurant;
public Waiter(Restaurant restaurant) {
this.restaurant = restaurant;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (restaurant) {
while (restaurant.bar == null) {
restaurant.wait();
}
Meal meal = restaurant.bar;
restaurant.bar = null;
System.out.println(meal + " is take by Waiter.");
restaurant.notifyAll();
if (meal.id > 5) {
break;
}
}
}
} catch (InterruptedException e) {
;
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int mealId = 1;
public Chef(Restaurant restaurant) {
this.restaurant = restaurant;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (restaurant) {
while (restaurant.bar != null) {
restaurant.wait();
}
Meal meal = new Meal(mealId);
mealId++;
restaurant.bar = meal;
System.out.println(meal + " is build by Chef.");
restaurant.notifyAll();
if (meal.id > 5) {
break;
}
}
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
class Restaurant {
public Meal bar;
}
class Meal {
public int id;
public Meal(int id) {
this.id = id;
}
public String toString() {
return Fmt.sprintf("Meal#%d", id);
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Restaurant restaurant = new Restaurant();
es.execute(new Waiter(restaurant));
es.execute(new Chef(restaurant));
es.shutdown();
}
}
// Meal#1 is build by Chef.
// Meal#1 is take by Waiter.
// Meal#2 is build by Chef.
// Meal#2 is take by Waiter.
// Meal#3 is build by Chef.
// Meal#3 is take by Waiter.
// Meal#4 is build by Chef.
// Meal#4 is take by Waiter.
// Meal#5 is build by Chef.
// Meal#5 is take by Waiter.
// Meal#6 is build by Chef.
// Meal#6 is take by Waiter.
其实本质上与上一个示例是相同的。
不过显而易见的是,通过wait
和notify
来协同线程,很难理解且容易出错,如果你用过Go,就会明白我说的是什么意思。实际上Java中也可以用同步队列完成类似的工作。
Condition
之前说过,同步块在很多方面和“再入锁”ReentrantLock
很相似,事实上它们也经常可以互相替换。实际上使用“再入锁”同样可以编写类似的线程协同代码:
package ch22.restaurant2;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import util.Fmt;
class Waiter implements Runnable {
private Restaurant restaurant;
public Waiter(Restaurant restaurant) {
this.restaurant = restaurant;
}
public void run() {
try {
while (!Thread.interrupted()) {
restaurant.lock.lock();
try {
while (restaurant.bar == null) {
restaurant.condition.await();
}
Meal meal = restaurant.bar;
restaurant.bar = null;
System.out.println(meal + " is take by Waiter.");
restaurant.condition.signalAll();
if (meal.id > 5) {
break;
}
} finally {
restaurant.lock.unlock();
}
}
} catch (InterruptedException e) {
;
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int mealId = 1;
public Chef(Restaurant restaurant) {
this.restaurant = restaurant;
}
public void run() {
try {
while (!Thread.interrupted()) {
restaurant.lock.lock();
try {
while (restaurant.bar != null) {
restaurant.condition.await();
}
Meal meal = new Meal(mealId);
mealId++;
restaurant.bar = meal;
System.out.println(meal + " is build by Chef.");
restaurant.condition.signalAll();
if (meal.id > 5) {
break;
}
} finally {
restaurant.lock.unlock();
}
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
class Restaurant {
public Meal bar;
public ReentrantLock lock = new ReentrantLock();
public Condition condition = lock.newCondition();
}
class Meal {
public int id;
public Meal(int id) {
this.id = id;
}
public String toString() {
return Fmt.sprintf("Meal#%d", id);
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Restaurant restaurant = new Restaurant();
es.execute(new Waiter(restaurant));
es.execute(new Chef(restaurant));
es.shutdown();
}
}
// Meal#1 is build by Chef.
// Meal#1 is take by Waiter.
// Meal#2 is build by Chef.
// Meal#2 is take by Waiter.
// Meal#3 is build by Chef.
// Meal#3 is take by Waiter.
// Meal#4 is build by Chef.
// Meal#4 is take by Waiter.
// Meal#5 is build by Chef.
// Meal#5 is take by Waiter.
// Meal#6 is build by Chef.
// Meal#6 is take by Waiter.
与之前示例不同的是,这里没有用同步块给Restaurant
实例加锁,而是显式获取了Restaurant.lock
锁。并且该锁通过lock.newCondition
方法关联了一个Condition
实例。我们通过Condition.await
可以挂起当前线程并释放ReentrantLock
锁,通过Condition.signalAll
可以唤醒对应的Condition
对象挂起的线程,并尝试获取相应的ReentrantLock
锁。
整体上两者代码结构是类似的,ReentrantLock
+Condition
的组合更复杂一些。这种方式一般只会在更复杂的多线程协同编程中使用。
阻塞队列
如果用Go,通常会使用队列来解决类似的线程协同问题,同样的,Java也支持线程安全的队列(阻塞队列)。
在Java中,同步队列被抽象成java.util.concurrent.BlockingQueue
接口。
BlockingQueue
主要有三种实现:
-
ArrayBlockingQueue
,固定容量的队列。 -
LinkedBlockingDeque
,无限容量的队列。 -
SynchronousQueue
,容量为0的队列,也称作同步队列。
同步队列的容量为0,写入数据后会立即阻塞,需要其它线程读取后才能继续执行。
实际上这里的同步队列其行为和Go语言中的无缓冲通道一致,相关内容可以阅读。
可以很容易地用阻塞队列来修改之前的示例:
package ch22.restaurant3;
...
class Waiter implements Runnable {
...
public void run() {
try {
while (!Thread.interrupted()) {
Meal meal = restaurant.bar.take();
System.out.println(meal + " is take by Waiter.");
if (meal.id > 5) {
break;
}
}
} catch (InterruptedException e) {
;
}
}
}
class Chef implements Runnable {
...
public void run() {
try {
while (!Thread.interrupted()) {
Meal meal = new Meal(mealId);
System.out.println(meal + " is build by Chef.");
restaurant.bar.put(meal);
mealId++;
if (meal.id > 5) {
break;
}
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
class Restaurant {
public LinkedBlockingQueue<Meal> bar = new LinkedBlockingQueue<>();
}
...
这里只显示了差异的部分。
可以看到代码简洁了很多,大厨和服务员不需要考虑同步的问题,一个只管生产食物并放进队列,另一个只管从队列中获取食物。
使用队列的优点并不仅仅是代码简洁,来看另一个更典型的问题:
package ch22.toast;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import util.Fmt;
class Cooker1 implements Runnable {
private BlockingQueue<Toast> queue;
private int delay;
public Cooker1(BlockingQueue<Toast> queue, int delay) {
this.queue = queue;
this.delay = delay;
}
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.SECONDS.sleep(delay);
Toast t = new Toast();
System.out.println(t + " is cooked.");
queue.put(t);
}
} catch (InterruptedException e) {
;
}
}
}
class Cooker2 implements Runnable {
private int delay;
private BlockingQueue<Toast> dry;
private BlockingQueue<Toast> buttered;
public Cooker2(int delay, BlockingQueue<Toast> dry, BlockingQueue<Toast> buttered) {
this.delay = delay;
this.dry = dry;
this.buttered = buttered;
}
public void run() {
try {
while (!Thread.interrupted()) {
Toast toast = dry.take();
TimeUnit.SECONDS.sleep(delay);
toast.hasButter = true;
System.out.println(toast + " is buttered.");
buttered.put(toast);
}
} catch (InterruptedException e) {
;
}
}
}
class Cooker3 implements Runnable {
private int delay;
private BlockingQueue<Toast> buttered;
public Cooker3(int delay, BlockingQueue<Toast> buttered) {
this.delay = delay;
this.buttered = buttered;
}
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = buttered.take();
TimeUnit.SECONDS.sleep(delay);
t.hasJam = true;
System.out.println(t + " is jamed.");
}
} catch (InterruptedException e) {
;
}
}
}
class Toast {
private static int counter = 1;
private final int id = counter++;
boolean hasButter = false;
boolean hasJam = false;
public int getId() {
return id;
}
public String toString() {
return Fmt.sprintf("Toast#%d(hasButter:%s,hasJam:%s)", id, hasButter, hasJam);
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
BlockingQueue<Toast> dry = new ArrayBlockingQueue<>(3);
BlockingQueue<Toast> buttered = new ArrayBlockingQueue<>(3);
BlockingQueue<Toast> jamed = new ArrayBlockingQueue<>(3);
es.execute(new Cooker1(dry, 1));
es.execute(new Cooker2(1, dry, buttered));
es.execute(new Cooker3(1, buttered));
TimeUnit.SECONDS.sleep(5);
es.shutdownNow();
}
}
// Toast#1(hasButter:false,hasJam:false) is cooked.
// Toast#2(hasButter:false,hasJam:false) is cooked.
// Toast#1(hasButter:true,hasJam:false) is buttered.
// Toast#2(hasButter:true,hasJam:false) is buttered.
// Toast#1(hasButter:true,hasJam:true) is jamed.
// Toast#3(hasButter:false,hasJam:false) is cooked.
// Toast#2(hasButter:true,hasJam:true) is jamed.
// Toast#3(hasButter:true,hasJam:false) is buttered.
// Toast#4(hasButter:false,hasJam:false) is cooked.
可以将上面的示例想象成一个面包店,1号厨师(Cook1
)负责生产吐司面包(Toast
),二号厨师负责给面包涂抹黄油,三号厨师负责给面包涂抹果酱。每一片吐司面包都要经过这三道工序才能最终完成。
三个厨师由三个线程运行,所以它们是可以并行工作的。面包由两个阻塞队列在三个线程之间传递,并依次由相应的线程“加工”。因为同一时间同一个面包只会由一个线程拥有,所以这里并不存在共享数据的问题,我们也不需要对面包对象进行额外的同步处理。
这个解决方案最大的优点在于可以根据情况给这个“生产线”添加或削减某个工序的生产者。比如假设1号厨师和3号厨师干活很快,而2号厨师比较慢,显而易见地是面包会堆积在2号厨师这里,整个系统都会被拖慢。此时你只需要添加一个额外的线程执行Cook2
任务即可。
管道
在介绍IO时,实际上有一些组件没有介绍,比如和。
可以将PipedWriter
和PipedReader
看作IO版本的阻塞队列,同样可以用它们实现某些线程协作。
比如:
package ch22.pipe;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class CharacterWriter implements Runnable {
private PipedWriter pipedWriter;
public CharacterWriter(PipedWriter pipedWriter) {
this.pipedWriter = pipedWriter;
}
public void run() {
char character = 'a';
while (!Thread.interrupted()) {
try {
pipedWriter.append(character);
} catch (IOException e) {
System.out.println(e);
break;
}
if (character == 'z') {
break;
}
character++;
}
}
};
class CharacterReader implements Runnable {
private PipedReader pipedReader;
public CharacterReader(PipedReader pipedReader) {
this.pipedReader = pipedReader;
}
public void run() {
while (!Thread.interrupted()) {
try {
char character = (char) pipedReader.read();
System.out.print(character + " ");
if(character == 'z'){
break;
}
} catch (IOException e) {
System.out.println(e);
break;
}
}
}
};
public class Main {
public static void main(String[] args) throws IOException {
PipedWriter pipedWriter = new PipedWriter();
PipedReader pipedReader = new PipedReader(pipedWriter);
ExecutorService es = Executors.newCachedThreadPool();
es.execute(new CharacterWriter(pipedWriter));
es.execute(new CharacterReader(pipedReader));
es.shutdown();
}
}
// a b c d e f g h i j k l m n o p q r s t u v w x y z
这里有两个线程,一个产生字母并写入PipedWriter
管道,另一个从PipedReader
管道读取。
当然,前提条件是这两个管道需要以某种方式创建联系,比如这里的
new PipedReader(pipedWriter)
。
管道更多的是在JavaSE 5提供阻塞队列之前的一种选择,所以在JavaSE 5之后,它更多的是被阻塞队列所取代。
死锁
关于死锁,《Thinking in Java》介绍了一个很经典的哲学家问题:
package ch22.philosopher;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import util.Fmt;
class Philosopher implements Runnable {
private static int counter = 1;
private final int id = counter++;
private Chopstick left;
private Chopstick right;
private static Random random = new Random();
private int delay;
public Philosopher(Chopstick left, Chopstick right, int delay) {
this.left = left;
this.right = right;
this.delay = delay;
}
public void run() {
pause();
left.take();
System.out.println(this + " take left chopstick.");
right.take();
System.out.println(this + " take right chopstick.");
pause();
left.drop();
System.out.println(this + " drop left chopstick.");
right.drop();
System.out.println(this + " drop right chopstick.");
}
private void pause() {
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(250 * delay + 1));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String toString() {
return Fmt.sprintf("Philosohper#%d", id);
}
}
class Chopstick {
private boolean isUsed = false;
public synchronized void take() {
while (isUsed) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isUsed = true;
}
public synchronized void drop() {
isUsed = false;
notifyAll();
}
}
public class Main {
public static void main(String[] args) {
test(5, 0);
}
public static void test(int num, int delay) {
Chopstick[] chopsticks = new Chopstick[num];
for (int i = 0; i < chopsticks.length; i++) {
chopsticks[i] = new Chopstick();
}
Philosopher[] philosophers = new Philosopher[num];
for (int i = 0; i < philosophers.length; i++) {
if (i == num - 1) {
philosophers[i] = new Philosopher(chopsticks[i], chopsticks[0], delay);
} else {
philosophers[i] = new Philosopher(chopsticks[i], chopsticks[i + 1], delay);
}
}
ExecutorService es = Executors.newCachedThreadPool();
for (Philosopher philosopher : philosophers) {
es.execute(philosopher);
}
es.shutdown();
}
}
// Philosohper#2 take left chopstick.
// Philosohper#3 take left chopstick.
// Philosohper#5 take left chopstick.
// Philosohper#1 take left chopstick.
// Philosohper#4 take left chopstick.
这个问题中,有若干个哲学家(Philosopher
)围坐在一起吃饭,每两个哲学家中间有一只筷子(Chopstick
),每个哲学家都会尝试先拿起左边的筷子,再拿起右边的筷子吃饭。并且在吃饭前和吃饭中随机休眠一段时间,表示思考或吃饭。休眠时间由我们通过一个参数delay
进行指定。
在这里我使用5个哲学家以及delay=0
的方式运行程序,可以发现程序立即“死锁”。每个哲学家都拿起了左边的筷子,视图拿起右边的筷子时发现没有筷子可以使用,整个程序没法继续运行下去。
在这个示例中,如果要让死锁尽快发生,可以减少哲学家的数目和delay
的值。如果要避免死锁发生,可以做相反的操作。
当且仅当程序满足以下情况就可能发生死锁:
-
互斥条件:任务使用的资源至少有一个是不能共享(独占)的。
-
至少有一个任务持有一个资源,并且正试图获取另一个任务持有的资源。
-
资源不能被抢占,只能等待持有的任务主动释放。
-
存在循环等待:即一个任务等待另一个任务释放资源,另一个任务等待另另一个任务释放资源...直到某个任务等待第一个任务释放资源。
要产生死锁,以上情况都要满足。如果要防止死锁,解决任意一个情况都可以。
在这个例子中,要解决死锁,最简单的方式是打破循环等待:
package ch22.philosopher2;
...
public class Main {
public static void main(String[] args) {
test(5, 0);
}
public static void test(int num, int delay) {
...
Philosopher[] philosophers = new Philosopher[num];
for (int i = 0; i < philosophers.length; i++) {
if (i == num - 1) {
philosophers[i] = new Philosopher(chopsticks[0], chopsticks[i], delay);
} else {
philosophers[i] = new Philosopher(chopsticks[i], chopsticks[i + 1], delay);
}
}
...
}
}
这里仅修改了一处代码,即将最后一个哲学家的左手筷子和右手筷子调换位置,这可以理解为最后一个哲学家拿起筷子的顺序改变,不再是先左后右,而是先右后左。这样以后就不会出现死锁。
假设最后一个哲学家试图拿起右边的筷子,如果被占用,那么第一个哲学家必然已经拿起左边的筷子,这样自然不会产生死锁。假设最后一个哲学家成功拿起右边的筷子,那么第一个哲学家就没法拿起左边的筷子,自然第二个哲学家就可以拿起左边的筷子完成用餐。这同样不会发生死锁。
死锁的难点在于难以发现,有时候死锁是隐蔽且难以发现和复现的,这就很难排查和处理。
今天对并发的讨论就到这里了,谢谢阅读。
文章评论