public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
    // Reused Writable instances — standard Hadoop idiom to avoid per-record allocation.
    private Text keytext = new Text();
    private Text valuetext = new Text();

    /**
     * Parses one tab-separated tick record of the form
     *   timestamp \t symbol \t SEQN \t MatchTime \t HiLoMark \t MatchPrice \t MatchVolumn [\t order-book ...]
     * and emits key = "symbol \t HH:MM" (one bucket per minute) and
     * value = "SS \t SEQN \t timestamp \t HiLoMark \t MatchPrice \t MatchVolumn".
     * Records whose MatchPrice is "-" (no trade occurred) are dropped, as are
     * lines with fewer than seven fields.
     */
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String line = value.toString();
        String timestamp = "";
        String symbol = "";
        String SEQN = "";
        String HiLoMark = "";
        String MatchTime = "";
        String MatchPrice = "";
        String MatchVolumn = "";
        StringTokenizer tokenizer = new StringTokenizer(line, "\t\n"); // default delim = " \t\n\r\f"
        int cnt = 0;
        // Consume exactly the first seven fields; any trailing order-book columns are ignored.
        while (tokenizer.hasMoreTokens() && cnt < 7) {
            String token = tokenizer.nextToken();
            switch (cnt) {
                case 0: timestamp  = token; break;
                case 1: symbol     = token; break;
                case 2: SEQN       = token; break;
                case 3: MatchTime  = token; break;
                case 4: HiLoMark   = token; break;
                case 5: MatchPrice = token; break;
                // BUG FIX: the original had two "case 6:" labels (a Java compile error,
                // "duplicate case label"), and the first of them failed to consume the
                // token, so MatchVolumn was never read. A single case now reads it.
                case 6: MatchVolumn = token; break;
                default: break; // unreachable: loop guard keeps cnt < 7
            }
            cnt++;
        }
        // Emit only complete records with an actual trade. "-" marks "no match";
        // equals() replaces the original matches("-") regex call (same result, no regex).
        if (cnt == 7 && !"-".equals(MatchPrice)) {
            // MatchTime assumed to be "HH:MM:SS": chars 0-4 = minute bucket,
            // chars 6-7 = seconds — TODO confirm the feed's time format.
            keytext.set(String.format("%s\t%s", symbol, MatchTime.substring(0, 5)));
            valuetext.set(String.format("%s\t%s\t%s\t%s\t%s\t%s",
                    MatchTime.substring(6, 8), SEQN, timestamp, HiLoMark, MatchPrice, MatchVolumn));
            output.collect(keytext, valuetext);
        }
    }
}
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    // Reused output Writable — safe to share across calls because collect() copies it.
    private Text candletext = new Text();

    /**
     * Aggregates all ticks in one (symbol, minute) bucket into a candlestick:
     * open, high, low, close price and the bucket's final cumulative volume.
     * Expected value layout (produced by Map):
     *   SS \t SEQN \t timestamp \t HiLoMark \t MatchPrice \t MatchVolumn
     *
     * Open  = price of the tick with the smallest SEQN in the bucket.
     * Close = price of the tick with the largest SEQN.
     * Vol   = max MatchVolumn seen (the field is a cumulative daily total,
     *         so the max is the last value of the minute) — TODO confirm.
     */
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // FIX: accumulators are now locals instead of mutable instance fields that
        // had to be manually re-zeroed on every call — equivalent behavior, but
        // no cross-call state and no thread-safety hazard.
        int open = 0, high = 0, low = 0, close = 0, vol = 0;
        int openSeqn = 0, closeSeqn = 0;
        while (values.hasNext()) {
            StringTokenizer valueTokenizer = new StringTokenizer(values.next().toString(), "\t");
            int col = 0;
            int seqn = 0;
            while (valueTokenizer.hasMoreTokens() && col < 6) {
                String field = valueTokenizer.nextToken();
                try {
                    switch (col++) {
                        case 1: // SEQN — orders ticks within the minute
                            seqn = Integer.parseInt(field);
                            break;
                        case 4: // MatchPrice
                            int price = Integer.parseInt(field);
                            // NOTE(review): 0 doubles as the "unset" sentinel, so a
                            // legitimate price or SEQN of 0 would be mishandled —
                            // assumed impossible in this feed.
                            if (openSeqn == 0 || openSeqn > seqn) {
                                open = price;
                                openSeqn = seqn;
                            }
                            if (high == 0 || price > high) {
                                high = price;
                            }
                            if (low == 0 || price < low) {
                                low = price;
                            }
                            if (closeSeqn == 0 || closeSeqn < seqn) {
                                close = price;
                                closeSeqn = seqn;
                            }
                            break;
                        case 5: // MatchVolumn — cumulative total; keep the largest (latest)
                            vol = Math.max(vol, Integer.parseInt(field));
                            break;
                        default: // seconds, timestamp, HiLoMark: not needed for the candle
                            break;
                    }
                } catch (NumberFormatException e) {
                    // Malformed numeric field: report with context and skip the column
                    // instead of failing the whole task. (The original also caught bare
                    // Exception, but nothing else in this block can throw.)
                    System.err.println("Integer.parseInt failed for key " + key + ": " + e);
                }
            }
        }
        candletext.set(String.format("%d\t%d\t%d\t%d\t%d", open, high, low, close, vol));
        output.collect(key, candletext);
    }
}
Compile it:
[kccs@master01 candle]$ cat compile.sh
#!/bin/bash
if [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
source /usr/lib/bigtop-utils/bigtop-detect-javahome
else
source /usr/libexec/bigtop-detect-javahome
fi
export CLASSPATH=/etc/hadoop/conf
for file in `ls /usr/lib/hadoop/client/*.jar`
do
export CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH=$CLASSPATH:/usr/lib/hadoop/hadoop-annotations.jar
export LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server/"
if [ ! -e MyJava ]; then
mkdir MyJava
fi
javac -d MyJava Candle.java
jar -cvf candle.jar -C MyJava .
if [ ! -e ~/bin/candle_stick ]; then
mkdir ~/bin/candle_stick
fi
Run it:
[kccs@master01 candle]$ cat run2.sh
#!/bin/bash
WHO=`whoami`
inputPath=/kccs/$1/
outputPath=/user/$WHO/candle/$1
hdfs dfs -test -e $inputPath
if [ ! $? == "0" ]; then
echo "no tick data exist !!!"
exit 1
fi
hdfs dfs -test -e $outputPath
if [ $? == "0" ]; then
hdfs dfs -rm -r $outputPath
fi
echo "candle stick start...."
hadoop jar /home/kccs/src/ex_mapreduce/candle/candle.jar Candle $inputPath $outputPath
Result:
[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130730/part-00000
:
:
:
9958 12:24 1275 1280 1275 1280 543
9958 12:26 1280 1285 1280 1285 570
9958 12:27 1285 1285 1285 1285 575
9958 12:28 1285 1285 1285 1285 577
9958 12:30 1285 1285 1280 1280 580
9958 12:31 1280 1280 1280 1280 582
9958 12:32 1280 1280 1280 1280 587
9958 12:33 1280 1280 1280 1280 592
9958 12:34 1280 1280 1280 1280 612
9958 12:38 1275 1275 1275 1275 614
9958 12:43 1275 1275 1275 1275 621
9958 12:44 1275 1275 1275 1275 631
9958 12:45 1280 1280 1280 1280 632
9958 12:47 1275 1275 1275 1275 634
9958 12:51 1275 1275 1275 1275 644
9958 12:53 1275 1275 1275 1275 645
9958 12:58 1275 1275 1275 1275 647
9958 13:00 1275 1275 1275 1275 649
9958 13:01 1275 1275 1275 1275 652
9958 13:06 1280 1280 1280 1280 653
9958 13:10 1280 1280 1280 1280 673
9958 13:11 1280 1280 1280 1280 675
9958 13:12 1280 1280 1280 1280 680
9958 13:18 1275 1275 1275 1275 710
9958 13:19 1275 1275 1275 1275 717
9958 13:22 1275 1275 1275 1275 718
9958 13:23 1280 1280 1280 1280 723
9958 13:24 1280 1280 1280 1280 724
9958 13:30 1275 1275 1275 1275 737