Merge branch 'develop' into feature/ACK
# Conflicts: # core/src/main/java/ch/dissem/bitmessage/BitmessageContext.java
This commit is contained in:
		| @@ -64,12 +64,9 @@ public class BitmessageContext { | |||||||
|     public static final int CURRENT_VERSION = 3; |     public static final int CURRENT_VERSION = 3; | ||||||
|     private final static Logger LOG = LoggerFactory.getLogger(BitmessageContext.class); |     private final static Logger LOG = LoggerFactory.getLogger(BitmessageContext.class); | ||||||
|  |  | ||||||
|     private final ExecutorService pool; |  | ||||||
|  |  | ||||||
|     private final InternalContext ctx; |     private final InternalContext ctx; | ||||||
|  |  | ||||||
|     private final Labeler labeler; |     private final Labeler labeler; | ||||||
|     private final Listener listener; |  | ||||||
|     private final NetworkHandler.MessageListener networkListener; |     private final NetworkHandler.MessageListener networkListener; | ||||||
|  |  | ||||||
|     private final boolean sendPubkeyOnIdentityCreation; |     private final boolean sendPubkeyOnIdentityCreation; | ||||||
| @@ -77,12 +74,7 @@ public class BitmessageContext { | |||||||
|     private BitmessageContext(Builder builder) { |     private BitmessageContext(Builder builder) { | ||||||
|         ctx = new InternalContext(builder); |         ctx = new InternalContext(builder); | ||||||
|         labeler = builder.labeler; |         labeler = builder.labeler; | ||||||
|         listener = builder.listener; |         networkListener = new DefaultMessageListener(ctx, labeler, builder.listener); | ||||||
|         networkListener = new DefaultMessageListener(ctx, labeler, listener); |  | ||||||
|  |  | ||||||
|         // As this thread is used for parts that do POW, which itself uses parallel threads, only |  | ||||||
|         // one should be executed at any time. |  | ||||||
|         pool = Executors.newFixedThreadPool(1); |  | ||||||
|  |  | ||||||
|         sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation; |         sendPubkeyOnIdentityCreation = builder.sendPubkeyOnIdentityCreation; | ||||||
|  |  | ||||||
| @@ -116,12 +108,7 @@ public class BitmessageContext { | |||||||
|         )); |         )); | ||||||
|         ctx.getAddressRepository().save(identity); |         ctx.getAddressRepository().save(identity); | ||||||
|         if (sendPubkeyOnIdentityCreation) { |         if (sendPubkeyOnIdentityCreation) { | ||||||
|             pool.submit(new Runnable() { |             ctx.sendPubkey(identity, identity.getStream()); | ||||||
|                 @Override |  | ||||||
|                 public void run() { |  | ||||||
|                     ctx.sendPubkey(identity, identity.getStream()); |  | ||||||
|                 } |  | ||||||
|             }); |  | ||||||
|         } |         } | ||||||
|         return identity; |         return identity; | ||||||
|     } |     } | ||||||
| @@ -177,35 +164,33 @@ public class BitmessageContext { | |||||||
|         if (msg.getFrom() == null || msg.getFrom().getPrivateKey() == null) { |         if (msg.getFrom() == null || msg.getFrom().getPrivateKey() == null) { | ||||||
|             throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key."); |             throw new IllegalArgumentException("'From' must be an identity, i.e. have a private key."); | ||||||
|         } |         } | ||||||
|         pool.submit(new Runnable() { |         BitmessageAddress to = msg.getTo(); | ||||||
|             @Override |         if (to != null) { | ||||||
|             public void run() { |             if (to.getPubkey() == null) { | ||||||
|                 BitmessageAddress to = msg.getTo(); |                 LOG.info("Public key is missing from recipient. Requesting."); | ||||||
|                 if (to != null) { |                 ctx.requestPubkey(to); | ||||||
|                     if (to.getPubkey() == null) { |  | ||||||
|                         LOG.info("Public key is missing from recipient. Requesting."); |  | ||||||
|                         ctx.requestPubkey(to); |  | ||||||
|                     } |  | ||||||
|                     if (to.getPubkey() == null) { |  | ||||||
|                         msg.setStatus(PUBKEY_REQUESTED); |  | ||||||
|                         msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); |  | ||||||
|                         ctx.getMessageRepository().save(msg); |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|                 if (to == null || to.getPubkey() != null) { |  | ||||||
|                     LOG.info("Sending message."); |  | ||||||
|                     msg.setStatus(DOING_PROOF_OF_WORK); |  | ||||||
|                     msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); |  | ||||||
|                     ctx.getMessageRepository().save(msg); |  | ||||||
|                     ctx.send( |  | ||||||
|                             msg.getFrom(), |  | ||||||
|                             to, |  | ||||||
|                             wrapInObjectPayload(msg), |  | ||||||
|                             TTL.msg() |  | ||||||
|                     ); |  | ||||||
|                 } |  | ||||||
|             } |             } | ||||||
|         }); |             if (to.getPubkey() == null) { | ||||||
|  |                 msg.setStatus(PUBKEY_REQUESTED); | ||||||
|  |                 msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); | ||||||
|  |                 ctx.getMessageRepository().save(msg); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         if (to == null || to.getPubkey() != null) { | ||||||
|  |             LOG.info("Sending message."); | ||||||
|  |             msg.setStatus(DOING_PROOF_OF_WORK); | ||||||
|  |             msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.OUTBOX)); | ||||||
|  |             ctx.getMessageRepository().save(msg); | ||||||
|  |             ctx.send( | ||||||
|  |                     msg.getFrom(), | ||||||
|  |                     to, | ||||||
|  |                     wrapInObjectPayload(msg), | ||||||
|  |                     TTL.msg() | ||||||
|  |             ); | ||||||
|  |             msg.setStatus(SENT); | ||||||
|  |             msg.addLabels(ctx.getMessageRepository().getLabels(Label.Type.SENT)); | ||||||
|  |             ctx.getMessageRepository().save(msg); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     private ObjectPayload wrapInObjectPayload(Plaintext msg) { |     private ObjectPayload wrapInObjectPayload(Plaintext msg) { | ||||||
| @@ -425,6 +410,8 @@ public class BitmessageContext { | |||||||
|          * sender can't receive your public key) in some special situations. Also note that it's probably |          * sender can't receive your public key) in some special situations. Also note that it's probably | ||||||
|          * not a good idea to set it too low. |          * not a good idea to set it too low. | ||||||
|          * </p> |          * </p> | ||||||
|  |          * | ||||||
|  |          * @deprecated use {@link TTL#pubkey(long)} instead. | ||||||
|          */ |          */ | ||||||
|         public Builder pubkeyTTL(long days) { |         public Builder pubkeyTTL(long days) { | ||||||
|             if (days < 0 || days > 28 * DAY) throw new IllegalArgumentException("TTL must be between 1 and 28 days"); |             if (days < 0 || days > 28 * DAY) throw new IllegalArgumentException("TTL must be between 1 and 28 days"); | ||||||
|   | |||||||
| @@ -25,16 +25,18 @@ import java.security.MessageDigest; | |||||||
| import java.security.NoSuchAlgorithmException; | import java.security.NoSuchAlgorithmException; | ||||||
| import java.util.ArrayList; | import java.util.ArrayList; | ||||||
| import java.util.List; | import java.util.List; | ||||||
| import java.util.concurrent.Semaphore; | import java.util.concurrent.*; | ||||||
|  |  | ||||||
| import static ch.dissem.bitmessage.utils.Bytes.inc; | import static ch.dissem.bitmessage.utils.Bytes.inc; | ||||||
|  | import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | ||||||
|  |  | ||||||
| /** | /** | ||||||
|  * A POW engine using all available CPU cores. |  * A POW engine using all available CPU cores. | ||||||
|  */ |  */ | ||||||
| public class MultiThreadedPOWEngine implements ProofOfWorkEngine { | public class MultiThreadedPOWEngine implements ProofOfWorkEngine { | ||||||
|     private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedPOWEngine.class); |     private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedPOWEngine.class); | ||||||
|     private static final Semaphore semaphore = new Semaphore(1, true); |     private final ExecutorService waiterPool = Executors.newSingleThreadExecutor(pool("POW-waiter").daemon().build()); | ||||||
|  |     private final ExecutorService workerPool = Executors.newCachedThreadPool(pool("POW-worker").daemon().build()); | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * This method will block until all pending nonce calculations are done, but not wait for its own calculation |      * This method will block until all pending nonce calculations are done, but not wait for its own calculation | ||||||
| @@ -46,42 +48,59 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine { | |||||||
|      * @param callback    called with the calculated nonce as argument. The ProofOfWorkEngine implementation must make |      * @param callback    called with the calculated nonce as argument. The ProofOfWorkEngine implementation must make | ||||||
|      */ |      */ | ||||||
|     @Override |     @Override | ||||||
|     public void calculateNonce(byte[] initialHash, byte[] target, Callback callback) { |     public void calculateNonce(final byte[] initialHash, final byte[] target, final Callback callback) { | ||||||
|         try { |         waiterPool.execute(new Runnable() { | ||||||
|             semaphore.acquire(); |             @Override | ||||||
|         } catch (InterruptedException e) { |             public void run() { | ||||||
|             throw new ApplicationException(e); |                 long startTime = System.currentTimeMillis(); | ||||||
|         } |  | ||||||
|         callback = new CallbackWrapper(callback); |                 int cores = Runtime.getRuntime().availableProcessors(); | ||||||
|         int cores = Runtime.getRuntime().availableProcessors(); |                 if (cores > 255) cores = 255; | ||||||
|         if (cores > 255) cores = 255; |                 LOG.info("Doing POW using " + cores + " cores"); | ||||||
|         LOG.info("Doing POW using " + cores + " cores"); |                 List<Worker> workers = new ArrayList<>(cores); | ||||||
|         List<Worker> workers = new ArrayList<>(cores); |                 for (int i = 0; i < cores; i++) { | ||||||
|         for (int i = 0; i < cores; i++) { |                     Worker w = new Worker((byte) cores, i, initialHash, target); | ||||||
|             Worker w = new Worker(workers, (byte) cores, i, initialHash, target, callback); |                     workers.add(w); | ||||||
|             workers.add(w); |                 } | ||||||
|         } |                 List<Future<byte[]>> futures = new ArrayList<>(cores); | ||||||
|         for (Worker w : workers) { |                 for (Worker w : workers) { | ||||||
|             // Doing this in the previous loop might cause a ConcurrentModificationException in the worker |                     // Doing this in the previous loop might cause a ConcurrentModificationException in the worker | ||||||
|             // if a worker finds a nonce while new ones are still being added. |                     // if a worker finds a nonce while new ones are still being added. | ||||||
|             w.start(); |                     futures.add(workerPool.submit(w)); | ||||||
|         } |                 } | ||||||
|  |                 try { | ||||||
|  |                     while (!Thread.interrupted()) { | ||||||
|  |                         for (Future<byte[]> future : futures) { | ||||||
|  |                             if (future.isDone()) { | ||||||
|  |                                 callback.onNonceCalculated(initialHash, future.get()); | ||||||
|  |                                 LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); | ||||||
|  |                                 for (Future<byte[]> f : futures) { | ||||||
|  |                                     f.cancel(true); | ||||||
|  |                                 } | ||||||
|  |                                 return; | ||||||
|  |                             } | ||||||
|  |                         } | ||||||
|  |                         Thread.sleep(100); | ||||||
|  |                     } | ||||||
|  |                     LOG.error("POW waiter thread interrupted - this should not happen!"); | ||||||
|  |                 } catch (ExecutionException e) { | ||||||
|  |                     LOG.error(e.getMessage(), e); | ||||||
|  |                 } catch (InterruptedException e) { | ||||||
|  |                     LOG.error("POW waiter thread interrupted - this should not happen!", e); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     private static class Worker extends Thread { |     private class Worker implements Callable<byte[]> { | ||||||
|         private final Callback callback; |  | ||||||
|         private final byte numberOfCores; |         private final byte numberOfCores; | ||||||
|         private final List<Worker> workers; |  | ||||||
|         private final byte[] initialHash; |         private final byte[] initialHash; | ||||||
|         private final byte[] target; |         private final byte[] target; | ||||||
|         private final MessageDigest mda; |         private final MessageDigest mda; | ||||||
|         private final byte[] nonce = new byte[8]; |         private final byte[] nonce = new byte[8]; | ||||||
|  |  | ||||||
|         public Worker(List<Worker> workers, byte numberOfCores, int core, byte[] initialHash, byte[] target, |         Worker(byte numberOfCores, int core, byte[] initialHash, byte[] target) { | ||||||
|                       Callback callback) { |  | ||||||
|             this.callback = callback; |  | ||||||
|             this.numberOfCores = numberOfCores; |             this.numberOfCores = numberOfCores; | ||||||
|             this.workers = workers; |  | ||||||
|             this.initialHash = initialHash; |             this.initialHash = initialHash; | ||||||
|             this.target = target; |             this.target = target; | ||||||
|             this.nonce[7] = (byte) core; |             this.nonce[7] = (byte) core; | ||||||
| @@ -94,49 +113,16 @@ public class MultiThreadedPOWEngine implements ProofOfWorkEngine { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         @Override |         @Override | ||||||
|         public void run() { |         public byte[] call() throws Exception { | ||||||
|             do { |             do { | ||||||
|                 inc(nonce, numberOfCores); |                 inc(nonce, numberOfCores); | ||||||
|                 mda.update(nonce); |                 mda.update(nonce); | ||||||
|                 mda.update(initialHash); |                 mda.update(initialHash); | ||||||
|                 if (!Bytes.lt(target, mda.digest(mda.digest()), 8)) { |                 if (!Bytes.lt(target, mda.digest(mda.digest()), 8)) { | ||||||
|                     synchronized (callback) { |                     return nonce; | ||||||
|                         if (!Thread.interrupted()) { |  | ||||||
|                             for (Worker w : workers) { |  | ||||||
|                                 w.interrupt(); |  | ||||||
|                             } |  | ||||||
|                             // Clear interrupted flag for callback |  | ||||||
|                             Thread.interrupted(); |  | ||||||
|                             callback.onNonceCalculated(initialHash, nonce); |  | ||||||
|                         } |  | ||||||
|                     } |  | ||||||
|                     return; |  | ||||||
|                 } |                 } | ||||||
|             } while (!Thread.interrupted()); |             } while (!Thread.interrupted()); | ||||||
|         } |             return null; | ||||||
|     } |  | ||||||
|  |  | ||||||
|     public static class CallbackWrapper implements Callback { |  | ||||||
|         private final Callback callback; |  | ||||||
|         private final long startTime; |  | ||||||
|         private boolean waiting = true; |  | ||||||
|  |  | ||||||
|         public CallbackWrapper(Callback callback) { |  | ||||||
|             this.startTime = System.currentTimeMillis(); |  | ||||||
|             this.callback = callback; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         @Override |  | ||||||
|         public void onNonceCalculated(byte[] initialHash, byte[] nonce) { |  | ||||||
|             // Prevents the callback from being called twice if two nonces are found simultaneously |  | ||||||
|             synchronized (this) { |  | ||||||
|                 if (waiting) { |  | ||||||
|                     semaphore.release(); |  | ||||||
|                     LOG.info("Nonce calculated in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); |  | ||||||
|                     waiting = false; |  | ||||||
|                     callback.onNonceCalculated(initialHash, nonce); |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -0,0 +1,65 @@ | |||||||
|  | /* | ||||||
|  |  * Copyright 2016 Christian Basler | ||||||
|  |  * | ||||||
|  |  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  |  * you may not use this file except in compliance with the License. | ||||||
|  |  * You may obtain a copy of the License at | ||||||
|  |  * | ||||||
|  |  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  * | ||||||
|  |  * Unless required by applicable law or agreed to in writing, software | ||||||
|  |  * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  |  * See the License for the specific language governing permissions and | ||||||
|  |  * limitations under the License. | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | package ch.dissem.bitmessage.utils; | ||||||
|  |  | ||||||
|  | import java.util.concurrent.ThreadFactory; | ||||||
|  | import java.util.concurrent.atomic.AtomicInteger; | ||||||
|  |  | ||||||
|  | public class ThreadFactoryBuilder { | ||||||
|  |     private final String namePrefix; | ||||||
|  |     private int prio = Thread.NORM_PRIORITY; | ||||||
|  |     private boolean daemon = false; | ||||||
|  |  | ||||||
|  |     private ThreadFactoryBuilder(String pool) { | ||||||
|  |         this.namePrefix = pool + "-thread-"; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     public static ThreadFactoryBuilder pool(String name) { | ||||||
|  |         return new ThreadFactoryBuilder(name); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     public ThreadFactoryBuilder lowPrio() { | ||||||
|  |         prio = Thread.MIN_PRIORITY; | ||||||
|  |         return this; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     public ThreadFactoryBuilder daemon() { | ||||||
|  |         daemon = true; | ||||||
|  |         return this; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     public ThreadFactory build() { | ||||||
|  |         SecurityManager s = System.getSecurityManager(); | ||||||
|  |         final ThreadGroup group = (s != null) ? s.getThreadGroup() : | ||||||
|  |                 Thread.currentThread().getThreadGroup(); | ||||||
|  |  | ||||||
|  |         return new ThreadFactory() { | ||||||
|  |             private final AtomicInteger threadNumber = new AtomicInteger(1); | ||||||
|  |  | ||||||
|  |             @Override | ||||||
|  |             public Thread newThread(Runnable r) { | ||||||
|  |                 Thread t = new Thread(group, r, | ||||||
|  |                         namePrefix + threadNumber.getAndIncrement(), | ||||||
|  |                         0); | ||||||
|  |                 t.setPriority(prio); | ||||||
|  |                 t.setDaemon(daemon); | ||||||
|  |                 return t; | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |     } | ||||||
|  | } | ||||||
| @@ -28,8 +28,7 @@ import ch.dissem.bitmessage.factory.Factory; | |||||||
| import ch.dissem.bitmessage.ports.NetworkHandler; | import ch.dissem.bitmessage.ports.NetworkHandler; | ||||||
| import ch.dissem.bitmessage.utils.Collections; | import ch.dissem.bitmessage.utils.Collections; | ||||||
| import ch.dissem.bitmessage.utils.Property; | import ch.dissem.bitmessage.utils.Property; | ||||||
| import org.slf4j.Logger; | import ch.dissem.bitmessage.utils.ThreadFactoryBuilder; | ||||||
| import org.slf4j.LoggerFactory; |  | ||||||
|  |  | ||||||
| import java.io.IOException; | import java.io.IOException; | ||||||
| import java.net.InetAddress; | import java.net.InetAddress; | ||||||
| @@ -40,35 +39,27 @@ import java.util.concurrent.*; | |||||||
| import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; | import static ch.dissem.bitmessage.networking.Connection.Mode.SERVER; | ||||||
| import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE; | import static ch.dissem.bitmessage.networking.Connection.State.ACTIVE; | ||||||
| import static ch.dissem.bitmessage.utils.DebugUtils.inc; | import static ch.dissem.bitmessage.utils.DebugUtils.inc; | ||||||
|  | import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | ||||||
| import static java.util.Collections.newSetFromMap; | import static java.util.Collections.newSetFromMap; | ||||||
|  |  | ||||||
| /** | /** | ||||||
|  * Handles all the networky stuff. |  * Handles all the networky stuff. | ||||||
|  */ |  */ | ||||||
| public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { | public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { | ||||||
|     private final static Logger LOG = LoggerFactory.getLogger(DefaultNetworkHandler.class); |  | ||||||
|  |  | ||||||
|     public final static int NETWORK_MAGIC_NUMBER = 8; |     public final static int NETWORK_MAGIC_NUMBER = 8; | ||||||
|  |  | ||||||
|     final Collection<Connection> connections = new ConcurrentLinkedQueue<>(); |     final Collection<Connection> connections = new ConcurrentLinkedQueue<>(); | ||||||
|     private final ExecutorService pool; |     private final ExecutorService pool = Executors.newCachedThreadPool( | ||||||
|  |             pool("network") | ||||||
|  |                     .lowPrio() | ||||||
|  |                     .daemon() | ||||||
|  |                     .build()); | ||||||
|     private InternalContext ctx; |     private InternalContext ctx; | ||||||
|     private ServerRunnable server; |     private ServerRunnable server; | ||||||
|     private volatile boolean running; |     private volatile boolean running; | ||||||
|  |  | ||||||
|     final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000)); |     final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000)); | ||||||
|  |  | ||||||
|     public DefaultNetworkHandler() { |  | ||||||
|         pool = Executors.newCachedThreadPool(new ThreadFactory() { |  | ||||||
|             @Override |  | ||||||
|             public Thread newThread(Runnable r) { |  | ||||||
|                 Thread thread = Executors.defaultThreadFactory().newThread(r); |  | ||||||
|                 thread.setPriority(Thread.MIN_PRIORITY); |  | ||||||
|                 return thread; |  | ||||||
|             } |  | ||||||
|         }); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     @Override |     @Override | ||||||
|     public void setContext(InternalContext context) { |     public void setContext(InternalContext context) { | ||||||
|         this.ctx = context; |         this.ctx = context; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user