Олег Цилюрик - QNX/UNIX: Анатомия параллелизма
Для того чтобы не допускать разночтений в вопросе, обратимся сначала к оригинальному фрагменту документации, описывающему принятую модель:
The original POSIX specification defined signal operation on processes only. In a multi-threaded process, the following rules are followed:
*The signal actions are maintained at the process level. If a thread ignores or catches a signal, it affects all threads within the process.
*The signal mask is maintained at the thread level. If a thread blocks a signal, it affects only that thread.
*An un-ignored signal targeted at a thread will be delivered to that thread alone.
*An un-ignored signal targeted at a process is delivered to the first thread that doesn't have the signal blocked. If all threads have the signal blocked, the signal will be queued on the process until any thread ignores or unblocks the signal. If ignored, the signal on the process will be removed. If unblocked, the signal will be moved from the process to the thread that unblocked it.
Все достаточно однозначно: обработчики сигналов определяются на уровне процесса, а вот сигнальные маски, определяющие, реагировать ли на данный сигнал, - на уровне каждого из потоков.
Для манипулирования сигнальными масками на уровне потоков нам придется использовать функцию SignalProcmask()[34] (естественно, из состава native API, поскольку эта модель не декларирована POSIX):
#include <sys/neutrino.h>
int SignalProcmask(pid_t pid, int tid, int how, const sigset_t* set,
sigset_t* oldset);
Видна прямая аналогия с рассматривавшейся ранее функцией sigprocmask(). Да это и неудивительно, поскольку sigprocmask() является POSIX-«оберткой» к SignalProcmask(). Только рассматриваемый вызов имеет два «лишних» начальных параметра: PID и TID потока, к маске которого применяется действие. Если pid — 0, то предполагается текущий процесс, если tid = 0, то pid игнорируется и предполагается текущий поток, вызывающий функцию.
Остальные параметры соответствуют параметрам sigprocmask() (дополнительно появляется возможное значение SIG_PENDING для how).
Рассмотрим, как это работает на примере простейшего кода (файл s6.cc):
Сигналы, обрабатываемые в потоках#include <stdio.h>
#include <iostream.h>
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <sys/neutrino.h>
static void handler(int signo, siginfo_t* info, void* context) {
cout << "SIG = " << signo << ";
TID = " << pthread_self() << endl;
}
sigset_t sig;
void* threadfunc(void* data) {
SignalProcmask(0, 0, SIG_UNBLOCK, &sig, NULL);
while (true) pause();
}
int main() {
sigemptyset(&sig);
sigaddset(&sig, SIGRTMIN);
sigprocmask(SIG_BLOCK, &sig, NULL);
cout << "Process " << getpid() << ", waiting for signal " <<
SIGRTMIN << endl;
struct sigaction act;
act.sa_mask = sig;
act.sa_sigaction = handler;
act.sa_flags = SA_SIGINFO;
if (sigaction(SIGRTMIN, &act, NULL) < 0)
perror("set signal handler: ");
const int thrnum = 3;
for (int i = 0; i < thrnum; i++)
pthread_create(NULL, NULL, threadfunc, NULL);
pause();
exit(EXIT_SUCCESS);
}
Для анализа этого и последующих фрагментов нам будет недостаточно команды kill, поэтому сделаем простейший «передатчик» плотной (в смысле минимального интервала следования) последовательности повторяющихся сигналов (файл k6.cc). Выполнение этого тестера, например по команде:
# k6 -p214005 -s41 -n100
направляет процессу с PID = 214005 последовательность из 100 сигналов с кодом 41 (SIGRTMIN). Посылая нашему процессу-тестеру последовательность из N сигналов, мы получим N сообщений вида:
SIG = 41; TID = 4
ПримечаниеЗдесь удобный случай показать разницу между обработкой сигналов на базе очереди и простой обработкой (модель надежных сигналов). Для этого заменим две строки заполнения структуры sigaction на:
act.sa_handler = handler;
act.sa_flags = 0;
а заголовок функции handler() перепишем так: static void handler(int signo). Если теперь мы в точности повторим предыдущий тест, то при посылке процессу- тестеру последовательности из N сигналов мы получим всего одно сообщение все того же вида. Это наблюдение интересно еще и тем, что оно показывает, что алгоритм взаимодействия сигнала с потоками не зависит от того, какая обработка установлена для этого сигнала: на основе модели сигналов реального времени или на основе модели надежных сигналов.
Сколько бы раз мы ни повторяли тестирование, идентификатор потока, получающего и обрабатывающего сигнал, всегда будет равен 4. Что же происходит:
• главный поток (TID = 1) создает 3 новых потока (TID = 2, 3, 4);
• главный поток переходит в пассивное ожидание сигналов, но в его маске доставка посылаемого сигнала (41) заблокирована;
• выполнение функции потока начинается с разблокирования ожидаемого сигнала;
• … 3 потока (TID = 2, 3, 4) ожидают поступления сигнала;
• при поступлении серии сигналов вся их очередь доставляется и обрабатывается одним потоком с TID = 4, который тут же в цикле возвращается к ожиданию следующих сигналов.
Таким образом, сигнал доставляется одному и только одному потоку, который не блокирует этот сигнал. Обработчик сигнала вызывается в контексте (стек, области собственных данных) этого потока. После выполнения обработчика сигнал поглощается. Какому из потоков, находящихся в состоянии блокирования в ожидании сигналов (в масках которых разблокирован данный сигнал), будет доставлен экземпляр сигнала, предсказать невозможно; это так и должно быть исходя из общих принципов диспетчеризации потоков. Но реально этим потоком является поток, последним перешедший в состояние ожидания. Для того чтобы убедиться в этом, заменим предпоследнюю строку программы (pause();) на:
threadfunc(NULL);
Теперь у нас 4 равнозначных потока, ожидающих прихода сигнала, переходящих в состояние ожидания в последовательности: TID = 2, 3, 4, 1. Реакция процесса на приход сигнала изменится на:
SIG = 41, TID = 1
Изменим текст функции потока на (файл s7.cc):
void* threadfunc(void* data) {
while (true) {
SignalProcmask(0, 0, SIG_UNBLOCK, &sig, NULL);
delay(1);
SignalProcmask(0, 0, SIG_BLOCK, &sig, NULL);
delay(10);
}
}
Поведение приложения радикально изменится — происходит смена обрабатывающего потока (чтобы сократить объем вывода, серии посылаемых сигналов состоят из одного сигнала). Следует отметить, что смена обрабатывающего потока происходит между сериями, но ни в коем случае не внутри длинных серий, что можно проследить экспериментально.
SIG = 41, TID = 1
SIG = 41; TID = 4
SIG = 41; TID = 4
SIG = 41; TID = 1
SIG = 41; TID = 1
SIG = 41; TID = 4
SIG = 41; TID = 4
SIG = 41; TID = 1
SIG = 41; TID = 2
SIG = 41; TID = 2
SIG = 41; TID = 3
SIG = 41; TID = 4
Такая модель вряд ли может быть названа в полной мере «сигналами в потоках», так как сигнал в ней в конечном итоге направляется процессу как контейнеру, содержащему потоки (можно сказать и так: в оболочку адресного пространства процесса). И только после этого в контексте одного из потоков (и в случае множественных потоков, разблокированных на обработку единого сигнала, невозможно предсказать, в контексте какого из них) выполняется обработчик сигнала. Главный поток процесса (TID = 1) в этой схеме участвует в равнозначном качестве (здесь хорошо видно, что устоявшееся понятие «реакция процесса на сигнал» в строгом смысле некорректно).
Перейдем к более конкретным вопросам: как можно продуктивно использовать эту схему в многопоточных приложениях? Рассмотрим сначала случай, когда каждый из рабочих потоков разблокирован на получение одного, свойственного только ему сигнала (файл s9.cc):
Чередование потоковых сигналов#include <stdio.h>
#include <iostream.h>
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <sys/neutrino.h>
static void handler(int signo, siginfo_t* info, void* context) {
cout << "SIG = " << signo << "; TID = " << pthread_self() << endl;
}
void* threadfunc(void* data) {
// блокировать реакцию на все сигналы
sigset_t sig;
sigfillset(&sig);
SignalProcmask(0, 0, SIG_BLOCK, &sig, NULL);
// разблокировать реакцию на свой сигнал
sigemptyset(&sig);
sigaddset(&sig, (int)data);
SignalProcmask(0, 0, SIG_UNBLOCK, &sig, NULL);
// цикл ожидания приходящих сигналов
while (true) pause();
}
int main() {
// для обработки всей группы сигналов управления потоками используем
// единую функцию реакции, иначе все было бы гораздо проще.
struct sigaction act;
sigemptyset(&act.sa_mask);