Add multi-column partitioning support and custom job write path #58
Conversation
…iably detect writes to the specific target table.
vvancak
left a comment
Looks good. A couple of clarifying questions from my side (our use case: tables partitioned by version and information date):
A: Last month we ran v1 and v2. This month we're going to run v2 and v3. v3 will only run for next week's predictions, provided we've set the look-back period correctly, right?
B: Are we breaking backwards compatibility of any of the current model configs? I've seen some config schema changes; will we need to update configs when we bump the rialto version, or is this backward-compatible?
- (suggestion): Do we have any examples in the readme on how to set the filters in the dependencies? This bit takes some thinking to realize why it's needed, which leads me to think we should document it better.
rialto/runner/utils.py
Outdated
    f"Filter column '{col}' not found in partitions of {table.get_table_path()}. "
    f"Available partition columns: {partition_df.columns}"
)
partition_df = partition_df.filter(partition_df[col] == val)
Don't we need to cast to string explicitly here? I've run into issues previously where comparing a column of datatype double containing the value 1 against the config string "1" just didn't show equality. Explicitly casting the column to string fixes that.
Obviously it won't be an issue when the partition column is of string datatype, but in general I don't think we want to make that assumption.
Good catch, I'll fix that
return None


def get_rows_from_history(spark: SparkSession, table_path: str, version: str) -> int:
I have to be honest, I don't get this. Would you give me a TLDR as to why we need to go into Delta table versions, please?
Previously we looked at the number of rows with this information_date, but when we have multiple partitions with different versions, the number of rows with this information_date includes writes that happened for both versions. Technically we could use filters here, but this solution is much easier: we look at the last write to the table, make sure it happened within this session, and check the number of rows written.
I'm also not happy with this, and that's not only on you but we're getting too specific to how databricks work with the whole package and that should not be the goal. I'd like to maintain as much abstraction as possible and mainly focus on spark, otherwise we're not going to be able to keep up updating this every time something changes.
A: Yes
B: No, it's fully backward-compatible; all existing configs will keep working without any changes.
*: Yes, I was also thinking of that. I have a half-written instruction, I'll just add it here.

My main concern with adding more docs about how to use multi-column partitioning was that the readme is sparse and doesn't cover in much detail how things work. So I added a more general explanation of how Runner works, and then added information specifically about multi-column partitioning.
MDobransky
left a comment
Hi, sorry, I can't approve this. There are too many core changes, some of them breaking, e.g. checking completed jobs based on storage partition information, which is not always available (views, for example, don't have partitions). Some of the user-facing interface, like the target config, is getting too cluttered for my comfort. And I don't really understand how multi-partition writes/overwrites work well enough to approve this without seeing some demos.
In general, I would prefer if the solution for your A/B testing problem didn't focus on partitions as much. I think the only problem you have is saving a B version once you have an A version on the same data without losing the A-version data; it would be simpler to implement an append-write instead of partition handling.
Happy to discuss in a meeting.
:param table: input table path
:param date_until: Optional until date (inclusive)
:param uppercase_columns: Option to refactor all column names to uppercase
:param filters: Optional dict of column filters to apply before finding latest date
I get your point that it's applied before the date filter when this is an outer partition like in your use case, but once you give people an option to filter, they will filter, and it would be faster to first select the date, which is always a physical partition, and then filter on anything else after that.
def get_table(
    self,
    table: str,
    date_column: str,
    date_from: Optional[datetime.date] = None,
    date_to: Optional[datetime.date] = None,
    uppercase_columns: bool = False,
) -> DataFrame:
Please also add the functionality here; I think it makes sense to have it in both functions for consistency.
partition_df = spark.sql(f"SHOW PARTITIONS {table.get_table_path()}")

if table.date_column not in partition_df.columns:
    raise ValueError(
        f"date_column '{table.date_column}' not found in partitions of {table.get_table_path()}. "
        f"Available partition columns: {partition_df.columns}"
    )
Sorry, I don't like this. It's a big change and can break things: we're using this to load a variety of tables, not all of which have the info_date as a Delta partition. There's a reason things were done in the simpler, dumber way; the Delta partition checking didn't always work. The interface of get_latest does not tell you that it requires something to be a partition on a storage system somewhere, nor should it. Also, this won't let you read views.
    f"Filter column '{col}' not found in partitions of {table.get_table_path()}. "
    f"Available partition columns: {partition_df.columns}"
)
partition_df = partition_df.filter(F.col(col).cast("string") == val)
I suspect that by doing this you can determine that a partition doesn't exist for a certain date, i.e. rialto will think that the job didn't run for that date just because you filtered it out on a value other than info_date, and then you rerun it with dynamic partition overwrite on. What does this do to the original partition? Did you test that it won't remove any data the way we're saving it?
Summary
This PR adds support for partitioning output tables by multiple columns (e.g., INFORMATION_DATE + VERSION) instead of just a single date column. This enables A/B testing and other use cases where data needs to be organized by multiple dimensions. It's fully backward compatible: single-column partitioning continues to work as before, and all new config fields are optional.
Implementation details
To enable multi-column partitioning, I had to refactor dependency tracking and write verification functionality. Here is an overview of changes:
Dependency tracking with filters. When checking if source data is available, we need to check for specific partition combinations in that date range.
Completion checking with filters. When deciding if the target table needs computation, we check for the exact partition combination (date + version), not just the date.
TableReader with filters. Transformations can now load specific partition variants from dependencies (e.g., the latest data for a specific version only).
Write verification using Delta history. The old approach counted records by filtering INFORMATION_DATE = <date>; this doesn't work with multi-column partitioning. The new approach uses Delta table version tracking:
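A hedged sketch of the session guard this implies: record the table's Delta version at session start, and after the write only trust a row count from a history entry newer than that recorded version (names are illustrative, not the PR's actual helpers):

```python
def rows_written_this_session(history_entry, session_start_version: int) -> int:
    """history_entry is the newest row from DESCRIBE HISTORY on the
    target table. If the table's version hasn't advanced past what we
    recorded at session start, nothing was written by this run."""
    if history_entry["version"] <= session_start_version:
        return 0
    metrics = history_entry["operationMetrics"] or {}
    return int(metrics.get("numOutputRows", 0))
```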