7. Examples
dispy can be used to distribute standalone programs or Python program fragments (functions, classes, modules) and files (programs, data) to nodes and execute jobs in parallel. It supports various options to handle rather comprehensive use cases (such as fault tolerance, sharing nodes among multiple programs simultaneously, using nodes in multiple networks etc.); however, in common setups the usage is simple, as shown in the examples below. Some of these examples are also available in the ‘examples’ directory where dispy is installed; the path to that directory can be obtained with:
import os, dispy
print(os.path.join(os.path.dirname(dispy.__file__), 'examples'))
Short summary of examples (follow the links for details):
Command-line uses dispy as a command-line tool to distribute and run a standalone program.
Python script is a simple Python program that distributes and runs a standalone program.
Canonical program distributes and executes a Python function with different parameters. The program also prints some of the attributes of each job after it finishes (such as the job’s result and start time).
Distributing objects shows how to send Python objects (class instances) from the client program to remote nodes to execute a Python function (which takes those objects as arguments).
In-memory processing is one use of the setup (and cleanup) feature to initialize nodes before executing jobs. In this case, setup is used to load data from files transferred from the client into memory (global variables); each job then uses the data (read-only) in these variables for in-memory processing (i.e., it accesses data already in memory instead of reading data from a file in each job).
Replace in-memory data is a variation of In-memory processing. In this example, the resetup_node feature is used to replace the in-memory data set dynamically so different data sets can be processed.
Updating globals shows how to use Python’s sharedctypes to initialize (using the setup feature as above) a global variable and then update it in jobs so that all jobs on a node see the updated value.
Sending files to client illustrates how jobs can use the dispy_send_file function to send files to the client.
Process status notifications uses the cluster_status feature to process each job’s result as it finishes.
Efficient job submission uses the job_status feature to submit just enough jobs to keep the scheduler using all CPUs in the cluster, instead of submitting all jobs at once, which can cause memory problems.
Long running computations shows use cases for handling KeyboardInterrupt when jobs are being terminated. In this case, computations send the latest (best) value as the result of the computation.
Execute different computations demonstrates how to execute different computations with the same cluster.
Update nodes sends a script to each node to be executed once. The script can be customized to run routine tasks such as sending node status, updating software packages etc.
Port forwarding with SSH uses ssh to forward a port in order to use nodes in a remote network (in this case, an Amazon EC2 cloud instance).
Recover jobs / nodes shows how to recover jobs and nodes after a client crashes or loses network connection to nodes.
Cluster creation lists various ways to create clusters for different configurations.
MapReduce is a simple implementation of the well-known map-reduce pattern.
7.1. Command-Line
dispy can be used as a command line tool (for simple cases, e.g., scheduling cron jobs); in this case the computations can only be programs and the dependencies can only be files:
dispy_cmd.py -f /some/file1 -f file2 -a "arg11 arg12" -a "arg21 arg22" -a "arg3" /some/program
will distribute ‘/some/program’ with dependencies ‘/some/file1’ and ‘file2’ and then execute ‘/some/program’ in parallel with 3 instances: a) arg11 and arg12 (two arguments to the program), b) arg21 and arg22 (two arguments), and c) arg3 (one argument).
7.2. Python Script
A simple client program that distributes a program (say, ‘/path/to/program’) and executes it with a sequence of numbers as arguments is:
import dispy
cluster = dispy.JobCluster('/path/to/program')
for i in range(50):
    cluster.submit(i)
The program ‘/path/to/program’ on the client computer is transferred to each of the nodes, so if the program is a binary then all the nodes should have the same architecture as the client.
In the cases above we assume that the programs execute and save the computation results in a database, file system etc. If we are interested in the exit status, output from each run etc., then we need to collect the submitted jobs, from which the attributes of interest can be retrieved, as done in the example below.
7.3. Canonical Program
A canonical program that distributes a computation ‘compute’ (a Python function) to nodes (running dispynode (Server) on a local network), schedules jobs with the cluster, gets the jobs’ results and prints them is sample.py:
# function 'compute' is distributed and executed with arguments
# supplied with 'cluster.submit' below
def compute(n):
    import time, socket
    time.sleep(n)
    host = socket.gethostname()
    return (host, n)

if __name__ == '__main__':
    # executed on client only; variables created below, including modules imported,
    # are not available in job computations
    import dispy, random
    # distribute 'compute' to nodes; in this case, 'compute' does not have
    # any dependencies to run on nodes
    cluster = dispy.JobCluster(compute)
    # run 'compute' with 20 random numbers on available CPUs
    jobs = []
    for i in range(20):
        job = cluster.submit(random.randint(5, 20))
        jobs.append(job)
    # cluster.wait()  # waits until all jobs finish
    for job in jobs:
        host, n = job()  # waits for job to finish and returns results
        print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
        # other fields of 'job' that may be useful:
        # job.stdout, job.stderr, job.exception, job.ip_addr, job.end_time
    cluster.print_status()  # shows which nodes executed how many jobs etc.
7.4. Distributing Objects
If the computation has any dependencies, such as classes, objects or files, they can be specified with the ‘depends’ argument and dispy will distribute them along with the computation.
Continuing with trivial but illustrative examples, the program obj_instances.py below distributes a computation to be executed with instances of a class:
class C(object):
    def __init__(self, i, n):
        self.i = i
        self.n = n

    def show(self):
        print('%s: %.2f' % (self.i, self.n))

def compute(obj):
    # obj is an instance of C
    import time
    time.sleep(obj.n)
    obj.show()  # the output is stored in job.stdout
    return obj.n

if __name__ == '__main__':
    import random, dispy
    # 'compute' needs definition of class C, so include in 'depends'
    cluster = dispy.JobCluster(compute, depends=[C])
    jobs = []
    for i in range(10):
        c = C(i, random.uniform(1, 3))  # create object of C
        job = cluster.submit(c)  # it is sent to a node for executing 'compute'
        job.id = c  # store this object for later use
        jobs.append(job)
    for job in jobs:
        job()  # wait for job to finish
        print('%s: %.2f / %s' % (job.id.i, job.result, job.stdout))
Note that class C is given in ‘depends’ so the code for it is transferred to the nodes automatically and the objects created in the client program work transparently in ‘compute’ on remote nodes. The objects are serialized using Pickle and sent over the network to the nodes, so the objects must be serializable. If they are not serializable (e.g., they contain references to locks), then the class must provide __getstate__ and __setstate__ methods; see Python object serialization for details. In addition, the objects shouldn’t contain file descriptors, references to objects not being transferred etc., as these are not valid on remote nodes.
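For example, if an object holds a lock, __getstate__ can exclude the lock from the pickled state and __setstate__ can recreate it on the remote node. A minimal sketch (this class is illustrative only and is not part of dispy or the examples above):

import threading

class Tracker(object):
    def __init__(self, start=0):
        self.value = start
        self.lock = threading.Lock()  # locks can't be pickled

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['lock']  # exclude the unpicklable lock from serialized state
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.lock = threading.Lock()  # recreate the lock when deserialized on a node

Instances of such a class can then be passed to ‘cluster.submit’ (with the class given in ‘depends’) like any other argument.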
7.5. In-memory Processing
setup and cleanup parameters to cluster can be used to initialize/de-initialize a node for running jobs of that computation, e.g., to manipulate transferred files, read data into memory so jobs can process data efficiently, set/unset global variables on that node.
In the example below, the setup function is used to read data from a file transferred from the client into memory on each node; jobs that are executed later use the data in memory (for in-memory processing) instead of reading from the file in every job. This feature works with Posix systems (Linux, OS X and other Unix variants) without limitations - any data can be assigned to variables declared global in setup, as the operating system’s fork is used to create the child process, which shares the address space of the parent process (where the setup function is executed) with copy-on-write. This feature can be used, for example, to read a large amount of data from file(s) so computations (jobs) can directly access the data in memory, instead of reading the same data from the file each time.
Under Windows, though, fork is not available, so the global variables are serialized and passed to the child process (see multiprocessing’s Programming guidelines for Windows). Thus, for example, modules can’t be loaded in global scope with setup under Windows. Moreover, as each job runs with a copy of all the global variables, initializing objects that require a lot of memory may not be possible / efficient (compared to Posix systems where objects are not copied). See pycos’s distributed communicating processes for an alternate approach that doesn’t have these limitations.
The setup function in the program node_setup.py below reads the data in a file (transferred with depends) into a global variable. The jobs compute checksums of that data in memory. The cleanup function deletes the global variables:
def setup(data_file):
    # read data in file to global variable
    global data, algorithms, hashlib, time, file_name

    import hashlib, time, sys
    data = open(data_file).read()  # read file into memory; data_file can now be deleted
    file_name = data_file
    if sys.version_info.major > 2:
        data = data.encode()  # convert to bytes
        algorithms = list(hashlib.algorithms_guaranteed)
    else:
        algorithms = hashlib.algorithms
    # if running under Windows, modules can't be global, as they are not
    # serializable; instead, they must be loaded in 'compute' (jobs); under
    # Posix (Linux, OS X and other Unix variants), modules declared global in
    # 'setup' will be available in 'compute'

    # 'os' module is already available (loaded by dispynode)
    if os.name == 'nt':  # remove modules under Windows
        del hashlib, time
    return 0

# 'cleanup' should have the same arguments as 'setup'
def cleanup(data_file):
    global data, algorithms, hashlib, time, file_name
    del data, algorithms, file_name
    if os.name != 'nt':
        del hashlib, time

def compute(n):
    global hashlib, time
    if os.name == 'nt':  # under Windows modules must be loaded in jobs
        import hashlib, time
    # 'data' and 'algorithms' global variables are initialized in 'setup'
    alg = algorithms[n % len(algorithms)]
    csum = getattr(hashlib, alg)()
    csum.update(data)
    time.sleep(2)
    return (dispy_node_ip_addr, file_name, alg, csum.hexdigest())

if __name__ == '__main__':
    import dispy, sys, os, glob, time

    # each node processes a file in 'data_files' with 'NodeAllocate.allocate'
    data_files = glob.glob(os.path.join(os.path.dirname(sys.argv[0]), '*.py'))
    node_id = 0

    # sub-class NodeAllocate to use node (and computation) specific 'depends'
    # and 'setup_args'
    class NodeAllocate(dispy.NodeAllocate):
        def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform='*'):
            global node_id
            data_file = data_files[node_id % len(data_files)]
            node_id += 1
            print('Node %s (%s) processes "%s"' % (ip_addr, name, data_file))
            self.depends = [data_file]  # 'depends' must be a list
            self.setup_args = (data_file,)  # 'setup_args' must be a tuple
            return cpus

    cluster = dispy.JobCluster(compute, nodes=[NodeAllocate('*')], setup=setup, cleanup=cleanup)
    # wait until (enough) nodes are setup; alternately, 'cluster_status' can
    # be used to process nodes found
    time.sleep(5)
    jobs = []
    for n in range(10):
        job = cluster.submit(n)
        jobs.append(job)
    for job in jobs:
        job()
        if job.status == dispy.DispyJob.Finished:
            print('%s: "%s" %s: %s' % job.result)
        else:
            print(job.exception)
    cluster.print_status()
    cluster.close()
The above program is written to work under Posix (Linux, OS X, etc.) as well as Windows. If running under Posix only, the computation can use ‘hashlib’ as a global variable (initialized in the ‘setup’ function) to avoid loading it in jobs.
7.6. Replace In-memory Data
As illustrated in In-memory processing, a computation’s setup function can store data in global variables which are accessible in all computation jobs. If it is required to replace this in-memory data (e.g., if there are more data sets than available nodes, so all data can’t be loaded at once), the resetup_node method can be used by clients to re-initialize nodes. Although this feature can be used for any purpose, in this example it is used for changing in-memory data. Since the in-memory data feature doesn’t work with Windows, this example doesn’t work under Windows.
The program replace_inmem.py below is a minor variation of In-memory processing:
def setup(data_file, n):
    global data, algorithms, hashlib, file_name

    import hashlib, os, sys
    data = open(data_file, 'rb').read()  # read data in file to global variable
    os.remove(data_file)  # data_file can now be deleted
    file_name = data_file
    if sys.version_info.major > 2:
        algorithms = list(hashlib.algorithms_guaranteed)
    else:
        algorithms = hashlib.algorithms
    algorithms = [alg for alg in algorithms if (not alg.startswith('shake'))]
    return 0

def cleanup(data_file, n):
    global data, algorithms, hashlib, file_name
    del data, algorithms, file_name

def compute(i, n):
    import time
    # 'hashlib', 'data' and 'algorithms' global variables are initialized in 'setup'
    alg = algorithms[i % len(algorithms)]
    csum = getattr(hashlib, alg)()
    csum.update(data)
    time.sleep(n)
    return (dispy_node_ip_addr, file_name, alg, csum.hexdigest())

def job_status(job):
    if job.status == dispy.DispyJob.Finished:
        print('\njob %s finished by %s, %s of %s is %s' % (job.id, job.result[0], job.result[2],
                                                           job.result[1], job.result[3]))
    else:
        print('\njob %s failed: %s' % (job.id, job.exception))

if __name__ == '__main__':
    import dispy, sys, os, glob, random

    data_files = glob.glob(os.path.join(os.path.abspath(os.path.dirname(sys.argv[0])), '*.py'))
    file_id = 0
    nodes = {}

    class NodeAllocate(dispy.NodeAllocate):
        def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform='*'):
            global file_id
            if len(nodes) == 2:
                # use at most 2 nodes, to illustrate this example
                return 0
            if platform.startswith('Windows'):
                # In-memory is not supported with Windows
                print('Ignoring node %s as in-memory data is not supported with Windows' % ip_addr)
                return 0
            data_file = data_files[file_id % len(data_files)]
            file_id += 1
            nodes[ip_addr] = data_file
            print('Node %s (%s) processes "%s"' % (ip_addr, name, data_file))
            self.setup_args = (data_file, file_id)
            self.depends = [data_file]
            return min(cpus, 2)  # use at most 2 cpus for illustration

    cluster = dispy.JobCluster(compute, nodes=[NodeAllocate(host='*')],
                               setup=setup, cleanup=cleanup, job_status=job_status,
                               loglevel=dispy.logger.DEBUG)
    for i in range(1, 7):
        job = cluster.submit(i, random.uniform(2, 5))
        job.id = i
    cluster.wait()  # not needed if jobs have been scheduled at node
    for ip_addr in nodes:
        cluster.resetup_node(ip_addr)  # re-initialize node (run 'cleanup', then 'setup' with new args)
    for i in range(i+1, i+7):
        job = cluster.submit(i, random.uniform(2, 5))
        job.id = i
    cluster.wait()
    cluster.print_status()
    cluster.close()
When resetup_node is called, that node will run the cleanup function (after all currently running jobs have finished), if one is given, without releasing itself from the current scheduler or removing any files left behind (so the new setup function and jobs can use those files). The scheduler will then reallocate that node by calling the allocate method of the NodeAllocate instance. If that method allocates the node, the setup_args and depends of that instance are used for initializing it (by transferring depends) and calling the setup function with the new setup_args. In the above example, the allocate method allocates at most 2 CPUs so each node runs at most 2 jobs simultaneously. It also rejects any nodes running under Windows, and sets setup_args and depends to process files sequentially. Since the allocate method only allocates 2 nodes, the example forces a new setup to run, loading different files in memory at those nodes.
7.7. Updating Globals
The example above creates global variables that can be read (but not updated) in jobs. setup and cleanup can also be used to create global variables that can be updated in jobs on a node (but not by jobs on other nodes) using the multiprocessing module’s Sharing state between processes.
The program node_shvars.py below creates an integer shared variable that is updated by jobs running on that node:
def setup():
    import multiprocessing, multiprocessing.sharedctypes
    global shvar
    lock = multiprocessing.Lock()
    shvar = multiprocessing.sharedctypes.Value('i', 1, lock=lock)
    return 0

def cleanup():
    global shvar
    del shvar

def compute():
    import random
    r = random.randint(1, 10)
    global shvar
    shvar.value += r  # update 'shvar'; all jobs on this node will see updated value
    return shvar.value

if __name__ == '__main__':
    import dispy
    cluster = dispy.JobCluster(compute, setup=setup, cleanup=cleanup)
    jobs = []
    for n in range(10):
        job = cluster.submit()
        jobs.append(job)
    for job in jobs:
        job()
        if job.status != dispy.DispyJob.Finished:
            print('job %s failed: %s' % (job.id, job.exception))
        else:
            print('%s: %s' % (job.id, job.result))
    cluster.print_status()
    cluster.close()
See pycos’s Distributed Communicating Processes for an alternate approach, where setup can be used to initialize any data that can be updated in computations without any limitations, even under Windows. With pycos, computations have to be tasks and the client has to implement scheduling of tasks.
7.8. Sending Files to Client
dispy_send_file (see Transferring Files) can be used to transfer file(s) to the client. Assume that the computation creates a file named with the parameter given (in this case n) so different runs create different files (otherwise, files sent by one computation will overwrite files sent by other computations). Such files can be sent to the client with:
def compute(n):
    import time
    time.sleep(n)
    # assume that computation saves data in file 'n.dat'
    dispy_send_file(str(n) + '.dat')  # send file to client
    # ... continue further computations
    return n

if __name__ == '__main__':
    import dispy, random
    cluster = dispy.JobCluster(compute)
    jobs = []
    for i in range(20):
        n = random.randint(5, 20)
        job = cluster.submit(n)
        job.id = n  # job id is used below to refer to the file sent by the job
        jobs.append(job)
    for job in jobs:
        job()
        print('job %s results in file %s' % (job.id, str(job.id) + '.dat'))
If the client needs to process the files as soon as they are transferred, the Provisional/Intermediate Results feature along with job_status can be used to notify the client, as in the sketch below.
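A minimal sketch of that idea (assuming the computation above also calls dispy_provisional_result(n) right after dispy_send_file, so that job.result carries n when the client is notified; the callback name is illustrative):

def process_file(job):  # passed as 'job_status=process_file' to JobCluster
    import os
    # 'dispy' is available here as it is imported at module level in '__main__' above
    if job.status == dispy.DispyJob.ProvisionalResult:
        path = str(job.result) + '.dat'  # the file sent by this job
        if os.path.isfile(path):
            print('processing %s (%s bytes)' % (path, os.path.getsize(path)))
            # ... process the file while the job continues on the node

With such a callback, each file is handled as soon as it arrives, instead of after all jobs finish.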
7.9. Process Status Notifications
The submit_node method and cluster_status notification can be used to schedule jobs with full control over when / which node executes a job. The program job_scheduler.py below schedules a job to compute the sha1 checksum of data files whenever a processor is available:
# job computation runs at dispynode servers
def compute(path):
    import hashlib, time, os
    csum = hashlib.sha1()
    with open(os.path.basename(path), 'rb') as fd:
        while True:
            data = fd.read(1024000)
            if not data:
                break
            csum.update(data)
    time.sleep(5)
    return csum.hexdigest()

# 'cluster_status' function. It is invoked to notify of changes
# in node / job status. Here node initialization and
# job done status are used to schedule jobs, so at most one job is
# running on a node (even if a node has more than one processor). Data
# files are assumed to be 'data000', 'data001' etc.
def status_cb(status, node, job):
    if status == dispy.DispyJob.Finished:
        print('sha1sum for %s: %s' % (job.id, job.result))
    elif status == dispy.DispyJob.Terminated:
        print('sha1sum for %s failed: %s' % (job.id, job.exception))
    elif status == dispy.DispyNode.Initialized:
        print('node %s with %s CPUs available' % (node.ip_addr, node.avail_cpus))
    else:  # ignore other status messages
        return

    global submitted
    data_file = 'data%03d' % submitted
    if os.path.isfile(data_file):
        submitted += 1
        # 'node' and 'dispy_job_depends' are consumed by dispy;
        # 'compute' is called with only 'data_file' as argument(s)
        job = cluster.submit_node(node, data_file, dispy_job_depends=[data_file])
        job.id = data_file

if __name__ == '__main__':
    import dispy, sys, os
    cluster = dispy.JobCluster(compute, cluster_status=status_cb)
    submitted = 0
    while True:
        try:
            cmd = sys.stdin.readline().strip().lower()
        except KeyboardInterrupt:
            break
        if cmd == 'quit' or cmd == 'exit':
            break
    cluster.wait()
    cluster.print_status()
7.10. Efficient Job Submission
When a job is submitted to a cluster, the arguments are kept in the DispyJob structure returned. These arguments are used to resubmit the job if a node fails (if the computation is reentrant). When a large number of jobs is submitted, especially with large data in arguments, the memory required to keep the arguments (at the client) may become an issue.
Note also that in many examples, jobs are kept in a list and processed in sequence later, without removing jobs from the list. This is acceptable when job arguments don’t require too much memory, but not suitable otherwise (as arguments are kept even after jobs are done). It may be better to keep jobs in a dictionary (using a unique index that is also set as the job ID) and remove each job from this dictionary as soon as it is done.
The program bounded_submit.py below uses the job_status feature to keep the scheduler pipeline busy without submitting all jobs at once. The variables lower_bound and upper_bound control the number of jobs that are scheduled at any time. These can be adjusted as suggested, or dynamically updated with either the NodeAllocate or cluster_status feature:
def compute(n):  # executed on nodes
    import time
    time.sleep(n)
    return n

# dispy calls this function to indicate change in job status
def job_status(job):  # executed at the client
    global pending_jobs, jobs_cond
    if (job.status == dispy.DispyJob.Finished  # most usual case
        or job.status in (dispy.DispyJob.Terminated, dispy.DispyJob.Cancelled,
                          dispy.DispyJob.Abandoned)):
        # 'pending_jobs' is shared between two threads, so access it with
        # 'jobs_cond' (see below)
        jobs_cond.acquire()
        if job.id:  # job may have finished before 'main' assigned id
            pending_jobs.pop(job.id)
            # dispy.logger.info('job "%s" done with %s: %s', job.id, job.result, len(pending_jobs))
            if len(pending_jobs) <= lower_bound:
                jobs_cond.notify()
        jobs_cond.release()

if __name__ == '__main__':
    import dispy, threading, random

    # set lower and upper bounds as appropriate; assuming there are 30
    # processors in a cluster, bounds are set to 50 to 100
    lower_bound, upper_bound = 50, 100
    # use Condition variable to protect access to pending_jobs, as
    # 'job_status' is executed in another thread
    jobs_cond = threading.Condition()
    cluster = dispy.JobCluster(compute, job_status=job_status)
    pending_jobs = {}
    # submit 1000 jobs
    for i in range(1000):
        i += 1
        job = cluster.submit(random.uniform(3, 7))
        jobs_cond.acquire()
        job.id = i  # associate an id with the job so 'job_status' can track it
        # there is a chance the job may have finished and job_status called by
        # this time, so put it in 'pending_jobs' only if job is pending
        if job.status == dispy.DispyJob.Created or job.status == dispy.DispyJob.Running:
            pending_jobs[i] = job
            # dispy.logger.info('job "%s" submitted: %s', i, len(pending_jobs))
            if len(pending_jobs) >= upper_bound:
                while len(pending_jobs) > lower_bound:
                    jobs_cond.wait()
        jobs_cond.release()
    cluster.wait()
    cluster.print_status()
    cluster.close()
7.11. Long Running Computations
This example shows how computations can process the KeyboardInterrupt exception raised by dispynode when a running job is cancelled. This can be used in various use cases; for example, when the cost of computing goes up, the client can cancel running jobs, causing computations to save their current state and send that state to the client so they can be resumed later (when the cost of computing comes down). The KeyboardInterrupt feature is not supported if dispynode is running on Windows, but works with other operating systems, such as Linux and OS X; when dispynode is running on Windows, a running job is simply terminated when cancelled.
Another use case, shown below in longrun.py, is optimizations where computations on their own can’t determine the result of a computation (e.g., searching for a global min/max value). In this case, computations can (periodically) send current best values to the client using the dispy_provisional_result feature. The client determines when such a result is acceptable and cancels the jobs. It is also possible for the client to cancel jobs at any time (e.g., as done here, when the “quit” command is given). When a job is cancelled, the computation can process the KeyboardInterrupt exception to send the current best value as the result (which may be better than the last best result sent):
# computation runs until terminated and returns appropriate "result"
def compute():
    import time, random
    # generate random numbers between 0 and 1 and return best value close to 1
    best = 0
    updated = 0  # when last provisional result was sent to client
    while 1:
        # when a job is canceled, first KeyboardInterrupt is raised in job and about 5 seconds
        # later, job is terminated. So jobs can process KeyboardInterrupt to take appropriate
        # action before job is terminated; in this case, best result is sent
        try:
            v = random.uniform(0, 1)
            if v > best:
                best = v
                # if last update was more than 10 seconds ago, send provisional result;
                # alternately, save current state in a file and send it to client
                # (with 'dispy_send_file'), e.g., to resume computations later
                if (time.time() - updated) > 10:
                    dispy_provisional_result(best)
                    updated = time.time()
            time.sleep(2)  # simulate computation
        except KeyboardInterrupt:  # job being terminated
            time.sleep(2)  # simulate further computation / update
            break
    return best

# process cluster status messages to submit jobs on initialized nodes
def cluster_status(status, node, job):
    global jobs
    if status == dispy.DispyJob.ProvisionalResult:
        if (1.0 - job.result) < 1e-2:  # enough accuracy, quit
            import signal
            if os.name == 'nt':
                signum = signal.CTRL_BREAK_EVENT
            else:
                signum = signal.SIGINT
            os.kill(os.getpid(), signum)  # raises KeyboardInterrupt in '__main__'
    elif status == dispy.DispyNode.Initialized:
        print('node %s initialized with %s cpus' % (node.ip_addr, node.cpus))
        # submit jobs at this node
        for i in range(node.cpus):
            job = cluster.submit_node(node)
            if isinstance(job, dispy.DispyJob):
                jobs.add(job)
            else:
                print('failed to submit job at %s' % node.ip_addr)
    # elif status == dispy.DispyJob.Finished:
    #     print('job at %s computed: %s' % (job.ip_addr, job.result))
    elif status == dispy.DispyJob.Abandoned:
        print('%s failed: %s' % (job.ip_addr, job.exception))
        jobs.discard(job)

if __name__ == '__main__':
    import sys, os, dispy

    # although not required in this example, NodeAllocate is used here to allocate at most 2 CPUs
    class NodeAllocate(dispy.NodeAllocate):
        def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform=''):
            print('Allocate node for %s: %s / %s / %s' % (cluster.name, ip_addr, name, cpus))
            # use at most 2 cpus per node
            return min(cpus, 2)

    jobs = set()
    cluster = dispy.JobCluster(compute, nodes=[NodeAllocate(host='*')],
                               cluster_status=cluster_status)
    if sys.version_info.major < 3:
        input = raw_input
    while 1:
        try:
            cmd = input('Enter "quit" or "exit" or Ctrl-C to terminate: ').strip().lower()
            if cmd in ('quit', 'exit'):
                break
        except KeyboardInterrupt:
            break
    # cancel (terminate) jobs
    for job in jobs:
        cluster.cancel(job)
    best = 0
    for job in jobs:
        result = job()
        if job.status == dispy.DispyJob.Finished:
            print('job at %s computed %s' % (job.ip_addr, job.result))
            if result > best:
                best = result
    print('best estimate: %s' % best)
    cluster.print_status()
    exit(0)
7.12. Execute Different Computations
In the above examples, all jobs of a cluster execute the same computation with different data. If different computations are to be executed, then a delegate function can be used to run those computations. If the computations are known in advance, they can be sent with depends. If either the computations are not known when the cluster is created (e.g., different computations are executed depending on execution paths at the client) or it is not efficient to send them all in the beginning, then dispy_job_depends can be used to send the function to be executed, as done in delegate.py:
# Simple program that distributes 'delegate' function to execute different
# computations with jobs. In this example, client sends a function with each job
# and 'delegate' executes that function.

# 'delegate' is sent to nodes with cluster initialization
def delegate(func_name, n):
    # get function with given name (this function would've been sent with
    # 'dispy_job_depends' and available in global scope)
    func = globals()[func_name]
    return func(n)

# in this case two different functions (the only difference is in return value)
# are sent with jobs and executed at nodes (with 'delegate')
def func1(n):
    import time
    time.sleep(n)
    return (dispy_node_name + ': func1', n)

def func2(n):
    import time
    time.sleep(n)
    return (dispy_node_name + ': func2', n)

if __name__ == '__main__':
    # above functions can be sent with 'depends' so they are available for jobs
    # always; instead, here, required function is sent with 'dispy_job_depends'
    # to illustrate how to send functions with 'submit' (dynamically)
    import dispy, random, time
    cluster = dispy.JobCluster(delegate, loglevel=dispy.logger.DEBUG)
    jobs = []
    for i in range(4):
        # run above functions (computations) alternately
        if i % 2 == 0:
            func = func1
        else:
            func = func2
        # send function with 'dispy_job_depends'; this function is specific to
        # this job - it is discarded when job is over
        job = cluster.submit(func.__name__, random.randint(5, 10), dispy_job_depends=[func])
        if not job:
            print('Failed to create job %s' % i)
            continue
        jobs.append(job)
    for job in jobs:
        host, n = job()  # waits for job to finish and returns results
        print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
    cluster.print_status()
    dispy.logger.debug('jobs run: %s' % len(jobs))
A similar approach can be used to send computations if they are programs (instead of Python functions).
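A minimal sketch of that idea (not one of the distributed examples; the script names and the 'run_program' wrapper are hypothetical): each job sends its own Python script with dispy_job_depends, and a small wrapper function runs it with subprocess from the job's working directory on the node, where transferred dependencies are placed (as in the checksum example above):

def run_program(prog, n):  # executed on nodes; runs the per-job script
    import subprocess, sys, os
    # files sent with 'dispy_job_depends' are placed in the job's working
    # directory on the node, so only the base name is needed
    path = os.path.basename(prog)
    out = subprocess.check_output([sys.executable, path, str(n)])
    return out.decode()

if __name__ == '__main__':
    import dispy
    cluster = dispy.JobCluster(run_program)
    jobs = []
    for i, prog in enumerate(['task_a.py', 'task_b.py']):  # hypothetical scripts
        # each script is transferred only for its own job
        job = cluster.submit(prog, i, dispy_job_depends=[prog])
        jobs.append(job)
    for job in jobs:
        print(job())  # output of the script run by the job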
7.13. Updating Nodes
The example node_update.py sends a program (node_update_script.py) to each node to be run once. The script in this case, as in other examples, sleeps for a bit to simulate execution. In real world use the script can, for example, update software, collect node status (such as file system usage), analyze logs etc.:
# 'cluster_status' function is called to indicate node / job status changes
def cluster_status(status, node, job):
    global nodes
    if status == dispy.DispyNode.Initialized:
        print('Node %s with %s CPUs available' % (node.ip_addr, node.avail_cpus))
        nodes.add(node.ip_addr)
        if cluster.submit_node(node):
            pass
        else:
            print('Failed to submit job at %s' % node.ip_addr)
    elif status == dispy.DispyJob.Finished:
        if job.stderr or job.exception:
            print('Node %s failed: %s / %s' % (job.ip_addr, job.stderr, job.exception))
        else:
            print('Node %s finished' % (job.ip_addr))
    elif status == dispy.DispyJob.Terminated:
        print('Node %s failed: %s' % (job.ip_addr, job.stderr))

if __name__ == '__main__':
    import sys, os, time, dispy

    # usage: sys.argv[0] [<path to program to run on nodes>] [<number of nodes>]
    if len(sys.argv) < 2:
        script = os.path.join(os.path.dirname(sys.argv[0]), 'node_update_script.py')
    else:
        script = sys.argv[1]
    assert os.path.isfile(script)
    if len(sys.argv) > 2:
        num_nodes = int(sys.argv[2])
    else:
        num_nodes = 1
    nodes = set()
    cluster = dispy.JobCluster(script, cluster_status=cluster_status)
    while 1:
        # wait a bit for all nodes to be discovered
        time.sleep(5)
        if len(nodes) >= num_nodes:
            break
        cluster.discover_nodes(['*'])
    cluster.wait()
    cluster.print_status()
    cluster.close()
By default, the above program runs ‘node_update_script.py’ on one node and quits. The number of nodes in the network can be given as an argument, in which case the program waits until that many nodes are discovered. While waiting to discover nodes, the program uses the discover_nodes function to broadcast UDP discovery messages (since UDP is lossy, repeated broadcasts may sometimes be useful). When the given number of nodes have been used, the program quits. In real world use, an alternate termination criterion, such as an ‘exit’ command (as in Process Status Notifications), may be more appropriate.
7.14. Port Forwarding with SSH
If the nodes are on a remote network and the client is behind a firewall that can’t be configured to allow ports (default 9700-9702) from nodes to the client, then ssh can be used for port forwarding (and security). In this case Amazon EC2 instances are used as remote nodes. To use this feature, use ssh to forward port 9700 (the default port used by the client that the nodes send information to) with ssh -R 9700:localhost:9700 <remote-node>, perhaps also starting dispynode in that session. In the case of Amazon EC2, nodes can be configured to use a public (external) IP address (in the range 54.x.x.x) in addition to the local IP address (in the range 172.x.x.x). dispynode should be started to use the external IP address with dispynode.py -i 54.204.242.185 so client and nodes can communicate. The client should also set its IP address to localhost (127.0.0.1) so the remote node will connect to the client over the ssh tunnel. sshportfw.py shows the configuration and steps:
def compute(n):  # function sent to remote nodes for execution
    import time
    time.sleep(n)
    return n

if __name__ == '__main__':
    import dispy
    # list remote nodes (here Amazon EC2 instance with external IP 54.204.242.185)
    nodes = ['54.204.242.185']
    # use ssh to forward port 9700 for each node; e.g.,
    # 'ssh -R 9700:localhost:9700 54.204.242.185'
    # start dispynode on each node with 'dispynode.py -i 54.204.242.185' (so dispynode
    # uses external IP address instead of default local IP address)
    cluster = dispy.JobCluster(compute, nodes=nodes, host='127.0.0.1')
    jobs = []
    for i in range(1, 10):
        job = cluster.submit(i)
        jobs.append(job)
    for job in jobs:
        print('result: %s' % job())
7.15. Recover jobs / nodes
dispy’s cluster saves the information necessary for fault-recovery of jobs and nodes in case the client crashes or loses network connection to nodes after submitting jobs. Even if nodes detect that the client is not reachable anymore (nodes and client periodically send pulse messages to check that the remote side is reachable), they will finish executing currently running jobs and save the job results. These results can be retrieved at the client using the dispy.recover_jobs function (see (Fault) Recover Jobs). To recover jobs (with the results, output etc.), the following program can be used:
import dispy
jobs = dispy.recover_jobs()
for job in jobs:
    print('Job result: %s' % job.result)
This will wait for all jobs that were running (at the time the client crashed) on all nodes to finish and then retrieve their results. A timeout can also be specified if necessary.
To recover nodes immediately, the following program can be used:
import dispy
jobs = dispy.recover_jobs(timeout=1, terminate_pending=True)
for job in jobs:
    print('Job result: %s' % job.result)
This will wait up to a second (the timeout value) and then request the nodes to terminate any pending (still running) jobs. The nodes then become available for new clients.
7.16. Cluster Creation
Cluster creation can be customized for various use cases; some examples are:
cluster = dispy.JobCluster(compute, depends=[ClassA, moduleB, 'file1'])

distributes ‘compute’ along with ClassA (Python object), moduleB (Python module) and ‘file1’, a file on the client computer. Presumably ClassA, moduleB and file1 are needed by ‘compute’.

cluster = dispy.JobCluster(compute, nodes=['node20', '192.168.2.21', 'node24'])

sends computation to nodes ‘node20’, ‘node24’ and the node with IP address ‘192.168.2.21’. These nodes could be in different networks, as explicit names / IP addresses are listed.

If nodes are on a remote network, then certain ports need to be forwarded, as the nodes connect to the client to send status / results of jobs; see NAT/Firewall Forwarding. If port forwarding is not possible, then ssh tunneling can be used. To use this, ssh to each node with ssh -R 9700:127.0.0.1:9700 node (possibly executing the dispynode (Server) program on the node if not already running), then specify ext_host=127.0.0.1, nodes=[node] to JobCluster. If using more than one node, list them all in nodes. If client port 9700 is not usable, an alternate port, say 2345, can be forwarded with ssh -R 2345:127.0.0.1:2345 node (use JobCluster with ext_host=127.0.0.1, port=2345, nodes=[node]). See SSH Port Forwarding for more details.

cluster = dispy.JobCluster(compute, nodes=['192.168.2.*'])

sends computation to all nodes whose IP address starts with ‘192.168.2’. In this case, it is assumed that ‘192.168.2’ is the local network (since UDP broadcast is used to discover nodes in a network and broadcast packets don’t cross networks).

cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.3.22', '172.16.11.22', 'node39', '192.168.2.*'])

sends computation to nodes with IP addresses ‘192.168.3.5’, ‘192.168.3.22’, ‘172.16.11.22’ and node ‘node39’ (since explicit names / IP addresses are listed, they could be on different networks), and to all nodes whose IP address starts with ‘192.168.2’ (local network).

cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.3.*', '192.168.2.*'])

In this case, dispy will send discovery messages to the node with IP address ‘192.168.3.5’. If this node is running ‘dispynetrelay’, then all the nodes on that network are eligible for executing this computation, as wildcard ‘192.168.3.*’ matches IP addresses of those nodes. In addition, computation is also sent to all nodes whose IP address starts with ‘192.168.2’ (local network).

cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.8.20', '172.16.2.99', '*'])

In this case, dispy will send discovery messages to nodes with IP addresses ‘192.168.3.5’, ‘192.168.8.20’ and ‘172.16.2.99’. If these nodes are all running dispynetrelay, then all the nodes on those networks are eligible for executing this computation, as wildcard ‘*’ matches IP addresses of those nodes. In addition, computation is also sent to all nodes on the local network (since they also match wildcard ‘*’ and the discovery message is broadcast on the local network).

Assuming that 192.168.1.39 is the (private) IP address where the dispy client is used, a.b.c.d is the (public) IP address of the NAT firewall/gateway (that can be reached from outside) and dispynode is running at another public IP address e.f.g.h (so that a.b.c.d and e.f.g.h can communicate, but e.f.g.h can’t communicate with 192.168.1.39),

cluster = dispy.JobCluster(compute, host='192.168.1.39', ext_host='a.b.c.d', nodes=['e.f.g.h'])

would work if the NAT firewall/gateway forwards TCP port 9700 to 192.168.1.39.

cluster = dispy.JobCluster(compute, secret='super')

distributes ‘compute’ to nodes that also use secret ‘super’ (i.e., nodes started with dispynode.py -s super). Note that the secret is used only for establishing communication; it is not used to encrypt programs or code for Python objects. This can be useful to prevent other users from (inadvertently) using the nodes. If encryption is needed, SSL can be used; see below.

cluster = dispy.JobCluster(compute, certfile='mycert', keyfile='mykey')

distributes ‘compute’ and encrypts all communication using the SSL certificate stored in the ‘mycert’ file and the key stored in the ‘mykey’ file. In this case, dispynode must also use the same certificate and key; i.e., each dispynode must be invoked with dispynode --certfile="mycert" --keyfile="mykey"

If both certificate and key are stored in the same file, say ‘mycertkey’, they are expected to be in certfile:

cluster = dispy.JobCluster(compute, certfile='mycertkey')

cluster1 = dispy.JobCluster(compute1, nodes=['192.168.3.2', '192.168.3.5'])
cluster2 = dispy.JobCluster(compute2, nodes=['192.168.3.10', '192.168.3.11'])

distribute ‘compute1’ to nodes 192.168.3.2 and 192.168.3.5, and ‘compute2’ to nodes 192.168.3.10 and 192.168.3.11. With this setup, specific computations can be scheduled on certain node(s).
7.17. MapReduce
A simple version of the word count example from MapReduce:
# a version of word frequency example from mapreduce tutorial
def mapper(doc):
    # input reader and map function are combined
    import os
    words = []
    with open(os.path.join('/tmp', doc)) as fd:
        for line in fd:
            words.extend((word.lower(), 1) for word in line.split()
                         if len(word) > 3 and word.isalpha())
    return words

def reducer(words):
    # we should generate sorted lists which are then merged,
    # but to keep things simple, we use dicts
    word_count = {}
    for word, count in words:
        if word not in word_count:
            word_count[word] = 0
        word_count[word] += count
    # print('reducer: %s to %s' % (len(words), len(word_count)))
    return word_count

if __name__ == '__main__':
    import dispy
    # assume nodes node1 and node2 have 'doc1', 'doc2' etc. on their
    # local storage, so no need to transfer them
    map_cluster = dispy.JobCluster(mapper, nodes=['node1', 'node2'], reentrant=True)
    # any node can work on reduce
    reduce_cluster = dispy.JobCluster(reducer, nodes=['*'], reentrant=True)
    map_jobs = []
    for f in ['doc1', 'doc2', 'doc3', 'doc4', 'doc5']:
        job = map_cluster.submit(f)
        map_jobs.append(job)
    reduce_jobs = []
    for map_job in map_jobs:
        words = map_job()
        if not words:
            print(map_job.exception)
            continue
        # simple partition
        n = 0
        while n < len(words):
            m = min(len(words) - n, 1000)
            reduce_job = reduce_cluster.submit(words[n:n+m])
            reduce_jobs.append(reduce_job)
            n += m
    # reduce
    word_count = {}
    for reduce_job in reduce_jobs:
        words = reduce_job()
        if not words:
            print(reduce_job.exception)
            continue
        for word, count in words.items():
            if word not in word_count:
                word_count[word] = 0
            word_count[word] += count
    # sort words by frequency and print
    for word in sorted(word_count, key=lambda x: word_count[x], reverse=True):
        count = word_count[word]
        print(word, count)
    reduce_cluster.print_status()