When processing large amounts of data, using a single computer becomes cumbersome when we start to push the limits of our network bandwidth, disk space or processing power. Using a cluster of computers is the most common solution to this problem; but it comes with significant overhead, since a distributed application usually is much more complex than a traditional single-threaded process for the same task.
distributed-dataset is a work-in-progress Haskell library that aims to overcome this problem. It hides the complexity of coordinating those machines behind a high-level list-like API, so the data transformations can be expressed almost as easily as the traditional programming model. It is inspired by Apache Spark.
The library is based around the
Dataset type, which is internally implemented as a chain of transformations applied on top a partitioned data source. Those transformations are applied in a distributed fashion where the computations are shipped to executor nodes. The executors run in parallel, each working on a subset of the data.
The transformations are defined using ordinary Haskell functions. This is achieved by the StaticPointers GHC extension and the distributed-closure library. Thanks to those,
distributed-dataset can leverage the existing Haskell ecosystem, both when defining transformations and when reading and writing the inputs and outputs. More specifically, it usually uses
Conduits on the low-level interface; so integrating a new data source or a new kind of transformation is usually as easy as writing a
Conduit for the purpose.
distributed-dataset uses pluggable Backends for actually running the computations. Backends are easy to implement; but currently the only existing useful backend is
distributed-dataset-aws. This backend uses AWS Lambda to run the computations and S3 to store the intermediate results. The main advantage is that it can rapidly spin up thousands of containers to run the transformations in parallel without requiring any pre-existing infrastructure. The library also ships with
localProcessBackend which is should only used for local development and tests.
Before we dig into the details, let’s walk through a small example. The code below downloads all public GitHub events from 2020 using GH Archive, and ranks the users based on the number of commits they pushed which contain the word “cabal” in the message. The entire code can be found (and built & ran) in the
examples/ subdirectory on the repository, but the relevant function is pasted below:
app :: DD () app = ghArchive (fromGregorian 2019 1 1, fromGregorian 2019 12 31) & dConcatMap (static (\e -> let author = e ^. gheActor . ghaLogin commits = e ^.. gheType . _GHPushEvent . ghpepCommits . traverse . ghcMessage in map (author, ) commits )) & dFilter (static (\(_, commit) -> "cabal" `T.isInfixOf` T.toLower commit )) & dGroupedAggr 50 (static fst) dCount & dAggr (dTopK (static Dict) 20 (static snd)) >>= mapM_ (liftIO . print)
The code, I believe, is quite readable and concise; while not using the fancier features of Haskell.
Let’s go through it step by step:
This function is provided by a separate library called
distributed-dataset-opendatasets. It returns a
Dataset GHEvent with public GitHub events within the given dates. Internally, each partition in the returned
Dataset downloads, decompresses and parses 4 hours worth of data. This means that
distributed-dataset will spawn 2190 (365*24/4) executors to process this
Dataset by default.
Here we are using two combinators from
As you can see, they are quite similar to
filter functions in
Data.List, and they work as you expect. There are two main differences:
StaticSerializeconstraints on input and output values since they will possibly be transferred across executors.
Closures using the
statickeyword. This keyword has some constraints about what kind of values it can wrap, but usually those limitations can be overcome using the combinators in the
Another thing to point out is that
Dataset is a delayed representation of the transformations; so they are only evaluated when the final result is requested, also they are fused when possible. In this example, the two calls to
dFilter will only do a single pass over the input dataset.
Transformations requiring information from multiple input rows that are possibly on different partitions, are called “wide” (as opposed to “narrow”). This means that the data will be serialized and “shuffled” across the network. In
distributed-dataset, this is usually done with the
Aggr data type, which provides some utilities around creating and composing these kind of transformations.
In this snippet; using the
dGroupedAggr function, we first apply
dCount aggregation to every row where the
fst is the same; meaning we are counting the number of commits per author. After that, we use
dAggr function to apply
dTopK aggregation to find the top 20 rows according to the commit count over the whole dataset.
dAggr function aggregates the entire
Dataset, it directly returns the result of an aggregation as an ordinary Haskell value. So we can just print the value in the end:
This example will download around 300 GB of compressed data, extract them to almost a terabyte worth of JSON, parse, aggregate and transfer the result back to us. Using
distributed-dataset-aws, the whole process will take less than two minutes, including the time to provision and destroy the required cloud resources. Here’s the result for the curious:
As mentioned before, a
Dataset is a distributed multiset, where the actual rows live in separate partitions which can be fetched and processed in parallel. For simpler transformations this can be done easily;
filters can be applied to each partition independently. An n-fold speedup can be achieved easily in this case by processing each partition in parallel on separate executors.
However, it requires a bit more effort when a single output row requires information from multiple input rows. Imagine a
distinct transformation which removes duplicate rows from a
Dataset. We can simply remove duplicates within a partition, however there can still be duplicates spread across different partitions; so we can not complete this task by processing each partition on its own. In this case, we need another kind of operation where the rows are shuffled across partitions so that the same rows always end up in the same partition. This can also be done in parallel; we can pass each row through an hash function and determine the target partition using
hash(row) / partition_count formula. With this functionality, it is possible to implement the
distinct combinator by:
It turns out those two operations are enough to implement the
Dataset type. Below is a slightly simplified excerpt from the library:
newtype Partition a = Partition (Closure (ConduitT () a (ResourceT IO) ())) data Dataset a where DExternal :: StaticSerialise a => [Partition a] -> Dataset a DPipe :: StaticSerialise b => Closure (ConduitT a b (ResourceT IO) ()) -> Dataset a -> Dataset b DPartition :: StaticHashable k => Closure (a -> k) -> Dataset a -> Dataset a
Partitionis a set of rows that can be streamed independently.
Datasets can be created from a list of
Partitions using the
DPipeconstructor takes a
Conduitand converts a
Dataset ato a
Dataset bby passing every
Partitionthrough the given
DPartitionconstructor takes a key function and shuffles the data across
Partitions so that all rows sharing the same key ends up in the same
Almost everything else in the library is implemented in terms of above data types.
distributed-dataset implements a bunch of well-known combinators like
filter; and most significantly it adds a data type called
Aggr implementing composable map-reduce style aggregations, see Control.Distributed.Dataset.Aggr module for more information on this.
As mentioned before,
distributed-dataset uses pluggable
Backends to actually run the tasks. Implementing a
Backend is quite simple, only a single function with signature
ByteString -> IO ByteString is enough; which should spawn the current program executable(
argv) in some environment, pass the given
ByteString as standard input and return the standard output. This provides us the ability to use arbitrary Haskell functions in our transformations, but comes with the requirement of using the exact same binary on the executors.
distributed-dataset-aws requires the executable is compiled statically, and handles shipping the binary to the executors.
The library also exposes the
Control.Distributed.Fork which provides a
fork function which can run any
Closure (IO ()) using a
Backend and fetch the result back. This is a very useful building block if you just need an equivalent of
forkIO using multiple computers rather than multiple threads; so it is exposed as a separate module.
This is pretty much everything
distributed-dataset can do for now. It does not come batteries-included, but the code is quite hackable (let me know if you are interested!). Don’t build your production system on top of it (yet), but I believe it might already be useful to some.
If anyone else is keen to work on this; below is a non-exhaustive list of missing features:
dJoin :: Dataset (k, a) -> Dataset (k, b) -> Dataset (These a b)
Conduit’s to stich together from various Hackage libraries; we should make this easier by providing utilities for common uses.
Backends; Kubernetes, YARN, Mesos, Google Cloud Run, static machines…