package com.appiancorp.expr.server.environment.epex.kafka;

import com.appiancorp.common.monitoring.Stopwatch;
import com.appiancorp.expr.server.environment.epex.KafkaActorRequestQueue;
import com.appiancorp.expr.server.environment.epex.metrics.ActorExecutorKafkaMetricsCollector;
import com.appiancorp.expr.server.environment.epex.services.ExternalActorRequestQueue;
import com.appiancorp.expr.server.environment.epex.services.InMemoryActorRequestQueue;
import com.appiancorp.expr.server.environment.epex.services.SideEffectLogQueue;
import com.google.common.collect.Maps;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/appiancorp/expr/server/environment/epex/kafka/ConsumerProducersImpl.class */
/**
 * {@link ConsumerProducers} implementation backed by a single, lazily created Kafka consumer
 * subscribed to one topic.
 *
 * <p>The instance registers itself as the rebalance listener when subscribing and tracks the
 * partitions handed to it. Per partition it keeps a transactional producer (keyed by the
 * transactional id derived from the partition) and a {@link KafkaActorRequestQueue}.
 *
 * <p>NOTE(review): only {@code producers} is a concurrent map; the remaining mutable state is
 * unsynchronized. Presumably all access happens on the polling thread - confirm with callers.
 */
public class ConsumerProducersImpl implements ConsumerProducers {
    /** Upper bound on how long a single consumer poll blocks. */
    private static final Duration POLLING_DURATION = Duration.ofMillis(500);

    /** Created on first use by {@link #getConsumer()}; {@code null} until then. */
    private Consumer<String, String> consumer;
    private final KafkaConsumerFactory consumerFactory;
    private final InMemoryActorRequestQueue inMemoryQueue;
    private final String topic;
    private final String sideEffectLogTopicName;
    private final String sideEffectLogConsumerGroupId;
    private final EPExKafkaProducerFactory producerFactory;
    private final ActorExecutorKafkaMetricsCollector metricsCollector;
    /** Partitions currently assigned according to the rebalance callbacks. */
    private final Set<TopicPartition> topicPartitions = new HashSet<>();
    /** Partitions assigned or seeked since the last record polled from them. */
    private final Set<TopicPartition> newPartitions = new HashSet<>();
    /** Transactional producers keyed by transactional id. */
    private final Map<String, EPExKafkaProducer> producers = new ConcurrentHashMap<>();
    /** Non-null only between a revoke/lost callback and the following assignment callback. */
    private Stopwatch rebalanceStopwatch = null;
    private final Map<TopicPartition, KafkaActorRequestQueue> kafkaActorRequestQueueMap = Maps.newHashMap();

    /**
     * Creates an instance; no consumer or producer is created until it is first needed.
     *
     * @param topic topic the consumer subscribes to
     * @param sideEffectLogTopicName topic name handed to every {@link KafkaActorRequestQueue}
     * @param sideEffectLogConsumerGroupId consumer group id handed to every {@link KafkaActorRequestQueue}
     * @param consumerFactory factory used to create the consumer on first use
     * @param producerFactory factory used to create transactional producers
     * @param inMemoryQueue queue that is drained before Kafka is polled
     * @param metricsCollector sink for rebalance and producer-fenced metrics
     * @throws NullPointerException if any argument is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerProducersImpl(String topic, String sideEffectLogTopicName, String sideEffectLogConsumerGroupId, KafkaConsumerFactory consumerFactory, EPExKafkaProducerFactory producerFactory, InMemoryActorRequestQueue inMemoryQueue, ActorExecutorKafkaMetricsCollector metricsCollector) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.sideEffectLogTopicName = Objects.requireNonNull(sideEffectLogTopicName, "sideEffectLogTopicName");
        this.sideEffectLogConsumerGroupId = Objects.requireNonNull(sideEffectLogConsumerGroupId, "sideEffectLogConsumerGroupId");
        this.consumerFactory = Objects.requireNonNull(consumerFactory, "consumerFactory");
        this.producerFactory = Objects.requireNonNull(producerFactory, "producerFactory");
        this.inMemoryQueue = Objects.requireNonNull(inMemoryQueue, "inMemoryQueue");
        this.metricsCollector = Objects.requireNonNull(metricsCollector, "metricsCollector");
    }

    /** Records a partition as assigned to this instance. */
    private void assignPartition(TopicPartition topicPartition) {
        this.topicPartitions.add(topicPartition);
    }

    /** Creates a transactional producer for the given transactional id. */
    private EPExKafkaProducer newProducer(String transactionalId) {
        return this.producerFactory.createTransactionalKafkaProducer(transactionalId);
    }

    /**
     * Forgets a partition: drops its queue, closes and removes its producer, and re-subscribes
     * when this call removed the last remaining producer.
     */
    private void removePartition(TopicPartition topicPartition) {
        this.topicPartitions.remove(topicPartition);
        boolean hadProducers = !this.producers.isEmpty();
        String transactionalId = getTransactionalId(topicPartition);
        this.kafkaActorRequestQueueMap.remove(topicPartition);
        EPExKafkaProducer removed = this.producers.remove(transactionalId);
        if (removed != null) {
            close(removed);
        }
        // Only the transition from "some producers" to "none" triggers a fresh subscription.
        // NOTE(review): this runs inside the rebalance callback - confirm that is intended.
        if (hadProducers && this.producers.isEmpty()) {
            subscribe();
        }
    }

    /** Closes a single producer. */
    private void close(EPExKafkaProducer producer) {
        producer.close();
    }

    /** Returns a read-only view of the partitions currently tracked as assigned. */
    Set<TopicPartition> getTopicPartitions() {
        return Collections.unmodifiableSet(this.topicPartitions);
    }

    /** Subscribes the consumer to the topic with this instance as the rebalance listener. */
    public void subscribe() {
        getConsumer().subscribe(Collections.singletonList(this.topic), this);
    }

    /** Returns the consumer, creating it through the factory on first use. */
    private Consumer<String, String> getConsumer() {
        if (this.consumer == null) {
            this.consumer = this.consumerFactory.createConsumer();
        }
        return this.consumer;
    }

    /**
     * Returns the next pending request: first from the in-memory queue, otherwise from Kafka.
     *
     * @return the next request, or empty when the in-memory queue is empty and the Kafka poll
     *     returned no record within {@link #POLLING_DURATION}
     * @throws IllegalStateException if the Kafka poll returned more than one record
     */
    public Optional<PendingActorRequest> poll() {
        PendingActorRequest inMemoryRequest = this.inMemoryQueue.poll();
        if (inMemoryRequest != null) {
            return Optional.of(inMemoryRequest);
        }
        ConsumerRecords<String, String> records = getConsumer().poll(POLLING_DURATION);
        if (records.isEmpty()) {
            return Optional.empty();
        }
        if (records.count() != 1) {
            throw new IllegalStateException("Polled " + records.count() + " records from " + getTopic() + " when only polling one record at a time is supported.");
        }
        ConsumerRecord<String, String> consumerRecord = records.iterator().next();
        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        // Make sure a producer exists for this partition before the request is handed out.
        initExternalQueue(topicPartition);
        // True for the first record seen from a partition after it was assigned or seeked.
        boolean fromNewPartition = this.newPartitions.contains(new TopicPartition(getTopic(), consumerRecord.partition()));
        PendingActorRequest request = new KafkaQueuePendingActorRequest(consumerRecord, fromNewPartition);
        this.newPartitions.remove(topicPartition);
        return Optional.of(request);
    }

    /** Rebalance callback: starts the rebalance timer and forgets every revoked partition. */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        startRebalance();
        collection.forEach(this::removePartition);
    }

    /** Rebalance callback: lost partitions are handled exactly like revoked ones. */
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        onPartitionsRevoked(collection);
    }

    /**
     * Rebalance callback: starts the producer-fenced timer, records the assigned partitions,
     * marks them as new and reports the rebalance duration.
     */
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        this.metricsCollector.startProducerFencedTimer();
        collection.forEach(this::assignPartition);
        endRebalance(collection);
    }

    /** Returns the consumer's group metadata (creates the consumer if necessary). */
    public ConsumerGroupMetadata getConsumerGroupMetadata() {
        return getConsumer().groupMetadata();
    }

    /** Returns the group id taken from the consumer's group metadata. */
    public String getGroupId() {
        return getConsumerGroupMetadata().groupId();
    }

    /** Returns the topic this instance consumes from. */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Drops all cached queues and closes every producer.
     *
     * <p>NOTE(review): closed producers stay registered in {@code producers}, so
     * {@link #initExternalQueue(TopicPartition)} will not replace them afterwards - confirm this
     * is only called when the instance is no longer used.
     */
    public void closeProducers() {
        this.kafkaActorRequestQueueMap.clear();
        this.producers.values().forEach(this::close);
    }

    /** Closes the consumer if one has been created; does nothing otherwise. */
    public void closeConsumer() {
        // Going through getConsumer() here would create a consumer only to close it again.
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    /**
     * Seeks the consumer to an offset on one partition of the topic and marks that partition
     * as new so the next record polled from it is flagged accordingly.
     *
     * @param partition partition number within the topic
     * @param offset offset to seek to
     */
    public void resetToOffset(int partition, long offset) {
        TopicPartition topicPartition = new TopicPartition(getTopic(), partition);
        getConsumer().seek(topicPartition, offset);
        this.newPartitions.add(topicPartition);
    }

    /** Returns the partition's queue viewed as an {@link ExternalActorRequestQueue}. */
    public ExternalActorRequestQueue getExternalQueue(TopicPartition topicPartition) {
        return getKafkaActorRequestQueue(topicPartition);
    }

    /** Returns the partition's queue viewed as a {@link SideEffectLogQueue}. */
    public SideEffectLogQueue getSideEffectLogQueue(TopicPartition topicPartition) {
        return getKafkaActorRequestQueue(topicPartition);
    }

    /**
     * Returns the cached queue for a partition, creating it on first request with whatever
     * producer is currently registered for the partition's transactional id.
     *
     * <p>NOTE(review): the producer lookup yields {@code null} when
     * {@link #initExternalQueue(TopicPartition)} has not run for this partition - verify that
     * callers always initialize first.
     */
    private KafkaActorRequestQueue getKafkaActorRequestQueue(TopicPartition topicPartition) {
        return this.kafkaActorRequestQueueMap.computeIfAbsent(topicPartition, partition ->
                KafkaActorRequestQueue.create(partition, this.sideEffectLogTopicName, this.sideEffectLogConsumerGroupId, this.producers.get(getTransactionalId(partition))));
    }

    /** Returns the in-memory queue that is drained before Kafka is polled. */
    public InMemoryActorRequestQueue getInMemoryQueue() {
        return this.inMemoryQueue;
    }

    /**
     * Ensures a transactional producer is registered for the partition's transactional id,
     * creating one only if none is registered yet.
     */
    public void initExternalQueue(TopicPartition topicPartition) {
        this.producers.computeIfAbsent(getTransactionalId(topicPartition), this::newProducer);
    }

    /** Starts the rebalance timer unless one is already running. */
    private void startRebalance() {
        if (this.rebalanceStopwatch == null) {
            this.rebalanceStopwatch = new Stopwatch();
        }
    }

    /**
     * Stops the rebalance timer, marks the assigned partitions as new and reports the elapsed
     * time; reports 0 when no timer was running.
     */
    private void endRebalance(Collection<TopicPartition> collection) {
        long elapsedMillis = 0L;
        if (this.rebalanceStopwatch != null) {
            elapsedMillis = this.rebalanceStopwatch.measureMillis();
        }
        this.rebalanceStopwatch = null;
        this.newPartitions.addAll(collection);
        recordRebalanceTime(elapsedMillis);
    }

    /** Forwards the rebalance duration, in milliseconds, to the metrics collector. */
    void recordRebalanceTime(long durationMillis) {
        this.metricsCollector.recordRebalanceDuration(durationMillis);
    }
}
