== Introduction ==
'''Tom Adams''', Workingmouse, December 2007
I've had the good fortune to recently complete a project at [http://www.veitchlister.com.au/ Veitch Lister Consulting (VLC)] (a transport planning consultancy) processing large datasets in real-time. This article is a summary of the technical aspects of the project, I won't be talking about the process we used, suffice to say it was XP-like, supported by tools such as my BDD framework [http://instinct.googlecode.com/ Instinct] and web-based agile project management tools. The project ran for around 7-9 weeks, the team was comprised of five developers (3 full-time on the project) consisting of two Workingmouse developers, myself and Sanjiv Sahayam, and three VLC developers, Jamie Cook, Glen Maddern and Nick Partridge.
As a [http://workingmouse.com/ consultancy] we usually work in corporate environments and as such are bound by the architectural constraints of the organisation. This usually includes the usual "enterprise" constraints such as language (Java 1.4 is common), application servers (usually WebLogic or WebSphere) and databases (Oracle or DB2). However this was one of those rare projects where you have no technical constraints and have the fortune to be working with [http://www.veitchlister.com.au/ great] [http://workingmouse.com/ people].
== Finding a working solution ==
Going in, we had three basic requirements, 1) must be callable from a Rails-based web app, 2) must return responses to typical requests in real-time to the webapp (30 seconds was our target) and 3) must query against a dataset that was initially estimated at around 45 TB, but later came down to around 100 GB.
Our technical approach initially was two-fold, firstly to understand the nature of the problem and size of the datasets, and secondly to research potential solutions to the problem. I won't talk too much about the nature of the problem domain and type of data, except to say that the data is the output from transportation simulations and ended up being around 100 GB (of raw uncompressed text), which has since been converted into a binary format. To use SQL nomenclature, our data was divided into three tables, our requests to the data storage layer consisted of a single query consisting of two joins between these three tables. The system then performed further data analysis based on the results of the query. We had complete control over the data generation (this company wrote the piece that generates the data) and storage format, something that we could exploit to provide performance far above conventional generic storage engines (i.e. our solution can not be generalised). The system we were developing was completely read only, which simplified our design considerably.
Our dataset was initially estimated to be around 45 TB (based on extrapolations from the data generation component), which led us to think we'd need to distribute the data across multiple machines. Our first investigations (e.g. Hadoop) were based around this. Later, as we worked out we could batch process the majority of the data offline (as an input to the real-time system) and input only a subset, we were able to look towards conventional tools such as SQL databases.
As there were enough problems to solve already, we didn't want to write our own storage engine. We looked towards conventional SQL databases to achieve this, including:
- [http://www.postgresql.org/ PostgreSQL]
- [http://www.mysql.com/ MySQL]
- [http://www.oracle.com/ Oracle]
- [http://www.ibm.com/db2 DB2]
We spent several weeks tuning the data, the queries, indices and the databases themselves, however we were not able to get the performance that we required out of any of the conventional SQL databases. To be fair, we didn't complete our investigations into DB2, and Oracle failed to complete its install on two separate occasions. PostgreSQL was the worst performer and MySQL, while good for the smaller datasets we threw at it (~10 GB), would not have been performant enough even if it maintained linear scalability with the larger dataset. We also looked at distributed memory solutions like those provided by [http://www.gigaspaces.com/ GigaSpaces] and [http://www.terracottatech.com/ Terracotta] however we no longer needed to distribute the data, so they weren't suited.
After switching away from traditional SQL databases, we looked at a number of [http://en.wikipedia.org/wiki/Column-oriented_DBMS column databases] including:
- [http://db.lcs.mit.edu/projects/cstore/ C-Store]
- [http://wiki.apache.org/lucene-hadoop/Hbase HBase]
- [http://monetdb.cwi.nl/ MonetDB]
However these were either unsupported and outdated (C-Store), core-dumped when queried (two versions of MonetDB, both source & binaries) or were not feature rich enough for our needs at the time (HBase).
We also briefly looked at commercial column stores such as:
- [http://www.teradata.com/ Teradata]
- [http://www.vertica.com/ Vertica]
- [http://www.sybase.com/products/datawarehousing/sybaseiq SybaseIQ]
In the end we ruled these out also as we were starting to think we could easily solve our problem using a plain vanilla filesystem and some simple parsing code, and we didn't want to deal with the sales teams at these companies (I've personally been there before and knew the process would not be pretty).
The data storage solution we came up with was laughable simply. We stored CSV files on a filesystem (ext3 on our dev boxes) and indexed into the data using directories. As we had to search linearly (table scan equivalent) through some of the CSV files we split these files into chunks. We were thus able to distribute the requests based on chunk to each of our nodes. The downside of this approach was that the chunking was a manual process, we had to select the chunk size (31 initially) up front, there was no dynamic indexing. The upside was that we didn't need to maintain the indexing scheme (using B or AVL trees) and the files could be batch updated with only a small configuration change (number of chunks). We've found that our data storage was extremely performant, contributing only 18% to the total time it takes to process a request, the remainder of the time is spent in analysing the data and distribution overhead. Subsequent conversion of the CSV files to a binary format result in even faster processing as we no longer need to parse out the CSV delimiters.
Our initial performance tests of the core algorithm showed that we'd need to distribute the computation (and potentially the data) in order to achieve our "real-time" goal. Based on this we looked at languages such as Erlang and Scala for their Actor concurrency model, both languages also have ways of distributing work across multiple machines. Although our algorithm (and data) was parallelisable, we didn't feel that it was amenable to a high level of concurrency, and our research showed us that we could build the system using Java-based tools. This along with the fact that none of the developers were proficient with either Erlang or Scala swayed us back towards Java (even considering all of its problems). Workingmouse does perform [http://workingmouse.com/research/ research] around functional languages including the newly developed [http://wiki.workingmouse.com/index.php/Scalaz Scalaz] library, however at the time only one of our guys was highly proficient in Scala and we were on a very tight time line. The risk associated with four out of five in the team learning a new language and supporting tools was thought to be too great.
Now that we had a candidate language, we looked at a bunch of Java-based "grid" or parallel computing frameworks including:
- [http://lucene.apache.org/hadoop/ Hadoop]
- [http://www.gridgain.com/ GridGain]
- [http://www.tangosol.com/coherence-overview.jsp Tagasol Coherence]
- [http://www.gigaspaces.com/ GigaSpaces]
- [http://www.terracottatech.com/ Terracotta]
- [http://www.jini.org/wiki/Main_Page JavaSpaces & Jini]
These tools perform different functions, some distribute only the data and some distribute only the computation. By now we were fairly sure that we only needed to distribute the computation, so technologies such as Coherence, Terracotta and GigaSpaces were not appropriate. During this time we'd been prototyping simple algorithms using Hadoop and GridGain. We'd found GridGain very easy to get started and performance tests showed that it added only 10-200 ms to the overhead of calls when compared to local (in-JVM) invocations. Hadoop was promising, however it was also quite complicated to setup (installation and creation of readers for input) and use, and provided components that we didn't need, such as the distributed filesystem. We were also unsure how suited it was for interactive use, it looked to be optimised for long running batch operations. In the end, the ease of use and flexibility of GridGain made it a good candidate for the initial distribution, and continues to be used to date.
== GridGain ==
GridGain has what it calls SPIs (service provider interfaces) which are a fancy way of saying an API whose implementation may be provided by a third party and which I thought had long since been relegated to the bad old days of XML parsers. These make it easy to swap in different implementations of node discovery or node-to-node communication for example. We used this to great effect when we discovered that Amazon's EC2 did not support IP multicast, we easily swapped this out for a [http://www.jgroups.org/ JGroups] implementation, which has since been swapped out for a JMS-based implementation.
For its good points however, there are some downsides (I'm being quite picky here, it's a really good framework to use and works quite well).
- We initially had some issues with GridGain, such as jobs being lost and nodes being lost off the grid. To counter this, we went back and wrote a sample project that performs word counts, and encountered no issues, so these looked to be an issue with our code.
- The error messages that come out of GridGain are horrible. They've attempted to clean up the stack traces, but have made it worse, as they've interrupted the normal flow of Java stack traces, and interspersed it with marketing and documentation material. This makes reading errors quite hard.
- The code itself is quite poor, we had lots of trouble following the flow of execution from the startup script to discover how nodes are actually started. This complicated our understanding of the system and its nomenclature. Most methods are huge, tens of lines long with nested try-catch blocks with variables initialised outside the try-catch block and later used. I know this is normal coding practice for Java developers but there are much better and cleaner ways to handle this. The whole code base needs a good refactor. I actually think this is a reflection on their non-test driven approach to development. I read a [http://www.infoq.com/news/2007/11/tdd-or-tdr#view_14195 post on InfoQ] regarding TDD from one of the developers, who said they'd tried it but basically given up. This reflects in poor design and tight coupling of the classes making them hard to use outside their original context (see Spring example below).
- The documentation while quite extensive, does not cover things such as the architectural overview, what a "grid" is (it's a node), how the queueing mechanism works, etc. All stuff you're going to need if you use GridGain in anger.
- While GridGain provides SPIs, if you need to change the way GridGain works outside of these SPIs, it's not very easy to do so. We encountered two instance of this. Firstly, we found the built in JBoss serialisation orders of magnitude slower than the default Java 1.6 serialisation for the objects we were sending (lists of simple structures containing 2-3 integers) and it produced an order of magnitude more bytes on the stream. There doesn't appear to be a way to replace this without re-implementing a lot of code. Secondly, when we changed to JGroups for discovery, we needed to pass the master node (our single node that submits jobs to the grid) a JGroups configuration file, which must to be an absolute path. As we were running inside a servlet container we couldn't be sure the webapp had been exploded to the filesystem, and even if we were exploded we also had no easy way of knowing (from a Spring config file) where the file lived. Paths are not resolved relative to the Spring config file but relative to the GridGain home directory. This meant we had to ship a JGroups config file in a known location, outside of our normal distribution package (the WAR file). This file must also be a real file on the filesystem, you can't pass a stream or a resource on the classpath.
- Because the documentation was sparse, we spent a lot of time proving theories about how the "grid" behaved under certain circumstances to try to get an understanding of how it all fitted together: Can we run multiple nodes that submit jobs to the same grid (yes), does it load balance these (unsure), what constitutes a "grid" (it's the discovery SPI, whatever other nodes a node can see are the "grid"), can we run multiple nodes in the same VM (no, though some documentation claims you can), how does the queueing work, what happens if we saturate the grid with jobs, does GridGain pre-allocate jobs to nodes or can it take advantage of new nodes coming up after a task has been split (we don't think it can, but are unsure), can nodes participate on more than one "grid", etc.
- As GridGain uses serialisation to send objects across the wire, everything must be Serializable. If it's not, you'll get obscure errors which are hard to trace back to serialisation issues. We ended up writing a test utility to ensure that classes we expected to be Serializable (the task, job and job params) were. We also used FindBugs in the build to ensure we didn't miss any inadvertently.
- We needed to be able to submit a number of tasks to the grid concurrently (where each task was a user request on the Rails app). Our initial tests showed that this bombarded the nodes with jobs, until they were unable to cope, causing the master node to fail over and eventually fail (as there were no nodes "available"). There is an SPI that partially addresses this (the [http://www.gridgainsystems.com:8080/wiki/display/GG15UG/Collision+SPI collision SPI]), however it only addresses "collisions" (i.e. messages arriving at the same time) on the consumer end (the processing/worker node) of the connection. There does not seem to be a way to batch up messages on the master (producer) node. This becomes a problem when the code submitting tasks to the grid runs concurrently (like a web service receiving multiple requests). We hadn't needed to address this yet so didn't look too hard for GridGain solutions, but other possibilities include rolling your own queue (perhaps via java.util.concurrent) or batching up requests on the Rails side. This also has affects on the architecture of your system. As GridGain (seems to) sends the jobs out as soon as they're split, only nodes that are available at the time of the initial send are available to participate in the task. So if a node fails and another comes online, it does not seem to pick up the jobs, increasing the overall processing time of the task.
- GridGain includes a peer class loading facility, which basically means whenever a class is needed by the JVM, the classloader looks in the local classpath first, and if not found, will pull the class of any other nodes in the grid that have it available (caching it locally). This is god for development, where you can make a change to your job or job parameters class and have them automatically re-synced to all the nodes. However we were having issues with class loading (which turned out to be serialization and keeping old classes in the classpath) and wanted to turn this off. Although the documentation claims it can be disabled, we couldn't get our grid to work without it on.
- Submission of a task requires the task class, you cannot give it an instance, which implies a no-args constructor on the task class. This is a bit odd.
- This isn't really an issue, but the IP multicast stuff works too well locally. It's great to get up and going but you can easily throw jobs onto nodes that you didn't intend to. The ability to integrate it into an automated build also suffers because of this. We ended up using custom multicast address per local developer machine (auto-generated from the machine's IP). Other discovery SPIs should be similarly configurable, JGroups & JMS can use localhost for example.
- And lastly, but perhaps worst of all, the nomenclature is all wrong. The entity GridGain calls a "grid" is really a "node" on the grid. This confused us for a couple of weeks, and caused us to incorrectly name the "grid" with a single name, when in fact all you are doing is naming nodes. We spent the time talking about the "conceptual grid", where nodes all communicated based on the "grid" name. This had an impact on our initial architecture, whereby we thought we could have nodes participating in more than one grid at the same time. This was appealing to us as we basically had two different kinds of requests, and we thought we may be able to dynamically partition our nodes into either RequestA grid or RequestB grid. This was not the case, the "grid" is defined by the discovery SPI not the "grid" (node) name
We also encountered issues with the discovery SPI, initially driven by EC2 not supporting IP multicast, and later by issues with JGroups. With our configuration of JGroups, we found that with larger grids (8 or 16 nodes), often the master node wouldn't discover every other node available. We could often rectify the situation by starting our nodes in a specific order (all processing nodes, then the master node) - but we would still occasionally miss a grid node or two. What was more worrying was that with long-running CPU-intensive tasks some of the nodes would drop off the grid (according to the master node's logs). We probably would've persisted with JGroups even with this problem, except that once a node dropped off the grid it was never re-discovered by the master node. Eventually the grid would dwindle to our single non-processing node (our master node was non-processing) and fall over. Because of this we ended up swapping the JGroups discovery implementation out for a JMS-based one. Given we already had a JMS service running inside the application server, switching to it was fairly painless. Grid discovery with JMS seems to work just as well as IP multicast, and there does not appear to be anymore overhead on our processing times.
== Results ==
After we'd found an initial working solution it was time to tweak it. We'd actually been doing this all along, however we now had a couple of solid weeks to spend solely on performance tuning. We had two lines of attack here, firstly we looked at tuning the JVM and its garbage collector configuration (I'd had great success with this in the past), and secondly we looked at profiling the code for CPU hotspots and memory usage. Our biggest wins turned out to be reducing the memory overhead and optimising our <code>hashCode()</code> and <code>equals()</code> implementations. Things we thought would hold us back (before profiling) turned out to be not that big an issue at all, our text-based CSV reading for example was contributing only around 2% to the overall processing time.
Part of our algorithm called for loading 3.1 million objects into memory (and retaining them for fast lookup). Each of these objects implemented a base class (<code>Primordial</code>) which gave us sensible (field-based) <code>toString()</code>, <code>hashCode()</code> and <code>equals()</code> methods. This greatly simplified development, but meant that for every object, we also had an instance of <code>Primordial</code> and its parent <code>Object</code> also in memory, giving us 9.3 million objects. <code>Primordial</code> also had two instance fields that delegate to provide equality and to string capabilities, giving us another 6.2 million objects, 15.5 million objects in total, requiring more than half a GB of RAM. By moving these instance fields into our objects and making them static (class fields) we reduced the object count to 6,200,002 objects and significantly improved the memory and CPU characteristics of the application as we were no longer letting the garbage attempt to reclaim space it couldn't.
Our algorithm also makes heavy use of maps and sets, so our <code>hashCode()</code> and <code>equals()</code> methods were getting a workout. The <code>Primordial</code> implementation uses field values (obtained reflectively) to provide sensible defaults for these methods. While this is normally fine (we used this approach in a large batch processing application for several years with no problems), for this algorithm it proved an issue. By writing custom implementations of <code>hashCode()</code> and <code>equals()</code> for the two or three objects that required it we were able to drop our processing time by a fifth (from 25 seconds to 5 seconds).
I should note here that we didn't perform any premature optimisations, we profiled with real requests against real data, giving us the true problems not the things we thought would be problems. In all we spent around a week and a half tuning and were able to decrease the total process from around 50 seconds to around 1 second (for a single node on the local machine). We were lucky however in that we always had performance at the forefront and had chosen a good architecture, one that didn't need to change following performance testing (I had thought we'd need to do some radical rework to achieve desired performance).
As a sample of the numbers we were getting, here is a performance graph that comes from us trying to determine how the number of nodes affects processing time. These numbers were measured before we'd done the performance optimisations mentioned above, the current system is around 5 times faster than this. Processing time in milliseconds is on the Y-axis and the number of GridGain nodes is on the X-axis, the results for 31 nodes are extrapolated.
[[Image:PerformanceWithMultipleGridNodes.png|none|Grid performance results over multiple nodes]]
The red and blue lines show the average processing time as the number of nodes is increased (averaged across 5 and 20 requests respectively). For a single node the system performs the query in 30 seconds, for 16 nodes the system takes 7 seconds.
The green line (time delta) shows the improvement we get in going from N nodes to N + 1 nodes. For a small number of nodes this is very significant - going from 2 to 3 nodes reduces the total time by 9086 ms - however becomes less significant as more nodes are added. Going from 8 to 11, 11 to 16 and 16 to 31 nodes only drops the processing time by 1243 ms, 1255 ms and 1219 ms respectively. So for this test, almost doubling the number of nodes from 16 to 31 (our maximum number of nodes for our given chunk size discussed above) only improves processing time by one second. At some point you approach a limit where adding more nodes does not decrease (single-request) processing time significantly. It may however provide more resilience so that the grid can handle more concurrent requests and cope better with failure.
The yellow line shows the theoretical performance we should be getting by increasing the number of nodes (assuming no network etc. overhead). If we average the total overhead out across the all the nodes, as we increase the number of nodes this blows out from 95 milliseconds for 3 nodes to 210 milliseconds for 16 nodes. This overhead corresponds roughly to the GridGain overhead we saw in our GridGain sample project.
== Deployment ==
After we'd developed a workable solution we needed a hosting environment. The guys we were working with were pretty keen not to have to host the grid themselves and were looking at [http://aws.amazon.com/ec2 Amazon's EC2] for a low cost easy ramp up solution. The process of setting up nodes was pretty painless; we had registered, set up a node and documented the issues in about a day.
The only real issues we had with EC2 is that it doesn't support IP multicast so we had to change the way our nodes found each other, IPs are also not static so management of clusters could become unwieldily. We also had issues with the IP address the machine thought it was on (the internal IP) and its external address, which meant code that returned the address of the server (the WSDL generation) returned the wrong (internal) address to external clients.
There's issues with persistent data (there is none) so you need to store anything you want somewhere else like [http://aws.amazon.com/s3 S3]. Instances boot up from images (AMIs) so you can keep stuff there also, but it doesn't get saved when the instance goes down, and there's a limit on the amount of data you can store. We used S3 to store our data and were planning on automating the copying of the data across to nodes on boot. The EC2 VMs are quite nice, instances are easy to manage and work as advertised. The small instance can be a bit slow, we found them slower than our desktop machines, but the large instances are very nice, and 64-bit.
To ease deployment, we used [http://www.capify.org/ Capistrano] to deploy our code, start and stop our nodes and the app server. Some of the guys also wrote scripts to automate the booting of our images, returning the (dynamic) IPs for SSH access.
== Conclusions ==
Based on our work, here are a few items to consider in summary.
- Conventional SQL databases are not the solution to all data storage problems, often, you can do better using just the filesystem, if you don't require a general solution.
- Because of their nature current Map/Reduce frameworks (i.e. Hadoop) may not be appropriate for all problems. If you don't need a distributed filesystem, there might be other solutions.
- Discover how your application behaves and what it needs from a grid. Does it need to be distributed? If so, do you need a computation grid or a data grid?
- Beware of the vendors marketing spin.
- If you go with GridGain, invest the time to learn how it works, especially for your problem space. Choose the size of your split (how much work gets done on a node) appropriately. The longer a job runs, the more chance a failing job has of delaying the overall processing time as it may fail at the end of the job, requiring a resend to another node. Hadoop seems to cater for this by allowing idle nodes to pick up work not yet allocated.
- Keep your nodes as consistent as possible, if you can keep them exactly the same all the better, it eases deployment and management of nodes.
- Get an end-to-end solution going as soon as possible, it'll flush out lots of issues.
- Automation is a good thing.
- Small teams can achieve a lot in a short period of time.
- This is the first project I've used the XP idea of a metaphor on (thanks Andrew!) and it worked really well. Ours was "lightning", we wanted the application to be small, simple and lightweight. Requests should flow in and out of the system as fast and simply as possible. This guided technology choices and design decisions.
- Premature optimisation is a bad thing. Keeping your code well structured will aid in refactoring it if and when you meet real problems, not perceived ones.