Admin Client
The admin client hosts all the cluster operations, such as: createTopics
, createPartitions
, etc.
const kafka = new Kafka(...)
const admin = kafka.admin()
// remember to connect and disconnect when you are done
await admin.connect()
await admin.disconnect()
The option retry
can be used to customize the configuration for the admin.
Take a look at Retry for more information.
Create topics
createTopics
will resolve to true
if the topic was created successfully or false
if it already exists. The method will throw exceptions in case of errors.
await admin.createTopics({
validateOnly: <boolean>,
waitForLeaders: <boolean>
timeout: <Number>,
topics: <ITopicConfig[]>,
})
ITopicConfig
structure:
{
topic: <String>,
numPartitions: <Number>, // default: 1
replicationFactor: <Number>, // default: 1
replicaAssignment: <Array>, // Example: [{ partition: 0, replicas: [0,1,2] }] - default: []
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
property | description | default |
---|---|---|
topics | Topic definition | |
validateOnly | If this is true , the request will be validated, but the topic won't be created. | false |
timeout | The time in ms to wait for a topic to be completely created on the controller node | 5000 |
waitForLeaders | If this is true it will wait until metadata for the new topics doesn't throw LEADER_NOT_AVAILABLE | true |
Delete topics
await admin.deleteTopics({
topics: <String[]>,
timeout: <Number>,
})
Topic deletion is disabled by default in Apache Kafka versions prior to 1.0.0
. To enable it set the server config.
delete.topic.enable=true
Get topic metadata
Deprecated, see Fetch topic metadata
Fetch topic metadata
await admin.fetchTopicMetadata({ topics: <Array<String> })
TopicsMetadata
structure:
{
topics: <Array<TopicMetadata>>,
}
TopicMetadata
structure:
{
topic: <String>,
partitions: <Array<PartitionMetadata>> // default: 1
}
PartitionMetadata
structure:
{
partitionErrorCode: <Number>, // default: 0
partitionId: <Number>,
leader: <Number>,
replicas: <Array<Number>>,
isr: <Array<Number>>,
}
The admin client will throw an exception if any of the provided topics do not already exist.
If you omit the topics
argument the admin client will fetch metadata for all topics:
await admin.fetchTopicMetadata()
Fetch topic offsets
fetchTopicOffsets
returns most recent offset for a topic.
await admin.fetchTopicOffsets(topic)
// [
// { partition: 0, offset: '31004', high: '31004', low: '421' },
// { partition: 1, offset: '54312', high: '54312', low: '3102' },
// { partition: 2, offset: '32103', high: '32103', low: '518' },
// { partition: 3, offset: '28', high: '28', low: '0' },
// ]
Fetch consumer group offsets
fetchOffsets
returns the consumer group offset for a topic.
await admin.fetchOffsets({ groupId, topic })
// [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// ]
Reset consumer group offsets
resetOffsets
resets the consumer group offset to the earliest or latest offset (latest by default).
The consumer group must have no running instances when performing the reset. Otherwise, the command will be rejected.
await admin.resetOffsets({ groupId, topic }) // latest by default
// await admin.resetOffsets({ groupId, topic, earliest: true })
Set consumer group offsets
setOffsets
allows you to set the consumer group offset to any value.
await admin.setOffsets({
groupId: <String>,
topic: <String>,
partitions: <SeekEntry[]>,
})
SeekEntry
structure:
{
partition: <Number>,
offset: <String>,
}
Example:
await admin.setOffsets({
groupId: 'my-consumer-group',
topic: 'custom-topic',
partitions: [
{ partition: 0, offset: '35' },
{ partition: 3, offset: '19' },
]
})
Describe configs
Get the configuration for the specified resources.
await admin.describeConfigs({
includeSynonyms: <boolean>,
resources: <ResourceConfigQuery[]>
})
ResourceConfigQuery
structure:
{
type: <ResourceType>,
name: <String>,
configNames: <String[]>
}
Returning all configs for a given resource:
const { ResourceTypes } = require('kafkajs')
await admin.describeConfigs({
includeSynonyms: false,
resources: [
{
type: ResourceTypes.TOPIC,
name: 'topic-name'
}
]
})
Returning specific configs for a given resource:
const { ResourceTypes } = require('kafkajs')
await admin.describeConfigs({
includeSynonyms: false,
resources: [
{
type: ResourceTypes.TOPIC,
name: 'topic-name',
configNames: ['cleanup.policy']
}
]
})
Take a look at resourceTypes for a complete list of resources.
Example response:
{
resources: [
{
configEntries: [{
configName: 'cleanup.policy',
configValue: 'delete',
isDefault: true,
isSensitive: false,
readOnly: false
}],
errorCode: 0,
errorMessage: null,
resourceName: 'topic-name',
resourceType: 2
}
],
throttleTime: 0
}
Alter configs
Update the configuration for the specified resources.
await admin.alterConfigs({
validateOnly: false,
resources: <ResourceConfig[]>
})
ResourceConfig
structure:
{
type: <ResourceType>,
name: <String>,
configEntries: <ResourceConfigEntry[]>
}
ResourceConfigEntry
structure:
{
name: <String>,
value: <String>
}
Example:
const { ResourceTypes } = require('kafkajs')
await admin.alterConfigs({
resources: [{
type: ResourceTypes.TOPIC,
name: 'topic-name',
configEntries: [{ name: 'cleanup.policy', value: 'compact' }]
}]
})
Take a look at resourceTypes for a complete list of resources.
Example response:
{
resources: [{
errorCode: 0,
errorMessage: null,
resourceName: 'topic-name',
resourceType: 2,
}],
throttleTime: 0,
}