Parallel sorting by regular sampling: implementation

A C++ implementation using MPI is discussed in some detail; the complete listing follows the discussion.

Phase I is implemented in lines 1-94 of the program. Essentially, the arrays needed by the processors are set up, each processor obtains its MPI-specific information, and the root processor (#0) sets the data size and random seed value and generates the entire data set. All processors execute MPI::Init(), MPI::COMM_WORLD.Get_size() and MPI::COMM_WORLD.Get_rank() to initiate an MPI session, determine the number of processors in the session, and establish the rank of each individual processor, respectively.
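
For reference, the corresponding calls excerpted from the listing below (myid and numprocs are declared earlier in the program):

    // start MPI and pass command line arguments to MPI
    MPI::Init(argc, argv);

    // number of processors in the session and this processor's rank
    numprocs = MPI::COMM_WORLD.Get_size();
    myid = MPI::COMM_WORLD.Get_rank();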

Phase II is implemented in lines 97-122. First, using MPI::COMM_WORLD.Scatterv(), the root processor divides the data set among the processors as uniformly as possible; the last processor gets the few extra items, if any. Because the data blocks may have different sizes, MPI::COMM_WORLD.Scatterv() must be used instead of Scatter(). The root processor uses the MPI_IN_PLACE option to scatter its block to itself, on line 105; the other transfers are handled on line 110. All processors then quicksort their piece of the data. This is the only time a sorting routine is called directly. Next, all processors regularly sample their sorted data for numprocs pivot candidates.
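
The scatter and the regular sampling, excerpted from the listing below (myData, myDataLengths, myDataStarts and pivotbuffer are declared in Phase I, and compare_ints comes from utilities.h):

    // root processor scatters "in place"; the others receive into myData
    if (myid==0)
    {
        MPI::COMM_WORLD.Scatterv(myData,myDataLengths,myDataStarts,MPI::INT,
                MPI_IN_PLACE,myDataLengths[myid],MPI::INT,0);
    }
    else
    {
        MPI::COMM_WORLD.Scatterv(myData,myDataLengths,myDataStarts,MPI::INT,
                myData,myDataLengths[myid],MPI::INT,0);
    }

    // local sort, then collect numprocs regularly spaced samples
    qsort(myData,myDataLengths[myid],sizeof(int),compare_ints);
    for(int index=0;index<numprocs;index++)
    {
        pivotbuffer[index] = myData[index*myDataLengths[myid]/numprocs];
    }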

Phase III is implemented in lines 125-166. First, MPI::COMM_WORLD.Gather() collects the numprocs pivot candidates from each processor. As above, the root processor uses the MPI_IN_PLACE option to gather its own candidates. Next, the root processor uses the multimerge algorithm to merge the numprocs sorted candidate lists into one sorted list, and then regularly samples this list to produce numprocs-1 pivot values. Lastly, MPI::COMM_WORLD.Bcast() distributes the numprocs-1 pivot values to all the processors.
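
The root processor's merge and pivot selection, followed by the broadcast, excerpted from the listing below (pivotbuffer holds the gathered candidates; the merge and selection run only on the root):

    // (executed on the root processor only)
    // multimerge the numprocs sorted candidate lists into one
    int *starts[numprocs];   // start of each gathered candidate list
    int lengths[numprocs];   // length of each candidate list
    for(int i=0;i<numprocs;i++)
    {
        starts[i] = &pivotbuffer[i*numprocs];
        lengths[i] = numprocs;
    }
    int tempbuffer[numprocs*numprocs];
    multimerge(starts,lengths,numprocs,tempbuffer,numprocs*numprocs);

    // regularly select numprocs-1 candidates as the partition pivots
    for(int i=0; i<numprocs-1; i++)
    {
        pivotbuffer[i] = tempbuffer[(i+1)*numprocs];
    }

    // (executed on all processors)
    // broadcast the numprocs-1 pivot values from the root (#0)
    MPI::COMM_WORLD.Bcast(pivotbuffer,numprocs-1,MPI::INT,0);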

Phase IV is implemented in lines 169-202. Using the numprocs-1 pivot values, each processor partitions its (sorted) data into numprocs classes determined by the pivots. The beginning and length of each class are stored in the arrays classStart[] and classLength[], both of which have length numprocs.
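
The partitioning loop, excerpted from the listing below; because myData is already sorted, each class is a contiguous block:

    int dataindex=0;
    for(int classindex=0; classindex<numprocs-1; classindex++)
    {
        classStart[classindex] = dataindex;
        classLength[classindex] = 0;

        // advance while the current element still belongs to this class
        while((dataindex < myDataLengths[myid])
            && (myData[dataindex] <= pivotbuffer[classindex]))
        {
            classLength[classindex]++;
            dataindex++;
        }
    }
    // everything left over belongs to the last class
    classStart[numprocs-1] = dataindex;
    classLength[numprocs-1] = myDataLengths[myid] - dataindex;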

Phase V is implemented in lines 205-249. This is the trickiest of the phases. Each processor i wants to collect, from all the processors, the elements in class i determined by the pivot values. On each processor j the arrays classStart[] and classLength[] contain processor j's class starting points and lengths. However, processor i does not have access to processor j's information, so it must be shared.

Each processor i gathers from every processor j the value classLength[i] stored on j; this is done on line 217 with MPI::COMM_WORLD.Gather(). These values are stored in the array recvLengths[] on processor i. Where processor i should store the other processors' data is computed on lines 223-230 and stored in the array recvStarts[]. Keep in mind that there is no reason these classes must have uniform size.
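
The length exchange and the computation of the receive offsets, excerpted from the loop over iprocessor in the listing below:

    // every processor sends its iprocessor^th class length;
    // processor iprocessor collects them into recvLengths[]
    MPI::COMM_WORLD.Gather(&classLength[iprocessor],1,MPI::INT,
        recvLengths,1,MPI::INT,iprocessor);

    // the gathering processor turns the lengths into starting offsets
    if (myid == iprocessor)
    {
        recvStarts[0]=0;
        for(int i=1;i<numprocs;i++)
        {
            recvStarts[i] = recvStarts[i-1]+recvLengths[i-1];
        }
    }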

On lines 234-236, MPI::COMM_WORLD.Gatherv() moves the variable-length data from the processors j to processor i. The first two parameters, &myData[classStart[iprocessor]] and classLength[iprocessor], are used by the sending processors to specify where the data is sent from, and the three parameters recvbuffer, recvLengths and recvStarts are used by the receiving processor to specify where the data is stored.
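
The corresponding call from the listing below:

    // processor iprocessor gathers the iprocessor^th class from every processor
    MPI::COMM_WORLD.Gatherv(&myData[classStart[iprocessor]],
        classLength[iprocessor],MPI::INT,
        recvbuffer,recvLengths,recvStarts,MPI::INT,iprocessor);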

After all the (sorted) class data has been moved to the appropriate processors, multimerge is used on each processor to merge its numprocs sorted sublists into a single sorted list.
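
The per-processor merge, excerpted from the listing below:

    // merge the numprocs sorted sublists in recvbuffer back into myData
    int *mmStarts[numprocs];
    for(int i=0;i<numprocs;i++)
    {
        mmStarts[i] = recvbuffer + recvStarts[i];
    }
    multimerge(mmStarts,recvLengths,numprocs,myData,myDataSize);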

Phase VI starts on line 251. The last task is to move the sorted data from the processors back to the root. The complication (again) is that the sorted lists will, in general, have different sizes. As before, each processor knows the length of its own list, but the root processor does not, so this length information must be shared before the sorted data can be collected. As in the previous phase, this is accomplished by a call to MPI::COMM_WORLD.Gather() followed by one to MPI::COMM_WORLD.Gatherv(). On line 259, the root processor gathers the length of the sorted data on each processor; note that the root processor only needs to gather one integer from each processor. These integers are stored in the array sendLengths[]. On lines 263-270, the root processor uses these lengths to compute where each data set should be stored; the starting points are stored in the array sendStarts[]. Finally, MPI::COMM_WORLD.Gatherv() collects the variable-sized data into the array sortedData[]. The first two parameters, myData and mysendLength, are variables on the sending processor, and the three parameters sortedData, sendLengths and sendStarts are on the root processor.
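
The final collection, excerpted from the listing below:

    // root gathers how much sorted data each processor holds
    MPI::COMM_WORLD.Gather(&mysendLength,1,MPI::INT,
        sendLengths,1,MPI::INT,0);

    // root converts the lengths into starting offsets
    if (myid == 0)
    {
        sendStarts[0]=0;
        for(int i=1; i<numprocs; i++)
        {
            sendStarts[i] = sendStarts[i-1]+sendLengths[i-1];
        }
    }

    // root gathers the variable-sized pieces into sortedData[] in order
    int sortedData[myDataSize];
    MPI::COMM_WORLD.Gatherv(myData,mysendLength,MPI::INT,
        sortedData,sendLengths,sendStarts,MPI::INT,0);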

Finally, the root processor computes and displays the elapsed wall-clock time and checks that the collected data is sorted.
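
The timing output, excerpted from the listing below (the timer is started on the root processor in Phase I with MPI::Wtime()):

    endwtime = MPI::Wtime();
    cout << "wall clock time (seconds) = "
         << scientific << setprecision(4) << endwtime-startwtime << endl;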

C++ with MPI implementation of PSRS

// Parallel sorting using regular sampling
// C++ implementation using MPI
//
// David John, August 2013
// (c) Wake Forest University, 2013

#include <iostream>
#include <iomanip>
#include <string>
#include <cstdlib>
#include <algorithm>
#include <string.h>

using namespace std;

#include "mpi.h"
#include "multimerge.h"
#include "utilities.h"

// The main program that is executed on each processor

int main( int argc, char* argv[])
{
   	// processor rank, and total number of processors
    int myid, numprocs;
    
    // for timing used by root processor (#0)
    double startwtime = 0.0, endwtime;


	// *******************************************
	//
	// PHASE I:  Initialization

    // start MPI and pass command line arguments to MPI
    MPI::Init(argc, argv);

    // Collect information about the number of
    // processors and rank of this processor
    numprocs = MPI::COMM_WORLD.Get_size();
    myid = MPI::COMM_WORLD.Get_rank();

	// look through arguments for:
	//		 -DS nnnn to set myDataSize
	//		 -SR nnnn to set randomSeed
	int randomSeed = 1000;
	int myDataSize = 4000;
	for(int i=0;i<argc;i++)
	{
		// search for special arguments
		if (strcmp(argv[i],"-DS")==0)
		{
			myDataSize = atoi(argv[i+1]); i++;
		}
		else if (strcmp(argv[i],"-SR")==0)
		{
			randomSeed = atoi(argv[i+1]); i++;
		}
	}

    // array to hold input data to sort
	// myDataLengths[] and myDataStarts[] used with Scatterv() to distribute
	// myData[] across processors
	int myData[myDataSize];
	int myDataLengths[numprocs];
	int myDataStarts[numprocs];
	
	// communication buffer used for determination of pivot values
	int pivotbuffer[numprocs*numprocs];
	int pivotbufferSize;

	// Compute the individual lengths of mydata[] to be distributed
	// to the numproc processors.  The last processor gets the "extras".
	for(int i=0;i<numprocs;i++)
	{
		myDataLengths[i] = myDataSize/numprocs;
		myDataStarts[i]= i*myDataSize/numprocs;
	}
	myDataLengths[numprocs-1]+=(myDataSize%numprocs);

    // Root node initializes the testing data, and also starts
    // the timer
    if (myid == 0)
    {
		// set random seed and randomly generate testing data
		srandom(randomSeed);
		for(int index=0; index<myDataSize; index++)
		{
			myData[index] = random()% 900;
		}
		
		startwtime = MPI::Wtime();
    }


	// *******************************************
	//
	// PHASE II:  Scatter data, local sorts and collect regular sample

	//  The data is scattered to all processors from the root processor (#0)
	//  (root processor) does "in place".
	if (myid==0)
	{
		MPI::COMM_WORLD.Scatterv(myData,myDataLengths,myDataStarts,MPI::INT,
				MPI_IN_PLACE,myDataLengths[myid],MPI::INT,0);
	}
	else
	{			
		MPI::COMM_WORLD.Scatterv(myData,myDataLengths,myDataStarts,MPI::INT,
				myData,myDataLengths[myid],MPI::INT,0);
	}
      
	// All processors sort their piece of the data using cstdlib::quicksort
	qsort(myData,myDataLengths[myid], sizeof(int), compare_ints);

	// All processors collect regular samples from sorted list
	// Consider an offset to the myData[] index
	for(int index=0;index<numprocs;index++)
	{
		pivotbuffer[index]= myData[index*myDataLengths[myid]/numprocs];
	}


	// *******************************************
	//
	// PHASE III:  Gather and merge samples, and broadcast p-1 pivots

	// The root processor gathers all pivot candidates from the processors, root
	// processor has data "in place"
	if (myid==0)
	{
		MPI::COMM_WORLD.Gather(MPI_IN_PLACE,numprocs,MPI::INT,
			pivotbuffer,numprocs,MPI::INT,0);
	}
	else
	{
		MPI::COMM_WORLD.Gather(pivotbuffer,numprocs,MPI::INT,
			pivotbuffer,numprocs,MPI::INT,0);
	}

	//  Root processor multimerges the lists together and then selects
	//  final pivot values to broadcast
	if (myid == 0)
	{
		// multimerge the numproc sorted lists into one
		int *starts[numprocs];  // array of lists
		int lengths[numprocs];  // array of lengths of lists
		for(int i=0;i<numprocs;i++)
		{
			starts[i]=&pivotbuffer[i*numprocs];
			lengths[i]=numprocs;
		}
		int tempbuffer[numprocs*numprocs];  // merged list
		multimerge(starts,lengths,numprocs,tempbuffer,numprocs*numprocs);

		// regularly select numprocs-1 of pivot candidates to broadcast
		// as partition pivot values for myData
		for(int i=0; i<numprocs-1; i++)
		{
			pivotbuffer[i] = tempbuffer[(i+1)*numprocs];
		}				
	}

	// Root processor (#0) broadcasts the partition values
	MPI::COMM_WORLD.Bcast(pivotbuffer,numprocs-1,MPI::INT,0);


	// *******************************************
	//
	// PHASE IV: Local data partitioned

	// All processors partition their data members based on the
	// pivot values stored in pivotbuffer[].

	// Partition information for myData[]: 
	// 		index of beginning of ith class is classStart[i],
	//		length of ith class is classLength[i], and
	// 		members of ith class, myData[j], have the property
	//   		pivotbuffer[i-1]<= myData[j] < pivotbuffer[i]
	int classStart[numprocs];
	int classLength[numprocs];
	
	// need for each processor to partition its list using the values
	// of pivotbuffer
	int dataindex=0;
	for(int classindex=0; classindex<numprocs-1; classindex++)
	{
		classStart[classindex] = dataindex;
		classLength[classindex]=0;

		// as long as dataindex refers to data in the current class
		while((dataindex< myDataLengths[myid]) 
			&& (myData[dataindex]<=pivotbuffer[classindex]))
		{
			classLength[classindex]++;
			dataindex++;
		}		
	}
	// set Start and Length for last class
	classStart[numprocs-1] = dataindex;
	classLength[numprocs-1] = myDataLengths[myid] - dataindex;
	
	
	// *******************************************
	//
	// PHASE V:  All ith classes are gathered by processor i 
	int recvbuffer[myDataSize];    // buffer to hold all members of class i
	int recvLengths[numprocs];     // on myid, lengths of each myid^th class
	int recvStarts[numprocs];      // indices of where to start the store from 0, 1, ...

	// processor iprocessor functions as the root and gathers from the
	// other processors all of its sorted values in the iprocessor^th class.  
	for(int iprocessor=0; iprocessor<numprocs; iprocessor++)
	{	
		// Each processor, iprocessor gathers up the numproc lengths of the sorted
		// values in the iprocessor class
		MPI::COMM_WORLD.Gather(&classLength[iprocessor], 1, MPI::INT, 
			recvLengths,1,MPI::INT,iprocessor);
	

		// From these lengths the myid^th class starts are computed on
		// processor myid
		if (myid == iprocessor)
		{
			recvStarts[0]=0;
			for(int i=1;i<numprocs; i++)
			{
				recvStarts[i] = recvStarts[i-1]+recvLengths[i-1];
			}
		}

		// each iprocessor gathers up all the members of the iprocessor^th 
		// classes from the other nodes
		MPI::COMM_WORLD.Gatherv(&myData[classStart[iprocessor]],
			classLength[iprocessor],MPI::INT,
			recvbuffer,recvLengths,recvStarts,MPI::INT,iprocessor);
	}
		
	
	// multimerge these numproc lists on each processor
	int *mmStarts[numprocs]; // array of list starts
	for(int i=0;i<numprocs;i++)
	{
		mmStarts[i]=recvbuffer+recvStarts[i];
	}
	multimerge(mmStarts,recvLengths,numprocs,myData,myDataSize);
	
	int mysendLength = recvStarts[numprocs-1] + recvLengths[numprocs-1];
	
	// *******************************************
	//
	// PHASE VI:  Root processor collects all the data


	int sendLengths[numprocs]; // lengths of consolidated classes
	int sendStarts[numprocs];  // starting points of classes
	// Root processor gathers up the lengths of all the data to be gathered
	MPI::COMM_WORLD.Gather(&mysendLength,1,MPI::INT,
		sendLengths,1,MPI::INT,0);

	// The root processor compute starts from lengths of classes to gather
	if (myid == 0)
	{
		sendStarts[0]=0;
		for(int i=1; i<numprocs; i++)
		{
			sendStarts[i] = sendStarts[i-1]+sendLengths[i-1];
		}	
	}

	// Now we let processor #0 gather the pieces and glue them together in
	// the right order
	int sortedData[myDataSize];
	MPI::COMM_WORLD.Gatherv(myData,mysendLength,MPI::INT,
		sortedData,sendLengths,sendStarts,MPI::INT,0);

	// the root processor prints the elapsed clock time
    if (myid == 0)
	{
		endwtime = MPI::Wtime();
        cout << "wall clock time (seconds) = " 
		     << scientific << setprecision(4) << endwtime-startwtime << endl;

		cout << "Data set " << issorted(sortedData,myDataSize) << " sorted:" 
			<< endl;	     
	}
        
    // shutdown MPI on the processor
    MPI::Finalize();
    return 0;
}