"""Various miscellaneous functions"""
__author__ = "Adam Simpkin, Felix Simkovic & Jens Thomas"
__date__ = "05 May 2017"
__version__ = "1.0"
import glob
import logging
import math
import os
import pandas as pd
import shutil
import tempfile
from simbad.db import convert_pdb_to_dat
from simbad.util import pdb_util
from pyjob.factory import TaskFactory
# Constants that need to be accessed externally (e.g. by CCP4I2)
SIMBAD_DIRNAME = 'SIMBAD'
SIMBAD_PYRVAPI_SHAREDIR = 'jsrview'
EXPORT = "SET" if os.name == "nt" else "export"
CMD_PREFIX = "call" if os.name == "nt" else ""
logger = logging.getLogger(__name__)
[docs]def get_sequence(input_f, output_s):
"""Output sequence file from input pdb file"""
ps = pdb_util.PdbStructure()
ps.from_file(input_file=input_f)
seq_info = ps.get_sequence_info
with open(output_s, 'w') as f_out:
for i in seq_info:
f_out.write(">{}".format(i) + os.linesep)
f_out.write(seq_info[i] + os.linesep)
[docs]def get_mrbump_ensemble(mrbump_dir, final):
"""Output ensemble from mrbump directory to a dat file"""
if os.path.isdir(mrbump_dir):
ensemble = glob.iglob(os.path.join(mrbump_dir, 'models', 'domain_*', 'ensembles',
'gesamtEnsTrunc_*_100.0_SideCbeta.pdb'))[0]
convert_pdb_to_dat(ensemble, final)
else:
logger.critical("Directory missing: {}".format(mrbump_dir))
[docs]def output_files(run_dir, result, output_pdb, output_mtz):
"""Return output pdb/mtz from best result in result obj"""
pdb_code = result[0]
stem = os.path.join(run_dir, 'output_files', pdb_code)
input_pdb = os.path.join(stem, '{0}_refinement_output.pdb'.format(pdb_code))
input_mtz = os.path.join(stem, '{0}_refinement_output.mtz'.format(pdb_code))
shutil.copyfile(input_pdb, output_pdb)
shutil.copyfile(input_mtz, output_mtz)
[docs]def result_by_score_from_csv(f, score, ascending=True):
"""Return result with the best defined score"""
df = pd.read_csv(f)
df.sort_values(score, ascending=ascending, inplace=True)
return df.loc[0, ["pdb_code", score]].tolist()
[docs]def summarize_result(results, csv_file=None, columns=None):
"""Summarize the search results"""
kwargs = {}
if columns:
kwargs["columns"] = ["pdb_code"] + columns
df = pd.DataFrame([r._as_dict() for r in results], **kwargs)
df.set_index("pdb_code", inplace=True)
if csv_file:
logger.debug("Storing results in file: %s", csv_file)
df.to_csv(csv_file)
if df.empty:
logger.info("No results found")
else:
summary_table = "The results for this search are:\n\n%s\n"
logger.info(summary_table, df.to_string())
[docs]def tmp_dir(directory=None, prefix="tmp", suffix=""):
"""Return a filename for a temporary directory
Parameters
----------
directory : str, optional
Path to a directory to write the files to.
prefix : str, optional
A prefix to the temporary filename
suffix : str, optional
A suffix to the temporary filename
"""
return tempfile.mkdtemp(dir=directory, prefix=prefix, suffix=suffix)
[docs]def tmp_file(delete=False, directory=None, prefix="tmp", stem=None, suffix=""):
"""Return a filename for a temporary file
The naming convention of scripts will be ``prefix`` + ``stem`` + ``suffix``.
Parameters
----------
delete : bool, optional
Delete the file, thus return name only [default: True]
directory : str, optional
Path to a directory to write the files to
prefix : str, optional
A prefix to the temporary filename
stem : str, optional
The steam part of the script name
suffix : str, optional
A suffix to the temporary filename
"""
if directory is None:
directory = tempfile.gettempdir()
if stem is None:
tmpf = tempfile.NamedTemporaryFile(delete=delete, dir=directory, prefix=prefix, suffix=suffix)
tmpf.close()
return tmpf.name
else:
tmpf = os.path.join(directory, "".join([prefix, stem, suffix]))
if not delete:
open(tmpf, 'w').close()
return tmpf
[docs]def submit_chunk(collector, run_dir, nproc, job_name, submit_qtype, submit_queue, permit_nonzero, monitor, success_func):
"""Submit jobs in small chunks to avoid using too much disk space
Parameters
----------
collector : list
:obj:`~pyjob.script.ScriptCollector` containing run scripts
nproc : int, optional
The number of processors to run the job on
job_name : str
The name of the job to submit
submit_qtype : str
The cluster submission queue type - currently support SGE and LSF
submit_queue : str
The queue to submit to on the cluster
permit_nonzero : bool
Permit non-zero return codes from TaskFactory
success_func : func
function to check for success
"""
if submit_qtype == 'local':
processes = nproc
array_size = None
else:
processes = None
array_size = nproc
with TaskFactory(submit_qtype,
collector,
cwd=run_dir,
name=job_name,
processes=processes,
max_array_size=array_size,
queue=submit_queue,
permit_nonzero=permit_nonzero,
shell='/bin/bash',
priority=-10) as task:
task.run()
interval = int(math.log(len(collector.scripts)) / 3)
interval_in_seconds = interval if interval >= 5 else 5
task.wait(interval=interval_in_seconds, monitor_f=monitor, success_f=success_func)