Distributed Computing

The tiger said, "Excuse me, but I'm very hungry.

Do you think I could have tea with you?"

- Judith Kerr. (1968). The Tiger Who Came to Tea. Harper Collins

So far we have been looking at concurrent computing on a single physical machine. A natural extension of concurrent computing is distributed computing, whereby computation is performed at a physical location that differs from the one where the request originated.

In time, as the demands placed upon one's software increase, it is often necessary to scale one's solution beyond a single machine. Additionally, from an architectural/cost perspective, it is often most sensible to centralize resources, whether data (in the form of databases) or high performance hardware (such as GPU cluster machines, multi physical CPU machines etc), and have clients access those resources in a controlled manner on a service basis (with associated monitoring, SLAs etc). Concurnas makes this easy, since distributed computing is treated as a first class citizen in the language.

Since distributed computing is by nature performed away from the physical machine that originates the request, a communication network must be used in order to initiate, coordinate and resolve that computation. This of course introduces a whole set of engineering problems which need to be overcome in order to create reliable software. Whereas with localized concurrent computing we need only concern ourselves with application level errors (errors in our logic or in the local parts of our computer we make use of), with distributed computing we must also factor in the inevitable errors of the communication network we are using. Luckily, Concurnas abstracts away most of the tedious boilerplate, allowing one to focus on what matters most.

On a single machine we are unlikely to receive notification of (and therefore be able to react to) a catastrophic, unexpected error, since it would most likely manifest as the entire machine failing or being left in an unusable state. With distributed computing, however, we rely on a communication network and must factor in this additional source of error, which is at least one to three orders of magnitude (10x - 1,000x) more likely to occur. Examples include network failure, or the physical machines we are trying to use becoming unavailable (scheduled maintenance, downtime etc). To this end we will see that Concurnas has first class syntactic support requiring the capture of, and response to, these forms of expected but unpredictable failure.

Creating a remote request

We can connect to a remote host and perform remote computation via a remote isolate in much the same way as we can create a new isolate. The syntax is both elegant and concise. Let's look at an example:

rm = Remote('localhost')//establish a connection to a remote Concurnas server
rm.connect()//optionally called
result int: = {1+1}!(rm.onfail(e=>e.retry()))//result => 2
rm.close()//waits for the result of our isolate above to be set, then closes

Alternatively we can define the isolate to execute as follows:

rm = Remote('localhost')
rm.connect()//optionally called
def func() => 1+1
result int: = func()!(rm.onfail(e=>e.retry()))//result => 2
rm.close()//waits for the result of our isolate above to be set, then closes

In the above cases, we first create a Remote connector to a remote Concurnas server by providing a hostname or IP address and an optional port (which defaults to 42000 if unspecified): Remote('localhost'). Note that Remote is automatically imported.

We next call connect() to establish a network connection to our remote server. This step is optional: connect() will be called upon first submission of an isolate if a connection has not already been established.

Next we attempt to submit the isolate {1+1}! for remote execution. This occurs as normal, except that we pass an executor on the right hand side of the bang ! operator. This executor is returned from the remote object's onfail method, which obliges the initiator of the remote request to handle the case of an isolate task submission failing at inception, by passing an object of type com.concurnas.dist.RemoteFailureHandler. RemoteFailureHandler is a SAM type, thus we are able to use the compact lambda syntax to pass in a handler as above. Our error handling strategy above is simply to re-attempt submission of the isolate task if it cannot be initially submitted or if a server side execution error occurs.

An object of type com.concurnas.dist.RemoteFailureContext is passed to the provided RemoteFailureHandler. This contains the following noteworthy utility methods:

  • retry() - Attempt to re run the isolate.

  • result() - Returns the ref to which the return value of the remotely spawned isolate would be assigned. Returns null if the isolate does not return a value.

  • reason() - Returns the Exception indicating the cause of failure.

The error handler can even be stateful, for example:

from com.concurnas.dist import RemoteFailureHandler, RemoteFailureContext

rem = Remote('localhost')
result = {1+1}!(rem.onfail( new RemoteFailureHandler{  
  attempts = 1
	
  def handle(e RemoteFailureContext){
    if(attempts++ > 10){//retry 10 times then give up
      e.result():setException(e.reason())
    }else{
      e.retry()
    }
  }
} ))
//result == 2

If the error handler itself throws an exception, then this exception will be set on the ref returned from the isolate. If no ref is returned from the isolate, then this exception will be passed to the default isolate error handler (this is generally not a desirable situation).

Remember that an exception occurring within the bounds of the isolate itself does not constitute a distributed computing communication (or otherwise) failure, and is recorded on the exception value of the ref as normal - the error handler discussed here does not catch this.

Since a remote request is an isolate, and the resultant type of an isolate is a ref, we can carry on with execution asynchronously until the point where the value of the ref is required. This is an incredibly powerful mechanism: it's feasible to envisage hundreds or even thousands of these remote requests being spawned across thousands of machines in order to solve a complex problem.
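As a minimal sketch of this asynchronous pattern, assuming a Concurnas server is listening on localhost at the default port, the snippet below submits three remote isolates before any of their results is needed. The function name square and the variable names are illustrative only; the Remote and onfail calls mirror the usage shown earlier in this section.

```
rm = Remote('localhost')//assumes a server is listening on the default port
def square(x int) => x * x//illustrative function to run remotely

//submit three isolates; none of these lines waits for a result
a int: = square(2)!(rm.onfail(e => e.retry()))
b int: = square(3)!(rm.onfail(e => e.retry()))
c int: = square(4)!(rm.onfail(e => e.retry()))

//...other local work can proceed here...

total = a + b + c//execution waits here until all three refs have a value
rm.close()//waits for any pending isolates, then disconnects
```

The same shape should extend to many connections, one Remote per server, with the refs collected and read only once their values are actually needed.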

When we are finished with the remote connection we can call the close() method in order to wait for the result of any pending isolates and finally disconnect from the remote server once the results (positive or an exception) are known. By implementing the close method in this way, we can use remote connections within try-with-resources blocks. For example:

try(rm = Remote('localhost')){//close rm on completion of this block
  rm.connect()//optionally called
  result int: = {1+1}!(rm.onfail(e=>e.retry()))//result => 2
}

A second variant of the close() method exists for remote connections, which takes a boolean parameter: close(hard boolean). If this is set to true then the connection will be closed immediately, and any outstanding remotely spawned isolates will have an exception set on their resultant ref:

rm = Remote('localhost')
rm.connect()//optionally called
result int: = {1+1}!(rm.onfail(e=>e.retry()))//result => 2
rm.close(hard = true)//do not wait for the result above to complete

In the above case of an explicit hard disconnection, any outstanding isolates will have a ClientPrematureDisconnection exception set on their result refs.

Some care should be taken with hard disconnections, since we are not waiting for the results of our submitted isolates before closing our connection. Closing a connection will prevent as yet unstarted remote isolates from starting.

It is essential to call the close method when one is finished using the remote connection because, just like other forms of I/O (e.g. files), failing to do so will keep the connection open, resulting in a resource leak.

Common error handling techniques

The com.concurnas.dist.Remote class comes with a number of prepackaged error handling mechanisms. Here are two notable ones:

  • onFailRetry() - retry a remote request a fixed number of times (by default, 10) before giving up and setting the last reason for failure as the exception on the ref returned by the isolate.

  • onFailFail() - If the initial execution attempt fails, give up and set the provided exception on the ref returned by the isolate.
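The prepackaged retry handler might be used as sketched below. This assumes that onFailRetry(), like onfail, returns an executor that can be passed on the right hand side of the bang operator, and that calling it with no arguments applies the default retry limit described above; neither the exact signature nor the name of any retry-count parameter is confirmed here.

```
rm = Remote('localhost')
//assumption: onFailRetry() yields an executor in the same way as onfail(...)
//retries up to the default limit, then sets the last failure reason on the ref
result int: = {1+1}!(rm.onFailRetry())
rm.close()//waits for the result above, then disconnects
```

Compared with the hand written stateful handler shown earlier, this keeps the call site short when a fixed retry count is all that is required.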

Network errors after submission

So far we have covered errors occurring up to the point of an isolate being successfully submitted to, and executed by, a remote executor. Once the isolate has been received at the remote server and acknowledged as such, the client remote connection will automatically reconnect to the remote server in the case of any networking disconnections.

Submitted isolates will be executed by the remote server even in the case of unexpected client disconnection. The result of said computation will be cached until the client reconnects or the remote server itself is closed. If a client explicitly hard closes a connection, then any outstanding isolates will be terminated.

Creating a remote server

A remote server can easily be created in Concurnas by using the following code:

from com.concurnas.lang.dist import RemoteServer
rmServer = RemoteServer()
rmServer.startServer()//start remote server
//...use remote server...
rmServer.close()

A port may optionally be specified; if it is not, the server will be spawned on port 42000.

It is essential to call the close method when one is finished using the remote server because just like other forms of I/O (e.g. files), failing to do so will keep all its client connections open resulting in a resource leak.
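Putting the server and client halves together, a minimal end to end sketch might look as follows. It assumes, purely for demonstration, that the server and the client run in the same process and that the default port is free; in a real deployment the two would normally live on different machines. Only calls already shown in this section are used.

```
from com.concurnas.lang.dist import RemoteServer

server = RemoteServer()//default port
server.startServer()

//assumption: client and server share one process for demonstration only
try(rm = Remote('localhost')){//rm is closed on completion of this block
  result int: = {1+1}!(rm.onfail(e => e.retry()))
}

server.close()//release the server and its client connections
```

Closing the client before the server, as here, lets any pending isolates complete before the server is shut down.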

Security Managers

When we create a remote server we are opening up our machine for external access, so it's often beneficial to exert some additional control over the submitted code, which we may not fully trust. To this end one can pass a java.security.PermissionCollection to the com.concurnas.lang.dist.RemoteServer instance. For example:

from java.security import PermissionCollection, Permissions, AllPermission
from com.concurnas.lang.dist import RemoteServer
perms = Permissions()
perms.add(new AllPermission())

remServer = RemoteServer(permissions = perms)

In the above example, by using an AllPermission object we are giving our remote code the same privileges as the spawned server executing it. In practice this scope can of course be narrowed. For more details see the Concurnas Security Managers documentation and the Java security managers documentation.

In the case where no security policy is defined above, remote code will be executed with the same privileges as the spawned server.
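As a hedged illustration of narrowing that scope, the sketch below grants remote code read-only access to a single directory tree instead of AllPermission. java.io.FilePermission is a standard Java class; the /tmp path is hypothetical, and the exact set of permissions a real workload needs is not covered here and would have to be determined per application.

```
from java.security import Permissions
from java.io import FilePermission
from com.concurnas.lang.dist import RemoteServer

perms = Permissions()
//grant read-only access to everything under /tmp (hypothetical path)
//the trailing "-" matches all files and subdirectories recursively
perms.add(new FilePermission("/tmp/-", "read"))

remServer = RemoteServer(permissions = perms)
```

Starting from an empty Permissions object and adding only what is needed is generally easier to reason about than starting from AllPermission and trying to remove access.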

Request Dependencies

Distributed computing in Concurnas is agnostic to the origin of the source code being submitted. Remote servers are treated almost like dumb terminals, though usually with a tremendous amount of compute power and exposure of special resources (databases, custom hardware etc). In practice this means that a connecting client may submit whatever code they like, with whatever code dependencies they deem appropriate.

The server will interact with the requesting client so as to obtain a copy of the bytecode required in order to execute the request, if it doesn't already have a copy locally for the connected client. This is part of the protocol implemented by the client and server components of the distributed computing framework in Concurnas and occurs automatically behind the scenes. Any specified code will be executed by the remote server provided that it complies with any security policies defined. The dependency provision protocol itself is quite sophisticated and, in the interests of performance, is able to perform static code dependency analysis in order to determine and provide upfront the code that is required in order to execute a request.