package com.appiancorp.expr.server.environment.epex.kafka;

import com.appiancorp.common.monitoring.Stopwatch;
import com.appiancorp.core.data.Record;
import com.appiancorp.expr.server.environment.epex.ActorDbMarker;
import com.appiancorp.expr.server.environment.epex.EPExConfiguration;
import com.appiancorp.expr.server.environment.epex.driveraccess.DriverAccess;
import com.appiancorp.expr.server.environment.epex.exec.ActorDestinationQueueType;
import com.appiancorp.expr.server.environment.epex.exec.ActorRequestEvaluable;
import com.appiancorp.expr.server.environment.epex.exec.ActorRequestEvaluableFactoryBuilder;
import com.appiancorp.expr.server.environment.epex.exec.PreparedActor;
import com.appiancorp.expr.server.environment.epex.metrics.ServerActorRequestQueueMetricsCollectorImpl;
import com.appiancorp.expr.server.environment.epex.services.ExternalActorRequestQueue;
import com.appiancorp.expr.server.environment.epex.services.InMemoryActorRequestQueue;
import com.appiancorp.expr.server.environment.epex.tracing.EPExTracingUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:com/appiancorp/expr/server/environment/epex/kafka/ActorTransactionHandlerImpl.class */
/**
 * {@link ActorTransactionHandler} that buffers the outgoing flows of an actor in the in-memory
 * queue and, whenever a checkpoint is due, flushes them in one "full" transaction.
 *
 * <p>A full transaction (see {@link #doTransaction}) runs these steps in order: begin the
 * transaction on the external queue, write the DB marker, commit to the driver, mark pending
 * side-effect log entries complete, enqueue new side-effect log starts, mark the consumed Kafka
 * record as processed and commit, remove the DB marker, and remember the returned record metadata.
 *
 * <p>NOTE(review): instance state ({@code kafkaRecord}, {@code yieldTimer}, the two metadata maps)
 * is mutable and unsynchronized; presumably each instance is confined to a single consumer
 * thread - confirm against the callers.
 */
public class ActorTransactionHandlerImpl implements ActorTransactionHandler {
    private static final Logger LOG = Logger.getLogger(ActorTransactionHandlerImpl.class);

    private final ConsumerProducers consumerProducers;
    private final ActorDbMarker actorDbMarker;
    private final EPExConfiguration config;
    private final DriverAccess driverAccess;

    /** Kafka record currently being processed; {@code null} until {@link #updateKafkaReference} is called. */
    private ConsumerRecord<String, String> kafkaRecord;

    /** Timer compared against the configured yield limit; created lazily and cleared after every full transaction. */
    private Stopwatch yieldTimer;

    /** Committed metadata of flows that report side effects, keyed by {@link #idOf}. */
    private final Map<String, RecordMetadata> sideEffectLogMetadata = Maps.newHashMap();

    /** Committed metadata of flows that do not report side effects, keyed by {@link #idOf}. */
    private final Map<String, RecordMetadata> sideEffectLogActionContinuationMetadata = Maps.newHashMap();

    /** Metadata waiting to be marked complete on the side-effect log during the next full transaction. */
    private final Queue<RecordMetadata> sideEffectLogMetadataUpdateQueue = Queues.newConcurrentLinkedQueue();

    /**
     * Creates a handler; every collaborator is mandatory.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public ActorTransactionHandlerImpl(ConsumerProducers consumerProducers, ActorDbMarker actorDbMarker, DriverAccess driverAccess, EPExConfiguration ePExConfiguration) {
        this.consumerProducers = Objects.requireNonNull(consumerProducers, "consumerProducers");
        this.actorDbMarker = Objects.requireNonNull(actorDbMarker, "actorDbMarker");
        this.driverAccess = Objects.requireNonNull(driverAccess, "driverAccess");
        this.config = Objects.requireNonNull(ePExConfiguration, "ePExConfiguration");
    }

    /**
     * Records the outgoing flows of an executed actor and, when a checkpoint is due, performs a
     * full transaction.
     *
     * @param actorRequestEvaluable the actor request that was just executed
     * @param preparedActor         the prepared actor the flows originate from
     * @param list                  the outgoing flows produced by the execution
     * @return the result of the full transaction when one was performed, otherwise the value
     *         returned by the in-memory queue's {@code enqueue}
     * @throws IllegalStateException if no Kafka record has been set via {@link #updateKafkaReference}
     */
    public long initiateTransaction(ActorRequestEvaluable actorRequestEvaluable, PreparedActor preparedActor, List<ActorRequestEvaluable> list) {
        if (this.kafkaRecord == null) {
            throw new IllegalStateException("Attempted transaction with no reference to Kafka");
        }
        InMemoryActorRequestQueue inMemoryQueue = this.consumerProducers.getInMemoryQueue();
        // Order matters: the checkpoint decision below looks at the queue state produced by these two steps.
        long enqueueResult = inMemoryQueue.enqueue(filterNoSideEffectFlows(list, preparedActor), preparedActor);
        handleSideEffectLogUpdates(inMemoryQueue, actorRequestEvaluable, filterSyncFlows(list, preparedActor));
        if (shouldCheckpointState(actorRequestEvaluable, inMemoryQueue)) {
            return initiateFullTransaction(inMemoryQueue);
        }
        return enqueueResult;
    }

    /**
     * A checkpoint is due when the in-memory queue is drained, when a side-effect log update is
     * pending for an actor with side effects, or when the yield time limit has been reached.
     */
    private boolean shouldCheckpointState(ActorRequestEvaluable actorRequestEvaluable, InMemoryActorRequestQueue inMemoryActorRequestQueue) {
        return isInMemoryQueueEmpty(inMemoryActorRequestQueue)
                || shouldUpdateSideEffectLog(actorRequestEvaluable)
                || shouldYieldExecution();
    }

    /** Returns whether the {@code IN_MEMORY} destination queue currently reports a size of zero. */
    private static boolean isInMemoryQueueEmpty(InMemoryActorRequestQueue inMemoryActorRequestQueue) {
        return inMemoryActorRequestQueue.getQueueSize(ActorDestinationQueueType.IN_MEMORY) == 0;
    }

    /**
     * Returns whether the given actor has side effects while completion updates are still queued.
     *
     * @throws NullPointerException if {@code actorRequestEvaluable} is {@code null}
     */
    private boolean shouldUpdateSideEffectLog(ActorRequestEvaluable actorRequestEvaluable) {
        return Objects.requireNonNull(actorRequestEvaluable).hasSideEffects()
                && !this.sideEffectLogMetadataUpdateQueue.isEmpty();
    }

    /**
     * Returns whether the yield timer has reached the configured limit. The measured
     * milliseconds are truncated to whole minutes before the comparison. Starts the timer as a
     * side effect if it is not running yet.
     */
    private boolean shouldYieldExecution() {
        if (this.yieldTimer == null) {
            this.yieldTimer = new Stopwatch();
        }
        long elapsedMinutes = TimeUnit.MILLISECONDS.toMinutes(this.yieldTimer.measureMillis());
        return elapsedMinutes >= this.config.getExecutorYieldTimeMinutes();
    }

    /**
     * Routes side-effect related flows and queues side-effect log completions. Three cases, in order:
     * <ol>
     *   <li>the outgoing flows start a side effect: the flows go to the {@code SIDE_EFFECTS} queue;</li>
     *   <li>the executed actor itself has side effects: its saved metadata (which must exist) is
     *       queued for completion and its continuation (at most one) goes to the
     *       {@code SIDE_EFFECTS} queue;</li>
     *   <li>otherwise: continuation metadata saved for this actor, if any, is queued for completion.</li>
     * </ol>
     *
     * @return {@code false} only in the third case when no metadata was saved for the actor
     * @throws IllegalStateException if the flow-count expectations of case 1 or 2 are violated
     * @throws NullPointerException  if the actor is {@code null}, or in case 2 when no metadata was saved
     */
    private boolean handleSideEffectLogUpdates(InMemoryActorRequestQueue inMemoryActorRequestQueue, ActorRequestEvaluable actorRequestEvaluable, List<ActorRequestEvaluable> list) {
        Objects.requireNonNull(actorRequestEvaluable);
        if (isSideEffectStart(list)) {
            inMemoryActorRequestQueue.enqueueTo(list, ActorDestinationQueueType.SIDE_EFFECTS);
            return true;
        }
        String actorId = idOf(actorRequestEvaluable);
        if (actorRequestEvaluable.hasSideEffects()) {
            Preconditions.checkState(list.size() <= 1, "Side effects %s should have at most one continuation flow.", actorId);
            RecordMetadata startMetadata = Objects.requireNonNull(this.sideEffectLogMetadata.remove(actorId), "Kafka metadata missing for actor [" + actorId + "]");
            this.sideEffectLogMetadataUpdateQueue.offer(startMetadata);
            inMemoryActorRequestQueue.enqueueTo(list, ActorDestinationQueueType.SIDE_EFFECTS);
            return true;
        }
        RecordMetadata continuationMetadata = this.sideEffectLogActionContinuationMetadata.remove(actorId);
        if (continuationMetadata == null) {
            return false;
        }
        this.sideEffectLogMetadataUpdateQueue.offer(continuationMetadata);
        return true;
    }

    /**
     * Returns whether the flows contain a flow with side effects.
     *
     * @throws IllegalStateException if a side-effect flow is present but it is not the only flow
     */
    private static boolean isSideEffectStart(List<ActorRequestEvaluable> list) {
        if (list.isEmpty()) {
            return false;
        }
        List<ActorRequestEvaluable> sideEffectFlows = filterSideEffectFlows(list);
        if (sideEffectFlows.isEmpty()) {
            return false;
        }
        Preconditions.checkState(sideEffectFlows.size() == 1 && list.size() == 1, "Expected one and only one outgoing flow for actor with side effect");
        return true;
    }

    /** Keeps every flow that is not an asynchronous flow of a subprocess node. */
    private static List<ActorRequestEvaluable> filterSyncFlows(List<ActorRequestEvaluable> list, PreparedActor preparedActor) {
        return list.stream()
                .filter(flow -> !isAsyncSubprocess(flow, preparedActor))
                .collect(Collectors.toList());
    }

    /** Returns whether the flow is asynchronous and originates from a subprocess node. */
    private static boolean isAsyncSubprocess(ActorRequestEvaluable actorRequestEvaluable, PreparedActor preparedActor) {
        return preparedActor.isSubprocessNode() && actorRequestEvaluable.isAsynchronous();
    }

    /** Keeps the flows for which {@link #noSideEffects} holds. */
    private static List<ActorRequestEvaluable> filterNoSideEffectFlows(List<ActorRequestEvaluable> list, PreparedActor preparedActor) {
        return list.stream()
                .filter(flow -> noSideEffects(preparedActor, flow))
                .collect(Collectors.toList());
    }

    /**
     * Returns whether the flow either has no side effects or is an asynchronous subprocess flow.
     *
     * <p>Kept {@code public}: the decompiled source exposes it this way and callers outside this
     * file cannot be ruled out.
     */
    public static boolean noSideEffects(PreparedActor preparedActor, ActorRequestEvaluable actorRequestEvaluable) {
        return !actorRequestEvaluable.hasSideEffects() || isAsyncSubprocess(actorRequestEvaluable, preparedActor);
    }

    /** Keeps only the flows that report side effects. */
    private static List<ActorRequestEvaluable> filterSideEffectFlows(List<ActorRequestEvaluable> list) {
        return list.stream()
                .filter(ActorRequestEvaluable::hasSideEffects)
                .collect(Collectors.toList());
    }

    /** String form of the request's transactional identifier; used as key of the metadata maps. */
    private static String idOf(ActorRequestEvaluable actorRequestEvaluable) {
        return String.valueOf(actorRequestEvaluable.getTransactionalIdentifier());
    }

    /**
     * Sets the Kafka record that subsequent transactions refer to and starts the yield timer if
     * it is not already running.
     *
     * @throws NullPointerException if {@code consumerRecord} is {@code null}
     */
    public void updateKafkaReference(ConsumerRecord<String, String> consumerRecord) {
        this.kafkaRecord = Objects.requireNonNull(consumerRecord);
        if (this.yieldTimer == null) {
            this.yieldTimer = new Stopwatch();
        }
    }

    /** Returns the Kafka record last set via {@link #updateKafkaReference}, or {@code null} if none was set. */
    public ConsumerRecord<String, String> getKafkaReference() {
        return this.kafkaRecord;
    }

    /**
     * Drains the in-memory queues and writes their content in one transaction.
     *
     * <p>When the yield limit is reached, everything (in-memory, side-effect and async-subprocess
     * requests, in that order) is sent to the external queue and nothing to the side-effect log.
     * Otherwise only the async-subprocess requests go to the external queue, the side-effect
     * requests go to the side-effect log, and those of them that report side effects are put back
     * on the {@code IN_MEMORY} queue afterwards. The yield timer is cleared in both cases.
     *
     * @return the value returned by {@link #doTransaction}
     */
    private long initiateFullTransaction(InMemoryActorRequestQueue inMemoryActorRequestQueue) {
        ActorRequestEvaluable current = getActorRequestEvaluable(this.kafkaRecord);
        List<PendingActorRequest> asyncSubprocessRequests = inMemoryActorRequestQueue.remove(ActorDestinationQueueType.ASYNC_SUB);
        long result;
        if (shouldYieldExecution()) {
            List<PendingActorRequest> allRequests = inMemoryActorRequestQueue.remove(ActorDestinationQueueType.IN_MEMORY);
            allRequests.addAll(inMemoryActorRequestQueue.remove(ActorDestinationQueueType.SIDE_EFFECTS));
            allRequests.addAll(asyncSubprocessRequests);
            LOG.info("Process exceeded execution time limit of " + this.config.getExecutorYieldTimeMinutes() + " minutes. Yielding execution of " + current);
            result = doTransaction(current, toActorRequestEvaluables(allRequests), Collections.emptyList());
        } else {
            List<PendingActorRequest> sideEffectRequests = inMemoryActorRequestQueue.remove(ActorDestinationQueueType.SIDE_EFFECTS);
            List<ActorRequestEvaluable> externalFlows = toActorRequestEvaluables(asyncSubprocessRequests);
            List<ActorRequestEvaluable> sideEffectFlows = toActorRequestEvaluables(sideEffectRequests);
            result = doTransaction(current, externalFlows, sideEffectFlows);
            if (!sideEffectRequests.isEmpty()) {
                // Side-effect flows are executed locally once their start has been logged.
                inMemoryActorRequestQueue.enqueueTo(filterSideEffectFlows(sideEffectFlows), ActorDestinationQueueType.IN_MEMORY);
            }
        }
        this.yieldTimer = null;
        return result;
    }

    /**
     * Executes the transactional steps for the current Kafka record.
     *
     * <p>On any failure the ongoing transaction on the external queue is aborted and the original
     * exception is rethrown. A failure of the abort itself is attached as a suppressed exception
     * so that it cannot hide the root cause.
     *
     * <p>NOTE(review): marker removal and metadata bookkeeping run after the commit, so a failure
     * there also reaches the abort path - confirm that aborting after a commit is intended.
     *
     * @param actorRequestEvaluable the request rebuilt from the current Kafka record
     * @param list                  flows to enqueue on the external queue
     * @param list2                 flows whose start is written to the side-effect log
     * @return number of metadata entries returned by the commit minus {@code list2.size()}
     */
    private long doTransaction(ActorRequestEvaluable actorRequestEvaluable, List<ActorRequestEvaluable> list, List<ActorRequestEvaluable> list2) {
        TopicPartition topicPartition = createTopicPartition(this.kafkaRecord);
        ExternalActorRequestQueue externalQueue = this.consumerProducers.getExternalQueue(topicPartition);
        try {
            beginTransaction(externalQueue);
            writeTransactionMarker(actorRequestEvaluable);
            commitToDriver(actorRequestEvaluable);
            markSideEffectLogComplete(topicPartition);
            enqueueSideEffectLogStart(topicPartition, list2);
            List<RecordMetadata> committed = markAsProcessedAndCommit(externalQueue, toRecords(list), actorRequestEvaluable);
            removeTransactionMarker(actorRequestEvaluable);
            saveSideEffectLogMetadatas(list2, committed);
            return committed.size() - list2.size();
        } catch (Exception e) {
            try {
                externalQueue.abortOnGoingTransaction();
            } catch (RuntimeException abortFailure) {
                // Keep the root cause as the primary exception; addSuppressed rejects self-suppression.
                if (abortFailure != e) {
                    e.addSuppressed(abortFailure);
                }
            }
            throw e;
        }
    }

    /** Commits to the driver; on failure logs, records a datastore-failure metric and rethrows. */
    private void commitToDriver(ActorRequestEvaluable actorRequestEvaluable) {
        try {
            this.driverAccess.commitToDriver();
        } catch (Exception e) {
            EPExTracingUtils.logAndAddToTrace(LOG, "ERROR in DRIVER COMMIT on actor [" + actorRequestEvaluable + "].", e);
            ServerActorRequestQueueMetricsCollectorImpl.METRICS_COLLECTOR.recordActorDatastoreFailure();
            throw e;
        }
    }

    /**
     * Marks the current Kafka record as processed, enqueues the given records and commits.
     * On failure logs, records a Kafka-failure metric and rethrows.
     *
     * @return the metadata returned by the external queue's {@code commit}
     */
    private List<RecordMetadata> markAsProcessedAndCommit(ExternalActorRequestQueue externalActorRequestQueue, List<Record> list, ActorRequestEvaluable actorRequestEvaluable) {
        try {
            externalActorRequestQueue.markAsProcessed(this.kafkaRecord, this.consumerProducers.getConsumerGroupMetadata());
            externalActorRequestQueue.enqueue(list);
            return externalActorRequestQueue.commit();
        } catch (Exception e) {
            EPExTracingUtils.logAndAddToTrace(LOG, "ERROR in KAFKA ENQUEUE AND COMMIT of actor [" + actorRequestEvaluable + "].", e);
            ServerActorRequestQueueMetricsCollectorImpl.METRICS_COLLECTOR.recordActorKafkaFailure();
            throw e;
        }
    }

    /**
     * Writes "side-effect log started" entries for the given flows.
     * On failure logs, records a Kafka-failure metric and rethrows.
     */
    private void enqueueSideEffectLogStart(TopicPartition topicPartition, List<ActorRequestEvaluable> list) {
        try {
            this.consumerProducers.getSideEffectLogQueue(topicPartition).enqueueSideEffectLogStarted(toRecords(list));
        } catch (Exception e) {
            EPExTracingUtils.logAndAddToTrace(LOG, "ERROR in KAFKA ENQUEUE  of action flows " + list + " onto " + this.config.getSideEffectLogTopicName(), e);
            ServerActorRequestQueueMetricsCollectorImpl.METRICS_COLLECTOR.recordActorKafkaFailure();
            throw e;
        }
    }

    /**
     * Drains the pending-completion queue, marking each entry complete on the side-effect log.
     * On failure logs, records a Kafka-failure metric and rethrows.
     *
     * <p>NOTE(review): an entry is removed from the queue before it is marked, so entries polled
     * during a transaction that later fails are not put back - confirm that this is acceptable.
     */
    private void markSideEffectLogComplete(TopicPartition topicPartition) {
        RecordMetadata metadata;
        while ((metadata = this.sideEffectLogMetadataUpdateQueue.poll()) != null) {
            try {
                this.consumerProducers.getSideEffectLogQueue(topicPartition).markSideEffectLogComplete(metadata);
            } catch (Exception e) {
                EPExTracingUtils.logAndAddToTrace(LOG, "ERROR in KAFKA MARK-PROCESSED for actor with metadata " + metadata + " on " + this.config.getSideEffectLogTopicName(), e);
                ServerActorRequestQueueMetricsCollectorImpl.METRICS_COLLECTOR.recordActorKafkaFailure();
                throw e;
            }
        }
    }

    /**
     * Pairs the i-th flow with the i-th committed metadata entry and stores the pair.
     *
     * <p>NOTE(review): relies on {@code list2} holding at least {@code list.size()} entries, with
     * the side-effect log entries first - verify against the external queue's {@code commit}.
     */
    private void saveSideEffectLogMetadatas(List<ActorRequestEvaluable> list, List<RecordMetadata> list2) {
        for (int i = 0; i < list.size(); i++) {
            saveSideEffectLogMetadata(list.get(i), list2.get(i));
        }
    }

    /** Stores the metadata in the map matching the flow's {@code hasSideEffects()} answer. */
    private void saveSideEffectLogMetadata(ActorRequestEvaluable actorRequestEvaluable, RecordMetadata recordMetadata) {
        Map<String, RecordMetadata> target = actorRequestEvaluable.hasSideEffects()
                ? this.sideEffectLogMetadata
                : this.sideEffectLogActionContinuationMetadata;
        target.put(idOf(actorRequestEvaluable), recordMetadata);
    }

    /** Converts each flow with {@code toRecord()}. */
    private static List<Record> toRecords(List<ActorRequestEvaluable> list) {
        // The explicit type witness keeps the result a List<Record> whatever toRecord() declares.
        return list.stream()
                .map(ActorRequestEvaluable::toRecord)
                .collect(Collectors.<Record>toList());
    }

    /**
     * Begins a transaction on the external queue.
     * On failure logs, records a Kafka-failure metric and rethrows.
     */
    private void beginTransaction(ExternalActorRequestQueue externalActorRequestQueue) {
        try {
            externalActorRequestQueue.beginTransaction();
        } catch (Exception e) {
            EPExTracingUtils.logAndAddToTrace(LOG, "ERROR in KAFKA BEGIN TRANSACTION.", e);
            ServerActorRequestQueueMetricsCollectorImpl.METRICS_COLLECTOR.recordActorKafkaFailure();
            throw e;
        }
    }

    /** Unwraps each pending request with {@code getActorRequestEvaluable()}. */
    private static List<ActorRequestEvaluable> toActorRequestEvaluables(List<PendingActorRequest> list) {
        return list.stream()
                .map(PendingActorRequest::getActorRequestEvaluable)
                .collect(Collectors.<ActorRequestEvaluable>toList());
    }

    /** Writes the DB marker for the request; on failure logs, records a datastore-failure metric and rethrows. */
    private void writeTransactionMarker(ActorRequestEvaluable actorRequestEvaluable) {
        try {
            this.actorDbMarker.writeMarker(actorRequestEvaluable);
        } catch (Exception e) {
            EPExTracingUtils.logAndAddToTrace(LOG, "ERROR in writing marker for " + actorRequestEvaluable, e);
            ServerActorRequestQueueMetricsCollectorImpl.METRICS_COLLECTOR.recordActorDatastoreFailure();
            throw e;
        }
    }

    /** Removes the DB marker for the request; on failure logs, records a datastore-failure metric and rethrows. */
    private void removeTransactionMarker(ActorRequestEvaluable actorRequestEvaluable) {
        try {
            this.actorDbMarker.removeMarker(actorRequestEvaluable);
        } catch (Exception e) {
            EPExTracingUtils.logAndAddToTrace(LOG, "ERROR in removing marker for " + actorRequestEvaluable, e);
            ServerActorRequestQueueMetricsCollectorImpl.METRICS_COLLECTOR.recordActorDatastoreFailure();
            throw e;
        }
    }

    /** Builds the topic partition the given record was consumed from. */
    private static TopicPartition createTopicPartition(ConsumerRecord<String, String> consumerRecord) {
        return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
    }

    /**
     * Rebuilds the actor request from a consumed Kafka record. Package-private and non-static,
     * presumably so that tests can override it - confirm.
     */
    @NotNull
    ActorRequestEvaluable getActorRequestEvaluable(ConsumerRecord<String, String> consumerRecord) {
        return new ActorRequestEvaluableFactoryBuilder().buildWithUserUuid(consumerRecord).create();
    }
}
