Engineering

26 March, 2019

From 0 to DynamicSupervisor —  Handling Processes in Elixir

Elixir processes - Citing Bruce Williams, Elixir doesn’t treat concurrency and process management like afterthoughts or advanced features.

Nuno Marinho

Software Engineer

Handling Elixir Processes: From 0 to DynamicSupervisor (Source: https://ferd.ca/the-zen-of-erlang.html) 

I started my software development career as a Java developer and it had been my language of choice until December of 2016, which was the time when I decided to join Coletiv and started coding in Elixir 😀.

With Java it’s possible to use processes and parallelize them but it isn’t common and it’s always a struggle. One of the use cases I used processes for was when I had to process a big amount of data in a reduced timeframe, but as a rule of thumb I avoided them as much as I could.

With Elixir I found another world that, citing Bruce Williams, doesn’t treat concurrency and process management like afterthoughts or advanced features.

A few months ago Elixir was updated to version 1.6 which introduced the DynamicSupervisor.

DynamicSupervisor in my way

In one of my current projects users have tasks assigned to them and the backend has to notify the user about the status of their tasks every 5 minutes.

In terms of database structure I have users and they have tasks assigned to them.

Database structure Database structure

When it comes to the erlang/elixir process structure we decided to have a GenServer (Task worker) for each active task, whose responsibility is to constantly update the users about their tasks status.

On top of these processes there is a DynamicSupervisor responsible for supervising and working as an interface to communicate with the child processes.

In terms of process lifetime it goes like this:

  • When the application starts all the Task workers are started, one by one, for each active task.

  • When a user adds a new task, a new Task worker is started.

  • When a user completes a task the respective Task worker is killed.

Elixir process tree Elixir process tree

DynamicSupervisor Code

The TaskSupervisor is a DynamicSupervisor and we just need to initialise it (start_link/1 & init/1) and define the functions to start (start_task_worker/1) and finish (terminate_task_worker/2) the workers related to them.

defmodule MyApp.TaskSupervisor do use DynamicSupervisor alias MyApp.Management alias MyApp.WorkerStateStorage def start_link(arg) do DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__) end def init(arg) do DynamicSupervisor.init(arg) end def start_task_worker(task_id) do spec = {MyApp.TaskWorker, %{task_id: task_id}} DynamicSupervisor.start_child(__MODULE__, spec) end def start_all_task_workers() do Enum.each(Management.list_all_tasks(), fn %{id: task_id} -> start_task_worker(task_id) end) end def terminate_task_worker(pid, task_id) do WorkerStateStorage.delete(task_id) Supervisor.terminate_child(__MODULE__, pid) end end

TaskWorker Code

The TaskWorker is a GenServer that verifies every 5 minutes if the task is active and notifies the user that the task is still open. When a task is finished the related Task worker dies.

One important thing is the restart strategy for the GenServer, we are using the :transient strategy so that if the Task worker is abnormally terminated (e.g. crashes for some unexpected reason) it is immediately respawned by the DynamicSupervisor. If the Task worker is normally terminated (e.g. when the task is finished) it is not respawned.

defmodule MyApp.TaskWorker do use GenServer, restart: :transient alias MyApp.Notify alias MyApp.Management alias MyApp.WorkerStateStorage # client def start_link(state) do GenServer.start_link(__MODULE__, state) end # server def init(state) do state = Map.put(state, :pid, :erlang.pid_to_list(self())) schedule_notify_task(300000) case WorkerStateStorage.save(state) do true -> {:ok, state} false -> case WorkerStateStorage.get(state.task_id) do nil -> {:ok, state} pid -> stop(:erlang.list_to_pid(pid), state.task_id) WorkerStateStorage.save(state) {:ok, state} end end end def handle_info(:schedule_notify_task, state) do case notify(state) do true -> stop(self(), state.task_id) false -> # Reschedule once more schedule_notify_task(300000) end {:noreply, state} end def handle_cast( {:notify_task}, state ) do case notify(state) do true -> WorkerStateStorage.delete(state.task_id) {:stop, :normal, state} false -> {:noreply, state} end end defp schedule_notify_task(time) do Process.send_after(self(), :schedule_notify_task, time) end defp notify(%{task_id: task_id, pid: pid} = state) do task = Management.get_task!(task_id) case task.is_open do false -> Notify.notify(task) true -> true end end defp stop(pid, task_id) do WorkerStateStorage.delete(task_id) if Process.alive?(pid) do Process.exit(pid, :normal) end end end

StarterWorker Code

In order to start all the workers when the application starts we created a StarterWorker that whose single responsibility is to start all Task workers. One Task worker per active Task.

defmodule MyApp.StarterWorker do @moduledoc """ Start lost children """ use GenServer def start_link() do GenServer.start_link(__MODULE__, []) end def init(state) do :ets.new(:task_backup, [:set, :public, :named_table]) MyApp.Supervisor.start_all_task_workers() {:ok, state} end end

Lesson learned

Like me, when you start with Elixir you might be afraid of using processes because of the experience you had with previous languages, instead give it a try and you will be surprised how quick and easy it is to work with them.

There are a few great links you should check when trying processes:

Elixir processes everywhere Generated at https://memegenerator.net

Elixir

Software Development

DynamicSupervisor

Process

Erlang

Join our newsletter

Be part of our community and stay up to date with the latest blog posts.

Subscribe

Join our newsletter

Be part of our community and stay up to date with the latest blog posts.

Subscribe

You might also like...

Go back to blogNext
How to support a list of uploads as input with Absinthe GraphQL

Engineering

26 July, 2022

How to support a list of uploads as input with Absinthe GraphQL

As you might guess, in our day-to-day, we write GraphQL queries and mutations for Phoenix applications using Absinthe to be able to create, read, update and delete records.

Nuno Marinho

Software Engineer

Flutter Navigator 2.0 Made Easy with Auto Router - Coletiv Blog

Engineering

04 January, 2022

Flutter Navigator 2.0 Made Easy with Auto Router

If you are a Flutter developer you might have heard about or even tried the “new” way of navigating with Navigator 2.0, which might be one of the most controversial APIs I have seen.

António Valente

Software Engineer

Enabling PostgreSQL cron jobs on AWS RDS - Coletiv Blog

Engineering

04 November, 2021

Enabling PostgreSQL cron jobs on AWS RDS

A database cron job is a process for scheduling a procedure or command on your database to automate repetitive tasks. By default, cron jobs are disabled on PostgreSQL instances. Here is how you can enable them on Amazon Web Services (AWS) RDS console.

Nuno Marinho

Software Engineer

Go back to blogNext