超高速マージソートN分木ヒープソート,高速挿入ソート,超高速安全クイックソート(2023 年5月~2024 年1月)
マージソートを改良して高速化し,ランダムなデータに対してクイックソートより速くソートするようにで きました。
Wikipediaにある単純な コードと比較して, ランダムなデータに対して3倍以上高速です。
クイックソートが最速であるという常識が覆り,これからは,最速なソートはマージソートがであることが常識となります。
マージソートの方が遅いと言われているのは,マージソートを高速化する工夫が十分でなかったからだと思われます。
下にあるコードの速さを実感してみて下さ い。
なお,私の環境(Windows 10,AMD FX8370E,GCC の最適化レベル最高)では,1億個の整数の乱数を,c++の関数 sort()が8秒台でソートするところを4秒台でソートし ます。Cのqsort()は汎用のためか遅くて比較になりません。
私が調べた範囲では,ランダムなデータに対しては世界最速ですが,より速いコードがあればご連絡くだされば有難いです。
基本マージソートのアルゴリズムと の違いは以下のものです。
① 小さい部分のソートには挿入ソートを用いた。(マージソート,挿入ソートはともに安定ソート)
② 基本マージソートにあるデータのコピーを省いた。(array[]とwork[]のデータの移動を偶数回行うことにより,データのコピー無しにデータ はarray[]に戻る。)
③ 小さい方からと大きい方からの両方からマージすることにより,比較回数がデータ数で決まるようになり,終了判定を簡略化できた。
④ マージするときのif文に局所変数tempを用いた。
⑤ 速度に大きく影響する部分の配列の操作に,ポインタを使った
⑥ 追加の作業記憶領域を,データの記憶領域のほぼ1/2に抑えた。
①~⑤は高速化のためです。何故か④による高速化の効果が最大のようです。
if (array[j] < array[i]) {
work[k] = array[j++];
} else {
work[k] = array[i++];
}
を
if (array[j] < array[i]) {
temp = array[j++];
} else {
temp = array[i++];
}
work[k] = temp;
と変えただけで極端に速くなり,その原因は不明です。
また,③のアルゴリズムは新しいアイデアではないかと思われます。以下のコードでマージを完了することができます。
void merge(d_type array[],d_type work[],int left,int mid,int right) {
int i=left,j=mid,k=left;
int x=mid-1,y=right-1,z=right-1;
int kosuu=right-left,c;
for (c=kosuu/2;c>0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
if (kosuu&1){//データ数が奇数のとき
if(i==x)work[k]=array[i];else work[k]=array[j];
}
}
普通にに考えると,マージの停止条件に i<=x && j<=y とういう条件が必要なように思えますが,左半分と右半分のデータ数が等しいかまたは1個だけしか違わないことから,そのような条件は不要になります。
このことは,証明することができます。左半分と右半分のデータ数が1個だけ違うときは, データ数が奇数であることから,最後に残った1個を処理します。
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
void merge_sortA();
void merge_sortW();
typedef int d_type;// ソートするキーの型
#define SWAP(a,b) {d_type work=a;a=b;b=work;}
#define COMPSWAP(a,b) {if((b)<(a)) SWAP(a,b)}
#define MIN_TO_WORK \
{\
d_type temp;\
if (*array_i<=*array_j){\
temp=*(array_i++);\
} else {\
temp=*(array_j++);\
}\
*(work_k++)=temp;\
}
#define MAX_TO_WORK \
{\
d_type temp;\
if (*array_x>*array_y){\
temp=*(array_x--);\
} else {\
temp=*(array_y--);\
}\
*(work_z--)=temp;\
}
void merge(d_type array[],d_type work[],int left,int mid,int right) {
d_type *array_i=array+left,*array_j=array+mid,*work_k=work+left;
d_type *array_x=array+(mid-1),*array_y=array+(right-1),*work_z=work+(right-1);
int kosuu=right-left,c;
for (c=kosuu/2;c>0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
if (kosuu%2){//データ数が奇数のとき
if(array_i==array_x) *work_k=*array_i;else *work_k=*array_j;
}
}
void insertion_sortA(d_type array[],int right) {
COMPSWAP(array[0],array[1])//高速化のため
int i;
for (i=2;i<right;i++) {
int temp=array[i];
if (array[i-1]>temp){
int j=i;
do {
array[j] = array[j-1];
j--;
} while (j>0 && array[j-1]>temp);
array[j]=temp;
}
}
}
void insertion_sortW(d_type *array,d_type *work,int right) {
if (array[0]<=array[1]){//高速化のため
work[0]=array[0];
work[1]=array[1];
}else{
work[0]=array[1];
work[1]=array[0];
}
int i;
for (i=2;i<right;i++) {
if (work[i-1]>array[i]){
int j=i;
do {
work[j] = work[j-1];
j--;
} while (j>0 && work[j-1]>array[i]);
work[j]=array[i];
}else work[i]=array[i];
}
}
#define UTIKIRI 7
void merge_sortA(d_type array[],d_type work[],int left,int right) {
int kosuu=right-left;
if (kosuu<=UTIKIRI) {
insertion_sortA(array+left,kosuu);
return;
}
int mid=left+kosuu/2;
merge_sortW(array,work,left,mid);
merge_sortW(array,work,mid,right);
merge(work,array,left,mid,right);
}
void merge_sortW(d_type array[],d_type work[],int left,int right) {
int kosuu=right-left;
if (kosuu<=UTIKIRI) {
insertion_sortW(array+left,work+left,kosuu);
return;
}
int mid=left+kosuu/2;
merge_sortA(array,work,left,mid);
merge_sortA(array,work,mid,right);
merge(array,work,left,mid,right);
}
#define MIN_TO_ARRAY \
{\
d_type temp;\
if (work[j]<=array[i]){\
temp=work[j++];\
} else {\
temp=array[i++];\
}\
array[k++]=temp;\
}
void merge0(d_type array[],d_type work[],int mid,int right) {
int i=mid,j=0,k=0;
if (work[mid-1]<=array[right-1]){
while (j<mid) MIN_TO_ARRAY
}else{
while (i<right) MIN_TO_ARRAY
while (j<mid) array[k++]=work[j++];
}
}
//追加の作業記憶領域を,データの記憶領域のほぼ1/2に抑えた
void sort(d_type array[],int right){
if (right<=3) {
if (right>=2) insertion_sortA(array,right);
return;
}
int mid=(right+1)/2;
d_type *work=malloc(mid*sizeof(d_type));
merge_sortA(array+mid,work,0,right-mid);
merge_sortW(array,work,0,mid);
merge0(array,work,mid,right);
free(work);
}
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=rand()*(RAND_MAX+1)+rand();
}
start=clock();
sort(array,n);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}
ポインタを使わない旧コードを以下に示しました。
#define MIN_TO_WORK \
{\
d_type temp;\
if (array[i]<=array[j]){\
temp=array[i++];\
} else {\
temp=array[j++];\
}\
work[k++]=temp;\
}
#define MAX_TO_WORK \
{\
d_type temp;\
if (array[x]>array[y]){\
temp=array[x--];\
} else {\
temp=array[y--];\
}\
work[z--]=temp;\
}
void merge(d_type array[],d_type work[],int left,int mid,int right) {
int i=left,j=mid,k=left;
int x=mid-1,y=right-1,z=y;
int kosuu=right-left,c;
for (c=kosuu/2;c!=0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
if (kosuu%2){//データ数が奇数のとき
if(i==x)work[k]=array[i];else work[k]=array[j];
}
}
追加の作業記憶領域の削減
追加の作業記憶領域を小さくに抑えるには,上記のコードで 関数 sort( ) を次のように変えればよいでしょう。
次のコードでは,追加の作業記憶領域はデータの記憶領域のほぼ1/10に抑えられます。このときの速度は低下しますが,それでもク イックソートより速いです。
また,並列処理で高速化するときでも作業記憶領域を削減するには,もう一つ下の階層まで下記のルーチンを用いればよいでしょう。
//追加の作業記憶領域を,データの記憶領域のほぼ1/BUNKATUに抑えた
#define BUNKATU 10
void sort(d_type array[],int right){
if (right<BUNKATU) {
selection_sort(array,0,right-1);
return;
}
int mid1=right/BUNKATU;
int mid2=(right+BUNKATU-1)/BUNKATU;
int amari=right%BUNKATU;
d_type *work=malloc(mid2*sizeof(d_type));
d_type *array1=array+right-mid2;
int right1=mid2;
merge_sortA(array1,work,0,mid2);
int i;
for (i=1;i<BUNKATU;i++){
int mid;
if (i<amari) mid=mid2; else mid=mid1;
array1=array1-mid;
right1=right1+mid;
merge_sortW(array1,work,0,mid);
merge0(array1,work,mid,right1);
}
free(work);
}
最強のmergesort
整列されたデータに外れ値が入り込んでいる場合,クイックソートは高速に処理するので,マージソートでもクイックソート以上に高速に処理できるように改良 してみま した。
ただし,ランダムなデータの処理は,超高速マージソートより少しだけ遅くなりますが,それでも,クイックソートより速くソートします。
整列されたデータに外れ値が入り込んでいるデータとは,具体的には次のようなものです。
array[i]=i;
if (rand()%10==0) array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
これは,確率1/10で外れ値が入り込むもので,このようなデータは高速に処理することができます。
しかし,ほとんど整列されたデータであっても,次のようなデータではクイックソートより速くは処理できません。
array[i]=i+(int)((double)rand()/(RAND_MAX+1)*100);
すべての場合に対して高速なコードを作ることは難しいので,バランスを考えて次のコードを作りました。なお,逆順データの対策はしていません。
なお,小さい部分のソートには挿入ソートを用いました。
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
void merge_sortA();
void merge_sortW();
typedef int d_type;// ソートするキーの型
#define MIN_TO_WORK \
{\
d_type temp;\
if (*array_i<=*array_j){\
temp=*(array_i++);\
} else {\
temp=*(array_j++);\
}\
*(work_k++)=temp;\
}
#define MAX_TO_WORK \
{\
d_type temp;\
if (*array_x>*array_y){\
temp=*(array_x--);\
} else {\
temp=*(array_y--);\
}\
*(work_z--)=temp;\
}
void merge(d_type array[],d_type work[],int left,int mid,int right) {
d_type *array_i=array+left,*array_j=array+mid,*work_k=work+left;
d_type *array_x=array+(mid-1),*array_y=array+(right-1),*work_z=work+(right-1);
int kosuu=right-left,c,kosuu_8=kosuu/8;
if (*(array_x-kosuu_8)<=*(array_j+kosuu_8)){
for (c=kosuu_8;c>0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
if (*array_x<=*array_j){
while (array_i<=array_x) *(work_k++)=*(array_i++);
while (array_j<=array_y) *(work_k++)=*(array_j++);
return;
}
for (c=kosuu/2-kosuu_8;c>0;c--){//if文の中に書かないと何故か速度低下する
MIN_TO_WORK
MAX_TO_WORK
}
}else{
for (c=kosuu/2;c>0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
}
if (kosuu%2){//データ数が奇数のとき
if(array_i==array_x) *work_k=*array_i;else *work_k=*array_j;
}
}
// 挿入ソート
void insertion_sortA(d_type array[],int right) {
int i,j;
for (i=1;i<right;i++) {
d_type temp=array[i];
for (j=i;j>0 && array[j-1]>temp;j--) array[j]=array[j-1];
array[j]=temp;
}
}
// workへの挿入ソート
void insertion_sortW(d_type *array,d_type *work,int size) {
int i,j;
work[0]=array[0];
for (i=1;i<size;i++) {
for (j=i;j>0 && work[j-1]>array[i];j--) work[j]=work[j-1];
work[j]=array[i];
}
}
#define UTIKIRI 24
void merge_sortA(d_type array[],d_type work[],int left,int right) {
int kosuu=right-left;
if (kosuu<=UTIKIRI) {
insertion_sortA(array+left,kosuu);
return;
}
int mid=left+kosuu/2;
merge_sortW(array,work,left,mid);
merge_sortW(array,work,mid,right);
merge(work,array,left,mid,right);
}
void merge_sortW(d_type array[],d_type work[],int left,int right) {
int kosuu=right-left;
if (kosuu<=UTIKIRI) {
insertion_sortW(array+left,work+left,kosuu);
return;
}
int mid=left+kosuu/2;
merge_sortA(array,work,left,mid);
merge_sortA(array,work,mid,right);
merge(array,work,left,mid,right);
}
#define MIN_TO_ARRAY \
{\
d_type temp;\
if (work[j]<=array[i]){\
temp=work[j++];\
} else {\
temp=array[i++];\
}\
array[k++]=temp;\
}
void merge0(d_type array[],d_type work[],int mid,int right) {
int i=mid,j=0,k=0;
if (work[mid-1]<=array[right-1]){
while (j<mid) MIN_TO_ARRAY
}else{
while (i<right) MIN_TO_ARRAY
while (j<mid) array[k++]=work[j++];
}
}
void sort(d_type array[],int right){
if (right<=1) return;
int mid=(right+1)/2;
d_type *work=malloc(mid*sizeof(d_type));
merge_sortA(array+mid,work,0,right-mid);
merge_sortW(array,work,0,mid);
merge0(array,work,mid,right);
free(work);
}
int main(){
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
//srand((unsigned) time(NULL));
for (i=0;i<n;i++){
array[i]=i;
if (rand()%10==0) array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
//array[i]=rand()*(RAND_MAX+1)+rand();
//array[i]=i+(int)((double)rand()/(RAND_MAX+1)*100);
}
start=clock();
sort(array,n);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}
ポインタを使わない旧コードを以下に示しました。
void merge(d_type array[],d_type work[],int left,int mid,int right) {
int i=left,j=mid,k=left;
int x=mid-1,y=right-1,z=y;
int kosuu=right-left,c,s,kosuu_16=kosuu/16;
if (array[x-kosuu_16]<=array[j+kosuu_16]){
for (s=2;s!=0;s--){//2回で諦める
for (c=kosuu_16;c!=0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
if (array[x]<=array[j]){
while (i<=x) work[k++]=array[i++];
while (j<=y) work[k++]=array[j++];
return;
}
}
for (c=(z-k+1)/2;c!=0;c--){//if文の中に書かないと何故か速度低下する
MIN_TO_WORK
MAX_TO_WORK
}
}else{
for (c=kosuu/2;c!=0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
}
if (kosuu%2){//データ数が奇数のとき
if(i==x)work[k]=array[i];else work[k]=array[j];
}
}
N分木ヒープソート計算量がO(n^1.5)のソート(平方根ソート)(2023年2月~)
高速化を目指し,通常の二分木ヒープソートのコードを変形して,N分木ヒープソートのコードを書いてみました。
実行速度は,データ数が少ないときは,二分木ヒープソートやクイックソートより速いが,データ数が1億では,二分木ヒープソートより遅くなってしまいます。
Nの値とデータ数を変えて試してみてください。
高速挿入ソート#include <stdio.h>
#include <stdlib.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
#define N 8
void downheap(d_type *array,d_type work,int parent,int child,int last) {
while (1) {
if (child<last) {
#if (N==2)
if (array[child+1]>array[child]) child++;
#else
int i=child,end=child+N-1;
if (last<end) end=last;
do {
if (array[++i]>array[child]) child=i;
} while (i<end);
#endif
} else if (child>last) {
break;
}
if (work>=array[child]) break;
array[parent]=array[child];
parent=child;
child=N*parent+1;
}
array[parent]=work;
}
void heap_sort(d_type *array, int last) {
last--;
int i;
for (i=(last-1)/N;i>=0;i--) {
downheap(array,array[i],i,N*i+1,last);
}
for (i=last;i>0;) {
d_type work=array[i];
array[i]=array[0];
downheap(array,work,0,1,--i);
}
}
#undef N
#define N 10000
#define M 10000
int main(){
int i;
clock_t start,end;
d_type *array=malloc(N*M*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<N*M;i++) array[i]=rand()*(RAND_MAX+1)+rand();
start=clock();
for (i=0;i<M;i++){
heap_sort(array+N*i,N);
}
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<N;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
return 0;
}
挿入ソートを改良して若干だけ高速化したコードを紹介します。ポイントは if (array[i]<array[0]) によって場合分けすることにより,終了判定を単純にしたことです。
安定ソートで追加作業記憶領域を用いないものとしては,データ数が数十個程度のとき最速です。(2023年9月2日改訂)
void insertion_sort(d_type array[],int n) {
int i;
for (i=1;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
} while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
超高速安全クイックソート
ランダムなデータに対して,クイックソートはデータ数が大きいときに速く,N分木ヒープソートはデータ数が中位のときに速く,挿入ソートはデータ数が少ないときに速いので,この3つのソートを組み合わせれば,互いの弱点 を補って高速なソートができます。
次のソートは,データ数が1万より大きいときはクイックソートで,クイックソートによりデータが分割されてデータ数が1万以下になったときは8分木ヒープ ソートで,データ数が少ないときは挿入ソートで整列するものです。
また,クイックソートの再帰の深さが深く成り過ぎたときに8分木ヒープソートに切り替えることにより,時間計算量は最悪でも O(n log n) となるようにしています。(イントロソート)
なお,イントロソートと同じ方法は,「安全クイックソー ト」として 野崎 昭弘氏 と 杉本 俊彦氏により,1980年に情報処理学会論文誌に発表されています。
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void insertion_sort(d_type array[],int n) {
int i;
for (i=1;i<n;i++) {
int j=i;
d_type tmp=array[i];
if (tmp<array[0]){
do {
array[j]=array[j-1];
j--;
} while (j>0);
}else{
while (array[j-1]>tmp) {
array[j]=array[j-1];
j--;
}
}
array[j]=tmp;
}
}
void swap(d_type *a,d_type *b) {
d_type temp=*a;
*a=*b;
*b=temp;
}
#define N 8
void downheap1(d_type *array,int parent,int last) {
d_type work=array[parent];
while (1) {
int child=N*parent+1;
if (child<last) {
int i=child,end=child+N-1;
if (last<end) end=last;
do {
if (array[++i]>array[child]) child=i;
} while (i<end);
} else if (child>last) {
break;
}
if (work>=array[child]) break;
array[parent]=array[child];
parent=child;
}
array[parent]=work;
}
void downheap2(d_type *array,d_type work,int last) {
int parent=0,child=1;
while (1) {
if (child<last) {
int i=child,end=child+N-1;
if (last<end) end=last;
do {
if (array[++i]>array[child]) child=i;
} while (i<end);
} else if (child>last) {
break;
}
if (work>=array[child]) break;
array[parent]=array[child];
parent=child;
child=N*parent+1;
}
array[parent]=work;
}
void heap_sort(d_type *array, int last) {
int i;
for (i=(last-1)/N;i>=0;i--) {
downheap1(array,i,last);
}
d_type work;
i=last;
while (1) {
work=array[i];
array[i]=array[0];
if (i<35) break;
downheap2(array,work,--i);
}
array[0]=work;
insertion_sort(array,i);
}
#undef N
d_type med3(d_type x,d_type y,d_type z){
if (x<y){
if (y<z) return y;
else if (z<x) return x;
}else{
if (z<y) return y;
else if (x<z) return x;
}
return z;
}
#define MIN 10000
void safe_quick_sort(d_type *array,int begin,int end,int maxDepth) {
int range=end-begin;
if (range<=MIN || maxDepth<=0){
heap_sort(array+begin,range);
return;
}
d_type pivot=med3(array[begin],array[begin+range/2],array[end]);
int l=begin,r=end;
while(1){
while(array[l]<pivot) l++;
while(array[r]>pivot) r--;
if(l>=r) break;
swap(&array[l],&array[r]);
l++;r--;
}
maxDepth--;
safe_quick_sort(array,begin,r,maxDepth);//l=r+1
safe_quick_sort(array,r+1,end,maxDepth);
}
void quick_sort(d_type *array,int n) {
if (n<75){
insertion_sort(array,n);
return;
}
int M=MIN,maxDepth=0;
while (M<n) {//maxDepth=2*log2(n/MIN)
M=M*2;
maxDepth=maxDepth+2;
}
safe_quick_sort(array,0,n-1,maxDepth);
}
#undef MIN
#define N 10000000
#define M 10
int main(){
int i;
clock_t start,end;
d_type *array=malloc(N*M*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<N*M;i++) array[i]=rand()*(RAND_MAX+1)+rand();
start=clock();
for (i=0;i<M;i++){
quick_sort(array+N*i,N);
}
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<N;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
return 0;
}
選択ソートと挿入ソートの考え方を用いて、選択ソートの高速化バージョンを作成してみました。内部ソートですが安定ソートでは ありません。
特徴は,理論上の平均計算量と最悪計算量がO(n^1.5)であるこ とです。データ数の平方根を用いることにより計算量がO(n*sqrt(n))となったので,平 方 根ソートと呼ぶことにします。
データ数が1000個程度までなら使える速度ですが、このソートよりも超高速安全クイックソートの方が高速です。
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <time.h>
typedef int d_type;// ソートするキーの型
void sqrt_sort(d_type array[],int n) {
int i,j,min;
d_type tmp;
if (n<=25){//選択ソート
for (i=0;i<n-1;i++){
min=n-1;
for (j=min-1;j>=i;j--){
if (array[j]<array[min]) min=j;
}
tmp=array[i];
array[i]=array[min];
array[min]=tmp;
}
}else{
int sqn=sqrt(n/2);
for (i=n-sqn;i>0;i-=sqn){
sqrt_sort(array+i,sqn);
}
sqrt_sort(array,i+sqn);
for (i=0;i<n-sqn;i++){
min=n-sqn;
for (j=min-sqn;j>i;j-=sqn){
if (array[j]<array[min]) min=j;
}
if (array[min]<array[i]){
tmp=array[i];
array[i]=array[min];
for (j=min+1;j<min+sqn && array[j]<tmp;j++){
array[j-1]=array[j];
}
array[j-1]=tmp;
}
}
}
}
#define N 1000
#define M 100000
int main(){
int i;
clock_t start,end;
d_type *array=malloc(N*M*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<N*M;i++) array[i]=rand()*(RAND_MAX+1)+rand();
start=clock();
for (i=0;i<M;i++){
sqrt_sort(array+N*i,N);
}
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<N;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
return 0;
}
下記のコードは,4種類のソートを用いたもので,それぞれソートには次の表のような特徴があります。なお,O( )はランダウの記号(Order)です。
名称
平均計算時間
最悪計算時間
追加作業記憶領域
安定ソート
ソート速度
マージソート
O(n logn) O(n logn)
必要 O(n)
〇
ランダムなデータに対して最速。
クイックソート
O(n logn)
O(n^2) 必要ない
×
マージソートより数割遅い。ほとんど整列されたデータに対して高 速。
ヒープソート
O(n logn) O(n logn) 必要ない
×
8分木ヒープソートは,データ数が少ないときクイックソートより速い。
挿入ソート
O(n^2) O(n^2) 必要ない 〇
データ数が少ないときは最速。ほとんど整列されたデータに対して高 速。
次のコードは,通常はマージソートと挿入ソートのみが使われる安定ソートです。しかし,メモリ不足が起きたときに,それを回避するために安全クイックソート を使います。全体としての最悪計算時間はO(n logn)で.,スタックの深さはO(log n)です。どこにも弱点がないソートであり,世界最強・最速のソートであるといえます。
作業記憶領域がデータ領域の1/32も取れない場合に,下にある最強のクイックソートに自動的に変更するようにしました。
マージソート,挿入ソートは安定ソートですが,クイックソート,ヒープソートは安定ソートではありません。以下のコードは,安定ソートでソートされたとき は0,そうでないときは1を返します。
なお,108個のランダムなデータに対して,マージソートの方がクイックソートより遅くなる限界は,作業記憶領域がデータ領域の 1/32くらいのときです。
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
void merge_sortA();
void merge_sortW();
typedef int d_type;// ソートするキーの型
#define N 8
void downheap(d_type *array,d_type work,int parent,int last) {
while (1) {
int child=N*parent+1;
if (child<last) {
int i=child,end=child+N-1;
if (last<end) end=last;
do {
if (array[++i]>array[child]) child=i;
} while (i<end);
} else if (child>last) {
break;
}
if (work>=array[child]) break;
array[parent]=array[child];
parent=child;
}
array[parent]=work;
}
void heap_sort(d_type *array, int last) {
int i;
for (i=(last-1)/N;i>=0;i--) {
downheap(array,array[i],i,last);
}
for (i=last;i>0;) {
d_type work=array[i];
array[i]=array[0];
downheap(array,work,0,--i);
}
}
#undef N
d_type med3(d_type x,d_type y,d_type z){
if (x<y){
if (y<z) return y;
else if (z<x) return x;
}else{
if (z<y) return y;
else if (x<z) return x;
}
return z;
}
void swap(d_type *a,d_type *b) {
d_type temp=*a;
*a=*b;
*b=temp;
}
#define MIN 10000
void safe_quick_sort(d_type *array,int begin,int end,int maxDepth) {
int range=end-begin;
if (range<=MIN || maxDepth<=0){
heap_sort(array+begin,range);
return;
}
d_type pivot=med3(array[begin],array[begin+range/2],array[end]);
int l=begin,r=end;
while(1){
while(array[l]<pivot) l++;
while(array[r]>pivot) r--;
if(l>=r) break;
swap(&array[l],&array[r]);
l++;r--;
}
safe_quick_sort(array,begin,r,maxDepth-1);//l=r+1
safe_quick_sort(array,r+1,end,maxDepth-1);
}
void quick_sort(d_type *array,int n) {
int M=MIN,maxDepth=0;
while (M<n) {//maxDepth=2*log2(n/MIN)
M=M*2;
maxDepth=maxDepth+2;
}
safe_quick_sort(array,0,n-1,maxDepth);
}
#undef MIN
//安全クイックソートはここまで,ここからマージソート
#define MIN_TO_WORK \
{\
d_type temp;\
if (*array_i<=*array_j){\
temp=*(array_i++);\
} else {\
temp=*(array_j++);\
}\
*(work_k++)=temp;\
}
#define MAX_TO_WORK \
{\
d_type temp;\
if (*array_x>*array_y){\
temp=*(array_x--);\
} else {\
temp=*(array_y--);\
}\
*(work_z--)=temp;\
}
void merge(d_type array[],d_type work[],int left,int mid,int right) {
d_type *array_i=array+left,*array_j=array+mid,*work_k=work+left;
d_type *array_x=array+(mid-1),*array_y=array+(right-1),*work_z=work+(right-1);
int kosuu=right-left,c,kosuu_8=kosuu/8;
if (*(array_x-kosuu_8)<=*(array_j+kosuu_8)){
for (c=kosuu_8;c>0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
if (*array_x<=*array_j){
while (array_i<=array_x) *(work_k++)=*(array_i++);
while (array_j<=array_y) *(work_k++)=*(array_j++);
return;
}
for (c=kosuu/2-kosuu_8;c>0;c--){//if文の中に書かないと何故か速度低下する
MIN_TO_WORK
MAX_TO_WORK
}
}else{
for (c=kosuu/2;c>0;c--){
MIN_TO_WORK
MAX_TO_WORK
}
}
if (kosuu%2){//データ数が奇数のとき
if(array_i==array_x) *work_k=*array_i;else *work_k=*array_j;
}
}
#undef MIN_TO_WORK
#undef MAX_TO_WORK
// 挿入ソート
void insertion_sortA(d_type array[],int size) {
int i,j;
for (i=1;i<size;i++) {
d_type temp=array[i];
for (j=i;j>0 && array[j-1]>temp;j--) array[j]=array[j-1];
array[j]=temp;
}
}
// workへの挿入ソート
void insertion_sortW(d_type *array,d_type *work,int size) {
int i,j;
work[0]=array[0];
for (i=1;i<size;i++) {
for (j=i;j>0 && work[j-1]>array[i];j--) work[j]=work[j-1];
work[j]=array[i];
}
}
#define UTIKIRI 15
void merge_sortA(d_type array[],d_type work[],int left,int right) {
int kosuu=right-left;
if (kosuu<=UTIKIRI) {
insertion_sortA(array+left,kosuu);
return;
}
int mid=left+kosuu/2;
merge_sortW(array,work,left,mid);
merge_sortW(array,work,mid,right);
merge(work,array,left,mid,right);
}
void merge_sortW(d_type array[],d_type work[],int left,int right) {
int kosuu=right-left;
if (kosuu<=UTIKIRI) {
insertion_sortW(array+left,work+left,kosuu);
return;
}
int mid=left+kosuu/2;
merge_sortA(array,work,left,mid);
merge_sortA(array,work,mid,right);
merge(array,work,left,mid,right);
}
#undef UTIKIRI
#define MIN_TO_ARRAY \
{\
d_type temp;\
if (work[j]<=array[i]){\
temp=work[j++];\
} else {\
temp=array[i++];\
}\
array[k++]=temp;\
}
void merge0(d_type array[],d_type work[],int mid,int right) {
int i=mid,j=0,k=0;
if (work[mid-1]<=array[right-1]){
while (j<mid) MIN_TO_ARRAY
}else{
while (i<right) MIN_TO_ARRAY
while (j<mid) array[k++]=work[j++];
}
}
#undef MIN_TO_ARRAY
//追加の作業記憶領域を,データの記憶領域のほぼ1/BUNKATUに抑えた
#define BUNKATU_MAX 32
int sort(d_type array[],int size) {
if (size<BUNKATU_MAX) {
insertion_sortA(array,size);
return 0;
}
int BUNKATU=1;
int mid1,mid2,amari;
d_type *work;
do {
BUNKATU++;
mid1=size/BUNKATU;
mid2=(size+BUNKATU-1)/BUNKATU;
amari=size%BUNKATU;
work=malloc(mid2*sizeof(d_type));
}while (work==NULL && BUNKATU<BUNKATU_MAX);
if (work==NULL) {//メモリ不足の場合
quick_sort(array,size);
return 1;
}
d_type *array1=array+size-mid2;
int right1=mid2;
merge_sortA(array1,work,0,mid2);
int i;
for (i=1;i<BUNKATU;i++){
int mid;
if (i<amari) mid=mid2; else mid=mid1;
array1=array1-mid;
right1=right1+mid;
merge_sortW(array1,work,0,mid);
merge0(array1,work,mid,right1);
}
free(work);
return 0;
}
#undef BUNKATU_MAX
//マージソートはここまで
int main() {
int n=100000000;
int i;
clock_t start,end;
d_type *array=malloc(n*sizeof(d_type));
srand((unsigned) time(NULL));
for (i=0;i<n;i++){
//array[i]=i;
//if (rand()%10==0) array[i]=(int)((double)rand()/(RAND_MAX+1)*n);
array[i]=rand()*(RAND_MAX+1)+rand();
//array[i]=i+(int)((double)rand()/(RAND_MAX+1)*100);
}
start=clock();
sort(array,n);
end=clock();
printf("%f秒 \n",(double)(end-start)/CLOCKS_PER_SEC);
for (i=1;i<n;i++) if(array[i-1]>array[i]) printf("%d %d\n",i,array[i]);
//for (i=0;i<n;i++) printf("%d %d\n",i,array[i]);
free(array);
return 0;
}
① データ数が少ない場合に中央値を高速に求める関数を作りました。この関数は,27個のデータの中央値を求める3D median filter用 としては世界最速と思われます。
② クイックソートのを変形して,大量のデータに対して中央値を求めるクイックセレクト(quickselect)の関数を作り ました。
中央値を返す関数 median について
原理は,例えば9個のデータから,小さい方から5個のデータを抽出し,それら5個のデータの最大値を求めれば,それが中央値であるというものです。
実装では,高速化のための工夫を加えていますが,原型は次のものです。
#define KOSUU 9この関数が中央値を返すことを解説します。
#define MID (KOSUU-1)/2
value_type median(value_type *a)
{
int i,j,imax=0;
for (i=1;i<=MID;i++) if (a[i]>a[imax]) imax=i;
for (;i<KOSUU;i++){
if (a[i]<a[imax]) {
a[imax]=a[i];
imax=MID;
for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
}
}
return a[imax];
}
例えば,データをprintf("%u\n", array[i]);
a0,a1,a2,a3,a4,a5,a6,a7,a8
とします。まず,
imax=0;
for (i=1;i<=MID;i++) if (a[i]>a[imax]) imax=i;
により,a[imax]が a0,a1,a2,a3,a4 の最大値になります。
次に,
for (;i<KOSUU;i++){
if (a[i]<a[imax]) {
a[imax]=a[i];
imax=MID;
for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
}
}
によって,a[imax]を,次々に小さいものに置き換えていきます。
その結果,a0,a1,a2,a3,a4 がデータのうちの小さい方の5個となり,a[imax]がそれらの最大値となりますから,a[imax]は中央値です。
実装した関数で,赤で示した部分が,高速化のための工夫です。
value_type median(value_type *a)赤の部分の処理により,a[i]<a[imax]である確率が小さくなり高速化します。ただし,データ数が少ないときは,この処理のために遅くなり ます。
{
int i,j,imax;
for (i=0;i<=MID-1;i++) COMPSWAP(a[i],a[i+MID+1])
for (i=1;i<=MID;i++) COMPSWAP(a[i],a[i+MID])
imax=MID;
for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
for (;i<KOSUU;i++){
if (a[i]<a[imax]) {
a[imax]=a[i];
imax=MID;
for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
}
}
return a[imax];
}
2D median filter,3D median filter用として
中央値を求める関数medianは,2D median filter,3D median filter 用の中央値を求める関数として使え,データ数が27個の3D median filter 用としては,他に良いアルゴリズムがないため世界最速かと思われます。
実際,http://ndevilla.free.fr/median/median/index.html や http://www.disnetwork.info/the-blog/median-value-selection-algorithm にある関数より3割くらい高速です。
なお,gccのコンパイラの最適化レベルを最高にして実行速度を測定しています。最適化を行わない場合は,この関数は最速にはなりません。
ベンチマークのために, 下記に他の関数と共に示しました。
medianiq QuickSelectを,対象データ数が少なくなったところでmedian に変更するもので,データ数が大きい場合でも高速です。
medianq0 QuickSelectよって途中まで並べ替えてから,挿入ソートによって並べ替えて中央値を求める関数です。
medianb バブルソートによって途中まで並べ替えて中央値を求める関数です。
medians 選択ソートによって途中まで並べ替えて中央値を求める関数です。
mediani 挿入ソートによって途中まで並べ替えてから中央値を求める関数です。
median9 データ数が少ないときにはmdianより高速で,2D median filter用です。
なお,cpuとコンパイラとに依りますが,CPUがamdのFX8370eでgccのコンパイラの最適化レベルを最高で行ったとき,データ数が27個で は,処理時間の大小関係は
median<median9 <QuickSelect<挿入ソート <選択ソート<バブルソート
です。ただし,データ数が11個以下ではmedian9が,37個以上ではmedianqが最速となります。
#include <stdio.h>printf("%u\n", array[i]);
#include <stdlib.h>
#include <time.h>
#define SWAP(a,b) {value_type work=a;a=b;b=work;}
#define COMPSWAP(a,b) {if((b)<(a)) SWAP(a,b)}
#define SORT3(a,b,c) {COMPSWAP(a,b);COMPSWAP(a,c);COMPSWAP(b,c)}
typedef unsigned long value_type; // ソートするキーの型
#define KOSUU 27
#define MID (KOSUU-1)/2
value_type median(value_type *a)
{
int i,j,imax;
for (i=0;i<=MID-1;i++) COMPSWAP(a[i],a[i+MID+1])
for (i=1;i<=MID;i++) COMPSWAP(a[i],a[i+MID])median
imax=MID;
for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
for (;i<KOSUU;i++){
if (a[i]<a[imax]) {
a[imax]=a[i];
imax=MID;
for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
}
}
return a[imax];
}
value_type median9(value_type *a)
{
int i,j,imax=0;
for (i=1;i<=MID;i++) if (a[i]>a[imax]) imax=i;
for (;i<KOSUU;i++){
if (a[i]<a[imax]) {
a[imax]=a[i];
imax=MID;
for (j=MID-1;j>=0;j--) if (a[j]>a[imax]) imax=j;
}
}
return a[imax];
}
#define KYORI 10
value_type medianq(value_type *a)
{
int begin=0,end=KOSUU-1;
while (1){
int middle=(begin+end)/2;
SORT3(a[middle],a[begin],a[end]) //pivot=a[begin];
int i=begin,j=end;
while(1){
do i++; while(a[i]<a[begin]);
do j--; while(a[j]>a[begin]);
if(i>=j) break;
SWAP(a[i],a[j])
}
if (i>=MID+2){
a[begin]=a[i-1];
end=i-2;
if (end<=MID+KYORI){
int i,j,imax=MID;
for (j=MID-1;j>=begin;j--) if (a[j]>a[imax]) imax=j;
for (i=end;i>=MID+1;i--){
if (a[i]<a[imax]) {
a[imax]=a[i];
imax=MID;
for (j=MID-1;j>=begin;j--) if (a[j]>a[imax]) imax=j;
}
}
return a[imax];
}
}else if (j<=MID-1){
begin=j+1;
if (begin>=MID-KYORI){
int i,j,imin=MID;
for (j=MID+1;j<=end;j++) if (a[j]<a[imin]) imin=j;
for (i=begin;i<=MID-1;i++){
if (a[i]>a[imin]) {
a[imin]=a[i];
imin=MID;
for (j=MID+1;j<=end;j++) if (a[j]<a[imin]) imin=j;
}
}
return a[imin];
}
}else{
return a[begin];
}
}
}
value_type medianq0(value_type *a)
{
int begin=0,end=KOSUU-1,range,i,j;
do{
int middle=(begin+end)/2;
SORT3(a[middle],a[begin],a[end])//pivot=a[begin];
i=begin+1,j=end;
SWAP(a[i],a[middle])
while(1){
do i++; while(a[i]<a[begin]);
do j--; while(a[j]>a[begin]);
if(i>=j) break;
SWAP(a[i],a[j])
}
if (i>=MID+2){
a[begin]=a[i-1];
end=i-2;
}else if (j<=MID-1){
begin=j+1;
}else{
return a[begin];
}
range=end-begin;
}while(range>9);
for (i=begin+1;i<=end;i++) {
value_type work=a[i];
j=i;
while (a[j-1]>work){
a[j]=a[j-1];
j--;
}
a[j]=work;
}
return a[MID];
}
value_type mediani(value_type *a)
{
int i;
for (i=1;i<=MID;i++) {
value_type work=a[i];
int j=i;
while (j>0 && a[j-1]>work){
a[j]=a[j-1];
j--;
}
a[j]=work;
}
for (;i<KOSUU;i++) {
value_type work=a[i];
int j=MID+1;
while (j>0 && a[j-1]>work){
a[j]=a[j-1];
j--;
}
a[j]=work;
}
return a[MID];
}
value_type medians(value_type *a)
{
int i,j;
for (i=0;i<=MID;i++){
int min=i;
for (j=KOSUU-1;j>i;j--){
if (a[j]<a[min])min=j;
}
SWAP(a[i],a[min])
}
return a[MID];
}
value_type medianb(value_type *a)
{
int i,j;
for (i=0;i<=MID;i++){
for (j=KOSUU-1;j>i;j--){
COMPSWAP(a[i],a[j])
}
}
return a[MID];
}
int main()
{
int i,j;
clock_t start,end;
value_type a[KOSUU],s=0;
start=clock();
value_type x=314159265;
for (i=0;i<10000000;i++){
for (j=0;j<KOSUU;j++){
x=x*1103515245+12345;
a[j]=x>>20;
}
s=s+median(a);
}
end=clock();
printf("%f秒 \n", (double)(end-start)/CLOCKS_PER_SEC);
printf("%u\n", s);
return 0;
}
大量のデータに対して中央値を求めるコードで,QuickSortのコードを変形したものです。
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SWAP(a,b) {value_type work=a;a=b;b=work;}
#define COMPSWAP(a,b) {if((b)<(a)) SWAP(a,b)}
typedef unsigned long value_type; // ソートするキーの型
value_type median3(value_type *a,int x,int y,int z)
{
COMPSWAP(a[x],a[y])
COMPSWAP(a[x],a[z])
COMPSWAP(a[y],a[z])
return a[y];
}
value_type quick_select(value_type *array,int n)
{
int begin=0,end =n-1,middle=end/2;
while (1){
int range=end-begin;
value_type pivot=median3(array,begin,begin+range/2,end);
int i=begin,j=end;
while(1){
while(array[i]<pivot) i++;
while(array[j]>pivot) j--;
if(i>=j) break;
SWAP(array[i],array[j])
i++;j--;
}
if (i>=middle+1){
end=i-1;
}else if (j<=middle-1){
begin=j+1;
}else{
return pivot;
}
}
}
int main()
{
int n=100000001;
int i;
clock_t start,end;
value_type *array=malloc(n*sizeof(value_type));
for (i=0;i<n;i++)
array[i]=rand()*(RAND_MAX+1)+rand();
start=clock();
value_type median=quick_select(array,n);
end=clock();
printf("%f秒 \n", (double)(end-start)/CLOCKS_PER_SEC);
printf("%u\n", median);
free(array);
return 0;
}