Simplyfied MultiThreadedPOWEngine and thread pool creation
This commit is contained in:
		| @@ -37,7 +37,9 @@ import java.net.InetAddress; | ||||
| import java.util.List; | ||||
| import java.util.Timer; | ||||
| import java.util.TimerTask; | ||||
| import java.util.concurrent.*; | ||||
| import java.util.concurrent.CancellationException; | ||||
| import java.util.concurrent.ExecutionException; | ||||
| import java.util.concurrent.Future; | ||||
|  | ||||
| import static ch.dissem.bitmessage.InternalContext.NETWORK_EXTRA_BYTES; | ||||
| import static ch.dissem.bitmessage.InternalContext.NETWORK_NONCE_TRIALS_PER_BYTE; | ||||
| @@ -64,12 +66,9 @@ public class BitmessageContext { | ||||
|     public static final int CURRENT_VERSION = 3; | ||||
|     private final static Logger LOG = LoggerFactory.getLogger(BitmessageContext.class); | ||||
|  | ||||
|     private final ExecutorService pool; | ||||
|  | ||||
|     private final InternalContext ctx; | ||||
|  | ||||
|     private final Labeler labeler; | ||||
|     private final Listener listener; | ||||
|     private final NetworkHandler.MessageListener networkListener; | ||||
|  | ||||
|     private final boolean sendPubkeyOnIdentityCreation; | ||||
| @@ -77,12 +76,7 @@ public class BitmessageContext { | ||||
|     private BitmessageContext(Builder builder) { | ||||
|         ctx = new InternalContext(builder); | ||||
|         labeler = builder.labeler; | ||||
|         listener = builder.listener; | ||||
|         networkListener = new DefaultMessageListener(ctx, labeler, listener); | ||||
|  | ||||
|         // As this thread is used for parts that do POW, which itself uses parallel threads, only | ||||
|         // one should be executed at any time. | ||||
|         pool = Executors.newFixedThreadPool(1); | ||||
|         networkListener = new DefaultMessageListener(ctx, labeler, builder.listener); | ||||
|  | ||||
|         sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation; | ||||
|  | ||||
| @@ -116,12 +110,7 @@ public class BitmessageContext { | ||||
|         )); | ||||
|         ctx.getAddressRepository().save(identity); | ||||
|         if (sendPubkeyOnIdentityCreation) { | ||||
|             pool.submit(new Runnable() { | ||||
|                 @Override | ||||
|                 public void run() { | ||||
|                     ctx.sendPubkey(identity, identity.getStream()); | ||||
|                 } | ||||
|             }); | ||||
|             ctx.sendPubkey(identity, identity.getStream()); | ||||
|         } | ||||
|         return identity; | ||||
|     } | ||||
| @@ -177,37 +166,32 @@ public class BitmessageContext { | ||||
|         if (msg.getFrom() == null || msg.getFrom().getPrivateKey() == null) { | ||||
|             throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key."); | ||||
|         } | ||||
|         pool.submit(new Runnable() { | ||||
|             @Override | ||||
|             public void run() { | ||||
|                 BitmessageAddress to = msg.getTo(); | ||||
|                 if (to != null) { | ||||
|                     if (to.getPubkey() == null) { | ||||
|                         LOG.info("Public key is missing from recipient. Requesting."); | ||||
|                         ctx.requestPubkey(to); | ||||
|                     } | ||||
|                     if (to.getPubkey() == null) { | ||||
|                         msg.setStatus(PUBKEY_REQUESTED); | ||||
|                         msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); | ||||
|                         ctx.getMessageRepository().save(msg); | ||||
|                     } | ||||
|                 } | ||||
|                 if (to == null || to.getPubkey() != null) { | ||||
|                     LOG.info("Sending message."); | ||||
|                     msg.setStatus(DOING_PROOF_OF_WORK); | ||||
|                     ctx.getMessageRepository().save(msg); | ||||
|                     ctx.send( | ||||
|                             msg.getFrom(), | ||||
|                             to, | ||||
|                             wrapInObjectPayload(msg), | ||||
|                             TTL.msg() | ||||
|                     ); | ||||
|                     msg.setStatus(SENT); | ||||
|                     msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT)); | ||||
|                     ctx.getMessageRepository().save(msg); | ||||
|                 } | ||||
|         BitmessageAddress to = msg.getTo(); | ||||
|         if (to != null) { | ||||
|             if (to.getPubkey() == null) { | ||||
|                 LOG.info("Public key is missing from recipient. Requesting."); | ||||
|                 ctx.requestPubkey(to); | ||||
|             } | ||||
|         }); | ||||
|             if (to.getPubkey() == null) { | ||||
|                 msg.setStatus(PUBKEY_REQUESTED); | ||||
|                 msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); | ||||
|                 ctx.getMessageRepository().save(msg); | ||||
|             } | ||||
|         } | ||||
|         if (to == null || to.getPubkey() != null) { | ||||
|             LOG.info("Sending message."); | ||||
|             msg.setStatus(DOING_PROOF_OF_WORK); | ||||
|             ctx.getMessageRepository().save(msg); | ||||
|             ctx.send( | ||||
|                     msg.getFrom(), | ||||
|                     to, | ||||
|                     wrapInObjectPayload(msg), | ||||
|                     TTL.msg() | ||||
|             ); | ||||
|             msg.setStatus(SENT); | ||||
|             msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT)); | ||||
|             ctx.getMessageRepository().save(msg); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private ObjectPayload wrapInObjectPayload(Plaintext msg) { | ||||
| @@ -427,6 +411,8 @@ public class BitmessageContext { | ||||
|          * sender can't receive your public key) in some special situations. Also note that it's probably | ||||
|          * not a good idea to set it too low. | ||||
|          * </p> | ||||
|          * | ||||
|          * @deprecated use {@link TTL#pubkey(long)} instead. | ||||
|          */ | ||||
|         public Builder pubkeyTTL(long days) { | ||||
|             if (days < 0 || days > 28 * DAY) throw new IllegalArgumentException("TTL must be between 1 and 28 days"); | ||||
|   | ||||
| @@ -25,16 +25,18 @@ import java.security.MessageDigest; | ||||
| import java.security.NoSuchAlgorithmException; | ||||
| import java.util.ArrayList; | ||||
| import java.util.List; | ||||
| import java.util.concurrent.Semaphore; | ||||
| import java.util.concurrent.*; | ||||
|  | ||||
| import static ch.dissem.bitmessage.utils.Bytes.inc; | ||||
| import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | ||||
|  | ||||
| /** | ||||
|  * A POW engine using all available CPU cores. | ||||
|  */ | ||||
| public class MultiThreadedPOWEngine implements ProofOfWorkEngine { | ||||
|     private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedPOWEngine.class); | ||||
|     private static final Semaphore semaphore = new Semaphore(1, true); | ||||
|     private final ExecutorService waiterPool = Executors.newSingleThreadExecutor(pool("POW-waiter").daemon().build()); | ||||
|     private final ExecutorService workerPool = Executors.newCachedThreadPool(pool("POW-worker").daemon().build()); | ||||
|  | ||||
|     /** | ||||
|      * This method will block until all pending nonce calculations are done, but not wait for its own calculation | ||||
| @@ -46,42 +48,59 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine { | ||||
|      * @param callback    called with the calculated nonce as argument. The ProofOfWorkEngine implementation must make | ||||
|      */ | ||||
|     @Override | ||||
|     public void calculateNonce(byte[] initialHash, byte[] target, Callback callback) { | ||||
|         try { | ||||
|             semaphore.acquire(); | ||||
|         } catch (InterruptedException e) { | ||||
|             throw new ApplicationException(e); | ||||
|         } | ||||
|         callback = new CallbackWrapper(callback); | ||||
|         int cores = Runtime.getRuntime().availableProcessors(); | ||||
|         if (cores > 255) cores = 255; | ||||
|         LOG.info("Doing POW using " + cores + " cores"); | ||||
|         List<Worker> workers = new ArrayList<>(cores); | ||||
|         for (int i = 0; i < cores; i++) { | ||||
|             Worker w = new Worker(workers, (byte) cores, i, initialHash, target, callback); | ||||
|             workers.add(w); | ||||
|         } | ||||
|         for (Worker w : workers) { | ||||
|             // Doing this in the previous loop might cause a ConcurrentModificationException in the worker | ||||
|             // if a worker finds a nonce while new ones are still being added. | ||||
|             w.start(); | ||||
|         } | ||||
|     public void calculateNonce(final byte[] initialHash, final byte[] target, final Callback callback) { | ||||
|         waiterPool.execute(new Runnable() { | ||||
|             @Override | ||||
|             public void run() { | ||||
|                 long startTime = System.currentTimeMillis(); | ||||
|  | ||||
|                 int cores = Runtime.getRuntime().availableProcessors(); | ||||
|                 if (cores > 255) cores = 255; | ||||
|                 LOG.info("Doing POW using " + cores + " cores"); | ||||
|                 List<Worker> workers = new ArrayList<>(cores); | ||||
|                 for (int i = 0; i < cores; i++) { | ||||
|                     Worker w = new Worker((byte) cores, i, initialHash, target); | ||||
|                     workers.add(w); | ||||
|                 } | ||||
|                 List<Future<byte[]>> futures = new ArrayList<>(cores); | ||||
|                 for (Worker w : workers) { | ||||
|                     // Doing this in the previous loop might cause a ConcurrentModificationException in the worker | ||||
|                     // if a worker finds a nonce while new ones are still being added. | ||||
|                     futures.add(workerPool.submit(w)); | ||||
|                 } | ||||
|                 try { | ||||
|                     while (!Thread.interrupted()) { | ||||
|                         for (Future<byte[]> future : futures) { | ||||
|                             if (future.isDone()) { | ||||
|                                 callback.onNonceCalculated(initialHash, future.get()); | ||||
|                                 LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); | ||||
|                                 for (Future<byte[]> f : futures) { | ||||
|                                     f.cancel(true); | ||||
|                                 } | ||||
|                                 return; | ||||
|                             } | ||||
|                         } | ||||
|                         Thread.sleep(100); | ||||
|                     } | ||||
|                     LOG.error("POW waiter thread interrupted - this should not happen!"); | ||||
|                 } catch (ExecutionException e) { | ||||
|                     LOG.error(e.getMessage(), e); | ||||
|                 } catch (InterruptedException e) { | ||||
|                     LOG.error("POW waiter thread interrupted - this should not happen!", e); | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     private static class Worker extends Thread { | ||||
|         private final Callback callback; | ||||
|     private class Worker implements Callable<byte[]> { | ||||
|         private final byte numberOfCores; | ||||
|         private final List<Worker> workers; | ||||
|         private final byte[] initialHash; | ||||
|         private final byte[] target; | ||||
|         private final MessageDigest mda; | ||||
|         private final byte[] nonce = new byte[8]; | ||||
|  | ||||
|         public Worker(List<Worker> workers, byte numberOfCores, int core, byte[] initialHash, byte[] target, | ||||
|                       Callback callback) { | ||||
|             this.callback = callback; | ||||
|         Worker(byte numberOfCores, int core, byte[] initialHash, byte[] target) { | ||||
|             this.numberOfCores = numberOfCores; | ||||
|             this.workers = workers; | ||||
|             this.initialHash = initialHash; | ||||
|             this.target = target; | ||||
|             this.nonce[7] = (byte) core; | ||||
| @@ -94,49 +113,16 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine { | ||||
|         } | ||||
|  | ||||
|         @Override | ||||
|         public void run() { | ||||
|         public byte[] call() throws Exception { | ||||
|             do { | ||||
|                 inc(nonce, numberOfCores); | ||||
|                 mda.update(nonce); | ||||
|                 mda.update(initialHash); | ||||
|                 if (!Bytes.lt(target, mda.digest(mda.digest()), 8)) { | ||||
|                     synchronized (callback) { | ||||
|                         if (!Thread.interrupted()) { | ||||
|                             for (Worker w : workers) { | ||||
|                                 w.interrupt(); | ||||
|                             } | ||||
|                             // Clear interrupted flag for callback | ||||
|                             Thread.interrupted(); | ||||
|                             callback.onNonceCalculated(initialHash, nonce); | ||||
|                         } | ||||
|                     } | ||||
|                     return; | ||||
|                     return nonce; | ||||
|                 } | ||||
|             } while (!Thread.interrupted()); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public static class CallbackWrapper implements Callback { | ||||
|         private final Callback callback; | ||||
|         private final long startTime; | ||||
|         private boolean waiting = true; | ||||
|  | ||||
|         public CallbackWrapper(Callback callback) { | ||||
|             this.startTime = System.currentTimeMillis(); | ||||
|             this.callback = callback; | ||||
|         } | ||||
|  | ||||
|         @Override | ||||
|         public void onNonceCalculated(byte[] initialHash, byte[] nonce) { | ||||
|             // Prevents the callback from being called twice if two nonces are found simultaneously | ||||
|             synchronized (this) { | ||||
|                 if (waiting) { | ||||
|                     semaphore.release(); | ||||
|                     LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); | ||||
|                     waiting = false; | ||||
|                     callback.onNonceCalculated(initialHash, nonce); | ||||
|                 } | ||||
|             } | ||||
|             return null; | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -0,0 +1,65 @@ | ||||
| /* | ||||
|  * Copyright 2016 Christian Basler | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
|  | ||||
| package ch.dissem.bitmessage.utils; | ||||
|  | ||||
| import java.util.concurrent.ThreadFactory; | ||||
| import java.util.concurrent.atomic.AtomicInteger; | ||||
|  | ||||
| public class ThreadFactoryBuilder { | ||||
|     private final String namePrefix; | ||||
|     private int prio = Thread.NORM_PRIORITY; | ||||
|     private boolean daemon = false; | ||||
|  | ||||
|     private ThreadFactoryBuilder(String pool) { | ||||
|         this.namePrefix = pool + "-thread-"; | ||||
|     } | ||||
|  | ||||
|  | ||||
|     public static ThreadFactoryBuilder pool(String name) { | ||||
|         return new ThreadFactoryBuilder(name); | ||||
|     } | ||||
|  | ||||
|     public ThreadFactoryBuilder lowPrio() { | ||||
|         prio = Thread.MIN_PRIORITY; | ||||
|         return this; | ||||
|     } | ||||
|  | ||||
|     public ThreadFactoryBuilder daemon() { | ||||
|         daemon = true; | ||||
|         return this; | ||||
|     } | ||||
|  | ||||
|     public ThreadFactory build() { | ||||
|         SecurityManager s = System.getSecurityManager(); | ||||
|         final ThreadGroup group = (s != null) ? s.getThreadGroup() : | ||||
|                 Thread.currentThread().getThreadGroup(); | ||||
|  | ||||
|         return new ThreadFactory() { | ||||
|             private final AtomicInteger threadNumber = new AtomicInteger(1); | ||||
|  | ||||
|             @Override | ||||
|             public Thread newThread(Runnable r) { | ||||
|                 Thread t = new Thread(group, r, | ||||
|                         namePrefix + threadNumber.getAndIncrement(), | ||||
|                         0); | ||||
|                 t.setPriority(prio); | ||||
|                 t.setDaemon(daemon); | ||||
|                 return t; | ||||
|             } | ||||
|         }; | ||||
|     } | ||||
| } | ||||
| @@ -28,8 +28,7 @@ import ch.dissem.bitmessage.factory.Factory; | ||||
| import ch.dissem.bitmessage.ports.NetworkHandler; | ||||
| import ch.dissem.bitmessage.utils.Collections; | ||||
| import ch.dissem.bitmessage.utils.Property; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import ch.dissem.bitmessage.utils.ThreadFactoryBuilder; | ||||
|  | ||||
| import java.io.IOException; | ||||
| import java.net.InetAddress; | ||||
| @@ -40,35 +39,27 @@ import java.util.concurrent.*; | ||||
| import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; | ||||
| import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE; | ||||
| import static ch.dissem.bitmessage.utils.DebugUtils.inc; | ||||
| import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | ||||
| import static java.util.Collections.newSetFromMap; | ||||
|  | ||||
| /** | ||||
|  * Handles all the networky stuff. | ||||
|  */ | ||||
| public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { | ||||
|     private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class); | ||||
|  | ||||
|     public final static int NETWORK_MAGIC_NUMBER = 8; | ||||
|  | ||||
|     final Collection<Connection> connections = new ConcurrentLinkedQueue<>(); | ||||
|     private final ExecutorService pool; | ||||
|     private final ExecutorService pool = Executors.newCachedThreadPool( | ||||
|             pool("network") | ||||
|                     .lowPrio() | ||||
|                     .daemon() | ||||
|                     .build()); | ||||
|     private InternalContext ctx; | ||||
|     private ServerRunnable server; | ||||
|     private volatile boolean running; | ||||
|  | ||||
|     final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000)); | ||||
|  | ||||
|     public DefaultNetworkHandler() { | ||||
|         pool = Executors.newCachedThreadPool(new ThreadFactory() { | ||||
|             @Override | ||||
|             public Thread newThread(Runnable r) { | ||||
|                 Thread thread = Executors.defaultThreadFactory().newThread(r); | ||||
|                 thread.setPriority(Thread.MIN_PRIORITY); | ||||
|                 return thread; | ||||
|             } | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void setContext(InternalContext context) { | ||||
|         this.ctx = context; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user