Source code for stetl.filters.formatconverter

#!/usr/bin/env python
#
# Converts Stetl Packet FORMATs. This can be used to connect
# Stetl components with different output/input formats.
#
# Author: Just van den Broecke

import json
from stetl.component import Config
from stetl.util import Util, etree
from stetl.filter import Filter
from stetl.packet import FORMAT

# Module-level logger, obtained through Stetl's Util helper under the name "formatconverter".
log = Util.get_log("formatconverter")


class FormatConverter(Filter):
    """
    Converts (almost) any packet format (if converter available).

    consumes=FORMAT.any, produces=FORMAT.any but actual formats
    are changed at initialization based on the input to output format to
    be converted via the input_format and output_format config parameters.

    The converter functions are static methods that take a packet, replace
    ``packet.data`` with the converted value and return the packet. They are
    looked up in the module-level ``FORMAT_CONVERTERS`` table, which maps
    input format -> output format -> converter function.
    """

    # Start attribute config meta
    # Applying Decorator pattern with the Config class to provide
    # read-only config values from the configured properties.

    @Config(ptype=dict, default=None, required=False)
    def converter_args(self):
        """
        Custom converter-specific arguments.
        """
        pass

    # End attribute config meta

    # Constructor
    def __init__(self, configdict, section):
        Filter.__init__(self, configdict, section, consumes=FORMAT.any, produces=FORMAT.any)
        self.converter = None

    def init(self):
        """
        Select the active converter for the configured input/output formats.

        :raises NotImplementedError: when no converter is registered for the
            input format, or for the input/output format combination.
        """
        # OGR feature definition, created lazily by geojson_feature2ogr_feature().
        # Set before the early return so the attribute always exists after init().
        self.feat_def = None

        if self.output_format == FORMAT.any:
            # Any as output is always valid, do nothing
            self.converter = FormatConverter.no_op
            return

        # generate runtime error as we may have registered converters at init time...
        if self.input_format not in FORMAT_CONVERTERS:
            raise NotImplementedError('No format converters found for input format %s' % self.input_format)

        # ASSERTION: converters present for input_format
        if self.output_format not in FORMAT_CONVERTERS[self.input_format]:
            raise NotImplementedError('No format converters found for input format %s to output format %s' % (
                self.input_format, self.output_format))

        # ASSERTION: converters present for input_format and output_format
        # Lookup and assign the active converter
        self.converter = FORMAT_CONVERTERS[self.input_format][self.output_format]

    def invoke(self, packet):
        """
        Convert the data of a packet and tag the packet with the output format.

        Packets without data are passed on unconverted, only their format is set.

        :param packet: the packet to convert
        :return: the same packet, with converted data and format set to output_format
        """
        if packet.data is None:
            packet.format = self.output_format
            return packet

        # Converters mutate the packet in place; their return value is not used here.
        # NOTE(review): etree_doc2struct/etree_elem2struct take ``strip_space`` as second
        # positional parameter, so a configured converter_args dict ends up there - confirm
        # whether converter_args is ever meant to be combined with those converters.
        if self.converter_args is not None:
            self.converter(packet, self.converter_args)
        else:
            self.converter(packet)

        packet.format = self.output_format
        return packet

    @staticmethod
    def add_converter(input_format, output_format, converter_fun):
        """
        Register a converter function for an input/output format pair.

        An already registered converter for the same pair is replaced.

        :param input_format: format the converter accepts
        :param output_format: format the converter produces
        :param converter_fun: callable taking a packet (and optionally converter args)
        """
        # Add to existing input format converters or create new
        FORMAT_CONVERTERS.setdefault(input_format, {})[output_format] = converter_fun

    @staticmethod
    def no_op(packet, converter_args=None):
        """
        Return the packet unchanged.

        :param packet: the packet
        :param converter_args: ignored; accepted so invoke() can pass configured args
        :return: the same packet
        """
        return packet

    # START etree_doc
    @staticmethod
    def etree_doc2geojson_collection(packet, converter_args=None):
        """
        Use converter_args to determine XML tag names for features and GeoJSON feature id.
        For example

        converter_args = {
            'root_tag': 'FeatureCollection',
            'feature_tag': 'featureMember',
            'feature_id_attr': 'fid'
        }

        Keys missing from converter_args fall back to the defaults
        'FeatureCollection' and 'featureMember' (and no id attribute).

        :param packet: packet whose data offers getroot()
        :param converter_args: optional dict as described above
        :return: the packet, data replaced by a dict of type 'FeatureCollection'
        """
        packet.data = packet.data.getroot()
        packet = FormatConverter.etree_elem2struct(packet)

        root_tag = 'FeatureCollection'
        feature_tag = 'featureMember'
        if converter_args:
            root_tag = converter_args.get('root_tag', root_tag)
            feature_tag = converter_args.get('feature_tag', feature_tag)

        features = packet.data[root_tag][feature_tag]
        feature_coll = {'type': 'FeatureCollection', 'features': []}
        for feature in features:
            # Reuse the packet as carrier for the single-feature conversion
            packet.data = feature
            packet = FormatConverter.struct2geojson_feature(packet, converter_args)
            feature_coll['features'].append(packet.data)

        packet.data = feature_coll
        return packet

    @staticmethod
    def etree_doc2string(packet, converter_args=None):
        """
        Serialize the packet data with etree.tostring (pretty printed, with XML declaration).

        :param packet: packet holding data accepted by etree.tostring
        :param converter_args: ignored; accepted so invoke() can pass configured args
        :return: the packet, data replaced by the serialized XML
        """
        packet.data = etree.tostring(packet.data, pretty_print=True, xml_declaration=True)
        return packet

    @staticmethod
    def etree_doc2struct(packet, strip_space=True, strip_ns=True, sub=False, attr_prefix='', gml2ogr=True,
                         ogr2json=True):
        """
        Take the root of the packet's document and convert it via etree_elem2struct.

        All parameters after packet are passed on unchanged to Util.elem_to_dict.

        :param packet:
        :param strip_space:
        :param strip_ns:
        :param sub:
        :param attr_prefix:
        :param gml2ogr:
        :param ogr2json:
        :return: the packet, data replaced by the result of Util.elem_to_dict
        """
        packet.data = packet.data.getroot()
        return FormatConverter.etree_elem2struct(packet, strip_space, strip_ns, sub, attr_prefix, gml2ogr, ogr2json)

    # END etree_doc

    # START etree_elem
    @staticmethod
    def etree_elem2geojson_feature(packet, converter_args=None):
        """
        Convert an element to a struct and then to a GeoJSON feature.

        :param packet: packet holding an element
        :param converter_args: optional dict, passed on to struct2geojson_feature
        :return: the packet, data replaced by a GeoJSON feature dict
        """
        # converter_args must not be passed to etree_elem2struct: its second
        # positional parameter is strip_space, not converter arguments.
        packet = FormatConverter.etree_elem2struct(packet)
        packet = FormatConverter.struct2geojson_feature(packet, converter_args)
        return packet

    @staticmethod
    def etree_elem2struct(packet, strip_space=True, strip_ns=True, sub=False, attr_prefix='', gml2ogr=True,
                          ogr2json=True):
        """
        Replace the packet data by the result of Util.elem_to_dict.

        All parameters after packet are passed on unchanged to Util.elem_to_dict.

        :param packet:
        :param strip_space:
        :param strip_ns:
        :param sub:
        :param attr_prefix:
        :param gml2ogr:
        :param ogr2json:
        :return: the packet
        """
        packet.data = Util.elem_to_dict(packet.data, strip_space, strip_ns, sub, attr_prefix, gml2ogr, ogr2json)
        return packet

    # END etree_elem

    # START geojson_feature
    @staticmethod
    def geojson_feature2ogr_feature(packet, converter_args=None):
        """
        Build an OGR Feature from a GeoJSON feature dict.

        The OGR feature definition is created once, from the first feature
        converted, and cached on packet.component as ``feat_def``.

        :param packet: packet holding a dict with "geometry" and "properties" (and optionally "id")
        :param converter_args: not used
        :return: the packet, data replaced by the OGR Feature
        """
        from stetl.util import ogr

        json_feat = packet.data
        json_geom = json_feat["geometry"]
        json_props = json_feat["properties"]

        # Create OGR Geometry from GeoJSON geom-fields
        geom_str = json.dumps({"type": json_geom["type"], "coordinates": json_geom["coordinates"]})
        ogr_geom = ogr.CreateGeometryFromJson(geom_str)

        # Once: create OGR Feature definition
        # TODO: assume all string-fields for now, may use type-mapping definition in converter_args
        comp = packet.component
        if comp.feat_def is None:
            comp.feat_def = ogr.FeatureDefn()
            comp.feat_def.AddFieldDefn(ogr.FieldDefn("id", ogr.OFTString))
            for field_name in json_props:
                comp.feat_def.AddFieldDefn(ogr.FieldDefn(field_name, ogr.OFTString))

            comp.feat_def.SetGeomType(ogr_geom.GetGeometryType())

        # Create and populate Feature with id, geom and attributes
        feature = ogr.Feature(comp.feat_def)

        # "id" is optional: struct2geojson_feature only sets it when an id attribute matched
        json_id = json_feat.get("id")
        if json_id is not None:
            feature.SetField("id", json_id)

        feature.SetGeometry(ogr_geom)
        for field_name in json_props:
            feature.SetField(field_name, str(json_props[field_name]))

        packet.data = feature
        return packet

    # END geojson_feature

    # START geojson_collection
    @staticmethod
    def geojson_coll2ogr_feature_arr(packet, converter_args=None):
        """
        Convert each entry of the collection's "features" list to an OGR Feature.

        :param packet: packet holding a dict with a "features" list
        :param converter_args: passed on to geojson_feature2ogr_feature
        :return: the packet, data replaced by a list of OGR Features
        """
        json_feat_arr = packet.data["features"]
        ogr_feat_arr = list()
        for feat in json_feat_arr:
            # Reuse the packet as carrier for the single-feature conversion
            packet.data = feat
            packet = FormatConverter.geojson_feature2ogr_feature(packet, converter_args)
            ogr_feat_arr.append(packet.data)

        packet.data = ogr_feat_arr
        return packet

    # END geojson_collection

    # START gdal_vsi_path
    @staticmethod
    def gdal_vsi_path2etree_doc(packet, converter_args=None):
        """
        Read the XML file at a GDAL VSI path and parse it with etree.fromstring.

        Example input path:
        /vsizip/{/vsizip/{BAGGEM0221L-15022021.zip}/GEM-WPL-RELATIE-15022021.zip}/GEM-WPL-RELATIE-15022021-000001.xml

        :param packet: packet holding the VSI path
        :param converter_args: not used
        :return: the packet, data replaced by the result of etree.fromstring
        :raises IOError: when the VSI path cannot be opened
        """
        from stetl.util import gdal
        import re

        vsi_file_path = packet.data
        vsi_file = gdal.VSIFOpenL(vsi_file_path, 'rb')
        if vsi_file is None:
            raise IOError('Cannot open GDAL VSI path: %s' % vsi_file_path)

        try:
            # Determine file length by seeking to the end, then rewind
            gdal.VSIFSeekL(vsi_file, 0, 2)
            vsileng = gdal.VSIFTellL(vsi_file)
            gdal.VSIFSeekL(vsi_file, 0, 0)

            # read the XML as string (or bytearray)
            xml_str = gdal.VSIFReadL(1, vsileng, vsi_file)
        finally:
            # Always release the VSI handle, also when reading fails
            gdal.VSIFCloseL(vsi_file)

        # Type is GDAL-version dependent, may be bytes-like
        if isinstance(xml_str, (bytearray, bytes)):
            xml_str = xml_str.decode('utf-8')

        # Need to strip the XML header to avoid XML parse error
        xml_str = re.sub(r'<\?xml.*?\?>', '', xml_str)
        packet.data = etree.fromstring(xml_str)
        return packet

    # END gdal_vsi_path

    # START ogr_feature
    @staticmethod
    def ogr_feature2struct(packet, converter_args=None):
        """
        Convert an OGR Feature to a dict via its JSON export.

        :param packet: packet holding an object offering ExportToJson()
        :param converter_args: not used
        :return: the packet, data replaced by the parsed JSON
        """
        # ExportToJson() yields JSON text: parse it as JSON, a Python-literal
        # parser fails on the JSON tokens null/true/false.
        packet.data = json.loads(packet.data.ExportToJson())
        return packet

    # END ogr_feature

    @staticmethod
    def ogr_feature_arr2geojson_coll(packet, converter_args=None):
        """
        Convert an iterable of OGR Features to a GeoJSON FeatureCollection dict.

        :param packet: packet holding an iterable of objects offering ExportToJson()
        :param converter_args: not used
        :return: the packet, data replaced by a dict of type 'FeatureCollection'
        """
        # See http://geojson.org/geojson-spec.html
        packet.data = {
            'type': 'FeatureCollection',
            'features': [json.loads(feature.ExportToJson()) for feature in packet.data]
        }
        return packet

    @staticmethod
    def record2struct(packet, converter_args=None):
        """
        Wrap the packet data in a dict under the key converter_args['top_name'].

        Without converter_args the data is left as is.

        :param packet: the packet
        :param converter_args: optional dict with a 'top_name' entry
        :return: the packet
        """
        if converter_args is not None:
            packet.data = {converter_args['top_name']: packet.data}

        return packet

    @staticmethod
    def record2record_array(packet, converter_args=None):
        """
        Collect single records on the packet until the array is complete.

        Records are accumulated in ``packet.arr``. The array becomes the packet
        data at end of stream, or when converter_args['max_len'] records have
        been collected.

        :param packet: the packet
        :param converter_args: optional dict with a 'max_len' entry
        :return: the packet
        """
        if not hasattr(packet, 'arr'):
            packet.arr = list()

        if packet.data is not None:
            packet.arr.append(packet.data)
            packet.consume()

        # At end of stream or when max array size reached: close the array
        if packet.is_end_of_stream() is True or \
                (converter_args is not None and len(packet.arr) >= converter_args['max_len']):
            # End of stream reached: assembled record array
            packet.data = packet.arr
            packet.arr = list()

        return packet

    @staticmethod
    def record_array2struct(packet, converter_args=None):
        """
        Same conversion as record2struct, applied to a record array.

        :param packet: the packet
        :param converter_args: optional dict with a 'top_name' entry
        :return: the packet
        """
        return FormatConverter.record2struct(packet, converter_args)

    @staticmethod
    def string2etree_doc(packet, converter_args=None):
        """
        Parse the packet data with etree.fromstring.

        :param packet: packet holding XML text
        :param converter_args: ignored; accepted so invoke() can pass configured args
        :return: the packet, data replaced by the parse result
        """
        packet.data = etree.fromstring(packet.data)
        return packet

    @staticmethod
    def struct2string(packet, converter_args=None):
        """
        Replace the packet data by the result of packet.to_string().

        :param packet: the packet
        :param converter_args: ignored; accepted so invoke() can pass configured args
        :return: the packet
        """
        packet.data = packet.to_string()
        return packet

    @staticmethod
    def struct2geojson_feature(packet, converter_args=None):
        """
        Convert a single-entry struct (name -> attributes) to a GeoJSON feature dict.

        The 'geometry' attribute becomes the feature geometry, the attribute named by
        converter_args['feature_id_attr'] (if given) becomes the feature id, all
        other attributes become properties.

        :param packet: packet holding a dict; one item is popped from it
        :param converter_args: optional dict with a 'feature_id_attr' entry
        :return: the packet, data replaced by the feature dict
        """
        key, feature_struct = packet.data.popitem()

        # GeoJSON type names are case-sensitive: a feature has type "Feature"
        feature = {'type': 'Feature', 'properties': {}}

        id_field = converter_args.get('feature_id_attr') if converter_args else None

        for attr_name, val in feature_struct.items():
            if attr_name == 'geometry':
                feature['geometry'] = val
            elif attr_name == id_field:
                feature['id'] = val
            else:
                feature['properties'][attr_name] = val

        packet.data = feature
        return packet
# Converter dispatch table: input format -> output format -> converter function.
# Known format names:
# 'xml_line_stream', 'etree_doc', 'etree_element', 'etree_feature_array', 'xml_doc_as_string',
# 'string', 'record', 'record_array', 'geojson_collection', 'geojson_feature', 'struct',
# 'ogr_feature', 'ogr_feature_array', 'any'
FORMAT_CONVERTERS = {
    FORMAT.etree_doc: {
        FORMAT.geojson_collection: FormatConverter.etree_doc2geojson_collection,
        FORMAT.string: FormatConverter.etree_doc2string,
        FORMAT.struct: FormatConverter.etree_doc2struct,
        FORMAT.xml_doc_as_string: FormatConverter.etree_doc2string
    },
    FORMAT.etree_element: {
        FORMAT.geojson_feature: FormatConverter.etree_elem2geojson_feature,
        FORMAT.string: FormatConverter.etree_doc2string,
        FORMAT.struct: FormatConverter.etree_elem2struct,
        FORMAT.xml_doc_as_string: FormatConverter.etree_doc2string
    },
    FORMAT.geojson_feature: {
        FORMAT.ogr_feature: FormatConverter.geojson_feature2ogr_feature
    },
    FORMAT.geojson_collection: {
        FORMAT.ogr_feature_array: FormatConverter.geojson_coll2ogr_feature_arr
    },
    FORMAT.gdal_vsi_path: {
        FORMAT.etree_doc: FormatConverter.gdal_vsi_path2etree_doc
    },
    FORMAT.ogr_feature: {
        FORMAT.geojson_feature: FormatConverter.ogr_feature2struct,
        FORMAT.struct: FormatConverter.ogr_feature2struct
    },
    FORMAT.ogr_feature_array: {
        FORMAT.geojson_collection: FormatConverter.ogr_feature_arr2geojson_coll
    },
    FORMAT.record: {
        FORMAT.struct: FormatConverter.record2struct,
        FORMAT.record_array: FormatConverter.record2record_array
    },
    FORMAT.record_array: {
        FORMAT.struct: FormatConverter.record_array2struct
    },
    FORMAT.string: {
        FORMAT.etree_doc: FormatConverter.string2etree_doc,
        FORMAT.xml_doc_as_string: FormatConverter.no_op
    },
    FORMAT.struct: {
        FORMAT.string: FormatConverter.struct2string,
        FORMAT.geojson_feature: FormatConverter.struct2geojson_feature
    },
    FORMAT.xml_doc_as_string: {
        FORMAT.etree_doc: FormatConverter.string2etree_doc,
        FORMAT.string: FormatConverter.no_op
    },
    FORMAT.line_stream: {
        FORMAT.string: FormatConverter.no_op
    },
    FORMAT.xml_line_stream: {
        FORMAT.string: FormatConverter.no_op
    }
}