中英翻译谷歌论文:Percolator

在看 TiDB 的时候,看到 TiDB 的事务模型是沿用了 Google 的上一代分布式事务解决方案 Percolator。​
本论文原文名叫:《Large-scale Incremental Processing Using Distributed Transactions and Notifications》

论文链接:https://storage.googleapis.com/pub-tools-public-publication-data/pdf/36726.pdf

概要

Updating an index of the web as documents are crawled requires continuously transforming a large repository of existing documents as new documents arrive. This task is one example of a class of data processing tasks that transform a large repository of data via small, independent mutations. These tasks lie in a gap between the capabilities of existing infrastructure. Databases do not meet the storage or throughput requirements of these tasks: Google’s indexing system stores tens of petabytes of data and processes billions of updates per day on thousands of machines. MapReduce and other batch-processing systems cannot process small updates individually as they rely on creating large batches for efficiency.

在文件被抓取时更新web索引,需要在新文件到来时不断转换现有的大型文件库。现实中有很多这样的数据处理任务,都是因为一些很小的、独立的变化导致一个大型仓库的转变。这种类型的任务的性能往往受制于已存在设施的容量。数据库并不能满足这些任务的存储或吞吐量要求。谷歌的索引系统存储了几十PB的数据,每天在数千台机器上处理数十亿的更新。MapReduce和其他批处理系统是为了大型批处理任务的效率而量身定制的,并不适合单独的处理小的更新。

We have built Percolator, a system for incrementally processing updates to a large data set, and deployed it to create the Google web search index. By replacing a batch-based indexing system with an indexing system based on incremental processing using Percolator, we process the same number of documents per day, while reducing the average age of documents in Google search results by 50%.

我们已经构建了Percolator,一个用于增量处理大型数据集更新的系统,并部署它来创建谷歌网络搜索索引。通过用基于增量处理的Percolator的索引系统取代基于批处理的索引系统,我们每天处理相同数量的文件,同时将谷歌搜索结果中的文件平均年龄降低了50%。(比如本篇文章在今天中午12点发布,在Google上能在下午一点被搜索到,那年龄就是1个小时)

介绍

Consider the task of building an index of the web that can be used to answer search queries. The indexing system starts by crawling every page on the web and processing them while maintaining a set of invariants on the index. For example, if the same content is crawled under multiple URLs, only the URL with the highest PageRank appears in the index. Each link is also inverted so that the anchor text from each outgoing link is attached to the page the link points to. Link inversion must work across duplicates: links to a duplicate of a page should be forwarded to the highest PageRank duplicate if necessary

对于构建搜索引起系统来说,索引系统从抓取网络上的每一个页面开始,会对它们进行处理,同时维护一套索引的不变性。比如,如果在多个URL下抓取到了相同的内容,只需要将PageRank最高的URL添加到索引中。每个外部链接也会被反向处理,让其锚文本附加到链接指向的页面上。链接反向处理还要考虑复制品:在必要的情况下指向一个复制品的链接应该被指向最高PageRank的页面。

反向链接:是指A站通过域名或锚文本指向B站,从而使网站权重得到提升。 它是一个网页对另一个网页的引用。

锚文本(英语:Anchor text):是网页中关于超链接的一段描述,通常以文本和图片的方式出现。可以指向文中的某个位置,也可以指向其他网页。

This is a bulk-processing task that can be expressed as a series of MapReduce operations: one for clustering duplicates, one for link inversion, etc. It’s easy to maintain invariants since MapReduce limits the parallelism of the computation; all documents finish one processing step before starting the next. For example, when the indexing system is writing inverted links to the current highest-PageRank URL, we need not worry about its PageRank concurrently changing; a previous MapReduce step has already determined its PageRank.

这是一个批量处理的任务,可以表示为一系列的MapReduce操作:一个用于聚类重复,一个用于链接反转,等等。由于MapReduce限制了计算的并行性,所以很容易维护不变量;所有文件在开始下一步之前完成一个处理步骤。例如,当索引系统正在向当前最高的PageRank URL写入反向链接时,我们不必担心其PageRank同时发生变化;之前的MapReduce步骤已经确定了其PageRank。

Now, consider how to update that index after recrawling some small portion of the web. It’s not sufficient to run the MapReduces over just the new pages since, for example, there are links between the new pages and the rest of the web. The MapReduces must be run again over the entire repository, that is, over both the new pages and the old pages. Given enough computing resources, MapReduce’s scalability makes this approach feasible, and, in fact, Google’s web search index was produced in this way prior to the work described here. However, reprocessing the entire web discards the work done in earlier runs and makes latency proportional to the size of the repository, rather than the size of an update.

现在,思考一下如果在抓取之后更新一小部分网站索引。仅仅在新页面上运行MapReduces是不够的,因为例如新页面和其他网页之间存在链接。 MapReduces必须再一次全量运行,包括新旧页面。如果有充足的计算资源,MapReduce的可扩展性使这种方法可行,事实上之前的谷歌的网络搜索索引就是以这种方式产生的。但是,丢弃之前工作结果,转而重新处理整个web会使延迟取决于整个库的大小,而不是取决于更新的数据量。

The indexing system could store the repository in a DBMS and update individual documents while using transactions to maintain invariants. However, existing DBMSs can’t handle the sheer volume of data: Google’s indexing system stores tens of petabytes across thousands of machines. Distributed storage systems like Bigtable can scale to the size of our repository but don’t provide tools to help programmers maintain data invariants in the face of concurrent updates.

索引系统可以将存储库存储在DBMS中,并使用事务维护不变量时更新单个文档。然而,现有的DBMS无法处理巨大的数据量。谷歌的索引系统在数千台机器上存储了几十PB的数据。分布式存储系统Bigtable可以扩展到我们的存储库的大小,但不提供工具来帮助程序员在并发更新的情况下维护数据不变量。

An ideal data processing system for the task of maintaining the web search index would be optimized for incremental processing; that is, it would allow us to maintain a very large repository of documents and update it efficiently as each new document was crawled. Given that the system will be processing many small updates concurrently, an ideal system would also provide mechanisms for maintaining invariants despite concurrent updates and for keeping track of which updates have been processed.

对于维护网络搜索索引的任务,一个理想的数据处理系统将为增量处理进行优化。也就是说,它将允许我们维护一个非常大的文档存储库,并在抓取每个新文档时有效地对其进行更新。考虑到系统将同时处理许多小的更新,理想的系统也会提供机制来维持不变量,不管是并发更新还是保持跟踪哪些更新已被处理。

The remainder of this paper describes a particular incremental processing system: Percolator. Percolator provides the user with random access to a multi-PB repository. Random access allows us to process documents individually, avoiding the global scans of the repository that MapReduce requires. To achieve high throughput, many threads on many machines need to transform the repository concurrently, so Percolator provides ACIDcompliant transactions to make it easier for programmers to reason about the state of the repository; we currently implement snapshot isolation semantics.

本文的其余部分描述了一个特定的增量处理系统:Percolator。Percolator为用户提供对多PB资源库的随机访问。随机访问允许我们单独地处理文档,避免了MapReduce需要的存储库的全局扫描。为了实现高吞吐量,许多机器上的许多线程需要同时转换存储库,因此Percolator提供ACID兼容事务以使程序员更容易推断存储库的状态;我们目前实现了快照隔离的语义。

快照隔离(snapshot isolation,SI)是数据库事务处理中的一个隔离级别,保证事务的读操作将看到一个一致的数据库的版本快照(实际上读取比该事务早的最后一次提交值)。该事务的写操作成功提交,仅当基于该快照的任何并发修改与该事务的修改没有冲突(即写-写冲突)。

图1

In addition to reasoning about concurrency, programmers of an incremental system need to keep track of the state of the incremental computation. To assist them in this task, Percolator provides observers: pieces of code that are invoked by the system whenever a user-specified column changes. Percolator applications are structured as a series of observers; each observer completes a task and creates more work for “downstream” observers by writing to the table. An external process triggers the first observer in the chain by writing initial data into the table.

除了对并发性进行推理外,增量系统的程序员还需要跟踪增量计算的状态。为了帮助他们完成这项任务,Percolator提供了observers:当用户指定的列发生变化时,系统调用的代码段。Percolator应用程序是由一系列观察员构成的;每个observers通过写入表格来完成一项任务并为“下游”观察员创造更多的工作。外部进程通过将初始数据写入表中来触发链中的第一个observers。

Percolator was built specifically for incremental processing and is not intended to supplant existing solutions for most data processing tasks. Computations where the result can’t be broken down into small updates (sorting a file, for example) are better handled by MapReduce. Also, the computation should have strong consistency requirements; otherwise, Bigtable is sufficient. Finally, the computation should be very large in some dimension (total data size, CPU required for transformation, etc.); smaller computations not suited to MapReduce or Bigtable can be handled by traditional DBMSs.

Percolator专门为增量处理而构建,并不打算替代大多数数据处理任务的现有解决方案。如果计算结果不能被分解成小的更新(例如,对文件进行排序),那么由MapReduce来处理会更好。另外,计算应该有很强的一致性要求,否则Bigtable就足够了。最后,在计算某个维度应该会非常大(总数据大小,转换所需的CPU等);小数据量的计算可以使用 DBMSs 来处理。

Within Google, the primary application of Percolator is preparing web pages for inclusion in the live web search index. By converting the indexing system to an incremental system, we are able to process individual documents as they are crawled. This reduced the average document processing latency by a factor of 100, and the average age of a document appearing in a search result dropped by nearly 50 percent (the age of a search result includes delays other than indexing such as the time between a document being changed and being crawled). The system has also been used to render pages into images; Percolator tracks the relationship between web pages and the resources they depend on, so pages can be reprocessed when any depended-upon resources change.

在谷歌中,Percolator的主要应用是准备网页以包含在实时网络搜索索引中。通过将索引系统转换为增量系统,我们可以在抓取它们时处理单个文档。这将平均文档处理延迟减少了100倍,并且搜索结果中出现的文档的平均年龄下降了近50%(搜索结果的年龄包括除索引之外的延迟,例如文档之间的时间改变并被抓取),该系统还被用于将网页渲染成图像。Percolator跟踪网页和他们所依赖的资源之间的关系,因此当任何依赖的资源发生变化时可以对页面进行重新处理。

设计

Percolator provides two main abstractions for performing incremental processing at large scale: ACID transactions over a random-access repository and observers, a way to organize an incremental computation.

Percolator为大规模执行增量处理提供了两个主要抽象:通过随机访问库和observers的ACID事务,这是组织增量计算的一种方式。

A Percolator system consists of three binaries that run on every machine in the cluster: a Percolator worker, a Bigtable tablet server, and a GFS chunkserver. All observers are linked into the Percolator worker, which scans the Bigtable for changed columns (“notifications”) and invokes the corresponding observers as a function call in the worker process. The observers perform transactions by sending read/write RPCs to Bigtable tablet servers, which in turn send read/write RPCs to GFS chunkservers. The system also depends on two small services: the timestamp oracle and the lightweight lock service. The timestamp oracle provides strictly increasing timestamps: a property required for correct operation of the snapshot isolation protocol. Workers use the lightweight lock service to make the search for dirty notifications more efficient.

一个Percolator系统由三个二进制文件组成,它们在集群中的每台机器上运行:一个Percolator worker,一个Bigtable tablet服务器和一个GFS块服务器。所有的observers都被链接到Percolator Worker中,它扫描Bigtable的变化列,并在Worker进程中以函数调用的方式调用相应的observers。observers通过向Bigtable tablet服务器发送读取/写入RPC来执行事务,Bigtable tablet服务器将读/写RPC发送给GFS块服务器。该系统还依赖于两个小服务:时间戳oracle和轻量级锁服务。时间戳oracle提供严格增加的时间戳:正确操作快照隔离协议所需的属性。Workers 使用轻量级锁定服务来更有效地搜索脏数据通知。

From the programmer’s perspective, a Percolator repository consists of a small number of tables. Each table is a collection of “cells” indexed by row and column. Each cell contains a value: an uninterpreted array of bytes. (Internally, to support snapshot isolation, we represent each cell as a series of values indexed by timestamp.)

从程序员的角度来看,Percolator库由少量表格组成。每个表格都是按行和列索引的“单元格”的集合。每个单元格包含一个值:一个未被解释的字节数组。(在内部,为了支持快照隔离,我们将每个单元格表示为由时间戳索引的一系列值。)

The design of Percolator was influenced by the requirement to run at massive scales and the lack of a requirement for extremely low latency. Relaxed latency requirements let us take, for example, a lazy approach to cleaning up locks left behind by transactions running on failed machines. This lazy, simple-to-implement approach potentially delays transaction commit by tens of seconds. This delay would not be acceptable in a DBMS running OLTP tasks, but it is tolerable in an incremental processing system building an index of the web. Percolator has no central location for transaction management; in particular, it lacks a global deadlock detector. This increases the latency of conflicting transactions but allows the system to scale to thousands of machines.

Percolator的设计受到两个前提的影响:一是必须运行在大规模数据上,二是并不要求非常低的延迟。例如,不严格的延迟要求让我们采取一种懒惰的方法来清除在失败的机器上运行的事务遗留的锁。这种懒惰的、简单易行的方法可能会使事务提交延迟几十秒。在运行OLTP任务的DBMS中,这种延迟是不可接受的,但在构建Web索引的增量处理系统中是可以容忍的。Percolator的事务管理缺乏一个全局控制器:尤其是它缺少一个全局死锁检测器。这增加了冲突事务的延迟,但允许系统扩展到数千台机器。

Bigtable概述

Percolator is built on top of the Bigtable distributed storage system. Bigtable presents a multi-dimensional sorted map to users: keys are (row, column, timestamp) tuples. Bigtable provides lookup and update operations on each row, and Bigtable row transactions enable atomic read-modify-write operations on individual rows. Bigtable handles petabytes of data and runs reliably on large numbers of (unreliable) machines.

Percolator建立在Bigtable分布式存储系统之上。Bigtable为用户呈现一个多维排序的映射:键是(行,列,时间戳)元组。 Bigtable在每一行上提供查找和更新操作,而Bigtable行事务可以对单个行进行原子读 – 修改 – 写操作。Bigtable可处理PB级数据,并可在大量(不可靠)机器上可靠运行。

A running Bigtable consists of a collection of tablet servers, each of which is responsible for serving several tablets (contiguous regions of the key space). A master coordinates the operation of tablet servers by, for example, directing them to load or unload tablets. A tablet is stored as a collection of read-only files in the Google SSTable format. SSTables are stored in GFS; Bigtable relies on GFS to preserve data in the event of disk loss. Bigtable allows users to control the performance characteristics of the table by grouping a set of columns into a locality group. The columns in each locality group are stored in their own set of SSTables, which makes scanning them less expensive since the data in other columns need not be scanned.

一个运行中的Bigtable包含一批tablet服务器,每个负责服务多个tablet(key空间内连续的域)。一个master负责协调控制各tablet服务器的操作,比如指示它们装载或卸载tablet。一个tablet在Google SSTable上被存储为一系列只读的文件。SSTable被存储在GFS;Bigtable依靠GFS来保护数据以防磁盘故障。Bigtable允许用户控制table的执行特征,比如将一批列分配为一个locality group。locality group中的列被存储在独立隔离的SSTable集合中,在其他列不需要被扫描时可以有效降低扫描成本。

The decision to build on Bigtable defined the overall shape of Percolator. Percolator maintains the gist of Bigtable’s interface: data is organized into Bigtable rows and columns, with Percolator metadata stored alongside in special columns (see Figure 5). Percolator’s API closely resembles Bigtable’s API: the Percolator library largely consists of Bigtable operations wrapped in Percolator-specific computation. The challenge, then, in implementing Percolator is providing the features that Bigtable does not: multirow transactions and the observer framework.

基于Bigtable来构建Percolator,也就大概确定了Percolator的架构样式。数据被组织到Bigtable行和列中,Percolator会将元数据存储在旁边特殊的列中(见图5)。Percolator的API和Bigtable的API也很相似:Percolator中大量API就是在特定的计算中封装了对Bigtable的操作。实现Percolator的挑战就是提供Bigtable没有的功能:多行事务和observers框架。

事务

Percolator provides cross-row, cross-table transactions with ACID snapshot-isolation semantics. Percolator users write their transaction code in an imperative language (currently C++) and mix calls to the Percolator API with their code. Figure 2 shows a simplified version of clustering documents by a hash of their contents. In this example, if Commit() returns false, the transaction has conflicted (in this case, because two URLs with the same content hash were processed simultaneously) and should be retried after a backoff. Calls to Get() and Commit() are blocking; parallelism is achieved by running many transactions simultaneously in a thread pool.

Percolator使用ACID快照隔离语义提供跨行跨表事务。Percolator的用户可使用必要的语言(当前是C++)编写它们的事务代码,然后加上对Percolator API的调用。图2 表现了一段简化的基于内容hash的文档聚类分析程序。这个例子中,Commit 返回 false 说明事务存在冲突(在这种情况下,因为两个具有相同内容哈希的URL被同时处理),所以需要回退重试。调用 Get 和 Commit 是阻塞式的,可以通过在一个线程池里同时运行很多事务来增强并行。

图2

While it is possible to incrementally process data without the benefit of strong transactions, transactions make it more tractable for the user to reason about the state of the system and to avoid the introduction of errors into a long-lived repository. For example, in a transactional web-indexing system the programmer can make assumptions like: the hash of the contents of a document is always consistent with the table that indexes duplicates. Without transactions, an ill-timed crash could result in a permanent error: an entry in the document table that corresponds to no URL in the duplicates table. Transactions also make it easy to build index tables that are always up to date and consistent. Note that both of these examples require transactions that span rows, rather than the single-row transactions that Bigtable already provides.

尽管不利用强事务的优势也可能做到数据增量处理,但事务使得用户能更方便的推导出系统状态,避免将难以发现的错误带到长期使用的存储库中。比如,在一个事务型的web索引系统中,开发者能保证一个原始文档的内容hash值永远和索引复制表中的值保持一致。而没有事务,一个不合时的冲击可能造成永久的不一致问题。事务也让构建最新、一致的索引表更简单。注意我们说的事务指的是跨行事务,而不是Bigtable提供的单行事务。

Percolator stores multiple versions of each data item using Bigtable’s timestamp dimension. Multiple versions are required to provide snapshot isolation, which presents each transaction with the appearance of reading from a stable snapshot at some timestamp. Writes appear in a different, later, timestamp. Snapshot isolation protects against write-write conflicts: if transactions A and B, running concurrently, write to the same cell, at most one will commit. Snapshot isolation does not provide serializability; in particular, transactions running under snapshot isolation are subject to write skew. The main advantage of snapshot isolation over a serializable protocol is more efficient reads. Because any timestamp represents a consistent snapshot, reading a cell requires only performing a Bigtable lookup at the given timestamp; acquiring locks is not necessary. Figure 3 illustrates the relationship between transactions under snapshot isolation.

Percolator使用Bigtable中的时间戳维度,对每个数据项都存储多版本,以实现快照隔离。在一个事务中,按照某个时间戳读取出来的某个版本的数据就是一个隔离的快照,然后再用一个较迟的时间戳写入新的数据。快照隔离可以有效的解决“写-写”冲突:如果事务A和B并行运行,往某个cell执行写操作,大部分情况下都能正常提交。与可序列化隔离级别相比,快照隔离的主要优势在于更高效的读取。因为任何时间戳都代表一个一致的快照,读取一个单元只需要在给定的时间戳上执行一个Bigtable查询;获取锁是不必要的。图3说明了快照隔离下事务之间的关系。

图3

Because it is built as a client library accessing Bigtable, rather than controlling access to storage itself, Percolator faces a different set of challenges implementing distributed transactions than traditional PDBMSs. Other parallel databases integrate locking into the system component that manages access to the disk: since each node already mediates access to data on the disk it can grant locks on requests and deny accesses that violate locking requirements.

由于Percolator是作为访问Bigtable的客户端库而构建的,而不是控制对存储的访问,因此它在实现分布式事务方面面临着与传统PDBMS不同的挑战。传统PDBMS为了实现分布式事务,可以集成基于磁盘访问管理的锁机制:PDBMS中每个节点都会间接访问磁盘上的数据,控制磁盘访问的锁机制就可以控制生杀大权,拒绝那些违反锁要求的访问请求。

By contrast, any node in Percolator can (and does) issue requests to directly modify state in Bigtable: there is no convenient place to intercept traffic and assign locks. As a result, Percolator must explicitly maintain locks. Locks must persist in the face of machine failure; if a lock could disappear between the two phases of commit, the system could mistakenly commit two transactions that should have conflicted. The lock service must provide high throughput; thousands of machines will be requesting locks simultaneously. The lock service should also be low-latency; each Get() operation requires reading locks in addition to data, and we prefer to minimize this latency. Given these requirements, the lock server will need to be replicated (to survive failure), distributed and balanced (to handle load), and write to a persistent data store. Bigtable itself satisfies all of our requirements, and so Percolator stores its locks in special in-memory columns in the same Bigtable that stores data and reads or modifies the locks in a Bigtable row transaction when accessing data in that row.

相比之下,Percolator中的任何节点都可以发出请求,直接修改Bigtable中的状态:没有太好的办法来拦截并分配锁。所以,Percolator一定要明确的维护锁。锁必须持久化以防机器故障;如果一个锁在两阶段提交之间消失,系统可能错误的提交两个会冲突的事务。锁服务一定要高吞吐量,因为几千台机器将会并行的请求锁。锁服务应该也是低延迟的;每个Get 操作都需要申请“读取锁”,我们倾向于最小化延迟。锁服务器需要冗余备份(以防异常故障)、分布式和负载均衡(以解决负载),并需要持久化存储。Bigtable作为存储介质,可以满足所有我们的需求,所以Percolator将锁和数据存储在同一行,用特殊的内存列,访问某行数据时Percolator将在一个Bigtable行事务中对同行的锁执行读取和修改。

We’ll now consider the transaction protocol in more detail. Figure 6 shows the pseudocode for Percolator transactions, and Figure 4 shows the layout of Percolator data and metadata during the execution of a transaction. These various metadata columns used by the system are described in Figure 5. The transaction’s constructor asks the timestamp oracle for a start timestamp (line 6), which determines the consistent snapshot seen by Get(). Calls to Set() are buffered (line 7) until commit time. The basic approach for committing buffered writes is two-phase commit, which is coordinated by the client. Transactions on different machines interact through row transactions on Bigtable tablet servers.

我们现在考虑事务协议的更多细节。我们现在考虑事务协议的更多细节。图6展现了Percolator事务的伪代码,图4展现了在执行事务期间Percolator数据和元数据的布局。图5中描述了系统如何使用这些不同的元数据列。事务构造器向oracle请求一个开始的时间戳(第六行),它决定了 Get 将会看到的一致性快照。Set 操作将被缓冲(第七行),直到 Commit 被调用。提交被缓冲的Set操作的基本途径是两阶段提交,被客户端协调控制。不同机器上基于Bigtable行事务执行各自的操作,并相互影响,最终实现整体的分布式事务。

图4

图4

![图5](Percolator/2021-10-05 20-03-21 的屏幕截图.png)

In the first phase of commit (“prewrite”), we try to lock all the cells being written. (To handle client failure, we designate one lock arbitrarily as the primary; we’ll discuss this mechanism below.) The transaction reads metadata to check for conflicts in each cell being written. There are two kinds of conflicting metadata: if the transaction sees another write record after its start timestamp, it aborts (line 32); this is the write-write conflict that snapshot isolation guards against. If the transaction sees another lock at any timestamp, it also aborts (line 34). It’s possible that the other transaction is just being slow to release its lock after having already committed below our start timestamp, but we consider this unlikely, so we abort. If there is no conflict, we write the lock and the data to each cell at the start timestamp (lines 36-38).

在Commit的第一阶段(“预写”,prewrite),我们尝试锁住所有被写的cell。(为了处理客户端失败的情况,我们指派一个任意锁为“primary”;后续会讨论此机制)事务在每个被写的cell上读取元数据来检查冲突。有两种冲突场景:如果事务在它的开始时间戳之后看见另一个写记录,它会取消(32行),这是“写-写”冲突,也就是快照隔离机制所重点保护的情况。如果事务在任意时间戳看见另一个锁,它也取消(34行)。如果看到的锁在我们的开始时间戳之前,可能提交的事务已经提交了却因为某种原因推迟了锁的释放,但是这种情况可能性不大,保险起见所以取消。如果没有冲突,我们将锁和数据写到各自cell的开始时间戳下(36-38行)。

class Transaction {
  struct Write { Row row; Column col; string value; };
  vector<Write> writes ;
  int start ts ;
  Transaction() : start ts (oracle.GetTimestamp()) {}
  void Set(Write w) { writes .push back(w); }
  bool Get(Row row, Column c, string* value) {
      while (true) {
          bigtable::Txn T = bigtable::StartRowTransaction(row);
          // Check for locks that signal concurrent writes.
          if (T.Read(row, c+"lock", [0, start ts ])) {
          // There is a pending lock; try to clean it and wait
              BackoffAndMaybeCleanupLock(row, c);
              continue;
          }

          // Find the latest write below our start timestamp.
          latest write = T.Read(row, c+"write", [0, start ts ]);
          if (!latest write.found()) return false; // no data
          int data ts = latest write.start timestamp();
          *value = T.Read(row, c+"data", [data ts, data ts]);
          return true;
      }
  }
  // Prewrite tries to lock cell w, returning false in case of conflict.
  bool Prewrite(Write w, Write primary) {
      Column c = w.col;
      bigtable::Txn T = bigtable::StartRowTransaction(w.row);

      // Abort on writes after our start timestamp . . .
      if (T.Read(w.row, c+"write", [start ts , ∞])) return false;
      // . . . or locks at any timestamp.
      if (T.Read(w.row, c+"lock", [0, ∞])) return false;

      T.Write(w.row, c+"data", start ts , w.value);
      T.Write(w.row, c+"lock", start ts ,
          {primary.row, primary.col}); // The primary’s location.
      return T.Commit();
  }
  bool Commit() {
      Write primary = writes [0];
      vector<Write> secondaries(writes .begin()+1, writes .end());
      if (!Prewrite(primary, primary)) return false;
      for (Write w : secondaries)
          if (!Prewrite(w, primary)) return false;

      int commit ts = oracle .GetTimestamp();

      // Commit primary first.
      Write p = primary;
      bigtable::Txn T = bigtable::StartRowTransaction(p.row);
      if (!T.Read(p.row, p.col+"lock", [start ts , start ts ]))
          return false; // aborted while working
      T.Write(p.row, p.col+"write", commit ts,
          start ts ); // Pointer to data written at start ts .
      T.Erase(p.row, p.col+"lock", commit ts);
      if (!T.Commit()) return false; // commit point

      // Second phase: write out write records for secondary cells.
      for (Write w : secondaries) {
          bigtable::Write(w.row, w.col+"write", commit ts, start ts );
          bigtable::Erase(w.row, w.col+"lock", commit ts);
      }
      return true;
  }
 } // class Transaction

If no cells conflict, the transaction may commit and proceeds to the second phase. At the beginning of the second phase, the client obtains the commit timestamp from the timestamp oracle (line 48). Then, at each cell (starting with the primary), the client releases its lock and make its write visible to readers by replacing the lock with a write record. The write record indicates to readers that committed data exists in this cell; it contains a pointer to the start timestamp where readers can find the actual data. Once the primary’s write is visible (line 58), the transaction must commit since it has made a write visible to readers.

如果没有cell发生冲突,事务可以提交并执行到第二阶段。在第二阶段的开始,客户端从oracle获取提交时间戳(48行)。然后,在每个cell(从“primary”开始),客户端释放它的锁,替换锁为一个写记录以让其他读事务知晓。读过程中看到写记录就可以确定它所在时间戳下的新数据已经完成了提交,并可以用它的时间戳作为“指针”找到提交的真实数据。一旦“primary”的写记录可见了(58行),其他读事务就会知晓新数据已写入,所以事务必须提交。

A Get() operation first checks for a lock in the timestamp range [0, start timestamp], which is the range of timestamps visible in the transaction’s snapshot (line 12). If a lock is present, another transaction is concurrently writing this cell, so the reading transaction must wait until the lock is released. If no conflicting lock is found, Get() reads the latest write record in that timestamp range (line 19) and returns the data item corresponding to that write record (line 22).

一个 Get 操作第一步是在时间戳范围 [0,开始时间戳](是右开区间) 内检查有没有锁,这个范围是在此次事务快照所有可见的时间戳(12行)。如果看到一个锁,表示另一个事务在并发的写这个cell,所以读事务必须等待直到此锁释放。如果没有锁出现,Get 操作在时间戳范围内读取最近的写记录(19行)然后返回它的时间戳对应的数据项(22行)。

Transaction processing is complicated by the possibility of client failure (tablet server failure does not affect the system since Bigtable guarantees that written locks persist across tablet server failures). If a client fails while a transaction is being committed, locks will be left behind. Percolator must clean up those locks or they will cause future transactions to hang indefinitely. Percolator takes a lazy approach to cleanup: when a transaction A encounters a conflicting lock left behind by transaction B, A may determine that B has failed and erase its locks.

由于客户端随时可能故障,导致了事务处理的复杂度(Bigtable可保证tablet服务器故障不影响系统,因为Bigtable确保写锁持久存在)。如果一个客户端在一个事务被提交时发生故障,锁将被遗弃。Percolator必须清理这些锁,否则他们将导致将来的事务被非预期的挂起。Percolator用一个懒惰的途径来实现清理:当一个事务A遭遇一个被事务B遗弃的锁,A可以确定B遭遇故障,并清除它的锁。

It is very difficult for A to be perfectly confident in its judgment that B is failed; as a result we must avoid a race between A cleaning up B’s transaction and a notactually-failed B committing the same transaction. Percolator handles this by designating one cell in every transaction as a synchronizing point for any commit or cleanup operations. This cell’s lock is called the primary lock. Both A and B agree on which lock is primary (the location of the primary is written into the locks at all other cells). Performing either a cleanup or commit operation requires modifying the primary lock; since this modification is performed under a Bigtable row transaction, only one of the cleanup or commit operations will succeed. Specifically: before B commits, it must check that it still holds the primary lock and replace it with a write record. Before A erases B’s lock, A must check the primary to ensure that B has not committed; if the primary lock is still present, then it can safely erase the lock.

然而希望A很准确的判断出B失败是十分困难的;可能发生这样的情况,A准备清理B的事务,而事实上B并未故障还在尝试提交事务,我们必须想办法避免。Percolator在每个事务中会对任意的提交或者清理操作指定一个cell作为同步点。这个cell的锁被称之为“primary锁”。A和B在哪个锁是primary上达成一致(primary锁的位置被写入所有cell的锁中)。执行一个清理或提交操作都需要修改primary锁;这个修改操作会在一个Bigtable行事务之下执行,所以只有一个操作可以成功。特别的,在B提交之前,它必须检查它依然拥有primary锁,提交时会将它替换为一个写记录。在A删除B的锁之前,A也必须检查primary锁来保证B没有提交;如果primary锁依然存在它就能安全的删除B的锁。

When a client crashes during the second phase of commit, a transaction will be past the commit point (it has written at least one write record) but will still have locks outstanding. We must perform roll-forward on these transactions. A transaction that encounters a lock can distinguish between the two cases by inspecting the primary lock: if the primary lock has been replaced by a write record, the transaction which wrote the lock must have committed and the lock must be rolled forward, otherwise it should be rolled back (since we always commit the primary first, we can be sure that it is safe to roll back if the primary is not committed). To roll forward, the transaction performing the cleanup replaces the stranded lock with a write record as the original transaction would have done.

如果一个客户端在第二阶段提交时崩溃,一个事务将错过提交点(它已经写过至少一个写记录),而且出现未解决的锁。我们必须对这种事务执行roll-forward。当其他事务遭遇了这个因为故障而被遗弃的锁时,它可以通过检查primary锁来区分这两种情况:如果primary锁已被替换为一个写记录,写入此锁的事务则必须提交,此锁必须被roll forward;否则它应该被回滚(因为我们总是先提交primary,所以如果primary没有提交我们能肯定回滚是安全的)。执行roll forward时,执行清理的事务也是将搁浅的锁替换为一个写记录。

Since cleanup is synchronized on the primary lock, it is safe to clean up locks held by live clients; however, this incurs a performance penalty since rollback forces the transaction to abort. So, a transaction will not clean up a lock unless it suspects that a lock belongs to a dead or stuck worker. Percolator uses simple mechanisms to determine the liveness of another transaction. Running workers write a token into the Chubby lockservice [8] to indicate they belong to the system; other workers can use the existence of this token as a sign that the worker is alive (the token is automatically deleted when the process exits). To handle a worker that is live, but not working, we additionally write the wall time into the lock; a lock that contains a too-old wall time will be cleaned up even if the worker’s liveness token is valid. To handle longrunning commit operations, workers periodically update this wall time while committing.

清理操作在primary锁上是同步的,所以清理活跃客户端持有的锁是安全的;然而,由于回滚会迫使事务中止,因此会产生性能损失。所以,一个事务将不会清理一个锁除非它猜测这个锁属于一个僵死的worker。Percolator使用简单的机制来确定另一个事务的活跃度。运行中的worker会写一个token到Chubby锁服务来指示他们属于本系统,token会被其他worker视为一个代表活跃度的信号(当处理退出时token会被自动删除)。为了防止worker假死,附加的写入一个wall time到锁中;一个锁的wall time如果太老,即使token有效也会被清理。有些操作运行很长时间才会提交,针对这种情况,在整个提交过程中worker会周期的更新wall time。

时间戳

The timestamp oracle is a server that hands out timestamps in strictly increasing order. Since every transaction requires contacting the timestamp oracle twice, this service must scale well. The oracle periodically allocates a range of timestamps by writing the highest allocated timestamp to stable storage; given an allocated range of timestamps, the oracle can satisfy future requests strictly from memory. If the oracle restarts, the timestamps will jump forward to the maximum allocated timestamp (but will never go backwards). To save RPC overhead (at the cost of increasing transaction latency) each Percolator worker batches timestamp requests across transactions by maintaining only one pending RPC to the oracle. As the oracle becomes more loaded, the batching naturally increases to compensate. Batching increases the scalability of the oracle but does not affect the timestamp guarantees. Our oracle serves around 2 million timestamps per second from a single machine.

时间戳oracle是一个用严格的单调增序给外界分配时间戳的服务器。因为每个事务都需要调用oracle两次,这个服务必须有很好的可伸缩性。oracle会定期分配出一个时间戳范围,通过将范围中的最大值写入稳定的存储;范围确定后,oracle能在内存中原子递增来快速分配时间戳,查询时也不涉及磁盘I/O。如果oracle重启,将以稳定存储中的上次范围的最大值作为开始值。为了节省RPC消耗(会增加事务延迟)Percolator的worker会维持一个长连接RPC到oracle,低频率的、批量的获取时间戳。随着oracle负载的增加,worker可通过增加每次批处理返回的量来缓解。批处理有效的增强了时间戳oracle的可伸缩性而不影响其功能。我们oracle中单台机器每秒向外分配接近两百万的时间戳。

The transaction protocol uses strictly increasing timestamps to guarantee that Get() returns all committed writes before the transaction’s start timestamp. To see how it provides this guarantee, consider a transaction R reading at timestamp TR and a transaction W that committed at timestamp TW < TR; we will show that R sees W’s writes. Since TW < TR, we know that the timestamp oracle gave out TW before or in the same batch as TR; hence, W requested TW before R received TR. We know that R can’t do reads before receiving its start timestamp TR and that W wrote locks before requesting its commit timestamp TW . Therefore, the above property guarantees that W must have at least written all its locks before R did any reads; R’s Get() will see either the fullycommitted write record or the lock, in which case W will block until the lock is released. Either way, W’s write is visible to R’s Get().

事务协议使用严格增加的时间戳来保证 Get 在事务的开始时间戳之前返回所有已提交的写写操作。举个例子,如果一个事务R在时间戳TR执行读操作,事务W在时间戳TR执行提交,并且TW < TR;因为TW < TR所以oracle肯定是在TR之前或相同的批处理中给出TW;因此事务W的时间戳TW是在事务R的时间戳TR之前。我们知道R在收到TR之前不能执行读取操作,而W在它的提交时间戳TW之前必定完成了锁的写入;因此,上面的推理保证了W在R做任何读之前就写入了它所有的锁;事务R Get 要么看到已经完全提交的写记录,要么看到锁,在看到锁时R将阻塞直到锁被释放所以在任何情况下,W的写对R的 Get 都是可见的。

通知

Transactions let the user mutate the table while maintaining invariants, but users also need a way to trigger and run the transactions. In Percolator, the user writes code (“observers”) to be triggered by changes to the table, and we link all the observers into a binary running alongside every tablet server in the system. Each observer registers a function and a set of columns with Percolator, and Percolator invokes the function after data is written to one of those columns in any row.

事务可以让用户改变table,同时维护了不变量,但是用户同时还需要一个方法来触发和运行事务。在Percolator,用户编写的代码(“observers”)将因表的变化而触发,然后我们将所有observers放入一个可执行文件(也就是上面提到的Percolator Worker),它会伴随每一个tablet服务器运行。每个observers向Percolator注册一个function和表的列字段,当数据被写到这些列时Percolator会调用此function。

Percolator applications are structured as a series of observers; each observer completes a task and creates more work for “downstream” observers by writing to the table. In our indexing system, a MapReduce loads crawled documents into Percolator by running loader transactions, which trigger the document processor transaction to index the document (parse, extract links, etc.). The document processor transaction triggers further transactions like clustering. The clustering transaction, in turn, triggers transactions to export changed document clusters to the serving system.

Percolator应用的结构就是一系列的observers;每个 observer完成一个任务然后对相应table执行写操作,从而触发“下游”的observer任务。在我们的索引系统中,一个MapReduce通过运行事务加载器将抓取的文档加载到Percolator中,事务加载器触发文档处理器事务为文档建立索引(解析、提取链接等),文档处理器事务触发更多后续的事务比如聚类分析。最后聚类分析反过来触发事务将改变的文档聚类数据导出到在线服务系统。

Notifications are similar to database triggers or events in active databases , but unlike database triggers, they cannot be used to maintain database invariants. In particular, the triggered observer runs in a separate transaction from the triggering write, so the triggering write and the triggered observer’s writes are not atomic. Notifications are intended to help structure an incremental computation rather than to help maintain data integrity.

通知类似于数据库中的触发器或者事件,但是与数据库触发器不同,它们不能被用于维护数据库不变量。比如某个写操作触发了observer逻辑,写操作和observer将运行在各自的事务中,所以它们产生的写不是原子的。通知的目的是帮助构建一个增量计算,而不是帮助维护数据的完整性。

This difference in semantics and intent makes observer behavior much easier to understand than the complex semantics of overlapping triggers. Percolator applications consist of very few observers — the Google indexing system has roughly 10 observers. Each observer is explicitly constructed in the main() of the worker binary, so it is clear what observers are active. It is possible for several observers to observe the same column, but we avoid this feature so it is clear what observer will run when a particular column is written. Users do need to be wary about infinite cycles of notifications, but Percolator does nothing to prevent this; the user typically constructs a series of observers to avoid infinite cycles.

因此,相比数据库触发器,observer的行为更易理解。Percolator应用其实包含很少的观察者——Google索引系统有大概10个观察者。每个observer都是在worker可执行文件的main函数中明确构造的,所以很清楚哪些observer是活跃的。可能有多个observer需要观察同一个列,但是我们避免了这个特性,所以可以很清楚的知道当一个特定的列被写后哪个observer将运行。不过用户需要担心通知的无限循环,Percolator没有为此多做考虑;用户通常是构造一连串依次执行的观察器来避免无限循环。

We do provide one guarantee: at most one observer’s transaction will commit for each change of an observed column. The converse is not true, however: multiple writes to an observed column may cause the corresponding observer to be invoked only once. We call this feature message collapsing, since it helps avoid computation by amortizing the cost of responding to many notifications. For example, it is sufficient for http://google.com to be reprocessed periodically rather than every time we discover a new link pointing to it.

我们提供一个保证:对一个被观察列的每次改变,至多一个observer的事务被提交。反之则不然:对一个被观察列的多次写入可能导致相应的observer只被调用一次。我们称这个特性为消息重叠,它可以避免不必要的重复计算。例如,对 http://google.com 页面来说,周期性的通知其变化就够了,不需要每当一个新链接指向它时就触发一次。

To provide these semantics for notifications, each observed column has an accompanying “acknowledgment” column for each observer, containing the latest start timestamp at which the observer ran. When the observed column is written, Percolator starts a transaction to process the notification. The transaction reads the observed column and its corresponding acknowledgment column. If the observed column was written after its last acknowledgment, then we run the observer and set the acknowledgment column to our start timestamp. Otherwise, the observer has already been run, so we do not run it again. Note that if Percolator accidentally starts two transactions concurrently for a particular notification, they will both see the dirty notification and run the observer, but one will abort because they will conflict on the acknowledgment column. We promise that at most one observer will commit for each notification.

为了给通知机制提供这些语义,每个被监测列旁边都有一个“acknowledgment”列,供每个观察者使用,它包含最近一次observer启动的开始时间戳。被监测列被写入时,Percolator启动一个事务来处理通知。事务读取被监测列和它对应的acknowledgment列。如果被监测列发生写操作的时间戳在acknowledgment列的最近时间戳之后,我们就运行观察者逻辑,并设置acknowledgment列为新的开始时间戳。否则,说明已经有观察者被运行了,所以我们不重复运行它。请注意,如果Percolator不小心为一个特定的通知同时启动了两个事务,它们都会看到脏通知并运行observer,但其中一个会中止,因为它们会在确认列上发生冲突。我们保证对每个通知至多一个observer可以提交。

To implement notifications, Percolator needs to efficiently find dirty cells with observers that need to be run. This search is complicated by the fact that notifications are rare: our table has trillions of cells, but, if the system is keeping up with applied load, there will only be millions of notifications. Additionally, observer code is run on a large number of client processes distributed across a collection of machines, meaning that this search for dirty cells must be distributed.

为了实现通知机制,Percolator需要高效找到需要被运行的observers的脏cell。这个搜索是复杂的因为通知往往是稀疏的:我们表有万亿的cell,但是可能只会有百万个通知。而且,观察者的代码运行在一大批分布式的跨大量机器的客户端进程上,这意味着脏cell搜索也必须是分布式的。

To identify dirty cells, Percolator maintains a special “notify” Bigtable column, containing an entry for each dirty cell. When a transaction writes an observed cell, it also sets the corresponding notify cell. The workers perform a distributed scan over the notify column to find dirty cells. After the observer is triggered and the transaction commits, we remove the notify cell. Since the notify column is just a Bigtable column, not a Percolator column, it has no transactional properties and serves only as a hint to the scanner to check the acknowledgment column to determine if the observer should be run.

为了识别脏cell,Percolator维护一个特殊的 "notify" Bigtable列,其中包含每个脏cell的条目。当一个事务对被监测cell执行写操作时,它同时设置对应的notify cell。worker对notify列执行一个分布式扫描来找到脏cell。在观察者被触发并且事务提交成功后,我们会删除对应的notify cell。因为notify列只是一个Bigtable列,不是个Percolator列,它没有事务型属性,只是作为一个暗示,配合acknowledgment列来帮助扫描器确定是否运行观察者。

To make this scan efficient, Percolator stores the notify column in a separate Bigtable locality group so that scanning over the column requires reading only the millions of dirty cells rather than the trillions of total data cells. Each Percolator worker dedicates several threads to the scan. For each thread, the worker chooses a portion of the table to scan by first picking a random Bigtable tablet, then picking a random key in the tablet, and finally scanning the table from that position. Since each worker is scanning a random region of the table, we worry about two workers running observers on the same row concurrently. While this behavior will not cause correctness problems due to the transactional nature of notifications, it is inefficient. To avoid this, each worker acquires a lock from a lightweight lock service before scanning the row. This lock server need not persist state since it is advisory and thus is very scalable.

为了使扫描高效,Percolator存储notify列为一个独立的Bigtable locality group,所以扫描时仅需读取百万个脏cell,而不是万亿行个cell。每个Percolator的worker指定几个线程负责扫描。对每个线程,worker为其分配table的一部分作为扫描范围,首先挑选一个随机的tablet,然后挑选一个随机的key,然后从那个位置开始扫描。因为每个worker都在扫描table中的一个随机范围,我们担心两个worker会扫描到同一行、并发的运行observers。虽然由于通知的事务本性,这种行为不会导致数据准确性问题,但这是不高效的。为了避免这样,每个worker在扫描某行之前需要从一个轻量级锁服务中申请锁。这个锁服务只是咨询性质、并不严格,所以不需要持久化,因此非常可伸缩。

The random-scanning approach requires one additional tweak: when it was first deployed we noticed that scanning threads would tend to clump together in a few regions of the table, effectively reducing the parallelism of the scan. This phenomenon is commonly seen in public transportation systems where it is known as “platooning” or “bus clumping” and occurs when a bus is slowed down (perhaps by traffic or slow loading). Since the number of passengers at each stop grows with time, loading delays become even worse, further slowing the bus. Simultaneously, any bus behind the slow bus speeds up as it needs to load fewer passengers at each stop. The result is a clump of buses arriving simultaneously at a stop [19]. Our scanning threads behaved analogously: a thread that was running observers slowed down while threads “behind” it quickly skipped past the now-clean rows to clump with the lead thread and failed to pass the lead thread because the clump of threads overloaded tablet servers. To solve this problem, we modified our system in a way that public transportation systems cannot: when a scanning thread discovers that it is scanning the same row as another thread, it chooses a new random location in the table to scan. To further the transportation analogy, the buses (scanner threads) in our city avoid clumping by teleporting themselves to a random stop (location in the table) if they get too close to the bus in front of them.

这个随机扫描机制还需要一个附加优化:当它第一次被部署时,我们注意到扫描线程会倾向于在表的几个区域聚集在一起,严重影响了扫描的并行效果。这现象通常可以在公交系统中看到,被称为“bus凝结”效应。某一个bus可能因为某种原因导致速度减慢(比如在某个站上车的乘客太多)。因为每个车站的乘客数量会随时间增长,导致它到达后续车站的时间延后,于是越来越慢。同时,在这个慢bus后面的bus的速度则会提高,因为它在每个站装载的乘客数量减少了。最终的现象就是多辆公交会同时到达后续的车站。我们扫描线程行为与此类似:一个线程由于运行observer减慢,而它之后的线程快速的跳过已被处理的脏cell,逐渐与领头的线程聚集在一起,但是却没能超过领头的线程因为线程凝结导致tablet服务器繁忙过载。为了解决这个问题,我们做了一个公交系统不能实现的优化:当一个扫描线程发现了它和其他的线程在扫描相同的行,它在table中重新选择一个随机定位继续扫描。这就好比在公交系统中,公交车(扫描线程)为避免凝结而时空穿梭到一个随机的车站(table中的某个位置)。

Finally, experience with notifications led us to introduce a lighter-weight but semantically weaker notification mechanism. We found that when many duplicates of the same page were processed concurrently, each transaction would conflict trying to trigger reprocessing of the same duplicate cluster. This led us to devise a way to notify a cell without the possibility of transactional conflict. We implement this weak notification by writing only to the Bigtable “notify” column. To preserve the transactional semantics of the rest of Percolator, we restrict these weak notifications to a special type of column that cannot be written, only notified. The weaker semantics also mean that multiple observers may run and commit as a result of a single weak notification (though the system tries to minimize this occurrence). This has become an important feature for managing conflicts; if an observer frequently conflicts on a hotspot, it often helps to break it into two observers connected by a non-transactional notification on the hotspot.

最后经验让我们采取非常轻量级、弱事务语义、甚至牺牲了部分一致性的通知机制。我们通过只写到Bigtable的 "notify "列来实现这种弱通知。为了保留Percolator其他部分的事务性语义,我们将这些弱通知限制在一种特殊类型的列上,不能写入,只能通知。 较弱的语义也意味着多个观察者可能会因为一个弱通知而运行并提交(尽管系统会尽量减少这种情况的发生)。这已经成为管理冲突的一个重要特征;如果一个观察者经常在一个热点上发生冲突,那么把它分成两个观察者,由热点上的非事务性通知来连接,往往会有帮助。

讨论

One of the inefficiencies of Percolator relative to a MapReduce-based system is the number of RPCs sent per work-unit. While MapReduce does a single large read to GFS and obtains all of the data for 10s or 100s of web pages, Percolator performs around 50 individual Bigtable operations to process a single document.

相对于基于MapReduce的系统,Percolator的低效率之一是每个工作单元发送的RPC数量。MapReduce只需对GFS进行一次大规模的读取,就能获得10到100个网页的所有数据,而Percolator则需要执行大约50次单独的Bigtable操作来处理一个文档。

One source of additional RPCs occurs during commit. When writing a lock, we must do a read-modify-write operation requiring two Bigtable RPCs: one to read for conflicting locks or writes and another to write the new lock. To reduce this overhead, we modified the Bigtable API by adding conditional mutations which implements the read-modify-write step in a single RPC. Many conditional mutations destined for the same tablet server can also be batched together into a single RPC to further reduce the total number of RPCs we send. We create batches by delaying lock operations for several seconds to collect them into batches. Because locks are acquired in parallel, this adds only a few seconds to the latency of each transaction; we compensate for the additional latency with greater parallelism. Batching also increases the time window in which conflicts may occur, but in our low-contention environment this has not proved to be a problem.

导致RPC太多的其中一个因素发生在commit期间。当写入一个锁时就需要两个Bigtable的RPC:一个为查询冲突锁或写记录,另一个来写入新锁。为减少负载,我们修改了Bigtable的API将两个RPC合并,将读-修改-写放在一个RPC中。按这个方法,我们会尽量将可以打包批处理的RPC调用都合并以减少RPC总数。比如将锁操作延缓几秒钟,使它们尽可能的聚集以被批处理。因为锁是并行获取的,所以每个事务仅仅增加了几秒的延迟;这附加的延迟可以用更强的并行来弥补。批处理增大了事务时窗,导致冲突可能性提高,但是通过有效的事务、通知机制,我们的环境中竞争并不强烈,所以不成问题。

We also perform the same batching when reading from the table: every read operation is delayed to give it a chance to form a batch with other reads to the same tablet server. This delays each read, potentially greatly increasing transaction latency. A final optimization mitigates this effect, however: prefetching. Prefetching takes advantage of the fact that reading two or more values in the same row is essentially the same cost as reading one value. In either case, Bigtable must read the entire SSTable block from the file system and decompress it. Percolator attempts to predict, each time a column is read, what other columns in a row will be read later in the transaction. This prediction is made based on past behavior. Prefetching, combined with a cache of items that have already been read, reduces the number of Bigtable reads the system would otherwise do by a factor of 10.

从table读取时我们也利用了批处理:每个读取操作都被延缓,从而有一定几率让相同tablet的读取操作打包成批处理(类似buffer的原理)。这就延迟了每次读取,可能会大大增加交易延迟。然而,最后一项优化可以减轻这种影响:预读取。预读取利用了这样一个事实:在同一行中读取两个或多个值与读取一个值的成本基本相同。在这两种情况下,Bigtable必须从文件系统中读取整个SSTable块并解压。Percolator试图预测,每次读取一个列的时候,在事务的后期,一行中的其他列会被读取。这种预测是基于过去的行为而做出的。预读取与已经读过的项目的缓存相结合,将系统对大表的读取次数减少了10倍。

Early in the implementation of Percolator, we decided to make all API calls blocking and rely on running thousands of threads per machine to provide enough parallelism to maintain good CPU utilization. We chose this thread-per-request model mainly to make application code easier to write, compared to the event-driven model. Forcing users to bundle up their state each of the (many) times they fetched a data item from the table would have made application development much more difficult. Our experience with thread-per-request was, on the whole, positive: application code is simple, we achieve good utilization on many-core machines, and crash debugging is simplified by meaningful and complete stack traces. We encountered fewer race conditions in application code than we feared. The biggest drawbacks of the approach were scalability issues in the Linux kernel and Google infrastructure related to high thread counts. Our in-house kernel development team was able to deploy fixes to address the kernel issues.

在之前的Percolator的实现中,所有API调用都会阻塞,然后通过调高每台机器的线程数量来支持高并发、提升CPU利用率。相比异步、事件驱动等方案,这种thread—per-request的同步模型的代码更易编写。异步方案需要花费大量精力维护上下文状态,导致应用开发更加困难。根据我们的实际经验,thread—per-request的同步模型还是可圈可点的,它的应用代码简单,多核机器CPU利用率也不错,同步调用下的堆栈跟踪也很方便调试,所遭遇的资源竞争也没有想象中那么恐怖。不过它的最大缺点是可伸缩性问题,linux内核、Google的各种基础设施在遭遇很高的线程数时往往导致瓶颈。不过我们有in-house内核开发小组来帮助解决内核问题。

评估

Percolator lies somewhere in the performance space between MapReduce and DBMSs. For example, because Percolator is a distributed system, it uses far more resources to process a fixed amount of data than a traditional DBMS would; this is the cost of its scalability. Compared to MapReduce, Percolator can process data with far lower latency, but again, at the cost of additional resources required to support random lookups. These are engineering tradeoffs which are difficult to quantify: how much of an efficiency loss is too much to pay for the ability to add capacity endlessly simply by purchasing more machines? Or: how does one trade off the reduction in development time provided by a layered system against the corresponding decrease in efficiency?

Percolator性能空间处于MapReduce和DBMS之间。例如,由于 Percolator是一个分布式系统,它在处理固定数量的数据时使用的资源比传统的DBMS要多得多;这是它的可扩展性的代价。与MapReduce相比,Percolator处理数据的延迟要低得多。滞后性要低得多,但同样是以支持随机查询所需的额外资源为代价的。这些都是很难量化的工程权衡,效率损失多少才算多?为了通过购买更多的机器来无休止地增加容量,效率损失多少才算过分?机器就能无休止地增加容量,这样的效率损失有多大?或者如何权衡分层系统所带来的开发时间的减少?分层系统所提供的开发时间与相应的效率下降之间如何权衡?效益的相应下降?

In this section we attempt to answer some of these questions by first comparing Percolator to batch processing systems via our experiences with converting a MapReduce-based indexing pipeline to use Percolator. We’ll also evaluate Percolator with microbenchmarks and a synthetic workload based on the well-known TPC-E benchmark [1]; this test will give us a chance to evaluate the scalability and efficiency of Percolator relative to Bigtable and DBMSs.

在本节中,我们试图回答其中的一些问题,首先通过我们将基于MapReduce的索引流水线转换为使用Percolator的经验,将Percolator与批处理系统进行比较。我们还将用微观基准和基于著名的TPC-E基准的合成工作负载来评估Percolator;这个测试将给我们一个机会来评估Percolator相对于Bigtable和DBMS的可扩展性和效率。

All of the experiments in this section are run on a subset of the servers in a Google data center. The servers run the Linux operating system on x86 processors; each machine is connected to several commodity SATA drives.

本节中的所有实验都是在谷歌数据中心的一个子集的服务器上运行的。这些服务器在X86处理器上运行Linux操作系统;每台机器都连接着几个商品SATA驱动器。

从MapReduce切换

We built Percolator to create Google’s large “base” index, a task previously performed by MapReduce. In our previous system, each day we crawled several billion documents and fed them along with a repository of existing documents through a series of 100 MapReduces. The result was an index which answered user queries. Though not all 100 MapReduces were on the critical path for every document, the organization of the system as a series of MapReduces meant that each document spent 2-3 days being indexed before it could be returned as a search result.

我们建立了Percolator来创建谷歌的大型 "基础 "索引,这项任务以前是由MapReduce完成的。在我们以前的系统中,我们每天抓取几十亿个文档,并通过一系列的100个MapReduces将它们与现有的文档库联系起来。虽然不是所有的100个MapReduces都在每个文档的关键路径上,但作为一系列MapReduces的系统组织意味着每个文档在作为搜索结果返回之前要花2-3天的时间进行索引。

The Percolator-based indexing system (known as Caffeine [25]), crawls the same number of documents,but we feed each document through Percolator as it is crawled. The immediate advantage, and main design goal, of Caffeine is a reduction in latency: the median document moves through Caffeine over 100x faster than the previous system. This latency improvement grows as the system becomes more complex: adding a new clustering phase to the Percolator-based system requires an extra lookup for each document rather an extra scan over the repository. Additional clustering phases can also be implemented in the same transaction rather than in another MapReduce; this simplification is one reason the number of observers in Caffeine (10) is far smaller than the number of MapReduces in the previous system (100). This organization also allows for the possibility of performing additional processing on only a subset of the repository without rescanning the entire repository.

基于Percolator的索引系统(称为Caffeine),抓取相同数量的文档,并将每个抓取到的文档喂给Percolator。Caffeine的直接优势和主要设计目标是减少延迟:通过Caffeine的速度比以前的系统快100倍以上。这种延迟的改善随着系统变得更加复杂而增长:在基于Percolator的系统中增加一个新的聚类阶段需要对每个文档进行额外的查找,而不是对资源库进行额外的扫描。额外的聚类阶段也可以在同一个事务中实现,而不是在另一个MapReduce中实现;这种简化是Caffeine中观察者的数量(10个)远远小于之前系统中MapReduces的数量(100个)的原因之一。这种组织方式还允许只对资源库的一个子集进行额外的处理,而无需重新扫描整个资源库。

Adding additional clustering phases isn’t free in an incremental system: more resources are required to make sure the system keeps up with the input, but this is still an improvement over batch processing systems where no amount of resources can overcome delays introduced by stragglers in an additional pass over the repository. Caffeine is essentially immune to stragglers that were a serious problem in our batch-based indexing system because the bulk of the processing does not get held up by a few very slow operations. The radically-lower latency of the new system also enables us to remove the rigid distinctions between large, slow-to-update indexes and smaller, more rapidly updated indexes. Because Percolator frees us from needing to process the repository each time we index documents, we can also make it larger: Caffeine’s document collection is currently 3x larger than the previous system’s and is limited only by available disk space.

在增量系统中,增加额外的聚类阶段并不是没有消耗的:需要更多的资源来确保系统跟上输入,但这仍然是批处理系统的一个改进,在批处理系统中,无论多少资源都不能克服掉队者(stragglers 应该指的是非常慢的操作)在储存库上的延迟。 Caffeine基本上不受掉队者的影响,而掉队者在我们基于批处理的索引系统中是一个严重的问题,因为大部分的处理不会被几个非常慢的操作耽搁。新系统的延迟大大降低,也使我们能够消除大型、更新缓慢的索引和小型、更新较快的索引之间的区别。因为Percolator使我们在每次索引文档时无需处理资源库,所以我们也可以使资源库更大。Caffeine的文档库目前比以前的系统大3倍,而且只受可用磁盘空间的限制。

Compared to the system it replaced, Caffeine uses roughly twice as many resources to process the same crawl rate. However, Caffeine makes good use of the extra resources. If we were to run the old indexing system with twice as many resources, we could either increase the index size or reduce latency by at most a factor of two (but not do both). On the other hand, if Caffeine were run with half the resources, it would not be able to process as many documents per day as the old system (but the documents it did produce would have much lower latency).

与它所取代的系统相比,Caffeine使用大约两倍的资源来处理相同的抓取率。然而,Caffeine很好地利用了这些额外的资源。如果我们用两倍的资源来运行旧的索引系统,我们要么增加索引的大小,要么最多减少两倍的延迟(但不能同时做到)。另一方面,如果Caffeine以一半的资源运行,它将不能像旧系统那样每天处理许多文件(但它所产生的文件的延迟将大大降低)。

The new system is also easier to operate. Caffeine has far fewer moving parts: we run tablet servers, Percolator workers, and chunkservers. In the old system, each of a hundred different MapReduces needed to be individually configured and could independently fail. Also, the “peaky” nature of the MapReduce workload made it hard to fully utilize the resources of a datacenter compared to Percolator’s much smoother resource usage.

新系统也更容易操作。Caffeine的活动部件少得多:我们运行tablet服务器、Percolator工作者和chunkservers。在旧系统中,一百个不同的MapReduces中的每一个都需要单独配置,并可能独立失败。另外,MapReduce工作负载的 "峰值 "特性使得它很难充分利用数据中心的资源,而Percolator的资源使用则要平稳得多。

The simplicity of writing straight-line code and the ability to do random lookups into the repository makes developing new features for Percolator easy. Under MapReduce, random lookups are awkward and costly. On the other hand, Caffeine developers need to reason about concurrency where it did not exist in the MapReduce paradigm. Transactions help deal with this concurrency, but can’t fully eliminate the added complexity.

编写代码的简单性和对资源库进行随机查找的能力使Percolator的新功能开发变得简单。在MapReduce下,随机查找是很难的,而且成本很高。另一方面,Caffeine开发者需要推理并发性,而在MapReduce范式中并不存在这种情况。事务有助于处理这种并发性,但不能完全消除增加的复杂性。

图7

To quantify the benefits of moving from MapReduce to Percolator, we created a synthetic benchmark that clusters newly crawled documents against a billiondocument repository to remove duplicates in much the same way Google’s indexing pipeline operates. Documents are clustered by three clustering keys. In a real system, the clustering keys would be properties of the document like redirect target or content hash, but in this experiment we selected them uniformly at random from a collection of 750M possible keys. The average cluster in our synthetic repository contains 3.3 documents, and 93% of the documents are in a non-singleton cluster. This distribution of keys exercises the clustering logic, but does not expose it to the few extremely large clusters we have seen in practice. These clusters only affect the latency tail and not the results we present here. In the Percolator clustering implementation, each crawled document is immediately written to the repository to be clustered by an observer. The observer maintains an index table for each clustering key and compares the document against each index to determine if it is a duplicate (an elaboration of Figure 2). MapReduce implements clustering of continually arriving documents by repeatedly running a sequence of three clustering MapReduces (one for each clustering key). The sequence of three MapReduces processes the entire repository and any crawled documents that accumulated while the previous three were running.

为了量化从MapReduce到Percolator的好处,我们创建了一个synthetic benchmark,将新抓取的文档与一个十亿级的文档库进行聚类,以去除重复的文档,其操作方式与谷歌的索引管道基本相同。文件是由三个聚类key聚在一起的。在一个真实的系统中,聚类key是文件的属性,如重定向目标或内容哈希,但在这个实验中,我们从7.5亿个可能的key集合中均匀地随机选择它们。在我们的合成资源库中,平均集群包含3.3个文档,93%的文档都在一个非单子集群中。这种key的分布提炼了聚类逻辑,但并没有将其暴露在我们在实践中看到的少数极端大的集群中。这些集群只影响到延迟的尾部,而不影响我们在这里提出的结果。在Percolator聚类的实现中,每个抓取的文档都被立即写入资源库,由observer进行聚类。observer为每个聚类键维护一个索引表,并将文档与每个索引进行比较,以确定它是否是重复的(对图2的阐述)。MapReduce通过重复运行三个聚类MapReduces的序列(每个聚类key一个)来实现对不断到达的文档的聚类。这三个MapReduces序列处理整个资源库和在前三个MapReduces运行时积累的任何爬行文档。

This experiment simulates clustering documents crawled at a uniform rate. Whether MapReduce or Percolator performs better under this metric is a function of the how frequently documents are crawled (the crawl rate) and the repository size. We explore this space by fixing the size of the repository and varying the rate at which new documents arrive, expressed as a percentage of the repository crawled per hour. In a practical system, a very small percentage of the repository would be crawled per hour: there are over 1 trillion web pages on the web (and ideally in an indexing system’s repository), far too many to crawl a reasonable fraction of in a single day. When the new input is a small fraction of the repository (low crawl rate), we expect Percolator to outperform MapReduce since MapReduce must map over the (large) repository to cluster the (small) batch of new documents while Percolator does work proportional only to the small batch of newly arrived documents (a lookup in up to three index tables per document). At very large crawl rates where the number of newly crawled documents approaches the size of the repository, MapReduce will perform better than Percolator. This cross-over occurs because streaming data from disk is much cheaper, per byte, than performing random lookups. At the cross-over the total cost of the lookups required to cluster the new documents under Percolator equals the cost to stream the documents and the repository through MapReduce. At crawl rates higher than that, one is better off using MapReduce.

这个实验模拟了对以统一速率抓取的文档进行聚类。在这个指标下,MapReduce或Percolator是否表现得更好,是文件抓取频率(抓取率)和资源库大小的函数。我们通过固定资源库的大小和改变新文件的到达率来探索这个空间,以每小时抓取的资源库的百分比表示。在一个实际运行的系统中,每小时抓取的文献库的百分比非常小:网络上有超过1万亿的网页(最好是在索引系统的文献库中),以致在一天内无法抓取合理的部分。当新输入的内容只占资源库的一小部分时(低抓取率),我们希望Percolator的性能优于MapReduce,因为MapReduce必须对(大)资源库进行映射,以对(小)批量的新文件进行聚类,而Percolator只对小批量的新到文件进行相应的工作(每个文件最多在三个索引表中进行查找)。在非常大的抓取率下,新抓取的文档数量接近资源库的大小,MapReduce会比Percolator表现更好。图7中交叉点出现的原因是,从磁盘上传输数据,每一个字节都比执行随机查找要便宜得多。在图7交叉点上,Percolator下对新文档进行聚类所需的查找总成本等于通过MapReduce对文档和资源库进行流式处理的成本。在抓取率高于这个数字时,最好使用MapReduce。

We ran this benchmark on 240 machines and measured the median delay between when a document is crawled and when it is clustered. Figure 7 plots the median latency of document processing for both implementations as a function of crawl rate. When the crawl rate is low, Percolator clusters documents faster than MapReduce as expected; this scenario is illustrated by the leftmost pair of points which correspond to crawling 1 percent of documents per hour. MapReduce requires approximately 20 minutes to cluster the documents because it takes 20 minutes just to process the repository through the three MapReduces (the effect of the few newly crawled documents on the runtime is negligible). This results in an average delay between crawling a document and clustering of around 30 minutes: a random document waits 10 minutes after being crawled for the previous sequence of MapReduces to finish and then spends 20 minutes being processed by the three MapReduces. Percolator, on the other hand, finds a newly loaded document and processes it in two seconds on average, or about 1000x faster than MapReduce. The two seconds includes the time to find the dirty notification and run the transaction that performs the clustering. Note that this 1000x latency improvement could be made arbitrarily large by increasing the size of the repository.

我们在240台机器上运行了这个benchmark,并测量了从抓取文档到聚类的中位延迟。图7显示了两种实现的文档处理的中位延迟与抓取率的关系。当抓取率较低时,Percolator比MapReduce更快地对文档进行聚类,这种情况由最左边的一对点说明,它们对应于每小时抓取1%的文档。MapReduce需要大约20分钟来对文档进行聚类,因为仅仅通过三个MapReduc来处理资源库就需要20分钟(少数新抓取的文档对运行时间的影响可以忽略不计)。这导致抓取文档和聚类之间的平均延迟为30分钟左右:一个随机文档在被抓取后等待10分钟,等待前一序列的MapReduces完成,然后花20分钟被三个MapReduces处理。另一方面,Percolator找到一个新加载的文档,并在平均两秒内完成处理,即比MapReduce快1000倍左右。这两秒包括找到脏通知和运行执行聚类的事务的时间。请注意,这个1000倍的延迟这个值会随着资源库的增大而增大。

As the crawl rate increases, MapReduce’s processing time grows correspondingly. Ideally, it would be proportional to the combined size of the repository and the input which grows with the crawl rate. In practice, the running time of a small MapReduce like this is limited by stragglers, so the growth in processing time (and thus clustering latency) is only weakly correlated to crawl rate at low crawl rates. The 6 percent crawl rate, for example, only adds 150GB to a 1TB data set; the extra time to process 150GB is in the noise. The latency of Percolator is relatively unchanged as the crawl rate grows until it suddenly increases to effectively infinity at a crawl rate of 40% per hour. At this point, Percolator saturates the resources of the test cluster, is no longer able to keep up with the crawl rate, and begins building an unbounded queue of unprocessed documents. The dotted asymptote at 40% is an extrapolation of Percolator’s performance beyond this breaking point. MapReduce is subject to the same effect: eventually crawled documents accumulate faster than MapReduce is able to cluster them, and the batch size will grow without bound in subsequent runs. In this particular configuration, however, MapReduce can sustain crawl rates in excess of 100% (the dotted line, again, extrapolates performance).

随着抓取率的增加,MapReduce的处理时间也会相应增长。理想情况下,它与资源库和输入的综合大小成正比,而资源库和输入是随着爬行率增长的。在实践中,像这样的小型MapReduce的运行时间会受到最差的任务的限制,所以在低爬行率下,处理时间的增长(也就是聚类延迟)只与爬行率弱相关。例如,6%的爬行率只为1TB的数据集增加了150GB;处理150GB的额外时间可以忽略不记。Percolator的延迟随着抓取率的增长而相对不变,直到它在每小时40%的抓取率下突然增加到有效的无限大。在这一点上,Percolator使测试集群的资源趋于饱和,不再能够跟上爬行速度,并开始建立一个无限制的未处理文件队列。40%的虚线渐近线是对Percolator在这个突破点之后的性能的推断。MapReduce也会受到同样的影响:最终抓取的文档积累速度超过了MapReduce对其进行聚类的速度,在随后的运行中,批处理量会无限制地增长。然而,在这个特定的配置中,MapReduce可以维持超过100%的抓取率(虚线,同样是推断出的性能)。

These results show that Percolator can process documents at orders of magnitude better latency than MapReduce in the regime where we expect real systems to operate (single-digit crawl rates).

这些结果表明,在我们期望真实系统运行的情况下(个位数的抓取率),Percolator处理文档的延迟比MapReduce好几个数量级。

Microbenchmarks

In this section, we determine the cost of the transactional semantics provided by Percolator. In these experiments, we compare Percolator to a “raw” Bigtable. We are only interested in the relative performance of Bigtable and Percolator since any improvement in Bigtable performance will translate directly into an improvement in Percolator performance. Figure 8 shows the performance of Percolator and raw Bigtable running against a single tablet server. All data was in the tablet server’s cache during the experiments and Percolator’s batching optimizations were disabled.

在本节中,我们确定了Percolator提供的事务语义的成本。在这些实验中,我们将Percolator与 "原始 "Bigtable进行比较。我们只对Bigtable和Percolator的相对性能感兴趣,因为Bigtable性能的任何改善都将直接转化为Percolator性能的改善。图8显示了Percolator和原始Bigtable在tablet服务器上运行的性能。在实验过程中,所有数据都在tablet服务器的缓存中,Percolator的批处理优化功能被禁用。

图8

As expected, Percolator introduces overhead relative to Bigtable. We first measure the number of random writes that the two systems can perform. In the case of Percolator, we execute transactions that write a single cell and then commit; this represents the worst case for Percolator overhead. When doing a write, Percolator incurs roughly a factor of four overhead on this benchmark. This is the result of the extra operations Percolator requires for commit beyond the single write that Bigtable issues: a read to check for locks, a write to add the lock, and a second write to remove the lock record. The read, in particular, is more expensive than a write and accounts for most of the overhead. In this test, the limiting factor was the performance of the tablet server, so the additional overhead of fetching timestamps is not measured. We also tested random reads: Percolator performs a single Bigtable operation per read, but that read operation is somewhat more complex than the raw Bigtable operation (the Percolator read looks at metadata columns in addition to data columns).

正如预期的那样,相对于Bigtable,Percolator资源开销较大。我们首先测量这两个系统可以执行的随机写的数量。在Percolator的情况下,我们执行写一个单元的事务,然后提交;这代表了Percolator开销的最坏情况。当进行写操作时,Percolator在这个benchmark上产生了大约四倍的开销。这是由于Percolator在提交时需要进行额外的操作,而不是Bigtable的单一写操作:读要检查锁,写要加锁,以及第二次写要删除锁记录。特别是读,比写开销更大,占了大部分的开销。在这个测试中,限制因素是tablet服务器的性能,所以没有测量获取时间戳的额外开销。我们还测试了随机读取。Percolator每次读取都会执行一次Bigtable操作,但该读取操作比原始的Bigtable操作要复杂一些(Percolator的读取除了数据列外,还会查看元数据列)。

Synthetic 工作负载

To evaluate Percolator on a more realistic workload, we implemented a synthetic benchmark based on TPC-E [1]. This isn’t the ideal benchmark for Percolator since TPC-E is designed for OLTP systems, and a number of Percolator’s tradeoffs impact desirable properties of OLTP systems (the latency of conflicting transactions, for example). TPC-E is a widely recognized and understood benchmark, however, and it allows us to understand the cost of our system against more traditional databases.

为了在一个更真实的工作负载上评估Percolator,我们实现了一个基于TPC-E的 synthetic benchmark 。这并不是Percolator的理想benchmark,因为TPC-E是为OLTP系统设计的,而Percolator的一些权衡影响了OLTP系统的理想特性(例如,冲突事务的延迟)。然而,TPC-E是一个被广泛认可和理解的基准,它使我们能够了解我们的系统相对于更传统的数据库的成本。

TPC-E simulates a brokerage firm with customers who perform trades, market search, and account inquiries. The brokerage submits trade orders to a market exchange, which executes the trade and updates broker and customer state. The benchmark measures the number of trades executed. On average, each customer performs a trade once every 500 seconds, so the benchmark scales by adding customers and associated data.

TPC-E模拟了一个有客户的经纪公司,客户进行交易、市场搜索和账户查询。经纪公司向市场交易所提交交易指令,交易所执行交易并更新经纪人和客户状态。benchmark衡量执行的交易数量。平均而言,每个客户每500秒执行一次交易,因此该benchmark通过增加客户和相关数据来进行扩展。

TPC-E traditionally has three components – a customer emulator, a market emulator, and a DBMS running stored SQL procedures. Since Percolator is a client library running against Bigtable, our implementation is a combined customer/market emulator that calls into the Percolator library to perform operations against Bigtable. Percolator provides a low-level Get/Set/iterator API rather than a high-level SQL interface, so we created indexes and did all the ‘query planning’ by hand.

TPC-E传统上有三个组成部分–客户模拟器、市场模拟器和运行存储SQL程序的DBMS。由于Percolator是一个针对Bigtable运行的客户端库,我们的实现是一个客户/市场模拟器的组合,调用Percolator库来执行针对Bigtable的操作。Percolator提供了一个低级别的Get/Set/iterator API,而不是高级别的SQL接口,所以我们创建了索引并手工完成了所有的 "查询计划"。

Since Percolator is an incremental processing system rather than an OLTP system, we don’t attempt to meet the TPC-E latency targets. Our average transaction latency is 2 to 5 seconds, but outliers can take several minutes. Outliers are caused by, for example, exponential backoff on conflicts and Bigtable tablet unavailability. Finally, we made a small modification to the TPC-E transactions. In TPC-E, each trade result increases the broker’s commission and increments his trade count. Each broker services a hundred customers, so the average broker must be updated once every 5 seconds, which causes repeated write conflicts in Percolator. In Percolator, we would implement this feature by writing the increment to a side table and periodically aggregating each broker’s increments; for the benchmark, we choose to simply omit this write.

由于Percolator是一个增量处理系统,而不是一个OLTP系统,我们并不试图满足TPC-E的延迟目标。我们的平均交易延迟是2到5秒,但异常时可能需要几分钟。异常值是由指数冲突的后退和Bigtable tablet的不可用造成的。最后,我们对TPC-E的交易做了一个小小的修改。在TPC-E中,每个交易结果都会增加经纪人的佣金并增加他的交易数量。每个经纪人为一百个客户提供服务,所以平均每5秒就必须更新一次经纪人,这就造成了Percolator中的重复写冲突。在Percolator中,我们将通过把增量写入一个边表并定期汇总每个经纪人的增量来实现这一功能;对于这个benchmark,我们选择简单地省略这一写入。

Figure 9 shows how the resource usage of Percolator scales as demand increases. We will measure resource usage in CPU cores since that is the limiting resource in our experimental environment. We were able to procure a small number of machines for testing, but our test Bigtable cell shares the disk resources of a much larger production cluster. As a result, disk bandwidth is not a factor in the system’s performance. In this experiment, we configured the benchmark with increasing numbers of customers and measured both the achieved performance and the number of cores used by all parts of the system including cores used for background maintenance such as Bigtable compactions. The relationship between performance and resource usage is essentially linear across several orders of magnitude, from 11 cores to 15,000 cores.

图9显示了Percolator的资源使用量是如何随着需求的增加而变化的。我们将测量CPU核心的资源使用情况,因为这是我们实验环境中的限制性资源。我们能够采购少量的机器进行测试,但我们的测试Bigtable单元共享一个大得多的生产集群的磁盘资源。因此,磁盘带宽并不是影响系统性能的一个因素。在这个实验中,我们用越来越多的客户配置了基准,并测量了实现的性能和系统所有部分使用的内核数量,包括用于后台维护的内核,如Bigtable压缩。性能和资源使用之间的关系在几个数量级上基本上是线性的,从11个内核到15,000个内核。

图9

This experiment also provides an opportunity to measure the overheads in Percolator relative to a DBMS. The fastest commercial TPC-E system today performs 3,183 tpsE using a single large shared-memory machine with 64 Intel Nehalem cores with 2 hyperthreads per core [33]. Our synthetic benchmark based on TPC-E performs 11,200 tps using 15,000 cores. This comparison is very rough: the Nehalem cores in the comparison machine are significantly faster than the cores in our test cell (small-scale testing on Nehalem processors shows that they are 20-30% faster per-thread compared to the cores in the test cluster). However, we estimate that Percolator uses roughly 30 times more CPU per transaction than the benchmark system. On a cost-per-transaction basis, the gap is likely much less than 30 since our test cluster uses cheaper, commodity hardware compared to the enterprise-class hardware in the reference machine.

这个实验也提供了一个机会来衡量Percolator相对于DBMS的开销。目前最快的商业TPC-E系统拥有3,183 tpsE处理能力,使用一台具有64个英特尔Nehalem核心、每个核心有2个超线程的大型共享内存机器。我们基于TPC-E的合成基准使用15,000个内核执行了11,200 tps。这种比较非常粗略:对比机器中的Nehalem内核明显比我们测试单元中的内核快(在Nehalem处理器上的小规模测试显示,与测试集群中的内核相比,它们的每线程速度快20-30%)。然而,我们估计Percolator每个事务使用的CPU大约是benchmark系统的30倍。从每个事务的成本来看,差距可能远小于30,因为我们的测试集群与参考机器中的企业级硬件相比,使用了更便宜的商品硬件。

The conventional wisdom on implementing databases is to “get close to the iron” and use hardware as directly as possible since even operating system structures like disk caches and schedulers make it hard to implement an efficient database [32]. In Percolator we not only interposed an operating system between our database and the hardware, but also several layers of software and network links. The conventional wisdom is correct: this arrangement has a cost. There are substantial overheads in preparing requests to go on the wire, sending them, and processing them on a remote machine. To illustrate these overheads in Percolator, consider the act of mutating the database. In a DBMS, this incurs a function call to store the data in memory and a system call to force the log to hardware controlled RAID array. In Percolator, a client performing a transaction commit sends multiple RPCs to Bigtable, which commits the mutation by logging it to 3 chunkservers, which make system calls to actually flush the data to disk. Later, that same data will be compacted into minor and major sstables, each of which will be again replicated to multiple chunkservers.

实现数据库最好是尽可能直接使用硬件,因为即使是像磁盘缓存和调度器这样的操作系统结构也很难实现一个高效的数据库。在Percolator中,我们不仅在数据库和硬件之间穿插了一个操作系统,而且还穿插了几层软件和网络链接。这种设计是有成本的。在准备请求上线、发送请求以及在远程机器上处理请求的过程中,存在大量的开销。为了说明Percolator中的这些开销,请考虑突变数据库的行为。在DBMS中,这需要一个函数调用来将数据存储在内存中,还需要一个系统调用来将日志强制到硬件控制的RAID阵列中。在Percolator中,执行事务提交的客户端向Bigtable发送多个RPC,Bigtable将突变记录到3个chunkservers,这些chunkservers进行系统调用,将数据刷入磁盘。之后,同样的数据将被压缩成小表和大表,每个表将再次被复制到多个chunkservers。

图10

The CPU inflation factor is the cost of our layering. In exchange, we get scalability (our fastest result, though not directly comparable to TPC-E, is more than 3x the current official record [33]), and we inherit the useful features of the systems we build upon, like resilience to failures. To demonstrate the latter, we ran the benchmark with 15 tablet servers and allowed the performance to stabilize. Figure 10 shows the performance of the system over time. The dip in performance at 17:09 corresponds to a failure event: we killed a third of the tablet servers. Performance drops immediately after the failure event but recovers as the tablets are reloaded by other tablet servers. We allowed the killed tablet servers to restart so performance eventually returns to the original level.

CPU膨胀因素是我们分层的成本。作为交换,我们得到了可扩展性(我们最快的结果,虽然不能直接与TPC-E相提并论,但也是目前官方记录的3倍多) ,并且我们继承了我们所建立的系统的有用的系统的有用功能,比如对故障的恢复能力。 为了证明后者,我们用15台tablet服务器进行了benchmark,并让性能稳定下来。图10显示了系统的性能随时间的变化。17:09的性能下降对应的是一个故障事件:我们杀死了三分之一的tablet服务器。故障事件发生后,性能立即下降,但随着tablet被其他tablet服务器重新加载,性能又恢复了。我们允许被杀死的tablet服务器重新启动,所以性能最终恢复到原来的水平。

结论

We have built and deployed Percolator and it has been used to produce Google’s websearch index since April, 2010. The system achieved the goals we set for reducing the latency of indexing a single document with an acceptable increase in resource usage compared to the previous indexing system.

我们已经建立并部署了Percolator,自2010年4月以来,它一直被用来制作谷歌的网络搜索索引。与之前的索引系统相比,该系统实现了我们设定的降低单个文档索引延迟的目标,而资源使用量的增加是可以接受的。

The TPC-E results suggest a promising direction for future investigation. We chose an architecture that scales linearly over many orders of magnitude on commodity machines, but we’ve seen that this costs a significant 30- fold overhead compared to traditional database architectures. We are very interested in exploring this tradeoff and characterizing the nature of this overhead: how much is fundamental to distributed storage systems, and how much can be optimized away?

TPC-E的结果为未来的研究提出了一个有希望的方向。我们选择了一个在商品机器上可以线性扩展的架构,但是我们看到,与传统的数据库架构相比,这需要付出30倍的开销。我们对探索这种权衡和描述这种开销的性质非常感兴趣:有多少是分布式存储系统的基本要素,有多少可以被优化掉?

Reference

https://storage.googleapis.com/pub-tools-public-publication-data/pdf/36726.pdf

https://karellincoln.github.io/2018/04/05/percolator-translate/

扫码_搜索联合传播样式-白色版 1