2. dispynode (Server)

The dispynode.py program should be running on each of the nodes (servers). It executes jobs for dispy clients; i.e., jobs submitted by JobCluster or SharedJobCluster. Usually no options are needed to run this program; the ‘-d’ option may be useful to see a log of jobs being executed.

The server sends the node’s availability status (available CPU as percent, memory in bytes and disk space in bytes) to clients at the pulse_interval specified with JobCluster, or with the corresponding option to dispyscheduler (Shared Execution) when SharedJobCluster is used, provided the psutil module is available. Clients can use this availability information to monitor/analyze application performance, filter nodes based on available resources with a customized NodeAllocate (see the sketch below) etc. The Monitor and Manage Cluster module also maintains this information and sends it to web browsers so users can monitor the availability status of nodes.
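For example, a client could filter nodes based on the reported resources with a customized NodeAllocate. The following is a minimal client-side sketch (not part of dispynode itself); it assumes NodeAllocate’s allocate method receives the availability information described above (with memory in bytes) and returns the number of CPUs to use on that node (0 to skip it); the MemAllocate name and the 2 GB threshold are only illustrative:

import dispy

class MemAllocate(dispy.NodeAllocate):
    # hypothetical subclass: use a node only if it reports at least 2 GB of
    # available memory (the node must have psutil, as noted above)
    def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform=''):
        # assumes avail_info has a 'memory' attribute in bytes; it is None if
        # the node doesn't send availability status
        if avail_info and avail_info.memory and avail_info.memory >= 2 * 1024**3:
            return cpus  # use all CPUs offered by this node
        return 0  # don't use this node

def compute(n):
    return n * n

if __name__ == '__main__':
    # pulse_interval controls how often nodes send availability status
    cluster = dispy.JobCluster(compute, nodes=[MemAllocate('*')], pulse_interval=10)
    # ... submit jobs as usual ...
    cluster.close()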

Below are the various options for invoking the dispynode program:

  • --save_config <file> saves the configuration (i.e., the options as given, except for save_config itself) in the given file and exits. This can be used to save the configuration on one node, copy that file over to all the other nodes and start dispynode on them with the --config option to use that configuration. If file is not given, the configuration is written to stdout.

    For example, dispynode.py -d --cpus -1 -s test --zombie_interval=10 --service_start 17:00 --service_stop 6:00 --service_end 8:00 --save_config /etc/dispynode.cfg stores given options in /etc/dispynode.cfg. This file can then be used in all the nodes to start dispynode with dispynode.py --config /etc/dispynode.cfg. Note that node specific configuration (e.g., IP address, name etc.) should not be given, as another node can’t be started with those options.

  • --config <file> reads configuration from given file (e.g., saved with save_config option).

  • -d enables debug messages that show trace of execution.

  • -c n or --cpus=n sets the number of processing units to n. Without this option, dispynode will use all the processing units available on that node. If n is positive, it must be at least 1 and at most number of processing units on that node; dispynode will then use at most n processors. If n is negative, then that many processing units are not used by dispynode.

  • -i addr or --host=addr directs dispynode to use given addr for communication. addr can be either host name or IP address in IPv4 or IPv6 format. If this option is not given, IP address associated with default host name is used.

  • --ext_host=addr directs dispynode to announce addr in network communication so that the node can be used if it is behind NAT firewall/gateway that is configured to use addr. See NAT/Firewall Forwarding below.

  • -p n or --dispy_port=n directs dispynode to set dispy.config.DispyPort to the given port n (instead of the default 9700). dispynode uses port dispy.config.DispyPort + 1 (i.e., n + 1) for TCP and UDP. See Configuration Parameters for more details.

  • --name=name associates given name to the node. If this option is not given, result of socket’s gethostname() is used as name.

  • -s secret or --secret=secret directs dispynode to use ‘secret’ for hashing handshake communication with the dispy scheduler; i.e., this node will only work with clients that use the same secret (see secret option to JobCluster and node_secret option to dispyscheduler (Shared Execution)). A client-side sketch showing matching parameters is given after this list of options.

  • --dest_path_prefix=path directs dispynode to use path as the prefix for storing files sent by the dispy scheduler. If a cluster uses the dest_path option (when creating the cluster with JobCluster or SharedJobCluster), then dest_path is appended to the path prefix. With this, files from different clusters can be automatically stored in different directories, to avoid conflicts. Unless the cleanup=False option is used when creating a cluster, dispynode will remove all files and directories created for the cluster when the cluster is terminated.

  • --scheduler_node=addr: If the node is in the same network as the dispy scheduler or when no jobs are scheduled at the time dispynode is started, this option is not necessary. However, if jobs are already scheduled and scheduler and node are on different networks, the given addr is used for handshake with the scheduler.

  • --ipv4_udp_multicast controls whether to use multicast or broadcast for UDP with IPv4 (to discover nodes). If this option is not given, broadcast is used. If it is given, multicast is used instead. Note that for IPv6 there is no broadcast, so multicast is always used.

  • --keyfile=path is the path to the file containing the private key for SSL communication, same as the ‘keyfile’ parameter to ssl.wrap_socket of Python’s ssl module. This key may be stored in ‘certfile’ itself, in which case this option need not be given (the default). The same file must be used as the keyfile parameter for JobCluster, or the node_keyfile option with dispyscheduler (Shared Execution) (when SharedJobCluster is used).

  • --certfile=path is path to file containing SSL certificate, same as ‘certfile’ parameter to ssl.wrap_socket of Python ssl module. Same file must be used as certfile parameter for JobCluster, or node_certfile option with dispyscheduler (Shared Execution) (when SharedJobCluster is used).

  • --max_file_size n specifies maximum size of any file transferred from/to clients. If size of a file transferred exceeds n, the file will be truncated. n can be specified as a number >= 0, with an optional suffix letter k (indicating n kilobytes), m (n megabytes), g (n gigabytes) or t (n terabytes). If n is 0 (default), there is no maximum limit.

  • --zombie_interval=n directs dispynode to assume a scheduler / client is a zombie if there is no communication from it for n minutes. dispynode doesn’t terminate jobs submitted by a zombie scheduler; instead, when all the scheduled jobs are completed, the node frees itself from that scheduler so other schedulers may use the node.

    Starting with version 4.10.1, dispynode checks if any job process is dead (i.e., the process terminated due to an exception, the computation function used exit instead of return, or the process was killed). If a job process is dead / zombie, dispynode will clean up that process and send a reply for that job with status DispyJob.Terminated.

    The default value for zombie_interval is 60 (minutes), which may be too long especially when node is shared, so a smaller value, such as 10, 5 or even 1 may be more appropriate.

  • --ping_interval=n is interval in seconds to send ping messages to discover schedulers. With the default value 0 dispynode doesn’t send such messages, except when dispynode is started. In addition to broadcasting UDP messages, dispynode will send TCP messages to the last scheduler (or for the first time the scheduler given with scheduler_node option) at given interval.

  • --clean indicates dispynode should remove any files saved from previous runs. dispynode saves any files sent by dispy clients and information about jobs’ execution results that couldn’t be sent to clients (because of network failures, clients crashed etc.). The cleaning is done once when the node is starting. If dispynode is left running for a long time, it may be advisable to periodically remove such files (perhaps files that were accessed before a certain time). Note that dispy sets the timestamps of files saved from dispy client computations to the timestamps on the clients, so modification times of such files may not be a good measure to know if the files are still in use.

  • --unsafe_setup causes dispynode to run the client’s “setup” and “cleanup” functions, if given, in the “main” process (as done until version 4.9.2). If these functions have side effects, that client and other clients may be affected. Starting with version 4.10.0, unless this option is given, dispynode runs setup and cleanup functions in a subprocess and uses that process to run all jobs (so any global variables in setup are available to jobs). This means job arguments are sent to this subprocess, which then creates jobs, which may be a bit inefficient. If this is an issue, consider using pycos’s dispycos, which sends job arguments directly to the computing process so there is no overhead.

    If a client doesn’t use setup and cleanup functions, a subprocess is not used, so jobs are started by the main process itself (as done until version 4.9.2). See the description for the setup option in JobCluster.

    It is recommended that this option not be used (so that setup and cleanup functions run in a subprocess) if a node is shared by multiple users.

  • --force_cleanup indicates dispynode should clean up computations even if they disabled cleanup (i.e., with cleanup=False when the cluster is created). This is useful if a node is shared by different clients, to guarantee that all files transferred by computations are automatically removed when computations are closed.

  • --client_shutdown indicates that dispynode can be shut down by a client by calling dispynode_shutdown() in its cleanup function.

  • --msg_timeout n specifies timeout value in seconds for socket I/O operations with the client / scheduler. The default value is 5 seconds. If the network is slow, this timeout can be increased. Bigger timeout values than necessary will cause longer delays in recognizing communication failures.

  • --service_start HH:MM, --service_stop HH:MM, --service_end HH:MM options allow service (executing jobs) only between those times (of day). HH:MM should be in 24 hour format. service_stop is optional and if given, the node stops accepting jobs at that time. Any jobs executing at service_end will be terminated (killed) so the clients should only submit reentrant computations if this feature is used. For example, if service_start is set to 17:00, service_stop set to 07:00 and service_end is set to 08:00, the node will execute jobs from 5PM, stop accepting new jobs at 7AM (next day), and kill any running jobs at 8AM. Then it will not accept any jobs until 5PM.

  • --serve n specifies maximum number of clients that can use the server. The default value of -1 implies no limit and any positive number causes dispynode to quit after running computations from that many clients.

    dispynode decrements number of clients left to run when all computations from a client are closed. With this, it is possible to run more computations than given n if the scheduler issues computations before currently running computations are closed. For example, if n is 1, scheduler from JobCluster can send second computation before closing first one; the node will accept second computation, and both computations will have access to same files. With SharedJobCluster, the client for node is dispyscheduler, so computations from different programs may be accepted by the node (until all computations are closed). See exclusive option for SharedJobCluster to prevent a node from being shared in more than one computation.

    serve option can be used with Containers to run each client’s computations in a new container.

  • --admin_secret secret uses secret for administering the node with dispyadmin. If admin_secret is not used, the node appears in the client (web browser interface) but no details (such as available memory, CPUs) are sent, nor can the node be controlled.

  • --daemon option causes dispynode to not read from standard input, so dispynode can be run as background process, or started from (system startup) scripts. If this option is not given, dispynode prints menu of commands, and commands can be entered to get status and control dispynode.
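Some of the above options must be matched by corresponding client parameters (e.g., secret, keyfile/certfile, dest_path_prefix/dest_path). The following client-side sketch is only an illustration; the secret, file paths and dest_path are placeholders, and it assumes a node started with options such as dispynode.py -s test --certfile /path/to/cert --keyfile /path/to/key --dest_path_prefix /tmp/dispy:

import dispy

def compute(n):
    return n * n

if __name__ == '__main__':
    # secret must match the node's -s/--secret option; certfile/keyfile must be
    # the same files given to the node's --certfile/--keyfile options; dest_path
    # (if used) is appended to the node's --dest_path_prefix when files are
    # stored on the node (all values here are illustrative)
    cluster = dispy.JobCluster(compute, secret='test',
                               certfile='/path/to/cert', keyfile='/path/to/key',
                               dest_path='myapp')
    job = cluster.submit(10)
    print(job())   # waits for the job to finish and prints its result
    cluster.close()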

2.1. Commands

If dispynode is started as a non-daemon (i.e., not as a background process), the following commands can be given at the prompt:

  • “quit” or “exit” terminate dispynode, killing any running jobs.

  • “stop” accepts no new jobs but continues to execute currently running jobs.

  • “start” resumes accepting new jobs.

  • “release” checks if the current scheduler (client) is active and, if not, closes its computations (i.e., releases the node from the current client so another client can use the node). The current scheduler is considered active if the node is currently running any jobs for it or if it can communicate with the node. The “release” command is useful if the current client has crashed or is not reachable due to network issues and the zombie interval is too long for dispynode to automatically release itself from the client.

  • “cpus” followed by a number changes the number of CPUs used, and hence the number of jobs running at any time. If the given number is negative, then that many CPUs are not used; e.g., if a node has 8 CPUs and the given number is -3, then dispynode runs up to 5 jobs at any time.

  • Any other input causes dispynode to show status, including number of computations finished, currently running computations, jobs and CPU time used.

2.2. NAT/Firewall Forwarding

As explained in dispy (Client) documentation, ext_host can be used in case dispynode is behind a NAT firewall/gateway and the NAT forwards UDP and TCP ports 9701 (default value) to the IP address where dispynode is running. Thus, assuming NAT firewall/gateway is at (public) IP address a.b.c.d, dispynode is to run at (private) IP address 192.168.5.33 and NAT forwards UDP and TCP ports 9701 to 192.168.5.33, dispynode can be invoked as:

dispynode.py -i 192.168.5.33 --ext_host=a.b.c.d

If multiple dispynodes are needed behind a.b.c.d, then each must be started with a different ‘port’ argument and those ports must be forwarded to the nodes appropriately. Continuing the example, if 192.168.5.34 is another node that can run dispynode, it can be started on it as:

dispynode.py -i 192.168.5.34 -p 9705 --ext_host=a.b.c.d

and configure NAT to forward UDP and TCP ports 9705 to 192.168.5.34. Then dispy client can use the nodes with:

cluster = JobCluster(compute, nodes=[('a.b.c.d', 9700), ('a.b.c.d', 9705)])

2.3. Isolate Computation Files

By default, all computations (jobs) run as the user that started dispynode. This may be acceptable when dispynode is used privately (for a single client at a time), but when a node is shared (i.e., used with SharedJobCluster), it is possible for one computation to access / affect files used by another computation. Even if a node is not being shared, a computation may accidentally damage files used by dispynode (that store information about processes, jobs). Starting with the 4.10.1 release, dispynode supports suid and sgid features in Unix (Linux, OS X, FreeBSD etc., but not Windows). When these bits are set for the Python interpreter that runs the dispynode program, dispynode will switch to that uid and gid when executing client computations. In addition, the permissions for files and directories are set such that each client can only access files within its own directory and not any other computation’s files or files used by dispynode. Thus, each computation is protected (in terms of access to its files transferred to the node) from any other computations that may also be executing at the same time.

The instructions below for using the above feature are for Ubuntu Linux, but a similar approach should work for any other Unix (including OS X).

Assume that the account name of the user that runs the dispynode program is user.

  • Create a group, say, hpc, with groupadd hpc.

  • Create a user, say, dispy, with useradd -g hpc -s /bin/false -M -d /nonexistent dispy. The shell for the dispy user account is set to a program that terminates right away, so this account can’t be used to log in.

  • Add user to group hpc with usermod -a -G hpc user.

  • The above 3 steps are system dependent; on other systems they may have to be done differently. The rest of the steps should work on all Unix systems. Copy the Python interpreter (so suid and sgid can be set on the copy) with cp /usr/bin/python2.7 /usr/local/bin/supython2.7.

  • Change ownership of this program to user created above with chown dispy:hpc /usr/local/bin/supython2.7.

  • Set suid and sgid bits of mode with chmod ug+s /usr/local/bin/supython2.7.

  • Log out of the user account and log in again (so the groups for that account are reset to include group hpc). Then user should start dispynode with supython2.7 dispynode.py. With this, each computation can access only files under its directory and not any files of any other computations.

Note that this feature doesn’t prevent a computation from accessing files elsewhere on the node. If full protection is needed, either chroot environment or Containers can be used, along with suid and sgid features.

2.4. Containers

dispy isolates the computation environment so that jobs from one computation don’t affect jobs from another computation, even if a node is shared and jobs from different computations are running simultaneously. Usually, any files transferred and saved by jobs are also removed when the computation is closed (the exception is when dest_path is given or if cleanup is False, when files may be left behind). However, the jobs have access to the server’s file system, so they can be a security risk. It is possible to avoid (some) issues by creating a special user with access only to a specific path (e.g., with a chroot environment).

If complete isolation of a computation is needed, containers such as Docker or LXC can be used. Each container runs a copy of a small Linux distribution with its own file system; the container has no access to the host file system (unless it is configured to). The instructions below are for building and using Docker containers; LXC images can be built by installing dispy into a container and copying the image to all nodes.

dispy now includes a Dockerfile under the data directory where the dispy module is installed; the path can be obtained with the program:

import os, dispy
print(os.path.join(os.path.dirname(dispy.__file__), 'data'))

Note that Docker runs on Linux host only; with other operating systems a guest VM can be used to run Linux under which Docker can be run. See Docker Machine and Docker Docs for more details.

To build an image with the latest Ubuntu Linux and dispy, install docker if not already installed, create a temporary directory, say, /tmp/dispy-docker, change to that directory and copy the Dockerfile from above to that directory. (The Dockerfile can be customized to suit any additional tools or setup needed.) Then execute docker build -t dispy . (note the dot at the end). The full list of instructions for building the image for Python 2.7 (for Python 3, use the appropriate path to where the Dockerfile is installed) is:

mkdir /tmp/dispy-docker
cd /tmp/dispy-docker
cp /path/to/Dockerfile .
docker build -t dispy .

where /path/to/ is path obtained from Python snippet (two lines) above.

Once the image is built, a new container can be run with:

docker run --net=host -it dispy

to start dispynode.py (which is the default command for the image built above) with default options. --net=host runs container in host network mode, i.e., container uses host network configuration. See --save_config and --config options to dispynode to use same options across many runs. If these or any other options are needed, Dockerfile can be customized before building the image in the instructions above.

If each client run should be started in a new container (so that clients do not interfere with each other and start in the same environment using the image built above), then serve option can be used as:

while :; do
    docker run --net=host -it dispy dispynode.py --serve 1
done

This causes dispynode to accept computations only from one client until that client closes its computations. When all running computations from that client are closed, dispynode quits, which terminates the container, and because of the while loop, a new container is started from the image.