Wednesday, 23 July 2014

Dynamic programming - Heavy Numbers (Part II)

In my last blog entry, Dynamic programming - Heavy Numbers (Part I), a solution was presented to count the heavy numbers in a given range via dynamic programming. The key idea is to formulate the sub-problem and then solve it recursively. A few optimization techniques were employed as well, for instance caching intermediate results and using pre-calculated results. Here I would like to discuss a more direct way to tackle this problem. This serves as my last discussion on heavy numbers.

1. Analysis
Let's see what the heavy numbers have in common. Here is the list of all heavy numbers within [10, 99] (the sum of digits is 15, 16, 17 or 18):
    - 69, 78, 79, 87, 88, 89, 96, 97, 98, 99
Then let's look at the digit combinations of these heavy numbers.
    - 69, 96: {6, 9}
    - 78, 87: {7, 8}
    - 79, 97: {7, 9}
    - 88:       {8, 8}
    - 89, 98: {8, 9}
    - 99      : {9, 9}
It seems that we can directly compute how many heavy numbers there are if we can find all the digit combinations that meet the heaviness requirement. For instance, find all the valid combinations, {6, 9}, {7, 8}, {7, 9}, {8, 8}, {8, 9} and {9, 9}; then the number of permutations of each combination can be computed easily, and their total is the answer.

Let's re-arrange the possible combinations and take a closer look at them.
    - 99      : {9, 9}
    - 98, 89: {9, 8}
    - 97, 79: {9, 7}
    - 96, 69: {9, 6}
    - 88      : {8, 8}
    - 87, 78: {8, 7}
Here is what all the heavy numbers share:
    - At least one of the digits should be > 7, namely 8 or 9
    - A number consisting of only digits in [0, 7] is not a heavy number
        * 0~7 are not heavy numbers
        * 66, 77 are not either
        * 666, 777 are not either
        ......
    - Narrow the range to search for the combinations given n digits
        * 1 digit: [8, 9]
        * 2 digits: [87, 99]
        * 3 digits: [877, 999]
        * 4 digits: [8777, 9999]
        ......
    - Continue to narrow: each digit should be <= the digit directly to its left (the leftmost digit has no such constraint).
        * 1 digit: [8, 9]
        * 2 digits: [87, 88], [90, 99]
        * 3 digits: [990, 999], [980, 988], [970, 977], [960, 966], [950, 955], [940, 944], ..., [900, 900]
                        [880, 888], [870, 877], ..., [810, 811], [800, 800]
         ......

2. Formulation of sub-problems
Based on the analysis in the last section, let's take a look at the sub-problem as introduced in Dynamic programming - Heavy Numbers (Part I). Compared with Section 2 of Dynamic programming - Heavy Numbers (Part I), the only difference resides in the "S sides", which reflects the narrowed space we are going to search.
    - S sides:
        * The top digit: only two sides - 9 and 8
        * The rest of the digits: given the value x of the digit to its left (0 <= x <= 9), a digit has (x+1) sides, namely 0, 1, 2, ..., x (not 0~9 any more)

This is implemented in the function GetDigitsCombinatonsGivenSumOfNdigits; please see Section 4. Of course, the sub-problem in this approach is to find the possible combinations in the search space, whereas in my last blog, Dynamic programming - Heavy Numbers (Part I), it was to return how many heavy numbers there are. Another difference is that in this approach there is no way to cache previous results, because the narrowed search space guarantees that each combination is visited exactly once.

3. Calculate the permutations given a combination
Let's take a look at how to calculate how many heavy numbers a particular combination generates. The calculation is a bit different when the combination contains the digit "0", because a number cannot start with the digit "0".

Given a combination in which none of the digits is "0":
    permutations = factorial(nDigits) / (factorial(repeatOfOne) * factorial(repeatOfTwo) * ... * factorial(repeatOfNine))
    where:
        - factorial(n) = n * (n - 1) * (n - 2) * ... * 2 * 1
        - repeatOfX: the occurrence of digit x in [1, 9] within this combination; it can be 0, 1, ..., nDigits
        - factorial(0) is defined as 1 for our computation

Given a combination in which at least one of the digits is "0":
The difference is that the total number of permutations is reduced, since permutations starting with "0" are not valid numbers. Assume that there are repeatOfZero zeros; then there are (nDigits - repeatOfZero) digits > 0 available for the leading position.
Hence the numerator reduces
    from factorial(nDigits) to (nDigits - repeatOfZero) * factorial(nDigits - 1).
Therefore the formula can be rewritten as
    permutations = (nDigits - repeatOfZero) * factorial(nDigits - 1) /
                   (factorial(repeatOfZero) * factorial(repeatOfOne) * ... * factorial(repeatOfNine))
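
To make the two formulas concrete, here is a small standalone sketch (my own illustration, separate from the class in Section 4; the helper names are made up) that evaluates the formula for a digit combination and checks two 3-digit cases:

//********************************************************************************
#include <cassert>
#include <vector>

// factorial(n) as a double, with factorial(0) = factorial(1) = 1
static double Fact(int n) { double f = 1.0; for (; n > 1; --n) f *= n; return f; }

// Number of valid nDigits-digit numbers formed from one digit combination,
// excluding the permutations that would start with the digit "0".
static double CountPermutations(const std::vector<int>& combination)
{
    int repeat[10] = { 0 };
    for (size_t i = 0; i < combination.size(); ++i) {
        ++repeat[combination[i]];
    }
    int nDigits = static_cast<int>(combination.size());
    // Numerator: nDigits! without a zero, otherwise
    // (nDigits - repeatOfZero) * (nDigits - 1)!
    double result = repeat[0] == 0 ?
        Fact(nDigits) : (nDigits - repeat[0]) * Fact(nDigits - 1);
    for (int d = 0; d < 10; ++d) {
        result /= Fact(repeat[d]);
    }
    return result;
}

int main()
{
    std::vector<int> a(3); a[0] = 9; a[1] = 8; a[2] = 8;
    assert(CountPermutations(a) == 3);  // {9, 8, 8}: 3!/2! = 3 -> 988, 898, 889
    std::vector<int> b(3); b[0] = 9; b[1] = 8; b[2] = 0;
    assert(CountPermutations(b) == 4);  // {9, 8, 0}: (3-1)*2! = 4 -> 980, 908, 890, 809
    return 0;
}
//********************************************************************************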

4. Implementation
For the base class implementation, HeavyNumberImpPolynomial, please refer to my last blog, Dynamic programming - Heavy Numbers (Part I). HeavyNumberImpPermutation overrides the virtual function GetNumOfHeavyNumbersAgainstNdigits and shares the rest of the API with its base class.

//********************************************************************************
// HeavyNumberImpPermutation.h
#ifndef HEAVY_NUMBER_IMP_PERMUTATION_H
#define HEAVY_NUMBER_IMP_PERMUTATION_H

#pragma once

#include "HeavyNumberImpPolynomial.h"

#include <deque>

class HeavyNumberImpPermutation : public HeavyNumberImpPolynomial
{
public:
    HeavyNumberImpPermutation();
    ~HeavyNumberImpPermutation();

    size_t GetNumOfHeavyNumbersAgainstNdigits(long nDigits);

private:
    void GetDigitsCombinatonsGivenSumOfNdigits(long nDigits,
                                               long nSides,
                                               long sum,
                                               std::deque<int>& digits,
                                               std::vector<std::vector<int>>& digitsCombination);
    size_t GetNumOfHeavyNumberGivenDigitsCombinations(long nDigits,
                                                      const std::vector<std::vector<int>>& digitsCombination);
    double Factorial(int val);
};

#endif

// HeavyNumberImpPermutation.cxx
#include "stdafx.h"
#include "HeavyNumberImpPermutation.h"


HeavyNumberImpPermutation::HeavyNumberImpPermutation()
{
}


HeavyNumberImpPermutation::~HeavyNumberImpPermutation()
{
}


size_t HeavyNumberImpPermutation::GetNumOfHeavyNumbersAgainstNdigits(long nDigits)
{
    if (nDigits < 0) {
        return 0;
    }

    const size_t PRE_COMPUTED_NUM_OF_HV[9] = { 0, 2, 10, 56, 330, 2001, 12313,
                                               76470, 478302 };
    if (nDigits <= 8) {
        return PRE_COMPUTED_NUM_OF_HV[nDigits];
    }

    const size_t heaviness = nDigits * AVERAGE_VALUE_OF_HEAVY_NUM + 1;
    const size_t nSidesUpperBound = 9;
    const size_t nSidesLowerBound = 7;
    std::deque<int> digits;
    std::vector<std::vector<int>> digitsCombinations(10);
    // The top digit can only be 9 or 8 (see Section 1).
    for (size_t topDigits = nSidesUpperBound; topDigits > nSidesLowerBound; --topDigits) {
        digits.clear();
        digits.push_back(static_cast<int>(topDigits));
        GetDigitsCombinatonsGivenSumOfNdigits(nDigits - 1, topDigits, heaviness - topDigits,
                                              digits, digitsCombinations);
    }

    return GetNumOfHeavyNumberGivenDigitsCombinations(nDigits, digitsCombinations);
}

void HeavyNumberImpPermutation::GetDigitsCombinatonsGivenSumOfNdigits(
    long nDigits,
    long nSides,
    long sum,
    std::deque<int>& digits,
    std::vector<std::vector<int>>& digitsCombination)
{
    // Prune: even with every remaining digit at its maximum the sum is unreachable.
    if (sum > nSides * nDigits) {
        return;
    }

    if (sum <= 0 && nDigits == 0) {
        // Record this combination: one new counter slot per digit value 0-9,
        // then count the occurrence of each digit in the current combination.
        for (size_t index = 0; index < digitsCombination.size(); ++index) {
            digitsCombination[index].push_back(0);
        }
        size_t size = digitsCombination[0].size();
        for (size_t index = 0; index < digits.size(); ++index) {
            ++digitsCombination[digits[index]].at(size - 1);
        }
        return;
    }

    // Each digit may not exceed its left neighbour, so the next digit ranges over [0, nSides].
    for (long val = nSides; val >= 0; --val) {
        digits.push_back(val);
        GetDigitsCombinatonsGivenSumOfNdigits(nDigits - 1, val, sum - val,
                                              digits, digitsCombination);
        digits.pop_back();
    }
}

size_t HeavyNumberImpPermutation::GetNumOfHeavyNumberGivenDigitsCombinations(
    long nDigits,
    const std::vector<std::vector<int>>& digitsCombination)
{
    double factorialNDigits = Factorial(nDigits);
    size_t numOfHN = 0;
    double permutation;
    for (size_t index = 0; index < digitsCombination[0].size(); ++index) {
        // Numerator: nDigits! if the combination has no zero, otherwise
        // (nDigits - repeatOfZero) * (nDigits - 1)! (see Section 3).
        permutation = digitsCombination[0].at(index) == 0 ?
            factorialNDigits : (nDigits - digitsCombination[0].at(index)) * Factorial(nDigits - 1);
        for (size_t digit = 0; digit < 10; ++digit) {
            permutation /= Factorial(digitsCombination[digit].at(index));
        }
        // Round before accumulating to guard against floating-point error.
        numOfHN += static_cast<size_t>(permutation + 0.5);
    }

    return numOfHN;
}

double HeavyNumberImpPermutation::Factorial(int val)
{
    if (val <= 1) {
        return 1.0;
    }

    double factorial = 1.0;
    while (val) {
        factorial *= val;
        --val;
    }

    return factorial;
}

// Test
void Test() {
    size_t numOfHN;
    HeavyNumberImpPermutation hnipe;
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(1);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(2);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(3);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(4);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(5);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(6);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(7);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(8);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(9);
    numOfHN = hnipe.GetNumOfHeavyNumbersAgainstNdigits(10);
}
//********************************************************************************

5. Summary
I did some very simple performance comparison between the two solutions, this blog's and the last blog's, on the function GetNumOfHeavyNumbersAgainstNdigits. Let's call the solution from the last blog Solution 1 and the one from this blog Solution 2. Here are the results.
    Digits    Solution 1 (milliseconds)    Solution 2 (milliseconds)
       9               0.000                        0.001
      10               0.002                        0.002
      11               0.006                        0.003
      12               0.011                        0.005
      13               0.025                        0.009
      14               0.042                        0.011
      15               0.069                        0.017
      16               0.111                        0.025
(Environment: Microsoft Visual Studio Express 2013, Intel Core i3-3227U @1.9G, 6G RAM)

Solution 2 starts to outperform Solution 1 at 11 digits, and as the number of digits increases Solution 2 performs even better relative to Solution 1. I think the real advantage of Solution 2 is the narrowed search space. Even though Solution 1 has a caching facility, its benefit might be balanced out by its overhead.

Both solutions are proven much, much faster than the simplest way - searching the whole range, summing the digits of each number, calculating the average and deciding whether it is a heavy number. That brute force proves very expensive: it takes 25.022 seconds and 311.951 seconds for 9- and 10-digit numbers respectively.
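
For reference, the brute force baseline looks roughly like this (a sketch assuming the same definition of heaviness; not the exact benchmark code):

//********************************************************************************
#include <cstddef>

// Count heavy numbers in [start, end] by testing every number: O(range * digits).
// This is what takes minutes once the numbers reach 9-10 digits.
size_t CountHeavyNumbersBruteForce(size_t start, size_t end)
{
    size_t count = 0;
    for (size_t num = start; num <= end; ++num) {
        size_t sum = 0;
        size_t nDigits = 0;
        size_t temp = num;
        do {
            sum += temp % 10;
            ++nDigits;
            temp /= 10;
        } while (temp);
        // average > 7 is equivalent to sum > 7 * nDigits
        if (sum > 7 * nDigits) {
            ++count;
        }
    }
    return count;
}
//********************************************************************************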

Bibliography:
[1] Donald E. Knuth "The Art of Computer Programming", Volume 1, Fundamental Algorithms, Third Edition
[2] Donald E. Knuth "The Art of Computer Programming", Volume 2, Seminumerical Algorithms, Third Edition

Friday, 18 July 2014

Dynamic programming - Heavy Numbers (Part I)

The other day my friend and I were on the train back home after work, talking about interesting programming exercises to kill the time. He mentioned an interesting one, heavy numbers, which he had done as an interview exercise.

1. Definition
Here is the definition of a heavy number: given an integer not less than zero, if the average of all its digits is more than 7, then it is called a heavy number. For instance,
    - 999: (9+9+9)/3 = 9 > 7, so it is a heavy number
    - 777: (7+7+7)/3 = 7, which is not > 7, so it is not a heavy number
The exercise is to calculate how many heavy numbers there are in a given range.

2. Analysis
I believe this is a similar kind of programming exercise to the one I introduced in my other blog entry, Dynamic programming - Sum K of N dices with S sides. Here is the mapping (a bare sketch of the sub-problem follows the list below),
    - S sides: 10 sides, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;
                   Except the first digit can only have 9 sides, 1, 2, 3, 4, 5, 6, 7, 8, 9
    - N dices: number of digits, 999 - 3 digits/dices, 12345 - 5 digits/dices
    - Sum K: for 1 digit (0 ~ 9), K = 8, 9
                   for 2 digits (10 ~ 99), K = 15, 16, 17, 18
                   for 3 digits (100 ~ 999), K = 22, 23, 24, 25, 26, 27
                   ......
                   for n digits (pow(10, n-1) ~ pow(10, n) - 1), K = 7*n+1, ......, 9*n
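
Written down directly, the sub-problem is a plain recursion over the dices. A bare sketch, before any of the optimizations added below (exponential on its own, so for illustration only):

//********************************************************************************
// Ways to roll nDigits dices with faces 0..nSides so that the total is at
// least sum. The real implementation below memoizes on (nDigits, sum).
size_t CountAtLeast(long nDigits, long nSides, long sum)
{
    if (sum > nSides * nDigits) {
        return 0;  // unreachable even with every face at its maximum
    }
    if (nDigits == 0) {
        return sum <= 0 ? 1 : 0;
    }

    size_t count = 0;
    for (long val = 0; val <= nSides; ++val) {
        count += CountAtLeast(nDigits - 1, nSides, sum - val);
    }
    return count;
}

// 2-digit check: the first digit d runs over 1..9 and the second is a full dice.
// Summing CountAtLeast(1, 9, 15 - d) for d = 1..9 gives 10
// (69, 78, 79, 87, 88, 89, 96, 97, 98, 99).
//********************************************************************************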

3. Algorithm
Pseudo-code,
    - Calculate how many digit counts the given range spans
    - Sum the heavy number counts of the full n-digit ranges in between
    - Add the heavy numbers of the upper partial range
    - Subtract the heavy numbers of the lower partial range

Here is an example: given a range [80, 234567891]
    - digit counts spanned: 2, 3, 4, 5, 6, 7, 8, 9
    - Sum the heavy number counts of the full ranges with 2, 3, 4, 5, 6, 7 and 8 digits
    - Add the heavy numbers of the upper range [100000000, 234567891]
    - Subtract the heavy numbers of the lower range [10, 80), not including 80

The last two steps prove to be the difficult part; they are implemented in GetNumOfHeavyNumbersGivenListOfNum().
The idea is to convert them into the sub-problems of counting heavy-number combinations given i digits. Here is how it works for [100000000, 234567891]:
    - [234567890, 234567891]
    - [234567800, 234567889]
        * Known sum = 2 + 3 + 4 + 5 + 6 + 7 + 8 = (2+8) / 2 * 7 = 35
        * Heavy number requirement:  9*7+1 = 64
        * Sub-problem: K >= 64 - 35 = 29
        * N dices: 2
        * S sides: 1st dice [0, 8], 2nd dice [0, 9]
    - [234567000, 234567799]
        * Known sum = 2 + 3 + 4 + 5 + 6 + 7 = (2+7) / 2 * 6 = 27
        * Heavy number requirement:  9*7+1 = 64
        * Sub-problem: K >= 64 - 27 = 37
        * N dices: 3
        * S sides: 1st dice [0, 7], 2nd and 3rd dice [0, 9]
     ......
    - [100000000, 199999999]
        * Known sum = 1
        * Heavy number requirement:  9*7+1 = 64
        * Sub-problem: K >= 64 - 1 = 63
        * N dices: 8
        * S sides: all [0, 9]

Here comes the code:

//********************************************************************************
// HeavyNumberImpPolynomial.h
#ifndef HEAVY_NUMBER_IMP_POLYNOMIAL_H
#define HEAVY_NUMBER_IMP_POLYNOMIAL_H

#pragma once

#include <unordered_map>
#include <vector>

class HeavyNumberImpPolynomial
{
public:
    HeavyNumberImpPolynomial();
    virtual ~HeavyNumberImpPolynomial();  // virtual: this class is derived from in Part II

    virtual size_t GetNumOfHeavyNumbersAgainstNdigits(long nDigits);
    size_t GetNumOfHeavyNumbersAgainstRange(size_t start, size_t end);

    const static size_t AVERAGE_VALUE_OF_HEAVY_NUM = 7;

private:
    struct HV_KEY {
        long nDigits;
        long SUM;
        bool operator()(const HV_KEY& right) const {
            return right.nDigits == nDigits && right.SUM == SUM;
        }
    };

    struct HV_KEY_HASH {
        size_t operator()(const HV_KEY& key) const {
            return std::hash<long>()(key.nDigits) ^ std::hash<long>()(key.SUM);
        }

        bool operator()(const HV_KEY& k1, const HV_KEY& k2) const {
            return k1.operator()(k2);
        }
    };
    // HV_KEY_HASH serves as both the hasher and the key-equality predicate.
    typedef std::unordered_map<HV_KEY, size_t, HV_KEY_HASH, HV_KEY_HASH> HV_CACHE;

    size_t CountCombinatonsGivenSumOfNdigits(long nDigits, long nSides,
                                             long sum, HV_CACHE& hvCache);
    void GetDigitsOfNumber(size_t num, std::vector<unsigned int>& digits);
    size_t GetNumOfHeavyNumbersGivenListOfNum(const std::vector<unsigned int>& digits);
    size_t GetLeastHeavyNumGivenNdigits(size_t n);
};

#endif

// HeavyNumberImpPolynomial.cpp
#include "stdafx.h"

#include "HeavyNumberImpPolynomial.h"

#include <assert.h>
#include <numeric>

HeavyNumberImpPolynomial::HeavyNumberImpPolynomial()
{
}


HeavyNumberImpPolynomial::~HeavyNumberImpPolynomial()
{
}


size_t HeavyNumberImpPolynomial::GetNumOfHeavyNumbersAgainstRange(size_t start, size_t end)
{
    if (start > end) {
        return 0;
    }
    std::vector<unsigned int> digitsOfStart;
    GetDigitsOfNumber(start, digitsOfStart);
    std::vector<unsigned int> digitsOfEnd;
    GetDigitsOfNumber(end, digitsOfEnd);

    // "end" is inclusive: count it separately if it is heavy itself.
    size_t count = std::accumulate(digitsOfEnd.begin(), digitsOfEnd.end(), 0.0) /
                       digitsOfEnd.size() > AVERAGE_VALUE_OF_HEAVY_NUM ? 1 : 0;

    // Full n-digit ranges strictly between the digit counts of start and end.
    for (size_t digit = digitsOfStart.size(); digit < digitsOfEnd.size(); ++digit) {
        count += GetNumOfHeavyNumbersAgainstNdigits(digit);
    }

    // Heavy numbers below "end" within its own digit count ...
    if (end >= GetLeastHeavyNumGivenNdigits(digitsOfEnd.size())) {
        count += GetNumOfHeavyNumbersGivenListOfNum(digitsOfEnd);
    }
    // ... minus the heavy numbers below "start" within its own digit count.
    if (start >= GetLeastHeavyNumGivenNdigits(digitsOfStart.size())) {
        count -= GetNumOfHeavyNumbersGivenListOfNum(digitsOfStart);
    }

    return count;
}

size_t HeavyNumberImpPolynomial::GetNumOfHeavyNumbersAgainstNdigits(long nDigits)
{
    if (nDigits < 0) {
        return 0;
    }

    const static size_t PRE_COMPUTED_NUM_OF_HV[9] = { 0, 2, 10, 56,
                                                      330, 2001, 12313, 76470, 478302 };

    if (nDigits <= 8) {
        return PRE_COMPUTED_NUM_OF_HV[nDigits];
    }

    size_t numOfHN = 0;
    HV_CACHE hvCache;
    // Heavy means the digit sum is at least nDigits * 7 + 1.
    const size_t heaviness = nDigits * AVERAGE_VALUE_OF_HEAVY_NUM + 1;
    const long nSides = 9;
    // The top digit runs over 1..9; the remaining digits form the sub-problem.
    for (size_t topDigits = 1; topDigits <= nSides; ++topDigits) {
        numOfHN += CountCombinatonsGivenSumOfNdigits(nDigits - 1, nSides,
                                                     heaviness - topDigits, hvCache);
    }

    return numOfHN;
}

size_t HeavyNumberImpPolynomial::CountCombinatonsGivenSumOfNdigits(
    long nDigits,
    long nSides,
    long sum,
    HV_CACHE& hvCache)
{
    // Prune: unreachable even if every remaining digit takes the maximum value.
    if (sum > nSides * nDigits) {
        return 0;
    }

    // All digits are placed and the digit-sum requirement is met.
    if (sum <= 0 && nDigits == 0) {
        return 1;
    }

    const HV_KEY key{ nDigits, sum };
    HV_CACHE::const_iterator cachedIter = hvCache.find(key);
    if (cachedIter != hvCache.end()) {
        return cachedIter->second;
    }

    size_t count = 0;
    // val is signed so that (sum - val) can safely go negative.
    for (long val = 0; val <= nSides; ++val) {
        count += CountCombinatonsGivenSumOfNdigits(nDigits - 1,
                                                   nSides, sum - val, hvCache);
    }

    hvCache.insert(std::make_pair(key, count));

    return count;
}

void HeavyNumberImpPolynomial::GetDigitsOfNumber(
    size_t num,
    std::vector<unsigned int>& digits)
{
    // digits[0] holds the least significant digit after this call.
    digits.clear();
    if (num == 0) {
        digits.push_back(0);
        return;
    }

    while (num) {
        digits.push_back(num % 10);
        num /= 10;
    }
}

size_t HeavyNumberImpPolynomial::GetNumOfHeavyNumbersGivenListOfNum(
    const std::vector<unsigned int>& digits)
{
    if (digits.empty()) {
        return 0;
    }

    long numOfDigits = digits.size();
    assert(digits[numOfDigits - 1] > 0);  // the most significant digit
    long sum = numOfDigits * AVERAGE_VALUE_OF_HEAVY_NUM + 1;
    long start;
    size_t count = 0;
    HV_CACHE hvCache;
    const long nSides = 9;
    // Walk from the most significant digit: fix the prefix, let the current
    // position take any smaller value and count completions of the rest.
    while (numOfDigits) {
        start = numOfDigits == static_cast<long>(digits.size()) ? 1 : 0;  // no leading zero
        for (long digit = start; digit < static_cast<long>(digits[numOfDigits - 1]); ++digit) {
            count += CountCombinatonsGivenSumOfNdigits(numOfDigits - 1,
                                                       nSides, sum - digit, hvCache);
        }
        sum -= digits[numOfDigits - 1];
        --numOfDigits;
    }

    return count;
}

/*
* n = 1, returns 8
* n = 2, returns 69
* n = 3, returns 499
* n = 4, returns 2999
* n = 5, returns 18999
* n = 6, returns 169999
* The computed branch starts to break when n is too big for Win32,
* because size_t overflows; the pre-computed table covers n < 11.
*/
size_t HeavyNumberImpPolynomial::GetLeastHeavyNumGivenNdigits(size_t n)
{
    const size_t PRE_COMPUTED_LEAST_HN[] = { 8,
                                             69,
                                             499,
                                             2999,
                                             18999,
                                             169999,
                                             1499999,
                                             12999999,
                                             109999999,
                                             1079999999 };
    if (n == 0) {
        return -1;  // sentinel: there is no 0-digit number
    }

    if (n < 11) {
        return PRE_COMPUTED_LEAST_HN[n - 1];
    }

    // Build the least heavy number: fill 9s from the least significant digit
    // while more than 9 is still needed, then place the remainder on top.
    size_t sum = n * AVERAGE_VALUE_OF_HEAVY_NUM + 1;
    size_t num = 0;
    size_t tempDigits = 0;
    size_t power = 1;  // 10^tempDigits
    while (sum > 9) {
        num += 9 * power;
        power *= 10;
        ++tempDigits;
        sum -= 9;
    }

    if ((n - tempDigits) == 1) {
        // The remaining sum fits into the single leading digit.
        num += sum * power;
    }
    else {
        // Put (sum - 1) right above the 9s and let the leading digit be 1.
        num += (sum - 1) * power;
        size_t leadingPower = power;
        for (size_t i = tempDigits; i + 1 < n; ++i) {
            leadingPower *= 10;
        }
        num += leadingPower;  // 10^(n - 1)
    }

    return num;
}

// main.cpp
#include "stdafx.h"

#include "HeavyNumberImpPolynomial.h"

int _tmain(int argc, _TCHAR* argv[])
{
    size_t numOfHN;
    HeavyNumberImpPolynomial hnp;

    numOfHN = hnp.GetNumOfHeavyNumbersAgainstRange(10000, 10000);
    numOfHN = hnp.GetNumOfHeavyNumbersAgainstRange(789, 788);
    numOfHN = hnp.GetNumOfHeavyNumbersAgainstRange(789, 56788765);
    numOfHN = hnp.GetNumOfHeavyNumbersAgainstRange(0, 200000000);

    return 0;
}

//********************************************************************************

Optimization techniques:
    - Use pre-calculated values to speed up the process
        * The pre-calculated number of heavy numbers among all n-digit numbers
    - Narrow the range to search among all n-digit numbers
        * The pre-calculated least heavy number among all n-digit numbers
    - Use a hash map to cache the work done so far.

Thursday, 17 July 2014

Data Structure and Algorithm - Sort and Binary Search

The other day I was talking to my friends about the most frequent interview questions we had encountered as interviewers or interviewees. We seemed to agree that 3 items are definitely at the top of the list
    - Big O
    - Sorting and search algorithms
    - Linked list
Questions are often mixed with each other, and coding exercises are often related to these concepts and their implementation. Sometimes a lot of information disguises the true algorithm that the interviewer wants the interviewee to implement. Quite often, underneath, it requires one of the sorting and/or search algorithms to solve it within a required big O bound for time and space.

I can't remember exactly when I last implemented such an algorithm. Probably it was a few years ago, when I was working on low-level C code, primarily in embedded applications. Nowadays I work mostly in C++, and life is easier: simply call the sort/search functions provided by the library. Still, these algorithms serve as basic knowledge of Computer Science, and I still enjoy reviewing and implementing them as homework.

1. Heap Sort
The key to understanding heap sort is to visualize the array as a tree structure. Each iteration pops the largest value of the remaining tree up to the root.
Given an array of size n (index starts from 0):
    - Index 0 is the root, and its children are 1, 2
    - The children of index 1: 3, 4
    - The children of index 2: 5, 6
    - The children of index 3: 7, 8
    - The children of index i: 2i+1, 2i+2
The binary tree is always filled from the left side first.
Once the visualization of the binary tree structure is understood, the rest of the algorithm - popping the largest value into the root position - is easy to follow. For more details please refer to heap sort on wikipedia. Here is my homework implementation.

//********************************************************************************
// HeapSort.h
#ifndef HEAP_SORT_H
#define HEAP_SORT_H

#pragma once

#include <vector>

class HeapSort
{
public:
    HeapSort();
    ~HeapSort();

    void Sort(std::vector<int>& data);
    void Sort(int data[], int size);
    void Sort_Recursive(int data[], int size);

private:
    void Heapify(int data[], int start, int end);
    void Heapify_Recursive(int data[], int start, int end);
    void ShiftRight(int data[], int start, int end);
    void ShiftRight_Recursive(int data[], int start, int end);
};

#endif


// HeapSort.cpp
#include "stdafx.h"
#include "HeapSort.h"


HeapSort::HeapSort()
{
}


HeapSort::~HeapSort()
{
}


void HeapSort::Sort(std::vector<int>& data)
{
    if (!data.empty()) {
        Sort(&data[0], data.size());
    }
}

void HeapSort::Sort(int data[], int size)
{
    if (size < 1) {
        return;
    }

    Heapify(data, 0, size);

    int tempSize = size - 1;
    int tempVal;
    while (tempSize > 0) {
        // the biggest value always stays at index 0
        tempVal = data[tempSize];
        data[tempSize] = data[0];
        data[0] = tempVal;

        ShiftRight(data, 0, tempSize);
        --tempSize;
    }
}

void HeapSort::Sort_Recursive(int data[], int size)
{
    if (size < 1) {
        return;
    }
    Heapify_Recursive(data, 0, size);

    int tempSize = size - 1;
    int tempVal;
    while (tempSize > 0) {
        // the biggest value always stays at index 0
        tempVal = data[tempSize];
        data[tempSize] = data[0];
        data[0] = tempVal;

        ShiftRight_Recursive(data, 0, tempSize);
        --tempSize;
    }
}

void HeapSort::Heapify(int data[], int start, int end)
{
    int midpoint = (start + end) >> 1;
    while (midpoint >= 0) {
        ShiftRight(data, midpoint, end);
        --midpoint;
    }
}

void HeapSort::Heapify_Recursive(int data[], int start, int end)
{
    int midpoint = (start + end) >> 1;
    while (midpoint >= 0) {
        ShiftRight_Recursive(data, midpoint, end);
        --midpoint;
    }
}

void HeapSort::ShiftRight(int data[], int start, int end)
{
    int root = start;
    int leftChild = 2 * root + 1;
    int rightChild;
    int max;
    int tempRoot;
    while (leftChild < end) {
        max = data[root];
        tempRoot = root;
        if (data[leftChild] > data[root]) {
            max = data[leftChild];
            tempRoot = leftChild;
        }
        rightChild = leftChild + 1;
        if (rightChild < end) {
            if (data[rightChild] > max) {
                max = data[rightChild];
                tempRoot = rightChild;
            }
        }

        if (tempRoot != root) {
            // swap the root with the bigger child and keep sifting down
            data[tempRoot] = data[root];
            data[root] = max;
            root = tempRoot;
            leftChild = 2 * root + 1;
        }
        else {
            break;
        }
    }
}

void HeapSort::ShiftRight_Recursive(int data[], int start, int end)
{
    int root = start;
    int leftChild = 2 * root + 1;
    int rightChild;
    int max;
    int tempRoot;
    if (leftChild < end) {
        max = data[root];
        tempRoot = root;
        if (data[leftChild] > data[root]) {
            max = data[leftChild];
            tempRoot = leftChild;
        }
        rightChild = leftChild + 1;
        if (rightChild < end) {
            if (data[rightChild] > max) {
                max = data[rightChild];
                tempRoot = rightChild;
            }
        }

        if (tempRoot != root) {
            data[tempRoot] = data[root];
            data[root] = max;
            root = tempRoot;
            ShiftRight_Recursive(data, tempRoot, end);
        }
    }
}
//********************************************************************************

2. Merge Sort
The idea behind merge sort is not difficult to understand if you know how divide-and-conquer works in general: simply divide a big problem into small sub-problems. The idea of merge sort is
    - Split the big array into two (nearly) even parts: left and right
    - Sort the left and right respectively
    - Then merge the two sub-arrays into one.

This is the top-down view. There is also a bottom-up view. Imagine that you have a window of size 1, 2, 4, 8, ... (until it exceeds the size of the array) to go through the array.
    - Window size of 1: each individual element of the array - no need to sort
    - Window size of 2: compare the two elements and put them into the correct order
    - Window size of 4: merge the two adjacent sorted runs of window size 2 into one sorted run.
    - Keep going until the window size exceeds the size of the array.
For more details please read merge sort on wikipedia. Here is my homework implementation.

//********************************************************************************
// MergeSort.h
#ifndef MERGE_SORT_H
#define MERGE_SORT_H

#pragma once

#include <vector>

class MergeSort
{
public:
    MergeSort();
    ~MergeSort();

    void Sort(std::vector<int>& data);
    void Sort(int data[], int size);
    void Sort_Recursive(int data[], int size);

private:
    void BottomUpMerge(int data[], int start, int mid, int end, int copy[]);
    void TopDownSplit(int data[], int start, int end, int copy[]);
    void CopyArray(int src[], int dst[], int start, int end);
};

#endif

// MergeSort.cpp
#include "stdafx.h"
#include "MergeSort.h"


MergeSort::MergeSort()
{
}


MergeSort::~MergeSort()
{
}

void MergeSort::Sort(std::vector<int>& data)
{
    if (data.empty()) {
        return;
    }

    Sort(&data[0], data.size());
}

void MergeSort::Sort(int data[], int size)
{
    if (size <= 0) {
        return;
    }

    int* tempCopy = new int[size];

    for (int width = 1; width < size; width *= 2) {
        for (int mid, end, start = 0; start < size; start += 2 * width) {
            mid = (start + width) < size ? (start + width) : size;
            end = (start + 2 * width) < size ? (start + width * 2) : size;
            BottomUpMerge(data, start, mid, end, tempCopy);
        }

        // copy back the data
        CopyArray(tempCopy, data, 0, size);
    }

    delete[] tempCopy;
}

void MergeSort::BottomUpMerge(int data[], int start, int mid, int end, int copy[])
{
    int leftStart = start;
    int rightStart = mid;

    for (int index = start; index < end; ++index) {
        // take from the left run while it has the smaller (or equal) head
        if (leftStart < mid && (rightStart >= end || data[leftStart] <= data[rightStart])) {
            copy[index] = data[leftStart];
            ++leftStart;
        }
        else {
            copy[index] = data[rightStart];
            ++rightStart;
        }
    }
}

void MergeSort::Sort_Recursive(int data[], int size)
{
    if (size <= 0) {
        return;
    }

    int* tempCopy = new int[size];
    TopDownSplit(data, 0, size, tempCopy);
    delete[] tempCopy;
}

void MergeSort::TopDownSplit(int data[], int start, int end, int copy[])
{
    if ((end - start) < 2) {
        return;
    }

    int midpoint = (start + end) >> 1;
    TopDownSplit(data, start, midpoint, copy);
    TopDownSplit(data, midpoint, end, copy);
    BottomUpMerge(data, start, midpoint, end, copy);

    CopyArray(copy, data, start, end);
}

void MergeSort::CopyArray(int src[], int dst[], int start, int end)
{
    while (start < end) {
        *(dst + start) = *(src + start);
        ++start;
    }
}
//********************************************************************************

3. Quick Sort
Quick sort utilizes the idea of divide-and-conquer as well, but the sizes of the two parts are not predefined as in merge sort; the sizes of the two sub-problems depend on the selection of the pivotal value. For example, the elements on the left side of the pivotal value are all less than it, and those on the right are all no less than it. The big problem is thus split into two sub-problems whose sizes are decided by the pivotal value. Then recursively resolve the sub-problems until they reduce to single elements.
For more details please refer to quick sort on wikipedia. And here is my homework implementation.

//********************************************************************************
//QuickSort.h
#ifndef QUICK_SORT_H
#define QUICK_SORT_H

#pragma once

#include <vector>

class QuickSort
{
public:
    QuickSort();
    ~QuickSort();

    void Sort(std::vector<int>& data);
    void Sort(int data[], int size);

private:
    void SortInternal(int data[], int start, int end);
    int Partition(int data[], int start, int end);
};

#endif


// QuickSort.cpp
#include "stdafx.h"
#include "QuickSort.h"


QuickSort::QuickSort()
{
}

QuickSort::~QuickSort()
{
}

void QuickSort::Sort(std::vector<int>& data)
{
    if (data.empty()) {
        return;
    }

    Sort(&data[0], data.size());
}

void QuickSort::Sort(int data[], int size)
{
    SortInternal(data, 0, size);
}

void QuickSort::SortInternal(int data[], int start, int end)
{
    if (start < end) {
        int pivotalIndex = Partition(data, start, end);
        SortInternal(data, start, pivotalIndex);
        SortInternal(data, pivotalIndex + 1, end);
    }
}

int QuickSort::Partition(int data[], int start, int end)
{
#if 0
    // take the middle element as the pivotal value
    int midpoint = (start + end) >> 1;
    int pivotalVal = data[midpoint];
    // swap it to the rightmost position
    data[midpoint] = data[end - 1];
    data[end - 1] = pivotalVal;
    int pivotalIndex = end - 1;
#else
    // take the last element as the pivotal value
    int pivotalVal = data[end - 1];
    int pivotalIndex = end - 1;
#endif
    // move every element bigger than the pivot to its right-hand side
    for (int temp, index = pivotalIndex - 1; index >= start; --index) {
        if (data[index] > pivotalVal) {
            temp = data[pivotalIndex - 1];
            data[pivotalIndex] = data[index];
            data[index] = temp;
            data[pivotalIndex - 1] = pivotalVal;
            --pivotalIndex;
        }
    }

    return pivotalIndex;
}

//********************************************************************************

4. Binary Search
Binary search is the most important search algorithm in Computer Science - never forget it. For more details please refer to the binary search algorithm on wikipedia.

//********************************************************************************
// BinarySearch.h
#ifndef BINARY_SERACH_H
#define BINARY_SERACH_H

#pragma once

#include <vector>

class BinarySearch
{
public:
    BinarySearch();
    ~BinarySearch();

    bool Search(const std::vector<int>& data, int key, int& pos);
    bool Search(const int data[], int start, int end, int key, int& pos);

    bool Search_recursive(const std::vector<int>& data, int key, int& pos);
    bool Search_recursive(const int data[], int start, int end, int key, int& pos);
};

#endif

// BinarySearch.cpp
#include "stdafx.h"
#include "BinarySearch.h"


BinarySearch::BinarySearch()
{
}


BinarySearch::~BinarySearch()
{
}


bool BinarySearch::Search(const std::vector<int>& data, int key, int& pos)
{
    if (data.empty()) {
        return false;
    }
    // "end" is inclusive, hence size() - 1 (passing size() could read one past the end).
    return Search(&data[0], 0, static_cast<int>(data.size()) - 1, key, pos);
}

bool BinarySearch::Search(const int data[], int start, int end, int key, int& pos)
{
    pos = -1;
    if (data == nullptr) {
        return false;
    }

    int midpoint;
    int tempStart = start;
    int tempEnd = end;
    while (tempStart <= tempEnd) {
        midpoint = (tempStart + tempEnd) >> 1;
        if (data[midpoint] == key) {
            pos = midpoint;
            return true;
        }
        else if (data[midpoint] > key) {
            tempEnd = midpoint - 1;
        }
        else {
            tempStart = midpoint + 1;
        }
    }

    return false;
}

bool BinarySearch::Search_recursive(const std::vector<int>& data, int key, int& pos)
{
    if (data.empty()) {
        return false;
    }
    // "end" is inclusive, hence size() - 1 (passing size() could read one past the end).
    return Search_recursive(&data[0], 0, static_cast<int>(data.size()) - 1, key, pos);
}

bool BinarySearch::Search_recursive(const int data[], int start, int end, int key, int& pos)
{
    if (start > end || data == nullptr) {
        pos = -1;
        return false;
    }

    int midpoint = (start + end) >> 1;
    if (data[midpoint] == key) {
        pos = midpoint;
        return true;
    }
    else if (data[midpoint] > key) {
        return Search_recursive(data, start, midpoint - 1, key, pos);
    }
    else {
        return Search_recursive(data, midpoint + 1, end, key, pos);
    }
}

//********************************************************************************

5. Summary
These algorithms are all available in the C/C++ standard libraries: qsort, std::sort, std::stable_sort, std::binary_search and the specific search functions of the associative containers. If your application allows, just use these implementations directly. They are all implemented as templates; simply instantiate them with the correct types and use them as you need.
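
For instance, a minimal usage sketch:

//********************************************************************************
#include <algorithm>
#include <vector>

int main()
{
    std::vector<int> data;
    data.push_back(5); data.push_back(2); data.push_back(9);
    data.push_back(1); data.push_back(7);

    // std::sort is typically an introsort: quick sort falling back to
    // heap sort, with insertion sort for small ranges.
    std::sort(data.begin(), data.end());

    // requires sorted input; O(log N) comparisons
    bool found = std::binary_search(data.begin(), data.end(), 7);

    // typically merge-sort based; keeps equal elements in their original order
    std::stable_sort(data.begin(), data.end());

    return found ? 0 : 1;
}
//********************************************************************************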

The characteristics of these 3 sorting algorithms are listed under sorting algorithm on wikipedia. It is worth pointing out that even though quick sort outperforms the other two in most cases, heap sort and merge sort are still used in real-time/time-critical systems to guard against hitting quick sort's worst case, O(N*N). Heap sort is particularly useful in the embedded world, where resources (mainly cache and memory) are limited, because it has an O(1) space requirement. Merge sort comes into play when the relative order of equal elements needs to be kept after sorting. In the remaining cases quick sort should do the work.

Bibliography:
[1] http://en.wikipedia.org/wiki/Sorting_algorithm
[2] http://en.wikipedia.org/wiki/Heapsort
[3] http://en.wikipedia.org/wiki/Merge_sort
[4] http://en.wikipedia.org/wiki/Quicksort
[5] http://en.wikipedia.org/wiki/Binary_search_algorithm
[6] http://crackprogramming.blogspot.co.uk/2012/11/heap-sort-c-implementation.html

Tuesday, 8 July 2014

C++ Performance - Array vs. std::vector

1. Problem
Continuing the investigation into how data structures impact performance, another interesting point has been discovered: access via array/pointer can be much faster than access via std::vector. I believe this point is compiler-dependent; the performance issue reported here is on Microsoft MSVC 2005.

Please check the two assembly listings generated by the two implementations. The huge difference can easily be found in three very simple lines of code,

//********************************************************************************
// m_Var is in std::vector type
// hidden is a pointer type and an array is passed to it.
    // if (m_Var[*(jclist+iwork[i]) - 1].m_Hidden) {
    if (hidden[*(jclist+iwork[i]) - 1]) {
        goto NEXT_VARIABLE;
    }
//********************************************************************************

Assembly 1 is generated for the pointer/array access and Assembly 2 for the std::vector access. The reason the std::vector code is so bloated is that the MSVC compiler enables dynamic sanity checking (bounds checking) on std::vector.

// Assembly 1:  Pointer/Array Access
//********************************************************************************
    // if (m_Var[*(jclist+iwork[i]) - 1].m_Hidden) {
    if (hidden[*(jclist+iwork[i]) - 1]) {
0040306F  mov         esi,dword ptr [esp+14h]
00403073  mov         ecx,dword ptr [esi+ecx*4]
00403076  mov         esi,dword ptr [esp+1Ch]
0040307A  cmp         byte ptr [ecx+esi-1],0
0040307F  jne         NEXT_VARIABLE (403050h)
        goto NEXT_VARIABLE;
    }
//********************************************************************************

// Assembly 2: std::vector Access
//********************************************************************************
    // if (hidden[*(jclist+iwork[i]) - 1]) {
    if (m_Var[*(jclist+iwork[i]) - 1].m_Hidden) {
00403119  mov         edx,dword ptr [esp+28h]
0040311D  mov         ebx,dword ptr [edx+eax*4]
00403120  mov         eax,dword ptr [esp+24h]
00403124  mov         esi,dword ptr [eax+2Ch]
00403127  mov         eax,dword ptr [esi+0Ch]
0040312A  mov         edx,dword ptr [esi+10h]
0040312D  mov         ebp,eax
0040312F  add         edx,eax
00403131  sub         ebx,1
00403134  cmp         ebp,edx
00403136  jbe         NEXT_VARIABLE+52h (403142h)
00403138  call        dword ptr [__imp___invalid_parameter_noinfo (405118h)]
0040313E  mov         ecx,dword ptr [esp+3Ch]
00403142  mov         eax,dword ptr [esi+0Ch]
00403145  mov         edx,dword ptr [esi+10h]
00403148  add         ebx,ebp
0040314A  add         edx,eax
0040314C  cmp         ebx,edx
0040314E  ja          NEXT_VARIABLE+64h (403154h)
00403150  cmp         ebx,eax
00403152  jae         NEXT_VARIABLE+6Eh (40315Eh)
00403154  call        dword ptr [__imp___invalid_parameter_noinfo (405118h)]
0040315A  mov         ecx,dword ptr [esp+3Ch]
0040315E  mov         eax,dword ptr [esi+10h]
00403161  add         eax,dword ptr [esi+0Ch]
00403164  cmp         ebx,eax
00403166  jb          NEXT_VARIABLE+82h (403172h)
00403168  call        dword ptr [__imp___invalid_parameter_noinfo (405118h)]
0040316E  mov         ecx,dword ptr [esp+3Ch]
00403172  mov         eax,dword ptr [esi+8]
00403175  cmp         eax,ebx
00403177  ja          NEXT_VARIABLE+8Bh (40317Bh)
00403179  sub         ebx,eax
0040317B  mov         edx,dword ptr [esi+4]
0040317E  mov         eax,dword ptr [edx+ebx*4]
00403181  cmp         byte ptr [eax+11h],0
00403185  jne         NEXT_VARIABLE (4030F0h)
        goto NEXT_VARIABLE;
    }
//********************************************************************************

2. Fixes
This dynamic sanity checking can be disabled by setting _SECURE_SCL and _HAS_ITERATOR_DEBUGGING to 0. These options have defaulted to 0 in release mode since Visual Studio 2008. But unfortunately in my case I have to use an older version of MSVC, because the performance regression happens between releases, and some of those releases were built many years ago with MSVC 2005.
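
For a quick experiment the two macros can simply be defined before any standard header is included; a sketch:

//********************************************************************************
// Must come before any STL header - ideally in the project's preprocessor
// settings or at the top of stdafx.h. All binaries that exchange STL objects
// must use the same settings, otherwise their object layouts will not match.
#define _SECURE_SCL 0
#define _HAS_ITERATOR_DEBUGGING 0

#include <vector>
//********************************************************************************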

As the Boost C++ library documentation suggests, the performance of the STL is implementation-dependent. For Microsoft MSVC, disable the dynamic sanity checking as suggested above, or choose STLport, which outperforms the Microsoft MSVC implementation.

3. Summary
For hot-spot code areas, using arrays and pointers (refraining from feature-laden C++, STL or algorithms) is not a bad choice. In my case Fortran obviously outperforms the C++ implementation, because any overhead can be costly, especially in hot-spot code areas.

In order to gain better performance and good maintainability at the same time, I personally believe that using C to implement the algorithms and then using C++ to wrap them is not a bad choice.
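
As a sketch of that idea (illustrative only; the names are made up): keep the hot loop in a plain C-style function over raw arrays, and let a thin C++ class own the memory and expose a safe interface.

//********************************************************************************
#include <cstddef>
#include <vector>

// Hot-spot kernel: plain pointers only, no STL inside the inner loop.
static void ScaleKernel(double* data, size_t size, double factor)
{
    for (size_t i = 0; i < size; ++i) {
        data[i] *= factor;
    }
}

// Thin C++ wrapper: owns the storage and keeps the interface safe.
class Scaler
{
public:
    explicit Scaler(const std::vector<double>& data) : m_data(data) {}

    void Scale(double factor)
    {
        if (!m_data.empty()) {
            ScaleKernel(&m_data[0], m_data.size(), factor);
        }
    }

    const std::vector<double>& Data() const { return m_data; }

private:
    std::vector<double> m_data;
};
//********************************************************************************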

Bibliography:
[1] http://msdn.microsoft.com/en-us/library/hh697468.aspx
[2] http://cowboyprogramming.com/2007/02/22/what-is-_invalid_parameter_noinfo-and-how-do-i-get-rid-of-it/
[3] http://www.boost.org/doc/libs/1_50_0/libs/geometry/doc/html/geometry/compilation.html
[4] http://sourceforge.net/projects/stlport/

Tuesday, 1 July 2014

C++ Performance - Data Structure and Memory Fragment

Computer science is all about data structures and algorithms. Given a certain algorithm, a good or bad data structure means a lot to the performance of that algorithm. Here I would like to present a real example I experienced recently that shows how a data structure impacts memory fragmentation and hence the performance of an algorithm.

As I discussed the performance issue in my other blog entry, Performance - Running out of memory (heap), this article can be a supplement, showing a real example where a good data structure reduces memory fragmentation and improves performance.

1. Scenario Background
I am working on a scientific/mathematical modelling application. Assume that you have a system that consists of half a million to a million variables and equations. In order to boost the speed (and/or accuracy), various top-level algorithms are employed to check the sanity of the system and decompose this large system into sub-systems. These algorithms are usually the hot-spots of the program, for structural analysis and for initialization/re-initialization when the system encounters a discontinuity. Therefore optimizing these algorithms is vital for the performance of the system as a whole.

No surprise that the system can be represented as a sparse matrix (equations as rows and variables as columns), whose mathematical name is the Jacobian matrix. For such a huge system, easily reaching hundreds of millions of elements, crafting a good data structure means a great deal to the performance of the algorithms. A good data structure design will save process memory, reduce/remove memory fragmentation and hence boost performance. Here I would like to say: the implementation does matter.

2. Problems
Initially this algorithm was implemented in Fortran, which always worked well on thousands of tests with consistent performance. Then about 5 years ago the company decided to replace Fortran with a C/C++ implementation. A bit of extra responsibility was introduced into this C/C++ implementation, and some optimization of the algorithm was made as well.

However the problem popped up that for some test cases, release by release, this algorithm presented dramatic performance degradation, overall taking ~5x longer than usual. Initially I suspected this was caused by the nature of the algorithm, as the original developer had spotted the performance issue while developing/testing the C/C++ implementation. At that time he mentioned the defect of this implementation but did not investigate further; he simply blamed the algorithm. (Even worse, I followed the indication he made. This is usually what developers do under pressure with limited resources and time.)

However, when the same data is fed into the Fortran implementation, it gives consistent and much faster performance. After ruling out one guess after another, I came to believe that it was the C++ implementation causing the worsening performance. After verifying that it is a correct but slower implementation, further experiments also proved that it is the data structure of the C++ implementation that causes this performance issue.

3. Data Structure 
Surprisingly the system still uses Fortran 77. This means that all the memory has to be allocated in advance, before the variables come into use. (I can't imagine how reluctant the industry is to adopt new techniques, even when the business is so far behind the technology. Again, it proves how programmers should do their job: try to make things right in the first place, otherwise you may not get many chances to improve them.) Of course in Fortran 77 there are no feature-laden containers as in C++, so the data structure is very simple - big arrays:
    - A huge array to contain all the Jacobian elements (size of hundreds of millions, up to a few billion)
    - A big array to contain the start and end indices of the Jacobian elements (size of hundreds of thousands, up to a million)
    - Big arrays for the properties of each equation/variable (size of hundreds of thousands, up to a million)

Data structure used in C/C++ implementation
The C++ data structure is quite intuitive. Simply chop the Jacobian matrix into rows and columns and save the dependent variables/equations (row/column) in a std::vector.

// ********************************************************************************
struct JacobianRowOrColumn {
    std::vector<int> dependent; // dependent variables/equations
    bool assigned; //
    // ......
};

typedef JacobianRowOrColumn JacobianRow; // equation has dependent variables
typedef JacobianRowOrColumn JacobinColumn; // variable has dependent equations
// ********************************************************************************

This data structure is very straightforward and simple, and it would not be a bad design if the scale of the data and the complexity of this system were not considered. The case I am investigating has ~130K variables and ~130K equations (the number of variables = the number of equations), and roughly ~25 million Jacobian elements. The test shows,
    - The C++ version uses ~4x as much memory as Fortran
    - The C++ version runs ~10x slower than the Fortran version

It is no surprise that more memory is used in C++, because std::vector is used: more memory is allocated/committed than is actually needed. The performance is the real surprise. I am convinced that the worsening performance is caused by memory fragmentation, because a plain C++ implementation was also developed and it has performance similar to Fortran, roughly ~25% slower. (From my personal experience this is the language difference between C/C++ and Fortran.)

I had to dig a bit deeper into what actually causes the performance degradation, and it has to be memory fragmentation. In the Fortran and plain C++ implementations the whole Jacobian matrix is allocated in one big array, and pointers into that array are passed around for computation. In the C++ implementation, however, the data structure means that ~260K small std::vectors are created and committed on the stack and heap (the instances plus their data). With that overhead, the higher memory consumption is inevitable, and fragmentation slowing the performance is the only explanation. Besides, as I explained in Section 1, these top-level algorithms are usually hot-spots in the program; the data allocation/releasing is done many times when solving the system/equations.

Fix of this data structure
As in the plain C++ implementation, the idea is to reduce the frequency of small memory allocations/releases while keeping a data structure with good readability and maintainability.
    - Keep one big array for the Jacobian matrix to reduce memory fragmentation
    - Use a pointer into the big array to record the reference to the dependent variables/equations for a better design.

The new data structure is modified as, 
// ********************************************************************************
struct JacobianRowOrColumn {
    int* dependent; // dependent variables/equations
    size_t count_of_dependent; // number of dependent variables/equations
    bool assigned; //
    // ......
};

typedef JacobianRowOrColumn JacobianRow; // equation has dependent variables
typedef JacobianRowOrColumn JacobinColumn; // variable has dependent equations
// ********************************************************************************

This solution is quite straightforward: replace std::vector with a pointer plus an extra data member - the count of dependent variables/equations. But the pointer is not assigned with the new operator or malloc. Following the Fortran implementation, one big array is allocated to contain the Jacobian matrix, and the pointers in JacobianRow and JacobianColumn are assigned the big array plus an offset. With this data structure all the small memory allocations made by std::vector are removed; hence the memory fragmentation is removed, and at the same time the memory usage goes down because the overhead of std::vector is gone. Better memory usage and faster memory access.
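
A sketch of how the pointers are wired up (illustrative only; the sizes and helper names are made up):

//********************************************************************************
#include <cstddef>
#include <vector>

struct JacobianRowOrColumn {
    int* dependent;             // slice inside the shared big array
    size_t count_of_dependent;  // number of dependent variables/equations
    bool assigned;
};

// One allocation for all dependency lists; each row then gets a slice of it.
void BuildRows(const std::vector<size_t>& rowSizes,
               std::vector<int>& bigArray,  // allocated once, owned by the caller
               std::vector<JacobianRowOrColumn>& rows)
{
    size_t total = 0;
    for (size_t i = 0; i < rowSizes.size(); ++i) {
        total += rowSizes[i];
    }
    bigArray.resize(total);  // the single big allocation - no per-row news

    rows.resize(rowSizes.size());
    size_t offset = 0;
    for (size_t i = 0; i < rowSizes.size(); ++i) {
        // "the big array plus the offset"
        rows[i].dependent = total ? &bigArray[0] + offset : 0;
        rows[i].count_of_dependent = rowSizes[i];
        rows[i].assigned = false;
        offset += rowSizes[i];
    }
}
//********************************************************************************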

The result shows that the plain C++ implementation is only moderately slower (~25%) than Fortran, and the peak and committed memory usage is <10% more than the Fortran implementation. In my experience these numbers are very good, and they come down to the language difference.

4. Summary
Tuning performance can be fun and tough work. Having the right tools and methodology is the key. The right tools help developers quickly find the hot-spots, for instance the most expensive path. The right methodology helps you always navigate in the correct direction: always focus on the hot-spots in descending order of cost, and find a benchmark if working on a performance regression.

The first thing to focus on is the algorithm rather than the implementation. It is a rare case that performance degradation is caused by the implementation; most of the cases I have worked on so far are related to algorithms. For instance, taking an O(N^2) algorithm to O(log(N)), or O(log(N)) to constant time, such as using a hash table to replace std::map.

Obviously this is a very good example that shows how the implementation matters for performance. The key is to understand your business domain and gain deep knowledge of your application. Be clear about the data size, the system complexity and the data flow. A big-data application and/or a high-performance real-time application definitely has extra design and implementation challenges. Understand the requirements before starting design and implementation.

Bibliography:
[1] Donald E. Knuth "The Art of Computer Programming", Volume 1, Fundamental Algorithms, Third Edition
[2] Donald E. Knuth "The Art of Computer Programming", Volume 2, Seminumerical Algorithms, Third Edition