
Commit

Rename ShardManagerNodeAware to ShardManagerTokenAware; add basic caching

also, remove some logs that are no longer necessary
michaeljmarshall committed Sep 17, 2024
1 parent 778d47b commit 50b548d
Showing 3 changed files with 54 additions and 48 deletions.
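The caching added here keys precomputed split points by shard count, so the boundaries for a given count are computed once per ShardManagerTokenAware instance (UCS builds a fresh instance whenever the token metadata changes, as the constructor comment in the diff notes). A minimal sketch of that pattern, assuming a hypothetical class name and long[] in place of Token[]:

import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the per-shard-count caching added in this commit.
// The class name and long[] (in place of Token[]) are placeholders, not the committed API.
class SplitPointCacheSketch
{
    private final ConcurrentHashMap<Integer, long[]> splitPointCache = new ConcurrentHashMap<>();

    long[] boundariesFor(int shardCount)
    {
        // The first request for a shard count computes the boundaries; later requests reuse them.
        return splitPointCache.computeIfAbsent(shardCount, this::computeBoundaries);
    }

    private long[] computeBoundaries(int shardCount)
    {
        // Placeholder for the real work (token allocation plus nearest-token alignment).
        long[] splits = new long[Math.max(shardCount - 1, 0)];
        for (int i = 0; i < splits.length; i++)
            splits[i] = (i + 1L) * (Long.MAX_VALUE / shardCount);
        return splits;
    }
}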
@@ -49,7 +49,7 @@ static ShardManager create(DiskBoundaries diskBoundaries, AbstractReplicationStr
{
if (diskPositions != null && diskPositions.size() > 1)
throw new IllegalArgumentException("Cannot use node aware strategy with disk boundaries");
return new ShardManagerNodeAware(rs);
return new ShardManagerTokenAware(rs);
}

SortedLocalRanges localRanges = diskBoundaries.getLocalRanges();
@@ -18,11 +18,16 @@

package org.apache.cassandra.db.compaction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
@@ -34,29 +39,33 @@
import org.apache.cassandra.locator.TokenMetadata;

/**
* A shard manager implementation that accepts token-allocator-generated-tokens and splits along them to ensure that
* current and future states of the cluster will have sstables within shards, not across them, for sufficiently high
* levels of compaction, which allows nodes to trivially own complete sstables for sufficiently high levels of
* compaction.
*
* If there are not yet enough tokens allocated, use the {@link org.apache.cassandra.dht.tokenallocator.TokenAllocator}
* to allocate more tokens to split along. The key to this implementation is utilizing the same algorithm to allocate
* tokens to nodes and to split ranges for higher levels of compaction.
* A {@link ShardManager} implementation that takes an {@link AbstractReplicationStrategy} as input and uses it
* to determine current and future token boundaries to use as sharding split points to ensure that for current and
* future states of the cluster, the generated sstable ranges will not span multiple nodes for sufficiently high
* levels of compaction.
* <p>
* If a compaction requires more shards than the already allocated tokens can satisfy, use the
* {@link org.apache.cassandra.dht.tokenallocator.TokenAllocator} to allocate more tokens and then use those tokens
* as split points. This implementation relies on the fact that token allocation is deterministic after the first
* token has been selected.
*/
// I haven't figured out yet whether the interesting part of this class is the fact that we use the token allocator
// to find higher level splits or if it is the node awareness. Is it possible to remove the node awareness and keep
// the allocator's logic or do we need both?
public class ShardManagerNodeAware implements ShardManager
public class ShardManagerTokenAware implements ShardManager
{
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ShardManagerNodeAware.class);
public static final Token[] TOKENS = new Token[0];
private static final Logger logger = LoggerFactory.getLogger(ShardManagerTokenAware.class);
public static final Token[] EMPTY_TOKENS = new Token[0];
private final AbstractReplicationStrategy rs;
private final TokenMetadata tokenMetadata;
private final IPartitioner partitioner;
private final ConcurrentHashMap<Integer, Token[]> splitPointCache;

public ShardManagerNodeAware(AbstractReplicationStrategy rs)
public ShardManagerTokenAware(AbstractReplicationStrategy rs)
{
this.rs = rs;
this.tokenMetadata = rs.getTokenMetadata();
// Clone the map to ensure it has a consistent view of the tokenMetadata. UCS creates a new instance of the
// ShardManagerTokenAware class when the token metadata changes.
this.tokenMetadata = rs.getTokenMetadata().cloneOnlyTokenMap();
this.splitPointCache = new ConcurrentHashMap<>();
this.partitioner = tokenMetadata.partitioner;
}

@Override
@@ -84,24 +93,8 @@ public ShardTracker boundaries(int shardCount)
{
try
{
logger.debug("Attempting to create shard boundaries for {} shards", shardCount);
//e5ae871ca68f
// Because sstables do not wrap around, we need shardCount - 1 splits.
var splitPointCount = shardCount - 1;
// Clone token map to avoid race conditions in the event we need to getAllEndpoints
var tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
var sortedTokens = tokenMetadataClone.sortedTokens();
if (splitPointCount > sortedTokens.size())
{
// Not enough tokens, allocate them.
int additionalSplits = splitPointCount - sortedTokens.size();
var newTokens = IsolatedTokenAllocator.allocateTokens(additionalSplits, rs);
sortedTokens.addAll(newTokens);
sortedTokens.sort(Token::compareTo);
}
var splitPoints = findTokenAlignedSplitPoints(sortedTokens.toArray(TOKENS), splitPointCount);
logger.debug("Creating shard boundaries for {} shards. Currently {} tokens: {}. Split points: {}", shardCount, sortedTokens.size(), sortedTokens, Arrays.toString(splitPoints));
return new NodeAlignedShardTracker(splitPoints);
var splitPoints = splitPointCache.computeIfAbsent(shardCount, this::computeBoundaries);
return new TokenAlignedShardTracker(splitPoints);
}
catch (Throwable t)
{
@@ -110,18 +103,30 @@ public ShardTracker boundaries(int shardCount)
}
}

private Token[] findTokenAlignedSplitPoints(Token[] sortedTokens, int splitPointCount)
private Token[] computeBoundaries(int shardCount)
{
assert splitPointCount <= sortedTokens.length : splitPointCount + " > " + sortedTokens.length;

// Because sstables do not wrap around, we need shardCount - 1 splits.
var splitPointCount = shardCount - 1;
// Copy array list. The current token allocation logic doesn't consider our copy of tokenMetadata, so
// modifying the sorted tokens here won't give us much benefit.
var sortedTokensList = new ArrayList<>(tokenMetadata.sortedTokens());
if (splitPointCount > sortedTokensList.size())
{
// Not enough tokens, allocate them.
int additionalSplits = splitPointCount - sortedTokensList.size();
var newTokens = IsolatedTokenAllocator.allocateTokens(additionalSplits, rs);
sortedTokensList.addAll(newTokens);
sortedTokensList.sort(Token::compareTo);
}
var sortedTokens = sortedTokensList.toArray(EMPTY_TOKENS);
// Short circuit on equal and on count 1.
if (sortedTokens.length == splitPointCount)
return sortedTokens;
if (splitPointCount == 0)
return TOKENS;
return EMPTY_TOKENS;

var evenSplitPoints = computeUniformSplitPoints(tokenMetadata.partitioner, splitPointCount);
logger.debug("Even split points: {}", Arrays.toString(evenSplitPoints));
// Get the ideal split points and then map them to their nearest neighbor.
var evenSplitPoints = computeUniformSplitPoints(splitPointCount);
var nodeAlignedSplitPoints = new Token[splitPointCount];

// UCS requires that the splitting points for a given density are also splitting points for
@@ -174,7 +179,7 @@ else if (pos == max)
}


private Token[] computeUniformSplitPoints(IPartitioner partitioner, int splitPointCount)
private Token[] computeUniformSplitPoints(int splitPointCount)
{
// Want the shard count here to get the right ratio.
var rangeStep = 1.0 / (splitPointCount + 1);
@@ -188,17 +193,17 @@ private Token[] computeUniformSplitPoints(IPartitioner partitioner, int splitPointCount)
return tokens;
}

private class NodeAlignedShardTracker implements ShardTracker
private class TokenAlignedShardTracker implements ShardTracker
{
private final Token minToken;
private final Token[] sortedTokens;
private int nextShardIndex = 0;
private Token currentEnd;

NodeAlignedShardTracker(Token[] sortedTokens)
TokenAlignedShardTracker(Token[] sortedTokens)
{
this.sortedTokens = sortedTokens;
this.minToken = rs.getTokenMetadata().partitioner.getMinimumToken();
this.minToken = partitioner.getMinimumToken();
this.currentEnd = nextShardIndex < sortedTokens.length ? sortedTokens[nextShardIndex] : null;
}

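As the class javadoc and computeBoundaries above describe, split points are chosen by taking evenly spaced ideal positions across the token range and snapping each one to the nearest token, allocating additional tokens first when there are not enough. A simplified, self-contained sketch of that nearest-token mapping, assuming plain longs in place of Token and omitting both the extra-token allocation and the requirement that split points for one density remain split points at higher densities:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified sketch of token-aligned split-point selection. Plain longs stand in for
// Token; token allocation and the higher-density consistency rule are omitted.
final class TokenAlignedSplitSketch
{
    static long[] computeSplitPoints(List<Long> sortedTokens, int shardCount)
    {
        // Because sstables do not wrap around, shardCount shards need shardCount - 1 splits.
        int splitPointCount = shardCount - 1;
        if (splitPointCount > sortedTokens.size())
            throw new IllegalStateException("not enough tokens; the real code allocates more here");

        long[] result = new long[splitPointCount];
        double rangeStep = 1.0 / (splitPointCount + 1);
        for (int i = 0; i < splitPointCount; i++)
        {
            // Evenly spaced ideal position across the full token range...
            long ideal = (long) (Long.MIN_VALUE + (i + 1) * rangeStep * 2.0 * (double) Long.MAX_VALUE);
            // ...snapped to the nearest existing token so shard boundaries line up with node boundaries.
            result[i] = nearest(sortedTokens, ideal);
        }
        return result;
    }

    private static long nearest(List<Long> sorted, long target)
    {
        int idx = Collections.binarySearch(sorted, target);
        if (idx >= 0)
            return sorted.get(idx);
        int insertion = -idx - 1;
        if (insertion == 0)
            return sorted.get(0);
        if (insertion == sorted.size())
            return sorted.get(sorted.size() - 1);
        long below = sorted.get(insertion - 1);
        long above = sorted.get(insertion);
        // Compare distances as doubles to avoid overflow across the token range.
        return (double) target - (double) below <= (double) above - (double) target ? below : above;
    }

    public static void main(String[] args)
    {
        // Four tokens, three shards: both split points are drawn from the token list.
        List<Long> tokens = new ArrayList<>(List.of(-8_000_000_000_000_000_000L, -3_000_000_000_000_000_000L,
                                                    2_000_000_000_000_000_000L, 7_000_000_000_000_000_000L));
        for (long t : computeSplitPoints(tokens, 3))
            System.out.println(t);
    }
}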
@@ -40,7 +40,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class ShardManagerNodeAwareTest
public class ShardManagerTokenAwareTest
{

@Test
@@ -50,7 +50,7 @@ public void testRangeEndsForShardCountEqualtToNumTokensPlusOne() throws UnknownH
{
var rs = buildStrategy(numTokens, 1, 1, 1);
var expectedTokens = rs.getTokenMetadata().sortedTokens();
var shardManager = new ShardManagerNodeAware(rs);
var shardManager = new ShardManagerTokenAware(rs);

var shardCount = numTokens + 1;
var iterator = shardManager.boundaries(shardCount);
@@ -81,7 +81,8 @@ public void testRangeEndsAreFromTokenListAndContainLowerRangeEnds() throws Unkno
var initialSplitPoints = rs.getTokenMetadata().sortedTokens();
// Confirm test set up is correct.
assertEquals(numTokensPerNode * nodeCount, initialSplitPoints.size());
var shardManager = new ShardManagerNodeAware(rs);
// Use a shared instance to exercise the split point cache across shard counts.
var shardManager = new ShardManagerTokenAware(rs);

// The tokens for one level lower.
var lowerTokens = new ArrayList<Token>();

1 comment on commit 50b548d

@cassci-bot
Build rejected: 6 NEW test failure(s) in 2 builds. Build 2: ran 17581 tests with 13 failures and 128 skipped.
Butler analysis done on ds-cassandra-pr-gate/node-aware-shard-manager vs last 16 runs of ds-cassandra-build-nightly/main.
org.apache.cassandra.index.sai.cql.QueryWriteLifecycleTest.testWriteLifecycle[ca_CompositePartitionKeyDataModel{primaryKey=p1, p2}]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.TinySegmentQueryWriteLifecycleTest.testWriteLifecycle[aa_CompoundKeyWithStaticsDataModel{primaryKey=p, c}]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.disk.vector.VectorCompressionTest.testOpenAiV3Small: test failed in the recent build. No failures on upstream;
branch story: [F+] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.disk.vector.VectorCompressionTest.testAda002: test failed in the recent build. No failures on upstream;
branch story: [F+] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.disk.vector.VectorCompressionTest.testOpenAiV3Large: test failed in the recent build. No failures on upstream;
branch story: [F+] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.VectorSiftSmallTest.testMultiSegmentBuild: test is constantly failing. No failures on upstream;
branch story: [FF] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
butler comparison
