Skip to content

Comments

[AURON #2032] Fix thread-safety issues in UnifflePartitionWriter synchronization.#2033

Open
slfan1989 wants to merge 1 commit intoapache:masterfrom
slfan1989:auron-2032
Open

[AURON #2032] Fix thread-safety issues in UnifflePartitionWriter synchronization.#2033
slfan1989 wants to merge 1 commit intoapache:masterfrom
slfan1989:auron-2032

Conversation

@slfan1989
Copy link
Contributor

Which issue does this PR close?

Closes #2032

Rationale for this change

Split synchronization blocks in write() method:

override def write(partitionId: Int, buffer: ByteBuffer): Unit = {
    val bytes = new Array[Byte](buffer.limit())
    buffer.get(bytes)
    val bytesWritten = bytes.length

    val bufferManager = rssShuffleWriter.getBufferManager
    val shuffleBlockInfos = rssShuffleWriter.synchronized {
      bufferManager.addPartitionData(partitionId, bytes)
    }
    if (shuffleBlockInfos != null && !shuffleBlockInfos.isEmpty) {
      // synchronized
      rssShuffleWriter.synchronized {
        rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, shuffleBlockInfos)
      }
    }
    metrics.incBytesWritten(bytesWritten)
    mapStatusLengths(partitionId) += bytesWritten
  }

Inconsistent locking in close() method

override def close(success: Boolean): Unit = {
    val start = System.currentTimeMillis()
    val bufferManager = rssShuffleWriter.getBufferManager
    val restBlocks = bufferManager.clear()
    if (success && restBlocks != null && !restBlocks.isEmpty) {
      // non-synchronized
      rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, restBlocks)
    }
    val writeDurationMs = bufferManager.getWriteTime + (System.currentTimeMillis() - start)
    metrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs))
  }

What changes are included in this PR?

Solution

override def close(success: Boolean): Unit = {
    val start = System.currentTimeMillis()
    val bufferManager = rssShuffleWriter.getBufferManager
    val restBlocks = bufferManager.clear()
    if (success && restBlocks != null && !restBlocks.isEmpty) {
      // synchronized
      rssShuffleWriter.synchronized {
        rssShuffleWriterPushBlocksMethod.invoke(rssShuffleWriter, restBlocks)
      }
    }
    val writeDurationMs = bufferManager.getWriteTime + (System.currentTimeMillis() - start)
    metrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs))
  }

Are there any user-facing changes?

No.

How was this patch tested?

Exists Unit Test.

…r synchronization.

Signed-off-by: slfan1989 <slfan1989@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Fix thread-safety issues in UnifflePartitionWriter synchronization

1 participant