Environment:
OS: CentOS 6.2
Hadoop: CDH 4.2.0, installed via Cloudera Manager (CM) 4.5.0
Hosts:
Hostname   Processes                        IP
========   ==============================   ==============
master01   NameNode (CDH4)                  192.168.122.11
smaster    SecondaryNameNode, JobTracker    192.168.122.10
knode020   DataNode, TaskTracker            192.168.122.20
knode021   DataNode, TaskTracker            192.168.122.21
MapReduce Version: 0.20.2
TWSE (stock and warrant ticks)
  |
  +-> socket client (with libhdfs.so)
        |
        +-> HDFS (stored by date)
              |
              +-> MapReduce (candle-map, candle-reduce)
                    |
                    +-> HDFS (per-minute candlestick data)
Map & Reduce Classes
candle-map.cc
#include <iostream>
#include <string>
#include <sstream>

const char kKVDelim = '\t';
const char kSecKDelim = ',';
const char kLineDelimitor = '\n';

// Hadoop Streaming mapper for per-minute candlesticks.
//
// Reads raw tick lines from `input_stream`. Each line is tab-separated and
// must carry at least these seven leading fields (extra fields are ignored):
//   0 timestamp, 1 symbol, 2 SEQN, 3 MatchTime, 4 HiLoMark,
//   5 MatchPrice, 6 MatchVolumn
// A MatchPrice of "-" marks a line without a match, which is dropped. For
// every match, one record is written to stdout:
//   symbol,HH:MM \t SS \t SEQN \t timestamp \t HiLoMark \t MatchPrice \t MatchVolumn
// where HH:MM is MatchTime[0,5) and SS is MatchTime[6,8).
// NOTE(review): this slicing assumes MatchTime is "HH:MM:SS" — confirm
// against the feed.
// The text before the first tab ("symbol,HH:MM") is what Hadoop Streaming
// uses as the shuffle key.
//
// Blank lines are skipped silently. Truncated lines (fewer than seven fields,
// or a MatchTime too short to slice) are reported on stderr and skipped,
// instead of aborting the task with an uncaught std::out_of_range.
//
// All work is done in the constructor (kept for interface compatibility).
// Written in C++98 because compile.sh builds this file without -std=c++0x.
class Mapper {
public:
  explicit Mapper(std::istream& input_stream) {
    std::string line;
    while (std::getline(input_stream, line, kLineDelimitor)) {
      ProcessLine(line);
    }
  }

private:
  // Positions of the fields in a raw tick line.
  enum {
    kTimestamp = 0,
    kSymbol = 1,
    kSeqn = 2,
    kMatchTime = 3,
    kHiLoMark = 4,
    kMatchPrice = 5,
    kMatchVolumn = 6,
    kFieldCount = 7
  };

  // Splits one raw tick line and writes its mapper record if it is a match.
  static void ProcessLine(const std::string& line) {
    if (line.empty()) {
      return;
    }

    std::istringstream issline(line);
    std::string fields[kFieldCount];
    int cnt = 0;
    while (cnt < kFieldCount && std::getline(issline, fields[cnt], kKVDelim)) {
      ++cnt;
    }
    if (cnt < kFieldCount) {
      std::cerr << "candle-map: skipping truncated record: " << line << '\n';
      return;
    }

    if (fields[kMatchPrice] == "-") {
      return;  // no match price: not a match record
    }

    const std::string& match_time = fields[kMatchTime];
    if (match_time.size() < 6) {
      // substr(6, 2) below would throw std::out_of_range.
      std::cerr << "candle-map: skipping record with bad MatchTime: " << line
                << '\n';
      return;
    }

    // '\n' rather than std::endl: flushing after every record is needlessly
    // slow for a job that writes one line per match.
    std::cout << fields[kSymbol] << kSecKDelim
              << match_time.substr(0, 5) << kKVDelim
              << match_time.substr(6, 2) << kKVDelim
              << fields[kSeqn] << kKVDelim
              << fields[kTimestamp] << kKVDelim
              << fields[kHiLoMark] << kKVDelim
              << fields[kMatchPrice] << kKVDelim
              << fields[kMatchVolumn] << '\n';
  }
};
// Entry point of the map task: raw tick lines on stdin, keyed match records
// on stdout.
int main(int argc, char** argv) {
  // Pure C++-stream bulk I/O (no printf/scanf here), so decouple from C stdio,
  // and untie std::cin so each read does not first flush std::cout.
  // 0 rather than nullptr: this file is compiled without -std=c++0x.
  std::ios_base::sync_with_stdio(false);
  std::cin.tie(0);

  Mapper mapper(std::cin);
  return 0;
}
candle-reduce.cc
#include <iostream>
#include <stdexcept>
#include <string>
#include <sstream>

const char kSecKDelim = ',';
const char kKVDelim = '\t';
const char kLineDelimitor = '\n';

// Hadoop Streaming reducer that folds mapper records into one candle per key.
//
// Input lines, already grouped by key, are expected to be:
//   symbol,HH:MM \t SS \t SEQN \t timestamp \t HiLoMark \t MatchPrice \t MatchVolumn
// For each run of consecutive lines with the same (symbol, HH:MM), one line is
// written to stdout:
//   symbol \t HH:MM \t open \t high \t low \t close \t vol
// Field meanings:
//   - open: the price with the smallest SEQN.
//   - close: the price with the largest SEQN.
//   - high/low: the extreme prices.
//   - vol: the largest MatchVolumn seen.
// NOTE(review): the mapper feed labels MatchVolumn a total, so vol is
// presumably cumulative day volume — confirm.
//
// Each line is parsed completely before it is applied, so a malformed line
// (too few fields, no ',' in the key, non-numeric SEQN/price/volume) is
// reported on stderr and ignored as a whole. It no longer suppresses the
// final candle. Blank lines are skipped silently, and keys without any valid
// trade (including empty input) produce no output.
//
// All work is done in the constructor (kept for interface compatibility).
// Needs -std=c++0x for std::stoi; otherwise C++98 to suit older compilers.
class Reducer {
private:
  // Positions of the fields in a mapper record.
  enum {
    kKey = 0,
    kSeconds = 1,
    kSeqn = 2,
    kTimestamp = 3,
    kHiLoMark = 4,
    kMatchPrice = 5,
    kMatchVolumn = 6,
    kFieldCount = 7
  };

  // One successfully parsed mapper record.
  struct Tick {
    std::string symbol;
    std::string hhmm;
    int seqn;
    int price;
    int volume;

    Tick() : seqn(0), price(0), volume(0) {}
  };

  // Running open/high/low/close/volume state for one (symbol, HH:MM) key.
  struct Candle {
    std::string symbol;
    std::string hhmm;
    bool has_trade;  // explicit flag instead of using 0 as "unset"
    int open;
    int high;
    int low;
    int close;
    int vol;
    int open_seqn;
    int close_seqn;

    Candle()
        : has_trade(false), open(0), high(0), low(0), close(0), vol(0),
          open_seqn(0), close_seqn(0) {}

    // Folds one trade into the candle. On SEQN ties, the first trade seen
    // keeps the open/close.
    void Add(const Tick& tick) {
      if (!has_trade) {
        has_trade = true;
        open = high = low = close = tick.price;
        open_seqn = close_seqn = tick.seqn;
      } else {
        if (tick.seqn < open_seqn) {
          open = tick.price;
          open_seqn = tick.seqn;
        }
        if (tick.seqn > close_seqn) {
          close = tick.price;
          close_seqn = tick.seqn;
        }
        if (tick.price > high) high = tick.price;
        if (tick.price < low) low = tick.price;
      }
      if (tick.volume > vol) vol = tick.volume;
    }
  };

public:
  explicit Reducer(std::istream& input_stream) {
    Candle candle;
    std::string line;
    while (std::getline(input_stream, line, kLineDelimitor)) {
      Tick tick;
      if (!ParseLine(line, tick)) {
        continue;
      }
      if (tick.symbol != candle.symbol || tick.hhmm != candle.hhmm) {
        Emit(candle);  // no-op for the initial, still-empty candle
        candle = Candle();
        candle.symbol = tick.symbol;
        candle.hhmm = tick.hhmm;
      }
      candle.Add(tick);
    }
    Emit(candle);
  }

private:
  // Parses `line` into `tick`. Returns false (reporting non-blank problems on
  // stderr) when the line cannot be used; `tick` is then unspecified.
  static bool ParseLine(const std::string& line, Tick& tick) {
    if (line.empty()) {
      return false;
    }

    std::istringstream issline(line);
    std::string fields[kFieldCount];
    int cnt = 0;
    while (cnt < kFieldCount && std::getline(issline, fields[cnt], kKVDelim)) {
      ++cnt;
    }
    if (cnt < kFieldCount) {
      std::cerr << "candle-reduce: skipping truncated record: " << line << '\n';
      return false;
    }

    const std::string& key = fields[kKey];
    const std::string::size_type delim_pos = key.find(kSecKDelim);
    if (delim_pos == std::string::npos) {
      std::cerr << "candle-reduce: skipping record without symbol,HH:MM key: "
                << line << '\n';
      return false;
    }

    try {
      tick.seqn = std::stoi(fields[kSeqn]);
      tick.price = std::stoi(fields[kMatchPrice]);
      tick.volume = std::stoi(fields[kMatchVolumn]);
    } catch (const std::invalid_argument& ia) {
      std::cerr << "Invalid argument: " << ia.what() << '\n';
      return false;
    } catch (const std::out_of_range& oor) {
      std::cerr << "Out of Range error: " << oor.what() << '\n';
      return false;
    }

    tick.symbol = key.substr(0, delim_pos);
    tick.hhmm = key.substr(delim_pos + 1);
    return true;
  }

  // Writes one finished candle; candles that never received a trade are
  // skipped.
  static void Emit(const Candle& candle) {
    if (!candle.has_trade) {
      return;
    }
    // '\n' rather than std::endl: no need to flush after every candle.
    std::cout << candle.symbol << kKVDelim
              << candle.hhmm << kKVDelim
              << candle.open << kKVDelim
              << candle.high << kKVDelim
              << candle.low << kKVDelim
              << candle.close << kKVDelim
              << candle.vol << '\n';
  }
};
// Entry point of the reduce task: grouped mapper records on stdin, one
// candle line per (symbol, HH:MM) on stdout.
int main(int argc, char** argv) {
  // Pure C++-stream bulk I/O (no printf/scanf here), so decouple from C stdio,
  // and untie std::cin so each read does not first flush std::cout.
  // 0 rather than nullptr keeps this buildable with older -std=c++0x compilers.
  std::ios_base::sync_with_stdio(false);
  std::cin.tie(0);

  Reducer reducer(std::cin);
  return 0;
}
Compile it:
[kccs@master01 candle-streaming]$ cat compile.sh
#!/bin/bash
# Build the Hadoop Streaming mapper and reducer binaries.
#   -O2   : optimize; both programs run over every tick of the day.
#   -Wall : surface warnings (e.g. malformed #include lines).
# candle-reduce additionally needs -std=c++0x for std::stoi.
set -e  # stop at the first failed compile instead of leaving a stale binary
g++ -O2 -Wall -o candle-map candle-map.cc
g++ -O2 -Wall -std=c++0x -o candle-reduce candle-reduce.cc
Run it:
[kccs@master01 candle-streaming]$ cat run2.sh
#!/bin/bash
# Build per-minute candlesticks for one trading day.
# Usage: run2.sh <date>      e.g. run2.sh 20130730
# Reads ticks from /kccs/<date>/ and writes to /user/<user>/candle/<date>.

# Without a date, outputPath would be /user/<user>/candle/ and the cleanup
# below would recursively delete every previous day's results.
if [ -z "$1" ]; then
  echo "usage: $0 <date>" >&2
  exit 1
fi

WHO=$(whoami)
inputPath="/kccs/$1/"
outputPath="/user/$WHO/candle/$1"

if ! hdfs dfs -test -e "$inputPath"; then
  echo "no tick data exist !!!" >&2
  exit 1
fi

# The job fails if its output directory already exists, so remove the
# previous run's output for this date.
if hdfs dfs -test -e "$outputPath"; then
  hdfs dfs -rm -r "$outputPath"
fi

echo "candle stick start...."
## you could test(debug) by local command
## cat [data source file] | ./candle-map | ./candle-reduce
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-*.jar \
  -file ./candle-map -mapper ./candle-map \
  -file ./candle-reduce -reducer ./candle-reduce \
  -input "$inputPath" -output "$outputPath"
Result:
[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130730/part-00000
:
:
:
9958 12:24 1275 1280 1275 1280 543
9958 12:26 1280 1285 1280 1285 570
9958 12:27 1285 1285 1285 1285 575
9958 12:28 1285 1285 1285 1285 577
9958 12:30 1285 1285 1280 1280 580
9958 12:31 1280 1280 1280 1280 582
9958 12:32 1280 1280 1280 1280 587
9958 12:33 1280 1280 1280 1280 592
9958 12:34 1280 1280 1280 1280 612
9958 12:38 1275 1275 1275 1275 614
9958 12:43 1275 1275 1275 1275 621
9958 12:44 1275 1275 1275 1275 631
9958 12:45 1280 1280 1280 1280 632
9958 12:47 1275 1275 1275 1275 634
9958 12:51 1275 1275 1275 1275 644
9958 12:53 1275 1275 1275 1275 645
9958 12:58 1275 1275 1275 1275 647
9958 13:00 1275 1275 1275 1275 649
9958 13:01 1275 1275 1275 1275 652
9958 13:06 1280 1280 1280 1280 653
9958 13:10 1280 1280 1280 1280 673
9958 13:11 1280 1280 1280 1280 675
9958 13:12 1280 1280 1280 1280 680
9958 13:18 1275 1275 1275 1275 710
9958 13:19 1275 1275 1275 1275 717
9958 13:22 1275 1275 1275 1275 718
9958 13:23 1280 1280 1280 1280 723
9958 13:24 1280 1280 1280 1280 724
9958 13:30 1275 1275 1275 1275 737
Compare:
Round-trip time per run (seconds):
+-----------------------------+
|run # | Streaming | Java |
|------+-----------+----------|
| 1 | 28 | 24 |
| 2 | 24 | 24 |
| 3 | 23 | 22 |
| 4 | 25 | 26 |
| 5 | 31 | 25 |
|------+-----------+----------|
|AVG | 26.2 | 24.2 |
+-----------------------------+
沒有留言:
張貼留言