45817 (665161), страница 2
Текст из файла (страница 2)
Почему, если закомментировать тело APCProc, на экран выводится следующее?
Thread id is 0x7b4 0 1 2 3 4 |
Так как теперь эта процедура фактически ничего не делает, система не успевает добавить новый запрос в очередь до завершения обработки предыдущего, так что каждый SleepEx обрабатывает «свой» APC-запрос.
Теперь вам должно быть понятно, как использовать данный механизм для организации пула потоков. Вот примерный сценарий для фиксированного количества потоков в пуле: в главном потоке приложения создаются несколько рабочих потоков, каждый из которых сразу переходит в состояние тревожного ожидания специально созданного основным потоком события. Когда приходит клиентский запрос, главный поток передает APC-запрос одному из рабочих потоков. Рабочий поток пробуждается и выполняет функцию, поставленную в очередь главным потоком. При этом он не покидает функции WaitForSingleObjectEx. То есть выполнение APC-запроса производится как бы внутри функции WaitForSingleObjectEx. После завершения выполнения запроса управление передается функции WaitForSingleObjectEx, которая, в свою очередь, передает управление основному коду потока, возвращая WAIT_IO_COMPLETION.
При получении управления рабочий поток должен проанализировать значение, возвращенное этой функцией. Если оно равно WAIT_IO_COMPLETION, то причиной выхода из функции WaitForSingleObjectEx было завершение обработки APC-запроса – поток при этом должен снова перейти в состояние ожидания события. Если же возвращается значение WAIT_OBJECT_0, то причиной выхода была установка события в сигнальное состояние главным потоком приложения. При этом рабочий поток должен завершиться.
Это очень простая схема (например, рабочие потоки вместо ожидания могут выполнять какую-то другую полезную работу), но она довольно неплохо объясняет механизм использования APC для организации пула.
SetWaitableTimer
Эта функция появилась с версии 4.0. Она позволяет активировать таймер, который через заданный период времени в 100-наносекундных интервалах или при наступлении заданного абсолютного времени переходит в сигнальное состояние. Кроме этого, можно указать процедуру завершения, которая будет вызвана с помощью APC-запроса в данном потоке. Для процедуры завершения можно указать дополнительный параметр. Функция хороша тем, что она не привязана к окнам и циклу выборки сообщений, как, например, SetTimer. С ее помощью можно использовать таймеры в любых приложениях, включая консольные и сервисы. Однако у SetWaitableTimer есть и некоторые недостатки:
функция завершения всегда вызывается в потоке, вызвавшем SetWaitableTimer;
поток должен быть в состоянии тревожного ожидания, чтобы обработать APC-запрос;
второй APC-запрос начнет обрабатываться только после окончания обработки предыдущего запроса, то есть запросы обрабатываются последовательно.
Все эти проблемы решает объект "очередь таймеров", о котором речь пойдет позже.
Порт завершения ввода/вывода
Это, безусловно, один из самых мощных и сложных объектов исполнительной системы. Он специально предназначен для оптимизации обработки клиентских запросов в серверных приложениях. Он не только организует очередь запросов, но и эффективно управляет их обработкой.
Основная идея порта завершения ввода/вывода (в дальнейшем просто порта) состоит в том, чтобы эффективно расходовать процессорное время при обработке клиентских запросов. Обработка должна вестись параллельно несколькими потоками, но строго до определенного момента, когда число потоков будет равняться максимальному значению. После этого новые потоки перестают создаваться, а запросы ставятся в очередь к существующим потокам. Давайте разберемся в этом поподробнее.
Как работает порт
При создании порта указывается максимальное количество активных потоков, способных обрабатывать клиентские запросы параллельно. Так как количество реально работающих параллельно потоков на компьютере равно количеству процессоров, то указание большего максимального количества активных потоков не выгодно. Почему? Дело в том, что для исполнения нескольких потоков на одном процессоре, системе приходится постоянно переключать процессор между потоками, эмулируя, таким образом, параллельность, однако это переключение, называемое переключением контекстов – довольно дорогая операция. Избежать ее можно только одним способом – не создавать параллельно работающие потоки в количестве большем, чем число процессоров. Таким образом, при создании порта, казалось бы, нужно указывать в качестве максимального количества активных потоков число процессоров в системе, но здесь есть одна тонкость. Допустим, у нас однопроцессорный компьютер и, соответственно, клиентские запросы мы обрабатываем в одном потоке. Что будет, если клиентский запрос придет в момент выполнения синхронной операции с диском или в момент ожидания какого-либо объекта этим потоком? Он будет ждать, пока поток не закончит свою работу, но ведь процессор в это время бездействует, потому что поток заблокирован на синхронной операции или на каком-либо объекте. Когда процессор бездействует, а клиентский запрос не обрабатывается – это плохо. Мы приходим к выводу о том, что всегда должен существовать резервный поток, который подхватывал бы запросы в момент, когда «основной» поток выполняет блокирующие операции, и процессор бездействует.
Работа с файлами (в самом широком смысле слова) очень тесно связана с многопоточностью и обработкой запросов на сервере. Сокет или pipe – это тоже файлы. Чтобы обрабатывать запросы через эти каналы параллельно, нужен порт. Давайте рассмотрим функцию создания порта и связи его с файлом (зачем-то разработчики из Microsoft объединили две эти функции в одну; в исполнительной системе эти две функции выполняют сервисы NtCreateIoCompletion и NtSetInformationFile, соответственно).
HANDLE CreateIoCompletionPort ( HANDLE FileHandle, // хендл файла HANDLE ExistingCompletionPort, // хендл порта завершения ввода/вывода ULONG_PTR CompletionKey, // ключ завершения DWORD NumberOfConcurrentThreads // максимальное число параллельных потоков ); |
Для простого создания порта нужно в качестве первого параметра передать INVALID_HANDLE_VALUE, а в качестве второго и третьего – 0. Для связывания файла с портом нужно указать первые три параметра и проигнорировать четвертый.
После того, как файл (под файлом здесь подразумевается объект подсистемы Win32, который реализуется с помощью объекта "файл исполнительной системы", к таковым относятся файлы, сокеты, почтовые ящики, именованные каналы и проч.) связан с портом, окончания всех асинхронных запросов ввода/вывода попадают в очередь порта и могут быть обработаны пулом потоков. Следующие функции могут быть использованы с портом завершения для обработки асинхронных операций ввода/вывода:
ConnectNamedPipe – ожидает подключения клиента к именованному каналу.
DeviceIoControl – низкоуровневый ввод/вывод.
LockFileEx – блокировка региона файла.
ReadDirectoryChangesW – ожидание изменений в директории.
ReadFile – чтение файла.
TransactNamedPipe – Комбинированное чтение и запись по именованному каналу, осуществляемые за одну сетевую операцию.
WaitCommEvent – ожидание события последовательного интерфейса (СОМ-порт).
WriteFile – запись в файл.
Если вы не хотите, чтобы окончание асинхронного ввода/вывода обрабатывалось портом (например, когда вам не важен результат операции), нужно использовать следующий трюк [1]. Нужно установить поле hEvent структуры OVERLAPPED равным описателю события с установленным первым битом. Делается это примерно так:
OVERLAPPED ov = {0}; ov.hEvent = CreateEvent(...); ov.hEvent = (HANDLE)((DWORD_PTR)(ov.hEvent) | 1); |
И не забывайте сбрасывать младший бит при закрытии хендла события.
Добавлять поток к пулу (подключать его к обработке запросов) можно с помощью следующей функции:
BOOL GetQueuedCompletionStatus( // хендл порта завершения ввода/вывода HANDLE CompletionPort, // количество переданных байт LPDWORD lpNumberOfBytes, // ключ завершения PULONG_PTR lpCompletionKey, // структура OVERLAPPED LPOVERLAPPED *lpOverlapped, // значение таймаута DWORD dwMilliseconds ); |
Эта функция блокирует поток до тех пор, пока порт не передаст потоку пакет запроса или не истечет таймаут.
Поместить пакет запроса в порт можно с помощью функции PostQueuedCompletionStatus.
BOOL PostQueuedCompletionStatus( HANDLE CompletionPort, // хендл порта завершения ввода/вывода DWORD dwNumberOfBytesTransferred, // количество переданных байт ULONG_PTR dwCompletionKey, // ключ завершения LPOVERLAPPED lpOverlapped // структура OVERLAPPED ); |
Пакет запроса не обязательно должен быть структурой OVERLAPPED или производной от нее [2].
Давайте соберем всю информацию воедино. Порт завершения – объект, организующий несколько очередей из клиентских запросов и потоков, их обрабатывающих. Поток добавляется в очередь ожидающих запрос потоков порта при вызове функции GetQueuedCompletionStatus. При поступлении запроса порт разблокирует первый поток в очереди ждущих потоков и передает ему этот запрос (в виде структуры OVERLAPPED и ключа завершения). Поток при этом перемещается в очередь активных потоков (число активных потоков увеличивается на 1). Предположим, у нас максимальное число активных потоков равно 1, тогда при поступлении следующего запроса другой поток из очереди ожидающих активирован не будет. После обработки клиентского запроса поток вновь вызывает GetQueuedCompletionStatus и ставится в начало списка ожидающих потоков. Почему поток ставится именно в начало списка? Дело в том, что потоки берутся из начала списка, и при низкой активности могут использоваться не все потоки. При этом стеки и контексты не используемых потоков могут быть выгружены на диск за ненадобностью.
Если в процессе обработки запроса поток обратился к блокирующей функции, число активных потоков уменьшается на 1, как если бы поток перешел снова в очередь ожидающих потоков. Это дает возможность при приходе следующего клиентского запроса задействовать следующий поток из очереди ожидающих. Когда первый поток закончит блокирующую операцию, число активных потоков превысит максимальное, и при следующем вызове функции GetQueuedCompletionStatus один из этих потоков заблокируется, а второй получит пакет запроса (если он имеется).
Очередь | Запись добавляется при: | Запись удаляется при: |
Список устройств, ассоциированных с портом | вызове CreateIoCompletionPort | закрытии хенда файла |
Очередь клиентских запросов (FIFO) | завершении асинхронной операции файла, ассоциированного с портом, или вызове функции PostQueuedCompletionStatus | передаче портом запроса потоку на обработку |
Очередь ожидающих потоков | вызове функции GetQueuedCompletionStatus | начале обработки клиентского запроса потоком |
Список работающих потоков | начале обработки клиентского запроса потоком | вызове потоком GetQueuedCompletionStatus или какую-либо блокирующей функции |
Список приостановленных потоков | вызове потоком какой-либо блокирующей функции | выходе потока из какой-либо блокирующей функции |
Таблица 1. Список очередей порта завершения ввода/вывода [1].
Недокументированные возможности порта и его низкоуровневое устройство
Как всегда это бывает у Microsoft, порт завершения обладает многими недокументированными возможностями:
У порта завершения ввода/вывода может быть имя, и соответственно, он доступен для других процессов. Совершенно непонятно, почему разработчики решили скрыть эту, на мой взгляд, нужную особенность порта. Имя можно задать в параметре ObjectAttributes функции NtCreateIoCompletion.
Вторая особенность вытекает из первой: с портом может быть связан дескриптор безопасности, который также задается в параметре ObjectAttributes функции NtCreateIoCompletion.