package com.appiancorp.expr.server.environment.epex.kafka;

import com.appiancorp.expr.server.environment.epex.exec.ActorResult;
import com.appiancorp.expr.server.environment.epex.exec.PreparedActor;
import com.appiancorp.expr.server.environment.epex.services.ActorRequestQueueRunnable;
import com.appiancorp.expr.server.environment.epex.tracing.EPExTracingUtils;
import com.appiancorp.tracing.CloseableSpan;
import com.appiancorp.tracing.TracingHelper;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appiancorp/expr/server/environment/epex/kafka/ServerActorRequestQueueRunnable.class */
/**
 * Runnable that subscribes to a Kafka topic through {@link ConsumerProducers}, repeatedly polls
 * for pending actor requests and hands each one to a {@link ConsumerRecordProcessor}.
 *
 * <p>Failures other than {@link Error} do not end the loop: the runnable pauses and retries, with
 * the pause growing by {@link #PAUSE_INCREASE_FACTOR} after every consecutive failure up to
 * {@link #PAUSE_AFTER_EXCEPTION_MAX_MS}. The loop ends once {@link #shutdown()} has been called
 * or an {@link Error} is thrown; in both cases the consumer is closed.
 */
public class ServerActorRequestQueueRunnable implements ActorRequestQueueRunnable {
    private static final Logger LOG = Logger.getLogger(ServerActorRequestQueueRunnable.class);

    /** Pause after the first failed poll/process cycle, in milliseconds. */
    private static final long PAUSE_AFTER_EXCEPTION_INIT_MS = 50;
    /** Upper bound for the pause between retries, in milliseconds. */
    private static final long PAUSE_AFTER_EXCEPTION_MAX_MS = 30000;
    /** Multiplier applied to the pause after each consecutive failure. */
    private static final double PAUSE_INCREASE_FACTOR = 1.5d;

    private final ConsumerProducers consumerProducers;
    private final ConsumerRecordProcessor consumerRecordProcessor;
    private final ActorTransactionHandler actorTransactionHandler;
    private final Set<Predicate<PreparedActor>> prePredicates = new HashSet<>();
    private final Set<BiPredicate<PreparedActor, ActorResult>> postPredicates = new HashSet<>();

    /** Set by {@link #shutdown()}; volatile so the polling loop observes a request made from another thread. */
    private volatile boolean shutdown = false;

    /**
     * Creates the runnable.
     *
     * @param consumerProducers source of pending requests; also used to subscribe, reset offsets and close
     * @param consumerRecordProcessor processor invoked for every polled request
     * @param actorTransactionHandler handler that tracks the Kafka record currently being processed
     * @throws NullPointerException if any argument is {@code null}
     */
    public ServerActorRequestQueueRunnable(ConsumerProducers consumerProducers, ConsumerRecordProcessor consumerRecordProcessor, ActorTransactionHandler actorTransactionHandler) {
        this.consumerProducers = Objects.requireNonNull(consumerProducers, "consumerProducers");
        this.consumerRecordProcessor = Objects.requireNonNull(consumerRecordProcessor, "consumerRecordProcessor");
        this.actorTransactionHandler = Objects.requireNonNull(actorTransactionHandler, "actorTransactionHandler");
    }

    /**
     * @return a read-only view of the registered pre-predicates
     */
    public Set<Predicate<PreparedActor>> getPredicates() {
        return Collections.unmodifiableSet(this.prePredicates);
    }

    /**
     * @return a read-only view of the registered post-predicates
     */
    public Set<BiPredicate<PreparedActor, ActorResult>> getPostPredicates() {
        return Collections.unmodifiableSet(this.postPredicates);
    }

    /**
     * Requests that the polling loop stop and closes the producers. The consumer itself is closed
     * by {@link #run()} when the loop exits.
     */
    public void shutdown() {
        LOG.info("Gracefully shutting down executor thread");
        this.shutdown = true;
        this.consumerProducers.closeProducers();
    }

    /**
     * Subscribes and then polls/processes until {@link #shutdown()} is called.
     *
     * <p>An {@link Error} is logged (unless shutting down) and rethrown, which ends the loop. Any
     * other throwable is logged, followed by a pause before the next attempt. The consumer is
     * always closed on exit.
     */
    public void run() {
        long pauseMs = PAUSE_AFTER_EXCEPTION_INIT_MS;
        try {
            LOG.info("Subscribing to [" + this.consumerProducers.getTopic() + "]; groupId = [" + this.consumerProducers.getGroupId() + "].");
            this.consumerProducers.subscribe();
            while (!this.shutdown) {
                try {
                    pollAndProcess(this.consumerProducers);
                    // A successful cycle resets the back-off.
                    pauseMs = PAUSE_AFTER_EXCEPTION_INIT_MS;
                } catch (Error e) {
                    if (!this.shutdown) {
                        LOG.error("Kafka listener aborted due to Error", e);
                    }
                    throw e;
                } catch (Throwable th) {
                    // Failures observed while shutting down are ignored; the loop condition ends the run.
                    if (!this.shutdown) {
                        LOG.error("Kafka consumer received exception, but will attempt to continue receiving messages after pause of [" + pauseMs + "] ms", th);
                        sleep(pauseMs);
                        // Grow the pause but never beyond the configured maximum.
                        pauseMs = Math.min(PAUSE_AFTER_EXCEPTION_MAX_MS, (long) (pauseMs * PAUSE_INCREASE_FACTOR));
                    }
                }
            }
        } finally {
            this.consumerProducers.closeConsumer();
        }
    }

    /**
     * Polls once and, if a request was returned, processes it.
     *
     * <p>When the request carries a Kafka record, that record is first stored on the transaction
     * handler. If anything fails, the partition is reset to the offset of the handler's current
     * Kafka reference and the failure is rethrown to the caller.
     *
     * @param consumerProducers the instance to poll
     */
    private void pollAndProcess(ConsumerProducers consumerProducers) {
        Optional<?> polled = consumerProducers.poll();
        if (!polled.isPresent()) {
            return;
        }
        PendingActorRequest pendingActorRequest = (PendingActorRequest) polled.get();
        // Resolved before processing so the partition is known even if processing fails.
        TopicPartition topicPartition = getTopicPartition(pendingActorRequest);
        try {
            if (pendingActorRequest.getKafkaRecord().isPresent()) {
                // Raw cast retained: the record's type parameters are declared outside this file.
                this.actorTransactionHandler.updateKafkaReference((ConsumerRecord) pendingActorRequest.getKafkaRecord().get());
            }
            processPendingActorRequest(pendingActorRequest);
        } catch (Throwable th) {
            LOG.error("Exception processing records, in-progress driver writes may have occurred. Enabling check for partition: " + topicPartition);
            this.consumerProducers.resetToOffset(topicPartition.partition(), this.actorTransactionHandler.getKafkaReference().offset());
            throw th;
        }
    }

    /**
     * Determines the topic/partition of a request, using the request's own Kafka record when it
     * has one and the transaction handler's current Kafka reference otherwise.
     */
    private TopicPartition getTopicPartition(PendingActorRequest pendingActorRequest) {
        Optional<?> kafkaRecord = pendingActorRequest.getKafkaRecord();
        ConsumerRecord<?, ?> consumerRecord = kafkaRecord.isPresent()
                ? (ConsumerRecord<?, ?>) kafkaRecord.get()
                : this.actorTransactionHandler.getKafkaReference();
        return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
    }

    /**
     * Processes one request inside a "processActors" tracing span. Failures are logged and added
     * to the trace, then rethrown; the span is closed in either case.
     */
    private void processPendingActorRequest(PendingActorRequest pendingActorRequest) {
        try (CloseableSpan span = TracingHelper.createCloseableSpan("processActors")) {
            try {
                this.consumerRecordProcessor.processRecord(this.actorTransactionHandler, pendingActorRequest, this.prePredicates, this.postPredicates);
            } catch (Throwable th) {
                EPExTracingUtils.logAndAddToTrace(LOG, "Failed to process consumer record: " + pendingActorRequest, th);
                throw th;
            }
        }
    }

    /**
     * Sleeps for the given number of milliseconds, returning early if the thread is interrupted.
     */
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // The interrupt status is left cleared; the caller's loop stops only via the shutdown flag.
            // NOTE(review): confirm swallowing the interrupt is intended before re-asserting it here,
            // since run() still has to close the consumer afterwards.
            Thread.interrupted();
        }
    }

    /**
     * Registers a pre-predicate that is passed to the record processor.
     *
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    public void addPredicate(Predicate<PreparedActor> predicate) {
        Objects.requireNonNull(predicate, "prePredicate");
        this.prePredicates.add(predicate);
    }

    /**
     * Unregisters a previously added pre-predicate.
     *
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    public void removePredicate(Predicate<PreparedActor> predicate) {
        Objects.requireNonNull(predicate, "prePredicate");
        this.prePredicates.remove(predicate);
    }

    /**
     * Registers a post-predicate that is passed to the record processor.
     *
     * @throws NullPointerException if {@code biPredicate} is {@code null}
     */
    public void addPostPredicate(BiPredicate<PreparedActor, ActorResult> biPredicate) {
        Objects.requireNonNull(biPredicate, "postBiPredicate");
        this.postPredicates.add(biPredicate);
    }

    /**
     * Unregisters a previously added post-predicate.
     *
     * @throws NullPointerException if {@code biPredicate} is {@code null}
     */
    public void removePostPredicate(BiPredicate<PreparedActor, ActorResult> biPredicate) {
        Objects.requireNonNull(biPredicate, "postBiPredicate");
        this.postPredicates.remove(biPredicate);
    }
}
