#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
import sys
import shutil
import datetime as dt
import glob
import netCDF4 as nc
import subprocess
import numpy as np
from datetime import datetime
import math
from . import tools
BASIC_PYTHON_JOB = True
def main(cfg):
"""
Calculates 2D column data and writes them into a new netCDF file.
Only a fixed number of levels from **COSMO** output are considered.
Those files are written into a new directory ``cosmo_output_reduced``.
The number of levels is set by the configuration variable
``cfg.reduce_output['output_levels']`` (default = all levels).
**Important**: If several ``GRIBOUT`` sections are used to split the output
data, then this code only works in case of the following:
The tracers, for which the column-averaged dry-air (``X``) and
moist-air (``Y``) mole fractions are calculated, have to be
1. saved in a separate output file and
2. the output file appears alphabetically **after** the meteorological
variables.
For example, use a GRIBOUT section suffix ``_met`` for standard **COSMO**
output, and ``_trc`` for tracers.
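
    For reference, the column-averaged dry-air mole fraction ``X`` of a
    tracer with level mole fractions ``m_k`` is (schematically) the
    dry-air-mass weighted vertical average::

        X = sum_k(m_k * dp_dry_k) / sum_k(dp_dry_k)

    where ``dp_dry_k`` is the dry-air mass (pressure thickness) of model
    level ``k``; the moist-air fraction ``Y`` weights by moist-air mass
    instead. The exact weighting is implemented in the worker script
    ``reduce_output_start_end.py``.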

    Parameters
    ----------
    cfg : Config
        Object holding all user-configuration parameters as attributes.
    """
tools.change_logfile(cfg.logfile)
cosmo_output = cfg.cosmo_output
output_path = cfg.cosmo_output_reduced
date = dt.datetime.today()
to_print = """reduce_output
=====================================================
============== POST PROCESSING BEGINS ===============
============== StartTime: %s
=====================================================""" % date.strftime("%Y-%m-%d %H:%M:%S")
logging.info(to_print)
tools.create_dir(output_path, "output")
if cfg.compute_host != "daint":
logging.error(
"The reduce_output script is supposed to be run on daint only, "
"not on %s" % cfg.compute_host)
sys.exit(1)
# Wait for Cosmo to finish first
tools.check_job_completion(cfg.log_finished_dir, "cosmo")
    # Defaults for the 'reduce_output' job: number of output levels
    # (-1 = keep all levels) and switch for gas unit conversion. The
    # values are read from the cfg.reduce_output dict further below,
    # so the defaults are set in that dict as well.
    if not hasattr(cfg, 'reduce_output'):
        cfg.reduce_output = {}
    cfg.reduce_output.setdefault('output_levels', -1)
    cfg.reduce_output.setdefault('convert_gas', True)
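    # Example (hypothetical) entry in a case-configuration file:
    #     reduce_output = {'output_levels': 20, 'convert_gas': True}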
"""Get list of constant files"""
cfiles = []
read_cfile = False
for infile in sorted(
glob.glob(os.path.join(cosmo_output, "lffd*[0-9]c*.nc"))):
cfiles.append(infile)
if not read_cfile:
# Read the first constant file and store height value
logging.info(infile)
with nc.Dataset(infile) as inf:
h = np.array(inf.variables['HHL'][0])
read_cfile = True
logging.info('Successfully read constant file %s' % infile)
    if not read_cfile:
        logging.error('Constant file could not be read')
        sys.exit(1)
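    # Note: 'HHL' is the geometric height (m) of the model half-levels, so
    # h has one more vertical entry than the full-level fields. A
    # hypothetical layer-thickness computation from it would be:
    #     dz = h[:-1] - h[1:]   # thickness of each full level in m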
"""Copy constant file directly"""
if os.path.exists(cfiles[0]):
shutil.copy(cfiles[0], output_path)
logging.info('Copied constant file %s to %s.' %
(cfiles[0], output_path))
else:
logging.error('Constant file could not be copied.')
"""Get list of files"""
infiles = sorted(glob.glob(os.path.join(cosmo_output, "lffd*.nc")))
mytimes = []
    for infile in infiles:
basename = os.path.split(infile)[1]
timestr = basename.split('lffd', 1)[1][:10]
mytimes.append(datetime.strptime(timestr, '%Y%m%d%H'))
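    # Example: 'lffd2015010112.nc' -> timestr '2015010112' -> 2015-01-01 12:00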
str_startdate = mytimes[0].strftime('%Y-%m-%d %H')
str_enddate = mytimes[-1].strftime('%Y-%m-%d %H')
"""Compute time step for parallel tasks"""
    ncores = 36  # number of parallel tasks (one task per core)
total_time = mytimes[-1] - mytimes[0]
nout_times = int(total_time.total_seconds() // 3600) + 1
output_step = int(max(math.ceil(nout_times / ncores), 2))
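    # Worked example (assuming hourly output, as the computation above
    # does): a 5-day run has nout_times = 5 * 24 + 1 = 121 output times,
    # so output_step = max(ceil(121 / 36), 2) = 4 times per task.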
"""Execute parallel bash script"""
dir_path = os.path.dirname(os.path.realpath(__file__))
tool_path = os.path.join(dir_path, 'tools')
bash_file = os.path.join(tool_path, 'reduce_output_parallel.bash')
py_file = os.path.join(tool_path, 'reduce_output_start_end.py')
alternate_csv_file = os.path.join(cfg.chain_src_dir, 'cases', cfg.casename,
'variables.csv')
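    # Illustrative shape of the submission below (positional arguments, in
    # order: worker script, input dir, output dir, start date, end date,
    # output levels, time step, csv file, gas-conversion flag):
    #     sbatch --output=<logfile> --open-mode=append --wait \
    #         reduce_output_parallel.bash reduce_output_start_end.py ...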
logging.info('Submitting job to the queue...')
result = subprocess.run([
"sbatch", '--output=' + cfg.logfile, '--open-mode=append', '--wait',
bash_file, py_file, cosmo_output, output_path, str_startdate,
str_enddate,
str(cfg.reduce_output['output_levels']),
str(output_step), alternate_csv_file,
str(cfg.reduce_output['convert_gas'])
])
exitcode = result.returncode
if exitcode != 0:
raise RuntimeError("sbatch returned exitcode {}".format(exitcode))
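    # Because of '--wait', sbatch only returns once the job has finished,
    # so the consistency checks below run after all tasks are done.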
"""Check if all files have been processed"""
    cfile_names = [os.path.basename(cfile) for cfile in cfiles]
files_cosmo = sorted([
name for name in os.listdir(cosmo_output)
if os.path.isfile(os.path.join(cosmo_output, name))
and name not in cfile_names
])
files_reduced = sorted([
name for name in os.listdir(output_path)
if os.path.isfile(os.path.join(output_path, name))
and name not in cfile_names
])
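    # Any mismatch between the two sorted listings means at least one
    # output file was not reduced (or an unexpected file appeared).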
    if files_cosmo != files_reduced:
logging.error(set(files_cosmo).symmetric_difference(files_reduced))
logging.error('Reduced output files differ from original output!')
raise RuntimeError('Error in reduce_output job')
"""Check for corrupted files"""
for f in [
os.path.join(output_path, name) for name in os.listdir(output_path)
]:
with nc.Dataset(f) as _:
logging.info('File %s is not corrupted' % f)
date = dt.datetime.today()
to_print = """=====================================================
============== POST PROCESSING ENDS ==================
============== EndTime: %s
=====================================================""" % date.strftime("%Y-%m-%d %H:%M:%S")
logging.info(to_print)