7. Examples

dispy can be used to distribute standalone programs or Python program fragments (functions, classes, modules) and files (programs, data) to nodes and execute jobs in parallel. It supports various options to handle rather comprehensive use cases (such as fault tolerance, sharing nodes in multiple programs simultaneously, using nodes in multiple networks etc.); however, in common setups, the usage is simple, as done in the demonstrative examples below. Some of these examples are also available in ‘examples’ directory where dispy is installed, which can be obtained with the program:

import os, dispy
print(os.path.join(os.path.dirname(dispy.__file__), 'examples'))

Short summary of examples (follow the links for details):

  1. Command-line uses dispy as command line tool to distribute and run a standalone program.

  2. Python script is a simple Python program that distributes and runs a standalone program.

  3. Canonical program distributes and executes a Python function with different parameters. The program also prints some of the attributes of each job after it finishes (such as job’s result, start time).

  4. Distributing objects shows how to send Python objects (Class instances) in client program to remote nodes to execute Python function (which takes those objects as arguments).

  5. In-memory processing is one use of setup (and cleanup) feature to initialize nodes before executing jobs. In this case, setup is used to load data in files transferred from client in to memory (global variables); each job then uses the data (for read-only) in these variables for in-memory processing (i.e., access data already in memory instead of reading data from file in each job).

  6. Replace in-memory data is a variation of In-memory processing. In this example, resetup_node feature is used to replace in-memory data set dynamically so different data sets can be processed.

  7. Updating globals shows how to use Python’s sharedcypes to initialize (using setup feature as above) a global variable and then update in jobs so that all jobs on a node will see updated value.

  8. Sending files to client illustrates how jobs can use dispy_send_file function to send files to client.

  9. Process status notifications uses cluster_status feature to process each job’s result as they are finished.

  10. Efficient job submission uses job_status feature to submit just enough jobs to keep scheduler use all CPUs in cluster, instead of all at once, which can cause memory problems.

  11. Long running computations shows use cases for handling KeyboardInterrupt when jobs are being terminated. In this case, computations send latest (best) value as result of computation.

  12. Execute different computations demonstrates how to execute different computations with same cluster.

  13. Update nodes sends a script to each node to be executed once. The script can be customized to run routine tasks such as sending node status, updating software package etc.

  14. Port forwarding with SSH example uses ssh to forward port to use nodes in remote network (in this case Amazon EC2 cloud instance).

  15. Recover jobs / nodes shows how to recover jobs and nodes after a client crashes or loses network connection to nodes.

  16. Cluster creation lists various ways to create clusters for different configurations.

  17. MapReduce is a simple implementation of well-known map-reduce problem.

7.1. Command-Line

dispy can be used as a command line tool (for simple cases, scheduling cron jobs); in this case the computations should only be programs and dependencies should only be files.:

dispy_cmd.py -f /some/file1 -f file2 -a "arg11 arg12" -a "arg21 arg22" -a "arg3" /some/program

will distribute ‘/some/program’ with dependencies ‘/some/file1’ and ‘file2’ and then execute ‘/some/program’ in parallel with 3 instances: a) arg11 and arg12 (two arguments to the program), b) arg21 and arg22 (two arguments), and c) arg3 (one argument).

7.2. Python Script

A simple client program that distributes a program (say, ‘/path/to/program’), executes them with a sequence of numbers as arguments is:

import dispy
cluster = dispy.JobCluster('/path/to/program')
for i in range(50):
    cluster.submit(i)

The program ‘/path/to/program’ on the client computer is transferred to each of the nodes, so if the program is a binary program then all the nodes should have same architecture as the client.

In the cases above we assume that the programs execute and save the computation results in a database, file system etc. If we are interested in exit status, output from each run etc., then we need to collect each of the jobs submitted from which interested attributes can be retrieved, as done in the example below.

7.3. Canonical Program

A canonical cluster that distributes computation ‘compute’ (Python function) to nodes (running dispynode (Server) on a local network), schedules jobs with the cluster, gets jobs’ results and prints them is sample.py:

# function 'compute' is distributed and executed with arguments
# supplied with 'cluster.submit' below
def compute(n):
    import time, socket
    time.sleep(n)
    host = socket.gethostname()
    return (host, n)

if __name__ == '__main__':
    # executed on client only; variables created below, including modules imported,
    # are not available in job computations
    import dispy, random
    # distribute 'compute' to nodes; in this case, 'compute' does not have
    # any dependencies to run on nodes
    cluster = dispy.JobCluster(compute)
    # run 'compute' with 20 random numbers on available CPUs
    jobs = []
    for i in range(20):
        job = cluster.submit(random.randint(5,20))
        jobs.append(job)
    # cluster.wait() # waits until all jobs finish
    for job in jobs:
        host, n = job() # waits for job to finish and returns results
        print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
        # other fields of 'job' that may be useful:
        # job.stdout, job.stderr, job.exception, job.ip_addr, job.end_time
    cluster.print_status()  # shows which nodes executed how many jobs etc.

7.4. Distributing Objects

If the computation has any dependencies, such as classes, objects or files, they can be specified with ‘depends’ argument and dispy will distribute them along with the computation.

Continuing trivial but illustrative examples, the program obj_instances.py below distributes computation to be executed with instances of a class:

class C(object):
    def __init__(self, i, n):
        self.i = i
        self.n = n

    def show(self):
        print('%s: %.2f' % (self.i, self.n))

def compute(obj):
    # obj is an instance of C
    import time
    time.sleep(obj.n)
    obj.show()  # the output is stored in job.stdout
    return obj.n

if __name__ == '__main__':
    import random, dispy
    # 'compute' needs definition of class C, so include in 'depends'
    cluster = dispy.JobCluster(compute, depends=[C])
    jobs = []
    for i in range(10):
        c = C(i, random.uniform(1, 3)) # create object of C
        job = cluster.submit(c) # it is sent to a node for executing 'compute'
        job.id = c # store this object for later use
        jobs.append(job)
    for job in jobs:
        job() # wait for job to finish
        print('%s: %.2f / %s' % (job.id.i, job.result, job.stdout))

Note that class C is given in ‘depends’ so the code for it is transferred to the nodes automatically and the objects created in client program work transparently in ‘compute’ on remote nodes. The objects are serialized using Pickle and sent over the to the nodes, so the objects must be serializable. If they are not serializable (e.g., they contain references to locks), then the class must provide __getstate__ and __setstate__ methods; see Python object serialization for details. In addition, the objects shouldn’t contain file descriptors, references to other objects not being transferred etc., which are not valid on remote nodes.

7.5. In-memory Processing

setup and cleanup parameters to cluster can be used to initialize/de-initialize a node for running jobs of that computation, e.g., to manipulate transferred files, read data into memory so jobs can process data efficiently, set/unset global variables on that node.

In the example below, setup function is used to read data in a file transferred from client in to memory on each node and jobs that are executed later use the data in memory (for in-memory processing) instead of reading from file in every job. This feature works with Posix systems (Linux, OS X and other Unix variants) without limitations - any data can be assigned to variables declared global in setup, as operating system’s fork is used to create child process, which shares the address space of parent process (where setup function is executed) with copy-on-write. This feature can be used, for example, to read large amount of data in file(s) so computations (jobs) can directly access the data in memory, instead of reading same data from file each time.

Under Windows, though, fork is not available, so the global variables are serialized and passed to child process (see multiprocessing’s Programming guidelines for Windows). Thus, for example, modules can’t be loaded in global scope with setup under Windows. Moreover, as each job runs with a copy of all the global variables, initializing objects that require lot of memory may not be possible / efficient (compared to Posix systems where objects are not copied). See pycos’s distributed communicating processes for an alternate approach that doesn’t have these limitations.

The setup function in program node_setup.py below reads the data in a file (transferred with depends) in to global variable. The jobs compute checksum of that data in memory The cleanup function deletes the global variable:

def setup(data_file):
    # read data in file to global variable
    global data, algorithms, hashlib, time, file_name

    import hashlib, time
    data = open(data_file).read()  # read file in to memory; data_file can now be deleted
    file_name = data_file
    if sys.version_info.major > 2:
        data = data.encode() # convert to bytes
        algorithms = list(hashlib.algorithms_guaranteed)
    else:
        algorithms = hashlib.algorithms
    # if running under Windows, modules can't be global, as they are not
    # serializable; instead, they must be loaded in 'compute' (jobs); under
    # Posix (Linux, OS X and other Unix variants), modules declared global in
    # 'setup' will be available in 'compute'

    # 'os' module is already available (loaded by dispynode)
    if os.name == 'nt':  # remove modules under Windows
        del hashlib, time
    return 0

# 'cleanup' should have same argument as 'setup'
def cleanup(data_file):
    global data, algorithms, hashlib, time, file_name
    del data, algorithms, file_name
    if os.name != 'nt':
        del hashlib, time

def compute(n):
    global hashlib, time
    if os.name == 'nt': # Under Windows modules must be loaded in jobs
        import hashlib, time
    # 'data' and 'algorithms' global variables are initialized in 'setup'
    alg = algorithms[n % len(algorithms)]
    csum = getattr(hashlib, alg)()
    csum.update(data)
    time.sleep(2)
    return (dispy_node_ip_addr, file_name, alg, csum.hexdigest())

if __name__ == '__main__':
    import dispy, sys, os, glob, time

    # each node processes a file in 'data_files' with 'NodeAllocate.allocate'
    data_files = glob.glob(os.path.join(os.path.dirname(sys.argv[0]), '*.py'))
    node_id = 0

    # sub-class NodeAllocate to use node (and computation) specific 'depends'
    # and 'setup_args'
    class NodeAllocate(dispy.NodeAllocate):
        def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform='*'):
            global node_id
            data_file = data_files[node_id % len(data_files)]
            node_id += 1
            print('Node %s (%s) processes "%s"' % (ip_addr, name, data_file))
            self.depends = [data_file] # 'depends' must be a list
            self.setup_args = (data_file,) # 'setup_args' must be a tuple
            return cpus

    cluster = dispy.JobCluster(compute, nodes=[NodeAllocate('*')], setup=setup, cleanup=cleanup)
    # wait until (enough) nodes to be setup; alternately, 'cluster_status' can
    # be used to process nodes found
    time.sleep(5)
    jobs = []
    for n in range(10):
        job = cluster.submit(n)
        jobs.append(job)

    for job in jobs:
        job()
        if job.status == dispy.DispyJob.Finished:
            print('%s: "%s" %s: %s' % job.result)
        else:
            print(job.exception)
    cluster.print_status()
    cluster.close()

The above program is written to work under Posix (Linux, OS X, etc) as well as Windows. If running under Posix only, ‘computation’ can use ‘hashlib’ as global variable (initialized in ‘setup’ function) to avoid loading in jobs.

7.6. Replace In-memory Data

As illustrated in In-memory processing, computation’s setup function can store data in global variables which are accessible in all computation jobs. If it is required to replace this in-memory data (e.g., if data sets are more than number of available nodes to read all data), resetup_node method can be used by clients to re-initialize nodes. Although this feature can be used for any purpose, in this example it is used for changing in-memory data. Since in-memory data feature doesn’t work with Windows, this example doesn’t work under Windows.

The program replace_inmem.py below is a minor variation of In-memory processing:

def setup(data_file, n):
    global data, algorithms, hashlib, file_name

    import hashlib, os, sys
    data = open(data_file, 'rb').read()  # read data in file to global variable
    os.remove(data_file)  # data_file can now be deleted
    file_name = data_file
    if sys.version_info.major > 2:
        algorithms = list(hashlib.algorithms_guaranteed)
    else:
        algorithms = hashlib.algorithms
    algorithms = [alg for alg in algorithms if (not alg.startswith('shake'))]
    return 0

def cleanup(data_file, n):
    global data, algorithms, hashlib, file_name
    del data, algorithms, file_name

def compute(i, n):
    # 'hashlib', 'data' and 'algorithms' global variables are initialized in 'setup'
    alg = algorithms[i % len(algorithms)]
    csum = getattr(hashlib, alg)()
    csum.update(data)
    time.sleep(n)
    return (dispy_node_ip_addr, file_name, alg, csum.hexdigest())

def job_status(job):
    if job.status == dispy.DispyJob.Finished:
        print('\njob %s finished by %s, %s of %s is %s' % (job.id, job.result[0], job.result[2],
                                                           job.result[1], job.result[3]))
    else:
        print('\njob %s failed: %s' % (job.id, job.exception))

if __name__ == '__main__':
    import dispy, sys, os, glob, random
    data_files = glob.glob(os.path.join(os.path.abspath(os.path.dirname(sys.argv[0])), '*.py'))
    file_id = 0
    nodes = {}

    class NodeAllocate(dispy.NodeAllocate):
        def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform='*'):
            global file_id
            if len(nodes) == 2:
                # use at most 2 nodes, to illustrate this example
                return 0
            if platform.startswith('Windows'):
                # In-memory is not supported with Windows
                print('Ignoring node %s as in-memory data is not supported with Windows' % ip_addr)
                return 0
            data_file = data_files[file_id % len(data_files)]
            file_id += 1
            nodes[ip_addr] = data_file
            print('Node %s (%s) processes "%s"' % (ip_addr, name, data_file))
            self.setup_args = (data_file, file_id)
            self.depends = [data_file]
            return max(2, cpus)  # use at most 2 cpus for illustration

    cluster = dispy.JobCluster(compute, nodes=[NodeAllocate(host='*')],
                               setup=setup, cleanup=cleanup, job_status=job_status,
                               loglevel=dispy.logger.DEBUG)
    for i in range(1, 7):
        job = cluster.submit(i, random.uniform(2, 5))
        job.id = i
    cluster.wait()  # not needed if jobs have been scheduled at node
    for ip_addr in nodes:
        cluster.resetup_node(ip_addr)  # close
    for i in range(i+1, i+7):
        job = cluster.submit(i, random.uniform(2, 5))
        job.id = i
    cluster.wait()

    cluster.print_status()
    cluster.close()

When resetup_node is called, that node will run cleanup function (after all currently running jobs are finished), if one is given, without releasing itself from current scheduler or removing any files left behind (so new setup function and jobs can use those files). The scheduler will then reallocate that node to by calling allocate method of NodeAllocate instance. If that method allocates that node, the setup_args and depends of that instance are used for initializing (by transferring depends) and calling setup function with new setup_args. In the above example, allocate method allocates at most 2 CPUs so each node runs at most 2 jobs simultaneously. It also rejects any nodes running under Windows, sets setup_args and depends to sequentially process files. Since allocate method only allocates 2 nodes, the example will force new setup to run to load different files in memory at those nodes.

7.7. Updating Globals

The example above creates global variables that can be read (but not update) in jobs. setup and cleanup can also be used to create global variables that can be updated in jobs on a node (but not by jobs in other nodes) using multiprocessing module’s Sharing state between processes. The program node_shvars.py below creates an integer shared variable that is updated by jobs running on that node:

def setup():
    import multiprocessing, multiprocessing.sharedctypes
    global shvar
    lock = multiprocessing.Lock()
    shvar = multiprocessing.sharedctypes.Value('i', 1, lock=lock)
    return 0

def cleanup():
    global shvar
    del shvar

def compute():
    import random
    r = random.randint(1, 10)
    global shvar
    shvar.value += r  # update 'shvar'; all jobs on this node will
                      # see updated value
    return shvar.value

if __name__ == '__main__':
    import dispy
    cluster = dispy.JobCluster(compute, setup=setup, cleanup=cleanup)
    jobs = []
    for n in range(10):
        job = cluster.submit()
        jobs.append(job)

    for job in jobs:
        job()
        if job.status != dispy.DispyJob.Finished:
            print('job %s failed: %s' % (job.id, job.exception))
        else:
            print('%s: %s' % (job.id, job.result))
    cluster.print_status()
    cluster.close()

See pycos’s Distributed Communicating Processes for an alternate approach, where setup can be used to initialize any data that can be updated in computations without any limitations, even under Windows. With pycos, computations have to be tasks and client has to implement scheduling tasks.

7.8. Sending Files to Client

dispy_send_file (see Transferring Files) can be used to transfer file(s) to the client. Assume that computation creates files with the parameter given (in this case n) so different runs create different files (otherwise, file(s) sent by one computation will overwrite files sent by other computations). Such files can be sent to the client with:

def compute(n):
    import time
    time.sleep(n)
    # assume that computation saves data in file n.dat
    dispy_send_file(str(n) + '.dat') # send file to client
    # ... continue further computations
    return n

if __name__ == '__main__':
    import dispy, random
    cluster = dispy.JobCluster(compute)
    jobs = []
    for i in range(20):
        job = cluster.submit(random.randint(5,20))
        jobs.append(job)
    for job in jobs:
        job()
        print('job %s results in file %s' % (job.id, str(job.id) + '.dat'))

If the client needs to process the files as soon as they are transferred, Provisional/Intermediate Results feature along with job_status can be used to notify the client.

7.9. Process Status Notifications

submit_node method and cluster_status notification can be used to schedule jobs with full control over when / which node executes a job. The program job_scheduler.py below schedules a job to compute sha1 checksum of data files whenever a processor is available:

# job computation runs at dispynode servers
def compute(path):
    import hashlib, time, os
    csum = hashlib.sha1()
    with open(os.path.basename(path), 'rb') as fd:
        while True:
            data = fd.read(1024000)
            if not data:
                break
            csum.update(data)
    time.sleep(5)
    return csum.hexdigest()

# 'cluster_status' function. It is invoked to notify of changes
# in node / job status. Here node initialization and
# job done status are used to schedule jobs, so at most one job is
# running on a node (even if a node has more than one processor). Data
# files are assumed to be 'data000', 'data001' etc.
def status_cb(status, node, job):
    if status == dispy.DispyJob.Finished:
        print('sha1sum for %s: %s' % (job.id, job.result))
    elif status == dispy.DispyJob.Terminated:
        print('sha1sum for %s failed: %s' % (job.id, job.exception))
    elif status == dispy.DispyNode.Initialized:
        print('node %s with %s CPUs available' % (node.ip_addr, node.avail_cpus))
    else:  # ignore other status messages
        return

    global submitted
    data_file = 'data%03d' % submitted
    if os.path.isfile(data_file):
        submitted += 1
        # 'node' and 'dispy_job_depends' are consumed by dispy;
        # 'compute' is called with only 'data_file' as argument(s)
        job = cluster.submit_node(node, data_file, dispy_job_depends=[data_file])
        job.id = data_file

if __name__ == '__main__':
    import dispy, sys, os
    cluster = dispy.JobCluster(compute, cluster_status=status_cb)
    submitted = 0
    while True:
        try:
            cmd = sys.stdin.readline().strip().lower()
        except KeyboardInterrupt:
            break
        if cmd == 'quit' or cmd == 'exit':
            break

    cluster.wait()
    cluster.print_status()

7.10. Efficient Job Submission

When a job is submitted to cluster, the arguments are kept in DispyJob structure returned. These arguments are used to resubmit the jobs if a node fails (if computation is reentrant). When large number of jobs are submitted, especially with large data in arguments, the memory required to keep arguments (at the client) may be an issue.

Note also that in many examples, jobs are kept in list and processed in sequence later, without removing jobs from the list. This is acceptable when job arguments don’t require too much memory, but not suitable otherwise (as arguments are kept even after jobs are done). It may be better to keep jobs in a dictionary (using a unique index that is also set as job ID) and remove the job from this dictionary as soon as it is done.

The program bounded_submit.py below uses job_status feature to keep the scheduler pipeline busy but not submit all jobs at once. The variables lower_bound and upper_bound control number of jobs that are scheduled at any time. These can be adjusted as suggested, or dynamically updated with either NodeAllocate or cluster_status feature.:

def compute(n):  # executed on nodes
    import time
    time.sleep(n)
    return n

# dispy calls this function to indicate change in job status
def job_status(job): # executed at the client
    global pending_jobs, jobs_cond
    if (job.status == dispy.DispyJob.Finished  # most usual case
        or job.status in (dispy.DispyJob.Terminated, dispy.DispyJob.Cancelled,
                          dispy.DispyJob.Abandoned)):
        # 'pending_jobs' is shared between two threads, so access it with
        # 'jobs_cond' (see below)
        jobs_cond.acquire()
        if job.id: # job may have finished before 'main' assigned id
            pending_jobs.pop(job.id)
            # dispy.logger.info('job "%s" done with %s: %s', job.id, job.result, len(pending_jobs))
            if len(pending_jobs) <= lower_bound:
                jobs_cond.notify()
        jobs_cond.release()

if __name__ == '__main__':
    import dispy, threading, random

    # set lower and upper bounds as appropriate; assuming there are 30
    # processors in a cluster, bounds are set to 50 to 100
    lower_bound, upper_bound = 50, 100
    # use Condition variable to protect access to pending_jobs, as
    # 'job_status' is executed in another thread
    jobs_cond = threading.Condition()
    cluster = dispy.JobCluster(compute, job_status=job_status)
    pending_jobs = {}
    # submit 1000 jobs
    for i in range(1000):
        i += 1
        job = cluster.submit(random.uniform(3, 7))
        jobs_cond.acquire()
        # there is a chance the job may have finished and job_status called by
        # this time, so put it in 'pending_jobs' only if job is pending
        if job.status == dispy.DispyJob.Created or job.status == dispy.DispyJob.Running:
            pending_jobs[i] = job
            # dispy.logger.info('job "%s" submitted: %s', i, len(pending_jobs))
            if len(pending_jobs) >= upper_bound:
                while len(pending_jobs) > lower_bound:
                    jobs_cond.wait()
        jobs_cond.release()

    cluster.wait()
    cluster.print_status()
    cluster.close()

7.11. Long Running Computations

This example shows how computations can process KeyboardInterrupt exception raised by dispynode when a running job is cancelled. This can be used in various use cases, such as when cost of computation goes higher, client can cancel running jobs, causing computations to to save current state of computation and send the state to client so computations can be resumed later (when cost of computing comes down). KeyboardInterrupt feature is not supported if dispynode is running on Windows, but works with other operating systems, such as Linux, OS X; when dispynode is running on Windows, a running job is simply terminated when cancelled.

Another use case, shown below longrun.py, is optimizations where computations on their own can’t determine result of a computation (e.g., searching for global min/max value). In this case, computations can send (periodically) current best values to client using dispy_provisional_result feature. Client determines when such a result is acceptable and cancels jobs. It is also possible for client to cancel jobs at any time (e.g., as done here, when “quit” command is given). When a job is canceled, the computations can process KeyboardInterrupt exception to send current best value as result (which may be better than last best result sent).:

# computation runs until terminated and return appropriate "result"
def compute():
    import time, random
    # generate random numbers between 0 and 1 and return best value close to 1
    best = 0
    updated = 0  # when last provisional result was sent to client
    while 1:
        # when a job is canceled, first KeyboardInterrupt is raised in job and about 5 seconds
        # later, job is terminated. So jobs can process KeyboardInterrupt to take appropriate
        # action before job is terminated; in this case, best result is sent
        try:
            v = random.uniform(0, 1)
            if v > best:
                best = v
                # if last update was more than 10 seconds ago, send provisional result;
                # alternately, save current state in a file and send it to client
                # (with 'dispy_send_file'), e.g., to resume computations later
                if (time.time() - updated) > 10:
                    dispy_provisional_result(best)
                    updated = time.time()
            time.sleep(2)  # simulate computation
        except KeyboardInterrupt:  # job being terminated
            time.sleep(2)  # simulate further computation / update
            break
    return best


# process cluster status messages to submit jobs on initialized nodes
def cluster_status(status, node, job):
    global jobs
    if status == dispy.DispyJob.ProvisionalResult:
        if (1.0 - job.result) < 1e-2:  # enough accuracy, quit
            import signal
            if os.name == 'nt':
                signum = signal.CTRL_BREAK
            else:
                signum = signal.SIGINT
            os.kill(os.getpid(), signum)  # raises KeyboardInterrupt in '__main__'
    elif status == dispy.DispyNode.Initialized:
        print('node %s initialized with %s cpus' % (node.ip_addr, node.cpus))
        # submit jobs at this node
        for i in range(node.cpus):
            job = cluster.submit_node(node)
            if isinstance(job, dispy.DispyJob):
                jobs.add(job)
            else:
                print('failed to submit job at %s' % node.ip_addr)
    # elif job.status == dispy.DispyJob.Finished:
    #     print('job at %s computed: %s' % (job.ip_addr, job.result))
    elif status == dispy.DispyJob.Abandoned:
        print('%s failed: %s' % (job.ip_addr, job.exception))
        jobs.discard(job)


if __name__ == '__main__':
    import sys, os, dispy

    # although not required in this example, NodeAllocate is used here to allocate at most 2 CPUs
    class NodeAllocate(dispy.NodeAllocate):
        def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform=''):
            print('Allocate node for %s: %s / %s / %s' % (cluster.name, ip_addr, name, cpus))
            # use at most 2 cpus per node
            return min(cpus, 2)

    jobs = set()
    cluster = dispy.JobCluster(compute, nodes=[NodeAllocate(host='*')],
                               cluster_status=cluster_status)

    if sys.version_info.major < 3:
        input = raw_input

    while 1:
        try:
            cmd = input('Enter "quit" or "exit" or Ctrl-C to terminate: ').strip().lower()
            if cmd in ('quit', 'exit'):
                break
        except KeyboardInterrupt:
            break

    # cancel (terminate) jobs
    for job in jobs:
        cluster.cancel(job)

    best = 0
    for job in jobs:
        result = job()
        if job.status == dispy.DispyJob.Finished:
            print('job at %s computed %s' % (job.ip_addr, job.result))
            if result > best:
                best = result
    print('best estimate: %s' % best)
    cluster.print_status()
    exit(0)

7.12. Execute Different Computations

In above examples, all jobs of a cluster executes same computation with different data. If different computations are to be executed, then a delegate function can be used to run those computations. If the computations are known in advance, they can be sent with depends. If either the computations are not known when cluster is created (e.g., different computations executed depending on execution paths at client) or not efficient to send them all in the beginning, then dispy_job_depends can be used to send the function to be executed, as done in delegate.py:

# Simple program that distributes 'delegate' function' to execute different
# computations with jobs. In this example, client sends a function with each job
# and 'delegate' executes that function.

# 'delegate' is sent to nodes with cluster initialization
def delegate(func_name, n):
    # get function with given name (this function would've been sent with
    # 'dispy_job_depends' and available in global scope)
    func = globals()[func_name]
    return func(n)

# in this case two different functions (the only difference is in return value)
# are sent with jobs and executed at nodes (with 'delegate')
def func1(n):
    import time
    time.sleep(n)
    return (dispy_node_name + ': func1', n)

def func2(n):
    import time
    time.sleep(n)
    return (dispy_node_name + ': func2', n)

if __name__ == '__main__':
    # above functions can be sent with 'depends' so they are available for jobs
    # always; instead, here, requird function is sent with 'dispy_job_depends'
    # to illustrate how to send functions with 'submit' (dynamically)
    import dispy, random, time
    cluster = dispy.JobCluster(delegate, loglevel=dispy.logger.DEBUG)
    jobs = []
    for i in range(4):
        # run above functions (computations) alternately
        if i % 2 == 0:
            func = func1
        else:
            func = func2
        # send function with 'dispy_job_depends'; this function is specific to
        # this job - it is discarded when job is over
        job = cluster.submit(func.__name__, random.randint(5, 10), dispy_job_depends=[func])
        if not job:
            print('Failed to create job %s' % i)
            continue
        jobs.append(job)

    for job in jobs:
        host, n = job() # waits for job to finish and returns results
        print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
    cluster.print_status()
    dispy.logger.debug('jobs run: %s' % len(jobs))

Similar approach can be used to send computations if they are program (instead of Python functions).

7.13. Updating Nodes

Example node_update.py sends a program (node_update_script.py) to each node to be run once. The script in this case, as in other examples, sleeps for a bit to simulate execution. The script in real world use can, for example, update software, collect node status (such as file system usage, analyze logs etc.):

# 'cluster_status' function is called to indicate node / job status
# changes.
def cluster_status(status, node, job):
    global nodes
    if status == dispy.DispyNode.Initialized:
        print('Node %s with %s CPUs available' % (node.ip_addr, node.avail_cpus))
        nodes.add(node.ip_addr)
        if cluster.submit_node(node):
            pass
        else:
            print('Failed to submit job at %s' % node.ip_addr)

    elif status == dispy.DispyJob.Finished:
        if job.stderr or job.exception:
            print('Node %s failed: %s / %s' % (job.ip_addr, job.stderr, job.exception))
        else:
            print('Node %s finished' % (job.ip_addr))

    elif status == dispy.DispyJob.Terminated:
        print('Node %s failed: %s' % (job.ip_addr, job.stderr))

if __name__ == '__main__':
    import sys, os, time, dispy

    # usage: sys.argv[0] [<path to program to run on nodes>] [<number of nodes>]
    if len(sys.argv) < 2:
        script = os.path.join(os.path.dirname(sys.argv[0]), 'node_update_script.py')
    else:
        script = sys.argv[1]

    assert os.path.isfile(script)
    if len(sys.argv) > 2:
        num_nodes = int(sys.argv[2])
    else:
        num_nodes = 1

    nodes = set()
    cluster = dispy.JobCluster(script, cluster_status=cluster_status)
    while 1:
        # wait a bit for all nodes to be discovered
        time.sleep(5)
        if len(nodes) >= num_nodes:
            break
        cluster.discover_nodes(['*'])
    cluster.wait()
    cluster.print_status()
    cluster.close()

By default, above program runs ‘node_update_script.py’ script on one node and quits. The number of nodes in the network can be given as an argument, in which case, the program waits until that many nodes are discovered. While waiting to discover nodes, the program uses discover_nodes function to broadcast UDP discover messages (since UDP is lossy, repeated broadcasts may sometimes be useful). When given number of nodes have been used, the program quits. In real world use, alternate termination criteria, such as ‘exit’ command (as in Process Status Notifications) may be more appropriate.

7.14. Port Forwarding with SSH

If the nodes are on remote network and client is behind a firewall that can’t be configured to allow ports (default 9700-9702) from nodes to client, then ssh can be used for port forwarding (and security). In this case Amazon EC2 instances are used as remote nodes. To use this feature, use ssh to forward 9700 (default port used by client that the nodes send information to) with ssh -R 9700:localhost:9700 <remote-node>, perhaps to start dispynode. In the case of Amazon EC2, nodes can be configured to use public (external) IP address (in the range 54.x.x.x) in addition to local IP address (in the range 172.x.x.x). dispynode should be started to use external IP address with dispynode.py -i 54.204.242.185 so client and nodes can communicate. The client should also set its IP address to localhost (127.0.0.1) so the remote node will connect to client over ssh tunnel. sshportfw.py shows the configuration and steps:

def compute(n): # function sent to remote nodes for execution
    time.sleep(n)
    return n

if __name__ == '__main__':
    import dispy
    # list remote nodes (here Amazon EC2 instance with external IP 54.204.242.185)
    nodes = ['54.204.242.185']
    # use ssh to forward port 9700 for each node; e.g.,
    # 'ssh -R 9700:localhost:9700 54.204.242.185'

    # start dispynode on each node with 'dispynode.py -i 54.204.242.185' (so dispynode
    # uses external IP address instead of default local IP address)
    cluster = dispy.JobCluster(compute, nodes=nodes, host='127.0.0.1')
    jobs = []
    for i in range(1, 10):
        job = cluster.submit(i)
        jobs.append(job)
    for job in jobs:
        print('result: %s' % job())

7.15. Recover jobs / nodes

dispy’s cluster saves information necessary for fault-recovery of jobs and nodes in case client crashes or loses network connection to nodes after submitting jobs. Even if nodes detect that client is not reachable anymore (nodes and client periodically send pulse messages to check remote side is reachable), they will finish executing currently running jobs and save job results. These results can be retrieved at the client using dispy.recover_jobs function (see (Fault) Recover Jobs). To recover jobs (with the results, output etc.), following program can be used:

import dispy
jobs = dispy.recover_jobs()
for job in jobs:
    print('Job result: %s' % job.result)

This will wait for all running jobs (at the time client crashed) on all nodes and wait for the jobs to finish. A timeout can also be specified if necessary.

To recover nodes immediately, following program can be used:

import dispy
jobs = dispy.recover_jobs(timeout=1, terminate_pending=True)
for job in jobs:
    print('Job result: %s' % job.result)

This will wait up to a second (timeout value) and then request nodes to terminate any pending jobs (still running). The nodes then become available for new client.

7.16. Cluster Creation

Cluster creation can be customized for various use cases; some examples are:

  • cluster = dispy.JobCluster(compute, depends=[ClassA, moduleB, 'file1']) distributes ‘compute’ along with ClassA (Python object), moduleB (Python object) and ‘file1’, a file on client computer. Presumably ClassA, moduleB and file1 are needed by ‘compute’.

  • cluster = dispy.JobCluster(compute, nodes=['node20', '192.168.2.21', 'node24']) sends computation to nodes ‘node20’, ‘node24’ and node with IP address ‘192.168.2.21’. These nodes could be in different networks, as explicit names / IP addresses are listed.

  • If nodes are on remote network, then certain ports need to be forwarded as the nodes connect to the client to send status / results of jobs; see NAT/Firewall Forwarding. If port forwarding is not possible, then ssh tunneling can be used. To use this, ssh to each node with ssh -R 9700:127.0.0.1:9700 node (to possibly execute dispynode (Server) program on the node if not already running), then specify ext_host=127.0.0.1,nodes=[node] to JobCluster. If using more than one node, list them all in nodes. If client port 9700 is not usable, alternate port, say, 2345 can be forwarded with ssh -R 2345:127.0.0.1:2345 node (use JobCluster with ext_host=127.0.0.1,port=2345,nodes=[node]). See SSH Port Forwarding for more details.

  • cluster = dispy.JobCluster(compute, nodes=['192.168.2.*']) sends computation to all nodes whose IP address starts with ‘192.168.2’. In this case, it is assumed that ‘192.168.2’ is local network (since UDP broadcast is used to discover nodes in a network and broadcasting packets don’t cross networks).

  • cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.3.22',
    '172.16.11.22', 'node39', '192.168.2.*']) sends computation to nodes with IP addresses ‘192.168.3.5’, ‘192.168.3.22’, ‘172.16.11.22’ and node ‘node39’ (since explicit names / IP addresses are listed, they could be on different networks), all nodes whose IP address starts with ‘192.168.2’ (local network).

  • cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.3.*', '192.168.2.*']) In this case, dispy will send discovery messages to node with IP address ‘192.168.3.5’. If this node is running ‘dispynetrelay’, then all the nodes on that network are eligible for executing this computation, as wildcard ‘192.168.3.*’ matches IP addresses of those nodes. In addition, computation is also sent to all nodes whose IP address starts with ‘192.168.2’ (local network).

  • cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.8.20', '172.16.2.99', '*']) In this case, dispy will send discovery messages to nodes with IP address ‘192.168.3.5’, ‘192.168.8.20’ and ‘172.16.2.99’. If these nodes all are running dispynetrelay, then all the nodes on those networks are eligible for executing this computation, as wildcard * matches IP addresses of those nodes. In addition, computation is also sent to all nodes on local network (since they also match wildcard * and discovery message is broadcast on local network).

  • Assuming that 192.168.1.39 is the (private) IP address where dispy client is used, a.b.c.d is the (public) IP address of NAT firewall/gateway (that can be reached from outside) and dispynode is running at another public IP address e.f.g.h (so that a.b.c.d and e.f.g.h can communicate, but e.f.g.h can’t communicate with 192.168.1.39), cluster = dispy.JobCluster(compute, host='192.168.1.39', ext_host='a.b.c.d', nodes=['e.f.g.h']) would work if NAT firewall/gateway forwards TCP port 9700 to 192.168.1.39.

  • cluster = dispy.JobCluster(compute, secret='super') distributes ‘compute’ to nodes that also use secret ‘super’ (i.e., nodes started with dispynode.py -s super). Note that secret is used only for establishing communication, but not used to encrypt programs or code for python objects. This can be useful to prevent other users from (inadvertently) using the nodes. If encryption is needed, SSL can be used; see below.

  • cluster = dispy.JobCluster(compute, certfile='mycert', keyfile='mykey') distributes ‘compute’ and encrypts all communication using SSL certificate stored in ‘mycert’ file and key stored in ‘mykey’ file. In this case, dispynode must also use same certificate and key; i.e., each dispynode must be invoked with dispynode --certfile="mycert" --keyfile="mykey"'

    If both certificate and key are stored in same file, say, ‘mycertkey’, they are expected to be in certfile: cluster = dispy.JobCluster(compute, certfile='mycertkey')

  • cluster1 = dispy.JobCluster(compute1, nodes=['192.168.3.2', '192.168.3.5'])
    cluster2 = dispy.JobCluster(compute2, nodes=['192.168.3.10', '192.168.3.11'])
    distribute ‘compute1’ to nodes 192.168.3.2 and 192.168.3.5, and ‘compute2’ to nodes 192.168.3.10 and 192.168.3.11. With this setup, specific computations can be scheduled on certain node(s).

7.17. MapReduce

A simple version of word count example from MapReduce:

# a version of word frequency example from mapreduce tutorial

def mapper(doc):
    # input reader and map function are combined
    import os
    words = []
    with open(os.path.join('/tmp', doc)) as fd:
        for line in fd:
            words.extend((word.lower(), 1) for word in line.split() \
                         if len(word) > 3 and word.isalpha())
    return words

def reducer(words):
    # we should generate sorted lists which are then merged,
    # but to keep things simple, we use dicts
    word_count = {}
    for word, count in words:
        if word not in word_count:
            word_count[word] = 0
        word_count[word] += count
    # print('reducer: %s to %s' % (len(words), len(word_count)))
    return word_count

if __name__ == '__main__':
    import dispy
    # assume nodes node1 and node2 have 'doc1', 'doc2' etc. on their
    # local storage, so no need to transfer them
    map_cluster = dispy.JobCluster(mapper, nodes=['node1', 'node2'], reentrant=True)
    # any node can work on reduce
    reduce_cluster = dispy.JobCluster(reducer, nodes=['*'], reentrant=True)
    map_jobs = []
    for f in ['doc1', 'doc2', 'doc3', 'doc4', 'doc5']:
        job = map_cluster.submit(f)
        map_jobs.append(job)
    reduce_jobs = []
    for map_job in map_jobs:
        words = map_job()
        if not words:
            print(map_job.exception)
            continue
        # simple partition
        n = 0
        while n < len(words):
            m = min(len(words) - n, 1000)
            reduce_job = reduce_cluster.submit(words[n:n+m])
            reduce_jobs.append(reduce_job)
            n += m
    # reduce
    word_count = {}
    for reduce_job in reduce_jobs:
        words = reduce_job()
        if not words:
            print(reduce_job.exception)
            continue
        for word, count in words.iteritems():
            if word not in word_count:
                word_count[word] = 0
            word_count[word] += count
    # sort words by frequency and print
    for word in sorted(word_count, key=lambda x: word_count[x], reverse=True):
        count = word_count[word]
        print(word, count)
    reduce_cluster.print_status()