Source code for midtools.dataset

#!/usr/bin/env python

import warnings

warnings.filterwarnings("ignore", category=FutureWarning)

import os
import yaml
import re
import sys
import time
from datetime import datetime
import numpy as np
import h5py as h5
import argparse
from pathlib import Path
from shutil import copyfile

# XFEL packages
import extra_data
from extra_data import RunDirectory, open_run
from extra_geom import AGIPD_1MGeometry

# Dask and xarray
import dask
import dask_jobqueue
import dask.array as da
from dask.distributed import Client, progress, LocalCluster
from dask_jobqueue import SLURMCluster
from dask.diagnostics import ProgressBar
from dask.distributed import TimeoutError
from distributed.comm.core import CommClosedError
from functools import wraps

import Xana
from Xana import Setup

from .azimuthal_integration import azimuthal_integration
from .correlations import correlate, get_q_phi_pixels
from .average import average
from .statistics import statistics
from .calibration import Calibrator, _create_mask_from_dark, _create_mask_from_flatfield
from .masking import mask_radial, mask_asics

from functools import reduce

from .plotting import get_datasets
from .plotting import Interpreter

import pdb


def _exception_handler(max_attempts=3):
    def restart(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while attempt <= max_attempts:
                try:
                    return func(*args, **kwargs)
                except (IOError, OSError, TimeoutError, CommClosedError) as e:
                    print(f"Restart due to {str(e)}. Attempt {attempt}")
                    attempt += 1

        return wrapper

    return restart


def print_now():
    now = datetime.now()
    print(now.strftime("%Y-%m-%d: %H-%M-%S"), flush=True)


[docs]class Dataset: METHODS = [ "META", "DIAGNOSTICS", "FRAMES", "SAXS", "XPCS", "STATISTICS", "DARK", "FLATFIELD", ]
[docs] def __init__( self, run_number, setupfile, proposal=None, analysis=None, datdir=None, first_train=0, last_train=1e6, pulses_per_train=500, dark_run_number=None, train_file="train-file.npy", pulse_file="pulse-file.npy", first_cell=2, train_step=1, pulse_step=1, is_dark=False, localcluster=False, is_flatfield=False, flatfield_run_number=None, out_dir="./", file_identifier=None, trainId_offset=0, **kwargs, ): """Dataset object to analyze MID datasets on Maxwell. Args: setupfile (str): Setupfile (.yml) that contains information on the setup parameters. analysis (str, optional): Flags of the analysis to perform. Defaults to '00'. `analysis` is a string of ones and zeros where a one means to perform the analysis and a zero means to omit the analysis. The analysis types are: +-------+-----------------------------+ | flags | analysis | +========+============================+ | 1000 | average frames | +--------+----------------------------+ | 0100 | SAXS azimuthal integration | +--------+----------------------------+ | 0010 | XPCS correlation functions | +--------+----------------------------+ | 0001 | compute statistics | +--------+----------------------------+ last_train (int, optional): Index of last train to analyze. If not provided, all trains are processed. run_number (int, optional): Specify run number. Defaults to None. If not defined, the datdir in the setupfile has to contain the .h5 files. dark_run_number (int, optional): Dark run number for calibration. pulses_per_train (int, optional): Specify the number of pulses per train. If not provided, take all stored memory cells. train_step (int, optional): Stepsize for slicing the train axis. pulse_step (int, optional): Stepsize for slicing the pulse axis. is_dark (bool, optional): If True switch to dark routines, i.e., average dark for dark subtraction, calculate mask from darks. is_flatfield (bool, optional): If True use flatfield algorithms. flatfield_run_number (int, optional): Run number of the processed flatfield for calibration. Note: A setupfile might look like this:: # setup.yml file # Data datdir: /path/to/data/r0522 # Maskfile mask: /path/to/mask/agipd_mask_tmp.npy # Beamline photon_energy: 9 # keV sample_detector: 8 # m pixel_size: 200 # um quadrant_positions: dx: -18 dy: -15 q1: [-500, 650] q2: [-550, -30] q3: [ 570, -216] q4: [ 620, 500] # XPCS xpcs_opt: q_range: q_first: .1 # smallest q in nm-1 q_last: 1. # largest q in nm-1 steps: 10 # how many q-bins """ self.out_dir = out_dir self.pulse_file = os.path.join(self.out_dir, pulse_file) self.train_file = os.path.join(self.out_dir, train_file) #: str: Path to the setupfile. self.setupfile = setupfile setup_pars = self._read_setup(setupfile) #: bool: True if current run is a dark run. self.is_dark = is_dark #: bool: True if current run is a flatfield run. self.is_flatfield = is_flatfield self.dark_run_number = dark_run_number self.flatfield_run_number = flatfield_run_number options = ["slurm", "calib"] options.extend(list(map(str.lower, self.METHODS[1:]))) for option in options: option_name = "_".join([option, "opt"]) attr_name = "_" + option_name attr_value = setup_pars.pop(option_name, {}) setattr(self, attr_name, attr_value) #: bool: True if the SLURMCluster is running self._cluster_running = False self._localcluster = localcluster self.analysis = ["meta", "diagnostics"] # reduce computation to _compute_dark or _compute_flatfield if is_dark: self.analysis.extend(["dark"]) elif is_flatfield: self.analysis.extend(["flatfield"]) else: if analysis is not None: for flag, method in zip(analysis, self.METHODS[2:]): if int(flag): self.analysis.append(method.lower()) self.run_number = run_number self.proposal = proposal self.datdir = setup_pars.pop("datdir", datdir) #: DataCollection: e.g., returned from extra_data.RunDirectory self.run = RunDirectory(self.datdir) self.mask = setup_pars.pop("mask", None) self.trainId_offset = trainId_offset self.pulses_per_train = pulses_per_train self.pulse_step = pulse_step self.train_step = train_step self.first_train_idx = first_train self.last_train_idx = last_train self.first_cell = first_cell # placeholder set when computing META self.cell_ids = None self.train_ids = None self.selected_train_ids = None self.ntrains = None self.train_indices = None # Experimental Setup #: tuple: Position of the direct beam in pixels self.center = setup_pars.pop("beamcenter", None) self.agipd_geom = setup_pars.pop("geometry", False) self.adu_per_photon = setup_pars.pop("adu_per_photon", 66) # save the other entries as attributes self.__dict__.update(setup_pars) del setup_pars #: Xana setup instance self.setup = None #: np.ndarray: qmap (16, 512, 128) self.qmap = None self._get_setup() #: dict: Structure of the HDF5 file self.h5_structure = self._make_h5structure() #: str: HDF5 file name. self.file_name = None self.file_identifier = file_identifier #: Calibrator: Calibrator instance for data pre-processing set by _compute_meta self._calibrator = None # placeholder attributes self._cluster = None # the slurm cluster self._client = None # the slurm cluster client
def _make_h5structure(self): """Create the HDF5 data structure.""" h5_structure = { "META": { "/identifiers/pulses_per_train": [True], "/identifiers/pulse_ids": [True], "/identifiers/train_ids": [None], "/identifiers/train_indices": [None], "/identifiers/complete_trains": [True], "/identifiers/all_trains": [True], }, "DIAGNOSTICS": { "/identifiers/filtered_trains": [None], "/pulse_resolved/xgm/energy": [None], "/pulse_resolved/xgm/pointing_x": [None], "/pulse_resolved/xgm/pointing_y": [None], "/train_resolved/sample_position/y": [None], "/train_resolved/sample_position/z": [None], "/train_resolved/linkam-stage/temperature": [None], }, "SAXS": { "/pulse_resolved/azimuthal_intensity/q": [True], "/pulse_resolved/azimuthal_intensity/phi": [None], "/pulse_resolved/azimuthal_intensity/I": [None], }, "XPCS": { "/train_resolved/correlation/q": [True], "/train_resolved/correlation/t": [True], "/train_resolved/correlation/stride": [None], # "/train_resolved/correlation/g2": [None], "/train_resolved/correlation/ttc": [None], }, "FRAMES": { "/average/intensity": [None], "/average/variance": [None], "/average/image_2d": [None], "/utility/mask": [None], "/average/train_ids": [None], }, "STATISTICS": { "/pulse_resolved/statistics/centers": [True], "/pulse_resolved/statistics/counts": [None], }, "DARK": { "/dark/intensity": [None], "/dark/variance": [None], "/dark/mask": [None], "/dark/median": [None], }, "FLATFIELD": { "/flatfield/intensity": [None], "/flatfield/variance": [None], "/flatfield/mask": [None], "/flatfield/median": [None], }, } return h5_structure def __str__(self): return "MID dataset." def __repr(self): return f"Dataset({self.setupfile})" @property def datdir(self): """str: Data directory.""" return self.__datdir @datdir.setter def datdir(self, path): if path is None: return elif isinstance(path, dict): basedir = "/gpfs/exfel/exp/MID/" if ( len( list( filter( lambda x: x in path.keys(), ["cycle", "proposal", "datatype", "run"], ) ) ) == 4 ): path = ( basedir + f"/{path['cycle']}" + f"/p{path['proposal']:06d}" + f"/{path['datatype']}" + f"/r{path['run']:04d}" ) elif isinstance(path, str): if bool(re.search("r\d{4}", path)): self.run_number = path elif isinstance(self.run_number, int): path += f"/r{self.run_number:04d}" else: raise ValueError( "Could not determine run number. Specify run " "number with --run argument or pass data directory " "with run folder (e.g., r0123.)" ) elif isinstance(self.run_number, np.integer) and isinstance(self.proposal, np.integer): self.run = open_run(self.run_number, proposal, data='raw') self.__datdir = str(Path(self.run.files[0].filename).parent) elif isinstance(self.run, extra_data.reader.DataCollection): self.__datdir = str(Path(self.run.files[0].filename).parent) else: raise ValueError(f"Invalid data directory: {path}") # if self.is_dark or self.is_flatfield or self.dark_run_number is not None: # path = path.replace("proc", "raw") # print("Switched to raw data format") path = os.path.abspath(path) if os.path.exists(path): self.__datdir = path else: raise FileNotFoundError(f"Data directory {path} das not exist.") @property def agipd_geom(self): """AGIPD_1MGeometry: AGIPD geometry obtained from extra_data.""" return self.__agipd_geom @agipd_geom.setter def agipd_geom(self, geom): if isinstance(geom, list): geom = AGIPD_1MGeometry.from_quad_positions(geom) elif isinstance(geom, str): geom = AGIPD_1MGeometry.from_crystfel_geom(geom) else: raise TypeError(f"Cannot create geometry from {type(geom)}.") self.__agipd_geom = geom dummy_img = np.zeros((16, 512, 128), "int8") if self.center is None: self.center = geom.position_modules_fast(dummy_img)[1][::-1] else: self.center = tuple(map(float, self.center.split(','))) del dummy_img @property def run_number(self): """int: Number of the run.""" return self.__run_number @run_number.setter def run_number(self, number): if isinstance(number, str): number = re.findall("(?<=r)\d{4}", number) if len(number): number = int(number[0]) else: number = None elif isinstance(number, int): pass elif number is None: pass elif isinstance(number, extra_data.reader.DataCollection): self.run = number self.datdir = str(Path(self.run.files[0].filename).parent) return # run_number is updated in datdir setter else: raise TypeError(f"Invalid run number type {type(number)}.") self.__run_number = number @staticmethod def _read_setup(setupfile): """read setup parameters from config file Args: setupfile (str): Path to the setup file (YAML). Returns: dict: containing the setup parameters read from the setupfile. """ with open(setupfile) as file: setup_pars = yaml.load(file, Loader=yaml.FullLoader) # include dx and dy in quadrant position if "quadrant_positions" in setup_pars.keys(): quad_dict = setup_pars["quadrant_positions"] dx, dy = [quad_dict[x] for x in ["dx", "dy"]] quad_pos = [ (dx + quad_dict[f"q{x}"][0], dy + quad_dict[f"q{x}"][1]) for x in range(1, 5) ] if "geometry" not in setup_pars.keys(): setup_pars["geometry"] = quad_pos else: print( "Quadrant positions and geometry file defined by \ setupfile. Loading geometry file..." ) # if the key exists in the setup file without any entries, None is # returned. We convert those entries to empty dictionaires. for key, value in list(setup_pars.items()): if value is None: setup_pars.pop(key) return setup_pars @property def mask(self): """np.ndarray: shape(16,512,128) Mask where `bad` pixels are 0 and `good` pixels 1. """ return self.__mask @mask.setter def mask(self, mask): local_mask_file = os.path.join(self.out_dir, "mask.npy") if os.path.isfile(local_mask_file): mask = np.load(local_mask_file) print(f"Loaded local mask-file: {local_mask_file}") elif mask is None: mask = np.ones((16, 512, 128), "bool") elif isinstance(mask, str): try: mask = np.load(mask) except Exception as e: print("Loading the mask failed. Error: ", e) elif isinstance(mask, np.ndarray): if mask.shape != (16, 512, 128): try: mask = mask.reshape(16, 512, 128) except Exception as e: print("Mask array has invalid shape: ", mask.shape) else: raise TypeError(f"Cannot read mask of type {type(mask)}.") self.__mask = np.array(mask).astype("bool") @staticmethod def _get_good_trains(run): """Function that finds all complete trains of the run. Args: run (DataCollection): XFEL data, e.g., obtained from extra_data.RundDirectory. Return: tuple(np.ndarry, np.ndarry): First array contains the train_ids. Second array contains the corresponding indices. """ "Stolen from Robert Rosca" trains_with_images = {} det_sources = filter(lambda x: "AGIPD1M" in x, run.detector_sources) for source_name in det_sources: good_trains_source = set() for source_file in run._source_index[source_name]: good_trains_source.update( set( source_file.train_ids[ source_file.get_index(source_name, "image")[1].nonzero() ] ) ) trains_with_images[source_name] = good_trains_source good_trains = sorted( reduce(set.intersection, list(trains_with_images.values())) ) good_trains = np.array(good_trains) good_indices = np.intersect1d(run.train_ids, good_trains, return_indices=True)[ 1 ] return good_trains, good_indices def _get_pulse_pattern(self, pulses_per_train=500, pulse_step=1): """determine pulses pattern from machine source""" source = "MID_RR_SYS/MDL/PULSE_PATTERN_DECODER" if os.path.isfile(self.pulse_file): pass elif source in self.run.all_sources and not (self.is_dark or self.is_flatfield): pulses = self.run.get_array(source, "sase2.pulseIds.value") pulse_spacing = np.unique(pulses.where(pulses > 199).diff("dim_0")) pulse_spacing = pulse_spacing[np.isfinite(pulse_spacing)].astype("int32") pulseIds = pulses // 200 npulses = np.unique((pulseIds > 0).sum("dim_0")) if len(npulses) == 1: npulses = min(npulses[0], pulses_per_train) print(f"Analyzing {npulses} pulses per train.") else: print(f"Found various number of pulses per train {npulses}.") npulses = min(max(npulses), pulses_per_train) if len(pulse_spacing) == 1: pulse_spacing = pulse_step # max(pulse_spacing[0]//2, pulse_step) print(f"Using pulse spacing of {pulse_spacing}.") else: print( f"Varying the pulse spacing is not supported. Found {pulse_spacing}" ) pulse_spacing = pulse_step # max(min(pulse_spacing[0]//2), pulse_step) print(f"Using {pulse_spacing} cell_spacing") else: pulse_spacing = pulse_step npulses = pulses_per_train print( "Pulse pattern decoder not found. Using: " f"pulse spacing {pulse_spacing} and " f"{npulses} pulses per train" ) return npulses, pulse_spacing def _get_cell_ids(self, train_idx=0): source = "MID_DET_AGIPD1M-1/DET/{}CH0:xtdf" i = 0 while i < 500: for module in range(0, 16, 2): try: s = source.format(module) tid, train_data = self.run.select( s, "image.pulseId" ).train_from_index(train_idx) cell_ids = train_data[s]["image.pulseId"] return np.array(cell_ids).flatten() except (KeyError, ValueError): pass i += 1 train_idx += 10 raise ValueError( "Unable to determine pulse ids. Probably the data " "source was not available." ) def _get_setup(self): """initialize the Xana setup and the azimuthal integrator""" self.setup = Setup(detector="agipd1m") self.setup.mask = self.mask.copy() self.setup.make( **dict( center=self.center, wavelength=12.39 / self.photon_energy, distance=self.sample_detector, ) ) dist = self.agipd_geom.to_distortion_array() self.setup.detector.IS_CONTIGUOUS = False self.setup.detector.set_pixel_corners(dist) self.setup._update_ai() qmap = self.setup.ai.array_from_unit(unit="q_nm^-1") phimap = self.setup.ai.chiArray() #: np.ndarray: q-map self.qmap = qmap.reshape(16, 512, 128) #: np.ndarray: azimhuthal-angle-map self.phimap = phimap.reshape(16, 512, 128) self.setup.mask = self.setup.mask.reshape(-1,128) self.setup = get_q_phi_pixels(self.setup, self._xpcs_opt) self.setup.mask = self.setup.mask.reshape(16,512,128) #save array of rois img = np.zeros((16,512,128)) img[~self.setup.mask] = np.nan img = img.reshape(-1, 128) for i, roi in enumerate(self.setup.qroi): img[roi] = i+1 img = img.reshape(16,512,128) np.save(f"{self.out_dir}/rois.npy", self.__agipd_geom.position_modules_fast(img)[0]) def _start_slurm_cluster(self): """Initialize the slurm cluster""" opt = self._slurm_opt # nprocs = 72//threads_per_process nprocs = opt.pop("nprocs", 1) threads_per_process = opt.pop("cores", 40) if self.is_dark or self.is_flatfield: nprocs = 1 if isinstance(self.ntrains, np.integer): default_number_jobs = min(max(int(self.ntrains / 64), 4), 4) else: default_number_jobs = 4 njobs = opt.pop("njobs", default_number_jobs) if self._localcluster: self._cluster = LocalCluster( n_workers=nprocs, ) else: print(f"Submitting {njobs} jobs using {nprocs} processes per job.") self._cluster = SLURMCluster( queue=opt.get("partition", opt.pop("partition","upex")), processes=nprocs, cores=threads_per_process, memory=opt.pop("memory", "400GB"), log_directory="./dask_log/", local_directory="/scratch/", nanny=True, death_timeout=60 * 60, walltime="6:00:00", interface=opt.get('interface', "ib0"), # or 'eth0' name="midtools", ) self._cluster.scale(nprocs * njobs) # self._cluster.adapt(maximum_jobs=njobs) print(self._cluster) self._client = Client(self._cluster) print("Cluster dashboard link:", self._cluster.dashboard_link) def _stop_slurm_cluster(self): """Shut down the slurm cluster""" self._client.close() self._cluster.close() def _create_output_file(self): """Create the HDF5 output file.""" if not os.path.exists(self.out_dir): os.mkdir(self.out_dir) if self.is_dark: ftype = "dark" elif self.is_flatfield: ftype = "flatfield" else: ftype = "analysis" if self.file_identifier is None: # check existing files and determine counter existing = os.listdir(self.out_dir) search_str = f"(?<=r{self.run_number:04}-{ftype})" ".*\d{3,}(?=\.h5)" counter = map(re.compile(search_str).search, existing) counter = filter(lambda x: bool(x), counter) counter = list(map(lambda x: int(x[0][-3:]), counter)) identifier = max(counter) + 1 if len(counter) else 0 else: identifier = self.file_identifier # depricated include last and first # if self.is_dark or self.is_flatfield: # filename = f"r{self.run_number:04}-analysis_{identifier:03}.h5" # else: # # filename = f"r{self.run_number:04}-analysis_{self.first_train_idx}-{self.last_train_idx}_{identifier:03}.h5" # filename = f"r{self.run_number:04}-analysis_{identifier:03}.h5" # this part is just a placeholder and simply creates the HDF5-file while True: try: filename = f"r{self.run_number:04}-{ftype}_{identifier:03}.h5" self.file_name = os.path.join(self.out_dir, filename) with h5.File(self.file_name, "a") as f: for flag, method in zip(self.analysis, self.METHODS): pass # for path,(shape,dtype) in self.h5_structure[method].items(): # pass # # f.create_dataset(path, shape=shape, dtype=dtype) break except OSError as e: print(str(e)) print("Incrementing counter") if self.file_identifier is None: identifier += 1 with open(self.setupfile) as f: setup_pars = yaml.load(f, Loader=yaml.FullLoader) # attributes to save in copied setupfile attrs = [ "datdir", "is_dark", "dark_run_number", "run_number", "pulses_per_train", "pulse_step", "is_flatfield", "flatfield_run_number", ] setup_pars.update({attr: getattr(self, attr) for attr in attrs}) setup_pars["analysis"] = self.analysis # copy the setupfile new_setupfile = f"r{self.run_number:04}-setup_{identifier:03}.yml" new_setupfile = os.path.join(self.out_dir, new_setupfile) with open(new_setupfile, "w") as f: yaml.dump(setup_pars, f) self.setupfile = new_setupfile print(f"Filename: {self.file_name}") print(f"Setupfile: {self.setupfile}")
[docs] def compute(self, create_file=True): """Start the actual computation based on the analysis attribute.""" if create_file: self._create_output_file() try: for method in self.analysis: if method not in ["meta", "diagnostics"] and not self._cluster_running: print(f"\n{'Fetching Data':-^50}") self._start_slurm_cluster() self._cluster_running = True print("Initializing Data Calibrator...") self._init_data_calibrator() if self._calibrator.data is None: self._calibrator._get_data() print(f"\n{method.upper():-^50}") repetition = 0 while repetition < 5: print_now() success = getattr(self, f"_compute_{method.lower()}")() if success: break else: print(f"Compute {method} failed repetition {repetition}") repetition += 1 self._client.restart() self._calibrator._get_data() # print(f"{' Done ':-^50}") # if self._cluster_running: # print('Restarting Cluster') # self._client.restart() finally: if self._client is not None: self._stop_slurm_cluster()
def _compute_meta(self): """Find complete trains.""" # determine trainIds and pulseIds and their spacing self.pulses_per_train, self.pulse_step = self._get_pulse_pattern( self.pulses_per_train, self.pulse_step ) #: np.ndarray: Array of pulse IDs. self.cell_ids = self._get_cell_ids() first_cell_index = np.where(self.cell_ids == self.first_cell)[0][0] cell_slice = slice( first_cell_index, first_cell_index + self.pulses_per_train * self.pulse_step, self.pulse_step, ) self.cell_ids = self.cell_ids[cell_slice] #: int: Number of X-ray pulses per train. self.pulses_per_train = min(len(self.cell_ids), self.pulses_per_train) #: float: Delay time between two successive pulses. self.pulse_delay = np.diff(self.cell_ids)[0] * 220e-9 all_trains = self.run.train_ids complete_train_ids, self.train_indices = self._get_good_trains(self.run) self.train_ids = complete_train_ids.copy() if os.path.isfile(self.train_file): print(f"Loaded train-file {self.train_file}") train_ids = np.load(self.train_file).astype("int64") self.train_ids, self.train_indices, _ = np.intersect1d( all_trains, train_ids, return_indices=True ) print(f"{self.train_ids.size} of {len(all_trains)} trains are complete.") #: int: last train index to compute self.last_train_idx = min([self.last_train_idx, len(self.train_ids)]) if self.first_train_idx > len(self.train_ids): raise ValueError( "First train index {self.first_train_idx} > number of trains {len(self.train_ids)}" ) self.train_ids = self.train_ids[self.first_train_idx : self.last_train_idx] self.selected_train_ids = self.train_ids.copy() self.ntrains = len(self.train_ids) self.train_indices = self.train_indices[ self.first_train_idx : self.last_train_idx ] print( f"Processing train {self.first_train_idx} to {self.last_train_idx}\n" f"{self.ntrains} of {len(all_trains)} trains will be processed." ) data = { "pulses_per_train": self.pulses_per_train, "pulse_ids": self.cell_ids, "train_ids": self.train_ids, "train_indices": self.train_indices, "complete_train_ids": complete_train_ids, "all_train_ids": all_trains, } return self._write_to_h5(data, "META") def _init_data_calibrator(self): self._calibrator = Calibrator( self.run, cell_ids=self.cell_ids, train_ids=self.selected_train_ids, dark_run_number=self.dark_run_number, mask=self.mask.copy(), is_dark=self.is_dark, is_flatfield=self.is_flatfield, flatfield_run_number=self.flatfield_run_number, adu_per_photon=self.adu_per_photon, **self._calib_opt, ) def _compute_diagnostics(self): """Read diagnostic data.""" diagnostics_opt = dict( xgm_threshold=0, ) diagnostics_opt.update(self._diagnostics_opt) print(f"Read XGM and control sources.") sources = { "SA2_XTD1_XGM/XGM/DOOCS:output": [ "data.intensityTD", "data.xTD", "data.yTD", ], **{ f"HW_MID_EXP_SAM_MOTOR_SSHEX_{x}": ["actualPosition.value"] for x in "YZ" }, "MID_EXP_UPP/TCTRL/LINKAM": ["temperature.value"], } sources = dict(filter(lambda x: x[0] in self.run.all_sources, sources.items())) arr = {} for source in sources: for key in sources[source]: data = self.run.get_array(source, key) if ("XGM" in source) and (key == "data.intensityTD") and (diagnostics_opt['xgm_threshold']>0): self.selected_train_ids = np.intersect1d(data.isel( trainId=data.mean("dim_0") > diagnostics_opt['xgm_threshold'] ).trainId, self.train_ids) print(f"{self.selected_train_ids.size}/{self.train_ids.size} remain after filtering by XGM threshold.") arr['filtered_trains'] = self.selected_train_ids self.ntrains = len(self.selected_train_ids) else: if not 'filtered_trains' in arr: arr['filtered_trains'] = self.train_ids # we subtract trainId_offset from the trainId to account for the shift # between AGIPD and other data sources. sel_trains = np.intersect1d(data.trainId, self.selected_train_ids - self.trainId_offset) data = data.sel(trainId=sel_trains) method = None if len(sel_trains) == len(self.selected_train_ids) else "nearest" data = data.reindex(trainId=self.selected_train_ids - self.trainId_offset, method=method) if len(data.dims) == 2: data = data[:, : len(self.cell_ids)] arr["/".join((source, key))] = data return self._write_to_h5(arr, "DIAGNOSTICS") def _compute_saxs(self): """Perform the azimhuthal integration.""" saxs_opt = dict( mask=self.mask, geom=self.agipd_geom, distortion_array=self.agipd_geom.to_distortion_array(), sample_detector=self.sample_detector, photon_energy=self.photon_energy, center=self.center, setup=self.setup, integrate='1D', ) saxs_opt.update(self._saxs_opt) print("Compute pulse resolved SAXS", flush=True) out = azimuthal_integration(self._calibrator, method="single", **saxs_opt) if out["azimuthal-intensity"].shape[:2] == ( self.ntrains, self.pulses_per_train, ): # out['azimuthal-intensity'].to_netcdf(self.file_name.replace('.h5', '.nc')) # print('wrote NetCDF file') print_now() print("Start writing to HDF5", flush=True) return self._write_to_h5(out, "SAXS") else: return False def _compute_xpcs(self): """Calculate correlation functions.""" xpcs_opt = dict( mask=self.mask, qmap=self.qmap, setup=self.setup, dt=self.pulse_delay, use_multitau=False, rebin_g2=False, norm='symmetric', h5filename=self.file_name, method="intra_train", ) xpcs_opt.update(self._xpcs_opt) print("Compute XPCS correlation funcions.", flush=True) out = correlate(self._calibrator, **xpcs_opt) if out["ttc"].shape[0] == self.ntrains: print_now() print("Start writing to HDF5", flush=True) return self._write_to_h5(out, "XPCS") else: return False def _compute_frames(self): """Averaging frames.""" frames_opt = dict(axis="pulse", trainIds=self.selected_train_ids, max_trains=10) frames_opt["trainIds"] = frames_opt["trainIds"][ :: frames_opt["trainIds"].size // frames_opt["max_trains"] ] frames_opt.update(self._frames_opt) print("Computing frames.", flush=True) out = average(self._calibrator, **frames_opt) img2d = self.agipd_geom.position_modules_fast(out["average"])[0] out["image2d"] = img2d out = self._update_mask(out) out["averaged_trains"] = frames_opt["trainIds"][: out["average"].shape[0]] print_now() print("Start writing to HDF5", flush=True) return self._write_to_h5(out, "FRAMES") def _update_mask(self, frames): """Update the mask based on the average intensity and variance. Args: frames (dict): should contain the keys `average` and `variance`. """ assert ("average" in frames) and ("variance" in frames) print("Updating mask based on average image") print( f"Initially: masked {round((1-self.mask.sum()/self.mask.size)*100,2)}% of all pixels" ) mask = mask_radial(frames["average"], self.qmap, self.mask) print(f"Average: masked {round((1-mask.sum()/mask.size)*100,2)}% of all pixels") mask = mask_radial( frames["variance"].mean(0) / frames["average"].mean(0) ** 2, self.qmap, mask=mask, lower_quantile=0.01, ) print( f"Variance: masked {round((1-mask.sum()/mask.size)*100,2)}% of all pixels" ) mask = mask_asics(mask) print(f"Asics: masked {round((1-mask.sum()/mask.size)*100,2)}% of all pixels") self.mask = mask self._calibrator.xmask = mask frames["mask"] = mask return frames def _compute_dark(self): """Averaging darks.""" dark_opt = dict( axis="train", trainIds=self.train_ids, max_trains=len(self.train_ids) ) dark_opt.update(self._dark_opt) # we need to remove the options for masking before passing the dict # to the average method pvals = dark_opt.pop("pvals", (0.2, 0.5)) print("Computing darks.") out = average(self._calibrator, **dark_opt) darkmask, median = _create_mask_from_dark( out["average"], out["variance"], pvals=pvals ) out["darkmask"] = darkmask out["median"] = median return self._write_to_h5(out, "DARK") def _compute_flatfield(self): """Process flatfield.""" flatfield_opt = dict( axis="train", trainIds=self.train_ids, max_trains=len(self.train_ids) ) flatfield_opt.update(self._flatfield_opt) # we need to remove the options for masking before passing the dict # to the average method average_limits = flatfield_opt.pop("average_limits", (3500, 6000)) variance_limits = flatfield_opt.pop("variance_limits", (200, 1500)) print("Computing flatfield.") out = average(self._calibrator, **flatfield_opt) ff_mask, median = _create_mask_from_flatfield( out["average"], out["variance"], average_limits=average_limits, variance_limits=variance_limits, ) out["ffmask"] = ff_mask out["median"] = median return self._write_to_h5(out, "FLATFIELD") def _compute_statistics(self): """Calculate Histograms per image.""" statistics_opt = dict( mask=self.mask, setup=self.setup, geom=self.agipd_geom, max_trains=200, ) statistics_opt.update(self._statistics_opt) print("Compute pulse resolved statistics") out = statistics(self._calibrator, **statistics_opt) if out["counts"].shape[0] == (self.ntrains * self.pulses_per_train): return self._write_to_h5(out, "STATISTICS") else: return False def _write_to_h5(self, output, method): """Dump results in HDF5 file.""" if self.file_name is not None: keys = list(self.h5_structure[method.upper()].keys()) with h5.File(self.file_name, "r+") as f: for keyh5, (outkey, data) in zip(keys, output.items()): if data is not None: # check scalars and fixed_size if np.isscalar(data): f[keyh5] = data else: # print(outkey, keyh5, np.asarray(data).shape) f.create_dataset( keyh5, data=np.asarray(data), compression="gzip", chunks=True, # compression_opts=4, ) print("Wrote data to HDF5 file") return True else: warnings.warn("Results not saved. Filename not specified") return output
[docs] def merge_files(self, subset=None, delete_file=False): """merge existing HDF5 files for a run""" datasets = get_datasets(self.out_dir) ds = Interpreter(datasets) all_trains = [t[1] for t in ds.iter_trainids(self.run_number, subset)] files = [x[1] for x in ds.iter_files(self.run_number, subset)] all_trains = np.unique(np.hstack(all_trains)) ind_in_files = [] first_in_all = [] filenames = [] for file_index, (index, trains) in enumerate( ds.iter_trainids(self.run_number, subset) ): if len(trains) == 0: continue ind_in_file, ind_all = np.intersect1d( trains, all_trains, return_indices=True )[1:] if not len(ind_in_file): continue ind_in_files.append(ind_in_file) first_in_all.append(ind_all[0]) all_trains = np.delete(all_trains, ind_all) filenames.append(files[file_index]) file_order = np.argsort(first_in_all) filenames = np.array(filenames)[file_order] self._create_output_file() keys = [x for y in self.h5_structure.values() for x in y.keys()] with h5.File(self.file_name, "a") as F: for filename in filenames: with h5.File(filename, "r") as f: for method in self.h5_structure: for key, value in self.h5_structure[method].items(): fixed_size = bool(value[0]) if key in f: data = f[key] s = data.shape if not key in F: # check scalars and fixed_size if len(s) == 0: F[key] = np.array(data) else: F.create_dataset( key, data=data, compression="gzip", chunks=True, maxshape=(None, *s[1:]), ) else: if not fixed_size: F[key].resize((F[key].shape[0] + s[0]), axis=0) F[key][-s[0] :] = data if delete_file: os.remove(filename)
def _get_parser(): """Command line parser""" parser = argparse.ArgumentParser( prog="midtools", description="Analyze MID runs.", ) parser.add_argument( "setupfile", type=str, help="the YAML file to configure midtools", ) parser.add_argument( "analysis", type=str, help=( "which analysis to perform. List of 0s and 1s:\n" "1000 saves average data along specific axis,\n" "0100 SAXS routines,\n" "0010 XPCS routines,\n" "0001 statistics (histograms pulse resolved." ), ) parser.add_argument( "-r", "--run", type=int, help="Run number.", default=None, ) parser.add_argument( "-dr", "--dark-run-number", type=int, help="Dark run number.", default=None, nargs=2, ) parser.add_argument( "--last-train", type=int, help="last train to analyze.", default=1_000_000, ) parser.add_argument( "--first-train", type=int, help="first train to analyze.", default=0, ) parser.add_argument( "-ppt", "--pulses-per-train", type=int, help="number of pulses per train", default=500, ) parser.add_argument( "-ts", "--train-step", type=int, help="spacing of trains", default=1, ) parser.add_argument( "-ps", "--pulse-step", type=int, help="spacing of pulses", default=1, ) parser.add_argument( "--is-dark", help="whether the run is a dark run", const=True, default=False, nargs="?", ) parser.add_argument( "--is-flatfield", help="whether the run is a flatfield run", const=True, default=False, nargs="?", ) parser.add_argument( "-ffr", "--flatfield-run", type=int, help="Flatfield run number.", default=None, nargs=2, ) parser.add_argument( "--out-dir", type=str, help="Output directory", default="./", nargs="?", ) parser.add_argument( "--chunk", nargs="?", default=None, type=int, required=False, help="Split the number of trains in chunks of this size (default do not chunk)", ) parser.add_argument( "--job-dir", default="/gpfs/exfel/data/scratch/reiserm/mid-proteins/jobs/", required=False, help="Directory for the slurm output and error files", ) parser.add_argument( "--slurm", default=False, const=True, nargs="?", required=False, help="Run midtools on dedicated node with slurm job (default False)", ) parser.add_argument( "--localcluster", default=False, const=True, nargs="?", required=False, help="Use dasks LocalCluster to run midtools locally (default False)", ) parser.add_argument( "--file-identifier", default=None, type=int, nargs="?", required=False, help="Identifier at file ending. Default None.", ) parser.add_argument( "--first-cell", type=int, help="Cell ID of the first AGIPD memory cell with X-rays.", required=False, default=2, ) parser.add_argument( "--datdir", required=False, default=None, help="Path to the data. This argument is only used if the data directory is not provided in the setupfile.", ) return parser @_exception_handler(max_attempts=3) def _submit_slurm_job(run, args, test=False): args = dict(args) job_dir = args.pop("job_dir", "./jobs") job_dir = os.path.abspath(job_dir) setupfile = args.pop("setupfile") analysis = args.pop("analysis") for key, val in list(args.items()): if not bool(val): args.pop(key) elif isinstance(val, (list, tuple)): args[key] = " ".join(map(str, val)) midtools_args = " ".join( [f"--{arg.replace('_','-')} {val}" for arg, val in args.items()] ) print(f"Generating sbatch jobs for run: {run}") if not os.path.exists(job_dir) and not test: os.mkdir(job_dir) env_dir = '/'.join(sys.executable.split('/')[:-2]) TEMPLATE = """#!/bin/bash #SBATCH --job-name=midtools #SBATCH --output={job_file}.out #SBATCH --error={job_file}.err #SBATCH --partition=upex #SBATCH --exclusive #SBATCH --time 06:00:00 source {env_dir}/bin/activate echo "SLURM_JOB_ID $SLURM_JOB_ID" type midtools ulimit -n 4096 ulimit -c unlimited midtools {setupfile} {analysis} -r {run} {midtools_args} exit """ print(f"With arguments: `{midtools_args}`") timestamp = time.strftime("%Y-%m-%dT%H-%M-%S") random_id = np.random.randint(0, 999) job_name = f"{timestamp}-{random_id:03}-run{run}" job_file = f"{job_dir}/{job_name}.job" job = TEMPLATE.format_map( { "setupfile": setupfile, "analysis": analysis, "run": run, "job_name": job_name, "job_file": job_file, "midtools_args": midtools_args, "env_dir": env_dir, } ) print(f"Generating and submitting sbatch for job {job_name}") if not test: with open(job_file, "w") as f: f.write(job) os.system(f"sbatch {job_file}") else: print(job) def run_single(run_number, setupfile, analysis, **kwargs): t_start = time.time() dataset = Dataset(run_number, setupfile, analysis=analysis, **kwargs) print("Protein Mode") print(f"\n{' Starting Analysis ':-^50}") print(f"Analyzing {dataset.datdir}") dataset.compute() elapsed_time = time.time() - t_start print(f"\nFinished: elapsed time: {elapsed_time/60:.2f}min") print(f"Results saved under {dataset.file_name}\n") def main(): parser = _get_parser() args = vars(parser.parse_args()) run = args.pop("run") if args["chunk"] is None: if args["slurm"]: args["slurm"] = False _submit_slurm_job(run, args) else: setupfile = args.pop("setupfile") analysis = args.pop("analysis") run_single(run, setupfile, analysis, **args) else: args["slurm"] = False first, last = args["first_train"], args["last_train"] chunksize = args.pop("chunk") for first in range(first, last, chunksize): args["first_train"] = first args["last_train"] = first + chunksize _submit_slurm_job(run, args) if __name__ == "__main__": main()