Source code for fast_forward.mapping

# Copyright 2020 University of Groningen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import numba
from numba import njit, prange, cuda
import MDAnalysis as mda

def _selector(atomgroup, indices, names):
    names_string = " ".join(names)
    atoms = atomgroup.select_atoms("name " + names_string)
    return atoms

[docs] def create_new_universe(universe, mapped_trajectory, mappings, n_residues=None, res_iter=None): """ Create a new universe according to the definitions in mappings the old universe, and the mapped trajectory. Parameters ----------- universe :class:`fast_forward.universe_handler.UniverseHandler` mapped_trajectory: :class:`~numpy.ndarray` coordiante array with shape (n_frames, n_atoms, 3) mappings: dict a dict of resname, mapping object :class:`~fast_forward.map_file_parers.Mapping` Returns -------- :class:`~MDAnalysis.core.universe.Universe` the new universe that ties all information """ # copy the dimensions array n_frames = mapped_trajectory.shape[0] dimensions = np.zeros((n_frames, 6)) for fdx, ts in enumerate(universe.trajectory): dimensions[fdx, :] = ts.dimensions # either we loop over pre-defined residues or the universe ones if res_iter is None: res_iter = universe # create the universe to be returend # initalize some attributes n_residues = len(list(res_iter.res_iter())) n_atoms = mapped_trajectory.shape[1] res_seg = np.array([1] * n_residues) # to read out these we have to iterate over universe again atom_resindex = [] atomnames = [] resids = [] resnames = [] for idx, residue in enumerate(res_iter.res_iter()): for bead in mappings[residue.resname].beads: atomnames.append(bead) atom_resindex.append(idx) resids.append(idx) resnames.append(mappings[residue.resname].to_resname) # now create the empty universe cg_universe = mda.Universe.empty(trajectory=True, n_atoms=n_atoms, n_residues=n_residues, atom_resindex=atom_resindex, residue_segindex=res_seg, ) # add the coordinates cg_universe.trajectory.coordinate_array = mapped_trajectory cg_universe.trajectory.dimensions_array = dimensions cg_universe.trajectory.n_frames = n_frames # some more labels to make the universe sane cg_universe.add_TopologyAttr("names", values=atomnames) cg_universe.add_TopologyAttr("resnames", values=resnames) cg_universe.add_TopologyAttr("resids", values=resids) cg_universe.trajectory[0] return cg_universe
[docs] def forward_map_indices(universe, mappings): """ Map the universe atom indices to the new universe. """ mapped_atoms = [] weights = [] bead_idxs = [] total_beads = 0 for residue in universe.res_iter(): resid = residue.resid resname = residue.resname mapping = mappings[resname] for bead_count, bead in enumerate(mapping.beads): idxs = mapping.bead_to_idx[bead] weights_from_atoms = mapping.atom_weights[bead] names = mapping.bead_to_atom[bead] atoms = _selector(residue.atoms, idxs, names) weights.append(numba.typed.List(weights_from_atoms)) mapped_atoms.append(numba.typed.List(atoms.indices)) bead_idxs.append(total_beads) total_beads += 1 mapped_atoms = numba.typed.List(mapped_atoms) bead_idxs = numba.typed.List(bead_idxs) weights = numba.typed.List(weights) return mapped_atoms, bead_idxs, weights
[docs] @njit(parallel=True) def forward_map_positions(mapped_atoms, bead_idxs, weights, positions, n_frames, mode, treated_atoms): new_trajectory = np.zeros((n_frames, len(mapped_atoms), 3)) for count_lv1 in prange(len(mapped_atoms)): bead_idx = bead_idxs[np.int64(count_lv1)] atom_idxs = mapped_atoms[np.int64(count_lv1)] atom_weights = weights[np.int64(count_lv1)] for fdx in prange(0, n_frames): pre_pos = np.array([0.0, 0.0, 0.0], dtype=np.float32) treat_pos = np.array([0.0, 0.0, 0.0], dtype=np.float32) # we need to first average the non-treated atoms # and then take the average of the these atoms with # the treated atoms pre_count = 0.0 treat_count = 0.0 for kdx in prange(len(atom_idxs)): weight = atom_weights[kdx] atom_idx = atom_idxs[kdx] vector = positions[fdx, atom_idx, :] if atom_idx not in treated_atoms: pre_pos = pre_pos + vector * weight pre_count += 1.0*weight else: treat_pos = treat_pos + vector * weight treat_count += 1.0*weight if pre_count != 0: pre_pos = pre_pos / pre_count new_pos = (pre_pos + treat_pos) / (treat_count + 1) else: new_pos = treat_pos / treat_count new_trajectory[fdx, bead_idx, :] = new_pos return new_trajectory