
Add multi-column partitioning support and custom job write path#58

Open
tryhubyshyn-absa wants to merge 5 commits into AbsaOSS:develop from tryhubyshyn-absa:add-multi-partitioning-and-custom-job-write-path

Conversation

@tryhubyshyn-absa

Summary

This PR adds support for partitioning output tables by multiple columns (e.g., INFORMATION_DATE + VERSION) instead of just a single date column. This enables A/B testing and other use cases where data needs to be organized by multiple dimensions.

It's fully backward compatible: single-column partitioning continues to work as before, and all new config fields are optional.

Implementation details

To enable multi-column partitioning, I had to refactor dependency tracking and write verification functionality. Here is an overview of changes:

  • Dependency tracking with filters. When checking whether source data is available, we check for the specific partition combinations in that date range.

  • Completion checking with filters. When deciding whether a target table needs computation, we check for the exact partition combination (date + version), not just the date.

  • TableReader with filters. Transformations can now load specific partition variants from dependencies (e.g., the latest data for a specific version only).

  • Write verification using Delta history.

    The old approach counted records by filtering INFORMATION_DATE = <date>, which doesn't work with multi-column partitioning. The new approach uses Delta table version tracking:

    1. Get table version before write
    2. Write data
    3. Get version after write
    4. If changed, read row count from Delta history
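The steps above can be sketched in plain Python. This is a toy illustration, not the actual rialto code: the helper name `get_written_rows` and the dict shape of the history entries (mimicking rows of Delta's `DESCRIBE HISTORY`, whose `operationMetrics` map reports `numOutputRows` for write operations) are assumptions made for the example.

```python
# Toy sketch of the version-based write verification (not rialto code).
# `history` stands in for rows of `DESCRIBE HISTORY <table>`: each entry
# has an integer "version" and an "operationMetrics" map that contains
# "numOutputRows" for write operations.

def get_written_rows(history, version_before):
    """Rows written by the newest commit after `version_before` (0 if none)."""
    newer = [h for h in history if h["version"] > version_before]
    if not newer:
        return 0  # the table version didn't change, so nothing was written
    latest = max(newer, key=lambda h: h["version"])
    return int(latest["operationMetrics"]["numOutputRows"])


history = [
    {"version": 3, "operationMetrics": {"numOutputRows": "42"}},
    {"version": 2, "operationMetrics": {"numOutputRows": "100"}},
]
print(get_written_rows(history, version_before=2))  # 42 (our write)
print(get_written_rows(history, version_before=3))  # 0 (no new version)
```

In the real flow the two version numbers would be read from the Delta table before and after the write, and the row counts would come from its actual history.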
    

Collaborator

@vvancak vvancak left a comment

Looks good. A couple of clarifying questions from my side (our use case: tables partitioned by version and information date):

A: Last month we ran v1 and v2. This month, we're going to run v2 and v3. V3 will only run for next week's predictions, if we've correctly set the look-back period, right?

B: Are we breaking backwards compatibility of any of the current model's configs? I've seen some config schema changes, will we need to update configs when we bump rialto version, or is this backward-compatible?

  • (suggestion): Do we have any examples of how to set the filters in the dependencies in the readme file? This bit takes a bit of thinking to realize why it's needed, which leads me to think that we should document it a bit better.

raise ValueError(
    f"Filter column '{col}' not found in partitions of {table.get_table_path()}. "
    f"Available partition columns: {partition_df.columns}"
)
partition_df = partition_df.filter(partition_df[col] == val)
Collaborator

Don't we need to cast to string explicitly here? I've run into issues previously where comparing a column of datatype double containing value 1 with a config string "1" just didn't show equality. Explicitly casting the column to string will fix the issue.

Obviously it won't be an issue when the partition column is of string datatype, but generally I don't think we want to make that assumption.

Author

Good catch, I'll fix that

return None


def get_rows_from_history(spark: SparkSession, table_path: str, version: str) -> int:
Collaborator

I have to be honest, I don't get this. Would you please give me a TL;DR as to why we need to go into Delta table versions?

Author

Previously we looked at the number of rows with a given information_date, but with multiple partitions for different versions, the number of rows with that information_date includes writes that happened for both versions. Technically we could use filters here, but this solution is much simpler: we look at the last write to the table, make sure it happened within this session, and read the number of rows written.
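A toy example in plain Python (not the rialto implementation) of the over-count the old check runs into once a second partition column exists:

```python
# Illustration only: both versions share the same INFORMATION_DATE,
# so counting rows by date alone mixes the two versions' writes.

rows = [
    {"INFORMATION_DATE": "2024-06-01", "VERSION": "v1"},
    {"INFORMATION_DATE": "2024-06-01", "VERSION": "v1"},
    {"INFORMATION_DATE": "2024-06-01", "VERSION": "v2"},
]

# Old check: counts rows from BOTH versions' writes.
old_count = sum(r["INFORMATION_DATE"] == "2024-06-01" for r in rows)
print(old_count)  # 3

# What write verification actually needs is the size of the last
# write only, e.g. the v2 write:
v2_count = sum(
    r["INFORMATION_DATE"] == "2024-06-01" and r["VERSION"] == "v2"
    for r in rows
)
print(v2_count)  # 1
```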

Collaborator

I'm also not happy with this, and that's not only on you, but with the whole package we're getting too specific to how Databricks works, and that should not be the goal. I'd like to maintain as much abstraction as possible and mainly focus on Spark; otherwise we're not going to be able to keep up updating this every time something changes.

@tryhubyshyn-absa
Author

Looks good. A couple of clarifying questions from my side (our use case: tables partitioned by version and information date):

A: Last month we ran v1 and v2. This month, we're going to run v2 and v3. V3 will only run for next week's predictions, if we've correctly set the look-back period, right?

B: Are we breaking backwards compatibility of any of the current model's configs? I've seen some config schema changes, will we need to update configs when we bump rialto version, or is this backward-compatible?

  • (suggestion): Do we have any examples of how to set the filters in the dependencies in the readme file? This bit takes a bit of thinking to realize why it's needed, which leads me to think that we should document it a bit better.

A: Yes

B: No, it's fully backward-compatible; all existing configs will keep working without any changes.

*: Yes, I was also thinking of that. I have a half-written guide; I'll just add it here.

@tryhubyshyn-absa
Author

My main concern with adding more docs about how to use multi-column partitioning was that the readme is sparse and doesn't cover in much detail how things work. So I added a more general explanation of how the Runner works, and then added information specifically about multi-column partitioning.

Collaborator

@MDobransky MDobransky left a comment

Hi, sorry, I can't approve this. There are too many core changes, some of them breaking, e.g. checking completed jobs based on storage partition information, which is not always available (views don't have partitions, for example). Some of the user-facing interface, like the target config, is getting too cluttered for my comfort. And I don't really understand how multi-partition writes/overwrites work well enough to be comfortable approving this without seeing some demos.

In general I would prefer if the solution for your A/B testing problem didn't focus on partitions as much. I think the only problem you have is saving a b-version once you have an a-version on the same data, without losing the a-version data; it would be simpler to implement an append-write instead of partition handling.

Happy to discuss in a meeting.

:param table: input table path
:param date_until: Optional until date (inclusive)
:param uppercase_columns: Option to refactor all column names to uppercase
:param filters: Optional dict of column filters to apply before finding latest date
Collaborator

I get your point that it's applied before the date filter when this is an outer partition like in your use case, but once you give people an option to filter, they will filter. It would be faster to first select by date, which is always a physical partition, and then filter on anything else after that.
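A minimal sketch of the ordering suggested here, using hypothetical names (`latest_partition`, an in-memory partition list instead of a Spark DataFrame), just to show date pruning happening before the extra filters:

```python
# Illustration only: prune by the date partition first (always a
# physical partition), then apply the optional column filters.
import datetime

def latest_partition(partitions, date_until, filters):
    # 1. date pruning first
    candidates = [p for p in partitions if p["INFORMATION_DATE"] <= date_until]
    # 2. then the optional column filters, compared as strings
    for col, val in (filters or {}).items():
        candidates = [p for p in candidates if str(p[col]) == str(val)]
    # 3. latest remaining date, if any
    return max(candidates, key=lambda p: p["INFORMATION_DATE"], default=None)

parts = [
    {"INFORMATION_DATE": datetime.date(2024, 5, 1), "VERSION": "v1"},
    {"INFORMATION_DATE": datetime.date(2024, 6, 1), "VERSION": "v1"},
    {"INFORMATION_DATE": datetime.date(2024, 6, 1), "VERSION": "v2"},
]
print(latest_partition(parts, datetime.date(2024, 6, 30), {"VERSION": "v1"}))
# {'INFORMATION_DATE': datetime.date(2024, 6, 1), 'VERSION': 'v1'}
```

In Spark the same ordering matters because the date predicate enables partition pruning, while the other filters may need a scan.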

Comment on lines 54 to 61
def get_table(
self,
table: str,
date_column: str,
date_from: Optional[datetime.date] = None,
date_to: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
Collaborator

Please also add the functionality here; I think it makes sense to have it in both functions for consistency.

Comment on lines +82 to +88
partition_df = spark.sql(f"SHOW PARTITIONS {table.get_table_path()}")

if table.date_column not in partition_df.columns:
raise ValueError(
f"date_column '{table.date_column}' not found in partitions of {table.get_table_path()}. "
f"Available partition columns: {partition_df.columns}"
)
Collaborator

Sorry, I don't like this. It's a big change and can break things: we're using this to load a variety of tables, not all of which have the info_date as a Delta partition. There's a reason things were done in the simpler, dumb way: the Delta partition checking didn't always work. The interface of get_latest does not tell you that it requires something to be a partition on a storage system somewhere, nor should it. Also, this won't let you read views.

raise ValueError(
    f"Filter column '{col}' not found in partitions of {table.get_table_path()}. "
    f"Available partition columns: {partition_df.columns}"
)
partition_df = partition_df.filter(F.col(col).cast("string") == val)
Collaborator

I suspect that by doing this you can conclude that a partition doesn't exist for a certain date, i.e. rialto will think that the job didn't run for that date just because you filtered it out on a value other than info_date. And then you rerun it with dynamic partition overwrite on: what does this do to the original partition? Did you test that it won't remove any data the way we're saving it?
