Parallel Quicksort Algorithm
This blog discusses the parallel implementation of the Quicksort algorithm.
But before I start, let me explain what a parallel algorithm is.
A parallel algorithm is an algorithm that can perform multiple operations at the same time by dividing the work across several processing units.
It has been a tradition in computer science to describe serial algorithms in terms of abstract machine models, most often the random-access machine (RAM).
Similarly, many computer science researchers describe parallel algorithms using an analogous shared-memory abstract machine, the parallel random-access machine (PRAM).
Parallelism is an implementation property: it is literally the simultaneous physical execution of tasks at runtime, and it requires hardware with multiple computing resources.
It resides on the hardware layer. Parallelism is a subclass of concurrency: before you can do several tasks at once, you first have to manage several tasks.
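To make this concrete, here is a minimal C++ sketch (my illustration, not from the original post): two threads each sum half of an array at the same time, which on a multi-core machine is true parallelism.

#include <cstdio>
#include <numeric>
#include <thread>
#include <vector>

int main()
{
    std::vector<int> v(1000000, 1);
    long s1 = 0, s2 = 0;

    // Each thread works on its own half simultaneously;
    // with at least two cores, the work is physically parallel.
    std::thread t1([&] { s1 = std::accumulate(v.begin(), v.begin() + v.size() / 2, 0L); });
    std::thread t2([&] { s2 = std::accumulate(v.begin() + v.size() / 2, v.end(), 0L); });
    t1.join();
    t2.join();

    std::printf("sum = %ld\n", s1 + s2);
    return 0;
}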
Examples of parallel algorithms -
Quicksort is a sorting algorithm, so let us look at parallel sorting algorithms in particular.
Some of them are -
1. Parallel Bubble sort
2. Parallel Merge sort
3. Shear sort
4. Parallel Quicksort
5. Bitonic Sort
(Figure: a graph depicting their worst-case time complexities.)
Now, coming to the main point:
Parallel Quicksort algorithm
Quicksort uses the divide-and-conquer approach, which makes it amenable to parallelization using task parallelism: independent subproblems can be solved concurrently by different processes.
Note -
Parallelization and concurrency are not the same. Parallelization is just a subset of concurrency.
How to perform a parallel quicksort -
1. Start a process that finds a pivot p and partitions the list into two parts: elements ≤ p and elements > p.
2. At each step, spawn processes in proportion to the number of partitions, so each partition is handled by its own process.
3. Each process finds a pivot element and divides its list based on the selected pivot.
4. Finally, the processes' partitions are concatenated, and a sorted list is returned.
(Figure: pictorial representation of this naive implementation.)
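Before the full implementations later in this post, here is a minimal C++ sketch of the naive scheme; it is my illustration, not the post's reference code, and it uses std::async tasks in place of heavyweight processes, with a hypothetical depth parameter to cap the amount of parallelism.

#include <functional>
#include <future>
#include <utility>
#include <vector>

// Each call partitions its segment, hands one half to a new task
// ("another process"), and recurses on the other half itself.
void naive_parallel_quicksort(std::vector<int>& a, int left, int right, int depth)
{
    if (left >= right)
        return;

    // Lomuto partition around the last element.
    int pivot = a[right];
    int last = left;
    for (int i = left; i < right; ++i)
        if (a[i] < pivot)
            std::swap(a[last++], a[i]);
    std::swap(a[last], a[right]);

    if (depth > 0) {
        // One subproblem goes to a fresh task, the other stays local.
        auto lhs = std::async(std::launch::async, naive_parallel_quicksort,
                              std::ref(a), left, last - 1, depth - 1);
        naive_parallel_quicksort(a, last + 1, right, depth - 1);
        lhs.wait();
    } else {
        // Past the depth cap, fall back to plain sequential recursion.
        naive_parallel_quicksort(a, left, last - 1, 0);
        naive_parallel_quicksort(a, last + 1, right, 0);
    }
}

Note how each partition is still done by a single task: exactly the bottleneck discussed below.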
This method is not optimal: if the pivot is chosen poorly, the time complexity degrades to O(n²).
For that reason, this method is sometimes referred to as the naive approach.
But, we can optimize it -
Instead of doubling the number of processes at each step, this approach uses n processes throughout the whole algorithm to find pivot elements and rearrange the list. All these processes run concurrently at each step, sorting the sublists (see the partitioning sketch after this list).
1. Start n processes, which will partition the list and sort it using selected pivot elements.
2. These n processes work on all partitions from the start of the algorithm until the list is sorted.
3. Each process finds a pivot and partitions its list based on the selected pivot.
4. Finally, the partitions are merged, forming a sorted list.
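Here is a sketch of the key new ingredient: the partitioning step itself performed by many threads at once. This is my illustration (using OpenMP threads and two atomic counters), not the post's code; a production version would use per-thread prefix sums instead of contended counters.

#include <omp.h>
#include <vector>

// All threads classify elements of `in` against the pivot concurrently;
// atomic counters hand out destination slots in `out`.
// After the call, out[0 .. split-1] < pivot <= out[split .. n-1].
void parallel_partition(const std::vector<int>& in, int pivot,
                        std::vector<int>& out, long long& split)
{
    long long n = (long long)in.size();
    out.resize(in.size());

    // Pass 1: count elements smaller than the pivot, in parallel.
    long long less = 0;
    #pragma omp parallel for reduction(+ : less)
    for (long long i = 0; i < n; ++i)
        if (in[i] < pivot)
            ++less;
    split = less;

    // Pass 2: every thread writes its elements into the proper half.
    long long lo = 0, hi = less;
    #pragma omp parallel for
    for (long long i = 0; i < n; ++i) {
        long long pos;
        if (in[i] < pivot) {
            #pragma omp atomic capture
            pos = lo++;
            out[pos] = in[i];
        } else {
            #pragma omp atomic capture
            pos = hi++;
            out[pos] = in[i];
        }
    }
}

With p threads, each pass is Θ(n/p) work per thread, which is what makes the Θ(log n) overall bound discussed later possible.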
As an example, take the quicksort formulation for a shared address space.
Here, the time complexity is O(n²) in the worst case, and the space complexity is O(log n).
The worst cases, resulting from a poor choice of pivot or poor handling of equal elements, correspond to fairly common situations. They can be attacked with ad hoc heuristics applicable only to specific distributions (e.g., using a sample median where a roughly linear distribution is expected), or with randomization (i.e., the pivot is chosen randomly and any element equal to it is distributed randomly between the two partitions).
A recurring problem with parallel algorithms is keeping them suitably load balanced: it is the load (the overall work) that must be balanced across processors, not merely the input size.
For example, checking all numbers from one to a hundred thousand for primality is easy to split among processors. However, if the range is simply divided evenly (1-1,000; 1,001-2,000; and so on), the work will be unbalanced: smaller numbers are cheaper to test for primality, so the processors holding the low ranges finish early and sit idle while the loaded processors complete.
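A small OpenMP sketch of the fix (my illustration, not from the post): schedule(static) gives each thread one fixed block of the range, while schedule(dynamic) hands out batches on demand, so threads that draw cheap numbers simply come back for more work.

#include <omp.h>
#include <cstdio>

// Trial division: the cost grows with n, so equal-sized blocks
// of the range carry very unequal amounts of work.
static bool is_prime(long n)
{
    if (n < 2)
        return false;
    for (long d = 2; d * d <= n; ++d)
        if (n % d == 0)
            return false;
    return true;
}

int main()
{
    long count = 0;

    // Replace "dynamic" with "static" to see the unbalanced variant.
    #pragma omp parallel for schedule(dynamic, 1000) reduction(+ : count)
    for (long n = 2; n <= 100000; ++n)
        if (is_prime(n))
            ++count;

    std::printf("%ld primes up to 100000\n", count);
    return 0;
}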
Pivot selection is difficult, and it significantly affects the algorithm's performance. Consider the case in which the first pivot happens to be the largest element in the sequence. In this case, after the first split, one of the processes will be assigned only a single element, and the remaining p - 1 processes will be assigned the other n - 1 elements.
Hence, we are faced with a problem whose size has been reduced by only one element, while only p - 1 processes participate in the sorting operation.
Ideally, the split should be done such that each partition has a non-trivial fraction of the original array.
We could select the pivot at random, analogous to randomized sequential quicksort; a minimal sketch of this follows.
However, random selection alone is not efficient for the parallel formulation.
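For completeness, here is what randomized pivot selection looks like (a minimal sketch; the function name and inclusive bounds are my own, not the post's):

#include <random>

// Returns a uniformly random index in [left, right], as used by
// randomized sequential quicksort.
int random_pivot_index(int left, int right)
{
    static thread_local std::mt19937 gen{std::random_device{}()};
    std::uniform_int_distribution<int> dist(left, right);
    return dist(gen);
}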
In the parallel formulation, one poor pivot may lead to a partitioning in which a process becomes idle, and that persists throughout the execution of the algorithm.
But in sequential quicksort, if all subsequent pivot selections are good, one poor pivot will increase the overall work by at most an amount equal to the length of the subsequence.
Thus, it will not significantly degrade the performance of sequential quicksort.
There are other variations of the parallel quicksort algorithm too: hyperquicksort and parallel sorting by regular sampling (PSRS).
· Hyperquicksort has communication overhead, as values are passed between partner processes.
· It performs load balancing, but load imbalance still occurs.
For PSRS -
· It has better load balancing.
· Its communication overhead is lower.
· The number of processes does not have to be a power of 2; during the algorithm, some processes can be freed up depending on the order of elements and the pivots selected.
Of the three, PSRS generally performs best, hyperquicksort comes second, and the naive parallel quicksort performs worst.
However, the topic of this blog is parallel quicksort, so I will focus on it alone.
Parallelizing sequential algorithms -
Say we had a program for sequential quicksort.
Here is an implementation of the algorithm (in C#) -
public static void QuickSort<T>(T[] array) where T : IComparable<T>
{
    QuickSortInternal(array, 0, array.Length - 1);
}

private static void QuickSortInternal<T>(T[] array, int left, int right)
    where T : IComparable<T>
{
    if (left >= right)
    {
        return;
    }

    SwapElements(array, left, (left + right) / 2);

    int last = left;
    for (int current = left + 1; current <= right; ++current)
    {
        if (array[current].CompareTo(array[left]) < 0)
        {
            ++last;
            SwapElements(array, last, current);
        }
    }

    SwapElements(array, left, last);

    QuickSortInternal(array, left, last - 1);
    QuickSortInternal(array, last + 1, right);
}

static void SwapElements<T>(T[] array, int i, int j)
{
    T temp = array[i];
    array[i] = array[j];
    array[j] = temp;
}
We want to parallelize it.
First, we are considering the naive parallel formulation.
During each call of QUICKSORT, the array is partitioned into two parts and each part is solved recursively. Sorting the smaller arrays represents two completely independent subproblems that can be solved in parallel.
Therefore, one way to parallelize quicksort is to execute it initially on a single process. Then, when the algorithm performs its recursive calls, assign one of the subproblems to another process.
Now the processes will sort their subarrays by using quicksort and assign one of their subproblems to other processes.
The algorithm terminates when the arrays cannot be further partitioned.
At termination, each process holds an element of the array, and the sorted order can be recovered by traversing the processes.
Its major drawback is that partitioning the array A[q..r] into two smaller arrays, A[q..s] and A[s+1..r], is done by a single process. Since one process must partition the original array A[1..n], the run time of this formulation is bounded below by Ω(n). This formulation is therefore not cost-optimal.
However, performing the partitioning itself in parallel, so that the processes cooperate on each split, yields an efficient parallel quicksort.
Consider the recurrence T(n) = 2T(n/2) + Θ(n), which gives the complexity of quicksort with optimal pivot selection. The Θ(n) term is due to the partitioning of the array. Compare this with the overall complexity of the algorithm, Θ(n log n).
From these two complexities, we can think of quicksort as consisting of Θ(log n) steps, each requiring time Θ(n): that of splitting the array. Therefore, if the partitioning step is performed in Θ(1) time using Θ(n) processes, it is possible to obtain an overall parallel run time of Θ(log n), which leads to a cost-optimal formulation.
Without parallelizing the partitioning step, we can cost-optimally use only Θ(log n) processes, sorting n elements in time Θ(n).
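Spelling out the arithmetic behind these claims:

Sequential: T(n) = 2T(n/2) + Θ(n). Each of the Θ(log n) levels of the recursion does Θ(n) total partitioning work, giving Θ(n log n).

Parallel: if both halves proceed simultaneously and each partitioning level takes Θ(1) with Θ(n) processes, then T_par(n) = T_par(n/2) + Θ(1), which unrolls to Θ(log n).

Cost = processes × time = Θ(n) × Θ(log n) = Θ(n log n), which matches the sequential work; hence the formulation is cost-optimal.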
C++ implementation of Parallel Quicksort using the OpenMP API -
#include <bits/stdc++.h>
#include <omp.h>
using namespace std;

// Function to swap two numbers a and b
void swap(int* a, int* b)
{
    int t = *a;
    *a = *b;
    *b = t;
}

// Function to perform the partitioning of array arr[]
int partition(int arr[], int start, int end)
{
    // The last element is chosen as the pivot
    int pivot = arr[end];
    int i = (start - 1);

    // Rearranging the array
    for (int j = start; j <= end - 1; j++) {
        if (arr[j] < pivot) {
            i++;
            swap(&arr[i], &arr[j]);
        }
    }
    swap(&arr[i + 1], &arr[end]);

    // Returning the pivot's final index
    return (i + 1);
}

// Function to perform the quicksort algorithm using OpenMP
void quicksort(int arr[], int start, int end)
{
    if (start < end) {
        // Getting the index of the pivot by partitioning
        int index = partition(arr, start, end);

        // Sort the two halves in parallel sections.
        // Note: nested parallel regions are serialized by default;
        // enable nesting (e.g., OMP_NESTED=true) to parallelize the
        // deeper recursive calls as well.
        #pragma omp parallel sections
        {
            #pragma omp section
            {
                // Evaluating the left half
                quicksort(arr, start, index - 1);
            }
            #pragma omp section
            {
                // Evaluating the right half
                quicksort(arr, index + 1, end);
            }
        }
    }
}

// Driver Code
int main()
{
    int N;

    // Taking as input the number of elements
    cout << "Enter the number of elements you want to enter\n";
    cin >> N;

    // Declaration of the array
    vector<int> arr(N);

    // Taking the array as input
    cout << "Enter the array: \n";
    for (int i = 0; i < N; i++) {
        cin >> arr[i];
    }

    // Calling quicksort (parallel implementation)
    quicksort(arr.data(), 0, N - 1);

    // Printing the sorted array
    cout << "Array after sorting is: \n";
    for (int i = 0; i < N; i++) {
        cout << arr[i] << " ";
    }
    return 0;
}
Open Multi-Processing (OpenMP, often abbreviated OMP) is an Application Program Interface (API) that may be used to explicitly direct multi-threaded, shared-memory parallelism. In C/C++, "omp.h" is the header file that declares all the directives and runtime routines related to OpenMP.
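For instance, the smallest possible OpenMP program looks like this (a generic illustration, separate from the sorting code above):

#include <omp.h>
#include <stdio.h>

int main()
{
    // The pragma turns the following block into a team of threads,
    // each of which reports its own id.
    #pragma omp parallel
    {
        printf("hello from thread %d of %d\n",
               omp_get_thread_num(), omp_get_num_threads());
    }
    return 0;
}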
We have other parallelization resources too, such as MPI -
MPI stands for Message Passing Interface; here, the message is data. MPI allows data to be passed between processes in a distributed-memory environment. In C, "mpi.h" is the header file that declares all the data structures, routines, and constants of MPI.
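A minimal send/receive pair shows the model (my illustration; run with something like mpirun -np 2 ./a.out):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char** argv)
{
    int rank, value = 42;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Rank 0 sends one integer (the "message") to rank 1.
    if (rank == 0) {
        MPI_Send(&value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        MPI_Recv(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
        printf("rank 1 received %d\n", value);
    }

    MPI_Finalize();
    return 0;
}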
And POSIX threads -
The POSIX thread libraries are a standards-based C/C++ thread API. They enable the creation of a new concurrent flow of control. They work well on multi-processor or multi-core systems, where a flow can be scheduled to run on another processor, gaining speed through parallel or distributed processing.
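The basic create/join pattern, which the full implementation further below builds on, fits in a few lines (a generic illustration):

#include <pthread.h>
#include <stdio.h>

void* worker(void* arg)
{
    printf("hello from a POSIX thread\n");
    return NULL;
}

int main()
{
    pthread_t tid;

    // Create a new flow of control, then wait for it to finish.
    if (pthread_create(&tid, NULL, worker, NULL) != 0)
        return 1;
    pthread_join(tid, NULL);
    return 0;
}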
Implementation using MPI in C -
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

// Function to swap two numbers
void swap(int* arr, int i, int j)
{
    int t = arr[i];
    arr[i] = arr[j];
    arr[j] = t;
}

// Function that performs quicksort on the segment of arr[]
// that starts at index `start` and contains `end` elements
void quicksort(int* arr, int start, int end)
{
    int pivot, index;

    // Base case
    if (end <= 1)
        return;

    // Pick the middle element as pivot and
    // swap it with the first element
    pivot = arr[start + end / 2];
    swap(arr, start, start + end / 2);

    // Partitioning steps
    index = start;

    // Iterate over the segment
    for (int i = start + 1; i < start + end; i++) {
        // Swap if the element is less
        // than the pivot element
        if (arr[i] < pivot) {
            index++;
            swap(arr, i, index);
        }
    }

    // Swap the pivot into place
    swap(arr, start, index);

    // Recursive calls on the two halves
    quicksort(arr, start, index - start);
    quicksort(arr, index + 1, start + end - index - 1);
}

// Function that merges two sorted arrays
int* merge(int* arr1, int n1, int* arr2, int n2)
{
    int* result = (int*)malloc((n1 + n2) * sizeof(int));
    int i = 0;
    int j = 0;
    int k;

    for (k = 0; k < n1 + n2; k++) {
        if (i >= n1) {
            result[k] = arr2[j];
            j++;
        }
        else if (j >= n2) {
            result[k] = arr1[i];
            i++;
        }
        // Indices in bounds as i < n1 && j < n2
        else if (arr1[i] < arr2[j]) {
            result[k] = arr1[i];
            i++;
        }
        // arr2[j] <= arr1[i]
        else {
            result[k] = arr2[j];
            j++;
        }
    }
    return result;
}

// Driver Code
int main(int argc, char* argv[])
{
    int number_of_elements;
    int* data = NULL;
    int chunk_size, own_chunk_size;
    int* chunk;
    FILE* file = NULL;
    double time_taken = 0.0;
    MPI_Status status;

    if (argc != 3) {
        printf("Desired number of arguments not present in argv....\n");
        printf("2 files required: first one input and second one output....\n");
        exit(-1);
    }

    int number_of_process, rank_of_process;
    int rc = MPI_Init(&argc, &argv);

    if (rc != MPI_SUCCESS) {
        printf("Error in creating MPI program.\nTerminating......\n");
        MPI_Abort(MPI_COMM_WORLD, rc);
    }

    MPI_Comm_size(MPI_COMM_WORLD, &number_of_process);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank_of_process);

    if (rank_of_process == 0) {
        // Opening the input file
        file = fopen(argv[1], "r");

        // Printing an error message if the file cannot be opened
        if (file == NULL) {
            printf("Error in opening file\n");
            exit(-1);
        }

        // The first value in the file is the number of elements
        printf("Reading number of Elements From file ....\n");
        fscanf(file, "%d", &number_of_elements);
        printf("Number of Elements in the file is %d \n",
               number_of_elements);

        // Computing the chunk size
        chunk_size
            = (number_of_elements % number_of_process == 0)
                  ? (number_of_elements / number_of_process)
                  : (number_of_elements / (number_of_process - 1));

        data = (int*)malloc(number_of_process * chunk_size
                            * sizeof(int));

        // Reading the elements to be sorted
        printf("Reading the array from the file.......\n");
        for (int i = 0; i < number_of_elements; i++) {
            fscanf(file, "%d", &data[i]);
        }

        // Padding data with zeros
        for (int i = number_of_elements;
             i < number_of_process * chunk_size; i++) {
            data[i] = 0;
        }

        // Printing the array read from the file
        printf("Elements in the array is : \n");
        for (int i = 0; i < number_of_elements; i++) {
            printf("%d ", data[i]);
        }
        printf("\n");

        fclose(file);
        file = NULL;
    }

    // Blocks all processes until they reach this point
    MPI_Barrier(MPI_COMM_WORLD);

    // Starts the timer
    time_taken -= MPI_Wtime();

    // Broadcast the size to all processes from the root process
    MPI_Bcast(&number_of_elements, 1, MPI_INT, 0,
              MPI_COMM_WORLD);

    // Computing the chunk size
    chunk_size
        = (number_of_elements % number_of_process == 0)
              ? (number_of_elements / number_of_process)
              : (number_of_elements / (number_of_process - 1));

    // Allocating space for each process's chunk
    chunk = (int*)malloc(chunk_size * sizeof(int));

    // Scatter the data in chunks to all processes
    MPI_Scatter(data, chunk_size, MPI_INT, chunk, chunk_size,
                MPI_INT, 0, MPI_COMM_WORLD);
    free(data);
    data = NULL;

    // Compute the size of this process's own chunk,
    // then sort it using quicksort
    own_chunk_size
        = (number_of_elements
           >= chunk_size * (rank_of_process + 1))
              ? chunk_size
              : (number_of_elements
                 - chunk_size * rank_of_process);

    // Sorting each chunk with quicksort in its own process
    quicksort(chunk, 0, own_chunk_size);

    // Tree-structured merge: at each step, the "odd" partner
    // sends its chunk and drops out, the "even" partner
    // receives and merges
    for (int step = 1; step < number_of_process;
         step = 2 * step) {
        if (rank_of_process % (2 * step) != 0) {
            MPI_Send(chunk, own_chunk_size, MPI_INT,
                     rank_of_process - step, 0,
                     MPI_COMM_WORLD);
            break;
        }

        if (rank_of_process + step < number_of_process) {
            int received_chunk_size
                = (number_of_elements
                   >= chunk_size
                          * (rank_of_process + 2 * step))
                      ? (chunk_size * step)
                      : (number_of_elements
                         - chunk_size
                               * (rank_of_process + step));
            int* chunk_received;
            chunk_received = (int*)malloc(
                received_chunk_size * sizeof(int));
            MPI_Recv(chunk_received, received_chunk_size,
                     MPI_INT, rank_of_process + step, 0,
                     MPI_COMM_WORLD, &status);

            data = merge(chunk, own_chunk_size,
                         chunk_received,
                         received_chunk_size);

            free(chunk);
            free(chunk_received);
            chunk = data;
            own_chunk_size
                = own_chunk_size + received_chunk_size;
        }
    }

    // Stop the timer
    time_taken += MPI_Wtime();

    // The root process writes the sorted result
    // to the output file
    if (rank_of_process == 0) {
        // Opening the output file
        file = fopen(argv[2], "w");

        if (file == NULL) {
            printf("Error in opening file... \n");
            exit(-1);
        }

        // Printing the total number of elements in the file
        fprintf(file,
                "Total number of Elements in the array : %d\n",
                own_chunk_size);

        // Printing the values of the array in the file
        for (int i = 0; i < own_chunk_size; i++) {
            fprintf(file, "%d ", chunk[i]);
        }

        // Closing the file
        fclose(file);

        printf("\n\n\n\nResult printed in the output file "
               "and shown below: \n");

        // Printing in the terminal
        printf("Total number of Elements given as input : %d\n",
               number_of_elements);
        printf("Sorted array is: \n");

        for (int i = 0; i < number_of_elements; i++) {
            printf("%d ", chunk[i]);
        }

        printf("\n\nQuicksort %d ints on %d procs: %f secs\n",
               number_of_elements, number_of_process,
               time_taken);
    }

    MPI_Finalize();
    return 0;
}
Implementation using POSIX threads in C++ -
// C++ program to implement quicksort using POSIX threads
#include <bits/stdc++.h>
#include <pthread.h>
using namespace std;

// Structure describing a sorting task: a subarray
// delimited by start and end indices
struct data_set {
    int start_index;
    int end_index;
    int* data;
};

// Function to perform swap operations
void swap(int* a, int* b)
{
    int t = *a;
    *a = *b;
    *b = t;
}

// Partition function for partitioning the array
int partition(int arr[], int left_index, int right_index)
{
    // Choosing the pivot element around which we partition;
    // here the pivot is the last element of the subarray
    int pivot = arr[right_index];
    int i = left_index - 1;

    // Arranging elements smaller than the pivot on the left
    // side and larger than the pivot on the right side
    for (int j = left_index; j <= right_index - 1; j++) {
        if (arr[j] < pivot) {
            i++;
            swap(&arr[i], &arr[j]);
        }
    }
    swap(&arr[i + 1], &arr[right_index]);

    // Returning the partition index
    return i + 1;
}

// Quicksort function for sorting the array
void* quick_sort(void* data)
{
    // Retrieving the task description passed to the thread
    struct data_set* info = (struct data_set*)data;

    int left_index, right_index, index;
    left_index = info->start_index;
    right_index = info->end_index;

    if (left_index < right_index) {
        // Declaration of pthread and pthread attribute objects
        pthread_attr_t attr;
        pthread_t first_thread;
        pthread_t second_thread;

        // Two new task descriptions, one for each
        // recursive call made from this thread
        struct data_set* info1 = new data_set;
        struct data_set* info2 = new data_set;
        info1->data = info->data;
        info2->data = info->data;

        // Initializing the pthread attribute and
        // making the threads joinable
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(
            &attr, PTHREAD_CREATE_JOINABLE);

        // Partition the array for the recursive calls
        index = partition(info->data, left_index,
                          right_index);

        info1->start_index = left_index;
        info1->end_index = index - 1;

        // Creating the first thread and printing
        // an error message if creation fails
        if (pthread_create(&first_thread, &attr, quick_sort,
                           info1)) {
            cout << "Error in creating thread" << endl;
            exit(-1);
        }

        info2->start_index = index + 1;
        info2->end_index = right_index;

        // Creating the second thread and printing
        // an error message if creation fails
        if (pthread_create(&second_thread, &attr, quick_sort,
                           info2)) {
            cout << "Error in creating thread" << endl;
            exit(-1);
        }

        // Joining the threads
        pthread_join(first_thread, NULL);
        pthread_join(second_thread, NULL);

        // Releasing the task descriptions once
        // both threads have finished
        delete info1;
        delete info2;
    }
    return NULL;
}

// Driver Code
int main()
{
    int N;
    struct data_set* info = new data_set;

    // Taking the number of elements as input
    cout << "Enter number of elements in the array: \n";
    cin >> N;

    // Declaration and initialization of the array
    vector<int> A(N);
    cout << "Enter the array: " << endl;
    for (int i = 0; i < N; i++) {
        cin >> A[i];
    }

    // Initializing the root task
    info->data = A.data();
    info->start_index = 0;
    info->end_index = N - 1;

    // Creating the root thread and printing
    // an error message if creation fails
    pthread_t thread_id;
    if (pthread_create(&thread_id, NULL, quick_sort, info)) {
        cout << "Error in creating thread" << endl;
        exit(-1);
    }

    // Joining the root thread and printing
    // an error message if joining fails
    int r1 = pthread_join(thread_id, NULL);
    if (r1) {
        cout << "Error in joining thread" << endl;
        exit(-1);
    }
    delete info;

    // Printing the array after sorting
    cout << "Sorted Array is: " << endl;
    for (int i = 0; i < N; i++) {
        cout << A[i] << " ";
    }
    cout << endl;

    return 0;
}