Chapter 11. Stream Processing
A complex system that works is invariably found to have evolved from a simple system that works. The inverse proposition also appears to be true: A complex system designed from scratch never works and cannot be made to work.
John Gall, Systemantics (1975)
In Chapter 10 we discussed batch processing—techniques that read a set of files as input and produce a new set of output files. The output is a form of derived data ; that is, a dataset that can be recreated by running the batch process again if necessary. We saw how this simple but powerful idea can be used to create search indexes, recommendation systems, analytics, and more.
However, one big assumption remained throughout Chapter 10 : namely, that the input is bounded—i.e., of a known and finite size—so the batch process knows when it has finished reading its input. For example, the sorting operation that is central to MapReduce must read its entire input before it can start producing output: it could happen that the very last input record is the one with the lowest key, and thus needs to be the very first output record, so starting the output early is not an option.
In reality, a lot of data is unbounded because it arrives gradually over time: your users produced data yesterday and today, and they will continue to produce more data tomorrow. Unless you go out of business, this process never ends, and so the dataset is never “complete” in any meaningful way [ 1 ]. Thus, batch processors must artificially divide the data into chunks of fixed duration: for example, processing a day’s worth of data at the end of every day, or processing an hour’s worth of data at the end of every hour.
The problem with daily batch processes is that changes in the input are only reflected in the output a day later, which is too slow for many impatient users. To reduce the delay, we can run the processing more frequently—say, processing a second’s worth of data at the end of every second—or even continuously, abandoning the fixed time slices entirely and simply processing every event as it happens. That is the idea behind stream processing .
In general, a “stream” refers to data that is incrementally made available over time. The concept appears in many places: in the stdin and stdout of Unix, programming languages (lazy lists) [ 2 ], filesystem APIs (such as Java’s FileInputStream), TCP connections, delivering audio and video over the internet, and so on.
In this chapter we will look at event streams as a data management mechanism: the unbounded, incrementally processed counterpart to the batch data we saw in the last chapter . We will first discuss how streams are represented, stored, and transmitted over a network. In “Databases and Streams” we will investigate the relationship between streams and databases. And finally, in “Processing Streams” we will explore approaches and tools for processing those streams continually , and ways that they can be used to build applications.
Transmitting Event Streams
In the batch processing world, the inputs and outputs of a job are files (perhaps on a distributed filesystem). What does the streaming equivalent look like?
When the input is a file (a sequence of bytes), the first processing step is usually to parse it into a sequence of records. In a stream processing context, a record is more commonly known as an event , but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock (see “Monotonic Versus Time-of-Day Clocks” ).
For example, the thing that happened might be an action that a user took, such as viewing a page or making a purchase. It might also originate from a machine, such as a periodic measurement from a temperature sensor, or a CPU utilization metric. In the example of “Batch Processing with Unix Tools” , each line of the web server log is an event.
An event may be encoded as a text string, or JSON, or perhaps in some binary form, as discussed in Chapter 4 . This encoding allows you to store an event, for example by appending it to a file, inserting it into a relational table, or writing it to a document database. It also allows you to send the event over the network to another node in order to process it.
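As a minimal sketch, here is what such an event might look like when encoded as JSON in Python; the field names are illustrative, not prescribed by any particular system.

```python
import json, time, uuid

# A hypothetical page-view event; the field names are illustrative only.
event = {
    "event_id": str(uuid.uuid4()),
    "type": "page_view",
    "user_id": 42,
    "url": "/products/123",
    "timestamp": time.time(),  # time-of-day clock, seconds since the epoch
}

encoded = json.dumps(event).encode("utf-8")  # bytes, ready to append to a file or send over the network
decoded = json.loads(encoded)
assert decoded["type"] == "page_view"
```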
In batch processing, a file is written once and then potentially read by multiple jobs. Analogously, in streaming terminology, an event is generated once by a producer (also known as a publisher or sender ), and then potentially processed by multiple consumers ( subscribers or recipients ) [ 3 ]. In a filesystem, a filename identifies a set of related records; in a streaming system, related events are usually grouped together into a topic or stream .
In principle, a file or database is sufficient to connect producers and consumers: a producer writes every event that it generates to the datastore, and each consumer periodically polls the datastore to check for events that have appeared since it last ran. This is essentially what a batch process does when it processes a day’s worth of data at the end of every day.
However, when moving toward continual processing with low delays, polling becomes expensive if the datastore is not designed for this kind of usage. The more often you poll, the lower the percentage of requests that return new events, and thus the higher the overheads become. Instead, it is better for consumers to be notified when new events appear.
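A rough sketch of that polling pattern, with a hypothetical fetch_events_since() standing in for a datastore query (for example, a SELECT filtered on an ever-increasing event ID):

```python
import time

def fetch_events_since(last_seen_id):
    """Placeholder for a datastore query such as
    SELECT * FROM events WHERE id > last_seen_id ORDER BY id."""
    return []  # no new events in this stub

def handle(event):
    print("processing", event)  # application-specific processing

def polling_consumer(poll_interval_seconds=1.0):
    last_seen_id = 0
    while True:
        for event in fetch_events_since(last_seen_id):
            handle(event)
            last_seen_id = event["id"]
        # The more often we poll, the more of these calls return nothing,
        # which is exactly the overhead described above.
        time.sleep(poll_interval_seconds)
```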
Databases have traditionally not supported this kind of notification mechanism very well: relational databases commonly have triggers , which can react to a change (e.g., a row being inserted into a table), but they are very limited in what they can do and have been somewhat of an afterthought in database design [ 4 , 5 ]. Instead, specialized tools have been developed for the purpose of delivering event notifications.
Messaging Systems
A common approach for notifying consumers about new events is to use a messaging system : a producer sends a message containing the event, which is then pushed to consumers. We touched on these systems previously in “Message-Passing Dataflow” , but we will now go into more detail.
A direct communication channel like a Unix pipe or TCP connection between producer and consumer would be a simple way of implementing a messaging system. However, most messaging systems expand on this basic model. In particular, Unix pipes and TCP connect exactly one sender with one recipient, whereas a messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.
Within this publish/subscribe model, different systems take a wide range of approaches, and there is no one right answer for all purposes. To differentiate the systems, it is particularly helpful to ask the following two questions:
- What happens if the producers send messages faster than the consumers can process them? Broadly speaking, there are three options: the system can drop messages, buffer messages in a queue, or apply backpressure (also known as flow control; i.e., blocking the producer from sending more messages). For example, Unix pipes and TCP use backpressure: they have a small fixed-size buffer, and if it fills up, the sender is blocked until the recipient takes data out of the buffer (see “Network congestion and queueing”). A minimal in-process sketch of these three options follows this list.
If messages are buffered in a queue, it is important to understand what happens as that queue grows. Does the system crash if the queue no longer fits in memory, or does it write messages to disk? If so, how does the disk access affect the performance of the messaging system [ 6 ]?
- What happens if nodes crash or temporarily go offline—are any messages lost? As with databases, durability may require some combination of writing to disk and/or replication (see the sidebar “Replication and Durability”), which has a cost. If you can afford to sometimes lose messages, you can probably get higher throughput and lower latency on the same hardware.
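As a minimal in-process illustration of the three overload options (not how any particular broker implements them), Python’s standard queue.Queue behaves like a small fixed-size buffer: a blocking put is a crude form of backpressure, and a non-blocking put that discards on overflow corresponds to dropping messages.

```python
import queue

buf = queue.Queue(maxsize=8)   # small fixed-size buffer, like a pipe

def send_with_backpressure(message):
    buf.put(message)           # blocks the producer until a consumer calls get()

def send_with_drop(message):
    try:
        buf.put_nowait(message)
    except queue.Full:
        pass                   # drop the message: tolerable for some metrics, fatal for counters

# Unbounded buffering would be queue.Queue() with no maxsize -- the queue then
# grows until memory (or disk, in a broker that spills to disk) runs out.
```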
Whether message loss is acceptable depends very much on the application. For example, with sensor readings and metrics that are transmitted periodically, an occasional missing data point is perhaps not important, since an updated value will be sent a short time later anyway. However, beware that if a large number of messages are dropped, it may not be immediately apparent that the metrics are incorrect [ 7 ]. If you are counting events, it is more important that they are delivered reliably, since every lost message means incorrect counters.
A nice property of the batch processing systems we explored in Chapter 10 is that they provide a strong reliability guarantee: failed tasks are automatically retried, and partial output from failed tasks is automatically discarded. This means the output is the same as if no failures had occurred, which helps simplify the programming model. Later in this chapter we will examine how we can provide similar guarantees in a streaming context.
Direct messaging from producers to consumers
A number of messaging systems use direct network communication between producers and consumers without going via intermediary nodes:
- UDP multicast is widely used in the financial industry for streams such as stock market feeds, where low latency is important [ 8 ]. Although UDP itself is unreliable, application-level protocols can recover lost packets (the producer must remember packets it has sent so that it can retransmit them on demand).
- Brokerless messaging libraries such as ZeroMQ [ 9 ] and nanomsg take a similar approach, implementing publish/subscribe messaging over TCP or IP multicast.
- StatsD [ 10 ] and Brubeck [ 7 ] use unreliable UDP messaging for collecting metrics from all machines on the network and monitoring them. (In the StatsD protocol, counter metrics are only correct if all messages are received; using UDP makes the metrics at best approximate [ 11 ]. See also “TCP Versus UDP”.)
- If the consumer exposes a service on the network, producers can make a direct HTTP or RPC request (see “Dataflow Through Services: REST and RPC”) to push messages to the consumer. This is the idea behind webhooks [ 12 ], a pattern in which a callback URL of one service is registered with another service, and it makes a request to that URL whenever an event occurs. A minimal sketch of this pattern follows this list.
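A bare-bones sketch of the webhook pattern using only the Python standard library; the callback URL and port are made up for the example. The consumer exposes an HTTP endpoint, and the producer pushes each event to it with a POST request.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import request

# Consumer side: a tiny HTTP service that receives pushed events.
class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers["Content-Length"])
        event = json.loads(self.rfile.read(length))
        print("received event:", event)
        self.send_response(204)   # acknowledge receipt, no response body
        self.end_headers()

def run_consumer():
    HTTPServer(("localhost", 8081), WebhookHandler).serve_forever()

# Producer side: push an event to the callback URL registered by the consumer.
def push_event(event, callback_url="http://localhost:8081/hook"):
    req = request.Request(
        callback_url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    request.urlopen(req)   # if the consumer is offline, this call simply fails
```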
Although these direct messaging systems work well in the situations for which they are designed, they generally require the application code to be aware of the possibility of message loss. The faults they can tolerate are quite limited: even if the protocols detect and retransmit packets that are lost in the network, they generally assume that producers and consumers are constantly online.
If a consumer is offline, it may miss messages that were sent while it is unreachable. Some protocols allow the producer to retry failed message deliveries, but this approach may break down if the producer crashes, losing the buffer of messages that it was supposed to retry.
Message brokers
A widely used alternative is to send messages via a message broker (also known as a message queue ), which is essentially a kind of database that is optimized for handling message streams [ 13 ]. It runs as a server, with producers and consumers connecting to it as clients. Producers write messages to the broker, and consumers receive them by reading them from the broker.
By centralizing the data in the broker, these systems can more easily tolerate clients that come and go (connect, disconnect, and crash), and the question of durability is moved to the broker instead. Some message brokers only keep messages in memory, while others (depending on configuration) write them to disk so that they are not lost in case of a broker crash. Faced with slow consumers, they generally allow unbounded queueing (as opposed to dropping messages or backpressure), although this choice may also depend on the configuration.
A consequence of queueing is also that consumers are generally asynchronous : when a producer sends a message, it normally only waits for the broker to confirm that it has buffered the message and does not wait for the message to be processed by consumers. The delivery to consumers will happen at some undetermined future point in time—often within a fraction of a second, but sometimes significantly later if there is a queue backlog.
Message brokers compared to databases
Some message brokers can even participate in two-phase commit protocols using XA or JTA (see “Distributed Transactions in Practice” ). This feature makes them quite similar in nature to databases, although there are still important practical differences between message brokers and databases:
- Databases usually keep data until it is explicitly deleted, whereas most message brokers automatically delete a message when it has been successfully delivered to its consumers. Such message brokers are not suitable for long-term data storage.
- Since they quickly delete messages, most message brokers assume that their working set is fairly small—i.e., the queues are short. If the broker needs to buffer a lot of messages because the consumers are slow (perhaps spilling messages to disk if they no longer fit in memory), each individual message takes longer to process, and the overall throughput may degrade [ 6 ].
- Databases often support secondary indexes and various ways of searching for data, while message brokers often support some way of subscribing to a subset of topics matching some pattern. The mechanisms are different, but both are essentially ways for a client to select the portion of the data that it wants to know about.
- When querying a database, the result is typically based on a point-in-time snapshot of the data; if another client subsequently writes something to the database that changes the query result, the first client does not find out that its prior result is now outdated (unless it repeats the query, or polls for changes). By contrast, message brokers do not support arbitrary queries, but they do notify clients when data changes (i.e., when new messages become available).
This is the traditional view of message brokers, which is encapsulated in standards like JMS [ 14 ] and AMQP [ 15 ] and implemented in software like RabbitMQ, ActiveMQ, HornetQ, Qpid, TIBCO Enterprise Message Service, IBM MQ, Azure Service Bus, and Google Cloud Pub/Sub [ 16 ].
Multiple consumers
When multiple consumers read messages in the same topic, two main patterns of messaging are used, as illustrated in Figure 11-1 :
- Load balancing: Each message is delivered to one of the consumers, so the consumers can share the work of processing the messages in the topic. The broker may assign messages to consumers arbitrarily. This pattern is useful when the messages are expensive to process, and so you want to be able to add consumers to parallelize the processing. (In AMQP, you can implement load balancing by having multiple clients consuming from the same queue, and in JMS it is called a shared subscription.)
- Fan-out: Each message is delivered to all of the consumers. Fan-out allows several independent consumers to each “tune in” to the same broadcast of messages, without affecting each other—the streaming equivalent of having several different batch jobs that read the same input file. (This feature is provided by topic subscriptions in JMS, and exchange bindings in AMQP.)
The two patterns can be combined: for example, two separate groups of consumers may each subscribe to a topic, such that each group collectively receives all messages, but within each group only one of the nodes receives each message.
Acknowledgments and redelivery
Consumers may crash at any time, so it could happen that a broker delivers a message to a consumer but the consumer never processes it, or only partially processes it before crashing. In order to ensure that the message is not lost, message brokers use acknowledgments : a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.
If the connection to a client is closed or times out without the broker receiving an acknowledgment, it assumes that the message was not processed, and therefore it delivers the message again to another consumer. (Note that it could happen that the message actually was fully processed, but the acknowledgment was lost in the network. Handling this case requires an atomic commit protocol, as discussed in “Distributed Transactions in Practice” .)
When combined with load balancing, this redelivery behavior has an interesting effect on the ordering of messages. In Figure 11-2 , the consumers generally process messages in the order they were sent by producers. However, consumer 2 crashes while processing message m3 , at the same time as consumer 1 is processing message m4 . The unacknowledged message m3 is subsequently redelivered to consumer 1, with the result that consumer 1 processes messages in the order m4 , m3 , m5 . Thus, m3 and m4 are not delivered in the same order as they were sent by producer 1.
Even if the message broker otherwise tries to preserve the order of messages (as required by both the JMS and AMQP standards), the combination of load balancing with redelivery inevitably leads to messages being reordered. To avoid this issue, you can use a separate queue per consumer (i.e., not use the load balancing feature). Message reordering is not a problem if messages are completely independent of each other, but it can be important if there are causal dependencies between messages, as we shall see later in the chapter.
Partitioned Logs
Sending a packet over a network or making a request to a network service is normally a transient operation that leaves no permanent trace. Although it is possible to record it permanently (using packet capture and logging), we normally don’t think of it that way. Even message brokers that durably write messages to disk quickly delete them again after they have been delivered to consumers, because they are built around a transient messaging mindset.
Databases and filesystems take the opposite approach: everything that is written to a database or file is normally expected to be permanently recorded, at least until someone explicitly chooses to delete it again.
This difference in mindset has a big impact on how derived data is created. A key feature of batch processes, as discussed in Chapter 10 , is that you can run them repeatedly, experimenting with the processing steps, without risk of damaging the input (since the input is read-only). This is not the case with AMQP/JMS-style messaging: receiving a message is destructive if the acknowledgment causes it to be deleted from the broker, so you cannot run the same consumer again and expect to get the same result.
If you add a new consumer to a messaging system, it typically only starts receiving messages sent after the time it was registered; any prior messages are already gone and cannot be recovered. Contrast this with files and databases, where you can add a new client at any time, and it can read data written arbitrarily far in the past (as long as it has not been explicitly overwritten or deleted by the application).
Why can we not have a hybrid, combining the durable storage approach of databases with the low-latency notification facilities of messaging? This is the idea behind log-based message brokers .
Using logs for message storage
A log is simply an append-only sequence of records on disk. We previously discussed logs in the context of log-structured storage engines and write-ahead logs in Chapter 3 , and in the context of replication in Chapter 5 .
The same structure can be used to implement a message broker: a producer sends a message by appending it to the end of the log, and a consumer receives messages by reading the log sequentially. If a consumer reaches the end of the log, it waits for a notification that a new message has been appended. The Unix tool tail -f, which watches a file for data being appended, essentially works like this.
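As a bare-bones sketch of this idea, an ordinary file can play the role of the log: the producer appends one JSON-encoded message per line, and the consumer reads sequentially, sleeping briefly at the end of the file instead of receiving a real notification (much like tail -f). The filename is arbitrary.

```python
import json, time

LOG_PATH = "events.log"   # an append-only file standing in for the broker's log

def produce(message):
    with open(LOG_PATH, "a") as log:
        log.write(json.dumps(message) + "\n")   # append to the end of the log

def consume():
    open(LOG_PATH, "a").close()                 # make sure the log exists
    with open(LOG_PATH, "r") as log:
        while True:
            line = log.readline()
            if line:
                yield json.loads(line)          # read the log sequentially
            else:
                time.sleep(0.1)                 # at the end of the log: wait for new messages
```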
In order to scale to higher throughput than a single disk can offer, the log can be partitioned (in the sense of Chapter 6 ). Different partitions can then be hosted on different machines, making each partition a separate log that can be read and written independently from other partitions. A topic can then be defined as a group of partitions that all carry messages of the same type. This approach is illustrated in Figure 11-3 .
Within each partition, the broker assigns a monotonically increasing sequence number, or offset , to every message (in Figure 11-3 , the numbers in boxes are message offsets). Such a sequence number makes sense because a partition is append-only, so the messages within a partition are totally ordered. There is no ordering guarantee across different partitions.
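A small sketch of partitioning and offset assignment; routing messages by a hash of the key, as done here, is a common choice but an assumption of the example rather than something the text above prescribes.

```python
import hashlib

NUM_PARTITIONS = 4
partitions = [[] for _ in range(NUM_PARTITIONS)]   # each list is an append-only log

def partition_for(key):
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

def append(key, value):
    p = partition_for(key)
    partitions[p].append((key, value))
    offset = len(partitions[p]) - 1   # monotonically increasing within the partition
    return p, offset

# Messages with the same key always land in the same partition, so they are
# totally ordered relative to each other; there is no order across partitions.
p1, o1 = append("user:42", "viewed /home")
p2, o2 = append("user:42", "viewed /cart")
assert p1 == p2 and o2 == o1 + 1
```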
Apache Kafka [ 17 , 18 ], Amazon Kinesis Streams [ 19 ], and Twitter’s DistributedLog [ 20 , 21 ] are log-based message brokers that work like this. Google Cloud Pub/Sub is architecturally similar but exposes a JMS-style API rather than a log abstraction [ 16 ]. Even though these message brokers write all messages to disk, they are able to achieve throughput of millions of messages per second by partitioning across multiple machines, and fault tolerance by replicating messages [ 22 , 23 ].
Logs compared to traditional messaging
The log-based approach trivially supports fan-out messaging, because several consumers can independently read the log without affecting each other—reading a message does not delete it from the log. To achieve load balancing across a group of consumers, instead of assigning individual messages to consumer clients, the broker can assign entire partitions to nodes in the consumer group.
Each client then consumes all the messages in the partitions it has been assigned. Typically, when a consumer has been assigned a log partition, it reads the messages in the partition sequentially, in a straightforward single-threaded manner. This coarse-grained load balancing approach has some downsides:
- The number of nodes sharing the work of consuming a topic can be at most the number of log partitions in that topic, because messages within the same partition are delivered to the same node.
- If a single message is slow to process, it holds up the processing of subsequent messages in that partition (a form of head-of-line blocking; see “Describing Performance”).
Thus, in situations where messages may be expensive to process and you want to parallelize processing on a message-by-message basis, and where message ordering is not so important, the JMS/AMQP style of message broker is preferable. On the other hand, in situations with high message throughput, where each message is fast to process and where message ordering is important, the log-based approach works very well.
Consumer offsets
Consuming a partition sequentially makes it easy to tell which messages have been processed: all messages with an offset less than a consumer’s current offset have already been processed, and all messages with a greater offset have not yet been seen. Thus, the broker does not need to track acknowledgments for every single message—it only needs to periodically record the consumer offsets. The reduced bookkeeping overhead and the opportunities for batching and pipelining in this approach help increase the throughput of log-based systems.
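A sketch of offset-based consumption: rather than acknowledging every message, the consumer periodically records how far through the partition it has got. The checkpoint interval and the use of a local file for storing the offset are arbitrary choices for illustration.

```python
import os

CHECKPOINT_FILE = "consumer.offset"
CHECKPOINT_EVERY = 100            # record the offset every 100 messages (arbitrary)

def load_offset():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return int(f.read())
    return 0

def save_offset(offset):
    with open(CHECKPOINT_FILE, "w") as f:
        f.write(str(offset))

def process(message):
    print("processing", message)

def consume(log):                 # `log` is a list of messages (one partition)
    start = load_offset()         # resume where the last run left off
    for offset in range(start, len(log)):
        process(log[offset])
        if offset % CHECKPOINT_EVERY == 0:
            save_offset(offset + 1)   # everything below this offset is done
    save_offset(len(log))

# If the consumer crashes between checkpoints, messages processed since the last
# recorded offset are processed again on restart (at-least-once behavior).
```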
This offset is in fact very similar to the log sequence number that is commonly found in single-leader database replication, and which we discussed in “Setting Up New Followers” . In database replication, the log sequence number allows a follower to reconnect to a leader after it has become disconnected, and resume replication without skipping any writes. Exactly the same principle is used here: the message broker behaves like a leader database, and the consumer like a follower.
If a consumer node fails, another node in the consumer group is assigned the failed consumer’s partitions, and it starts consuming messages at the last recorded offset. If the consumer had processed subsequent messages but not yet recorded their offset, those messages will be processed a second time upon restart. We will discuss ways of dealing with this issue later in the chapter.
Disk space usage
If you only ever append to the log, you will eventually run out of disk space. To reclaim disk space, the log is actually divided into segments, and from time to time old segments are deleted or moved to archive storage. (We’ll discuss a more sophisticated way of freeing disk space later.)
This means that if a slow consumer cannot keep up with the rate of messages, and it falls so far behind that its consumer offset points to a deleted segment, it will miss some of the messages. Effectively, the log implements a bounded-size buffer that discards old messages when it gets full, also known as a circular buffer or ring buffer . However, since that buffer is on disk, it can be quite large.
Let’s do a back-of-the-envelope calculation. At the time of writing, a typical large hard drive has a capacity of 6 TB and a sequential write throughput of 150 MB/s. If you are writing messages at the fastest possible rate, it takes about 11 hours to fill the drive. Thus, the disk can buffer 11 hours’ worth of messages, after which it will start overwriting old messages. This ratio remains the same, even if you use many hard drives and machines. In practice, deployments rarely use the full write bandwidth of the disk, so the log can typically keep a buffer of several days’ or even weeks’ worth of messages.
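The same back-of-the-envelope figure, written out:

```python
capacity_bytes = 6 * 10**12     # 6 TB drive
write_rate = 150 * 10**6        # 150 MB/s sequential write throughput

seconds_to_fill = capacity_bytes / write_rate
print(seconds_to_fill / 3600)   # ~11.1 hours of messages buffered before old ones are overwritten
```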
Regardless of how long you retain messages, the throughput of a log remains more or less constant, since every message is written to disk anyway [ 18 ]. This behavior is in contrast to messaging systems that keep messages in memory by default and only write them to disk if the queue grows too large: such systems are fast when queues are short and become much slower when they start writing to disk, so the throughput depends on the amount of history retained.
When consumers cannot keep up with producers
At the beginning of “Messaging Systems” we discussed three choices of what to do if a consumer cannot keep up with the rate at which producers are sending messages: dropping messages, buffering, or applying backpressure. In this taxonomy, the log-based approach is a form of buffering with a large but fixed-size buffer (limited by the available disk space).
If a consumer falls so far behind that the messages it requires are older than what is retained on disk, it will not be able to read those messages—so the broker effectively drops old messages that go back further than the size of the buffer can accommodate. You can monitor how far a consumer is behind the head of the log, and raise an alert if it falls behind significantly. As the buffer is large, there is enough time for a human operator to fix the slow consumer and allow it to catch up before it starts missing messages.
Even if a consumer does fall too far behind and starts missing messages, only that consumer is affected; it does not disrupt the service for other consumers. This fact is a big operational advantage: you can experimentally consume a production log for development, testing, or debugging purposes, without having to worry much about disrupting production services. When a consumer is shut down or crashes, it stops consuming resources—the only thing that remains is its consumer offset.
This behavior also contrasts with traditional message brokers, where you need to be careful to delete any queues whose consumers have been shut down—otherwise they continue unnecessarily accumulating messages and taking away memory from consumers that are still active.
Replaying old messages
We noted previously that with AMQP- and JMS-style message brokers, processing and acknowledging messages is a destructive operation, since it causes the messages to be deleted on the broker. On the other hand, in a log-based message broker, consuming messages is more like reading from a file: it is a read-only operation that does not change the log.
The only side effect of processing, besides any output of the consumer, is that the consumer offset moves forward. But the offset is under the consumer’s control, so it can easily be manipulated if necessary: for example, you can start a copy of a consumer with yesterday’s offsets and write the output to a different location, in order to reprocess the last day’s worth of messages. You can repeat this any number of times, varying the processing code.
This aspect makes log-based messaging more like the batch processes of the last chapter, where derived data is clearly separated from input data through a repeatable transformation process. It allows more experimentation and easier recovery from errors and bugs, making it a good tool for integrating dataflows within an organization [ 24 ].
Databases and Streams
We have drawn some comparisons between message brokers and databases. Even though they have traditionally been considered separate categories of tools, we saw that log-based message brokers have been successful in taking ideas from databases and applying them to messaging. We can also go in reverse: take ideas from messaging and streams, and apply them to databases.
We said previously that an event is a record of something that happened at some point in time. The thing that happened may be a user action (e.g., typing a search query), or a sensor reading, but it may also be a write to a database . The fact that something was written to a database is an event that can be captured, stored, and processed. This observation suggests that the connection between databases and streams runs deeper than just the physical storage of logs on disk—it is quite fundamental.
In fact, a replication log (see “Implementation of Replication Logs” ) is a stream of database write events, produced by the leader as it processes transactions. The followers apply that stream of writes to their own copy of the database and thus end up with an accurate copy of the same data. The events in the replication log describe the data changes that occurred.
We also came across the state machine replication principle in “Total Order Broadcast” , which states: if every event represents a write to the database, and every replica processes the same events in the same order, then the replicas will all end up in the same final state. (Processing an event is assumed to be a deterministic operation.) It’s just another case of event streams!
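A toy illustration of that principle: two replicas start empty, apply the same write events in the same order with a deterministic function, and necessarily end up in the same state.

```python
def apply(state, event):
    """Deterministically apply one write event to a replica's state."""
    key, value = event
    new_state = dict(state)
    if value is None:
        new_state.pop(key, None)   # a delete
    else:
        new_state[key] = value     # an insert or update
    return new_state

events = [("x", 1), ("y", 2), ("x", 3), ("y", None)]

replica_a, replica_b = {}, {}
for e in events:                   # same events, same order, on both replicas
    replica_a = apply(replica_a, e)
    replica_b = apply(replica_b, e)

assert replica_a == replica_b == {"x": 3}
```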
In this section we will first look at a problem that arises in heterogeneous data systems, and then explore how we can solve it by bringing ideas from event streams to databases.
Keeping Systems in Sync
As we have seen throughout this book, there is no single system that can satisfy all data storage, querying, and processing needs. In practice, most nontrivial applications need to combine several different technologies in order to satisfy their requirements: for example, using an OLTP database to serve user requests, a cache to speed up common requests, a full-text index to handle search queries, and a data warehouse for analytics. Each of these has its own copy of the data, stored in its own representation that is optimized for its own purposes.
As the same or related data appears in several different places, they need to be kept in sync with one another: if an item is updated in the database, it also needs to be updated in the cache, search indexes, and data warehouse. With data warehouses this synchronization is usually performed by ETL processes (see “Data Warehousing” ), often by taking a full copy of a database, transforming it, and bulk-loading it into the data warehouse—in other words, a batch process. Similarly, we saw in “The Output of Batch Workflows” how search indexes, recommendation systems, and other derived data systems might be created using batch processes.
If periodic full database dumps are too slow, an alternative that is sometimes used is dual writes , in which the application code explicitly writes to each of the systems when data changes: for example, first writing to the database, then updating the search index, then invalidating the cache entries (or even performing those writes concurrently).
However, dual writes have some serious problems, one of which is a race condition illustrated in Figure 11-4 . In this example, two clients concurrently want to update an item X: client 1 wants to set the value to A, and client 2 wants to set it to B. Both clients first write the new value to the database, then write it to the search index. Due to unlucky timing, the requests are interleaved: the database first sees the write from client 1 setting the value to A, then the write from client 2 setting the value to B, so the final value in the database is B. The search index first sees the write from client 2, then client 1, so the final value in the search index is A. The two systems are now permanently inconsistent with each other, even though no error occurred.
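The race condition of Figure 11-4 can be reproduced by writing out the unlucky interleaving explicitly (no real concurrency is needed to see the effect):

```python
database = {}
search_index = {}

# Client 1 wants X = A, client 2 wants X = B. Each performs two writes: first to
# the database, then to the search index. The interleaving mirrors Figure 11-4.
database["X"] = "A"        # client 1 writes to the database
database["X"] = "B"        # client 2 writes to the database (arrives second)
search_index["X"] = "B"    # client 2 writes to the search index (arrives first)
search_index["X"] = "A"    # client 1 writes to the search index

assert database["X"] != search_index["X"]   # permanently inconsistent, with no error reported
```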
Unless you have some additional concurrency detection mechanism, such as the version vectors we discussed in “Detecting Concurrent Writes” , you will not even notice that concurrent writes occurred—one value will simply silently overwrite another value.
Another problem with dual writes is that one of the writes may fail while the other succeeds. This is a fault-tolerance problem rather than a concurrency problem, but it also has the effect of the two systems becoming inconsistent with each other. Ensuring that they either both succeed or both fail is a case of the atomic commit problem, which is expensive to solve (see “Atomic Commit and Two-Phase Commit (2PC)” ).
If you only have one replicated database with a single leader, then that leader determines the order of writes, so the state machine replication approach works among replicas of the database. However, in Figure 11-4 there isn’t a single leader: the database may have a leader and the search index may have a leader, but neither follows the other, and so conflicts can occur (see “Multi-Leader Replication” ).
The situation would be better if there really was only one leader—for example, the database—and if we could make the search index a follower of the database. But is this possible in practice?
Change Data Capture
The problem with most databases’ replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API. Clients are supposed to query the database through its data model and query language, not parse the replication logs and try to extract data from them.
For decades, many databases simply did not have a documented way of getting the log of changes written to them. For this reason it was difficult to take all the changes made in a database and replicate them to a different storage technology such as a search index, cache, or data warehouse.
More recently, there has been growing interest in change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems. CDC is especially interesting if changes are made available as a stream, immediately as they are written.
For example, you can capture the changes in a database and continually apply the same changes to a search index. If the log of changes is applied in the same order, you can expect the data in the search index to match the data in the database. The search index and any other derived data systems are just consumers of the change stream, as illustrated in Figure 11-5 .
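A sketch of such a change-stream consumer keeping a derived index in sync; the shape of the change events (a primary key plus the new value, with None meaning a delete) is an assumption for the example.

```python
def apply_change(index, change):
    """Apply one change event from the database's change stream to a derived index."""
    key, new_value = change["key"], change["value"]
    if new_value is None:
        index.pop(key, None)          # the row was deleted
    else:
        index[key] = new_value        # the row was inserted or updated

search_index = {}
change_stream = [
    {"key": "user:1", "value": {"name": "Alice"}},
    {"key": "user:2", "value": {"name": "Bob"}},
    {"key": "user:1", "value": {"name": "Alicia"}},   # an update overwrites the old value
    {"key": "user:2", "value": None},                 # a delete
]

for change in change_stream:          # applied in the same order as the writes
    apply_change(search_index, change)

assert search_index == {"user:1": {"name": "Alicia"}}
```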
Implementing change data capture
We can call the log consumers derived data systems , as discussed in the introduction to Part III : the data stored in the search index and the data warehouse is just another view onto the data in the system of record. Change data capture is a mechanism for ensuring that all changes made to the system of record are also reflected in the derived data systems so that the derived systems have an accurate copy of the data.
Essentially, change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database, since it preserves the ordering of messages (avoiding the reordering issue of Figure 11-2 ).
Database triggers can be used to implement change data capture (see “Trigger-based replication” ) by registering triggers that observe all changes to data tables and add corresponding entries to a changelog table. However, they tend to be fragile and have significant performance overheads. Parsing the replication log can be a more robust approach, although it also comes with challenges, such as handling schema changes.
LinkedIn’s Databus [ 25 ], Facebook’s Wormhole [ 26 ], and Yahoo!’s Sherpa [ 27 ] use this idea at large scale. Bottled Water implements CDC for PostgreSQL using an API that decodes the write-ahead log [ 28 ], Maxwell and Debezium do something similar for MySQL by parsing the binlog [ 29 , 30 , 31 ], Mongoriver reads the MongoDB oplog [ 32 , 33 ], and GoldenGate provides similar facilities for Oracle [ 34 , 35 ].
Like message brokers, change data capture is usually asynchronous: the system of record database does not wait for the change to be applied to consumers before committing it. This design has the operational advantage that adding a slow consumer does not affect the system of record too much, but it has the downside that all the issues of replication lag apply (see “Problems with Replication Lag” ).
Initial snapshot
If you have the log of all changes that were ever made to a database, you can reconstruct the entire state of the database by replaying the log. However, in many cases, keeping all changes forever would require too much disk space, and replaying it would take too long, so the log needs to be truncated.
Building a new full-text index, for example, requires a full copy of the entire database—it is not sufficient to only apply a log of recent changes, since it would be missing items that were not recently updated. Thus, if you don’t have the entire log history, you need to start with a consistent snapshot, as previously discussed in “Setting Up New Followers” .
The snapshot of the database must correspond to a known position or offset in the change log, so that you know at which point to start applying changes after the snapshot has been processed. Some CDC tools integrate this snapshot facility, while others leave it as a manual operation.
Log compaction
If you can only keep a limited amount of log history, you need to go through the snapshot process every time you want to add a new derived data system. However, log compaction provides a good alternative.
We discussed log compaction previously in “Hash Indexes” , in the context of log-structured storage engines (see Figure 3-2 for an example). The principle is simple: the storage engine periodically looks for log records with the same key, throws away any duplicates, and keeps only the most recent update for each key. This compaction and merging process runs in the background.
In a log-structured storage engine, an update with a special null value (a tombstone ) indicates that a key was deleted, and causes it to be removed during log compaction. But as long as a key is not overwritten or deleted, it stays in the log forever. The disk space required for such a compacted log depends only on the current contents of the database, not the number of writes that have ever occurred in the database. If the same key is frequently overwritten, previous values will eventually be garbage-collected, and only the latest value will be retained.
The same idea works in the context of log-based message brokers and change data capture. If the CDC system is set up such that every change has a primary key, and every update for a key replaces the previous value for that key, then it’s sufficient to keep just the most recent write for a particular key.
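A sketch of that compaction rule: scan the log, keep only the latest value per key, and treat None as a tombstone that removes the key entirely.

```python
def compact(log):
    """Keep only the most recent entry for each key; None values are tombstones."""
    latest = {}
    for key, value in log:            # later entries overwrite earlier ones
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]

log = [("a", 1), ("b", 2), ("a", 3), ("b", None), ("c", 4)]
assert sorted(compact(log)) == [("a", 3), ("c", 4)]
```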
Now, whenever you want to rebuild a derived data system such as a search index, you can start a new consumer from offset 0 of the log-compacted topic, and sequentially scan over all messages in the log. The log is guaranteed to contain the most recent value for every key in the database (and maybe some older values)—in other words, you can use it to obtain a full copy of the database contents without having to take another snapshot of the CDC source database.
This log compaction feature is supported by Apache Kafka. As we shall see later in this chapter, it allows the message broker to be used for durable storage, not just for transient messaging.
API support for change streams
Increasingly, databases are beginning to support change streams as a first-class interface, rather than the typical retrofitted and reverse-engineered CDC efforts. For example, RethinkDB allows queries to subscribe to notifications when the results of a query change [ 36 ], Firebase [ 37 ] and CouchDB [ 38 ] provide data synchronization based on a change feed that is also made available to applications, and Meteor uses the MongoDB oplog to subscribe to data changes and update the user interface [ 39 ].
VoltDB allows transactions to continuously export data from a database in the form of a stream [ 40 ]. The database represents an output stream in the relational data model as a table into which transactions can insert tuples, but which cannot be queried. The stream then consists of the log of tuples that committed transactions have written to this special table, in the order they were committed. External consumers can asynchronously consume this log and use it to update derived data systems.
Kafka Connect [ 41 ] is an effort to integrate change data capture tools for a wide range of database systems with Kafka. Once the stream of change events is in Kafka, it can be used to update derived data systems such as search indexes, and also feed into stream processing systems as discussed later in this chapter.
Event Sourcing
There are some parallels between the ideas we’ve discussed here and event sourcing , a technique that was developed in the domain-driven design (DDD) community [ 42 , 43 , 44 ]. We will discuss event sourcing briefly, because it incorporates some useful and relevant ideas for streaming systems.
Similarly to change data capture, event sourcing involves storing all changes to the application state as a log of change events. The biggest difference is that event sourcing applies the idea at a different level of abstraction:
- In change data capture, the application uses the database in a mutable way, updating and deleting records at will. The log of changes is extracted from the database at a low level (e.g., by parsing the replication log), which ensures that the order of writes extracted from the database matches the order in which they were actually written, avoiding the race condition in Figure 11-4. The application writing to the database does not need to be aware that CDC is occurring.
- In event sourcing, the application logic is explicitly built on the basis of immutable events that are written to an event log. In this case, the event store is append-only, and updates or deletes are discouraged or prohibited. Events are designed to reflect things that happened at the application level, rather than low-level state changes.
Event sourcing is a powerful technique for data modeling: from an application point of view it is more meaningful to record the user’s actions as immutable events, rather than recording the effect of those actions on a mutable database. Event sourcing makes it easier to evolve applications over time, helps with debugging by making it easier to understand after the fact why something happened, and guards against application bugs (see “Advantages of immutable events” ).
事件溯源是一种强大的数据建模技术:从应用程序的角度来看,将用户的行为记录为不可变的事件,比记录这些行为对可变数据库的影响更有意义。事件溯源使应用程序随时间演进更加容易;由于更容易在事后理解事情为什么会发生,它也有助于调试,并且能够防范应用程序的bug(参见"不可变事件的优点")。
For example, storing the event “student cancelled their course enrollment” clearly expresses the intent of a single action in a neutral fashion, whereas the side effects “one entry was deleted from the enrollments table, and one cancellation reason was added to the student feedback table” embed a lot of assumptions about the way the data is later going to be used. If a new application feature is introduced—for example, “the place is offered to the next person on the waiting list”—the event sourcing approach allows that new side effect to easily be chained off the existing event.
例如,存储"学生取消了课程注册"这一事件,以中立的方式清晰表达了单个行为的意图;而"从注册表中删除了一个条目,并在学生反馈表中添加了一条取消原因"这样的副作用,则嵌入了许多关于数据日后使用方式的假设。如果引入了新的应用功能,例如"把名额提供给等候名单上的下一个人",事件溯源方法可以让新的副作用轻松地从现有事件中派生出来。
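To make the contrast concrete, here is a minimal sketch (all field names are illustrative, not from any particular system) of how the same user action might look as a single event-sourced event versus the low-level CDC records describing its side effects:

    # A single event-sourced event: expresses the intent of the action.
    event = {
        "type": "CourseEnrollmentCancelled",
        "student_id": 42,
        "course_id": "CS-101",
        "reason": "schedule conflict",
        "timestamp": "2017-03-01T10:15:00Z",
    }

    # The equivalent CDC view: the low-level row changes the action caused.
    cdc_changes = [
        {"table": "enrollments", "op": "delete",
         "key": {"student_id": 42, "course_id": "CS-101"}},
        {"table": "student_feedback", "op": "insert",
         "row": {"student_id": 42, "reason": "schedule conflict"}},
    ]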
Event sourcing is similar to the chronicle data model [ 45 ], and there are also similarities between an event log and the fact table that you find in a star schema (see “Stars and Snowflakes: Schemas for Analytics” ).
事件溯源类似于编年史数据模型[45],事件日志也与星型模式中的事实表存在相似之处(参见“星型和雪花型:用于分析的模式”)。
Specialized databases such as Event Store [ 46 ] have been developed to support applications using event sourcing, but in general the approach is independent of any particular tool. A conventional database or a log-based message broker can also be used to build applications in this style.
专门的数据库(如Event Store[46])已经被开发出来以支持使用事件溯源的应用程序,但总的来说,这种方法并不依赖任何特定的工具。传统的数据库或基于日志的消息代理也可以用来构建这种风格的应用程序。
Deriving current state from the event log
An event log by itself is not very useful, because users generally expect to see the current state of a system, not the history of modifications. For example, on a shopping website, users expect to be able to see the current contents of their cart, not an append-only list of all the changes they have ever made to their cart.
事件日志本身并不是很有用,因为用户通常希望看到系统的当前状态,而不是修改历史。例如,在购物网站上,用户希望能够看到其购物车的当前内容,而不是一个仅包含其购物车所有更改的追加列表。
Thus, applications that use event sourcing need to take the log of events (representing the data written to the system) and transform it into application state that is suitable for showing to a user (the way in which data is read from the system [ 47 ]). This transformation can use arbitrary logic, but it should be deterministic so that you can run it again and derive the same application state from the event log.
因此,使用事件溯源的应用程序需要获取事件日志(代表写入系统的数据),并将其转化为适合向用户展示的应用程序状态(从系统中读取数据的方式[47])。这种转化可以使用任意逻辑,但应当是确定性的,这样您就可以再次运行它,并从事件日志中派生出相同的应用程序状态。
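As a minimal sketch of such a deterministic transformation, using a hypothetical shopping-cart event format, the current state can be derived by folding over the event log:

    def apply_event(cart, event):
        # Pure, deterministic state transition: same log in, same state out.
        cart = dict(cart)  # do not mutate the previous state
        if event["type"] == "ItemAddedToCart":
            cart[event["item_id"]] = cart.get(event["item_id"], 0) + event["quantity"]
        elif event["type"] == "ItemRemovedFromCart":
            cart.pop(event["item_id"], None)
        return cart

    def current_state(event_log):
        state = {}
        for event in event_log:
            state = apply_event(state, event)
        return state

    log = [
        {"type": "ItemAddedToCart", "item_id": "B00X", "quantity": 2},
        {"type": "ItemAddedToCart", "item_id": "B00Y", "quantity": 1},
        {"type": "ItemRemovedFromCart", "item_id": "B00X"},
    ]
    print(current_state(log))  # {'B00Y': 1}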
Like with change data capture, replaying the event log allows you to reconstruct the current state of the system. However, log compaction needs to be handled differently:
像更改数据捕获一样,重放事件日志允许您重建系统的当前状态。然而,需要以不同的方式处理日志压缩:
-
A CDC event for the update of a record typically contains the entire new version of the record, so the current value for a primary key is entirely determined by the most recent event for that primary key, and log compaction can discard previous events for the same key.
CDC事件用于更新记录时,通常包含完整的新记录版本,因此主键的当前值完全取决于该主键的最新事件,并且日志压缩可以丢弃相同键的先前事件。
-
On the other hand, with event sourcing, events are modeled at a higher level: an event typically expresses the intent of a user action, not the mechanics of the state update that occurred as a result of the action. In this case, later events typically do not override prior events, and so you need the full history of events to reconstruct the final state. Log compaction is not possible in the same way.
另一方面,事件溯源在更高的层次上对事件建模:事件通常表达的是用户操作的意图,而不是该操作所导致的状态更新的机制。在这种情况下,后续事件通常不会覆盖先前的事件,因此您需要完整的事件历史来重建最终状态。日志压缩无法以相同的方式进行。
Applications that use event sourcing typically have some mechanism for storing snapshots of the current state that is derived from the log of events, so they don’t need to repeatedly reprocess the full log. However, this is only a performance optimization to speed up reads and recovery from crashes; the intention is that the system is able to store all raw events forever and reprocess the full event log whenever required. We discuss this assumption in “Limitations of immutability” .
使用事件溯源的应用程序通常有某种机制,用于存储从事件日志派生出的当前状态快照,这样就不需要反复重新处理完整的日志。但这只是一种加快读取和崩溃恢复速度的性能优化;其初衷是系统能够永久存储所有原始事件,并在需要时重新处理完整的事件日志。我们将在"不可变性的局限性"中讨论这一假设。
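One way such a snapshot mechanism might look, assuming the event log exposes per-event offsets (a sketch only; read_log_from and apply_event are hypothetical hooks, not any framework's API):

    import json

    def save_snapshot(path, state, offset):
        # Persist the derived state together with the log offset it reflects.
        with open(path, "w") as f:
            json.dump({"offset": offset, "state": state}, f)

    def recover(path, read_log_from, apply_event):
        # Load the last snapshot, then replay only the events after its offset.
        # read_log_from(offset) yields (offset, event) pairs; apply_event is
        # the deterministic fold used to derive state from events.
        with open(path) as f:
            snapshot = json.load(f)
        state, offset = snapshot["state"], snapshot["offset"]
        for offset, event in read_log_from(offset + 1):
            state = apply_event(state, event)
        return state, offset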
Commands and events
The event sourcing philosophy is careful to distinguish between events and commands [ 48 ]. When a request from a user first arrives, it is initially a command: at this point it may still fail, for example because some integrity condition is violated. The application must first validate that it can execute the command. If the validation is successful and the command is accepted, it becomes an event, which is durable and immutable.
事件溯源哲学特别注重区分事件和命令[48]。当用户的请求首次到达时,它最初是一个命令:此时它仍可能失败,例如因为某些完整性条件被破坏。应用程序必须先验证它是否能执行此命令。如果验证成功且命令被接受,则成为一个持久且不可变的事件。
For example, if a user tries to register a particular username, or reserve a seat on an airplane or in a theater, then the application needs to check that the username or seat is not already taken. (We previously discussed this example in “Fault-Tolerant Consensus” .) When that check has succeeded, the application can generate an event to indicate that a particular username was registered by a particular user ID, or that a particular seat has been reserved for a particular customer.
例如,如果用户尝试注册特定的用户名,或者预留飞机或剧院的座位,那么应用程序需要检查该用户名或座位是否已经被占用。(我们之前在“容错共识”中讨论过这个例子。)当检查成功时,应用程序可以生成一个事件,指示特定的用户名已经被特定的用户ID注册,或者特定的座位已经被特定的客户预订。
At the point when the event is generated, it becomes a fact . Even if the customer later decides to change or cancel the reservation, the fact remains true that they formerly held a reservation for a particular seat, and the change or cancellation is a separate event that is added later.
在事件生成的那一刻,它就成为了事实。即使顾客后来决定更改或取消预订,他们曾经预订过某个特定座位这一事实仍然成立,而更改或取消是之后追加的一个单独事件。
A consumer of the event stream is not allowed to reject an event: by the time the consumer sees the event, it is already an immutable part of the log, and it may have already been seen by other consumers. Thus, any validation of a command needs to happen synchronously, before it becomes an event—for example, by using a serializable transaction that atomically validates the command and publishes the event.
事件流的消费者不能拒绝事件:当消费者看到事件时,它已经是日志中不可变的一部分,而且可能已经被其他消费者看到了。因此,对命令的任何验证都需要在它成为事件之前同步完成,例如使用可串行化事务来原子地验证命令并发布事件。
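A sketch of that command/event separation, using an illustrative username-registration example; the in-memory set and list here stand in for whatever serializable transaction (or total order broadcast) actually enforces atomicity in a real system:

    class UsernameTaken(Exception):
        pass

    def handle_register_command(event_log, taken_usernames, user_id, username):
        # Command phase: validation may still reject the request.
        if username in taken_usernames:
            raise UsernameTaken(username)
        # Once accepted, the fact is recorded as an immutable event.
        # In a real system the check and the append must happen atomically.
        event = {"type": "UsernameRegistered",
                 "user_id": user_id, "username": username}
        taken_usernames.add(username)
        event_log.append(event)
        return event

    log, taken = [], set()
    handle_register_command(log, taken, user_id=1, username="alice")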
Alternatively, the user request to reserve a seat could be split into two events: first a tentative reservation, and then a separate confirmation event once the reservation has been validated (as discussed in “Implementing linearizable storage using total order broadcast” ). This split allows the validation to take place in an asynchronous process.
或者,可以把用户预订座位的请求拆分为两个事件:先是一个暂定预订,然后在预订通过验证后再产生一个单独的确认事件(如"使用总序广播实现可线性化存储"中所讨论的)。这种拆分允许验证在异步过程中进行。
State, Streams, and Immutability
We saw in Chapter 10 that batch processing benefits from the immutability of its input files, so you can run experimental processing jobs on existing input files without fear of damaging them. This principle of immutability is also what makes event sourcing and change data capture so powerful.
在第10章中,我们看到批处理从其输入文件的不变性中获益,因此您可以在现有输入文件上运行实验处理作业,而不用担心损坏它们。这种不变性原则也使得事件溯源和变更数据捕获非常强大。
We normally think of databases as storing the current state of the application—this representation is optimized for reads, and it is usually the most convenient for serving queries. The nature of state is that it changes, so databases support updating and deleting data as well as inserting it. How does this fit with immutability?
我们通常认为数据库存储应用程序的当前状态——对于读取而言,该表示方式被优化,并且通常最方便用于提供查询。状态的特性在于它会发生变化,因此数据库支持更新、删除数据以及插入数据。这与不变性如何相适应?
Whenever you have state that changes, that state is the result of the events that mutated it over time. For example, your list of currently available seats is the result of the reservations you have processed, the current account balance is the result of the credits and debits on the account, and the response time graph for your web server is an aggregation of the individual response times of all web requests that have occurred.
无论何时,当你拥有一个不断变化的状态时,这个状态都是随着时间发生了变化的事件的结果。例如,你当前可用座位的列表取决于你处理过的预订,当前账户余额是账户上的借贷的结果,你网站服务器的响应时间图是所有发生过的网络请求的响应时间的聚合。
No matter how the state changes, there was always a sequence of events that caused those changes. Even as things are done and undone, the fact remains true that those events occurred. The key idea is that mutable state and an append-only log of immutable events do not contradict each other: they are two sides of the same coin. The log of all changes, the changelog , represents the evolution of state over time.
无论状态如何变化,总有一系列事件导致了这些变化。即使事情做了又被撤销,这些事件曾经发生过这一事实仍然成立。关键思想在于:可变状态与仅追加的不可变事件日志并不矛盾,它们是同一枚硬币的两面。所有变更的日志,即变更日志(changelog),代表了状态随时间的演变。
If you are mathematically inclined, you might say that the application state is what you get when you integrate an event stream over time, and a change stream is what you get when you differentiate the state by time, as shown in Figure 11-6 [ 49 , 50 , 51 ]. The analogy has limitations (for example, the second derivative of state does not seem to be meaningful), but it’s a useful starting point for thinking about data.
如果你倾向于用数学来思考,可以说应用程序状态是事件流对时间的积分,而变更流是状态对时间的微分,如图11-6所示[49, 50, 51]。这个类比有其局限性(例如,状态的二阶导数似乎没有什么意义),但它是思考数据的一个有用起点。
If you store the changelog durably, that simply has the effect of making the state reproducible. If you consider the log of events to be your system of record, and any mutable state as being derived from it, it becomes easier to reason about the flow of data through a system. As Pat Helland puts it [ 52 ]:
如果您持久地存储变更日志,其效果就是让状态变得可重现。如果您把事件日志视为记录系统,而把任何可变状态都视为从它派生而来,就更容易推理数据在系统中的流动。正如Pat Helland所说[52]:
Transaction logs record all the changes made to the database. High-speed appends are the only way to change the log. From this perspective, the contents of the database hold a caching of the latest record values in the logs. The truth is the log. The database is a cache of a subset of the log. That cached subset happens to be the latest value of each record and index value from the log.
事务日志记录对数据库所做的所有更改。高速追加是更改日志的唯一方法。从这个角度来看,数据库的内容保留了日志中最新记录值的缓存。事实就是日志。数据库是日志的一个子集缓存。该缓存子集恰好是从日志中每个记录和索引值中提取的最新值。
Log compaction, as discussed in “Log compaction” , is one way of bridging the distinction between log and database state: it retains only the latest version of each record, and discards overwritten versions.
如"日志压缩"一节所讨论的,日志压缩是弥合日志与数据库状态之间差异的一种方式:它只保留每条记录的最新版本,并丢弃被覆盖的版本。
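A minimal sketch of that compaction rule for a keyed changelog; here each record carries the full new value of its key, and (as an assumption of this sketch) a value of None marks a deletion:

    def compact(changelog):
        # Keep only the most recent value for each key, in log order.
        latest = {}
        for key, value in changelog:
            if value is None:           # tombstone: the key was deleted
                latest.pop(key, None)
            else:
                latest[key] = value
        return latest

    changelog = [("user:1", {"name": "Ada"}),
                 ("user:2", {"name": "Bob"}),
                 ("user:1", {"name": "Ada Lovelace"}),
                 ("user:2", None)]
    print(compact(changelog))  # {'user:1': {'name': 'Ada Lovelace'}}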
Advantages of immutable events
Immutability in databases is an old idea. For example, accountants have been using immutability for centuries in financial bookkeeping. When a transaction occurs, it is recorded in an append-only ledger , which is essentially a log of events describing money, goods, or services that have changed hands. The accounts, such as profit and loss or the balance sheet, are derived from the transactions in the ledger by adding them up [ 53 ].
数据库中的不可变性是一个古老的概念。例如,会计师在财务簿记中使用不可变性已有数百年历史。当一笔交易发生时,它会被记录在一个仅追加的账本中,该账本本质上是一个描述资金、商品或服务易手情况的事件日志。而诸如损益表或资产负债表之类的账目,则是通过对账本中的交易进行加总而派生出来的[53]。
If a mistake is made, accountants don’t erase or change the incorrect transaction in the ledger—instead, they add another transaction that compensates for the mistake, for example refunding an incorrect charge. The incorrect transaction still remains in the ledger forever, because it might be important for auditing reasons. If incorrect figures, derived from the incorrect ledger, have already been published, then the figures for the next accounting period include a correction. This process is entirely normal in accounting [ 54 ].
如果出现错误,会计师不会在账簿中擦除或更改不正确的交易——相反,他们会添加另一笔交易来补偿错误,例如退还错误的费用。不正确的交易仍然会永久保存在账簿中,因为这可能对审核原因很重要。如果已经发布了从错误账簿派生的不正确数字,则下一个会计期间的数字将包括校正。这个过程在会计中是非常正常的 [54]。
Although such auditability is particularly important in financial systems, it is also beneficial for many other systems that are not subject to such strict regulation. As discussed in “Philosophy of batch process outputs” , if you accidentally deploy buggy code that writes bad data to a database, recovery is much harder if the code is able to destructively overwrite data. With an append-only log of immutable events, it is much easier to diagnose what happened and recover from the problem.
尽管这种可审计性在金融系统中尤为重要,但它对许多其他不受如此严格监管的系统也很有益。如"批处理输出的哲学"中所讨论的,如果您不小心部署了有bug的代码,向数据库写入了错误数据,而该代码又能够破坏性地覆盖数据,那么恢复起来就会困难得多。若使用由不可变事件构成的仅追加日志,诊断问题和从中恢复就容易得多。
Immutable events also capture more information than just the current state. For example, on a shopping website, a customer may add an item to their cart and then remove it again. Although the second event cancels out the first event from the point of view of order fulfillment, it may be useful to know for analytics purposes that the customer was considering a particular item but then decided against it. Perhaps they will choose to buy it in the future, or perhaps they found a substitute. This information is recorded in an event log, but would be lost in a database that deletes items when they are removed from the cart [ 42 ].
不可变事件还能捕捉到比当前状态更多的信息。例如,在购物网站上,客户可能会把商品加入购物车,然后又把它移除。尽管从订单履行的角度看,第二个事件抵消了第一个事件,但对于分析而言,知道客户曾经考虑过某个商品但最终决定不买,可能是很有用的信息。也许他们将来会购买,或者已经找到了替代品。这些信息会记录在事件日志中,但在商品被移出购物车时就删除相应条目的数据库里,这些信息就丢失了[42]。
Deriving several views from the same event log
Moreover, by separating mutable state from the immutable event log, you can derive several different read-oriented representations from the same log of events. This works just like having multiple consumers of a stream ( Figure 11-5 ): for example, the analytic database Druid ingests directly from Kafka using this approach [ 55 ], Pistachio is a distributed key-value store that uses Kafka as a commit log [ 56 ], and Kafka Connect sinks can export data from Kafka to various different databases and indexes [ 41 ]. It would make sense for many other storage and indexing systems, such as search servers, to similarly take their input from a distributed log (see “Keeping Systems in Sync” ).
此外,通过将可变状态与不可变事件日志分开,您可以从相同的事件日志中获得多种不同的读取导向表示。这就像拥有多个流的消费者一样(图11-5):例如,分析数据库Druid使用此方法直接从Kafka摄取[55],Pistachio是一种分布式键值存储,它使用Kafka作为提交日志[56],而Kafka Connect Sinks可以将数据从Kafka导出到各种不同的数据库和索引[41]。许多其他存储和索引系统,例如搜索服务器,同样可以从分布式日志中获取它们的输入(请参阅“保持系统同步”)。
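As a sketch of this fan-out, two independent consumers of the same (hypothetical) cart event log can each maintain their own read-optimized view:

    events = [
        {"type": "ItemAddedToCart", "user": "u1", "item_id": "B00X"},
        {"type": "ItemRemovedFromCart", "user": "u1", "item_id": "B00X"},
        {"type": "ItemAddedToCart", "user": "u2", "item_id": "B00X"},
    ]

    # View 1: current cart contents per user (what the shopping page needs).
    carts = {}
    for e in events:
        cart = carts.setdefault(e["user"], set())
        if e["type"] == "ItemAddedToCart":
            cart.add(e["item_id"])
        elif e["type"] == "ItemRemovedFromCart":
            cart.discard(e["item_id"])

    # View 2: how often each item was added, including later-removed ones
    # (what an analytics dashboard might want).
    add_counts = {}
    for e in events:
        if e["type"] == "ItemAddedToCart":
            add_counts[e["item_id"]] = add_counts.get(e["item_id"], 0) + 1

    print(carts)       # {'u1': set(), 'u2': {'B00X'}}
    print(add_counts)  # {'B00X': 2}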
Having an explicit translation step from an event log to a database makes it easier to evolve your application over time: if you want to introduce a new feature that presents your existing data in some new way, you can use the event log to build a separate read-optimized view for the new feature, and run it alongside the existing systems without having to modify them. Running old and new systems side by side is often easier than performing a complicated schema migration in an existing system. Once the old system is no longer needed, you can simply shut it down and reclaim its resources [ 47 , 57 ].
在事件日志与数据库之间有一个显式的转换步骤,可以让应用程序更容易随时间演进:如果您想引入一个以某种新方式呈现现有数据的新功能,可以利用事件日志为新功能构建一个单独的读取优化视图,并与现有系统并行运行,而无需修改它们。并行运行新旧系统,往往比在现有系统中执行复杂的模式迁移更容易。一旦不再需要旧系统,您可以直接将其关闭并回收其资源[47, 57]。
Storing data is normally quite straightforward if you don’t have to worry about how it is going to be queried and accessed; many of the complexities of schema design, indexing, and storage engines are the result of wanting to support certain query and access patterns (see Chapter 3 ). For this reason, you gain a lot of flexibility by separating the form in which data is written from the form it is read, and by allowing several different read views. This idea is sometimes known as command query responsibility segregation (CQRS) [ 42 , 58 , 59 ].
如果您不必担心数据如何查询和访问,那么存储数据通常相当简单;模式设计、索引和存储引擎的许多复杂性都是为了支持特定的查询和访问模式(请参见第3章)。由于这个原因,通过将数据写入和读取的方式分离,以及允许多个不同的读取视图,可以获得很多的灵活性。这个想法有时被称为命令查询责任分离(CQRS)[42,58,59]。
The traditional approach to database and schema design is based on the fallacy that data must be written in the same form as it will be queried. Debates about normalization and denormalization (see “Many-to-One and Many-to-Many Relationships” ) become largely irrelevant if you can translate data from a write-optimized event log to read-optimized application state: it is entirely reasonable to denormalize data in the read-optimized views, as the translation process gives you a mechanism for keeping it consistent with the event log.
传统的数据库与模式设计方法基于这样一个谬误:数据写入的形式必须与将来查询它的形式相同。如果您可以把数据从写优化的事件日志转换为读优化的应用程序状态,那么关于规范化与反规范化的争论(参见"多对一和多对多关系")就变得基本无关紧要了:在读优化的视图中对数据进行反规范化是完全合理的,因为转换过程为您提供了一种使其与事件日志保持一致的机制。
In “Describing Load” we discussed Twitter’s home timelines, a cache of recently written tweets by the people a particular user is following (like a mailbox). This is another example of read-optimized state: home timelines are highly denormalized, since your tweets are duplicated in all of the timelines of the people following you. However, the fan-out service keeps this duplicated state in sync with new tweets and new following relationships, which keeps the duplication manageable.
在"描述负载"一节中,我们讨论了Twitter的主页时间线,它是某个用户所关注的人最近发布的推文的缓存(类似于一个邮箱)。这是读取优化状态的另一个例子:主页时间线是高度反规范化的,因为你的推文会在所有关注你的人的时间线中重复出现。然而,扇出服务会使这种重复状态与新的推文以及新的关注关系保持同步,从而使重复保持在可控范围内。
Concurrency control
The biggest downside of event sourcing and change data capture is that the consumers of the event log are usually asynchronous, so there is a possibility that a user may make a write to the log, then read from a log-derived view and find that their write has not yet been reflected in the read view. We discussed this problem and potential solutions previously in “Reading Your Own Writes” .
事件溯源和变更数据捕获的最大缺点是事件日志的消费者通常是异步的,因此存在一个可能性,即用户可能会将其写入日志,然后从日志派生的视图中读取,并发现其写入尚未在读取视图中反映出来。我们在“读取自己的写入”中曾经讨论过这个问题和潜在的解决方案。
One solution would be to perform the updates of the read view synchronously with appending the event to the log. This requires a transaction to combine the writes into an atomic unit, so either you need to keep the event log and the read view in the same storage system, or you need a distributed transaction across the different systems. Alternatively, you could use the approach discussed in “Implementing linearizable storage using total order broadcast” .
一种解决方案是在将事件追加到日志的同时,同步执行读取视图的更新。这需要用一个事务把这些写操作组合成一个原子单元,因此要么把事件日志和读取视图放在同一个存储系统中,要么需要跨不同系统的分布式事务。或者,也可以使用"使用总序广播实现可线性化存储"中讨论的方法。
On the other hand, deriving the current state from an event log also simplifies some aspects of concurrency control. Much of the need for multi-object transactions (see “Single-Object and Multi-Object Operations” ) stems from a single user action requiring data to be changed in several different places. With event sourcing, you can design an event such that it is a self-contained description of a user action. The user action then requires only a single write in one place—namely appending the events to the log—which is easy to make atomic.
另一方面,从事件日志派生当前状态也简化了并发控制的某些方面。许多对多对象事务的需求(参见"单对象和多对象操作")源于单个用户操作需要在多个不同的地方更改数据。而使用事件溯源,您可以把事件设计成对用户操作的自包含描述。这样,用户操作只需要在一个地方进行一次写入,即把事件追加到日志中,这很容易实现原子化。
If the event log and the application state are partitioned in the same way (for example, processing an event for a customer in partition 3 only requires updating partition 3 of the application state), then a straightforward single-threaded log consumer needs no concurrency control for writes—by construction, it only processes a single event at a time (see also “Actual Serial Execution” ). The log removes the nondeterminism of concurrency by defining a serial order of events in a partition [ 24 ]. If an event touches multiple state partitions, a bit more work is required, which we will discuss in Chapter 12 .
如果事件日志与应用程序状态以相同的方式分区(例如,处理分区3中某个客户的事件,只需要更新应用程序状态的分区3),那么一个简单的单线程日志消费者就不需要针对写入做并发控制:按其构造方式,它一次只处理一个事件(另请参阅"实际串行执行")。日志通过定义分区内事件的串行顺序,消除了并发的不确定性[24]。如果一个事件涉及多个状态分区,就需要做更多的工作,我们将在第12章讨论。
Limitations of immutability
Many systems that don’t use an event-sourced model nevertheless rely on immutability: various databases internally use immutable data structures or multi-version data to support point-in-time snapshots (see “Indexes and snapshot isolation” ). Version control systems such as Git, Mercurial, and Fossil also rely on immutable data to preserve version history of files.
许多不使用事件溯源模型的系统仍然依赖于不变性:各种数据库内部使用不可变数据结构或多版本数据来支持时间点快照(请参见“索引和快照隔离”)。版本控制系统如Git、Mercurial和Fossil也依赖于不变数据来保留文件版本历史记录。
To what extent is it feasible to keep an immutable history of all changes forever? The answer depends on the amount of churn in the dataset. Some workloads mostly add data and rarely update or delete; they are easy to make immutable. Other workloads have a high rate of updates and deletes on a comparatively small dataset; in these cases, the immutable history may grow prohibitively large, fragmentation may become an issue, and the performance of compaction and garbage collection becomes crucial for operational robustness [ 60 , 61 ].
永远保留所有变更的不可变历史,在多大程度上是可行的?答案取决于数据集的流失量。有些工作负载主要是添加数据,很少更新或删除,它们很容易做成不可变的。另一些工作负载则在相对较小的数据集上有很高的更新和删除率;在这些情况下,不可变历史可能会增长到难以承受的规模,碎片化可能成为问题,而压缩与垃圾回收的性能对运维的稳健性变得至关重要[60, 61]。
Besides the performance reasons, there may also be circumstances in which you need data to be deleted for administrative reasons, in spite of all immutability. For example, privacy regulations may require deleting a user’s personal information after they close their account, data protection legislation may require erroneous information to be removed, or an accidental leak of sensitive information may need to be contained.
除了性能原因,由于管理原因,您可能需要删除数据,尽管存在不可变性。例如,隐私规定可能要求在用户关闭账户后删除其个人信息,数据保护法规可能要求删除错误信息,或者敏感信息的意外泄露可能需要被控制。
In these circumstances, it’s not sufficient to just append another event to the log to indicate that the prior data should be considered deleted—you actually want to rewrite history and pretend that the data was never written in the first place. For example, Datomic calls this feature excision [ 62 ], and the Fossil version control system has a similar concept called shunning [ 63 ].
在这种情况下,仅仅将另一个事件附加到日志中以指示应将先前的数据视为已删除是不足够的 - 你实际上想要重写历史并假装第一次没有写入数据。例如,Datomic称此功能为切除[62],而Fossil版本控制系统具有类似的概念称为shunning[63]。
Truly deleting data is surprisingly hard [ 64 ], since copies can live in many places: for example, storage engines, filesystems, and SSDs often write to a new location rather than overwriting in place [ 52 ], and backups are often deliberately immutable to prevent accidental deletion or corruption. Deletion is more a matter of “making it harder to retrieve the data” than actually “making it impossible to retrieve the data.” Nevertheless, you sometimes have to try, as we shall see in “Legislation and self-regulation” .
真正删除数据的难度出人意料 [64],因为副本可以存在于许多地方: 例如,存储引擎、文件系统和SSD通常在新位置写入而不是原地覆盖 [52],备份通常故意不可变,以防止意外删除或损坏。删除更多地是“让检索数据变得更难”而不是真正“使检索数据变得不可能”。虽然如此,有时您必须尝试,正如我们将在“立法和自我调节”中看到的那样。
Processing Streams
So far in this chapter we have talked about where streams come from (user activity events, sensors, and writes to databases), and we have talked about how streams are transported (through direct messaging, via message brokers, and in event logs).
到目前为止,在这一章中,我们已经谈论了流从哪来(用户活动事件,传感器和对数据库的写入),以及我们谈论了流如何传输(通过直接消息传递,通过消息代理和事件日志)。
What remains is to discuss what you can do with the stream once you have it—namely, you can process it. Broadly, there are three options:
剩下的就是讨论一旦你拥有了这个数据流你可以做什么——也就是你可以对其进行处理。大致上,有三个选项:
-
You can take the data in the events and write it to a database, cache, search index, or similar storage system, from where it can then be queried by other clients. As shown in Figure 11-5 , this is a good way of keeping a database in sync with changes happening in other parts of the system—especially if the stream consumer is the only client writing to the database. Writing to a storage system is the streaming equivalent of what we discussed in “The Output of Batch Workflows” .
您可以把事件中的数据写入数据库、缓存、搜索索引或类似的存储系统,然后其他客户端就可以从中查询。如图11-5所示,这是使数据库与系统其他部分所发生的变更保持同步的好方法,尤其是当流消费者是写入该数据库的唯一客户端时。写入存储系统是我们在"批处理工作流的输出"中所讨论内容在流式场景下的等价物。
-
You can push the events to users in some way, for example by sending email alerts or push notifications, or by streaming the events to a real-time dashboard where they are visualized. In this case, a human is the ultimate consumer of the stream.
你可以通过发送电子邮件或推送通知,或将事件流式传输到实时仪表板并进行可视化的方式向用户推送事件。在这种情况下,人类是流的终极消费者。
-
You can process one or more input streams to produce one or more output streams. Streams may go through a pipeline consisting of several such processing stages before they eventually end up at an output (option 1 or 2).
您可以处理一个或多个输入流,以产生一个或多个输出流。流可能会经过由若干个这样的处理阶段组成的管道,最终到达某个输出(选项1或2)。
In the rest of this chapter, we will discuss option 3: processing streams to produce other, derived streams. A piece of code that processes streams like this is known as an operator or a job . It is closely related to the Unix processes and MapReduce jobs we discussed in Chapter 10 , and the pattern of dataflow is similar: a stream processor consumes input streams in a read-only fashion and writes its output to a different location in an append-only fashion.
在本章的其余部分中,我们将讨论选项3:处理流以生成其他派生流。像这样处理流的代码被称为操作符或作业。它与我们在第10章中讨论的Unix进程和MapReduce作业密切相关,数据流模式相似:流处理器以只读的方式消耗输入流,并以追加的方式将其输出写入不同的位置。
The patterns for partitioning and parallelization in stream processors are also very similar to those in MapReduce and the dataflow engines we saw in Chapter 10 , so we won’t repeat those topics here. Basic mapping operations such as transforming and filtering records also work the same.
流处理器中的分区和并行化模式与MapReduce和我们在第10章中看到的数据流引擎非常相似,因此我们不会在这里重复这些主题。基本的映射操作,如转换和过滤记录,也是相同的。
The one crucial difference to batch jobs is that a stream never ends. This difference has many implications: as discussed at the start of this chapter, sorting does not make sense with an unbounded dataset, and so sort-merge joins (see “Reduce-Side Joins and Grouping” ) cannot be used. Fault-tolerance mechanisms must also change: with a batch job that has been running for a few minutes, a failed task can simply be restarted from the beginning, but with a stream job that has been running for several years, restarting from the beginning after a crash may not be a viable option.
一个批处理和流处理的关键区别是流处理从不结束。这个差异有许多影响:就像本章开头所讨论的,对于一个无限的数据集来说排序是没有意义的,因此无法使用排序合并连接(参见“Reduce-Side Joins and Grouping”)。容错机制也必须改变:对于运行几分钟的批处理作业,失败的任务可以从头重新开始,但对于运行数年的流处理作业,在崩溃后从头重新开始可能不是可行的选择。
Uses of Stream Processing
Stream processing has long been used for monitoring purposes, where an organization wants to be alerted if certain things happen. For example:
流处理长期以来一直被用于监控目的,其中组织希望在发生某些事情时得到警报。例如:
-
Fraud detection systems need to determine if the usage patterns of a credit card have unexpectedly changed, and block the card if it is likely to have been stolen.
欺诈检测系统需要确定信用卡使用模式是否意外更改,并在可能被盗时封锁该卡。
-
Trading systems need to examine price changes in a financial market and execute trades according to specified rules.
交易系统需要审查金融市场的价格变化,并按照规定的规则执行交易。
-
Manufacturing systems need to monitor the status of machines in a factory, and quickly identify the problem if there is a malfunction.
制造系统需要监视工厂内机器的状态,并且在机器出现故障时能够迅速识别问题。
-
Military and intelligence systems need to track the activities of a potential aggressor, and raise the alarm if there are signs of an attack.
军事和情报系统需要跟踪潜在敌人的活动,并在发现攻击迹象时发出警报。
These kinds of applications require quite sophisticated pattern matching and correlations. However, other uses of stream processing have also emerged over time. In this section we will briefly compare and contrast some of these applications.
这种应用需要相当复杂的模式匹配和相关性。然而,随着时间的推移,流处理的其他用途也不断出现。在本节中,我们将简要比较和对比这些应用。
Complex event processing
Complex event processing (CEP) is an approach developed in the 1990s for analyzing event streams, especially geared toward the kind of application that requires searching for certain event patterns [ 65 , 66 ]. Similarly to the way that a regular expression allows you to search for certain patterns of characters in a string, CEP allows you to specify rules to search for certain patterns of events in a stream.
复杂事件处理(CEP)是开发于1990年代的一种方法,用于分析事件流,特别适用于需要搜索特定事件模式的应用程序[65,66]。类似于正则表达式允许您在字符串中搜索特定模式的字符的方式,CEP允许您指定规则搜索事件流中的某些事件模式。
CEP systems often use a high-level declarative query language like SQL, or a graphical user interface, to describe the patterns of events that should be detected. These queries are submitted to a processing engine that consumes the input streams and internally maintains a state machine that performs the required matching. When a match is found, the engine emits a complex event (hence the name) with the details of the event pattern that was detected [ 67 ].
CEP系统通常使用高级声明性查询语言(如SQL)或图形用户界面来描述应检测到的事件模式。这些查询将提交给处理引擎,该引擎消耗输入流并内部维护状态机以执行所需的匹配。当找到匹配时,引擎发出一个带有被检测到的事件模式细节的复杂事件(因此得名)。
In these systems, the relationship between queries and data is reversed compared to normal databases. Usually, a database stores data persistently and treats queries as transient: when a query comes in, the database searches for data matching the query, and then forgets about the query when it has finished. CEP engines reverse these roles: queries are stored long-term, and events from the input streams continuously flow past them in search of a query that matches an event pattern [ 68 ].
在这些系统中,与普通数据库相比,查询和数据之间的关系被颠倒了。通常,数据库会持久地存储数据,并将查询视为短暂的:当有查询时,数据库将搜索与查询匹配的数据,然后在完成后就忘记了查询。CEP引擎颠倒了这些角色:查询被长期存储,而来自输入流的事件不断流过它们,以寻找与事件模式匹配的查询[68]。
Implementations of CEP include Esper [ 69 ], IBM InfoSphere Streams [ 70 ], Apama, TIBCO StreamBase, and SQLstream. Distributed stream processors like Samza are also gaining SQL support for declarative queries on streams [ 71 ].
CEP的实现包括Esper[69]、IBM InfoSphere Streams[70]、Apama、TIBCO StreamBase和SQLstream。像Samza这样的分布式流处理器也正在获得对流进行声明式SQL查询的支持[71]。
Stream analytics
Another area in which stream processing is used is for analytics on streams. The boundary between CEP and stream analytics is blurry, but as a general rule, analytics tends to be less interested in finding specific event sequences and is more oriented toward aggregations and statistical metrics over a large number of events—for example:
流处理使用的另一个领域是流分析。 CEP 和流分析之间的界限模糊不清,但一般规则是分析倾向于不关心特定事件序列,而更倾向于对大量事件进行聚合和统计度量。例如:
-
Measuring the rate of some type of event (how often it occurs per time interval)
测量某种类型事件的频率(每个时间间隔发生的频率)
-
Calculating the rolling average of a value over some time period
计算某一时期内某个值的滚动平均值。
-
Comparing current statistics to previous time intervals (e.g., to detect trends or to alert on metrics that are unusually high or low compared to the same time last week)
将当前统计数据与以前的时间间隔进行比较(例如,以便检测趋势或警报指标是否与上周同期明显偏高或偏低)。
Such statistics are usually computed over fixed time intervals—for example, you might want to know the average number of queries per second to a service over the last 5 minutes, and their 99th percentile response time during that period. Averaging over a few minutes smoothes out irrelevant fluctuations from one second to the next, while still giving you a timely picture of any changes in traffic pattern. The time interval over which you aggregate is known as a window , and we will look into windowing in more detail in “Reasoning About Time” .
这类统计通常是在固定的时间间隔内计算的——例如,您可能想知道过去5分钟内服务的平均每秒查询次数,以及在这段时间内它们的99th百分位响应时间。在几分钟内平均化可以平滑不相关的波动,同时仍能及时地给您展示任何流量模式的变化。您聚合的时间间隔称为窗口,在“时间推理”中详细探讨窗口。
Stream analytics systems sometimes use probabilistic algorithms, such as Bloom filters (which we encountered in “Performance optimizations” ) for set membership, HyperLogLog [ 72 ] for cardinality estimation, and various percentile estimation algorithms (see “Percentiles in Practice” ). Probabilistic algorithms produce approximate results, but have the advantage of requiring significantly less memory in the stream processor than exact algorithms. This use of approximation algorithms sometimes leads people to believe that stream processing systems are always lossy and inexact, but that is wrong: there is nothing inherently approximate about stream processing, and probabilistic algorithms are merely an optimization [ 73 ].
流分析系统有时会使用概率算法,例如用于集合成员判断的布隆过滤器(我们在"性能优化"中遇到过)、用于基数估计的HyperLogLog[72],以及各种百分位数估计算法(参见"实践中的百分位点")。概率算法产生的是近似结果,但其优点是所需内存远少于精确算法。这种对近似算法的使用有时会让人们误以为流处理系统总是有损且不精确的,但事实并非如此:流处理本身并没有什么固有的近似性,概率算法只是一种优化手段[73]。
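For instance, a Bloom filter answers set-membership questions ("have we already seen this event ID?") in a small, fixed amount of memory, at the cost of occasional false positives. A bare-bones sketch (sizes and hash scheme are arbitrary choices for illustration):

    import hashlib

    class BloomFilter:
        def __init__(self, num_bits=1 << 20, num_hashes=5):
            self.num_bits = num_bits
            self.num_hashes = num_hashes
            self.bits = bytearray(num_bits // 8)

        def _positions(self, item):
            # Derive several bit positions from independent-ish hashes.
            for i in range(self.num_hashes):
                digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
                yield int.from_bytes(digest[:8], "big") % self.num_bits

        def add(self, item):
            for pos in self._positions(item):
                self.bits[pos // 8] |= 1 << (pos % 8)

        def might_contain(self, item):
            # False => definitely not seen; True => probably seen (may be wrong).
            return all(self.bits[pos // 8] & (1 << (pos % 8))
                       for pos in self._positions(item))

    seen = BloomFilter()
    seen.add("event-123")
    print(seen.might_contain("event-123"))  # True
    print(seen.might_contain("event-999"))  # almost certainly False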
Many open source distributed stream processing frameworks are designed with analytics in mind: for example, Apache Storm, Spark Streaming, Flink, Concord, Samza, and Kafka Streams [ 74 ]. Hosted services include Google Cloud Dataflow and Azure Stream Analytics.
许多开源分布式流处理框架都是以分析为重点设计的,例如Apache Storm、Spark Streaming、Flink、Concord、Samza和Kafka Streams [74]。托管服务包括Google Cloud Dataflow和Azure Stream Analytics。
Maintaining materialized views
We saw in “Databases and Streams” that a stream of changes to a database can be used to keep derived data systems, such as caches, search indexes, and data warehouses, up to date with a source database. We can regard these examples as specific cases of maintaining materialized views (see “Aggregation: Data Cubes and Materialized Views” ): deriving an alternative view onto some dataset so that you can query it efficiently, and updating that view whenever the underlying data changes [ 50 ].
在“数据库和流”中,我们看到可以使用数据库的更改流来保持派生数据系统(例如缓存、搜索索引和数据仓库)与源数据库保持最新。我们可以将这些示例视为维护物化视图的特定案例(请参见“聚合:数据立方体和物化视图”):导出一些数据集的替代视图,以便您可以高效地查询它,并在基础数据更改时更新该视图[50]。
Similarly, in event sourcing, application state is maintained by applying a log of events; here the application state is also a kind of materialized view. Unlike stream analytics scenarios, it is usually not sufficient to consider only events within some time window: building the materialized view potentially requires all events over an arbitrary time period, apart from any obsolete events that may be discarded by log compaction (see “Log compaction” ). In effect, you need a window that stretches all the way back to the beginning of time.
在事件溯源中,通过应用一系列事件日志来维护应用状态;这里应用状态也是某种物化视图。与流分析场景不同的是,仅考虑某个时间窗口内的事件通常是不足够的:构建物化视图可能需要在任意时间段内获取所有事件,除了可能被日志压缩(参见“日志压缩”)丢弃的过时事件。实际上,您需要一个一直延伸到时间开始的窗口。
In principle, any stream processor could be used for materialized view maintenance, although the need to maintain events forever runs counter to the assumptions of some analytics-oriented frameworks that mostly operate on windows of a limited duration. Samza and Kafka Streams support this kind of usage, building upon Kafka’s support for log compaction [ 75 ].
原则上,任何流处理器都可以用于维护物化视图,尽管需要永久保留事件这一点,与一些主要在有限时长窗口上操作的、面向分析的框架的假设相冲突。Samza和Kafka Streams支持这种用法,它们构建于Kafka对日志压缩的支持之上[75]。
Search on streams
Besides CEP, which allows searching for patterns consisting of multiple events, there is also sometimes a need to search for individual events based on complex criteria, such as full-text search queries.
除了CEP,它允许搜索由多个事件组成的模式,有时还需要根据复杂条件搜索个别事件,例如全文搜索查询。
For example, media monitoring services subscribe to feeds of news articles and broadcasts from media outlets, and search for any news mentioning companies, products, or topics of interest. This is done by formulating a search query in advance, and then continually matching the stream of news items against this query. Similar features exist on some websites: for example, users of real estate websites can ask to be notified when a new property matching their search criteria appears on the market. The percolator feature of Elasticsearch [ 76 ] is one option for implementing this kind of stream search.
例如,媒体监控服务会订阅来自媒体机构的新闻文章和广播,搜索任何提及感兴趣的公司、产品或主题的新闻。这是通过预先制定搜索查询,然后持续地将新闻条目流与该查询进行匹配来完成的。一些网站上也有类似的功能:例如,房地产网站的用户可以要求在市场上出现符合其搜索条件的新房源时收到通知。Elasticsearch的percolator特性[76]是实现这种流搜索的一种选择。
Conventional search engines first index the documents and then run queries over the index. By contrast, searching a stream turns the processing on its head: the queries are stored, and the documents run past the queries, like in CEP. In the simplest case, you can test every document against every query, although this can get slow if you have a large number of queries. To optimize the process, it is possible to index the queries as well as the documents, and thus narrow down the set of queries that may match [ 77 ].
传统搜索引擎先为文档建立索引,然后在索引上运行查询。相比之下,在流上搜索则把这个处理过程颠倒了过来:查询被存储下来,文档从查询旁流过,就像CEP中那样。在最简单的情况下,可以用每个查询来测试每个文档,但如果查询数量很大,这可能会很慢。为了优化这个过程,除了对文档建立索引之外,也可以对查询建立索引,从而缩小可能匹配的查询集合[77]。
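A toy illustration of that optimization: store each query, index it by its terms, and for each incoming document test only the queries that share at least one term with it. The data structures and query format here are purely illustrative, not Elasticsearch's percolator API:

    from collections import defaultdict

    stored_queries = {
        "q1": {"acme", "merger"},
        "q2": {"solar", "subsidy"},
    }

    # Inverted index over the *queries*: term -> ids of queries containing it.
    term_to_queries = defaultdict(set)
    for qid, terms in stored_queries.items():
        for term in terms:
            term_to_queries[term].add(qid)

    def matching_queries(document_text):
        doc_terms = set(document_text.lower().split())
        candidates = set()
        for term in doc_terms:                  # narrow down via the index
            candidates |= term_to_queries.get(term, set())
        # Final check: require every term of the query to appear in the document.
        return [qid for qid in candidates if stored_queries[qid] <= doc_terms]

    print(matching_queries("acme announces merger with rival"))  # ['q1']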
Message passing and RPC
In “Message-Passing Dataflow” we discussed message-passing systems as an alternative to RPC—i.e., as a mechanism for services to communicate, as used for example in the actor model. Although these systems are also based on messages and events, we normally don’t think of them as stream processors:
在“消息传递数据流”中,我们讨论了消息传递系统作为RPC的一种替代方案——即作为服务之间通信的机制,例如在actor模型中使用。虽然这些系统也基于消息和事件,但我们通常不将它们视为流处理器:
-
Actor frameworks are primarily a mechanism for managing concurrency and distributed execution of communicating modules, whereas stream processing is primarily a data management technique.
Actor框架主要是一种管理通信模块的并发和分布式执行的机制,而流处理主要是一种数据管理技术。
-
Communication between actors is often ephemeral and one-to-one, whereas event logs are durable and multi-subscriber.
Actor之间的通信往往是短暂的、一对一的,而事件日志则是持久的、支持多个订阅者的。
-
Actors can communicate in arbitrary ways (including cyclic request/response patterns), but stream processors are usually set up in acyclic pipelines where every stream is the output of one particular job, and derived from a well-defined set of input streams.
Actor可以以任意方式进行通信(包括循环的请求/响应模式),而流处理器通常被设置成非循环的管道,其中每个流都是某一个特定作业的输出,并且派生自一组明确定义的输入流。
That said, there is some crossover area between RPC-like systems and stream processing. For example, Apache Storm has a feature called distributed RPC , which allows user queries to be farmed out to a set of nodes that also process event streams; these queries are then interleaved with events from the input streams, and results can be aggregated and sent back to the user [ 78 ]. (See also “Multi-partition data processing” .)
尽管如此,RPC类系统与流处理之间存在一些交叉领域。例如,Apache Storm有一个称为分布式RPC的特性,它允许把用户查询分发到一组同时处理事件流的节点上;这些查询随后会与来自输入流的事件交错,结果可以被聚合并发回给用户[78]。(另请参阅"多分区数据处理"。)
It is also possible to process streams using actor frameworks. However, many such frameworks do not guarantee message delivery in the case of crashes, so the processing is not fault-tolerant unless you implement additional retry logic.
使用 actor 框架也可以处理数据流。然而,许多这样的框架在发生崩溃时无法保证消息传递,所以除非您实现了额外的重试逻辑,否则处理不具备容错能力。
Reasoning About Time
Stream processors often need to deal with time, especially when used for analytics purposes, which frequently use time windows such as “the average over the last five minutes.” It might seem that the meaning of “the last five minutes” should be unambiguous and clear, but unfortunately the notion is surprisingly tricky.
流处理器常常需要处理时间,特别是在用于分析目的时,这通常会使用时间窗口,例如“过去五分钟内的平均值”。这个概念似乎应该是明确和清晰的,但不幸的是,它的意义却令人惊讶地棘手。
In a batch process, the processing tasks rapidly crunch through a large collection of historical events. If some kind of breakdown by time needs to happen, the batch process needs to look at the timestamp embedded in each event. There is no point in looking at the system clock of the machine running the batch process, because the time at which the process is run has nothing to do with the time at which the events actually occurred.
在批处理过程中,处理任务快速地处理大量的历史事件。如果需要按时间进行分解,批处理需要查看每个事件中嵌入的时间戳。查看批处理运行的机器的系统时钟没有意义,因为处理运行的时间与事件实际发生的时间无关。
A batch process may read a year’s worth of historical events within a few minutes; in most cases, the timeline of interest is the year of history, not the few minutes of processing. Moreover, using the timestamps in the events allows the processing to be deterministic: running the same process again on the same input yields the same result (see “Fault tolerance” ).
批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间线是历史的一年,而不是几分钟的处理。此外,使用事件中的时间戳允许处理是确定性的:在相同的输入上再次运行相同的过程会产生相同的结果(参见“错误容忍”)。
On the other hand, many stream processing frameworks use the local system clock on the processing machine (the processing time ) to determine windowing [ 79 ]. This approach has the advantage of being simple, and it is reasonable if the delay between event creation and event processing is negligibly short. However, it breaks down if there is any significant processing lag—i.e., if the processing may happen noticeably later than the time at which the event actually occurred.
另一方面,许多流处理框架使用处理机上的本地系统时钟(处理时间)来确定窗口化[79]。这种方法具有简单的优点,如果事件创建和事件处理之间的延迟可以忽略不计的话是合理的。然而,如果存在任何显着的处理延迟,即如果处理可能明显晚于事件实际发生的时间,它将失效。
Event time versus processing time
There are many reasons why processing may be delayed: queueing, network faults (see “Unreliable Networks” ), a performance issue leading to contention in the message broker or processor, a restart of the stream consumer, or reprocessing of past events (see “Replaying old messages” ) while recovering from a fault or after fixing a bug in the code.
处理可能延迟的原因有很多:排队、网络故障(参见"不可靠的网络")、性能问题导致消息代理或处理器中出现争用、流消费者重启,或者在从故障中恢复或修复代码bug之后重新处理过去的事件(参见"重放旧消息")。
Moreover, message delays can also lead to unpredictable ordering of messages. For example, say a user first makes one web request (which is handled by web server A), and then a second request (which is handled by server B). A and B emit events describing the requests they handled, but B’s event reaches the message broker before A’s event does. Now stream processors will first see the B event and then the A event, even though they actually occurred in the opposite order.
此外,消息延迟还会导致消息的顺序变得不可预测。例如,一个用户首先请求一个网页(由服务器A处理),然后请求第二个网页(服务器B处理)。A和B都会发布关于它们处理的请求的事件,但是B的事件先到达消息代理,然后才是A的事件。现在,流处理器会先看到B的事件,然后再看到A的事件,尽管它们实际上是相反的顺序发生的。
If it helps to have an analogy, consider the Star Wars movies: Episode IV was released in 1977, Episode V in 1980, and Episode VI in 1983, followed by Episodes I, II, and III in 1999, 2002, and 2005, respectively, and Episode VII in 2015 [ 80 ]. ii If you watched the movies in the order they came out, the order in which you processed the movies is inconsistent with the order of their narrative. (The episode number is like the event timestamp, and the date when you watched the movie is the processing time.) As humans, we are able to cope with such discontinuities, but stream processing algorithms need to be specifically written to accommodate such timing and ordering issues.
如果需要一个类比的话,可以考虑《星球大战》电影系列:第四集在1977年上映,第五集在1980年,第六集在1983年上映,接着是第一集、第二集和第三集,在1999年、2002年和2005年上映,最后是第七集在2015年上映[80]。如果您按照它们上映的顺序观看电影,那么您处理电影的顺序与它们叙述的顺序是不一致的。(剧集编号就像事件时间戳,您观看电影的日期就是处理时间。)作为人类,我们能够应对这种不连贯性,但流处理算法需要专门编写以适应这种时间和顺序问题。
Confusing event time and processing time leads to bad data. For example, say you have a stream processor that measures the rate of requests (counting the number of requests per second). If you redeploy the stream processor, it may be shut down for a minute and process the backlog of events when it comes back up. If you measure the rate based on the processing time, it will look as if there was a sudden anomalous spike of requests while processing the backlog, when in fact the real rate of requests was steady ( Figure 11-7 ).
混淆事件时间和处理时间会导致糟糕的数据。例如,假设你有一个流处理器用于测量请求速率(统计每秒请求数)。如果你重新部署这个流处理器,它可能会停机一分钟,并在恢复之后处理积压的事件。如果你按照处理时间来测量速率,看起来就好像在处理积压期间出现了一个突然的异常请求峰值,而实际上真实的请求速率是稳定的(图11-7)。
Knowing when you’re ready
A tricky problem when defining windows in terms of event time is that you can never be sure when you have received all of the events for a particular window, or whether there are some events still to come.
定义事件时间窗口时的一个棘手问题在于无法确定何时已接收到某个窗口的所有事件,或者是否还有一些事件需要到来。
For example, say you’re grouping events into one-minute windows so that you can count the number of requests per minute. You have counted some number of events with timestamps that fall in the 37th minute of the hour, and time has moved on; now most of the incoming events fall within the 38th and 39th minutes of the hour. When do you declare that you have finished the window for the 37th minute, and output its counter value?
例如,假设您正在将事件分组到每分钟窗口中,以便可以计算每分钟的请求数量。您已经计算出一些时间戳位于小时的第37分钟内的事件数量,但时间已经过去了。现在,大部分传入事件都在小时的第38和39分钟内。您应该在何时声明已完成第37分钟的窗口,并输出其计数器值?
You can time out and declare a window ready after you have not seen any new events for a while, but it could still happen that some events were buffered on another machine somewhere, delayed due to a network interruption. You need to be able to handle such straggler events that arrive after the window has already been declared complete. Broadly, you have two options [ 1 ]:
如果一段时间内没有看到任何新事件,你可以超时并宣布窗口就绪,但仍然可能发生这样的情况:某些事件被缓冲在别处的某台机器上,由于网络中断而延迟到达。你需要能够处理这种在窗口已被宣布完成之后才到达的滞留(straggler)事件。大体上,你有两种选择[1]:
-
Ignore the straggler events, as they are probably a small percentage of events in normal circumstances. You can track the number of dropped events as a metric, and alert if you start dropping a significant amount of data.
忽略这些滞留事件,因为在正常情况下它们可能只占事件的一小部分。你可以把丢弃的事件数量作为一项指标进行跟踪,并在开始丢弃大量数据时发出警报。
-
Publish a correction , an updated value for the window with stragglers included. You may also need to retract the previous output.
发布一个更正,即把滞留事件包含在内之后该窗口的更新值。你可能还需要撤回之前的输出。
In some cases it is possible to use a special message to indicate, “From now on there will be no more messages with a timestamp earlier than t ,” which can be used by consumers to trigger windows [ 81 ]. However, if several producers on different machines are generating events, each with their own minimum timestamp thresholds, the consumers need to keep track of each producer individually. Adding and removing producers is trickier in this case.
在某些情况下,可以使用一种特殊的消息来表示"从现在开始,不会再有时间戳早于t的消息了",消费者可以用它来触发窗口[81]。但是,如果不同机器上的多个生产者各自生成事件,每个生产者都有自己的最小时间戳阈值,那么消费者就需要单独跟踪每个生产者。在这种情况下,添加和移除生产者会更加棘手。
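A sketch of tracking such per-producer low-watermark messages: the consumer can only close windows up to the minimum watermark across all producers it knows about (producer names and message shapes here are made up):

    class WatermarkTracker:
        def __init__(self, producers):
            # Until a producer reports anything, assume it may still send
            # arbitrarily old events.
            self.low_watermarks = {p: float("-inf") for p in producers}

        def on_watermark(self, producer, timestamp):
            # "From now on, no more messages earlier than `timestamp`."
            self.low_watermarks[producer] = max(self.low_watermarks[producer],
                                                timestamp)

        def closable_up_to(self):
            # Windows ending at or before this time can safely be finalized.
            return min(self.low_watermarks.values())

    tracker = WatermarkTracker(["web-1", "web-2"])
    tracker.on_watermark("web-1", 1_000)
    tracker.on_watermark("web-2", 950)
    print(tracker.closable_up_to())  # 950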
Whose clock are you using, anyway?
Assigning timestamps to events is even more difficult when events can be buffered at several points in the system. For example, consider a mobile app that reports events for usage metrics to a server. The app may be used while the device is offline, in which case it will buffer events locally on the device and send them to a server when an internet connection is next available (which may be hours or even days later). To any consumers of this stream, the events will appear as extremely delayed stragglers.
当事件可能在系统中的多个位置被缓冲时,为事件分配时间戳就更加困难了。例如,考虑一个向服务器上报使用情况度量事件的移动应用。该应用可能在设备离线时被使用,这种情况下它会把事件缓冲在设备本地,并在下次有可用的互联网连接时(可能是几小时甚至几天之后)再发送到服务器。对于这个流的任何消费者而言,这些事件看起来就像是延迟极其严重的滞留事件。
In this context, the timestamp on the events should really be the time at which the user interaction occurred, according to the mobile device’s local clock. However, the clock on a user-controlled device often cannot be trusted, as it may be accidentally or deliberately set to the wrong time (see “Clock Synchronization and Accuracy” ). The time at which the event was received by the server (according to the server’s clock) is more likely to be accurate, since the server is under your control, but less meaningful in terms of describing the user interaction.
在这种情况下,事件的时间戳应该真正是用户交互发生时的时间,根据移动设备的本地时钟。然而,用户控制的设备上的时钟往往不能被信任,因为它可能会被意外或故意设置为错误的时间(请参见“时钟同步和精度”)。根据服务器时钟接收事件的时间更有可能是准确的,因为服务器是在你的控制之下,但从描述用户交互的角度来看,意义较小。
To adjust for incorrect device clocks, one approach is to log three timestamps [ 82 ]:
调整不正确的设备时钟的一种方法是记录三个时间戳 [82]:
-
The time at which the event occurred, according to the device clock
事件发生的时间,根据设备时钟。
-
The time at which the event was sent to the server, according to the device clock
根据设备时钟发送事件的时间。
-
The time at which the event was received by the server, according to the server clock
服务器接收该事件的时间,根据服务器时钟显示。
By subtracting the second timestamp from the third, you can estimate the offset between the device clock and the server clock (assuming the network delay is negligible compared to the required timestamp accuracy). You can then apply that offset to the event timestamp, and thus estimate the true time at which the event actually occurred (assuming the device clock offset did not change between the time the event occurred and the time it was sent to the server).
通过从第三个时间戳中减去第二个时间戳,可以估计设备时钟和服务器时钟之间的偏移量(假设网络延迟与所需时间戳精度相比可以忽略不计)。然后,可以将该偏移量应用于事件时间戳,从而估计事件实际发生的真正时间(假设设备时钟偏移量在该事件发生时和发送到服务器时没有发生变化)。
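A sketch of that calculation, with the three timestamps attached to each event under hypothetical field names:

    def estimated_event_time(event):
        # Correct the device-reported event time using the estimated clock offset.
        # Assumes network delay is negligible and the device clock offset did not
        # change between the event occurring and it being sent.
        offset = event["server_received_at"] - event["device_sent_at"]
        return event["device_event_time"] + offset

    event = {"device_event_time": 1_000.0,   # device clock, when it happened
             "device_sent_at": 5_000.0,      # device clock, when it was sent
             "server_received_at": 5_030.0}  # server clock, on arrival
    print(estimated_event_time(event))       # 1030.0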
This problem is not unique to stream processing—batch processing suffers from exactly the same issues of reasoning about time. It is just more noticeable in a streaming context, where we are more aware of the passage of time.
这个问题不仅仅是流处理所独有的——批量处理在关于时间的推断方面也有完全相同的问题。只是在流处理环境中更加显著,因为我们更加注意时间的流逝。
Types of windows
Once you know how the timestamp of an event should be determined, the next step is to decide how windows over time periods should be defined. The window can then be used for aggregations, for example to count events, or to calculate the average of values within the window. Several types of windows are in common use [ 79 , 83 ]:
一旦确定了事件的时间戳应如何确定,下一步就是决定如何定义时间段内的窗口。接下来可以使用窗口进行聚合,例如计算事件次数或计算窗口内数值的平均值。常用的几种窗口类型包括[79,83]:
- Tumbling window
-
A tumbling window has a fixed length, and every event belongs to exactly one window. For example, if you have a 1-minute tumbling window, all the events with timestamps between 10:03:00 and 10:03:59 are grouped into one window, events between 10:04:00 and 10:04:59 into the next window, and so on. You could implement a 1-minute tumbling window by taking each event timestamp and rounding it down to the nearest minute to determine the window that it belongs to.
滚动窗口具有固定长度,每个事件都属于恰好一个窗口。例如,如果您有一个1分钟的滚动窗口,则所有时间戳介于10:03:00和10:03:59之间的事件都分组为一个窗口,介于10:04:00和10:04:59之间的事件则分为下一个窗口,以此类推。您可以通过将每个事件时间戳向下舍入到最接近的一分钟来实现1分钟的滚动窗口。
- Hopping window
-
A hopping window also has a fixed length, but allows windows to overlap in order to provide some smoothing. For example, a 5-minute window with a hop size of 1 minute would contain the events between 10:03:00 and 10:07:59, then the next window would cover events between 10:04:00 and 10:08:59, and so on. You can implement this hopping window by first calculating 1-minute tumbling windows, and then aggregating over several adjacent windows.
跳跃窗口也有固定的长度,但允许窗口重叠以提供一些平滑。例如,带有1分钟跳跃大小的5分钟窗口将包含10:03:00至10:07:59之间的事件,然后下一个窗口将覆盖10:04:00至10:08:59之间的事件,以此类推。您可以通过先计算1分钟滚动窗口,然后聚合多个相邻窗口来实现此跳跃窗口。
- Sliding window
-
A sliding window contains all the events that occur within some interval of each other. For example, a 5-minute sliding window would cover events at 10:03:39 and 10:08:12, because they are less than 5 minutes apart (note that tumbling and hopping 5-minute windows would not have put these two events in the same window, as they use fixed boundaries). A sliding window can be implemented by keeping a buffer of events sorted by time and removing old events when they expire from the window.
滑动窗口包含彼此间隔在某个时间范围之内发生的所有事件。例如,一个5分钟的滑动窗口会把10:03:39和10:08:12发生的事件涵盖在内,因为它们相距不到5分钟(注意,滚动和跳跃的5分钟窗口不会把这两个事件放进同一个窗口,因为它们使用固定的边界)。滑动窗口可以通过维护一个按时间排序的事件缓冲区来实现,并在旧事件超出窗口时将其移除。
- Session window
-
Unlike the other window types, a session window has no fixed duration. Instead, it is defined by grouping together all events for the same user that occur closely together in time, and the window ends when the user has been inactive for some time (for example, if there have been no events for 30 minutes). Sessionization is a common requirement for website analytics (see “GROUP BY” ).
与其他窗口类型不同,会话窗口没有固定的持续时间。相反,它是通过将同一用户在时间上紧密联系的所有事件组合在一起来定义的,当用户一段时间内处于非活动状态时窗口就会结束(例如,如果30分钟没有出现任何事件)。会话化是网站分析的常见要求(请参见“GROUP BY”)。
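A minimal sketch of the tumbling-window rounding described above, and of building 5-minute hopping windows (hop size 1 minute) by aggregating adjacent tumbling windows; it assumes event timestamps are Unix times in seconds:

    from collections import Counter

    WINDOW_SIZE = 60  # seconds

    def tumbling_window(ts):
        # Round the event timestamp down to the start of its 1-minute window.
        return ts - (ts % WINDOW_SIZE)

    event_timestamps = [600, 612, 659, 660, 725, 900]
    counts = Counter(tumbling_window(ts) for ts in event_timestamps)
    print(dict(counts))  # {600: 3, 660: 1, 720: 1, 900: 1}

    def hopping_counts(tumbling_counts, windows=5, hop=WINDOW_SIZE):
        # A 5-minute hopping window is the sum of 5 adjacent 1-minute windows.
        return {start: sum(tumbling_counts.get(start + i * hop, 0)
                           for i in range(windows))
                for start in tumbling_counts}

    print(hopping_counts(counts))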
Stream Joins
In Chapter 10 we discussed how batch jobs can join datasets by key, and how such joins form an important part of data pipelines. Since stream processing generalizes data pipelines to incremental processing of unbounded datasets, there is exactly the same need for joins on streams.
在第10章中,我们讨论了批处理作业如何按键连接数据集,以及这样的连接如何构成数据管道的重要组成部分。由于流处理把数据管道推广为对无界数据集的增量处理,因此在流上也有完全相同的连接需求。
However, the fact that new events can appear anytime on a stream makes joins on streams more challenging than in batch jobs. To understand the situation better, let’s distinguish three different types of joins: stream-stream joins, stream-table joins, and table-table joins [ 84 ]. In the following sections we’ll illustrate each by example.
然而,流上随时可以出现新事件的事实,使得流上的连接比批处理作业更具挑战性。为了更好地理解情况,让我们区分三种不同的连接类型:流-流连接、流-表连接和表-表连接[84]。在接下来的章节中,我们将以示例说明每种类型。
Stream-stream join (window join)
Say you have a search feature on your website, and you want to detect recent trends in searched-for URLs. Every time someone types a search query, you log an event containing the query and the results returned. Every time someone clicks one of the search results, you log another event recording the click. In order to calculate the click-through rate for each URL in the search results, you need to bring together the events for the search action and the click action, which are connected by having the same session ID. Similar analyses are needed in advertising systems [ 85 ].
假设您的网站上有一个搜索功能,并且您想检测被搜索URL的近期趋势。每当有人键入搜索查询时,您就记录一个包含该查询及其返回结果的事件。每当有人点击其中一条搜索结果时,您再记录一个记录该点击的事件。为了计算搜索结果中每个URL的点击率,您需要把搜索动作和点击动作的事件汇集到一起,它们通过拥有相同的会话ID关联起来。广告系统中也需要类似的分析[85]。
The click may never come if the user abandons their search, and even if it comes, the time between the search and the click may be highly variable: in many cases it might be a few seconds, but it could be as long as days or weeks (if a user runs a search, forgets about that browser tab, and then returns to the tab and clicks a result sometime later). Due to variable network delays, the click event may even arrive before the search event. You can choose a suitable window for the join—for example, you may choose to join a click with a search if they occur at most one hour apart.
如果用户放弃搜索,那么单击可能永远不会出现,即使它出现了,在搜索和单击之间的时间也可能高度变化:在许多情况下可能只有几秒钟,但它也可能长达数天或数周(如果用户运行 搜索,忘记了浏览器选项卡,然后稍后返回选项卡并单击结果)。由于网络延迟的不确定性,单击事件甚至可能在搜索事件之前到达。您可以选择合适的窗口进行连接-例如,如果它们在最多一小时内发生,则可以选择将单击与搜索连接起来。
Note that embedding the details of the search in the click event is not equivalent to joining the events: doing so would only tell you about the cases where the user clicked a search result, not about the searches where the user did not click any of the results. In order to measure search quality, you need accurate click-through rates, for which you need both the search events and the click events.
请注意,把搜索的详细信息嵌入点击事件并不等同于连接这两类事件:那样做只能告诉您用户点击了搜索结果的那些情况,而无法告诉您用户没有点击任何结果的那些搜索。为了衡量搜索质量,您需要准确的点击率,而这同时需要搜索事件和点击事件。
To implement this type of join, a stream processor needs to maintain state : for example, all the events that occurred in the last hour, indexed by session ID. Whenever a search event or click event occurs, it is added to the appropriate index, and the stream processor also checks the other index to see if another event for the same session ID has already arrived. If there is a matching event, you emit an event saying which search result was clicked. If the search event expires without you seeing a matching click event, you emit an event saying which search results were not clicked.
为实现此类连接,流处理器需要维护状态:例如,所有最近一小时发生的事件,按会话 ID 索引。每当搜索事件或点击事件发生时,它将被添加到相应的索引中,流处理器也将检查其他索引,以查看是否已经到达了与相同会话 ID 的另一个事件。如果存在匹配的事件,则发出一条事件,说明哪个搜索结果已被点击。如果搜索事件在您看不到匹配的点击事件的情况下过期,则发出一条事件,说明哪些搜索结果未被点击。
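A sketch of such stateful join logic: per-session buffers of searches and clicks, joined when both sides arrive within one hour. The event shapes and field names are illustrative, not from any particular framework:

    ONE_HOUR = 3600  # join window, in seconds

    class SearchClickJoin:
        def __init__(self):
            self.pending_searches = {}  # session_id -> search event
            self.pending_clicks = {}    # session_id -> click event

        def on_event(self, event):
            sid = event["session_id"]
            if event["type"] == "search":
                click = self.pending_clicks.get(sid)
                if click and abs(click["ts"] - event["ts"]) <= ONE_HOUR:
                    del self.pending_clicks[sid]
                    return {"query": event["query"], "clicked_url": click["url"]}
                self.pending_searches[sid] = event
            elif event["type"] == "click":
                search = self.pending_searches.get(sid)
                if search and abs(event["ts"] - search["ts"]) <= ONE_HOUR:
                    del self.pending_searches[sid]
                    return {"query": search["query"], "clicked_url": event["url"]}
                self.pending_clicks[sid] = event
            return None  # no matching event (yet)

        def expire_searches(self, now):
            # Searches that outlived the window are reported as never clicked.
            expired = [sid for sid, s in self.pending_searches.items()
                       if now - s["ts"] > ONE_HOUR]
            return [{"query": self.pending_searches.pop(sid)["query"],
                     "clicked": False}
                    for sid in expired]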
Stream-table join (stream enrichment)
In “Example: analysis of user activity events” ( Figure 10-2 ) we saw an example of a batch job joining two datasets: a set of user activity events and a database of user profiles. It is natural to think of the user activity events as a stream, and to perform the same join on a continuous basis in a stream processor: the input is a stream of activity events containing a user ID, and the output is a stream of activity events in which the user ID has been augmented with profile information about the user. This process is sometimes known as enriching the activity events with information from the database.
在“示例:用户活动事件的分析”(图10-2)中,我们看到了一个批处理作业示例,在此作业中,两个数据集被合并:一组用户活动事件和一个用户配置文件数据库。自然而然的想法是将用户活动事件视为数据流,并在数据流处理器中持续执行相同的合并:输入是包含用户ID的活动事件流,输出是已经被用户配置文件信息扩充的活动事件流。这个过程有时被称为将活动事件与数据库中的信息进行丰富。
To perform this join, the stream process needs to look at one activity event at a time, look up the event’s user ID in the database, and add the profile information to the activity event. The database lookup could be implemented by querying a remote database; however, as discussed in “Example: analysis of user activity events” , such remote queries are likely to be slow and risk overloading the database [ 75 ].
为了执行这个连接操作,流处理需要逐一查看每个活动事件,从数据库中查找该事件的用户ID并将个人资料信息添加到活动事件中。数据库的查找可以通过查询远程数据库来实现;然而,正如在“示例:用户活动事件分析”中所讨论的,这样的远程查询很可能很慢,有过载数据库的风险[75]。
Another approach is to load a copy of the database into the stream processor so that it can be queried locally without a network round-trip. This technique is very similar to the hash joins we discussed in “Map-Side Joins” : the local copy of the database might be an in-memory hash table if it is small enough, or an index on the local disk.
另一种方法是将数据库副本加载到流处理器中,以便可以在本地查询,而不需要网络往返。这种技术非常类似于我们在“Map-Side Joins”中讨论的哈希连接:如果本地数据库的大小足够小,那么它的本地副本可能是在内存中的哈希表,否则可能是本地磁盘上的索引。
The difference to batch jobs is that a batch job uses a point-in-time snapshot of the database as input, whereas a stream processor is long-running, and the contents of the database are likely to change over time, so the stream processor’s local copy of the database needs to be kept up to date. This issue can be solved by change data capture: the stream processor can subscribe to a changelog of the user profile database as well as the stream of activity events. When a profile is created or modified, the stream processor updates its local copy. Thus, we obtain a join between two streams: the activity events and the profile updates.
与批处理作业的区别在于,批处理作业使用数据库某个时间点的快照作为输入,而流处理器是长期运行的,数据库的内容很可能随时间变化,因此流处理器的本地数据库副本需要保持最新。这个问题可以通过变更数据捕获来解决:流处理器除了订阅活动事件流之外,还可以订阅用户配置文件数据库的变更日志。当配置文件被创建或修改时,流处理器就更新其本地副本。这样,我们就得到了两个流之间的连接:活动事件和配置文件更新。
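A sketch of such an enrichment processor consuming two streams: the profile changelog keeps the local table up to date, and each activity event is joined against it. The change-record format and field names are made up for illustration:

    class EnrichActivityEvents:
        def __init__(self):
            self.profiles = {}  # local copy of the user-profile table

        def on_profile_change(self, change):
            # Change event from the profile database's changelog (CDC).
            if change["new_value"] is None:      # deletion / tombstone
                self.profiles.pop(change["user_id"], None)
            else:
                self.profiles[change["user_id"]] = change["new_value"]

        def on_activity_event(self, event):
            # Local lookup instead of a remote database query.
            profile = self.profiles.get(event["user_id"], {})
            return {**event, "profile": profile}

    p = EnrichActivityEvents()
    p.on_profile_change({"user_id": 7, "new_value": {"country": "NZ"}})
    print(p.on_activity_event({"user_id": 7, "action": "page_view"}))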
A stream-table join is actually very similar to a stream-stream join; the biggest difference is that for the table changelog stream, the join uses a window that reaches back to the “beginning of time” (a conceptually infinite window), with newer versions of records overwriting older ones. For the stream input, the join might not maintain a window at all.
流表连接实际上与流流连接非常相似;最大的区别在于,对于表的变更日志流,连接使用一个一直回溯到“时间起点”的窗口(概念上的无限窗口),并用记录的新版本覆盖旧版本;而对于流输入,连接可能根本不维护窗口。
Table-table join (materialized view maintenance)
Consider the Twitter timeline example that we discussed in “Describing Load” . We said that when a user wants to view their home timeline, it is too expensive to iterate over all the people the user is following, find their recent tweets, and merge them.
考虑我们在“描述负载”中讨论过的 Twitter 时间线的例子。我们说过,当用户想要查看主页时间线时,遍历该用户关注的所有人、找出他们最近的推文并将其合并,代价太高了。
Instead, we want a timeline cache: a kind of per-user “inbox” to which tweets are written as they are sent, so that reading the timeline is a single lookup. Materializing and maintaining this cache requires the following event processing:
相反,我们想要一个时间线缓存:一种每用户的“收件箱”,推文在发送时就被写入其中,这样读取时间线就只是一次查找。物化并维护这个缓存需要以下事件处理:
-
When user u sends a new tweet, it is added to the timeline of every user who is following u .
当用户 u 发布新推文时,它将被添加到所有正在关注用户 u 的用户的时间线中。
-
When a user deletes a tweet, it is removed from all users’ timelines.
当用户删除一条推文时,它会从所有用户的时间线上删除。
-
When user u 1 starts following user u 2 , recent tweets by u 2 are added to u 1 ’s timeline.
当用户 u1 开始关注用户 u2 时,u2 的最近推文将被添加到 u1 的时间线中。
-
When user u 1 unfollows user u 2 , tweets by u 2 are removed from u 1 ’s timeline.
当用户u1取消关注用户u2时,u2的推文会从u1的时间线中移除。
To implement this cache maintenance in a stream processor, you need streams of events for tweets (sending and deleting) and for follow relationships (following and unfollowing). The stream process needs to maintain a database containing the set of followers for each user so that it knows which timelines need to be updated when a new tweet arrives [ 86 ].
为了在流处理器中实现这种缓存维护,你需要推文事件流(发送和删除)以及关注关系事件流(关注和取消关注)。流处理器需要维护一个数据库,其中包含每个用户的关注者集合,以便知道当一条新推文到达时需要更新哪些时间线[86]。
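The four maintenance rules above can be sketched as follows (a simplified, single-process illustration; a real implementation would bound the "recent tweets" history and keep timelines sorted):
上述四条维护规则可以用如下草图表示(这是一个简化的单进程示意;真实实现还需要限制“最近推文”的历史长度并保持时间线有序):

from collections import defaultdict

followers = defaultdict(set)        # followee_id -> set of follower ids
tweets_by_user = defaultdict(list)  # sender_id -> that user's tweets
timelines = defaultdict(list)       # user_id -> tweets in their home timeline

def on_tweet(tweet):                # tweet: {'id': ..., 'sender_id': ..., 'text': ...}
    tweets_by_user[tweet['sender_id']].append(tweet)
    for follower in followers[tweet['sender_id']]:
        timelines[follower].append(tweet)

def on_delete_tweet(sender_id, tweet_id):
    tweets_by_user[sender_id] = [t for t in tweets_by_user[sender_id] if t['id'] != tweet_id]
    for follower in followers[sender_id]:
        timelines[follower] = [t for t in timelines[follower] if t['id'] != tweet_id]

def on_follow(follower_id, followee_id):
    followers[followee_id].add(follower_id)
    timelines[follower_id].extend(tweets_by_user[followee_id])   # backfill recent tweets

def on_unfollow(follower_id, followee_id):
    followers[followee_id].discard(follower_id)
    timelines[follower_id] = [t for t in timelines[follower_id]
                              if t['sender_id'] != followee_id]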
Another way of looking at this stream process is that it maintains a materialized view for a query that joins two tables (tweets and follows), something like the following:
另一种看待这个流程的方式是它为一个连接两个表(tweets和follows)的查询维护了一个物化视图,类似于以下内容:
SELECT follows.follower_id AS timeline_id,
  array_agg(tweets.* ORDER BY tweets.timestamp DESC)
FROM tweets
JOIN follows ON follows.followee_id = tweets.sender_id
GROUP BY follows.follower_id
The join of the streams corresponds directly to the join of the tables in that query. The timelines are effectively a cache of the result of this query, updated every time the underlying tables change. iii
流的连接直接对应于该查询中表的连接。时间线实际上是这个查询结果的缓存,每当基础表发生变化时就会更新。
Time-dependence of joins
The three types of joins described here (stream-stream, stream-table, and table-table) have a lot in common: they all require the stream processor to maintain some state (search and click events, user profiles, or follower list) based on one join input, and query that state on messages from the other join input.
这里描述的三种连接类型(流-流、流-表和表-表)有许多共同点:它们都需要流处理器基于一个连接输入来维护某些状态(搜索和点击事件、用户档案或关注者列表),并针对来自另一个连接输入的消息查询该状态。
The order of the events that maintain the state is important (it matters whether you first follow and then unfollow, or the other way round). In a partitioned log, the ordering of events within a single partition is preserved, but there is typically no ordering guarantee across different streams or partitions.
维护状态的那些事件的顺序很重要(先关注然后取消关注,和反过来,结果是不同的)。在分区日志中,单个分区内的事件顺序会被保留,但在不同的流或分区之间,通常没有顺序保证。
This raises a question: if events on different streams happen around a similar time, in which order are they processed? In the stream-table join example, if a user updates their profile, which activity events are joined with the old profile (processed before the profile update), and which are joined with the new profile (processed after the profile update)? Put another way: if state changes over time, and you join with some state, what point in time do you use for the join [ 45 ]?
这引发了一个问题:如果不同数据流上的事件在相近的时间内发生,它们的处理顺序是什么?在流表联接的例子中,如果用户更新他们的个人资料,哪些活动事件将与旧资料一起联接(在资料更新之前处理),哪些事件将与新资料一起联接(在资料更新之后处理)?换句话说:如果状态随时间变化,并且您与某些状态联接,那么对于联接使用哪个时间点呢?[45]
Such time dependence can occur in many places. For example, if you sell things, you need to apply the right tax rate to invoices, which depends on the country or state, the type of product, and the date of sale (since tax rates change from time to time). When joining sales to a table of tax rates, you probably want to join with the tax rate at the time of the sale, which may be different from the current tax rate if you are reprocessing historical data.
这种时间依赖性可能出现在很多地方。例如,如果你销售商品,就需要对发票应用正确的税率,而税率取决于国家或州、产品类型以及销售日期(因为税率会不时变化)。在把销售记录与税率表连接时,你大概希望使用销售发生时的税率;如果你在重新处理历史数据,这个税率可能与当前税率不同。
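For instance, a point-in-time lookup over a versioned tax-rate table might look like the following sketch (the dates and rates are made-up illustration values):
例如,针对带版本的税率表做按时间点的查找,可能类似下面的草图(日期和税率均为虚构的示例值):

import bisect

# Rate history for one jurisdiction, sorted by effective date (ISO dates compare as strings).
rate_history = [('2015-01-01', 0.19), ('2017-07-01', 0.20), ('2021-01-01', 0.21)]

def rate_at(sale_date):
    """Return the tax rate that was in effect on the given sale date."""
    dates = [d for d, _ in rate_history]
    i = bisect.bisect_right(dates, sale_date) - 1
    if i < 0:
        raise ValueError('no tax rate known for ' + sale_date)
    return rate_history[i][1]

def enrich_sale(sale):              # sale: {'date': 'YYYY-MM-DD', 'net_amount': ...}
    rate = rate_at(sale['date'])
    return {**sale, 'tax_rate': rate, 'tax': round(sale['net_amount'] * rate, 2)}

print(enrich_sale({'date': '2018-03-15', 'net_amount': 100.0}))  # joins with the 0.20 rate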
If the ordering of events across streams is undetermined, the join becomes nondeterministic [ 87 ], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.
如果跨流的事件顺序是不确定的,那么连接就会变成非确定性的[87],这意味着在相同输入上重新运行同一作业并不一定能得到相同的结果:再次运行作业时,输入流中的事件可能以不同的方式交织。
In data warehouses, this issue is known as a slowly changing dimension (SCD), and it is often addressed by using a unique identifier for a particular version of the joined record: for example, every time the tax rate changes, it is given a new identifier, and the invoice includes the identifier for the tax rate at the time of sale [ 88 , 89 ]. This change makes the join deterministic, but has the consequence that log compaction is not possible, since all versions of the records in the table need to be retained.
在数据仓库中,这个问题被称为缓慢变化维度(slowly changing dimension,SCD),通常的解决办法是为被连接记录的每个特定版本使用唯一的标识符:例如,每当税率发生变化,就赋予它一个新的标识符,而发票中则包含销售时所用税率的标识符[88, 89]。这一改变使连接变为确定性的,但代价是无法进行日志压缩,因为表中记录的所有版本都需要被保留。
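A sketch of that idea: every published rate version gets its own identifier, the invoice records the identifier, and re-joining later always yields the same rate (the names are invented for illustration):
这一思路的草图如下:每次发布的税率版本都有自己的标识符,发票记录该标识符,之后再次进行连接时总能得到相同的税率(其中的名称为示意而虚构):

import itertools

_next_version = itertools.count(1)
tax_rate_versions = {}              # version_id -> rate; old versions are never overwritten

def publish_tax_rate(rate):
    version_id = next(_next_version)
    tax_rate_versions[version_id] = rate
    return version_id

def record_invoice(net_amount, rate_version):
    # The invoice stores the version identifier, so reprocessing it later is deterministic.
    rate = tax_rate_versions[rate_version]
    return {'net_amount': net_amount, 'tax_rate_version': rate_version,
            'tax': round(net_amount * rate, 2)}

v1 = publish_tax_rate(0.20)
invoice = record_invoice(100.0, v1)
publish_tax_rate(0.21)              # a later rate change does not affect the old invoice
print(invoice)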
Fault Tolerance
In the final section of this chapter, let’s consider how stream processors can tolerate faults. We saw in Chapter 10 that batch processing frameworks can tolerate faults fairly easily: if a task in a MapReduce job fails, it can simply be started again on another machine, and the output of the failed task is discarded. This transparent retry is possible because input files are immutable, each task writes its output to a separate file on HDFS, and output is only made visible when a task completes successfully.
在本章的最后一节中,让我们考虑流处理器如何容忍故障。我们在第10章中看到,批处理框架可以相对容易地容错:如果MapReduce作业中的任务失败,它可以简单地在另一台机器上重新启动,失败任务的输出将被丢弃。这种透明的重试是可能的,因为输入文件是不可变的,每个任务将其输出写入HDFS上的单独文件中,只有在任务成功完成时才会使输出可见。
In particular, the batch approach to fault tolerance ensures that the output of the batch job is the same as if nothing had gone wrong, even if in fact some tasks did fail. It appears as though every input record was processed exactly once—no records are skipped, and none are processed twice. Although restarting tasks means that records may in fact be processed multiple times, the visible effect in the output is as if they had only been processed once. This principle is known as exactly-once semantics , although effectively-once would be a more descriptive term [ 90 ].
特别是,批处理的容错方法可以确保批处理作业的输出与没有发生任何故障时相同,即使实际上确实有一些任务失败了。看起来就好像每条输入记录都恰好被处理了一次:没有记录被跳过,也没有记录被处理两次。虽然重新启动任务意味着记录实际上可能被处理多次,但输出中的可见效果就像它们只被处理过一次一样。这个原则被称为精确一次语义(exactly-once semantics),尽管有效一次(effectively-once)或许是个更贴切的说法[90]。
The same issue of fault tolerance arises in stream processing, but it is less straightforward to handle: waiting until a task is finished before making its output visible is not an option, because a stream is infinite and so you can never finish processing it.
在流处理中也会出现相同的容错问题,但处理起来要复杂一些:等待任务完成后再显示其输出不是一个选项,因为流是无限的,所以你永远无法完成对其的处理。
Microbatching and checkpointing
One solution is to break the stream into small blocks, and treat each block like a miniature batch process. This approach is called microbatching , and it is used in Spark Streaming [ 91 ]. The batch size is typically around one second, which is the result of a performance compromise: smaller batches incur greater scheduling and coordination overhead, while larger batches mean a longer delay before results of the stream processor become visible.
一种解决方案是将流分成小块,将每个块作为小批量处理。这种方法称为微批处理,它在Spark Streaming [91]中使用。批量大小通常约为一秒钟,这是性能折衷的结果:较小批次会产生更大的调度和协调开销,而较大批次意味着流处理器结果变得可见之前需要更长的延迟。
Microbatching also implicitly provides a tumbling window equal to the batch size (windowed by processing time, not event timestamps); any jobs that require larger windows need to explicitly carry over state from one microbatch to the next.
微批处理还隐含提供一个等于批量大小的滚动窗口(按处理时间而非事件时间戳进行窗口化);任何需要较大窗口的作业都需要显式地从一个微批处理传递状态到下一个。
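The following sketch illustrates only the grouping step: events carrying a processing timestamp are cut into fixed one-second microbatches (aligning batches to the first event's timestamp is a simplification).
下面的草图只演示分组这一步:带有处理时间戳的事件被切分成固定一秒的微批次(以第一个事件的时间戳对齐批次只是一个简化)。

def microbatches(timestamped_events, batch_interval=1.0):
    """Group (processing_time, event) pairs into fixed-length microbatches; each yielded
    list is one tumbling window of processing time."""
    batch, batch_end = [], None
    for ts, event in timestamped_events:
        if batch_end is None:
            batch_end = ts + batch_interval
        while ts >= batch_end:          # close any elapsed (possibly empty) batches
            yield batch
            batch, batch_end = [], batch_end + batch_interval
        batch.append(event)
    if batch:
        yield batch

stream = [(0.1, 'a'), (0.4, 'b'), (1.2, 'c'), (2.7, 'd')]
for i, batch in enumerate(microbatches(stream)):
    print('batch', i, '->', batch)      # batch 0 -> ['a', 'b'], batch 1 -> ['c'], ...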
A variant approach, used in Apache Flink, is to periodically generate rolling checkpoints of state and write them to durable storage [ 92 , 93 ]. If a stream operator crashes, it can restart from its most recent checkpoint and discard any output generated between the last checkpoint and the crash. The checkpoints are triggered by barriers in the message stream, similar to the boundaries between microbatches, but without forcing a particular window size.
Apache Flink 使用的一种变体方法是,周期性地生成状态的滚动检查点并将其写入持久存储[92,93]。如果某个流算子崩溃,它可以从最近的检查点重新启动,并丢弃从上一个检查点到崩溃之间产生的所有输出。检查点由消息流中的屏障(barrier)触发,类似于微批次之间的边界,但并不强制规定特定的窗口大小。
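A toy illustration of the checkpoint/restore idea (not Flink's barrier protocol): the operator periodically writes its state together with its input offset to durable storage, and on restart it reloads the latest checkpoint, so work done since that checkpoint is simply redone.
下面是检查点/恢复思路的玩具示例(并非 Flink 的屏障协议):算子周期性地把状态连同输入偏移量写入持久存储,重启时加载最近的检查点,检查点之后的工作只需重做即可。

import json, os

class CheckpointingCounter:
    """Toy operator that counts events per key and checkpoints its state plus the
    input offset; after a crash it restarts from the latest checkpoint and
    reprocesses only the messages after that offset."""

    def __init__(self, path='checkpoint.json', interval=100):
        self.path, self.interval = path, interval
        self.counts, self.offset = {}, 0
        if os.path.exists(path):                      # recover from the last checkpoint
            with open(path) as f:
                saved = json.load(f)
            self.counts, self.offset = saved['counts'], saved['offset']

    def process(self, log):
        """`log` is an indexable, replayable sequence of keys (like a partitioned log)."""
        for i in range(self.offset, len(log)):
            key = log[i]
            self.counts[key] = self.counts.get(key, 0) + 1
            if (i + 1) % self.interval == 0:
                self.checkpoint(i + 1)

    def checkpoint(self, offset):
        self.offset = offset
        tmp = self.path + '.tmp'
        with open(tmp, 'w') as f:                     # write durably, then swap atomically
            json.dump({'counts': self.counts, 'offset': offset}, f)
        os.replace(tmp, self.path)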
Within the confines of the stream processing framework, the microbatching and checkpointing approaches provide the same exactly-once semantics as batch processing. However, as soon as output leaves the stream processor (for example, by writing to a database, sending messages to an external message broker, or sending emails), the framework is no longer able to discard the output of a failed batch. In this case, restarting a failed task causes the external side effect to happen twice, and microbatching or checkpointing alone is not sufficient to prevent this problem.
在流处理框架的范围内,微批处理和检查点方法提供了与批处理相同的精确一次语义。但是,一旦输出离开了流处理器(例如写入数据库、向外部消息代理发送消息或发送电子邮件),框架就无法再丢弃失败批次的输出了。在这种情况下,重新启动失败的任务会导致外部副作用发生两次,仅靠微批处理或检查点不足以防止这个问题。
Atomic commit revisited
In order to give the appearance of exactly-once processing in the presence of faults, we need to ensure that all outputs and side effects of processing an event take effect if and only if the processing is successful. Those effects include any messages sent to downstream operators or external messaging systems (including email or push notifications), any database writes, any changes to operator state, and any acknowledgment of input messages (including moving the consumer offset forward in a log-based message broker).
为了在出现故障时也能表现出精确一次处理的样子,我们需要确保处理一个事件的所有输出和副作用当且仅当处理成功时才生效。这些效果包括发送给下游算子或外部消息系统的任何消息(包括电子邮件或推送通知)、任何数据库写入、对算子状态的任何更改,以及对输入消息的任何确认(包括在基于日志的消息代理中将消费者偏移量向前移动)。
Those things either all need to happen atomically, or none of them must happen, but they should not go out of sync with each other. If this approach sounds familiar, it is because we discussed it in “Exactly-once message processing” in the context of distributed transactions and two-phase commit.
这些事情要么全部原子地发生,要么全都不发生,但它们不应该彼此不同步。如果这种方法听起来很熟悉,那是因为我们在“恰好一次的消息处理”中讨论分布式事务和两阶段提交时已经谈到过它。
In Chapter 9 we discussed the problems in the traditional implementations of distributed transactions, such as XA. However, in more restricted environments it is possible to implement such an atomic commit facility efficiently. This approach is used in Google Cloud Dataflow [ 81 , 92 ] and VoltDB [ 94 ], and there are plans to add similar features to Apache Kafka [ 95 , 96 ]. Unlike XA, these implementations do not attempt to provide transactions across heterogeneous technologies, but instead keep them internal by managing both state changes and messaging within the stream processing framework. The overhead of the transaction protocol can be amortized by processing several input messages within a single transaction.
在第9章中,我们讨论了分布式事务的传统实现(例如 XA)所存在的问题。然而,在限制更多的环境中,是有可能高效地实现这种原子提交机制的。Google Cloud Dataflow[81,92]和 VoltDB[94]使用了这种方法,Apache Kafka 也计划加入类似的功能[95,96]。与 XA 不同,这些实现并不试图跨异构技术提供事务,而是通过在流处理框架内部同时管理状态更改和消息传递,把事务保持在框架内部。事务协议的开销可以通过在单个事务中处理多条输入消息来摊销。
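The essence of keeping state changes and messaging inside one transactional scope can be illustrated with a single local SQLite database (purely a sketch of the idea, not how Dataflow, VoltDB, or Kafka implement it): the produced output, the updated operator state, and the advanced consumer offset commit atomically, and several input messages are processed per transaction.
把状态变更和消息传递保持在同一个事务范围内的核心思想,可以用单个本地 SQLite 数据库来示意(这只是思路的草图,并非 Dataflow、VoltDB 或 Kafka 的实现方式):产生的输出、更新后的算子状态和前移的消费者偏移量原子地一起提交,并且每个事务处理多条输入消息。

import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE output(msg TEXT);
    CREATE TABLE state(key TEXT PRIMARY KEY, value INTEGER);
    CREATE TABLE consumer_offset(id INTEGER PRIMARY KEY CHECK (id = 1), pos INTEGER);
    INSERT INTO consumer_offset VALUES (1, 0);
""")

def process_batch(log, batch_size=10):
    """Process several input messages in one transaction: output rows, state changes,
    and the consumer offset all take effect together, or not at all."""
    (start,) = conn.execute('SELECT pos FROM consumer_offset').fetchone()
    batch = log[start:start + batch_size]
    with conn:                                    # BEGIN ... COMMIT, rolled back on error
        for msg in batch:
            conn.execute('INSERT INTO output VALUES (?)', ('processed:' + msg,))
            conn.execute('INSERT OR IGNORE INTO state VALUES (?, 0)', (msg,))
            conn.execute('UPDATE state SET value = value + 1 WHERE key = ?', (msg,))
        conn.execute('UPDATE consumer_offset SET pos = ?', (start + len(batch),))

process_batch(['a', 'b', 'a'])
print(conn.execute('SELECT * FROM state ORDER BY key').fetchall())   # [('a', 2), ('b', 1)]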
Idempotence
Our goal is to discard the partial output of any failed tasks so that they can be safely retried without taking effect twice. Distributed transactions are one way of achieving that goal, but another way is to rely on idempotence [ 97 ].
我们的目标是丢弃任何失败任务的部分输出,以便能够安全地重试,而不会让效果生效两次。分布式事务是实现这一目标的一种方式,另一种方式则是依赖幂等性[97]。
An idempotent operation is one that you can perform multiple times, and it has the same effect as if you performed it only once. For example, setting a key in a key-value store to some fixed value is idempotent (writing the value again simply overwrites the value with an identical value), whereas incrementing a counter is not idempotent (performing the increment again means the value is incremented twice).
幂等操作是指你可以多次执行的操作,而它的效果和只执行一次一样。例如,在键值存储中将键设置为某个固定值是幂等的(再次写入相同的值只会将该值覆盖),而增加计数器不是幂等的(再次进行增加意味着值增加了两次)。
Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata. For example, when consuming messages from Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
即使一个操作本质上不是幂等的,加入一些额外的元数据也可以让它变为幂等的。例如,在从Kafka接收消息时,每个消息都有一个持久化的、单调递增的偏移量。当将一个值写入外部数据库时,可以将触发最后一次写入的消息的偏移量与该值一起包含在内。这样,就可以判断一个更新是否已经被应用,避免再次执行相同的更新。
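A sketch of that technique against a hypothetical key-value store: each write carries the offset of the message that caused it, and a write whose offset is not newer than the stored one is skipped.
下面是针对一个假想键值存储对该技术的草图:每次写入都带上触发它的消息偏移量,如果写入的偏移量不比已存储的更新,则跳过该写入。

store = {}   # key -> (value, offset_of_the_message_that_last_wrote_it)

def apply_write(key, value, offset):
    """Apply a write only if it has not been applied before; returns True if applied."""
    existing = store.get(key)
    if existing is not None and existing[1] >= offset:
        return False                 # this message's effect is already in the store
    store[key] = (value, offset)
    return True

print(apply_write('user:42:last_login', '2016-05-01', offset=100))   # True: applied
print(apply_write('user:42:last_login', '2016-05-01', offset=100))   # False: duplicate retry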
The state handling in Storm’s Trident is based on a similar idea [ 78 ]. Relying on idempotence implies several assumptions: restarting a failed task must replay the same messages in the same order (a log-based message broker does this), the processing must be deterministic, and no other node may concurrently update the same value [ 98 , 99 ].
Storm 的 Trident 中的状态处理就基于类似的思路[78]。依赖幂等性意味着几个假设:重新启动失败的任务时必须以相同的顺序重放相同的消息(基于日志的消息代理可以做到这一点),处理必须是确定性的,并且不能有其他节点并发更新同一个值[98,99]。
When failing over from one processing node to another, fencing may be required (see “The leader and the lock” ) to prevent interference from a node that is thought to be dead but is actually alive. Despite all those caveats, idempotent operations can be an effective way of achieving exactly-once semantics with only a small overhead.
当从一个处理节点故障切换到另一个处理节点时,可能需要防护(fencing,参见“领导者与锁”),以防止被认为已经失效、实际上却仍然存活的节点造成干扰。尽管有这些注意事项,幂等操作仍然是一种以很小的开销实现精确一次语义的有效方式。
Rebuilding state after a failure
Any stream process that requires state—for example, any windowed aggregations (such as counters, averages, and histograms) and any tables and indexes used for joins—must ensure that this state can be recovered after a failure.
任何需要状态的流处理过程——例如任何窗口聚合(例如计数器、平均值和直方图)以及用于连接的任何表和索引——必须确保在故障后可以恢复此状态。
One option is to keep the state in a remote datastore and replicate it, although having to query a remote database for each individual message can be slow, as discussed in “Stream-table join (stream enrichment)” . An alternative is to keep state local to the stream processor, and replicate it periodically. Then, when the stream processor is recovering from a failure, the new task can read the replicated state and resume processing without data loss.
一种选择是把状态保存在远程数据存储中并进行复制,不过正如“流表连接(流扩充)”中所讨论的,为每条消息都查询一次远程数据库可能会很慢。另一种选择是把状态保存在流处理器本地,并定期进行复制。这样,当流处理器从故障中恢复时,新任务可以读取复制的状态,并在不丢失数据的情况下继续处理。
For example, Flink periodically captures snapshots of operator state and writes them to durable storage such as HDFS [ 92 , 93 ]; Samza and Kafka Streams replicate state changes by sending them to a dedicated Kafka topic with log compaction, similar to change data capture [ 84 , 100 ]. VoltDB replicates state by redundantly processing each input message on several nodes (see “Actual Serial Execution” ).
例如,Flink 会定期捕获算子状态的快照,并将其写入 HDFS 等持久存储[92,93];Samza 和 Kafka Streams 通过把状态更改发送到一个启用了日志压缩的专用 Kafka 主题来复制状态,这与变更数据捕获类似[84,100]。VoltDB 则通过在多个节点上冗余地处理每条输入消息来复制状态(参见“实际串行执行”)。
In some cases, it may not even be necessary to replicate the state, because it can be rebuilt from the input streams. For example, if the state consists of aggregations over a fairly short window, it may be fast enough to simply replay the input events corresponding to that window. If the state is a local replica of a database, maintained by change data capture, the database can also be rebuilt from the log-compacted change stream (see “Log compaction” ).
在某些情况下,甚至不必复制状态,因为它可以从输入流重建。例如,如果状态是在相当短的窗口期内聚合的,那么只需简单地重播与该窗口相对应的输入事件即可快速完成。如果状态是由变更数据捕获维护的数据库的本地副本,则数据库也可以从经过日志压缩的更改流中重建 (见“日志压缩”)。
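For example, per-key counts over a short window can simply be recomputed by replaying the relevant slice of a replayable input log, as in this sketch:
例如,短窗口上的按键计数可以通过重放可重放输入日志中的相关片段来重新计算,如下面的草图所示:

from collections import Counter

def rebuild_window_counts(log, window_start, window_end):
    """Recompute per-key counts for one window by replaying (event_time, key) records,
    instead of restoring a replicated copy of the state."""
    return Counter(key for event_time, key in log if window_start <= event_time < window_end)

log = [(1, 'a'), (2, 'b'), (5, 'a'), (9, 'c'), (12, 'a')]
print(rebuild_window_counts(log, window_start=0, window_end=10))
# Counter({'a': 2, 'b': 1, 'c': 1})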
However, all of these trade-offs depend on the performance characteristics of the underlying infrastructure: in some systems, network delay may be lower than disk access latency, and network bandwidth may be comparable to disk bandwidth. There is no universally ideal trade-off for all situations, and the merits of local versus remote state may also shift as storage and networking technologies evolve.
然而,所有这些权衡取决于底层基础设施的性能特征:在某些系统中,网络延迟可能低于磁盘访问延迟,并且网络带宽可能与磁盘带宽相当。对于所有情况,没有普遍理想的权衡,当存储和网络技术发展时,本地与远程状态的优点也可能会发生变化。
Summary
In this chapter we have discussed event streams, what purposes they serve, and how to process them. In some ways, stream processing is very much like the batch processing we discussed in Chapter 10 , but done continuously on unbounded (never-ending) streams rather than on a fixed-size input. From this perspective, message brokers and event logs serve as the streaming equivalent of a filesystem.
在本章中,我们讨论了事件流,它们的作用以及如何处理它们。从某些方面来看,流式处理非常类似于第10章中讨论的批处理,但是是在无限制(永不结束的)流上连续执行而不是在固定大小的输入上执行。从这个角度来看,消息代理和事件日志是文件系统的流式处理等效物。
We spent some time comparing two types of message brokers:
我们花了一些时间比较了两种消息中间件:
- AMQP/JMS-style message broker
-
The broker assigns individual messages to consumers, and consumers acknowledge individual messages when they have been successfully processed. Messages are deleted from the broker once they have been acknowledged. This approach is appropriate as an asynchronous form of RPC (see also “Message-Passing Dataflow” ), for example in a task queue, where the exact order of message processing is not important and where there is no need to go back and read old messages again after they have been processed.
代理将单条消息分配给消费者,消费者在成功处理后对单条消息进行确认。消息一旦被确认,就会从代理中删除。这种方法适合作为一种异步形式的 RPC(另见“消息传递数据流”),例如用于任务队列,此时消息处理的确切顺序并不重要,而且在消息处理完之后也不需要回头重新读取旧消息。
- Log-based message broker
-
The broker assigns all messages in a partition to the same consumer node, and always delivers messages in the same order. Parallelism is achieved through partitioning, and consumers track their progress by checkpointing the offset of the last message they have processed. The broker retains messages on disk, so it is possible to jump back and reread old messages if necessary.
代理将一个分区中的所有消息分配给同一个消费节点,并始终按照相同的顺序传递消息。通过分区实现并行处理,消费者通过检查点记住它们已经处理的最后一条消息的偏移量来追踪进度。代理在磁盘上保留消息,因此如果需要,可以跳回并重新读取旧的消息。
The log-based approach has similarities to the replication logs found in databases (see Chapter 5 ) and log-structured storage engines (see Chapter 3 ). We saw that this approach is especially appropriate for stream processing systems that consume input streams and generate derived state or derived output streams.
基于日志的方法与数据库中的复制日志(参见第5章)和基于日志的存储引擎(参见第3章)相似。我们发现,这种方法特别适用于流处理系统,这些系统会消耗输入流并生成派生状态或派生输出流。
In terms of where streams come from, we discussed several possibilities: user activity events, sensors providing periodic readings, and data feeds (e.g., market data in finance) are naturally represented as streams. We saw that it can also be useful to think of the writes to a database as a stream: we can capture the changelog—i.e., the history of all changes made to a database—either implicitly through change data capture or explicitly through event sourcing. Log compaction allows the stream to retain a full copy of the contents of a database.
关于流的来源,我们讨论了几种可能性:用户活动事件、提供周期性读数的传感器,以及数据馈送(例如金融领域的市场数据),它们天然适合表示为流。我们也看到,把对数据库的写入视为流是很有用的:我们可以捕获变更日志,即对数据库所做的所有更改的历史记录——既可以隐式地通过变更数据捕获,也可以显式地通过事件溯源来实现。日志压缩使流能够保留数据库内容的完整副本。
Representing databases as streams opens up powerful opportunities for integrating systems. You can keep derived data systems such as search indexes, caches, and analytics systems continually up to date by consuming the log of changes and applying them to the derived system. You can even build fresh views onto existing data by starting from scratch and consuming the log of changes from the beginning all the way to the present.
把数据库表示为流,为系统集成提供了强大的机会。通过消费变更日志并把变更应用到派生系统,你可以让搜索索引、缓存和分析系统等派生数据系统保持最新。你甚至可以从零开始,消费从最初到现在的全部变更日志,从而为现有数据构建全新的视图。
The facilities for maintaining state as streams and replaying messages are also the basis for the techniques that enable stream joins and fault tolerance in various stream processing frameworks. We discussed several purposes of stream processing, including searching for event patterns (complex event processing), computing windowed aggregations (stream analytics), and keeping derived data systems up to date (materialized views).
把状态维护为流以及重放消息的机制,也是各种流处理框架中实现流连接和容错技术的基础。我们讨论了流处理的几种用途,包括搜索事件模式(复杂事件处理)、计算窗口聚合(流分析),以及保持派生数据系统处于最新状态(物化视图)。
We then discussed the difficulties of reasoning about time in a stream processor, including the distinction between processing time and event timestamps, and the problem of dealing with straggler events that arrive after you thought your window was complete.
随后我们讨论了在流处理器中推理时间的种种困难,包括处理时间与事件时间戳之间的区别,以及当你以为窗口已经完成之后才到达的滞留(straggler)事件所带来的问题。
We distinguished three types of joins that may appear in stream processes:
我们区分了在数据流处理中可能出现的三种连接类型:
- Stream-stream joins
-
Both input streams consist of activity events, and the join operator searches for related events that occur within some window of time. For example, it may match two actions taken by the same user within 30 minutes of each other. The two join inputs may in fact be the same stream (a self-join ) if you want to find related events within that one stream.
两个输入流都包含活动事件,连接操作符搜索在某个时间窗口内发生的相关事件。例如,它可以匹配同一用户在30分钟内执行的两个操作。如果您想在一个流中找到相关的事件,则两个连接输入实际上可以是同一个流(自连接)。
- Stream-table joins
-
One input stream consists of activity events, while the other is a database changelog. The changelog keeps a local copy of the database up to date. For each activity event, the join operator queries the database and outputs an enriched activity event.
一个输入流由行为事件组成,而另一个是数据库变更日志。变更日志保持数据库的本地副本是最新的。对于每个行为事件,联接操作符会查询数据库并输出丰富的行为事件。
- Table-table joins
-
Both input streams are database changelogs. In this case, every change on one side is joined with the latest state of the other side. The result is a stream of changes to the materialized view of the join between the two tables.
两个输入流都是数据库更改日志。在这种情况下,每个一侧的更改都与另一侧的最新状态连接。结果是两个表之间连接的物化视图的更改流。
Finally, we discussed techniques for achieving fault tolerance and exactly-once semantics in a stream processor. As with batch processing, we need to discard the partial output of any failed tasks. However, since a stream process is long-running and produces output continuously, we can’t simply discard all output. Instead, a finer-grained recovery mechanism can be used, based on microbatching, checkpointing, transactions, or idempotent writes.
最终,我们讨论了在流式处理器中实现容错和精确一次语义的技术。与批处理一样,我们需要丢弃任何失败任务的部分输出。然而,由于流式处理进程长时间运行且持续产生输出,我们无法简单地丢弃所有输出。取而代之的是,可以使用基于微批处理、检查点、事务或幂等写入的更精细的恢复机制。
Footnotes
i It’s possible to create a load balancing scheme in which two consumers share the work of processing a partition by having both read the full set of messages, but one of them only considers messages with even-numbered offsets while the other deals with the odd-numbered offsets. Alternatively, you could spread message processing over a thread pool, but that approach complicates consumer offset management. In general, single-threaded processing of a partition is preferable, and parallelism can be increased by using more partitions.
可以设计一种负载均衡方案,让两个消费者共同处理一个分区:两者都读取全部消息,但其中一个只处理偶数偏移量的消息,另一个只处理奇数偏移量的消息。或者,也可以把消息处理分散到一个线程池中,但那种方式会使消费者偏移量的管理变得复杂。一般来说,单线程处理单个分区更为可取,并行度可以通过使用更多的分区来提高。
ii Thank you to Kostas Kloudas from the Flink community for coming up with this analogy.
感谢 Flink 社区的 Kostas Kloudas 提供这个比喻。
iii If you regard a stream as the derivative of a table, as in Figure 11-6 , and regard a join as a product of two tables u·v , something interesting happens: the stream of changes to the materialized join follows the product rule ( u·v )′ = u ′ v + uv ′. In words: any change of tweets is joined with the current followers, and any change of followers is joined with the current tweets [ 49 , 50 ].
如果您将流视为表格的导数(如图11-6所示)并将连接视为两个表格u·v的乘积,则会发生有趣的事情:对于物化连接的更改流遵循乘法规则(u·v)′ = u′v + uv′。换句话说:任何推文的更改都与当前的关注者配对,任何关注者的更改都与当前的推文配对。[49,50]。
References
[ 1 ] Tyler Akidau, Robert Bradshaw, Craig Chambers, et al.: “ The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing ,” Proceedings of the VLDB Endowment , volume 8, number 12, pages 1792–1803, August 2015. doi:10.14778/2824032.2824076
[1] Tyler Akidau, Robert Bradshaw, Craig Chambers等人:“数据流模型:在大规模、无限制、乱序数据处理中平衡正确性、延迟和成本的实用方法”,《VLDB Endowment》杂志,第8卷,第12期,2015年8月,1792-1803页,doi:10.14778/2824032.2824076。
[ 2 ] Harold Abelson, Gerald Jay Sussman, and Julie Sussman: Structure and Interpretation of Computer Programs , 2nd edition. MIT Press, 1996. ISBN: 978-0-262-51087-5, available online at mitpress.mit.edu
[2] Harold Abelson,Gerald Jay Sussman和Julie Sussman:《计算机程序的构造与解释》,第二版。MIT出版社,1996年。ISBN:978-0-262-51087-5,可在线访问mitpress.mit.edu。
[ 3 ] Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec: “ The Many Faces of Publish/Subscribe ,” ACM Computing Surveys , volume 35, number 2, pages 114–131, June 2003. doi:10.1145/857076.857078
[3] Patrick Th. Eugster, Pascal A. Felber,Rachid Guerraoui和Anne-Marie Kermarrec:““发布/订阅”的许多面孔。ACM计算机调查,第35卷,第2期,114-131页,2003年6月。doi:10.1145 / 857076.857078”
[ 4 ] Joseph M. Hellerstein and Michael Stonebraker: Readings in Database Systems , 4th edition. MIT Press, 2005. ISBN: 978-0-262-69314-1, available online at redbook.cs.berkeley.edu
[4] Joseph M. Hellerstein 和 Michael Stonebraker: 《数据库系统阅读集》第4版。 MIT出版社,2005年。ISBN:978-0-262-69314-1,可在线访问redbook.cs.berkeley.edu。
[ 5 ] Don Carney, Uğur Çetintemel, Mitch Cherniack, et al.: “ Monitoring Streams – A New Class of Data Management Applications ,” at 28th International Conference on Very Large Data Bases (VLDB), August 2002.
[5] Don Carney、Uğur Çetintemel、Mitch Cherniack等:“Monitoring Streams – A New Class of Data Management Applications”,发表于第28届超大规模数据库国际会议(VLDB),2002年8月。
[ 6 ] Matthew Sackman: “ Pushing Back ,” lshift.net , May 5, 2016.
[6] Matthew Sackman:“Pushing Back”,lshift.net,2016年5月5日。
[ 7 ] Vicent Martí: “ Brubeck, a statsd-Compatible Metrics Aggregator ,” githubengineering.com , June 15, 2015.
[7] Vicent Martí:“Brubeck,一种与Statsd兼容的度量聚合器”,githubengineering.com,2015年6月15日。
[ 8 ] Seth Lowenberger: “ MoldUDP64 Protocol Specification V 1.00 ,” nasdaqtrader.com , July 2009.
[8] Seth Lowenberger: “MoldUDP64 协议规范 V 1.00,” nasdaqtrader.com,2009年7月。
[ 9 ] Pieter Hintjens: ZeroMQ – The Guide . O’Reilly Media, 2013. ISBN: 978-1-449-33404-8
[9] Pieter Hintjens: ZeroMQ指南。O’Reilly Media,2013年。 ISBN:978-1-449-33404-8。
[ 10 ] Ian Malpass: “ Measure Anything, Measure Everything ,” codeascraft.com , February 15, 2011.
"[10] Ian Malpass: “Measure Anything, Measure Everything,” codeascraft.com, February 15, 2011." "[10] Ian Malpass: “度量一切,度量万物”,codeascraft.com,2011年2月15日。"
[ 11 ] Dieter Plaetinck: “ 25 Graphite, Grafana and statsd Gotchas ,” blog.raintank.io , March 3, 2016.
[11] Dieter Plaetinck:“25个Graphite、Grafana和statsd的坑点”,blog.raintank.io,2016年3月3日。
[ 12 ] Jeff Lindsay: “ Web Hooks to Revolutionize the Web ,” progrium.com , May 3, 2007.
[12] Jeff Lindsay:“Web Hook 改变网页的方式”,progrium.com,2007年5月3日。
[ 13 ] Jim N. Gray: “ Queues Are Databases ,” Microsoft Research Technical Report MSR-TR-95-56, December 1995.
"[13] Jim N. Gray: “队列就是数据库”,微软研究院技术报告MSR-TR-95-56,1995年12月。"
[ 14 ] Mark Hapner, Rich Burridge, Rahul Sharma, et al.: “ JSR-343 Java Message Service (JMS) 2.0 Specification ,” jms-spec.java.net , March 2013.
[14] Mark Hapner, Rich Burridge, Rahul Sharma等人: “JSR-343 Java消息服务(JMS)2.0规范”,jms-spec.java.net,2013年3月。
[ 15 ] Sanjay Aiyagari, Matthew Arrott, Mark Atwell, et al.: “ AMQP: Advanced Message Queuing Protocol Specification ,” Version 0-9-1, November 2008.
[15] Sanjay Aiyagari, Matthew Arrott, Mark Atwell,等人: “AMQP:高级消息队列协议规范”,版本0-9-1,2008年11月。
[ 16 ] “ Google Cloud Pub/Sub: A Google-Scale Messaging Service ,” cloud.google.com , 2016.
[16] “Google Cloud Pub/Sub:一个谷歌规模的消息服务”,cloud.google.com,2016年。
[ 17 ] “ Apache Kafka 0.9 Documentation ,” kafka.apache.org , November 2015.
【17】“Apache Kafka 0.9 文档”,kafka.apache.org,2015年11月。
[ 18 ] Jay Kreps, Neha Narkhede, and Jun Rao: “ Kafka: A Distributed Messaging System for Log Processing ,” at 6th International Workshop on Networking Meets Databases (NetDB), June 2011.
[18] Jay Kreps,Neha Narkhede和Jun Rao:2011年6月在第六届国际网络与数据库研讨会(NetDB)上发表了“Kafka:用于日志处理的分布式消息传递系统”。
[ 19 ] “ Amazon Kinesis Streams Developer Guide ,” docs.aws.amazon.com , April 2016.
[19] “Amazon Kinesis Streams 开发者指南”,docs.aws.amazon.com,2016年4月。
[ 20 ] Leigh Stewart and Sijie Guo: “ Building DistributedLog: Twitter’s High-Performance Replicated Log Service ,” blog.twitter.com , September 16, 2015.
【20】Leigh Stewart和Sijie Guo:“构建DistributedLog:Twitter的高性能复制日志服务”,博客.twitter.com,2015年9月16日。
[ 21 ] “ DistributedLog Documentation ,” Twitter, Inc., distributedlog.io , May 2016.
[21] “DistributedLog文档”,Twitter公司,distributedlog.io,2016年5月。
[ 22 ] Jay Kreps: “ Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines) ,” engineering.linkedin.com , April 27, 2014.
[22] Jay Kreps:「基准测试Apache Kafka:在三台便宜的机器上每秒写入2百万条记录」,引擎工程师.linkedin.com,2014年4月27日。
[ 23 ] Kartik Paramasivam: “ How We’re Improving and Advancing Kafka at LinkedIn ,” engineering.linkedin.com , September 2, 2015.
[23] Kartik Paramasivam:“LinkedIn如何改进和推进Kafka”,engineering.linkedin.com,2015年9月2日。
[ 24 ] Jay Kreps: “ The Log: What Every Software Engineer Should Know About Real-Time Data’s Unifying Abstraction ,” engineering.linkedin.com , December 16, 2013.
[24] Jay Kreps:“日志:每个软件工程师应该了解实时数据统一抽象的知识”,engineering.linkedin.com,2013年12月16日。
[ 25 ] Shirshanka Das, Chavdar Botev, Kapil Surlaker, et al.: “ All Aboard the Databus! ,” at 3rd ACM Symposium on Cloud Computing (SoCC), October 2012.
"全部登上数据总线!",来自Shirshanka Das,Chavdar Botev,Kapil Surlaker等人在2012年10月的第三届ACM云计算研讨会(SoCC)上的报告。
[ 26 ] Yogeshwer Sharma, Philippe Ajoux, Petchean Ang, et al.: “ Wormhole: Reliable Pub-Sub to Support Geo-Replicated Internet Services ,” at 12th USENIX Symposium on Networked Systems Design and Implementation (NSDI), May 2015.
[26] Yogeshwer Sharma,Philippe Ajoux,Petchean Ang等: “Wormhole:可靠的发布订阅技术以支持地理复制的互联网服务”,于2015年5月第12届 USENIX Symposium on Networked Systems Design and Implementation (NSDI) 上发表。
[ 27 ] P. P. S. Narayan: “ Sherpa Update ,” developer.yahoo.com , June 8, .
[27] P. P. S. Narayan: "Sherpa 更新", developer.yahoo.com,6月8日。
[ 28 ] Martin Kleppmann: “ Bottled Water: Real-Time Integration of PostgreSQL and Kafka ,” martin.kleppmann.com , April 23, 2015.
[28] Martin Kleppmann:“瓶装水:PostgreSQL和Kafka的实时集成”,martin.kleppmann.com,2015年4月23日。
[ 29 ] Ben Osheroff: “ Introducing Maxwell, a mysql-to-kafka Binlog Processor ,” developer.zendesk.com , August 20, 2015.
[29] Ben Osheroff:“介绍Maxwell,一个将mysql转换为kafka binlog处理器”,developer.zendesk.com,2015年8月20日。
[ 30 ] Randall Hauch: “ Debezium 0.2.1 Released ,” debezium.io , June 10, 2016.
兰德尔·霍克:「Debezium 0.2.1 发布」,debezium.io,2016年6月10日。
[ 31 ] Prem Santosh Udaya Shankar: “ Streaming MySQL Tables in Real-Time to Kafka ,” engineeringblog.yelp.com , August 1, 2016.
[31] Prem Santosh Udaya Shankar:“将MySQL表实时流式传输到Kafka”,engineeringblog.yelp.com,2016年8月1日。
[ 32 ] “ Mongoriver ,” Stripe, Inc., github.com , September 2014.
[32] “Mongoriver”,Stripe, Inc.,github.com,2014年9月。
[ 33 ] Dan Harvey: “ Change Data Capture with Mongo + Kafka ,” at Hadoop Users Group UK , August 2015.
[33] Dan Harvey: “Mongo + Kafka 实现数据捕获变化”,在英国 Hadoop 用户组于2015年8月演讲。
[ 34 ] “ Oracle GoldenGate 12c: Real-Time Access to Real-Time Information ,” Oracle White Paper, March 2015.
[34] “Oracle GoldenGate 12c:实时访问实时信息”,Oracle白皮书,2015年3月。
[ 35 ] “ Oracle GoldenGate Fundamentals: How Oracle GoldenGate Works ,” Oracle Corporation, youtube.com , November 2012.
【35】《Oracle GoldenGate基础知识:Oracle GoldenGate如何工作》 甲骨文公司,youtube.com, 2012年11月。
[ 36 ] Slava Akhmechet: “ Advancing the Realtime Web ,” rethinkdb.com , January 27, 2015.
[36] Slava Akhmechet:「推动实时网络」,rethinkdb.com,2015年1月27日。
[ 37 ] “ Firebase Realtime Database Documentation ,” Google, Inc., firebase.google.com , May 2016.
【37】“Firebase实时数据库文档”,Google公司,firebase.google.com,2016年5月。
[ 38 ] “ Apache CouchDB 1.6 Documentation ,” docs.couchdb.org , 2014.
[38] “Apache CouchDB 1.6 文档”,docs.couchdb.org,2014年。
[ 39 ] Matt DeBergalis: “ Meteor 0.7.0: Scalable Database Queries Using MongoDB Oplog Instead of Poll-and-Diff ,” info.meteor.com , December 17, 2013.
[39] Matt DeBergalis: “Meteor 0.7.0:使用 MongoDB Oplog 而非轮询差异来提高可扩展性的数据库查询”,info.meteor.com,2013年12月17日。
[ 40 ] “ Chapter 15. Importing and Exporting Live Data ,” VoltDB 6.4 User Manual, docs.voltdb.com , June 2016.
[40] "第15章:导入和导出实时数据",VoltDB 6.4 用户手册,docs.voltdb.com,2016年6月。
[ 41 ] Neha Narkhede: “ Announcing Kafka Connect: Building Large-Scale Low-Latency Data Pipelines ,” confluent.io , February 18, 2016.
[41] Neha Narkhede:“宣布Kafka Connect:构建大规模低延迟数据管道”,confluent.io,2016年2月18日。
[ 42 ] Greg Young: “ CQRS and Event Sourcing ,” at Code on the Beach , August 2014.
[42] Greg Young: "CQRS和事件溯源",在海滩代码上,2014年8月。
[ 43 ] Martin Fowler: “ Event Sourcing ,” martinfowler.com , December 12, 2005.
【43】马丁·福勒(Martin Fowler):“事件溯源”,martinfowler.com,2005年12月12日。
[ 44 ] Vaughn Vernon: Implementing Domain-Driven Design . Addison-Wesley Professional, 2013. ISBN: 978-0-321-83457-7
[44] Vaughn Vernon:实现领域驱动设计。Addison-Wesley 专业出版社,2013年。ISBN:978-0-321-83457-7。
[ 45 ] H. V. Jagadish, Inderpal Singh Mumick, and Abraham Silberschatz: “ View Maintenance Issues for the Chronicle Data Model ,” at 14th ACM SIGACT-SIGMOD-SIGART Symposium on Principles of Database Systems (PODS), May 1995. doi:10.1145/212433.220201
【45】H. V. Jagadish,Inderpal Singh Mumick,和Abraham Silberschatz:“编年史数据模型的视图维护问题”,发表于1995年5月的第14届ACM SIGACT-SIGMOD-SIGART数据库系统基本原理研讨会(PODS)上。doi:10.1145/212433.220201。
[ 46 ] “ Event Store 3.5.0 Documentation ,” Event Store LLP, docs.geteventstore.com , February 2016.
[46] “Event Store 3.5.0文档”,Event Store LLP,docs.geteventstore.com,2016年2月。
[ 47 ] Martin Kleppmann: Making Sense of Stream Processing . Report, O’Reilly Media, May 2016.
[47] Martin Kleppmann:流处理的意义。报告,O'Reilly Media,2016年5月。
[ 48 ] Sander Mak: “ Event-Sourced Architectures with Akka ,” at JavaOne , September 2014.
[48] Sander Mak: 在JavaOne 2014年9月针对Akka的基于事件溯源的架构的演讲。
[ 49 ] Julian Hyde: personal communication , June 2016.
[49] 朱利安 · 海德: 个人交流, 2016年6月。
[ 50 ] Ashish Gupta and Inderpal Singh Mumick: Materialized Views: Techniques, Implementations, and Applications . MIT Press, 1999. ISBN: 978-0-262-57122-7
[50] Ashish Gupta和Inderpal Singh Mumick: 实现材料视图的技术和应用。麻省理工学院出版社,1999年。ISBN: 978-0-262-57122-7。
[ 51 ] Timothy Griffin and Leonid Libkin: “ Incremental Maintenance of Views with Duplicates ,” at ACM International Conference on Management of Data (SIGMOD), May 1995. doi:10.1145/223784.223849
[51] Timothy Griffin 和 Leonid Libkin:「重複值視圖的增量維護」,於 1995 年 5 月的 ACM 國際數據管理會議 (SIGMOD) 上,DOI:10.1145/223784.223849。
[ 52 ] Pat Helland: “ Immutability Changes Everything ,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.
[52] Pat Helland在2015年1月的第七届创新数据系统研究双年会(CIDR)上发表的“不可变性改变一切”演讲。
[ 53 ] Martin Kleppmann: “ Accounting for Computer Scientists ,” martin.kleppmann.com , March 7, 2011.
[53] Martin Kleppmann:“计算机科学家的会计学”,martin.kleppmann.com,2011年3月7日。
[ 54 ] Pat Helland: “ Accountants Don’t Use Erasers ,” blogs.msdn.com , June 14, 2007.
[54] Pat Helland: “会计师不会用橡皮擦”,blogs.msdn.com,2007年6月14日。
[ 55 ] Fangjin Yang: “ Dogfooding with Druid, Samza, and Kafka: Metametrics at Metamarkets ,” metamarkets.com , June 3, 2015.
[55] Fangjin Yang:“Dogfooding with Druid, Samza, and Kafka: Metametrics at Metamarkets”,metamarkets.com,2015年6月3日。
[ 56 ] Gavin Li, Jianqiu Lv, and Hang Qi: “ Pistachio: Co-Locate the Data and Compute for Fastest Cloud Compute ,” yahoohadoop.tumblr.com , April 13, 2015.
"开心果: 将数据和计算同时放置以获得最快的云计算",来自yahoohadoop.tumblr.com,2015年4月13日。"
[ 57 ] Kartik Paramasivam: “ Stream Processing Hard Problems – Part 1: Killing Lambda ,” engineering.linkedin.com , June 27, 2016.
[57] Kartik Paramasivam:“流处理的难题,第一部分:杀死Lambda”,engineering.linkedin.com,2016年6月27日。
[ 58 ] Martin Fowler: “ CQRS ,” martinfowler.com , July 14, 2011.
[58] Martin Fowler:“CQRS”,martinfowler.com,2011年7月14日。
[ 59 ] Greg Young: “ CQRS Documents ,” cqrs.files.wordpress.com , November 2010.
[59] Greg Young: “CQRS文档”, cqrs.files.wordpress.com,2010年11月。
[ 60 ] Baron Schwartz: “ Immutability, MVCC, and Garbage Collection ,” xaprb.com , December 28, 2013.
[60] Baron Schwartz:“不变性、MVCC与垃圾回收”,xaprb.com,2013年12月28日。
[ 61 ] Daniel Eloff, Slava Akhmechet, Jay Kreps, et al.: “Re: Turning the Database Inside-out with Apache Samza ,” Hacker News discussion, news.ycombinator.com , March 4, 2015.
[61] Daniel Eloff, Slava Akhmechet, Jay Kreps,等人:"重新定义数据库:Apache Samza开发", Hacker News讨论,news.ycombinator.com,2015年3月4日。
[ 62 ] “ Datomic Development Resources: Excision ,” Cognitect, Inc., docs.datomic.com .
[62] “Datomic开发资源:Excision”,Cognitect, Inc.,docs.datomic.com。
[ 63 ] “ Fossil Documentation: Deleting Content from Fossil ,” fossil-scm.org , 2016.
[63] “Fossil文档:从Fossil中删除内容”, fossil-scm.org,2016年。
[ 64 ] Jay Kreps: “ The irony of distributed systems is that data loss is really easy but deleting data is surprisingly hard, ” twitter.com , March 30, 2015.
[64] Jay Kreps:“分布式系统的讽刺之处在于,数据丢失非常容易,而删除数据却出奇地难”,twitter.com,2015年3月30日。
[ 65 ] David C. Luckham: “ What’s the Difference Between ESP and CEP? ,” complexevents.com , August 1, 2006.
[65] David C. Luckham: “ESP和CEP的区别是什么?”,complexevents.com,2006年8月1日。
[ 66 ] Srinath Perera: “ How Is Stream Processing and Complex Event Processing (CEP) Different? ,” quora.com , December 3, 2015.
66. Srinath Perera:“流处理和复杂事件处理(CEP)有何不同?”, quora.com,2015年12月3日。
[ 67 ] Arvind Arasu, Shivnath Babu, and Jennifer Widom: “ The CQL Continuous Query Language: Semantic Foundations and Query Execution ,” The VLDB Journal , volume 15, number 2, pages 121–142, June 2006. doi:10.1007/s00778-004-0147-z
【67】Arvind Arasu、Shivnath Babu 和 Jennifer Widom:“CQL 连续查询语言:语义基础和查询执行”,The VLDB Journal,卷 15,第 2 期,页码 121–142,2006 年 6 月。doi:10.1007/s00778-004-0147-z。
[ 68 ] Julian Hyde: “ Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch ,” ACM Queue , volume 7, number 11, December 2009. doi:10.1145/1661785.1667562
[68] Julian Hyde:“数据在传输中:流式 SQL 技术如何帮助解决 Web 2.0 数据危机”,ACM Queue,第 7 卷,第 11 期,2009 年 12 月。doi:10.1145/1661785.1667562。
[ 69 ] “ Esper Reference, Version 5.4.0 ,” EsperTech, Inc., espertech.com , April 2016.
“Esper 参考文档, 版本5.4.0”,EsperTech, Inc., espertech.com,2016年4月。
[ 70 ] Zubair Nabi, Eric Bouillet, Andrew Bainbridge, and Chris Thomas: “ Of Streams and Storms ,” IBM technical report, developer.ibm.com , April 2014.
[70]Zubair Nabi,Eric Bouillet,Andrew Bainbridge和Chris Thomas:“关于数据流和风暴”,IBM技术报告,developer.ibm.com,2014年4月。
[ 71 ] Milinda Pathirage, Julian Hyde, Yi Pan, and Beth Plale: “ SamzaSQL: Scalable Fast Data Management with Streaming SQL ,” at IEEE International Workshop on High-Performance Big Data Computing (HPBDC), May 2016. doi:10.1109/IPDPSW.2016.141
"[71] Milinda Pathirage, Julian Hyde, Yi Pan和Beth Plale: ‘SamzaSQL:使用流式SQL进行可扩展快速数据管理’,发表于IEEE国际高性能大数据计算研讨会(HPBDC),2016年5月。 doi: 10.1109 / IPDPSW.2016.141"
[ 72 ] Philippe Flajolet, Éric Fusy, Olivier Gandouet, and Frédéric Meunier: “ HyperLogLog: The Analysis of a Near-Optimal Cardinality Estimation Algorithm ,” at Conference on Analysis of Algorithms (AofA), June 2007.
[72] Philippe Flajolet, Éric Fusy, Olivier Gandouet, and Frédéric Meunier: “HyperLogLog:一种近似最优基数估计算法的分析”,发表于算法分析 (AofA) 学术会议,2007 年 6 月。
[ 73 ] Jay Kreps: “ Questioning the Lambda Architecture ,” oreilly.com , July 2, 2014.
[73] 杰伊·克雷普斯:《质疑Lambda架构》,oreilly.com,2014年7月2日。
[ 74 ] Ian Hellström: “ An Overview of Apache Streaming Technologies ,” databaseline.wordpress.com , March 12, 2016.
"Apache流技术综述",Ian Hellström,databaseline。wordpress.com,2016年3月12日。
[ 75 ] Jay Kreps: “ Why Local State Is a Fundamental Primitive in Stream Processing ,” oreilly.com , July 31, 2014.
[75] Jay Kreps:“为什么本地状态是流处理中的基本原语”,oreilly.com,2014年7月31日。
[ 76 ] Shay Banon: “ Percolator ,” elastic.co , February 8, 2011.
[76] Shay Banon:“Percolator”,elastic.co,2011年2月8日。
[ 77 ] Alan Woodward and Martin Kleppmann: “ Real-Time Full-Text Search with Luwak and Samza ,” martin.kleppmann.com , April 13, 2015.
【77】Alan Woodward和Martin Kleppmann: “Luwak和Samza实现的实时全文搜索”, martin.kleppmann.com,2015年4月13日。
[ 78 ] “ Apache Storm 1.0.1 Documentation ,” storm.apache.org , May 2016.
“Apache Storm 1.0.1 文档,” storm.apache.org, 2016 年 5 月。
[ 79 ] Tyler Akidau: “ The World Beyond Batch: Streaming 102 ,” oreilly.com , January 20, 2016.
[79] Tyler Akidau:“超越批处理:流处理 102”。oreilly.com,2016 年 1 月 20 日。
[ 80 ] Stephan Ewen: “ Streaming Analytics with Apache Flink ,” at Kafka Summit , April 2016.
80. Stephan Ewen:在2016年四月的Kafka峰会上发表“使用Apache Flink进行流式分析”演讲。
[ 81 ] Tyler Akidau, Alex Balikov, Kaya Bekiroğlu, et al.: “ MillWheel: Fault-Tolerant Stream Processing at Internet Scale ,” at 39th International Conference on Very Large Data Bases (VLDB), August 2013.
[81] Tyler Akidau、Alex Balikov、Kaya Bekiroğlu等:“MillWheel:互联网规模的容错流处理”,发表于第39届超大规模数据库国际会议(VLDB),2013年8月。
[ 82 ] Alex Dean: “ Improving Snowplow’s Understanding of Time ,” snowplowanalytics.com , September 15, 2015.
【82】Alex Dean:“优化Snowplow对时间的了解”,snowplowanalytics.com,2015年9月15日。
[ 83 ] “ Windowing (Azure Stream Analytics) ,” Microsoft Azure Reference, msdn.microsoft.com , April 2016.
"[83] "窗口化(Azure Stream Analytics)",Microsoft Azure参考,msdn.microsoft.com,2016年4月。"
[ 84 ] “ State Management ,” Apache Samza 0.10 Documentation, samza.apache.org , December 2015.
[84] “状态管理,” Apache Samza 0.10 文档,samza.apache.org,2015 年 12 月。
[ 85 ] Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, et al.: “ Photon: Fault-Tolerant and Scalable Joining of Continuous Data Streams ,” at ACM International Conference on Management of Data (SIGMOD), June 2013. doi:10.1145/2463676.2465272
[85] Rajagopal Ananthanarayanan、Venkatesh Basker、Sumit Das等:“Photon:容错且可扩展的连续数据流连接”,发表于ACM国际数据管理会议(SIGMOD),2013年6月。doi:10.1145/2463676.2465272
[ 86 ] Martin Kleppmann: “ Samza Newsfeed Demo ,” github.com , September 2014.
[86] Martin Kleppmann:“Samza Newsfeed演示”,github.com,2014年9月。
[ 87 ] Ben Kirwin: “ Doing the Impossible: Exactly-Once Messaging Patterns in Kafka ,” ben.kirw.in , November 28, 2014.
【87】本·柯尔文: “在Kafka中实现不可能完成的任务:确切一次性消息模式”,ben.kirw.in,2014年11月28日。
[ 88 ] Pat Helland: “ Data on the Outside Versus Data on the Inside ,” at 2nd Biennial Conference on Innovative Data Systems Research (CIDR), January 2005.
[88] Pat Helland:《内外数据的区别》,收录于2005年第二届创新数据系统研究会议(CIDR)。
[ 89 ] Ralph Kimball and Margy Ross: The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling , 3rd edition. John Wiley & Sons, 2013. ISBN: 978-1-118-53080-1
[89] Ralph Kimball和Margy Ross:《数据仓库工具包:维度建模权威指南》,第三版。John Wiley & Sons,2013年。ISBN:978-1-118-53080-1
[ 90 ] Viktor Klang: “ I’m coining the phrase ‘effectively-once’ for message processing with at-least-once + idempotent operations ,” twitter.com , October 20, 2016.
"维克多·克朗:我正在创造一个新的术语——“有效一次”,用于至少一次加幂等操作的消息处理。",推特,2016年10月20日。
[ 91 ] Matei Zaharia, Tathagata Das, Haoyuan Li, et al.: “ Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters ,” at 4th USENIX Conference in Hot Topics in Cloud Computing (HotCloud), June 2012.
[91] Matei Zaharia,Tathagata Das,Haoyuan Li等: “离散化流:大规模集群上流处理的高效和容错模型”,于2012年6月的第4届云计算热点问题(HotCloud)USENIX会议上。
[ 92 ] Kostas Tzoumas, Stephan Ewen, and Robert Metzger: “ High-Throughput, Low-Latency, and Exactly-Once Stream Processing with Apache Flink ,” data-artisans.com , August 5, 2015.
[92] Kostas Tzoumas、Stephan Ewen和Robert Metzger:“使用Apache Flink实现高吞吐量、低延迟和精确一次的流处理”,data-artisans.com,2015年8月5日。
[ 93 ] Paris Carbone, Gyula Fóra, Stephan Ewen, et al.: “ Lightweight Asynchronous Snapshots for Distributed Dataflows ,” arXiv:1506.08603 [cs.DC], June 29, 2015.
[93] Paris Carbone, Gyula Fóra, Stephan Ewen和其他人: “轻量级异步快照用于分布式数据流”,arXiv:1506.08603 [cs.DC],2015年6月29日。
[ 94 ] Ryan Betts and John Hugg: Fast Data: Smart and at Scale . Report, O’Reilly Media, October 2015.
[94] Ryan Betts和John Hugg: 即时数据:智能且规模化。O'Reilly Media,2015年10月报告。
[ 95 ] Flavio Junqueira: “ Making Sense of Exactly-Once Semantics ,” at Strata+Hadoop World London , June 2016.
[95] Flavio Junqueira:“解读精确一次语义”,发表于Strata+Hadoop World London,2016年6月。
[ 96 ] Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram Subramanian, and Guozhang Wang: “ KIP-98 – Exactly Once Delivery and Transactional Messaging ,” cwiki.apache.org , November 2016.
[96] Jason Gustafson,Flavio Junqueira,Apurva Mehta,Sriram Subramanian和Guozhang Wang:“KIP-98-精确一次传递和事务消息”,cwiki.apache.org,2016年11月。
[ 97 ] Pat Helland: “ Idempotence Is Not a Medical Condition ,” Communications of the ACM , volume 55, number 5, page 56, May 2012. doi:10.1145/2160718.2160734
"97. 帕特·赫兰德:『幂等性不是医学条件』,ACM通讯,2012年5月第55卷第5期,第56页。doi: 10.1145/2160718.2160734。"
[ 98 ] Jay Kreps: “ Re: Trying to Achieve Deterministic Behavior on Recovery/Rewind ,” email to samza-dev mailing list, September 9, 2014.
"[98] Jay Kreps: “Re: 在恢复/倒带时实现确定性行为的尝试”,发送给Samza-dev邮件列表,2014年9月9日。"
[ 99 ] E. N. (Mootaz) Elnozahy, Lorenzo Alvisi, Yi-Min Wang, and David B. Johnson: “ A Survey of Rollback-Recovery Protocols in Message-Passing Systems ,” ACM Computing Surveys , volume 34, number 3, pages 375–408, September 2002. doi:10.1145/568522.568525
[99] E. N. (Mootaz) Elnozahy、Lorenzo Alvisi、Yi-Min Wang和David B. Johnson:“消息传递系统中的回滚恢复协议综述”,《ACM Computing Surveys》,第34卷,第3期,第375–408页,2002年9月。doi:10.1145/568522.568525
[ 100 ] Adam Warski: “ Kafka Streams – How Does It Fit the Stream Processing Landscape? ,” softwaremill.com , June 1, 2016.
[100] Adam Warski: “Kafka Streams - 它在流处理领域中的地位和应用?”, softwaremill.com, 2016年6月1日。