package io.siddhi.extension.io.kafka.multidc.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.kafka.source.KafkaSource;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;

@Extension(name = "kafkaMultiDC", namespace = "source", description = "The Kafka Multi-Datacenter(DC) source receives records from the same topic in brokers deployed in two different kafka clusters. It filters out all the duplicate messages and ensuresthat the events are received in the correct order using sequential numbering. It receives events in formats such as `TEXT`, `XML` JSON` and `Binary`.The Kafka Source creates the default partition '0' for a given topic, if the topic has not yet been created in the Kafka cluster.", parameters = {@Parameter(name = KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, description = "This contains the kafka server list which the kafka source listens to. This is given using comma-separated values. eg: 'localhost:9092,localhost:9093' ", type = {DataType.STRING}), @Parameter(name = KafkaMultiDCSource.KAFKA_TOPIC, description = "This is the topic that the source listens to. eg: 'topic_one' ", type = {DataType.STRING}), @Parameter(name = KafkaMultiDCSource.KAFKA_PARTITION_NO, description = "This is the partition number of the given topic.", type = {DataType.INT}, optional = true, defaultValue = "0"), @Parameter(name = KafkaSource.IS_BINARY_MESSAGE, description = "In order to receive the binary events via the Kafka Multi-DC source, the value of this parameter needs to be set to 'True'.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, description = "This contains all the other possible configurations with which the consumer can be created.eg: producer.type:async,batch.size:200", type = {DataType.STRING}, optional = true, defaultValue = "null")}, examples = {@Example(syntax = "@App:name('TestExecutionPlan') \ndefine stream BarStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@source(type='kafkaMultiDC', topic='kafka_topic', bootstrap.servers='host1:9092,host1:9093', partition.no='1', @map(type='xml'))\nDefine 
stream FooStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description = "The following query listens to 'kafka_topic' topic, deployed in the broker host1:9092 and host1:9093, with partition 1. A thread is created for each broker. The receiving xml events are mapped to a siddhi event and sent to the FooStream.")})
/* loaded from: input_file:io/siddhi/extension/io/kafka/multidc/source/KafkaMultiDCSource.class */
public class KafkaMultiDCSource extends Source<KafkaMultiDCSourceState> {
    /** Option key naming the Kafka topic to consume. */
    private static final String KAFKA_TOPIC = "topic";
    /** Option key naming the topic partition to consume; read with a default of "0". */
    private static final String KAFKA_PARTITION_NO = "partition.no";
    private static final Logger LOG = Logger.getLogger(KafkaMultiDCSource.class);
    /** Snapshot key under which the synchronizer's last consumed sequence number is stored. */
    private static final String LAST_RECEIVED_SEQ_NO_KEY = "lastConsumedSeqNo";
    /** Listener handed to {@code init}; also supplies the stream definition for per-server options. */
    private SourceEventListener eventListener;
    /** One delegate {@link KafkaSource} per bootstrap server, keyed by the server string. */
    private final Map<String, KafkaSource> sources = new HashMap<>();
    /** State factory returned by each delegate's {@code init}, keyed by the server string. */
    private final Map<String, StateFactory<KafkaSource.KafkaSourceState>> stateFactories = new HashMap<>();
    /** The entries of the configured server list after splitting; {@code init} requires exactly two. */
    private String[] bootstrapServers;
    /** Shared synchronizer passed to every per-server interceptor. */
    private SourceSynchronizer synchronizer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/io/kafka/multidc/source/KafkaMultiDCSource$KafkaMultiDCSourceState.class */
    /**
     * Composite state of the multi-DC source: the state of each delegate
     * {@link KafkaSource} plus the synchronizer's last consumed sequence number.
     */
    public class KafkaMultiDCSourceState extends State {
        /** Delegate source states, keyed by the bootstrap server they belong to. */
        Map<String, KafkaSource.KafkaSourceState> kafkaSourceStateMap;

        /**
         * @param map delegate source states keyed by bootstrap server; stored by reference
         */
        KafkaMultiDCSourceState(Map<String, KafkaSource.KafkaSourceState> map) {
            this.kafkaSourceStateMap = map;
        }

        /**
         * Captures the snapshot of every delegate state under its server key and adds
         * the synchronizer's last consumed sequence number under
         * {@code LAST_RECEIVED_SEQ_NO_KEY}.
         *
         * @return a new map holding the combined snapshot
         */
        public Map<String, Object> snapshot() {
            Map<String, Object> combined = new HashMap<>();
            for (Map.Entry<String, KafkaSource.KafkaSourceState> entry : this.kafkaSourceStateMap.entrySet()) {
                combined.put(entry.getKey(), entry.getValue().snapshot());
            }
            combined.put(KafkaMultiDCSource.LAST_RECEIVED_SEQ_NO_KEY, KafkaMultiDCSource.this.synchronizer.getLastConsumedSeqNo());
            return combined;
        }

        /**
         * Restores the synchronizer's sequence number and each delegate state from a
         * map previously produced by {@link #snapshot()}.
         *
         * @param map the combined snapshot
         */
        @SuppressWarnings("unchecked")
        public void restore(Map<String, Object> map) {
            KafkaMultiDCSource.this.synchronizer.setLastConsumedSeqNo(((Long) map.get(KafkaMultiDCSource.LAST_RECEIVED_SEQ_NO_KEY)).longValue());
            for (Map.Entry<String, KafkaSource.KafkaSourceState> entry : this.kafkaSourceStateMap.entrySet()) {
                // Each delegate snapshot was stored under its server key by snapshot().
                entry.getValue().restore((Map<String, Object>) map.get(entry.getKey()));
            }
            // The delegate states are restored in place, so kafkaSourceStateMap is deliberately
            // left untouched: snapshot() never writes a "SOURCE_STATES" entry, and replacing the
            // map from that key would null it out and break later snapshot()/connect() calls.
        }

        /**
         * @return always {@code false}
         */
        public boolean canDestroy() {
            return false;
        }
    }

    /* loaded from: input_file:io/siddhi/extension/io/kafka/multidc/source/KafkaMultiDCSource$KafkaMultiDCSourceStateFactory.class */
    /**
     * Builds {@link KafkaMultiDCSourceState} instances by asking every delegate
     * state factory for a fresh state.
     */
    class KafkaMultiDCSourceStateFactory implements StateFactory<KafkaMultiDCSourceState> {
        /** Delegate state factories keyed by bootstrap server; held by reference. */
        private final Map<String, StateFactory<KafkaSource.KafkaSourceState>> stateFactories;

        /**
         * @param map delegate state factories keyed by bootstrap server
         */
        public KafkaMultiDCSourceStateFactory(Map<String, StateFactory<KafkaSource.KafkaSourceState>> map) {
            this.stateFactories = map;
        }

        /**
         * Creates one new delegate state per registered factory and wraps them in a
         * composite state, keeping the same server keys.
         *
         * @return the new composite state
         */
        public KafkaMultiDCSourceState createNewState() {
            Map<String, KafkaSource.KafkaSourceState> states = new HashMap<>();
            for (Map.Entry<String, StateFactory<KafkaSource.KafkaSourceState>> entry : this.stateFactories.entrySet()) {
                states.put(entry.getKey(), entry.getValue().createNewState());
            }
            return new KafkaMultiDCSourceState(states);
        }

        /* renamed from: createNewState, reason: merged with bridge method [inline-methods] */
        /**
         * Decompiler-assigned alias of {@link #createNewState()}, kept so existing
         * references to this name continue to resolve.
         *
         * @return the new composite state
         */
        public KafkaMultiDCSourceState m3createNewState() {
            return createNewState();
        }
    }

    /**
     * Reads the configured server list, requires exactly two entries, and creates and
     * initialises one delegate {@link KafkaSource} per entry, each fronted by an
     * {@link Interceptor} that shares a single {@link SourceSynchronizer}.
     *
     * @param sourceEventListener listener passed to the synchronizer and kept for later use
     * @param optionHolder        options of this source
     * @param strArr              forwarded unchanged to each delegate's {@code init}
     * @param configReader        forwarded unchanged to each delegate's {@code init}
     * @param siddhiAppContext    forwarded unchanged to each delegate's {@code init}
     * @return a factory producing the composite state of both delegates
     * @throws SiddhiAppValidationException if the server list does not split into exactly two entries
     */
    public StateFactory<KafkaMultiDCSourceState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.eventListener = sourceEventListener;
        String serverList = optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS);
        String binaryFlag = optionHolder.validateAndGetStaticValue(KafkaSource.IS_BINARY_MESSAGE, "false");
        boolean binaryMessage = Boolean.parseBoolean(binaryFlag);
        this.bootstrapServers = serverList.split(KafkaSource.HEADER_SEPARATOR);
        if (this.bootstrapServers.length != 2) {
            throw new SiddhiAppValidationException("There should be two servers listed in 'bootstrap.servers' configuration to ensure fault tolerant kafka messaging.");
        }
        this.synchronizer = new SourceSynchronizer(sourceEventListener, this.bootstrapServers, 1000, 10);
        // Identical setup for both servers, in the order they were listed.
        for (String server : this.bootstrapServers) {
            LOG.info("Initializing kafka source for bootstrap server :" + server);
            Interceptor serverInterceptor = new Interceptor(server, this.synchronizer, binaryMessage);
            OptionHolder serverOptions = createOptionHolders(server, optionHolder);
            KafkaSource delegate = new KafkaSource();
            StateFactory<KafkaSource.KafkaSourceState> delegateFactory = delegate.init(serverInterceptor, serverOptions, strArr, configReader, siddhiAppContext);
            this.stateFactories.put(server, delegateFactory);
            this.sources.put(server, delegate);
        }
        return new KafkaMultiDCSourceStateFactory(this.stateFactories);
    }

    /**
     * @return the event payload classes this source declares: {@code String} and {@code byte[]}
     */
    public Class[] getOutputEventClasses() {
        Class[] outputClasses = {String.class, byte[].class};
        return outputClasses;
    }

    /**
     * Connects every delegate {@link KafkaSource}, passing it the callback and its own
     * state looked up by server key. All delegates are attempted even when one fails;
     * failures are collected and reported together afterwards.
     *
     * @param connectionCallback      callback forwarded to each delegate
     * @param kafkaMultiDCSourceState composite state holding the per-server delegate states
     * @throws ConnectionUnavailableException if at least one delegate failed to connect; the
     *         message lists every failure, the cause is the first failure and any further
     *         failures are attached as suppressed exceptions
     */
    public void connect(Source<KafkaMultiDCSourceState>.ConnectionCallback connectionCallback, KafkaMultiDCSourceState kafkaMultiDCSourceState) throws ConnectionUnavailableException {
        StringBuilder errors = new StringBuilder();
        ConnectionUnavailableException firstFailure = null;
        for (Map.Entry<String, KafkaSource> entry : this.sources.entrySet()) {
            String server = entry.getKey();
            try {
                entry.getValue().connect((Source<KafkaSource.KafkaSourceState>.ConnectionCallback) connectionCallback, kafkaMultiDCSourceState.kafkaSourceStateMap.get(server));
                LOG.info("Connect to bootstrap server " + server);
            } catch (ConnectionUnavailableException e) {
                errors.append("Error occurred while connecting to ").append(server).append(":").append(e.getMessage()).append("\n");
                // Keep the original exceptions so their stack traces are not lost.
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure == null) {
            return;
        }
        String message = errors.toString();
        LOG.error("Error while trying to connect boot strap server(s): " + message);
        throw new ConnectionUnavailableException(message, firstFailure);
    }

    /** Disconnects every delegate Kafka source. */
    public void disconnect() {
        for (KafkaSource delegate : this.sources.values()) {
            delegate.disconnect();
        }
    }

    /** Destroys every delegate Kafka source. */
    public void destroy() {
        for (KafkaSource delegate : this.sources.values()) {
            delegate.destroy();
        }
    }

    /** Pauses every delegate Kafka source. */
    public void pause() {
        for (KafkaSource delegate : this.sources.values()) {
            delegate.pause();
        }
    }

    /** Resumes every delegate Kafka source. */
    public void resume() {
        for (KafkaSource delegate : this.sources.values()) {
            delegate.resume();
        }
    }

    /**
     * Builds the option holder for one delegate {@link KafkaSource}: the given server as
     * its only bootstrap server, a freshly generated random group id, single-threaded
     * mode, sequence handling set to "false", and the partition, topic, optional
     * properties and binary flag copied from this source's own options.
     *
     * @param str          the bootstrap server the delegate should connect to
     * @param optionHolder options of the multi-DC source to copy values from
     * @return option holder bound to the listener's stream definition and the
     *         {@code KafkaSource} extension annotation
     */
    private OptionHolder createOptionHolders(String str, OptionHolder optionHolder) {
        Map<String, String> staticOptions = new HashMap<>();
        staticOptions.put(KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, str);
        String groupId = UUID.randomUUID().toString();
        staticOptions.put(KafkaSource.ADAPTOR_SUBSCRIBER_GROUP_ID, groupId);
        staticOptions.put(KafkaSource.THREADING_OPTION, KafkaSource.SINGLE_THREADED);
        staticOptions.put(KafkaSource.SEQ_ENABLED, "false");
        String partitionNo = optionHolder.validateAndGetStaticValue(KAFKA_PARTITION_NO, "0");
        staticOptions.put(KafkaSource.ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST, partitionNo);
        String topic = optionHolder.validateAndGetStaticValue(KAFKA_TOPIC);
        staticOptions.put(KafkaSource.ADAPTOR_SUBSCRIBER_TOPIC, topic);
        String optionalProperties = optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null);
        staticOptions.put(KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, optionalProperties);
        String binaryFlag = optionHolder.validateAndGetStaticValue(KafkaSource.IS_BINARY_MESSAGE, "false");
        staticOptions.put(KafkaSource.IS_BINARY_MESSAGE, binaryFlag);
        Map<String, String> dynamicOptions = new HashMap<>();
        Extension kafkaSourceExtension = KafkaSource.class.getAnnotation(Extension.class);
        return new OptionHolder(this.eventListener.getStreamDefinition(), staticOptions, dynamicOptions, kafkaSourceExtension);
    }

    /**
     * This source provides no service deployment information.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Untyped overload of {@code connect}: casts the callback and the state to the
     * types used by this source and delegates to the typed {@code connect}.
     *
     * NOTE(review): the inline markers show the decompiler identified this as a
     * compiler-generated bridge method; presumably it should not exist in
     * hand-maintained source - confirm before keeping it.
     *
     * @throws ConnectionUnavailableException propagated from the typed {@code connect}
     */
    public /* bridge */ /* synthetic */ void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        connect((Source<KafkaMultiDCSourceState>.ConnectionCallback) connectionCallback, (KafkaMultiDCSourceState) state);
    }
}
