package org.springframework.pulsar.listener;

import io.micrometer.observation.Observation;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.pulsar.core.ConsumerBuilderConfigurationUtil;
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.event.ConsumerFailedToStartEvent;
import org.springframework.pulsar.event.ConsumerStartedEvent;
import org.springframework.pulsar.event.ConsumerStartingEvent;
import org.springframework.pulsar.observation.DefaultPulsarListenerObservationConvention;
import org.springframework.pulsar.observation.PulsarListenerObservation;
import org.springframework.pulsar.observation.PulsarMessageReceiverContext;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.class */
public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMessageListenerContainer<T> {
    private volatile CompletableFuture<?> listenerConsumerFuture;
    private volatile DefaultPulsarMessageListenerContainer<T>.Listener listenerConsumer;
    private volatile CountDownLatch startLatch;
    private final AbstractPulsarMessageListenerContainer<?> thisOrParentContainer;
    private final AtomicReference<Thread> listenerConsumerThread;
    private final AtomicBoolean receiveInProgress;
    private final Lock lockOnPause;
    private final Condition pausedCondition;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer$AbstractAcknowledgement.class */
    public static abstract class AbstractAcknowledgement implements Acknowledgement {
        private static final LogAccessor logger = new LogAccessor(AbstractAcknowledgement.class);
        protected final Consumer<?> consumer;

        AbstractAcknowledgement(Consumer<?> consumer) {
            this.consumer = consumer;
        }

        @Override // org.springframework.pulsar.listener.Acknowledgement
        public void acknowledge(MessageId messageId) {
            handleAckByMessageId(this.consumer, messageId);
        }

        private static void handleAckByMessageId(Consumer<?> consumer, MessageId messageId) {
            try {
                consumer.acknowledge(messageId);
            } catch (PulsarClientException e) {
                logger.warn(e, () -> {
                    return "Acknowledgment failed for message: [%s]".formatted(messageId);
                });
                consumer.negativeAcknowledge(messageId);
            }
        }

        @Override // org.springframework.pulsar.listener.Acknowledgement
        public void acknowledge(List<MessageId> list) {
            try {
                this.consumer.acknowledge(list);
            } catch (PulsarClientException e) {
                Iterator<MessageId> it = list.iterator();
                while (it.hasNext()) {
                    handleAckByMessageId(this.consumer, it.next());
                }
            }
        }

        @Override // org.springframework.pulsar.listener.Acknowledgement
        public void nack(MessageId messageId) {
            this.consumer.negativeAcknowledge(messageId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer$ConsumerAcknowledgment.class */
    public static final class ConsumerAcknowledgment extends AbstractAcknowledgement {
        private final Message<?> message;

        ConsumerAcknowledgment(Consumer<?> consumer, Message<?> message) {
            super(consumer);
            this.message = message;
        }

        @Override // org.springframework.pulsar.listener.Acknowledgement
        public void acknowledge() {
            acknowledge(this.message.getMessageId());
        }

        @Override // org.springframework.pulsar.listener.Acknowledgement
        public void nack() {
            this.consumer.negativeAcknowledge(this.message);
        }
    }

    /* loaded from: input_file:org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer$ConsumerBatchAcknowledgment.class */
    private static final class ConsumerBatchAcknowledgment extends AbstractAcknowledgement {
        ConsumerBatchAcknowledgment(Consumer<?> consumer) {
            super(consumer);
        }

        @Override // org.springframework.pulsar.listener.Acknowledgement
        public void acknowledge() {
            throw new UnsupportedOperationException();
        }

        @Override // org.springframework.pulsar.listener.Acknowledgement
        public void nack() {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer$Listener.class */
    private final class Listener implements SchedulingAwareRunnable {
        private final PulsarRecordMessageListener<T> listener;
        private final PulsarBatchMessageListener<T> batchMessageListener;
        private final PulsarContainerProperties containerProperties;
        private Consumer<T> consumer;
        private final Set<MessageId> nackableMessages = new HashSet();
        private final PulsarConsumerErrorHandler<T> pulsarConsumerErrorHandler;
        private final ConsumerBuilderCustomizer<T> consumerBuilderCustomizer;
        private final boolean isBatchListener;
        private final AckMode ackMode;
        private SubscriptionType subscriptionType;

        Listener(MessageListener<?> messageListener, PulsarContainerProperties pulsarContainerProperties) {
            this.containerProperties = pulsarContainerProperties;
            this.isBatchListener = this.containerProperties.isBatchListener();
            this.ackMode = this.containerProperties.getAckMode();
            this.subscriptionType = this.containerProperties.getSubscriptionType();
            if (messageListener instanceof PulsarBatchMessageListener) {
                this.batchMessageListener = (PulsarBatchMessageListener) messageListener;
                this.listener = null;
            } else if (messageListener != null) {
                this.listener = (PulsarRecordMessageListener) messageListener;
                this.batchMessageListener = null;
            } else {
                this.listener = null;
                this.batchMessageListener = null;
            }
            this.pulsarConsumerErrorHandler = DefaultPulsarMessageListenerContainer.this.getPulsarConsumerErrorHandler();
            this.consumerBuilderCustomizer = DefaultPulsarMessageListenerContainer.this.getConsumerBuilderCustomizer();
            try {
                Map<String, Object> extractDirectConsumerProperties = extractDirectConsumerProperties();
                populateAllNecessaryPropertiesIfNeedBe(extractDirectConsumerProperties);
                BatchReceivePolicy build = new BatchReceivePolicy.Builder().maxNumMessages(pulsarContainerProperties.getMaxNumMessages()).maxNumBytes(pulsarContainerProperties.getMaxNumBytes()).timeout(pulsarContainerProperties.getBatchTimeoutMillis(), TimeUnit.MILLISECONDS).build();
                Set set = (Set) extractDirectConsumerProperties.remove("topicNames");
                Map<String, String> map = (Map) extractDirectConsumerProperties.remove("properties");
                ArrayList arrayList = new ArrayList();
                arrayList.add(consumerBuilder -> {
                    ConsumerBuilderConfigurationUtil.loadConf(consumerBuilder, extractDirectConsumerProperties);
                    consumerBuilder.batchReceivePolicy(build);
                });
                if (this.consumerBuilderCustomizer != null) {
                    arrayList.add(this.consumerBuilderCustomizer);
                }
                this.consumer = DefaultPulsarMessageListenerContainer.this.getPulsarConsumerFactory().createConsumer(pulsarContainerProperties.getSchema(), set, this.containerProperties.getSubscriptionName(), map, arrayList);
                Assert.state(this.consumer != null, "Unable to create a consumer");
                if (this.subscriptionType == null) {
                    updateSubscriptionTypeFromConsumer(this.consumer);
                }
            } catch (PulsarClientException e) {
                DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> {
                    return "Pulsar client exceptions.";
                });
            }
        }

        private void updateSubscriptionTypeFromConsumer(Consumer<T> consumer) {
            try {
                Field findField = ReflectionUtils.findField(ConsumerImpl.class, "conf");
                ReflectionUtils.makeAccessible(findField);
                Object field = ReflectionUtils.getField(findField, consumer);
                if (field instanceof ConsumerConfigurationData) {
                    this.subscriptionType = ((ConsumerConfigurationData) field).getSubscriptionType();
                }
            } catch (Exception e) {
                DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> {
                    return "Unable to determine default subscription type from consumer due to: " + e.getMessage();
                });
            }
        }

        private Map<String, Object> extractDirectConsumerProperties() {
            return (Map) this.containerProperties.getPulsarConsumerProperties().entrySet().stream().collect(Collectors.toMap(entry -> {
                return String.valueOf(entry.getKey());
            }, (v0) -> {
                return v0.getValue();
            }, (obj, obj2) -> {
                return obj2;
            }, HashMap::new));
        }

        private void populateAllNecessaryPropertiesIfNeedBe(Map<String, Object> map) {
            Object topicsPattern;
            Object subscriptionType;
            if (map.containsKey("topicNames")) {
                Set of = Set.of((Object[]) StringUtils.delimitedListToStringArray((String) map.get("topicNames"), ","));
                if (!of.isEmpty()) {
                    map.put("topicNames", of);
                }
            }
            if (!map.containsKey("subscriptionType") && (subscriptionType = this.containerProperties.getSubscriptionType()) != null) {
                map.put("subscriptionType", subscriptionType);
            }
            if (!map.containsKey("topicNames")) {
                Object topics = this.containerProperties.getTopics();
                if (!this.containerProperties.getTopics().isEmpty()) {
                    map.put("topicNames", topics);
                }
            }
            if (!map.containsKey("topicsPattern") && (topicsPattern = this.containerProperties.getTopicsPattern()) != null) {
                map.put("topicsPattern", topicsPattern);
            }
            if (!map.containsKey("subscriptionName") && StringUtils.hasText(this.containerProperties.getSubscriptionName())) {
                map.put("subscriptionName", this.containerProperties.getSubscriptionName());
            }
            Object obj = DefaultPulsarMessageListenerContainer.this.negativeAckRedeliveryBackoff;
            if (obj != null) {
                map.put("negativeAckRedeliveryBackoff", obj);
            }
            Object obj2 = DefaultPulsarMessageListenerContainer.this.ackTimeoutRedeliveryBackoff;
            if (obj2 != null) {
                map.put("ackTimeoutRedeliveryBackoff", obj2);
            }
            Object obj3 = DefaultPulsarMessageListenerContainer.this.deadLetterPolicy;
            if (obj3 != null) {
                map.put("deadLetterPolicy", obj3);
            }
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            DefaultPulsarMessageListenerContainer.this.listenerConsumerThread.set(Thread.currentThread());
            DefaultPulsarMessageListenerContainer.this.publishConsumerStartingEvent();
            DefaultPulsarMessageListenerContainer.this.publishConsumerStartedEvent();
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
            Messages<T> messages = null;
            List<Message<T>> list = null;
            while (DefaultPulsarMessageListenerContainer.this.isRunning()) {
                checkIfPausedAndHandleAccordingly();
                try {
                    try {
                        if (!atomicBoolean.get() && !atomicBoolean2.get()) {
                            DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(true);
                            if (!DefaultPulsarMessageListenerContainer.this.isPaused()) {
                                messages = this.consumer.batchReceive();
                            }
                        }
                        DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(false);
                    } catch (PulsarClientException e) {
                        if (e.getCause() instanceof InterruptedException) {
                            DefaultPulsarMessageListenerContainer.this.logger.debug(e, () -> {
                                return "Error receiving messages due to a thread interrupt call from upstream.";
                            });
                        } else {
                            DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> {
                                return "Error receiving messages.";
                            });
                        }
                        messages = null;
                        DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(false);
                    }
                    if (messages != null) {
                        if (this.isBatchListener) {
                            if (!atomicBoolean.get() && !atomicBoolean2.get()) {
                                list = new ArrayList();
                                Objects.requireNonNull(list);
                                messages.forEach((v1) -> {
                                    r1.add(v1);
                                });
                            }
                            if (list != null) {
                                try {
                                    if (list.size() > 0) {
                                        if (this.batchMessageListener instanceof PulsarBatchAcknowledgingMessageListener) {
                                            this.batchMessageListener.received(this.consumer, list, this.ackMode.equals(AckMode.MANUAL) ? new ConsumerBatchAcknowledgment(this.consumer) : null);
                                        } else {
                                            this.batchMessageListener.received(this.consumer, list);
                                        }
                                        if (this.ackMode.equals(AckMode.BATCH)) {
                                            try {
                                                if (isSharedSubscriptionType()) {
                                                    this.consumer.acknowledge(messages);
                                                } else {
                                                    this.consumer.acknowledgeCumulative((Message) StreamSupport.stream(messages.spliterator(), true).reduce((message, message2) -> {
                                                        return message2;
                                                    }).orElse(null));
                                                }
                                            } catch (PulsarClientException e2) {
                                                DefaultPulsarMessageListenerContainer.this.logger.warn(e2, () -> {
                                                    return "Batch acknowledgment failed: " + e2.getMessage();
                                                });
                                                this.consumer.negativeAcknowledge(messages);
                                            }
                                        }
                                        if (this.pulsarConsumerErrorHandler != null) {
                                            pendingMessagesHandledSuccessfully(atomicBoolean, atomicBoolean2);
                                        }
                                    }
                                } catch (Exception e3) {
                                    if (this.pulsarConsumerErrorHandler != null) {
                                        list = invokeBatchListenerErrorHandler(atomicBoolean, atomicBoolean2, list, e3);
                                    } else {
                                        this.consumer.negativeAcknowledge(messages);
                                    }
                                }
                            }
                        } else {
                            for (T t : messages) {
                                do {
                                    newObservation(t).observe(() -> {
                                        dispatchMessageToListener(t, atomicBoolean);
                                    });
                                } while (atomicBoolean.get());
                            }
                            if (this.ackMode.equals(AckMode.BATCH)) {
                                handleBatchAcks(messages);
                            }
                        }
                    }
                } catch (Throwable th) {
                    DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(false);
                    throw th;
                }
            }
        }

        private void checkIfPausedAndHandleAccordingly() {
            if (DefaultPulsarMessageListenerContainer.this.isPaused()) {
                DefaultPulsarMessageListenerContainer.this.lockOnPause.lock();
                try {
                    try {
                        DefaultPulsarMessageListenerContainer.this.pausedCondition.await();
                        DefaultPulsarMessageListenerContainer.this.lockOnPause.unlock();
                    } catch (InterruptedException e) {
                        throw new IllegalStateException("Exception occurred trying to wake up the paused listener thread.");
                    }
                } catch (Throwable th) {
                    DefaultPulsarMessageListenerContainer.this.lockOnPause.unlock();
                    throw th;
                }
            }
        }

        private Observation newObservation(Message<T> message) {
            return this.containerProperties.getObservationRegistry() == null ? Observation.NOOP : PulsarListenerObservation.LISTENER_OBSERVATION.observation(this.containerProperties.getObservationConvention(), DefaultPulsarListenerObservationConvention.INSTANCE, () -> {
                return new PulsarMessageReceiverContext(message, DefaultPulsarMessageListenerContainer.this.getBeanName());
            }, this.containerProperties.getObservationRegistry());
        }

        private void dispatchMessageToListener(Message<T> message, AtomicBoolean atomicBoolean) {
            try {
                if (this.listener instanceof PulsarAcknowledgingMessageListener) {
                    this.listener.received(this.consumer, message, this.ackMode.equals(AckMode.MANUAL) ? new ConsumerAcknowledgment(this.consumer, message) : null);
                } else if (this.listener != null) {
                    this.listener.received(this.consumer, message);
                }
                if (this.ackMode.equals(AckMode.RECORD)) {
                    handleAck(message);
                }
                atomicBoolean.compareAndSet(true, false);
            } catch (Exception e) {
                DefaultPulsarMessageListenerContainer.this.logger.debug(e, () -> {
                    return "Error dispatching the message to the listener.";
                });
                if (this.pulsarConsumerErrorHandler != null) {
                    invokeRecordListenerErrorHandler(atomicBoolean, message, e);
                } else if (this.ackMode.equals(AckMode.RECORD)) {
                    this.consumer.negativeAcknowledge(message);
                } else {
                    if (!this.ackMode.equals(AckMode.BATCH)) {
                        throw new IllegalStateException("Exception occurred and message %s was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks".formatted(message.getMessageId()), e);
                    }
                    this.nackableMessages.add(message.getMessageId());
                }
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v3, types: [org.springframework.pulsar.listener.PulsarBatchListenerFailedException, java.lang.Exception] */
        private List<Message<T>> invokeBatchListenerErrorHandler(AtomicBoolean atomicBoolean, AtomicBoolean atomicBoolean2, List<Message<T>> list, Throwable th) {
            if (!(th instanceof PulsarBatchListenerFailedException)) {
                th = th.getCause();
                Assert.isInstanceOf(PulsarBatchListenerFailedException.class, th, "Batch listener should throw PulsarBatchListenerFailedException on errors.");
            }
            ?? r0 = (PulsarBatchListenerFailedException) th;
            Message<T> pulsarMessageCausedTheException = getPulsarMessageCausedTheException(r0);
            Message<T> currentMessage = this.pulsarConsumerErrorHandler.currentMessage();
            if (currentMessage != null && !currentMessage.equals(pulsarMessageCausedTheException)) {
                pendingMessagesHandledSuccessfully(atomicBoolean, atomicBoolean2);
            }
            List<Message<T>> subList = list.subList(list.indexOf(pulsarMessageCausedTheException), list.size());
            if (this.pulsarConsumerErrorHandler.shouldRetryMessage(r0, pulsarMessageCausedTheException)) {
                atomicBoolean.set(true);
            } else {
                atomicBoolean.compareAndSet(true, false);
                this.pulsarConsumerErrorHandler.recoverMessage(this.consumer, pulsarMessageCausedTheException, r0);
                handleAck(pulsarMessageCausedTheException);
                if (subList.size() == 1) {
                    atomicBoolean2.set(false);
                } else {
                    subList = subList.subList(1, subList.size());
                }
                if (!subList.isEmpty()) {
                    atomicBoolean2.set(true);
                }
                this.pulsarConsumerErrorHandler.clearMessage();
            }
            return subList;
        }

        private void invokeRecordListenerErrorHandler(AtomicBoolean atomicBoolean, Message<T> message, Exception exc) {
            if (this.pulsarConsumerErrorHandler.shouldRetryMessage(exc, message)) {
                atomicBoolean.set(true);
                return;
            }
            atomicBoolean.compareAndSet(true, false);
            this.pulsarConsumerErrorHandler.recoverMessage(this.consumer, message, exc);
            if (this.ackMode.equals(AckMode.RECORD)) {
                handleAck(message);
            }
        }

        private void pendingMessagesHandledSuccessfully(AtomicBoolean atomicBoolean, AtomicBoolean atomicBoolean2) {
            atomicBoolean.compareAndSet(true, false);
            atomicBoolean2.compareAndSet(true, false);
            this.pulsarConsumerErrorHandler.clearMessage();
        }

        private Message<T> getPulsarMessageCausedTheException(PulsarBatchListenerFailedException pulsarBatchListenerFailedException) {
            return (Message) pulsarBatchListenerFailedException.getMessageInError();
        }

        private boolean isSharedSubscriptionType() {
            return this.subscriptionType != null && (this.subscriptionType.equals(SubscriptionType.Shared) || this.subscriptionType.equals(SubscriptionType.Key_Shared));
        }

        private void handleBatchAcks(Messages<T> messages) {
            if (!this.nackableMessages.isEmpty()) {
                for (T t : messages) {
                    if (this.nackableMessages.contains(t.getMessageId())) {
                        this.consumer.negativeAcknowledge(t);
                        this.nackableMessages.remove(t.getMessageId());
                    } else {
                        handleAck(t);
                    }
                }
                return;
            }
            try {
                if (messages.size() > 0) {
                    if (isSharedSubscriptionType()) {
                        this.consumer.acknowledge(messages);
                    } else {
                        this.consumer.acknowledgeCumulative((Message) StreamSupport.stream(messages.spliterator(), true).reduce((message, message2) -> {
                            return message2;
                        }).orElse(null));
                    }
                }
            } catch (PulsarClientException e) {
                DefaultPulsarMessageListenerContainer.this.logger.warn(e, () -> {
                    return "Batch acknowledgments failed: " + e.getMessage();
                });
                this.consumer.negativeAcknowledge(messages);
            }
        }

        private void handleAck(Message<T> message) {
            AbstractAcknowledgement.handleAckByMessageId(this.consumer, message.getMessageId());
        }

        public void pause() {
            if (this.consumer != null) {
                this.consumer.pause();
            }
        }

        public void resume() {
            if (this.consumer != null) {
                this.consumer.resume();
            }
        }
    }

    public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory, PulsarContainerProperties pulsarContainerProperties) {
        super(pulsarConsumerFactory, pulsarContainerProperties);
        this.startLatch = new CountDownLatch(1);
        this.listenerConsumerThread = new AtomicReference<>();
        this.receiveInProgress = new AtomicBoolean();
        this.lockOnPause = new ReentrantLock();
        this.pausedCondition = this.lockOnPause.newCondition();
        this.thisOrParentContainer = this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.pulsar.core.AbstractPulsarMessageContainer
    public void doStart() {
        PulsarContainerProperties containerProperties = getContainerProperties();
        AsyncTaskExecutor consumerTaskExecutor = containerProperties.getConsumerTaskExecutor();
        if (consumerTaskExecutor == null) {
            consumerTaskExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerTaskExecutor);
        }
        this.listenerConsumer = new Listener((MessageListener) containerProperties.getMessageListener(), getContainerProperties());
        setRunning(true);
        this.startLatch = new CountDownLatch(1);
        this.listenerConsumerFuture = consumerTaskExecutor.submitCompletable(this.listenerConsumer);
        try {
            if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error("Consumer thread failed to start - does the configured task executor have enough threads to support all containers and concurrency?");
                publishConsumerFailedToStart();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.springframework.pulsar.core.AbstractPulsarMessageContainer
    public void doStop() {
        setRunning(false);
        this.logger.info("Pausing consumer");
        if (((Listener) this.listenerConsumer).consumer != null) {
            ((Listener) this.listenerConsumer).consumer.pause();
        }
        if (this.listenerConsumerThread.get() != null) {
            if (this.receiveInProgress.get()) {
                this.listenerConsumerThread.get().interrupt();
            }
            try {
                this.listenerConsumerThread.get().join();
            } catch (InterruptedException e) {
                this.logger.error(e, () -> {
                    return "Interrupting the main thread";
                });
                Thread.currentThread().interrupt();
            }
        }
        try {
            this.logger.info("Closing consumer");
            if (((Listener) this.listenerConsumer).consumer != null) {
                ((Listener) this.listenerConsumer).consumer.close();
            }
        } catch (PulsarClientException e2) {
            this.logger.error(e2, () -> {
                return "Error closing Pulsar Client.";
            });
        }
    }

    private void publishConsumerStartingEvent() {
        this.startLatch.countDown();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new ConsumerStartingEvent(this, this.thisOrParentContainer));
        }
    }

    private void publishConsumerStartedEvent() {
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new ConsumerStartedEvent(this, this.thisOrParentContainer));
        }
    }

    private void publishConsumerFailedToStart() {
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new ConsumerFailedToStartEvent(this, this.thisOrParentContainer));
        }
    }

    @Override // org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer
    public void doPause() {
        setPaused(true);
        if (this.listenerConsumer != null) {
            this.listenerConsumer.pause();
        }
    }

    @Override // org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer
    public void doResume() {
        if (this.listenerConsumer != null) {
            this.listenerConsumer.resume();
        }
        setPaused(false);
        this.lockOnPause.lock();
        try {
            this.pausedCondition.signal();
        } finally {
            this.lockOnPause.unlock();
        }
    }
}
