Java parallel BucketSort -- version 2.0 (optimal, I think)
程序员文章站
2022-04-01 16:52:37
...
如果你能在一周左右看懂,说明你水平还可以。如果你看不懂,也很正常。
随机整数的范围是 1 至 20 亿,共有 1 亿个随机整数,然后从小到大排序。我只开了 4 个线程;如果 12 个线程火力全开,我觉得可以秒算!
linear_time: 9.624 secs 线性执行:9.624 秒
parallel_time: 2.528 secs 平行执行:2.528 秒
speed up : 3.806962025316456 加速比: 4 左右
你需要 15 G 内存,否则看不出平行计算的威力!你当然可以调整数据的范围和数量,但是如果数据太小、太简单,平行化就毫无意义了。
我自己的原创之处在于:数据的按桶平移缩小和还原——计数前把每个数减去所在桶的起始值,输出时再加回来(例如桶大小为 25 时,71 → 21 → 71)。要理解这一点,你必须深刻理解桶排序的原理,当然还有多线程。我觉得这是我这次重写 cpp 版桶排序平行化最大的收获,需要一点点技巧和智商。
这次平行化我用了 1.5 天完成,每天 8 小时左右,总共大概 12 小时,包括从编写线性桶排序代码到把桶排序平行化。今天好像是大年初六,我不过年,没啥概念。
Hi, sweetheart, be careful:
-Xms15024M -Xmx15024M
/
linear_time: 9.624 secs
parallel_time: 2.528 secs
speed up : 3.806962025316456
///
/**
 * Minimal stopwatch built on {@link System#nanoTime()}.
 *
 * <p>There is a single shared start time: every {@link #time_start()} call overwrites the
 * previous one, so overlapping measurements (e.g. from several threads) interfere.
 */
public class MyTimer {
    /** nanoTime() snapshot taken by the most recent time_start() call. */
    private static long begin;

    /** Records the current nanoTime() as the start of the measurement. */
    public static void time_start(){
        begin = System.nanoTime();
    }

    /**
     * Returns the time elapsed since the last {@link #time_start()}.
     *
     * @return elapsed time in (fractional) milliseconds
     */
    public static double time_end_milli_seconds(){
        // The subtraction must be parenthesized; otherwise only `begin` is scaled to milliseconds.
        return (System.nanoTime() - begin) / 1e6;
    }

    /**
     * Returns the time elapsed since the last {@link #time_start()}, truncated to whole
     * milliseconds and expressed in seconds (e.g. 2528.9 ms -> 2.528).
     *
     * @return elapsed time in seconds with millisecond resolution
     */
    public static double time_end_seconds(){
        double total_time = (System.nanoTime() - begin) / 1e6;
        // Truncate via long (not int) so runs longer than ~24.8 days cannot overflow.
        return ((long) total_time) / 1000.0;
    }
}
import java.util.Random;
public class RandomGenerator {
public static int[] random_generate(int min,int max,int numtrials){
int[] int_array = new int[numtrials];
Random random = new Random();
for (int i = 0; i < numtrials; i++) {
int random_num = random.nextInt(max)%(max-min+1) + min;
int_array[i] = random_num;
}
return int_array;
}
}
public class BucketSort {
    /**
     * Sorts {@code original_array} ascending with a counting ("bucket") sort.
     *
     * <p>One counter is kept per possible value, so every element must lie in {@code [0, max]}
     * (otherwise an ArrayIndexOutOfBoundsException is thrown). Memory grows with {@code max},
     * not with the input length. The input array is only read, never modified.
     *
     * @param original_array values to sort; each in [0, max]
     * @param max largest value that may occur
     * @return a new array holding the same values in ascending order
     */
    public int[] bucket_sort(int[] original_array, int max){
        if (original_array.length == 0) {
            // Nothing to count: skip allocating the (max + 1)-entry histogram, which can be
            // gigabytes for a large max (and cannot even be created when max == Integer.MAX_VALUE).
            return new int[0];
        }
        // store_array[v] = number of occurrences of v in the input.
        int[] store_array = new int[max + 1];
        for (int value : original_array) {
            store_array[value]++;
        }
        int[] final_sorted_array = new int[original_array.length];
        int cherry_pointer = 0;
        // Emit each value as many times as it was counted, smallest value first.
        for (int cherry = 0; cherry < store_array.length; cherry++) {
            for (int remaining = store_array[cherry]; remaining > 0; remaining--) {
                final_sorted_array[cherry_pointer++] = cherry;
            }
        }
        return final_sorted_array;
    }
}
/**
 * Phase-1 worker of the parallel bucket sort. It scans every 4th element of
 * {@code random_array}, starting at {@code begin_index}, and copies each value into bucket
 * {@code value / bucket_size} (one of {@code arr0}..{@code arr3}). After {@link #run()} each
 * {@code arrK} is trimmed to exactly {@code pointerK} elements, in scan order.
 *
 * <p>A value that is negative or whose bucket index is above 3 makes {@link #run()} throw
 * IllegalArgumentException. Such values used to be dropped silently. When the thread is
 * start()ed, the exception terminates it; install an UncaughtExceptionHandler to observe it.
 */
public class GoToBucketThread extends Thread{
    /** Number of buckets; also the stride between the indices one thread scans. */
    private static final int BUCKETS = 4;

    /** First index this thread scans; it then advances by 4. */
    public int begin_index;
    /** Input array; this class only reads it. */
    public int[] random_array;
    /** Width of each bucket's value range: bucket k holds values in [k*bucket_size, (k+1)*bucket_size). */
    public int bucket_size;
    public int[] arr0;
    public int pointer0 = 0;
    public int[] arr1;
    public int pointer1 = 0;
    public int[] arr2;
    public int pointer2 = 0;
    public int[] arr3;
    public int pointer3 = 0;

    public GoToBucketThread(int begin_index, int[] random_array, int bucket_size) {
        this.begin_index = begin_index;
        this.random_array = random_array;
        this.bucket_size = bucket_size;
        // Size every buffer by how many elements this thread will scan, not by bucket_size.
        // That bound always holds, even if every scanned value lands in the same bucket.
        int capacity = slice_length(random_array.length, begin_index);
        arr0 = new int[capacity];
        arr1 = new int[capacity];
        arr2 = new int[capacity];
        arr3 = new int[capacity];
    }

    /** Number of indices begin, begin + 4, begin + 8, ... that are below {@code length}. */
    private static int slice_length(int length, int begin) {
        if (begin >= length) {
            return 0;
        }
        return (int) (((long) length - begin + BUCKETS - 1) / BUCKETS);
    }

    @Override
    public void run() {
        classify_data();
        arr0 = wash_zero_data(arr0, pointer0);
        arr1 = wash_zero_data(arr1, pointer1);
        arr2 = wash_zero_data(arr2, pointer2);
        arr3 = wash_zero_data(arr3, pointer3);
    }

    /**
     * Returns a new array containing the first {@code array_pointer} elements of
     * {@code original_array}, i.e. the buffer without its unused tail.
     */
    public int[] wash_zero_data(int[] original_array, int array_pointer){
        int[] new_array = new int[array_pointer];
        System.arraycopy(original_array, 0, new_array, 0, array_pointer);
        return new_array;
    }

    /**
     * Distributes this thread's strided slice of {@code random_array} into the four buckets.
     *
     * @throws IllegalArgumentException if a value is negative or belongs to a bucket above 3
     */
    public void classify_data(){
        for (int i = begin_index; i < random_array.length; i += BUCKETS) {
            int value = random_array[i];
            int bucket_num = value / bucket_size;
            // Integer division truncates toward zero, so small negatives would map to bucket 0;
            // reject them explicitly along with anything past the last bucket.
            if (value < 0 || bucket_num >= BUCKETS) {
                throw new IllegalArgumentException("value " + value + " at index " + i
                        + " does not fit in " + BUCKETS + " buckets of size " + bucket_size);
            }
            switch (bucket_num) {
                case 0: arr0[pointer0++] = value; break;
                case 1: arr1[pointer1++] = value; break;
                case 2: arr2[pointer2++] = value; break;
                default: arr3[pointer3++] = value; break;
            }
        }
    }
}
/**
 * Phase-2 worker of the parallel bucket sort. It counting-sorts the values of one bucket into
 * {@code final_sorted_array}. Each value is first shifted down by the bucket's start offset
 * ({@code bucket_num * bucket_size}) so the counter array only needs {@code bucket_size + 1}
 * entries. The offset is added back when the values are written out.
 */
public class SmallBucketThread extends Thread {
    /** Values belonging to this bucket; expected in [bucket_num*bucket_size, (bucket_num+1)*bucket_size]. */
    public int[] original_array;
    /** Output buffer, same length as original_array; filled in ascending order by run(). */
    public int[] final_sorted_array;
    /** Zero-based index of this bucket. */
    public int bucket_num;
    /** Width of each bucket's value range. */
    public int bucket_size;

    public SmallBucketThread(int[] original_array, int bucket_num, int bucket_size) {
        this.original_array = original_array;
        this.final_sorted_array = new int[original_array.length];
        this.bucket_num = bucket_num;
        this.bucket_size = bucket_size;
    }

    /** Maps a value to its offset inside bucket {@code bucket_num} (e.g. 71 -> 21 for bucket 2, size 25). */
    public int shrink_number(int item, int bucket_num, int bucket_size) {
        int bucket_start = bucket_num * bucket_size;
        return item - bucket_start;
    }

    /** Inverse of shrink_number: turns an in-bucket offset back into the original value. */
    public int recover_number(int item, int bucket_num, int bucket_size) {
        int bucket_start = bucket_num * bucket_size;
        return item + bucket_start;
    }

    @Override
    public void run() {
        bucket_sort_parallel(original_array, bucket_size);
    }

    /**
     * Counting-sorts {@code original_array} into the {@code final_sorted_array} field.
     *
     * @param original_array values to sort, all inside this thread's bucket
     * @param max largest in-bucket offset; the counter array has {@code max + 1} entries
     */
    public void bucket_sort_parallel(int[] original_array, int max) {
        int[] tally = new int[max + 1];
        for (int item : original_array) {
            tally[shrink_number(item, bucket_num, bucket_size)]++;
        }
        int out = 0;
        for (int offset = 0; offset < tally.length; offset++) {
            for (int left = tally[offset]; left > 0; left--) {
                final_sorted_array[out++] = recover_number(offset, bucket_num, bucket_size);
            }
        }
    }
}
import java.util.ArrayList;
import java.util.List;
public class BucketSortParallel {
    /** Number of value buckets; also the number of scatter threads and of sorting threads. */
    private static final int WORKERS = 4;

    /**
     * Generates {@code cherry_total_num} random ints in [min_cherry, max_cherry] and sorts them
     * with a two-phase parallel bucket sort. Returns the elapsed time reported by
     * {@link MyTimer#time_end_seconds()}, which includes the random generation.
     *
     * <p>Phase 1: WORKERS threads each scan a strided slice of the input and split it by value
     * range into WORKERS buckets. Phase 2: one {@link SmallBucketThread} per bucket
     * counting-sorts that bucket. Concatenating buckets 0..WORKERS-1 gives the sorted data.
     *
     * @param min_cherry smallest value to generate; must be >= 0 because values index count arrays
     * @param max_cherry largest value to generate; must be >= min_cherry
     * @param cherry_total_num how many values to generate and sort
     * @return elapsed time in seconds
     * @throws IllegalArgumentException if min_cherry < 0 or max_cherry < min_cherry
     * @throws IllegalStateException if a worker thread dies with an exception
     * @throws InterruptedException if interrupted while waiting for a worker thread
     */
    public double bucket_sort_parallel(int min_cherry, int max_cherry, int cherry_total_num) throws InterruptedException {
        if (min_cherry < 0 || max_cherry < min_cherry) {
            throw new IllegalArgumentException("need 0 <= min_cherry <= max_cherry, got ["
                    + min_cherry + ", " + max_cherry + "]");
        }
        MyTimer.time_start();
        // Buckets split the VALUE range, not the element count: bucket b holds values in
        // [b * bucket_size, (b + 1) * bucket_size). The +1 guarantees max_cherry / bucket_size
        // is at most WORKERS - 1, so no value falls past the last bucket.
        int bucket_size = max_cherry / WORKERS + 1;
        int[] random_array = RandomGenerator.random_generate(min_cherry, max_cherry, cherry_total_num);
        int[][] buckets = scatter_into_buckets(random_array, bucket_size);
        SmallBucketThread[] sorters = new SmallBucketThread[WORKERS];
        for (int b = 0; b < WORKERS; b++) {
            sorters[b] = new SmallBucketThread(buckets[b], b, bucket_size);
        }
        run_all(sorters);
        // sorters[b].final_sorted_array now holds bucket b in ascending order;
        // print_array(...) can be used to inspect them.
        return MyTimer.time_end_seconds();
    }

    /**
     * Splits {@code values} into WORKERS value-range buckets using WORKERS threads. Thread t scans
     * indices t, t + WORKERS, t + 2*WORKERS, ... Within a bucket, thread 0's values come first,
     * then thread 1's, and so on.
     */
    private static int[][] scatter_into_buckets(int[] values, int bucket_size) throws InterruptedException {
        ScatterWorker[] workers = new ScatterWorker[WORKERS];
        for (int t = 0; t < WORKERS; t++) {
            workers[t] = new ScatterWorker(values, t, bucket_size);
        }
        run_all(workers);
        int[][] buckets = new int[WORKERS][];
        for (int b = 0; b < WORKERS; b++) {
            int total = 0;
            for (ScatterWorker worker : workers) {
                total += worker.fill[b];
            }
            int[] bucket = new int[total];
            int offset = 0;
            for (ScatterWorker worker : workers) {
                System.arraycopy(worker.parts[b], 0, bucket, offset, worker.fill[b]);
                offset += worker.fill[b];
            }
            buckets[b] = bucket;
        }
        return buckets;
    }

    /**
     * Starts every thread, waits for all of them, and rethrows the first exception any of them
     * died with, so a failed worker cannot silently leave a bucket incomplete.
     */
    private static void run_all(Thread[] threads) throws InterruptedException {
        final Throwable[] failure = new Throwable[1];
        Thread.UncaughtExceptionHandler recorder = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable error) {
                synchronized (failure) {
                    if (failure[0] == null) {
                        failure[0] = error;
                    }
                }
            }
        };
        for (Thread thread : threads) {
            thread.setUncaughtExceptionHandler(recorder);
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        synchronized (failure) {
            if (failure[0] != null) {
                throw new IllegalStateException("bucket sort worker failed", failure[0]);
            }
        }
    }

    /** Scans one strided slice of the input and appends each value to its bucket's buffer. */
    private static final class ScatterWorker extends Thread {
        private final int[] values;
        private final int start;
        private final int bucket_size;
        /** parts[b][0 .. fill[b]) holds this slice's values that belong to bucket b. */
        private final int[][] parts;
        private final int[] fill = new int[WORKERS];

        ScatterWorker(int[] values, int start, int bucket_size) {
            this.values = values;
            this.start = start;
            this.bucket_size = bucket_size;
            // Each buffer can hold the whole slice, so it cannot overflow whatever the distribution.
            int slice_length = start < values.length
                    ? (int) (((long) values.length - start + WORKERS - 1) / WORKERS)
                    : 0;
            this.parts = new int[WORKERS][slice_length];
        }

        @Override
        public void run() {
            for (int i = start; i < values.length; i += WORKERS) {
                int value = values[i];
                int bucket = value / bucket_size;
                parts[bucket][fill[bucket]++] = value;
            }
        }
    }

    /** Prints the last (up to) 10 elements of every int[] in {@code list}, one per line. */
    public void print_array(List<?> list)
    {
        for (Object item : list) {
            int[] array = (int[]) item;
            // Clamp at 0 so arrays shorter than 10 elements do not index negatively.
            for (int j = Math.max(0, array.length - 10); j < array.length; j++) {
                System.out.println(array[j]);
            }
        }
    }

    /** Prints every element of {@code array} on one line, each followed by "-". */
    public void print_array(int[] array){
        for (int j = 0; j < array.length; j++) {
            System.out.print(array[j]+"-");
        }
    }
}
/** Benchmark driver: times the single-threaded and the parallel bucket sort on the same workload. */
public class TestBucketSort {
    /**
     * Times the single-threaded path: generate {@code num_trails} ints in [min, max], then
     * counting-sort them with {@link BucketSort}.
     *
     * @return seconds reported by MyTimer, random generation included
     */
    public double linear_bucketsort(int min, int max, int num_trails) {
        MyTimer.time_start();
        BucketSort sorter = new BucketSort();
        int[] unsorted = RandomGenerator.random_generate(min, max, num_trails);
        sorter.bucket_sort(unsorted, max);
        return MyTimer.time_end_seconds();
    }

    /** Runs both variants on 100 million values in [1, 2 billion] and prints timings and speedup. */
    public static void main(String[] args) throws InterruptedException {
        final int lowest = 1;
        final int highest = 2000000000;  // 2 billion
        final int count = 100000000;     // 100 million
        double linear_time = new TestBucketSort().linear_bucketsort(lowest, highest, count);
        double parallel_time = new BucketSortParallel().bucket_sort_parallel(lowest, highest, count);
        System.out.println("linear_time: " + linear_time + " secs");
        System.out.println("parallel_time: " + parallel_time + " secs");
        double speedup = linear_time / parallel_time;
        System.out.println("speed up : " + speedup);
    }
}