Public Repository for the Magicbane Shadowbane Emulator
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

708 lines
17 KiB

// • ▌ ▄ ·. ▄▄▄· ▄▄ • ▪ ▄▄· ▄▄▄▄· ▄▄▄· ▐▄▄▄ ▄▄▄ .
// ·██ ▐███▪▐█ ▀█ ▐█ ▀ ▪██ ▐█ ▌▪▐█ ▀█▪▐█ ▀█ •█▌ ▐█▐▌·
// ▐█ ▌▐▌▐█·▄█▀▀█ ▄█ ▀█▄▐█·██ ▄▄▐█▀▀█▄▄█▀▀█ ▐█▐ ▐▌▐▀▀▀
// ██ ██▌▐█▌▐█ ▪▐▌▐█▄▪▐█▐█▌▐███▌██▄▪▐█▐█ ▪▐▌██▐ █▌▐█▄▄▌
// ▀▀ █▪▀▀▀ ▀ ▀ ·▀▀▀▀ ▀▀▀·▀▀▀ ·▀▀▀▀ ▀ ▀ ▀▀ █▪ ▀▀▀
// Magicbane Emulator Project © 2013 - 2022
// www.magicbane.com
package engine.net;
import engine.core.ControlledRunnable;
import engine.job.AbstractJob;
import engine.job.JobManager;
import engine.server.MBServerStatics;
import org.pmw.tinylog.Logger;
import java.io.IOException;
import java.net.*;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
public abstract class AbstractConnectionManager extends ControlledRunnable {
private final Selector selector;
private final ServerSocketChannel listenChannel;
private final ConcurrentLinkedQueue<ChangeRequest> chngReqs = new ConcurrentLinkedQueue<>();
/*
*
*/
public AbstractConnectionManager(String nodeName, InetAddress hostAddress,
int port) throws IOException {
super(nodeName);
this.selector = SelectorProvider.provider().openSelector();
// Create a new non-blocking Server channel
this.listenChannel = ServerSocketChannel.open();
this.listenChannel.configureBlocking(false);
// Bind
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
this.listenChannel.socket().bind(isa);
Logger.info(this.getLocalNodeName() + " Configured to listen: "
+ isa.getAddress().toString() + ':' + port);
// register an interest in Accepting new connections.
SelectionKey sk = this.listenChannel.register(this.selector, SelectionKey.OP_ACCEPT);
sk.attach(this);
}
/*
* ControlledRunnable implementations
*/
@Override
protected void _startup() {
}
@Override
protected void _shutdown() {
this.selector.wakeup();
this.disconnectAll();
this.selector.wakeup();
}
@Override
protected boolean _preRun() {
this.runStatus = true;
return true;
}
@Override
protected boolean _Run() {
while (this.runCmd) {
try {
this.runLoopHook();
this.processChangeRequests();
this.auditSocketChannelToConnectionMap();
this.selector.select(250L);
this.processNewEvents();
} catch (Exception e) {
Logger.error(e.toString());
}
}
return true;
}
@Override
protected boolean _postRun() {
this.runStatus = false;
this.disconnectAll();
try {
this.selector.close();
} catch (IOException e) {
Logger.error( e.toString());
}
return true;
}
/**
* Hook for subclasses to use.
*
*/
protected void runLoopHook() {
}
/*
* Accept / New Connection FNs
*/
private AbstractConnection acceptNewConnection(final SelectionKey key)
throws IOException {
this.preAcceptNewConnectionHook(key);
// Cancel incoming connections if server isn't set to listen
if (this.listenChannel == null || this.listenChannel.isOpen() == false) {
key.cancel();
return null;
}
// For an accept to be pending, the key contains a reference to the
// ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// Get SocketChannel
SocketChannel sockChan = ssc.accept();
sockChan.configureBlocking(false);
//Configure the Socket
Socket socket = sockChan.socket();
socket.setSendBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE);
socket.setReceiveBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE);
socket.setTcpNoDelay(MBServerStatics.TCP_NO_DELAY_DEFAULT);
//Register with the selector
SelectionKey sk = sockChan.register(this.selector, SelectionKey.OP_READ);
if (sk == null) {
Logger.error("FIX ME! NULL SELECTION KEY!");
return null;
}
//Initialize Connection
AbstractConnection ac = this.getNewIncomingConnection(sockChan);
sk.attach(ac);
this.postAcceptNewConnectionHook(ac);
return ac;
}
/**
* Hook for subclasses to use.
*
* @param key
*/
protected void preAcceptNewConnectionHook(SelectionKey key) {
}
/**
* Hook for subclasses to use.
*
* @param ac
*/
protected void postAcceptNewConnectionHook(AbstractConnection ac) {
}
protected abstract AbstractConnection getNewIncomingConnection(SocketChannel sockChan);
protected abstract AbstractConnection getNewOutgoingConnection(SocketChannel sockChan);
/*
* Disconnect / Destroy Connection FNs
*/
protected boolean disconnect(final SelectionKey key) {
this.disconnect((AbstractConnection) key.attachment());
key.attach(null);
key.cancel();
return key.isValid();
}
protected boolean disconnect(final AbstractConnection c) {
if (c == null)
return false;
c.disconnect();
return c.getSocketChannel().isConnected();
}
protected void disconnectAll() {
synchronized (this.selector.keys()) {
for (SelectionKey sk : this.selector.keys()) {
if (sk.channel() instanceof SocketChannel)
disconnect(sk);
}
}
}
/*
* Data IO
*/
/*
* WRITE SEQUENCE
*/
/**
* Submits a request to set this Connection to WRITE mode.
*
*/
protected void sendStart(final SocketChannel sockChan) {
synchronized (this.chngReqs) {
// Indicate we want the interest ops set changed
this.chngReqs.add(new ChangeRequest(sockChan, ChangeType.CHANGEOPS, SelectionKey.OP_WRITE));
}
this.selector.wakeup();
}
/**
*
* @param key
* @return Boolean indication emit success.
*/
protected boolean sendFinish(final SelectionKey key) {
SocketChannel sockChan = (SocketChannel) key.channel();
// Check to see if the SocketChannel the selector offered up
// is null.
if (sockChan == null) {
Logger.error(": null sockChannel.");
this.disconnect(key);
return false;
}
AbstractConnection c = (AbstractConnection) key.attachment();
if (c == null) {
Logger.error(": null Connection.");
this.disconnect(key);
return false;
}
// long startTime = System.currentTimeMillis();
boolean allSent = c.writeAll();
// if ((System.currentTimeMillis() - startTime) > 20)
// this.logDirectWARNING(c.getRemoteAddressAndPortAsString() + " took " + (System.currentTimeMillis() - startTime) + "ms to handle!");
// If all was written, switch back to Read Mode.
if (allSent || !c.isConnected()) {
// Indicate we want the interest ops set changed
ChangeRequest chReq = new ChangeRequest(c.getSocketChannel(), ChangeType.CHANGEOPS, SelectionKey.OP_READ);
synchronized (this.chngReqs) {
this.chngReqs.add(chReq);
}
this.selector.wakeup();
}
return true;
}
/*
* READ SEQUENCE
*/
/**
*
* @param key
* @return Boolean indication of success.
*/
private boolean receive(final SelectionKey key) {
SocketChannel sockChan = (SocketChannel) key.channel();
// Check to see if the SocketChannel the selector offered up
// is null.
if (sockChan == null) {
Logger.error("null sockChannel.");
this.disconnect(key);
return false;
}
AbstractConnection c = (AbstractConnection) key.attachment();
if (c == null) {
Logger.error("null Connection.");
this.disconnect(key);
return false;
}
c.read();
return true;
}
/*
* Main Loop And Loop Controls
*/
private void processChangeRequests() {
SelectionKey selKey = null;
ChangeRequest sccr = null;
ChangeType change = null;
SocketChannel sockChan = null;
synchronized (this.chngReqs) {
Iterator<ChangeRequest> it = this.chngReqs.iterator();
while (it.hasNext()) {
sccr = it.next();
if (sccr == null) {
it.remove();
continue;
}
change = sccr.getChangeType();
sockChan = sccr.getSocketChannel();
switch (change) {
case CHANGEOPS:
selKey = sockChan.keyFor(this.selector);
if (selKey == null || selKey.isValid() == false)
continue;
selKey.interestOps(sccr.getOps());
break;
case REGISTER:
try {
sockChan.register(this.selector, sccr.getOps());
} catch (ClosedChannelException e) {
// TODO Should a closed channel be logged or just
// cleaned up?
// Logger.error(this.getLocalNodeName(), e);
}
break;
}
}
this.chngReqs.clear();
}
}
private void processNewEvents() {
SelectionKey thisKey = null;
Iterator<SelectionKey> selectedKeys = null;
JobManager jm = JobManager.getInstance();
selectedKeys = this.selector.selectedKeys().iterator();
if (selectedKeys.hasNext() == false)
return;
while (selectedKeys.hasNext()) {
thisKey = selectedKeys.next();
//To shake out any issues
if (thisKey.attachment() == null)
Logger.error("Found null attachment! PANIC!");
if (thisKey.attachment() instanceof AbstractConnection)
if (((AbstractConnection) thisKey.attachment()).execTask.get() == true)
continue;
selectedKeys.remove();
try {
if (thisKey.isValid() == false)
break; // Changed from continue
else if (thisKey.isAcceptable())
this.acceptNewConnection(thisKey);
else if (thisKey.isReadable())
jm.submitJob(new ReadOperationHander(thisKey));
else if (thisKey.isWritable())
jm.submitJob(new WriteOperationHander(thisKey));
else if (thisKey.isConnectable())
this.finishConnectingTo(thisKey);
else
Logger.error("Unhandled keystate: " + thisKey.toString());
} catch (CancelledKeyException cke) {
Logger.error(this.getLocalNodeName(), cke);
this.disconnect(thisKey);
} catch (IOException e) {
Logger.error(this.getLocalNodeName(), e);
}
}
}
protected void connectTo(String host, int port) {
try {
this.connectTo(InetAddress.getByName(host), port);
} catch (UnknownHostException e) {
Logger.error(this.getLocalNodeName(), e);
}
}
protected void connectTo(InetAddress host, int port) {
try {
this.startConnectingTo(host, port);
this.selector.wakeup();
} catch (IOException e) {
Logger.error(this.getLocalNodeName(), e);
}
}
protected final void startConnectingTo(InetAddress host, int port)
throws IOException {
// Create a non-blocking socket channel
SocketChannel sockChan = SocketChannel.open();
sockChan.configureBlocking(false);
sockChan.socket().setSendBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE);
sockChan.socket().setReceiveBufferSize(Network.INITIAL_SOCKET_BUFFER_SIZE);
// Make a new Connection object
this.getNewOutgoingConnection(sockChan);
// Kick off connection establishment
sockChan.connect(new InetSocketAddress(host, port));
synchronized (this.chngReqs) {
this.chngReqs.add(new ChangeRequest(sockChan, ChangeType.REGISTER, SelectionKey.OP_CONNECT));
}
this.selector.wakeup();
}
private void finishConnectingTo(SelectionKey key) throws IOException {
this.preFinishConnectingToHook(key);
// Get sockChan
SocketChannel sockChan = (SocketChannel) key.channel();
// Get AbstractConnection
AbstractConnection ac = (AbstractConnection) key.attachment();
if (sockChan == null) {
Logger.error(this.getLocalNodeName(), "null socketChannel");
this.disconnect(key);
return;
}
if (ac == null) {
Logger.error(this.getLocalNodeName(), "null AbstractConnection");
this.disconnect(key);
return;
}
// Finish the connection. If the connection operation failed
// this will raise an IOException.
try {
sockChan.finishConnect();
} catch (IOException e) {
if (e.getMessage().startsWith("Connection refused:")
|| e.getMessage().startsWith(
"An existing connection was forcibly closed")) {
// eat this type of IOException
} else
Logger.error(this.getLocalNodeName(), e);
// Cancel the channel's registration with our selector
key.cancel();
return;
}
Socket socket = sockChan.socket();
Logger.debug("Connected to: "
+ socket.getInetAddress() + ':'
+ socket.getPort());
sockChan.configureBlocking(false);
sockChan.register(this.selector, SelectionKey.OP_READ);
this.postFinishConnectingToHook(ac);
}
/**
* Hook for subclasses to use.
*
* @param key
*/
protected void preFinishConnectingToHook(SelectionKey key) {
}
/**
* Hook for subclasses to use.
*
* @param ac
*/
protected void postFinishConnectingToHook(AbstractConnection ac) {
}
public final String getLocalNodeName() {
return this.getThreadName();
}
/**
* Removes the mapping that contains the key 'sockChan'
*
* @param sockChan
*/
private long lastAuditTime = 0;
protected int auditSocketChannelToConnectionMap() {
long startTime = System.currentTimeMillis();
int numberOfItemsToProcess = 0;
if (lastAuditTime + MBServerStatics.TIMEOUT_CHECKS_TIMER_MS > startTime)
return -1;
synchronized (this.selector.keys()) {
for (SelectionKey sk : this.selector.keys()) {
if (!(sk.channel() instanceof SocketChannel))
continue;
SocketChannel sockChan = (SocketChannel) sk.channel();
AbstractConnection conn = (AbstractConnection) sk.attachment();
if (sockChan == null)
continue;
if (!sockChan.isOpen()) {
numberOfItemsToProcess++;
Logger.info("sockChan closed. Disconnecting..");
disconnect(sk);
continue;
}
if (conn == null) {
numberOfItemsToProcess++;
Logger.info("Connection is null, Disconnecting.");
disconnect(sk);
continue;
}
//removed keep alive timeout. Believe failmu used this for disconnecting players on force quit, but a closed socket will already disconnect.
// if (conn.getLastKeepAliveTime() + MBServerStatics.KEEPALIVE_TIMEOUT_MS < startTime) {
// numberOfItemsToProcess++;
// Logger.info("Keep alive Disconnecting " + conn.getRemoteAddressAndPortAsString());
// conn.disconnect();
// continue;
// }
if (conn.getLastMsgTime() + MBServerStatics.AFK_TIMEOUT_MS < startTime) {
numberOfItemsToProcess++;
Logger.info("AFK TIMEOUT Disconnecting " + conn.getRemoteAddressAndPortAsString());
conn.disconnect();
}
}
}
if (numberOfItemsToProcess != 0)
Logger.info( "Cleaned "
+ numberOfItemsToProcess
+ " dead connections in "
+ (System.currentTimeMillis() - startTime)
+ " millis.");
lastAuditTime = System.currentTimeMillis();
return numberOfItemsToProcess;
}
/*
*
*/
protected static enum ChangeType {
REGISTER, CHANGEOPS
}
private class ChangeRequest {
private final SocketChannel sockChan;
private final ChangeType changeType;
private final Integer ops;
public ChangeRequest(SocketChannel sockChan, ChangeType changeType,
int ops) {
this.sockChan = sockChan;
this.changeType = changeType;
this.ops = ops;
}
public SocketChannel getSocketChannel() {
synchronized (this.sockChan) {
return this.sockChan;
}
}
public ChangeType getChangeType() {
synchronized (this.changeType) {
return this.changeType;
}
}
public int getOps() {
synchronized (this.ops) {
return this.ops;
}
}
}
public int getConnectionSize() {
if (this.selector == null)
return -1;
if (this.selector.keys() == null)
return -1;
return this.selector.keys().size();
}
/**
* Returns the port on which this socket is listening.
*
* @return the port number to which this socket is listening or -1 if the
* socket is not bound yet.
*
*/
public int getListeningPort() {
if (this.listenChannel == null)
return -1;
if (this.listenChannel.socket() == null)
return -1;
return this.listenChannel.socket().getLocalPort();
}
/**
* Returns the address of the endpoint this socket is bound to, or null if
* it is not bound yet.
*
* @return a SocketAddress representing the local endpoint of this socket,
* or null if it is not bound yet.
*/
public SocketAddress getListeningAddress() {
if (this.listenChannel == null)
return null;
if (this.listenChannel.socket() == null)
return null;
return this.listenChannel.socket().getLocalSocketAddress();
}
private class ReadOperationHander extends AbstractJob {
private final SelectionKey sk;
private final AbstractConnection ac;
private final boolean runStatus;
public ReadOperationHander(final SelectionKey sk) {
this.sk = sk;
if (sk.attachment() instanceof AbstractConnection) {
this.ac = (AbstractConnection) sk.attachment();
this.runStatus = this.ac.execTask.compareAndSet(false, true);
} else {
this.ac = null;
this.runStatus = false;
Logger.error("Passed selection key did not have a corresponding Connection!(Read)");
}
}
@Override
protected void doJob() {
if (runStatus) {
this.ac.connMan.receive(sk);
this.ac.execTask.compareAndSet(true, false);
}
}
}
private class WriteOperationHander extends AbstractJob {
private final SelectionKey sk;
private final AbstractConnection ac;
private final boolean runStatus;
public WriteOperationHander(final SelectionKey sk) {
this.sk = sk;
if (sk.attachment() instanceof AbstractConnection) {
this.ac = (AbstractConnection) sk.attachment();
this.runStatus = this.ac.execTask.compareAndSet(false, true);
} else {
this.runStatus = false;
this.ac = null;
Logger.error("Passed selection key did not have a corresponding Connection!(Write)");
}
}
@Override
protected void doJob() {
if (runStatus) {
this.ac.connMan.sendFinish(sk);
this.ac.execTask.compareAndSet(true, false);
}
}
}
}