2012年11月6日 星期二

基於mpich2進行雲端運算實例

近來為了一個實驗性的架構,以支應明年進行服務擴充而準備。選定了以Hadoop+Hypertable為基底的資料服務;並選用了MPICH2+Torque來進行分散式運算的工作。在這裡先不論其好壞(日後再論述),先進行實作之探討:

主機架構

       選用的Hypertable+Hadoop在先前的文章裡已有描述其安裝與建置步驟;而MPICH2+Torque的建置過程也在可以這裡找到。建置過程,礙於成本上考量,每一台主機皆是以虛擬機來達成多台的部署。在MPICH2+Torque的架構上,選用2台電腦做為分散式計算的compute nodes,其中一台為主要分散式運算服務的發送指令主機(Head node)
        在這個案例裡,利用Client Agent(cagent)在compute node上來當作服務客戶要求的程序(process),採multiprocess方式運作,每當一個用戶端連線時,隨即在compute node上fork出一個程式來服務它。後續再依用戶端的要求,對2台compute node進行相對應服務的取得(分散運算或雲端運算)








        上圖,即是針對上述的概念所進行一項實作內容的雛形架構。服務用戶端的程式為cagent(Client Agent),而用戶端連線後,想要取得計算移動平均線的服務mv_avg ( moving average)。 從流程的概念上分述如下:
(1)用戶端以TCP/IP對Client Agent取得連線的同意後,Client Agent於是fork另一程式出來進行單獨服務
(2)Client Agent依用戶請求,應取得移動平均之計算結果(mv_avg)。Client Agent依此,向內部的運算節點取得運要之資源(此例,透過mpiexec將mv_avg程序分成多節點計算)
(3)mv_avg以MPICH2為基,可以分別提供3種計算方式,(A)單節點運算方式,(B)雙節點運算方式(即將工作分派于2個節點平均計算),(C)多節點計算方式(即以第0節點來發號施令,其餘節點依其派于工作進行計算)
(4)各節點交付計算任務後(計算5、10、15、20、30、40、60、80、120、180、240)等移動平均計算,各計算節點所需之資料來源,乃是透Hypertable Thrift API向Hypertable取得,各計算計後將結果交付第0節點
(5)第0節點依計算結果透過Client Agent,一一回覆給用戶端

以下為Mpi工作分配的主要源碼:


/*
mode-1

  [single node] --+

             ^    | task id == 0 , do it all
             |    |
             +----+

mode-2


  do #0,2,4...tasks

  [taskid == 0] ---+   +---[taskid == 1]
           ^  ^    |   |  do #1,3,5.... tasks
           |  |    |   |
           |  +----+   |
           +-----------+
             
mode-3

  [taskid == 0]          +--->[taskid == 1]

         ^ ^ ^           |  
         | | +-----------+ +->[taskid == 2]
 dispatch| +---------------+
  task # |    
         +------------------->[taskid == n]
                               finish task
                                and
                               reply data [seq][sendbuff=data1,data2,data3]...
         
*/


   if(ntasks == 1) /*mode-1, single node run all tasks*/

   {
       int i;
       for (i=taskid; i >sizeofar(mv_avg_para);i+=ntasks ) { 
 
      if(argc == 2)
          GetCandleDataDefault(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb); /*from Hypertable*/
      else
          GetCandleDataPeriod(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb, start_date,end_date); /*from Hypertable*/
      send_channel( channel, sendbuff , realsize,i);
               
       } 
   }
   else if(ntasks == 2) /*mode-2 , 2 nodes share all tasks*/
   {
       int wait=1;
       if( taskid == 0)
       {
           int i,j,count, source;
           for(i=0 ; i <= sizeofar(mv_avg_para) ;i++){
          task2do[i]= TASK_INPIT;
           }
           i=taskid;
           while(wait)
           {
               int flag;
               MPI_Iprobe(MPI_ANY_SOURCE, REPLY, MPI_COMM_WORLD,&flag, &status);
               if(flag) /*data received*/
               {
                   source = status.MPI_SOURCE;
                   MPI_Get_count(&status, MPI_LONG, &count);
                   count = count > baserec ? baserec : count;
                   MPI_Recv(inptbuff, count, MPI_LONG, source, REPLY, MPI_COMM_WORLD, &status);
                   task2do[inptbuff[0] ]=TASK_DONE;
              send_channel( channel, &inptbuff[1] , count-1,inptbuff[0]);

               }

      if ( i sizeofar(mv_avg_para) ) { 
        
    if(task2do[i ]==TASK_INPIT)
    {
                  task2do[i ]=TASK_RUN;
                  if(argc == 2)
                      GetCandleDataDefault(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb); /*from Hypertable*/
                  else
                      GetCandleDataPeriod(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb, start_date,end_date); /*from Hypertable*/
                      
                  send_channel( channel, sendbuff , realsize,i);
                  
                  task2do[i ]=TASK_DONE;
                  i+=ntasks;

              }

               
      } 
      wait=0;
      for (j=0; j sizeofar(mv_avg_para);j++ ) { 
      wait+=task2do[j];
      } 
           }

       }

       else /*taskid == 1*/
       {
           int i;
  for (i=taskid; i sizeofar(mv_avg_para);i+=ntasks ) { 
 
          if(argc == 2)
              GetCandleDataDefault(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb); /*from Hypertable*/
          else
              GetCandleDataPeriod(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb, start_date,end_date); /*from Hypertable*/
               outpbuff[0] = i;
          if(realsize > 0)
          {
          MPI_Send( outpbuff, realsize+1, MPI_LONG, 0, REPLY, MPI_COMM_WORLD ); 
      }
  } 
       }
   }
   else /*mode-3, multiple nodes tasks ,ntasks > 2*/
   {
    int i,ierr,j,count;
    int wait=1;
    if(taskid==0)  
    {
       for(i=0 ; i <= sizeofar(mv_avg_para) ;i++){
           task2do[i]= TASK_INPIT;
       }
       /*first dispatch task to nodes*/
       for(i=1,j=0 ; i < ntasks && j < sizeofar(mv_avg_para) ;i++,j++){
                    ierr=MPI_Send(&j,1,MPI_INT,
                      i,REQUEST,MPI_COMM_WORLD); 
           task2do[j]= TASK_RUN; 
       }
       
       /*receiving result , and assign next task*/
   while (wait) { 
       int pos;
       MPI_Recv( inptbuff, baserec+1, MPI_LONG, MPI_ANY_SOURCE, REPLY, MPI_COMM_WORLD, &status ); 
                MPI_Get_count(&status, MPI_LONG, &count);
                pos = inptbuff[0];
                
           send_channel( channel, &inptbuff[1] , count-1,inptbuff[0]);
           
       task2do[ pos  ]=TASK_DONE; 
       wait=0;
       for (i=0; i < sizeofar(mv_avg_para);i++ ) { 
           if(task2do[i] == TASK_INPIT)
           {
            /*dispatch next task*/
                        ierr=MPI_Send(&i,1,MPI_INT,
                      status.MPI_SOURCE ,REQUEST,MPI_COMM_WORLD);
               task2do[i]= TASK_RUN; 
           }
       wait+=task2do[i];
       } 
   } 
   
   /*all task was done , send 'finish(==0)' to all nodes*/
       for(i=1,j=0 ; i < ntasks && j < sizeofar(mv_avg_para) ;i++,j++){
           int noCalc=-1;
                    ierr=MPI_Send(&noCalc,1,MPI_INT,
                      i,REQUEST,MPI_COMM_WORLD);  /*all works done!!*/ 
  
       }
    }
    else
    {
       int mv_avg_pos=0;
   MPI_Recv( &mv_avg_pos, 1, MPI_INT, 0, REQUEST, 
    MPI_COMM_WORLD, &status ); 
       while(mv_avg_pos >= 0)
       {
           if(argc == 2)
               GetCandleDataDefault(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[mv_avg_pos] ,symb); /*from Hypertable*/
           else
               GetCandleDataPeriod(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[mv_avg_pos] ,symb, start_date,end_date); /*from Hypertable*/
                outpbuff[0] = mv_avg_pos;
           if(realsize > 0)
           {
           MPI_Send( outpbuff, realsize+1, MPI_LONG, 0, REPLY, MPI_COMM_WORLD ); 
       }
       MPI_Recv( &mv_avg_pos, 1, MPI_INT, 0, REQUEST, 
        MPI_COMM_WORLD, &status ); 
   }
        }
   }
   
   MPI_Barrier(MPI_COMM_WORLD);



依上述的計算結果
(1)Hypertable 僅有300筆的情況下的簡單計算
   mode-1:1.397秒 (1 process 1 nodes)

   mode-2:1.296秒 (2 processes 2 nodes)
   mode-3:1.298秒 (3 processes under 2 nodes)
   mode-3:1.216秒 (11 processes under 2 nodes)
(2)模擬各節點需,每個round(每計算一次移動平均)需要耗時1秒以上複雜計算時的結果
   mode-1:11.015秒 (1 process 1 nodes)

   mode-2:8.015秒(2 processes 2 nodes)
   mode-3:10.018秒(3 processes under 2 nodes)
   mode-3:2.031秒(11 processes under 2 nodes)
PS. 模擬耗工作,是以計算完移動平均後,再Sleep 1秒鐘為之。因為是簡單表示工作分派,不作其他資源耗用之假想

以上結果,對於表示的平行運算,只能粗淺意會,大略呈現分散運算的概念,實際需要考量現實世界的因素還有很多,需要加輔經驗的驗證方能有效適用各項計算工作。

實際上,若1 個compute node配置一個CPU或Core,上述的模擬應可大略表達分散運算的目的。在進行分散運算時,仍要注意工作內容是否採分散運算較佳或者集中1個process較佳
(1)compute nodes的 CPU & RAM資源的分佈狀況(是否足以進行分散運算)
(2)service process本身的工作是否能夠進行拆解
(3)service process的工作拆解後,分散於各節點,會不會反而因為網路傳送耗時而失去意義
(4)分散運算有其資源使用與最快速計算時間中,最適當的滿足點。應視運算成本與可容許計算時間之間,取得一個平衡

平台可再擴充之部分
(1)配合PBS進行批次作業之工作分派,以及工作之監控
(2)配合PBS對於常用之Service Process採常駐形式進行服務
(3)用戶與Client Agent間可以用多種介面支援XML, Json並輔以資料加密與壓縮

沒有留言:

張貼留言

文章分類