#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Plots a folder of FAST ESA data as spectrograms.
Assumed folder layout is::
{FAST_CDF_DATA_FOLDER_PATH}/year/month
Filenames in the month folders assumed to be in the following formats::
{??}_{??}_{??}_{instrument}_{timestamp}_{orbit}_v02.cdf (known "instruments" are ees, eeb, ies, or ieb)
{??}_{??}_orb_{orbit}_{??}.cdf
Examples::
FAST_data/2000/01/fa_esa_l2_eeb_20000101001737_13312_v02.cdf
FAST_data/2000/01/fa_k0_orb_13312_v01.cdf
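Typical entry point (a sketch; arguments shown are among the module defaults)::

    FAST_plot_spectrograms_directory(y_scale="linear", z_scale="log")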
"""
__authors__: list[str] = ["Ev Hansen"]
__contact__: str = "ephansen+gh@terpmail.umd.edu"
__credits__: list[list[str]] = [
["Ev Hansen", "Python code"],
["Emma Mirizio", "Co-Mentor"],
["Marilia Samara", "Co-Mentor"],
]
__date__: str = "2025-08-13"
__status__: str = "Development"
__version__: str = "0.0.1"
__license__: str = "GPL-3.0"
import signal
import os
import sys
import gc
import concurrent.futures
import json
from collections import defaultdict, deque
import math
from datetime import datetime, timezone
from pathlib import Path
from tqdm import tqdm
import traceback
import numpy as np
import cdflib
import time as _time
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
# Import only required helpers from generic spectrogram module (avoid wildcard import for linting clarity)
from batch_multi_plot_spectrogram import (
load_filtered_orbits,
get_cdf_file_type,
get_timestamps_for_orbit,
generic_plot_multirow_optional_zoom,
close_all_axes_and_clear,
log_error,
log_message,
DEFAULT_ZOOM_WINDOW_MINUTES,
)
# FAST-specific paths (renamed for FAST batch)
FAST_CDF_DATA_FOLDER_PATH = "./FAST_data/"
FAST_FILTERED_ORBITS_CSV_PATH = "./FAST_Cusp_Indices.csv"
FAST_PLOTTING_PROGRESS_JSON = "./batch_multi_plot_FAST_progress.json"
FAST_LOGFILE_PATH = "./batch_multi_plot_FAST_log.log"
FAST_OUTPUT_BASE = "./FAST_plots/"
FAST_LOGFILE_DATETIME_PATH = "./batch_multi_plot_FAST_logfile_datetime.txt"
if os.path.exists(FAST_LOGFILE_DATETIME_PATH):
with open(FAST_LOGFILE_DATETIME_PATH, "r") as f:
FAST_LOGFILE_DATETIME_STRING = f.read().strip()
if not FAST_LOGFILE_DATETIME_STRING:
FAST_LOGFILE_DATETIME_STRING = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
with open(FAST_LOGFILE_DATETIME_PATH, "w") as f:
f.write(FAST_LOGFILE_DATETIME_STRING)
else:
FAST_LOGFILE_DATETIME_STRING = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
with open(FAST_LOGFILE_DATETIME_PATH, "w") as f:
f.write(FAST_LOGFILE_DATETIME_STRING)
FAST_COLLAPSE_FUNCTION = np.nansum
CDF_VARIABLES = ["time_unix", "data", "energy", "pitch_angle"]
# Colormaps for different axis scaling combinations (colorblind-friendly and visually distinct)
DEFAULT_COLORMAP_LINEAR_Y_LINEAR_Z = "viridis"
DEFAULT_COLORMAP_LINEAR_Y_LOG_Z = "cividis"
DEFAULT_COLORMAP_LOG_Y_LINEAR_Z = "plasma"
DEFAULT_COLORMAP_LOG_Y_LOG_Z = "inferno"
# Buffered logging configuration (batching to reduce I/O)
INFO_LOG_BATCH_SIZE_DEFAULT = 10 # fallback default
_INFO_LOG_BATCH_SIZE = INFO_LOG_BATCH_SIZE_DEFAULT
_INFO_LOG_BUFFER: List[Tuple[str, str]] = [] # (level, message)
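# ``configure_info_logger_batch`` is called by ``FAST_plot_spectrograms_directory``
# below but is not otherwise defined in this section; this minimal sketch assumes
# it simply updates the module-level ``_INFO_LOG_BATCH_SIZE``.
def configure_info_logger_batch(batch_size: int) -> None:
    """Set the buffered-logging batch size used by ``info_logger``.

    Parameters
    ----------
    batch_size : int
        Desired number of messages to buffer before flushing; values < 1
        are coerced to 1.
    """
    global _INFO_LOG_BATCH_SIZE
    try:
        _INFO_LOG_BATCH_SIZE = max(1, int(batch_size))
    except Exception:
        _INFO_LOG_BATCH_SIZE = INFO_LOG_BATCH_SIZE_DEFAULT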
def flush_info_logger_buffer(force: bool = True) -> None:
"""Flush any buffered info/error log messages immediately.
Parameters
----------
force : bool, default True
Present for future extensibility; currently ignored (always flushes).
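Examples
--------
Buffer a message, then force it to disk immediately (a sketch)::

    info_logger("checkpoint reached", level="message")
    flush_info_logger_buffer()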
"""
global _INFO_LOG_BUFFER
if not _INFO_LOG_BUFFER:
return
# Emit each buffered message using underlying logger functions directly
for level, msg in _INFO_LOG_BUFFER:
try:
if level == "error":
try:
log_error(msg)
except Exception as buffered_error_emit_exception:
print(msg, file=sys.stderr)
else:
try:
log_message(msg)
except Exception as buffered_message_emit_exception:
print(msg)
except Exception as buffered_flush_loop_exception:
# Suppress any failure in flush loop, continue with remaining entries
pass
_INFO_LOG_BUFFER = []
# Section: Logging & Helper Functions
def info_logger(
prefix: str,
exception: Optional[BaseException] = None,
level: str = "error",
include_trace: bool = False,
force_flush: bool = False,
) -> None:
"""Unified logger for messages and exceptions.
This helper formats a message with an optional exception, includes the
exception class name when provided, and delegates to generic logging
helpers from the base module (``log_message``/``log_error``). When
``include_trace`` is True and an exception is given, a traceback is also
emitted.
Parameters
----------
prefix : str
Human-readable message prefix for the log line.
exception : BaseException or None
Optional exception instance. If ``None``, only ``prefix`` is logged.
level : {'error', 'message'}, default 'error'
Logging level. ``'error'`` routes to ``log_error``; otherwise to
``log_message``.
include_trace : bool, default False
When ``True`` and ``exception`` is not ``None``, include a formatted
traceback after the primary log message.
force_flush : bool, default False
When ``True``, forces an immediate flush of the buffered log messages
(including the current one) regardless of batch size.
Returns
-------
None
Notes
-----
If the underlying logging helpers fail (e.g., misconfigured), this
function falls back to printing to stdout/stderr to avoid silent loss. When
an exception is provided, messages are formatted as
``"{prefix} [<ExceptionClass>]: {exception}"``.
"""
if exception is None:
message = str(prefix)
else:
try:
name = getattr(exception, "__class__", type(exception)).__name__
except Exception as exception_name_introspection_exception:
name = "Exception"
message = f"{prefix} [{name}]: {exception}"
# Prepare trace (as separate buffered messages) if requested
trace_lines: List[str] = []
if include_trace and exception is not None:
try:
trace = "".join(
traceback.format_exception(
type(exception), exception, exception.__traceback__
)
)
trace_lines.append("[TRACE]\n" + trace)
except Exception as trace_format_exception:
pass
# Buffer message
try:
_INFO_LOG_BUFFER.append((level, message))
for tr in trace_lines:
_INFO_LOG_BUFFER.append(("message", tr))
# Decide flush
if (
force_flush
or _INFO_LOG_BATCH_SIZE <= 1
or len(_INFO_LOG_BUFFER) >= _INFO_LOG_BATCH_SIZE
):
flush_info_logger_buffer(force=True)
except Exception as info_logger_buffer_append_exception:
# On any buffering failure, attempt direct emission
try:
if level == "error":
log_error(message)
else:
log_message(message)
except Exception as info_logger_direct_emit_exception:
try:
print(message)
except Exception as info_logger_print_fallback_exception:
pass
def _terminate_all_child_processes() -> None:
"""Attempt to terminate all child processes of the current process.
Uses ``psutil``, if available, to iterate over child processes recursively
and call ``terminate()`` on each. Errors are suppressed, as this is
typically invoked during shutdown.
Returns
-------
None
Notes
-----
This is best-effort and does not guarantee exit. Callers may follow with
stronger measures (e.g., ``kill``) after a brief grace period.
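Examples
--------
Best-effort terminate, then force-kill any stragglers after a short grace
period (a sketch; assumes ``psutil`` is installed)::

    _terminate_all_child_processes()
    _time.sleep(2)
    import psutil
    for child in psutil.Process().children(recursive=True):
        try:
            child.kill()
        except Exception:
            pass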
"""
try:
import psutil
except Exception as psutil_import_exception:
return
try:
current_process = psutil.Process()
for child in current_process.children(recursive=True):
try:
child.terminate()
except Exception as child_terminate_exception:
pass
except Exception as psutil_process_iter_exception:
pass
def round_extrema(value: float | int, direction: str) -> float:
"""
Round an extrema value up or down to a visually clean axis limit.
This function makes plot axis extrema (min/max) more visually appealing and
consistent by rounding at the second significant digit in the specified direction.
For example, 1234 rounded up becomes 1300, and 0.0123 rounded down becomes 0.012.
Parameters
----------
value : float or int
The extrema value to round. If zero, returns 0.0.
direction : {'up', 'down'}
The direction to round:
- 'up': round up to the next clean value (for maxima)
- 'down': round down to the previous clean value (for minima)
Returns
-------
float
The rounded extrema value.
Raises
------
ValueError
If an invalid direction is provided.
Examples
--------
>>> round_extrema(1234, 'up')
1300.0
>>> round_extrema(1234, 'down')
1200.0
>>> round(round_extrema(0.0123, 'up'), 3)   # rounded for display (float noise)
0.013
>>> round(round_extrema(0.0123, 'down'), 3)
0.012
"""
# Special case: zero should always round to zero
if value == 0:
return 0.0
# Determine the rounding factor based on the order of magnitude
factor = 10 ** (math.floor(math.log10(abs(value))) - 1)
# Round up for maxima, down for minima
if direction == "up":
# Use math.ceil to ensure we always round up
return float(math.ceil(value / factor) * factor)
elif direction == "down":
# Use math.floor to ensure we always round down
return float(math.floor(value / factor) * factor)
else:
# Raise an error for invalid direction arguments
raise ValueError(f"Invalid direction: {direction}")
def FAST_plot_pitch_angle_grid(
cdf_file_path: str,
filtered_orbits_df=None,
orbit_number: Optional[int] = None,
zoom_duration_minutes: float = 6.25,
scale_function_y: str = "linear",
scale_function_z: str = "linear",
pitch_angle_categories: Optional[Dict[str, List[Tuple[float, float]]]] = None,
show: bool = True,
colormap: str = "viridis",
y_min: Optional[float] = None,
y_max: Optional[float] = None,
z_min: Optional[float] = None,
z_max: Optional[float] = None,
) -> Tuple[Any, Any]:
"""Plot a grid of ESA spectrograms collapsed by pitch-angle categories.
Each row corresponds to a pitch-angle category (e.g., downgoing, upgoing,
perpendicular, all). If orbit boundary timestamps are available for this
instrument/orbit, a zoom column is added. Data are loaded from the CDF,
oriented so that energy is the y-axis, and collapsed over pitch-angle via
``FAST_COLLAPSE_FUNCTION`` (``np.nansum`` by default).
Parameters
----------
cdf_file_path : str
Path to the instrument CDF file.
filtered_orbits_df : pandas.DataFrame or None
DataFrame used to compute vertical lines for the ``orbit_number``.
If ``None``, vertical lines are omitted.
orbit_number : int or None
Orbit number used to compute and label vertical lines.
zoom_duration_minutes : float, default 6.25
Window length (minutes) for the optional zoom column.
scale_function_y : {'linear', 'log'}, default 'linear'
Y-axis scaling for the spectrogram.
scale_function_z : {'linear', 'log'}, default 'linear'
Color scale for the spectrogram intensity.
pitch_angle_categories : dict or None
Mapping of label -> list of (min_deg, max_deg) ranges; if ``None``,
defaults to standard four groups.
show : bool, default True
If ``True``, display the plot; otherwise render off-screen.
colormap : str, default 'viridis'
Matplotlib colormap name.
y_min, y_max : float or None, optional
Optional explicit energy (y-axis) limits override. When ``None`` the
default lower bound (0) and observed upper bound (<=4000) subset is
used. These overrides are typically provided by precomputed global
extrema.
z_min, z_max : float or None, optional
Optional explicit color (intensity) scale limits. When ``None`` the
1st / 99th percentiles per row are used (robust to outliers). When
provided (e.g., global extrema), they are applied uniformly across
rows.
Returns
-------
tuple[Figure or None, FigureCanvasBase or None]
Figure and Canvas for the grid, or ``(None, None)`` if no datasets.
Notes
-----
- Energy bins are filtered to ``[0, 4000]`` (or explicit ``y_min`` / ``y_max``).
- ``vmin``/``vmax`` (row color bounds) are derived from 1st/99th percentiles
unless explicit ``z_min`` / ``z_max`` provided.
- Each dataset row includes ``y_label='Energy (eV)'`` and ``z_label='Counts'``;
modify after return if alternative units are desired.
- When no pitch-angle category yields data, the function logs a message and
returns ``(None, None)``.
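Examples
--------
Render off-screen with log color scaling and save (a sketch; the file path
and orbit number come from the module docstring example)::

    fig, canvas = FAST_plot_pitch_angle_grid(
        "FAST_data/2000/01/fa_esa_l2_eeb_20000101001737_13312_v02.cdf",
        orbit_number=13312,
        scale_function_z="log",
        show=False,
        colormap=DEFAULT_COLORMAP_LINEAR_Y_LOG_Z,
    )
    if fig is not None:
        fig.savefig("13312_pitch-angle_eeb.png", dpi=200)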
"""
# TODO: record orbits when error contains "is not a CDF file or a non-supported CDF!" in json log file
if pitch_angle_categories is None:
pitch_angle_categories = {
"downgoing\n(0, 30), (330, 360)": [(0, 30), (330, 360)],
"upgoing\n(150, 210)": [(150, 210)],
"perpendicular\n(40, 140), (210, 330)": [(40, 140), (210, 330)],
"all\n(0, 360)": [(0, 360)],
}
instrument_type = get_cdf_file_type(cdf_file_path)
cdf_file = cdflib.CDF(cdf_file_path)
times = np.asarray(cdf_file.varget(CDF_VARIABLES[0]))
data = np.asarray(cdf_file.varget(CDF_VARIABLES[1]))
energy_full = np.asarray(cdf_file.varget(CDF_VARIABLES[2]))
pitchangle_full = np.asarray(cdf_file.varget(CDF_VARIABLES[3]))
energy = energy_full[0, 0, :] if energy_full.ndim == 3 else energy_full
pitchangle = (
pitchangle_full[0, :, 0] if pitchangle_full.ndim == 3 else pitchangle_full
)
if data.shape[1] == len(energy) and data.shape[2] == len(pitchangle):
data = np.transpose(data, (0, 2, 1))
vertical_lines = None
if filtered_orbits_df is not None and orbit_number is not None:
vertical_lines = get_timestamps_for_orbit(
filtered_orbits_df, orbit_number, instrument_type, times
)
if (vertical_lines is None) or (len(vertical_lines) == 0):
info_logger(
f"No vertical lines found for orbit {orbit_number} in {cdf_file_path}. Skipping.",
level="message",
)
# Preferred display order; restrict to keys actually present so custom
# ``pitch_angle_categories`` mappings do not raise KeyError.
preferred_pa_order = [
"all\n(0, 360)",
"downgoing\n(0, 30), (330, 360)",
"upgoing\n(150, 210)",
"perpendicular\n(40, 140), (210, 330)",
]
pa_keys = [k for k in preferred_pa_order if k in pitch_angle_categories]
pa_keys += [k for k in pitch_angle_categories if k not in pa_keys]
datasets = []
for key in pa_keys:
mask = np.zeros_like(pitchangle, dtype=bool)
for rng in pitch_angle_categories[key]:
mask |= (pitchangle >= rng[0]) & (pitchangle <= rng[1])
pa_data = data[:, mask, :]
matrix_full = FAST_COLLAPSE_FUNCTION(pa_data, axis=1)
nan_col_mask = ~np.all(np.isnan(matrix_full), axis=0)
# Apply energy (y-axis) limits; default 0-4000 if not overridden
y_lower = 0 if y_min is None else y_min
y_upper = 4000 if y_max is None else y_max
valid_energy_mask = (energy >= y_lower) & (energy <= y_upper)
combined_mask = nan_col_mask & valid_energy_mask
matrix_full = matrix_full[:, combined_mask]
matrix_full_plot = matrix_full.T
if matrix_full_plot.size == 0:
continue
# Color (z-axis) min/max percentiles unless overridden
if z_min is None:
vmin = np.nanpercentile(matrix_full_plot, 1)
else:
vmin = z_min
if z_max is None:
vmax = np.nanpercentile(matrix_full_plot, 99)
else:
vmax = z_max
# Include per-row y/z overrides so downstream generic grid honors them
datasets.append(
{
"x": times,
"y": energy,
"data": pa_data,
"label": key.title(),
"y_label": "Energy (eV)",
"z_label": "Counts",
"vmin": vmin, # color range (row-specific)
"vmax": vmax,
"y_min": y_lower,
"y_max": y_upper,
# z_min/z_max are repeated only when provided explicitly, to avoid
# forcing identical bounds when percentile scaling is applied
**({"z_min": z_min} if z_min is not None else {}),
**({"z_max": z_max} if z_max is not None else {}),
}
)
if not datasets:
info_logger(
f"[WARNING] No pitch angle datasets to plot for {cdf_file_path}.",
level="message",
)
return None, None
title = f"Orbit {orbit_number} - Pitch Angle {instrument_type} ESA Spectrograms"
return generic_plot_multirow_optional_zoom(
datasets,
vertical_lines=vertical_lines,
zoom_duration_minutes=zoom_duration_minutes,
y_scale=scale_function_y,
z_scale=scale_function_z,
colormap=colormap,
show=show,
title=title,
row_label_pad=50,
row_label_rotation=90,
y_min=y_min,
y_max=y_max,
z_min=z_min,
z_max=z_max,
)
def FAST_plot_instrument_grid(
cdf_file_paths: Dict[str, str],
filtered_orbits_df=None,
orbit_number: Optional[int] = None,
zoom_duration_minutes: float = 6.25,
scale_function_y: str = "linear",
scale_function_z: str = "linear",
instrument_order: Tuple[str, ...] = ("ees", "eeb", "ies", "ieb"),
show: bool = True,
colormap: str = "viridis",
y_min: Optional[float] = None,
y_max: Optional[float] = None,
z_min: Optional[float] = None,
z_max: Optional[float] = None,
global_extrema: Optional[Dict[str, Union[int, float]]] = None,
) -> Tuple[Any, Any]:
"""Plot a multi-instrument ESA spectrogram grid for a single orbit.
Loads each instrument CDF, orients and filters the data, collapses across
pitch-angle, and constructs datasets for
``generic_plot_multirow_optional_zoom``. When vertical lines are available
for the orbit, a zoom column is included.
Parameters
----------
cdf_file_paths : dict of {str: str}
Mapping of instrument key (``'ees'``, ``'eeb'``, ``'ies'``, ``'ieb'``)
to CDF file path. Missing instruments are skipped.
filtered_orbits_df : pandas.DataFrame or None
DataFrame for vertical line computation; if ``None``, lines are omitted.
orbit_number : int or None
Orbit identifier used in titles/lines.
zoom_duration_minutes : float, default 6.25
Zoom window length (minutes).
scale_function_y : {'linear', 'log'}, default 'linear'
Y-axis scaling.
scale_function_z : {'linear', 'log'}, default 'linear'
Color scale for intensity.
instrument_order : tuple of str, default ('ees', 'eeb', 'ies', 'ieb')
Display order of instrument rows.
show : bool, default True
Whether to show the figure interactively.
colormap : str, default 'viridis'
Matplotlib colormap name.
y_min, y_max : float or None, optional
Direct energy bounds override applied when ``global_extrema`` is not
provided. Ignored per-instrument when ``global_extrema`` supplies
instrument-specific keys.
z_min, z_max : float or None, optional
Direct intensity scale overrides; row-level percentile scaling is used
when neither these nor ``global_extrema`` provide limits.
global_extrema : dict or None, optional
Mapping of precomputed extrema keys of the form
``{instrument}_{y_scale}_{z_scale}_{axis}_{min|max}`` used to supply
per-instrument (row-specific) limits. When present these take
precedence over ``y_min`` / ``y_max`` / ``z_min`` / ``z_max``, enabling
distinct y/z ranges for ``ees``, ``eeb``, ``ies``, and ``ieb`` within
the same figure and improving contrast when dynamic ranges differ.
Returns
-------
tuple[Figure or None, FigureCanvasBase or None]
Figure and Canvas, or ``(None, None)`` if no datasets.
Notes
-----
- Files that fail to load are logged and skipped; remaining instruments may
still render.
- Energy bins are restricted to ``[0, 4000]`` (or overridden via ``global_extrema``
or explicit ``y_min`` / ``y_max``).
- ``vmin``/``vmax`` per row use 1st/99th percentiles for robust scaling unless
``global_extrema`` provides per-instrument ``z_min`` / ``z_max``.
- Each dataset row sets ``y_label='Energy (eV)'`` and ``z_label='Counts'`` by
default for clarity of physical units.
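Examples
--------
Plot whichever instruments are available for one orbit (a sketch; paths
follow the layout described in the module docstring)::

    fig, canvas = FAST_plot_instrument_grid(
        {
            "ees": "FAST_data/2000/01/fa_esa_l2_ees_20000101001737_13312_v02.cdf",
            "eeb": "FAST_data/2000/01/fa_esa_l2_eeb_20000101001737_13312_v02.cdf",
        },
        orbit_number=13312,
        scale_function_z="log",
        show=False,
    )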
"""
datasets = []
vertical_lines = None
first_times = None
for inst in instrument_order:
cdf_path = cdf_file_paths.get(inst)
if not cdf_path:
continue
try:
cdf_file = cdflib.CDF(cdf_path)
times = np.asarray(cdf_file.varget(CDF_VARIABLES[0]))
data = np.asarray(cdf_file.varget(CDF_VARIABLES[1]))
energy_full = np.asarray(cdf_file.varget(CDF_VARIABLES[2]))
energy = energy_full[0, 0, :] if energy_full.ndim == 3 else energy_full
pitchangle_full = np.asarray(cdf_file.varget(CDF_VARIABLES[3]))
pitchangle = (
pitchangle_full[0, :, 0]
if pitchangle_full.ndim == 3
else pitchangle_full
)
if data.shape[1] == len(energy) and data.shape[2] == len(pitchangle):
data = np.transpose(data, (0, 2, 1))
if first_times is None:
first_times = times
if (
vertical_lines is None
and filtered_orbits_df is not None
and orbit_number is not None
):
instrument_type = get_cdf_file_type(cdf_path)
vertical_lines = get_timestamps_for_orbit(
filtered_orbits_df, orbit_number, instrument_type, times
)
if (vertical_lines is None) or (len(vertical_lines) == 0):
info_logger(
f"No vertical lines found for orbit {orbit_number} in {cdf_path}. Skipping.",
level="message",
)
matrix_full = FAST_COLLAPSE_FUNCTION(data, axis=1)
nan_col_mask = ~np.all(np.isnan(matrix_full), axis=0)
# Determine instrument-specific y bounds (energy)
if isinstance(global_extrema, dict):
key_prefix = f"{inst}_{scale_function_y}_{scale_function_z}"
y_lower = global_extrema.get(
f"{key_prefix}_y_min", 0 if y_min is None else y_min
)
y_upper = global_extrema.get(
f"{key_prefix}_y_max", 4000 if y_max is None else y_max
)
else:
y_lower = 0 if y_min is None else y_min
y_upper = 4000 if y_max is None else y_max
valid_energy_mask = (energy >= y_lower) & (energy <= y_upper)
combined_mask = nan_col_mask & valid_energy_mask
matrix_full = matrix_full[:, combined_mask]
matrix_full_plot = matrix_full.T
if matrix_full_plot.size == 0:
continue
# Determine instrument-specific z bounds (intensity)
if isinstance(global_extrema, dict):
key_prefix = f"{inst}_{scale_function_y}_{scale_function_z}"
vmin = global_extrema.get(f"{key_prefix}_z_min")
vmax = global_extrema.get(f"{key_prefix}_z_max")
if vmin is None:
vmin = np.nanpercentile(matrix_full_plot, 1)
if vmax is None:
vmax = np.nanpercentile(matrix_full_plot, 99)
else:
if z_min is None:
vmin = np.nanpercentile(matrix_full_plot, 1)
else:
vmin = z_min
if z_max is None:
vmax = np.nanpercentile(matrix_full_plot, 99)
else:
vmax = z_max
# Provide per-row overrides so generic multi-row plot can honor
# distinct instrument ranges for both y (energy) and z (intensity).
datasets.append(
{
"x": times,
"y": energy,
"data": data,
"label": inst.upper(),
"y_label": "Energy (eV)",
"z_label": "Counts",
"vmin": vmin,
"vmax": vmax,
"y_min": y_lower,
"y_max": y_upper,
# Only include z_min/z_max if explicitly fixed by global extrema
**({"z_min": z_min} if z_min is not None else {}),
**({"z_max": z_max} if z_max is not None else {}),
}
)
except Exception as file_load_failure:
info_logger(
f"Failed to load CDF for {inst} at {cdf_path}. Skipping.",
file_load_failure,
level="error",
)
continue
if not datasets:
return None, None
title = f"Orbit {orbit_number} - ESA Spectrograms"
return generic_plot_multirow_optional_zoom(
datasets,
vertical_lines=vertical_lines,
zoom_duration_minutes=zoom_duration_minutes,
y_scale=scale_function_y,
z_scale=scale_function_z,
colormap=colormap,
show=show,
title=title,
row_label_pad=50,
row_label_rotation=90,
y_min=y_min,
y_max=y_max,
z_min=z_min,
z_max=z_max,
)
# (Residual code from previous implementation removed in refactor.)
def FAST_process_single_orbit(
orbit_number: int,
instrument_file_paths: Dict[str, str],
filtered_orbits_dataframe,
zoom_duration_minutes: float,
y_axis_scale: str,
z_axis_scale: str,
instrument_order: Tuple[str, ...],
colormap: str,
output_base_directory: str,
orbit_timeout_seconds: Union[int, float] = 60,
instrument_timeout_seconds: Union[int, float] = 30,
global_extrema: Optional[Dict[str, Union[int, float]]] = None,
) -> Dict[str, Any]:
"""Process all plots for a single orbit with timeouts and deferred saving.
For each available instrument, renders a pitch-angle grid, then renders a
combined instrument grid. Figures are accumulated in memory and saved only
if no timeout thresholds are exceeded.
Parameters
----------
orbit_number : int
The orbit identifier.
instrument_file_paths : dict of {str: str}
Mapping of instrument key to CDF file path.
filtered_orbits_dataframe : pandas.DataFrame
DataFrame used to compute orbit boundary timestamps.
zoom_duration_minutes : float
Zoom window length for zoomed plots.
y_axis_scale : {'linear', 'log'}
Y-axis scaling.
z_axis_scale : {'linear', 'log'}
Color scale for intensity.
instrument_order : tuple of str
Order used in the instrument grid.
colormap : str
Matplotlib colormap.
output_base_directory : str
Root folder for saving figures; year/month are inferred from the CDF
path when possible, else ``'unknown'``.
orbit_timeout_seconds : int or float, default 60
Maximum wall-clock seconds permitted for the entire orbit (summed); if
exceeded, status becomes ``'timeout'`` and no figures are saved.
instrument_timeout_seconds : int or float, default 30
Max time per instrument (and for the instrument grid). Exceeding this
aborts the orbit without saving.
global_extrema : dict or None, optional
Precomputed extrema mapping (see ``compute_global_extrema``) used to
supply consistent per-instrument axis limits across all orbits for
deterministic scaling.
Returns
-------
dict
Result dictionary with keys:
``orbit`` (int), ``status`` (``'ok'``, ``'error'``, or ``'timeout'``),
``errors`` (list of str). On timeout, includes ``timeout_type`` and
``timeout_instrument`` when applicable.
Notes
-----
- Timing diagnostics are logged per instrument and for the grid.
- Exceptions during plotting/saving are logged; processing continues when
safe. Figures are closed in all cases to free memory.
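Examples
--------
Process one orbit with defaults (a sketch; ``df`` is the DataFrame loaded
via ``load_filtered_orbits``)::

    df = load_filtered_orbits()
    result = FAST_process_single_orbit(
        13312,
        {"eeb": "FAST_data/2000/01/fa_esa_l2_eeb_20000101001737_13312_v02.cdf"},
        df,
        zoom_duration_minutes=6.25,
        y_axis_scale="linear",
        z_axis_scale="log",
        instrument_order=("ees", "eeb", "ies", "ieb"),
        colormap="viridis",
        output_base_directory=FAST_OUTPUT_BASE,
    )
    assert result["status"] in {"ok", "error", "timeout"}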
"""
result = {"orbit": orbit_number, "status": "ok", "errors": []}
orbit_start_time = _time.time()
pending_figures = [] # defer saving until timeouts cleared
timeout_triggered = False
timeout_type = None
timeout_instrument = None
try:
# Derive year/month for output path
year = "unknown"
month = "unknown"
first_path = next(
(
instrument_file_paths[k]
for k in ("ees", "eeb", "ies", "ieb")
if k in instrument_file_paths
),
None,
)
if first_path:
try:
parts = Path(first_path).parts
for i, part in enumerate(parts):
if part.isdigit() and len(part) == 4:
year = part
if (
i + 1 < len(parts)
and parts[i + 1].isdigit()
and len(parts[i + 1]) == 2
):
month = parts[i + 1]
break
except Exception as year_month_parse_exception:
info_logger(
"[WARN] Could not parse year/month",
year_month_parse_exception,
level="message",
)
output_dir = os.path.join(
output_base_directory, str(year), str(month), str(orbit_number)
)
os.makedirs(output_dir, exist_ok=True)
# Per-instrument processing
for inst_type in ("ees", "eeb", "ies", "ieb"):
if timeout_triggered:
break
cdf_path = instrument_file_paths.get(inst_type)
if not cdf_path:
continue
inst_start = _time.time()
try:
inst_detected = get_cdf_file_type(cdf_path)
if inst_detected is None or inst_detected == "orb":
continue
cdf_obj = cdflib.CDF(cdf_path)
time_unix_array = np.asarray(cdf_obj.varget("time_unix"))
vertical_lines = get_timestamps_for_orbit(
filtered_orbits_dataframe,
orbit_number,
inst_detected,
time_unix_array,
)
# Lookup global extrema overrides
y_min_override = None
y_max_override = None
z_min_override = None
z_max_override = None
if isinstance(global_extrema, dict):
key_base = f"{inst_detected}_{y_axis_scale}_{z_axis_scale}"
y_min_raw = global_extrema.get(f"{key_base}_y_min")
y_max_raw = global_extrema.get(f"{key_base}_y_max")
z_min_raw = global_extrema.get(f"{key_base}_z_min")
z_max_raw = global_extrema.get(f"{key_base}_z_max")
y_min_override = (
round_extrema(y_min_raw, "down")
if y_min_raw is not None
else None
)
y_max_override = (
round_extrema(y_max_raw, "up")
if y_max_raw is not None
else None
)
z_min_override = (
round_extrema(z_min_raw, "down")
if z_min_raw is not None
else None
)
z_max_override = (
round_extrema(z_max_raw, "up")
if z_max_raw is not None
else None
)
fig_pa, canvas_pa = FAST_plot_pitch_angle_grid(
cdf_path,
filtered_orbits_df=filtered_orbits_dataframe,
orbit_number=orbit_number,
zoom_duration_minutes=zoom_duration_minutes,
scale_function_y=y_axis_scale,
scale_function_z=z_axis_scale,
show=False,
colormap=colormap,
y_min=y_min_override,
y_max=y_max_override,
z_min=z_min_override,
z_max=z_max_override,
)
if fig_pa is not None:
cusp = vertical_lines is not None and len(vertical_lines) > 0
filename = f"{orbit_number}{'_cusp' if cusp else ''}_pitch-angle_ESA_{inst_detected}_y-{y_axis_scale}_z-{z_axis_scale}.png"
pending_figures.append(
{
"figure": fig_pa,
"canvas": canvas_pa,
"path": os.path.join(output_dir, filename),
"desc": f"pitch-angle {inst_detected}",
}
)
except Exception as instrument_exception:
err = f"[FAIL] Plotting Orbit {orbit_number} pitch angle grid for {inst_type}"
info_logger(err, instrument_exception, level="error")
result["status"] = "error"
result.setdefault("errors", []).append(err)
continue
finally:
inst_elapsed = _time.time() - inst_start
info_logger(
f"[TIMING] Orbit {orbit_number} instrument {inst_type} elapsed {inst_elapsed:.3f}s",
level="message",
)
if inst_elapsed > instrument_timeout_seconds and not timeout_triggered:
timeout_triggered = True
timeout_type = "instrument"
timeout_instrument = inst_type
info_logger(
f"[TIMEOUT] Instrument {inst_type} in orbit {orbit_number} exceeded {instrument_timeout_seconds:.0f}s ({inst_elapsed:.2f}s). Aborting orbit without saving.",
level="message",
)
break
# Instrument grid (if still OK)
grid_elapsed = None
if not timeout_triggered:
grid_start = _time.time()
try:
# Provide global_extrema dict so per-instrument limits are applied inside grid helper
fig_grid, canvas_grid = FAST_plot_instrument_grid(
instrument_file_paths,
filtered_orbits_df=filtered_orbits_dataframe,
orbit_number=orbit_number,
zoom_duration_minutes=zoom_duration_minutes,
scale_function_y=y_axis_scale,
scale_function_z=z_axis_scale,
instrument_order=instrument_order,
show=False,
colormap=colormap,
global_extrema=global_extrema,
)
if fig_grid is not None:
pending_figures.append(
{
"figure": fig_grid,
"canvas": canvas_grid,
"path": os.path.join(
output_dir,
f"{orbit_number}_instrument-grid_ESA_y-{y_axis_scale}_z-{z_axis_scale}.png",
),
"desc": "instrument-grid",
}
)
except Exception as instrument_grid_exception:
err = f"[FAIL] Plotting Orbit {orbit_number} instrument grid"
info_logger(err, instrument_grid_exception, level="error")
result["status"] = "error"
result.setdefault("errors", []).append(err)
finally:
grid_elapsed = _time.time() - grid_start
info_logger(
f"[TIMING] Orbit {orbit_number} instrument-grid elapsed {grid_elapsed:.3f}s",
level="message",
)
if (
grid_elapsed is not None
and grid_elapsed > instrument_timeout_seconds
and not timeout_triggered
):
timeout_triggered = True
timeout_type = "instrument"
timeout_instrument = "instrument_grid"
info_logger(
f"[TIMEOUT] Instrument grid in orbit {orbit_number} exceeded {instrument_timeout_seconds:.0f}s ({grid_elapsed:.2f}s). Aborting orbit without saving.",
level="message",
)
# Orbit total timeout check
orbit_elapsed = _time.time() - orbit_start_time
if orbit_elapsed > orbit_timeout_seconds and not timeout_triggered:
timeout_triggered = True
timeout_type = "orbit"
info_logger(
f"[TIMEOUT] Orbit {orbit_number} exceeded {orbit_timeout_seconds:.0f}s total ({orbit_elapsed:.2f}s). Aborting without saving.",
level="message",
)
# If timeout -> discard pending figures
if timeout_triggered:
for fig_item in pending_figures:
try:
close_all_axes_and_clear(fig_item["figure"])
except Exception as figure_close_exception:
pass
pending_figures.clear()
result["status"] = "timeout"
result["timeout_type"] = timeout_type
if timeout_instrument:
result["timeout_instrument"] = timeout_instrument
return result
# Save figures now
for fig_item in pending_figures:
fig = fig_item["figure"]
fpath = fig_item["path"]
try:
info_logger(
f"[DEBUG] Saving {fig_item['desc']} plot: y_axis_scale={y_axis_scale}, z_axis_scale={z_axis_scale}, filename={fpath}",
level="message",
)
fig.savefig(fpath, dpi=200)
info_logger(f"[SAVED] {fpath}", level="message")
except Exception as save_exception:
info_logger(
f"[FAIL] Saving figure {fpath}", save_exception, level="error"
)
result["status"] = "error"
result.setdefault("errors", []).append(str(save_exception))
finally:
try:
close_all_axes_and_clear(fig)
except Exception as figure_close_exception:
pass
pending_figures.clear()
except Exception as orbit_processing_exception:
err = f"[FAIL] Orbit {orbit_number} processing"
info_logger(err, orbit_processing_exception, level="error")
result["status"] = "error"
result.setdefault("errors", []).append(err)
finally:
gc.collect()
return result
def FAST_plot_spectrograms_directory(
directory_path: str = FAST_CDF_DATA_FOLDER_PATH,
output_base: str = FAST_OUTPUT_BASE,
y_scale: str = "linear",
z_scale: str = "log",
zoom_duration_minutes: float = DEFAULT_ZOOM_WINDOW_MINUTES,
instrument_order: Tuple[str, ...] = ("ees", "eeb", "ies", "ieb"),
verbose: bool = True,
progress_json_path: Optional[str] = FAST_PLOTTING_PROGRESS_JSON,
ignore_progress_json: bool = False,
use_tqdm: Optional[bool] = None,
colormap: str = "viridis",
max_workers: int = 4,
orbit_timeout_seconds: Union[int, float] = 60,
instrument_timeout_seconds: Union[int, float] = 30,
retry_timeouts: bool = True,
flush_batch_size: int = 10,
log_flush_batch_size: Optional[int] = None,
max_processing_percentile: float = 95.0,
) -> List[Dict[str, Any]]:
"""Batch process ESA spectrogram plots for all orbits in a directory.
Discovers instrument CDF files (excluding filenames containing
``"_orb_"``), groups them by orbit number, and processes each orbit in
parallel worker processes for matplotlib safety. Progress is persisted to a
JSON file to support resume and to record error/timeout orbits per y/z
scale combo. Before scheduling orbit tasks a global extrema pass runs
(``compute_global_extrema``) for the requested (y_scale, z_scale). For
log scales, if a completed linear counterpart (same other-axis scale)
exists, its maxima are reused and log-transformed with a floor (see
``log_floor_cutoff`` / ``log_floor_value``) instead of rescanning files.
Parameters
----------
directory_path : str, default FAST_CDF_DATA_FOLDER_PATH
Root folder containing CDF files.
output_base : str, default FAST_OUTPUT_BASE
Base output directory for plots organized as
``output_base/year/month/orbit``.
y_scale : {'linear', 'log'}, default 'linear'
Y-axis scaling.
z_scale : {'linear', 'log'}, default 'log'
Color scale for intensity.
zoom_duration_minutes : float, default DEFAULT_ZOOM_WINDOW_MINUTES
Zoom window length for zoom columns.
instrument_order : tuple of str, default ('ees', 'eeb', 'ies', 'ieb')
Display order for instrument grid.
verbose : bool, default True
If ``True``, prints additional batch messages.
progress_json_path : str or None, default FAST_PLOTTING_PROGRESS_JSON
Path to persist progress across runs.
ignore_progress_json : bool, default False
If ``True``, don't read previous progress before starting.
use_tqdm : bool or None, default None
If ``True``, show a tqdm progress bar; if ``None``, defaults to ``False``.
colormap : str, default 'viridis'
Matplotlib colormap name.
max_workers : int, default 4
Max number of worker processes.
orbit_timeout_seconds : int or float, default 60
Total per-orbit timeout.
instrument_timeout_seconds : int or float, default 30
Per-instrument/grid timeout.
retry_timeouts : bool, default True
If ``True``, retry timed-out orbits once after the initial pass.
flush_batch_size : int, default 10
Batch size applied to both extrema JSON and progress JSON writes to
reduce I/O. Values < 1 are coerced to 1. Final partial batch always flushes.
log_flush_batch_size : int or None, default None
Batch size for buffered logging. If None, defaults to ``flush_batch_size``.
max_processing_percentile : float, default 95.0
Upper percentile (0 < p <= 100) forwarded to the internal
``compute_global_extrema`` helper as ``max_percentile``. Currently this
percentile controls the pooled positive intensity (Z) maxima selection.
Energy (Y) maxima continue to use a fixed, hard-coded 99% cumulative
positive count coverage rule: the smallest energy whose cumulative
positive finite sample count reaches 99% of total positive samples. A
future refactor may unify these so that ``max_processing_percentile``
also governs the Y coverage threshold.
Returns
-------
list of dict
Result dictionaries from ``FAST_process_single_orbit`` (and any
retries), in no particular order.
Raises
------
KeyboardInterrupt
Raised immediately on SIGINT/SIGTERM to stop scheduling and terminate
workers.
Notes
-----
- Signal handling installs handlers that request shutdown, terminate child
processes, and raise ``KeyboardInterrupt`` promptly.
- Progress JSON tracks last completed orbit per scale combo under key
``f"progress_{y_scale}_{z_scale}_last_orbit"`` and records error/timeout
orbits under dedicated keys (including per-instrument).
- If ``ignore_progress_json`` is ``True``, progress is not read; writes are
still attempted.
- If ``retry_timeouts`` is ``True``, timed-out orbits are retried once with
a smaller pool.
- ``max_processing_percentile`` only impacts intensity maxima today; Y
maxima coverage threshold is fixed at 99%.
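Examples
--------
Resume-safe batch run over the default data folder (a sketch)::

    results = FAST_plot_spectrograms_directory(
        y_scale="linear",
        z_scale="log",
        max_workers=4,
        use_tqdm=True,
    )
    timed_out = [r["orbit"] for r in results if r["status"] == "timeout"]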
"""
# Placeholders for restoring prior signal handlers at function exit (runtime vars, not part of docstring)
previous_sigint: Any = None
previous_sigterm: Any = None
# Section: Early shutdown support
# A lightweight flag set when SIGINT/SIGTERM received; workers are separate processes so we mainly
# use this to abort scheduling / wait loops and then terminate children explicitly.
shutdown_requested = {"flag": False}
# Section: Global extrema computation (cached)
def compute_global_extrema(
directory_path: str,
y_scale: str,
z_scale: str,
instrument_order: Iterable[str],
extrema_json_path: str = "./FAST_calculated_extrema.json",
compute_mins: bool = False,
max_percentile: float = max_processing_percentile,
log_floor_cutoff: float = 0.1,
log_floor_value: float = -1.0,
flush_batch_size: int = 10,
) -> Dict[str, Union[int, float, Dict[str, Union[int, float]]]]:
"""Compute (or incrementally update) cached axis extrema per instrument.
This routine performs a resumable pass over all instrument CDF files
(excluding those with ``"_orb_"`` in the filename). Processing is
incremental: after each file the current maxima and progress index
are flushed to ``extrema_json_path`` so interrupted executions can
resume without losing prior work.
Extrema Logic
-------------
- Y (energy) minima are fixed to 0 (not computed). Z (intensity) minima are 0
unless ``compute_mins`` is True, in which case the 1st percentile of pooled
positive intensity values is stored (linear scale).
- Linear Y maxima: smallest energy whose cumulative positive finite count reaches
99% of total positive finite samples (fixed threshold; independent of
``max_percentile`` for now).
- Linear Z maxima: ``max_percentile``th percentile of pooled positive finite intensity
samples (configurable via outer ``max_processing_percentile``).
- If either axis is requested in log scale and the corresponding linear-scale extrema
(same other-axis scale) are already complete (``complete: True`` in their progress key),
the linear maxima are re-used and transformed via base-10 log without re-scanning files.
- Log transform applies a floor: any linear value ``<= log_floor_cutoff`` or non-finite
is replaced by ``log_floor_value`` (default -1). Minima in log scale are set to this
floor value.
- If a linear precursor is not available for a requested log axis, a full scan occurs and
the resulting maxima are then log-transformed with the same floor rule.
- Maxima are monotonically non-decreasing across incremental updates
and re-ceiled each iteration (``math.ceil``); energy maxima are
additionally capped at 4000.
- Empty / non-positive datasets for a file are skipped silently.
Resume Strategy
---------------
A progress entry with key ``{instrument}_{y_scale}_{z_scale}_extrema_progress``
stores ``processed_index``, ``total`` and a ``complete`` flag. Files
are processed in deterministic (sorted) order; any index less than or
equal to ``processed_index`` is skipped on subsequent runs.
Stored JSON Keys
----------------
``{instrument}_{y_scale}_{z_scale}_y_min`` (0; ``log_floor_value`` for log Y)
``{instrument}_{y_scale}_{z_scale}_y_max``
``{instrument}_{y_scale}_{z_scale}_z_min`` (0 unless ``compute_mins``; ``log_floor_value`` for log Z)
``{instrument}_{y_scale}_{z_scale}_z_max``
Parameters
----------
directory_path : str
Root directory containing instrument CDF files.
y_scale : {'linear', 'log'}
Y scaling label (used for key names only at present).
z_scale : {'linear', 'log'}
Z scaling label (used for key names only; percentile selection
already ignores zeros/NaNs).
instrument_order : iterable of str
Instruments to process (e.g., ("ees", "eeb", "ies", "ieb")).
extrema_json_path : str, default './FAST_calculated_extrema.json'
Path to the JSON cache file (created if absent).
compute_mins : bool, default False
When True, store the 1st percentile of pooled positive intensity
values as the Z minimum instead of 0.
max_percentile : float
Defaults to the value forwarded from the enclosing
``FAST_plot_spectrograms_directory`` via ``max_processing_percentile``
(95.0 there by default). At present this percentile is applied ONLY
to the pooled positive intensity (Z) distribution to derive
``z_max``; energy (Y) maxima instead use a fixed 99% cumulative
positive sample coverage heuristic independent of this value. Future
revisions may allow this percentile to also govern the energy
coverage threshold for consistency.
log_floor_cutoff : float, default 0.1
Threshold below which linear-domain values are treated as at/under floor when
converting to log space (avoids log10 of very small or non-positive values).
log_floor_value : float, default -1.0
Value substituted for any log-transformed minima and for maxima that fall
at or below the cutoff or are non-finite.
flush_batch_size : int, default 10
Number of orbits (that produced updates) to accumulate before flushing the
extrema JSON to disk. Final partial batch is always flushed. Values < 1
are coerced to 1.
Returns
-------
dict
Updated mapping containing extrema values and progress entries.
Notes
-----
Previously computed maxima are reused; only instruments missing at least one
required key are revisited. JSON writes are batched: after every
``flush_batch_size`` orbits that produced updates (or at the very end) a flush
occurs. This reduces I/O frequency at the cost of potentially losing up to
``flush_batch_size - 1`` orbits worth of new extrema work if interrupted.
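Examples
--------
Shape of the cached JSON for one instrument/scale combination (a sketch;
values are illustrative only)::

    {
        "linear_log_last_orbit": 13312,
        "eeb_linear_log_y_min": 0,
        "eeb_linear_log_y_max": 3900,
        "eeb_linear_log_z_min": 0,
        "eeb_linear_log_z_max": 512.0,
        "eeb_linear_log_extrema_progress": {
            "processed_index": 41, "total": 42, "complete": true
        }
    }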
"""
# Load (or initialise) persistent extrema cache from disk.
if os.path.exists(extrema_json_path):
try:
with open(extrema_json_path, "r") as file_in:
extrema_state: Dict[
str, Union[int, float, Dict[str, Union[int, float]]]
] = json.load(file_in)
except Exception as extrema_json_read_exception:
# Fall back to a fresh state if the file is corrupt / unreadable.
info_logger(
f"[EXTREMA] Failed to read existing extrema JSON '{extrema_json_path}' (starting fresh)",
extrema_json_read_exception,
level="message",
)
extrema_state = {}
else:
extrema_state = {}
# Helper: safe base-10 log transform honoring floor cutoff/value.
def _safe_log_transform(linear_value: Optional[Union[int, float]]) -> float:
"""Convert a linear-domain positive value to log10 with floor handling.
Parameters
----------
linear_value : float or int or None
The original (linear scale) extrema value. If None, non-finite, or
<= ``log_floor_cutoff`` it is mapped to ``log_floor_value``.
Returns
-------
float
The transformed log10 value or the configured floor value when the
input is absent / invalid / below threshold.
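Examples
--------
With the default ``log_floor_cutoff=0.1`` and ``log_floor_value=-1.0``:

>>> _safe_log_transform(1000.0)
3.0
>>> _safe_log_transform(0.05)   # at/below cutoff -> floor
-1.0
>>> _safe_log_transform(None)
-1.0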
"""
try:
if linear_value is None:
return float(log_floor_value)
numeric_value = float(linear_value)
if not np.isfinite(numeric_value) or numeric_value <= log_floor_cutoff:
return float(log_floor_value)
return float(np.log10(numeric_value))
except Exception as safe_log_transform_exception:
info_logger(
"[EXTREMA] _safe_log_transform failed; substituting floor value",
safe_log_transform_exception,
level="message",
)
return float(log_floor_value)
# Section: Discover CDF files and group by orbit/instrument
# orbit_map[orbit_number][instrument_name] -> list[cdf_paths]; skips
# paths containing '_orb_' and filenames lacking a parsable orbit.
orbit_map: Dict[int, Dict[str, List[str]]] = defaultdict(
lambda: defaultdict(list)
)
for path_obj in Path(directory_path).rglob("*.[cC][dD][fF]"):
candidate_path = str(path_obj)
if "_orb_" in candidate_path.lower(): # skip aggregated/orbit products
continue
instrument_name = get_cdf_file_type(candidate_path)
if not (
isinstance(instrument_name, str) and instrument_name in instrument_order
):
continue # not one of the target instruments
file_name = os.path.basename(candidate_path)
file_parts = file_name.split("_")
if len(file_parts) < 5:
continue # unexpected naming, cannot parse orbit
try:
orbit_number = int(file_parts[-2])
except Exception as orbit_number_parse_exception:
continue
orbit_map[orbit_number][instrument_name].append(candidate_path)
sorted_orbit_numbers = sorted(orbit_map.keys())
# Diagnostic: print instruments found for each orbit
# for orbit in sorted_orbit_numbers:
# instruments_found = list(orbit_map[orbit].keys())
# print(f"[DEBUG] Orbit {orbit} has instruments: {instruments_found}")
# Section: Accumulators (resumable, maxima never decrease)
# energy_positive_counts_by_instrument -> per-energy positive finite counts
# positive_sample_arrays_by_instrument -> positive intensity arrays for z
# file_progress_index_by_instrument -> last processed file index
# total_files_per_instrument -> file counts per instrument
energy_positive_counts_by_instrument: Dict[str, Dict[float, int]] = {
inst: defaultdict(int) for inst in instrument_order
}
positive_sample_arrays_by_instrument: Dict[str, List[np.ndarray]] = {
inst: [] for inst in instrument_order
}
file_progress_index_by_instrument: Dict[str, int] = {
inst: -1 for inst in instrument_order
}
total_files_per_instrument: Dict[str, int] = {
inst: sum(
len(orbit_map[orbit_number].get(inst, []))
for orbit_number in sorted_orbit_numbers
)
for inst in instrument_order
}
# Aggregate total files across all instruments for a single tqdm bar.
total_discovered_files = sum(total_files_per_instrument.values())
processed_files_all = 0 # (presently informational only)
extrema_progress_bar = tqdm(
total=total_discovered_files,
desc=f"Extrema {y_scale}/{z_scale}",
unit="file",
leave=False,
disable=(total_discovered_files == 0),
)
try:
orbits_since_last_flush = 0  # count of orbits with updates since last disk flush
# Refactored: iterate by orbit, then by instrument
last_orbit_global_key = f"{y_scale}_{z_scale}_last_orbit"
last_processed_orbit_val = extrema_state.get(last_orbit_global_key, -1)
if not isinstance(last_processed_orbit_val, (int, float)):
last_processed_orbit = -1
else:
last_processed_orbit = int(last_processed_orbit_val)
for orbit_number in sorted_orbit_numbers:
if orbit_number <= last_processed_orbit:
continue
for instrument_name in instrument_order:
key_prefix = f"{instrument_name}_{y_scale}_{z_scale}"
progress_key = f"{key_prefix}_extrema_progress"
orbit_had_any_updates = False  # track whether this orbit produced any new data
# Fetch prior progress entry for this instrument+scale combo (if any)
raw_progress_entry = (
extrema_state.get(progress_key)
if isinstance(extrema_state, dict)
else None
)
progress_entry = (
raw_progress_entry
if isinstance(raw_progress_entry, dict)
else {}
)
if progress_entry.get("complete"):
continue # Already finished in a previous run
# Attempt reuse if requesting log scaling for either axis and a completed
# linear counterpart exists (same other-axis scale). This must happen
# before we derive file lists to avoid unnecessary scans.
reuse_performed = False
need_log_y = y_scale == "log"
need_log_z = z_scale == "log"
if need_log_y or need_log_z:
linear_y_progress_key = (
f"{instrument_name}_linear_{z_scale}_extrema_progress"
if need_log_y
else None
)
linear_z_progress_key = (
f"{instrument_name}_{y_scale}_linear_extrema_progress"
if need_log_z
else None
)
linear_y_complete = False
if linear_y_progress_key:
linear_y_progress_entry = extrema_state.get(
linear_y_progress_key
)
if isinstance(linear_y_progress_entry, dict):
linear_y_complete = bool(
linear_y_progress_entry.get("complete")
)
linear_z_complete = False
if linear_z_progress_key:
linear_z_progress_entry = extrema_state.get(
linear_z_progress_key
)
if isinstance(linear_z_progress_entry, dict):
linear_z_complete = bool(
linear_z_progress_entry.get("complete")
)
if (not need_log_y or linear_y_complete) and (
not need_log_z or linear_z_complete
):
# Obtain linear maxima sources
if need_log_y:
linear_y_prefix = f"{instrument_name}_linear_{z_scale}"
raw_linear_y_max = extrema_state.get(
f"{linear_y_prefix}_y_max"
)
linear_y_max_val = (
raw_linear_y_max
if isinstance(raw_linear_y_max, (int, float))
else None
)
transformed_y_max = _safe_log_transform(
linear_y_max_val
)
y_min_val = log_floor_value
else:
transformed_y_max = extrema_state.get(
f"{key_prefix}_y_max"
)
y_min_val = 0
if need_log_z:
linear_z_prefix = f"{instrument_name}_{y_scale}_linear"
raw_linear_z_max = extrema_state.get(
f"{linear_z_prefix}_z_max"
)
linear_z_max_val = (
raw_linear_z_max
if isinstance(raw_linear_z_max, (int, float))
else None
)
transformed_z_max = _safe_log_transform(
linear_z_max_val
)
z_min_val = log_floor_value
else:
transformed_z_max = extrema_state.get(
f"{key_prefix}_z_max"
)
z_min_val = 0
# Persist transformed (reused) extrema directly without scanning.
extrema_state[f"{key_prefix}_y_min"] = y_min_val
extrema_state[f"{key_prefix}_y_max"] = (
transformed_y_max
if transformed_y_max is not None
else log_floor_value
)
extrema_state[f"{key_prefix}_z_min"] = z_min_val
extrema_state[f"{key_prefix}_z_max"] = (
transformed_z_max
if transformed_z_max is not None
else log_floor_value
)
total_for_inst = total_files_per_instrument[instrument_name]
extrema_state[progress_key] = {
"processed_index": max(total_for_inst - 1, -1),
"total": total_for_inst,
"complete": True,
}
# Remove instrument-specific last_orbit keys if present
for inst in instrument_order:
inst_key = f"{inst}_{y_scale}_{z_scale}_last_orbit"
if inst_key in extrema_state:
extrema_state.pop(inst_key)
last_orbit_global_key = f"{y_scale}_{z_scale}_last_orbit"
extrema_state[last_orbit_global_key] = (
max(sorted_orbit_numbers)
if sorted_orbit_numbers
else -1
)
reuse_performed = True
info_logger(
f"[EXTREMA] Reused linear extrema for log scaling instrument={instrument_name} y_scale={y_scale} z_scale={z_scale}",
level="message",
)
if reuse_performed:
# Skip scanning
orbit_had_any_updates = True
continue
# Files for the current orbit/instrument
orbit_instrument_files = orbit_map[orbit_number].get(
instrument_name, []
)
# if not orbit_instrument_files:
# print(f"[EXTREMA] Skipping instrument {instrument_name} for orbit {orbit_number}: no files found.")
# continue
# Section: Per-file ingestion & collapse
for cdf_path in sorted(orbit_instrument_files):
try:
cdf_obj = cdflib.CDF(cdf_path)
data_raw = np.asarray(cdf_obj.varget("data"))
energy_raw = np.asarray(cdf_obj.varget("energy"))
pitch_angle_raw = np.asarray(cdf_obj.varget("pitch_angle"))
# Normalize dimensionality: handle (t, energy, pa) vs collapsed shapes.
energy_axis = (
energy_raw[0, 0, :]
if energy_raw.ndim == 3
else energy_raw
)
pitch_angle_axis = (
pitch_angle_raw[0, :, 0]
if pitch_angle_raw.ndim == 3
else pitch_angle_raw
)
# If dimensions are reversed (energy, pa) vs expected (pa, energy) fix ordering.
if data_raw.shape[1] == len(energy_axis) and data_raw.shape[
2
] == len(pitch_angle_axis):
data_raw = np.transpose(data_raw, (0, 2, 1))
# Collapse pitch-angle dimension (axis=1 after potential transpose) to pool intensities.
collapsed_intensity_matrix = FAST_COLLAPSE_FUNCTION(
data_raw, axis=1
)
# Mask columns that are entirely NaN and restrict acceptable energy bounds.
non_all_nan_column_mask = ~np.all(
np.isnan(collapsed_intensity_matrix), axis=0
)
valid_energy_range_mask = (energy_axis >= 0) & (
energy_axis <= 4000
)
combined_valid_column_mask = (
non_all_nan_column_mask & valid_energy_range_mask
)
collapsed_intensity_matrix = collapsed_intensity_matrix[
:, combined_valid_column_mask
]
if collapsed_intensity_matrix.size == 0:
continue # Nothing useful in this file
# Energy values that correspond to retained (non-empty) columns.
retained_energy_values = energy_axis[
combined_valid_column_mask
]
# Identify strictly positive finite samples.
with np.errstate(invalid="ignore"):
positive_sample_mask = np.isfinite(
collapsed_intensity_matrix
) & (collapsed_intensity_matrix > 0)
if positive_sample_mask.any():
# Count positive samples for each retained energy column.
positive_counts_per_energy_column = (
positive_sample_mask.sum(axis=0)
)
for energy_value, positive_count in zip(
retained_energy_values,
positive_counts_per_energy_column,
):
if positive_count > 0:
energy_positive_counts_by_instrument[
instrument_name
][float(energy_value)] += int(positive_count)
# Flatten (extract) positive intensity values for percentile computation.
flattened_positive_values = collapsed_intensity_matrix[
positive_sample_mask
]
if flattened_positive_values.size:
positive_sample_arrays_by_instrument[
instrument_name
].append(flattened_positive_values)
# Update progress for this instrument (file index)
file_progress_index_by_instrument[instrument_name] += 1
orbit_had_any_updates = True
processed_files_all += 1 # informational
try:
extrema_progress_bar.update(1)
except Exception as extrema_progress_bar_update_exception:
pass # tqdm may be disabled in some environments
except Exception as ingest_file_exception:
# Capture ingest/parse issues but continue with remaining files.
info_logger(
f"[EXTREMA] Ingest failure inst={instrument_name} orbit={orbit_number} file={cdf_path}",
ingest_file_exception,
level="message",
)
# Section: Post-orbit extrema update (monotonic merge)
try:
energy_positive_counts_map = (
energy_positive_counts_by_instrument[instrument_name]
)
positive_value_blocks = positive_sample_arrays_by_instrument[
instrument_name
]
# Candidate Y (energy) maximum via configurable cumulative coverage rule.
candidate_energy_max = 0.0
if energy_positive_counts_map:
sorted_energy_values = sorted(
energy_positive_counts_map.keys()
)
positive_counts_array = np.array(
[
energy_positive_counts_map[e_val]
for e_val in sorted_energy_values
]
)
cumulative_positive_counts_array = np.cumsum(
positive_counts_array
)
coverage_target_count = (
0.99 * cumulative_positive_counts_array[-1]
)
coverage_index = np.searchsorted(
cumulative_positive_counts_array,
coverage_target_count,
side="right",
)
if coverage_index >= len(sorted_energy_values):
coverage_index = len(sorted_energy_values) - 1
candidate_energy_max = float(
sorted_energy_values[coverage_index]
)
# Candidate Z (intensity) maximum via configurable percentile of pooled positive values.
candidate_intensity_max = 0.0
if positive_value_blocks:
aggregated_positive_values = np.concatenate(
positive_value_blocks
)
finite_positive_values = aggregated_positive_values[
np.isfinite(aggregated_positive_values)
& (aggregated_positive_values > 0)
]
if finite_positive_values.size:
candidate_intensity_max = float(
np.nanpercentile(
finite_positive_values, max_percentile
)
)
# Retrieve any previously stored maxima to enforce monotonic growth.
previous_energy_max_raw = (
extrema_state.get(f"{key_prefix}_y_max")
if isinstance(extrema_state, dict)
else None
)
previous_energy_max = (
previous_energy_max_raw
if isinstance(previous_energy_max_raw, (int, float))
else None
)
previous_intensity_max_raw = (
extrema_state.get(f"{key_prefix}_z_max")
if isinstance(extrema_state, dict)
else None
)
previous_intensity_max = (
previous_intensity_max_raw
if isinstance(previous_intensity_max_raw, (int, float))
else None
)
# Monotonic merge: prefer the larger of previous and new candidate.
merged_energy_max = (
max(float(previous_energy_max), candidate_energy_max)
if previous_energy_max is not None
else candidate_energy_max
)
merged_intensity_max = (
max(float(previous_intensity_max), candidate_intensity_max)
if previous_intensity_max is not None
else candidate_intensity_max
)
# Apply final normalisation: ceil, energy cap at 4000.
try:
merged_energy_max = int(
min(4000, math.ceil(merged_energy_max))
)
except Exception as energy_max_ceil_exception:
pass
try:
merged_intensity_max = float(
math.ceil(merged_intensity_max)
)
except Exception as intensity_max_ceil_exception:
pass
# Optional minima (currently fixed 0 unless compute_mins True)
if compute_mins and positive_value_blocks:
try:
aggregated_positive_values = np.concatenate(
positive_value_blocks
)
finite_positive_values = aggregated_positive_values[
np.isfinite(aggregated_positive_values)
& (aggregated_positive_values > 0)
]
if finite_positive_values.size:
intensity_min_store = float(
np.nanpercentile(finite_positive_values, 1)
)
else:
intensity_min_store = 0.0
except Exception as intensity_min_percentile_exception:
intensity_min_store = 0.0
energy_min_store = 0
else:
energy_min_store = 0
intensity_min_store = 0
# Persist updated extrema & progress bookkeeping.
extrema_state[f"{key_prefix}_y_min"] = energy_min_store
extrema_state[f"{key_prefix}_y_max"] = merged_energy_max
extrema_state[f"{key_prefix}_z_min"] = intensity_min_store
extrema_state[f"{key_prefix}_z_max"] = merged_intensity_max
extrema_state[progress_key] = {
"processed_index": file_progress_index_by_instrument[
instrument_name
],
"total": total_files_per_instrument[instrument_name],
"complete": file_progress_index_by_instrument[
instrument_name
]
+ 1
>= total_files_per_instrument[instrument_name],
}
# Remove instrument-specific last_orbit keys if present
for inst in instrument_order:
inst_key = f"{inst}_{y_scale}_{z_scale}_last_orbit"
if inst_key in extrema_state:
extrema_state.pop(inst_key)
extrema_state[last_orbit_global_key] = orbit_number
# Progress bar suffix: latest instrument + snapshot of maxima.
try:
extrema_progress_bar.set_postfix(
inst=instrument_name,
orbit=orbit_number,
refresh=False,
)
except Exception as maxima_progress_postfix_exception:
pass
except Exception as maxima_update_exception:
info_logger(
f"[EXTREMA] Update failure inst={instrument_name} orbit={orbit_number}",
maxima_update_exception,
level="message",
)
# Section: Batched flush decision
orbits_since_last_flush += 1
if orbits_since_last_flush >= flush_batch_size:
try:
with open(extrema_json_path, "w") as file_out:
json.dump(extrema_state, file_out, indent=2)
orbits_since_last_flush = 0
except Exception as orbit_flush_exception:
info_logger(
f"[EXTREMA] Batched flush failure after orbit {orbit_number}",
orbit_flush_exception,
level="message",
)
# Section: Final flush if pending updates remain
if orbits_since_last_flush > 0:
try:
# Move the global last_orbit key to the top of the JSON
last_orbit_global_key = f"{y_scale}_{z_scale}_last_orbit"
if last_orbit_global_key in extrema_state:
ordered = {
last_orbit_global_key: extrema_state[last_orbit_global_key]
}
for k, v in extrema_state.items():
if k != last_orbit_global_key:
ordered[k] = v
with open(extrema_json_path, "w") as file_out:
json.dump(ordered, file_out, indent=2)
else:
with open(extrema_json_path, "w") as file_out:
json.dump(extrema_state, file_out, indent=2)
except Exception as final_flush_exception:
info_logger(
"[EXTREMA] Final batched flush failure",
final_flush_exception,
level="message",
)
finally:
try:
extrema_progress_bar.close()
except Exception as progress_bar_close_exception:
pass
# Always return with the global last_orbit key first
last_orbit_global_key = f"{y_scale}_{z_scale}_last_orbit"
if last_orbit_global_key in extrema_state:
ordered = {last_orbit_global_key: extrema_state[last_orbit_global_key]}
for k, v in extrema_state.items():
if k != last_orbit_global_key:
ordered[k] = v
return ordered
return extrema_state
def _signal_handler(signum, frame): # frame unused
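"""Handle SIGINT/SIGTERM: the first signal requests a graceful shutdown and
terminates child processes; a second signal forces exit with status 130."""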
# On first signal, request shutdown and immediately terminate children
if not shutdown_requested["flag"]:
info_logger(
f"[INTERRUPT] Signal {signum} received. Requesting shutdown...",
level="message",
)
shutdown_requested["flag"] = True
try:
_terminate_all_child_processes()
finally:
# Interrupt the main loop immediately
raise KeyboardInterrupt
else:
# Second Ctrl+C -> hard exit
info_logger(
"[INTERRUPT] Second interrupt – forcing immediate exit.",
level="message",
)
try:
_terminate_all_child_processes()
finally:
raise SystemExit(130)
# Register handlers (idempotent inside a single main-process invocation),
# capturing the previous handlers so they can be restored afterwards.
previous_sigint = None
previous_sigterm = None
try:
previous_sigint = signal.signal(signal.SIGINT, _signal_handler)
previous_sigterm = signal.signal(signal.SIGTERM, _signal_handler)
except Exception as signal_registration_exception:
info_logger(
"[WARN] Could not register signal handlers",
signal_registration_exception,
level="message",
)
# Load the DataFrame of filtered orbits (for vertical line marking, etc.)
filtered_orbits_dataframe = load_filtered_orbits()
# Configure buffered logging batch size (reuse flush_batch_size if not explicitly set)
try:
configure_info_logger_batch(log_flush_batch_size or flush_batch_size)
except Exception as logger_batch_config_exception:
pass
# Compute or load global extrema for this y/z combo before scheduling futures.
# 'max_processing_percentile' governs the intensity (Z) percentile; energy (Y)
# coverage remains fixed at 99%.
global_extrema = compute_global_extrema(
directory_path,
y_scale,
z_scale,
instrument_order,
compute_mins=False,
max_percentile=max_processing_percentile,
log_floor_cutoff=0.1,
log_floor_value=-1.0,
flush_batch_size=flush_batch_size,
)
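# global_extrema carries the merged Y/Z limits under keys like
# "{key_prefix}_y_max" / "{key_prefix}_z_max" (e.g. "ees_log_log_y_max",
# a hypothetical example; see compute_global_extrema), so every worker
# plots against the same shared axis ranges.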
# Gather all CDF files, skipping any with '_orb_' in the filename (these are not data files)
cdf_file_paths = [
str(path)
for path in Path(directory_path).rglob("*.[cC][dD][fF]")
if "_orb_" not in str(path).lower()
]
# Helper function: extract orbit number and instrument type from a CDF file path
def extract_orbit_and_instrument(cdf_path: str):
"""Parse filename to ``(orbit_number, instrument_type, cdf_path)`` or ``None``.
Parameters
----------
cdf_path : str
Path to a CDF file.
Returns
-------
tuple of (int, str, str) or None
Parsed ``(orbit_number, instrument_type, cdf_path)`` or ``None``.
Notes
-----
Returns ``None`` when the filename doesn't match the expected pattern,
the orbit number cannot be parsed, or the path corresponds to a non-data
CDF (e.g., when ``instrument_type`` is ``None`` or ``'orb'``).
"""
filename = os.path.basename(cdf_path)
parts = filename.split("_")
if len(parts) < 5:
return None
try:
orbit_number = int(parts[-2])
except Exception as invalid_orbit_number_exception:
info_logger(
f"[ERROR] Invalid orbit number in filename: {filename}",
invalid_orbit_number_exception,
level="message",
)
return None
instrument_type = get_cdf_file_type(cdf_path)
if instrument_type is None or instrument_type == "orb":
return None
return (orbit_number, instrument_type, cdf_path)
# Build a mapping: orbit_number -> {instrument_type: cdf_path, ...}
orbit_to_instruments = defaultdict(dict)
for cdf_path in cdf_file_paths:
result = extract_orbit_and_instrument(cdf_path)
if result is not None:
orbit_number, instrument_type, cdf_path = result
orbit_to_instruments[orbit_number][instrument_type] = cdf_path
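# Resulting structure (paths hypothetical):
# {1234: {"ees": ".../fa_esa_l2_ees_..._1234_v02.cdf", "ies": "..."}}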
# Sort orbits by orbit number (ascending)
sorted_orbits = sorted(orbit_to_instruments.items(), key=lambda x: x[0])
total_orbits = len(sorted_orbits)
# Progress and error tracking (per y/z scale combo)
progress_key = f"{y_scale}_{z_scale}_last_orbit"
error_key = f"{y_scale}_{z_scale}_error_plotting"
progress_data = {}
last_completed_orbit = None
error_orbits = set()
if (progress_json_path is not None) and (not ignore_progress_json):
try:
with open(progress_json_path, "r") as f:
progress_data = json.load(f)
last_completed_orbit = progress_data.get(progress_key, None)
error_orbits = set(progress_data.get(error_key, []))
except Exception as progress_json_initial_read_exception:
info_logger(
f"[ERROR] Failed to load progress JSON from {progress_json_path}. Starting fresh.",
progress_json_initial_read_exception,
level="error",
)
progress_data = {}
last_completed_orbit = None
error_orbits = set()
# Determine where to start (resume from last completed orbit, if any)
start_idx = 0
if last_completed_orbit is not None:
for i, (orbit, _) in enumerate(sorted_orbits):
if orbit > last_completed_orbit:
start_idx = i
break
else:
start_idx = total_orbits # All done
info_logger(
f"[RESUME] Skipping {start_idx} orbits (up to and including orbit {last_completed_orbit}) based on progress file. {len(error_orbits)} error orbits will also be skipped.",
level="message",
)
else:
info_logger(
f"[RESUME] No previous progress found. Starting from the first orbit. {len(error_orbits)} error orbits will be skipped if present.",
level="message",
)
# Batch processing state
# Use LOGFILE_DATETIME to improve ETA if possible
batch_start_time = _time.time()
try:
# FAST_LOGFILE_DATETIME_STRING (set at import time) holds the run's start
# timestamp; reuse it so the ETA reflects the true batch start. The string
# is treated as UTC when converted to an epoch timestamp.
if "FAST_LOGFILE_DATETIME_STRING" in globals():
dt = datetime.strptime(FAST_LOGFILE_DATETIME_STRING, "%Y-%m-%d_%H-%M-%S")
batch_start_time = dt.replace(tzinfo=timezone.utc).timestamp()
except Exception as logfile_datetime_parse_exception:
# Non-fatal; fall back to the current time
info_logger(
"[WARN] FAST_LOGFILE_DATETIME_STRING parsing failed, using current time for batch start.",
logfile_datetime_parse_exception,
level="message",
)
orbit_times = deque(maxlen=20)  # Last 20 orbit durations for ETA (currently unused)
div_zero_orbits = set()  # Orbits with divide-by-zero warnings (currently unused)
# Progress bar selection
use_tqdm_bar = bool(use_tqdm)  # bool(None) is False, so no explicit None check is needed
# Prepare the main orbit argument list
orbit_args_list = []
for orbit_number, instrument_files in sorted_orbits[start_idx:]:
if orbit_number in error_orbits:
continue
orbit_args = (
orbit_number,
instrument_files,
filtered_orbits_dataframe,
zoom_duration_minutes,
y_scale,
z_scale,
instrument_order,
colormap,
output_base,
orbit_timeout_seconds,
instrument_timeout_seconds,
global_extrema,
)
orbit_args_list.append(orbit_args)
results = []
# Progress-write batching state: completed-orbit updates are counted and only
# flushed to disk every `flush_batch_size` updates (see save_progress_json).
if flush_batch_size < 1:
flush_batch_size = 1
_batched_progress_dirty = {"count": 0}
def save_progress_json(progress_data: Dict[str, Any], force: bool = False) -> None:
"""Write ``progress_data`` to disk, logging on failure.
Parameters
----------
progress_data : dict
Progress dictionary to persist.
force : bool, default False
When True, bypass batching and write immediately; otherwise the write is
deferred until ``flush_batch_size`` updates have accumulated.
Returns
-------
None
"""
if progress_json_path is None:
return
if not force:
_batched_progress_dirty["count"] += 1
if _batched_progress_dirty["count"] < flush_batch_size:
return
# Flush now
_batched_progress_dirty["count"] = 0
try:
with open(progress_json_path, "w") as f:
json.dump(progress_data, f, indent=2)
except Exception as progress_json_write_exception:
info_logger(
"[FAIL] Could not write progress JSON",
progress_json_write_exception,
level="error",
)
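# Usage: save_progress_json(pdisk) defers to the write batch;
# save_progress_json(pdisk, force=True) flushes immediately, as the
# finally block below does on shutdown.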
# Multiprocessing batch execution with correct tqdm update
executor = None
try:
executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
future_to_orbit = {}
for args in orbit_args_list:
if shutdown_requested["flag"]:
break
future = executor.submit(FAST_process_single_orbit, *args)
future_to_orbit[future] = args[0]
futures = set(future_to_orbit.keys())
def _handle_completed_future(fut, orbit_number):
"""Consume a completed future and update progress JSON.
Parameters
----------
fut : concurrent.futures.Future
The completed future.
orbit_number : int
Orbit number corresponding to this future.
Returns
-------
None
"""
try:
result = fut.result()
results.append(result)
status_value = result.get("status")
if verbose and use_tqdm_bar:
tqdm.write(
f"[BATCH] Completed orbit {orbit_number}: {status_value}"
)
if progress_json_path is not None:
try:
with open(progress_json_path, "r") as f:
pdisk = json.load(f)
except Exception as progress_json_read_exception:
info_logger(
"[WARN] Could not read progress JSON (using empty).",
progress_json_read_exception,
level="message",
)
pdisk = {}
# Ensure base keys
pdisk[progress_key] = orbit_number
if error_key not in pdisk:
pdisk[error_key] = []
# Timeout keys
orbit_timeout_key = f"orbit_{y_scale}_{z_scale}_timed_out"
if orbit_timeout_key not in pdisk:
pdisk[orbit_timeout_key] = []
# If instrument timeout, store under instrument specific list
if status_value == "error":
# Record generic error orbit list
pdisk[error_key] = sorted(
list(set(pdisk[error_key]) | {orbit_number})
)
# Capture detailed error reasons (if provided)
error_list = result.get("errors") or []
for err_msg in error_list:
# Derive a short reason token
reason_token = "generic"
lowered = err_msg.lower()
if "divide" in lowered and "zero" in lowered:
reason_token = "divide-by-zero"
elif "invalid" in lowered and "cdf" in lowered:
reason_token = "invalid-cdf"
elif "timeout" in lowered:
reason_token = "timeout"
elif "plotting" in lowered:
reason_token = "plotting"
# Determine instrument if present in message (ees/eeb/ies/ieb)
instrument_match = None
for cand in ("ees", "eeb", "ies", "ieb"):
if cand in lowered:
instrument_match = cand
break
inst_token = instrument_match or "unknown"
detailed_key = (
f"{inst_token}_{y_scale}_{z_scale}_error-{reason_token}"
)
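# e.g. an error message containing "ees" and "timeout" under y_scale="log",
# z_scale="log" produces the key "ees_log_log_error-timeout".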
if detailed_key not in pdisk:
pdisk[detailed_key] = []
if orbit_number not in pdisk[detailed_key]:
pdisk[detailed_key].append(orbit_number)
# Also maintain a reason-only aggregate (no instrument)
agg_reason_key = f"{y_scale}_{z_scale}_error-{reason_token}"
if agg_reason_key not in pdisk:
pdisk[agg_reason_key] = []
if orbit_number not in pdisk[agg_reason_key]:
pdisk[agg_reason_key].append(orbit_number)
elif status_value == "timeout":
timeout_type = result.get("timeout_type")
timeout_instrument = result.get("timeout_instrument")
if timeout_type == "orbit":
pdisk[orbit_timeout_key] = sorted(
list(set(pdisk[orbit_timeout_key]) | {orbit_number})
)
info_logger(
f"[TIMEOUT-LOG] Recorded orbit timeout for orbit {orbit_number} under key {orbit_timeout_key}",
level="message",
)
elif timeout_type == "instrument":
if timeout_instrument is None:
timeout_instrument = "unknown_instrument"
instrument_timeout_key = (
f"{timeout_instrument}_{y_scale}_{z_scale}_timed_out"
)
if instrument_timeout_key not in pdisk:
pdisk[instrument_timeout_key] = []
pdisk[instrument_timeout_key] = sorted(
list(
set(pdisk[instrument_timeout_key]) | {orbit_number}
)
)
info_logger(
f"[TIMEOUT-LOG] Recorded instrument timeout for orbit {orbit_number}, instrument {timeout_instrument} under key {instrument_timeout_key}",
level="message",
)
save_progress_json(pdisk)
except Exception as orbit_future_exception:
info_logger(
f"[BATCH] Orbit {orbit_number} generated an exception",
orbit_future_exception,
level="error",
)
results.append(
{
"orbit": orbit_number,
"status": "error",
"errors": [str(orbit_future_exception)],
}
)
if progress_json_path is not None:
try:
with open(progress_json_path, "r") as f:
pdisk = json.load(f)
except Exception as progress_json_read_exception2:
info_logger(
"[WARN] Could not read progress JSON after exception (using empty).",
progress_json_read_exception2,
level="message",
)
pdisk = {}
pdisk[progress_key] = orbit_number
if error_key not in pdisk:
pdisk[error_key] = []
pdisk[error_key] = sorted(
list(set(pdisk[error_key]) | {orbit_number})
)
# Record exception reason
exception_str = str(orbit_future_exception)
lowered = exception_str.lower()
reason_token = "generic"
if "divide" in lowered and "zero" in lowered:
reason_token = "divide-by-zero"
elif "invalid" in lowered and "cdf" in lowered:
reason_token = "invalid-cdf"
elif "timeout" in lowered:
reason_token = "timeout"
elif "plot" in lowered:
reason_token = "plotting"
detailed_key = f"unknown_{y_scale}_{z_scale}_error-{reason_token}"
if detailed_key not in pdisk:
pdisk[detailed_key] = []
if orbit_number not in pdisk[detailed_key]:
pdisk[detailed_key].append(orbit_number)
agg_reason_key = f"{y_scale}_{z_scale}_error-{reason_token}"
if agg_reason_key not in pdisk:
pdisk[agg_reason_key] = []
if orbit_number not in pdisk[agg_reason_key]:
pdisk[agg_reason_key].append(orbit_number)
save_progress_json(pdisk)
if shutdown_requested["flag"]:
info_logger(
"[INTERRUPT] Shutdown requested before any futures executed.",
level="message",
)
raise KeyboardInterrupt
# Non-blocking wait loop with polling so Ctrl+C is handled immediately
total_count = len(futures)
processed_count = 0
progress_bar = None
if use_tqdm_bar:
if start_idx > 0:
info_logger(
f"[RESUME] Resuming progress bar at orbit {start_idx+1} of {total_orbits} for y_scale={y_scale}, z_scale={z_scale}.",
level="message",
)
progress_bar = tqdm(
total=total_count,
initial=0,
desc=f"Orbits - {y_scale} / {z_scale}",
unit="orbit",
leave=False,
)
try:
while futures:
if shutdown_requested["flag"]:
break
done, _ = concurrent.futures.wait(
futures, timeout=0.2, return_when=concurrent.futures.FIRST_COMPLETED
)
for fut in done:
futures.discard(fut)
orbit_number = future_to_orbit.get(fut)
_handle_completed_future(fut, orbit_number)
processed_count += 1
if progress_bar is not None:
# Update the progress bar description/postfix with the current orbit number
progress_bar.set_postfix(orbit=orbit_number)
progress_bar.update(1)
finally:
if progress_bar is not None:
progress_bar.close()
if shutdown_requested["flag"]:
info_logger(
"[INTERRUPT] Cancelling remaining futures and terminating workers...",
level="message",
)
for fut in list(futures):
try:
fut.cancel()
except Exception as future_cancel_exception:
pass
# Forcefully terminate worker processes
try:
executor.shutdown(wait=False, cancel_futures=True)
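# `_processes` is a private ProcessPoolExecutor attribute with no stable
# API; the hasattr guard keeps this cleanup best-effort across Python versions.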
if hasattr(executor, "_processes"):
for proc in executor._processes.values(): # type: ignore[attr-defined]
try:
proc.terminate()
except Exception as process_terminate_exception_inner:
pass
# brief grace period
_time.sleep(0.05)
for proc in executor._processes.values(): # type: ignore[attr-defined]
try:
if proc.is_alive():
proc.kill()
except Exception as process_kill_exception_inner:
pass
except Exception as executor_shutdown_exception_inner:
pass
raise KeyboardInterrupt
except KeyboardInterrupt as keyboard_interrupt_exception:
info_logger(
f"[INTERRUPT] KeyboardInterrupt caught. Terminating worker processes... Exception: {keyboard_interrupt_exception}",
level="message",
)
if executor is not None:
try:
executor.shutdown(wait=False, cancel_futures=True)
if hasattr(executor, "_processes"):
for proc in executor._processes.values(): # type: ignore[attr-defined]
try:
proc.terminate()
except Exception as process_terminate_exception:
# Suppress termination errors during shutdown
pass
# Force kill if still alive after short grace
_time.sleep(0.05)
for proc in executor._processes.values(): # type: ignore[attr-defined]
try:
if proc.is_alive():
proc.kill()
except Exception as retry_cleanup_exception:
pass
except Exception as executor_shutdown_exception:
# Suppress shutdown errors during intentional interrupt
pass
# Propagate interrupt so caller (main) stops and doesn't start next scale combo
raise
finally:
if executor is not None:
try:
executor.shutdown(wait=False, cancel_futures=True)
except Exception as final_executor_shutdown_exception:
# Suppress final shutdown errors; we're exiting anyway
pass
# Force flush any remaining batched progress updates
try:
if progress_json_path is not None and os.path.exists(progress_json_path):
with open(progress_json_path, "r") as f:
final_progress_data = json.load(f)
else:
final_progress_data = (
progress_data if isinstance(progress_data, dict) else {}
)
save_progress_json(final_progress_data, force=True)
except Exception as final_progress_flush_exception:
pass
# Flush any remaining buffered logs
try:
flush_info_logger_buffer(force=True)
except Exception as final_log_flush_exception:
pass
# Retry logic for timeouts (run once, sequentially or with a small pool)
if retry_timeouts and not shutdown_requested["flag"]:
timeout_orbits = [r["orbit"] for r in results if r.get("status") == "timeout"]
if timeout_orbits:
info_logger(
f"[RETRY] Retrying {len(timeout_orbits)} timed-out orbits once (thresholds orbit={orbit_timeout_seconds}s, instrument={instrument_timeout_seconds}s).",
level="message",
)
# Rebuild args only for timeout orbits (same tuple shape as the main pass,
# including global_extrema)
retry_args = [
(
o,
orbit_to_instruments[o],
filtered_orbits_dataframe,
zoom_duration_minutes,
y_scale,
z_scale,
instrument_order,
colormap,
output_base,
orbit_timeout_seconds,
instrument_timeout_seconds,
global_extrema,
)
for o in timeout_orbits
if o in orbit_to_instruments
]
retry_results = []
# Use a smaller pool (min of current max_workers and 2) to reduce overhead
try:
with concurrent.futures.ProcessPoolExecutor(
max_workers=min(max_workers, 2)
) as retry_executor:
retry_future_map = {
retry_executor.submit(FAST_process_single_orbit, *ra): ra[0]
for ra in retry_args
}
for rfut in concurrent.futures.as_completed(retry_future_map):
r_orbit = retry_future_map[rfut]
try:
r_result = rfut.result()
retry_results.append(r_result)
info_logger(
f"[RETRY] Completed orbit {r_orbit}: {r_result.get('status')}",
level="message",
)
# Update JSON: remove from timeout lists if success
if (
progress_json_path is not None
and r_result.get("status") == "ok"
):
try:
with open(progress_json_path, "r") as f:
pdisk_retry = json.load(f)
except Exception as retry_json_read_exception:
info_logger(
"[WARN] Could not read progress JSON for retry cleanup",
retry_json_read_exception,
level="message",
)
pdisk_retry = {}
# Remove orbit from any timeout key lists for this scale combo
keys_to_check = [
k
for k in pdisk_retry.keys()
if k.endswith(f"_{y_scale}_{z_scale}_timed_out")
]
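# e.g. for y_scale="log", z_scale="log" this matches "ees_log_log_timed_out"
# and "orbit_log_log_timed_out".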
modified = False
for tk in keys_to_check:
if (
isinstance(pdisk_retry.get(tk), list)
and r_orbit in pdisk_retry[tk]
):
pdisk_retry[tk] = [
x for x in pdisk_retry[tk] if x != r_orbit
]
modified = True
if modified:
try:
with open(progress_json_path, "w") as f:
json.dump(pdisk_retry, f, indent=2)
info_logger(
f"[RETRY] Cleaned orbit {r_orbit} from timeout lists after successful retry.",
level="message",
)
except Exception as retry_json_write_exception:
info_logger(
"[WARN] Could not write cleaned progress JSON",
retry_json_write_exception,
level="message",
)
except Exception as retry_orbit_exception:
info_logger(
f"[RETRY] Orbit {r_orbit} retry failed with exception",
retry_orbit_exception,
level="error",
)
retry_results.append(
{
"orbit": r_orbit,
"status": "error",
"errors": [str(retry_orbit_exception)],
}
)
except Exception as retry_pool_exception:
info_logger(
"[RETRY] Failed to execute retry pool",
retry_pool_exception,
level="message",
)
# Merge retry results: replace original entries for those orbits
results_map = {r["orbit"]: r for r in results}
for rr in retry_results:
results_map[rr["orbit"]] = rr
results = list(results_map.values())
# Restore original signal handlers (best-effort) to avoid lingering changes
try:
if previous_sigint is not None:
signal.signal(signal.SIGINT, previous_sigint)
if previous_sigterm is not None:
signal.signal(signal.SIGTERM, previous_sigterm)
except Exception as fast_handler_restore_exception:
info_logger(
"[WARN] Could not restore original FAST signal handlers",
fast_handler_restore_exception,
level="message",
)
return results
def main() -> None:
"""Run the FAST batch plotter for all y/z scale combinations sequentially.
Invokes ``FAST_plot_spectrograms_directory`` four times with combinations
of linear/log y and z, using colormaps tailored for each.
Returns
-------
None
Notes
-----
An interrupt during any run stops the sequence immediately without
starting subsequent combinations.
"""
# Rely on the batch runner's own signal handlers; avoid calling sys.exit here
# so an interrupt propagates to the __main__ guard for a clean shutdown.
for scale_combo in [
("linear", "linear", DEFAULT_COLORMAP_LINEAR_Y_LINEAR_Z),
("linear", "log", DEFAULT_COLORMAP_LINEAR_Y_LOG_Z),
("log", "linear", DEFAULT_COLORMAP_LOG_Y_LINEAR_Z),
("log", "log", DEFAULT_COLORMAP_LOG_Y_LOG_Z),
]:
y_scale, z_scale, colormap = scale_combo
FAST_plot_spectrograms_directory(
FAST_CDF_DATA_FOLDER_PATH,
verbose=False,
y_scale=y_scale,
z_scale=z_scale,
use_tqdm=True,
colormap=colormap,
max_processing_percentile=90,
)
# Section: Script entry point
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
info_logger("[INTERRUPT] Batch plotting aborted by user.", level="message")
print("\n[INTERRUPT] Aborted by user.")
sys.exit(130)