第六章。分区
Chapter 6. Partitioning
Clearly, we must break away from the sequential and not limit the computers. We must state definitions and provide for priorities and descriptions of data. We must state relationships, not procedures.
显然,我们必须摆脱线性思维并不限制计算机。我们必须明确定义和优先级,并描述数据。我们必须声明关系,而不是程序。
Grace Murray Hopper, Management and the Computer of the Future (1962)
格雷斯·默里·霍珀,《管理和未来的计算机》(1962年)。
In Chapter 5 we discussed replication—that is, having multiple copies of the same data on different nodes. For very large datasets, or very high query throughput, that is not sufficient: we need to break the data up into partitions , also known as sharding . i
第5章讨论了复制——也就是在不同节点上有相同数据的多个副本。对于非常大的数据集,或非常高的查询吞吐量来说,这还不足以满足需求:我们需要将数据分成分区,也被称为分片。
Terminological confusion
What we call a partition here is called a shard in MongoDB, Elasticsearch, and SolrCloud; it’s known as a region in HBase, a tablet in Bigtable, a vnode in Cassandra and Riak, and a vBucket in Couchbase. However, partitioning is the most established term, so we’ll stick with that.
在这里我们所称的“分区”,在MongoDB、Elasticsearch和SolrCloud中被称为“分片”;在HBase中被称为“区域”,在Bigtable中被称为“表格”,在Cassandra和Riak中被称为“虚拟节点”,在Couchbase中被称为“vBucket”。然而,“分区”是最常用的术语,所以我们将坚持使用它。
Normally, partitions are defined in such a way that each piece of data (each record, row, or document) belongs to exactly one partition. There are various ways of achieving this, which we discuss in depth in this chapter. In effect, each partition is a small database of its own, although the database may support operations that touch multiple partitions at the same time.
通常情况下,分区是按照每个数据片段(每个记录,行或文档)恰好属于一个分区的方式来定义。有多种方法可以实现这一点,本章节我们将深入探讨。实际上,每个分区本身都是一个小型数据库,尽管数据库可能支持同时涉及多个分区的操作。
The main reason for wanting to partition data is scalability . Different partitions can be placed on different nodes in a shared-nothing cluster (see the introduction to Part II for a definition of shared nothing ). Thus, a large dataset can be distributed across many disks, and the query load can be distributed across many processors.
分区数据的主要原因是为了可扩展性。不同的分区可以放置在共享无处不在的集群中的不同节点上(有关共享无处不在的定义,请参见第二部分的介绍)。因此,大型数据集可以分布在许多磁盘上,并且查询负载可以分布在许多处理器上。
For queries that operate on a single partition, each node can independently execute the queries for its own partition, so query throughput can be scaled by adding more nodes. Large, complex queries can potentially be parallelized across many nodes, although this gets significantly harder.
针对仅操作单个分区的查询,每个节点可以独立执行其自己分区的查询,因此可以通过添加更多节点来扩展查询吞吐量。尽管这变得更加困难,但大型复杂查询可能会在许多节点上并行化。
Partitioned databases were pioneered in the 1980s by products such as Teradata and Tandem NonStop SQL [ 1 ], and more recently rediscovered by NoSQL databases and Hadoop-based data warehouses. Some systems are designed for transactional workloads, and others for analytics (see “Transaction Processing or Analytics?” ): this difference affects how the system is tuned, but the fundamentals of partitioning apply to both kinds of workloads.
分区数据库最早在20世纪80年代由Teradata和Tandem NonStop SQL等产品开创[1],最近又被NoSQL数据库和基于Hadoop的数据仓库重新发现。一些系统适用于事务工作负载,而另一些则适用于分析(参见“事务处理还是分析?”):这种差异会影响系统的调整,但分区的基础知识适用于两种类型的工作负载。
In this chapter we will first look at different approaches for partitioning large datasets and observe how the indexing of data interacts with partitioning. We’ll then talk about rebalancing, which is necessary if you want to add or remove nodes in your cluster. Finally, we’ll get an overview of how databases route requests to the right partitions and execute queries.
在本章节中,我们首先将探讨对于大型数据集的不同分区方法,并观察数据索引与分区之间的交互作用。然后我们将谈论重新平衡,如果您想在群集中添加或删除节点,则必须进行重新平衡。最后,我们将概述数据库如何将请求路由到正确的分区并执行查询。
Partitioning and Replication
Partitioning is usually combined with replication so that copies of each partition are stored on multiple nodes. This means that, even though each record belongs to exactly one partition, it may still be stored on several different nodes for fault tolerance.
通常,分区会与复制相结合,以便将每个分区的副本存储在多个节点上。这意味着,即使每个记录仅属于一个分区,它仍可能存储在多个不同的节点上以实现容错。
A node may store more than one partition. If a leader–follower replication model is used, the combination of partitioning and replication can look like Figure 6-1 . Each partition’s leader is assigned to one node, and its followers are assigned to other nodes. Each node may be the leader for some partitions and a follower for other partitions.
一个节点可以存储多个分区。如果使用领导者-追随者复制模型,分区和复制的组合可能会看起来像图6-1所示的那样。每个分区的领导者分配给一个节点,其追随者分配给其他节点。每个节点可以是某些分区的领导者,也可以是其他分区的追随者。
Everything we discussed in Chapter 5 about replication of databases applies equally to replication of partitions. The choice of partitioning scheme is mostly independent of the choice of replication scheme, so we will keep things simple and ignore replication in this chapter.
我们在第5章中讨论的有关数据库复制的所有内容同样适用于分区复制。分区方案的选择大多与复制方案无关,因此我们将保持简单,忽略本章中的复制。
Partitioning of Key-Value Data
Say you have a large amount of data, and you want to partition it. How do you decide which records to store on which nodes?
如果您有大量的数据需要进行分区,您将如何决定将哪些记录存储在哪些节点上?
Our goal with partitioning is to spread the data and the query load evenly across nodes. If every node takes a fair share, then—in theory—10 nodes should be able to handle 10 times as much data and 10 times the read and write throughput of a single node (ignoring replication for now).
我们分区的目标是将数据和查询负载均匀地分散到各个节点上。如果每个节点都承担公平的份额,那么理论上,10个节点应该能够处理单个节点的10倍数据量和10倍读写吞吐量(暂不考虑复制)。
If the partitioning is unfair, so that some partitions have more data or queries than others, we call it skewed . The presence of skew makes partitioning much less effective. In an extreme case, all the load could end up on one partition, so 9 out of 10 nodes are idle and your bottleneck is the single busy node. A partition with disproportionately high load is called a hot spot .
如果分区不公平,某些分区的数据或查询比其他分区多,我们称之为倾斜。存在倾斜会使分区变得不太有效。在极端情况下,所有负载可能都集中在一个分区上,因此有9个节点空闲,而您的瓶颈是单个繁忙节点。负载不成比例的分区称为热点。
The simplest approach for avoiding hot spots would be to assign records to nodes randomly. That would distribute the data quite evenly across the nodes, but it has a big disadvantage: when you’re trying to read a particular item, you have no way of knowing which node it is on, so you have to query all nodes in parallel.
避免热点的最简单方法是将记录随机分配到节点上。这将在节点之间相当均匀地分布数据,但它有一个很大的缺点:当您尝试读取特定项目时,您无法知道它在哪个节点上,因此必须并行查询所有节点。
We can do better. Let’s assume for now that you have a simple key-value data model, in which you always access a record by its primary key. For example, in an old-fashioned paper encyclopedia, you look up an entry by its title; since all the entries are alphabetically sorted by title, you can quickly find the one you’re looking for.
我们可以做得更好。现在假设你有一个简单的键值数据模型,其中你总是根据主键访问记录。例如,在旧式纸质百科全书中,你通过标题查找条目;由于所有条目都按标题字母顺序排序,因此你可以迅速找到需要的条目。
Partitioning by Key Range
One way of partitioning is to assign a continuous range of keys (from some minimum to some maximum) to each partition, like the volumes of a paper encyclopedia ( Figure 6-2 ). If you know the boundaries between the ranges, you can easily determine which partition contains a given key. If you also know which partition is assigned to which node, then you can make your request directly to the appropriate node (or, in the case of the encyclopedia, pick the correct book off the shelf).
一种分区的方式是将一段连续的键(从某个最小到某个最大)分配给每个分区,就像一本书的卷数一样(图6-2)。如果你知道范围之间的界限,就可以很容易地确定哪个分区包含一个给定的键。如果你还知道哪个分区分配给哪个节点,那么你就可以直接向适当的节点发出请求(或者,在百科全书的情况下,从架子上取出正确的书)。
The ranges of keys are not necessarily evenly spaced, because your data may not be evenly distributed. For example, in Figure 6-2 , volume 1 contains words starting with A and B, but volume 12 contains words starting with T, U, V, X, Y, and Z. Simply having one volume per two letters of the alphabet would lead to some volumes being much bigger than others. In order to distribute the data evenly, the partition boundaries need to adapt to the data.
密钥范围不一定均匀分布,因为您的数据可能不是均匀分布的。例如,在图6-2中,卷1包含以A和B开头的单词,但是卷12包含以T、U、V、X、Y和Z开头的单词。仅将字母表每两个字母一个卷的方式将导致一些卷比其他卷大得多。为了均匀分布数据,分区边界需要根据数据进行调整。
The partition boundaries might be chosen manually by an administrator, or the database can choose them automatically (we will discuss choices of partition boundaries in more detail in “Rebalancing Partitions” ). This partitioning strategy is used by Bigtable, its open source equivalent HBase [ 2 , 3 ], RethinkDB, and MongoDB before version 2.4 [ 4 ].
分区边界可以由管理员手动选择,也可以被数据库自动选择(我们将在“重新平衡分区”的更多细节中讨论分区边界的选择)。这种分区策略被Bigtable、它的开源版本HBase [2, 3]、RethinkDB和MongoDB 2.4版本之前使用[4]。
Within each partition, we can keep keys in sorted order (see “SSTables and LSM-Trees” ). This has the advantage that range scans are easy, and you can treat the key as a concatenated index in order to fetch several related records in one query (see “Multi-column indexes” ). For example, consider an application that stores data from a network of sensors, where the key is the timestamp of the measurement ( year-month-day-hour-minute-second ). Range scans are very useful in this case, because they let you easily fetch, say, all the readings from a particular month.
在每个分区内,我们可以按排序顺序保留键(参见“SSTables 和 LSM-Trees”)。这样可以轻松进行范围扫描,并且您可以将键视为连接的索引,以便在一个查询中获取几个相关记录(参见“多列索引”)。例如,考虑一个存储来自传感器网络数据的应用程序,其中键是测量的时间戳(年-月-日-小时-分钟-秒)。在这种情况下,范围扫描非常有用,因为它们可以让您轻松获取一个特定月份的所有读数。
However, the downside of key range partitioning is that certain access patterns can lead to hot spots. If the key is a timestamp, then the partitions correspond to ranges of time—e.g., one partition per day. Unfortunately, because we write data from the sensors to the database as the measurements happen, all the writes end up going to the same partition (the one for today), so that partition can be overloaded with writes while others sit idle [ 5 ].
然而,键值范围分区的缺点在于某些访问模式可能会导致热点问题。如果键是时间戳,则分区对应于时间范围-例如,每天一个分区。不幸的是,由于我们将传感器数据写入数据库时,所有写操作最终都会进入同一个分区(即今天的分区),因此该分区可能会被写入压力过大而其他分区处于空闲状态[5]。
To avoid this problem in the sensor database, you need to use something other than the timestamp as the first element of the key. For example, you could prefix each timestamp with the sensor name so that the partitioning is first by sensor name and then by time. Assuming you have many sensors active at the same time, the write load will end up more evenly spread across the partitions. Now, when you want to fetch the values of multiple sensors within a time range, you need to perform a separate range query for each sensor name.
为了避免传感器数据库中出现这个问题,你需要使用不同于时间戳作为键的第一个元素的东西。例如,你可以给每个时间戳添加传感器名称前缀,以便按传感器名称和时间进行分区。假设你有许多传感器同时活动,写入负载将更均匀地分布在分区中。现在,当你想要获取某个时间范围内多个传感器的值时,你需要为每个传感器名称执行一个单独的范围查询。
Partitioning by Hash of Key
Because of this risk of skew and hot spots, many distributed datastores use a hash function to determine the partition for a given key.
由于存在偏斜和热点的风险,许多分布式数据存储系统使用散列函数来确定给定键的分区。
A good hash function takes skewed data and makes it uniformly distributed. Say you have a 32-bit hash function that takes a string. Whenever you give it a new string, it returns a seemingly random number between 0 and 2 32 − 1. Even if the input strings are very similar, their hashes are evenly distributed across that range of numbers.
一个好的哈希函数将偏斜的数据转化为均匀分布的。假设你有一个32位哈希函数,它接收一个字符串。无论何时你给它一个新的字符串,它返回的数字看起来都是0到232 − 1之间的随机数。即使输入字符串非常相似,它们的哈希在那个数字范围内也是均匀分布的。
For partitioning purposes, the hash function need not be cryptographically strong: for example,
Cassandra and MongoDB use MD5, and Voldemort uses the Fowler–Noll–Vo function. Many programming
languages have simple hash functions built in (as they are used for hash tables), but they may not
be suitable for partitioning: for example, in Java’s
Object.hashCode()
and Ruby’s
Object#hash
,
the same key may have a different hash value in different processes
[
6
].
为了分区目的,哈希函数不需要具有密码学强度:例如,Cassandra和MongoDB使用MD5,Voldemort使用Fowler-Noll-Vo函数。许多编程语言都内置了简单的哈希函数(因为它们用于哈希表),但它们可能不适合分区:例如,在Java的Object.hashCode()和Ruby的Object#hash中,同一个键在不同的进程中可能具有不同的哈希值 [6]。
Once you have a suitable hash function for keys, you can assign each partition a range of hashes (rather than a range of keys), and every key whose hash falls within a partition’s range will be stored in that partition. This is illustrated in Figure 6-3 .
一旦你为键准备了一个合适的哈希函数,就可以将每个分区分配一个哈希范围(而不是一个键的范围),每个哈希落在分区范围内的键都将存储在该分区中。如图6-3所示。
This technique is good at distributing keys fairly among the partitions. The partition boundaries can be evenly spaced, or they can be chosen pseudorandomly (in which case the technique is sometimes known as consistent hashing ).
该技术能够在分区间公平分配密钥。分区边界可以均匀间隔,也可以伪随机选择(此时该技术有时被称为一致性哈希)。
Unfortunately however, by using the hash of the key for partitioning we lose a nice property of key-range partitioning: the ability to do efficient range queries. Keys that were once adjacent are now scattered across all the partitions, so their sort order is lost. In MongoDB, if you have enabled hash-based sharding mode, any range query has to be sent to all partitions [ 4 ]. Range queries on the primary key are not supported by Riak [ 9 ], Couchbase [ 10 ], or Voldemort.
但是,通过使用密钥的哈希进行分区,我们失去了一种良好的键范围分区特性:进行高效的范围查询。曾经相邻的键现在分散在所有分区中,因此它们的排序顺序被打乱了。在MongoDB中,如果启用了基于哈希的分片模式,则任何范围查询都必须发送到所有分区。Riak [9],Couchbase [10]或Voldemort不支持基于主键的范围查询。
Cassandra achieves a compromise between the two partitioning strategies [ 11 , 12 , 13 ]. A table in Cassandra can be declared with a compound primary key consisting of several columns. Only the first part of that key is hashed to determine the partition, but the other columns are used as a concatenated index for sorting the data in Cassandra’s SSTables. A query therefore cannot search for a range of values within the first column of a compound key, but if it specifies a fixed value for the first column, it can perform an efficient range scan over the other columns of the key.
Cassandra在两个分区策略之间取得了折衷。Cassandra中的一个表可以使用由多个列组成的复合主键声明。只有该键的第一部分被哈希以确定分区,其他列用作Cassandra SSTables中数据排序的连接索引。因此,查询无法在复合键的第一列中搜索值的范围,但如果它为第一列指定一个固定值,则可以在键的其他列上执行有效的范围扫描。
The concatenated index approach enables an elegant data model for one-to-many relationships. For
example, on a social media site, one user may post many updates. If the primary key for updates is
chosen to be
(user_id, update_timestamp)
, then you can efficiently retrieve all updates made by a
particular user within some time interval, sorted by timestamp. Different users may be stored on
different partitions, but within each user, the updates are stored ordered by timestamp on a single
partition.
连接索引方法可以为一对多的关系提供优雅的数据模型。例如,在社交媒体站点上,一个用户可以发布许多更新。如果更新的主键选择为(用户ID、更新时间戳),则可以有效地检索某个时间间隔内某个特定用户所作的所有更新,按时间戳排序。不同的用户可以存储在不同的分区中,但在每个用户内,更新以时间戳在单个分区中排序存储。
Skewed Workloads and Relieving Hot Spots
As discussed, hashing a key to determine its partition can help reduce hot spots. However, it can’t avoid them entirely: in the extreme case where all reads and writes are for the same key, you still end up with all requests being routed to the same partition.
如上所述,通过对键进行哈希以确定其分区可以帮助减少热点。但是,它无法完全避免热点问题:在所有读写都针对同一个密钥的极端情况下,仍然需要将所有请求路由到同一个分区。
This kind of workload is perhaps unusual, but not unheard of: for example, on a social media site, a celebrity user with millions of followers may cause a storm of activity when they do something [ 14 ]. This event can result in a large volume of writes to the same key (where the key is perhaps the user ID of the celebrity, or the ID of the action that people are commenting on). Hashing the key doesn’t help, as the hash of two identical IDs is still the same.
这种工作负载可能不太常见,但并非闻所未闻:例如,在社交媒体网站上,拥有数百万粉丝的名人用户可能会在他们做某事时引起一场风暴活动[14]。这种事件可能会导致对相同密钥进行大量写入(其中密钥可能是名人的用户ID或人们正在评论的行动的ID)。对密钥进行散列处理并没有帮助,因为相同ID的散列仍然相同。
Today, most data systems are not able to automatically compensate for such a highly skewed workload, so it’s the responsibility of the application to reduce the skew. For example, if one key is known to be very hot, a simple technique is to add a random number to the beginning or end of the key. Just a two-digit decimal random number would split the writes to the key evenly across 100 different keys, allowing those keys to be distributed to different partitions.
今天,大多数数据系统不能自动补偿如此高度偏斜的工作量,因此应用程序需要减少偏斜的责任。例如,如果一个键被认为是非常“热门”的,则一个简单的技巧是在键的开头或结尾添加一个随机数。只需一个两位数的十进制随机数,就可以将写入键的操作均匀分布到100个不同的键中,使这些键分布到不同的分区。
However, having split the writes across different keys, any reads now have to do additional work, as they have to read the data from all 100 keys and combine it. This technique also requires additional bookkeeping: it only makes sense to append the random number for the small number of hot keys; for the vast majority of keys with low write throughput this would be unnecessary overhead. Thus, you also need some way of keeping track of which keys are being split.
然而,将写操作分散到不同的键中后,任何读取操作现在都需要进行额外的工作,因为它们必须从所有100个键中读取数据并合并它们。这种技术还需要额外的记账:仅对小量的热键添加随机数才有意义;而对于大多数写吞吐量较低的键来说,这将是不必要的开销。因此,您还需要一些方式来跟踪哪些键正在被拆分。
Perhaps in the future, data systems will be able to automatically detect and compensate for skewed workloads; but for now, you need to think through the trade-offs for your own application.
也许在未来,数据系统能够自动检测并补偿不平衡的工作负载;但现在,你需要为你自己的应用程序权衡利弊。
Partitioning and Secondary Indexes
The partitioning schemes we have discussed so far rely on a key-value data model. If records are only ever accessed via their primary key, we can determine the partition from that key and use it to route read and write requests to the partition responsible for that key.
到目前为止,我们讨论的分区方案都依赖于键值数据模型。如果记录只通过其主键访问,我们可以从该键确定分区,并使用它将读写请求路由到负责该键的分区。
The situation becomes more complicated if secondary indexes are involved (see also
“Other Indexing Structures”
). A secondary index usually doesn’t identify a record uniquely but
rather is a way of searching for occurrences of a particular value: find all actions by user
123
, find all articles containing the word
hogwash
, find all cars whose color is
red
, and so
on.
如果涉及到次要索引(另请参见“其他索引结构”),情况会变得更加复杂。次要索引通常不能唯一地标识记录,而是一种搜索特定值出现的方式:查找用户123执行的所有操作、查找包含“胡言乱语”一词的所有文章、查找颜色为红色的所有汽车等等。
Secondary indexes are the bread and butter of relational databases, and they are common in document databases too. Many key-value stores (such as HBase and Voldemort) have avoided secondary indexes because of their added implementation complexity, but some (such as Riak) have started adding them because they are so useful for data modeling. And finally, secondary indexes are the raison d’être of search servers such as Solr and Elasticsearch.
次要索引是关系型数据库的基础,也在文档数据库中很常见。许多键值存储(例如HBase和Voldemort)由于增加了实现的复杂性而避免了二级索引,但一些(例如Riak)已经开始添加它们,因为它们对于数据建模非常有用。最后,次要索引是搜索服务器(例如Solr和Elasticsearch)存在的原因。
The problem with secondary indexes is that they don’t map neatly to partitions. There are two main approaches to partitioning a database with secondary indexes: document-based partitioning and term-based partitioning.
次要索引的问题在于它们无法很好地映射到分区。为对具备次要索引的数据库进行分区,有两种主要方法:基于文档的分区和基于词项的分区。
Partitioning Secondary Indexes by Document
For example, imagine you are operating a website for selling used cars (illustrated in Figure 6-4 ). Each listing has a unique ID—call it the document ID —and you partition the database by the document ID (for example, IDs 0 to 499 in partition 0, IDs 500 to 999 in partition 1, etc.).
例如,想象你经营一个出售二手车的网站(如图6-4所示)。每个车辆列表都有一个唯一的ID,称为文档ID,你按照文档ID将数据库进行分区(例如,0到499的ID在分区0中,500到999的ID在分区1中等等)。
You want to let users search for cars, allowing them to filter by color and by make, so you need
a secondary index on
color
and
make
(in a document database these would be fields; in a
relational database they would be columns). If you have declared the index, the database can perform
the indexing automatically.
ii
For example, whenever a red car is added to the database, the database partition automatically adds
it to the list of document IDs for the index entry
color:red
.
你想讓使用者搜尋車輛,並允許他們按顏色和製造商進行篩選,因此你需要在顏色和製造商方面建立二級索引(在文檔數據庫中這些將是字段;在關系數據庫中這些將是列)。如果你已經聲明了索引,數據庫可以自動執行索引。例如,每次將紅色車輛添加到數據庫時,數據庫分區自動將其添加到索引項目“color:red”的文檔ID列表中。
In this indexing approach, each partition is completely separate: each partition maintains its own secondary indexes, covering only the documents in that partition. It doesn’t care what data is stored in other partitions. Whenever you need to write to the database—to add, remove, or update a document—you only need to deal with the partition that contains the document ID that you are writing. For that reason, a document-partitioned index is also known as a local index (as opposed to a global index , described in the next section).
在这种索引方法中,每个分区都是完全独立的:每个分区都维护着自己的二级索引,仅覆盖该分区中的文档。它不关心其他分区中存储的数据。每当您需要写入数据库 - 添加、删除或更新文档 - 您只需要处理包含要写入的文档ID的分区。因此,基于文档的索引也被称为本地索引(与下一节中描述的全局索引相对)。
However, reading from a document-partitioned index requires care: unless you have done something special with the document IDs, there is no reason why all the cars with a particular color or a particular make would be in the same partition. In Figure 6-4 , red cars appear in both partition 0 and partition 1. Thus, if you want to search for red cars, you need to send the query to all partitions, and combine all the results you get back.
然而,从文档分区索引中读取需要谨慎:除非您对文档ID进行了特殊处理,否则没有理由让所有特定颜色或制造商的汽车都在同一个分区中。在图6-4中,红色汽车出现在分区0和分区1中。因此,如果您想搜索红色汽车,您需要将查询发送到所有分区,并组合所有收到的结果。
This approach to querying a partitioned database is sometimes known as scatter/gather , and it can make read queries on secondary indexes quite expensive. Even if you query the partitions in parallel, scatter/gather is prone to tail latency amplification (see “Percentiles in Practice” ). Nevertheless, it is widely used: MongoDB, Riak [ 15 ], Cassandra [ 16 ], Elasticsearch [ 17 ], SolrCloud [ 18 ], and VoltDB [ 19 ] all use document-partitioned secondary indexes. Most database vendors recommend that you structure your partitioning scheme so that secondary index queries can be served from a single partition, but that is not always possible, especially when you’re using multiple secondary indexes in a single query (such as filtering cars by color and by make at the same time).
这种查询分区数据库的方法有时被称为scatter/gather,可以使二级索引的读取查询变得非常昂贵。即使你并行查询分区,scatter/gather也容易造成尾部延迟的加剧(见“实践中的百分位数”)。尽管如此,它被广泛使用:MongoDB、Riak [15]、Cassandra [16]、Elasticsearch [17]、SolrCloud[18]和VoltDB [19]都使用了基于文档分区的二级索引。大多数数据库供应商建议你构建分区方案,以便辅助索引查询可以从单个分区中提供服务,但这并不总是可能的,特别是当你在单个查询中使用多个辅助索引(例如同时按颜色和制造商筛选汽车)时。
Partitioning Secondary Indexes by Term
Rather than each partition having its own secondary index (a local index ), we can construct a global index that covers data in all partitions. However, we can’t just store that index on one node, since it would likely become a bottleneck and defeat the purpose of partitioning. A global index must also be partitioned, but it can be partitioned differently from the primary key index.
我们不应该让每个分区都拥有自己的二级索引,而是可以构建一个覆盖所有分区数据的全局索引。然而,我们不能把该索引仅存储在一个节点上,因为这样很可能会成为瓶颈,从而使分区失去意义。全局索引必须被分区,但是它的分区方式可以与主键索引不同。
Figure 6-5
illustrates what this could look like: red cars from all
partitions appear under
color:red
in the index, but the index is partitioned so that colors
starting with the letters
a
to
r
appear in partition 0 and colors starting with
s
to
z
appear
in partition 1. The index on the make of car is partitioned similarly (with the partition boundary
being between
f
and
h
).
图6-5说明了这个可能的样子:来自所有分区的红色汽车在索引中出现在颜色:红色下,但索引被分区,使得以字母a到r开头的颜色出现在分区0中,以s到z开头的颜色出现在分区1中。汽车制造商的索引也以类似的方式分区(分区边界在f和h之间)。
We call this kind of index
term-partitioned
, because the term we’re looking for determines the partition
of the index. Here, a term would be
color:red
, for example. The name
term
comes from full-text
indexes (a particular kind of secondary index), where the terms are all the words that occur in a
document.
我们称这种指标术语分区,因为我们正在寻找的术语确定了索引的分区。在这里,例如,一个术语将是颜色:红色。术语名称来自全文索引(一种特定的辅助索引),其中术语是文档中出现的所有单词。
As before, we can partition the index by the term itself, or using a hash of the term. Partitioning by the term itself can be useful for range scans (e.g., on a numeric property, such as the asking price of the car), whereas partitioning on a hash of the term gives a more even distribution of load.
我们可以像以前一样通过词项本身或词项的哈希来分割索引。通过词项本身分割对于范围扫描(例如在数字属性上,比如汽车的询问价格)很有用,而通过词项哈希分割则可以得到更加均匀的负载分布。
The advantage of a global (term-partitioned) index over a document-partitioned index is that it can make reads more efficient: rather than doing scatter/gather over all partitions, a client only needs to make a request to the partition containing the term that it wants. However, the downside of a global index is that writes are slower and more complicated, because a write to a single document may now affect multiple partitions of the index (every term in the document might be on a different partition, on a different node).
全局(分类术语分区)索引的优点在于可以更有效地进行读取:客户端只需向包含所需术语的分区发出请求,而不需要在所有分区上进行分散/聚合。然而,全局索引的缺点是写入速度更慢,更复杂,因为对单个文档的写入可能会影响索引的多个分区(文档中的每个术语可能位于不同的分区上,在不同的节点上)。
In an ideal world, the index would always be up to date, and every document written to the database would immediately be reflected in the index. However, in a term-partitioned index, that would require a distributed transaction across all partitions affected by a write, which is not supported in all databases (see Chapter 7 and Chapter 9 ).
在理想的世界中,索引应始终保持最新状态,每个写入数据库的文档都应立即反映在索引中。然而,在术语分区索引中,这将需要跨所有受写入影响的分区执行分布式事务,而这在所有数据库中都不受支持(请参阅第7章和第9章)。
In practice, updates to global secondary indexes are often asynchronous (that is, if you read the index shortly after a write, the change you just made may not yet be reflected in the index). For example, Amazon DynamoDB states that its global secondary indexes are updated within a fraction of a second in normal circumstances, but may experience longer propagation delays in cases of faults in the infrastructure [ 20 ].
实际上,全局二级索引的更新通常是异步的(也就是说,如果你在写操作之后立刻读取索引,你刚刚做出的更改可能还没有在索引中反映出来)。例如,亚马逊DynamoDB指出,其全局二级索引在正常情况下在几分之一秒内进行更新,但在基础设施故障的情况下可能会出现较长的传播延迟[20]。
Other uses of global term-partitioned indexes include Riak’s search feature [ 21 ] and the Oracle data warehouse, which lets you choose between local and global indexing [ 22 ]. We will return to the topic of implementing term-partitioned secondary indexes in Chapter 12 .
全局术语分区索引的其他用途包括Riak的搜索功能[21]以及Oracle数据仓库,它允许您在本地和全局索引之间进行选择[22]。我们将在第12章中回到实现术语分区二级索引的主题上。
Rebalancing Partitions
Over time, things change in a database:
随着时间的推移,数据库中的事物会发生变化。
-
The query throughput increases, so you want to add more CPUs to handle the load.
查询吞吐量增加,因此您想要增加更多的CPU来处理负载。
-
The dataset size increases, so you want to add more disks and RAM to store it.
数据集大小增加,所以你想要添加更多的硬盘和内存来存储它。
-
A machine fails, and other machines need to take over the failed machine’s responsibilities.
一台机器出现故障,其他机器需要接手故障机器的职责。
All of these changes call for data and requests to be moved from one node to another. The process of moving load from one node in the cluster to another is called rebalancing .
所有这些变化都需要将数据和请求从一个节点转移到另一个节点。将集群中的负载从一个节点移动到另一个节点的过程称为重新平衡。
No matter which partitioning scheme is used, rebalancing is usually expected to meet some minimum requirements:
无论使用哪种分区方案,重新平衡通常应满足一些最低要求:
-
After rebalancing, the load (data storage, read and write requests) should be shared fairly between the nodes in the cluster.
重新平衡后,集群中的节点应该公平共享负载(数据存储、读写请求)。
-
While rebalancing is happening, the database should continue accepting reads and writes.
在重新平衡过程中,数据库应该继续接受读和写操作。
-
No more data than necessary should be moved between nodes, to make rebalancing fast and to minimize the network and disk I/O load.
应该只传输必要的数据来完成重新平衡,以使重平衡快速,并尽量减少网络和磁盘I/O负载。
Strategies for Rebalancing
There are a few different ways of assigning partitions to nodes [ 23 ]. Let’s briefly discuss each in turn.
分配分区给节点有几种不同的方法[23]。让我们按顺序简要讨论每种方法。
How not to do it: hash mod N
When partitioning by the hash of a key, we said earlier ( Figure 6-3 ) that it’s best to divide the possible hashes into ranges and assign each range to a partition (e.g., assign key to partition 0 if 0 ≤ hash ( key ) < b 0 , to partition 1 if b 0 ≤ hash ( key ) < b 1 , etc.).
当按键的哈希值进行分区时,我们先前提到(如 Figure 6-3)最好将可行的哈希值分为范围,并将每个范围分配给一个分区(例如,如果 0 ≤ hash(key) < b0,则将键分配给分区 0,如果 b0≤hash(key)<b1,则将键分配给分区 1,以此类推)。
Perhaps you wondered why we don’t just use
mod
(the
%
operator in many programming languages).
For example,
hash
(
key
)
mod
10 would return a number between 0 and 9 (if we write the hash
as a decimal number, the hash
mod
10 would be the last digit). If we have 10 nodes, numbered 0 to
9, that seems like an easy way of assigning each key to a node.
也许你会想知道为什么我们不直接使用 mod(在许多编程语言中的 % 操作符)。例如,hash(key)mod 10 将返回 0 到 9 之间的数字(如果我们将哈希写为十进制数字,则哈希 mod 10 将是最后一位数字)。如果我们有 10 个节点,编号从 0 到 9,那么这似乎是一种将每个键分配到节点的简单方法。
The problem with the mod N approach is that if the number of nodes N changes, most of the keys will need to be moved from one node to another. For example, say hash ( key ) = 123456. If you initially have 10 nodes, that key starts out on node 6 (because 123456 mod 10 = 6). When you grow to 11 nodes, the key needs to move to node 3 (123456 mod 11 = 3), and when you grow to 12 nodes, it needs to move to node 0 (123456 mod 12 = 0). Such frequent moves make rebalancing excessively expensive.
mod N的问题在于,如果节点数N发生变化,大多数键将需要从一个节点移动到另一个节点。例如,假设哈希(key) = 123456。如果最初有10个节点,该键从节点6开始(因为123456 mod 10 = 6)。当您增长到11个节点时,该键需要移动到节点3(123456 mod 11 = 3),当您增长到12个节点时,它需要移动到节点0(123456 mod 12 = 0)。这样频繁的移动使得重新平衡极其昂贵。
We need an approach that doesn’t move data around more than necessary.
我们需要一种方法,不会将数据移动得过多。
Fixed number of partitions
Fortunately, there is a fairly simple solution: create many more partitions than there are nodes, and assign several partitions to each node. For example, a database running on a cluster of 10 nodes may be split into 1,000 partitions from the outset so that approximately 100 partitions are assigned to each node.
幸运的是,有一个相当简单的解决方案:创建比节点数量更多的分区,并将多个分区分配给每个节点。例如,运行在10个节点集群上的数据库可以从一开始就划分为1,000个分区,这样每个节点大约分配100个分区。
Now, if a node is added to the cluster, the new node can steal a few partitions from every existing node until partitions are fairly distributed once again. This process is illustrated in Figure 6-6 . If a node is removed from the cluster, the same happens in reverse.
现在,如果向群集添加节点,则新节点可以从每个现有节点中窃取一些分区,直到再次公平分配分区。这个过程如图6-6所示。如果从群集中删除节点,则反向发生相同的情况。
Only entire partitions are moved between nodes. The number of partitions does not change, nor does the assignment of keys to partitions. The only thing that changes is the assignment of partitions to nodes. This change of assignment is not immediate—it takes some time to transfer a large amount of data over the network—so the old assignment of partitions is used for any reads and writes that happen while the transfer is in progress.
仅整个分区在节点之间移动。分区数量不会改变,键到分区的分配也不会改变。唯一变化的是分配给节点的分区。该分配的更改不会立即发生,需要一些时间将大量数据通过网络传输,因此在传输过程中发生的任何读写操作都将使用旧的分区分配。
In principle, you can even account for mismatched hardware in your cluster: by assigning more partitions to nodes that are more powerful, you can force those nodes to take a greater share of the load.
原则上,您甚至可以考虑集群中不匹配的硬件:通过将更多的分区分配给更强大的节点,您可以让这些节点承担更大的工作负荷份额。
This approach to rebalancing is used in Riak [ 15 ], Elasticsearch [ 24 ], Couchbase [ 10 ], and Voldemort [ 25 ].
这种重新平衡的方法被用于Riak [15]、Elasticsearch [24]、Couchbase [10]和Voldemort [25]中。
In this configuration, the number of partitions is usually fixed when the database is first set up and not changed afterward. Although in principle it’s possible to split and merge partitions (see the next section), a fixed number of partitions is operationally simpler, and so many fixed-partition databases choose not to implement partition splitting. Thus, the number of partitions configured at the outset is the maximum number of nodes you can have, so you need to choose it high enough to accommodate future growth. However, each partition also has management overhead, so it’s counterproductive to choose too high a number.
在这种配置中,分区数量通常在数据库首次设置时固定不变。虽然原则上可以分裂和合并分区(见下一节),但固定分区数量在操作上更简单,因此许多固定分区数据库选择不实现分区分割。因此,最初配置的分区数是您可以拥有的最大节点数,因此您需要选择足够高的数字来容纳未来增长。然而,每个分区也有管理开销,因此选择过高的数字是不利的。
Choosing the right number of partitions is difficult if the total size of the dataset is highly variable (for example, if it starts small but may grow much larger over time). Since each partition contains a fixed fraction of the total data, the size of each partition grows proportionally to the total amount of data in the cluster. If partitions are very large, rebalancing and recovery from node failures become expensive. But if partitions are too small, they incur too much overhead. The best performance is achieved when the size of partitions is “just right,” neither too big nor too small, which can be hard to achieve if the number of partitions is fixed but the dataset size varies.
如果数据集的总大小非常不确定(例如,它可能从小变得非常大),那么选择正确的分区数量就很困难。由于每个分区包含总数据的固定部分,因此每个分区的大小与群集中的数据总量成比例增长。如果分区非常大,则重新平衡和从节点故障中恢复会变得昂贵。但是,如果分区太小,则会产生过多的开销。当分区大小“刚刚好”而不是太大或太小时,可以实现最佳性能,但如果分区数量固定但数据集大小变化,则可能难以实现。
Dynamic partitioning
For databases that use key range partitioning (see “Partitioning by Key Range” ), a fixed number of partitions with fixed boundaries would be very inconvenient: if you got the boundaries wrong, you could end up with all of the data in one partition and all of the other partitions empty. Reconfiguring the partition boundaries manually would be very tedious.
对于使用关键字范围分区的数据库(参见“按关键字范围分区”),具有固定边界的固定数量的分区将非常不方便:如果边界设置错误,您可能会在一个分区中得到所有数据以及所有其他分区为空。手动重新配置分区边界将非常繁琐。 对于使用关键字范围分区的数据库(参见“按关键字范围分区”),固定数量的分区以及固定边界将非常不方便:如果边界设置错误,您可能会在一个分区中得到所有数据以及所有其他分区为空。手动重新配置分区边界将非常繁琐。
For that reason, key range–partitioned databases such as HBase and RethinkDB create partitions dynamically. When a partition grows to exceed a configured size (on HBase, the default is 10 GB), it is split into two partitions so that approximately half of the data ends up on each side of the split [ 26 ]. Conversely, if lots of data is deleted and a partition shrinks below some threshold, it can be merged with an adjacent partition. This process is similar to what happens at the top level of a B-tree (see “B-Trees” ).
因此,像HBase和RethinkDB这样的键范围分区数据库会动态地创建分区。当分区增长超过配置的大小时(在HBase上,默认为10 GB),它会被分成两个分区,以便将数据的约一半分布在拆分的每一侧[26]。反之,如果删除了大量数据并且分区缩小到低于某个阈值,它可以与相邻的分区合并。 这个过程类似于B树的顶层发生的情况(参见“B树”)。
Each partition is assigned to one node, and each node can handle multiple partitions, like in the case of a fixed number of partitions. After a large partition has been split, one of its two halves can be transferred to another node in order to balance the load. In the case of HBase, the transfer of partition files happens through HDFS, the underlying distributed filesystem [ 3 ].
每个分区都被分配给一个节点,每个节点可以处理多个分区,就像一个固定数量的分区一样。在大分区被拆分后,其中的一半可以转移到另一个节点以平衡负载。在HBase的情况下,分区文件的转移通过HDFS进行,这是底层分布式文件系统。
An advantage of dynamic partitioning is that the number of partitions adapts to the total data volume. If there is only a small amount of data, a small number of partitions is sufficient, so overheads are small; if there is a huge amount of data, the size of each individual partition is limited to a configurable maximum [ 23 ].
动态分区的优点在于分区数量能够自适应数据量大小。如果数据量很小,只需要少量分区,从而减小开销;如果数据量很大,每个分区的大小会被限制在可配置的最大值[23]。
However, a caveat is that an empty database starts off with a single partition, since there is no a priori information about where to draw the partition boundaries. While the dataset is small—until it hits the point at which the first partition is split—all writes have to be processed by a single node while the other nodes sit idle. To mitigate this issue, HBase and MongoDB allow an initial set of partitions to be configured on an empty database (this is called pre-splitting ). In the case of key-range partitioning, pre-splitting requires that you already know what the key distribution is going to look like [ 4 , 26 ].
然而,有一个警告是空数据库开始时仅有一个分区,因为没有先验信息关于在哪里绘制分区边界。当数据集很小-直到第一个分区被拆分时,所有写入都必须由单个节点处理,而其他节点则闲置。为了缓解这个问题,HBase和MongoDB允许在空数据库上配置一组初始分区(这称为预分割)。在键范围分区的情况下,预分割要求你已经知道键分布会是什么样子 [4,26]。
Dynamic partitioning is not only suitable for key range–partitioned data, but can equally well be used with hash-partitioned data. MongoDB since version 2.4 supports both key-range and hash partitioning, and it splits partitions dynamically in either case.
动态分区不仅适用于键范围分区数据,同样也可用于哈希分区数据。自2.4版本以来,MongoDB支持键范围和哈希分区,并在任一情况下动态分割分区。
Partitioning proportionally to nodes
With dynamic partitioning, the number of partitions is proportional to the size of the dataset, since the splitting and merging processes keep the size of each partition between some fixed minimum and maximum. On the other hand, with a fixed number of partitions, the size of each partition is proportional to the size of the dataset. In both of these cases, the number of partitions is independent of the number of nodes.
使用动态分区,分区数量与数据集大小成正比,因为拆分和合并过程保持每个分区的大小介于某个固定的最小值和最大值之间。另一方面,如果分区数量固定,则每个分区的大小与数据集的大小成正比。在这两种情况下,分区数与节点数无关。
A third option, used by Cassandra and Ketama, is to make the number of partitions proportional to the number of nodes—in other words, to have a fixed number of partitions per node [ 23 , 27 , 28 ]. In this case, the size of each partition grows proportionally to the dataset size while the number of nodes remains unchanged, but when you increase the number of nodes, the partitions become smaller again. Since a larger data volume generally requires a larger number of nodes to store, this approach also keeps the size of each partition fairly stable.
第三种选项是由Cassandra和Ketama使用的,就是按节点数比例设置分区数量-换句话说,每个节点有一个固定数量的分区[23,27,28]。 在这种情况下,每个分区的大小与数据集大小成比例增长,而节点数保持不变,但是当您增加节点数时,分区大小又会变小。 由于更大的数据量一般需要更多的节点来存储,因此这种方法也可以保持每个分区的大小相对稳定。
When a new node joins the cluster, it randomly chooses a fixed number of existing partitions to split, and then takes ownership of one half of each of those split partitions while leaving the other half of each partition in place. The randomization can produce unfair splits, but when averaged over a larger number of partitions (in Cassandra, 256 partitions per node by default), the new node ends up taking a fair share of the load from the existing nodes. Cassandra 3.0 introduced an alternative rebalancing algorithm that avoids unfair splits [ 29 ].
当一个新节点加入集群时,它会随机选择一定数量的现有分区进行拆分,然后接管这些拆分分区的一半,同时留下每个分区的另一半。随机化可能会产生不公平的拆分,但当它在大量分区上进行平均(在Cassandra中,默认每个节点有256个分区),新节点最终会从现有节点中获得公平的负载份额。Cassandra 3.0引入了一种避免不公平拆分的替代平衡算法[29]。
Picking partition boundaries randomly requires that hash-based partitioning is used (so the boundaries can be picked from the range of numbers produced by the hash function). Indeed, this approach corresponds most closely to the original definition of consistent hashing [ 7 ] (see “Consistent Hashing” ). Newer hash functions can achieve a similar effect with lower metadata overhead [ 8 ].
随机选择分区边界需要使用基于哈希的分区(因此可以从哈希函数产生的数字范围中选择边界)。实际上,这种方法最接近一致性哈希的原始定义[7](参见“一致性哈希”)。新的哈希函数可以以较低的元数据开销实现类似的效果[8]。
Operations: Automatic or Manual Rebalancing
There is one important question with regard to rebalancing that we have glossed over: does the rebalancing happen automatically or manually?
关于再平衡存在一个重要问题:是自动还是手动进行?
There is a gradient between fully automatic rebalancing (the system decides automatically when to move partitions from one node to another, without any administrator interaction) and fully manual (the assignment of partitions to nodes is explicitly configured by an administrator, and only changes when the administrator explicitly reconfigures it). For example, Couchbase, Riak, and Voldemort generate a suggested partition assignment automatically, but require an administrator to commit it before it takes effect.
完全自动重新平衡(系统自动决定何时将分区从一个节点移动到另一个节点,而无需任何管理员干预)和完全手动之间存在梯度(将分区分配给节点是由管理员明确配置的,并且仅在管理员明确重新配置时发生更改)。例如,Couchbase,Riak和Voldemort自动生成建议的分区分配,但需要管理员在生效之前批准它。
Fully automated rebalancing can be convenient, because there is less operational work to do for normal maintenance. However, it can be unpredictable. Rebalancing is an expensive operation, because it requires rerouting requests and moving a large amount of data from one node to another. If it is not done carefully, this process can overload the network or the nodes and harm the performance of other requests while the rebalancing is in progress.
全自动重新平衡可以很方便,因为正常维护时需要做较少的操作工作。但它也可能是不可预测的。重新平衡是一项昂贵的操作,因为它需要重新路由请求并移动大量数据从一个节点到另一个节点。如果不小心进行,则此过程可能会过载网络或节点,并在重新平衡期间影响其他请求的性能。
Such automation can be dangerous in combination with automatic failure detection. For example, say one node is overloaded and is temporarily slow to respond to requests. The other nodes conclude that the overloaded node is dead, and automatically rebalance the cluster to move load away from it. This puts additional load on the overloaded node, other nodes, and the network—making the situation worse and potentially causing a cascading failure.
这种自动化在与自动故障检测相结合时可能会很危险。例如,假设一个节点过载并暂时无法快速响应请求。其他节点会得出结论这个超载节点已经死亡,并自动重新平衡集群以移除负载。这会增加超载节点、其他节点和网络的负载,使情况恶化并潜在地导致级联故障。
For that reason, it can be a good thing to have a human in the loop for rebalancing. It’s slower than a fully automatic process, but it can help prevent operational surprises.
因此,让人类介入重新平衡可能是一件好事。虽然速度比完全自动化流程慢,但它有助于防止操作上的意外。
Request Routing
We have now partitioned our dataset across multiple nodes running on multiple machines. But there remains an open question: when a client wants to make a request, how does it know which node to connect to? As partitions are rebalanced, the assignment of partitions to nodes changes. Somebody needs to stay on top of those changes in order to answer the question: if I want to read or write the key “foo”, which IP address and port number do I need to connect to?
我们现在已经将数据集分区到多个运行在多台机器上的节点上了。但是还有一个未解决的问题:当客户端想要发起请求时,它如何知道要连接哪个节点?当分区重新平衡时,分配给节点的分区会发生变化。有人需要跟上这些变化,以回答这个问题:如果我想读取或写入键“foo”,我需要连接哪个IP地址和端口号?
This is an instance of a more general problem called service discovery , which isn’t limited to just databases. Any piece of software that is accessible over a network has this problem, especially if it is aiming for high availability (running in a redundant configuration on multiple machines). Many companies have written their own in-house service discovery tools, and many of these have been released as open source [ 30 ].
这是一个更一般的问题,被称为服务发现,它不仅仅局限于数据库。任何在网络上可访问的软件都会面临这个问题,特别是如果它想要实现高可用性(在多台机器上的冗余配置运行)。许多公司编写了自己的内部服务发现工具,其中许多已被发布为开源软件[30]。
On a high level, there are a few different approaches to this problem (illustrated in Figure 6-7 ):
从高层次来讲,对于这个问题有几种不同的方法(如图6-7所示):
-
Allow clients to contact any node (e.g., via a round-robin load balancer). If that node coincidentally owns the partition to which the request applies, it can handle the request directly; otherwise, it forwards the request to the appropriate node, receives the reply, and passes the reply along to the client.
允许客户端通过轮询负载均衡器与任何节点联系。如果该节点恰好拥有请求所涉及的分区,则可以直接处理请求;否则,它将请求转发到适当的节点,接收回复,并将回复传递给客户端。
-
Send all requests from clients to a routing tier first, which determines the node that should handle each request and forwards it accordingly. This routing tier does not itself handle any requests; it only acts as a partition-aware load balancer.
将所有客户端请求先发送到路由层,该层确定应该处理每个请求的节点,并相应地转发它们。这个路由层本身不处理任何请求,它只作为一个分区感知的负载均衡器。
-
Require that clients be aware of the partitioning and the assignment of partitions to nodes. In this case, a client can connect directly to the appropriate node, without any intermediary.
要求客户端了解分区和分区分配给节点的情况。在这种情况下,客户端可以直接连接到适当的节点,无需任何中间人。
In all cases, the key problem is: how does the component making the routing decision (which may be one of the nodes, or the routing tier, or the client) learn about changes in the assignment of partitions to nodes?
在所有情况下,关键问题是:决定路由的组件(可能是节点之一、路由层或客户端)如何了解将分区分配给节点的更改?
This is a challenging problem, because it is important that all participants agree—otherwise requests would be sent to the wrong nodes and not handled correctly. There are protocols for achieving consensus in a distributed system, but they are hard to implement correctly (see Chapter 9 ).
这是一个具有挑战性的问题,因为重要的是所有参与者都同意,否则请求将被发送到错误的节点,并且不能正确处理。有协议可用于在分布式系统中实现共识,但是正确地实现它们很难(请参阅第9章)。
Many distributed data systems rely on a separate coordination service such as ZooKeeper to keep track of this cluster metadata, as illustrated in Figure 6-8 . Each node registers itself in ZooKeeper, and ZooKeeper maintains the authoritative mapping of partitions to nodes. Other actors, such as the routing tier or the partitioning-aware client, can subscribe to this information in ZooKeeper. Whenever a partition changes ownership, or a node is added or removed, ZooKeeper notifies the routing tier so that it can keep its routing information up to date.
许多分布式数据系统都依赖于一个单独的协调服务(例如ZooKeeper)来跟踪集群元数据,如图6-8所示。每个节点在ZooKeeper中注册自己,并且ZooKeeper维护分区到节点的授权映射。其他参与者(如路由层或分区感知客户端)可以在ZooKeeper中订阅这些信息。每当分区所有权发生变化或节点添加或删除时,ZooKeeper会通知路由层以便它可以保持其路由信息最新。
For example, LinkedIn’s Espresso uses Helix [ 31 ] for cluster management (which in turn relies on ZooKeeper), implementing a routing tier as shown in Figure 6-8 . HBase, SolrCloud, and Kafka also use ZooKeeper to track partition assignment. MongoDB has a similar architecture, but it relies on its own config server implementation and mongos daemons as the routing tier.
例如,LinkedIn的Espresso使用Helix进行集群管理(其又依赖于ZooKeeper),并实现了如图6-8所示的路由层。HBase、SolrCloud和Kafka也使用ZooKeeper来跟踪分区分配。MongoDB具有类似的架构,但它依赖于自己的配置服务器实现和mongos守护程序作为路由层。
Cassandra and Riak take a different approach: they use a gossip protocol among the nodes to disseminate any changes in cluster state. Requests can be sent to any node, and that node forwards them to the appropriate node for the requested partition (approach 1 in Figure 6-7 ). This model puts more complexity in the database nodes but avoids the dependency on an external coordination service such as ZooKeeper.
Cassandra和Riak采用不同的方法:它们使用节点之间的八卦协议来传播集群状态的任何更改。请求可以发送到任何节点,该节点将其转发到所请求分区的适当节点(图6-7中的第一种方法)。该模型将更多的复杂性放在数据库节点上,但避免了对外部协调服务(如ZooKeeper)的依赖。
Couchbase does not rebalance automatically, which simplifies the design. Normally it is configured with a routing tier called moxi , which learns about routing changes from the cluster nodes [ 32 ].
Couchbase不会自动重新平衡,这简化了设计。通常情况下,它会配置一个叫做moxi的路由层,从集群节点了解路由变化。
When using a routing tier or when sending requests to a random node, clients still need to find the IP addresses to connect to. These are not as fast-changing as the assignment of partitions to nodes, so it is often sufficient to use DNS for this purpose.
使用路由层或向随机节点发送请求时,客户端仍然需要找到要连接的 IP 地址。这些地址不像将分区分配给节点一样经常发生变化,因此通常可以使用 DNS 来实现这一目的。
Parallel Query Execution
So far we have focused on very simple queries that read or write a single key (plus scatter/gather queries in the case of document-partitioned secondary indexes). This is about the level of access supported by most NoSQL distributed datastores.
到目前为止,我们专注于非常简单的查询,仅读取或写入单个键(在文档分区二级索引的情况下还包括scatter/gather查询)。这是大多数NoSQL分布式数据存储支持的访问级别。
However, massively parallel processing (MPP) relational database products, often used for analytics, are much more sophisticated in the types of queries they support. A typical data warehouse query contains several join, filtering, grouping, and aggregation operations. The MPP query optimizer breaks this complex query into a number of execution stages and partitions, many of which can be executed in parallel on different nodes of the database cluster. Queries that involve scanning over large parts of the dataset particularly benefit from such parallel execution.
然而,通常用于分析的大规模并行处理(MPP)关系型数据库产品在其支持的查询类型方面要复杂得多。典型的数据仓库查询包含多个连接、筛选、分组和聚合操作。MPP查询优化器将这个复杂的查询分成多个执行阶段和分区,其中许多可以在数据库群集的不同节点上并行执行。涉及扫描大量数据集的查询尤其受益于这种并行执行。
Fast parallel execution of data warehouse queries is a specialized topic, and given the business importance of analytics, it receives a lot of commercial interest. We will discuss some techniques for parallel query execution in Chapter 10 . For a more detailed overview of techniques used in parallel databases, please see the references [ 1 , 33 ].
数据仓库查询的快速并行执行是一项专业话题,由于分析的商业重要性,它受到了很多商业上的关注。我们将在第10章中讨论一些并行查询执行技术。有关并行数据库中使用的技术的更详细概述,请参见参考文献[1,33]。
Summary
In this chapter we explored different ways of partitioning a large dataset into smaller subsets. Partitioning is necessary when you have so much data that storing and processing it on a single machine is no longer feasible.
在本章中,我们探讨了将大型数据集分割为较小子集的不同方法。当您的数据量如此大,以至于在单台计算机上存储和处理不再可行时,分区是必要的。
The goal of partitioning is to spread the data and query load evenly across multiple machines, avoiding hot spots (nodes with disproportionately high load). This requires choosing a partitioning scheme that is appropriate to your data, and rebalancing the partitions when nodes are added to or removed from the cluster.
分区的目标是在多台机器间均匀分配数据和查询负载,避免热点节点(负载过高的节点)。这需要选择适合数据的分区方案,并在集群中添加或删除节点时重新平衡分区。
We discussed two main approaches to partitioning:
我们讨论了两种主要的分区方法:
-
Key range partitioning , where keys are sorted, and a partition owns all the keys from some minimum up to some maximum. Sorting has the advantage that efficient range queries are possible, but there is a risk of hot spots if the application often accesses keys that are close together in the sorted order.
键范围分区,其中键已排序,并且一个分区拥有从某个最小值到某个最大值的所有键。排序具有高效的范围查询优势,但如果应用程序经常访问在排序顺序中彼此靠近的键,则存在热点的风险。
In this approach, partitions are typically rebalanced dynamically by splitting the range into two subranges when a partition gets too big.
在这种方法中,当一个分区变得太大时,通常会通过将范围分成两个子范围来动态地重新平衡分区。
-
Hash partitioning , where a hash function is applied to each key, and a partition owns a range of hashes. This method destroys the ordering of keys, making range queries inefficient, but may distribute load more evenly.
哈希分区,是对每个关键字应用哈希函数,并对一个分区拥有一定范围的哈希值。这种方法破坏了关键字的排序,导致区间查询效率低下,但可以更均匀地分配负载。
When partitioning by hash, it is common to create a fixed number of partitions in advance, to assign several partitions to each node, and to move entire partitions from one node to another when nodes are added or removed. Dynamic partitioning can also be used.
当使用哈希分区时,通常会提前创建固定数量的分区,分配多个分区到每个节点,当节点添加或删除时,会将整个分区从一个节点移动到另一个节点。也可以使用动态分区。
Hybrid approaches are also possible, for example with a compound key: using one part of the key to identify the partition and another part for the sort order.
混合方法也是可能的,例如使用复合键:使用键的一部分来标识分区,另一部分用于排序顺序。
We also discussed the interaction between partitioning and secondary indexes. A secondary index also needs to be partitioned, and there are two methods:
我们还讨论了分区和二级索引之间的交互。二级索引也需要进行分区,有两种方法:
-
Document-partitioned indexes (local indexes), where the secondary indexes are stored in the same partition as the primary key and value. This means that only a single partition needs to be updated on write, but a read of the secondary index requires a scatter/gather across all partitions.
文档分区索引(本地索引),其中二级索引存储在与主键和值相同的分区中。这意味着在写入时只需要更新单个分区,但读取辅助索引需要在所有分区间散布/收集。
-
Term-partitioned indexes (global indexes), where the secondary indexes are partitioned separately, using the indexed values. An entry in the secondary index may include records from all partitions of the primary key. When a document is written, several partitions of the secondary index need to be updated; however, a read can be served from a single partition.
使用按术语分区的索引(全局索引),其中二级索引单独分区,使用索引值。二级索引中的条目可能包括主键所有分区的记录。当编写文档时,需要更新二级索引的多个分区;但是,读取可以从单个分区服务。
Finally, we discussed techniques for routing queries to the appropriate partition, which range from simple partition-aware load balancing to sophisticated parallel query execution engines.
最后,我们讨论了将查询路由到适当分区的技术,这些技术包括简单的分区意识负载均衡和复杂的并行查询执行引擎。
By design, every partition operates mostly independently—that’s what allows a partitioned database to scale to multiple machines. However, operations that need to write to several partitions can be difficult to reason about: for example, what happens if the write to one partition succeeds, but another fails? We will address that question in the following chapters.
按照设计,每个分区基本上是独立运作的,这使得分区数据库可以扩展到多台机器。然而,需要写入多个分区的操作可能很难理解:例如,如果对一个分区的写入成功了,但另一个分区失败了怎么办?我们将在接下来的章节中解决这个问题。
Footnotes
i Partitioning, as discussed in this chapter, is a way of intentionally breaking a large database down into smaller ones. It has nothing to do with network partitions (netsplits), a type of fault in the network between nodes. We will discuss such faults in Chapter 8 .
本章所讨论的分区是一种有意将大型数据库分解成更小模块的方法,与网络分区(net split)这种在节点之间网络连接上发生的故障无关。我们将在第八章讨论这类故障。
ii If your database only supports a key-value model, you might be tempted to implement a secondary index yourself by creating a mapping from values to document IDs in application code. If you go down this route, you need to take great care to ensure your indexes remain consistent with the underlying data. Race conditions and intermittent write failures (where some changes were saved but others weren’t) can very easily cause the data to go out of sync—see “The need for multi-object transactions” .
如果你的数据库仅支持键值模型,你可能会想自己实现二级索引,通过在应用程序代码中创建从数值到文档ID的映射。如果你选择这种方法,你需要非常小心地确保你的索引与底层数据保持一致。竞争条件和断断续续的写入故障(其中一些更改被保存,但其他更改则没有)很容易导致数据失去同步。请参阅“多对象事务”的需要。
References
[ 1 ] David J. DeWitt and Jim N. Gray: “ Parallel Database Systems: The Future of High Performance Database Systems ,” Communications of the ACM , volume 35, number 6, pages 85–98, June 1992. doi:10.1145/129888.129894
[1] David J. DeWitt和Jim N. Gray:"并行数据库系统:高性能数据库系统的未来",ACM通讯,第35卷,第6期,1992年6月,85-98页。doi:10.1145/129888.129894。
[ 2 ] Lars George: “ HBase vs. BigTable Comparison ,” larsgeorge.com , November 2009.
[2] Lars George: “HBase vs. BigTable 对比,” larsgeorge.com,2009年11月。
[ 3 ] “ The Apache HBase Reference Guide ,” Apache Software Foundation, hbase.apache.org , 2014.
[3] “Apache HBase参考指南”,Apache软件基金会,hbase.apache.org,2014年。
[ 4 ] MongoDB, Inc.: “ New Hash-Based Sharding Feature in MongoDB 2.4 ,” blog.mongodb.org , April 10, 2013.
[4] MongoDB公司:“MongoDB 2.4中新增基于哈希的分片特性”,blog.mongodb.org,2013年4月10日。
[ 5 ] Ikai Lan: “ App Engine Datastore Tip: Monotonically Increasing Values Are Bad ,” ikaisays.com , January 25, 2011.
"[5] Ikai Lan: “App Engine 数据存储技巧:单调递增的值是不好的”,ikaisays.com, 2011年1月25日."
[ 6 ] Martin Kleppmann: “ Java’s hashCode Is Not Safe for Distributed Systems ,” martin.kleppmann.com , June 18, 2012.
"Java的hashCode对分布式系统不安全",Martin Kleppmann,martin.kleppmann.com,2012年6月18日。"
[ 7 ] David Karger, Eric Lehman, Tom Leighton, et al.: “ Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web ,” at 29th Annual ACM Symposium on Theory of Computing (STOC), pages 654–663, 1997. doi:10.1145/258533.258660
[7] David Karger, Eric Lehman, Tom Leighton等:“一致性哈希和随机树:分布式缓存协议以缓解全球网络上的热点”,第29届ACM计算理论研讨会(STOC),1997年,第654-663页。doi:10.1145/258533.258660。
[ 8 ] John Lamping and Eric Veach: “ A Fast, Minimal Memory, Consistent Hash Algorithm ,” arxiv.org , June 2014.
约翰·兰平和埃里克·维奇:“一种快速、最小内存、一致散列算法”,arxiv.org,2014年6月。
[ 9 ] Eric Redmond: “ A Little Riak Book ,” Version 1.4.0, Basho Technologies, September 2013.
[9] Eric Redmond: 《Riak简介》第1.4.0版, Basho Technologies,2013年9月。
[ 10 ] “ Couchbase 2.5 Administrator Guide ,” Couchbase, Inc., 2014.
[10] “Couchbase 2.5管理员指南”,Couchbase公司,2014年。
[ 11 ] Avinash Lakshman and Prashant Malik: “ Cassandra – A Decentralized Structured Storage System ,” at 3rd ACM SIGOPS International Workshop on Large Scale Distributed Systems and Middleware (LADIS), October 2009.
[11] Avinash Lakshman 和 Prashant Malik:“Cassandra-一个分散式构造存储系统”,于 2009 年 10 月第三届 ACM SIGOPS 国际大规模分散式系统和中间件研讨会(LADIS)上发表。
[ 12 ] Jonathan Ellis: “ Facebook’s Cassandra Paper, Annotated and Compared to Apache Cassandra 2.0 ,” datastax.com , September 12, 2013.
[12] Jonathan Ellis:“Facebook的Cassandra 论文:注释和与Apache Cassandra 2.0的比较”,datastax.com,2013年9月12日。
[ 13 ] “ Introduction to Cassandra Query Language ,” DataStax, Inc., 2014.
《Cassandra查询语言简介》,DataStax,Inc.,2014年。
[ 14 ] Samuel Axon: “ 3% of Twitter’s Servers Dedicated to Justin Bieber ,” mashable.com , September 7, 2010.
“Twitter的服务器中有3%专门用于贾斯汀·比伯。”,来源于2010年9月7日的mashable.com。
[ 15 ] “ Riak 1.4.8 Docs ,” Basho Technologies, Inc., 2014.
《Riak 1.4.8文档》,Basho Technologies,Inc.,2014年。
[ 16 ] Richard Low: “ The Sweet Spot for Cassandra Secondary Indexing ,” wentnet.com , October 21, 2013.
[16] Richard Low:“Cassandra二级索引的最佳情况”,wentnet.com,2013年10月21日。
[ 17 ] Zachary Tong: “ Customizing Your Document Routing ,” elasticsearch.org , June 3, 2013.
[17] Zachary Tong:“自定义文档路由”,elasticsearch.org,2013年6月3日。
[ 18 ] “ Apache Solr Reference Guide ,” Apache Software Foundation, 2014.
"[18] Apache Solr参考指南,Apache Software Foundation,2014年。"
[ 19 ] Andrew Pavlo: “ H-Store Frequently Asked Questions ,” hstore.cs.brown.edu , October 2013.
[19] Andrew Pavlo: “H-Store常见问题,” hstore.cs.brown.edu, 2013年10月。
[ 20 ] “ Amazon DynamoDB Developer Guide ,” Amazon Web Services, Inc., 2014.
「亚马逊DynamoDB开发者指南」,由Amazon Web Services,Inc.于2014年出版。
[ 21 ] Rusty Klophaus: “ Difference Between 2I and Search ,” email to riak-users mailing list, lists.basho.com , October 25, 2011.
【21】 Rusty Klophaus: “2I和Search的差异”,邮件发送至riak-users邮件列表,lists.basho.com,2011年10月25日。
[ 22 ] Donald K. Burleson: “ Object Partitioning in Oracle ,” dba-oracle.com , November 8, 2000.
[22] Donald K. Burleson:“Oracle中的对象分区”,dba-oracle.com,2000年11月8日。 [22]:唐纳德·K·伯勒森:“Oracle中的对象分区”,dba-oracle.com,2000年11月8日。
[ 23 ] Eric Evans: “ Rethinking Topology in Cassandra ,” at ApacheCon Europe , November 2012.
[23] Eric Evans:在2012年11月的ApacheCon Europe上发表了题为“重新审视Cassandra中的拓扑结构”的演讲。
[ 24 ] Rafał Kuć: “ Reroute API Explained ,” elasticsearchserverbook.com , September 30, 2013.
【24】Rafał Kuć:“重定向API解释”,elasticsearchserverbook.com,2013年9月30日。
[ 25 ] “ Project Voldemort Documentation ,” project-voldemort.com .
“Voldemort项目文档”,project-voldemort.com。
[ 26 ] Enis Soztutar: “ Apache HBase Region Splitting and Merging ,” hortonworks.com , February 1, 2013.
"[26] Enis Soztutar:'Apache HBase区域的分割和合并',hortonworks.com,2013年2月1日。"
[ 27 ] Brandon Williams: “ Virtual Nodes in Cassandra 1.2 ,” datastax.com , December 4, 2012.
[27]Brandon Williams:“Cassandra1.2中的虚拟节点”,datastax.com,2012年12月4日。
[ 28 ] Richard Jones: “ libketama: Consistent Hashing Library for Memcached Clients ,” metabrew.com , April 10, 2007.
[28] Richard Jones: “libketama:用于Memcached客户端的一致性哈希库”,metabrew.com,2007年4月10日。
[ 29 ] Branimir Lambov: “ New Token Allocation Algorithm in Cassandra 3.0 ,” datastax.com , January 28, 2016.
[29] 布拉尼米尔·兰博夫: “Cassandra 3.0中的新令牌分配算法”,datastax.com,2016年1月28日。
[ 30 ] Jason Wilder: “ Open-Source Service Discovery ,” jasonwilder.com , February 2014.
"Jason Wilder: “开源服务发现”,jasonwilder.com,2014年2月。"
[ 31 ] Kishore Gopalakrishna, Shi Lu, Zhen Zhang, et al.: “ Untangling Cluster Management with Helix ,” at ACM Symposium on Cloud Computing (SoCC), October 2012. doi:10.1145/2391229.2391248
[31] Kishore Gopalakrishna, Shi Lu, Zhen Zhang等人:“通过Helix解开集群管理”,发表于2012年ACM云计算研讨会(SoCC),2012年10月。 doi:10.1145/2391229.2391248
[ 32 ] “ Moxi 1.8 Manual ,” Couchbase, Inc., 2014.
[32] "Moxi 1.8 手册," Couchbase,Inc.,2014年。
[ 33 ] Shivnath Babu and Herodotos Herodotou: “ Massively Parallel Databases and MapReduce Systems ,” Foundations and Trends in Databases , volume 5, number 1, pages 1–104, November 2013. doi:10.1561/1900000036
“大规模并行数据库与MapReduce系统”,Shivnath Babu和Herodotos Herodotou,数据库基础与趋势,第5卷,第1号,第1-104页,2013年11月。doi:10.1561/1900000036。