# Source code for stetl.filters.zipfileextractor

# Extracts a file from a ZIP file, and saves it as the given file name.
#
# Author: Frank Steggink
#
from stetl.component import Config
from stetl.filter import Filter
from stetl.util import Util
from stetl.packet import FORMAT

# Module-level logger, obtained from the Stetl Util helper under the name 'zipfileextractor'.
log = Util.get_log('zipfileextractor')

# Maximum number of bytes requested per read() call while copying the ZIP
# member to the output file: 1024 * 1024 * 1024 bytes = 1 GiB.
# NOTE(review): a full-size chunk is held in memory at once - confirm that a
# buffer this large is intended.
BUFFER_SIZE = 1024 * 1024 * 1024


class ZipFileExtractor(Filter):
    """
    Extracts a file from a ZIP file, and saves it as the given file name.

    The incoming packet data is indexed with two keys: ``file_path`` (the ZIP
    archive to open) and ``name`` (the archive member to extract). The member
    is written to the configured ``file_path`` of this filter, and that output
    path is passed on as the packet data.

    consumes=FORMAT.record, produces=FORMAT.string
    """

    # Start attribute config meta
    @Config(ptype=str, default=None, required=True)
    def file_path(self):
        """
        File name to write the extracted file to.
        """
        pass

    @Config(ptype=bool, default=True, required=False)
    def delete_file(self):
        """
        Delete the file when the chain has been completed?
        """
        pass

    # End attribute config meta

    # Constructor
    def __init__(self, configdict, section):
        """
        Initialise the filter and remember the configured output file path.

        :param configdict: configuration object, passed through to ``Filter.__init__``
        :param section: configuration section name, passed through to ``Filter.__init__``
        """
        Filter.__init__(self, configdict, section, consumes=FORMAT.record, produces=FORMAT.string)
        # Path of the file the extracted ZIP member is written to.
        self.cur_file_path = self.cfg.get('file_path')

    def invoke(self, packet):
        """
        Extract one member from a ZIP archive to the configured output file.

        :param packet: packet whose ``data`` is either ``None`` or a mapping
            with the keys ``'file_path'`` (ZIP archive path) and ``'name'``
            (member to extract)
        :return: the same packet; on extraction its ``data`` is replaced by
            the path of the written file, otherwise it is returned untouched
        :raises KeyError: if ``packet.data`` lacks one of the keys, or (raised
            by ``zipfile``) if the archive has no member with the given name
        """
        if packet.data is None:
            log.info("No file name given")
            return packet

        import shutil
        import zipfile

        record = packet.data
        with zipfile.ZipFile(record['file_path']) as archive:
            # Open the archive member BEFORE the output file: if the member is
            # missing, the error is raised without creating or truncating the
            # destination file.
            with archive.open(record['name']) as member:
                with open(self.cur_file_path, 'wb') as target:
                    # Streams the member in chunks of at most BUFFER_SIZE bytes.
                    shutil.copyfileobj(member, target, BUFFER_SIZE)

        packet.data = self.cur_file_path
        return packet

    def after_chain_invoke(self, packet):
        """
        Remove the extracted file once the chain has run, if so configured.

        The file is only removed when ``delete_file`` is true and the output
        path refers to an existing regular file.

        :param packet: the packet handed over by the chain (not used here)
        :return: always ``True``
        """
        import os

        # Check the flag first so no filesystem access happens when deletion
        # is disabled.
        if self.delete_file and os.path.isfile(self.cur_file_path):
            os.remove(self.cur_file_path)

        return True