From 49147e06001dc6fb6ea32e2f4864e9a28b9d96e0 Mon Sep 17 00:00:00 2001 From: Krishna R Maddikara Date: Sun, 15 Mar 2026 21:14:39 -0700 Subject: [PATCH 1/2] feat(bigquery,cubestore): Parquet pre-aggregation export with WIF-native URLs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem 1 — CSV.gz export is slow and expensive: BigQuery driver exports pre-aggregation data as CSV.gz. For large tables this means gigabytes of intermediate files. Parquet is 3-5x smaller and is the native CubeStore internal format. Problem 2 — getSignedUrl() requires SA key bytes (broken on GKE WIF): getSignedUrl() requires service account key bytes to sign URLs. WIF tokens from the metadata server cannot sign URLs. Pre-agg pipeline silently fails: BQ exports fine, CubeStore gets 403. Problem 3 — CubeStore cannot import Parquet (issue #3051): CubeStore only accepted CSV in its external import path. CubeStore already uses parquet/arrow internally for .chunk.parquet but CREATE TABLE ... WITH (input_format) lacked Parquet support. Fix: packages/cubejs-bigquery-driver/src/BigQueryDriver.ts: - Export format: CSV.gz -> PARQUET - URL generation: getSignedUrl() -> gs://bucket/object (IAM-authenticated) - Return key: csvFile -> parquetFile packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts: - Add importParquetFile() method - Add parquetFile branch in uploadTableWithIndexes() - Sends: CREATE TABLE t (...) WITH (input_format = 'parquet') LOCATION 'gs://...' rust/cubestore/cubestore/src/metastore/mod.rs: - Add ImportFormat::Parquet variant to enum rust/cubestore/cubestore/src/sql/mod.rs: - Parse input_format = 'parquet' in WITH clause rust/cubestore/cubestore/src/import/mod.rs: - Dispatch ImportFormat::Parquet to do_import_parquet() - Add do_import_parquet() using DataFusion ParquetRecordBatchReaderBuilder - Add arrow_array_value_to_string() helper for Arrow to TableValue conversion - Fix resolve_location() to handle gs:// URLs via GCS API with WIF token - Fix estimate_location_row_count() to skip fs::metadata() for remote URLs Works with Workload Identity when combined with the GCS WIF fix in gcs.rs. Backward compatible: Postgres/Snowflake/Redshift pre-aggs unaffected. Closes #3051 Closes #9837 --- .../src/BigQueryDriver.ts | 44 +-- .../src/CubeStoreDriver.ts | 10 + rust/cubestore/cubestore/src/import/mod.rs | 373 +++++++++++++++++- rust/cubestore/cubestore/src/metastore/mod.rs | 1 + rust/cubestore/cubestore/src/sql/mod.rs | 3 + 5 files changed, 396 insertions(+), 35 deletions(-) diff --git a/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts b/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts index 69b8aca39761b..50d9e6e3be3e3 100644 --- a/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts +++ b/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts @@ -342,35 +342,25 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface { } public async unload(table: string): Promise { - if (!this.bucket) { - throw new Error('Unload is not configured'); - } - - const destination = this.bucket.file(`${table}-*.csv.gz`); - const [schema, tableName] = table.split('.'); - const bigQueryTable = this.bigquery.dataset(schema).table(tableName); - const [job] = await bigQueryTable.createExtractJob(destination, { format: 'CSV', gzip: true }); - await this.waitForJobResult(job, { table }, false); - // There is an implementation for extracting and signing urls from S3 - // @see BaseDriver->extractUnloadedFilesFromS3() - // Please use that if you need. Here is a different flow - // because bigquery requires storage/bucket object for other things, - // and there is no need to initiate another one (created in extractUnloadedFilesFromS3()). - const [files] = await this.bucket.getFiles({ prefix: `${table}-` }); - const urls = await Promise.all(files.map(async file => { - const [url] = await file.getSignedUrl({ - action: 'read', - expires: new Date(new Date().getTime() + 60 * 60 * 1000), - }); - return url; - })); - - return { - exportBucketCsvEscapeSymbol: this.options.exportBucketCsvEscapeSymbol, - csvFile: urls, - }; + if (!this.bucket) { + throw new Error('Unload is not configured'); } + const destination = this.bucket.file(`${table}-*.parquet`); + const [schema, tableName] = table.split('.'); + const bigQueryTable = this.bigquery.dataset(schema).table(tableName); + const [job] = await bigQueryTable.createExtractJob(destination, { format: 'PARQUET' }); + await this.waitForJobResult(job, { table }, false); + const [files] = await this.bucket.getFiles({ prefix: `${table}-` }); + const bucketName = this.bucket.name; + const urls = files.map(file => `gs://${bucketName}/${file.name}`); + + return { + exportBucketCsvEscapeSymbol: this.options.exportBucketCsvEscapeSymbol, + parquetFile: urls, + }; +} + public async loadPreAggregationIntoTable( preAggregationTableName: string, loadSql: string, diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts index 2b6824842dd60..b65bd0a647a5f 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts @@ -245,6 +245,8 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { if (tableData.rowStream) { await this.importStream(columns, tableData, table, indexes, aggregations, queryTracingObj); + } else if (tableData.parquetFile) { + await this.importParquetFile(tableData, table, columns, indexes, aggregations, queryTracingObj); } else if (tableData.csvFile) { await this.importCsvFile(tableData, table, columns, indexes, aggregations, queryTracingObj); } else if (tableData.streamingSource) { @@ -295,6 +297,14 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { } } + private async importParquetFile(tableData: any, table: string, columns: Column[], indexes: any, aggregations: any, queryTracingObj?: any) { + const files = Array.isArray(tableData.parquetFile) ? tableData.parquetFile : [tableData.parquetFile]; + const columnNames = columns.map(c => `${this.quoteIdentifier(c.name)} ${this.fromGenericType(c.type)}`).join(', '); + const locationsClause = files.map((f: string) => `'${f}'`).join(', '); + const createTableSql = `CREATE TABLE ${table} (${columnNames}) ${indexes} ${aggregations} WITH (input_format = 'parquet') LOCATION ${locationsClause}`; + await this.query(createTableSql, [], queryTracingObj); + } + private async importCsvFile(tableData: DownloadTableCSVData, table: string, columns: Column[], indexes: any, aggregations: any, queryTracingObj?: any) { if (!columns || columns.length === 0) { throw new Error('Unable to import (as csv) in Cube Store: empty columns. Most probably, introspection has failed.'); diff --git a/rust/cubestore/cubestore/src/import/mod.rs b/rust/cubestore/cubestore/src/import/mod.rs index c5bef785573f8..469e60b0ec6ab 100644 --- a/rust/cubestore/cubestore/src/import/mod.rs +++ b/rust/cubestore/cubestore/src/import/mod.rs @@ -77,6 +77,7 @@ impl ImportFormat { ImportFormat::CSVOptions { delimiter, quote, .. } => (delimiter.unwrap_or(','), quote.is_none()), + ImportFormat::Parquet => unreachable!("Parquet cannot reach CSV delimiter match"), }; let lines_stream: Pin> + Send>> = @@ -169,6 +170,13 @@ impl ImportFormat { }); Ok(rows.boxed()) } + ImportFormat::Parquet => { + // Unreachable: do_import dispatches Parquet before row_stream is called. + // Required for exhaustive match. + Err(CubeError::internal( + "row_stream called for Parquet format — this should never happen".to_string(), + )) + } } } @@ -587,15 +595,89 @@ impl ImportServiceImpl { self.meta_store .update_location_download_size(table_id, location.to_string(), size as u64) .await?; - Ok((temp_file, None)) - } else { - Ok(( - File::open(location).await.map_err(|e| { - CubeError::internal(format!("Open location {}: {}", location, e)) - })?, - None, - )) + Ok((temp_file, None)) } else if location.starts_with("gs://") { + // GCS object-store URL: download to a local temp file using + // the Google Cloud Storage JSON API with WIF/ADC credentials. + use tokio::fs::File as TokioFile; + use tokio::io::AsyncWriteExt; + + // Parse gs://bucket/object into GCS JSON API URL + let without_scheme = location.trim_start_matches("gs://"); + let slash_pos = without_scheme.find('/').ok_or_else(|| { + CubeError::internal(format!("Invalid gs:// URL (no slash after bucket): {}", location)) + })?; + let bucket = &without_scheme[..slash_pos]; + let object = &without_scheme[slash_pos + 1..]; + let encoded_object = object.replace('/', "%2F"); + let gcs_url = format!( + "https://storage.googleapis.com/storage/v1/b/{}/o/{}?alt=media", + bucket, encoded_object + ); + + // Get an ADC access token via the GKE metadata server + let token_resp = reqwest::Client::new() + .get("http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token") + .header("Metadata-Flavor", "Google") + .send() + .await + .map_err(|e| CubeError::internal(format!("GCS token fetch error for {}: {}", location, e)))?; + + let token_json: serde_json::Value = token_resp.json().await + .map_err(|e| CubeError::internal(format!("GCS token parse error for {}: {}", location, e)))?; + let access_token = token_json["access_token"] + .as_str() + .ok_or_else(|| CubeError::internal(format!("No access_token in metadata response for {}", location)))? + .to_string(); + + let tmp = tempfile::NamedTempFile::new() + .map_err(|e| CubeError::internal(format!("GCS temp file create error for {}: {}", location, e)))?; + let tmp_path = tmp.into_temp_path(); + let tmp_path_buf = tmp_path.to_path_buf(); + + let mut response = reqwest::Client::new() + .get(&gcs_url) + .bearer_auth(&access_token) + .send() + .await + .map_err(|e| CubeError::internal(format!("GCS download error for {}: {}", location, e)))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(CubeError::internal(format!( + "GCS download HTTP {} for {}: {}", status, location, body + ))); } + + { + let mut out = TokioFile::create(&tmp_path_buf).await + .map_err(|e| CubeError::internal(format!("GCS temp write error for {}: {}", location, e)))?; + while let Some(chunk) = response.chunk().await + .map_err(|e| CubeError::internal(format!("GCS chunk error for {}: {}", location, e)))? { + out.write_all(&chunk).await + .map_err(|e| CubeError::internal(format!("GCS write error for {}: {}", location, e)))?; + } + out.flush().await + .map_err(|e| CubeError::internal(format!("GCS flush error for {}: {}", location, e)))?; + } + + let size = tmp_path_buf.metadata().map(|m| m.len()).unwrap_or(0); + self.meta_store + .update_location_download_size(table_id, location.to_string(), size) + .await?; + log::info!("Import downloaded {} ({} bytes) via GCS API", location, size); + + let file = File::open(&tmp_path_buf).await + .map_err(|e| CubeError::internal(format!("Open downloaded GCS file {}: {}", location, e)))?; + Ok((file, Some(tmp_path))) + } else { + Ok(( + File::open(location).await.map_err(|e| { + CubeError::internal(format!("Open location {}: {}", location, e)) + })?, + None, + )) + } } async fn download_http_location( @@ -717,6 +799,14 @@ impl ImportServiceImpl { let (file, tmp_path) = self .resolve_location(location, table.get_id(), &temp_dir) .await?; + + // Parquet: bypass row_stream, read RecordBatches via DataFusion Parquet reader + if let ImportFormat::Parquet = format { + return self + .do_import_parquet(table, location, tmp_path, data_loaded_size) + .await; + } + let mut row_stream = format .row_stream( file, @@ -766,6 +856,138 @@ impl ImportServiceImpl { ingestion.wait_completion().await } + /// Import a Parquet file that has been downloaded to `tmp_path`. + /// + /// Uses `datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder` + /// (already compiled into CubeStore for internal .chunk.parquet queries). + /// Converts each Arrow `RecordBatch` to CubeStore `Row`s using the same + /// column-type mapping as `ImportFormat::parse_column_value_str`. + async fn do_import_parquet( + &self, + table: &IdRow, + location: &str, + _tmp_path: Option, + data_loaded_size: Option>, + ) -> Result<(), CubeError> { + use datafusion::arrow::array::Array; + use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + // Resolve the local path from tmp_path or re-download. + // resolve_location already downloaded the file; the local path is + // stored in tmp_path. We need the string path. + let local_path = match &_tmp_path { + Some(p) => p.to_path_buf(), + None => { + return Err(CubeError::internal(format!( + "Parquet import: no local temp path for location '{}'", + location + ))); + } + }; + + let std_file = std::fs::File::open(&local_path).map_err(|e| { + CubeError::internal(format!( + "Parquet import: cannot open '{}': {}", + local_path.display(), + e + )) + })?; + + let builder = + ParquetRecordBatchReaderBuilder::try_new(std_file).map_err(|e| { + CubeError::internal(format!( + "Parquet import: cannot read schema from '{}': {}", + local_path.display(), + e + )) + })?; + + let mut reader = builder.build().map_err(|e| { + CubeError::internal(format!( + "Parquet import: cannot build reader for '{}': {}", + local_path.display(), + e + )) + })?; + + let table_cols = table.get_row().get_columns().clone(); + + let mut ingestion = Ingestion::new( + self.meta_store.clone(), + self.chunk_store.clone(), + self.limits.clone(), + table.clone(), + ); + + let finish = + |builders: Vec>| -> Vec { + builders.into_iter().map(|mut b| b.finish()).collect() + }; + + let mut array_builders = create_array_builders(&table_cols); + let mut num_rows: usize = 0; + + while let Some(batch_result) = reader.next() { + let batch = batch_result.map_err(|e| { + CubeError::internal(format!("Parquet import: error reading batch: {}", e)) + })?; + + for row_idx in 0..batch.num_rows() { + let mut row = vec![TableValue::Null; table_cols.len()]; + + for (col_idx, col) in table_cols.iter().enumerate() { + // Find the matching column in the Parquet schema by name + let parquet_col_idx = batch + .schema() + .fields() + .iter() + .position(|f| f.name() == col.get_name()); + + let val = if let Some(pidx) = parquet_col_idx { + let arr = batch.column(pidx); + if arr.is_null(row_idx) { + TableValue::Null + } else { + // Format the value as a string then use existing + // parse_column_value_str to convert to TableValue. + // This is not the fastest path but it is correct and + // reuses all existing type-parsing logic. + let val_str = arrow_array_value_to_string(arr, row_idx)?; + ImportFormat::parse_column_value_str(col, &val_str)? + } + } else { + TableValue::Null + }; + row[col_idx] = val; + } + + append_row(&mut array_builders, &table_cols, &Row::new(row)); + num_rows += 1; + + if num_rows >= self.config_obj.wal_split_threshold() as usize { + let mut to_add = create_array_builders(&table_cols); + mem::swap(&mut array_builders, &mut to_add); + num_rows = 0; + + let built = finish(to_add); + if let Some(ref dls) = data_loaded_size { + dls.add(columns_vec_buffer_size(&built)); + } + ingestion.queue_data_frame(built).await?; + } + } + } + + mem::drop(_tmp_path); // release temp file + + let built = finish(array_builders); + if let Some(ref dls) = data_loaded_size { + dls.add(columns_vec_buffer_size(&built)); + } + ingestion.queue_data_frame(built).await?; + ingestion.wait_completion().await + } + fn estimate_rows(location: &str, size: Option) -> u64 { if let Some(size) = size { let uncompressed_size = if location.contains(".gz") { @@ -781,6 +1003,137 @@ impl ImportServiceImpl { } } +/// Convert a single element of an Arrow array to a String suitable for +/// `ImportFormat::parse_column_value_str`. +/// +/// Covers all column types emitted by the BigQuery Parquet exporter. +fn arrow_array_value_to_string( + arr: &dyn datafusion::arrow::array::Array, + idx: usize, +) -> Result { + use datafusion::arrow::array::*; + use datafusion::arrow::datatypes::DataType; + + let s = match arr.data_type() { + DataType::Utf8 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::LargeUtf8 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::Boolean => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::Int8 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::Int16 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::Int32 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::Int64 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::UInt8 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::UInt16 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::UInt32 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::UInt64 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::Float32 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::Float64 => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx).to_string()), + DataType::Decimal128(_, scale) => arr + .as_any() + .downcast_ref::() + .map(|a| { + let raw = a.value(idx); + let scale = *scale as u32; + let divisor = 10i128.pow(scale); + if divisor == 0 { + raw.to_string() + } else { + format!("{}.{:0>width$}", raw / divisor, (raw % divisor).abs(), width = scale as usize) + } + }), + DataType::Date32 => arr + .as_any() + .downcast_ref::() + .map(|a| { + // Days since epoch → YYYY-MM-DD + let days = a.value(idx) as i64; + let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let date = epoch + chrono::Duration::days(days); + date.format("%Y-%m-%d").to_string() + }), + DataType::Timestamp(unit, _tz) => { + use datafusion::arrow::datatypes::TimeUnit; + let nanos = match unit { + TimeUnit::Second => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx) as i128 * 1_000_000_000), + TimeUnit::Millisecond => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx) as i128 * 1_000_000), + TimeUnit::Microsecond => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx) as i128 * 1_000), + TimeUnit::Nanosecond => arr + .as_any() + .downcast_ref::() + .map(|a| a.value(idx) as i128), + }; + nanos.map(|ns| { + let secs = (ns / 1_000_000_000) as i64; + let nsecs = (ns % 1_000_000_000).abs() as u32; + let dt = chrono::DateTime::::from_timestamp(secs, nsecs) + .unwrap_or_default(); + dt.format("%Y-%m-%d %H:%M:%S%.f").to_string() + }) + } + other => { + return Err(CubeError::user(format!( + "Parquet import: unsupported Arrow type {:?} — cannot convert to string", + other + ))); + } + }; + + s.ok_or_else(|| { + CubeError::internal(format!( + "Parquet import: downcast failed for Arrow type {:?}", + arr.data_type() + )) + }) +} + #[async_trait] impl ImportService for ImportServiceImpl { async fn import_table(&self, table_id: u64) -> Result<(), CubeError> { @@ -923,6 +1276,10 @@ impl LocationHelper { }? } else if location.starts_with("stream://") { None + } else if location.starts_with("gs://") || location.starts_with("s3://") { + // Remote object-store URLs: file hasn't been downloaded yet. + // Return None (unknown size); estimate_rows handles this gracefully. + None } else { Some(tokio::fs::metadata(location).await?.len()) }; diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index b0bc6a6d34576..74773c957a085 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -611,6 +611,7 @@ pub enum ImportFormat { quote: Option, has_header: bool, }, + Parquet, } data_frame_from! { diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 9243ef2dec664..f1da180ce580c 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -786,6 +786,7 @@ impl SqlService for SqlServiceImpl { match input_format.as_str() { "csv" => Result::Ok(ImportFormat::CSV), "csv_no_header" => Result::Ok(ImportFormat::CSVNoHeader), + "parquet" => Result::Ok(ImportFormat::Parquet), _ => Result::Err(CubeError::user(format!( "Bad input_format {}", value @@ -851,6 +852,8 @@ impl SqlService for SqlServiceImpl { escape, quote, }, + // Parquet ignores delimiter — pass through unchanged + ImportFormat::Parquet => ImportFormat::Parquet, } } let build_range_end = with_options From 2fe8fa6451a38741f6a0fff2a3cf8681ba64f3cc Mon Sep 17 00:00:00 2001 From: Krishna R Maddikara Date: Sun, 15 Mar 2026 22:57:21 -0700 Subject: [PATCH 2/2] fix(bigquery,base-driver): type safety, stale file cleanup, pure Parquet path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Make csvFile optional in TableCSVData — BigQuery now returns parquetFile only - Add parquetFile?: string[] to TableCSVData interface - Update isDownloadTableCSVData() to recognise parquetFile - Delete stale export files before BQ extract to prevent prefix collision - Remove exportBucketCsvEscapeSymbol from Parquet return (CSV-specific field) --- packages/cubejs-base-driver/src/driver.interface.ts | 10 ++++++++-- packages/cubejs-bigquery-driver/src/BigQueryDriver.ts | 5 ++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-base-driver/src/driver.interface.ts b/packages/cubejs-base-driver/src/driver.interface.ts index 5fa35e06b3e7d..7126b0ee2a908 100644 --- a/packages/cubejs-base-driver/src/driver.interface.ts +++ b/packages/cubejs-base-driver/src/driver.interface.ts @@ -61,7 +61,13 @@ export interface TableCSVData extends DownloadTableBase { /** * An array of unloaded CSV data temporary URLs. */ - csvFile: string[]; + csvFile?: string[]; + + /** + * An array of unloaded Parquet data GCS URIs (gs://bucket/object). + * Used when the driver exports as Parquet instead of CSV.gz. + */ + parquetFile?: string[]; /** * Unloaded data fields types. @@ -113,7 +119,7 @@ export function isDownloadTableMemoryData(tableData: any): tableData is TableMem } export function isDownloadTableCSVData(tableData: any): tableData is TableCSVData { - return Boolean(tableData.csvFile); + return Boolean(tableData.csvFile || tableData.parquetFile); } export type DownloadTableData = TableMemoryData | TableCSVData | StreamTableData | StreamingSourceTableData; diff --git a/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts b/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts index 50d9e6e3be3e3..863d38174883b 100644 --- a/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts +++ b/packages/cubejs-bigquery-driver/src/BigQueryDriver.ts @@ -345,6 +345,10 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface { if (!this.bucket) { throw new Error('Unload is not configured'); } + // Delete stale files from any previous export for this table prefix + // to prevent prefix listing from picking up files from failed/concurrent runs. + const [staleFiles] = await this.bucket.getFiles({ prefix: `${table}-` }); + await Promise.all(staleFiles.map(f => f.delete().catch(() => {}))); const destination = this.bucket.file(`${table}-*.parquet`); const [schema, tableName] = table.split('.'); @@ -356,7 +360,6 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface { const urls = files.map(file => `gs://${bucketName}/${file.name}`); return { - exportBucketCsvEscapeSymbol: this.options.exportBucketCsvEscapeSymbol, parquetFile: urls, }; }