# (C) 2019 University of Colorado AES-CCAR-SEDA (Space Environment Data Analysis) Group
# Liam Kilcommons - University of Colorado, Boulder - Colorado Center for Astrodynamics Research
# Originally created May, 2016
import numpy as np
import h5py,os,shutil
from collections import OrderedDict
from esabin.esagrid import Esagrid,ConstantAzimuthalSpacingGrid,EsagridBin
from esabin import spheretools
class EsaGridFileDuplicateTimeError(Exception):
pass
class InvalidFlatIndexError(Exception):
pass
class InvalidBinGroupNameError(Exception):
pass
class EsagridBinComparisonError(Exception):
pass
[docs]class EsagridFileBinGroup(object):
"""Class representing an HDF5 Group containing data from one bin
PARAMETERS
----------
grid : esabin.esagrid.ConstantLatitudeSpacingGrid
Object representing the particular grid used
flatind : int
The index (number) defining which bin this Group is for
"""
def __init__(self,grid,flatind):
self.esagrid_bin = EsagridBin(grid,flatind)
self.groupname = self._group_name_to_flatind(flatind)
def __getitem__(self,key):
return self.esagrid_bin[key]
def __setitem__(self,key,value):
self.esagrid_bin[key]=value
def __contains__(self,key):
return key in self.esagrid_bin
def items(self):
return self.esagrid_bin.items()
@staticmethod
def _flatind_from_group_name(h5groupname):
flatindstr = h5groupname.split('bin')[-1]
try:
flatind = int(flatindstr)
except ValueError:
raise InvalidBinGroupNameError(('{} '.format(h5groupname)
+'is not a valid bin group name'))
return flatind
@classmethod
def from_groupname(cls,grid,groupname):
flatind = cls._flatind_from_group_name(groupname)
return cls(grid,flatind)
def _check_flatind(self,flatind):
grid = self.esagrid_bin.grid
if flatind < 0 or flatind>=grid.n_bins:
raise InvalidFlatIndexError(('No bin with flat index {}'.format(flatind)
+'in grid {}'.format(str(grid))))
def _group_name_to_flatind(self,flatind):
self._check_flatind(flatind)
return 'bin%d' % (flatind)
def _check_bin_group_name(self,h5group):
#H5 groups' name is their full path
h5groupname = h5group.name.split('/')[-1]
if self.groupname != h5groupname:
raise RuntimeError(('H5 group name {} did not match'.format(h5groupname)
+'EsagridFileBinGroup {}'.format(self.groupname)))
def check_bin_group_metadata(self,h5group,fix=False,raise_error=False):
#Check that this group matches this object
self._check_bin_group_name(h5group)
#Check for old/missing metadata (e.g. flatind not an integer)
for attrname,attrval in self.esagrid_bin.items():
if attrname in h5group.attrs:
if h5group.attrs[attrname]!=attrval:
errstr = ('Group:{}\n'.format(h5group.name)
+'Incorrect Attribute {}'.format(attrname)
+'{}!={}'.format(attrval,h5group.attrs[attrname]))
if raise_error:
raise BinGroupMetadataError(errstr)
else:
print(errstr)
if fix:
h5group.attrs[attrname]=attrval
print('Fixed')
else:
print('Group:{}'.format(h5group.name))
print('Missing Attribute {}'.format(attrname))
if fix:
h5group.attrs[attrname]=attrval
print('Fixed')
def get_h5group(self,h5f):
if self.groupname not in h5f:
h5grp = h5f.create_group(self.groupname)
#Write bin describing metadata
for attrname,attrval in self.esagrid_bin.items():
h5grp.attrs[attrname]=attrval
else:
h5grp = h5f[self.groupname]
self.check_bin_group_metadata(h5grp,fix=True)
return h5grp
def append_from_other(self,h5f,other_h5f):
self_h5group = self.get_h5group(h5f)
other_h5group = other_h5f[self.groupname]
self.check_bin_group_metadata(other_h5group,fix=False,raise_error=True)
for h5dsname,h5ds in other_h5group.items():
if not isinstance(h5ds,h5py.Dataset):
print('Will not copy {}, not a HDF5 dataset'.format(h5dsname))
additional_attrs = {key:val for key,val in h5ds.attrs.items()}
data = h5ds[:]
self.store(h5f,h5dsname,data,additional_attrs=additional_attrs)
[docs] def store(self,h5f,jd,data,additional_attrs=None,silent=False):
"""Store an array of data taken at a particular time. Time
is assumed to be specified as a floating point number (Julian date)
"""
h5grp = self.get_h5group(h5f)
if isinstance(jd,np.ndarray):
#If time is an array, use the first value in the bin
#as the hdf5 dataset name
h5datasetnm = str(jd.flatten()[0])
else:
#If time is not an array, just use it's string
#version as the dataset name
h5datasetnm = str(jd)
#Ensure no dataset name collisions
if h5datasetnm in h5grp:
raise EsaGridFileDuplicateTimeError(('Dataset with name'
+' {}'.format(h5datasetnm)
+' already exists in'
+' group {}'.format(h5grp)))
# else:
# while h5datasetnm in h5grp:
# h5datasetnm += '0'
dataset = h5grp.create_dataset(h5datasetnm,data=data)
if additional_attrs is not None:
for attr in additional_attrs:
dataset.attrs[attr]=additional_attrs[attr]
if not silent:
print("Added %d points to %s" % (data.size,
h5grp.attrs['longname']))
def copy(self,h5f,destination_esagrid_file_bingroup,destination_h5f):
src_esagrid_bin = self.esagrid_bin
dest_esagrid_bin = destination_esagrid_file_bingroup.esagrid_bin
if src_esagrid_bin != dest_esagrid_bin:
raise EsagridBinComparisonError(('Cannot copy because '
+'destination bin metadata does '
+'not match source bin metadata '
+'{} != '.format(str(dest_esagrid_bin))
+'{}'.format(str(src_esagrid_bin))))
h5grp = self.get_h5group(h5f)
other_h5grp = destination_esagrid_file_bingroup.get_h5group(destination_h5f)
for dataset_name in h5grp:
if dataset_name in other_h5grp:
raise EsaGridFileDuplicateTimeError(('Error while copying. '
+' Dataset with name '
+' {}'.format(h5datasetnm)
+' already exists in '
+' destination '
+' group {}'.format(h5grp)))
#Only h5py Group objects have a copy method
h5grp.copy(dataset_name,other_h5grp,name=dataset_name)
def _columnify_additional_attrs(self,additional_attrs):
"""Takes a list of dictionaries. Each dictionary in the
list are the HDF5 dataset attributes from one dataset in this
bin's group. Converts this list of dictionaries to an
output dictionary of lists or arrays depending on the type of
data encountered. The keys of the output dictionary include any
keys encountered in an input dictionary. If a particular key is
not in every input dictionary a fill value with be inserted
The fill value is numpy.nan if the data is numeric,
otherwise it is None"""
keys = []
typefuncs = []
for attrdict in additional_attrs:
for key in attrdict:
if key not in keys:
keys.append(key)
try:
dum = float(attrdict[key])
typefuncs.append(float)
except ValueError:
typefuncs.append(str)
outdict = {key:[] for key in keys}
for attrdict in additional_attrs:
for key,typefunc in zip(keys,typefuncs):
if key in attrdict:
outdict[key].append(typefunc(attrdict[key]))
else:
outdict[key].append(np.nan if typefunc is float else None)
for key,typefunc in zip(keys,typefuncs):
if typefunc is float:
outdict[key]=np.array(outdict[key])
return outdict
def read(self,h5f):
h5grp = self.get_h5group(h5f)
times = []
datasets = []
additional_attrs = []
for dset_timestr in h5grp:
dataset_time = float(dset_timestr)
data = h5grp[dset_timestr][:]
times.append(dataset_time)
datasets.append(data)
additional_attrs.append({key:val for key,val in h5grp[dset_timestr].attrs.items()})
additional_attrs = self._columnify_additional_attrs(additional_attrs)
return times,datasets,additional_attrs
class DefaultEsagrid(Esagrid):
"""Default settings of a 3 degrees latitude per band, 3 cap bins, with
localtime as the azimuthal coordinate"""
def __init__(self,delta_lat=3,n_cap_bins=3,azi_coord='lt'):
Esagrid.__init__(self,delta_lat,n_cap_bins=n_cap_bins,azi_coord=azi_coord)
[docs]class EsagridFile(object):
"""Class representing HDF5 file which contains one Group for each (populated) bin
PARAMETERS
----------
hdf5_filenm : str
The filename of the hdf5 file to store to. If this is an existing file
and clobber == False, will use the stored metadata in the file to
create the appropriate esagrid and you can continue adding to the file
or process results from it
grid : esabin.esagrid.ConstantLatitudeSpacingGrid, optional
The grid of bins to bin into. If it is None (default), a default grid
with delta_lat = 3 and n_cap_bins = 3 and azi_coord = 'lt' is used
hdf5_local_dir : str, optional
A valid local path at which the hdf5 files will be created
clobber : bool, optional
If True, will delete and overwrite the HDF5 file specified as
os.path.join(hdf5_local_dir,hdf5_filenm) if it exists.
"""
def __init__(self,hdf5_filenm,grid=None,hdf5_local_dir=None,clobber=False):
if hdf5_local_dir is None:
raise ValueError(('hdf5_local_dir kwarg is now mandatory '
+'and cannot be None'))
self.hdf5dir = hdf5_local_dir
self.hdf5filenm = hdf5_filenm
self.h5fn = os.path.join(self.hdf5dir,self.hdf5filenm)
#Default to grid of with 3 latitude bins if no grid passed
default_grid = DefaultEsagrid()
if os.path.exists(self.h5fn):
if not clobber:
self.grid = self.create_grid_from_metadata()
else:
os.remove(self.h5fn)
self.grid = default_grid if grid is None else grid
self.write_grid_metadata()
else:
self.grid = default_grid if grid is None else grid
self.write_grid_metadata()
self.binlats,self.binlonorlts = self.grid.bin_locations(center_or_edges='edges')
self._bingroups = {}
with h5py.File(self.h5fn,'r') as h5f:
for groupname in h5f:
try:
bingroup = EsagridFileBinGroup.from_groupname(self.grid,
groupname)
except InvalidBinGroupNameError:
print(('Could not create EsagridFileBinGroup'
+'from h5 group {}'.format(groupname)))
continue
self._bingroups[bingroup['flatind']]=bingroup
[docs] @classmethod
def copy_of_existing(cls,source_esagrid_file,destination_h5fn):
"""Generate a new EsagridFile that contains all of the data from
an existing one under a new filename"""
if os.path.exists(destination_h5fn):
raise IOError('Destination file {} exists!'.format(destination_h5fn))
shutil.copyfile(source_esagrid_file.h5fn,destination_h5fn)
dest_hdf5_local_dir,dest_hdf5_filename = os.path.split(destination_h5fn)
return EsagridFile(dest_hdf5_filename,hdf5_local_dir=dest_hdf5_local_dir)
[docs] def append_existing(self,existing_esagrid_file):
"""Copy all bin data from another EsagridFile instance into this one"""
with h5py.File(self.h5fn,'a') as h5f:
with h5py.File(existing_esagrid_file.h5fn,'r') as other_h5f:
for flatind in existing_esagrid_file:
if flatind not in self:
self[flatind]=EsagridFileBinGroup(self.grid,flatind)
self[flatind].append_from_other(h5f,other_h5f)
def __getitem__(self,flatind):
return self._bingroups[flatind]
def __setitem__(self,flatind,esagrid_file_bingroup):
if not isinstance(esagrid_file_bingroup,EsagridFileBinGroup):
raise TypeError(('{}'.format(esagrid_file_bingroup)
+' is not an EsagridFileBinGroup'))
self._bingroups[flatind] = esagrid_file_bingroup
def __contains__(self,flatind):
return flatind in self._bingroups
def items(self):
return self._bingroups.items()
def __iter__(self):
for flatind in self._bingroups:
yield flatind
def write_grid_metadata(self):
with h5py.File(self.h5fn,'a') as h5f:
h5f.attrs['delta_lat'] = self.grid.delta_lat
h5f.attrs['n_cap_bins'] = self.grid.n_cap_bins
h5f.attrs['azi_coord'] = self.grid.azi_coord
def create_grid_from_metadata(self):
with h5py.File(self.h5fn,'r') as h5f:
delta_lat = h5f.attrs['delta_lat']
try:
azi_coord = str(h5f.attrs['azi_coord'],'utf8')
except:
azi_coord = h5f.attrs['azi_coord']
if 'n_cap_bins' in h5f.attrs:
n_cap_bins = h5f.attrs['n_cap_bins']
return Esagrid(delta_lat,n_cap_bins=n_cap_bins,azi_coord=azi_coord)
elif 'delta_azi' in h5f.attrs:
delta_azi = h5f.attrs['delta_azi']
return ConstantAzimuthalSpacingGrid(delta_lat,delta_azi,azi_coord)
else:
raise ValueError(('Missing H5 attribute; cannot specify grid'
+'\ndid not find either "n_cap_bins" (esagrid)'
+'\n or "delta_azi" (constant azi spacing grid)'
+'\nin attrs: {}'.format(h5f.attrs)))
[docs] def bin_and_store(self,t,lat,lonorlt,data,silent=False,additional_attrs=None):
"""Store data into HDF5 file bin groups.
All arrays must be shape (n,) or shape (n,1) or shape (1,n).
PARAMETERS
----------
t : np.ndarray
Array of times (any float numeric representation)
lat : np.ndarray
Array of latitudes
lonorlt : np.ndarray
Array of longitudes or local times
data : np.ndarray
Array of data to bin
silent : bool,optional
Do not print status messages (default False)
additional_attrs : dict,optional
A dictionary of additional information which will be stored as
HDF5 attributes attached to any Datasets created by this function
call. Keys will be used as attribute names. Attribute values will
be stored as string representations of dictionary values (str(val)).
"""
latbands,lonbins,flatinds = self.grid.whichbin(lat,lonorlt)
with h5py.File(self.h5fn,'a') as h5f:
for bin_ind in np.unique(flatinds):
in_bin = flatinds == bin_ind
if bin_ind not in self:
self[bin_ind] = EsagridFileBinGroup(self.grid,bin_ind)
self[bin_ind].store(h5f,
t[in_bin].flatten(),
data[in_bin].flatten(),
additional_attrs=additional_attrs,
silent=silent)
def _dataset_passes_attr_filters(self,dataset,attr_filters,default_result=True):
"""
Filtering whether to include a specific dataset in a bin_stats
sample for a particular bin
Filters are specified as a nested dictionary
with the following grammar:
filters['attr_key']=test_function
where:
'attr_key' : the NAME of the HDF5 attribute of the dataset
test_function : a python lambda function or other function
to apply to the value of attribute.
This function must return a single True or
False value and appropriately handle the
type of data that is in the attribute
"""
passed_filters=default_result
if attr_filters is not None:
for attr_key in attr_filters:
attr_test_fun = attr_filters[attr_key]
if attr_key in dataset.attrs:
attr_value = dataset.attrs[attr_key]
passed_filters = passed_filters and attr_test_fun(attr_value)
return passed_filters
[docs] def bin_stats(self,statfun=np.nanmean,statfunname=None,
center_or_edges='edges',minlat=50.,
silent=False,force_recompute=False,
write_to_h5=True,attr_filters=None):
"""Apply some function to the contents of each bin
PARAMETERS
----------
statfun : callable or list, optional
The function which will be called. Can also be a list
of multiple callables (in which case a dict keyed with
str(callable) is returned). It is MUCH MORE EFFICIENT to
do this than call bin_stats multiple times.
statfunname : str or list, optional
List of custom keys (use if statfun is a list of callables)
center_or_edges : {'center','edges'},optional
Return the bin center or edges
minlat : float,optional
Minimum absolute latitude (default: 50.) below
which bins are ignored.
silent : bool,optional
Do not print status messages to stdout, default False
force_recompute : bool,optional
If a particular statfun callable has be evaluated before,
do not return a cached result. Default False.
write_to_h5 : bool, optional
Cache the results for particular statfun callable in the HDF5.
Default True. If statfunname is defined, cached result will
be stored under that name (identity of callable is not checked)
attr_filters : dict, optional
Provides optional filtering of the data fed to the callable using
HDF5 attributes of the bin Datasets (additional_attrs)
Key should be the HDF5 Dataset attribute name,
value should be a callable which takes attribute value
and returns True or False.
RETURNS
-------
binlats : np.ndarray
Bin latitudes (center or edges)
binlonorlt : np.ndarray
Bin longitudes or local times (center or edges)
binstats : np.ndarray or dict
If statfun is a single callable, will return an array, the
result of evaluating statfun. If statfun is a list,
this will be a dict of arrays, keyed with either the string
representation of each statfun or statfunnames if they are
defined.
"""
if not isinstance(statfun,list):
statfun = [statfun]
if statfunname is not None:
if not isinstance(statfunname,list):
statfunname = [statfunname]
#hdf5 dataset names where we can store results, or reload them if they've already been computed
statfun_dataset_names = ['binstats_'+func.__name__ for func in statfun]
print(statfun_dataset_names)
binstats = []
for fun in statfun:
this_binstats = np.zeros((len(self.grid.flat_bins[:,0]),1))
this_binstats.fill(np.nan)
binstats.append(this_binstats)
#Locate the bins
binlats,binlonorlts = self.grid.bin_locations(center_or_edges=center_or_edges)
with h5py.File(self.h5fn,'a') as h5f:
stats_computed = 'binstats_results' in h5f and \
all([dataset_name in h5f['binstats_results'] \
for dataset_name in statfun_dataset_names])
if not force_recompute and stats_computed:
if not silent:
print("Loading precomputed results, set kwarg force_recompute=True to avoid this behaviour")
for istatfun,statfun_dataset_name in enumerate(statfun_dataset_names):
this_binstats = h5f['binstats_results'][statfun_dataset_name][:]
if not silent:
print("Loading %d nonzero bins from dataset %s..." % (np.count_nonzero(np.isfinite(this_binstats)),statfun_dataset_name))
binstats[istatfun] = this_binstats
#We can just short-circuit and return, since we don't need to do any more data loading
#if len(binstats) == 1:
# binstats = binstats[0]
#return binlats,binlonorlts,binstats
elif not stats_computed or force_recompute:
#Read every dataset (pass) from each bin
for groupnm in h5f:
try:
flatind = EsagridFileBinGroup._flatind_from_group_name(groupnm)
except InvalidBinGroupNameError:
print('{} is not a bin group, skipping'.format(groupnm))
continue
esagrid_file_bingroup = EsagridFileBinGroup(self.grid,flatind)
#Load h5 group and check for old or missing metadata
grp = esagrid_file_bingroup.get_h5group(h5f)
#Skip bins below the desired latitude
if np.abs(grp.attrs['slat'])<minlat and np.abs(grp.attrs['elat'])<minlat:
#if not silent:
# print("Skipping bin %s because too low latitude (<%.3f)" % (grp.attrs['longname'],minlat))
continue
statusstr = "| %s | " % (grp.attrs['longname'])
flatind = grp.attrs['flatind']
if np.floor(flatind) != flatind or flatind < 0:
raise ValueError('Unexpected bin index {}'.format(flatind))
flatind = int(flatind)
datasets = []
ndatasets = len(grp.items())
for datasetnm,dataset in grp.items():
#Check that the dataset does not have
#any attributes which are in the prohibited_attrs
#list (an optional kwarg)
is_okay = self._dataset_passes_attr_filters(dataset,
attr_filters,
default_result=True)
if is_okay:
datasets.append(dataset[:])
statusstr+= "kept %d/%d passes | " % (len(datasets),ndatasets)
if len(datasets)>0:
bin_data = np.concatenate(datasets)
#Do all stat functions
for ifun,this_statfun in enumerate(statfun):
binstats[ifun][flatind] = this_statfun(bin_data)
statusstr+="%s: %.3f | " % (this_statfun.__name__,
binstats[ifun][flatind])
else:
for ifun,this_statfun in enumerate(statfun):
binstats[ifun][flatind] = np.nan
statusstr+="%s: NaN | " % (this_statfun.__name__)
if not silent:
print(statusstr)
#Save the results
else:
raise RuntimeError("Unhandled case!")
if write_to_h5:
for istatfun,statfun_dataset_name in enumerate(statfun_dataset_names):
if 'binstats_results' not in h5f:
h5f.create_group('binstats_results')
results_grp = h5f['binstats_results']
#Create a dataset for each statistics function's binned results
if statfun_dataset_name in results_grp:
del results_grp[statfun_dataset_name]
results_grp.create_dataset(statfun_dataset_name,data=binstats[istatfun])
if not silent:
print("Saved binning results to h5 datatset %s" % (statfun_dataset_name))
if statfunname is not None:
#Return binstats as a dictionary if we named the statistics
#functions
binstats_dict = {}
for i in range(len(binstats)):
binstats_dict[statfunname[i]] = binstats[i]
return binlats,binlonorlts,binstats_dict
else:
#Don't bother returning a list of results if we are only using one stat function
#Just return the array
if len(binstats) == 1:
binstats = binstats[0]
return binlats,binlonorlts,binstats