package com.appian.dl.replicator;

import com.appian.dl.core.base.Clock;
import com.appian.dl.core.base.ClockSystemImpl;
import com.appian.dl.replicator.ReplicationResult;
import com.appian.dl.replicator.UpsertResult;
import com.appian.dl.replicator.stats.PersisterUpsertStats;
import com.appian.dl.replicator.stats.ReplicationStats;
import com.appian.dl.repo.TypedRef;
import com.appian.dl.txn.TxnIdOutOfTrackedRangeException;
import com.appian.dl.txn.TxnMetadata;
import com.appian.dl.txn.TxnOpType;
import com.appian.dl.txn.Txns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appian/dl/replicator/ReplicatorImpl.class */
/**
 * {@link Replicator} implementation that replicates from every configured {@link Source} to every
 * configured {@link Sink}.
 *
 * <p>For each source/sink pair a replication run:
 * <ol>
 *   <li>locks the sink for this server (returning early if another server already holds the
 *       lock),</li>
 *   <li>performs a bulk load when the sink has no replicated transaction id or when the source
 *       reports the requested transaction id as out of its tracked range; otherwise replays the
 *       next batch of source transactions incrementally,</li>
 *   <li>sends replication heartbeats to the sink while working, and</li>
 *   <li>notifies the sink of completion (if an action was taken) and unlocks it.</li>
 * </ol>
 */
public class ReplicatorImpl implements Replicator {
    private static final Logger LOG = Logger.getLogger(ReplicatorImpl.class);

    /** Random per-instance prefix of the ids produced by {@link #nextId()}. */
    private final String idBase;
    /** Counter appended to {@link #idBase}; starts at 1. */
    private final AtomicLong idSuffix;
    private final ImmutableList<Source> sources;
    private final ImmutableList<Sink> sinks;
    /** Server id passed to the sink when locking and recorded in the replication result. */
    private final String myServerId;
    private final Clock clock;
    /** Passed to {@link Sink#lockForReplication} together with the server id. */
    private final long maxMillisSinceHeartbeat;
    private final UpsertResultHandler<Object, Object> bulkLoadResultHandler;
    private final UpsertResultHandler<Object, Object> incrementalUpdateResultHandler;

    /**
     * Creates a replicator that uses the system clock ({@link ClockSystemImpl#INSTANCE}).
     *
     * @param sources sources to replicate from; must not be null
     * @param sinks sinks to replicate to; must not be null
     * @param myServerId id of this server; must not be null or empty
     * @param maxMillisSinceHeartbeat value handed to the sink when acquiring the replication lock
     * @param bulkLoadResultHandler receives upsert results produced during bulk loads; must not be null
     * @param incrementalUpdateResultHandler receives upsert results produced during incremental
     *     updates; must not be null
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if {@code myServerId} is null or empty
     */
    public ReplicatorImpl(Iterable<Source> sources, Iterable<Sink> sinks, String myServerId, long maxMillisSinceHeartbeat, UpsertResultHandler<Object, Object> bulkLoadResultHandler, UpsertResultHandler<Object, Object> incrementalUpdateResultHandler) {
        this(sources, sinks, myServerId, ClockSystemImpl.INSTANCE, maxMillisSinceHeartbeat, bulkLoadResultHandler, incrementalUpdateResultHandler);
    }

    /**
     * Same as the public constructor, but with an explicit {@link Clock} so tests can control time.
     *
     * @param clock source of the timestamps written into the replication state; must not be null
     */
    @VisibleForTesting
    ReplicatorImpl(Iterable<Source> sources, Iterable<Sink> sinks, String myServerId, Clock clock, long maxMillisSinceHeartbeat, UpsertResultHandler<Object, Object> bulkLoadResultHandler, UpsertResultHandler<Object, Object> incrementalUpdateResultHandler) {
        this.idBase = UUID.randomUUID().toString();
        this.idSuffix = new AtomicLong(1L);
        this.sources = ImmutableList.copyOf(Preconditions.checkNotNull(sources));
        this.sinks = ImmutableList.copyOf(Preconditions.checkNotNull(sinks));
        Preconditions.checkArgument(!Strings.isNullOrEmpty(myServerId), "myServerId");
        this.myServerId = myServerId;
        this.clock = Preconditions.checkNotNull(clock);
        this.maxMillisSinceHeartbeat = maxMillisSinceHeartbeat;
        this.bulkLoadResultHandler = Preconditions.checkNotNull(bulkLoadResultHandler);
        this.incrementalUpdateResultHandler = Preconditions.checkNotNull(incrementalUpdateResultHandler);
    }

    /** Returns the immutable list of sources this replicator reads from. */
    @Override // com.appian.dl.replicator.Replicator
    /* renamed from: getSources, reason: merged with bridge method [inline-methods] */
    public ImmutableList<Source> mo5getSources() {
        return this.sources;
    }

    /** Returns the immutable list of sinks this replicator writes to. */
    @Override // com.appian.dl.replicator.Replicator
    /* renamed from: getSinks, reason: merged with bridge method [inline-methods] */
    public ImmutableList<Sink> mo4getSinks() {
        return this.sinks;
    }

    /**
     * Runs one replication pass over every source/sink combination and returns the collected
     * statistics.
     *
     * @return the result built for this pass, tagged with this server's id and a fresh run id
     */
    @Override // com.appian.dl.replicator.Replicator
    public ReplicationResult replicate() {
        String replicationId = nextId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("[" + replicationId + "] Replicating from " + this.sources.size() + " source(s) to " + this.sinks.size() + " sink(s).");
        }
        ReplicationResult.Builder result = ReplicationResult.builder(this.myServerId, replicationId);
        result.totalTimeSw().start();
        for (Source source : this.sources) {
            for (Sink sink : this.sinks) {
                ReplicationStats.Builder stats = result.statsBuilder(source.getKey(), sink.getKey());
                stats.totalTimeSw().start();
                replicate(replicationId, source, sink, stats);
                stats.totalTimeSw().stop();
            }
        }
        result.totalTimeSw().stop();
        return result.build();
    }

    /**
     * Replicates a single source into a single sink.
     *
     * @param replicationId id of the current run, used only for logging and result handlers
     * @param source source to read from
     * @param sink sink to lock, write to and unlock
     * @param stats statistics builder for this source/sink pair
     * @return the replication state as it was right before the sink was unlocked, or the state
     *     carried by the {@link ReplicationInProgressException} when the replication lock is held
     *     by another server
     */
    private ReplicationState replicate(String replicationId, Source source, Sink sink, ReplicationStats.Builder stats) {
        final long startNanos = System.nanoTime();
        final String sourceKey = source.getKey();
        if (LOG.isDebugEnabled()) {
            LOG.debug(logMsgPrefix(replicationId, source, sink) + "Starting replication. src=" + source + ", sink=" + sink);
        }
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix(replicationId, source, sink) + "Locking sink for replication. server=" + this.myServerId);
            }
            AtomicReference<ReplicationState> stateRef = new AtomicReference<>(sink.lockForReplication(sourceKey, this.myServerId, this.maxMillisSinceHeartbeat));
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix(replicationId, source, sink) + "Successfully locked sink for replication: " + stateRef.get());
            }
            Timestamp startTs = this.clock.nowTs();
            boolean failed = false;
            boolean didWork = false;
            try {
                didWork = replicateLocked(replicationId, source, sink, stats, stateRef, sourceKey, startTs);
                // The return value is evaluated before the finally block runs, so callers get the
                // state as it was before unlocking.
                return stateRef.get();
            } catch (Throwable t) {
                // Remember the failure so the cleanup below reports "Error" rather than "Success".
                failed = true;
                throw Throwables.propagate(t);
            } finally {
                completeAndUnlock(replicationId, source, sink, stats, stateRef, failed, didWork, startNanos);
            }
        } catch (ReplicationInProgressException e) {
            ReplicationState state = e.getState();
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix(replicationId, source, sink) + "Replication is already in progress from another server, so nothing to do. (" + state + ")");
            }
            return state;
        }
    }

    /**
     * Performs the actual replication work while the sink is locked: a bulk load, an incremental
     * update, or nothing when the source has no new transactions.
     *
     * @return {@code true} if a bulk load or incremental update completed, {@code false} if there
     *     was nothing to replicate
     */
    private boolean replicateLocked(String replicationId, Source source, Sink sink, ReplicationStats.Builder stats, AtomicReference<ReplicationState> stateRef, String sourceKey, Timestamp startTs) {
        Long latestReplicatedTxnId = stateRef.get().getLatestReplicatedTxnId();
        if (latestReplicatedTxnId == null) {
            if (LOG.isInfoEnabled()) {
                LOG.info(logMsgPrefix(replicationId, source, sink) + "BULK LOAD needed (sink has no data).");
            }
            clearAndReplicateAll(replicationId, source, sink, stats, stateRef, startTs);
            return true;
        }
        Long nextTxnId = Long.valueOf(source.getNextTxnToReplicate(latestReplicatedTxnId.longValue(), sink.getSynchronousReplicationTxnIds(sourceKey, latestReplicatedTxnId.longValue())));
        if (nextTxnId.longValue() > stateRef.get().getLatestReplicatedTxnId().longValue() && LOG.isInfoEnabled()) {
            LOG.info(logMsgPrefix(replicationId, source, sink) + "sync replication txId is fast-forwarded from " + stateRef.get().getLatestReplicatedTxnId() + " to " + nextTxnId);
        }
        try {
            List<TxnMetadata<Object, Object>> txns = source.getTxns(nextTxnId.longValue(), source.getTxnsBatchSize());
            if (!txns.isEmpty()) {
                replicateMissing(replicationId, source, sink, stats, stateRef, txns, startTs);
                return true;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix(replicationId, source, sink) + "Source doesn't have any unreplicated changes for this sink, so nothing to do.");
            }
            stateRef.set(ReplicationState.builder(stateRef.get()).upToDateAsOf(startTs).latestReplicatedTxnId(nextTxnId).build());
            return false;
        } catch (TxnIdOutOfTrackedRangeException e) {
            if (LOG.isInfoEnabled()) {
                LOG.info(logMsgPrefix(replicationId, source, sink) + "BULK LOAD needed (sink is too far behind the source). " + e.toString());
            }
            clearAndReplicateAll(replicationId, source, sink, stats, stateRef, startTs);
            return true;
        }
    }

    /**
     * Cleanup that always runs after a locked replication attempt: logs the outcome, tells the
     * sink the action is complete (if one was recorded) and unlocks the sink. Failures of
     * {@code complete()} and of unlocking are logged and not rethrown.
     *
     * @param failed whether the replication attempt threw
     * @param didWork whether a bulk load or incremental update completed; selects INFO instead of
     *     DEBUG for the final "Done" message
     * @param startNanos {@link System#nanoTime()} reading taken when the attempt started
     */
    private void completeAndUnlock(String replicationId, Source source, Sink sink, ReplicationStats.Builder stats, AtomicReference<ReplicationState> stateRef, boolean failed, boolean didWork, long startNanos) {
        if (failed) {
            if (LOG.isInfoEnabled()) {
                LOG.info(logMsgPrefix(replicationId, source, sink) + "Error. state=" + stateRef.get());
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(logMsgPrefix(replicationId, source, sink) + "Success. state=" + stateRef.get());
        }
        if (stats.getActionTaken().isPresent()) {
            try {
                sink.complete((ReplicationAction) stats.getActionTaken().get());
            } catch (Throwable t) {
                LOG.error("Error from complete()", t);
            }
        }
        try {
            stateRef.set(sink.unlockForReplication(stateRef.get()));
            // NOTE(review): a failed attempt is reported at DEBUG here, as before; confirm whether
            // failures should be promoted to INFO.
            Level level = didWork ? Level.INFO : Level.DEBUG;
            if (LOG.isEnabledFor(level)) {
                LOG.log(level, logMsgPrefix(replicationId, source, sink) + "Done in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos) + " ms. state=" + stateRef.get());
            }
        } catch (Throwable t) {
            LOG.error("Could not unlock sink. state=" + stateRef.get(), t);
        }
    }

    /**
     * Bulk load: resets the replication state, deletes the sink's data for all loader types and
     * then replicates everything from the source.
     *
     * @param startTs timestamp recorded as {@code upToDateAsOf} once the bulk load has finished
     */
    private void clearAndReplicateAll(String replicationId, Source source, Sink sink, ReplicationStats.Builder stats, AtomicReference<ReplicationState> stateRef, Timestamp startTs) {
        stats.actionTaken(ReplicationAction.BULK_LOAD);
        if (stateRef.get().getUpToDateAsOf() != null) {
            // Mark the sink as not up to date before any data is touched.
            stateRef.set(ReplicationState.builder(stateRef.get()).upToDateAsOf(null).latestReplicatedTxnId(null).build());
            logState(replicationId, source, sink, stateRef);
            sendHeartbeat(sink, stateRef);
        }
        sink.prepare(ReplicationAction.BULK_LOAD);
        clearAll(replicationId, source, sink, stats);
        // NOTE(review): the builder is seeded from the state *before* replicateAll() runs
        // (left-to-right evaluation), so state returned by heartbeats sent during replicateAll()
        // is not carried over. This order is kept as-is; confirm it is intended.
        stateRef.set(ReplicationState.builder(stateRef.get()).upToDateAsOf(startTs).latestReplicatedTxnId(Long.valueOf(replicateAll(replicationId, source, sink, stats, stateRef))).replicatingServerHeartbeat(null).build());
    }

    /**
     * Deletes, for every loader of the source, the data of all the loader's supported types from
     * each sink persister registered for the loader's type. Delete time is recorded in
     * {@code stats.deleteTimeSw()}.
     */
    private void clearAll(String replicationId, Source source, Sink sink, ReplicationStats.Builder stats) {
        if (LOG.isInfoEnabled()) {
            LOG.info(logMsgPrefix(replicationId, source, sink) + "Deleting all data from sink.");
        }
        for (Loader<Object, Object, Object> loader : source.getLoaders()) {
            Object type = loader.getType();
            Set<Object> allSupportedTypes = loader.getAllSupportedTypes();
            for (Persister<Object, Object, Object> persister : Replicators.getPersistersForType(sink, type)) {
                new TimingPersister(persister, stats.deleteTimeSw()).deleteDataOfType(allSupportedTypes, ReplicationAction.BULK_LOAD);
            }
        }
    }

    /**
     * Loads everything from every loader of the source, in batches, and upserts each batch through
     * all sink persisters registered for the loader's type. A heartbeat is sent to the sink after
     * each persister upsert.
     *
     * @return the source's max transaction id, read before loading starts; every upserted item is
     *     tagged with it
     */
    private long replicateAll(String replicationId, Source source, Sink sink, ReplicationStats.Builder stats, AtomicReference<ReplicationState> stateRef) {
        long maxTxnId = source.getMaxTxnId();
        UpsertResult.Builder overallResult = UpsertResult.builder();
        if (LOG.isInfoEnabled()) {
            LOG.info(logMsgPrefix(replicationId, source, sink) + "Replicating all data from source.");
        }
        for (Loader<Object, Object, Object> loader : source.getLoaders()) {
            TimingLoader timingLoader = new TimingLoader(loader, stats.loadTimeSw());
            Object type = timingLoader.getType();
            List<Persister<Object, Object, Object>> persisters = Replicators.getPersistersForType(sink, type);
            if (persisters.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(logMsgPrefix(replicationId, source, sink) + "  No persister configured for data of type [" + type + "], so not replicating data of that type.");
                }
                continue;
            }
            if (LOG.isInfoEnabled()) {
                LOG.info(logMsgPrefix(replicationId, source, sink) + "  Replicating all data of type [" + type + "] via " + persisters.size() + " persister(s)");
            }
            UnmodifiableIterator batches = Iterators.partition(timingLoader.getAll(), timingLoader.getLoadingBatchSize());
            while (batches.hasNext()) {
                List batch = (List) batches.next();
                stats.bulkLoadStats().incrementNumLoaded(batch.size());
                for (Persister<Object, Object, Object> persister : persisters) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(logMsgPrefix(replicationId, source, sink) + "  Upserting batch of " + batch.size() + " items via " + persister.getClass().getSimpleName());
                    }
                    UpsertResponse<Object, Object> response = upsert(persister, TxnIdAndValue.transform(maxTxnId, batch), ReplicationAction.BULK_LOAD);
                    this.bulkLoadResultHandler.handleIndividualResult(replicationId, source, sink, response.result);
                    overallResult.add(response.result);
                    PersisterUpsertStats upsertStats = response.stats;
                    stats.bulkLoadStats().incrementNumUpserts(upsertStats.getNumUpserts());
                    stats.addUpsertPrepareTimeMs(upsertStats.getPrepareTimeMs());
                    stats.addUpsertPersistTimeMs(upsertStats.getPersistTimeMs());
                    touchHeartbeat(stateRef);
                    logState(replicationId, source, sink, stateRef);
                    sendHeartbeat(sink, stateRef);
                }
            }
        }
        this.bulkLoadResultHandler.handleOverallResult(replicationId, source, sink, overallResult.build());
        return maxTxnId;
    }

    /**
     * Incremental update: replays the given source transactions, in order, against the sink.
     *
     * <p>For each transaction the deletes are applied first, then the upserts. Refs that were
     * already upserted while processing an earlier transaction of the same batch (and not deleted
     * since) are skipped, because each upserted item is tagged with the id computed by
     * {@link #getItemTxnId}, which already looks ahead in the batch.
     *
     * @param txns non-empty batch of transactions to replay
     * @param startTs timestamp used as {@code upToDateAsOf} when the batch was smaller than the
     *     source's batch size; otherwise the timestamp of the last replayed transaction is kept
     */
    private void replicateMissing(String replicationId, Source source, Sink sink, ReplicationStats.Builder stats, AtomicReference<ReplicationState> stateRef, List<TxnMetadata<Object, Object>> txns, Timestamp startTs) {
        int numTxns = txns.size();
        stats.actionTaken(ReplicationAction.INCREMENTAL_UPDATE);
        stats.incrementalUpdateStats().incrementNumTxns(numTxns);
        if (LOG.isInfoEnabled()) {
            LOG.info(logMsgPrefix(replicationId, source, sink) + "INCREMENTAL UPDATE needed (sink is missing " + (numTxns < source.getTxnsBatchSize() ? "" + numTxns : ">= " + numTxns) + " transactions).");
        }
        sink.prepare(ReplicationAction.INCREMENTAL_UPDATE);
        // Refs upserted so far in this batch that have not been deleted again since.
        Set<Object> alreadyUpserted = Sets.newHashSet();
        UpsertResult.Builder overallResult = UpsertResult.builder();
        // Index loop: getItemTxnId() needs the position of the current transaction in the batch.
        for (int i = 0; i < txns.size(); i++) {
            TxnMetadata<Object, Object> txn = txns.get(i);
            Map deletedIdsByType = Txns.getDeletedIdsByType(txn);
            Map upsertedIdsByType = Txns.getUpsertedIdsByType(txn);
            if (LOG.isDebugEnabled()) {
                LOG.debug(logMsgPrefix(replicationId, source, sink) + "Replicating transaction: id=" + txn.getId() + ", numOps=" + txn.getOps().size() + ", numUpserts=" + totalSize(upsertedIdsByType) + ", numDeletes=" + totalSize(deletedIdsByType));
            }
            for (Object rawEntry : deletedIdsByType.entrySet()) {
                Map.Entry entry = (Map.Entry) rawEntry;
                Object type = entry.getKey();
                Set deleted = (Set) entry.getValue();
                stats.incrementalUpdateStats().incrementNumDeletes(deleted.size());
                // A deleted ref must be upserted again if a later transaction re-creates it.
                alreadyUpserted.removeAll(deleted);
                for (Persister<Object, Object, Object> persister : Replicators.getPersistersForType(sink, type)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(logMsgPrefix(replicationId, source, sink) + "  Deleting " + deleted.size() + " items of type [" + type + "] via " + persister.getClass().getSimpleName());
                    }
                    new TimingPersister(persister, stats.deleteTimeSw()).delete(TxnIdAndTypedRef.transform(txn.getId(), deleted), ReplicationAction.INCREMENTAL_UPDATE);
                    touchHeartbeat(stateRef);
                    sendHeartbeat(sink, stateRef);
                }
            }
            for (Object rawEntry : upsertedIdsByType.entrySet()) {
                Map.Entry entry = (Map.Entry) rawEntry;
                Object type = entry.getKey();
                Set upserted = (Set) entry.getValue();
                ImmutableSet pending = ImmutableSet.copyOf(Sets.difference(upserted, alreadyUpserted));
                if (pending.size() < upserted.size()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(logMsgPrefix(replicationId, source, sink) + "  Fast-forwarding over " + (upserted.size() - pending.size()) + " already upserted objects");
                    }
                    if (pending.isEmpty()) {
                        // Everything of this type was already upserted; nothing to load.
                        continue;
                    }
                }
                Optional<Loader<Object, Object, Object>> loaderForType = Replicators.getLoaderForType(source, type);
                if (!loaderForType.isPresent()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(logMsgPrefix(replicationId, source, sink) + "  No loader configured for data of type [" + type + "], so not replicating these upserts: " + upsertedIdsByType.get(type));
                    }
                    continue;
                }
                Loader loader = (Loader) loaderForType.get();
                List<Persister<Object, Object, Object>> persisters = Replicators.getPersistersForType(sink, type);
                if (persisters.isEmpty()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(logMsgPrefix(replicationId, source, sink) + "  No persister configured for data of type [" + type + "], so not replicating these upserts: " + upsertedIdsByType.get(type));
                    }
                    continue;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(logMsgPrefix(replicationId, source, sink) + "  Loading " + pending.size() + " items of type [" + type + "] via " + loader.getClass().getSimpleName());
                }
                TimingLoader timingLoader = new TimingLoader(loader, stats.loadTimeSw());
                UnmodifiableIterator batches = Iterators.partition(timingLoader.get(pending), timingLoader.getLoadingBatchSize());
                while (batches.hasNext()) {
                    List batch = (List) batches.next();
                    stats.incrementalUpdateStats().incrementNumLoaded(batch.size());
                    for (Persister<Object, Object, Object> persister : persisters) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(logMsgPrefix(replicationId, source, sink) + "  Upserting batch of " + batch.size() + " items of type [" + type + "] via " + persister.getClass().getSimpleName());
                        }
                        List<TxnIdAndValue<Object>> tagged = Lists.newArrayListWithCapacity(batch.size());
                        for (Object item : batch) {
                            tagged.add(new TxnIdAndValue<>(getItemTxnId(timingLoader.getTypedRef(item), txns, i), item));
                        }
                        UpsertResponse<Object, Object> response = upsert(persister, tagged, ReplicationAction.INCREMENTAL_UPDATE);
                        this.incrementalUpdateResultHandler.handleIndividualResult(replicationId, source, sink, response.result);
                        overallResult.add(response.result);
                        PersisterUpsertStats upsertStats = response.stats;
                        stats.incrementalUpdateStats().incrementNumUpserts(upsertStats.getNumUpserts());
                        stats.addUpsertPrepareTimeMs(upsertStats.getPrepareTimeMs());
                        stats.addUpsertPersistTimeMs(upsertStats.getPersistTimeMs());
                        touchHeartbeat(stateRef);
                        sendHeartbeat(sink, stateRef);
                    }
                }
                alreadyUpserted.addAll(pending);
            }
            // The transaction is fully applied: advance the replicated position to it.
            stateRef.set(ReplicationState.builder(stateRef.get()).upToDateAsOf(txn.getTs()).latestReplicatedTxnId(Long.valueOf(txn.getId())).replicatingServerHeartbeat(this.clock.nowTs()).build());
            logState(replicationId, source, sink, stateRef);
            sendHeartbeat(sink, stateRef);
        }
        this.incrementalUpdateResultHandler.handleOverallResult(replicationId, source, sink, overallResult.build());
        // A short batch means the start timestamp is used as upToDateAsOf; a full batch keeps the
        // timestamp of the last replayed transaction.
        ReplicationState.builder(stateRef.get());
        Timestamp upToDateAsOf = numTxns < source.getTxnsBatchSize() ? startTs : stateRef.get().getUpToDateAsOf();
        stateRef.set(ReplicationState.builder(stateRef.get()).upToDateAsOf(upToDateAsOf).replicatingServerHeartbeat(null).build());
    }

    /** Delegates to {@link Persister#upsert}; single seam for all upserts issued by this class. */
    private UpsertResponse<Object, Object> upsert(Persister<Object, Object, Object> persister, List<TxnIdAndValue<Object>> list, ReplicationAction replicationAction) {
        return persister.upsert(list, replicationAction);
    }

    /**
     * Determines the transaction id to tag an item with when it is upserted while replaying
     * transaction {@code index} of {@code txns}.
     *
     * <p>Starts with the id of the transaction at {@code index} and scans the later transactions
     * of the batch: each later upsert of the same ref moves the id forward, and the scan stops at
     * the first later transaction that deletes the ref.
     *
     * @param typedRef ref of the item being upserted
     * @param txns the batch being replayed
     * @param index position in {@code txns} of the transaction currently being replayed
     * @return the transaction id to associate with the item
     */
    private long getItemTxnId(TypedRef<Object, Object> typedRef, List<TxnMetadata<Object, Object>> txns, int index) {
        long txnId = txns.get(index).getId();
        for (int later = index + 1; later < txns.size(); later++) {
            TxnMetadata<Object, Object> laterTxn = txns.get(later);
            if (laterTxn.containsRef(TxnOpType.DELETE, typedRef)) {
                return txnId;
            }
            if (laterTxn.containsRef(TxnOpType.UPSERT, typedRef)) {
                txnId = laterTxn.getId();
            }
        }
        return txnId;
    }

    /** Stamps the in-memory replication state with the current time as the server heartbeat. */
    private void touchHeartbeat(AtomicReference<ReplicationState> stateRef) {
        stateRef.set(ReplicationState.builder(stateRef.get()).replicatingServerHeartbeat(this.clock.nowTs()).build());
    }

    /** Sends the current state to the sink as a heartbeat and stores the state the sink returns. */
    private static void sendHeartbeat(Sink sink, AtomicReference<ReplicationState> stateRef) {
        stateRef.set(sink.replicationHeartbeat(stateRef.get()));
    }

    /** Logs the current replication state at DEBUG. */
    private static void logState(String replicationId, Source source, Sink sink, AtomicReference<ReplicationState> stateRef) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(logMsgPrefix(replicationId, source, sink) + "  State: " + stateRef.get());
        }
    }

    /** Returns the next run id: the per-instance UUID, a dash, and an incrementing counter. */
    private String nextId() {
        return this.idBase + "-" + this.idSuffix.getAndIncrement();
    }

    /** Builds the log prefix {@code "[<id>] [<sourceKey>=><sinkKey>] "}. */
    private static String logMsgPrefix(String str, Source source, Sink sink) {
        return new StringBuilder(100).append("[").append(str).append("] [").append(source.getKey()).append("=>").append(sink.getKey()).append("] ").toString();
    }

    /** Sums the sizes of all non-null value sets of the map. */
    private static <K, E> int totalSize(Map<K, Set<E>> map) {
        int total = 0;
        for (Set<E> values : map.values()) {
            if (values != null) {
                total += values.size();
            }
        }
        return total;
    }
}
