Mind the Status Code: Publishing events in Nakadi

Publishing to Nakadi is meant to be simple: just send your events to a REST API. You can use any language you want, and there are tons of libraries to make your life easier. But there are a few things to pay attention to. This is one of them.

When you publish a batch of events to Nakadi, you expect the response to be 200. This means that your events were validated against the schema of the event type you are publishing to, enriched, and persisted to Kafka. You could be tempted to write something like this to check that your events were published:

if (responseStatus.is2xxSuccessful()) {
  // my events were published
} else {
  // handle failure
}

Convinced that you handle publishing failures correctly, you deploy your application to production. Everything is fine, until one day…

… an incident happens. Some events weren’t published.

Merde.

What happened? Nakadi replied with this status code:

207 Multi-Status

At least one event wasn’t published. Nakadi even included a summary of each event’s status in the response body. You should check the response body, and retry the events that weren’t published. The code that checks the publishing status could become something like:

if (responseStatus.equals(HttpStatus.OK)) {
  // my events were published
} else if (responseStatus.equals(HttpStatus.MULTI_STATUS)) {
  // some events were not published: check the response body
} else {
  // handle other failures
}
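
When you get a 207, the response body is an array with one status object per event, in the same order as the batch you sent; per the Nakadi API specification, each item carries a publishing_status of “submitted”, “failed”, or “aborted”. Here is a minimal sketch of the retry selection, assuming Jackson for JSON parsing (failedEvents and its arguments are hypothetical names, not part of any client library):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;

// Given the raw 207 response body and the batch we sent, collect the
// events that were not persisted, so they can be republished.
static List<JsonNode> failedEvents(String responseBody, List<JsonNode> batch)
    throws Exception {
  JsonNode statuses = new ObjectMapper().readTree(responseBody);
  List<JsonNode> toRetry = new ArrayList<>();
  for (int i = 0; i < statuses.size(); i++) {
    // Anything other than "submitted" means the event was not persisted.
    if (!"submitted".equals(statuses.get(i).path("publishing_status").asText())) {
      toRetry.add(batch.get(i));
    }
  }
  return toRetry;
}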

This issue has affected users in the past, and it will no doubt affect others in the future. The 207 status code is clearly documented in the Nakadi documentation, but of course, a 2xx code that represents a partial failure is strange and confusing. The thing is, we haven’t found a better response code to use. And now, we can’t change it anymore without breaking the API. So be careful, and always check both the status code and the response body.

Reducing the Commit Timeout in Nakadi Subscriptions

In November, we released a change to Nakadi: the ability to reduce commit_timeout when consuming events with the subscription API. The change may look small, but it can bring substantial improvements to users who use it properly.

TL;DR

When connecting to your subscription to receive events, add the commit_timeout parameter to select the timeout you want to use, in seconds. Acceptable values are the integers between 0 and the default commit timeout. For example, this request will use a commit timeout of 5 seconds:

curl https://nakadi.url/subscriptions/my_subscription_id/events?commit_timeout=5

Commit timeout in subscriptions

When Nakadi sends events to a client consuming from a subscription, it expects the client to commit cursors before the commit timeout runs out. If the client has not committed cursors when the commit timeout passes, then Nakadi considers the client dead: cursor commits will not be accepted anymore, the slot used by the client is freed, and the partition assignments for the subscription are rebalanced between the connected clients.
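
For reference, committing cursors is itself an API call, to the subscription’s cursors endpoint. A sketch with curl (the values are placeholders: the cursor must be exactly the one received with the batch, and X-Nakadi-StreamId must echo the stream id Nakadi returned when the consumer connected):

curl -X POST https://nakadi.url/subscriptions/my_subscription_id/cursors \
  -H "X-Nakadi-StreamId: my_stream_id" \
  -H "Content-Type: application/json" \
  -d '{"items": [{"partition": "0", "offset": "001-0001-000042", "event_type": "my_event_type", "cursor_token": "abc123"}]}'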

What does this look like, concretely? Let’s take an example, illustrated in Figure 1. We have two clients, C1 and C2, consuming from a subscription S that is attached to two partitions, P1 and P2. Each client is assigned one partition: C1 consumes events from P1, and C2 consumes events from P2. We assume a commit timeout of 60 seconds.

Figure 1: How commit timeout works

At t0, Nakadi sends a batch of events to C1. At that time, the countdown for the commit timeout starts, and C1 has 60 seconds to commit the cursors it received together with the batch of events.

At t1, Nakadi sends a batch of events to C2. At that time, the countdown for the commit timeout starts, and C2 has 60 seconds to commit the cursors it received together with the batch of events.

At t2, which happens before C1’s commit timeout runs out, C1 commits its cursors. The commit succeeds, and Nakadi sends another batch of events to C1. It also starts another 60-second countdown for C1 to commit the new batch of events.

At t3, the commit timeout for C2 runs out, but C2 has not committed its cursors yet. Maybe C2 died, maybe there were network issues, or maybe C2 has not finished processing the batch yet. Nakadi can’t know for sure, so it closes the connection to C2, frees the slot that C2 was using, and rebalances the partitions between the connected clients, such that all the partitions are now assigned to C1.

At t4, C2 is done processing the batch, and tries to commit the cursors. However, it is too late, and Nakadi rejects the commit.

Issues with a fixed commit timeout

Until we added the option to decrease the commit timeout as a parameter that consumers can provide when requesting events, the commit timeout was a fixed value, set by the Nakadi operators. At Zalando, we set it to 60 seconds. That seems like a reasonable value for most use cases: it is long enough that consumers have plenty of time to process batches (and if they can’t, they can request smaller batches), while short enough that connection issues or dead consumers are detected quickly, so Nakadi can free the slot they are using in the subscription and reassign their partitions to other consumers.

However, some users need to process events very quickly after they are published. They reported issues with the commit timeout, which was too long for their use cases: if Nakadi sends data to a consumer, and something goes wrong with the consumer, they have to wait up to 60 seconds before they can reconnect to the subscription. Figure 2 shows an example of this kind of problem.

Figure 2: A very long wait

A consumer C1 consumes from a subscription that contains a single partition. Therefore, only one consumer can consume from the subscription at any point in time. Another consumer, C2, is used as a fail-over in case something goes wrong with C1. It continuously tries to connect to the subscription, but gets a 409 Conflict response while C1 is receiving events. Once again, the commit timeout is set to 60 seconds. The consumers have an important SLO to meet: 99% of events must be processed within 3 seconds.

At t0, Nakadi sends a batch of events to C1, and the countdown for the commit timeout starts. C1 usually commits very quickly, within a few hundred milliseconds.

At t1, which is 2 seconds after t0, C1 has crashed: it will never commit the batch it received. However, C2 still cannot connect to take over the consumption, since Nakadi will wait for another 58 seconds.

At t2, which occurs 60 seconds after t0, Nakadi considers C1 to be dead, and frees the slot. C2 can now connect to the subscription and resume consumption. However, 58 seconds have passed since C1 crashed, and the first events are now much too late. Remember the 3-second SLO! Worse, the subscription has accumulated a whole minute of backlog, which, on a busy event type, can represent a lot of events. Now, C2 has to process all these late events, and it will take some time before it can catch up and process events within its SLO.

Reducing the commit timeout

Reducing the commit timeout per connection is now possible, which will help in scenarios such as the one described in Figure 2. Now, whenever they connect to the subscription to receive events, C1 and C2 can add a new parameter, commit_timeout. The value of commit_timeout is the commit timeout in seconds, and it can be anywhere between 0 and the default commit timeout (60 seconds at Zalando). Setting commit_timeout to 0 is equivalent to using the default.
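
To make this concrete, here is a minimal Java consumer that connects with a commit timeout of 1 second. This is a sketch, not production code: authentication and error handling are left out, and the URL is a placeholder. Nakadi streams one JSON batch per line, so the consumer simply reads the response line by line:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubscriptionConsumer {
  public static void main(String[] args) throws Exception {
    String url = "https://nakadi.url/subscriptions/my_subscription_id/events?commit_timeout=1";
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<InputStream> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofInputStream());
    try (BufferedReader reader =
        new BufferedReader(new InputStreamReader(response.body()))) {
      String batch;
      while ((batch = reader.readLine()) != null) {
        // Process the batch, then commit its cursor before the
        // 1-second commit timeout expires.
        System.out.println(batch);
      }
    }
  }
}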

Let’s now revisit our scenario in Figure 3. Here, both C1 and C2 connect to the subscription API with the commit_timeout value set to 1. Nakadi only waits for 1 second before considering that a client is dead.

At t0, C1 connects to the subscription to receive events, with a commit timeout of 1 second.

At t1, Nakadi sends a batch to C1, and the 1-second countdown starts.

At t2, which happens before the countdown reaches 0, C1 commits cursors. Nakadi then sends another batch to C1.

At t3, which is 1 second after t2, Nakadi still hasn’t received a commit from C1. It considers C1 dead, and frees the slot.

At t4, a few milliseconds after t3, C2 connects to the subscription to receive events. Nakadi sends C2 the same batch that it sent to C1 at t2, since it was never committed, and starts the 1-second countdown.

At t5, a few hundred milliseconds after t4, C2 commits its cursors.

Because the consumers have carefully selected the commit timeout that suits them best, all the events have been processed within 3 seconds, even though C1 crashed. The consumers have not broken their SLO, and did not accumulate a backlog of late events.

Caveat

You may be tempted to set a very low commit timeout for all your consumers, but this can be dangerous. Let’s add a twist to the example in Figure 3: both C1 and C2 write events to the same database before committing cursors. For some reason, insertion queries to the database become very slow. Figure 4 shows what happens.

C1 is already connected to the subscription with a commit timeout of 1 second, and C2 is the fallback. At t0, Nakadi sends a batch of events to C1, and the 1-second countdown starts.

At t1, C1 is not done writing the events to the database, and therefore hasn’t committed cursors. Nakadi considers C1 dead, and frees the slot.

At t2, C2 connects to the subscription, also with a commit timeout of 1 second. C1 is still processing the events. Nakadi sends to C2 the same batch it sent to C1 at t0, and the 1-second countdown starts.

At t3, C1 is done writing to the database, and tries to commit its cursors. Nakadi rejects the commit, since it came too late.

At t4, which is 1 second after t2, C2 is still busy writing the events to the database, which is still slow. It hasn’t committed cursors, and the countdown has reached 0. Nakadi considers C2 dead, and frees the slot.

This back-and-forth between C1 and C2 can continue for a long time, until the database performance improves sufficiently. As they are unable to commit, C1 and C2 are making no progress, and the backlog of events is growing. Events are late, and the SLO is quickly breached.

Setting a higher commit timeout could have helped in this scenario. A good rule of thumb is to use the highest commit timeout you can live with.

Incidents will happen, and things will go wrong, so it is a good idea to be able to change the consumption parameters manually, either through environment variables, or settings that can be changed at runtime. A more sophisticated approach would be for the consumer to monitor the time it takes to process events, and automatically change the commit timeout when the processing time reaches a threshold. In our example, the consumer could notice that writing to the database takes just over a second, and use a 2-second commit timeout on the next connection to the subscription.
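
Here is a minimal sketch of that idea (processBatch stands for your own processing logic, and a real implementation would probably track a rolling percentile rather than a simple maximum):

import java.time.Duration;
import java.time.Instant;

public class AdaptiveCommitTimeout {
  // Slowest batch observed so far; start from the typical processing time.
  private Duration maxProcessingTime = Duration.ofMillis(500);

  void onBatch(String batch) {
    Instant start = Instant.now();
    processBatch(batch); // hypothetical: your own processing logic
    Duration elapsed = Duration.between(start, Instant.now());
    if (elapsed.compareTo(maxProcessingTime) > 0) {
      maxProcessingTime = elapsed;
    }
  }

  // commit_timeout is expressed in whole seconds, so round up and keep
  // at least one second of safety margin on top of the observed maximum.
  long nextCommitTimeoutSeconds() {
    return maxProcessingTime.toMillis() / 1000 + 2;
  }

  private void processBatch(String batch) { /* your processing logic */ }
}

On its next connection, the consumer would pass nextCommitTimeoutSeconds() as the commit_timeout parameter.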

Nakadi at FOSDEM 2019

My next talk on Nakadi is confirmed, and it will be at FOSDEM, on Sunday the 3rd of February. It will be a short talk in the HPC, Big Data and Data Science dev room. You can check out the details on the FOSDEM website.

I already spoke about Nakadi at FOSDEM 2018. This will be a different talk. Instead of talking about the features that Nakadi provides from the point of view of an operator, I’ll talk about how our small team of 9 people develops, maintains, operates, and supports Nakadi at Zalando, where it is used by hundreds of teams every day. Come over to FOSDEM to find out more, or wait for the recording to be available if you can’t be in Brussels on the day!

Data Engineering Meetup, 4th Edition: 25 September 2018

The fourth edition of our data engineering meetup is in exactly two weeks’ time, and we have just put up the event program. If you’re in Berlin on the 25th of September, and interested in data engineering, register quickly on the meetup page. Last time, the event filled up in just a few hours!

For this edition, you will get to hear about BI, serverless data ingestion, streaming platforms, notebooks, and more, with speakers from Ifesca, Valohai, and Zalando.

The talks will be recorded, and we will make them available online shortly after the meetup. You can check out the videos from the last meetup on our YouTube channel.

The meetup is an event organised by engineers, for engineers. We don’t do sales pitches; we tell tales from the trenches, the not-always-pretty reality of data engineering. Sometimes we rant. Sometimes we celebrate. We keep talks short, and leave plenty of time for questions and informal discussions. So, if you are interested in data engineering, don’t hesitate, join us!

We aim to organise this meetup quarterly. Would you like to talk at the next meetup? Get in touch, and show us what you’d like to talk about!

PS: I didn’t tell you this, but if the event is full, and you want to join, just show up at the door. Chances are, we’ll find a way to squeeze you in!

Last Month in Nakadi: July 2018

This is the sixth installment in the series of blog posts, “Last Month in Nakadi”, where I try to give some more details on new features and bug fixes that were released over the previous month.

[New] Log-compacted topics

Pull Request 1
Pull Request 2 
Pull Request 3

Nakadi now supports log-compacted topics, a feature that has long been available in Kafka. In a nutshell, a log-compacted topic is a topic where events are published as key-value pairs. At some interval, Kafka compacts the topic, which means that, for each key, it only keeps the latest published value; messages with the same key, but published earlier, are discarded. The Kafka documentation has more details about this feature.

How to use it in Nakadi? Simply set the cleanup policy to ‘compact’ when creating the event type. It is not possible to convert a “classic” event type to a log-compacted one, because the events in the “classic” event type do not have a key.
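
For example, a creation request could look like the sketch below, trimmed to the essentials (the exact set of required fields depends on the event type category). Note that, if I read the API specification correctly, events published to a compacted event type must carry their compaction key in the metadata, in the partition_compaction_key field:

curl -X POST https://nakadi.url/event-types \
  -H "Content-Type: application/json" \
  -d '{
        "name": "my_compacted_event_type",
        "owning_application": "my-app",
        "category": "business",
        "cleanup_policy": "compact",
        "schema": {
          "type": "json_schema",
          "schema": "{\"properties\": {}}"
        }
      }'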

Log-compacted topics currently do not support timelines, and it is possible to specify a different Kafka storage for them. There is also a feature flag to turn the creation of log-compacted event types on and off.

[Updated] Kafka client 1.1.1

Pull Request

We have updated the Kafka client to the latest version, 1.1.1. A few days later, Kafka 2.0.0 was released, so this is no longer the latest version, but at least it is a recent one. We also updated the version of Kafka used in the docker-compose file for running Nakadi in development and for acceptance tests.

[Fixed] Audience field

Pull Request

The audience field was accepting values with_underscores, while the requirement from the architects was to accept values with-hyphens instead, to be consistent across software developed by Zalando.

[Removed] Legacy feature toggles

Pull Request

This is the first PR opened by our new teammate, Suyash (welcome!). He did a bit of cleaning up on the code base, and removed unused feature toggles. Now the code looks a little bit nicer, and is easier to read!

And that’s it for July. If you would like to contribute to Nakadi, please feel free to browse the issues on GitHub, especially those marked with the “help wanted” tag. If you would like to implement a large new feature, please open an issue first to discuss it, so we can all agree on what it should look like. We very much welcome all sorts of contributions: not just code, but also documentation, help with the website, etc.

Call for Submissions: Berlin Data Engineering Meetup, 25 September 2018

Photo by Kane Reinholdtsen on Unsplash

The Berlin Data Engineering Meetup is a quarterly meetup organised by a few crazy people from Zalando’s data services department. The meetup is a venue for engineers to present their ideas, exchange best practices, and candidly talk about failures, accidents, and other catastrophes. What brings people together is their interest in all things data engineering: streaming platforms, machine learning, databases, storage formats, stream processing, etc.

For the next edition of the meetup, on the 25th of September 2018, we invite speakers of all levels of expertise to submit talks on any aspect of data engineering. Talks should be 20 minutes long, with a few extra minutes for questions. Topics of interest include, but are not limited to:

  • Stream processing
  • Data lakes
  • CQRS
  • Machine learning
  • Databases and data stores
  • Data formats
  • Data quality
  • BI
  • Infrastructure
  • Microservices
  • Access control
  • Data visualisation
  • When things go wrong

We favour talks where the speakers share their experiences, good and bad, talk about what they learned, and all the things you typically don’t find in the documentation.

How to submit

Send an email to [email protected] with the following information:

  • Name
  • Affiliation
  • Title of talk
  • Short abstract (up to 150 words)
  • Short bio (up to 100 words)

Important dates

  • 10 August 2018: submission deadline
  • 17 August 2018: speaker notifications
  • 24 August 2018: schedule publication
  • 25 September 2018: meetup!

Location

The Hub @ Zalando
Tamara-Danz-Str. 1
10243 Berlin

Last Month in Nakadi: June 2018

This is the fifth installment in the series of blog posts, “Last Month in Nakadi”, where I try to give some more details on new features and bug fixes that were released over the previous month.

[Changed] Reduced logging

Pull Request

Nakadi logs a lot of stuff. It’s very useful, but also comes with a cost. Recently, we were looking at our logs, and noticed that our SLO logging accounts for a large percentage of them. So we combined it with our access log, which significantly reduced the number of lines logged.

[Changed] Stricter JSON parser

Pull Request 1

Pull Request 2

While working on an internal service that consumes data from Nakadi, we noticed that the JSON parser in Nakadi is a bit too lax, and allows event producers to publish incorrect JSON. The issue is that our new service’s parser couldn’t parse that. In this change, we implemented a new parser, which is both stricter and more efficient. So, Nakadi performs a little bit better, and consumers have stronger guarantees regarding the events they consume.

In order to avoid breaking existing producers, we released this feature in two parts: first, we would log events that would be accepted by the old parser but not by the new one. We ran this version for a few days and got in touch with affected producers. Once we were satisfied that no producers would be affected, we released the second pull request, which only uses the new, stricter parser.
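
To illustrate what “a bit too lax” can mean (this is a Jackson-based illustration, not Nakadi’s actual parser): many parsers accept a valid JSON value and silently ignore whatever follows it, unless explicitly told not to.

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StrictParsing {
  public static void main(String[] args) throws Exception {
    String input = "{\"foo\": \"bar\"} trailing garbage";

    // Lax (Jackson's default): parses the object, ignores the rest.
    ObjectMapper lax = new ObjectMapper();
    System.out.println(lax.readValue(input, JsonNode.class)); // {"foo":"bar"}

    // Strict: fail if anything follows the first JSON value.
    ObjectMapper strict = new ObjectMapper()
        .enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS);
    strict.readValue(input, JsonNode.class); // throws MismatchedInputException
  }
}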

[New] Feature flag to allow the deletion of event types with subscriptions

Pull Request

So far, deleting an event type has only been possible if there were no subscriptions for this event type. The reason behind this is to make sure that no consumers are taken by surprise when an event type is deleted. In our staging deployment, we found that it can sometimes cause issues and delays, especially when consuming applications are configured to automatically re-create subscriptions when they are deleted.

Now, there is a new feature flag, XXX, that allows users to delete event types that have subscriptions attached to them. Note that this will also delete the subscriptions, so we do not recommend turning this feature on for production systems – accidents happen!

[Removed] No more feature flag for subscriptions API

Pull Request

A few months ago, we announced that the low-level consumption API was now considered deprecated, and that clients should not use it anymore. We have not yet set a deadline for its removal from the code, but it will come eventually. To consume events from Nakadi, clients should now use the subscription API, also known as HiLA (High-Level API). This API could be turned on and off with a feature toggle, but since it is now the only supported one, the toggle doesn’t make much sense anymore. So we removed it, in order to make the code a little easier to read and maintain.

[New] New attributes

Pull Request 1

Pull Request 2

These two pull requests add optional attributes to an event type, for consistency with Zalando’s REST API guidelines.

The first one is ordering_key_fields, which can be used when the order of events across multiple partitions is important. Nakadi only guarantees ordering within partitions, so this can be used by consumers to re-order events once they have been consumed.

The second one is audience, which determines the target of an event type. The target is a class of consumers, such as component_internal, external_partner, or others.

Both attributes are optional, and Nakadi does not perform any kind of enforcement as a result of these being set – they are purely informational.
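
For instance, the relevant part of an event type definition could look like this sketch (the field path and the audience value are illustrative, using the value names as they were at the time):

{
  "name": "my_event_type",
  "audience": "component_internal",
  "ordering_key_fields": ["shipment.sequence_number"]
}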

[Fixed] Latest_available_offsets

Pull Request

There was a bug in the subscription stats endpoint. In some rare cases, it could look like there were unconsumed events in a subscription, but that was not actually the case. No events would be lost, but monitoring systems would report unconsumed events – which couldn’t be consumed, since they didn’t actually exist.

And that’s it for June. If you would like to contribute to Nakadi, please feel free to browse the issues on GitHub, especially those marked with the “help wanted” tag. If you would like to implement a large new feature, please open an issue first to discuss it, so we can all agree on what it should look like. We very much welcome all sorts of contributions: not just code, but also documentation, help with the website, etc.