from __future__ import division
__all__ = ["Crossspectrum", "AveragedCrossspectrum", "coherence"]
import numpy as np
import scipy
import scipy.stats
import scipy.fftpack
import scipy.optimize
import stingray.lightcurve as lightcurve
import stingray.utils as utils
def coherence(lc1, lc2):
    """
    Estimate coherence function of two light curves.

    Builds an unnormalized `Crossspectrum` from the two light curves and
    returns its coherence (see `Crossspectrum.coherence`).

    Parameters
    ----------
    lc1: lightcurve.Lightcurve object
        The first light curve data for the channel of interest.

    lc2: lightcurve.Lightcurve object
        The light curve data for reference band

    Returns
    -------
    coh : np.ndarray
        Coherence function

    Raises
    ------
    AssertionError
        If either input is not a ``lightcurve.Lightcurve`` instance.
    """
    assert isinstance(lc1, lightcurve.Lightcurve)
    assert isinstance(lc2, lightcurve.Lightcurve)

    # norm='none' keeps the raw complex cross spectrum, which is what the
    # coherence computation needs.
    cs = Crossspectrum(lc1, lc2, norm='none')

    return cs.coherence()
class Crossspectrum(object):
    """
    Make a cross spectrum from a (binned) light curve.

    You can also make an empty Crossspectrum object to populate with your
    own fourier-transformed data (this can sometimes be useful when making
    binned periodograms).

    Parameters
    ----------
    lc1: lightcurve.Lightcurve object, optional, default None
        The first light curve data for the channel/band of interest.

    lc2: lightcurve.Lightcurve object, optional, default None
        The light curve data for the reference band.

    norm: {'frac', 'abs', 'leahy', 'none'}, default 'none'
        The normalization of the (real part of the) cross spectrum.

    Attributes
    ----------
    freq: numpy.ndarray
        The array of mid-bin frequencies that the Fourier transform samples

    power: numpy.ndarray
        The array of cross spectra (complex numbers)

    df: float
        The frequency resolution

    m: int
        The number of averaged cross-spectra amplitudes in each bin.

    n: int
        The number of data points/time bins in one segment of the light
        curves.

    nphots1: float
        The total number of photons in light curve 1

    nphots2: float
        The total number of photons in light curve 2
    """

    def __init__(self, lc1=None, lc2=None, norm='none'):
        if isinstance(norm, str) is False:
            raise TypeError("norm must be a string")

        if norm.lower() not in ["frac", "abs", "leahy", "none"]:
            raise ValueError("norm must be 'frac', 'abs', 'leahy', or 'none'!")

        self.norm = norm.lower()

        # Either both light curves are given or neither: with neither, build
        # an empty Crossspectrum object for the user to populate manually;
        # with exactly one, fail loudly.
        if lc1 is None or lc2 is None:
            if lc1 is not None or lc2 is not None:
                raise TypeError("You can't do a cross spectrum with just one "
                                "light curve!")
            else:
                self.freq = None
                self.power = None
                self.df = None
                self.nphots1 = None
                self.nphots2 = None
                self.m = 1
                self.n = None
                return

        self.lc1 = lc1
        self.lc2 = lc2
        self._make_crossspectrum(lc1, lc2)

    def _make_crossspectrum(self, lc1, lc2):
        """
        Validate the two light curves, then compute and store the raw and
        normalized cross spectrum along with bookkeeping attributes.
        """
        # make sure the inputs work!
        assert isinstance(lc1, lightcurve.Lightcurve), \
            "lc1 must be a lightcurve.Lightcurve object!"
        assert isinstance(lc2, lightcurve.Lightcurve), \
            "lc2 must be a lightcurve.Lightcurve object!"

        # total number of photons is the sum of the
        # counts in the light curve
        self.nphots1 = np.float64(np.sum(lc1.counts))
        self.nphots2 = np.float64(np.sum(lc2.counts))

        self.meancounts1 = np.mean(lc1.counts)
        self.meancounts2 = np.mean(lc2.counts)

        # the light curves must have identical binning
        assert lc1.counts.shape[0] == lc2.counts.shape[0], \
            "Light curves do not have same number of time bins per segment."
        assert lc1.dt == lc2.dt, \
            "Light curves do not have same time binning dt."

        # the number of data points in the light curve
        self.n = lc1.counts.shape[0]

        # the frequency resolution
        self.df = 1.0/lc1.tseg

        # the number of averaged periodograms in the final output
        # This should *always* be 1 here
        self.m = 1

        # make the actual Fourier transform and compute cross spectrum
        self.freq, self.unnorm_power = self._fourier_cross(lc1, lc2)

        # If co-spectrum is desired, normalize here. Otherwise, get raw back
        # with the imaginary part still intact.
        self.power = self._normalize_crossspectrum(self.unnorm_power, lc1.tseg)

    def _fourier_cross(self, lc1, lc2):
        """
        Fourier transform the two light curves, then compute the cross
        spectrum, computed as CS = lc1 x lc2* (where lc2 is the one that gets
        complex-conjugated).

        Parameters
        ----------
        lc1: lightcurve.Lightcurve object
            One light curve to be Fourier transformed. This is the band of
            interest or channel of interest.

        lc2: lightcurve.Lightcurve object
            Another light curve to be Fourier transformed. This is the
            reference band.

        Returns
        -------
        freqs: numpy.ndarray
            The positive Fourier frequencies.

        cross: numpy.ndarray
            The (complex) cross spectrum at the positive frequencies.
        """
        fourier_1 = scipy.fftpack.fft(lc1.counts)  # do Fourier transform 1
        fourier_2 = scipy.fftpack.fft(lc2.counts)  # do Fourier transform 2

        freqs = scipy.fftpack.fftfreq(lc1.counts.shape[0], lc1.dt)

        # keep only positive frequencies; conjugate the reference band
        cross = fourier_1[freqs > 0] * np.conj(fourier_2[freqs > 0])

        return freqs[freqs > 0], cross

    def rebin(self, df, method="mean"):
        """
        Rebin the cross spectrum to a new frequency resolution df.

        Parameters
        ----------
        df: float
            The new frequency resolution

        method: str, optional, default "mean"
            The binning statistic passed through to `utils.rebin_data`.

        Returns
        -------
        bin_cs = Crossspectrum object
            The newly binned cross spectrum
        """
        # rebin cross spectrum to new resolution; the zero-frequency bin is
        # excluded from rebinning and re-attached below
        binfreq, bincs, step_size = utils.rebin_data(self.freq[1:],
                                                     self.power[1:], df,
                                                     method=method)

        # make an empty cross spectrum object
        # note: syntax deliberate to work with subclass Powerspectrum
        bin_cs = self.__class__()

        # store the binned periodogram in the new object
        bin_cs.freq = np.hstack([binfreq[0]-self.df, binfreq])
        bin_cs.power = np.hstack([self.power[0], bincs])
        bin_cs.df = df
        bin_cs.n = self.n
        bin_cs.norm = self.norm
        bin_cs.nphots1 = self.nphots1
        bin_cs.nphots2 = self.nphots2
        bin_cs.m = int(step_size)

        return bin_cs

    def _normalize_crossspectrum(self, unnorm_power, tseg):
        """
        Normalize the real part of the cross spectrum to Leahy, absolute rms^2,
        fractional rms^2 normalization, or not at all.

        Parameters
        ----------
        unnorm_power: numpy.ndarray
            The unnormalized cross spectrum.

        tseg: int
            The length of the Fourier segment, in seconds.

        Returns
        -------
        power: numpy.nd.array
            The normalized co-spectrum (real part of the cross spectrum). For
            'none' normalization, imaginary part is returned as well.
        """
        # The "effective" counts/bin is the geometrical mean of the counts/bin
        # of the two light curves; computed in log space for numerical safety
        # with very large photon counts.
        log_nphots1 = np.log(self.nphots1)
        log_nphots2 = np.log(self.nphots2)

        actual_nphots = np.float64(np.sqrt(np.exp(log_nphots1 + log_nphots2)))
        actual_mean = np.sqrt(self.meancounts1 * self.meancounts2)

        assert actual_mean > 0.0, \
            "Mean count rate is <= 0. Something went wrong."

        if self.norm.lower() == 'leahy':
            c = unnorm_power.real
            power = c * 2. / actual_nphots

        elif self.norm.lower() == 'frac':
            # note: np.float was a deprecated alias of the builtin float and
            # was removed in NumPy 1.20; use float directly
            c = unnorm_power.real / float(self.n**2.)
            power = c * 2. * tseg / (actual_mean**2.0)

        elif self.norm.lower() == 'abs':
            c = unnorm_power.real / float(self.n**2.)
            power = c * (2. * tseg)

        elif self.norm.lower() == 'none':
            power = unnorm_power

        else:
            raise Exception("Normalization not recognized!")

        return power

    def rebin_log(self, f=0.01):
        """
        Logarithmic rebin of the periodogram.
        The new frequency depends on the previous frequency
        modified by a factor f:

        dnu_j = dnu_{j-1}*(1+f)

        Parameters
        ----------
        f: float, optional, default 0.01
            parameter that steers the frequency resolution

        Returns
        -------
        binfreq: numpy.ndarray
            the binned frequencies

        binpower: numpy.ndarray
            the binned powers

        nsamples: numpy.ndarray
            the samples of the original periodogram included in each
            frequency bin
        """
        minfreq = self.freq[1] * 0.5  # frequency to start from
        maxfreq = self.freq[-1]  # maximum frequency to end
        binfreq = [minfreq, minfreq + self.df]  # first
        df = self.freq[1]  # the frequency resolution of the first bin

        # until we reach the maximum frequency, increase the width of each
        # frequency bin by f
        while binfreq[-1] <= maxfreq:
            binfreq.append(binfreq[-1] + df*(1.0+f))
            df = binfreq[-1] - binfreq[-2]

        # compute the mean of the powers that fall into each new frequency bin
        binpower, bin_edges, binno = scipy.stats.binned_statistic(
            self.freq, self.power, statistic="mean", bins=binfreq)

        # compute the number of powers in each frequency bin
        # NOTE(review): binned_statistic's bin numbers are 1-indexed, so
        # range(np.max(binno)) counts the out-of-range bin 0 and skips the
        # topmost bin — verify whether this off-by-one is intended.
        nsamples = np.array([len(binno[np.where(binno == i)[0]])
                             for i in range(np.max(binno))])

        # the frequency resolution
        df = np.diff(binfreq)

        # shift the lower bin edges to the middle of the bin and drop the
        # last right bin edge
        binfreq = binfreq[:-1] + df/2

        return binfreq, binpower, nsamples

    def coherence(self):
        """
        Compute Coherence function of the cross spectrum. Coherence is a
        Fourier frequency dependent measure of the linear correlation
        between time series measured simultaneously in two energy channels.

        Returns
        -------
        coh : numpy.ndarray
            Coherence function

        References
        ----------
        .. [1] http://iopscience.iop.org/article/10.1086/310430/pdf
        """
        # this computes the averaged power spectrum, but using the
        # cross spectrum code to avoid circular imports
        ps1 = Crossspectrum(self.lc1, self.lc1)
        ps2 = Crossspectrum(self.lc2, self.lc2)

        # NOTE(review): for a single (unaveraged) cross spectrum this ratio
        # is complex and not the usual |C|^2 / (P1 P2) — confirm against the
        # reference above.
        return self.unnorm_power/(ps1.unnorm_power * ps2.unnorm_power)

    def _phase_lag(self):
        """Return the fourier phase lag of the cross spectrum."""
        return np.angle(self.power)

    def time_lag(self):
        """
        Calculate the fourier time lag of the cross spectrum. The time lag is
        calculate using the center of the frequency bins.
        """
        if self.__class__ in [Crossspectrum, AveragedCrossspectrum]:
            ph_lag = self._phase_lag()

            # a phase lag of 2*pi at frequency nu corresponds to one full
            # period, i.e. a time lag of 1/nu
            return ph_lag / (2 * np.pi * self.freq)
        else:
            raise AttributeError("Object has no attribute named 'time_lag' !")
class AveragedCrossspectrum(Crossspectrum):
    """
    Make an averaged cross spectrum from a light curve by segmenting two
    light curves, Fourier-transforming each segment and then averaging the
    resulting cross spectra.

    Parameters
    ----------
    lc1: lightcurve.Lightcurve object OR
        iterable of lightcurve.Lightcurve objects
        One light curve data to be Fourier-transformed. This is the band
        of interest or channel of interest.

    lc2: lightcurve.Lightcurve object OR
        iterable of lightcurve.Lightcurve objects
        Second light curve data to be Fourier-transformed. This is the
        reference band.

    segment_size: float
        The size of each segment to average. Note that if the total duration
        of each Lightcurve object in lc1 or lc2 is not an integer multiple
        of the segment_size, then any fraction left-over at the end of the
        time series will be lost. Otherwise you introduce artefacts.

    norm: {'frac', 'abs', 'leahy', 'none'}, default 'none'
        The normalization of the (real part of the) cross spectrum.

    Attributes
    ----------
    freq: numpy.ndarray
        The array of mid-bin frequencies that the Fourier transform samples

    power: numpy.ndarray
        The array of cross spectra

    df: float
        The frequency resolution

    m: int
        The number of averaged cross spectra

    n: int
        The number of time bins per segment of light curve

    nphots1: float
        The total number of photons in the first (interest) light curve

    nphots2: float
        The total number of photons in the second (reference) light curve
    """

    def __init__(self, lc1, lc2, segment_size, norm='none'):
        # self.type switches _make_crossspectrum between cross- and power-
        # spectrum behavior (subclasses override it)
        self.type = "crossspectrum"

        if isinstance(norm, str) is False:
            raise TypeError("Norm must be a string!")

        if norm.lower() not in ["frac", "abs", "leahy", "none"]:
            raise ValueError("norm must be 'frac', 'abs', 'leahy', or 'none'!")

        self.norm = norm.lower()

        assert np.isfinite(segment_size), "segment_size must be finite!"

        self.segment_size = segment_size

        Crossspectrum.__init__(self, lc1, lc2, self.norm)

        return

    def _make_segment_spectrum(self, lc1, lc2, segment_size):
        """
        Chop a pair of light curves into segments of ``segment_size`` seconds
        and compute one Crossspectrum per segment.

        Returns
        -------
        cs_all : list of Crossspectrum
            One cross spectrum per segment.

        nphots1_all, nphots2_all : list of float
            Photon counts per segment for each light curve.
        """
        # TODO: need to update this for making cross spectra.
        assert isinstance(lc1, lightcurve.Lightcurve)
        assert isinstance(lc2, lightcurve.Lightcurve)

        assert lc1.dt == lc2.dt, \
            "Light curves do not have same time binning dt."

        assert lc1.tseg == lc2.tseg, "Lightcurves do not have same tseg."

        # number of bins per segment
        nbins = int(segment_size/lc1.dt)

        start_ind = 0
        end_ind = nbins

        cs_all = []
        nphots1_all = []
        nphots2_all = []

        # slide a window of nbins over both light curves in lockstep; any
        # partial segment at the end is dropped
        while end_ind <= lc1.counts.shape[0]:
            time_1 = lc1.time[start_ind:end_ind]
            counts_1 = lc1.counts[start_ind:end_ind]
            time_2 = lc2.time[start_ind:end_ind]
            counts_2 = lc2.counts[start_ind:end_ind]
            lc1_seg = lightcurve.Lightcurve(time_1, counts_1)
            lc2_seg = lightcurve.Lightcurve(time_2, counts_2)
            cs_seg = Crossspectrum(lc1_seg, lc2_seg, norm=self.norm)
            cs_all.append(cs_seg)
            nphots1_all.append(np.sum(lc1_seg.counts))
            nphots2_all.append(np.sum(lc2_seg.counts))
            start_ind += nbins
            end_ind += nbins

        return cs_all, nphots1_all, nphots2_all

    def _make_crossspectrum(self, lc1, lc2):
        """
        Segment the input light curve(s), compute a spectrum per segment,
        and average the segment spectra into this object's attributes.
        """
        # chop light curves into segments
        if isinstance(lc1, lightcurve.Lightcurve) and \
                isinstance(lc2, lightcurve.Lightcurve):

            if self.type == "crossspectrum":
                self.cs_all, nphots1_all, nphots2_all = \
                    self._make_segment_spectrum(lc1, lc2, self.segment_size)

            elif self.type == "powerspectrum":
                self.cs_all, nphots1_all = \
                    self._make_segment_spectrum(lc1, self.segment_size)

        else:
            self.cs_all, nphots1_all, nphots2_all = [], [], []
            # TODO: should be using izip from iterables if lc1 or lc2 could
            # be long
            for lc1_seg, lc2_seg in zip(lc1, lc2):

                if self.type == "crossspectrum":
                    cs_sep, nphots1_sep, nphots2_sep = \
                        self._make_segment_spectrum(lc1_seg, lc2_seg,
                                                    self.segment_size)
                    nphots2_all.append(nphots2_sep)
                elif self.type == "powerspectrum":
                    cs_sep, nphots1_sep = \
                        self._make_segment_spectrum(lc1_seg, self.segment_size)
                else:
                    raise Exception("Type of spectrum not recognized!")

                self.cs_all.append(cs_sep)
                nphots1_all.append(nphots1_sep)

            self.cs_all = np.hstack(self.cs_all)
            nphots1_all = np.hstack(nphots1_all)

            if self.type == "crossspectrum":
                nphots2_all = np.hstack(nphots2_all)

        m = len(self.cs_all)
        nphots1 = np.mean(nphots1_all)

        # average the per-segment (normalized) powers
        power_avg = np.zeros_like(self.cs_all[0].power)
        for cs in self.cs_all:
            power_avg += cs.power

        # note: np.float was a deprecated alias of the builtin float and
        # was removed in NumPy 1.20; use float directly
        power_avg /= float(m)

        self.freq = self.cs_all[0].freq
        self.power = power_avg
        self.m = m
        self.df = self.cs_all[0].df
        self.n = self.cs_all[0].n
        self.nphots1 = nphots1

        if self.type == "crossspectrum":
            nphots2 = np.mean(nphots2_all)

            self.nphots2 = nphots2

    def coherence(self):
        """
        Compute an averaged Coherence function of cross spectrum by computing
        coherence function of each segment and averaging them. The return type
        is a tuple with first element as the coherence function and the second
        element as the corresponding uncertainty[1] associated with it.

        Note : The uncertainty in coherence function is strictly valid for
        Gaussian statistics only.

        Returns
        -------
        tuple : tuple of np.ndarray
            Tuple of coherence function and uncertainty.

        References
        ----------
        .. [1] http://iopscience.iop.org/article/10.1086/310430/pdf
        """
        if self.m < 50:
            utils.simon("Number of segments used in averaging is "
                        "significantly low. The result might not follow the "
                        "expected statistical distributions.")

        # Calculate average coherence
        unnorm_power_avg = np.zeros_like(self.cs_all[0].unnorm_power)
        for cs in self.cs_all:
            unnorm_power_avg += cs.unnorm_power

        unnorm_power_avg /= self.m
        num = np.abs(unnorm_power_avg)**2

        # this computes the averaged power spectrum, but using the
        # cross spectrum code to avoid circular imports
        aps1 = AveragedCrossspectrum(self.lc1, self.lc1,
                                     segment_size=self.segment_size)
        aps2 = AveragedCrossspectrum(self.lc2, self.lc2,
                                     segment_size=self.segment_size)

        unnorm_powers_avg_1 = np.zeros_like(aps1.cs_all[0].unnorm_power)
        for ps in aps1.cs_all:
            unnorm_powers_avg_1 += ps.unnorm_power

        unnorm_powers_avg_2 = np.zeros_like(aps2.cs_all[0].unnorm_power)
        for ps in aps2.cs_all:
            unnorm_powers_avg_2 += ps.unnorm_power

        coh = num / (unnorm_powers_avg_1 * unnorm_powers_avg_2)

        # Calculate uncertainty (Gaussian-statistics approximation, see [1])
        uncertainty = (2**0.5 * coh * (1 - coh)) / (np.abs(coh) * self.m**0.5)

        return (coh, uncertainty)