Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 142 additions & 76 deletions nmrpy/data_objects.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import numpy
import scipy
from matplotlib import pyplot
import lmfit
import nmrglue
import numbers
from scipy.optimize import leastsq
from multiprocessing import Pool, cpu_count
from nmrpy.plotting import *
import os
import pickle
import re

class Base():
_complex_dtypes = [
Expand All @@ -17,7 +16,7 @@ class Base():
numpy.dtype('clongdouble'),
]

_file_formats = ['varian', 'bruker', None]
_file_formats = ['varian', 'bruker', 'spinsolve', None]

def __init__(self, *args, **kwargs):
self.id = kwargs.get('id', None)
Expand Down Expand Up @@ -114,6 +113,8 @@ def _extract_procpar(self, procpar):
return self._extract_procpar_bruker(procpar)
elif self._file_format == 'varian':
return self._extract_procpar_varian(procpar)
elif self._file_format == 'spinsolve':
return self._extract_procpar_spinsolve(procpar)
#else:
# raise AttributeError('Could not parse procpar.')

Expand Down Expand Up @@ -161,7 +162,7 @@ def _extract_procpar_varian(procpar):
return params

@staticmethod
def _extract_procpar_bruker(procpar):
def _extract_procpar_bruker(procpar):
"""
Extract some commonly-used NMR parameters (using Bruker denotations)
and return a parameter dictionary 'params'.
Expand Down Expand Up @@ -203,7 +204,51 @@ def _extract_procpar_bruker(procpar):
sfrq=sfrq,
reffrq=reffrq,
sw_left=sw_left,
)
)
return params

@staticmethod
def _extract_procpar_spinsolve(procpar):
"""
Extract some commonly-used NMR parameters (using Spinsolve denotations)
and return a parameter dictionary 'params'.
"""
nt = procpar['acqu']['nrScans']
sw_hz = procpar['acqu']['bandwidth'] * 1000.0
sfrq = procpar['acqu']['b1Freq']
sw = sw_hz / sfrq
l_frq = procpar['acqu']['lowestFrequency']
reffrq = sfrq - (l_frq + 0.5 * sw_hz) / 1e6
sw_left = (0.5 + 1e6 * (sfrq - reffrq) / sw_hz) * sw_hz / sfrq
options = procpar['acqu']['Options'].split(',')
at = 6.554 # default in reaction monitoring mode, use if not provided
for o in options:
if 'AcquisitionTime' in o:
at = float(re.search('\\(.+\\)', o)[0].strip('()')) # overwrite default
rt = procpar['acqu']['repTime']
if rt >= 600: # sometimes reported in s, sometimes in ms, 600 is a reasonable cutoff
rt /= 1000
d1 = rt - at
al = procpar['arraylength']
acqtime_array = numpy.zeros((al))
acqtime_array[0] = nt * rt / 2
for i in range(1, al):
acqtime_array[i] = acqtime_array[i - 1] + nt * rt
acqtime_array /= 60. # convert to min
params = dict(
at=at,
d1=d1,
rt=rt,
nt_array=None,
nt=nt,
acqtime_array=acqtime_array,
acqtime=None,
sw=sw,
sw_hz=sw_hz,
sfrq=sfrq,
reffrq=reffrq,
sw_left=sw_left,
)
return params

class Fid(Base):
Expand Down Expand Up @@ -524,7 +569,7 @@ def _ft(cls, list_params):
if Fid._is_valid_dataset(data) and file_format in Fid._file_formats:
data = numpy.array(numpy.fft.fft(data), dtype=data.dtype)
s = len(data)
if file_format == 'varian' or file_format == None:
if file_format == 'varian' or file_format == 'spinsolve' or file_format == None:
ft_data = numpy.append(data[int(s / 2.0):], data[: int(s / 2.0)])
if file_format == 'bruker':
ft_data = numpy.append(data[int(s / 2.0):: -1], data[s: int(s / 2.0): -1])
Expand Down Expand Up @@ -1388,29 +1433,29 @@ def _setup_params(fid_array):
:arg fid_array: a :class:`~nmrpy.data_objects.FidArray` object
"""
nt_list = []
is_bruker = False
is_varian = False
for fid in fid_array.get_fids():
fid_index = int(fid.id.split('fid')[1])
if fid._file_format == 'varian':
# Varian files provide nt_array and acqtime_array. For
# each FID, nt and acqtime are extracted from these.
is_varian = True
fid._params['nt'] = float(fid._params['nt_array'][fid_index])
fid._params['acqtime'] = float(fid._params['acqtime_array'][fid_index])
# Remove nt_array and acqtime_array from the Fid objects
del fid._params['nt_array']
del fid._params['acqtime_array']
elif fid._file_format == 'bruker':
elif fid._file_format == 'bruker' or fid._file_format == 'spinsolve':
# Bruker files provide nt and acqtime_array. For each
# FID, acqtime is extracted from acqtime_array.
# Additionally, nt is added to the nt_list for later use
# in the FidArray object.
is_bruker = True
nt_list.append(fid._params['nt'])
fid._params['acqtime'] = float(fid._params['acqtime_array'][fid_index])
# Remove nt_array and acqtime_array from the Fid objects
del fid._params['nt_array']
del fid._params['acqtime_array']
if is_bruker:
if not is_varian:
# Create nt_array from the nt_list for the FidArray object
fid_array._params['nt_array'] = numpy.array(nt_list)
# Remove nt and acqtime from the FidArray object
Expand Down Expand Up @@ -1442,9 +1487,9 @@ def from_path(cls, fid_path='.', file_format=None, arrayset=None):

:keyword fid_path: filepath to .fid directory

:keyword file_format: 'varian' or 'bruker', usually unnecessary
:keyword file_format: 'varian' or 'bruker' or 'spinsolve', usually unnecessary

:keyword arrayset: (int) array set for interleaved spectra,
:keyword arrayset: (int) array set for interleaved spectra (implemented for Bruker only),
user is prompted if not specified
"""
if not file_format:
Expand All @@ -1461,6 +1506,9 @@ def from_path(cls, fid_path='.', file_format=None, arrayset=None):
elif file_format == 'bruker':
importer = BrukerImporter(fid_path=fid_path)
importer.import_fid(arrayset=arrayset)
elif file_format == 'spinsolve':
importer = SpinsolveImporter(fid_path=fid_path)
importer.import_fid()
elif file_format == 'nmrpy':
with open(fid_path, 'rb') as f:
return pickle.load(f)
Expand Down Expand Up @@ -2067,92 +2115,90 @@ def data(self, data):
def import_fid(self, arrayset=None):
"""
This will first attempt to import Bruker data. Failing that, Varian.
Failing that, Magritek Spinsolve.
"""
try:
print('Attempting Bruker')
print('Attempting Bruker...')
brukerimporter = BrukerImporter(fid_path=self.fid_path)
brukerimporter.import_fid(arrayset=arrayset)
self.data = brukerimporter.data
self._procpar = brukerimporter._procpar
self._file_format = brukerimporter._file_format
return
except (FileNotFoundError, OSError):
print('fid_path does not specify a valid .fid directory.')
return
except (TypeError, IndexError):
print('probably not Bruker data')
try:
print('Attempting Varian')
except (TypeError, IndexError, OSError, ValueError):
print('probably not Bruker data!\n')
try:
print('Attempting Varian...')
varianimporter = VarianImporter(fid_path=self.fid_path)
varianimporter.import_fid()
self._procpar = varianimporter._procpar
self.data = varianimporter.data
self._file_format = varianimporter._file_format
return
except TypeError:
print('probably not Varian data')
except (TypeError, FileNotFoundError, OSError):
print('probably not Varian data!\n')
try:
print('Attempting Magritek Spinsolve...')
spinsolveimporter = SpinsolveImporter(fid_path=self.fid_path)
spinsolveimporter.import_fid()
self._procpar = spinsolveimporter._procpar
self.data = spinsolveimporter.data
self._file_format = spinsolveimporter._file_format
return
except (TypeError):
print('probably not Magritek Spinsolve data!')

class VarianImporter(Importer):

def import_fid(self):
try:
procpar, data = nmrglue.varian.read(self.fid_path)
self.data = data
self._procpar = procpar
self._file_format = 'varian'
except FileNotFoundError:
print('fid_path does not specify a valid .fid directory.')
except OSError:
print('fid_path does not specify a valid .fid directory.')

procpar, data = nmrglue.varian.read(self.fid_path)
self.data = data
self._procpar = procpar
self._file_format = 'varian'

class BrukerImporter(Importer):

def import_fid(self, arrayset=None):
try:
dirs = [int(i) for i in os.listdir(self.fid_path) if \
os.path.isdir(self.fid_path+os.path.sep+i)]
dirs.sort()
dirs = [str(i) for i in dirs]
alldata = []
for d in dirs:
procpar, data = nmrglue.bruker.read(self.fid_path+os.path.sep+d)
alldata.append((procpar, data))
self.alldata = alldata
incr = 1
while True:
if len(alldata) == 1:
break
if alldata[incr][1].shape == alldata[0][1].shape:
break
incr += 1
if incr > 1:
if arrayset == None:
print('Total of '+str(incr)+' alternating FidArrays found.')
arrayset = input('Which one to import? ')
arrayset = int(arrayset)
else:
arrayset = arrayset
if arrayset < 1 or arrayset > incr:
raise ValueError('Select a value between 1 and '
+ str(incr) + '.')
dirs = [int(i) for i in os.listdir(self.fid_path) if \
os.path.isdir(self.fid_path+os.path.sep+i)]
dirs.sort()
dirs = [str(i) for i in dirs]
alldata = []
for d in dirs:
procpar, data = nmrglue.bruker.read(self.fid_path+os.path.sep+d)
alldata.append((procpar, data))
self.alldata = alldata
incr = 1
while True:
if len(alldata) == 1:
break
if alldata[incr][1].shape == alldata[0][1].shape:
break
incr += 1
if incr > 1:
if arrayset == None:
print('Total of '+str(incr)+' alternating FidArrays found.')
arrayset = input('Which one to import? ')
arrayset = int(arrayset)
else:
arrayset = 1
self.incr = incr
procpar = alldata[arrayset-1][0]
data = numpy.vstack([d[1] for d in alldata[(arrayset-1)::incr]])
self.data = data
self._procpar = procpar
self._file_format = 'bruker'
self.data = nmrglue.bruker.remove_digital_filter(procpar, self.data)
self._procpar['tdelta'], self._procpar['tcum'],\
self._procpar['tsingle'] = self._get_time_delta()
self._procpar['arraylength'] = self.data.shape[0]
self._procpar['arrayset'] = arrayset
except FileNotFoundError:
print('fid_path does not specify a valid .fid directory.')
except OSError:
print('fid_path does not specify a valid .fid directory.')

arrayset = arrayset
if arrayset < 1 or arrayset > incr:
raise ValueError('Select a value between 1 and '
+ str(incr) + '.')
else:
arrayset = 1
self.incr = incr
procpar = alldata[arrayset-1][0]
data = numpy.vstack([d[1] for d in alldata[(arrayset-1)::incr]])
self.data = data
self._procpar = procpar
self._file_format = 'bruker'
self.data = nmrglue.bruker.remove_digital_filter(procpar, self.data)
self._procpar['tdelta'], self._procpar['tcum'],\
self._procpar['tsingle'] = self._get_time_delta()
self._procpar['arraylength'] = self.data.shape[0]
self._procpar['arrayset'] = arrayset

def _get_time_delta(self):
td = 0.0
tcum = []
Expand All @@ -2169,5 +2215,25 @@ def _get_time_delta(self):
tsingle.append(tot)
return (td, numpy.array(tcum), numpy.array(tsingle))

class SpinsolveImporter(Importer):
def import_fid(self):
dirs = [
i
for i in os.listdir(self.fid_path)
if os.path.isdir(self.fid_path + os.path.sep + i)
]
dirs.sort()
dirs = [str(i) for i in dirs]
alldata = []
for d in dirs:
procpar, data = nmrglue.spinsolve.read(self.fid_path + os.path.sep + d)
alldata.append((procpar, data))
self.alldata = alldata
data = numpy.vstack([d[1] for d in alldata])
self.data = data
self._procpar = procpar
self._file_format = 'spinsolve'
self._procpar['arraylength'] = self.data.shape[0]

if __name__ == '__main__':
pass
Loading