KnigaRead.com/

Олег Цилюрик - QNX/UNIX: Анатомия параллелизма

На нашем сайте KnigaRead.com Вы можете абсолютно бесплатно читать книгу онлайн Олег Цилюрик, "QNX/UNIX: Анатомия параллелизма" бесплатно, без регистрации.
Перейти на страницу:

 sigset_t sig;

 sigfillset(&sig);

 SignalProcmask(0, 0, SIG_BLOCK, &sig, NULL);

 // разблокировать реакцию на свой сигнал

 sigemptyset(&sig);

 sigaddset(&sig, (int)data);

 SignalProcmask(0, 0, SIG_UNBLOCK, &sig, NULL);

 // цикл ожидания приходящих сигналов

 while (true) pause();

}


int main() {

 // для обработки всей группы сигналов управления потоками используем

 // единую функцию реакции, иначе все было бы гораздо проще.

 struct sigaction act;

 sigemptyset(&act.sa_mask);

 act.sa_sigaction = handler;

 act.sa_flags = SA_SIGINFO;

 // создаем группу однотипных потоков

 const int thrnum = 3;

 for (int i = SIGRTMIN; i - SIGRTMIN < thrnum; i++) {

  sigset_t sig;

  sigemptyset(&sig);

  sigaddset(&sig, 1);

  // нам нужно, чтобы главный поток не реагировал:

  sigprocmask(SIG_BLOCK, &sig, NULL);

  if (sigaction(i, &act, NULL) < 0) perror("set signal handler: ");

  // для передачи номера сигнала используется

  // трюк с подменой типа параметра:

  pthread_create(NULL, NULL, threadfunc, (void*)(i));

 }

 // начинаем циклическую синхронизацию потоков.

 for (int i = 0; ; i++) {

  sleep(1);

  // посылку сигнала можно (так даже будет корректнее)

  // сделать так:

  // union sigval val;

  // val.sival_int = i;

  // sigqueue(getpid(), SIGRTMIN + i % thrnum, val);

  // но мы сознательно демонстрируем и приемлемость kill:

  kill(getpid(), SIGRTMIN + i % thrnum);

 }

}

В этой программе главный поток циклически по таймеру активизирует поочередно каждый поток. Вот фрагмент вывода работающей программы:

SIG = 41; TID = 2

SIG = 42; TID = 3

SIG = 43; TID = 4

SIG = 41; TID = 2

SIG = 42; TID = 3

SIG = 43; TID = 4

SIG = 41; TID = 2

SIG = 42; TID = 3

SIG = 43; TID = 4

Часто приходится слышать: «…хотелось бы доставить сигнал всем потокам, уведомить всех потребителей и выполнить функцию реакции в каждом потоке…», и именно в такой последовательности действий понимается модель сигналов в потоках при поверхностном с ней ознакомлении. Иногда это представляется очень интересной возможностью, и мы реализуем такую схему взаимодействия в следующем фрагменте (файл s10.cc):

Множественная реакция на сигнал

#include <stdio.h>

#include <iostream.h>

#include <signal.h>

#include <unistd.h>

#include <pthread.h>

#include <sys/neutrino.h>

#include <vector>


static void handler(int signo, siginfo_t* info, void* context) {

 cout << "SIG = " << signo << ", TID = " << pthread_self() << endl;

}


static void endhandler(int signo) {}


// сигнал, на который реагируют потоки:

const int SIGNUM = SIGRTMIN;

sigset_t sig;

struct threcord {

 int tid;

 bool noblock;

};

static vector<threcord> tharray; // вектор состояний потоков


void* threadfunc(void* data) {

 // блокирование всех прочих сигналов:

 sigset_t sigall;

 sigfillset(&sigall);

 SignalProcmask(0, 0, SIG_BLOCK, &sigall, NULL);

 // передеспетчеризация для завершения формирования вектора

 sched_yield();

 tharray[(int)data].noblock =

  (SignalProcmask(0, 0, SIG_UNBLOCK, &sig, NULL) != -1);

 while (true) {

  pause();

  tharray[(int)data].noblock =

   !(SignalProcmask(0, 0, SIG_BLOCK, &sig, NULL) != 1);

  bool nolast = false;

  for (vector<threcord>::iterator i = tharray.begin();

   i != tharray.end(); i++)

   if (nolast = i->noblock) break;

  // последовательная пересылка сигнала следующему потоку

  if (nolast) kill(getpid(), SIGNUM);

  // ... когда пересылать больше некому -

  // переинициализация масок

  else

   for (vector<threcord>::iterator i = tharray.begin();

    i != tharray.end(); i++)

    i->noblock = (SignalProcmask(0, i->tid, SIG_UNBLOCK, &sig, NULL) != -1);

 }

}


int main() {

 // переопределение реакции ^C в старой манере

 signal(SIGINT, endhandler);

 // маска блокирования-разблокирования

 sigemptyset(&sig);

 sigaddset(&sig, SIGNUM);

 // блокировка в главном потоке приложения

 sigprocmask(SIG_BLOCK, &sig, NULL);

 cout << "Process " << getpid() << ", waiting for signal " << SIGNUM << endl;

 // установка обработчика (для дочерних потоков)

 struct sigaction act;

 act.sa_mask = sig;

 act.sa_sigaction = handler;

 act.sa_flags = SA_SIGINFO;

 if (sigaction(SIGNUM, &act, NULL) < 0) perror("set signal handler: ");

 const int thrnum = 3;

 for (int i = 0; i < thrnum; i++) {

  threcord threc = { 0, false };

  pthread_create(&threc.tid, NULL, threadfunc, (void*)i);

  tharray.push_back(three);

 }

 pause();

 // сюда мы попадаем после ^C для завершающих операций...

 tharray.erase(tharray.begin(), tharray.end());

 cout << "Clean vector" << endl;

}

Это приложение, в отличие от предыдущих, построено уже с использованием специфики С++, в нем используется контейнерный класс vector из библиотеки STL (Standard Template Library). Может быть множество вариаций на подобную тему. Приведенное нами приложение (как одна из вариаций) только подтверждает, что принятая в QNX модель достаточна для описания самых неожиданных потребностей. Логика работы приложения крайне проста: получая сигнал, поток блокирует повторную реакцию на этот сигнал, после чего возбуждает дубликат полученного сигнала от своего имени.

Примечание

Показанное приложение в значительной степени искусственно и неэффективно. Мы приводим его здесь не как образец того, «как нужно делать», а только как иллюстрацию гибкости возможностей, предоставляемых в области параллельного программирования. При некоторой изобретательности можно заставить программу вести себя согласно вашим капризам, какими бы изощренными они ни оказались.

Запускаем полученное приложение:

# s10

Process 2089006, waiting for signal 41

После чего с другого терминала пошлем приложению ожидаемый им сигнал, например командой:

# kill -41 2089006

Посылаем этот сигнал несколько раз (в данном случае 3) и получаем вывод от приложения:

SIG = 41; TID = 4

SIG = 41; TID = 2

SIG = 41; TID = 3

SIG = 41; TID = 3

SIG = 41; TID = 4

SIG = 41; TID = 2

SIG = 41; TID = 2

SIG = 41; TID = 3

SIG = 41; TID = 4

^C

Clean vector

Видно, что реакция на каждый сигнал возбуждается несколько раз (по числу потоков), каждый раз выполняясь в контексте разного потока (TID). Интересно и изменение порядка активизации потоков от сигнала к сигналу, то есть потоки в очереди ожидающих «перетасовываются» при поступлении каждого сигнала.

Примечание

В приложение добавлена реакция на ^C (сигнал SIGINT):

• начиная с некоторой сложности приложений, их завершению должна обязательно предшествовать некоторая последовательность действий; в данном случае мы условно показываем очистку вектора состояний потоков;

• реакция на SIGINT выполнена в «ненадежной» манере в смешении с моделью очереди сигналов для SIGRTMIN, что показывает возможность смешанного применения всех моделей в рамках одного приложения; все определяется требованиями и вопросами удобства.

Как мы уже видели, тот факт, что обработчик сигнала выполняется в контексте потока, который разблокировал реакцию на этот сигнал (независимо от того, в момент выполнения какого потока приходит сигнал), позволяет реализовать в обработчике сигнала обработку любой сложности в интересах этого потока. Для этого лишь требуется разместить все области данных, запрашиваемые в этой обработке, не в стеке потока (объявленные как локальные переменные потоковой функции), а в области собственных данных потока, которые мы детально рассмотрели ранее. Схематично это можно показать в коде так:

• Положим, нам нужно уведомлять о некоторых событиях N потоков.

Будем использовать для этого сигналы SIGRTMIN…SIGRTMIN + (N - 1):

for (int i = SIGRTMIN, i < SIGRTMIN + N; i++) {

 pthread_create(NULL, NULL, threadfunc, (void*)(i));

}

• При запуске N потоков (из главного потока) потоковые функции, помимо устанавливания своих индивидуальных сигнальных масок (в точности так, как это показано выше в листинге «Чередование потоковых сигналов»), размещают экземпляры собственных потоковых данных:

class DataBlock {

 ~DataBlock(void) {...}

};

static pthread_key_t key;

static pthread_once_t once = PTHREAD_ONCE_INIT;

Перейти на страницу:
Прокомментировать
Подтвердите что вы не робот:*