Саммерфилд - Программирование на Python 3 (1077331), страница 107
Текст из файла (страница 107)
Пакет предоставляет несколько способов передачи данных между процессами, включая применение очереди, которая может использоваться для распределения работы между процессами точно так же, как очередь диезе. Овесе использовалась для распределения заданий между потоками. Главное преимущество версии программы, в которой для обработки запускаютея отдельные процессы, состоит в том, что на многоядерных процессорах она потенциально способна обеспечить более высокую производительность, поскольку процессы могут выполняться на всех доступных ядрах. Сравните зто со стандартным интерпретатором РуФЬоп (написанным на языке С и иногда называемым СРу!Ьоп), в котором имеется глобальная блокировка интерпретатора (С1оЬа1 1п!егрге!ег Ьос)с, С1Ь), что означает, что в каждый конкретный момент времени может выполняться только один поток программного кода Ру!Ьоп.
Это ограничение является особенностью конкретной реализации и необязательно применяется в других реализациях интерпретатора, таких как ау!Ьоп.' Краткое объяснение, для чего применяется глобальная блокировка интерпретатора, приводится пс адресу: шшш рутьоп.агу/т/ос//ад/11ьгагу/агап-1- шеует-гЫ-ар!бе-у1оЬа1-т1вгрге1ег-1ссу н йссв рутбся.сгу/ар1/1Ьгеабв.Ыт1. Делегирование работы потокам выполнения 479 Пример: многопоточная программа поиска дубликатов файлов Пример второй многопоточной программы по своей структуре напоминает первую, но она более сложная.
В ней используются две очереди— одна для заданий и вторая для результатов„а кроме того, имеется отдельный поток обработки результатов, который выводит их по мере поступления. Кроме того, демонстрируется прием создания подкласса класса сг1геаб!пд.тг1геаб и создание потока вызовом функции спгеабспд. Тлгеаб(), а также используется блокировка для упорядочения доступа к совместно используемым данным (тип б!сС). Программа ((лг)т)ир((сатез-Сру является расширенной версией программы ЛлЫиргру„которая приводилась в главе б. Она выполняет итерации по всем файлам в текущем каталоге (или в каталоге по указанному пути) и рекурсивно выполняет обход подкаталогов.
Она сравнивает размеры всех файлов с одинаковыми именами (точно так же, как программа г!лс)дир.ру); для файлов с одинаковыми именами и одинаковыми размерами она вычисляет контрольную сумму по алгоритму МРб (Мевваяе Р!яевС), чтобы убедиться, что файлы действительно являются идентичными, и сообщает о таких файлах.
Сначала мы рассмотрим функцию васо(), разделив ее на четыре части: бет еа!и(): орСз, раСП = рагзе ортсопя() баса = со11есс!опя.бегао1сбтос(1сяс) гог грос, б!гз, гс1ез сп оз.на1к(расо): Гог Гс1епаае сп ГС1ез: гоппапе = оз.расо.]оса(гоос, г!1епаее) Сгт: кеу = (оз.расо.десз!Се(го11паае), г!1епаее) ехоерт Епр!гопаепСЕггог: сор!слое сГ Кет(0] == О: сопССпие бата(аеу].аррепб(ГоИпаае) Каждый ключ словаря со значениями по умолчанию бата представляет собой кортеж из двух элементов (размер и имя файла), где имя файла не включает путь к нему, а значение представляет собой список имен файлов (с путями к ним). Любой элемент, у которого в списке имеется более одного имени файла, потенциально может содержать дубликаты. Словарь заполняется в процессе итераций по всем файлам в заданном пути, при этом пропускаются все файлы, для которых невозможно получить размер (возможно, из-за отсутствия необходимых прав доступа или потому что они не являются обычными файлами), а также файлы, имеющие нулевой размер (поскольку все файлы нулевого размера можно считать идентичными).
480 Глава 9. Процессы и потоки аогк циеие = циеие.рыог!тудиеие() геви1тв циеие = циеие.диеие() ас5 Ггоа Гт!епвае = () гог т тп гапде(орте.соил!): пиаьег = "(О): ".гогавт(! + 1) тг орте,сеьид е1ве "" аогкег = когкег(аогк пиесе, ао5 ггоа гт1епаае, геви1(в циеие, пиаЬег) аогКег.саеаоп = Тгие аогкег.втагт() Собрав исходные данные, можно приступать к созданию рабочих потоков. Но сначала создается очередь заданий и очередь результатов.
Очередь заданий представляет собой очередь с приоритетами, поэтому она всегда возвращает элемент с наименьшим значением приоритета (или, в нашем случае, файлы с наименьшими размерами). Затем создается словарь, в котором каждый ключ — имя файла (включая путь к нему), а значение — контрольная сумма МРб. Назначение словаря состоит втом, чтобы избежать повторного вычисления контрольной суммы для одного и того же файла (поскольку эта процедура является достаточно дорогостоящей).
Создав коллекцию для хранения совместно используемых данных, функция выполняет количество циклов, соответствующее количеству создаваемых потоков (по умолчанию семь циклов). Класс Иогкег напоминает одноименный класс, созданный в предыдущем примере, только на этот раз его экземплярам передаются две очереди и словарь контрольных сумм МТ)б. Как и прежде, каждый рабочий поток немедленно запускается на выполнение и каждый из них тут же блокируется, пока в очереди не появятся свободные задания.
геви1тв тпгеао = тлгеастпд.тогеас( тагдет=1ааЬОа: ргтп! геви1тв(геви1тв пиесе)) геви1тв тлгеас.саеаоп = Тгие геви!тв тлгеао.втвгт() Вместо создания отдельного подкласса тлгеао!пд, Тлгеаа для обработки результатов мы создали функцию, которая передается функции тлгеас!пд.тлгеае(). Эта функция возвращает отдельный поток, который вызовет указанную функцию сразу же после запуска. Мы передаем функции очередь с результатами (которая, конечно же, пока пустая), поэтому поток тут же оказывается заблокированным. К этому моменту мы создали все рабочие потоки и поток обработки результатов, причем все они оказались заблокированы в ожидании появления заданий.
гог вые, гт1епаае тп вогтео(овтв): аваев = пата[в!ге, г11епаае) тт 1еп(пааев) > П аогк циеие,рит((в!ве, пваев)) Делегирование работы потокам выполнения 481 нога циеие.)о(п() геви1тв пиесе.)атп() Теперь выполняются итерации по элементам словаря бата, и для всех кортежей, состоящих нз двух элементов (размер, имя), для которых имеется список из двух или более потенциальных дубликатов файлов„ размер и имена файлов с путями добавляются в виде элементов в очередь заданий. Поскольку очередь является экземпляром класса из модуля циеие, нам не нужно беспокоиться о блокировках. В заключение выполняется присоединение к очереди заданий и к очереди с результатами, чтобы заблокировать главный поток программы до того момента, пока обе очереди не опустеют. Тем самым гарантируется, что программа будет продолжать работать, пока не будут выполнены все задания и не будут выведены все результаты, после чего программа завершает свою работу.
иег ргали геви1гв(геви1тв циеие): нш1е Тгие: тгу: геви1(в = геви1тв циеие.дет() 1( геви1тв: рг(пт(геви1(в) ттпа11у: геви!тв циеие.тавя ропе() Эта функция передается в виде аргумента функции та геаб(пд, Тпгеаб() и вызывается, когда запускается данный поток. Функция выполняет бесконечный цикл, потому что она используется в потоке, который работает в режиме демона. Эта функция просто извлекает результаты (многострочный текст) и выводит непустые строки, пока очередь с результатами не опустеет. Начало определения класса у)огкег похоже на определение одноимен- ного класса, который приводился выше: с1авв Иогяег(тпгеабтпд.тпгеаб): Мб5 1осК = тлгеабтпд.(ося() бет тп11 (ве1(, ногк пиесе, аб5 тгоа (11епаае, геви1тв циеие, пиаЬег): вирег(). тптт () ве)т.ногМ пиесе = аогк пиесе ве1(.аб5 тгоа (11епаае = аб5 Ггоа Г11епаае ве1(.геви1тв циеие = геви1тв пиесе ве1(,пиаЬег = пиаЬег бег гип(ве1(): ип11е Тгие: тгу: вые, пааев = ве1(.нога циеие.дет() ве1(.ргосевв(вые, пааев) Глава 9.
Процессы и потоки гтпаПу: ве1(,ногк овесе.тавк оспе() Различия заключаются в том, что в этой версии у нас имеется больше совместно используемых данных и наша функция ргосевв() вызывается с другими аргументами. Нам не требуется беспокоиться об организации доступа к очередям, так как они гарантируют упорядочение доступа; при обращении к другим данным, в данном случае — к словарю вэ5 ггоз т11епаве, мы должны управлять доступом, используя блокировку.
Мы сделали блокировку атрибутом класса, потому что нам требуется, чтобы все экземпляры класса У(от де г использовали одну и ту же блокировку, то есть когда один экземпляр получает блокировку, все остальные экземпляры при попытке получить ее блокируются. Теперь рассмотрим функцию р госева(), разделив ее на две части: Сет ргосевв(ве1(, в(ге, (11епааев). по5в = со11ест1опв.сетаи1тстст(вет) Гог Гт1епаае 1п тт1епавев: нттп ве1(.М05 1осх: ас5 = ве1(.ес5 Ггоа (11епапе.зет(тт1епаае, Мосе) тт ао5 тв пот попе: ао5в[ес5].аоо(тт1епаее) е)ве: тгу: ас5 = Пава)1Ь.ас5() н11П ореп(тт1епаае, "гЬ") ав тл: а05.орсате(тл.геас()) ао5 = ао5.О1ревт() ао5в[ас5].асс(111епаае) н11Ь ве1(.МО5 1осрл ве1(.по5 (гоп 111епаае[(11епаее] = п05 ехсерт ЕпутгопеептЕггог: солт!псе Независимо от того, получаем ли мы доступ к словарю вс5 т гон г11епазе для чтения или для записи, мы делаем это в контексте блокировки.