Tasks

Specifying a task

Tasks are most easily created by decorating a function:

from smttask import RecordedTask

@RecordedTask
def Add(a: float, b: float, n: int=10) -> float:
  for i in range(n):
    a += b
  return a

A few remarks:

  • Task functions must be stateless. That means that they should not be class methods (unless they are static) and should not have any side-effects, such as changing class or module variables. This is essential because a fundamental assumption of smttask is that the output of a task is entirely determined by its inputs. There is no way for smttask to check for statelessness, so you are responsible for ensuring this assumption is valid.

  • All function arguments must have type annotations; smttask requires them to construct the associated Task. If an argument can take different types, use typing.Union to specify that.

  • The output type must also be indicated via a function annotation. A more verbose notation (detailed below) allows specifying multiple outputs. The use of typing.Union here is untested and not recommended.

  • We capitalized the function name Add() here because the decorator converts the function into a class (a subclass of smttask.Task). This choice is of course purely stylistic.
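The statelessness requirement can be illustrated without smttask. The sketch below is plain Python, not smttask's implementation: `input_digest` and `cached_run` are hypothetical helpers that cache a result under a hash of the inputs, which is only safe when the output depends on nothing else.

```python
import hashlib
import json

_cache = {}

def input_digest(name, kwargs):
    """Hash the function name and its keyword arguments (hypothetical scheme)."""
    payload = json.dumps({"task": name, "inputs": kwargs}, sort_keys=True)
    return hashlib.sha1(payload.encode()).hexdigest()

def cached_run(func, **kwargs):
    """Return the cached result when the same inputs were seen before."""
    key = input_digest(func.__name__, kwargs)
    if key not in _cache:
        _cache[key] = func(**kwargs)
    return _cache[key]

def add(a: float, b: float, n: int = 10) -> float:
    # Stateless: the result depends only on a, b and n.
    for _ in range(n):
        a += b
    return a

state = {"offset": 0.0}  # shared mutable state outside the function

def add_with_offset(a: float, b: float) -> float:
    # NOT stateless: the result also depends on state["offset"].
    return a + b + state["offset"]

first = cached_run(add_with_offset, a=1.0, b=2.0)  # computed while offset is 0.0
state["offset"] = 5.0
stale = cached_run(add_with_offset, a=1.0, b=2.0)  # served from the cache
fresh = add_with_offset(a=1.0, b=2.0)              # what a direct call returns now
```

Here `stale` and `fresh` disagree even though the inputs are identical, which is the failure mode the first remark warns about. The stateless `add` cannot run into it.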

There are currently four available Task decorators:

@RecordedTask

Standard task which will be recorded by Sumatra.

@RecordedIterativeTask

A recorded task with a special iteration parameter. This parameter can be used to reuse previous partial computations of the same task with fewer steps. Typical use cases are iterative fitting procedures or simulations.

@MemoizedTask

Standard task which is not recorded by Sumatra. Because the result is not written to disk, it does not need to be serializable and can be any Python object. Typically used as a component of a larger pipeline.

@UnpureMemoizedTask

A special task intended to simplify workflow definitions by encapsulating tasks which depend on computer state. The typical case is a database query: we want to define the workflow with “list entries from DB”, but the digest should be computed from the result of that query. This is especially useful if the state seldom changes, since any change of state would cause all dependent tasks to have new digests.
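The idea behind an iteration parameter can be sketched in plain Python. This is an illustration under assumptions, not the mechanism used by @RecordedIterativeTask: `_partial_results` and `iterative_add` are hypothetical names, and results are kept in a dictionary instead of on disk.

```python
_partial_results = {}  # maps (a, b) -> {n_steps: value after n_steps}

def iterative_add(a: float, b: float, n: int):
    """Add b to a n times, resuming from the longest stored shorter run.

    Returns (value, steps_computed), where steps_computed is the number
    of iterations actually executed by this call.
    """
    runs = _partial_results.setdefault((a, b), {})
    # Largest stored step count that does not exceed n (0 if none)
    start = max((k for k in runs if k <= n), default=0)
    value = runs[start] if start else a
    for _ in range(start, n):
        value += b
    runs[n] = value
    return value, n - start

v10, steps10 = iterative_add(1.0, 2.0, n=10)  # nothing stored yet: 10 steps
v15, steps15 = iterative_add(1.0, 2.0, n=15)  # resumes from n=10: 5 steps
```

The second call only executes the five missing iterations, which is the kind of saving one would hope for in iterative fits or long simulations.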

For more advanced usage, callable classes can also be used to define tasks. This can be useful to define utility methods which depend on the task inputs.

from typing import List
from smttask import RecordedTask

@RecordedTask
class CombAdd:

  def gen_combs(self):  # Yields n*m values
    for n in range(self.taskinputs.n):
      for m in range(self.taskinputs.m):
        yield (n, m)

  def __call__(self, a: float, b: float, n: int=10, m: int=10) -> List[float]:
    # One value per (n, m) pair, in the order produced by gen_combs
    vals = [n*a + m*b
            for n, m in self.gen_combs()]
    return vals

  def unpack_result(self, result):
    return {nm: r for nm, r in zip(self.gen_combs(),
                                   result)
           }

task = CombAdd(a=2.1, b=1.1)
# Get the (n,m) combinations used by the task
combs = list(task.gen_combs())
# Run the task
res = task.run()
# Replace the list with a dictionary explicitly relating an (n,m) pair to a result
resdict = task.unpack_result(res)

Note how in this example

  • We define the task within the __call__ method. The task method must have this name.

  • We can use self within __call__ without it being added to the task arguments. Any other name for the first argument will not work (or rather, it will be included in the task arguments). A self argument is not required, although if it is not needed, decorating a function probably suffices.

  • We use self.taskinputs to access the task inputs.

  • The use of gen_combs to generate the (n,m) combinations avoids the need for external code to know implementation details, like whether we loop over n or m first.

  • We provide an unpack_result method; this can be a convenient pattern for saving outputs in a compressed format. The name unpack_result is not special and the function is not used internally by the task: it is only to simplify user code. [1]
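To see what the unpacking pattern produces, here is a plain-Python stand-in that mirrors the structure of CombAdd without any smttask machinery. `PlainCombAdd` is a hypothetical class: the inputs are stored as ordinary attributes rather than accessed through self.taskinputs, and nothing is recorded.

```python
from typing import Dict, Iterator, List, Tuple

class PlainCombAdd:
    def __init__(self, a: float, b: float, n: int = 10, m: int = 10):
        self.a, self.b, self.n, self.m = a, b, n, m

    def gen_combs(self) -> Iterator[Tuple[int, int]]:
        # Single place that fixes the iteration order: n outer, m inner
        for n in range(self.n):
            for m in range(self.m):
                yield (n, m)

    def run(self) -> List[float]:
        # Compact output: values only, in gen_combs order
        return [n * self.a + m * self.b for n, m in self.gen_combs()]

    def unpack_result(self, result: List[float]) -> Dict[Tuple[int, int], float]:
        # Re-attach the (n, m) keys using the same generator
        return dict(zip(self.gen_combs(), result))

task = PlainCombAdd(a=2.0, b=1.0, n=2, m=2)
res = task.run()                   # [0.0, 1.0, 2.0, 3.0]
resdict = task.unpack_result(res)  # {(0, 0): 0.0, (0, 1): 1.0, (1, 0): 2.0, (1, 1): 3.0}
```

Because both run and unpack_result go through gen_combs, changing the loop order in one place keeps values and keys aligned.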

Tasks as inputs

You can specify a Task type as an input to another:

@RecordedTask
def Mul(a: Add, b: float) -> float:
  return a*b

Note that it is not necessary for a task to explicitly state that its input(s) should be another task, and in fact not doing so greatly simplifies composability of tasks. By specifying only the required type (possibly as a typing.Tuple, if the task returns multiple values), any task returning a result of appropriate type is accepted.
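The benefit of annotating with the result type instead of the task type can be shown with a small plain-Python sketch. Nothing here is smttask API: `Lazy` and `resolve` are hypothetical stand-ins for a task object and for the step that replaces a task input by its result.

```python
from typing import Any, Callable

class Lazy:
    """Hypothetical stand-in for a task: a deferred computation with run()."""
    def __init__(self, func: Callable[..., Any], **inputs: Any):
        self.func = func
        self.inputs = inputs

    def run(self) -> Any:
        # Resolve upstream inputs first, then evaluate this computation
        return self.func(**{k: resolve(v) for k, v in self.inputs.items()})

def resolve(value: Any) -> Any:
    """Return value.run() for deferred computations, the value itself otherwise."""
    return value.run() if isinstance(value, Lazy) else value

def add(a: float, b: float) -> float:
    return a + b

def sub(a: float, b: float) -> float:
    return a - b

def mul(a: float, b: float) -> float:  # `a` only needs to be a float
    return a * b

from_value = Lazy(mul, a=3.0, b=2.0).run()                    # plain float input
from_add = Lazy(mul, a=Lazy(add, a=1.0, b=2.0), b=2.0).run()  # upstream add
from_sub = Lazy(mul, a=Lazy(sub, a=5.0, b=2.0), b=2.0).run()  # upstream sub
```

Since `mul` only asks for a float, it composes equally well with a literal value, with `add`, or with `sub`. Had it required one specific upstream type, the third line would not be allowed.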

Multiple output values

There are two ways to specify that a task should return multiple outputs. One is simply to specify it as a typing.Tuple:

from typing import Tuple
from smttask import RecordedTask

@RecordedTask
def Add(a: float, b: float, n: int=10) -> Tuple[float, int]:
  ...

Such a task is treated as having a single output (a tuple). The output is saved to a single file, and you use indexing to retrieve a particular result.

Alternatively, one can explicitly construct the smttask.TaskOutput type:

from smttask import RecordedTask, TaskOutput

class AddOutputs(TaskOutput):
  x: float
  n: int

@RecordedTask
def Add(a: float, b: float, n: int=10) -> AddOutputs:
  ...

With this approach, it is possible to assign names to the output values. Moreover, the values of x and n will be saved to separate files (differentiated by their names).

No matter the notation used, when used as an input to another Task, the receiving Task sees a tuple. It is currently not possible to index outputs by name.
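The difference between the two notations can be pictured as follows. This is only a sketch of the storage layout described above, written with the standard library: the file names, the JSON format, and the helpers `save_tuple_output` and `save_named_outputs` are assumptions for illustration, not what smttask actually writes.

```python
import json
import tempfile
from pathlib import Path

def save_tuple_output(outdir: Path, taskname: str, result: tuple) -> list:
    """Tuple annotation: the whole tuple goes into one file."""
    path = outdir / f"{taskname}.json"
    path.write_text(json.dumps(list(result)))
    return [path.name]

def save_named_outputs(outdir: Path, taskname: str, names: tuple, result: tuple) -> list:
    """Named outputs: one file per output, distinguished by its name."""
    written = []
    for name, value in zip(names, result):
        path = outdir / f"{taskname}_{name}.json"
        path.write_text(json.dumps(value))
        written.append(path.name)
    return written

result = (21.0, 10)  # e.g. the final sum x and the number of steps n

with tempfile.TemporaryDirectory() as tmp:
    single = save_tuple_output(Path(tmp), "Add", result)
    separate = save_named_outputs(Path(tmp), "Add", ("x", "n"), result)

# In both cases a downstream consumer receives a plain tuple and indexes by position
x, n = result[0], result[1]
```

Only the on-disk layout differs between the two helpers; the value handed downstream is the same tuple either way.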

Limitations

Output types must be supported by Scityping or Pydantic, although with those packages’ hooks for defining custom encoders and validators, this is almost always a solvable problem. [2] You can check whether a type MyType is supported by executing the following snippet:

from scityping.pydantic import BaseModel
class Foo(BaseModel):
  a: MyType

If this raises an error stating that no validator was found, you will need to define a custom data type, as detailed in either the Pydantic or the Scityping documentation. [3]