#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import bisect
import io
import lzma
import math
import multiprocessing.pool
import os
import resource
import sys
import time
from typing import Iterable
import xz
# TODO Add tests for everything
def overrides(parentClass):
    """Decorator verifying that the wrapped method shadows a callable attribute of parentClass."""
    def checkOverride(method):
        name = method.__name__
        assert name in dir(parentClass)
        assert callable(getattr(parentClass, name))
        return method
    return checkOverride
class LruCache(dict):
    """Dictionary with a bounded size that evicts the least-recently-used entry.

    Both reads (``__getitem__``) and writes (``__setitem__``) count as a "use"
    and move the key to the most-recently-used position. Note that ``get()``
    and other dict accessors do NOT refresh the usage order.
    """

    def __init__(self, size: int = 10):
        # Maximum number of entries to keep before evicting.
        self.size = size
        # Keys ordered from least recently used (front) to most recently used (back).
        # BUGFIX: the original annotated this as List[int] without importing List
        # from typing, which raises NameError at runtime (annotations on attribute
        # targets are evaluated). Use the builtin generic instead.
        self.lastUsed: list = []

    def _refresh(self, key):
        """Move key to the most-recently-used end of the usage list."""
        if key in self.lastUsed:
            self.lastUsed.remove(key)
        self.lastUsed.append(key)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._refresh(key)
        # Evict least-recently-used entries until we are within the size limit.
        while super().__len__() > self.size:
            super().__delitem__(self.lastUsed.pop(0))

    def __getitem__(self, key):
        # Look up first so a missing key raises KeyError without touching the order.
        value = super().__getitem__(key)
        self._refresh(key)
        return value
class Prefetcher:
    """Suggests indexes to prefetch based on the history of recently fetched indexes.

    The longer the run of consecutive (sequentially increasing) fetches observed,
    the more indexes are suggested, growing exponentially up to a given maximum.
    """

    def __init__(self, memorySize):
        # History of fetched values, oldest first, bounded to memorySize entries.
        self.lastFetched = []
        self.memorySize = memorySize

    def fetch(self, value):
        """Record that 'value' was fetched, keeping the bounded history deduplicated."""
        if value in self.lastFetched:
            self.lastFetched.remove(value)
        self.lastFetched.append(value)
        while len(self.lastFetched) > self.memorySize:
            self.lastFetched.pop(0)

    def prefetch(self, maximumToPrefetch) -> Iterable:
        """Return a range of indexes directly after the last fetched one.

        Returns an empty iterable when nothing was fetched yet or
        maximumToPrefetch is not positive.
        """
        if not self.lastFetched or maximumToPrefetch <= 0:
            return []
        # Count how many of the most recent fetches form a consecutive ascending run.
        consecutiveCount = 0
        values = self.lastFetched[::-1]
        for i, j in zip(values[0:-1], values[1:]):
            if i == j + 1:
                consecutiveCount += 1
            else:
                break
        # I want an exponential progression like: logStep**consecutiveCount with the boundary conditions:
        # logStep**0 = 1 (mathematically true for any logStep because consecutiveCount was chosen to fit)
        # logStep**maxConsecutiveCount = maximumToPrefetch
        # => logStep = exp(ln(maximumToPrefetch)/maxConsecutiveCount)
        # => logStep**consecutiveCount = exp(ln(maximumToPrefetch) * consecutiveCount/maxConsecutiveCount)
        # BUGFIX: guard against memorySize <= 1, which made the original divide by zero.
        maxConsecutiveCount = max(self.memorySize - 1, 1)
        prefetchCount = int(round(math.exp(math.log(maximumToPrefetch) * consecutiveCount / maxConsecutiveCount)))
        return range(self.lastFetched[-1] + 1, self.lastFetched[-1] + 1 + prefetchCount)
class ParallelXZReader(io.BufferedIOBase):
    # TODO test if a simple thread pool would also parallelize equally well
    """Random-access xz reader that uses a process pool to prefetch and cache decoded xz blocks.

    Each worker process opens its own view of the file and decodes one block;
    the AsyncResult objects are cached in an LRU cache keyed by block number.
    """

    def __init__(self, filename, parallelization):
        """
        filename: path to the xz file to read
        parallelization: total number of cores to use; one is reserved for
                         on-demand decompression and the rest form the worker pool
        """
        print("Parallelize:", parallelization)
        self.parallelization = parallelization - 1  # keep one core for on-demand decompression
        self.pool = multiprocessing.pool.Pool(self.parallelization)
        self.offset = 0  # current uncompressed read offset
        self.filename = filename
        self.fileobj = xz.open(filename, 'rb')
        # Cache up to twice as many decoded blocks as there are workers.
        self.blockCache = LruCache(2 * parallelization)
        self.prefetcher = Prefetcher(4)
        assert self.fileobj.seekable() and self.fileobj.readable()
        # Total uncompressed size, needed for seek(SEEK_END) and read(-1).
        # BUGFIX: the original seek() referenced a non-existent self.cumsizes.
        self.uncompressedSize = self.fileobj.seek(0, io.SEEK_END)
        self.fileobj.seek(0)
        print(self.fileobj.stream_boundaries)
        print(self.fileobj.block_boundaries)  # contains uncompressed offsets and therefore sizes -> perfect!

    def _findBlock(self, offset: int):
        """Map an uncompressed offset to (blockIndex, offsetInBlock, blockSize).

        blockSize is 0 when the offset lies before the first block and -1 for
        the last block, whose end offset is not listed in block_boundaries.
        """
        blockNumber = bisect.bisect_right(self.fileobj.block_boundaries, offset)
        print("Look for offset:", offset, "found:", blockNumber)
        if blockNumber <= 0:
            return blockNumber - 1, 0, 0
        if blockNumber >= len(self.fileobj.block_boundaries):
            # Last block: its size cannot be derived from the boundaries alone.
            return blockNumber - 1, offset - self.fileobj.block_boundaries[blockNumber - 1], -1
        blockSize = self.fileobj.block_boundaries[blockNumber] - self.fileobj.block_boundaries[blockNumber - 1]
        offsetInBlock = offset - self.fileobj.block_boundaries[blockNumber - 1]
        assert offsetInBlock >= 0
        assert offsetInBlock < blockSize
        return blockNumber - 1, offsetInBlock, blockSize

    def _blockSize(self, blockNumber):
        """Return the uncompressed size of the given block, or -1 for the last block."""
        blockNumber += 1
        if blockNumber >= len(self.fileobj.block_boundaries) or blockNumber <= 0:
            return -1
        return self.fileobj.block_boundaries[blockNumber] - self.fileobj.block_boundaries[blockNumber - 1]

    @staticmethod
    def _decodeBlock(filename, offset, size):
        """Decode 'size' uncompressed bytes at 'offset'; runs inside a worker process."""
        with xz.open(filename, 'rb') as file:
            file.seek(offset)
            return file.read(size)

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        self.close()

    @overrides(io.BufferedIOBase)
    def close(self) -> None:
        """Close the underlying file and shut down the worker pool."""
        self.fileobj.close()
        self.pool.close()
        # BUGFIX: join the pool so worker processes do not linger, and mark
        # this stream closed via the base class.
        self.pool.join()
        super().close()

    @overrides(io.BufferedIOBase)
    def fileno(self) -> int:
        # This is a virtual Python level file object and therefore does not have a valid OS file descriptor!
        raise io.UnsupportedOperation()

    @overrides(io.BufferedIOBase)
    def seekable(self) -> bool:
        return True

    @overrides(io.BufferedIOBase)
    def readable(self) -> bool:
        return True

    @overrides(io.BufferedIOBase)
    def writable(self) -> bool:
        return False

    @overrides(io.BufferedIOBase)
    def read(self, size: int = -1) -> bytes:
        """Read up to 'size' uncompressed bytes from the current offset.

        A negative size (the default) reads everything up to the end of the file.
        """
        print("\nread", size, "from", self.offset)
        if size < 0:
            # BUGFIX: the original broke out of the loop immediately for size=-1
            # and then truncated the last byte via toAppend[:-1].
            size = max(self.uncompressedSize - self.offset, 0)
        startOffset = self.offset
        result = bytes()
        blocks = []
        blockNumber, firstBlockOffset, blockSize = self._findBlock(self.offset)
        print("Found block:", blockNumber, blockSize, firstBlockOffset)
        if blockNumber >= len(self.fileobj.block_boundaries) or blockNumber < 0:
            return result
        pendingBlocks = sum(not block.ready() for block in self.blockCache.values())
        availableSize = blockSize - firstBlockOffset
        while True:
            # Fetch block: reuse a cached (possibly still pending) decode job or submit a new one.
            self.prefetcher.fetch(blockNumber)
            if blockNumber in self.blockCache:
                fetchedBlock = self.blockCache[blockNumber]
            else:
                print("fetch block:", blockNumber, "sized", self._blockSize(blockNumber))
                fetchedBlock = self.pool.apply_async(
                    ParallelXZReader._decodeBlock,
                    (self.filename, self.fileobj.block_boundaries[blockNumber], self._blockSize(blockNumber)),
                )
                self.blockCache[blockNumber] = fetchedBlock
                pendingBlocks += 1
            blocks.append(fetchedBlock)
            # blockSize == -1 marks the last block, which always satisfies the request.
            if size <= availableSize or blockSize == -1:
                break
            size -= availableSize
            self.offset += availableSize
            # Get metadata for next block
            blockNumber += 1
            if blockNumber >= len(self.fileobj.block_boundaries):
                break
            blockSize = self._blockSize(blockNumber)
            offsetInBlock = self.offset - self.fileobj.block_boundaries[blockNumber - 1]
            availableSize = blockSize - offsetInBlock
        # Prefetch only as many upcoming blocks as there are idle workers.
        # BUGFIX: maxToPrefetch was computed but never applied (the old TODO).
        maxToPrefetch = self.parallelization - pendingBlocks
        toPrefetch = self.prefetcher.prefetch(maxToPrefetch)
        print("Prefetch suggestion:", toPrefetch)
        for prefetchNumber in toPrefetch:
            if prefetchNumber < len(self.fileobj.block_boundaries) and prefetchNumber not in self.blockCache:
                fetchedBlock = self.pool.apply_async(
                    ParallelXZReader._decodeBlock,
                    (self.filename, self.fileobj.block_boundaries[prefetchNumber], self._blockSize(prefetchNumber)),
                )
                self.blockCache[prefetchNumber] = fetchedBlock
                pendingBlocks += 1
        print("pending blocks:", pendingBlocks)
        print("Got blocks:", blocks)
        while blocks:
            block = blocks.pop(0)
            # Note that it is perfectly safe to call AsyncResult.get multiple times!
            toAppend = block.get()
            print(f"Append view ({firstBlockOffset},{ size}) of block of length {len(toAppend)}")
            if firstBlockOffset > 0:
                toAppend = toAppend[firstBlockOffset:]
            if not blocks:
                toAppend = toAppend[:size]
            firstBlockOffset = 0
            result += toAppend
        # TODO fall back to reading directly from fileobj if prefetch suggests nothing at all to improve latency!
        # BUGFIX: the original advanced self.offset inside the loop AND added
        # len(result) afterwards, double-counting the offset for multi-block reads.
        self.offset = startOffset + len(result)
        return result

    @overrides(io.BufferedIOBase)
    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """Move the read offset; only bookkeeping, no actual file seek happens here."""
        if whence == io.SEEK_CUR:
            self.offset += offset
        elif whence == io.SEEK_END:
            # BUGFIX: was self.cumsizes[-1], an attribute that never existed.
            self.offset = self.uncompressedSize + offset
        elif whence == io.SEEK_SET:
            self.offset = offset
        if self.offset < 0:
            raise ValueError("Trying to seek before the start of the file!")
        return self.offset

    @overrides(io.BufferedIOBase)
    def tell(self) -> int:
        return self.offset
Xz is pretty slow compared to other compression formats. It would be really cool if python-xz could be parallelized such that it prefetches the next blocks and decodes them in parallel. I think this would be a helpful feature and unique selling point for python-xz. I don't think there is a parallelized XZ decoder for Python at all, or is there?
I'm doing something similar in
indexed_bzip2. But I am aware that this adds complexity and problems: xz can compress in parallel, so maybe that could also be possible. I implemented a very rudimentary sketch on top of python-xz using
multiprocessing.pool.Pool. It has the same design as indexed_bzip2, which is: With this, I was able to speed up the decompression of a 3.1 GiB xz file (4 GiB decompressed) consisting of 171 blocks by a factor of ~7 on an 8-core CPU (16 virtual cores):
However, at this point I'm becoming uncertain whether this might be easier to implement inside python-xz itself or whether the wrapper is a sufficient ad-hoc solution. It only uses public methods and members of XZFile, so it should be stable across non-major version changes.
Rudimentary unfinished sketch / proof of work:
decompress-xz-parallel.py
Click to expand
parallel_xz_decoder.py
Click to expand
Manual Shell Execution