diff --git a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java index def0a4f..511aaea 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/BitmessageContext.java @@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.util.Arrays; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.*; import static ch.dissem.bitmessage.entity.Plaintext.Status.*; @@ -72,6 +74,13 @@ public class BitmessageContext { // As this thread is used for parts that do POW, which itself uses parallel threads, only // one should be executed at any time. pool = Executors.newFixedThreadPool(1); + + new Timer().schedule(new TimerTask() { + @Override + public void run() { + ctx.getProofOfWorkService().doMissingProofOfWork(); + } + }, 30_000); // After 30 seconds } public AddressRepository addresses() { @@ -206,6 +215,19 @@ public class BitmessageContext { } } + /** + * Send a custom message to a specific node (that should implement handling for this message type) and returns + * the response, which in turn is expected to be a {@link CustomMessage}. + * + * @param server the node's address + * @param port the node's port + * @param request the request + * @return the response + */ + public CustomMessage send(InetAddress server, int port, CustomMessage request) { + return ctx.getNetworkHandler().send(server, port, request); + } + public void cleanup() { ctx.getInventory().cleanup(); } @@ -276,6 +298,14 @@ public class BitmessageContext { ); } + /** + * Returns the {@link InternalContext} - normally you wouldn't need it, + * unless you are doing something crazy with the protocol. + */ + public InternalContext internals() { + return ctx; + } + public interface Listener { void receive(Plaintext plaintext); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java b/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java index d139de5..89f3082 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java +++ b/domain/src/main/java/ch/dissem/bitmessage/InternalContext.java @@ -140,6 +140,10 @@ public class InternalContext { return proofOfWorkEngine; } + public ProofOfWorkService getProofOfWorkService() { + return proofOfWorkService; + } + public long[] getStreams() { long[] result = new long[streams.size()]; int i = 0; diff --git a/domain/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java b/domain/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java index 3cf46ef..da59105 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ProofOfWorkService.java @@ -20,6 +20,13 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC private ProofOfWorkRepository powRepo; private MessageRepository messageRepo; + public void doMissingProofOfWork() { + for (byte[] initialHash : powRepo.getItems()) { + ProofOfWorkRepository.Item item = powRepo.getItem(initialHash); + security.doProofOfWork(item.object, item.nonceTrialsPerByte, item.extraBytes, this); + } + } + public void doProofOfWork(ObjectMessage object) { doProofOfWork(null, object); } @@ -29,7 +36,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC long extraBytes = recipient == null ? 0 : recipient.getPubkey().getExtraBytes(); powRepo.putObject(object, nonceTrialsPerByte, extraBytes); - if (object.getPayload() instanceof PlaintextHolder){ + if (object.getPayload() instanceof PlaintextHolder) { Plaintext plaintext = ((PlaintextHolder) object.getPayload()).getPlaintext(); plaintext.setInitialHash(security.getInitialHash(object)); messageRepo.save(plaintext); @@ -39,7 +46,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC @Override public void onNonceCalculated(byte[] initialHash, byte[] nonce) { - ObjectMessage object = powRepo.getObject(initialHash); + ObjectMessage object = powRepo.getItem(initialHash).object; object.setNonce(nonce); // messageCallback.proofOfWorkCompleted(payload); Plaintext plaintext = messageRepo.getMessage(initialHash); @@ -48,6 +55,7 @@ public class ProofOfWorkService implements ProofOfWorkEngine.Callback, InternalC messageRepo.save(plaintext); } ctx.getInventory().storeObject(object); + ctx.getProofOfWorkRepository().removeObject(initialHash); ctx.getNetworkHandler().offer(object.getInventoryVector()); // messageCallback.messageOffered(payload, object.getInventoryVector()); } diff --git a/domain/src/main/java/ch/dissem/bitmessage/entity/CustomMessage.java b/domain/src/main/java/ch/dissem/bitmessage/entity/CustomMessage.java index a5caf3a..126b808 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/entity/CustomMessage.java +++ b/domain/src/main/java/ch/dissem/bitmessage/entity/CustomMessage.java @@ -53,13 +53,21 @@ public class CustomMessage implements MessagePayload { return Command.CUSTOM; } - public byte[] getData() throws IOException { + public String getCustomCommand() { + return command; + } + + public byte[] getData() { if (data != null) { return data; } else { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - write(out); - return out.toByteArray(); + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + write(out); + return out.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } } } diff --git a/domain/src/main/java/ch/dissem/bitmessage/ports/ProofOfWorkRepository.java b/domain/src/main/java/ch/dissem/bitmessage/ports/ProofOfWorkRepository.java index 9971ad5..739c172 100644 --- a/domain/src/main/java/ch/dissem/bitmessage/ports/ProofOfWorkRepository.java +++ b/domain/src/main/java/ch/dissem/bitmessage/ports/ProofOfWorkRepository.java @@ -2,15 +2,31 @@ package ch.dissem.bitmessage.ports; import ch.dissem.bitmessage.entity.ObjectMessage; +import java.util.List; + /** * Objects that proof of work is currently being done for. * * @author Christian Basler */ public interface ProofOfWorkRepository { - ObjectMessage getObject(byte[] initialHash); + Item getItem(byte[] initialHash); + + List getItems(); void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes); - void removeObject(ObjectMessage object); + void removeObject(byte[] initialHash); + + class Item { + public final ObjectMessage object; + public final long nonceTrialsPerByte; + public final long extraBytes; + + public Item(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) { + this.object = object; + this.nonceTrialsPerByte = nonceTrialsPerByte; + this.extraBytes = extraBytes; + } + } } diff --git a/extensions/src/main/java/ch/dissem/bitmessage/extensions/pow/ProofOfWorkRequest.java b/extensions/src/main/java/ch/dissem/bitmessage/extensions/pow/ProofOfWorkRequest.java index 2ef2f9e..196005d 100644 --- a/extensions/src/main/java/ch/dissem/bitmessage/extensions/pow/ProofOfWorkRequest.java +++ b/extensions/src/main/java/ch/dissem/bitmessage/extensions/pow/ProofOfWorkRequest.java @@ -18,13 +18,13 @@ package ch.dissem.bitmessage.extensions.pow; import ch.dissem.bitmessage.entity.BitmessageAddress; import ch.dissem.bitmessage.entity.Streamable; +import ch.dissem.bitmessage.extensions.CryptoCustomMessage; import ch.dissem.bitmessage.utils.Encode; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import static ch.dissem.bitmessage.extensions.pow.ProofOfWorkRequest.Request.CALCULATE; import static ch.dissem.bitmessage.utils.Decode.*; /** @@ -34,6 +34,7 @@ public class ProofOfWorkRequest implements Streamable { private final BitmessageAddress sender; private final byte[] initialHash; private final Request request; + private final byte[] data; public ProofOfWorkRequest(BitmessageAddress sender, byte[] initialHash, Request request) { @@ -79,10 +80,23 @@ public class ProofOfWorkRequest implements Streamable { Encode.varBytes(data, out); } + public static class Reader implements CryptoCustomMessage.Reader { + private final BitmessageAddress identity; + + public Reader(BitmessageAddress identity) { + this.identity = identity; + } + + @Override + public ProofOfWorkRequest read(BitmessageAddress sender, InputStream in) throws IOException { + return ProofOfWorkRequest.read(identity, in); + } + } + + public enum Request { CALCULATE, CALCULATING, - QUERY, COMPLETE } } diff --git a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcProofOfWorkRepository.java b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcProofOfWorkRepository.java index aa8ca23..9268311 100644 --- a/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcProofOfWorkRepository.java +++ b/repositories/src/main/java/ch/dissem/bitmessage/repository/JdbcProofOfWorkRepository.java @@ -8,6 +8,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.*; +import java.util.LinkedList; +import java.util.List; import static ch.dissem.bitmessage.utils.Singleton.security; @@ -22,14 +24,18 @@ public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWork } @Override - public ObjectMessage getObject(byte[] initialHash) { + public Item getItem(byte[] initialHash) { try (Connection connection = config.getConnection()) { - PreparedStatement ps = connection.prepareStatement("SELECT data, version FROM POW WHERE initial_hash=?"); + PreparedStatement ps = connection.prepareStatement("SELECT data, version, nonce_trials_per_byte, extra_bytes FROM POW WHERE initial_hash=?"); ps.setBytes(1, initialHash); ResultSet rs = ps.executeQuery(); if (rs.next()) { Blob data = rs.getBlob("data"); - return Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()); + return new Item( + Factory.getObjectMessage(rs.getInt("version"), data.getBinaryStream(), (int) data.length()), + rs.getLong("nonce_trials_per_byte"), + rs.getLong("extra_bytes") + ); } else { throw new RuntimeException("Object requested that we don't have. Initial hash: " + Strings.hex(initialHash)); } @@ -39,13 +45,31 @@ public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWork } } + @Override + public List getItems() { + try (Connection connection = config.getConnection()) { + List result = new LinkedList<>(); + Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT initial_hash FROM POW"); + while (rs.next()) { + result.add(rs.getBytes("initial_hash")); + } + return result; + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + @Override public void putObject(ObjectMessage object, long nonceTrialsPerByte, long extraBytes) { try (Connection connection = config.getConnection()) { - PreparedStatement ps = connection.prepareStatement("INSERT INTO POW (initial_hash, data, version) VALUES (?, ?, ?)"); + PreparedStatement ps = connection.prepareStatement("INSERT INTO POW (initial_hash, data, version, nonce_trials_per_byte, extra_bytes) VALUES (?, ?, ?, ?, ?)"); ps.setBytes(1, security().getInitialHash(object)); writeBlob(ps, 2, object); ps.setLong(3, object.getVersion()); + ps.setLong(4, nonceTrialsPerByte); + ps.setLong(5, extraBytes); ps.executeUpdate(); } catch (SQLException e) { LOG.debug("Error storing object of type " + object.getPayload().getClass().getSimpleName(), e); @@ -57,10 +81,10 @@ public class JdbcProofOfWorkRepository extends JdbcHelper implements ProofOfWork } @Override - public void removeObject(ObjectMessage object) { + public void removeObject(byte[] initialHash) { try (Connection connection = config.getConnection()) { PreparedStatement ps = connection.prepareStatement("DELETE FROM POW WHERE initial_hash=?"); - ps.setBytes(1, security().getInitialHash(object)); + ps.setBytes(1, initialHash); ps.executeUpdate(); } catch (SQLException e) { LOG.debug(e.getMessage(), e); diff --git a/repositories/src/main/resources/db/migration/V2.1__Create_table_POW.sql b/repositories/src/main/resources/db/migration/V2.1__Create_table_POW.sql index 4f54698..b39c6c5 100644 --- a/repositories/src/main/resources/db/migration/V2.1__Create_table_POW.sql +++ b/repositories/src/main/resources/db/migration/V2.1__Create_table_POW.sql @@ -1,5 +1,7 @@ CREATE TABLE POW ( - initial_hash BINARY(64) PRIMARY KEY, - data BLOB NOT NULL, - version BIGINT NOT NULL + initial_hash BINARY(64) PRIMARY KEY, + data BLOB NOT NULL, + version BIGINT NOT NULL, + nonce_trials_per_byte BIGINT NOT NULL, + extra_bytes BIGINT NOT NULL );