Scheduling in iota2
-------------------

The control of task chaining in iota2 is very important. This is what we usually call task scheduling. A "task" is the generic term for something to be done. Fine control of these tasks gives the user as much information as possible about the sequence of tasks that will be sent: which tasks will be launched in parallel, which ones will be launched sequentially, which tasks must be completed before others are launched, etc. In addition, in case of problems, the user will be able to find out which tasks did not finish correctly and why: special attention is given to logging.

This is why we have chosen to use dask to schedule our tasks, and more specifically ``dask.delayed`` and ``dask_jobqueue``, respectively to build the dependencies between tasks and to submit them for execution.

About dask
**********

A lot of documentation is available on dask's website: https://docs.dask.org/en/latest/

iota2, the context of a processing chain
****************************************

iota2 is a modular processing chain capable of handling large amounts of data and implementing algorithms that can be sequential and/or massively parallel. From this, we can deduce 3 major constraints which have guided the solutions chosen for scheduling:

- modularity

  This means that, depending on the user's choices, an algorithm can be activated or not and be configured differently. As a result, the sequences of tasks to be launched can be very different from one iota2 execution to another.

- management of computing resources

  Being able to assign a quota of resources per task is needed for optimal scheduling when iota2 is launched on infrastructures (HPC clusters, cloud) where the allocation of computing resources conditions the launch of a task and can be billable.
- Step selection

  iota2 must provide its users with a simple interface to resume processing in the event of problems, or to select the processing to be performed from a list of available steps.

This documentation guides the reader through the solution chosen in iota2 for task scheduling with dask, using a simple example, to meet each of the constraints expressed above. Then, the final solutions implemented in iota2 will be shown. The proposed example is not representative of the sequence of steps actually performed in iota2.

Modularity
**********

dask.delayed: how to use it?
============================

``dask.delayed`` objects are used in iota2 to define the dependencies between steps and tasks.

.. Warning::
    In iota2 we use the term "step" to refer to executions that have a common purpose. For example learning, classifying and validating classifications are 3 distinct steps: at each launch of iota2, a summary of the steps to be launched is shown to the user.

    In this documentation, we will also talk about tasks. From this point on, a task designates a subset of a step. For example, the learning step will consist of 2 tasks: "learning model 1" and "learning model 2".

First, let's build a simple task dependency with two simple functions. The purpose is to launch `function_A` once and, once it has completed, to launch `function_B` once.

.. code-block:: python

    def function_A(arg_a: int) -> int:
        """Print the input value and return it incremented by one."""
        print(arg_a)
        return arg_a + 1

    def function_B(arg_b: int) -> int:
        """Print the input value and return it incremented by one."""
        print(arg_b)
        return arg_b + 1

    a_return = function_A(1)
    print("between two functions call")
    b_return = function_B(a_return)
    print(b_return)

Then, unsurprisingly, we get the trace:

.. code-block:: bash

    1
    between two functions call
    2
    3

However, by using dask.delayed:

.. code-block:: python

    import dask

    a_return = dask.delayed(function_A)(1)
    print("between two functions call")
    b_return = dask.delayed(function_B)(a_return)
    print(b_return)

.. code-block:: bash

    between two functions call
    Delayed('function_B-66d71002-295c-4711-8c86-4a3184ee6163')

We only get a delayed object, **no function has been executed**. In order to trigger some processing, we must use the **compute()** method of the delayed object ``b_return``. Then we get:

.. code-block:: python

    a_return = dask.delayed(function_A)(1)
    print("between two function call")
    b_return = dask.delayed(function_B)(a_return)
    print(b_return)
    res = b_return.compute()
    print(res)

.. code-block:: bash

    between two function call
    Delayed('function_B-66d71002-295c-4711-8c86-4a3184ee6163')
    1
    2
    3

This is the common usage of dask.delayed objects: **use the returned object** to generate a complete graph and submit it using **compute()** or equivalent. **However, in iota2**, functions do not fit this kind of use: they are more data oriented and most **functions return the None object**. The following code snippet illustrates this:

.. code-block:: python

    def learn_model(vector_file: str, output_model: str) -> None:
        """Learn a model according to features stored in a vector file."""
        print(f"I'm learning {output_model} thanks to {vector_file}")

    def classify_tile(model_file: str, output_classif: str) -> None:
        """Perform a classification thanks to a model."""
        print(f"classification of {output_classif} thanks to {model_file}")

Dask assumes that functions communicate through their API (their inputs and outputs) and, thanks to these communications, deduces an execution graph. In iota2, the dependencies between certain steps can be strong (learning, then classification), yet there is no way to express such a dependency in a form that dask understands, because these functions exchange no values. To control the dependencies between these tasks, one solution would have been to impose a return type and an input type on these functions, but this would restrict them to internal use in iota2, which is not their purpose.
That's why, in order to be as generic as possible, we have decided not to impose any particular return type on the functions launched by dask. The chosen solution is to use a function that wraps the function to be executed; this function is called ``task_launcher()``.

.. _1:

rewrite the previous test using task_launcher()
...............................................

.. code-block:: python

    from typing import List

    import dask

    def task_launcher(*args: List[dask.delayed], **kwargs) -> None:
        # shallow copy of the keyword arguments
        kwargs = kwargs.copy()
        task_kwargs = kwargs.get("task_kwargs", None)
        if task_kwargs:
            # copy the dictionary so that the caller's one is not modified
            task_kwargs = task_kwargs.copy()
            # get the function to execute and remove it from the dictionary
            func = task_kwargs.pop("f")
            # use the remaining entries as parameters of the stored function
            # the next line launches the task
            func(**task_kwargs)

    database_file = "my_database.sqlite"
    model_file = "model_1.txt"

    from_learn_model = dask.delayed(task_launcher)(task_kwargs={
        "f": learn_model,
        "vector_file": database_file,
        "output_model": model_file
    })

    classification_1 = "Classif_1.tif"
    res = dask.delayed(task_launcher)(from_learn_model,
                                      task_kwargs={
                                          "f": classify_tile,
                                          "model_file": model_file,
                                          "output_classif": classification_1
                                      })
    res.compute()

This time, the execution order and the arguments are respected. To completely understand the workflow, two points must be understood:

- the functioning of dask.delayed(function_to_delayed)(parameters_of_function_to_delayed)
- if one of the parameters of the delayed function is a dask.delayed, then this parameter will be triggered/executed before the function is called, for instance:

.. code-block:: python

    def some_function(arg1, arg2: int):
        ...

    A = dask.delayed(...)(...)
    delayed_return = dask.delayed(some_function)(A, 2)
    delayed_return_value = delayed_return.compute()

In this example, `A` is a dependency of `some_function`. Therefore, `A` will be executed before the call to some_function.
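The ordering described above can be checked with a small, self-contained sketch. The function ``produce_value`` and the ``execution_order`` list are hypothetical names introduced only for this illustration; they are not part of iota2. The single-threaded scheduler is requested only to keep the trace easy to follow.

```python
import dask

# records the order in which the functions are really executed
execution_order = []


def produce_value() -> int:
    """Hypothetical upstream task: record its execution and return a value."""
    execution_order.append("produce_value")
    return 1


def some_function(arg1: int, arg2: int) -> int:
    """Downstream task: receives the computed result of its dependency."""
    execution_order.append("some_function")
    return arg1 + arg2


A = dask.delayed(produce_value)()
delayed_return = dask.delayed(some_function)(A, 2)

# building the graph executes nothing
assert execution_order == []

# A is computed first, then its result is passed to some_function
delayed_return_value = delayed_return.compute(scheduler="synchronous")

print(execution_order)
print(delayed_return_value)
```

Because ``A`` is passed as a parameter, dask runs ``produce_value`` before ``some_function``. ``task_launcher()`` relies on the same mechanism, except that the values received through ``*args`` are ignored.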
**It is this behaviour that allows the dependencies to be expressed in iota2.** This behaviour also enables us to submit parallel tasks.

.. Note::
    The number of tasks that can be executed in parallel should not be confused with the actual number of tasks executed at the same time. Indeed, the number of tasks actually launched in parallel depends mainly on the resources available at the time when iota2 must launch a task.

Submitting parallel tasks
.........................

Now that the system of dependencies between functions is set up, let's see how to integrate multiple dependencies with the following example: "generate 2 models before launching the classification of a tile, both models can be computed in parallel". We can summarize this exercise with the following diagram:

.. code-block:: bash

    model_1   model_2
        \       /
         \     /
      classification

.. code-block:: python

    model_file_1 = "model_1.txt"
    model_file_2 = "model_2.txt"

    model_1_delayed = dask.delayed(task_launcher)(task_kwargs={
        "f": learn_model,
        "vector_file": database_file,
        "output_model": model_file_1
    })
    model_2_delayed = dask.delayed(task_launcher)(task_kwargs={
        "f": learn_model,
        "vector_file": database_file,
        "output_model": model_file_2
    })

    classification_1 = "Classif_1.tif"
    res = dask.delayed(task_launcher)(model_1_delayed,
                                      model_2_delayed,
                                      task_kwargs={
                                          "f": classify_tile,
                                          "model_file": model_file_1,
                                          "output_classif": classification_1
                                      })
    # there is no execution until the next line
    res.compute()

In the example above, ``model_1_delayed`` and ``model_2_delayed`` become dependencies of the call to task_launcher which will launch the classification function. Consequently, the function ``learn_model`` will be executed twice before the ``classify_tile`` call. We can also rewrite the dependencies as follows:

.. code-block:: python

    dependencies = [model_1_delayed, model_2_delayed]
    res = dask.delayed(task_launcher)(*dependencies,
                                      task_kwargs={
                                          "f": classify_tile,
                                          "model_file": model_file_1,
                                          "output_classif": classification_1
                                      })
    # there is no execution until the next line
    res.compute()

Two key points can be concluded from this example:

- we can store dependencies (delayed objects) in a usual Python list.
- the first parameter of task_launcher(), ``*args``, represents the variable number of dependencies.

Thanks to it, it is pretty easy to express dependencies between functions as long as the parameters of the functions to launch are known beforehand. Indeed, every ``task_kwargs`` dictionary must contain the exhaustive list of parameters of the function to launch.

Submitting tasks
================

We have seen that in order to trigger processing, we must use the compute() method of the dask.delayed objects. This method executes the desired function locally. However, dask also offers the possibility to trigger the execution on different computing architectures. This is what the ``dask.distributed`` and ``dask_jobqueue`` modules provide. In iota2, we will give special attention to the ``LocalCluster`` (from ``dask.distributed``) and ``PBSCluster`` (from ``dask_jobqueue``) objects, which respectively create a cluster on a local machine and use a physical HPC cluster managed with PBS.

We had:

.. code-block:: python

    ...
    res = dask.delayed(task_launcher)(*dependencies,
                                      task_kwargs={
                                          "f": classify_tile,
                                          "model_file": model_file_1,
                                          "output_classif": classification_1
                                      })
    # there is no execution until the next line
    res.compute()

Now we get:

.. code-block:: python

    if __name__ == '__main__':
        from dask.distributed import Client
        from dask.distributed import LocalCluster
        from dask_jobqueue import PBSCluster
        ...
        res = dask.delayed(task_launcher)(*dependencies,
                                          task_kwargs={
                                              "f": classify_tile,
                                              "model_file": model_file_1,
                                              "output_classif": classification_1
                                          })
        cluster = PBSCluster(n_workers=1, cores=1, memory='5GB')
        client = Client(cluster)

        # there is no execution until next line
        sub_results = client.submit(res.compute)
        # wait until tasks are complete
        sub_results.result()

.. note::
    In order to reproduce the examples, if you don't have access to an HPC cluster scheduled by PBS, you can replace the line:

    .. code-block:: python

        cluster = PBSCluster(n_workers=1, cores=1, memory='5GB')

    by

    .. code-block:: python

        cluster = LocalCluster(n_workers=1, threads_per_worker=1)

Here, a cluster object is created with one worker, one core and 5 GB of RAM. Then tasks are sent to the PBS scheduler using the submit function. You may notice that resources can now be chosen. We will now look at how resources can be assigned to different tasks in iota2.

Set resources by task
*********************

Currently, dask does not offer a solution for attaching resources by task, so we have to find a way to do this. For this, we will rely on the ``task_launcher()`` function, which is the place where each task is sent. We are going to add a new parameter to this function, a Python dictionary, which will characterize the resources needed to execute a task. These resources will be used to create, inside the task_launcher() function, a cluster object with the correct resource allocation. It is on this cluster, instantiated in task_launcher(), that the task will be executed. Here is its definition:

.. code-block:: python

    def task_launcher(*args: List[dask.delayed],
                      resources={"cpu": 1, "ram": "5Gb"},
                      **kwargs) -> None:
        kwargs = kwargs.copy()
        task_kwargs = kwargs.get("task_kwargs", None)
        if task_kwargs:
            # copy the dictionary so that the caller's one is not modified
            task_kwargs = task_kwargs.copy()
            # get the function to execute and remove it from the dictionary
            func = task_kwargs.pop("f")

            # next line launches the function locally
            # func(**task_kwargs)

            # next lines deploy the function to the cluster with the necessary resources
            cluster = PBSCluster(n_workers=1,
                                 cores=resources["cpu"],
                                 memory=resources["ram"])
            client = Client(cluster)
            client.wait_for_workers(1)
            sub_results = client.submit(func, **task_kwargs)
            sub_results.result()
            # shut down the client/cluster dedicated to this task
            client.close()
            cluster.close()

Every task has to create its own cluster object with the appropriate resource allocation. Once the cluster is ready, the task can be executed on it. So now, if the learning tasks are single-threaded and the classification task is multithreaded, our main code may look like:

.. _previously:
.. _2:

.. code-block:: python

    if __name__ == '__main__':
        database_file = "my_database.sqlite"
        model_file_1 = "model_1.txt"
        model_file_2 = "model_2.txt"

        model_1_delayed = dask.delayed(task_launcher)(task_kwargs={
            "f": learn_model,
            "vector_file": database_file,
            "output_model": model_file_1
        },
                                                      resources={
                                                          "cpu": 1,
                                                          "ram": "5Gb"
                                                      })
        model_2_delayed = dask.delayed(task_launcher)(task_kwargs={
            "f": learn_model,
            "vector_file": database_file,
            "output_model": model_file_2
        },
                                                      resources={
                                                          "cpu": 1,
                                                          "ram": "5Gb"
                                                      })

        classification_1 = "Classif_1.tif"
        dependencies = [model_1_delayed, model_2_delayed]
        res = dask.delayed(task_launcher)(*dependencies,
                                          task_kwargs={
                                              "f": classify_tile,
                                              "model_file": model_file_1,
                                              "output_classif": classification_1
                                          },
                                          resources={
                                              "cpu": 2,
                                              "ram": "10Gb"
                                          })

        cluster = LocalCluster(n_workers=1)
        client = Client(cluster)
        client.wait_for_workers(1)

        # there is no execution until next line
        sub_results = client.submit(res.compute)
        # wait until tasks are terminated
        sub_results.result()

In the example above, the learning tasks start with a resource reservation of 1 CPU and 5 GB of RAM, while the classification task is sent with a reservation twice as large.

At this point, two thirds of the scheduling solution is in place: dependencies between tasks can be expressed and resources can be assigned per task. The last remaining phase is to let users select an interval of steps to execute. This is the subject of the following section.

Enable and disable steps
************************

In a processing chain composed of several steps, it can be useful for users to select an interval of steps to rerun. Several use cases can lead the user to make this choice: replaying a processing chain after errors, skipping steps that are unnecessary because they were already done outside iota2 (learning a model, generating a database, ...), or rerunning a step with different parameters. iota2 must be able to offer this kind of service, which is much appreciated by users and also by step developers.

Problem illustration, thanks to the previous example
====================================================

In the previous example, the task sequence graph is rigid, in the sense that if a step (the group of learning tasks and/or the classification) is removed, no execution will be possible: the dependency tree is broken. The goal here is to find a generic way to express a step sequence that works regardless of the steps that are removed from the execution graph. A first trivial solution is presented; its limitations show that it is necessary to move towards a more complex and generic solution, which is the one implemented in iota2.

remove the last task
....................

Based on the solution proposed previously_, it is relatively easy to manage the case where only the classification is desired. Let us assume that the first command-line argument (``sys.argv[1]``) is the index of the first step to execute and that the second one (``sys.argv[2]``) is the index of the last step.

.. code-block:: python

    if __name__ == '__main__':
        import sys
        first_step = int(sys.argv[1])
        last_step = int(sys.argv[2])
        ...
        if first_step == 1 and last_step == 2:
            res = dask.delayed(task_launcher)(*dependencies,
                                              task_kwargs={
                                                  "f": classify_tile,
                                                  "model_file": model_file_1,
                                                  "output_classif": classification_1
                                              },
                                              resources={
                                                  "cpu": 2,
                                                  "ram": "10Gb"
                                              })
        elif first_step == 2 and last_step == 2:
            # if only the classification is asked, then the classification task gets no dependencies
            res = dask.delayed(task_launcher)(task_kwargs={
                "f": classify_tile,
                "model_file": model_file_1,
                "output_classif": classification_1
            },
                                              resources={
                                                  "cpu": 2,
                                                  "ram": "10Gb"
                                              })
        ...
        sub_results = client.submit(res.compute)
        sub_results.result()

This solution works when only the last step is required.
If only the classification step is required (first_step == last_step == 2), then the classification task gets no dependencies. However, the solution will not work when only the learning tasks are requested. Indeed, the line:

.. code-block:: python

    sub_results = client.submit(res.compute)

will not be able to launch the learning tasks because the variable ``res`` (defining the tasks to be launched) and ``dependencies`` (containing the learning tasks) have different types: one is a dask.delayed and the other is a list of dask.delayed. A generic way to send tasks to the cluster must therefore be found. Below is the one proposed in iota2: a container of the last steps to be sent, ``step_tasks``.

.. _3:

remove the first task
.....................

.. code-block:: python

    if __name__ == '__main__':
        import sys
        first_step = int(sys.argv[1])
        last_step = int(sys.argv[2])
        ...
        step_tasks = []
        if first_step == 1:
            model_1_delayed = dask.delayed(task_launcher)(...)
            model_2_delayed = dask.delayed(task_launcher)(...)
            step_tasks = [model_1_delayed, model_2_delayed]
        ...
        if first_step == 1 and last_step == 2:
            dependencies = [model_1_delayed, model_2_delayed]
            classif_delayed = dask.delayed(task_launcher)(*dependencies, ...)
            # the container now holds the tasks of the classification step
            step_tasks = [classif_delayed]
        elif first_step == 2 and last_step == 2:
            classif_delayed = dask.delayed(task_launcher)(...)
            step_tasks = [classif_delayed]

        # there is no execution until the next line
        final_dask_graph = dask.delayed(task_launcher)(*step_tasks)
        sub_results = client.submit(final_dask_graph.compute)
        # wait until tasks are terminated
        sub_results.result()

The ``step_tasks`` container is redefined for each step and contains all the tasks to be launched for that step. Called without ``task_kwargs``, ``task_launcher()`` executes nothing itself: it only acts as a final node that depends on every task stored in ``step_tasks``. This step container, through the use of ``task_launcher()``, makes it possible to gather all the tasks defined in a step of iota2.
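The step selection logic above can be tried locally with the following minimal sketch. It assumes a simplified ``task_launcher()`` which runs the function in the current process instead of creating a cluster, and it introduces two hypothetical helpers, ``build_graph()`` and ``run()``, together with an ``executed`` list used only to trace what is launched. It is an illustration of the idea, not the iota2 implementation.

```python
from typing import List

import dask

# trace of the tasks really executed (illustration only)
executed = []


def task_launcher(*args, **kwargs) -> None:
    """Simplified launcher: run the wrapped function locally, without cluster."""
    task_kwargs = dict(kwargs.get("task_kwargs") or {})
    if task_kwargs:
        func = task_kwargs.pop("f")
        func(**task_kwargs)


def learn_model(vector_file: str, output_model: str) -> None:
    executed.append(f"learn {output_model}")


def classify_tile(model_file: str, output_classif: str) -> None:
    executed.append(f"classify {output_classif}")


def build_graph(first_step: int, last_step: int):
    """Hypothetical helper: final delayed node for steps first_step..last_step."""
    step_tasks = []
    if first_step == 1:
        step_tasks = [
            dask.delayed(task_launcher)(task_kwargs={
                "f": learn_model,
                "vector_file": "my_database.sqlite",
                "output_model": model
            }) for model in ("model_1.txt", "model_2.txt")
        ]
    if last_step == 2:
        # the learning tasks, if any, become the classification dependencies
        dependencies = step_tasks
        step_tasks = [
            dask.delayed(task_launcher)(*dependencies,
                                        task_kwargs={
                                            "f": classify_tile,
                                            "model_file": "model_1.txt",
                                            "output_classif": "Classif_1.tif"
                                        })
        ]
    # without task_kwargs, task_launcher only waits for its dependencies
    return dask.delayed(task_launcher)(*step_tasks)


def run(first_step: int, last_step: int) -> List[str]:
    """Hypothetical helper: execute the selected steps and return the trace."""
    executed.clear()
    build_graph(first_step, last_step).compute(scheduler="synchronous")
    return list(executed)


if __name__ == "__main__":
    print(run(1, 1))  # learning tasks only
    print(run(2, 2))  # classification only
    print(run(1, 2))  # learning tasks, then classification
```

Whatever the requested interval, the same final call is used to launch the execution, which is the property sought with ``step_tasks``.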
All scheduling constraints have been presented and solved. The following script summarizes all these solutions while introducing an additional complexity which is present in iota2: all steps are represented by classes. This script is thus a minimal representation of what scheduling is in iota2.

Scheduling solution in iota2
****************************

It is easy to notice that steps share common functionalities: creating tasks, associating dependencies with them and launching them. All these common features have been gathered in the base class ``i2_step()``. The next section shows the slightly simplified, but functional, definition of this class, and highlights the parallels between this definition and the previous examples. It is therefore important that the examples 1_, 2_ and 3_ are well understood.

.. _BaseClass:
.. _i2_step:

Base class dedicated to steps
=============================

.. code-block:: python

    from typing import Dict, List, Optional

    import dask
    from dask.distributed import Client
    from dask_jobqueue import PBSCluster

    class i2_step():
        """base class for every iota2 step"""
        class i2_task():
            """class dedicated to represent a task in iota2"""
            def __init__(self, task_parameter: Dict, task_resources: Dict):
                self.task_parameter = task_parameter
                self.task_resources = task_resources

        step_container = []
        tasks_graph = {}
        # parameters known before the iota2's launch
        database_file = "my_database.sqlite"
        models_to_learn = ["model_1.txt", "model_2.txt"]
        classifications_to_perform = ["Classif_1.tif"]

        def __init__(self):
            self.step_container.append(self)
            self.step_tasks = []

        @classmethod
        def get_final_execution_graph(cls):
            # final node depending on every task of the last instantiated step
            return dask.delayed(
                cls.task_launcher)(*cls.step_container[-1].step_tasks)

        @staticmethod
        def task_launcher(*args: List[dask.delayed],
                          resources={"cpu": 1, "ram": "5Gb"},
                          **kwargs) -> None:
            """method to launch task"""
            kwargs = kwargs.copy()
            task_kwargs = kwargs.get("task_kwargs", None)
            if task_kwargs:
                # copy the dictionary so that the caller's one is not modified
                task_kwargs = task_kwargs.copy()
                func = task_kwargs.pop("f")
                task_cluster = PBSCluster(n_workers=1,
                                          cores=resources["cpu"],
                                          memory=resources["ram"])
                task_client = Client(task_cluster)
                task_client.wait_for_workers(1)
                task_results = task_client.submit(func, **task_kwargs)
                task_results.result()
                task_client.close()
                task_cluster.close()
                # to run locally instead, replace the cluster lines with:
                # func(**task_kwargs)

        def add_task_to_i2_processing_graph(
                self,
                task,
                task_group: str,
                task_sub_group: Optional[str] = None,
                task_dep_dico: Optional[Dict[str, List[str]]] = None) -> None:
            """use to build the tasks' execution graph"""
            if task_group not in self.tasks_graph:
                self.tasks_graph[task_group] = {}

            # if the current step is the only one in the step container,
            # no dependencies have to be set
            if len(self.step_container) == 1:
                new_task = dask.delayed(self.task_launcher)(
                    task_kwargs=task.task_parameter,
                    resources=task.task_resources)
            else:
                task_dependencies = []
                for task_group_name, tasks_name in task_dep_dico.items():
                    if task_group_name in self.tasks_graph:
                        for dep_task in tasks_name:
                            task_dependencies.append(
                                self.tasks_graph[task_group_name][dep_task])
                new_task = dask.delayed(self.task_launcher)(
                    *task_dependencies,
                    task_kwargs=task.task_parameter,
                    resources=task.task_resources)

            # update the dependency graph
            self.tasks_graph[task_group][task_sub_group] = new_task
            # save current step's tasks
            self.step_tasks.append(new_task)

This class makes it possible to define dependencies between tasks and to allocate different resources per task using the ``task_launcher()`` function, which has exactly the same definition as in the examples above. Dependencies between tasks are managed by `add_task_to_i2_processing_graph()`. This function relies on the class attribute ``tasks_graph`` which, like ``dependencies`` in example 2_, stores the dependencies between tasks and thus builds a dependency graph using dask. However, ``tasks_graph`` and ``dependencies`` are a bit different: ``dependencies`` was a simple **list of dask.delayed objects**, while ``tasks_graph`` is a **dictionary of dictionaries**.
This change makes it possible to 'name' the dependencies and to be more explicit than a simple list when moving from one step to another. All dependency management between tasks is handled by the class attribute ``tasks_graph`` and the method ``add_task_to_i2_processing_graph()``. Each task must belong to a ``task_group`` and must define its dependencies if needed.

For example, let's take the previous example where 2 models have to be created. There will be 2 tasks, named "model_1.txt" and "model_2.txt", grouped together under the ``task_group`` "models". Somewhere in the code, there will be calls to add_task_to_i2_processing_graph() like this:

.. code-block:: python

    add_task_to_i2_processing_graph(task, "models", "model_1.txt")
    ...
    add_task_to_i2_processing_graph(task, "models", "model_2.txt")

where ``task`` is an instance of a class containing two dictionaries, one describing the task and the other describing the task resources. These dictionaries are the same as the ones found in 3_ under the names ``task_kwargs`` and ``resources``.

.. Warning::
    There are no special naming conventions for the ``task_group`` and ``task_sub_group`` variables, users can name them as they wish.

No dependencies have been mentioned because the learning step will be the first step in our example processing chain. We can define our learning step as a class that inherits the functionality of the ``i2_step`` class:

.. code-block:: python

    class learn_model(i2_step):
        """class simulating the learning of models"""
        def __init__(self):
            super(learn_model, self).__init__()
            for model in self.models_to_learn:
                model_task = self.i2_task(task_parameter={
                    "f": self.learn_model,
                    "vector_file": self.database_file,
                    "output_model": model
                },
                                          task_resources={
                                              "cpu": 1,
                                              "ram": "5Gb"
                                          })
                self.add_task_to_i2_processing_graph(model_task,
                                                     task_group="models",
                                                     task_sub_group=model)

        def learn_model(self, vector_file: str, output_model: str) -> None:
            """learn a model according to features stored in a vector file"""
            print(f"I'm learning {output_model} thanks to {vector_file}")

Likewise, we can define the classification step:

.. code-block:: python

    class classifications(i2_step):
        """class to simulate classifications"""
        def __init__(self):
            super(classifications, self).__init__()
            classification_task = self.i2_task(task_parameter={
                "f": self.classify_tile,
                "model_file": self.models_to_learn[0],
                "output_classif": self.classifications_to_perform[0]
            },
                                               task_resources={
                                                   "cpu": 2,
                                                   "ram": "10Gb"
                                               })
            self.add_task_to_i2_processing_graph(
                classification_task,
                task_group="classifications",
                task_sub_group=self.classifications_to_perform[0],
                task_dep_dico={"models": self.models_to_learn})

        def classify_tile(self, model_file: str, output_classif: str) -> None:
            """perform a classification thanks to a model"""
            print(f"classification of {output_classif} thanks to {model_file}")

This time, dependencies have been set when creating the classification task: the classification task needs both learning tasks to be completed in order to be run.

.. code-block:: python

    task_dep_dico={"models": self.models_to_learn}

which means: "before being launched, I need every task of the group 'models' whose name is in the list ``self.models_to_learn`` to be finished".

Furthermore, it is possible to create tasks which depend on several groups of tasks. Let's create a new class, ``confusion``, which depends on the classification and on one learning task.

.. code-block:: python

    class confusion(i2_step):
        """class to simulate the confusion computation"""
        def __init__(self):
            super(confusion, self).__init__()
            confusion_task = self.i2_task(task_parameter={
                "f": self.confusion,
                "output_confusion_file": "confusion.csv"
            },
                                          task_resources={
                                              "cpu": 2,
                                              "ram": "10Gb"
                                          })
            self.add_task_to_i2_processing_graph(
                confusion_task,
                task_group="confusion",
                task_sub_group="confusion_tile_T31TCJ",
                task_dep_dico={
                    "classifications": [self.classifications_to_perform[0]],
                    "models": [self.models_to_learn[-1]]
                })

        def confusion(self, output_confusion_file) -> None:
            """simulate the generation of a confusion file"""
            print(f"generating confusion : {output_confusion_file}")

This class generates only one task, which depends on the first classification task (actually, there is only one) and on the last task of the learning step.

Once our step classes are defined, it is time to instantiate them. For this purpose, we use the class called ``i2_builder()``, which manages the instantiation of the steps according to 2 parameters: the index of the first and of the last step to be launched. This way, we satisfy the last constraint: being able to select an interval of steps to execute in a sequence of steps. After the call to ``get_final_i2_exec_graph()``, the full processing chain is ready to be launched. Actually, the only purpose of this method is to instantiate the required steps and, by extension, to build the final dask graph.

.. Note:: self.step_tasks and the variable ``step_tasks`` in example 3_ have the same goal: saving a step's tasks.

.. _i2_builder:

.. code-block:: python

    class i2_builder():
        """define step sequence to be launched"""
        def __init__(self, first_step: int, last_step: int):
            # first_step and last_step are 0-based indexes
            self.first_step = first_step
            self.last_step = last_step
            self.steps = self.build_steps()
            if self.last_step >= len(self.steps):
                raise ValueError(f"last step must be <= {len(self.steps)}")

        def build_steps(self):
            """prepare steps sequence"""
            from functools import partial
            steps_constructors = []
            steps_constructors.append(partial(learn_model))
            steps_constructors.append(partial(classifications))
            steps_constructors.append(partial(confusion))
            return steps_constructors

        def get_final_i2_exec_graph(self):
            # instantiate the steps which must be launched: each instantiation
            # registers the step's tasks in the shared dependency graph
            steps_to_exe = [
                step()
                for step in self.steps[self.first_step:self.last_step + 1]
            ]
            return i2_step.get_final_execution_graph()

    if __name__ == '__main__':
        import sys
        # steps are numbered from 1 on the command line
        run_first_step = int(sys.argv[1]) - 1
        run_last_step = int(sys.argv[2]) - 1
        assert run_last_step >= run_first_step
        i2_chain = i2_builder(run_first_step, run_last_step)
        graph = i2_chain.get_final_i2_exec_graph()
        graph.compute()

These orchestration classes are functional and meet the constraints previously mentioned. One last constraint has not yet been mentioned and is related to dask: once a graph is built, it is impossible to modify it during its execution. Once launched, it is necessary to wait for the end of the execution before doing anything else. This seemingly trivial constraint can have serious consequences for some chain executions where, in particular cases, the number of tasks to be launched cannot be determined before submitting computations.

In the figure below, the task :math:`T_3` has two dependencies, :math:`T_1` and :math:`T_2`, and produces a number of dependencies which is only known after its execution. The complete graph, from :math:`T_4` to :math:`T_N`, can't be built.

.. figure:: ./Images/random_task.png
    :align: center
    :alt: random number of tasks

    Step generating a non-deterministic number of dependencies

In iota2, we propose to bypass this constraint thanks to the creation of several independent execution graphs. The following section will complement the `i2_builder` and `i2_step` classes to satisfy the following scenario:

Currently, there are 3 steps in a sequence: learning, classification and then confusion. We are going to add a new step dedicated to report production. The class dedicated to the production of these tasks is named ``report``. In this scenario, the confusion step will generate a **random** number of files and, for each of these files, the report class will have to associate a task. The report class must deal with the randomness of its dependencies.

The first change to be made concerns the object that will contain the steps. Previously, in the i2_builder_ class, we used a simple list called `steps_constructors`. This list simply stored the steps that will be launched later in the execution phase. In the proposed solution, we replace this list with a new class, step_container(), whose definition is as follows:

.. code-block:: python

    class step_container():
        list_of_name = set()

        def __init__(self,
                     container_name: Optional[str] = "i2_step_container",
                     callback=None):
            self.name = container_name
            if self.name in self.list_of_name:
                raise ValueError(f"{self.name} already exists")
            self.list_of_name.add(container_name)
            self.container = []
            self.callback = callback

        def append(self, step):
            self.container.append(step)
            # next line informs the step class of the name of its container
            step.func.container_name = self.name

        def __setitem__(self, index, val):
            self.container[index] = val

        def __getitem__(self, index):
            return self.container[index]

        def __len__(self):
            return len(self.container)

As you can observe, this class needs two parameters to be instantiated: a name and a partial function (ready to be executed).
These two parameters enable the class to satisfy its two main purposes:

- Inform the step of the name of the container that contains it. It is this name which, as we will see in the next code snippet, will determine the execution graph of the step.
- Associate a function (the callback function) with a particular step container. This function could, for example, be triggered before the execution of a sequence of steps.

The method `add_task_to_i2_processing_graph()` of class i2_step_ must therefore be modified to be able to generate different execution graphs according to the information sent by the "step_container" (the execution graph to feed)

.. code-block:: python

    def add_task_to_i2_processing_graph(
            self,
            task,
            task_group: str,
            task_sub_group: Optional[str] = None,
            task_dep_dico: Optional[Dict[str, List[str]]] = None) -> None:
        """use to build the tasks' execution graph
        """
        # one dependency graph per step container
        if self.container_name not in self.tasks_graph:
            self.tasks_graph[self.container_name] = {}

        if task_group not in self.tasks_graph[self.container_name]:
            self.tasks_graph[self.container_name][task_group] = {}

        if not task_dep_dico:
            new_task = dask.delayed(self.task_launcher)(
                task_kwargs=task.task_parameter,
                resources=task.task_resources)
        else:
            # only dependencies registered in the same container are used
            task_dependencies = []
            for task_group_name, tasks_name in task_dep_dico.items():
                if task_group_name in self.tasks_graph[self.container_name]:
                    for dep_task in tasks_name:
                        task_dependencies.append(self.tasks_graph[
                            self.container_name][task_group_name][dep_task])
            new_task = dask.delayed(self.task_launcher)(
                *task_dependencies,
                task_kwargs=task.task_parameter,
                resources=task.task_resources)
        self.tasks_graph[
            self.container_name][task_group][task_sub_group] = new_task
        self.step_tasks.append(new_task)

.. Warning::
    In the previous solution, the `tasks_graph` class attribute was a dependency graph, whereas now it is a dictionary of dependency graphs where each key in the dictionary is given by the name of the step container.
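
The nested layout described in the warning above can be illustrated with a minimal sketch. It is not iota2 code: the helper ``register_task`` and the container, group and task names are hypothetical, and plain strings stand in for the ``dask.delayed`` objects that iota2 stores, so the example runs without dask.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for the i2_step.tasks_graph class attribute:
# container name -> task group -> task sub group -> task
tasks_graph: Dict[str, Dict[str, Dict[Optional[str], str]]] = {}


def register_task(container_name: str,
                  task_group: str,
                  task_sub_group: Optional[str],
                  task_dep_dico: Optional[Dict[str, List[str]]] = None) -> str:
    """Store a task in the graph of its container and return it.

    A task is represented by a string listing its dependencies; iota2
    would build a dask.delayed object instead.
    """
    container_graph = tasks_graph.setdefault(container_name, {})
    group = container_graph.setdefault(task_group, {})
    dependencies = []
    for dep_group, dep_names in (task_dep_dico or {}).items():
        # only dependencies living in the same container are resolved
        if dep_group in container_graph:
            dependencies.extend(
                container_graph[dep_group][name] for name in dep_names)
    new_task = f"{task_group}/{task_sub_group}({', '.join(dependencies)})"
    group[task_sub_group] = new_task
    return new_task


register_task("part_1", "models", "model_1")
register_task("part_1", "classifications", "classif_1",
              {"models": ["model_1"]})
# "models" is unknown in the "part_2" container: the dependency is ignored
register_task("part_2", "final_report", "report_0", {"models": ["model_1"]})

for container_name, graph in tasks_graph.items():
    print(container_name, graph)
```

In this sketch each container name maps to its own graph, so a task registered in ``part_2`` cannot depend on a task registered in ``part_1``. This is the property that keeps the two execution graphs independent.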
It is important to notice that the steps are now split into two containers. The first one contains the steps already built and the second one contains only the report creation step; these two containers generate two independent dask graphs. Since the confusion step generates a random number of files at runtime, it is impossible to predict in advance the dependencies of the report step. This is why the report step is placed alone in a container. We will see in a later section how the callback function helps the report creation step to create as many tasks as necessary.

The second method to be modified in class i2_builder_ is ``get_final_i2_exec_graph``. The goal of this method is to prepare the dask graphs for execution, according to the steps requested by the user.

.. code-block:: python

    def get_final_i2_exec_graph(self) -> List[waiting_i2_graph]:
        """ """
        i2_graphs = []
        # Assuming that self.first_step and self.last_step are respectively
        # the index of the first and last step requested by the user
        indexes = self.get_indexes_by_container(self.first_step,
                                                self.last_step)
        for container, tuple_index in zip(self.steps_containers, indexes):
            # containers without any requested step are skipped
            if tuple_index:
                container_start_ind, container_end_ind = tuple_index
                i2_graphs.append(
                    waiting_i2_graph(self, container, container_start_ind,
                                     container_end_ind, container.callback))
        return i2_graphs

    def get_indexes_by_container(
            self, first_step_index,
            last_step_index) -> List[Tuple[int, int]]:
        """
        the purpose of this function is to get for each container of steps,
        indexes of steps to process according to user demand.
        print([len(steps_container) for steps_container in self.build_steps()])
        >>> [3, 1]
        print(self.get_indexes_by_container(0, 0))
        >>> [(0, 0), []]
        print(self.get_indexes_by_container(0, 4))
        >>> [(0, 2), (0, 0)]
        """
        remaining_steps = last_step_index - first_step_index + 1
        buffer_index = []
        # number of steps held by the containers already visited
        len_buff = 0
        for cpt, container in enumerate(self.steps_containers):
            buffer_index.append([])
            # first requested step, relative to the current container
            # (clamped to 0 to avoid negative indexes)
            first_step_graph = max(0, first_step_index - len_buff)
            len_buff += len(container)
            if first_step_graph > len(container) - 1:
                continue
            for elem in range(first_step_graph, len(container)):
                if remaining_steps == 0:
                    break
                buffer_index[cpt].append(elem)
                remaining_steps -= 1
        container_indexes = []
        for index_list in buffer_index:
            if index_list:
                container_indexes.append((index_list[0], index_list[-1]))
            else:
                container_indexes.append([])
        return container_indexes

For each graph, we prepare its execution thanks to the `waiting_i2_graph` class. This class allows us to build the dependency graph after calling the callback function.

.. code-block:: python

    class waiting_i2_graph():
        """
        data class containing dask's graph ready to be built.
        """
        def __init__(self,
                     builder,
                     steps_container,
                     starting_step,
                     ending_step,
                     callback_function=None):
            self.container = steps_container
            self.starting_step = starting_step
            self.ending_step = ending_step
            self.callback = callback_function
            self.builder = builder
            self.figure_graph = None

        def build_graph(self):
            # the callback runs first, so that steps can rely on its results
            if self.callback:
                self.callback()
            # call every needed steps constructor
            _ = [
                step() for step in
                self.container[self.starting_step:self.ending_step + 1]
            ]
            return i2_step.get_exec_graph()

The `get_final_i2_exec_graph()` method remains the entry point of the orchestration API. This method now returns instances of `waiting_i2_graph`, so the new way to launch iota2 processing is as follows:

.. code-block:: python

    if __name__ == '__main__':
        import sys
        # steps are numbered from 1 on the command line
        run_first_step = int(sys.argv[1]) - 1
        run_last_step = int(sys.argv[2]) - 1
        assert run_last_step >= run_first_step
        i2_chain = i2_builder(run_first_step, run_last_step)
        i2_graphs = i2_chain.get_final_i2_exec_graph()
        # we sequentially call the build_graph() method for each
        # waiting_i2_graph, building dask's graph
        for i2_graph in i2_graphs:
            delayed_dask_graph = i2_graph.build_graph()
            # use dask's compute function as usual
            delayed_dask_graph.compute()

Now that the step orchestration in iota2 enables us to launch several execution graphs one after the other, we can write the step classes `confusion` and `report` to respond to the scenario and see how an execution works.

.. code-block:: python

    class confusion(i2_step):
        """class to simulate the confusion step"""
        def __init__(self):
            super(confusion, self).__init__()
            confusion_task = self.i2_task(
                task_parameter={
                    "f": self.confusion,
                    "output_confusion_file": "confusion.csv"
                },
                task_resources={
                    "cpu": 2,
                    "ram": "10Gb"
                })
            self.add_task_to_i2_processing_graph(
                confusion_task,
                task_group="confusion",
                task_sub_group="confusion_tile_T31TCJ",
                task_dep_dico={
                    "classifications": [self.classifications_to_perform[0]],
                    "models": [self.models_to_learn[-1]]
                })

        def confusion(self, output_confusion_file) -> None:
            """write a random number of confusion files"""
            import random
            nb_confusion = random.randint(1, 5)
            print(f"number of random confusion file : {nb_confusion}")
            for nb_conf in range(nb_confusion):
                with open(f"confusion_iota2_{nb_conf}.txt", "w") as conf_file:
                    conf_file.write(str(nb_conf))


    class report(i2_step):
        """class to simulate report production"""
        def __init__(self):
            super(report, self).__init__()
            # one task per confusion file found by the container's callback
            for conf_file_number, conf_file in enumerate(
                    i2_step.confusion_files):
                report_task = self.i2_task(
                    task_parameter={
                        "f": self.do_report,
                        "report_file": f"report_{conf_file_number}.txt",
                        "confusion_file": conf_file
                    },
                    task_resources={
                        "cpu": 2,
                        "ram": "10Gb"
                    })
                self.add_task_to_i2_processing_graph(
                    report_task,
                    task_group="final_report",
                    task_sub_group=conf_file_number)

        def do_report(self, report_file, confusion_file) -> None:
            """ """
            print(
                f"generating report : {report_file} thanks to {confusion_file}")
            os.remove(confusion_file)

.. Warning::
    It is important to note that to build its tasks, the `report` class relies on the class attribute i2_step.confusion_files. This attribute must, of course, have a value before calling the class constructor.

This attribute can, for example, be updated in the callback function of the appropriate container:

.. code-block:: python

    def callback_function(self):
        import glob
        # sorted, so that report numbers follow confusion file numbers
        confusion_files = sorted(glob.glob("confusion_iota2_*.txt"))
        i2_step.confusion_files = confusion_files

    def build_steps(self) -> List[step_container]:
        """prepare steps sequence"""
        from functools import partial
        steps_part_1 = step_container("part_1")
        steps_part_1.append(partial(learn_model))
        steps_part_1.append(partial(classifications))
        steps_part_1.append(partial(confusion))

        # the report step is alone in its container: its tasks can only be
        # built once the callback has listed the confusion files
        steps_part_2 = step_container("part_2")
        steps_part_2.callback = partial(self.callback_function)
        steps_part_2.append(partial(report))
        return [steps_part_1, steps_part_2]

The full scheduling script is available at: :download:`i2_dask_scheduling.py <./scripts/i2_dask_scheduling.py>`

Then an execution trace can be:

.. code-block:: bash

    >>> python i2_dask_scheduling.py 1 4
    I'm learning model_1.txt thanks to my_database.sqlite
    I'm learning model_2.txt thanks to my_database.sqlite
    classification of Classif_1.tif thanks to model_1.txt
    number of random confusion file : 3
    generating report : report_0.txt thanks to confusion_iota2_0.txt
    generating report : report_1.txt thanks to confusion_iota2_1.txt
    generating report : report_2.txt thanks to confusion_iota2_2.txt

We can see that the unpredictable output of the confusion step is correctly handled by using several dask graphs. Indeed, for the 3 confusion files produced, 3 corresponding reports are generated.
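
The mechanism can be reduced to a minimal sketch, independent of iota2 and of dask: a first phase writes an unpredictable number of files, then a callback discovers them just before the tasks of the second phase are built. All names (``produce_confusion_files``, ``discover_files``, ``build_report_tasks``, ``state``) are hypothetical, and plain ``functools.partial`` objects stand in for the dask graphs used by iota2.

```python
import glob
import os
import random
import tempfile
from functools import partial
from typing import Callable, List

# hypothetical shared state, playing the role of i2_step.confusion_files
state = {"confusion_files": []}


def produce_confusion_files(directory: str) -> int:
    """Phase 1: write a random number of files, unknown before runtime."""
    nb_files = random.randint(1, 5)
    for number in range(nb_files):
        path = os.path.join(directory, f"confusion_{number}.txt")
        with open(path, "w") as conf_file:
            conf_file.write(str(number))
    return nb_files


def discover_files(directory: str) -> None:
    """Callback: list the files actually produced by phase 1."""
    state["confusion_files"] = sorted(
        glob.glob(os.path.join(directory, "confusion_*.txt")))


def do_report(confusion_file: str) -> str:
    """Task of phase 2: build a report for one confusion file."""
    return f"report for {os.path.basename(confusion_file)}"


def build_report_tasks() -> List[Callable[[], str]]:
    """Phase 2 construction: one task per discovered file."""
    return [partial(do_report, conf) for conf in state["confusion_files"]]


with tempfile.TemporaryDirectory() as workdir:
    nb_produced = produce_confusion_files(workdir)  # run the first graph
    discover_files(workdir)                         # container callback
    report_tasks = build_report_tasks()             # build the second graph
    reports = [task() for task in report_tasks]     # run the second graph

print(f"{nb_produced} confusion files, {len(reports)} reports")
```

In this sketch, calling ``build_report_tasks()`` before ``discover_files()`` would return an empty list. This is why the second graph can only be built once the first one has been executed.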
This script provides the main components of scheduling in iota2. Most of the functions presented in this script are used as is in iota2. Developers will be able to find these functions in the Iota2.py, Iota2Builder.py and IOTA2Steps.py modules.