2013年7月30日 星期二

Easy Minutely Candle-Stick Data By MapReduce under CDH4.2.0 / CM4.5.0--(1) JAVA

Environment:

OS: CentOS 6.2
Hadoop: CDH4.2.0 install via CM4.5.0
Hosts:
     Hostname  processes          ip
     ========  =================  ================
     master01  Namenode(CDH4)     192.168.122.11
     smaster   SecondaryNamenode  192.168.122.10
               JobTracker
     knode020  Datanode           192.168.122.20
               TaskTracker
     knode021  Datanode           192.168.122.21
               TaskTracker

MapReduce Version: 0.20.2


TWSE(Stock and warrant ticks)
  |
  +-> socket client(with libhdfs.so)
       |
       +-> HDFS(Stored by date)
             |
             +->MapReduce(Candle.java, candle.jar)
                  |
                  +-> HDFS(minutely candle stick data)

Map & Reduce Classes


   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
     private Text keytext = new Text();
     private Text valuetext = new Text();

     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
       int cnt = 0;
       String line = value.toString();
       String timestamp = new String();
       String symbol = new String();
       String SEQN = new String();
       String HiLoMark = new String();
       String MatchTime = new String();
       String MatchPrice = new String();
       String MatchVolumn = new String();
       StringTokenizer tokenizer = new StringTokenizer(line,"\t\n"); // default delim = " \t\n\r\f"
       while (tokenizer.hasMoreTokens() && cnt <7 ) {
         switch(cnt)
         {
         case 0:// timestamp
             timestamp = tokenizer.nextToken();
             break;
         case 1:// symbol
             symbol = tokenizer.nextToken();
             break;
         case 2:// SEQN
             SEQN = tokenizer.nextToken();
             break;
         case 3:// MatchTime
             MatchTime = tokenizer.nextToken();
             break;
         case 4:// HiLoMark
             HiLoMark = tokenizer.nextToken();
             break;
         case 5:// MatchPrice
             MatchPrice = tokenizer.nextToken();
             break;
         case 6:// MatchVolumn
             break;
         case 6:// MatchVolumn
             MatchVolumn = tokenizer.nextToken();
             break;
         default: // order book
             tokenizer.nextToken();
             break;
         }
         cnt++;
       }
       if(!MatchPrice.matches("-"))/*only match record*/
       {
         //keytext.set( String.format("%s\t%s", symbol,MatchTime.substring(0,5)));
         keytext.set( String.format("%s\t%s", symbol,MatchTime.substring(0,5)));
         valuetext.set( String.format("%s\t%s\t%s\t%s\t%s\t%s",MatchTime.substring(6,8),SEQN,timestamp,HiLoMark,MatchPrice,MatchVolumn));
         output.collect(keytext, valuetext);
       }
     }
   }

   public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
     private int high = 0;
     private int open = 0;
     private int low = 0;
     private int close = 0;
     private int vol = 0;
     private int open_seqn = 0;
     private int close_seqn = 0;
     private Text candletext = new Text();
     public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
       high = 0;
       open = 0;
       low = 0;
       close = 0;
       vol = 0;
       open_seqn = 0;
       close_seqn=0;

       while (values.hasNext()) {
         String Value=values.next().toString();
         StringTokenizer ValueTokenizer = new StringTokenizer(Value,"\t"); // default delim = " \t\n\r\f"
         int ii = 0;
         int seqn = 0;
         while (ValueTokenizer.hasMoreTokens() && ii <6 ) {
           int val = 0;
           String column = ValueTokenizer.nextToken();
           try
           {
             switch(ii++)
             {
             case 1: //SEQN
               seqn = Integer.parseInt(column);
               break;
             case 4:// MatchPrice
               val = Integer.parseInt(column);
               if(open_seqn == 0 || open_seqn  > seqn)
               {
                 open = val;
                 open_seqn = seqn;
               }
               if(high == 0 || val > high)
                 high = val;
               if(low == 0 || val < low)
                 low = val;
               if(close_seqn == 0 || close_seqn < seqn)
               {
                 close = val;
                 close_seqn = seqn;
               }
               break;
             case 5:// MatchValumn
               val = Integer.parseInt(column); /*total volumn*/
               if(val > vol)
                 vol=val;
               break;
             default:
               break;
             }
           }
           catch (NumberFormatException e)
           {
             System.err.println(e.toString());
             System.err.println("Integer.parseInt Error");
           }
           catch (Exception e)
           {
             System.err.println("Other Error"+e.getMessage());
           }
         }

       }
       candletext.set( String.format("%d\t%d\t%d\t%d\t%d",open,high,low,close,vol));
       output.collect(key, candletext);
     }
   }
                                                                                                       

Compile it:

[kccs@master01 candle]$ cat compile.sh
#!/bin/bash

if [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
        source /usr/lib/bigtop-utils/bigtop-detect-javahome
else
        source /usr/libexec/bigtop-detect-javahome
fi

export CLASSPATH=/etc/hadoop/conf
for file in `ls /usr/lib/hadoop/client/*.jar`
do
  export CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH=$CLASSPATH:/usr/lib/hadoop/hadoop-annotations.jar
export LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server/"


if [ ! -e MyJava ]; then
mkdir MyJava
fi

javac -d MyJava Candle.java
jar -cvf candle.jar -C MyJava .

if [ ! -e ~/bin/candle_stick ]; then
mkdir ~/bin/candle_stick
fi


Run it:

[kccs@master01 candle]$ cat run2.sh
#!/bin/bash
WHO=`whoami`
inputPath=/kccs/$1/
outputPath=/user/$WHO/candle/$1
hdfs dfs -test -e $inputPath
if [ ! $? == "0" ]; then
echo "no tick data exist !!!"
exit 1
fi

WHO=`whoami`

hdfs dfs -test -e $outputPath
if [ $? == "0" ]; then
hdfs dfs -rm -r $outputPath
fi
echo "candle stick start...."
hadoop jar /home/kccs/src/ex_mapreduce/candle/candle.jar Candle $inputPath $outputPath

Result:

[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130730/part-00000
:
:
:
9958    12:24   1275    1280    1275    1280    543
9958    12:26   1280    1285    1280    1285    570
9958    12:27   1285    1285    1285    1285    575
9958    12:28   1285    1285    1285    1285    577
9958    12:30   1285    1285    1280    1280    580
9958    12:31   1280    1280    1280    1280    582
9958    12:32   1280    1280    1280    1280    587
9958    12:33   1280    1280    1280    1280    592
9958    12:34   1280    1280    1280    1280    612
9958    12:38   1275    1275    1275    1275    614
9958    12:43   1275    1275    1275    1275    621
9958    12:44   1275    1275    1275    1275    631
9958    12:45   1280    1280    1280    1280    632
9958    12:47   1275    1275    1275    1275    634
9958    12:51   1275    1275    1275    1275    644
9958    12:53   1275    1275    1275    1275    645
9958    12:58   1275    1275    1275    1275    647
9958    13:00   1275    1275    1275    1275    649
9958    13:01   1275    1275    1275    1275    652
9958    13:06   1280    1280    1280    1280    653
9958    13:10   1280    1280    1280    1280    673
9958    13:11   1280    1280    1280    1280    675
9958    13:12   1280    1280    1280    1280    680
9958    13:18   1275    1275    1275    1275    710
9958    13:19   1275    1275    1275    1275    717
9958    13:22   1275    1275    1275    1275    718
9958    13:23   1280    1280    1280    1280    723
9958    13:24   1280    1280    1280    1280    724
9958    13:30   1275    1275    1275    1275    737



                                                                                                       

文章分類