Tuesday, October 31, 2017

Apache Arrow vs. Parquet and ORC: Do we really need a third Apache project for columnar data representation?

Apache Parquet and Apache ORC have become popular file formats for storing data in the Hadoop ecosystem. Their primary value proposition revolves around their “columnar data representation format”. To quickly explain what this means: many people model their data in a set of two-dimensional tables where each row corresponds to an entity, and each column to an attribute of that entity. However, storage is one-dimensional --- you can only read data sequentially from memory or disk in one dimension. Therefore, there are two primary options for storing tables on storage: store one row sequentially, followed by the next row, and then the next one, etc.; or store the first column sequentially, followed by the next column, and then the next one, etc.


Storage layout difference between row- and column-oriented formats
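To make the two layouts concrete, here is a minimal Python sketch. The table contents and the helper names are made up for illustration; the only point is the order in which values land in a one-dimensional buffer.

```python
from array import array

# Hypothetical 3-row, 3-column table of integers (values are made up).
rows = [
    (1, 10, 100),
    (2, 20, 200),
    (3, 30, 300),
]
NUM_COLS = 3

# Row-oriented layout: row 0, then row 1, then row 2.
# array("i") holds C signed ints, which are 4 bytes on most platforms.
row_store = array("i", [value for row in rows for value in row])

# Column-oriented layout: column 0, then column 1, then column 2.
col_store = array("i", [row[c] for c in range(NUM_COLS) for row in rows])


def first_column_from_rows(buf, num_cols):
    # Strided access: every num_cols-th element belongs to column 0.
    return list(buf[0::num_cols])


def first_column_from_cols(buf, num_rows):
    # Contiguous access: column 0 is simply the first num_rows elements.
    return list(buf[:num_rows])
```

Both helpers return the same values, but the row layout has to skip over the other attributes of every entity, while the column layout reads one contiguous run.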


For decades, the vast majority of data engines used row-oriented storage formats. This is because many early data application workloads revolved around reading, writing, and updating single entities at a time. If you store data using a columnar format and you want to extract all attributes for a particular entity, the system must jump around, finding the data for that entity from each of the separately-stored attributes. This results in a random access pattern, which results in slower access times than sequential access patterns. Therefore, columnar storage formats are a poor fit for workloads that tend to read and write entire entities at once, such as OLTP (transactional) workloads. Over time, workloads became more complex, and data analytics workloads emerged that tended to focus on only a few attributes at once; scanning through large quantities of entities to aggregate and/or process values of these attributes. Thus, storing data in a columnar fashion became more viable, and columnar formats resulted in sequential, high performance access patterns for these workloads.


Apache Arrow has recently been released with seemingly the same value proposition as Apache Parquet and Apache ORC: it is a columnar data representation format that accelerates data analytics workloads. Yes, it is true that Parquet and ORC are designed to be used for storage on disk and Arrow is designed to be used for storage in memory. But disk and memory share the fundamental similarity that sequential access is faster than random access, and therefore the analytics workloads which tend to scan through attributes of data will perform better if data is stored in columnar format no matter where data is stored --- in memory or on disk. And if that’s the case, the workloads for which Parquet and ORC are a good fit will be the identical set of workloads for which Arrow is a good fit. If so, why do we need two different Apache projects?


Before we answer this question, let us run a simple experiment to validate the claimed advantages of column-stores. On an Amazon EC2 t2.medium instance, I created a table with 60,000,000 rows (entities) in main memory. Each row contained six attributes (columns), all of them 32-bit integers. Each row is therefore 24 bytes, and the entire table is almost 1.5GB. I created both row-oriented and column-oriented versions of this table, where the row-oriented version stores the first 24-byte row, followed by the next one, etc.; and the column-oriented version stores the entire first column, followed by the next one, etc. I then ran a simple query ideally suited for column-stores --- I simply searched the entire first column for a particular value. The column-oriented version of my table should therefore have to scan through just the first column and will never need to access the other five columns. Therefore it will need to scan through 60,000,000 values * 4 bytes per value = almost 0.25GB. Meanwhile the row-store will need to scan through the entire 1.5GB table because the granularity with which data can be passed from memory to the CPU (a cache line) is larger than the 24-byte tuple. Therefore, it is impossible to read just the relevant first attribute from memory without reading the other five attributes as well. So if the column-store has to read 0.25GB of data and the row-store has to read 1.5GB of data, you might expect the column-store to be 6 times faster than the row-store. However, the actual results are presented in the table below:




Surprisingly, the row-store and the column-store perform almost identically, despite the query being ideally suited for a column-store. The reason why this is the case is that I turned off all CPU optimizations (such as vectorization / SIMD processing) for this query. This resulted in the query being bottlenecked by CPU processing, despite the tiny amount of CPU work that has to happen for this query (just a simple integer comparison operation per row). To understand how it is possible for such a simple query to be bottlenecked by the CPU, we need to understand some basic performance specifications of typical machines. As a rough rule of thumb, sequential scans through memory can feed data from memory to the CPU at a rate of around 30GB a second. However, a modern CPU runs at approximately 3 GHz --- in other words, it can process around 3 billion instructions a second. So even if the processor is doing a 4-byte integer comparison every single cycle, it is processing no more than 12GB a second --- a far smaller rate than the 30GB a second of data that can be sent to it. Therefore, CPU is the bottleneck, and it does not matter that the column-store only needs to send one sixth of the data from memory to the CPU relative to a row-store.
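The back-of-the-envelope arithmetic used so far can be written out as a short Python sketch. All inputs are the figures quoted in the text (60,000,000 rows, six 4-byte columns, roughly 30GB a second of memory bandwidth, a 3 GHz clock); the variable names are my own.

```python
ROWS = 60_000_000
COLS = 6
BYTES_PER_VALUE = 4  # 32-bit integers

# Bytes the row-store must pull through the cache to scan one attribute.
row_store_bytes = ROWS * COLS * BYTES_PER_VALUE

# Bytes the column-store must scan: just the first column.
col_scan_bytes = ROWS * BYTES_PER_VALUE

# Naive expectation if memory bandwidth were the only limit.
expected_speedup = row_store_bytes // col_scan_bytes

# Rule-of-thumb rates from the text, in bytes per second.
memory_bandwidth = 30 * 10**9
clock_hz = 3 * 10**9

# Upper bound on CPU throughput: one 4-byte comparison per cycle.
cpu_bound = clock_hz * BYTES_PER_VALUE

print(f"row-store scan:    {row_store_bytes / 1e9:.2f} GB")
print(f"column-store scan: {col_scan_bytes / 1e9:.2f} GB")
print(f"CPU upper bound:   {cpu_bound / 1e9:.0f} GB/s vs memory {memory_bandwidth / 1e9:.0f} GB/s")
```

Since even the optimistic CPU bound is well below what memory can deliver, the comparison work, not the data transfer, limits the unoptimized query.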


On the other hand, if I turn on CPU optimizations (by adding the ‘-O3’ compiler flag), the equation is very different. Most notably, the compiler can vectorize simple operations such as the comparison operation from our query. What this means is that originally each of the 60,000,000 integer comparisons that are required for this query happened sequentially --- each one occurring in a separate instruction from the previous. However, once vectorization is turned on, most processors can actually take four (or more) contiguous elements of our column, and do the comparison operation for all four of these elements in parallel --- in a single instruction. This effectively makes the CPU go 4 times faster (or more if it can do more than 4 elements in parallel). However, this vectorization optimization only works if all four elements fit in a processor register at once, which roughly means that they have to be contiguous in memory. Therefore, the column-store is able to take advantage of vectorization, while the row-store is not. Thus, when I run the same query with CPU optimizations turned on, I get the following result:




As can be seen, the EC2 processor appears to be vectorizing 4 values per instruction, and therefore the column-store is 4 times faster than the row-store. However, the CPU still appears to be the bottleneck (if memory were the bottleneck, we would expect the column-store to be 6 times faster than the row-store).
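The instruction-count argument can be illustrated with a toy Python model. This is not real SIMD code (Python does not vectorize these loops); it only counts how many comparison "instructions" would be issued if each instruction handled one value versus a block of contiguous values. The lane width of 4 and the function names are assumptions for illustration.

```python
LANES = 4  # assumed number of values compared per vector instruction


def scalar_scan(column, target):
    """One comparison instruction per value."""
    matches = 0
    instructions = 0
    for value in column:
        instructions += 1
        matches += (value == target)
    return matches, instructions


def vector_scan(column, target, lanes=LANES):
    """One comparison instruction per block of `lanes` contiguous values."""
    matches = 0
    instructions = 0
    for start in range(0, len(column), lanes):
        block = column[start:start + lanes]  # must be contiguous in memory
        instructions += 1
        matches += sum(value == target for value in block)
    return matches, instructions


column = [7, 1, 7, 3, 5, 7, 7, 0]
print(scalar_scan(column, 7))  # same match count, 8 instructions
print(vector_scan(column, 7))  # same match count, 2 instructions
```

In a row-store the values of one attribute are 24 bytes apart, so a block of four of them is not contiguous and cannot be loaded into a register in one step, which is why only the column layout benefits in this model.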


We can thus conclude from this experiment that column-stores are still better than row-stores for attribute-limited, sequential scan queries like the one in our example and similar queries typically found in data analytics workloads. So indeed, it does not matter whether the data is stored on disk or in memory --- column-stores are a win for these types of workloads. However, the reason is totally different. When the table is stored on disk, the CPU is much faster than the bandwidth with which data can be transferred from disk to the CPU. Therefore, column-stores are a win because they require less data to be transferred for these workloads. On the other hand, when the table is stored in memory, the amount of data that needs to be transferred is less relevant. Instead, column-stores are a win because they are better suited to vectorized processing.  


The reason why it is so important to understand the difference in bottleneck (even though the bottom line is the same) is that certain decisions for how to organize data into storage formats will look different depending on the bottleneck. Most notably, compression decisions will look very different. In particular, for data stored on disk, where the bandwidth of getting data from disk to CPU is the bottleneck, compression is almost always a good idea. When you compress data, the total size of the data is decreased, and therefore less data needs to be transferred. However, you may have to pay additional CPU processing costs to do the decompression upon arrival. But if CPU is not the bottleneck, this is a great tradeoff to make. On the other hand, if CPU is the bottleneck, such as our experiments above where the data was located in main memory, the additional CPU cost of decompression is only going to slow down the query.
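A rough cost model shows why the same compression decision flips between disk and memory. The sketch below assumes transfer and CPU work overlap perfectly, so the scan time is whichever of the two is larger. The disk bandwidth, compression ratio, and decompression rate are made-up illustrative numbers; only the 30GB a second memory bandwidth and the 12GB a second CPU bound come from the discussion above.

```python
def scan_seconds(data_bytes, transfer_bw, cpu_rate, ratio=1.0, decompress_rate=None):
    """Estimated scan time when transfer and CPU work overlap perfectly."""
    transfer = (data_bytes / ratio) / transfer_bw  # fewer bytes move if compressed
    cpu = data_bytes / cpu_rate                    # query processing cost
    if decompress_rate is not None:
        cpu += data_bytes / decompress_rate        # extra CPU to decompress
    return max(transfer, cpu)                      # the slower side is the bottleneck


COLUMN_BYTES = 240_000_000  # one 4-byte column of 60,000,000 rows
CPU_RATE = 12e9             # bytes/second, CPU bound from the text
MEMORY_BW = 30e9            # bytes/second, rule of thumb from the text
DISK_BW = 200e6             # bytes/second, assumed for illustration
RATIO = 3.0                 # assumed compression ratio
DECOMPRESS_RATE = 2e9       # bytes/second, assumed decompression speed

disk_plain = scan_seconds(COLUMN_BYTES, DISK_BW, CPU_RATE)
disk_packed = scan_seconds(COLUMN_BYTES, DISK_BW, CPU_RATE, RATIO, DECOMPRESS_RATE)
mem_plain = scan_seconds(COLUMN_BYTES, MEMORY_BW, CPU_RATE)
mem_packed = scan_seconds(COLUMN_BYTES, MEMORY_BW, CPU_RATE, RATIO, DECOMPRESS_RATE)

print(f"disk:   {disk_plain:.3f}s plain, {disk_packed:.3f}s compressed")
print(f"memory: {mem_plain:.3f}s plain, {mem_packed:.3f}s compressed")
```

Under these assumptions compression helps on disk, where transfer dominates, and hurts in memory, where the CPU was already the bottleneck.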


Now we can understand some of the key differences between Apache Parquet/ORC and Apache Arrow. Parquet and ORC, since they are designed for disk-resident data, support high-ratio compression algorithms such as snappy (both), gzip (Parquet), and zlib (ORC), all of which typically require decompression before data processing (and the associated CPU costs). Meanwhile, Arrow, which is designed for memory-resident data, does not support these algorithms. The only compression currently supported by Arrow is dictionary compression, a scheme that usually does not require decompression before data processing. For example, if you want to find a particular value in a data set, you can simply search for the associated dictionary-encoded value instead. I assume that the Arrow developers will eventually read my 2006 paper on compression in column-stores and expand their compression options to include other schemes which can be operated on directly (such as run-length-encoding and bit-vector compression). I also expect that they will read the X100 compression paper which includes schemes which can be decompressed using vectorized processing. Thus, I expect that Arrow will eventually support an expanded set of compression options beyond just dictionary compression. But it is far less likely that we will see heavier-weight schemes like gzip and snappy in the Apache Arrow library any time soon.
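Here is a minimal Python sketch of dictionary compression and of searching directly on the encoded data. It is not Arrow's API; the function names and sample values are hypothetical and only illustrate the idea of translating the search value once and then comparing small integer codes.

```python
def dictionary_encode(values):
    """Replace each distinct value with a small integer code."""
    dictionary = {}
    codes = []
    for value in values:
        # Assign the next unused code the first time a value is seen.
        code = dictionary.setdefault(value, len(dictionary))
        codes.append(code)
    return dictionary, codes


def count_matches(dictionary, codes, target):
    """Count occurrences of target without decoding the column."""
    code = dictionary.get(target)
    if code is None:
        return 0  # value never appears in the column
    return sum(1 for c in codes if c == code)


states = ["NY", "CA", "NY", "TX", "CA", "NY"]  # made-up sample column
dictionary, codes = dictionary_encode(states)
print(dictionary)  # {'NY': 0, 'CA': 1, 'TX': 2}
print(codes)       # [0, 1, 0, 2, 1, 0]
print(count_matches(dictionary, codes, "NY"))  # 3
```

The scan compares integers instead of strings and never reconstructs the original values, which is why this kind of scheme adds little or no CPU cost at query time.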


Another difference between optimizing for main memory and optimizing for disk is that the relative difference between random reads and sequential reads is much smaller for memory than for disk. For magnetic disk, a sequential read can be 2-3 orders of magnitude faster than a random read. However, for memory, the difference is usually less than an order of magnitude. In other words, it might take hundreds or even thousands of sequential reads on disk in order to amortize the cost of the original random read to get to the beginning of the sequence. But for main memory, it takes less than ten sequential reads to amortize the cost of the original random read. This enables the batch size of Apache Arrow data to be much smaller than batch sizes of disk-oriented storage formats. Apache Arrow actually fixes batches to be no more than 64K records.


So to return to the original question: do we really need a third column-store Apache project? I would say that there are fundamental differences between main-memory column-stores and disk-resident column-stores. Main-memory column-stores, like Arrow, need to be CPU optimized and focused on vectorized processing (Arrow aligns data to 64-byte boundaries for this reason) and low-overhead compression algorithms. Disk-resident column-stores need to be focused on transfer bandwidth and support higher-compression-ratio algorithms. Therefore, it makes sense to keep Arrow and Parquet/ORC as separate projects, while also continuing to maintain tight integration.

Sunday, October 8, 2017

Hazelcast and the Mythical PA/EC System

(Editor’s note: I was unaware that Kyle Kingsbury was doing a linearizability analysis of Hazelcast when I was writing this post. Kyle’s analysis led Greg Luck, Hazelcast’s CEO, to write a blog post in which he cited the PACELC theorem and came to some of the same conclusions that I came to in writing this post. This post, however, was 98% written before both Kyle’s and Greg’s posts, but their posts got me to accelerate the completion of my analysis and publish it now.)

Seven years ago, I introduced the PACELC theorem as a mechanism to more clearly explain the consistency tradeoffs in building distributed systems. At that time, many people were familiar with the consistency vs. availability trade-off that was made well-known by the CAP theorem. However, it was common amongst people unfamiliar with the details of the CAP theorem to believe that this tradeoff is always present in a distributed system. In truth, the CAP consistency-availability tradeoff describes a very rare corner case. Only when there is an actual network partition --- an increasingly unusual event in modern day infrastructures --- does the consistency-availability tradeoff present itself. At all other times, it is possible to be both available and consistent. Nonetheless, many systems choose not to be fully consistent at all times. The reason for this has nothing to do with the CAP tradeoff. Instead, there is a separate latency vs. consistency tradeoff. Enforcing consistency requires either (1) synchronization messages between machines that must remain consistent with each other or (2) all requests involving a particular data item to be served by a single master for that data item instead of the closest replica to the location where the request originates. Both of these options come with a latency cost. By relaxing consistency and serving reads and writes directly from a closest replica (without synchronization with other replicas), latency can be improved --- sometimes by an order of magnitude.
Therefore, I felt that it was important to clearly tease apart these separate consistency tradeoffs. This led to the PACELC theorem: if there is a partition (P), how does the system trade off availability and consistency (A and C); else (E), when the system is running normally in the absence of partitions, how does the system trade off latency (L) and consistency (C)?
In general, the PACELC theorem leads to four categories of systems: PC/EC, PA/EL, PC/EL, and PA/EC. However, in practice, an application will either go to the effort of building on top of a reduced consistency system or it will not. If it goes to this effort, it stands to benefit in two areas: availability upon a partition, and latency in everyday operation. It is unusual for a system to go to this effort and choose only to attain benefit in one area. Hence, two of these four categories are more common than the other two: PC/EC systems designed for applications that can never sacrifice consistency, and PA/EL systems that are designed for applications that are capable of being built over a reduced consistency system. Despite being less common, PACELC nonetheless theorizes about the existence of PC/EL and PA/EC systems. At the time when I originally introduced PACELC, I gave the example of PNUTS as a PC/EL system. However, I could not think of any good examples of PA/EC systems. Even in my extended article on PACELC in the CAP-anniversary edition of IEEE Computer, I only gave a somewhat hand-wavey example of a PA/EC system.
The basic problem with PA/EC systems is the following: although partitions are a rare event, they are not impossible. Any application built on top of a PA system must have mechanisms in place to deal with inconsistencies that arise during these partition events. But once they have these mechanisms in place, why not benefit during normal operation and get better latency as well?
Over the past few weeks, I have been looking more deeply at the In-Memory Data Grid (“IMDG”) market, and took an especially deep dive into Hazelcast, a ubiquitous open source implementation of an IMDG, with hundreds of thousands of in-production deployments. It turns out that Hazelcast (and, indeed, most of the in-memory data grid industry) is a real implementation of the mythical PA/EC system.
In order to understand why PA/EC makes sense for Hazelcast and other IMDGs, we need to first discuss some background material on (1) Hazelcast use cases, (2) data replication and (3) PACELC.

Hazelcast use cases
The most common use case for Hazelcast is the following. Let’s say that you write a Java program that stores and manipulates data inside popular Java collections and data structures, e.g., Queue, Map, AtomicLong, or Multimap. You may want to run this program in multiple different clients, all accessing the same Java collections and data structures. Furthermore, these data structures may get so large that they cannot fit in memory on a single server. Hazelcast comes to the rescue --- it provides a distributed implementation of these Java data structures, thereby enabling scalable utilization of them. Users interact with Hazelcast the same way that they interacted with their local data structures, but behind the scenes, Hazelcast is distributing and replicating them across a cluster of machines.
The vast majority of Hazelcast use cases are within a single computing cluster. Both the client programs and the Hazelcast data structures are located in the same physical region.

Data replication
In general, any arbitrary system may choose to replicate data for one of two primary reasons: Either they want to improve fault tolerance (if a server containing some of the data fails, a replica server can be accessed instead), or they want to improve request latency (messages that have to travel farther distances take longer to transmit; therefore, having a replica of the data “near” locations from which they are typically accessed can improve request latency).
As mentioned above, in-memory data grids are typically running in the same region as the clients which access them. Therefore, only the first reason to replicate data (fault tolerance) applies. (This reason alone is enough to justify the costs of replication in any scalable system. The more physical machines that exist in the system, the more likely it is that at least one machine will fail at any given point in time. Therefore, the bigger the system, the more you need to replicate for fault tolerance).
If the replicas only exist for fault tolerance and not for performance, there is no fundamental requirement to ever access them except in the case of a failure. All reads and writes can be directed to the primary copy of any data item, with the replicas only ever accessed if the primary is not available. (In such a scenario, it is a good idea to mix primary and replica partitions on servers across the cluster, in order to prevent underutilization of server resources.) If all reads and writes go to the same location, this leads to 100% consistency and linearizability (in the absence of failures) since it is easy for a single server to ensure that reads reflect the most recent writes.

What this means for PACELC
Recall what I wrote above about the latency vs. consistency tradeoff: “Enforcing consistency requires either (1) synchronization messages between machines that must remain consistent with each other or (2) all requests involving a particular data item to be served by a single master for that data item instead of the closest replica to the location where the request originates. Both of these options come with a latency cost.” In truth, option (2) does not come with a latency cost when all requests originate from a location closest to the master replica. A cost materializes only when messages must travel farther than the distance to the nearest replica. In other words, there is no consistency vs. latency tradeoff in the typical Hazelcast use case.
Thus, we should clarify at this point that the PACELC theorem assumes that requests may originate from any arbitrary location. The ELC part of PACELC disappears if all requests come from the same location. I would argue that the CAP theorem makes the same assumption, but such an argument is not as straightforward, and requires a refined discussion about the CAP theorem which is outside the scope of this particular blog post.


Failures and partitions
Up until now, we have said that as long as the master node does not fail, if it serves all reads and writes, then full consistency is achieved. The obvious next question is: what happens if the master node fails and a new master takes over? In such a scenario, the ability of the system to maintain consistency depends on how replication is performed. If replication was asynchronous, then consistency cannot be guaranteed, since some updates may have been performed on the old master, but had not yet been replicated to the new master before the old master failed. If all data had been synchronously replicated to the new master, then full consistency can still be guaranteed.
A failed node is logically equivalent to a partition where the failed node is located in one partition and every other node is in the other partition, and all client requests can reach the second partition but not the first. If the failed node is the master node, and replication was asynchronous, then both the CAP theorem and the PAC part of PACELC state that there are only two choices: quiesce the entire system since the only consistent copy is not accessible (i.e. choosing consistency over availability), or serve reads and writes from nodes in the available partition (i.e. choosing availability over consistency).  
Hazelcast by default uses “synchronous” replication, which is actually an interesting hybrid between asynchronous and synchronous replication. The master asynchronously sends the writes to the replicas, and each replica acknowledges these writes to the client. The client synchronously waits for these acknowledgments before returning from the call. However, if the requisite number of acknowledgments do not arrive before the end of a time out period, the call either returns with the write succeeding or throws an exception, depending on configuration. If Hazelcast had been configured to throw an exception, the client can retry the operation.  Hazelcast also has an anti-entropy algorithm that works offline to re-synchronize replicas with the master to repair missed replications. However, either way --- until the point where the missed replication has been repaired either through the anti-entropy algorithm or through a client retry, the system is temporarily in a state where the write has succeeded on the master but not on at least one replica.
In addition to the hybrid synchronous algorithm described above, Hazelcast also can be configured to use standard asynchronous replication. When configured in this way, the client does not wait for acknowledgments from the replicas before returning from the call. Thus, updates that failed to get replicated will go undetected until the anti-entropy algorithm identifies and repairs the missed replication.  
Thus, either way --- whether Hazelcast is configured to use standard asynchronous replication or to use the default hybrid “synchronous” model --- it is possible for the write call to return with the write only succeeding on the master.
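The behavior described in the last three paragraphs can be sketched as a small Python simulation. This is not Hazelcast code or its API: the function, the exception, and the way acknowledgment delays are modeled are all hypothetical, and for simplicity a replica whose acknowledgment misses the timeout is treated as not having applied the write yet.

```python
class WriteTimeout(Exception):
    """Raised when too few replica acknowledgments arrive in time."""


def replicated_write(master, replicas, key, value, ack_delays,
                     timeout, required_acks, throw_on_timeout):
    # The write is applied on the master first, regardless of what follows.
    master[key] = value

    # Count only acknowledgments that arrive within the timeout window.
    acks = 0
    for replica, delay in zip(replicas, ack_delays):
        if delay <= timeout:
            replica[key] = value
            acks += 1

    # Depending on configuration, a shortfall either raises or is ignored.
    if acks < required_acks and throw_on_timeout:
        raise WriteTimeout(f"only {acks} of {required_acks} acks arrived")
    return acks


master, replicas = {}, [{}, {}]
acks = replicated_write(master, replicas, "k", 1, ack_delays=[0.01, 5.0],
                        timeout=1.0, required_acks=2, throw_on_timeout=False)
print(acks, master, replicas)  # the call returns even though one replica lags
```

In both configurations the master already holds the write when the call ends, so a master failure at that moment can leave the surviving replica without it until a retry or the anti-entropy process repairs the gap.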
If the master node fails, Hazelcast selects a new master to serve reads and writes, even though (as we just mentioned) it is possible that the new master does not have all the writes from the original master. If there is a network partition, the original master will remain the master for its partition, but the other partition will select its own master. Again, this second master may not have all the writes from the original master. Furthermore, a full split brain situation may occur, where the masters for the two different partitions independently accept writes to their partition, thereby causing the partitions to diverge further. However, Hazelcast does have a “split brain protection” feature that prevents significant divergence. The way this feature works is that the system can be configured to define a minimum cluster size for read and write operations. If this minimum size is set to be larger than half of the size of the cluster, then the smaller partition will not accept reads and writes, which prevents further divergence from the larger partition. However, it can take tens of seconds for the smaller partition to realize how small it is (although Hazelcast claims it will be much faster than this in 3.9.1 and 3.10). Thus there is a delay before the split brain protection kicks in, and the partitions can diverge during this delay period.
The bottom line here is that both when the master fails and in the (rare) case of a network partition, a new master is selected that may not have all the updates from the original master. The system always remains available, but the second master is allowed to temporarily diverge from the original master. Thus, Hazelcast is PA/EC in PACELC. If the master has failed or been partitioned away, Hazelcast chooses availability over consistency. However, in the absence of failures or partitions, Hazelcast is fully consistent. (As mentioned above, Hazelcast also achieves low latency in the absence of failures or partitions in their primary use case. However, it is appropriate to label Hazelcast EC rather than EL since if a request were to theoretically originate in a location that is far from the master, it would still choose consistency over latency and serve the request from the master.)
Indeed, any system that serves reads and writes from the master, but elects a new master upon a failure, where this new master is not 100% guaranteed to have seen all of the writes from the original master, will be PA/EC in PACELC. So the PA/EC category is larger than I originally had expected.
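The minimum-size rule behind split brain protection can be sketched in a few lines of Python. The function names are hypothetical and this is not Hazelcast's configuration API; the sketch also ignores the detection delay mentioned above.

```python
def majority_threshold(cluster_size):
    """Smallest member count that is larger than half of the cluster."""
    return cluster_size // 2 + 1


def accepts_operations(partition_size, minimum_size):
    """A partition serves reads and writes only if it is large enough."""
    return partition_size >= minimum_size


CLUSTER_SIZE = 5
minimum = majority_threshold(CLUSTER_SIZE)  # 3 for a 5-node cluster

# A network partition splits the cluster into groups of 3 and 2 nodes.
print(accepts_operations(3, minimum))  # True: larger side keeps serving
print(accepts_operations(2, minimum))  # False: smaller side stops
```

Because two disjoint partitions cannot both contain more than half of the members, at most one side keeps accepting operations once each side has learned its own size.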
I would still argue, however, that PA/EC systems are fundamentally confusing to the end user. If the system cannot guarantee consistency in all cases, then the end user is forced to handle cases of inconsistency in application logic. And once they have the code written to handle these cases (e.g., by including merge functions that resolve situations where replicas may diverge), then the value of the system being consistent in the absence of failures or partitions is significantly reduced. PA/EC systems thus only make sense for applications for which availability takes priority over consistency, but where the code that handles inconsistencies needs to be run as infrequently as possible --- e.g. when the code involves a real world charge (such as refunding a customer’s account) or significant performance costs.
Since not all applications fit into the above category, I suspect that many PA/EC systems will have settings to either increase consistency in order to become fully consistent (i.e. become PC instead of PA) or reduce consistency guarantees in the “else case” (i.e., become EL instead of EC).
Indeed, Hazelcast is such a system and can be configured to be EL rather than EC. There are several ways to accomplish this, but the primary mechanism is through their Near Cache feature. Near Cache is a client side cache of recently accessed data items. If the data items stored in the Near Cache are updated by a different client, these changes are not synchronously replicated to the first client’s Near Cache. Hence, the Near Cache is not kept consistent with the master version of the data (instead it is “eventually consistent”). However, reads by the client are served by its Near Cache if a copy of the data item to be read is stored there. Therefore, excellent latency (less than one microsecond) can be achieved at the cost of consistency --- EL in PACELC.
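The staleness that a client-side cache introduces can be shown with a short Python sketch. This is a simplified model with hypothetical class and method names, not Hazelcast's Near Cache implementation; in particular, invalidation is triggered by hand here to stand in for the asynchronous propagation of changes.

```python
class NearCacheClient:
    """Client that serves reads from a local cache when possible."""

    def __init__(self, master):
        self.master = master   # shared dict standing in for the master copy
        self.near_cache = {}   # local copies of recently read items

    def get(self, key):
        if key in self.near_cache:
            return self.near_cache[key]   # fast path, possibly stale
        value = self.master[key]          # slow path: go to the master
        self.near_cache[key] = value
        return value

    def put(self, key, value):
        self.master[key] = value
        self.near_cache.pop(key, None)    # drop this client's own cached copy

    def invalidate(self, key):
        # Stands in for the asynchronous invalidation arriving later.
        self.near_cache.pop(key, None)


master = {"x": 1}
alice, bob = NearCacheClient(master), NearCacheClient(master)
first = alice.get("x")       # caches x = 1 locally
bob.put("x", 2)              # master now holds x = 2
stale = alice.get("x")       # still 1: served from alice's cache
alice.invalidate("x")
fresh = alice.get("x")       # 2 after the invalidation arrives
print(first, stale, fresh)
```

The middle read is the EL choice in action: it returns immediately from local memory, at the price of not reflecting the other client's most recent write.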
Furthermore, Hazelcast also supports replication of clusters over a WAN. For example, in a disaster recovery use case, all writes go to the primary cluster, and they are asynchronously replicated to a backup cluster. Alternatively, both clusters can accept writes, and they are asynchronously replicated to the other cluster (the application is responsible for resolving conflicting writes to the different clusters using a conflict resolution strategy registered with Hazelcast). Unlike what we discussed earlier, in this case read requests may originate from arbitrary locations rather than always from a location near the master. Hazelcast serves these reads from the closest location, even though it may not have the most up to date copy of the data. Thus, Hazelcast is EL by default for WAN replication.
In summary, through my investigation of Hazelcast (and in-memory data grids in general), I have discovered a new category of PA/EC systems. However, due to the confusing nature of PA/EC systems, it is no surprise that Hazelcast can be configured to be PA/EL in addition to its PA/EC default.

Thursday, April 6, 2017

Distributed consistency at scale: Spanner vs. Calvin

Introduction

In 2012, two research papers were published that described the design of geographically replicated, consistent, ACID compliant, transactional database systems. Both papers criticized the proliferation of NoSQL database systems that compromise replication consistency and transactional support, and argued that it is very possible to build extremely scalable, geographically replicated systems without giving up on consistency and transactional support. The first of these papers was the Calvin paper, published in SIGMOD 2012. A few months later, Google published their Spanner paper in OSDI 2012. Both of these papers have been cited many hundreds of times and have influenced the design of several modern “NewSQL” systems, including FaunaDB (where this post is also being published).
Recently, Google released a beta version of their Spanner implementation, available to customers who use Google Cloud Platform. This development has excited many users seeking to build scalable apps on Google’s cloud, since they now have a reliably scalable and consistent transactional database system to use as a foundation. However, the availability of Spanner outside of Google has also brought it more scrutiny --- what are its technical advantages in practice, and what are its costs? Even though it has been five years since the Calvin paper was published, it is only now that the database community is asking me to directly compare and contrast the technical designs of Calvin and Spanner.
The goal of this post is to do exactly that --- compare the architectural design decisions made in these two systems, and specifically focus on the advantages and disadvantages of these decisions against each other as they relate to performance and scalability. This post is focused on the protocols described in the original papers from 2012. Although the publicly available versions of these systems likely have deviated from the original papers, the core architectural distinctions remain the same.

The CAP theorem in context

Before we get started, allow me to suggest the following: Ignore the CAP theorem in the context of this discussion. Just forget about it. It’s not relevant for the type of modern architectural deployments discussed in this post where network partitions are rare.
Both Spanner and Calvin replicate data across independent regions for high availability. And both Spanner and Calvin are technically CP systems from CAP: they guarantee 100% consistency (serializability, linearizability, etc.) across the entire system. Yes, when there is a network partition, both systems make slight compromises on availability, but partitions are rare enough in practice that developers on top of both systems can assume a fully-available system to many 9s of availability.
(BTW: If you didn’t believe me in 2010 when I explained the shortfalls of using CAP to understand the practical consistency and availability properties of modern systems, maybe you will believe the author of the CAP theorem himself, Eric Brewer, who recommends against analyzing Spanner through the lens of the CAP theorem.)

Ordering transactions in time

Let us start our comparison of Calvin and Spanner with the most obvious difference between them: Spanner’s use of “TrueTime” vs. Calvin’s use of “preprocessing” (or “sequencing” in the language of the original paper) for transaction ordering. In fact, most of the other differences between Spanner and Calvin stem from this fundamental choice.
A serializable system provides a notion of transactional ordering. Even though many transactions may be executed in parallel across many CPUs and many servers in a large distributed system, the final state (and all observable intermediate states) must be as if each transaction was processed one-by-one. If no transactions touch the same data, it is trivial to process them in parallel and maintain this guarantee. However, if the transactions read or write each other’s data, then they must be ordered against each other, with one considered earlier than the other. The one considered “later” must be processed against a version of the database state that includes the writes of the earlier one. In addition, the one considered “earlier” must be processed against a version of the database state that excludes the writes of the later one.

Locking and logging

Spanner uses TrueTime for this transaction ordering. Google famously uses a combination of GPS and atomic clocks in all of their regions to synchronize time to within a known uncertainty bound. If two transactions are processed during time periods that do not have overlapping uncertainty bounds, Spanner can be certain that the later transaction will see all the writes of the earlier transaction.
Spanner obtains write locks within the data replicas on all the data it will write before performing any write. If it obtains all the locks it needs, it proceeds with all of its writes and then assigns the transaction a timestamp at the end of the uncertainty range of the coordinator server for that transaction. It then waits until this later timestamp has definitely passed for all servers in the system (which is the entire length of the uncertainty range) and then releases locks and commits the transaction. Future transactions will get later timestamps and see all the writes of this earlier transaction. Thus, in Spanner, every transaction receives a timestamp based on the actual time that it committed, and this timestamp is used to order transactions. Transactions with later timestamps see all the writes of transactions with earlier timestamps, with locking used to enforce this guarantee.
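The commit-wait rule described above can be sketched in a few lines. This is a toy simulation under stated assumptions, not Spanner's API: `TTInterval`, `SimulatedTrueTime`, and `commit_wait` are hypothetical names, and the clock is a simulated counter with a fixed uncertainty bound `epsilon` on either side of the true time.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class TTInterval:
    earliest: float  # the true time is guaranteed to be >= earliest
    latest: float    # the true time is guaranteed to be <= latest


class SimulatedTrueTime:
    """Hypothetical stand-in for a clock with bounded uncertainty."""

    def __init__(self, epsilon: float):
        self.epsilon = epsilon
        self.true_time = 0.0  # simulated "real" time

    def now(self) -> TTInterval:
        return TTInterval(self.true_time - self.epsilon,
                          self.true_time + self.epsilon)

    def advance(self, dt: float) -> None:
        self.true_time += dt


def commit_wait(tt: SimulatedTrueTime, step: float = 1.0) -> Tuple[float, float]:
    """Pick a commit timestamp, then wait until it has definitely passed.

    Returns (commit timestamp, simulated time spent waiting).
    """
    commit_ts = tt.now().latest            # end of the uncertainty range
    start = tt.true_time
    while tt.now().earliest <= commit_ts:  # timestamp may not have passed yet
        tt.advance(step)
    # Only now would locks be released and the commit acknowledged.
    return commit_ts, tt.true_time - start
```

In this toy model the wait is at least `2 * epsilon`, the full width of the uncertainty range, which matches the description above.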
In contrast, Calvin uses preprocessing to order transactions. All transactions are inserted into a distributed, replicated log before being processed. In more detail: clients submit transactions to the preprocessing layer of their local region, which then submits these transactions to the global log via a cross-region consensus process like Paxos. This is similar to a write-ahead log in a traditional, non-distributed database. The order in which the transactions appear in this log is the official transaction ordering. Every replica reads from its local copy of this replicated log and processes transactions in a way that guarantees that its final state is equivalent to what it would have been had every transaction in the log been executed one-by-one.
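To see why an agreed-upon log is enough to fix the ordering, consider this minimal sketch. It is not Calvin's implementation: transactions are modeled as deterministic Python functions over a dictionary, and `transfer` and `apply_log` are hypothetical names chosen for illustration.

```python
from typing import Callable, Dict, List

State = Dict[str, int]
Txn = Callable[[State], None]  # a deterministic function that mutates state


def transfer(src: str, dst: str, amount: int) -> Txn:
    def run(state: State) -> None:
        if state[src] >= amount:  # outcome depends only on the current state
            state[src] -= amount
            state[dst] += amount
    return run


def apply_log(initial: State, log: List[Txn]) -> State:
    """Process the log in order, one transaction at a time."""
    state = dict(initial)
    for txn in log:
        txn(state)
    return state


initial = {"a": 100, "b": 0, "c": 0}
log = [transfer("a", "b", 80), transfer("a", "c", 50), transfer("b", "c", 30)]

# Two replicas that read the same log end in the same state.
replica_1 = apply_log(initial, log)
replica_2 = apply_log(initial, log)

# A different order of the same transactions gives a different result,
# which is why the log order has to be agreed upon first.
reordered = apply_log(initial, [log[1], log[0], log[2]])
```

A real system runs non-conflicting transactions in parallel; the requirement is only that the result be equivalent to this one-by-one execution.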

Replication overhead

The design difference between TrueTime and preprocessing directly leads to a difference in how the systems perform replication. In Calvin, the replication of transactional input during preprocessing is the only replication that is needed. Calvin uses a deterministic execution framework to avoid *all* cross-replica communication during normal (non-recovery mode) execution aside from preprocessing. Every replica sees the same log of transactions and guarantees not only a final state equivalent to executing the transactions in this log one-by-one, but also a final state equivalent to that of every other replica.
This requires the preprocessor to analyze the transaction code and “pre-execute” any nondeterministic code (e.g. calls to sys.random() or time.now()). (The implications of this for the types of transactions that Calvin supports are discussed at the end of this post.) Once all code within a transaction is deterministic, a replica can safely focus on just processing the transactions in the log in the correct order without concern for diverging from the other replicas.
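One way to picture "pre-execution" is that the nondeterministic calls are evaluated once, before replication, and their results travel with the log entry. The sketch below assumes that interpretation; `LoggedTxn`, `preprocess`, and `execute` are hypothetical names, and Python's `random.random()` and `time.time()` stand in for the nondeterministic calls.

```python
import random
import time
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class LoggedTxn:
    key: str
    # Values resolved once by the preprocessor, before replication.
    random_value: float
    timestamp: float


def preprocess(key: str) -> LoggedTxn:
    """Evaluate the nondeterministic calls once, up front."""
    return LoggedTxn(key=key,
                     random_value=random.random(),
                     timestamp=time.time())


def execute(txn: LoggedTxn, state: Dict[str, Tuple[float, float]]) -> None:
    """Deterministic: uses only logged values, never the local clock or RNG."""
    state[txn.key] = (txn.random_value, txn.timestamp)


entry = preprocess("session:42")
replica_a: Dict[str, Tuple[float, float]] = {}
replica_b: Dict[str, Tuple[float, float]] = {}
execute(entry, replica_a)
execute(entry, replica_b)
```

Had each replica called the clock or the random number generator itself, `replica_a` and `replica_b` would almost certainly differ.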
In contrast, since Spanner does not do any transaction preprocessing, it can only perform replication after transaction execution. Spanner performs this replication via a cross-region Paxos process.

The cost of two-phase commit

Another key difference between Spanner and Calvin is how they commit multi-partitioned transactions. Both Calvin and Spanner partition data into separate shards that may be stored on separate machines that fail independently from each other. In order to guarantee transaction atomicity and durability, any transaction that accesses data in multiple partitions must go through a commit procedure that ensures that every partition successfully processed the part of the transaction that accessed data in that partition. Since machines may fail at any time, including during the commit procedure, this process generally takes two rounds of communication between the partitions involved in the transaction. This two-round commit protocol is called “two phase commit” and is used in almost every ACID-compliant distributed database system, including Spanner. This two phase commit protocol can often consume the majority of latency for short, simple transactions since the actual processing time of the transaction is much less than the delays involved in sending and receiving two rounds of messages over the network.
The cost of two phase commit is particularly high in Spanner because the protocol involves three forced writes to a log that cannot be overlapped with each other. In Spanner, every forced write to a log involves a cross-region Paxos agreement, so the latency of two phase commit in Spanner is at least equal to three times the latency of cross-region Paxos.

Determinism is durability

In contrast to Spanner, Calvin leverages deterministic execution to avoid two-phase commit. Machine failures do not cause transactional aborts in Calvin. Instead, after a failure, the machine that failed in Calvin re-reads the input transaction log from a checkpoint, and deterministically replays it to recover its state at the time of the failure. It can then continue on from there as if nothing happened. As a result, the commit protocol does not need to worry about machine failures during the protocol, and can be performed in a single round of communication (and in some cases, zero rounds of communication --- see the original paper for more details).
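Recovery by replay can be illustrated with a toy example. This sketch assumes that a checkpoint is simply a copy of the state as of a known log position and that every log entry is a deterministic increment; `apply_ops` and the variable names are hypothetical.

```python
from typing import Dict, List, Tuple

Op = Tuple[str, int]  # (key, delta): a deterministic increment


def apply_ops(state: Dict[str, int], ops: List[Op]) -> Dict[str, int]:
    """Return a new state with the operations applied in order."""
    new_state = dict(state)
    for key, delta in ops:
        new_state[key] = new_state.get(key, 0) + delta
    return new_state


log: List[Op] = [("x", 1), ("y", 5), ("x", 2), ("y", -3), ("x", 10)]

# State of a machine that never failed.
healthy = apply_ops({}, log)

# A machine that took a checkpoint after two log entries and later crashed.
checkpoint_pos = 2
checkpoint_state = apply_ops({}, log[:checkpoint_pos])

# Recovery: load the checkpoint and replay the rest of the input log.
recovered = apply_ops(checkpoint_state, log[checkpoint_pos:])
```

Because replay is deterministic, `recovered` equals `healthy`, so no other machine has to abort or wait on the outcome of the failed one.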

Performance

At this point, I think I have provided enough details to make it possible to present a theoretical comparison of the bottom line performance of Calvin vs. Spanner for a variety of different types of requests.  This comparison assumes a perfectly optimized and implemented version of each system.

Transactional write latency

A transaction that is “non-read-only” writes at least one value to the database state. In Calvin, such a transaction must pay the latency cost of preprocessing, which is roughly the cost of running cross-region Paxos to agree to append the transaction to the log. After this is complete, the remaining latency is the cost of processing the transaction itself, which includes the zero or one-phase commit protocol for distributed transactions.
In Spanner, there is no preprocessing latency, but it still has to pay the cost of cross-region Paxos replication at commit time, which is roughly equivalent to the Calvin preprocessing latency. Spanner also has to pay the commit wait latency discussed above (which is the size of the time uncertainty window), but this can be overlapped with replication. It also pays the latency of two phase commit for multi-partition transactions.
Thus, Spanner and Calvin have roughly equivalent latency for single-partition transactions, but Spanner has worse latency than Calvin for multi-partition transactions due to the extra phases in the transaction commit protocol.
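The comparison above can be turned into a back-of-the-envelope model. Everything here is an assumption made for illustration: the function names, the made-up millisecond values, the choice to count a multi-partition Spanner commit as three sequential cross-region Paxos rounds (the lower bound given earlier), and the treatment of Calvin's optional commit round as a cheap local message.

```python
def calvin_write_latency(paxos_ms: float, exec_ms: float,
                         commit_round_ms: float = 0.0) -> float:
    """Preprocessing (one cross-region Paxos round), then execution,
    then a zero- or one-round commit."""
    return paxos_ms + exec_ms + commit_round_ms


def spanner_write_latency(paxos_ms: float, exec_ms: float,
                          commit_wait_ms: float,
                          multi_partition: bool) -> float:
    """Execution, then cross-region Paxos overlapped with commit wait.

    Assumption: a multi-partition commit costs three sequential Paxos
    rounds (a lower bound); a single-partition commit costs one.
    """
    paxos_rounds = 3 if multi_partition else 1
    return exec_ms + max(paxos_rounds * paxos_ms, commit_wait_ms)


# Made-up numbers, in milliseconds.
PAXOS, EXEC, COMMIT_WAIT, LOCAL_ROUND = 100.0, 5.0, 10.0, 1.0

calvin_single = calvin_write_latency(PAXOS, EXEC)
calvin_multi = calvin_write_latency(PAXOS, EXEC, LOCAL_ROUND)
spanner_single = spanner_write_latency(PAXOS, EXEC, COMMIT_WAIT, False)
spanner_multi = spanner_write_latency(PAXOS, EXEC, COMMIT_WAIT, True)
```

With these numbers the single-partition latencies are identical, while the multi-partition Spanner transaction takes roughly three times as long as the Calvin one.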

Snapshot read latency

Both Calvin and Spanner keep older versions of data around and can read data as of a requested earlier timestamp from a local replica without any Paxos communication with the other replicas.
Thus, both Calvin and Spanner can achieve very low snapshot-read latency.

Transactional read latency

Read-only transactions do not write any data, but they must be linearizable with respect to other transactions that write data. In practice, Calvin accomplishes this by placing the read-only transaction in the preprocessor log. This means that a read-only transaction in Calvin must pay the cross-region replication latency. In contrast, Spanner only needs to submit the read-only transaction to the leader replica(s) for the partition(s) that are accessed in order to get a global timestamp (and therefore be ordered relative to concurrent transactions). Therefore, there is no cross-region Paxos latency --- only the commit time (uncertainty window) latency.
Thus, Spanner has better latency than Calvin for read-only transactions submitted by clients that are physically close to the location of the leader servers for the partitions accessed by that transaction.

Scalability

Spanner and Calvin are both (theoretically) roughly-linearly scalable for transactional workloads for which it is rare for concurrent transactions to be accessing the same data. However, major differences begin to present themselves as the conflict rate between concurrent transactions starts to increase.
Both Spanner and Calvin, as presented in the papers, use locks to prevent concurrent transactions from interfering with each other in impermissible ways. However, the amount of time they hold locks for an identical transaction is substantially different. Both systems need to hold locks during the commit protocol. However, since Calvin’s commit protocol is shorter than Spanner’s, Calvin reduces the lock hold time at the end of the transaction. On the flip side, Calvin acquires all locks that it will need at the beginning of the transaction, whereas Spanner performs all reads for a transaction before acquiring write locks. Therefore, Spanner reduces lock time at the beginning of the transaction.
However, this latter advantage for Spanner is generally outweighed by the former (extra lock-hold time at the end of the transactions) disadvantage, since, as discussed above, the latency of two-phase commit in Spanner involves at least three iterations of cross-region Paxos. Furthermore, Spanner has an additional major disadvantage relative to Calvin in lock-hold time: Spanner must also hold locks during replication (which, as mentioned above, is also a cross-region Paxos process). The farther apart the regions, the larger the latency of this replication, and therefore, the longer Spanner must hold locks.
In contrast, Calvin does its replication during preprocessing, and therefore does not need to hold locks during replication. This leads to Calvin holding locks for much shorter periods of time than Spanner, and therefore being able to process more concurrent transactions in parallel that conflict with each other.
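The effect of lock-hold time on contended workloads follows from simple arithmetic: transactions that all need the same lock run one after another, so their throughput is bounded by the inverse of the lock-hold time. The numbers and names below are made up for illustration, and the Spanner figure assumes that locks are held across three sequential cross-region Paxos rounds.

```python
def max_conflicting_throughput(lock_hold_ms: float) -> float:
    """Upper bound on transactions/second that all need the same lock."""
    return 1000.0 / lock_hold_ms


# Made-up numbers, in milliseconds.
EXEC, LOCAL_COMMIT, PAXOS = 4.0, 1.0, 100.0

# Calvin (in this model): locks held during execution and a local commit
# round; replication already happened during preprocessing.
calvin_hold = EXEC + LOCAL_COMMIT

# Spanner (in this model): locks held across three cross-region Paxos rounds.
spanner_hold = 3 * PAXOS

calvin_tps = max_conflicting_throughput(calvin_hold)
spanner_tps = max_conflicting_throughput(spanner_hold)
```

In this model, doubling the cross-region latency halves the Spanner bound and leaves the Calvin bound unchanged, which is the sense in which the gap grows with the distance between regions.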
A second difference that can affect scalability is the following: Calvin requires only a single Paxos group for replicating the input log. In contrast, Spanner requires one independent Paxos group per shard, with proportionally higher overhead.
Overall, Calvin has higher throughput scalability than Spanner for transactional workloads where concurrent transactions access the same data. This advantage increases with the distance between datacenters.

Limitations

In order to implement deterministic transaction processing, Calvin requires the preprocessor to analyze transactions and potentially “pre-execute” any non-deterministic code to ensure that replicas do not diverge. This implies that the preprocessor requires the entire transaction to be submitted at once. This highlights another difference between Calvin and Spanner --- while Spanner theoretically allows arbitrary client-side interactive transactions (that may include external communication), Calvin supports a more limited transaction model.
There are some subtle, but interesting differences between Calvin and Spanner in rare situations where every single replica for a shard is unavailable, or if all but one are unavailable, but these differences are out of scope for this post.

Conclusion

I’m obviously biased in favor of Calvin, but in going through this exercise, I found it very difficult to find cases where an ideal implementation of Spanner theoretically outperforms an ideal implementation of Calvin. The only place where I could find that Spanner has a clear performance advantage over Calvin is for latency of read-only transactions submitted by clients that are physically close to the location of the leader servers for the partitions accessed by that transaction. Since any complex transaction is likely to touch multiple partitions, this is almost impossible to guarantee in a real-world setting.
However, many real-world workloads do not require client-side interactive transactions, and furthermore only need transactional support for writes and are satisfied with performing reads against snapshots (after all, this is the default isolation model of many SQL systems). It seems to me that Calvin is the better fit for a large class of modern applications.


Wednesday, October 28, 2015

Why MongoDB, Cassandra, HBase, DynamoDB, and Riak will only let you perform transactions on a single data item

(This post is co-authored by Daniel Abadi and Jose Faleiro and cross-posted on Jose's blog)

NoSQL systems such as MongoDB, Cassandra, HBase, DynamoDB, and Riak have made many things easier for application developers. They generally have extremely flexible data models that reduce the burden of predicting in advance how an application will change over time. They support a wide variety of data types, and allow nesting of data and dynamic addition of new attributes. Furthermore, on the whole, they are relatively easy to install, with far fewer configuration parameters and dependencies than many traditional database systems.

On the other hand, their lack of support for traditional atomic transactions is a major step backwards in terms of ease-of-use for application developers. An atomic transaction enables a group of writes (to different items in the database) to occur in an all-or-nothing fashion --- either they will all succeed and be reflected in the database state, or none of them will. Moreover, in combination with appropriate concurrency control mechanisms, atomicity guarantees that concurrent and subsequent transactions either observe all of the completed writes of an atomic transaction or none of them. Without atomic transactions, application developers have to write corner-case code to account for cases in which a group of writes (that are supposed to occur together) have only partially succeeded or have been only partially observed by concurrent processes. This code is error-prone, and requires a detailed understanding of the semantics of an application.

At first it may seem odd that these NoSQL systems, which are so well known for their developer-friendly features, should lack such a basic ease-of-use tool as an atomic transaction. One might have thought that this missing feature is a simple matter of maturity --- these systems are relatively new and perhaps they simply haven't yet gotten around to implementing support for atomic transactions. Indeed, Cassandra's "batch update" feature could be viewed as a mini-step in this direction (despite the severe constraints on what types of updates can be placed in a "batch update"). However, as we start to approach a decade since these systems were introduced, it is clear that there is a more fundamental reason for the lack of transactional support in these systems.

Indeed, there is a deeper reason for their lack of transactional support, and it stems from their focus on scalability. Most NoSQL systems are designed to scale horizontally across many different machines, where the data in a database is partitioned across these machines. The writes in a (general) transaction may access data in several different partitions (on several different machines). Such transactions are called "distributed transactions". Guaranteeing atomicity in distributed transactions requires that the machines that participate in the transaction coordinate with each other. Each machine must establish that the transaction can successfully commit on every other machine involved in the transaction. Furthermore, a protocol is used to ensure that no machine involved in the transaction will fail before the writes that it was involved in for that transaction are present in stable storage. This avoids scenarios where one set of nodes commits a transaction's writes, while another set of nodes aborts or fails before the transaction is complete (which violates the all-or-nothing guarantee of atomicity).
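A common protocol for this kind of coordination is two-phase commit. The sketch below is a single-process toy, not any particular system's implementation: `Participant` and `two_phase_commit` are hypothetical names, failures are modeled by a `can_commit` flag, and the forced writes to stable storage are only indicated in comments.

```python
from typing import Dict, List, Tuple


class Participant:
    def __init__(self, name: str, can_commit: bool = True):
        self.name = name
        self.can_commit = can_commit
        self.staged: Dict[str, int] = {}
        self.data: Dict[str, int] = {}

    def prepare(self, writes: Dict[str, int]) -> bool:
        """Phase 1: stage the writes and vote yes or no."""
        if not self.can_commit:
            return False
        self.staged = dict(writes)  # a real system forces this to stable storage
        return True

    def commit(self) -> None:
        """Phase 2 (all voted yes): make the staged writes visible."""
        self.data.update(self.staged)
        self.staged = {}

    def abort(self) -> None:
        """Phase 2 (someone voted no): discard the staged writes."""
        self.staged = {}


def two_phase_commit(work: List[Tuple[Participant, Dict[str, int]]]) -> bool:
    votes = [p.prepare(writes) for p, writes in work]  # round 1
    decision = all(votes)
    for p, _ in work:                                  # round 2
        if decision:
            p.commit()
        else:
            p.abort()
    return decision
```

Between the two rounds, the staged writes cannot safely be shown to other transactions, since the decision may still be to abort.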

This coordination process is expensive, both in terms of resources and in terms of adding latency to database requests. However, the bigger issue is that other operations are not allowed to read the writes of a transaction until this coordination is complete, since the all-or-nothing nature of transaction execution implies that these writes may need to be rolled-back if the coordination process determines that some of the writes cannot complete and the transaction must be aborted. The delay of concurrent transactions can cause further delay of other transactions that have overlapping read- and write-sets with the delayed transactions, resulting in overall "cloggage" of the system. The distributed coordination that is required for distributed transactions thus has significant drawbacks for overall database system performance --- both in terms of the throughput of transactions per unit time that the system can process, and in terms of the latency of transactions as they get caught up in the cloggage (this cloggage latency often dominates the latency of the transaction coordination protocol itself). Therefore, most NoSQL systems have chosen to disallow general transactions altogether rather than become susceptible to the performance pitfalls that distributed transactions can entail.

MongoDB, Riak, HBase, and Cassandra all provide support for transactions on a single key. This is because all information associated with a single key is stored on a single machine (aside from replicas stored elsewhere). Therefore, transactions on a single key are guaranteed not to involve the types of complicated distributed coordination described above.

Given that distributed transactions necessitate distributed coordination, it would seem that there is a fundamental tradeoff between scalable performance and support for distributed transactions. Indeed, many practitioners assume that this is the case. When they set out to build a scalable system, they immediately assume that they will not be able to support distributed atomic transactions without severe performance degradation.

This is in fact completely false. It is very much possible for a scalable system to support performant distributed atomic transactions.

In a recent paper, we published a new representation of the tradeoffs involved in supporting atomic transactions in scalable systems.  In particular, there exists a three-way tradeoff between fairness, isolation, and throughput (FIT). A scalable database system which supports atomic distributed transactions can achieve at most two out of these three properties. Fairness corresponds to the intuitive notion that the execution of any given transaction is not deliberately delayed in order to benefit other transactions.  Isolation provides each transaction with the illusion that it has the entire database system to itself. In doing so, isolation guarantees that if any pair of transactions conflict, then one transaction in the pair will always observe the writes of the other. As a consequence, it alleviates application developers from the burden of reasoning about complex interleavings of conflicting transactions' reads and writes. Throughput refers to the ability of the database to process many concurrent transactions per unit time (without hiccups in performance due to clogging).

The FIT tradeoff dictates that there exist three classes of systems that support atomic distributed transactions:
  1. Those that guarantee fairness and isolation, but sacrifice throughput, 
  2. Those that guarantee fairness and throughput, but sacrifice isolation, and 
  3. Those that guarantee isolation and throughput, but sacrifice fairness.


In other words, not only is it possible to build scalable systems with high throughput distributed transactions, but there actually exist two classes of systems that can do so: those that sacrifice isolation, and those that sacrifice fairness. We discuss each of these two alternatives below.

(Latency is not explicitly mentioned in the tradeoff, but systems that give up throughput also give up latency due to cloggage, and systems that give up fairness yield increased latency for those transactions treated unfairly.)


Give up on isolation
As described above, the root source of the database system cloggage isn't the distributed coordination itself. Rather, it is the fact that other transactions that want to access the data that a particular transaction wrote have to wait until after the distributed coordination is complete before reading or writing the shared data. This waiting occurs due to strong isolation, which guarantees that one transaction in a pair of conflicting transactions must observe the writes of the other. Since a transaction's writes are not guaranteed to commit until after the distributed coordination process is complete, concurrent conflicting transactions cannot make progress for the duration of this coordination.

However, all of this assumes that it is unacceptable for a transaction's writes to not be immediately observable by concurrent conflicting transactions. If this "isolation" requirement is dropped, there is no need for other transactions to wait until the distributed coordination is complete before executing and committing.

While giving up on strong isolation seemingly implies that distributed databases cannot guarantee correctness (because transactions execute against potentially stale database state), it turns out that there exists a class of database constraints that can be guaranteed to hold despite the use of weak isolation among transactions. For more details on the kinds of guarantees that can hold on constraints despite weak isolation, Peter Bailis's work on Read Atomic Multi-Partition (RAMP) transactions provides some great intuition.


Give up on fairness
The underlying motivation for giving up isolation in systems is that distributed coordination extends the duration for which transactions with overlapping data accesses are unable to make progress. Intuitively, distributed coordination and isolation mechanisms overlap in time.  This suggests that another way to circumvent the interaction between isolation techniques and distributed coordination is to re-order distributed coordination such that its overlap with any isolation mechanism is minimized. This intuition forms the basis of Isolation-Throughput systems (which give up fairness).  In giving up fairness, database systems gain the flexibility to pick the most opportune time to pay the cost of distributed coordination.  For instance, it is possible to perform coordination outside of transaction boundaries so that the additional time required to do the coordination does not increase the time that conflicting transactions cannot run. In general, when the system does not need to guarantee fairness, it can deliberately prioritize or delay specific transactions in order to benefit overall throughput.

G-Store is a good example of an Isolation-Throughput system (which gives up fairness).  G-Store extends a (non-transactional) distributed key-value store with support for multi-key transactions.  G-Store restricts the scope of transactions to an application defined set of keys called a KeyGroup. An application defines KeyGroups dynamically based on the set of keys it anticipates will be accessed together over the course of some period of time. Note that the only restriction on transactions is that the keys involved in the transaction be part of a single KeyGroup. G-Store allows KeyGroups to be created and disbanded when needed, and therefore effectively provides arbitrary transactions over any set of keys.

When an application defines a KeyGroup, G-Store moves the constituent keys from their nodes to a single leader node. The leader node copies the corresponding key-value pairs, and all transactions on the KeyGroup are executed on the leader. Since all the key-value pairs involved in a transaction are stored on a single node (the leader node), G-Store transactions do not need to execute a distributed commit protocol during transaction execution.

G-Store pays the cost of distributed coordination prior to executing transactions. In order to create a KeyGroup, G-Store executes an expensive distributed protocol to allow a leader node to take ownership of a KeyGroup, and then move the KeyGroup's constituent keys to the leader node. The KeyGroup creation protocol involves expensive distributed coordination, the cost of which is amortized across the transactions which execute on the KeyGroup.
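The idea of paying for coordination up front can be sketched as follows. This is a toy model and not G-Store's actual protocol: the `Cluster` class, its method names, and the rule of one coordination message per key that changes owner are all assumptions made for illustration.

```python
from typing import Dict, Iterable


class Cluster:
    def __init__(self, placement: Dict[str, str]):
        self.home = dict(placement)   # key -> node that normally owns it
        self.owner = dict(placement)  # key -> node that currently owns it
        self.coordination_msgs = 0    # cross-node ownership transfers so far

    def create_key_group(self, keys: Iterable[str], leader: str) -> None:
        """Move ownership of every key in the group to the leader node."""
        for key in keys:
            if self.owner[key] != leader:
                self.coordination_msgs += 1
                self.owner[key] = leader

    def disband_key_group(self, keys: Iterable[str]) -> None:
        """Return ownership of each key to its home node."""
        for key in keys:
            if self.owner[key] != self.home[key]:
                self.coordination_msgs += 1
                self.owner[key] = self.home[key]

    def is_single_node(self, keys: Iterable[str]) -> bool:
        """True if a transaction over these keys needs no distributed commit."""
        return len({self.owner[k] for k in keys}) == 1


cluster = Cluster({"a": "n1", "b": "n2", "c": "n3"})
group = ["a", "b", "c"]
before = cluster.is_single_node(group)
cluster.create_key_group(group, leader="n1")
after = cluster.is_single_node(group)
```

Once the group exists, any number of transactions over `group` run on `n1` without adding to `coordination_msgs`; the cost was paid once, at creation time, and is paid again when the group is disbanded.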

The key point is that while G-Store still must perform distributed coordination, this coordination is done prior to transaction execution --- before the need to be concerned with isolation from other transactions. Once the distributed coordination is complete (all the relevant data has been moved to a single leader node), the transaction completes quickly on a single node without forcing concurrent transactions with overlapping data accesses to wait for distributed coordination. Hence, G-Store achieves both high throughput and strong isolation.

However, the requirement that transactions restrict their scope to a single KeyGroup favors transactions that execute on keys which have already been grouped. This is "unfair" to transactions that need to execute on a set of as yet ungrouped keys. Before such transactions can begin executing, G-Store must first disband existing KeyGroups to which some keys may belong, and then create the appropriate KeyGroup --- a process with much higher latency than if the desired KeyGroup already existed.


Conclusions
The fundamental reason for the poor performance of conventional distributed transactions is the fact that the mechanisms for guaranteeing atomicity (distributed coordination) and isolation overlap in time. The key to enabling high throughput distributed transactions is to separate these two concerns. This insight leads to two ways of separating atomicity and isolation mechanisms. The first option is to weaken isolation such that conflicting transactions can execute and commit in parallel. The second option is to re-order atomicity and isolation mechanisms so that they do not overlap in time, and in doing so, give up fairness during transaction execution.

(Edit: MongoDB and HBase both have (or will soon have) limited support for multi-key transactions as long as those keys are within the same partition. However, hopefully it is clear to the reader that this post is discussing the difficulties of implementing distributed --- cross-partition --- transactions).