package io.siddhi.extension.io.kafka.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.kafka.metrics.SourceMetrics;
import io.siddhi.extension.io.kafka.source.KafkaSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.class */
/**
 * Owns the {@link KafkaConsumerThread} instances created for a set of topics and
 * partitions and drives them as a unit: it submits them to an executor, and
 * pauses, resumes, restores and shuts them down together.
 *
 * <p>How many consumer threads are created depends on the threading option passed
 * to the constructor (single threaded, {@code topic.wise} or {@code partition.wise}).
 *
 * <p>This class performs no synchronization of its own.
 */
public class ConsumerKafkaGroup {
    private static final Logger LOG = LogManager.getLogger(ConsumerKafkaGroup.class);
    private final String[] topics;
    private final String[] partitions;
    private final Properties props;
    private final ExecutorService executorService;
    private final String threadingOption;
    private final boolean isBinaryMessage;
    // Last state handed to setKafkaSourceState(); also pushed to every consumer thread there.
    private KafkaSource.KafkaSourceState kafkaSourceState;
    private final List<KafkaConsumerThread> kafkaConsumerThreadList = new ArrayList<>();
    // Futures returned by the executor in run(); used by shutdown() to cancel the tasks.
    private final List<Future<?>> futureList = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the consumer threads for the given topics and partitions. The threads
     * are only constructed here; they are not started until {@link #run()} is called.
     *
     * <ul>
     *   <li>{@code KafkaSource.SINGLE_THREADED}: one consumer for all topics and all partitions.</li>
     *   <li>{@code "topic.wise"}: one consumer per topic, each given all partitions.</li>
     *   <li>{@code "partition.wise"}: one consumer per (topic, partition) pair.</li>
     *   <li>anything else: no consumer is created and a warning is logged.</li>
     * </ul>
     *
     * @param strArr              topics to consume from
     * @param strArr2             partitions to consume from
     * @param properties          properties forwarded to every consumer thread
     * @param str                 threading option selecting one of the layouts above
     * @param executorService     executor the consumer threads are submitted to by {@link #run()}
     * @param z                   binary-message flag, stored and forwarded to every consumer thread
     * @param z2                  forwarded unchanged to {@code KafkaConsumerThread}; its meaning is
     *                            defined there and is not visible in this class
     * @param z3                  forwarded unchanged to {@code KafkaConsumerThread}; its meaning is
     *                            defined there and is not visible in this class
     * @param sourceEventListener listener forwarded to every consumer thread
     * @param strArr3             forwarded unchanged to {@code KafkaConsumerThread}; its meaning is
     *                            defined there and is not visible in this class
     * @param sourceMetrics       metrics object forwarded to every consumer thread
     */
    public ConsumerKafkaGroup(String[] strArr, String[] strArr2, Properties properties, String str, ExecutorService executorService, boolean z, boolean z2, boolean z3, SourceEventListener sourceEventListener, String[] strArr3, SourceMetrics sourceMetrics) {
        this.threadingOption = str;
        this.topics = strArr;
        this.partitions = strArr2;
        this.props = properties;
        this.executorService = executorService;
        this.isBinaryMessage = z;

        // The fifth KafkaConsumerThread argument is false for the single-threaded and
        // topic-wise layouts and true only for the partition-wise layout.
        if (KafkaSource.SINGLE_THREADED.equals(str)) {
            this.kafkaConsumerThreadList.add(new KafkaConsumerThread(
                    sourceEventListener, strArr, strArr2, properties, false, z, z2, z3, strArr3, sourceMetrics));
            LOG.info("Kafka Consumer thread starting to listen on topic(s): {} with partition/s: {}",
                    Arrays.toString(strArr), Arrays.toString(strArr2));
        } else if ("topic.wise".equals(str)) {
            for (String topic : strArr) {
                this.kafkaConsumerThreadList.add(new KafkaConsumerThread(
                        sourceEventListener, new String[]{topic}, strArr2, properties, false, z, z2, z3, strArr3,
                        sourceMetrics));
                LOG.info("Kafka Consumer thread starting to listen on topic: {} with partition/s: {}",
                        topic, Arrays.toString(strArr2));
            }
        } else if ("partition.wise".equals(str)) {
            for (String topic : strArr) {
                for (String partition : strArr2) {
                    this.kafkaConsumerThreadList.add(new KafkaConsumerThread(
                            sourceEventListener, new String[]{topic}, new String[]{partition}, properties, true, z,
                            z2, z3, strArr3, sourceMetrics));
                    LOG.info("Kafka Consumer thread starting to listen on topic: {} with partition: {}",
                            topic, partition);
                }
            }
        } else {
            // Previously this case fell through silently, leaving a group with no consumers.
            LOG.warn("Unrecognized threading option '{}'; no Kafka consumer thread was created for topic(s): {}",
                    str, Arrays.toString(strArr));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Calls {@code pause()} on every consumer thread in this group. */
    public void pause() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::pause);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Calls {@code resume()} on every consumer thread in this group. */
    public void resume() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::resume);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Calls {@code restore()} on every consumer thread in this group. */
    public void restoreState() {
        this.kafkaConsumerThreadList.forEach(consumerThread -> consumerThread.restore());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts down every consumer thread, then cancels (with interruption) every
     * future obtained from {@link #run()} that has not already been cancelled.
     */
    public void shutdown() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::shutdownConsumer);
        for (Future<?> future : this.futureList) {
            if (!future.isCancelled()) {
                future.cancel(true);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Submits every consumer thread to the executor and remembers the returned futures.
     *
     * <p>Any failure while submitting is logged and not rethrown; consumer threads
     * that come after the failing one are not submitted.
     */
    public void run() {
        try {
            for (KafkaConsumerThread consumerThread : this.kafkaConsumerThreadList) {
                this.futureList.add(this.executorService.submit(consumerThread));
            }
        } catch (Throwable th) {
            // Deliberately broad: submission failures are reported, never propagated to the caller.
            LOG.error("Error while creating KafkaConsumerThread for topic(s): " + Arrays.toString(this.topics), th);
        }
    }

    /**
     * Stores the given source state and hands the same instance to every consumer thread.
     *
     * @param kafkaSourceState state object to store and propagate
     */
    public void setKafkaSourceState(KafkaSource.KafkaSourceState kafkaSourceState) {
        this.kafkaSourceState = kafkaSourceState;
        for (KafkaConsumerThread consumerThread : this.kafkaConsumerThreadList) {
            consumerThread.setKafkaSourceState(kafkaSourceState);
        }
    }
}
