Основы параллельных вычислений (1119586), страница 2
Текст из файла (страница 2)
Создание потока осуществляется с помощью функцииpthread_create(pthread_t * Id, const pthread_attr_t * Attr, void *(* Func) (void *), void * Arg)Каждый поток имеет свой идентификатор (или попросту уникальный номер). Он возвращается этой функцией в первом аргументе — указатель на переменную Id. Тип данных pthread_t есть просто целое число. Второйпараметр функции указывает, с какими атрибутами создавать поток. Нам вполне хватит атрибутов по умолчанию, поэтому, как учил Борисенко, вторым аргументом передаём всегда нуль. Третий аргумент — указательна функцию потока, то есть на ту самую функцию, которую мы хотим запустить (там нужно писать простоимя функции, без всякого оператора адреса &).
А вот четвёртый параметр — это указатель, который будетпередан потоковой функции. Поясним, зачем он нужен. Обычно нужно бывает передавать в функцию какие-топараметры. Сколько их — мы заранее не знаем, а заголовок функции потока фиксированный:void* Func(void* Arg).Но это не означает, что в потоковую функцию можно передать всего один параметр, да и то фиксированного типа.
Можно передать туда указатель на некоторую структуру данных (struct) или класс, если писатьпрограмму на C++. А в эту структуру можно напихать те самые параметры, которые нам нужно передать.Чтобы всё это стало понятнее, разберём простейший пример, в котором поток выводит на экран два числа,передаваемых ему извне. Разумеется, мы опустим здесь строчки #include, чтобы не загромождать текст:class CData { public: int a,b; }; // Класс с параметрамиvoid * Func(void * Arg) // Потоковая функция{CData* D = (CData *) Arg; // Преобразуем указатель к нашему типу CDataprintf("Hello form thread! Data: %d, %d\n", D->a, D->b);return 0; // Завершаем поток}int main(){pthread_t ID; // Номер потокаCData D;D.a = 100; D.b = 200; // Заполняем структуру даннымиpthread_create(&ID, 0, Func, &D); // Создаем потокpthread_join(ID, 0); // Ждем завершения потока с номером ID (т.е.
функции Func)3return 0;}Происходит здесь следующее: функция создания потока запускает функцию Func, но выполняется она ужепараллельно с функцией main(). В комментариях нуждается функция pthread_join, которая ждёт завершенияпотока с номером ID, и лишь тогда, когда оно произойдёт, выполнение функции main продолжится. Это и естьта самая синхронизация, о которой речь шла в самом начале данного опуса. Если бы мы не стали ждать, тосработал бы return 0, и мы (возможно) не увидели бы никаких результатов работы на экране.
Интересно бываетсоздать несколько одинаковых потоков в цикле и последить за тем, в каком порядке они завершатся. Вообщеговоря, этот порядок может быть произвольным; ни в коем случае нельзя полагать, что то, что запущено раньше,обязано закончиться раньше. Глюки, связанные с ошибками подобного рода, очень тяжело ловить, так как онимогут непредсказуемо появляться и пропадать при многократном запуске программы.
По этим же причинамони практически не поддаются отладке (а всё, что работает в режиме реального времени, и без того трудноотлаживать).Остаётся только пояснить, кто такие аргументы функции pthread_join. Первый параметр — это простономер потока, завершения которого мы ждём. Второй аргумент является адресом переменной типа void*, вкоторую кладётся результат работы функции Func. В нашем случае он нам не нужен, поэтому передаём тудануль.3.2. Более содержательный примерА теперь можно написать нечто более разумное, чем просто вывод пары чисел на экран.
Давайте напишемпрограмму, которая будет. . . ну, скажем, перемножать две огромные квадратные матрицы. Тут, с одной стороны, всё просто с математической точки зрения, но в то же время весьма интересно с плане параллельныхвычислений.Давайте сначала поймём, что́, собственно говоря, тут можно выполнять параллельно. Как известно, каждыйэлемент произведения есть скалярное произведение строки и столбца, т. е. его значение не зависит от значенийостальных элементов произведения.
А это как раз то, чего мы хотим: каждый элемент можно вычислятьн е з а в и с и м о! Более того, каждое такое скалярное произведение (а их всего n2 ) будет вычислятьсяпримерно одинаковое время — там будет одинаковое число операций. А это уже совсем здорово: мы не потеряемни одной лишней доли секунды на синхронизацию! Однако если мы будем умножать матрицы порядка n == 1000, то создавать миллион потоков. . . нет, это перебор. Тут система просто задохнётся, и никакого выигрышапо скорости мы не получим. Тогда давайте объединим несколько строк, пусть каждый поток работает наднесколькими строчками результирующей матрицы. Кстати, а почему именно строк, а не столбцов? Это крайневажный вопрос, связанный с представлением массивов в памяти и таким понятием, как кэширование (caching)данных.
Он заслуживает отдельной главы в этом документе. Но пока пусть читатель поверит в то, что построчкам будет быстрее, и приступим к реализации алгоритма.Пусть размер матриц равен n и мы хотим использовать T потоков. Поделим n на T с остатком. Если намповезло, и оно разделилось нацело — просто запускаем T потоков (вызываем pthread_create(...) T раз),запоминая их идентификаторы в массиве, а потом синхронизируем их, вызывая в цикле pthread_join — сноваT итераций. Ну а если не разделилось нацело, тогда оставшиеся строчки нужно обработать в отдельном потоке.Вот, в сущности, и всё. Очевидно, что нужно сделать структуру с двумя полями, одно из которых — начальнаяобрабатываемая потоком строка, а второе — конечная обрабатываемая строка.
При создании каждого потокавычисляются те номера строк, которые он должен обработать, записываются в структуру, а потом указательна эту структуру передаётся в pthread_create — точно так же, как и в первой программе.class CThreadData // структура данных для потока{public:pthread_t id;double * a; // первая матрицаdouble * b; // вторая матрицаdouble * r; // результат умноженияint n; // total size of matrixint cs; // start rowint ce; // end row};// Потоковая функцияvoid * Thread(void * Ptr) // вычисляет строки матрицы R=A*B от cs до ce{4CThreadData * ThreadData = (CThreadData *) Ptr;int cs = ThreadData->cs; // startint ce = ThreadData->ce; // enddouble * a = ThreadData->a;double * b = ThreadData->b;double * r = ThreadData->r;int n = ThreadData->n;for (int i = cs;for (int j ={double sfor (k =r[i*n+j]}return 0;col < ce; col++) // цикл по строкам от cs до ce0; j < n; j++) // цикл по строке= 0.0;0; k < n; i++) s += a[i*n+k]*b[k*n+j];= s;}// Процедура параллельного умножения матриц r = a*b// a, b, r -- указатели на матрицы, n - размер.// ThreadCount -- число потоковvoid Multiply(int ThreadCount, double * a, double * b, double * r, int n){// массив с данными для потоковCThreadData * T = new CThreadData[ThreadCount];int FullThreads;int Rest = n % ThreadCount;if (!Rest) FullThreads = ThreadCount; else FullThreads = ThreadCount-1;int ColsPerThread = 0;if (FullThreads)ColsPerThread = TotalCols/FullThreads; // cols per thread// запускаем полные потокиfor (int i = 0; i < FullThreads; i++){T[i].a = a;T[i].b = b;T[i].r = r;T[i].n = n;T[i].cs = i*ColsPerThread;T[i].ce = (i+1)*ColsPerThread;pthread_create(&(T[i].id), 0, Thread, &T[i]);}// остаток (если есть) -- если не поделилось нацелоif (Rest){T[FullThreads].a = a;T[FullThreads].b = b;T[FullThreads].r = r;T[FullThreads].n = n;T[FullThreads].cs = FullThreads*ColsPerThread;T[FullThreads].ce = n;pthread_create(&(T[FullThreads].id), 0, Thread, &T[FullThreads]);}// ждем, пока идет подсчётfor (int i = 0; i < ThreadCount; i++) pthread_join(T[i].id, 0);}Процедура Multiply(...) предполагает, что память под все матрицы уже выделена.
Тут, правда, в структуре получилось не два поля, а больше. Если не использовать глобальных переменных, то нужно в структурузагнать ещё и все указатели на матрицы, а также их размер. Теперь обсудим схему работы процедур. Главная5процедура Multiply(...) получает на вход три матрицы и их размер, а также количество потоков. Делим n наэто количество. Поделилось — отлично, тогда FullThreads совпадает с ThreadCount, т. е.
все потоки «полные» —они вычисляют одинаковое число строк. А если не поделилось, тогда полных будет на один меньше, а последнийпоток вычислит все оставшиеся столбцы. В первом цикле заполняется структура для i-го потока, а потом онсоздаётся, и в него передаётся соответствующая структура — элемент массива типа CThreadData. Далее, еслиесть остаток, запускаем последний поток, а затем в цикле ждём, пока они сработают. Вот и всё.Конечно, можно было бы указатели на матрицы не записывать в каждый элемент массива, а сделать однуобщую структуру, в которой будет число n и три указателя на матрицы (это еще одна обоснование того, зачемвообще нужны указатели).
Но для примера сойдёт и так.4. Ещё три примера4.1. Параллельные отраженияИз названия раздела очевидным образом следует, что речь пойдёт о многопоточной реализации алгоритмавычисления обратной матрицы по методу отражений. Тут всё почти так же, как и в предыдущем примере. Нашипотоки будут осуществлять умножение матрицы системы на матрицу U (x). Точнее говоря, не всей матрицы A, анекоторой части её столбцов.
Структура данных для потока хранит указатель на матрицу, указатель на векторx, от которого зависит матрица U , номер шага k и общую размерность системы n, а также начальный столбецcs (Column Start) и конечный столбец ce (Column End). Разумеется, для всех потоков отличаются только двапоследних элемента, а k меняется только при переходе к новому шагу.Для сокращения объёма здесь приводится только процедура вычисления. Предполагается, что S_Reflect()получает на вход матрицу A, указатель на A−1 , их размерность n и ненулевое количество потоков.#define A(i,j) a[(i)*n+j]#define AI(i,j) ai[(i)*n+j]class CThreadData // структура данных для потока{public:pthread_t id;double * m; // matrix pointerdouble * x; // vector xdouble * r; // matrix pointerint n; // total size of matrixint k; // stepint cs; // start colint ce; // end col};void * Thread(void * Ptr) // умножает столбцы матрицы m от cs до ce{CThreadData * ThreadData = (CThreadData *) Ptr;int cs = ThreadData->cs; // startint ce = ThreadData->ce; // enddouble * m = ThreadData->m;double * x = ThreadData->x;int k = ThreadData->k;int n = ThreadData->n;for (int col = cs; col < ce; col++) // cycle through cols of A{double dp = 0.0;for (int i = k; i < n; i++) dp += x[i]*m[i*n+col];dp *= 2.0; for (int i = k; i < n; i++) m[i*n+col] -= x[i]*dp;}return 0;}// Вычисление обратной матрицы методом отраженийbool S_Reflect(int ThreadCount, double * a, int n, double * ai){double * x = new double[n];// vector X6// Данные для потоков (массив структур)CThreadData * T = new CThreadData[ThreadCount];int FullThreads, iFullThreads; // число ’полных’ потоков// Вычисляем параметры потоков для обратной матрицыint iRest = n % ThreadCount;if (!iRest) iFullThreads = ThreadCount; else iFullThreads = ThreadCount-1;int iColsPerThread = 0;if (iFullThreads) iColsPerThread = n / iFullThreads; // количество столбцов на один потокdouble s, t, Nx, Na; // Norm of vector// делаем обратную матрицу единичнойfor (int i=0;i<n;i++) for (int j=0;j<n;j++) ai[i*n+j]=(i==j);// Шаги алгоритмаfor (int k = 0; k < n-1; k++){// вычисляем вектор x и его нормуs = 0.0;for (int i = k+1; i < n; i++) { x[i] = A(i,k); s += x[i]*x[i]; }t = A(k,k);Na = sqrt(t*t + s); // norm of vector a_1t -= Na;x[k] = t; // x[k] = a_1 - |a_1|*e_1Nx = sqrt(t*t+s);if (Nx < eps) return false;Nx = 1.0 / Nx; for (int i=k;i<n;i++) x[i] *= Nx;CD.k = k; // устанавливаем номер шага// теперь x содержит то, что нужно.