note: part 3 of a series on using MPI in python, I suggest reading the previous one before continuing

Refresher

In the previous part of this series, we continued to try to do some very simple parallel computing (without being “embarrassingly parallel”) using MPI. The task was simple: have several independent processes(procs) guess random integers from 1 to 10, scoring points for integers haven’t been guessed yet. We achieved something kind of close to this, but we weren’t able to de-sync the procs. Basically, even by making some procs slower tha others– each was still getting the same number of guesses over the full computation. The solution was to step away from the usual results you get when you google how to use MPI: allreduce, gather, broadcast, etc…. Instead, we are going to use nonblockng MPI message passing.

Explicitly nonblocking communication in MPI

Primarily, our savior is comm.Iprobe(source=i), which will return a boolean depending on wether the given source has incoming messages for the proc or not. This is a built in way to implement the workaround that we tried previously ( recall our “message” boolean) but in a nonblocking way. It will not hang if there isn’t a message yet, it will simply return a False and keep going. With this tool, we can manually control the messages that flow in and out of proc 0 in a way that allows all procs to go at their own pace. Let’s use comm.Iprobe to retool our code:

# the beginning is the same, but well need to initialize some extra variables
final_list = []
timeout = 100

# the break condition will come at the end, rather than at the beginning. here we add a timeout in case of unexpected failures
while i < timeout:
    i += 1
    sleep(rank+.1)
    
    if rank != 0:
        # use comm.Iprobe to check if there is a new list incoming from proc 0
        # if there is, we update our local list, we need a local list because
        # broadcasting from proc 0 is blocking communication
        if comm.Iprobe(source=0, tag=1):
            final_list = comm.recv(source=0, tag=1)

        tprint(f'rank {rank} guess {i} current list:{final_list}')
        trial_n = np.random.randint(10)
        tprint(f'rank {rank} guess {i}: {trial_n}; has {j} points \r ')
        sys.stdout.flush()

        if trial_n not in final_list:
            j += 1
            # note we don't need the message boolean anymore
            comm.send(trial_n, dest=0, tag=0)


    if rank==0:
        new_vals = []
        for i in range(size-1):
            # this will check if there is a new number pending on any proc and then add it to the master list (kept on proc 0)
            if comm.Iprobe(source=i+1):
                j += 1
                trial_n = comm.recv(source=i+1, tag=0)
                final_list.append(trial_n)
                final_list = list(set(final_list))
                
                tprint(f'added {trial_n} to list, list {j} is {final_list}')
                sys.stdout.flush()

                # every time we add a number to the list, we send it out to all other procs
                for i in range(size-1):
                    comm.send(final_list, dest=i+1, tag=1)
    # once each proc receives a totally full list, it knows that the game is over
    if len(final_list) > 9:
        break

# this barrier means no proc prints its score until all the procs are done. 
comm.barrier()
if rank !=0:
    tprint(f'rank {rank} got {j} points with {i} guesses')

One more catch!

Ok, so the script above still isn’t quite going to cut it. If you run it you will find that, while the loops can now happen independently, the local lists do not stay up to date with the newest iteration. This can be fixed by changing one word only. Before looking at the solution, maybe take some time to analyze what you think is happening. Run the code a few times and pay attention to what is printed…

Solution

Now, that you have done that, here is the answer. All you need to do is change

if comm.Iprobe(source=0, tag=1):
    final_list = comm.recv(source=0, tag=1)

to

while comm.Iprobe(source=0, tag=1):
    final_list = comm.recv(source=0, tag=1)

You see, the issue is that the list update messages from proc 0 are queued up, and if you just check if there is a message or not and then update the list one time you will only get the oldest message in the queue. The solution is, then, to completely clear out the queue each time you get to it. A while loop does this perfectly, because it will keep returning true until there are no more messages in the queue. This means you have received the most recent list and are good to go!