Подтвердить что ты не робот

Отправка блоков 2D-массива в C с использованием MPI

Как вы отправляете блоки двухмерного массива на разные процессоры? Предположим, что размер 2D-массива составляет 400x400, и я хочу отправить блоки размером 100X100 на разные процессоры. Идея состоит в том, что каждый процессор будет выполнять вычисления на своем отдельном блоке и отправлять результат обратно первому процессору для конечного результата.
Я использую MPI в программах C.

4b9b3361

Ответ 1

Позвольте мне начать с того, что вы вообще не хотите этого делать - разбросайте и соберите огромные куски данных из какого-то "мастер-процесса". Как правило, вы хотите, чтобы каждая задача отвлекалась на свой кусочек головоломки, и вы должны стремиться к тому, чтобы ни один процессор не нуждался в "глобальном представлении" всех данных; как только вы это потребуете, вы ограничите масштабируемость и размер проблемы. Если вы делаете это для ввода-вывода - один процесс считывает данные, затем рассеивает его, а затем собирает обратно для записи, вам нужно в конечном итоге просмотреть MPI-IO.

Однако, если у вас есть вопрос, у MPI есть очень хорошие способы вытащить произвольные данные из памяти, а также разбросать/собрать его и из набора процессоров. К сожалению, это требует значительного количества концепций MPI - типов MPI, экстентов и коллективных операций. В ответе на этот вопрос обсуждаются многие основные идеи - MPI_Type_create_subarray и MPI_Gather.

Обновление. В холодном свете дня это много кода, а не много объяснений. Поэтому позвольте мне немного расшириться.

Рассмотрим 1d целочисленный глобальный массив, для задачи 0 которого вы хотите распространять несколько задач MPI, чтобы каждый из них получал кусок в своем локальном массиве. Скажем, у вас есть 4 задачи, а глобальный массив [01234567]. У вас может быть задача 0 отправить четыре сообщения (в том числе один на себя), чтобы распределить это, и когда придет время для повторной сборки, получите четыре сообщения, чтобы связать их вместе; но это, очевидно, занимает много времени при большом количестве процессов. Существуют оптимизированные процедуры для этих видов операций - операции разбрасывания/сбора. Итак, в этом случае вы делаете что-то вроде этого:

int global[8];   /* only task 0 has this */
int local[2];    /* everyone has this */
const int root = 0;   /* the processor with the initial global data */

if (rank == root) {
   for (int i=0; i<7; i++) global[i] = i;
}

MPI_Scatter(global, 2, MPI_INT,      /* send everyone 2 ints from global */
            local,  2, MPI_INT,      /* each proc receives 2 ints into local */
            root, MPI_COMM_WORLD);   /* sending process is root, all procs in */
                                     /* MPI_COMM_WORLD participate */

После этого данные процессоров будут выглядеть как

task 0:  local:[01]  global: [01234567]
task 1:  local:[23]  global: [garbage-]
task 2:  local:[45]  global: [garbage-]
task 3:  local:[67]  global: [garbage-]

Таким образом, операция рассеяния принимает глобальный массив и отправляет смежные 2-целые фрагменты всем процессорам.

Чтобы повторно собрать массив, мы используем операцию MPI_Gather(), которая работает точно так же, но наоборот:

for (int i=0; i<2; i++) 
   local[i] = local[i] + rank;

MPI_Gather(local,  2, MPI_INT,      /* everyone sends 2 ints from local */
           global, 2, MPI_INT,      /* root receives 2 ints each proc into global */
           root, MPI_COMM_WORLD);   /* recv'ing process is root, all procs in */
                                    /* MPI_COMM_WORLD participate */

и теперь данные выглядят как

task 0:  local:[01]  global: [0134679a]
task 1:  local:[34]  global: [garbage-]
task 2:  local:[67]  global: [garbage-]
task 3:  local:[9a]  global: [garbage-]

Gather возвращает все данные, а здесь 10, потому что я не думал, что мое форматирование достаточно тщательно при запуске этого примера.

Что произойдет, если количество точек данных равномерно не делит количество процессов, и нам нужно отправлять разные количества элементов в каждый процесс? Тогда вам понадобится обобщенная версия разброса, MPI_Scatterv(), которая позволяет вам указывать подсчеты для каждого процессором и смещениями - где в глобальном массиве начинается эта часть данных. Итак, скажем, у вас был массив символов [abcdefghi] с 9 символами, и вы собирались назначить каждому процессу два символа, кроме последнего, который получил три. Тогда вам понадобится

char global[9];   /* only task 0 has this */
char local[3]={'-','-','-'};    /* everyone has this */
int  mynum;                     /* how many items */
const int root = 0;   /* the processor with the initial global data */

if (rank == 0) {
   for (int i=0; i<8; i++) global[i] = 'a'+i;
}

int counts[4] = {2,2,2,3};   /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6};   /* the starting point of everyone data */
                             /* in the global array */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
            MPI_INT,      
            local, mynum, MPI_INT;   /* I'm receiving mynum MPI_INTs into local */
            root, MPI_COMM_WORLD);

Теперь данные выглядят как

task 0:  local:[ab-]  global: [abcdefghi]
task 1:  local:[cd-]  global: [garbage--]
task 2:  local:[ef-]  global: [garbage--]
task 3:  local:[ghi]  global: [garbage--]

Теперь вы использовали dispav для распределения нерегулярных объемов данных. В каждом случае смещение равно двум * рангам (измеряется в символах, смещение - в единицах типов, отправляемых для разброса или принимаемых для сбора, это не обычно в байтах или что-то еще) с начала массива, и подсчеты {2,2,2,3}. Если бы это был первый процессор, мы хотели иметь 3 символа, мы бы установили counts = {3,2,2,2}, а смещения были бы {0,3,5,7}. Gatherv снова работает точно так же, как и наоборот; массивы counts и pls останутся неизменными.

Теперь, для 2D, это немного сложнее. Если мы хотим отправить 2d-подслои 2-го массива, данные, которые мы отправляем сейчас, уже не смежны. Если мы отправляем (скажем) 3 × 3 подблоки массива 6x6 на 4 процессора, данные, которые мы отправляем, имеют в нем отверстия:

2D Array

   ---------
   |000|111|
   |000|111|
   |000|111|
   |---+---|
   |222|333|
   |222|333|
   |222|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]

(Обратите внимание, что все высокопроизводительные вычисления сводятся к пониманию компоновки данных в памяти.)

Если мы хотим отправить данные, помеченные "1" на задачу 1, нам нужно пропустить три значения, отправить три значения, пропустить три значения, отправить три значения, пропустить три значения, отправить три значения. Второе осложнение заключается в том, что субрегионы останавливаются и запускаются; обратите внимание, что область "1" не начинается, когда область "0" останавливается; после последнего элемента области "0" следующее место в памяти является частью пути через область "1".

Сначала рассмотрим первую проблему компоновки - как вытащить только те данные, которые мы хотим отправить. Мы всегда могли просто скопировать все данные области "0" в другой смежный массив и отправить это; если бы мы планировали это достаточно тщательно, мы могли бы сделать это так, чтобы мы могли называть MPI_Scatter результатами. Но нам бы не пришлось переносить всю нашу основную структуру данных таким образом.

До сих пор все типы данных MPI, которые мы использовали, были простыми - MPI_INT указывает (скажем) 4 байта в строке. Однако MPI позволяет создавать собственные типы данных, которые описывают произвольно сложные макеты данных в памяти. И этот случай - прямоугольные субрегионы массива - достаточно распространен, что есть конкретный вызов для этого. Для двумерного случай, который мы описываем выше,

    MPI_Datatype newtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let say we're looking at region "0",
                                 which begins at index [0,0] */

    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
    MPI_Type_commit(&newtype);

Это создает тип, который выбирает только область "0" из глобального массива; мы могли бы отправить только эту часть данных в другой процессор

    MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "0" */

и процесс получения может получить его в локальный массив. Обратите внимание, что процесс получения, если он принимает его только в массив 3x3, не может описать, что он получает как тип newtype; который больше не описывает макет памяти. Вместо этого он просто получает блок из 3 * 3 = 9 целых чисел:

    MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);

Заметьте, что мы могли бы сделать это и для других субрегионов, либо создав другой тип (с другим массивом start) для других блоков, либо просто отправив в начальной точке конкретного блока:

    MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "1" */
    MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "2" */
    MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "3" */

Наконец, обратите внимание, что мы требуем, чтобы глобальные и локальные были смежными кусками памяти здесь; то есть &(global[0][0]) и &(local[0][0]) (или, что эквивалентно, *global и *local указывают на смежные 6 * 6 и 3 * 3 куска памяти, что не гарантируется обычным способом выделения динамических мульти- d. Он показал, как это сделать ниже.

Теперь, когда мы понимаем, как указывать субрегионы, перед обсуждением операций разброса/сбора и только "размера" этих типов нужно обсудить еще одну вещь. Мы не могли просто использовать MPI_Scatter() (или даже рассеяние) с этими типами еще, поскольку эти типы имеют длину 16 целых чисел; то есть, где они заканчиваются, это 16 целых чисел после их начала - и там, где они заканчиваются, не выстраиваются хорошо, где начинается следующий блок, поэтому мы не можем просто использовать разброс - он бы выбрал неправильное место для начала отправки данных к следующему процессору.

Конечно, мы могли бы использовать MPI_Scatterv() и сами определять перемещения, и что мы будем делать, за исключением смещений в единицах размера отправляемого типа, и это тоже не помогает; блоки начинаются с смещений (0,3,18,21) целых чисел от начала глобального массива, а тот факт, что блок заканчивает 16 целых чисел от того, где он начинается, не позволяет нам выразить эти смещения в целых кратных значениях вообще.

Чтобы справиться с этим, MPI позволяет установить размер типа для целей этих вычислений. Он не усекает тип; он просто используется для выяснения, где следующий элемент начинается с последнего элемента. Для таких типов, как эти с отверстиями в них, часто бывает полезно установить, чтобы степень была чем-то меньшим, чем расстояние в памяти до фактического конца типа.

Мы можем установить, насколько это будет что-то удобное для нас. Мы могли бы просто сделать размер 1 целым, а затем установить смещения в единицах целых чисел. В этом случае, однако, мне нравится указывать, что степень равна 3 целым числам - размер подстроки - таким образом, блок "1" начинается сразу после блока "0" , а блок "3" начинается сразу после блока "2". К сожалению, это не очень хорошо работает, когда вы прыгаете из блока "2" в блок "3", но это не может помочь.

Чтобы разброс субблоков в этом случае, мы сделали бы следующее:

    MPI_Datatype type, resizedtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let say we're looking at region "0",
                                 which begins at index [0,0] */

    /* as before */
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);  
    /* change the extent of the type */
    MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
    MPI_Type_commit(&resizedtype);

Здесь мы создали тот же тип блока, что и раньше, но мы изменили его размер; мы не изменили, где тип "начинается" (0), но мы изменили его "завершение" (3 интервала). Мы не упоминали об этом раньше, но MPI_Type_commit должен иметь возможность использовать этот тип; но вам нужно только зафиксировать конечный тип, который вы используете, а не какие-либо промежуточные шаги. Вы используете MPI_Type_free, чтобы освободить тип, когда закончите.

Итак, теперь, наконец, мы можем разбросать блоки: манипуляции с данными выше немного сложны, но как только это делается, рассеяние выглядит так же, как и раньше:

int counts[4] = {1,1,1,1};   /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7};   /* the starting point of everyone data */
                             /* in the global array, in block extents */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
            resizedtype,      
            local, 3*3, MPI_INT;   /* I'm receiving 3*3 MPI_INTs into local */
            root, MPI_COMM_WORLD);

И теперь мы закончили, после небольшого тура по разбросу, сбору и производным типам MPI.

Ниже приведен пример кода, который показывает как операцию сбора, так и операцию рассеяния с массивами символов. Запуск программы:

$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD

и код следует.

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
    }

    /* create the local array which we'll process */
    malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
        int disp = 0;
        for (int i=0; i<procgridsize; i++) {
            for (int j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (int j=0; j<gridsize/procgridsize; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<gridsize/procgridsize; i++) {
        for (int j=0; j<gridsize/procgridsize; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}

Ответ 2

Мне просто легче было проверить это.

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

/*
 This is a version with integers, rather than char arrays, presented in this
 very good answer: http://stackoverflow.com/a/9271753/2411320
 It will initialize the 2D array, scatter it, increase every value by 1 and then gather it back.
*/

int malloc2D(int ***array, int n, int m) {
    int i;
    /* allocate the n*m contiguous items */
    int *p = malloc(n*m*sizeof(int));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = malloc(n*sizeof(int*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2D(int ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    int **global, **local;
    const int gridsize=4; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes
    int i, j, p;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2D(&global, gridsize, gridsize);
        int counter = 0;
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++)
                global[i][j] = ++counter;
        }


        printf("Global array is:\n");
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++) {
                printf("%2d ", global[i][j]);
            }
            printf("\n");
        }
    }
    //return;

    /* create the local array which we'll process */
    malloc2D(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */
    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(int), &subarrtype);
    MPI_Type_commit(&subarrtype);

    int *globalptr=NULL;
    if (rank == 0)
        globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (i=0; i<procgridsize*procgridsize; i++)
            sendcounts[i] = 1;
        int disp = 0;
        for (i=0; i<procgridsize; i++) {
            for (j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_INT,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (j=0; j<gridsize/procgridsize; j++) {
                    printf("%2d ", local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (i=0; i<gridsize/procgridsize; i++) {
        for (j=0; j<gridsize/procgridsize; j++) {
            local[i][j] += 1; // increase by one the value
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_INT,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2D(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++) {
                printf("%2d ", global[i][j]);
            }
            printf("\n");
        }

        free2D(&global);
    }


    MPI_Finalize();

    return 0;
}

Вывод:

linux16:>mpicc -o main main.c
linux16:>mpiexec -n 4 main Global array is:
 1  2  3  4
 5  6  7  8
 9 10 11 12
13 14 15 16
Local process on rank 0 is:
| 1  2 |
| 5  6 |
Local process on rank 1 is:
| 3  4 |
| 7  8 |
Local process on rank 2 is:
| 9 10 |
|13 14 |
Local process on rank 3 is:
|11 12 |
|15 16 |
Processed grid:
 2  3  4  5
 6  7  8  9
10 11 12 13
14 15 16 17