// • ▌ ▄ ·. ▄▄▄· ▄▄ • ▪ ▄▄· ▄▄▄▄· ▄▄▄· ▐▄▄▄ ▄▄▄ . // ·██ ▐███▪▐█ ▀█ ▐█ ▀ ▪██ ▐█ ▌▪▐█ ▀█▪▐█ ▀█ •█▌ ▐█▐▌· // ▐█ ▌▐▌▐█·▄█▀▀█ ▄█ ▀█▄▐█·██ ▄▄▐█▀▀█▄▄█▀▀█ ▐█▐ ▐▌▐▀▀▀ // ██ ██▌▐█▌▐█ ▪▐▌▐█▄▪▐█▐█▌▐███▌██▄▪▐█▐█ ▪▐▌██▐ █▌▐█▄▄▌ // ▀▀ █▪▀▀▀ ▀ ▀ ·▀▀▀▀ ▀▀▀·▀▀▀ ·▀▀▀▀ ▀ ▀ ▀▀ █▪ ▀▀▀ // Magicbane Emulator Project © 2013 - 2022 // www.magicbane.com package engine.net; import engine.core.ControlledRunnable; import engine.job.AbstractJob; import engine.job.JobManager; import engine.server.MBServerStatics; import org.pmw.tinylog.Logger; import java.io.IOException; import java.net.*; import java.nio.channels.*; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; public abstract class AbstractConnectionManager extends ControlledRunnable { private final Selector selector; private final ServerSocketChannel listenChannel; private final ConcurrentLinkedQueue chngReqs = new ConcurrentLinkedQueue<>(); /** * Removes the mapping that contains the key 'sockChan' * * @param sockChan */ private long lastAuditTime = 0; /* * */ public AbstractConnectionManager(String nodeName, InetAddress hostAddress, int port) throws IOException { super(nodeName); this.selector = SelectorProvider.provider().openSelector(); // Create a new non-blocking Server channel this.listenChannel = ServerSocketChannel.open(); this.listenChannel.configureBlocking(false); // Bind InetSocketAddress isa = new InetSocketAddress(hostAddress, port); this.listenChannel.socket().bind(isa); Logger.info(this.getLocalNodeName() + " Configured to listen: " + isa.getAddress().toString() + ':' + port); // register an interest in Accepting new connections. SelectionKey sk = this.listenChannel.register(this.selector, SelectionKey.OP_ACCEPT); sk.attach(this); } /* * ControlledRunnable implementations */ @Override protected void _startup() { } @Override protected void _shutdown() { this.selector.wakeup(); this.disconnectAll(); this.selector.wakeup(); } @Override protected boolean _preRun() { this.runStatus = true; return true; } @Override protected boolean _Run() { while (this.runCmd) { try { this.runLoopHook(); this.processChangeRequests(); this.auditSocketChannelToConnectionMap(); this.selector.select(250L); this.processNewEvents(); } catch (Exception e) { Logger.error(e.toString()); } } return true; } @Override protected boolean _postRun() { this.runStatus = false; this.disconnectAll(); try { this.selector.close(); } catch (IOException e) { Logger.error(e.toString()); } return true; } /** * Hook for subclasses to use. */ protected void runLoopHook() { } /* * Accept / New Connection FNs */ private AbstractConnection acceptNewConnection(final SelectionKey key) throws IOException { this.preAcceptNewConnectionHook(key); // Cancel incoming connections if server isn't set to listen if (this.listenChannel == null || this.listenChannel.isOpen() == false) { key.cancel(); return null; } // For an accept to be pending, the key contains a reference to the // ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // Get SocketChannel SocketChannel sockChan = ssc.accept(); sockChan.configureBlocking(false); //Configure the Socket Socket socket = sockChan.socket(); socket.setSendBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE); socket.setReceiveBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE); socket.setTcpNoDelay(MBServerStatics.TCP_NO_DELAY_DEFAULT); //Register with the selector SelectionKey sk = sockChan.register(this.selector, SelectionKey.OP_READ); if (sk == null) { Logger.error("FIX ME! NULL SELECTION KEY!"); return null; } //Initialize Connection AbstractConnection ac = this.getNewIncomingConnection(sockChan); sk.attach(ac); this.postAcceptNewConnectionHook(ac); return ac; } /** * Hook for subclasses to use. * * @param key */ protected void preAcceptNewConnectionHook(SelectionKey key) { } /** * Hook for subclasses to use. * * @param ac */ protected void postAcceptNewConnectionHook(AbstractConnection ac) { } protected abstract AbstractConnection getNewIncomingConnection(SocketChannel sockChan); protected abstract AbstractConnection getNewOutgoingConnection(SocketChannel sockChan); /* * Disconnect / Destroy Connection FNs */ protected boolean disconnect(final SelectionKey key) { this.disconnect((AbstractConnection) key.attachment()); key.attach(null); key.cancel(); return key.isValid(); } protected boolean disconnect(final AbstractConnection c) { if (c == null) return false; c.disconnect(); return c.getSocketChannel().isConnected(); } /* * Data IO */ /* * WRITE SEQUENCE */ protected void disconnectAll() { synchronized (this.selector.keys()) { for (SelectionKey sk : this.selector.keys()) { if (sk.channel() instanceof SocketChannel) disconnect(sk); } } } /** * Submits a request to set this Connection to WRITE mode. */ protected void sendStart(final SocketChannel sockChan) { synchronized (this.chngReqs) { // Indicate we want the interest ops set changed this.chngReqs.add(new ChangeRequest(sockChan, ChangeType.CHANGEOPS, SelectionKey.OP_WRITE)); } this.selector.wakeup(); } /* * READ SEQUENCE */ /** * @param key * @return Boolean indication emit success. */ protected boolean sendFinish(final SelectionKey key) { SocketChannel sockChan = (SocketChannel) key.channel(); // Check to see if the SocketChannel the selector offered up // is null. if (sockChan == null) { Logger.error(": null sockChannel."); this.disconnect(key); return false; } AbstractConnection c = (AbstractConnection) key.attachment(); if (c == null) { Logger.error(": null Connection."); this.disconnect(key); return false; } // long startTime = System.currentTimeMillis(); boolean allSent = c.writeAll(); // if ((System.currentTimeMillis() - startTime) > 20) // this.logDirectWARNING(c.getRemoteAddressAndPortAsString() + " took " + (System.currentTimeMillis() - startTime) + "ms to handle!"); // If all was written, switch back to Read Mode. if (allSent || !c.isConnected()) { // Indicate we want the interest ops set changed ChangeRequest chReq = new ChangeRequest(c.getSocketChannel(), ChangeType.CHANGEOPS, SelectionKey.OP_READ); synchronized (this.chngReqs) { this.chngReqs.add(chReq); } this.selector.wakeup(); } return true; } /** * @param key * @return Boolean indication of success. */ private boolean receive(final SelectionKey key) { SocketChannel sockChan = (SocketChannel) key.channel(); // Check to see if the SocketChannel the selector offered up // is null. if (sockChan == null) { Logger.error("null sockChannel."); this.disconnect(key); return false; } AbstractConnection c = (AbstractConnection) key.attachment(); if (c == null) { Logger.error("null Connection."); this.disconnect(key); return false; } c.read(); return true; } /* * Main Loop And Loop Controls */ private void processChangeRequests() { SelectionKey selKey = null; ChangeRequest sccr = null; ChangeType change = null; SocketChannel sockChan = null; synchronized (this.chngReqs) { Iterator it = this.chngReqs.iterator(); while (it.hasNext()) { sccr = it.next(); if (sccr == null) { it.remove(); continue; } change = sccr.getChangeType(); sockChan = sccr.getSocketChannel(); switch (change) { case CHANGEOPS: selKey = sockChan.keyFor(this.selector); if (selKey == null || selKey.isValid() == false) continue; selKey.interestOps(sccr.getOps()); break; case REGISTER: try { sockChan.register(this.selector, sccr.getOps()); } catch (ClosedChannelException e) { // TODO Should a closed channel be logged or just // cleaned up? // Logger.error(this.getLocalNodeName(), e); } break; } } this.chngReqs.clear(); } } private void processNewEvents() { SelectionKey thisKey = null; Iterator selectedKeys = null; JobManager jm = JobManager.getInstance(); selectedKeys = this.selector.selectedKeys().iterator(); if (selectedKeys.hasNext() == false) return; while (selectedKeys.hasNext()) { thisKey = selectedKeys.next(); //To shake out any issues if (thisKey.attachment() == null) Logger.error("Found null attachment! PANIC!"); if (thisKey.attachment() instanceof AbstractConnection) if (((AbstractConnection) thisKey.attachment()).execTask.get() == true) continue; selectedKeys.remove(); try { if (thisKey.isValid() == false) break; // Changed from continue else if (thisKey.isAcceptable()) this.acceptNewConnection(thisKey); else if (thisKey.isReadable()) jm.submitJob(new ReadOperationHander(thisKey)); else if (thisKey.isWritable()) jm.submitJob(new WriteOperationHander(thisKey)); else if (thisKey.isConnectable()) this.finishConnectingTo(thisKey); else Logger.error("Unhandled keystate: " + thisKey.toString()); } catch (CancelledKeyException cke) { Logger.error(this.getLocalNodeName(), cke); this.disconnect(thisKey); } catch (IOException e) { Logger.error(this.getLocalNodeName(), e); } } } protected void connectTo(String host, int port) { try { this.connectTo(InetAddress.getByName(host), port); } catch (UnknownHostException e) { Logger.error(this.getLocalNodeName(), e); } } protected void connectTo(InetAddress host, int port) { try { this.startConnectingTo(host, port); this.selector.wakeup(); } catch (IOException e) { Logger.error(this.getLocalNodeName(), e); } } protected final void startConnectingTo(InetAddress host, int port) throws IOException { // Create a non-blocking socket channel SocketChannel sockChan = SocketChannel.open(); sockChan.configureBlocking(false); sockChan.socket().setSendBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE); sockChan.socket().setReceiveBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE); // Make a new Connection object this.getNewOutgoingConnection(sockChan); // Kick off connection establishment sockChan.connect(new InetSocketAddress(host, port)); synchronized (this.chngReqs) { this.chngReqs.add(new ChangeRequest(sockChan, ChangeType.REGISTER, SelectionKey.OP_CONNECT)); } this.selector.wakeup(); } private void finishConnectingTo(SelectionKey key) throws IOException { this.preFinishConnectingToHook(key); // Get sockChan SocketChannel sockChan = (SocketChannel) key.channel(); // Get AbstractConnection AbstractConnection ac = (AbstractConnection) key.attachment(); if (sockChan == null) { Logger.error(this.getLocalNodeName(), "null socketChannel"); this.disconnect(key); return; } if (ac == null) { Logger.error(this.getLocalNodeName(), "null AbstractConnection"); this.disconnect(key); return; } // Finish the connection. If the connection operation failed // this will raise an IOException. try { sockChan.finishConnect(); } catch (IOException e) { if (e.getMessage().startsWith("Connection refused:") || e.getMessage().startsWith( "An existing connection was forcibly closed")) { // eat this type of IOException } else Logger.error(this.getLocalNodeName(), e); // Cancel the channel's registration with our selector key.cancel(); return; } Socket socket = sockChan.socket(); Logger.debug("Connected to: " + socket.getInetAddress() + ':' + socket.getPort()); sockChan.configureBlocking(false); sockChan.register(this.selector, SelectionKey.OP_READ); this.postFinishConnectingToHook(ac); } /** * Hook for subclasses to use. * * @param key */ protected void preFinishConnectingToHook(SelectionKey key) { } /** * Hook for subclasses to use. * * @param ac */ protected void postFinishConnectingToHook(AbstractConnection ac) { } public final String getLocalNodeName() { return this.getThreadName(); } protected int auditSocketChannelToConnectionMap() { long startTime = System.currentTimeMillis(); int numberOfItemsToProcess = 0; if (lastAuditTime + MBServerStatics.TIMEOUT_CHECKS_TIMER_MS > startTime) return -1; synchronized (this.selector.keys()) { for (SelectionKey sk : this.selector.keys()) { if (!(sk.channel() instanceof SocketChannel)) continue; SocketChannel sockChan = (SocketChannel) sk.channel(); AbstractConnection conn = (AbstractConnection) sk.attachment(); if (sockChan == null) continue; if (!sockChan.isOpen()) { numberOfItemsToProcess++; Logger.info("sockChan closed. Disconnecting.."); disconnect(sk); continue; } if (conn == null) { numberOfItemsToProcess++; Logger.info("Connection is null, Disconnecting."); disconnect(sk); continue; } //removed keep alive timeout. Believe failmu used this for disconnecting players on force quit, but a closed socket will already disconnect. // if (conn.getLastKeepAliveTime() + MBServerStatics.KEEPALIVE_TIMEOUT_MS < startTime) { // numberOfItemsToProcess++; // Logger.info("Keep alive Disconnecting " + conn.getRemoteAddressAndPortAsString()); // conn.disconnect(); // continue; // } if (conn.getLastMsgTime() + MBServerStatics.AFK_TIMEOUT_MS < startTime) { numberOfItemsToProcess++; Logger.info("AFK TIMEOUT Disconnecting " + conn.getRemoteAddressAndPortAsString()); conn.disconnect(); } } } if (numberOfItemsToProcess != 0) Logger.info("Cleaned " + numberOfItemsToProcess + " dead connections in " + (System.currentTimeMillis() - startTime) + " millis."); lastAuditTime = System.currentTimeMillis(); return numberOfItemsToProcess; } public int getConnectionSize() { if (this.selector == null) return -1; if (this.selector.keys() == null) return -1; return this.selector.keys().size(); } /** * Returns the port on which this socket is listening. * * @return the port number to which this socket is listening or -1 if the * socket is not bound yet. */ public int getListeningPort() { if (this.listenChannel == null) return -1; if (this.listenChannel.socket() == null) return -1; return this.listenChannel.socket().getLocalPort(); } /** * Returns the address of the endpoint this socket is bound to, or null if * it is not bound yet. * * @return a SocketAddress representing the local endpoint of this socket, * or null if it is not bound yet. */ public SocketAddress getListeningAddress() { if (this.listenChannel == null) return null; if (this.listenChannel.socket() == null) return null; return this.listenChannel.socket().getLocalSocketAddress(); } /* * */ protected static enum ChangeType { REGISTER, CHANGEOPS } private class ChangeRequest { private final SocketChannel sockChan; private final ChangeType changeType; private final Integer ops; public ChangeRequest(SocketChannel sockChan, ChangeType changeType, int ops) { this.sockChan = sockChan; this.changeType = changeType; this.ops = ops; } public SocketChannel getSocketChannel() { synchronized (this.sockChan) { return this.sockChan; } } public ChangeType getChangeType() { synchronized (this.changeType) { return this.changeType; } } public int getOps() { synchronized (this.ops) { return this.ops; } } } private class ReadOperationHander extends AbstractJob { private final SelectionKey sk; private final AbstractConnection ac; private final boolean runStatus; public ReadOperationHander(final SelectionKey sk) { this.sk = sk; if (sk.attachment() instanceof AbstractConnection) { this.ac = (AbstractConnection) sk.attachment(); this.runStatus = this.ac.execTask.compareAndSet(false, true); } else { this.ac = null; this.runStatus = false; Logger.error("Passed selection key did not have a corresponding Connection!(Read)"); } } @Override protected void doJob() { if (runStatus) { this.ac.connMan.receive(sk); this.ac.execTask.compareAndSet(true, false); } } } private class WriteOperationHander extends AbstractJob { private final SelectionKey sk; private final AbstractConnection ac; private final boolean runStatus; public WriteOperationHander(final SelectionKey sk) { this.sk = sk; if (sk.attachment() instanceof AbstractConnection) { this.ac = (AbstractConnection) sk.attachment(); this.runStatus = this.ac.execTask.compareAndSet(false, true); } else { this.runStatus = false; this.ac = null; Logger.error("Passed selection key did not have a corresponding Connection!(Write)"); } } @Override protected void doJob() { if (runStatus) { this.ac.connMan.sendFinish(sk); this.ac.execTask.compareAndSet(true, false); } } } }