Producing Messages
To publish messages to Kafka, you first need to create a producer. Call the producer function of the client to create it:
const producer = kafka.producer()
Or, with options:
const producer = kafka.producer({
  allowAutoTopicCreation: false,
  transactionTimeout: 30000
})
Options
option | description | default |
---|---|---|
createPartitioner | Take a look at Custom Partitioner for more information | null |
retry | Take a look at Producer Retry for more information | null |
metadataMaxAge | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions | 300000 - 5 minutes |
allowAutoTopicCreation | Allow topic creation when querying metadata for non-existent topics | true |
transactionTimeout | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the transaction.max.timeout.ms setting in the broker, the request will fail with an InvalidTransactionTimeout error | 60000 |
idempotent | Experimental. If enabled, the producer will ensure each message is written exactly once. Acks must be set to -1 ("all"). Retries will default to MAX_SAFE_INTEGER. | false |
maxInFlightRequests | Max number of requests that may be in progress at any time. If falsy, there is no limit. | null (no limit) |
Producing messages
The send method is used to publish messages to the Kafka cluster.
const producer = kafka.producer()
await producer.connect()
await producer.send({
  topic: 'topic-name',
  messages: [
    { key: 'key1', value: 'hello world' },
    { key: 'key2', value: 'hey hey!' }
  ],
})
Example with a defined partition:
const producer = kafka.producer()
await producer.connect()
await producer.send({
  topic: 'topic-name',
  messages: [
    { key: 'key1', value: 'hello world', partition: 0 },
    { key: 'key2', value: 'hey hey!', partition: 1 }
  ],
})
The send method has the following signature:
await producer.send({
  topic: <String>,
  messages: <Message[]>,
  acks: <Number>,
  timeout: <Number>,
  compression: <CompressionTypes>,
})
property | description | default |
---|---|---|
topic | topic name | |
messages | An array of objects. See Message structure for more details. Example: [{ key: 'my-key', value: 'my-value'}] | |
acks | Control the number of required acks. -1: all in-sync replicas must acknowledge (default); 0: no acknowledgments; 1: only waits for the leader to acknowledge | -1 (all in-sync replicas must acknowledge) |
timeout | The time to await a response in ms | 30000 |
compression | Compression codec | CompressionTypes.None |
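As a rough illustration of how these properties combine, the sketch below wraps send in a small helper that overrides the default acks and timeout. The helper name publishWithLeaderAck and the 10000 ms timeout are assumptions made for this example and are not part of KafkaJS; the producer passed in is expected to be connected already.

```javascript
// Hypothetical helper, not part of KafkaJS: sends with explicit acks and
// timeout instead of relying on the defaults from the table above.
async function publishWithLeaderAck(producer, topic, messages) {
  return producer.send({
    topic,
    messages,
    acks: 1,        // wait for the partition leader only
    timeout: 10000, // illustrative value, in ms
  })
}
```

Waiting only for the leader trades durability for lower latency, so the default of -1 is usually the safer starting point.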
Message structure
Messages have the following properties:
Property | Description | Default |
---|---|---|
key | Used for partitioning. See Key | |
value | Your message content. The value can be a Buffer, a string or null. The value will always be encoded as bytes when sent to Kafka. When consumed, the consumer will need to interpret the value according to your schema. | |
partition | Which partition to send the message to. See Key for details on how the partition is decided if this property is omitted. | |
timestamp | The timestamp of when the message was created. See Timestamp for details. | Date.now() |
headers | Metadata to associate with your message. See Headers. | |
Key
The message key is used to decide which partition the message will be sent to. This is important to ensure that messages relating to the same aggregate are processed in order. For example, if you use an orderId as the key, you can ensure that all messages regarding that order will be processed in order.
By default, the producer is configured to distribute the messages with the following logic:
- If a partition is specified in the message, use it
- If no partition is specified but a key is present, choose a partition based on a hash (murmur2) of the key
- If no partition or key is present, choose a partition in a round-robin fashion
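The decision order in the list above can be sketched as follows. This is not the KafkaJS implementation: toyHash is a deliberately simple stand-in for murmur2, and createSketchPartitioner and its partitionCount argument are hypothetical names used only to keep the example self-contained.

```javascript
// Toy stand-in for the real hash. KafkaJS uses murmur2; this is NOT murmur2.
const toyHash = (key) => {
  let hash = 0
  for (const byte of Buffer.from(String(key))) {
    hash = (hash * 31 + byte) >>> 0 // keep the value in unsigned 32-bit range
  }
  return hash
}

// Returns a function that picks a partition using the three rules above.
const createSketchPartitioner = () => {
  let counter = 0 // round-robin state for messages without a key
  return ({ partitionCount, message }) => {
    if (message.partition !== undefined && message.partition !== null) {
      return message.partition // 1. an explicit partition wins
    }
    if (message.key !== undefined && message.key !== null) {
      return toyHash(message.key) % partitionCount // 2. same key, same partition
    }
    return counter++ % partitionCount // 3. no key: rotate through partitions
  }
}
```

Because rule 2 is a pure function of the key and the partition count, messages with the same key keep landing on the same partition for as long as the number of partitions does not change.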
Timestamp
Each message has a timestamp in the form of a UTC timestamp with millisecond precision, passed as a string. If no timestamp is provided, the producer uses the current time. The timestamp that consumers eventually see may differ from the one set by the producer, depending on the topic configuration:
- If the topic is configured to use CreateTime, the timestamp from the producer's message will be used.
- If the topic is configured to use LogAppendTime, the timestamp will be overwritten by the broker with the broker local time when it appends the message to its log.
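A minimal sketch of setting the timestamp explicitly, assuming the value is the number of milliseconds since the Unix epoch converted to a string. The helper name withTimestamp is hypothetical and not part of KafkaJS.

```javascript
// Hypothetical helper: attaches an explicit creation time to a message.
// The timestamp is milliseconds since the Unix epoch (UTC), as a string.
const withTimestamp = (message, date = new Date()) => ({
  ...message,
  timestamp: String(date.getTime()),
})

const message = withTimestamp(
  { key: 'key1', value: 'hello world' },
  new Date('2020-01-01T00:00:00.000Z')
)
// message.timestamp === '1577836800000'
```

If the topic uses LogAppendTime, this value is replaced by the broker, so it only has an effect on topics configured with CreateTime.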
Headers
Kafka v0.11 introduced record headers, which allow your messages to carry extra metadata. To send headers with your message, include the headers key with the values. Example:
await producer.send({
  topic: 'topic-name',
  messages: [{
    key: 'key1',
    value: 'hello world',
    headers: {
      'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67',
      'system-id': 'my-system',
    }
  }]
})
A header value can be either a string or an array of strings.
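To illustrate both forms, the sketch below builds a message with one single-valued header and one header that carries an array of strings. The helper buildTaggedMessage and the header name x-tags are made up for this example.

```javascript
// Hypothetical helper: builds a message whose headers carry one single-valued
// header and one multi-valued header ('x-tags' is an illustrative name).
const buildTaggedMessage = (value, correlationId, tags) => ({
  value,
  headers: {
    'correlation-id': correlationId, // a single string
    'x-tags': [...tags],             // an array of strings
  },
})

const message = buildTaggedMessage('hello world', 'abc-123', ['billing', 'eu'])
```

The resulting object can be placed in the messages array of a send call like any other message.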
Producing to multiple topics
To produce to multiple topics at the same time, use sendBatch. This can be useful, for example, when migrating between two topics.
const topicMessages = [
  {
    topic: 'topic-a',
    messages: [{ key: 'key', value: 'hello topic-a' }],
  },
  {
    topic: 'topic-b',
    messages: [{ key: 'key', value: 'hello topic-b' }],
  },
  {
    topic: 'topic-c',
    messages: [
      {
        key: 'key',
        value: 'hello topic-c',
        headers: {
          'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67',
        },
      }
    ],
  }
]
await producer.sendBatch({ topicMessages })
sendBatch has the same signature as send, except topic and messages are replaced with topicMessages:
await producer.sendBatch({
  topicMessages: <TopicMessages[]>,
  acks: <Number>,
  timeout: <Number>,
  compression: <CompressionTypes>,
})
property | description |
---|---|
topicMessages | An array of objects with topic and messages. messages is an array of the same type as for send. |
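When messages arrive as a flat list, they need to be grouped by topic before they fit the topicMessages shape. A minimal sketch, assuming each input record is an object of the form { topic, message }; the helper name toTopicMessages is hypothetical.

```javascript
// Hypothetical helper: groups a flat list of { topic, message } records
// into the topicMessages shape expected by sendBatch.
const toTopicMessages = (records) => {
  const byTopic = new Map() // Map keeps topics in first-seen order
  for (const { topic, message } of records) {
    if (!byTopic.has(topic)) byTopic.set(topic, [])
    byTopic.get(topic).push(message)
  }
  return [...byTopic].map(([topic, messages]) => ({ topic, messages }))
}

const grouped = toTopicMessages([
  { topic: 'topic-a', message: { key: 'k1', value: 'a1' } },
  { topic: 'topic-b', message: { key: 'k2', value: 'b1' } },
  { topic: 'topic-a', message: { key: 'k3', value: 'a2' } },
])
// grouped has two entries: topic-a with two messages, topic-b with one
```

The result can then be passed along as producer.sendBatch({ topicMessages: grouped }).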
Custom partitioner
It's possible to assign a custom partitioner to the producer. A partitioner is a function which returns another function responsible for the partition selection, something like this:
const MyPartitioner = () => {
  return ({ topic, partitionMetadata, message }) => {
    // select a partition based on some logic
    // return the partition number
    return 0
  }
}
partitionMetadata is an array of partitions with the following structure:
{ partitionId: <NodeId>, leader: <NodeId> }
Example:
[
  { partitionId: 1, leader: 1 },
  { partitionId: 2, leader: 2 },
  { partitionId: 0, leader: 0 }
]
To use your custom partitioner, use the option createPartitioner when creating the producer.
kafka.producer({ createPartitioner: MyPartitioner })
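A slightly fuller sketch of a custom partitioner is shown below. The routing rule (keys starting with "vip-" go to the lowest partition id, everything else rotates over the remaining partitions) and the name VipPartitioner are invented for illustration; only the outer shape, a function returning a function that receives partitionMetadata and message, comes from the description above.

```javascript
// Hypothetical partitioner: keys starting with "vip-" always go to the
// lowest partition id; everything else rotates over the remaining ones.
const VipPartitioner = () => {
  let counter = 0 // round-robin state shared by all calls
  return ({ partitionMetadata, message }) => {
    // The metadata array may be unsorted (see the example above), so sort ids.
    const ids = partitionMetadata.map((p) => p.partitionId).sort((a, b) => a - b)
    // Keys may be strings or Buffers; normalize to a string for the prefix check.
    const key = message.key == null ? '' : message.key.toString()
    if (key.startsWith('vip-') || ids.length === 1) {
      return ids[0]
    }
    const others = ids.slice(1)
    return others[counter++ % others.length]
  }
}
```

Note that rotating non-VIP messages ignores their keys, so this sketch gives up per-key ordering for them; a partitioner that needs ordering would hash the key instead.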
Default Partitioners
KafkaJS ships with 2 partitioners: DefaultPartitioner and LegacyPartitioner.
The DefaultPartitioner should be compatible with the default partitioner that ships with the Java Kafka client. This can be important to meet the co-partitioning requirement when joining multiple topics.
🚨 Important 🚨
The LegacyPartitioner was the default until v2.0.0. If you are upgrading from an older version and want to retain the previous partitioning behavior, use the LegacyPartitioner by importing it and providing it to the Producer constructor:
const { Partitioners } = require('kafkajs')
kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })
Retry
The retry option can be used to customize the retry configuration for the producer.
Take a look at Retry for more information.
Compression
Since KafkaJS aims to have as small a footprint and as few dependencies as possible, only the GZIP codec is part of the core functionality. Other codecs are available as packages.
GZIP
const { CompressionTypes } = require('kafkajs')
async () => {
  await producer.send({
    topic: 'topic-name',
    compression: CompressionTypes.GZIP,
    messages: [
      { key: 'key1', value: 'hello world' },
      { key: 'key2', value: 'hey hey!' }
    ],
  })
}
The consumers know how to decompress GZIP, so no further work is necessary.
Snappy
Snappy support is provided by the package kafkajs-snappy:
npm install --save kafkajs-snappy
# yarn add kafkajs-snappy
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const SnappyCodec = require('kafkajs-snappy')
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
Take a look at the official readme for more information.
LZ4
LZ4 support is provided by the package kafkajs-lz4:
npm install --save kafkajs-lz4
# yarn add kafkajs-lz4
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const LZ4 = require('kafkajs-lz4')
CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec
The package also accepts options to granularly control LZ4 compression & decompression. Take a look at the official readme for more information.
ZSTD
Zstandard support is provided by the package @kafkajs/zstd:
npm install --save @kafkajs/zstd
# yarn add @kafkajs/zstd
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const ZstdCodec = require('@kafkajs/zstd')
CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec()
Configuration options can be passed to the factory function to control compression & decompression levels and other features. See the official readme for more information.
Other
Any other codec can be easily implemented using existing libraries.
A codec is an object with two async functions: compress and decompress. Import the libraries and define the codec object:
const MyCustomSnappyCodec = {
  async compress(encoder) {
    return someCompressFunction(encoder.buffer)
  },
  async decompress(buffer) {
    return someDecompressFunction(buffer)
  }
}
Now that we have the codec object, we can wrap it in a function and add it to the implementation:
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
CompressionCodecs[CompressionTypes.Snappy] = () => MyCustomSnappyCodec
The new codec can now be used with the send method. Example:
await producer.send({
  topic: 'topic-name',
  compression: CompressionTypes.Snappy,
  messages: [
    { key: 'key1', value: 'hello world' },
    { key: 'key2', value: 'hey hey!' }
  ],
})
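For a concrete picture of the codec shape, the sketch below fills in the two functions using Node's built-in zlib deflate and inflate. Deflate is used here only because it needs no extra dependency; it does not correspond to any of the Kafka compression types, so this object illustrates the interface and should not be registered under an existing CompressionTypes entry. The name DeflateCodec is hypothetical.

```javascript
const zlib = require('zlib')
const { promisify } = require('util')

// Promise-returning versions of the callback-based zlib functions.
const deflate = promisify(zlib.deflate)
const inflate = promisify(zlib.inflate)

// Illustrative codec object: same shape as MyCustomSnappyCodec above,
// with two async functions that each resolve to a Buffer.
const DeflateCodec = {
  async compress(encoder) {
    return deflate(encoder.buffer) // encoder.buffer holds the bytes to compress
  },
  async decompress(buffer) {
    return inflate(buffer)
  },
}
```

A real codec has to implement the algorithm that matches the slot it is registered under, otherwise other Kafka clients will not be able to read the messages.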