Transactions
KafkaJS provides a a simple interface to support Kafka transactions.
Note: Transactions require Kafka version >= v0.11.
Sending Messages within a Transaction
You initialize a transaction by making an async call to producer.transaction(). The returned transaction object has the methods send and sendBatch with an identical signature to the producer. When you are done you call transaction.commit() or transaction.abort() to end the transaction. A transactionally aware consumer will only read messages which were committed.
Note: Kafka requires that the transactional producer have the following configuration to guarantee EoS ("Exactly-once-semantics"):
- The producer must have a max in flight requests of 1
- The producer must wait for acknowledgement from all replicas (acks=-1)
- The producer must have unlimited retries
Configure the producer client with maxInFlightRequests: 1 and idempotent: true to guarantee EOS. Configuring the two options will enable the settings mentioned above.
const client = new Kafka({
clientId: 'transactional-client',
brokers: ['kafka1:9092', 'kafka2:9092'],
})
const producer = client.producer({ maxInFlightRequests: 1, idempotent: true })
Within a transaction, you can produce one or more messages. If transaction.abort is called, all messages will be rolled back.
const transaction = await producer.transaction()
try {
await transaction.send({ topic, messages })
await transaction.commit()
} catch (e) {
await transaction.abort()
}
Sending Offsets
To send offsets as part of a transaction, meaning they will be committed only if the transaction succeeds, use the transaction.sendOffsets() method. This is necessary whenever we want a transaction to produce messages derived from a consumer, in a "consume-transform-produce" loop.
await transaction.sendOffsets({
consumerGroupId, topics
})
topics has the following structure:
[{
topic: <String>,
partitions: [{
partition: <Number>,
offset: <String>
}]
}]