How to support a list of uploads as input with Absinthe GraphQL
I started my software development career as a Java developer and it had been my language of choice until December of 2016, which was the time when I decided to join Coletiv and started coding in Elixir 😀.
With Java it’s possible to use processes and run them in parallel, but it isn’t common and it’s always a struggle. One of the use cases I used processes for was processing a large amount of data in a short timeframe, but as a rule of thumb I avoided them as much as I could.
With Elixir I found another world that, citing Bruce Williams, doesn’t treat concurrency and process management like afterthoughts or advanced features.
A few months ago Elixir was updated to version 1.6 which introduced the DynamicSupervisor.
In one of my current projects, users have tasks assigned to them and the backend has to notify each user about the status of their tasks every 5 minutes.
In terms of database structure I have users and they have tasks assigned to them.
When it comes to the Erlang/Elixir process structure, we decided to have a GenServer (Task worker) for each active task, whose responsibility is to keep the user updated about the status of that task.
On top of these processes there is a DynamicSupervisor responsible for supervising the child processes and working as an interface to communicate with them.
In terms of process lifetime it goes like this:
When the application starts, a Task worker is started for each active task, one by one.
When a user adds a new task, a new Task worker is started.
When a user completes a task, the respective Task worker is killed.
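The lifetime described above can be sketched with a bare DynamicSupervisor. This is only an illustration under assumptions: `LifecycleDemo` and `LifecycleDemo.Worker` are hypothetical names, the task ids are made up, and the worker does nothing except hold its task id.

```elixir
# Hypothetical worker used only for this sketch; it just holds the task id.
defmodule LifecycleDemo.Worker do
  use GenServer, restart: :transient

  def start_link(task_id), do: GenServer.start_link(__MODULE__, task_id)

  @impl true
  def init(task_id), do: {:ok, task_id}
end

defmodule LifecycleDemo do
  def run do
    # A DynamicSupervisor always starts with no children.
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

    # Application start: one worker per active task (made-up ids).
    pids =
      for task_id <- [1, 2] do
        {:ok, pid} = DynamicSupervisor.start_child(sup, {LifecycleDemo.Worker, task_id})
        pid
      end

    # A user adds a new task: one more worker is started.
    {:ok, new_pid} = DynamicSupervisor.start_child(sup, {LifecycleDemo.Worker, 3})
    after_add = DynamicSupervisor.count_children(sup).active

    # A user completes a task: the respective worker is terminated.
    :ok = DynamicSupervisor.terminate_child(sup, new_pid)
    after_complete = DynamicSupervisor.count_children(sup).active

    {length(pids), after_add, after_complete}
  end
end
```

Calling `LifecycleDemo.run()` returns the number of workers started at boot, the active count after adding a task, and the active count after completing it.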
The TaskSupervisor is a DynamicSupervisor, and we just need to initialise it (start_link/1 & init/1) and define the functions to start (start_task_worker/1) and finish (terminate_task_worker/2) its workers.
defmodule MyApp.TaskSupervisor do
  use DynamicSupervisor

  alias MyApp.Management
  alias MyApp.WorkerStateStorage

  def start_link(arg) do
    DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  # arg is passed on as the supervisor options, e.g. [strategy: :one_for_one]
  def init(arg) do
    DynamicSupervisor.init(arg)
  end

  # Starts a Task worker for the given task
  def start_task_worker(task_id) do
    spec = {MyApp.TaskWorker, %{task_id: task_id}}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  # Starts one Task worker per task returned by Management.list_all_tasks/0
  def start_all_task_workers() do
    Enum.each(Management.list_all_tasks(), fn %{id: task_id} ->
      start_task_worker(task_id)
    end)
  end

  # Removes the stored worker state and stops the worker
  def terminate_task_worker(pid, task_id) do
    WorkerStateStorage.delete(task_id)
    DynamicSupervisor.terminate_child(__MODULE__, pid)
  end
end
The TaskWorker is a GenServer that checks every 5 minutes whether the task is still active and, if so, notifies the user that the task is still open. When a task is finished the related Task worker dies.
One important thing is the restart strategy for the GenServer: we are using the :transient restart value so that if the Task worker is abnormally terminated (e.g. crashes for some unexpected reason) it is immediately respawned by the DynamicSupervisor. If the Task worker is normally terminated (e.g. when the task is finished) it is not respawned.
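To see the difference between an abnormal and a normal termination under :transient, here is a small sketch. The `TransientDemo` modules, the registered names `:crashy` and `:finished`, and the `{:stop, reason}` message are all hypothetical and exist only for this illustration.

```elixir
defmodule TransientDemo.Worker do
  use GenServer, restart: :transient

  # The worker registers itself under the given name so it can be looked up later.
  def start_link(name), do: GenServer.start_link(__MODULE__, :ok, name: name)

  @impl true
  def init(:ok), do: {:ok, nil}

  # Stops the worker with whatever reason the caller asks for.
  @impl true
  def handle_cast({:stop, reason}, state), do: {:stop, reason, state}
end

defmodule TransientDemo do
  def run do
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
    {:ok, _} = DynamicSupervisor.start_child(sup, {TransientDemo.Worker, :crashy})
    {:ok, _} = DynamicSupervisor.start_child(sup, {TransientDemo.Worker, :finished})

    # Abnormal exit: the supervisor restarts the worker.
    GenServer.cast(:crashy, {:stop, :boom})
    # Normal exit: the supervisor leaves the worker stopped.
    GenServer.cast(:finished, {:stop, :normal})

    # Give the supervisor a moment to react.
    Process.sleep(200)

    {is_pid(Process.whereis(:crashy)), is_pid(Process.whereis(:finished))}
  end
end
```

After `TransientDemo.run()`, the worker that exited with `:boom` is registered again under its name (a crash report is logged along the way), while the one that exited with `:normal` is gone.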
defmodule MyApp.TaskWorker do
  use GenServer, restart: :transient

  alias MyApp.Notify
  alias MyApp.Management
  alias MyApp.WorkerStateStorage

  # 5 minutes, in milliseconds
  @notify_interval 300_000

  # client

  def start_link(state) do
    GenServer.start_link(__MODULE__, state)
  end

  # server

  def init(state) do
    # Keep the pid as a charlist so it can be saved in the state storage
    state = Map.put(state, :pid, :erlang.pid_to_list(self()))
    schedule_notify_task(@notify_interval)

    case WorkerStateStorage.save(state) do
      true ->
        {:ok, state}

      false ->
        # An entry for this task already exists
        case WorkerStateStorage.get(state.task_id) do
          nil ->
            {:ok, state}

          pid ->
            # Stop the previous worker and take over its entry
            stop(:erlang.list_to_pid(pid), state.task_id)
            WorkerStateStorage.save(state)
            {:ok, state}
        end
    end
  end

  def handle_info(:schedule_notify_task, state) do
    case notify(state) do
      true ->
        # Task finished: stop normally so the :transient worker is not respawned
        WorkerStateStorage.delete(state.task_id)
        {:stop, :normal, state}

      false ->
        # Reschedule once more
        schedule_notify_task(@notify_interval)
        {:noreply, state}
    end
  end

  def handle_cast({:notify_task}, state) do
    case notify(state) do
      true ->
        WorkerStateStorage.delete(state.task_id)
        {:stop, :normal, state}

      false ->
        {:noreply, state}
    end
  end

  defp schedule_notify_task(time) do
    Process.send_after(self(), :schedule_notify_task, time)
  end

  # Returns true when the task is finished and false while it is still open
  defp notify(%{task_id: task_id}) do
    task = Management.get_task!(task_id)

    case task.is_open do
      true ->
        Notify.notify(task)
        false

      false ->
        true
    end
  end

  # Only used from init/1, to stop a previous worker for the same task
  defp stop(pid, task_id) do
    WorkerStateStorage.delete(task_id)

    if Process.alive?(pid) do
      # An exit signal with reason :normal is ignored by another process;
      # :shutdown stops it, and a :transient child is not respawned after it
      Process.exit(pid, :shutdown)
    end
  end
end
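The worker relies on WorkerStateStorage, which is not shown in this post. Judging only from how it is called (save/1 returning true or false, get/1 returning nil or the stored pid, delete/1), one possible shape is a thin wrapper around an ETS table. The sketch below is a guess, not the real module: the module name `StorageSketch`, the table name `:task_backup_sketch` and the `init_table/0` helper are assumptions.

```elixir
defmodule StorageSketch do
  # Hypothetical table name, used only in this sketch.
  @table :task_backup_sketch

  # Creates a public named table so any process can read and write it.
  def init_table do
    :ets.new(@table, [:set, :public, :named_table])
  end

  # Returns true if the state was stored, false if the task already has an entry.
  def save(%{task_id: task_id, pid: pid}) do
    :ets.insert_new(@table, {task_id, pid})
  end

  # Returns the stored pid (as it was saved) or nil when there is no entry.
  def get(task_id) do
    case :ets.lookup(@table, task_id) do
      [{^task_id, pid}] -> pid
      [] -> nil
    end
  end

  # Removes the entry for the task, if any.
  def delete(task_id), do: :ets.delete(@table, task_id)
end
```

With `:ets.insert_new/2`, a second save for the same task id returns false, which matches the branch in init/1 that looks up and replaces a previous worker.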
In order to start all the workers when the application starts, we created a StarterWorker whose single responsibility is to start all Task workers, one Task worker per active Task.
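defmodule MyApp.StarterWorker do
  @moduledoc """
  Start lost children
  """

  use GenServer

  # Accepts an optional argument so the default child_spec/1,
  # which calls start_link/1, also works
  def start_link(_arg \\ []) do
    GenServer.start_link(__MODULE__, [])
  end

  def init(state) do
    # Public named ETS table, created before any Task worker is started
    :ets.new(:task_backup, [:set, :public, :named_table])
    MyApp.TaskSupervisor.start_all_task_workers()
    {:ok, state}
  end
end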
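For this to work, the dynamic supervisor has to be running before the starter process, which in practice means listing it first in the application's children. The post does not show the application module, so the following is only a sketch under assumptions: the `BootDemo` names are hypothetical, the task ids are made up, and a plain Agent stands in for the Task worker.

```elixir
defmodule BootDemo.Starter do
  use GenServer

  def start_link(task_ids), do: GenServer.start_link(__MODULE__, task_ids)

  @impl true
  def init(task_ids) do
    # The dynamic supervisor is already running because it is listed first.
    for id <- task_ids do
      {:ok, _pid} =
        DynamicSupervisor.start_child(BootDemo.TaskSupervisor, {Agent, fn -> id end})
    end

    {:ok, task_ids}
  end
end

defmodule BootDemo do
  def run do
    children = [
      # Children are started in order, so the supervisor comes before the starter.
      {DynamicSupervisor, strategy: :one_for_one, name: BootDemo.TaskSupervisor},
      {BootDemo.Starter, [1, 2, 3]}
    ]

    {:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

    # Number of workers that the starter brought up.
    DynamicSupervisor.count_children(BootDemo.TaskSupervisor).active
  end
end
```

If the two children were listed the other way around, the starter's init/1 would call a supervisor that does not exist yet and the application would fail to boot.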
Like me, when you start with Elixir you might be afraid of using processes because of your experience with previous languages. Give them a try instead and you will be surprised how quick and easy it is to work with them.
There are a few great links you should check when trying processes: