超高速 In-Place Merge Sort (クイックソートに匹敵する速度のインプレースマージソート)


クイックソートに匹敵する速度へ (クイックソートが最速という常識への挑戦)

一般のマージソートでは,データの記憶領域と同じ程度の追加の記憶領域が必要です。in-place merge sort(インプレースマージソート)とは,追加の記憶領域をほとんど使わずに行うマージソートです。
また,一般のマージソートでは,キーの値が同じ要素のソー ト前の順序が,ソート後も変わらずに維持されます。このようなソートを安定ソートと言います。
一般のマージソートは,最速と言われているクイックソートよりも遅いようです。ただし,クイックソートは,安定ソートではありません。(私が作ったマージソートはクイッ クソートよりも速いです。)
これまでのインプレースマージソートの研究目標の多くは,安定ソートありながら平均計算時間と最悪計算時間が共にO(n (log n)で一般のマージソート並みの速さのものを作ることだったようです。

In-Place Sorting With Merge Sort に,平均計算時間と最悪計算時間が共に O(n log n)で,安定ソートではないインプレースマージソートがありますが,それは一般のマージソートより遅いものです。
私も,平均計算時間と最悪計算時間が共に O(n log n)で,安定ソートではないインプレースマージソートを作りました。再帰呼 び出しを用いていて,追加の記憶領域は O(log n) です。(2025年1月)
その後,改良を重ねたところ,大量のランダムなデータに対する平均計算時間はクイックソートに匹敵するまでになりました。
そのソートは,In-Place Sorting With Merge Sort にあるインプレースマージソートを基本として,それに幾つかの速度を上げる改良を施した形になっています。
In-Place Sorting With Merge Sort での計算時間O(n log n)のインプレースマージソートの実現方法と,このインプレースマージソートを比較しながら,以下を読んで頂ければ幸いです。

ここで示すインプレースマージソートは,In-Place Sorting With Merge Sort にあるインプレースマージソートよりも高速ですが,それは,部分列のソート法を次のように変えたからです。
    データ列のある部分をソートする → データ列のある部分をソートされたデータ列にする
ここでは,データ列のある部分について,そこに存在するデータをソートするのではなく,その部分を,他の部分にあるデータも用いてソートされたデータ列に したのです。

In-Place Sorting With Merge Sort では,整列されたm個のデータと整列されたk個のデータを,整列されていないk個のデータを上手く用いて,整列されたm+k個のデータにしていました。
    (整列されたm個,整列されていないk 個,整列されたk個) → (整列されたm+k個,整列されていないk個)
ここでのインプレースマージソートは,上記の操作を再帰的に繰り返しました。このことが,In-Place Sorting With Merge Sort のソート法より高速になった最大の理由と思われます。

さらに,ソートされていないデータ が少なくになったとき,ソートされていないデータをソートされたデータ列にマージするのに新たな手法を用いました。
これにより,クイックソートやイントロソートに匹敵する速度までになりました。
また,小さなデータブロックに対するソートに挿入ソートを用いることにより高速化しましたが,クイックソートもこの方法で同様に高速化されます。
このソートは,最悪計算時間において,クイックソートやイントロソートよりも優れていて,私の知る限りインプレースなソートの内では最悪計算時間において 最も優れています。

インプレースマージソートを少しでも高速化しようと工夫していましたが,ほんの少し高速化するとコードが大きく 複雑になることに気が付きました。
そこで,インプレースマージソートのコードを,できる限り短く単純にしてみました。それでも,クイックソートに十分に対抗できます。そのコードを示しま す。


複雑さを極力抑えたコード
複雑さを極力抑えて,クイックソートやイントロソートとほぼ同じ速度を実現したコードです。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

typedef int d_type;// ソートするキーの型

// i<mが整列されている配列に,m=<i<nの要素を挿入ソートする
#define INSERTION_SORT(array,m,n) {\
    int i=m;\
    for (;i<n;i++){\
        d_type tmp=array[i];\
        int j=i-1;\
        for (;j>=0 && array[j]>tmp;j--) array[j+1]=array[j];\
        array[j+1]=tmp;\
    }\
}

//値を交換する関数
void swap(d_type *x,d_type *y){
    d_type tmp=*x;
    *x=*y;
    *y=tmp;
}

// マージ (整列されたm個,整列されていないk 個,整列されたk個)
// → (整列されたm+k個,整列されていないk個) を行う関数
void mergeD(d_type a[], int m, int k){
    d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+2*k;
    while (1) {
        if (*b>*d) {
            swap(b,c);
            b--;c--;
            if (b<a) {
                while(c>=a){
                    swap(c,d);
                    c--;d--;
                }
                break;
            }
        } else {
            swap(c,d);
            c--;d--;
            if (c<=b) break;
        }
    }
}

#define RATIO 0.2 //RATIO<=0.5
//整列されたn個の要素の配列を作る関数
void merge_sortD(d_type array[], int n){
    if (n<50) {
        INSERTION_SORT(array,1,n)
        return;
    }
    int k=n*RATIO,m=n-k;//k=n/2をk=n*0.2に変えたら,なぜか高速化
    merge_sortD(array,m);
    merge_sortD(array+n,k);
    mergeD(array,m,k);
}

void merge_sort(d_type array[], int n){
    int m=n*(1.0-RATIO);//mを最大限まで大きくした
    merge_sortD(array,m);//整列されたm個の要素の配列を作る
    int k=(n-m)/2;
    while (k>=2){//m個の要素の配列にk個の要素をマージすることを繰り返す
        int i=m;//残ったデータを高速にマージする工夫
        for (;i<m+k;i++) if (array[i]<array[i+k]) swap(array+i,array+i+k);
        merge_sort(array+m+k,n-m-k);
        mergeD(array,m,k);
        m=m+k;
        k=(n-m)/2;
    }
    INSERTION_SORT(array,m,n) //残った要素を挿入ソートする
}
#undef RATIO
#undef INSERTION_SORT

int main(){
    int n=100000000;
    int i;
    clock_t start,end;
    d_type *array=malloc(n*sizeof(d_type));
    srand((unsigned) time(NULL));
    for (i=0;i<n;i++) array[i]=rand()*(RAND_MAX+1)+rand();
    start=clock();
    merge_sort(array,n);
    end=clock();
    printf("%f秒  \n",(double)(end-start)/CLOCKS_PER_SEC);
    for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
    free(array);
    return 0;
}

上記の merge_sort() は,最も基本的なコードに次のコードを付加することにより,だいぶ高速化されています。このコードは,ソートされていない残ったデータのうち,小さいデー タをなるべく早く段階でマージするためのものす。
無くても動作しますので,ある場合と無い場合で速度を比較してみてください。

        int i=m;//残ったデータを高速にマージする工夫
        for (;i<m+k;i++) if (array[i]<array[i+k]) swap(array+i,array+i+k);

上記のプログラムがやっていることの概略は,次の様です。

1 全データの80%分のソートされた配列を作る。(この作業は,関数 merge_sortD() により高速に行われる)
2 残りの20%のうちの半分の10%をソートされた配列にし,それをソートされた80%の配列にマージして,ソートされた90%分の配列を作る。
3 残りの10%のうちの半分の5%をソートされた配列にし,それをソートされた90%の配列にマージして,ソートされた95%分の配列を作る。
4 残りの5%のうちの半分の2.5%をソートされた配列にし,それをソートされた95%の配列にマージして,ソートされた97.5%分の配列を作る。
          …………

最初のコードでは,できるだけデータ数がほぼ等しいデータ群をマージしようとしていました。すなわち,m≒k となる割合を多くすれば高速になると思い込んでいました。
ところが,コードの一部の「k=n/2」を「k=n*0.2」に変えたところ,不思議にも大きく速くなりました。
データ数比 1:1でマージしていたのを,4::1でマージするように変えると遅くなると思っていたのに,速くなるなんてびっくり。
私には理由は分かりませんが,「クイックソートのピボットは中央値でなく四分位数を選択したほうが高速」というブログを見て,もしかしたら分岐予測ミス確 率の関係かなと思っています。

速度は,クイックソートやイントロソートと同じくらいであり,コードの複雑さはイントロソートと同じか少ないくらいです。
イントロソートは,このインプレースマージソートと同じく,平均計算時間と最悪計算時間が共に O(n log n)の不安定なインプレースソートなので,このインプレースマージソートはイントロソートに比肩するインプレースなマージソートである と言えます。

最悪計算時間がO(n log n) となることは,次の様に分かります。
 ① ほぼ同じ大きさのデータをマージする操作の計算時間の合計はO(n log n) で抑えられる。
 ② 大きさが大きく異なるデータをマージする回数はO(log n)で抑えられるから,それらの計算時間の和はO(n log n) で抑えられる。
 ③ 高速化のテクニックは計算時間を減少させるだけである。

注 関数 merge_sort() にある「m=n*(1.0-RATIO);//mを最大限まで大きくした」は,関数 merge_sortD()を用いるに当たり,次の条件が満たされれば良いと考えたからです。
    m+m×RATIO+m×(RATIO)2+m×(RATIO)3+m×(RATIO)4+……≦n
ここで,無限等比級数の和の公式により
    左辺=m/(1-RATIO)
よって
    m/(1-RATIO)≦n
となるから
    m≦n×(1-RATIO)


1つ前のコード
上記のコードの1つ前に作ったコードです。こちらの方が上記のコードより少しだけ複雑ですが,こちらの方が速度が安定していて,少しだけ速いかもしれません。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

typedef int d_type;// ソートするキーの型

#define INSERTION_SORT(array,m,n) {\
    int i=m;\
    for (;i<n;i++){\
        d_type tmp=array[i];\
        int j=i-1;\
        for (;j>=0 && array[j]>tmp;j--) array[j+1]=array[j];\
        array[j+1]=tmp;\
    }\
}

void swap(d_type *x,d_type *y){
    d_type tmp=*x;
    *x=*y;
    *y=tmp;
}

void mergeD(d_type a[], int m, int k){
    d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+2*k;
    while (1) {
        if (*b>*d) {
            swap(b,c);
            b--;c--;
            if (b<a) {
                while(c>=a){
                    swap(c,d);
                    c--;d--;
                }
                break;
            }
        } else {
            swap(c,d);
            c--;d--;
            if (c<=b) break;
        }
    }
}

#define RATIO 0.2 //RATIO<=0.5
void merge_sortD(d_type array[], int n){
    if (n<50) {
        INSERTION_SORT(array,1,n)
        return;
    }
    int k=n*RATIO,m=n-k;//k=n/2をk=n*0.2に変えたら,なぜか高速化
    merge_sortD(array,m);
    merge_sortD(array+n,k);
    mergeD(array,m,k);
}

void merge_sort(d_type array[], int n){
    int m=n*(1.0-RATIO);//mを最大限まで大きくした
    merge_sortD(array,m);
    int k=(n-m)/2,border=n/60;
    while (k>=2){
        if (k>=border)//残ったデータを高速にマージする工夫
            merge_sortD(array+m,k);
        else
            merge_sort(array+m,n-m);
        int i=m;
        for (;i<m+k;i++) swap(array+i,array+i+k);
        mergeD(array,m,k);       
        m=m+k;
        k=(n-m)/2;
    }
    INSERTION_SORT(array,m,n)
}
#undef RATIO
#undef INSERTION_SORT

int main(){
    int n=100000000;
    int i;
    clock_t start,end;
    d_type *array=malloc(n*sizeof(d_type));
    srand((unsigned) time(NULL));
    for (i=0;i<n;i++) array[i]=rand()*(RAND_MAX+1)+rand();
    start=clock();
    merge_sort(array,n);
    end=clock();
    printf("%f秒  \n",(double)(end-start)/CLOCKS_PER_SEC);
    for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
    free(array);
    return 0;
}

最新のコードは,上記のコードの関数 merge_sort() の一部を次の様に変更しました。

        int border=n/60;
        if (k>=border)//残ったデータを高速にマージする工夫
            merge_sortD(array+m,k);
        else
            merge_sort(array+m,n-m);
        int i=m;
        for (;i<m+k;i++) swap(array+i,array+i+k);
                  
        int i=m;//残ったデータを高速にマージする工夫
        for (;i<m+k;i++) if (array[i]<array[i+k]) swap(array+i,array+i+k);
        merge_sort(array+m+k,n-m-k);

このコードでは, k<border のときに小さい要素だけがマージされます。その基本的なアイデアは次の様です。

    (整列されたm個,整列されていない2k 個) → (整列されたm個,整列されていないk個,整列されたk個) → (整列されたm+k個,整列されていないk個)
と変形するときに,整列されたk個の要素の全てが,整列されていないk個のどの要素よりも大きく無いようにしたい。
そうすれば,整列されたm+k個の要素の半数位は,整列されていないk個のどの要素よりも大きく無くなるから,次に整列されていないk個を処理するときに 高速化される。
そのためには,整列されていない2k個の要素を整列されていないk個と整列されたk個の要素に分けるときに,このインプレースマージソート自身を用いれば よい。


クイックソートとの比較

複雑さを極力抑えたインプレースマージソートのコードと,私ができる限り高速化したクイックソートのコードとの速度比較ができるコードを作りました。
私のパソコンで速度を測定したところでは,このインプレースマージソートの1億個のランダムなデータに対する平均計算時間は,最高速と言われているクイッ クソートと同じくらいです。
実際にコンパイルして実行してみて下さい。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

typedef int d_type;// ソートするキーの型

#define INSERTION_SORT(array,m,n) {\
    int i=m;\
    for (;i<n;i++){\
        d_type tmp=array[i];\
        int j=i-1;\
        for (;j>=0 && array[j]>tmp;j--) array[j+1]=array[j];\
        array[j+1]=tmp;\
    }\
}

void insertion_sort(d_type array[],int n){
    INSERTION_SORT(array,1,n)
}

void swap(d_type *x,d_type *y){
    d_type tmp=*x;
    *x=*y;
    *y=tmp;
}

d_type med3(d_type x,d_type y,d_type z){
    if (x<y){
        if (y<z)      return y;
        else if (z<x) return x;
    }else{
        if (z<y)      return y;
        else if (x<z) return x;
    }
    return z;
}

void quick_sort(d_type array[], int begin, int end){
    int n=end-begin;
    if (n<50){
        insertion_sort(array+begin,n+1);
        return;
    }
    d_type pivot=med3(array[begin],array[begin+n/2],array[end]);
    int l=begin,r=end;
    while(1){
        while(array[l]<pivot) l++;
        while(array[r]>pivot) r--;
        if(l>=r) break;
        swap(array+l,array+r);
        l++;r--;
    }
    quick_sort(array,begin,r);//l=r+1
    quick_sort(array,r+1,end);
}

void mergeD(d_type a[], int m, int k){
    d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+2*k;
    while (1) {
        if (*b>*d) {
            swap(b,c);
            b--;c--;
            if (b<a) {
                while(c>=a){
                    swap(c,d);
                    c--;d--;
                }
                break;
            }
        } else {
            swap(c,d);
            c--;d--;
            if (c<=b) break;
        }
    }
}

#define RATIO 0.2 //RATIO<=0.5
void merge_sortD(d_type array[], int n){
    if (n<50) {
        INSERTION_SORT(array,1,n)
        return;
    }
    int k=n*RATIO,m=n-k;//k=n/2をk=n*0.2に変えたら,なぜか高速化
    merge_sortD(array,m);
    merge_sortD(array+n,k);
    mergeD(array,m,k);
}

void merge_sort(d_type array[], int n){
    int m=n*(1.0-RATIO);//mを最大限まで大きくした
    merge_sortD(array,m);
    int k=(n-m)/2;
    while (k>=2){
        int i=m;//残ったデータを高速にマージする工夫
        for (;i<m+k;i++) if (array[i]<array[i+k]) swap(array+i,array+i+k);
        merge_sort(array+m+k,n-m-k);
        mergeD(array,m,k);
        m=m+k;
        k=(n-m)/2;
    }
    INSERTION_SORT(array,m,n)
}
#undef RATIO
#undef INSERTION_SORT

int main(){
    int n=100000000;
    int i;
    clock_t start,end,end2;
    d_type *array=malloc(n*sizeof(d_type));
    d_type *array2=malloc(n*sizeof(d_type));
    srand((unsigned) time(NULL));
    for (i=0;i<n;i++){
        array[i]=rand()*(RAND_MAX+1)+rand();
        array2[i]=array[i];
    }
    start=clock();
    merge_sort(array,n);
    end=clock();
    quick_sort(array2,0,n-1);
    end2=clock();
    printf("merge_sort %f秒  \n",(double)(end-start)/CLOCKS_PER_SEC);
    printf("quick_sort %f秒  \n",(double)(end2-end)/CLOCKS_PER_SEC);
    for (i=0;i<n;i++) if(array[i]!=array2[i]) printf("%d %d\n",array[i],array2[i]);
    free(array);
    free(array2);
    return 0;
}

実行例

       実行例           実行例2

2台のパソコンで実行してみましたが,速度が逆転してしまいました。私には理由が分かりません。

最悪計算時間は,クイックソートはO(n2)で,イントロソートとヒープソートはO(n log n)です。
イントロソートは,最悪の場合に途中からヒープソートに切り替えるため,最悪計算時間はヒープソートの最悪計算時間より大きくなります。
ヒープソートは,平均計算時間がO(n log n)のソートの内で比較的遅いソートです。
イントロソートソートはクイックソートにO(n2)対策を行ったため,元のクイックソートより若干遅くなります。
これらのことより,このソートは,最悪計算時間がO(n log n)のインプレースソートのうちで,データ数が多いときの最悪計算時間が最少で はないかと考えています。

理由が大して分からずマジックのように速くなったソートなので,冗談でマジックソートとでも名前をつけたくなります。マージソートの考案者のフォン・ノイ マンもマジマジと見ているかもしれません。


最初のページ に戻る