package io.siddhi.extension.io.kafka.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.kafka.Constants;
import io.siddhi.extension.io.kafka.source.KafkaSource;
import io.siddhi.extension.io.kafka.util.KafkaReplayResponseSourceRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

@Extension(name = "kafka-replay-response", namespace = "source", description = "This source is used to listen to replayed events requested from kafka-replay-request sink", parameters = {@Parameter(name = KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, description = "This specifies the list of Kafka servers to which the Kafka source must listen. This list can be provided as a set of comma-separated values.\ne.g., `localhost:9092,localhost:9093`", type = {DataType.STRING}), @Parameter(name = KafkaSource.ADAPTOR_SUBSCRIBER_GROUP_ID, description = "This is an ID to identify the Kafka source group. The group ID ensures that sources with the same topic and partition that are in the same group do not receive the same event.", type = {DataType.STRING}), @Parameter(name = KafkaSource.THREADING_OPTION, description = " This specifies whether the Kafka source is to be run on a single thread, or in multiple threads based on a condition. Possible values are as follows:\n`single.thread`: To run the Kafka source on a single thread.\n`topic.wise`: To use a separate thread per topic.\n`partition.wise`: To use a separate thread per partition.", type = {DataType.STRING}), @Parameter(name = Constants.SINK_ID, description = "a unique SINK_ID .", type = {DataType.INT})}, examples = {@Example(syntax = "@App:name('TestKafkaReplay')\n\n@sink(type='kafka-replay-request', sink.id='1')\ndefine stream BarStream (topicForReplay string, partitionForReplay string, startOffset string, endOffset string);\n\n@info(name = 'query1')\n@source(type='kafka-replay-response',  group.id='group', threading.option='single.thread', bootstrap.servers='localhost:9092', sink.id='1',\n@map(type='json'))\nDefine stream FooStream (symbol string, amount double);\n\n@sink(type='log')\nDefine stream logStream(symbol string, amount double);\n\nfrom FooStream select * insert into logStream;", description = "In this app we can send replay request events into BarStream and observe the replayed events in the logStream")})
/* loaded from: input_file:io/siddhi/extension/io/kafka/source/KafkaReplayResponseSource.class */
public class KafkaReplayResponseSource extends KafkaSource {
    private String sinkId;
    private List<Future<?>> futureList = new ArrayList();
    private List<KafkaReplayThread> kafkaReplayThreadList = new ArrayList();

    @Override // io.siddhi.extension.io.kafka.source.KafkaSource
    public StateFactory<KafkaSource.KafkaSourceState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.optionHolder = optionHolder;
        this.requiredProperties = (String[]) strArr.clone();
        this.sourceEventListener = sourceEventListener;
        if (configReader != null) {
            this.bootstrapServers = configReader.readConfig(KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS));
            this.groupID = configReader.readConfig(KafkaSource.ADAPTOR_SUBSCRIBER_GROUP_ID, optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_SUBSCRIBER_GROUP_ID));
            this.threadingOption = configReader.readConfig(KafkaSource.THREADING_OPTION, optionHolder.validateAndGetStaticValue(KafkaSource.THREADING_OPTION));
            this.seqEnabled = configReader.readConfig(KafkaSource.SEQ_ENABLED, optionHolder.validateAndGetStaticValue(KafkaSource.SEQ_ENABLED, "false")).equalsIgnoreCase("true");
            this.optionalConfigs = configReader.readConfig(KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null));
            this.isBinaryMessage = Boolean.parseBoolean(configReader.readConfig(KafkaSource.IS_BINARY_MESSAGE, optionHolder.validateAndGetStaticValue(KafkaSource.IS_BINARY_MESSAGE, "false")));
            this.enableOffsetCommit = Boolean.parseBoolean(configReader.readConfig(KafkaSource.ADAPTOR_ENABLE_OFFSET_COMMIT, optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_ENABLE_OFFSET_COMMIT, "true")));
            this.enableAsyncCommit = Boolean.parseBoolean(configReader.readConfig(KafkaSource.ADAPTOR_ENABLE_ASYNC_COMMIT, optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_ENABLE_ASYNC_COMMIT, "true")));
        } else {
            this.bootstrapServers = optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS);
            this.groupID = optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_SUBSCRIBER_GROUP_ID);
            this.threadingOption = optionHolder.validateAndGetStaticValue(KafkaSource.THREADING_OPTION);
            this.optionalConfigs = optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null);
            this.isBinaryMessage = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KafkaSource.IS_BINARY_MESSAGE, "false"));
            this.enableOffsetCommit = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_ENABLE_OFFSET_COMMIT, "true"));
            this.enableAsyncCommit = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_ENABLE_ASYNC_COMMIT, "true"));
        }
        this.seqEnabled = optionHolder.validateAndGetStaticValue(KafkaSource.SEQ_ENABLED, "false").equalsIgnoreCase("true");
        this.optionalConfigs = optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null);
        this.isBinaryMessage = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KafkaSource.IS_BINARY_MESSAGE, "false"));
        this.enableOffsetCommit = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_ENABLE_OFFSET_COMMIT, "true"));
        this.enableAsyncCommit = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_ENABLE_ASYNC_COMMIT, "true"));
        this.sinkId = optionHolder.validateAndGetStaticValue(Constants.SINK_ID);
        KafkaReplayResponseSourceRegistry.getInstance().putKafkaReplayResponseSource(this.sinkId, this);
        return () -> {
            return new KafkaSource.KafkaSourceState(this.seqEnabled);
        };
    }

    @Override // io.siddhi.extension.io.kafka.source.KafkaSource
    public void connect(Source<KafkaSource.KafkaSourceState>.ConnectionCallback connectionCallback, KafkaSource.KafkaSourceState kafkaSourceState) {
    }

    public void onReplayRequest(String str, String str2, String str3, String str4) throws ConnectionUnavailableException {
        try {
            ExecutorService executorService = this.siddhiAppContext.getExecutorService();
            KafkaReplayThread kafkaReplayThread = new KafkaReplayThread(this.sourceEventListener, new String[]{str4}, new String[]{str}, KafkaSource.createConsumerConfig(this.bootstrapServers, this.groupID, this.optionalConfigs, this.isBinaryMessage, this.enableOffsetCommit), false, this.isBinaryMessage, this.enableOffsetCommit, this.enableAsyncCommit, this.requiredProperties, Integer.parseInt(str2), Integer.parseInt(str3), this.futureList.size(), this.sinkId);
            this.kafkaReplayThreadList.add(kafkaReplayThread);
            this.futureList.add(executorService.submit(kafkaReplayThread));
        } catch (SiddhiAppRuntimeException e) {
            throw e;
        } catch (Throwable th) {
            throw new ConnectionUnavailableException("Error when initiating connection with Kafka server: " + this.bootstrapServers + " in Siddhi App: " + this.siddhiAppContext.getName(), th);
        }
    }

    public void onReplayFinish(int i) {
        this.kafkaReplayThreadList.get(i).shutdownConsumer();
        Future<?> future = this.futureList.get(i);
        if (future.isCancelled()) {
            return;
        }
        future.cancel(true);
    }

    @Override // io.siddhi.extension.io.kafka.source.KafkaSource
    public /* bridge */ /* synthetic */ void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        connect((Source<KafkaSource.KafkaSourceState>.ConnectionCallback) connectionCallback, (KafkaSource.KafkaSourceState) state);
    }
}
