Next Monday, on the 24th of August, I will be giving a talk at the Data Republic meetup – and of course, it will be online, so you can join wherever you are in the world. Head to the Data Republic meetup page to register.
My talk will be called “Writing a Resilient Event Consumer, One Incident at a Time”. What I’ll do in this short talk is walk you through some of the most common pitfalls of writing and operating a data consumer that needs to satisfy latency SLOs. We’ll start with a very simple data consumer application, and through a series of incidents, we will progressively make it better.
After almost 4 years working on Zalando’s event streaming platform, Nakadi, I have seen quite a few issues that arise when people write applications that publish to, or consume from, the event streaming platform. Although I will use Nakadi as an example, what I will talk about applies generally, even if you use Kafka, AWS Kinesis, Google PubSub, or any similar product.
In November, we released a change to Nakadi: the ability to reduce commit_timeout when consuming events with the subscription API. The change may look small, but it can bring substantial improvements to users who use it properly.
When connecting to your subscription to receive events, add the commit_timeout parameter to select the timeout you want to use, in seconds. The range of acceptable values includes the integers between 0 and the default commit timeout. For example, this request will use a commit timeout of 5 seconds:
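As a minimal sketch of what such a request could look like, the snippet below builds the URL for a streaming request with `commit_timeout=5`. It assumes the events endpoint of the subscription API is `/subscriptions/{subscription_id}/events`; the base URL and the subscription ID are placeholders, and `events_url` is a hypothetical helper, not part of any Nakadi client.

```python
from urllib.parse import urlencode


def events_url(base_url: str, subscription_id: str, commit_timeout: int) -> str:
    """Build the URL used to receive events from a subscription.

    The endpoint path is an assumption; adjust it to your Nakadi deployment.
    """
    query = urlencode({"commit_timeout": commit_timeout})
    return f"{base_url}/subscriptions/{subscription_id}/events?{query}"


# Placeholder host and subscription ID, for illustration only.
print(events_url("https://nakadi.example.org", "my-subscription-id", 5))
```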
When Nakadi sends events to a client consuming from a subscription, it expects the client to commit cursors before the commit timeout runs out. If the client has not committed cursors when the commit timeout passes, then Nakadi considers the client dead: cursor commits will not be accepted anymore, the slot used by the client is freed, and the partition assignments for the subscription are rebalanced between the connected clients.
What does this look like, concretely? Let’s take an example, illustrated in Figure 1. We have two clients, C1 and C2, consuming from a subscription S that is attached to two partitions, P1 and P2. Each client is assigned one partition: C1 consumes events from P1, and C2 consumes events from P2. We assume a commit timeout of 60 seconds.
At t0, Nakadi sends a batch of events to C1. At that time, the countdown for the commit timeout starts, and C1 has 60 seconds to commit the cursors it received together with the batch of events.
At t1, Nakadi sends a batch of events to C2. At that time, the countdown for the commit timeout starts, and C2 has 60 seconds to commit the cursors it received together with the batch of events.
At t2, which happens before C1’s commit timeout runs out, C1 commits its cursors. The commit succeeds, and Nakadi sends another batch of events to C1. It also starts another 60-second countdown for C1 to commit the new batch of events.
At t3, the commit timeout for C2 runs out, but C2 has not committed its cursors yet. Maybe C2 died, maybe there were network issues, or maybe C2 is not finished processing the batch yet. Nakadi can’t know for sure, so it closes the connection to C2, frees the slot that C2 was using, and rebalances the partitions between the connected clients, such that all the partitions are now assigned to C1.
At t4, C2 is done processing the batch, and tries to commit the cursors. However, it is too late, and Nakadi rejects the commit.
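The timeline above can be modelled with a small toy class. This is only a sketch of the behaviour described here, not Nakadi's actual implementation: the `Slot` class and its methods are hypothetical, time is passed in explicitly as seconds, and a commit that arrives exactly at the deadline is treated as too late, which is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Slot:
    """Toy model of one client's slot in a subscription."""
    commit_timeout: float
    deadline: Optional[float] = None  # None means no batch is awaiting a commit
    connected: bool = True

    def send_batch(self, now: float) -> None:
        # Sending a batch starts the commit timeout countdown.
        self.deadline = now + self.commit_timeout

    def tick(self, now: float) -> None:
        # Once the countdown runs out, the client is considered dead.
        if self.connected and self.deadline is not None and now >= self.deadline:
            self.connected = False

    def commit(self, now: float) -> bool:
        # A commit is accepted only while the client is still considered alive.
        self.tick(now)
        if not self.connected or self.deadline is None:
            return False
        self.deadline = None
        return True


c1, c2 = Slot(commit_timeout=60), Slot(commit_timeout=60)
c1.send_batch(now=0)      # t0
c2.send_batch(now=5)      # t1
print(c1.commit(now=20))  # t2: within 60 seconds, accepted
print(c2.commit(now=70))  # t4: more than 60 seconds after t1, rejected
```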
Issues with a fixed commit timeout
Until we added the option to decrease the commit timeout as a parameter that consumers can provide when requesting events, the commit timeout was a fixed value, set by the Nakadi operators. At Zalando, we set it to 60 seconds. 60 seconds seems like a reasonable value for most people: it is long enough that consumers should have plenty of time to process batches (and if they can’t, they can use smaller batches), while small enough that connection issues or dead consumers are detected quickly, so Nakadi can free the slot they are using in the subscription, and reassign the partitions they consume from to other consumers.
However, some users need to process events very quickly after they were published. They reported issues with the commit timeout, which was too large for their use cases: if Nakadi sends data to a consumer, and something goes wrong with the consumer, then they need to wait 60 seconds before they can reconnect to the subscription. Figure 2 shows an example of this kind of problem.
A consumer C1 consumes from a subscription that contains a single partition. Therefore, only one consumer can consume from the subscription at any point in time. Another consumer, C2, is used as a fail-over in case something goes wrong with C1. It continuously tries to connect to the subscription, but gets a 409 Conflict response while C1 is receiving events. Once again, the commit timeout is set to 60 seconds. The consumers have an important SLO to meet: 99% of events must be processed within 3 seconds.
At t0, Nakadi sends a batch of events to C1, and the countdown for the commit timeout starts. C1 usually commits very quickly, within a few hundred milliseconds.
At t1, which is 2 seconds after t0, C1 crashes: it will never commit the batch it received. However, C2 still cannot connect to take over the consumption, since Nakadi will keep waiting for another 58 seconds.
At t2, which occurs 60 seconds after t0, Nakadi considers C1 to be dead, and frees the slot. C2 can now connect to the subscription and resume consumption. However, 58 seconds have passed since C1 crashed, and the first events are now much too late. Remember the 3-second SLO! Worse, the subscription has accumulated a whole minute of backlog, which, on a busy event type, can represent a lot of events. Now, C2 has to process all these late events, and it will take some time before it can catch up and process events within its SLO.
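The fail-over behaviour of C2 in this scenario can be sketched as a simple retry loop. Everything here is hypothetical: `try_connect` stands for whatever function your client uses to open the stream, and is assumed to return the HTTP status code of the connection attempt. A real consumer would then keep the connection open and read batches from it.

```python
import time
from typing import Callable


def wait_for_slot(
    try_connect: Callable[[], int],
    retry_delay: float = 0.1,
    max_attempts: int = 1000,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Retry until the subscription has a free slot; return the number of attempts."""
    for attempt in range(1, max_attempts + 1):
        status = try_connect()
        if status == 200:
            return attempt  # slot obtained, consumption can start
        if status != 409:
            raise RuntimeError(f"unexpected status {status}")
        sleep(retry_delay)  # 409 Conflict: another consumer still holds the slot
    raise TimeoutError(f"no free slot after {max_attempts} attempts")


# Simulated responses: two conflicts while C1 holds the slot, then success.
responses = iter([409, 409, 200])
print(wait_for_slot(lambda: next(responses), sleep=lambda _: None))
```

No matter how short `retry_delay` is, the loop cannot succeed before Nakadi frees the slot, which is why the commit timeout dominates the fail-over time.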
Reducing the commit timeout
Reducing the commit timeout per connection is now possible, which will help in scenarios such as the one described in Figure 2. Now, whenever they connect to the subscription to receive events, C1 and C2 can add a new parameter, commit_timeout. The value of commit_timeout is the commit timeout in seconds, and it can be anywhere between 0 and the default commit timeout (60 seconds at Zalando). Setting commit_timeout to 0 is equivalent to using the default.
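The rules for the parameter can be summarised in a few lines. This is a sketch of the rules as stated above, not Nakadi's code: the function name is made up, and raising an error for out-of-range values is an assumption about how such values would be handled.

```python
DEFAULT_COMMIT_TIMEOUT = 60  # seconds; the value used at Zalando


def effective_commit_timeout(requested: int, default: int = DEFAULT_COMMIT_TIMEOUT) -> int:
    """Return the commit timeout that applies to a connection, in seconds."""
    if not 0 <= requested <= default:
        # Assumption: values outside the accepted range are rejected.
        raise ValueError(f"commit_timeout must be between 0 and {default}")
    # 0 is equivalent to using the default.
    return default if requested == 0 else requested


print(effective_commit_timeout(5))
print(effective_commit_timeout(0))
```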
Let’s now revisit our scenario in Figure 3. Here, both C1 and C2 connect to the subscription API with the commit_timeout value set to 1. Nakadi only waits for 1 second before considering that a client is dead.
At t0, C1 connects to the subscription to receive events, with a commit timeout of 1 second.
At t1, Nakadi sends a batch to C1, and the 1 second countdown starts.
At t2, which happens before the countdown reaches 0, C1 commits cursors. Nakadi then sends another batch to C1.
At t3, which is 1 second after t2, Nakadi still hasn’t received a commit from C1. It considers C1 dead, and frees the slot.
At t4, a few milliseconds after t3, C2 connects to the subscription to receive events. Nakadi sends a batch to C2 (the same batch that it sent to C1 at t2, since it was never committed), and starts the 1 second countdown.
At t5, a few hundred milliseconds after t4, C2 commits its cursors.
Because the consumers have carefully selected the commit timeout that suits them best, all the events have been processed within 3 seconds, even while C1 crashed. The consumers have not broken their SLO, and did not accumulate a backlog of late events.
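To compare the two scenarios with numbers, here is a back-of-the-envelope sketch. It assumes the worst case for the batch that was in flight when C1 crashed: the full commit timeout elapses, then the fail-over consumer reconnects and processes the batch. The function names are made up, and the 5 ms reconnect delay and 300 ms processing time are illustrative values for "a few milliseconds" and "a few hundred milliseconds".

```python
def redelivery_latency(commit_timeout: float, reconnect_delay: float, processing_time: float) -> float:
    """Seconds between the first delivery of a batch and its commit by the fail-over consumer."""
    return commit_timeout + reconnect_delay + processing_time


def meets_slo(commit_timeout: float, reconnect_delay: float = 0.005,
              processing_time: float = 0.3, slo: float = 3.0) -> bool:
    """Check whether the redelivered batch is still processed within the SLO."""
    return redelivery_latency(commit_timeout, reconnect_delay, processing_time) <= slo


print(meets_slo(commit_timeout=60))  # default commit timeout
print(meets_slo(commit_timeout=1))   # reduced commit timeout
```

With these assumed values, only the 1 second commit timeout keeps the redelivered batch within the 3 second SLO.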
You may be tempted to set a very low commit timeout for all your consumers, but this can be dangerous. Let’s add a twist to the example in Figure 3: both C1 and C2 write events to the same database before committing cursors. For some reason, insertion queries to the database become very slow. Figure 4 shows what happens.
C1 is already connected to the subscription with a commit timeout of 1 second, and C2 is the fallback. At t0, Nakadi sends a batch of events to C1, and the 1 second countdown starts.
At t1, C1 is not done writing the events to the database, and therefore hasn’t committed cursors. Nakadi considers C1 dead, and frees the slot.
At t2, C2 connects to the subscription, also with a commit timeout of 1 second. C1 is still processing the events. Nakadi sends to C2 the same batch it sent to C1 at t0, and the 1 second countdown starts.
At t3, C1 is done writing to the database, and tries to commit its cursors. Nakadi rejects the commit, since it came too late.
At t4, which is 1 second after t2, C2 is still busy writing the events to the database, which is still slow. It hasn’t committed cursors, and the countdown has reached 0. Nakadi considers C2 dead, and frees the slot.
This back-and-forth between C1 and C2 can continue for a long time, until the database performance improves sufficiently again. As they are unable to commit, C1 and C2 are making no progress, and the backlog of events is growing. Events are late, and quickly, the SLO is breached.
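A tiny simulation makes the lack of progress visible. It is a deliberately simplified sketch: the function is hypothetical, reconnection is treated as instantaneous, every batch takes the same time to process, and times are integers in milliseconds to keep the arithmetic exact.

```python
def committed_batches(processing_ms: int, commit_timeout_ms: int, duration_ms: int) -> int:
    """Count the batches committed during duration_ms by consumers taking turns on one slot."""
    if processing_ms <= 0 or commit_timeout_ms <= 0:
        raise ValueError("times must be positive")
    now = 0
    committed = 0
    while True:
        # Each turn ends with a commit or with the slot being freed, whichever comes first.
        step = min(processing_ms, commit_timeout_ms)
        if now + step > duration_ms:
            return committed
        now += step
        if processing_ms < commit_timeout_ms:
            committed += 1  # the commit arrived before the timeout
        # Otherwise the slot was freed, and the same batch goes to the other consumer.


# Slow database: 1.2 seconds per batch, observed over one minute.
print(committed_batches(1200, commit_timeout_ms=1000, duration_ms=60_000))
print(committed_batches(1200, commit_timeout_ms=2000, duration_ms=60_000))
```

With a 1 second commit timeout, no batch is ever committed; with 2 seconds, the consumers keep making progress even though the database is slow.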
Setting a higher commit timeout could have helped in this scenario. A good rule of thumb for setting the commit timeout value would be to use the highest value you can live with.
Incidents will happen, and things will go wrong, so it is a good idea to be able to change the consumption parameters manually, either through environment variables, or settings that can be changed at runtime. A more sophisticated approach would be for the consumer to monitor the time it takes to process events, and automatically change the commit timeout when the processing time reaches a threshold. In our example, the consumer could notice that writing to the database takes just over a second, and use a 2-second commit timeout on the next connection to the subscription.
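One possible way to sketch the automatic approach is shown below. The function name, the 1.5 headroom factor, and the choice to only ever raise the timeout are all assumptions made for illustration; a real consumer would also need a strategy to lower the timeout again once processing times recover.

```python
import math
from typing import Sequence


def next_commit_timeout(
    recent_processing_seconds: Sequence[float],
    current: int,
    maximum: int = 60,
    headroom: float = 1.5,
) -> int:
    """Pick the commit timeout, in whole seconds, for the next connection."""
    slowest = max(recent_processing_seconds, default=0.0)
    # Leave some headroom above the slowest recently observed batch.
    needed = math.ceil(slowest * headroom)
    # Never go below the current value or 1 second, nor above the allowed maximum.
    return min(maximum, max(current, needed, 1))


# Writing to the database now takes just over a second per batch.
print(next_commit_timeout([0.3, 0.4, 1.1], current=1))
```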
Back when I was studying in Belgium, I religiously attended FOSDEM – the Free and Open Source Software Developers’ European Meeting, every year, in Brussels. In fact, as a member of the NamurLUG, I was part of the team that recorded the talks at FOSDEM for quite a few years. Initially we recorded with consumer-grade cameras, but we soon upgraded to better quality equipment, and even started streaming the events live, after a couple of years. Since then, another team has taken over, and the quality of the recording has improved quite a lot from our very amateur debuts.
This year, I will be back at FOSDEM, but this time I’ll be on the other side: I will give a Lightning Talk about Nakadi, the Event Broker I work on at Zalando. Nakadi is Open Source Software, and provides a RESTful API on top of Kafka-like queues (we have plans to support Kinesis in the future), as well as a bunch of other features: schema validation with json-schema, schema evolution, per-event type authorization, and more. In this talk I will focus on one of my favourite features: Timelines. What is Timelines? Well, I guess you’ll have to watch my talk to find out (or wait for the blog post explaining it, I am working on one)! If you can’t make it to Brussels for FOSDEM, the talk will be recorded and streamed live.