
[ANN] distributed-dataset: A distributed data processing framework in Haskell

3 March 2020

When processing large amounts of data, a single computer becomes cumbersome once we start to push the limits of its network bandwidth, disk space or processing power. Using a cluster of computers is the most common solution to this problem, but it comes with significant overhead, since a distributed application is usually much more complex than a traditional single-threaded process for the same task.

distributed-dataset is a work-in-progress Haskell library that aims to overcome this problem. It hides the complexity of coordinating those machines behind a high-level, list-like API, so data transformations can be expressed almost as easily as in the traditional programming model. It is inspired by Apache Spark.

The library is based around the Dataset type, which is internally implemented as a chain of transformations applied on top of a partitioned data source. Those transformations are applied in a distributed fashion: the computations are shipped to executor nodes, which run in parallel, each working on a subset of the data.

The transformations are defined using ordinary Haskell functions. This is made possible by the StaticPointers GHC extension and the distributed-closure library. Thanks to these, distributed-dataset can leverage the existing Haskell ecosystem, both when defining transformations and when reading and writing the inputs and outputs. More specifically, the low-level interface is mostly built around Conduits, so integrating a new data source or a new kind of transformation is usually as easy as writing a Conduit for the purpose.
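
To give a feel for this machinery, here is a minimal, self-contained sketch of the static/Closure pieces from distributed-closure that the combinators below build on. This snippet is illustrative and not part of distributed-dataset itself:

{-# LANGUAGE StaticPointers #-}

import Control.Distributed.Closure (Closure, closure, unclosure)

-- A top-level function can be turned into a StaticPtr with 'static',
-- and then into a serialisable Closure that can be shipped to an executor.
double :: Int -> Int
double x = x * 2

doubleClosure :: Closure (Int -> Int)
doubleClosure = closure (static double)

main :: IO ()
main = print (unclosure doubleClosure 21)  -- prints 42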

distributed-dataset uses pluggable Backends for actually running the computations. Backends are easy to implement, but currently the only useful existing backend is distributed-dataset-aws. This backend uses AWS Lambda to run the computations and S3 to store the intermediate results. Its main advantage is that it can rapidly spin up thousands of containers to run the transformations in parallel, without requiring any pre-existing infrastructure. The library also ships with localProcessBackend, which should only be used for local development and tests.

Example

Before we dig into the details, let's walk through a small example. The code below downloads all public GitHub events from 2019 using GH Archive, and ranks the users based on the number of commits they pushed which contain the word "cabal" in the message. The entire code can be found (and built & run) in the examples/ subdirectory of the repository, but the relevant function is pasted below:

app :: DD ()
app =
  ghArchive (fromGregorian 2019 1 1, fromGregorian 2019 12 31)
    & dConcatMap (static (\e ->
        let author = e ^. gheActor . ghaLogin
            commits = e ^.. gheType . _GHPushEvent
                        . ghpepCommits . traverse . ghcMessage
        in  map (author, ) commits
      ))
    & dFilter (static (\(_, commit) ->
        "cabal" `T.isInfixOf` T.toLower commit
      ))
    & dGroupedAggr 50 (static fst) dCount
    & dAggr (dTopK (static Dict) 20 (static snd))
    >>= mapM_ (liftIO . print)

The code, I believe, is quite readable and concise, while not using any of the fancier features of Haskell.

Let's go through it step by step:


ghArchive (fromGregorian 2019 1 1, fromGregorian 2019 12 31)

This function is provided by a separate library called distributed-dataset-opendatasets. It returns a Dataset GHEvent containing the public GitHub events within the given dates. Internally, each partition in the returned Dataset downloads, decompresses and parses 4 hours' worth of data. This means that distributed-dataset will spawn 2190 (365*24/4) executors to process this Dataset by default.
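
For reference, the type as used in this example boils down to the following (simplified; distributed-dataset-opendatasets is the authoritative source):

ghArchive :: (Day, Day) -> Dataset GHEvent  -- Day comes from Data.Time.Calendar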


  & dConcatMap (static (\e ->
      let author = e ^. gheActor . ghaLogin
          commits = e ^.. gheType . _GHPushEvent
                      . ghpepCommits . traverse . ghcMessage
      in  map (author, ) commits
    ))
  & dFilter (static (\(_, commit) ->
      "cabal" `T.isInfixOf` T.toLower commit
    ))

Here we are using two combinators from distributed-dataset:

dConcatMap :: (StaticSerialise a, StaticSerialise b)
           => Closure (a -> [b]) -> Dataset a -> Dataset b
dFilter :: StaticSerialise a
        => Closure (a -> Bool) -> Dataset a -> Dataset a

As you can see, they are quite similar to the concatMap and filter functions in Data.List, and they work as you would expect. There are two main differences:

  1. Instead of plain functions, they take Closures, which is why the lambdas are wrapped with static; roughly, this requires that the function does not capture any local variables.
  2. The element types must have StaticSerialise instances, since rows may need to be serialised and shipped between executors.

Another thing to point out is that a Dataset is a delayed representation of the transformations: they are only evaluated when the final result is requested, and they are fused when possible. In this example, the calls to dConcatMap and dFilter will only do a single pass over the input dataset.


  & dGroupedAggr 50 (static fst) dCount
  & dAggr (dTopK (static Dict) 20 (static snd))

Transformations that require information from multiple input rows, which may live on different partitions, are called "wide" (as opposed to "narrow"). This means that the data has to be serialized and "shuffled" across the network. In distributed-dataset, this is usually done with the Aggr data type, which provides some utilities for creating and composing this kind of transformation.

In this snippet, using the dGroupedAggr function, we first apply the dCount aggregation to every group of rows sharing the same fst; in other words, we count the number of commits per author. After that, we use the dAggr function to apply the dTopK aggregation, finding the top 20 rows by commit count over the whole dataset.
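
For orientation, the shapes of the combinators used in this snippet are roughly as follows. These are simplified, with class constraints elided, and inferred from the usage above; consult the library's documentation (e.g. the Control.Distributed.Dataset.Aggr module mentioned below) for the authoritative signatures:

dCount       :: Aggr a Int
dTopK        :: Closure (Dict (Ord k)) -> Int -> Closure (a -> k) -> Aggr a [a]
dAggr        :: Aggr a b -> Dataset a -> DD b
dGroupedAggr :: Int                -- number of partitions for the shuffle
             -> Closure (a -> k)   -- grouping key
             -> Aggr a b
             -> Dataset a
             -> Dataset (k, b)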


Finally, since the dAggr function aggregates the entire Dataset, it directly returns the result of the aggregation as an ordinary Haskell value, so we can simply print it at the end:

  >>= mapM_ (liftIO . print)

This example will download around 300 GB of compressed data, extract it to almost a terabyte of JSON, then parse and aggregate it and transfer the result back to us. Using distributed-dataset-aws, the whole process takes less than two minutes, including the time to provision and destroy the required cloud resources. Here's the result, for the curious:

("bgamari",25334)
("ghc-mirror-bot",897)
("aarongable",639)
("peti",634)
("haskell-pushbot",590)
("pull[bot]",539)
("phadej",311)
("fendor",268)
("iohk-bors[bot]",262)
("mergify[bot]",191)
("Ericson2314",190)
("rcaballeromx",184)
("jneira",177)
("vadimeisenbergibm",144)
("aherrmann",127)
("felixonmars-bot",121)
("alanz",111)
("diyessi",107)
("newhoggy",105)
("vdemeester",104)

Implementation

As mentioned before, a Dataset is a distributed multiset, where the actual rows live in separate partitions which can be fetched and processed in parallel. Simple transformations are straightforward: maps and filters can be applied to each partition independently, so an n-fold speedup can be achieved simply by processing the partitions in parallel on separate executors.
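
As a local analogy (using threads in place of executors), a narrow transformation amounts to nothing more than mapping over each partition independently; the sketch below is illustrative only and not library code:

import Control.Concurrent.Async (mapConcurrently)
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)

-- Apply a per-row function to every partition in parallel; no partition
-- needs to see any other partition's rows.
narrowMap :: NFData b => (a -> b) -> [[a]] -> IO [[b]]
narrowMap f = mapConcurrently (evaluate . force . map f)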

However, it requires a bit more effort when a single output row needs information from multiple input rows. Imagine a distinct transformation which removes duplicate rows from a Dataset. We can easily remove duplicates within a partition, but there can still be duplicates spread across different partitions, so we cannot complete this task by processing each partition on its own. In this case, we need another kind of operation where the rows are shuffled across partitions so that equal rows always end up in the same partition. This can also be done in parallel: we can pass each row through a hash function and determine the target partition using the hash(row) mod partition_count formula (a minimal sketch of this step follows the list below). With this functionality, it is possible to implement the distinct combinator by:

  1. Removing duplicates within each partition.
  2. Shuffling the rows so the same rows end up in the same partition.
  3. Removing duplicates within each partition again.
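
Here is that sketch of the shuffling step (illustrative only, not library code). Every executor can decide, independently and consistently, which partition each of its rows belongs to, and then ship each bucket to the executor owning that partition:

import Data.Hashable (Hashable, hash)
import qualified Data.Map.Strict as M

targetPartition :: Hashable a => Int -> a -> Int
targetPartition partitionCount row = hash row `mod` partitionCount

-- Bucket the rows of one partition by their target partition; each bucket
-- would then be shipped to the executor owning that partition.
shuffleLocally :: Hashable a => Int -> [a] -> M.Map Int [a]
shuffleLocally partitionCount =
  M.fromListWith (++) . map (\row -> (targetPartition partitionCount row, [row]))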

It turns out those two operations are enough to implement the Dataset type. Below is a slightly simplified excerpt from the library:

newtype Partition a
  = Partition (Closure (ConduitT () a (ResourceT IO) ()))

data Dataset a where
  DExternal
    :: StaticSerialise a
    => [Partition a]
    -> Dataset a
  DPipe
    :: StaticSerialise b
    => Closure (ConduitT a b (ResourceT IO) ())
    -> Dataset a
    -> Dataset b
  DPartition
    :: StaticHashable k
    => Closure (a -> k)
    -> Dataset a
    -> Dataset a

To clarify:

  1. DExternal defines a data source: a list of Partitions, each of which knows how to produce its rows as a Conduit.
  2. DPipe applies a transformation, again expressed as a Conduit, to every partition independently; this covers the "narrow" transformations like maps and filters.
  3. DPartition shuffles the rows across partitions using the given key function, so that rows with the same key end up in the same partition; this is the building block for the "wide" transformations.

Almost everything else in the library is implemented in terms of the above data types. distributed-dataset provides a number of well-known combinators like map and filter; most significantly, it adds a data type called Aggr, implementing composable map-reduce style aggregations. See the Control.Distributed.Dataset.Aggr module for more information.
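
To illustrate the idea (this is not the library's actual definition of Aggr, just a sketch of the shape such a composable aggregation can take), a map-reduce style aggregation can be split into a per-partition fold and a final fold that combines the partial results, here using the foldl library:

{-# LANGUAGE GADTs #-}

import qualified Control.Foldl as F

-- The first Fold runs independently on every partition; only its (small)
-- result is shuffled, and the second Fold combines those partial results.
data SimpleAggr a b where
  SimpleAggr :: F.Fold a t -> F.Fold t b -> SimpleAggr a b

-- Counting becomes: count within each partition, then sum the counts.
simpleCount :: SimpleAggr a Int
simpleCount = SimpleAggr F.length F.sum

distributed-dataset's Aggr follows the same two-stage idea; see the module mentioned above for the real definition.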

Control.Distributed.Fork

As mentioned before, distributed-dataset uses pluggable Backends to actually run the tasks. Implementing a Backend is quite simple: a single function with the signature ByteString -> IO ByteString is enough. It should spawn the current program executable (argv[0]) in some environment, pass the given ByteString on standard input, and return the standard output. This gives us the ability to use arbitrary Haskell functions in our transformations, but comes with the requirement that the exact same binary is used on the executors. distributed-dataset-aws requires the executable to be compiled statically, and handles shipping the binary to the executors.
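
As a concrete illustration, here is a sketch of what a minimal local backend along those lines could look like. The SketchBackend wrapper and the DD_EXECUTOR environment variable are assumptions made up for this sketch; see the library's Backend documentation for the real interface:

import qualified Data.ByteString.Lazy as BL
import System.Environment (getExecutablePath)
import System.Process.Typed
  (byteStringInput, proc, readProcessStdout_, setEnv, setStdin)

-- Hypothetical shape of a backend: take a payload, return the executor's output.
newtype SketchBackend = SketchBackend (BL.ByteString -> IO BL.ByteString)

-- Spawn the current executable (argv[0]), feed it the payload on stdin,
-- and hand back whatever it writes to stdout.
localSketchBackend :: SketchBackend
localSketchBackend = SketchBackend $ \payload -> do
  self <- getExecutablePath
  readProcessStdout_
    . setStdin (byteStringInput payload)
    . setEnv [("DD_EXECUTOR", "1")] -- hypothetical flag telling the child to act as an executor
    $ proc self []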

The library also exposes the Control.Distributed.Fork module, which provides a fork function that can run any Closure (IO ()) using a Backend and fetch the result back. This is a very useful building block if you just need an equivalent of forkIO that uses multiple computers rather than multiple threads, so it is exposed as a separate module.

Final Words

This is pretty much everything distributed-dataset can do for now. It does not come batteries-included, but the code is quite hackable (let me know if you are interested!). Don't build your production system on top of it (yet), but I believe it might already be useful to some.

If anyone else is keen to work on this, below is a non-exhaustive list of missing features:

  1. Relational joins. dJoin :: Dataset (k, a) -> Dataset (k, b) -> Dataset (These a b)
  2. A row-polymorphic API. Currently we manipulate ordinary Haskell data types; if we had more descriptive types for the data types and transformations, specifying the columns they have and how they are modified, we could apply many more optimisations using that information.
  3. An SQL interpreter. This would make it possible to have an interactive notebook-like experience that can be used without recompiling any Haskell code.
  4. Helpers for common data formats and storage systems. Currently this requires figuring out which Conduits to stitch together from various Hackage libraries; we should make this easier by providing utilities for common use cases.
  5. More Backends; Kubernetes, YARN, Mesos, Google Cloud Run, static machines...

The project is hosted at https://github.com/utdemir/distributed-dataset. Feel free to open issues for questions, feature requests or bug reports, and send me an email if you want to discuss it further.
