I am currently working on a project where we use Kafka as a message queue. We have two use cases. The first is to consume messages in parallel, with no ordering of any kind required. The second is to preserve ordering per partition key, as provided when the message is produced.
In Kafka, a given partition key always goes to the same partition, as long as the number of partitions does not change. Multiple partition keys may also go to the same partition.
However, we do not want a message with one partition key to be blocked by messages with other partition keys that happen to sit on the same partition.
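The following minimal sketch makes the key-to-partition mapping concrete. Kafka's default partitioner hashes the serialized key with murmur2 and takes the result modulo the number of partitions. To keep the example self-contained, the hypothetical `KeyPartitioner` class uses `Arrays.hashCode` as a stand-in hash, so its partition numbers will not match the ones Kafka computes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical key-based partitioner, for illustration only.
// Kafka's default partitioner uses murmur2 on the serialized key; Arrays.hashCode
// is a stand-in here, so the partition numbers differ from what Kafka computes.
final class KeyPartitioner {
    private final int numPartitions;

    KeyPartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Same key bytes -> same hash -> same partition (while numPartitions is unchanged).
    int partitionFor(String key) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int positiveHash = Arrays.hashCode(keyBytes) & 0x7fffffff; // clear the sign bit
        return positiveHash % numPartitions;
    }
}
```

With four keys and three partitions, at least two keys must share a partition. That is exactly the situation where processing for one key should not hold up another.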
With this context, we have written two implementations of our Kafka consumer. One processes messages in parallel without regard to order. The other processes messages sequentially per partition key.
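Per-key sequential processing can be sketched without Kafka at all. Each task is chained onto the previous task for the same key, while tasks for different keys run concurrently on a shared thread pool. `PerKeySequentialExecutor` is a hypothetical helper and not part of any Kafka API. It also never removes finished keys, so its map grows with the number of distinct keys.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical helper: tasks with the same key run one after another in submission
// order; tasks with different keys can run concurrently on the shared executor.
// Entries for finished keys are never removed in this sketch.
final class PerKeySequentialExecutor {
    private final Executor executor;
    // Last task submitted for each key; the next task for that key is chained onto it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    PerKeySequentialExecutor(Executor executor) {
        this.executor = executor;
    }

    CompletableFuture<Void> submit(String key, Runnable task) {
        // compute() is atomic per key, so concurrent submits for one key cannot interleave.
        return tails.compute(key, (k, tail) -> {
            CompletableFuture<Void> previous =
                    (tail != null) ? tail : CompletableFuture.<Void>completedFuture(null);
            // handle(...) swallows a failure of the previous task so later tasks still run.
            return previous.handle((ignored, error) -> null).thenRunAsync(task, executor);
        });
    }
}
```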
Now, I have created an abstract class for the Kafka consumer. It does not know whether messages are processed in parallel, sequentially, or with any other strategy. It just holds all the configuration a consumer needs to start, and it has four methods that it calls and implements: poll, consume, commit and waitOptionally.
This abstract class will have two implementations. The parallel implementation overrides consume, commit and waitOptionally. The sequential-per-partition-key implementation also overrides consume, commit and waitOptionally. poll remains the same in both implementations and already lives in the abstract Kafka consumer.
Approach 1
Pseudocode:
public abstract class AbstractKafkaConsumerRunnable<T> implements Runnable {
    // Per-partition-key state of consumed records, used to decide what can be committed.
    protected Map<String, List<ConsumerRecord<String, T>>> partitionKeyVsConsumedRecords;

    // Defined by each implementation: Sequential returns false, Parallel returns true.
    protected abstract boolean isAutoCommit();

    protected abstract void consume(List<T> records);

    protected List<T> poll() {
        ....
    }

    protected void commit(List<T> currentlyPolledRecords) {
        if (isAutoCommit()) {
            return;
        }
        // Logic to commit the records that have been processed,
        // using the state maintained in partitionKeyVsConsumedRecords.
    }

    protected void waitOptionally() {
        ....
    }

    @Override
    public void run() {
        // poll
        // consume
        // commit
        // waitOptionally
    }
}

public class SequentialKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {
    private Map<String, List<MessagingEvent>> recordsVsPartitionKey;

    @Override
    protected boolean isAutoCommit() {
        return false;
    }

    @Override
    protected void consume(List<MessagingEvent> records) {
        // Sequential consumption per partition key.
        // Maintain the state of record processing.
        // Call the user's consume method.
    }
}

public class ParallelKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {
    @Override
    protected boolean isAutoCommit() {
        return true;
    }

    @Override
    protected void consume(List<MessagingEvent> records) {
        // Parallel consumption.
        // Call the user's consume method.
    }
}
In this approach, the abstract class declares an auto-commit flag, and each implementation defines it: Sequential returns false and Parallel returns true. The commit logic lives in the abstract Kafka consumer class and branches on this flag. So the implementation classes do not implement commit; they only implement consume and waitOptionally.
Approach 2
Pseudocode:
public abstract class AbstractKafkaConsumerRunnable<T> implements Runnable {
    protected abstract void consume(List<T> records);

    protected abstract void commit(List<T> currentlyPolledRecords);

    protected abstract void waitOptionally();

    protected List<T> poll() {
        ....
    }

    @Override
    public void run() {
        // poll
        // consume
        // commit
        // waitOptionally
    }
}

public class SequentialKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {
    private Map<String, List<ConsumerRecord<String, MessagingEvent>>> partitionKeyVsConsumedRecords;
    private Map<String, List<MessagingEvent>> recordsVsPartitionKey;

    @Override
    protected void consume(List<MessagingEvent> records) {
        // Sequential consumption per partition key.
        // Maintain the state of record processing.
        // Call the user's consume method.
    }

    @Override
    protected void commit(List<MessagingEvent> currentlyPolledRecords) {
        // Commit logic.
        // Maintain the state of commits.
    }

    @Override
    protected void waitOptionally() {
        // Logic based on the records currently being processed.
        // Uses state as well.
    }
}

public class ParallelKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {
    @Override
    protected void consume(List<MessagingEvent> records) {
        // Parallel consumption.
        // Call the user's consume method.
    }

    @Override
    protected void waitOptionally() {
        ....
    }

    @Override
    protected void commit(List<MessagingEvent> currentlyPolledRecords) {
        return; // Nothing to commit manually in the parallel case.
    }
}
In this approach, I've defined consume, commit and waitOptionally as abstract methods and implemented them in both of my implementation classes.
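Both implementations have a waitOptionally step whose exact behaviour isn't spelled out here. Suppose its purpose is backpressure, that is, holding the poll loop while too many records are still being processed. A minimal sketch could then look like the hypothetical `InFlightLimiter` below. Both the class and the `maxInFlight` threshold are assumptions.

```java
// Hypothetical backpressure helper, assuming waitOptionally() should hold the poll
// loop while too many polled records are still being processed.
final class InFlightLimiter {
    private final int maxInFlight;
    private int inFlight;

    InFlightLimiter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // Called by consume() when a batch is handed to worker threads.
    synchronized void recordsStarted(int count) {
        inFlight += count;
    }

    // Called by a worker thread when one record finishes (successfully or not).
    synchronized void recordFinished() {
        inFlight--;
        notifyAll(); // wake the poll thread if it is waiting
    }

    // Blocks the calling (poll) thread until fewer than maxInFlight records are in flight.
    synchronized void waitOptionally() throws InterruptedException {
        while (inFlight >= maxInFlight) {
            wait();
        }
    }

    synchronized int inFlight() {
        return inFlight;
    }
}
```

In a real Kafka consumer, blocking the polling thread for longer than `max.poll.interval.ms` causes the consumer to be considered failed and triggers a rebalance. An alternative is to call `KafkaConsumer.pause(...)` and `resume(...)` on the assigned partitions while continuing to call `poll()`.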
Approach 3
Pseudocode:
public abstract class AbstractKafkaConsumerRunnable<T> implements Runnable {
    protected abstract void consume(List<T> records);

    protected abstract void commit(List<T> currentlyPolledRecords);

    protected abstract void waitOptionally();

    protected List<T> poll() {
        ....
    }

    @Override
    public void run() {
        // poll
        // consume
        // commit
        // waitOptionally
    }
}

public class SequentialKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {
    private KafkaMessageCommitTracker kafkaMessageCommitTracker;
    private KafkaMessageConsumptionStore kafkaMessageConsumptionStore;

    @Override
    protected void consume(List<MessagingEvent> records) {
        // Add the records to kafkaMessageConsumptionStore.
        // Call the user's consume method.
    }

    @Override
    protected void commit(List<MessagingEvent> currentlyPolledRecords) {
        kafkaMessageCommitTracker.commit();
    }

    @Override
    protected void waitOptionally() {
        // Logic based on the records currently being processed.
        // Uses state as well.
    }
}

public class ParallelKafkaConsumerRunnable extends AbstractKafkaConsumerRunnable<MessagingEvent> {
    @Override
    protected void consume(List<MessagingEvent> records) {
        // Parallel consumption.
        // Call the user's consume method.
    }

    @Override
    protected void waitOptionally() {
        ....
    }

    @Override
    protected void commit(List<MessagingEvent> currentlyPolledRecords) {
        return; // Nothing to commit manually in the parallel case.
    }
}

public interface KafkaMessageCommitTracker {
    void commit();
    // ...other state-related methods such as addRecord, pollRecord, isRecordPresent.
}

public interface KafkaMessageConsumptionStore {
    void addRecord(String partitionKey);
    void pollRecord(String partitionKey);
    // ...other state-related methods such as isRecordPresent, totalNumberOfRecords.
}
This is similar to the previous approach, but I've further extracted the commit and consume state logic into separate classes (behind interfaces) and composed them into my Sequential class. The goal is one more level of granularity in line with the Single Responsibility Principle (the S in SOLID): if the commit logic changes, only the commit class changes, and no other code needs to change with it. Only the Sequential class is aware of these components, since it is the only one that uses them. The abstract and Parallel classes have no knowledge of these abstractions.
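Per-key processing has a direct consequence for commits. Records from the same partition can finish out of order, but Kafka stores a single committed offset per partition: the offset of the next record to read. A commit tracker can therefore only advance over a contiguous run of completed offsets. `PartitionOffsetTracker` is a hypothetical sketch of that bookkeeping for one partition, not the actual `KafkaMessageCommitTracker`. In a real consumer, the returned value would be wrapped in an `OffsetAndMetadata` and passed to `KafkaConsumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)`.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical commit bookkeeping for a single topic partition.
// Records can complete out of order, but Kafka keeps one committed offset per
// partition (the next offset to read), so we only advance over a contiguous
// prefix of completed offsets.
final class PartitionOffsetTracker {
    // Polled offsets not yet committable, in offset order; value = completed?
    private final TreeMap<Long, Boolean> pending = new TreeMap<>();

    synchronized void markPolled(long offset) {
        pending.put(offset, false);
    }

    synchronized void markCompleted(long offset) {
        pending.computeIfPresent(offset, (o, done) -> true);
    }

    // Returns (last contiguous completed offset + 1) if it advanced since the last call.
    synchronized OptionalLong committableOffset() {
        long lastDone = -1;
        while (!pending.isEmpty() && pending.firstEntry().getValue()) {
            Map.Entry<Long, Boolean> first = pending.pollFirstEntry();
            lastDone = first.getKey();
        }
        return lastDone < 0 ? OptionalLong.empty() : OptionalLong.of(lastDone + 1);
    }
}
```

For example, suppose offsets 10, 11 and 12 are polled and 11 and 12 finish first. Nothing is committable until 10 completes, at which point the tracker returns 13.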
Approach 4
In this approach, I don't have any abstract classes. There is just one KafkaConsumer class, which takes strategies as its constructor parameters: pollStrategy, commitStrategy, consumeStrategy and waitOptionallyStrategy. My clients can pass in whichever strategies they want, plug-and-play. Each strategy is an interface with multiple implementations, and the client can pass in any implementation.
Keep in mind that some of the consume and commit strategies will also have to maintain in-memory state. I keep that state inside the strategy class itself rather than exposing it to the outside world.
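A minimal sketch of Approach 4 follows; all interface and class names are hypothetical. The `parallel(...)` factory is one possible way to ship ready-made strategy combinations, so that a client who only wants to consume messages passes a poll source and a handler instead of four strategies. For brevity, `run()` performs a single iteration.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical strategy interfaces for Approach 4; the names are illustrative.
interface PollStrategy<T> { List<T> poll(); }
interface ConsumeStrategy<T> { void consume(List<T> records); }
interface CommitStrategy<T> { void commit(List<T> polledRecords); }
interface WaitOptionallyStrategy { void waitOptionally(); }

final class StrategyKafkaConsumer<T> implements Runnable {
    private final PollStrategy<T> pollStrategy;
    private final ConsumeStrategy<T> consumeStrategy;
    private final CommitStrategy<T> commitStrategy;
    private final WaitOptionallyStrategy waitOptionallyStrategy;

    StrategyKafkaConsumer(PollStrategy<T> pollStrategy, ConsumeStrategy<T> consumeStrategy,
                          CommitStrategy<T> commitStrategy, WaitOptionallyStrategy waitOptionallyStrategy) {
        this.pollStrategy = pollStrategy;
        this.consumeStrategy = consumeStrategy;
        this.commitStrategy = commitStrategy;
        this.waitOptionallyStrategy = waitOptionallyStrategy;
    }

    // One poll/consume/commit/wait iteration; a real consumer would loop until shut down.
    @Override
    public void run() {
        List<T> records = pollStrategy.poll();
        consumeStrategy.consume(records);
        commitStrategy.commit(records);
        waitOptionallyStrategy.waitOptionally();
    }

    // Preset: the client supplies only a poll source and a per-record handler.
    // Records are handled in parallel; commit is a no-op (e.g. auto-commit enabled).
    static <T> StrategyKafkaConsumer<T> parallel(PollStrategy<T> poll, Consumer<T> handler) {
        return new StrategyKafkaConsumer<T>(
                poll,
                records -> records.parallelStream().forEach(handler),
                records -> { },
                () -> { });
    }
}
```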
I'm unsure how to decide which approach to take in my context. This is an enterprise application, and although Kafka is used a lot, its consumer implementation changes EXTREMELY rarely.
Approach 1 depends on a boolean flag to drive the logic. I feel this is not extensible if other use cases pop up, although, again, that is EXTREMELY rare.
Approach 2 is good, but the Sequential class ends up with too much logic: consume has its own logic and state, and commit has its own logic and state, all in one class.
Approach 3 is the one I'm leaning towards. I have defined the two implementations and further extracted the commit and consume logic into separate classes behind interfaces. I feel that is just the right amount of granularity I can live with.
Approach 4 is too granular: it forces my clients to understand what each strategy does. Most of the time, clients just want to consume messages without worrying about the underlying logic.