# Source code for stetl.outputs.execoutput

# Output classes for ETL, executing commands.
#
# Author: Frank Steggink
#
import subprocess
import os
import re
import shutil
from stetl.component import Config
from stetl.output import Output
from stetl.util import Util
from stetl.packet import FORMAT

log = Util.get_log('execoutput')


class ExecOutput(Output):
    """
    Executes any command (abstract base class).

    Subclasses decide what to run and call :meth:`execute_cmd` with the
    complete command line.
    """

    @Config(ptype=str, default='', required=False)
    def env_args(self):
        """
        Provides a list of environment variables which will be used when executing the given command.

        Example: env_args = pgpassword=postgres othersetting=value~with~spaces
        """
        pass

    @Config(ptype=str, default='=', required=False)
    def env_separator(self):
        """
        Provides the separator to split the environment variable names from their values.
        """
        pass

    def __init__(self, configdict, section, consumes):
        Output.__init__(self, configdict, section, consumes)

    def write(self, packet):
        """
        Pass the packet through unchanged; subclasses override this to run a command.
        """
        return packet

    def execute_cmd(self, cmd):
        """
        Run ``cmd`` through the shell with the configured ``env_args`` set.

        The variables parsed from ``env_args`` are placed in the process
        environment for the duration of the call and the previous
        environment is restored afterwards, also when the call raises.

        :param cmd: complete command line, passed to the shell as-is
        """
        env_vars = Util.string_to_dict(self.env_args, self.env_separator)
        saved_environ = os.environ.copy()

        try:
            os.environ.update(env_vars)

            log.info("executing cmd=%s" % Util.safe_string_value(cmd))
            status = subprocess.call(cmd, shell=True)
            if status != 0:
                # Do not raise: callers have always treated execution as
                # best-effort, but a failing command should be visible.
                log.warning("cmd exited with status %s" % status)
            log.info("execute done")
        finally:
            # Restore in place. Rebinding os.environ to a plain dict would
            # leave the exported variables set in the real process
            # environment and disconnect os.environ from it for later calls.
            os.environ.clear()
            os.environ.update(saved_environ)
class CommandExecOutput(ExecOutput):
    """
    Runs the command line carried by each incoming packet.

    consumes=FORMAT.string
    """

    def __init__(self, configdict, section):
        ExecOutput.__init__(self, configdict, section, consumes=FORMAT.string)

    def write(self, packet):
        """
        Execute the packet's data as a command; packets without data are skipped.

        The packet is returned unchanged in both cases.
        """
        command = packet.data
        if command is None:
            return packet

        self.execute_cmd(command)
        return packet
class Ogr2OgrExecOutput(ExecOutput):
    """
    Executes an Ogr2Ogr command.
    Input is a file name (or a list of file names) to be processed.
    Output by calling Ogr2Ogr command.

    consumes=FORMAT.string, FORMAT.gdal_vsi_path
    """

    # Start attribute config meta
    # Applying Decorator pattern with the Config class to provide
    # read-only config values from the configured properties.

    @Config(ptype=str, default=None, required=True)
    def dest_data_source(self):
        """
        String denoting the OGR data destination. Usually a path to a file like "path/rivers.shp" or connection string
        to PostgreSQL like "PG: host=localhost dbname='rivers' user='postgres'".
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def dest_format(self):
        """
        Instructs GDAL to use driver by that name to open data destination. Not required for
        many standard formats that are self-describing like ESRI Shapefile.

        Examples: 'PostgreSQL', 'GeoJSON' etc
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def lco(self):
        """
        Options for newly created layer (-lco).
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def spatial_extent(self):
        """
        Spatial extent (-spat), to pass as xmin ymin xmax ymax
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def gfs_template(self):
        """
        Name of GFS template file to use during loading. Passed to ogr2ogr as
        --config GML_GFS_TEMPLATE <name>
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def options(self):
        """
        Miscellaneous options to pass to ogr2ogr.
        """
        pass

    @Config(ptype=bool, default=False, required=False)
    def cleanup_input(self):
        """
        Flag to indicate whether the input file to ogr2ogr should be cleaned up.
        """
        pass

    @Config(ptype=bool, default=False, required=False)
    def always_apply_lco(self):
        """
        Flag to indicate whether the layer creation options should be applied to all runs.
        """
        pass

    # End attribute config meta

    def __init__(self, configdict, section):
        ExecOutput.__init__(self, configdict, section, consumes=[FORMAT.string, FORMAT.gdal_vsi_path])

        # Matches GDAL virtual file system paths such as /vsizip/...
        self.regex_vsi_filter = re.compile("^/vsi[a-z0-9_]+/.*", re.I)

        # Build the fixed part of the ogr2ogr command line once.
        # dest_format is optional (default None), so only emit -f when set;
        # concatenating None here would raise TypeError.
        self.ogr2ogr_cmd = 'ogr2ogr'
        if self.dest_format:
            self.ogr2ogr_cmd += ' -f ' + self.dest_format
        self.ogr2ogr_cmd += ' ' + self.dest_data_source

        if self.spatial_extent:
            self.ogr2ogr_cmd += ' -spat ' + self.spatial_extent
        if self.options:
            self.ogr2ogr_cmd += ' ' + self.options

        # Layer creation options are only added on the first run, unless
        # always_apply_lco is set.
        self.first_run = True

    def write(self, packet):
        """
        Run ogr2ogr for the file path (or each path of a list) in the packet.

        Packets without data are passed through untouched.

        :param packet: packet whose data is a single path or a list of paths
        :return: the same packet
        """
        if packet.data is None:
            return packet

        # Execute ogr2ogr
        ogr2ogr_cmd = self.ogr2ogr_cmd
        if self.lco and (self.first_run or self.always_apply_lco):
            ogr2ogr_cmd += ' ' + self.lco
            self.first_run = False

        if isinstance(packet.data, list):
            for item in packet.data:
                self.execute(ogr2ogr_cmd, item)
        else:
            self.execute(ogr2ogr_cmd, packet.data)

        return packet

    def execute(self, ogr2ogr_cmd, file_path):
        """
        Run the given ogr2ogr command on a single input file.

        :param ogr2ogr_cmd: command line without the input file argument
        :param file_path: input file path or GDAL VSI path, appended as last argument
        """
        # For creating tables the GFS file needs to be newer than
        # the .gml file. -lco GML_GFS_TEMPLATE somehow does not work
        # so we copy the .gfs file each time with the .gml file with
        # the same base name.
        # Copy the .gfs file if required, use the same base name
        # so ogr2ogr will pick it up.
        # Always assemble the GFS path, in case it is provided from outside.
        # Note that for now using a GFS file is not supported with a VSI filter.
        is_vsi_path = self.regex_vsi_filter.match(file_path) is not None
        gfs_path = os.path.splitext(file_path)[0] + '.gfs'
        if self.gfs_template and not is_vsi_path:
            shutil.copy(self.gfs_template, gfs_path)

        # Append file name to command as last argument
        self.execute_cmd(ogr2ogr_cmd + ' ' + file_path)

        # Only real files on disk can be cleaned up, never VSI paths.
        if self.cleanup_input and not is_vsi_path:
            os.remove(file_path)
            if os.path.exists(gfs_path):
                os.remove(gfs_path)