Workflow basics

Implementation

The only requirements to write a workflow are:

  • PHP: implementing Zenaton\Interfaces\WorkflowInterface, which requires only a handle method that is called to run the workflow;
  • PHP: using the Zenaton\Traits\Zenatonable trait, which defines the dispatch and execute methods;
  • Node.js: using the provided Workflow function;
  • Node.js (generator-based API): using the provided workflow function;
  • Ruby: inheriting from Zenaton::Interfaces::Workflow, which requires only a handle method that is called to run the workflow;
  • Ruby: including the Zenaton::Traits::Zenatonable module, which defines the dispatch and execute methods;
  • Python: inheriting from the zenaton.abstracts.workflow.Workflow class, which requires only a handle method that is called to run the workflow;
  • Python: inheriting from the zenaton.traits.zenatonable.Zenatonable class, which defines the dispatch and execute methods;
  • Go: using either of the provided functions workflow.New or workflow.NewCustom;
  • In every language: being idempotent. In practical terms, the workflow must implement a logical flow and NOT the tasks themselves.

Idempotence implies that any action that has side effects or that needs access to potentially changing information (such as querying a database, writing or reading a file, using the current time, sending an email, echoing to the console, etc.) MUST be done within tasks, not from within workflows.

Because the Zenaton engine re-executes the class describing a workflow each time it has to decide what to do next, any such action wrongly placed in the workflow will be executed multiple times.

The provided methods execute and dispatch are internally implemented to ensure idempotency.
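The following toy model illustrates why this matters. It is not Zenaton's actual engine: it simply assumes an engine that replays the workflow from the top after every completed task and reuses the recorded task outputs. ReplayEngine, Suspend and welcome_flow are hypothetical names introduced only for this sketch.

```python
class Suspend(Exception):
    """Raised to stop a replay once a new task has been completed."""


class ReplayEngine:
    """Toy decision engine (an assumption, not Zenaton's implementation)."""

    def __init__(self):
        self.history = []    # outputs of completed tasks, in execution order
        self.task_runs = 0   # number of times a task body really ran

    def run(self, workflow):
        """Replay `workflow` until it finishes; return (result, replays)."""
        replays = 0
        while True:
            replays += 1
            position = 0

            def execute(task):
                nonlocal position
                if position < len(self.history):
                    # Completed during an earlier replay: reuse its output.
                    output = self.history[position]
                    position += 1
                    return output
                # New step: run the task once, record it, then stop this
                # replay so the engine can decide what to do next.
                self.task_runs += 1
                self.history.append(task())
                raise Suspend()

            try:
                return workflow(execute), replays
            except Suspend:
                continue


workflow_side_effects = []


def welcome_flow(execute):
    # WRONG: a side effect placed directly in the workflow.
    workflow_side_effects.append("log line")
    email = execute(lambda: "email sent")   # side effect inside a task
    slack = execute(lambda: "slack sent")   # side effect inside a task
    return email, slack


engine = ReplayEngine()
result, replays = engine.run(welcome_flow)
print(result, replays, engine.task_runs, len(workflow_side_effects))
```

In this model each task body runs exactly once, while the line placed directly in the workflow runs again on every replay.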

Example:

Python:

    from zenaton.abstracts.workflow import Workflow
    from zenaton.traits.zenatonable import Zenatonable

    class WelcomeFlow(Workflow, Zenatonable):

        def __init__(self, user):
            self.email = user.email
            self.slack_id = user.slack_id

        def handle(self):
            SendWelcomeEmail(self.email).execute()
            IntroduceUserThroughSlack(self.slack_id).execute()

In Go, there are two ways to create a workflow. Both require implementing the Handler interface, which has one required method: func Handle() (interface{}, error).

  • The simpler way to define a workflow is to call the workflow.New() function. You must provide a name and a handle function of the form: func () (interface{}, error). Under the hood we create a Handler interface for you using the provided function. For example:

      import "github.com/zenaton/zenaton-go/v1/zenaton/workflow"

      var SimpleWorkflow = workflow.New("SimpleWorkflow",
          func() (interface{}, error) {
              ... // business logic of the workflow
          })

  • If you want to implement the Handler interface on your own, you must call the workflow.NewCustom() function. This function takes a name and an instance of your type that has a Handle method. You can also optionally provide an Init method that takes any number and type of arguments and initializes the workflow with data. For example:

      import "github.com/zenaton/zenaton-go/v1/zenaton/workflow"

      var WelcomeWorkflow = workflow.NewCustom("WelcomeWorkflow", &Welcome{})

      type Welcome struct {
          // Fields must be exported, as they will need to be serialized
          Email   string
          SlackID string
      }

      func (w *Welcome) Init(user User) {
          w.Email = user.Email
          w.SlackID = user.SlackID
      }

      func (w *Welcome) Handle() (interface{}, error) {
          SendWelcomeEmail(w.Email).Execute()
          IntroduceUserThroughSlack(w.SlackID).Execute()
          // Handle must return a value and an error
          return nil, nil
      }

Start

You need to set up Zenaton with your credentials:

PHP:

    Zenaton\Client::init($app_id, $api_token, $app_env);

Node.js:

    const { Client } = require("zenaton");

    Client.init(app_id, api_token, app_env);

Node.js (generator-based API):

    const { Client } = require("zenaton");

    const client = new Client(app_id, api_token, app_env);

Ruby:

    Zenaton::Client(app_id, api_token, app_env);

Python:

    Zenaton.client.Client(app_id, api_token, app_env)

Go:

    zenaton.InitClient(appID, apiToken, appEnv)

Then launching a workflow is as easy as:

PHP:

    (new WelcomeFlow($user))->dispatch();

Node.js:

    await new WelcomeFlow(user).dispatch();

Node.js (generator-based API):

    client.run.workflow("SequentialWorkflow", email, slackId);

Ruby:

    WelcomeFlow.new(user).dispatch();

Python:

    WelcomeFlow(self.user).dispatch();

Go:

    WelcomeFlow.New(w.User).Dispatch();

In Node.js, dispatch returns a Promise that resolves when the Agent confirms that your workflow has been properly scheduled.

We use await here for simplicity, but if your JavaScript stack does not support async/await (or you are in module scope), you can use the more traditional then()/catch() syntax:

    new WelcomeFlow(user).dispatch().catch((err) => {
      console.error(err);
    });

If you want to reference this workflow later, implement a public id method (getId in PHP, ID in Go) in your workflow implementation that returns the id Zenaton should use to retrieve this workflow instance, e.g. in WelcomeFlow.php, WelcomeFlow.js, WelcomeFlow.rb, WelcomeFlow.py or WelcomeFlow.go:

PHP:

    // [...]
    public function getId()
    {
        return $this->email;
    }

Node.js:

    id() {
      return this.email;
    }

Ruby:

    ...
    def id
      @email
    end
    ...

Python:

    ...
    def id(self):
        return self.email
    ...

Go:

    ...
    func (w *Welcome) ID() string {
        return w.Email
    }
    ...

To be valid, the id returned by this method MUST be unique: in the same environment, you cannot have two running instances of the same workflow with the same id. In Go, this method must have the signature func ID() string.
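As a rough illustration of the uniqueness rule, the sketch below keeps a registry of running instances keyed by workflow name and id. It is only a model: RunningInstances and DuplicateWorkflowId are hypothetical names, and rejecting a duplicate with an exception is an assumption made for this sketch, since the text above only states that two such running instances cannot coexist.

```python
class DuplicateWorkflowId(Exception):
    """Hypothetical error used by this sketch for a clashing id."""


class RunningInstances:
    """Toy registry of running workflow instances for one environment."""

    def __init__(self):
        self._running = {}   # (workflow class name, id) -> instance

    def start(self, workflow):
        key = (type(workflow).__name__, workflow.id())
        if key in self._running:
            # Assumption: a duplicate running id is rejected outright.
            raise DuplicateWorkflowId(key)
        self._running[key] = workflow

    def finish(self, workflow):
        # A finished instance no longer reserves its id.
        self._running.pop((type(workflow).__name__, workflow.id()), None)

    def where_id(self, workflow_class, workflow_id):
        return self._running.get((workflow_class.__name__, workflow_id))


class WelcomeFlow:
    """Stand-in workflow class; Zenaton base classes are omitted."""

    def __init__(self, email):
        self.email = email

    def id(self):
        return self.email


env = RunningInstances()
first = WelcomeFlow("ada@example.com")
env.start(first)

try:
    env.start(WelcomeFlow("ada@example.com"))
    rejected = False
except DuplicateWorkflowId:
    rejected = True

found = env.where_id(WelcomeFlow, "ada@example.com")

env.finish(first)
env.start(WelcomeFlow("ada@example.com"))   # allowed again once finished
```

Because the key combines the workflow name with the id, two different workflows may reuse the same id, while two running instances of the same workflow may not.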

Pause, Resume, Kill

You can pause a workflow’s instance

PHP:

    WelcomeFlow::whereId($email)->pause();

Node.js:

    await WelcomeFlow.whereId(email).pause();

Node.js (generator-based API):

    client.select.workflow("WelcomeFlow").withTag(email).pause();

Ruby:

    WelcomeFlow.where_id(email).pause

Python:

    WelcomeFlow().where_id(email).pause()

Go:

    WelcomeFlow.WhereID(email).Pause()

and later resume it

PHP:

    WelcomeFlow::whereId($email)->resume();

Node.js:

    await WelcomeFlow.whereId(email).resume();

Node.js (generator-based API):

    client.select.workflow("WelcomeFlow").withTag(email).resume();

Ruby:

    WelcomeFlow.where_id(email).resume

Python:

    WelcomeFlow().where_id(email).resume()

Go:

    WelcomeFlow.WhereID(email).Resume()

or even kill it

PHP:

    WelcomeFlow::whereId($email)->kill();

Node.js:

    await WelcomeFlow.whereId(email).kill();

Node.js (generator-based API):

    client.select.workflow("WelcomeFlow").withTag(email).terminate();

Ruby:

    WelcomeFlow.where_id(email).kill

Python:

    WelcomeFlow().where_id(email).kill()

Go:

    WelcomeFlow.WhereID(email).Kill()

It is also possible to pause, resume or kill workflows directly from the Zenaton interface.

[Image: Pause Resume Kill Workflow]

Properties

Properties are the attributes of your task and workflow classes.

PHP:

    <?php

    use Zenaton\Interfaces\WorkflowInterface;
    use Zenaton\Traits\Zenatonable;

    class WelcomeFlow implements WorkflowInterface
    {
        use Zenatonable;

        public function __construct(User $user)
        {
            $this->email = $user->email;
            $this->slackId = $user->slack_id;
        }

        public function handle()
        {
            ...
            $this->foo = (new TaskA($this->email))->execute();
            ...
            $this->foo = "...";
        }
    }

Node.js:

    const { Workflow } = require("zenaton");

    module.exports = Workflow("WelcomeFlow", {
      init(user) {
        this.email = user.email;
        this.slackId = user.slack_id;
      },
      async handle() {
        ...
        this.foo = TaskA(this.email).execute();
        ...
        this.foo = "...";
      }
    });

Node.js (generator-based API):

    const { workflow } = require("zenaton");

    module.exports = workflow("WelcomeFlow", function* (email) {
      ...
      this.foo = yield this.run.task("TaskA", email);
      ...
      this.foo = "...";
    });

Ruby:

    require 'zenaton'

    class WelcomeFlow < Zenaton::Interfaces::Workflow
      include Zenaton::Traits::Zenatonable

      def initialize(user)
        @email = user.email
        @slack_id = user.slack_id
      end

      def handle
        ...
        @foo = TaskA.new(@email).execute
        ...
        @foo = "..."
      end
    end

Python:

    from zenaton.abstracts.workflow import Workflow
    from zenaton.traits.zenatonable import Zenatonable

    class WelcomeFlow(Workflow, Zenatonable):

        def __init__(self, user):
            self.email = user.email
            self.slack_id = user.slack_id

        def handle(self):
            ...
            self.foo = TaskA(self.email).execute()
            ...
            self.foo = "..."

In this example there are three properties: email, slack_id and foo. The first two are created in the constructor.
You can also create a new one later, like foo, inside the handle method or the on_event method.

You can read and mutate them whenever you want.

They are just regular class attributes!

Note that they are exposed in the dashboard.
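The point that properties are ordinary attributes can be checked with plain Python. The sketch below uses a stand-in WelcomeFlow without the Zenaton base classes, and a hypothetical User class; vars() is Python's built-in for listing instance attributes, and nothing here shows how the SDK or the dashboard actually reads them.

```python
class User:
    """Hypothetical user object, only for this sketch."""

    def __init__(self, email, slack_id):
        self.email = email
        self.slack_id = slack_id


class WelcomeFlow:
    """Stand-in workflow class; Zenaton base classes are omitted."""

    def __init__(self, user):
        # Properties created in the constructor.
        self.email = user.email
        self.slack_id = user.slack_id

    def handle(self):
        # A property created later, then mutated.
        self.foo = "task output"
        self.foo = "..."


flow = WelcomeFlow(User("ada@example.com", "U123"))
before = dict(vars(flow))   # snapshot before handle runs
flow.handle()
after = dict(vars(flow))    # snapshot after handle runs

print(sorted(before))
print(sorted(after))
```

The first snapshot contains only the constructor properties; the second also contains foo with its latest value.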

How do I access my task and workflow properties?

You can either get them from the SDK or view them in the Zenaton Dashboard.
You can view task and workflow properties for each instance. You can also see all their different values through steps.

[Image: See your properties in the dashboard]

Errors

If an error occurs during a task execution, the workflow instance is automatically paused and you have to resume it manually. You can also retry failed tasks manually, directly from the interface.

[Image: Error Retry Workflow Dashboard]

In Go, if an error is returned from one of your tasks, you can handle it normally in your workflow. For example:

    var a int
    err := tasks.A.New().Execute().Output(&a)
    if err != nil {
        ... // handle error
    }

Note: If you have a custom error type, its type information will be lost. The workflow receives a standard Go error whose err.Error() matches the output of the err.Error() that was returned from the task.

For parallel tasks, you will receive a slice of errors. This slice will be nil if no error occurred. If one of the parallel tasks returned an error, the slice has the same length as the list of input tasks, and the non-nil error sits at the same index as the task that produced it. For example:

    var a int
    var b int

    errs := task.Parallel{
        tasks.A.New(),
        tasks.B.New(),
    }.Execute().Output(&a, &b)

    if errs != nil {
        if errs[0] != nil {
            // tasks.A error
        }
        if errs[1] != nil {
            // tasks.B error
        }
    }

If your task panics, the workflow instance is automatically paused and you have to resume it manually.