package io.siddhi.extension.io.kafka.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.stream.input.source.SourceSyncCallback;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.kafka.KafkaIOUtils;
import io.siddhi.extension.io.kafka.sink.KafkaSink;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.log4j.Logger;

@Extension(name = "kafka", namespace = "source", description = "A Kafka source receives events to be processed by WSO2 SP from a topic with a partition for a Kafka cluster. The events received can be in the `TEXT` `XML` `JSON` or `Binary` format.\nIf the topic is not already created in the Kafka cluster, the Kafka sink creates the default partition for the given topic.", parameters = {@Parameter(name = KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, description = "This specifies the list of Kafka servers to which the Kafka source must listen. This list can be provided as a set of comma-separated values.\ne.g., `localhost:9092,localhost:9093`", type = {DataType.STRING}), @Parameter(name = KafkaSource.ADAPTOR_SUBSCRIBER_TOPIC, description = "This specifies the list of topics to which the source must listen. This list can be provided as a set of comma-separated values.\ne.g., `topic_one,topic_two`", type = {DataType.STRING}), @Parameter(name = KafkaSource.ADAPTOR_SUBSCRIBER_GROUP_ID, description = "This is an ID to identify the Kafka source group. The group ID ensures that sources with the same topic and partition that are in the same group do not receive the same event.", type = {DataType.STRING}), @Parameter(name = KafkaSource.THREADING_OPTION, description = " This specifies whether the Kafka source is to be run on a single thread, or in multiple threads based on a condition. Possible values are as follows:\n`single.thread`: To run the Kafka source on a single thread.\n`topic.wise`: To use a separate thread per topic.\n`partition.wise`: To use a separate thread per partition.", type = {DataType.STRING}), @Parameter(name = KafkaSource.ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST, description = "The partition number list for the given topic. This is provided as a list of comma-separated values. e.g., `0,1,2,`.", type = {DataType.STRING}, optional = true, defaultValue = "0"), @Parameter(name = KafkaSource.SEQ_ENABLED, description = "If this parameter is set to `true`, the sequence of the events received via the source is taken into account. Therefore, each event should contain a sequence number as an attribute value to indicate the sequence.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = KafkaSource.IS_BINARY_MESSAGE, description = "In order to receive binary events via the Kafka source,it is required to setthis parameter to 'True'.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = KafkaSource.TOPIC_OFFSET_MAP, description = "This parameter specifies reading offsets for each topic and partition. The value for this parameter is specified in the following format: \n `<topic>=<offset>,<topic>=<offset>,`\n  When an offset is defined for a topic, the Kafka source skips reading the message with the number specified as the offset as well as all the messages sent previous to that message. If the offset is not defined for a specific topic it reads messages from the beginning. \ne.g., `stocks=100,trades=50` reads from the 101th message of the `stocks` topic, and from the 51st message of the `trades` topic.", type = {DataType.STRING}, optional = true, defaultValue = "null"), @Parameter(name = KafkaSource.ADAPTOR_ENABLE_OFFSET_COMMIT, description = "This parameter specifies whether to commit offsets. \nIf the manual asynchronous offset committing is needed, `enable.offsets.commit` should be `true` and `enable.auto.commit` should be `false`. \nIf periodical committing is needed `enable.offsets.commit` should be `true` and `enable.auto.commit` should be `true`. \nIf committing is not needed, `enable.offsets.commit` should be `false`. \n\nNote: `enable.auto.commit` is an `optional.configuration` property. If it is set to `true`, Source will periodically(default: 1000ms. Configurable with `auto.commit.interval.ms` property as an `optional.configuration`) commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka. \nTo guarantee at-least-once processing, we recommend you to enable Siddhi Periodic State Persistence when `enable.auto.commit` property is set to `true`. \nDuring manual committing, it might introduce a latency during consumption.", type = {DataType.BOOL}, optional = true, defaultValue = "true"), @Parameter(name = KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, description = "This parameter contains all the other possible configurations that the consumer is created with. \ne.g., `ssl.keystore.type:JKS,batch.size:200`.", type = {DataType.STRING}, optional = true, defaultValue = "null")}, examples = {@Example(syntax = "@App:name('TestExecutionPlan') \ndefine stream BarStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@source(\ntype='kafka', \ntopic.list='kafka_topic,kafka_topic2', \ngroup.id='test', \nthreading.option='partition.wise', \nbootstrap.servers='localhost:9092', \npartition.no.list='0,1', \n@map(type='xml'))\nDefine stream FooStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description = "This kafka source configuration listens to the `kafka_topic` and `kafka_topic2` topics with `0` and `1` partitions. A thread is created for each topic and partition combination. The events are received in the XML format, mapped to a Siddhi event, and sent to a stream named `FooStream`. "), @Example(syntax = "@App:name('TestExecutionPlan') \ndefine stream BarStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@source(\ntype='kafka', \ntopic.list='kafka_topic',\ngroup.id='test', \nthreading.option='single.thread',\nbootstrap.servers='localhost:9092',\n@map(type='xml'))\nDefine stream FooStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description = "This Kafka source configuration listens to the `kafka_topic` topic for the default partition because no `partition.no.list` is defined. Only one thread is created for the topic. The events are received in the XML format, mapped to a Siddhi event, and sent to a stream named `FooStream`.")})
/* loaded from: input_file:io/siddhi/extension/io/kafka/source/KafkaSource.class */
public class KafkaSource extends Source<KafkaSourceState> implements SourceSyncCallback {
    public static final String SINGLE_THREADED = "single.thread";
    static final String TOPIC_WISE = "topic.wise";
    static final String PARTITION_WISE = "partition.wise";
    public static final String ADAPTOR_SUBSCRIBER_TOPIC = "topic.list";
    public static final String ADAPTOR_SUBSCRIBER_GROUP_ID = "group.id";
    public static final String ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS = "bootstrap.servers";
    public static final String ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST = "partition.no.list";
    public static final String ADAPTOR_ENABLE_AUTO_COMMIT = "enable.auto.commit";
    public static final String ADAPTOR_ENABLE_OFFSET_COMMIT = "enable.offsets.commit";
    public static final String ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES = "optional.configuration";
    private static final String TOPIC_OFFSET_MAP = "topic.offsets.map";
    public static final String THREADING_OPTION = "threading.option";
    public static final String SEQ_ENABLED = "seq.enabled";
    private static final String LAST_RECEIVED_SEQ_NO_KEY = "lastReceivedSeqNo";
    public static final String IS_BINARY_MESSAGE = "is.binary.message";
    private static final Logger LOG = Logger.getLogger(KafkaSource.class);
    private static final String TOPIC = "topic";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offSet";
    private OptionHolder optionHolder;
    private ConsumerKafkaGroup consumerKafkaGroup;
    private String bootstrapServers;
    private String groupID;
    private String[] partitions;
    private String[] topics;
    private String optionalConfigs;
    private boolean seqEnabled = false;
    private boolean isBinaryMessage;
    private boolean enableOffsetCommit;
    private String topicOffsetMapConfig;
    private SiddhiAppContext siddhiAppContext;
    private KafkaSourceState kafkaSourceState;
    private String threadingOption;
    private SourceEventListener sourceEventListener;

    /* loaded from: input_file:io/siddhi/extension/io/kafka/source/KafkaSource$KafkaSourceState.class */
    public class KafkaSourceState extends State {
        private Map<String, Map<SequenceKey, Integer>> consumerLastReceivedSeqNoMap;
        private Map<String, Map<Integer, Long>> topicOffsetMap = new HashMap();
        private boolean isRestored = false;

        public Map<String, Map<Integer, Long>> getTopicOffsetMap() {
            return this.topicOffsetMap;
        }

        public Map<String, Map<SequenceKey, Integer>> getConsumerLastReceivedSeqNoMap() {
            return this.consumerLastReceivedSeqNoMap;
        }

        public KafkaSourceState(boolean z) {
            this.consumerLastReceivedSeqNoMap = null;
            if (z) {
                this.consumerLastReceivedSeqNoMap = new HashMap();
            }
        }

        public Map<String, Object> snapshot() {
            HashMap hashMap = new HashMap();
            hashMap.put(KafkaSource.TOPIC_OFFSET_MAP, this.topicOffsetMap);
            if (KafkaSource.this.seqEnabled) {
                hashMap.put(KafkaSource.LAST_RECEIVED_SEQ_NO_KEY, this.consumerLastReceivedSeqNoMap);
            }
            return hashMap;
        }

        public void restore(Map<String, Object> map) {
            this.isRestored = true;
            this.topicOffsetMap = (Map) map.get(KafkaSource.TOPIC_OFFSET_MAP);
            if (KafkaSource.this.consumerKafkaGroup != null) {
                KafkaSource.this.consumerKafkaGroup.restoreState();
            }
            if (KafkaSource.this.seqEnabled) {
                this.consumerLastReceivedSeqNoMap = (Map) map.get(KafkaSource.LAST_RECEIVED_SEQ_NO_KEY);
            }
        }

        public boolean canDestroy() {
            return false;
        }
    }

    public StateFactory<KafkaSourceState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.optionHolder = optionHolder;
        this.sourceEventListener = sourceEventListener;
        this.bootstrapServers = optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS);
        this.groupID = optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_GROUP_ID);
        this.threadingOption = optionHolder.validateAndGetStaticValue(THREADING_OPTION);
        String validateAndGetStaticValue = optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST, (String) null);
        this.partitions = validateAndGetStaticValue != null ? validateAndGetStaticValue.split(KafkaIOUtils.HEADER_SEPARATOR) : null;
        this.topics = optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC).split(KafkaIOUtils.HEADER_SEPARATOR);
        this.seqEnabled = optionHolder.validateAndGetStaticValue(SEQ_ENABLED, "false").equalsIgnoreCase("true");
        this.optionalConfigs = optionHolder.validateAndGetStaticValue(ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null);
        this.isBinaryMessage = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(IS_BINARY_MESSAGE, "false"));
        this.enableOffsetCommit = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(ADAPTOR_ENABLE_OFFSET_COMMIT, "true"));
        this.topicOffsetMapConfig = optionHolder.validateAndGetStaticValue(TOPIC_OFFSET_MAP, (String) null);
        if (PARTITION_WISE.equals(this.threadingOption) && null == this.partitions) {
            throw new SiddhiAppValidationException("Threading option is selected as 'partition.wise' but there are no partitions given");
        }
        return () -> {
            return new KafkaSourceState(this.seqEnabled);
        };
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, ByteBuffer.class};
    }

    public void connect(Source<KafkaSourceState>.ConnectionCallback connectionCallback, KafkaSourceState kafkaSourceState) throws ConnectionUnavailableException {
        try {
            this.consumerKafkaGroup = new ConsumerKafkaGroup(this.topics, this.partitions, createConsumerConfig(this.bootstrapServers, this.groupID, this.optionalConfigs, this.isBinaryMessage, this.enableOffsetCommit), this.threadingOption, this.siddhiAppContext.getExecutorService(), this.isBinaryMessage, this.enableOffsetCommit, this.sourceEventListener);
            checkTopicsAvailableInCluster();
            checkPartitionsAvailableForTheTopicsInCluster();
            this.kafkaSourceState = kafkaSourceState;
            if (kafkaSourceState.isRestored || this.topicOffsetMapConfig == null) {
                this.consumerKafkaGroup.setKafkaSourceState(kafkaSourceState);
            } else {
                synchronized (kafkaSourceState) {
                    kafkaSourceState.topicOffsetMap = readTopicOffsetsConfig(this.topicOffsetMapConfig);
                }
                this.consumerKafkaGroup.setKafkaSourceState(kafkaSourceState);
                this.consumerKafkaGroup.restoreState();
            }
            this.consumerKafkaGroup.run();
        } catch (SiddhiAppRuntimeException e) {
            throw e;
        } catch (Throwable th) {
            throw new ConnectionUnavailableException("Error when initiating connection with Kafka server: " + this.bootstrapServers + " in Siddhi App: " + this.siddhiAppContext.getName(), th);
        }
    }

    public void disconnect() {
        this.kafkaSourceState = null;
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.setKafkaSourceState(null);
            this.consumerKafkaGroup.shutdown();
            LOG.info("Kafka Adapter disconnected for topic(s): " + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC));
        }
    }

    public void destroy() {
        this.consumerKafkaGroup = null;
    }

    public void pause() {
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.pause();
            LOG.info("Kafka Adapter paused for topic(s): " + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC));
        }
    }

    public void resume() {
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.resume();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Kafka Adapter resumed for topic(s): " + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC));
            }
        }
    }

    public void update(String[] strArr) {
        for (String str : strArr) {
            String str2 = "";
            Integer num = 0;
            for (String str3 : str.split(KafkaIOUtils.HEADER_SEPARATOR)) {
                String[] split = str3.split(KafkaSink.SEQ_NO_HEADER_FIELD_SEPERATOR);
                if (split[0].equals(TOPIC)) {
                    str2 = split[1];
                    this.kafkaSourceState.topicOffsetMap.computeIfAbsent(split[1], str4 -> {
                        return new HashMap();
                    });
                } else if (split[0].equals(PARTITION)) {
                    Map map = (Map) this.kafkaSourceState.topicOffsetMap.get(str2);
                    if (null == map.get(Integer.valueOf(split[1]))) {
                        num = Integer.valueOf(split[1]);
                        map.put(num, 0L);
                    }
                } else if (split[0].equals(OFFSET)) {
                    Map map2 = (Map) this.kafkaSourceState.topicOffsetMap.get(str2);
                    long longValue = ((Long) map2.get(num)).longValue();
                    Long valueOf = Long.valueOf(split[1]);
                    if (valueOf.longValue() > longValue) {
                        map2.put(num, valueOf);
                    }
                }
            }
        }
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    private Map<String, Map<Integer, Long>> readTopicOffsetsConfig(String str) {
        HashMap hashMap = new HashMap();
        for (String str2 : str.split(KafkaIOUtils.HEADER_SEPARATOR)) {
            String[] split = str2.split("=");
            if (split.length != 2) {
                LOG.error("Topic offset should be given in <topic>=<offset>,.. format. ");
                return null;
            }
            if (!Arrays.stream(this.topics).anyMatch(str3 -> {
                return str3.equals(split[0]);
            })) {
                LOG.error("Topic " + split[0] + " not listed in topic.list config");
                return null;
            }
            HashMap hashMap2 = new HashMap();
            Arrays.stream(this.partitions).forEach(str4 -> {
                hashMap2.put(Integer.valueOf(Integer.parseInt(str4)), Long.valueOf(Long.parseLong(split[1])));
            });
            hashMap.put(split[0], hashMap2);
        }
        return hashMap;
    }

    private void checkTopicsAvailableInCluster() {
        Properties createConsumerConfig = createConsumerConfig(this.bootstrapServers, this.groupID, this.optionalConfigs, this.isBinaryMessage, this.enableOffsetCommit);
        createConsumerConfig.put(ADAPTOR_SUBSCRIBER_GROUP_ID, "test-consumer-group");
        Map listTopics = new KafkaConsumer(createConsumerConfig).listTopics();
        boolean z = true;
        StringBuilder sb = new StringBuilder("");
        for (String str : this.topics) {
            boolean z2 = false;
            Iterator it = listTopics.entrySet().iterator();
            while (it.hasNext()) {
                if (((String) ((Map.Entry) it.next()).getKey()).equals(str)) {
                    z2 = true;
                }
            }
            if (!z2) {
                z = false;
                if ("".equals(sb.toString())) {
                    sb.append(str);
                } else {
                    sb.append(',').append(str);
                }
                LOG.warn("Topic, " + str + " is not available.");
            }
        }
        if (null != this.partitions && ((this.partitions.length != 1 || !this.partitions[0].equals("0")) && !z)) {
            LOG.error("Topic(s) " + ((Object) sb) + " aren't available. Topics won't be created since there are partition numbers defined in the query.");
            throw new SiddhiAppRuntimeException("Topic(s) " + ((Object) sb) + " aren't available. Topics won't be created since there are partition numbers defined in the query.");
        }
        if (z) {
            return;
        }
        if (!this.siddhiAppContext.isTransportChannelCreationEnabled()) {
            throw new SiddhiAppRuntimeException("Topic(s) " + ((Object) sb) + " creation failed. User has disabled topic creation by setting transportChannelCreationEnabled property to false. Hence Siddhi App deployment will be aborted.");
        }
        LOG.warn("Topic(s) " + ((Object) sb) + " aren't available. These Topics will be created with the default partition.");
    }

    private void checkPartitionsAvailableForTheTopicsInCluster() {
        KafkaProducer kafkaProducer = new KafkaProducer(createProducerConfig(this.bootstrapServers, this.optionalConfigs, this.isBinaryMessage));
        boolean z = true;
        StringBuilder sb = new StringBuilder("");
        for (String str : this.topics) {
            List partitionsFor = kafkaProducer.partitionsFor(str);
            if (null != this.partitions && (this.partitions.length != 1 || !this.partitions[0].equals("0"))) {
                for (String str2 : this.partitions) {
                    boolean z2 = false;
                    Iterator it = partitionsFor.iterator();
                    while (it.hasNext()) {
                        if (Integer.parseInt(str2) == ((PartitionInfo) it.next()).partition()) {
                            z2 = true;
                        }
                    }
                    if (!z2) {
                        z = false;
                        if ("".equals(sb.toString())) {
                            sb.append(str2);
                        } else {
                            sb.append(',').append(str2);
                        }
                        LOG.error("Partition number, " + str2 + " in 'partition.id' is not available in topic partitions");
                    }
                }
                if (!z) {
                    throw new SiddhiAppRuntimeException("Partition number(s) " + ((Object) sb) + " aren't available for the topic: " + str);
                }
            }
        }
    }

    private static Properties createConsumerConfig(String str, String str2, String str3, boolean z, boolean z2) {
        Properties properties = new Properties();
        properties.put(ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, str);
        properties.put(ADAPTOR_SUBSCRIBER_GROUP_ID, str2);
        properties.put("session.timeout.ms", "30000");
        if (!z2) {
            properties.put(ADAPTOR_ENABLE_AUTO_COMMIT, "false");
        }
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        if (z) {
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        } else {
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }
        KafkaIOUtils.splitHeaderValues(str3, properties);
        return properties;
    }

    private static Properties createProducerConfig(String str, String str2, boolean z) {
        Properties properties = new Properties();
        properties.put(ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, str);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaIOUtils.splitHeaderValues(str2, properties);
        if (z) {
            properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        } else {
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }
        return properties;
    }

    public /* bridge */ /* synthetic */ void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        connect((Source<KafkaSourceState>.ConnectionCallback) connectionCallback, (KafkaSourceState) state);
    }
}
