package io.lunes.network;

import io.lunes.metrics.Metrics$;
import io.lunes.network.HandshakeHandler;
import io.lunes.network.PipelineInitializer;
import io.lunes.settings.Constants$;
import io.lunes.settings.LunesSettings;
import io.lunes.state.NG;
import io.lunes.transaction.LastBlockInfo;
import io.lunes.transaction.Transaction;
import io.lunes.utx.UtxPool;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ConcurrentHashMap;
import monix.eval.Task;
import monix.reactive.Observable;
import org.influxdb.dto.Point;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple7;
import scala.collection.AbstractSeq;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.concurrent.duration.Cpackage;
import scala.math.BigInt;
import scala.math.Ordering$String$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.VolatileBooleanRef;
import scorex.block.Block;
import scorex.utils.LoggerFacade;
import scorex.utils.ScorexLogging;

/* compiled from: NetworkServer.scala */
/* loaded from: input_file:io/lunes/network/NetworkServer$.class */
public final class NetworkServer$ implements ScorexLogging {
    public static NetworkServer$ MODULE$;

    // Eagerly instantiates the singleton when the class is loaded. The instance is not stored
    // here: the private constructor assigns itself to MODULE$.
    static {
        new NetworkServer$();
    }

    /**
     * Returns the logger provided by the {@link ScorexLogging} mixin.
     *
     * <p>Delegates explicitly to the interface's default implementation. Calling the unqualified
     * {@code log()} from inside this override would dispatch back to this very method and
     * recurse until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code ScorexLogging.log()} is a default interface method (the
     * usual Scala 2.12 trait encoding) - confirm against the ScorexLogging class.
     *
     * @return the logger facade supplied by {@code ScorexLogging}
     */
    @Override // scorex.utils.ScorexLogging
    public LoggerFacade log() {
        return ScorexLogging.super.log();
    }

    /**
     * Wraps a {@link Task} in the {@code ScorexLogging.TaskExt} helper provided by the mixin.
     *
     * <p>Delegates explicitly to the interface's default implementation. Calling the unqualified
     * {@code TaskExt(task)} from inside this override would dispatch back to this very method
     * and recurse until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code ScorexLogging.TaskExt} is a default interface method (the
     * usual Scala 2.12 trait encoding) - confirm against the ScorexLogging class.
     *
     * @param task the task to wrap
     * @param <A>  result type of the task
     * @return the wrapper produced by {@code ScorexLogging}
     */
    @Override // scorex.utils.ScorexLogging
    public <A> ScorexLogging.TaskExt<A> TaskExt(Task<A> task) {
        return ScorexLogging.super.TaskExt(task);
    }

    /**
     * Wraps an {@link Observable} in the {@code ScorexLogging.ObservableExt} helper provided by
     * the mixin.
     *
     * <p>Delegates explicitly to the interface's default implementation. Calling the unqualified
     * {@code ObservableExt(observable)} from inside this override would dispatch back to this
     * very method and recurse until the stack overflows.
     *
     * <p>NOTE(review): assumes {@code ScorexLogging.ObservableExt} is a default interface method
     * (the usual Scala 2.12 trait encoding) - confirm against the ScorexLogging class.
     *
     * @param observable the observable to wrap
     * @param <A>        element type of the observable
     * @return the wrapper produced by {@code ScorexLogging}
     */
    @Override // scorex.utils.ScorexLogging
    public <A> ScorexLogging.ObservableExt<A> ObservableExt(Observable<A> observable) {
        return ScorexLogging.super.ObservableExt(observable);
    }

    /**
     * Wires up the networking stack and returns an {@code NS} handle for it.
     *
     * <p>In order, this method: creates the boss/worker event loop groups, builds the handshake
     * and the shared pipeline handlers, optionally binds a server socket (only when a declared
     * address is configured), prepares the client {@link Bootstrap} for outgoing connections,
     * and schedules a periodic task on the worker group that logs/reports connection lists and
     * opens a new outgoing connection while the outbound limit has not been reached.
     *
     * @param lunesSettings     settings; the network and blockchain sections are read here
     * @param observable        stream of {@code LastBlockInfo}; its {@code ready()} flag is mapped to a
     *                          Boolean stream and handed to the {@code DiscardingHandler}
     * @param ng                not referenced in this method's body
     * @param historyReplier    handler appended to both the server and the client pipeline
     * @param utxPool           not referenced in this method's body
     * @param peerDatabase      peer registry used by the codecs, the inbound filter, the handshake
     *                          handlers and the connect/close callbacks
     * @param channelGroup      channel group passed to the handshake handlers and closed on shutdown
     * @param concurrentHashMap channel-to-{@code PeerInfo} map passed to the handshake handlers and read
     *                          by the periodic task to collect peers' declared addresses
     * @return an anonymous {@code NS} whose {@code connect}/{@code shutdown} delegate to the
     *         {@code doConnect$1}/{@code doShutdown$1} helpers of this object and whose
     *         {@code messages()}/{@code closedChannels()} expose the observables created here
     * @throws MatchError if {@code MessageObserver$.apply()} or {@code ChannelClosedHandler$.apply()}
     *                    returns {@code null}
     */
    public NS apply(LunesSettings lunesSettings, Observable<LastBlockInfo> observable, NG ng, HistoryReplier historyReplier, UtxPool utxPool, final PeerDatabase peerDatabase, final ChannelGroup channelGroup, ConcurrentHashMap<Channel, PeerInfo> concurrentHashMap) {
        // Shutdown flag: starts false, is flipped to true by doShutdown$1 and consulted when an outgoing channel closes.
        final VolatileBooleanRef create = VolatileBooleanRef.create(false);
        // Boss and worker event loop groups. NOTE(review): thread count 0 presumably selects Netty's default,
        // and the `true` flag presumably marks the threads as daemon - confirm against the Netty API.
        final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("nio-boss-group", true));
        final NioEventLoopGroup nioEventLoopGroup2 = new NioEventLoopGroup(0, new DefaultThreadFactory("nio-worker-group", true));
        // Local handshake: application name concatenated with the address scheme character, plus version, node name, nonce and declared address.
        Handshake handshake = new Handshake(new StringBuilder(0).append(Constants$.MODULE$.ApplicationName()).append(lunesSettings.blockchainSettings().addressSchemeCharacter()).toString(), Constants$.MODULE$.VersionTuple(), lunesSettings.networkSettings().nodeName(), lunesSettings.networkSettings().nonce(), lunesSettings.networkSettings().declaredAddress());
        TrafficWatcher trafficWatcher = new TrafficWatcher();
        TrafficLogger trafficLogger = new TrafficLogger(lunesSettings.networkSettings().trafficLogger());
        MessageCodec messageCodec = new MessageCodec(peerDatabase);
        InetSocketAddress bindAddress = lunesSettings.networkSettings().bindAddress();
        // Set of this node's own socket addresses: when the bind address is the wildcard address, every address of
        // every local network interface paired with the bind port; otherwise just the bind address. The declared
        // address (if any) is added on top. The set is later passed to peerDatabase.randomPeer(...),
        // presumably as addresses to exclude - confirm against PeerDatabase.
        Set set = (Set) (Option$.MODULE$.apply(bindAddress.getAddress()).exists(inetAddress -> {
            return BoxesRunTime.boxToBoolean(inetAddress.isAnyLocalAddress());
        }) ? ((Iterator) JavaConverters$.MODULE$.enumerationAsScalaIteratorConverter(NetworkInterface.getNetworkInterfaces()).asScala()).flatMap(networkInterface -> {
            return ((Iterator) JavaConverters$.MODULE$.enumerationAsScalaIteratorConverter(networkInterface.getInetAddresses()).asScala()).map(inetAddress2 -> {
                return new InetSocketAddress(inetAddress2, bindAddress.getPort());
            });
        }).toSet() : (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new InetSocketAddress[]{bindAddress}))).$plus$plus(Option$.MODULE$.option2Iterable(lunesSettings.networkSettings().declaredAddress()).toSet());
        // Handler instances created once here and reused by both pipeline factories below.
        LengthFieldPrepender lengthFieldPrepender = new LengthFieldPrepender(4);
        WriteErrorHandler writeErrorHandler = new WriteErrorHandler();
        FatalErrorHandler fatalErrorHandler = new FatalErrorHandler();
        // Inbound connection filter; only the server pipeline includes it.
        PipelineInitializer.HandlerWrapper handlerToWrapper = PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new InboundConnectionFilter(peerDatabase, lunesSettings.networkSettings().maxInboundConnections(), lunesSettings.networkSettings().maxConnectionsPerHost()));
        // MessageObserver$.apply() yields the handler together with the seven per-message-type observables; null is treated as a match failure.
        Tuple2<MessageObserver, Tuple7<Observable<Tuple2<Channel, Signatures>>, Observable<Tuple2<Channel, Block>>, Observable<Tuple2<Channel, BigInt>>, Observable<Tuple2<Channel, Checkpoint>>, Observable<Tuple2<Channel, MicroBlockInv>>, Observable<Tuple2<Channel, MicroBlockResponse>>, Observable<Tuple2<Channel, Transaction>>>> apply = MessageObserver$.MODULE$.apply();
        if (apply == null) {
            throw new MatchError(apply);
        }
        Tuple2 tuple2 = new Tuple2(apply.mo7433_1(), apply.mo7432_2());
        final MessageObserver messageObserver = (MessageObserver) tuple2.mo7433_1();
        final Tuple7 tuple7 = (Tuple7) tuple2.mo7432_2();
        // Same destructuring for the channel-closed handler and its observable of closed channels.
        Tuple2<ChannelClosedHandler, Observable<Channel>> apply2 = ChannelClosedHandler$.MODULE$.apply();
        if (apply2 == null) {
            throw new MatchError(apply2);
        }
        Tuple2 tuple22 = new Tuple2(apply2.mo7433_1(), apply2.mo7432_2());
        final ChannelClosedHandler channelClosedHandler = (ChannelClosedHandler) tuple22.mo7433_1();
        final Observable observable2 = (Observable) tuple22.mo7432_2();
        // The discarding handler is driven by the `ready` flag of each LastBlockInfo.
        DiscardingHandler discardingHandler = new DiscardingHandler(observable.map(lastBlockInfo -> {
            return BoxesRunTime.boxToBoolean(lastBlockInfo.ready());
        }));
        // Map shared by the server and client handshake handlers; what it holds is not visible in this file.
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap(10, 0.9f, 10);
        HandshakeHandler.Server server = new HandshakeHandler.Server(handshake, concurrentHashMap, concurrentHashMap2, peerDatabase, channelGroup);
        // The server socket is created only when a declared address is configured; it binds to the configured
        // bind address and the Option carries the resulting server channel. The server pipeline is the inbound
        // filter followed by the same handler sequence as the client pipeline (with the server handshake handler).
        final Option<B> map = lunesSettings.networkSettings().declaredAddress().map(inetSocketAddress -> {
            return new ServerBootstrap().group(nioEventLoopGroup, nioEventLoopGroup2).channel(NioServerSocketChannel.class).childHandler(new PipelineInitializer(() -> {
                return (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new PipelineInitializer.HandlerWrapper[]{handlerToWrapper, PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new HandshakeDecoder(peerDatabase)), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new HandshakeTimeoutHandler(lunesSettings.networkSettings().handshakeTimeout())), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(server), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(lengthFieldPrepender), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new LengthFieldBasedFrameDecoder(104857600, 0, 4, 0, 4)), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new LegacyFrameCodec(peerDatabase)), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(channelClosedHandler), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(trafficWatcher), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(discardingHandler), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(messageCodec), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(trafficLogger), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(writeErrorHandler), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(peerSynchronizer$1(lunesSettings, peerDatabase)), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(historyReplier), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(messageObserver), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(fatalErrorHandler)}));
            })).bind(lunesSettings.networkSettings().bindAddress()).channel();
        });
        // Outgoing channels keyed by remote address (populated by doConnect$1, pruned by the connect/close callbacks).
        final ConcurrentHashMap concurrentHashMap3 = new ConcurrentHashMap();
        HandshakeHandler.Client client = new HandshakeHandler.Client(handshake, concurrentHashMap, concurrentHashMap2, peerDatabase, channelGroup);
        // Client bootstrap for outgoing connections; runs on the worker group with the configured connection timeout.
        final Bootstrap handler = new Bootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Predef$.MODULE$.int2Integer((int) lunesSettings.networkSettings().connectionTimeout().toMillis())).group(nioEventLoopGroup2).channel(NioSocketChannel.class).handler(new PipelineInitializer(() -> {
            return (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new PipelineInitializer.HandlerWrapper[]{PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new HandshakeDecoder(peerDatabase)), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new HandshakeTimeoutHandler(lunesSettings.networkSettings().handshakeTimeout())), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(client), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(lengthFieldPrepender), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new LengthFieldBasedFrameDecoder(104857600, 0, 4, 0, 4)), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(new LegacyFrameCodec(peerDatabase)), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(channelClosedHandler), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(trafficWatcher), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(discardingHandler), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(messageCodec), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(trafficLogger), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(writeErrorHandler), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(peerSynchronizer$1(lunesSettings, peerDatabase)), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(historyReplier), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(messageObserver), PipelineInitializer$HandlerWrapper$.MODULE$.handlerToWrapper(fatalErrorHandler)}));
        }));
        // Periodic connection task on the worker group. NOTE(review): the two durations (1 second, 5 seconds) are
        // presumably the initial delay and the delay between runs - confirm against scheduleWithFixedDelay$extension.
        final ScheduledFuture<?> scheduleWithFixedDelay$extension = package$EventExecutorGroupExt$.MODULE$.scheduleWithFixedDelay$extension(package$.MODULE$.EventExecutorGroupExt(nioEventLoopGroup2), new Cpackage.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).second(), new Cpackage.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(5)).seconds(), () -> {
            // Snapshot of the addresses we currently have outgoing channels to.
            Vector vector = ((TraversableOnce) JavaConverters$.MODULE$.asScalaIteratorConverter(concurrentHashMap3.keySet().iterator()).asScala()).toVector();
            // Declared addresses of every peer in the channel-to-PeerInfo map (peers without one are skipped).
            AbstractSeq vector2 = ((Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(concurrentHashMap.values().iterator()).asScala()).flatMap(peerInfo -> {
                return Option$.MODULE$.option2Iterable(peerInfo.declaredAddress());
            }).toVector();
            // Declared addresses that are not outgoing targets; reported as "incoming" below.
            Vector vector3 = (Vector) vector2.filterNot(obj -> {
                return BoxesRunTime.boxToBoolean(vector.contains(obj));
            });
            MODULE$.log().trace(() -> {
                return new StringBuilder(24).append("Outgoing: ").append(outgoingStr$1(vector)).append(" ++ incoming: ").append(incomingStr$1(vector3)).toString();
            });
            // Open at most one new outgoing connection per run, and only while below the outbound limit.
            if (concurrentHashMap3.size() < lunesSettings.networkSettings().maxOutboundConnections()) {
                peerDatabase.randomPeer((Set) set.$plus$plus(vector2)).foreach(inetSocketAddress2 -> {
                    this.io$lunes$network$NetworkServer$$doConnect$1(inetSocketAddress2, peerDatabase, create, concurrentHashMap3, handler);
                    return BoxedUnit.UNIT;
                });
            }
            // Report the same lists plus the number of declared addresses as a "connections" metrics point.
            Metrics$.MODULE$.write(Point.measurement("connections").addField("outgoing", outgoingStr$1(vector)).addField("incoming", incomingStr$1(vector3)).addField("n", vector2.size()));
        });
        // Anonymous NS implementation: it captures the objects built above in fields and forwards
        // connect/shutdown to the helper methods of this singleton.
        return new NS(peerDatabase, channelGroup, create, nioEventLoopGroup, nioEventLoopGroup2, messageObserver, tuple7, channelClosedHandler, observable2, map, concurrentHashMap3, handler, scheduleWithFixedDelay$extension) { // from class: io.lunes.network.NetworkServer$$anon$1
            private final Tuple7<Observable<Tuple2<Channel, Signatures>>, Observable<Tuple2<Channel, Block>>, Observable<Tuple2<Channel, BigInt>>, Observable<Tuple2<Channel, Checkpoint>>, Observable<Tuple2<Channel, MicroBlockInv>>, Observable<Tuple2<Channel, MicroBlockResponse>>, Observable<Tuple2<Channel, Transaction>>> messages;
            private final Observable<Channel> closedChannels;
            private final PeerDatabase peerDatabase$1;
            private final ChannelGroup allChannels$1;
            private final VolatileBooleanRef shutdownInitiated$1;
            private final NioEventLoopGroup bossGroup$1;
            private final NioEventLoopGroup workerGroup$1;
            private final MessageObserver mesageObserver$1;
            private final ChannelClosedHandler channelClosedHandler$1;
            private final Option serverChannel$1;
            private final ConcurrentHashMap outgoingChannels$1;
            private final Bootstrap bootstrap$1;
            private final ScheduledFuture connectTask$1;

            // Starts an outgoing connection to the given address via the shared client bootstrap.
            @Override // io.lunes.network.NS
            public void connect(InetSocketAddress inetSocketAddress2) {
                NetworkServer$.MODULE$.io$lunes$network$NetworkServer$$doConnect$1(inetSocketAddress2, this.peerDatabase$1, this.shutdownInitiated$1, this.outgoingChannels$1, this.bootstrap$1);
            }

            // Runs the shutdown sequence implemented in doShutdown$1.
            @Override // io.lunes.network.NS
            public void shutdown() {
                NetworkServer$.MODULE$.io$lunes$network$NetworkServer$$doShutdown$1(this.allChannels$1, this.shutdownInitiated$1, this.bossGroup$1, this.workerGroup$1, this.mesageObserver$1, this.channelClosedHandler$1, this.serverChannel$1, this.connectTask$1);
            }

            // The seven per-message-type observables obtained from MessageObserver$.apply().
            @Override // io.lunes.network.NS
            public Tuple7<Observable<Tuple2<Channel, Signatures>>, Observable<Tuple2<Channel, Block>>, Observable<Tuple2<Channel, BigInt>>, Observable<Tuple2<Channel, Checkpoint>>, Observable<Tuple2<Channel, MicroBlockInv>>, Observable<Tuple2<Channel, MicroBlockResponse>>, Observable<Tuple2<Channel, Transaction>>> messages() {
                return this.messages;
            }

            // The observable of closed channels obtained from ChannelClosedHandler$.apply().
            @Override // io.lunes.network.NS
            public Observable<Channel> closedChannels() {
                return this.closedChannels;
            }

            // Instance initializer: copies the captured locals of the enclosing method into the fields above.
            {
                this.peerDatabase$1 = peerDatabase;
                this.allChannels$1 = channelGroup;
                this.shutdownInitiated$1 = create;
                this.bossGroup$1 = nioEventLoopGroup;
                this.workerGroup$1 = nioEventLoopGroup2;
                this.mesageObserver$1 = messageObserver;
                this.channelClosedHandler$1 = channelClosedHandler;
                this.serverChannel$1 = map;
                this.outgoingChannels$1 = concurrentHashMap3;
                this.bootstrap$1 = handler;
                this.connectTask$1 = scheduleWithFixedDelay$extension;
                this.messages = tuple7;
                this.closedChannels = observable2;
            }
        };
    }

    /**
     * Chooses the peer-exchange handler for a channel pipeline.
     *
     * @param lunesSettings settings whose network section holds the {@code enablePeersExchange}
     *                      switch and the broadcast interval
     * @param peerDatabase  peer registry handed to a newly created {@code PeerSynchronizer}
     * @return a new {@code PeerSynchronizer} when peer exchange is enabled, otherwise the shared
     *         {@code PeerSynchronizer$.Disabled()} handler
     */
    private static final ChannelHandlerAdapter peerSynchronizer$1(LunesSettings lunesSettings, PeerDatabase peerDatabase) {
        if (!lunesSettings.networkSettings().enablePeersExchange()) {
            return PeerSynchronizer$.MODULE$.Disabled();
        }
        return new PeerSynchronizer(peerDatabase, lunesSettings.networkSettings().peersBroadcastInterval());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a log line for an outgoing-channel event.
     *
     * <p>The result has the shape
     * {@code "<channel id> <event>, outgoing channel count: <size>"}.
     *
     * @param channel           channel the event refers to; rendered through {@code package$.id}
     * @param str               event description placed after the channel id
     * @param concurrentHashMap map whose current size is reported as the outgoing channel count
     * @return the formatted message
     */
    public static final String formatOutgoingChannelEvent$1(Channel channel, String str, ConcurrentHashMap concurrentHashMap) {
        return package$.MODULE$.id(channel, package$.MODULE$.id$default$2())
                + " "
                + str
                + ", outgoing channel count: "
                + concurrentHashMap.size();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Close-future callback for an outgoing channel.
     *
     * <p>Removes the address-to-channel mapping (only if it still refers to this channel),
     * asks the peer database to suspend and close the peer unless shutdown has been initiated,
     * and logs the closure at trace level when the future succeeded or at debug level, together
     * with the cause's message, when it did not.
     *
     * @param inetSocketAddress  remote address the channel was opened to
     * @param channelFuture      the completed close future
     * @param peerDatabase       peer registry notified about the closed peer
     * @param volatileBooleanRef shutdown flag; when set, the peer is not suspended
     * @param concurrentHashMap  outgoing channels keyed by remote address
     */
    public final void handleOutgoingChannelClosed$1(InetSocketAddress inetSocketAddress, ChannelFuture channelFuture, PeerDatabase peerDatabase, VolatileBooleanRef volatileBooleanRef, ConcurrentHashMap concurrentHashMap) {
        // Two-argument remove: the entry is dropped only when it still maps to this exact channel.
        concurrentHashMap.remove(inetSocketAddress, channelFuture.channel());

        boolean shuttingDown = volatileBooleanRef.elem;
        if (!shuttingDown) {
            peerDatabase.suspendAndClose(channelFuture.channel());
        }

        if (!channelFuture.isSuccess()) {
            // Messages are built lazily inside the lambdas, exactly when the logger asks for them.
            log().debug(() -> formatOutgoingChannelEvent$1(
                    channelFuture.channel(),
                    "Channel closed: " + Option$.MODULE$.apply(channelFuture.cause()).map(th -> th.getMessage()).getOrElse(() -> "no message"),
                    concurrentHashMap));
            return;
        }
        log().trace(() -> formatOutgoingChannelEvent$1(channelFuture.channel(), "Channel closed (expected)", concurrentHashMap));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completion callback for an outgoing connection attempt.
     *
     * <p>On success: logs the event, touches the address in the peer database and registers
     * {@code handleOutgoingChannelClosed$1} on the channel's close future.
     *
     * <p>On failure with a cause: suspends and closes the peer, removes the address-to-channel
     * mapping (only if it still refers to this channel) and logs the cause. A
     * {@link ClosedChannelException} is logged at trace level, anything else at debug level.
     *
     * <p>When the future is neither successful nor carries a cause, nothing is done.
     *
     * @param inetSocketAddress  remote address of the attempt
     * @param channelFuture      the completed connect future
     * @param peerDatabase       peer registry to update
     * @param volatileBooleanRef shutdown flag, forwarded to the close callback
     * @param concurrentHashMap  outgoing channels keyed by remote address
     */
    public final void handleConnectionAttempt$1(InetSocketAddress inetSocketAddress, ChannelFuture channelFuture, PeerDatabase peerDatabase, VolatileBooleanRef volatileBooleanRef, ConcurrentHashMap concurrentHashMap) {
        if (channelFuture.isSuccess()) {
            log().trace(() -> {
                return formatOutgoingChannelEvent$1(channelFuture.channel(), "Connection established", concurrentHashMap);
            });
            peerDatabase.touch(inetSocketAddress);
            channelFuture.channel().closeFuture().addListener2(channelFuture2 -> {
                this.handleOutgoingChannelClosed$1(inetSocketAddress, channelFuture2, peerDatabase, volatileBooleanRef, concurrentHashMap);
            });
            return;
        }

        // Read the cause once; it is needed for the guard, the type test and the log message.
        final Throwable cause = channelFuture.cause();
        if (cause == null) {
            return;
        }

        peerDatabase.suspendAndClose(channelFuture.channel());
        // Two-argument remove: the entry is dropped only when it still maps to this exact channel.
        concurrentHashMap.remove(inetSocketAddress, channelFuture.channel());

        if (cause instanceof ClosedChannelException) {
            log().trace(() -> {
                return formatOutgoingChannelEvent$1(channelFuture.channel(), new StringBuilder(36).append("Channel closed by connection issue: ").append(Option$.MODULE$.apply(cause.getMessage()).getOrElse(() -> {
                    return "no message";
                })).toString(), concurrentHashMap);
            });
            return;
        }
        log().debug(() -> {
            return formatOutgoingChannelEvent$1(channelFuture.channel(), cause.getMessage(), concurrentHashMap);
        });
    }

    /**
     * Starts an outgoing connection to {@code inetSocketAddress} unless the map already holds
     * an entry for that address.
     *
     * <p>The connect call happens inside {@code computeIfAbsent}: the channel of the pending
     * connect future becomes the map value, and {@code handleConnectionAttempt$1} is registered
     * as the future's listener.
     *
     * @param inetSocketAddress  remote address to connect to
     * @param peerDatabase       peer registry, forwarded to the completion callback
     * @param volatileBooleanRef shutdown flag, forwarded to the completion callback
     * @param concurrentHashMap  outgoing channels keyed by remote address
     * @param bootstrap          client bootstrap used to open the connection
     */
    public final void io$lunes$network$NetworkServer$$doConnect$1(InetSocketAddress inetSocketAddress, PeerDatabase peerDatabase, VolatileBooleanRef volatileBooleanRef, ConcurrentHashMap concurrentHashMap, Bootstrap bootstrap) {
        concurrentHashMap.computeIfAbsent(inetSocketAddress, absentKey -> {
            ChannelFuture pending = bootstrap.connect(inetSocketAddress);
            MODULE$.log().trace(() -> package$.MODULE$.id(pending.channel(), package$.MODULE$.id$default$2()) + " Connecting to " + inetSocketAddress);
            ChannelFuture observed = pending.addListener2(attempt -> {
                this.handleConnectionAttempt$1(inetSocketAddress, attempt, peerDatabase, volatileBooleanRef, concurrentHashMap);
            });
            return observed.channel();
        });
    }

    /**
     * Renders a vector of addresses as a sorted, bracketed, comma-separated list.
     *
     * @param vector elements to render; each one is converted with {@code toString()}
     * @return the string forms sorted with the String ordering and joined as {@code "[a, b, c]"}
     */
    private static final String outgoingStr$1(Vector vector) {
        Object asStrings = vector.map(address -> address.toString(), Vector$.MODULE$.canBuildFrom());
        Object ordered = ((SeqLike) asStrings).sorted(Ordering$String$.MODULE$);
        return ((TraversableOnce) ordered).mkString("[", ", ", "]");
    }

    /**
     * Renders a vector of addresses as a sorted, bracketed, comma-separated list.
     *
     * @param vector elements to render; each one is converted with {@code toString()}
     * @return the string forms sorted with the String ordering and joined as {@code "[a, b, c]"}
     */
    private static final String incomingStr$1(Vector vector) {
        // Step 1: stringify every element.
        SeqLike rendered = (SeqLike) vector.map(peerAddress -> peerAddress.toString(), Vector$.MODULE$.canBuildFrom());
        // Step 2: sort the strings, then join them.
        TraversableOnce sorted = (TraversableOnce) rendered.sorted(Ordering$String$.MODULE$);
        return sorted.mkString("[", ", ", "]");
    }

    /**
     * Shuts the networking stack down.
     *
     * <p>Sequence: raise the shutdown flag, cancel the periodic connect task (without
     * interrupting a run in progress), close the server channel if one exists and wait for it,
     * then close every channel in the group and wait. The {@code finally} part runs even when
     * one of those steps throws: it shuts down the worker group, then the boss group (waiting
     * for each), and finally shuts down the message observer and the channel-closed handler.
     *
     * @param channelGroup         group of all channels to close
     * @param volatileBooleanRef   shutdown flag, set to {@code true} first
     * @param nioEventLoopGroup    boss event loop group
     * @param nioEventLoopGroup2   worker event loop group
     * @param messageObserver      message observer to shut down last
     * @param channelClosedHandler channel-closed handler to shut down last
     * @param option               optional server channel
     * @param scheduledFuture      the periodic connect task
     */
    public final void io$lunes$network$NetworkServer$$doShutdown$1(ChannelGroup channelGroup, VolatileBooleanRef volatileBooleanRef, NioEventLoopGroup nioEventLoopGroup, NioEventLoopGroup nioEventLoopGroup2, MessageObserver messageObserver, ChannelClosedHandler channelClosedHandler, Option option, ScheduledFuture scheduledFuture) {
        try {
            // Flag first, so close callbacks fired below see that shutdown is in progress.
            volatileBooleanRef.elem = true;
            scheduledFuture.cancel(false);

            option.foreach(channel -> channel.close().await2());
            log().debug(() -> "Unbound server");

            channelGroup.close().await2();
            log().debug(() -> "Closed all channels");
        } finally {
            // Worker group before boss group, then the observable-backed handlers.
            nioEventLoopGroup2.shutdownGracefully().await2();
            nioEventLoopGroup.shutdownGracefully().await2();
            messageObserver.shutdown();
            channelClosedHandler.shutdown();
        }
    }

    /**
     * Private singleton constructor; within this file it is called only from the static
     * initializer. Publishes the new instance through {@code MODULE$} and then runs the
     * {@code ScorexLogging} initialization hook on it.
     */
    private NetworkServer$() {
        // MODULE$ is assigned before $init$ runs, so the singleton reference is already set at that point.
        MODULE$ = this;
        ScorexLogging.$init$(this);
    }
}
