Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,41 @@ private[redshift] class RedshiftWriter(
} catch {
case e: SQLException =>
log.error("SQLException thrown while running COPY query; will attempt to retrieve " +
"more information by querying the STL_LOAD_ERRORS table", e)
// Try to query Redshift's STL_LOAD_ERRORS table to figure out why the load failed.
// See http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html for details.
"more information by querying the load error tables", e)
// Try to query Redshift's error tables to figure out why the load failed.
conn.rollback()
val errorLookupQuery =
"""
| SELECT *
| FROM stl_load_errors
| WHERE query = pg_last_query_id()
""".stripMargin

val detailedException: Option[SQLException] = try {
// Check if the modern sys_load_error_detail view exists (Serverless / newer Provisioned)
val tableCheckQuery = "SELECT 1 FROM pg_catalog.pg_class WHERE relname = 'sys_load_error_detail'"
val checkRs = jdbcWrapper.executeQueryInterruptibly(conn.prepareStatement(tableCheckQuery))
val hasSysView = checkRs.next()

// Build the query and alias the modern columns to match the legacy extraction logic
val errorLookupQuery = if (hasSysView) {
"""
| SELECT
| error_code AS err_code,
| error_message AS err_reason,
| column_length AS col_length,
| column_name AS colname,
| column_type AS type,
| raw_line,
| raw_field_value
| FROM sys_load_error_detail
| WHERE query_id = pg_last_query_id()
""".stripMargin
} else {
"""
| SELECT *
| FROM stl_load_errors
| WHERE query = pg_last_query_id()
""".stripMargin
}

val results =
jdbcWrapper.executeQueryInterruptibly(conn.prepareStatement(errorLookupQuery))

if (results.next()) {
val errCode = results.getInt("err_code")
val errReason = results.getString("err_reason").trim
Expand All @@ -169,6 +191,7 @@ private[redshift] class RedshiftWriter(
.filter(_.nonEmpty)
.map(n => s"($n)")
.getOrElse("")

val exceptionMessage =
s"""
|Error (code $errCode) while loading data into Redshift: "$errReason"
Expand All @@ -178,15 +201,17 @@ private[redshift] class RedshiftWriter(
|Raw line: ${results.getString("raw_line")}
|Raw field value: ${results.getString("raw_field_value")}
""".stripMargin

Some(new SQLException(exceptionMessage, e))
} else {
None
}
} catch {
case NonFatal(e2) =>
log.error("Error occurred while querying STL_LOAD_ERRORS", e2)
log.error("Error occurred while querying load error tables", e2)
None
}

throw detailedException.getOrElse(e)
}
}
Expand Down