Sort by dw_last_updated before materializing #334

Draft
wants to merge 5 commits into
base: main

DmitriGekhtman
Contributor

When handling compaction of an upsert delta, sort the table descending by dw_last_update_time prior to materializing the result.

@github-actions github-actions bot left a comment

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

| Benchmark suite | Current: 9ff29ca | Previous: d77a208 | Ratio |
|---|---|---|---|
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[1-incremental-pkstr-sknone-norcf_V1]` | 1.0831294944099468 iter/sec (stddev: 0) | 2.2521535582165826 iter/sec (stddev: 0) | 2.08 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[2-incremental-pkstr-skstr-norcf_V2]` | 1.1254497739035378 iter/sec (stddev: 0) | 2.266440743018205 iter/sec (stddev: 0) | 2.01 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[4-incremental-duplicate-pk_V2]` | 1.111115950638335 iter/sec (stddev: 0) | 2.3100160932581546 iter/sec (stddev: 0) | 2.08 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[5-incremental-decimal-pk-simple_V2]` | 0.9679329973393876 iter/sec (stddev: 0) | 2.2460476215775427 iter/sec (stddev: 0) | 2.32 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[6-incremental-integer-pk-simple_V1]` | 1.1245483151888511 iter/sec (stddev: 0) | 2.2710662045823544 iter/sec (stddev: 0) | 2.02 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[8-incremental-decimal-timestamp-pk-multi_V1]` | 1.1231242805279775 iter/sec (stddev: 0) | 2.264246475800043 iter/sec (stddev: 0) | 2.02 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[9-incremental-decimal-pk-multi-dup_V1]` | 1.1232345724879993 iter/sec (stddev: 0) | 2.294583661883228 iter/sec (stddev: 0) | 2.04 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[10-incremental-decimal-pk-partitionless_V2]` | 1.1195996481837251 iter/sec (stddev: 0) | 2.253822151056081 iter/sec (stddev: 0) | 2.01 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[11-incremental-decimal-hash-bucket-single_V1]` | 0.9619740674455397 iter/sec (stddev: 0) | 2.2273418119238033 iter/sec (stddev: 0) | 2.32 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[11-incremental-decimal-hash-bucket-single_V2]` | 1.099635798204431 iter/sec (stddev: 0) | 2.2643017544376827 iter/sec (stddev: 0) | 2.06 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[12-incremental-decimal-single-hash-bucket_V1]` | 1.1296077565953353 iter/sec (stddev: 0) | 2.297777655168341 iter/sec (stddev: 0) | 2.03 |
| `deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[12-incremental-decimal-single-hash-bucket_V2]` | 1.2816422705675998 iter/sec (stddev: 0) | 2.6295975732771018 iter/sec (stddev: 0) | 2.05 |

This comment was automatically generated by workflow using github-action-benchmark.
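
For readers checking the alert by hand, here is a minimal sketch of how the Ratio column appears to be derived. It assumes the ratio is the previous throughput divided by the current throughput (which matches the reported rows) and that a row is flagged when the ratio exceeds the threshold of 2. The function name `slowdown_ratio` is hypothetical and is not part of github-action-benchmark.

```python
# Values copied from the first row of the benchmark report above.
current_ips = 1.0831294944099468   # iterations/sec for the current commit
previous_ips = 2.2521535582165826  # iterations/sec for the previous commit
ALERT_THRESHOLD = 2.0              # threshold quoted in the alert text


def slowdown_ratio(previous: float, current: float) -> float:
    """Previous throughput over current throughput (assumed definition).

    A value above 1 means the current commit completes fewer iterations
    per second than the previous one.
    """
    return previous / current


ratio = slowdown_ratio(previous_ips, current_ips)
# Assumption: a row is flagged when the ratio is strictly above the threshold.
is_regression = ratio > ALERT_THRESHOLD
print(f"{ratio:.2f}", is_regression)
```

Under these assumptions the first row works out to roughly a 2x slowdown, consistent with the reported ratio.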

Collaborator

@raghumdani raghumdani left a comment

Looks good overall. Please address a few minor comments.

Comment on lines +169 to +170
if DW_LAST_UPDATED_COLUMN_NAME in final_table.column_names:
    final_table = final_table.sort_by([(DW_LAST_UPDATED_COLUMN_NAME, "descending")])
Collaborator

Assuming the previously compacted table will already be sorted, we can further optimize by only sorting the incremental table.

Contributor Author

I think I understand. Would you mind linking a code reference?

Contributor Author

@DmitriGekhtman DmitriGekhtman Jul 16, 2024

Ah, my bad, used my eyeballs.
Yes, I see. For the purposes of a POC, perhaps you can do that in a separate PR. Would indeed be good to avoid wasting time sorting a sorted table.
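
A minimal sketch of the optimization discussed in this thread, in plain Python rather than pyarrow: if the previously compacted rows are already in descending order, only the incremental rows need sorting, and the two runs can then be merged in a single pass. The column name `dw_last_updated` and the helper `merge_sorted_descending` are assumptions for illustration. This is not deltacat's implementation, and it ignores upsert deduplication.

```python
import heapq
from operator import itemgetter

COLUMN = "dw_last_updated"  # assumed column name, for illustration only


def merge_sorted_descending(compacted_rows, incremental_rows, column=COLUMN):
    """Merge an already-sorted run with a freshly sorted incremental run.

    `compacted_rows` must already be in descending order of `column`;
    only `incremental_rows` is sorted here.
    """
    key = itemgetter(column)
    incremental_sorted = sorted(incremental_rows, key=key, reverse=True)
    # heapq.merge with reverse=True expects inputs sorted largest to smallest.
    return list(heapq.merge(compacted_rows, incremental_sorted, key=key, reverse=True))


compacted = [{"pk": "a", COLUMN: 30}, {"pk": "b", COLUMN: 10}]    # already sorted
incremental = [{"pk": "c", COLUMN: 20}, {"pk": "d", COLUMN: 40}]  # unsorted
merged = merge_sorted_descending(compacted, incremental)
print([row[COLUMN] for row in merged])
```

The merge step is linear in the total number of rows, so the sorting cost depends only on the size of the incremental input.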

Collaborator

I now realize we can use the sort_keys argument to sort the incremental table instead of hardcoding the column here. Hardcoding was okay for the POC, but we want to limit it to our internal packages.

),
expected_terminal_compact_partition_result=pa.Table.from_arrays([]),
expected_terminal_exception=None,
expected_terminal_exception_message=None,
do_create_placement_group=False,
records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
Collaborator

The max records per file is 4M, but your test case doesn't have more than 4M records, so it only tests sorting within a single file, not across multiple files.

Contributor Author

Would you recommend reducing DEFAULT_MAX_RECORDS_PER_FILE to a number smaller than the input_deltas table?

Contributor Author

(At least, for the new test case?)
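
To illustrate why the per-file limit matters for the test, here is a small sketch of how a limit below the input size forces output across several files. It assumes records are simply written in fixed-size chunks; the compactor's real file-splitting logic may differ, and `split_into_files` is a hypothetical helper.

```python
import math


def split_into_files(rows, records_per_file):
    """Chunk `rows` into consecutive lists of at most `records_per_file` rows."""
    if records_per_file <= 0:
        raise ValueError("records_per_file must be positive")
    return [rows[i:i + records_per_file] for i in range(0, len(rows), records_per_file)]


input_rows = list(range(10))  # stand-in for a 10-record input delta
records_per_file = 4          # deliberately smaller than the input size
files = split_into_files(input_rows, records_per_file)

# With a limit below the input size, the output spans several files.
assert len(files) == math.ceil(len(input_rows) / records_per_file)
print([len(f) for f in files])
```

With the 4M default and a small test input, the same arithmetic yields a single file, which is the gap the reviewer points out.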

@@ -274,16 +275,24 @@ def test_compact_partition_rebase_same_source_and_destination(
compacted_delta_locator, storage_type=StorageType.LOCAL, **ds_mock_kwargs
)
actual_rebase_compacted_table = pa.concat_tables(tables)
Collaborator

If the result should yield multiple tables, we can add an assertion for that.

Contributor Author

If I understand correctly, the suggestion is to add an assertion on len(tables)?
