Wednesday 27 April 2016

Merge X-way Sorted List

1. Problem Description
This is a Google interview question from careercup. Here is the original thread:
'
A list L is too big to fit in memory. L is partially sorted. Partially sorted in a specific way: x-sorted L[i] < L[i+x]. Any element is at most x indices out of position. 

You can look at the condition in a different way too..
L[i] >= L[i-x]

Sort the list L.
- jss_777 March 30, 2016 in United States '.

2. Data Structure and Algorithm
Because the input array satisfies the condition "L[i] < L[i+x]", L is literally X interleaved sorted lists (X ways):
    - Arr1: L[0] < L[X] < L[2X] < ......
    - Arr2: L[1] < L[X+1] < L[2X+1] < L[3X+1] < ......
    - Arr3: L[2] < L[X+2] < L[2X+2] < L[3X+2] < ......
      ......
    - ArrX: L[X-1] < L[2X-1] < L[3X-1] < L[4X-1] < ......
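
To make the decomposition above concrete, here is a minimal sketch that checks the condition and copies one way out of L. IsXSorted and ExtractWay are hypothetical helper names, not part of the implementation in section 3, and the sketch assumes L fits in a std::vector.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: true when L[i] < L[i+x] holds for every valid i,
// i.e. when each of the x strided ways is strictly increasing.
template <typename T>
bool IsXSorted(const std::vector<T> &L, std::size_t x)
{
    assert(x > 0);
    for (std::size_t i = 0; i + x < L.size(); ++i) {
        if (!(L[i] < L[i + x])) {
            return false;
        }
    }
    return true;
}

// Hypothetical helper: copies way r (0-based, so r = 0 is Arr1) out of L,
// that is L[r], L[r+x], L[r+2x], ...
template <typename T>
std::vector<T> ExtractWay(const std::vector<T> &L, std::size_t x, std::size_t r)
{
    assert(x > 0 && r < x);
    std::vector<T> way;
    for (std::size_t i = r; i < L.size(); i += x) {
        way.push_back(L[i]);
    }
    return way;
}
```

The merge itself never needs to copy the ways out like this; stepping through L with stride X is enough.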

Merging sorted arrays into a single one is familiar to us, as it is the last step of merge sort. In merge sort, two sorted arrays of one window size are merged into a sorted array of twice that size. The sorting and merging repeats on ever larger windows until the full array is merged.
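
The two-array merge step can be sketched as below. MergeTwoSorted is a hypothetical name used only for illustration; it assumes both inputs are already sorted and returns a new array rather than merging in place.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: merges two sorted arrays into one sorted array
// with the classic two-index walk.
template <typename T>
std::vector<T> MergeTwoSorted(const std::vector<T> &a, const std::vector<T> &b)
{
    // Precondition (assumption of this sketch): both inputs are sorted.
    assert(std::is_sorted(a.begin(), a.end()));
    assert(std::is_sorted(b.begin(), b.end()));

    std::vector<T> merged;
    merged.reserve(a.size() + b.size());
    std::size_t i = 0;
    std::size_t j = 0;
    while (i < a.size() && j < b.size()) {
        // Take from b only when it is strictly smaller, so ties keep a first.
        if (b[j] < a[i]) {
            merged.push_back(b[j++]);
        } else {
            merged.push_back(a[i++]);
        }
    }
    // At most one of the two inputs still has elements left.
    while (i < a.size()) {
        merged.push_back(a[i++]);
    }
    while (j < b.size()) {
        merged.push_back(b[j++]);
    }
    return merged;
}
```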

Coming back to this problem with the X-way sorted array, there are two ways of tackling it.

The 1st approach:
It uses exactly the same idea as merge sort.
    - Merge Arr1 and Arr2 into a bigger new array Arr_A
    - Merge Arr3 and Arr4 into a bigger new array Arr_B
    ......
Now we have X/2 sorted arrays if X is an even number, or X/2 + 1 sorted arrays (rounding X/2 down) if X is an odd number. Repeat the above process (merge Arr_A and Arr_B, merge Arr_C and Arr_D, ......) until everything is merged into a single array.
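
A minimal sketch of the 1st approach follows. MergePairwise is a hypothetical name, std::merge stands in for the hand-written merge step, and the sketch assumes the ways have already been copied into separate vectors, so it is not suitable as-is for a list that does not fit in memory.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>

// Hypothetical helper: merges the ways two at a time, round after round,
// until a single sorted array remains.
template <typename T>
std::vector<T> MergePairwise(std::vector<std::vector<T>> ways)
{
    if (ways.empty()) {
        return std::vector<T>();
    }
    while (ways.size() > 1) {
        std::vector<std::vector<T>> next;
        next.reserve((ways.size() + 1) / 2);
        for (std::size_t i = 0; i + 1 < ways.size(); i += 2) {
            // Precondition (assumption of this sketch): each way is sorted.
            assert(std::is_sorted(ways[i].begin(), ways[i].end()));
            assert(std::is_sorted(ways[i + 1].begin(), ways[i + 1].end()));
            std::vector<T> merged;
            merged.reserve(ways[i].size() + ways[i + 1].size());
            std::merge(ways[i].begin(), ways[i].end(),
                       ways[i + 1].begin(), ways[i + 1].end(),
                       std::back_inserter(merged));
            next.push_back(std::move(merged));
        }
        // With an odd count, the last way has no partner and is carried over.
        if (ways.size() % 2 == 1) {
            next.push_back(std::move(ways.back()));
        }
        ways = std::move(next);
    }
    return std::move(ways.front());
}
```

Each round halves the number of arrays, so there are about log2(X) rounds and every round touches every element once.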

The 2nd approach:
It merges all X ways directly into a single array in one pass. Keep the heads (the smallest remaining value of each of Arr1, Arr2, ..., ArrX) and repeatedly take the least of them.
    - Initial heads: (L[0], L[1], ......, L[X-1])
    - Find the least value, say it is L[i], which is the least value of L (globally)
    - New heads array: (L[0], L[1], ..., L[i-1], L[i+X], L[i+1], ..., L[X-1])
    - Find the least value in the new heads array, say it is L[j], which is the 2nd least value of L
       The heads array shrinks by one if L[j] is the last element of Arr_J
    - Now replace L[j] with L[j+X] to form the new heads
    - Repeat the above two steps until the whole array is exhausted

Implementation-wise, a min-heap (a priority queue in C++) holds the heads. Each iteration pops the top, L[i] (the least value among the heads), and pushes its successor, L[i+X], into the queue. The heap never holds more than X elements, so merging N elements costs O(N log X). The same technique is used in another post, Find the Nth Multiple Given a Set.
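
To see the heads being replaced step by step, here is a small sketch that records the order in which indices of L leave the heap. PopOrder is a hypothetical name; the sketch uses std::pair with std::greater to turn std::priority_queue (a max-heap by default) into a min-heap, which is an alternative to the custom comparators used in section 3.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical helper: returns the indices of L in the order the X-way
// merge would emit them.
template <typename T>
std::vector<std::size_t> PopOrder(const std::vector<T> &L, std::size_t x)
{
    assert(x > 0);
    using Head = std::pair<T, std::size_t>;  // (value, index in L)
    // std::greater makes the smallest (value, index) pair the top.
    std::priority_queue<Head, std::vector<Head>, std::greater<Head>> heads;

    // Initial heads: L[0], L[1], ..., L[x-1].
    for (std::size_t i = 0; i < x && i < L.size(); ++i) {
        heads.push(Head(L[i], i));
    }

    std::vector<std::size_t> order;
    order.reserve(L.size());
    while (!heads.empty()) {
        const Head least = heads.top();  // copy, because pop() destroys the top
        heads.pop();
        order.push_back(least.second);
        // Replace the popped head with the next element of the same way.
        if (least.second + x < L.size()) {
            heads.push(Head(L[least.second + x], least.second + x));
        }
    }
    return order;
}
```

For L = { 1, 3, 2, 4 } and X = 2 the heads start as (L[0], L[1]); L[0] is popped and replaced by L[2], then L[2] is popped with nothing left to replace it, then L[1] is popped and replaced by L[3], so the order is 0, 2, 1, 3.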

3. C++ Implementation
This implementation follows the 2nd approach and supports two data structures for L: a linked list and an array (std::vector).
Because the interview question states that L is too big to fit in memory, an in-place sort/merge would be the better approach. This implementation is not in-place: it writes the merged output into a separate container.

// ********************************************************************************
// Implementation
// ********************************************************************************
#include "../DetectCircleInLinkedList/DetectCircleInLinkedList/LinkedListElement.h"
#include "../DetectCircleInLinkedList/DetectCircleInLinkedList/LinkedList.h"

#include <queue>
#include <vector>

template <typename T>
struct LLE_Comparator
{
    // Const-correct so the heap can call it on const elements.
    bool operator() (const LinkedListElement<T>* lhs, const LinkedListElement<T>* rhs) const {
        return lhs->data > rhs->data;
    }
};

template<typename T>
LinkedListElement<T>* AdvanceByX(LinkedListElement<T>* start, unsigned int X)
{
    unsigned int step = 0;
    while (start && step < X) {
        start = start->next;
        ++step;
    }

    return start;
}

template <typename T>
void MergeXwaySortedLL(LinkedList<T> &input, const unsigned int X, LinkedList<T> &result)
{
    using MinHeap = std::priority_queue<LinkedListElement<T>*, std::vector<LinkedListElement<T>*>, LLE_Comparator<T>>;

    LinkedListElement<T> **head = input.Release();
    if (!head || !(*head)) {
        return;
    }

    unsigned int count = 0;
    LinkedListElement<T> *temp = *head;
    MinHeap topNodesOfXways;
    // get first X nodes
    while (count < X && temp) {
        topNodesOfXways.push(temp);
        temp = temp->next;
        ++count;
    }

    while (!topNodesOfXways.empty()) {
        temp = topNodesOfXways.top();
        result.PushBack(temp->data);
        temp = AdvanceByX(temp, X);
        topNodesOfXways.pop();
        if (temp) {
            topNodesOfXways.push(temp);
        }
    }
}

template <typename T>
struct ValueIndexPair{
    ValueIndexPair(const T& val, size_t idx)
        : value(val), index(idx)
    {}
    ValueIndexPair()
    {}

    T value;
    size_t index;
};

template <typename T>
struct ArrE_Comparator
{
    // Const-correct so the heap can call it on const elements.
    bool operator() (const ValueIndexPair<T> &lhs, const ValueIndexPair<T> &rhs) const {
        return lhs.value > rhs.value;
    }
};

template <typename T>
std::vector<T> MergeXwaySortedArr(const std::vector<T> &arr, const unsigned int X)
{
    using MinHeap = std::priority_queue<ValueIndexPair<T>, std::vector<ValueIndexPair<T>>, ArrE_Comparator<T>>;

    const size_t LEN = arr.size();
    if (X > LEN) {
        return std::vector<T>();
    }

    // push first X into heap
    MinHeap minHeap;
    for (unsigned int count = 0; count < X; ++count) {
        minHeap.push(ValueIndexPair<T>(arr[count], count));
    }

    std::vector<T> result;
    result.reserve(LEN);
    while (!minHeap.empty()) {
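        // Copy the top and pop it first: top() returns a const reference,
        // and pushing before popping could remove the wrong element on ties.
        const ValueIndexPair<T> temp = minHeap.top();
        minHeap.pop();
        result.push_back(temp.value);
        if ((temp.index + X) < LEN) {
            minHeap.push(ValueIndexPair<T>(arr[temp.index + X], temp.index + X));
        }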
    }

    return result;
}


// ********************************************************************************
// Test
// ********************************************************************************
#include <cassert>
#include <array>
void Test_LL()
{
    {
        LinkedList<int> input;
        LinkedList<int> result;
        input.PushBack(1);
        MergeXwaySortedLL(input, 1, result);
        LinkedListElement<int> *head = *(result.Release());
        LinkedListElement<int> *tail = *(result.ReleaseTail());
        assert(head->data == 1);
        assert(tail->data == 1);
    }

    {
        LinkedList<int> input;
        LinkedList<int> result;
        const std::array<int, 16> data = { 0, 4, 8, 12, 1, 5, 9, 13, 2, 6, 10, 14, 3, 7, 11, 15 };
        for (auto iter = data.begin(); iter != data.end(); ++iter) {
            input.PushBack(*iter);
        }

        MergeXwaySortedLL(input, 4, result);
        LinkedListElement<int> *curNode = *(result.Release());
        unsigned int count = 0;
        while (curNode) {
            assert(curNode->data == count);
            curNode = curNode->next;
            ++count;
        }

        assert(count == 16);
    }
}

void Test_Arr()
{
    {
        const std::vector<int> data = { 1, 3, 2, 4 };
        std::vector<int> result = MergeXwaySortedArr(data, 5);
        assert(result.empty() == true);
        result = MergeXwaySortedArr(data, 2);
        assert(result == std::vector<int>({ 1, 2, 3, 4 }));
    }

    {
        const std::vector<int> data = { 0, 4, 8, 12, 1, 5, 9, 13, 2, 6, 10, 14, 3, 7, 11, 15 };
        std::vector<int> result = MergeXwaySortedArr(data, 17);
        assert(result.empty() == true);
        result = MergeXwaySortedArr(data, 4);
        assert(result == std::vector<int>({0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}));
    }
}