The program, tspawn.py, shows how the new routines can be used. It prints a number of constants and attributes. It then uses mpi_comm_spawn to launch a 3 instances of second Python MPI program, worker.py. Each instance of this second program prints creates a file with that contains a date stamp.
The program tspawn.py and worker.py engage in communications, Bradcast, Scatter, Send, Receive, and Reduction.
Finally, the tspawn.py program also starts a "C" version of the worker program. The source for these programscan be found in mpi_tests/mpi2Examples.
Example 7-1. tspawn.py
#!/usr/bin/env python
import numpy
from numpy import *
import mpi
import sys
from time import sleep
from os import getcwd
sys.argv = mpi.mpi_init(len(sys.argv),sys.argv)
t1=mpi.mpi_wtime()
tick=mpi.mpi_wtick()
#print the module version
print " MYMPI VERSION",mpi.VERSION,"\n"
#test attributes
print "MPI_WTIME_IS_GLOBAL",mpi.mpi_attr_get(mpi.MPI_COMM_WORLD,mpi.MPI_WTIME_IS_GLOBAL)
print " MPI_UNIVERSE_SIZE",mpi.mpi_attr_get(mpi.MPI_COMM_WORLD,mpi.MPI_UNIVERSE_SIZE)
print " MPI_TAG_UB",mpi.mpi_attr_get(mpi.MPI_COMM_WORLD,mpi.MPI_TAG_UB)
print " MPI_HOST",mpi.mpi_attr_get(mpi.MPI_COMM_WORLD,mpi.MPI_HOST)
print " MPI_IO",mpi.mpi_attr_get(mpi.MPI_COMM_WORLD,mpi.MPI_IO)
print "%s%1.1d%s%1.1d" % ("mpi version ",mpi.MPI_VERSION,".",mpi.MPI_SUBVERSION)
parent=mpi.mpi_comm_get_parent()
if (parent == mpi.MPI_COMM_NULL) :
print mpi.mpi_get_processor_name(),"running head "
copies=3
##### start up remote tasks ####
toRun=getcwd()+"/worker.py"
print mpi.mpi_get_processor_name(),"starting",toRun
newcom1=mpi.mpi_comm_spawn(toRun,"from_P_",copies,mpi.MPI_INFO_NULL,0,mpi.MPI_COMM_WORLD)
errors=mpi.mpi_array_of_errcodes()
print "errors=",errors
newcom1Size=mpi.mpi_comm_size(newcom1)
print "newcom1Size",newcom1Size," yes it is strange but it should be 1"
##### bcast ####
x=array(([1,2,3,4]),"i")
count=4
print "head starting bcast",x
junk=mpi.mpi_bcast(x,count,mpi.MPI_INT,mpi.MPI_ROOT,newcom1)
print "head did bcast"
##### scatter ####
scat=array([10,20,30],"i")
junk=mpi.mpi_scatter(scat,1,mpi.MPI_INT,1,mpi.MPI_INT,mpi.MPI_ROOT,newcom1)
##### send/recv ####
for i in range(0,copies):
k=(i+1)*100
mpi.mpi_send(k,1,mpi.MPI_INT,i,1234,newcom1)
back=mpi.mpi_recv(1,mpi.MPI_INT,i,5678,newcom1)
print "from ",i,back
##### reduce ####
dummy=1000
final=mpi.mpi_reduce(dummy,1,mpi.MPI_INT,mpi.MPI_SUM,mpi.MPI_ROOT,newcom1)
sleep(5)
print "the final answer is=",final
toRun=getcwd()+"/worker"
print mpi.mpi_get_processor_name(),"starting",toRun
newcom2=mpi.mpi_comm_spawn(toRun,"from_C_",copies,mpi.MPI_INFO_NULL,0,mpi.MPI_COMM_WORLD)
errors=mpi.mpi_array_of_errcodes()
print "errors=",errors
newcom2Size=mpi.mpi_comm_size(newcom2)
print "newcom2Size",newcom2Size
sleep(15)
t2=mpi.mpi_wtime()
print "run time=",t2-t1,"with resolution=",tick
mpi.mpi_comm_free(newcom1)
mpi.mpi_comm_free(newcom2)
mpi.mpi_finalize() |
Example 7-2. worker.py
#!/usr/bin/env python import numpy from numpy import * import mpi import sys from time import gmtime, time,sleep def stamp(): timeTuple = gmtime(time())[1:6] return "%02d%02d%02d%02d%02d" % timeTuple sys.argv = mpi.mpi_init(len(sys.argv),sys.argv) myid=mpi.mpi_comm_rank(mpi.MPI_COMM_WORLD) numprocs=mpi.mpi_comm_size(mpi.MPI_COMM_WORLD) parent=mpi.mpi_comm_get_parent() parentSize=mpi.mpi_comm_size(parent) print "parentSize",parentSize tod=stamp() s=sys.argv[1]+"%2.2d" % myid print "hello from python worker",myid," writing to ",s x=array([5,3,4,2],'i') print "starting bcast" buffer=mpi.mpi_bcast(x,4,mpi.MPI_INT,0,parent) out = open(s, "w") out.write(str(buffer)) out.write(tod+"\n") out.close() print myid," got ",buffer junk=mpi.mpi_scatter(x,1,mpi.MPI_INT,1,mpi.MPI_INT,0,parent) print myid," got scatter ",junk back=mpi.mpi_recv(1,mpi.MPI_INT,0,1234,parent) back[0]=back[0]+1 mpi.mpi_send(back,1,mpi.MPI_INT,0,5678,parent) dummy=myid final=mpi.mpi_reduce(dummy,1,mpi.MPI_INT,mpi.MPI_SUM,0,parent) sleep(10) mpi.mpi_comm_free(parent) mpi.mpi_finalize() |
Example 7-3. worker.c
#include <stdio.h>
#include <mpi.h>
#include <math.h>
void lam_darwin_malloc_linker_hack(){};
void timestmp(char *timestr);
int main(argc,argv)
int argc;
char *argv[];
{
int myid, numprocs;
FILE *f1;
int i;
int ierr;
MPI_Comm parent;
char fname[10];
char stamp[11];
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&myid);
MPI_Comm_get_parent(&parent);
sprintf(fname,"%s%2.2d",argv[1],myid);
printf("Hello from c worker %d writing to %s\n",myid,fname);
f1=fopen(fname,"w");
timestmp(stamp);
fprintf(f1,"%s\n",stamp);
fclose(f1);
sleep(10);
MPI_Comm_free(&parent);
MPI_Finalize();
} |
Example 7-4. driver program for tspawn
#include <time.h>
#include <sys/time.h>
#include <string.h>
void timestmp(char *timestr) {
/* char timestr[11]; */
struct timeval tp;
struct timezone tzp;
struct tm *tmstruct;
time_t tod;
gettimeofday(&tp,&tzp);
tod=tp.tv_sec;
tmstruct=gmtime(&tod);
/* tmstruct=localtime(&tod); */
sprintf(timestr,"%2.2d%2.2d%2.2d%2.2d%2.2d",tmstruct->tm_mon+1,
tmstruct->tm_mday,
tmstruct->tm_hour,
tmstruct->tm_min,
tmstruct->tm_sec);
timestr[10]=(char)0;
} |