Error deserializing key/value for partition

Hi, I am using Akka-Stream-Kafka to consume messages from Kafka in a Play Framework application. Here are the details: build.sbt libraryDependencies += "com.typesafe.akka" %% "akka...
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,537 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41333org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
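
The `Caused by` line in the trace above points at the value deserializer: Spring Kafka's `JsonDeserializer` found no type headers on the record at offset 13 and had no default type to fall back on, so it cannot decide which class to build. One common way out is to give the consumer a default value type. Below is a minimal sketch of such a consumer configuration, written with plain strings so it runs without Spring on the classpath. The two `spring.json.*` property names are what I believe `JsonDeserializer` reads, so check them against the spring-kafka version in use. The class `com.example.BorrowerLog`, the broker address, the group id, and the choice of `StringDeserializer` for keys are all hypothetical placeholders, not taken from the original post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: builds consumer properties that give JsonDeserializer a default target type.
final class JsonConsumerProps {

    // Assumption: property names read by Spring Kafka's JsonDeserializer; verify for your version.
    static final String VALUE_DEFAULT_TYPE = "spring.json.value.default.type";
    static final String TRUSTED_PACKAGES = "spring.json.trusted.packages";

    static Map<String, Object> build(String valueType, String trustedPackage) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker address
        props.put("group.id", "borrower-log-consumer");     // hypothetical consumer group
        // Assumption: keys are plain strings; the original post does not say.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Used when the record carries no type information in its headers.
        props.put(VALUE_DEFAULT_TYPE, valueType);
        // Packages the deserializer is allowed to instantiate classes from.
        props.put(TRUSTED_PACKAGES, trustedPackage);
        return props;
    }

    public static void main(String[] args) {
        build("com.example.BorrowerLog", "com.example")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting map would be passed to whatever builds the consumer (for example a consumer factory). Setting a default type only helps if every record on the topic is JSON of that one type. If the record at offset 13 is simply malformed, the log message's own suggestion applies: seek past it to continue consumption.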
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,556 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41352org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,557 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41353org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,558 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41354org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,559 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41355org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,561 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41357org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,562 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41358org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,563 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41359org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,564 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41360org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,565 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41361org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,566 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41362org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,567 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41363org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,588 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41384org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,589 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41385org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,597 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41393org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,598 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41394org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)
2020-04-10 09:15:29,616 [lenderId-,transactionId-,userId-,refrenceNumber-] ERROR - o.s.k.l.LoggingErrorHandler         - Error while processing: null
41412org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition borrower_log-0 at offset 13. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:73)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:220)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:745)


When trying to read a message from Kafka, I get a strange error:

[kafka.request.logger] [] — Completed request:Name: FetchRequest; Version: 3; CorrelationId: 40488; ClientId: consumer-1; ReplicaId: -1; MaxWait: 500 ms; MinBytes: 1 bytes; MaxBytes:52428800 bytes from connection 127.0.0.1:53434-127.0.0.1:50755;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS

[org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] [] — Container exception

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition shiftServiceInQueue-0 at offset 0

Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

 
In the end, it turned out that the problem was in the following code:

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf =
        new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);//,
                //new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(additionalTopics);
Arrays.stream(additionalTopics).forEach(topic -> records.put(topic, new LinkedBlockingQueue<>()));
KafkaMessageListenerContainer<Integer, String> container =
        new KafkaMessageListenerContainer<Integer, String>(cf, containerProperties);

To make everything work, simply change the key type from Integer to String:

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<String, String> cf =
        new DefaultKafkaConsumerFactory<String, String>(consumerProps,
                new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(additionalTopics);
Arrays.stream(additionalTopics).forEach(topic -> records.put(topic, new LinkedBlockingQueue<>()));
KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<String, String>(cf, containerProperties);

In other words, for the KafkaConsumerFactory we simply specify String keys and StringDeserializer instances.

There is currently a rather serious flaw in the Java KafkaConsumer when combined with KafkaAvroDeserializer, which is used to deserialize records when their schemas are stored in Schema Registry. A critical issue has been opened, but it hasn’t been updated since December 2018.

In brief, the issue is that when a record is encountered that cannot be deserialized from Avro (a.k.a. a poison pill), an exception will be thrown:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
If needed, please seek past the record to continue consumption.

This is, of course, how it should behave. However, it fails in an unrecoverable manner. Because it cannot be deserialized, the record is not added to the collection returned by poll(), which means running a commit() variant won’t do anything. And, despite the instructions in the exception message, you can’t easily seek() past the record because the necessary partition and offset information would be in the ConsumerRecord instance, which is never instantiated. This results in a loop of fail, as poll() will keep failing on the same record repeatedly with no provided way to get past it.

Needless to say, this is disappointing for such a widely used and essential platform for just about anyone doing data engineering or event driven architecture of any sort these days. Hopefully the issue will be addressed, although it’s going to be tricky to do without introducing breaking semantic changes. The alternative presented in the issue of passing partition id and offset through the exception object isn’t exactly clean, but may be the best option. Regardless, in the meantime there are options to explore. And here they are!

‘Correct’ the deserializer

Since the heart of the problem is that the KafkaConsumer doesn’t handle SerializationException in a recoverable manner, one solution would be simply not to throw SerializationException anymore. By extending io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer (a la KafkaAvroDeserializer) we can make this happen.

    @Override
    public Object deserialize(String s, byte[] bytes) {
        try {
            return deserialize(bytes);
        } catch (SerializationException e) {
            // Swallow the failure and signal the poison pill with null.
            return null;
        }
    }

    public Object deserialize(String s, byte[] bytes, Schema readerSchema) {
        try {
            return deserialize(bytes, readerSchema);
        } catch (SerializationException e) {
            // Same handling when a reader schema is supplied.
            return null;
        }
    }

Now, inject the class into ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG


props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, <my_deserializer_name>.class);

… and catch those null objects.

while (true) {
    final ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
    for (final ConsumerRecord<String, Payment> record : records) {
        final String key = record.key();

        if (record.value() == null) {
            System.out.println("A poison pill record was encountered.");
        } else {
            try {
                final Payment value = record.value();
                System.out.printf("key = %s, value = %s%n", key, value);
            } catch (ClassCastException e) {
                System.out.println("This record is not a `Payment` event.");
            }
        }
        consumer.commitSync();
    }
}

It’s not great. Converting exceptions into null objects hurts, but, given the alternatives below, it may be the best option. It’s the one I’m using.

Delete the record from the topic

It is possible to delete the record from the topic manually by using the kafka-delete-records.sh tool. I haven’t tried this (yet), so YMMV, use at your own risk, etc. Here is a blog post on the subject.

Of course, relying on this option means that anytime your consumer(s) encounter(s) a poison pill you’re down until your Kafka administrators can fix the issue. And Bad Things can always happen when you’re munging from the command line.

Use the Streams API

(Not tested) Interestingly, the problem has been fixed in the Streams API, essentially by the same method above of implementing the ‘catch exception and move on’ method. There was an improvement proposal on the subject that became a JIRA issue and the fix was merged.

The downside here is that the Streams API is meant for the very specific usage patterns of stream processing. It may or may not fit your use case.

Use Spring

(Not tested) For those using the Spring framework, there is a fix in version 2.2. Here’s a Confluent post to help.

The downside of this option is Spring.

Use a librdkafka client

(Not tested) Colleagues assure me that librdkafka-based clients, such as the Python one, don’t have this issue. If I get the chance I’ll test this one out and update this post.

Parse the error text and seek past the bad record

We’re well into hack territory here, but it might get you through in a pinch. Let’s not dwell. There’s some code below, if you must.

Just don’t tell anyone I helped you do this.

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import java.util.Collections;
import java.util.Properties;

public class SampleConsumer {

    private static final String TOPIC = "payment";

    @SuppressWarnings("InfiniteLoopStatement")
    public void Consume() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(KafkaAvroDeserializerConfig.KEY_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        props.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());

        /* Sadly, we need to do some string parsing to deal with 'poison pill' records (i.e. any message that cannot be
        de-serialized by KafkaAvroDeserializer, most likely because they weren't produced using Schema Registry) so we
        need to set up some regex things
         */
        // Backslashes must be doubled inside Java string literals.
        final Pattern offsetPattern = Pattern.compile("\\w*offset*\\w[ ]\\d+");
        final Pattern partitionPattern = Pattern.compile("\\w*" + TOPIC + "*\\w[-]\\d+");

        KafkaConsumer<String, LoanCreated> consumer =  new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        // Consume messages
        while (true) {
            try {
                final ConsumerRecords<String, LoanCreated> records = consumer.poll(Duration.ofMillis(1));
                for (final ConsumerRecord<String, LoanCreated> record : records) {
                    final String key = record.key();
                    try {
                        /* A record can be successfully de-serialized, but is not coercable into the type we need. In
                        the case of this example, we're looking for LoanCreated records, but we are also producing
                        Payment records. */
                        final LoanCreated value = record.value();
                        System.out.printf("key = %s, value = %s%n", key, value);
                        // do work here
                    } catch (ClassCastException e) {
                        System.out.println("Record is not the specified type ... skipping");
                    }
                }
                consumer.commitSync();
            } catch (SerializationException e) {
                String text = e.getMessage();
                // Parse the error message to get the partition number and offset, in order to `seek` past the poison pill.
                Matcher mPart = partitionPattern.matcher(text);
                Matcher mOff = offsetPattern.matcher(text);

                mPart.find();
                Integer partition = Integer.parseInt(mPart.group().replace(TOPIC + "-", ""));
                mOff.find();
                Long offset = Long.parseLong(mOff.group().replace("offset ", ""));
                // String.format uses %d placeholders; {0}/{1} is MessageFormat syntax.
                System.out.println(String.format(
                        "'Poison pill' found at partition %d, offset %d .. skipping", partition, offset));
                consumer.seek(new TopicPartition(TOPIC, partition), offset + 1);
                // Continue on
            }
        }
    }
}
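
The message-parsing step in the listing above can be pulled into a small helper and exercised without a broker. What follows is only a sketch, not part of the original code: the class name PoisonPillLocator and its Location holder are hypothetical, and it assumes the exception text keeps the "for partition <topic>-<n> at offset <m>" shape quoted earlier in this post. Named groups avoid the string replace calls, and an empty Optional covers messages that do not match.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts topic, partition and offset from the
// SerializationException message text. It relies on the message format
// staying stable, which is an assumption, not a guarantee.
final class PoisonPillLocator {

    static final class Location {
        final String topic;
        final int partition;
        final long offset;

        Location(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // The greedy topic group lets topic names that contain hyphens parse correctly.
    private static final Pattern LOCATION = Pattern.compile(
            "for partition (?<topic>.+)-(?<partition>\\d+) at offset (?<offset>\\d+)");

    static Optional<Location> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = LOCATION.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new Location(
                m.group("topic"),
                Integer.parseInt(m.group("partition")),
                Long.parseLong(m.group("offset"))));
    }

    public static void main(String[] args) {
        String text = "Error deserializing key/value for partition topic-0 at offset 2. "
                + "If needed, please seek past the record to continue consumption.";
        parse(text).ifPresent(loc -> System.out.printf(
                "topic=%s partition=%d offset=%d%n", loc.topic, loc.partition, loc.offset));
    }
}
```

In the consumer loop, a present Location would feed consumer.seek(new TopicPartition(loc.topic, loc.partition), loc.offset + 1); an empty result is a reason to rethrow rather than guess. If your client throws RecordDeserializationException (as the kafka-clients 3.0.0 trace elsewhere on this page suggests newer versions do), it is worth checking whether that exception exposes the partition and offset directly before resorting to text parsing.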

Problem

Message Bus Probe is able to connect to Kafka. However, it’s not able to consume the Kafka messages.

Symptom

The probe debug log displays the following exception:

Error: E-JPR-000-000: [KafkaConsumerRunnable] Exception when kafka consumer is running: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition nfvo_notifications_v1-9 at offset 13. If needed, please seek past the record to continue consumption.

Cause

The Kafka messages are in JSON format, but the deserializer isn’t configured to consume JSON.

Resolving The Problem

Change the default key.deserializer property in the kafkaClient.properties file from:

key.deserializer=org.apache.kafka.common.serialization.LongDeserializer

To:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

In this post, we will see how to fix the Kafka error “org.apache.kafka.common.errors.SerializationException”, which can have several causes.

org.apache.kafka.common.errors.SerializationException:
Caused by: org.apache.kafka.common.errors.SerializationException:

The exception typically shows up in one of the following forms:

  • Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
  • Caused by: org.apache.kafka.common.errors.SerializationException: Can’t deserialize data
  • Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message
  • Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
  • Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
  • Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing json to Avro


To understand the issue: whether we publish a message to Kafka or consume one from it, the system must be able to parse the message against some schema, because messages are serialized before they reach Kafka and deserialized after they leave it. Avro, JSON, Protobuf and delimited strings (e.g., CSV) are commonly used formats.

So both the producer and the consumer must understand the schema and agree on it.

Generic Solution:

  • First things first: the producer and consumer must agree on the serialization format. For example, if the producer sends JSON messages, the consumer must be set up to consume JSON. Overlooking this simple point is a common source of errors.
  • The issue is closely tied to the deserializer in use (custom or built-in) and its limits. If the key or value in the message is a String (or any custom structure) and you parse it with, say, IntegerDeserializer, it will throw an error: IntegerDeserializer expects the data to be exactly 4 bytes long, so it cannot parse an arbitrary String.


  • Check whether you are consuming messages with a null key or value and trying to deserialize them. The deserializer might not be able to handle null. Verify in your business logic whether such messages can be skipped or redirected (to a log, etc.).
  • Check which deserializers are set for the consumer. They are configured with the properties below; make sure they are the correct ones, e.g.
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Possible deserialization options are –

    • ByteArrayDeserializer,
    • ByteBufferDeserializer,
    • BytesDeserializer,
    • DoubleDeserializer,
    • ExtendedDeserializer.Wrapper,
    • FloatDeserializer,
    • IntegerDeserializer,
    • LongDeserializer,
    • SessionWindowedDeserializer,
    • ShortDeserializer,
    • StringDeserializer,
    • TimeWindowedDeserializer,
    • UUIDDeserializer
  • Depending on the use case, you might also need to write your own custom deserializer if the out-of-the-box options do not meet your needs.
  • If you are using Kafka Streams, you can filter out nulls when you convert a KTable to a KStream.
ktable.toStream().filter((key, value) -> value != null)
  • Choose the serialization format based on:
    • Schema: the schema is the lens through which your data is read, so you need to ensure it stays intact.
    • Message size: JSON is text, so its size depends on the compression offered by Kafka itself, whereas Avro and Protobuf are binary and hence smaller.
    • Language: if the consumer is written in Java, Avro is a popular choice, while non-Java languages might have to rely on formats like Protobuf.


  • If you are using Kafka Connect, check the converter settings in the config and verify what types of values the properties below expect. Depending on the ingested data, you might have to change them.
value.converter 
key.converter

Apart from the generic solution, check the specific pointers below.

SerializationException: Size of data received by IntegerDeserializer is not 4

  • You are probably using IntegerDeserializer, which expects the data to be exactly 4 bytes long, while the messages you are trying to parse have a different length (they might be strings, for example). Set the appropriate deserializer.
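
To make the fixed-width expectation concrete, here is a minimal stand-alone sketch using only the JDK. It is a stand-in for the kind of length check a fixed-width deserializer performs, not Kafka's actual class; FixedWidthKeyCheck and readInt are hypothetical names chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a fixed-width integer deserializer.
final class FixedWidthKeyCheck {

    // An int is always exactly 4 bytes on the wire; anything else is rejected.
    static int readInt(byte[] data) {
        if (data.length != Integer.BYTES) {
            throw new IllegalArgumentException(
                    "Size of data received is not 4 (was " + data.length + ")");
        }
        return ByteBuffer.wrap(data).getInt(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        // A key written as a 4-byte integer is read back without trouble.
        byte[] intKey = ByteBuffer.allocate(Integer.BYTES).putInt(42).array();
        System.out.println(readInt(intKey));

        // A key written as the string "key" is 3 bytes of UTF-8 and is rejected.
        byte[] stringKey = "key".getBytes(StandardCharsets.UTF_8);
        try {
            readInt(stringKey);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that a length check alone is a weak guard: a string key that happens to be exactly four bytes long would pass it and be decoded as a meaningless number, which is one more reason to keep producer and consumer types aligned rather than rely on the exception.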


SerializationException: Can’t deserialize data

  • You may not be using the right deserializer for the length or structure of the data or message.
  • You may have a data structure specific to your business use case that the out-of-the-box deserializers cannot parse. In that scenario, once you identify the structure of the message, you would have to write a custom deserializer and supply the respective class and jar when running the Kafka consumer.

SerializationException: Error deserializing Avro message

  • This occurs if the consumer is not able to load the class definition of the type generated from the Avro schema.
  • Check whether the deserializer in use is suitable for Avro messages.
  • You may have a data structure specific to your business use case that the out-of-the-box deserializers cannot parse. In that scenario, once you identify the structure of the message, you would have to write a custom deserializer (extending KafkaAvroDeserializer) and supply the respective class and jar when running the Kafka consumer.


SerializationException: Size of data received by LongDeserializer is not 8

  • You are probably using LongDeserializer, which expects the data to be exactly 8 bytes long, while the messages you are trying to parse have a different length (they might be strings, for example). Set the appropriate deserializer.

SerializationException: Unknown magic byte!

  • This often means that the message in the topic is not in the expected Avro format and hence could not be deserialized.
  • If such invalid messages can be dropped or skipped, you can use the config settings below in Kafka Connect.
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true
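
For context on the "magic byte": messages written through the Confluent Schema Registry serializers are framed with one marker byte (0) followed by a 4-byte big-endian schema ID, and only then the Avro payload. The sketch below is an illustration of that framing under this assumption, useful for inspecting raw bytes while debugging; WireFormatPeek and schemaIdOrMinusOne are hypothetical names, and this is not the deserializer's own code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical debugging helper that checks whether a raw payload
// looks like it was framed by a Schema Registry aware serializer.
final class WireFormatPeek {

    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID from the 5-byte header, or -1 when the header is absent.
    static int schemaIdOrMinusOne(byte[] payload) {
        if (payload == null || payload.length < 5 || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // Bytes 1..4 hold the schema ID as a big-endian int.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Framed message: magic byte, schema ID 7, then one byte of payload.
        byte[] framed = ByteBuffer.allocate(6)
                .put(MAGIC_BYTE).putInt(7).put((byte) 0x02).array();
        System.out.println(schemaIdOrMinusOne(framed));

        // Plain JSON text starts with '{', not the magic byte.
        byte[] json = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(schemaIdOrMinusOne(json));
    }
}
```

A result of -1 for records on a topic is a strong hint that they were produced by a plain serializer (JSON, String, raw Avro) rather than the Schema Registry one, which is the usual story behind this error.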

SerializationException: Error Deserializing Json to Avro

  • Check whether you are consuming messages with a null key or value and trying to deserialize them. The configured deserializer might not be able to convert from JSON to Avro if the data is null.
  • If possible, drop or skip such invalid messages from the topic, or use a custom deserializer that handles null.
  • If you are using Kafka Connect, use the configs below to skip invalid null data:
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true

Hope this helps.


Issue

Producer properties

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Consumer properties

spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085

Consumer Service

@Service
public class UserConsumerService {

    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}

Producer Service

@Service
public class UserProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}

Producer Config for creating topic

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topicOrder() {
        return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
    }
}

The producer works well, but the consumer gives an error like this:

2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1760) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1283) ~[spring-kafka-2.8.0.jar:2.8.0]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition user-topic-0 at offset 1. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1507) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1497) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1325) ~[spring-kafka-2.8.0.jar:2.8.0]

I would be glad for any help, since I am new to Kafka and trying to figure out why I am getting this error.

Solution

Does the error message not tell you anything?

This error handler cannot process ‘SerializationException’s directly; please consider configuring an ‘ErrorHandlingDeserializer’ in the value and/or key deserializer

See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.
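
The delegate-and-capture idea described above can be illustrated without Spring or Kafka on the classpath. The sketch below is not Spring's implementation: DelegatingDecoder and Result are hypothetical names, and where the real ErrorHandlingDeserializer carries the failure in a record header, this version simply returns it next to the (null) value together with the raw bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical illustration of the pattern: wrap a real decoder and turn
// its exceptions into data, so one bad record cannot break the read loop.
final class DelegatingDecoder<T> {

    static final class Result<T> {
        final T value;                // null when decoding failed
        final RuntimeException cause; // null when decoding succeeded
        final byte[] rawBytes;        // kept so the failure can be logged or dead-lettered

        Result(T value, RuntimeException cause, byte[] rawBytes) {
            this.value = value;
            this.cause = cause;
            this.rawBytes = rawBytes;
        }

        boolean failed() {
            return cause != null;
        }
    }

    private final Function<byte[], T> delegate;

    DelegatingDecoder(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Result<T> decode(byte[] data) {
        try {
            return new Result<>(delegate.apply(data), null, data);
        } catch (RuntimeException e) {
            // Capture the cause instead of letting it escape to the caller.
            return new Result<>(null, e, data);
        }
    }

    public static void main(String[] args) {
        DelegatingDecoder<Integer> decoder = new DelegatingDecoder<>(
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));

        for (String input : new String[] {"12", "oops"}) {
            Result<Integer> r = decoder.decode(input.getBytes(StandardCharsets.UTF_8));
            System.out.println(r.failed()
                    ? "failed: " + r.cause.getClass().getSimpleName()
                    : "value: " + r.value);
        }
    }
}
```

The point of the design is that the caller always gets something back for every record, so offsets keep advancing and the decision about what to do with the bad record (log, skip, dead-letter) moves to an error handler instead of being made inside poll().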

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer) to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);

With Boot:

...
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
...

Answered By — Gary Russell
Answer Checked By — Clifford M. (JavaFixing Volunteer)

I’ve got an integration test that passes when I send to a Kafka topic without a key. However, when I add a key, I start to get serialization errors.

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-1 at offset 0

Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

This is my sender class:

public class Sender {
    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {

        String topic = "topic";
        String data = "data";
        String key = "key";

        LOG.info("sending to topic: '{}', key: '{}', data: '{}'", topic, key, data);

        // does not work
        kafkaTemplate.send(topic, key, data);

        // works
        // kafkaTemplate.send(topic, data);

    }
}

This is my configuration, where I specify a StringSerializer for the key

@Configuration
@EnableKafka
public class Config {

    @Bean
    public Sender sender() {
        return new Sender();
    }

    @Bean
    public Properties properties() {
        return new Properties();
    }

    @Bean
    public Map<String, Object> producerConfigs(Properties properties) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory(Properties properties) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(properties));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(Properties properties) {
        return new KafkaTemplate<>(producerFactory(properties));
    }

}

The problem may be related to the message listener in my test, but that is also using strings across the board:

@RunWith(SpringRunner.class)
@SpringBootTest()
@DirtiesContext
public class SenderIT {


    public static final Logger LOG = LoggerFactory.getLogger(SenderIT.class);

    private static String SENDER_TOPIC = "topic";

    @Autowired
    private Sender sender;

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    @Before
    public void setUp() throws Exception {

        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties =
            KafkaTestUtils.consumerProps("sender", "false", embeddedKafka);

        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<String, String>(consumerProperties);

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // create a thread safe queue to store the received message
        records = new LinkedBlockingQueue<>();

        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            LOG.debug("test-listener received message='{}'", record.toString());
            records.add(record);
        }
        });

        // start the container and underlying message listener
        container.start();

        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
    }

    @After
    public void tearDown() {
        // stop the container
        container.stop();
    }

    @Test
    public void test() throws InterruptedException {

        sender.send();

        // check that the message was received in Kafka
        ConsumerRecord<String, String> kafkaTopicMsg = records.poll(10, TimeUnit.SECONDS);

        LOG.debug("kafka received = {}", kafkaTopicMsg);

        assertThat(kafkaTopicMsg).isNotNull();

    }

}

As always, any help would be appreciated.

All the code to reproduce is available at https://github.com/LewisWatson/kafka-embedded-test/tree/8322621ad4e302d982e5ecd28af9fd314696d850

Full stack trace is available at https://travis-ci.org/LewisWatson/kafka-embedded-test/builds/273227986
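The message "Size of data received by IntegerDeserializer is not 4" suggests that the consumer in the test decodes keys as 4-byte integers, while the sender writes the 3-byte string "key". The sketch below reproduces that size check with plain Java. `IntegerKeyMismatchSketch` and `decodeInt` are hypothetical names that only model the check; they are not Kafka's own class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Models the size check behind the error message; NOT Kafka's IntegerDeserializer.
public class IntegerKeyMismatchSketch {

    /** Decodes a 4-byte big-endian integer and rejects any other length. */
    static int decodeInt(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException(
                    "Size of data received is not 4 (was " + data.length + ")");
        }
        return ByteBuffer.wrap(data).getInt();
    }

    /** Returns true when the bytes pass the 4-byte check. */
    static boolean decodesAsInt(byte[] data) {
        try {
            decodeInt(data);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] stringKey = "key".getBytes(StandardCharsets.UTF_8);  // 3 bytes, as sent by Sender
        byte[] intKey = ByteBuffer.allocate(4).putInt(42).array();  // 4 bytes

        System.out.println(decodesAsInt(stringKey)); // false
        System.out.println(decodesAsInt(intKey));    // true
        System.out.println(decodeInt(intKey));       // 42
    }
}
```

If this is what is happening, it would also explain why sending without a key works: a null key has no bytes to size-check. One thing worth checking (an assumption, not confirmed by the post) is whether the map returned by `KafkaTestUtils.consumerProps` already contains an integer key deserializer. Printing `consumerProperties` in `setUp()` would show it, and setting `ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG` to `StringDeserializer.class` explicitly would match the producer's `StringSerializer`.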
