Tasks
Specifying a task
Tasks are most easily created by decorating a function:
from smttask import RecordedTask
@RecordedTask
def Add(a: float, b: float, n: int=10) -> float:
    for i in range(n):
        a += b
    return a
A few remarks:
Task functions must be stateless. That means that they should not be class methods (unless they are static) and should not have any side-effects, such as changing class or module variables. This is essential because a fundamental assumption of smttask is that the output of a task is entirely determined by its inputs. There is no way for smttask to check for statelessness, so you are responsible for ensuring this assumption is valid.
All function arguments must have type annotations. smttask requires these to construct the associated Task. If an argument can take different types, use typing.Union to specify that.
The output type must also be indicated via a function annotation. A more verbose notation (detailed below) allows multiple outputs to be specified. The use of typing.Union here is untested and not recommended.
We capitalized the function name Add() here because the decorator converts the function into a class (a subclass of smttask.Task). This choice is of course purely stylistic.
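The following toy sketch shows why statelessness matters. It is not smttask's implementation. It is a cache keyed on a digest of the inputs, which is the kind of mechanism that breaks if a task's output depends on anything other than its inputs. The names `input_digest`, `cached_call`, `add` and `add_with_state` are hypothetical, and the JSON/SHA-1 digest is an assumption chosen for simplicity.

```python
import hashlib
import json

_cache: dict = {}  # digest -> stored result (stands in for results saved to disk)

def input_digest(func_name: str, **inputs) -> str:
    """Hash the function name together with its (JSON-serializable) inputs."""
    payload = json.dumps({"func": func_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha1(payload.encode()).hexdigest()

def cached_call(func, **inputs):
    """Return the stored result for these inputs if there is one, else compute it."""
    key = input_digest(func.__name__, **inputs)
    if key not in _cache:
        _cache[key] = func(**inputs)
    return _cache[key]

def add(a: float, b: float, n: int = 10) -> float:
    # Stateless: the result depends only on a, b and n
    for _ in range(n):
        a += b
    return a

counter = 0
def add_with_state(a: float, b: float, n: int = 10) -> float:
    # NOT stateless: the result also depends on a module variable
    global counter
    counter += 1
    return a + n*b + counter

print(cached_call(add, a=1.0, b=2.0, n=3))              # 7.0
first = cached_call(add_with_state, a=1.0, b=2.0, n=3)  # computed: 8.0
again = cached_call(add_with_state, a=1.0, b=2.0, n=3)  # cache hit: still 8.0
fresh = add_with_state(a=1.0, b=2.0, n=3)               # direct call: 9.0
```

With `add`, the cached value is always correct. With `add_with_state`, the cache silently returns a value that a fresh call would no longer produce.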
There are currently four available Task decorators:
@RecordedTask: Standard task which will be recorded by Sumatra.
@RecordedIterativeTask: A recorded task with a special iteration parameter. This parameter can be used to reuse previous partial computations of the same task with fewer steps. Typical use cases are iterative fitting procedures or simulations.
@MemoizedTask: Standard task which is not recorded by Sumatra. Because the result is not written to disk, it does not need to be serializable and can be any Python object. It is typically used as a component of a larger pipeline. By marking the task as “memoized”, the programmer guarantees that the task result will not change within the same Python process: the result is memoized as an attribute of the Task object. The task is still computed at least once per process, but memoization can avoid repeating computations if the same task is used multiple times within an analysis.
@UnpureMemoizedTask: A special task intended to simplify workflow definitions by encapsulating tasks which depend on computed state. The typical case is a database query: we want to define the workflow with “list entries from DB”, but the digest should be computed from the result of that query. This is especially useful if the state changes rarely, since any change of state would cause all dependent tasks to have new digests.
Like the MemoizedTask, this Task is only computed once within the same Python process. The programmer must therefore ensure that the Task result would not change during an analysis run. For example, if used to mark a Task querying a database, it must not be possible for the database to change during analysis.
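The memoization behaviour of the last two decorators can be mimicked with a couple of plain-Python classes. This is a sketch for intuition only. `ToyMemoizedTask`, `ToyUnpureMemoizedTask`, `fake_db` and the JSON/SHA-1 digest are hypothetical and are neither smttask's API nor its hashing scheme.

```python
import hashlib
import json
from typing import Any, Callable

class ToyMemoizedTask:
    """Hypothetical stand-in: runs `func` at most once per instance, keeping the result in memory."""

    def __init__(self, func: Callable[..., Any], **inputs: Any):
        self.func = func
        self.inputs = inputs
        self._result: Any = None
        self._computed = False
        self.ncalls = 0  # how many times `func` actually ran

    def run(self) -> Any:
        if not self._computed:
            self._result = self.func(**self.inputs)
            self._computed = True
            self.ncalls += 1
        return self._result

class ToyUnpureMemoizedTask(ToyMemoizedTask):
    """Hypothetical stand-in: the digest is derived from the *result*, not the inputs."""

    @property
    def digest(self) -> str:
        payload = json.dumps(self.run(), sort_keys=True)
        return hashlib.sha1(payload.encode()).hexdigest()

# Memoized: repeated runs reuse the in-memory result
squares = ToyMemoizedTask(lambda x: [i*i for i in range(x)], x=4)
squares.run()
squares.run()
print(squares.ncalls)  # 1

# "Unpure": a list stands in for a database whose content defines the digest
fake_db = ["entry-a", "entry-b"]
query1 = ToyUnpureMemoizedTask(lambda: sorted(fake_db))
query2 = ToyUnpureMemoizedTask(lambda: sorted(fake_db))
print(query1.digest == query2.digest)  # True: same content, same digest
fake_db.append("entry-c")
query3 = ToyUnpureMemoizedTask(lambda: sorted(fake_db))
print(query1.digest == query3.digest)  # False: content changed, so did the digest
```

Note that `query1` keeps its old result after `fake_db` changes. Results from the same process could therefore become inconsistent, which is why the underlying state must not change during an analysis run.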
For more advanced usage, callable classes can also be used to define tasks. This can be useful to define utility methods which depend on the task inputs.
from typing import List
from smttask import RecordedTask
@RecordedTask
class CombAdd:
    def gen_combs(self):  # Yields n*m (i, j) pairs
        for i in range(self.taskinputs.n):
            for j in range(self.taskinputs.m):
                yield (i, j)
    def __call__(self, a: float, b: float, n: int=10, m: int=10) -> List[float]:
        # Task inputs are taken from this signature; `self` is excluded
        vals = [i*a + j*b
                for i, j in self.gen_combs()]
        return vals
    def unpack_result(self, result):
        return {ij: r for ij, r in zip(self.gen_combs(), result)}
task = CombAdd(a=2.1, b=1.1)
# Get the (n,m) combinations used by the task (gen_combs returns a generator)
list(task.gen_combs())
# Run the task
res = task.run()
# Replace the list with a dictionary explicitly relating an (n,m) pair to a result
resdict = task.unpack_result(res)
Note the following about this example:
We define the task within the __call__ method. The task method must have this name.
We can use self within __call__ without it being added to the task arguments. Any other name for the first argument will not work (or rather, it will be included in the task arguments). It is not necessary to have a self argument, although if one is not needed, decorating a function probably suffices.
We use self.taskinputs to access the task inputs.
The use of gen_combs to generate the (n,m) combinations avoids the need for external code to know implementation details, like whether we loop over n or m first.
We provide an unpack_result method; this can be a convenient pattern for saving outputs in a compressed format. The name unpack_result is not special and the function is not used internally by the task: it is only to simplify user code. [1]
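Here is a plain-Python imitation of the pattern that makes the roles of `taskinputs`, `gen_combs` and `unpack_result` concrete without depending on smttask. `PlainCombAdd`, its `run` method, its `defaults` attribute and the use of `SimpleNamespace` for `taskinputs` are assumptions made for illustration. They are not how smttask builds task objects.

```python
from types import SimpleNamespace
from typing import Dict, Iterator, List, Tuple

class PlainCombAdd:
    """Hypothetical, smttask-free imitation of the CombAdd task above."""
    defaults = {"n": 10, "m": 10}

    def __init__(self, **inputs):
        # Mimic `self.taskinputs`: defaults overridden by the given inputs
        self.taskinputs = SimpleNamespace(**{**self.defaults, **inputs})

    def run(self) -> List[float]:
        return self(**vars(self.taskinputs))

    def gen_combs(self) -> Iterator[Tuple[int, int]]:
        for i in range(self.taskinputs.n):
            for j in range(self.taskinputs.m):
                yield (i, j)

    def __call__(self, a: float, b: float, n: int = 10, m: int = 10) -> List[float]:
        return [i*a + j*b for i, j in self.gen_combs()]

    def unpack_result(self, result: List[float]) -> Dict[Tuple[int, int], float]:
        return {ij: r for ij, r in zip(self.gen_combs(), result)}

task = PlainCombAdd(a=2.0, b=1.0, n=2, m=3)
res = task.run()                   # [0.0, 1.0, 2.0, 2.0, 3.0, 4.0]
resdict = task.unpack_result(res)  # {(0, 0): 0.0, (0, 1): 1.0, ..., (1, 2): 4.0}
```

The stored result stays a flat list. Only user code that calls `unpack_result` needs to know how the pairs map to values.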
Tasks as inputs
You can specify a Task type as an input to another:
@RecordedTask
def Mul(a: Add, b: float) -> float:
    return a*b
Note that it is not necessary for a task to explicitly state that its input(s) should be another task; in fact, not doing so greatly simplifies the composability of tasks. By specifying only the required type (possibly as a typing.Tuple, if the task returns multiple values), any task returning a result of the appropriate type is accepted.
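The benefit of typing inputs by their result type can be seen in a small sketch in which a task runs any task-valued inputs before calling its own function. `ToyTask`, `resolve`, `add` and `mul` are hypothetical names, and smttask's own input handling is more involved than this.

```python
from typing import Any, Callable

class ToyTask:
    """Hypothetical minimal task: a function plus its inputs."""

    def __init__(self, func: Callable[..., Any], **inputs: Any):
        self.func = func
        self.inputs = inputs

    def run(self) -> Any:
        # Any input that is itself a task is run first; other values pass through
        resolved = {name: resolve(value) for name, value in self.inputs.items()}
        return self.func(**resolved)

def resolve(value: Any) -> Any:
    return value.run() if isinstance(value, ToyTask) else value

def add(a: float, b: float, n: int = 10) -> float:
    return a + n*b  # equivalent to the loop in Add above

def mul(a: float, b: float) -> float:
    # `a` is typed by the value it needs (a float), not by the task producing it
    return a*b

direct = ToyTask(mul, a=3.0, b=2.0).run()                               # 6.0
chained = ToyTask(mul, a=ToyTask(add, a=1.0, b=2.0, n=2), b=2.0).run()  # (1 + 2*2) * 2 = 10.0
```

Because `mul` only asks for a float, it accepts both a literal and any task that produces a float.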
Multiple output values
There are two (and a half) ways to specify that a task should return multiple outputs. One is simply to specify it as a typing.Tuple:
from typing import Tuple
@RecordedTask
def Add(a: float, b: float, n: int=10) -> Tuple[float, int]:
    ...
Such a task is treated as having a single output (a tuple). The output is saved to a single file, and you use indexing to retrieve a particular result.
Alternatively, the return type can be expressed as a dictionary where all the keys are strings:
@RecordedTask
def Add(a: float, b: float, n: int=10) -> {"x": float, "n": int}:
    ...
With this approach, it is possible to assign names to the output values.
Moreover, the values of x and n will be saved to separate files (differentiated by their names).
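In practice this means one file per named output instead of one file for the whole tuple. The sketch below writes each named value to its own JSON file. The `save_named_outputs` helper, the `<basename>_<name>.json` scheme, the JSON format and the digest-like basename `"Add__abc123"` are all invented for illustration and do not reflect smttask's actual file layout.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List

def save_named_outputs(outputs: Dict[str, Any], directory: Path, basename: str) -> List[Path]:
    """Write each named output to its own JSON file (hypothetical naming scheme)."""
    paths = []
    for name, value in outputs.items():
        path = directory / f"{basename}_{name}.json"
        path.write_text(json.dumps(value))
        paths.append(path)
    return paths

with tempfile.TemporaryDirectory() as tmp:
    paths = save_named_outputs({"x": 12.5, "n": 10}, Path(tmp), "Add__abc123")
    names = sorted(p.name for p in paths)  # ['Add__abc123_n.json', 'Add__abc123_x.json']
    x_back = json.loads((Path(tmp) / "Add__abc123_x.json").read_text())  # 12.5
```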
Note
The second approach is actually equivalent to the following:
from smttask import TaskOutput
class AddOutputs(TaskOutput):
    x: float
    n: int
@RecordedTask
def Add(a: float, b: float, n: int=10) -> AddOutputs:
    ...
Explicitly constructing the smttask.TaskOutput type this way also allows methods to be associated with the output type, although the need for this should be rather rare.
When names are assigned to outputs, the output of a task is returned as a typing.NamedTuple, so specific output values can be retrieved either by position or by name.
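For reference, here is how a `typing.NamedTuple` behaves in plain Python. The class `AddResult` is an ordinary NamedTuple defined only to mirror the outputs `{"x": float, "n": int}`. It is not the `AddOutputs` TaskOutput class and not what smttask constructs internally.

```python
from typing import NamedTuple

class AddResult(NamedTuple):
    """Ordinary NamedTuple mirroring the outputs {"x": float, "n": int}."""
    x: float
    n: int

result = AddResult(x=12.5, n=10)
print(result[0])         # 12.5  (by position)
print(result.x)          # 12.5  (by name)
x, n = result            # unpacking works like any tuple
print(result._asdict())  # {'x': 12.5, 'n': 10}
```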
Automatic expansion of inputs
Consider the following hypothetical task dependencies, which would be one way of loading a dataset distributed over multiple files:
@MemoizedTask
def LoadDatafile(path: Path) -> Array:
    ...
@MemoizedTask
def LoadDataset(datafiles: list[LoadDatafile]) -> dict[str,Array]:
    ...
As a variation, one might want to keep a name associated with each file, and instead write the second task as
@MemoizedTask
def LoadDataset(datafiles: dict[str,LoadDatafile]) -> dict[str,Array]:
    ...
In both of these cases, what the developer clearly expects is for each task entry in datafiles to be executed and the results of each LoadDatafile task to be combined into a list or dictionary, before finally executing the LoadDataset task. This is indeed what happens, both with built-in Python types like list and dict and with custom types like addict’s Dict or parameters’ ParameterSet. Therefore, in most cases this Just Works as expected.
In certain cases, however, it may be necessary to adjust this behaviour. Under the hood, SumatraTask inspects each argument, and if it is a Collection (i.e. an iterable with a length), the argument is expanded so that its elements can be inspected. Collections include tuples, lists and sets, all of which are usually cheap to iterate through. Some iterable types, like str and bytes, don’t make sense to expand; these are listed in the configuration option smttask.config.terminating_types.
Therefore, to prevent expansion of the custom type MyType, it only needs to be added to smttask.config.terminating_types (this is a set, which is why we use add):
import smttask
smttask.config.terminating_types.add(MyType)
Note that this is only necessary if issubclass(MyType, collections.abc.Collection) returns True and iterating through MyType is expensive (e.g. if iteration involves costly I/O operations to load each element).
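The following approximate sketch in plain Python illustrates the expansion rule and the effect of `terminating_types`. It is meant for intuition and is not smttask's implementation. `ToyTask`, `expand`, `LazyFiles` and the module-level `terminating_types` set are hypothetical stand-ins; the real option is `smttask.config.terminating_types`.

```python
from collections.abc import Collection, Mapping
from typing import Any, Iterable

# Hypothetical stand-in for smttask.config.terminating_types
terminating_types = {str, bytes}

class ToyTask:
    """Hypothetical task: anything with a run() method."""
    def __init__(self, value: Any):
        self.value = value
    def run(self) -> Any:
        return self.value

def expand(arg: Any) -> Any:
    """Recursively run tasks found inside collections, rebuilding each container."""
    if isinstance(arg, ToyTask):
        return arg.run()
    if isinstance(arg, tuple(terminating_types)) or not isinstance(arg, Collection):
        return arg  # leaf value: returned untouched
    if isinstance(arg, Mapping):
        return type(arg)({key: expand(val) for key, val in arg.items()})
    return type(arg)(expand(val) for val in arg)

class LazyFiles:
    """Hypothetical collection whose iteration would be expensive (e.g. loading files)."""
    def __init__(self, paths: Iterable[str]):
        self.paths = list(paths)
    def __len__(self) -> int:
        return len(self.paths)
    def __contains__(self, item: object) -> bool:
        return item in self.paths
    def __iter__(self):
        raise RuntimeError("expensive iteration triggered")

print(expand({"a": [ToyTask(1.0), ToyTask(2.0)], "label": "abc"}))
# {'a': [1.0, 2.0], 'label': 'abc'}

lazy = LazyFiles(["f1.npy", "f2.npy"])
print(issubclass(LazyFiles, Collection))  # True: it would be expanded by default
terminating_types.add(LazyFiles)
print(expand(lazy) is lazy)  # True: left untouched, __iter__ never called
```

`LazyFiles` counts as a Collection without inheriting from it, because `collections.abc.Collection` recognizes any class that defines `__len__`, `__iter__` and `__contains__`.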
Remember that inputs are generally not explicitly typed as tasks, and that our recommendation would be to type LoadDataset as
@MemoizedTask
def LoadDataset(datafiles: list[Array]) -> dict[str,Array]:
    ...
Therefore, it is not possible for SumatraTask to know beforehand whether a collection passed as input may contain a Task to execute. Because of this, all inputs which are sized iterables are expanded (unless they match an entry in smttask.config.terminating_types).
Limitations
New output types need their own serializers
Output types must be supported by Scityping or Pydantic, although with those packages’ hooks for defining custom encoders and validators, this is almost always a solvable problem. [2] You can check whether a type MyType is supported by executing the following snippet:
from scityping.pydantic import BaseModel
class Foo(BaseModel):
    a: MyType
If this raises an error stating that no validator was found, you will need to define a custom data type, as detailed in either the Pydantic or the Scityping documentation. [3]