Source code for veranda.raster.mosaic.geotiff

""" Raster data class managing I/O for multiple GeoTIFF files. """

import os
import uuid
import tempfile
import xarray as xr
import numpy as np
import pandas as pd
from osgeo import gdal
from datetime import datetime
from typing import Tuple, List
from multiprocessing import Pool, RawArray

from geospade.crs import SpatialRef
from geospade.raster import Tile
from geospade.raster import MosaicGeometry

from veranda.utils import to_list
from veranda.raster.native.geotiff import GeoTiffFile
from veranda.raster.native.geotiff import create_vrt_file
from veranda.raster.gdalport import GDAL_TO_NUMPY_DTYPE
from veranda.raster.mosaic.base import RasterDataReader, RasterDataWriter, RasterAccess

PROC_OBJS = {}


def read_init(fr, am, sm, sd, td, si, ad, dc, dk):
    """ Helper method for setting the entries of global variable `PROC_OBJS` to be available during
    multiprocessing. """
    PROC_OBJS['global_file_register'] = fr
    PROC_OBJS['access_map'] = am
    PROC_OBJS['shm_map'] = sm
    PROC_OBJS['stack_dimension'] = sd
    PROC_OBJS['tile_dimension'] = td
    PROC_OBJS['stack_ids'] = si
    PROC_OBJS['auto_decode'] = ad
    PROC_OBJS['decoder'] = dc
    PROC_OBJS['decoder_kwargs'] = dk

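
# A minimal sketch of the shared-memory pattern used by `GeoTiffReader` below: pixel buffers are allocated once
# as `multiprocessing.RawArray` objects, handed to worker processes through the `read_init` initializer above,
# and re-wrapped as NumPy arrays with `np.frombuffer` on both sides (shape and dtype values are illustrative):
#
#     import numpy as np
#     from multiprocessing import RawArray
#
#     shape = (2, 4, 4)                                   # (layers, rows, columns)
#     c_dtype = np.ctypeslib.as_ctypes_type(np.float32)   # map the NumPy dtype to a ctypes type
#     shm_rar = RawArray(c_dtype, int(np.prod(shape)))    # flat, process-shareable buffer
#     shm_data = np.frombuffer(shm_rar, dtype=np.float32).reshape(shape)
#     shm_data[:] = -9999.0                               # pre-fill with a no-data value
#
# Workers receiving `shm_rar` see the same memory, so per-band reads can be assigned in place without pickling
# large arrays back to the parent process.
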

class GeoTiffAccess(RasterAccess):
    """
    Helper class to build the link between indexes of the source array (access) and the target array
    (assignment). The base class `RasterAccess` is extended with some properties needed for accessing GeoTIFF
    files.

    """
    def __init__(self, src_raster_geom, dst_raster_geom, src_root_raster_geom=None):
        """
        Constructor of `GeoTiffAccess`.

        Parameters
        ----------
        src_raster_geom : geospade.raster.RasterGeometry
            Raster geometry representing the extent and indices of the array to access.
        dst_raster_geom : geospade.raster.RasterGeometry
            Raster geometry representing the extent and indices of the array to assign.
        src_root_raster_geom : geospade.raster.RasterGeometry, optional
            Raster geometry representing the origin to which `src_raster_geom` should be referred. Defaults to
            None, i.e. the root parent of `src_raster_geom` is used.

        """
        super().__init__(src_raster_geom, dst_raster_geom, src_root_raster_geom=src_root_raster_geom)
        self.src_wkt = src_raster_geom.parent_root.sref.wkt
        self.src_geotrans = src_raster_geom.parent_root.geotrans
        self.src_shape = src_raster_geom.parent_root.shape

    @property
    def gdal_args(self) -> Tuple[int, int, int, int]:
        """ Prepares the needed positional arguments for GDAL's `ReadAsArray()` function. """
        min_row, min_col, max_row, max_col = self.src_window
        n_cols, n_rows = max_col - min_col + 1, max_row - min_row + 1
        return min_col, min_row, n_cols, n_rows

    @property
    def read_args(self) -> Tuple[int, int, int, int]:
        """ Prepares the needed positional arguments for the `read()` function of the native `GeoTiffFile` class. """
        min_col, min_row, n_cols, n_rows = self.gdal_args
        return min_row, min_col, n_rows, n_cols

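
# Illustrative example of the two index orderings exposed by `GeoTiffAccess` (window values are hypothetical):
# for a source window `src_window == (10, 5, 19, 14)`, i.e. rows 10-19 and columns 5-14 inclusive,
#
#     access.gdal_args  # -> (5, 10, 10, 10): GDAL-style (xoff, yoff, xsize, ysize) for ReadAsArray()
#     access.read_args  # -> (10, 5, 10, 10): (row, col, n_rows, n_cols) ordering for GeoTiffFile.read()
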

class GeoTiffReader(RasterDataReader):
    """ Allows to read and manage a stack of GeoTIFF data. """
    def __init__(self, file_register, mosaic, stack_dimension='layer_id', stack_coords=None,
                 tile_dimension='tile_id', space_dims=None, file_class=GeoTiffFile, file_class_kwargs=None):
        """
        Constructor of `GeoTiffReader`.

        Parameters
        ----------
        file_register : pd.DataFrame
            Data frame managing a stack/list of files containing the following columns:
                - 'filepath' : str
                    Full file path to a geospatial file.
                - 'layer_id' : object
                    Specifies an ID to which layer a file belongs to, e.g. a layer counter or a timestamp. Must
                    correspond to `stack_dimension`.
                - 'tile_id' : str
                    Tile name or ID to which tile a file belongs to.
        mosaic : geospade.raster.MosaicGeometry
            Mosaic representing the spatial allocation of the given files. The tiles of the mosaic have to match
            the ID's/names of the 'tile_id' column.
        stack_dimension : str, optional
            Dimension/column name of the dimension, where to stack the files along (first axis), e.g. time,
            bands etc. Defaults to 'layer_id', i.e. the layer ID's are used as the main coordinates to stack the
            files.
        stack_coords : list, optional
            Additional columns of `file_register` to use as coordinates. Defaults to None, i.e. only coordinates
            along `stack_dimension` are used.
        tile_dimension : str, optional
            Dimension/column name of the dimension containing tile ID's in correspondence with the tiles in
            `mosaic`. Defaults to 'tile_id'.
        space_dims : list, optional
            List containing the spatial dimension names. By default it is set to ['y', 'x'].
        file_class : class, optional
            Class for opening GeoTIFF files. Defaults to `GeoTiffFile`.
        file_class_kwargs : dict, optional
            Keyword arguments for `file_class`.

        """
        super().__init__(file_register, mosaic, stack_dimension=stack_dimension, stack_coords=stack_coords,
                         tile_dimension=tile_dimension)

        file_class_kwargs = file_class_kwargs or dict()
        ref_filepath = self._file_register['filepath'].iloc[0]
        with file_class(ref_filepath, 'r', **file_class_kwargs) as gt_file:
            self._ref_dtypes = gt_file.dtypes
            self._ref_nodatavals = gt_file.nodatavals

        self._ref_space_dims = space_dims or ['y', 'x']

    @classmethod
    def from_filepaths(cls, filepaths, mosaic_class=MosaicGeometry, mosaic_kwargs=None, tile_kwargs=None,
                       stack_dimension='layer_id', tile_dimension='tile_id', **kwargs) -> "GeoTiffReader":
        """
        Creates a `GeoTiffReader` instance as one stack of GeoTIFF files.

        Parameters
        ----------
        filepaths : list of str
            List of full system paths to GeoTIFF files.
        mosaic_class : geospade.raster.MosaicGeometry, optional
            Mosaic class used to manage the spatial properties of the file stack. If None, the most generic
            mosaic will be used by default. The initialised mosaic will only contain one tile.
        mosaic_kwargs : dict, optional
            Additional arguments for initialising `mosaic_class`.
        tile_kwargs : dict, optional
            Additional arguments for initialising a tile class associated with `mosaic_class`.
        stack_dimension : str, optional
            Dimension/column name of the dimension, where to stack the files along (first axis), e.g. time,
            bands etc. Defaults to 'layer_id', i.e. the layer ID's are used as the main coordinates to stack the
            files.
        tile_dimension : str, optional
            Dimension/column name of the dimension containing tile ID's in correspondence with the tiles in
            `mosaic`. Defaults to 'tile_id'.
        kwargs : dict, optional
            Keyword arguments for the `GeoTiffReader` constructor.

        Returns
        -------
        GeoTiffReader

        """
        mosaic_kwargs = mosaic_kwargs or dict()
        tile_kwargs = tile_kwargs or dict()

        n_filepaths = len(filepaths)
        file_register_dict = dict()
        file_register_dict['filepath'] = filepaths
        file_register_dict[tile_dimension] = ['0'] * n_filepaths
        file_register_dict[stack_dimension] = list(range(1, n_filepaths + 1))
        file_register = pd.DataFrame(file_register_dict)

        ref_filepath = filepaths[0]
        with GeoTiffFile(ref_filepath, 'r') as gt_file:
            sref_wkt = gt_file.sref_wkt
            geotrans = gt_file.geotrans
            n_rows, n_cols = gt_file.raster_shape

        tile_class = mosaic_class.get_tile_class()
        tile = tile_class(n_rows, n_cols, sref=SpatialRef(sref_wkt), geotrans=geotrans, name='0', **tile_kwargs)
        mosaic_geom = mosaic_class.from_tile_list([tile], check_consistency=False, **mosaic_kwargs)

        return cls(file_register, mosaic_geom, stack_dimension=stack_dimension, tile_dimension=tile_dimension,
                   **kwargs)

    @classmethod
    def from_mosaic_filepaths(cls, filepaths, mosaic_class=MosaicGeometry, mosaic_kwargs=None,
                              stack_dimension='layer_id', tile_dimension='tile_id', **kwargs) -> "GeoTiffReader":
        """
        Creates a `GeoTiffReader` instance as multiple stacks of GeoTIFF files.

        Parameters
        ----------
        filepaths : list of str
            List of full system paths to GeoTIFF files.
        mosaic_class : geospade.raster.MosaicGeometry, optional
            Mosaic class used to manage the spatial properties of the file stacks. If None, the most generic
            mosaic will be used by default.
        mosaic_kwargs : dict, optional
            Additional arguments for initialising `mosaic_class`.
        stack_dimension : str, optional
            Dimension/column name of the dimension, where to stack the files along (first axis), e.g. time,
            bands etc. Defaults to 'layer_id', i.e. the layer ID's are used as the main coordinates to stack the
            files.
        tile_dimension : str, optional
            Dimension/column name of the dimension containing tile ID's in correspondence with the tiles in
            `mosaic`. Defaults to 'tile_id'.
        kwargs : dict, optional
            Keyword arguments for the `GeoTiffReader` constructor.

        Returns
        -------
        GeoTiffReader

        """
        mosaic_kwargs = mosaic_kwargs or dict()
        file_register_dict = dict()
        file_register_dict['filepath'] = filepaths

        tile_class = mosaic_class.get_tile_class()
        tiles, tile_ids, layer_ids = RasterDataReader._create_tile_and_layer_info_from_files(filepaths, tile_class,
                                                                                             GeoTiffFile)
        file_register_dict[tile_dimension] = tile_ids
        file_register_dict[stack_dimension] = layer_ids
        file_register = pd.DataFrame(file_register_dict)

        mosaic_geom = mosaic_class.from_tile_list(tiles, check_consistency=False, **mosaic_kwargs)

        return cls(file_register, mosaic_geom, stack_dimension=stack_dimension, tile_dimension=tile_dimension,
                   **kwargs)

    def apply_nan(self):
        """ Converts no data values given as an attribute '_FillValue' or keyword `nodatavals` to np.nan. """
        nodatavals = {dvar: self._ref_nodatavals[i] for i, dvar in enumerate(self.data_view.data_vars)}
        super().apply_nan(nodatavals=nodatavals)

    def read(self, bands=1, band_names=None, engine='vrt', n_cores=1,
             auto_decode=False, decoder=None, decoder_kwargs=None) -> "GeoTiffReader":
        """
        Reads data from disk.

        Parameters
        ----------
        bands : tuple of int or int, optional
            The GeoTIFF bands of interest. Defaults to the first band, i.e. 1.
        band_names : tuple of str or str, optional
            Names associated with the respective bands of the GeoTIFF files. Defaults to None, i.e. the band
            numbers will be used as a name.
        engine : str, optional
            Engine used in the background to read data. The following options are available:
                - 'vrt' : Uses GDAL's VRT format to stack the GeoTIFF files per tile and load them at once.
                  This option yields good performance if the mosaic is stored locally on one drive.
                  Parallelisation is applied across tiles.
                - 'parallel' : Reads file by file, but in a parallelised manner. This option yields good
                  performance if the mosaic is stored on a distributed file system.
        n_cores : int, optional
            Number of cores used to read data in a parallelised manner (defaults to 1).
        auto_decode : bool, optional
            True if data should be decoded according to the information available in its metadata. Defaults to
            False.
        decoder : callable, optional
            Function allowing to decode data read from disk.
        decoder_kwargs : dict, optional
            Keyword arguments for the decoder.

        """
        bands = to_list(bands)
        band_names = to_list(band_names)
        dst_tile = Tile.from_extent(self._mosaic.outer_extent, sref=self._mosaic.sref,
                                    x_pixel_size=self._mosaic.x_pixel_size,
                                    y_pixel_size=self._mosaic.y_pixel_size,
                                    name='0')

        shm_map = {band: self.__init_band_data(band, dst_tile) for band in bands}
        access_map = {src_tile.parent_root.name: GeoTiffAccess(src_tile, dst_tile)
                      for src_tile in self._mosaic.tiles}
        data_mask = self.__create_data_mask_from(access_map, dst_tile.shape)

        if engine == 'vrt':
            self.__read_vrt_stack(access_map, shm_map, n_cores=n_cores, auto_decode=auto_decode,
                                  decoder=decoder, decoder_kwargs=decoder_kwargs)
        elif engine == 'parallel':
            self.__read_parallel(access_map, shm_map, n_cores=n_cores, auto_decode=auto_decode,
                                 decoder=decoder, decoder_kwargs=decoder_kwargs)
        else:
            err_msg = f"Engine '{engine}' is not supported!"
            raise ValueError(err_msg)

        data = {band: self.__load_band_data(band, shm_map, data_mask) for band in bands}
        self._data_geom = dst_tile
        self._data = self._to_xarray(data, band_names)
        self._add_grid_mapping()

        return self

    def __init_band_data(self, band, tile) -> Tuple[RawArray, tuple]:
        """
        Initialises shared memory array for a specific band.

        Parameters
        ----------
        band : int
            Band number.
        tile : Tile
            Tile containing information about the raster shape of the band data.

        Returns
        -------
        shm_rar : RawArray
            Shared memory array.
        shm_ar_shape : tuple
            Shape of the array.

        """
        np_dtype = np.dtype(self._ref_dtypes[band - 1])
        self._ref_nodatavals[band - 1] = np.array((self._ref_nodatavals[band - 1])).astype(np_dtype)
        data_nshm = np.ones((self.n_layers, tile.n_rows, tile.n_cols), dtype=np_dtype) * \
            self._ref_nodatavals[band - 1]
        shm_ar_shape = data_nshm.shape
        c_dtype = np.ctypeslib.as_ctypes_type(data_nshm.dtype)
        shm_rar = RawArray(c_dtype, data_nshm.size)
        shm_data = np.frombuffer(shm_rar, dtype=np_dtype).reshape(shm_ar_shape)
        shm_data[:] = data_nshm[:]

        return shm_rar, shm_ar_shape

    def __load_band_data(self, band, shm_map, mask) -> np.ndarray:
        """
        Loads band data from the corresponding shared memory array.

        Parameters
        ----------
        band : int
            Band number.
        shm_map : dict
            Dictionary mapping the band number with a tuple containing a shared memory array and its shape.
        mask : np.array
            Data mask.

        Returns
        -------
        shm_data : np.ndarray
            Band data wrapped around the shared memory array.

        """
        shm_rar, shm_ar_shape = shm_map[band]
        shm_data = np.frombuffer(shm_rar, dtype=self._ref_dtypes[band - 1]).reshape(shm_ar_shape)
        shm_data[:, ~mask.astype(bool)] = self._ref_nodatavals[band - 1]

        return shm_data

    def __create_data_mask_from(self, access_map, shape) -> np.ndarray:
        """
        Creates data mask from all individual tile masks contributing to the spatial extent of the data.

        Parameters
        ----------
        access_map : dict
            Dictionary mapping tile names with `GeoTiffAccess` instances for storing the access relation between
            the data mask and the tiles.
        shape : tuple
            Shape of the data mask.

        Returns
        -------
        data_mask : np.ndarray
            Data mask (1 valid data, 0 no data).

        """
        data_mask = np.ones(shape)
        for tile in self._mosaic.tiles:
            gt_access = access_map[tile.parent_root.name]
            data_mask[gt_access.dst_row_slice, gt_access.dst_col_slice] = tile.mask

        return data_mask

    def __read_vrt_stack(self, access_map, shm_map, n_cores=1, auto_decode=False, decoder=None,
                         decoder_kwargs=None):
        """
        Reads GeoTIFF data from a stack of GeoTIFF files by using GDAL's VRT format.

        Parameters
        ----------
        access_map : dict
            Dictionary mapping tile/geometry ID's with `GeoTiffAccess` instances to define the access patterns
            between the data to load and to assign.
        shm_map : dict
            Dictionary mapping band numbers with a tuple containing the shared memory raw array and its shape.
        n_cores : int, optional
            Number of cores used to read data in a parallelised manner (defaults to 1).
        auto_decode : bool, optional
            True if data should be decoded according to the information available in its metadata. Defaults to
            False.
        decoder : callable, optional
            Function allowing to decode data read from disk.
        decoder_kwargs : dict, optional
            Keyword arguments for the decoder.

        """
        decoder_kwargs = decoder_kwargs or dict()
        global_file_register = self._file_register
        with Pool(n_cores, initializer=read_init, initargs=(global_file_register, access_map, shm_map,
                                                            self._file_dim, self._tile_dim, self.layer_ids,
                                                            auto_decode, decoder, decoder_kwargs)) as p:
            p.map(read_vrt_stack, access_map.keys())

    def __read_parallel(self, access_map, shm_map, n_cores=1, auto_decode=False, decoder=None,
                        decoder_kwargs=None):
        """
        Reads GeoTIFF data on a file-by-file basis, in a parallelised manner.

        Parameters
        ----------
        access_map : dict
            Dictionary mapping tile/geometry ID's with `GeoTiffAccess` instances to define the access patterns
            between the data to load and to assign.
        shm_map : dict
            Dictionary mapping band numbers with a tuple containing the shared memory raw array and its shape.
        n_cores : int, optional
            Number of cores used to read data in a parallelised manner (defaults to 1).
        auto_decode : bool, optional
            True if data should be decoded according to the information available in its metadata. Defaults to
            False.
        decoder : callable, optional
            Function allowing to decode data read from disk.
        decoder_kwargs : dict, optional
            Keyword arguments for the decoder.

        """
        decoder_kwargs = decoder_kwargs or dict()
        global_file_register = self._file_register
        with Pool(n_cores, initializer=read_init, initargs=(global_file_register, access_map, shm_map,
                                                            self._file_dim, self._tile_dim, self.layer_ids,
                                                            auto_decode, decoder, decoder_kwargs)) as p:
            p.map(read_single_files, global_file_register.index)

    def _to_xarray(self, data, band_names=None) -> xr.Dataset:
        """
        Converts data being available as a NumPy array to an xarray dataset.

        Parameters
        ----------
        data : dict
            Dictionary mapping band numbers to GeoTIFF raster data being available as a NumPy array.
        band_names : list of str, optional
            Band names associated with the respective band number.

        Returns
        -------
        xrds : xr.Dataset

        """
        dims = [self._file_dim] + self._ref_space_dims

        # retrieve file register with unique stack dimension
        ref_tile_id = list(self.file_register[self._tile_dim])[0]
        file_register_uni = self._file_register.loc[self._file_register[self._tile_dim] == ref_tile_id]

        coord_dict = dict()
        for coord in self._file_coords:
            coord_dict[coord] = file_register_uni[coord]
        coord_dict[self._ref_space_dims[0]] = self._data_geom.y_coords
        coord_dict[self._ref_space_dims[1]] = self._data_geom.x_coords

        xar_dict = dict()
        bands = list(data.keys())
        band_names = band_names or bands
        for i, band in enumerate(bands):
            xar_dict[band_names[i]] = xr.DataArray(data[band], coords=coord_dict, dims=dims,
                                                   attrs={'_FillValue': self._ref_nodatavals[band - 1]})

        xrds = xr.Dataset(data_vars=xar_dict)

        return xrds


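# A possible end-to-end usage sketch for the reader (file paths, band numbers, and the number of cores below are
# placeholders, not values taken from this module):
#
#     filepaths = ["/data/tile_1.tif", "/data/tile_2.tif"]
#     reader = GeoTiffReader.from_mosaic_filepaths(filepaths)
#     reader.read(bands=1, band_names="sig0", engine="vrt", n_cores=2)
#     ds = reader.data_view                                # xarray.Dataset with dims (layer_id, y, x)
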

class GeoTiffWriter(RasterDataWriter):
    """ Allows to write and manage a stack of GeoTIFF files. """
    def __init__(self, mosaic, file_register=None, data=None, stack_dimension='layer_id', stack_coords=None,
                 tile_dimension='tile_id', dirpath=None, fn_pattern='{layer_id}.tif', fn_formatter=None):
        """
        Constructor of `GeoTiffWriter`.

        Parameters
        ----------
        mosaic : geospade.raster.MosaicGeometry
            Mosaic representing the spatial allocation of the given files. The tiles of the mosaic have to match
            the ID's/names of the 'tile_id' column.
        file_register : pd.DataFrame, optional
            Data frame managing a stack/list of files containing the following columns:
                - 'filepath' : str
                    Full file path to a geospatial file.
                - 'layer_id' : object
                    Specifies an ID to which layer a file belongs to, e.g. a layer counter or a timestamp. Must
                    correspond to `stack_dimension`.
                - 'tile_id' : str
                    Tile name or ID to which tile a file belongs to.
            If it is None, then the constructor tries to create a file register from other keyword arguments,
            i.e. `data`, `dirpath`, `fn_pattern`, and `fn_formatter`.
        data : xr.Dataset, optional
            Raster data stored in memory. It must match the spatial sampling and CRS of the mosaic, but not its
            spatial extent or tiling. Moreover, the size of the data along the first dimension (stack dimension)
            must match the entries/filepaths in `file_register`.
        stack_dimension : str, optional
            Dimension/column name of the dimension, where to stack the files along (first axis), e.g. time,
            bands etc. Defaults to 'layer_id', i.e. the layer ID's are used as the main coordinates to stack the
            files.
        stack_coords : list, optional
            Additional columns of `file_register` to use as coordinates. Defaults to None, i.e. only coordinates
            along `stack_dimension` are used.
        tile_dimension : str, optional
            Dimension/column name of the dimension containing tile ID's in correspondence with the tiles in
            `mosaic`. Defaults to 'tile_id'.
        dirpath : str, optional
            Directory path to the folder where the GeoTIFF files should be written to. Defaults to None, i.e.
            the current working directory is used.
        fn_pattern : str, optional
            Pattern for the filename of the new GeoTIFF files. To fill in specific parts of the new file name
            with information from the file register, you can specify the respective file register column names
            in curly brackets and add them to the pattern string as desired. Defaults to '{layer_id}.tif'.
        fn_formatter : dict, optional
            Dictionary mapping file register column names with functions allowing to encode their values as
            strings.

        """
        super().__init__(mosaic, file_register=file_register, data=data, stack_dimension=stack_dimension,
                         stack_coords=stack_coords, tile_dimension=tile_dimension, dirpath=dirpath,
                         fn_pattern=fn_pattern, fn_formatter=fn_formatter)

    def __get_encoding_info_from_data(self, data, band_names) -> Tuple[dict, dict, dict, dict]:
        """
        Extracts encoding information from an xarray dataset.

        Parameters
        ----------
        data : xr.Dataset
            Data to extract encoding info from.
        band_names : list of str
            List of band names.

        Returns
        -------
        nodatavals : dict
            Band number mapped to no data value (defaults to 0).
        scale_factors : dict
            Band number mapped to scale factor (defaults to 1).
        offsets : dict
            Band number mapped to offset (defaults to 0).
        dtypes : dict
            Band number mapped to data type.

        """
        nodatavals = dict()
        scale_factors = dict()
        offsets = dict()
        dtypes = dict()
        for i, band_name in enumerate(band_names):
            dtypes[i + 1] = data[band_name].data.dtype.name
            nodatavals[i + 1] = data[band_name].attrs.get('_FillValue', 0)
            scale_factors[i + 1] = data[band_name].attrs.get('scale_factor', 1)
            offsets[i + 1] = data[band_name].attrs.get('add_offset', 0)

        return nodatavals, scale_factors, offsets, dtypes

    def write(self, data, use_mosaic=False, data_variables=None, encoder=None, encoder_kwargs=None,
              overwrite=False, **kwargs):
        """
        Writes a certain chunk of data to disk.

        Parameters
        ----------
        data : xr.Dataset
            Data chunk to be written to disk or being appended to existing data.
        use_mosaic : bool, optional
            True if data should be written according to the mosaic.
            False if data composes a new tile and should not be tiled (default).
        data_variables : list of str, optional
            Data variables to write. Defaults to None, i.e. all data variables are written.
        encoder : callable, optional
            Function allowing to encode data before writing it to disk.
        encoder_kwargs : dict, optional
            Keyword arguments for the encoder.
        overwrite : bool, optional
            True if data should be overwritten, False if not (default).

        """
        data_geom = self.raster_geom_from_data(data, sref=self.mosaic.sref)
        data_filt = data if data_variables is None else data[data_variables]
        band_names = list(data_filt.data_vars)
        n_bands = len(band_names)
        nodatavals, scale_factors, offsets, dtypes = self.__get_encoding_info_from_data(data_filt, band_names)

        for filepath, file_group in self._file_register.groupby('filepath'):
            tile_id = file_group.iloc[0].get(self._tile_dim, '0')
            file_coords = list(file_group[self._file_dim])

            xrds = data_filt.sel(**{self._file_dim: file_coords})
            data_write = xrds[band_names].to_array().data

            if use_mosaic:
                src_tile = self._mosaic[tile_id]
                if not src_tile.intersects(data_geom):
                    continue
                dst_tile = data_geom.slice_by_geom(src_tile, inplace=False, name='0')
                gt_access = GeoTiffAccess(dst_tile, src_tile, src_root_raster_geom=data_geom)
                data_write = data_write[..., gt_access.src_row_slice, gt_access.src_col_slice]
            else:
                src_tile = data_geom
                dst_tile = data_geom
                gt_access = GeoTiffAccess(dst_tile, src_tile, src_root_raster_geom=data_geom)

            file_id = file_group.iloc[0].get('file_id', None)
            if file_id is None:
                gt_file = GeoTiffFile(filepath, mode='w', geotrans=src_tile.geotrans, sref_wkt=src_tile.sref.wkt,
                                      raster_shape=src_tile.shape, n_bands=n_bands, dtypes=dtypes,
                                      scale_factors=scale_factors, offsets=offsets, nodatavals=nodatavals)
                file_id = len(list(self._files.keys())) + 1
                self._files[file_id] = gt_file
                self._file_register.loc[file_group.index, 'file_id'] = file_id

            gt_file = self._files[file_id]
            gt_file.write(data_write[..., gt_access.src_row_slice,
                                     gt_access.src_col_slice].reshape((-1, dst_tile.n_rows, dst_tile.n_cols)),
                          row=gt_access.dst_window[0], col=gt_access.dst_window[1],
                          encoder=encoder, encoder_kwargs=encoder_kwargs)

    def export(self, use_mosaic=False, data_variables=None, encoder=None, encoder_kwargs=None, overwrite=False,
               **kwargs):
        """
        Writes all internally stored data to disk.

        Parameters
        ----------
        use_mosaic : bool, optional
            True if data should be written according to the mosaic.
            False if data composes a new tile and should not be tiled (default).
        data_variables : list of str, optional
            Data variables to write. Defaults to None, i.e. all data variables are written.
        encoder : callable, optional
            Function allowing to encode data before writing it to disk.
        encoder_kwargs : dict, optional
            Keyword arguments for the encoder.
        overwrite : bool, optional
            True if data should be overwritten, False if not (default).

        """
        self.write(self.data_view, use_mosaic, data_variables, encoder, encoder_kwargs, overwrite, **kwargs)


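# A possible usage sketch for the writer (the mosaic, the xarray dataset `ds`, and the output directory are
# assumed to exist and are not defined in this module):
#
#     writer = GeoTiffWriter(mosaic, data=ds, dirpath="/tmp/out", fn_pattern="{layer_id}.tif")
#     writer.export(use_mosaic=True)
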

def read_vrt_stack(tile_id):
    """
    Function being responsible to create a new VRT file from a stack of GeoTIFF files, read data from these
    files, and assign it to a shared memory array. This function is meant to be executed in parallel on
    different cores.

    Parameters
    ----------
    tile_id : str
        Tile/geometry ID coming from the Pool's mapping function.

    """
    global_file_register = PROC_OBJS['global_file_register']
    access_map = PROC_OBJS['access_map']
    shm_map = PROC_OBJS['shm_map']
    tile_dimension = PROC_OBJS['tile_dimension']
    stack_dimension = PROC_OBJS['stack_dimension']
    stack_ids = PROC_OBJS['stack_ids']

    gt_access = access_map[tile_id]
    bands = list(shm_map.keys())

    file_register = global_file_register.loc[global_file_register[tile_dimension] == tile_id]
    if len(file_register) > 0:
        path = tempfile.gettempdir()
        vrt_filepath = os.path.join(path, f"{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex}.vrt")
        while os.path.exists(vrt_filepath):
            vrt_filepath = os.path.join(path,
                                        f"{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex}.vrt")
        filepaths = list(file_register['filepath'])
        create_vrt_file(filepaths, vrt_filepath, gt_access.src_shape, gt_access.src_wkt, gt_access.src_geotrans,
                        bands=bands)
        layer_ids = [stack_ids.index(stack_id) for stack_id in file_register[stack_dimension]]

        src = gdal.Open(vrt_filepath, gdal.GA_ReadOnly)
        vrt_data = src.ReadAsArray(*gt_access.gdal_args)
        for band in bands:
            _assign_vrt_stack_per_band(tile_id, layer_ids, band, src, vrt_data)

def _assign_vrt_stack_per_band(tile_id, layer_ids, band, src, vrt_data):
    """
    Assigns loaded raster data to shared memory array for a specific band.

    Parameters
    ----------
    tile_id : str
        Tile/geometry ID coming from the Pool's mapping function.
    layer_ids : list of int
        Indices along the stack dimension of the shared memory array to which the loaded layers are assigned.
    band : int
        Band number.
    src : gdal.Dataset
        GDAL dataset handle.
    vrt_data : np.ndarray
        In-memory data read from disk.

    """
    auto_decode = PROC_OBJS['auto_decode']
    decoder = PROC_OBJS['decoder']
    decoder_kwargs = PROC_OBJS['decoder_kwargs']
    access_map = PROC_OBJS['access_map']
    shm_map = PROC_OBJS['shm_map']

    gt_access = access_map[tile_id]
    bands = list(shm_map.keys())
    n_bands = len(bands)

    b_idx = bands.index(band)
    band_data = vrt_data[b_idx::n_bands, ...]
    scale_factor = src.GetRasterBand(b_idx + 1).GetScale()
    nodataval = src.GetRasterBand(b_idx + 1).GetNoDataValue()
    offset = src.GetRasterBand(b_idx + 1).GetOffset()
    dtype = GDAL_TO_NUMPY_DTYPE[src.GetRasterBand(b_idx + 1).DataType]
    if auto_decode:
        band_data = band_data.astype(float)
        band_data[band_data == nodataval] = np.nan
        band_data = band_data * scale_factor + offset
    else:
        if decoder is not None:
            band_data = decoder(band_data, nodataval=nodataval, band=band, scale_factor=scale_factor,
                                offset=offset, dtype=dtype, **decoder_kwargs)
    shm_rar, shm_ar_shape = shm_map[band]
    shm_data = np.frombuffer(shm_rar, dtype=dtype).reshape(shm_ar_shape)
    shm_data[layer_ids, gt_access.dst_row_slice, gt_access.dst_col_slice] = band_data

def read_single_files(file_idx):
    """
    Function being responsible to read data from a single GeoTIFF file and assign it to a shared memory array.
    This function is meant to be executed in parallel on different cores.

    Parameters
    ----------
    file_idx : any
        Index value to access a specific row of the file register. The actual value should come from the Pool's
        mapping function.

    """
    global_file_register = PROC_OBJS['global_file_register']
    access_map = PROC_OBJS['access_map']
    shm_map = PROC_OBJS['shm_map']
    auto_decode = PROC_OBJS['auto_decode']
    decoder = PROC_OBJS['decoder']
    decoder_kwargs = PROC_OBJS['decoder_kwargs']
    stack_dimension = PROC_OBJS['stack_dimension']
    tile_dimension = PROC_OBJS['tile_dimension']

    file_entry = global_file_register.loc[file_idx]
    layer_id = file_entry[stack_dimension]
    tile_id = file_entry[tile_dimension]
    filepath = file_entry['filepath']
    gt_access = access_map[tile_id]
    bands = list(shm_map.keys())

    with GeoTiffFile(filepath, mode='r', auto_decode=auto_decode) as gt_file:
        gt_data = gt_file.read(*gt_access.read_args, bands=bands, decoder=decoder, decoder_kwargs=decoder_kwargs)
        for band in bands:
            dtype = gt_file.dtypes[band]
            shm_rar, shm_ar_shape = shm_map[band]
            shm_data = np.frombuffer(shm_rar, dtype=dtype).reshape(shm_ar_shape)
            shm_data[layer_id, gt_access.dst_row_slice, gt_access.dst_col_slice] = gt_data[(band - 1), ...]


if __name__ == '__main__': pass