# Copyright (c) 2013, 2018 National Technology and Engineering Solutions of Sandia, LLC . Under the terms of Contract
# DE-NA0003525 with National Technology and Engineering Solutions of Sandia, LLC, the U.S. Government
# retains certain rights in this software.
import h5py
import numbers
import numpy
import os
import slycat.darray
import cherrypy
[docs]
class DArray(slycat.darray.Prototype):
"""Slycat darray implementation that stores data in an HDF5 file."""
def __init__(self, storage):
self._storage = storage
self._metadata = self._storage.get("metadata", None)
if self._metadata is None:
self._metadata = self._storage.attrs
self._attributes = None
@property
def ndim(self):
"""Return the number of dimensions in the darray.
Returns
-------
ndim: integer
The number of dimensions in the darray.
"""
return len(self._metadata["dimension-names"])
@property
def shape(self):
"""Return the darray shape (its size along each dimension).
Returns
-------
shape: tuple of integers
The size of the darray along each dimension.
"""
return tuple([end - begin for begin, end in zip(self._metadata["dimension-begin"], self._metadata["dimension-end"])])
@property
def size(self):
"""Return the darray size (total number of elements stored in the darray).
Returns
-------
size: integer
The total number of elements stored in the darray.
"""
return numpy.prod(self.shape)
@property
def dimensions(self):
"""Return metadata describing the darray dimensions.
Returns
-------
dimensions: list of dicts
"""
return [dict(name=name, type=type, begin=begin, end=end) for name, type, begin, end in zip(self._metadata["dimension-names"], self._metadata["dimension-types"], self._metadata["dimension-begin"], self._metadata["dimension-end"])]
@property
def attributes(self):
"""Return metadata describing the darray attributes.
Returns
-------
attributes: list of dicts
"""
if self._attributes is None:
self._attributes = [dict(name=name, type=type) for name, type in zip(self._metadata["attribute-names"], self._metadata["attribute-types"])]
return self._attributes
def _update_cache(self, attribute_index):
attribute_key = "attribute/%s" % attribute_index
unique_key = "unique/%s" % attribute_index
attribute = self._storage[attribute_key]
if "min" in attribute.attrs and "max" in attribute.attrs and "unique" in attribute.attrs and unique_key in self._storage:
return
attribute_min = None
attribute_max = None
attribute_unique = None
chunk_size = 1000
for begin in numpy.arange(0, len(attribute), chunk_size):
slice = attribute[begin : begin + chunk_size]
if attribute.dtype.char in ["O", "S", "U"]:
data_min = min(slice)
data_max = max(slice)
data_unique = numpy.unique(slice)
if type(data_min) is bytes:
data_min = str(data_min.decode())
if type(data_max) is bytes:
data_max = str(data_max.decode())
attribute_min = str(data_min) if attribute_min is None else str(min(data_min, attribute_min))
attribute_max = str(data_max) if attribute_max is None else str(max(data_max, attribute_max))
attribute_unique = data_unique if attribute_unique is None else numpy.unique(numpy.concatenate((data_unique, attribute_unique)))
else:
slice = slice[numpy.invert(numpy.isnan(slice))]
if len(slice):
data_min = numpy.asscalar(slice.min())
data_max = numpy.asscalar(slice.max())
data_unique = numpy.unique(slice)
attribute_min = data_min if attribute_min is None else min(data_min, attribute_min)
attribute_max = data_max if attribute_max is None else max(data_max, attribute_max)
attribute_unique = data_unique if attribute_unique is None else numpy.unique(numpy.concatenate((data_unique, attribute_unique)))
if attribute_min is not None:
attribute.attrs["min"] = attribute_min
if attribute_max is not None:
attribute.attrs["max"] = attribute_max
if attribute_unique is not None:
attribute.attrs["unique"] = len(attribute_unique)
self._storage.create_dataset(unique_key, data=attribute_unique, dtype=dtype(self._metadata["attribute-types"][attribute_index]))
[docs]
def get_statistics(self, attribute):
self._update_cache(attribute)
attribute = self._storage["attribute/%s" % attribute]
return {
"min": attribute.attrs.get("min", None),
"max": attribute.attrs.get("max", None),
"unique": attribute.attrs.get("unique", None),
}
[docs]
def get_unique(self, attribute, hyperslice):
self._update_cache(attribute)
return {
"values": self._storage["unique/%s" % attribute][hyperslice]
}
[docs]
def get_data(self, attribute):
"""Return a reference to the data storage for a darray attribute.
Parameters
----------
attribute: integer, optional
The integer index of the attribute data to retrieve.
Returns
-------
data: reference to a numpy-array-like object.
An object implementing a subset of the :class:`numpy.ndarray` interface
that contains the attribute data. Note that the returned object only
`references` the underlying data - data is not retrieved from the file
until you access it using the `[]` operator.
"""
class StorageWrapper(object):
"""Ensures that the dtype of data retrieved from the file matches what was put in."""
def __init__(self, storage, dtype):
self._storage = storage
self._dtype = dtype
if type(self._dtype) is bytes:
self._dtype = str(self._dtype.decode())
if self._dtype == "string":
self._dtype = 'unicode'
def __getitem__(self, *args, **kwargs):
result = self._storage.__getitem__(*args, **kwargs)
# check for unicode string, convert to numpy
if type(result) is bytes:
result = numpy.str_(result.decode())
# check for normal string, convert to numpy
if type(result) is str:
result = numpy.str_(result)
# check for list or numpy array, byte decode everything in it
if (type(result) is list) or (type(result) is numpy.ndarray):
for i in range(0, len(result)):
try:
result[i] = result[i].decode('utf-8')
except (UnicodeDecodeError, AttributeError):
pass
return result.astype(self._dtype)
return StorageWrapper(self._storage["attribute/%s" % attribute], self._metadata["attribute-types"][attribute])
[docs]
def set_data(self, attribute, hyperslice, data):
"""Overwrite the contents of a darray attribute.
Parameters
----------
attribute : integer
The zero-based integer index of the attribute to be overwritten.
hyperslice : integer, :class:`slice`, :class:`Ellipsis`, or tuple containing one or more integer, :class:`slice`, and :class:`Ellipsis` instances.
Defines the attribute region to be overwritten.
data : numpy.ndarray
Data to be written to the attribute.
"""
if not (0 <= attribute and attribute < len(self.attributes)):
cherrypy.log.error("hdf5.py set_data", "Attribute index %s out-of-range." % attribute)
raise ValueError("Attribute index %s out-of-range." % attribute)
if isinstance(hyperslice, (numbers.Integral, slice, type(Ellipsis))):
pass
elif isinstance(hyperslice, tuple):
for i in hyperslice:
if not isinstance(i, (numbers.Integral, slice, type(Ellipsis))):
cherrypy.log.error("hdf5.py set_data", "Unsupported hyperslice type.")
raise ValueError("Unsupported hyperslice type.")
else:
cherrypy.log.error("hdf5.py set_data", "Unsupported hyperslice type.")
raise ValueError("Unsupported hyperslice type.")
# Store the data.
attribute_storage = self._storage["attribute/%s" % attribute]
attribute_storage[hyperslice] = data
# Flush cached sort indices.
index_key = "index/%s" % attribute
if index_key in self._storage:
del self._storage[index_key]
# Flush cached unique values.
unique_key = "unique/%s" % attribute
if unique_key in self._storage:
del self._storage[unique_key]
# Flush cached statistics.
if "min" in attribute_storage.attrs:
del attribute_storage.attrs["min"]
if "max" in attribute_storage.attrs:
del attribute_storage.attrs["max"]
if "unique" in attribute_storage.attrs:
del attribute_storage.attrs["unique"]
[docs]
class ArraySet(object):
"""Wraps an instance of :class:`h5py.File` to implement a Slycat arrayset."""
def __init__(self, file):
self._storage = file
def __len__(self):
return len(self._storage["array"])
def __getitem__(self, key):
return DArray(self._storage["array/%s" % key])
[docs]
def keys(self):
return [int(key) for key in list(self._storage["array"].keys())]
[docs]
def array_count(self):
"""Note: this assumes that array indices are contiguous, which we don't explicitly enforce."""
return len(list(self._storage["array"].keys()))
[docs]
def start_array(self, array_index, dimensions, attributes):
"""Add an uninitialized darray to the arrayset.
An existing array with the same index will be overwritten.
Parameters
----------
array_index : integer, required.
Zero-based index of the array to create.
dimensions : list of dicts, required.
Description of the new array dimensions.
attributes : list of dicts, required.
Description of the new array attributes.
Returns
-------
array : :class:`slycat.hdf5.DArray`
"""
# cherrypy.log.error("building start_array for put_model_array")
stub = slycat.darray.Stub(dimensions, attributes)
shape = [dimension["end"] - dimension["begin"] for dimension in stub.dimensions]
stored_types = [dtype(attribute["type"]) for attribute in stub.attributes]
# cherrypy.log.error("allocating space for start_array for put_model_array")
try:
# Allocate space for the coming data ...
array_key = "array/%s" % array_index
if array_key in self._storage:
del self._storage[array_key]
for attribute_index, stored_type in enumerate(stored_types):
self._storage.create_dataset("array/%s/attribute/%s" % (array_index, attribute_index), shape, dtype=stored_type)
except Exception as e:
pass
# cherrypy.log.error("storing metadata for start_array for put_model_array")
# Store array metadata ...
array_metadata = self._storage[array_key].create_group("metadata")
array_metadata["attribute-names"] = numpy.array([attribute["name"] for attribute in stub.attributes], dtype=h5py.special_dtype(vlen=str))
array_metadata["attribute-types"] = numpy.array([attribute["type"] for attribute in stub.attributes], dtype=h5py.special_dtype(vlen=str))
array_metadata["dimension-names"] = numpy.array([dimension["name"] for dimension in stub.dimensions], dtype=h5py.special_dtype(vlen=str))
array_metadata["dimension-types"] = numpy.array([dimension["type"] for dimension in stub.dimensions], dtype=h5py.special_dtype(vlen=str))
array_metadata["dimension-begin"] = numpy.array([dimension["begin"] for dimension in stub.dimensions], dtype="int64")
array_metadata["dimension-end"] = numpy.array([dimension["end"] for dimension in stub.dimensions], dtype="int64")
# cherrypy.log.error("returning Darray for start_array for put_model_array")
return DArray(self._storage[array_key])
[docs]
def store_array(self, array_index, array):
"""Store a :class:`slycat.darray.Prototype` in the arrayset.
An existing array with the same index will be overwritten.
Parameters
----------
array_index : integer, required.
The index of the array to be created / overwritten.
array : :class:`slycat.darray.Prototype`, required.
Existing darray to be stored.
Returns
-------
array : :class:`slycat.hdf5.DArray`
"""
if not isinstance(array, slycat.darray.Prototype):
cherrypy.log.error("hdf5.py store_array", "A slycat.darray is required.")
raise ValueError("A slycat.darray is required.")
index = tuple([slice(dimension["begin"], dimension["end"]) for dimension in array.dimensions])
self.start_array(array_index, array.dimensions, array.attributes)
for attribute_index, attribute in enumerate(array.attributes):
stored_type = dtype(attribute["type"])
data = array.get_data(attribute_index)
# Store the data ...
attribute_key = "array/%s/attribute/%s" % (array_index, attribute_index)
hdf5_attribute = self._storage[attribute_key]
hdf5_attribute[index] = data
return DArray(self._storage["array/%s" % array_index])
[docs]
def start_arrayset(file):
"""Create a new array set using an open hdf5 file.
Parameters
----------
file : :class:`h5py.File`, required.
An hdf5 file open for writing.
Returns
-------
arrayset : :class:`slycat.hdf5.ArraySet`
"""
if not isinstance(file, h5py.File):
cherrypy.log.error("hdf5.py start_arrayset", "An open h5py.File is required.")
raise ValueError("An open h5py.File is required.")
file.create_group("array")
return ArraySet(file)
################################################################################################################################################
# Legacy functionality - don't use these in new code.
[docs]
def dtype(input_type):
"""Convert a string attribute type into a dtype suitable for use with h5py."""
try:
input_type = input_type.decode()
except (UnicodeDecodeError, AttributeError):
pass
if input_type not in list(dtype.type_map.keys()):
cherrypy.log.error("hdf5.py dtype", "Unsupported type: {}".format(input_type))
raise Exception("Unsupported type: {}".format(input_type))
return dtype.type_map[input_type]
dtype.type_map = {"int8":"int8", "int16":"int16", "int32":"int32", "int64":"int64", "uint8":"uint8", "uint16":"uint16", "uint32":"uint32", "uint64":"uint64", "float32":"float32", "float64":"float64", "string":h5py.special_dtype(vlen=str), "float":"float32", "double":"float64"}
[docs]
def path(array, directory):
return os.path.join(directory, array[0:2], array[2:4], array[4:6], array + ".hdf5")