2013年11月20日 星期三

shell scripts for file checking at day-changed(solaris)

> su - user
> vi /home/user/meter.sh
#!/bin/bash
mday=`date +%d`
cday=`date +%Y-%m-%d`
npath="/its/cts/dat/logs/mlog/$mday"
nfile="$npath/xync.log"
#fday=`stat --format="%y" $nfile | cut -f1 -d" "`
#ls -E /etc/hosts | awk '{print $6}'| sed 's#-##g'

if [ -e $nfile ]; then
    fday=`ls -E $nfile | awk '{print $6}'` >/dev/null 2>&1
if [ $fday = $cday ]; then
    date "+%Y/%m/%d %H:%M:%S -- ...   --" >> $nfile
else
    date "+%Y/%m/%d %H:%M:%S -- Start --" > $nfile
fi

else
    date "+%Y/%m/%d %H:%M:%S -- Start --" > $nfile
fi

> chmod 755 /home/user/meter.sh
> crontab -e
0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,
29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54
,55,56,57,58,59 * * * * /home/user/meter.sh 2>>/tmp/meter.log 1>>/tmp/mete
r.log

2013年10月29日 星期二

Centos 開機磁區重建

以往可能在一台電腦上安裝多個作業系統(例如CentOS+CentOS , CentOS+Windows)於不同硬碟上, 但開機的設定只能預設在一處, 當開機磁區移走時通常會遇到不能開機,以下Centos解法:
1. 找到安裝光碟
2. 以rescue mode開機, 並把原本Centos硬碟mount在/mnt/sysimage
    chroot /mnt/sysimage
    順便查一下kernel跟initrd的參數(版本)怎麼下, 可以參考原本/boot/grub/grub.conf的設定
    順便簡化它, 例如,如果指定kernel參數中root=指定/磁區, 可以把UUID改成識別符(/dev/sda3)
    如果/boot是獨立磁區, 要注意手動用grub開機的參數(見第4點說明)

3. 確定這顆Centos的硬碟的識別符是什麼(例如/dev/sda),重新安裝grub
    #grub-install /dev/sda
      Installation finished. No error reported.
      This is the contents of the device map /boot/grub/device.map.
      Check if this is correct or not. If any of the lines is incorrect,
      fix it and re-run the script `grub-install'.

      # this device map was generated by anaconda
      (hd0)     /dev/sda
 4. 此時重開機應該仍會找不到對應的開機磁區, 因為原本的grub.conf有問題,
     重開機後,會出現grub>的提示符, 假設原CentOS的磁區分配如下
     /dev/sda1    /boot
     /dev/sda2    swap
     /dev/sda3    /
    手動開機方式如下:
    grub>find //boot/grub/stage1    #尋找開機stage1的位置, 有時會不準, 尤其是/boot是獨立磁區
             (hd0,2)                           #在上面的磁區分配, 其實應該在(hd0,0)
    grub>root (hd0,0)                    #指定/boot分割區的位置
              Filesystem type is ext2fs, partition type 0x83
    grub>setup (hd0)                     #安裝開機程式(注意/boot分割區指定, 不然下次無法自動開機)
接下來如果要手動到把CentOS完全開機,就要載入kernel跟initramfs, 若不要重開機看看reboot
    grub>kernel /vmlinuz-2.6.32-220.el6.x86_64 ro root=/dev/sda3
    grub>initrd /initramfs-2.6.32-220.el6.x86_64.img
如果到這裡就沒反應,表示initrd的img檔不對, 回頭確認再試一次,(注意, 有些是initrd-2.6...)
    grub>boot
如果此時能開機成功, 回頭看一下/boot/grub/grub.conf的設定跟上面的一不一樣, 有錯的就改過來

2013年10月8日 星期二

nc - netcat TCP/UDP network testing tool

Client(10.108.2.2) ---- Server(10.108.1.1)

1. Listen as a TCP Server (IPv4)
    Server> nc -4 -l 1234

2. connect to a TCP Server (as TCP Client, IPv4)
    Client> nc -4 10.108.1.1 1234


3. Listen as a UDP Server (IPv4)
    Server> nc -4 -l -u 12345

4. connect to a UDP Server (as UDP Client, IPv4)
    Client> nc -4 -u 10.108.1.1 12345


<[outbound server] 172.16.43.100:1111>
                    +
                    |
                    +-Client(10.108.2.2) ---- Server(10.108.1.1)

5. Recieve and Listen as a TCP Server (IPv4, tcp to tcp)
    Server> nc -4 -l 1234

6. connect to a TCP Server (as TCP Client, IPv4)
    Client> nc -4 72.16.43.100 1111 | nc -4 10.108.1.1 1234


7. Recieve and Listen as a UDP Server (IPv4, tcp to udp)
    Server> nc -4 -l -u 12345

8. connect to a TCP Server (as TCP Client, IPv4)
    Client> nc -4 72.16.43.100 1111 | nc -4 -u 10.108.1.1 12345

udp to tcp
udp to udp
also fine

reference:http://blog.miniasp.com/post/2008/07/A-magic-command-netcat.aspx
                         http://www.thegeekstuff.com/2012/04/nc-command-examples/


2013年8月7日 星期三

Easy Minutely Candle-Stick Data By MapReduce under CDH4.2.0 / CM4.5.0 --(3) Pipes

Environment:



OS: CentOS 6.2
Hadoop: CDH4.2.0 install via CM4.5.0
Hosts:
     Hostname  processes          ip
     ========  =================  ================
     master01  Namenode(CDH4)     192.168.122.11
     smaster   SecondaryNamenode  192.168.122.10
               JobTracker
     knode020  Datanode           192.168.122.20
               TaskTracker
     knode021  Datanode           192.168.122.21
               TaskTracker

MapReduce Version: 0.20.2


TWSE(Stock and warrant ticks)
  |
  +-> socket client(with libhdfs.so)
       |
       +-> HDFS(Stored by date)
             |
             +->MapReduce(candle-pipes.cc)
                  |
                  +-> HDFS(minutely candle stick data)

Map & Reduce Classes


candle-pipes.cc

#include <iostream>
#include <string>  // for std::stoi
#include <sstream> // for std::istringstream
#include <stdexcept> // for exception object
#include <vector>  // for vector<>


#include <hadoop/Pipes.hh>
#include <hadoop/TemplateFactory.hh>
#include <hadoop/StringUtils.hh>
const char kKVDelim = '\t';
const char kSecKDelim = ',';
const char kLineDelimitor = '\n';

const char* sKVDelim = "\t";
const char* sSecKDelim = ",";
const char* sLineDelimitor = "\n";

class CandlePipesMapper : public HadoopPipes::Mapper{
public:
  CandlePipesMapper(HadoopPipes::TaskContext& context){}
  void map(HadoopPipes::MapContext& context){
       std::string timestamp;
       std::string symbol;
       std::string SEQN;
       std::string HiLoMark;
       std::string MatchTime;
       std::string MatchPrice;
       std::string MatchVolumn;
       std::string line = context.getInputValue();

       std::vector< std::string > words = HadoopUtils::splitString( line, sKVDelim );
       if(words.size() > 6 && words[5] != "-") /*MatchPrice != "-"*/
       {
          timestamp = words[0];
          symbol = words[1];
          SEQN = words[2];
          MatchTime = words[3];
          HiLoMark = words[4];
          MatchPrice = words[5];
          MatchVolumn = words[6];

          context.emit(symbol + sKVDelim + MatchTime.substr(0,5),
             MatchTime.substr(6,2) + sKVDelim
                  + SEQN + sKVDelim
                  + timestamp + sKVDelim
                  + HiLoMark + sKVDelim
                  + MatchPrice + sKVDelim
                  + MatchVolumn);

       }

  }
};

class CandlePipesReducer : public HadoopPipes::Reducer{
public:
  CandlePipesReducer(HadoopPipes::ReduceContext& context){}
  void reduce(HadoopPipes::ReduceContext& context){
       std::string line;
       std::string token;
       int high = 0;
       int open = 0;
       int low = 0;
       int close = 0;
       int vol = 0;
       int open_seqn = 0;
       int close_seqn = 0;
       int seqn = 0;
       int b_received = 0;
       std::string last_symbol="";
       std::string last_MatchHHMM;
       while(context.nextValue())
       {
         int val=0;
         std::string values = context.getInputValue();

         std::vector< std::string > words = HadoopUtils::splitString( values, sKVDelim );
         if(words.size() >= 6 ) /*MatchPrice != "-"*/
         {
           try{
             /*match seqn*/
             seqn = std::stoi(words[1]);

             /*match price*/
             val = std::stoi(words[4]);
             if(open_seqn == 0 || open_seqn  > seqn)
             {
               open = val;
               open_seqn = seqn;
             }
             if(high == 0 || val > high)
               high = val;
             if(low == 0 || val < low)
               low = val;
             if(close_seqn == 0 || close_seqn < seqn)
             {
               close = val;
               close_seqn = seqn;
             }

             /*match volumn*/
             val = std::stoi(words[5]); /*total volumn*/
             if(val > vol)
               vol=val;
           }
           catch (const std::invalid_argument& ia) {
             std::cerr << "Invalid argument: " << ia.what() << '\n';
             continue;
           }
           catch (const std::out_of_range& oor) {
             std::cerr << "Out of Range error: " << oor.what() << '\n';
             continue;
           }


           b_received = 1;

         }


     }
     if(b_received)
     {
         context.emit(context.getInputKey(),
                  HadoopUtils::toString(open)  + sKVDelim
                  + HadoopUtils::toString(high) + sKVDelim
                  + HadoopUtils::toString(low) + sKVDelim
                  + HadoopUtils::toString(close) + sKVDelim
                  + HadoopUtils::toString(vol) );
     }

  }
};

int main(int argc , char *argv[])
{
  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<CandlePipesMapper, CandlePipesReducer>());
}


Compile it:


[kccs@master01 candle-pipes]$ cat compile.sh
#!/bin/bash
make
if [ ! -e ~/bin/candle_stick ]; then
mkdir ~/bin/candle_stick
fi

#cp run2.sh ~/bin/candle_stick/.
##cp run-test.sh ~/bin/candle_stick/.
#cp envpre.sh ~/bin/candle_stick/.
#cp candle.jar ~/bin/candle_stick/.

[kccs@master01 candle-pipes]$ cat makefile
CC = g++
MAPREDUCE_INSTALL=/usr/lib/hadoop-0.20-mapreduce
CPPFLAGS = -m64 -I$(MAPREDUCE_INSTALL)/include

candle-pipes: candle-pipes.cc
        $(CC) $(CPPFLAGS) $< -Wall -L$(MAPREDUCE_INSTALL)/lib/native -std=c++0x -lhadooppipes -lhadooputils -lpthread -lssl -lcrypto -g -O2 -o $@
clean:
        rm -f *.o *.a *.s candle-pipes

## you need openssl-devel for libssl.so and libcrypto.so

Run it:


[kccs@master01 candle-pipes]$ cat run2.sh
#!/bin/bash
WHO=`whoami`
inputPath=/kccs/$1/
#inputPath=candle-input
outputPath=/user/$WHO/candle/$1
#outputPath=candle-output
program=candle-pipes

hdfs dfs -test -e $inputPath
if [ ! $? == "0" ]; then
echo "no tick data exist !!!"
exit 1
fi



hdfs dfs -test -e $outputPath
if [ $? == "0" ]; then
hdfs dfs -rm -r $outputPath
fi

hdfs dfs -test -d bin
if [ ! $? == "0" ]; then
hdfs dfs -mkdir bin
fi

hdfs dfs -rm ./candle-pipes bin/$program
hdfs dfs -put ./candle-pipes bin/$program

echo "candle stick start...."
hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input $inputPath -output $outputPath -program bin/candle-pipes


[kccs@master01 candle-pipes]$ ./run2.sh YYYYMMDD
ex.
[kccs@master01 candle-pipes]$ ./run2.sh 20130801

Result:



[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130801/part-00000
:
:
:
9958    10:49   1265    1265    1265    1265    201
9958    10:51   1265    1265    1265    1265    247
9958    11:00   1260    1260    1260    1260    257
9958    11:03   1260    1260    1260    1260    313
9958    11:04   1260    1260    1260    1260    316
9958    11:10   1260    1260    1260    1260    318
9958    11:11   1260    1260    1260    1260    328
9958    11:14   1260    1260    1260    1260    336
9958    11:17   1260    1260    1260    1260    338
9958    11:19   1260    1260    1260    1260    340
9958    11:25   1260    1260    1260    1260    342
9958    11:30   1260    1260    1260    1260    358
9958    11:33   1255    1255    1255    1255    363
9958    11:35   1260    1260    1260    1260    364
9958    11:47   1255    1255    1255    1255    367
9958    11:54   1255    1255    1255    1255    368
9958    12:00   1260    1260    1260    1260    369
9958    12:06   1255    1255    1255    1255    394
9958    12:09   1255    1255    1255    1255    396
9958    12:25   1260    1260    1260    1260    397
9958    12:34   1255    1255    1255    1255    417
9958    12:35   1255    1255    1255    1255    418
9958    12:42   1255    1255    1255    1255    420
9958    12:50   1255    1255    1255    1255    430
9958    13:00   1255    1255    1255    1255    431
9958    13:01   1255    1255    1255    1255    432
9958    13:11   1255    1255    1255    1255    442
9958    13:12   1255    1255    1255    1255    452
9958    13:14   1255    1255    1255    1255    455
9958    13:17   1255    1255    1255    1255    460
9958    13:19   1255    1260    1255    1260    471
9958    13:24   1260    1260    1260    1260    480
9958    13:30   1260    1260    1260    1260    511


Compare:


run input-paths-to-process/trip-time(sec)/cpu-time(ms)
+--------------------------------------------------+
|run #    | Streaming   | Java       |    Pipes    |
|---------+-------------+------------+-------------+
| 1       | 5/28/-      | 5/29/14100 | 5/33/13180  |
| 2       | 5/26/-      | 5/24/16240 | 5/32/13540  |
| 3       | 5/25/-      | 5/27/16480 | 5/33/11540  |
| 4       | 5/26/-      | 5/29/13770 | 5/33/13460  |
| 5       | 5/31/-      | 5/24/16530 | 5/34/12440  |
|---------+-------------+------------+-------------+
|AVG      |   27.2      |   26.6     |   33.0      |
|run-trip |             |            |             |
+--------------------------------------------------+

memory physical/virtual
+----------------------------------------------------------------+
|run # | Streaming |         Java         |          Pipes       |
|------+-----------+----------------------+----------------------+
| 1    |    -      | 954920960/5369929728 | 863948800/5397696512 |
| 2    |    -      | 959614976/5370134528 | 860721152/5397913600 |
| 3    |    -      | 910581760/5689860096 | 878026752/5067251712 |
| 4    |    -      |1032314880/5074067456 | 854880256/5390655488 |
| 5    |    -      | 903376896/5691715584 | 848859136/5701148672 |
+----------------------------------------------------------------+

2013年8月2日 星期五

Easy Minutely Candle-Stick Data By MapReduce under CDH4.2.0 / CM4.5.0 --(2) Streaming


Environment:


OS: CentOS 6.2
Hadoop: CDH4.2.0 install via CM4.5.0
Hosts:
     Hostname  processes          ip
     ========  =================  ================
     master01  Namenode(CDH4)     192.168.122.11
     smaster   SecondaryNamenode  192.168.122.10
               JobTracker
     knode020  Datanode           192.168.122.20
               TaskTracker
     knode021  Datanode           192.168.122.21
               TaskTracker

MapReduce Version: 0.20.2


TWSE(Stock and warrant ticks)
  |
  +-> socket client(with libhdfs.so)
       |
       +-> HDFS(Stored by date)
             |
             +->MapReduce(candle-map, candle-reduce)
                  |
                  +-> HDFS(minutely candle stick data)

Map & Reduce Classes

candle-map.cc

#include <iostream>
#include <string>
#include <sstream>


const char kKVDelim = '\t';
const char kSecKDelim = ',';
const char kLineDelimitor = '\n';
class Mapper {
   public:
   Mapper(std::istream & input_stream){
       std::string timestamp;
       std::string symbol;
       std::string SEQN;
       std::string HiLoMark;
       std::string MatchTime;
       std::string MatchPrice;
       std::string MatchVolumn;
       std::string line;
       std::string token;
       while(getline(input_stream, line, kLineDelimitor))
       {
         int cnt = 0;
         std::istringstream issline(line);
         symbol=SEQN=MatchTime=HiLoMark=MatchPrice=MatchVolumn=timestamp="";
         while(getline(issline, token, kKVDelim) && cnt <= 6)
         {
           switch(cnt)
           {
           case 0:// timestamp
               timestamp = token;
               break;
           case 1:// symbol
               symbol = token;
               break;
           case 2:// SEQN
               SEQN = token;
               break;
           case 3:// MatchTime
               MatchTime = token;
               break;
           case 4:// HiLoMark
               HiLoMark = token;
               break;
           case 5:// MatchPrice
               MatchPrice = token;
               break;
           case 6:// MatchVolumn
               MatchVolumn = token;
               break;
           default: // order book
               break;
           }
           cnt++;
         }
         if(MatchPrice != "-")/*only match record*/
         {
           std::cout << symbol << kSecKDelim
              << MatchTime.substr(0,5) << kKVDelim
              << MatchTime.substr(6,2) << kKVDelim
              << SEQN << kKVDelim
              << timestamp << kKVDelim
              << HiLoMark << kKVDelim
              << MatchPrice << kKVDelim
              << MatchVolumn << std::endl;
         }
       }
   }
};
int main(int argc, char** argv) {
  Mapper mapper(std::cin);
  return 0;
}


candle-reduce.cc

#include <iostream>
#include <stdexcept>
#include <string>
#include <sstream>


const char kSecKDelim = ',';
const char kKVDelim = '\t';
const char kLineDelimitor = '\n';

class Reducer {
   public :
   Reducer(std::istream & input_stream){

       std::string line;
       std::string token;
       int high = 0;
       int open = 0;
       int low = 0;
       int close = 0;
       int vol = 0;
       int open_seqn = 0;
       int close_seqn = 0;
       int seqn = 0;
       int last_error = 0;
       std::string last_symbol="";
       std::string last_MatchHHMM;
       while(getline(input_stream, line, kLineDelimitor))
       {
         int cnt = 0;
         std::string symbol;
         std::string MatchHHMM;
         int val=0;
         last_error = 0;
         std::istringstream issline(line);
         while(getline(issline, token, kKVDelim) && cnt < 8)
         {
           try {
             switch(cnt)
             {
             case 0: //symbol,MatchHHMM
             {
               size_t delim_pos = token.find(kSecKDelim);
               if (delim_pos != std::string::npos) {
                 symbol = token.substr(0, delim_pos);
                 MatchHHMM = token.substr(delim_pos + 1);
               }
             }

               break;
             case 2: //SEQN
               seqn = std::stoi(token); // you need compile stoi with '-std=c++0x' option
               break;
             case 5:// MatchPrice
               val = std::stoi(token);
               if(open_seqn == 0 || open_seqn  > seqn)
               {
                 open = val;
                 open_seqn = seqn;
               }
               if(high == 0 || val > high)
                 high = val;
               if(low == 0 || val < low)
                 low = val;
               if(close_seqn == 0 || close_seqn < seqn)
               {
                 close = val;
                 close_seqn = seqn;
               }
               break;
             case 6:// MatchValumn
               val = std::stoi(token); /*total volumn*/
               if(val > vol)
                 vol=val;
               break;
             default:
               break;
             }
           }
           catch (const std::invalid_argument& ia) {
             std::cerr << "Invalid argument: " << ia.what() << '\n';
             last_error = 1;
             break;
           }
           catch (const std::out_of_range& oor) {
             std::cerr << "Out of Range error: " << oor.what() << '\n';
             last_error = 1;
             break;
           }

           if(  cnt==0 && (last_symbol != symbol || last_MatchHHMM != MatchHHMM))
           {
             if(last_symbol != "")
             {
               std::cout << last_symbol << kKVDelim
                  << last_MatchHHMM << kKVDelim
                  << open << kKVDelim
                  << high << kKVDelim
                  << low << kKVDelim
                  << close << kKVDelim
                  << vol << std::endl;
             }
             last_MatchHHMM = MatchHHMM;
             last_symbol = symbol;
             high = 0;
             open = 0;
             low = 0;
             close = 0;
             vol = 0;
             open_seqn = 0;
             close_seqn = 0;
             seqn = 0;
           }
           cnt++;
         }

       }
       if(last_error ==0)
       {

             std::cout << last_symbol << kKVDelim
                << last_MatchHHMM << kKVDelim
                << open << kKVDelim
                << high << kKVDelim
                << low << kKVDelim
                << close << kKVDelim
                << vol << std::endl;
       }
     }
   };
int main(int argc, char** argv) {
  Reducer reducer(std::cin);
  return 0;
}

Compile it:


[kccs@master01 candle-streaming]$ cat compile.sh
#!/bin/bash
g++ -o candle-map candle-map.cc
g++ -std=c++0x -o candle-reduce candle-reduce.cc

Run it:


[kccs@master01 candle-streaming]$ cat run2.sh
#!/bin/bash
WHO=`whoami`
inputPath=/kccs/$1/
outputPath=/user/$WHO/candle/$1
hdfs dfs -test -e $inputPath
if [ ! $? == "0" ]; then
echo "no tick data exist !!!"
exit 1
fi

WHO=`whoami`

hdfs dfs -test -e $outputPath
if [ $? == "0" ]; then
hdfs dfs -rm -r $outputPath
fi
echo "candle stick start...."
## you could test(debug) by local command
## cat [data source file] | ./candle-map | ./candle-reduce
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-*.jar -file ./candle-map -mapper "./candle-map" -file ./candle-reduce -reducer ./candle-reduce -input $inputPath -output $outputPath


Result:


[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130730/part-00000
:
:
:
9958    12:24   1275    1280    1275    1280    543
9958    12:26   1280    1285    1280    1285    570
9958    12:27   1285    1285    1285    1285    575
9958    12:28   1285    1285    1285    1285    577
9958    12:30   1285    1285    1280    1280    580
9958    12:31   1280    1280    1280    1280    582
9958    12:32   1280    1280    1280    1280    587
9958    12:33   1280    1280    1280    1280    592
9958    12:34   1280    1280    1280    1280    612
9958    12:38   1275    1275    1275    1275    614
9958    12:43   1275    1275    1275    1275    621
9958    12:44   1275    1275    1275    1275    631
9958    12:45   1280    1280    1280    1280    632
9958    12:47   1275    1275    1275    1275    634
9958    12:51   1275    1275    1275    1275    644
9958    12:53   1275    1275    1275    1275    645
9958    12:58   1275    1275    1275    1275    647
9958    13:00   1275    1275    1275    1275    649
9958    13:01   1275    1275    1275    1275    652
9958    13:06   1280    1280    1280    1280    653
9958    13:10   1280    1280    1280    1280    673
9958    13:11   1280    1280    1280    1280    675
9958    13:12   1280    1280    1280    1280    680
9958    13:18   1275    1275    1275    1275    710
9958    13:19   1275    1275    1275    1275    717
9958    13:22   1275    1275    1275    1275    718
9958    13:23   1280    1280    1280    1280    723
9958    13:24   1280    1280    1280    1280    724
9958    13:30   1275    1275    1275    1275    737

Compare:

run trip time(sec)
+-----------------------------+
|run # | Streaming | Java     |
|------+-----------+----------|
| 1    |  28       |  24      |
| 2    |  24       |  24      |
| 3    |  23       |  22      |
| 4    |  25       |  26      |
| 5    |  31       |  25      |
|------+-----------+----------|
|AVG   |  26.2     |  24.2    | 
+-----------------------------+

2013年7月30日 星期二

Easy Minutely Candle-Stick Data By MapReduce under CDH4.2.0 / CM4.5.0--(1) JAVA

Environment:

OS: CentOS 6.2
Hadoop: CDH4.2.0 install via CM4.5.0
Hosts:
     Hostname  processes          ip
     ========  =================  ================
     master01  Namenode(CDH4)     192.168.122.11
     smaster   SecondaryNamenode  192.168.122.10
               JobTracker
     knode020  Datanode           192.168.122.20
               TaskTracker
     knode021  Datanode           192.168.122.21
               TaskTracker

MapReduce Version: 0.20.2


TWSE(Stock and warrant ticks)
  |
  +-> socket client(with libhdfs.so)
       |
       +-> HDFS(Stored by date)
             |
             +->MapReduce(Candle.java, candle.jar)
                  |
                  +-> HDFS(minutely candle stick data)

Map & Reduce Classes


   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
     private Text keytext = new Text();
     private Text valuetext = new Text();

     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
       int cnt = 0;
       String line = value.toString();
       String timestamp = new String();
       String symbol = new String();
       String SEQN = new String();
       String HiLoMark = new String();
       String MatchTime = new String();
       String MatchPrice = new String();
       String MatchVolumn = new String();
       StringTokenizer tokenizer = new StringTokenizer(line,"\t\n"); // default delim = " \t\n\r\f"
       while (tokenizer.hasMoreTokens() && cnt <7 ) {
         switch(cnt)
         {
         case 0:// timestamp
             timestamp = tokenizer.nextToken();
             break;
         case 1:// symbol
             symbol = tokenizer.nextToken();
             break;
         case 2:// SEQN
             SEQN = tokenizer.nextToken();
             break;
         case 3:// MatchTime
             MatchTime = tokenizer.nextToken();
             break;
         case 4:// HiLoMark
             HiLoMark = tokenizer.nextToken();
             break;
         case 5:// MatchPrice
             MatchPrice = tokenizer.nextToken();
             break;
         case 6:// MatchVolumn
             break;
         case 6:// MatchVolumn
             MatchVolumn = tokenizer.nextToken();
             break;
         default: // order book
             tokenizer.nextToken();
             break;
         }
         cnt++;
       }
       if(!MatchPrice.matches("-"))/*only match record*/
       {
         //keytext.set( String.format("%s\t%s", symbol,MatchTime.substring(0,5)));
         keytext.set( String.format("%s\t%s", symbol,MatchTime.substring(0,5)));
         valuetext.set( String.format("%s\t%s\t%s\t%s\t%s\t%s",MatchTime.substring(6,8),SEQN,timestamp,HiLoMark,MatchPrice,MatchVolumn));
         output.collect(keytext, valuetext);
       }
     }
   }

   public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
     private int high = 0;
     private int open = 0;
     private int low = 0;
     private int close = 0;
     private int vol = 0;
     private int open_seqn = 0;
     private int close_seqn = 0;
     private Text candletext = new Text();
     public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
       high = 0;
       open = 0;
       low = 0;
       close = 0;
       vol = 0;
       open_seqn = 0;
       close_seqn=0;

       while (values.hasNext()) {
         String Value=values.next().toString();
         StringTokenizer ValueTokenizer = new StringTokenizer(Value,"\t"); // default delim = " \t\n\r\f"
         int ii = 0;
         int seqn = 0;
         while (ValueTokenizer.hasMoreTokens() && ii <6 ) {
           int val = 0;
           String column = ValueTokenizer.nextToken();
           try
           {
             switch(ii++)
             {
             case 1: //SEQN
               seqn = Integer.parseInt(column);
               break;
             case 4:// MatchPrice
               val = Integer.parseInt(column);
               if(open_seqn == 0 || open_seqn  > seqn)
               {
                 open = val;
                 open_seqn = seqn;
               }
               if(high == 0 || val > high)
                 high = val;
               if(low == 0 || val < low)
                 low = val;
               if(close_seqn == 0 || close_seqn < seqn)
               {
                 close = val;
                 close_seqn = seqn;
               }
               break;
             case 5:// MatchValumn
               val = Integer.parseInt(column); /*total volumn*/
               if(val > vol)
                 vol=val;
               break;
             default:
               break;
             }
           }
           catch (NumberFormatException e)
           {
             System.err.println(e.toString());
             System.err.println("Integer.parseInt Error");
           }
           catch (Exception e)
           {
             System.err.println("Other Error"+e.getMessage());
           }
         }

       }
       candletext.set( String.format("%d\t%d\t%d\t%d\t%d",open,high,low,close,vol));
       output.collect(key, candletext);
     }
   }
                                                                                                       

Compile it:

[kccs@master01 candle]$ cat compile.sh
#!/bin/bash

if [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
        source /usr/lib/bigtop-utils/bigtop-detect-javahome
else
        source /usr/libexec/bigtop-detect-javahome
fi

export CLASSPATH=/etc/hadoop/conf
for file in `ls /usr/lib/hadoop/client/*.jar`
do
  export CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH=$CLASSPATH:/usr/lib/hadoop/hadoop-annotations.jar
export LD_LIBRARY_PATH="$JAVA_HOME/jre/lib/amd64/server/"


if [ ! -e MyJava ]; then
mkdir MyJava
fi

javac -d MyJava Candle.java
jar -cvf candle.jar -C MyJava .

if [ ! -e ~/bin/candle_stick ]; then
mkdir ~/bin/candle_stick
fi


Run it:

[kccs@master01 candle]$ cat run2.sh
#!/bin/bash
WHO=`whoami`
inputPath=/kccs/$1/
outputPath=/user/$WHO/candle/$1
hdfs dfs -test -e $inputPath
if [ ! $? == "0" ]; then
echo "no tick data exist !!!"
exit 1
fi

WHO=`whoami`

hdfs dfs -test -e $outputPath
if [ $? == "0" ]; then
hdfs dfs -rm -r $outputPath
fi
echo "candle stick start...."
hadoop jar /home/kccs/src/ex_mapreduce/candle/candle.jar Candle $inputPath $outputPath

Result:

[kccs@master01 candle]$ hdfs dfs -cat /user/kccs/candle/20130730/part-00000
:
:
:
9958    12:24   1275    1280    1275    1280    543
9958    12:26   1280    1285    1280    1285    570
9958    12:27   1285    1285    1285    1285    575
9958    12:28   1285    1285    1285    1285    577
9958    12:30   1285    1285    1280    1280    580
9958    12:31   1280    1280    1280    1280    582
9958    12:32   1280    1280    1280    1280    587
9958    12:33   1280    1280    1280    1280    592
9958    12:34   1280    1280    1280    1280    612
9958    12:38   1275    1275    1275    1275    614
9958    12:43   1275    1275    1275    1275    621
9958    12:44   1275    1275    1275    1275    631
9958    12:45   1280    1280    1280    1280    632
9958    12:47   1275    1275    1275    1275    634
9958    12:51   1275    1275    1275    1275    644
9958    12:53   1275    1275    1275    1275    645
9958    12:58   1275    1275    1275    1275    647
9958    13:00   1275    1275    1275    1275    649
9958    13:01   1275    1275    1275    1275    652
9958    13:06   1280    1280    1280    1280    653
9958    13:10   1280    1280    1280    1280    673
9958    13:11   1280    1280    1280    1280    675
9958    13:12   1280    1280    1280    1280    680
9958    13:18   1275    1275    1275    1275    710
9958    13:19   1275    1275    1275    1275    717
9958    13:22   1275    1275    1275    1275    718
9958    13:23   1280    1280    1280    1280    723
9958    13:24   1280    1280    1280    1280    724
9958    13:30   1275    1275    1275    1275    737



                                                                                                       

文章分類