Saturday 3 February 2018

Designing Async Task Dispatch Library From Scratch (Part-2)

Working with Futures, Executors and Tasks

What is a Future

In layman's terms, a future is an object that can hold the value or result of some computation done asynchronously. What does that mean?
Consider the example below:

def some_task():
    time.sleep(5)
    return 42

ret1 = some_task()              #ret1 will be 42
ret2 = async_compute(some_task) #what should be ret2 ?

ret1 would be assigned a value only after 5 seconds, and during that time the thread executing the code is blocked and cannot do anything else. This is the synchronous way of doing things. Other examples of blocking calls from your network programming 101 are accept, connect, read, write, etc.

But what about ret2? If we are to do asynchronous programming we cannot block the executing thread for 5 seconds; we could be doing other tasks in the meantime. To achieve that, async_compute might very well dispatch the task off to another thread. If we do that, there is no way to return the return value of some_task to ret2 (hold off your idea about using coroutines, we will get there).
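To make the problem concrete, suppose async_compute simply spawned a thread for the task (a hypothetical sketch using the threading module):

import threading

# If async_compute just did this ...
t = threading.Thread(target=some_task)
t.start()
# ... some_task still returns 42 eventually, but the Thread machinery
# discards that return value; there is nothing to hand back as ret2.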

So what async_compute can return is a placeholder where the computed result will be put at some future time. This placeholder is what is commonly known as a future!

There are different ways to get the computed result out from a future object:

  • Via a blocking call
  • Via callbacks

Blocking call example:

ret = async_compute(some_task)
assert isinstance(ret, Future)

print (ret.result()) # Will print 42

This example is still equivalent to the blocking one, as the call to result blocks until the result of the asynchronous computation is available. In our example it would block for 5 seconds.

Callback example:

def print_result(fut):
    assert isinstance(fut, Future)
    print (fut.result()) # This will not block

ret = async_compute(some_task)
assert isinstance(ret, Future)

ret.add_done_callback(print_result)

print ("Continuing with other tasks")

What we are doing here is providing a callback to the future which would get executed when the result of the asynchronous computation is made available to the future.

Future - Building Blocks

In this section we will see what requirements a Future type should have and try to create one.

States of a future object
We will be discussing only the bare minimum states required for a future. There may be other states involved based upon the implementation and feature provision.

  • Pending - The future is constructed in this state. It basically indicates that the asynchronous computation has either not started or not finished yet.
  • Cancelled - The future is cancelled and all the completion callbacks are called. The future that we would be implementing does not have the ability to cancel the running asynchronous process though.
  • Finished - The future has the result of the asynchronous operation ready. A future is ready when the result of the async operation is available or if the async operation finished with an exception.

Client APIs of a future type

  1. add_done_callback - For calling a function when the future state becomes 'Finished' or 'Cancelled'. The function must take only one argument, which is the future object. The client code can access the result or the exception from the passed future argument. The purpose of adding done callbacks to a future is to perform some action in response to the completion of the async operation, or, as many call it, to chain operations.
     For example:
def some_long_task():
    time.sleep(10)
    return 43

def another_long_task():
    time.sleep(5)
    return 68

def after_another_long_task(fut):
    res = fut.result()
    print ("All computations finished. Result is {}".format(res))


def after_long_task(fut):
    res = fut.result() # This would not block, result is already available
    # Trigger another asynchronous computation
    fut1 = async_compute(another_long_task)
    fut1.add_done_callback(after_another_long_task)

fut = async_compute(some_long_task)
fut.add_done_callback(after_long_task)
       
  2. set_result - This API should usually be called only by the asynchronous operation which created the future. It stores the computed result in the future, sets the state to 'Finished' and calls all the added done callbacks.

  3. set_exception - If the asynchronous executor encounters an exception while performing our operation, it will set the exception inside the future by calling this function. The state of the future is still set to 'Finished' and all the added done callbacks are called.

  4. cancel - Cancels the future by setting the state to 'Cancelled' and calls all the added done callbacks.

  5. result - Blocks (up to the provided timeout, if any) until the result is available.

  6. exception - Blocks (up to the provided timeout, if any) until the exception is set in the future.

These APIs need to be thread-safe if they are to be called from different threads. We will be using a mutex + condition variable to achieve that.
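If you have not used condition variables before, the generic pattern looks roughly like this (a standalone sketch; producer and consumer are just illustrative names):

import threading

cond = threading.Condition()
ready = False
value = None

def consumer():
    with cond:
        while not ready:
            cond.wait()        # Releases the lock while sleeping
        print (value)

def producer(v):
    global ready, value
    with cond:
        value = v
        ready = True
        cond.notify_all()      # Wake up anyone blocked in wait()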

An Implementation

import threading
import time


# Future states:
#
# An instance of the future could be in any of these states.
# 1. PENDING : The future does not have the result/exception for the corresponding work.
# 2. RUNNING : The work associated with the future is currently being executed. (Not set by this minimal implementation.)
# 3. CANCELLED : The future got cancelled before the result of the associated work was computed.
# 4. FINISHED : The associated work got finished and resulted in giving out a value or exception.


PENDING   = 'pending'
RUNNING   = 'running'
CANCELLED = 'cancelled'
FINISHED  = 'finished'

FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    FINISHED
]



class FutureCancelledError(Exception):
    """Raised when the future has been cancelled."""
    pass


class FutureTimeoutError(Exception):
    """Raised when the future did not become ready within the given timeout."""
    pass




class Future(object):
    def __init__(self):
        """"""
        self._state = PENDING
        self._condition = threading.Condition()
        self._done_callbacks = []

        self._result = None
        self._exception = None

    def add_done_callback(self, cb):
        """
        Add the callback to be executed when the future state becomes
        cancelled/finished
        """
        with self._condition:
            if self._state not in [CANCELLED, FINISHED]:
                self._done_callbacks.append(cb)
                return
        #Call immediately if result/exception already set
        cb(self)


    def result(self, timeout=None):
        """
        Blocking call on the calling thread.

        timeout: time to wait for the result to be ready.

        Throws:
        FutureCancelledError if the state of future was CANCELLED
        or became CANCELLED later.

        FutureTimeoutError if future did not become ready before
        the timeout.
        """
        with self._condition:
            if self._state in [CANCELLED]:
                raise FutureCancelledError()

            if self._state == FINISHED:
                # Already done, return the result
                return self._result

            self._condition.wait(timeout)

            if self._state in [CANCELLED]:
                raise FutureCancelledError()

            if self._state == FINISHED:
                return self._result
            else:
                raise FutureTimeoutError()

    def exception(self, timeout=None):
        """
        Blocking call on the calling thread.
        """
        with self._condition:
            if self._state in [CANCELLED]:
                raise FutureCancelledError()

            if self._state == FINISHED:
                #Already done. Return the exception
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED]:
                raise FutureCancelledError()

            if self._state == FINISHED:
                return self._exception
            else:
                raise FutureTimeoutError()


    def done(self):
        """Future is finished"""
        with self._condition:
            return self._state in [CANCELLED, FINISHED]

    def cancelled(self):
        """ Is the future cancelled or not"""
        with self._condition:
            return self._state == CANCELLED

    def cancel(self):
        """Cancel the future if not already finished or running"""
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False
            self._set_state(CANCELLED)

            self._condition.notify_all()

        self._execute_done_callbacks()
        return True

    def set_result(self, result):
        """
        Sets the result of the work associated with this future.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED

            self._condition.notify_all()

        self._execute_done_callbacks()


    def set_exception(self, exp):
        """
        Sets the exception that occurred while performing
        the work associated with this future.
        """
        with self._condition:
            self._exception = exp
            self._state = FINISHED

            self._condition.notify_all()

        self._execute_done_callbacks()

    def _set_state(self, state):
        """
        Sets the state.
        Assumes that lock is taken
        """
        self._state = state

    def _execute_done_callbacks(self):
        for cb in self._done_callbacks:
            try:
                cb(self)
            except Exception as e:
                print ("ERROR: {}".format(str(e)))



Executors

Now that we understand futures and have an implementation ready, let us put it to use. For that, we need some way to submit our tasks to "some" thing which returns us a future object instance corresponding to the submitted task.
This "some" thing is what we call an executor. Executors are responsible for executing the tasks submitted by the client code. There are different types of executors that one can provide:

  • Thread Executor
  • Thread Pool Executor
  • Multi Process Executor
  • Queuing Executor
NOTE: There may be more varieties of executors that I am not aware of.
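For reference (our library does not use it), the Python standard library's concurrent.futures module already ships a thread pool and a process pool executor with the same submit-returns-a-future shape:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(pow, 2, 10)
    print (fut.result())   # Prints 1024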

For demonstration purposes we will use an extremely simple executor interface.

class Executor(object):
    """
    An interface for Executor.
    A concrete implementation of this class
    is expected to override all methods of this class.
    """
    def __init__(self):
        pass

    def submit(self, task, *args, **kwargs):
        """
        Takes a task to be executed with the arguments
        it requires.
        Returns a Future object instance.
        """
        raise NotImplementedError()


With this interface in hand let us create a simple Thread executor, which will simply execute the submitted task on a new thread.

class ThreadedExecutor(Executor):
    def __init__(self):
        super(ThreadedExecutor, self).__init__()

    def submit(self, task, *args, **kwargs):
        """
        Create a new thread which runs the _runner function.
        _runner executes the task and fills in the returned
        future.
        """
        f = Future()
        t = threading.Thread(target=self._runner, args=(f, task, args, kwargs))
        t.start()
        return f

    def _runner(self, fut, task, args, kwargs):
        # Runs on the new thread: execute the task and publish its result,
        # which also fires any registered done callbacks.
        v = task(*args, **kwargs)
        fut.set_result(v)


This is pretty easy to understand now. Inside the submit function we do the following:

  • Create an instance of the Future.
  • Create a new thread with _runner as the target function. Pass the future instance and the task to _runner.
  • Return the future. The client code now has the future.
  • On a separate thread, _runner executes the task and, once complete, stores the result in the passed future instance. This results in the execution of all the done callback methods registered with that future, as explained in the previous section.

NOTE: Exception handling is not shown in the above example.
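For reference, one possible way to handle it would be to catch the exception inside _runner and hand it over via set_exception (a sketch of an alternative _runner, not part of the executor above):

    def _runner(self, fut, task, args, kwargs):
        try:
            # Run the task and publish its result ...
            fut.set_result(task(*args, **kwargs))
        except Exception as e:
            # ... or publish the failure so that fut.exception() and the
            # done callbacks can observe it.
            fut.set_exception(e)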
Now, the client code:
def my_long_running_task():
    time.sleep(100)
    return 42

def print_result(future):
    print (future.result()) #Result will not block here.

def schedule_task():
    future = ThreadedExecutor().submit(my_long_running_task)
    future.add_done_callback(print_result)

if __name__ == "__main__":
    schedule_task()


I hope the use of futures makes more sense in the context of executors.


The 'Task' at hand

This is where we get back to coroutines and see its brilliance in action.

NOTE: The things I am presenting here are largely influenced by David Beazley's talk on coroutines. You should definitely check that out.

The previous example, using a future with an executor, was good in that we could do asynchronous computation in a fairly straightforward manner, but it is still not easy. To get the result at the call site, i.e. where the executor was run, we either need to poll continuously to check whether the future is ready with the result, or block by calling result on the future object. Since we hate blocking and polling, we attached a done callback to the future.

We need something like this:

def perform_async_task():
    result = <magic> ThreadedExecutor().submit(my_long_running_task)
    print (result)
 

Isn't that sweet? It is almost like synchronous code. Assuming it works like this, if you think deeper you can see that the function, when called, gets executed in the context of 2 threads: the thread which calls the perform_async_task function and the thread which executes the my_long_running_task function.

def perform_async_task():
    result = <magic> ThreadedExecutor().submit(my_long_running_task) <-- Executor thread
    print (result)  <---------- Client calling thread

Mind blown? Let's see how to get this actually working. What should we replace the "<magic>" tag with? Since we are going to talk about coroutines, let's replace it with a yield for now :)

In part-1 of the series we have already seen how to control a generator function from outside, through the generator handle, using the next and send functions. If you have not read it or have forgotten, I highly recommend going through that again.

With that knowledge we know that the RHS expression of a yield statement is what gets executed on next (or send, depending upon the state of the generator), and the value sent using send is what gets assigned to the LHS of the assignment statement.
The thing to note here is that a statement like

result = yield ThreadedExecutor().submit(my_long_running_task)

requires 2 separate operations to finish (next and then send). Until then the coroutine is in a suspended state. This is what makes it possible to write something like the above. Let's see how it would work:

  • Consider 'G' is the generator object that we get on calling perform_async_task since it has a yield statement now (instead of magic).
  • future = next(G) would give me the future returned by the submit function of the thread executor. Why? Because that's the RHS expression of the yield.
  • future.add_done_callback(send_result) - To the obtained future we add a done callback which will get called when the executor has done its job.
  • And send_result is:
def send_result(future):
    G.send(future.result())


  • So when send_result gets called, it will send the result wrapped inside the future to the generator, which is what gets assigned to the LHS variable. Cool!!! Got it? (The full manual sequence is put together in the sketch below.)
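Putting those manual steps together in one place (purely illustrative; G and send_result are the names used above, and the tasks come from the earlier examples):

def perform_async_task():
    result = yield ThreadedExecutor().submit(my_long_running_task)
    print (result)

G = perform_async_task()          # Nothing runs yet; we just get the generator

def send_result(future):
    try:
        G.send(future.result())   # Resume the coroutine with the result
    except StopIteration:
        pass                      # The coroutine ran to completion

future = next(G)                      # Runs up to the yield and gives us the future
future.add_done_callback(send_result) # Resume once the executor is done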

Now we know what needs to be done manually to get the desired behaviour. Let's put it together nicely and automate it using a class named Task.

Let's first jump into an implementation of the Task itself:

class Task(object):
    """
    Wraps a generator yielding a Future
    object and abstracts away the handling of future API
    """
    def __init__(self, gen):
        self.gen = gen

    def step(self, snd_val=None, exp=None):
        """Drive the wrapped generator one step forward."""
        try:
            if exp:
                # Forward the exception from the failed async operation
                # into the generator.
                self.gen.throw(exp)
            else:
                # Resume the generator; it yields back the next future
                # to wait on.
                fut = self.gen.send(snd_val)
                fut.add_done_callback(self._fut_done_cb)

        except StopIteration as e:
            # The generator finished; its return value rides on the
            # StopIteration exception.
            return e.value

    def _fut_done_cb(self, fut):
        # Called by the future once the async operation completes.
        try:
            result = fut.result()
            self.step(result, None)
        except Exception as e:
            self.step(None, e)


This is, in essence, what an asyncio Task is. It surely does more, but this is the bare minimum that gets the work done.

Let's walk through how it works with an example.

def perform_async_task():
    result = yield ThreadedExecutor().submit(my_long_running_task)
    return result

t = Task(perform_async_task())
t.step()

## OR

Task(perform_async_task()).step()    


As one can see, we have got our asynchronous code to look like synchronous code, without any hairy callbacks and futures involved in the client code. All of those details are handled by the task.

What is really happening is:

  • The constructor of the task takes the generator object and stores it. That makes the Task little more than a wrapper over a generator.
  • Now we call the step function of the task. The step function calls the generator's send function with None for the first time, which is basically a next call on the generator.
  • This results in the execution of the RHS expression of the yield which is the executor submit function.
  • The submit function returns the future object associated with the asynchronous operation. This is what fut variable holds inside step function.
  • To the obtained future we add a done callback, which is the _fut_done_cb member function of the Task class. What this means is that when the asynchronous operation finishes, _fut_done_cb will get called.
  • Note that until _fut_done_cb is called the coroutine perform_async_task is suspended. It is not blocked.
  • So when the asynchronous operation finishes, _fut_done_cb gets executed, which gets the result from the future and calls the step function again with the obtained result.
  • So the step function executes again and calls the send function on the generator, but this time with the result of the async operation instead of None.
  • The above send assigns the result of the yield expression to the LHS inside perform_async_task.
  • With that, since there are no further yields inside perform_async_task, send will raise a StopIteration exception.
  • The return value of perform_async_task travels inside that StopIteration and is what the step function returns. (The small snippet below shows this mechanism in isolation.)
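If the way the return value rides on StopIteration seems odd, this tiny standalone snippet (Python 3) shows the same mechanism without any futures involved:

def gen():
    yield "a future would go here"
    return 42

g = gen()
next(g)                   # Run up to the yield
try:
    g.send("the result")  # Resume; the generator then returns
except StopIteration as e:
    print (e.value)       # Prints 42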

Try it for yourself!