Resilient Kafka consumption
Thoth provides a resilient Kafka consumer.
Installation
- sbt
```scala
val ThothVersion = "0.1.0"

libraryDependencies += "fr.maif" % "thoth-kafka-consumer-reactor" % ThothVersion
```
- Maven
```xml
<properties>
  <thoth.version>0.1.0</thoth.version>
</properties>

<dependencies>
  <dependency>
    <groupId>fr.maif</groupId>
    <artifactId>thoth-kafka-consumer-reactor</artifactId>
    <version>${thoth.version}</version>
  </dependency>
</dependencies>
```
- Gradle
```groovy
def versions = [
  ThothVersion: "0.1.0"
]

dependencies {
  implementation "fr.maif:thoth-kafka-consumer-reactor:${versions.ThothVersion}"
}
```
Usage
```java
ResilientKafkaConsumer<String, String> resilientKafkaConsumer = ResilientKafkaConsumer.create(
        // Name of the consumer (for logs, etc.)
        "test",
        ResilientKafkaConsumer.Config.create(
                List.of(topic),
                groupId,
                receiverOptions
        ),
        event -> {
            System.out.println(event.value());
        }
);
```
Consuming events
There are three ways to consume events: blocking, non-blocking, and streams.
Blocking
This applies when the handler makes a blocking call (e.g. JDBC); in that case, a “parallel” scheduler is used.
```java
ResilientKafkaConsumer<String, String> resilientKafkaConsumer = ResilientKafkaConsumer.create(
        // Name of the consumer (for logs, etc.)
        "test",
        ResilientKafkaConsumer.Config.create(
                List.of(topic),
                groupId,
                receiverOptions
        ),
        event -> {
            try {
                // blocking call (a JDBC query would go here)
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
);
```
Non-blocking
```java
ResilientKafkaConsumer<String, String> resilientKafkaConsumer = ResilientKafkaConsumer.create(
        "test",
        ResilientKafkaConsumer.Config.create(
                List.of(topic),
                groupId,
                receiverOptions
        ),
        // Non-blocking handling
        (ReceiverRecord<String, String> event) -> Mono.fromCallable(() -> {
            System.out.println(event.value());
            return event;
        })
);
```
Streams
Stream handling is done using the Reactor `Flux`. In the stream you can skip, group, and so on, but at the end you have to give back the event.
```java
ResilientKafkaConsumer.createFromFlow(
        "test",
        ResilientKafkaConsumer.Config.create(
                List.of(topic),
                groupId,
                receiverOptions
        ),
        flux -> flux
                .index()
                .concatMap(messageAndIndex -> {
                    Long index = messageAndIndex.getT1();
                    System.out.println("Message number " + index);
                    var event = messageAndIndex.getT2();
                    Mono<String> asyncApiCall = asyncApiCall();
                    // give the event back at the end of the pipeline
                    return asyncApiCall.map(it -> event);
                })
);
```
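For example, skipping events can be done with a plain `filter` on the flux. This is a hedged sketch that operates on `String` values for brevity (a real handler receives `ReceiverRecord`s), and the `trim` call is a hypothetical stand-in for per-event async work; note the pipeline still gives the original event back at the end:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class StreamSkipExample {
    // drop blank events, do some async work per event,
    // and give the original event back at the end
    static Flux<String> skipBlanks(Flux<String> events) {
        return events
                .filter(value -> !value.isBlank())
                .concatMap(value ->
                        Mono.fromCallable(value::trim)   // stand-in for an async call
                            .map(ignored -> value));     // give the event back
    }
}
```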
Handling crashes
The goal of this consumer is to handle crashes by retrying the failed event consumption. There are two types of errors:
- Parsing / business errors: in that case, you should push the failing event to a dead letter queue to analyse the issue, and move on to the next event.
- Technical errors: e.g. a database or an API that is unavailable at the time. In that case, you have to let it crash: the consumer will leave the event uncommitted, disconnect from Kafka, and restart later, reading the message again.
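To make the distinction concrete, here is a hedged sketch of such an error policy. The parsing step and the in-memory `deadLetters` list are hypothetical stand-ins (a real setup would produce to a dedicated dead-letter topic); the point is the shape: catch the business error, park the event, move on, and let technical exceptions propagate so the consumer restarts.

```java
import java.util.ArrayList;
import java.util.List;

public class DlqPolicy {
    // hypothetical stand-in for a producer writing to a dead letter topic
    static final List<String> deadLetters = new ArrayList<>();

    static Integer handle(String rawEvent) {
        try {
            // hypothetical parsing/business step: fails on non-numeric events
            return Integer.parseInt(rawEvent);
        } catch (NumberFormatException businessError) {
            // business error: park the event for later analysis, move on
            deadLetters.add(rawEvent);
            return null;
        }
        // a technical exception (database down, API unreachable) would not be
        // caught here: it propagates, the event stays uncommitted, and the
        // consumer disconnects and retries it after a backoff
    }
}
```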
You can configure the consumer with appropriate values for the commit size, restart intervals, etc.:
```java
ResilientKafkaConsumer.Config.create(
        List.of(topic),
        groupId,
        receiverOptions
)
.withCommitSize(5)
.withMinBackoff(Duration.ofMillis(200))
.withMaxBackoff(Duration.ofMinutes(10))
.withRandomFactor(0.2d);
```
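Exponential backoff of this shape typically doubles the delay after each restart, starting at the minimum backoff and capping at the maximum, with the random factor adding jitter on top. This is an assumption about the mechanism rather than Thoth's actual code; a rough sketch of such a schedule (jitter omitted):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {
    // Hypothetical schedule: delay n is min * 2^n, capped at max.
    // The random factor (jitter) is omitted for clarity.
    static List<Duration> schedule(Duration min, Duration max, int restarts) {
        List<Duration> delays = new ArrayList<>();
        Duration delay = min;
        for (int i = 0; i < restarts; i++) {
            delays.add(delay);
            delay = delay.multipliedBy(2);
            if (delay.compareTo(max) > 0) {
                delay = max;
            }
        }
        return delays;
    }

    public static void main(String[] args) {
        // with the values from the configuration above
        System.out.println(schedule(Duration.ofMillis(200), Duration.ofMinutes(10), 13));
    }
}
```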
Status and lifecycles
The resilient Kafka consumer has a lifecycle and goes through the following states:
- Starting: the consumer is starting
- Started: the consumer has started
- Failed: the consumer has crashed and will restart; the Kafka client is no longer connected to the cluster
- Stopping: the consumer is stopping
- Stopped: the consumer is stopped; the Kafka client is no longer connected to the cluster
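As a purely illustrative sketch (the real `Status` type is provided by Thoth, this enum only mirrors the list above), the states and which of them still hold a Kafka connection could be modelled like this:

```java
public class Lifecycle {
    enum Status { STARTING, STARTED, FAILED, STOPPING, STOPPED }

    // Per the list above, Failed and Stopped are the two states where
    // the Kafka client is no longer connected to the cluster.
    static boolean connectedToCluster(Status status) {
        return status != Status.FAILED && status != Status.STOPPED;
    }
}
```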
The status is exposed by the `ResilientKafkaConsumer` using the `status` method.
```java
ResilientKafkaConsumer<String, String> resilientKafkaConsumer = ResilientKafkaConsumer.create(...);
Status status = resilientKafkaConsumer.status();
```
You can also register callbacks:
```java
ResilientKafkaConsumer<String, String> resilientKafkaConsumer = ResilientKafkaConsumer.create(
        ResilientKafkaConsumer.Config.create(
                List.of(topic),
                groupId,
                receiverOptions
        )
        .withOnStarting(() -> Mono.fromRunnable(() -> {
            // called when the consumer is starting
        }))
        .withOnStarted((c, time) -> Mono.fromRunnable(() -> {
            // called once the consumer has started
        }))
        .withOnStopping(() -> Mono.fromRunnable(() -> {
            // called when the consumer is stopping
        }))
        .withOnStopped(() -> Mono.fromRunnable(() -> {
            // called once the consumer has stopped
        }))
        .withOnFailed(e -> Mono.fromRunnable(() -> {
            // called when the consumer has crashed
        })),
        event -> {
            System.out.println(event.record().value());
        }
);
```