超高速 In-Place Merge Sort
(クイックソートに比肩するインプレースなマージソート)
一般のマージソートでは,データの記憶領域と
同じ程度の追加の記憶領域が必要です。
in-place merge sort とは,追加の記憶領域をほとんど使わずに行うマージソートです。今までの
in-place
merge sort は,平均計算時間と最悪計算時間が共に,O(n (log n)2) でした。
この度(2025年1月),追加の使用メモリがO(log n)なのに平均計算時間と最悪計算時間が共にO(n log
n)の,in-place merge sort を作りました。ただし,安定ソートではありません。
まずは,ランダムなデータに対して,高速化の工夫がされていない素朴なクイックソートと競い合えるレベルまでの速度になりました。
その後,さらなる工夫を重ねたところ,高速化の工夫がされたクイックソートやイントロソートに比肩するまでになりました。
このソートは,最悪計算時間において,クイックソートやイントロソートよりも優れていて,インプレースなソートの内では最悪計算時間において最も
優れています。
再帰呼び出しを用いていて,追加の記憶領域は O(log n) 程度です。
なお,ほとんど整列されたデータに外れ値が入り込んでいるデータに対しては,ランダムなデータに対してより高速です。
このソートの核となるのは,整列されたm個のデータと整列されたk個のデータを,整列されていないk個のデータを上手く用いて,次のようにマージをするこ
とです。(ネットで検索して知った方法です)
(整列されたm個,整列されていないk個,整列されたk個) → (整列されたm+k個,整列されていないk個)
(整列されたk個,整列されていないk個,整列されたm個) → (整列されていないk個,整列されたm+k個)
次のコードでは,データ数がほぼ等しいデータ群をマージする割合を増やす工夫をすることにより,高速化しました。
また,データ数が少ないデータ群をソートするときには挿入ソートを用いました。データ数が大きく異なるデータ群をマージするときには,マージソートではな
い手法を用いました。
このコードは,高速化の工夫をしていない素朴なマージソートよりは速く,素朴なクイックソートと競い合えるレベルです。
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void merge_sortD(d_type a[], int n);
void merge_sortU(d_type a[], int n);
void insertion_sort(d_type array[],int n) {
int i;
for (i=1;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
}
while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
void swap(d_type *x,d_type *y){
d_type tmp=*x;
*x=*y;
*y=tmp;
}
void mergeD(d_type a[], int m, int k){
d_type *b=a+(m-1),*c=b+k,*d=c+k;
while (1) {
if (*b>*d) {
swap(b,c);
b--;
c--;
if
(b<a) {
while(c>=a){
swap(c,d);
c--;
d--;
}
break;
}
} else {
swap(c,d);
c--;
d--;
if
(c<=b) break;
}
}
}
void mergeU(d_type c[], int m, int k){
d_type *b=c+k,*d=c-k,*a=c+m+k;
while (1) {
if (*b<*d) {
swap(b,c);
b++;
c++;
if
(b>=a) {
while(c<a){
swap(c,d);
c++;
d++;
}
break;
}
} else {
swap(c,d);
c++;
d++;
if
(c>=b) break;
}
}
}
#define INSERT_THRESH 60
void merge_sortD(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort(a,n);
return;
}
int k=n/2,m=n-k;
merge_sortD(a,m);
merge_sortU(a+n,k);
mergeD(a,m,k);
}
void merge_sortU(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort(a,n);
return;
}
int k=n/2,m=n-k;
merge_sortU(a+k,m);
merge_sortD(a-k,k);
mergeU(a,m,k);
}
#undef INSERT_THRESH
d_type maxim(d_type array[],int last){
d_type max=array[last];
int i,j=last;
for (i=last-1;i>=0;i--){
if (array[i]>max){
j=i;
max=array[j];
}
}
array[j]=array[last];
return max;
}
void merge_sort(d_type array[], int n){
if (n<60) {
insertion_sort(array,n);
return;
}
int m=n/2;
merge_sortD(array,m);
int k=(n-m)/2;
while(k>=sqrt(n)) {
merge_sortU(array+m+k,k);
mergeD(array,m,k);
m=m+k;
k=(n-m)/2;
}
//マージソートされていないデータの処理
int i=m,j=n-1;
d_type max=maxim(array+i,j-i);
while (1) {
if (array[i-1]>max){
array[j]=array[i-1];
array[i-1]=array[j-1];
i--;
j--;
if
(i==0){
merge_sort(array,j);
array[j]=max;
break;
}
}else{
array[j]=max;
j--;
if
(j<i) break;
max=maxim(array+i,j-i);
}
}
}
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=rand()*(RAND_MAX+1)+rand();
//整列されたデータに外れ値が入り込んでいるデータの例
//array[i]=i;
//if (rand()%10==0)
//
array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
}
start=clock();
merge_sort(array,n);
end=clock();
printf("%f秒
\n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i])
printf("%d %d\n",i,array[i]);
free(array);
return 0;
}
最悪計算時間がO(n log n) となることは,次の様に分かります。
① ほぼ同じ大きさのデータをマージする操作の計算時間の合計はO(n log n) で抑えられる。
② 大きさが大きく異なるデータをマージする回数はO(log n)で抑えられるから,それらの計算時間の和はO(n log n) で抑えられる。
③ 高速化のテクニックは計算時間を減少させるだけである。
クイックソートに比肩する速度へ
まず,簡単な改良をします。
1 コードの一部の「k=n/2」を「k=n*(4.0/5.0)」に変えたところ,不思議にも大きく速くなりました。
データ数比 1:1でマージしていたのが,1:4でマージするように変わったので遅くなると思っていたのに,速くなるなんてびっくり。
私には理由は分かりませんが,「クイックソートのピボットは中央値でなく四分位数を選択したほうが高速」というブログを見て,もしかしたら分岐予測ミス確
率の関係かなと思っています。
2 次の変形により,少しだけ速くなりました。コンパイラの関係でしょうが,これも理由が分かりません。
d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+k+k; →
d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+k+k;
3 「AIがより高速なソートアルゴリズムを発見」というニュースを調べアイデアを得て,2種類の挿入ソートを合わせて1つの挿入ソートにしてみました。
2種類とは,データ数が少ないときに速い挿入ソートと,データ数が多いときに速い挿入ソートで,これらの特徴を生かしました。
理由が分からなくても,これらによりランダムなデータに対しては,高速化の工夫がされたクイックソートやイントロソートに近い速度にな
りまし
た。
最悪計算時間については,クイックソートはO(n2)で,イントロソートとヒープソートとこのソートは同じO(n log n)です。
イントロソートは,最悪の場合に途中からヒープソートに切り替えるため,最悪計算時間はヒープソートの最悪計算時間より大きくなります。
ヒープソートは,平均計算時間がO(n log n)のソートの内で比較的遅いソートです。
これらのことより,このソートは,私が知る限り,インプレースなソートのうちでデータ数が多いときの最悪計算時間が最小と考えられます。
理由が分からずマジックのように速くなったソートなので,冗談でマージックソートとでも名前をつけたくなります。マージソートの考案者のフォン・ノイマン
もマジマジと見ているかもしれません。
注1 整列されたデータに外れ値が入り込んでいるデータでは,「k=n*(4.0/5.0)」の場合は「k=n/2」の場合より遅くなった。
注2 並列化により更に高速化する場合は,「k=n*(4.0/5.0)」では難しく,「k=n/5」に変えるのが良い。
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void merge_sortD(d_type a[], int n);
void merge_sortU(d_type a[], int n);
void swap(d_type *x,d_type *y){
d_type tmp=*x;
*x=*y;
*y=tmp;
}
#define INSERTION(data,n) \
int i;\
for (i=1;i<n;i++){\
if (data[i-1]>data[i]){\
int j=i;\
d_type tmp=data[i];\
do {\
data[j]=data[j-1];\
j--;\
}while (j>0 && data[j-1]>tmp);\
data[j]=tmp;\
}\
}
void insertion_sort5(d_type array[],int n) { //n>=5のときの挿入ソート
INSERTION(array,5)
for (;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
} while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
void mergeD(d_type a[], int m, int k){
d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+k+k;
while (1) {
if (*b>*d) {
swap(b,c);
b--;
c--;
if (b<a) {
while(c>=a){
swap(c,d);
c--;
d--;
}
break;
}
} else {
swap(c,d);
c--;
d--;
if (c<=b) break;
}
}
}
void mergeU(d_type c[], int m, int k){
d_type *b=c+k,*d=c-k,*a=c+m+k;
while (1) {
if (*b<*d) {
swap(b,c);
b++;
c++;
if (b>=a) {
while(c<a){
swap(c,d);
c++;
d++;
}
break;
}
} else {
swap(c,d);
c++;
d++;
if (c>=b) break;
}
}
}
#define RATIO 0.8
#define INSERT_THRESH 60
void merge_sortD(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort5(a,n);
return;
}
int k=n*RATIO,m=n-k;//k=n/2をk=n/5*4に変えたら、なぜか高速になった
merge_sortD(a,m);
merge_sortU(a+n,k);
mergeD(a,m,k);
}
void merge_sortU(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort5(a,n);
return;
}
int k=n*RATIO,m=n-k;//k=n/2をk=n/5*4に変えたら、なぜか高速になった
merge_sortU(a+k,m);
merge_sortD(a-k,k);
mergeU(a,m,k);
}
#undef INSERT_THRESH
d_type maxim(d_type array[],int last){
d_type max=array[last];
int i,j=last;
for (i=last-1;i>=0;i--){
if (array[i]>max){
j=i;
max=array[j];
}
}
array[j]=array[last];
return max;
}
void merge_sort(d_type array[], int n){
if (n<60) {
if (n<5){
INSERTION(array,n)
}else
insertion_sort5(array,n);
return;
}
int m=n/(1.0+RATIO);//mを最大限まで大きくした
merge_sortD(array,m);
int k=(n-m)/2,end=sqrt(n);
while(k>=end) {
merge_sortU(array+m+k,k);
mergeD(array,m,k);
m=m+k;
k=(n-m)/2;
}
//マージソートされていないデータの処理
int i=m,j=n-1;
d_type max=maxim(array+i,j-i);
while (1) {
if (array[i-1]>max){
array[j]=array[i-1];
array[i-1]=array[j-1];
i--;
j--;
if (i==0){
merge_sort(array,j);
array[j]=max;
break;
}
}else{
array[j]=max;
j--;
if (j<i) break;
max=maxim(array+i,j-i);
}
}
}
#undef RATIO
#undef INSERTION
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=rand()*(RAND_MAX+1)+rand();
//整列されたデータに外れ値が入り込んでいるデータの例
//array[i]=i;
//if (rand()%10==0)
// array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
}
start=clock();
merge_sort(array,n);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}
さらに高速化しようと思い,最後の方でソートされていない要素を処理するのに,ヒープソートを用いた改良を加えました。その基本的なアイデアは次の様で
す。
(整列されたm個,整列されていない2k個) → (整列されたm個,整列されていないk個,整列されたk個) → (整列されたm+k個,整列されていないk個)
と変形するときに,整列されたk個の要素が全て,整列されていないk個のどの要素よりも大きく無いようにしたい。
そうすれば,整列されたm+k個の要素の半数位は,整列されていないk個のどの要素よりも大きく無くなるから,次に整列されていないk個を処理するときに高速化される。
そのためには,整列されていない2k個の要素を整列されていないk個と整列されたk個の要素に分けるときに,ヒープソートを用いればよい。
この処理により,1億個のランダムなデータのソートでは数%は速く成り,高速化の工夫がされたクイックソートやイントロソートに対抗できる速度に
なりました。
また,<math.h> を使わないようにしようと思い,平方根を求める関数はニュートン法を用いて自作しました。
ただし,基本のソートのプログラムとしては,大分複雑になってしまいました。
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void merge_sortD(d_type a[], int n);
void merge_sortU(d_type a[], int n);
void swap(d_type *x,d_type *y){
d_type tmp=*x;
*x=*y;
*y=tmp;
}
#define INSERTION(data,n) \
int i;\
for (i=1;i<n;i++){\
if (data[i-1]>data[i]){\
int j=i;\
d_type tmp=data[i];\
do {\
data[j]=data[j-1];\
j--;\
}while (j>0 && data[j-1]>tmp);\
data[j]=tmp;\
}\
}
void insertion_sort5(d_type array[],int n) { //n>=5のときの挿入ソート
INSERTION(array,5)
for (;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
} while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
void mergeD(d_type a[], int m, int k){
d_type *b=a+(m-1),*c=a+(m-1)+k,*d=a+(m-1)+k+k;
while (1) {
if (*b>*d) {
swap(b,c);
b--;
c--;
if (b<a) {
while(c>=a){
swap(c,d);
c--;
d--;
}
break;
}
} else {
swap(c,d);
c--;
d--;
if (c<=b) break;
}
}
}
void mergeU(d_type c[], int m, int k){
d_type *b=c+k,*d=c-k,*a=c+m+k;
while (1) {
if (*b<*d) {
swap(b,c);
b++;
c++;
if (b>=a) {
while(c<a){
swap(c,d);
c++;
d++;
}
break;
}
} else {
swap(c,d);
c++;
d++;
if (c>=b) break;
}
}
}
#define RATIO 0.8
#define INSERT_THRESH 60
void merge_sortD(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort5(a,n);
return;
}
int k=n*RATIO,m=n-k;//k=n/2をk=n/5*4に変えたら、なぜか高速になった
merge_sortD(a,m);
merge_sortU(a+n,k);
mergeD(a,m,k);
}
void merge_sortU(d_type a[], int n){
if (n<INSERT_THRESH) {
insertion_sort5(a,n);
return;
}
int k=n*RATIO,m=n-k;//k=n/2をk=n/5*4 に変えたら、なぜか高速になった
merge_sortU(a+k,m);
merge_sortD(a-k,k);
mergeU(a,m,k);
}
#undef INSERT_THRESH
#define N 5
void downheap(d_type *array,d_type work,int parent,int child,int last) {
while (1) {
if (child<last) {
#if (N==2)
if (array[child+1]<array[child]) child++;
#else
int i=child,end=child+N-1;
if (last<end) end=last;
do {
if (array[++i]<array[child]) child=i;
} while (i<end);
#endif
} else if (child>last) {
break;
}
if (work<=array[child]) break;
array[parent]=array[child];
parent=child;
child=N*parent+1;
}
array[parent]=work;
}
void heap_small(d_type array[],int k,int last){
int i;
for (i=(last-1)/N;i>=0;i--)
downheap(array,array[i],i,N*i+1,last);
//小さいk個を大きい順に並べる
int end=last-k+1;
for (i=last;;){
d_type work=array[i];
array[i]=array[0];
if (i==end){
array[0]=work;
break;
}
downheap(array,work,0,1,--i);
}
//小さい順に並べ替える
d_type *ak=array+k,*al=array+last;
do{
swap(ak,al);
}while (++ak < --al);
}
#undef N
d_type maxim(d_type array[],int last){
d_type max=array[last];
int i,j=last;
for (i=last-1;i>=0;i--){
if (array[i]>max){
j=i;
max=array[j];
}
}
array[j]=array[last];
return max;
}
double sqrt_n(double n){ //ニュートン法でsqrt(n)を求める
double x,y;
if (n<=500) x=10; else x=50;
y=(x+n/x)/2;
do {
x=y;
y=(x+n/x)/2;
} while (x-y>0.01);
return y;
}
void merge_sort(d_type array[], int n){
if (n<60) {
if (n<5){
INSERTION(array,n)
}else
insertion_sort5(array,n);
return;
}
int m=n/(1.0+RATIO);//mを最大限まで大きくした
merge_sortD(array,m);
int k=(n-m)/2,end;
if (n>=10000) end=n/100; else end=sqrt_n(n);
while (k>=end){
merge_sortU(array+m+k,k);
mergeD(array,m,k);
m=m+k;
k=(n-m)/2;
}
while(k>=40){
heap_small(array+m,k,n-m-1);
mergeD(array,m,k);
m=m+k;
k=(n-m)/2;
}
//マージソートされていないデータの処理
int i=m,j=n-1;
d_type max=maxim(array+i,j-i);
while (1) {
if (array[i-1]>max){
array[j]=array[i-1];
array[i-1]=array[j-1];
i--;
j--;
if (i==0){
merge_sort(array,j);
array[j]=max;
break;
}
}else{
array[j]=max;
j--;
if (j<i) break;
max=maxim(array+i,j-i);
}
}
}
#undef RATIO
#undef INSERTION
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=rand()*(RAND_MAX+1)+rand();
//整列されたデータに外れ値が入り込んでいるデータの例
//array[i]=i;
//if (rand()%10==0)
// array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
}
start=clock();
merge_sort(array,n);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}
参考
最高速と言われているクイックソートと速度
比較ができるように,以下にクイックソート(イントロソート)のコードを掲載します。
クイックソートの最悪計算時間はO(n2)なので,
最悪計算時間がO(n log n)同士で比べるために,純粋なクイックソートよりも若干だけ遅くなるイントロソートを掲載しました。
なお,このソートにある挿入ソートには,in-place merge sortにある挿入ソートに対して行った高速化の工夫に対応する工夫を行っています。
私のパソコンで速度を測定したところでは,in-place
merge sortとイントロソートの差は条件によって変わる程度です。
実際にコンパイルして実行し,速度を比べてみて下さい。また,標準ライブラリ関数qsort()とも比べてみて下さい。(qsortは遅いです)
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void swap(d_type *x,d_type *y){
d_type tmp=*x;
*x=*y;
*y=tmp;
}
void insertion_sort(d_type array[],int n) {
if (n<=1) return;
if (array[0]>array[1]) swap(array,array+1);
if (n<=2) return;
if (array[1]>array[2]) swap(array+1,array+2);
if (array[0]>array[1]) swap(array,array+1);
int i;
for (i=3;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
} while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
void downheap(d_type *array,d_type work,int parent,int child,int last) {
while (1) {
if (child<last) {
if (array[child+1]>array[child]) child++;
} else if (child>last) {
break;
}
if (work>=array[child]) break;
array[parent]=array[child];
parent=child;
child=2*parent+1;
}
array[parent]=work;
}
void heap_sort(d_type *array, int last) {
int i;
for (i=(last-1)/2;i>=0;i--) {
downheap(array,array[i],i,2*i+1,last);
}
for (i=last;i>0;) {
d_type work=array[i];
array[i]=array[0];
downheap(array,work,0,1,--i);
}
}
d_type med3(d_type x,d_type y,d_type z){
if (x<y){
if (y<z) return y;
else if (z<x) return x;
}else{
if (z<y) return y;
else if (x<z) return x;
}
return z;
}
#define INSERT_THRESH 60
void safe_quick_sort(d_type *array,int begin,int end,int maxDepth) {
int range=end-begin;
if (range<INSERT_THRESH){
insertion_sort(array+begin,range+1);
return;
}
if (maxDepth<=0){
heap_sort(array+begin,range);
return;
}
d_type pivot=med3(array[begin],array[begin+range/2],array[end]);
int l=begin,r=end;
while(1){
while(array[l]<pivot) l++;
while(array[r]>pivot) r--;
if(l>=r) break;
swap(&array[l],&array[r]);
l++;r--;
}
maxDepth--;
safe_quick_sort(array,begin,r,maxDepth);//l=r+1
safe_quick_sort(array,r+1,end,maxDepth);
}
void quick_sort(d_type *array,int n) {
if (n<60){
insertion_sort(array,n);
return;
}
int M=INSERT_THRESH,maxDepth=0;
while (M<n) {//maxDepth=2*log2(n/INSERT_THRESH)
M=M*2;
maxDepth=maxDepth+2;
}
safe_quick_sort(array,0,n-1,maxDepth);
}
#undef INSERT_THRESH
int compare(const void *a, const void *b){
return *(int*)a-*(int*)b;
}
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=rand()*(RAND_MAX+1)+rand();
//整列されたデータに外れ値が入り込んでいるデータの例
//array[i]=i;
//if (rand()%10==0)
// array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
}
start=clock();
quick_sort(array,n);
//qsort(array,n,sizeof(int),compare);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}