Improving WebSocket Callback Processing Time

Stav Alfi
octopol-engineering
3 min read · Mar 20, 2021


While building one of our Node.js services, part of my team (@NoyEliyahu and @OrFins) and I investigated a critical performance problem. The service wasn’t even finished yet, but the story was cool enough to share.

The flow was: at service initialization, we connect over a WebSocket (https://github.com/websockets/ws) to another service (let’s call it ws-producer). At the start of the connection we receive ~100k events (a snapshot of what the other side has, from the beginning of time until now), and then receive real-time events at a much lower rate:

1k events per second

Each incoming ws-message should be sent to our Kafka.

The messages are related to each other, and the ws-producer has a weird API:

It sends us 10 types of events, two of which are “add_something” and “remove_something”. The problem: sometimes, instead of sending “add” → “remove” → “add” for the same entity, the ws-producer sends “add” → “add”. So, to make life easier for the rest of our services, this service identifies that situation (ws events of “add” → “add”) and translates it into Kafka events of “add” → “remove” → “add”.

To do that, we use an in-memory hash-map (there is a reason it’s in-memory; that’s out of scope for this article) to remember all the “add” and “remove” events.
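A minimal sketch of that translation, assuming illustrative names (this is not our actual code): track which entities are currently “live”, and when an “add” arrives for an entity that is already live, inject the missing “remove” first.

```typescript
type WsEvent = { type: 'add_something' | 'remove_something'; entityId: string }

// Remembers which entities are currently "live", so we can detect add → add.
const liveEntities = new Set<string>()

// Translates one incoming ws event into the Kafka events we should emit.
function translate(event: WsEvent): WsEvent[] {
  if (event.type === 'add_something') {
    if (liveEntities.has(event.entityId)) {
      // add → add on the same entity: emit the missing "remove" first.
      return [{ type: 'remove_something', entityId: event.entityId }, event]
    }
    liveEntities.add(event.entityId)
    return [event]
  }
  // remove_something: the entity is no longer live.
  liveEntities.delete(event.entityId)
  return [event]
}
```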

Our ws processing function was async, and this is how ws callbacks work: ws calls the callback again without waiting for the previous call to resolve. To avoid race conditions between calls to our callback, we used an in-memory queue (https://caolan.github.io/async/v3/docs.html#queue) with concurrency-level=1, so we could safely process each incoming ws-message asynchronously, in the original order.
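The mechanism can be sketched without the library, as a simplified stand-in for async’s queue with concurrency-level=1: chain each handler invocation onto the previous one, so a task only starts after the one before it settles.

```typescript
// A minimal concurrency-1 queue: each task starts only after the previous one settled.
function makeSerialQueue() {
  let tail: Promise<unknown> = Promise.resolve()
  return <T>(task: () => Promise<T>): Promise<T> => {
    const next = tail.then(task)
    // Keep the chain alive even if a task rejects.
    tail = next.catch(() => undefined)
    return next
  }
}
```

Every incoming ws-message would be wrapped in such a task, which is exactly why a burst of 100k messages piles up behind the chain.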

It was a big failure :S
We received 100k events in the first 40 seconds of the service’s life, and most of them waited in the queue for up to a minute.

There were 2 problems:

  1. We sent each message to Kafka one by one (not in a batch).
  2. The queue was too slow.

To fix (1), we needed some kind of queue we could consume from in batches, and use that to send Kafka messages in batches. We used rxjs for that:

private readonly producerSubject = new Subject<{ topic: string; messages: Kafka.Message[] }>()

this.producerSubject
  .pipe(
    // every 100ms, emit the internal buffer to `concatMap` and reset it
    bufferTime(100),
    // `concatMap` processes one buffer at a time, in order
    concatMap(async array => {
      if (array.length > 0) {
        const batch = // internal processing of the array
        await this.kafkaProducer.sendBatch({ topicMessages: batch })
      }
    }),
  )
  // without subscribing, the pipeline never runs
  .subscribe()

// send events to rxjs:
// NOTE: `next` returns void!
this.producerSubject.next({topic: 'topic1', messages: [message1]})
this.producerSubject.next({topic: 'topic2', messages: [message2]})
this.producerSubject.next({topic: 'topic1', messages: [message3]})
this.producerSubject.next({topic: 'topic3', messages: [message4]})
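The “internal processing of the array” step is elided above; one plausible shape for it, assuming kafkajs’s `sendBatch` format of `{ topicMessages: [{ topic, messages }] }`, is grouping the buffered items by topic (the function name here is illustrative):

```typescript
type Message = { value: string }
type TopicMessages = { topic: string; messages: Message[] }

// Groups buffered subject items by topic into a kafkajs-style `topicMessages` array.
function groupByTopic(items: { topic: string; messages: Message[] }[]): TopicMessages[] {
  const byTopic = new Map<string, Message[]>()
  for (const item of items) {
    const existing = byTopic.get(item.topic) ?? []
    existing.push(...item.messages)
    byTopic.set(item.topic, existing)
  }
  return Array.from(byTopic.entries()).map(([topic, messages]) => ({ topic, messages }))
}
```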

cool!

Now, processing each ws-message no longer sends messages to Kafka, so it’s synchronous (we also removed some other async work from the ws-processing callback).

How does it help us?

A lot!

Now that the ws-callback is synchronous, we don’t need any queue (https://caolan.github.io/async/v3/docs.html#queue) to process the incoming messages one after the other: because of how Node.js works, if the callback is not async, ws waits until the processing finishes and only then calls the callback with the next message.
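The guarantee comes from standard Node.js event-loop behavior: a synchronous listener runs to completion before the next event is delivered. A small EventEmitter sketch (not ws itself, but the same listener mechanics) illustrates it:

```typescript
import { EventEmitter } from 'node:events'

const emitter = new EventEmitter()
const processed: string[] = []

// A synchronous handler: nothing can interleave between its start and its end.
emitter.on('message', (msg: string) => {
  processed.push(`start:${msg}`)
  processed.push(`end:${msg}`)
})

emitter.emit('message', 'a')
emitter.emit('message', 'b')
// processed is now ['start:a', 'end:a', 'start:b', 'end:b'], never interleaved.
```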

Here are the results (no queue, batched messages to Kafka): 90% of the messages now have a processing time of 0.01s. We decreased it from 60s!

This is a huge improvement.
