Брюс Эккель - Философия Java3
CyclicBarrier
Класс CyclicBarrier используется при создании группы параллельно выполняемых задач, завершения которых необходимо дождаться до перехода к следующей фазе. Все параллельные задачи «приостанавливаются» у барьера, чтобы сделать возможным их согласованное продвижение вперед. Класс очень похож на CountDownLatch, за одним важным исключением: CountDownLatch является «одноразовым», a CyclicBarrier может использоваться снова и снова.
Имитации привлекали меня с первых дней работы с компьютерами, и параллельные вычисления играют в них ключевую роль. Даже самая первая программа, которую я написал на BASIC, имитировала скачки на ипподроме. Вот как выглядит объектно-ориентированная, многопоточная версия этой программы с использованием CyclicBarrier:
//: concurrency/HorseRace.java // Using CyclicBarriers import java.util concurrent *; import java.util *;s
import static net.mindview util Print.*;
class Horse implements Runnable { private static int counter = 0; private final int id = counter++; private int strides = 0; private static Random rand = new Random(47); private static CyclicBarrier barrier; public Horse(CyclicBarrier b) { barrier = b; } public synchronized int getStridesO { return strides; } public void run() { try {
whi 1 e(!Thread.interruptedO) { synchronized(this) {
strides += rand.nextInt(3); // Produces 0. 1 or 2
}
barrier.awaitO;
}
} catch(InterruptedException e) {
// Приемлемый вариант выхода } catch(BrokenBarrierException e) {
// Исключение, которое нас интересует throw new RuntimeException(e);
}
}
public String toStringO { return "Horse " + id + " "; } public String tracks О {
StringBuilder s = new StringBuilderO; for (int i = 0; i < getStridesO; i++)
s appendC'*"); s.append(id); return s.toStringO;
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>0;
private ExecutorService exec =
Executors.newCachedThreadPool (); private CyclicBarrier barrier; public HorseRace(int nHorses. final int pause) {
barrier = new CyclicBarrier(nHorses, new RunnableO { public void runO {
StringBuilder s = new StringBuilderO; for(int i = 0; i < FINISH_LINE; i++)
s.append("="); // Забор на беговой дорожке
print(s);
for(Horse horse ; horses)
print(horse.tracksO); for(Horse horse ; horses)
if (horse. getStridesO >= FINISH_LINE) { print(horse + "won!"); exec.shutdownNowO; return;
}
try {
Ti meUnit.MILLISECONDS.s1eep(pause); } catch(InterruptedException e) {
printC'barrier-action sleep interrupted");
}
}).
for(int i = 0. i < nHorses; i++) {
Horse horse = new Horse(barrier); horses add(horse); exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200,
if(args.length > 0) { // Необязательный аргумент int n = new Integer(args[0J); nHorses = n > 0 ? n . nHorses,
}
if(args length > 1) { // Необязательный аргумент int p = new Integer(args[l]); pause = p > -1 ? p : pause;
}
new HorseRace(nHorses, pause);
}
} ///;-
Для объекта CyclicBarrier можно задать «барьерное действие» — объект Runnable, автоматически запускаемый при обнулении счетчика (еще одно отличие CyclicBarrier от CountdownLatch). В нашем примере барьерное действие определяется в виде безымянного класса, передаваемого конструктору CyclicBarrier.
Я попытался сделать так, чтобы каждый объект лошади отображал себя, но порядок отображения зависел от диспетчера задач. Благодаря CyclicBarrier каждая лошадь делает то, что ей необходимо для продвижения вперед, а затем ожидает у барьера перемещения всех остальных лошадей. Когда все лошади переместятся, CyclicBarrier автоматически вызывает «барьерную» задачу Runnable, чтобы отобразить всех лошадей по порядку вместе с барьером. Как только все задачи пройдут барьер, последний автоматически становится готовым для следующего захода.
DelayQueue
Класс представляет неограниченную блокирующую очередь объектов, реализующих интерфейс Delayed. Объект может быть извлечен из очереди только после истечения задержки. Очередь сортируется таким образом, что объект в начале очереди обладает наибольшим сроком истечения задержки. Если задержка ни у одного объекта не истекла, начального элемента нет, и вызов poll() возвращает null (из-за этого в очередь не могут помещаться элементы null).
В следующем примере объекты, реализующие Delayed, сами являются задачами, a DelayedTaskContainer берет задачу с наибольшей просроченной задержкой и запускает ее. Таким образом, DelayQueue является разновидностью приоритетной очереди.
//• concurrency/DelayQueueDemo java import java util concurrent *; import java util *.
import static java util concurrent TimeUnit *, import static net mindview.util Print *;
class DelayedTask implements Runnable, Delayed { private static int counter = 0; private final int id = counter++, private final int delta; private final long trigger; protected static List<DelayedTask> sequence =
new ArrayList<DelayedTask>(), public DelayedTask(int delaylnMilliseconds) { delta = delaylnMilliseconds, trigger = System nanoTimeO +
NANOSECONDS.convert(delta. MILLISECONDS). sequence add(this),
}
public long getDelay(TimeUnit unit) { return unit.convert(
trigger - System.nanoTime(), NANOSECONDS),
}
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask)arg, if(trigger < that.trigger) return -1; if(trigger > that.trigger) return 1, return 0;
}
public void run() { printnb(this + " "), } public String toStringO {
return String.format("[£l$-4d] delta) + " Task " + id;
}
public String summaryО {
return "(" + id + + delta + ")";
}
public static class EndSentinel extends DelayedTask { private ExecutorService exec; public EndSentinel(int delay. ExecutorService e) { super(delay), exec = e;
}
public void runO {
for(DelayedTask pt . sequence) {
printnb(pt.summary() + " ");
}
printO;
print (this + " вызывает shutdownNowO"); exec.shutdownNowO;
}
}
}
class DelayedTaskConsumer implements Runnable { private DelayQueue<DelayedTask> q, public DelayedTaskConsumer(DelayQueue<DelayedTask> q) { this.q = q;
public void run() { try {
while(IThread interruptedO)
q.takeO.runО; // Выполнение задачи в текущем потоке } catchdnterruptedException е) {
// Приемлемый вариант выхода
}
print("Завершается DelayedTaskConsumer");
public class DelayQueueDemo {
public static void main(String[] args) { Random rand = new Random(47),
ExecutorService exec = Executors newCachedThreadPoolО; DelayQueue<@060>DelayedTask> queue =
new DelayQueue<DelayedTask>(); // Очередь заполняется задачами со случайной задержкой-for(int i = 0; i < 20; i++)
queue put(new DelayedTask(rand.nextInt(5000))); // Назначение точки остановки queue.add(new DelayedTask.EndSentinel(5000, exec)); exec.execute(new DelayedTaskConsumer(queue)),
}
} /* Output;
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6 (0:4258) (1.555) (2:1693) (3-1861) (4.961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13.4589) (14.1809) (15-2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000) [5000] Task 20 вызывает shutdownNowO Завершается DelayedTaskConsumer *///.-
DelayedTask содержит контейнер List<DelayedTask> с именем sequence, в котором сохраняется порядок создания задач, и мы видим, что сортировка действительно выполняется.
Интерфейс Delayed содержит единственный метод getDelay(), который сообщает, сколько времени осталось до истечения задержки или как давно задержка истекла. Метод заставляет нас использовать класс TimeUnit, потому что его аргумент относится именно к этому типу. Впрочем, этот класс очень удобен, поскольку он позволяет легко преобразовывать единицы без каких-либо вычислений. Например, значение delta хранится в миллисекундах, а метод Java SE5 System.nanoTime() выдает значение в наносекундах. Чтобы преобразовать значение delta, достаточно указать исходные и итоговые единицы:
NANOSECONDS.convert(delta. MILL ISECONDS);
В getDelay() желаемые единицы передаются в аргументе unit. Аргумент используется для преобразования времени задержки во временные единицы, используемые вызывающей стороной.
Для выполнения сортировки интерфейс Delayed также наследует интерфейс Comparable, поэтому необходимо реализовать метод compareToQ для выполнения осмысленных сравнений. Методы toStringO и summary() обеспечивают форматирование вывода.
Из выходных данных видно, что порядок создания задач не влияет на порядок их выполнения — вместо этого задачи, как и предполагалось, выполняются в порядке следования задержек.
PriorityBlockingQueue
Фактически класс PriorityBlockingQueue представляет приоритетную очередь с блокирующими операциями выборки. В следующем примере объектами в приоритетной очереди являются задачи, покидающие очередь в порядке приоритетов. Для определения этого порядка в класс PrioritizedTask включается поле priority:
//: concurrency/PriorityBlockingQueueDemo.java import java.util.concurrent.*; import java.util.*;
import static net.mindview.util.Print.*;
class PrioritizedTask implements Runnable, Comparable<Pri ori ti zedTask> { private Random rand = new Random(47); private static int counter = 0; private final int id = counter++; private final int priority; protected static List<PrioritizedTask> sequence =
new ArrayLi st<Pri ori ti zedTask>(); public PrioritizedTask(int priority) { this.priority = priority; sequence.add(this);
}
public int compareTo(PrioritizedTask arg) { return priority < arg.priority ? 1 ;
(priority > arg.priority ? -1 : 0);
}
public void run() { try {
Ti mellni t. MILLI SECONDS. s 1 eep (rand. next I nt (250)); } catch(InterruptedException e) {
// Приемлемый вариант выхода
}
print(this);
}
public String toStringO {
return String, format С [n$-3d] priority) + " Task " + id;
}
public String summaryО {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask { private ExecutorService exec; public EndSentinel(ExecutorService e) {
super(-l); // Минимальный приоритет в этой программе
exec = e;
public void run() {
int count = 0;
for(PrioritizedTask pt : sequence) { printnb(pt.summaryO), if(++count % 5 == 0) printO.
}
printO;
print (this + " Calling shutdownNowO"); exec.shutdownNowO.
}
}
}
class PrioritizedTaskProducer implements Runnable { private Random rand = new Random(47); private Queue<Runnable> queue; private ExecutorService exec; public PrioritizedTaskProducer(
Queue<Runnable> q, ExecutorService e) { queue = q;
exec = e; // Используется для EndSentinel
}
public void run() {
// Неограниченная очередь без блокировки. // Быстрое заполнение случайными приоритетами: for(int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10))); Thread.yieldO;
}
// Добавление высокоприоритетных задач: try {
for(int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250); queue.add(new PrioritizedTask(lO)).
}
// Добавление заданий, начиная с наименьших приоритетов: for(int i = 0; i < 10; i++)
queue.add(new PrioritizedTask(i)); // Предохранитель для остановки всех задач: queue.add(new PrioritizedTask EndSentinel(exec)); } catchdnterruptedException e) {