# Parallel Computing¶

Most modern computers possess more than one CPU, and several computers can be combined together in a cluster. Harnessing the power of these multiple CPUs allows many computations to be completed more quickly. There are two major factors that influence performance: the speed of the CPUs themselves, and the speed of their access to memory. In a cluster, it’s fairly obvious that a given CPU will have fastest access to the RAM within the same computer (node). Perhaps more surprisingly, similar issues are relevant on a typical multicore laptop, due to differences in the speed of main memory and the cache. Consequently, a good multiprocessing environment should allow control over the “ownership” of a chunk of memory by a particular CPU. Julia provides a multiprocessing environment based on message passing to allow programs to run on multiple processes in separate memory domains at once.

Julia’s implementation of message passing is different from other environments such as MPI [1]. Communication in Julia is generally “one-sided”, meaning that the programmer needs to explicitly manage only one process in a two-process operation. Furthermore, these operations typically do not look like “message send” and “message receive” but rather resemble higher-level operations like calls to user functions.

Parallel programming in Julia is built on two primitives: remote references and remote calls. A remote reference is an object that can be used from any process to refer to an object stored on a particular process. A remote call is a request by one process to call a certain function on certain arguments on another (possibly the same) process.

Remote references come in two flavors: Future and RemoteChannel.

A remote call returns a Future to its result. Remote calls return immediately; the process that made the call proceeds to its next operation while the remote call happens somewhere else. You can wait for a remote call to finish by calling wait() on the returned Future, and you can obtain the full value of the result using fetch().

On the other hand, RemoteChannel s are rewritable. For example, multiple processes can co-ordinate their processing by referencing the same remote Channel.

Each process has an associated identifier. The process providing the interactive Julia prompt always has an id equal to 1. The processes used by default for parallel operations are referred to as “workers”. When there is only one process, process 1 is considered a worker. Otherwise, workers are considered to be all processes other than process 1.

Let’s try this out. Starting with julia -p n provides n worker processes on the local machine. Generally it makes sense for n to equal the number of CPU cores on the machine.

n = n - 1
end


## Remote References and Distributed Garbage Collection¶

Objects referred to by remote references can be freed only when all held references in the cluster are deleted.

The node where the value is stored keeps track of which of the workers have a reference to it. Every time a RemoteChannel or a (unfetched) Future is serialized to a worker, the node pointed to by the reference is notified. And every time a RemoteChannel or a (unfetched) Future is garbage collected locally, the node owning the value is again notified.

The notifications are done via sending of “tracking” messages—an “add reference” message when a reference is serialized to a different process and a “delete reference” message when a reference is locally garbage collected.

Since Futures are write-once and cached locally, the act of fetch()ing a Future also updates reference tracking information on the node owning the value.

The node which owns the value frees it once all references to it are cleared.

With Futures, serializing an already fetched Future to a different node also sends the value since the original remote store may have collected the value by this time.

It is important to note that when an object is locally garbage collected depends on the size of the object and the current memory pressure in the system.

In case of remote references, the size of the local reference object is quite small, while the value stored on the remote node may be quite large. Since the local object may not be collected immediately, it is a good practice to explicitly call finalize() on local instances of a RemoteChannel, or on unfetched Futures. Since calling fetch() on a Future also removes its reference from the remote store, this is not required on fetched Futures. Explicitly calling finalize() results in an immediate message sent to the remote node to go ahead and remove its reference to the value.

Once finalized, a reference becomes invalid and cannot be used in any further calls.

## Shared Arrays¶

Shared Arrays use system shared memory to map the same array across many processes. While there are some similarities to a DArray, the behavior of a SharedArray is quite different. In a DArray, each process has local access to just a chunk of the data, and no two processes share the same chunk; in contrast, in a SharedArray each “participating” process has access to the entire array. A SharedArray is a good choice when you want to have a large amount of data jointly accessible to two or more processes on the same machine.

SharedArray indexing (assignment and accessing values) works just as with regular arrays, and is efficient because the underlying memory is available to the local process. Therefore, most algorithms work naturally on SharedArrays, albeit in single-process mode. In cases where an algorithm insists on an Array input, the underlying array can be retrieved from a SharedArray by calling sdata(). For other AbstractArray types, sdata() just returns the object itself, so it’s safe to use sdata() on any Array-type object.

The constructor for a shared array is of the form:

SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])


which creates a shared array of a bits type T and size dims across the processes specified by pids. Unlike distributed arrays, a shared array is accessible only from those participating workers specified by the pids named argument (and the creating process too, if it is on the same host).

If an init function, of signature initfn(S::SharedArray), is specified, it is called on all the participating workers. You can specify that each worker runs the init function on a distinct portion of the array, thereby parallelizing initialization.

Here’s a brief example:

julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4

julia> S = SharedArray(Int, (3,4), init = S -> S[Base.localindexes(S)] = myid())
3×4 SharedArray{Int64,2}:
2  2  3  4
2  3  3  4
2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedArray{Int64,2}:
2  2  3  4
2  3  3  4
2  7  4  4


Base.localindexes() provides disjoint one-dimensional ranges of indexes, and is sometimes convenient for splitting up tasks among processes. You can, of course, divide the work any way you wish:

julia> S = SharedArray(Int, (3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = myid())
3×4 SharedArray{Int64,2}:
2  2  2  2
3  3  3  3
4  4  4  4


Since all processes have access to the underlying data, you do have to be careful not to set up conflicts. For example:

@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end


would result in undefined behavior. Because each process fills the entire array with its own pid, whichever process is the last to execute (for any particular element of S) will have its pid retained.

As a more extended and complex example, consider running the following “kernel” in parallel:

q[i,j,t+1] = q[i,j,t] + u[i,j,t]


In this case, if we try to split up the work using a one-dimensional index, we are likely to run into trouble: if q[i,j,t] is near the end of the block assigned to one worker and q[i,j,t+1] is near the beginning of the block assigned to another, it’s very likely that q[i,j,t] will not be ready at the time it’s needed for computing q[i,j,t+1]. In such cases, one is better off chunking the array manually. Let’s split along the second dimension:

# This function retuns the (irange,jrange) indexes assigned to this worker
@everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0
# This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in linspace(0,size(q,2),nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end

# Here's the kernel
@everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange)  # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] +  u[i,j,t]
end
q
end

# Here's a convenience wrapper for a SharedArray implementation


Now let’s compare three different versions, one that runs in a single process:

advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1)


one that uses @parallel:

function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @parallel for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end


and one that delegates in chunks:

function advection_shared!(q, u)
@sync begin
for p in procs(q)
end
end
q
end


If we create SharedArrays and time these functions, we get the following results (with julia -p 4):

q = SharedArray(Float64, (500,500,500))
u = SharedArray(Float64, (500,500,500))

# Run once to JIT-compile

# Now the real results:
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)

2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)


The biggest advantage of advection_shared! is that it minimizes traffic among the workers, allowing each to compute for an extended time on the assigned piece.

## Shared Arrays and Distributed Garbage Collection¶

Like remote references, shared arrays are also dependent on garbage collection on the creating node to release references from all participating workers. Code which creates many short lived shared array objects would benefit from explicitly finalizing these objects as soon as possible. This results in both memory and file handles mapping the shared segment being released sooner.

## ClusterManagers¶

The launching, management and networking of Julia processes into a logical cluster is done via cluster managers. A ClusterManager is responsible for

• launching worker processes in a cluster environment
• managing events during the lifetime of each worker
• optionally, providing data transport

A Julia cluster has the following characteristics:

• The initial Julia process, also called the master, is special and has an id of 1.
• Only the master process can add or remove worker processes.
• All processes can directly communicate with each other.

Connections between workers (using the in-built TCP/IP transport) is established in the following manner:

• addprocs() is called on the master process with a ClusterManager object.
• addprocs() calls the appropriate launch() method which spawns required number of worker processes on appropriate machines.
• Each worker starts listening on a free port and writes out its host and port information to STDOUT.
• The cluster manager captures the STDOUT of each worker and makes it available to the master process.
• The master process parses this information and sets up TCP/IP connections to each worker.
• Every worker is also notified of other workers in the cluster.
• Each worker connects to all workers whose id is less than the worker’s own id.
• In this way a mesh network is established, wherein every worker is directly connected with every other worker.

While the default transport layer uses plain TCPSocket, it is possible for a Julia cluster to provide its own transport.

Julia provides two in-built cluster managers:

LocalManager is used to launch additional workers on the same host, thereby leveraging multi-core and multi-processor hardware.

Thus, a minimal cluster manager would need to:

• be a subtype of the abstract ClusterManager
• implement launch(), a method responsible for launching new workers
• implement manage(), which is called at various events during a worker’s lifetime (for example, sending an interrupt signal)

addprocs(manager::FooManager) requires FooManager to implement:

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
...
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
...
end


As an example let us see how the LocalManager, the manager responsible for starting workers on the same host, is implemented:

immutable LocalManager <: ClusterManager
np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
...
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
...
end


The launch() method takes the following arguments:

• manager::ClusterManager: the cluster manager that addprocs() is called with
• params::Dict: all the keyword arguments passed to addprocs()
• launched::Array: the array to append one or more WorkerConfig objects to
• c::Condition: the condition variable to be notified as and when workers are launched

The launch() method is called asynchronously in a separate task. The termination of this task signals that all requested workers have been launched. Hence the launch() function MUST exit as soon as all the requested workers have been launched.

Newly launched workers are connected to each other, and the master process, in an all-to-all manner. Specifying the command argument --worker <cookie> results in the launched processes initializing themselves as workers and connections being set up via TCP/IP sockets. Optionally, --bind-to bind_addr[:port] may also be specified to enable other workers to connect to it at the specified bind_addr and port. This is useful for multi-homed hosts.

As an example of a non-TCP/IP transport, an implementation may choose to use MPI, in which case --worker must NOT be specified. Instead, newly launched workers should call init_worker(cookie) before using any of the parallel constructs.

For every worker launched, the launch() method must add a WorkerConfig object (with appropriate fields initialized) to launched

type WorkerConfig
# Common fields relevant to all cluster managers
io::Nullable{IO}
host::Nullable{AbstractString}
port::Nullable{Integer}

# Used when launching additional workers at a host
count::Nullable{Union{Int, Symbol}}
exename::Nullable{AbstractString}
exeflags::Nullable{Cmd}

# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Nullable{Any}

# SSHManager / SSH tunnel connections to workers
tunnel::Nullable{Bool}
sshflags::Nullable{Cmd}
max_parallel::Nullable{Integer}

connect_at::Nullable{Any}

.....
end


Most of the fields in WorkerConfig are used by the inbuilt managers. Custom cluster managers would typically specify only io or host / port:

• If io is specified, it is used to read host/port information. A Julia worker prints out its bind address and port at startup. This allows Julia workers to listen on any free port available instead of requiring worker ports to be configured manually.
• If io is not specified, host and port are used to connect.
• count, exename and exeflags are relevant for launching additional workers from a worker. For example, a cluster manager may launch a single worker per node, and use that to launch additional workers.
• count with an integer value n will launch a total of n workers.
• count with a value of :auto will launch as many workers as the number of cores on that machine.
• exename is the name of the julia executable including the full path.
• exeflags should be set to the required command line arguments for new workers.
• tunnel, bind_addr, sshflags and max_parallel are used when a ssh tunnel is required to connect to the workers from the master process.
• userdata is provided for custom cluster managers to store their own worker-specific information.

manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) is called at different times during the worker’s lifetime with appropriate op values:

• with :register/:deregister when a worker is added / removed from the Julia worker pool.
• with :interrupt when interrupt(workers) is called. The ClusterManager should signal the appropriate worker with an interrupt signal.
• with :finalize for cleanup purposes.

## Cluster Managers with Custom Transports¶

Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a little more involved. Each Julia process has as many communication tasks as the workers it is connected to. For example, consider a Julia cluster of 32 processes in an all-to-all mesh network:

• Each Julia process thus has 31 communication tasks.
• Each task handles all incoming messages from a single remote worker in a message-processing loop.
• The message-processing loop waits on an IO object (for example, a TCPSocket in the default implementation), reads an entire message, processes it and waits for the next one.
• Sending messages to a process is done directly from any Julia task—not just communication tasks—again, via the appropriate IO object.

Replacing the default transport requires the new implementation to set up connections to remote workers and to provide appropriate IO objects that the message-processing loops can wait on. The manager-specific callbacks to be implemented are:

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)


The default implementation (which uses TCP/IP sockets) is implemented as connect(manager::ClusterManager, pid::Integer, config::WorkerConfig).

connect should return a pair of IO objects, one for reading data sent from worker pid, and the other to write data that needs to be sent to worker pid. Custom cluster managers can use an in-memory BufferStream as the plumbing to proxy data between the custom, possibly non-IO transport and Julia’s in-built parallel infrastructure.

A BufferStream is an in-memory IOBuffer which behaves like an IO—it is a stream which can be handled asynchronously.

Folder examples/clustermanager/0mq contains an example of using ZeroMQ to connect Julia workers in a star topology with a 0MQ broker in the middle. Note: The Julia processes are still all logically connected to each other—any worker can message any other worker directly without any awareness of 0MQ being used as the transport layer.

When using custom transports:

• Julia workers must NOT be started with --worker. Starting with --worker will result in the newly launched workers defaulting to the TCP/IP socket transport implementation.
• For every incoming logical connection with a worker, Base.process_messages(rd::IO, wr::IO)() must be called. This launches a new task that handles reading and writing of messages from/to the worker represented by the IO objects.
• init_worker(cookie, manager::FooManager) MUST be called as part of worker process initialization.
• Field connect_at::Any in WorkerConfig can be set by the cluster manager when launch() is called. The value of this field is passed in in all connect() callbacks. Typically, it carries information on how to connect to a worker. For example, the TCP/IP socket transport uses this field to specify the (host, port) tuple at which to connect to a worker.

kill(manager, pid, config) is called to remove a worker from the cluster. On the master process, the corresponding IO objects must be closed by the implementation to ensure proper cleanup. The default implementation simply executes an exit() call on the specified remote worker.

examples/clustermanager/simple is an example that shows a simple implementation using UNIX domain sockets for cluster setup.

## Network Requirements for LocalManager and SSHManager¶

Julia clusters are designed to be executed on already secured environments on infrastructure such as local laptops, departmental clusters, or even the cloud. This section covers network security requirements for the inbuilt LocalManager and SSHManager:

• The master process does not listen on any port. It only connects out to the workers.

• Each worker binds to only one of the local interfaces and listens on the first free port starting from 9009.

• LocalManager, used by addprocs(N), by default binds only to the loopback interface. This means that workers started later on remote hosts (or by anyone with malicious intentions) are unable to connect to the cluster. An addprocs(4) followed by an addprocs(["remote_host"]) will fail. Some users may need to create a cluster comprising their local system and a few remote systems. This can be done by explicitly requesting LocalManager to bind to an external network interface via the restrict keyword argument: addprocs(4; restrict=false).

• SSHManager, used by addprocs(list_of_remote_hosts), launches workers on remote hosts via SSH. By default SSH is only used to launch Julia workers. Subsequent master-worker and worker-worker connections use plain, unencrypted TCP/IP sockets. The remote hosts must have passwordless login enabled. Additional SSH flags or credentials may be specified via keyword argument sshflags.

• addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>) is useful when we wish to use SSH connections for master-worker too. A typical scenario for this is a local laptop running the Julia REPL (i.e., the master) with the rest of the cluster on the cloud, say on Amazon EC2. In this case only port 22 needs to be opened at the remote cluster coupled with SSH client authenticated via public key infrastructure (PKI). Authentication credentials can be supplied via sshflags, for example sshflags=-e <keyfile>.

Note that worker-worker connections are still plain TCP and the local security policy on the remote cluster must allow for free connections between worker nodes, at least for ports 9009 and above.

Securing and encrypting all worker-worker traffic (via SSH) or encrypting individual messages can be done via a custom ClusterManager.

## Specifying Network Topology (Experimental)¶

The keyword argument topology passed to addprocs is used to specify how the workers must be connected to each other:

• :all_to_all, the default: all workers are connected to each other.
• :master_slave: only the driver process, i.e. pid 1, has connections to the workers.
• :custom: the launch method of the cluster manager specifies the connection topology via the fields ident and connect_idents in WorkerConfig. A worker with a cluster-manager-provided identity ident will connect to all workers specified in connect_idents.

Currently, sending a message between unconnected workers results in an error. This behaviour, as with the functionality and interface, should be considered experimental in nature and may change in future releases.

In addition to tasks, remote calls, and remote references, Julia from v0.5 forwards will natively support multi-threading. Note that this section is experimental and the interfaces may change in the future.

### Setup¶

By default, Julia starts up with a single thread of execution. This can be verified by using the command Threads.nthreads():

julia> Threads.nthreads()
1


The number of threads Julia starts up with is controlled by an environment variable called JULIA_NUM_THREADS. Now, let’s start up Julia with 4 threads:

export JULIA_NUM_THREADS=4


(The above command works on bourne shells on Linux and OSX. Note that if you’re using a C shell on these platforms, you should use the keyword set instead of export. If you’re on Windows, start up the command line in the location of julia.exe and use set instead of export.)

Let’s verify there are 4 threads at our disposal.

julia> Threads.nthreads()
4


But we are currently on the master thread. To check, we use the command Threads.threadid()

julia> Threads.threadid()
1


### The @threads Macro¶

Let’s work a simple example using our native threads. Let us create an array of zeros:

julia> a = zeros(10)
10-element Array{Float64,1}:
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0


Let us operate on this array simultaneously using 4 threads. We’ll have each thread write its thread ID into each location.

Julia supports parallel loops using the Threads.@threads macro. This macro is affixed in front of a for loop to indicate to Julia that the loop is a multi-threaded region:

Threads.@threads for i = 1:10
end


The iteration space is split amongst the threads, after which each thread writes its thread ID to its assigned locations:

julia> a
10-element Array{Float64,1}:
1.0
1.0
1.0
2.0
2.0
2.0
3.0
3.0
4.0
4.0


Note that Threads.@threads does not have an optional reduction parameter like @parallel.

All I/O tasks, timers, REPL commands, etc are multiplexed onto a single OS thread via an event loop. A patched version of libuv (http://docs.libuv.org/en/v1.x/) provides this functionality. Yield points provide for co-operatively scheduling multiple tasks onto the same OS thread. I/O tasks and timers yield implicitly while waiting for the event to occur. Calling yield() explicitly allows for other tasks to be scheduled.

Thus, a task executing a ccall effectively prevents the Julia scheduler from executing any other tasks till the call returns. This is true for all calls into external libraries. Exceptions are calls into custom C code that call back into Julia (which may then yield) or C code that calls jl_yield() (C equivalent of yield()).

Note that while Julia code runs on a single thread (by default), libraries used by Julia may launch their own internal threads. For example, the BLAS library may start as many threads as there are cores on a machine.

The @threadcall macro addresses scenarios where we do not want a ccall to block the main Julia event loop. It schedules a C function for execution in a separate thread. A threadpool with a default size of 4 is used for this. The size of the threadpool is controlled via environment variable UV_THREADPOOL_SIZE. While waiting for a free thread, and during function execution once a thread is available, the requesting task (on the main Julia event loop) yields to other tasks. Note that @threadcall does not return till the execution is complete. From a user point of view, it is therefore a blocking call like other Julia APIs.

It is very important that the called function does not call back into Julia.

@threadcall may be removed/changed in future versions of Julia.

Footnotes

 [1] In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding RMA to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see http://mpi-forum.org/docs.