The only requirements to write a task are to extend Task, include the Zenatonable trait, and implement a handle method:
    from zenaton.abstracts.task import Task
    from zenaton.traits.zenatonable import Zenatonable


    class SimpleTask(Task, Zenatonable):

        def handle(self):
            # Your task implementation
            pass
When a task fails, an exception is usually thrown and the Zenaton engine is alerted.
But in some rare cases (such as an Agent crash), it is not possible to know for sure
that something went wrong. To handle this situation, a task will be considered a failure if
its execution lasts more than 5 minutes.
You can change this value by defining a max_processing_time method that returns the limit in seconds, e.g.:
    from zenaton.abstracts.task import Task
    from zenaton.traits.zenatonable import Zenatonable


    class MyTask(Task, Zenatonable):

        def handle(self):
            # Your task implementation
            pass

        def max_processing_time(self):
            # After 10 minutes this task will be considered a failure
            return 600
Once a task has timed out, you can retry it from the Zenaton interface.
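The timeout rule can be pictured with a small, library-free sketch. It is not Zenaton's implementation: `DEFAULT_MAX_PROCESSING_TIME`, `effective_timeout`, `is_timed_out`, and the two stand-in task classes are hypothetical names used only for illustration. The only facts taken from the text are the 5-minute default and the optional `max_processing_time` method.

```python
DEFAULT_MAX_PROCESSING_TIME = 300  # seconds; the 5-minute default described above


class PlainTask:
    """Stand-in task with no custom limit (hypothetical, not a Zenaton class)."""

    def handle(self):
        return "done"


class SlowTask(PlainTask):
    """Stand-in task that raises its own limit to 10 minutes."""

    def max_processing_time(self):
        return 600


def effective_timeout(task):
    """Return the task's own limit if it defines one, else the default."""
    custom = getattr(task, "max_processing_time", None)
    return custom() if callable(custom) else DEFAULT_MAX_PROCESSING_TIME


def is_timed_out(task, elapsed_seconds):
    """A task counts as failed once it has run longer than its limit."""
    return elapsed_seconds > effective_timeout(task)
```

Under these assumptions, a task that has been running for 301 seconds would count as failed with the default limit but not with the 600-second override.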
You can dispatch a single task by creating a new instance from anywhere in your code and calling the
dispatch method on it.
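As a rough illustration of that call pattern, the sketch below creates a task and dispatches it. So that it runs without a Zenaton Agent, `Task` and `Zenatonable` are replaced by local stand-ins, and the `dispatched` list is a hypothetical record used only here; with the real library you would use the imports shown in the comments instead.

```python
# With the real library, Task and Zenatonable would come from:
#   from zenaton.abstracts.task import Task
#   from zenaton.traits.zenatonable import Zenatonable
# The two classes below are stand-ins so the snippet runs on its own.

dispatched = []  # hypothetical record of tasks sent for execution


class Task:
    """Stand-in for zenaton.abstracts.task.Task."""


class Zenatonable:
    """Stand-in for the trait that provides dispatch()."""

    def dispatch(self):
        # The real method would hand the task over for asynchronous
        # execution; this stand-in only remembers the task.
        dispatched.append(self)


class SimpleTask(Task, Zenatonable):

    def handle(self):
        # Your task implementation
        pass


# Create an instance and dispatch it. handle() is not called here.
SimpleTask().dispatch()
```

The calling code never invokes `handle` itself: it only creates the instance and calls `dispatch`, which is the part that carries over to real usage.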
Dispatching a task means it will be executed asynchronously. It's useful when you want to run some code in the background without waiting for it to complete, and you don't need it to be orchestrated as part of a workflow. This is the most basic way to use Zenaton. In the next section, you will learn how to orchestrate multiple tasks that are part of a workflow.