package com.appian.dl.repo.es;

import com.appian.dl.core.base.Clock;
import com.appian.dl.core.base.ClockSystemImpl;
import com.appian.dl.core.base.Timestamps;
import com.appian.dl.replicator.ReplicationAction;
import com.appian.dl.replicator.ReplicationInProgressException;
import com.appian.dl.replicator.ReplicationState;
import com.appian.dl.replicator.Sink;
import com.appian.dl.replicator.SynchronousReplicationState;
import com.appian.dl.repo.Entity;
import com.appian.dl.repo.PersistenceMetadataImpl;
import com.appian.dl.repo.Schema;
import com.appian.dl.repo.cdt.CdtEntity;
import com.appian.dl.repo.cdt.CdtRepo;
import com.appian.dl.repo.cdt.CdtRepoLazyInitializer;
import com.appian.dl.repo.cdt.CdtSchema;
import com.appian.dl.repo.cdt.ReplicationStateCdt;
import com.appian.dl.repo.cdt.SynchronousReplicationStateCdt;
import com.appian.dl.repo.es.client.ClientProvider;
import com.appian.dl.repo.es.schema.SchemaGenerator;
import com.appiancorp.suiteapi.type.Datatype;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

/* loaded from: input_file:com/appian/dl/repo/es/SinkEsImpl.class */
/**
 * Elasticsearch-backed {@link Sink} that keeps its replication bookkeeping in the same index it
 * is keyed by.
 *
 * <p>Two kinds of documents are written to the index identified by {@link #getKey()}:
 * <ul>
 *   <li>one {@link ReplicationState} document per source, stored under the source key as the
 *       document id and updated with optimistic concurrency control (sequence number / primary
 *       term) so that it can double as a replication lock;</li>
 *   <li>one {@link SynchronousReplicationState} document per synchronously replicated
 *       transaction id; the entity metadata for these is configured with a 15 minute TTL.</li>
 * </ul>
 */
public abstract class SinkEsImpl implements Sink {
    private static final Logger LOG = Logger.getLogger(SinkEsImpl.class);

    /**
     * Maximum number of hits requested by {@link #getSynchronousReplicationTxnIds(String, long)};
     * matching documents beyond this count are not returned.
     */
    public static final int SYNC_REP_QUERY_MAX_RESULT = 10000;

    /**
     * Sequence number / primary term carried by a state that has never been persisted. A state
     * with exactly these values is written with {@code create(true)} instead of a conditional
     * update (see {@link #saveReplicationState(ReplicationState)}).
     */
    private static final long UNASSIGNED_SEQ_NO = -2L;
    private static final long UNASSIGNED_PRIMARY_TERM = 0L;

    /** Field names used when querying synchronous replication state documents. */
    private static final String SOURCE_KEY_FIELD = "sourceKey";
    private static final String TXN_ID_FIELD = "txnId";

    private final String indexKey;
    private final ClientProvider clientProvider;
    private final Supplier<CdtRepo> repoSupplier;
    // Validated and stored by the constructor; not read anywhere within this class.
    private final Set<Datatype> sinkPersistedDts;
    private final Entity<Datatype> replicationStateEntity;
    private final Entity<Datatype> synchronousReplicationStateEntity;
    private final Schema<Datatype> schema;
    private final String replicationStateTypeKey;
    private final String synchronousReplicationStateTypeKey;
    private Clock clock;

    /**
     * Creates a sink bound to one Elasticsearch index.
     *
     * @param str the index key; must be neither {@code null} nor empty
     * @param clientProvider supplies the Elasticsearch client; must not be {@code null}
     * @param cdtRepo repository wrapped in a {@link CdtRepoLazyInitializer} together with the
     *        schema of the two bookkeeping entities; must not be {@code null}
     * @param set the datatypes persisted by this sink; must not be {@code null}
     * @throws IllegalArgumentException if {@code str} is {@code null} or empty
     * @throws NullPointerException if any other argument is {@code null}
     */
    public SinkEsImpl(String str, ClientProvider clientProvider, CdtRepo cdtRepo, Set<Datatype> set) {
        Preconditions.checkArgument(str != null && !str.isEmpty(), "indexKey");
        this.indexKey = str;
        this.clientProvider = Preconditions.checkNotNull(clientProvider, "clientProvider");
        Preconditions.checkNotNull(cdtRepo, "repo");
        this.sinkPersistedDts = Preconditions.checkNotNull(set, "sinkPersistedDts");

        // Replication state: keyed by the caller-supplied source key (id is not generated).
        Datatype replicationStateDt = ReplicationStateCdt.getDatatype();
        this.replicationStateEntity = CdtEntity.entity(replicationStateDt,
            PersistenceMetadataImpl.builder().idPropertyName("sourceKey").isGeneratedId(false).build());
        this.replicationStateTypeKey = SchemaGenerator.getTypeKey(replicationStateDt);

        // Synchronous replication state: short-lived documents with a 15 minute TTL.
        Datatype syncReplicationStateDt = SynchronousReplicationStateCdt.getDatatype();
        this.synchronousReplicationStateEntity = CdtEntity.entity(syncReplicationStateDt,
            PersistenceMetadataImpl.builder()
                .idPropertyName("id")
                .ttlMsBeforeDeletion(TimeUnit.MINUTES.toMillis(15L))
                .build());
        this.synchronousReplicationStateTypeKey = SchemaGenerator.getTypeKey(syncReplicationStateDt);

        this.schema = CdtSchema.builder()
            .addEntities(new Entity[]{this.replicationStateEntity, this.synchronousReplicationStateEntity})
            .build();
        this.repoSupplier = new CdtRepoLazyInitializer(cdtRepo, this.schema);
        this.clock = ClockSystemImpl.INSTANCE;
    }

    /** Replaces the clock used for heartbeat timestamps; intended for tests only. */
    @VisibleForTesting
    void setClock(Clock clock) {
        this.clock = Preconditions.checkNotNull(clock);
    }

    /**
     * Returns the Elasticsearch client.
     *
     * <p>The repo supplier is invoked first and its result discarded; presumably this forces the
     * lazy initializer to set up the bookkeeping schema before the index is touched
     * (NOTE(review): confirm against {@code CdtRepoLazyInitializer}).
     */
    private RestHighLevelClient getClient() {
        getRepo();
        return this.clientProvider.get();
    }

    private CdtRepo getRepo() {
        return this.repoSupplier.get();
    }

    /** Returns the index key this sink was constructed with. */
    public String getKey() {
        return this.indexKey;
    }

    /**
     * Disables the index's background refresh before a replication run.
     *
     * @param replicationAction the action about to run; not inspected here
     */
    public void prepare(ReplicationAction replicationAction) {
        IndexManager.disableBackgroundRefresh(getClient(), this.indexKey);
    }

    /**
     * Restores the default background refresh interval and then refreshes the index.
     *
     * @param replicationAction the action that finished; not inspected here
     */
    public void complete(ReplicationAction replicationAction) {
        IndexManager.setBackgroundRefreshInterval(
            getClient(), this.indexKey, IndexSettings.DEFAULT_REFRESH_INTERVAL.getSeconds());
        IndexManager.refreshIndex(getClient(), this.indexKey);
    }

    /**
     * Attempts to take the replication lock for a source by writing a state document that names
     * the given server as the replicating server.
     *
     * <p>The lock is granted when there is no persisted state, when replication is not in
     * progress, when the lock is already held by {@code str2}, or when the holder's heartbeat is
     * behind the current time by more than {@code j}.
     *
     * @param str the source key identifying the state document
     * @param str2 the identifier of the server requesting the lock
     * @param j the staleness threshold handed to {@link Timestamps#isBehindByMoreThan}; the unit
     *        is not visible here (presumably milliseconds — confirm against {@code Timestamps})
     * @return the newly persisted state, carrying the sequence number and primary term assigned
     *         by Elasticsearch
     * @throws ReplicationInProgressException if another server holds a lock with a fresh
     *         heartbeat, or if the write loses a version conflict
     */
    public ReplicationState lockForReplication(String str, String str2, long j) throws ReplicationInProgressException {
        ReplicationState persisted = getReplicationState(str);
        ReplicationState requested;
        if (persisted == null) {
            requested = new ReplicationState(str, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM,
                (Timestamp) null, (Long) null, str2, this.clock.nowTs());
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix() + "No previously saved replication state.");
            }
        } else if (persisted.isReplicationInProgress()) {
            Timestamp heartbeat = persisted.getReplicatingServerHeartbeat();
            Timestamp now = this.clock.nowTs();
            boolean heldByOtherServer = !persisted.getReplicatingServer().equals(str2);
            if (heldByOtherServer && !Timestamps.isBehindByMoreThan(heartbeat, now, j)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(logMsgPrefix() + "Replication is already in progress. state=" + persisted);
                }
                throw new ReplicationInProgressException(persisted);
            }
            // Reached both for a stale lock held by another server and for a lock this server
            // already holds; the message below is logged in either case.
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix() + "Replication is in progress, but the heartbeat is old; will attempt to break the lock. state=" + persisted);
            }
            requested = lockedBy(persisted, str2);
        } else {
            requested = lockedBy(persisted, str2);
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix() + "Replication is not currently in progress. state=" + persisted);
            }
        }
        try {
            return withVersionOf(requested, saveReplicationState(requested));
        } catch (VersionConflictEngineException e) {
            // NOTE(review): RestHighLevelClient commonly reports an HTTP 409 as an
            // ElasticsearchStatusException rather than this type; confirm that the client
            // returned by clientProvider surfaces VersionConflictEngineException.
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix() + "Version conflict trying to set replication state: oldState=" + persisted + ", newState=" + requested, e);
            }
            throw new ReplicationInProgressException(getReplicationState(str), e);
        }
    }

    /** Copies {@code base} with the given replicating server and a heartbeat of "now". */
    private ReplicationState lockedBy(ReplicationState base, String replicatingServer) {
        return ReplicationState.builder(base)
            .replicatingServer(replicatingServer)
            .replicatingServerHeartbeat(this.clock.nowTs())
            .build();
    }

    /** Copies {@code state} with the sequence number and primary term reported by a write. */
    private static ReplicationState withVersionOf(ReplicationState state, IndexResponse response) {
        return ReplicationState.builder(state)
            .seqNo(response.getSeqNo())
            .primaryTerm(response.getPrimaryTerm())
            .build();
    }

    /**
     * Stores one document per (source key, transaction id) pair in a single bulk request that is
     * refreshed immediately. Every document is written with {@code create(true)}.
     *
     * @param map transaction ids grouped by source key; {@code null}, an empty map, or a map
     *        whose sets are all empty results in no request being sent
     * @throws IllegalStateException if the bulk request fails to execute or reports failures
     */
    public void appendSynchronousReplicationTxnIds(Map<String, Set<Long>> map) {
        if (map == null || map.isEmpty()) {
            return;
        }
        Set<Map<String, Object>> docs = new HashSet<>();
        for (Map.Entry<String, Set<Long>> entry : map.entrySet()) {
            String sourceKey = entry.getKey();
            for (Long txnId : entry.getValue()) {
                Map<String, Object> doc =
                    SynchronousReplicationStateCdt.toMap(new SynchronousReplicationState(sourceKey, txnId.longValue()));
                doc.put(EsJsonConstants.APPIAN_TYPE, this.synchronousReplicationStateTypeKey);
                docs.add(doc);
            }
        }
        if (docs.isEmpty()) {
            // Every id set was empty; a bulk request without actions fails request validation.
            return;
        }
        BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        for (Map<String, Object> doc : docs) {
            bulkRequest.add(new IndexRequest(this.indexKey).source(doc).create(true).id(doc.get("id").toString()));
        }
        try {
            BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
            if (response.hasFailures()) {
                throw new IllegalStateException("Failed to store txn ids from synchronous replication: " + response.buildFailureMessage() + " txnIds=" + map);
            }
        } catch (IOException e) {
            throw new IllegalStateException("Failed to execute bulk request", e);
        }
    }

    /**
     * Returns the synchronously replicated transaction ids recorded for a source that are
     * strictly greater than {@code j}, in ascending order.
     *
     * @param str the source key to match
     * @param j exclusive lower bound for the transaction id
     * @return at most {@link #SYNC_REP_QUERY_MAX_RESULT} transaction ids
     * @throws IllegalStateException if the search request fails to execute
     */
    public ImmutableList<Long> getSynchronousReplicationTxnIds(String str, long j) {
        SearchSourceBuilder searchSource = new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery()
                .filter(QueryBuilders.termQuery(EsJsonConstants.APPIAN_TYPE, this.synchronousReplicationStateTypeKey))
                .must(QueryBuilders.termQuery(SOURCE_KEY_FIELD, str))
                .must(QueryBuilders.rangeQuery(TXN_ID_FIELD).gt(Long.valueOf(j))))
            .docValueField(TXN_ID_FIELD)
            .size(SYNC_REP_QUERY_MAX_RESULT)
            .sort(TXN_ID_FIELD, SortOrder.ASC);
        SearchRequest request = new SearchRequest(new String[]{this.indexKey}).source(searchSource);
        try {
            SearchHits hits = getClient().search(request, RequestOptions.DEFAULT).getHits();
            ImmutableList.Builder<Long> txnIds = ImmutableList.builder();
            hits.forEach(hit -> {
                // The id is read from the doc-value field requested above, not from _source.
                Number txnId = ((DocumentField) hit.getFields().get(TXN_ID_FIELD)).getValue();
                txnIds.add(Long.valueOf(txnId.longValue()));
            });
            return txnIds.build();
        } catch (IOException e) {
            throw new IllegalStateException("Failed to search for request [" + request + "] on index [" + this.indexKey + "]", e);
        }
    }

    /**
     * Writes a replication state document under its source key.
     *
     * <p>A state that carries the unassigned sequence number and primary term is written with
     * {@code create(true)}; any other state is written conditionally on its sequence number and
     * primary term.
     *
     * @param replicationState the state to persist
     * @return the response of the index operation
     * @throws IllegalStateException if the request fails with an {@link IOException}
     */
    @VisibleForTesting
    IndexResponse saveReplicationState(ReplicationState replicationState) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(logMsgPrefix() + "Saving state: " + replicationState);
        }
        Map<String, Object> doc = ReplicationStateCdt.toMap(replicationState);
        doc.put(EsJsonConstants.APPIAN_TYPE, this.replicationStateTypeKey);
        IndexRequest request = new IndexRequest(this.indexKey).id(replicationState.getSourceKey()).source(doc);

        boolean neverPersisted = replicationState.getSeqNo() == UNASSIGNED_SEQ_NO
            && replicationState.getPrimaryTerm() == UNASSIGNED_PRIMARY_TERM;
        if (neverPersisted) {
            request.create(true);
        } else {
            request.setIfSeqNo(replicationState.getSeqNo());
            request.setIfPrimaryTerm(replicationState.getPrimaryTerm());
        }
        try {
            return getClient().index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to save replication state for index [" + this.indexKey + "]", e);
        }
    }

    /**
     * Releases the replication lock by persisting the given state with the replicating server
     * and heartbeat cleared.
     *
     * @param replicationState the state held by the server releasing the lock
     * @return the persisted state with its new sequence number and primary term
     * @throws IllegalStateException if there is no persisted state, the sink is not locked, or
     *         it is locked by a different server
     */
    public ReplicationState unlockForReplication(ReplicationState replicationState) {
        validatePersistedReplicationStateForUpdate(getReplicationState(replicationState.getSourceKey()), replicationState);
        ReplicationState unlocked = ReplicationState.builder(replicationState)
            .replicatingServer((String) null)
            .replicatingServerHeartbeat((Timestamp) null)
            .build();
        return withVersionOf(unlocked, saveReplicationState(unlocked));
    }

    /**
     * Persists the given state as a heartbeat for a lock that is already held.
     *
     * @param replicationState the state to persist; its replicating server and heartbeat must be
     *        non-null
     * @return the persisted state with its new sequence number and primary term
     * @throws IllegalArgumentException if the replicating server or heartbeat is {@code null}
     * @throws IllegalStateException if there is no persisted state, the sink is not locked, or
     *         it is locked by a different server
     */
    public ReplicationState replicationHeartbeat(ReplicationState replicationState) {
        if (replicationState.getReplicatingServer() == null || replicationState.getReplicatingServerHeartbeat() == null) {
            throw new IllegalArgumentException("The replicating sever address and heartbeat must be non-null. " + replicationState);
        }
        validatePersistedReplicationStateForUpdate(getReplicationState(replicationState.getSourceKey()), replicationState);
        return withVersionOf(replicationState, saveReplicationState(replicationState));
    }

    /**
     * Verifies that the persisted state exists, is locked, and is locked by the same server that
     * is named in the requested state.
     */
    private void validatePersistedReplicationStateForUpdate(ReplicationState persisted, ReplicationState requested) {
        if (persisted == null) {
            throw new IllegalStateException("Sink doesn't have persisted state. requestedNewState=" + requested);
        }
        if (!persisted.isReplicationInProgress()) {
            throw new IllegalStateException("Sink is not locked. persistedState=" + persisted + ", requestedNewState=" + requested);
        }
        if (!persisted.getReplicatingServer().equals(requested.getReplicatingServer())) {
            throw new IllegalStateException("Sink is locked by another server. persistedState=" + persisted + ", requestedNewState=" + requested);
        }
    }

    /**
     * Loads the replication state document for a source.
     *
     * @param str the source key, used as the document id
     * @return the persisted state including its sequence number and primary term, or
     *         {@code null} if no such document exists
     * @throws IllegalStateException if the get request fails with an {@link IOException}
     */
    public ReplicationState getReplicationState(String str) {
        try {
            GetResponse response = getClient().get(new GetRequest(this.indexKey, str), RequestOptions.DEFAULT);
            if (!response.isExists()) {
                return null;
            }
            return ReplicationStateCdt.fromMap(response.getSourceAsMap(), response.getSeqNo(), response.getPrimaryTerm());
        } catch (IOException e) {
            throw new IllegalStateException("Failed to get replication state for index [" + this.indexKey + "]", e);
        }
    }

    /**
     * Looks up the "up to date as of" timestamp of every given source.
     *
     * @param set the source keys to look up
     * @return one entry per source key; the value is absent when the source has no persisted
     *         state or its state has no timestamp
     */
    public ImmutableMap<String, Optional<Timestamp>> getUpToDateAsOf(Set<String> set) {
        ImmutableMap.Builder<String, Optional<Timestamp>> result = ImmutableMap.builder();
        for (String sourceKey : set) {
            ReplicationState state = getReplicationState(sourceKey);
            Timestamp upToDateAsOf = state == null ? null : state.getUpToDateAsOf();
            result.put(sourceKey, Optional.fromNullable(upToDateAsOf));
        }
        return result.build();
    }

    /**
     * Returns the earliest "up to date as of" timestamp across the given sources.
     *
     * @param set the source keys to inspect
     * @return the earliest timestamp, or absent if {@code set} is empty or any source has no
     *         timestamp
     */
    public Optional<Timestamp> getOldestUpToDateAsOf(Set<String> set) {
        Timestamp oldest = null;
        for (Optional<Timestamp> candidate : getUpToDateAsOf(set).values()) {
            if (!candidate.isPresent()) {
                // One source without a timestamp makes the overall answer unknown.
                return Optional.absent();
            }
            if (oldest == null || candidate.get().before(oldest)) {
                oldest = candidate.get();
            }
        }
        return Optional.fromNullable(oldest);
    }

    /**
     * Reports which replication action, if any, is in progress for each given source.
     *
     * @param set the source keys to inspect
     * @return one entry per source key; absent when the source has no state or is not being
     *         replicated, {@link ReplicationAction#BULK_LOAD} when replication is in progress
     *         and the state has no "up to date as of" timestamp, and
     *         {@link ReplicationAction#INCREMENTAL_UPDATE} otherwise
     */
    public ImmutableMap<String, Optional<ReplicationAction>> getCurrentReplicationAction(Set<String> set) {
        ImmutableMap.Builder<String, Optional<ReplicationAction>> result = ImmutableMap.builder();
        for (String sourceKey : set) {
            ReplicationState state = getReplicationState(sourceKey);
            if (state == null || !state.isReplicationInProgress()) {
                result.put(sourceKey, Optional.<ReplicationAction>absent());
            } else {
                ReplicationAction action = state.getUpToDateAsOf() == null
                    ? ReplicationAction.BULK_LOAD
                    : ReplicationAction.INCREMENTAL_UPDATE;
                result.put(sourceKey, Optional.of(action));
            }
        }
        return result.build();
    }

    /**
     * Not supported by this sink.
     *
     * @throws UnsupportedOperationException always
     */
    public void deleteAll() {
        throw new UnsupportedOperationException();
    }

    /** Prefix that tags log messages with this sink's index key. */
    private String logMsgPrefix() {
        return "[" + getKey() + "] ";
    }
}
