This is an IBM Automation portal for Integration products. To view all of your ideas submitted to IBM, create and manage groups of Ideas, or create an idea explicitly set to be either visible by all (public) or visible only to you and IBM (private), use the IBM Unified Ideas Portal (https://ideas.ibm.com).
I spoke with the technical team, and they would like the following points clarified regarding the use of Kafka transactions in the suggested enhancement solution (#2):
Kafka transactions seem to be part of the native Kafka Java client, as opposed to the Kafka cluster. Can this be confirmed?
It appears that Kafka transactions favour message integrity ("one and only one" support versus "at least once") over pure streaming.
The following is an excerpt from a post regarding the design and intent of Kafka transactions
___________________________________________________
Using vanilla Kafka producers and consumers configured for at-least-once delivery semantics, a stream processing application could lose exactly-once processing semantics in the following ways:
The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.
We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly-once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.
Finally, in distributed environments, applications will crash or—worse!—temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly-once processing semantics. We call this the problem of "zombie instances."
With the above three points given, we designed transaction APIs in Kafka to solve the second and third problems. Transactions enable exactly-once processing in read-process-write cycles by making these cycles atomic and by facilitating zombie fencing.
______________________________________________
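The second failure mode in the excerpt can be pictured with a small simulation. This is only a sketch in plain Java: the class `ReprocessingDemo`, its `run` method, and the in-memory lists standing in for topics are hypothetical and are not Kafka APIs. It assumes a single crash that happens after the output has been written but before the input has been marked as consumed, and compares that with an atomic variant where both steps take effect together or not at all.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a read-process-write loop; not a Kafka API. */
final class ReprocessingDemo {
    /**
     * Processes every input message, "crashing" at most once at crashAtIndex
     * (use -1 for no crash), then restarting from the last committed offset.
     *
     * @param atomic when true, the output write and the offset update are applied
     *               together or not at all, mimicking a transaction
     */
    static List<String> run(List<String> input, boolean atomic, int crashAtIndex) {
        List<String> output = new ArrayList<>();
        int committedOffset = 0;   // next input position to read after a restart
        boolean crashed = false;   // the simulated crash happens only once

        while (committedOffset < input.size()) {
            int i = committedOffset;
            String result = "processed(" + input.get(i) + ")";
            boolean crashNow = !crashed && i == crashAtIndex;

            if (atomic) {
                if (crashNow) {            // aborted: neither step becomes visible
                    crashed = true;
                    continue;              // restart from the last committed offset
                }
                output.add(result);
                committedOffset = i + 1;
            } else {
                output.add(result);        // step 1: write B
                if (crashNow) {            // crash before step 2
                    crashed = true;
                    continue;              // restart re-reads the same input message
                }
                committedOffset = i + 1;   // step 2: mark A as consumed
            }
        }
        return output;
    }
}
```

With a single input message and a crash at index 0, the non-atomic variant returns two output entries for that one input, while the atomic variant returns one.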
Given the excerpt above, along with what the product team understands about the client's use case, we would like a little more clarity on how the use of transactions would be fit for purpose: increasing throughput while maintaining sequence and assurance.
Thanks for the level of detail you have provided here, as well as the thought put into the two options (along with the recommended approach). From my lens, and given my knowledge of the situation, it looks quite viable. I have time set up with the stakeholders next week to review the recommendation and obtain feedback, which I will then bring back here.
RFE / Idea Review. Thank you very much for raising this enhancement request and also for spending time discussing some of the project and architectural background which have led you to this stage. For the benefit of other readers and voters, we feel it would be helpful to fill in some more details about the overall situation, and following that a suggestion for a potential way we could implement a change to the product which might help your use case.
Problem Statement:
There exists an MQ queue holding persistent messages which are the inputs to a message flow.
The messages represent a logical sequence of data.
They are placed on to the MQ queue in a specific order. This order needs to be maintained when the data is transferred by an ACE message flow to a Kafka cluster.
Kafka clusters maintain a partitioned log for each topic, with all messages that carry the same key being sent to the same partition (under the default partitioner) and appended in the order they arrive. In this way, partitions are structured commit logs, holding ordered and immutable sequences of records.
The ACE message flow defines a partition key to indicate the destination partition which should be used for any given message.
You would like to be able to read multiple messages from the input MQ queue and have the ACE message flow send them to Kafka as a "batch".
Current Behaviour of ACE:
The ACE KafkaProducer message flow node sends messages to a Kafka cluster using a connection factory which utilises a single instance of the org.apache.kafka.clients.producer.KafkaProducer Java class. This class does not interact with the MQ transaction on the message flow, and each KafkaProducer node in the flow looks after its own connection to Kafka independently of other Kafka nodes in the flow. Currently, the KafkaProducer node sends its messages to Kafka by calling the send() method on the KafkaProducer object, and ACE then immediately performs a get() on the Future which is returned by the send() method. This means that in circumstances where the flow is configured with Ack > 0, the message flow thread will not return to take any new input messages until the send to Kafka has been successfully acknowledged.
Combining the send and the wait for the ack into a single message flow node suits the majority of message flow scenarios and makes error handling predictable and easy to understand and implement in a message flow. As noted, however, this comes at the expense of possible batching of messages, and, for cases requiring higher qualities of service, Kafka's transactional API is not currently exploited by the product.
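The send-then-wait pattern described above can be sketched with standard Java concurrency classes. This is an illustration only: `StubProducer` and `SendThenWait` are hypothetical stand-ins, not the Kafka client and not ACE internals. The stub ignores message content and simply hands back a Future, in the same spirit as KafkaProducer.send() returning a Future. The point is that calling get() straight after each send() means at most one message is ever outstanding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a producer whose send() returns a Future; not the Kafka client. */
final class StubProducer implements AutoCloseable {
    private final ExecutorService broker = Executors.newSingleThreadExecutor();
    private final AtomicLong nextOffset = new AtomicLong();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    /** Hands the message to a background thread and returns a Future for its "offset". */
    Future<Long> send(String message) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        return broker.submit(() -> {
            long offset = nextOffset.getAndIncrement();
            inFlight.decrementAndGet();   // message is now "acknowledged"
            return offset;
        });
    }

    /** Highest number of unacknowledged messages observed at any send() call. */
    int maxInFlight() { return maxInFlight.get(); }

    @Override public void close() { broker.shutdown(); }
}

final class SendThenWait {
    /** Blocks on each Future before taking the next message, as the text describes. */
    static List<Long> sendAll(StubProducer producer, List<String> messages) {
        List<Long> offsets = new ArrayList<>();
        for (String message : messages) {
            Future<Long> ack = producer.send(message);
            try {
                offsets.add(ack.get());   // the calling thread waits here until acknowledged
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for acknowledgement", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("send failed", e.getCause());
            }
        }
        return offsets;
    }
}
```

Because each get() completes before the next send(), the stub never sees more than one message in flight. This keeps ordering and error handling simple, but leaves the client nothing to batch.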
Potential Enhancement Suggestion 1:
In cases where Ack=0, as soon as the send() method has been called, the message flow thread could be released to do further work (e.g. return to the input queue to take the next message in the sequence) rather than waiting for the Future to complete. This could potentially enable ACE to exhibit faster performance because the message flow thread is not sitting idle waiting for the Kafka cluster to acknowledge the sending of the message. Because the node would not be waiting for an acknowledgement, there would be no information available from the Kafka cluster to nodes downstream of KafkaProducer. Once the send() method has been called, each message would be added to the in-memory store of messages held by the Kafka client waiting to be sent to the Kafka cluster. The batch of messages would be sent to the Kafka cluster (in parallel with the work being done on the message flow thread) either when the batch hits the max size (batch.size), or when the max duration has been hit (linger.ms).
Unfortunately the above option would not work well with the scenario presented, where not only must message ordering be maintained, but there is also a desire not to take the next message from the input queue if the put to Kafka has been unsuccessful. Whilst likely to provide faster performance, this potential enhancement would also leave the message flow developer with the additional burden of handling situations where the sending of messages to Kafka does not result in a successful acknowledgement being returned within a particular timeout. In order to react to this, we would also require some way of passing control back to the message flow, perhaps with a new message flow node which propagates either the successful acknowledgement or the timeout information. This node would operate its own message flow thread, separate from the initial message flow thread discussed above.
The flow developer could then choose to take other actions in failure situations.
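For reference, the producer settings mentioned in this first suggestion could be assembled roughly as follows. This is a sketch, not the product's configuration: the class `BatchingProducerConfig` and the values passed to it are illustrative assumptions. The keys `acks`, `batch.size` and `linger.ms` are standard Kafka producer configuration names. Settings a real producer also needs, such as `bootstrap.servers` and the serializers, are deliberately omitted.

```java
import java.util.Properties;

/** Hypothetical helper that collects the batching-related producer settings. */
final class BatchingProducerConfig {
    /** Builds settings for sends that do not wait for an acknowledgement and rely on client-side batching. */
    static Properties build(int batchSizeBytes, long lingerMs) {
        Properties props = new Properties();
        // Do not wait for any broker acknowledgement.
        props.setProperty("acks", "0");
        // Upper bound, in bytes, on a batch of records for one partition.
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        // How long the client may wait for more records before sending a batch.
        props.setProperty("linger.ms", Long.toString(lingerMs));
        return props;
    }
}
```

A call such as `BatchingProducerConfig.build(16384, 5)` yields a `Properties` object that could be merged with the connection settings before the producer is constructed.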
Given the fact that this first potential enhancement wouldn't seem to meet all aspects of the problem you posed, we feel that another potential enhancement might work better ...
Potential Enhancement Suggestion 2:
Enhance the KafkaProducer node to utilise the Kafka API's transactional possibilities. We could add a setting to the KafkaProducer node which equates to "Do my work under a Kafka transaction", and have a property on the node for the transactional.id (or take it from the LE):
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions(); // required once per producer, before the first beginTransaction()
producer.beginTransaction();
... and then each time through the message flow thread when we hit the KafkaProducer node do this ...
producer.send(record); // record: the ProducerRecord built from the current flow message
... and then also have a new input terminal on the KafkaProducer node which would control this:
producer.commitTransaction();
This would allow a user to design a message flow which does one or more MQGets (using either an MQGet or MQInput node) from the "input queue" all within the same MQ UoW, and thus be in control of the frequency with which the commit is executed.
We feel that this potential solution would have the following benefits:
1/ It puts the timing of the commit of the Kafka messages under the control of the flow developer, rather than building it into one of the nodes in the message flow as a black box.
2/ It makes it easier for flow developers who have message flow use cases that span multiple transports (some of which will be transactional and others non-transactional).
3/ It would avoid needing a separate node in the message flow to mark the point at which a commit should occur (representing an independent thread to "Handle errors" or even "Handle commits").
Whilst the original scenario presented is very much focussed on MQ-to-Kafka, other users have raised potential mixed-transport scenarios ... For example:
i) TCPIP socket input to Kafka output
ii) File input streamed read to Kafka output
In these situations it would be difficult for us to infer what size the Kafka producer transaction should be before we issue the commit. For example, imagine the FileInput node situation: if you were streaming in 1000 records from a file, some users might prefer to issue the commit every 1, 10 or 100 records, whilst others would want "the whole file" committed together.
Of course, there can be no guarantees about the future performance of either solution, but we think the second potential enhancement in particular seems the best match for all the information you've provided so far.
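The effect of the commit interval in the file example can be worked through with a small sketch. The class `CommitIntervalDemo` and its method are hypothetical, and the Kafka transaction calls appear only as comments to show where they would sit. The code assumes that a final partial group of records is also committed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of commit boundaries; no Kafka calls are actually made. */
final class CommitIntervalDemo {
    /**
     * Returns the 1-based record numbers after which a commit would be issued
     * when committing every {@code interval} records.
     */
    static List<Integer> commitPoints(int totalRecords, int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        List<Integer> points = new ArrayList<>();
        int inTransaction = 0;
        for (int record = 1; record <= totalRecords; record++) {
            // producer.beginTransaction() would be called here when inTransaction == 0
            // producer.send(...) would be called here for this record
            inTransaction++;
            if (inTransaction == interval || record == totalRecords) {
                // producer.commitTransaction() would be called here
                points.add(record);
                inTransaction = 0;
            }
        }
        return points;
    }
}
```

For 1000 records, an interval of 100 gives 10 commits, an interval of 1 gives 1000 commits, and an interval of 1000 ("the whole file") gives a single commit. This is why the choice is better left to the flow developer than inferred by the node.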
We are changing the status of this enhancement request to "Future Consideration" and we invite voting but more especially any comments from the community about these potential improvements for the KafkaProducer node.