5.10. Interacting with asynchronous parallel tasks in IPython
This is one of the 100+ free recipes of the IPython Cookbook, Second Edition, by Cyrille Rossant, a guide to numerical computing and data science in the Jupyter Notebook. The ebook and printed book are available for purchase at Packt Publishing.
In this recipe, we will show how to interact with asynchronous tasks running in parallel with ipyparallel.
Getting ready
You need to start the IPython engines (see the previous recipe). The simplest option is to launch them from the IPython Clusters tab in the Notebook dashboard. In this recipe, we use four engines.
How to do it...
1. Let's import a few modules:
import sys
import time
import ipyparallel
import ipywidgets
from IPython.display import clear_output, display
2. We create a Client:
rc = ipyparallel.Client()
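As a quick sanity check (not part of the original recipe), the client's ids attribute lists the connected engines; with four engines running it should contain four entries:

rc.ids  # [0, 1, 2, 3] with four engines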
3. Now, we create a load-balanced view on the IPython engines:
view = rc.load_balanced_view()
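An aside not in the original text: ipyparallel also offers a direct view, which targets a fixed set of engines, whereas the load-balanced view used here dispatches each task to whichever engine is idle. A minimal sketch:

# Direct view: tasks are assigned to fixed engines.
dview = rc[:]
# Load-balanced view: the scheduler picks an idle engine for each task.
view = rc.load_balanced_view()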
4. We define a simple function for our parallel tasks (the time module is imported inside the function because it executes on the engines, where the import needs to be available):
def f(x):
    import time
    time.sleep(.1)
    return x * x
5. We will run this function on 100 integer numbers in parallel:
numbers = list(range(100))
6. We execute f on our list numbers in parallel across all of our engines, using map_async(). This function immediately returns an AsyncResult object that allows us to interactively retrieve information about the tasks:
ar = view.map_async(f, numbers)
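To verify that map_async() is non-blocking, one can (as a small aside, not from the original text) query the AsyncResult right after the call:

print(ar.ready())    # False: the call returned before the tasks finished
print(ar.progress)   # number of tasks completed so far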
7. This object has a metadata attribute: a list of dictionaries, one per task. We can get the date of submission and completion, the status, the standard output and error, and other information:
ar.metadata[0]
{'after': None,
'completed': None,
'data': {},
...
'submitted': datetime.datetime(2017, ...)}
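A minimal sketch, not from the original text, showing one use of the metadata: once the tasks have finished, the started and completed timestamps of each entry give the duration of each task:

ar.wait()
durations = [(md['completed'] - md['started']).total_seconds()
             for md in ar.metadata]
print(min(durations), max(durations))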
8. Iterating over the AsyncResult instance works normally; the iteration progresses in real time while the tasks are being completed:
for i in ar:
    print(i, end=', ')
0, 1, 4, ..., 9801,
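Note that the results are yielded in the order of the inputs, even though the load-balanced scheduler may complete the tasks out of order on the engines. A quick check (an illustrative sketch, not in the original recipe):

ar = view.map_async(f, numbers)
assert list(ar) == [x * x for x in numbers]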
9. Now, we create a simple progress bar for our asynchronous tasks. The idea is to create a loop polling the tasks' status every 0.1 second. An IntProgress widget is updated in real time and shows the progress of the tasks:
def progress_bar(ar):
    # We create a progress bar.
    w = ipywidgets.IntProgress()
    # The maximum value is the number of tasks.
    w.max = len(ar.msg_ids)
    # We display the widget in the output area.
    display(w)
    # Repeat:
    while not ar.ready():
        # Update the widget's value with the
        # number of tasks that have finished
        # so far.
        w.value = ar.progress
        time.sleep(.1)
    w.value = w.max
ar = view.map_async(f, numbers)
progress_bar(ar)

The progress bar is displayed in the output area and fills up as the tasks complete.
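A variant sketch, not in the original recipe, implementing the same polling loop with a plain-text indicator; it uses the sys and clear_output imports from step 1 instead of a widget:

def text_progress(ar):
    # Poll the tasks' status every 0.1 second.
    while not ar.ready():
        clear_output(wait=True)
        print('%d/%d tasks done' % (ar.progress, len(ar.msg_ids)))
        sys.stdout.flush()
        time.sleep(.1)
    clear_output(wait=True)
    print('All %d tasks done.' % len(ar.msg_ids))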
How it works...
AsyncResult instances are returned by asynchronous parallel functions. They implement several useful attributes and methods, notably the following (a short illustrative sketch follows this list):

- elapsed: Elapsed time since submission
- progress: Number of tasks that have completed so far
- serial_time: Sum of the computation time of all of the tasks done in parallel
- metadata: Dictionary with further information about the task
- ready(): Returns whether the call has finished
- successful(): Returns whether the call has completed without raising an exception (an exception is raised if the task has not completed yet)
- wait(): Blocks until the tasks have completed (there is an optional timeout argument)
- get(): Blocks until the tasks have completed and returns the result (there is an optional timeout argument)
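Here is a minimal sketch (not in the original text) exercising these attributes and methods, assuming the view, f, and numbers defined above:

ar = view.map_async(f, numbers)
ar.wait()                # block until all tasks have completed
print(ar.ready())        # True
print(ar.successful())   # True: no task raised an exception
print(ar.elapsed)        # wall-clock time since submission, in seconds
print(ar.serial_time)    # total compute time summed over all tasks
results = ar.get()       # the list of results, in input order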
There's more...
Here are a few references:
- Documentation of the AsyncResult class available at http://ipyparallel.readthedocs.io/en/latest/asyncresult.html
- Documentation of the AsyncResult class of the native multiprocessing module at https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult
- Documentation of the task interface available at http://ipyparallel.readthedocs.io/en/latest/task.html
See also
- Distributing your code across multiple cores with IPython