package io.siddhi.extension.io.kafka.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.kafka.KafkaIOUtils;
import io.siddhi.query.api.definition.StreamDefinition;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

// Siddhi extension metadata for the Kafka sink. Every description is kept as one string literal on one line:
// a raw line break inside a literal does not compile. The `is.binary.message` entry is declared optional with
// default "false" because that is how init() reads it.
@Extension(
        name = "kafka",
        namespace = "sink",
        description = "A Kafka sink publishes events processed by WSO2 SP to a topic with a partition for a Kafka cluster. The events can be published in the `TEXT` `XML` `JSON` or `Binary` format.\nIf the topic is not already created in the Kafka cluster, the Kafka sink creates the default partition for the given topic. The publishing topic and partition can be a dynamic value taken from the Siddhi event.\nTo configure a sink to use the Kafka transport, the `type` parameter should have `kafka` as its value.",
        parameters = {
                @Parameter(name = "bootstrap.servers", description = " This parameter specifies the list of Kafka servers to which the Kafka sink must publish events. This list should be provided as a set of comma separated values. e.g., `localhost:9092,localhost:9093`.", type = {DataType.STRING}),
                @Parameter(name = KafkaSink.KAFKA_PUBLISH_TOPIC, description = "The topic to which the Kafka sink needs to publish events. Only one topic must be specified.", type = {DataType.STRING}),
                @Parameter(name = KafkaSink.KAFKA_PARTITION_NO, description = "The partition number for the given topic. Only one partition ID can be defined. If no value is specified for this parameter, the Kafka sink publishes to the default partition of the topic (i.e., 0)", type = {DataType.INT}, optional = true, defaultValue = "0"),
                @Parameter(name = KafkaSink.SEQ_ID, description = "A unique identifier to identify the messages published by this sink. This ID allows receivers to identify the sink that published a specific message.", type = {DataType.STRING}, optional = true, defaultValue = "null"),
                @Parameter(name = KafkaSink.KAFKA_MESSAGE_KEY, description = "The key contains the values that are used to maintain ordering in a Kafka partition.", type = {DataType.STRING}, optional = true, defaultValue = "null"),
                @Parameter(name = "is.binary.message", description = "In order to send the binary events via kafka sink, this parameter is set to 'True'.", type = {DataType.BOOL}, optional = true, defaultValue = "false"),
                @Parameter(name = "optional.configuration", description = "This parameter contains all the other possible configurations that the producer is created with. \ne.g., `producer.type:async,batch.size:200`", optional = true, type = {DataType.STRING}, defaultValue = "null")
        },
        examples = {
                @Example(syntax = "@App:name('TestExecutionPlan') \ndefine stream FooStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@sink(\ntype='kafka',\ntopic='topic_with_partitions',\npartition.no='0',\nbootstrap.servers='localhost:9092',\n@map(type='xml'))\nDefine stream BarStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description = "This Kafka sink configuration publishes to 0th partition of the topic named `topic_with_partitions`."),
                @Example(syntax = "@App:name('TestExecutionPlan') \ndefine stream FooStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@sink(\ntype='kafka',\ntopic='{{symbol}}',\npartition.no='{{volume}}',\nbootstrap.servers='localhost:9092',\n@map(type='xml'))\nDefine stream BarStream (symbol string, price float, volume long); \nfrom FooStream select symbol, price, volume insert into BarStream;", description = "This query publishes dynamic topic and partitions that are taken from the Siddhi event. The value for `partition.no` is taken from the `volume` attribute, and the topic value is taken from the `symbol` attribute.")
        })
/* loaded from: input_file:io/siddhi/extension/io/kafka/sink/KafkaSink.class */
public class KafkaSink extends Sink<KafkaSinkState> {
    /** Key under which the sink state stores its sequence counter in a snapshot map. */
    private static final String LAST_SENT_SEQ_NO_PERSIST_KEY = "lastSentSequenceNo";
    /** Terminates the sequence header; written after the sequence number and before the payload. */
    public static final String SEQ_NO_HEADER_DELIMITER = "~";
    /** Separates the sequence id from the sequence number inside the sequence header. */
    public static final String SEQ_NO_HEADER_FIELD_SEPERATOR = ":";
    /** Option name of the target topic; may be dynamic (resolved per event). */
    protected static final String KAFKA_PUBLISH_TOPIC = "topic";
    /** Option name of the broker list; init reads the option through the identical literal. */
    private static final String KAFKA_BROKER_LIST = "bootstrap.servers";
    /** Option name of the record key; may be dynamic (resolved per event). */
    protected static final String KAFKA_MESSAGE_KEY = "key";
    /** Option name of the extra producer settings; init reads the option through the identical literal. */
    private static final String KAFKA_OPTIONAL_CONFIGURATION_PROPERTIES = "optional.configuration";
    /** Option name of the target partition; may be dynamic (resolved per event). */
    protected static final String KAFKA_PARTITION_NO = "partition.no";
    /** Option name of the sequence id; configuring it turns sequencing on. */
    private static final String SEQ_ID = "sequence.id";
    /** Option name of the binary flag; init reads the option through the identical literal. */
    private static final String IS_BINARY_MESSAGE = "is.binary.message";
    private static final Logger LOG = Logger.getLogger(KafkaSink.class);
    /** Value of the broker-list option; passed to the producer as {@code bootstrap.servers} in connect(). */
    protected String bootstrapServers;
    /** Raw value of the optional-configuration option (null when absent); applied to the producer properties in connect(). */
    protected String optionalConfigs;
    /** Partition option; a null resolved value makes publish() send the record without an explicit partition. */
    private Option partitionOption;
    /** Selects the value serializer in connect() and the String payload conversion in publish(). */
    protected Boolean isBinaryMessage;
    /** Record key option, resolved per event in publish(). */
    protected Option keyOption;
    /** Producer created by connect() and closed by disconnect(); null until connect() has run. */
    private Producer<String, Object> producer;
    /** Topic option, resolved per event in publish(). */
    private Option topicOption = null;
    /** True when a sequence id was configured; set in init(). */
    private Boolean isSequenced = false;
    /** Configured sequence id, or null when sequencing is off. */
    protected String sequenceId = null;

    /**
     * Snapshot-able state of the sink: the counter used to stamp sequenced messages.
     *
     * <p>The state is only persisted and restored when sequencing is enabled for the sink.
     */
    public class KafkaSinkState extends State {
        /**
         * Sequence number written into the next sequenced message. Despite the name, publish() reads the
         * value for the outgoing message first and increments it afterwards, so it starts at 0.
         */
        public AtomicInteger lastSentSequenceNo = new AtomicInteger(0);
        /** Whether the owning sink stamps messages with a sequence header. */
        private final boolean isSequenced;

        /**
         * @param z true when the sink was configured with a sequence id
         */
        KafkaSinkState(boolean z) {
            this.isSequenced = z;
        }

        /**
         * Captures the sequence counter.
         *
         * @return a single-entry map holding the counter, or {@code null} when sequencing is disabled
         */
        public Map<String, Object> snapshot() {
            if (!this.isSequenced) {
                return null;
            }
            Map<String, Object> snapshot = new HashMap<>();
            snapshot.put(KafkaSink.LAST_SENT_SEQ_NO_PERSIST_KEY, this.lastSentSequenceNo.get());
            return snapshot;
        }

        /**
         * Restores the sequence counter from a previously taken snapshot.
         *
         * <p>Nothing happens when sequencing is disabled, when {@code map} is null, or when the map has no
         * counter entry.
         *
         * @param map snapshot produced by {@link #snapshot()}
         * @throws ClassCastException if the stored counter is not a {@link Number}
         */
        public void restore(Map<String, Object> map) {
            if (!this.isSequenced || map == null) {
                return;
            }
            Object persisted = map.get(KafkaSink.LAST_SENT_SEQ_NO_PERSIST_KEY);
            if (persisted == null) {
                return;
            }
            // Accept any numeric type so that a persistence layer that widens or narrows the stored
            // Integer does not break recovery.
            this.lastSentSequenceNo.set(((Number) persisted).intValue());
        }

        /**
         * @return always {@code false}
         */
        public boolean canDestroy() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the sink options and prepares the factory for the sink state.
     *
     * <p>The broker list, optional configuration, sequence id and binary flag are read as static values;
     * topic, partition and key are kept as options so they can be resolved per event. Sequencing is
     * enabled exactly when a sequence id is configured.
     *
     * @param streamDefinition not used by this implementation
     * @param optionHolder     source of the sink options
     * @param configReader     not used by this implementation
     * @param siddhiAppContext not used by this implementation
     * @return a factory creating a {@link KafkaSinkState} that reflects whether sequencing is enabled
     */
    public StateFactory<KafkaSinkState> init(StreamDefinition streamDefinition, OptionHolder optionHolder,
                                             ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        // The lookup order is kept as is: each call may reject the configuration, and the first failure wins.
        this.bootstrapServers = optionHolder.validateAndGetStaticValue(KAFKA_BROKER_LIST);
        this.optionalConfigs =
                optionHolder.validateAndGetStaticValue(KAFKA_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null);
        this.topicOption = optionHolder.validateAndGetOption(KAFKA_PUBLISH_TOPIC);
        this.partitionOption = optionHolder.getOrCreateOption(KAFKA_PARTITION_NO, (String) null);
        this.sequenceId = optionHolder.validateAndGetStaticValue(SEQ_ID, (String) null);

        String binaryFlag = optionHolder.validateAndGetStaticValue(IS_BINARY_MESSAGE, "false");
        this.isBinaryMessage = Boolean.parseBoolean(binaryFlag);
        this.isSequenced = this.sequenceId != null;
        this.keyOption = optionHolder.getOrCreateOption(KAFKA_MESSAGE_KEY, (String) null);

        // The flag is read when the factory is invoked, not captured here.
        return () -> new KafkaSinkState(this.isSequenced.booleanValue());
    }

    /**
     * Publishes one event to Kafka.
     *
     * <p>Topic, partition and key are resolved from the dynamic options of the event. The payload is
     * converted according to its runtime type and the sink configuration:
     * <ul>
     *   <li>non-String payloads are treated as {@link ByteBuffer} and sent as {@code byte[]};</li>
     *   <li>String payloads are sent as text, or as UTF-8 bytes when the binary flag is set;</li>
     *   <li>when sequencing is enabled the payload is prefixed with the sequence header and the counter in
     *       {@code kafkaSinkState} is advanced.</li>
     * </ul>
     *
     * @param obj            event payload, a {@link String} or a {@link ByteBuffer}
     * @param dynamicOptions per-event values for the dynamic options
     * @param kafkaSinkState state holding the sequence counter
     * @throws ConnectionUnavailableException declared by the sink contract; not raised by this code itself
     * @throws NumberFormatException          if the resolved partition is not an integer
     * @throws ClassCastException             if {@code obj} is neither a String nor a ByteBuffer
     */
    @Override
    public void publish(Object obj, DynamicOptions dynamicOptions, KafkaSinkState kafkaSinkState)
            throws ConnectionUnavailableException {
        String topic = this.topicOption.getValue(dynamicOptions);
        String partitionNo = this.partitionOption.getValue(dynamicOptions);
        String key = this.keyOption.getValue(dynamicOptions);

        // Resolve the partition before building the payload: a malformed dynamic value must fail before a
        // sequence number is consumed, otherwise receivers would see a gap for a message never sent.
        Integer partition = partitionNo == null ? null : Integer.valueOf(partitionNo);

        Object payload = buildPayload(obj, kafkaSinkState);
        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, partition, key, payload);
        this.producer.send(record);
    }

    /** Converts the event payload into the String or byte[] value handed to the producer. */
    private Object buildPayload(Object obj, KafkaSinkState kafkaSinkState) {
        boolean sequenced = this.isSequenced.booleanValue();

        if (!(obj instanceof String)) {
            // array() hands over the whole backing array, independent of the buffer's position and limit.
            byte[] bytes = ((ByteBuffer) obj).array();
            return sequenced ? sequenceBinary(bytes, kafkaSinkState) : bytes;
        }

        String text = (String) obj;
        if (this.isBinaryMessage.booleanValue()) {
            byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
            return sequenced ? sequenceBinary(bytes, kafkaSinkState) : bytes;
        }
        if (!sequenced) {
            return text;
        }
        String stamped = this.sequenceId + SEQ_NO_HEADER_FIELD_SEPERATOR
                + kafkaSinkState.lastSentSequenceNo.get() + SEQ_NO_HEADER_DELIMITER + text;
        kafkaSinkState.lastSentSequenceNo.incrementAndGet();
        return stamped;
    }

    /** Prefixes a binary payload with the sequence header and advances the sequence counter. */
    private byte[] sequenceBinary(byte[] bytes, KafkaSinkState kafkaSinkState) {
        byte[] framed = getSequencedBinaryPayloadToSend(bytes, kafkaSinkState);
        kafkaSinkState.lastSentSequenceNo.incrementAndGet();
        return framed;
    }

    /**
     * Creates the Kafka producer used by this sink.
     *
     * <p>A fixed set of producer defaults is applied first; the entries of the optional configuration are
     * applied afterwards. The value serializer is the byte-array serializer when the binary flag is set,
     * the string serializer otherwise.
     *
     * @throws ConnectionUnavailableException declared by the sink contract; not raised by this code itself
     */
    public void connect() throws ConnectionUnavailableException {
        String valueSerializer = this.isBinaryMessage.booleanValue()
                ? "org.apache.kafka.common.serialization.ByteArraySerializer"
                : "org.apache.kafka.common.serialization.StringSerializer";

        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", this.bootstrapServers);
        producerConfig.put("acks", "all");
        producerConfig.put("retries", 0);
        producerConfig.put("batch.size", 16384);
        producerConfig.put("linger.ms", 1);
        producerConfig.put("buffer.memory", 33554432);
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put("value.serializer", valueSerializer);

        // Applied last, after all the defaults above have been set.
        KafkaIOUtils.splitHeaderValues(this.optionalConfigs, producerConfig);

        this.producer = new KafkaProducer<>(producerConfig);
        LOG.info("Kafka producer created.");
    }

    /**
     * Closes the producer if one has been created; does nothing otherwise.
     */
    public void disconnect() {
        if (this.producer == null) {
            return;
        }
        this.producer.close();
    }

    /**
     * Intentionally empty: this method releases nothing; the producer is closed in disconnect().
     */
    public void destroy() {
        // nothing to release here
    }

    /**
     * Lists the payload types this sink accepts.
     *
     * @return {@code String} and {@code ByteBuffer}, in that order
     */
    public Class[] getSupportedInputEventClasses() {
        Class[] supported = {String.class, ByteBuffer.class};
        return supported;
    }

    /**
     * This sink exposes no service deployment information.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        // no deployment information is provided by this sink
        return null;
    }

    /**
     * Names the options whose values may be resolved per event.
     *
     * @return the topic, partition and key option names, in that order
     */
    public String[] getSupportedDynamicOptions() {
        String[] dynamicOptionNames = {KAFKA_PUBLISH_TOPIC, KAFKA_PARTITION_NO, KAFKA_MESSAGE_KEY};
        return dynamicOptionNames;
    }

    /**
     * Frames a binary payload with the sequence header.
     *
     * <p>Layout of the result: a 4-byte big-endian length of the encoded header, the header
     * ({@code <sequenceId>:<sequenceNo>~}) and then the payload bytes. The sequence number is read from
     * {@code kafkaSinkState} but not advanced here; the caller increments it.
     *
     * <p>The length prefix and the buffer size are derived from the encoded header bytes, not from the
     * character count, so a sequence id that encodes to more than one byte per character no longer
     * overflows the buffer.
     *
     * @param bArr           payload bytes to append after the header
     * @param kafkaSinkState state providing the current sequence number
     * @return a new array containing length prefix, header and payload
     */
    public byte[] getSequencedBinaryPayloadToSend(byte[] bArr, KafkaSinkState kafkaSinkState) {
        String header = this.sequenceId + SEQ_NO_HEADER_FIELD_SEPERATOR
                + kafkaSinkState.lastSentSequenceNo.get() + SEQ_NO_HEADER_DELIMITER;
        // NOTE(review): the platform default charset is kept because the decoding side is not visible
        // here; confirm what the receiver uses before switching to a fixed charset.
        byte[] headerBytes = header.getBytes(Charset.defaultCharset());

        ByteBuffer framed = ByteBuffer.allocate(Integer.BYTES + headerBytes.length + bArr.length);
        framed.putInt(headerBytes.length);
        framed.put(headerBytes);
        framed.put(bArr);
        return framed.array();
    }
}
