Real World Examples

Scrape & Analyze Restaurant Reviews

This workflow introduces the Parallel and execute Methods.

This simple ETL Workflow scrapes all of the Tripadvisor reviews for a specific restaurant for the current month and then runs a sentiment analysis on each review to determine the positive or negative aspects. Then, the results are saved to a Google sheet where a report is generated.

  • First, it gets the current month.
  • Then we pull all of the Tripadvisor reviews for that restaurant for that month.
  • Then a sentiment analysis script is launched for each review (in parallel) to determine if it is positive or negative.
  • Finally, the data is saved to a google sheet.

Flowchart of the workflow

This flowchart helps us to visualize the tasks and logic of the workflow.

review analysis workflow

Workflow Code

This workflow is the code that orchestrates tasks (through the Zenaton workflow engine) and executes them on your servers. Tasks inside the workflow are not detailed here.

review_analysis_workflow.rb

<?php

use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Parallel\Parallel;
use Zenaton\Traits\Zenatonable;

/**
 * Fetches the current month, scrapes that month's reviews from the stored
 * Tripadvisor URL, runs one SentimentAnalysis task per review through a
 * Parallel, and hands the results to SaveToGoogleSheet.
 */
class ReviewAnalysisWorkflow implements WorkflowInterface
{
    use Zenatonable;

    /**
     * @param mixed $restaurantName Forwarded to SaveToGoogleSheet.
     * @param mixed $tripAdvisorUrl Forwarded to ScrapeReviewsOfTheMonth.
     */
    public function __construct($restaurantName, $tripAdvisorUrl)
    {
        $this->restaurantName = $restaurantName;
        $this->tripAdvisorUrl = $tripAdvisorUrl;
    }

    /**
     * Runs the four steps in order; each step waits for the previous one.
     */
    public function handle()
    {
        $month = (new GetCurrentMonth())->execute();

        $scraper = new ScrapeReviewsOfTheMonth($this->tripAdvisorUrl, $month);
        $reviews = $scraper->execute();

        // One SentimentAnalysis task per review, all given to a single Parallel.
        $analyses = array_map(
            function ($review) {
                return new SentimentAnalysis($review);
            },
            $reviews
        );
        $aspects = (new Parallel(...$analyses))->execute();

        $report = new SaveToGoogleSheet($this->restaurantName, $month, count($reviews), $aspects);
        $report->execute();
    }
}
const { Workflow, Parallel } = require("zenaton");
const GetCurrentMonth = require("../Tasks/GetCurrentMonth");
const ScrapeReviewsOfTheMonth = require("../Tasks/ScrapeReviewsOfTheMonth");
const SentimentAnalysis = require("../Tasks/SentimentAnalysis");
const SaveToGoogleSheet = require("../Tasks/SaveToGoogleSheet");

module.exports = Workflow("ReviewAnalysisWorkflow", {

    init(restaurantName, tripAdvisorUrl) {
        this.restaurantName = restaurantName;
        this.tripAdvisorUrl = tripAdvisorUrl;
    },
    async handle() {
        const currentMonth = await new GetCurrentMonth().execute();
        const reviews = await new ScrapeReviewsOfTheMonth(
            this.tripAdvisorUrl,
            currentMonth
        ).execute();

        const postiveAndNegativeAspects = new Parallel(
            ...reviews.map(review => new SentimentAnalysis(review))
        );

        await new SaveToGoogleSheet(
            this.restaurantName,
            currentMonth,
            reviews.length,
            postiveAndNegativeAspects
        ).execute();
    }
});
const { workflow } = require("zenaton");

module.exports = workflow("ReviewAnalysisWorkflow", {
    *handle(restaurantName, tripAdvisorUrl) {
        const google_sheets = this.connector('google_sheets', 'your-connector-id');

        const currentMonth = yield this.run.task("GetCurrentMonth");

        const reviews = yield this.run.task("ScrapeReviewsOfTheMonth",
            tripAdvisorUrl,
            currentMonth
        );

        tasks = reviews.map((review) => ["SentimentAnalysis", review]);
        const postiveAndNegativeAspects = yield this.run.task(tasks) // run in parallel

        const cells = this.format_cells(restaurantName, currentMonth, postiveAndNegativeAspects);
        yield google_sheets.post(`v4/spreadsheets/${spreadsheetId}:batchUpdate`, cells)
    },
    format_cells(restaurantName, currentMonth, postiveAndNegativeAspects) {
        // ...
    }
});
require "./tasks/get_current_month"
require "./tasks/scrape_reviews_of_the_month"
require "./tasks/sentiment_analysis"
require "./tasks/save_to_google_sheet"

# Fetches the current month, scrapes that month's reviews, runs one
# SentimentAnalysis task per review through Zenaton::Parallel and passes the
# results to SaveToGoogleSheet.
class ReviewAnalysisWorkflow < Zenaton::Interfaces::Workflow
  include Zenaton::Traits::Zenatonable

  # restaurant_name is forwarded to SaveToGoogleSheet,
  # trip_advisor_url to ScrapeReviewsOfTheMonth.
  def initialize(restaurant_name, trip_advisor_url)
    @restaurant_name = restaurant_name
    @trip_advisor_url = trip_advisor_url
  end

  def handle
    month = GetCurrentMonth.new.execute
    reviews = ScrapeReviewsOfTheMonth.new(@trip_advisor_url, month).execute

    # One analysis task per review, executed together.
    analyses = reviews.map { |review| SentimentAnalysis.new(review) }
    aspects = Zenaton::Parallel.new(*analyses).execute

    SaveToGoogleSheet.new(@restaurant_name, month, reviews.length, aspects).execute
  end
end
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.parallel import Parallel
from tasks.get_current_month import GetCurrentMonth
from tasks.scrape_reviews_of_the_month import ScrapeReviewsOfTheMonth
from tasks.sentiment_analysis import SentimentAnalysis
from tasks.save_to_google_sheet import SaveToGoogleSheet

class ReviewAnalysisWorkflow(Workflow, Zenatonable):
    """Scrape the current month's reviews, analyse them in parallel and save the results."""

    def __init__(self, restaurantName, tripAdvisorUrl):
        # restaurantName is forwarded to SaveToGoogleSheet,
        # tripAdvisorUrl to ScrapeReviewsOfTheMonth.
        self.restaurantName = restaurantName
        self.tripAdvisorUrl = tripAdvisorUrl

    def handle(self):
        """Run the steps in order; each step waits for the previous one."""
        month = GetCurrentMonth().execute()
        reviews = ScrapeReviewsOfTheMonth(self.tripAdvisorUrl, month).execute()

        # One SentimentAnalysis task per review, executed through a single Parallel.
        analyses = [SentimentAnalysis(review) for review in reviews]
        aspects = Parallel(*analyses).execute()

        report = SaveToGoogleSheet(self.restaurantName, month, len(reviews), aspects)
        report.execute()

Schedule Scrape & Analyze Workflow

We can easily make this workflow recurrent by using the schedule method. For instance, the one-line code below schedules the workflow every month.

// Cron expression '0 0 1 * *': midnight on the 1st day of every month.
(new RecurrentWorkflow())->schedule('0 0 1 * *');
// Cron expression '0 0 1 * *': midnight on the 1st day of every month.
new RecurrentWorkflow().schedule('0 0 1 * *');
const { Client } = require("zenaton");
// app_id, api_token and app_env are not defined in this snippet — presumably
// your Zenaton credentials; supply them before running.
const client = new Client(app_id, api_token, app_env);

// Cron expression "0 0 1 * *": midnight on the 1st day of every month.
client.schedule("0 0 1 * *").workflow("RecurrentWorkflow");
# Cron expression '0 0 1 * *': midnight on the 1st day of every month.
RecurrentWorkflow.new.schedule('0 0 1 * *')
# Cron expression '0 0 1 * *': midnight on the 1st day of every month.
RecurrentWorkflow().schedule('0 0 1 * *')

All scheduled workflows are listed on the Zenaton dashboard:

review analysis animation

For more information about scheduling, please refer to the documentation.

Real-Time Monitoring on the Zenaton Dashboard

View a short snippet of the task executions from the dashboard.

review analysis animation

Document Validation

This workflow introduces the Wait, OnEvent, execute and dispatch Methods.

This Workflow illustrates how to implement a document validation process using a third party API/service to verify a user's identity based on the documents that they upload.

  • A user signs up and opens an account and is prompted to upload their documents.
  • According to the account type, some additional documents may be needed. In this example, the user must upload 2 documents : an ID and a proof of address.
  • After 3 days, if the user has not uploaded the documents, a reminder is sent to tell them that they must import the remaining documents.
  • We’ll send a maximum of 3 reminders spaced 3 days apart.
  • If the user does not import their documents, the account will not be opened, and we email the user to notify them that they need to sign in again and create the account.
  • When a user does import the documents, we would check with our third party service to make sure that they are valid.
  • If the documents are not valid, we would notify the user by email and tell them the reason.
  • When the documents have been validated, we will notify the user that the process is complete.

Flowchart of the workflow

This flowchart is a visual representation of the different tasks in the workflow.

Workflow Code

This workflow is the code that orchestrates the tasks through the Zenaton workflow engine; the tasks are then executed on your servers. Tasks will be dispatched as soon as a user is prompted to upload their documents and will be executed "behind the scenes" on your workers.

document_validation.rb

<?php
  
use Zenaton\Interfaces\EventInterface;
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;

/**
 * Waits for proof documents, sending up to three reminders spaced three days
 * apart, then dispatches a success or failure email.
 *
 * NOTE(review): nothing visible in this class removes entries from
 * $this->documents, and the file imports EventInterface without using it —
 * an onEvent() handler that updates the list when a proof is received
 * appears to be missing. Confirm.
 */
class DocumentValidationWorkflow implements WorkflowInterface
{
    use Zenatonable;

    protected $requestId;
    protected $remainingReminders = 3;
    protected $documents = [];

    public function __construct($requestId)
    {
        $this->requestId = $requestId;
    }

    public function handle()
    {
        $request = (new GetRequestInformation($this->requestId))->execute();
        $this->documents = $request->documents;

        while (count($this->documents) > 0 && $this->remainingReminders > 0) {
            // Falsy result: the 3-day wait ended without a ProofReceivedEvent.
            $proofReceived = (new Wait(ProofReceivedEvent::class))->days(3)->execute();
            if ($proofReceived) {
                continue;
            }

            (new SendReminderEmail($request))->dispatch();
            $this->remainingReminders--;
        }

        if (count($this->documents) > 0) {
            (new SendFailureValidationEmail($request))->dispatch();
        } else {
            (new SendSuccessValidationEmail($request))->dispatch();
        }
    }

    /**
     * @return mixed The request id given to the constructor.
     */
    public function getId()
    {
        return $this->requestId;
    }
}
require "./tasks/get_request_information"
require "./tasks/send_reminder_email"
require "./tasks/send_success_validation_email"
require "./tasks/send_failure_validation_email"
require "./events/proof_received_event"

# Waits for proof documents, sending up to three reminders spaced three days
# apart, then dispatches a success or failure email.
#
# NOTE(review): nothing visible in this class removes entries from @documents;
# an on_event handler that updates the list appears to be missing. Confirm.
class DocumentValidationWorkflow < Zenaton::Interfaces::Workflow
  include Zenaton::Traits::Zenatonable

  def initialize(requestId)
    @request_id = requestId
    @documents = []
    @remaining_reminders = 3
  end

  def handle
    # Instantiate the task with the request id, then execute it.
    request = GetRequestInformation.new(@request_id).execute
    @documents = request.documents

    # Guarded loop: do not wait at all when no document is pending.
    while !@documents.empty? && @remaining_reminders > 0
      event = Zenaton::Tasks::Wait.new(ProofReceivedEvent).days(3).execute
      next if event

      # The 3-day wait timed out: remind the user in the background.
      SendReminderEmail.new(request).dispatch
      @remaining_reminders -= 1
    end

    if @documents.empty?
      SendSuccessValidationEmail.new.dispatch
    else
      SendFailureValidationEmail.new.dispatch
    end
  end

  # Returns the request id given to the constructor.
  def id
    @request_id
  end
end
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait
from tasks.get_request_information import GetRequestInformation
from tasks.send_reminder_email import SendReminderEmail
from tasks.send_success_validation_email import SendSuccessValidationEmail
from tasks.send_failure_validation_email import SendFailureValidationEmail
from events.proof_received_event import ProofReceivedEvent


class DocumentValidationWorkflow(Workflow, Zenatonable):
    """Wait for proof documents, sending up to three reminders, then email the outcome.

    NOTE(review): nothing visible in this class removes entries from
    self.documents; an on_event handler that updates the list appears to be
    missing. Confirm.
    """

    def __init__(self, requestId):
        self.requestId = requestId
        self.remainingReminders = 3
        self.documents = []

    def handle(self):
        """Loop on 3-day waits until no document is pending or reminders run out."""
        request = GetRequestInformation(self.requestId).execute()
        self.documents = request.documents

        while len(self.documents) > 0 and self.remainingReminders > 0:
            event = Wait(ProofReceivedEvent).days(3).execute()
            if not event:
                # The 3-day wait timed out: remind the user in the background.
                SendReminderEmail(request).dispatch()
                self.remainingReminders -= 1

        if len(self.documents) == 0:
            SendSuccessValidationEmail().dispatch()
        else:
            SendFailureValidationEmail().dispatch()

    def id(self):
        """Return the request id given to the constructor."""
        return self.requestId
const { Wait, Workflow } = require('zenaton')
const GetRequestInformation = require('../Tasks/GetRequestInformation')
const SendReminderEmail = require('../Tasks/SendReminderEmail')
const SendSuccessValidationEmail = require('../Tasks/SendSuccessValidationEmail')
const SendFailureValidationEmail = require('../Tasks/SendFailureValidationEmail')

module.exports = Workflow('DocumentValidationWorkflow', {
  init (requestId) {
    this.requestId = requestId
    this.counter = null
  },

  async handle () {
    let remainingReminders = 3
    const request = await new GetRequestInformation(this.requestId).execute()

    this.counter = request.documents.length

    while (this.counter > 0 && remainingReminders > 0) {
      const event = await new Wait('ProofReceivedEvent').days(3).execute()

      if (!event) {
        await new SendReminderEmail(request).dispatch()
        remainingReminders--
      }
    } 

    if (this.counter === 0) {
      await new SendSuccessValidationEmail().dispatch()
    } else {
      await new SendFailureValidationEmail().dispatch()
    }
  },

  id () {
    return this.requestId
  },

  onEvent (name, data) {
    if (name === 'ProofReceivedEvent') {
      if (data.isValid) {
        this.counter--
      }
    }
  }
})
const { workflow } = require('zenaton')

module.exports = workflow('DocumentValidationWorkflow', {
  *handle(requestId) {
    const sendgrid = this.connector('sendgrid', 'your-connector-id');

    this.requestId = requestId
    this.counter = null

    let remainingReminders = 3
    const request = yield this.run.task("GetRequestInformation", this.requestId);
    this.counter = request.documents.length

    while (this.counter > 0 && remainingReminders > 0) {
      const event = yield this.wait.event("ProofReceivedEvent").for(duration.days(3));

      if (!event) {
        mail_template_params = {
          body: {
            "template_id": "reminder"
            "personalizations": [
              {
                "subject": `Still ${this.counter} documents to upload`,
                "to": [ { "email": request.user.email } ],
                "dynamic_template_data": {
                  "requestId": requestId,
                  remainingReminders": remainingReminders,
                }
              }
            ]
          }
        }
        sendgrid.post('/mail/send', mail_template_params)

        remainingReminders--
      }
    } 

    if (this.counter === 0) {
      sendgrid.post('/mail/send', {body: {"template_id": "valiation_success", ...}})
    } else {
      sendgrid.post('/mail/send', {body: {"template_id": "validation_failed", ...}})
    }
  },

  *onEvent(name, ...data) {
    if (name === 'ProofReceivedEvent') {
      if (data.isValid) {
        this.counter--
      }
    }
  }
})

Real-time executions of the workflow from the Zenaton Dashboard

View the real-time tasks executions of this workflow.

Automated Onboarding for New Employees

This workflow introduces the Wait, OnEvent, execute and dispatch Methods.

This Workflow allows a company to automate its employee onboarding process by scheduling meetings, creating accounts for software tools, and sending documents to read. The employees also receive personalized onboarding content via an onboarding form.

  • Prepare paperwork of the employee.
  • If the paperwork has been received, then schedule a pre-onboarding meeting.
  • A form must be completed by the employee.
  • When it has been completed, required accounts are created.
  • Wait until the first day and then send a welcome email to the employee.
  • Based on the entries in the employee's form, provide the relevant credentials and required readings.
  • Finally, schedule another onboarding meeting.
  • If the paperwork was never received, cancel the onboarding.

Flowchart of the workflow

This flowchart shows a visual representation of the different tasks in the workflow.

Flowchart of employee onboarding

Workflow Code

This workflow is the code that orchestrates tasks (through the Zenaton workflow engine) and executes them on your servers.

employee_onboarding.rb

<?php
        
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;

/**
 * Employee onboarding: dispatches the paperwork task, waits for the paperwork
 * and onboarding-form events, creates accounts, waits until the onboarding
 * day from the form, then dispatches the welcome tasks. When the paperwork
 * event is not successful, the onboarding is cancelled instead.
 */
class OnboardingWorkflow implements WorkflowInterface
{
    use Zenatonable;

    public function __construct($employee)
    {
        $this->employee = $employee;
    }

    public function handle()
    {
        $employee = $this->employee;

        (new PreparePaperwork($employee))->dispatch();

        $paperworkEvent = (new Wait(PaperworkEvent::class))->execute();

        if (!$paperworkEvent->success) {
            (new CancelOnboarding($employee))->dispatch();

            return;
        }

        (new SchedulePreOnboardingMeeting($employee))->dispatch();

        $form = (new Wait(OnboardingFormEvent::class))->execute();

        // Executed (not dispatched) because the accounts are needed later.
        $accounts = (new CreateAccounts($employee, $form->tools))->execute();
        (new OfficeManagement($employee, $form))->dispatch();

        (new Wait())->timestamp($form->onboardingDay)->execute();

        (new SendWelcome($employee))->dispatch();
        (new SendCredentials($employee, $accounts))->dispatch();
        (new SendRequiredReading($employee, $form->reading))->dispatch();
        (new ScheduleOnboardingMeetings($employee, $form->meetings))->dispatch();
    }

    /**
     * @return mixed The id property of the employee given to the constructor.
     */
    public function getId()
    {
        return $this->employee->id;
    }
}
    
const { Workflow } = require("zenaton");
const PreparePaperwork = require("./Tasks/PreparePaperwork");
const SchedulePreOnboardingMeeting = require("./Tasks/SchedulePreOnboardingMeeting");
const CreateAccounts = require("./Tasks/CreateAccounts");
const OfficeManagement = require("./Tasks/OfficeManagement");
const SendWelcome = require("./Tasks/SendWelcome");
const SendCredentials = require("./Tasks/SendCredentials");
const SendRequiredReading = require("./Tasks/SendRequiredReading");
const ScheduleOnboardingMeetings = require("./Tasks/ScheduleOnboardingMeetings");
const CancelOnboarding = require("./Tasks/CancelOnboarding");

module.exports = Workflow("OnboardingWorkflow", {
  init(employee) {
    this.employee = employee;
  },

  async handle() {
    await new PreparePaperwork(this.employee).dispatch();

    const paperworkEvent = await new Wait("PaperworkEvent").execute();

    if (paperworkEvent.success) {
      await new SchedulePreOnboardingMeeting(this.employee).dispatch();

      const form = await new Wait("OnboardingFormEvent").execute();

      this.accounts = await new CreateAccounts(employee, form.tools).execute();
      await new OfficeManagement(employee, form).dispatch();

      await new Wait().timestamp(form.onboardingDay).execute();

      await new SendWelcome(employee).dispatch();
      await new SendCredentials(employee, this.accounts).dispatch();
      await new SendRequiredReading(employee, form.reading).dispatch();
      await new ScheduleOnboardingMeetings(employee, form.meetings).dispatch();
    } else {
      await new CancelOnboarding(this.employee).dispatch();
    }
  }
});      
    
const { workflow, duration } = require("zenaton");

module.exports = workflow("OnboardingWorkflow", {
  *handle(employee) {
    const slack = this.connector('slack', 'your-slack-connector-id');
    const sendgrid = this.connector('sendgrid', 'your-sendgrid-connector-id');

    this.run.task("PreparePaperwork", employee)

    const paperworkEvent = yield this.wait.event("PaperworkEvent");

    if (paperworkEvent.success) {
      this.run.task("SchedulePreOnboardingMeeting", employee)

      const form = yield this.wait.event("OnboardingFormEvent");

      accounts = yield this.run.task("CreateAccounts", employee, form.tools)

      this.run.task("OfficeManagement", employee, form)

      yield this.wait.for(duration.timestamp(form.onboardingDay));
      
      sendgrid.post('/mail/send', {body: {"template_id": "welcome", ...}})
      sendgrid.post('/mail/send', {body: {"template_id": "credentials", ...}})
      slack.post('/api/chat.postMessage', {body: {"channel": "...", ...}})
      sendgrid.post('/mail/send', {body: {"template_id": "onboarding_meetings", ...}})
    } else {
      this.run.task('CancelOnboarding', employee);
    }
  }
});
require "./Tasks/PreparePaperwork"
require "./Tasks/SchedulePreOnboardingMeeting"
require "./Tasks/CreateAccounts"
require "./Tasks/OfficeManagement"
require "./Tasks/SendWelcome"
require "./Tasks/SendCredentials"
require "./Tasks/SendRequiredReading"
require "./Tasks/ScheduleOnboardingMeetings"
require "./Tasks/CancelOnboarding"

# Employee onboarding: dispatches the paperwork task, waits for the paperwork
# and onboarding-form events, creates accounts, waits until the onboarding day
# from the form, then dispatches the welcome tasks. When the paperwork event is
# not successful, the onboarding is cancelled instead.
class OnboardingWorkflow < Zenaton::Interfaces::Workflow
  include Zenaton::Traits::Zenatonable

  def initialize(employee)
    @employee = employee
  end

  def handle
    # The employee is only available as an instance variable.
    PreparePaperwork.new(@employee).dispatch

    paperwork_event = Zenaton::Tasks::Wait.new(PaperworkEvent).execute

    if paperwork_event.success
      SchedulePreOnboardingMeeting.new(@employee).dispatch

      form = Zenaton::Tasks::Wait.new(OnboardingFormEvent).execute

      # Executed (not dispatched) because the accounts are needed by
      # SendCredentials below.
      accounts = CreateAccounts.new(@employee, form.tools).execute
      OfficeManagement.new(@employee, form).dispatch

      Zenaton::Tasks::Wait.new.timestamp(form.onboardingDay).execute

      SendWelcome.new(@employee).dispatch
      SendCredentials.new(@employee, accounts).dispatch
      SendRequiredReading.new(@employee, form.reading).dispatch
      ScheduleOnboardingMeetings.new(@employee, form.meetings).dispatch
    else
      CancelOnboarding.new(@employee).dispatch
    end
  end
end
    
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait
from tasks.prepare_paper_work import PreparePaperwork
from tasks.schedule_pre_onboarding_meeting import SchedulePreOnboardingMeeting
from tasks.create_accounts import CreateAccounts
from tasks.office_management import OfficeManagement
from tasks.send_welcome import SendWelcome
from tasks.send_credentials import SendCredentials
from tasks.send_required_reading import SendRequiredReading
from tasks.schedule_onboarding_meetings import ScheduleOnboardingMeetings
from tasks.cancel_onboarding import CancelOnboarding
# NOTE(review): event module paths are assumed from the naming used for the
# task modules above — confirm against your project layout.
from events.paperwork_event import PaperworkEvent
from events.onboarding_form_event import OnboardingFormEvent


class OnboardingWorkflow(Workflow, Zenatonable):
    """Onboard an employee: paperwork, form, accounts, then welcome tasks.

    When the paperwork event is not successful, the onboarding is cancelled.
    """

    def __init__(self, employee):
        self.employee = employee

    def handle(self):
        employee = self.employee

        PreparePaperwork(employee).dispatch()

        paperwork_event = Wait(PaperworkEvent).execute()

        if paperwork_event.success:
            SchedulePreOnboardingMeeting(employee).dispatch()

            form = Wait(OnboardingFormEvent).execute()

            # Executed (not dispatched) because the accounts are needed by
            # SendCredentials below.
            accounts = CreateAccounts(employee, form.tools).execute()
            OfficeManagement(employee, form).dispatch()

            Wait().timestamp(form.onboardingDay).execute()

            SendWelcome(employee).dispatch()
            SendCredentials(employee, accounts).dispatch()
            SendRequiredReading(employee, form.reading).dispatch()
            ScheduleOnboardingMeetings(employee, form.meetings).dispatch()
        else:
            CancelOnboarding(employee).dispatch()
    

Real-time executions of the workflow from Zenaton Dashboard

View the real time execution of this workflow from the Zenaton dashboard.

Monitoring Dashboard of onboarding workflow

Classical Workflows on an E-commerce Website

These workflows introduce the Wait, OnEvent, execute and dispatch Methods.

This is a series of workflows that illustrates processes of a basic e-commerce website. Some workflows will trigger other workflows when events happen. Since most of these interactions happen asynchronously, it is common for an ecommerce website to have one or many queueing systems. Depending on the scale, it can quickly become difficult to maintain. By contrast, a single developer could write and deploy each of these workflows within a day and would have very little maintenance. Zenaton offers a way to focus on writing workflows/events without having to maintain the infrastructure and orchestration.
In this series, we will introduce 5 different workflows:

  • Cart Workflow
  • Check Out Workflow
  • Abandoned Cart Workflow
  • Shipping Workflow
  • Refund Workflow

Flowchart of the workflows

This flowchart helps us visualize the series of workflows and events during a user's journey. Workflows are shown as green squares and events as gray circles.

ecommerce-workflows

Cart Workflow

The Cart Workflow is the initial workflow that all e-commerce websites require. It starts when a user adds something to their cart on the website.

  • An item is added to the cart
  • If the cart has been completed in one day, then proceed to check out
  • If not, trigger the Abandoned Cart Workflow
  • In case of any change in the cart, update cart information

Flowchart of the Cart Workflow

The Cart Workflow is here represented in a flowchart and the last event triggers two possible workflows.

cart-workflow

Cart Workflow Code

This workflow is the code that orchestrates the tasks through the Zenaton workflow engine; the tasks are then executed on your servers. The workflow is triggered as soon as an item is added to the cart. It also shows how a workflow can trigger a sub-workflow.

cart_workflow.rb

<?php
              
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;
use Zenaton\Interfaces\EventInterface;

/**
 * Waits up to one day for a CartCompleted event, then executes either
 * ProceedToCheckout or AbandonedCart with the stored cart.
 */
class CartWorkflow implements WorkflowInterface
{
    use Zenatonable;

    public function __construct($cart)
    {
        $this->cart = $cart;
    }

    public function handle()
    {
        // Falsy result: the one-day wait ended without a CartCompleted event.
        $cartCompleted = (new Wait(CartCompleted::class))->days(1)->execute();

        $nextStep = $cartCompleted
            ? new ProceedToCheckout($this->cart)
            : new AbandonedCart($this->cart);
        $nextStep->execute();
    }

    /**
     * NOTE(review): the cart is refreshed when CartCompleted arrives; confirm
     * whether a cart-updated event was intended here instead.
     */
    public function onEvent(EventInterface $event)
    {
        if (!($event instanceof CartCompleted)) {
            return;
        }

        $this->cart = (new UpdateCart($this->cart))->execute();
    }
}
            
module MyStore
  module Workflows
    # Waits up to one day for a CartCompleted event, then executes either
    # ProceedToCheckout or AbandonedCart with the stored cart. CartUpdated
    # events replace the stored cart with the result of UpdateCart.
    class Cart < Zenaton::Interfaces::Workflow
      include Zenaton::Traits::Zenatonable

      def initialize(cart = nil)
        @cart = cart
      end

      def handle
        wait = Zenaton::Tasks::Wait.new(MyStore::Events::CartCompleted).days(1)

        # A falsy result means the wait ended without the event.
        next_step = wait.execute ? MyStore::Tasks::ProceedToCheckout : MyStore::Tasks::AbandonedCart
        next_step.new(@cart).execute
      end

      def on_event(event)
        return unless MyStore::Events::CartUpdated === event

        @cart = MyStore::Tasks::UpdateCart.new(@cart, event).execute
      end
    end
  end
end
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait
from tasks.proceed_to_checkout import ProceedToCheckout
from tasks.abandoned_cart import AbandonedCart
from tasks.update_cart import UpdateCart
from events.cart_completed import CartCompleted

class CartWorkflow(Workflow, Zenatonable):
    """Wait up to one day for CartCompleted, then check out or abandon the cart."""

    def __init__(self, cart):
        self.cart = cart

    def handle(self):
        # A falsy result means the one-day wait ended without the event.
        if Wait(CartCompleted).days(1).execute():
            ProceedToCheckout(self.cart).execute()
        else:
            AbandonedCart(self.cart).execute()

    def on_event(self, event):
        # NOTE(review): the cart is refreshed when CartCompleted arrives;
        # confirm whether a cart-updated event was intended here instead.
        if not isinstance(event, CartCompleted):
            return
        self.cart = UpdateCart(self.cart).execute()

    def id(self):
        """Return the stored cart, used as this workflow's id."""
        return self.cart
const { Wait, Workflow } = require("zenaton");
const ProceedToCheckout = require("../Tasks/ProceedToCheckout");
const AbandonedCart = require("../Tasks/AbandonedCart");
const UpdateCart = require("../Tasks/UpdateCart");

module.exports = Workflow("CartWorkflow", {
  init(cart) {
    this.cart = cart;
  },
  async handle() {
    const cartCompleted = await new Wait("CartCompleted").days(1).execute();

    if (cartCompleted) {
      await new ProceedToCheckout(this.cart).execute();
    } else {
      await new AbandonedCart(this.cart).execute();
    }
  },
  async onEvent(name) {
    if (name === "CartCompleted") {
      this.cart = await new UpdateCart(this.cart).execute();
    }
  },
});
            
const { workflow, duration } = require("zenaton");

module.exports = workflow("CartWorkflow", {
  *handle(cart) {
    this.cart = cart
    const cartCompleted = yield this.wait.event("CartCompleted").for(duration.days(1));

    if (cartCompleted) {
      yield this.run.task("ProceedToCheckout", this.cart);
    } else {
      yield this.run.task("AbandonedCart", this.cart);
    }
  },
  *onEvent(name) {
    if (name === "CartCompleted") {
      this.cart = yield this.run.task("UpdateCart", this.cart);
    }
  },
});

Check Out Workflow

The Check Out Workflow is the basic workflow related to payment. It is business-sensitive and many external events can happen at this level.

  • Proceed to payment
  • If after one hour, the payment hasn't been completed, then abandon check out.
  • If the payment is completed, then proceed to shipment.
  • If the payment is partial, trigger a partial payment task.
  • If the customer has paid too much, then the overpaid task is triggered.
  • If the payment is declined, then the checkout is stopped.

Flowchart of the Check Out Workflow

The Check Out Workflow is here represented in a flowchart. Several events can be received, resulting in different scenarios.

cart-workflow

Check Out Workflow Code

This workflow is the code that orchestrates the tasks through the Zenaton workflow engine; the tasks are then executed on your servers. The workflow is triggered after the order has been completed. Then, the customer proceeds to payment.

check_out_workflow.rb

<?php

use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;
use Zenaton\Interfaces\EventInterface;

/**
 * Dispatches the payment task, waits up to one hour for a PaymentCompleted
 * event, then executes either ProceedToShipment or AbandonedCheckout.
 * Payment-related events received meanwhile are handled in onEvent().
 */
class CheckoutWorkflow implements WorkflowInterface
{
    use Zenatonable;

    public function __construct($order)
    {
        $this->order = $order;
    }

    public function handle()
    {
        (new ProcessPayment($this->order))->dispatch();

        // Falsy result: the one-hour wait ended without a PaymentCompleted event.
        $paymentCompleted = (new Wait(PaymentCompleted::class))->hours(1)->execute();

        $nextStep = $paymentCompleted
            ? new ProceedToShipment($this->order)
            : new AbandonedCheckout($this->order);
        $nextStep->execute();
    }

    public function onEvent(EventInterface $event)
    {
        // Event class => class executed in reaction, checked in this order.
        $reactions = [
            PaymentDeclined::class => DeclinePayment::class,
            PartialPayment::class => DeclinePartialPayment::class,
            OverPaid::class => RefundWorkflow::class,
        ];

        foreach ($reactions as $eventClass => $reactionClass) {
            if ($event instanceof $eventClass) {
                (new $reactionClass($this->order))->execute();
            }
        }
    }
}
          
require "./tasks/process_payment"
require "./tasks/proceed_to_shipment"
require "./tasks/abandoned_checkout"
require "./tasks/decline_payment"
require "./tasks/decline_partial_payment"
require "./tasks/refund_workflow"

module MyStore
  module Workflows
    # Dispatches the payment task, waits up to one hour for a PaymentCompleted
    # event, then executes either ProceedToShipment or AbandonedCheckout.
    # Payment-related events received meanwhile are handled in on_event.
    class CheckoutWorkflow < Zenaton::Interfaces::Workflow
      include Zenaton::Traits::Zenatonable

      def initialize(order)
        @order = order
      end

      # Returns the id of the stored order.
      def id
        @order.id
      end

      def handle
        MyStore::Tasks::ProcessPayment.new(@order).dispatch
        # Wait is provided by the Zenaton library, not by the store's own tasks.
        payment_completed = Zenaton::Tasks::Wait.new(MyStore::Events::PaymentCompleted).hours(1).execute

        if payment_completed
          MyStore::Tasks::ProceedToShipment.new(@order).execute
        else
          MyStore::Tasks::AbandonedCheckout.new(@order).execute
        end
      end

      def on_event(event)
        case event
        when MyStore::Events::PaymentDeclined
          MyStore::Tasks::DeclinePayment.new(@order, event).execute
        when MyStore::Events::PartialPayment
          MyStore::Tasks::DeclinePartialPayment.new(@order, event).execute
        # NOTE(review): event class name assumed to be OverPaid — confirm it
        # exists under MyStore::Events.
        when MyStore::Events::OverPaid
          MyStore::Tasks::RefundWorkflow.new(@order, event).execute
        end
      end
    end
  end
end
          
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait

from tasks.process_payment import ProcessPayment
from tasks.proceed_to_shipment import ProceedToShipment
from tasks.abandoned_checkout import AbandonedCheckout
from tasks.decline_payment import DeclinePayment
from tasks.decline_partial_payment import DeclinePartialPayment
from tasks.refund_workflow import RefundWorkflow
from events.payment_completed import PaymentCompleted
from events.payment_declined import PaymentDeclined
from events.partial_payment import PartialPayment
from events.over_paid import OverPaid

class CheckoutWorkflow(Workflow, Zenatonable):
    """Checkout workflow.

    Dispatches the payment, waits up to one hour for a PaymentCompleted
    event, then runs either the shipment or the abandoned-checkout task.
    """

    def __init__(self, order):
        self.order = order

    def handle(self):
        """Run the main checkout sequence for ``self.order``."""
        ProcessPayment(self.order).dispatch()

        paid_in_time = Wait(PaymentCompleted).hours(1).execute()

        # The outcome of the wait selects the single follow-up task.
        next_task = ProceedToShipment if paid_in_time else AbandonedCheckout
        next_task(self.order).execute()

    def on_event(self, event):
        """Run the task associated with each event type ``event`` matches."""
        reactions = (
            (PaymentDeclined, DeclinePayment),
            (PartialPayment, DeclinePartialPayment),
            (OverPaid, RefundWorkflow),
        )
        for event_type, task_type in reactions:
            if isinstance(event, event_type):
                task_type(self.order).execute()
          
const { Wait, Workflow } = require("zenaton");
const ProcessPayment = require("../Tasks/ProcessPayment");
const ProceedToShipment = require("../Tasks/ProceedToShipment");
const AbandonedCheckout = require("../Tasks/AbandonedCheckout");
const DeclinePayment = require("../Tasks/DeclinePayment");
const DeclinePartialPayment = require("../Tasks/DeclinePartialPayment");
const RefundWorkflow = require("../Tasks/RefundWorkflow");

module.exports = Workflow("CheckoutWorkflow", {
  init(order) {
    this.order = order;
  },
  async handle() {
    await new ProcessPayment(this.order).dispatch();
    const paymentCompleted = await new Wait("PaymentCompleted")
      .hours(1)
      .execute();

    if (paymentCompleted) {
      await new ProceedToShipment(this.order).execute();
    } else {
      await new AbandonedCheckout(this.order).execute();
    }
  },
  async onEvent(eventName) {
    if (eventName === "PaymentDeclined") {
      await new DeclinePayment(this.order).execute();
    }
    if (eventName === "PartialPayment") {
      await new DeclinePartialPayment(this.order).execute();
    }
    if (eventName === "OverPaid") {
      await new RefundWorkflow(this.order).execute();
    }
  },
});
          
const { workflow, } = require("zenaton");

module.exports = workflow("CheckoutWorkflow", {
  *handle(order) {
    this.stripe = this.connector('stripe', 'your-connector-id');

    this.order = order;

    this.stripe.post('/v1/charges', {body: {amount: this.order.amount, charge: false, ...}});

    const paymentCompleted = yield this.wait.event("PaymentCompleted").for(duration.hours(1));

    if (paymentCompleted) {
      yield this.run.task("ProceedToShipment", this.order);
    } else {
      yield this.run.task("AbandonedCheckout", this.order);
    }
  },
  *onEvent(eventName, eventData) {
    if (eventName === "PaymentDeclined") {
      yield stripe.post(`/v1/issuing/authorizations/${this.order.auth_id}/decline`);
    }
    if (eventName === "PartialPayment") {
      yield stripe.post(`/v1/issuing/authorizations/${this.order.auth_id}/decline`);
    }
    if (eventName === "OverPaid") {
      yield this.stripe.post('/v1/refunds', {body: {charge: this.order.charge, amount: eventData.to_refund}});
    }
  },
});

Abandoned Workflow

The Abandoned Workflow is an easy way to improve your conversion rate in e-commerce. The user receives a notification encouraging them to complete the checkout. After one week, the content of the cart is removed. Step by step, it is:

  • Remove cart's content
  • Send an email to the user for his abandoned cart
  • If after more than one week, the user didn't come back to the platform, then the workflow is finished
  • If they do come back, the initial content is added back to the cart, allowing them to finish the checkout

Flowchart of the Abandoned Workflow

The Abandoned Workflow is here represented in a flowchart. After one week, without user interaction, the workflow is automatically terminated.

abandoned-cart-workflow

Abandoned Workflow Code

This workflow is the code that orchestrates the tasks through the Zenaton workflow engine; the tasks are then executed on your servers.

abandoned_workflow.rb

<?php
                
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;
/**
 * Abandoned-cart workflow: removes the cart, dispatches the reminder email,
 * then waits up to one week for a CartResumed event before restoring the cart.
 */
class AbandonedCart implements WorkflowInterface
{
    use Zenatonable;

    /**
     * @param mixed $cart Cart handled by this workflow; its `email` property is read in handle().
     */
    public function __construct($cart)
    {
        $this->cart = $cart;
    }

    public function handle()
    {
        (new RemoveCart($this->cart->email))->execute();
        (new SendAbandonedCartEmail($this->cart))->dispatch();

        $cartResumed = (new Wait(CartResumed::class))->weeks(1)->execute();

        // Nothing left to do when the wait did not yield a CartResumed event.
        if (!$cartResumed) {
            return;
        }

        (new AddInitialCart($this->cart))->execute();
    }
}
module MyStore
  module Workflows
    require './tasks/remove_cart'
    require './tasks/send_abandoned_cart_email'
    require './tasks/add_initial_cart'
    require './events/cart_resumed'

    # Abandoned-cart workflow: removes the cart, dispatches the reminder email,
    # then waits up to one week for a CartResumed event before restoring the cart.
    class AbandonedCart < Zenaton::Interfaces::Workflow
      include Zenaton::Traits::Zenatonable

      # @param cart the cart handled by this workflow; its `email` is read in #handle
      def initialize(cart)
        @cart = cart
      end

      def handle
        MyStore::Tasks::RemoveCart.new(@cart.email).execute
        MyStore::Tasks::SendAbandonedCartEmail.new(@cart).dispatch

        resume_wait = Zenaton::Tasks::Wait.new(MyStore::Events::CartResumed).weeks(1)
        resumed = resume_wait.execute

        # Nothing left to do when the wait did not yield a CartResumed event.
        return unless resumed

        MyStore::Tasks::AddInitialCart.new(@cart).execute
      end
    end
  end
end
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait
from tasks.remove_cart import RemoveCart
from tasks.send_abandoned_cart_email import SendAbandonedCartEmail
from tasks.add_initial_cart import AddInitialCart
from events.cart_resumed import CartResumed

class AbandonedCart(Workflow, Zenatonable):
    """Abandoned-cart workflow.

    Removes the cart, dispatches the reminder email, then waits up to one
    week for a CartResumed event before restoring the initial cart.
    """

    def __init__(self, cart):
        self.cart = cart

    def handle(self):
        # RemoveCart receives the cart's email, as in the PHP, Ruby and
        # JavaScript versions of this workflow.
        RemoveCart(self.cart.email).execute()
        SendAbandonedCartEmail(self.cart).dispatch()

        cart_resumed = Wait(CartResumed).weeks(1).execute()

        if cart_resumed:
            AddInitialCart(self.cart).execute()
const { Wait, Workflow } = require("zenaton");
const RemoveCart = require("../Tasks/RemoveCart");
const SendAbandonedCartEmail = require("../Tasks/SendAbandonedCartEmail");
const AddInitialCart = require("../Tasks/AddInitialCart");

module.exports = Workflow("AbandonedCart", {
    init(cart) {
        this.cart = cart;
    },
    async handle() {
      await new RemoveCart(this.cart.email).execute();
      await new SendAbandonedCartEmail(this.cart).dispatch();
      
      const cart_resumed = await new Wait("CartResumed").weeks(1).execute();

      if (cart_resumed) {
        await new AddInitialCart(this.cart).execute();
      } 
    },
});
const { workflow, duration } = require("zenaton");

module.exports = workflow("AbandonedCart", function*(cart) {
    const sendgrid = this.connector('sendgrid', 'your-connector-id');

    yield this.run.task("RemoveCart", cart.email)

    yield sendgrid.post('/mail/send', {body: {"template_id": "abandoned_cart", ...}})

    const cart_resumed = yield this.wait.event("CartResumed").for(duration.weeks(1));

    if (cart_resumed) {
      yield this.run.task("AddInitialCart", cart)
    } 
});

Shipping Workflow

The Shipping Workflow is triggered when the payment confirmation has been received. At this stage, several external events can be received. Here are the details:

  • Fulfill the shipment
  • At the same time, wait one week to receive external events about shipment
  • Either the order is fulfilled, in which case the tracking workflow is launched
  • Or the charge back event is received and it cancels the order from the shipment
  • Or the out of stock event is received and it alerts the user
  • If no event is received within the week, a "not fulfilled" alert is sent

Flowchart of the Shipping Workflow

The flowchart of the Shipping Workflow shows the different events you can receive and the scenario each one leads to.

shipping workflow

Shipping Workflow Code

This workflow is the code that orchestrates the tasks through the Zenaton workflow engine; the tasks are then executed on your servers.

shipping_workflow.rb

<?php
                
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;
use Zenaton\Interfaces\EventInterface;

/**
 * Shipping workflow: dispatches the fulfillment, waits up to one week for a
 * Fulfilled event, then either proceeds to tracking or raises the
 * not-fulfilled alert. ChargeBack and OutOfStock events are handled in onEvent().
 */
class ShippingWorkflow implements WorkflowInterface
{
    use Zenatonable;

    /**
     * @param mixed $shipment Shipment passed to every task of this workflow.
     */
    public function __construct($shipment)
    {
        $this->shipment = $shipment;
    }

    public function handle()
    {
        (new Fulfill($this->shipment))->dispatch();

        $wasFulfilled = (new Wait(Fulfilled::class))->weeks(1)->execute();

        // The outcome of the wait selects the single follow-up task.
        $nextTask = $wasFulfilled
            ? new ProceedToTracking($this->shipment)
            : new AlertNotFulfilled($this->shipment);
        $nextTask->execute();
    }

    /**
     * Runs the task associated with each event type the received event matches.
     */
    public function onEvent(EventInterface $event)
    {
        if ($event instanceof ChargeBack) {
            $cancellation = new CancelOrderFromShipment($this->shipment);
            $cancellation->execute();
        }

        if ($event instanceof OutOfStock) {
            $stockHandling = new ProcessOutOfStockQuantity($this->shipment);
            $stockHandling->execute();
        }
    }
}
module MyStore
  module Workflows
    require './tasks/fulfill'
    require './tasks/proceed_to_tracking'
    require './tasks/alert_not_fulfilled'
    require './tasks/cancel_order_from_shipment'
    require './tasks/process_out_of_stock_quantity'
    require './events/fulfilled'
    require './events/charge_back'
    require './events/out_of_stock'

    # Shipping workflow: dispatches the fulfillment, waits up to one week for a
    # Fulfilled event, then either proceeds to tracking or raises the
    # not-fulfilled alert. Other shipment events are handled in #on_event.
    class ShippingWorkflow < Zenaton::Interfaces::Workflow
      include Zenaton::Traits::Zenatonable

      # @param shipment the shipment passed to every task; its `id` is the workflow id
      def initialize(shipment)
        @shipment = shipment
      end

      # Workflow id, taken from the shipment.
      def id
        @shipment.id
      end

      def handle
        MyStore::Tasks::Fulfill.new(@shipment).dispatch

        was_fulfilled = Zenaton::Tasks::Wait.new(MyStore::Events::Fulfilled).weeks(1).execute

        # The outcome of the wait selects the single follow-up task.
        next_task = was_fulfilled ? MyStore::Tasks::ProceedToTracking : MyStore::Tasks::AlertNotFulfilled
        next_task.new(@shipment).execute
      end

      # Runs the task associated with the type of the received event, if any.
      def on_event(event)
        case event
        when MyStore::Events::ChargeBack
          cancellation = MyStore::Tasks::CancelOrderFromShipment.new(@shipment, event)
          cancellation.execute
        when MyStore::Events::OutOfStock
          stock_handling = MyStore::Tasks::ProcessOutOfStockQuantity.new(@shipment, event)
          stock_handling.execute
        end
      end
    end
  end
end
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait
from tasks.fulfill import Fulfill
from tasks.proceed_to_tracking import ProceedToTracking
from tasks.alert_not_fulfilled import AlertNotFulfilled
from tasks.cancel_order_from_shipment import CancelOrderFromShipment
from tasks.process_out_of_stock_quantity import ProcessOutOfStockQuantity
from events.fulfilled import Fulfilled
from events.charge_back import ChargeBack
from events.out_of_stock import OutOfStock

class ShippingWorkflow(Workflow, Zenatonable):
    """Shipping workflow.

    Dispatches the fulfillment, waits up to one week for a Fulfilled event,
    then either proceeds to tracking or raises the not-fulfilled alert.
    ChargeBack and OutOfStock events are handled in ``on_event``.
    """

    def __init__(self, shipment):
        self.shipment = shipment

    def handle(self):
        # Every statement of the method sits at the same indentation level;
        # mixed levels here would make the snippet fail to parse.
        Fulfill(self.shipment).dispatch()
        fulfilled = Wait(Fulfilled).weeks(1).execute()

        if fulfilled:
            ProceedToTracking(self.shipment).execute()
        else:
            AlertNotFulfilled(self.shipment).execute()

    def on_event(self, event):
        """Run the task associated with each event type ``event`` matches."""
        if isinstance(event, ChargeBack):
            CancelOrderFromShipment(self.shipment).execute()
        if isinstance(event, OutOfStock):
            ProcessOutOfStockQuantity(self.shipment).execute()
const { Wait, Workflow } = require("zenaton");
const Fulfill = require("../Tasks/Fulfill");
const ProceedToTracking = require("../Tasks/ProceedToTracking");
const AlertNotFulfilled = require("../Tasks/AlertNotFulfilled");
const CancelOrderFromShipment = require("../Tasks/CancelOrderFromShipment");
const ProcessOutOfStockQuantity = require("../Tasks/ProcessOutOfStockQuantity");

module.exports = Workflow("ShippingWorkflow", {
  init(shipment) {
    this.shipment = shipment;
  },
  async handle() {
    await new Fulfill(this.shipment).dispatch();

    const fulfilled = await new Wait("Fulfilled").weeks(1).execute();

    if (fulfilled) {
      await new ProceedToTracking(this.shipment).execute();
    } else {
      await new AlertNotFulfilled(this.shipment).execute();
    }
  },
  async onEvent(eventName) {
    if (eventName === "ChargeBack") {
      await new CancelOrderFromShipment(this.shipment).execute();
    }
    if (eventName === "OutOfStock") {
      await new ProcessOutOfStockQuantity(this.shipment).execute();
    }
  },
});
const { workflow, duration } = require("zenaton");

module.exports = workflow("ShippingWorkflow", {
  *handle(shipment) {
    this.shipment = shipment;
    
    yield this.run.task("Fulfill", this.shipment)

    const fulfilled = yield this.wait.event("Fulfilled").for(duration.weeks(1));

    if (fulfilled) {
      yield this.run.task("ProceedToTracking", this.shipment)
    } else {
      yield this.run.task("AlertNotFulfilled", this.shipment)
    }
  },
  *onEvent(eventName) {
    if (eventName === "ChargeBack") {
      yield this.run.task("CancelOrderFromShipment", this.shipment)
    }
    if (eventName === "OutOfStock") {
      yield this.run.task("ProcessOutOfStockQuantity", this.shipment)
    }
  },
});

Refund Workflow

The Refund Workflow is triggered when a user has overpaid for the items in their cart. So the payment issue needs to be solved and then the shipment workflow has to be launched.

  • Closes the checkout
  • Correct the payment issue
  • If in less than one day, the user has been refunded, then launch the shipment workflow
  • If not, trigger a refund timed-out alert

Flowchart of the Refund Workflow

The Refund Workflow shows the power of Zenaton with asynchronous tasks, waiting, events and the ability to launch sub-workflows.

refund workflow

Refund Workflow Code

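This workflow is the code that orchestrates the tasks through the Zenaton workflow engine; the tasks are then executed on your servers.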

refund_workflow.rb

<?php
                
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;

/**
 * Refund workflow: closes the checkout, dispatches the refund, then waits up
 * to one day for a Refunded event before proceeding to shipment or raising a
 * refund-timeout alert.
 */
class RefundWorkflow implements WorkflowInterface
{
    use Zenatonable;

    /**
     * @param mixed $refund Refund handled by this workflow; `$refund->order->id` is read in handle().
     */
    public function __construct($refund)
    {
        $this->refund = $refund;
    }

    public function handle()
    {
        $orderId = $this->refund->order->id;
        (new CloseCheckout($orderId))->execute();
        (new IssueRefund($this->refund))->dispatch();

        $wasRefunded = (new Wait(Refunded::class))->days(1)->execute();

        // The outcome of the wait selects the single follow-up task.
        $nextTask = $wasRefunded
            ? new ProceedFromRefundToShipment($this->refund)
            : new AlertRefundTimeout($this->refund);
        $nextTask->execute();
    }
}
module MyStore
  module Workflows
    require "./tasks/close_checkout"
    require "./tasks/issue_refund"
    require "./tasks/proceed_from_refund_to_shipment"
    require "./tasks/alert_refund_timeout"

    # Refund workflow: closes the checkout, dispatches the refund, then waits
    # up to one day for a Refunded event before proceeding to shipment or
    # raising a refund-timeout alert.
    class RefundWorkflow < Zenaton::Interfaces::Workflow
      include Zenaton::Traits::Zenatonable

      # @param refund the refund handled by this workflow; its `id` is the workflow id
      def initialize(refund)
        @refund = refund
      end

      # Workflow id, taken from the refund.
      def id
        @refund.id
      end

      def handle
        order_id = @refund.order.id
        MyStore::Tasks::CloseCheckout.new(order_id).execute
        MyStore::Tasks::IssueRefund.new(@refund).dispatch

        was_refunded = Zenaton::Tasks::Wait.new(MyStore::Events::Refunded).days(1).execute

        # The outcome of the wait selects the single follow-up task.
        next_task = was_refunded ? MyStore::Tasks::ProceedFromRefundToShipment : MyStore::Tasks::AlertRefundTimeout
        next_task.new(@refund).execute
      end
    end
  end
end

from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait
from tasks.close_checkout import CloseCheckout
from tasks.issue_refund import IssueRefund
from tasks.proceed_from_refund_to_shipment import ProceedFromRefundToShipment
from tasks.alert_refund_timeout import AlertRefundTimeout
from events.refund_issued import Refunded

class RefundWorkflow(Workflow, Zenatonable):
    """Refund workflow.

    Closes the checkout, dispatches the refund, then waits up to one day for
    a Refunded event before proceeding to shipment or raising a
    refund-timeout alert.
    """

    def __init__(self, refund):
        self.refund = refund

    def handle(self):
        # The checkout is closed by order id, as in the PHP, Ruby and
        # JavaScript versions; there is no standalone `order` name in scope.
        CloseCheckout(self.refund.order.id).execute()
        IssueRefund(self.refund).dispatch()

        refund_issued = Wait(Refunded).days(1).execute()

        if refund_issued:
            ProceedFromRefundToShipment(self.refund).execute()
        else:
            AlertRefundTimeout(self.refund).execute()
const { Wait, Workflow } = require("zenaton");
const CloseCheckout = require("../Tasks/CloseCheckout");
const IssueRefund = require("../Tasks/IssueRefund");
const ProceedFromRefundToShipment = require("../Tasks/ProceedFromRefundToShipment");
const AlertRefundTimeout = require("../Tasks/AlertRefundTimeout");

module.exports = Workflow("RefundWorkflow", {
  init(refund) {
    this.refund = refund;
  },
  async handle() {
    await new CloseCheckout(this.refund.order.id).execute();
    await new IssueRefund(this.refund).dispatch();

    const refundIssued = await new Wait("Refunded").days(1).execute();

    if (refundIssued) {
      await new ProceedFromRefundToShipment(this.refund).execute();
    } else {
      await new AlertRefundTimeout(this.refund).execute();
    }
  },
});
const { workflow, duration } = require("zenaton");

module.exports = workflow("RefundWorkflow", {
  *handle(refund) {
    yield this.run.task("CloseCheckout", refund.order.id)

    this.run.task("IssueRefund", refund)

    const refundIssued = yield this.wait.event("Refunded").for(duration.days(1));

    if (refundIssued) {
      yield this.run.task("ProceedFromRefundToShipment", refund)

    } else {
      yield this.run.task("AlertRefundTimeout", refund)
    }
  },
});

Tracking Workflow

The Tracking Workflow is the last workflow of the series. It tracks the order and sends a notification to the user based on external events, either that the order is on the way, or that something has gone wrong.

  • Deliver the order
  • If the order hasn't been delivered in the maximum number of days indicated, then send an alert to the user.
  • If not, if the package has been lost, send a different alert to the user and stop tracking.
  • If there is a delivery error, a delivery error alert is sent and stop tracking.
  • If the order is returned to sender, another alert is sent and stop tracking.
  • If the order is delivered, a notification is sent to the customer.

Flowchart of the Tracking Workflow

The Tracking Workflow is represented below in a flowchart. After a certain number of days of waiting, several events can be received. Depending on these events, a specific alert will be sent. This provides the best user experience while the user is waiting for their order.

tracking workflow

Tracking Workflow Code

This workflow is the code that orchestrates the tasks through the Zenaton workflow engine; the tasks are then executed on your servers. The workflow is triggered after the shipment, when the order is about to be delivered.

tracking_workflow.rb

<?php
            
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;

/**
 * Tracking workflow: dispatches the delivery, waits up to the shipment's
 * maximum number of shipping days for a Delivered event, then sends either the
 * delivery alert or the late-delivery alert. Delivery incidents are handled in
 * onEvent().
 */
class TrackingWorkflow implements WorkflowInterface
{
    use Zenatonable;

    /**
     * @param mixed $shipment Shipment being tracked; its `maximumShippingDays` property is read in handle().
     */
    public function __construct($shipment)
    {
        $this->shipment = $shipment;
    }

    public function handle()
    {
        (new Deliver($this->shipment))->dispatch();
        $deliveryDays = $this->shipment->maximumShippingDays;

        $delivered = (new Wait(Delivered::class))->days($deliveryDays)->execute();
        if ($delivered) {
            (new AlertDelivery($this->shipment))->execute();
        } else {
            (new AlertLateDelivery($this->shipment))->execute();
        }
    }

    /**
     * Sends the alert matching the received event, then stops the tracking.
     *
     * The parameter type is fully qualified because this snippet has no
     * `use Zenaton\Interfaces\EventInterface;` statement; an unqualified
     * name would resolve to a class that does not exist.
     */
    public function onEvent(\Zenaton\Interfaces\EventInterface $event)
    {
        if ($event instanceof PackageLost) {
            (new AlertPackageLost($this->shipment))->execute();
            (new StopTrackingWorkflow($this->shipment))->execute();
        }
        if ($event instanceof DeliveryError) {
            (new AlertDeliveryError($this->shipment))->execute();
            (new StopTrackingWorkflow($this->shipment))->execute();
        }
        if ($event instanceof ReturnedToSender) {
            (new AlertReturn($this->shipment))->execute();
            (new StopTrackingWorkflow($this->shipment))->execute();
        }
    }
}
module MyStore
  module Workflows
    # Tracking workflow: dispatches the delivery, waits up to the carrier's
    # maximum number of shipping days for a Delivered event, then sends either
    # the delivery alert or the late-delivery alert. Delivery incidents are
    # handled in #on_event.
    class TrackingWorkflow < Zenaton::Interfaces::Workflow
      include Zenaton::Traits::Zenatonable

      # @param shipment the shipment being tracked; its `id` is the workflow id
      def initialize(shipment)
        @shipment = shipment
      end

      # Workflow id, taken from the shipment.
      def id
        @shipment.id
      end

      def handle
        MyStore::Tasks::Deliver.new(@shipment).dispatch
        max_days = @shipment.carrier.maximum_shipping_days

        was_delivered = Zenaton::Tasks::Wait.new(MyStore::Events::Delivered).days(max_days).execute

        # The outcome of the wait selects the single follow-up alert.
        alert = was_delivered ? MyStore::Tasks::AlertDelivery : MyStore::Tasks::AlertLateDelivery
        alert.new(@shipment).execute
      end

      # Sends the alert matching the received event, then stops the tracking.
      def on_event(event)
        case event
        when MyStore::Events::PackageLost
          lost_alert = MyStore::Tasks::AlertPackageLost.new(@shipment, event)
          lost_alert.execute
          MyStore::Tasks::StopTrackingWorkflow.new(@shipment, event).execute
        when MyStore::Events::DeliveryError
          error_alert = MyStore::Tasks::AlertDeliveryError.new(@shipment, event)
          error_alert.execute
          MyStore::Tasks::StopTrackingWorkflow.new(@shipment, event).execute
        when MyStore::Events::ReturnedToSender
          return_alert = MyStore::Tasks::AlertReturn.new(@shipment, event)
          return_alert.execute
          MyStore::Tasks::StopTrackingWorkflow.new(@shipment, event).execute
        end
      end
    end
  end
end
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait
from tasks.deliver import Deliver
from tasks.alert_delivery import AlertDelivery
from tasks.alert_late_delivery import AlertLateDelivery
from tasks.alert_package_lost import AlertPackageLost
from tasks.alert_delivery_error import AlertDeliveryError
from tasks.alert_return import AlertReturn
# Used by on_event below.
# NOTE(review): module path follows the naming of the other task imports — confirm.
from tasks.stop_tracking_workflow import StopTrackingWorkflow

from events.delivered import Delivered
from events.package_lost import PackageLost
from events.delivery_error import DeliveryError
from events.returned_to_sender import ReturnedToSender

class TrackingWorkflow(Workflow, Zenatonable):
    """Tracking workflow.

    Dispatches the delivery, waits up to the shipment's maximum number of
    shipping days for a Delivered event, then sends either the delivery
    alert or the late-delivery alert. Delivery incidents are handled in
    ``on_event``.
    """

    def __init__(self, shipment):
        self.shipment = shipment

    def handle(self):
        Deliver(self.shipment).dispatch()

        delivery_days = self.shipment.maximum_shipping_days

        delivered = Wait(Delivered).days(delivery_days).execute()
        if delivered:
            AlertDelivery(self.shipment).execute()
        else:
            AlertLateDelivery(self.shipment).execute()

    def on_event(self, event):
        """Send the alert matching ``event``, then stop the tracking."""
        if isinstance(event, PackageLost):
            AlertPackageLost(self.shipment).execute()
            StopTrackingWorkflow(self.shipment).execute()
        if isinstance(event, DeliveryError):
            AlertDeliveryError(self.shipment).execute()
            StopTrackingWorkflow(self.shipment).execute()
        if isinstance(event, ReturnedToSender):
            AlertReturn(self.shipment).execute()
            StopTrackingWorkflow(self.shipment).execute()
const { Wait, Workflow } = require("zenaton");
const Deliver = require("../Tasks/Deliver");
const AlertDelivery = require("../Tasks/AlertDelivery");
const AlertLateDelivery = require("../Tasks/AlertLateDelivery");
const AlertPackageLost = require("../Tasks/AlertPackageLost");
const AlertDeliveryError = require("../Tasks/AlertDeliveryError");
const AlertReturn = require("../Tasks/AlertReturn");
const StopTrackingWorkflow = require("../Tasks/StopTrackingWorkflow");

module.exports = Workflow("TrackingWorkflow", {
  init(shipment) {
    this.shipment = shipment;
  },
  async handle() {
    await new Deliver(this.shipment).dispatch();

    const deliveryDays = this.shipment.carrier.maximumShippingDays;

    const delivered = await new Wait("Delivered").days(deliveryDays).execute();

    if (delivered) {
      await new AlertDelivery(this.shipment).execute();
    } else {
      await new AlertLateDelivery(this.shipment).execute();
    }
  },
  async onEvent(eventName) {
    if (eventName === "PackageLost") {
      await new AlertPackageLost(this.shipment).execute();
      await new StopTrackingWorkflow(this.shipment).execute();
    }
    if (eventName === "DeliveryError") {
      await new AlertDeliveryError(this.shipment).execute();
      await new StopTrackingWorkflow(this.shipment).execute();
    }
    if (eventName === "ReturnedToSender") {
      await new AlertReturn(this.shipment).execute();
      await new StopTrackingWorkflow(this.shipment).execute();
    }
  },
});
const { workflow,duration } = require("zenaton");

module.exports = workflow("TrackingWorkflow", {
  *handle(shipment) {
    this.shipment = shipment
    this.run.task("Deliver", shipment)

    const deliveryDays = this.shipment.carrier.maximumShippingDays;

    const delivered = yield this.wait.event("Delivered").for(duration.days(deliveryDays));

    if (delivered) {
      yield this.run.task("AlertDelivery", this.shipment);
    } else {
      yield this.run.task("AlertLateDelivery", this.shipment);
    }
  },
  *onEvent(eventName) {
    if (eventName === "PackageLost") {
      yield this.run.task("AlertPackageLost", this.shipment);
      yield this.run.task("StopTrackingWorkflow", this.shipment);
    }
    if (eventName === "DeliveryError") {
      yield this.run.task("AlertDeliveryError", this.shipment);
      yield this.run.task("StopTrackingWorkflow", this.shipment);
    }
    if (eventName === "ReturnedToSender") {
      yield this.run.task("AlertReturn", this.shipment);
      yield this.run.task("StopTrackingWorkflow", this.shipment);
    }
  },
});

Calculate and send real-time alerts using real-time data from a vehicle tracking API

This workflow introduces the Wait and execute Methods.

This example shows a workflow for a delivery or car service with real-time data calculations and a vehicle tracking process as well as triggering notifications (emails or text messages) to the customer based on different conditions.
Our workflow manages real-time calculations and customer communications (emails and SMS) based on the real-time duration to delivery and ETA. We calculate the 'duration' and 'ETA' of the service using real-time vehicle tracking data from an internal API. We recalculate periodically and send a notification to the customer one hour before arrival. If the ETA changes by more than 20 minutes after we have sent the original notification, we send a new notification with the updated ETA. Then, 2 minutes before arrival, the customer receives another notification that the vehicle is arriving.

Here is briefly the step by step use case:

  • Run a simple algorithm that calculates the duration and estimated time of arrival (ETA)
  • As long as the remaining duration is more than 1 hour, continue calculating the ETA and remaining duration at every interval (half of the time until arrival)
  • Once the remaining duration is less than one hour, send a message to the user saying that the vehicle will arrive in 1 hour
  • As long as the duration is more than our target time (2 minutes here), recalculate the remaining duration and ETA and compare it with the previous one
  • If the ETA has changed by more than 20 minutes, then send an alert to the user letting them know that there is a delay and include the new ETA
  • If the ETA has not changed by more than 20 minutes, send a notification to the user 2 minutes before arrival

Flowchart of the workflow

This flowchart shows a visual representation of the workflow tasks.

Flowchart of sending ETA notification

Workflow Code

This workflow is the code that orchestrates tasks (through the Zenaton workflow engine) and executes them on your servers.

notify_eta_workflow.rb

<?php
use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Traits\Zenatonable;
use Zenaton\Tasks\Wait;

/**
 * ETA notification workflow: recalculates the time to arrival until the
 * vehicle is about one hour away, informs the user of the ETA, then keeps
 * recalculating and notifies the user whenever the ETA shifts by at least
 * UPDATE seconds.
 */
class NotifyEtaWorkflow implements WorkflowInterface
{
    use Zenatonable;

    // All thresholds are in seconds: they are combined with the values passed to Wait::seconds().
    const BEFORE = 3600;    // lead time of the first notification
    const PRECISION = 120;  // remaining duration at which tracking stops
    const UPDATE = 1200;    // polling interval and minimum ETA shift worth notifying

    protected $tripId;
    protected $user;

    public function __construct($tripId, $user)
    {
        $this->tripId = $tripId;
        $this->user = $user;
    }

    public function handle()
    {
        [$duration, $eta] = $this->getTimeToArrival();

        // Phase 1: wait half of the time left before the one-hour mark, then recalculate.
        while ($duration > self::BEFORE + self::PRECISION) {

            (new Wait())->seconds(($duration - self::BEFORE) / 2)->execute();

            [$duration, $eta] = $this->getTimeToArrival();
        }

        (new InformUserOfEtaTask($this->user, $eta))->execute();

        // Phase 2: poll every UPDATE seconds and report significant ETA changes.
        while ($duration > self::PRECISION) {

            (new Wait())->seconds(self::UPDATE)->execute();

            [$duration, $eta2] = $this->getTimeToArrival();

            if (abs($eta2 - $eta) >= self::UPDATE) {
                $eta = $eta2;
                (new NotifyUserOfUpdatedEtaTask($this->user, $eta))->execute();
            }
        }
    }

    /**
     * Runs the calculation task for this trip.
     *
     * @return array Destructured by handle() as [duration, eta].
     */
    protected function getTimeToArrival()
    {
        return (new CalculateTimeToArrivalTask($this->tripId))->execute();
    }
}
require "./tasks/calculate_time_to_arrival_task"
require "./tasks/inform_user_of_eta_task"
require "./tasks/notify_user_of_updated_eta_task"

# ETA notification workflow: recalculates the time to arrival until the vehicle
# is about one hour away, informs the user of the ETA, then keeps recalculating
# and notifies the user whenever the ETA shifts by at least UPDATE seconds.
class NotifyEtaWorkflow < Zenaton::Interfaces::Workflow
  include Zenaton::Traits::Zenatonable

  # All thresholds are in seconds: they are combined with the values passed to Wait#seconds.
  BEFORE = 3600    # lead time of the first notification
  PRECISION = 120  # remaining duration at which tracking stops
  UPDATE = 1200    # polling interval and minimum ETA shift worth notifying

  def initialize(trip_id, user)
    @trip_id = trip_id
    @user = user
  end

  def handle
    duration, eta = CalculateTimeToArrivalTask.new(@trip_id).execute

    # Phase 1: wait half of the time left before the one-hour mark, then
    # recalculate. The condition is checked first, so no wait is issued when
    # the vehicle is already within the one-hour window.
    while duration > BEFORE + PRECISION
      Zenaton::Tasks::Wait.new.seconds((duration - BEFORE) / 2).execute

      duration, eta = CalculateTimeToArrivalTask.new(@trip_id).execute
    end

    InformUserOfEtaTask.new(@user, eta).execute

    # Phase 2: poll every UPDATE seconds and report significant ETA changes
    # for as long as the remaining duration exceeds PRECISION.
    while duration > PRECISION
      Zenaton::Tasks::Wait.new.seconds(UPDATE).execute

      duration, eta2 = CalculateTimeToArrivalTask.new(@trip_id).execute

      if (eta2 - eta).abs >= UPDATE
        eta = eta2
        NotifyUserOfUpdatedEtaTask.new(@user, eta).execute
      end
    end
  end
end
const { Workflow, Wait } = require("zenaton");
const CalculateTimeToArrivalTask = require("../Tasks/CalculateTimeToArrivalTask");
const InformUserOfEtaTask = require("../Tasks/InformUserOfEtaTask");
const NotifyUserOfUpdatedEtaTask = require("../Tasks/NotifyUserOfUpdatedEtaTask");

module.exports = Workflow("NotifyEtaWorkflow", {
  init(tripId, user) {
    this.tripId = tripId;
    this.user = user;
  },
  async handle() {
    const BEFORE = 3600;
    const PRECISION = 120;
    const UPDATE = 1200;
    let duration;
    let eta;
    let eta2;

    [duration, eta] = await this.getTimeToArrival();

    while (duration > BEFORE + PRECISION) {
      await new Wait().seconds((duration - BEFORE) / 2).execute();

      [duration, eta] = await this.getTimeToArrival();
    }

    new InformUserOfEtaTask(this.user, eta).execute();

    while (duration > PRECISION) {
      await new Wait().seconds(UPDATE).execute();

      [duration, eta2] = this.getTimeToArrival();
      if (Math.abs(eta2 - eta) >= UPDATE) {
        eta = eta2;

        await new NotifyUserOfUpdatedEtaTask(this.user, eta).execute();
      }
    }
  },
  async getTimeToArrival() {
    return new CalculateTimeToArrivalTask(this.tripId).execute();
  },
});
const { workflow, duration } = require("zenaton");

module.exports = workflow("NotifyEtaWorkflow", {
  *handle(tripId, user) {
    const twilio_sms = this.connector('twilio_sms', 'your-connector-id');

    this.tripId = tripId;
    this.user = user;

    const BEFORE = 3600;
    const PRECISION = 120;
    const UPDATE = 1200;
    let duration;
    let eta;
    let eta2;

    [duration, eta] = this.getTimeToArrival();

    while (duration > BEFORE + PRECISION) {
      yield this.wait.for(duration.seconds((duration - BEFORE) / 2));

      [duration, eta] = this.getTimeToArrival();
    }

    yield this.run.task("InformUserOfEtaTask", this.user, eta);

    while (duration > PRECISION) {
      yield this.wait.for(duration.seconds(UPDATE));

      [duration, eta2] = this.getTimeToArrival();
      if (Math.abs(eta2 - eta) >= UPDATE) {
        eta = eta2;

        twilio_sms.post('/Messages.json', {body: {To: this.user.phone, Body: '...'}})
      }
    }
  },
  *getTimeToArrival() {
    return yield this.run.task("CalculateTimeToArrivalTask", this.tripId);
  },
});
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait

from tasks.calculate_time_to_arrival_task import CalculateTimeToArrivalTask
from tasks.inform_user_of_eta_task import InformUserOfEtaTask
from tasks.notify_user_of_updated_eta_task import NotifyUserOfUpdatedEtaTask


class NotifyEtaWorkflow(Workflow, Zenatonable):
    """Notify a user of a trip's ETA and of significant ETA changes.

    The workflow waits until the trip is about ``BEFORE`` seconds away, sends
    the initial ETA, then re-checks every ``UPDATE`` seconds and notifies the
    user whenever the ETA moved by at least ``UPDATE``, until the remaining
    duration drops to ``PRECISION`` or less.

    NOTE(review): assumes ``CalculateTimeToArrivalTask`` returns
    ``(remaining_seconds, eta)`` with a numeric ``eta`` -- confirm against
    the task.
    """

    # How long before arrival (seconds) the first notification is sent.
    BEFORE = 3600
    # Tolerance (seconds) applied when comparing the remaining duration.
    PRECISION = 120
    # Pause (seconds) between ETA re-checks, and the minimum ETA difference
    # that triggers an update notification.
    UPDATE = 1200

    def __init__(self, user, trip_id):
        # Parameter order is (user, trip_id); kept as-is for existing callers.
        self.trip_id = trip_id
        self.user = user

    def handle(self):
        """Run the notification logic described in the class docstring."""
        duration, eta = CalculateTimeToArrivalTask(self.trip_id).execute()

        # Keep waiting only while the trip is further away than the
        # notification threshold; checking first avoids a negative wait when
        # the trip is already close.
        while duration > self.BEFORE + self.PRECISION:
            Wait().seconds((duration - self.BEFORE) / 2).execute()
            duration, eta = CalculateTimeToArrivalTask(self.trip_id).execute()

        InformUserOfEtaTask(self.user, eta).execute()

        # Follow the trip until it is (almost) over, rather than stopping at
        # the first check where the ETA happens to be stable.
        while duration > self.PRECISION:
            Wait().seconds(self.UPDATE).execute()

            duration, eta2 = CalculateTimeToArrivalTask(self.trip_id).execute()

            # Only notify when the ETA changed significantly.
            if abs(eta2 - eta) >= self.UPDATE:
                eta = eta2
                NotifyUserOfUpdatedEtaTask(self.user, eta).execute()

Product Return Workflow

This workflow introduces the Wait and execute Methods.

This workflow optimizes a customer product return process by automating internal customer-support processes and interfacing with external services: the Chronopost, SendGrid, Zendesk and TrustPilot APIs. We create the return inside our internal application, generate and email a 'return parcel', and send a personalised email to the customer depending on their status and return parameters. Depending on the reason for the return, there might be a follow-up action, such as assigning an account manager to reach out or asking the customer for a review when the product has been exchanged.

  • Identify the reason for the return.
  • Create a parcel return in our internal application.
  • Grab parcel data and generate a QR code with Chronopost API.
  • Check the reason for the return and send an email via SendGrid using the right template, attaching the QR code from Chronopost.
  • Depending on the reason for the return, trigger specific tasks such as "Assign an Operator" or "Ask for a Review".
  • Wait for up to three days to receive an external event that the user shipped the product.
  • If it has been shipped, the workflow is completed.
  • If not, an email reminder subworkflow is launched to remind the user to send the parcel and follow up if they do not.

Flowchart of the workflow

This flowchart shows a visual representation of the workflow tasks.

Workflow Code

This workflow is the code that orchestrates tasks (through the Zenaton workflow engine) and executes them on your servers. Tasks inside the workflow are not detailed here.

product_return.rb

<?php

use Zenaton\Interfaces\WorkflowInterface;
use Zenaton\Tasks\Wait;
use Zenaton\Traits\Zenatonable;

/**
 * Handles a customer product return.
 *
 * Creates the return parcel, fetches the parcel status, sends the matching
 * SendGrid template (plus an optional follow-up task), then waits up to three
 * days for a Shipped event and launches the reminder workflow if it never
 * arrives.
 */
class ProductReturnWorkflow implements WorkflowInterface
{
    use Zenatonable;

    /**
     * @param object $parcel_return Return request; its `email` and `id`
     *                              properties are read by this workflow.
     */
    public function __construct($parcel_return)
    {
        $this->parcel_return = $parcel_return;
    }

    public function handle()
    {
        $email = $this->parcel_return->email;

        (new CreateReturnParcel($this->parcel_return))->execute();

        $status = (new GetParcelData($this->parcel_return))->execute();

        // Emails and follow-ups are dispatched, i.e. the workflow does not
        // wait for them. Add further `elseif ($status === '...')` branches
        // here for additional statuses/templates.
        if ($status === 'status_1') {
            (new SendTemplateSendGrid('template_1', $email))->dispatch();
            (new AssignSupportOperatorZendesk())->dispatch();
        } elseif ($status === 'status_2') {
            (new SendTemplateSendGrid('template_2', $email))->dispatch();
            (new AskReviewTrustPilot())->dispatch();
        } else {
            (new SendTemplateSendGrid('template_x', $email))->dispatch();
        }

        // Wait for the Shipped event for at most 3 days.
        $shipped = (new Wait(Shipped::class))->days(3)->execute();

        // No event within the window: remind the customer.
        if (!$shipped) {
            (new EmailReminderWorkflow())->execute();
        }
    }

    /**
     * Identifier of this workflow instance, taken from the parcel return.
     */
    public function getId()
    {
        return $this->parcel_return->id;
    }
}
const { Wait, Workflow } = require("zenaton");
const CreateReturnParcel = require("../Tasks/CreateReturnParcel");
const GetParcelData = require("../Tasks/GetParcelData");
const SendTemplateSendGrid = require("../Tasks/SendTemplateSendGrid");
const AssignSupportOperatorZendesk = require("../Tasks/AssignSupportOperatorZendesk");
const AskReviewTrustPilot = require("../Tasks/AskReviewTrustPilot");
const EmailReminderWorkflow = require("../Tasks/EmailReminderWorkflow");

module.exports = Workflow("ProductReturnWorkflow", {
  init(parcelReturn) {
    this.parcelReturn = parcelReturn;
  },

  async handle() {
    const email = this.parcelReturn.email;

    await new CreateReturnParcel(this.parcelReturn).execute();

    const status = await new GetParcelData(this.parcelReturn).execute();

    if (status === "status_1") {
      await new SendTemplateSendGrid("template_1", email).dispatch();
      await new AssignSupportOperatorZendesk().dispatch();
    } else if (status === "status_2") {
      await new SendTemplateSendGrid("template_2", email).dispatch();
      await new AskReviewTrustPilot().dispatch();
      // else if (status === 'xxx') {
      //   SendTemplateSendGrid.new('template_xxx', email).dispatch();
      // }
    } else {
      await new SendTemplateSendGrid("template_x", email).dispatch();
    }

    const shipped = await new Wait("Shipped").days(3).execute();

    if (!shipped) {
      await new EmailReminderWorkflow().execute();
    }
  },
  id() {
    return this.parcelReturn.id;
  }
});
const { workflow, duration } = require("zenaton");

module.exports = workflow("ProductReturnWorkflow", {
  *handle(parcelReturn) {
    const sendgrid = this.connector('sendgrid', 'your-connector-id');
    this.parcelReturn = parcelReturn;

    const email = this.parcelReturn.email;

    yield this.run.task("CreateReturnParcel", this.parcelReturn);

    const status = yield this.run.task("GetParcelData", this.parcelReturn);

    if (status === "status_1") {
      sendgrid.post('/mail/send', {body: {"template_id": "template_1", ...}})
      this.run.task("AssignSupportOperatorZendesk");
    } else if (status === "status_2") {
      sendgrid.post('/mail/send', {body: {"template_id": "template_2", ...}})
      this.run.task("AskReviewTrustPilot");
      // else if (status === 'xxx') {
      //   sendgrid.post('/mail/send', {body: {"template_id": "template_xxx", ...}})
      // }
    } else {
      sendgrid.post('/mail/send', {body: {"template_id": "template_x", ...}})
    }

    const shipped = yield this.wait.event("Shipped").for(duration.days(3));

    if (!shipped) {
      yield this.run.task("EmailReminderWorkflow");
    }
  }
});
require "./tasks/create_return_parcel"
require "./tasks/get_parcel_data"
require "./tasks/send_template_send_grid"
require "./tasks/assign_support_operator_zendesk"
require "./tasks/ask_review_trust_pilot"
require "./tasks/email_reminder_workflow"
require "./events/shipped_event"

# Handles a customer product return.
#
# Creates the return parcel, fetches the parcel status, sends the matching
# SendGrid template (plus an optional follow-up task), then waits up to three
# days for a ShippedEvent and launches the reminder workflow if none arrives.
class ProductReturnWorkflow < Zenaton::Interfaces::Workflow
  include Zenaton::Traits::Zenatonable

  # @param parcel_return return request; its +email+ and +id+ are read
  def initialize(parcel_return)
    @parcel_return = parcel_return
  end

  def handle
    email = @parcel_return.email

    CreateReturnParcel.new(@parcel_return).execute

    status = GetParcelData.new(@parcel_return).execute

    # Emails and follow-ups are dispatched, i.e. the workflow does not wait
    # for them. Add further `elsif status == '...'` branches here for
    # additional statuses/templates.
    if status == 'status_1'
      SendTemplateSendGrid.new('template_1', email).dispatch
      AssignSupportOperatorZendesk.new.dispatch
    elsif status == 'status_2'
      SendTemplateSendGrid.new('template_2', email).dispatch
      AskReviewTrustPilot.new.dispatch
    else
      SendTemplateSendGrid.new('template_x', email).dispatch
    end

    # Wait for the shipped event for at most 3 days.
    shipped = Zenaton::Tasks::Wait.new(ShippedEvent).days(3).execute

    # No event within the window: remind the customer.
    EmailReminderWorkflow.new.execute unless shipped
  end

  # Identifier of this workflow instance, taken from the parcel return.
  def id
    @parcel_return.id
  end
end
from zenaton.abstracts.workflow import Workflow
from zenaton.traits.zenatonable import Zenatonable
from zenaton.tasks.wait import Wait

from events.shipped import Shipped
from tasks.create_return_parcel import CreateReturnParcel
from tasks.get_parcel_data import GetParcelData
from tasks.send_template_send_grid import SendTemplateSendGrid
from tasks.assign_support_operator_zendesk import AssignSupportOperatorZendesk
from tasks.ask_review_trust_pilot import AskReviewTrustPilot
from tasks.email_reminder_workflow import EmailReminderWorkflow

class ProductReturnWorkflow(Workflow, Zenatonable):
    """Handle a customer product return.

    Creates the return parcel, fetches the parcel status, dispatches the
    matching SendGrid template (plus an optional follow-up task), then waits
    up to three days for a ``Shipped`` event and runs the reminder workflow
    when none arrives.
    """

    def __init__(self, parcel_return):
        self.parcel_return = parcel_return

    def handle(self):
        parcel = self.parcel_return
        recipient = parcel.email

        CreateReturnParcel(parcel).execute()

        parcel_status = GetParcelData(parcel).execute()

        # Pick the template and the optional follow-up task for this status.
        # Add further `elif parcel_status == '...'` branches for more statuses.
        if parcel_status == 'status_1':
            template, follow_up = 'template_1', AssignSupportOperatorZendesk
        elif parcel_status == 'status_2':
            template, follow_up = 'template_2', AskReviewTrustPilot
        else:
            template, follow_up = 'template_x', None

        SendTemplateSendGrid(template, recipient).dispatch()
        if follow_up is not None:
            follow_up().dispatch()

        parcel_was_shipped = Wait(Shipped).days(3).execute()

        # No shipping event within the window: remind the customer.
        if not parcel_was_shipped:
            EmailReminderWorkflow().execute()

    def id(self):
        return self.parcel_return.id