Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/cubejs-base-driver/src/driver.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ export interface TableCSVData extends DownloadTableBase {
/**
* An array of unloaded CSV data temporary URLs.
*/
csvFile: string[];
csvFile?: string[];

/**
* An array of unloaded Parquet data GCS URIs (gs://bucket/object).
* Used when the driver exports as Parquet instead of CSV.gz.
*/
parquetFile?: string[];

/**
* Unloaded data fields types.
Expand Down Expand Up @@ -113,7 +119,7 @@ export function isDownloadTableMemoryData(tableData: any): tableData is TableMem
}

/**
 * Type guard: detects table data that was unloaded to an export bucket —
 * either as temporary CSV URLs (`csvFile`) or as Parquet GCS URIs
 * (`parquetFile`).
 */
export function isDownloadTableCSVData(tableData: any): tableData is TableCSVData {
  // Guard against null/undefined input before reading properties,
  // so the guard itself can never throw.
  return Boolean(tableData && (tableData.csvFile || tableData.parquetFile));
}

export type DownloadTableData = TableMemoryData | TableCSVData | StreamTableData | StreamingSourceTableData;
Expand Down
47 changes: 20 additions & 27 deletions packages/cubejs-bigquery-driver/src/BigQueryDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,34 +342,27 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
}

/**
 * Unloads `table` (format: `dataset.tableName`) from BigQuery into the
 * configured export bucket as Parquet, and returns gs:// URIs of the
 * resulting files.
 *
 * @param table - fully qualified table name, `dataset.tableName`.
 * @returns table data referencing the exported Parquet objects.
 * @throws Error when no export bucket is configured.
 */
public async unload(table: string): Promise<TableCSVData> {
  if (!this.bucket) {
    throw new Error('Unload is not configured');
  }

  // BigQuery expands the `*` in the destination URI to a zero-padded
  // decimal shard number, e.g. `<table>-000000000000.parquet`. Match
  // exactly that pattern so we never touch files that belong to a
  // different table whose name merely starts with this one
  // (e.g. `schema.table-extra-000000000000.parquet` must NOT match
  // the prefix listing for `schema.table`).
  const escapedTable = table.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  const shardPattern = new RegExp(`^${escapedTable}-\\d+\\.parquet$`);

  // Delete stale files from any previous export for this table so the
  // listing below cannot pick up output of failed/concurrent runs.
  // Best-effort: a failed delete must not abort the export.
  const [staleFiles] = await this.bucket.getFiles({ prefix: `${table}-` });
  await Promise.all(
    staleFiles
      .filter(f => shardPattern.test(f.name))
      .map(f => f.delete().catch(() => {}))
  );

  const destination = this.bucket.file(`${table}-*.parquet`);
  const [schema, tableName] = table.split('.');
  const bigQueryTable = this.bigquery.dataset(schema).table(tableName);
  const [job] = await bigQueryTable.createExtractJob(destination, { format: 'PARQUET' });
  await this.waitForJobResult(job, { table }, false);

  // There is an implementation for extracting and signing urls from S3
  // (@see BaseDriver->extractUnloadedFilesFromS3()), but the Parquet flow
  // returns plain gs:// URIs: the consumer reads from GCS directly, so no
  // signed URLs are needed, and the bucket object already exists here.
  const [files] = await this.bucket.getFiles({ prefix: `${table}-` });
  const bucketName = this.bucket.name;
  const urls = files
    .filter(file => shardPattern.test(file.name))
    .map(file => `gs://${bucketName}/${file.name}`);

  return {
    parquetFile: urls,
  };
}

public async loadPreAggregationIntoTable(
preAggregationTableName: string,
Expand Down
10 changes: 10 additions & 0 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {

if (tableData.rowStream) {
await this.importStream(columns, tableData, table, indexes, aggregations, queryTracingObj);
} else if (tableData.parquetFile) {
await this.importParquetFile(tableData, table, columns, indexes, aggregations, queryTracingObj);
} else if (tableData.csvFile) {
await this.importCsvFile(tableData, table, columns, indexes, aggregations, queryTracingObj);
} else if (tableData.streamingSource) {
Expand Down Expand Up @@ -295,6 +297,14 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
}
}

/**
 * Imports unloaded Parquet files into Cube Store by creating a table whose
 * LOCATION clause points at the remote objects (e.g. gs:// URIs); Cube Store
 * downloads and ingests the files itself.
 *
 * @param tableData - unload result; `parquetFile` holds the URI(s).
 * @param table - target Cube Store table name.
 * @param columns - column definitions from source-table introspection.
 * @throws Error when columns are missing or no files were unloaded.
 */
private async importParquetFile(tableData: any, table: string, columns: Column[], indexes: any, aggregations: any, queryTracingObj?: any) {
  // Mirror the guard in importCsvFile: without columns we cannot build DDL.
  if (!columns || columns.length === 0) {
    throw new Error('Unable to import (as parquet) in Cube Store: empty columns. Most probably, introspection has failed.');
  }

  const files: string[] = Array.isArray(tableData.parquetFile) ? tableData.parquetFile : [tableData.parquetFile];
  if (files.length === 0) {
    throw new Error('Unable to import (as parquet) in Cube Store: no files were unloaded.');
  }

  const columnNames = columns.map(c => `${this.quoteIdentifier(c.name)} ${this.fromGenericType(c.type)}`).join(', ');
  // Escape single quotes so a quote in an object name cannot break out of
  // the SQL string literal.
  const locationsClause = files.map((f: string) => `'${f.replace(/'/g, "''")}'`).join(', ');
  const createTableSql = `CREATE TABLE ${table} (${columnNames}) ${indexes} ${aggregations} WITH (input_format = 'parquet') LOCATION ${locationsClause}`;
  await this.query(createTableSql, [], queryTracingObj);
}

private async importCsvFile(tableData: DownloadTableCSVData, table: string, columns: Column[], indexes: any, aggregations: any, queryTracingObj?: any) {
if (!columns || columns.length === 0) {
throw new Error('Unable to import (as csv) in Cube Store: empty columns. Most probably, introspection has failed.');
Expand Down
Loading