package io.siddhi.extension.io.kafka.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.kafka.Constants;
import io.siddhi.extension.io.kafka.metrics.SourceMetrics;
import io.siddhi.extension.io.kafka.sink.KafkaSink;
import io.siddhi.extension.io.kafka.source.KafkaSource;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/kafka/source/KafkaConsumerThread.class */
public class KafkaConsumerThread implements Runnable {
    private static final Logger LOG = LogManager.getLogger(KafkaConsumerThread.class);
    final KafkaConsumer<byte[], byte[]> consumer;
    private final String[] partitions;
    private SourceEventListener sourceEventListener;
    private String[] topics;
    private volatile boolean paused;
    private volatile boolean inactive;
    private boolean isPartitionWiseThreading;
    private boolean isBinaryMessage;
    private boolean enableOffsetCommit;
    private boolean enableAutoCommit;
    private boolean enableAsyncCommit;
    private boolean consumerClosed;
    private KafkaSource.KafkaSourceState kafkaSourceState;
    private String[] requiredProperties;
    private int trpLength;
    private SourceMetrics metrics;
    private String groupId;
    private final Lock consumerLock = new ReentrantLock();
    List<TopicPartition> partitionsList = new ArrayList();
    boolean isReplayThread = false;
    private String consumerThreadId = buildId();
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();

    /* loaded from: input_file:io/siddhi/extension/io/kafka/source/KafkaConsumerThread$KafkaOffsetCommitCallback.class */
    private static class KafkaOffsetCommitCallback implements OffsetCommitCallback {
        private KafkaOffsetCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            if (exc == null) {
                if (KafkaConsumerThread.LOG.isDebugEnabled()) {
                    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
                        KafkaConsumerThread.LOG.debug("Asynchronously commit offset done for " + entry.getKey().topic() + " with offset of: " + entry.getValue().offset());
                    }
                    return;
                }
                return;
            }
            if (KafkaConsumerThread.LOG.isDebugEnabled()) {
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry2 : map.entrySet()) {
                    KafkaConsumerThread.LOG.debug("Commit offset exception for " + entry2.getKey().topic() + " with offset of: " + entry2.getValue().offset());
                }
            }
            KafkaConsumerThread.LOG.error("Exception occurred when committing offsets asynchronously.", exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaConsumerThread(SourceEventListener sourceEventListener, String[] strArr, String[] strArr2, Properties properties, boolean z, boolean z2, boolean z3, boolean z4, String[] strArr3, SourceMetrics sourceMetrics) {
        this.isPartitionWiseThreading = false;
        this.isBinaryMessage = false;
        this.enableOffsetCommit = false;
        this.enableAutoCommit = false;
        this.consumer = new KafkaConsumer<>(properties);
        this.sourceEventListener = sourceEventListener;
        this.topics = strArr;
        this.partitions = strArr2;
        this.isPartitionWiseThreading = z;
        this.isBinaryMessage = z2;
        this.enableOffsetCommit = z3;
        this.enableAutoCommit = Boolean.parseBoolean(properties.getProperty(KafkaSource.ADAPTOR_ENABLE_AUTO_COMMIT, "true"));
        this.groupId = properties.getProperty(KafkaSource.ADAPTOR_SUBSCRIBER_GROUP_ID);
        this.enableAsyncCommit = z4;
        this.metrics = sourceMetrics;
        this.requiredProperties = strArr3;
        if (strArr3 == null || strArr3.length <= 0) {
            this.trpLength = 0;
        } else {
            this.trpLength = strArr3.length;
        }
        if (null != strArr2) {
            for (String str : strArr) {
                for (String str2 : strArr2) {
                    TopicPartition topicPartition = new TopicPartition(str, Integer.parseInt(str2));
                    LOG.info("Adding partition " + str2 + " for topic: " + str);
                    this.partitionsList.add(topicPartition);
                }
                LOG.info("Adding partitions " + Arrays.toString(strArr2) + " for topic: " + str);
                this.consumer.assign(this.partitionsList);
            }
        } else {
            this.consumer.subscribe(Arrays.asList(strArr));
        }
        this.consumerClosed = false;
        LOG.info("Subscribed for topics: " + Arrays.toString(strArr));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void pause() {
        this.paused = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void resume() {
        restore();
        this.paused = false;
        try {
            this.lock.lock();
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void restore() {
        Lock lock = this.consumerLock;
        if (this.kafkaSourceState == null || this.kafkaSourceState.getTopicOffsetMap() == null) {
            return;
        }
        for (String str : this.topics) {
            Map<Integer, Long> map = this.kafkaSourceState.getTopicOffsetMap().get(str);
            if (null != map) {
                for (Map.Entry<Integer, Long> entry : map.entrySet()) {
                    TopicPartition topicPartition = new TopicPartition(str, entry.getKey().intValue());
                    if (this.partitionsList.contains(topicPartition)) {
                        LOG.info("Seeking partition: " + topicPartition + " for topic: " + str + " offset: " + (entry.getValue().longValue() + 1));
                        try {
                            lock.lock();
                            this.consumer.seek(topicPartition, entry.getValue().longValue() + 1);
                            lock.unlock();
                        } catch (Throwable th) {
                            lock.unlock();
                            throw th;
                        }
                    }
                }
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v169, types: [byte[]] */
    @Override // java.lang.Runnable
    public void run() {
        String substring;
        Lock lock = this.consumerLock;
        while (!this.inactive) {
            if (this.paused) {
                this.lock.lock();
                try {
                    this.condition.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    this.lock.unlock();
                }
            }
            ConsumerRecords consumerRecords = null;
            try {
                try {
                    try {
                        lock.lock();
                        seekToRequiredOffset();
                        consumerRecords = this.consumer.poll(100L);
                        lock.unlock();
                    } catch (Throwable th) {
                        LOG.error("Consumer poll() failed.", th);
                        lock.unlock();
                    }
                } catch (CommitFailedException e2) {
                    LOG.warn("Consumer poll() failed." + e2.getMessage(), e2);
                    lock.unlock();
                }
                if (null != consumerRecords) {
                    Map<SequenceKey, Integer> map = null;
                    if (!this.isReplayThread && this.kafkaSourceState.getConsumerLastReceivedSeqNoMap() != null) {
                        map = this.kafkaSourceState.getConsumerLastReceivedSeqNoMap().get(this.consumerThreadId);
                    }
                    Iterator it = consumerRecords.iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            break;
                        }
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        String[] strArr = new String[this.trpLength];
                        int partition = consumerRecord.partition();
                        long offset = consumerRecord.offset();
                        String str = consumerRecord.topic();
                        long timestamp = consumerRecord.timestamp();
                        if (this.consumerClosed) {
                            if (!this.isReplayThread) {
                                this.kafkaSourceState.getTopicOffsetMap().get(consumerRecord.topic()).put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()));
                            }
                            if (this.metrics != null) {
                                updateMetrics(str, partition, timestamp);
                            }
                        } else if (isRecordAfterStartOffset(consumerRecord)) {
                            Object value = consumerRecord.value();
                            String str2 = null;
                            long currentTimeMillis = System.currentTimeMillis();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Event received in Kafka Event Adaptor with offSet: " + offset + ", key: " + consumerRecord.key() + ", topic: " + str + ", partition: " + partition + ", recordTimestamp: " + timestamp + ", eventTimestamp: " + currentTimeMillis + ", checksum: " + consumerRecord.checksum());
                            }
                            for (int i = 0; i < this.requiredProperties.length; i++) {
                                if (this.requiredProperties[i].equalsIgnoreCase(Constants.TRP_PARTITION)) {
                                    strArr[i] = String.valueOf(partition);
                                }
                                if (this.requiredProperties[i].equalsIgnoreCase(Constants.TRP_TOPIC)) {
                                    strArr[i] = str;
                                }
                                if (this.requiredProperties[i].equalsIgnoreCase(Constants.TRP_KEY)) {
                                    strArr[i] = String.valueOf(consumerRecord.key());
                                }
                                if (this.requiredProperties[i].equalsIgnoreCase(Constants.TRP_RECORD_TIMESTAMP)) {
                                    strArr[i] = String.valueOf(timestamp);
                                }
                                if (this.requiredProperties[i].equalsIgnoreCase(Constants.TRP_EVENT_TIMESTAMP)) {
                                    strArr[i] = String.valueOf(currentTimeMillis);
                                }
                                if (this.requiredProperties[i].equalsIgnoreCase(Constants.TRP_CHECK_SUM)) {
                                    strArr[i] = String.valueOf(consumerRecord.checksum());
                                }
                                if (this.requiredProperties[i].equalsIgnoreCase(Constants.TRP_OFFSET)) {
                                    strArr[i] = String.valueOf(offset);
                                }
                            }
                            String[] strArr2 = {"topic:" + str + ",partition:" + partition + ",offSet:" + offset};
                            if (map == null) {
                                this.sourceEventListener.onEvent(value, strArr, strArr2);
                            } else {
                                if (this.isBinaryMessage) {
                                    byte[] bArr = (byte[]) value;
                                    int i2 = ByteBuffer.wrap(bArr).getInt();
                                    str2 = new String(bArr, 4, i2 - 1, Charset.defaultCharset());
                                    substring = Arrays.copyOfRange(bArr, i2 + 4, bArr.length);
                                } else {
                                    String obj = value.toString();
                                    int indexOf = obj.indexOf(KafkaSink.SEQ_NO_HEADER_DELIMITER);
                                    substring = obj.substring(indexOf + 1);
                                    if (indexOf > 0) {
                                        str2 = obj.substring(0, indexOf);
                                    }
                                }
                                if (null == str2 || str2.isEmpty()) {
                                    LOG.warn("'Sequenced' option is set to true in Kafka source configuration. But this message does not contain the sequence number in consumer thread :" + this.consumerThreadId + ". Dropping the message");
                                } else {
                                    String[] split = str2.split(":");
                                    String str3 = split[0];
                                    Integer valueOf = Integer.valueOf(Integer.parseInt(split[1]));
                                    SequenceKey sequenceKey = new SequenceKey(str3, partition);
                                    Integer num = map.get(sequenceKey);
                                    if (num == null) {
                                        num = -1;
                                    }
                                    if (num.intValue() < valueOf.intValue()) {
                                        map.put(sequenceKey, valueOf);
                                        this.sourceEventListener.onEvent(substring, strArr, strArr2);
                                        if (LOG.isDebugEnabled()) {
                                            LOG.debug("Last Received SeqNo Updated to:" + valueOf + " for SeqKey:[" + sequenceKey.toString() + "] in Kafka consumer thread:" + this.consumerThreadId);
                                        }
                                    } else if (LOG.isDebugEnabled()) {
                                        LOG.debug("Duplicate Message arrived at Kafka Consumer Thread:" + this.consumerThreadId + ". SeqKey:[" + sequenceKey.toString() + "], Latest SeqNo:" + num + ", this message SeqNo:" + valueOf + ". Ignoring the message.");
                                    }
                                }
                            }
                            if (!this.isReplayThread) {
                                this.kafkaSourceState.getTopicOffsetMap().get(consumerRecord.topic()).put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()));
                            }
                            if (endReplay(consumerRecord)) {
                                this.inactive = true;
                                break;
                            }
                        }
                    }
                    if (!this.isReplayThread && this.enableOffsetCommit && !this.enableAutoCommit) {
                        try {
                            try {
                                lock.lock();
                                if (!consumerRecords.isEmpty()) {
                                    if (this.enableAsyncCommit) {
                                        this.consumer.commitAsync(new KafkaOffsetCommitCallback());
                                    } else {
                                        try {
                                            this.consumer.commitSync();
                                        } catch (KafkaException e3) {
                                            if (this.metrics != null) {
                                                for (String str4 : this.topics) {
                                                    this.metrics.getErrorCountPerStream(str4, this.groupId, "KafkaException").inc();
                                                }
                                            }
                                            LOG.error("Exception occurred when committing offsets Synchronously", e3);
                                        }
                                    }
                                }
                                lock.unlock();
                            } catch (CommitFailedException e4) {
                                if (this.metrics != null) {
                                    for (String str5 : this.topics) {
                                        this.metrics.getErrorCountPerStream(str5, this.groupId, "CommitFailedException").inc();
                                    }
                                }
                                LOG.error("Kafka commit failed for topic kafka_result_topic", e4);
                                lock.unlock();
                            }
                        } catch (Throwable th2) {
                            lock.unlock();
                            throw th2;
                        }
                    }
                }
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e5) {
                    Thread.currentThread().interrupt();
                }
            } catch (Throwable th3) {
                lock.unlock();
                throw th3;
            }
        }
    }

    void seekToRequiredOffset() {
    }

    boolean isRecordAfterStartOffset(ConsumerRecord consumerRecord) {
        return true;
    }

    boolean endReplay(ConsumerRecord consumerRecord) {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdownConsumer() {
        try {
            this.consumerLock.lock();
            this.consumer.close();
            this.consumerClosed = true;
            this.inactive = true;
        } finally {
            this.consumerLock.unlock();
        }
    }

    public String buildId() {
        StringBuilder sb = new StringBuilder();
        int length = this.topics.length - 1;
        for (String str : this.topics) {
            sb.append(str);
            length--;
            if (length >= 0) {
                sb.append(":");
            }
        }
        if (this.partitions != null && this.isPartitionWiseThreading) {
            int length2 = this.partitions.length - 1;
            sb.append("-");
            for (String str2 : this.partitions) {
                sb.append(str2);
                length2--;
                if (length2 >= 0) {
                    sb.append(":");
                }
            }
        }
        return sb.toString();
    }

    public void setKafkaSourceState(KafkaSource.KafkaSourceState kafkaSourceState) {
        this.kafkaSourceState = kafkaSourceState;
        if (kafkaSourceState != null) {
            if (kafkaSourceState.getConsumerLastReceivedSeqNoMap() != null) {
                kafkaSourceState.getConsumerLastReceivedSeqNoMap().putIfAbsent(this.consumerThreadId, new HashMap());
            }
            for (String str : this.topics) {
                kafkaSourceState.getTopicOffsetMap().putIfAbsent(str, new HashMap());
            }
        }
    }

    private void updateMetrics(String str, int i, long j) {
        this.metrics.getTotalReads().inc();
        this.metrics.getCurrentOffset(str, Integer.valueOf(i), this.groupId);
        this.metrics.getReadCountPerStream(str, Integer.valueOf(i), this.groupId).inc();
        this.metrics.getLastMessageConsumedTime(str, this.groupId);
        this.metrics.getConsumerLag(str, this.groupId, i, j);
    }
}
