主機架構
選用的Hypertable+Hadoop在先前的文章裡已有描述其安裝與建置步驟;而MPICH2+Torque的建置過程也在可以這裡找到。建置過程,礙於成本上考量,每一台主機皆是以虛擬機來達成多台的部署。在MPICH2+Torque的架構上,選用2台電腦做為分散式計算的compute nodes,其中一台為主要分散式運算服務的發送指令主機(Head node)
在這個案例裡,利用Client Agent(cagent)在compute node上來當作服務客戶要求的程序(process),採multiprocess方式運作,每當一個用戶端連線時,隨即在compute node上fork出一個程式來服務它。後續再依用戶端的要求,對2台compute node進行相對應服務的取得(分散運算或雲端運算)
上圖,即是針對上述的概念所進行一項實作內容的雛形架構。服務用戶端的程式為cagent(Client Agent),而用戶端連線後,想要取得計算移動平均線的服務mv_avg ( moving average)。 從流程的概念上分述如下:
(1)用戶端以TCP/IP對Client Agent取得連線的同意後,Client Agent於是fork另一程式出來進行單獨服務
(2)Client Agent依用戶請求,應取得移動平均之計算結果(mv_avg)。Client Agent依此,向內部的運算節點取得運要之資源(此例,透過mpiexec將mv_avg程序分成多節點計算)
(3)mv_avg以MPICH2為基,可以分別提供3種計算方式,(A)單節點運算方式,(B)雙節點運算方式(即將工作分派于2個節點平均計算),(C)多節點計算方式(即以第0節點來發號施令,其餘節點依其派于工作進行計算)
(4)各節點交付計算任務後(計算5、10、15、20、30、40、60、80、120、180、240)等移動平均計算,各計算節點所需之資料來源,乃是透Hypertable Thrift API向Hypertable取得,各計算計後將結果交付第0節點
(5)第0節點依計算結果透過Client Agent,一一回覆給用戶端
以下為Mpi工作分配的主要源碼:
/*
mode-1
[single node] --+
^ | task id == 0 , do it all
| |
+----+
mode-2
do #0,2,4...tasks
[taskid == 0] ---+ +---[taskid == 1]
^ ^ | | do #1,3,5.... tasks
| | | |
| +----+ |
+-----------+
mode-3
[taskid == 0] +--->[taskid == 1]
^ ^ ^ |
| | +-----------+ +->[taskid == 2]
dispatch| +---------------+
task # |
+------------------->[taskid == n]
finish task
and
reply data [seq][sendbuff=data1,data2,data3]...
*/
if(ntasks == 1) /*mode-1, single node run all tasks*/
{
int i;
for (i=taskid; i
if(argc == 2)
GetCandleDataDefault(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb); /*from Hypertable*/
else
GetCandleDataPeriod(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb, start_date,end_date); /*from Hypertable*/
send_channel( channel, sendbuff , realsize,i);
}
}
else if(ntasks == 2) /*mode-2 , 2 nodes share all tasks*/
{
int wait=1;
if( taskid == 0)
{
int i,j,count, source;
for(i=0 ; i <= sizeofar(mv_avg_para) ;i++){
task2do[i]= TASK_INPIT;
}
i=taskid;
while(wait)
{
int flag;
MPI_Iprobe(MPI_ANY_SOURCE, REPLY, MPI_COMM_WORLD,&flag, &status);
if(flag) /*data received*/
{
source = status.MPI_SOURCE;
MPI_Get_count(&status, MPI_LONG, &count);
count = count > baserec ? baserec : count;
MPI_Recv(inptbuff, count, MPI_LONG, source, REPLY, MPI_COMM_WORLD, &status);
task2do[inptbuff[0] ]=TASK_DONE;
send_channel( channel, &inptbuff[1] , count-1,inptbuff[0]);
}
if ( i
if(task2do[i ]==TASK_INPIT)
{
task2do[i ]=TASK_RUN;
if(argc == 2)
GetCandleDataDefault(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb); /*from Hypertable*/
else
GetCandleDataPeriod(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb, start_date,end_date); /*from Hypertable*/
send_channel( channel, sendbuff , realsize,i);
task2do[i ]=TASK_DONE;
i+=ntasks;
}
}
wait=0;
for (j=0; j
wait+=task2do[j];
}
}
}
else /*taskid == 1*/
{
int i;
for (i=taskid; i
if(argc == 2)
GetCandleDataDefault(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb); /*from Hypertable*/
else
GetCandleDataPeriod(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[i] ,symb, start_date,end_date); /*from Hypertable*/
outpbuff[0] = i;
if(realsize > 0)
{
MPI_Send( outpbuff, realsize+1, MPI_LONG, 0, REPLY, MPI_COMM_WORLD );
}
}
}
}
else /*mode-3, multiple nodes tasks ,ntasks > 2*/
{
int i,ierr,j,count;
int wait=1;
if(taskid==0)
{
for(i=0 ; i <= sizeofar(mv_avg_para) ;i++){
task2do[i]= TASK_INPIT;
}
/*first dispatch task to nodes*/
for(i=1,j=0 ; i < ntasks && j < sizeofar(mv_avg_para) ;i++,j++){
ierr=MPI_Send(&j,1,MPI_INT,
i,REQUEST,MPI_COMM_WORLD);
task2do[j]= TASK_RUN;
}
/*receiving result , and assign next task*/
while (wait) {
int pos;
MPI_Recv( inptbuff, baserec+1, MPI_LONG, MPI_ANY_SOURCE, REPLY, MPI_COMM_WORLD, &status );
MPI_Get_count(&status, MPI_LONG, &count);
pos = inptbuff[0];
send_channel( channel, &inptbuff[1] , count-1,inptbuff[0]);
task2do[ pos ]=TASK_DONE;
wait=0;
for (i=0; i <
if(task2do[i] == TASK_INPIT)
{
/*dispatch next task*/
ierr=MPI_Send(&i,1,MPI_INT,
status.MPI_SOURCE ,REQUEST,MPI_COMM_WORLD);
task2do[i]= TASK_RUN;
}
wait+=task2do[i];
}
}
/*all task was done , send 'finish(==0)' to all nodes*/
for(i=1,j=0 ; i < ntasks && j < sizeofar(mv_avg_para) ;i++,j++){
int noCalc=-1;
ierr=MPI_Send(&noCalc,1,MPI_INT,
i,REQUEST,MPI_COMM_WORLD); /*all works done!!*/
}
}
else
{
int mv_avg_pos=0;
MPI_Recv( &mv_avg_pos, 1, MPI_INT, 0, REQUEST,
MPI_COMM_WORLD, &status );
while(mv_avg_pos >= 0)
{
if(argc == 2)
GetCandleDataDefault(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[mv_avg_pos] ,symb); /*from Hypertable*/
else
GetCandleDataPeriod(sendbuff,baserec,basebuff, &realsize ,baserec,mv_avg_para[mv_avg_pos] ,symb, start_date,end_date); /*from Hypertable*/
outpbuff[0] = mv_avg_pos;
if(realsize > 0)
{
MPI_Send( outpbuff, realsize+1, MPI_LONG, 0, REPLY, MPI_COMM_WORLD );
}
MPI_Recv( &mv_avg_pos, 1, MPI_INT, 0, REQUEST,
MPI_COMM_WORLD, &status );
}
}
}
MPI_Barrier(MPI_COMM_WORLD);
依上述的計算結果
(1)Hypertable 僅有300筆的情況下的簡單計算
mode-1:1.397秒 (1 process 1 nodes)
mode-2:1.296秒 (2 processes 2 nodes)
mode-3:1.298秒 (3 processes under 2 nodes)
mode-3:1.216秒 (11 processes under 2 nodes)
mode-1:11.015秒 (1 process 1 nodes)
mode-2:8.015秒(2 processes 2 nodes)
mode-3:10.018秒(3 processes under 2 nodes)
mode-3:2.031秒(11 processes under 2 nodes)
PS. 模擬耗工作,是以計算完移動平均後,再Sleep 1秒鐘為之。因為是簡單表示工作分派,不作其他資源耗用之假想
以上結果,對於表示的平行運算,只能粗淺意會,大略呈現分散運算的概念,實際需要考量現實世界的因素還有很多,需要加輔經驗的驗證方能有效適用各項計算工作。
實際上,若1 個compute node配置一個CPU或Core,上述的模擬應可大略表達分散運算的目的。在進行分散運算時,仍要注意工作內容是否採分散運算較佳或者集中1個process較佳
(1)compute nodes的 CPU & RAM資源的分佈狀況(是否足以進行分散運算)
(2)service process本身的工作是否能夠進行拆解
(3)service process的工作拆解後,分散於各節點,會不會反而因為網路傳送耗時而失去意義
(4)分散運算有其資源使用與最快速計算時間中,最適當的滿足點。應視運算成本與可容許計算時間之間,取得一個平衡
平台可再擴充之部分
(1)配合PBS進行批次作業之工作分派,以及工作之監控
(2)配合PBS對於常用之Service Process採常駐形式進行服務
(3)用戶與Client Agent間可以用多種介面支援XML, Json並輔以資料加密與壓縮
沒有留言:
張貼留言