第十章 批处理
Chapter 10. Batch Processing
A system cannot be successful if it is too strongly influenced by a single person. Once the initial design is complete and fairly robust, the real test begins as people with many different viewpoints undertake their own experiments.
一个系统如果受到单个人的过多影响,就不可能成功。一旦初步设计完成并相当健壮,真正的考验就开始了,因为拥有许多不同观点的人会进行自己的实验。
Donald Knuth
高德纳
In the first two parts of this book we talked a lot about requests and queries , and the corresponding responses or results . This style of data processing is assumed in many modern data systems: you ask for something, or you send an instruction, and some time later the system (hopefully) gives you an answer. Databases, caches, search indexes, web servers, and many other systems work this way.
在本书的前两个部分,我们广泛讨论了请求和查询,以及相应的回答或结果。许多现代数据系统都使用这种数据处理方式:您请求某些内容或发送指令,一段时间后系统(希望)给您答案。数据库,缓存,搜索索引,Web服务器和许多其他系统都是这样工作的。
In such online systems, whether it’s a web browser requesting a page or a service calling a remote API, we generally assume that the request is triggered by a human user, and that the user is waiting for the response. They shouldn’t have to wait too long, so we pay a lot of attention to the response time of these systems (see “Describing Performance” ).
在这样的在线系统中,无论是Web浏览器请求页面还是服务调用远程API,我们通常假设请求是由人类用户触发的,并且用户正在等待响应。他们不应该等待太久,因此我们非常注重这些系统的响应时间(参见“描述性能”)。
The web, and increasing numbers of HTTP/REST-based APIs, has made the request/response style of interaction so common that it’s easy to take it for granted. But we should remember that it’s not the only way of building systems, and that other approaches have their merits too. Let’s distinguish three different types of systems:
网络和HTTP / REST API数量的不断增加,使请求/响应式的交互方式变得如此普遍,以至于很容易把它视为理所当然的事情。但我们应该记住,这不是构建系统的唯一方式,其他方法也有他们的优点。让我们区分三种不同类型的系统: 网络和HTTP / REST API数量的不断增加,使请求/响应式的交互方式变得如此普遍,以至于很容易把它视为理所当然的事情。但我们应该记住,这不是构建系统的唯一方式,其他方法也有他们的优点。让我们区分三种不同类型的系统:
- Services (online systems)
-
A service waits for a request or instruction from a client to arrive. When one is received, the service tries to handle it as quickly as possible and sends a response back. Response time is usually the primary measure of performance of a service, and availability is often very important (if the client can’t reach the service, the user will probably get an error message).
一个服务等待客户端的请求或指令,一旦收到,服务会尽快处理并发送响应。响应时间通常是衡量服务性能的主要指标,可用性经常非常重要(如果客户端无法连接服务,则用户可能会收到错误消息)。
- Batch processing systems (offline systems)
-
A batch processing system takes a large amount of input data, runs a job to process it, and produces some output data. Jobs often take a while (from a few minutes to several days), so there normally isn’t a user waiting for the job to finish. Instead, batch jobs are often scheduled to run periodically (for example, once a day). The primary performance measure of a batch job is usually throughput (the time it takes to crunch through an input dataset of a certain size). We discuss batch processing in this chapter.
批次处理系统会处理大量的输入数据,并且运行作业以产生输出数据。由于作业通常需要一段时间(从几分钟到数天),因此通常没有用户等待作业完成。相反,批处理作业通常被安排定期运行(例如,每天一次)。批处理作业的主要性能指标通常是吞吐量(完成指定大小的输入数据集所需的时间)。我们在本章中会讨论批处理。
- Stream processing systems (near-real-time systems)
-
Stream processing is somewhere between online and offline/batch processing (so it is sometimes called near-real-time or nearline processing). Like a batch processing system, a stream processor consumes inputs and produces outputs (rather than responding to requests). However, a stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data. This difference allows stream processing systems to have lower latency than the equivalent batch systems. As stream processing builds upon batch processing, we discuss it in Chapter 11 .
流处理位于在线和离线/批处理之间(因此有时称为近实时或近线处理)。与批处理系统一样,流处理器消耗输入并产生输出(而不是响应请求)。但是,流作业在事件发生后不久运行,而批作业在固定的输入数据集上运行。这种差异使流处理系统具有比等效批处理系统更低的延迟。由于流处理建立在批处理之上,因此我们在第11章中讨论它。
As we shall see in this chapter, batch processing is an important building block in our quest to build reliable, scalable, and maintainable applications. For example, MapReduce, a batch processing algorithm published in 2004 [ 1 ], was (perhaps over-enthusiastically) called “the algorithm that makes Google so massively scalable” [ 2 ]. It was subsequently implemented in various open source data systems, including Hadoop, CouchDB, and MongoDB.
正如本章所述,批处理是我们构建可靠、可扩展和易于维护应用程序的重要基石。例如,MapReduce 是一种批处理算法,于 2004 年发表[1],被誉为“使谷歌规模如此巨大的算法”[2](可能过分热情)。它随后被实现在各种开源数据系统中,包括 Hadoop、CouchDB 和 MongoDB。
MapReduce is a fairly low-level programming model compared to the parallel processing systems that were developed for data warehouses many years previously [ 3 , 4 ], but it was a major step forward in terms of the scale of processing that could be achieved on commodity hardware. Although the importance of MapReduce is now declining [ 5 ], it is still worth understanding, because it provides a clear picture of why and how batch processing is useful.
MapReduce相比多年前为数据仓库开发的并行处理系统是一种相当低级的编程模型,但它在可运用于大规模处理商用硬件方面迈出了重要的一步。虽然MapReduce的重要性正在逐渐下降,但了解它仍是值得的,因为它清晰地阐述了为什么和如何使用批处理是有用的。
In fact, batch processing is a very old form of computing. Long before programmable digital computers were invented, punch card tabulating machines—such as the Hollerith machines used in the 1890 US Census [ 6 ]—implemented a semi-mechanized form of batch processing to compute aggregate statistics from large inputs. And MapReduce bears an uncanny resemblance to the electromechanical IBM card-sorting machines that were widely used for business data processing in the 1940s and 1950s [ 7 ]. As usual, history has a tendency of repeating itself.
事实上,批处理是一种非常古老的计算形式。早在可编程数字计算机被发明之前,例如在1890年美国人口普查中使用的Hollerith机器,实现了一种半机械化的批处理形式,以从大型输入中计算聚合统计数据。而MapReduce与1940年代和1950年代广泛用于业务数据处理的电机械IBM卡片分类机具有惊人的相似之处。通常,历史有一种重复的趋势。
In this chapter, we will look at MapReduce and several other batch processing algorithms and frameworks, and explore how they are used in modern data systems. But first, to get started, we will look at data processing using standard Unix tools. Even if you are already familiar with them, a reminder about the Unix philosophy is worthwhile because the ideas and lessons from Unix carry over to large-scale, heterogeneous distributed data systems.
在本章中,我们将研究MapReduce和其他几种批处理算法和框架,并探讨它们在现代数据系统中的应用。但首先,为了开始,我们将研究使用标准Unix工具的数据处理。即使您已经熟悉它们,Unix哲学的提醒也是值得的,因为Unix的思想和教训可以应用于大规模、异构分布式数据系统。
Batch Processing with Unix Tools
Let’s start with a simple example. Say you have a web server that appends a line to a log file every time it serves a request. For example, using the nginx default access log format, one line of the log might look like this:
让我们从一个简单的例子开始。假设您有一个Web服务器,每次提供请求时都向日志文件追加一行。例如,使用nginx默认访问日志格式,日志的一行可能如下所示:
216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1" 200 3377 "http://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"
(That is actually one line; it’s only broken onto multiple lines here for readability.) There’s a lot of information in that line. In order to interpret it, you need to look at the definition of the log format, which is as follows:
“这实际上是一行;为了易读性,它被分成了多行。这行信息非常多。为了解释它,你需要查看日志格式的定义,如下:”
$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"
So, this one line of the log indicates that on February 27, 2015, at 17:55:11 UTC, the server
received a request for the file
/css/typography.css
from the client IP address 216.58.210.78. The
user was not authenticated, so
$remote_user
is set to a hyphen (
-
). The response status was 200
(i.e., the request was successful), and the response was 3,377 bytes in size. The web browser was
Chrome 40, and it loaded the file because it was referenced in the page at the URL
http://martin.kleppmann.com/
.
因此,日志中的这一行表示在2015年2月27日17:55:11 UTC时,服务器从客户端IP地址216.58.210.78收到了对文件/css/typography.css的请求。用户未经过身份验证,因此$remote_user设置为连字符(-)。响应状态为200(即请求成功),响应大小为3,377字节。网页浏览器为Chrome 40,并且加载该文件是因为它在URL http://martin.kleppmann.com/ 的页面中被引用。
Simple Log Analysis
Various tools can take these log files and produce pretty reports about your website traffic, but for the sake of exercise, let’s build our own, using basic Unix tools. For example, say you want to find the five most popular pages on your website. You can do this in a Unix shell as follows: i
使用各种工具可以获取这些日志文件并生成有关网站流量的漂亮报告,但是为了练习起见,让我们使用基本的Unix工具构建自己的工具。例如,假设您想找到网站上最受欢迎的五个页面。您可以在Unix shell中按如下方式执行:
cat
/var/log/nginx/access.log
|
awk
'{print $7}'
|
sort
|
uniq
-c
|
sort
-r
-n
|
head
-n
5
-
Read the log file.
阅读日志文件。
-
Split each line into fields by whitespace, and output only the seventh such field from each line, which happens to be the requested URL. In our example line, this request URL is /css/typography.css .
将每行按空格分成多个字段,并只输出每行中的第七个字段,即所请求的URL。在我们的例子中,这个请求URL是/css/typography.css。
-
Alphabetically
sort
the list of requested URLs. If some URL has been requested n times, then after sorting, the file contains the same URL repeated n times in a row.按字母顺序排序所请求的URL列表。如果某个URL被请求了n次,则在排序后,文件中包含相同的URL连续出现n次。
-
The
uniq
command filters out repeated lines in its input by checking whether two adjacent lines are the same. The-c
option tells it to also output a counter: for every distinct URL, it reports how many times that URL appeared in the input.uniq 命令通过检查相邻行是否相同来过滤输入中的重复行。 -c 选项告诉它还要输出计数器:对于每个独特的 URL,它报告该 URL 在输入中出现了多少次。
-
The second
sort
sorts by the number (-n
) at the start of each line, which is the number of times the URL was requested. It then returns the results in reverse (-r
) order, i.e. with the largest number first.第二种排序方式是按照每行开头的数字(-n)来排序,这个数字表示URL被请求的次数。然后按照逆序(-r) 返回结果,也就是先返回请求次数最多的URL。
-
Finally,
head
outputs just the first five lines (-n 5
) of input, and discards the rest.最终,头部只输出输入的前五行(-n 5),并且丢弃其余部分。
The output of that series of commands looks something like this:
那串命令的输出大概是这个样子的:
4189 /favicon.ico 3631 /2013/05/24/improving-security-of-ssh-private-keys.html 2124 /2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html 1369 / 915 /css/typography.css
Although the preceding command line likely looks a bit obscure if you’re unfamiliar with Unix tools,
it is incredibly powerful. It will process gigabytes of log files in a matter of seconds, and you
can easily modify the analysis to suit your needs. For example, if you want to omit CSS files from
the report, change the
awk
argument to
'$7 !~ /\.css$/ {print $7}'
. If you want to count top
client IP addresses instead of top pages, change the
awk
argument to
'{print $1}'
. And so on.
尽管如果您不熟悉Unix工具,前面的命令行可能看起来有些晦涩,但它非常强大。它可以在几秒钟内处理数千兆字节的日志文件,并且您可以轻松修改分析以适应您的需要。例如,如果您想从报告中省略CSS文件,将awk参数更改为'$7 !~ /\.css$/ {print $7}'。如果您想统计顶部客户端IP地址而不是顶部页面,则将awk参数更改为'{print $1}'。等等。
We don’t have space in this book to explore Unix tools in detail, but they are very much worth
learning about. Surprisingly many data analyses can be done in a few minutes using some combination
of
awk
,
sed
,
grep
,
sort
,
uniq
, and
xargs
, and they perform surprisingly well
[
8
].
我们在这本书中没有足够的空间详细探讨Unix工具,但它们非常值得学习。令人惊讶的是,使用awk、sed、grep、sort、uniq和xargs的一些组合,可以在几分钟内完成许多数据分析,并且它们的表现出奇的好 [8]。
Chain of commands versus custom program
Instead of the chain of Unix commands, you could write a simple program to do the same thing. For example, in Ruby, it might look something like this:
不用Unix命令的链条,你可以编写一个简单的程序来做同样的事情。例如,在Ruby中,它可能看起来像这样:
counts
=
Hash
.
new
(
0
)
File
.
open
(
'/var/log/nginx/access.log'
)
do
|
file
|
file
.
each
do
|
line
|
url
=
line
.
split
[
6
]
counts
[
url
]
+=
1
end
end
top5
=
counts
.
map
{
|
url
,
count
|
[
count
,
url
]
}
.
sort
.
reverse
[
0
.
.
.
5
]
top5
.
each
{
|
count
,
url
|
puts
"
#{
count
}
#{
url
}
"
}
-
counts
is a hash table that keeps a counter for the number of times we’ve seen each URL. A counter is zero by default."counts是一个哈希表,用于记录我们看到每个URL的次数。一个计数器默认为零。"
-
From each line of the log, we take the URL to be the seventh whitespace-separated field (the array index here is 6 because Ruby’s arrays are zero-indexed).
从日志的每一行中,我们将URL取为第七个用空格分隔的字段(在此,数组索引为6,因为Ruby的数组是从零开始计数的)。
-
Increment the counter for the URL in the current line of the log.
增加日志当前行中URL的计数器。
-
Sort the hash table contents by counter value (descending), and take the top five entries.
将哈希表内容按计数器值排序(降序),并取前五条条目。
-
Print out those top five entries.
打印出那五个排名前五项。
This program is not as concise as the chain of Unix pipes, but it’s fairly readable, and which of the two you prefer is partly a matter of taste. However, besides the superficial syntactic differences between the two, there is a big difference in the execution flow, which becomes apparent if you run this analysis on a large file.
这个程序不如Unix管道链那样简洁,但它相当易读,你喜欢哪一个主要取决于个人口味。然而,除了表面上的语法差异之外,这两者之间存在着执行流程的巨大差异,如果你在大文件上运行这个分析,这一点就会显而易见。
Sorting versus in-memory aggregation
The Ruby script keeps an in-memory hash table of URLs, where each URL is mapped to the number of times it has been seen. The Unix pipeline example does not have such a hash table, but instead relies on sorting a list of URLs in which multiple occurrences of the same URL are simply repeated.
Ruby脚本保持了一个内存hash表,其中每个URL被映射到它被访问的次数。Unix管道示例没有这样的哈希表,而是依靠对URL列表进行排序,其中相同URL的多次出现只是被重复。
Which approach is better? It depends how many different URLs you have. For most small to mid-sized websites, you can probably fit all distinct URLs, and a counter for each URL, in (say) 1 GB of memory. In this example, the working set of the job (the amount of memory to which the job needs random access) depends only on the number of distinct URLs: if there are a million log entries for a single URL, the space required in the hash table is still just one URL plus the size of the counter. If this working set is small enough, an in-memory hash table works fine—even on a laptop.
哪种方法更好?这取决于您有多少不同的URL。对于大多数小型到中型网站,您可以将所有独立的URL以及每个URL的计数器拟合在(比如说)1GB的内存中。在这个例子中,作业的工作集(作业需要随机访问的内存量)仅取决于独立URL的数量:如果有一百万条针对单个URL的日志条目,哈希表中所需的空间仍然只是一个URL加上计数器的大小。如果此工作集足够小,则内存中的哈希表可以正常工作,即使在笔记本电脑上也可以。
On the other hand, if the job’s working set is larger than the available memory, the sorting approach has the advantage that it can make efficient use of disks. It’s the same principle as we discussed in “SSTables and LSM-Trees” : chunks of data can be sorted in memory and written out to disk as segment files, and then multiple sorted segments can be merged into a larger sorted file. Mergesort has sequential access patterns that perform well on disks. (Remember that optimizing for sequential I/O was a recurring theme in Chapter 3 . The same pattern reappears here.)
如果工作集大于可用内存,排序方法具有以下优势:它可以高效地利用磁盘。这与我们在"SSTables and LSM-Trees"中讨论的原理相同:可以在内存中对数据块进行排序并将其写入磁盘作为片段文件,然后将多个排序段合并为一个较大的排序文件。归并排序具有良好的顺序访问模式,可在磁盘上执行良好的性能。(请记住,在第三章中,优化顺序I/O是一个反复出现的主题。这种模式在这里重新出现。)
The
sort
utility in GNU Coreutils (Linux) automatically handles larger-than-memory datasets by
spilling to disk, and automatically parallelizes sorting across multiple CPU cores
[
9
].
This means that the simple chain of Unix commands we saw earlier easily scales to large datasets, without
running out of memory. The bottleneck is likely to be the rate at which the input file can be read
from disk.
GNU Coreutils(Linux)中的排序实用程序通过向磁盘溢出并自动并行化多个 CPU 内核中的排序来自动处理大于内存数据集。这意味着我们之前看到的简单的 Unix 命令链可以轻松扩展到大型数据集,而不会用尽内存。 Eng: GNU Coreutils (Linux)中的排序实用程序通过向磁盘溢出并自动并行化多个CPU内核中的排序来自动处理大于内存数据集。这意味着我们之前看到的简单的Unix命令链可以轻松扩展到大型数据集,而不会用尽内存。瓶颈可能是从磁盘读取输入文件的速率。
The Unix Philosophy
It’s no coincidence that we were able to analyze a log file quite easily, using a chain of commands like in the previous example: this was in fact one of the key design ideas of Unix, and it remains astonishingly relevant today. Let’s look at it in some more depth so that we can borrow some ideas from Unix [ 10 ].
我们能够轻松地使用一系列命令,像之前的例子一样分析日志文件,这并非巧合:这实际上是Unix的关键设计思想之一,而且如今仍然非常相关。让我们深入了解一下,这样我们就可以从Unix中借鉴一些思想。
Doug McIlroy, the inventor of Unix pipes, first described them like this in 1964 [ 11 ]: “We should have some ways of connecting programs like [a] garden hose—screw in another segment when it becomes necessary to massage data in another way. This is the way of I/O also.” The plumbing analogy stuck, and the idea of connecting programs with pipes became part of what is now known as the Unix philosophy —a set of design principles that became popular among the developers and users of Unix. The philosophy was described in 1978 as follows [ 12 , 13 ]:
道格·麦克罗伊(Doug McIlroy)是Unix管道的发明者,他在1964年首次这样描述它们:“我们应该有一些连接程序的方法,就像[花园水管]一样--当需要以另一种方式处理数据时,拧紧另一段。这也是I/O的方式。”这个管道的类比很受欢迎,将程序用管道连接起来的想法成为了Unix哲学的一部分——一套设计原则,成为Unix开发者和用户中广受欢迎的哲学。这个哲学在1978年被描述为:
Make each program do one thing well. To do a new job, build afresh rather than complicate old programs by adding new “features”.
每个程序都要做好一件事情。要完成新的工作,就要重新开始,而不是通过添加新的“功能”来使旧程序变得复杂。
Expect the output of every program to become the input to another, as yet unknown, program. Don’t clutter output with extraneous information. Avoid stringently columnar or binary input formats. Don’t insist on interactive input.
期望每个程序的输出成为另一个尚未知晓的程序的输入。不要用杂乱无用的信息混淆输出。避免使用严格的列或二进制输入格式。不要坚持交互式输入。
Design and build software, even operating systems, to be tried early, ideally within weeks. Don’t hesitate to throw away the clumsy parts and rebuild them.
设计并构建软件,甚至操作系统,在几周内尝试。不要犹豫地丢弃笨拙的部分并重新构建。
Use tools in preference to unskilled help to lighten a programming task, even if you have to detour to build the tools and expect to throw some of them out after you’ve finished using them.
使用工具来轻松完成编程任务,而不是使用不熟练的帮助,即使你需要绕道建立工具并且在使用完后可能需要放弃一些。
This approach—automation, rapid prototyping, incremental iteration, being friendly to experimentation, and breaking down large projects into manageable chunks—sounds remarkably like the Agile and DevOps movements of today. Surprisingly little has changed in four decades.
这种方法——自动化、快速原型制作、迭代增量、友好实验和将大型项目分解成可管理的块——听起来非常像今天的敏捷和DevOps运动。四十年过去了,令人惊讶的是很少改变。
The
sort
tool is a great example of a program that does one thing well. It is arguably a better
sorting implementation than most programming languages have in their standard libraries (which do not
spill to disk and do not use multiple threads, even when that would be beneficial). And yet,
sort
is barely useful in isolation. It only becomes powerful in combination with the other Unix tools,
such as
uniq
.
排序工具是一个很好的例子,它能够很好地完成一件事情。它可能比大多数编程语言的标准库中的排序实现更好(即使在这种情况下使用多线程更有益),但在孤立的情况下它几乎没用。只有与其他Unix工具(如uniq)相结合,它才变得强大。
A Unix shell like
bash
lets us easily
compose
these small programs into surprisingly powerful
data processing jobs. Even though many of these programs are written by different groups of people,
they can be joined together in flexible ways. What does Unix do to enable this composability?
像bash这样的Unix shell让我们轻松地将这些小程序组合成令人惊讶的强大数据处理作业。尽管许多这些程序由不同的人编写,但它们可以以灵活的方式组合在一起。Unix是如何实现这种可组合性的呢?
A uniform interface
If you expect the output of one program to become the input to another program, that means those programs must use the same data format—in other words, a compatible interface. If you want to be able to connect any program’s output to any program’s input, that means that all programs must use the same input/output interface.
如果您希望一个程序的输出成为另一个程序的输入,那么这些程序必须使用相同的数据格式,换句话说,必须具有兼容的接口。如果您想能够将任何程序的输出连接到任何程序的输入,那么所有程序都必须使用相同的输入/ 输出接口。
In Unix, that interface is a file (or, more precisely, a file descriptor). A file is just an ordered
sequence of bytes. Because that is such a simple interface, many different things can be represented
using the same interface: an actual file on the filesystem, a communication channel to another
process (Unix socket,
stdin
,
stdout
), a device driver (say
/dev/audio
or
/dev/lp0
), a socket
representing a TCP connection, and so on. It’s easy to take this for granted, but it’s actually
quite remarkable that these very different things can share a uniform interface, so they can easily
be plugged together.
ii
在Unix中,该接口是一个文件(或更精确地说,是一个文件描述符)。文件只是一个有序的字节序列。因为这是一个如此简单的接口,许多不同的事物都可以使用相同的接口表示:文件系统上的实际文件,到另一个进程的通信通道(Unix套接字,stdin,stdout),设备驱动程序(例如/dev/audio或/dev/lp0),表示TCP连接的套接字等。很容易认为这是理所当然的,但实际上,这些非常不同的东西可以共享统一的接口,因此可以轻松地将它们组合在一起。
By convention, many (but not all) Unix programs treat this sequence of bytes as ASCII text. Our log
analysis example used this fact:
awk
,
sort
,
uniq
, and
head
all treat their input file
as a list of records separated by the
\n
(newline, ASCII
0x0A
) character. The choice of
\n
is
arbitrary—arguably, the ASCII record separator
0x1E
would have been a better choice, since it’s
intended for this purpose [
14
]—but in any case, the fact that
all these programs have standardized on using the same record separator allows them to interoperate.
按照惯例,许多(但不是所有)Unix程序将这个字节序列视为ASCII文本。我们的日志分析示例利用了这一事实:awk、sort、uniq和head都将它们的输入文件视为由\n(换行符,ASCII 0x0A)字符分隔的记录列表。选择\n是任意的 - 可以说,ASCII记录分隔符0x1E可能是更好的选择,因为它是为此目的而设计的[14] - 但无论如何,所有这些程序标准化使用相同的记录分隔符,使它们可以互操作。
The parsing of each record (i.e., a line of input) is more vague. Unix tools commonly split a line
into fields by whitespace or tab characters, but CSV (comma-separated), pipe-separated, and other
encodings are also used. Even a fairly simple tool like
xargs
has half a dozen command-line
options for specifying how its input should be parsed.
每条记录的解析(即输入的一行)更加模糊。Unix工具通常通过空格或制表符将一行分割为字段,但CSV(逗号分隔)、管道分隔和其他编码也被使用。即使像xargs这样相当简单的工具也有半打命令行选项来指定其输入应该如何解析。
The uniform interface of ASCII text mostly works, but it’s not exactly beautiful: our log analysis
example used
{print $7}
to extract the URL, which is not very readable. In an ideal world this
could have perhaps been
{print $request_url}
or something of that sort. We will return to this
idea later.
ASCII文本的统一接口大部分都能够正常运作,但它并不是非常美观:我们的日志分析示例使用{print $7}来提取URL,这并不是非常易读。在理想的情况下,可能会使用{print $request_url}或类似的方式。稍后我们会回到这个想法。
Although it’s not perfect, even decades later, the uniform interface of Unix is still something remarkable. Not many pieces of software interoperate and compose as well as Unix tools do: you can’t easily pipe the contents of your email account and your online shopping history through a custom analysis tool into a spreadsheet and post the results to a social network or a wiki. Today it’s an exception, not the norm, to have programs that work together as smoothly as Unix tools do.
尽管不完美,Unix 的统一界面至今仍然值得称道。没有多少软件像Unix工具一样互操作和组合得如此出色:您无法轻松地通过自定义分析工具将电子邮件账户和在线购物历史的内容输入到电子表格中,并将结果发布到社交网络或维基上。如今,拥有像Unix工具那样顺畅地协同工作的程序已成为例外而非常规。
Even databases with the same data model often don’t make it easy to get data out of one and into the other. This lack of integration leads to Balkanization of data.
即使数据模型相同的数据库,通常也不容易将数据从一个数据库传输到另一个数据库。这种缺乏集成会导致数据的巴尔干化。
Separation of logic and wiring
Another characteristic feature of Unix tools is their use of standard input (
stdin
) and standard
output (
stdout
). If you run a program and don’t specify anything else,
stdin
comes from the
keyboard and
stdout
goes to the screen. However, you can also take input from a file and/or
redirect output to a file. Pipes let you attach the
stdout
of one process to the
stdin
of
another process (with a small in-memory buffer, and without writing the entire intermediate data
stream to disk).
Unix工具的另一个特点是它们使用标准输入(stdin)和标准输出(stdout)。如果您运行一个程序而没有指定任何其他内容,stdin就来自键盘,stdout就会显示在屏幕上。但是,您也可以从文件中获取输入,并将输出重定向到文件。使用管道(pipe)可以将一个进程的stdout连接到另一个进程的stdin(通过一个小的内存缓冲区,而无需将整个中间数据流写入磁盘)。
A program can still read and write files directly if it needs to, but the Unix approach works best
if a program doesn’t worry about particular file paths and simply uses
stdin
and
stdout
. This
allows a shell user to wire up the input and output in whatever way they want; the program doesn’t
know or care where the input is coming from and where the output is going to. (One could say this is
a form of
loose coupling
,
late binding
[
15
], or
inversion of control
[
16
].) Separating the input/output wiring from the
program logic makes it easier to compose small tools into bigger systems.
如果需要的话,程序仍然可以直接读写文件,但是Unix的方法最好是如果程序不用担心特定的文件路径,而只是使用stdin和stdout。这使得shell用户可以以任何他们想要的方式连接输入和输出;程序不知道也不关心输入来自哪里,输出去哪里。 (可以说这是一种松散耦合,晚绑定 [15]或控制反转[16]的形式。)将输入/输出布线与程序逻辑分离使得更容易将小工具组合成更大的系统。
You can even write your own programs and combine them with the tools provided by the operating
system. Your program just needs to read input from
stdin
and write output to
stdout
, and it can
participate in data processing pipelines. In the log analysis example, you could write a tool that
translates user-agent strings into more sensible browser identifiers, or a tool that translates IP
addresses into country codes, and simply plug it into the pipeline. The
sort
program doesn’t care
whether it’s communicating with another part of the operating system or with a program written by
you.
你甚至可以编写自己的程序并将它们与操作系统提供的工具结合使用。你的程序只需要从stdin读取输入并将输出写入stdout,就可以参与数据处理管道。在日志分析示例中,你可以编写一个工具,将用户代理字符串转换为更合理的浏览器标识符,或者将IP地址转换为国家代码,并将其简单地插入到管道中。排序程序无论是与操作系统的另一个部分还是与你编写的程序通信,都不会在意。
However, there are limits to what you can do with
stdin
and
stdout
. Programs that need multiple
inputs or outputs are possible but tricky. You can’t pipe a program’s output into a network
connection [
17
,
18
].
iii
If a program directly opens files for reading and writing, or starts another program as a
subprocess, or opens a network connection, then that I/O is wired up by the program itself. It can
still be configurable (through command-line options, for example), but the flexibility of wiring up
inputs and outputs in a shell is reduced.
然而,使用 stdin 和 stdout 存在一定的限制。需要多个输入或输出的程序也可以编写,但比较棘手。不能将程序的输出导入网络连接[17,18]. 如果程序直接打开文件进行读写,或启动另一个程序作为子进程,或打开网络连接,则程序自身将进行IO布线。它仍然可以通过命令行选项进行配置,但在 shell 中布线输入和输出的灵活性会降低。
Transparency and experimentation
Part of what makes Unix tools so successful is that they make it quite easy to see what is going on:
Unix工具之所以如此成功的一部分原因是它们很容易让人看到正在发生的事情。
-
The input files to Unix commands are normally treated as immutable. This means you can run the commands as often as you want, trying various command-line options, without damaging the input files.
Unix命令的输入文件通常被视为不可变的。这意味着您可以多次运行命令,尝试不同的命令行选项,而不会破坏输入文件。
-
You can end the pipeline at any point, pipe the output into
less
, and look at it to see if it has the expected form. This ability to inspect is great for debugging.您可以在任何时候结束管道,将输出导入到 less 中并查看它,以查看其是否具有预期的形式。这种检查的能力对于调试非常有用。
-
You can write the output of one pipeline stage to a file and use that file as input to the next stage. This allows you to restart the later stage without rerunning the entire pipeline.
你可以将一个流水线阶段的输出写入文件,并将该文件作为下一个阶段的输入。这使得你可以在不重新运行整个管道的情况下重新启动后面的阶段。
Thus, even though Unix tools are quite blunt, simple tools compared to a query optimizer of a relational database, they remain amazingly useful, especially for experimentation.
因此,尽管与关系数据库的查询优化器相比,Unix工具非常简单,但它们仍然非常有用,特别是对于实验。
However, the biggest limitation of Unix tools is that they run only on a single machine—and that’s where tools like Hadoop come in.
然而,Unix工具的最大限制在于它们只能运行在单个机器上,这就是Hadoop等工具出现的原因。
MapReduce and Distributed Filesystems
MapReduce is a bit like Unix tools, but distributed across potentially thousands of machines. Like Unix tools, it is a fairly blunt, brute-force, but surprisingly effective tool. A single MapReduce job is comparable to a single Unix process: it takes one or more inputs and produces one or more outputs.
MapReduce就像Unix工具一样,但是分布在可能数千台机器上。像Unix工具一样,它是一个相当短暂,粗暴的,但出奇地有效的工具。一个MapReduce作业相当于一个Unix进程:它需要一个或多个输入,并产生一个或多个输出。
As with most Unix tools, running a MapReduce job normally does not modify the input and does not have any side effects other than producing the output. The output files are written once, in a sequential fashion (not modifying any existing part of a file once it has been written).
与大多数Unix工具一样,运行MapReduce作业通常不修改输入,并且除了生成输出之外没有任何副作用。输出文件按顺序一次性写入(一旦写入过某个文件的任何部分,就不会修改该部分)。
While Unix tools use
stdin
and
stdout
as input and output, MapReduce jobs read and write files
on a distributed filesystem. In Hadoop’s implementation of MapReduce, that filesystem is called HDFS
(Hadoop Distributed File System), an open source reimplementation of the Google File System (GFS)
[
19
].
而Unix工具使用标准输入和输出作为输入和输出,MapReduce作业则读取和写入分布式文件系统上的文件。在Hadoop的MapReduce实现中,该文件系统称为HDFS(Hadoop分布式文件系统),是Google文件系统(GFS)的开源重新实现[19]。
Various other distributed filesystems besides HDFS exist, such as GlusterFS and the Quantcast File System (QFS) [ 20 ]. Object storage services such as Amazon S3, Azure Blob Storage, and OpenStack Swift [ 21 ] are similar in many ways. iv In this chapter we will mostly use HDFS as a running example, but the principles apply to any distributed filesystem.
除HDFS以外,还存在其他分布式文件系统,如GlusterFS和Quantcast文件系统(QFS)[20]。对象存储服务,如Amazon S3,Azure Blob存储和OpenStack Swift [21],在许多方面类似。在本章中,我们将主要使用HDFS作为运行示例,但这些原则适用于任何分布式文件系统。
HDFS is based on the shared-nothing principle (see the introduction to Part II ), in contrast to the shared-disk approach of Network Attached Storage (NAS) and Storage Area Network (SAN) architectures. Shared-disk storage is implemented by a centralized storage appliance, often using custom hardware and special network infrastructure such as Fibre Channel. On the other hand, the shared-nothing approach requires no special hardware, only computers connected by a conventional datacenter network.
HDFS基于共享无关原则(参见第II部分的介绍),与网络附加存储(NAS)和存储区域网络(SAN)架构的共享磁盘方法相反。共享磁盘存储由集中式存储设备实现,通常使用定制硬件和特殊网络基础设施,如光纤通道。另一方面,共享无关方法不需要特殊硬件,只需要通过传统的数据中心网络连接计算机。
HDFS consists of a daemon process running on each machine, exposing a network service that allows other nodes to access files stored on that machine (assuming that every general-purpose machine in a datacenter has some disks attached to it). A central server called the NameNode keeps track of which file blocks are stored on which machine. Thus, HDFS conceptually creates one big filesystem that can use the space on the disks of all machines running the daemon.
HDFS由在每台计算机上运行的守护程序进程组成,它公开了一项网络服务,允许其他节点访问存储在该计算机上的文件(假设数据中心中的每台通用计算机都连接了一些磁盘)。一个名为NameNode的中央服务器跟踪存储在哪台计算机上的文件块。因此,HDFS在概念上创建了一个大型的文件系统,可以使用运行守护程序的所有计算机上的磁盘空间。
In order to tolerate machine and disk failures, file blocks are replicated on multiple machines. Replication may mean simply several copies of the same data on multiple machines, as in Chapter 5 , or an erasure coding scheme such as Reed–Solomon codes, which allows lost data to be recovered with lower storage overhead than full replication [ 20 , 22 ]. The techniques are similar to RAID, which provides redundancy across several disks attached to the same machine; the difference is that in a distributed filesystem, file access and replication are done over a conventional datacenter network without special hardware.
为了容忍机器和磁盘故障,文件块会在多台机器上复制。复制可能是指将同一数据简单地在多台机器上复制几份,就像第5章中那样,也可能是纠删码方案,如Reed-Solomon码,这使得可以用比全复制更低的存储开销来恢复遗失的数据[20,22]。 这些技术类似于RAID,它提供对连接到同一台机器上的多个磁盘的冗余;不同之处在于,在分布式文件系统中,文件访问和复制是通过普通的数据中心网络完成的,没有特殊的硬件。
HDFS has scaled well: at the time of writing, the biggest HDFS deployments run on tens of thousands of machines, with combined storage capacity of hundreds of petabytes [ 23 ]. Such large scale has become viable because the cost of data storage and access on HDFS, using commodity hardware and open source software, is much lower than that of the equivalent capacity on a dedicated storage appliance [ 24 ].
HDFS已经成功地扩展了:在撰写本文的时候,最大的HDFS部署运行在数万台机器上,存储能力达到数百PB[23]。如此大规模的应用之所以可行,是因为使用普通硬件和开源软件在HDFS上进行数据存储和访问的成本远远低于专用存储设备[24]。
MapReduce Job Execution
MapReduce is a programming framework with which you can write code to process large datasets in a distributed filesystem like HDFS. The easiest way of understanding it is by referring back to the web server log analysis example in “Simple Log Analysis” . The pattern of data processing in MapReduce is very similar to this example:
MapReduce是一个编程框架,您可以使用它编写代码来处理像HDFS这样的分布式文件系统中的大型数据集。最简单的理解方法是参考“简单日志分析”中的Web服务器日志分析示例。MapReduce中的数据处理模式非常类似于此示例。
-
Read a set of input files, and break it up into records . In the web server log example, each record is one line in the log (that is,
\n
is the record separator).读取一组输入文件,并将其分解为记录。在Web服务器日志示例中,每个记录都是日志中的一行(即,\n是记录分隔符)。
-
Call the mapper function to extract a key and value from each input record. In the preceding example, the mapper function is
awk '{print $7}'
: it extracts the URL ($7
) as the key, and leaves the value empty.请调用映射函数从每个输入记录中提取键和值。在上面的例子中,mapper函数是awk '{print $7}':它将URL ($7)提取为键,值保留为空。
-
Sort all of the key-value pairs by key. In the log example, this is done by the first
sort
command.将所有键-值对按键排序。在示例日志中,这是通过第一个排序命令完成的。
-
Call the reducer function to iterate over the sorted key-value pairs. If there are multiple occurrences of the same key, the sorting has made them adjacent in the list, so it is easy to combine those values without having to keep a lot of state in memory. In the preceding example, the reducer is implemented by the command
uniq -c
, which counts the number of adjacent records with the same key.调用 reducer 函数来迭代排序后的键值对列表。如果有多个相同的键,则排序将使它们相邻,因此很容易将这些值组合起来,无需在内存中保存大量状态。在前面的示例中,reducer 由 uniq -c 命令实现,该命令计算具有相同键的相邻记录数。
Those four steps can be performed by one MapReduce job. Steps 2 (map) and 4 (reduce) are where you
write your custom data processing code. Step 1 (breaking files into records) is handled by the input
format parser. Step 3, the
sort
step, is implicit in MapReduce—you don’t have to write it, because the
output from the mapper is always sorted before it is given to the reducer.
这四个步骤可以通过一个MapReduce作业完成。第二步(映射)和第四步(减少)是您编写自定义数据处理代码的地方。第一步(将文件拆分为记录)由输入格式解析器处理。第三步,排序步骤,在MapReduce中是隐含的 - 您无需编写它,因为从mapper的输出始终在提交给reducer之前进行排序。
To create a MapReduce job, you need to implement two callback functions, the mapper and reducer, which behave as follows (see also “MapReduce Querying” ):
创建一个MapReduce作业,需要实现两个回调函数,即mapper和reducer函数,它们的行为如下(也可参见“MapReduce查询”):
- Mapper
-
The mapper is called once for every input record, and its job is to extract the key and value from the input record. For each input, it may generate any number of key-value pairs (including none). It does not keep any state from one input record to the next, so each record is handled independently.
映射器针对每个输入记录调用一次,其工作是从输入记录中提取键和值。对于每个输入,它可以生成任意数量的键值对(包括零)。它不会保留从一个输入记录到下一个的任何状态,因此每个记录都是独立处理的。
- Reducer
-
The MapReduce framework takes the key-value pairs produced by the mappers, collects all the values belonging to the same key, and calls the reducer with an iterator over that collection of values. The reducer can produce output records (such as the number of occurrences of the same URL).
MapReduce框架通过收集键值对,将由映射器产生的值根据键的归属进行聚合,并使用该值集合上的迭代器调用缩小器。缩小器可以产生输出记录 (例如相同URL的出现次数)。
In the web server log example, we had a second
sort
command in step 5, which ranked URLs by number
of requests. In MapReduce, if you need a second sorting stage, you can implement it by writing a
second MapReduce job and using the output of the first job as input to the second job. Viewed like
this, the role of the mapper is to prepare the data by putting it into a form that is suitable for
sorting, and the role of the reducer is to process the data that has been sorted.
在Web服务器日志示例中,我们在第五步中使用了第二个排序命令,这将URL按请求次数进行排名。在MapReduce中,如果需要第二个排序阶段,可以通过编写第二个MapReduce作业,将第一个作业的输出作为第二个作业的输入,来实现它。从这个角度看,mapper的作用是通过将数据放入适合排序的形式来准备数据,而reducer的作用是处理已经排序的数据。
Distributed execution of MapReduce
The main difference from pipelines of Unix commands is that MapReduce can parallelize a computation across many machines, without you having to write code to explicitly handle the parallelism. The mapper and reducer only operate on one record at a time; they don’t need to know where their input is coming from or their output is going to, so the framework can handle the complexities of moving data between machines.
MapReduce与Unix命令管道的主要区别在于,MapReduce可以在许多计算机上并行化计算,而无需编写代码来显式处理并行性。映射器和规约器一次只处理一条记录,它们不需要知道其输入来自何处或其输出要去何处,因此框架可以处理在计算机之间移动数据的复杂性。
It is possible to use standard Unix tools as mappers and reducers in a distributed computation [ 25 ], but more commonly they are implemented as functions in a conventional programming language. In Hadoop MapReduce, the mapper and reducer are each a Java class that implements a particular interface. In MongoDB and CouchDB, mappers and reducers are JavaScript functions (see “MapReduce Querying” ).
可以使用标准的Unix工具作为分布式计算中的映射器和减少器[25],但更常见的是将它们实现为常规编程语言中的函数。在Hadoop MapReduce中,映射器和减少器是实现特定接口的Java类。在MongoDB和CouchDB中,映射器和减少器是JavaScript函数(参见“MapReduce查询”)。
Figure 10-1 shows the dataflow in a Hadoop MapReduce job. Its parallelization is based on partitioning (see Chapter 6 ): the input to a job is typically a directory in HDFS, and each file or file block within the input directory is considered to be a separate partition that can be processed by a separate map task (marked by m 1 , m 2 , and m 3 in Figure 10-1 ).
图 10-1 显示了 Hadoop MapReduce 作业的数据流。它的并行化基于分区(请参见第 6 章):作业的输入通常是 HDFS 中的一个目录,输入目录中的每个文件或文件块被视为可由单独的映射任务(在图 10-1 中标记为 m 1、m 2 和 m 3)处理的单独分区。
Each input file is typically hundreds of megabytes in size. The MapReduce scheduler (not shown in the diagram) tries to run each mapper on one of the machines that stores a replica of the input file, provided that machine has enough spare RAM and CPU resources to run the map task [ 26 ]. This principle is known as putting the computation near the data [ 27 ]: it saves copying the input file over the network, reducing network load and increasing locality.
每个输入文件通常都有数百兆字节的大小。MapReduce调度程序(在图表中未显示)试图在一台存储输入文件副本的机器上运行每个映射器,前提是该机器具有足够的空闲RAM和CPU资源来运行映射任务[26]。 这个原则被称为将计算放在数据附近[27]:它可以避免在网络上传输输入文件,减少网络负荷,提高局部性。
In most cases, the application code that should run in the map task is not yet present on the machine that is assigned the task of running it, so the MapReduce framework first copies the code (e.g., JAR files in the case of a Java program) to the appropriate machines. It then starts the map task and begins reading the input file, passing one record at a time to the mapper callback. The output of the mapper consists of key-value pairs.
在大多数情况下,应该在映射任务中运行的应用程序代码尚未存在于被分配运行任务的机器上,因此MapReduce框架首先将代码(例如,在Java程序的情况下为JAR文件)复制到相应的机器上。然后启动映射任务,并开始读取输入文件,逐个将记录传递给映射器回调函数。映射器的输出由键值对组成。
The reduce side of the computation is also partitioned. While the number of map tasks is determined by the number of input file blocks, the number of reduce tasks is configured by the job author (it can be different from the number of map tasks). To ensure that all key-value pairs with the same key end up at the same reducer, the framework uses a hash of the key to determine which reduce task should receive a particular key-value pair (see “Partitioning by Hash of Key” ).
计算的归约阶段也被分区了。虽然地图任务的数量由输入文件块的数量确定,但归约任务的数量由作业作者配置(它可以与地图任务的数量不同)。为了确保具有相同键的所有键值对都最终在同一个归约器中,框架使用键的哈希值来确定哪个归约任务应接收特定的键值对(请参阅“按关键字哈希分区”)。 Note: This translation may not be accurate and is provided as a rough guide only.
The key-value pairs must be sorted, but the dataset is likely too large to be sorted with a conventional sorting algorithm on a single machine. Instead, the sorting is performed in stages. First, each map task partitions its output by reducer, based on the hash of the key. Each of these partitions is written to a sorted file on the mapper’s local disk, using a technique similar to what we discussed in “SSTables and LSM-Trees” .
键值对必须排序,但数据集可能太大,无法使用单个机器上的传统排序算法进行排序。相反,排序是分阶段进行的。首先,每个映射任务根据键的哈希将其输出分区到缩小器。每个这些分区都写入映射器本地磁盘上的排序文件,使用类似于我们在“ SSTables和LSM-Trees”中讨论的技术。
Whenever a mapper finishes reading its input file and writing its sorted output files, the MapReduce scheduler notifies the reducers that they can start fetching the output files from that mapper. The reducers connect to each of the mappers and download the files of sorted key-value pairs for their partition. The process of partitioning by reducer, sorting, and copying data partitions from mappers to reducers is known as the shuffle [ 26 ] (a confusing term—unlike shuffling a deck of cards, there is no randomness in MapReduce).
每当一个映射器完成读取其输入文件并写入其排序的输出文件时,MapReduce调度程序会通知减少器,它们可以开始从该映射器获取输出文件。 减数器连接到每个映射器,并下载其分区的已排序键值对文件。由减小程序分区,排序和复制数据分区从映射到减程序的过程称为shuffle [26](一个令人困惑的术语-与随机化纸牌不同,MapReduce中没有随机性)。
The reduce task takes the files from the mappers and merges them together, preserving the sort order. Thus, if different mappers produced records with the same key, they will be adjacent in the merged reducer input.
Reducer任务将来自映射器的文件合并在一起,保留了排序顺序。因此,如果不同的映射器生成具有相同键的记录,它们将相邻地出现在合并后的Reducer输入中。
The reducer is called with a key and an iterator that incrementally scans over all records with the same key (which may in some cases not all fit in memory). The reducer can use arbitrary logic to process these records, and can generate any number of output records. These output records are written to a file on the distributed filesystem (usually, one copy on the local disk of the machine running the reducer, with replicas on other machines).
reducer被调用时,会传入一个键和一个迭代器,该迭代器会逐步扫描所有具有相同键的记录(有些情况下可能无法全部放入内存)。reducer可以使用任意逻辑来处理这些记录,并且可以生成任意数量的输出记录。这些输出记录将被写入分布式文件系统上的文件中(通常情况下,一份副本存储在运行reducer的机器的本地磁盘上,其他机器上也会有其副本)。
MapReduce workflows
The range of problems you can solve with a single MapReduce job is limited. Referring back to the log analysis example, a single MapReduce job could determine the number of page views per URL, but not the most popular URLs, since that requires a second round of sorting.
使用单个MapReduce作业可以解决的问题范围有限。回到日志分析的例子,单个MapReduce作业可以确定每个URL的页面浏览次数,但无法确定最受欢迎的URL,因为这需要进行第二轮排序。
Thus, it is very common for MapReduce jobs to be chained together into workflows , such that the output of one job becomes the input to the next job. The Hadoop MapReduce framework does not have any particular support for workflows, so this chaining is done implicitly by directory name: the first job must be configured to write its output to a designated directory in HDFS, and the second job must be configured to read that same directory name as its input. From the MapReduce framework’s point of view, they are two independent jobs.
因此,将MapReduce作业链接成工作流非常常见,以使一个作业的输出成为下一个作业的输入。 Hadoop MapReduce框架没有对工作流程提供任何特殊支持,因此通过目录名称隐式连接这些操作:第一个作业必须配置为将其输出写入HDFS中的指定目录,第二个作业必须配置为读取该目录名称作为其输入。从MapReduce框架的角度来看,它们是两个独立的作业。
Chained MapReduce jobs are therefore less like pipelines of Unix commands (which pass the output of one process as input to another process directly, using only a small in-memory buffer) and more like a sequence of commands where each command’s output is written to a temporary file, and the next command reads from the temporary file. This design has advantages and disadvantages, which we will discuss in “Materialization of Intermediate State” .
串联的MapReduce作业因此不像Unix命令的管道(它直接将一个进程的输出作为另一个进程的输入传递,只使用一个小的内存缓冲区),而更像一个命令序列,每个命令的输出都被写入临时文件,下一个命令从临时文件中读取。这种设计有优点和缺点,我们将在“中间状态的实现”中讨论。
A batch job’s output is only considered valid when the job has completed successfully (MapReduce discards the partial output of a failed job). Therefore, one job in a workflow can only start when the prior jobs—that is, the jobs that produce its input directories—have completed successfully. To handle these dependencies between job executions, various workflow schedulers for Hadoop have been developed, including Oozie, Azkaban, Luigi, Airflow, and Pinball [ 28 ].
只有批处理作业成功完成(MapReduce会丢弃失败作业的部分输出),作业的输出才被视为有效。因此,在工作流程中,一个作业只有在其前置作业(即生成其输入目录的作业)成功完成后才能开始。为了处理作业执行之间的这些依赖关系,开发了各种Hadoop工作流调度程序,包括Oozie、Azkaban、Luigi、Airflow和Pinball[28]。
These schedulers also have management features that are useful when maintaining a large collection of batch jobs. Workflows consisting of 50 to 100 MapReduce jobs are common when building recommendation systems [ 29 ], and in a large organization, many different teams may be running different jobs that read each other’s output. Tool support is important for managing such complex dataflows.
这些调度程序还具有管理特性,可在维护大量批处理作业时非常有用。在构建推荐系统时,包含50到100个MapReduce作业的工作流程很常见[29],在大型组织中,许多不同的团队可能在运行读取彼此输出的不同作业。工具支持对于管理如此复杂的数据流至关重要。
Various higher-level tools for Hadoop, such as Pig [ 30 ], Hive [ 31 ], Cascading [ 32 ], Crunch [ 33 ], and FlumeJava [ 34 ], also set up workflows of multiple MapReduce stages that are automatically wired together appropriately.
Hadoop的各种高级工具,如Pig(30)、Hive(31)、Cascading(32)、Crunch(33)和FlumeJava(34),还可以设置多个MapReduce阶段的工作流,自动适当地连接在一起。
Reduce-Side Joins and Grouping
We discussed joins in Chapter 2 in the context of data models and query languages, but we have not delved into how joins are actually implemented. It is time that we pick up that thread again.
我们在第2章中讨论了连接,涉及数据模型和查询语言,但我们还没有深入探讨连接是如何实现的。现在是我们重新开始探讨这个话题的时候了。
In many datasets it is common for one record to have an association with another record: a foreign key in a relational model, a document reference in a document model, or an edge in a graph model. A join is necessary whenever you have some code that needs to access records on both sides of that association (both the record that holds the reference and the record being referenced). As discussed in Chapter 2 , denormalization can reduce the need for joins but generally not remove it entirely. v
在许多数据集中,一个记录与另一个记录有关联:在关系模型中是外键,在文档模型中是文档引用,或者在图模型中是边缘。每当您有一些需要访问该关联的两边记录(既包含引用的记录,又包含被引用的记录)的代码时,就需要进行连接。正如第2章所讨论的那样,去规范化可以减少连接的需求,但通常不能完全消除它。
In a database, if you execute a query that involves only a small number of records, the database will typically use an index to quickly locate the records of interest (see Chapter 3 ). If the query involves joins, it may require multiple index lookups. However, MapReduce has no concept of indexes—at least not in the usual sense.
在数据库中,如果您执行仅涉及少量记录的查询,数据库通常会使用索引快速定位感兴趣的记录(请参见第3章)。如果查询涉及连接,则可能需要进行多个索引查找。然而,MapReduce没有索引的概念-至少不是通常意义上的。
When a MapReduce job is given a set of files as input, it reads the entire content of all of those files; a database would call this operation a full table scan . If you only want to read a small number of records, a full table scan is outrageously expensive compared to an index lookup. However, in analytic queries (see “Transaction Processing or Analytics?” ) it is common to want to calculate aggregates over a large number of records. In this case, scanning the entire input might be quite a reasonable thing to do, especially if you can parallelize the processing across multiple machines.
当MapReduce作业接收一组文件作为输入时,它会读取所有文件的整个内容;数据库称此操作为完全扫描。如果您只想读取少量记录,与索引查找相比,完全扫描的费用非常昂贵。但是,在分析查询中(请参见“事务处理还是分析?”),通常希望计算大量记录的聚合值。在这种情况下,扫描整个输入可能是相当合理的,特别是如果您可以在多台机器上并行处理。
When we talk about joins in the context of batch processing, we mean resolving all occurrences of some association within a dataset. For example, we assume that a job is processing the data for all users simultaneously, not merely looking up the data for one particular user (which would be done far more efficiently with an index).
在批量处理的上下文中,当我们谈论连接时,我们指的是解析数据集中某个关联的所有出现。例如,我们假设作业正在同时处理所有用户的数据,而不仅仅是查找某个特定用户的数据(如果使用索引将更高效)。
Example: analysis of user activity events
A typical example of a join in a batch job is illustrated in Figure 10-2 . On the left is a log of events describing the things that logged-in users did on a website (known as activity events or clickstream data ), and on the right is a database of users. You can think of this example as being part of a star schema (see “Stars and Snowflakes: Schemas for Analytics” ): the log of events is the fact table, and the user database is one of the dimensions.
批处理作业中加入的一个典型示例如图10-2所示。左边是描述登陆用户在网站上所做的事情的事件日志(称为活动事件或点击流数据),右边是用户数据库。您可以认为这个例子是星型架构的一部分(参见“星型和雪花形式:用于分析的模式”):事件日志是事实表,用户数据库是其中一个维度。
An analytics task may need to correlate user activity with user profile information: for example, if the profile contains the user’s age or date of birth, the system could determine which pages are most popular with which age groups. However, the activity events contain only the user ID, not the full user profile information. Embedding that profile information in every single activity event would most likely be too wasteful. Therefore, the activity events need to be joined with the user profile database.
一个分析任务可能需要将用户活动与用户个人资料信息相关联:例如,如果个人资料包含用户的年龄或出生日期,则系统可以确定哪些页面最受哪个年龄组的欢迎。然而,活动事件仅包含用户ID,而不包含完整的用户个人资料信息。将该资料信息嵌入每一个活动事件中很可能会太浪费资源。因此,需要将活动事件与用户个人资料数据库连接。
The simplest implementation of this join would go over the activity events one by one and query the user database (on a remote server) for every user ID it encounters. This is possible, but it would most likely suffer from very poor performance: the processing throughput would be limited by the round-trip time to the database server, the effectiveness of a local cache would depend very much on the distribution of data, and running a large number of queries in parallel could easily overwhelm the database [ 35 ].
这种连接的最简单实现方式是逐个遍历活动事件,并为遇到的每个用户ID查询远程服务器上的用户数据库。虽然这是可能的,但很可能会受到非常差的性能影响:处理吞吐量将受到到数据库服务器的往返时间的限制,本地缓存的有效性将在很大程度上取决于数据的分布,而同时运行大量查询可能很容易压垮数据库。
In order to achieve good throughput in a batch process, the computation must be (as much as possible) local to one machine. Making random-access requests over the network for every record you want to process is too slow. Moreover, querying a remote database would mean that the batch job becomes nondeterministic, because the data in the remote database might change.
为了在批处理过程中获得良好的吞吐量,计算必须尽可能地局限在一台机器上。对于每个要处理的记录进行随机访问网络请求太慢。此外,查询远程数据库将意味着批作业变得非确定性,因为远程数据库中的数据可能会更改。
Thus, a better approach would be to take a copy of the user database (for example, extracted from a database backup using an ETL process—see “Data Warehousing” ) and to put it in the same distributed filesystem as the log of user activity events. You would then have the user database in one set of files in HDFS and the user activity records in another set of files, and could use MapReduce to bring together all of the relevant records in the same place and process them efficiently.
因此,更好的方法是将用户数据库的副本(例如,使用ETL流程从数据库备份中提取 - 见“数据仓库”)放在与用户活动事件日志相同的分布式文件系统中。你可以将用户数据库存在HDFS中的一个文件集合中,将用户活动记录存在另一个文件集合中,并使用MapReduce将所有相关记录汇集在同一位置并有效地处理它们。
Sort-merge joins
Recall that the purpose of the mapper is to extract a key and value from each input record. In the case of Figure 10-2 , this key would be the user ID: one set of mappers would go over the activity events (extracting the user ID as the key and the activity event as the value), while another set of mappers would go over the user database (extracting the user ID as the key and the user’s date of birth as the value). This process is illustrated in Figure 10-3 .
重申一下,映射器的目的是从每个输入记录中提取一个键和一个值。在图10-2中,这个键将是用户ID:一组映射器将处理活动事件(提取用户ID作为键,活动事件作为值),而另一组映射器将处理用户数据库(提取用户ID作为键,用户的出生日期作为值)。这个过程在图10-3中说明。
When the MapReduce framework partitions the mapper output by key and then sorts the key-value pairs, the effect is that all the activity events and the user record with the same user ID become adjacent to each other in the reducer input. The MapReduce job can even arrange the records to be sorted such that the reducer always sees the record from the user database first, followed by the activity events in timestamp order—this technique is known as a secondary sort [ 26 ].
当MapReduce框架按键值对分割映射器输出并排序时,具有相同用户ID的所有活动事件和用户记录变得相邻,并进入缩小器输入。 MapReduce作业甚至可以安排记录排序,使得缩小器始终优先查看来自用户数据库的记录,然后按时间戳顺序显示活动事件 - 这种技术称为二次排序[26]。
The reducer can then perform the actual join logic easily: the reducer function is called once for every user ID, and thanks to the secondary sort, the first value is expected to be the date-of-birth record from the user database. The reducer stores the date of birth in a local variable and then iterates over the activity events with the same user ID, outputting pairs of viewed-url and viewer-age-in-years . Subsequent MapReduce jobs could then calculate the distribution of viewer ages for each URL, and cluster by age group.
Reducer能够轻松执行实际的连接逻辑:Reducer函数针对每个用户ID只被调用一次,得益于次要排序,期望第一个值是来自用户数据库的出生日期记录。Reducer在本地变量中存储出生日期,然后迭代相同用户ID的活动事件,输出查看的URL和观看者年龄。随后的MapReduce作业可以计算每个URL的观众年龄分布,并按年龄分组。
Since the reducer processes all of the records for a particular user ID in one go, it only needs to keep one user record in memory at any one time, and it never needs to make any requests over the network. This algorithm is known as a sort-merge join , since mapper output is sorted by key, and the reducers then merge together the sorted lists of records from both sides of the join.
由于Reducer一次处理特定用户ID的所有记录,所以它每次只需要在内存中保持一个用户记录,而且不需要通过网络进行任何请求。这个算法被称为排序合并连接,因为mapper输出按键排序,然后reducer将连接的两侧的排序记录列表合并在一起。
Bringing related data together in the same place
In a sort-merge join, the mappers and the sorting process make sure that all the necessary data to perform the join operation for a particular user ID is brought together in the same place: a single call to the reducer. Having lined up all the required data in advance, the reducer can be a fairly simple, single-threaded piece of code that can churn through records with high throughput and low memory overhead.
在排序合并连接中,映射器和排序过程确保所有执行特定用户ID的连接操作所需的数据汇集到同一个地方:单个对减速器的调用。事先整理好所有所需的数据后,减速器可以是一个相当简单、单线程的代码片段,可以高吞吐量、低内存开销地处理记录。
One way of looking at this architecture is that mappers “send messages” to the reducers. When a mapper emits a key-value pair, the key acts like the destination address to which the value should be delivered. Even though the key is just an arbitrary string (not an actual network address like an IP address and port number), it behaves like an address: all key-value pairs with the same key will be delivered to the same destination (a call to the reducer).
一个看待这种架构的方式是,映射器向减速器“发送消息”。当映射器发出一个键值对时,键就像目的地址,值应该被传送到那里。尽管键只是任意字符串(不是实际的网络地址,如IP地址和端口号),它表现得像一个地址:所有具有相同键的键值对都将被传递到同一个目的地(减速器的调用)。
Using the MapReduce programming model has separated the physical network communication aspects of the computation (getting the data to the right machine) from the application logic (processing the data once you have it). This separation contrasts with the typical use of databases, where a request to fetch data from a database often occurs somewhere deep inside a piece of application code [ 36 ]. Since MapReduce handles all network communication, it also shields the application code from having to worry about partial failures, such as the crash of another node: MapReduce transparently retries failed tasks without affecting the application logic.
使用MapReduce编程模型将计算的物理网络通信方面(将数据传送到正确的计算机)与应用逻辑(一旦获得数据则处理数据)分开。这种分离与典型的数据库使用不同,数据库中从数据库提取数据的请求通常发生在应用程序代码的深处[36]。由于MapReduce处理所有网络通信,因此它还保护应用程序代码免受部分故障的影响,例如其他节点的崩溃:MapReduce在不影响应用程序逻辑的情况下自动重试失败的任务。
GROUP BY
Besides joins, another common use of the “bringing related data to the same place” pattern is
grouping records by some key (as in the
GROUP BY
clause in SQL). All records with the same key
form a group, and the next step is often to perform some kind of aggregation within each group—for
example:
除了连接之外,“将相关数据放置在同一位置”的模式的另一个常见用途是通过某个键将记录分组(如SQL中的GROUP BY子句)。具有相同键的所有记录形成一个组,下一步通常是对每个组执行某种聚合 - 例如:
-
Counting the number of records in each group (like in our example of counting page views, which you would express as a
COUNT(*)
aggregation in SQL)计算每个组中的记录数(例如在统计页面浏览次数时,您可以将其表示为 SQL 中的 COUNT(*) 聚合函数)。
-
Adding up the values in one particular field (
SUM(fieldname)
) in SQL在 SQL 中对特定字段进行值求和(SUM(字段名))。
-
Picking the top k records according to some ranking function
根据排名函数选择前k条记录。
The simplest way of implementing such a grouping operation with MapReduce is to set up the mappers so that the key-value pairs they produce use the desired grouping key. The partitioning and sorting process then brings together all the records with the same key in the same reducer. Thus, grouping and joining look quite similar when implemented on top of MapReduce.
使用MapReduce实现此类分组操作的最简单方法是设置映射器,使其生成的键值对使用所需的分组键。然后,分区和排序过程将所有具有相同键的记录汇集到同一个减速器中。因此,当在MapReduce的基础上实现分组和连接时,它们看起来非常相似。
Another common use for grouping is collating all the activity events for a particular user session, in order to find out the sequence of actions that the user took—a process called sessionization [ 37 ]. For example, such analysis could be used to work out whether users who were shown a new version of your website are more likely to make a purchase than those who were shown the old version (A/B testing), or to calculate whether some marketing activity is worthwhile.
分组的另一个常见用途是将特定用户会话的所有活动事件整理在一起,以找出用户所采取的操作序列,这个过程被称为“sessionization”。例如,这种分析可以用于确定那些被展示新版网站的用户比那些被展示旧版的用户更有可能购买(A/B测试),或者计算某些营销活动是否值得。
If you have multiple web servers handling user requests, the activity events for a particular user are most likely scattered across various different servers’ log files. You can implement sessionization by using a session cookie, user ID, or similar identifier as the grouping key and bringing all the activity events for a particular user together in one place, while distributing different users’ events across different partitions.
如果您有多个Web服务器处理用户请求,则特定用户的活动事件很可能散布在不同的服务器日志文件中。您可以通过使用会话Cookie、用户ID或类似的标识符作为分组键,将特定用户的所有活动事件集中在一个地方,同时将不同用户的事件分布在不同的分区中来实现会话化。
Handling skew
The pattern of “bringing all records with the same key to the same place” breaks down if there is a very large amount of data related to a single key. For example, in a social network, most users might be connected to a few hundred people, but a small number of celebrities may have many millions of followers. Such disproportionately active database records are known as linchpin objects [ 38 ] or hot keys .
“将所有具有相同键的记录放在同一个位置”模式在单个键相关的数据非常庞大的情况下将崩溃。例如,在社交网络中,大多数用户可能与数百人相连,但少数名人可能拥有数百万追随者。这些比例过高的数据库记录被称为枢纽对象[38]或热键。
Collecting all activity related to a celebrity (e.g., replies to something they posted) in a single reducer can lead to significant skew (also known as hot spots )—that is, one reducer that must process significantly more records than the others (see “Skewed Workloads and Relieving Hot Spots” ). Since a MapReduce job is only complete when all of its mappers and reducers have completed, any subsequent jobs must wait for the slowest reducer to complete before they can start.
将所有与名人相关的活动(例如对其发布内容的回复)收集到一个单独的减速器中可能会导致显著的偏斜(也称为热点),即一个减速器必须处理比其他减速器更多的记录(请参见“偏斜工作负载和缓解热点”)。由于 MapReduce 作业只有在其所有映射器和减速器完成后才算完成,因此任何后续作业都必须等待最慢的减速器完成后才能开始。
If a join input has hot keys, there are a few algorithms you can use to compensate. For example, the skewed join method in Pig first runs a sampling job to determine which keys are hot [ 39 ]. When performing the actual join, the mappers send any records relating to a hot key to one of several reducers, chosen at random (in contrast to conventional MapReduce, which chooses a reducer deterministically based on a hash of the key). For the other input to the join, records relating to the hot key need to be replicated to all reducers handling that key [ 40 ].
如果联接输入有热键,可以使用一些算法进行补偿。例如,Pig中的Skewed Join方法首先运行采样作业,以确定哪些键是热点[39]。在执行实际连接时,映射器将任何与热点键相关的记录发送到随机选择的多个减速器之一(与传统的MapReduce相反,后者基于键的哈希值确定减速器)。对于联接的另一个输入,与热键相关的记录需要复制到处理该键的所有减速器[40]。
This technique spreads the work of handling the hot key over several reducers, which allows it to be parallelized better, at the cost of having to replicate the other join input to multiple reducers. The sharded join method in Crunch is similar, but requires the hot keys to be specified explicitly rather than using a sampling job. This technique is also very similar to one we discussed in “Skewed Workloads and Relieving Hot Spots” , using randomization to alleviate hot spots in a partitioned database.
这种技术将处理热键的工作分散到多个Reducer中,这使得并行化更好,但代价是必须将其他连接输入复制到多个Reducer中。Crunch中的分片连接方法类似,但需要显式指定热键,而不是使用采样作业。这种技术也类似于我们在“扭曲的工作负载和缓解热点”中讨论的技术,使用随机化来缓解分区数据库中的热点。
Hive’s skewed join optimization takes an alternative approach. It requires hot keys to be specified explicitly in the table metadata, and it stores records related to those keys in separate files from the rest. When performing a join on that table, it uses a map-side join (see the next section) for the hot keys.
Hive 的倾斜连接优化采取了一种替代方法。它要求在表元数据中明确指定热点键,并将与这些键相关的记录存储在与其余记录分开的文件中。在对该表执行连接时,它使用 map-side join(参见下一节)来处理热点键。
When grouping records by a hot key and aggregating them, you can perform the grouping in two stages. The first MapReduce stage sends records to a random reducer, so that each reducer performs the grouping on a subset of records for the hot key and outputs a more compact aggregated value per key. The second MapReduce job then combines the values from all of the first-stage reducers into a single value per key.
当按热键对记录进行分组和聚合时,可以分两个阶段执行分组。 第一个MapReduce阶段将记录发送到随机归约器,以便每个归约器对热键的记录子集执行分组,并为每个键输出更紧凑的聚合值。 第二个MapReduce作业然后将所有第一阶段归约器的值合并为每个键的单个值。
Map-Side Joins
The join algorithms described in the last section perform the actual join logic in the reducers, and are hence known as reduce-side joins . The mappers take the role of preparing the input data: extracting the key and value from each input record, assigning the key-value pairs to a reducer partition, and sorting by key.
上一节中描述的连接算法在减少器中执行实际连接逻辑,因此被称为减少器端连接。映射器扮演准备输入数据的角色:从每个输入记录中提取键和值,将键值对分配给减少器分区并按键排序。
The reduce-side approach has the advantage that you do not need to make any assumptions about the input data: whatever its properties and structure, the mappers can prepare the data to be ready for joining. However, the downside is that all that sorting, copying to reducers, and merging of reducer inputs can be quite expensive. Depending on the available memory buffers, data may be written to disk several times as it passes through the stages of MapReduce [ 37 ].
"Reduce-side方法的优点是您无需对输入数据做出任何假设:无论其属性和结构如何,映射器都可以准备好加入的数据。然而,缺点是所有排序,复制到缩小器以及合并缩小器输入可能相当昂贵。根据可用的内存缓冲区,数据可能会经过MapReduce的各个阶段多次写入磁盘。"
On the other hand, if you can make certain assumptions about your input data, it is possible to make joins faster by using a so-called map-side join . This approach uses a cut-down MapReduce job in which there are no reducers and no sorting. Instead, each mapper simply reads one input file block from the distributed filesystem and writes one output file to the filesystem—that is all.
另一方面,如果您能对输入数据作出某些假设,就可以使用所谓的映射端连接来更快地进行连接。该方法使用了一个简化版的MapReduce作业,在其中没有减速器和排序。相反,每个映射器仅从分布式文件系统中读取一个输入文件块,并将一个输出文件写入文件系统 - 就是这些了。
Broadcast hash joins
The simplest way of performing a map-side join applies in the case where a large dataset is joined with a small dataset. In particular, the small dataset needs to be small enough that it can be loaded entirely into memory in each of the mappers.
最简单的地图端连接方式适用于将大型数据集与小型数据集连接的情况。特别是,小数据集需要足够小,以便可以在每个映射器中完全加载到内存中。
For example, imagine in the case of Figure 10-2 that the user database is small enough to fit in memory. In this case, when a mapper starts up, it can first read the user database from the distributed filesystem into an in-memory hash table. Once this is done, the mapper can scan over the user activity events and simply look up the user ID for each event in the hash table. vi
例如,假设在图10-2的情况下,用户数据库足够小可以放入内存中。这种情况下,当一个Mapper启动时,它可以先将用户数据库从分布式文件系统读取到内存哈希表中。一旦完成这个操作,Mapper可以扫描用户活动事件并在哈希表中查找每个事件的用户ID。
There can still be several map tasks: one for each file block of the large input to the join (in the example of Figure 10-2 , the activity events are the large input). Each of these mappers loads the small input entirely into memory.
还可以有几个地图任务:针对连接的大型输入的每个文件块(在图10-2的示例中,活动事件是大型输入),都可以有一个地图器。每个地图器将完全将小输入加载到内存中。
This simple but effective algorithm is called a broadcast hash join : the word broadcast reflects the fact that each mapper for a partition of the large input reads the entirety of the small input (so the small input is effectively “broadcast” to all partitions of the large input), and the word hash reflects its use of a hash table. This join method is supported by Pig (under the name “replicated join”), Hive (“MapJoin”), Cascading, and Crunch. It is also used in data warehouse query engines such as Impala [ 41 ].
这个简单但有效的算法被称为广播哈希连接:广播一词反映了大输入每个分区的映射器都读取小输入的全部内容(因此小输入实际上被“广播”到大输入的所有分区),哈希一词则反映了它使用哈希表的特点。这种连接方法由Pig(以“重复连接”的名称)、Hive(“MapJoin”)、Cascading和Crunch支持。它还被用于数据仓库查询引擎,例如Impala[41]。
Instead of loading the small join input into an in-memory hash table, an alternative is to store the small join input in a read-only index on the local disk [ 42 ]. The frequently used parts of this index will remain in the operating system’s page cache, so this approach can provide random-access lookups almost as fast as an in-memory hash table, but without actually requiring the dataset to fit in memory.
将小的连接输入加载到内存哈希表中的替代方法是将其存储在本地磁盘上的只读索引中。这个索引的经常使用部分将保留在操作系统的页面缓存中,因此这种方法可以提供几乎与内存哈希表一样快的随机访问查找,但不需要实际上将数据集适合内存。
Partitioned hash joins
If the inputs to the map-side join are partitioned in the same way, then the hash join approach can be applied to each partition independently. In the case of Figure 10-2 , you might arrange for the activity events and the user database to each be partitioned based on the last decimal digit of the user ID (so there are 10 partitions on either side). For example, mapper 3 first loads all users with an ID ending in 3 into a hash table, and then scans over all the activity events for each user whose ID ends in 3.
如果地图侧连接的输入以相同的方式分区,那么哈希连接方法可以独立地应用于每个分区。在图10-2的情况下,您可以根据用户ID的最后一位来对活动事件和用户数据库进行分区(因此每侧有10个分区)。例如,mapper 3首先将所有ID以3结尾的用户加载到哈希表中,然后扫描每个ID以3结尾的用户的所有活动事件。
If the partitioning is done correctly, you can be sure that all the records you might want to join are located in the same numbered partition, and so it is sufficient for each mapper to only read one partition from each of the input datasets. This has the advantage that each mapper can load a smaller amount of data into its hash table.
如果分区正确执行,则可以确保您想要加入的所有记录位于相同编号的分区中,因此对于每个映射器,从每个输入数据集中仅读取一个分区就足够了。这有一个优点,即每个映射器可以将较少的数据加载到其哈希表中。
This approach only works if both of the join’s inputs have the same number of partitions, with records assigned to partitions based on the same key and the same hash function. If the inputs are generated by prior MapReduce jobs that already perform this grouping, then this can be a reasonable assumption to make.
如果连接的输入具有相同数量的分区,并且根据相同的键和相同的哈希函数将记录分配到分区中,则该方法仅在两个加入的输入都可行。如果输入是由先前执行此分组的MapReduce作业生成的,则可以做出合理的假设。
Partitioned hash joins are known as bucketed map joins in Hive [ 37 ].
分区哈希连接在Hive中被称为分桶映射连接。
Map-side merge joins
Another variant of a map-side join applies if the input datasets are not only partitioned in the same way, but also sorted based on the same key. In this case, it does not matter whether the inputs are small enough to fit in memory, because a mapper can perform the same merging operation that would normally be done by a reducer: reading both input files incrementally, in order of ascending key, and matching records with the same key.
另一种地图端连接的变体适用于输入数据集不仅以相同方式分区,而且基于相同键进行排序。在这种情况下,无论输入是否足够小可以放入内存中,因为 Mapper 可以执行常规 Reducer 执行的相同合并操作:按升序顺序逐渐读取两个输入文件并匹配具有相同键的记录。
If a map-side merge join is possible, it probably means that prior MapReduce jobs brought the input datasets into this partitioned and sorted form in the first place. In principle, this join could have been performed in the reduce stage of the prior job. However, it may still be appropriate to perform the merge join in a separate map-only job, for example if the partitioned and sorted datasets are also needed for other purposes besides this particular join.
如果可能存在一种基于地图合并加入的方法,那很可能是先前的MapReduce作业已经将输入数据集首先分区并排序了。原则上,这个连接可能已经在前一个作业的减少阶段中执行。然而,执行分离的仅映射作业中的合并加入可能仍然是适当的,例如,如果除了这个特定的连接之外,还需要这些分区和排序的数据集用于其他目的。
MapReduce workflows with map-side joins
When the output of a MapReduce join is consumed by downstream jobs, the choice of map-side or reduce-side join affects the structure of the output. The output of a reduce-side join is partitioned and sorted by the join key, whereas the output of a map-side join is partitioned and sorted in the same way as the large input (since one map task is started for each file block of the join’s large input, regardless of whether a partitioned or broadcast join is used).
当MapReduce连接的结果被下游作业使用时,选择在Map端还是Reduce端进行连接会影响到输出的结构。Reduce端连接的输出会按照连接键进行分区和排序,而Map端连接的输出会按照与大型输入相同的方式进行分区和排序(因为无论使用分区连接还是广播连接,每个文件块的连接都会启动一个Map任务)。
As discussed, map-side joins also make more assumptions about the size, sorting, and partitioning of their input datasets. Knowing about the physical layout of datasets in the distributed filesystem becomes important when optimizing join strategies: it is not sufficient to just know the encoding format and the name of the directory in which the data is stored; you must also know the number of partitions and the keys by which the data is partitioned and sorted.
根据讨论,由于映射端关联操作需要考虑输入数据集的大小、排序和分片情况,因此对于分布式文件系统中数据集的物理布局,我们需要了解得更加清楚,以便优化联接策略。知道数据集的编码格式和存储目录名称是不够的,您还必须知道分区数量和用于分区和排序的键。
In the Hadoop ecosystem, this kind of metadata about the partitioning of datasets is often maintained in HCatalog and the Hive metastore [ 37 ].
在Hadoop生态系统中,有关数据集分区的此类元数据通常存储在HCatalog和Hive元数据存储中。[37]。
The Output of Batch Workflows
We have talked a lot about the various algorithms for implementing workflows of MapReduce jobs, but we neglected an important question: what is the result of all of that processing, once it is done? Why are we running all these jobs in the first place?
我们已经讨论了许多有关实施MapReduce工作流的各种算法,但我们忽略了一个重要问题:一旦完成所有这些处理,结果是什么?我们为什么首先要运行所有这些作业?
In the case of database queries, we distinguished transaction processing (OLTP) purposes from analytic purposes (see “Transaction Processing or Analytics?” ). We saw that OLTP queries generally look up a small number of records by key, using indexes, in order to present them to a user (for example, on a web page). On the other hand, analytic queries often scan over a large number of records, performing groupings and aggregations, and the output often has the form of a report: a graph showing the change in a metric over time, or the top 10 items according to some ranking, or a breakdown of some quantity into subcategories. The consumer of such a report is often an analyst or a manager who needs to make business decisions.
在数据库查询方面,我们将事务处理(OLTP)目的与分析目的(参见“事务处理还是分析?”)区分开来。我们发现,OLTP查询通常通过索引按键查找少量记录,并将其呈现给用户(例如,在网页上)。另一方面,分析查询通常扫描大量记录,执行分组和聚合,并且输出通常采用报告形式:显示指标随时间变化的图形,或根据某个排名的前10个项目,或将某个数量拆分为子类别。此类报告的消费者通常是分析师或经理,他们需要做出业务决策。
Where does batch processing fit in? It is not transaction processing, nor is it analytics. It is closer to analytics, in that a batch process typically scans over large portions of an input dataset. However, a workflow of MapReduce jobs is not the same as a SQL query used for analytic purposes (see “Comparing Hadoop to Distributed Databases” ). The output of a batch process is often not a report, but some other kind of structure.
批处理适用于哪些情况?它既不是事务处理,也不是分析。它与分析更接近,因为批处理通常会扫描整个输入数据集的大部分内容。然而,MapReduce作业的工作流与用于分析目的的SQL查询并不相同(请参见“将Hadoop与分布式数据库进行比较”)。批处理的输出通常不是报告,而是其他一些结构。
Building search indexes
Google’s original use of MapReduce was to build indexes for its search engine, which was implemented as a workflow of 5 to 10 MapReduce jobs [ 1 ]. Although Google later moved away from using MapReduce for this purpose [ 43 ], it helps to understand MapReduce if you look at it through the lens of building a search index. (Even today, Hadoop MapReduce remains a good way of building indexes for Lucene/Solr [ 44 ].)
Google最初使用MapReduce是用来建立其搜索引擎的索引,它是由5至10个MapReduce作业构成的工作流程[1]。尽管Google后来不再将MapReduce用于此目的[43],但如果你从构建搜索引擎索引的角度来看MapReduce,它仍然有助于理解。 (即使到今天,Hadoop MapReduce仍然是构建Lucene / Solr索引的好方法[44]。)
We saw briefly in “Full-text search and fuzzy indexes” how a full-text search index such as Lucene works: it is a file (the term dictionary) in which you can efficiently look up a particular keyword and find the list of all the document IDs containing that keyword (the postings list). This is a very simplified view of a search index—in reality it requires various additional data, in order to rank search results by relevance, correct misspellings, resolve synonyms, and so on—but the principle holds.
我们简单地在“全文搜索和模糊索引”一章中看到了全文搜索索引(如Lucene)的工作方式:它是一个文件(词典),您可以在其中高效地查找特定的关键词并找到包含该关键词的所有文档ID列表(后续列表)。这是搜索索引的一个非常简化的视图——实际上需要各种额外的数据,以便通过相关性对搜索结果进行排序、纠正拼写错误、解决同义词等,但是原则保持不变。
If you need to perform a full-text search over a fixed set of documents, then a batch process is a very effective way of building the indexes: the mappers partition the set of documents as needed, each reducer builds the index for its partition, and the index files are written to the distributed filesystem. Building such document-partitioned indexes (see “Partitioning and Secondary Indexes” ) parallelizes very well. Since querying a search index by keyword is a read-only operation, these index files are immutable once they have been created.
如果您需要在一组固定的文件上执行全文搜索,那么批处理是构建索引的非常有效的方式:映射器按需要分区文档集,每个减少器为其分区建立索引,并将索引文件写入分布式文件系统。构建这样的文档分区索引(请参阅“分区和二级索引”)非常好并行化。由于通过关键字查询搜索索引是只读操作,因此一旦创建了这些索引文件,它们就是不可变的。
If the indexed set of documents changes, one option is to periodically rerun the entire indexing workflow for the entire set of documents, and replace the previous index files wholesale with the new index files when it is done. This approach can be computationally expensive if only a small number of documents have changed, but it has the advantage that the indexing process is very easy to reason about: documents in, indexes out.
如果文档集合发生变化,一种选择是定期重新运行整个文档集的索引工作流程,并在完成后以新索引文件整体替换以前的索引文件。如果只有少数文件发生更改,这种方法可能计算成本很高,但它具有易于理解的优点:将文档放入,索引输出。
Alternatively, it is possible to build indexes incrementally. As discussed in Chapter 3 , if you want to add, remove, or update documents in an index, Lucene writes out new segment files and asynchronously merges and compacts segment files in the background. We will see more on such incremental processing in Chapter 11 .
另外,也可以逐步建立索引。正如第3章所讨论的,如果您想要添加、删除或更新索引中的文档,Lucene会写入新的段文件,并在后台异步合并和压缩段文件。我们将在第11章中进一步了解这种递增式处理的方法。
Key-value stores as batch process output
Search indexes are just one example of the possible outputs of a batch processing workflow. Another common use for batch processing is to build machine learning systems such as classifiers (e.g., spam filters, anomaly detection, image recognition) and recommendation systems (e.g., people you may know, products you may be interested in, or related searches [ 29 ]).
搜索索引只是批处理工作流的可能输出之一。批处理的另一个常见用途是构建机器学习系统,例如分类器(如垃圾邮件过滤器、异常检测、图像识别)和推荐系统(如可能认识的人、可能感兴趣的产品或相关搜索[29])。
The output of those batch jobs is often some kind of database: for example, a database that can be queried by user ID to obtain suggested friends for that user, or a database that can be queried by product ID to get a list of related products [ 45 ].
这些批处理作业的输出通常是某种数据库:例如,可以通过用户ID查询以获取该用户的建议朋友的数据库,或者可以通过产品ID查询以获取相关产品列表的数据库。
These databases need to be queried from the web application that handles user requests, which is usually separate from the Hadoop infrastructure. So how does the output from the batch process get back into a database where the web application can query it?
这些数据库需要从处理用户请求的Web应用程序中查询,这通常与Hadoop基础架构分开。那么,批处理过程的输出如何返回到数据库中,使得Web应用程序可以查询它?
The most obvious choice might be to use the client library for your favorite database directly within a mapper or reducer, and to write from the batch job directly to the database server, one record at a time. This will work (assuming your firewall rules allow direct access from your Hadoop environment to your production databases), but it is a bad idea for several reasons:
最明显的选择可能是直接在映射器或减少器中使用您喜欢的数据库的客户端库,并直接从批处理作业一次写入一条记录到数据库服务器。这将起作用(假设您的防火墙规则允许从Hadoop环境直接访问您的生产数据库),但出于几个原因这是一个不好的想法:
-
As discussed previously in the context of joins, making a network request for every single record is orders of magnitude slower than the normal throughput of a batch task. Even if the client library supports batching, performance is likely to be poor.
如前所述,在联接的上下文中讨论过,为每条记录进行网络请求的速度比批处理任务的正常吞吐量慢数倍。即使客户端库支持批处理,性能也很可能很差。
-
MapReduce jobs often run many tasks in parallel. If all the mappers or reducers concurrently write to the same output database, with a rate expected of a batch process, that database can easily be overwhelmed, and its performance for queries is likely to suffer. This can in turn cause operational problems in other parts of the system [ 35 ].
MapReduce作业通常并行运行许多任务。如果所有映射器或约简器同时向相同的输出数据库写入,预期是批处理进程的速率,那么该数据库很容易被压垮,其查询性能可能会受影响。这反过来会导致系统其他部分的操作问题 [35]。
-
Normally, MapReduce provides a clean all-or-nothing guarantee for job output: if a job succeeds, the result is the output of running every task exactly once, even if some tasks failed and had to be retried along the way; if the entire job fails, no output is produced. However, writing to an external system from inside a job produces externally visible side effects that cannot be hidden in this way. Thus, you have to worry about the results from partially completed jobs being visible to other systems, and the complexities of Hadoop task attempts and speculative execution.
通常,MapReduce 为作业输出提供了一个干净的全有或全无的保证:如果一个作业成功了,那么结果就是准确地运行每个任务一次的输出,即使有些任务失败了并且必须在路上重试;如果整个作业失败了,就不会有输出产生。然而,从作业内部写入外部系统会产生外部可见的副作用,这种副作用无法隐藏。因此,你必须担心部分完成的作业结果对其他系统的可见性以及 Hadoop 任务尝试和投机执行的复杂性。
A much better solution is to build a brand-new database inside the batch job and write it as files to the job’s output directory in the distributed filesystem, just like the search indexes in the last section. Those data files are then immutable once written, and can be loaded in bulk into servers that handle read-only queries. Various key-value stores support building database files in MapReduce jobs, including Voldemort [ 46 ], Terrapin [ 47 ], ElephantDB [ 48 ], and HBase bulk loading [ 49 ].
一个更好的解决方案是在批处理作业中构建一个全新的数据库,并将其写为文件存储在分布式文件系统的作业输出目录中,就像上一节中的搜索索引一样。这些数据文件一旦被写入就是不可变的,可以大量加载到处理只读查询的服务器中。各种键值存储支持在MapReduce作业中构建数据库文件,包括Voldemort [46]、Terrapin [47]、ElephantDB [48]和HBase批量加载 [49]。
Building these database files is a good use of MapReduce: using a mapper to extract a key and then sorting by that key is already a lot of the work required to build an index. Since most of these key-value stores are read-only (the files can only be written once by a batch job and are then immutable), the data structures are quite simple. For example, they do not require a WAL (see “Making B-trees reliable” ).
构建这些数据库文件是MapReduce的良好使用方式:使用mapper提取关键字并按照关键字进行排序,已经完成了构建索引所需的大部分工作。由于这些键值存储大部分是只读的(文件只能由批处理作业编写一次,然后是不变的),因此数据结构非常简单。例如,它们不需要WAL(参见“使B树可靠”)。
When loading data into Voldemort, the server continues serving requests to the old data files while the new data files are copied from the distributed filesystem to the server’s local disk. Once the copying is complete, the server atomically switches over to querying the new files. If anything goes wrong in this process, it can easily switch back to the old files again, since they are still there and immutable [ 46 ].
在将数据加载到 Voldemort 时,服务器会在从分布式文件系统将新数据文件复制到服务器本地磁盘的过程中,继续为旧数据文件提供请求服务。一旦复制完成,服务器就会原子地切换到查询新文件。如果在此过程中出现任何问题,可以轻松地切换回旧文件,因为它们仍然存在且不可变[46]。
Philosophy of batch process outputs
The Unix philosophy that we discussed earlier in this chapter ( “The Unix Philosophy” ) encourages experimentation by being very explicit about dataflow: a program reads its input and writes its output. In the process, the input is left unchanged, any previous output is completely replaced with the new output, and there are no other side effects. This means that you can rerun a command as often as you like, tweaking or debugging it, without messing up the state of your system.
Unix哲学鼓励实验,因为它非常明确的数据流:程序读取其输入,然后写入其输出。在这个过程中,输入不会被改变,任何以前的输出都将完全被新输出替换,并且没有其他副作用。这意味着你可以随时重新运行命令,进行调试或优化,而不会破坏系统状态。
The handling of output from MapReduce jobs follows the same philosophy. By treating inputs as immutable and avoiding side effects (such as writing to external databases), batch jobs not only achieve good performance but also become much easier to maintain:
MapReduce作业的输出处理遵循相同的哲学。通过将输入视为不可变,避免副作用(例如写入外部数据库),批处理作业不仅可以实现良好的性能,而且也变得更容易维护。
-
If you introduce a bug into the code and the output is wrong or corrupted, you can simply roll back to a previous version of the code and rerun the job, and the output will be correct again. Or, even simpler, you can keep the old output in a different directory and simply switch back to it. Databases with read-write transactions do not have this property: if you deploy buggy code that writes bad data to the database, then rolling back the code will do nothing to fix the data in the database. (The idea of being able to recover from buggy code has been called human fault tolerance [ 50 ].)
如果您在代码中引入了一个 bug,并且输出结果是错误或损坏的,您可以简单地回滚到代码的先前版本并重新运行作业,输出结果将再次正确。或者,更简单的方法是将旧的输出保留在不同的目录中,然后简单地切换回它。具有读写事务的数据库没有此属性:如果您部署有问题的代码并将坏数据写入数据库,则回滚代码将无法修复数据库中的数据。 (能够从错误的代码中恢复的想法被称为人类容错性[50]。)
-
As a consequence of this ease of rolling back, feature development can proceed more quickly than in an environment where mistakes could mean irreversible damage. This principle of minimizing irreversibility is beneficial for Agile software development [ 51 ].
由于可以轻松回退的能力,特性开发可以比在可能造成不可逆伤害的环境中更快地进行。最小化不可逆性的原则有利于敏捷软件开发。
-
If a map or reduce task fails, the MapReduce framework automatically re-schedules it and runs it again on the same input. If the failure is due to a bug in the code, it will keep crashing and eventually cause the job to fail after a few attempts; but if the failure is due to a transient issue, the fault is tolerated. This automatic retry is only safe because inputs are immutable and outputs from failed tasks are discarded by the MapReduce framework.
如果地图或归纳任务失败,MapReduce框架会自动重新安排并在同一输入上运行它。如果故障是由代码中的错误引起的,它会继续崩溃并最终在几次尝试后导致作业失败;但是,如果故障是由短暂问题引起的,则容错。这种自动重试只有在输入不可变且来自失败任务的输出由MapReduce框架丢弃时才是安全的。
-
The same set of files can be used as input for various different jobs, including monitoring jobs that calculate metrics and evaluate whether a job’s output has the expected characteristics (for example, by comparing it to the output from the previous run and measuring discrepancies).
同一组文件可用作多个不同工作的输入,包括监视工作,计算指标并评估工作的输出是否具有预期特性(例如通过与上次运行的输出进行比较并测量差异)。
-
Like Unix tools, MapReduce jobs separate logic from wiring (configuring the input and output directories), which provides a separation of concerns and enables potential reuse of code: one team can focus on implementing a job that does one thing well, while other teams can decide where and when to run that job.
像Unix工具一样,MapReduce作业将逻辑与连接(配置输入和输出目录)分离,这提供了关注点分离并使代码的重复使用成为可能:一个团队可以专注于实现一个良好执行的作业,而其他团队可以决定何时何地运行该作业。
In these areas, the design principles that worked well for Unix also seem to be working well for
Hadoop—but Unix and Hadoop also differ in some ways. For example, because most Unix tools assume
untyped text files, they have to do a lot of input parsing (our log analysis example at the
beginning of the chapter used
{print $7}
to extract the URL).
On Hadoop, some of those low-value syntactic conversions
are eliminated by using more structured file formats: Avro (see
“Avro”
) and Parquet
(see
“Column-Oriented Storage”
) are often used, as they provide efficient schema-based encoding and
allow evolution of their schemas over time (see
Chapter 4
).
在这些领域,对于Unix有效的设计原则似乎也适用于Hadoop,但Unix和Hadoop在某些方面也存在差异。例如,由于大多数Unix工具假定是未经处理的文本文件,它们必须执行大量的输入解析(我们在本章开始时的日志分析示例使用了{print $7}来提取URL)。 在Hadoop上,通过使用更结构化的文件格式,可以消除其中一些低价值的句法转换:Avro(请参阅“Avro”)和Parquet(请参阅“列向存储”)通常被用作它们提供了高效的基于模式的编码,并允许随时间演变它们的模式(请参阅第4章)。
Comparing Hadoop to Distributed Databases
As we have seen, Hadoop is somewhat like a distributed version of Unix, where HDFS is the
filesystem and MapReduce is a quirky implementation of a Unix process (which happens to always run
the
sort
utility between the map phase and the reduce phase). We saw how you can implement various
join and grouping operations on top of these primitives.
正如我们所看到的,Hadoop有点像分布式Unix,其中HDFS是文件系统,而MapReduce是Unix进程的奇怪实现(在map阶段和reduce阶段之间始终运行sort工具)。我们看到了如何在这些基本操作之上实现各种连接和分组操作。
When the MapReduce paper [ 1 ] was published, it was—in some sense—not at all new. All of the processing and parallel join algorithms that we discussed in the last few sections had already been implemented in so-called massively parallel processing (MPP) databases more than a decade previously [ 3 , 40 ]. For example, the Gamma database machine, Teradata, and Tandem NonStop SQL were pioneers in this area [ 52 ].
当MapReduce论文[1]出版时,在某种程度上,它并不是全新的。我们在最近几节中讨论的所有处理和并行连接算法都已经在所谓的大规模并行处理(MPP)数据库中实现了超过十年[3,40]。 例如,Gamma数据库机器,Teradata和Tandem NonStop SQL是该领域的先驱[52]。
The biggest difference is that MPP databases focus on parallel execution of analytic SQL queries on a cluster of machines, while the combination of MapReduce and a distributed filesystem [ 19 ] provides something much more like a general-purpose operating system that can run arbitrary programs.
MPP数据库的最大区别在于其专注于在机器群组上并行执行分析性SQL查询,而MapReduce和分布式文件系统的组合则提供了类似于通用操作系统的功能,可以运行任意程序。
Diversity of storage
Databases require you to structure data according to a particular model (e.g., relational or documents), whereas files in a distributed filesystem are just byte sequences, which can be written using any data model and encoding. They might be collections of database records, but they can equally well be text, images, videos, sensor readings, sparse matrices, feature vectors, genome sequences, or any other kind of data.
数据库要求您根据特定模型(例如,关系型或文档型)来构建数据,而在分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码进行编写。它们可能是数据库记录的集合,但同样可以是文本、图像、视频、传感器读数、稀疏矩阵、特征向量、基因组序列或任何其他类型的数据。
To put it bluntly, Hadoop opened up the possibility of indiscriminately dumping data into HDFS, and only later figuring out how to process it further [ 53 ]. By contrast, MPP databases typically require careful up-front modeling of the data and query patterns before importing the data into the database’s proprietary storage format.
简而言之,Hadoop打开了随意将数据倾倒到HDFS的可能性,然后再决定如何进一步处理它。相比之下,MPP数据库通常需要在将数据导入数据库的专有存储格式之前仔细建模数据和查询模式。
From a purist’s point of view, it may seem that this careful modeling and import is desirable, because it means users of the database have better-quality data to work with. However, in practice, it appears that simply making data available quickly—even if it is in a quirky, difficult-to-use, raw format—is often more valuable than trying to decide on the ideal data model up front [ 54 ].
从纯粹主义者的角度来看,这种谨慎的建模和导入似乎是可取的,因为这意味着数据库的用户有更好质量的数据可使用。然而,在实践中,似乎往往比起尝试事先确定理想的数据模型,只需迅速提供数据,即使它是用奇怪、难以使用的原始格式,通常更有价值。
The idea is similar to a data warehouse (see “Data Warehousing” ): simply bringing data from various parts of a large organization together in one place is valuable, because it enables joins across datasets that were previously disparate. The careful schema design required by an MPP database slows down that centralized data collection; collecting data in its raw form, and worrying about schema design later, allows the data collection to be speeded up (a concept sometimes known as a “data lake” or “enterprise data hub” [ 55 ]).
这个想法很类似于数据仓库:把来自大型组织不同部分的数据集中到一个地方是很有价值的,因为可以实现跨数据集的连接,这些数据集以前是分散的。由于 MPP 数据库要求谨慎的模式设计,因此会减缓中央数据收集的速度。将数据以原始形式收集,并稍后考虑模式设计,可以加快数据收集 (有时称为“数据湖”或“企业数据中心”)。
Indiscriminate data dumping shifts the burden of interpreting the data: instead of forcing the producer of a dataset to bring it into a standardized format, the interpretation of the data becomes the consumer’s problem (the schema-on-read approach [ 56 ]; see “Schema flexibility in the document model” ). This can be an advantage if the producer and consumers are different teams with different priorities. There may not even be one ideal data model, but rather different views onto the data that are suitable for different purposes. Simply dumping data in its raw form allows for several such transformations. This approach has been dubbed the sushi principle : “raw data is better” [ 57 ].
不加区分的数据倾倒将解释数据的负担转移给了消费者:与其强制数据集的生产者将其置于标准化格式中,解释数据成为了消费者的问题(即读取时模式的方法)。如果生产者和消费者是不同的团队,有着不同的优先级,这可能是一种优势。可能并没有一个理想的数据模型,而是适用于不同目的的不同数据视图。简单地将数据倾倒在其原始形式中允许进行多种这样的转换。这种方法被称为寿司原则:“原始数据更好”。
Thus, Hadoop has often been used for implementing ETL processes (see “Data Warehousing” ): data from transaction processing systems is dumped into the distributed filesystem in some raw form, and then MapReduce jobs are written to clean up that data, transform it into a relational form, and import it into an MPP data warehouse for analytic purposes. Data modeling still happens, but it is in a separate step, decoupled from the data collection. This decoupling is possible because a distributed filesystem supports data encoded in any format.
因此,Hadoop通常用于实现ETL过程(请参见“数据仓库”):从事务处理系统中获得的数据以某种原始形式转储到分布式文件系统中,然后编写MapReduce作业清理该数据,将其转换为关系形式,并将其导入MPP数据仓库以用于分析目的。数据建模仍然存在,但它是一个独立的步骤,与数据收集解耦。这种解耦是可能的,因为分布式文件系统支持以任何格式编码的数据。
Diversity of processing models
MPP databases are monolithic, tightly integrated pieces of software that take care of storage layout on disk, query planning, scheduling, and execution. Since these components can all be tuned and optimized for the specific needs of the database, the system as a whole can achieve very good performance on the types of queries for which it is designed. Moreover, the SQL query language allows expressive queries and elegant semantics without the need to write code, making it accessible to graphical tools used by business analysts (such as Tableau).
MPP数据库是集成紧密的软件单体,负责处理磁盘存储布局、查询规划、调度和执行等。由于所有这些组件都可以根据数据库的特定需求进行调优和优化,因此整个系统可以在其设计的查询类型上实现非常出色的性能。此外,SQL查询语言允许表达丰富的查询和优雅的语义,无需编写代码,使其可访问业务分析师(例如Tableau)使用的图形工具。
On the other hand, not all kinds of processing can be sensibly expressed as SQL queries. For example, if you are building machine learning and recommendation systems, or full-text search indexes with relevance ranking models, or performing image analysis, you most likely need a more general model of data processing. These kinds of processing are often very specific to a particular application (e.g., feature engineering for machine learning, natural language models for machine translation, risk estimation functions for fraud prediction), so they inevitably require writing code, not just queries.
另一方面,并非所有类型的处理都可以明智地表示为SQL查询。例如,如果你正在建立机器学习和推荐系统、具有相关性排名模型的全文搜索索引或进行图像分析,那么你很可能需要更通用的数据处理模型。这些处理通常非常特定于特定应用程序(例如,用于机器学习的特征工程、用于机器翻译的自然语言模型、用于欺诈预测的风险估计函数),因此它们不可避免地需要编写代码,而不仅仅是查询。
MapReduce gave engineers the ability to easily run their own code over large datasets. If you have HDFS and MapReduce, you can build a SQL query execution engine on top of it, and indeed this is what the Hive project did [ 31 ]. However, you can also write many other forms of batch processes that do not lend themselves to being expressed as a SQL query.
MapReduce将使工程师能够轻松地在大型数据集上运行自己的代码。如果您拥有HDFS和MapReduce,可以在其上构建一个SQL查询执行引擎,这正是Hive项目所做的[31]。不过,您也可以编写许多其他形式的批处理过程,这些过程不适合表达为SQL查询。
Subsequently, people found that MapReduce was too limiting and performed too badly for some types of processing, so various other processing models were developed on top of Hadoop (we will see some of them in “Beyond MapReduce” ). Having two processing models, SQL and MapReduce, was not enough: even more different models were needed! And due to the openness of the Hadoop platform, it was feasible to implement a whole range of approaches, which would not have been possible within the confines of a monolithic MPP database [ 58 ].
随后,人们发现MapReduce的局限性和性能过差,无法处理某些类型的数据处理,因此在Hadoop之上开发出了各种其他的处理模型(我们会在“超越MapReduce”中看到一些)。拥有两个处理模型,SQL和MapReduce,还不够:需要更多不同的模型!由于Hadoop平台的开放性,实现一整个系列的方法是可行的,而这是在一个单一的MPP数据库的限制内不可能实现的[58]。
Crucially, those various processing models can all be run on a single shared-use cluster of machines, all accessing the same files on the distributed filesystem. In the Hadoop approach, there is no need to import the data into several different specialized systems for different kinds of processing: the system is flexible enough to support a diverse set of workloads within the same cluster. Not having to move data around makes it a lot easier to derive value from the data, and a lot easier to experiment with new processing models.
重要的是,这些不同的处理模型都可以在一个共享使用的机群上运行,所有在分布式文件系统上访问相同的文件。在Hadoop的方法中,没有必要将数据导入多个不同的专门处理系统以进行处理:该系统足够灵活以在同一机群中支持各种工作负载。不需要移动数据会使从数据中获取价值变得更加容易,也更容易尝试新的处理模型。
The Hadoop ecosystem includes both random-access OLTP databases such as HBase (see “SSTables and LSM-Trees” ) and MPP-style analytic databases such as Impala [ 41 ]. Neither HBase nor Impala uses MapReduce, but both use HDFS for storage. They are very different approaches to accessing and processing data, but they can nevertheless coexist and be integrated in the same system.
Hadoop生态系统包括随机访问的OLTP数据库,例如HBase(参见“SSTables和LSM树”)和类似MPP的分析数据库,例如Impala [41]。 HBase和Impala均不使用MapReduce,但都使用HDFS进行存储。 它们是访问和处理数据的非常不同的方法,但它们仍然可以共存并集成到同一系统中。
Designing for frequent faults
When comparing MapReduce to MPP databases, two more differences in design approach stand out: the handling of faults and the use of memory and disk. Batch processes are less sensitive to faults than online systems, because they do not immediately affect users if they fail and they can always be run again.
当将MapReduce与MPP数据库进行比较时,设计方法中有两个更为显着的不同之处:故障处理和内存和磁盘的使用。批处理过程对于故障比在线系统不太敏感,因为如果它们失败并且可以随时重新运行,则不会立即影响用户。
If a node crashes while a query is executing, most MPP databases abort the entire query, and either let the user resubmit the query or automatically run it again [ 3 ]. As queries normally run for a few seconds or a few minutes at most, this way of handling errors is acceptable, since the cost of retrying is not too great. MPP databases also prefer to keep as much data as possible in memory (e.g., using hash joins) to avoid the cost of reading from disk.
如果一个节点在执行查询时崩溃,大多数MPP数据库会中止整个查询,然后让用户重新提交查询或自动运行再次查询[3]。由于查询通常只运行几秒钟或最多几分钟,处理错误的方式是可接受的,因为重试的成本不是太高。MPP数据库还喜欢尽可能多地保持数据在内存中(例如,使用哈希连接)以避免从磁盘读取的成本。
On the other hand, MapReduce can tolerate the failure of a map or reduce task without it affecting the job as a whole by retrying work at the granularity of an individual task. It is also very eager to write data to disk, partly for fault tolerance, and partly on the assumption that the dataset will be too big to fit in memory anyway.
另一方面,MapReduce 可以容忍 map 或 reduce 任务的失败,而不会影响整个任务,因为它可以在单个任务的粒度上重新尝试工作。此外,它也非常渴望将数据写入磁盘,部分是为了容错,部分是基于数据集太大无法放入内存的假设。
The MapReduce approach is more appropriate for larger jobs: jobs that process so much data and run for such a long time that they are likely to experience at least one task failure along the way. In that case, rerunning the entire job due to a single task failure would be wasteful. Even if recovery at the granularity of an individual task introduces overheads that make fault-free processing slower, it can still be a reasonable trade-off if the rate of task failures is high enough.
MapReduce方法更适合处理更大的任务: 即处理大量数据并且运行时间如此之长以至于它们有可能在执行过程中至少遇到一个任务故障。那种情况下,由于一个任务故障而重新运行整个任务是浪费的。即使以单个任务为粒度的恢复引入了一些开销,使得无故障处理变慢,如果任务故障率足够高,这仍然可以是一个合理的权衡。
But how realistic are these assumptions? In most clusters, machine failures do occur, but they are not very frequent—probably rare enough that most jobs will not experience a machine failure. Is it really worth incurring significant overheads for the sake of fault tolerance?
但这些假设有多现实?在大多数集群中会发生机器故障,但它们并不是非常频繁 - 可能很少有工作会经历机器故障。为了实现容错性,真的值得承担重大的开销吗?
To understand the reasons for MapReduce’s sparing use of memory and task-level recovery, it is helpful to look at the environment for which MapReduce was originally designed. Google has mixed-use datacenters, in which online production services and offline batch jobs run on the same machines. Every task has a resource allocation (CPU cores, RAM, disk space, etc.) that is enforced using containers. Every task also has a priority, and if a higher-priority task needs more resources, lower-priority tasks on the same machine can be terminated (preempted) in order to free up resources. Priority also determines pricing of the computing resources: teams must pay for the resources they use, and higher-priority processes cost more [ 59 ].
为了理解MapReduce在内存和任务级别恢复方面节约使用的原因,有助于看看MapReduce最初设计的环境。谷歌拥有混合使用的数据中心,在同一台机器上运行在线生产服务和离线批处理作业。每个任务都有一个资源分配(CPU核心,内存,磁盘空间等),使用容器来执行。每个任务还有一个优先级,如果需要更多资源的更高优先级任务,可以终止(抢占)同一台机器上的低优先级任务以释放资源。优先级还确定了计算资源的定价:团队必须为他们使用的资源付费,较高优先级的进程成本更高[59]。
This architecture allows non-production (low-priority) computing resources to be overcommitted, because the system knows that it can reclaim the resources if necessary. Overcommitting resources in turn allows better utilization of machines and greater efficiency compared to systems that segregate production and non-production tasks. However, as MapReduce jobs run at low priority, they run the risk of being preempted at any time because a higher-priority process requires their resources. Batch jobs effectively “pick up the scraps under the table,” using any computing resources that remain after the high-priority processes have taken what they need.
这种架构允许非生产(低优先级)计算资源被超额投入,因为系统知道如果需要的话可以收回资源。相比分离生产和非生产任务的系统,超额投入资源可以更好地利用机器并提高效率。然而,因为MapReduce作业以低优先级运行,它们随时面临被抢占的风险,因为高优先级进程需要它们的资源。批处理作业有效地“捡拾桌子下的残羹剩饭”,使用高优先级进程取走所需资源后,剩余的计算资源。
At Google, a MapReduce task that runs for an hour has an approximately 5% risk of being terminated to make space for a higher-priority process. This rate is more than an order of magnitude higher than the rate of failures due to hardware issues, machine reboot, or other reasons [ 59 ]. At this rate of preemptions, if a job has 100 tasks that each run for 10 minutes, there is a risk greater than 50% that at least one task will be terminated before it is finished.
在谷歌,运行一个持续一个小时的MapReduce任务有大约5%的风险被终止,以腾出更高优先级进程的空间。这个速率比由于硬件问题、机器重启或其他原因引起的故障率高出一个数量级。以这种预占率,如果一个作业有100个每个运行10分钟的任务,则至少有一个任务在完成之前被终止的风险大于50%。
And this is why MapReduce is designed to tolerate frequent unexpected task termination: it’s not because the hardware is particularly unreliable, it’s because the freedom to arbitrarily terminate processes enables better resource utilization in a computing cluster.
这也是为什么MapReduce被设计成能够容忍频繁意外任务终止的原因:不是因为硬件特别不可靠,而是因为可以任意终止进程的自由能够更好地利用计算集群的资源。
Among open source cluster schedulers, preemption is less widely used. YARN’s CapacityScheduler supports preemption for balancing the resource allocation of different queues [ 58 ], but general priority preemption is not supported in YARN, Mesos, or Kubernetes at the time of writing [ 60 ]. In an environment where tasks are not so often terminated, the design decisions of MapReduce make less sense. In the next section, we will look at some alternatives to MapReduce that make different design decisions.
在开源集群调度器中,抢占不太常用。YARN的CapacityScheduler支持抢占来平衡不同队列的资源分配,但在写作时,YARN、Mesos或Kubernetes均不支持常规优先级抢占。在任务不经常终止的环境中,MapReduce的设计决策显得不那么合理。在下一部分中,我们将探讨一些替代MapReduce的方案,这些方案做出了不同的设计决策。
Beyond MapReduce
Although MapReduce became very popular and received a lot of hype in the late 2000s, it is just one among many possible programming models for distributed systems. Depending on the volume of data, the structure of the data, and the type of processing being done with it, other tools may be more appropriate for expressing a computation.
尽管MapReduce在2000年代末非常流行并受到很多宣传,但它只是分布式系统中许多可能的编程模型之一。根据数据的数量、数据的结构和使用它进行的处理类型,可能有其他更适合表达计算的工具。
We nevertheless spent a lot of time in this chapter discussing MapReduce because it is a useful learning tool, as it is a fairly clear and simple abstraction on top of a distributed filesystem. That is, simple in the sense of being able to understand what it is doing, not in the sense of being easy to use. Quite the opposite: implementing a complex processing job using the raw MapReduce APIs is actually quite hard and laborious—for instance, you would need to implement any join algorithms from scratch [ 37 ].
我们在这章中花费了很多时间讨论MapReduce,因为它是一个有用的学习工具,它是一个相对清晰和简单的抽象,建立在分布式文件系统之上。也就是说,它容易理解它正在做什么,但并不容易使用。相反,使用原始的MapReduce API实现复杂的处理工作实际上是相当困难和繁琐的,例如,你需要从头实现任何联接算法[37]。
In response to the difficulty of using MapReduce directly, various higher-level programming models (Pig, Hive, Cascading, Crunch) were created as abstractions on top of MapReduce. If you understand how MapReduce works, they are fairly easy to learn, and their higher-level constructs make many common batch processing tasks significantly easier to implement.
针对直接使用MapReduce的困难,各种更高层次的编程模型(Pig、Hive、Cascading、Crunch)被创建为对MapReduce的抽象。如果您理解MapReduce的工作原理,学习它们是相当容易的,它们的高级构造使得许多常见的批处理任务更容易实现。
However, there are also problems with the MapReduce execution model itself, which are not fixed by adding another level of abstraction and which manifest themselves as poor performance for some kinds of processing. On the one hand, MapReduce is very robust: you can use it to process almost arbitrarily large quantities of data on an unreliable multi-tenant system with frequent task terminations, and it will still get the job done (albeit slowly). On the other hand, other tools are sometimes orders of magnitude faster for some kinds of processing.
然而,MapReduce执行模型本身也存在问题,即使增加另一级抽象也无法修复,表现为某些处理的性能不佳。一方面,MapReduce非常健壮:您可以在频繁任务终止的不可靠多租户系统上使用它来处理几乎任意数量的数据,并且它仍然能够完成工作(虽然缓慢)。另一方面,其他工具对于某些类型的处理有时会快上几个数量级。
In the rest of this chapter, we will look at some of those alternatives for batch processing. In Chapter 11 we will move to stream processing, which can be regarded as another way of speeding up batch processing.
在本章的其余部分,我们将探讨一些批处理的替代方法。在第11章中,我们将转向流处理,这可以被视为加速批处理的另一种方式。
Materialization of Intermediate State
As discussed previously, every MapReduce job is independent from every other job. The main contact points of a job with the rest of the world are its input and output directories on the distributed filesystem. If you want the output of one job to become the input to a second job, you need to configure the second job’s input directory to be the same as the first job’s output directory, and an external workflow scheduler must start the second job only once the first job has completed.
如前所述,每个MapReduce作业都与其他作业无关。作业与世界的主要联系点是分布式文件系统上的输入和输出目录。如果您希望一个作业的输出成为第二个作业的输入,则需要将第二个作业的输入目录配置为与第一个作业的输出目录相同,并且外部工作流调度程序必须在第一个作业完成后才能启动第二个作业。
This setup is reasonable if the output from the first job is a dataset that you want to publish widely within your organization. In that case, you need to be able to refer to it by name and reuse it as input to several different jobs (including jobs developed by other teams). Publishing data to a well-known location in the distributed filesystem allows loose coupling so that jobs don’t need to know who is producing their input or consuming their output (see “Separation of logic and wiring” ).
如果第一项工作的输出是您想要在组织内广泛发布的数据集,则此设置是合理的。在这种情况下,您需要能够通过名称引用它,并将其作为多个不同作业(包括其他团队开发的作业)的输入进行重用。将数据发布到分布式文件系统中的公认位置允许松散耦合,以便作业不需要知道谁正在生成其输入或使用其输出(参见“逻辑和接线的分离”)。
However, in many cases, you know that the output of one job is only ever used as input to one other job, which is maintained by the same team. In this case, the files on the distributed filesystem are simply intermediate state : a means of passing data from one job to the next. In the complex workflows used to build recommendation systems consisting of 50 or 100 MapReduce jobs [ 29 ], there is a lot of such intermediate state.
然而,在许多情况下,您知道一个作业的输出只会作为输入传递给同一团队维护的另一个作业。在这种情况下,分布式文件系统上的文件只是中间状态:将数据从一个作业传递到下一个的手段。在用于构建由50或100个MapReduce作业组成的推荐系统的复杂工作流程中,有许多这样的中间状态。
The process of writing out this intermediate state to files is called materialization . (We came across the term previously in the context of materialized views, in “Aggregation: Data Cubes and Materialized Views” . It means to eagerly compute the result of some operation and write it out, rather than computing it on demand when requested.)
将这个中间状态写入文件的过程称为物化。我们之前在“聚合:数据立方体和物化视图”中遇到过这个术语。它意味着急切地计算某个操作的结果并写出来,而不是在请求时再计算。
By contrast, the log analysis example at the beginning of the chapter used Unix pipes to connect the output of one command with the input of another. Pipes do not fully materialize the intermediate state, but instead stream the output to the input incrementally, using only a small in-memory buffer.
与此相反,本章开头的日志分析示例使用Unix管道将一个命令的输出与另一个命令的输入连接起来。管道不会完全实现中间状态,而是以增量方式将输出流式传输到输入,仅使用小的内存缓冲区。
MapReduce’s approach of fully materializing intermediate state has downsides compared to Unix pipes:
MapReduce完全物化中间状态的方法与Unix pipes相比有缺点。
-
A MapReduce job can only start when all tasks in the preceding jobs (that generate its inputs) have completed, whereas processes connected by a Unix pipe are started at the same time, with output being consumed as soon as it is produced. Skew or varying load on different machines means that a job often has a few straggler tasks that take much longer to complete than the others. Having to wait until all of the preceding job’s tasks have completed slows down the execution of the workflow as a whole.
MapReduce作业只有在生成其输入的所有前置作业中的所有任务都完成时才能开始,而通过Unix管道连接的进程则同时启动,输出在产生时立即被消耗。不同机器上的偏差或负载变化意味着作业经常有一些拖延任务要比其他任务完成时间更长。必须等待所有前置作业的任务完成会减慢整个工作流的执行速度。
-
Mappers are often redundant: they just read back the same file that was just written by a reducer, and prepare it for the next stage of partitioning and sorting. In many cases, the mapper code could be part of the previous reducer: if the reducer output was partitioned and sorted in the same way as mapper output, then reducers could be chained together directly, without interleaving with mapper stages.
映射器通常是冗余的:它们只是读取刚刚被减少器写入的相同文件,并为分区和排序的下一阶段进行准备。在许多情况下,映射器代码可以是前一个减少器的一部分:如果减少器输出的方式与映射器输出相同,则可以直接将减少器链在一起,而不需要与映射器阶段交错。
-
Storing intermediate state in a distributed filesystem means those files are replicated across several nodes, which is often overkill for such temporary data.
在分布式文件系统中存储中间状态意味着这些文件被复制到多个节点,这对于这种临时数据来说通常是过度的。
Dataflow engines
In order to fix these problems with MapReduce, several new execution engines for distributed batch computations were developed, the most well known of which are Spark [ 61 , 62 ], Tez [ 63 , 64 ], and Flink [ 65 , 66 ]. There are various differences in the way they are designed, but they have one thing in common: they handle an entire workflow as one job, rather than breaking it up into independent subjobs.
为了解决MapReduce存在的问题,出现了几个新的分布式批量计算执行引擎,其中最著名的是Spark[61,62]、Tez[63,64]和Flink[65,66]。它们的设计存在一定差异,但有一个共同点:将整个工作流程作为一个作业来处理,而不是将其分解为独立的子作业。
Since they explicitly model the flow of data through several processing stages, these systems are known as dataflow engines . Like MapReduce, they work by repeatedly calling a user-defined function to process one record at a time on a single thread. They parallelize work by partitioning inputs, and they copy the output of one function over the network to become the input to another function.
由于其明确地模拟了数据流通过多个处理阶段的过程,这些系统被称为数据流引擎。与MapReduce一样,它们通过重复调用用户定义的函数来处理单个线程上的一条记录。它们通过将输入分区来并行化工作,并将一个函数的输出复制到网络上,以成为另一个函数的输入。
Unlike in MapReduce, these functions need not take the strict roles of alternating map and reduce, but instead can be assembled in more flexible ways. We call these functions operators , and the dataflow engine provides several different options for connecting one operator’s output to another’s input:
与MapReduce不同,这些函数不需要采取严格的交替映射和归约角色,而是可以以更灵活的方式组装。我们将这些函数称为运算符,数据流引擎提供了连接一个运算符输出到另一个运算符输入的几种不同选项。
-
One option is to repartition and sort records by key, like in the shuffle stage of MapReduce (see “Distributed execution of MapReduce” ). This feature enables sort-merge joins and grouping in the same way as in MapReduce.
一种选项是重新分区并按键值排序记录,就像在MapReduce的shuffle阶段中一样(请参见“MapReduce的分布式执行”)。此功能使得排序合并连接和分组与MapReduce中的方式相同。
-
Another possibility is to take several inputs and to partition them in the same way, but skip the sorting. This saves effort on partitioned hash joins, where the partitioning of records is important but the order is irrelevant because building the hash table randomizes the order anyway.
另一个可能性是采用多个输入并以相同的方式对它们进行分区,但跳过排序。这可以节省在分区哈希连接上的工作,其中记录的分区很重要,但顺序是无关紧要的,因为构建哈希表会随机排列订单。
-
For broadcast hash joins, the same output from one operator can be sent to all partitions of the join operator.
在广播哈希连接中,可以将来自一个运算符的相同输出发送到连接运算符的所有分区。
This style of processing engine is based on research systems like Dryad [ 67 ] and Nephele [ 68 ], and it offers several advantages compared to the MapReduce model:
这种处理引擎的风格是基于Dryad [67]和Nephele [68]等研究系统,相比于MapReduce模型,它提供了几个优点:
-
Expensive work such as sorting need only be performed in places where it is actually required, rather than always happening by default between every map and reduce stage.
昂贵的工作(如排序)只需在实际需要的地方执行,而无需在每个映射和减少阶段之间默认发生。
-
There are no unnecessary map tasks, since the work done by a mapper can often be incorporated into the preceding reduce operator (because a mapper does not change the partitioning of a dataset).
由于Mapper不会更改数据集的分区,因此它完成的工作往往可以合并到先前的Reduce操作符中,因此不存在不必要的映射任务。
-
Because all joins and data dependencies in a workflow are explicitly declared, the scheduler has an overview of what data is required where, so it can make locality optimizations. For example, it can try to place the task that consumes some data on the same machine as the task that produces it, so that the data can be exchanged through a shared memory buffer rather than having to copy it over the network.
由于工作流程中的所有连接和数据依赖项都是明确声明的,因此调度程序可以了解需要在哪里使用数据,因此它可以进行本地化优化。例如,它可以尝试将消耗一些数据的任务放置在与生成它的任务相同的机器上,以便可以通过共享内存缓冲区交换数据,而无需复制它。
-
It is usually sufficient for intermediate state between operators to be kept in memory or written to local disk, which requires less I/O than writing it to HDFS (where it must be replicated to several machines and written to disk on each replica). MapReduce already uses this optimization for mapper output, but dataflow engines generalize the idea to all intermediate state.
通常情况下,操作员之间的中间状态只需保留在内存或写入本地磁盘即可,这比将其写入HDFS(必须复制到多台计算机并在每个副本上写入磁盘)需要更少的I/O。MapReduce已经在mapper输出中使用了这种优化,但数据流引擎则将这个想法推广到所有中间状态。
-
Operators can start executing as soon as their input is ready; there is no need to wait for the entire preceding stage to finish before the next one starts.
运算符可以在其输入准备就绪时立即开始执行;在上一个阶段完成之前,下一个阶段无需等待整个前置阶段的完成。
-
Existing Java Virtual Machine (JVM) processes can be reused to run new operators, reducing startup overheads compared to MapReduce (which launches a new JVM for each task).
现有的Java虚拟机(JVM)进程可以重用,以运行新的运算符,与MapReduce进程相比,不需要为每个任务启动新的JVM,从而减少启动开销。
You can use dataflow engines to implement the same computations as MapReduce workflows, and they usually execute significantly faster due to the optimizations described here. Since operators are a generalization of map and reduce, the same processing code can run on either execution engine: workflows implemented in Pig, Hive, or Cascading can be switched from MapReduce to Tez or Spark with a simple configuration change, without modifying code [ 64 ].
你可以使用数据流引擎来实现与MapReduce工作流相同的计算,并且它们通常由于此处描述的优化而执行速度显着更快。由于操作符是map和reduce的泛化,因此相同的处理代码可以在任何执行引擎上运行:在Pig、Hive或Cascading中实现的工作流可以通过简单的配置更改从MapReduce转换到Tez或Spark,而不需要修改代码[64]。
Tez is a fairly thin library that relies on the YARN shuffle service for the actual copying of data between nodes [ 58 ], whereas Spark and Flink are big frameworks that include their own network communication layer, scheduler, and user-facing APIs. We will discuss those high-level APIs shortly.
Tez是一个相对轻量级的库,它依赖于YARN shuffle服务来实际复制节点之间的数据[58],而Spark和Flink是大型框架,包括自己的网络通信层、调度器和面向用户的API。我们稍后将讨论这些高级API。
Fault tolerance
An advantage of fully materializing intermediate state to a distributed filesystem is that it is durable, which makes fault tolerance fairly easy in MapReduce: if a task fails, it can just be restarted on another machine and read the same input again from the filesystem.
将中间状态完全实现到分布式文件系统中的一个优势是它是持久化的,在MapReduce中容错非常容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统中再次读取相同的输入。
Spark, Flink, and Tez avoid writing intermediate state to HDFS, so they take a different approach to tolerating faults: if a machine fails and the intermediate state on that machine is lost, it is recomputed from other data that is still available (a prior intermediary stage if possible, or otherwise the original input data, which is normally on HDFS).
Spark、Flink和Tez不会将中间状态写入HDFS,因此它们采取不同的方法来容错:如果某台机器故障并且该机器上的中间状态丢失,则会从其他仍可用的数据重新计算该状态(尽可能是前一个中间阶段,否则就是通常在HDFS上的原始输入数据)。
To enable this recomputation, the framework must keep track of how a given piece of data was computed—which input partitions it used, and which operators were applied to it. Spark uses the resilient distributed dataset (RDD) abstraction for tracking the ancestry of data [ 61 ], while Flink checkpoints operator state, allowing it to resume running an operator that ran into a fault during its execution [ 66 ].
为了实现这种重新计算,该框架必须跟踪给定数据是如何计算的——它使用了哪些输入分区以及应用了哪些运算符。Spark使用弹性分布式数据集(RDD)抽象来跟踪数据的谱系,而Flink通过检查点操作状态来恢复运行因执行期间遇到错误的操作符。
When recomputing data, it is important to know whether the computation is deterministic : that is, given the same input data, do the operators always produce the same output? This question matters if some of the lost data has already been sent to downstream operators. If the operator is restarted and the recomputed data is not the same as the original lost data, it becomes very hard for downstream operators to resolve the contradictions between the old and new data. The solution in the case of nondeterministic operators is normally to kill the downstream operators as well, and run them again on the new data.
当重新计算数据时,了解计算是否具有确定性非常重要:也就是说,给定相同的输入数据,算子是否总是产生相同的输出?如果一些已丢失的数据已经发送到下游算子,则这个问题非常重要。如果算子被重新启动,并且重新计算的数据与原始丢失的数据不同,则下游算子将很难解决旧数据和新数据之间的矛盾。在非确定性算子的情况下,解决方案通常是将下游算子也杀死,并在新数据上再次运行它们。
In order to avoid such cascading faults, it is better to make operators deterministic. Note however that it is easy for nondeterministic behavior to accidentally creep in: for example, many programming languages do not guarantee any particular order when iterating over elements of a hash table, many probabilistic and statistical algorithms explicitly rely on using random numbers, and any use of the system clock or external data sources is nondeterministic. Such causes of nondeterminism need to be removed in order to reliably recover from faults, for example by generating pseudorandom numbers using a fixed seed.
为了避免此类级联故障,最好使操作员具有确定性。然而请注意,非确定性行为很容易意外混入:例如,许多编程语言在迭代哈希表元素时不保证任何特定顺序,许多概率和统计算法明确依赖使用随机数,任何使用系统时钟或外部数据源都是非确定性的。必须消除这些非确定性的原因,以可靠地从故障中恢复,例如通过使用固定种子生成伪随机数。
Recovering from faults by recomputing data is not always the right answer: if the intermediate data is much smaller than the source data, or if the computation is very CPU-intensive, it is probably cheaper to materialize the intermediate data to files than to recompute it.
通过重新计算数据来恢复故障并不总是正确的答案:如果中间数据比源数据要小得多,或者计算非常耗费 CPU,那么将中间数据实现为文件比重新计算要便宜些。
Discussion of materialization
Returning to the Unix analogy, we saw that MapReduce is like writing the output of each command to a temporary file, whereas dataflow engines look much more like Unix pipes. Flink especially is built around the idea of pipelined execution: that is, incrementally passing the output of an operator to other operators, and not waiting for the input to be complete before starting to process it.
回到Unix的比喻,我们发现MapReduce就像是将每个命令的输出写入临时文件,而数据流引擎更像是Unix的管道。特别是,Flink是建立在管道执行的思想基础上的:即逐步将一个运算符的输出传递给其他运算符,而不必等待输入完成才开始处理。
A sorting operation inevitably needs to consume its entire input before it can produce any output, because it’s possible that the very last input record is the one with the lowest key and thus needs to be the very first output record. Any operator that requires sorting will thus need to accumulate state, at least temporarily. But many other parts of a workflow can be executed in a pipelined manner.
排序操作无论如何都需要在产生任何输出之前消耗其完整的输入,因为最后一个输入的记录可能是具有最低键值的记录,并且需要成为第一个输出记录。因此,任何需要排序的操作都需要累积状态,至少是暂时的。但是,工作流程的许多其他部分可以以流水线方式执行。
When the job completes, its output needs to go somewhere durable so that users can find it and use it—most likely, it is written to the distributed filesystem again. Thus, when using a dataflow engine, materialized datasets on HDFS are still usually the inputs and the final outputs of a job. Like with MapReduce, the inputs are immutable and the output is completely replaced. The improvement over MapReduce is that you save yourself writing all the intermediate state to the filesystem as well.
任务完成后,其输出需要被存储到某个可靠的地方,以便用户可以找到并使用它——很可能,它会再次写入分布式文件系统。因此,在使用数据流引擎时,HDFS上的物化数据集通常仍然是作业的输入和最终输出。与MapReduce一样,输入是不可变的,输出完全被替换。相对于MapReduce的改进在于,您无需将所有中间状态都写入文件系统。
Graphs and Iterative Processing
In “Graph-Like Data Models” we discussed using graphs for modeling data, and using graph query languages to traverse the edges and vertices in a graph. The discussion in Chapter 2 was focused around OLTP-style use: quickly executing queries to find a small number of vertices matching certain criteria.
在“类图形数据模型”中,我们讨论了使用图形来建立数据模型,以及使用图形查询语言遍历图形中的边缘和顶点。第2章的讨论集中在OLTP样式的用途上:快速执行查询以找到满足某些条件的少量顶点。
It is also interesting to look at graphs in a batch processing context, where the goal is to perform some kind of offline processing or analysis on an entire graph. This need often arises in machine learning applications such as recommendation engines, or in ranking systems. For example, one of the most famous graph analysis algorithms is PageRank [ 69 ], which tries to estimate the popularity of a web page based on what other web pages link to it. It is used as part of the formula that determines the order in which web search engines present their results.
在批处理环境下查看图表也很有趣,因为目标是对整个图表执行某种离线处理或分析。这种需求在机器学习应用程序,如推荐引擎或排名系统中经常出现。例如,最著名的图表分析算法之一是PageRank[69],它试图根据其他网页链接到它的数量来估计网页的流行程度。它作为公式的一部分被用来确定Web搜索引擎呈现其结果的顺序。
Note
Dataflow engines like Spark, Flink, and Tez (see “Materialization of Intermediate State” ) typically arrange the operators in a job as a directed acyclic graph (DAG). This is not the same as graph processing: in dataflow engines, the flow of data from one operator to another is structured as a graph, while the data itself typically consists of relational-style tuples. In graph processing, the data itself has the form of a graph. Another unfortunate naming confusion!
像Spark、 Flink和Tez这样的数据流引擎(参见“中间状态的实体化”)通常将作业中的运算符排列成有向无环图(DAG)。这与图处理不同:在数据流引擎中,从一个运算符到另一个运算符的数据流被组织成图形,而数据本身通常包含关系型元组。在图形处理中,数据本身的形式就是图形。另一个不幸的命名混淆!
Many graph algorithms are expressed by traversing one edge at a time, joining one vertex with an adjacent vertex in order to propagate some information, and repeating until some condition is met—for example, until there are no more edges to follow, or until some metric converges. We saw an example in Figure 2-6 , which made a list of all the locations in North America contained in a database by repeatedly following edges indicating which location is within which other location (this kind of algorithm is called a transitive closure ).
许多图算法通过每次遍历一条边来表达,将一个顶点连接到相邻的顶点以传播一些信息,并重复这个过程直到满足某些条件——例如,直到没有更多的边可跟随,或者直到某些度量收敛。我们在图2-6中看到了一个例子,它通过反复跟随指示哪个位置在哪个其他位置中的边来列出包含在数据库中的所有北美位置的列表(这种算法称为传递闭包)。 许多图算法通过每次遍历一条边来表达,将一个顶点连接到相邻的顶点以传播一些信息,并重复这个过程直到满足某些条件——例如,直到没有更多的边可跟随,或者直到某些度量收敛。我们在图2-6中看到了一个例子,它通过反复跟随指示哪个位置在哪个其他位置中的边来列出包含在数据库中的所有北美位置的列表(这种算法称为传递闭包)。
It is possible to store a graph in a distributed filesystem (in files containing lists of vertices and edges), but this idea of “repeating until done” cannot be expressed in plain MapReduce, since it only performs a single pass over the data. This kind of algorithm is thus often implemented in an iterative style:
可以在分布式文件系统中存储图形(包含顶点和边列表的文件),但是“反复执行直到完成”的想法不能用普通MapReduce表达,因为它只对数据执行单个传递。因此,这种算法通常以迭代式实现。
-
An external scheduler runs a batch process to calculate one step of the algorithm.
一个外部调度程序运行批处理过程来计算算法的一步。
-
When the batch process completes, the scheduler checks whether it has finished (based on the completion condition—e.g., there are no more edges to follow, or the change compared to the last iteration is below some threshold).
当批处理过程完成后,调度程序会检查它是否已完成(基于完成条件,例如没有更多的边可跟随,或与上一次迭代相比的变化低于某个阈值)。
-
If it has not yet finished, the scheduler goes back to step 1 and runs another round of the batch process.
如果还没有完成,调度程序将返回第一步并运行另一轮批处理过程。
This approach works, but implementing it with MapReduce is often very inefficient, because MapReduce does not account for the iterative nature of the algorithm: it will always read the entire input dataset and produce a completely new output dataset, even if only a small part of the graph has changed compared to the last iteration.
这种方法可以奏效,但通过MapReduce实现通常非常低效,因为MapReduce并不考虑迭代算法的本质:即使只有图的一小部分与上一次迭代相比发生了变化,它也会读取整个输入数据集并生成完全新的输出数据集。
The Pregel processing model
As an optimization for batch processing graphs, the bulk synchronous parallel (BSP) model of computation [ 70 ] has become popular. Among others, it is implemented by Apache Giraph [ 37 ], Spark’s GraphX API, and Flink’s Gelly API [ 71 ]. It is also known as the Pregel model, as Google’s Pregel paper popularized this approach for processing graphs [ 72 ].
作为优化批处理图形的一种模型,批量同步并行(BSP)计算模型 [70] 变得流行。其中,它由 Apache Giraph [37]、Spark的GraphX API 和 Flink的Gelly API [71] 实现。它也被称为Pregel模型,因为谷歌的Pregel论文推广了这种处理图形的方法 [72]。
Recall that in MapReduce, mappers conceptually “send a message” to a particular call of the reducer because the framework collects together all the mapper outputs with the same key. A similar idea is behind Pregel: one vertex can “send a message” to another vertex, and typically those messages are sent along the edges in a graph.
在MapReduce中,映射器在概念上“发送消息”到特定的减速器调用,因为框架将所有具有相同键的映射器输出集合在一起。Pregel背后的一个类似的思想:一个顶点可以“发送消息”到另一个顶点,通常这些消息沿着图中的边发送。
In each iteration, a function is called for each vertex, passing it all the messages that were sent to it—much like a call to the reducer. The difference from MapReduce is that in the Pregel model, a vertex remembers its state in memory from one iteration to the next, so the function only needs to process new incoming messages. If no messages are being sent in some part of the graph, no work needs to be done.
在每次迭代中,对于每个顶点都会调用一个函数,并将所有发送到该顶点的消息传递给它,就像调用Reducer一样。与MapReduce的不同之处在于,在Pregel模型中,一个顶点会记住它的状态,从一个迭代到下一个迭代,因此函数只需要处理新来的传入消息。如果在图的某些部分没有发送消息,则无需进行任何工作。
It’s a bit similar to the actor model (see “Distributed actor frameworks” ), if you think of each vertex as an actor, except that vertex state and messages between vertices are fault-tolerant and durable, and communication proceeds in fixed rounds: at every iteration, the framework delivers all messages sent in the previous iteration. Actors normally have no such timing guarantee.
这有点类似于演员模型(参见“分布式演员框架”),如果您将每个顶点视为演员,除了顶点状态和顶点之间的消息是容错和可持续的,而且通信是在固定回合中进行的:在每次迭代中,框架传递了在上一次迭代中发送的所有消息。演员通常没有这样的时间保证。
Fault tolerance
The fact that vertices can only communicate by message passing (not by querying each other directly) helps improve the performance of Pregel jobs, since messages can be batched and there is less waiting for communication. The only waiting is between iterations: since the Pregel model guarantees that all messages sent in one iteration are delivered in the next iteration, the prior iteration must completely finish, and all of its messages must be copied over the network, before the next one can start.
节点只能通过消息传递来通信(而不能直接查询彼此),这有助于提高Pregel作业的性能,因为消息可以批处理,通信等待时间也较短。唯一需要等待的是在迭代之间:由于Pregel模型保证在一次迭代中发送的所有消息都会在下一次迭代中到达,因此前一次迭代必须完全结束,并且所有消息必须复制到网络中,然后才能开始下一次迭代。
Even though the underlying network may drop, duplicate, or arbitrarily delay messages (see “Unreliable Networks” ), Pregel implementations guarantee that messages are processed exactly once at their destination vertex in the following iteration. Like MapReduce, the framework transparently recovers from faults in order to simplify the programming model for algorithms on top of Pregel.
尽管底层网络可能会丢失、复制或任意延迟消息(参见“不可靠网络”),但 Pregel 的实现保证消息在下一次迭代时在其目标顶点上仅被处理一次。与 MapReduce 类似,该框架能够透明地从故障中恢复,以简化基于 Pregel 的算法的编程模型。
This fault tolerance is achieved by periodically checkpointing the state of all vertices at the end of an iteration—i.e., writing their full state to durable storage. If a node fails and its in-memory state is lost, the simplest solution is to roll back the entire graph computation to the last checkpoint and restart the computation. If the algorithm is deterministic and messages are logged, it is also possible to selectively recover only the partition that was lost (like we previously discussed for dataflow engines) [ 72 ].
这种容错性是通过在迭代结束时定期检查点所有顶点的状态,即将它们的完整状态写入可持久化存储中来实现的。如果一个节点失败并且其内存中的状态丢失,最简单的解决方案是将整个图计算回滚到最后一个检查点并重新启动计算。如果算法是确定性的且消息被记录,则也可以选择性地恢复只丢失的分区(就像我们之前讨论过的数据流引擎一样)[72]。
Parallel execution
A vertex does not need to know on which physical machine it is executing; when it sends messages to other vertices, it simply sends them to a vertex ID. It is up to the framework to partition the graph—i.e., to decide which vertex runs on which machine, and how to route messages over the network so that they end up in the right place.
一个顶点不需要知道它在哪台物理机器上执行;当它发送消息到其他顶点时,它只是将它们发送给顶点ID。框架负责对图进行分区——即决定哪个顶点在哪个机器上运行,并通过网络路由消息,使它们到达正确的位置。
Because the programming model deals with just one vertex at a time (sometimes called “thinking like a vertex”), the framework may partition the graph in arbitrary ways. Ideally it would be partitioned such that vertices are colocated on the same machine if they need to communicate a lot. However, finding such an optimized partitioning is hard—in practice, the graph is often simply partitioned by an arbitrarily assigned vertex ID, making no attempt to group related vertices together.
由于编程模型仅处理一个顶点(有时称为“像顶点一样思考”),因此框架可以以任意方式分区图形。理想情况下,应该将顶点分配在同一台机器上,以便进行大量通信。然而,寻找这样一个优化的分区很难-在实践中,图通常只是按任意分配的顶点ID进行分区,不尝试将相关的顶点分组在一起。
As a result, graph algorithms often have a lot of cross-machine communication overhead, and the intermediate state (messages sent between nodes) is often bigger than the original graph. The overhead of sending messages over the network can significantly slow down distributed graph algorithms.
因此,图形算法通常有很多机器间通信开销,中间状态(节点之间发送的消息)通常比原始图形更大。在网络上发送消息的开销会严重减慢分布式图形算法的速度。
For this reason, if your graph can fit in memory on a single computer, it’s quite likely that a single-machine (maybe even single-threaded) algorithm will outperform a distributed batch process [ 73 , 74 ]. Even if the graph is bigger than memory, it can fit on the disks of a single computer, single-machine processing using a framework such as GraphChi is a viable option [ 75 ]. If the graph is too big to fit on a single machine, a distributed approach such as Pregel is unavoidable; efficiently parallelizing graph algorithms is an area of ongoing research [ 76 ].
因此,如果你的图能够在一台计算机的内存中容纳,那么单机(甚至单线程)算法很可能会胜过分布式批处理[73,74]。 即使图形比内存大,它也可以适合单台计算机的磁盘,使用GraphChi这样的框架进行单机处理是一个可行的选择[75]。 如果图太大而无法适合单台机器,则需要采用分布式方法,如Pregel;高效并行化图形算法是正在进行的研究领域[76]。
High-Level APIs and Languages
Over the years since MapReduce first became popular, the execution engines for distributed batch processing have matured. By now, the infrastructure has become robust enough to store and process many petabytes of data on clusters of over 10,000 machines. As the problem of physically operating batch processes at such scale has been considered more or less solved, attention has turned to other areas: improving the programming model, improving the efficiency of processing, and broadening the set of problems that these technologies can solve.
自从MapReduce首次流行以来,分布式批处理的执行引擎得到了发展。到现在,基础设施已经变得足够强大,在超过10,000台机器的集群上存储和处理许多PB级别的数据。由于在这样的规模下物理操作批处理问题已基本解决,注意力已经转向其他领域:改进编程模型、提高处理效率和扩大这些技术可解决的问题范围。
As discussed previously, higher-level languages and APIs such as Hive, Pig, Cascading, and Crunch became popular because programming MapReduce jobs by hand is quite laborious. As Tez emerged, these high-level languages had the additional benefit of being able to move to the new dataflow execution engine without the need to rewrite job code. Spark and Flink also include their own high-level dataflow APIs, often taking inspiration from FlumeJava [ 34 ].
如前所述,较高级别的语言和API(如Hive、Pig、Cascading和Crunch)变得流行是因为手动编写MapReduce作业非常费力。随着Tez的出现,这些高级语言还具有无需重新编写作业代码即可移动到新的数据流执行引擎的附加好处。Spark和Flink还包括自己的高级数据流API,通常受到FlumeJava[34]的启发。
These dataflow APIs generally use relational-style building blocks to express a computation: joining datasets on the value of some field; grouping tuples by key; filtering by some condition; and aggregating tuples by counting, summing, or other functions. Internally, these operations are implemented using the various join and grouping algorithms that we discussed earlier in this chapter.
这些数据流API通常使用关系式构建块来表达计算:在某个字段的值上连接数据集;按键分组元组;按条件过滤;通过计数、求和或其他函数聚合元组。在内部,这些操作使用我们在本章早些时候讨论过的各种连接和分组算法实现。
Besides the obvious advantage of requiring less code, these high-level interfaces also allow interactive use, in which you write analysis code incrementally in a shell and run it frequently to observe what it is doing. This style of development is very helpful when exploring a dataset and experimenting with approaches for processing it. It is also reminiscent of the Unix philosophy, which we discussed in “The Unix Philosophy” .
除了需要更少的代码之外,这些高级接口还允许交互式使用,在其中您可以在shell中逐步编写分析代码,并经常运行它以观察它的执行情况。这种开发风格在探索数据集和尝试处理方法时非常有帮助。这也让人想起了Unix哲学,我们在“Unix哲学”一文中已经讨论过。
Moreover, these high-level interfaces not only make the humans using the system more productive, but they also improve the job execution efficiency at a machine level.
此外,这些高级接口不仅使使用系统的人更加高效,而且也提高了机器级别的工作执行效率。
The move toward declarative query languages
An advantage of specifying joins as relational operators, compared to spelling out the code that performs the join, is that the framework can analyze the properties of the join inputs and automatically decide which of the aforementioned join algorithms would be most suitable for the task at hand. Hive, Spark, and Flink have cost-based query optimizers that can do this, and even change the order of joins so that the amount of intermediate state is minimized [ 66 , 77 , 78 , 79 ].
将连接指定为关系运算符的优点,与拼写执行连接的代码相比,是框架可以分析连接输入的属性,并自动决定哪种连接算法最适合当前任务。Hive、Spark和Flink都拥有成本基于查询优化器,甚至可以改变连接顺序,以使中间状态的数量最小化。[66, 77, 78, 79]。
The choice of join algorithm can make a big difference to the performance of a batch job, and it is nice not to have to understand and remember all the various join algorithms we discussed in this chapter. This is possible if joins are specified in a declarative way: the application simply states which joins are required, and the query optimizer decides how they can best be executed. We previously came across this idea in “Query Languages for Data” .
连接算法的选择可以对批处理作业的性能产生很大影响,并且不需要了解和记住我们在本章中讨论的所有各种连接算法。 如果连接以声明方式指定,则可能:应用程序仅说明需要哪些连接,查询优化器将决定如何最好地执行它们。 我们之前在“数据查询语言”中遇到了这个想法。
However, in other ways, MapReduce and its dataflow successors are very different from the fully declarative query model of SQL. MapReduce was built around the idea of function callbacks: for each record or group of records, a user-defined function (the mapper or reducer) is called, and that function is free to call arbitrary code in order to decide what to output. This approach has the advantage that you can draw upon a large ecosystem of existing libraries to do things like parsing, natural language analysis, image analysis, and running numerical or statistical algorithms.
然而,在其他方面,MapReduce及其数据流后继者与SQL完全声明式查询模型非常不同。MapReduce是围绕函数回调的想法构建的:对于每个记录或记录组,都会调用用户定义的功能(映射器或减少器),并且该功能可以自由调用任意代码以决定要输出什么。这种方法的优点是,您可以利用大量现有库的生态系统来完成诸如解析、自然语言分析、图像分析以及运行数字或统计算法之类的任务。
The freedom to easily run arbitrary code is what has long distinguished batch processing systems of MapReduce heritage from MPP databases (see “Comparing Hadoop to Distributed Databases” ); although databases have facilities for writing user-defined functions, they are often cumbersome to use and not well integrated with the package managers and dependency management systems that are widely used in most programming languages (such as Maven for Java, npm for JavaScript, and Rubygems for Ruby).
批处理系统和MPP数据库一直以来最大的区别在于能够轻松运行任意代码。尽管数据库也有撰写用户定义函数的功能,但往往使用麻烦,并且不太与大多数编程语言中广泛使用的包管理器和依赖管理系统(例如Java的Maven、JavaScript的npm和Ruby的Rubygems)相结合。
However, dataflow engines have found that there are also advantages to incorporating more declarative features in areas besides joins. For example, if a callback function contains only a simple filtering condition, or it just selects some fields from a record, then there is significant CPU overhead in calling the function on every record. If such simple filtering and mapping operations are expressed in a declarative way, the query optimizer can take advantage of column-oriented storage layouts (see “Column-Oriented Storage” ) and read only the required columns from disk. Hive, Spark DataFrames, and Impala also use vectorized execution (see “Memory bandwidth and vectorized processing” ): iterating over data in a tight inner loop that is friendly to CPU caches, and avoiding function calls. Spark generates JVM bytecode [ 79 ] and Impala uses LLVM to generate native code for these inner loops [ 41 ].
然而,数据流引擎发现,除连接外,在其他领域引入更多声明性功能也有优势。例如,如果回调函数只包含简单的过滤条件,或仅从记录中选择某些字段,则在每个记录上调用函数存在显着的CPU开销。如果这些简单的过滤和映射操作以声明性的方式表示,则查询优化器可以利用面向列的存储布局(请参见“面向列的存储”),并从磁盘中仅读取所需的列。Hive、Spark DataFrames和Impala也使用矢量化执行(请参见“内存带宽和矢量化处理”):在一个紧密的内部循环中迭代数据,这对CPU缓存友好,并避免函数调用。Spark生成JVM字节码[79],而Impala使用LLVM为这些内部循环生成本地代码[41]。
By incorporating declarative aspects in their high-level APIs, and having query optimizers that can take advantage of them during execution, batch processing frameworks begin to look more like MPP databases (and can achieve comparable performance). At the same time, by having the extensibility of being able to run arbitrary code and read data in arbitrary formats, they retain their flexibility advantage.
通过在高级API中添加声明性方面,并拥有可利用它们的查询优化器,批处理框架开始看起来更像MPP数据库(并可实现可比较的性能)。同时,通过具有运行任意代码和以任意格式读取数据的可扩展性,它们保留了灵活性的优势。
Specialization for different domains
While the extensibility of being able to run arbitrary code is useful, there are also many common cases where standard processing patterns keep reoccurring, and so it is worth having reusable implementations of the common building blocks. Traditionally, MPP databases have served the needs of business intelligence analysts and business reporting, but that is just one among many domains in which batch processing is used.
尽管能够运行任意代码的可扩展性很有用,但也有许多常见情况下采用标准处理模式重复出现,因此值得拥有可重复使用的常见构建块的实现。传统上,MPP数据库已经满足了商业智能分析师和商业报告的需求,但这仅仅是批处理被用于许多领域中之一。
Another domain of increasing importance is statistical and numerical algorithms, which are needed for machine learning applications such as classification and recommendation systems. Reusable implementations are emerging: for example, Mahout implements various algorithms for machine learning on top of MapReduce, Spark, and Flink, while MADlib implements similar functionality inside a relational MPP database (Apache HAWQ) [ 54 ].
另一个越来越重要的领域是统计和数字算法,这些算法是机器学习应用(如分类和推荐系统)所需的。可重复使用的实现正在出现:例如,Mahout在MapReduce、Spark和Flink之上实现了各种机器学习算法,而MADlib在关系型MPP数据库(Apache HAWQ)内实现了类似的功能。
Also useful are spatial algorithms such as k-nearest neighbors [ 80 ], which searches for items that are close to a given item in some multi-dimensional space—a kind of similarity search. Approximate search is also important for genome analysis algorithms, which need to find strings that are similar but not identical [ 81 ].
同样有用的是空间算法,例如k-近邻[80],它会在多维空间中搜索与给定项相近的项 - 一种相似性搜索。近似搜索对于基因组分析算法也非常重要,需要找到相似但不完全相同的字符串[81]。
Batch processing engines are being used for distributed execution of algorithms from an increasingly wide range of domains. As batch processing systems gain built-in functionality and high-level declarative operators, and as MPP databases become more programmable and flexible, the two are beginning to look more alike: in the end, they are all just systems for storing and processing data.
批量处理引擎被用于从越来越广泛的领域分布式执行算法。随着批量处理系统获得内置功能和高级声明操作符,以及MPP数据库变得更加可编程和灵活,两者开始看起来更相似:最终,它们都只是存储和处理数据的系统。
Summary
In this chapter we explored the topic of batch processing. We started by looking at Unix tools such
as
awk
,
grep
, and
sort
, and we saw how the design philosophy of those tools is carried forward
into MapReduce and more recent dataflow engines. Some of those design principles are that inputs are
immutable, outputs are intended to become the input to another (as yet unknown) program, and complex
problems are solved by composing small tools that “do one thing well.”
在这一章中,我们探讨了批处理的主题。我们从查看Unix工具如awk、grep和sort开始,看到了这些工具的设计理念如何被延续到MapReduce和更近期的数据流引擎中。其中一些设计原则是输入是不可变的,输出旨在成为另一个(尚未知晓)程序的输入,而复杂问题则通过组合“做一件事情很好”的小工具来解决。
In the Unix world, the uniform interface that allows one program to be composed with another is files and pipes; in MapReduce, that interface is a distributed filesystem. We saw that dataflow engines add their own pipe-like data transport mechanisms to avoid materializing intermediate state to the distributed filesystem, but the initial input and final output of a job is still usually HDFS.
在Unix系统中,允许一个程序与另一个程序组合的统一接口是文件和管道;在MapReduce中,该接口是分布式文件系统。我们看到,数据流引擎添加了自己的类似管道的数据传输机制,以避免将中间状态实现到分布式文件系统中,但作业的初始输入和最终输出通常仍然是HDFS。
The two main problems that distributed batch processing frameworks need to solve are:
分布式批处理框架需要解决的两个主要问题是:
- Partitioning
-
In MapReduce, mappers are partitioned according to input file blocks. The output of mappers is repartitioned, sorted, and merged into a configurable number of reducer partitions. The purpose of this process is to bring all the related data—e.g., all the records with the same key—together in the same place.
在MapReduce中,mappers按照输入文件块进行分区。 Mapper的输出被重新分区,排序并合并为可配置数量的reducer分区。此过程的目的是将所有相关数据(例如具有相同键的所有记录)聚集在同一位置。
Post-MapReduce dataflow engines try to avoid sorting unless it is required, but they otherwise take a broadly similar approach to partitioning.
后MapReduce数据流引擎尝试避免排序,除非需要,但它们在分区方面采取了类似的方法。
- Fault tolerance
-
MapReduce frequently writes to disk, which makes it easy to recover from an individual failed task without restarting the entire job but slows down execution in the failure-free case. Dataflow engines perform less materialization of intermediate state and keep more in memory, which means that they need to recompute more data if a node fails. Deterministic operators reduce the amount of data that needs to be recomputed.
MapReduce 经常会写入磁盘,这使得在单个任务失败时很容易从中恢复,而无需重新启动整个作业,但在没有故障的情况下会降低执行速度。数据流引擎执行的中间状态材料化较少,并将更多的状态保留在内存中,这意味着如果节点失败,它们需要重新计算更多的数据。确定性操作符减少了需要重新计算的数据量。
We discussed several join algorithms for MapReduce, most of which are also internally used in MPP databases and dataflow engines. They also provide a good illustration of how partitioned algorithms work:
我们讨论了几种 MapReduce 的连接算法,其中大部分也在 MPP 数据库和数据流引擎中内部使用。它们也很好地说明了分区算法的工作原理。
- Sort-merge joins
-
Each of the inputs being joined goes through a mapper that extracts the join key. By partitioning, sorting, and merging, all the records with the same key end up going to the same call of the reducer. This function can then output the joined records.
每个连接的输入都经过映射器处理,提取连接键。通过分区,排序和合并,所有相同键的记录最终被发送到相同的reducer中。这个函数可以输出连接的记录。
- Broadcast hash joins
-
One of the two join inputs is small, so it is not partitioned and it can be entirely loaded into a hash table. Thus, you can start a mapper for each partition of the large join input, load the hash table for the small input into each mapper, and then scan over the large input one record at a time, querying the hash table for each record.
两个连接输入中的一个较小,因此它不会被分区,可以完全加载到哈希表中。因此,您可以为大型连接输入的每个分区启动一个映射器,将小输入的哈希表加载到每个映射器中,然后逐个扫描大型输入记录,查询每个记录的哈希表。
- Partitioned hash joins
-
If the two join inputs are partitioned in the same way (using the same key, same hash function, and same number of partitions), then the hash table approach can be used independently for each partition.
如果两个连接输入以相同的方式进行分区(使用相同的键,相同的哈希函数和相同数量的分区),则哈希表方法可以独立地用于每个分区。
Distributed batch processing engines have a deliberately restricted programming model: callback functions (such as mappers and reducers) are assumed to be stateless and to have no externally visible side effects besides their designated output. This restriction allows the framework to hide some of the hard distributed systems problems behind its abstraction: in the face of crashes and network issues, tasks can be retried safely, and the output from any failed tasks is discarded. If several tasks for a partition succeed, only one of them actually makes its output visible.
分布式批处理引擎采用有意限制的编程模型: 回调函数 (例如 mappers 和 reducers) 被假定为无状态的,并且除了它们指定的输出之外,没有任何外部可见的副作用。 这种限制使框架能够将一些艰难的分布式系统问题隐藏在其抽象后面: 在遇到崩溃和网络问题时,任务可以安全地重试,并且任何失败任务的输出都会被丢弃。 如果一个分区的多个任务成功,只有其中一个实际上会使其输出可见。
Thanks to the framework, your code in a batch processing job does not need to worry about implementing fault-tolerance mechanisms: the framework can guarantee that the final output of a job is the same as if no faults had occurred, even though in reality various tasks perhaps had to be retried. These reliable semantics are much stronger than what you usually have in online services that handle user requests and that write to databases as a side effect of processing a request.
由于框架,批处理作业中的代码无需担心实现容错机制:框架可以保证作业的最终输出与没有发生故障时相同,即使实际上可能需要重试各种任务。这些可靠的语义比通常处理用户请求并以处理请求的副作用写入数据库的在线服务强得多。
The distinguishing feature of a batch processing job is that it reads some input data and produces some output data, without modifying the input—in other words, the output is derived from the input. Crucially, the input data is bounded : it has a known, fixed size (for example, it consists of a set of log files at some point in time, or a snapshot of a database’s contents). Because it is bounded, a job knows when it has finished reading the entire input, and so a job eventually completes when it is done.
批处理作业的特点是读取一些输入数据并生成一些输出数据,而不修改输入数据,换句话说,输出是由输入得出的。关键的是,输入数据是有界限的:具有已知的、固定的大小(例如,在某个时刻由一组日志文件组成,或者是数据库内容的快照)。由于它是有界的,作业知道何时完成读取整个输入,因此作业完成时才最终完成。
In the next chapter, we will turn to stream processing, in which the input is unbounded —that is, you still have a job, but its inputs are never-ending streams of data. In this case, a job is never complete, because at any time there may still be more work coming in. We shall see that stream and batch processing are similar in some respects, but the assumption of unbounded streams also changes a lot about how we build systems.
在下一章中,我们将转向流处理,其中输入是无界的 - 也就是说,您仍有一份工作,但其输入是永无止境的数据流。在这种情况下,工作永远不会完成,因为随时可能还有更多的工作进来。我们将看到,在某些方面,流和批处理是相似的,但是假设为无界流也改变了我们构建系统的方式。
Footnotes
i
Some people love to point out that
cat
is unnecessary here, as the input file could be given directly as an argument to
awk
. However, the linear pipeline is more apparent when written like this.
有些人喜欢指出,猫在这里是不必要的,因为输入文件可以直接作为参数提供给awk。然而,这样写线性管道更明显。
ii Another example of a uniform interface is URLs and HTTP, the foundations of the web. A URL identifies a particular thing (resource) on a website, and you can link to any URL from any other website. A user with a web browser can thus seamlessly jump between websites by following links, even though the servers may be operated by entirely unrelated organizations. This principle seems obvious today, but it was a key insight in making the web the success that it is today. Prior systems were not so uniform: for example, in the era of bulletin board systems (BBSs), each system had its own phone number and baud rate configuration. A reference from one BBS to another would have to be in the form of a phone number and modem settings; the user would have to hang up, dial the other BBS, and then manually find the information they were looking for. It wasn’t possible to link directly to some piece of content inside another BBS.
另一个统一接口的例子是URL和HTTP,是Web的基础。 URL识别网站上的特定事物(资源),您可以从任何其他网站链接到任何URL。通过遵循链接,带有Web浏览器的用户可以轻松地在网站之间跳转,即使Web服务器可以由完全不相关的组织操作。这个原则今天似乎很明显,但它是使Web取得今天的成功的关键洞察力。以前的系统不是这么统一:例如,在公告板系统(BBS)的时代,每个系统都有自己的电话号码和波特率配置。从一个BBS到另一个BBS的参考必须以电话号码和调制解调器设置的形式出现;用户必须挂断电话,拨打另一个BBS,然后手动查找他们正在寻找的信息。无法直接链接到另一个BBS中的某些内容。
iii
Except by
using a separate tool, such as
netcat
or
curl
. Unix started out trying to
represent everything as files, but the BSD sockets API deviated from that convention
[
17
]. The research operating systems
Plan
9
and
Inferno
are more consistent in their use of files: they represent a TCP
connection as a file in
/net/tcp
[
18
].
除非使用其他工具,如netcat或curl,否则无法实现。Unix最初试图将所有内容表示为文件,但BSD套接字API偏离了这个约定[17]。研究操作系统Plan 9和Inferno在使用文件上更加一致:它们将TCP连接表示为 /net/tcp中的文件[18]。
iv One difference is that with HDFS, computing tasks can be scheduled to run on the machine that stores a copy of a particular file, whereas object stores usually keep storage and computation separate. Reading from a local disk has a performance advantage if network bandwidth is a bottleneck. Note however that if erasure coding is used, the locality advantage is lost, because the data from several machines must be combined in order to reconstitute the original file [ 20 ].
一个区别是,使用HDFS可以将计算任务安排在存储特定文件副本的机器上运行,而对象存储通常将存储和计算分开。 如果网络带宽成为瓶颈,则从本地磁盘读取具有性能优势。但是请注意,如果使用纠删码,则会失去局部化优势,因为必须将来自多台计算机的数据组合在一起才能重新组合原始文件。
v The joins we talk about in this book are generally equi-joins , the most common type of join, in which a record is associated with other records that have an identical value in a particular field (such as an ID). Some databases support more general types of joins, for example using a less-than operator instead of an equality operator, but we do not have space to cover them here.
本书中讨论的连接通常是等值连接,这是最常见的连接类型,其中一条记录与在特定字段(如ID)中具有相同值的其他记录相关联。某些数据库支持更一般的连接类型,例如使用小于运算符而不是等于运算符,但我们没有空间在此处涵盖它们。
vi This example assumes that there is exactly one entry for each key in the hash table, which is probably true with a user database (a user ID uniquely identifies a user). In general, the hash table may need to contain several entries with the same key, and the join operator will output all matches for a key.
此示例假定哈希表中每个键都有唯一一个条目,这在用户数据库中可能是正确的(用户ID唯一标识用户)。一般来说,哈希表可能需要包含多个具有相同键的条目,连接操作符将输出键的所有匹配项。
References
[ 1 ] Jeffrey Dean and Sanjay Ghemawat: “ MapReduce: Simplified Data Processing on Large Clusters ,” at 6th USENIX Symposium on Operating System Design and Implementation (OSDI), December 2004.
[1] Jeffrey Dean 和 Sanjay Ghemawat:“MapReduce:大型集群上的简化数据处理”,于2004年12月的第六届USENIX操作系统设计和实现研讨会(OSDI)上。
[ 2 ] Joel Spolsky: “ The Perils of JavaSchools ,” joelonsoftware.com , December 25, 2005.
[2] Joel Spolsky: “Java 学校的危险”,来源于 joelonsoftware.com,2005 年 12 月 25 日。
[ 3 ] Shivnath Babu and Herodotos Herodotou: “ Massively Parallel Databases and MapReduce Systems ,” Foundations and Trends in Databases , volume 5, number 1, pages 1–104, November 2013. doi:10.1561/1900000036
"[3] Shivnath Babu and Herodotos Herodotou: “Massively Parallel Databases and MapReduce Systems,” Foundations and Trends in Databases, volume 5, number 1, pages 1–104, November 2013. doi:10.1561/1900000036" [3] Shivnath Babu和Herodotos Herodotou:“高度并行的数据库和MapReduce系统”,《数据库基础与趋势》第5卷,第1期,第1-104页,2013年11月。doi:10.1561/1900000036。
[ 4 ] David J. DeWitt and Michael Stonebraker: “ MapReduce: A Major Step Backwards ,” originally published at databasecolumn.vertica.com , January 17, 2008.
[4] David J. DeWitt 和 Michael Stonebraker: “MapReduce:一个重大的倒退”,最初发表于 databasecolumn.vertica.com,2008 年 1 月 17 日。 [4] David J. DeWitt 和 Michael Stonebraker: “MapReduce:一个重大的倒退”,最初发表于 databasecolumn.vertica.com,2008 年 1 月 17 日。
[ 5 ] Henry Robinson: “ The Elephant Was a Trojan Horse: On the Death of Map-Reduce at Google ,” the-paper-trail.org , June 25, 2014.
[5] 亨利·罗宾逊:“大象是特洛伊木马:论谷歌Map-Reduce之死”,the-paper-trail.org,2014年6月25日。
[ 6 ] “ The Hollerith Machine ,” United States Census Bureau, census.gov .
[6] “洪勒里特机器”,美国人口普查局,census.gov。
[ 7 ] “ IBM 82, 83, and 84 Sorters Reference Manual ,” Edition A24-1034-1, International Business Machines Corporation, July 1962.
[7] “IBM 82、83和84分类机参考手册”,A24-1034-1版,国际商业机器公司,1962年7月。
[ 8 ] Adam Drake: “ Command-Line Tools Can Be 235x Faster than Your Hadoop Cluster ,” aadrake.com , January 25, 2014.
“命令行工具可比你的Hadoop集群快235倍”, Adam Drake,aadrake.com,2014年1月25日。
[ 9 ] “ GNU Coreutils 8.23 Documentation ,” Free Software Foundation, Inc., 2014.
"[9] GNU Coreutils 8.23文档,自由软件基金会,2014年."
[ 10 ] Martin Kleppmann: “ Kafka, Samza, and the Unix Philosophy of Distributed Data ,” martin.kleppmann.com , August 5, 2015.
[10] Martin Kleppmann:“Kafka、Samza和分布式数据的Unix哲学”, martin.kleppmann.com, 2015年8月5日。
[ 11 ] Doug McIlroy: Internal Bell Labs memo , October 1964. Cited in: Dennis M. Richie: “ Advice from Doug McIlroy ,” cm.bell-labs.com .
[11] 道格·麦克罗伊(Doug McIlroy): 1964年10月,AT&T贝尔实验室内部备忘录。引用自:丹尼斯·里奇(Dennis M. Richie): “道格·麦克罗伊的建议”, cm.bell-labs.com。
[ 12 ] M. D. McIlroy, E. N. Pinson, and B. A. Tague: “ UNIX Time-Sharing System: Foreword ,” The Bell System Technical Journal , volume 57, number 6, pages 1899–1904, July 1978.
【12】M.D. McIlroy,E.N. Pinson 和 B.A. Tague: “UNIX 分时系统:前言,” 贝尔系统技术杂志,第57卷,第6 期,页码1899–1904, 1978 年7月。
[ 13 ] Eric S. Raymond: The Art of UNIX Programming . Addison-Wesley, 2003. ISBN: 978-0-13-142901-7
[13] Eric S. Raymond: UNIX编程艺术。Addison-Wesley出版社,2003年。ISBN: 978-0-13-142901-7。
[ 14 ] Ronald Duncan: “ Text File Formats – ASCII Delimited Text – Not CSV or TAB Delimited Text ,” ronaldduncan.wordpress.com , October 31, 2009.
[14]罗纳德·邓肯: “文本文件格式-ASCII分隔文本-不是CSV或Tab分隔文本,” ronaldduncan.wordpress.com,2009年10月31日。
[ 15 ] Alan Kay: “ Is ‘Software Engineering’ an Oxymoron? ,” tinlizzie.org .
艾伦·凯[15]: “软件工程是一个矛盾词吗?”(来源于tinlizzie.org)
[ 16 ] Martin Fowler: “ InversionOfControl ,” martinfowler.com , June 26, 2005.
[16] Martin Fowler: “控制反转,” martinfowler.com,2005年6月26日。
[ 17 ] Daniel J. Bernstein: “ Two File Descriptors for Sockets ,” cr.yp.to .
丹尼尔·J·伯恩斯坦:「套接字使用两个文件描述符」,cr.yp.to。
[ 18 ] Rob Pike and Dennis M. Ritchie: “ The Styx Architecture for Distributed Systems ,” Bell Labs Technical Journal , volume 4, number 2, pages 146–152, April 1999.
"[18] Rob Pike和Dennis M. Ritchie: “分布式系统的Styx架构”,贝尔实验室技术杂志,卷4,号码2,页码146-152,1999年4月。"
[ 19 ] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung: “ The Google File System ,” at 19th ACM Symposium on Operating Systems Principles (SOSP), October 2003. doi:10.1145/945445.945450
[19] Sanjay Ghemawat, Howard Gobioff, 和Shun-Tak Leung:“谷歌文件系统”,发表于2003年10月的第19届ACM操作系统原理研讨会(SOSP)。doi:10.1145/945445.945450。
[ 20 ] Michael Ovsiannikov, Silvius Rus, Damian Reeves, et al.: “ The Quantcast File System ,” Proceedings of the VLDB Endowment , volume 6, number 11, pages 1092–1101, August 2013. doi:10.14778/2536222.2536234
[20] Michael Ovsiannikov, Silvius Rus, Damian Reeves等人: “Quantcast文件系统”,《VLDB终极论文集》第6卷,第11号,页面1092-1101,2013年8月。 doi:10.14778/2536222.2536234。
[ 21 ] “ OpenStack Swift 2.6.1 Developer Documentation ,” OpenStack Foundation, docs.openstack.org , March 2016.
[21] "OpenStack Swift 2.6.1开发者文档”,OpenStack基金会,docs.openstack.org,2016年3月。
[ 22 ] Zhe Zhang, Andrew Wang, Kai Zheng, et al.: “ Introduction to HDFS Erasure Coding in Apache Hadoop ,” blog.cloudera.com , September 23, 2015.
"[22] 张哲,安德鲁·王,郑凯等人:“Apache Hadoop中HDFS纠删码入门”,blog.cloudera.com,2015年9月23日发布。"
[ 23 ] Peter Cnudde: “ Hadoop Turns 10 ,” yahoohadoop.tumblr.com , February 5, 2016.
"Peter Cnudde:“Hadoop迈入第10个年头”,yahoohadoop.tumblr.com,2016年2月5日。"
[ 24 ] Eric Baldeschwieler: “ Thinking About the HDFS vs. Other Storage Technologies ,” hortonworks.com , July 25, 2012.
"[24] Eric Baldeschwieler: “关于HDFS与其他存储技术的思考”,hortonworks.com,2012年7月25日。"
[ 25 ] Brendan Gregg: “ Manta: Unix Meets Map Reduce ,” dtrace.org , June 25, 2013.
“Manta: Unix Meets Map Reduce”,布伦丹·格雷格, dtrace.org,2013年6月25日。
[ 26 ] Tom White: Hadoop: The Definitive Guide , 4th edition. O’Reilly Media, 2015. ISBN: 978-1-491-90163-2
[26] Tom White: Hadoop: 完全指南, 第四版. O'Reilly 媒体, 2015. ISBN: 978-1-491-90163-2
[ 27 ] Jim N. Gray: “ Distributed Computing Economics ,” Microsoft Research Tech Report MSR-TR-2003-24, March 2003.
[27]吉姆·格雷: 《分布式计算经济学》,微软研究技术报告MSR-TR-2003-24,2003年3月。
[ 28 ] Márton Trencséni: “ Luigi vs Airflow vs Pinball ,” bytepawn.com , February 6, 2016.
"[28] Márton Trencséni: “Luigi vs Airflow vs Pinball,” bytepawn.com, February 6, 2016." "[28] Márton Trencséni:“Luigi vs Airflow vs Pinball,”bytepawn.com,2016年2月6日。"
[ 29 ] Roshan Sumbaly, Jay Kreps, and Sam Shah: “ The ‘Big Data’ Ecosystem at LinkedIn ,” at ACM International Conference on Management of Data (SIGMOD), July 2013. doi:10.1145/2463676.2463707
【29】Roshan Sumbaly、Jay Kreps和Sam Shah:“领英的‘大数据’生态系统”,ACM数据管理国际会议(SIGMOD),2013年7月。doi:10.1145/2463676.2463707。
[ 30 ] Alan F. Gates, Olga Natkovich, Shubham Chopra, et al.: “ Building a High-Level Dataflow System on Top of Map-Reduce: The Pig Experience ,” at 35th International Conference on Very Large Data Bases (VLDB), August 2009.
[30] Alan F. Gates, Olga Natkovich, Shubham Chopra等: “在Map-Reduce之上构建一个高级数据流系统:Pig体验”,刊于2009年8月的第35届国际超大型数据库会议(VLDB)。
[ 31 ] Ashish Thusoo, Joydeep Sen Sarma, Namit Jain, et al.: “ Hive – A Petabyte Scale Data Warehouse Using Hadoop ,” at 26th IEEE International Conference on Data Engineering (ICDE), March 2010. doi:10.1109/ICDE.2010.5447738
[31] Ashish Thusoo、Joydeep Sen Sarma、Namit Jain 等人: “Hive – 使用 Hadoop 的 PB 级数据仓库”,收录于 26 届 IEEE 国际数据工程会议 (ICDE),2010年3月。 doi:10.1109/ICDE.2010.5447738。
[ 32 ] “ Cascading 3.0 User Guide ,” Concurrent, Inc., docs.cascading.org , January 2016.
【32】“Cascading 3.0 用户指南”,Concurrent, Inc.,docs.cascading.org,2016年1月。 “Cascading 3.0 用户指南”,Concurrent, Inc.,docs.cascading.org,2016年1月。
[ 33 ] “ Apache Crunch User Guide ,” Apache Software Foundation, crunch.apache.org .
[33] “Apache Crunch 用户指南”,Apache软件基金会,crunch.apache.org。
[ 34 ] Craig Chambers, Ashish Raniwala, Frances Perry, et al.: “ FlumeJava: Easy, Efficient Data-Parallel Pipelines ,” at 31st ACM SIGPLAN Conference on Programming Language Design and Implementation (PLDI), June 2010. doi:10.1145/1806596.1806638
[34] Craig Chambers(克雷格·钱伯斯),Ashish Raniwala(阿希斯·拉尼瓦拉),Frances Perry(弗朗西斯·佩里)等人:“FlumeJava:易于使用,高效的数据并行管道”,出版于2010年6月的第31届ACM SIGPLAN编程语言设计与实现会议(PLDI)。doi:10.1145/1806596.1806638。
[ 35 ] Jay Kreps: “ Why Local State is a Fundamental Primitive in Stream Processing ,” oreilly.com , July 31, 2014.
Jay Kreps:「为何本地状态是流处理中的基本原语」,oreilly.com,2014 年 7 月 31 日。
[ 36 ] Martin Kleppmann: “ Rethinking Caching in Web Apps ,” martin.kleppmann.com , October 1, 2012.
[36] Martin Kleppmann:“重新思考Web应用程序中的缓存”,martin.kleppmann.com,2012年10月1日。
[ 37 ] Mark Grover, Ted Malaska, Jonathan Seidman, and Gwen Shapira: Hadoop Application Architectures . O’Reilly Media, 2015. ISBN: 978-1-491-90004-8
马克·格罗弗(Mark Grover)、泰德·马拉斯卡(Ted Malaska)、乔纳森·塞德曼(Jonathan Seidman)和格温·夏皮拉(Gwen Shapira):《Hadoop 应用架构》,O’Reilly Media,2015年,ISBN: 978-1-491-90004-8
[ 38 ] Philippe Ajoux, Nathan Bronson, Sanjeev Kumar, et al.: “ Challenges to Adopting Stronger Consistency at Scale ,” at 15th USENIX Workshop on Hot Topics in Operating Systems (HotOS), May 2015.
[38] Philippe Ajoux, Nathan Bronson, Sanjeev Kumar等: "在规模上采用更强的一致性所面临的挑战",于2015年5月在第15届USENIX操作系统热门主题研讨会(HotOS)上发表。
[ 39 ] Sriranjan Manjunath: “ Skewed Join ,” wiki.apache.org , 2009.
"Skewed Join," wiki.apache.org,2009. "倾斜连接",wiki.apache.org,2009年。
[ 40 ] David J. DeWitt, Jeffrey F. Naughton, Donovan A. Schneider, and S. Seshadri: “ Practical Skew Handling in Parallel Joins ,” at 18th International Conference on Very Large Data Bases (VLDB), August 1992.
[40] David J. DeWitt, Jeffrey F. Naughton, Donovan A. Schneider和S. Seshadri:「並行聯接中實用的偏斜處理」,於1992年8月第18屆非常大資料庫國際會議(VLDB)上發表。
[ 41 ] Marcel Kornacker, Alexander Behm, Victor Bittorf, et al.: “ Impala: A Modern, Open-Source SQL Engine for Hadoop ,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.
[41] Marcel Kornacker,Alexander Behm,Victor Bittorf等人:「Impala:Hadoop 的现代开源 SQL 引擎」,发表于第七届创新数据系统研究双年会(CIDR),2015年1月。
[ 42 ] Matthieu Monsch: “ Open-Sourcing PalDB, a Lightweight Companion for Storing Side Data ,” engineering.linkedin.com , October 26, 2015.
[42] Matthieu Monsch:“开源 PalDB,一个轻量级的存储辅助工具”,工程.linkedin.com,2015年10月26日。
[ 43 ] Daniel Peng and Frank Dabek: “ Large-Scale Incremental Processing Using Distributed Transactions and Notifications ,” at 9th USENIX conference on Operating Systems Design and Implementation (OSDI), October 2010.
[43] 丹尼尔·彭和弗兰克·达贝克:「使用分布式事务和通知进行大规模增量处理」,发表于2010年10月第9届USENIX操作系统设计和实现会议(OSDI)。
[ 44 ] “ “Cloudera Search User Guide,” Cloudera, Inc., September 2015.
[44] “Cloudera搜索用户指南”,Cloudera,Inc.,2015年9月。
[ 45 ] Lili Wu, Sam Shah, Sean Choi, et al.: “ The Browsemaps: Collaborative Filtering at LinkedIn ,” at 6th Workshop on Recommender Systems and the Social Web (RSWeb), October 2014.
“Browsemaps: LinkedIn上的协作过滤” - Lili Wu,Sam Shah,Sean Choi等在2014年10月举办的第6届推荐系统和社交网络研讨会(RSWeb)上的文章。
[ 46 ] Roshan Sumbaly, Jay Kreps, Lei Gao, et al.: “ Serving Large-Scale Batch Computed Data with Project Voldemort ,” at 10th USENIX Conference on File and Storage Technologies (FAST), February 2012.
【46】Roshan Sumbaly, Jay Kreps, Lei Gao等人:《使用Project Voldemort为大规模批处理数据提供服务》,发表于2012年2月的第十届USENIX文件与存储技术会议(FAST)。
[ 47 ] Varun Sharma: “ Open-Sourcing Terrapin: A Serving System for Batch Generated Data ,” engineering.pinterest.com , September 14, 2015.
[47] Varun Sharma: “公开源代码泰瑞龟:批量生成数据服务系统”,engineering.pinterest.com,2015年9月14日。
[ 48 ] Nathan Marz: “ ElephantDB ,” slideshare.net , May 30, 2011.
[48] Nathan Marz: “ElephantDB,” 幻灯片分享网,2011年5月30日。
[ 49 ] Jean-Daniel (JD) Cryans: “ How-to: Use HBase Bulk Loading, and Why ,” blog.cloudera.com , September 27, 2013.
"Jean-Daniel (JD) Cryans:如何使用HBase批量加载及其原因",blog.cloudera.com,2013年9月27日。"
[ 50 ] Nathan Marz: “ How to Beat the CAP Theorem ,” nathanmarz.com , October 13, 2011.
[50] Nathan Marz:“如何打败CAP定理”,nathanmarz.com,2011年10月13日。
[ 51 ] Molly Bartlett Dishman and Martin Fowler: “ Agile Architecture ,” at O’Reilly Software Architecture Conference , March 2015.
[51] 莫利·巴特利特·迪什曼和马丁·福勒: “敏捷架构”,于2015年3月在O'Reilly软件架构会议上。
[ 52 ] David J. DeWitt and Jim N. Gray: “ Parallel Database Systems: The Future of High Performance Database Systems ,” Communications of the ACM , volume 35, number 6, pages 85–98, June 1992. doi:10.1145/129888.129894
[52] David J. DeWitt 和 Jim N. Gray: “并行数据库系统: 高性能数据库系统的未来,”ACM 通讯, 卷 35, 第 6 期, 页码 85–98,1992年 6 月. doi:10.1145/129888.129894.
[ 53 ] Jay Kreps: “ But the multi-tenancy thing is actually really really hard ,” tweetstorm, twitter.com , October 31, 2014.
[53] Jay Kreps:“但是多租户的事实际上非常非常难。”(tweetstorm,twitter.com,2014年10月31日。)
[ 54 ] Jeffrey Cohen, Brian Dolan, Mark Dunlap, et al.: “ MAD Skills: New Analysis Practices for Big Data ,” Proceedings of the VLDB Endowment , volume 2, number 2, pages 1481–1492, August 2009. doi:10.14778/1687553.1687576
[54] Jeffrey Cohen, Brian Dolan, Mark Dunlap等:《MAD技能:大数据新分析实践》,VLDB杂志,第2卷,第2号,1481-1492页,2009年8月。doi:10.14778/1687553.1687576。
[ 55 ] Ignacio Terrizzano, Peter Schwarz, Mary Roth, and John E. Colino: “ Data Wrangling: The Challenging Journey from the Wild to the Lake ,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.
[55] Ignacio Terrizzano, Peter Schwarz, Mary Roth, and John E. Colino: “数据整理:从荒野到湖泊的挑战之旅”,于2015年1月在第七届创新数据系统研究(CIDR)双年会上发表。
[ 56 ] Paige Roberts: “ To Schema on Read or to Schema on Write, That Is the Hadoop Data Lake Question ,” adaptivesystemsinc.com , July 2, 2015.
"对于Hadoop数据湖来说,是选择“读时模式”还是“写时模式”?——佩奇·罗伯茨(56),适应性系统公司,2015年7月2日。"
[ 57 ] Bobby Johnson and Joseph Adler: “ The Sushi Principle: Raw Data Is Better ,” at Strata+Hadoop World , February 2015.
[57] 罗伯特·约翰逊和约瑟夫·阿德勒: “寿司原则:原始数据更好” ,于2015年2月在Strata+Hadoop World发表。
[ 58 ] Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, et al.: “ Apache Hadoop YARN: Yet Another Resource Negotiator ,” at 4th ACM Symposium on Cloud Computing (SoCC), October 2013. doi:10.1145/2523616.2523633
"Apache Hadoop YARN:另一个资源协商者",作者:Vinod Kumar Vavilapalli,Arun C. Murthy,Chris Douglas等,发表于2013年10月第4届ACM云计算研讨会(SoCC)。doi:10.1145/2523616.2523633。"
[ 59 ] Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, et al.: “ Large-Scale Cluster Management at Google with Borg ,” at 10th European Conference on Computer Systems (EuroSys), April 2015. doi:10.1145/2741948.2741964
[59] Abhishek Verma, Luis Pedrosa, Madhukar Korupolu等: “在Google中的大规模集群管理 使用Borg” ,出自于第10届欧洲计算机系统会议(EuroSys),2015年4月。 doi:10.1145/2741948.2741964
[ 60 ] Malte Schwarzkopf: “ The Evolution of Cluster Scheduler Architectures ,” firmament.io , March 9, 2016.
「[60] Malte Schwarzkopf:群集调度器架构的演变,firmament.io,2016年3月9日。」
[ 61 ] Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, et al.: “ Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing ,” at 9th USENIX Symposium on Networked Systems Design and Implementation (NSDI), April 2012.
[61] Matei Zaharia, Mosharaf Chowdhury, Tathagata Das 等人: “弹性分布式数据集:一种容错的内存集群计算抽象”,于2012年4月在第9届USENIX网络系统设计和实现研讨会(NSDI)上发表。
[ 62 ] Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia: Learning Spark . O’Reilly Media, 2015. ISBN: 978-1-449-35904-1
[62] Holden Karau,Andy Konwinski,Patrick Wendell和Matei Zaharia:《学习Spark》。O'Reilly Media,2015. ISBN:978-1-449-35904-1。
[ 63 ] Bikas Saha and Hitesh Shah: “ Apache Tez: Accelerating Hadoop Query Processing ,” at Hadoop Summit , June 2014.
[63] Bikas Saha和Hitesh Shah: “Apache Tez:加速Hadoop查询处理”,于2014年6月的Hadoop峰会上。
[ 64 ] Bikas Saha, Hitesh Shah, Siddharth Seth, et al.: “ Apache Tez: A Unifying Framework for Modeling and Building Data Processing Applications ,” at ACM International Conference on Management of Data (SIGMOD), June 2015. doi:10.1145/2723372.2742790
【64】Bikas Saha、Hitesh Shah、Siddharth Seth等:“Apache Tez: 一个用于建模和构建数据处理应用的统一框架”,载于ACM数据管理国际会议(SIGMOD),2015年6月。doi:10.1145/2723372.2742790。
[ 65 ] Kostas Tzoumas: “ Apache Flink: API, Runtime, and Project Roadmap ,” slideshare.net , January 14, 2015.
[65] Kostas Tzoumas: “Apache Flink:API、Runtime 和项目路线图”,slideshare.net,2015 年 1 月 14 日。
[ 66 ] Alexander Alexandrov, Rico Bergmann, Stephan Ewen, et al.: “ The Stratosphere Platform for Big Data Analytics ,” The VLDB Journal , volume 23, number 6, pages 939–964, May 2014. doi:10.1007/s00778-014-0357-y
[66] 亚历山大·亚历山德罗夫、里科·柏格曼、斯蒂芬·伊文等人:《大数据分析的平流层平台》,《VLDB Journal》,第23卷,第6期,2014年5月,第939-964页。doi:10.1007/s00778-014-0357-y。
[ 67 ] Michael Isard, Mihai Budiu, Yuan Yu, et al.: “ Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks ,” at European Conference on Computer Systems (EuroSys), March 2007. doi:10.1145/1272996.1273005
[67] Michael Isard, Mihai Budiu, Yuan Yu等人:“Dryad:来自顺序组成模块的分布式数据并行程序”,发表于欧洲计算机系统会议(EuroSys),2007年3月。doi:10.1145 / 1272996.1273005。
[ 68 ] Daniel Warneke and Odej Kao: “ Nephele: Efficient Parallel Data Processing in the Cloud ,” at 2nd Workshop on Many-Task Computing on Grids and Supercomputers (MTAGS), November 2009. doi:10.1145/1646468.1646476
[68] Daniel Warneke和Odej Kao:“Nephele: 云中高效的并行数据处理”,发表于第二届网格和超级计算的许多任务计算研讨会议(MTAGS),2009年11月。doi:10.1145/1646468.1646476。
[ 69 ] Lawrence Page, Sergey Brin, Rajeev Motwani, and Terry Winograd: “ The PageRank Citation Ranking: Bringing Order to the Web ,” Stanford InfoLab Technical Report 422, 1999.
[69] 劳伦斯·佩奇(Lawrence Page),谢尔盖·布林(Sergey Brin),拉吉夫·莫特瓦尼(Rajeev Motwani)和特里·温格勒德(Terry Winograd):“PageRank引用排名:为网络带来秩序”,斯坦福信息实验室技术报告422,1999年。
[ 70 ] Leslie G. Valiant: “ A Bridging Model for Parallel Computation ,” Communications of the ACM , volume 33, number 8, pages 103–111, August 1990. doi:10.1145/79173.79181
[70] 莱斯利·瓦利安特: "并行计算的桥接模型", ACM通讯,卷33,号8,页103-111,1990年8月。 doi:10.1145/79173.79181。
[ 71 ] Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl: “ Spinning Fast Iterative Data Flows ,” Proceedings of the VLDB Endowment , volume 5, number 11, pages 1268-1279, July 2012. doi:10.14778/2350229.2350245
[71] Stephan Ewen,Kostas Tzoumas,Moritz Kaufmann和Volker Markl:“Spinning Fast Iterative Data Flows”,VLDB Endowment会议论文集,第5卷,第11期,2012年7月。doi:10.14778/2350229.2350245。
[ 72 ] Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, et al.: “ Pregel: A System for Large-Scale Graph Processing ,” at ACM International Conference on Management of Data (SIGMOD), June 2010. doi:10.1145/1807167.1807184
【72】Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik 等人:「Pregel:一個用於大規模圖形處理的系統」,於 2010 年 6 月的 ACM 國際數據管理會議 (SIGMOD) 上發表。 doi:10.1145/1807167.1807184
[ 73 ] Frank McSherry, Michael Isard, and Derek G. Murray: “ Scalability! But at What COST? ,” at 15th USENIX Workshop on Hot Topics in Operating Systems (HotOS), May 2015.
[73] Frank McSherry, Michael Isard, 和 Derek G. Murray: “可扩展性!但代价是什么?”,于 2015 年 5 月在第 15 届 USENIX 操作系统热点问题研讨会上发表。
[ 74 ] Ionel Gog, Malte Schwarzkopf, Natacha Crooks, et al.: “ Musketeer: All for One, One for All in Data Processing Systems ,” at 10th European Conference on Computer Systems (EuroSys), April 2015. doi:10.1145/2741948.2741968
【74】Ionel Gog, Malte Schwarzkopf, Natacha Crooks等:“Musketeer: 数据处理系统中的All for One, One for All”,发表于2015年4月的第10届欧洲计算机系统会议(EuroSys)。doi:10.1145/2741948.2741968
[ 75 ] Aapo Kyrola, Guy Blelloch, and Carlos Guestrin: “ GraphChi: Large-Scale Graph Computation on Just a PC ,” at 10th USENIX Symposium on Operating Systems Design and Implementation (OSDI), October 2012.
“GraphChi:只需一台PC的大规模图形计算”的作者为Aapo Kyrola,Guy Blelloch和Carlos Guestrin。文章发表于2012年10月的第10届USENIX操作系统设计和实现研讨会(OSDI)。
[ 76 ] Andrew Lenharth, Donald Nguyen, and Keshav Pingali: “ Parallel Graph Analytics ,” Communications of the ACM , volume 59, number 5, pages 78–87, May 2016. doi:10.1145/2901919
[76] Andrew Lenharth,Donald Nguyen和Keshav Pingali:“并行图分析”,ACM通讯,第59卷,第5期,页78-87,2016年5月。doi:10.1145 / 2901919
[ 77 ] Fabian Hüske: “ Peeking into Apache Flink’s Engine Room ,” flink.apache.org , March 13, 2015.
[77] Fabian Hüske:“深入了解Apache Flink引擎”,flink.apache.org,2015年3月13日。
[ 78 ] Mostafa Mokhtar: “ Hive 0.14 Cost Based Optimizer (CBO) Technical Overview ,” hortonworks.com , March 2, 2015.
“Mostafa Mokhtar: “Hive 0.14 成本基于优化器 (CBO) 技术概述”,hortonworks.com,2015 年 3 月 2 日。”
[ 79 ] Michael Armbrust, Reynold S Xin, Cheng Lian, et al.: “ Spark SQL: Relational Data Processing in Spark ,” at ACM International Conference on Management of Data (SIGMOD), June 2015. doi:10.1145/2723372.2742797
"[79] Michael Armbrust, Reynold S Xin, Cheng Lian等:’Spark SQL:Spark中的关系数据处理’,发表于ACM数据管理国际会议(SIGMOD),2015年6月。doi:10.1145 / 2723372.2742797"
[ 80 ] Daniel Blazevski: “ Planting Quadtrees for Apache Flink ,” insightdataengineering.com , March 25, 2016.
80. Daniel Blazevski:“为Apache Flink种植四叉树”,insightdataengineering.com,2016年3月25日。
[ 81 ] Tom White: “ Genome Analysis Toolkit: Now Using Apache Spark for Data Processing ,” blog.cloudera.com , April 6, 2016.
"[81] Tom White: “Genome Analysis Toolkit: Now Using Apache Spark for Data Processing,” blog.cloudera.com, April 6, 2016." 81. 汤姆·怀特:“基因组分析工具包:现在使用Apache Spark进行数据处理”,blog.cloudera.com,2016年4月6日。