Added synchronization code and unit test.
Synchronisation fails if the trusted host has no new messages - this needs to be fixed (but shouldn't be an issue for real world applications)
This commit is contained in:
		| @@ -21,13 +21,13 @@ import ch.dissem.bitmessage.entity.ObjectMessage; | ||||
| import ch.dissem.bitmessage.entity.Plaintext; | ||||
| import ch.dissem.bitmessage.entity.payload.*; | ||||
| import ch.dissem.bitmessage.entity.payload.Pubkey.Feature; | ||||
| import ch.dissem.bitmessage.entity.valueobject.InventoryVector; | ||||
| import ch.dissem.bitmessage.entity.valueobject.Label; | ||||
| import ch.dissem.bitmessage.entity.valueobject.PrivateKey; | ||||
| import ch.dissem.bitmessage.exception.DecryptionFailedException; | ||||
| import ch.dissem.bitmessage.factory.Factory; | ||||
| import ch.dissem.bitmessage.ports.*; | ||||
| import ch.dissem.bitmessage.utils.Property; | ||||
| import ch.dissem.bitmessage.utils.UnixTime; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
|  | ||||
| @@ -179,19 +179,6 @@ public class BitmessageContext { | ||||
|         ); | ||||
|     } | ||||
|  | ||||
|     private void send(long stream, ObjectPayload payload, long timeToLive) { | ||||
|         long expires = UnixTime.now(+timeToLive); | ||||
|         LOG.info("Expires at " + expires); | ||||
|         ObjectMessage object = new ObjectMessage.Builder() | ||||
|                 .stream(stream) | ||||
|                 .expiresTime(expires) | ||||
|                 .payload(payload) | ||||
|                 .build(); | ||||
|         ctx.getSecurity().doProofOfWork(object, ctx.getNetworkNonceTrialsPerByte(), ctx.getNetworkExtraBytes()); | ||||
|         ctx.getInventory().storeObject(object); | ||||
|         ctx.getNetworkHandler().offer(object.getInventoryVector()); | ||||
|     } | ||||
|  | ||||
|     public void startup(Listener listener) { | ||||
|         this.listener = listener; | ||||
|         ctx.getNetworkHandler().start(new DefaultMessageListener(ctx, listener)); | ||||
| @@ -280,6 +267,7 @@ public class BitmessageContext { | ||||
|         MessageRepository messageRepo; | ||||
|         ProofOfWorkEngine proofOfWorkEngine; | ||||
|         Security security; | ||||
|         MessageCallback messageCallback; | ||||
|  | ||||
|         public Builder() { | ||||
|         } | ||||
| @@ -319,6 +307,11 @@ public class BitmessageContext { | ||||
|             return this; | ||||
|         } | ||||
|  | ||||
|         public Builder messageCallback(MessageCallback callback) { | ||||
|             this.messageCallback = callback; | ||||
|             return this; | ||||
|         } | ||||
|  | ||||
|         public Builder proofOfWorkEngine(ProofOfWorkEngine proofOfWorkEngine) { | ||||
|             this.proofOfWorkEngine = proofOfWorkEngine; | ||||
|             return this; | ||||
| @@ -333,6 +326,25 @@ public class BitmessageContext { | ||||
|             if (proofOfWorkEngine == null) { | ||||
|                 proofOfWorkEngine = new MultiThreadedPOWEngine(); | ||||
|             } | ||||
|             if (messageCallback == null) { | ||||
|                 messageCallback = new MessageCallback() { | ||||
|                     @Override | ||||
|                     public void proofOfWorkStarted(ObjectPayload message) { | ||||
|                     } | ||||
|  | ||||
|                     @Override | ||||
|                     public void proofOfWorkCompleted(ObjectPayload message) { | ||||
|                     } | ||||
|  | ||||
|                     @Override | ||||
|                     public void messageOffered(ObjectPayload message, InventoryVector iv) { | ||||
|                     } | ||||
|  | ||||
|                     @Override | ||||
|                     public void messageAcknowledged(InventoryVector iv) { | ||||
|                     } | ||||
|                 }; | ||||
|             } | ||||
|             return new BitmessageContext(this); | ||||
|         } | ||||
|  | ||||
|   | ||||
| @@ -49,12 +49,13 @@ public class InternalContext { | ||||
|     private final AddressRepository addressRepository; | ||||
|     private final MessageRepository messageRepository; | ||||
|     private final ProofOfWorkEngine proofOfWorkEngine; | ||||
|     private final MessageCallback messageCallback; | ||||
|  | ||||
|     private final TreeSet<Long> streams = new TreeSet<>(); | ||||
|     private final int port; | ||||
|     private long networkNonceTrialsPerByte = 1000; | ||||
|     private long networkExtraBytes = 1000; | ||||
|     private long clientNonce; | ||||
|     private final long clientNonce; | ||||
|     private final long networkNonceTrialsPerByte = 1000; | ||||
|     private final long networkExtraBytes = 1000; | ||||
|  | ||||
|     public InternalContext(BitmessageContext.Builder builder) { | ||||
|         this.security = builder.security; | ||||
| @@ -65,11 +66,11 @@ public class InternalContext { | ||||
|         this.messageRepository = builder.messageRepo; | ||||
|         this.proofOfWorkEngine = builder.proofOfWorkEngine; | ||||
|         this.clientNonce = security.randomNonce(); | ||||
|         this.messageCallback = builder.messageCallback; | ||||
|         this.port = builder.port; | ||||
|  | ||||
|         Singleton.initialize(security); | ||||
|  | ||||
|         port = builder.port; | ||||
|  | ||||
|         // TODO: streams of new identities and subscriptions should also be added. This works only after a restart. | ||||
|         for (BitmessageAddress address : addressRepository.getIdentities()) { | ||||
|             streams.add(address.getStream()); | ||||
| @@ -81,7 +82,10 @@ public class InternalContext { | ||||
|             streams.add(1L); | ||||
|         } | ||||
|  | ||||
|         init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine, security); | ||||
|         init(inventory, nodeRegistry, networkHandler, addressRepository, messageRepository, proofOfWorkEngine); | ||||
|         for (BitmessageAddress identity : addressRepository.getIdentities()) { | ||||
|             streams.add(identity.getStream()); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private void init(Object... objects) { | ||||
| @@ -169,7 +173,9 @@ public class InternalContext { | ||||
|             } else if (payload instanceof Encrypted) { | ||||
|                 object.encrypt(to.getPubkey()); | ||||
|             } | ||||
|             messageCallback.proofOfWorkStarted(payload); | ||||
|             security.doProofOfWork(object, nonceTrialsPerByte, extraBytes); | ||||
|             messageCallback.proofOfWorkCompleted(payload); | ||||
|             if (payload instanceof PlaintextHolder) { | ||||
|                 Plaintext plaintext = ((PlaintextHolder) payload).getPlaintext(); | ||||
|                 plaintext.setInventoryVector(object.getInventoryVector()); | ||||
| @@ -177,6 +183,7 @@ public class InternalContext { | ||||
|             } | ||||
|             inventory.storeObject(object); | ||||
|             networkHandler.offer(object.getInventoryVector()); | ||||
|             messageCallback.messageOffered(payload, object.getInventoryVector()); | ||||
|         } catch (IOException e) { | ||||
|             throw new RuntimeException(e); | ||||
|         } | ||||
| @@ -193,16 +200,13 @@ public class InternalContext { | ||||
|                     .build(); | ||||
|             response.sign(identity.getPrivateKey()); | ||||
|             response.encrypt(security.createPublicKey(identity.getPublicDecryptionKey())); | ||||
|             messageCallback.proofOfWorkStarted(identity.getPubkey()); | ||||
|             security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes); | ||||
|             if (response.isSigned()) { | ||||
|                 response.sign(identity.getPrivateKey()); | ||||
|             } | ||||
|             if (response instanceof Encrypted) { | ||||
|                 response.encrypt(security.createPublicKey(identity.getPublicDecryptionKey())); | ||||
|             } | ||||
|             messageCallback.proofOfWorkCompleted(identity.getPubkey()); | ||||
|             inventory.storeObject(response); | ||||
|             networkHandler.offer(response.getInventoryVector()); | ||||
|             // TODO: save that the pubkey was just sent, and on which stream! | ||||
|             messageCallback.messageOffered(identity.getPubkey(), response.getInventoryVector()); | ||||
|         } catch (IOException e) { | ||||
|             throw new RuntimeException(e); | ||||
|         } | ||||
| @@ -216,9 +220,12 @@ public class InternalContext { | ||||
|                 .expiresTime(expires) | ||||
|                 .payload(new GetPubkey(contact)) | ||||
|                 .build(); | ||||
|         messageCallback.proofOfWorkStarted(response.getPayload()); | ||||
|         security.doProofOfWork(response, networkNonceTrialsPerByte, networkExtraBytes); | ||||
|         messageCallback.proofOfWorkCompleted(response.getPayload()); | ||||
|         inventory.storeObject(response); | ||||
|         networkHandler.offer(response.getInventoryVector()); | ||||
|         messageCallback.messageOffered(response.getPayload(), response.getInventoryVector()); | ||||
|     } | ||||
|  | ||||
|     public long getClientNonce() { | ||||
|   | ||||
| @@ -0,0 +1,52 @@ | ||||
| /* | ||||
|  * Copyright 2015 Christian Basler | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
|  | ||||
| package ch.dissem.bitmessage; | ||||
|  | ||||
| import ch.dissem.bitmessage.entity.payload.ObjectPayload; | ||||
| import ch.dissem.bitmessage.entity.valueobject.InventoryVector; | ||||
|  | ||||
| /** | ||||
|  * Callback for message sending events, mostly so the user can be notified when POW is done. | ||||
|  */ | ||||
| public interface MessageCallback { | ||||
|     /** | ||||
|      * Called before calculation of proof of work begins. | ||||
|      */ | ||||
|     void proofOfWorkStarted(ObjectPayload message); | ||||
|  | ||||
|     /** | ||||
|      * Called after calculation of proof of work finished. | ||||
|      */ | ||||
|     void proofOfWorkCompleted(ObjectPayload message); | ||||
|  | ||||
|     /** | ||||
|      * Called once the message is offered to the network. Please note that this doesn't mean the message was sent, | ||||
|      * if the client is not connected to the network it's just stored in the inventory. | ||||
|      * <p> | ||||
|      * Also, please note that this is where the original payload as well as the {@link InventoryVector} of the sent | ||||
|      * message is available. If the callback needs the IV for some reason, it should be retrieved here. (Plaintext | ||||
|      * and Broadcast messages will have their IV property set automatically though.) | ||||
|      * </p> | ||||
|      */ | ||||
|     void messageOffered(ObjectPayload message, InventoryVector iv); | ||||
|  | ||||
|     /** | ||||
|      * This isn't called yet, as ACK messages aren't being processed yet. Also, this is only relevant for Plaintext | ||||
|      * messages. | ||||
|      */ | ||||
|     void messageAcknowledged(InventoryVector iv); | ||||
| } | ||||
| @@ -27,12 +27,24 @@ import java.net.InetAddress; | ||||
|  * Handles incoming messages | ||||
|  */ | ||||
| public interface NetworkHandler { | ||||
|     /** | ||||
|      * Connects to the trusted host, fetches and offers new messages and disconnects afterwards. | ||||
|      */ | ||||
|     Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds); | ||||
|  | ||||
|     /** | ||||
|      * Start a full network node, accepting incoming connections and relaying objects. | ||||
|      */ | ||||
|     void start(MessageListener listener); | ||||
|  | ||||
|     /** | ||||
|      * Stop the full network node. | ||||
|      */ | ||||
|     void stop(); | ||||
|  | ||||
|     void synchronize(InetAddress trustedHost, int port, MessageListener listener) throws IOException; | ||||
|  | ||||
|     /** | ||||
|      * Offer new objects to up to 8 random nodes. | ||||
|      */ | ||||
|     void offer(InventoryVector iv); | ||||
|  | ||||
|     Property getNetworkStatus(); | ||||
|   | ||||
| @@ -35,6 +35,25 @@ public class Property { | ||||
|         this.properties = properties; | ||||
|     } | ||||
|  | ||||
|     public String getName() { | ||||
|         return name; | ||||
|     } | ||||
|  | ||||
|     public Object getValue() { | ||||
|         return value; | ||||
|     } | ||||
|  | ||||
|     public Property getProperty(String name) { | ||||
|         for (Property p : properties) { | ||||
|             if (name == null) { | ||||
|                 if (p.name == null) return p; | ||||
|             } else { | ||||
|                 if (name.equals(p.name)) return p; | ||||
|             } | ||||
|         } | ||||
|         return null; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public String toString() { | ||||
|         return toString(""); | ||||
|   | ||||
| @@ -12,6 +12,8 @@ uploadArchives { | ||||
|  | ||||
| dependencies { | ||||
|     compile project(':domain') | ||||
|     testCompile 'org.slf4j:slf4j-simple:1.7.12' | ||||
|     testCompile 'junit:junit:4.11' | ||||
|     testCompile 'org.slf4j:slf4j-simple:1.7.12' | ||||
|     testCompile 'org.mockito:mockito-core:1.10.19' | ||||
|     testCompile project(':security-bc') | ||||
| } | ||||
| @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; | ||||
| import java.io.IOException; | ||||
| import java.io.InputStream; | ||||
| import java.io.OutputStream; | ||||
| import java.net.InetAddress; | ||||
| import java.net.InetSocketAddress; | ||||
| import java.net.Socket; | ||||
| import java.net.SocketTimeoutException; | ||||
| @@ -54,43 +55,55 @@ public class Connection implements Runnable { | ||||
|     private final static Logger LOG = LoggerFactory.getLogger(Connection.class); | ||||
|     private static final int CONNECT_TIMEOUT = 5000; | ||||
|     private final ConcurrentMap<InventoryVector, Long> ivCache; | ||||
|     private InternalContext ctx; | ||||
|     private Mode mode; | ||||
|     private final InternalContext ctx; | ||||
|     private final Mode mode; | ||||
|     private final Socket socket; | ||||
|     private final MessageListener listener; | ||||
|     private final NetworkAddress host; | ||||
|     private final NetworkAddress node; | ||||
|     private final Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>(); | ||||
|     private final Map<InventoryVector, Long> requestedObjects; | ||||
|     private final long syncTimeout; | ||||
|  | ||||
|     private State state; | ||||
|     private Socket socket; | ||||
|     private InputStream in; | ||||
|     private OutputStream out; | ||||
|     private MessageListener listener; | ||||
|     private int version; | ||||
|     private long[] streams; | ||||
|     private NetworkAddress host; | ||||
|     private NetworkAddress node; | ||||
|     private Queue<MessagePayload> sendingQueue = new ConcurrentLinkedDeque<>(); | ||||
|     private ConcurrentMap<InventoryVector, Long> requestedObjects; | ||||
|  | ||||
|     public Connection(InternalContext context, Mode mode, Socket socket, MessageListener listener, | ||||
|                       ConcurrentMap<InventoryVector, Long> requestedObjectsMap) throws IOException { | ||||
|         this(context, mode, listener, requestedObjectsMap); | ||||
|         this.socket = socket; | ||||
|         this.node = new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(); | ||||
|         this(context, mode, listener, socket, requestedObjectsMap, | ||||
|                 new NetworkAddress.Builder().ip(socket.getInetAddress()).port(socket.getPort()).stream(1).build(), | ||||
|                 0); | ||||
|     } | ||||
|  | ||||
|     public Connection(InternalContext context, Mode mode, NetworkAddress node, MessageListener listener, | ||||
|                       ConcurrentMap<InventoryVector, Long> requestedObjectsMap) { | ||||
|         this(context, mode, listener, requestedObjectsMap); | ||||
|         this.socket = new Socket(); | ||||
|         this.node = node; | ||||
|         this(context, mode, listener, new Socket(), requestedObjectsMap, | ||||
|                 node, 0); | ||||
|     } | ||||
|  | ||||
|     private Connection(InternalContext context, Mode mode, MessageListener listener, | ||||
|                        ConcurrentMap<InventoryVector, Long> requestedObjectsMap) { | ||||
|     private Connection(InternalContext context, Mode mode, MessageListener listener, Socket socket, | ||||
|                        Map<InventoryVector, Long> requestedObjectsMap, NetworkAddress node, long syncTimeout) { | ||||
|         this.ctx = context; | ||||
|         this.mode = mode; | ||||
|         this.state = CONNECTING; | ||||
|         this.listener = listener; | ||||
|         this.socket = socket; | ||||
|         this.requestedObjects = requestedObjectsMap; | ||||
|         this.host = new NetworkAddress.Builder().ipv6(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0).port(0).build(); | ||||
|         ivCache = new ConcurrentHashMap<>(); | ||||
|         this.node = node; | ||||
|         this.syncTimeout = (syncTimeout > 0 ? UnixTime.now(+syncTimeout) : 0); | ||||
|         this.ivCache = new ConcurrentHashMap<>(); | ||||
|     } | ||||
|  | ||||
|     public static Connection sync(InternalContext ctx, InetAddress address, int port, MessageListener listener, | ||||
|                                   long timeoutInSeconds) throws IOException { | ||||
|         return new Connection(ctx, Mode.CLIENT, listener, new Socket(address, port), | ||||
|                 new HashMap<InventoryVector, Long>(), | ||||
|                 new NetworkAddress.Builder().ip(address).port(port).stream(1).build(), | ||||
|                 timeoutInSeconds); | ||||
|     } | ||||
|  | ||||
|     public Mode getMode() { | ||||
| @@ -168,7 +181,7 @@ public class Connection implements Runnable { | ||||
|                                             + msg.getPayload().getCommand() + "'"); | ||||
|                             } | ||||
|                     } | ||||
|                     if (socket.isClosed()) state = DISCONNECTED; | ||||
|                     if (socket.isClosed() || syncFinished(msg)) disconnect(); | ||||
|                 } catch (SocketTimeoutException ignore) { | ||||
|                     if (state == ACTIVE) { | ||||
|                         sendQueue(); | ||||
| @@ -184,13 +197,27 @@ public class Connection implements Runnable { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @SuppressWarnings("RedundantIfStatement") | ||||
|     private boolean syncFinished(NetworkMessage msg) { | ||||
|         if (syncTimeout == 0 || state != ACTIVE) { | ||||
|             return false; | ||||
|         } | ||||
|         if (syncTimeout < UnixTime.now()) { | ||||
|             return true; | ||||
|         } | ||||
|         if (!(msg.getPayload() instanceof Addr) && requestedObjects.isEmpty() && sendingQueue.isEmpty()) { | ||||
|             return true; | ||||
|         } | ||||
|         return false; | ||||
|     } | ||||
|  | ||||
|     private void activateConnection() { | ||||
|         LOG.info("Successfully established connection with node " + node); | ||||
|         state = ACTIVE; | ||||
|         sendAddresses(); | ||||
|         sendInventory(); | ||||
|         node.setTime(UnixTime.now()); | ||||
|         ctx.getNodeRegistry().offerAddresses(Arrays.asList(node)); | ||||
|         ctx.getNodeRegistry().offerAddresses(Collections.singletonList(node)); | ||||
|     } | ||||
|  | ||||
|     private void sendQueue() { | ||||
|   | ||||
| @@ -66,6 +66,17 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { | ||||
|         this.ctx = context; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public Thread synchronize(InetAddress trustedHost, int port, MessageListener listener, long timeoutInSeconds) { | ||||
|         try { | ||||
|             Thread t = new Thread(Connection.sync(ctx, trustedHost, port, listener, timeoutInSeconds)); | ||||
|             t.start(); | ||||
|             return t; | ||||
|         } catch (IOException e) { | ||||
|             throw new RuntimeException(e); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void start(final MessageListener listener) { | ||||
|         if (listener == null) { | ||||
| @@ -150,11 +161,6 @@ public class DefaultNetworkHandler implements NetworkHandler, ContextHolder { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void synchronize(InetAddress trustedHost, int port, MessageListener listener) throws IOException { | ||||
|         startConnection(new Connection(ctx, CLIENT, new Socket(trustedHost, port), listener, requestedObjects)); | ||||
|     } | ||||
|  | ||||
|     private void startConnection(Connection c) { | ||||
|         synchronized (connections) { | ||||
|             // prevent connecting twice to the same node | ||||
|   | ||||
| @@ -1,64 +0,0 @@ | ||||
| /* | ||||
|  * Copyright 2015 Christian Basler | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
|  | ||||
| package ch.dissem.bitmessage.networking; | ||||
|  | ||||
| import ch.dissem.bitmessage.entity.NetworkMessage; | ||||
| import ch.dissem.bitmessage.entity.Version; | ||||
| import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; | ||||
| import ch.dissem.bitmessage.utils.UnixTime; | ||||
| import org.junit.Ignore; | ||||
| import org.junit.Test; | ||||
|  | ||||
| /** | ||||
|  * FIXME: there really should be sensible tests for the network handler | ||||
|  */ | ||||
| public class DefaultNetworkHandlerTest { | ||||
|     private NetworkAddress localhost = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(8444).build(); | ||||
|  | ||||
|     // void start(MessageListener listener); | ||||
|     // void stop(); | ||||
|     // void offer(InventoryVector iv); | ||||
|     // Property getNetworkStatus(); | ||||
|  | ||||
|     @Ignore | ||||
|     @Test(expected = InterruptedException.class) | ||||
|     public void testSendMessage() throws Exception { | ||||
|         final Thread baseThread = Thread.currentThread(); | ||||
|         DefaultNetworkHandler net = new DefaultNetworkHandler(); | ||||
| //        net.setListener(localhost, new NetworkHandler.MessageListener() { | ||||
| //            @Override | ||||
| //            public void receive(ObjectPayload payload) { | ||||
| //                System.out.println(payload); | ||||
| //                baseThread.interrupt(); | ||||
| //            } | ||||
| //        }); | ||||
|         NetworkMessage ver = new NetworkMessage( | ||||
|                 new Version.Builder() | ||||
|                         .version(3) | ||||
|                         .services(1) | ||||
|                         .timestamp(UnixTime.now()) | ||||
|                         .addrFrom(localhost) | ||||
|                         .addrRecv(localhost) | ||||
|                         .nonce(-1) | ||||
|                         .userAgent("Test") | ||||
|                         .streams(1, 2) | ||||
|                         .build() | ||||
|         ); | ||||
| //        net.send(localhost, ver); | ||||
|         Thread.sleep(20000); | ||||
|     } | ||||
| } | ||||
| @@ -0,0 +1,147 @@ | ||||
| /* | ||||
|  * Copyright 2015 Christian Basler | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
|  | ||||
| package ch.dissem.bitmessage.networking; | ||||
|  | ||||
| import ch.dissem.bitmessage.BitmessageContext; | ||||
| import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; | ||||
| import ch.dissem.bitmessage.ports.AddressRepository; | ||||
| import ch.dissem.bitmessage.ports.MessageRepository; | ||||
| import ch.dissem.bitmessage.ports.NetworkHandler; | ||||
| import ch.dissem.bitmessage.security.bc.BouncySecurity; | ||||
| import ch.dissem.bitmessage.utils.Property; | ||||
| import org.junit.BeforeClass; | ||||
| import org.junit.Test; | ||||
| import org.mockito.Mockito; | ||||
|  | ||||
| import java.net.InetAddress; | ||||
|  | ||||
| import static org.junit.Assert.assertEquals; | ||||
| import static org.mockito.Mockito.mock; | ||||
|  | ||||
| /** | ||||
|  * FIXME: there really should be sensible tests for the network handler | ||||
|  */ | ||||
| public class NetworkHandlerTest { | ||||
|     private static NetworkAddress localhost = new NetworkAddress.Builder().ipv4(127, 0, 0, 1).port(6001).build(); | ||||
|  | ||||
|     private static TestInventory peerInventory; | ||||
|     private static TestInventory nodeInventory; | ||||
|  | ||||
|     private static BitmessageContext node; | ||||
|     private static NetworkHandler networkHandler; | ||||
|  | ||||
|     @BeforeClass | ||||
|     public static void setUp() { | ||||
|         peerInventory = new TestInventory(); | ||||
|         BitmessageContext peer = new BitmessageContext.Builder() | ||||
|                 .addressRepo(Mockito.mock(AddressRepository.class)) | ||||
|                 .inventory(peerInventory) | ||||
|                 .messageRepo(Mockito.mock(MessageRepository.class)) | ||||
|                 .port(6001) | ||||
|                 .nodeRegistry(new TestNodeRegistry()) | ||||
|                 .networkHandler(new DefaultNetworkHandler()) | ||||
|                 .security(new BouncySecurity()) | ||||
|                 .build(); | ||||
|         peer.startup(Mockito.mock(BitmessageContext.Listener.class)); | ||||
|  | ||||
|         nodeInventory = new TestInventory(); | ||||
|         networkHandler = new DefaultNetworkHandler(); | ||||
|         node = new BitmessageContext.Builder() | ||||
|                 .addressRepo(Mockito.mock(AddressRepository.class)) | ||||
|                 .inventory(nodeInventory) | ||||
|                 .messageRepo(Mockito.mock(MessageRepository.class)) | ||||
|                 .port(6002) | ||||
|                 .nodeRegistry(new TestNodeRegistry(localhost)) | ||||
|                 .networkHandler(networkHandler) | ||||
|                 .security(new BouncySecurity()) | ||||
|                 .build(); | ||||
|     } | ||||
|  | ||||
|     @Test(timeout = 20_000) | ||||
|     public void ensureNodesAreConnecting() { | ||||
|         try { | ||||
|             node.startup(Mockito.mock(BitmessageContext.Listener.class)); | ||||
|             Property status; | ||||
|             do { | ||||
|                 Thread.yield(); | ||||
|                 status = node.status().getProperty("network").getProperty("connections").getProperty("stream 0"); | ||||
|             } while (status == null); | ||||
|             assertEquals(1, status.getProperty("outgoing").getValue()); | ||||
|         } finally { | ||||
|             shutdown(node); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @Test(timeout = 5_000) | ||||
|     public void ensureObjectsAreSynchronizedIfBothHaveObjects() throws Exception { | ||||
|         peerInventory.init( | ||||
|                 "V4Pubkey.payload", | ||||
|                 "V5Broadcast.payload" | ||||
|         ); | ||||
|  | ||||
|         nodeInventory.init( | ||||
|                 "V1Msg.payload" | ||||
|         ); | ||||
|  | ||||
|         Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, | ||||
|                 mock(NetworkHandler.MessageListener.class), | ||||
|                 10); | ||||
|         t.join(); | ||||
|         assertEquals(3, nodeInventory.getInventory().size()); | ||||
|         assertEquals(3, peerInventory.getInventory().size()); | ||||
|     } | ||||
|  | ||||
|     @Test(timeout = 5_000) | ||||
|     public void ensureObjectsAreSynchronizedIfOnlyPeerHasObjects() throws Exception { | ||||
|         peerInventory.init( | ||||
|                 "V4Pubkey.payload", | ||||
|                 "V5Broadcast.payload" | ||||
|         ); | ||||
|  | ||||
|         nodeInventory.init(); | ||||
|  | ||||
|         Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, | ||||
|                 mock(NetworkHandler.MessageListener.class), | ||||
|                 10); | ||||
|         t.join(); | ||||
|         assertEquals(2, nodeInventory.getInventory().size()); | ||||
|         assertEquals(2, peerInventory.getInventory().size()); | ||||
|     } | ||||
|  | ||||
|     @Test(timeout = 5_000) | ||||
|     public void ensureObjectsAreSynchronizedIfOnlyNodeHasObjects() throws Exception { | ||||
|         peerInventory.init(); | ||||
|  | ||||
|         nodeInventory.init( | ||||
|                 "V1Msg.payload" | ||||
|         ); | ||||
|  | ||||
|         Thread t = networkHandler.synchronize(InetAddress.getLocalHost(), 6001, | ||||
|                 mock(NetworkHandler.MessageListener.class), | ||||
|                 10); | ||||
|         t.join(); | ||||
|         assertEquals(1, nodeInventory.getInventory().size()); | ||||
|         assertEquals(1, peerInventory.getInventory().size()); | ||||
|     } | ||||
|  | ||||
|     private void shutdown(BitmessageContext node) { | ||||
|         node.shutdown(); | ||||
|         do { | ||||
|             Thread.yield(); | ||||
|         } while (node.isRunning()); | ||||
|     } | ||||
| } | ||||
| @@ -0,0 +1,76 @@ | ||||
| /* | ||||
|  * Copyright 2015 Christian Basler | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
|  | ||||
| package ch.dissem.bitmessage.networking; | ||||
|  | ||||
| import ch.dissem.bitmessage.entity.ObjectMessage; | ||||
| import ch.dissem.bitmessage.entity.payload.ObjectType; | ||||
| import ch.dissem.bitmessage.entity.valueobject.InventoryVector; | ||||
| import ch.dissem.bitmessage.ports.Inventory; | ||||
| import ch.dissem.bitmessage.utils.TestUtils; | ||||
|  | ||||
| import java.io.IOException; | ||||
| import java.util.ArrayList; | ||||
| import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
|  | ||||
| public class TestInventory implements Inventory { | ||||
|     private final Map<InventoryVector, ObjectMessage> inventory; | ||||
|  | ||||
|     public TestInventory() { | ||||
|         this.inventory = new HashMap<>(); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public List<InventoryVector> getInventory(long... streams) { | ||||
|         return new ArrayList<>(inventory.keySet()); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public List<InventoryVector> getMissing(List<InventoryVector> offer, long... streams) { | ||||
|         return offer; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public ObjectMessage getObject(InventoryVector vector) { | ||||
|         return inventory.get(vector); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public List<ObjectMessage> getObjects(long stream, long version, ObjectType... types) { | ||||
|         return new ArrayList<>(inventory.values()); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void storeObject(ObjectMessage object) { | ||||
|         inventory.put(object.getInventoryVector(), object); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void cleanup() { | ||||
|  | ||||
|     } | ||||
|  | ||||
|     public void init(String... resources) throws IOException { | ||||
|         inventory.clear(); | ||||
|         for (String resource : resources) { | ||||
|             int version = Integer.parseInt(resource.substring(1, 2)); | ||||
|             ObjectMessage obj = TestUtils.loadObjectMessage(version, resource); | ||||
|             inventory.put(obj.getInventoryVector(), obj); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -0,0 +1,44 @@ | ||||
| /* | ||||
|  * Copyright 2015 Christian Basler | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
|  | ||||
| package ch.dissem.bitmessage.networking; | ||||
|  | ||||
| import ch.dissem.bitmessage.entity.valueobject.NetworkAddress; | ||||
| import ch.dissem.bitmessage.ports.NodeRegistry; | ||||
|  | ||||
| import java.util.Arrays; | ||||
| import java.util.List; | ||||
|  | ||||
| /** | ||||
|  * Empty {@link NodeRegistry} that doesn't do anything, but shouldn't break things either. | ||||
|  */ | ||||
| class TestNodeRegistry implements NodeRegistry { | ||||
|     private List<NetworkAddress> nodes; | ||||
|  | ||||
|     public TestNodeRegistry(NetworkAddress... nodes) { | ||||
|         this.nodes = Arrays.asList(nodes); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public List<NetworkAddress> getKnownAddresses(int limit, long... streams) { | ||||
|         return nodes; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void offerAddresses(List<NetworkAddress> addresses) { | ||||
|         // Ignore | ||||
|     } | ||||
| } | ||||
							
								
								
									
										
											BIN
										
									
								
								networking/src/test/resources/V1Msg.payload
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								networking/src/test/resources/V1Msg.payload
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							
							
								
								
									
										
											BIN
										
									
								
								networking/src/test/resources/V4Pubkey.payload
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								networking/src/test/resources/V4Pubkey.payload
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							
							
								
								
									
										
											BIN
										
									
								
								networking/src/test/resources/V5Broadcast.payload
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								networking/src/test/resources/V5Broadcast.payload
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							
		Reference in New Issue
	
	Block a user