#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu May 26 11:01:11 2021
@author: daniel
"""
from typing import Union, Optional
import warnings; warnings.filterwarnings("ignore")
from scipy import interpolate, stats
import astropy.constants as const
from pathlib import Path
import numpy as np
from protoRT import compute_opacities
# For loading the internal files in the data dir, importlib.resources replaces pkg_resources from setuptools which was removed in Python3.9+
# v1.0.0 of the code therefore requires Python3.9 or below to work, v1.0.1+ supports modern Python
import protoRT
from importlib.resources import files
[docs]class RadiativeTransferCube:
"""
Class for radiative transfer through a 3D dust density cube and for computing the mass excess.
This class computes the optical depth and outgoing intensity through a
protoplanetary disk model by performing radiative transfer along the z-axis
of a 3D density cube. It supports both monodisperse and polydisperse dust
distributions and can automatically compute dust opacities using the
DSHARP model (Birnstiel et al. 2018) if they are not provided.
It also computes the mass excess — defined as the ratio of the true dust mass in the cube
to the mass inferred from the observed intensity under the assumption of optically thin emission.
This metric quantifies how much disk mass may be underestimated in observations due to
optically thick regions.
Note
----
- The current version of the code assumes that the cube is symmetric along all axes (i.e., cubic domain).
- If no input data is provided, a default test dataset from a streaming
instability simulation by Yang & Johansen (2014) is used. This is a snapshot at orbit 100.
Parameters
----------
data : ndarray, optional
3D dust density cube (e.g., rhop field). Defaults to test data if None.
axis : ndarray, optional
1D coordinate array along the integration axis (must be `z`), in units of gas scale height.
code_rho : float, optional
Midplane gas density in code units (`rho0` in `start.in`). Default is 1.
code_cs : float, optional
Sound speed in code units (`cs0` in `start.in`). Default is 2π.
code_omega : float, optional
Orbital frequency in code units (`Omega` in `start.in`). Default is 2π.
column_density : float, optional
Gas column density in cgs units (g/cm²). Default is 10.
T : float, optional
Isothermal box temperature (K). Default is 30.
H : float, optional
Pressure scale height (cm). Default is 7.5e13 (≈ 5 AU).
stoke : float or ndarray
Stokes number(s) for the dust population.
grain_rho : float or ndarray, optional
Internal grain density (g/cm³). Must correspond with the input Stokes number(s). If None, defaults to 1.675, which is from the DSHARP dust model and must be used if no opacities are input as DSHARP opacities are used by default.
wavelength : float, optional
Wavelength at which opacities are computed (cm). Range: 1e-5 to 10. Default is 0.1.
include_scattering : bool, optional
If True, include scattering opacity in radiative transfer. Default is False.
kappa : float or ndarray, optional
Dust absorption opacity (cm²/g). Must correspond with the input Stokes number(s). If None, computed from DSHARP.
sigma : float or ndarray, optional
Dust scattering opacity (cm²/g). Must correspond with the input Stokes number(s). If None, computed from DSHARP.
p : float, optional
Power-law index for grain size distribution (n(a) ∝ a^{-p}). Default is 2.5.
npar : int, optional
Number of particles in the simulation. Default is 1,000,000. Only required for multi-species simulations.
ipars, xp, yp, zp : ndarray, optional
Particle species and positions. Only required for multi-species simulations.
xgrid, ygrid, zgrid : ndarray, optional
Grid coordinates used to bin particles into grid cells for opacity weighting. Only required for multi-species simulations.
rhopswarm : ndarray, optional
Local swarm densities used in density map conversion and proto-mass calculation. Only required for polydisperse and/or self-gravitating simulations.
particle_weight : float or array_like, optional
Particle density or weight. If a float, all particles have the same weight, if array, must correspond with the particle positions. Pencil Code stores this as the rhop_swarm attribute in read_param(). Default is 6.30813659928. Only required for multi-species simulations.
grid_func : {'linear'}, optional
Grid interpolation scheme for each axis. Only `'linear'` is currently supported. Pencil Code stores these as attributes in read_param(). Default is 'linear'. Only required for multi-species simulations.
num_grid_points : int
Number of grid points in x, y, and z directions. Current code assumes cubic domain. Pencil code stores these as attributes in read_dim(). Default is 262. Only required for multi-species simulations.
num_interp_points : int
Number of interpolation points in x, y, and z. Current code assumes cubic domain. Controls smoothing kernel width. Pencil code stores these as attributes in read_dim(). Default is 256. Only required for multi-species simulations.
index_limits_1 : int
Grid index limits along x (l1), y (m1), and z (n1) directions for trimming ghost zones. Pencil code stores these as attributes in read_dim(). Default is 3. Only required for multi-species simulations.
index_limits_2 : int
Grid index limits along x (l2), y (m2), and z (n2) directions for trimming ghost zones. Pencil code stores these as attributes in read_dim(). Default is 258. Only required for multi-species simulations.
aps : ndarray, optional
Azimuthal positions of particles for proto-mass calculation. Only required for self-gravitating simulations. Must be input to enable protomass calculations.
eps_dtog : float, optional
Dust-to-gas ratio used to estimate proto-masses. Only required for self-gravitating simulations. Must be input to enable protomass calculations.
init_var : ndarray, optional
Initial density cube for mass excess calculation. Only required for self-gravitating simulations. In these cases the initial mass should be considered in the mass excess calculation.
Attributes
----------
axis : ndarray
The 1D coordinate array corresponding to the integration axis (must be the z-axis), in units of gas scale height.
data : ndarray
The 3D dust density cube (rhop field), used for radiative transfer.
filling_factor : float
Fraction of columns with τ ≥ 1 (optically thick).
grain_size : float or ndarray
Derived grain size(s) based on Stokes number(s).
H : float
Gas pressure scale height of the box (cm).
intensity : ndarray
Outgoing intensity map computed along the z-axis.
mass : float
Total dust mass in the cube (g).
mass_excess : float
Ratio of true dust mass to inferred dust mass (from the optically thin approximation).
proto_mass : float
Total mass in gravitationally bound clumps (g), if simulation is self-gravitating.
tau : ndarray
Optical depth map computed along the z-axis.
Methods
-------
configure()
Initializes internal attributes and computes the opacity (if not provided), calculates the optical depth,
performs the radiative transfer, and then computes the mass excess.
blackbody()
Computes blackbody intensity from Planck's law at temperature T.
calc_tau()
Computes the 2D optical depth (τ) map.
calc_t(rhod, kappa, sigma)
Computes cumulative τ along one vertical column.
calc_intensity()
Solves the radiative transfer equation along the z-axis.
calc_mass_excess()
Calculates the dust mass underestimation factor.
calc_grain_size()
Converts Stokes number(s) into physical grain size(s).
extract_opacity()
Computes opacities using the DSHARP dust model.
get_proto_mass()
Estimates mass in bound clumps (protoplanets) using swarm density and azimuthal info.
"""
# Type hints
def __init__(
self,
data: Optional[np.ndarray] = None,
axis: Optional[np.ndarray] = None,
code_rho: float = 1.0,
code_cs: float = 2 * np.pi,
code_omega: float = 2 * np.pi,
column_density: float = 10.0,
T: float = 30.0,
H: float = 5 * const.au.cgs.value,
stoke: Union[float, np.ndarray] = 0.3,
grain_rho: Union[float, np.ndarray] = 1.675,
wavelength: float = 0.1,
include_scattering: bool = False,
kappa: Optional[Union[float, np.ndarray]] = None,
sigma: Optional[Union[float, np.ndarray]] = None,
p: float = 2.5,
npar: int = int(1e6),
ipars: Optional[np.ndarray] = None,
xp: Optional[np.ndarray] = None,
yp: Optional[np.ndarray] = None,
zp: Optional[np.ndarray] = None,
xgrid: Optional[np.ndarray] = None,
ygrid: Optional[np.ndarray] = None,
zgrid: Optional[np.ndarray] = None,
rhopswarm: Optional[np.ndarray] = None,
particle_weight: Optional[Union[float, np.ndarray]] = 6.30813659928,
grid_func: Optional[str] = 'linear',
num_grid_points: Optional[int] = 262,
num_interp_points: Optional[int] = 256,
index_limits_1: Optional[int] = 3,
index_limits_2: Optional[int] = 258,
aps: Optional[np.ndarray] = None,
eps_dtog: Optional[float] = None,
init_var: Optional[np.ndarray] = None,
) -> None:
"""
Initialize
"""
# Simulation Data
self.data: Optional[np.ndarray] = data
self.axis: Optional[np.ndarray] = axis
# Disk Parameters
self.column_density: float = column_density
self.T: float = T
self.H: float = H
# Dust Grain Parameters
self.stoke: Union[float, np.ndarray] = stoke
self.grain_rho: Union[float, np.ndarray] = grain_rho
# Radiative Transfer Settings
self.wavelength: float = wavelength
self.include_scattering: bool = include_scattering
self.kappa: Optional[Union[float, np.ndarray]] = kappa
self.sigma: Optional[Union[float, np.ndarray]] = sigma
self.p: float = p
# Polydisperse Simulation Parameters
self.npar: int = npar
self.ipars: Optional[np.ndarray] = ipars
self.xp: Optional[np.ndarray] = xp
self.yp: Optional[np.ndarray] = yp
self.zp: Optional[np.ndarray] = zp
self.xgrid: Optional[np.ndarray] = xgrid
self.ygrid: Optional[np.ndarray] = ygrid
self.zgrid: Optional[np.ndarray] = zgrid
self.rhopswarm: Optional[np.ndarray] = rhopswarm
self.particle_weight: Optional[Union[float, np.ndarray]] = particle_weight
self.grid_func: Optional[str] = grid_func
self.num_grid_points: Optional[int] = num_grid_points
self.num_interp_points: Optional[int] = num_interp_points
self.index_limits_1: Optional[int] = index_limits_1
self.index_limits_2: Optional[int] = index_limits_2
# Data Normalization Parameters
self.code_rho: float = code_rho
self.code_cs: float = code_cs
self.code_omega: float = code_omega
# Self-Gravity Parameters
self.aps: Optional[np.ndarray] = aps
self.eps_dtog: Optional[float] = eps_dtog
self.init_var: Optional[np.ndarray] = init_var
# Validations and Defaults
if isinstance(self.grain_rho, np.ndarray) == False:
if self.kappa is None and self.grain_rho != 1.675:
raise ValueError(
f"The DSHARP dust model assumes a dust grain density of 1.675 g/cm^3! "
f"The input density is: {self.grain_rho} g/cm^3. "
f"Either change the dust grain density or input the appropriate opacities (kappa/sigma arguments)."
)
else:
if self.kappa is None and np.any(self.grain_rho != 1.675):
raise ValueError(
f"The DSHARP dust model assumes a dust grain density of 1.675 g/cm^3! "
f"The input density is: {self.grain_rho} g/cm^3. "
f"Either change the dust grain density or input the appropriate opacities (kappa/sigma arguments)."
)
if self.include_scattering:
if self.kappa is not None and self.sigma is None:
raise ValueError('The include_scattering parameter is enabled but no scattering coefficient (sigma) was provided!')
if self.sigma is not None and self.kappa is None:
raise ValueError('The include_scattering parameter is enabled but no absorption coefficient (kappa) was provided!')
if self.data is None:
print('No data input, loading density cube from Yang & Johansen (2014), snapshot taken at orbit 100...')
self.data, self.axis = load_cube()
# Additional Attributes
self.tau: Optional[np.ndarray] = None
self.intensity: Optional[np.ndarray] = None
self.mass: Optional[float] = None
self.mass_excess: Optional[float] = None
self.filling_factor: Optional[float] = None
self.area: Optional[float] = None
self.grain_size: Optional[Union[float, np.ndarray]] = None
self.proto_mass: Optional[float] = None
self.grain_size_bins: Optional[np.ndarray] = None
[docs] def blackbody(self):
"""
Compute the blackbody spectral radiance using Planck's law.
This method calculates the specific intensity (spectral radiance) of a blackbody
at temperature `T` and frequency `self.frequency`, assuming pure absorption
(i.e., no scattering contribution). The result is stored in `self.B_nu`.
Raises
------
ValueError
If `self.frequency` or `self.T` is not set, or if `T <= 0`.
Returns
-------
None
Updates the `self.B_nu` attribute with the spectral radiance
in units of erg s⁻¹ cm⁻² Hz⁻¹ sr⁻¹.
"""
# Validate inputs
if self.frequency is None or self.T is None: raise ValueError("Both `frequency` and `T` must be defined before calling the `blackbody()` method.")
if self.T <= 0: raise ValueError("Temperature (`T`) must be positive!")
self.B_nu = 2 * const.h.cgs.value * self.frequency**3 / (const.c.cgs.value**2 * (np.exp(const.h.cgs.value * self.frequency / (const.k_B.cgs.value * self.T)) - 1))
return
[docs] def calc_tau(self):
"""
Compute the optical depth map (`self.tau`) by integrating along the z-axis of the density cube.
For monodisperse simulations, a uniform opacity coefficient is applied throughout.
For polydisperse simulations, species-specific densities are reconstructed in 3D,
and local weighted opacity coefficients are calculated at each cell using the particle data.
This method updates `self.tau` for all (x, y) columns and computes the `self.filling_factor`,
defined as the fraction of columns with optical depth τ ≥ 1.
Notes
-----
- `self.data` must be in cgs units (g/cm³).
- `self.dz` must be the vertical grid spacing in cm.
- `self.kappa` (absorption opacity) is required.
- If `include_scattering` is True, `self.sigma` (scattering opacity) must also be provided.
- For polydisperse simulations:
- `self.grain_size` must be an array.
- `self.ipars`, `self.xp`, and `self.xgrid` must be defined to compute density per species.
Raises
------
ValueError
If required attributes (`self.data`, `self.dz`, `self.kappa`, etc.) are not defined.
If any necessary polydisperse inputs are missing.
Returns
-------
None
The following attributes are updated in place:
- `self.tau` : ndarray
2D optical depth map integrated along the z-axis for each (x, y) column.
- `self.filling_factor` : float
Fraction of columns with τ ≥ 1 (optically thick).
- `self.effective_kappa` : ndarray
3D array of locally averaged absorption opacities (polydisperse only).
- `self.effective_sigma` : ndarray
3D array of locally averaged scattering opacities (if `include_scattering=True`).
- `self.density_per_species` : ndarray
4D array of species-separated dust density grids (polydisperse only).
"""
# Validate
if self.data is None or self.dz is None: raise ValueError("Density data (`self.data`) and grid spacing (`self.dz`) must be defined before calling `calc_tau`.")
if self.kappa is None or (self.include_scattering and self.sigma is None): raise ValueError("Opacity coefficients (`kappa` and optionally `sigma`) must be defined.")
if isinstance(self.grain_size, np.ndarray): #Polydisperse case
if self.ipars is None or self.xp is None or self.xgrid is None:
raise ValueError("Particle positions (`ipars`, `xp`, and grid coordinates) must be defined for polydisperse simulations.")
self.tau = np.zeros((self.Ny, self.Nx)) # To store the optical depth at each (x,y) column
# If polydisperse, create a 'density_per_species' 4D array to store the density of dust grains in each 3D cell on a per-species basis, used to calculate weighted opacities
if isinstance(self.grain_size, np.ndarray):
# To store the weighted opacity coefficients at each cell
self.effective_kappa = np.zeros((self.Nz, self.Ny, self.Nx))
self.effective_sigma = np.zeros((self.Nz, self.Ny, self.Nx))
# Convert the ipars array to numerical labels, first species is 1, second is 2, etc...
species = np.ceil(self.ipars / (self.npar / len(self.grain_size))).astype(int)
num_species = len(np.unique(species))
print(f"Detected {num_species} grain sizes, applying weighted opacity calculation...")
# To store the density of each species located at each cell, shape (num_of_species, len(Nz), len(Ny), len(Nx))
self.density_per_species = np.zeros((num_species, self.Nz, self.Ny, self.Nx))
for species_type in range(1, num_species+1):
print(f"Converting grain positions to density field: {species_type} out of {num_species}")
# Index the particular species
index_species = np.where(species == species_type)[0]
# Extract the (x,y,z) positions for all of these grains
species_x, species_y, species_z = self.xp[index_species], self.yp[index_species], self.zp[index_species]
# If particle weight is an array need to index according to the particle position, otherwise if float, the particles all have same weight
rhop_swarm = self.particle_weight[index_species] if isinstance(self.particle_weight, np.ndarray) else self.particle_weight
# Convert positions of particles to a grid density field
particle_density = particles_to_density(xp=species_x, yp=species_y, zp=species_z,
x=self.xgrid, y=self.ygrid, z=self.zgrid,
rhop_swarm=rhop_swarm, grid_func1=self.grid_func, grid_func2=self.grid_func, grid_func3=self.grid_func,
mx=self.num_grid_points, my=self.num_grid_points, mz=self.num_grid_points,
nx=self.num_interp_points, ny=self.num_interp_points, nz=self.num_interp_points,
n1=self.index_limits_1, n2=self.index_limits_2,
m1=self.index_limits_1, m2=self.index_limits_2,
l1=self.index_limits_1, l2=self.index_limits_2,
density=True)
# Update the density array
self.density_per_species[species_type - 1] = particle_density
###
### Calculate the optical depth: tau = kappa * rhop * dz
###
for i in range(self.Nx):
for j in range(self.Ny):
# This is the Monodisperse case, opacity coefficients are single values (self.kappa and self.sigma)
if isinstance(self.grain_size, np.ndarray) is False:
surface_density = np.trapz(self.data[:, j, i]) * self.dz # * self.unit_sigma
self.tau[j, i] = surface_density * (self.kappa + self.sigma) if self.include_scattering else surface_density * self.kappa
else:
# This is the polydisperse case in which the cell-wise averaged opacities must be calculated: (N1*κ1 + N2*κ2 + N3*κ3 + N4*κ4 + ...) / N
# This loop goes through all the z-cells in the particular (x,y) column and stores the opacities in self.effective_kappa and self.effective_sigma
for k in range(self.Nz):
# To count the opacities at each (x,y,z) cell (equivalent to N*k)
weighted_kappa, weighted_sigma = 0, 0
#denominator = 0
# Add the number of grains in that cell for a given species times that species' opacity coefficient (stored in self.kappa/self.sigma which in the polydisperse case are arrays)
for species_type in range(num_species):
# #
weighted_kappa += self.density_per_species[species_type][k, j, i] * self.kappa[species_type] # This is the numerator (N1*k1 + N2*k2 + ...)
weighted_sigma = weighted_sigma + self.density_per_species[species_type][k, j, i] * self.sigma[species_type] if self.include_scattering else 0
# #
#denominator += self.density_per_species[species_type][k, j, i]
# Divide by the total number of species in that (x,y,z) cell to compute the weighted mean and to the 3D opacity array
self.effective_kappa[k, j, i] = weighted_kappa / np.sum(self.density_per_species[:, k, j, i]) if np.sum(self.density_per_species[:, k, j, i]) != 0 else 0 # Avoid division by zero # Denominator is also np.sum(self.density_per_species[:, k, j, i])
if self.include_scattering:
self.effective_sigma[k, j, i] = weighted_sigma / np.sum(self.density_per_species[:, k, j, i]) if np.sum(self.density_per_species[:, k, j, i]) != 0 else 0 # Avoid division by zero
# Now that the opacities have been calculated for that column, vertically integrate to find the opacity in that column
self.tau[j, i] = np.trapz(self.data[:, j, i] * (self.effective_kappa[:, j, i] + self.effective_sigma[:, j, i])) * self.dz # * self.unit_sigma
# Calculate the ratio of cells that are optically thick (tau >= 1)
self.filling_factor = len(np.where(self.tau >= 1)[0]) / (self.Nx * self.Ny)
return
[docs] def calc_t(self, rhod, effective_kappa, effective_sigma):
"""
Compute the cumulative optical depth along a single vertical column.
This method integrates the optical depth from the bottom of the cube (z = 0)
upward to each height `z`, returning a 1D array of cumulative τ values.
Unlike `calc_tau`, which computes total τ for an entire column, this returns
the depth-dependent progression of optical depth as a function of height.
Parameters
----------
rhod : ndarray
1D array of dust mass densities along the vertical column (g/cm³).
effective_kappa : ndarray
1D array of absorption opacity coefficients along the column (cm²/g).
effective_sigma : ndarray
1D array of scattering opacity coefficients along the column (cm²/g).
Notes
-----
- All input arrays must be 1D and of length `self.Nz`.
- The grid spacing `self.dz` is assumed to be in cm and must be positive.
Raises
------
ValueError
If input arrays are not 1D or do not match the vertical grid size `self.Nz`,
or if `self.dz` is not positive.
Returns
-------
t : ndarray
1D array of cumulative optical depth values at each vertical cell (unitless).
"""
# Validate inputs
if rhod.ndim != 1 or effective_kappa.ndim != 1 or effective_sigma.ndim != 1: raise ValueError("All input arrays must be 1D.")
if not (len(rhod) == len(effective_kappa) == len(effective_sigma) == self.Nz): raise ValueError("All input arrays must have the same length as the column height (Nz).")
if self.dz <= 0: raise ValueError("Grid spacing `dz` must be positive.")
# To store the emission along the z-column
t = np.zeros(self.Nz)
# Integrate starting at the first cell of the column and move upward adding one cell at a time
for i in range(self.Nz):
t[i] = np.trapz(rhod[:i] * (effective_kappa[:i] + effective_sigma[:i])) * self.dz
return t
[docs] def calc_intensity(self):
"""
Compute the outgoing intensity along the z-axis for all (x, y) columns.
This method solves the radiative transfer equation using the emissivity
term only, assuming no incident intensity at the base (i.e., no
back-illumination). It integrates the column-wise emissivity weighted
by the exponential attenuation factor exp(−τ) to determine the observed
intensity at the top of the cube.
Scattering is included if `self.include_scattering` is True, in which
case the mean intensity `self.J` is also used in the emissivity term.
Supports both monodisperse and polydisperse models.
Notes
-----
- Requires `self.tau` to be precomputed using `calc_tau()`.
- Assumes that `self.data` contains dust density in cgs units (g/cm³).
- Assumes Planck function (`self.B_nu`) is constant along each column.
- The returned intensity is in units of erg s⁻¹ cm⁻² Hz⁻¹ sr⁻¹.
- If scattering is enabled, `self.effective_sigma` and `self.J` must be defined.
Raises
------
ValueError
If required attributes such as `self.data`, `self.tau`, or opacity arrays are missing or have incorrect shapes.
Returns
-------
None
The following attribute is updated:
- `self.intensity` : ndarray
2D map of outgoing specific intensity at the top of the cube for each (x, y) position.
"""
# Validate
if self.tau is None: raise ValueError("No optical depth map exists! Run the `calc_tau()` method first.")
if self.data is None or self.data.ndim != 3: raise ValueError("Density data `self.data` must be a 3D array (Nz, Ny, Nx).")
if not (self.effective_kappa.shape == self.data.shape == self.effective_sigma.shape): raise ValueError("Opacity attributes (`effective_kappa`, `effective_sigma`) must match the shape of `self.data`.")
# To store the outgoing intensity at each (x,y) column
self.intensity = np.zeros((self.Ny, self.Nx))
# Integrate each (x,y) column
for i in range(self.Nx):
print(f"Intensity calculation: {i+1} out of {self.Nx}")
for j in range(self.Ny):
rhod = self.data[:, j, i] # The dust density in a particular column
#bb = self.src_fn[:, j, i] # The source function in a particular column
kappa = self.effective_kappa[:, j, i] # The weighted absorption opacities in a particular column
sigma = self.effective_sigma[:, j, i] # The weighted scattering opacities in a particular column (this is zero if include_scattering=False)
# Mask where the particle density is zero along the column
mask = (rhod == 0)
# If density is zero then the source function and opacities should be zero as well
kappa[mask], sigma[mask] = 0, 0
# The directional average of the intensity from scattering solution (3D if polydisperse)
if self.include_scattering:
if isinstance(self.grain_size, np.ndarray): #Polydisperse
J = self.J[:, j, i]
J[mask] = 0
else: #Monodisperse
J = self.J
# This is the optical depth as the emission progresses up the column (0 to z integral)
t = self.calc_t(rhod, kappa, sigma)
# The emissivity
emissivity = (kappa * self.B_nu) + (sigma * J) if self.include_scattering else kappa * self.B_nu
# Integrate to compute the output intensity at a given (x,y) position
#self.intensity[j, i] = np.trapz(bb * np.exp(-(self.tau[j, i] - t)), x=self.axis, dx=self.dx)
self.intensity[j, i] = np.trapz(emissivity * rhod * np.exp(-(self.tau[j, i] - t))) * self.dz
return
[docs] def calc_mass_excess(self):
"""
Calculate the mass underestimation factor ("mass excess") for the dust density cube.
This method solves the radiative transfer equation along each column, calculates
the outgoing intensity, and derives the observed dust mass assuming optically thin
emission. The ratio of the true dust mass to the inferred dust mass is stored
as `self.mass_excess`, quantifying how much mass is missed due to optically
thick regions or scattering.
Scattering is handled via a two-stream approximation (Miyake & Nakagawa 1993; Zhu et al. 2019),
which modifies the source function. Supports both monodisperse and polydisperse models.
Notes
-----
- Requires `self.kappa` to be defined. If `include_scattering=True`, `self.sigma` is also required.
- The cube must be configured first with `configure()`, which sets `self.mass` and grid info.
- The outgoing intensity is computed using `calc_intensity()`, based on the full RT solution.
- Assumes thermal equilibrium (LTE) and no external illumination (I₀ = 0).
- For polydisperse simulations, the average opacity assumed in the optically thin approximation
is taken to be the mean of the absorption opacities across species.
Raises
------
ValueError
If required attributes such as density data, opacities, or mass are not initialized.
Returns
-------
None
The following attributes are updated:
- `self.src_fn` : float or ndarray
The effective source function used in radiative transfer integration.
- `self.intensity` : ndarray
2D array of emergent intensity for each (x, y) column.
- `self.observed_mass` : float
The dust mass inferred under the assumption of optically thin emission.
- `self.mass_excess` : float
The true mass divided by the observed mass. A value > 1 indicates underestimation.
"""
# Validate
if self.data is None: raise ValueError("Density data `self.data` is not initialized.")
if self.mass is None: raise ValueError("The total mass of the box `self.mass` is not initialized. Run `configure()` first.")
if self.kappa is None or (self.include_scattering and self.sigma is None): raise ValueError("Opacity coefficients (`kappa` and `sigma`) must be defined before calculating mass excess.")
# Calculate the optical depth map
self.calc_tau()
# Compute the radiation assuming a Planckian black body
self.blackbody()
###
### Calculate the effective source function (which is just the Planckian if scattering is not accounted for)
###
if self.include_scattering:
# The scattering solution of a thin slab as approximated by Miyake & Nakagawa (1993),
# which has been used to compute the emergent intensity of protoplanetary disks including scattering.
# This is applicable under the assumption that the disk temperature is constant
# and that there are no incoming radiation fields at either the upper or lower disk surfaces.
# Calculate the single scattering albedo
if isinstance(self.grain_size, np.ndarray) is False:
# Monodisperse
self.albedo = self.sigma / (self.kappa + self.sigma)
else:
# Polydisperse -- the albedo and source function are now 3-dimensional!
self.albedo = self.effective_sigma / (self.effective_kappa + self.effective_sigma)
self.albedo[~np.isfinite(self.albedo)] = 0 #Replace all NaNs that come about when the denominator is zero
# Similar format as Zhu. et al (2019) -- Section 2.1 (https://iopscience.iop.org/article/10.3847/2041-8213/ab1f8c/pdf)
epsilon = 1.0 - self.albedo # For convenience
mu = 1.0 / np.sqrt(3.0) # The rays originate from the direction of cos(θ) = 1/sqrt(3) for all inclinations -- where θ is the angle between the intensity and the vertical direction
tau_d = (2 * mu) / 3.0 # Total optical depth in the vertical direction? Or is this the specific depth according to the Eddington-Barbier relation?
tau_ = 0.0 # Variable optical depth in the vertical direction? Or is this the optical depth at the surface of the slab, which is 0?
# Same format as Eq. 8 of Zhu et al. (2019) -- (https://iopscience.iop.org/article/10.3847/2041-8213/ab1f8c/pdf)
numerator = np.exp(-np.sqrt(3 * epsilon) * tau_) + np.exp(np.sqrt(3 * epsilon) * (tau_ - tau_d))
denominator = (np.exp(-np.sqrt(3 * epsilon) * tau_d) * (1 - np.sqrt(epsilon))) + (np.sqrt(epsilon) + 1)
self.J = self.B_nu * (1 - (numerator / denominator))
# With J known we can now solve for the source function (Eq. 6 of Zhu et al. (2019))
if isinstance(self.grain_size, np.ndarray) is False: # Monodisperse case needs a cube for the RT integration
self.src_fn = np.zeros((self.Nz, self.Ny, self.Nx))
self.src_fn[::] = self.albedo * self.J + (1 - self.albedo) * self.B_nu
else: # Polydisperse case is a cube already since albedo is 3D
self.src_fn = self.albedo * self.J + (1 - self.albedo) * self.B_nu
else: #Absorption only case, source function is just the Planckian black body
if isinstance(self.grain_size, np.ndarray) is False:
# Monodisperse
self.src_fn = self.B_nu
else:
# Polydisperse -- make a 3D array in which the value in each cell is the source function
# This will be used to integrate the RT solution (values where dust density is zero will be zeroed out)
self.src_fn = np.zeros((self.Nz, self.Ny, self.Nx))
self.src_fn[::] = self.B_nu
###
### Compute the mass excess / underestimation
###
# Monodisperse
if isinstance(self.grain_size, np.ndarray) is False:
# In the absorption only case the, source function is independent of optical depth, therefore can use the following simplified form
if self.include_scattering is False:
# Set the opactities -- the opacity at every cell is the same (cells with zero dust density will be zeroed out during intensity calculation)
self.effective_kappa, self.effective_sigma = np.zeros((self.Nz, self.Ny, self.Nx)), np.zeros((self.Nz, self.Ny, self.Nx))
self.effective_kappa[::], self.effective_sigma[::] = self.kappa, 0
# Solve the RT solution to set the self.intensity parameter
self.calc_intensity()
# Integrating the general RT solution with constant T and source function as well as I(0)=0 simplifies to the following
#self.intensity = self.B_nu * (1 - np.exp(-self.tau)) # Not used, rather solve the full RT equation using calc_intensity() method
# Under the assumption of optically thin emission, the observed intensity scales with the column density of the dust, allowing us to analytically solve for Σd as
self.sigma_dust = np.mean(self.intensity) / (self.B_nu * self.kappa) # Convolution theory -- take the mean of the output intensity
# If including photon scattering we need to solve the general RT solution since the effective source function is now dependent on position
else:
# Set the opactities -- the opacity at every cell is the same (cells with zero dust density will be zeroed out during intensity calculation)
self.effective_kappa, self.effective_sigma = np.zeros((self.Nz, self.Ny, self.Nx)), np.zeros((self.Nz, self.Ny, self.Nx))
self.effective_kappa[::], self.effective_sigma[::] = self.kappa, self.sigma
# Solve the RT solution to set the self.intensity parameter
self.calc_intensity()
# Under the assumption of optically thin emission, the observed intensity scales with the column density of the dust, allowing us to analytically solve for Σd as
#self.sigma_dust = np.mean(self.intensity) / (self.B_nu * (self.kappa + self.sigma)) # Convolution theory -- take the mean of the output intensity
self.sigma_dust = np.mean(self.intensity) / (self.B_nu * self.kappa) # Correction: Only the absorption opacity is considered when analytically approximating the observed dust mass
#Polydisperse
else:
# Solve the RT solution to set the self.intensity parameter
self.calc_intensity()
# The average opacity an observer would assume (assumes all grain sizes are equally distributed so scales with the inverse number of species)
#self.assumed_opacity = (np.sum(self.kappa) + np.sum(self.sigma)) / len(self.grain_size) if self.include_scattering else np.sum(self.kappa) / len(self.grain_size)
self.assumed_opacity = np.sum(self.kappa) / len(self.grain_size) # Correction: Only the absorption opacity is considered when analytically approximating the observed dust mass
# Under the assumption of optically thin emission, the observed intensity scales with the column density of the dust, allowing us to analytically solve for Σd as
self.sigma_dust = np.mean(self.intensity) / (self.B_nu * self.assumed_opacity) # Convolution theory -- take the mean of the output intensity and the source function
# The observed mass of the box can now be quantified as the product of Σd and the domain area
self.observed_mass = self.sigma_dust * self.area
# The mass underestimation, ratio of true box mass to the observed mass
self.mass_excess = self.mass / self.observed_mass
return
[docs] def calc_grain_size(self):
"""
Compute the grain size(s) from the Stokes number and gas column density.
This method uses the Stokes–Epstein drag relation to calculate the physical grain size `a`
corresponding to a given Stokes number `St`, assuming all grains are in the Epstein regime.
For monodisperse models, a single value is returned; for polydisperse models, an array of
grain sizes is computed based on the per-species Stokes number and internal grain density.
The grain size is given by:
a = (St * 2 * Σ_g) / (π * ρ_g)
where:
- `a` is the grain size (cm),
- `St` is the Stokes number (dimensionless),
- `Σ_g` is the gas column density (g/cm²),
- `ρ_g` is the internal grain density (g/cm³).
Notes
-----
- Assumes Epstein drag regime is valid.
- For polydisperse simulations, both `stoke` and `grain_rho` must be arrays of equal length.
- Requires `self.column_density` > 0.
Raises
------
ValueError
If `stoke` or `grain_rho` is not set, non-positive, or if their lengths mismatch in the polydisperse case.
Returns
-------
None
The computed grain sizes are stored in the following attribute:
- `self.grain_size` : float or ndarray
Computed grain size(s) in cm.
"""
# Validate
if self.stoke is None: raise ValueError("The `stoke` parameter must be provided to calculate grain sizes.")
if self.column_density <= 0: raise ValueError("The gas column density (`column_density`) must be positive.")
if self.grain_rho is None or np.any(np.array(self.grain_rho) <= 0): raise ValueError("The internal grain density (`grain_rho`) must be positive.")
if isinstance(self.stoke, np.ndarray) is False:
# Monodisperse
self.grain_size = self.stoke * 2. * self.column_density / np.pi / self.grain_rho
else:
# Polydisperse
if isinstance(self.grain_rho, np.ndarray) is False:
raise ValueError("If entering multiple stoke's numbers, the corresponding grain_rho paramater must be a list/array of same size!")
# Calculate the grain sizes corresponding to each stokes number
self.grain_size = np.zeros(len(self.stoke))
for grain in range(len(self.grain_size)):
self.grain_size[grain] = self.stoke[grain] * 2. * self.column_density / np.pi / self.grain_rho[grain]
return
[docs] def get_proto_mass(self):
"""
Estimate the masses of protoplanets (planetesimal clumps) from local particle densities.
This method calculates the mass of planetesimal clumps based on the local swarm density
(`rhopswarm`), azimuthal particle positions (`aps`), and the total box dust mass.
The particle mass is inferred from the total number of particles and the global dust-to-gas ratio.
Clumps are identified via non-zero azimuthal particle positions, and individual clump masses
are computed as:
M_p = 10 ** [log10(N_clump) + log10(M_particle)]
where:
- `N_clump` is the number of particles in a given clump (estimated from density),
- `M_particle` is the dust mass per particle.
Notes
-----
- Requires `configure()` to have been run (sets `self.mass`).
- Assumes that `rhopswarm` is proportional to the number of particles per grid cell.
- Returns clump masses in cgs units (g), sorted in ascending order.
- If no clumps are found, `self.proto_mass` is set to 0.
Raises
------
ValueError
If required attributes (`eps_dtog`, `mass`, `rhopswarm`, `aps`, `npar`) are not defined.
Returns
-------
None
The resulting clump masses are stored in the `self.proto_mass` attribute:
- `self.proto_mass` : float or ndarray
Sorted array of clump masses in grams, or 0 if no clumps are detected.
"""
# Validate required inputs
if self.eps_dtog is None: raise ValueError("The `eps_dtog` parameter (dust-to-gas ratio) must be provided.")
if self.mass is None: raise ValueError("The `mass` parameter must be computed (via `configure()`) before calling this method.")
if self.npar <= 0: raise ValueError("The number of particles (`npar`) must be a positive integer.")
if self.rhopswarm is None: raise ValueError("The `rhopswarm` parameter (swarm density) must be provided.")
if self.aps is None: raise ValueError("The `aps` parameter (azimuthal particle positions) must be provided.")
# Calculate the mass of a single particle in code units
mp_code = self.eps_dtog * self.mass / self.npar # Note: this is no longer in code units, as self.mass is coverted during the configuration
mp = stats.mode(self.rhopswarm)[0]
# Identify indices of particles in azimuthal clumps
index = np.where(self.aps != 0)[0]
# Calculate the number of particles in each clump
npclump = self.rhopswarm[index] / mp
tmp = np.log10(npclump)
ttmp = tmp + np.log10(mp_code)
mass = 10**ttmp # Total mass of each clump
self.proto_mass = np.sort(mass)
# If there are no planetesimals set to 0
try:
self.proto_mass.max()
except ValueError:
self.proto_mass = 0
return
[docs]def particles_to_density(
xp, yp, zp,
x, y, z,
rhop_swarm=6.30813659928,
grid_func1='linear', grid_func2='linear', grid_func3='linear',
mx=262, my=262, mz=262, nx=256, ny=256, nz=256,
n1=3, n2=258, m1=3, m2=258, l1=3, l2=258,
density=True):
"""
Deposit particles onto a 3D grid to generate a dust density field.
This function interpolates particle mass or count contributions onto a 3D mesh
using a quadratic weighting scheme adapted from an IDL routine by Anders Johansen.
The method accounts for non-uniform grid spacing and allows interpolation weights
to be computed using either particle mass (`rhop_swarm`) or uniform weighting.
Parameters
----------
xp, yp, zp : array_like
Particle positions in x, y, and z. Pencil Code stores these as attributes in the pvar files.
x, y, z : array_like
Grid node coordinates along x, y, and z axes, respectively. Pencil Code stores these as attributes in read_grid().
rhop_swarm : float or array_like, optional
Particle density or weight. If a float, all particles have the same weight;
if an array, must match length of xp. Default is 6.30813659928. Pencil Code stores this as attribute in read_param().
grid_func1, grid_func2, grid_func3 : {'linear'}, optional
Grid interpolation scheme for each axis. Only `'linear'` is currently supported. Pencil Code stores these as attributes in read_param().
mx, my, mz : int
Number of grid points in x, y, and z directions. Pencil code stores these as attributes in read_dim().
nx, ny, nz : int
Number of interpolation points in x, y, and z. Controls smoothing kernel width. Pencil code stores these as attributes in read_dim().
l1, l2, m1, m2, n1, n2 : int
Grid index limits along x (l), y (m), and z (n) directions for trimming ghost zones. Pencil code stores these as attributes in read_dim().
density : bool, optional
If True, return a mass-weighted density field. If False, return particle count field. Default is True.
Returns
-------
ndarray
3D array of shape (n2 - n1 + 1, m2 - m1 + 1, l2 - l1 + 1), representing the deposited
density or particle count on the grid.
Notes
-----
- The interpolation uses a second-order accurate kernel.
- The resulting array excludes ghost zones as defined by index limits.
- Ensure `xp`, `yp`, `zp` fall within bounds of `x`, `y`, `z` to avoid edge errors.
References
----------
- W. Lyra, private communication (adapted from IDL code by A. Johansen)
"""
dx, dy, dz = np.gradient(x), np.gradient(y), np.gradient(z)
dx1, dy1, dz1 = 1.0/dx, 1.0/dy, 1.0/dz
dx2, dy2, dz2 = 1.0/dx**2, 1.0/dy**2, 1.0/dz**2
if grid_func1 == 'linear': dx1_pt = dx1[0]
if grid_func2 == 'linear': dy1_pt = dy1[0]
if grid_func3 == 'linear': dz1_pt = dz1[0]
nnp = np.zeros((mz, my, mx))
for k in range(len(xp)):
_xp_, _yp_, _zp_ = xp[k], yp[k], zp[k]
ix0 = int(round((_xp_ - x[0]) * dx1_pt)) if grid_func1 == 'linear' else find_index_bisect(_xp_, x)
if ix0 == l2 + 1: ix0 = ix0 - 1
if ix0 == l1 - 1: ix0 = ix0 + 1
dx_1, dx_2 = dx1[ix0], dx2[ix0]
iy0 = int(round((_yp_ - y[0]) * dy1_pt)) if grid_func2 == 'linear' else find_index_bisect(_yp_, y)
if iy0 == m2 + 1: iy0 = iy0 - 1
if iy0 == m1 - 1: iy0 = iy0 + 1
dy_1, dy_2 = dy1[iy0], dy2[iy0]
iz0 = int(round((_zp_ - z[0]) * dz1_pt)) if grid_func3 == 'linear' else find_index_bisect(_zp_, z)
if iz0 == n2 + 1: iz0 = iz0 - 1
if iz0 == n1 - 1: iz0 = iz0 + 1
dz_1, dz_2 = dz1[iz0], dz2[iz0]
ixx0, ixx1 = ix0 - 1, ix0 + 1
iyy0, iyy1 = iy0 - 1, iy0 + 1
izz0, izz1 = iz0 - 1, iz0 + 1
for ixx in np.arange(ixx0, ixx1 + 1):
for iyy in np.arange(iyy0, iyy1 + 1):
for izz in np.arange(izz0, izz1 + 1):
if ( ((ixx - ix0) == -1) or ((ixx - ix0) == +1) ):
weight_x = 1.125 - 1.5 * abs(_xp_ - x[ixx]) * dx_1 + 0.5 * abs(_xp_ - x[ixx])**2 * dx_2
else:
if nx != 1: weight_x = 0.75 - (_xp_ - x[ixx])**2 * dx_2
if ( ((iyy - iy0) == -1) or ((iyy - iy0) == +1) ):
weight_y = 1.125 - 1.5 * abs(_yp_ - y[iyy]) * dy_1 + 0.5 * abs(_yp_ - y[iyy])**2 * dy_2
else:
if ny != 1: weight_y = 0.75 - (_yp_ - y[iyy])**2 * dy_2
if ( ((izz - iz0) == -1) or ((izz - iz0) == +1) ):
weight_z = 1.125 - 1.5 * abs(_zp_ - z[izz]) * dz_1 + 0.5 * abs(_zp_ - z[izz])**2 * dz_2
else:
if nz != 1: weight_z = 0.75 - (_zp_ - z[izz])**2 * dz_2
if density:
weight = rhop_swarm if type(rhop_swarm) is float else rhop_swarm[k]
else:
weight = 1.0
if nx != 1: weight = weight * weight_x
if ny != 1: weight = weight * weight_y
if nz != 1: weight = weight * weight_z
nnp[izz, iyy, ixx] = nnp[izz, iyy, ixx] + weight
return nnp[n1:n2 + 1, m1:m2 + 1, l1:l2 + 1]
[docs]def find_index_bisect(qpar, q):
"""
Find the index of the element in a sorted list closest to a target value using binary search.
This function returns the index of the element in the sorted array `q` that is nearest
to the target value `qpar`, using an efficient bisection method.
Parameters
----------
qpar : float
The target value for which the nearest index is sought.
q : array_like
A 1D sorted array of floats (in ascending order) to search within.
Returns
-------
int
Index of the element in `q` closest to `qpar`.
Notes
-----
This method assumes that `q` is sorted in ascending order. If the array is not sorted,
the result is undefined.
References
----------
- W. Lyra, private communication (adapted from IDL code by A. Johansen)
"""
jl, ju = 0, len(q) - 1
while (ju - jl) > 1:
jm = (ju + jl) // 2
if qpar > q[jm]:
jl = jm
else:
ju = jm
iq0 = jl if (qpar - q[jl] <= q[ju] - qpar) else ju
return iq0
[docs]def load_cube():
"""
Load a 256×256×256 dust density cube from a simulation snapshot.
This function loads a 3D density cube corresponding to a single snapshot (orbit 100)
of a 100-orbit streaming instability simulation in a protoplanetary disk,
provided by Yang & Johansen (2014). The full cube is stored across two
`.npy` files and is concatenated along the z-axis.
Returns
-------
data : ndarray
A 3D NumPy array of shape (256, 256, 256) representing the dust
density cube.
axis : ndarray
A 1D NumPy array representing the spatial coordinates corresponding
to each axis (z, y, x) in code units.
"""
base = files(protoRT).joinpath("data")
density_cube_1 = np.load(base.joinpath("density_cube_1.npy"))
density_cube_2 = np.load(base.joinpath("density_cube_2.npy"))
axis = np.loadtxt(base.joinpath("axis"))
data = np.r_[density_cube_1, density_cube_2]
return data, axis
return data, axis