Source code for psi4.driver.p4util.util

#
# @BEGIN LICENSE
#
# Psi4: an open-source quantum chemistry software package
#
# Copyright (c) 2007-2019 The Psi4 Developers.
#
# The copyrights for code used from other parties are included in
# the corresponding files.
#
# This file is part of Psi4.
#
# Psi4 is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3.
#
# Psi4 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with Psi4; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# @END LICENSE
#
"""Module with utility functions for use in input files."""

import os
import re
import sys
import math
import warnings

import numpy as np

from psi4 import core
from .exceptions import ValidationError, TestComparisonError


[docs]def oeprop(wfn, *args, **kwargs): """Evaluate one-electron properties. :returns: None :type wfn: :py:class:`~psi4.core.Wavefunction` :param wfn: set of molecule, basis, orbitals from which to compute properties How to specify args, which are actually the most important :type title: string :param title: label prepended to all psivars computed :examples: >>> # [1] Moments with specific label >>> E, wfn = energy('hf', return_wfn=True) >>> oeprop(wfn, 'DIPOLE', 'QUADRUPOLE', title='H3O+ SCF') """ oe = core.OEProp(wfn) if 'title' in kwargs: oe.set_title(kwargs['title']) for prop in args: oe.add(prop) oe.compute()
[docs]def cubeprop(wfn, **kwargs): """Evaluate properties on a grid and generate cube files. .. versionadded:: 0.5 *wfn* parameter passed explicitly :returns: None :type wfn: :py:class:`~psi4.core.Wavefunction` :param wfn: set of molecule, basis, orbitals from which to generate cube files :examples: >>> # [1] Cube files for all orbitals >>> E, wfn = energy('b3lyp', return_wfn=True) >>> cubeprop(wfn) >>> # [2] Cube files for density (alpha, beta, total, spin) and four orbitals >>> # (two alpha, two beta) >>> set cubeprop_tasks ['orbitals', 'density'] >>> set cubeprop_orbitals [5, 6, -5, -6] >>> E, wfn = energy('scf', return_wfn=True) >>> cubeprop(wfn) """ # By default compute the orbitals if not core.has_global_option_changed('CUBEPROP_TASKS'): core.set_global_option('CUBEPROP_TASKS', ['ORBITALS']) if ((core.get_global_option('INTEGRAL_PACKAGE') == 'ERD') and ('ESP' in core.get_global_option('CUBEPROP_TASKS'))): raise ValidationError('INTEGRAL_PACKAGE ERD does not play nicely with electrostatic potential, so stopping.') cp = core.CubeProperties(wfn) cp.compute_properties()
[docs]def set_memory(inputval, execute=True): """Function to reset the total memory allocation. Takes memory value *inputval* as type int, float, or str; int and float are taken literally as bytes to be set, string taken as a unit-containing value (e.g., 30 mb) which is case-insensitive. Set *execute* to False to interpret *inputval* without setting in Psi4 core. :returns: *memory_amount* (float) Number of bytes of memory set :raises: ValidationError when <500MiB or disallowed type or misformatted :examples: >>> # [1] Passing absolute number of bytes >>> psi4.set_memory(600000000) >>> psi4.get_memory() Out[1]: 600000000L >>> # [2] Passing memory value as string with units >>> psi4.set_memory('30 GB') >>> psi4.get_memory() Out[2]: 30000000000L >>> # Good examples >>> psi4.set_memory(800000000) # 800000000 >>> psi4.set_memory(2004088624.9) # 2004088624 >>> psi4.set_memory(1.0e9) # 1000000000 >>> psi4.set_memory('600 mb') # 600000000 >>> psi4.set_memory('600.0 MiB') # 629145600 >>> psi4.set_memory('.6 Gb') # 600000000 >>> psi4.set_memory(' 100000000kB ') # 100000000000 >>> psi4.set_memory('2 eb') # 2000000000000000000 >>> # Bad examples >>> psi4.set_memory({}) # odd type >>> psi4.set_memory('') # no info >>> psi4.set_memory("8 dimms") # unacceptable units >>> psi4.set_memory("1e5 gb") # string w/ exponent >>> psi4.set_memory("5e5") # string w/o units >>> psi4.set_memory(2000) # mem too small >>> psi4.set_memory(-5e5) # negative (and too small) """ # Handle memory given in bytes directly (int or float) if isinstance(inputval, (int, float)): val = inputval units = '' # Handle memory given as a string elif isinstance(inputval, str): memory_string = re.compile(r'^\s*(\d*\.?\d+)\s*([KMGTPBE]i?B)\s*$', re.IGNORECASE) matchobj = re.search(memory_string, inputval) if matchobj: val = float(matchobj.group(1)) units = matchobj.group(2) else: raise ValidationError("""Invalid memory specification: {}. Try 5e9 or '5 gb'.""".format(repr(inputval))) else: raise ValidationError("""Invalid type {} in memory specification: {}. Try 5e9 or '5 gb'.""".format( type(inputval), repr(inputval))) # Units decimal or binary? multiplier = 1000 if "i" in units.lower(): multiplier = 1024 units = units.lower().replace("i", "").upper() # Build conversion factor, convert units unit_list = ["", "KB", "MB", "GB", "TB", "PB", "EB"] mult = 1 for unit in unit_list: if units.upper() == unit: break mult *= multiplier memory_amount = int(val * mult) # Check minimum memory requirement min_mem_allowed = 262144000 if memory_amount < min_mem_allowed: raise ValidationError( """set_memory(): Requested {:.3} MiB ({:.3} MB); minimum 250 MiB (263 MB). Please, sir, I want some more.""" .format(memory_amount / 1024**2, memory_amount / 1000**2)) if execute: core.set_memory_bytes(memory_amount) return memory_amount
[docs]def get_memory(): """Function to return the total memory allocation.""" return core.get_memory()
[docs]def success(label): """Function to print a '*label*...PASSED' line to screen. Used by :py:func:`util.compare_values` family when functions pass. """ msg = '\t{0:.<66}PASSED'.format(label) print(msg) sys.stdout.flush() core.print_out(msg + '\n')
# Test functions
[docs]def compare_values(expected, computed, digits, label, exitonfail=True): """Function to compare two values. Prints :py:func:`util.success` when value *computed* matches value *expected* to number of *digits* (or to *digits* itself when *digits* < 1 e.g. digits=0.04). Performs a system exit on failure unless *exitonfail* False, in which case returns error message. Used in input files in the test suite. """ if digits > 1: thresh = 10**-digits message = """\t{}: computed value ({:.{digits1}f}) does not match ({:.{digits1}f}) to {digits} digits.""".format( label, computed, expected, digits1=int(digits) + 1, digits=digits) else: thresh = digits message = ("\t%s: computed value (%f) does not match (%f) to %f digits." % (label, computed, expected, digits)) if abs(float(expected) - float(computed)) > thresh: # float cast handles decimal.Decimal vars print(message) if exitonfail: raise TestComparisonError(message) if math.isnan(computed): print(message) print("\tprobably because the computed value is nan.") if exitonfail: raise TestComparisonError(message) success(label) return True
[docs]def compare_integers(expected, computed, label): """Function to compare two integers. Prints :py:func:`util.success` when value *computed* matches value *expected*. Performs a system exit on failure. Used in input files in the test suite. """ if (expected != computed): message = ("\t%s: computed value (%d) does not match (%d)." % (label, computed, expected)) raise TestComparisonError(message) success(label) return True
[docs]def compare_strings(expected, computed, label): """Function to compare two strings. Prints :py:func:`util.success` when string *computed* exactly matches string *expected*. Performs a system exit on failure. Used in input files in the test suite. """ if (expected != computed): message = ("\t%s: computed value (%s) does not match (%s)." % (label, computed, expected)) raise TestComparisonError(message) success(label) return True
[docs]def compare_matrices(expected, computed, digits, label): """Function to compare two matrices. Prints :py:func:`util.success` when elements of matrix *computed* match elements of matrix *expected* to number of *digits*. Performs a system exit on failure to match symmetry structure, dimensions, or element values. Used in input files in the test suite. """ if (expected.nirrep() != computed.nirrep()): message = ("\t%s has %d irreps, but %s has %d\n." % (expected.name(), expected.nirrep(), computed.name(), computed.nirrep())) raise TestComparisonError(message) if (expected.symmetry() != computed.symmetry()): message = ("\t%s has %d symmetry, but %s has %d\n." % (expected.name(), expected.symmetry(), computed.name(), computed.symmetry())) raise TestComparisonError(message) nirreps = expected.nirrep() symmetry = expected.symmetry() for irrep in range(nirreps): if (expected.rows(irrep) != computed.rows(irrep)): message = ("\t%s has %d rows in irrep %d, but %s has %d\n." % (expected.name(), expected.rows(irrep), irrep, computed.name(), computed.rows(irrep))) raise TestComparisonError(message) if (expected.cols(irrep ^ symmetry) != computed.cols(irrep ^ symmetry)): message = ("\t%s has %d columns in irrep, but %s has %d\n." % (expected.name(), expected.cols(irrep), irrep, computed.name(), computed.cols(irrep))) raise TestComparisonError(message) rows = expected.rows(irrep) cols = expected.cols(irrep ^ symmetry) failed = 0 for row in range(rows): for col in range(cols): if (abs(expected.get(irrep, row, col) - computed.get(irrep, row, col)) > 10**(-digits)): print("\t%s: computed value (%s) does not match (%s)." % (label, expected.get(irrep, row, col), computed.get(irrep, row, col))) failed = 1 break if (failed): print("Check your output file for reporting of the matrices.") core.print_out("The Failed Test Matrices\n") core.print_out("Computed Matrix (2nd matrix passed in)\n") computed.print_out() core.print_out("Expected Matrix (1st matrix passed in)\n") expected.print_out() raise TestComparisonError("\n") success(label) return True
[docs]def compare_vectors(expected, computed, digits, label): """Function to compare two vectors. Prints :py:func:`util.success` when elements of vector *computed* match elements of vector *expected* to number of *digits*. Performs a system exit on failure to match symmetry structure, dimension, or element values. Used in input files in the test suite. """ if (expected.nirrep() != computed.nirrep()): message = ("\t%s has %d irreps, but %s has %d\n." % (expected.name(), expected.nirrep(), computed.name(), computed.nirrep())) raise TestComparisonError(message) nirreps = expected.nirrep() for irrep in range(nirreps): if (expected.dim(irrep) != computed.dim(irrep)): message = ("\tThe reference has %d entries in irrep %d, but the computed vector has %d\n." % (expected.dim(irrep), irrep, computed.dim(irrep))) raise TestComparisonError(message) dim = expected.dim(irrep) failed = 0 for entry in range(dim): if (abs(expected.get(irrep, entry) - computed.get(irrep, entry)) > 10**(-digits)): failed = 1 break if (failed): core.print_out("The computed vector\n") computed.print_out() core.print_out("The reference vector\n") expected.print_out() message = ("\t%s: computed value (%s) does not match (%s)." % (label, computed.get(irrep, entry), expected.get(irrep, entry))) raise TestComparisonError(message) success(label) return True
[docs]def compare_arrays(expected, computed, digits, label, rtol=1.e-16): """Function to compare two numpy arrays. Prints :py:func:`util.success` when elements of vector *computed* match elements of vector *expected* to number of *digits*. Performs a system exit on failure to match symmetry structure, dimension, or element values. Used in input files in the test suite. Sets rtol to zero to match expected Psi4 behaviour, otherwise measured as: absolute(computed - expected) <= (atol + rtol * absolute(expected)) """ try: expected = np.asarray(expected) computed = np.asarray(computed) shape1 = expected.shape shape2 = computed.shape except: raise TestComparisonError("Input objects do not have a shape attribute.") if shape1 != shape2: TestComparisonError("Input shapes do not match.") tol = 10**(-digits) if not np.allclose(expected, computed, atol=tol, rtol=rtol): message = "\tArray difference norm is %12.6e." % np.linalg.norm(expected - computed) raise TestComparisonError(message) success(label) return True
[docs]def compare_cubes(expected, computed, label): """Function to compare two cube files. Prints :py:func:`util.success` when value *computed* matches value *expected*. Performs a system exit on failure. Used in input files in the test suite. """ # Grab grid points. Skip the first nine lines and the last one evec = np.genfromtxt(expected, skip_header=9, skip_footer=1) cvec = np.genfromtxt(computed, skip_header=9, skip_footer=1) if evec.size == cvec.size: if not np.allclose(cvec, evec, rtol=5e-05, atol=1e-10): message = ("\t%s: computed cube file does not match expected cube file." % label) raise TestComparisonError(message) else: message = ("\t%s: computed cube file does not match size of expected cube file." % label) raise TestComparisonError(message) success(label) return True
[docs]def compare_wavefunctions(expected, computed, digits=9, label='Wavefunctions equal'): """Function to compare two wavefunctions. Prints :py:func:`util.success` when value *computed* matches value *expected*. Performs a system exit on failure. Used in input files in the test suite. """ # yapf: disable if expected.Ca(): compare_matrices(expected.Ca(), computed.Ca(), digits, 'compare Ca') if expected.Cb(): compare_matrices(expected.Cb(), computed.Cb(), digits, 'compare Cb') if expected.Da(): compare_matrices(expected.Da(), computed.Da(), digits, 'compare Da') if expected.Db(): compare_matrices(expected.Db(), computed.Db(), digits, 'compare Db') if expected.Fa(): compare_matrices(expected.Fa(), computed.Fa(), digits, 'compare Fa') if expected.Fb(): compare_matrices(expected.Fb(), computed.Fb(), digits, 'compare Fb') if expected.H(): compare_matrices(expected.H(), computed.H(), digits, 'compare H') if expected.S(): compare_matrices(expected.S(), computed.S(), digits, 'compare S') if expected.X(): compare_matrices(expected.X(), computed.X(), digits, 'compare X') if expected.aotoso(): compare_matrices(expected.aotoso(), computed.aotoso(), digits, 'compare aotoso') if expected.gradient(): compare_matrices(expected.gradient(), computed.gradient(), digits, 'compare gradient') if expected.hessian(): compare_matrices(expected.hessian(), computed.hessian(), digits, 'compare hessian') if expected.epsilon_a(): compare_vectors(expected.epsilon_a(), computed.epsilon_a(), digits, 'compare epsilon_a') if expected.epsilon_b(): compare_vectors(expected.epsilon_b(), computed.epsilon_b(), digits, 'compare epsilon_b') if expected.frequencies(): compare_vectors(expected.frequencies(), computed.frequencies(), digits, 'compare frequencies') # yapf: enable compare_integers(expected.nalpha(), computed.nalpha(), 'compare nalpha') compare_integers(expected.nbeta(), computed.nbeta(), 'compare nbeta') compare_integers(expected.nfrzc(), computed.nfrzc(), 'compare nfrzc') compare_integers(expected.nirrep(), computed.nirrep(), 'compare nirrep') compare_integers(expected.nmo(), computed.nmo(), 'compare nmo') compare_integers(expected.nso(), computed.nso(), 'compare nso') compare_strings(expected.name(), computed.name(), 'compare name') compare_values(expected.energy(), computed.energy(), digits, 'compare energy') compare_values(expected.efzc(), computed.efzc(), digits, 'compare frozen core energy') compare_values(expected.get_dipole_field_strength()[0], computed.get_dipole_field_strength()[0], digits, 'compare dipole field strength x') compare_values(expected.get_dipole_field_strength()[1], computed.get_dipole_field_strength()[1], digits, 'compare dipole field strength y') compare_values(expected.get_dipole_field_strength()[2], computed.get_dipole_field_strength()[2], digits, 'compare dipole field strength z') compare_strings(expected.basisset().name(), computed.basisset().name(), 'compare basis set name') compare_integers(expected.basisset().nbf(), computed.basisset().nbf(), 'compare number of basis functions in set') compare_integers(expected.basisset().nprimitive(), computed.basisset().nprimitive(), 'compare total number of primitives in basis set') compare_strings(expected.molecule().name(), computed.molecule().name(), 'compare molecule name') compare_strings(expected.molecule().get_full_point_group(), computed.molecule().get_full_point_group(), 'compare molecule point group') compare_matrices(expected.molecule().geometry(), computed.molecule().geometry(), digits, 'compare molecule geometry') success(label) return True
# Uncomment and use if compare_arrays above is inadequate #def compare_lists(expected, computed, digits, label): # """Function to compare two Python lists. Prints :py:func:`util.success` # when elements of vector *computed* match elements of vector *expected* to # number of *digits*. Performs a system exit on failure to match symmetry # structure, dimension, or element values. Used in input files in the test suite. # # """ # if len(expected) != len(computed): # message = ("\tThe reference has %d entries, but the computed vector has %d\n." % (len(expected), len(computed))) # raise TestComparisonError(message) # dim = len(expected) # failed = 0 # for entry in range(dim): # if(abs(expected[entry] - computed[entry]) > 10 ** (-digits)): # print("\t%s: computed value (%s) does not match (%s)." % (label, computed[entry], expected[entry])) # failed = 1 # break # # if(failed): # core.print_out("The computed vector\n") # computed.print_out() # core.print_out("The reference vector\n") # expected.print_out() # message = ("\t%s: computed list does not match expected list." % (label, computed, expected)) # raise TestComparisonError(message) # success(label)
[docs]def copy_file_to_scratch(filename, prefix, namespace, unit, move=False): """Function to move file into scratch with correct naming convention. Arguments: @arg filename full path to file @arg prefix computation prefix, usually 'psi' @arg namespace context namespace, usually molecule name @arg unit unit number, e.g. 32 @arg move copy or move? (default copy) Example: Assume PID is 12345 and SCRATCH is /scratch/parrish/ copy_file_to_scratch('temp', 'psi', 'h2o', 32): -cp ./temp /scratch/parrish/psi.12345.h2o.32 copy_file_to_scratch('/tmp/temp', 'psi', 'h2o', 32): -cp /tmp/temp /scratch/parrish/psi.12345.h2o.32 copy_file_to_scratch('/tmp/temp', 'psi', '', 32): -cp /tmp/temp /scratch/parrish/psi.12345.32 copy_file_to_scratch('/tmp/temp', 'psi', '', 32, True): -mv /tmp/temp /scratch/parrish/psi.12345.32 """ pid = str(os.getpid()) scratch = core.IOManager.shared_object().get_file_path(int(unit)) cp = '/bin/cp' if move: cp = '/bin/mv' unit = str(unit) target = '' target += prefix target += '.' target += pid if len(namespace): target += '.' target += namespace target += '.' target += unit command = ('%s %s %s/%s' % (cp, filename, scratch, target)) os.system(command)
#print command
[docs]def copy_file_from_scratch(filename, prefix, namespace, unit, move=False): """Function to move file out of scratch with correct naming convention. Arguments: @arg filename full path to target file @arg prefix computation prefix, usually 'psi' @arg namespace context namespace, usually molecule name @arg unit unit number, e.g. 32 @arg move copy or move? (default copy) Example: Assume PID is 12345 and SCRATCH is /scratch/parrish/ copy_file_to_scratch('temp', 'psi', 'h2o', 32): -cp /scratch/parrish/psi.12345.h2o.32 .temp copy_file_to_scratch('/tmp/temp', 'psi', 'h2o', 32): -cp /scratch/parrish/psi.12345.h2o.32 /tmp/temp copy_file_to_scratch('/tmp/temp', 'psi', '', 32): -cp /scratch/parrish/psi.12345.32 /tmp/temp copy_file_to_scratch('/tmp/temp', 'psi', '', 32, True): -mv /scratch/parrish/psi.12345.32 /tmp/temp """ pid = str(os.getpid()) scratch = core.IOManager.shared_object().get_file_path(int(unit)) cp = '/bin/cp' if move: cp = '/bin/mv' unit = str(unit) target = '' target += prefix target += '.' target += pid if len(namespace): target += '.' target += namespace target += '.' target += unit command = ('%s %s/%s %s' % (cp, scratch, target, filename)) os.system(command)
[docs]def xml2dict(filename=None): """Read XML *filename* into nested OrderedDict-s. *filename* defaults to active CSX file. """ warnings.warn( "Using `psi4.driver.p4util.xml2dict` is deprecated (silently in 1.2), and in 1.3 it will stop working\n", category=FutureWarning, stacklevel=2) import xmltodict as xd if filename is None: csx = os.path.splitext(core.outfile_name())[0] + '.csx' else: csx = filename with open(csx, 'r') as handle: csxdict = xd.parse(handle) return csxdict
[docs]def getFromDict(dataDict, mapList): warnings.warn( "Using `psi4.driver.p4util.getFromDict` is deprecated (silently in 1.2), and in 1.3 it will stop working\n", category=FutureWarning, stacklevel=2) return reduce(lambda d, k: d[k], mapList, dataDict)
[docs]def csx2endict(): """Grabs the CSX file as a dictionary, encodes translation of PSI variables to XML blocks, gathers all available energies from CSX file into returned dictionary. """ warnings.warn( "Using `psi4.driver.p4util.csx2endict` is deprecated (silently in 1.2), and in 1.3 it will stop working\n", category=FutureWarning, stacklevel=2) blockprefix = [ 'chemicalSemantics', 'molecularCalculation', 'quantumMechanics', 'singleReferenceState', 'singleDeterminant' ] blockmidfix = ['energies', 'energy'] prefix = 'cs:' pv2xml = { 'MP2 CORRELATION ENERGY': [['mp2'], 'correlation'], 'MP2 SAME-SPIN CORRELATION ENERGY': [['mp2'], 'sameSpin correlation'], 'HF TOTAL ENERGY': [['abinitioScf'], 'electronic'], 'NUCLEAR REPULSION ENERGY': [['abinitioScf'], 'nuclearRepulsion'], 'DFT FUNCTIONAL TOTAL ENERGY': [['dft'], 'dftFunctional'], 'DFT TOTAL ENERGY': [['dft'], 'electronic'], 'DOUBLE-HYBRID CORRECTION ENERGY': [['dft'], 'doubleHybrid correction'], 'DISPERSION CORRECTION ENERGY': [['dft'], 'dispersion correction'], } csxdict = xml2dict() enedict = {} for pv, lpv in pv2xml.items(): address = blockprefix + lpv[0] + blockmidfix indices = [prefix + bit for bit in address] try: qwer = getFromDict(csxdict, indices) except KeyError: continue for v in qwer: vv = v.values() if vv[0] == prefix + lpv[1]: enedict[pv] = float(vv[1]) return enedict
[docs]def compare_csx(): """Function to validate energies in CSX files against PSIvariables. Only active if write_csx flag on. """ warnings.warn( "Using `psi4.driver.p4util.compare_csx` is deprecated (silently in 1.2), and in 1.3 it will stop working\n", category=FutureWarning, stacklevel=2) if 'csx4psi' in sys.modules.keys(): if core.get_global_option('WRITE_CSX'): enedict = csx2endict() compare_integers(len(enedict) >= 2, True, 'CSX harvested') for pv, en in enedict.items(): compare_values(core.variable(pv), en, 6, 'CSX ' + pv + ' ' + str(round(en, 4)))