package io.siddhi.extension.io.kafka.multidc.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

/* loaded from: input_file:io/siddhi/extension/io/kafka/multidc/source/SourceSynchronizer.class */
/**
 * Re-sequences events that arrive from two sources carrying the same sequence-numbered stream.
 *
 * <p>Events are handed to the {@link SourceEventListener} in increasing sequence order. An event
 * whose sequence number is not greater than the last delivered one is dropped as a duplicate.
 * An event that arrives ahead of the expected sequence number is parked in {@link #eventBuffer}
 * until the gap is filled, the buffer reaches its size limit, or the flush timer fires.
 *
 * <p>All state-changing methods synchronize on this instance; the timer task takes the same
 * monitor before touching shared state.
 */
public class SourceSynchronizer {
    private static final Logger LOG = Logger.getLogger(SourceSynchronizer.class);
    private final SourceEventListener eventListener;
    private final int maxBufferSize;
    private final int bufferInterval;
    boolean isEventGap = false;
    // TreeMap: iteration is in ascending sequence order, which the flush loops rely on.
    Map<Long, BufferValueHolder> eventBuffer = new TreeMap<>();
    Map<String, Long> perSourceReceivedSeqNo = new HashMap<>();
    Timer flushBufferTimer = new Timer(true);
    String[] bootstrapServers = new String[2];
    // Scratch list of buffer keys to delete once an iteration over eventBuffer has finished.
    List<Long> toRemoveSeqNos = new ArrayList<>();
    private long lastConsumedSeqNo = -1L;
    private AtomicBoolean isFlushTaskDue = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/io/kafka/multidc/source/SourceSynchronizer$BufferFlushTask.class */
    /**
     * Timer task that force-flushes the buffer up to the highest sequence number received from
     * either source.
     */
    public class BufferFlushTask extends TimerTask {
        private final Logger log = Logger.getLogger(BufferFlushTask.class);

        BufferFlushTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public synchronized void run() {
            // Take the synchronizer's monitor for the whole run: the per-source map is a plain
            // HashMap written by onEvent(), and resetting the flag under the same lock guarantees
            // it cannot be overwritten by the onEvent() call that scheduled this task.
            synchronized (SourceSynchronizer.this) {
                isFlushTaskDue.set(false);
                long flushTill = highestReceivedSeqNo();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Executing the buffer flushing task. Flushing buffers till " + flushTill);
                }
                forceFlushBuffer(flushTill);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/io/kafka/multidc/source/SourceSynchronizer$BufferValueHolder.class */
    /**
     * Value stored in the buffer: the event payload, the id of the source it came from and the
     * string array that is passed to the listener together with the event.
     */
    public static class BufferValueHolder {
        String[] strings;
        private final Object event;
        private final String sourceId;

        BufferValueHolder(Object obj, String str, String[] strArr) {
            this.event = obj;
            this.sourceId = str;
            this.strings = strArr;
        }

        String[] getStrings() {
            return this.strings;
        }

        String getSourceId() {
            return this.sourceId;
        }

        public Object getEvent() {
            return this.event;
        }
    }

    /**
     * Creates a synchronizer for two sources.
     *
     * @param sourceEventListener listener that receives the ordered events
     * @param strArr              ids of the two sources; elements 0 and 1 are used
     * @param i                   buffer size at which buffered events are force-flushed
     * @param i2                  delay passed to {@link Timer#schedule(TimerTask, long)} for the
     *                            flush task (milliseconds per the Timer API)
     */
    public SourceSynchronizer(SourceEventListener sourceEventListener, String[] strArr, int i, int i2) {
        this.eventListener = sourceEventListener;
        this.bootstrapServers[0] = strArr[0];
        this.bootstrapServers[1] = strArr[1];
        this.maxBufferSize = i;
        this.bufferInterval = i2;
        this.perSourceReceivedSeqNo.put(strArr[0], -1L);
        this.perSourceReceivedSeqNo.put(strArr[1], -1L);
    }

    /** Returns the larger of the last sequence numbers received from the two sources. */
    private long highestReceivedSeqNo() {
        long first = this.perSourceReceivedSeqNo.get(this.bootstrapServers[0]).longValue();
        long second = this.perSourceReceivedSeqNo.get(this.bootstrapServers[1]).longValue();
        return Math.max(first, second);
    }

    /** Returns the smaller of the last sequence numbers received from the two sources. */
    private long lowestReceivedSeqNo() {
        long first = this.perSourceReceivedSeqNo.get(this.bootstrapServers[0]).longValue();
        long second = this.perSourceReceivedSeqNo.get(this.bootstrapServers[1]).longValue();
        return Math.min(first, second);
    }

    /** Deletes every key staged in {@link #toRemoveSeqNos} from the buffer and resets the list. */
    private void removeStagedEntries() {
        this.toRemoveSeqNos.forEach(this.eventBuffer::remove);
        this.toRemoveSeqNos.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Delivers every buffered event whose sequence number is greater than the last delivered one
     * and not greater than {@code j}, even if sequence numbers in between are missing. A warning
     * is logged for each range of sequence numbers that is skipped this way.
     *
     * @param j highest sequence number (inclusive) to flush
     */
    public synchronized void forceFlushBuffer(long j) {
        for (Map.Entry<Long, BufferValueHolder> entry : this.eventBuffer.entrySet()) {
            Long seqNo = entry.getKey();
            BufferValueHolder holder = entry.getValue();
            long seq = seqNo.longValue();
            if (seq <= this.lastConsumedSeqNo || seq > j) {
                continue;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Updating the lastConsumedSeqNo=" + seqNo + " as the event is forcefully flushed, from the source " + holder.getSourceId());
            }
            // Only a jump of more than one means events were skipped; the consecutive case
            // (seq == lastConsumedSeqNo + 1) is a normal in-order delivery.
            if (seq != this.lastConsumedSeqNo + 1) {
                LOG.warn("Events lost from sequence " + (this.lastConsumedSeqNo + 1) + " to " + (seq - 1));
            }
            this.lastConsumedSeqNo = seq;
            this.toRemoveSeqNos.add(seqNo);
            this.eventListener.onEvent(holder.getEvent(), holder.getStrings());
        }
        removeStagedEntries();
    }

    /**
     * Delivers buffered events for as long as they continue the sequence without a gap, and
     * discards buffered events that have already been delivered. Stops at the first gap.
     */
    private synchronized void flushBuffer() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Start flushing buffer");
        }
        for (Map.Entry<Long, BufferValueHolder> entry : this.eventBuffer.entrySet()) {
            Long seqNo = entry.getKey();
            BufferValueHolder holder = entry.getValue();
            long seq = seqNo.longValue();
            if (seq <= this.lastConsumedSeqNo) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Message with sequence " + seqNo + " already received. Dropping the event from the buffer");
                }
                this.toRemoveSeqNos.add(seqNo);
            } else if (seq == this.lastConsumedSeqNo + 1) {
                this.isEventGap = false;
                this.lastConsumedSeqNo = seq;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Message with sequence " + seqNo + " flushed from buffer. Updating lastConsumedSeqNo=" + this.lastConsumedSeqNo);
                }
                this.toRemoveSeqNos.add(seqNo);
                this.eventListener.onEvent(holder.getEvent(), holder.getStrings());
            } else {
                this.isEventGap = true;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Gap detected while flushing the buffer. Flushed message sequence=" + seqNo + ". Expected sequence=" + (this.lastConsumedSeqNo + 1) + ". Stop flushing the buffer.");
                }
                // Keys are visited in ascending order, so every remaining entry is past the gap.
                break;
            }
        }
        removeStagedEntries();
        if (LOG.isDebugEnabled()) {
            LOG.debug("End flushing buffer");
        }
    }

    /**
     * Stores an out-of-order event in the buffer. If the buffer has already reached its size
     * limit, it is first force-flushed up to the highest sequence number received so far.
     */
    private synchronized void bufferEvent(String str, long j, Object obj, String[] strArr) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Buffering Event. SourceId=" + str + ", SequenceNumber=" + j);
        }
        if (this.eventBuffer.size() >= this.maxBufferSize) {
            long flushTill = highestReceivedSeqNo();
            LOG.info("Buffer size exceeded. Force flushing events till the sequence " + flushTill);
            forceFlushBuffer(flushTill);
        }
        this.eventBuffer.put(Long.valueOf(j), new BufferValueHolder(obj, str, strArr));
    }

    /**
     * Accepts one event from a source.
     *
     * <ul>
     *   <li>Sequence number not greater than the last delivered one: the event is dropped.</li>
     *   <li>Sequence number is the next expected one: the event is delivered immediately and the
     *       buffer is drained as far as it continues the sequence.</li>
     *   <li>Otherwise the event is buffered. If a gap was already open, the buffer is then
     *       force-flushed up to the lower of the two per-source sequence numbers; if not, the gap
     *       is recorded and a flush task is scheduled unless one is already pending.</li>
     * </ul>
     *
     * @param str    id of the source the event came from
     * @param j      sequence number of the event
     * @param obj    event payload handed to the listener
     * @param strArr string array handed to the listener together with the payload
     */
    public synchronized void onEvent(String str, long j, Object obj, String[] strArr) {
        this.perSourceReceivedSeqNo.put(str, Long.valueOf(j));
        if (j <= this.lastConsumedSeqNo) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Message with sequence " + j + " already received. Dropping the event from source " + str + ":" + obj);
            }
            return;
        }
        if (j == this.lastConsumedSeqNo + 1) {
            this.lastConsumedSeqNo = j;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Message with sequence " + j + " received from source " + str + ". Updating lastConsumedSeqNo=" + this.lastConsumedSeqNo);
            }
            this.eventListener.onEvent(obj, strArr);
            if (!this.eventBuffer.isEmpty()) {
                flushBuffer();
            }
            return;
        }
        if (this.isEventGap) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Message with sequence " + j + " from source " + str + ". Couldn't fill the gap, buffering the event.");
            }
            bufferEvent(str, j, obj, strArr);
            long flushTill = lowestReceivedSeqNo();
            this.isEventGap = false;
            forceFlushBuffer(flushTill);
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Gap detected. Message with sequence " + j + " received from source " + str + ". Expected sequence number is " + (this.lastConsumedSeqNo + 1) + ". Starting buffering events");
        }
        this.isEventGap = true;
        bufferEvent(str, j, obj, strArr);
        if (!this.isFlushTaskDue.get()) {
            this.flushBufferTimer.schedule(new BufferFlushTask(), this.bufferInterval);
            this.isFlushTaskDue.set(true);
        }
    }

    /** Returns the sequence number of the last event handed to the listener. */
    public synchronized Long getLastConsumedSeqNo() {
        return Long.valueOf(this.lastConsumedSeqNo);
    }

    /** Overrides the sequence number regarded as last delivered. */
    public synchronized void setLastConsumedSeqNo(long j) {
        this.lastConsumedSeqNo = j;
    }
}
