Getting Started

Introduction

Zenaton is a service that lets developers programmatically build, run and scale long-running workflows in a resilient way.

A workflow is something like this: do this, then that, and if the result is ok then this; if not, then wait 2 days and do that - you get the idea.

It’s very easy to describe workflows in plain words, but tricky to implement them in real life. Even for trivial workflows, lots of difficulties arise quickly:

  • How to handle a task failure?
  • How to pause / resume your workflows?
  • How to obtain analytics on your workflow executions?
  • How to scale and monitor your workers’ infrastructure?
  • How to modify your workflows while some instances are still running?

Zenaton was developed to handle those problems for you and make your life as a developer much, much easier, without the fuss of a state machine or a workflow engine. Implementing a workflow becomes very simple with Zenaton handling all the complicated stuff for you.

A good habit when implementing a workflow is to ensure a clear separation between the control flow and the tasks. As an example, let’s imagine a typical moderation workflow:

When a user submits a review:

  • Send moderator a reminder email
  • Wait for moderation, up to 2 days
  • In case of rejection, warn the user
  • Otherwise, or after 2 days without a decision, automatically validate the review

With Zenaton, this could easily be implemented like this:

use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;

/**
 * Moderation workflow for a submitted review: remind the moderator, wait up
 * to 2 days for a moderation event, then either email the user about a
 * rejection or validate the review.
 */
class ModerationWorkflow implements WorkflowInterface
{
    use Zenatonable;

    protected $review;
    protected $user;
    protected $moderator;

    /**
     * @param mixed $review    Review under moderation; its id identifies this workflow instance.
     * @param mixed $user      Passed to the rejection email task.
     * @param mixed $moderator Passed to the reminder email task.
     */
    public function __construct($review, $user, $moderator)
    {
        $this->review = $review;
        $this->user = $user;
        $this->moderator = $moderator;
    }

    /**
     * Control flow of the workflow; each step is delegated to a task.
     */
    public function handle()
    {
        (new SendReminderEmailToModerator($this->moderator, $this->review))->execute();

        // Wait for a ReviewModeratedEvent, for at most 2 days.
        $event = (new Wait(ReviewModeratedEvent::class))->days(2)->execute();

        // A falsy $event (presumably: the 2 days elapsed without a decision)
        // or a non-rejecting event both lead to validation.
        if ($event && $event->rejected) {
            (new SendRejectionEmailToUser($this->user, $this->review))->execute();
        } else {
            (new ValidateReview($this->review))->execute();
        }
    }

    /**
     * Identifier of this workflow instance: the id of the review.
     */
    public function getId()
    {
        return $this->review->id;
    }
}
const { Workflow, Wait } = require("zenaton");
const SendReminderEmail = require("./SendReminderEmail");
const SendRejectionEmail = require("./SendRejectionEmail");
const ValidateReview = require("./ValidateReview");

module.exports = Workflow("ModerationWorkflow", {

  init(review, user, moderator) {
    this.review = review;
    this.user = user;
    this.moderator = moderator;
  },

  async handle() {
    await new SendReminderEmail(this.moderator, this.review).execute();

    const event = await new Wait("ReviewModeratedEvent").days(2).execute();

    if (event && event.data.rejected) {
      await new SendRejectionEmail(this.user, this.review).execute();
    } else {
      await new ValidateReview(this.review).execute();
    }
  },

  id() {
      return this.review.id;
  }

});
require 'zenaton'

# Moderation workflow for a submitted review: remind the moderator, wait up to
# 2 days for a moderation event, then either email the user about a rejection
# or validate the review.
class ModerationWorkflow < Zenaton::Interfaces::Workflow
  include Zenaton::Traits::Zenatonable

  # Store the workflow inputs; the review's id identifies the instance.
  def initialize(review, user, moderator)
    @review = review
    @user = user
    @moderator = moderator
  end

  # Control flow of the workflow; each step is delegated to a task.
  def handle
    SendReminderEmailToModerator.new(@moderator, @review).execute

    # Wait for a ReviewModeratedEvent, for at most 2 days.
    event = Zenaton::Tasks::Wait.new(ReviewModeratedEvent).days(2).execute

    # A nil/false event (presumably: the 2 days elapsed without a decision)
    # or a non-rejecting event both lead to validation.
    if event && event.rejected?
      SendRejectionEmailToUser.new(@user, @review).execute
    else
      ValidateReview.new(@review).execute
    end
  end

  # Identifier of this workflow instance: the id of the review.
  def id
    @review.id
  end
end

from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait

class ModerationWorkflow(Workflow, Zenatonable):
    """Moderation workflow for a submitted review.

    Reminds the moderator, waits up to 2 days for a moderation event, then
    either emails the user about a rejection or validates the review.
    """

    def __init__(self, review, user, moderator):
        """Store the workflow inputs; the review's id identifies the instance."""
        self.review = review
        self.user = user
        self.moderator = moderator

    def handle(self):
        """Control flow of the workflow; each step is delegated to a task."""
        # NOTE(review): the arguments are passed as (review, moderator) here,
        # while the PHP/JS/Ruby versions of this example pass
        # (moderator, review) - confirm against the task's signature.
        SendReminderEmailToModerator(self.review, self.moderator).execute()
        # Wait for a ReviewModeratedEvent, for at most 2 days.
        event = Wait(ReviewModeratedEvent).days(2).execute()

        # A falsy event (presumably: the 2 days elapsed without a decision)
        # or a non-rejecting event both lead to validation.
        if event and event.is_rejected:
            SendRejectionEmailToUser(self.user, self.review).execute()
        else:
            ValidateReview(self.review).execute()

    def id(self):
        """Return the identifier of this workflow instance: the review's id."""
        return self.review.id
import (
    "github.com/zenaton/zenaton-go/v1/zenaton/workflow"
    "github.com/zenaton/zenaton-go/v1/zenaton/task"
)


// ModerationWorkflow is the workflow definition registered under the name
// "ModerationWorkflow" and backed by the Moderation struct.
var ModerationWorkflow = workflow.NewCustom("ModerationWorkflow", &Moderation{})

// Moderation holds the inputs of the moderation workflow as embedded fields.
type Moderation struct {
    Review
    User
    Moderator
}

// Init stores the workflow inputs on the receiver.
func (m *Moderation) Init(review Review, user User, moderator Moderator) {
    m.Review = review
    m.User = user
    m.Moderator = moderator
}

// Handle is the control flow of the workflow: remind the moderator, wait up
// to 2 days for a "ReviewModeratedEvent", then either email the user about a
// rejection or validate the review. It always returns (nil, nil).
func (m *Moderation) Handle() (interface{}, error){

    // NOTE(review): the arguments are passed as (Review, Moderator) here,
    // while the PHP/JS/Ruby versions of this example pass (moderator, review)
    // - confirm against the task's signature.
    SendReminderEmailToModerator.New(m.Review, m.Moderator).Execute()

    // event starts as the zero value, so Rejected is false unless Output
    // fills it in (presumably only when an event was actually received).
    var event ReviewModeratedEvent
    task.Wait().ForEvent("ReviewModeratedEvent").Days(2).Execute().Output(&event)

    if event.Rejected {
        SendRejectionEmailToUser.New(m.User, m.Review).Execute()
    } else {
        ValidateReview.New(m.Review).Execute()
    }

    return nil, nil
}

// ID returns the identifier of this workflow instance: the review's ID.
func (m *Moderation) ID() string {
    return m.Review.ID
}

After having configured your Zenaton client:

Zenaton\Client::init($app_id, $api_token, $app_env);
const { Client } = require("zenaton");

Client.init(app_id, api_token, app_env);
Zenaton::Client.init(app_id, api_token, app_env);
client = Client(app_id, api_token, app_env) #from zenaton.client import Client
zenaton.InitClient(appID, apiToken, appEnv)

You can start a workflow by dispatching it:

(new ModerationWorkflow($review, $user, $moderator))->dispatch();
await new ModerationWorkflow(review, user, moderator).dispatch();
ModerationWorkflow.new(review, user, moderator).dispatch
ModerationWorkflow(review, user, moderator).dispatch()
ModerationWorkflow.New(review, user, moderator).Dispatch()

The id method in ModerationWorkflow provides a way to identify workflow instances - eg. sending an event to the instance with id reviewId is done through:

ModerationWorkflow::whereId($reviewId)->send(new ReviewModeratedEvent($result));
await ModerationWorkflow.whereId(reviewId).send("ReviewModeratedEvent", { rejected: true });
ModerationWorkflow.where_id(review_id).send_event(ReviewModeratedEvent.new(result))
ModerationWorkflow().where_id(review_id).send_event(ReviewModeratedEvent(result))
ModerationWorkflow.WhereID(reviewID).Send("ReviewModeratedEvent", ReviewModeratedEvent{Rejected: true})

From then on, Zenaton agents (installed on your servers) will trigger tasks (here SendReminderEmailToModerator, SendRejectionEmailToUser and ValidateReview) according to your ModerationWorkflow implementation and send the results to the Zenaton engine, which maintains state, ensures consistency, and triggers any events and time-based tasks.

This is very simple code, but you already have a lot of benefits from Zenaton:

  • You do not have to manage state anymore, Zenaton does it for you.
  • If a task fails, you will be warned by Zenaton and be able to resume the workflow from where it failed after you fixed your code;
  • It’s scalable - without any changes, your tasks can be executed in parallel on as many servers as you want;
  • You can have long-running workflows without being afraid of having a thread killed;
  • Last but not least, the code is highly readable. Even developers who have just joined your team will easily understand what it does.

Installation

For convenience, we provide a zenaton command line interface. To install it globally:

curl https://install.zenaton.com | sh
Depending on your configuration, you may be asked for a password to allow global installation of Zenaton.

A Zenaton agent is a binary client installed on your server. Once launched, it will listen to the Zenaton engine and trigger task executions as needed by your workflows.


Examples

Now is a good time to play with Zenaton!

Download our examples repo wherever you want (on the same machine where you already launched a Zenaton agent)

git clone https://github.com/zenaton/examples-php.git
git clone https://github.com/zenaton/examples-node.git
git clone https://github.com/zenaton/examples-ruby.git
git clone https://github.com/zenaton/examples-python.git

Download our examples repo (on the same machine where you already launched a Zenaton agent)

go get github.com/zenaton/examples-go

Then, install all dependencies

composer install
npm install
bundle install
pip3 install -r requirements.txt

And the client library

go get github.com/zenaton/zenaton-go/...

cd into the examples folder

cd $(go env GOPATH)/src/github.com/zenaton/examples-go

Then, add a .env file

cp .env.example .env

And update the .env file with your application id and your api token found here. Now that your .env file is complete, you can launch Zenaton agent with these credentials:

zenaton listen --env=.env --boot=src/bootstrap.php
zenaton listen --env=.env --boot=boot.js
zenaton listen --env=.env --boot=boot.rb
zenaton listen --env=.env --boot=boot.py
zenaton listen --env=.env --boot=boot/boot.go

Note: .env is the default value for the --env option, so you can omit it here.

A Zenaton agent is natively able to handle concurrent tasks. There is no need to launch it more than once on the same server. Your credentials will be used to listen to any task sent by your application to this agent, within the provided environment.

An environment is a way to isolate workflow instances between production and development. A workflow is always launched within a given environment - and its tasks are handled only by agents listening to this environment.

Now you can look at all provided examples - a launcher is provided for each of them, eg. launch_wait.js

<?php
// Load the autoloader (presumably this also sets up the configured Zenaton
// client - TODO confirm)
require __DIR__.'/autoload.php';

// Start a new instance of the workflow
(new WaitWorkflow())->dispatch();
// Load the module that configures the Zenaton client; it is required only for
// its side effects, so the return value is not kept
require("./client");

// Start a new instance of the workflow
const WaitWorkflow = require("./Workflows/WaitWorkflow");
new WaitWorkflow().dispatch();
# Load the file that configures the Zenaton client; it is required only for
# its side effects
require './client'

# Start a new instance of the workflow
require './workflows/wait_workflow'
WaitWorkflow.new.dispatch
# Load the module that configures the Zenaton client. It is imported only for
# its side effects. An absolute import is required here: `import .client` is
# not valid Python, and relative imports do not work in a file that is run
# directly as a script (python3 launch_wait.py).
import client

# Start a new instance of the workflow
from workflows.wait_workflow import WaitWorkflow
WaitWorkflow().dispatch()
package main

import (
    _ "github.com/zenaton/excamples-go" // Initializes Zenaton client with credentials
    "github.com/zenaton/excamples-go/workflows"
)

// Start a new instance of the workflow
workflows.WaitWorkflow().Dispatch()

such that

php bin/launch_wait.php
node launch_wait.js
ruby launch_wait.rb
python3 launch_wait.py
go run wait/main.go

will execute this WaitWorkflow workflow:

<?php

use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;

/**
 * Example workflow: run TaskA, wait 5 seconds, then run TaskB.
 */
class WaitWorkflow implements WorkflowInterface
{
    use Zenatonable;

    /**
     * Control flow of the workflow; the steps run one after the other.
     */
    public function handle()
    {
        (new TaskA())->execute();
        // Pause for 5 seconds between the two tasks.
        (new Wait())->seconds(5)->execute();
        (new TaskB())->execute();
    }
}
const { Workflow, Wait } = require("zenaton");
const TaskA = require("../Tasks/TaskA");
const TaskB = require("../Tasks/TaskB");

// Example workflow: run TaskA, wait 5 seconds, then run TaskB.
// Each step is awaited, so the steps run one after the other.
module.exports = Workflow("WaitWorkflow", async function() {
    await new TaskA().execute();

    // Pause for 5 seconds between the two tasks.
    await new Wait().seconds(5).execute();

    await new TaskB().execute();
});
require './tasks/task_a'
require './tasks/task_b'

# Example workflow: run TaskA, wait 5 seconds, then run TaskB.
class WaitWorkflow < Zenaton::Interfaces::Workflow
    include Zenaton::Traits::Zenatonable

    # Control flow of the workflow; the steps run one after the other.
    def handle
        TaskA.new.execute
        # Pause for 5 seconds between the two tasks.
        Zenaton::Tasks::Wait.new.seconds(5).execute
        TaskB.new.execute
    end
end

from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait

from tasks.task_a import TaskA
from tasks.task_b import TaskB


class WaitWorkflow(Workflow, Zenatonable):
    """Example workflow: run TaskA, wait 5 seconds, then run TaskB."""

    def handle(self):
        """Control flow of the workflow; the steps run one after the other."""
        TaskA().execute()
        # Pause for 5 seconds between the two tasks.
        Wait().seconds(5).execute()
        TaskB().execute()
package workflows

import (
    "github.com/zenaton/examples-go/tasks"
    "github.com/zenaton/zenaton-go/v1/zenaton/task"
    "github.com/zenaton/zenaton-go/v1/zenaton/workflow"
)

// WaitWorkflow is an example workflow registered under the name
// "WaitWorkflow": it runs task A, waits 5 seconds, then runs task B.
// The handler always returns (nil, nil).
var WaitWorkflow = workflow.New("WaitWorkflow",
    func() (interface{}, error) {

        tasks.A.New().Execute()

        // Pause for 5 seconds between the two tasks.
        task.Wait().Seconds(5).Execute()

        tasks.B.New().Execute()
        return nil, nil
    })

These examples include:

  • launch_sequential illustrates a sequential execution of tasks and how you can use a task result in a following task;
  • launch_parallel illustrates some parallel executions of tasks and how the workflow waits for all results before going further;
  • launch_asynchronous illustrates some asynchronous "fire and forget" executions;
  • launch_event illustrates how you can retrieve a workflow instance, send an event to it, and handle this event to modify workflow execution;
  • launch_wait illustrates how you can use the provided Wait class to manage time in your workflow;
  • launch_wait_event illustrates how you can use the provided WaitEvent class to wait for an external event before going further (with a timeout).

Feel free to play with these examples, eg.

  • launch multiple workflows in parallel,
  • run and modify each workflow,
  • run multiple Zenaton agents on different machines (installed and configured as described above).
Last but not least, you can add an intentional error in these examples to see how the workflow stops and how you can see the error here. Fix your code and click 'retry' to resume the workflow execution without any additional effort!

Task Output

Everything written to stdout during your task executions will be in the zenaton.out file, which you'll find beside your sources. E.g., after running launch_wait.js you should see:

Task A starts
Task A ends
Task B starts
Task B ends

Task Error

Everything written to stderr during your task executions will be in the zenaton.err file, which you'll find beside your sources. If something seems to go wrong, this is the first place to look.