package com.appiancorp.expr.server.environment.epex.metadata.kafka;

import com.appian.kafka.KafkaConsumerProcessor;
import com.appian.kafka.KafkaTopicManager;
import com.appian.komodo.topology.KafkaTopology;
import com.appiancorp.expr.server.environment.epex.EPExConfiguration;
import com.appiancorp.expr.server.environment.epex.kafka.MultiTenantTopicManager;
import com.appiancorp.expr.server.environment.epex.metadata.ProcessMetadata;
import com.appiancorp.suite.cfg.ConfigurationFactory;
import java.util.List;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.springframework.transaction.CannotCreateTransactionException;

/* loaded from: input_file:com/appiancorp/expr/server/environment/epex/metadata/kafka/EPExMetadataKafkaConsumer.class */
public class EPExMetadataKafkaConsumer implements KafkaConsumerProcessor<ProcessMetadata> {
    public static final String DEFAULT_CONSUMER_GROUP_NAME = "processmetadata-0";
    private final EPExMetadataKafkaRdbmsHelper emkrh;
    private final KafkaTopology topology;
    private static final Logger LOG = Logger.getLogger(EPExMetadataKafkaConsumer.class);
    private static final int MAX_ERRORS_PER_PROCESS = ((EPExConfiguration) ConfigurationFactory.getConfiguration(EPExConfiguration.class)).getCountStoredErrorsPerProcess();

    public EPExMetadataKafkaConsumer(KafkaTopicManager kafkaTopicManager, String str, EPExMetadataKafkaMetrics ePExMetadataKafkaMetrics, EPExMetadataKafkaConsumerConfig ePExMetadataKafkaConsumerConfig, String str2, EPExMetadataKafkaRdbmsHelper ePExMetadataKafkaRdbmsHelper) {
        kafkaTopicManager.registerQueueConsumer(str, str, ePExMetadataKafkaConsumerConfig.getNumMessagesPerPoll(), ePExMetadataKafkaConsumerConfig.getHighWaterMarkQueueSize(), ePExMetadataKafkaConsumerConfig.getHighWaterMarkTimeSeconds(), ePExMetadataKafkaMetrics, this, str2);
        this.emkrh = ePExMetadataKafkaRdbmsHelper;
        this.topology = kafkaTopicManager.getTopicManagerSelector().selectTopology(str);
    }

    public int processMessages(List<ProcessMetadata> list) {
        try {
            this.emkrh.persistMetadataToRdbms(list);
        } catch (CannotCreateTransactionException e) {
            if (!LOG.isDebugEnabled()) {
                return 0;
            }
            LOG.debug("Unable to connect to RDBMS when persisting process metadata. Failed due to exception: [" + e.getMessage() + "]. Will try to write data again.", e);
            return 0;
        } catch (Exception e2) {
            LOG.error("An exception occurred when writing process metadata to database: [" + e2.getMessage() + "]. Some errors may not be persisted.", e2);
        }
        return list.size();
    }

    public void onDeadLetteringDataItems(List<ProcessMetadata> list) {
    }

    public Class<ProcessMetadata> getSupportedMessageType() {
        return ProcessMetadata.class;
    }

    public void overrideConsumerProperties(Properties properties) {
        properties.setProperty("bootstrap.servers", String.join(",", this.topology.getKafkaHosts()));
        MultiTenantTopicManager.maybeAddSecurityConfigs(properties);
    }
}
