// • ▌ ▄ ·. ▄▄▄· ▄▄ • ▪ ▄▄· ▄▄▄▄· ▄▄▄· ▐▄▄▄ ▄▄▄ . // ·██ ▐███▪▐█ ▀█ ▐█ ▀ ▪██ ▐█ ▌▪▐█ ▀█▪▐█ ▀█ •█▌ ▐█▐▌· // ▐█ ▌▐▌▐█·▄█▀▀█ ▄█ ▀█▄▐█·██ ▄▄▐█▀▀█▄▄█▀▀█ ▐█▐ ▐▌▐▀▀▀ // ██ ██▌▐█▌▐█ ▪▐▌▐█▄▪▐█▐█▌▐███▌██▄▪▐█▐█ ▪▐▌██▐ █▌▐█▄▄▌ // ▀▀ █▪▀▀▀ ▀ ▀ ·▀▀▀▀ ▀▀▀·▀▀▀ ·▀▀▀▀ ▀ ▀ ▀▀ █▪ ▀▀▀ // Magicbane Emulator Project © 2013 - 2022 // www.magicbane.com package engine.job; import engine.jobs.AttackJob; import engine.jobs.UsePowerJob; import engine.net.CheckNetMsgFactoryJob; import engine.net.ConnectionMonitorJob; import engine.server.MBServerStatics; import org.pmw.tinylog.Logger; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; public class JobPool { private final LinkedBlockingQueue jobWaitQueue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue jobWorkerQueue = new LinkedBlockingQueue<>(); private final ArrayList jobWorkerList = new ArrayList<>(); private final LinkedBlockingQueue jobRunList = new LinkedBlockingQueue<>(); int jobPoolID; int maxWorkers; int nextWorkerID; private boolean blockNewSubmissions = false; public JobPool(int id, int workers) { this.jobPoolID = id; // default to 1 unless workers parameter is higher int actualWorkers = 1; if (workers > 1) actualWorkers = workers; this.maxWorkers = actualWorkers; for (int i = 0; i < actualWorkers; i++) { this.startWorker(i); } this.nextWorkerID = this.maxWorkers; } private static boolean isExemptJobFromAudit(AbstractJob aj) { // If the job is any of the following classes, exempt it from auditWorkers if (aj instanceof ConnectionMonitorJob) { return true; } else return aj instanceof CheckNetMsgFactoryJob || aj instanceof AttackJob || aj instanceof UsePowerJob; } private int getNextWorkerID() { return this.nextWorkerID++; } public int getJobPoolID() { return this.jobPoolID; } public void setBlockNewSubmissions(boolean blocked) { this.blockNewSubmissions = blocked; } public boolean submitJob(AbstractJob aj) { if (blockNewSubmissions) { Logger.warn("A '" + aj.getClass().getSimpleName() + "' job was submitted, but submissions are currently blocked."); return false; } aj.markSubmitTime(); jobWaitQueue.add(aj); // keep notifying workers if the wait queue has items // commented out as the price of polling the wait queue // size while it is being updated out-weighs the gain // for not just blindly waking all workers // unless we have a stupidly large pool vs CPU threads JobWorker jw = jobWorkerQueue.poll(); if (jw != null) { synchronized (jw) { jw.notify(); } } return true; } private void startWorker(int workerID) { // check we dont already have a jobWorker with that ID synchronized (this.jobWorkerList) { for (JobWorker jwi : this.jobWorkerList) { if (jwi.getWorkerId() == workerID) { Logger.error("Attempt to create worker with ID " + workerID + " failed in JobPool " + jobPoolID + " as worker ID already exists"); return; } } } // ID is unique, create worker JobWorker jw; jw = new JobWorker(workerID, this.jobPoolID, this.jobWaitQueue, this.jobWorkerQueue); synchronized (this.jobWorkerList) { //Adds to the overall list.. jobWorkerList.add(jw); } //Returns to the free worker queue.. jw.startup(); } private String getQueueByClassAsString(Queue q) { HashMap ch = new HashMap<>(); int cnt = 0; // iterate through the linked queue and get every item // putting classname and incrementing the value each time in the hashmap Iterator wi = q.iterator(); while (cnt < q.size() && wi.hasNext()) { AbstractJob aj = wi.next(); if (ch.containsKey(aj.getClass().getSimpleName())) { int newValue = ch.get(aj.getClass().getSimpleName()) + 1; ch.put(aj.getClass().getSimpleName(), newValue); } else { ch.put(aj.getClass().getSimpleName(), 1); } cnt++; } // iterate through the hashmap outputting the classname and number of jobs Iterator i = ch.keySet().iterator(); String out = ""; while (i.hasNext()) { Object key = i.next(); out += "JobPoolID_" + this.jobPoolID + ' ' + key.toString() + "=>" + ch.get(key) + '\n'; } if (out.isEmpty()) return "No Jobs on queue\n"; else return out; } public void auditWorkers() { if (!MBServerStatics.ENABLE_AUDIT_JOB_WORKERS) { return; } ArrayList problemJobs = new ArrayList<>(); // Checked for stalled Workers Iterator it = jobWorkerList.iterator(); while (it.hasNext()) { JobWorker jw = it.next(); AbstractJob curJob = jw.getCurrentJob(); if (curJob != null) { // Has a job if (JobPool.isExemptJobFromAudit(curJob)) { continue; } // Determine whether the job is being executed or waiting to // start; if (curJob.getStartTime() <= 0) { // Waiting to start long diff = System.currentTimeMillis() - curJob.getSubmitTime(); if (diff >= MBServerStatics.JOB_STALL_THRESHOLD_MS) { Logger.warn("Job did not start within threshold. Stopping worker#" + jw.getWorkerId() + " JobData:" + curJob.toString()); jw.EmergencyStop(); problemJobs.add(jw.getCurrentJob()); it.remove(); } // end if (diff >= } else if (curJob.getStopTime() <= 0L) { // is executing it long diff = System.currentTimeMillis() - curJob.getStartTime(); if (diff >= MBServerStatics.JOB_STALL_THRESHOLD_MS) { Logger.warn("Job execution time exceeded threshold(" + diff + "). Stopping worker#" + jw.getWorkerId() + " JobData:" + curJob.toString()); jw.EmergencyStop(); problemJobs.add(jw.getCurrentJob()); it.remove(); } // end if (diff >= } // end if(curJob.getStopTime() } // end if(curJob != null) } // end While // Check Worker Count and add workers as necessary; int workerCount = jobWorkerList.size(); int maxThreads = this.maxWorkers; // no pool can go below a single thread if (maxThreads < 1) maxThreads = 1; while (workerCount != maxThreads) { Logger.info("Resizing JobPool " + this.jobPoolID + " from " + workerCount + " to " + maxThreads); if (workerCount < maxThreads) { this.startWorker(this.getNextWorkerID()); if (jobWorkerList.size() <= workerCount) { // Something didnt work correctly Logger.warn("auditWorkers() failed to add a new JobWorker to JobPool " + this.jobPoolID + ". Worker count " + workerCount + " Worker pool size " + jobWorkerList.size() + " Aborting Audit."); return; } } else if (workerCount > maxThreads) { synchronized (this.jobWorkerList) { Logger.warn("Reducing workers in JobPool " + this.jobPoolID + " Worker Count: " + workerCount + " to Max threads: " + maxThreads); // pick a worker off the list and shut it down JobWorker toRemove = null; int loopTries = 5; do { //Infinite loop could be bad.. toRemove = jobWorkerQueue.poll(); } while (toRemove == null && --loopTries >= 0); //remove it from the list toRemove.shutdown(); jobWorkerList.remove(toRemove); } } // update value for next loop pass workerCount = jobWorkerList.size(); } } public void shutdown() { synchronized (this.jobWorkerList) { for (JobWorker jw : this.jobWorkerList) jw.shutdown(); } } public void emergencyStop() { synchronized (this.jobWorkerList) { for (JobWorker jw : this.jobWorkerList) jw.EmergencyStop(); } } public String getRunningQueueByClassAsString() { return this.getQueueByClassAsString(this.jobRunList); } public String getWaitQueueByClassAsString() { return this.getQueueByClassAsString(this.jobWaitQueue); } // used by devcmds public String setMaxWorkers(int maxWorkers) { if (maxWorkers > 0 && maxWorkers < 101) { this.maxWorkers = maxWorkers; // audit workers reduces the cap this.auditWorkers(); return "Max workers set to " + maxWorkers + " for JobPool_" + this.jobPoolID; } else { return "Max workers not set, value must be from 1-100"; } } }