超高速 In-Place Merge Sort
(クイックソートに比肩するインプレースなマージソート)
クイックソートに比肩する速度へ
一般のマージソートでは,データの記憶領域と
同じ程度の追加の記憶領域が必要です。in-place merge sort(インプレースマージソート)とは,追加の記憶領域をほとんど使わずに行うマージソートです。
今までのインプレースマージソートの多くは,平均計算時間と最悪計算時間が共に O(n (log n)2) で,一般のマージソートと同じ安定ソートでした。
この度(2025年1月),追加の使用メモリがO(log n)なのに平均計算時間と最悪計算時間が共にO(n log
n)の,in-place merge sort を作りました。ただし,安定ソートではありません。
クイックソートは,分割した配列が一定の要素数になったら挿入ソートに切り替える事により高速化できる事が知られています。マージソートも同様な方法で高速化できます。
このインプレースマージソートを,まずは挿入ソートを用いて高速化したところ,ランダムなデータに対して,高速化されていないクイックソートと競い合える速度になりました。
その後,さらなる工夫を重ねたところ,高速化されたクイックソートやイントロソートに比肩する速度までになりました。
このソートは,最悪計算時間において,クイックソートやイントロソートよりも優れていて,私の知る限りインプレースなソートの内では最悪計算時間において最も
優れています。
再帰呼び出しを用いていて,追加の記憶領域は O(log n) 程度です。
なお,ほとんど整列されたデータに外れ値が入り込んでいるデータに対しては,ランダムなデータに対してより高速です。
このソートの核となるのは,整列されたm個のデータと整列されたk個のデータを,整列されていないk個のデータを上手く用いて,次のようにマージをするこ
とです。(ネットで検索して知った方法です)
(整列されたm個,整列されていないk個,整列されたk個) → (整列されたm+k個,整列されていないk個)
(整列されたk個,整列されていないk個,整列されたm個) → (整列されていないk個,整列されたm+k個)
最初に考えたコードでは,データ数がほぼ等しいデータ群をマージしていました。また,データ数が少ないデータ群をソートするときには挿入ソートを用いました。
データ数が大きく異なるデータ群をマージするときには,マージソートではない手法を用いました。
最初のコードは,高速化していないマージソートよりは速く,高速化していないクイックソートと競い合えるレベルでした。
そのコードに,次の2つの変形を加えたのが,以下にあるコードです。
1 コードの一部の「k=n/2」を「k=n*(4.0/5.0)」に変えたところ,不思議にも大きく速くなりました。
データ数比 1:1でマージしていたのが,1:4でマージするように変わったので遅くなると思っていたのに,速くなるなんてびっくり。
私には理由は分かりませんが,「クイックソートのピボットは中央値でなく四分位数を選択したほうが高速」というブログを見て,もしかしたら分岐予測ミス確率の関係かなと思っています。
2 次の変形により,少しだけ速くなりました。コンパイラの関係でしょうが,これも理由が分かりません。
d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+k+k; → d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+2*k;
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void merge_sortD(d_type a[], int n);
void merge_sortU(d_type a[], int n);
void swap(d_type *x,d_type *y){
d_type tmp=*x;
*x=*y;
*y=tmp;
}
void insertion_sort(d_type array[],int n) {
int i;
for (i=1;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
} while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
void mergeD(d_type a[], int m, int k){
d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+2*k;
while (1) {
if (*b>*d) {
swap(b,c);
b--;
c--;
if (b<a) {
while(c>=a){
swap(c,d);
c--;
d--;
}
break;
}
} else {
swap(c,d);
c--;
d--;
if (c<=b) break;
}
}
}
void mergeU(d_type c[], int m, int k){
d_type *b=c+k,*d=c-k,*a=c+m+k;
while (1) {
if (*b<*d) {
swap(b,c);
b++;
c++;
if (b>=a) {
while(c<a){
swap(c,d);
c++;
d++;
}
break;
}
} else {
swap(c,d);
c++;
d++;
if (c>=b) break;
}
}
}
#define RATIO 0.8
#define INSERT_THRESH 60
void merge_sortD(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort(a,n);
return;
}
int k=n*RATIO,m=n-k;//k=n/2をk=n/5*4に変えたら、なぜか高速化
merge_sortD(a,m);
merge_sortU(a+n,k);
mergeD(a,m,k);
}
void merge_sortU(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort(a,n);
return;
}
int k=n*RATIO,m=n-k;//k=n/2をk=n/5*4に変えたら、なぜか高速化
merge_sortU(a+k,m);
merge_sortD(a-k,k);
mergeU(a,m,k);
}
#undef INSERT_THRESH
d_type maxim(d_type array[],int last){
int i,max_i=last;
for (i=last-1;i>=0;i--){
if (array[i]>array[max_i]) max_i=i;
}
d_type maxim=array[max_i];
array[max_i]=array[last];
return maxim;
}
void merge_sort(d_type array[], int n){
if (n<60) {
insertion_sort(array,n);
return;
}
int m=n/(1.0+RATIO);//mを最大限まで大きくした
merge_sortD(array,m);
int k=(n-m)/2,end=sqrt(n);
while(k>=end) {
merge_sortU(array+m+k,k);
mergeD(array,m,k);
m=m+k;
k=(n-m)/2;
}
//マージソートされていないデータの処理
int i=m,j=n-1;
d_type max=maxim(array+i,j-i);
while (1) {
if (array[i-1]>max){
array[j]=array[i-1];
array[i-1]=array[j-1];
i--;
j--;
if (i==0){
merge_sort(array,j);
array[j]=max;
break;
}
}else{
array[j]=max;
j--;
if (j<i) break;
max=maxim(array+i,j-i);
}
}
}
#undef RATIO
#undef INSERTION
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=rand()*(RAND_MAX+1)+rand();
//整列されたデータに外れ値が入り込んでいるデータの例
//array[i]=i;
//if (rand()%10==0)
// array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
}
start=clock();
merge_sort(array,n);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}
理由が分からなくても,これらによりランダムなデータに対しては,高速化されたクイックソートやイントロソートに近い速度にな
りまし
た。
ただし,別のコンパイラではどこまで速くなるか不明です。
最悪計算時間がO(n log n) となることは,次の様に分かります。
① ほぼ同じ大きさのデータをマージする操作の計算時間の合計はO(n log n) で抑えられる。
② 大きさが大きく異なるデータをマージする回数はO(log n)で抑えられるから,それらの計算時間の和はO(n log n) で抑えられる。
③ 高速化のテクニックは計算時間を減少させるだけである。
注1 整列されたデータに外れ値が入り込んでいるデータでは,「k=n*(4.0/5.0)」の場合は「k=n/2」の場合より遅くなった。
注2 並列化により更に高速化する場合は,「k=n*(4.0/5.0)」では難しく,「k=n/5」に変えるのが良い。
「クイックソートが最速」という常識の終わり
実用では上記のコードの速度で十分だと思います。しかし,もう少しだけ速くして,高速化されたクイックソートやイントロソートと拮抗させたいと思い,次の2つの改良を加えました。
ただし,基本のソートのプログラムとしては,大分複雑になってしまいました。
1 「AIがより高速なソートアルゴリズムを発見」というニュースが以前にありましたが,その方法からアイデアを得て,2種類の挿入ソートを合わせて1つの挿入ソートにしたところ,少しだけ速くなりました。
データ数が少ないときに速い挿入ソートと,データ数が多いときに速い挿入ソートの特徴を生かしたつもりでした。
しかし,速くしようと思ってコードを次の様に変えると,極端に遅くなってしまいます。コンパイラのご機嫌で速くなったり遅くなったり,かもしれません。
#define INSERTION(data,n) \
int i=1;\
do {\
if (data[i-1]>data[i]){\
int j=i;\
d_type tmp=data[i];\
do {\
data[j]=data[j-1];\
j--;\
}while (j>0 && data[j-1]>tmp);\
data[j]=tmp;\
}\
i++;\
}while (i<n);
2 このソートの終わりの方にあるソートされていない要素の処理に,ヒープソートを用いた改良を加えました。その基本的なアイデアは次の様です。
(整列されたm個,整列されていない2k個) → (整列されたm個,整列されていないk個,整列されたk個) → (整列されたm+k個,整列されていないk個)
と変形するときに,整列されたk個の要素の全てが,整列されていないk個のどの要素よりも大きく無いようにしたい。
そうすれば,整列されたm+k個の要素の半数位は,整列されていないk個のどの要素よりも大きく無くなるから,次に整列されていないk個を処理するときに高速化される。
そのためには,整列されていない2k個の要素を整列されていないk個と整列されたk個の要素に分けるときに,ヒープソートを用いればよい。
これら2つの改良により,1億個のランダムなデータのソートでは数%は速く成り,高速化されたクイックソートやイントロソートに対抗できる速度に
なりました。
最悪計算時間については,クイックソートはO(n2)で,イントロソートとヒープソートとこのソートは同じO(n log n)です。
イントロソートは,最悪の場合に途中からヒープソートに切り替えるため,最悪計算時間はヒープソートの最悪計算時間より大きくなります。
ヒープソートは,平均計算時間がO(n log n)のソートの内で比較的遅いソートです。
これらのことより,このソートは,私が知る限り,インプレースなソートのうちでデータ数が多いときの最悪計算時間が最小と考えられます。
理由が分からずマジックのように速くなったソートなので,冗談でマージックソートとでも名前をつけたくなります。マージソートの考案者のフォン・ノイマン
もマジマジと見ているかもしれません。
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void merge_sortD(d_type a[], int n);
void merge_sortU(d_type a[], int n);
void swap(d_type *x,d_type *y){
d_type tmp=*x;
*x=*y;
*y=tmp;
}
#define INSERTION(data,n) \
int i;\
for (i=1;i<n;i++){\
if (data[i-1]>data[i]){\
int j=i;\
d_type tmp=data[i];\
do {\
data[j]=data[j-1];\
j--;\
}while (j>0 && data[j-1]>tmp);\
data[j]=tmp;\
}\
}
void insertion_sort5(d_type array[],int n) { //n>=5のときの挿入ソート
INSERTION(array,5)
for (;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
} while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
void mergeD(d_type a[], int m, int k){
d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+2*k;
while (1) {
if (*b>*d) {
swap(b,c);
b--;
c--;
if (b<a) {
while(c>=a){
swap(c,d);
c--;
d--;
}
break;
}
} else {
swap(c,d);
c--;
d--;
if (c<=b) break;
}
}
}
void mergeU(d_type c[], int m, int k){
d_type *b=c+k,*d=c-k,*a=c+m+k;
while (1) {
if (*b<*d) {
swap(b,c);
b++;
c++;
if (b>=a) {
while(c<a){
swap(c,d);
c++;
d++;
}
break;
}
} else {
swap(c,d);
c++;
d++;
if (c>=b) break;
}
}
}
#define RATIO 0.8
#define INSERT_THRESH 60
void merge_sortD(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort5(a,n);
return;
}
int k=n*RATIO,m=n-k;//k=n/2をk=n/5*4に変えたら、なぜか高速化
merge_sortD(a,m);
merge_sortU(a+n,k);
mergeD(a,m,k);
}
void merge_sortU(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort5(a,n);
return;
}
int k=n*RATIO,m=n-k;//k=n/2をk=n/5*4 に変えたら、なぜか高速化
merge_sortU(a+k,m);
merge_sortD(a-k,k);
mergeU(a,m,k);
}
#undef INSERT_THRESH
#define N 5 //N分木ヒープソート N>=3
void downheap(d_type *array,d_type work,int parent,int child,int last) {
while (1) {
if (child<last) {
int i=child,end=child+N-1;
if (last<end) end=last;
do {
if (array[++i]<array[child]) child=i;
} while (i<end);
} else if (child>last) {
break;
}
if (work<=array[child]) break;
array[parent]=array[child];
parent=child;
child=N*parent+1;
}
array[parent]=work;
}
void heap_small(d_type array[],int k,int last){
int i;
for (i=(last-1)/N;i>=0;i--)
downheap(array,array[i],i,N*i+1,last);
//小さいk個を大きい順に並べる
int end=last-k+1;
for (i=last;;){
d_type work=array[i];
array[i]=array[0];
if (i<=end){
array[0]=work;
break;
}
downheap(array,work,0,1,--i);
}
//小さい順に並べ替える
d_type *ak=array+k,*al=array+last;
do{
swap(ak,al);
}while (++ak < --al);
}
#undef N
d_type maxim(d_type array[],int last){
int i,max_i=last;
for (i=last-1;i>=0;i--){
if (array[i]>array[max_i]) max_i=i;
}
d_type maxim=array[max_i];
array[max_i]=array[last];
return maxim;
}
void merge_sort(d_type array[], int n){
if (n<40) {
INSERTION(array,n)
return;
}
int m=n/(1.0+RATIO);//mを最大限まで大きくした
merge_sortD(array,m);
int k=(n-m)/2,end;
if (n<1600) end=sqrt(n); else end=n/80+20;
while (k>=end){
merge_sortU(array+m+k,k);
mergeD(array,m,k);
m=m+k;
k=(n-m)/2;
}
while(k>=40){
heap_small(array+m,k,n-m-1);
mergeD(array,m,k);
m=m+k;
k=(n-m)/2;
}
//マージソートされていないデータの処理
int i=m,j=n-1;
d_type max=maxim(array+i,j-i);
while (1) {
if (array[i-1]>max){
array[j]=array[i-1];
array[i-1]=array[j-1];
i--;
j--;
if (i==0){
merge_sort(array,j);
array[j]=max;
break;
}
}else{
array[j]=max;
j--;
if (j<i) break;
max=maxim(array+i,j-i);
}
}
}
#undef RATIO
#undef INSERTION
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=rand()*(RAND_MAX+1)+rand();
//整列されたデータに外れ値が入り込んでいるデータの例
//array[i]=i;
//if (rand()%10==0)
// array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
}
start=clock();
merge_sort(array,n);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}
注 <math.h>が使えない場合は,次の様にニュートン法によって平方根を求めることができます。
if (n<1600) end=sqrt(n); else end=n/80+20;
↓
if (n<1600){
double x;
if (n<=128) x=8; else if (n<=512) x=16; else x=32;
x=(x+n/x)/2.0;
end=(x+n/x)/2.0;
}else {
end=n/80+20;
}
クイックソート(イントロソート)との比較
最高速と言われているクイックソートと速度
比較ができるように,以下に高速化したクイックソート(イントロソート)のコードを掲載します。
クイックソートの最悪計算時間はO(n2)なので,
最悪計算時間がO(n log n)同士で比べるために,純粋なクイックソートよりも若干だけ遅くなるイントロソートを掲載しました。
なお,このソートにある挿入ソートには,in-place merge sortにある挿入ソートに対して行った高速化の工夫に対応する工夫を行っています。
私のパソコンで速度を測定したところでは,in-place
merge sortとイントロソートの差は条件によって変わる程度です。
実際にコンパイルして実行し,速度を比べてみて下さい。また,標準ライブラリ関数qsort()とも比べてみて下さい。(qsortは汎用なので遅いです)
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void swap(d_type *x,d_type *y){
d_type tmp=*x;
*x=*y;
*y=tmp;
}
void insertion_sort(d_type array[],int n) {
if (n<=1) return;
if (array[0]>array[1]) swap(array,array+1);
if (n<=2) return;
if (array[1]>array[2]) swap(array+1,array+2);
if (array[0]>array[1]) swap(array,array+1);
int i;
for (i=3;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
} while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
void downheap(d_type *array,d_type work,int parent,int child,int last) {
while (1) {
if (child<last) {
if (array[child+1]>array[child]) child++;
} else if (child>last) {
break;
}
if (work>=array[child]) break;
array[parent]=array[child];
parent=child;
child=2*parent+1;
}
array[parent]=work;
}
void heap_sort(d_type *array, int last) {
int i;
for (i=(last-1)/2;i>=0;i--) {
downheap(array,array[i],i,2*i+1,last);
}
for (i=last;i>0;) {
d_type work=array[i];
array[i]=array[0];
downheap(array,work,0,1,--i);
}
}
d_type med3(d_type x,d_type y,d_type z){
if (x<y){
if (y<z) return y;
else if (z<x) return x;
}else{
if (z<y) return y;
else if (x<z) return x;
}
return z;
}
#define INSERT_THRESH 60
void safe_quick_sort(d_type *array,int begin,int end,int maxDepth) {
int range=end-begin;
if (range<INSERT_THRESH){
insertion_sort(array+begin,range+1);
return;
}
if (maxDepth<=0){
heap_sort(array+begin,range);
return;
}
d_type pivot=med3(array[begin],array[begin+range/2],array[end]);
int l=begin,r=end;
while(1){
while(array[l]<pivot) l++;
while(array[r]>pivot) r--;
if(l>=r) break;
swap(&array[l],&array[r]);
l++;r--;
}
maxDepth--;
safe_quick_sort(array,begin,r,maxDepth);//l=r+1
safe_quick_sort(array,r+1,end,maxDepth);
}
void quick_sort(d_type *array,int n) {
if (n<60){
insertion_sort(array,n);
return;
}
int M=INSERT_THRESH,maxDepth=0;
while (M<n) {//maxDepth=2*log2(n/INSERT_THRESH)
M=M*2;
maxDepth=maxDepth+2;
}
safe_quick_sort(array,0,n-1,maxDepth);
}
#undef INSERT_THRESH
int compare(const void *a, const void *b){
return *(int*)a-*(int*)b;
}
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=rand()*(RAND_MAX+1)+rand();
//整列されたデータに外れ値が入り込んでいるデータの例
//array[i]=i;
//if (rand()%10==0)
// array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
}
start=clock();
quick_sort(array,n);
//qsort(array,n,sizeof(int),compare);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}