2013年8月2日 星期五

Easy Minutely Candle-Stick Data By MapReduce under CDH4.2.0 / CM4.5.0 --(2) Streaming


Environment:


OS: CentOS 6.2
Hadoop: CDH4.2.0 install via CM4.5.0
Hosts:
     Hostname  processes          ip
     ========  =================  ================
     master01  Namenode(CDH4)     192.168.122.11
     smaster   SecondaryNamenode  192.168.122.10
               JobTracker
     knode020  Datanode           192.168.122.20
               TaskTracker
     knode021  Datanode           192.168.122.21
               TaskTracker

MapReduce Version: 0.20.2


TWSE(Stock and warrant ticks)
  |
  +-> socket client(with libhdfs.so)
       |
       +-> HDFS(Stored by date)
             |
             +->MapReduce(candle-map, candle-reduce)
                  |
                  +-> HDFS(minutely candle stick data)

Map & Reduce Classes

candle-map.cc

#include <iostream>
#include <string>
#include <sstream>


const char kKVDelim = '\t';
const char kSecKDelim = ',';
const char kLineDelimitor = '\n';
class Mapper {
   public:
   Mapper(std::istream & input_stream){
       std::string timestamp;
       std::string symbol;
       std::string SEQN;
       std::string HiLoMark;
       std::string MatchTime;
       std::string MatchPrice;
       std::string MatchVolumn;
       std::string line;
       std::string token;
       while(getline(input_stream, line, kLineDelimitor))
       {
         int cnt = 0;
         std::istringstream issline(line);
         symbol=SEQN=MatchTime=HiLoMark=MatchPrice=MatchVolumn=timestamp="";
         while(getline(issline, token, kKVDelim) && cnt <= 6)
         {
           switch(cnt)
           {
           case 0:// timestamp
               timestamp = token;
               break;
           case 1:// symbol
               symbol = token;
               break;
           case 2:// SEQN
               SEQN = token;
               break;
           case 3:// MatchTime
               MatchTime = token;
               break;
           case 4:// HiLoMark
               HiLoMark = token;
               break;
           case 5:// MatchPrice
               MatchPrice = token;
               break;
           case 6:// MatchVolumn
               MatchVolumn = token;
               break;
           default: // order book
               break;
           }
           cnt++;
         }
         if(MatchPrice != "-")/*only match record*/
         {
           std::cout << symbol << kSecKDelim
              << MatchTime.substr(0,5) << kKVDelim
              << MatchTime.substr(6,2) << kKVDelim
              << SEQN << kKVDelim
              << timestamp << kKVDelim
              << HiLoMark << kKVDelim
              << MatchPrice << kKVDelim
              << MatchVolumn << std::endl;
         }
       }
   }
};
int main(int argc, char** argv) {
  Mapper mapper(std::cin);
  return 0;
}


candle-reduce.cc

#include <iostream>
#include <stdexcept>
#include <string>
#include <sstream>


const char kSecKDelim = ',';
const char kKVDelim = '\t';
const char kLineDelimitor = '\n';

class Reducer {
   public :
   Reducer(std::istream & input_stream){

       std::string line;
       std::string token;
       int high = 0;
       int open = 0;
       int low = 0;
       int close = 0;
       int vol = 0;
       int open_seqn = 0;
       int close_seqn = 0;
       int seqn = 0;
       int last_error = 0;
       std::string last_symbol="";
       std::string last_MatchHHMM;
       while(getline(input_stream, line, kLineDelimitor))
       {
         int cnt = 0;
         std::string symbol;
         std::string MatchHHMM;
         int val=0;
         last_error = 0;
         std::istringstream issline(line);
         while(getline(issline, token, kKVDelim) && cnt < 8)
         {
           try {
             switch(cnt)
             {
             case 0: //symbol,MatchHHMM
             {
               size_t delim_pos = token.find(kSecKDelim);
               if (delim_pos != std::string::npos) {
                 symbol = token.substr(0, delim_pos);
                 MatchHHMM = token.substr(delim_pos + 1);
               }
             }

               break;
             case 2: //SEQN
               seqn = std::stoi(token); // you need compile stoi with '-std=c++0x' option
               break;
             case 5:// MatchPrice
               val = std::stoi(token);
               if(open_seqn == 0 || open_seqn  > seqn)
               {
                 open = val;
                 open_seqn = seqn;
               }
               if(high == 0 || val > high)
                 high = val;
               if(low == 0 || val < low)
                 low = val;
               if(close_seqn == 0 || close_seqn < seqn)
               {
                 close = val;
                 close_seqn = seqn;
               }
               break;
             case 6:// MatchValumn
               val = std::stoi(token); /*total volumn*/
               if(val > vol)
                 vol=val;
               break;
             default:
               break;
             }
           }
           catch (const std::invalid_argument& ia) {
             std::cerr << "Invalid argument: " << ia.what() << '\n';
             last_error = 1;
             break;
           }
           catch (const std::out_of_range& oor) {
             std::cerr << "Out of Range error: " << oor.what() << '\n';
             last_error = 1;
             break;
           }

           if(  cnt==0 && (last_symbol != symbol || last_MatchHHMM != MatchHHMM))
           {
             if(last_symbol != "")
             {
               std::cout << last_symbol << kKVDelim
                  << last_MatchHHMM << kKVDelim
                  << open << kKVDelim
                  << high << kKVDelim
                  << low << kKVDelim
                  << close << kKVDelim
                  << vol << std::endl;
             }
             last_MatchHHMM = MatchHHMM;
             last_symbol = symbol;
             high = 0;
             open = 0;
             low = 0;
             close = 0;
             vol = 0;
             open_seqn = 0;
             close_seqn = 0;
             seqn = 0;
           }
           cnt++;
         }

       }
       if(last_error ==0)
       {

             std::cout << last_symbol << kKVDelim
                << last_MatchHHMM << kKVDelim
                << open << kKVDelim
                << high << kKVDelim
                << low << kKVDelim
                << close << kKVDelim
                << vol << std::endl;
       }
     }
   };
int main(int argc, char** argv) {
  Reducer reducer(std::cin);
  return 0;
}

Compile it:


[kccs@master01 candle-streaming]$ cat compile.sh
#!/bin/bash
g++ -o candle-map candle-map.cc
g++ -std=c++0x -o candle-reduce candle-reduce.cc

Run it:


[kccs@master01 candle-streaming]$ cat run2.sh
#!/bin/bash
WHO=`whoami`
inputPath=/kccs/$1/
outputPath=/user/$WHO/candle/$1
hdfs dfs -test -e $inputPath
if [ ! $? == "0" ]; then
echo "no tick data exist !!!"
exit 1
fi

WHO=`whoami`

hdfs dfs -test -e $outputPath
if [ $? == "0" ]; then
hdfs dfs -rm -r $outputPath
fi
echo "candle stick start...."
## you could test(debug) by local command
## cat [data source file] | ./candle-map | ./candle-reduce
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-*.jar -file ./candle-map -mapper "./candle-map" -file ./candle-reduce -reducer ./candle-reduce -input $inputPath -output $outputPath


Result:


[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130730/part-00000
:
:
:
9958    12:24   1275    1280    1275    1280    543
9958    12:26   1280    1285    1280    1285    570
9958    12:27   1285    1285    1285    1285    575
9958    12:28   1285    1285    1285    1285    577
9958    12:30   1285    1285    1280    1280    580
9958    12:31   1280    1280    1280    1280    582
9958    12:32   1280    1280    1280    1280    587
9958    12:33   1280    1280    1280    1280    592
9958    12:34   1280    1280    1280    1280    612
9958    12:38   1275    1275    1275    1275    614
9958    12:43   1275    1275    1275    1275    621
9958    12:44   1275    1275    1275    1275    631
9958    12:45   1280    1280    1280    1280    632
9958    12:47   1275    1275    1275    1275    634
9958    12:51   1275    1275    1275    1275    644
9958    12:53   1275    1275    1275    1275    645
9958    12:58   1275    1275    1275    1275    647
9958    13:00   1275    1275    1275    1275    649
9958    13:01   1275    1275    1275    1275    652
9958    13:06   1280    1280    1280    1280    653
9958    13:10   1280    1280    1280    1280    673
9958    13:11   1280    1280    1280    1280    675
9958    13:12   1280    1280    1280    1280    680
9958    13:18   1275    1275    1275    1275    710
9958    13:19   1275    1275    1275    1275    717
9958    13:22   1275    1275    1275    1275    718
9958    13:23   1280    1280    1280    1280    723
9958    13:24   1280    1280    1280    1280    724
9958    13:30   1275    1275    1275    1275    737

Compare:

run trip time(sec)
+-----------------------------+
|run # | Streaming | Java     |
|------+-----------+----------|
| 1    |  28       |  24      |
| 2    |  24       |  24      |
| 3    |  23       |  22      |
| 4    |  25       |  26      |
| 5    |  31       |  25      |
|------+-----------+----------|
|AVG   |  26.2     |  24.2    | 
+-----------------------------+

沒有留言:

張貼留言

文章分類