一亿个数中选出最大的1000个数

Published On March 20, 2016

category algorithm | tags sort


要求:效率尽可能地高,占用的额外空间尽可能的少,不能使用任何库函数。

这个问题已经是老生常谈了,今天做一下整理。本文对三种算法进行了对比,分别是最小堆、链表、快排。

1. 算法思想

算法1:最小堆

前1000个数构建小顶堆,遍历剩下的所有数,如果某个数大于堆顶,弹出堆顶并插入该值,然后调整堆。

补充

小顶堆也就是存储在数组中的完全二叉树,并且任意结点的值比它的左孩子和右孩子结点小。

时间复杂度

建立M个元素的堆在最坏情况下的时间复杂度为O(M),遍历剩下的N-M个元素,每次循环调用Heaplify,最多做(N-M)0logM次比较,因此该算法的时间复杂度为O(NlogM)。

算法2:维持一个长度为1000的有序链表

将前1000个数插入链表,头结点保存这1000个数中的最小的数。遍历剩余的数,每次都和头结点比较,大于头结点的话就移除头结点并插入该数并。

时间复杂度

如果以元素之间的比较作为基本运算,最坏的情况显然是是输入的数按值递增,比较次数为NxM,最好的情况正好相反,为N-1。取他们的平均值作为随机情况下的比较次数,约为NxM/2。但是,实际上远远小于这个数,因为越到后面链表保存的数越大,很多元素都只需和头结点的元素比较一次就pass掉了,这也就是为什么这种算法的时间复杂度最低。

以上两个算法的思想相同,就是往某个限定容量的容器中不断加入新的元素,同时淘汰最小的元素。

算法3:用部分快速排序(从大到小排)

因为快排是分块有序的,每次划分会有(a,...,b,c,d,...,e)其中a..b是无序的,但都大于等于c,d..e是无序的,但都小于c, 如果a..b的个数等于1000,则a..b就是所要的,如果a..b的个数大于1000,此时只要继续a..b做快速排序(相同动作),找出前1000个,如果a..b及c的个数n不足1000,则再取d..e最大的前1000-n个数就可以了(递归,相同处理)。

补充

快速排序在划分的时候以首元素为参照,如果改成随机快排(随机选取参照元素),效果并没有改善。快排的缺点是必须将所有数据加载到内存。

时间复杂度

O(Nlog(N/M)

2. 测试结果

随机数的产生

在我的系统中RAND_MAX=32767,太小了,所以我没有rand函数,而是使用了C++的随机数引擎。但是需要注意的是随机数引擎在使用默认种子的情况下是伪随机的,也就是说两次生成的随机序列完全一样。

随机数保存在内存中

1亿的数组占用的存储空间很大,那到底有多大呢?容量为1x10^8 的int型数组占用的内存空间为4x10^8 个字节约等于400Mb,不能以普通数组的形式保存在栈上,但可以保存在堆上,也就是malloc或者new。
  我一开始是将1亿个数输出到文件中,然后再从文件中一个一个读入,两种算法耗时在6分钟左右。可见与IO的耗时相比,算法本身的完全可以忽略不计。

比较结果

算法 时间(s)
Heap 3.241
LinkList 0.285
Quick 1.164
partial_sort 5.879

注:这里的partial_sort是algorithm头文件中的部分排序函数

3. 总结

综上,性能最好的算法就是用插入排序法,也就是链表。不仅时间复杂度低,而且数据不需要一次性加载到内存。

4. 最后附上C++代码

#include<iostream>
#include <time.h>
#include<random>
#include<algorithm>
using namespace std;

#define N 100000000
#define M 1000
void Heap_FindMaxM(int A[], vector<int>& vec);
void Insert_FindMaxM(int A[], vector<int>& vec);
void Quick_FindMaxM(int A[], int p, int r, vector<int>& vec);
bool comp(int i, int j) { return (i > j); }
int main()
{
    int* A = new int[N + 1];
    uniform_int_distribution<unsigned> u(0, 1000000000);//生成0~10亿均匀的随机无符号整数
    default_random_engine e;
    for (size_t i = 1; i <= N; i++)
        A[i] = u(e);
    cout << "Random numbers generated!" << endl;
    vector<int> vec1, vec2, vec3;
    clock_t t = clock();
    Heap_FindMaxM(A, vec1);
    t = clock() - t;
    cout << (double)t / CLOCKS_PER_SEC << " seconds!" << endl;

    t = clock();
    Insert_FindMaxM(A, vec2);
    t = clock() - t;
    cout << (double)t / CLOCKS_PER_SEC << " seconds!" << endl;

    t = clock();
    Quick_FindMaxM(A, 1, N, vec3);
    t = clock() - t;
    cout << (double)t / CLOCKS_PER_SEC << " seconds!" << endl;

    //t = clock();
    //partial_sort(A+1, A+M+1, A+N+1, comp);
    //t = clock() - t;
    //for (int i = 1; i <= M; i++)
    //  vec3.push_back(A[i]);
    //cout << "partial_sort takes"<<(double)t / CLOCKS_PER_SEC << " seconds!" << endl;

    sort(vec1.begin(), vec1.end());
    sort(vec3.begin(), vec3.end());
    if (vec1 == vec2&&vec2 == vec3)
        cout << "Right!" << endl;
    delete[] A;
    return 0;
}

/************Heap************/
void Heaplify(int H[], int i)
{
    int j = 2 * i;//j为左孩子的序号
    int temp = H[i];
    while (j <= M)
    {
        if (j<M&&H[j]>H[j + 1])
            j++;
        if (temp<H[j])
            break;
        H[j / 2] = H[j];//把孩子节点移到父节点的位置
        j *= 2;
    }
    H[j / 2] = temp;
}
void Heap_FindMaxM(int A[], vector<int>& vec)
{
    int i, H[M + 1];
    for (i = 1; i <= M; i++)
    {
        H[i] = A[i];
    }
    for (i = M / 2; i >= 1; i--)//建堆
        Heaplify(H, i);
    for (i = M + 1; i <= N; i++)
    {
        if (A[i]>H[1])
            H[1] = A[i];//弹出堆顶,换入新的数
        Heaplify(H, 1);//调整堆
    }

    int count = 0;
    for (i = 1; i <= M; i++)
        vec.push_back(H[i]);
    cout << "Heap finished in ";
}

/************LinkList************/
typedef struct node
{
    int data;
    struct node *link;
}LNode, *LinkList;
void Insert(int data, LinkList &head)
{
    LinkList p, q, r = NULL;
    p = (LinkList)malloc(sizeof(LNode));
    p->data = data;

    if (head == NULL || data < head->data)
    {
        p->link = head;
        head = p;
    }
    else
    {
        q = head;
        while (q != NULL&&data >= q->data)
        {
            r = q;
            q = q->link;
        }
        p->link = q;//将新节点插入到r节点之后
        r->link = p;
    }
}
void Insert_FindMaxM(int A[], vector<int>& vec)
{
    int i;
    LinkList head = NULL, p;
    for (i = 1; i <= M; i++)
    {
        Insert(A[i], head);
    }
    while (++i <= N)
    {
        if (A[i] > head->data)
        {
            p = head;
            head = head->link;
            free(p);
            Insert(A[i], head);
        }
    }
    //销毁线性链表
    p = head;
    int count = 0;
    while (p != NULL)
    {
        vec.push_back(head->data);
        count++;
        head = p->link;
        free(p);
        p = head;
    }
    if (count != M)
        cerr << "Error in linklist_findmaxM" << endl;
    cout << "Linklist finished in ";
}

/************Quick************/
/*
输入:A[p,r]是待划分的数组
输出:j,A的首元素在排好序的数组中的位置
*/
int partition(int A[], int p, int r)
{
    int i = p, j = r + 1, temp;
    while (1)
    {
        while (A[++i] >= A[p]);
        while (A[--j] < A[p]);
        if (i < j)
        {
            temp = A[i];
            A[i] = A[j];
            A[j] = temp;
        }
        else
        {
            temp = A[p];
            A[p] = A[j];
            A[j] = temp;
            return j;
        }

    }
}
void Quick_FindMaxM(int A[], int p, int r, vector<int>& vec)
{
    if (p >= r)
        return;
    int q = partition(A, p, r);
    if (q == M)
    {
        cout << "QUICK finished in ";
        for (int i = 1; i <= M; i++)
            vec.push_back(A[i]);
    }
    else if (q > M)
        Quick_FindMaxM(A, p, q - 1, vec);
    else
        Quick_FindMaxM(A, q + 1, r, vec);
}

qq email facebook github
© 2018 - Xurui Yan. All rights reserved
Built using pelican