Source code for stetl.filters.fileextractor
# Extracts a file from an archive file like a .zip,
# and saves it as the given file name.
#
# Author: Just van den Broecke (generic and VsiFileExtractor)
# Author: Frank Steggink (ZipFileExtractor)
#
import os.path
from stetl.component import Config
from stetl.filter import Filter
from stetl.util import Util
from stetl.packet import FORMAT
# Module-level logger for this file, obtained from the project's Util helper
# under the name 'fileextractor'.
log = Util.get_log('fileextractor')
# Default size of a single read during extraction: 1024^3 (1 GiB when counted in bytes).
DEFAULT_BUFFER_SIZE = 1024 ** 3
class FileExtractor(Filter):
    """
    Abstract Base Class.
    Extracts a file from an archive and saves it as the configured file name.

    Subclasses implement ``extract_file()``; this class handles removal of a
    stale target file, the existence check after extraction and the optional
    cleanup once the chain has completed.

    consumes=FORMAT.any, produces=FORMAT.string
    """

    # Start attribute config meta

    @Config(ptype=str, default=None, required=True)
    def file_path(self):
        """
        File name to write the extracted file to.
        """
        pass

    @Config(ptype=bool, default=True, required=False)
    def delete_file(self):
        """
        Delete the file when the chain has been completed?
        """
        pass

    @Config(ptype=int, default=DEFAULT_BUFFER_SIZE, required=False)
    def buffer_size(self):
        """
        Buffer size for read buffer during extraction.
        """
        pass

    # End attribute config meta

    # Constructor
    def __init__(self, configdict, section, consumes=FORMAT.any, produces=FORMAT.string):
        Filter.__init__(self, configdict, section, consumes=consumes, produces=produces)

    def delete_target_file(self):
        """
        Remove the file at ``file_path`` if it exists and is a regular file.
        """
        if os.path.isfile(self.file_path):
            os.remove(self.file_path)

    def extract_file(self, packet):
        """
        Extract the file described by ``packet.data`` to ``file_path``.

        The base implementation only logs an error; derived classes must
        override it with the actual extraction.
        """
        log.error('Only classes derived from FileExtractor can be used!')

    def invoke(self, packet):
        """
        Extract a file for the given packet.

        :param packet: packet whose ``data`` describes what to extract
        :return: the same packet; ``data`` is set to ``file_path`` when the
                 extracted file exists, or to None when it does not. A packet
                 without data is returned untouched.
        """
        if packet.data is None:
            log.info("Input data is empty")
            return packet

        # Always remove any existing file at the target path first, so the
        # existence check below reflects this extraction only.
        self.delete_target_file()

        self.extract_file(packet)

        packet.data = self.file_path
        if not os.path.isfile(self.file_path):
            # Logger.warn() is a deprecated alias; warning() is the supported name.
            log.warning('Extracted file {} does not exist'.format(self.file_path))
            packet.data = None

        return packet

    def after_chain_invoke(self, packet):
        """
        Delete the extracted file after the chain has run, if ``delete_file`` is set.

        :return: True after the delete step ran, None when deletion is disabled.
        """
        # NOTE(review): the two paths return different values (None vs True);
        # kept as-is because the caller's use of the result is not visible here.
        if not self.delete_file:
            return

        self.delete_target_file()
        return True
class ZipFileExtractor(FileExtractor):
    """
    Extracts a single member from a ZIP archive and writes it to the
    configured file name.

    Author: Frank Steggink

    consumes=FORMAT.record, produces=FORMAT.string
    """

    def __init__(self, configdict, section):
        FileExtractor.__init__(self, configdict, section, consumes=FORMAT.record)

    def extract_file(self, packet):
        """
        Copy one archive member to ``file_path`` in ``buffer_size`` chunks.

        ``packet.data`` is indexed with the keys ``'file_path'`` (the ZIP
        archive to open) and ``'name'`` (the member to extract).
        """
        import zipfile

        # Open order matters and mirrors the nesting: archive, target, member.
        with zipfile.ZipFile(packet.data['file_path']) as archive:
            with open(self.file_path, 'wb') as target, \
                    archive.open(packet.data['name']) as member:
                # iter(callable, sentinel) keeps reading until an empty chunk comes back.
                for chunk in iter(lambda: member.read(self.buffer_size), b''):
                    target.write(chunk)
class VsiFileExtractor(FileExtractor):
    """
    Extracts a file from a GDAL /vsi path spec, and saves it as the given file name.

    Example paths:
    /vsizip/{/project/nlextract/data/BAG-2.0/BAGNLDL-08112020.zip}/9999STA08112020.zip
    /vsizip/{/vsizip/{BAGGEM0221L-15022021.zip}/GEM-WPL-RELATIE-15022021.zip}/GEM-WPL-RELATIE-15022021-000001.xml

    See also stetl.inputs.fileinput.VsiZipFileInput that generates these paths.

    Author: Just van den Broecke

    consumes=FORMAT.gdal_vsi_path, produces=FORMAT.string
    """

    def __init__(self, configdict, section):
        FileExtractor.__init__(self, configdict, section, consumes=FORMAT.gdal_vsi_path)

    def extract_file(self, packet):
        """
        Copy the file addressed by the /vsi path in ``packet.data`` to ``file_path``.

        :param packet: packet whose ``data`` is the GDAL /vsi path to read
        :raises Exception: any error during extraction is logged and re-raised;
                           an IOError is raised when the path cannot be opened.
        """
        from stetl.util import gdal

        # The input path can be nested, see the examples in the class docstring.
        vsi_file_path = packet.data
        vsi = None
        vsi_len = 0
        try:
            # gdal.VSIF does not support 'with' so old-school open/close.
            log.info('Extracting {}'.format(vsi_file_path))
            vsi = gdal.VSIFOpenL(vsi_file_path, 'rb')
            if not vsi:
                # Fail before the target file is created, with a clear message,
                # instead of failing later on the first call that uses the handle.
                raise IOError('VSIFOpenL returned no file handle')

            with open(self.file_path, 'wb') as f:
                # Seek to the end (whence=2) to learn the total length, then rewind.
                gdal.VSIFSeekL(vsi, 0, 2)
                vsi_len = gdal.VSIFTellL(vsi)
                gdal.VSIFSeekL(vsi, 0, 0)

                # Never request more than the file holds.
                read_size = min(self.buffer_size, vsi_len)

                while True:
                    buffer = gdal.VSIFReadL(1, read_size, vsi)
                    if not buffer:
                        break
                    f.write(buffer)

            # Only report success when the copy actually completed.
            log.info('Extracted {} ok len={} bytes'.format(vsi_file_path, vsi_len))
        except Exception as e:
            log.error('Cannot extract {} err={}'.format(vsi_file_path, str(e)))
            raise
        finally:
            # Always release the VSI handle, on success and on failure.
            if vsi:
                gdal.VSIFCloseL(vsi)