In this post we’re going to talk about the process of moving from an ETL script to a robust Dagster pipeline using Software-Defined Assets.
- 🧠 What is ETL?
- 💥 What’s wrong with ETL scripts?
- 🧑🏫 Introducing the example
- 🚅 Migrating to Software-Defined Assets
- 🧱 Making your pipeline robust
- 🗓 Scheduling your pipeline
- ♻️ Increase development velocity
- ☁️ Abstracting away storage
- 🧬 Future work
ETL is an acronym for “Extract-Transform-Load,” one of the most common activities data practitioners perform day to day. Broadly, an ETL process consists of three steps:
- Extract: fetch data from some upstream source, like a database, file system, or third-party API.
- Transform: take the extracted data and change it in some way that adds business value.
- Load: take the transformed data and store it in some downstream system (like a database or filesystem) where it can be used by others.
ETL scripts are often called ETL pipelines: what starts out as a single script eventually grows into a series of interdependent scripts that need to run in the correct order. When visualized, this resembles a pipeline.
Usually, ETL pipelines start out as a set of Python scripts that are run via cron. While this is easy to get started with, it can introduce numerous problems.
ETL scripts are easy to write but are hard to maintain as the number of scripts and the complexity of the pipeline grows. They introduce a number of issues:
- Development velocity. Complex ETL pipelines may take many minutes or hours to run, and the entire pipeline must be re-run for every change. In these situations, any change, regardless of how simple or complex, is very expensive to make as the developer needs to continuously re-run the pipeline from start to finish.
- Scheduling. ETL scripts need to be run on a regular basis, often triggered either by an event or by the passage of time. cron works for a while, but as interdependencies between scripts get more complex, it becomes unwieldy (see the example crontab after this list).
- Robustness. ETL scripts can fail and should page their developers. However, ideally an ETL script would attempt to recover from the failure before waking someone up in the middle of the night with a page.
- Testability. Running prototype ETL scripts in production is dangerous and expensive. They should be developed locally with automated testing to improve quality and development velocity. Refactoring ETL scripts to be testable in a local environment can be quite painful.
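To make the scheduling problem concrete, a pipeline of interdependent scripts often ends up in cron as staggered entries like the hypothetical crontab below. The script paths and timings are made up for illustration; the point is that the offsets are guesses about how long each upstream step takes, and nothing enforces the ordering.

```
# Hypothetical crontab: paths and timings are placeholders for illustration.
# The 30-minute gap is a guess at how long extract.py takes; if it runs long
# or fails, transform.py still starts and works on stale or missing data.
0 2 * * * python /opt/etl/extract.py
30 2 * * * python /opt/etl/transform.py
0 3 * * * python /opt/etl/load.py
```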
Introducing a data orchestrator can solve many of these problems. Introducing an asset-oriented data orchestrator is even better. But we’ll get to that in a second.
Let’s consider a simple ETL script example. It will fetch the top 500 Hacker News stories and create a wordcloud visualization of the top headlines.
It’ll have three steps, of course: extract, transform, and load. But before we get to those, let’s create a new Python file called `hackernews.py` and add some imports:
```python
import base64
from io import BytesIO

import matplotlib.pyplot as plt
import pandas as pd
import requests
from wordcloud import STOPWORDS, WordCloud
from tqdm import tqdm
```
Let’s also install the dependencies we need.
```bash
pip install matplotlib pandas requests wordcloud tqdm
```
Next, let’s add our extract step. This will hit the Hacker News API to fetch the IDs of the top 500 stories, then fetch metadata for each story, returning the result as a pandas `DataFrame`.
```python
def extract() -> pd.DataFrame:
    newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    hackernews_topstory_ids = requests.get(newstories_url).json()

    results = []
    for item_id in tqdm(hackernews_topstory_ids):
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

    hackernews_topstories = pd.DataFrame(results)
    return hackernews_topstories
```
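If you want to sanity-check the extract step on its own, a quick option is to call it from a Python shell and peek at a few columns. Note that this makes roughly 500 live requests to the Hacker News API, and the column names below assume the usual story fields are present:

```python
# Quick local check of the extract step; hits the live Hacker News API.
df = extract()
print(df[["title", "score", "by"]].head())
```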
Transform step
Next, we’ll take the data fetched in the extract step and turn it into a wordcloud. I won’t dig too deep into the details here, but we:
- Split the titles into words and remove commonly used words.
- Create a wordcloud using `matplotlib` and the `wordcloud` library.
- Embed the wordcloud in a markdown document using base64 encoding.
Here’s the code.
```python
def transform(hackernews_topstories: pd.DataFrame) -> str:
    # Remove common English words, plus Hacker News-specific prefixes like "Ask HN" and "Show HN"
    stopwords = set(STOPWORDS)
    stopwords.update(["Ask", "Show", "HN"])
    titles_text = " ".join([str(item) for item in hackernews_topstories["title"]])
    titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(
        titles_text
    )

    # Render the wordcloud to an in-memory PNG
    plt.figure(figsize=(8, 8), facecolor=None)
    plt.imshow(titles_cloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)
    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    image_data = base64.b64encode(buffer.getvalue())

    # Embed the PNG in a markdown document as a base64-encoded data URI
    return f"""
# Wordcloud of top Hacker News stories

![wordcloud](data:image/png;base64,{image_data.decode()})
""".strip()
```
Load step
For the final step we write the markdown to a file on disk. In production we may choose to store it somewhere else, like Amazon S3.
```python
def load(md_content: str):
    with open("output.md", "w") as f:
        f.write(md_content)
```
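As a sketch of what that might look like, here’s a variant of the load step that writes to S3 instead of local disk. It assumes `boto3` is installed and AWS credentials are configured; the bucket and key names are placeholders:

```python
import boto3


def load_to_s3(md_content: str, bucket: str = "my-data-bucket", key: str = "hackernews/wordcloud.md"):
    # Upload the markdown document to S3 (bucket and key are hypothetical placeholders).
    s3 = boto3.client("s3")
    s3.put_object(Bucket=bucket, Key=key, Body=md_content.encode("utf-8"))
```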
Running the pipeline
Finally, let’s add some code to pull all the pieces together.
```python
if __name__ == "__main__":
    input = extract()
    output = transform(input)
    load(output)
```
Running the script should produce an `output.md` file containing the wordcloud.
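Concretely, that just means invoking the file with Python:

```bash
python hackernews.py
```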
The first step when migrating an ETL script to Dagster is actually quite simple. Just wrap your entire script in a single software-defined asset. Create `hn_dagster.py` with the following code:
```python
from hackernews import extract, transform, load

from dagster import asset


@asset
def hackernews_wordcloud():
    input = extract()
    output = transform(input)
    load(output)
```
As you can see, we’ve basically just copy-pasted the body of our original script into a single software-defined asset.
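If you want to kick the tires on this asset right away, one option (a sketch, assuming a recent Dagster release installed with `pip install dagster`) is to materialize it in-process using Dagster’s Python API:

```python
from dagster import materialize

from hn_dagster import hackernews_wordcloud

# Materialize the asset in-process; this runs extract, transform, and load
# and writes output.md just like the original script.
result = materialize([hackernews_wordcloud])
assert result.success
```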