Data pipelines with Spotify's Luigi
Hi!
At Wonderflow we do a lot of ML / NLP in Python, and recently we've been enjoying writing data pipelines with Spotify's Luigi.
It's easy to find lots of posts about Luigi online, especially comparisons with other tools like Apache Airflow.
In this post I'll try to explain how we're using Luigi at Wonderflow, what issues we found and how we handled them.
When it comes to running computationally expensive analyses on data, state management is very important.
Say you have a generic computation wrapped in a function like the following:
def compute():
    # step 1
    data = fetch_data()
    # step 2
    data_1 = heavy_computation_1(data, a=1, b=2, c=3)
    # step 3
    data_2 = heavy_computation_2(data_1, x=1, y=2, z=3)
    return data_2
There are 3 main steps here and 2 of them are quite heavy to compute.
Imagine now that you are tuning the x, y and z parameters of step 3, and you find yourself running this function over and over with different values, checking every time whether the output suits you.
This might seem the fastest way to do it, but you're actually wasting a lot of time: to check the result of heavy_computation_2, you keep re-running fetch_data and heavy_computation_1 on (probably) the same data.
You can apply workarounds, but even if you end up with a complete pipeline scheduled by an orchestrator, you might still do a lot of useless work (i.e. doing the same things on the same data many times).
We ran into this kind of issue and needed to find a clean solution.
As you may have already found out, Luigi has been designed to solve issues like the one above.
Let's rewrite the task as a Luigi pipeline:
import luigi
from luigi.local_target import LocalTarget
from fetching import fetch_data
from processing import heavy_computation_1, heavy_computation_2
from json import dumps, loads


class FetchData(luigi.Task):
    def run(self):
        data = fetch_data()
        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget("output/data.json")


class HeavyComputationOne(luigi.Task):
    a = luigi.IntParameter(default=42)
    b = luigi.IntParameter(default=42)
    c = luigi.IntParameter(default=42)

    def requires(self):
        return FetchData()

    def run(self):
        with self.input().open('r') as infile:
            input_data = loads(infile.read())
        data = heavy_computation_1(input_data, a=self.a, b=self.b, c=self.c)
        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget(f"output/processed_data_1-{self.a}-{self.b}-{self.c}.json")


class HeavyComputationTwo(luigi.Task):
    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)

    def requires(self):
        return HeavyComputationOne(a=1, b=2, c=3)

    def run(self):
        with self.input().open('r') as infile:
            input_data = loads(infile.read())
        data = heavy_computation_2(input_data, x=self.x, y=self.y, z=self.z)
        with self.output().open("w") as outfile:
            outfile.write(dumps(data))

    def output(self):
        return LocalTarget(f"output/processed_data_2-{self.x}-{self.y}-{self.z}.json")
The structure of a Luigi Task is quite simple: you write a Python (sub)class with 3 methods:
- requires(), which defines the task's requirements (if any)
- run(), which contains the task's business logic
- output(), which defines the kind of output (in this case files on the local filesystem, but they could be e.g. files on S3, among many other things)
As you can see, each task has a defined output file whose name reflects its input parameters. This matters because Luigi tasks are meant to be idempotent: Luigi considers a task complete when its output exists, so it will not run the same task with the same output twice.
Now you can run the HeavyComputationTwo task from the CLI (yes, Luigi automatically generates the CLI for you) N times, using different values for the x, y and z parameters. Luigi will run the requirements (HeavyComputationOne and FetchData) only once.
So no more wasting time running unneeded tasks :-) And you can focus on tuning your task's logic.
Moreover, this makes the pipeline stateful, fixing the issue we had with the compute() method.
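To make the mechanism concrete, here is a minimal, Luigi-free sketch of the underlying idea: a step is skipped whenever its output file already exists. The `cached_step` helper, the fake data and the file names are all hypothetical, and this is not how Luigi is implemented internally; it only illustrates the principle.

```python
import json
import tempfile
from pathlib import Path

# Counts how many times each step actually had to compute something.
calls = {"fetch": 0, "step_1": 0, "step_2": 0}


def cached_step(path, name, compute):
    """Run compute() only if `path` does not exist yet, then return its data."""
    if not path.exists():
        calls[name] += 1
        path.write_text(json.dumps(compute()))
    return json.loads(path.read_text())


def pipeline(out_dir, x):
    # File names encode the parameters, so each parameter set gets its own output.
    data = cached_step(out_dir / "data.json", "fetch", lambda: [1, 2, 3])
    data_1 = cached_step(
        out_dir / "processed_1-1-2-3.json", "step_1", lambda: [v * 2 for v in data]
    )
    return cached_step(
        out_dir / f"processed_2-{x}.json", "step_2", lambda: [v + x for v in data_1]
    )


out_dir = Path(tempfile.mkdtemp())
# Tune only the last step: the two upstream steps are computed a single time.
results = [pipeline(out_dir, x) for x in (1, 2, 3)]
```

After the three runs, `calls` shows that the fetch and the first computation ran once each, while only the last step ran for every value of `x`.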
After trying Luigi for a bit, we initially thought we had found the silver bullet for our needs, but like everything, it has pros and cons.
Let's go through some of them.
You may have noticed that Luigi is heavily coupled with your code (e.g. the main task class is a subclass of a Luigi class). Mainly for this reason, lots of people say that Luigi pipelines are hard to test.
Since we do TDD on almost every kind of software at Wonderflow, we wanted to make sure that TDD is still possible with Luigi.
I think it's true that Luigi is heavily coupled with your code, but I don't think this makes pipelines or tasks hard to test.
Here's the testing pattern we're using.
When you write a test, you usually follow the Given-When-Then structure:
def test_something():
    # Given an initial situation
    # When something happens
    # Then there's an outcome
    pass
Imagine we want to test the FetchData task (ideally we write the test first!):
import luigi
from tasks import FetchData
from json import loads


def setup_db():
    # here we set up a local test DB with test data
    pass


def test_fetch_data_task():
    setup_db()
    task = FetchData()
    assert luigi.build([task], local_scheduler=True)
    assert task.output().exists()
    data = loads(task.output().open("r").read())
    assert len(data) > 0
    # other assertions (if needed)
To recap:
- We set up a local database and instantiate the FetchData task
- We run the task and ensure the output exists
- We read the task's output and make needed assertions on it
This is quite effective and nice to read, perfectly following the Given-When-Then pattern.
You may have noticed that this is an integration test, which indirectly covers the fetch_data() method.
As a best practice, you should unit test all the methods you use inside the main run() one, since they can also be used outside a Luigi task.
So, since FetchData calls fetch_data(), you need to unit test it as well :-)
If all the methods called inside the task are unit tested, and you test the whole task (or pipeline, if task has requirements) as described above, you're basically doing a nice unit + integration test combo, which is usually enough to avoid code regression.
Considering the pipeline above:
FetchData -> HeavyComputationOne -> HeavyComputationTwo
You may have noticed that I've hardcoded the a, b and c params in the requirements of the HeavyComputationTwo task.
So, if you have to run HeavyComputationTwo and you need to specify different a, b or c values in addition to the x, y and z ones, do you need to declare the HeavyComputationOne parameters again?
class HeavyComputationTwo(luigi.Task):
    a = luigi.IntParameter(default=42)
    b = luigi.IntParameter(default=42)
    c = luigi.IntParameter(default=42)
    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)
    # ...
The answer is: of course not. If you do that, the code becomes a lot less readable, and if the params change, you have to change them in every chained task.
Fortunately, Luigi comes with a very handy inherits decorator in its util module.
So you simply have to decorate the HeavyComputationTwo class, specifying that it inherits from HeavyComputationOne, in this way:
import luigi
from luigi.util import inherits
from tasks import HeavyComputationOne


@inherits(HeavyComputationOne)
class HeavyComputationTwo(luigi.Task):
    x = luigi.IntParameter(default=42)
    y = luigi.IntParameter(default=42)
    z = luigi.IntParameter(default=42)
    # ...
So, a, b and c are now inherited from HeavyComputationOne, and there's no need to re-declare them anymore.
As described in the docs, there are some limitations by design.
Because of Luigi's execution model, the authors say that the Luigi scheduler could hang if you schedule more than a few thousand jobs at once.
It's also true that Luigi is designed to handle batch processing jobs well. So if you need to run lots of pipelines in a near real-time setting, it might not be the best tool to use. Better to read the docs carefully first!
We are running only batch processing pipelines, mainly related to ML / NLP tasks and it suits us quite well.
If you really need to launch many tasks at once, there's also a Kubernetes Job wrapper plugin available.
If having N different workers does not suit you and a specific task is very complex in terms of computation, you may delegate the required parallelism to other tools (if multiprocessing is not enough).
We have tasks which require lots of different spaCy language models to be loaded at once, and we load them across many processes.
To do that, we use Dask, creating on-demand local (or remote) clusters inside the task's run() method:
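import luigi
import dask.bag as db
from dask.distributed import Client


class HeavyTask(luigi.Task):
    def run(self):
        # ... (data, output, arg1 and heavy_method are defined here)
        with create_local_cluster(memory_limit="4GB", n_workers=6) as cluster, Client(cluster):
            bag = db.from_sequence(data)
            # each partition is processed by heavy_method on the cluster's workers
            output += (
                bag.map_partitions(heavy_method, arg1=arg1)
                .compute()
            )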
Where create_local_cluster() simply wraps the related Dask method:
from dask.distributed import LocalCluster


def create_local_cluster(n_workers: int = 4, **kwargs) -> LocalCluster:
    return LocalCluster(
        n_workers=n_workers,
        worker_dashboard_address=None,
        **kwargs,
    )
With Dask you can create local, SSH, or Kubernetes-based clusters according to your needs. It may require a bit of tuning but it's more than enough to handle parallelism in our common use cases.
Luigi also has no built-in mechanism for triggering pipelines. This is true, but I would not necessarily see it as a drawback. There are many ways to trigger them, such as:
- API
- Crontab
- SSH (e.g. a Jenkins pipeline which fires a Luigi pipeline via SSH)
If you have a small team which is already proficient in Python, Luigi's relatively short learning curve and benefits could be ideal for your data pipelines, especially if you mainly run batch processing jobs.
Moreover, configuring and deploying Luigi's scheduler on a server / pod for production use is easy, which might not be the case for similar tools like Apache Airflow.