"""
calphy: a Python library and command line interface for automated free
energy calculations.
Copyright 2021 (c) Sarath Menon^1, Yury Lysogorskiy^2, Ralf Drautz^2
^1: Max Planck Institut für Eisenforschung, Dusseldorf, Germany
^2: Ruhr-University Bochum, Bochum, Germany
calphy is published and distributed under the Academic Software License v1.0 (ASL).
calphy is distributed in the hope that it will be useful for non-commercial academic research,
but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
calphy API is published and distributed under the BSD 3-Clause "New" or "Revised" License
See the LICENSE FILE for more details.
More information about the program can be found in:
Menon, Sarath, Yury Lysogorskiy, Jutta Rogal, and Ralf Drautz.
“Automated Free Energy Calculation from Atomistic Simulations.” Physical Review Materials 5(10), 2021
DOI: 10.1103/PhysRevMaterials.5.103801
For more information contact:
sarath.menon@ruhr-uni-bochum.de/yury.lysogorskiy@icams.rub.de
"""
from typing_extensions import Annotated
from typing import Any, Callable, Dict, List, ClassVar, Literal, Optional, Union
from pydantic import (
BaseModel,
Field,
ValidationError,
model_validator,
field_validator,
conlist,
PrivateAttr,
)
from pydantic.functional_validators import AfterValidator, BeforeValidator
from annotated_types import Len
import mendeleev
from tqdm import tqdm
import yaml
import numpy as np
import copy
import datetime
import itertools
import os
import warnings
from pyscal3 import System
from pyscal3.core import structure_dict, element_dict, _make_crystal
from ase.io import read, write
import shutil
__version__ = "1.8.4"
def _check_equal(val):
if not (val[0] == val[1] == val[2]):
return False
return True
[docs]def to_list(v: Any) -> List[Any]:
return np.atleast_1d(v)
def _to_str(val):
if np.isscalar(val):
return str(val)
else:
return [str(x) for x in val]
def _to_int(val):
if np.isscalar(val):
return int(val)
else:
return [int(x) for x in val]
def _to_none(val):
if val in [
"none",
"None",
]:
return None
return val
def _to_float(val):
if np.isscalar(val):
return float(val)
else:
return [float(x) for x in val]
def _extract_elements_from_pair_coeff(pair_coeff_string):
"""
Extract element symbols from pair_coeff string.
Returns None if pair_coeff doesn't contain element specifications.
Parameters
----------
pair_coeff_string : str
The pair_coeff command string (e.g., "* * potential.eam.fs Cu Zr")
Returns
-------
list or None
List of element symbols in order, or None if no elements found
"""
if pair_coeff_string is None:
return None
pcsplit = pair_coeff_string.strip().split()
elements = []
# Start collecting after we find element symbols
# Elements are typically after the potential filename
started = False
for p in pcsplit:
# Check if this looks like an element symbol
# Element symbols are 1-2 characters, start with uppercase
if len(p) <= 2 and p[0].isupper():
try:
# Verify it's a valid element using mendeleev
_ = mendeleev.element(p)
elements.append(p)
started = True
except Exception:
# Not a valid element, might be done collecting
if started:
# We already started collecting elements and hit a non-element
break
return elements if len(elements) > 0 else None
[docs]class UFMP(BaseModel, title="UFM potential input options"):
p: Annotated[float, Field(default=50.0)]
sigma: Annotated[float, Field(default=1.5)]
[docs]class MonteCarlo(
BaseModel, title="Options for Monte Carlo moves during particle swap moves"
):
n_steps: Annotated[
int, Field(default=1, gt=0, description="perform swap moves every n_step")
]
n_swaps: Annotated[
int, Field(default=0, ge=0, description="number of swap moves to perform")
]
forward_swap_types: Annotated[
List[int],
Field(default=[], description="atom types to swap during forward integration"),
]
reverse_swap_types: Annotated[
List[int],
Field(default=[], description="atom types to swap during reverse integration"),
]
allow_all_swaps: Annotated[
bool,
Field(
default=True,
description="allow swapping between all atom types including fictitious ones",
),
]
use_custom_lammps: Annotated[
bool,
Field(
default=False,
description="Whether to use the custom modified LAMMPS version",
),
]
[docs]class CompositionScaling(BaseModel, title="Composition scaling input options"):
_input_chemical_composition: PrivateAttr(default=None)
output_chemical_composition: Annotated[dict, Field(default={}, required=False)]
restrictions: Annotated[
List[str], BeforeValidator(to_list), Field(default=[], required=False)
]
[docs]class MD(BaseModel, title="MD specific input options"):
timestep: Annotated[
float,
Field(
default=0.001,
description="timestep for md simulation",
example="timestep: 0.001",
),
]
n_small_steps: Annotated[int, Field(default=10000, gt=0)]
n_every_steps: Annotated[int, Field(default=10, gt=0)]
n_repeat_steps: Annotated[int, Field(default=10, gt=0)]
n_cycles: Annotated[int, Field(default=100, gt=0)]
thermostat_damping: Annotated[
Union[float, conlist(float, min_length=2, max_length=2)],
Field(default=0.1, gt=0),
]
barostat_damping: Annotated[
Union[float, conlist(float, min_length=2, max_length=2)],
Field(default=0.1, gt=0),
]
cmdargs: Annotated[str, Field(default="")]
init_commands: Annotated[List, Field(default=[])]
[docs]class NoseHoover(BaseModel, title="Specific input options for Nose-Hoover thermostat"):
thermostat_damping: Annotated[float, Field(default=0.1, gt=0)]
barostat_damping: Annotated[float, Field(default=0.1, gt=0)]
[docs]class Berendsen(BaseModel, title="Specific input options for Berendsen thermostat"):
thermostat_damping: Annotated[float, Field(default=100.0, gt=0)]
barostat_damping: Annotated[float, Field(default=100.0, gt=0)]
[docs]class QuantumThermalBath(BaseModel, title="Dammak quantum thermal bath (LAMMPS fix qtb)"):
"""
Colored-noise Langevin thermostat that injects quantum statistics into
classical MD (Dammak et al., Phys. Rev. Lett. 103, 190601, 2009).
Active when ``mode: fe-qtb`` is set at the top level. The QTB
thermostat is paired with ``fix nph`` (NPT) or ``fix nve`` (NVT)
inside calphy; do NOT also pair with Nose-Hoover or Langevin.
"""
thermostat_damping: Annotated[float, Field(default=0.1, gt=0)]
barostat_damping: Annotated[float, Field(default=0.1, gt=0)]
f_max: Annotated[float, Field(default=200.0, gt=0,
description="Upper cutoff frequency for QTB power spectrum (THz). "
"Must exceed the highest phonon frequency of the system.")]
n_f: Annotated[int, Field(default=100, gt=0,
description="Number of frequency bins discretising the QTB spectrum.")]
seed: Annotated[int, Field(default=880302, gt=0)]
[docs]class Queue(BaseModel, title="Options for configuring queue"):
scheduler: Annotated[str, Field(default="local")]
cores: Annotated[int, Field(default=1, gt=0)]
jobname: Annotated[str, Field(default="calphy")]
walltime: Annotated[str, Field(default="23:59:00")]
queuename: Annotated[str, Field(default="")]
memory: Annotated[str, Field(default="3GB")]
commands: Annotated[List, Field(default=[])]
options: Annotated[Dict[str, str], Field(default={})]
[docs]class Tolerance(BaseModel, title="Tolerance settings for convergence"):
lattice_constant: Annotated[float, Field(default=0.0002, ge=0)]
spring_constant: Annotated[float, Field(default=0.1, gt=0)]
# Structural phase-stability checks during equilibration are OFF by
# default: solid_fraction=0 means the melt check (solid_fraction < this)
# never fires, and liquid_fraction=1.0 means the solidify check
# (solid_fraction > this) never fires. Set solid_fraction > 0 (e.g. 0.7)
# to re-enable melt detection for a solid run, or liquid_fraction < 1
# (e.g. 0.05) to re-enable solidification detection for a liquid run.
solid_fraction: Annotated[float, Field(default=0.0, ge=0)]
liquid_fraction: Annotated[float, Field(default=1.0, ge=0)]
pressure: Annotated[float, Field(default=10.0, ge=0)]
[docs]class PhaseTransitionDetection(BaseModel, title="Settings for the pre-flight temperature-range scan"):
mode: Annotated[
Literal["none", "adapt", "warn", "stop"],
Field(
default="none",
description=(
"Controls the pre-flight temperature-range scan that runs "
"before a reversible-scaling (ts) sweep. The scan performs a "
"single fast real-thermostat temperature ramp (T0 -> Tf under "
"NPT) and watches the fluctuation response functions for the "
"onset of a phase transition.\n"
" 'none' — the scan is disabled; the ts sweep runs over the "
"requested [T0, Tf] range as-is (default).\n"
" 'adapt' — if the scan detects a transition, reduce the upper "
"temperature to the detected clean onset and run the ts sweep "
"over [T0, T_clean] (keeping the same number of switching "
"steps). If the scan is clean the range is unchanged.\n"
" 'warn' — run the scan and log the detected clean range, but "
"do NOT modify the calculation; the ts sweep runs over the full "
"requested range. Use this to observe detection without "
"changing the outcome.\n"
" 'stop' — if a transition is detected, raise "
"PhaseTransitionError reporting the clean range so you can "
"re-submit with a corrected temperature range."
),
),
]
prescan_steps: Annotated[
int,
Field(
default=20000,
ge=1000,
description=(
"Number of MD steps for the pre-flight temperature ramp from "
"T0 to Tf. Typically a little shorter than the production "
"switching length. Note that a *faster* ramp superheats (or "
"supercools) the metastable phase further and yields noisier "
"response-function signals, pushing the detected onset closer "
"to the collapse; if the adapted ts sweep melts/freezes during "
"its equilibration, increase this (or lower onset_fraction). "
"Default 20000."
),
),
]
onset_fraction: Annotated[
float,
Field(
default=0.85,
gt=0,
le=1.0,
description=(
"Fractional safety margin applied to the detected onset: the "
"adapted upper temperature is "
"T_clean = T0 + onset_fraction * (T_onset - T0). The detected "
"onset is the foot of the deviation in a *fast* diagnostic ramp, "
"but a reversible-scaling sweep then EQUILIBRATES at the boundary "
"for many steps and the metastable phase has more time to "
"nucleate the transition there — so its practical stability "
"limit is somewhat below the ramp onset, and the exact onset is "
"also noisy. Backing the boundary off by a fraction of the "
"super-heated/cooled span makes the adapted sweep robust to both "
"effects. 1.0 disables the margin (cut exactly at the onset); "
"lower values give more margin (and less usable range). This is "
"the main knob to turn if an adapted ts run still melts/freezes. "
"Default 0.85."
),
),
]
# Detector-internal calibration (peak_threshold, min_agreement, onset_sigma,
# onset_level, the smoothing/fluctuation windows, slope-break parameters …)
# are intentionally NOT exposed here. They are stable defaults living in
# calphy.range_scan.RangeScan; onset_sigma in particular had no effect on the
# result (the slope-break onset always dominates the final boundary), and
# onset_level overlapped with onset_fraction. Users tune the scan through
# mode, prescan_steps and onset_fraction only.
[docs]class MeltingTemperature(BaseModel, title="Input options for melting temperature mode"):
guess: Annotated[Union[float, None], Field(default=None, gt=0)]
step: Annotated[int, Field(default=200, ge=20)]
attempts: Annotated[int, Field(default=5, ge=1)]
[docs]class MaterialsProject(BaseModel, title="Input options for materials project"):
api_key: Annotated[str, Field(default="", exclude=True)]
conventional: Annotated[bool, Field(default=True)]
target_natoms: Annotated[
int,
Field(
default=1500,
description="The structure parsed from materials project would be repeated to approximately this value",
),
]
[docs] @field_validator("api_key", mode="after")
def resolve_api_key(cls, v: str) -> str:
if not v:
return v
value = os.getenv(v)
if not value:
raise ValueError(
f"Environment variable '{v}' not found or empty. "
f"Set it before running, e.g.:\n export {v}='your_api_key_here'"
)
return value
[docs]class Calculation(BaseModel, title="Main input class"):
monte_carlo: Optional[MonteCarlo] = MonteCarlo()
composition_scaling: Optional[CompositionScaling] = CompositionScaling()
md: Optional[MD] = MD()
nose_hoover: Optional[NoseHoover] = NoseHoover()
berendsen: Optional[Berendsen] = Berendsen()
quantum_thermal_bath: Optional[QuantumThermalBath] = QuantumThermalBath()
queue: Optional[Queue] = Queue()
tolerance: Optional[Tolerance] = Tolerance()
phase_transition_detection: Optional[PhaseTransitionDetection] = PhaseTransitionDetection()
uhlenbeck_ford_model: Optional[UFMP] = UFMP()
melting_temperature: Optional[MeltingTemperature] = MeltingTemperature()
materials_project: Optional[MaterialsProject] = MaterialsProject()
element: Annotated[List[str], BeforeValidator(to_list), Field(default=[])]
n_elements: Annotated[int, Field(default=0)]
mass: Annotated[List[float], BeforeValidator(to_list), Field(default=[])]
_element_dict: dict = PrivateAttr(default={})
kernel: Annotated[int, Field(default=0)]
inputfile: Annotated[str, Field(default="")]
mode: Annotated[Union[str, None], Field(default=None)]
lattice: Annotated[str, Field(default="")]
file_format: Annotated[str, Field(default="lammps-data")]
# pressure properties
pressure: Annotated[
Union[
None,
float,
conlist(float, min_length=1, max_length=3), # Allow lists of 1-3 floats
conlist(
conlist(float, min_length=3, max_length=3), min_length=1, max_length=2
),
],
Field(default=0),
]
_pressure: float = PrivateAttr(default=None)
_pressure_stop: float = PrivateAttr(default=None)
_pressure_input: Any = PrivateAttr(default=None)
_iso: bool = PrivateAttr(default=False)
_pressure_coupling: str = PrivateAttr(default="iso")
_fix_lattice: bool = PrivateAttr(default=False)
pressure_coupling: Annotated[
Union[str, None],
Field(
default=None,
description="Pressure coupling style for LAMMPS barostat: 'iso', 'aniso', or 'tri'. "
"If not set, 'iso' is used for scalar/1D pressure and 'aniso' for 3-component pressure.",
),
]
temperature: Annotated[
Union[float, conlist(float, min_length=1, max_length=2)], Field(default=0)
]
temperature_high: Annotated[float, Field(default=0.0)]
_temperature: float = PrivateAttr(default=None)
_temperature_high: float = PrivateAttr(default=None)
_temperature_stop: float = PrivateAttr(default=None)
_temperature_input: float = PrivateAttr(default=None)
melting_cycle: Annotated[bool, Field(default=True)]
pair_style: Annotated[
Union[List[str], None], BeforeValidator(to_list), Field(default=None)
]
pair_coeff: Annotated[
Union[List[str], None], BeforeValidator(to_list), Field(default=None)
]
pair_mode: Annotated[Union[str, None], Field(default=None)]
potential_file: Annotated[Union[str, None], Field(default=None)]
fix_potential_path: Annotated[bool, Field(default=True)]
_pair_style_with_options: List[str] = PrivateAttr(default=None)
reference_phase: Annotated[str, Field(default="")]
lattice_constant: Annotated[float, Field(default=0)]
repeat: Annotated[
conlist(int, min_length=3, max_length=3), Field(default=[1, 1, 1])
]
script_mode: Annotated[bool, Field(default=False)]
lammps_executable: Annotated[Union[str, None], Field(default=None)]
mpi_executable: Annotated[Union[str, None], Field(default=None)]
npt: Annotated[bool, Field(default=True)]
n_equilibration_steps: Annotated[int, Field(default=25000)]
n_switching_steps: Annotated[
Union[int, conlist(int, min_length=2, max_length=2)],
Field(default=[50000, 50000]),
]
_n_switching_steps: int = PrivateAttr(default=50000)
_n_sweep_steps: int = PrivateAttr(default=50000)
n_print_steps: Annotated[int, Field(default=0)]
n_iterations: Annotated[int, Field(default=1)]
lambda_schedule: Annotated[str, Field(default="linear")]
equilibration_control: Annotated[Union[str, None], Field(default=None)]
folder_prefix: Annotated[Union[str, None], Field(default=None)]
# add second level options; for example spring constants
spring_constants: Annotated[Union[List[float], None], Field(default=None)]
# some input keywords that will be used for the phase diagram mode
phase_name: Annotated[str, Field(default="")]
reference_composition: Annotated[float, Field(default=0.00)]
# structure items
_structure: Any = PrivateAttr(default=None)
# just check for nlements in compscale
_totalelements = PrivateAttr(default=0)
# internal flag set when mode == "fe-qtb"; threaded through phase.py /
# solid.py to switch the thermostat to QTB and the Einstein reference
# to the quantum harmonic-oscillator form.
_qtb: bool = PrivateAttr(default=False)
@model_validator(mode="after")
def _validate_all(self) -> "Input":
if not (len(self.element) == len(self.mass)):
raise ValueError("mass and elements should have same length")
# QTB-flavoured fe mode. We resolve it here so all downstream
# dispatch sees mode=="fe" and the _qtb flag drives QTB behaviour.
if self.mode == "fe-qtb":
if self.reference_phase and self.reference_phase.lower() == "liquid":
raise ValueError(
"mode=fe-qtb is solids-only. The Uhlenbeck-Ford liquid "
"reference is intrinsically classical and cannot be paired "
"with QTB sampling."
)
self._qtb = True
self.mode = "fe"
self.n_elements = len(self.element)
if self.potential_file is not None:
warnings.warn(
"potential_file is deprecated and will be removed in a future version. "
"Use pair_style/pair_coeff (with pair_mode: overlay for multi-potential setups) instead.",
DeprecationWarning,
stacklevel=2,
)
if self.pair_mode is not None:
self.pair_mode = self.pair_mode.lower()
if self.pair_mode not in ["overlay"]:
raise ValueError("pair_mode should be one of: overlay")
self.lambda_schedule = self.lambda_schedule.lower()
if self.lambda_schedule not in ["linear", "uniform_temperature"]:
raise ValueError(
"lambda_schedule must be one of: 'linear', 'uniform_temperature'"
)
if self.pair_mode == "overlay":
if self.pair_style is None or self.pair_coeff is None:
raise ValueError("pair_mode overlay requires pair_style and pair_coeff")
if len(self.pair_style) != len(self.pair_coeff):
raise ValueError(
"pair_mode overlay requires pair_style and pair_coeff to have the same length"
)
for ps in self.pair_style:
if ps.split()[0].startswith("hybrid"):
raise ValueError(
"pair_mode overlay expects component pair styles, not a hybrid pair_style"
)
# Validate element/mass/pair_coeff ordering consistency
# This is critical for multi-element systems where LAMMPS type numbers
# are assigned based on element order: element[0]=Type1, element[1]=Type2, etc.
if (
len(self.element) > 1
and self.pair_coeff is not None
and len(self.pair_coeff) > 0
):
extracted_elements = _extract_elements_from_pair_coeff(self.pair_coeff[0])
if extracted_elements is not None:
# pair_coeff specifies elements - check ordering
if set(extracted_elements) != set(self.element):
raise ValueError(
f"Element mismatch between 'element' and 'pair_coeff'!\n"
f" element: {self.element}\n"
f" pair_coeff: {extracted_elements}\n"
f"The elements specified must be the same."
)
if list(extracted_elements) != list(self.element):
raise ValueError(
f"Element ordering mismatch detected!\n\n"
f" element: {self.element}\n"
f" pair_coeff: {extracted_elements}\n"
f" mass: {self.mass}\n\n"
f"For multi-element systems, all three must be in the SAME order.\n\n"
f"Why this matters:\n"
f" - Element order determines LAMMPS type numbers:\n"
f" element[0] → Type 1, element[1] → Type 2, etc.\n"
f" - The pair_coeff elements must match this type order\n"
f" - The mass values must correspond to the same order\n"
f" - Composition transformations depend on this ordering\n\n"
f"Please reorder your input so element, mass, and pair_coeff\n"
f"all use the same element ordering."
)
if self.pressure_coupling is not None:
self.pressure_coupling = self.pressure_coupling.lower()
if self.pressure_coupling not in ("iso", "aniso", "tri"):
raise ValueError(
"pressure_coupling must be one of 'iso', 'aniso', or 'tri'"
)
self._pressure_input = copy.copy(self.pressure)
if self.pressure is None:
self._iso = True
self._fix_lattice = True
self._pressure = None
self._pressure_stop = None
elif np.isscalar(self.pressure):
self._pressure = self.pressure
self._pressure_stop = self.pressure
self._iso = True
self._fix_lattice = False
elif np.shape(self.pressure) == (1,):
self._iso = True
self._fix_lattice = False
self._pressure = self.pressure[0]
self._pressure_stop = self.pressure[0]
elif np.shape(self.pressure) == (2,):
self._iso = True
self._fix_lattice = False
self._pressure = self.pressure[0]
self._pressure_stop = self.pressure[1]
elif np.shape(self.pressure) == (1, 3):
if not _check_equal(self.pressure[0]):
raise ValueError("All pressure terms must be equal")
self._iso = False
self._fix_lattice = False
self._pressure = self.pressure[0][0]
self._pressure_stop = self.pressure[0][0]
elif np.shape(self.pressure) == (2, 3):
if not (_check_equal(self.pressure[0]) and _check_equal(self.pressure[1])):
raise ValueError("All pressure terms must be equal")
self._iso = False
self._fix_lattice = False
self._pressure = self.pressure[0][0]
self._pressure_stop = self.pressure[1][0]
else:
raise ValueError("Unknown format for pressure")
# Resolve the final LAMMPS barostat keyword.
# An explicit pressure_coupling always takes priority; otherwise
# fall back to the legacy _iso flag (True → "iso", False → "aniso").
if self.pressure_coupling is not None:
self._pressure_coupling = self.pressure_coupling
else:
self._pressure_coupling = "iso" if self._iso else "aniso"
self._temperature_input = copy.copy(self.temperature)
# guess a melting temp of the system, this will be mostly ignored
# chem = mendeleev.element(self.element[0])
# self._melting_temperature = chem.melting_point
try:
chem = mendeleev.element(self.element[0])
self._melting_temperature = chem.melting_point
except Exception:
self._melting_temperature = None
if self.temperature == 0:
# the only situation in which it can be None is if mode is melting temp
if len(self.element) > 1:
raise ValueError(
"Cannot guess start temperature for more than one species, please specify"
)
# now try to guess
if self._melting_temperature is None:
raise ValueError(
"Could not guess start temperature for more than one species, please specify"
)
self._temperature = self._melting_temperature
self._temperature_stop = self._melting_temperature
if self.temperature_high == 0:
self._temperature_high = 2 * self._melting_temperature
else:
self._temperature_high = self.temperature_high
elif np.shape(np.atleast_1d(self.temperature)) == (1,):
temp = np.atleast_1d(self.temperature)
self._temperature = temp[0]
self._temperature_stop = temp[0]
if self.temperature_high == 0:
self._temperature_high = 2 * temp[0]
else:
self._temperature_high = self.temperature_high
elif np.shape(self.temperature) == (2,):
temp = self.temperature
self._temperature = temp[0]
self._temperature_stop = temp[1]
if self.temperature_high == 0:
self._temperature_high = 2 * temp[1]
else:
self._temperature_high = self.temperature_high
# fix pair styles
# two main lists
# _pair_style_with_options, read in as it is from file
# _pair_style_names, just the names of the pair styles
_pair_style_names = []
for ps in self.pair_style:
ps_split = ps.split()
_pair_style_names.append(ps_split[0])
# only set if its None
self._pair_style_with_options = self.pair_style
self._pair_style_names = _pair_style_names
# now fix pair coeffs with path
if self.fix_potential_path:
self.pair_coeff = self.fix_paths(self.pair_coeff)
if np.isscalar(self.n_switching_steps):
self._n_sweep_steps = self.n_switching_steps
self._n_switching_steps = self.n_switching_steps
else:
self._n_sweep_steps = self.n_switching_steps[1]
self._n_switching_steps = self.n_switching_steps[0]
# here we also prepare lattice dict
for count, element in enumerate(self.element):
self._element_dict[element] = {}
self._element_dict[element]["mass"] = self.mass[count]
self._element_dict[element]["count"] = 0
self._element_dict[element]["composition"] = 0.0
self._element_dict[element]["atomic_number"] = mendeleev.element(
element
).atomic_number
# generate temporary filename if needed
write_structure_file = False
rename_structure_file = False
if self.lattice == "":
# fetch from dict
if len(self.element) > 1:
raise ValueError(
"Cannot create lattice for more than one element, provide a lammps-data file explicitly"
)
if self.element[0] in element_dict.keys():
self.lattice = element_dict[self.element[0]]["structure"]
self.lattice_constant = element_dict[self.element[0]][
"lattice_constant"
]
else:
raise ValueError(
"Could not find structure, please provide lattice and lattice_constant explicitely"
)
if self.repeat == [1, 1, 1]:
self.repeat = [5, 5, 5]
structure = _make_crystal(
self.lattice.lower(),
lattice_constant=self.lattice_constant,
repetitions=self.repeat,
element=self.element,
)
structure = structure.write.ase()
# extract composition
types, typecounts = np.unique(
structure.get_chemical_symbols(), return_counts=True
)
for c, t in enumerate(types):
self._element_dict[t]["count"] = typecounts[c]
self._element_dict[t]["composition"] = typecounts[c] / np.sum(
typecounts
)
self._natoms = len(structure)
self._original_lattice = self.lattice.lower()
write_structure_file = True
elif self.lattice.lower() in structure_dict.keys():
if len(self.element) > 1:
raise ValueError(
"Cannot create lattice for more than one element, provide a lammps-data file explicitly"
)
# this is a valid structure
if self.lattice_constant == 0:
# we try try to get lattice_constant
if self.element[0] in element_dict.keys():
self.lattice_constant = element_dict[self.element[0]][
"lattice_constant"
]
else:
raise ValueError("Please provide lattice_constant!")
# now create lattice
structure = _make_crystal(
self.lattice.lower(),
lattice_constant=self.lattice_constant,
repetitions=self.repeat,
element=self.element,
)
structure = structure.write.ase()
# extract composition
types, typecounts = np.unique(
structure.get_chemical_symbols(), return_counts=True
)
for c, t in enumerate(types):
self._element_dict[t]["count"] = typecounts[c]
self._element_dict[t]["composition"] = typecounts[c] / np.sum(
typecounts
)
# concdict_counts = {str(t): typecounts[c] for c, t in enumerate(types)}
# concdict_frac = {str(t): typecounts[c]/np.sum(typecounts) for c, t in enumerate(types)}
# self._composition = concdict_frac
# self._composition_counts = concdict_counts
self._natoms = len(structure)
self._original_lattice = self.lattice.lower()
write_structure_file = True
elif self.lattice.split("-")[0] == "mp":
# confirm here that API key exists
if not self.materials_project.api_key:
raise ValueError("could not find API KEY, pls set it.")
# now we need to fetch the structure
try:
from mp_api.client import MPRester
except ImportError:
raise ImportError(
"Could not import mp_api, make sure you install mp_api package!"
)
# now all good
rest = {
"use_document_model": False,
"include_user_agent": True,
"api_key": self.materials_project.api_key,
}
with MPRester(**rest) as mpr:
docs = mpr.materials.summary.search(material_ids=[self.lattice])
structures = []
for doc in docs:
struct = doc["structure"]
if self.materials_project.conventional:
aseatoms = struct.to_conventional().to_ase_atoms()
else:
aseatoms = struct.to_primitive().to_ase_atoms()
structures.append(aseatoms)
structure = structures[0]
if np.prod(self.repeat) == 1:
x = int(
np.ceil(
(self.materials_project.target_natoms / len(structure))
** (1 / 3)
)
)
structure = structure.repeat(x)
else:
structure = structure.repeat(self.repeat)
# extract composition
types, typecounts = np.unique(
structure.get_chemical_symbols(), return_counts=True
)
for c, t in enumerate(types):
self._element_dict[t]["count"] = typecounts[c]
self._element_dict[t]["composition"] = typecounts[c] / np.sum(
typecounts
)
self._natoms = len(structure)
self._original_lattice = self.lattice.lower()
write_structure_file = True
else:
# this is a file
if not os.path.exists(self.lattice):
raise ValueError(f"File {self.lattice} could not be found")
if self.file_format == "lammps-data":
# create atomic numbers for proper reading
Z_of_type = dict(
[
(count + 1, self._element_dict[element]["atomic_number"])
for count, element in enumerate(self.element)
]
)
structure = read(
self.lattice,
format="lammps-data",
style="atomic",
Z_of_type=Z_of_type,
)
# structure = System(aseobj, format='ase')
rename_structure_file = True
else:
raise TypeError("Only lammps-data files are supported!")
# extract composition
# this is the types read in from the file
types, typecounts = np.unique(
structure.get_chemical_symbols(), return_counts=True
)
for c, t in enumerate(types):
self._element_dict[t]["count"] = typecounts[c]
self._element_dict[t]["composition"] = typecounts[c] / np.sum(
typecounts
)
self._natoms = len(structure)
self._original_lattice = os.path.basename(self.lattice)
self.lattice = os.path.abspath(self.lattice)
# if needed, write the file out
if write_structure_file:
structure_filename = ".".join(
[self.create_identifier(), str(self.kernel), "data"]
)
structure_filename = os.path.join(os.getcwd(), structure_filename)
write(structure_filename, structure, format="lammps-data")
self.lattice = structure_filename
if rename_structure_file:
structure_filename = ".".join(
[self.create_identifier(), str(self.kernel), "data"]
)
structure_filename = os.path.join(os.getcwd(), structure_filename)
shutil.copy(self.lattice, structure_filename)
self.lattice = structure_filename
if self.mode == "composition_scaling":
# we also should check if actual contents are present
input_chem_comp = {}
for key, val in self._element_dict.items():
input_chem_comp[key] = val["count"]
self.composition_scaling._input_chemical_composition = input_chem_comp
# now we should check output chem comp and see there are no keys extra
for key in self.composition_scaling.output_chemical_composition.keys():
if (
key
not in self.composition_scaling._input_chemical_composition.keys()
):
raise ValueError(
"An element in output composition is not possible with the given potential"
)
natoms1 = np.sum(
[
val
for key, val in self.composition_scaling._input_chemical_composition.items()
]
)
natoms2 = np.sum(
[
val
for key, val in self.composition_scaling.output_chemical_composition.items()
]
)
if not (natoms1 == natoms2):
raise ValueError(
f"Input and output number of atoms are not conserved! Input {self.composition_scaling._input_chemical_composition}, output {self.composition_scaling.output_chemical_composition}, total atoms in structure {len(structure)}"
)
return self
[docs] def fix_paths(self, potlist):
"""
Fix paths for potential files to complete ones.
Also expands environment variables (e.g. ``$USER``, ``${USER}``) and
the home-directory shortcut ``~`` in potential file paths before
resolving them to absolute paths. This allows input files to contain
portable paths such as ``/home/$USER/potentials/Cu.eam``.
"""
fixedpots = []
pair_mode = getattr(self, "pair_mode", None)
pair_style_names = getattr(self, "_pair_style_names", None) or []
for pot in potlist:
pcraw = pot.split()
path_index = 2
if (
pair_mode == "overlay"
and len(pcraw) >= 4
and pcraw[2] in pair_style_names
):
path_index = 3
if len(pcraw) > path_index:
raw_filename = pcraw[path_index]
# Expand environment variables ($USER, ${USER}, $HOME, …) and ~
expanded = os.path.expandvars(os.path.expanduser(raw_filename))
abs_filename = os.path.abspath(expanded)
if os.path.exists(abs_filename):
pcnew = " ".join(
[*pcraw[:path_index], abs_filename, *pcraw[path_index + 1 :]]
)
fixedpots.append(pcnew)
elif expanded != raw_filename:
# A variable was expanded even though the file was not found
# at that location yet – still substitute so LAMMPS receives
# the resolved path rather than a literal "$USER" string.
pcnew = " ".join(
[*pcraw[:path_index], expanded, *pcraw[path_index + 1 :]]
)
fixedpots.append(pcnew)
else:
fixedpots.append(pot)
else:
fixedpots.append(pot)
return fixedpots
[docs] def create_identifier(self):
"""
Generate an identifier
Parameters
----------
calc: dict
a calculation dict
Returns
-------
identistring: string
unique identification string
"""
# lattice processed
prefix = self.mode
# Handle both validated and non-validated objects
if self._temperature is not None:
ts = int(self._temperature)
else:
# Fallback for non-validated objects
temp = (
self.temperature
if np.isscalar(self.temperature)
else self.temperature[0]
)
ts = int(temp)
if hasattr(self, "_pressure") and self._pressure is not None:
ps = "%d" % (int(self._pressure))
elif self.pressure is None:
ps = "None"
elif np.isscalar(self.pressure):
ps = "%d" % (int(self.pressure))
else:
if isinstance(self.pressure[0], list):
ps = "%d" % (int(self.pressure[0][0]))
else:
ps = "%d" % (int(self.pressure[0]))
if hasattr(self, "_original_lattice") and self._original_lattice:
l = self._original_lattice
else:
l = self.lattice
l = l.split("/")
l = l[-1]
if self.folder_prefix is None:
identistring = "-".join(
[prefix, l.lower(), self.reference_phase, str(ts), str(ps)]
)
else:
identistring = "-".join(
[
self.folder_prefix,
prefix,
l.lower(),
self.reference_phase,
str(ts),
str(ps),
]
)
return identistring
[docs] def get_folder_name(self):
identistring = self.create_identifier()
simfolder = os.path.join(os.getcwd(), identistring)
return simfolder
[docs] def create_folders(self):
"""
Create the necessary folder for calculation
Parameters
----------
calc : dict
calculation block
Returns
-------
folder : string
create folder
"""
simfolder = self.get_folder_name()
# if folder exists, delete it -> then create
if os.path.exists(simfolder):
raise ValueError(
f"Simulation folder {simfolder} exists. Please remove and run again!"
)
os.mkdir(simfolder)
return simfolder
@property
def savefile(self):
simfolder = self.get_folder_name()
return os.path.join(simfolder, "job.npy")
[docs]def save_job(job):
filename = os.path.join(job.simfolder, "job.npy")
np.save(filename, job)
[docs]def load_job(filename):
job = np.load(filename, allow_pickle=True).flatten()[0]
return job
def _read_inputfile(file, validate=False):
with open(file, "r") as fin:
data = yaml.safe_load(fin)
calculations = []
for count, calc in enumerate(tqdm(data["calculations"])):
calc["kernel"] = count
calc["inputfile"] = file
if "pressure" in calc.keys():
calc["pressure"] = _to_none(calc["pressure"])
if validate:
# Full validation - used when actually running calculations
calculations.append(Calculation(**calc))
else:
# Skip expensive validation - for fast parsing
calculations.append(Calculation.model_construct(**calc))
return calculations
def _convert_legacy_inputfile(file, return_calcs=False):
with open(file, "r") as fin:
data = yaml.safe_load(fin)
if not "element" in data.keys():
# new format
raise ValueError("Not old format, exiting..")
# prepare combos
calculations = []
for cc, ci in enumerate(data["calculations"]):
mode = ci["mode"]
if mode == "melting_temperature":
calc = {}
for key in [
"md",
"queue",
"tolerance",
"melting_temperature",
"nose_hoover",
"berendsen",
"composition_scaling",
"temperature_high",
]:
if key in data.keys():
calc[key] = copy.copy(data[key])
for key in [
"element",
"mass",
"script_mode",
"lammps_executable",
"mpi_executable",
]:
if key in data.keys():
calc[key] = data[key]
for key in [
"mode",
"pair_style",
"pair_coeff",
"pair_mode",
"pair_style_options",
"npt",
"repeat",
"n_equilibration_steps",
"n_switching_steps",
"n_print_steps",
"n_iterations",
"potential_file",
"spring_constants",
"melting_cycle",
"equilibration_control",
"folder_prefix",
"temperature_high",
]:
if key in ci.keys():
calc[key] = ci[key]
# calc['pressure'] = float(np.atleast_1d(ci["pressure"]) if "pressure" in ci.keys() else np.atleast_1d(0))
# calc['temperature'] = float(np.atleast_1d(ci["temperature"]) if "temperature" in ci.keys() else np.atleast_1d(0))
# calc['lattice'] = str(ci["lattice"]) if "lattice" in ci.keys() else 'none'
# calc['reference_phase'] = str(ci["reference_phase"]) if "reference_phase" in ci.keys() else 'none'
# calc['lattice_constant'] = float(ci["lattice_constant"]) if "lattice_constant" in ci.keys() else 0
calc["kernel"] = cc
calc["inputfile"] = file
calculations.append(calc)
else:
pressure = np.atleast_1d(ci["pressure"])
temperature = np.atleast_1d(ci["temperature"])
lattice = np.atleast_1d(ci["lattice"])
reference_phase = np.atleast_1d(ci["reference_phase"])
if "lattice_constant" in ci.keys():
lattice_constant = np.atleast_1d(ci["lattice_constant"])
else:
lattice_constant = [0 for x in range(len(lattice))]
lat_props = [
{
"lattice": lattice[x],
"lattice_constant": lattice_constant[x],
"reference_phase": reference_phase[x],
}
for x in range(len(lattice))
]
if (mode == "fe") or (mode == "alchemy") or (mode == "composition_scaling"):
combos = itertools.product(lat_props, pressure, temperature)
elif mode == "ts" or mode == "tscale":
if not len(temperature) == 2:
raise ValueError("ts/tscale mode needs 2 temperature values")
temperature = [temperature]
combos = itertools.product(lat_props, pressure, temperature)
elif mode == "pscale":
if not len(pressure) == 2:
raise ValueError("pscale mode needs 2 pressure values")
pressure = [pressure]
combos = itertools.product(lat_props, pressure, temperature)
cc = 0
for combo in combos:
calc = {}
for key in [
"md",
"queue",
"tolerance",
"melting_temperature",
"nose_hoover",
"berendsen",
"composition_scaling",
"temperature_high",
]:
if key in data.keys():
calc[key] = copy.copy(data[key])
for key in [
"element",
"mass",
"script_mode",
"lammps_executable",
"mpi_executable",
]:
if key in data.keys():
calc[key] = data[key]
for key in [
"mode",
"pair_style",
"pair_coeff",
"pair_mode",
"pair_style_options",
"npt",
"repeat",
"n_equilibration_steps",
"n_switching_steps",
"n_print_steps",
"n_iterations",
"potential_file",
"spring_constants",
"melting_cycle",
"equilibration_control",
"folder_prefix",
"temperature_high",
]:
if key in ci.keys():
calc[key] = ci[key]
# print(combo)
calc["lattice"] = str(combo[0]["lattice"])
calc["lattice_constant"] = float(combo[0]["lattice_constant"])
calc["reference_phase"] = str(combo[0]["reference_phase"])
calc["pressure"] = _to_float(combo[1])
calc["temperature"] = _to_float(combo[2])
calc["kernel"] = cc
calc["inputfile"] = file
cc += 1
calculations.append(calc)
if return_calcs:
return calculations
else:
newdata = {}
newdata["calculations"] = calculations
# print(newdata)
outfile = "new." + file
warnings.warn(
f"Old style input file calphy < v2 found. Converted input in {outfile}. Please check!"
)
with open(outfile, "w") as fout:
yaml.safe_dump(newdata, fout)
return outfile