dispy: Distributed and Parallel Computing with/for Python

dispy is a generic, comprehensive, yet easy-to-use framework and set of tools for creating, using and managing compute clusters to execute computations in parallel across multiple processors in a single machine (SMP), or among many machines in a cluster, grid or cloud. dispy is well suited to the data-parallel (SIMD) paradigm, where a computation (Python function or standalone program) is evaluated independently with different (large) datasets and with no communication among computation tasks (except for computation tasks sending Provisional/Intermediate Results or Transferring Files to the client). If communication/cooperation among tasks is needed, the Distributed Communicating Processes module of the pycos framework can be used.

Some of the features of dispy:

  • dispy is implemented with pycos, an independent framework for asynchronous, concurrent, distributed network programming with tasks (without threads). pycos uses non-blocking sockets with the I/O notification mechanisms epoll, kqueue, poll and Windows I/O Completion Ports (IOCP) for high performance and scalability, so dispy works efficiently with a single node or large cluster(s) of nodes; one user reported using dispy with 500 nodes in Google Cloud Platform. pycos itself supports distributed/parallel computing, including transferring computations, files etc., and message passing (for communicating with the client and other computation tasks). While dispy can be used to schedule jobs of a computation and collect the results, pycos can be used to create distributed communicating processes for a broad range of use cases, including in-memory processing, data streaming and real-time (live) analytics.

  • Computations (Python functions or standalone programs) and their dependencies (files, Python functions, classes, modules) are distributed to nodes automatically. Computations that are Python functions can also transfer files from the nodes to the client.

  • Computation nodes can be anywhere on the network (local or remote). For security, either simple hash-based authentication or SSL encryption can be used.

  • After each execution is finished, the results of execution, output, errors and exception trace are made available for further processing.

  • In-memory processing is supported (with some limitations under Windows); i.e., computations can work on data in memory instead of loading data from files each time.

  • Nodes may become available dynamically: dispy will schedule jobs whenever a node is available and computations can use that node.

  • Job and cluster status notification mechanisms allow for asynchronous processing of job results, customized job schedulers etc.

  • Client-side and server-side fault recovery are supported:

    If the user program (client) terminates unexpectedly (e.g., due to an uncaught exception), the nodes continue to execute the scheduled jobs. The results of jobs that were scheduled but unfinished at the time of the crash can be retrieved easily with (Fault) Recover Jobs (see the sketch after this list).

    If a computation is marked reentrant when a cluster is created and a node (server) executing jobs for that computation fails, dispy automatically resubmits those jobs to other available nodes.

  • dispy can be used in a single process to use all the nodes exclusively (with JobCluster) or in multiple processes simultaneously sharing the nodes (with SharedJobCluster and dispyscheduler (Shared Execution) program).

  • Monitor and Manage Cluster with a web browser, including from iOS or Android devices.

  • Set up a private compute infrastructure with existing hardware (the only requirements are that the computers are networked and have Python installed), or use external cloud computing services (users have reported using dispy with Amazon EC2, Google Cloud and Microsoft Azure), either exclusively or in addition to any local compute nodes. See Cloud Computing for details.
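
As noted under fault recovery above, jobs that were running when the client crashed can be retrieved with dispy.recover_jobs. Below is a minimal sketch; the recovery file name is a hypothetical placeholder (dispy reports the actual name, of the form _dispy_<timestamp>, when a cluster is created):

    import dispy

    # Retrieve jobs that were scheduled (but unfinished) when the client
    # crashed; this call waits for those jobs to finish on the nodes.
    jobs = dispy.recover_jobs('_dispy_20XXMMDDHHMMSS')  # hypothetical file name
    for job in jobs:
        print('job at %s computed: %s' % (job.ip_addr, job.result))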

dispy works with Python versions 2.7+ and 3.1+ and is tested on Linux, OS X and Windows; it may work on other platforms too. dispy also works with the PyPy JIT interpreter.

Dependencies

dispy requires pycos for concurrent, asynchronous network programming with tasks. If dispy is installed with pip (see below), pycos is also installed automatically.

Under Windows, pycos uses the efficient I/O Completion Ports (IOCP) notifier only if pywin32 is installed; otherwise, the inefficient select notifier is used.

dispynode (Server) sends node availability status (CPU availability as a percentage, available memory in bytes and available disk space in bytes) at pulse_interval frequency if the psutil module is installed. This information may be useful to clients, for example, to analyze application performance, to filter nodes based on available resources etc.
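
As an illustration, below is a minimal sketch of a client consuming this availability information with a cluster_status callback. The callback signature follows the dispy client API, but the DispyNode.AvailInfo status constant and the avail_info field names are assumptions to verify against the dispy version in use:

    import dispy

    def cluster_status(status, node, job):
        # called by dispy whenever a node's or job's status changes; when a
        # node reports availability (requires psutil on the node), log it
        if status == dispy.DispyNode.AvailInfo and node.avail_info:
            info = node.avail_info
            print('%s: %.1f%% CPU available, %.0f MB memory free' %
                  (node.ip_addr, info.cpu, info.memory / 1e6))

    # pass the callback when creating a cluster, e.g.:
    # cluster = dispy.JobCluster(compute, cluster_status=cluster_status)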

For IPv6, the netifaces module is required on OS X. The netifaces module is strongly recommended on Linux and Windows as well (for both IPv4 and IPv6). For IPv6 on Windows, the win_inet_pton module is required with Python 2.7 (but not with Python 3).

Download / Installation

  • dispy is available through the Python Package Index (PyPI), so it can be easily installed with:

    python -m pip install dispy
    
  • Containers describes how to build Docker image and run dispynode in containers so computations are fully isolated (e.g., the files on the host operating system on nodes are not accessible to computations).

  • dispy can also be downloaded from Sourceforge Files. Starting with the 4.8.8 release, the download file is a PyPI package, so it can be installed with:

    python -m pip install dispy-<version>.tar.gz
    

    This works with either Python 2.7+ or Python 3.1+.

  • dispy development is hosted at GitHub. Source files downloaded from there can be used with Python 2.7 up to Python 3.6, but not with Python 3.7+. With Python 3.7+, either install the package from PyPI as mentioned above or, if installing from source is necessary, download the entire source and build a package with python setup.py sdist at the top-level directory; this generates a dispy-<version>.tar.gz file in the dist directory that can be installed with python -m pip install dist/dispy-<version>.tar.gz --upgrade

  • There seems to be some confusion about installing dispy on Raspberry Pi, referring to a library not compatible with dispy versions after 4.7.1. However, this may be because the ‘pip’ package is too old to install packages from PyPI. In that case, there are a couple of options:

    • Install the ‘pip’ module (not package) with get-pip <https://pip.pypa.io/en/stable/installing/>. Then download the pycos package from Sourceforge and install it with python -m pip install pycos-<version>.tar.gz, and then the dispy package as above.

    • Upgrade the pip package with apt-get upgrade. Then install the packages with pip as above.

    The latest versions of dispy and pycos have been tested (for installation and usage) on Raspberry Pi B+.

Release Notes

Short summary of changes for each release can be found at News. Detailed logs / changes are at github commits.

Quick Guide

Below is a quick guide on how to use dispy. More details are available in dispy (Client).

The dispy framework consists of five components:

  • A client program can use the dispy (Client) module to create clusters in two different ways: JobCluster, when only one instance of dispy may run, and SharedJobCluster, when multiple instances may run (in separate programs). If JobCluster is used, the job scheduler included in it distributes jobs to the server nodes; if SharedJobCluster is used, the dispyscheduler (Shared Execution) program must also be running (see the sketch after this list).

  • dispynode (Server) program executes jobs on behalf of a dispy client. dispynode must be running on each of the (server) nodes that form clusters.

  • dispyscheduler (Shared Execution) program is needed only when SharedJobCluster is used; this provides a job scheduler that can be shared by multiple dispy clients simultaneously.

  • dispyadmin program provides a web interface for administering cluster(s). A web browser can be used to control CPUs, service times etc. of all or individual nodes.

  • dispynetrelay (Using Remote Servers) program can be used when nodes are located across different networks. If all nodes are on the local network, or if all remote nodes can be listed in the ‘nodes’ parameter when creating a cluster, there is no need for dispynetrelay; the scheduler can discover such nodes automatically. However, if there are many nodes on remote network(s), dispynetrelay can be used to relay information about the nodes on those networks to the scheduler, without having to list every node in the ‘nodes’ parameter.
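
To illustrate the SharedJobCluster variant mentioned above, here is a minimal sketch, assuming dispyscheduler.py is already running on the network; the scheduler address is a hypothetical placeholder:

    def compute(n):
        return n * n

    if __name__ == '__main__':
        import dispy
        # connect to the (already running) shared scheduler instead of using
        # the scheduler built into JobCluster; the address is hypothetical
        cluster = dispy.SharedJobCluster(compute, scheduler_node='192.168.1.10')
        job = cluster.submit(5)
        print(job())  # waits for the job to finish and prints 25
        cluster.close()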

As an example, consider the following program, in which the function compute is distributed to nodes on a local network for parallel execution. To execute this function on all available CPUs on all nodes in the local network:

  • Start dispynode program (dispynode.py) on each of the nodes on the network.

  • Run the program below (available as sample.py in the examples directory of the installation), which creates a cluster with function compute; this cluster is then used to create 10 jobs to execute compute with a random number:

    # 'compute' is distributed to each node running 'dispynode'
    def compute(n):
        import time, socket
        time.sleep(n)
        host = socket.gethostname()
        return (host, n)
    
    if __name__ == '__main__':
        import dispy, random
        cluster = dispy.JobCluster(compute)
        jobs = []
        for i in range(10):
            # schedule execution of 'compute' on a node (running 'dispynode')
            # with a parameter (random number in this case)
            job = cluster.submit(random.randint(5,20))
            job.id = i # optionally associate an ID to job (if needed later)
            jobs.append(job)
        # cluster.wait() # waits for all scheduled jobs to finish
        for job in jobs:
            host, n = job() # waits for job to finish and returns results
            print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
            # other fields of 'job' that may be useful:
            # print(job.stdout, job.stderr, job.exception, job.ip_addr, job.start_time, job.end_time)
        cluster.print_status()
    

dispy’s scheduler runs the jobs on the processors of the nodes running dispynode. The nodes execute each job with the job’s arguments in isolation: computations shouldn’t depend on global state, such as modules imported outside of the computation, global variables etc. (except if the ‘setup’ parameter is used, as explained in dispy (Client) and Examples). In this case, compute needs the modules time and socket, so it must import them. The program then gets the results of execution for each job with job().
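
To show how the ‘setup’ parameter lifts this restriction, below is a minimal sketch of in-memory processing: ‘setup’ runs once on each node and the globals it creates become available to jobs on that node (with some limitations under Windows, as noted above). The file data.txt is a hypothetical dependency:

    def setup():  # executed once on each node, before any jobs run there
        global data
        # 'data.txt' is transferred to the node via 'depends' below
        with open('data.txt') as fd:
            data = fd.read()
        return 0  # 0 indicates successful setup

    def cleanup():  # executed on each node when the cluster is closed
        global data
        del data

    def compute(n):
        # uses 'data' kept in memory by 'setup' instead of re-reading the file
        return len(data) * n

    if __name__ == '__main__':
        import dispy
        cluster = dispy.JobCluster(compute, depends=['data.txt'],
                                   setup=setup, cleanup=cleanup)
        job = cluster.submit(2)
        print(job())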
