Environment:
OS: CentOS 6.2
Hadoop: CDH4.2.0 install via CM4.5.0
Hosts:
Hostname processes ip
======== ================= ================
master01 Namenode(CDH4) 192.168.122.11
smaster SecondaryNamenode 192.168.122.10
JobTracker
knode020 Datanode 192.168.122.20
TaskTracker
knode021 Datanode 192.168.122.21
TaskTracker
MapReduce Version: 0.20.2
TWSE(Stock and warrant ticks)
|
+-> socket client(with libhdfs.so)
|
+-> HDFS(Stored by date)
|
+->MapReduce(candle-pipes.cc)
|
+-> HDFS(minutely candle stick data)
Map & Reduce Classes
candle-pipes.cc
#include <iostream>
#include <string> // for std::stoi
#include <sstream> // for std::istringstream
#include <stdexcept> // for exception object
#include <vector> // for vector<>
#include <hadoop/Pipes.hh>
#include <hadoop/TemplateFactory.hh>
#include <hadoop/StringUtils.hh>
// Delimiters used when composing and splitting map/reduce keys and values.
// The char* forms are the ones actually used below (splitString() and the
// string concatenations take C-strings); the char forms are currently unused
// here but kept for callers that need a single character.
const char kKVDelim = '\t';        // key/value and intra-value field separator
const char kSecKDelim = ',';       // secondary-key separator (unused in this file)
const char kLineDelimitor = '\n';  // record separator (unused in this file)
const char* sKVDelim = "\t";
const char* sSecKDelim = ",";
const char* sLineDelimitor = "\n";
/// Mapper for minutely candle-stick aggregation.
///
/// Input value: one tab-separated TWSE tick line:
///   timestamp \t symbol \t SEQN \t MatchTime \t HiLoMark \t MatchPrice \t MatchVolumn
/// Lines with fewer than 7 fields, or a MatchPrice of "-" (no trade), are skipped.
///
/// Emits:
///   key   = "<symbol>\t<HH:MM>"  (minute bucket taken from MatchTime)
///   value = "<SS>\t<SEQN>\t<timestamp>\t<HiLoMark>\t<MatchPrice>\t<MatchVolumn>"
class CandlePipesMapper : public HadoopPipes::Mapper{
public:
  CandlePipesMapper(HadoopPipes::TaskContext& context){}

  void map(HadoopPipes::MapContext& context){
    const std::string line = context.getInputValue();
    const std::vector<std::string> f = HadoopUtils::splitString(line, sKVDelim);
    // Need all seven fields and an actual trade ("-" marks no match price).
    if(f.size() <= 6 || f[5] == "-")
      return;
    // Borrow the fields by const reference instead of copying each string.
    const std::string& timestamp   = f[0];
    const std::string& symbol      = f[1];
    const std::string& seqn        = f[2];
    const std::string& match_time  = f[3];  // assumed "HH:MM:SS" — TODO confirm feed format
    const std::string& hi_lo_mark  = f[4];
    const std::string& match_price = f[5];
    const std::string& match_vol   = f[6];
    // FIX: substr(6,2) throws std::out_of_range when MatchTime has fewer than
    // 6 characters; an uncaught exception here would abort the whole map task.
    // Skip such malformed records instead.
    if(match_time.size() < 6)
      return;
    context.emit(symbol + sKVDelim + match_time.substr(0,5),  // "HH:MM" minute key
                 match_time.substr(6,2) + sKVDelim            // seconds within the minute
                 + seqn + sKVDelim
                 + timestamp + sKVDelim
                 + hi_lo_mark + sKVDelim
                 + match_price + sKVDelim
                 + match_vol);
  }
};
/// Reducer: folds all ticks of one (symbol, minute) key into a candle stick.
///
/// Incoming values (produced by CandlePipesMapper):
///   SS \t SEQN \t timestamp \t HiLoMark \t MatchPrice \t MatchVolumn
/// open/close are the prices of the lowest/highest SEQN seen; high/low track
/// the price extremes. MatchVolumn appears to be a cumulative running total
/// (see the sample output), so the maximum seen is kept — TODO confirm.
///
/// Emits: key \t "open \t high \t low \t close \t volume"
/// Nothing is emitted if no value parsed successfully.
class CandlePipesReducer : public HadoopPipes::Reducer{
public:
  CandlePipesReducer(HadoopPipes::ReduceContext& context){}

  void reduce(HadoopPipes::ReduceContext& context){
    int high = 0;
    int open = 0;
    int low = 0;
    int close = 0;
    int vol = 0;
    int open_seqn = 0;   // SEQN of the tick currently taken as "open" (0 = none yet)
    int close_seqn = 0;  // SEQN of the tick currently taken as "close" (0 = none yet)
    bool received = false;
    while(context.nextValue())
    {
      const std::string values = context.getInputValue();
      const std::vector<std::string> words = HadoopUtils::splitString(values, sKVDelim);
      if(words.size() < 6)  /* need all six mapper-emitted fields */
        continue;
      try{
        /* match sequence number */
        const int seqn = std::stoi(words[1]);
        /* match price — assumes integer price strings (sample data is
           integer); stoi would silently truncate decimals. TODO confirm. */
        const int val = std::stoi(words[4]);
        /* earliest tick (smallest SEQN) supplies the open price */
        if(open_seqn == 0 || open_seqn > seqn)
        {
          open = val;
          open_seqn = seqn;
        }
        if(high == 0 || val > high)
          high = val;
        if(low == 0 || val < low)
          low = val;
        /* latest tick (largest SEQN) supplies the close price */
        if(close_seqn == 0 || close_seqn < seqn)
        {
          close = val;
          close_seqn = seqn;
        }
        /* match volume: keep the largest cumulative total seen */
        const int total = std::stoi(words[5]);
        if(total > vol)
          vol = total;
      }
      catch (const std::invalid_argument& ia) {
        std::cerr << "Invalid argument: " << ia.what() << '\n';
        continue;
      }
      catch (const std::out_of_range& oor) {
        std::cerr << "Out of Range error: " << oor.what() << '\n';
        continue;
      }
      received = true;
    }
    if(received)
    {
      context.emit(context.getInputKey(),
                   HadoopUtils::toString(open) + sKVDelim
                   + HadoopUtils::toString(high) + sKVDelim
                   + HadoopUtils::toString(low) + sKVDelim
                   + HadoopUtils::toString(close) + sKVDelim
                   + HadoopUtils::toString(vol) );
    }
  }
};
int main(int argc , char *argv[])
{
return HadoopPipes::runTask(HadoopPipes::TemplateFactory<CandlePipesMapper, CandlePipesReducer>());
}
Compile it:
[kccs@master01 candle-pipes]$ cat compile.sh
#!/bin/bash
# Build candle-pipes and (optionally) stage helper scripts into ~/bin/candle_stick.
# FIX: abort if the build fails instead of staging a stale/missing binary.
make || exit 1
if [ ! -e ~/bin/candle_stick ]; then
# FIX: -p creates ~/bin too if it is missing, and never errors on a race.
mkdir -p ~/bin/candle_stick
fi
#cp run2.sh ~/bin/candle_stick/.
##cp run-test.sh ~/bin/candle_stick/.
#cp envpre.sh ~/bin/candle_stick/.
#cp candle.jar ~/bin/candle_stick/.
[kccs@master01 candle-pipes]$ cat makefile
# Build the candle-pipes Hadoop Pipes binary against the CDH4 MR1 install.
# you need openssl-devel for libssl.so and libcrypto.so
CC = g++
MAPREDUCE_INSTALL = /usr/lib/hadoop-0.20-mapreduce
CPPFLAGS = -m64 -I$(MAPREDUCE_INSTALL)/include

candle-pipes: candle-pipes.cc
	$(CC) $(CPPFLAGS) $< -Wall -L$(MAPREDUCE_INSTALL)/lib/native -std=c++0x -lhadooppipes -lhadooputils -lpthread -lssl -lcrypto -g -O2 -o $@

# FIX: declare clean phony so a file named "clean" cannot shadow the target.
.PHONY: clean
clean:
	rm -f *.o *.a *.s candle-pipes
Run it:
[kccs@master01 candle-pipes]$ cat run2.sh
#!/bin/bash
# Run the candle-pipes job over one day of tick data stored under /kccs/<date>/.
# Usage: ./run2.sh YYYYMMDD
# FIX: refuse to run without the date argument — otherwise the paths collapse
# to /kccs// and /user/$WHO/candle/ and the wrong data gets processed/removed.
if [ -z "$1" ]; then
echo "usage: $0 YYYYMMDD"
exit 1
fi
WHO=$(whoami)
inputPath=/kccs/$1/
#inputPath=candle-input
outputPath=/user/$WHO/candle/$1
#outputPath=candle-output
program=candle-pipes
# Abort when the day's tick data is missing (test the command directly
# instead of the fragile [ ! $? == "0" ] pattern).
if ! hdfs dfs -test -e "$inputPath"; then
echo "no tick data exist !!!"
exit 1
fi
# A leftover output directory would make the job fail; remove it first.
if hdfs dfs -test -e "$outputPath"; then
hdfs dfs -rm -r "$outputPath"
fi
# Make sure the HDFS bin directory exists, then refresh the staged binary.
if ! hdfs dfs -test -d bin; then
hdfs dfs -mkdir bin
fi
hdfs dfs -rm ./candle-pipes "bin/$program"
hdfs dfs -put ./candle-pipes "bin/$program"
echo "candle stick start...."
hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input "$inputPath" -output "$outputPath" -program bin/candle-pipes
[kccs@master01 candle-pipes]$ ./run2.sh YYYYMMDD
ex.
[kccs@master01 candle-pipes]$ ./run2.sh 20130801
Result:
[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130801/part-00000
:
:
:
9958 10:49 1265 1265 1265 1265 201
9958 10:51 1265 1265 1265 1265 247
9958 11:00 1260 1260 1260 1260 257
9958 11:03 1260 1260 1260 1260 313
9958 11:04 1260 1260 1260 1260 316
9958 11:10 1260 1260 1260 1260 318
9958 11:11 1260 1260 1260 1260 328
9958 11:14 1260 1260 1260 1260 336
9958 11:17 1260 1260 1260 1260 338
9958 11:19 1260 1260 1260 1260 340
9958 11:25 1260 1260 1260 1260 342
9958 11:30 1260 1260 1260 1260 358
9958 11:33 1255 1255 1255 1255 363
9958 11:35 1260 1260 1260 1260 364
9958 11:47 1255 1255 1255 1255 367
9958 11:54 1255 1255 1255 1255 368
9958 12:00 1260 1260 1260 1260 369
9958 12:06 1255 1255 1255 1255 394
9958 12:09 1255 1255 1255 1255 396
9958 12:25 1260 1260 1260 1260 397
9958 12:34 1255 1255 1255 1255 417
9958 12:35 1255 1255 1255 1255 418
9958 12:42 1255 1255 1255 1255 420
9958 12:50 1255 1255 1255 1255 430
9958 13:00 1255 1255 1255 1255 431
9958 13:01 1255 1255 1255 1255 432
9958 13:11 1255 1255 1255 1255 442
9958 13:12 1255 1255 1255 1255 452
9958 13:14 1255 1255 1255 1255 455
9958 13:17 1255 1255 1255 1255 460
9958 13:19 1255 1260 1255 1260 471
9958 13:24 1260 1260 1260 1260 480
9958 13:30 1260 1260 1260 1260 511
Compare:
run input-paths-to-process/trip-time(sec)/cpu-time(ms)
+--------------------------------------------------+
|run # | Streaming | Java | Pipes |
|---------+-------------+------------+-------------+
| 1 | 5/28/- | 5/29/14100 | 5/33/13180 |
| 2 | 5/26/- | 5/24/16240 | 5/32/13540 |
| 3 | 5/25/- | 5/27/16480 | 5/33/11540 |
| 4 | 5/26/- | 5/29/13770 | 5/33/13460 |
| 5 | 5/31/- | 5/24/16530 | 5/34/12440 |
|---------+-------------+------------+-------------+
|AVG | 27.2 | 26.6 | 33.0 |
|run-trip | | | |
+--------------------------------------------------+
memory physical/virtual
+----------------------------------------------------------------+
|run # | Streaming | Java | Pipes |
|------+-----------+----------------------+----------------------+
| 1 | - | 954920960/5369929728 | 863948800/5397696512 |
| 2 | - | 959614976/5370134528 | 860721152/5397913600 |
| 3 | - | 910581760/5689860096 | 878026752/5067251712 |
| 4 | - |1032314880/5074067456 | 854880256/5390655488 |
| 5 | - | 903376896/5691715584 | 848859136/5701148672 |
+----------------------------------------------------------------+