超高速 In-place マージソート


一般のマージソートでは,データの記憶領域と 同じ程度の追加の記憶領域が必要です。
In-place マージソートとは,追加の記憶領域をほとんど使わずに行うマージソートです。今までのIn-place マージソートは,平均計算時間と最悪計算時間が共に,O(n (log n)2) でした。
この度(2025年1月),平均計算時間と最悪計算時間が共に,O(n log n) の In-place マージソート を考案しました。ただし,安定ソートではありません。
ランダムなデータに対する速度は、高速化の工夫をしていない素朴なマージソートよりは速く,素朴なクイックソートと大きくは違いません。
しかし,高速化の工夫がされたマージソートやクイックソート には及びません。
再帰呼び出しを用いていて,追加の記憶領域は O(log n) 程度です。
なお,ほとんど整列されたデータに外れ値が入り込んでいるデータに対しては,ランダムなデータに対してより高速です。

従来のIn-place マージソートが遅いのは,データ数が大きく異なるデータ群をマージするからです。
次のコードでは,データ数がほぼ等しいデータ群をマージする割合を増やす工夫をすることにより,高速化しました。
また,データ数が少ないデータ群をソートするときには挿入ソートを用いました。データ数が大きく異なるデータ群をマージするときには,マージソートではない手法を用いました。

このソートは,従来のマージソートや In-place マージソート とは性質が異なるので,冗談でマージックソートとでも名前をつけたくなります。
マージソートの考案者のフォン・ノイマンもマジマジと見ているかもしれません。

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>

typedef int d_type;// ソートするキーの型

void merge_sortD(d_type a[], int n);
void merge_sortU(d_type a[], int n);
 
void insertion_sort(d_type array[],int n){
    int i;
    for (i=1;i<n;i++) {
        int j=i;
        d_type tmp=array[i];
        if (tmp<array[0]){
            do {
                array[j]=array[j-1];
                j--;
            } while (j>0);
        }else{
            while (array[j-1]>tmp) {
                array[j]=array[j-1];
                j--;
            }
        }
        array[j]=tmp;
    }
}

#define swap(x,y) (tmp=*(x),*(x)=*(y),*(y)=tmp)
void mergeD(d_type a[], int m, int k){
    d_type *b=a+(m-1),*c=b+k,*d=c+k,tmp;
    while (1) {
        if (*b>*d) {
            swap(b,c);
            b--;
            c--;
            if (b<a) {
                while(c>=a){
                    swap(c,d);
                    c--;
                    d--;                   
                }
                break;
            }
        } else {
            swap(c,d);
            c--;
            d--;
            if (c<=b) break;
        }
    }
}

void mergeU(d_type a[], int m, int k){
    d_type *b=a+k,*c=a,*d=a-k,*e=a+m+k,tmp;
    while (1) {
        if (*b<*d) {
            swap(b,c);
            b++;
            c++;
            if (b>=e) {
                while(c<e){
                    swap(c,d);
                    c++;
                    d++;                   
                }
                break;
            }
        } else {
            swap(c,d);
            c++;
            d++;
            if (c>=b) break;
        }
    }
}
#undef swap

#define INSERT_THRESH 50
void merge_sortD(d_type a[], int n){
    if (n<INSERT_THRESH) {
        insertion_sort(a,n);
        return;
    }
    int k=n/2,m=n-k;
    merge_sortD(a,m);
    merge_sortU(a+n,k);
    mergeD(a,m,k);
}

void merge_sortU(d_type a[], int n){
    if (n<INSERT_THRESH) {
        insertion_sort(a,n);
        return;
    }
    int k=n/2,m=n-k;
    merge_sortU(a+k,m);
    merge_sortD(a-k,k);
    mergeU(a,m,k);
}

d_type maxim(d_type array[],int a,int b){
    d_type max=array[b];
    int i,j=b;
    for (i=b-1;i>=a;i--){
        if (array[i]>max){
            j=i;
            max=array[j];
        }
    }
    array[j]=array[b];
    return max;   
}

void merge_sort(d_type array[], int n){
    if (n<INSERT_THRESH) {
        insertion_sort(array,n);
        return;
    }

    int m=n/2;
    merge_sortD(array,m);

    int min=sqrt(n)*2;
    while(n-m>=min) {//残りがsqrt(n)個の2倍より少なくなったら打ち切る
        int k=(n-m)/2;
        merge_sortU(array+m+k,k);
        mergeD(array,m,k);
        m=m+k;   
    }

    //マージソートされていないデータの処理
    int i=m,j=n-1;
    d_type max=maxim(array,i,j);
    while (1) {
        if (array[i-1]>max){
            array[j]=array[i-1];
            array[i-1]=array[j-1];
            i--;
            j--;
            if (i==0){
                insertion_sort(array,j);
                array[j]=max;
                break;
            }
        }else{       
            array[j]=max;
            j--;
            if (i>j)break;
            max=maxim(array,i,j);
        }
    }
}
#undef INSERT_THRESH

int main(){
    int n=100000000;
    int i;
    clock_t start,end;
    d_type *array=malloc(n*sizeof(d_type));
    srand((unsigned) time(NULL));
    for (i=0;i<n;i++){
        array[i]=rand()*(RAND_MAX+1)+rand();
        //整列されたデータに外れ値が入り込んでいるデータの例
        //array[i]=i;
        //if (rand()%10==0)
        //    array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
    }
    start=clock();
    merge_sort(array,n);
    end=clock();
    printf("%f秒  \n",(double)(end-start)/CLOCKS_PER_SEC);
    for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
    free(array);
    return 0;
}

Ⅰ 最悪計算時間がO(n log n) となることは,次の様に分かります。
 ① ほぼ同じ大きさのデータをマージする操作の計算時間の合計はO(n log n) で抑えられる。
 ② 大きさが大きく異なるデータをマージする回数はO(log n)で抑えられるから,それらの計算時間の和はO(n log n) で抑えられる。
 ③ 高速化のテクニックは計算時間を減少させるだけである。

Ⅱ 関数 merge_sortD( ) と merge_sortU( ) について,最初に思いついたのは下記のコードです。
しかし,ほとんど整列されたデータに外れ値が入り込んでいるデータに対する速度を比較して,上記のコードに変えました。
これにより,ランダムなデータに関しては、少しだけ遅くなりました。

void merge_sortD(d_type a[], int n){
    if (n<INSERT_THRESH) {
        insertion_sort(a,n);
        return;
    }
    int k=n/2,m=n-k;
    merge_sortD(a,m);
    merge_sortD(a+n,k);
    mergeD(a,m,k);
}

void merge_sortU(d_type a[], int n){
    if (n<INSERT_THRESH) {
        insertion_sort(a,n);
        return;
    }
    int k=n/2,m=n-k;
    merge_sortU(a+k,m);
    merge_sortU(a-k,k);
    mergeU(a,m,k);
}


最初のページ に戻る