Compare commits
	
		
			8 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 64ee41aee8 | |||
| d3205336ed | |||
| 7b14081c63 | |||
| e1173d0619 | |||
| f0a5a40edd | |||
| 1bc82cdd7d | |||
| a880a8c10b | |||
| 6a5fe01860 | 
| @@ -198,6 +198,7 @@ public class Plaintext implements Streamable { | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     public void write(ByteBuffer buffer, boolean includeSignature) { |     public void write(ByteBuffer buffer, boolean includeSignature) { | ||||||
|         Encode.varInt(from.getVersion(), buffer); |         Encode.varInt(from.getVersion(), buffer); | ||||||
|         Encode.varInt(from.getStream(), buffer); |         Encode.varInt(from.getStream(), buffer); | ||||||
| @@ -263,6 +264,9 @@ public class Plaintext implements Streamable { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     public void setStatus(Status status) { |     public void setStatus(Status status) { | ||||||
|  |         if (status != Status.RECEIVED && sent == null && status != Status.DRAFT) { | ||||||
|  |             sent = UnixTime.now(); | ||||||
|  |         } | ||||||
|         this.status = status; |         this.status = status; | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -279,6 +283,7 @@ public class Plaintext implements Streamable { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     public void updateNextTry() { |     public void updateNextTry() { | ||||||
|  |         if (to != null) { | ||||||
|             if (nextTry == null) { |             if (nextTry == null) { | ||||||
|                 if (sent != null && to.has(Feature.DOES_ACK)) { |                 if (sent != null && to.has(Feature.DOES_ACK)) { | ||||||
|                     nextTry = UnixTime.now(+ttl); |                     nextTry = UnixTime.now(+ttl); | ||||||
| @@ -289,6 +294,7 @@ public class Plaintext implements Streamable { | |||||||
|                 retries++; |                 retries++; | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     public String getSubject() { |     public String getSubject() { | ||||||
|         Scanner s = new Scanner(new ByteArrayInputStream(message), "UTF-8"); |         Scanner s = new Scanner(new ByteArrayInputStream(message), "UTF-8"); | ||||||
| @@ -323,7 +329,7 @@ public class Plaintext implements Streamable { | |||||||
|             Objects.equals(from, plaintext.from) && |             Objects.equals(from, plaintext.from) && | ||||||
|             Arrays.equals(message, plaintext.message) && |             Arrays.equals(message, plaintext.message) && | ||||||
|             Objects.equals(getAckMessage(), plaintext.getAckMessage()) && |             Objects.equals(getAckMessage(), plaintext.getAckMessage()) && | ||||||
|                 Arrays.equals(to.getRipe(), plaintext.to.getRipe()) && |             Arrays.equals(to == null ? null : to.getRipe(), plaintext.to == null ? null : plaintext.to.getRipe()) && | ||||||
|             Arrays.equals(signature, plaintext.signature) && |             Arrays.equals(signature, plaintext.signature) && | ||||||
|             Objects.equals(status, plaintext.status) && |             Objects.equals(status, plaintext.status) && | ||||||
|             Objects.equals(sent, plaintext.sent) && |             Objects.equals(sent, plaintext.sent) && | ||||||
|   | |||||||
| @@ -54,7 +54,7 @@ public abstract class AbstractConnection { | |||||||
|     protected final NetworkHandler.MessageListener listener; |     protected final NetworkHandler.MessageListener listener; | ||||||
|     protected final Map<InventoryVector, Long> ivCache; |     protected final Map<InventoryVector, Long> ivCache; | ||||||
|     protected final Deque<MessagePayload> sendingQueue; |     protected final Deque<MessagePayload> sendingQueue; | ||||||
|     protected final Set<InventoryVector> commonRequestedObjects; |     protected final Map<InventoryVector, Long> commonRequestedObjects; | ||||||
|     protected final Set<InventoryVector> requestedObjects; |     protected final Set<InventoryVector> requestedObjects; | ||||||
|  |  | ||||||
|     protected volatile State state; |     protected volatile State state; | ||||||
| @@ -71,7 +71,7 @@ public abstract class AbstractConnection { | |||||||
|  |  | ||||||
|     public AbstractConnection(InternalContext context, Mode mode, |     public AbstractConnection(InternalContext context, Mode mode, | ||||||
|                               NetworkAddress node, |                               NetworkAddress node, | ||||||
|                               Set<InventoryVector> commonRequestedObjects, |                               Map<InventoryVector, Long> commonRequestedObjects, | ||||||
|                               long syncTimeout) { |                               long syncTimeout) { | ||||||
|         this.ctx = context; |         this.ctx = context; | ||||||
|         this.mode = mode; |         this.mode = mode; | ||||||
| @@ -143,7 +143,7 @@ public abstract class AbstractConnection { | |||||||
|         int originalSize = inv.getInventory().size(); |         int originalSize = inv.getInventory().size(); | ||||||
|         updateIvCache(inv.getInventory()); |         updateIvCache(inv.getInventory()); | ||||||
|         List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams); |         List<InventoryVector> missing = ctx.getInventory().getMissing(inv.getInventory(), streams); | ||||||
|         missing.removeAll(commonRequestedObjects); |         missing.removeAll(commonRequestedObjects.keySet()); | ||||||
|         LOG.trace("Received inventory with " + originalSize + " elements, of which are " |         LOG.trace("Received inventory with " + originalSize + " elements, of which are " | ||||||
|             + missing.size() + " missing."); |             + missing.size() + " missing."); | ||||||
|         send(new GetData.Builder().inventory(missing).build()); |         send(new GetData.Builder().inventory(missing).build()); | ||||||
| @@ -175,7 +175,7 @@ public abstract class AbstractConnection { | |||||||
|         } catch (IOException e) { |         } catch (IOException e) { | ||||||
|             LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); |             LOG.error("Stream " + objectMessage.getStream() + ", object type " + objectMessage.getType() + ": " + e.getMessage(), e); | ||||||
|         } finally { |         } finally { | ||||||
|             if (!commonRequestedObjects.remove(objectMessage.getInventoryVector())) { |             if (commonRequestedObjects.remove(objectMessage.getInventoryVector()) == null) { | ||||||
|                 LOG.debug("Received object that wasn't requested."); |                 LOG.debug("Received object that wasn't requested."); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| @@ -205,6 +205,10 @@ public abstract class AbstractConnection { | |||||||
|         return ivCache.containsKey(iv); |         return ivCache.containsKey(iv); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     public boolean requested(InventoryVector iv) { | ||||||
|  |         return requestedObjects.contains(iv); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     private void cleanupIvCache() { |     private void cleanupIvCache() { | ||||||
|         Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); |         Long fiveMinutesAgo = UnixTime.now(-5 * MINUTE); | ||||||
|         for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) { |         for (Map.Entry<InventoryVector, Long> entry : ivCache.entrySet()) { | ||||||
|   | |||||||
| @@ -36,9 +36,9 @@ import java.net.InetAddress; | |||||||
| import java.net.InetSocketAddress; | import java.net.InetSocketAddress; | ||||||
| import java.net.Socket; | import java.net.Socket; | ||||||
| import java.net.SocketTimeoutException; | import java.net.SocketTimeoutException; | ||||||
| import java.util.HashSet; | import java.util.HashMap; | ||||||
|  | import java.util.Map; | ||||||
| import java.util.Objects; | import java.util.Objects; | ||||||
| import java.util.Set; |  | ||||||
|  |  | ||||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; | import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; | ||||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; | import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; | ||||||
| @@ -64,20 +64,20 @@ class Connection extends AbstractConnection { | |||||||
|     private boolean socketInitialized; |     private boolean socketInitialized; | ||||||
|  |  | ||||||
|     public Connection(InternalContext context, Mode mode, Socket socket, |     public Connection(InternalContext context, Mode mode, Socket socket, | ||||||
|                       Set<InventoryVector> requestedObjectsMap) throws IOException { |                       Map<InventoryVector, Long> requestedObjectsMap) throws IOException { | ||||||
|         this(context, mode, socket, requestedObjectsMap, |         this(context, mode, socket, requestedObjectsMap, | ||||||
|             new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), |             new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), | ||||||
|             0); |             0); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     public Connection(InternalContext context, Mode mode, NetworkAddress node, |     public Connection(InternalContext context, Mode mode, NetworkAddress node, | ||||||
|                       Set<InventoryVector> requestedObjectsMap) { |                       Map<InventoryVector, Long> requestedObjectsMap) { | ||||||
|         this(context, mode, new Socket(), requestedObjectsMap, |         this(context, mode, new Socket(), requestedObjectsMap, | ||||||
|             node, 0); |             node, 0); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     private Connection(InternalContext context, Mode mode, Socket socket, |     private Connection(InternalContext context, Mode mode, Socket socket, | ||||||
|                        Set<InventoryVector> commonRequestedObjects, NetworkAddress node, long syncTimeout) { |                        Map<InventoryVector, Long> commonRequestedObjects, NetworkAddress node, long syncTimeout) { | ||||||
|         super(context, mode, node, commonRequestedObjects, syncTimeout); |         super(context, mode, node, commonRequestedObjects, syncTimeout); | ||||||
|         this.startTime = UnixTime.now(); |         this.startTime = UnixTime.now(); | ||||||
|         this.socket = socket; |         this.socket = socket; | ||||||
| @@ -86,7 +86,7 @@ class Connection extends AbstractConnection { | |||||||
|     public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, |     public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, | ||||||
|                                   long timeoutInSeconds) throws IOException { |                                   long timeoutInSeconds) throws IOException { | ||||||
|         return new Connection(ctx, SYNC, new Socket(address, port), |         return new Connection(ctx, SYNC, new Socket(address, port), | ||||||
|                 new HashSet<InventoryVector>(), |             new HashMap<InventoryVector, Long>(), | ||||||
|             new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), |             new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), | ||||||
|             timeoutInSeconds); |             timeoutInSeconds); | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -18,7 +18,6 @@ package ch.dissem.bitmessage.networking; | |||||||
|  |  | ||||||
| import ch.dissem.bitmessage.InternalContext; | import ch.dissem.bitmessage.InternalContext; | ||||||
| import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; | import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; | ||||||
| import ch.dissem.bitmessage.ports.NetworkHandler; |  | ||||||
| import ch.dissem.bitmessage.utils.UnixTime; | import ch.dissem.bitmessage.utils.UnixTime; | ||||||
| import org.slf4j.Logger; | import org.slf4j.Logger; | ||||||
| import org.slf4j.LoggerFactory; | import org.slf4j.LoggerFactory; | ||||||
| @@ -32,12 +31,13 @@ import static ch.dissem.bitmessage.networking.DefaultNetworkHandler.NETWORK_MAGI | |||||||
| /** | /** | ||||||
|  * @author Christian Basler |  * @author Christian Basler | ||||||
|  */ |  */ | ||||||
|  | @Deprecated | ||||||
|  | @SuppressWarnings("deprecation") | ||||||
| public class ConnectionOrganizer implements Runnable { | public class ConnectionOrganizer implements Runnable { | ||||||
|     private static final Logger LOG = LoggerFactory.getLogger(ConnectionOrganizer.class); |     private static final Logger LOG = LoggerFactory.getLogger(ConnectionOrganizer.class); | ||||||
|  |  | ||||||
|     private final InternalContext ctx; |     private final InternalContext ctx; | ||||||
|     private final DefaultNetworkHandler networkHandler; |     private final DefaultNetworkHandler networkHandler; | ||||||
|     private final NetworkHandler.MessageListener listener; |  | ||||||
|  |  | ||||||
|     private Connection initialConnection; |     private Connection initialConnection; | ||||||
|  |  | ||||||
| @@ -45,7 +45,6 @@ public class ConnectionOrganizer implements Runnable { | |||||||
|                                DefaultNetworkHandler networkHandler) { |                                DefaultNetworkHandler networkHandler) { | ||||||
|         this.ctx = ctx; |         this.ctx = ctx; | ||||||
|         this.networkHandler = networkHandler; |         this.networkHandler = networkHandler; | ||||||
|         this.listener = ctx.getNetworkListener(); |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     @Override |     @Override | ||||||
|   | |||||||
| @@ -39,7 +39,6 @@ import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; | |||||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; | import static ch.dissem.bitmessage.networking.AbstractConnection.State.ACTIVE; | ||||||
| import static ch.dissem.bitmessage.utils.DebugUtils.inc; | import static ch.dissem.bitmessage.utils.DebugUtils.inc; | ||||||
| import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | ||||||
| import static java.util.Collections.newSetFromMap; |  | ||||||
|  |  | ||||||
| /** | /** | ||||||
|  * Handles all the networky stuff. |  * Handles all the networky stuff. | ||||||
| @@ -59,7 +58,7 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { | |||||||
|     private ServerRunnable server; |     private ServerRunnable server; | ||||||
|     private volatile boolean running; |     private volatile boolean running; | ||||||
|  |  | ||||||
|     final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(50_000)); |     final Map<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(50_000); | ||||||
|  |  | ||||||
|     @Override |     @Override | ||||||
|     public void setContext(InternalContext context) { |     public void setContext(InternalContext context) { | ||||||
|   | |||||||
| @@ -31,6 +31,7 @@ import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SERVER; | |||||||
| /** | /** | ||||||
|  * @author Christian Basler |  * @author Christian Basler | ||||||
|  */ |  */ | ||||||
|  | @Deprecated | ||||||
| public class ServerRunnable implements Runnable, Closeable { | public class ServerRunnable implements Runnable, Closeable { | ||||||
|     private static final Logger LOG = LoggerFactory.getLogger(ServerRunnable.class); |     private static final Logger LOG = LoggerFactory.getLogger(ServerRunnable.class); | ||||||
|     private final InternalContext ctx; |     private final InternalContext ctx; | ||||||
|   | |||||||
| @@ -26,11 +26,10 @@ import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; | |||||||
| import ch.dissem.bitmessage.exception.NodeException; | import ch.dissem.bitmessage.exception.NodeException; | ||||||
| import ch.dissem.bitmessage.factory.V3MessageReader; | import ch.dissem.bitmessage.factory.V3MessageReader; | ||||||
| import ch.dissem.bitmessage.networking.AbstractConnection; | import ch.dissem.bitmessage.networking.AbstractConnection; | ||||||
|  | import ch.dissem.bitmessage.utils.UnixTime; | ||||||
|  |  | ||||||
| import java.nio.ByteBuffer; | import java.nio.ByteBuffer; | ||||||
| import java.util.Iterator; | import java.util.*; | ||||||
| import java.util.Queue; |  | ||||||
| import java.util.Set; |  | ||||||
|  |  | ||||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; | import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.CLIENT; | ||||||
| import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; | import static ch.dissem.bitmessage.networking.AbstractConnection.Mode.SYNC; | ||||||
| @@ -46,7 +45,7 @@ public class ConnectionInfo extends AbstractConnection { | |||||||
|     private long lastUpdate = System.currentTimeMillis(); |     private long lastUpdate = System.currentTimeMillis(); | ||||||
|  |  | ||||||
|     public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, |     public ConnectionInfo(InternalContext context, Mode mode, NetworkAddress node, | ||||||
|                           Set<InventoryVector> commonRequestedObjects, long syncTimeout) { |                           Map<InventoryVector, Long> commonRequestedObjects, long syncTimeout) { | ||||||
|         super(context, mode, node, commonRequestedObjects, syncTimeout); |         super(context, mode, node, commonRequestedObjects, syncTimeout); | ||||||
|         headerOut.flip(); |         headerOut.flip(); | ||||||
|         if (mode == CLIENT || mode == SYNC) { |         if (mode == CLIENT || mode == SYNC) { | ||||||
| @@ -147,8 +146,12 @@ public class ConnectionInfo extends AbstractConnection { | |||||||
|     protected void send(MessagePayload payload) { |     protected void send(MessagePayload payload) { | ||||||
|         sendingQueue.add(payload); |         sendingQueue.add(payload); | ||||||
|         if (payload instanceof GetData) { |         if (payload instanceof GetData) { | ||||||
|             requestedObjects.addAll(((GetData) payload).getInventory()); |             Long now = UnixTime.now(); | ||||||
|             commonRequestedObjects.addAll(((GetData) payload).getInventory()); |             List<InventoryVector> inventory = ((GetData) payload).getInventory(); | ||||||
|  |             requestedObjects.addAll(inventory); | ||||||
|  |             for (InventoryVector iv : inventory) { | ||||||
|  |                 commonRequestedObjects.put(iv, now); | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -46,14 +46,14 @@ import static ch.dissem.bitmessage.utils.Collections.selectRandom; | |||||||
| import static ch.dissem.bitmessage.utils.DebugUtils.inc; | import static ch.dissem.bitmessage.utils.DebugUtils.inc; | ||||||
| import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | import static ch.dissem.bitmessage.utils.ThreadFactoryBuilder.pool; | ||||||
| import static java.nio.channels.SelectionKey.*; | import static java.nio.channels.SelectionKey.*; | ||||||
| import static java.util.Collections.newSetFromMap; |  | ||||||
|  |  | ||||||
| /** | /** | ||||||
|  * Network handler using java.nio, resulting in less threads. |  * Network handler using java.nio, resulting in less threads. | ||||||
|  */ |  */ | ||||||
| public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder { | public class NioNetworkHandler implements NetworkHandler, InternalContext.ContextHolder { | ||||||
|     private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class); |     private static final Logger LOG = LoggerFactory.getLogger(NioNetworkHandler.class); | ||||||
|     private static final long REQUESTED_OBJECTS_MAX_TIME = 30 * 60_000; // 30 minutes |     private static final long REQUESTED_OBJECTS_MAX_TIME = 2 * 60_000; // 2 minutes | ||||||
|  |     private static final Long DELAYED = Long.MIN_VALUE; | ||||||
|  |  | ||||||
|     private final ExecutorService threadPool = Executors.newCachedThreadPool( |     private final ExecutorService threadPool = Executors.newCachedThreadPool( | ||||||
|         pool("network") |         pool("network") | ||||||
| @@ -66,8 +66,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | |||||||
|     private ServerSocketChannel serverChannel; |     private ServerSocketChannel serverChannel; | ||||||
|     private Queue<NetworkAddress> connectionQueue = new ConcurrentLinkedQueue<>(); |     private Queue<NetworkAddress> connectionQueue = new ConcurrentLinkedQueue<>(); | ||||||
|     private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>(); |     private Map<ConnectionInfo, SelectionKey> connections = new ConcurrentHashMap<>(); | ||||||
|     private final Set<InventoryVector> requestedObjects = newSetFromMap(new ConcurrentHashMap<InventoryVector, Boolean>(10_000)); |     private final Map<InventoryVector, Long> requestedObjects = new ConcurrentHashMap<>(10_000); | ||||||
|     private long requestedObjectsTimeout = 0; |  | ||||||
|  |  | ||||||
|     private Thread starter; |     private Thread starter; | ||||||
|  |  | ||||||
| @@ -80,7 +79,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | |||||||
|                     channel.configureBlocking(false); |                     channel.configureBlocking(false); | ||||||
|                     ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, |                     ConnectionInfo connection = new ConnectionInfo(ctx, SYNC, | ||||||
|                         new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), |                         new NetworkAddress.Builder().ip(server).port(port).stream(1).build(), | ||||||
|                         new HashSet<InventoryVector>(), timeoutInSeconds); |                         new HashMap<InventoryVector, Long>(), timeoutInSeconds); | ||||||
|                     while (channel.isConnected() && !connection.isSyncFinished()) { |                     while (channel.isConnected() && !connection.isSyncFinished()) { | ||||||
|                         write(channel, connection); |                         write(channel, connection); | ||||||
|                         read(channel, connection); |                         read(channel, connection); | ||||||
| @@ -147,7 +146,6 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | |||||||
|         } catch (IOException e) { |         } catch (IOException e) { | ||||||
|             throw new ApplicationException(e); |             throw new ApplicationException(e); | ||||||
|         } |         } | ||||||
|         requestedObjectsTimeout = System.currentTimeMillis() + REQUESTED_OBJECTS_MAX_TIME; |  | ||||||
|         requestedObjects.clear(); |         requestedObjects.clear(); | ||||||
|  |  | ||||||
|         starter = thread("connection manager", new Runnable() { |         starter = thread("connection manager", new Runnable() { | ||||||
| @@ -189,15 +187,22 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | |||||||
|                     // The list 'requested objects' helps to prevent downloading an object |                     // The list 'requested objects' helps to prevent downloading an object | ||||||
|                     // twice. From time to time there is an error though, and an object is |                     // twice. From time to time there is an error though, and an object is | ||||||
|                     // never downloaded. To prevent a large list of failed objects and give |                     // never downloaded. To prevent a large list of failed objects and give | ||||||
|                     // them a chance to get downloaded again, let's clear the list from time |                     // them a chance to get downloaded again, we will attempt to download an | ||||||
|                     // to time. The timeout should be such that most of the initial object |                     // object from another node after some time out. | ||||||
|                     // sync should be done by then, but small enough to prevent objects with |                     long timedOut = System.currentTimeMillis() - REQUESTED_OBJECTS_MAX_TIME; | ||||||
|                     // a normal time out from not being downloaded at all. |                     List<InventoryVector> delayed = new LinkedList<>(); | ||||||
|                     long now = System.currentTimeMillis(); |                     Iterator<Map.Entry<InventoryVector, Long>> iterator = requestedObjects.entrySet().iterator(); | ||||||
|                     if (now > requestedObjectsTimeout) { |                     while (iterator.hasNext()) { | ||||||
|                         requestedObjectsTimeout = now + REQUESTED_OBJECTS_MAX_TIME; |                         Map.Entry<InventoryVector, Long> e = iterator.next(); | ||||||
|                         requestedObjects.clear(); |                         //noinspection NumberEquality | ||||||
|  |                         if (e.getValue() == DELAYED) { | ||||||
|  |                             iterator.remove(); | ||||||
|  |                         } else if (e.getValue() < timedOut) { | ||||||
|  |                             delayed.add(e.getKey()); | ||||||
|  |                             e.setValue(DELAYED); | ||||||
|                         } |                         } | ||||||
|  |                     } | ||||||
|  |                     request(delayed); | ||||||
|  |  | ||||||
|                     try { |                     try { | ||||||
|                         Thread.sleep(30_000); |                         Thread.sleep(30_000); | ||||||
| @@ -422,7 +427,7 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | |||||||
|                         break; |                         break; | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|                 if (connection.knowsOf(next)) { |                 if (connection.knowsOf(next) && !connection.requested(next)) { | ||||||
|                     List<InventoryVector> ivs = distribution.get(connection); |                     List<InventoryVector> ivs = distribution.get(connection); | ||||||
|                     if (ivs.size() == GetData.MAX_INVENTORY_SIZE) { |                     if (ivs.size() == GetData.MAX_INVENTORY_SIZE) { | ||||||
|                         connection.send(new GetData.Builder().inventory(ivs).build()); |                         connection.send(new GetData.Builder().inventory(ivs).build()); | ||||||
| @@ -442,7 +447,9 @@ public class NioNetworkHandler implements NetworkHandler, InternalContext.Contex | |||||||
|         } while (iterator.hasNext()); |         } while (iterator.hasNext()); | ||||||
|  |  | ||||||
|         // remove objects nobody knows of |         // remove objects nobody knows of | ||||||
|         requestedObjects.removeAll(inventoryVectors); |         for (InventoryVector iv : inventoryVectors) { | ||||||
|  |             requestedObjects.remove(iv); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         for (ConnectionInfo connection : distribution.keySet()) { |         for (ConnectionInfo connection : distribution.keySet()) { | ||||||
|             List<InventoryVector> ivs = distribution.get(connection); |             List<InventoryVector> ivs = distribution.get(connection); | ||||||
|   | |||||||
| @@ -76,6 +76,7 @@ public class NetworkHandlerTest { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     @Parameterized.Parameters |     @Parameterized.Parameters | ||||||
|  |     @SuppressWarnings("deprecation") | ||||||
|     public static List<Object[]> parameters() { |     public static List<Object[]> parameters() { | ||||||
|         return Arrays.asList(new Object[][]{ |         return Arrays.asList(new Object[][]{ | ||||||
|             {new DefaultNetworkHandler(), new DefaultNetworkHandler()}, |             {new DefaultNetworkHandler(), new DefaultNetworkHandler()}, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user