package io.lunes.network;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.lunes.settings.SynchronizationSettings;
import io.lunes.state.Diff;
import io.lunes.transaction.Transaction;
import io.lunes.transaction.ValidationError;
import io.lunes.utx.UtxBatchOps;
import io.lunes.utx.UtxPool;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelMatcher;
import java.util.concurrent.TimeUnit;
import monix.execution.CancelableFuture;
import monix.execution.Scheduler$;
import monix.execution.schedulers.SchedulerService;
import monix.reactive.Observable;
import scala.MatchError;
import scala.Option$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scala.util.Right;

/* compiled from: UtxPoolSynchronizer.scala */
/* loaded from: input_file:io/lunes/network/UtxPoolSynchronizer$.class */
public final class UtxPoolSynchronizer$ {
    public static UtxPoolSynchronizer$ MODULE$;

    static {
        new UtxPoolSynchronizer$();
    }

    public CancelableFuture<BoxedUnit> start(UtxPool utxPool, SynchronizationSettings.UtxSynchronizerSettings utxSynchronizerSettings, ChannelGroup channelGroup, Observable<Tuple2<Channel, Transaction>> observable) {
        SchedulerService singleThread = Scheduler$.MODULE$.singleThread("utx-pool-sync", Scheduler$.MODULE$.singleThread$default$2(), Scheduler$.MODULE$.singleThread$default$3(), Scheduler$.MODULE$.singleThread$default$4());
        Object obj = new Object();
        Cache<K1, V1> build = CacheBuilder.newBuilder().maximumSize(utxSynchronizerSettings.networkTxCacheSize()).expireAfterWrite(utxSynchronizerSettings.networkTxCacheTime().toMillis(), TimeUnit.MILLISECONDS).build();
        return observable.observeOn(singleThread).bufferTimedAndCounted(utxSynchronizerSettings.maxBufferTime(), utxSynchronizerSettings.maxBufferSize()).foreach(seq -> {
            $anonfun$start$1(utxPool, channelGroup, obj, build, seq);
            return BoxedUnit.UNIT;
        }, singleThread);
    }

    public static final /* synthetic */ boolean $anonfun$start$2(Object obj, Cache cache, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Transaction transaction = (Transaction) tuple2.mo7432_2();
        boolean isEmpty = Option$.MODULE$.apply(cache.getIfPresent(transaction.id().mo196apply())).isEmpty();
        if (isEmpty) {
            cache.put(transaction.id().mo196apply(), obj);
        }
        return isEmpty;
    }

    public static final /* synthetic */ void $anonfun$start$5(ChannelGroup channelGroup, UtxBatchOps utxBatchOps, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Channel channel = (Channel) tuple2.mo7433_1();
        Seq seq = (Seq) tuple2.mo7432_2();
        ChannelMatcher channelMatcher = channel2 -> {
            return channel2 != null ? !channel2.equals(channel) : channel != null;
        };
        seq.foreach(tuple22 -> {
            Tuple2 tuple22;
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            Transaction transaction = (Transaction) tuple22.mo7432_2();
            Either<ValidationError, Tuple2<Object, Diff>> putIfNew = utxBatchOps.putIfNew(transaction);
            return ((putIfNew instanceof Right) && (tuple22 = (Tuple2) ((Right) putIfNew).value()) != null && true == tuple22._1$mcZ$sp()) ? channelGroup.write(RawBytes$.MODULE$.from(transaction), channelMatcher) : BoxedUnit.UNIT;
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ void $anonfun$start$3(ChannelGroup channelGroup, Seq seq, UtxBatchOps utxBatchOps) {
        seq.groupBy(tuple2 -> {
            if (tuple2 != null) {
                return (Channel) tuple2.mo7433_1();
            }
            throw new MatchError(tuple2);
        }).foreach(tuple22 -> {
            $anonfun$start$5(channelGroup, utxBatchOps, tuple22);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$start$1(UtxPool utxPool, ChannelGroup channelGroup, Object obj, Cache cache, Seq seq) {
        Seq filter = seq.filter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$start$2(obj, cache, tuple2));
        });
        if (filter.nonEmpty()) {
            utxPool.batched(utxBatchOps -> {
                $anonfun$start$3(channelGroup, filter, utxBatchOps);
                return BoxedUnit.UNIT;
            });
            channelGroup.flush();
        }
    }

    private UtxPoolSynchronizer$() {
        MODULE$ = this;
    }
}
