图源:
仿真
利用多线程可以编写仿真程序,实际上在中介绍的“赛马游戏”就是一个仿真程序。
这里我们介绍一个更复杂一点的仿真程序。
餐厅
假设我们有一个餐厅,有顾客、侍者、厨师这几个角色,顾客可以通过侍者来点菜,厨师根据订单来做菜,做好菜后由侍者将菜品送给顾客品尝。
package ch24.restaurant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import ch24.restaurant.Meal.Type;
import util.Enums;
import util.Fmt;
class Meal {
public static enum Type {
HAMBURG, CHIPS, FRIED_CHIKEN
}
private static int counter = 0;
private final int id = ++counter;
private Type type;
public Meal(Type type) {
this.type = type;
}
public String toString() {
return Fmt.sprintf("Meal#%d(%s)", id, type);
}
}
class Order {
private Customer customer;
private Waiter waiter;
private Meal.Type mealType;
private Meal meal;
public Meal getMeal() {
return meal;
}
public void setMeal(Meal meal) {
this.meal = meal;
}
public Customer getConsumer() {
return customer;
}
public void setConsumer(Customer customer) {
this.customer = customer;
}
public Waiter getWaiter() {
return waiter;
}
public void setWaiter(Waiter waiter) {
this.waiter = waiter;
}
public Meal.Type getMealType() {
return mealType;
}
public void setMealType(Meal.Type mealType) {
this.mealType = mealType;
}
public Order(Customer customer, Waiter waiter, Type mealType) {
this.customer = customer;
this.waiter = waiter;
this.mealType = mealType;
}
public String toString() {
return Fmt.sprintf("Order(%s) created by %s to %s", mealType, customer, waiter);
}
}
class Restaurant implements Runnable {
private BlockingQueue<Order> orders = new ArrayBlockingQueue<>(20);
private List<Chef> chefs = new ArrayList<>();
private List<Waiter> waiters = new ArrayList<>();
private List<Customer> customers = new ArrayList<>();
private static Random rand = new Random();
private ExecutorService es;
public Restaurant(int chefNum, int waiterNum, ExecutorService es) {
this.es = es;
for (int i = 0; i < chefNum; i++) {
this.chefs.add(new Chef(this));
}
for (int i = 0; i < waiterNum; i++) {
this.waiters.add(new Waiter());
}
}
public Order getOrder() throws InterruptedException {
return orders.take();
}
public void addOrder(Order order) throws InterruptedException {
orders.put(order);
}
public void run() {
for (Chef chef : chefs) {
es.execute(chef);
}
for (Waiter waiter : waiters) {
es.execute(waiter);
}
int counter = 0;
while (!Thread.interrupted()) {
Customer customer = new Customer();
this.customers.add(customer);
int waiterIndex = rand.nextInt(waiters.size());
Waiter waiter = waiters.get(waiterIndex);
Type mealType = Enums.random(Meal.Type.class);
Order newOrder = new Order(customer, waiter, mealType);
System.out.println(newOrder);
try {
this.addOrder(newOrder);
} catch (InterruptedException e) {
e.printStackTrace();
}
//顾客准备就餐
es.execute(customer);
counter++;
if (counter > 5) {
break;
}
}
}
}
class Chef implements Runnable {
private static int counter = 0;
private final int id = ++counter;
private Restaurant restaurant;
private static Random rand = new Random();
public Chef(Restaurant restaurant) {
this.restaurant = restaurant;
}
public void run() {
try {
while (!Thread.interrupted()) {
Order order = restaurant.getOrder();
// cook meal
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500) + 500);
Meal meal = new Meal(order.getMealType());
System.out.println(meal + " is cooked by" + this);
order.setMeal(meal);
order.getWaiter().addCookedOrder(order);
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
public String toString() {
return Fmt.sprintf("Chef#%d", id);
}
}
class Waiter implements Runnable {
private static int counter = 0;
private final int id = ++counter;
private BlockingQueue<Order> cooked = new LinkedBlockingQueue<>();
public void addCookedOrder(Order order) throws InterruptedException {
cooked.put(order);
}
public void run() {
try {
while (!Thread.interrupted()) {
// 有做好的菜就传给顾客
Order order = cooked.take();
System.out.println(
this + " get cooked meal " + order.getMeal() + " and serve meal to " + order.getConsumer());
order.getConsumer().serveMeal(order.getMeal());
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
public String toString() {
return Fmt.sprintf("Waiter#%d", id);
}
}
class Customer implements Runnable {
private static int counter = 0;
private int id = ++counter;
private SynchronousQueue<Meal> desk = new SynchronousQueue<>(); // 顾客的桌子只能放一道菜
private static Random rand = new Random();
public void serveMeal(Meal meal) throws InterruptedException {
desk.put(meal);
}
public void run() {
try {
while (!Thread.interrupted()) {
Meal meal = desk.take();
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500) + 200);
System.out.println(meal + " is eated by " + this);
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
public String toString() {
return Fmt.sprintf("Customer#%d", id);
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(new Restaurant(1, 2, es));
es.awaitTermination(3, TimeUnit.SECONDS);
es.shutdownNow();
}
}
这个示例中的关键是订单(Order
),餐厅线程会产生顾客并生成订单,这个订单会加入到餐厅的一个阻塞队列中,厨师会从这个队列中获取订单并处理,然后将处理好的订单(包含食物)传递给侍者的一个阻塞队列,侍者从这个阻塞队列中获取食物并给对应的顾客上菜。
因为从头到尾订单都是通过队列传递的,所以不需要对订单进行同步。
这个示例中有这么几个地方可以优化:
-
给顾客上菜和顾客获取烹饪好的菜都是通过一个
SynchronousQueue
进行的,这实际上是一个“无缓冲队列”,也就是说侍者给顾客上菜后只能等待,只有顾客取走菜后侍者才能去给下一个顾客上菜。 -
在这个示例中,厨师会将做好的菜转交给固定侍者的队列,这样做好处是从头到尾都是一个指定侍者服务指定顾客,但缺点是不能根据侍者的忙碌情况来动态分配侍者上菜。
有兴趣的可以自行改进。
性能调优
这部分,原书《Thinking in Java》使用了相当篇幅的测试代码对比了多种同步方式的性能差异,基于篇幅和时间的关系,这里不做重现和探讨,这里只讨论这些多线程性能改善方案的原理与使用方式。
同步方法 or 锁
从理论上讲,同步方法的性能是要差于锁的,但实际上编译器和虚拟机都会对并发代码进行一定程度的优化,所以只有在编译器无法优化,且互斥情况频繁发生的情况下,前者才会显著差于后者。但实际上往往互斥的性能损失相比被互斥保护的部分的代码执行时间要小很多,所以这部分性能差异是否真的是程序的瓶颈依然值得仔细斟酌。
此外,最最重要的是,同步方法的代码可读性和易用性上都要优于锁,而且不应当过早且激进地对并发程序进行优化,而是应当先使用最简单和显而易见的解决方案来编程,只有在性能显著地成为程序瓶颈时再进行并发优化。
免锁容器
实际上之前示例中出现过的CopyOnWriteArrayList
就是一个“免锁容器”。
我们知道,普通的List
在使用迭代器进行遍历时,如果通过List.remove
等方法对元素进行删除,此时如果有其它线程正在遍历当前List
,就会产生一个ConcurrentModificationException
异常。
package ch24.nolock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
new Thread(){
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
list.remove(5);
};
}.start();
for (Integer integer : list) {
System.out.println(integer);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 0
// 1
// 2
// 3
// 4
// Exception in thread "main" java.util.ConcurrentModificationException
// at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013)
// at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967)
// at ch24.nolock.Main.main(Main.java:23)
当然,本质上这里就应该使用同步版本的
List
。
之所以会这样设计,是因为迭代器本身只是一个依赖于原始List
的“视图”,如果因为其他线程的操作,原始视图已经发生改变,那迭代器就变得相当不可靠,可能会产生一些不可预料的结果,所以只能是以抛出异常的方式结束当前迭代。
一个更加让人吃惊的结果是,即使是单线程,也会抛出这个异常:
package ch24.nolock2;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
for (Integer integer : list) {
System.out.println(integer);
if(integer.equals(3)){
list.remove(integer);
}
}
}
}
// 0
// 1
// 2
// 3
// Exception in thread "main" java.util.ConcurrentModificationException
// at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013)
// at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967)
// at ch24.nolock.Main.main(Main.java:12)
在单线程中,原因是类似的,在迭代器迭代过程中,底层List
发生了改变,但是迭代器无法“知晓”这种变化(即使是在同一个线程中),所以它抛出了这个异常。
在单线程中,可以显式地使用迭代器来规避这个问题:
package ch24.nolock3;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class Main {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
Iterator<Integer> iterator = list.iterator();
while (iterator.hasNext()) {
Integer integer = iterator.next();
System.out.print(integer + " ");
if (integer.equals(3)) {
iterator.remove();
}
}
System.out.println();
System.out.println(list);
}
}
// 0 1 2 3 4 5 6 7 8 9
// [0, 1, 2, 4, 5, 6, 7, 8, 9]
显而易见地是,这样做很麻烦。
对此,CopyOnWriteArrayList
给出的解决思路是,如果有线程要修改CopyOnWriteArrayList
,就复制一个拷贝,实际上线程只会修改这个拷贝,当修改完毕后,再用一个原子操作用修改后的版本更新原始List
。在更新之前,这些修改部分的数据对其它线程的访问者是不可见的。换言之,在任意时刻,CopyOnWriteArrayList
中的数据都是完整和稳定的,不会处于一种“在修改但还没有完成”的状态。这其中只有最后从拷贝更新到原始数据这个操作是需要同步的。
所以CopyOnWriteArrayList
不会有之前发生的问题:
package ch24.nolock4;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class Main {
public static void main(String[] args) {
List<Integer> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
for (Integer integer : list) {
System.out.print(integer+" ");
if(integer.equals(3)){
list.remove(integer);
}
}
System.out.println();
System.out.println(list);
}
}
// 0 1 2 3 4 5 6 7 8 9
// [0, 1, 2, 4, 5, 6, 7, 8, 9]
当然规避ConcurrentModificationException
只是这类免锁容器的一个副作用,他们真正的用途在于读取数据时无需同步。因此如果使用场景是需要一个低频写入,高频读取的List
,那么CopyOnWriteArrayList
的效率可能要比其他的同步List
高很多。
除了CopyOnWriteArrayList
,还有CopyOnWriteArraySet
,实际上它是使用CopyOnWriteArrayList
实现的,所以它们的原理和作用差不多。此外ConcurrentHashMap
和ConcurrentLinkedQueue
使用了类似的技术。
乐观加锁
我们知道,在只需要一个确保同步的基础数据时,我们可以使用“原子类”来代替加锁行为,原子类的相关操作是原子性的,并且效率要比使用互斥锁高很多。
除了这种常见的使用方式外,还可以实现一种“乐观加锁”的操作。
package ch24.happy_lock;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class HappyLock implements Runnable {
private static final int DIMENSION1 = 10;
private static final int DIMENSION2 = 15;
private AtomicInteger[][] data = new AtomicInteger[DIMENSION1][DIMENSION2];
private static Random rand = new Random();
public HappyLock() {
for (int i = 0; i < DIMENSION1; i++) {
for (int j = 0; j < DIMENSION2; j++) {
data[i][j] = new AtomicInteger(rand.nextInt(100));
}
}
}
public void run() {
while (!Thread.interrupted()) {
int indexX = rand.nextInt(DIMENSION1);
for (int j = 0; j < DIMENSION2; j++) {
int oldValue = data[indexX][j].get();
// Thread.yield();
int preIndex = (indexX - 1) < 0 ? DIMENSION1 - 1 : indexX - 1;
int preValue = data[preIndex][j].get();
int afterValue = data[(indexX + 1) % DIMENSION1][j].get();
int newValue = (oldValue + preValue + afterValue) / 3;
boolean result = data[indexX][j].compareAndSet(oldValue, newValue);
if (!result) {
System.out.println("Data changed by others, drop now change operation.");
}
}
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
HappyLock hl = new HappyLock();
for (int i = 0; i < 10; i++) {
es.execute(hl);
}
es.awaitTermination(5, TimeUnit.SECONDS);
es.shutdownNow();
}
}
上面这个示例中,有一个由原子类AtomicInteger
组成的矩阵,子线程会选取一个随机的横坐标x
,然后从上到下更新元素,新值的计算方式是左右的数字和当前数字之和,再取平均值。
显然多个线程同时处理矩阵必然涉及数据共享问题,所以一般情况下至少要获取当前正在处理的元素的锁,以确保读取数值后不会有其它线程来更新当前元素。但这里并没有这样做,这里没有使用任何锁或者同步,只是在读取当前元素后保留了一个原始值oldValue
,并在最终修改值时调用AtomicInteger.compareAndSet
方法,该方法会检查给定的原始值oldValue
和原子类当前值是否一致,如果相同,就说明这期间没有其他线程修改原子类,自然就可以正常更新数据,如果不相同,该方法不会更新数据,并且返回false
。
这种“乐观加锁”的处理方式,优点在于实际上并没有使用锁或者同步,自然就不会有传统多线程因为互斥产生的性能损失,但缺点在于只能处理特定问题,就像示例中那样,必须要承担可能的更新数据失败风险,并在失败后作出相应的处理(在这里是简单丢弃和打印消息)。
读写锁
很多支持并发的编程语言都会在“互斥锁”的基础上提供“读写锁”,顾名思义,读写锁提供更好的读并发的性能优化。
具体来说,读写锁可以提供“写加锁”和“读加锁”,后者只有在前者发生时才会产生阻塞,也就是说对于低频写入,高频读取的资源,使用读写锁可以显著改善并发性能。
关于读写锁的用法,可以看下面这个示例:
package ch24.read_write_lock;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class Student {
}
class ClassRoom {
private List<Student> students = new ArrayList<>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public ClassRoom(int studentNum) {
for (int i = 0; i < studentNum; i++) {
students.add(new Student());
}
}
public int getStudentsNum() {
Lock readLock = lock.readLock();
readLock.lock();
try {
return students.size();
} finally {
readLock.unlock();
}
}
public Student getStudent(int index) {
Lock readLock = lock.readLock();
readLock.lock();
try {
return students.get(index);
} finally {
readLock.unlock();
}
}
public void SetStudent(int index, Student s) {
Lock writeLock = lock.writeLock();
writeLock.lock();
students.set(index, s);
writeLock.unlock();
}
}
class ReadTask implements Runnable {
private ClassRoom cr;
private static Random rand = new Random();
public ReadTask(ClassRoom cr) {
this.cr = cr;
}
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500) + 500);
Student s = cr.getStudent(rand.nextInt(cr.getStudentsNum()));
System.out.println(s + " is geted.");
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
class SetTask implements Runnable {
private ClassRoom cr;
private static Random rand = new Random();
public SetTask(ClassRoom cr) {
this.cr = cr;
}
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500) + 500);
int index = rand.nextInt(cr.getStudentsNum());
cr.SetStudent(index, new Student());
System.out.println("student in " + index + " is seted.");
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
ClassRoom cr = new ClassRoom(10);
for(int i=0;i<10;i++){
es.execute(new ReadTask(cr));
}
es.execute(new SetTask(cr));
es.awaitTermination(3, TimeUnit.SECONDS);
es.shutdownNow();
}
}
活动对象
活动对象指的是这么一类对象:它们可以自己管理线程池,所有对它们的调用都会转化为发送消息。
在Java中,可以利用Future
类实现活动对象:
package ch24.active_obj;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
class ActiveObject {
private ExecutorService es = Executors.newSingleThreadExecutor();
private static Random rand = new Random();
public Future<Integer> add(int x, int y) {
return es.submit(new Callable<Integer>() {
public Integer call() throws Exception {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(100) + 100);
return x + y;
}
});
}
public void shutdown() {
es.shutdown();
}
}
public class Main {
public static void main(String[] args) {
ActiveObject ao = new ActiveObject();
List<Future<Integer>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
results.add(ao.add(i, i));
}
for (Future<Integer> future : results) {
try {
Integer integer = future.get();
System.out.print(integer + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
ao.shutdown();
}
}
// 0 2 4 6 8 10 12 14 16 18
可以看到,对ActiveObject.add
的方法调用实际上变成了ActiveObject
中ExecutorService
线程池中的线程调度。而ExecutorService
线程池本身是一个单线程的线程池(newSingleThreadExecutor
),所以这种方法调用实际上可以看作是通过一个消息队列向ActiveObject
发送消息,ActiveObject
会按照消息的排队情况依次处理。
这么做的好处在于,没有任何显式地同步和加锁,实际的同步动作只有通过ExecutorService.submit
向线程池中添加任务时才会发生。这种同步延迟是相当小的,并不会产生明显的阻塞,因此也不需要处理InterruptException
。当然,如果你需要获取处理结果,可能就需要通过Future
来获取,此时阻塞是无法避免的。
总的来说这是一种相当有趣的设计,通过巧妙地使用线程池和Future
,将一个对特定对象的并发方法调用变成了无阻塞的”消息发送“。这为多线程编程提供了一种额外思路。
我必须承认,这部分内容相当的虎头蛇尾,因为种种原因就这样潦草结束了,剩余的awt
图形部分早已过时,所以《Thing in Java》的笔记部分到此结束,谢谢阅读。
本篇文章的所有源码可以从获取。
文章评论