Олег Цилюрик - QNX/UNIX: Анатомия параллелизма
Остановимся только на одном обстоятельстве: адресат получателя, которому направляется каждое сообщение, определяется при начальном установлении идентификатора соединения (coid — connect ID) вызовом:
#include <sys/neutrino.h>
int ConnectAttach(int nd, pid_t Did, int chid,
unsigned index, int flags);
Адрес назначения (сервера) в этом вызове определяется триадой {ND/PID/CHID}, где:
nd — идентификатор сетевого узла. Мы не станем углубляться в идентификацию сетевых узлов сети QNET. Возьмем на заметку лишь тот факт, что обмен сообщениями с одинаковой легкостью осуществляется как с процессом на локальном узле (nd = 0), так и на любом другом сетевом узле.
pid — PID процесса-сервера, с которым производится соединение.
chid — идентификатор канала, который открыл процесс с указанным PID, выполнив предварительно ChannelCreate(), и к которому устанавливается соединение вызовом ConnectAttach().
Выше мы неоднократно отмечали, что с процессом как с пассивной субстанцией, вообще говоря, невозможно обмениваться сообщениями. Хотя в адресной триаде обмена фигурирует именно PID процесса! Это обстоятельство не меняет положения вещей: именно адресная компонента CHID и определяет тот поток (часто это может быть главный поток приложения), с которым будет осуществляться обмен сообщениями, a PID определяет то адресное пространство процесса, в которое направляется сообщение, адресованное CHID.
Детальнее это выглядит так: в коде сервера именно тот поток, который выполнит MsgReceive*(chid, ...), и будет заблокирован в ожидании запроса от клиента MsgSend*(). Аналогично и в коде клиента вся последовательность выполнения блокировок, обозначенная выше, будет относиться именно к потоку, выполняющему последовательные операции:
coid = ConnectAttach(... , chid, ...);
MsgSend*(coid, ...);
Содержимое двух предыдущих абзацев ни одной буквой не противоречит и не отменяет положения традиционного изложения [1] технологии обмена сообщениями микроядра. Тогда зачем же мы даем именно такую формулировку? Для того чтобы акцентировать внимание на том, что все блокированные состояния и их освобождение имеют смысл относительно потоков (и только потоков!), которые выполняют последовательность операций MsgSend*() — MsgReceive*() — MsgReply*() (даже если это единственный поток — главный поток приложения, и тогда мы говорим о блокировании процессов). Проиллюстрируем сказанное следующим приложением (файл n1.cc):
Обмен сообщениями и взаимные блокировки#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <inttypes.h>
#include <errno.h>
#include <iostream.h>
#include <pthread.h>
#include <signal.h>
#include <sys/neutrino.h>
#include <sys/syspage.h>
static const int TEMP = 500; // темп выполнения приложения
static int numclient = 1; // число потоков клиентов
// многопотоковая версия вывода диагностики в поток:
iostream& operator <<(iostream& с, char* s) {
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&mutex);
c << s << flush;
pthread_mutex_unlock(&mutex);
return c;
}
static uint64_t tb; // временная отметка начала приложения
// временная отметка точки вызова:
inline uint64_t mark(void) {
// частота процессора:
const static uint64_t cps =
SYSPAGE_ENTRY(qtime)->cycles_per_sec;
return (ClockCycles() - tb) * 1000 / cps;
}
const int MSGLEN = 80;
// потоковая функция сервера:
void* server(void* chid) {
int rcvid;
char message[MSGLEN];
while (true) {
rcvid = MsgReceive((int)chid, message, MSGLEN, NULL);
sprintf(message + strlen(message), "[%07llu] ... ", mark());
delay(TEMP); // имитация обслуживания
sprintf(message + strlen(message), [%07llu]->", mark());
MsgReply(rcvid, EOK, message, strlen(message) + 1);
}
return NULL;
}
// потоковая функция клиента:
void* client(void* data) {
while (true) {
char message[MSGLEN];
sprintf(message, "%d:t[%07llu]->", pthread_self(), mark());
MsgSend((int)data, message, strlen(message) + 1, message, MSGLEN);
sprintf(message + strlen(message), "[%07llu]", mark());
cout << message << endl;
static unsigned int seed = 0;
delay(numclient*(((long)rand_r(&seed ) * TEMP / RAND_MAX) + TEMP));
// имитация вычислений...
}
return NULL;
}
int main(int argc, char** argv) {
// 1-й параметр - число потоков клиентов:
if (argc > 1 && atoi(argv[1]) > 0)
numclient = atoi(argv[1]);
tb = ClockCycles();
int chid = ChannelCreate(0);
if (pthread_create(NULL, NULL, server, (void*)chid) != EOK)
perror("server create"), exit(EXIT_FAILURE);
for (int i = 0; i < numclient; i++)
if (pthread_create(NULL, NULL, client,
(void*)ConnectAttach(0, 0, chid, _NTO_SIDE_CHANNEL, 0)) != EOK)
perror("client create"), exit(EXIT_FAILURE);
sigpause(SIGINT);
return EXIT_SUCCESS;
}
Все происходит в рамках единого процесса:
• Создается единый поток сервера, ожидающий сообщений от клиентов и отвечающий на них.
• Создается N потоков клиентов (задается параметром командной строки запуска приложения), которые будут обращаться к серверу.
• К одному каналу сервера устанавливается N соединений от клиентов.
• Канал прослушивания для сервера и идентификаторы соединений для клиентов сознательно создаются в главном потоке (т.e. вне потоков, которые их будут использовать); их значения поступают в потоки (сервера и клиентов) как параметры потоковых функций (трюк с подменой целочисленных значений на указатели мы рассматривали ранее).
• Сообщение продвигается от клиента к серверу и обратно к клиенту; в ходе пересылки объем сообщения нарастает: оно образуется конкатенацией полей, добавляемых последовательно клиентом, сервером и снова клиентом.
• В результате полного цикла обмена сообщением в теле самого сообщения формируется текст, содержащий 5 последовательных полей — идентификатор потока клиента (обращающегося с сообщением) и 4 абсолютные временные метки (в миллисекундах): передачи сообщения клиентом, приема сообщения сервером (начало обработки), ответа на сообщение сервером (конец обработки), приема ответа клиентом.
Запустим полученное приложение, например, так:
# n1 5
И прежде чем обсуждать результаты его работы, понаблюдаем состояния (блокировки) его потоков командой pidin (с другого терминала, естественно). Вот несколько «снимков» состояний, здесь можно наблюдать весь спектр блокированных состояний, о которых говорилось выше:
5546027 1 ./n1 10r SIGSUSPEND
5546027 2 ./n1 10r NANOSLEEP
5546027 3 ./n1 10r NANOSLEEP
5546027 4 ./n1 10r SEND 5546027
5546027 5 ./n1 10r REPLY 5546027
5546027 6 ./n1 10r NANOSLEEP
5546027 7 ./n1 10r NANOSLEEP
5730347 1 ./n1 10r SIGSUSPEND
5730347 2 ./n1 10r RECEIVE 1
5730347 3 ./n1 10r NANOSLEEP
5730347 4 ./n1 10r NANOSLEEP
5730347 5 ./n1 10r NANOSLEEP
5730347 6 ./n1 10r NANOSLEEP
5730347 7 ./n1 10r NANOSLEEP
А теперь рассмотрим результаты выполнения (на меньшем числе потоков клиентов, которое легче анализировать):
# n1 3
3: [0000000]->[0000000] ... [0000501]->[0000501]
4: [0000000]->[0000501] ... [0001003]->[0001003]
5: [0000000]->[0001003] ... [0001505]->[0001505]
3: [0002003]->[0002003] ... [0002504]->[0002505]
5: [0003462]->[0003462] ... [0003964]->[0003964]
4: [0003485]->[0003964] ... [0004466]->[0004466]
3: [0005017]->[0005017] ... [0005518]->[0005518]
5: [0005624]->[0005624] ... [0006126]->[0006126]
4: [0006741]->[0006741] ... [0007243]->[0007243]
...
Видно, как 3 клиента отправляют сообщения одновременно ([0000000]), поток сервера (TID = 2) немедленно получает сообщение ([0000000], 1-я строка), отправленное клиентом с TID = 3, два других сообщения (от клиентов с TID = 4 и 5) помещаются системой в очередь обслуживания (строки 2 и 3). После завершения обслуживания запроса от TID = 3 и ответа ([0000501]) поток сервера получает (извлекается из очереди ранее отправленное сообщение) сообщение от TID = 4 и так далее.
Еще содержательнее для интерпретации становится картина для большего числа потоков клиентов (здесь очередь ожидающих запросов становится гораздо длиннее, а ее поведение трудно предсказуемым - почти каждый запрос ожидает обслуживания), но эти результаты требуют намного более тщательного разбора для их осмысления:
# n1 10
3: [0000000]->[0000000] ... [0000501]->[0000501]
4: [0000000]->[0000501] ... [0001003]->[0001003]
5: [0000000]->[0001003] ... [0001505]->[0001505]
6: [0000000]->[0001505] ... [0002007]->[0002007]
7: [0000000]->[0002007] ... [0002508]->[0002508]
8: [0000000]->[0002508] ... [0003010]->[0003010]