Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ void whenParallelizableTransactionsAreSent_someAreExecutedInParallel() throws Ex
Future<Map<Integer, String>> future = getBlocksAsync();

try {

blocksResponseMap.put("asyncBlocksResult", future.get());

} catch (ExecutionException | InterruptedException e) {
Assertions.fail(e);
}
Expand Down Expand Up @@ -166,7 +164,13 @@ void whenParallelizableTransactionsAreSent_someAreExecutedInParallel() throws Ex
}

Assertions.assertTrue(pteFound);

txResponseMap.values().forEach(r -> {
try {
r.body().close();
} catch (IOException e) {
Assertions.fail(e);
}
});
}

private Response getBlockByNumber(String number) throws IOException {
Expand All @@ -186,7 +190,7 @@ private Future<Map<Integer, String>> getBlocksAsync() {
Map<Integer, String> results = new HashMap<>();

for (int i = 0; i < MAX_BLOCKS_TO_GET; i++) {
String response = getBlockByNumber("0x" + String.format("%02x", i)).body().string();
String response = OkHttpClientTestFixture.responseBody(getBlockByNumber("0x" + String.format("%02x", i)));

results.put(i, response);
Thread.sleep(500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void whenStartTheServerAndClientNodes_thenTheClientWillSynchWithServer() throws

JsonNode serverBestBlockResponse = OkHttpClientTestFixture.getJsonResponseForGetBestBlockMessage(portServerRpc, "latest");
String serverBestBlockNumber = serverBestBlockResponse.get(0).get("result").get("number").asText();
assertTrue(HexUtils.jsonHexToLong(serverBestBlockNumber) > 6000);
assertTrue(HexUtils.jsonHexToLong(serverBestBlockNumber) > 10000);

//when
String rskConfFileChangedClient = configureClientConfWithGeneratedInformation(serverDbDir, clientDbDir.toString());
Expand Down Expand Up @@ -177,6 +177,7 @@ private void generateBlocks() throws IOException {
.toArray(OkHttpClientTestFixture.FromToAddressPair[]::new);
Response response = OkHttpClientTestFixture.sendBulkTransactions(portServerRpc, pairs);
assertTrue(response.isSuccessful());
response.body().close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,19 @@ public static Response sendJsonRpcGetBlockMessage(int port, String blockNumOrTag

public static JsonNode getJsonResponseForGetBestBlockMessage(int port, String blockNumOrTag) throws IOException {
Response response = sendJsonRpcGetBlockMessage(port, blockNumOrTag);
return new ObjectMapper().readTree(response.body().string());
return new ObjectMapper().readTree(OkHttpClientTestFixture.responseBody(response));
}

public static String getEnvelopedMethodCalls(String... methodCall) {
return "[\n" + String.join(",\n", methodCall) + "]";
}

public static String responseBody(Response response) throws IOException {
try (var body = response.body()) {
return body.string();
}
}

public static Response sendBulkTransactions(int rpcPort, FromToAddressPair... fromToAddresses) throws IOException {
Objects.requireNonNull(fromToAddresses);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ sync {
enabled = true
parallel = true
limit = 4000
snapBootNodes = [
{
nodeId = <SERVER_NODE_ID>
ip = 127.0.0.1
port = <SERVER_NODE_PORT>
}
]
snapBootNodes = []
}
}
3 changes: 1 addition & 2 deletions rskj-core/src/main/java/co/rsk/RskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -2084,7 +2084,6 @@ private SnapshotProcessor getSnapshotProcessor() {
final RskSystemProperties rskSystemProperties = getRskSystemProperties();
final Constants commonConstants = rskSystemProperties.getNetworkConstants();
final SyncConfiguration syncConfig = getSyncConfiguration();
final int checkpointDistance = syncConfig.getChunkSize() * syncConfig.getMaxSkeletonChunks();
final BlockTimeStampValidationRule blockTimeStampValidationRule = new BlockTimeStampValidationRule(
commonConstants.getNewBlockMaxSecondsInTheFuture(),
rskSystemProperties.getActivationConfig(),
Expand Down Expand Up @@ -2113,7 +2112,7 @@ private SnapshotProcessor getSnapshotProcessor() {
new ValidGasUsedRule()
),
getRskSystemProperties().getSnapshotChunkSize(),
checkpointDistance,
syncConfig,
getRskSystemProperties().getSnapshotMaxSenderRequests(),
getRskSystemProperties().checkHistoricalHeaders(),
getRskSystemProperties().isSnapshotParallelEnabled()
Expand Down
11 changes: 11 additions & 0 deletions rskj-core/src/main/java/co/rsk/core/bc/BlockUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import co.rsk.crypto.Keccak256;
import co.rsk.net.NetBlockStore;
import co.rsk.net.sync.SyncConfiguration;
import org.ethereum.config.Constants;
import org.ethereum.core.Block;
import org.ethereum.core.BlockHeader;
Expand All @@ -36,6 +37,8 @@
public class BlockUtils {
private static final long MAX_BLOCK_PROCESS_TIME_NANOSECONDS = 60_000_000_000L;

private static final int SNAP_BLOCK_CHECKPOINT_NUMBER = 5000;

private BlockUtils() { }

public static boolean tooMuchProcessTime(long nanoseconds) {
Expand Down Expand Up @@ -118,6 +121,14 @@ public static List<Block> sortBlocksByNumber(List<Block> blocks) {
.collect(Collectors.toList());
}

public static long getSnapCheckpointBlockNumber(long bestBlockNumber, SyncConfiguration syncConfiguration) {
// round to SNAP_BLOCK_CHECKPOINT_NUMBER, so that the next checkpoint block selected is after 5000 blocks
long roundedBlockNumber = bestBlockNumber - (bestBlockNumber % SNAP_BLOCK_CHECKPOINT_NUMBER);
int checkpointDistance = syncConfiguration.getChunkSize() * syncConfiguration.getMaxSkeletonChunks();

return Math.max(0, roundedBlockNumber - checkpointDistance);
}

/**
* Calculate the gas limit of a sublist, depending on the sublist type (sequential and parallel), from the block
* gas limit. The distribution can be performed one of two ways:
Expand Down
32 changes: 20 additions & 12 deletions rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import co.rsk.config.InternalService;
import co.rsk.core.BlockDifficulty;
import co.rsk.core.bc.BlockUtils;
import co.rsk.core.types.bytes.Bytes;
import co.rsk.crypto.Keccak256;
import co.rsk.metrics.profilers.MetricKind;
Expand Down Expand Up @@ -69,15 +70,14 @@ public class SnapshotProcessor implements InternalService, SnapProcessor {

private static final Logger logger = LoggerFactory.getLogger("snapshotprocessor");

public static final int BLOCK_NUMBER_CHECKPOINT = 5000;
public static final int BLOCK_CHUNK_SIZE = 400;
public static final int BLOCKS_REQUIRED = 6000;
public static final long CHUNK_ITEM_SIZE = 1024L;
private final Blockchain blockchain;
private final TrieStore trieStore;
private final BlockStore blockStore;
private final int chunkSize;
private final int checkpointDistance;
private final SyncConfiguration syncConfiguration;
private final SnapshotPeersInformation peersInformation;
private final TransactionPool transactionPool;

Expand Down Expand Up @@ -111,13 +111,13 @@ public SnapshotProcessor(Blockchain blockchain,
BlockHeaderParentDependantValidationRule blockHeaderParentValidator,
BlockHeaderValidationRule blockHeaderValidator,
int chunkSize,
int checkpointDistance,
SyncConfiguration syncConfiguration,
int maxSenderRequests,
boolean checkHistoricalHeaders,
boolean isParallelEnabled) { // NOSONAR
this(blockchain, trieStore, peersInformation, blockStore, transactionPool,
blockParentValidator, blockValidator, blockHeaderParentValidator, blockHeaderValidator,
chunkSize, checkpointDistance, maxSenderRequests, checkHistoricalHeaders, isParallelEnabled, null);
chunkSize, syncConfiguration, maxSenderRequests, checkHistoricalHeaders, isParallelEnabled, null);
}

@VisibleForTesting
Expand All @@ -132,7 +132,7 @@ public SnapshotProcessor(Blockchain blockchain,
BlockHeaderParentDependantValidationRule blockHeaderParentValidator,
BlockHeaderValidationRule blockHeaderValidator,
int chunkSize,
int checkpointDistance,
SyncConfiguration syncConfiguration,
int maxSenderRequests,
boolean checkHistoricalHeaders,
boolean isParallelEnabled,
Expand All @@ -141,7 +141,7 @@ public SnapshotProcessor(Blockchain blockchain,
this.trieStore = trieStore;
this.peersInformation = peersInformation;
this.chunkSize = chunkSize;
this.checkpointDistance = checkpointDistance;
this.syncConfiguration = syncConfiguration;
this.maxSenderRequests = maxSenderRequests;
this.blockStore = blockStore;
this.transactionPool = transactionPool;
Expand Down Expand Up @@ -227,10 +227,7 @@ public void run() {
void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage requestMessage) {
long bestBlockNumber = blockchain.getBestBlock().getNumber();

// round to BLOCK_NUMBER_CHECKPOINT, so that the next checkpoint block selected is after 5000 blocks
long roundedBlockNumber = bestBlockNumber - (bestBlockNumber % BLOCK_NUMBER_CHECKPOINT);

long checkpointBlockNumber = Math.max(0, roundedBlockNumber - checkpointDistance);
long checkpointBlockNumber = BlockUtils.getSnapCheckpointBlockNumber(bestBlockNumber, syncConfiguration);
long lowerBlockNumberToRetrieve = Math.max(0, checkpointBlockNumber - BLOCK_CHUNK_SIZE);
logger.debug("Processing snapshot status request, checkpointBlockNumber: {}, bestBlockNumber: {}", checkpointBlockNumber, bestBlockNumber);

Expand Down Expand Up @@ -295,13 +292,24 @@ public void processSnapStatusResponse(SnapSyncState state, Peer sender, SnapStat
}

List<Block> blocksFromResponse = responseMessage.getBlocks();
if (blocksFromResponse == null || blocksFromResponse.isEmpty()) {
failSyncing(state, sender, EventType.INVALID_BLOCK, "Received empty blocks list in snap status response");
return;
}

List<BlockDifficulty> difficultiesFromResponse = responseMessage.getDifficulties();
if (blocksFromResponse.size() != difficultiesFromResponse.size()) {
failSyncing(state, sender, EventType.INVALID_BLOCK, "Blocks and difficulties size mismatch. Blocks: [{}], Difficulties: [{}]", blocksFromResponse.size(), difficultiesFromResponse.size());
if (difficultiesFromResponse == null || blocksFromResponse.size() != difficultiesFromResponse.size()) {
failSyncing(state, sender, EventType.INVALID_BLOCK, "Blocks and difficulties size mismatch. Blocks: [{}], Difficulties: [{}]", blocksFromResponse.size(), Optional.ofNullable(difficultiesFromResponse).map(List::size).map(Object::toString).orElse("<null>"));
return;
}

Block lastBlock = blocksFromResponse.get(blocksFromResponse.size() - 1);
if (state.getValidatedHeader() != null && !state.getValidatedHeader().getHash().equals(lastBlock.getHash())) {
failSyncing(state, sender, EventType.INVALID_BLOCK, "Received blocks with different hash than the validated header. Expected: [{}], Received: [{}]",
state.getValidatedHeader().getHash(), lastBlock.getHash());
return;
}

BlockDifficulty lastBlockDifficulty = difficultiesFromResponse.get(difficultiesFromResponse.size() - 1);

state.setLastBlock(lastBlock, lastBlockDifficulty, sender);
Expand Down
12 changes: 9 additions & 3 deletions rskj-core/src/main/java/co/rsk/net/SyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void processBlockHashResponse(Peer peer, BlockHashResponseMessage message
MessageType messageType = message.getMessageType();
if (isPending(messageId, messageType)) {
removePendingMessage(messageId, messageType);
syncState.newConnectionPointData(message.getHash());
syncState.newConnectionPointData(message.getHash(), peer);
} else {
notifyUnexpectedMessageToPeerScoring(peer, "block hash");
}
Expand Down Expand Up @@ -317,9 +317,15 @@ public void startBlockForwardSyncing(Peer peer) {
}

@Override
public void startSnapSync(Peer peer) {
public void startSnapCapablePeerSelection() {
logger.info("Start peer selection");
setSyncState(new SnapCapablePeerSelectionSyncState(this, syncConfiguration, peersInformation, blockHeaderValidationRule, difficultyRule));
}

@Override
public void startSnapSync(Peer peer, @Nullable BlockHeader validatedHeader) {
logger.info("Start Snap syncing with {}", peer.getPeerNodeID());
setSyncState(new SnapSyncState(this, snapshotProcessor, syncConfiguration));
setSyncState(new SnapSyncState(this, snapshotProcessor, syncConfiguration, validatedHeader));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package co.rsk.net.sync;

import co.rsk.net.Peer;
Expand Down Expand Up @@ -46,7 +47,7 @@ public void onEnter(){
}

@Override
public void newBlockHeaders(Peer peer, List<BlockHeader> chunk){
public void newBlockHeaders(Peer peer, List<BlockHeader> chunk) {
BlockHeader header = chunk.get(0);
boolean unexpectedHeader = !ByteUtil.fastEquals(header.getHash().getBytes(), miniChunk.getHash());
if (unexpectedHeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ private boolean tryStartSnapshotSync() {
// we consider Snap as part of the Long Sync
syncEventsHandler.onLongSyncUpdate(true, peerBestBlockNumOpt.get());

// start snap syncing
syncEventsHandler.startSnapSync(bestPeerOpt.get());
// select snap capable peer
syncEventsHandler.startSnapCapablePeerSelection();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public void clearOldFailedPeers() {
}

private boolean isSnapPeerCandidate(Map.Entry<Peer, SyncPeerStatus> entry) {
return syncConfiguration.getNodeIdToSnapshotTrustedPeerMap().containsKey(entry.getKey().getPeerNodeID().toString());
return syncConfiguration.getSnapBootNodeIds().contains(entry.getKey().getPeerNodeID());
}

private boolean isSnapPeerCandidateOrCapable(Map.Entry<Peer, SyncPeerStatus> entry) {
Expand Down
Loading
Loading