// • ▌ ▄ ·. ▄▄▄· ▄▄ • ▪ ▄▄· ▄▄▄▄· ▄▄▄· ▐▄▄▄ ▄▄▄ .
// ·██ ▐███▪▐█ ▀█ ▐█ ▀ ▪██ ▐█ ▌▪▐█ ▀█▪▐█ ▀█ •█▌ ▐█▐▌·
// ▐█ ▌▐▌▐█·▄█▀▀█ ▄█ ▀█▄▐█·██ ▄▄▐█▀▀█▄▄█▀▀█ ▐█▐ ▐▌▐▀▀▀
// ██ ██▌▐█▌▐█ ▪▐▌▐█▄▪▐█▐█▌▐███▌██▄▪▐█▐█ ▪▐▌██▐ █▌▐█▄▄▌
// ▀▀ █▪▀▀▀ ▀ ▀ ·▀▀▀▀ ▀▀▀·▀▀▀ ·▀▀▀▀ ▀ ▀ ▀▀ █▪ ▀▀▀
// Magicbane Emulator Project © 2013 - 2022
// www.magicbane.com

package engine.net;

import engine.mbEnums.DispatchChannel;
import org.pmw.tinylog.Logger;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Pattern;

// MB Dev Notes:
// All outgoing Protocol messages to the player are managed through the MessageDispatcher.class.
// All incoming Protocol messages from the player are managed by the Protocol.class.
//
// A DispatchMessage is configured (Protocol message) and wrapped in a Dispatch (distribution list).
// A Dispatch can be submitted to the MessageDispatcher for delivery from any thread.
//
// Dispatches are interleaved between channels. This is to ensure
// a combat or movement message is not delayed by spam clicking a
// larger message. Choose your channel wisely.
public class MessageDispatcher implements Runnable { // Class variables private static final ConcurrentLinkedQueue[] _messageQueue = new ConcurrentLinkedQueue[DispatchChannel.values().length]; private static final LinkedBlockingQueue _blockingQueue = new LinkedBlockingQueue<>(); public static volatile long[] messageCount = new long[DispatchChannel.values().length]; public static LongAdder[] dispatchCount = new LongAdder[DispatchChannel.values().length]; // Performance metrics public static volatile long[] maxRecipients = new long[DispatchChannel.values().length]; public static LongAdder itemPoolSize = new LongAdder(); private final Pattern filterPattern; // Unused, but just in case private Dispatch messageDispatch; public MessageDispatcher() { // Create new FIFO queues for this network thread for (DispatchChannel dispatchChannel : DispatchChannel.values()) { _messageQueue[dispatchChannel.getChannelID()] = new ConcurrentLinkedQueue<>(); dispatchCount[dispatchChannel.getChannelID()] = new LongAdder(); } filterPattern = Pattern.compile("[^\\p{ASCII}]"); Logger.info(" Dispatcher thread has started!"); } @Override public void run() { boolean shouldBlock; while (true) { try { shouldBlock = true; for (DispatchChannel dispatchChannel : DispatchChannel.values()) { this.messageDispatch = _messageQueue[dispatchChannel.getChannelID()].poll(); if (this.messageDispatch != null) { DispatchMessage.serializeDispatch(this.messageDispatch); shouldBlock = false; } } if (shouldBlock == true) shouldBlock = _blockingQueue.take(); } catch (Exception e) { Logger.error(e); } } } public static void send(Dispatch messageDispatch, DispatchChannel dispatchChannel) { // Use Dispatch.borrow() for a new dispatch. // The Dispatch will be released by the system // once delivered. // Don't queue up empty dispatches! 
if (messageDispatch.player == null) return; _messageQueue[dispatchChannel.getChannelID()].add(messageDispatch); _blockingQueue.add(true); // Update performance metrics messageCount[dispatchChannel.getChannelID()]++; } public static String getNetstatString() { String outString; String newLine = System.getProperty("line.separator"); outString = "[LUA_NETSTA()]" + newLine; outString += "poolSize: " + itemPoolSize.longValue() + '\n'; for (DispatchChannel dispatchChannel : DispatchChannel.values()) { outString += "Channel: " + dispatchChannel.name() + '\n'; outString += "Dispatches: " + dispatchCount[dispatchChannel.getChannelID()].longValue() + '\n'; outString += "Messages: " + messageCount[dispatchChannel.getChannelID()] + '\n'; outString += "maxRecipients: " + maxRecipients[dispatchChannel.getChannelID()] + '\n'; } return outString; } // For Debugging: //Logger.error("MessageDispatcher", messageDispatch.msg.getOpcodeAsString() + " sent to " + messageDispatch.playerList.size() + " players"); }