package com.appiancorp.portaldesigner.messaging;

import com.appian.dl.replicator.ReplicationState;
import com.appian.dl.replicator.Sink;
import com.appian.kafka.KafkaConsumerProcessor;
import com.appian.kafka.KafkaTopicManager;
import com.appiancorp.ix.analysis.IaReplicator;
import com.appiancorp.ix.analysis.index.IaType;
import com.appiancorp.ix.analysis.index.TypedUuid;
import com.appiancorp.portal.persistence.PortalService;
import com.appiancorp.portaldesigner.manager.AffectedPortalsCalculationMessageSender;
import com.appiancorp.portaldesigner.manager.AffectedPortalsMessageType;
import com.appiancorp.portaldesigner.messaging.metrics.PortalBulkLoadKafkaMetricsCollector;
import com.appiancorp.security.auth.SecurityEscalator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appiancorp/portaldesigner/messaging/PortalBulkLoadKafkaConsumer.class */
/**
 * Kafka consumer for the Portal bulk load topic.
 *
 * <p>Each batch of {@link PortalBulkLoadMessageToken}s is treated as a single request to bulk load
 * Portal data: the number of tokens is only used for logging and for the count reported back to
 * the caller. A batch is handled as follows:
 * <ol>
 *   <li>If any source of the "designer-objects-ia" sink has a replication state without a latest
 *       replicated transaction id, the consumer sleeps and reports zero processed messages.</li>
 *   <li>Otherwise, if there is at least one Portal and the previous kickoff was not too recent,
 *       an Affected Portals recalculation is requested for every Portal.</li>
 * </ol>
 *
 * <p>NOTE(review): {@code lastPortalBulkLoadKickoffTimestamp} is a plain mutable field; this class
 * assumes {@code processMessages} is not invoked concurrently on the same instance - confirm
 * against the Kafka consumer framework.
 */
public class PortalBulkLoadKafkaConsumer implements KafkaConsumerProcessor<PortalBulkLoadMessageToken> {
    static final String FAILED_TO_FIND_IA_SINK_ERROR = "Failed to find a Sink in the IaReplicator with the expected key";
    private static final long MILLIS_TO_SLEEP_WHEN_IA_BULK_LOAD_IS_IN_PROGRESS = TimeUnit.SECONDS.toMillis(30);
    static final long MINIMUM_MILLIS_BETWEEN_PORTAL_BULK_LOADS = TimeUnit.MINUTES.toMillis(2);
    // Deliberately left non-final and boxed to keep the existing package-private contract;
    // presumably adjusted by tests - TODO confirm before tightening.
    static Long MAX_WAIT_TIME_BEFORE_STARTING_TO_PROCESS = 1000L;
    public static final String PORTAL_BULK_LOAD_CONSUMER_GROUP = "PORTAL_BULK_LOAD_CONSUMER_GROUP";
    private static final Logger LOGGER = Logger.getLogger(PortalBulkLoadKafkaConsumer.class);

    private final PortalBulkLoadKafkaMessageHandler messageHandler;
    private final PortalService portalService;
    private final AffectedPortalsCalculationMessageSender affectedPortalsCalculationMessageSender;
    private final IaReplicator iaReplicator;
    private final SecurityEscalator securityEscalator;
    private final Supplier<Long> currentTimeProvider;
    /** Value of {@code currentTimeProvider} at the last successful kickoff; 0 until the first one. */
    private long lastPortalBulkLoadKickoffTimestamp = 0;

    /** Identifies an IA bulk load that is still running: the replicating node and the source key. */
    public static class InProgressBulkLoad {
        public final String nodeName;
        public final String sourceKey;

        public InProgressBulkLoad(String nodeName, String sourceKey) {
            this.nodeName = nodeName;
            this.sourceKey = sourceKey;
        }
    }

    /**
     * Stores the collaborators and registers this instance as a queue consumer of the Portal bulk
     * load topic under {@link #PORTAL_BULK_LOAD_CONSUMER_GROUP}.
     *
     * @param portalBulkLoadKafkaMessageHandler used to serialize a token when sizing fetch requests
     * @param kafkaTopicManager manager this consumer registers itself with
     * @param portalService source of Portal UUIDs and publish info
     * @param affectedPortalsCalculationMessageSender sender used to request the recalculation
     * @param iaReplicator replicator inspected for in-progress IA bulk loads
     * @param securityEscalator runs message processing as admin
     * @param supplier provider of the current time in milliseconds
     */
    public PortalBulkLoadKafkaConsumer(PortalBulkLoadKafkaMessageHandler portalBulkLoadKafkaMessageHandler, KafkaTopicManager kafkaTopicManager, PortalService portalService, AffectedPortalsCalculationMessageSender affectedPortalsCalculationMessageSender, IaReplicator iaReplicator, SecurityEscalator securityEscalator, Supplier<Long> supplier) {
        this.messageHandler = portalBulkLoadKafkaMessageHandler;
        this.portalService = portalService;
        this.affectedPortalsCalculationMessageSender = affectedPortalsCalculationMessageSender;
        this.iaReplicator = iaReplicator;
        this.securityEscalator = securityEscalator;
        this.currentTimeProvider = supplier;
        // Registration must stay last: it hands out `this`, so every field has to be assigned first.
        // NOTE(review): the meaning of the numeric arguments (10, 100, 1000.0) is defined by
        // KafkaTopicManager.registerQueueConsumer, which is not visible here.
        kafkaTopicManager.registerQueueConsumer(PortalBulkLoadKafkaTopic.PORTAL_BULK_LOAD_TOPIC_NAME, PortalBulkLoadKafkaTopic.PORTAL_BULK_LOAD_TOPIC_NAME, 10, () -> {
            return 100L;
        }, () -> {
            return Double.valueOf(TimeUnit.SECONDS.toMillis(1L));
        }, PortalBulkLoadKafkaMetricsCollector.PORTAL_BULK_LOAD_KAFKA_METRICS_COLLECTOR, this, PORTAL_BULK_LOAD_CONSUMER_GROUP);
    }

    /**
     * Handles a batch of bulk load requests as admin.
     *
     * @param list the received tokens; only the size of the list is used
     * @return the number of messages considered processed (0 when the batch must be retried)
     */
    public int processMessages(List<PortalBulkLoadMessageToken> list) {
        // The cast is kept because the declared return type of runAsAdmin is not visible here.
        return ((Integer) this.securityEscalator.runAsAdmin(() -> {
            return Integer.valueOf(processMessagesInternal(list.size()));
        })).intValue();
    }

    /** Logs a warning with the number of messages that are being dead-lettered. */
    public void onDeadLetteringDataItems(List<PortalBulkLoadMessageToken> list) {
        LOGGER.warn("Portal Bulk Load Topic: Failed to process messages and is committing " + list.size() + " messages to unblock the queue");
    }

    /** @return the token class this consumer handles */
    public Class<PortalBulkLoadMessageToken> getSupportedMessageType() {
        return PortalBulkLoadMessageToken.class;
    }

    /**
     * Sets {@code fetch.min.bytes} to the serialized length of a single V1 token and
     * {@code fetch.max.wait.ms} to {@link #MAX_WAIT_TIME_BEFORE_STARTING_TO_PROCESS}, then delegates
     * to the interface's default implementation.
     *
     * @param properties consumer properties to adjust in place
     * @throws IllegalStateException if anything in this method fails
     */
    public void overrideConsumerProperties(Properties properties) {
        try {
            properties.setProperty("fetch.min.bytes", String.valueOf(this.messageHandler.toKafkaMessageValue(PortalBulkLoadMessageToken.fromV1()).length));
            properties.setProperty("fetch.max.wait.ms", String.valueOf(MAX_WAIT_TIME_BEFORE_STARTING_TO_PROCESS));
            // An interface default method must be invoked through the qualified super form;
            // a bare `super.` would resolve against Object and fail to compile.
            KafkaConsumerProcessor.super.overrideConsumerProperties(properties);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to compute the size of a Portal Bulk Load message", e);
        }
    }

    /**
     * Core handling of a batch of {@code i} bulk load requests.
     *
     * @param i number of requests in the batch
     * @return 0 if an IA bulk load is in progress (after sleeping), otherwise {@code i}
     */
    int processMessagesInternal(int i) {
        LOGGER.info(String.format("Starting to handle %1$d request(s) to bulk load Portal data", i));
        InProgressBulkLoad inProgress = findFirstInProgressBulkLoad();
        if (inProgress != null) {
            sleepDueToInProgressBulkLoad(inProgress);
            LOGGER.info("Returning control to Kafka after sleeping so that bulk load requests are retried");
            return 0;
        }
        List<String> allPortalUuids = this.portalService.getAllPortalUuids();
        // Throttled or empty batches are still reported as processed so they are not retried.
        if (!allPortalUuids.isEmpty() && !isThrottled()) {
            kickOffPortalBulkLoad(allPortalUuids);
            // Only recorded after a successful kickoff; a failure above propagates first.
            this.lastPortalBulkLoadKickoffTimestamp = this.currentTimeProvider.get().longValue();
        }
        LOGGER.info(String.format("Finished handling %1$d request(s) to bulk load Portal data", i));
        return i;
    }

    /**
     * Requests an Affected Portals recalculation ({@code upsert}) for the given Portals.
     *
     * @param list UUIDs of the Portals to bulk load
     * @throws IllegalStateException if building or sending the request fails; the exception metric
     *         is incremented first
     */
    void kickOffPortalBulkLoad(List<String> list) {
        LOGGER.info(String.format("Starting to kick off a Portal bulk load of %1$d Portals", list.size()));
        Set<TypedUuid> portalTypedUuids = list.stream()
            .map(portalUuid -> new TypedUuid(IaType.PORTAL, portalUuid))
            .collect(Collectors.toSet());
        try {
            // NOTE(review): Collectors.toMap rejects duplicate keys and null values, so a Portal
            // with a null target tag would fail here and be reported via the catch below.
            this.affectedPortalsCalculationMessageSender.calculateAffectedPortals(
                AffectedPortalsMessageType.upsert,
                portalTypedUuids,
                "SYSTEM_ADMINISTRATOR_USER0",
                this.currentTimeProvider.get(),
                list.stream()
                    .map(this.portalService::getPublishInfoByPortalUuid)
                    .collect(Collectors.toMap(
                        publishInfo -> publishInfo.getPortalUuid(),
                        publishInfo -> publishInfo.getTargetTag())));
            LOGGER.info(String.format("Finished kicking off a Portal bulk load of %1$d Portal(s)", list.size()));
        } catch (Exception e) {
            PortalBulkLoadKafkaMetricsCollector.PORTAL_BULK_LOAD_KAFKA_METRICS_COLLECTOR.incrementPortalBulkLoadExceptions();
            throw new IllegalStateException("Failed to request Affected Portals recalculation", e);
        }
    }

    /**
     * Looks for a source whose replication state exists but has no latest replicated transaction
     * id, which this class treats as an IA bulk load still in progress.
     *
     * @return the first such bulk load, or {@code null} if none is found
     * @throws IllegalStateException if the IA sink cannot be found
     */
    InProgressBulkLoad findFirstInProgressBulkLoad() {
        Sink iaSink = getIaSink(this.iaReplicator);
        for (String sourceKey : this.iaReplicator.getSourceKeys()) {
            ReplicationState state = iaSink.getReplicationState(sourceKey);
            if (state != null && state.getLatestReplicatedTxnId() == null) {
                return new InProgressBulkLoad(state.getReplicatingServer(), sourceKey);
            }
        }
        return null;
    }

    /**
     * Finds the sink keyed "designer-objects-ia" in the given replicator.
     *
     * @param iaReplicator replicator whose sinks are searched
     * @return the first sink with the expected key
     * @throws IllegalStateException with {@link #FAILED_TO_FIND_IA_SINK_ERROR} if there is none
     */
    Sink getIaSink(IaReplicator iaReplicator) {
        return iaReplicator.getSinks().stream()
            .filter(sink -> "designer-objects-ia".equals(sink.getKey()))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException(FAILED_TO_FIND_IA_SINK_ERROR));
    }

    /**
     * Blocks the calling thread for a fixed interval while an IA bulk load is in progress.
     *
     * @param inProgressBulkLoad the bulk load being waited on; used for logging only
     * @throws IllegalStateException if the thread is interrupted while sleeping; the interrupt
     *         flag is restored and the exception metric incremented before throwing
     */
    void sleepDueToInProgressBulkLoad(InProgressBulkLoad inProgressBulkLoad) {
        try {
            LOGGER.info(String.format("Sleeping for %1$d msec before triggering Portal bulk load, due to an in-progress IA bulk load from source \"%2$s\" on node \"%3$s\".", MILLIS_TO_SLEEP_WHEN_IA_BULK_LOAD_IS_IN_PROGRESS, inProgressBulkLoad.sourceKey, inProgressBulkLoad.nodeName));
            Thread.sleep(MILLIS_TO_SLEEP_WHEN_IA_BULK_LOAD_IS_IN_PROGRESS);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws; restore it so code further
            // up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            PortalBulkLoadKafkaMetricsCollector.PORTAL_BULK_LOAD_KAFKA_METRICS_COLLECTOR.incrementPortalBulkLoadExceptions();
            throw new IllegalStateException("Got interrupted while waiting during a bulk load", e);
        }
    }

    /**
     * @return {@code true} (after logging) if less than
     *         {@link #MINIMUM_MILLIS_BETWEEN_PORTAL_BULK_LOADS} has elapsed since the last kickoff
     */
    boolean isThrottled() {
        long elapsedMillis = this.currentTimeProvider.get().longValue() - this.lastPortalBulkLoadKickoffTimestamp;
        if (elapsedMillis >= MINIMUM_MILLIS_BETWEEN_PORTAL_BULK_LOADS) {
            return false;
        }
        LOGGER.info(String.format("Ignoring/throttling a request to start Portal bulk load because we previously started one less than %1$d msec ago.", MINIMUM_MILLIS_BETWEEN_PORTAL_BULK_LOADS));
        return true;
    }

    /** @return the time recorded at the last successful kickoff, or 0 if there has been none */
    long getLastPortalBulkLoadKickoffTimestamp() {
        return this.lastPortalBulkLoadKickoffTimestamp;
    }
}
