21 March 2019: 5th Data Engineering Meetup Berlin – Open Source Infrastructure

The Data Engineering Meetup in Berlin is back! The next edition will be on the 21st of March, and you can register on meetup.com. The meetup is free, and seats run out quickly, so don’t forget to RSVP!

This time, we have a new location: Zalando’s BZS office, across the river from the East Side Gallery. There is space for around 200 people, and the seating arrangement should be better suited to the event – especially if you’re sitting in the back.

The theme for this edition is ‘Open Source Infrastructure’. This time we have two talks, followed by a panel discussion.

The first talk is by Michal Gancarski, data engineer at Zalando. He will share his experience building data services in a small team. If you want to know how a small team achieves great results, you won’t want to miss this.

The second talk is by Wojciech Biela, the co-founder and Director of Product Development at Starburst. He will give a very technical talk about join reordering in Presto. If you use Presto and want a deep dive into technical details, this is the talk for you!

After the break, we will have a panel discussion on cloud infrastructure, titled “Infrastructure in the Cloud: DIY, Vendor, or Cloud Provider”. Moderated by our VP Data Kshitij Kumar, the panel will bring together the perspectives of cloud providers, of vendors of services based on open source projects, and of open source project committers. There will be time for questions, so get ready to grill our panelists!

As usual, food and drinks will be provided throughout the event. We organise this meetup for software engineers, data engineers, and data scientists to discuss what they do, the challenges they face, what works, and what doesn’t. If you would like to talk at the next meetup, get in touch. We want to know about all the interesting work you are doing in your organisation.

Mind the Status Code: Publishing events in Nakadi

Publishing to Nakadi is meant to be simple: just send your events to a REST API. You can use any language you want, and there are tons of libraries to make your life easier. But there are a few things to pay attention to. This is one of them.
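For reference, publishing a batch is a single POST to the event type’s events endpoint. Here is a minimal sketch using Java’s built-in HTTP client; the URL, token, and event payload are placeholders, not values from a real setup:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// POST a batch of events (a JSON array) to an event type.
// nakadi.url, my-event-type, and token are placeholders.
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("https://nakadi.url/event-types/my-event-type/events"))
    .header("Content-Type", "application/json")
    .header("Authorization", "Bearer " + token)
    .POST(HttpRequest.BodyPublishers.ofString("[{\"field\": \"value\"}]"))
    .build();
HttpResponse<String> response = HttpClient.newHttpClient()
    .send(request, HttpResponse.BodyHandlers.ofString());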

When you publish a batch of events to Nakadi, you expect the response to be 200. This means that your events were validated against the schema of the event type you are publishing to, enriched, and persisted to Kafka. You could be tempted to write something like this to check that your events were published:

if (responseStatus.is2xxSuccessful()) {
  // my events were published
} else {
  // handle failure
}

Convinced that you handle publishing failure correctly, you deploy your application in production. Everything is fine, until one day…

… an incident happens. Some events weren’t published.

Merde.

What happened? Nakadi replied with this status code:

207 Multi-Status

At least one event wasn’t published. Nakadi even included a summary of each event’s status in the response body. You should check the response body, and try again to publish the events that weren’t published. The code that checks the publishing status could become something like:

if (responseStatus.equals(HttpStatus.OK)) {
  // my events were published
} else if (responseStatus.equals(HttpStatus.MULTI_STATUS)) {
  // some events were not published: check the response body and retry
} else {
  // handle other failures
}
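For the 207 case, the response body tells you which events failed. Here is a minimal sketch of that check, assuming Jackson for JSON parsing; findEventsToRetry and events are our own names, not part of any Nakadi client, and per the Nakadi API the body contains one status item per event, in the same order as the submitted batch:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;

// Collect the events that Nakadi reported as not published, so they
// can be sent again. "events" is the batch we just tried to publish.
List<JsonNode> findEventsToRetry(String responseBody, List<JsonNode> events) throws Exception {
  JsonNode statuses = new ObjectMapper().readTree(responseBody);
  List<JsonNode> toRetry = new ArrayList<>();
  for (int i = 0; i < statuses.size(); i++) {
    // each item carries a publishing_status: "submitted" means persisted,
    // "failed" and "aborted" mean the event was not published
    String status = statuses.get(i).get("publishing_status").asText();
    if (!"submitted".equals(status)) {
      toRetry.add(events.get(i));
    }
  }
  return toRetry;
}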

This issue has affected users in the past, and it will no doubt affect others in the future. The 207 status code is clearly documented in the Nakadi documentation, but of course, a 2xx code that represents a partial failure is strange and confusing. The thing is, we haven’t found a better response code to use, and now we can’t change it without breaking the API. So be careful, and always check both the status code and the response body.

Reducing the Commit Timeout in Nakadi Subscriptions

In November, we released a change to Nakadi: the ability to reduce commit_timeout when consuming events with the subscription API. The change may look small, but it can bring substantial improvements to users who use it properly.

TL;DR

When connecting to your subscription to receive events, add the commit_timeout parameter to select the timeout you want to use, in seconds. Acceptable values are integers between 0 and the default commit timeout. For example, this request will use a commit timeout of 5 seconds:

curl "https://nakadi.url/subscriptions/my_subscription_id/events?commit_timeout=5"

Commit timeout in subscriptions

When Nakadi sends events to a client consuming from a subscription, it expects the client to commit cursors before the commit timeout runs out. If the client has not committed cursors when the commit timeout passes, then Nakadi considers the client dead: cursor commits will not be accepted anymore, the slot used by the client is freed, and the partition assignments for the subscription are rebalanced between the connected clients.

What does this look like, concretely? Let’s take an example, illustrated in Figure 1. We have two clients, C1 and C2, consuming from a subscription S that is attached to two partitions, P1 and P2. Each client is assigned one partition: C1 consumes events from P1, and C2 consumes events from P2. We assume a commit timeout of 60 seconds.

Figure 1: How commit timeout works

At t0, Nakadi sends a batch of events to C1. At that time, the countdown for the commit timeout starts, and C1 has 60 seconds to commit the cursors it received together with the batch of events.

At t1, Nakadi sends a batch of events to C2. At that time, the countdown for the commit timeout starts, and C2 has 60 seconds to commit the cursors it received together with the batch of events.

At t2, which happens before C1’s commit timeout runs out, C1 commits its cursors. The commit succeeds, and Nakadi sends another batch of events to C1. It also starts another 60-second countdown for C1 to commit the new batch of events.

At t3, the commit timeout for C2 runs out, but C2 has not committed its cursors yet. Maybe C2 died, maybe there were network issues, or maybe C2 is not finished processing the batch yet. Nakadi can’t know for sure, so it closes the connection to C2, frees the slot that C2 was using, and rebalances the partitions between the connected clients, such that all the partitions are now assigned to C1.

At t4, C2 is done processing the batch, and tries to commit the cursors. However, it is too late, and Nakadi rejects the commit.
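In code, the contract boils down to a simple loop: receive a batch, process it, and commit its cursors before the countdown ends. A rough sketch, where openSubscriptionStream, parseBatch, processBatch, commitCursors, and the Batch type are hypothetical helpers standing in for your own HTTP client code:

import java.io.BufferedReader;

// The subscription API streams one JSON batch per line. Every helper
// used here is a placeholder for your own HTTP client code.
try (BufferedReader stream = openSubscriptionStream(subscriptionId)) {
  String line;
  while ((line = stream.readLine()) != null) {
    Batch batch = parseBatch(line);
    processBatch(batch);              // must finish within the commit timeout
    commitCursors(batch.cursors());   // POST to /subscriptions/{id}/cursors
  }
}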

Issues with a fixed commit timeout

Until we added the option to decrease the commit timeout as a parameter that consumers can provide when requesting events, the commit timeout was a fixed value, set by the Nakadi operators. At Zalando, we set it to 60 seconds. That seems like a reasonable value for most use cases: it is long enough that consumers have plenty of time to process batches (and if they can’t, they can request smaller batches), while short enough that connection issues or dead consumers are detected quickly, so Nakadi can free the slot they were using in the subscription and reassign their partitions to other consumers.

However, some users need to process events very quickly after they are published. They reported issues with the commit timeout, which was too long for their use cases: if Nakadi sends data to a consumer and something goes wrong with that consumer, a replacement has to wait up to 60 seconds before it can take over the subscription. Figure 2 shows an example of this kind of problem.

Figure 2: A very long wait

A consumer C1 consumes from a subscription that contains a single partition. Therefore, only one consumer can consume from the subscription at any point in time. Another consumer, C2, is used as a fail-over in case something goes wrong with C1. It continuously tries to connect to the subscription, but gets a 409 Conflict response while C1 is receiving events. Once again, the commit timeout is set to 60 seconds. The consumers have an important SLO to meet: 99% of events must be processed within 3 seconds.

At t0, Nakadi sends a batch of events to C1, and the countdown for the commit timeout starts. C1 usually commits very quickly, within a few hundred milliseconds.

At t1, which is 2 seconds after t0, C1 crashes: it will never commit the batch it received. However, C2 still cannot connect to take over the consumption, since Nakadi will keep waiting for another 58 seconds.

At t2, which occurs 60 seconds after t0, Nakadi considers C1 to be dead, and frees the slot. C2 can now connect to the subscription and resume consumption. However, 58 seconds have passed since C1 crashed, and the first events are now much too late. Remember the 3-second SLO! Worse, the subscription has accumulated a whole minute of backlog, which, on a busy event type, can represent a lot of events. Now, C2 has to process all these late events, and it will take some time before it can catch up and process events within its SLO.

Reducing the commit timeout

Reducing the commit timeout per connection is now possible, which will help in scenarios such as the one described in Figure 2. Now, whenever they connect to the subscription to receive events, C1 and C2 can add a new parameter, commit_timeout. The value of commit_timeout is the commit timeout in seconds, and it can be anywhere between 0 and the default commit timeout (60 seconds at Zalando). Setting commit_timeout to 0 is equivalent to using the default.

Let’s now revisit our scenario in Figure 3. Here, both C1 and C2 connect to the subscription API with the commit_timeout value set to 1. Nakadi only waits for 1 second before considering that a client is dead.

At t0, C1 connects to the subscription to receive events, with a commit timeout of 1 second.

At t1, Nakadi sends a batch to C1, and the 1-second countdown starts.

At t2, which happens before the countdown reaches 0, C1 commits cursors. Nakadi then sends another batch to C1.

At t3, which is 1 second after t2, Nakadi still hasn’t received a commit from C1. It considers C1 dead, and frees the slot.

At t4, a few milliseconds after t3, C2 connects to the subscription to receive events. Nakadi sends a batch to C2 (the same batch that it sent to C1 at t2, since it was never committed), and starts the 1-second countdown.

At t5, a few hundred milliseconds after t4, C2 commits its cursors.

Because the consumers have carefully selected the commit timeout that suits them best, all the events have been processed within 3 seconds, even though C1 crashed. The consumers did not break their SLO, and did not accumulate a backlog of late events.

Caveat

You may be tempted to set a very low commit timeout for all your consumers, but this can be dangerous. Let’s add a twist to the example in Figure 3: both C1 and C2 write events to the same database before committing cursors. For some reason, insertion queries to the database become very slow. Figure 4 shows what happens.

C1 is already connected to the subscription with a commit timeout of 1 second, and C2 is the fallback. At t0, Nakadi sends a batch of events to C1, and the 1-second countdown starts.

At t1, C1 is not done writing the events to the database, and therefore hasn’t committed cursors. Nakadi considers C1 dead, and frees the slot.

At t2, C2 connects to the subscription, also with a commit timeout of 1 second. C1 is still processing the events. Nakadi sends to C2 the same batch it sent to C1 at t0, and the 1-second countdown starts.

At t3, C1 is done writing to the database, and tries to commit its cursors. Nakadi rejects the commit, since it came too late.

At t4, which is 1 second after t2, C2 is still busy writing the events to the database, which is still slow. It hasn’t committed cursors, and the countdown has reached 0. Nakadi considers C2 dead, and frees the slot.

This back-and-forth between C1 and C2 can continue for a long time, until the database performance recovers. As they are unable to commit, C1 and C2 are making no progress, and the backlog of events is growing. Events are late, and the SLO is quickly breached.

Setting a higher commit timeout could have helped in this scenario. A good rule of thumb for setting the commit timeout value would be to use the highest value you can live with.

Incidents will happen, and things will go wrong, so it is a good idea to be able to change the consumption parameters manually, either through environment variables, or settings that can be changed at runtime. A more sophisticated approach would be for the consumer to monitor the time it takes to process events, and automatically change the commit timeout when the processing time reaches a threshold. In our example, the consumer could notice that writing to the database takes just over a second, and use a 2-second commit timeout on the next connection to the subscription.
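As an illustration, here is a rough sketch of that adaptive approach. This is behaviour you would build into your own consumer, not a Nakadi feature; processBatch and reconnectWithCommitTimeout are hypothetical helpers:

import java.time.Duration;

// Measure how long a batch takes to process; if we are getting close to
// the current commit timeout, reconnect with more headroom next time.
long startNanos = System.nanoTime();
processBatch(batch);
long processingSeconds = Duration.ofNanos(System.nanoTime() - startNanos).getSeconds();
if (processingSeconds + 1 >= currentCommitTimeout) {
  // e.g. the database writes took just over a second: double the margin,
  // capped at the default commit timeout (60 seconds at Zalando)
  currentCommitTimeout = Math.min(2 * (processingSeconds + 1), DEFAULT_COMMIT_TIMEOUT);
  reconnectWithCommitTimeout(currentCommitTimeout);
}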

Nakadi at FOSDEM 2019

My next talk on Nakadi is confirmed, and it will be at FOSDEM, on Sunday the 3rd of February. It will be a short talk in the HPC, Big Data and Data Science dev room. You can check out the details on the FOSDEM website.

I already spoke about Nakadi at FOSDEM 2018. This will be a different talk. Instead of talking about the features that Nakadi provides from the point of view of an operator, I’ll talk about how our small team of 9 people develops, maintains, operates, and supports Nakadi at Zalando, which is used by hundreds of teams every day. Come over to FOSDEM to find out more, or wait for the recording to be available if you can’t be in Brussels on the day!

Data Engineering Meetup, 4th Edition: 25 September 2018

The fourth edition of our data engineering meetup is in exactly two weeks’ time, and we have just put up the event program. If you’re in Berlin on the 25th of September, and interested in data engineering, register quickly on the meetup page. Last time, the event filled up in only a few hours!

For this edition, you will get to hear about BI, serverless data ingestion, streaming platforms, notebooks, and more, with speakers from Ifesca, Valohai, and Zalando.

The talks will be recorded, and we will make them available online shortly after the meetup. You can check out the videos from the last meetup on our YouTube channel.

The meetup is an event organised by engineers, for engineers. We don’t do sales pitches; we tell tales from the trenches, the not-always-pretty reality of data engineering. Sometimes we rant. Sometimes we celebrate. We keep talks short, and leave plenty of time for questions and informal discussions. So, if you are interested in data engineering, don’t hesitate, join us!

We aim to organise this meetup quarterly. Would you like to talk at the next meetup? Get in touch, and show us what you’d like to talk about!

PS: I didn’t tell you this, but if the event is full, and you want to join, just show up at the door. Chances are, we’ll find a way to squeeze you in!

Last Month in Nakadi: July 2018

This is the sixth installment in the series of blog posts, “Last Month in Nakadi”, where I try to give some more details on new features and bug fixes that were released over the previous month.

[New] Log-compacted topics

Pull Request 1
Pull Request 2 
Pull Request 3

Nakadi now supports log-compacted topics, a feature long available in Kafka. In a nutshell, a log-compacted topic is a topic where events are published as key-value pairs. At some interval, Kafka compacts the topic, which means that, for each key, it only keeps the latest published value; messages with the same key, but published earlier, are discarded. For example, if three values are published for the key user-42, only the most recent one survives compaction. The Kafka documentation has more details about this feature.

How to use it in Nakadi? Simply set the cleanup policy to ‘compact’ when creating the event type. It is not possible to convert a “classic” event type to a log-compacted one, because the events in the “classic” event type do not have a key.
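For illustration, creating such an event type over the REST API could look like the sketch below, again with Java’s built-in HTTP client. The URL, token, name, and the trivial schema are placeholders; cleanup_policy is the field documented in the Nakadi API:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Create an event type whose underlying topic is log-compacted.
// nakadi.url, token, and the event type definition are placeholders.
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("https://nakadi.url/event-types"))
    .header("Content-Type", "application/json")
    .header("Authorization", "Bearer " + token)
    .POST(HttpRequest.BodyPublishers.ofString("""
        {
          "name": "my-compacted-event-type",
          "owning_application": "my-app",
          "category": "business",
          "cleanup_policy": "compact",
          "schema": {"type": "json_schema", "schema": "{}"}
        }"""))
    .build();
HttpResponse<String> response = HttpClient.newHttpClient()
    .send(request, HttpResponse.BodyHandlers.ofString());
// expect 201 Created on success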

Log-compacted topics currently do not support timelines, and it is possible to specify a different Kafka storage for them. There is also a feature flag to turn the creation of log-compacted event types on and off.

[Updated] Kafka client 1.1.1

Pull Request

We have updated the Kafka client to the latest version, 1.1.1. A few days later, Kafka 2.0.0 was released, so this is no longer the latest version, but at least it is a recent one. We also updated the version of Kafka used in the docker-compose file for running Nakadi in development and acceptance tests.

[Fixed] Audience field

Pull Request

The audience field was accepting values with_underscores, while the requirement from our architects was to accept values with-hyphens instead, for consistency across software developed at Zalando.

[Removed] Remove legacy feature toggles

Pull Request

This is the first PR opened by our new teammate, Suyash (welcome!). He did a bit of cleanup on the code base, and removed unused feature toggles. Now the code looks a little nicer, and is easier to read!

And that’s it for July. If you would like to contribute to Nakadi, please feel free to browse the issues on GitHub, especially those marked with the “help wanted” tag. If you would like to implement a large new feature, please open an issue first to discuss it, so we can all agree on what it should look like. We very much welcome all sorts of contributions: not just code, but also documentation, help with the website, etc.

Call for Submissions: Berlin Data Engineering Meetup, 25 September 2018


Photo by Kane Reinholdtsen on Unsplash

The Berlin Data Engineering Meetup is a quarterly meetup organised by a few crazy people from Zalando’s data services department. The meetup is a venue for engineers to present their ideas, exchange best practices, and candidly talk about failures, accidents, and other catastrophes. What brings people together is their interest in all things data engineering: streaming platforms, machine learning, databases, storage formats, stream processing, etc.

For the next edition of the meetup, on the 25th of September 2018, we invite speakers of all levels of expertise to submit talks on any aspect of data engineering. Talks should be 20 minutes long, with a few extra minutes for questions. Topics of interest include, but are not limited to:

  • Stream processing
  • Data lakes
  • CQRS
  • Machine learning
  • Databases and data stores
  • Data formats
  • Data quality
  • BI
  • Infrastructure
  • Microservices
  • Access control
  • Data visualisation
  • When things go wrong

We favour talks where the speakers share their experiences, good and bad, what they learned, and all the things you typically don’t find in the documentation.

How to submit

Send an email to [email protected] with the following information:

  • Name
  • Affiliation
  • Title of talk
  • Short abstract (up to 150 words)
  • Short bio (up to 100 words)

Important dates

  • 10 August 2018: submission deadline
  • 17 August 2018: speakers notification
  • 24 August 2018: schedule publication
  • 25 September 2018: meetup!

Location

The Hub @ Zalando
Tamara-Danz-Str. 1
10243 Berlin