Code for computing potential magnetic field extrapolations using the method of Schmidt (1964) as described by Sakurai (1981). Includes helper functions for reprojecting a magnetogram into a Cartesian plane
Field extrapolation methods for computing 3D vector magnetic fields from LOS magnetograms
import numpy as np
from scipy.interpolate import griddata
import astropy.units as u
import numba
from astropy.utils.console import ProgressBar
from synthesizAR.util import SpatialPair
from synthesizAR.visualize import plot_fieldlines
from .helpers import from_local, to_local, magnetic_field_to_yt_dataset
from .fieldlines import trace_fieldlines
__all__ = ['PotentialField']
class PotentialField(object):
Local (~1 AR) potential field extrapolation class
Using the oblique Schmidt method as described in [1]_, compute a potential magnetic vector field
from an observed LOS magnetogram. Note that this method is only valid for spatial scales
:math:`\lesssim 1` active region.
magnetogram : ``
width_z : `~astropy.units.Quantity`
shape_z : `~astropy.units.Quantity`
.. [1] Sakurai, T., 1981, SoPh, `76, 301 <>`_
def __init__(self, magnetogram, width_z:, shape_z: u.pixel):
self.magnetogram = magnetogram
self.shape = SpatialPair(x=magnetogram.dimensions.x, y=magnetogram.dimensions.y, z=shape_z)
range_x, range_y = self._calculate_range(magnetogram)
range_z = u.Quantity([0*, width_z])
self.range = SpatialPair(,,
width_x = np.diff(range_x)[0]
width_y = np.diff(range_y)[0]
self.width = SpatialPair(,, = SpatialPair(x=self.width.x/self.shape.x,
def as_yt(self, B_field):
Wrapper around `~synthesizAR.extrapolate.magnetic_field_to_yt_dataset`
return magnetic_field_to_yt_dataset(B_field.x, B_field.y, B_field.z,
self.range.x, self.range.y, self.range.z)
def trace_fieldlines(self, B_field, number_fieldlines, **kwargs):
Trace field lines through vector magnetic field.
This is a wrapper around `~synthesizAR.extrapolate.trace_fieldlines` and
accepts all of the same keyword arguments. Note that here the field lines are
automatically converted to the HEEQ coordinate system.
B_field : `~synthesizAR.util.SpatialPair`
number_fieldlines : `int`
coordinates : `list`
`~astropy.coordinates.SkyCoord` objects giving coordinates for all field lines
field_strengths : `list`
`~astropy.units.Quantity` for magnitude of :math:`B(s)` for each field line
ds = self.as_yt(B_field)
lower_boundary = self.project_boundary(self.range.x, self.range.y).value
lines = trace_fieldlines(ds, number_fieldlines, lower_boundary=lower_boundary, **kwargs)
coordinates, field_strengths = [], []
with ProgressBar(len(lines), ipython_widget=kwargs.get('notebook', True)) as progress:
for l, b in lines:
l = u.Quantity(l, self.range.x.unit)
l_heeq = from_local(l[:, 0], l[:, 1], l[:, 2],
field_strengths.append(u.Quantity(b, str(ds.r['Bz'].units)))
if kwargs.get('verbose', True): # Optionally suppress progress bar for tests
return coordinates, field_strengths
def _calculate_range(self, magnetogram):
left_corner = to_local(magnetogram.bottom_left_coord,
right_corner = to_local(magnetogram.top_right_coord,
range_x = u.Quantity([left_corner[0][0], right_corner[0][0]])
range_y = u.Quantity([left_corner[1][0], right_corner[1][0]])
return range_x, range_y
def project_boundary(self, range_x, range_y):
Project the magnetogram onto a plane defined by the surface normal at the center of the
# Get all points in local, rotated coordinate system
p_y, p_x = np.indices((int(self.shape.x.value), int(self.shape.y.value)))
pixels = u.Quantity([(i_x, i_y) for i_x, i_y in zip(p_x.flatten(), p_y.flatten())], 'pixel')
world_coords = self.magnetogram.pixel_to_world(pixels[:, 0], pixels[:, 1])
local_x, local_y, _ = to_local(world_coords,
# Flatten
points = np.stack([,], axis=1)
values = u.Quantity(, self.magnetogram.meta['bunit']).value.flatten()
# Interpolate
x_new = np.linspace(range_x[0], range_x[1], int(self.shape.x.value))
y_new = np.linspace(range_y[0], range_y[1], int(self.shape.y.value))
x_grid, y_grid = np.meshgrid(,
boundary_interp = griddata(points, values, (x_grid, y_grid), fill_value=0.)
return u.Quantity(boundary_interp, self.magnetogram.meta['bunit'])
def line_of_sight(self):
LOS vector in the local coordinate system
los = to_local(self.magnetogram.observer_coordinate,
return np.squeeze(u.Quantity(los))
def calculate_phi(self):
Calculate potential
# Set up grid
y_grid, x_grid = np.indices((int(self.shape.x.value), int(self.shape.y.value)))
x_grid = x_grid*
y_grid = y_grid*
z_depth =*np.pi)
# Project lower boundary
boundary = self.project_boundary(self.range.x, self.range.y).value
# Normalized LOS vector
l_hat = (self.line_of_sight/np.sqrt((self.line_of_sight**2).sum())).value
# Calculate phi
delta = SpatialPair(,,
shape = SpatialPair(x=int(self.shape.x.value),
phi = np.zeros((shape.x, shape.y, shape.z))
phi = _calculate_phi_numba(phi, boundary, delta, shape, z_depth, l_hat)
return phi * u.Unit(self.magnetogram.meta['bunit']) * * (1. * u.pixel)
def calculate_field(self, phi: u.G *
Compute vector magnetic field.
Calculate the vector magnetic field using the current-free approximation,
.. math::
\\vec{B} = -\\nabla\phi
The gradient is computed numerically using a five-point stencil,
.. math::
\\frac{\partial B}{\partial x_i} \\approx -\left(\\frac{-B_{x_i}(x_i + 2\Delta x_i) + 8B_{x_i}(x_i + \Delta x_i) - 8B_{x_i}(x_i - \Delta x_i) + B_{x_i}(x_i - 2\Delta x_i)}{12\Delta x_i}\\right)
phi : `~astropy.units.Quantity`
B_field : `~synthesizAR.util.SpatialPair`
x, y, and z components of the vector magnetic field in 3D
Bx = u.Quantity(np.zeros(phi.shape), self.magnetogram.meta['bunit'])
By = u.Quantity(np.zeros(phi.shape), self.magnetogram.meta['bunit'])
Bz = u.Quantity(np.zeros(phi.shape), self.magnetogram.meta['bunit'])
# Take gradient using a five-point stencil
Bx[2:-2, 2:-2, 2:-2] = -(phi[2:-2, :-4, 2:-2] - 8.*phi[2:-2, 1:-3, 2:-2]
+ 8.*phi[2:-2, 3:-1, 2:-2]
- phi[2:-2, 4:, 2:-2])/12./( * 1. * u.pixel)
By[2:-2, 2:-2, 2:-2] = -(phi[:-4, 2:-2, 2:-2] - 8.*phi[1:-3, 2:-2, 2:-2]
+ 8.*phi[3:-1, 2:-2, 2:-2]
- phi[4:, 2:-2, 2:-2])/12./( * 1. * u.pixel)
Bz[2:-2, 2:-2, 2:-2] = -(phi[2:-2, 2:-2, :-4] - 8.*phi[2:-2, 2:-2, 1:-3]
+ 8.*phi[2:-2, 2:-2, 3:-1]
- phi[2:-2, 2:-2, 4:])/12./( * 1. * u.pixel)
# Set boundary conditions such that the last two cells in either direction in each dimension
# are the same as the preceding cell.
for Bfield in (Bx, By, Bz):
for j in [0, 1]:
Bfield[j, :, :] = Bfield[2, :, :]
Bfield[:, j, :] = Bfield[:, 2, :]
Bfield[:, :, j] = Bfield[:, :, 2]
for j in [-2, -1]:
Bfield[j, :, :] = Bfield[-3, :, :]
Bfield[:, j, :] = Bfield[:, -3, :]
Bfield[:, :, j] = Bfield[:, :, -3]
return SpatialPair(x=Bx, y=By, z=Bz)
def extrapolate(self):
phi = self.calculate_phi()
bfield = self.calculate_field(phi)
return bfield
def peek(self, fieldlines, **kwargs):
plot_fieldlines(*fieldlines, magnetogram=self.magnetogram, **kwargs)
@numba.jit(nopython=True, fastmath=True, parallel=True)
def _calculate_phi_numba(phi, boundary, delta, shape, z_depth, l_hat):
factor = 1. / (2. * np.pi) * delta.x * delta.y
for i in numba.prange(shape.x):
for j in numba.prange(shape.y):
for k in numba.prange(shape.z):
Rz = k * delta.z - z_depth
lzRz = l_hat[2] * Rz
for i_prime in range(shape.x):
for j_prime in range(shape.y):
Rx = delta.x * (i - i_prime)
Ry = delta.y * (j - j_prime)
R_mag = np.sqrt(Rx**2 + Ry**2 + Rz**2)
num = l_hat[2] + Rz / R_mag
denom = R_mag + lzRz + Rx*l_hat[0] + Ry*l_hat[1]
green = num / denom
phi[j, i, k] += boundary[j_prime, i_prime] * green * factor
return phi
Functions for generating, tracing, filtering, and converting fieldlines
import warnings
import numpy as np
import astropy.units as u
from sunpy.image.resample import resample
import yt
__all__ = ['filter_streamlines', 'find_seed_points', 'trace_fieldlines']
def filter_streamlines(streamline, domain_width, close_threshold=0.05,
loop_length_range: = [2.e+9, 5.e+10]*, **kwargs):
Check extracted loop to make sure it fits given criteria. Return True if it passes.
streamline : yt streamline object
close_threshold : `float`
percentage of domain width allowed between loop endpoints
loop_length_range : `~astropy.units.Quantity`
minimum and maximum allowed loop lengths (in centimeters)
streamline = streamline[np.all(streamline != 0.0, axis=1)]
loop_length = np.sum(np.linalg.norm(np.diff(streamline, axis=0), axis=1))
if np.fabs(streamline[0, 2] - streamline[-1, 2]) > close_threshold*domain_width[2]:
return False
elif (loop_length > loop_length_range[1].to(
or loop_length < loop_length_range[0].to(
return False
return True
def find_seed_points(ds, number_fieldlines, lower_boundary=None, preexisting_seeds=None,
mask_threshold=0.1, safety=2, max_failures=1000):
Given a 3D extrapolated field and the corresponding magnetogram, estimate the locations of the
seed points for the fieldline tracing through the extrapolated 3D volume.
ds : ``
Dataset containing the 3D extrapolated vector field
number_fieldlines : `int`
Number of seed points
lower_boundary : `numpy.ndarray`, optional
Array to search for seed points on
preexisting_seeds : `list`, optional
If a seed point is in this list, it is thrown out
mask_threshold : `float`, optional
Fraction of the field strength below (above) which the field is masked. Should be between -1
and 1. A value close to +1(-1) means the seed points will be concentrated in areas of more
positive (negative) field.
safety : `float`
Ensures the boundary is not resampled to impossibly small resolutions
max_failures : `int`
# Get lower boundary slice
if lower_boundary is None:
boundary = ds.r[:, :, 0]['Bz'].reshape(ds.domain_dimensions[:2]).value.T
boundary = lower_boundary
# mask the boundary map and estimate resampled resolution
if mask_threshold < 0:
mask_val = np.fabs(mask_threshold) * np.nanmin(boundary)
mask_func =
mask_val = mask_threshold * np.nanmax(boundary)
mask_func =
masked_boundary =, mask_val))
epsilon_area = float(masked_boundary.count()) / float(boundary.shape[0] * boundary.shape[1])
resample_resolution = int(safety*np.sqrt(number_fieldlines/epsilon_area))
# resample and mask the boundary map
boundary_resampled = resample(boundary.T, (resample_resolution, resample_resolution),
method='linear', center=True)
boundary_resampled = boundary_resampled.T
masked_boundary_resampled =, mask_val))
# find the unmasked indices
unmasked_indices = [(ix, iy) for iy, ix in zip(*np.where(masked_boundary_resampled.mask == 0))]
if len(unmasked_indices) < number_fieldlines:
raise ValueError('Requested number of seed points too large. Increase safety factor.')
x_pos = np.linspace(ds.domain_left_edge[0].value, ds.domain_right_edge[0].value,
y_pos = np.linspace(ds.domain_left_edge[1].value, ds.domain_right_edge[1].value,
# choose seed points
seed_points = []
if preexisting_seeds is None:
preexisting_seeds = []
i_fail = 0
z_pos = ds.domain_left_edge.value[2]
while len(seed_points) < number_fieldlines and i_fail < max_failures:
choice = np.random.randint(0, len(unmasked_indices))
ix, iy = unmasked_indices[choice]
_tmp = [x_pos[ix], y_pos[iy], z_pos]
if _tmp not in preexisting_seeds:
i_fail = 0
i_fail += 1
del unmasked_indices[choice]
if i_fail == max_failures:
raise ValueError(f'''Could not find desired number of seed points within failure tolerance of
{max_failures}. Try increasing safety factor or the mask threshold''')
return seed_points
def trace_fieldlines(ds, number_fieldlines, max_tries=100, get_seed_points=None, direction=1,
Trace lines of constant potential through a 3D magnetic field volume.
Given a YT dataset containing a 3D vector magnetic field, trace a number of streamlines
through the volume. This function also accepts any of the keyword arguments that can
be passed to `~synthesizAR.extrapolate.find_seed_points` and
ds : ``
Dataset containing the 3D extrapolated vector field
number_fieldlines : `int`
max_tries : `int`, optional
get_seed_points : function, optional
Function that returns a list of seed points
direction : `int`, optional
Use +1 to trace from positive to negative field and -1 to trace from negative to positive
fieldlines : `list`
get_seed_points = find_seed_points if get_seed_points is None else get_seed_points
# wrap the streamline filter method so we can pass a loop length range to it
streamline_filter_wrapper = np.vectorize(filter_streamlines,
fieldlines = []
seed_points = []
i_tries = 0
while len(fieldlines) < number_fieldlines and i_tries < max_tries:
remaining_fieldlines = number_fieldlines - len(fieldlines)
seed_points = get_seed_points(ds, remaining_fieldlines,
lower_boundary=kwargs.get('lower_boundary', None),
mask_threshold=kwargs.get('mask_threshold', 0.1),
safety=kwargs.get('safety', 2.))
yt_unit = ds.domain_width / ds.domain_width.value
streamlines = yt.visualization.api.Streamlines(ds, seed_points * yt_unit,
xfield='Bx', yfield='By', zfield='Bz',
# FIXME: The reason for this try-catch is that occasionally a streamline will fall out of
# bounds of the yt volume and the tracing will fail. We can just ignore this and move on
# This is probably an issue that should be reported upstream to yt as I'm not really sure
# what this problem is here.
except AssertionError:
i_tries += 1
warnings.warn(f'Streamlines out of bounds. Tries left = {max_tries - i_tries}')
keep_streamline = streamline_filter_wrapper(streamlines.streamlines, ds.domain_width,
if True not in keep_streamline:
i_tries += 1
warnings.warn(f'No acceptable streamlines found. Tries left = {max_tries - i_tries}')
i_tries = 0
fieldlines += [(stream[np.all(stream != 0.0, axis=1)], mag[np.all(stream != 0.0, axis=1)])
for stream, mag, keep in zip(streamlines.streamlines,
keep_streamline) if keep]
if i_tries == max_tries:
warnings.warn(f'Maxed out number of tries with {len(fieldlines)} acceptable streamlines')
return fieldlines
Helper routines for field extrapolation routines and dealing with vector field data
import numpy as np
import astropy.time
import astropy.units as u
from astropy.coordinates import SkyCoord
import yt
import sunpy.coordinates
from import GenericMap, make_fitswcs_header
__all__ = [
def synthetic_magnetogram(bottom_left_coord, top_right_coord, shape: u.pixel, centers,
sigmas: u.arcsec, amplitudes: u.Gauss, observer=None):
Compute synthetic magnetogram using 2D guassian "sunspots"
bottom_left_coord : `~astropy.coordinates.SkyCoord`
Bottom left corner
top_right_coord : `~astropy.coordinates.SkyCoord`
Top right corner
shape : `~astropy.units.Quantity`
Dimensionality of the magnetogram
centers : `~astropy.coordinates.SkyCoord`
Center coordinates of flux concentration
sigmas : `~astropy.units.Quantity`
Standard deviation of flux concentration with shape `(N,2)`, with `N` the
number of flux concentrations
amplitudes : `~astropy.units.Quantity`
Amplitude of flux concentration with shape `(N,)`
observer : `~astropy.coordinates.SkyCoord`, optional
Defaults to Earth observer at current time
time_now =
if observer is None:
observer = sunpy.coordinates.ephemeris.get_earth(time=time_now)
# Transform to HPC frame
hpc_frame = sunpy.coordinates.Helioprojective(observer=observer, obstime=observer.obstime)
bottom_left_coord = bottom_left_coord.transform_to(hpc_frame)
top_right_coord = top_right_coord.transform_to(hpc_frame)
# Setup array
delta_x = (top_right_coord.Tx - bottom_left_coord.Tx).to(u.arcsec)
delta_y = (top_right_coord.Ty - bottom_left_coord.Ty).to(u.arcsec)
dx = delta_x / shape[0]
dy = delta_y / shape[1]
data = np.zeros((int(shape[1].value), int(shape[0].value)))
xphysical, yphysical = np.meshgrid(np.arange(shape[0].value)*shape.unit*dx,
# Add sunspots
centers = centers.transform_to(hpc_frame)
for c, s, a in zip(centers, sigmas, amplitudes):
xc_2 = (xphysical - (c.Tx - bottom_left_coord.Tx)).to(u.arcsec).value**2.0
yc_2 = (yphysical - (c.Ty - bottom_left_coord.Ty)).to(u.arcsec).value**2.0
data += * np.exp(
- xc_2 / (2 * s[0].to(u.arcsec).value**2)
- yc_2 / (2 * s[1].to(u.arcsec).value**2)
# Build metadata
meta = make_fitswcs_header(
reference_pixel=(0, 0) * u.pixel,
scale=u.Quantity((dx, dy)),
meta['bunit'] = 'gauss'
return GenericMap(data, meta)
def magnetic_field_to_yt_dataset(Bx: u.gauss, By: u.gauss, Bz: u.gauss, range_x:,
range_y:, range_z:
Reshape vector magnetic field data into a yt dataset
Bx,By,Bz : `~astropy.units.Quantity`
3D arrays holding the x,y,z components of the extrapolated field
range_x, range_y, range_z : `~astropy.units.Quantity`
Spatial range in the x,y,z dimensions of the grid
Bx =
By =
Bz =
data = dict(Bx=(np.swapaxes(Bx.value, 0, 1), Bx.unit.to_string()),
By=(np.swapaxes(By.value, 0, 1), By.unit.to_string()),
Bz=(np.swapaxes(Bz.value, 0, 1), Bz.unit.to_string()))
# Uniform, rectangular grid
bbox = np.array([,,])
return yt.load_uniform_grid(data, data['Bx'][0].shape,
geometry=('cartesian', ('x', 'y', 'z')))
def from_local(x_local:, y_local:, z_local:, center):
Transform from a Cartesian frame centered on the active region (with the z-axis parallel
to the surface normal).
x_local : `~astropy.units.Quantity`
y_local : `~astropy.units.Quantity`
z_local : `~astropy.units.Quantity`
center : `~astropy.coordinates.SkyCoord`
Center of the active region
coord : `~astropy.coordinates.SkyCoord`
center = center.transform_to(sunpy.coordinates.frames.HeliographicStonyhurst)
x_center, y_center, z_center =
rot_zy = rotate_z(center.lon) @ rotate_y(
# NOTE: the coordinates are permuted because the local z-axis is parallel to the surface normal
coord_heeq = rot_zy @ u.Quantity([z_local, x_local, y_local])
return SkyCoord(x=coord_heeq[0, :] + x_center,
y=coord_heeq[1, :] + y_center,
z=coord_heeq[2, :] + z_center,
def to_local(coord, center):
Transform coordinate to a cartesian frame centered on the active region
(with the z-axis normal to the surface).
coord : `~astropy.coordinates.SkyCoord`
center : `~astropy.coordinates.SkyCoord`
Center of the active region
center = center.transform_to(sunpy.coordinates.HeliographicStonyhurst)
x_center, y_center, z_center =
xyz_heeq = coord.transform_to(sunpy.coordinates.HeliographicStonyhurst)
if xyz_heeq.shape == (3,):
xyz_heeq = xyz_heeq[:, np.newaxis]
x_heeq = xyz_heeq[0, :] - x_center
y_heeq = xyz_heeq[1, :] - y_center
z_heeq = xyz_heeq[2, :] - z_center
rot_yz = rotate_y( @ rotate_z(-center.lon)
coord_local = rot_yz @ u.Quantity([x_heeq, y_heeq, z_heeq])
# NOTE: the coordinates are permuted because the local z-axis is parallel to the surface normal
return coord_local[1, :], coord_local[2, :], coord_local[0, :]
def rotate_z(angle: u.radian):
angle =
return np.array([[np.cos(angle), -np.sin(angle), 0],
[np.sin(angle), np.cos(angle), 0],
[0, 0, 1]])
def rotate_y(angle: u.radian):
angle =
return np.array([[np.cos(angle), 0, np.sin(angle)],
[0, 1, 0],
[-np.sin(angle), 0, np.cos(angle)]])
