Monday 31 October 2016

Data Structure and Algorithm - Detect Missing Numbers in a Large Set

1. Problem Description
This is an Amazon interview question for a senior software development engineer position, taken from CareerCup. Here is the original thread:
"
Given an array of 1 billion numbers with just a few 100 missing, find all the missing numbers. you have only enough memory to store only 10k elements
PS October 07, 2016 in United States".


2. Data Structure and Algorithm
Assume the 1 billion numbers are distinct and start from 1. The 10K of memory is counted in units of C++ 32-bit integers, which gives 10,000 * 32 = 320,000 bits to work with.

The first approach:
The idea is very simple: go through the number stream repeatedly and check the numbers 1, 2, ..., up to 1 billion, one window at a time. With 10K integers and one bit per number, each pass can cover a window of 10,000 * 32 = 320K numbers.
    - Detect numbers 1, 2, ..., 320K
    - Detect numbers 320K+1, 320K+2, ..., 640K
    - Detect numbers 640K+1, 640K+2, ..., 960K
    - Keep going until 1 billion is reached, which takes 1,000,000,000 / 320,000 = 3,125 passes

The computation complexity is O(N * (N/M)), where
    - N is the size of the number stream and
    - M is the size of the memory
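
The windowed scan described above can be sketched as follows. This is only a minimal illustration, not the code from Section 3: the function name FindMissingByWindows is hypothetical, and a std::vector stands in for the replayable number stream, which is an assumption made to keep the example self-contained.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Approach 1 sketch: one full pass over the stream per window of
// (memInts * 32) values. `stream` is a hypothetical in-memory stand-in
// for the replayable number stream.
std::vector<std::uint64_t> FindMissingByWindows(
    const std::vector<std::uint64_t> &stream,
    std::uint64_t upperBound,
    std::size_t memInts)
{
    assert(memInts > 0);
    const std::uint64_t window = static_cast<std::uint64_t>(memInts) * 32;
    std::vector<std::uint32_t> bits(memInts);  // the bitmap, one bit per value
    std::vector<std::uint64_t> missing;

    for (std::uint64_t lo = 1; lo <= upperBound; lo += window) {
        const std::uint64_t hi = std::min(upperBound, lo + window - 1);
        std::fill(bits.begin(), bits.end(), 0u);

        // Mark every stream value that falls inside [lo, hi]
        for (const std::uint64_t value : stream) {
            if (value >= lo && value <= hi) {
                const std::uint64_t offset = value - lo;
                bits[offset / 32] |= (1u << (offset % 32));
            }
        }
        // Bits that are still unset are the missing numbers of this window
        for (std::uint64_t offset = 0; offset <= hi - lo; ++offset) {
            if ((bits[offset / 32] & (1u << (offset % 32))) == 0) {
                missing.push_back(lo + offset);
            }
        }
    }
    return missing;
}
```

Every window costs a full pass over the stream regardless of whether it contains a missing number, which is where the N/M factor comes from.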

The second approach:
Instead of checking every window of the number stream one by one, this approach first narrows down the ranges that contain missing numbers. Once a range is no larger than the given memory, linearly search the number stream to find the missing numbers in that range.
    - Take the mid number and look for missing numbers in the first and second halves.
         Set MID = (LowerBound+UpperBound)/2
         Start with (1+1000000000)/2 = 500000000
    - Count the numbers <= MID and the numbers > MID, i.e. the numbers in
        [LowerBound, MID] and [MID+1, UpperBound]
    - Discard a range if no number is missing from it, that is
        if the count of numbers in [LowerBound, MID] is equal to MID - LowerBound + 1
        if the count of numbers in [MID+1, UpperBound] is equal to UpperBound - MID
    - Repeat the above 3 steps until the range fits in the memory size
        In this case until the range is no larger than 320K
    - Go through each remaining range to find the missing numbers.

The computation complexity is O(N(P + log(N/M))), where
    - N is the size of the number stream
    - M is the size of the memory
    - P is the number of missing numbers
Of course, this approach takes extra memory to save up to P ranges.
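
The narrowing steps can be illustrated with a small sketch. The names Range, CountInRange and NarrowRanges are hypothetical, and a std::vector again stands in for the stream. For brevity this sketch spends one counting pass per range, whereas the implementation in Section 3 counts both halves in a single pass.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using Range = std::pair<std::uint64_t, std::uint64_t>;  // inclusive [first, second]

// Counts how many stream values fall inside [lo, hi] (one full pass).
std::uint64_t CountInRange(const std::vector<std::uint64_t> &stream,
                           std::uint64_t lo, std::uint64_t hi)
{
    std::uint64_t count = 0;
    for (const std::uint64_t value : stream) {
        if (value >= lo && value <= hi) {
            ++count;
        }
    }
    return count;
}

// Halves [lo, hi] until every surviving range holds at most `window` values.
// Ranges with nothing missing are discarded; the rest are appended to `out`.
// Assumes the stream values are distinct.
void NarrowRanges(const std::vector<std::uint64_t> &stream,
                  std::uint64_t lo, std::uint64_t hi,
                  std::uint64_t window, std::vector<Range> &out)
{
    assert(window > 0 && lo <= hi);
    if (CountInRange(stream, lo, hi) == hi - lo + 1) {
        return;  // every value of [lo, hi] is present
    }
    if (hi - lo + 1 <= window) {
        out.push_back({lo, hi});  // small enough for a bitmap scan
        return;
    }
    const std::uint64_t mid = lo + (hi - lo) / 2;
    NarrowRanges(stream, lo, mid, window, out);
    NarrowRanges(stream, mid + 1, hi, window, out);
}
```

For example, with the numbers 1 to 16, a window of 4, and 5 and 12 missing, the ranges left for the final bitmap scan are [5, 8] and [9, 12].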

The third approach:
This is an improvement on the second approach. Instead of dividing the stream into 2 parts, this approach directly divides the number stream into N/(M << 5) parts in a single pass.
The second approach only guarantees that each final range is no larger than the memory size (320K), whereas the third approach makes each range exactly equal to the memory size (except possibly the last one). This reduces the number of ranges and therefore the number of linear passes over the number stream needed to find the missing numbers.

The computation complexity is O(N*P), where
    - N is the size of the number stream
    - P is the number of missing numbers.
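
The single dividing pass can be sketched as below. Note that this sketch swaps in a flat vector of counters indexed by (value - 1) / window, which is a simplification and not the std::map based "ladders" used in Section 3. The names Bucket and FindShortBuckets are hypothetical, and the stream is again modelled as a std::vector of distinct values.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using Bucket = std::pair<std::uint64_t, std::uint64_t>;  // inclusive [first, second]

// One pass over the stream counts the values per fixed-size bucket. A bucket
// whose count is smaller than its width still contains missing numbers.
std::vector<Bucket> FindShortBuckets(const std::vector<std::uint64_t> &stream,
                                     std::uint64_t upperBound,
                                     std::uint64_t window)
{
    assert(window > 0 && upperBound > 0);
    const std::uint64_t bucketCount = (upperBound + window - 1) / window;
    std::vector<std::uint64_t> counts(bucketCount, 0);
    for (const std::uint64_t value : stream) {
        if (value >= 1 && value <= upperBound) {
            ++counts[(value - 1) / window];
        }
    }

    std::vector<Bucket> shortBuckets;
    for (std::uint64_t idx = 0; idx < bucketCount; ++idx) {
        const std::uint64_t lo = idx * window + 1;
        const std::uint64_t hi = std::min(upperBound, lo + window - 1);
        if (counts[idx] < hi - lo + 1) {
            shortBuckets.push_back({lo, hi});  // needs a bitmap scan later
        }
    }
    return shortBuckets;
}
```

Each returned bucket already fits the bitmap, so it can be handed straight to the linear search. At most P buckets can come back, which gives the O(N*P) bound.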

Section 3 shows the C++ implementation. A Python implementation can be found in the post "Data Structure and Algorithm - Detect Missing Numbers in a Large Set".

3. C++ Implementation
An interface called "NumberStream" stands in for the 1 billion numbers. A derived class called "NumberGenerator" returns the numbers from 1 up to the upper bound, skipping a randomly chosen set of missing numbers.

Both a loop and a recursive solution are implemented for Approach 2, and a loop solution is implemented for Approach 3. The tests run on a smaller scale: 1 million numbers with 100 missing, and 10000/100000/1000000 integers of memory available.

// ********************************************************************************
// Implementation
// ********************************************************************************
// NumberStream.h  -- interface to get numbers
#pragma once

#include <unordered_set>

class NumberStream
{
public:
    using MissingNumberCollection = std::unordered_set<size_t>;

    NumberStream(const size_t upperBound, const size_t missing)
        : m_UpperBound(upperBound)
        , m_Missing(missing)
    {}
    virtual ~NumberStream() {}
    virtual bool HasNext() const = 0;
    virtual size_t GetNext() const = 0;
    virtual const MissingNumberCollection& GetMissingCollection() const = 0;
    virtual void ResetToHead() const = 0;

    size_t GetUpperBound() const {
        return m_UpperBound;
    }

    size_t GetMissing() const {
        return m_Missing;
    }

protected:
    const size_t m_UpperBound;
    const size_t m_Missing;
};

// NumberGenerator.h -- derived class
#pragma once

#include "NumberStream.h"

#include <unordered_set>

#include <ctime>

class NumberGenerator : public NumberStream
{
public:
    NumberGenerator(const size_t upperBound, const size_t missing);
    ~NumberGenerator();

    const MissingNumberCollection& GetMissingCollection() const {
        return m_MissingNumbers;
    }

    size_t GetNext() const;
    bool HasNext() const;
    void ResetToHead() const;
    
private:
    MissingNumberCollection m_MissingNumbers;
    mutable size_t m_MissingNumberDetected;
    mutable size_t m_CurVal;
};

// NumberGenerator.cpp - implementation
#include "stdafx.h"
#include "NumberGenerator.h"

#include <random>
#include <ctime>

NumberGenerator::NumberGenerator(const size_t upperBound, const size_t missing)
    : NumberStream(upperBound, missing)
    , m_MissingNumberDetected(0)
    , m_CurVal(0)
{
    // Pick m_Missing distinct random values in [1, upperBound] to act as the
    // missing numbers. The caller must ensure missing <= upperBound,
    // otherwise this loop never terminates.
    std::mt19937_64 mtRand(time(NULL));
    std::uniform_int_distribution<size_t> gen(1, upperBound);
    while (m_MissingNumbers.size() < m_Missing) {
        // insert() ignores duplicates, so the set only grows on a new value
        m_MissingNumbers.insert(gen(mtRand));
    }
}

NumberGenerator::~NumberGenerator()
{
}

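size_t NumberGenerator::GetNext() const
{
    // Advance to the next value that is not in the missing set.
    // Callers must check HasNext() first: it guarantees that a present value
    // remains, so the loop never stops on a missing m_UpperBound.
    while (++m_CurVal < m_UpperBound) {
        if (m_MissingNumbers.find(m_CurVal) != m_MissingNumbers.end()) {
            ++m_MissingNumberDetected;
        }
        else {
            break;
        }
    }
    return m_CurVal;
}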

bool NumberGenerator::HasNext() const {
    return m_CurVal < m_UpperBound
        && (m_UpperBound - m_CurVal) > (m_Missing - m_MissingNumberDetected);
}

void NumberGenerator::ResetToHead() const
{
    m_CurVal = 0;
    m_MissingNumberDetected = 0;
}

// DetectMissingNumbers_Amazon.cpp
#include "NumberStream.h"
#include "NumberGenerator.h"

#include <map>
#include <queue>
#include <tuple>
#include <unordered_map>
#include <vector>

// Given range is not greater than mem size
// Linearly search number stream to find the missing number
void FindMisingNumber(const NumberStream &ns,
    const std::tuple<size_t, size_t> &missNumbers,
    const size_t MemSizeInInt,
    NumberStream::MissingNumberCollection &results)
{
    size_t lowerBound, upperBound;
    std::tie(lowerBound, upperBound) = missNumbers;
    std::vector<bool> hashTable(upperBound + 1 - lowerBound);
    ns.ResetToHead();
    size_t tmp;
    while (ns.HasNext()) {
        tmp = ns.GetNext();
        if (tmp >= lowerBound && tmp <= upperBound) {
            hashTable[tmp - lowerBound] = true;
        }
    }

    // Any entry still false corresponds to a number never seen in the stream
    for (size_t offset = 0; offset < hashTable.size(); ++offset) {
        if (!hashTable[offset]) {
            results.insert(offset + lowerBound);
        }
    }
}

// Approach 2: recursive implementation
void DetectMissingNumber_R(const NumberStream &ns,
                        const size_t MemSizeInInt,
                        const size_t lowerBound,
                        const size_t upperBound,
                        NumberStream::MissingNumberCollection &results)
{
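    const size_t TotalLinearDetectSize = MemSizeInInt << 5; // int has 32 bits
    // [lowerBound, upperBound] is inclusive and holds upperBound - lowerBound + 1
    // values, so it fits in memory only if that count is <= TotalLinearDetectSize
    if ((upperBound - lowerBound) < TotalLinearDetectSize) {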
        // find the missing number
        FindMisingNumber(ns, 
                        std::make_tuple(lowerBound, upperBound),
                        MemSizeInInt,
                        results);
        return;
    }

    // keep divide into two
    const size_t mid = (lowerBound + upperBound) >> 1;
    ns.ResetToHead();
    size_t countOfLowerHalf = 0; // [lowerBound, mid]
    size_t countOfUpperHalf = 0; // [mid + 1, upperBound]
    size_t temp;
    while (ns.HasNext()) {
        temp = ns.GetNext();
        if (temp >= lowerBound && temp <= upperBound) {
            temp > mid ? ++countOfUpperHalf : ++countOfLowerHalf;
        }
    }

    if (countOfLowerHalf < (mid + 1 - lowerBound)) {
        /* sub problem f(lowerBound,
                         mid,
                         mid + 1 - lowerBound - countOfLowerHalf)
        */
        DetectMissingNumber_R(
            ns,
            MemSizeInInt,
            lowerBound,
            mid,
            results);
    }
    if (countOfUpperHalf < (upperBound - mid)) {
        /* sub problem f(mid + 1,
                         upperBound,
                         upperBound - mid - countOfUpperHalf)
        */
        DetectMissingNumber_R(
            ns,
            MemSizeInInt,
            mid + 1,
            upperBound,
            results);
    }
}

// Approach 2: recursive implementation entry-point
NumberStream::MissingNumberCollection DetectMissingNumber_R(
    const NumberStream & ns,
    const size_t MemSizeInInt)
{
    const size_t missing = ns.GetMissing();
    const size_t upperBound = ns.GetUpperBound();
    NumberStream::MissingNumberCollection results;
    if (missing) {
        DetectMissingNumber_R(ns, MemSizeInInt, 1, upperBound, results);
    }
    return results;
}

// Approach 2: loop implementation
NumberStream::MissingNumberCollection DetectMissingNumber(
                                                          const NumberStream & ns,
                                                          const size_t MemSizeInInt)
{
    const size_t Missing = ns.GetMissing();
    const size_t UpperBound = ns.GetUpperBound();
    const size_t TotalLinearDetectSize = MemSizeInInt << 5; // int has 32 bits
    NumberStream::MissingNumberCollection results;
    if (!Missing) {
        return results;
    }

    using SearchQueue = std::queue<std::tuple<size_t, size_t>>;
    SearchQueue sq;
    // lowerBound and upperBound inclusive [lowerBound, upperBound]
    sq.push(std::make_tuple(1, UpperBound));
    size_t lowerBound, upperBound;
    size_t tryCount = 0;
    while (!sq.empty()) {
        ++tryCount;
        std::tie(lowerBound, upperBound) = sq.front();
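        // the inclusive range holds upperBound - lowerBound + 1 values, so it
        // fits in memory only if that count is <= TotalLinearDetectSize
        if ((upperBound - lowerBound) < TotalLinearDetectSize) {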
            // find the missing number for range under mem size
            FindMisingNumber(ns, sq.front(), MemSizeInInt, results);
        }
        else {
            // keep divide into two
            const size_t mid = (lowerBound + upperBound) >> 1;
            ns.ResetToHead();
            size_t countOfLowerHalf = 0; // [lowerBound, mid]
            size_t countOfUpperHalf = 0; // [mid + 1, upperBound]
            size_t temp;
            while (ns.HasNext()) {
                temp = ns.GetNext();
                if (temp >= lowerBound && temp <= upperBound) {
                    temp > mid ? ++countOfUpperHalf : ++countOfLowerHalf;
                }
            }
                
            if (countOfLowerHalf < (mid + 1 - lowerBound)) {
                // push [lowerBound, mid]
                sq.push(std::make_tuple(lowerBound,
                    mid));
            }
            if (countOfUpperHalf < (upperBound - mid)) {
                // push [mid+1, upperBound]
                sq.push(std::make_tuple(mid + 1,
                    upperBound));
            }
        }
        sq.pop();
    }

    return results;
}

// Approach 3: divide into ranges (each of size equal to Mem << 5)
void DivideRange(const NumberStream &ns,
    const size_t MemSizeInInt,
    const size_t lowerBound,
    const size_t upperBound,
    std::vector<std::tuple<size_t, size_t>> &divides)
{
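    const size_t TotalLinearDetectSize = MemSizeInInt << 5; // int has 32 bits
    const size_t divisions = (upperBound - lowerBound + 1) / TotalLinearDetectSize;
    // ladders maps the first value of each sub-range to the count of stream
    // values seen in that sub-range
    std::map<size_t, size_t> ladders;
    ladders.insert(std::make_pair(lowerBound, 0));
    for (size_t idx = 1; idx < divisions; ++idx) {
        ladders.insert(
            std::make_pair(lowerBound + TotalLinearDetectSize * idx, 0));
    }
    // Add the trailing partial sub-range; "<=" keeps a single left-over value
    // (== upperBound) in its own sub-range instead of overflowing the previous one
    if (divisions * TotalLinearDetectSize + lowerBound <= upperBound) {
        ladders.insert(
            std::make_pair(lowerBound + TotalLinearDetectSize * divisions, 0));
    }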
    
    ns.ResetToHead();
    size_t temp;
    while (ns.HasNext()) {
        temp = ns.GetNext();
        if (temp < lowerBound || temp > upperBound) {
            continue;
        }
        auto er = ladders.equal_range(temp);
        er.first != er.second ?
            ++(er.first->second) : ++((--er.first)->second);
    }

    auto it = ladders.cbegin();
    auto itEnd = ladders.cend();
    auto itNext = ladders.cbegin();
    ++itNext;
    size_t totalMissing = 0;
    for (; itNext != itEnd; ++it, ++itNext) {
        if (it->second < (itNext->first - it->first)) {
            divides.push_back(
                std::make_tuple(it->first, itNext->first - 1));
        }
        totalMissing += itNext->first - it->first - it->second;
    }
    auto itR = ladders.rbegin();
    if (itR->second < (upperBound - itR->first + 1)) {
        divides.push_back(
            std::make_tuple(itR->first, upperBound));
    }
    totalMissing += upperBound - itR->first + 1 - itR->second;
}

// Approach 3 - entry point
NumberStream::MissingNumberCollection DetectMissingNumber_Div(const NumberStream & ns,
    const size_t MemSizeInInt)
{
    const size_t Missing = ns.GetMissing();
    const size_t UpperBound = ns.GetUpperBound();
    const size_t TotalLinearDetectSize = MemSizeInInt << 5; // int has 32 bits
    NumberStream::MissingNumberCollection results;
    if (!Missing) {
        return results;
    }

    using SearchQueue = std::queue<std::tuple<size_t, size_t>>;
    SearchQueue sq;
    // lowerBound and upperBound inclusive [lowerBound, upperBound]
    sq.push(std::make_tuple(1, UpperBound));
    size_t lowerBound, upperBound;
    size_t tryCount = 0;
    while (!sq.empty()) {
        ++tryCount;
        std::tie(lowerBound, upperBound) = sq.front();
        if ((upperBound - lowerBound) <= TotalLinearDetectSize) {
            // find the missing number
            FindMisingNumber(ns, sq.front(), MemSizeInInt, results);
        }
        else {
            // keep divide into MemSizeInInt
            std::vector<std::tuple<size_t, size_t>> divides;
            DivideRange(ns, MemSizeInInt, lowerBound, upperBound, divides);
            for (auto it = divides.cbegin(); it != divides.end(); ++it) {
                std::tie(lowerBound, upperBound) = *it;
                sq.push(std::make_tuple(lowerBound, upperBound));
            }
        }
        sq.pop();
    }

    return results;
}

// ********************************************************************************
// Test
// ********************************************************************************
#include <cassert>
void Test_DetectMissingNumbers()
{
    NumberGenerator ns(1000000, 100);
    NumberGenerator::MissingNumberCollection results;

    results = DetectMissingNumber(ns, 10000);
    assert(results == ns.GetMissingCollection());

    results = DetectMissingNumber(ns, 100000);
    assert(results == ns.GetMissingCollection());

    results = DetectMissingNumber(ns, 1000000);
    assert(results == ns.GetMissingCollection());
}

void Test_DetectMissingNumbers_R()
{
    NumberGenerator ns(1000000, 100);
    NumberGenerator::MissingNumberCollection results;

    results = DetectMissingNumber_R(ns, 10000);
    assert(results == ns.GetMissingCollection());

    results = DetectMissingNumber_R(ns, 100000);
    assert(results == ns.GetMissingCollection());

    results = DetectMissingNumber_R(ns, 1000000);
    assert(results == ns.GetMissingCollection());
}

void Test_DetectMissingNumbers_Div()
{
    NumberGenerator ns(1000000, 100);
    NumberGenerator::MissingNumberCollection results;

    results = DetectMissingNumber_Div(ns, 10000);
    assert(results == ns.GetMissingCollection());

    results = DetectMissingNumber_Div(ns, 100000);
    assert(results == ns.GetMissingCollection());

    results = DetectMissingNumber_Div(ns, 1000000);
    assert(results == ns.GetMissingCollection());
}