Source code for jobs.prepare_cosmo

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pathlib import Path
import logging
import csv
import os
from datetime import timedelta
from . import tools

BASIC_PYTHON_JOB = True


def set_cfg_variables(cfg):
    cfg.int2lm_root = cfg.chain_root / 'int2lm'
    cfg.int2lm_input = cfg.int2lm_root / 'input'
    cfg.int2lm_run = cfg.chain_root / 'int2lm' / 'run'
    cfg.int2lm_output = cfg.chain_root / 'int2lm' / 'output'

    cfg.cosmo_base = cfg.chain_root / 'cosmo'
    cfg.cosmo_input = cfg.chain_root / 'cosmo' / 'input'
    cfg.cosmo_run = cfg.chain_root / 'cosmo' / 'run'
    cfg.cosmo_output = cfg.chain_root / 'cosmo' / 'output'
    cfg.cosmo_output_reduced = cfg.chain_root / 'cosmo' / 'output_reduced'

    # Number of tracers
    if 'tracers' in cfg.workflow['features']:
        tracer_csvfile = cfg.chain_src_dir / 'cases' / cfg.casename / 'cosmo_tracers.csv'
        if tracer_csvfile.is_file():
            with open(tracer_csvfile, 'r') as csv_file:
                reader = csv.DictReader(csv_file, delimiter=',')
                reader = [r for r in reader if r[''] != '#']
                cfg.in_tracers = len(reader)
        else:
            raise FileNotFoundError(f"File not found: {tracer_csvfile}")

        # tracer_start namelist parameter for spinup simulation
        if hasattr(cfg, 'spinup'):
            if cfg.first_one:
                cfg.tracer_start = 0
            else:
                cfg.tracer_start = cfg.spinup
        else:
            cfg.tracer_start = 0

    # asynchronous I/O
    if hasattr(cfg, 'cfg.cosmo_np_io'):
        if cfg.cosmo_np_io == 0:
            cfg.lasync_io = '.FALSE.'
            cfg.num_iope_percomm = 0
        else:
            cfg.lasync_io = '.TRUE.'
            cfg.num_iope_percomm = 1

    # If nested run: use output of mother-simulation
    if 'nesting' in cfg.workflow['features'] and not os.path.isdir(
            cfg.meteo.dir):
        # if ifs_hres_dir doesn't point to a directory,
        # it is the name of the mother run
        mother_name = cfg.meteo.dir
        cfg.meteo.dir = cfg.work_root / mother_name / cfg.chunk_id / 'cosmo' / 'output'
        cfg.meteo.inc = 1
        cfg.meteo.prefix = 'lffd'


[docs]def main(cfg): """ **COSMO Data Preparation** This function prepares input data for COSMO simulations by creating necessary directories, copying meteorological files, and handling specific data processing. - Copy meteorological files to **int2lm** input. - Create the necessary directory ``cfg.int2lm_input/meteo``. - Copy meteorological files from the project directory (``cfg.meteo['dir']/cfg.meteo['prefix']YYYYMMDDHH``) to the int2lm input folder on scratch (``cfg.int2lm_input/meteo``). - For nested runs (meteorological files are COSMO output: ``cfg.meteo['prefix'] == 'lffd'``), also copy the ``*c.nc``-file with constant parameters. Parameters ---------- cfg : Config Object holding all user-configuration parameters as attributes. Raises ------ RuntimeError If any subprocess returns a non-zero exit code during execution. """ set_cfg_variables(cfg) tools.change_logfile(cfg.logfile) logging.info('COSMO analysis data for IC/BC') dest_path = cfg.int2lm_input / 'meteo' tools.create_dir(dest_path, "meteo input") source_nameformat = cfg.meteo['nameformat'] if cfg.meteo['prefix'] == 'lffd': # nested runs use cosmoart-output as meteo data # have to copy the *c.nc-file src_file = (cfg.meteo['dir'] / cfg.startdate_sim.strftime(source_nameformat + 'c.nc')) tools.copy_file(src_file, dest_path, output_log=True) logging.info("Copied constant-param file from {} to {}".format( src_file, dest_path)) # extend nameformat with ending to match cosmo-output source_nameformat += '.nc' if cfg.meteo['prefix'] == 'efsf': source_nameformat = cfg.meteo['prefix'] + '%y%m%d%H' num_steps = 0 meteo_dir = cfg.meteo['dir'] subdir = meteo_dir / cfg.startdate_sim.strftime('%y%m%d%H') for time in tools.iter_hours(cfg.startdate_sim, cfg.enddate_sim, cfg.meteo['inc']): dest_path = cfg.int2lm_input / 'meteo' src_file = meteo_dir / time.strftime(source_nameformat) if cfg.meteo['prefix'] == 'efsf': if time == cfg.startdate_sim: src_file = subdir / ('eas' + time.strftime('%Y%m%d%H')) if not src_file.exists() and cfg.meteo.get('dir_alt') \ is not None: meteo_dir = cfg.meteo['dir_alt'] subdir = meteo_dir / cfg.startdate_sim.strftime('%y%m%d%H') src_file = subdir / ('eas' + time.strftime('%Y%m%d%H')) dest_path = cfg.int2lm_input / 'meteo' / (cfg.meteo['prefix'] + '00000000') else: td = time - cfg.startdate_sim - timedelta(hours=6 * num_steps) days = str(td.days).zfill(2) hours = str(td.seconds // 3600).zfill(2) td_total = time - cfg.startdate_sim days_total = str(td_total.days).zfill(2) hours_total = str(td_total.seconds // 3600).zfill(2) src_file = subdir / (cfg.meteo['prefix'] + days + hours + '0000') dest_path = cfg.int2lm_input / 'meteo' / ( cfg.meteo['prefix'] + days_total + hours_total + '0000') # Next time, change directory checkdir = meteo_dir / time.strftime('%y%m%d%H') if checkdir.is_dir(): num_steps += 1 subdir = checkdir elif cfg.meteo.get('dir_alt') is not None: checkdir = cfg.meteo['dir_alt'] / time.strftime('%y%m%d%H') if checkdir.is_dir(): num_steps += 1 subdir = checkdir meteo_dir = cfg.meteo['dir_alt'] logging.info( "Switching to other input directory from {} to {}". format(cfg.meteo['dir'], cfg.meteo['dir_alt'])) elif not src_file.exists(): # special case for MeteoSwiss COSMO-7 data archive = Path('/store/mch/msopr/owm/COSMO-7') yy = time.strftime("%y") path = archive / 'ANA' + yy src_file = path / time.strftime(source_nameformat) # copy meteo file from project folder to tools.copy_file(src_file, dest_path, output_log=True) logging.info("Copied file from {} to {}".format(src_file, dest_path)) # Other IC/BC data inv_to_process = [] if hasattr(cfg, 'cams'): try: CAMS = dict(fullname="CAMS", nickname="cams", executable="cams4int2cosmo", indir=cfg.cams['dir_orig'], outdir=cfg.cams['dir_proc'], param=[{ 'inc': cfg.cams['inc'], 'suffix': cfg.cams['suffix'] }]) inv_to_process.append(CAMS) except AttributeError: pass try: CT = dict(fullname="CarbonTracker", nickname="ct", executable="ctnoaa4int2cosmo", indir=cfg.ct_dir_orig, outdir=cfg.ct_dir_proc, param=cfg.ct_parameters) inv_to_process.append(CT) except AttributeError: pass elif hasattr(cfg, 'mozart'): try: MOZART = dict(fullname='MOZART', nickname='mozart', executable='mozart2int2lm', indir=cfg.mozart_file_orig, outdir=cfg.mozart_dir_proc, param=[{ 'inc': cfg.mozart_inc, 'suffix': cfg.mozart_prefix }]) inv_to_process.append(MOZART) except AttributeError: pass if inv_to_process: logging.info("Processing " + ", ".join([i["fullname"] for i in inv_to_process]) + " data") scratch_path = cfg.int2lm_input / 'icbc' tools.create_dir(scratch_path, "icbc input") for inv in inv_to_process: logging.info(inv["fullname"] + " files") tools.create_dir(inv["outdir"], "processed " + inv["fullname"]) for p in inv["param"]: inc = p["inc"] for time in tools.iter_hours(cfg.startdate_sim, cfg.enddate_sim, inc): logging.info(time) filename = inv["outdir"] / ( p["suffix"] + "_" + time.strftime("%Y%m%d%H") + ".nc") if not filename.exists(): logging.info(filename) try: to_call = getattr(tools, inv["executable"]) to_call.main(time, inv["indir"], inv["outdir"], p) except: logging.error("Preprocessing " + inv["fullname"] + " data failed") raise # copy to (temporary) run input directory tools.copy_file(filename, scratch_path, output_log=True) logging.info("OK")