Брюс Эккель - Философия Java3
Мы создадим пул объектов Fat, чтобы свести к минимуму затраты на выполнение конструктора. Для тестирования класса Pool будет создана задача, которая забирает объекты Fat для использования, удерживает их в течение некоторого времени, а затем возвращает обратно:
// concurrency/SemaphoreDemo java // Тестирование класса Pool import java.util.concurrent.*; import java util *;
import static net.mindview.util.Print.*;
// Задача для получения ресурса из пула: class CheckoutTask<T> implements Runnable { private static int counter = 0; private final int id = counter++; private Pool<T> pool. public CheckoutTask(Pool<T> pool) { this.pool = pool;
}
public void run() { try {
T item = pool.checkoutО;
print(this + "checked out " + item); продолжение &
TimeUnit SECONDS sleep(l), pri nt(thi s +"checking in " + item), pool checkln(item). } catch(InterruptedException e) {
// Приемлемый способ завершения
}
}
public String toStringO {
return "CheckoutTask " + id + " ";
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws Exception { final Pool<Fat> pool =
new Pool<Fat>(Fat.class. SIZE). ExecutorService exec = Executors newCachedThreadPoolО. for(int i = 0; i < SIZE; i++)
exec.execute(new CheckoutTask<Fat>(pool)). print("All CheckoutTasks created"); List<Fat> list = new ArrayList<Fat>0. for(int i = 0; i < SIZE; i++) { Fat f = pool.checkout О. printnb(i + " mainO thread checked out "). f operationO; list add(f);
}
Future<?> blocked = exec submit(new RunnableO { public void runO { try {
// Семафор предотвращает лишний вызов checkout. // поэтому следующий вызов блокируется: pool checkOutO. } catch(InterruptedException e) {
pri nt("checkout() Interrupted");
}
}
}):
TimeUnit.SECONDS sleep(2);
blocked.cancel(true); // Выход из заблокированного вызова print("Checking in objects in " + list); for(Fat f • list)
pool checkln(f); for(Fat f : list)
pool.checkln(f); // Второй вызов checkln игнорируется exec.shutdown О;
}
} ///:-
В коде main() создается объект Pool для хранения объектов Fat, после чего группа задач CheckoutTask начинает использовать Pool. Далее поток main() начинает выдавать объекты Fat, не возвращая их обратно. После того как все объекты пула будут выданы, семафор запрещает дальнейшие выдачи. Метод run() блокируется, и через две секунды вызывается метод cancel(). Лишние возвраты Pool игнорирует.
Exchanger
Класс Exchanger представляет собой «барьер», который меняет местами объекты двух задач. На подходе к барьеру задачи имеют один объект, а на выходе — объект, ранее удерживавшийся другой задачей. Объекты Exchanger обычно используются в тех ситуациях, когда одна задача создает высокозатратные объекты, а другая задача эти объекты потребляет.
Чтобы опробовать на практике класс Exchanger, мы создадим задачу-постав-щика и задачу-потребителя, которые благодаря параметризации и генераторам могут работать с объектами любого типа. Затем эти параметризованные задачи будут применены к классу Fat. ExchangerProducer и ExchangerConsumer меняют местами List<T>; при вызове метода Exchanger.exchange() вызов блокируется до тех пор, пока парная задача не вызовет свой метод exchange(), после чего оба метода exchange() завершаются, а контейнеры List<T> меняются местами:
//: concurrency/ExchangerDemo.java import java.util.concurrent.*; import java.util.*; i mport net.mi ndvi ew.uti1.*:
class ExchangerProducer<T> implements Runnable { private Generator<T> generator; private Exchanger<List<T>> exchanger; private List<T> holder; ExchangerProducer(Exchanger<Li st<T>> exchg, Generator<T> gen, List<T> holder) { exchanger = exchg; generator = gen; this.holder = holder;
}
public void run() { try {
while(IThread.interruptedO) {
for(int i =0; i < ExchangerDemo size; i++)
hoi der.add(generator. nextO); // Заполненный контейнер заменяется пустым: holder = exchanger exchange(holder);
}
} catchdnterruptedException e) {
// Приемлемый способ завершения.
}
class ExchangerConsumer<T> implements Runnable { private Exchanger<List<T>> exchanger; private List<T> holder; private volatile T value;
ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder){ exchanger = ex; this.holder = holder;
}
public void runO {
} catch(InterruptedException e) {
// Приемлемый способ завершения
}
System out.printlnC'HToroBoe значение- " + value).
}
}
public class ExchangerDemo { static int size = 10; static int delay = 5; // Секунды
public static void main(String[] args) throws Exception { if(args.length > 0)
size = new lnteger(args[0]), if(args.length > 1)
delay = new Integer(args[l]); ExecutorService exec = Executors.newCachedThreadPoolО. Exchanger<List<Fat>> xc = new Exchanger<List<Fat»0; List<Fat>
producerList = new CopyOnWriteArrayList<Fat>(). consumerList = new CopyOnWriteArrayList<Fat>(); exec.execute(new ExchangerProducer<Fat>(xc.
BasicGenerator.create(Fat.class). producerList)); exec.execute(
new ExchangerConsumer<Fat>(xc.consumerLi st)); TimeUni t.SECONDS.sieep(delay); exec shutdownNowO;
}
} /* Output:
Итоговое значение: Fat id: 29999 *///.-
В методе main() для обеих задач создается один объект Exchanger, а для перестановки создаются два контейнера CopyOnWriteArrayList. Эта разновидность List нормально переносит вызов метода remove() при перемещении по списку, не выдавая исключения ConcurrentModificationException. ExchangerProducer заполняет список, а затем меняет местами заполненный список с пустым, передаваемым от ExchangerConsumer. Благодаря Exchanger заполнение списка происходит одновременно с использованием уже заполненного списка.
Моделирование
Одна из самых интересных областей применения параллельных вычислений — всевозможные имитации и моделирование. Каждый компонент модели оформляется в виде отдельной задачи, что значительно упрощает его программирование.
whi 1е(!Thread interruptedO) {
holder = exchanger.exchange(holder), for(T x . holder) {
value = x; // Выборка значения holder remove(x); // Нормально для
CopyOnWri teArrayLi st
Примеры HorseRace.java и GreenhouseScheduler.java, приведенные ранее, тоже можно считать своего рода имитаторами.
Модель кассира
В этой классической модели объекты появляются случайным образом и обслуживаются за случайное время ограниченным количеством серверов. Моделирование позволяет определить идеальное количество серверов. Продолжительность обслуживания в следующей модели зависит от клиента и определяется случайным образом. Вдобавок мы не знаем, сколько новых клиентов будет прибывать за каждый период времени, поэтому эта величина тоже определяется случайным образом.
//. concurrency/BankTel1erSimulation.java
// Пример использования очередей и многопоточного программирования. // {Args. 5}
import java.util.concurrent *. import java.util *;
// Объекты, доступные только для чтения, не требуют синхронизации-class Customer {
private final int serviceTime, public Customer(int tm) { serviceTime = tm; } public int getServiceTimeO { return serviceTime; } public String toStringO {
return "[" + serviceTime + "]";
}
}
// Очередь клиентов умеет выводить информацию о своем состоянии: class CustomerLine extends ArrayBlockingQueue<Customer> { public Customerl_ine(int maxLineSize) { super(maxLineSize),
}
public String toStringO {
ifCthis sizeO == 0)
return "[Пусто]"; StringBuilder result = new StringBuilderO; for(Customer customer this)
result append(customer), return result toStringO,
}
}
// Случайное добавление клиентов в очередь: class CustomerGenerator implements Runnable { private CustomerLine customers, private static Random rand = new Random(47), public CustomerGenerator(CustomerLine cq) { customers = cq,
}
public void runO { try {
while(IThread.interruptedO) {
TimeUnit MILLISECONDS.sleep(rand nextlnt(300)):
продолжение &
customers put(new Customer(rand nextlnt(lOOO)));
}
} catchdnterruptedException e) {
System.out.pri ntin("CustomerGenerator i nterrupted");
}
System.out printin("CustomerGenerator terminating");
class Teller implements Runnable. Comparable<Teller> { private static int counter = 0; private final int id = counter**; // Счетчик клиентов, обслуженных за текущую смену: private int customersServed = 0; private CustomerLine customers; private boolean servingCustomerLine = true; public Teller(CustomerLine cq) { customers = cq; } public void run О { try {
while(IThread.interruptedO) {
Customer customer = customers.takeO. Ti meUni t.MILLISECONDS.s1eep(
customer. getServiceTimeO); synchronized(this) {
customersServed++; while(IservingCustomerLine) waitO;
}
}
} catchdnterruptedException e) {
System out println(this + "прерван");
}
System out.println(this + "завершается");
}
public synchronized void doSomethingElseO { customersServed = 0; servingCustomerLine = false;
}
public synchronized void serveCustomerLineO {
assert IservingCustomerLine:"уже обслуживает: " + this; servingCustomerLine = true; notifyAl 10;
}
public String toStringO { return "Кассир " + id + " "; } public String shortStringO { return "K" + id. } // Используется приоритетной очередью: public synchronized int compareTo(Teller other) {
return customersServed < other customersServed ? -1 .
(customersServed == other.customersServed ? 0 . 1);
}
}
class TellerManager implements Runnable { private ExecutorService exec, private CustomerLine customers; private PriorityQueue<Teller> workingTellers =
new PriorityQueue<Teller>(); private Queue<Teller> tellersDoingOtherThings =
new LinkedList<Tel 1 er>(); private int adjustmentPeriod. private static Random rand = new Random(47); public TellerManager(ExecutorService e,
CustomerLine customers, int adjustmentPeriod) { exec = e;
this.customers = customers;
this.adjustmentPeriod = adjustmentPeriod;
// Начинаем с одного кассира:
Teller teller = new Teller(customers);
exec.execute(teller);
workingTellers.add(teller);
}
public void adjustTellerNumberO {
// Фактически это система управления. Регулировка числовых // параметров позволяет выявить проблемы стабильности // в механизме управления.
// Если очередь слишком длинна, добавить другого кассира: if(customers.size() / workingTellers.sizeO > 2) { // Если кассиры отдыхают или заняты // другими делами, вернуть одного из них: if(tellersDoingOtherThings.size() > 0) {
Teller teller = tellersDoingOtherThings.remove(); tel1er.serveCustomerLi ne(); workingTellers.offer(teller); return;
}
// Иначе создаем (нанимаем) нового кассира Teller teller = new Teller(customers); exec.execute(teller); workingTellers.add(teller); return;
}
// Если очередь достаточно коротка, освободить кассира: if (workingTellers.sizeO > 1 &&
customers.size() / workingTellers.sizeO < 2) reassignOneTellerO; // Если очереди нет. достаточно одного кассира: if (customers, si ze() ==0)
while(workingTellers.size() > 1) reassignOneTellerO;
}
// Поручаем кассиру другую работу или отправляем его отдыхать: private void reassignOneTellerO {
Teller teller = workingTellers.pollО;
tel 1 er. doSomethi ngEl seO,
tel1ersDoi ngOtherThi ngs.offer(tel1er);
}
public void runO { try {
while(!Thread.interruptedO) {
TimeUnit.MILLISECONDS.sleep(adjustmentPeriod);
adjustTellerNumberO;
System.out.print(customers +"{");