Distributed parallel algorithm

Since the computation of the integral depends primarily on the sum of the terms \(2f(x_{i})\), the work can be divided among a number of processes, each partial sum can be computed independently, and the individual results can then be combined to obtain the total. This is the basis for a distributed parallel algorithm.
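With numprocs denoting the number of processes (as in the code below), the division of work simply uses the additivity of the integral over subintervals of \([a,b]\):

\[\int_{a}^{b} f(x)\,dx = \sum_{j=0}^{numprocs-1}\int_{a+j\frac{b-a}{numprocs}}^{a+(j+1)\frac{b-a}{numprocs}} f(x)\,dx\]

Each piece on the right-hand side can be approximated by the Trapezoidal Rule independently of the others.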

MPI – Message Passing Interface

The MPI model of parallel computing is based on a set of independent computing processes that can pass messages among themselves. The number of these computing elements need not agree with the number of physical processors (this will be explored later). It is important to understand that the computing elements run asynchronously; if a synchronous action is required, it must be accomplished by coordinated message passing. MPI provides functions to create a set of computing processes, to pass messages among them, and to finalize the set of computing processes. In addition, MPI provides a set of data types that should be used for all MPI arguments.
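As a minimal sketch of this model (written with the same MPI-2 C++ bindings used in the listing below), every MPI program follows the same initialize, communicate, finalize pattern; the program here does nothing but report the rank of each process:

#include <iostream>
#include "mpi.h"

int main( int argc, char* argv[] )
{
    MPI::Init(argc, argv);                       // join the set of computing processes

    int numprocs = MPI::COMM_WORLD.Get_size();   // how many processes are running
    int myid     = MPI::COMM_WORLD.Get_rank();   // which one this process is (0..numprocs-1)

    std::cout << "process " << myid << " of " << numprocs << std::endl;

    MPI::Finalize();                             // leave the set of computing processes
    return 0;
}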

In the following subsection, a distributed parallel algorithm for the Trapezoidal Rule is implemented using MPI. All the MPI function calls will be carefully explained in the context of the Trapezoidal Rule.

MPI implementation

This algorithm can be implemented easily using MPI: assign the independent tasks to different computing elements, and compute the sum from the partial sums. Essentially, these steps of the computation are done in lines 55-67 of the following code. However, it is also necessary to include other MPI-related statements, and the following discussion covers all of them.

In the following discussion, and in the implementation, the full name of each MPI function is specified for clarity, for example MPI::Init() instead of merely Init(). With the addition of using namespace MPI; the syntax of the implementation could be shorter, but perhaps not as clear.

On line 34, MPI is initialized using MPI::Init() with the parameters passed into the program via mpirun. Normally, an MPI program is run from the command line with a call similar to mpirun -n 4 myprogram, where the argument -n 4 indicates that four independent processes are requested and that each of these processes should begin executing myprogram.

On lines 38-39, the local environment is discovered: MPI::COMM_WORLD.Get_size() returns the total number of processors involved in the computation, and MPI::COMM_WORLD.Get_rank() returns the rank (the identifying number, between 0 and numprocs-1) of this processor. Keep in mind that all the processors are executing this code simultaneously.

On line 50, processor 0 gets the current time using MPI::Wtime().

On line 55, all processors participate in a broadcast using MPI::COMM_WORLD.Bcast(). Processor 0 sends the value of n, the number of subdivisions, to all the other processors, and every processor other than processor 0 receives that value. MPI::COMM_WORLD.Bcast() is a synchronizing (blocking) action.

On line 59, each processor calls trapezoid(), defined in the previous section, to compute its portion of the work. Essentially, processor j computes \(\int_{a+j\frac{b-a}{numprocs}}^{a+(j+1)\frac{b-a}{numprocs}}f(x)\,dx\) using the sequential Trapezoidal Rule with \(\frac{n}{numprocs}\) subintervals.
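For instance, with the values used in the listing (a = 1, b = 2, n = 100000) and four processors, processor 2 calls trapezoid(1.5, 1.75, 25000, &myfunction), approximating \(\int_{1.5}^{1.75}f(x)\,dx\) with 25000 subintervals; the other three processors handle the remaining quarters of \([1,2]\) in the same way.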

On line 67, all processors participate in a reduction using MPI::COMM_WORLD.Reduce() with the predefined reduction operation MPI::SUM. Every processor sends its value of psum to processor 0, and processor 0 accumulates the total in the variable named sum.

On lines 73-77, executed only by processor 0, MPI::Wtime() is called again to get the current time, the elapsed time for the parallel computation is computed, and the value of the integral and the elapsed time are output.

On line 82, each processor shuts down its MPI process using MPI::Finalize().

 1  #include <iostream>
 2  #include <iomanip>
 3  #include <string>
 4  #include <cmath>
 5
 6  using namespace std;
 7
 8  #include "mpi.h"
 9
10  // A function to implement integration of f(x) over the interval
11  // [a,b] using the trapezoid rule with nsub subdivisions.
12  double trapezoid(double a, double b, int nsub, double (*f)(double) );
13
14
15  // The function to be integrated
16  double myfunction( double a );
17
18
19  //
20  // The main program that is executed on each
21  // processor
22
23  int main( int argc, char* argv[] )
24  {
25      double a, b;
26      int n;
27      int myid, numprocs;
28      double psum = 0.0, sum = 0.0;
29
30      // for timing
31      double startwtime = 0.0, endwtime;
32
33      // pass command line arguments to MPI
34      MPI::Init(argc, argv);
35
36      // Collect information about the number of processors
37      // and the rank (id) of this processor
38      numprocs = MPI::COMM_WORLD.Get_size();
39      myid = MPI::COMM_WORLD.Get_rank();
40
41      a = 1.0;
42      b = 2.0;
43
44      // Root processor sets the number of subdivisions used
45      // in the evaluation of the integral and starts
46      // the timer
47      if (myid == 0)
48      {
49          n = 100000;
50          startwtime = MPI::Wtime();
51      }
52
53      // Broadcast from the root processor to all others the
54      // number of subdivisions (rendezvous)
55      MPI::COMM_WORLD.Bcast(&n, 1, MPI::INT, 0);
56
57
58      // each processor (including 0) computes its share of the sum
59      psum = trapezoid(a + myid*(b-a)/numprocs,
60                       a + (myid+1)*(b-a)/numprocs,
61                       n/numprocs, &myfunction);
62
63
64
65      // the processors add together their partial sum values
66      // into sum and give the answer to processor 0 (rendezvous)
67      MPI::COMM_WORLD.Reduce(&psum, &sum, 1, MPI::DOUBLE, MPI::SUM, 0);
68
69      // the root processor prints the sum and
70      // the elapsed clock time
71      if (myid == 0)
72      {
73          endwtime = MPI::Wtime();
74          cout << sum << endl;
75          cout << "wall clock time (seconds) = "
76               << scientific << setprecision(4) << endwtime - startwtime << endl;
77      }
78
79
80
81      // shut down MPI on this processor
82      MPI::Finalize();
83
84      return 0;
85  }

Key MPI function calls

First, the MPI computing environment must be established and made known to each of the independent computing processors. MPI::Init(), called with the command line arguments, initializes the MPI-based distributed parallel system. The two MPI functions MPI::COMM_WORLD.Get_size() and MPI::COMM_WORLD.Get_rank() assign values to numprocs and myid, respectively. These two variables tell each processor about its computing environment.

Now the numprocs independent computing processors must communicate to solve the problem together. Using MPI::COMM_WORLD.Bcast(), each of the tasks is given a significant portion of the problem to solve (coarse-grained parallelism). Each task then computes a partial sum simultaneously with the others, and the numprocs partial sums are added together via MPI::COMM_WORLD.Reduce().

Finally, to shut down all the independent tasks, each of the numprocs tasks executes MPI::Finalize().
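A note on the bindings: the listing uses the C++ bindings from MPI-2, which were later deprecated and removed from the MPI standard, so on a recent MPI installation the key calls may need to be written with the C interface instead. A rough sketch of the correspondence, reusing the variable names from the listing, is:

MPI_Init(&argc, &argv);                            // MPI::Init(argc, argv)
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);          // numprocs = MPI::COMM_WORLD.Get_size()
MPI_Comm_rank(MPI_COMM_WORLD, &myid);              // myid = MPI::COMM_WORLD.Get_rank()
startwtime = MPI_Wtime();                          // MPI::Wtime()
MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);      // MPI::COMM_WORLD.Bcast(&n, 1, MPI::INT, 0)
MPI_Reduce(&psum, &sum, 1, MPI_DOUBLE, MPI_SUM,
           0, MPI_COMM_WORLD);                     // MPI::COMM_WORLD.Reduce(...)
MPI_Finalize();                                    // MPI::Finalize()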

Speedup

For any parallel algorithm, the speedup is the ratio of the execution time of the sequential implementation to that of the parallel implementation. Since the independent tasks can execute simultaneously, we certainly expect some speedup. From the sequential discussion, the execution time is \(\Theta(n)\); since we now simultaneously solve numprocs problems of size \(\frac{n}{numprocs}\), the maximum speedup is:

\[S(numprocs) = \frac{T(1)}{T(numprocs)} = \frac{\Theta(n)}{\Theta\!\left(\frac{n}{numprocs}\right)} = numprocs\]

However, this computation does not take into account the communication time required.
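As a rough model (assuming the broadcast and the reduction are carried out along a tree of processors, which is common but implementation dependent), the communication contributes a term that grows with the number of processors:

\[S(numprocs) \approx \frac{\Theta(n)}{\Theta\!\left(\frac{n}{numprocs}\right) + \Theta(\log numprocs)}\]

so for a fixed n the observed speedup levels off, and can even decline, as numprocs grows.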

Each independent processor requires only a few variables for its computation, so the total amount of space required is \(\Theta(numprocs)\).

Questions

  1. Verify that this parallel implementation does compute the integral using the Trapezoidal Rule. In particular, make sure that the sum is computed correctly at the endpoints of each subdivision.
  2. Copy, compile, and run this program. You will need to include trapezoid() and myfunction(); a possible sketch of these two functions appears after this list.
  3. Extend the table from the sequential algorithm for various numbers of processors and numbers of subdivisions. What can you say about speedup from this set of runs?
  4. Can we naively increase the number of independent tasks and always expect speedup? Justify your answer.
  5. What might be a limiting factor for overall speedup? Does your table of speedup agree with your answer?
  6. For our example, do you have a recommendation on the optimal number of processors to achieve maximum speedup?
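For question 2, the following is a minimal sketch of the two functions required by the listing, consistent with the prototypes on lines 12 and 16; the actual definitions come from the previous section, and the integrand used here, \(f(x)=1/x\) (so the exact value of the integral over \([1,2]\) is \(\ln 2\)), is only an assumption for illustration.

// Composite Trapezoidal Rule on [a,b] with nsub subdivisions:
// (h/2) * ( f(x_0) + 2 f(x_1) + ... + 2 f(x_{nsub-1}) + f(x_nsub) )
double trapezoid(double a, double b, int nsub, double (*f)(double) )
{
    double h = (b - a) / nsub;
    double sum = 0.5 * ( f(a) + f(b) );
    for (int i = 1; i < nsub; i++)
        sum += f(a + i*h);
    return h * sum;
}

// The function to be integrated (an assumed example)
double myfunction( double a )
{
    return 1.0 / a;
}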

