Zenaton white logo

Workflow basics

Implementation

The only requirements to write a workflow are:

  • Implementing Zenaton\Interfaces\WorkflowInterface that requires only a handle method that will be called to run the workflow;
  • Using Zenaton\Traits\Zenatonable trait, that defines dispatch and execute methods
  • Using the provided Workflow function;
  • Using the provided workflow function;
  • Inheriting from Zenaton::Interfaces::Workflow that requires only a handle method that will be called to run the workflow;
  • Including Zenaton::Traits::Zenatonable module, that defines dispatch and execute methods
  • Inheriting from Zenaton.abstracts.workflow.Workflow class that requires only a handle method that will be called to run the workflow;
  • Inheriting from zenaton.traits.zenatonable.Zenatonable class, that defines dispatch and execute methods
  • Using either of the provided functions workflow.New or workflow.NewCustom;
  • Being idempotent. In more practical terms, it means it must implement a logical flow and NOT the tasks themselves.

Idempotence implies that any actions (such as requesting a database, writing/reading a file, using current time, sending an email, echoing in console, etc.) that have side effects or that need access to potentially changing information MUST be done within tasks (not from within workflows).

As Zenaton engine triggers the execution of the class describing a workflow each time it has to decide what to do next, failing to follow the idempotence requirement will lead to multiple executions of actions wrongly present in it.

The provided methods execute and dispatch are internally implemented to ensure idempotency.

Example:

const { workflow } = require("zenaton");

module.exports = workflow("WelcomeFlow", function* (email, slackId) {
  yield this.run.task("SendWelcomeEmail", email);
  yield this.run.task("IntroduceUserToSlack", slackId);
});

There are two ways to create a workflow. Both of them require implementing the Handler interface. The Handler interface has one required method: func Handle() (interface{}, error).

  • The simpler way to define a workflow is to call the workflow.New() function. You must provide a name and a handle function of the form: func () (interface{}, error). Under the hood we create a Handler Interface for you using the provided function. For example:
import "github.com/zenaton/zenaton-go/v1/zenaton/workflow"

var SimpleWorkflow = workflow.New("SimpleWorkflow",
func() (interface{}, error) {
... // business logic of the workflow
})
  • If you want to implement the Handler Interface on your own, you must call the workflow.NewCustom() function. This function takes a name and an instance of your type that has a Handle method. You can also optionally provide an Init method that takes any number and type of arguments and initializes the workflow with data. For example:
import "github.com/zenaton/zenaton-go/v1/zenaton/workflow"

var WelcomeWorkflow = workflow.NewCustom("WelcomeWorkflow", &Welcome{})

type Welcome struct {
    //Fields must be exported, as they will need to be serialized
    Email string
    SlackID string
}

func (w *Welcome) Init(user User) {
    w.Email = user.Email
    w.SlackID = user.SlackID
}

func (w *Welcome) Handle() (interface{}, error) {
    SendWelcomeEmail(w.Email).Execute()
    IntroduceUserThroughSlack(w.SlackID).Execute()
}

Start

You need to setup Zenaton with your credentials:

Zenaton\Client::init($app_id, $api_token, $app_env);
const { Client } = require("zenaton");

Client.init(app_id, api_token, app_env);
const { Client } = require("zenaton");
const client = new Client(app_id, api_token, app_env);
Zenaton::Client(app_id, api_token, app_env);
Zenaton.client.Client(app_id, api_token, app_env)
zenaton.InitClient(appID, apiToken, appEnv)

Then launching a workflow is as easy as:

(new WelcomeFlow($user))->dispatch();
await new WelcomeFlow(user).dispatch();
client.run.workflow("SequentialWorkflow", email, slackId);
WelcomeFlow.new(user).dispatch();
WelcomeFlow(self.user).dispatch();
WelcomeFlow.New(w.User).Dispatch();

dispatch returns a Promise which will resolve when the Agent confirms that your workflow has been properly scheduled.

We use await here for simplicity, but of course if your Javascript stack does not support async/await (or you are in a module scope) you can use the more traditional then()/catch() syntax.
new WelcomeFlow(user).dispatch().catch((err) => {
    console.error(err);
});

If you want to reference this workflow later, you must associate a tag with it when you launch it.
Choose something unique, for example the username, the order_id, ...

client.run.withTag(email).workflow("WelcomeFlow", user);

To be valid, this tag method MUST be unique (meaning in the same environment, you can not have two running instances of the same workflow with the same id).

Pause, Resume, Kill

You can pause a workflow’s instance

WelcomeFlow::whereId($email)->pause();
await WelcomeFlow.whereId(email).pause();
client.select.workflow("WelcomeFlow").withTag(email).pause();
WelcomeFlow.where_id(email).pause
WelcomeFlow().where_id(email).pause()
WelcomeFlow.WhereID(email).Pause()

and later resume it

WelcomeFlow::whereId($email)->resume();
await WelcomeFlow.whereId(email).resume();
client.select.workflow("WelcomeFlow").withTag(email).resume();
WelcomeFlow.where_id(email).resume
WelcomeFlow().where_id(email).resume()
WelcomeFlow.WhereID(email).Resume()

or even kill it

WelcomeFlow::whereId($email)->kill();
await WelcomeFlow.whereId(email).kill();
client.select.workflow("WelcomeFlow").withTag(email).terminate();
WelcomeFlow.where_id(email).kill
WelcomeFlow().where_id(email).kill()
WelcomeFlow.WhereID(email).Kill()

It is also possible to pause, resume or kill worklows directly from Zenaton Interface.

Pause Resume Kill Workflow

Properties

They are the attributes of your task/workflow classes.

<?php
    
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Traits\Zenatonable;

class WelcomeFlow implements WorkflowInterface
{
    use Zenatonable;

    public function __construct(User $user)
    {
        $this->email = $user->email;
        $this->slackId = $user->slack_id;
    }

    public function handle()
    {
        ...
        $this->foo = (new TaskA($this->email))->execute();
        ...
        $this->foo = "..."
    }
}
const { Workflow } = require("zenaton");

module.exports = Workflow("WelcomeFlow", {

  init(user) {
    this.email = user.email;
    this.slackId = user.slack_id;
  },

  async handle() {
    ...
    this.foo = TaskA(this.email).execute();
    ...
    this.foo = "..."
  }
});
const { workflow } = require("zenaton");

module.exports = workflow("WelcomeFlow", function* (email) {
  ...
  this.foo = yield this.run.task("TaskA", email);
  ...
  this.foo = "..."
});
require 'zenaton'

class WelcomeFlow < Zenaton::Interfaces::Workflow
  include Zenaton::Traits::Zenatonable

  def initialize(user)
    @email = user.email
    @slack_id = user.slack_id
  end

  def handle
    ...
    @self.foo = TaskA.new(@email).execute
    ...
    @self.foo = "..."
  end
end
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable

class WelcomeFlow(Workflow, Zenatonable):

    def __init__(self, user):
        self.email = user.email
        self.slack_id = user.slack_id

    def handle(self):
        ...
        self.foo = TaskA(self.email).execute()
        ...
        self.foo = "..."

For example here, you have one property: foo. You can also create a new one later, inside the handle method or the onEvent method.

You can use and mutate it when you want.

It's just regular class attributes!

Notice that we expose them in the dashboard.

How do I access my task and workflow properties?

You can either get them from the SDK or view them in the Zenaton Dashboard.
You can view task and workflow properties for each instance. You can also see all their different values through steps.

See your properties in the dashboard

Errors

If an error occurred during a task execution, then this workflow instance will automatically be paused and you will have to resume it manually here. You can also retry them manually directly from the interface.

Error Retry Workflow Dashboard

If an error is returned from one of your tasks, you can handle it normally in your Workflow. Eg:

var a int
err := tasks.A.New().Execute().Output(&a)
if err != nil {
    ... //handle error
}

Note: If you have a custom error type, the information will be lost. Here we just return a standard go error where err.Error() matches the output of the err.Error() that was returned from the task.

For parallel tasks, you will receive a slice of errors. This slice will be nil if no error occurred. If there was an error in one of the parallel tasks, you will receive a slice of the same length as the input tasks, and the index of the task that produced an error will be the same index as the non-nil err in the slice of errors. Eg:

var a int
var b int

errs := task.Parallel{
    tasks.A.New(),
    tasks.B.New(),
}.Execute().Output(&a, &b)

if errs != nil {
    if errs[0] != nil {
        // tasks.A error
    }
    if errs[1] != nil {
        // tasks.B error
    }
}

If your task panics, then this workflow instance will automatically be paused and you will have to resume it manually here.