# -*- coding: utf-8 -*-
#
# Pydoas is a Python library for the post-analysis of DOAS result data
# Copyright (C) 2017 Jonas Gliß (jonasgliss@gmail.com)
#
# This program is free software: you can redistribute it and/or
# modify it under the terms of the BSD 3-Clause License
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See BSD 3-Clause License for more details
# (https://opensource.org/licenses/BSD-3-Clause)
from datetime import datetime, timedelta, date
from typing import Optional, Sequence, Union
from numpy import asarray, full, polyfit, poly1d, logical_and, polyval, ndarray
from pandas import Series, DataFrame
from matplotlib.pyplot import subplots
from matplotlib.dates import DateFormatter
from pydoas.helpers import to_datetime
from pydoas.dataimport import DataImport, ResultImportSetup
[docs]
class DatasetDoasResults(object):
"""A Dataset for DOAS fit results import and processing
Attributes:
setup (ResultImportSetup): setup specifying all necessary import
settings (please see documentation of :class:`ResultImportSetup`
for setup details)
raw_results (dict): dictionary containing the imported results
Args:
setup (ResultImportSetup): setup specifying all necessary import
settings (please see documentation of :class:`ResultImportSetup`
for setup details)
init (int): if 1, the raw results will be loaded immediately
**kwargs: alternative way to setup :attr:`setup`
(:class:`ResultImportSetup` object), which is only used in case
input parameter **setup** is None.
"""
[docs]
def __init__(self, setup: ResultImportSetup = None, init: bool = 1, **kwargs):
self.setup = None
self.raw_results = {}
self.load_input(setup, **kwargs)
if init:
self.load_raw_results()
@property
def base_path(self) -> str:
"""Basepath of resultfiles"""
return self.setup.base_path
@property
def start(self) -> datetime:
"""Start date and time of dataset"""
return self.setup.start
@property
def stop(self) -> datetime:
"""Stop date and time of dataset"""
return self.setup.stop
@property
def dev_id(self) -> str:
"""Device ID of dataset"""
return self.setup.dev_id
@property
def import_info(self):
"""Returns information about result import details"""
return self.setup.import_info
[docs]
def change_time_ival(self, start: datetime, stop: datetime) -> None:
"""Change the time interval for the considered dataset
Note:
Previously loaded results will be deleted
Args:
start (datetime): new start time
stop (datetime): new stop time
"""
if isinstance(start, datetime):
self.start = start
if isinstance(stop, datetime):
self.stop = stop
self.raw_results = {}
[docs]
def load_raw_results(self) -> bool:
"""Try to load all results as specified in the setup
This method will try to load all results as specified in the setup. If
the import setup is not complete, an exception will be raised.
Returns:
bool: True if data is loaded, False otherwise
Raises:
AttributeError: If the import setup is not complete
"""
if not self.setup.complete():
raise AttributeError("Import setup is not complete")
access = DataImport(self.setup)
if access.data_loaded:
self.raw_results = access.results
self.set_start_stop_time()
return True
return False
[docs]
def has_data(self, fit_id: str, species_id: str, start: datetime = None, stop: datetime = None) -> bool:
"""Checks if specific data is available
Args:
fit_id (str): ID of the fit scenario
species_id (str): ID of the species
start (datetime, optional): Start datetime for the data check
stop (datetime, optional): Stop datetime for the data check
Returns:
bool: True if data is available, False otherwise
"""
if not (fit_id in self.raw_results and species_id in self.raw_results[fit_id]):
return False
if all([isinstance(x, datetime) for x in [start, stop]]):
ts = self.raw_results[fit_id]["start"]
if not any([start < x < stop for x in ts]):
return False
return True
[docs]
def get_spec_times(self, fit: str) -> tuple:
"""Returns start time and stop time arrays for spectra to a given fit
Args:
fit (str): ID of the fit scenario
Returns:
tuple: start and stop time arrays for the spectra
"""
start = asarray(self.raw_results[fit]["start"])
stop = asarray(self.raw_results[fit]["stop"])
return start, stop
[docs]
def set_start_stop_time(self):
"""Get start/stop range of dataset"""
start_acq, stop_acq = [], []
for fit_id in self.raw_results:
ts, _ = self.get_spec_times(fit_id)
if len(ts) > 0:
start_acq.append(ts.min())
stop_acq.append(ts.max())
self.setup.start = min(start_acq)
self.setup.stop = max(stop_acq)
return self.start, self.stop
[docs]
def get_start_stop_mask(self, fit, start=None, stop=None):
"""Creates boolean mask for data access only in a certain time interval
"""
if start is None:
start = self.start
elif isinstance(start, date):
#assume that all spectra from that day are supposed to be imported
print ("Start time is date, loading results from all spectra of "
"that day")
start = to_datetime(start)
stop = start + timedelta(days=1)
if stop is None:
stop = self.stop
i, f = self.get_spec_times(fit)
mask = logical_and(i >= start, i <= stop)
return mask, i[mask], f[mask]
def _get_data(self, species_id, fit_id):
"""Access raw data array
:param str species_id: string ID of species
:param str fit_id: string ID of fit scenario
"""
return self.raw_results[fit_id][species_id]
[docs]
def get_results(self, species_id, fit_id=None, start=None, stop=None):
"""Get spectral results object
:param str species_id: string ID of species
:param str fit_id: string ID of fit scenario (if None, tries to
load default fit_id)
:param start: if valid (i.e. datetime object) only data after that time
stamp is considered
:param stop: if valid (i.e. datetime object) only data before that time
stamp is considered
"""
if fit_id is None:
fit_id = self.setup.default_fit_ids[species_id]
if not fit_id in self.setup.get_fit_ids():
return False
m, start, stop = self.get_start_stop_mask(fit_id, start, stop)
if not sum(m):
return False
dat = self._get_data(species_id, fit_id)[m]
times = start + (stop - start) / 2
dat_err = self._get_data((species_id + "_err"), fit_id)[m]
return DoasResults(dat, times, start, stop, dat_err, species_id, fit_id, 3)
[docs]
def get_default_fit_id(self, species_id):
"""Get default fit scenario id for species
:param str species_id: ID of species (e.g. "so2")
"""
return self.setup.default_fit_ids[species_id]
[docs]
def set_default_fitscenarios(self, default_dict):
"""Update default fit scenarios for species
:param dict default_dict: dictionary specifying new default fit
scenarios, it could e.g. look like::
default_dict = {"so2" : "f01",
"o3" : "f01",
"bro" : "f03"}
"""
try:
self.setup.set_defaults(default_dict)
return 1
except:
return 0
[docs]
def plot(self,species_id, fit_id=None, start=None, stop=None, **kwargs):
"""Plot DOAS results"""
res = self.get_results(species_id, fit_id, start, stop)
if res is not False:
return res.plot(**kwargs)
return 0
[docs]
def scatter_plot(self, species_id_xaxis, fit_id_xaxis, species_id_yaxis,
fit_id_yaxis, lin_fit_opt=1, species_id_zaxis=None,
fit_id_zaxis=None, start=None, stop=None, ax=None,
**kwargs):
"""Make a scatter plot of two species
:param str species_id_xaxis: string ID of x axis species (e.g. "so2")
:param str fit_id_xaxis: fit scenario ID of x axis species (e.g. "f01")
:param str species_id_yaxis: string ID of y axis species (e.g. "so2")
:param str fit_id_yaxis: fit scenario ID of y axis species (e.g. "f02")
:param str species_id_zaxis: string ID of z axis species (e.g. "o3")
:param str fit_id_zaxis: fit scenario ID of z axis species (e.g. "f01")
:param datetime start: start time stamp for data retrieval
:param datetime stop: stop time stamp for data retrieval
:param ax: matplotlib axes object (None), if provided, then the result
is plotted into the axes
:param kwargs: keyword arguments for matplotlib scatter plot
(e.g. color, marker, edgecolor, etc.)
"""
res_x = self.get_results(species_id_xaxis, fit_id_xaxis, start, stop)
res_y = self.get_results(species_id_yaxis, fit_id_yaxis, start, stop)
res_z = None
if species_id_zaxis is not None and fit_id_zaxis is not None:
res_z = self.get_results(\
species_id_zaxis, fit_id_zaxis, start, stop)
if ax is None:
fig, ax = subplots(1, 1)
else:
fig = ax.figure.canvas.figure
if res_z is not None:
sc = ax.scatter(res_x.values, res_y.values, 15, res_z.values,\
marker = 'o', edgecolor = 'none', **kwargs)
cb = fig.colorbar(sc, ax = ax, shrink = 0.9, **kwargs)
cb.set_label(species_id_zaxis + " " + fit_id_zaxis, **kwargs)
else:
ax.plot(res_x.values, res_y.values, " b*", label = "Data",\
**kwargs)
ax.set_xlabel(species_id_xaxis + " " + fit_id_xaxis)
ax.set_ylabel(species_id_yaxis + " " + fit_id_yaxis)
ax.grid()
if lin_fit_opt:
self.linear_regression(res_x.values, res_y.values, ax = ax)
ax.legend(loc = 'best', fancybox = True, framealpha = 0.5, fontsize = 12)
return ax
[docs]
def linear_regression(self, x_data, y_data, mask = None, ax = None):
"""Perform linear regression and return parameters
:param ndarray x_data: x data array
:param ndarray y_data: y data array
:param ndarray mask: mask specifying indices of input data supposed to
be considered for regression (None)
:param ax: matplotlib axes object (None), if provided, then the result
is plotted into the axes
"""
if mask is None:
mask = full(len(y_data), True, dtype=bool)
poly = poly1d(polyfit(x_data[mask], y_data[mask], 1))
if ax is not None:
ax.plot(x_data, polyval(poly, x_data), "--r",\
label = "Slope: %.2f" %(poly[1]))
return poly
[docs]
def get_fit_import_setup(self):
"""Get the current fit import setup"""
return self.setup.import_info
def __getitem__(self, key):
"""Get attribute using bracketed syntax"""
for k,v in list(self.__dict__.items()):
if k == key:
return v
try:
return v[key]
except:
pass
def __setitem__(self, key, val):
"""Change attribute value using bracket syntax"""
for k,v in list(self.__dict__.items()):
if k == key:
self.__dict__[key] = val
return
def __str__(self):
"""String representation"""
return ("\nDOAS result dataset\n-------------------\n" + str(self.setup))
[docs]
class DoasResults(Series):
"""Data time series for handling and analysing DOAS fit results
:param arraylike data: DOAS fit results (column densities)
:param arraylike index: Time stamps of data points
:param arraylike fit_errs: DOAS fit errors
:param string species_id: String specifying the fitted species
:param string fit_id: Unique string specifying the fit scenario used
:param int fit_errs_corr_fac: DOAS fit error correction factor
"""
[docs]
def __init__(
self,
data: Union[Sequence, ndarray, Series],
index: Optional[Union[Sequence, ndarray]] = None,
start_acq: Optional[Union[Sequence, ndarray]] = None,
stop_acq: Optional[Union[Sequence, ndarray]] = None,
fit_errs: Optional[Union[Sequence, ndarray]] = None,
species_id: Optional[str] = None,
fit_id: Optional[str] = None,
fit_errs_corr_fac=1.0):
if start_acq is None:
start_acq = []
if stop_acq is None:
stop_acq = []
if species_id is None:
species_id = ""
if fit_id is None:
fit_id = ""
if isinstance(data, Series):
index = data.index
species_id = data.name
data = data.values
super().__init__(data, index, name=species_id)
if fit_errs is None:
fit_errs = []
self.fit_errs = fit_errs
self.fit_id = fit_id
self.fit_errs_corr_fac = fit_errs_corr_fac
self.start_acq = start_acq
self.stop_acq = stop_acq
@property
def start(self):
"""Start time of data"""
return self.index[0]
@property
def stop(self):
"""Stop time of data"""
return self.index[-1]
@property
def species(self):
"""Return name of current species"""
return self.name
[docs]
def has_start_stop_acqtamps(self):
"""Checks if start_time and stop_time arrays have valid data"""
try:
if not all([isinstance(x, datetime) for x in self.start_acq]):
raise Exception("Invalid value encountered in start_acq")
if not all([isinstance(x, datetime) for x in self.stop_acq]):
raise Exception("Invalid value encountered in stop_acq")
if not all([len(self) == len(x) for x in [self.start_acq, self.stop_acq]]):
raise Exception("Lengths of arrays do not match...")
return True
except Exception:
return False
[docs]
def merge_other(self, other, itp_method="linear", dropna=True):
"""Merge with other time series sampled on different grid
Note
----
This object will not be changed, instead, two new Series objects will
be created and returned
Parameters
----------
other : Series
Other time series
itp_method : str
String specifying interpolation method (e.g. linear, quadratic)
dropna : bool
Drop indices containing NA after merging and interpolation
Returns
-------
tuple
2-element tuple containing
- this Series (merged)
- other Series (merged)
"""
if not isinstance(other, Series):
raise ValueError("Need pandas Series instance (or objects "
"inheriting from it)")
df = DataFrame(dict(s1=self,s2=other)).interpolate(itp_method)
if dropna:
df = df.dropna()
return df.s1, df.s2
[docs]
def get_data_above_detlim(self):
"""Get fit results exceeding the detection limit
The detection limit is determined as follows::
self.fit_errs_corr_fac*self.data_err
"""
return self[self > self.fit_errs * self.fit_errs_corr_fac]
[docs]
def plot(self, date_fmt=None, **kwargs):
"""Plot time series
Uses plotting utility of :class:`Series` object (pandas)
:param **kwargs: - keyword arguments for pandas plot method
"""
if not "style" in kwargs:
kwargs["style"]="--x"
try:
self.index = self.index.to_pydatetime()
except:
pass
ax = super(DoasResults, self).plot(**kwargs)
try:
if date_fmt is not None:
ax.xaxis.set_major_formatter(DateFormatter(date_fmt))
except:
pass
ax.set_ylabel(self.species)
return ax
[docs]
def shift(self, timedelta=timedelta(0.)):
"""Shift time stamps of object
:param timedelta timedelta: temporal shift
:returns: shifted :class:`DoasResults` object
"""
new = DoasResults(self.values,
self.index + timedelta,
self.start_acq,
self.stop_acq,
self.fit_errs, self.name, self.fit_id,
self.fit_errs_corr_fac)
if self.has_start_stop_acqtamps:
new.start_acq += timedelta
new.stop_acq += timedelta
return new
def _add__(self, inp):
s = super(DoasResults, self).__add__(inp)
return DoasResults(s.values, s.index)
def __sub__(self, inp):
s = super(DoasResults, self).__sub__(inp)
return DoasResults(s.values, s.index)
def __div__(self, inp):
s = super(DoasResults, self).__div__(inp)
return DoasResults(s.values, s.index)
def __mul__(self, inp):
s = super(DoasResults, self).__mul__(inp)
return DoasResults(s.values, s.index)