Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scopesim/effects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@
from .metis_ifu_simple import *
# from . import effects_utils
from .selector_wheel import *
from .sky_ter_curves import *
2 changes: 1 addition & 1 deletion scopesim/effects/psfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

from .psf_base import PSF, PoorMansFOV
from .analytical import (Vibration, NonCommonPathAberration, SeeingPSF,
GaussianDiffractionPSF)
GaussianDiffractionPSF, MoffatPSF)
from .semianalytical import AnisocadoConstPSF
from .discrete import FieldConstantPSF, FieldVaryingPSF
158 changes: 156 additions & 2 deletions scopesim/effects/psfs/analytical.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@
"""Contains simple Vibration, NCPA, Seeing and Diffraction PSFs."""

from typing import ClassVar
from functools import partial

import numpy as np
from astropy import units as u
from astropy.convolution import Gaussian2DKernel
from astropy.convolution import Gaussian2DKernel, Moffat2DKernel
from scipy.interpolate import make_interp_spline

from .. import DataContainer
from ...optics import ImagePlane
from ...optics.fov import FieldOfView
from ...utils import (from_currsys, quantify, quantity_from_table,
figure_factory, check_keys)
figure_factory, check_keys, get_logger, airmass2zendist,
get_target, get_location, get_zenith_angle)
from . import PSF, PoorMansFOV

logger = get_logger(__name__)

class AnalyticalPSF(PSF):
"""Base class for analytical PSFs."""
Expand Down Expand Up @@ -202,6 +207,155 @@ def plot(self):
return super().plot(PoorMansFOV(pixel_scale, spec_dict))


class MoffatPSF(AnalyticalPSF):
""" Moffat PSF with given FWHM (seeing), and alpha (also known as beta) parameters.

FWHM (seeing) can be given
- using the filename keyword to read it from a table. with columns "wavelength" and "fwhm"
- a single value ("seeing")

Filename overrides seeing, if both are given.
If "seeing" and "pivot" are given, FWHM will be scaled to wavelengths using the seeing law (natural_scale()).
If "enable_ao" is set to True, "ao_filename", "ao_alpha" will be used for MoffatPSF.

:: Example config using filename for FWHM:
name: seeing_psf
class: MoffatPSF
kwargs:
filename: path/to/fwhm_table.dat
alpha: 4.765
enable_ao: False
ao_filename: path/to/ao_fwhm_table.dat
ao_alpha: 3.25

:: Example config using seeing value:
name: seeing_psf
class: MoffatPSF
kwargs:
seeing: 0.7
seeing_unit: "arcsec"
pivot_wave: 500
pivot_wave_unit: "nm"
alpha: 4.765
enable_ao: False
ao_alpha = 3.25

"""
z_order: ClassVar[tuple[int, ...]] = (43, 643)
required_keys = ["alpha"]

def __init__(self, **kwargs):
super().__init__(**kwargs)

if self.meta.get("enable_ao", False):
logger.info("AO enabled, using AO parameters for FWHM.")
check_keys(self.meta, {"ao_alpha"}, action="error")
if self.meta.get("ao_filename", None):
logger.info("filename given for AO FWHM")
_file = DataContainer(filename=self.meta["ao_filename"])
self.fwhm = self.fwhm_from_table(_file.get_data())
else:
logger.info("using in-built AO scale for FWHM")
self.fwhm = self.ao_scale
self.alpha = self.meta["ao_alpha"]

else:
if self.meta.get("filename", None):
logger.info("filename given for FWHM")
self.fwhm = self.fwhm_from_table(self.table)
elif check_keys(self.meta, {"seeing", "seeing_unit", "pivot_wave", "pivot_wave_unit"}, action="warn"):
logger.info("using seeing and natural scale for FWHM")
self.fwhm = partial(self.natural_scale, seeing=self.meta["seeing"]*u.Unit(self.meta["seeing_unit"]),
pivot=self.meta["pivot_wave"]*u.Unit(self.meta["pivot_wave_unit"]),
zenith_angle=self.zenith_angle*u.deg)
elif check_keys(self.meta, {"seeing", "seeing_unit"}, action="error"):
logger.info("using seeing only (wavelength independent) for FWHM")
self.fwhm = lambda wavelengths: self.meta["seeing"] * u.Unit(self.meta["seeing_unit"])
self.alpha = self.meta["alpha"]

def get_kernel(self, fov):
pixel_scale = fov.header["CDELT1"] * u.deg.to(u.arcsec)
pixel_scale = quantify(pixel_scale, u.arcsec)

# for each wavelength in waveset, get the corresponding FWHM, convert to gamma, and create a Moffat kernel
npts = (fov.meta["wave_max"] - fov.meta["wave_min"]) / (from_currsys("!SIM.spectral.spectral_bin_width", self.cmds) * u.um)
##sample only npts from len(fov.waveset)
wavelengths = fov.waveset[::max(1, int(len(fov.waveset) / npts))]
fwhms = self.fwhm(wavelengths).to(u.arcsec) / pixel_scale
gammas = self.fwhm2gamma(fwhms, self.alpha).value
ksize = int(4.0*np.max(fwhms).value)
ksize = ksize + 1 if ksize % 2 == 0 else ksize
kernel = np.zeros((ksize, ksize))
for gamma in gammas:
kernel += Moffat2DKernel(gamma=gamma, alpha=self.alpha, x_size=ksize, y_size=ksize).array
kernel /= len(gammas)
kernel /= np.sum(kernel)
return kernel

@staticmethod
def fwhm_from_table(table):
if "wavelength" not in table.colnames or "fwhm" not in table.colnames:
raise ValueError("Table must contain 'wavelength' and 'fwhm' columns.")
wave_array = quantity_from_table("wavelength", table, u.um)
fwhm_array = quantity_from_table("fwhm", table, u.arcsec)
return make_interp_spline(wave_array, fwhm_array) # returns bspline instance

@property
def zenith_angle(self):
if check_keys(self.cmds, {"!OBS.alt"}):
logger.info("Using !OBS.alt to determine zenith angle.")
return 90 - from_currsys("!OBS.alt", self.cmds)
elif check_keys(self.cmds, {"!OBS.ra", "!OBS.dec", "!ATMO.longitude", "!ATMO.latitude", "!ATMO.altitude", "!OBS.mjdobs"}):
logger.info("Using !OBS.ra, !OBS.dec, and !ATMO location info to determine zenith angle.")
target = get_target(ra=from_currsys("!OBS.ra", self.cmds),
dec=from_currsys("!OBS.dec", self.cmds))
location = get_location(lon=from_currsys("!ATMO.longitude", self.cmds),
lat=from_currsys("!ATMO.latitude", self.cmds),
alt=from_currsys("!ATMO.altitude", self.cmds))
obstime = from_currsys("!OBS.mjdobs", self.cmds)
return get_zenith_angle(target, location, obstime)
elif check_keys(self.cmds, {"!OBS.airmass"}):
logger.info("Using !OBS.airmass to determine zenith angle.")
return airmass2zendist(from_currsys("!OBS.airmass", self.cmds))
else:
logger.warning("No valid input found to determine zenith angle. Defaulting to z=0 (zenith).")
return 0.0

@staticmethod
def natural_scale(wavelengths: u.Quantity,
seeing: u.Quantity = 0.7*u.arcsec, pivot: u.Quantity = 500*u.nm,
zenith_angle: u.Quantity = 0.0*u.deg) -> u.Quantity:
"""
Seeing law scaled to wavelength

https://opg.optica.org/josa/fulltext.cfm?uri=josa-68-7-877&id=57124
https://www.mdpi.com/2072-4292/14/2/405
"""
return seeing * (wavelengths / pivot) ** -0.2 * 1 / np.cos(zenith_angle.to(u.rad)) ** .6

@staticmethod
def ao_scale(wavelengths: u.Quantity) -> u.Quantity:
SKYBANDS = (np.array([300, 400]) * u.nm,
np.array([395, 505]) * u.nm,
np.array([495, 635]) * u.nm,
np.array([625, 800]) * u.nm,
np.array([785, 1000]) * u.nm,
np.array([990, 1185]) * u.nm,
np.array([1165, 1400]) * u.nm,
np.array([1500, 1800]) * u.nm,
np.array([2000, 2600]) * u.nm)
fwhm_ao = lambda x, *a, **kw: make_interp_spline(
[300] + list(map(lambda x: np.mean(x).value, SKYBANDS))[1:][:-1] + [2550],
[343, 357, 220, 190, 185, 183, 182, 175, 182], k=1)(x.to(u.nm).value) * u.mas
return fwhm_ao(wavelengths)

@staticmethod
def fwhm2gamma(fwhm: u.Quantity, alpha) -> u.Quantity:
"""Convert FWHM to Moffat gamma parameter."""
theta_factor = 0.5 / np.sqrt(2**(1./alpha) - 1.0)
return fwhm * theta_factor


def wfe2gauss(wfe, wave, width=None):
strehl = wfe2strehl(wfe, wave)
sigma = _strehl2sigma(strehl)
Expand Down
Loading