Merge branch 'release/2.0.4'
This commit is contained in:
		| @@ -264,6 +264,9 @@ public class Plaintext implements Streamable { | ||||
|     } | ||||
|  | ||||
|     public void setStatus(Status status) { | ||||
|         if (status != Status.RECEIVED && sent == null && status != Status.DRAFT) { | ||||
|             sent = UnixTime.now(); | ||||
|         } | ||||
|         this.status = status; | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -54,7 +54,7 @@ public abstract class AbstractConnection { | ||||
|     protected final NetworkHandler.MessageListener listener; | ||||
|     protected final Map<InventoryVector, Long> ivCache; | ||||
|     protected final Deque<MessagePayload> sendingQueue; | ||||
|     protected final Set<InventoryVector> commonRequestedObjects; | ||||
|     protected final Map<InventoryVector, Long> commonRequestedObjects; | ||||
|     protected final Set<InventoryVector> requestedObjects; | ||||
|  | ||||
|     protected volatile State state; | ||||
| @@ -71,7 +71,7 @@ public abstract class AbstractConnection { | ||||
|  | ||||
|     public AbstractConnection(InternalContext context, Mode mode, | ||||
|                               NetworkAddress node, | ||||
|                               Set<InventoryVector> commonRequestedObjects, | ||||
|                               Map<InventoryVector, Long> commonRequestedObjects, | ||||
|                               long syncTimeout) { | ||||
|         this.ctx = context; | ||||
|         this.mode = mode; | ||||
| @@ -143,7 +143,7 @@ public abstract class AbstractConnection { | ||||
|         int originalSize = inv.getInventory().size(); | ||||
|         updateIvCache(inv.getInventory()); | ||||
|         List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams); | ||||
|         missing.removeAll(commonRequestedObjects); | ||||
|         missing.removeAll(commonRequestedObjects.keySet()); | ||||
|         LOG.trace("Received inventory with " + originalSize + " elements, of which are " | ||||
|             + missing.size() + " missing."); | ||||
|         send(new GetData.Builder().inventory(missing).build()); | ||||
| @@ -175,7 +175,7 @@ public abstract class AbstractConnection { | ||||
|         } catch (IOException e) { | ||||
|             LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); | ||||
|         } finally { | ||||
|             if (!commonRequestedObjects.remove(objectMessage.getInventoryVector())) { | ||||
|             if (commonRequestedObjects.remove(objectMessage.getInventoryVector()) == null) { | ||||
|                 LOG.debug("Received object that wasn't requested."); | ||||
|             } | ||||
|         } | ||||
| @@ -205,6 +205,10 @@ public abstract class AbstractConnection { | ||||
|         return ivCache.containsKey(iv); | ||||
|     } | ||||
|  | ||||
|     public boolean requested(InventoryVector iv) { | ||||
|         return requestedObjects.contains(iv); | ||||
|     } | ||||
|  | ||||
|     private void cleanupIvCache() { | ||||
|         Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); | ||||
|         for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) { | ||||
|   | ||||
| @@ -36,9 +36,9 @@ import java.net.InetAddress; | ||||
| import java.net.InetSocketAddress; | ||||
| import java.net.Socket; | ||||
| import java.net.SocketTimeoutException; | ||||
| import java.util.HashSet; | ||||
| import java.util.HashMap; | ||||
| import java.util.Map; | ||||
| import java.util.Objects; | ||||
| import java.util.Set; | ||||
|  | ||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; | ||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; | ||||
| @@ -64,20 +64,20 @@ class Connection extends AbstractConnection { | ||||
|     private boolean socketInitialized; | ||||
|  | ||||
|     public Connection(InternalContext context, Mode mode, Socket socket, | ||||
|                       Set<InventoryVector> requestedObjectsMap) throws IOException { | ||||
|                       Map<InventoryVector, Long> requestedObjectsMap) throws IOException { | ||||
|         this(context, mode, socket, requestedObjectsMap, | ||||
|             new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), | ||||
|             0); | ||||
|     } | ||||
|  | ||||
|     public Connection(InternalContext context, Mode mode, NetworkAddress node, | ||||
|                       Set<InventoryVector> requestedObjectsMap) { | ||||
|                       Map<InventoryVector, Long> requestedObjectsMap) { | ||||
|         this(context, mode, new Socket(), requestedObjectsMap, | ||||
|             node, 0); | ||||
|     } | ||||
|  | ||||
|     private Connection(InternalContext context, Mode mode, Socket socket, | ||||
|                        Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) { | ||||
|                        Map<InventoryVector, Long> commonRequestedObjects, NetworkAddress node, long syncTimeout) { | ||||
|         super(context, mode, node, commonRequestedObjects, syncTimeout); | ||||
|         this.startTime = UnixTime.now(); | ||||
|         this.socket = socket; | ||||
| @@ -86,7 +86,7 @@ class Connection extends AbstractConnection { | ||||
|     public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, | ||||
|                                   long timeoutInSeconds) throws IOException { | ||||
|         return new Connection(ctx, SYNC, new Socket(address, port), | ||||
|                 new HashSet<InventoryVector>(), | ||||
|             new HashMap<InventoryVector, Long>(), | ||||
|             new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), | ||||
|             timeoutInSeconds); | ||||
|     } | ||||
|   | ||||
| @@ -18,7 +18,6 @@ package ch.dissem.bitmessage.networking; | ||||
|  | ||||
| import ch.dissem.bitmessage.InternalContext; | ||||
| import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; | ||||
| import ch.dissem.bitmessage.ports.NetworkHandler; | ||||
| import ch.dissem.bitmessage.utils.UnixTime; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| @@ -32,12 +31,13 @@ import static ch.dissem.bitmessage.networking.DefaultNetworkHandler.NETWORK_MAGI | ||||
| /** | ||||
|  * @author Christian Basler | ||||
|  */ | ||||
| @Deprecated | ||||
| @SuppressWarnings("deprecation") | ||||
| public class ConnectionOrganizer implements Runnable { | ||||
|     private static final Logger LOG = LoggerFactory.getLogger(ConnectionOrganizer.class); | ||||
|  | ||||
|     private final InternalContext ctx; | ||||
|     private final DefaultNetworkHandler networkHandler; | ||||
|     private final NetworkHandler.MessageListener listener; | ||||
|  | ||||
|     private Connection initialConnection; | ||||
|  | ||||
| @@ -45,7 +45,6 @@ public class ConnectionOrganizer implements Runnable { | ||||
|                                DefaultNetworkHandler networkHandler) { | ||||
|         this.ctx = ctx; | ||||
|         this.networkHandler = networkHandler; | ||||
|         this.listener = ctx.getNetworkListener(); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|   | ||||
| @@ -39,7 +39,6 @@ import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; | ||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; | ||||
| import static ch.dissem.bitmessage.utils.DebugUtils.inc; | ||||
| import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | ||||
| import static java.util.Collections.newSetFromMap; | ||||
|  | ||||
| /** | ||||
|  * Handles all the networky stuff. | ||||
| @@ -59,7 +58,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { | ||||
|     private ServerRunnable server; | ||||
|     private volatile boolean running; | ||||
|  | ||||
|     final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000)); | ||||
|     final Map<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(50_000); | ||||
|  | ||||
|     @Override | ||||
|     public void setContext(InternalContext context) { | ||||
|   | ||||
| @@ -31,6 +31,7 @@ import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; | ||||
| /** | ||||
|  * @author Christian Basler | ||||
|  */ | ||||
| @Deprecated | ||||
| public class ServerRunnable implements Runnable, Closeable { | ||||
|     private static final Logger LOG = LoggerFactory.getLogger(ServerRunnable.class); | ||||
|     private final InternalContext ctx; | ||||
|   | ||||
| @@ -26,11 +26,10 @@ import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; | ||||
| import ch.dissem.bitmessage.exception.NodeException; | ||||
| import ch.dissem.bitmessage.factory.V3MessageReader; | ||||
| import ch.dissem.bitmessage.networking.AbstractConnection; | ||||
| import ch.dissem.bitmessage.utils.UnixTime; | ||||
|  | ||||
| import java.nio.ByteBuffer; | ||||
| import java.util.Iterator; | ||||
| import java.util.Queue; | ||||
| import java.util.Set; | ||||
| import java.util.*; | ||||
|  | ||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; | ||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; | ||||
| @@ -46,7 +45,7 @@ public class ConnectionInfo extends AbstractConnection { | ||||
|     private long lastUpdate = System.currentTimeMillis(); | ||||
|  | ||||
|     public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, | ||||
|                           Set<InventoryVector> commonRequestedObjects, long syncTimeout) { | ||||
|                           Map<InventoryVector, Long> commonRequestedObjects, long syncTimeout) { | ||||
|         super(context, mode, node, commonRequestedObjects, syncTimeout); | ||||
|         headerOut.flip(); | ||||
|         if (mode == CLIENT || mode == SYNC) { | ||||
| @@ -147,8 +146,12 @@ public class ConnectionInfo extends AbstractConnection { | ||||
|     protected void send(MessagePayload payload) { | ||||
|         sendingQueue.add(payload); | ||||
|         if (payload instanceof GetData) { | ||||
|             requestedObjects.addAll(((GetData) payload).getInventory()); | ||||
|             commonRequestedObjects.addAll(((GetData) payload).getInventory()); | ||||
|             Long now = UnixTime.now(); | ||||
|             List<InventoryVector> inventory = ((GetData) payload).getInventory(); | ||||
|             requestedObjects.addAll(inventory); | ||||
|             for (InventoryVector iv : inventory) { | ||||
|                 commonRequestedObjects.put(iv, now); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -46,14 +46,14 @@ import static ch.dissem.bitmessage.utils.Collections.selectRandom; | ||||
| import static ch.dissem.bitmessage.utils.DebugUtils.inc; | ||||
| import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | ||||
| import static java.nio.channels.SelectionKey.*; | ||||
| import static java.util.Collections.newSetFromMap; | ||||
|  | ||||
| /** | ||||
|  * Network handler using java.nio, resulting in less threads. | ||||
|  */ | ||||
| public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder { | ||||
|     private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class); | ||||
|     private static final long REQUESTED_OBJECTS_MAX_TIME = 30 * 60_000; // 30 minutes | ||||
|     private static final long REQUESTED_OBJECTS_MAX_TIME = 2 * 60_000; // 2 minutes | ||||
|     private static final Long DELAYED = Long.MIN_VALUE; | ||||
|  | ||||
|     private final ExecutorService threadPool = Executors.newCachedThreadPool( | ||||
|         pool("network") | ||||
| @@ -66,8 +66,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | ||||
|     private ServerSocketChannel serverChannel; | ||||
|     private Queue<NetworkAddress> connectionQueue = new ConcurrentLinkedQueue<>(); | ||||
|     private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>(); | ||||
|     private final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)); | ||||
|     private long requestedObjectsTimeout = 0; | ||||
|     private final Map<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(10_000); | ||||
|  | ||||
|     private Thread starter; | ||||
|  | ||||
| @@ -80,7 +79,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | ||||
|                     channel.configureBlocking(false); | ||||
|                     ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, | ||||
|                         new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), | ||||
|                         new HashSet<InventoryVector>(), timeoutInSeconds); | ||||
|                         new HashMap<InventoryVector, Long>(), timeoutInSeconds); | ||||
|                     while (channel.isConnected() && !connection.isSyncFinished()) { | ||||
|                         write(channel, connection); | ||||
|                         read(channel, connection); | ||||
| @@ -147,7 +146,6 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | ||||
|         } catch (IOException e) { | ||||
|             throw new ApplicationException(e); | ||||
|         } | ||||
|         requestedObjectsTimeout = System.currentTimeMillis() + REQUESTED_OBJECTS_MAX_TIME; | ||||
|         requestedObjects.clear(); | ||||
|  | ||||
|         starter = thread("connection manager", new Runnable() { | ||||
| @@ -189,15 +187,22 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | ||||
|                     // The list 'requested objects' helps to prevent downloading an object | ||||
|                     // twice. From time to time there is an error though, and an object is | ||||
|                     // never downloaded. To prevent a large list of failed objects and give | ||||
|                     // them a chance to get downloaded again, let's clear the list from time | ||||
|                     // to time. The timeout should be such that most of the initial object | ||||
|                     // sync should be done by then, but small enough to prevent objects with | ||||
|                     // a normal time out from not being downloaded at all. | ||||
|                     long now = System.currentTimeMillis(); | ||||
|                     if (now > requestedObjectsTimeout) { | ||||
|                         requestedObjectsTimeout = now + REQUESTED_OBJECTS_MAX_TIME; | ||||
|                         requestedObjects.clear(); | ||||
|                     // them a chance to get downloaded again, we will attempt to download an | ||||
|                     // object from another node after some time out. | ||||
|                     long timedOut = System.currentTimeMillis() - REQUESTED_OBJECTS_MAX_TIME; | ||||
|                     List<InventoryVector> delayed = new LinkedList<>(); | ||||
|                     Iterator<Map.Entry<InventoryVector, Long>> iterator = requestedObjects.entrySet().iterator(); | ||||
|                     while (iterator.hasNext()) { | ||||
|                         Map.Entry<InventoryVector, Long> e = iterator.next(); | ||||
|                         //noinspection NumberEquality | ||||
|                         if (e.getValue() == DELAYED) { | ||||
|                             iterator.remove(); | ||||
|                         } else if (e.getValue() < timedOut) { | ||||
|                             delayed.add(e.getKey()); | ||||
|                             e.setValue(DELAYED); | ||||
|                         } | ||||
|                     } | ||||
|                     request(delayed); | ||||
|  | ||||
|                     try { | ||||
|                         Thread.sleep(30_000); | ||||
| @@ -422,7 +427,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|                 if (connection.knowsOf(next)) { | ||||
|                 if (connection.knowsOf(next) && !connection.requested(next)) { | ||||
|                     List<InventoryVector> ivs = distribution.get(connection); | ||||
|                     if (ivs.size() == GetData.MAX_INVENTORY_SIZE) { | ||||
|                         connection.send(new GetData.Builder().inventory(ivs).build()); | ||||
| @@ -442,7 +447,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | ||||
|         } while (iterator.hasNext()); | ||||
|  | ||||
|         // remove objects nobody knows of | ||||
|         requestedObjects.removeAll(inventoryVectors); | ||||
|         for (InventoryVector iv : inventoryVectors) { | ||||
|             requestedObjects.remove(iv); | ||||
|         } | ||||
|  | ||||
|         for (ConnectionInfo connection : distribution.keySet()) { | ||||
|             List<InventoryVector> ivs = distribution.get(connection); | ||||
|   | ||||
| @@ -76,6 +76,7 @@ public class NetworkHandlerTest { | ||||
|     } | ||||
|  | ||||
|     @Parameterized.Parameters | ||||
|     @SuppressWarnings("deprecation") | ||||
|     public static List<Object[]> parameters() { | ||||
|         return Arrays.asList(new Object[][]{ | ||||
|             {new DefaultNetworkHandler(), new DefaultNetworkHandler()}, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user