package io.siddhi.extension.io.kafka.multidc.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.kafka.sink.KafkaSink;
import io.siddhi.query.api.definition.StreamDefinition;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import org.apache.log4j.Logger;

/* compiled from: KafkaMultiDCSource.java */
/* loaded from: input_file:io/siddhi/extension/io/kafka/multidc/source/Interceptor.class */
class Interceptor implements SourceEventListener {
    private static final Logger LOG = Logger.getLogger(Interceptor.class);
    private String sourceId;
    private SourceSynchronizer synchronizer;
    private boolean isBinaryMessage;

    public Interceptor(String str, SourceSynchronizer sourceSynchronizer, boolean z) {
        this.sourceId = str;
        this.synchronizer = sourceSynchronizer;
        this.isBinaryMessage = z;
    }

    public void onEvent(Object obj, String[] strArr) {
        onEventReceive(obj, strArr, null);
    }

    public void onEvent(Object obj, String[] strArr, String[] strArr2) {
        onEventReceive(obj, strArr, strArr2);
    }

    public StreamDefinition getStreamDefinition() {
        return null;
    }

    private void onEventReceive(Object obj, String[] strArr, String[] strArr2) {
        if (!this.isBinaryMessage) {
            String str = (String) obj;
            int indexOf = str.indexOf(KafkaSink.SEQ_NO_HEADER_DELIMITER);
            if (indexOf <= 0) {
                LOG.warn("Sequence number is not contained in the message. Dropping the message :" + str);
                return;
            }
            this.synchronizer.onEvent(this.sourceId, Integer.valueOf(Integer.parseInt(str.substring(0, indexOf).split(KafkaSink.SEQ_NO_HEADER_FIELD_SEPERATOR)[1])).intValue(), str.substring(indexOf + 1), strArr);
            return;
        }
        byte[] bArr = (byte[]) obj;
        int i = ByteBuffer.wrap(bArr).getInt();
        String str2 = new String(bArr, 4, i - 1, Charset.defaultCharset());
        if (str2.isEmpty()) {
            LOG.warn("Sequence number is not contained in the message. Dropping the message");
            return;
        }
        Integer valueOf = Integer.valueOf(Integer.parseInt(str2.split(KafkaSink.SEQ_NO_HEADER_FIELD_SEPERATOR)[1]));
        this.synchronizer.onEvent(this.sourceId, valueOf.intValue(), Arrays.copyOfRange(bArr, i + 4, bArr.length), strArr);
    }
}
