Engineering

20 March, 2019

How we scraped and aggregated data from multiple sources using OTP

How to scrape and aggregate ever-growing amounts of data from multiple sources into a comprehensive dashboard using Elixir OTP.

Tiago Duarte

Software Engineer


Just like in the documentary series “Big, Bigger, Biggest”, we are going to show the different versions we developed on a real project, alongside the pitfalls and breakthroughs of each one, to scrape and aggregate ever-growing amounts of data from multiple sources and feed that data into a comprehensive dashboard.

Challenge

Earlier this year we were challenged by Backercamp with a very interesting project, very different from the usual CRUD-based applications. To give you some context, Backercamp’s mission is to help creators of any kind — from new startups to established companies — bring products to market through crowdfunding platforms like Kickstarter or Indiegogo. Their aim is to bring performance and efficiency to crowdfunding by combining custom-made software with their dedicated service.

Being a performance-driven business, Backercamp faced a problem: “how do we track and report to creators the progress and success of our work so that we increase both transparency and trust?”

One of the many ways Backercamp increases the sales of the creator’s product is by using online advertisement on social networks like Facebook.

When it comes to online advertisement, one of the most used metrics to track success is the so-called RoAS, an acronym for “Return on Advertising Spend”. The RoAS tracks the income a creator gets back from every unit of currency (e.g. euro or dollar) invested in advertisement.
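As a concrete example: if $1,000 spent on ads brings in $4,000 in pledges attributable to those ads, the RoAS is 4,000 / 1,000 = 4, meaning every dollar invested in advertisement came back fourfold.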

With many successfully backed projects under their belt, and with all the experience accumulated from talking with creators about this problem, they came up with the idea of creating a dashboard for each backer.

The dashboard would display aggregated data coming from both the crowdfunding platform and the advertising platform. This would allow live reporting and tracking of many metrics, including the aforementioned RoAS.

In technical terms, this meant that we would need to constantly fetch and aggregate data from sources that we don’t control and that, in some cases, didn’t even have an open API.

We had never done such a project, so we went to the blackboard and started scribbling down possible solutions. Before we could reach any conclusions we had to get our heads back into the Elixir documentation so that we could better understand the OTP design principles, namely Supervisors and GenServers, and explored several resources on the topic.

V1 Source-oriented approach

V1 Solution — Source-oriented approach

Our initial approach was to scrape each source (e.g. Kickstarter) on its own GenServer. Each GenServer would, every 4 hours, fetch all the projects from the database and, project by project, fetch the correspondent source data.
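To make this concrete, here is a minimal sketch of what one of these V1 source workers could have looked like. The module and function names are our own illustration, not the actual project code; only Management.list_all_project_for_scrape/0 is borrowed from the V2 code shown later.

defmodule Backercamp.Scraper.KickstarterWorker do
  # Hypothetical V1 worker: one GenServer per data source, scraping every
  # project sequentially every 4 hours. Names are illustrative.
  use GenServer

  @four_hours :timer.hours(4)

  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(state) do
    schedule_scrape()
    {:ok, state}
  end

  def handle_info(:scrape_all, state) do
    # Fetch every project from the database and scrape them one by one.
    # A single unhandled error here crashes the whole worker.
    Enum.each(Backercamp.Management.list_all_project_for_scrape(), fn project ->
      scrape_project(project)
    end)

    schedule_scrape()
    {:noreply, state}
  end

  defp schedule_scrape() do
    Process.send_after(self(), :scrape_all, @four_hours)
  end

  defp scrape_project(_project) do
    # call the source's endpoints and persist the data
    :ok
  end
end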

Problems

What happens when fetching the data for a project fails, causing the GenServer to crash?

With this approach the GenServer would die. Then, because the worker’s restart option was set to :permanent, the Supervisor would restart the GenServer, which would try to sequentially scrape all the projects again. Since the code had not changed, the same error would happen again, crashing the GenServer and restarting the whole cycle over and over.

Being noobs and going against the mantra “let it crash” from the Zen of Erlang, we tried to handle all the errors. The problem was that we depended on external services, which meant we could not fully control the data and errors we were receiving, so this was a dead end.

The duration of the scrapes kept growing and growing

As soon as we added more projects to the platform, and because the scrapes were sequential, the data sources started to block us by returning rate limit errors.

Once again, we didn’t take the time to step back from the problem and instead threw more code at it. We decided to use the ElixirRetry dependency, which provides an exponential backoff algorithm.
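For reference, wrapping a request with ElixirRetry looks roughly like this. It is a sketch: fetch_project_data/1 is a hypothetical stand-in for the real HTTP call, and the delay policy shown here is just an example, not necessarily the one we used.

defmodule Backercamp.Scraper.RetryExample do
  use Retry

  def fetch_with_backoff(project_id) do
    # Retry on :error / {:error, _} results with exponentially growing,
    # randomized delays, capped at 30s per attempt and giving up after 2min.
    retry with: exponential_backoff() |> randomize() |> cap(30_000) |> expiry(120_000) do
      fetch_project_data(project_id)
    after
      result -> result
    else
      error -> error
    end
  end

  defp fetch_project_data(_project_id) do
    # hypothetical call to the external API; returns {:ok, data} or {:error, reason}
    {:error, :rate_limited}
  end
end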

We shot ourselves in the foot: this heavily increased the time each scrape took to complete, with the risk of not even finishing all the scrapes within the 4-hour window we had.

What if we wanted to manually scrape a single project?

At a certain point we wanted to allow the admin users to trigger scrapes for a single project so that they could update the data shown in the dashboard, but with the current solution, we could only scrape all projects at once.

This was the final nail in the coffin of the current solution and so we decided to get back to the drawing board.

V2 Project-oriented solution, a GenServer per project

V2 Project-oriented solution, a GenServer per project

Luckily, by the time we were drawing up alternatives to our initial version, the DynamicSupervisor had been released in Elixir v1.6.

Making use of this new capability, we decided to create a project-oriented solution where each project has a GenServer responsible for scraping its data every 4 hours. This allowed us to contain the damage of a crash when scraping a project: if the GenServer crashed, it would not affect other projects.

The solution also allowed us to parallelize and distribute the scrapes throughout the 4-hour window, which drastically reduced the number of rate limit errors thrown by our data sources. And finally, we were able to manually trigger scrapes on a single project.

Let’s take a look at 2 of our most common use cases: adding a new project and archiving a project. For simplification purposes, we abstracted many components that play a role in the whole flow (e.g. views, controllers, context, …).

Adding a new project

When an admin adds a new project, the dynamic supervisor starts a new child process, passing it the project_id that the process will be responsible for. The process immediately stores a new key-value pair in Erlang Term Storage (ETS), with the key being the project_id and the value its Process ID (PID), and triggers an initial scrape.

The data stored in ETS will later be used when an admin user archives a project or triggers a scrape for a single project.

Add project flow
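The WorkerStateStorage module referenced in the code further down is a thin wrapper around ETS. It is not shown in the original code, so here is a minimal sketch of what it could look like, assuming a public named table keyed by project_id; the real implementation may differ.

defmodule Backercamp.Scraper.WorkerStateStorage do
  # Minimal sketch of the ETS-backed storage that maps a project_id to the
  # PID of the worker scraping it.
  @table :scraper_worker_state

  # Must be called once by a long-lived process, since ETS tables die with
  # their owner.
  def create_table() do
    :ets.new(@table, [:set, :public, :named_table])
  end

  def save(%{project_id: project_id, pid: pid}) do
    :ets.insert(@table, {project_id, pid})
  end

  def get(project_id) do
    case :ets.lookup(@table, project_id) do
      [{^project_id, pid}] -> {:ok, pid}
      [] -> {:error, :not_found}
    end
  end

  def delete(project_id) do
    :ets.delete(@table, project_id)
  end
end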

Archiving a project

When an admin archives a project the flow is basically the reverse of adding a new project: the application retrieves the PID responsible for the project, the DynamicSupervisor then deletes the entry from ETS and, using the retrieved PID, terminates the child process responsible for the project.

Archive project flow

Triggering a scrape on a single project

The mechanism for scraping a single project is similar: the admin triggers a scrape, the application retrieves the PID responsible for the project, and the DynamicSupervisor then, using the retrieved PID, sends a message (:scrape_project) to the child process responsible for the project.

The code responsible for the Supervisor and Project Worker can be seen as follows:

defmodule Backercamp.Scraper.Supervisor do
  use DynamicSupervisor

  alias Backercamp.Management
  alias Backercamp.Scraper.WorkerStateStorage

  def start_link(arg) do
    DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(arg) do
    # arg is expected to carry the supervision options (e.g. strategy: :one_for_one)
    DynamicSupervisor.init(arg)
  end

  def start_project_worker(project_id) do
    spec = {Backercamp.Scraper.ProjectWorker, %{project_id: project_id}}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def start_all_project_workers() do
    Enum.each(Management.list_all_project_for_scrape(), fn %{id: project_id} ->
      start_project_worker(project_id)
    end)
  end

  def scrape_project(pid) do
    GenServer.cast(pid, {:scrape_project})
  end

  def terminate_project_process(pid, project_id) do
    WorkerStateStorage.delete(project_id)
    DynamicSupervisor.terminate_child(__MODULE__, pid)
  end
end

defmodule Backercamp.Scraper.ProjectWorker do
  @moduledoc """
  Use this GenServer to scrape the data of a single Backercamp project
  """

  use GenServer, restart: :transient

  alias Backercamp.Management
  alias Backercamp.Management.{AdminSettings, Project}
  alias Backercamp.Scraper
  alias Backercamp.Scraper.WorkerStateStorage

  # client

  def start_link(state) do
    GenServer.start_link(__MODULE__, state)
  end

  # server

  def init(state) do
    # save the pid so the worker can later be found through WorkerStateStorage
    state = Map.put(state, :pid, :erlang.pid_to_list(self()))

    # schedule the first scrape
    schedule_scrape_project(AdminSettings.get_source_initial_scrape_delay_ms())

    WorkerStateStorage.save(state)

    {:ok, state}
  end

  def handle_info(:schedule_scrape_project, state) do
    case scrape(state) do
      true ->
        stop(self(), state.project_id)

      false ->
        # reschedule once more
        schedule_scrape_project(AdminSettings.get_source_scrape_frequency_ms())
    end

    {:noreply, state}
  end

  def handle_cast({:scrape_project}, state) do
    case scrape(state) do
      true ->
        WorkerStateStorage.delete(state.project_id)
        {:stop, :normal, state}

      false ->
        {:noreply, state}
    end
  end

  defp schedule_scrape_project(time) do
    Process.send_after(self(), :schedule_scrape_project, time)
  end

  defp scrape(%{project_id: project_id, pid: pid} = state) do
    # ...
  end

  defp scrape(%Project{} = project, %{project_id: project_id, pid: pid}) do
    # ...
  end

  defp stop(pid, project_id) do
    WorkerStateStorage.delete(project_id)

    if Process.alive?(pid) do
      Process.exit(pid, :normal)
    end
  end
end
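Put together, the three admin flows end up as thin calls into the Supervisor’s public functions above. A sketch, assuming the WorkerStateStorage.get/1 lookup from the earlier sketch and a hypothetical project variable coming from the admin action:

# The worker stores its PID as a charlist (:erlang.pid_to_list/1), so it is
# converted back with :erlang.list_to_pid/1 before being used.

# Adding a new project: start a worker for it.
Backercamp.Scraper.Supervisor.start_project_worker(project.id)

# Manually triggering a scrape: look up the worker PID and cast to it.
with {:ok, stored_pid} <- Backercamp.Scraper.WorkerStateStorage.get(project.id) do
  stored_pid
  |> :erlang.list_to_pid()
  |> Backercamp.Scraper.Supervisor.scrape_project()
end

# Archiving a project: look up the worker PID and terminate it.
with {:ok, stored_pid} <- Backercamp.Scraper.WorkerStateStorage.get(project.id) do
  stored_pid
  |> :erlang.list_to_pid()
  |> Backercamp.Scraper.Supervisor.terminate_project_process(project.id)
end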

Problems

How can we get rid of the starter worker?

When the application starts, we need to immediately start all project workers under the supervision of the Scraper Supervisor. At first, we tried to start the project workers in the init callback of the Supervisor, but we would get the following error:

[info] Application Backercamp exited: Backercamp.Application.start(:normal, []) returned an error: shutdown: failed to start child: Backercamp.Scraper.Supervisor
    ** (EXIT) exited in: GenServer.call(Backercamp.Scraper.Supervisor, {:start_child, {{Backercamp.Scraper.ProjectWorker, :start_link, [%{project_id: 30}]}, :transient, 5000, :worker, [Backercamp.Scraper.ProjectWorker]}}, :infinity)
        ** (EXIT) process attempted to call itself

To solve this problem we created a GenServer that is started after the Supervisor and whose only responsibility is to start all the project workers.
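A sketch of that starter process (the module name is ours, not the actual project code): it does nothing but kick off the project workers once it is up, after the Supervisor is already running.

defmodule Backercamp.Scraper.Starter do
  # Hypothetical name for the GenServer that starts all project workers when
  # the application boots. It must be placed after Backercamp.Scraper.Supervisor
  # in the application's children list.
  use GenServer

  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(state) do
    # Defer the work so init/1 returns immediately and application start-up
    # is not blocked by the scrapes.
    send(self(), :start_workers)
    {:ok, state}
  end

  def handle_info(:start_workers, state) do
    Backercamp.Scraper.Supervisor.start_all_project_workers()
    {:noreply, state}
  end
end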

Would it be possible to simplify the current archival mechanism?

In order to manually archive a project, we resorted to ETS to store the project_id and the PID of the process responsible for scraping its data. Could there be a simpler way of implementing this feature?

Different data from the same source requires different scrape frequencies

Project requirements evolve over time and so it happened in our case.

Users were asking for certain data points to be updated more often, but with the current solution that was not possible: we could only fetch all the data at once for a given project, and we couldn’t tell the system to fetch just a single data point for all projects.

This was when we, once again, had to improve the current solution. This time the change was not so drastic and we ended up mixing the ideas from the previous 2 versions.

V3 A mix of the previous solutions

V3 - Mix of previous solutions

Picking up on the initial idea of having a source worker per platform, we simply added a source worker responsible for fetching the needed data point at a much higher frequency (every 5 minutes).

If this worker fails, no big problems arise, as the regular scrapers still work independently.
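Here is a sketch of such a high-frequency source worker. All names are illustrative; the point is simply a long-running GenServer that refreshes a single data point for every project on a 5-minute schedule, independently of the per-project workers.

defmodule Backercamp.Scraper.HighFrequencySourceWorker do
  # Illustrative V3 worker: one GenServer per source that refreshes a single
  # data point for all projects much more often than the per-project workers.
  use GenServer

  @five_minutes :timer.minutes(5)

  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(state) do
    schedule_refresh()
    {:ok, state}
  end

  def handle_info(:refresh, state) do
    Enum.each(Backercamp.Management.list_all_project_for_scrape(), fn project ->
      refresh_data_point(project)
    end)

    schedule_refresh()
    {:noreply, state}
  end

  defp schedule_refresh() do
    Process.send_after(self(), :refresh, @five_minutes)
  end

  defp refresh_data_point(_project) do
    # fetch and persist the single data point users want updated more often
    :ok
  end
end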

What will the future hold?

Snapshot of the dashboard with fake data

At the time of writing, we haven’t hit any big drawbacks with the current solution and the feedback from our dashboard users has been great. But from experience we know that projects evolve, more features are added and current ones change, which may lead to newer versions of the current architecture.

Also, we are still learning and experimenting a lot with the Elixir / Phoenix ecosystem which keeps giving us new ideas and improvements to our projects.

Lately, we have been using channels a lot and are moving more and more towards a near-real-time experience for our users. Our end goal is to provide a fully reactive dashboard that shows data with a delay measured in seconds. We know it is a big aim, but we also know how important data is for the decisions our users make. Some of them bet their lives on their creations, and it is always a pleasure, and also a responsibility, to provide them with the best tool we can.

If you are launching a product on Kickstarter or Indiegogo you should definitely talk with Backercamp, they will provide all the help you need!

Elixir

Software Development

OTP

GenServer

