Implement ShardManagerTokenAware class to split shards along node-token boundaries #1255

Open · wants to merge 24 commits into base: main

Conversation

michaeljmarshall (Member)

No description provided.

@michaeljmarshall (Member Author)

@blambov - please take a look. This is my initial take on the ShardManagerNodeAware. I left several TODOs in the code as questions for you. I haven't had a chance to do any testing yet. I am looking to get general feedback on the direction and on my understanding of what is necessary here. Thanks.

// Need to allocate tokens within node boundaries.
var endpoints = tokenMetadata.getAllEndpoints();
double additionalSplits = splitPointCount - sortedTokens.size();
var splitPointsPerNode = (int) Math.ceil(additionalSplits / endpoints.size());

To generate additionalSplits many tokens the way that the allocation strategy would do it when new nodes are added, we need to do the following:

  • First, get the number of tokens the cluster uses per node, tokensPerNode. This should be sortedTokens.size() / endPoints.size() or DatabaseDescriptor.getNumTokens() (these two are expected to be the same; we should bail out if they aren't, or rely on an explicitly specified value in the UCS configuration).
  • Set splitPointNodes = Math.ceil(additionalSplits / tokensPerNode).
  • Create splitPointNodes many new fake node ids, which need to be assigned in racks round-robin.
  • Get the token allocation to assign tokensPerNode many tokens for each of these fake nodes (i.e. use TokenAllocation.create(...) and then repeatedly call allocate on that object). See OfflineTokenAllocator.MultiNodeAllocator, which starts from empty and generates tokens for the given number of nodes in each rack; we want a variation of this which starts with TokenMetadata that matches the current, and then continues adding fake nodes until the required number are generated.
  • We can then flatten the generated token list and truncate it to exactly the required number of extra tokens, then concatenate it at the end of the original tokens and sort (i.e. prefer just choosing the first additionalSplits many from the new instead of using the selection scheme below).
  • The generated tokens should be cached.
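
A minimal sketch of that generation loop; fakeNodeInRack, the allocation field, and the method name are placeholders for whatever the final IsolatedTokenAllocator-style code ends up exposing, not the actual API:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.locator.InetAddressAndPort;

    private List<Token> nodeAlignedSplitCandidates(List<Token> sortedTokens, int splitPointCount, int endpointCount)
    {
        int additionalSplits = splitPointCount - sortedTokens.size();
        // Expected to equal DatabaseDescriptor.getNumTokens(); bail out if the two disagree.
        int tokensPerNode = sortedTokens.size() / endpointCount;
        int splitPointNodes = (int) Math.ceil((double) additionalSplits / tokensPerNode);

        List<Token> generated = new ArrayList<>();
        for (int n = 0; n < splitPointNodes; n++)
        {
            // Fake node ids are assigned to racks round-robin.
            InetAddressAndPort fakeNode = fakeNodeInRack(n);   // hypothetical helper
            generated.addAll(allocation.allocate(fakeNode));   // tokensPerNode tokens per fake node
        }

        // Take exactly the extra tokens needed, append them to the existing tokens, and sort.
        List<Token> result = new ArrayList<>(sortedTokens);
        result.addAll(generated.subList(0, additionalSplits));
        Collections.sort(result);
        return result;
    }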

Member Author

I was thinking about it the wrong way. I was focused on finding the right splits for the existing nodes instead of just adding fake new nodes until we have enough tokens. This seems more straightforward.

Member Author

We can then flatten the generated token list and truncate it to exactly the required number of extra tokens, then concatenate it at the end of the original tokens and sort (i.e. prefer just choosing the first additionalSplits many from the new instead of using the selection scheme below).

I would have thought the selection scheme would give us better data distribution, as opposed to truncating the list of new tokens. Also, if we truncate the list, does that present issues for ensuring that higher levels of UCS have the same splits as lower levels?

Member Author

I just pushed up a new commit with a proposed implementation. I plan to write tests for it tomorrow.


if we truncate the list, does that present issues for ensuring that higher levels of UCS have the same splits as lower levels?

No, it does not, because the same tokens will be generated again when we generate the higher-count levels (in other words, we can cache the smaller generation round (pre-truncation) and use it to save some work when generating the bigger).

I would have thought the selection scheme would give us better data distribution, as opposed to truncating the list of new tokens

The token allocation gives the biggest-impact token first, which is also the one that splits the current biggest token range, so taking them in order should be a good enough choice and we can save the effort. Also, I'm not sure that looking for the closest-to-even split can't cause bigger oddities.
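
One possible shape for that cache, sketched here with assumed names (a generatedTokens field and an allocateTokensForNextFakeNode step, neither of which exists yet):

    // Keep everything generated so far; only allocate more when a bigger round is requested,
    // and truncate to the exact count on the way out.
    private final List<Token> generatedTokens = new ArrayList<>();

    private List<Token> additionalTokens(int additionalSplits)
    {
        while (generatedTokens.size() < additionalSplits)
            generatedTokens.addAll(allocateTokensForNextFakeNode()); // hypothetical per-fake-node allocation
        return generatedTokens.subList(0, additionalSplits);
    }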

}
}

return nodeAlignedSplitPoints;

We can't permit multiple evenSplitPoints to map to the same nodeAlignedSplitPoint, because this effectively reduces the number of split points.

One thing we can do is remove entries from the node-aligned list when we use them, and enforce that the ones picked for smaller shard counts are also picked by recursively generating for shardCount / 2 first until shardCount is not divisible by 2.

Member Author

I am pretty sure I already did this, though it might not be in the ideal way. As we iterate over sortedTokens, we find the position of the nearest neighbor, and then we set pos to the index of the next token in nodeAlignedSplitPoints. We then use pos as the lower bound in the binary search to find the closest split point. I haven't confirmed that this solution maintains the UCS rule that the splitting points for a given density are also splitting points for all higher densities.

I can see that the recursive solution would trivially ensure the UCS requirement that the splitting points for a given density are also splitting points for all higher densities. Do you think we should prefer that approach?

Member Author

I think that mine works assuming we have the power-of-two token allocator.


Yes, I think this could work too (if we don't select a specific token, it would be because we selected it for a different split point). Added a couple of comments on ensuring no repetition, and not exhausting the sorted tokens too early.

This method needs to be unit-tested.

Member Author

I hit a bug in this part of the code while adding unit tests, and I think it would be better to have a cleaner and clearer design. In general, I don't yet see how we guarantee that we select the same tokens at successively higher levels as tokens are added to and removed from the sorted tokens list. Is it safe to assume a certain

One thing we can do is remove entries from the node-aligned list when we use them, and enforce that the ones picked for smaller shard counts are also picked by recursively generating for shardCount / 2 first until shardCount is not divisible by 2.

I tried implementing this, but I ran into trouble because I assumed I would have enough split points if I broke the space up by the nearest token to the midpoint:

    private void findTokenAlignedSplitPoints(Token[] sortedTokens, int min, int max, Token left, Token right, int splitPointCount, List<Token> splitPoints)
    {
        splitPointCount--;
        var midpoint = partitioner.midpoint(left, right);
        var index = Arrays.binarySearch(sortedTokens, min, max, midpoint);
        if (index < 0)
        {
            // -(insertion point) - 1
            System.out.println("Index: " + index + " midpoint: " + midpoint);
            index = -index - 1;
            if (index != 0 && index != sortedTokens.length - 1)
            {
                // Check to see which neighbor is closer
                var leftNeighbor = sortedTokens[index - 1];
                var rightNeighbor = sortedTokens[index];
                index = leftNeighbor.size(midpoint) <= midpoint.size(rightNeighbor) ? index - 1 : index;
            }
        }
        var tokenAlignedMidpoint = sortedTokens[index];
        var leftSplitPointCount = splitPointCount / 2;
        var rightSplitPointCount = splitPointCount - leftSplitPointCount;
        if (leftSplitPointCount > 0)
            findTokenAlignedSplitPoints(sortedTokens, min, index, left, tokenAlignedMidpoint, leftSplitPointCount, splitPoints);
        // Add this split point after finding all split points to the left
        splitPoints.add(tokenAlignedMidpoint);
        if (rightSplitPointCount > 0)
            findTokenAlignedSplitPoints(sortedTokens, index + 1, max, tokenAlignedMidpoint, right, rightSplitPointCount, splitPoints);
    }

That code doesn't work because there is no guarantee that the sorted tokens are split equally. Is that design close to what you were thinking? Or do I literally need to remove tokens from sortedTokens and then, at each successive recursive call, take the next power of two worth of tokens until I exhaust sortedTokens?


Yes, I did have literal removal from the list in mind. It seems, however, that we can make your original solution work by making sure we always report the right number of splits, by doing something like this:

        int pos = 0;
        for (int i = 0; i < evenSplitPoints.length; i++)
        {
            int min = pos;
            int max = sortedTokens.length - evenSplitPoints.length + i;
            Token value = evenSplitPoints[i];
            pos = Arrays.binarySearch(sortedTokens, min, max, value);
            if (pos < 0)
                pos = -pos - 1;

            if (pos == min)
            {
                // No left neighbor, so choose the right neighbor
                nodeAlignedSplitPoints[i] = sortedTokens[pos];
                pos++;
            }
            else if (pos == max)
            {
                // No right neighbor, so choose the left neighbor
                // This also means that for all greater indexes we don't have a choice.
                for (; i < evenSplitPoints.length; ++i)
                    nodeAlignedSplitPoints[i] = sortedTokens[pos++ - 1];
            }
            else
            {
                // Check the neighbors
                Token leftNeighbor = sortedTokens[pos - 1];
                Token rightNeighbor = sortedTokens[pos];

                // Choose the nearest neighbor. By convention, prefer left if value is midpoint, but don't
                // choose the same token twice.
                if (leftNeighbor.size(value) <= value.size(rightNeighbor))
                {
                    nodeAlignedSplitPoints[i] = leftNeighbor;
                    // No need to bump pos because we decremented it to find the right split token.
                }
                else
                {
                    nodeAlignedSplitPoints[i] = rightNeighbor;
                    pos++;
                }
            }
        }

Could you run this through your test to see if it works? If not, could you upload the test so that I can play with it?

Member Author

That passed my test, thank you! I'll add some more tests proving that as we add nodes, the split points continue to be aligned.

Member Author

This algorithm fails to choose the same tokens if we add one shard at a time. For example, given sorted tokens:

[-9193912203636246467, -4846620020038852605, -1955575638654777768, 1845313162618248341, 4442481910831405714, 7317158111889931131]

3 shards chooses split points [-1955575638654777768, 1845313162618248341]
4 shards chooses split points [-4846620020038852605, 1845313162618248341, 4442481910831405714]

Observe that the 3 split points (for 4 shards) do not contain the 2 split points (for 3 shards). I think this is acceptable because the shard count is always baseShardCount * 2 ^ n. Is that correct? I'll push up a test shortly that confirms this works with power of 2 growth.
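
A sketch of what that power-of-two check could look like, using JUnit's assertTrue; splitPoints, baseShardCount, and maxShardCount are stand-ins for the actual test's API, not real names:

    // The split points chosen for n shards must all be chosen again for 2n shards.
    for (int shards = baseShardCount; shards * 2 <= maxShardCount; shards *= 2)
    {
        Set<Token> lower = new HashSet<>(splitPoints(shards));
        Set<Token> higher = new HashSet<>(splitPoints(shards * 2));
        assertTrue("split points for " + shards + " shards should also be split points for " + (shards * 2),
                   higher.containsAll(lower));
    }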

Member Author

I just pushed a commit with testRangeEndsAreFromTokenListAndContainLowerRangeEnds to show that this works for powers of 2 shard counts.


Yeah, we are only guaranteed that findNearest(x) is a subset of findNearest(y) when x is a subset of y. The logic is that if we couldn't select the closest value to a point, it is only because we have already picked it for another point. This shouldn't be too hard to prove formally if we need to do that (I did check if ChatGPT 4o is smart enough to do it, without success).

@michaeljmarshall (Member Author)

@blambov - do you mind taking another look? Thanks!

Collection<Token> tokens = allocation.allocate(fakeNodeAddressAndPort);

// Validate ownership stats
validateAllocation(nodeId, rackId);

The validation only serves to ensure that the allocator's understanding of the ring is the same as the node's. After the initial testing (when we are happy that we have set the allocator up correctly), we should drop the validation to save some effort.

final Map<InetAddressAndPort, String> nodeByRack = new HashMap<>();

@Override
public String getRack(InetAddressAndPort endpoint)

Will this return the correct rack for the preexisting nodes?

Member Author

Great catch, it won't. I need to update it to check both nodeByRack and the real topology for the cluster.
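
A sketch of that fallback, assuming the snitch keeps a reference to the cluster's real snitch (the realSnitch field name is an assumption):

    @Override
    public String getRack(InetAddressAndPort endpoint)
    {
        // Fake nodes added during isolated allocation are tracked in nodeByRack;
        // anything else is a preexisting node, so defer to the real topology.
        String rack = nodeByRack.get(endpoint);
        return rack != null ? rack : realSnitch.getRack(endpoint);
    }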


* Fix NodeAlignedShardTrack to handle edge cases correctly.
* Improve IsolatedTokenAllocator design. Only return the
  exact number of requested tokens.
* Improve QuietSnitch by making it wrap the real snitch
* Update which snitch is used in TokenAllocation
* Add test coverage of IsolatedTokenAllocator
We were incorrectly starting with a 0 ratioToLeft arg and that meant the first array entry was always the min token.
@@ -67,6 +67,10 @@ public static List<Token> allocateTokens(int additionalSplits, AbstractReplicati
// order to retrieve the topology.
var localRacks = source.getTokenMetadata().cloneOnlyTokenMap().getTopology().getDatacenterRacks().get(localDc);
assert localRacks != null && !localRacks.isEmpty() : "No racks found for local datacenter " + localDc;
// TODO how concerned should we be about the order of the racks here? Seems like we need to do it the

This is currently okay, because in the RF=racks case the allocation in each rack is independent.

That said, in the longer term we need to find a way to fix the rack order; at the moment I can't think of an easy way to do it.

@michaeljmarshall michaeljmarshall marked this pull request as ready for review September 17, 2024 17:19
@michaeljmarshall (Member Author)

Marking as ready for review. There is still work to be done, but this will let tests run.

…hing

also, remove some logs that are no longer necessary
@michaeljmarshall michaeljmarshall changed the title from "WIP: Initial ShardManagerNodeAware implemenation" to "Implement ShardManagerTokenAware class to split shards along node-token boundaries" Sep 17, 2024
@blambov commented Sep 18, 2024

Could we temporarily change DEFAULT_IS_NODE_AWARE to true to run tests with it?

@michaeljmarshall (Member Author)

Looks like we hit an UnsupportedOperationException in several tests. Checking to see if it's an issue.

java.lang.UnsupportedOperationException: Token type BytesToken does not support token allocation.

	at org.apache.cassandra.dht.ByteOrderedPartitioner$BytesToken.size(ByteOrderedPartitioner.java:134)
	at org.apache.cassandra.db.compaction.ShardManagerTokenAware$TokenAlignedShardTracker.rangeSpanned(ShardManagerTokenAware.java:285)
	at org.apache.cassandra.db.compaction.ShardTracker.applyTokenSpaceCoverage(ShardTracker.java:78)
	at org.apache.cassandra.db.compaction.ShardManagerTokenAware$TokenAlignedShardTracker.applyTokenSpaceCoverage(ShardManagerTokenAware.java:305)
	at org.apache.cassandra.db.compaction.unified.ShardedMultiWriter.prepareToCommit(ShardedMultiWriter.java:256)
	at org.apache.cassandra.db.ColumnFamilyStore$Flush.flushMemtable(ColumnFamilyStore.java:1406)
	at org.apache.cassandra.db.ColumnFamilyStore$Flush.run(ColumnFamilyStore.java:1315)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)

Comment on lines +61 to +62
* The index of the shard this tracker is currently on. By convention, the shard index starts at -1 for the
* minimum token and increases by one for each shard.
Member Author

I just noticed this javadoc isn't right, and we have two different ways of counting index values. In ShardManagerTrivial's anonymous class, we return 0 always. In BoundaryTracker, we start at 0 and increment. In BoundaryTrackerDiskAware, we return -1 immediately after creation, but any skips will trigger it to go to 0.

My main point of confusion is what the shard index is. It's not the index in the backing array of split points, it's the index of the shard count. At the moment, it's only used for logging, so the stakes are fairly low here. I will try to find a conclusive answer tomorrow.

@michaeljmarshall (Member Author)

@blambov - at this point, the remaining test failures appear to be from disk boundaries attempting to combine with the token aware shard manager:

	Caused by: java.lang.IllegalArgumentException: Cannot use node aware strategy with disk boundaries
		at org.apache.cassandra.db.compaction.ShardManager.create(ShardManager.java:51)
		at org.apache.cassandra.db.compaction.UnifiedCompactionStrategy.maybeUpdateSelector(UnifiedCompactionStrategy.java:400)
		at org.apache.cassandra.db.compaction.UnifiedCompactionStrategy.createSSTableMultiWriter(UnifiedCompactionStrategy.java:351)
		at org.apache.cassandra.db.compaction.UnifiedCompactionContainer.createSSTableMultiWriter(UnifiedCompactionContainer.java:327)
		at org.apache.cassandra.db.ColumnFamilyStore.createSSTableMultiWriter(ColumnFamilyStore.java:735)
		at org.apache.cassandra.db.ColumnFamilyStore.createSSTableMultiWriter(ColumnFamilyStore.java:730)
		at org.apache.cassandra.db.memtable.Flushing.createFlushWriter(Flushing.java:303)
		at org.apache.cassandra.db.memtable.Flushing.flushRunnable(Flushing.java:135)
		at org.apache.cassandra.db.memtable.Flushing.flushRunnables(Flushing.java:96)
		at org.apache.cassandra.db.memtable.Flushing.flushRunnables(Flushing.java:73)
		at org.apache.cassandra.db.ColumnFamilyStore$Flush.flushMemtable(ColumnFamilyStore.java:1360)
		at org.apache.cassandra.db.ColumnFamilyStore$Flush.run(ColumnFamilyStore.java:1315)

I think 073452c was a legitimate issue but 43624fc seems a bit more questionable.

What are your thoughts?

@blambov commented Sep 19, 2024

The "java.lang.UnsupportedOperationException: Token type BytesToken does not support token allocation" error normally means the test is using ByteOrderedPartitioner. Since token allocation cannot work with it either, there's no point in trying to fix the tests for that.

Actually, we should not instantiate ShardManagerTokenAware if the partitioner for the table does not support splitting/sizing (!partitioner.splitter().isPresent()), and we should probably push its selection to after the test-specific adjustments here.
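
A sketch of that guard; the method name, the isNodeAware flag, and the factory parameters are illustrative, not the actual ShardManager.create signature:

    import java.util.function.Supplier;

    import org.apache.cassandra.dht.IPartitioner;

    static ShardManager selectShardManager(boolean isNodeAware,
                                           IPartitioner partitioner,
                                           Supplier<ShardManager> tokenAwareFactory,
                                           Supplier<ShardManager> defaultFactory)
    {
        // ByteOrderedPartitioner and other non-splitting partitioners cannot size or split
        // token ranges, so token-aware sharding (like token allocation) does not apply to them.
        if (isNodeAware && partitioner.splitter().isPresent())
            return tokenAwareFactory.get();
        return defaultFactory.get();
    }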

@blambov commented Sep 19, 2024

If we do the above, neither of the two fixes above should be needed.

sonarcloud bot commented Sep 19, 2024