package org.springframework.pulsar.function;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.functions.Utils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.core.PulsarAdministration;

/* loaded from: input_file:org/springframework/pulsar/function/PulsarFunctionAdministration.class */
public class PulsarFunctionAdministration implements SmartLifecycle {
    private final PulsarAdministration pulsarAdministration;
    private final ObjectProvider<PulsarFunction> pulsarFunctions;
    private final ObjectProvider<PulsarSink> pulsarSinks;
    private final ObjectProvider<PulsarSource> pulsarSources;
    private final boolean failFast;
    private final boolean propagateFailures;
    private final boolean propagateStopFailures;
    private volatile boolean running;
    private final LogAccessor logger = new LogAccessor(getClass());
    private final List<PulsarFunctionOperations<?>> processedFunctions = new ArrayList();

    /* loaded from: input_file:org/springframework/pulsar/function/PulsarFunctionAdministration$PulsarFunctionException.class */
    public static class PulsarFunctionException extends PulsarException {
        private final Map<PulsarFunctionOperations<?>, Exception> failures;

        public PulsarFunctionException(String str, Map<PulsarFunctionOperations<?>, Exception> map) {
            super(str);
            this.failures = map;
        }

        public Map<PulsarFunctionOperations<?>, Exception> getFailures() {
            return this.failures;
        }
    }

    public PulsarFunctionAdministration(PulsarAdministration pulsarAdministration, ObjectProvider<PulsarFunction> objectProvider, ObjectProvider<PulsarSink> objectProvider2, ObjectProvider<PulsarSource> objectProvider3, boolean z, boolean z2, boolean z3) {
        this.pulsarAdministration = pulsarAdministration;
        this.pulsarFunctions = objectProvider;
        this.pulsarSinks = objectProvider2;
        this.pulsarSources = objectProvider3;
        this.failFast = z;
        this.propagateFailures = z2;
        this.propagateStopFailures = z3;
    }

    public synchronized void start() {
        if (this.running) {
            return;
        }
        this.logger.debug(() -> {
            return "Processing Pulsar Functions";
        });
        long currentTimeMillis = System.currentTimeMillis();
        createOrUpdateUserDefinedFunctions();
        this.running = true;
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        this.logger.debug(() -> {
            return "Processed Pulsar Functions in " + currentTimeMillis2 + " ms";
        });
    }

    public synchronized void stop() {
        if (this.running) {
            this.logger.debug(() -> {
                return "Enforcing stop policy on Pulsar Functions";
            });
            this.running = false;
            long currentTimeMillis = System.currentTimeMillis();
            enforceStopPolicyOnUserDefinedFunctions();
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            this.logger.debug(() -> {
                return "Enforced stop policy on Pulsar Functions in " + currentTimeMillis2 + " ms";
            });
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public void createOrUpdateUserDefinedFunctions() {
        List<PulsarFunctionOperations<?>> list = Stream.concat(Stream.concat(this.pulsarFunctions.orderedStream(), this.pulsarSinks.orderedStream()), this.pulsarSources.orderedStream()).toList();
        if (list.isEmpty()) {
            this.logger.debug("No user defined functions to process.");
            return;
        }
        try {
            PulsarAdmin createAdminClient = this.pulsarAdministration.createAdminClient();
            try {
                LinkedHashMap linkedHashMap = new LinkedHashMap();
                for (PulsarFunctionOperations<?> pulsarFunctionOperations : list) {
                    Optional<Exception> createOrUpdateFunction = createOrUpdateFunction(pulsarFunctionOperations, createAdminClient);
                    if (!createOrUpdateFunction.isEmpty()) {
                        linkedHashMap.put(pulsarFunctionOperations, createOrUpdateFunction.get());
                        if (this.failFast) {
                            break;
                        }
                    } else {
                        this.processedFunctions.add(pulsarFunctionOperations);
                    }
                }
                if (!linkedHashMap.isEmpty()) {
                    String str = "Encountered " + linkedHashMap.size() + " error(s) creating/updating functions: " + linkedHashMap;
                    if (this.propagateFailures) {
                        throw new PulsarFunctionException(str, linkedHashMap);
                    }
                    this.logger.error(() -> {
                        return str;
                    });
                }
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
            } finally {
            }
        } catch (PulsarClientException e) {
            String str2 = "Unable to create/update functions - could not create PulsarAdmin: " + e.getMessage();
            if (this.propagateFailures) {
                throw new PulsarException(str2, e);
            }
            this.logger.error(e, () -> {
                return str2;
            });
        }
    }

    private Optional<Exception> createOrUpdateFunction(PulsarFunctionOperations<?> pulsarFunctionOperations, PulsarAdmin pulsarAdmin) {
        try {
            boolean isFunctionPackageUrlSupported = Utils.isFunctionPackageUrlSupported(pulsarFunctionOperations.archive());
            if (pulsarFunctionOperations.functionExists(pulsarAdmin)) {
                if (isFunctionPackageUrlSupported) {
                    this.logger.info(() -> {
                        return buildLogMsg(pulsarFunctionOperations, true, true);
                    });
                    pulsarFunctionOperations.updateWithUrl(pulsarAdmin);
                } else {
                    this.logger.info(() -> {
                        return buildLogMsg(pulsarFunctionOperations, true, false);
                    });
                    pulsarFunctionOperations.update(pulsarAdmin);
                }
            } else if (isFunctionPackageUrlSupported) {
                this.logger.info(() -> {
                    return buildLogMsg(pulsarFunctionOperations, false, true);
                });
                pulsarFunctionOperations.createWithUrl(pulsarAdmin);
            } else {
                this.logger.info(() -> {
                    return buildLogMsg(pulsarFunctionOperations, false, false);
                });
                pulsarFunctionOperations.create(pulsarAdmin);
            }
            return Optional.empty();
        } catch (Exception e) {
            return Optional.of(e);
        } catch (PulsarAdminException e2) {
            if (e2.getStatusCode() != 400 || !"Update contains no change".equals(e2.getHttpError())) {
                return Optional.of(e2);
            }
            this.logger.debug(() -> {
                return "Update contained no change for " + functionDesc(pulsarFunctionOperations);
            });
            return Optional.empty();
        }
    }

    private String buildLogMsg(PulsarFunctionOperations<?> pulsarFunctionOperations, boolean z, boolean z2) {
        Object[] objArr = new Object[4];
        objArr[0] = z ? "Updating" : "Creating";
        objArr[1] = functionDesc(pulsarFunctionOperations);
        objArr[2] = z2 ? "url" : "local";
        objArr[3] = pulsarFunctionOperations.archive();
        return "%s %s (using %s archive: %s)".formatted(objArr);
    }

    List<PulsarFunctionOperations<?>> getProcessedFunctions() {
        return this.processedFunctions;
    }

    public void enforceStopPolicyOnUserDefinedFunctions() {
        if (this.processedFunctions.isEmpty()) {
            this.logger.debug("No processed functions to enforce stop policy on");
            return;
        }
        try {
            PulsarAdmin createAdminClient = this.pulsarAdministration.createAdminClient();
            try {
                LinkedHashMap linkedHashMap = new LinkedHashMap();
                Collections.reverse(this.processedFunctions);
                for (PulsarFunctionOperations<?> pulsarFunctionOperations : this.processedFunctions) {
                    enforceStopPolicyOnFunction(pulsarFunctionOperations, createAdminClient).ifPresent(exc -> {
                        linkedHashMap.put(pulsarFunctionOperations, exc);
                    });
                }
                if (!linkedHashMap.isEmpty()) {
                    String str = "Encountered " + linkedHashMap.size() + " error(s) enforcing stop policy on functions: " + linkedHashMap;
                    if (this.propagateStopFailures) {
                        throw new PulsarFunctionException(str, linkedHashMap);
                    }
                    this.logger.error(() -> {
                        return str;
                    });
                }
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
            } finally {
            }
        } catch (PulsarClientException e) {
            String str2 = "Unable to enforce stop policy on functions - could not create PulsarAdmin: " + e.getMessage();
            if (this.propagateStopFailures) {
                throw new PulsarException(str2, e);
            }
            this.logger.error(e, () -> {
                return str2;
            });
        }
    }

    private Optional<Exception> enforceStopPolicyOnFunction(PulsarFunctionOperations<?> pulsarFunctionOperations, PulsarAdmin pulsarAdmin) {
        switch (pulsarFunctionOperations.stopPolicy()) {
            case NONE:
                this.logger.info(() -> {
                    return "No stop policy for %s - leaving alone".formatted(functionDesc(pulsarFunctionOperations));
                });
                return Optional.empty();
            case STOP:
                this.logger.info(() -> {
                    return "Stopping %s".formatted(functionDesc(pulsarFunctionOperations));
                });
                return safeInvoke(() -> {
                    pulsarFunctionOperations.stop(pulsarAdmin);
                });
            case DELETE:
                this.logger.info(() -> {
                    return "Deleting %s".formatted(functionDesc(pulsarFunctionOperations));
                });
                return safeInvoke(() -> {
                    pulsarFunctionOperations.delete(pulsarAdmin);
                });
            default:
                throw new IncompatibleClassChangeError();
        }
    }

    private Optional<Exception> safeInvoke(Runnable runnable) {
        try {
            runnable.run();
            return Optional.empty();
        } catch (Exception e) {
            return Optional.of(e);
        }
    }

    private String functionDesc(PulsarFunctionOperations<?> pulsarFunctionOperations) {
        return "'%s' %s".formatted(pulsarFunctionOperations.name(), pulsarFunctionOperations.type().toString().toLowerCase());
    }
}
