1. dispy (Client)
The dispy module provides the API for the client program to create clusters (to distribute computations to servers running dispynode), schedule jobs to execute those computations and process the jobs' execution results.
Creating a cluster consists of packaging computation fragments (code and data) and specifying options that control how the computations are executed, such as which nodes can execute computations. There are two ways to create clusters with dispy: JobCluster and SharedJobCluster. If only one instance of the client program may be running at any time, JobCluster is simple to use; it already contains a scheduler that will schedule jobs to nodes running dispynode (Server). If, however, multiple programs using dispy may be running simultaneously, JobCluster cannot be used - each scheduler would assume the nodes are controlled exclusively by it, causing conflicts. Instead, SharedJobCluster must be used. In this case, the dispyscheduler (Shared Execution) program must also be running on some computer and SharedJobCluster must set the scheduler_node parameter to the node running dispyscheduler (default is the host that calls SharedJobCluster).
Once a cluster is created, jobs can be scheduled with cluster.submit() to execute a computation with different parameters, similar to how a function is executed in a local program, except that here dispy's scheduler will execute it on an available processor on a remote server. The scheduler and dispynode server will run at most one job on a processor at any time, as the computations are assumed to be CPU intensive. (See pycos's dispycos for an alternate execution model for executing multiple computations on a processor.)
While dispy and other components have various options that cover rather comprehensive use cases, making it seem complex, most of the options have default values that likely work for common cases. For example, starting the dispynode (Server) program on each of the nodes on a local network and using JobCluster with computation, and possibly depends parameters, as done in the example in dispy: Distributed and Parallel Computing with/for Python, may be sufficient.
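For instance, a minimal client along those lines might look like the sketch below (a hedged example: it assumes dispynode is already running on nodes in the local network, and the compute function is purely illustrative):

import dispy

def compute(n):  # executed on remote nodes
    import time
    time.sleep(n)
    return n * n

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute)   # default options; discovers nodes on local network
    jobs = [cluster.submit(n) for n in range(5)]
    for job in jobs:
        result = job()                    # waits for the job to finish and returns its result
        print('%s computed %s' % (job.host, result))
    cluster.print_status()
    cluster.close()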
1.1. JobCluster
class dispy.JobCluster(computation, nodes=None, depends=[], job_status=None, cluster_status=None, host=None, ext_host=None, dispy_port=None, ipv4_udp_multicast=False, dest_path=None, loglevel=dispy.logger.INFO, setup=None, cleanup=True, ping_interval=None, pulse_interval=None, reentrant=False, secret='', keyfile=None, certfile=None, recover_file=None)

Creates and returns a cluster for the given computation, which can be used to schedule executions, as explained above and in Cluster. The parameters are:
computation should be either a Python function or a string. If it is a string, it must be path to executable program. This computation is sent to nodes in the given cluster. When a job is submitted (to invoke computation with arguments), dispynode executes the computation with those arguments in isolation - the computation should not depend on global state, such as modules imported in the main program or global variables etc., except as described below for setup parameter (see also Examples).
As described in DispyJob, dispynode buffers stdout and stderr streams and sends them to the client. If any messages need to be shown at the node itself, dispynode_logger (which is similar to Python's logging Logger) can be used, e.g., as dispynode_logger.info('Job process %s started' % os.getpid()).

nodes, if given, must be a list, each element of which must be either a string or a NodeAllocate object. If an element is a string, it must be either an IP address or a host name; this string is converted to a NodeAllocate object.
This list serves two purposes: dispy initially sends a request to all the nodes listed to find out information about them (e.g., number of processing units available for dispy), then sends given computation to only those nodes that match the listed nodes (dispy may know about nodes not listed in this computation, as it also broadcasts identification request). Wildcard ‘*’ can be used to match (part of) any IP address; e.g., ‘192.168.3.*’ matches any node whose IP address starts with ‘192.168.3’. If there are any nodes beyond local network, then all such nodes should be mentioned in nodes. If there are many such nodes (on outside local network), it may be cumbersome to list them all (it is not possible to send identification request to outside networks with wildcard in them); in that case, dispynetrelay (Using Remote Servers) may be started on one of the nodes on that network and the node running dispynetrelay should be added to nodes list (and a wildcard for that network, so that other nodes on that network match that wildcard); see Examples.
depends is a list of dependencies needed for computation. If computation is a Python function, each element of this list can be either a Python function, a Python class, an instance of a class (object), a Python module, or a path to a file. If computation is a program, each element can be either a Python module (useful if computation is a Python program), or a path to a file. Only Python modules that are not present on nodes already need be listed; standard modules that are present on all nodes do not need to be listed here. Any code components in depends are executed on the server so that the client can create an environment (state) for the jobs; for example, if client and computations exchange class instances (objects), then the definition for that class can be included in depends so the object can be serialized at the client (for transfer over the network as a stream of bytes) and deserialized at the server using that class definition.
These dependencies are available to all the jobs executed by nodes. Unless cleanup parameter is set to False, they are removed after the computation is done. See cluster.submit in Cluster on how to distribute dependencies for each job, instead of for the computation.
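As an illustration (a hedged sketch; the class and values are made up), a class definition can be listed in depends so that nodes can deserialize objects of that class passed as job arguments:

import dispy

class Point(object):              # objects of this class are exchanged with the computation
    def __init__(self, x, y):
        self.x, self.y = x, y

def compute(point):               # executed on nodes; 'Point' definition is sent via 'depends'
    return (point.x ** 2 + point.y ** 2) ** 0.5

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute, depends=[Point])
    job = cluster.submit(Point(3, 4))
    print(job())                  # prints 5.0
    cluster.close()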
job_status is a user provided function. When a job’s results become available, dispy will call provided job_status function with that job as the argument. If a job sends provisional results with dispy_provisional_result multiple times, then dispy will call provided job_status each such time. The (provisional) results of computation can be retrieved with ‘result’ field of job. job_status function runs in the context of user programs from which (Shared)JobCluster is called - for example, job_status can access global variables in programs that created cluster(s).
Note that dispy's scheduler essentially sends notifications to execute the job_status function (in a separate thread from dispy's scheduler) but does not wait for all scheduled calls (e.g., for finished jobs) to finish before cluster.wait() returns or all jobs finish. If jobs' results are processed with this status function and all such status messages must be processed before the user program terminates, then the user program should track these notifications and submitted jobs to guarantee all results are processed appropriately.
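A hedged sketch of such tracking, using a semaphore so the client only exits after every notification has been handled (compute is illustrative):

import threading
import dispy

def compute(n):                     # executed on nodes
    return n * n

def job_status(job):                # runs in a separate thread of the client program
    if job.status == dispy.DispyJob.Finished:
        print('job %s finished with %s' % (job.id, job.result))
    else:
        print('job %s failed: %s' % (job.id, job.exception))
    processed.release()             # record that this notification has been handled

if __name__ == '__main__':
    processed = threading.Semaphore(0)
    cluster = dispy.JobCluster(compute, job_status=job_status)
    n_jobs = 10
    for n in range(n_jobs):
        cluster.submit(n)
    cluster.wait()                  # waits for jobs, but not for job_status notifications
    for _ in range(n_jobs):         # wait until every notification has been processed
        processed.acquire()
    cluster.close()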
cluster_status is a user provided function. dispy calls this function whenever there is a change in a job’s status or a node’s status. The function is called with the arguments:
status which is either DispyJob’s status, such as DispyJob.Created, DispyJob.Running, or DispyNode’s status, such as DispyNode.Initialized,
node, an instance of DispyNode about which the status is indicated,
job, an instance of DispyJob if the status is about a job, otherwise None.
This feature is used by Monitor and Manage Cluster so the cluster status can be monitored in a web browser.
As per job_status above, dispy’s scheduler simply sends notifications to run this function but will not wait for all scheduled notifications to finish before cluster.wait() terminates.
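A hedged sketch of a cluster_status function that logs node and job transitions (the status values used are from DispyNode and DispyJob as described above):

import dispy

def compute(n):                     # executed on nodes
    return n ** 2

def cluster_status(status, node, job):
    # called by dispy's scheduler for every change in node / job status
    if status == dispy.DispyNode.Initialized:
        print('node %s initialized with %s CPUs' % (node.ip_addr, node.cpus))
    elif status == dispy.DispyJob.Finished:
        print('job %s on %s finished: %s' % (job.id, node.ip_addr, job.result))

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute, cluster_status=cluster_status)
    jobs = [cluster.submit(n) for n in range(5)]
    cluster.wait()
    cluster.close()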
host is a string or list of strings of host names or IP addresses (in either IPv4 or IPv6 form) to use for (client) communication. If it is not set, the host name is used. If it is a list, all the given addresses are used (e.g., multi-homed network).

ext_host is a string or list of strings of host names or IP addresses to use as return address for node to client communication. This may be needed in case the client is behind a NAT firewall/gateway and (some) nodes are outside. Typically, in such a case, ext_host must be the address of NAT firewall/gateway and the NAT firewall/gateway must forward ports to host appropriately. See below for more on NAT/firewall information.
ipv4_udp_multicast is a boolean to control whether to use multicast or broadcast for UDP with IPv4 (to discover nodes). The default value is False so broadcast is used. If it is True, multicast is used instead. Note that for IPv6 there is no broadcast, so multicast is always used.
dispy_port is the port to use for the client. Default is the value set in the dispy.config module (9700). See Configuration Parameters for more details.

dest_path is the directory on the nodes where files are transferred to. Default is to create a separate directory for each computation. If a computation transfers files (dependencies) and the same computation is run again with the same files, the transfer can be avoided by specifying the same dest_path, along with the option 'cleanup=False'.
recover_file, if given, must be a string. If it is a string, dispy uses it as path to file in which it stores information about cluster. If it is None, then a file of the form ‘_dispy_YYYYMMDDHHMMSS’ in the current directory is used. In case user program terminates unexpectedly (for example, because of network failure, uncaught exception), this file can be used to retrieve results of jobs that were scheduled but not yet finished at the time of crash. See (Fault) Recover Jobs below.
loglevel is logging message level and should be one of DEBUG, INFO, WARNING, ERROR and CRITICAL attributes in dispy.logger. At DEBUG level many messages are logged and at CRITICAL level very few messages are logged. Default value is dispy.logger.INFO.
setup, if given, must be a Python function. This function is transferred, along with dependencies and computation, to each node and executed on each node after all dependencies have been copied to dest_path and the working directory has also been set to dest_path (which would also be the working directory when computation jobs are executed). With dispy version 4.11.0, this function is executed on a node with the arguments set with the NodeAllocate.allocate() method for that node.

The setup function is executed on a node exactly once before any jobs are scheduled on that node. This function can be used, for example, to initialize global variables that are then used in jobs as read-only variables for in-memory processing.
Under Linux, OS X (and other Unix variants, but not Windows), child processes (that execute computation jobs) inherit parent’s address space (among other things), so the setup function can initialize any data to global variables.
Under Windows, fork is not available, so the global variables are pickled and sent as arguments to new process; see multiprocessing’s Programming guidelines for Windows. Thus, under Windows, modules, for example, can’t be declared global variables in setup function, as modules are not serializable. If setup feature is used under Windows, care must be taken not to share same node with other computations, as one computation’s global variables may interfere with another computation’s.
Until dispy version 4.9.2, the setup function is executed in the “main” dispynode process but within computation's private variable space. However, this can have side effects, such as modules loaded in one computation's “setup” may affect other computations, or modules may not unload cleanly when dispynode removes them during cleanup. With version 4.10.0, “setup” and “cleanup” are by default executed in a subprocess (unless dispynode is started with --unsafe_setup option). When “main” dispynode receives a new job request, that job, including arguments, is first sent to that subprocess which then creates a new process to execute the job. With this approach, “main” dispynode is immune from any effects from computations and, as a separate subprocess is created for each computation, each computation is also immune from effects from other computations. However, passing jobs to the subprocess may have some performance overhead, especially if arguments are rather large in size. With version 4.10.0, “setup” computations can control this behavior:
If “setup” function returns 0, subprocess is used only if any global variables are used. If no global variables are used, “main” dispynode process directly executes jobs without sending jobs to the subprocess (i.e., as done with dispy until 4.9.2 version). If any global variables are used, job requests are sent to the subprocess which then creates jobs.
If “setup” function returns 1, subprocess is not used. If any global variables are used in “setup”, they are transferred from the subprocess to “main” dispynode process so that they can be sent to each job process (so those variables are available to jobs). These global variables are serialized in the subprocess and deserialized in the “main” process. Since the “main” process doesn't execute “setup” or any dependencies sent with computation, deserialization will only work if the variables are of standard types, such as integers, strings, lists, dictionaries, sets etc.
If “setup” function returns 2, subprocess is always used, even if no global variables are used.
If “setup” function returns any other value, dispynode will treat that as failure and not execute any jobs for it.
As mentioned above, if dispynode is started with --unsafe_setup option, subprocess is not used at all; “setup” and “cleanup” run in “main” process and jobs are created by the “main” process itself.
Note that pycos’s dispycos also supports distributed / parallel computing (along with additional features, such as message passing, so client and compute task can exchange data in both directions), and with it job requests are sent directly to the process that runs the jobs so there is no extra overhead in passing jobs or even creating new process for each job.
See Examples where setup and cleanup functions are used to initialize and de-initialize global variables.
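A minimal sketch along those lines (hedged: 'data.txt' is a hypothetical file transferred with depends, and the functions are illustrative) is:

import dispy

def setup():                      # executed on each node once, before any jobs run there
    global data
    with open('data.txt') as fd:  # 'data.txt' was transferred to dest_path via 'depends'
        data = fd.read()
    return 0                      # 0 indicates success (see return values described above)

def cleanup():                    # executed on each node when the computation is closed
    global data
    del data

def compute(i):                   # jobs use 'data' as a read-only global variable
    return len(data) + i

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute, depends=['data.txt'], setup=setup, cleanup=cleanup)
    job = cluster.submit(1)
    print(job())
    cluster.close()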
cleanup: If True (default value), any files (dependencies) transferred will be deleted after the computation is done. If it is False, the files are left on the nodes; this may speed things up if the same files are needed for another cluster later. However, this can be a security risk and/or require manual cleanup. If the same files are used for multiple clusters, then cleanup may be set to False and the same dest_path used.

If cleanup is neither True nor False, it must be a Python function or partial function that takes no (further) arguments. This function is transferred to each of the nodes and executed on the nodes before deleting files as mentioned above. If the setup function creates global variables, for example, the cleanup function may delete those global variables.
The cleanup function can call dispynode_shutdown() to shut down the node (i.e., quit the 'dispynode' program) executing cleanup if that node has been started with the client_shutdown option.

pulse_interval is number of seconds between 'pulse' messages that nodes send to indicate they are alive and computing submitted jobs. If this value is given as an integer or floating number between 1 and 600, then a node is presumed dead (zombie) if 5*pulse_interval seconds elapse without a pulse message. If reentrant is set to False and a node becomes a zombie, jobs scheduled on it are abandoned and job_status / cluster_status functions are called with DispyJob.Abandoned status. It is possible that after this the node may come back (e.g., due to network connection restoration) and jobs will be finished. If reentrant is True, jobs will be rescheduled on any (other) available nodes. See reentrant below.
If a node has psutil module installed, that node sends current availability status (CPU, memory, disk space and swap space) with DispyNodeAvailInfo at pulse_interval frequency. This information is then sent to cluster_status notification in DispyNode as avail_info member. If node doesn’t have psutil module, avail_info member would be None.
reentrant must be either True or False. This value is used only if ‘pulse_interval’ is set for any of the clusters. If pulse_interval is given and reentrant is False (default), jobs scheduled for a dead node are automatically cancelled (for such jobs, execution result, output and error fields are set to None, exception field is set to ‘Cancelled’ and status is set to Cancelled); if reentrant is True, then jobs scheduled for a dead node are resubmitted to other available nodes.
ping_interval is number of seconds. Normally dispy can locate nodes running dispynode by broadcasting UDP ping messages on local network and point-to-point UDP messages to nodes on remote networks. However, UDP messages may get lost. Ping interval is number of seconds between repeated ping messages to find any nodes that have missed previous ping messages.
secret is a string that is (hashed and) used for handshaking of communication with nodes; i.e., this cluster will only work with nodes that use the same secret (see secret option to dispynode (Server)). This prevents unauthorized use of nodes. However, the hashed string (not the secret itself) is passed in clear text, so an unauthorized, determined person may be able to figure out how to circumvent it. This feature can be used, for example, to easily create a private cluster with a small number of machines in a large network.

keyfile is path to file containing the private key for SSL communication, same as the 'keyfile' parameter to ssl.wrap_socket of Python's ssl module. This key may be stored in 'certfile' itself, in which case this should be None. The same file must be used as the keyfile option for dispynode (Server) in the case of JobCluster, or for dispyscheduler (Shared Execution) in the case of SharedJobCluster.

certfile is path to file containing the SSL certificate, same as the 'certfile' parameter to ssl.wrap_socket of Python's ssl module. The same file must be used as the certfile option for dispynode (Server) in the case of JobCluster, or for dispyscheduler (Shared Execution) in the case of SharedJobCluster.
1.3. Cluster
A cluster created by either JobCluster or SharedJobCluster has the following methods:
cluster.submit(*args, **kwargs[, dispy_job_depends=[]])

Creates a DispyJob object (see below for useful fields of this object), schedules it for execution on an eligible node (whenever one becomes available) and returns the job. Results from execution of the computation with the given arguments will be available in the job object after execution finishes.
This method should be called with the arguments exactly as expected by the computation given to JobCluster or SharedJobCluster. If computation is a Python function, the arguments may also contain keyword arguments. All arguments must be serializable (picklable), as these are sent over the network to the nodes. If an argument is a class object that contains non-serializable members, then the classes may provide __getstate__ method for this purpose (see ‘_Job’ class in dispy.py for an example). If computation is a standalone program, then all arguments must be strings.
When submitting many jobs or if arguments are large, consider the memory required to store all the arguments for all submitted jobs - until all references to jobs and arguments are cleared, the space required to keep them may accumulate, causing memory issues at client/scheduler. In many examples, jobs are stored in a list and never removed; this is not suitable when jobs and arguments require lot of space. See Efficient job submission for an example on one approach to deal with such cases.
As noted in JobCluster, dispy can distribute dependencies to nodes when the cluster is created. All the jobs created by computations can use those dependencies. If, however, each job has its own dependencies (that are needed only for that job's execution), then such dependencies can be passed with the optional keyword parameter dispy_job_depends list to cluster.submit (see the sketch after the note below). The format of this dependencies list is the same as that for the depends parameter of JobCluster. The keyword parameter dispy_job_depends is used by cluster.submit and removed before the job is created, so that the computation itself doesn't get this parameter. Dependencies distributed by dispy_job_depends are automatically removed after the job is done (even if cleanup is set to False when creating cluster).
Note
Sending large objects as arguments or as results of jobs can impact efficiency of cluster usage (as time is spent in serializing and transferring the objects). When possible, consider creating objects required for computation within the computation itself, perhaps by sending chunks of data to nodes (with depends argument or cluster.send_file method) and sending results to client with dispy_send_file method.
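As an illustration of per-job dependencies with dispy_job_depends (a hedged sketch; the input file names are hypothetical):

import dispy

def compute(input_file):
    # the per-job file was transferred with 'dispy_job_depends' and is in the working directory
    with open(input_file) as fd:
        return len(fd.read())

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute)
    jobs = []
    for i in range(4):
        path = 'input_%s.dat' % i        # hypothetical per-job data file on the client
        jobs.append(cluster.submit(path, dispy_job_depends=[path]))
    for job in jobs:
        print(job())
    cluster.close()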
If computation is (Python) function, it can use 3 variables (defined in global scope by dispynode):
dispy_job_path is the full directory name where the job is executed. Any files transferred would be available under it. If suid option is not used to start dispynode, computations can use os.getcwd() to get the working directory, but when suid option is used, dispynode sets permissions for parts of the path to be unreadable to computations. This may cause os.getcwd() to fail (e.g., on OS X), so dispy_job_path is the reliable way to get the working directory.
dispy_node_name is name of node.
dispy_node_ip_addr is IP address of node.
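A hedged sketch of a computation using these variables (the output file name is made up):

def compute(n):
    # 'dispy_job_path', 'dispy_node_name' and 'dispy_node_ip_addr' are defined by dispynode
    import os
    out_file = os.path.join(dispy_job_path, 'output_%s.txt' % n)
    with open(out_file, 'w') as fd:
        fd.write('job %s ran on %s (%s)\n' % (n, dispy_node_name, dispy_node_ip_addr))
    return out_file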
cluster.submit_node(node, *args, **kwargs[, dispy_job_depends=[]])

Similar to submit method above, except that the job is scheduled on the given node only. node can be an instance of DispyNode (e.g., as received in cluster_status notification), an IP address or a host name.
This method, along with the cluster_status option, can be used by the client program to schedule jobs with full control over when / how jobs are scheduled. (Jobs submitted with the submit method are scheduled with a load balancing algorithm.) See job_scheduler.py file in the examples directory for an example use case.
cluster.send_file(path, node)

Sends the file with the given path to node. node can be an instance of DispyNode (as received in cluster_status notification, for example), an IP address or a host name. The file is copied to the destination path (working directory of computation) with just the base name; i.e., without directory structure. If the file is copied successfully, the return value is 0.
send_file, along with submit_node, can be used with cluster_status to send data files after a node is initialized, for example, to distribute data for distributed in-memory processing, implement different schedulers, or fully control where and when jobs are submitted etc.
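A hedged sketch combining these pieces: when a node is initialized, a (hypothetical) data file is pushed to it with send_file and a job is then submitted to that node with submit_node:

import dispy

def compute(path):                    # executed on nodes
    with open(path, 'rb') as fd:
        return len(fd.read())

def cluster_status(status, node, job):
    if status == dispy.DispyNode.Initialized:
        if cluster.send_file('data.bin', node) == 0:   # 0 indicates success
            cluster.submit_node(node, 'data.bin')
    elif status == dispy.DispyJob.Finished:
        print('%s processed %s bytes' % (node.ip_addr, job.result))

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute, cluster_status=cluster_status)
    # a real program would track submitted jobs / notifications to decide when to close
    cluster.wait()
    cluster.close()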
cluster.cancel(job)

Terminates the given job (returned from a previous cluster.submit()). If the job is queued for execution, it is removed from the scheduler; if it has started execution at a node, it is terminated (killed). Note that if the job execution has any side effects (such as updating databases, files etc.), canceling a job may leave unpredictable side effects, depending on at what point the job is terminated.

If the job being cancelled is currently running, the dispynode running that job first interrupts the computation, which raises KeyboardInterrupt in that process. The computation can handle it with try/except, for example, to save the state of the computation and send it to the client, or send results; computations have about 5 seconds from when KeyboardInterrupt is raised before the process is terminated by dispynode. If the computation doesn't handle this exception, typically the process is terminated right away. See Long Running Computations for an example.
cluster.discover_nodes(nodes)

Establishes connection to each node (running dispynode) in the given 'nodes' list. Each node may be a host name or IP address, an instance of NodeAllocate, or the string '*'. If a node is '*', UDP broadcast is used to send discovery information to which dispynodes respond. If a node is a host name, IP address etc., a TCP connection is used to establish communication with that node.
This method is usually not required; dispy scheduler automatically calls this method when a cluster is initialized. However, it may be useful in some cases, such as when a node in a remote network is restarted. See --scheduler_node option in dispynode (Server) to direct the node to establish communication with the scheduler as well.
Another option is to periodically invoke this method (in a daemon thread, for example) to find nodes if a network (e.g., WiFi) is dropping UDP packets.
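A hedged sketch of such a periodic rediscovery in a daemon thread:

import threading, time
import dispy

def compute(n):
    return n

def rediscover(cluster, interval=60):
    # periodically broadcast discovery messages in case the network drops UDP packets
    while True:
        time.sleep(interval)
        cluster.discover_nodes(['*'])

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute)
    threading.Thread(target=rediscover, args=(cluster,), daemon=True).start()
    # ... submit jobs as usual ...
    cluster.wait()
    cluster.close()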
cluster.allocate_node(node)

Allocates the given node for the cluster. 'node' may be a host name or IP address, an instance of NodeAllocate, or a string with a wild card as per the 'nodes' parameter to JobCluster.
If the request is successful, the return value is 0; otherwise, it will be -1.
cluster.deallocate_node(node)

Stops scheduling new jobs on node for the cluster. 'node' may be a host name or IP address, or an instance of dispynode (Server). Currently executing jobs on the node will continue to run until they are finished and the node will have all data / initialization setup intact, so the node can be allocated again later to restart scheduling new jobs. Used this way, a node can be suspended and resumed to schedule jobs for a cluster.
If the request is successful, the return value is 0; otherwise, it will be -1.
cluster.close_node(node)

Closes the node for the cluster. 'node' may be a host name or IP address, or an instance of dispynode (Server). The node will continue to run any currently executing jobs and when all jobs are finished, the node will run any 'cleanup' function supplied and remove files transferred etc. for the given cluster. If necessary, the node can be reallocated but will then again have to be set up for the cluster, unlike with 'deallocate_node' which doesn't clean up.
If the request is successful, the return value is 0; otherwise, it will be -1.
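A small hedged sketch contrasting deallocate_node and close_node (the node address is hypothetical):

import dispy

def compute(n):
    return n

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute)
    node = '192.168.3.5'                 # hypothetical node address
    # suspend scheduling new jobs on the node; running jobs continue, setup stays intact
    if cluster.deallocate_node(node) == 0:
        # ... later, resume scheduling on the same node without re-running setup
        cluster.allocate_node(node)
    # close_node, in contrast, runs 'cleanup' and removes transferred files once jobs finish
    cluster.close_node(node)
    cluster.close()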
cluster.resetup_node(node)

Closes the given node without releasing it and then sets it up again. This causes the following steps:
dispy scheduler will stop sending new jobs to the node and node will wait for currently executing jobs to finish and then run cleanup method (with same arguments used for setup if given before), if it was given. The cleanup method may remove any data files, global variables generated / used in current run. The node will not remove any files not removed by cleanup; however, all global (in-memory) variables will be removed so new setup method is executed as if it is a fresh run (except for any files left behind after cleanup).
dispy scheduler will call allocate method to get new setup dependencies and arguments. If the return value results in non-zero value for the CPUs (see NodeAllocate), the new dependencies are transferred to the node.
Node will then run setup method with new setup arguments. If this method returns 0, the scheduler will use the node to run new jobs.
See the replace_inmem.py example that shows how resetup_node is used to replace in-memory data at a node.

This method is not supported with SharedJobCluster.
cluster.node_jobs(node)

Returns list of jobs currently running on the given node, given as host name or IP address.
cluster.set_node_cpus(node, cpus)

Sets (alters) CPUs managed by dispy on a node, given as host name or IP address, to the given number of CPUs. If the number of CPUs given is negative then that many CPUs are not used (from the available CPUs).
If the cluster is SharedJobCluster, set_node_cpus does not change the CPUs (as the node is shared by other users/programs).
If the request is successful, the return value indicates number of CPUs used by dispy for given node. Otherwise, the method returns -1.
cluster.wait(timeout=None)

Waits for all scheduled jobs to complete. If timeout is given, it is the maximum time in seconds to wait for jobs to complete. If jobs complete before timeout, the return value is True; otherwise, the return value is False.
This method waits for all jobs to finish execution at nodes.
Note that, as mentioned in job_status above, this will not wait for all notifications to job_status and cluster_status to be processed.
cluster.close(timeout=None, terminate=False)

Closes the cluster (jobs can no longer be submitted to it). If there are any jobs pending, this method waits until they all complete before the cluster is closed. If timeout is given, it is the maximum time in seconds to wait for jobs to complete. If jobs complete before timeout, the cluster is closed and the returned value is True; otherwise, the value returned is False unless terminate is True, in which case pending jobs are cancelled (removed or terminated by nodes executing them). Additional clusters may be created after this call returns.
cluster.print_status()

Prints status about nodes, such as time each node spent executing jobs etc.
The columns “Sent” and “Rcvd” show amount of data sent to / received from a node. This information may be useful to understand application or network performance. Each time a job or file is sent, the size of (serialized) data is counted as data sent and each time a job reply or file is received, the size of (serialized) data is counted as data received. Note that overheads of protocol are not counted in these fields.
1.4. DispyJob
The result of a cluster.submit() call is an instance of DispyJob (see dispy.py), which can be used to examine the status of job execution, retrieve job results etc. It has the following attributes:
id is initialized to successive integers, starting with 1. If necessary, it can be reset by client program to any value appropriate. This is the only field that can be written to by the client. Rest of the attributes are read-only.
While any Python object can be assigned to this attribute, dispy debug log (if enabled) prints this as string and node information in Manage Cluster (Nodes) shows id attribute as string as well. So if id is an instance of a class, providing appropriate __str__ method for the class, as done in the example in Example, will give useful information about the job; otherwise, id may be shown simply as reference to class instance.
status indicates current status of job. Its value is one of Created, Running, ProvisionalResult, Cancelled, Terminated, Finished (DispyJob class properties).
host is IP address of node where the job is currently executing, or executed.
When a submitted job is called with job(), it returns that job’s execution result, waiting until the job is finished if it has not finished yet. After a job is finished, following attributes can be read:
result will have computation’s result - return value if computation is a function and exit status if it is a program. job.result is same as return value of job()
stdout and stderr will have stdout and stderr strings of the execution of computation at server node
exception will have exception trace if computation raises any exception; in this case job.result will be None
start_time will be the time when job was scheduled for execution on a node
end_time will be time when results became available
Job’s result, stdout and stderr should not be large - these are buffered and hence will consume memory (not stored on disk). Moreover, like args and kwargs, result should be serializable (picklable object). If result is (or has) an instance of Python class, that class may have to provide __getstate__ function to serialize the object. If the result is not serializable, or it is too big, then the data can be saved to a file and transferred to the client, as explained in Transferring Files.
After jobs are submitted, cluster.wait() can be used to wait until all submitted jobs for that cluster have finished. If necessary, results of execution can be retrieved by either job() or job.result, as described above.
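A hedged sketch of examining these attributes after jobs finish (compute is illustrative):

import dispy

def compute(n):                       # executed on nodes
    print('processing %s' % n)        # goes to job.stdout
    if n == 0:
        raise ValueError('n must be positive')
    return n * n

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute)
    jobs = [cluster.submit(n) for n in range(3)]
    cluster.wait()
    for job in jobs:
        if job.status == dispy.DispyJob.Finished:
            print('job %s: result %s (took %.3f sec)'
                  % (job.id, job.result, job.end_time - job.start_time))
        else:
            print('job %s failed: %s' % (job.id, job.exception))
        if job.stdout:
            print('  stdout: %s' % job.stdout)
    cluster.close()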
1.5. NodeAllocate
As mentioned in JobCluster above, nodes must be a list, each element of which must be either a string, tuple or NodeAllocate object. The string and tuple types are used for convenience, but these elements are converted to NodeAllocate object by JobCluster, which gives more control to specify how to use nodes.
class NodeAllocate(host, port=None, cpus=0, depends=[], setup_args=())

host must be a string, indicating either host name, IP address of node or string with wildcard ‘*’ that matches IP address of nodes found, as explained in nodes of JobCluster.
port is the port used by dispynode on the given node. If it is not given, the default dispynode port (9701) is used.
cpus is maximum number of CPUs to use on given node. Default is to use all CPUs available.
depends must be a list of file names that will be sent to the matching node before the cluster's setup function is executed (with setup_args). Alternately, the allocate method can instantiate this attribute.

setup_args must be a tuple of parameters that will be used to execute the cluster's setup method and cleanup method (so the parameters given must match the arguments to these two methods). Alternately, the allocate method can instantiate this attribute.

When dispy finds a node, each NodeAllocate object's allocate method is called.
allocate(cluster, ip_addr, name, cpus, avail_info=None, platform=None)

cluster is instance of JobCluster.
ip_addr is IP address of node for which this method is called.
name is host name of node.
cpus is number of CPUs available on that node.
avail_info indicates latest availability status of node as an instance of DispyNodeAvailInfo if that node has psutil module installed; otherwise avail_info would be None.
platform is as obtained by platform.platform() method on the node. This parameter can be used to filter nodes when cluster consists of nodes with multiple platforms.
This method should return a number indicating the number of CPUs to use on that node. The default allocate method returns the minimum of CPUs found (given to allocate method) and cpus specified in the constructor. If this method returns 0, the node for which this method is called (i.e., the node with 'ip_addr' it is called with) is not used for this computation.
This method can also (re)set depends and setup_args to specify these attributes dynamically. See In-memory Processing for an example on how to use this approach.
NodeAllocate class can be sub-classed (when used with JobCluster, but not with SharedJobCluster) to provide different functionality (see below). For example:
class FilterNodeAllocate(dispy.NodeAllocate):
    def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform=''):
        # use only nodes that have 1GB of memory and 100GB of disk space available
        # and run Linux 64-bit
        GB = 1024 ** 3
        if (isinstance(avail_info, dispy.DispyNodeAvailInfo) and
            avail_info.memory > GB and avail_info.disk > (100 * GB) and
            re.match('Linux.*x86_64', platform)):
            return cpus  # use all available CPUs on this node
        else:
            return 0  # don't use this node
FilterNodeAllocate can then be used to specify nodes when creating JobCluster, for example, with:
cluster = dispy.JobCluster(compute, nodes=[FilterNodeAllocate('*')])
Sub-classing NodeAllocate is not supported with SharedJobCluster, as nodes are processed by a remote scheduler which doesn’t execute any client code. Instead, attributes set in each NodeAllocate instance are sent to scheduler and dispy.NodeAllocate’s allocate method is executed by scheduler using those attributes. For example, creating cluster with:
nodes = [dispy.NodeAllocate('server%s' % i, cpus=8, depends=['data_file%s' % i], setup_args=(i,))
         for i in range(1, 10)]
cluster = dispy.SharedJobCluster(compute, setup=node_setup, nodes=nodes)
will transfer ‘data_file1’, ‘data_file2’ etc. to the corresponding node named ‘server1’, ‘server2’ etc. and execute the node_setup function on the nodes with one integer argument 1, 2, etc., each node using up to 8 CPUs. If node_setup succeeds (i.e., returns 0, 1 or 2) on a node, the scheduler will start submitting jobs to that node for that cluster.
1.6. DispyNodeAvailInfo
If a node has psutil module installed, its latest availability status is given as an instance of DispyNodeAvailInfo, which has four read-only attributes:
cpu is available CPU as percent. If it is close to 100, the node is not busy at all, and if it is close to 0, all CPUs are busy running compute-intensive tasks.
memory is available memory in bytes. This is not total memory in system, but usable memory for applications (as interpreted by psutil module).
disk is available disk space in bytes for the partition that dispynode uses, as given by dest_path_prefix option (where dispynode saves clients' files and runs jobs).
swap is available swap space as percent. If it is close to 100, most of swap space is being used (i.e., system is under heavy memory load), and if it is close to 0, very little of swap space is being used.
1.7. Provisional/Intermediate Results
dispy_provisional_result(result, relay=False, timeout=MsgTimeout)

Computations (if they are Python functions) can use this function to send provisional or intermediate results to the client. For example, in optimization computations, there may be many (sub) optimal results that the computations can send to the client. The client may ignore, cancel computations or create additional computations based on these results. When a computation calls dispy_provisional_result(result), the Python object result (which must be serializable) is sent back to the client and the computation continues to execute. The client should use the job_status option to process the information, as shown in the example:
import random, dispy

def compute(n):  # executed on nodes
    import random, time, socket
    name = socket.gethostname()
    cur_best = 1
    for i in range(0, n):
        r = random.uniform(0, 1)
        if r <= cur_best:
            # possible result
            dispy_provisional_result((name, r))
            cur_best = r
        time.sleep(0.1)
    # final result
    return (name, cur_best)

def job_status(job):  # executed at the client
    if job.status == dispy.DispyJob.ProvisionalResult:
        if job.result[1] < 0.005:
            # acceptable result; terminate jobs
            print('%s computed: %s' % (job.result[0], job.result[1]))
            # 'jobs' and 'cluster' are created in '__main__' below
            for j in jobs:
                if j.status in [dispy.DispyJob.Created, dispy.DispyJob.Running,
                                dispy.DispyJob.ProvisionalResult]:
                    cluster.cancel(j)

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute, job_status=job_status)
    jobs = []
    for n in range(4):
        job = cluster.submit(random.randint(50,100))
        if job is None:
            print('creating job %s failed!' % n)
            continue
        job.id = n
        jobs.append(job)
    cluster.wait()
    cluster.print_status()
    cluster.close()
In the above example, computations send a provisional result whenever a computed number improves on the current best. If the number computed is less than 0.005, job_status deems it acceptable and terminates the computations.
relay boolean argument indicates whether to send the result through the (shared) scheduler when SharedJobCluster is used. If relay is False (default), the result is sent directly to the client. This is efficient (as it avoids additional transfer of data), but won't work if the communication setup between node and scheduler is different from that between scheduler and client (e.g., if SSL files used are different). If relay is True, the result is sent through the scheduler and will always work (but is less efficient). If JobCluster is used, the relay value has no effect (as there is no shared scheduler between client and node).
timeout is timeout in seconds for socket. Default value is MsgTimeout defined in Configuration Parameters (which is 10 seconds).
1.8. Transferring Files
dispy_send_file(path, relay=False, timeout=MsgTimeout)

Computations (if they are Python functions) can use this function to send files (on server nodes) to the client. This is useful if the nodes and client don't share a file system and computations generate files.
relay and timeout arguments are as per dispy_provisional_result above.
As explained in DispyJob, computation results (return value in the case of Python functions and exit status in the case of programs), along with any output, errors and exception trace are transferred to the client after a job is finished executing. As these are stored in memory (and sent back as serializable objects), they should not be too big in size. If computations generate large amount of data, the data can be saved in file(s) (on the nodes) and then transferred to the client with dispy_send_file(path). The files are saved on the client with the same path under the directory where the client is running.
For example, a computation that saves data in a file named “file.dat” and transfers to the client is:
def compute(n):
    # ... generate "file.dat"
    dispy_send_file("file.dat")
    # ... computation may continue, possibly send more files
    return result
dispy_send_file returns 0 if the file is transferred successfully. The maximum size of the file transferred is as per max_file_size option to dispynode (Server); its default value is 10MB.
All the files sent by the computations are saved under the directory where client program is running. If more than one file is sent (generated on multiple nodes, say), care must be taken to use different paths/names to prevent overwriting files.
Note that depends parameter to JobCluster and SharedJobCluster can be used to transfer files (and other types of dependencies) from client to nodes. If files need to be sent after cluster is created, cluster’s send_file method can be used.
1.9. (Fault) Recover Jobs
As mentioned above, dispy stores information about cluster in a file as per recover_file option. If user program terminates unexpectedly, the nodes that execute those jobs can’t send the results back to client. In such a case, the results for the jobs that were scheduled at the time of crash can be retrieved from the nodes with the function in dispy module:
dispy.recover_jobs(recover_file=None, timeout=None, terminate_pending=False)

recover_file is path to the file used when cluster is created. See recover_file option in JobCluster above. If this option is None (default), the latest file with prefix _dispy_ in the name (which is the default form for recover_file option) is used.
timeout is number of seconds to wait for job results from nodes. If timeout is not None, then results for jobs that have finished before timeout are returned; other jobs continue to execute.
terminate_pending, if True, will terminate jobs on the nodes that have not finished. This can be used in combination with timeout to release a node from client that crashed.
This function reads the information about cluster in the recover_file, inquires nodes about jobs, retrieves results of jobs, waiting until they are all finished. The result of this function is list of DispyJob instances with certain attributes filled, such as result, stdout, stderr, exception, status etc. This function also releases the nodes from the computation after retrieving the results of all pending jobs, so the nodes will respond to other clients.
As an example, assume that the client's recover_file is _dispy_20160126112746 and the client crashed after submitting 5 jobs. To recover these jobs, run the following program:
import dispy
jobs = dispy.recover_jobs('_dispy_20160126112746')
for job in jobs:
print('Job result: %s' % job.result)
This will wait until all 5 jobs finish and return DispyJob instances. (Starting with version 4.8.3, recover_file parameter can be omitted, in which case latest recovery file of the form _dispy_* is used, which is the default form.)
1.10. Configuration Parameters
With the 4.11.0 release, various configuration parameters are defined in the config.py file:
DispyPort is the starting number of the range of ports used in dispy. The default value is 9700 (this used to be 61590 before 4.15.0). This parameter can be changed in a dispy client program either by setting it after loading the config module and updating this variable (e.g., with dispy.config.DispyPort = n) before creating a cluster, or by setting the optional parameter dispy_port=n to JobCluster or SharedJobCluster (see the sketch after this list). Note that setting this variable affects other ports used.

HTTPServerPort is port number used in Monitor and Manage Cluster module as well as in dispyadmin program. Web browsers can then connect at this port.
IPv6MulticastGroup and IPv4MulticastGroup are addresses used in multicast when multicast is used.
MsgTimeout is socket timeout (in seconds) for connection to peers. Setting it too low may result in exceptions / aborting transfers, especially over slow networks.
MaxFileSize is maximum size in bytes allowed for files. If it is 0 (default), there is no limit. If a limit is specified and file is larger than the limit, remote peer will not accept request for transfer.
SharedSchedulerClientPort is port number used for client when SharedJobCluster is used. Default value 0 indicates that the scheduler can use a random port. This can be changed with the client_port parameter to SharedJobCluster.

ClientPort, NodePort and SharedSchedulerPort are ports used by client (i.e., JobCluster or SharedJobCluster), dispynode (Server) and dispyscheduler (Shared Execution) respectively. These must be expressions as they are evaluated at run time. These expressions use the setting DispyPort described above.
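A hedged sketch of changing the port as described for DispyPort above (the port number 9800 is arbitrary; nodes must be started with a matching port):

import dispy
import dispy.config

def compute(n):
    return n

# option 1: change the module-level setting before creating a cluster
dispy.config.DispyPort = 9800

# option 2 (equivalent): pass dispy_port when creating the cluster
cluster = dispy.JobCluster(compute, dispy_port=9800)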
1.11. NAT/Firewall Forwarding
By default dispy client uses UDP and TCP ports 9700, dispynode uses UDP and TCP ports 9701, and dispyscheduler uses UDP and TCP ports 9700 and TCP port 9701. If client/node/scheduler are behind a NAT firewall/gateway, then these ports must be forwarded appropriately and the ext_host option must be used. For example, if the dispy client is behind a NAT firewall/gateway, JobCluster / SharedJobCluster must set ext_host to the NAT firewall/gateway address and forward UDP and TCP ports 9700 to the IP address where the client is running. Similarly, if dispynode is behind a NAT firewall/gateway, the ext_host option must be used.
1.12. SSH Port Forwarding
If nodes are on remote network, nodes may not be able to communicate with client and NAT/Firewall Forwarding may not be possible. In such cases, SSH can be used for port forwarding.
To use this with JobCluster, the client port (default is 9700) should be forwarded from each node with ssh -R 9700:127.0.0.1:9700 node, and then the parameter host=127.0.0.1 should be set to JobCluster. See sshportfw.py for an example that can connect to multiple nodes in remote network(s).
If both client and node can only use SSH, both client and node ports must be forwarded with ssh -R 9700:localhost:9700 -L 9701:localhost:9701 server. Then start dispynode on server with dispynode -i localhost and client with cluster = dispy.JobCluster(compute, host="127.0.0.1", nodes=["127.0.0.1"]). However, this approach can't be used to connect to more than one node.
SharedJobCluster by default uses a random port for client. To use port forwarding with dispyscheduler (Shared Execution) running on remote network, force SharedJobCluster to use specific port (other than 9700, 9701 and 9702, which are used by dispynode and dispyscheduler); here port 2345 is assumed. Use ssh tunneling with ssh -R 2345:127.0.0.1:2345 scheduler_node to forward port 2345 and then specify port=2345, ext_host=127.0.0.1 to SharedJobCluster.
If dispyscheduler (Shared Execution) is running on local network and nodes are on remote network, start dispyscheduler with --ext_host 127.0.0.1 option and then forward port 9700 (default client port) from each node as done in the case of JobCluster above. No special setup is needed for SharedJobCluster, as the scheduler and client are in the same network.
1.13. SSL (Security / Encryption)
If nodes are on public / remote networks, SSL (Secure Sockets Layer) can be used to encrypt all communication, including data, so that the connection is private - only the sender and recipient see the data exchanged and other observers see encrypted data. dispy provides a simple mechanism where digital certificates / symmetric keys are used to encrypt data on one side and decrypt data on the other side. Below is an example command to generate the certificates using the openssl tool (this is just an example - there are other approaches / methods):
openssl req -x509 -newkey rsa:2048 -keyout sskeycert.pem -nodes -out sskeycert.pem -sha256 -days 1000
This command generates self-signed key and certificate pair in file sskeycert.pem that should be used as certfile parameter where SSL is used. It is also possible to generate key and certificate in separate files, and use keyfile and certfile parameters appropriately.
Once the key/certificate pair is generated (or obtained by other means), it should be copied to each of the nodes (over a secure channel, such as ssh). The nodes should be started with dispynode.py --certfile sskeycert.pem (or, if key and certificate are in separate files, with dispynode.py --certfile sscert --keyfile sskey) and the client should create the cluster with certfile=sskeycert.pem (or certfile=sscert, keyfile=sskey if they are in separate files).
If SharedJobCluster is used, the certificates used by client should be those used with cluster_certfile and cluster_keyfile options used when starting dispyscheduler; the client itself doesn’t communicate with nodes. dispyscheduler may use additional set of certificates to communicate with nodes using node_certfile and node_keyfile which should also be used by nodes.
1.14. Cloud Computing
When the client is in the same network as the nodes, using dispy is the same as explained above, except that some cloud platforms may not allow UDP broadcast, preventing the client from discovering nodes automatically, so nodes may have to be listed explicitly using the nodes parameter.
To use nodes in a cloud computing platform when the client is in a different network (e.g., local), ext_host of dispynode (Server) can be used to establish communication between client and remote nodes. Amazon EC2 cloud computing is used as an example below, but similar setup should work with other cloud computing platforms, or any setup where nodes are in a remote network.
It may be necessary to set up the configuration to allow TCP ports 9700-9702 (default ports used by dispy) or any other ports used in dispy client. For example, with EC2 a "Security Group" should be created and assigned to the instance so inbound TCP ports 9700-9702 are allowed.
With EC2 service, a node has a private IP address (called ‘Private DNS Address’) that uses private network of the form 10.x.x.x and public address (called ‘Public DNS Address’) that is of the form ec2-x-x-x-x.x.amazonaws.com. After launching instance(s), install dispy on the node(s) and run dispynode on each node as:
dispynode.py --ext_host ec2-x-x-x-x.y.amazonaws.com
(this address can't be used with -i/--host option, as the network interface is configured with private IP address only). This node can then be used by dispy client from outside EC2 network by specifying ec2-x-x-x-x.x.amazonaws.com in the nodes list (perhaps using EC2 servers to augment local processing units). With ext_host, dispy acts similar to NAT - it announces ext_host to other services instead of the configured host so that external services send requests to ext_host and if firewall/gateway forwards them appropriately, dispy will process them.
If the EC2 node can connect to client with the IP address and port (default 9700) used by client, the cluster should be created in the client with:
cluster = dispy.JobCluster(compute, nodes=['ec2-x-x-x-x.y.amazonaws.com'])
If the client is behind a router, the router's firewall can be configured to forward port 9700 to the client's IP address, and the cluster can be created with:
cluster = dispy.JobCluster(compute, nodes=['ec2-x-x-x-x.y.amazonaws.com'], ext_host='router')
where router is external host name or IP address of router (as can be addressed by nodes in EC2).
If client is behind a router and its firewall can’t be setup to forward port 9700, then ssh can be used to forward the port 9700. To use this, first login to each EC2 node with:
ssh -i ec2-key.pem -R 9700:127.0.0.1:9700 userid@ec2-x-x-x-x.y.amazonaws.com
Then start dispynode on each node as mentioned above, and create cluster on local computer with:
cluster = dispy.JobCluster(compute, nodes=['ec2-x-x-x-x.y.amazonaws.com'], host='127.0.0.1')
In case of problems, enable debugging on the nodes (with -d option) and the client (with loglevel=dispy.logger.DEBUG option). If that still doesn't work, check that the node is reachable with telnet ec2-x-x-x-x.y.amazonaws.com 9701 from the client (after starting dispynode); the output should contain a Connected message.