2013年8月7日 星期三

Easy Minutely Candle-Stick Data By MapReduce under CDH4.2.0 / CM4.5.0 --(3) Pipes

Environment:



OS: CentOS 6.2
Hadoop: CDH4.2.0 install via CM4.5.0
Hosts:
     Hostname  processes          ip
     ========  =================  ================
     master01  Namenode(CDH4)     192.168.122.11
     smaster   SecondaryNamenode  192.168.122.10
               JobTracker
     knode020  Datanode           192.168.122.20
               TaskTracker
     knode021  Datanode           192.168.122.21
               TaskTracker

MapReduce Version: 0.20.2


TWSE(Stock and warrant ticks)
  |
  +-> socket client(with libhdfs.so)
       |
       +-> HDFS(Stored by date)
             |
             +->MapReduce(candle-pipes.cc)
                  |
                  +-> HDFS(minutely candle stick data)

Map & Reduce Classes


candle-pipes.cc

#include <iostream>
#include <string>  // for std::stoi
#include <sstream> // for std::istringstream
#include <stdexcept> // for exception object
#include <vector>  // for vector<>


#include <hadoop/Pipes.hh>
#include <hadoop/TemplateFactory.hh>
#include <hadoop/StringUtils.hh>
const char kKVDelim = '\t';
const char kSecKDelim = ',';
const char kLineDelimitor = '\n';

const char* sKVDelim = "\t";
const char* sSecKDelim = ",";
const char* sLineDelimitor = "\n";

class CandlePipesMapper : public HadoopPipes::Mapper{
public:
  CandlePipesMapper(HadoopPipes::TaskContext& context){}
  void map(HadoopPipes::MapContext& context){
       std::string timestamp;
       std::string symbol;
       std::string SEQN;
       std::string HiLoMark;
       std::string MatchTime;
       std::string MatchPrice;
       std::string MatchVolumn;
       std::string line = context.getInputValue();

       std::vector< std::string > words = HadoopUtils::splitString( line, sKVDelim );
       if(words.size() > 6 && words[5] != "-") /*MatchPrice != "-"*/
       {
          timestamp = words[0];
          symbol = words[1];
          SEQN = words[2];
          MatchTime = words[3];
          HiLoMark = words[4];
          MatchPrice = words[5];
          MatchVolumn = words[6];

          context.emit(symbol + sKVDelim + MatchTime.substr(0,5),
             MatchTime.substr(6,2) + sKVDelim
                  + SEQN + sKVDelim
                  + timestamp + sKVDelim
                  + HiLoMark + sKVDelim
                  + MatchPrice + sKVDelim
                  + MatchVolumn);

       }

  }
};

class CandlePipesReducer : public HadoopPipes::Reducer{
public:
  CandlePipesReducer(HadoopPipes::ReduceContext& context){}
  void reduce(HadoopPipes::ReduceContext& context){
       std::string line;
       std::string token;
       int high = 0;
       int open = 0;
       int low = 0;
       int close = 0;
       int vol = 0;
       int open_seqn = 0;
       int close_seqn = 0;
       int seqn = 0;
       int b_received = 0;
       std::string last_symbol="";
       std::string last_MatchHHMM;
       while(context.nextValue())
       {
         int val=0;
         std::string values = context.getInputValue();

         std::vector< std::string > words = HadoopUtils::splitString( values, sKVDelim );
         if(words.size() >= 6 ) /*MatchPrice != "-"*/
         {
           try{
             /*match seqn*/
             seqn = std::stoi(words[1]);

             /*match price*/
             val = std::stoi(words[4]);
             if(open_seqn == 0 || open_seqn  > seqn)
             {
               open = val;
               open_seqn = seqn;
             }
             if(high == 0 || val > high)
               high = val;
             if(low == 0 || val < low)
               low = val;
             if(close_seqn == 0 || close_seqn < seqn)
             {
               close = val;
               close_seqn = seqn;
             }

             /*match volumn*/
             val = std::stoi(words[5]); /*total volumn*/
             if(val > vol)
               vol=val;
           }
           catch (const std::invalid_argument& ia) {
             std::cerr << "Invalid argument: " << ia.what() << '\n';
             continue;
           }
           catch (const std::out_of_range& oor) {
             std::cerr << "Out of Range error: " << oor.what() << '\n';
             continue;
           }


           b_received = 1;

         }


     }
     if(b_received)
     {
         context.emit(context.getInputKey(),
                  HadoopUtils::toString(open)  + sKVDelim
                  + HadoopUtils::toString(high) + sKVDelim
                  + HadoopUtils::toString(low) + sKVDelim
                  + HadoopUtils::toString(close) + sKVDelim
                  + HadoopUtils::toString(vol) );
     }

  }
};

int main(int argc , char *argv[])
{
  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<CandlePipesMapper, CandlePipesReducer>());
}


Compile it:


[kccs@master01 candle-pipes]$ cat compile.sh
#!/bin/bash
make
if [ ! -e ~/bin/candle_stick ]; then
mkdir ~/bin/candle_stick
fi

#cp run2.sh ~/bin/candle_stick/.
##cp run-test.sh ~/bin/candle_stick/.
#cp envpre.sh ~/bin/candle_stick/.
#cp candle.jar ~/bin/candle_stick/.

[kccs@master01 candle-pipes]$ cat makefile
CC = g++
MAPREDUCE_INSTALL=/usr/lib/hadoop-0.20-mapreduce
CPPFLAGS = -m64 -I$(MAPREDUCE_INSTALL)/include

candle-pipes: candle-pipes.cc
        $(CC) $(CPPFLAGS) $< -Wall -L$(MAPREDUCE_INSTALL)/lib/native -std=c++0x -lhadooppipes -lhadooputils -lpthread -lssl -lcrypto -g -O2 -o $@
clean:
        rm -f *.o *.a *.s candle-pipes

## you need openssl-devel for libssl.so and libcrypto.so

Run it:


[kccs@master01 candle-pipes]$ cat run2.sh
#!/bin/bash
WHO=`whoami`
inputPath=/kccs/$1/
#inputPath=candle-input
outputPath=/user/$WHO/candle/$1
#outputPath=candle-output
program=candle-pipes

hdfs dfs -test -e $inputPath
if [ ! $? == "0" ]; then
echo "no tick data exist !!!"
exit 1
fi



hdfs dfs -test -e $outputPath
if [ $? == "0" ]; then
hdfs dfs -rm -r $outputPath
fi

hdfs dfs -test -d bin
if [ ! $? == "0" ]; then
hdfs dfs -mkdir bin
fi

hdfs dfs -rm ./candle-pipes bin/$program
hdfs dfs -put ./candle-pipes bin/$program

echo "candle stick start...."
hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input $inputPath -output $outputPath -program bin/candle-pipes


[kccs@master01 candle-pipes]$ ./run2.sh YYYYMMDD
ex.
[kccs@master01 candle-pipes]$ ./run2.sh 20130801

Result:



[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130801/part-00000
:
:
:
9958    10:49   1265    1265    1265    1265    201
9958    10:51   1265    1265    1265    1265    247
9958    11:00   1260    1260    1260    1260    257
9958    11:03   1260    1260    1260    1260    313
9958    11:04   1260    1260    1260    1260    316
9958    11:10   1260    1260    1260    1260    318
9958    11:11   1260    1260    1260    1260    328
9958    11:14   1260    1260    1260    1260    336
9958    11:17   1260    1260    1260    1260    338
9958    11:19   1260    1260    1260    1260    340
9958    11:25   1260    1260    1260    1260    342
9958    11:30   1260    1260    1260    1260    358
9958    11:33   1255    1255    1255    1255    363
9958    11:35   1260    1260    1260    1260    364
9958    11:47   1255    1255    1255    1255    367
9958    11:54   1255    1255    1255    1255    368
9958    12:00   1260    1260    1260    1260    369
9958    12:06   1255    1255    1255    1255    394
9958    12:09   1255    1255    1255    1255    396
9958    12:25   1260    1260    1260    1260    397
9958    12:34   1255    1255    1255    1255    417
9958    12:35   1255    1255    1255    1255    418
9958    12:42   1255    1255    1255    1255    420
9958    12:50   1255    1255    1255    1255    430
9958    13:00   1255    1255    1255    1255    431
9958    13:01   1255    1255    1255    1255    432
9958    13:11   1255    1255    1255    1255    442
9958    13:12   1255    1255    1255    1255    452
9958    13:14   1255    1255    1255    1255    455
9958    13:17   1255    1255    1255    1255    460
9958    13:19   1255    1260    1255    1260    471
9958    13:24   1260    1260    1260    1260    480
9958    13:30   1260    1260    1260    1260    511


Compare:


run input-paths-to-process/trip-time(sec)/cpu-time(ms)
+--------------------------------------------------+
|run #    | Streaming   | Java       |    Pipes    |
|---------+-------------+------------+-------------+
| 1       | 5/28/-      | 5/29/14100 | 5/33/13180  |
| 2       | 5/26/-      | 5/24/16240 | 5/32/13540  |
| 3       | 5/25/-      | 5/27/16480 | 5/33/11540  |
| 4       | 5/26/-      | 5/29/13770 | 5/33/13460  |
| 5       | 5/31/-      | 5/24/16530 | 5/34/12440  |
|---------+-------------+------------+-------------+
|AVG      |   27.2      |   26.6     |   33.0      |
|run-trip |             |            |             |
+--------------------------------------------------+

memory physical/virtual
+----------------------------------------------------------------+
|run # | Streaming |         Java         |          Pipes       |
|------+-----------+----------------------+----------------------+
| 1    |    -      | 954920960/5369929728 | 863948800/5397696512 |
| 2    |    -      | 959614976/5370134528 | 860721152/5397913600 |
| 3    |    -      | 910581760/5689860096 | 878026752/5067251712 |
| 4    |    -      |1032314880/5074067456 | 854880256/5390655488 |
| 5    |    -      | 903376896/5691715584 | 848859136/5701148672 |
+----------------------------------------------------------------+

沒有留言:

張貼留言

文章分類