Skip to content

Shuttle.Hopper.AzureEventHubs

Azure Event Hubs implementation for use with Shuttle.Hopper.

Installation

bash
dotnet add package Shuttle.Hopper.AzureEventHubs

Configuration

The URI structure is azureeh://configuration-name/hub-name.

c#
services.AddHopper(builder =>
{
    builder.UseAzureEventHubs(eventHubBuilder =>
    {
        var eventHubOptions = new EventHubOptions
        {
            ConnectionString = "Endpoint=sb://{hub-namespace}.servicebus.windows.net/;SharedAccessKeyName={key-name};SharedAccessKey={key};EntityPath={hub-name}",
            ProcessEvents = true,
            ConsumerGroup = "$Default",
            BlobStorageConnectionString = "{BlobStorageConnectionString}",
            BlobContainerName = "{BlobContainerName}",
            OperationTimeout = TimeSpan.FromSeconds(30),
            ConsumeTimeout = TimeSpan.FromSeconds(30),
            DefaultStartingPosition = EventPosition.Latest,
            CheckpointInterval = 1
        };

        eventHubOptions.ProcessError.Register(async args =>
        {
            Console.WriteLine($"[error] : {args.Exception.Message}");
            await Task.CompletedTask;
        });

        eventHubBuilder.AddOptions("azure", eventHubOptions);
    });
});

The default JSON settings structure is as follows:

json
{
  "Shuttle": {
    "AzureEventHubs": {
      "azure": {
        "ConnectionString": "Endpoint=sb://{hub-namespace}.servicebus.windows.net/;SharedAccessKeyName={key-name};SharedAccessKey={key};EntityPath={hub-name}",
        "ProcessEvents": false,
        "ConsumerGroup": "$Default",
        "BlobStorageConnectionString": "{BlobStorageConnectionString}",
        "BlobContainerName": "{BlobContainerName}",
        "OperationTimeout": "00:00:30",
        "ConsumeTimeout": "00:00:30",
        "DefaultStartingPosition": "Latest",
        "CheckpointInterval": 1
      }
    }
  }
}

Options

Segment / ArgumentDefaultDescription
ConnectionStringThe Azure Event Hubs endpoint to connect to.
ProcessEventsfalseIndicates whether the endpoint will process messages. If true, an EventProcessorClient is instanced and configured.
ConsumerGroup"$Default"The consumer group to use when processing events.
BlobStorageConnectionStringThe Azure Storage Account endpoint to connect to in order to perform checkpoints.
BlobContainerNameThe blob container name where checkpoints will be stored.
OperationTimeout"00:00:30"The duration to wait for relevant async methods to complete before timing out.
ConsumeTimeout"00:00:30"The duration to poll for messages before returning null.
DefaultStartingPositionLatestThe default starting position to use when no checkpoint exists.
CheckpointInterval1The number of events to process before performing a checkpoint.
ClientIdentifierA unique identifier for the client.