// • ▌ ▄ ·.  ▄▄▄·  ▄▄ • ▪   ▄▄· ▄▄▄▄·  ▄▄▄·  ▐▄▄▄  ▄▄▄ .
// ·██ ▐███▪▐█ ▀█ ▐█ ▀ ▪██ ▐█ ▌▪▐█ ▀█▪▐█ ▀█ •█▌ ▐█▐▌·
// ▐█ ▌▐▌▐█·▄█▀▀█ ▄█ ▀█▄▐█·██ ▄▄▐█▀▀█▄▄█▀▀█ ▐█▐ ▐▌▐▀▀▀
// ██ ██▌▐█▌▐█ ▪▐▌▐█▄▪▐█▐█▌▐███▌██▄▪▐█▐█ ▪▐▌██▐ █▌▐█▄▄▌
// ▀▀  █▪▀▀▀ ▀  ▀ ·▀▀▀▀ ▀▀▀·▀▀▀ ·▀▀▀▀  ▀  ▀ ▀▀  █▪ ▀▀▀
//      Magicbane Emulator Project © 2013 - 2022
//                www.magicbane.com


package engine.net;

import engine.core.ControlledRunnable;
import engine.job.AbstractJob;
import engine.job.JobManager;
import engine.server.MBServerStatics;
import org.pmw.tinylog.Logger;

import java.io.IOException;
import java.net.*;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public abstract class AbstractConnectionManager extends ControlledRunnable {

    private final Selector selector;
    private final ServerSocketChannel listenChannel;
    private final ConcurrentLinkedQueue<ChangeRequest> chngReqs = new ConcurrentLinkedQueue<>();
    /**
     * Removes the mapping that contains the key 'sockChan'
     *
     * @param sockChan
     */
    private long lastAuditTime = 0;

    /*
     *
     */
    public AbstractConnectionManager(String nodeName, InetAddress hostAddress,
                                     int port) throws IOException {
        super(nodeName);

        this.selector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking Server channel
        this.listenChannel = ServerSocketChannel.open();
        this.listenChannel.configureBlocking(false);

        // Bind
        InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
        this.listenChannel.socket().bind(isa);

        Logger.info(this.getLocalNodeName() + " Configured to listen: "
                + isa.getAddress().toString() + ':' + port);

        // register an interest in Accepting new connections.
        SelectionKey sk = this.listenChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        sk.attach(this);
    }

    /*
     * ControlledRunnable implementations
     */
    @Override
    protected void _startup() {
    }

    @Override
    protected void _shutdown() {
        this.selector.wakeup();
        this.disconnectAll();
        this.selector.wakeup();
    }

    @Override
    protected boolean _preRun() {
        this.runStatus = true;
        return true;
    }

    @Override
    protected boolean _Run() {
        while (this.runCmd) {
            try {
                this.runLoopHook();

                this.processChangeRequests();
                this.auditSocketChannelToConnectionMap();
                this.selector.select(250L);
                this.processNewEvents();

            } catch (Exception e) {
                Logger.error(e.toString());
            }
        }
        return true;
    }

    @Override
    protected boolean _postRun() {

        this.runStatus = false;

        this.disconnectAll();

        try {
            this.selector.close();
        } catch (IOException e) {
            Logger.error(e.toString());
        }

        return true;
    }

    /**
     * Hook for subclasses to use.
     */
    protected void runLoopHook() {
    }

    /*
     * Accept / New Connection FNs
     */
    private AbstractConnection acceptNewConnection(final SelectionKey key)
            throws IOException {

        this.preAcceptNewConnectionHook(key);

        // Cancel incoming connections if server isn't set to listen
        if (this.listenChannel == null || this.listenChannel.isOpen() == false) {
            key.cancel();
            return null;
        }

        // For an accept to be pending, the key contains a reference to the
        // ServerSocketChannel
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

        // Get SocketChannel
        SocketChannel sockChan = ssc.accept();
        sockChan.configureBlocking(false);

        //Configure the Socket
        Socket socket = sockChan.socket();
        socket.setSendBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE);
        socket.setReceiveBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE);
        socket.setTcpNoDelay(MBServerStatics.TCP_NO_DELAY_DEFAULT);

        //Register with the selector
        SelectionKey sk = sockChan.register(this.selector, SelectionKey.OP_READ);
        if (sk == null) {
            Logger.error("FIX ME! NULL SELECTION KEY!");
            return null;
        }

        //Initialize Connection
        AbstractConnection ac = this.getNewIncomingConnection(sockChan);
        sk.attach(ac);

        this.postAcceptNewConnectionHook(ac);
        return ac;
    }

    /**
     * Hook for subclasses to use.
     *
     * @param key
     */
    protected void preAcceptNewConnectionHook(SelectionKey key) {
    }

    /**
     * Hook for subclasses to use.
     *
     * @param ac
     */
    protected void postAcceptNewConnectionHook(AbstractConnection ac) {
    }

    protected abstract AbstractConnection getNewIncomingConnection(SocketChannel sockChan);

    protected abstract AbstractConnection getNewOutgoingConnection(SocketChannel sockChan);

    /*
     * Disconnect / Destroy Connection FNs
     */
    protected boolean disconnect(final SelectionKey key) {

        this.disconnect((AbstractConnection) key.attachment());

        key.attach(null);
        key.cancel();
        return key.isValid();
    }

    protected boolean disconnect(final AbstractConnection c) {
        if (c == null)
            return false;

        c.disconnect();

        return c.getSocketChannel().isConnected();
    }

    /*
     * Data IO
     */

    /*
     * WRITE SEQUENCE
     */

    protected void disconnectAll() {
        synchronized (this.selector.keys()) {
            for (SelectionKey sk : this.selector.keys()) {
                if (sk.channel() instanceof SocketChannel)
                    disconnect(sk);
            }
        }
    }

    /**
     * Submits a request to set this Connection to WRITE mode.
     */
    protected void sendStart(final SocketChannel sockChan) {
        synchronized (this.chngReqs) {
            // Indicate we want the interest ops set changed
            this.chngReqs.add(new ChangeRequest(sockChan, ChangeType.CHANGEOPS, SelectionKey.OP_WRITE));
        }

        this.selector.wakeup();
    }

    /*
     * READ SEQUENCE
     */

    /**
     * @param key
     * @return Boolean indication emit success.
     */
    protected boolean sendFinish(final SelectionKey key) {
        SocketChannel sockChan = (SocketChannel) key.channel();

        // Check to see if the SocketChannel the selector offered up
        // is null.
        if (sockChan == null) {
            Logger.error(": null sockChannel.");
            this.disconnect(key);
            return false;
        }

        AbstractConnection c = (AbstractConnection) key.attachment();

        if (c == null) {
            Logger.error(": null Connection.");
            this.disconnect(key);
            return false;
        }

        //		long startTime = System.currentTimeMillis();
        boolean allSent = c.writeAll();

        //		if ((System.currentTimeMillis() - startTime) > 20)
        //			this.logDirectWARNING(c.getRemoteAddressAndPortAsString() + " took " + (System.currentTimeMillis() - startTime) + "ms to handle!");

        // If all was written, switch back to Read Mode.
        if (allSent || !c.isConnected()) {

            // Indicate we want the interest ops set changed
            ChangeRequest chReq = new ChangeRequest(c.getSocketChannel(), ChangeType.CHANGEOPS, SelectionKey.OP_READ);
            synchronized (this.chngReqs) {
                this.chngReqs.add(chReq);
            }

            this.selector.wakeup();
        }
        return true;
    }

    /**
     * @param key
     * @return Boolean indication of success.
     */
    private boolean receive(final SelectionKey key) {
        SocketChannel sockChan = (SocketChannel) key.channel();

        // Check to see if the SocketChannel the selector offered up
        // is null.
        if (sockChan == null) {
            Logger.error("null sockChannel.");
            this.disconnect(key);
            return false;
        }

        AbstractConnection c = (AbstractConnection) key.attachment();

        if (c == null) {
            Logger.error("null Connection.");
            this.disconnect(key);
            return false;
        }

        c.read();

        return true;
    }

    /*
     * Main Loop And Loop Controls
     */
    private void processChangeRequests() {
        SelectionKey selKey = null;
        ChangeRequest sccr = null;
        ChangeType change = null;
        SocketChannel sockChan = null;

        synchronized (this.chngReqs) {
            Iterator<ChangeRequest> it = this.chngReqs.iterator();
            while (it.hasNext()) {
                sccr = it.next();

                if (sccr == null) {
                    it.remove();
                    continue;
                }

                change = sccr.getChangeType();
                sockChan = sccr.getSocketChannel();

                switch (change) {
                    case CHANGEOPS:
                        selKey = sockChan.keyFor(this.selector);

                        if (selKey == null || selKey.isValid() == false)
                            continue;

                        selKey.interestOps(sccr.getOps());
                        break;

                    case REGISTER:
                        try {
                            sockChan.register(this.selector, sccr.getOps());

                        } catch (ClosedChannelException e) {
                            // TODO Should a closed channel be logged or just
                            // cleaned up?
                            // Logger.error(this.getLocalNodeName(), e);
                        }
                        break;
                }
            }
            this.chngReqs.clear();
        }
    }

    private void processNewEvents() {
        SelectionKey thisKey = null;
        Iterator<SelectionKey> selectedKeys = null;
        JobManager jm = JobManager.getInstance();

        selectedKeys = this.selector.selectedKeys().iterator();

        if (selectedKeys.hasNext() == false)
            return;

        while (selectedKeys.hasNext()) {
            thisKey = selectedKeys.next();

            //To shake out any issues
            if (thisKey.attachment() == null)
                Logger.error("Found null attachment! PANIC!");

            if (thisKey.attachment() instanceof AbstractConnection)
                if (((AbstractConnection) thisKey.attachment()).execTask.get() == true)
                    continue;

            selectedKeys.remove();

            try {
                if (thisKey.isValid() == false)
                    break;  // Changed from continue
                else if (thisKey.isAcceptable())
                    this.acceptNewConnection(thisKey);
                else if (thisKey.isReadable())
                    jm.submitJob(new ReadOperationHander(thisKey));
                else if (thisKey.isWritable())
                    jm.submitJob(new WriteOperationHander(thisKey));
                else if (thisKey.isConnectable())
                    this.finishConnectingTo(thisKey);
                else
                    Logger.error("Unhandled keystate: " + thisKey.toString());
            } catch (CancelledKeyException cke) {
                Logger.error(this.getLocalNodeName(), cke);
                this.disconnect(thisKey);

            } catch (IOException e) {
                Logger.error(this.getLocalNodeName(), e);
            }
        }
    }

    protected void connectTo(String host, int port) {
        try {
            this.connectTo(InetAddress.getByName(host), port);
        } catch (UnknownHostException e) {
            Logger.error(this.getLocalNodeName(), e);
        }
    }

    protected void connectTo(InetAddress host, int port) {
        try {
            this.startConnectingTo(host, port);
            this.selector.wakeup();
        } catch (IOException e) {
            Logger.error(this.getLocalNodeName(), e);
        }
    }

    protected final void startConnectingTo(InetAddress host, int port)
            throws IOException {
        // Create a non-blocking socket channel
        SocketChannel sockChan = SocketChannel.open();
        sockChan.configureBlocking(false);
        sockChan.socket().setSendBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE);
        sockChan.socket().setReceiveBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE);

        // Make a new Connection object
        this.getNewOutgoingConnection(sockChan);

        // Kick off connection establishment
        sockChan.connect(new InetSocketAddress(host, port));

        synchronized (this.chngReqs) {
            this.chngReqs.add(new ChangeRequest(sockChan, ChangeType.REGISTER, SelectionKey.OP_CONNECT));
        }

        this.selector.wakeup();
    }

    private void finishConnectingTo(SelectionKey key) throws IOException {
        this.preFinishConnectingToHook(key);

        // Get sockChan
        SocketChannel sockChan = (SocketChannel) key.channel();

        // Get AbstractConnection
        AbstractConnection ac = (AbstractConnection) key.attachment();

        if (sockChan == null) {
            Logger.error(this.getLocalNodeName(), "null socketChannel");
            this.disconnect(key);
            return;
        }

        if (ac == null) {
            Logger.error(this.getLocalNodeName(), "null AbstractConnection");
            this.disconnect(key);
            return;
        }

        // Finish the connection. If the connection operation failed
        // this will raise an IOException.
        try {
            sockChan.finishConnect();
        } catch (IOException e) {
            if (e.getMessage().startsWith("Connection refused:")
                    || e.getMessage().startsWith(
                    "An existing connection was forcibly closed")) {
                // eat this type of IOException
            } else
                Logger.error(this.getLocalNodeName(), e);

            // Cancel the channel's registration with our selector
            key.cancel();
            return;
        }

        Socket socket = sockChan.socket();
        Logger.debug("Connected to: "
                + socket.getInetAddress() + ':'
                + socket.getPort());

        sockChan.configureBlocking(false);
        sockChan.register(this.selector, SelectionKey.OP_READ);

        this.postFinishConnectingToHook(ac);
    }

    /**
     * Hook for subclasses to use.
     *
     * @param key
     */
    protected void preFinishConnectingToHook(SelectionKey key) {
    }

    /**
     * Hook for subclasses to use.
     *
     * @param ac
     */
    protected void postFinishConnectingToHook(AbstractConnection ac) {
    }

    public final String getLocalNodeName() {
        return this.getThreadName();
    }

    protected int auditSocketChannelToConnectionMap() {
        long startTime = System.currentTimeMillis();
        int numberOfItemsToProcess = 0;

        if (lastAuditTime + MBServerStatics.TIMEOUT_CHECKS_TIMER_MS > startTime)
            return -1;

        synchronized (this.selector.keys()) {
            for (SelectionKey sk : this.selector.keys()) {
                if (!(sk.channel() instanceof SocketChannel))
                    continue;

                SocketChannel sockChan = (SocketChannel) sk.channel();
                AbstractConnection conn = (AbstractConnection) sk.attachment();

                if (sockChan == null)
                    continue;

                if (!sockChan.isOpen()) {
                    numberOfItemsToProcess++;
                    Logger.info("sockChan closed. Disconnecting..");
                    disconnect(sk);
                    continue;
                }

                if (conn == null) {
                    numberOfItemsToProcess++;
                    Logger.info("Connection is null, Disconnecting.");
                    disconnect(sk);
                    continue;
                }

                //removed keep alive timeout. Believe failmu used this for disconnecting players on force quit, but a closed socket will already disconnect.
//                if (conn.getLastKeepAliveTime() + MBServerStatics.KEEPALIVE_TIMEOUT_MS < startTime) {
//                    numberOfItemsToProcess++;
//                    Logger.info("Keep alive Disconnecting " + conn.getRemoteAddressAndPortAsString());
//                    conn.disconnect();
//                    continue;
//                }

                if (conn.getLastMsgTime() + MBServerStatics.AFK_TIMEOUT_MS < startTime) {
                    numberOfItemsToProcess++;
                    Logger.info("AFK TIMEOUT Disconnecting " + conn.getRemoteAddressAndPortAsString());
                    conn.disconnect();
                }
            }
        }

        if (numberOfItemsToProcess != 0)
            Logger.info("Cleaned "
                    + numberOfItemsToProcess
                    + " dead connections in "
                    + (System.currentTimeMillis() - startTime)
                    + " millis.");

        lastAuditTime = System.currentTimeMillis();
        return numberOfItemsToProcess;
    }

    public int getConnectionSize() {
        if (this.selector == null)
            return -1;
        if (this.selector.keys() == null)
            return -1;
        return this.selector.keys().size();
    }

    /**
     * Returns the port on which this socket is listening.
     *
     * @return the port number to which this socket is listening or -1 if the
     * socket is not bound yet.
     */
    public int getListeningPort() {
        if (this.listenChannel == null)
            return -1;
        if (this.listenChannel.socket() == null)
            return -1;
        return this.listenChannel.socket().getLocalPort();
    }

    /**
     * Returns the address of the endpoint this socket is bound to, or null if
     * it is not bound yet.
     *
     * @return a SocketAddress representing the local endpoint of this socket,
     * or null if it is not bound yet.
     */
    public SocketAddress getListeningAddress() {
        if (this.listenChannel == null)
            return null;
        if (this.listenChannel.socket() == null)
            return null;
        return this.listenChannel.socket().getLocalSocketAddress();
    }

    /*
     *
     */
    protected static enum ChangeType {

        REGISTER, CHANGEOPS
    }

    private class ChangeRequest {

        private final SocketChannel sockChan;
        private final ChangeType changeType;
        private final Integer ops;

        public ChangeRequest(SocketChannel sockChan, ChangeType changeType,
                             int ops) {
            this.sockChan = sockChan;
            this.changeType = changeType;
            this.ops = ops;
        }

        public SocketChannel getSocketChannel() {
            synchronized (this.sockChan) {
                return this.sockChan;
            }
        }

        public ChangeType getChangeType() {
            synchronized (this.changeType) {
                return this.changeType;
            }
        }

        public int getOps() {
            synchronized (this.ops) {
                return this.ops;
            }
        }
    }

    private class ReadOperationHander extends AbstractJob {

        private final SelectionKey sk;
        private final AbstractConnection ac;
        private final boolean runStatus;

        public ReadOperationHander(final SelectionKey sk) {
            this.sk = sk;

            if (sk.attachment() instanceof AbstractConnection) {
                this.ac = (AbstractConnection) sk.attachment();
                this.runStatus = this.ac.execTask.compareAndSet(false, true);
            } else {
                this.ac = null;
                this.runStatus = false;
                Logger.error("Passed selection key did not have a corresponding Connection!(Read)");
            }
        }

        @Override
        protected void doJob() {
            if (runStatus) {
                this.ac.connMan.receive(sk);
                this.ac.execTask.compareAndSet(true, false);
            }
        }
    }

    private class WriteOperationHander extends AbstractJob {

        private final SelectionKey sk;
        private final AbstractConnection ac;
        private final boolean runStatus;

        public WriteOperationHander(final SelectionKey sk) {
            this.sk = sk;

            if (sk.attachment() instanceof AbstractConnection) {
                this.ac = (AbstractConnection) sk.attachment();
                this.runStatus = this.ac.execTask.compareAndSet(false, true);
            } else {
                this.runStatus = false;
                this.ac = null;
                Logger.error("Passed selection key did not have a corresponding Connection!(Write)");
            }

        }

        @Override
        protected void doJob() {
            if (runStatus) {
                this.ac.connMan.sendFinish(sk);
                this.ac.execTask.compareAndSet(true, false);
            }
        }
    }

}