package io.siddhi.extension.io.googlepubsub.source;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PushConfig;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.googlepubsub.util.GooglePubSubConstants;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Siddhi source extension that consumes messages from a Google Cloud Pub/Sub topic.
 *
 * <p>Lifecycle as implemented here:
 * <ul>
 *   <li>{@link #init} reads the static options, builds the topic/subscription names and loads the
 *       service account credentials from the configured file;</li>
 *   <li>{@link #connect} asks the Pub/Sub admin API to create the subscription (an
 *       {@code ALREADY_EXISTS} answer is treated as success) and then starts a {@link Subscriber}
 *       that hands incoming messages to a {@link GooglePubSubMessageReceiver};</li>
 *   <li>{@link #disconnect} stops that subscriber and waits for it to terminate.</li>
 * </ul>
 */
@Extension(
        name = "googlepubsub",
        namespace = "source",
        description = "The GooglePubSub source receives events to be processed by Siddhi from a topic in a GooglePubSub server. Here, a subscriber client creates a subscription to that topic and consumes messages via the subscription. The subscription applications receive only the messages that are published after the subscription is created. A subscription connects a topic to a subscriber application, enabling the application to receive and process messages from that topic. A topic can have multiple subscriptions, but a given subscription belongs only to a single topic.",
        parameters = {
                @Parameter(name = GooglePubSubConstants.GOOGLE_PUB_SUB_SERVER_PROJECT_ID, description = "The unique ID of the GCP console project within which the topic is created.", type = {DataType.STRING}),
                @Parameter(name = GooglePubSubConstants.TOPIC_ID, description = "The unique ID of the topic from which the messages are received.", type = {DataType.STRING}),
                @Parameter(name = GooglePubSubConstants.SUBSCRIPTION_ID, description = "The unique ID of the subscription from which messages must be retrieved.", type = {DataType.STRING}),
                @Parameter(name = GooglePubSubConstants.CREDENTIAL_PATH, description = "The file path of the service account credentials.", type = {DataType.STRING})
        },
        examples = {
                @Example(syntax = "@source(type='googlepubsub',@map(type='text'),\ntopic.id='topicA',\nproject.id='sp-path-1547649404768',\ncredential.path = 'src/test/resources/security/sp.json',\nsubscription.id='subA',\n)\ndefine stream OutputStream(message String);", description = "This query shows how to subscribe to a googlepubsub topic. Here, a googlepubsub source subscribes to the 'topicA' topic that resides in the 'sp-path-1547649404768' project within a googlepubsub instance. The events are received in the text format, mapped to a Siddhi event, and then sent to a stream named OutputStream.")
        })
public class GooglePubSubSource extends Source<State> {
    private static final Logger log = LogManager.getLogger(GooglePubSubSource.class);

    /** Acknowledgement deadline, in seconds, requested when the subscription is created. */
    private static final int ACK_DEADLINE_SECONDS = 10;

    private String streamID;
    private String siddhiAppName;
    private GooglePubSubMessageReceiver googlePubSubMessageReceiver;
    private Subscriber subscriber;
    private ProjectTopicName topicName;
    private ProjectSubscriptionName subscriptionName;
    private GoogleCredentials credentials;
    private String projectId;
    private String topicId;

    /**
     * Reads the source options and loads the service account credentials.
     *
     * @param sourceEventListener listener handed to the message receiver; also supplies the stream id
     * @param optionHolder        holder of the static options (subscription id, topic id, project id,
     *                            credential path)
     * @param strArr              not used by this source
     * @param configReader        not used by this source
     * @param siddhiAppContext    supplies the Siddhi application name used in error messages
     * @return always {@code null}; this source does not provide a state factory
     * @throws SiddhiAppCreationException if the credential file cannot be read or parsed
     */
    @Override
    public StateFactory<State> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.streamID = sourceEventListener.getStreamDefinition().getId();
        this.siddhiAppName = siddhiAppContext.getName();
        String subscriptionId = optionHolder.validateAndGetStaticValue(GooglePubSubConstants.SUBSCRIPTION_ID);
        this.topicId = optionHolder.validateAndGetStaticValue(GooglePubSubConstants.TOPIC_ID);
        this.projectId = optionHolder.validateAndGetStaticValue(GooglePubSubConstants.GOOGLE_PUB_SUB_SERVER_PROJECT_ID);
        String credentialPath = optionHolder.validateAndGetStaticValue(GooglePubSubConstants.CREDENTIAL_PATH);
        this.googlePubSubMessageReceiver = new GooglePubSubMessageReceiver(sourceEventListener);
        this.topicName = ProjectTopicName.of(this.projectId, this.topicId);
        this.subscriptionName = ProjectSubscriptionName.of(this.projectId, subscriptionId);
        // Close the credential file explicitly instead of relying on the credentials parser to do it.
        try (InputStream credentialStream = new FileInputStream(new File(credentialPath))) {
            this.credentials = ServiceAccountCredentials.fromStream(credentialStream);
        } catch (IOException e) {
            throw new SiddhiAppCreationException("The file that contains your service account credentials is not found or you are not permitted to make authenticated calls.Check the credential.path '" + credentialPath + "' defined in stream " + this.siddhiAppName + ": " + this.streamID, e);
        }
        return null;
    }

    /**
     * @return a single-element array containing {@code String.class}
     */
    @Override
    public Class[] getOutputEventClasses() {
        return new Class[]{String.class};
    }

    /**
     * @return always {@code null}; no service deployment information is exposed
     */
    @Override
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Ensures the subscription exists, then starts a subscriber on it and blocks until the
     * subscriber reports that it is running.
     *
     * @param connectionCallback not used by this source
     * @param state              not used by this source
     * @throws ConnectionUnavailableException if the subscription cannot be created for any reason
     *                                        other than it already existing
     */
    @Override
    public void connect(Source<State>.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        createSubscriptionIfAbsent();
        this.subscriber = Subscriber.newBuilder(this.subscriptionName, this.googlePubSubMessageReceiver).setCredentialsProvider(FixedCredentialsProvider.create(this.credentials)).build();
        // NOTE(review): a start-up failure surfaces here as an unchecked exception rather than
        // ConnectionUnavailableException — confirm that is the intended behavior.
        this.subscriber.startAsync().awaitRunning();
    }

    /**
     * Creates the configured subscription through a short-lived admin client, which is always shut
     * down before returning. An {@code ALREADY_EXISTS} response is logged and treated as success.
     *
     * @throws ConnectionUnavailableException if the admin client cannot be created or the API
     *                                        rejects the request with any other status
     */
    private void createSubscriptionIfAbsent() throws ConnectionUnavailableException {
        SubscriptionAdminClient adminClient = null;
        try {
            adminClient = SubscriptionAdminClient.create(SubscriptionAdminSettings.newBuilder().setCredentialsProvider(FixedCredentialsProvider.create(this.credentials)).build());
            adminClient.createSubscription(this.subscriptionName, this.topicName, PushConfig.getDefaultInstance(), ACK_DEADLINE_SECONDS);
        } catch (ApiException e) {
            if (e.getStatusCode().getCode() != StatusCode.Code.ALREADY_EXISTS) {
                log.error("Error in connecting to the resources at " + this.siddhiAppName + ": " + this.streamID);
                throw new ConnectionUnavailableException("An error is caused due to resource " + e.getStatusCode().getCode() + ".Check whether you have provided a proper project.id for '" + this.projectId + "' or existing topic.id for '" + this.topicId + "' defined in stream " + this.siddhiAppName + ": " + this.streamID + " and make sure you have enough access to use all resources in API.", e);
            }
            // The subscription is already there, so there is nothing to create.
            log.info("You have a subscription " + this.subscriptionName + " to the topic " + this.topicName);
        } catch (IOException e) {
            throw new ConnectionUnavailableException("Could not create a subscription " + this.subscriptionName + " to pull messages from the google pub sub server defined in stream " + this.siddhiAppName + ": " + this.streamID, e);
        } finally {
            // Single release point for the admin client on every exit path.
            if (adminClient != null) {
                adminClient.shutdown();
            }
        }
    }

    /**
     * Stops the subscriber, if one was started, and waits until it has terminated.
     */
    @Override
    public void disconnect() {
        if (this.subscriber != null) {
            this.subscriber.stopAsync().awaitTerminated();
        }
    }

    /**
     * No-op; this source holds nothing that needs releasing beyond {@link #disconnect()}.
     */
    @Override
    public void destroy() {
    }

    /**
     * Delegates to {@link GooglePubSubMessageReceiver#pause()}.
     */
    @Override
    public void pause() {
        this.googlePubSubMessageReceiver.pause();
    }

    /**
     * Delegates to {@link GooglePubSubMessageReceiver#resume()}.
     */
    @Override
    public void resume() {
        this.googlePubSubMessageReceiver.resume();
    }
}
