
Shuttle.Hopper.Kafka

Kafka implementation for Shuttle.Hopper.

Installation

bash
dotnet add package Shuttle.Hopper.Kafka

Docker Compose

To start a local Kafka broker using the docker-compose.yml file in the root of the repository, run:

bash
docker compose up -d

Configuration

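The URI structure is kafka://configuration-name/topic-name, where configuration-name identifies a set of named options (such as "local" in the examples that follow) and topic-name is the Kafka topic.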
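To make the two URI parts concrete, here is a minimal sketch that splits such a URI using System.Uri. The ParseKafkaUri helper and the "orders" topic are hypothetical and only illustrate the structure; this is not how Shuttle.Hopper.Kafka itself necessarily parses the URI.

```c#
using System;

// Hypothetical helper, not part of Shuttle.Hopper.Kafka: splits a
// kafka:// URI into its configuration name and topic name.
static (string ConfigurationName, string TopicName) ParseKafkaUri(string value)
{
    var uri = new Uri(value);

    if (!uri.Scheme.Equals("kafka", StringComparison.OrdinalIgnoreCase))
    {
        throw new ArgumentException($"Expected the 'kafka' scheme but got '{uri.Scheme}'.", nameof(value));
    }

    // The host portion carries the configuration name; the path carries the topic.
    return (uri.Host, uri.AbsolutePath.Trim('/'));
}

var (configurationName, topicName) = ParseKafkaUri("kafka://local/orders");

Console.WriteLine($"configuration: {configurationName}, topic: {topicName}");
```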

c#
// Acks is defined in the Confluent.Kafka namespace (using Confluent.Kafka;).
services.AddHopper(builder =>
{
    builder.UseKafka(kafkaBuilder =>
    {
        var kafkaOptions = new KafkaOptions
        {
            BootstrapServers = "localhost:9092",
            ReplicationFactor = 1,
            NumPartitions = 1,
            MessageSendMaxRetries = 3,
            RetryBackoff = TimeSpan.FromSeconds(1),
            EnableAutoCommit = false,
            EnableAutoOffsetStore = false,
            FlushEnqueue = false,
            UseCancellationToken = true,
            ConsumeTimeout = TimeSpan.FromSeconds(30),
            OperationTimeout = TimeSpan.FromSeconds(30),
            ConnectionsMaxIdle = TimeSpan.Zero,
            Acks = Acks.All,
            EnableIdempotence = true
        };

        kafkaBuilder.AddOptions("local", kafkaOptions);
    });
});
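Because options are registered under a name, it seems reasonable to register more than one broker and select between them through the configuration-name part of the URI. The following is a sketch under that assumption: it assumes AddOptions may be called once per name, and the "staging" name and its broker address are hypothetical.

```c#
// Assumes `services` is the same IServiceCollection as in the example above.
services.AddHopper(builder =>
{
    builder.UseKafka(kafkaBuilder =>
    {
        // Would be addressed as kafka://local/topic-name
        kafkaBuilder.AddOptions("local", new KafkaOptions
        {
            BootstrapServers = "localhost:9092"
        });

        // Hypothetical second configuration, addressed as kafka://staging/topic-name
        kafkaBuilder.AddOptions("staging", new KafkaOptions
        {
            BootstrapServers = "staging-broker:9092"
        });
    });
});
```

Properties that are not set keep the defaults listed under Options.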

Consult the Confluent.Kafka documentation for ConsumerConfig and ProducerConfig for details on specific options. Options that are not exposed by the KafkaOptions class can be set by providing the relevant configuration object:

c#
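// ConsumerConfig and ProducerConfig are defined in the Confluent.Kafka namespace (using Confluent.Kafka;).
services.AddHopper(builder =>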
{
    builder.UseKafka(kafkaBuilder =>
    {
        var kafkaOptions = new KafkaOptions
        {
            BootstrapServers = "localhost:9092",
            // ... other properties
            ConsumerConfig = new ConsumerConfig
            {
                // ... Confluent.Kafka specific options
            },
            ProducerConfig = new ProducerConfig
            {
                // ... Confluent.Kafka specific options
            }
        };

        kafkaBuilder.AddOptions("local", kafkaOptions);
    });
});
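As an illustration of what could go inside those objects, the sketch below sets a few Confluent.Kafka properties that KafkaOptions does not list. The chosen values are arbitrary examples, and how Shuttle.Hopper.Kafka combines these objects with its own settings is not covered here, so treat the result as an assumption to verify.

```c#
using Confluent.Kafka;

// Assumes `services` is the same IServiceCollection as in the example above.
services.AddHopper(builder =>
{
    builder.UseKafka(kafkaBuilder =>
    {
        var kafkaOptions = new KafkaOptions
        {
            BootstrapServers = "localhost:9092",
            ConsumerConfig = new ConsumerConfig
            {
                // Start from the earliest offset when no committed offset exists.
                AutoOffsetReset = AutoOffsetReset.Earliest,
                // Consumer group session timeout, in milliseconds.
                SessionTimeoutMs = 10000
            },
            ProducerConfig = new ProducerConfig
            {
                // Compress message batches before sending.
                CompressionType = CompressionType.Gzip,
                // Wait up to 5 ms to accumulate messages into a batch.
                LingerMs = 5
            }
        };

        kafkaBuilder.AddOptions("local", kafkaOptions);
    });
});
```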

The default JSON settings structure is as follows:

json
{
  "Shuttle": {
    "Kafka": {
      "local": {
        "BootstrapServers": "localhost:9092",
        "ReplicationFactor": 1,
        "NumPartitions": 1,
        "MessageSendMaxRetries": 3,
        "RetryBackoff": "00:00:01",
        "EnableAutoCommit": false,
        "EnableAutoOffsetStore": false,
        "FlushEnqueue": false,
        "UseCancellationToken": true,
        "ConsumeTimeout": "00:00:30",
        "OperationTimeout": "00:00:30",
        "ConnectionsMaxIdle": "00:00:00",
        "Acks": "All",
        "EnableIdempotence": true
      }
    }
  }
}

Options

| Option | Default | Description |
| --- | --- | --- |
| BootstrapServers | | Initial list of brokers as a CSV list of broker host or host:port. |
| ReplicationFactor | 1 | The replication factor for the new topic. |
| NumPartitions | 1 | The number of partitions for the new topic. |
| MessageSendMaxRetries | 3 | How many times to retry sending a failing message. |
| RetryBackoff | "00:00:01" | The backoff time before retrying a protocol request. |
| EnableAutoCommit | false | Automatically and periodically commit offsets in the background. |
| EnableAutoOffsetStore | false | Automatically store the offset of the last message provided to the application. |
| FlushEnqueue | false | If true, calls Flush on the producer after a message has been enqueued. |
| UseCancellationToken | true | Indicates whether a cancellation token is used for relevant methods. |
| ConsumeTimeout | "00:00:30" | The duration to poll for messages before returning null, when the cancellation token is not used. |
| OperationTimeout | "00:00:30" | The duration to wait for relevant async methods to complete before timing out. |
| ConnectionsMaxIdle | "00:00:00" | Close broker connections after the specified time of inactivity. |
| Acks | "All" | The number of acknowledgements the leader broker must receive from ISR brokers before responding to the request. |
| EnableIdempotence | true | When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. |