Source code for cpyMSpec.spectrum

from .utils import init_ffi, load_shared_lib

import numbers

ffi = init_ffi()
ims = load_shared_lib(ffi)

try:
    import numpy as np
    _has_numpy = True
    _dtypes = {'f': np.float32, 'd': np.float64}
except:
    _has_numpy = False

_full_types = {'f': 'float', 'd': 'double'}

def _as_buffer(array, numtype):
    if _has_numpy:
        return np.asarray(array, dtype=_dtypes[numtype])
    else:
        from array import array as make_array
        a = make_array(numtype)
        a.extend(array)
        return a

class _cffi_buffer(object):
    def __init__(self, n, numtype):
        if isinstance(n, numbers.Number):
            if _has_numpy:
                self.buf = np.zeros(n, dtype=_dtypes[numtype])
                self.ptr = self._get_ptr(numtype)
            else:
                self.buf = None
                self.ptr = ffi.new(_full_types[numtype] + "[]", n)
        else:
            self.buf = _as_buffer(n, numtype)
            self.ptr = self._get_ptr(numtype)

    def python_data(self):
        return self.buf if _has_numpy else list(self.ptr)

    def _get_ptr(self, numtype):
        if _has_numpy:
            return ffi.cast(_full_types[numtype] + '*',
                            self.buf.__array_interface__['data'][0])
        else:
            return ffi.cast(_full_types[numtype] + '*', self.buf.buffer_info()[0])

def _raise_ims_exception():
    raise Exception(ffi.string(ims.ims_strerror()))

def _raise_ims_exception_if_null(arg):
    if arg == ffi.NULL:
        _raise_ims_exception()

def _new_spectrum(class_, raw_ptr):
    _raise_ims_exception_if_null(raw_ptr)
    obj = object.__new__(class_)
    obj.ptr = ffi.gc(raw_ptr, ims.spectrum_free)
    return obj

[docs]class SpectrumBase(object):
[docs] def sortByMass(self): ims.spectrum_sort_by_mass(self.ptr)
[docs] def sortByIntensity(self): ims.spectrum_sort_by_intensity(self.ptr)
def _copy(self, modifier): obj = self.copy() modifier(obj) return obj
[docs] def sortedByMass(self): return self._copy(SpectrumBase.sortByMass)
[docs] def sortedByIntensity(self): return self._copy(SpectrumBase.sortByIntensity)
def __str__(self): assert self.ptr != ffi.NULL tmp = self.sortedByMass() peaks = zip(tmp.masses, tmp.intensities) return "{\n " +\ ",\n ".join("{0: >9.4f}: {1: >8.4f}%".format(x[0], x[1] * 100) for x in peaks) +\ "\n}" @property def size(self): """ :returns: number of peaks in the spectrum :rtype: int """ return ims.spectrum_size(self.ptr) @property def masses(self): """ :returns: peak masses :rtype: list of floats """ buf = ffi.new("double[]", self.size) ims.spectrum_masses(self.ptr, buf) return list(buf) @property def intensities(self): """ :returns: peak intensities :rtype: list of floats """ buf = ffi.new("double[]", self.size) ims.spectrum_intensities(self.ptr, buf) return list(buf)
[docs] def addCharge(self, charge): """ Adds/subtracts mass of electrons in-place. such that the charge increases by the specified amount. :param charge: number of electrons to add :type charge: integer """ ims.spectrum_add_charge(self.ptr, charge)
[docs] def trim(self, n_peaks): """ Sorts mass and intensities arrays in descending intensity order, then removes low-intensity peaks from the spectrum. :param n_peaks: number of peaks to keep """ self.sortByIntensity() ims.spectrum_trim(self.ptr, n_peaks)
[docs]class ProfileSpectrum(SpectrumBase): """ Represents profile spectrum data, which means: * neighbor mass differences are supposed to be locally approximately equal; * occasional gaps might be present, indicating zero-intensity regions """ def __init__(self, mzs, intensities): n = len(mzs) assert len(mzs) == len(intensities) mzs = _cffi_buffer(mzs, 'd') intensities = _cffi_buffer(intensities, 'd') p = ims.spectrum_new(n, mzs.ptr, intensities.ptr) _raise_ims_exception_if_null(p) self.ptr = ffi.gc(p, ims.spectrum_free) self.sortByMass()
[docs] def copy(self): """ :returns: a (deep) copy of the instance :rtype: ProfileSpectrum """ return _new_spectrum(ProfileSpectrum, ims.spectrum_copy(self.ptr))
[docs] def centroids(self, window_size=5): """ Detects peaks in raw data. :param mzs: sorted array of m/z values :param intensities: array of corresponding intensities :param window_size: size of m/z averaging window :returns: isotope pattern containing the centroids :rtype: CentroidedSpectrum """ self.sortByMass() mzs = _cffi_buffer(self.masses, 'd') intensities = _cffi_buffer(self.intensities, 'f') n = self.size p = ims.spectrum_new_from_raw(n, mzs.ptr, intensities.ptr, int(window_size)) return _new_spectrum(CentroidedSpectrum, p)
[docs]class InstrumentModel(object): def __init__(self, instrument_type, resolving_power, at_mz=200): """ :param type: instrument type, must be one of 'orbitrap', 'fticr', 'tof'. :param resolving_power: instrument resolving power :param at_mz: value at which the resolving power is specified """ p = ims.instrument_profile_new(instrument_type.lower().encode('ascii'), resolving_power, at_mz) _raise_ims_exception_if_null(p) self.ptr = ffi.gc(p, ims.instrument_profile_free)
[docs] def resolvingPowerAt(self, mz): """ Calculates resolving power at a given m/z value """ return ims.instrument_resolving_power_at(self.ptr, mz)
[docs]class TheoreticalSpectrum(SpectrumBase): """ A bag of isotopic peaks computed for a single or multiple sum formulas. """
[docs] def copy(self): """ :returns: a (deep) copy of the instance :rtype: Spectrum """ return _new_spectrum(TheoreticalSpectrum, ims.spectrum_copy(self.ptr))
[docs] def centroids(self, instrument, min_abundance=1e-4, points_per_fwhm=25): """ Estimates centroided peaks for a given instrument model. :param instrument: instrument model :param min_abundance: minimum abundance for including a peak :param points_per_fwhm: grid density used for envelope calculation :returns: peaks visible with the instrument used :rtype: TheoreticalSpectrum """ assert self.ptr != ffi.NULL centroids = ims.spectrum_envelope_centroids(self.ptr, instrument.ptr, min_abundance, points_per_fwhm) return _new_spectrum(CentroidedSpectrum, centroids)
def __mul__(self, factor): """ Multiplies all intensities by the factor """ s = self.copy() ims.spectrum_multiply_inplace(s.ptr, factor) return s def __add__(self, other): """ Adds two theoretical spectra by simply merging masses and intensities of both. """ if type(other) is not TheoreticalSpectrum: raise TypeError("can't add TheoreticalSpectrum and {}".format(str(type(other)))) s = self.copy() ims.spectrum_add_inplace(s.ptr, other.ptr) return s
[docs] def envelope(self, instrument): """ Computes isotopic envelope for a given instrument model :param instrument: instrument model to use :returns: isotopic envelope as a function of mass :rtype: function float(mz: float) """ def envelopeFunc(mz): if isinstance(mz, numbers.Number): return ims.spectrum_envelope(self.ptr, instrument.ptr, mz) mzs = _cffi_buffer(mz, 'd') n = len(mz) buf = _cffi_buffer(n, 'f') ret = ims.spectrum_envelope_plot(self.ptr, instrument.ptr, mzs.ptr, n, buf.ptr) if ret < 0: _raise_ims_exception() return buf.python_data() return envelopeFunc
[docs]class CentroidedSpectrum(SpectrumBase): """ Centroided peaks of a profile/theoretical spectrum """
[docs] def copy(self): """ :returns: a (deep) copy of the instance :rtype: CentroidedSpectrum """ return _new_spectrum(CentroidedSpectrum, ims.spectrum_copy(self.ptr))
[docs] def charged(self, charge): """ Adds/subtracts electrons and returns a new object. :param charge: number of electrons to add :type charge: integer :returns: a spectrum with appropriately shifted masses :rtype: CentroidedSpectrum """ result = self.copy() result.addCharge(charge) return result
[docs] def trimmed(self, n_peaks): """ :param n_peaks: number of peaks to keep :returns: an isotope pattern with removed low-intensity peaks :rtype: CentroidedSpectrum """ result = self.copy() result.trim(n_peaks) return result
[docs] def removeIntensitiesBelow(self, min_intensity): ims.spectrum_trim_intensity(self.ptr, min_intensity)
[docs]def isotopePattern(sum_formula, threshold=1e-4, rel_threshold=True, desired_prob=None): """ Calculates isotopic peaks for a sum formula. :param sum_formula: text representation of an atomic composition :type sum_formula: str :param threshold: minimum peak abundance :type threshold: float :param rel_threshold: if True, threshold is relative to the highest peak, otherwise it is a probability :type rel_threshold: bool :param desired_prob: total probability covered by the result; if set, threshold parameter is ignored :type desired_prob: float | None """ assert threshold >= 0 and threshold < 1 assert desired_prob is None or (desired_prob > 0 and desired_prob <= 1) if desired_prob: s = ims.spectrum_new_from_sf(sum_formula.encode('ascii'), desired_prob) else: s = ims.spectrum_new_from_sf_thr(sum_formula.encode('ascii'), threshold, rel_threshold) return _new_spectrum(TheoreticalSpectrum, s)