package io.siddhi.extension.io.kafka.sink;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.Event;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.kafka.Constants;
import io.siddhi.extension.io.kafka.util.KafkaReplayResponseSourceRegistry;
import io.siddhi.query.api.definition.StreamDefinition;

@Extension(name = "kafka-replay-request", namespace = "sink", description = "This sink is used to request replay of specific range of events on a specified partition of a topic.", parameters = {@Parameter(name = Constants.SINK_ID, description = "a unique SINK_ID should be set. This sink id will be used to match with the appropriate kafka-replay-response source", type = {DataType.STRING})}, examples = {@Example(syntax = "@App:name('TestKafkaReplay')\n\n@sink(type='kafka-replay-request', sink.id='1')\ndefine stream BarStream (topicForReplay string, partitionForReplay string, startOffset string, endOffset string);\n\n@info(name = 'query1')\n@source(type='kafka-replay-response',  group.id='group', threading.option='single.thread', bootstrap.servers='localhost:9092', sink.id='1',\n@map(type='json'))\nDefine stream FooStream (symbol string, amount double);\n\n@sink(type='log')\nDefine stream logStream(symbol string, amount double);\n\nfrom FooStream select * insert into logStream;", description = "In this app we can send replay request events into BarStream and observe the replayed events in the logStream")})
/* loaded from: input_file:io/siddhi/extension/io/kafka/sink/KafkaReplayRequestSink.class */
public class KafkaReplayRequestSink extends Sink {
    private String sinkID;

    public Class[] getSupportedInputEventClasses() {
        return new Class[]{String.class, Event.class};
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    public String[] getSupportedDynamicOptions() {
        return new String[0];
    }

    protected StateFactory init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sinkID = optionHolder.validateAndGetOption(Constants.SINK_ID).getValue();
        return null;
    }

    public void publish(Object obj, DynamicOptions dynamicOptions, State state) throws ConnectionUnavailableException {
        Object[] data;
        if (obj instanceof Event[]) {
            data = ((Event[]) obj)[0].getData();
        } else {
            if (!(obj instanceof Event)) {
                throw new ConnectionUnavailableException("Unknown type");
            }
            data = ((Event) obj).getData();
        }
        String str = (String) data[0];
        KafkaReplayResponseSourceRegistry.getInstance().getKafkaReplayResponseSource(this.sinkID).onReplayRequest((String) data[1], (String) data[2], (String) data[3], str);
    }

    public void connect() throws ConnectionUnavailableException {
    }

    public void disconnect() {
    }

    public void destroy() {
    }
}
