Monday, July 22, 2013

MusicBrainz Graph

For a recent project, we needed access to the MusicBrainz core data.  This data is published in a relational representation that has about 180 tables.  That representation was inconvenient: the query planner sometimes had problems, we didn't have all the indexes we needed anyway, and figuring out what was where wasn't easy either.

We wanted a simple graph representation, so we sketched a transformation tool which generated a graph from the core SQL tables.  The output was a graph with 94 million nodes and 460 million edges.  Not bad at all.

We did some initial prototyping with Neo4j.  After a fair amount of experimentation with various tuning knobs, we eventually concluded that Neo4j was (at least at that time) too inefficient for our application.

We had the good fortune of being able to tolerate write-once data.  Every day we could pull the latest MusicBrainz data, generate a completely new graph representation, and switch over to that new system in production almost instantly.  (Our clients all retry on certain failures, so the sub-millisecond switch-over is not a problem.)

Since the data is write-once, we could easily intern all strings to 32-bit integers while preserving lexicographic ordering.  That technique gives a 12-byte representation of an edge without any additional effort.  Good enough.
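Here is a rough sketch of that interning step (in Python rather than the actual tool, and with illustrative names).  The key point is that write-once data lets us see the full string set before assigning ids:

```python
import struct

def intern_strings(all_strings):
    # Write-once data means the full string set is known up front, so ids
    # can be assigned in sorted order, preserving lexicographic ordering.
    ordered = sorted(set(all_strings))
    assert len(ordered) < 2**32  # ids must fit in 32 bits
    return {s: i for i, s in enumerate(ordered)}

ids = intern_strings(["track", "artist", "recorded", "artist"])
assert ids["artist"] < ids["recorded"] < ids["track"]

# An edge then packs into three 32-bit ids: 12 bytes total.
edge = struct.pack(">III", ids["artist"], ids["recorded"], ids["track"])
assert len(edge) == 12
```

Because id order matches string order, range scans over interned keys behave like scans over the original strings.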

For non-trivial data that can fit on a single machine, we often prefer native datastores.  In this case, we used Kyoto Cabinet, which is a system in the DBM genre.  LevelDB and Berkeley DB are in the same family, and we move between these systems fairly easily.

We can't live without a REPL for exploration, so we decided to access the data from Clojure.  We don't need much in the way of fancy graph algorithms; we really just need basic edge traversal.  For example, we want to be able to ask for a node's neighbors along a given edge label.

That's easy.  Fetching a node's properties is just as simple.
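The original Clojure fragments aren't shown here, but a minimal sketch of the idea (in Python, with an in-memory set standing in for the ordered datastore, and an assumed reserved label id for property edges) looks like this:

```python
import struct

def edge_key(src, label, dst):
    # Edges are fixed-width 12-byte keys of interned 32-bit ids in an
    # ordered index, so a node's out-edges are a contiguous key range.
    return struct.pack(">III", src, label, dst)

def scan(index, prefix):
    # Stand-in for a B-tree range scan: keys that share the prefix.
    return [k for k in sorted(index) if k.startswith(prefix)]

def neighbors(index, src, label):
    prefix = struct.pack(">II", src, label)
    return [struct.unpack(">III", k)[2] for k in scan(index, prefix)]

PROPS = 0  # hypothetical reserved label id for property edges

index = {edge_key(1, 7, 2), edge_key(1, 7, 3),   # node 1 --7--> {2, 3}
         edge_key(1, PROPS, 42)}                 # node 1 has property 42

assert neighbors(index, 1, 7) == [2, 3]
assert neighbors(index, 1, PROPS) == [42]
```

Property lookup is just another prefix scan under the reserved label, which is why both operations share one code path.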

With data on an SSD (of course!), we easily get millisecond latencies (95%) for basic graph access.  To search the strings, we threw in a custom Lucene index (sitting on the same SSD).  The result: fast, economical, uniform, and convenient access to about half a billion edges in about 1000 lines of code with no noteworthy dependencies.

Friday, July 19, 2013


morphismlabs/kyotograph is a little demonstration of using Kyoto Cabinet (or other order-supporting indexes) for working with graph data that will fit on a single machine.

An edge is stored as a B*Tree key. The value is null. The edge's source, label, and target should have fixed-length representations. So part of the task is to come up with those representations. Sometimes you can play interesting games with locality.

You can reimplement this simple approach in the language of your choice in a few hundred lines of code.  We can't live without a REPL when exploring data.  We're using Clojure here, so we can launch work in threads once we've figured out what we want to do.

To provide some robustness, this example code uses a TAB to delimit source, edge label, and target. (An optimal triple representation doesn't need a field separator.) So a B*Tree key looks like
    source TAB edgeLabel TAB target
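A minimal sketch of that key layout (Python here; kyotograph itself uses Clojure):

```python
def edge_key(source, label, target):
    # TAB-delimited triple stored as a B*Tree key; the value is empty.
    assert all("\t" not in part for part in (source, label, target))
    return f"{source}\t{label}\t{target}".encode()

edges = sorted([edge_key("a", "knows", "b"),
                edge_key("a", "likes", "c"),
                edge_key("b", "knows", "a")])

# A node's out-edges form one contiguous range: scan keys prefixed "a\t".
out_a = [k for k in edges if k.startswith(b"a\t")]
assert out_a == [b"a\tknows\tb", b"a\tlikes\tc"]
```

The TAB delimiters cost a couple of bytes per key but make the store easy to inspect by hand.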
We haven't done good performance benchmarks, so we offer some anecdotes.  With no tuning, a modest machine can load 20+K edges per second per core/filesystem. Throughput can go much higher. Edge traversal can run at 50+K per second per core/filesystem.

Since we want to keep our B*Trees small, we want to partition the nodes when we have many edges.  Easy to do.  Here's an example run with four partitions:
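Partitioning can be as simple as hashing the source node.  A sketch (crc32 is chosen here only because it is deterministic; any stable hash works):

```python
import zlib

def partition(source: str, n: int = 4) -> int:
    # Route each edge by its source node so a node's out-edges all land
    # in the same (smaller) B*Tree.
    return zlib.crc32(source.encode()) % n

trees = [set() for _ in range(4)]  # four partitions, one B*Tree each
for s, l, t in [("a", "x", "b"), ("a", "x", "c"), ("b", "x", "a")]:
    trees[partition(s)].add((s, l, t))

# All of node "a"'s out-edges live in a single partition.
assert {("a", "x", "b"), ("a", "x", "c")} <= trees[partition("a")]
```

Because partitioning is by source, a traversal from one node still touches only one tree.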

On a 2-core VM with an SSD, we loaded 89K edges at the rate of 65K edges per second.  We walked edges at the rate of 39K per second.  Count2 is a count of a node's grandchildren.  The shape of the generated data is obviously important: that histogram says that 99% of the nodes had fewer than three grandchildren, so this random data is very sparse.

To stress a single B*Tree a little bit, we loaded up 36M edges, and we saw about 10K Count2 operations per second and 24K edge traversals per second.  The bottleneck was mostly CPU.  But -- again -- we didn't really do any serious performance testing.  Note: with SSDs, it's always a good idea to keep an eye on pdflush during write loads.

Quarterly Census of Employment and Wages

A few of our projects use the Quarterly Census of Employment and Wages from the Bureau of Labor Statistics.  These time series are published in a format that is inconvenient for our purposes, so we transformed the data into a key-value representation (time-series ID as the key, a TSV string as the value) in a random-access datastore (Kyoto Cabinet).
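A sketch of that layout (a Python dict stands in for Kyoto Cabinet, and the series id below is made up):

```python
# key = time-series ID, value = TSV text, one line per observation.
store = {}

def put_series(series_id, points):
    # Each point is (year, quarter, value); the value string is plain TSV.
    store[series_id] = "\n".join(f"{y}\t{q}\t{v}" for y, q, v in points)

def get_series(series_id):
    return [line.split("\t") for line in store[series_id].splitlines()]

put_series("example-series-1", [(2012, 1, 1520), (2012, 2, 1601)])
assert get_series("example-series-1")[1] == ["2012", "2", "1601"]
```

One random-access get per series keeps retrieval well under a millisecond on an SSD-backed store.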

As published, the QCEW data is (currently) about 2.5GB compressed.  In our datastore, the same data is about 25GB uncompressed and 5GB compressed.  About 14M time series.  Everything sits on an SSD.  With this configuration, we can get a time series in less than a millisecond, which is great for the downstream analysis we do.

We nearly always expose API calls via a CLI, so we can do:

We store series metadata in a little Solr database.  The metadata includes:

For example, here's a search for industry 62441 and aggregation level 77:

Now for some graphs using D3:

This data represents residential plumbing and HVAC contractor employment in Dallas County, Texas.  Incidentally, you can see the seasonality in HVAC contracting.  We sometimes use x13as to process seasonal data.  Or we use the R package x12, which is pretty convenient:

We work over the data in lots of other ways, but that's another story.

Wednesday, July 17, 2013

Accumulo Mutation Hooks

Messing with the write path of a datastore is asking for trouble.

If you like trouble, then you might like the idea of using an Accumulo constraint as a mutation hook.  As with triggers, use with caution.  This example publishes mutations to a Redis pub/sub key, so other systems can listen to mutations of the specified tables.  Queues (maybe after filtering) also make a good target.
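As a language-neutral sketch of the pattern (this is not the Accumulo API; a real constraint implements a check method over each mutation and returns violation codes), the idea is a write path that lets observers see every mutation:

```python
class Table:
    # Toy write path with a mutation hook, in the spirit of using an
    # Accumulo constraint to observe writes.
    def __init__(self):
        self.rows, self.hooks = {}, []

    def add_hook(self, fn):
        self.hooks.append(fn)

    def mutate(self, row, col, val):
        for fn in self.hooks:        # e.g. publish to a Redis channel
            fn(row, col, val)
        self.rows.setdefault(row, {})[col] = val

seen = []
t = Table()
t.add_hook(lambda r, c, v: seen.append((r, c, v)))  # stand-in publisher
t.mutate("row1", "cf:q", "v1")
assert seen == [("row1", "cf:q", "v1")]
```

The hook runs on the write path, which is exactly why it is both powerful and dangerous: a slow or failing subscriber stalls writes.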

So how to parameterize these mutation hooks?  Currently Accumulo doesn't support constraint parameters, but, with some hacking, we could send some parameters through Java.  However, we'd then need to do a bit more work to get that data across the cluster.  Instead, we usually use Zookeeper as the authoritative source for this kind of state.

The resulting behavior is pretty convenient.  We can set hooks on certain tables, and we can tell Zookeeper where to route the mutations.  For example, we could tell Zookeeper to tell the nodes to publish any delete mutations for a specified column family to a Redis channel.  If we want another column family to be treated the same way, we just tell Zookeeper.  We'll publish some more example code soon.

Why Accumulo?

We use lots of different datastores, and we often use federated native datastores (e.g., LevelDB, Kyoto Cabinet, Berkeley DB (C Edition), etc.) for high performance.  Sometimes we wrap those in some simplified Dynamo or related scheme for scaling out if/when we hit single machine limits.  We also use some of the popular distributed datastores (Cassandra, Riak, DynamoDB, etc.), and we of course use relational datastores (PostgreSQL, MySQL-derived databases, etc.) as both relational datastores and graph/key-value datastores.  So why also Accumulo?

Accumulo claims several interesting features.  That's a nice list.

For some projects, cell-level access control is simply required.

As we've discussed, we've used iterators and combiners to generate statistical data pretty efficiently.  Doing computation in the datastore along with that datastore's required processing is very convenient.  (As with stored procedures and triggers, one must be careful, of course.)  We're pulling together some of our iterators and combiners for release, but we're not there yet.

We also like Accumulo's use of HDFS for what HDFS is pretty good at.

As many folks have reported, Accumulo handles certain graph data well.  Accumulo stores records ordered by row ID, and this locality can be exploited in many algorithms.  Cassandra by default uses a RandomPartitioner.  A ByteOrderedPartitioner is also an option, as is anything else you want to implement yourself.  The trade-offs are interesting.  For example, depending on your data representation, you could find yourself using the random partitioner to avoid hotspots only to run into hotspots within a row.  All of these issues are relevant to Accumulo in several respects.  (On a related note, consider Bloom filter behavior when using both systems.  Too many elements in a Bloom filter can obviously lead to sorrow.)  For our Accumulo-based projects, we've had pretty good experiences in balancing locality, hotspot avoidance, and data-per-node issues.

Oh, and the test cases.  Accumulo has a nice, growing set of test cases, which of course are run as part of the release cycle.  Also, it's good to know that Accumulo has really been used at scale.  One outfit did some serious work with a 1,200 node Accumulo cluster.

Friday, July 12, 2013

Accumulo and Clojure

Accumulo and Clojure work well together.  Here's a little demo.

This Combiner delegates the reduce work to Clojure code.  As we mentioned previously, our work often involves computing lots of little statistics; here, the delegated Clojure code computes and maintains histograms.
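At its core, that reduce step is a pointwise merge of count maps.  A sketch (in Python rather than the actual Clojure):

```python
from collections import Counter

def combine(histograms):
    # Reduce step of a histogram combiner: each stored value is a
    # bucket -> count map, and combining is a pointwise sum of counts.
    total = Counter()
    for h in histograms:
        total.update(h)
    return dict(total)

assert combine([{"0-9": 2, "10-19": 1}, {"0-9": 1}]) == {"0-9": 3, "10-19": 1}
```

Because the merge is associative and commutative, the datastore can apply it incrementally at scan and compaction time.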

A nice extension would be to allow the Clojure code to be provided during the setiter invocation.

See Charles Simpson's alternate approach for iterators in Clojure.  The primary difference is that his approach does some marshaling in Java.  Our approach delegates that work to Clojure as well.

Accumulo Histogram Combiner

Just a quick note re Accumulo combiners.  In many of our projects, we compute many little histograms.  Often billions of them.  Accumulo is particularly convenient for such tasks since it allows server-side computation as an extension of its existing processing.  That distinctive ability can make certain processing very inexpensive.

Here's a quick example of an Accumulo combiner that computes a histogram.  The comments in the gist describe how to use the combiner.

Update: Thanks to Keith Turner for the transform generalization.

Example use: