Sort by dw_last_updated before materializing #334

Draft: wants to merge 5 commits into base: main
Changes from 3 commits
9 changes: 8 additions & 1 deletion deltacat/compute/compactor_v2/steps/merge.py
@@ -41,7 +41,10 @@
interface as unimplemented_deltacat_storage,
)
from deltacat.compute.compactor_v2.utils.dedupe import drop_duplicates
from deltacat.constants import BYTES_PER_GIBIBYTE
from deltacat.constants import (
BYTES_PER_GIBIBYTE,
DW_LAST_UPDATED_COLUMN_NAME,
)
from deltacat.compute.compactor_v2.constants import (
MERGE_TIME_IN_SECONDS,
MERGE_SUCCESS_COUNT,
@@ -162,6 +165,10 @@ def _merge_tables(
final_table = pa.concat_tables(result_table_list)
final_table = final_table.drop([sc._PK_HASH_STRING_COLUMN_NAME])

# TODO: Retrieve sort order policy from the table version metadata instead of hard-coding.
if DW_LAST_UPDATED_COLUMN_NAME in final_table.column_names:
final_table = final_table.sort_by([(DW_LAST_UPDATED_COLUMN_NAME, "descending")])
Comment on lines +169 to +170
Collaborator
Assuming the previously compacted table will already be sorted, we can further optimize by only sorting the incremental table.
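
Roughly, the suggested optimization might look like the sketch below: sort only the incremental rows, then do a linear merge against the previously compacted table, which is assumed to already be in descending dw_last_updated order. The function and variable names here are illustrative, not existing deltacat APIs, and null timestamps are assumed not to occur.

```python
import pyarrow as pa

DW_LAST_UPDATED_COLUMN_NAME = "dw_last_updated"  # mirrors deltacat.constants


def merge_presorted_with_incremental(
    compacted: pa.Table, incremental: pa.Table
) -> pa.Table:
    # Sort only the (typically much smaller) incremental table; the compacted
    # table is assumed to already be sorted descending on dw_last_updated.
    incremental = incremental.sort_by(
        [(DW_LAST_UPDATED_COLUMN_NAME, "descending")]
    )
    left = compacted.column(DW_LAST_UPDATED_COLUMN_NAME).to_pylist()
    right = incremental.column(DW_LAST_UPDATED_COLUMN_NAME).to_pylist()

    # Two-pointer merge of the two descending runs, producing take indices
    # into the concatenation [compacted rows..., incremental rows...].
    indices, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        if left[i] >= right[j]:
            indices.append(i)
            i += 1
        else:
            indices.append(len(left) + j)
            j += 1
    indices.extend(range(i, len(left)))
    indices.extend(range(len(left) + j, len(left) + len(right)))

    return pa.concat_tables([compacted, incremental]).take(indices)
```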

Contributor Author
I think I understand. Would you mind linking a code reference?

Contributor Author (@DmitriGekhtman), Jul 16, 2024
Ah, my bad, I just looked for myself.
Yes, I see. For the purposes of a POC, perhaps you can do that in a separate PR. It would indeed be good to avoid wasting time sorting an already-sorted table.

Collaborator
I now realize we can use the sort_keys argument to sort the incremental table instead of hardcoding it here. This was okay for a POC, but we want to limit this kind of hardcoding to our internal packages.
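
For reference, a rough sketch of what declaring the ordering via sort keys instead of the hardcoded column might look like. It assumes SortKey.of accepts a sort order argument and that SortKey exposes key_name/sort_order accessors, so the exact signatures may differ from deltacat's actual storage model.

```python
from deltacat.storage import SortKey, SortOrder  # exact import path/signature assumed

# Hypothetical: declare the ordering on the table version / compaction params
# instead of referencing DW_LAST_UPDATED_COLUMN_NAME inside merge.py.
sort_keys = [
    SortKey.of(key_name="dw_last_updated", sort_order=SortOrder.DESCENDING),
]

# merge.py could then sort by whatever keys were configured, e.g.:
# final_table = final_table.sort_by(
#     [
#         (k.key_name, "descending" if k.sort_order == SortOrder.DESCENDING else "ascending")
#         for k in sort_keys
#     ]
# )
```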


return final_table


2 changes: 2 additions & 0 deletions deltacat/constants.py
@@ -53,3 +53,5 @@
PYARROW_INFLATION_MULTIPLIER_ALL_COLUMNS = 6

MEMORY_TO_HASH_BUCKET_COUNT_RATIO = 0.0512 * BYTES_PER_TEBIBYTE

DW_LAST_UPDATED_COLUMN_NAME = "dw_last_updated"
46 changes: 41 additions & 5 deletions deltacat/tests/compute/compact_partition_rebase_test_cases.py
@@ -1,4 +1,6 @@
import datetime as dt
import pyarrow as pa
from deltacat.constants import DW_LAST_UPDATED_COLUMN_NAME
from deltacat.tests.compute.test_util_common import (
PartitionKey,
PartitionKeyType,
@@ -65,20 +67,54 @@ class RebaseCompactionTestCaseParams(BaseCompactorTestCase):
],
names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
),
expected_terminal_compact_partition_result=pa.Table.from_arrays(
expected_terminal_compact_partition_result=pa.Table.from_arrays([]),
expected_terminal_exception=None,
expected_terminal_exception_message=None,
do_create_placement_group=False,
records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
read_kwargs_provider=None,
drop_duplicates=True,
skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
),
"2-rebase-sort": RebaseCompactionTestCaseParams(
primary_keys={"pk_col_1"},
sort_keys=[
SortKey.of(key_name="sk_col_1"),
SortKey.of(key_name="sk_col_2"),
],
partition_keys=[PartitionKey.of("region_id", PartitionKeyType.INT)],
partition_values=["1"],
input_deltas=pa.Table.from_arrays(
[
pa.array([str(i) for i in range(10)]),
pa.array([i for i in range(20, 30)]),
pa.array([i for i in range(0, 10)]),
pa.array(["foo"] * 10),
pa.array([i / 10 for i in range(40, 50)]),
pa.array([i / 10 for i in range(10, 20)]),
pa.array(dt.datetime(year, 1, 1) for year in range(2000, 2010)),
],
names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1", DW_LAST_UPDATED_COLUMN_NAME],
),
input_deltas_delta_type=DeltaType.UPSERT,
# dw_last_update is in ascending order in the input table.
# Expect descending sort on dw_last_updated for each hash bucket.
# Since there is only one hash bucket, the order of input rows should be reversed.
rebase_expected_compact_partition_result=pa.Table.from_arrays(
[
pa.array([str(i) for i in reversed(range(10))]),
pa.array([i for i in reversed(range(0, 10))]),
pa.array(["foo"] * 10),
pa.array([i / 10 for i in reversed(range(10, 20))]),
pa.array(dt.datetime(year, 1, 1) for year in reversed(range(2000, 2010))),
],
names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1", DW_LAST_UPDATED_COLUMN_NAME],
),
expected_terminal_compact_partition_result=pa.Table.from_arrays([]),
expected_terminal_exception=None,
expected_terminal_exception_message=None,
do_create_placement_group=False,
records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
Collaborator
The max records per file is 4M, but your test case doesn't have more than 4M records, so it only tests sorting within a single file and not across multiple files.

Contributor Author
Would you recommend reducing DEFAULT_MAX_RECORDS_PER_FILE to a number smaller than the row count of the input_deltas table?

Contributor Author
(At least, for the new test case?)
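
To make the ordering check span file boundaries, one option is to give the new case a records_per_compacted_file smaller than the input row count. A tiny standalone illustration of the idea (not deltacat code; the value 4 is a hypothetical override for the 10-row input):

```python
import datetime as dt
import pyarrow as pa

RECORDS_PER_FILE = 4  # hypothetical override, smaller than the 10 input rows

table = pa.table(
    {"dw_last_updated": [dt.datetime(y, 1, 1) for y in range(2000, 2010)]}
).sort_by([("dw_last_updated", "descending")])

# Split the sorted table into file-sized slices, then verify the global
# descending order still holds when the slices are read back in sequence.
slices = [
    table.slice(offset, RECORDS_PER_FILE)
    for offset in range(0, table.num_rows, RECORDS_PER_FILE)
]
assert [s.num_rows for s in slices] == [4, 4, 2]
assert pa.concat_tables(slices).equals(table)
```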

hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
hash_bucket_count=1,
read_kwargs_provider=None,
drop_duplicates=True,
skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
2 changes: 2 additions & 0 deletions deltacat/tests/compute/compact_partition_test_cases.py
@@ -1,5 +1,7 @@
import datetime as dt
import pyarrow as pa
from typing import Callable, Dict, List, Optional, Set, Tuple, Union
from deltacat.constants import DW_LAST_UPDATED_COLUMN_NAME
from deltacat.tests.compute.test_util_common import (
offer_iso8601_timestamp_list,
PartitionKey,
27 changes: 17 additions & 10 deletions deltacat/tests/compute/test_compact_partition_rebase.py
@@ -5,6 +5,7 @@
import boto3
from boto3.resources.base import ServiceResource
import pyarrow as pa
from deltacat.constants import DW_LAST_UPDATED_COLUMN_NAME
from deltacat.io.ray_plasma_object_store import RayPlasmaObjectStore
from pytest_benchmark.fixture import BenchmarkFixture

@@ -274,16 +275,22 @@ def test_compact_partition_rebase_same_source_and_destination(
compacted_delta_locator, storage_type=StorageType.LOCAL, **ds_mock_kwargs
)
actual_rebase_compacted_table = pa.concat_tables(tables)
Collaborator
We could add an assertion here if the result is expected to yield multiple tables.

Contributor Author
If I understand correctly, the suggestion is to add an assertion on len(tables)?
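
Something along those lines, presumably; a minimal sketch of the suggested check, assuming the rebase case is expected to materialize more than one table (which depends on the records-per-file setting discussed above):

```python
# Hypothetical assertion: fail loudly if compaction did not produce multiple
# materialized tables before concatenating them for the ordering check.
assert len(tables) > 1, f"expected multiple compacted tables, got {len(tables)}"
actual_rebase_compacted_table = pa.concat_tables(tables)
```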

# if no primary key is specified then sort by sort_key for consistent assertion
sorting_cols: List[Any] = (
[(val, "ascending") for val in primary_keys] if primary_keys else sort_keys
)
rebase_expected_compact_partition_result = (
rebase_expected_compact_partition_result.combine_chunks().sort_by(sorting_cols)
)
actual_rebase_compacted_table = (
actual_rebase_compacted_table.combine_chunks().sort_by(sorting_cols)
)
if DW_LAST_UPDATED_COLUMN_NAME in actual_rebase_compacted_table.column_names:
# If DW_LAST_UPDATED_COLUMN_NAME is present, don't sort expected and actual tables;
# we want to assert on the order of the rows in the table, to validate sorting on timestamp.
pass
else:
# If DW_LAST_UPDATED_COLUMN_NAME is absent, sort by primary key for consistent assertion.
# Sort by sort_key if no primary key is specified.
sorting_cols: List[Any] = (
[(val, "ascending") for val in primary_keys] if primary_keys else sort_keys
)
rebase_expected_compact_partition_result = (
rebase_expected_compact_partition_result.combine_chunks().sort_by(sorting_cols)
)
actual_rebase_compacted_table = (
actual_rebase_compacted_table.combine_chunks().sort_by(sorting_cols)
)
assert actual_rebase_compacted_table.equals(
rebase_expected_compact_partition_result
), f"{actual_rebase_compacted_table} does not match {rebase_expected_compact_partition_result}"