Олег Цилюрик - QNX/UNIX: Анатомия параллелизма
Гораздо свободнее может себя чувствовать разработчик драйвера псевдоустройства (программной модели): здесь каждый запрос open() (будь то из одного последовательного потока, различных потоков процесса или даже из потоков, принадлежащих разным процессам) может порождать новый экземпляр псевдоустройства. Возвращаемый им файловый дескриптор (в QNX это дескриптор соединения) станет ссылаться на порожденный экземпляр, а вызовы read() — write(), оперирующие с различным дескриптором, будут направляться соответствующим различным экземплярам. (Понятно, что такой параллелизм операций может обеспечить только многопоточный менеджер ресурса, но нужно еще «заставить» его сделать это.)
Это настолько часто используемая модель, что она заслуживает отдельного рассмотрения. Дополнительную сложность создает то обстоятельство, что мы, как уже отмечалось, договорились писать программный код на С++, а здесь нам предстоит переопределять из своего кода определения в заголовочных файлах менеджера ресурсов, не нарушая их C-синтаксис.
Ниже показан текст простейшего многопоточного менеджера (исключены даже самые необходимые проверки), ретранслирующего по нескольким каналам независимо получаемые текстовые строки (строки кода, принципиальные для обеспечения параллельности и многоканальности, выделены жирным шрифтом):
Подмена стандартного Open Control Block// предшествующие общие строки #include не показаны
// это переопределение нужно для исключения предупреждений
// компилятора: 'THREAD_POOL_PARAM_T' redefined
#define THREAD_POOL_PARAM_T dispatch_context_t #include <sys/dispatch.h>
// следующее переопределение принципиально важно.
// оно предписывает вместо стандартного блока OCB (open control block),
// создаваемого вызовом клиента open() и соответствующего его файловому
// дескриптору, использовать собственную структуру данных.
// Эта структура должна быть производной от стандартной
// iofunc_ocb_t, а определение должно предшествовать
// включению <sys/iofunc.h>
#define IOFUNC_OCB_T struct ownocb
#include <sys/iofunc.h>
class ownocb public iofunc_ocb_t {
static const int BUFSIZE = 1024;
public:
char *buf;
ownocb(void) { buf = new char[BUFSIZE]; }
~ownocb(void) { delete buf; }
};
IOFUNC_OCB_T *ownocb_calloc(resmgr_context_t *ctp, IOFUNC_ATTR_T *device) {
return new ownocb;
}
void ownocb_free(IOFUNC_OCB_T *o) { delete o; }
iofunc_funcs_t ownocb_funcs = {
_IOFUNC_NFUNCS, ownocb_calloc, ownocb_free
};
iofunc_mount_t mountpoint = { 0, 0, 0, 0, &ownocb_funcs };
// Вместо умалчиваемой операции iofunc_lock_ocb_default(),
// вызываемой перед началом обработки запросов чтения/записи
// и блокирующей атрибутную запись, мы предписываем вызывать
// "пустую" операцию и не блокировать атрибутную запись,
// чем обеспечиваем параллелизм.
static int nolock(resmgr_context_t *ctp, void *v, IOFUNC_OCB_T *ocb) {
return EOK;
}
// обработчик запроса чтения
static int line_read(resmgr_context_t *ctp, io_read_t *msg,
IOFUNC_OCB_T *ocb) {
if (strlen(ocb->buf) != 0) {
MsgReply(ctp->rcvid, strlen(ocb->buf) + 1, ocb->buf, strlen(ocb->buf) + 1);
strcpy(ocb->buf, "");
} else MsgReply(ctp->rcvid, EOK, NULL, 0);
return _RESMGR_NOREPLY;
}
// обработчик запроса записи
static int line_write(resmgr_context_t *ctp, io_write_t *msg,
IOFUNC_OCB_T *ocb) {
resmgr_msgread(ctp, ocb->buf, msg->i.nbytes, sizeof(msg->i));
_IO_SET_WRITE_NBYTES(ctp, msg->i.nbytes);
return EOK;
}
// имя, под которым регистрируется менеджер:
const char sResName[_POSIX_PATH_MAX + 1] = "/dev/wmng";
// старт менеджера ресурса
static void StartResMng(void) {
dispatch_t* dpp;
if ((dpp = dispatch_create()) == NULL)
perror("dispatch create"), exit(EXIT_FAILURE);
resmgr_attr_t resmgr_attr;
memset(&resmgr_attr, 0, sizeof resmgr_attr);
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
// статичность 3-х последующих описаний принципиально важна!
// (также они могут быть сделаны глобальными переменными файла):
static resmgr_connect_funcs_t connect_funcs;
static resmgr_io_funcs_t io_funcs;
static iofunc_attr_t attr;
iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,
_RESMGR_IO_NFUNCS, &io_funcs);
// переопределение обработчиков по умолчанию
io_funcs.read = line_read;
io_funcs.write = line_write;
io_funcs.lock_ocb = nolock;
iofunc_attr_init(&attr, S_IFNAM | 0666, NULL, NULL);
// через это поле осуществляется связь с новой
// структурой OCB.
attr.mount = &mountpoint;
if (resmgr_attach(dpp, &resmgr_attr, sResName, _FTYPE_ANY, 0,
&connect_funcs, &io_funcs, &attr) == -1)
perror("name attach"), exit(EXIT_FAILURE);
// создание пула потоков (многопоточность)
thread_pool_attr_t pool_attr;
memset(&pool_attr, 0, sizeof pool_attr);
pool_attr.handle = dpp;
pool_attr.context_alloc = dispatch_context_alloc;
pool_attr.block_func = dispatch_block;
pool_attr.handler_func = dispatch_handler;
pool_attr.context_free = dispatch_context_free;
pool_attr.lo_water = 2;
pool_attr.hi_water = 6;
pool_attr.increment = 1;
pool_attr.maximum = 50;
thread_pool_t* tpp;
if ((tpp = thread_pool_create(&pool_attr, POOL_FLAG_EXIT_SELF)) == NULL)
perror("pool create"), exit(EXIT_FAILURE);
thread_pool_start(tpp);
// к этой точке return управление уже никогда не подойдет...
}
int main(int argc, char *argv[]) {
// проверка, не загружен ли ранее экземпляр менеджера,
// 2 экземпляра нам ни к чему...
char sDirName[_POSIX_NAME_MAX + 1];
int nDirLen = strrchr((const char*)sResName, '/') - (char*)sResName;
strncpy(sDirName, sResName, nDirLen);
sDirName[nDirLen] = ' ';
DIR *dirp = opendir(sDirName);
if (dirp == NULL)
perror("directory not found"), exit(EXIT_FAILURE);
struct dirent *direntp;
while (true) {
if ((direntp = readdir(dirp)) == NULL) break;
if (strcmp(direntp->d_name, strrchr(sResName, '/') + 1) == 0)
cout << "second copy of manager" << endl, exit(EXIT_FAILURE);
}
closedir(dirp);
// старт менеджера
StartResMng();
// ... к этой точке мы уже никогда не подойдем...
exit(EXIT_SUCCESS);
}
В отличие от типового и привычного шаблона многопоточного менеджера, мы проделали здесь дополнительно следующее:
• Определили собственную структуру OCB, новый экземпляр которой должен создаваться для каждого нового подключающегося клиента:
class ownocb : public iofunc_ocb_t { ... };
• Переопределили описание структуры OCB, используемое библиотеками менеджера ресурсов:
#define IOFUNC_OCB_T struct ownocb
• Заполняя атрибутную запись устройства:
attr.mount = &mountpoint;
мы к точке монтирования «привязываем» функции создания и уничтожения вновь определенной структуры OCB (по умолчанию библиотека менеджера станет размещать только стандартный OCB):
iofunc_funcs_t ownocb_funcs = {
_IOFUNC_NFUNCS, ownocb_calloc, ownocb_free
};
iofunc_mount_t mountpoint = { 0, 0, 0, 0, &ownocb_funcs };
(_IOFUNC_NFUNCS — это просто константа, определяющая число функций и равная 2.)
• Определяем собственные функции размещения и уничтожения структуры OCB с прототипами:
IOFUNC_OCB_T* ownocb_calloc(resmgr_context_t*, IOFUNC_ATTR_T*);
void ownocb_free(IOFUNC_OCB_T *o);
В нашем случае это: а) интерфейс из C-понятия «создать-удалить», в C++ — «конструктор-деструктор» и б) именно здесь создается и инициализируется сколь угодно сложная структура экземпляра OCB.
• В функциях обработки запросов клиента (операций менеджера) мы позже будем в качестве 3-го параметра вызова обработчика получать указатель именно того экземпляра, для которого требуется выполнить операцию, например:
int read(resmgr_context_t*, io_read_t*, IOFUNC_OCB_T*) {...}
Дополнительно мы проделываем еще один трюк, запрещая менеджеру блокировать атрибутную запись устройства при выполнении операций (что он делает по умолчанию; для реальных устройств это резонно, но для программного псевдоустройства это, как правило, не является необходимым). Для этого:
• В таблице операций ввода/вывода переназначаем функцию-обработчик операции блокирования атрибутной записи:
io_funcs.lock_ocb = nolock;
• В качестве такого обработчика предлагаем «пустую» операцию:
static int nolock(resmgr_context_t*, void*, IOFUNC_OCB_T*) {
return EOK;
}
Запустим менеджер и проверим, как происходит его установка в системе: