欢迎您访问程序员文章站！本站旨在为大家分享程序员计算机编程知识！
您现在的位置是: 首页

Java parallel BucketSort -- version 2.0 (optimal, I think)

程序员文章站 2022-04-01 16:52:37
...

如果你能在一周左右看懂，说明你水平还可以；如果你看不懂，也很正常。
随机整数的范围是 1 到 20 亿，共有 1 亿个随机整数，将它们从小到大排序。我只开了 4 个线程，如果 12 个线程火力全开，我觉得可以秒算！

linear_time: 9.624 secs 线性执行:9.624 秒
parallel_time: 2.528 secs 平行执行:2.528 秒
speed up : 3.806962025316456 加速比: 4 左右

你需要 15 G 内存,否则看不出平行计算的威力!你当然可以调整数据的大小,多少,但是数据太小,太简单,平行化就毫无意义了。
我自己的原创就是：数据的缩小和还原（平移：每个桶里的数先减去该桶的起始值再计数，输出时再加回去，例如宽度为 25 的第 2 号桶里 71 -> 21），这一点你必须深刻理解桶排序的原理才能理解，当然还有多线程。我觉得这才是我这次重写 cpp 版桶排序平行化最大的收获，需要一点点技巧和智商。

这次平行化我用了 1.5 天完成，每天 8 小时左右，总共大概 12 小时：从编写线性桶排序代码，到桶排序平行化。今天好像是大年初六，我不过年，没啥概念。
Hi, sweetheart, be careful:
-Xms15024M -Xmx15024M
/
linear_time: 9.624 secs
parallel_time: 2.528 secs
speed up : 3.806962025316456
///

/**
 * Minimal wall-clock stopwatch backed by {@link System#nanoTime()}.
 *
 * <p>The start instant is kept in a single static field, so there is one stopwatch per JVM:
 * concurrent users would overwrite each other's start time.
 */
public class MyTimer {
    /** System.nanoTime() reading captured by the most recent time_start() call. */
    private static long begin;

    /** Records the current instant as the start of the measured interval. */
    public static void time_start(){
        begin = System.nanoTime();
    }

    /**
     * Returns the milliseconds elapsed since the last {@link #time_start()} call.
     *
     * <p>The difference is parenthesised before dividing: dividing only {@code begin} by 1e6 would
     * return (almost) the raw nanoTime reading instead of an elapsed time.
     */
    public static double time_end_milli_seconds(){
        return (System.nanoTime() - begin) / 1e6;
    }

    /**
     * Returns the seconds elapsed since the last {@link #time_start()} call, truncated to whole
     * milliseconds (an elapsed 2528.7 ms is reported as 2.528).
     */
    public static double time_end_seconds(){
        // Math.floor matches the old (int) truncation for non-negative values below 2^31 ms,
        // but cannot overflow for intervals longer than roughly 24.8 days.
        return Math.floor(time_end_milli_seconds()) / 1000;
    }

}
import java.util.Random;

/** Produces arrays of pseudo-random test data. */
public class RandomGenerator {

    /**
     * Returns {@code numtrials} pseudo-random ints drawn uniformly from the inclusive range
     * [{@code min}, {@code max}].
     *
     * <p>A modulo-folded {@code nextInt(max) % (max-min+1) + min} is uniform over [min, max] only
     * when min == 1 (for min == 0 it never yields max, for larger min it over-weights low values),
     * so the offset is drawn with {@code nextInt(bound)} over the exact span instead.
     *
     * @param min smallest value that may be produced (inclusive)
     * @param max largest value that may be produced (inclusive)
     * @param numtrials number of values to generate
     * @return a new array of length {@code numtrials}
     * @throws IllegalArgumentException if {@code max < min}
     * @throws NegativeArraySizeException if {@code numtrials} is negative
     */
    public static int[] random_generate(int min,int max,int numtrials){
        if (max < min) {
            throw new IllegalArgumentException("max (" + max + ") must be >= min (" + min + ")");
        }

        // long arithmetic: max - min + 1 overflows int for ranges wider than 2^31 - 1
        long span = (long) max - min + 1;

        int[] int_array = new int[numtrials];
        Random random = new Random();

        for (int i = 0; i < numtrials; i++) {
            int_array[i] = (int) (min + uniformBelow(random, span));
        }

        return int_array;
    }

    /** Returns a uniformly distributed value in [0, span), for 1 <= span <= 2^32. */
    private static long uniformBelow(Random random, long span) {
        if (span <= Integer.MAX_VALUE) {
            return random.nextInt((int) span);
        }
        // Wider than nextInt(bound) allows: take 32 random bits and reject values >= span.
        // Here span > 2^31, so each draw is accepted with probability greater than 1/2.
        long candidate;
        do {
            candidate = random.nextInt() & 0xFFFFFFFFL;
        } while (candidate >= span);
        return candidate;
    }

}
/** Single-threaded counting ("bucket") sort for non-negative ints with a known upper bound. */
public class BucketSort {

    /**
     * Returns a new array holding the elements of {@code original_array} in ascending order.
     *
     * <p>One counter is kept per possible value, so memory is {@code (max + 1)} ints regardless of
     * how many elements are sorted.
     *
     * @param original_array values to sort; each must lie in [0, {@code max}]
     * @param max largest value that may occur (inclusive)
     * @return a new sorted array of the same length as {@code original_array}
     * @throws ArrayIndexOutOfBoundsException if an element is negative or greater than {@code max}
     */
    public int[] bucket_sort(int[] original_array,int max){

        // store_array[v] counts how often v occurs; indices 0..max, hence max + 1 slots
        int[] store_array = new int[max + 1];
        for (int value : original_array) {
            ++store_array[value];
        }

        // Emit each value as many times as it was counted, in ascending index order.
        int[] final_sorted_array = new int[original_array.length];
        int cherry_pointer = 0;
        for (int cherry = 0; cherry < store_array.length; cherry++) {
            int count = store_array[cherry];
            java.util.Arrays.fill(final_sorted_array, cherry_pointer, cherry_pointer + count, cherry);
            cherry_pointer += count;
        }

        return final_sorted_array;
    }
}
/**
 * Scatters every fourth element of a shared input array into four value-range buckets.
 *
 * <p>The instance with {@code begin_index} k (0..3) visits indices k, k + 4, k + 8, ...; a visited
 * value v is appended to bucket {@code v / bucket_size}, which must be 0..3. After {@link #run()},
 * {@code arrN} holds exactly the {@code pointerN} values this instance placed in bucket N. The
 * input array is only read, never written.
 */
public class GoToBucketThread extends Thread{

    /** Distance between consecutive indices visited by one instance; also the number of buckets. */
    private static final int STRIDE = 4;

    public int begin_index;
    public int[] random_array;
    public int bucket_size;

    public int[] arr0;
    public int pointer0 = 0;

    public int[] arr1;
    public int pointer1 = 0;

    public int[] arr2;
    public int pointer2 = 0;

    public int[] arr3;
    public int pointer3 = 0;

    /**
     * @param begin_index first index to visit (0..3)
     * @param random_array input values; only read
     * @param bucket_size width of each bucket's value range; must be positive
     */
    public GoToBucketThread(int begin_index, int[] random_array, int bucket_size) {
        this.begin_index = begin_index;
        this.random_array = random_array;
        this.bucket_size = bucket_size;

        // Staging capacity is the number of elements this instance visits, since all of them may
        // land in one bucket. bucket_size is a width in VALUE space and says nothing about counts.
        int capacity = visitedCount(random_array.length, begin_index);
        arr0 = new int[capacity];
        arr1 = new int[capacity];
        arr2 = new int[capacity];
        arr3 = new int[capacity];
    }

    /** Number of indices begin, begin + STRIDE, begin + 2 * STRIDE, ... that are below length. */
    private static int visitedCount(int length, int begin) {
        return begin >= length ? 0 : (length - 1 - begin) / STRIDE + 1;
    }

    /** Classifies the visited elements, then trims each staging array to its filled length. */
    @Override
    public void run() {
        classify_data();
        arr0 = wash_zero_data(arr0, pointer0);
        arr1 = wash_zero_data(arr1, pointer1);
        arr2 = wash_zero_data(arr2, pointer2);
        arr3 = wash_zero_data(arr3, pointer3);
    }

    /** Returns a new array holding the first {@code array_pointer} elements of {@code original_array}. */
    public int[] wash_zero_data(int[] original_array,int array_pointer){
        int[] new_array = new int[array_pointer];
        System.arraycopy(original_array, 0, new_array, 0, array_pointer);
        return new_array;
    }

    /**
     * Appends each visited value to the staging array of its bucket and advances that bucket's pointer.
     *
     * @throws IllegalArgumentException if a visited value is negative or its bucket index
     *     {@code value / bucket_size} is not in 0..3 (such values must not be dropped silently,
     *     or they would vanish from the sorted output)
     */
    public void classify_data(){
        for (int i = begin_index; i < random_array.length; i += STRIDE) {
            int value = random_array[i];
            int bucket_num = value / bucket_size;

            // value < 0 is checked on its own because integer division truncates toward zero:
            // e.g. -3 / 25 == 0 would otherwise be accepted into bucket 0.
            if (value < 0 || bucket_num < 0 || bucket_num >= STRIDE) {
                throw new IllegalArgumentException("value " + value + " at index " + i
                        + " falls outside buckets 0.." + (STRIDE - 1)
                        + " for bucket_size " + bucket_size);
            }

            switch (bucket_num) {
                case 0: arr0[pointer0++] = value; break;
                case 1: arr1[pointer1++] = value; break;
                case 2: arr2[pointer2++] = value; break;
                default: arr3[pointer3++] = value; break;
            }
        }
    }
}
/**
 * Counting-sorts the values of one bucket on its own thread.
 *
 * <p>Every value in {@code original_array} is expected to lie in
 * [{@code bucket_num * bucket_size}, {@code bucket_num * bucket_size + bucket_size}]. Values are
 * shifted down by the bucket's start before counting, so the count array needs only
 * {@code bucket_size + 1} slots rather than one per possible value. Once {@link #run()} returns,
 * {@link #final_sorted_array} holds the values in ascending order.
 */
public class SmallBucketThread extends Thread{

    /** Values belonging to this bucket, in arbitrary order. */
    public int[] original_array;
    /** Output: the same values in ascending order, filled by run(). */
    public int[] final_sorted_array;
    /** Index of this bucket; its values start at bucket_num * bucket_size. */
    public int bucket_num;
    /** Width of this bucket's value range. */
    public int bucket_size;

    public SmallBucketThread(int[] original_array,int bucket_num,int bucket_size) {
        this.original_array = original_array;
        final_sorted_array = new int[original_array.length];
        this.bucket_num = bucket_num;
        this.bucket_size = bucket_size;
    }

    /** Maps a value to its offset inside the bucket (e.g. 71 -> 21 for bucket 2 of width 25). */
    public int shrink_number(int item,int bucket_num,int bucket_size){

        return item-(bucket_num*bucket_size);
    }

    /** Inverse of shrink_number: maps an in-bucket offset back to the original value. */
    public int recover_number(int item,int bucket_num,int bucket_size){

        return item+(bucket_num*bucket_size);
    }

    @Override
    public void run() {
        bucket_sort_parallel(original_array,bucket_size);
    }

    /**
     * Counting-sorts {@code original_array} into {@link #final_sorted_array}.
     *
     * @param original_array values to sort; after shrinking each must lie in [0, {@code max}]
     * @param max largest shrunk offset, so the count array has {@code max + 1} slots
     */
    public void bucket_sort_parallel(int[] original_array,int max){

        int[] store_array = new int[max + 1];
        for (int value : original_array) {
            // shrinking uses this instance's bucket_num/bucket_size fields, not the max parameter
            ++store_array[shrink_number(value, bucket_num, bucket_size)];
        }

        int cherry_pointer = 0;
        for (int offset = 0; offset < store_array.length; offset++) {
            int count = store_array[offset];
            if (count == 0) {
                continue;
            }
            // recover once per distinct value instead of once per emitted element
            int value = recover_number(offset, bucket_num, bucket_size);
            java.util.Arrays.fill(final_sorted_array, cherry_pointer, cherry_pointer + count, value);
            cherry_pointer += count;
        }
    }
}
import java.util.ArrayList;
import java.util.List;

/**
 * Parallel counting ("bucket") sort benchmark.
 *
 * <p>The input is split by VALUE into four contiguous ranges of width {@code max_cherry / 4 + 1};
 * each range is then counting-sorted on its own thread by a {@link SmallBucketThread}. Because the
 * ranges are ordered, the four sorted results concatenated in bucket order form the sorted input.
 */
public class BucketSortParallel {

     /** Number of value ranges, and of worker threads used in each phase. */
     private static final int BUCKET_COUNT = 4;

     /**
      * Generates {@code cherry_total_num} random ints in [{@code min_cherry}, {@code max_cherry}],
      * sorts them in parallel and returns the elapsed time.
      *
      * @return elapsed seconds from {@link MyTimer#time_end_seconds()}, including the time spent
      *     generating the random input
      * @throws IllegalArgumentException if {@code min_cherry < 0} or {@code max_cherry < min_cherry}
      * @throws InterruptedException if interrupted while waiting for a worker thread
      */
     public double bucket_sort_parallel(int min_cherry,int max_cherry,int cherry_total_num) throws InterruptedException {
         // counting sort uses values as array offsets, so they must be non-negative
         if (min_cherry < 0 || max_cherry < min_cherry) {
             throw new IllegalArgumentException("need 0 <= min_cherry <= max_cherry, got ["
                     + min_cherry + ", " + max_cherry + "]");
         }

         MyTimer.time_start();

         // Width of each bucket's VALUE range. It must be derived from the value bound, not the
         // element count: with max_cherry / 4 + 1, value / bucket_size is in 0..3 for every value
         // in [0, max_cherry], so no value can fall outside the four buckets and be lost.
         int bucket_size = max_cherry / BUCKET_COUNT + 1;

         int[] random_array = RandomGenerator.random_generate(min_cherry, max_cherry, cherry_total_num);

         int[][] buckets = partition(random_array, bucket_size);

         SmallBucketThread[] sorters = new SmallBucketThread[BUCKET_COUNT];
         for (int b = 0; b < BUCKET_COUNT; b++) {
             sorters[b] = new SmallBucketThread(buckets[b], b, bucket_size);
             sorters[b].start();
         }
         for (SmallBucketThread sorter : sorters) {
             sorter.join();
         }

         return MyTimer.time_end_seconds();
     }

     /**
      * Splits {@code data} by value into BUCKET_COUNT exact-size arrays, where bucket b receives
      * every value v with {@code v / bucket_size == b}. Work is done by one thread per contiguous
      * chunk of {@code data}, in two passes: count, then scatter into disjoint slices.
      */
     private static int[][] partition(int[] data, int bucket_size) throws InterruptedException {
         PartitionWorker[] workers = new PartitionWorker[BUCKET_COUNT];
         for (int t = 0; t < BUCKET_COUNT; t++) {
             // long arithmetic keeps length * t from overflowing for very large arrays
             int from = (int) ((long) data.length * t / BUCKET_COUNT);
             int to = (int) ((long) data.length * (t + 1) / BUCKET_COUNT);
             workers[t] = new PartitionWorker(data, from, to, bucket_size);
         }

         // Pass 1: every worker counts how many of its values fall into each bucket.
         runAll(workers);

         // Allocate each bucket exactly and give each worker its own starting offset inside it,
         // so the scatter pass writes disjoint ranges and needs no synchronization.
         int[][] buckets = new int[BUCKET_COUNT][];
         for (int b = 0; b < BUCKET_COUNT; b++) {
             int size = 0;
             for (PartitionWorker worker : workers) {
                 worker.cursors[b] = size;
                 size += worker.counts[b];
             }
             buckets[b] = new int[size];
         }

         // Pass 2: every worker copies its values into its slice of each bucket.
         for (PartitionWorker worker : workers) {
             worker.targets = buckets;
         }
         runAll(workers);

         return buckets;
     }

     /** Runs each task on its own thread and waits for all of them to finish. */
     private static void runAll(Runnable[] tasks) throws InterruptedException {
         Thread[] threads = new Thread[tasks.length];
         for (int i = 0; i < tasks.length; i++) {
             threads[i] = new Thread(tasks[i]);
             threads[i].start();
         }
         for (Thread thread : threads) {
             thread.join();
         }
     }

     /**
      * Processes data[from, to): counts values per bucket while {@code targets} is null, otherwise
      * writes each value to {@code targets[bucket][cursors[bucket]++]}. Fields are set by the
      * coordinating thread before Thread.start and read by it after Thread.join.
      */
     private static final class PartitionWorker implements Runnable {
         private final int[] data;
         private final int from;
         private final int to;
         private final int bucket_size;
         final int[] counts = new int[BUCKET_COUNT];
         final int[] cursors = new int[BUCKET_COUNT];
         int[][] targets;

         PartitionWorker(int[] data, int from, int to, int bucket_size) {
             this.data = data;
             this.from = from;
             this.to = to;
             this.bucket_size = bucket_size;
         }

         @Override
         public void run() {
             for (int i = from; i < to; i++) {
                 int value = data[i];
                 int b = value / bucket_size;
                 if (targets == null) {
                     counts[b]++;
                 } else {
                     targets[b][cursors[b]++] = value;
                 }
             }
         }
     }

     /** Prints the last (up to) ten elements of each {@code int[]} in {@code list}, one per line. */
     public void print_array(List<?> list)
     {
         for (Object element : list) {
             int[] array = (int[]) element;
             // Math.max keeps the start index valid for arrays shorter than ten elements
             for (int j = Math.max(0, array.length - 10); j < array.length; j++) {
                 System.out.println(array[j]);
             }
         }
     }

     /** Prints every element of {@code array}, each followed by "-", on a single line. */
     public void print_array(int[] array){
         for (int j = 0; j < array.length; j++) {

                 System.out.print(array[j]+"-");

         }
     }


}
/** Benchmarks the linear counting sort against the parallel version on the same workload size. */
public class TestBucketSort {

    /**
     * Times one single-threaded {@link BucketSort} run over freshly generated random data.
     *
     * @param min lower bound handed to RandomGenerator.random_generate
     * @param max upper bound handed to RandomGenerator.random_generate; also the bound passed to
     *     bucket_sort
     * @param num_trails number of values to generate and sort
     * @return elapsed seconds from MyTimer, covering both data generation and sorting
     */
    public double linear_bucketsort(int min,int max,int num_trails){
        MyTimer.time_start();

        BucketSort sorter = new BucketSort();
        int[] input = RandomGenerator.random_generate(min, max, num_trails);
        sorter.bucket_sort(input, max); // sorted output is discarded; only the timing is reported

        return MyTimer.time_end_seconds();
    }

    /** Runs both benchmarks on 100 million values in [1, 2 billion] and prints the timings. */
    public static void main(String[] args) throws InterruptedException {
        final int min = 1;
        final int max = 2000000000;       // 2 billion
        final int num_trails = 100000000; // 100 million

        double linear_time = new TestBucketSort().linear_bucketsort(min, max, num_trails);
        double parallel_time = new BucketSortParallel().bucket_sort_parallel(min, max, num_trails);

        System.out.println("linear_time: " + linear_time + " secs");
        System.out.println("parallel_time: " + parallel_time + " secs");
        System.out.println("speed up : " + (linear_time / parallel_time));
    }
}