from __future__ import print_function, unicode_literals
# Public names exported by a star-import of this module.
__all__ = ['platform_is_bigendian', 'freadnumpy', 'freadstruct', 'fread',
'needs_byteswap', 'check_read', 'RecordFile', 'unpack_from_file',
'seek_to_record', 'read_into', 'writeline', 'OpenRecordFile',
'Int2Asc', 'Asc2Int']
__doc__ = """
.. _FortranFileUtil
:mod:`FortranFileUtil` -- Fortran File interaction functions
============================================================
.. module:: FortranFileUtil
:platform: Unix, Windows
:synopsis: Provides utilities for interacting with Fortran
"unformatted" binary files. These files often
have records with integer buffers and formatted data.
Often the data has a byte order different than the
machine doing the reading. These utilities provide ways
for the user to get data and ignore the byte order and
data buffers.
.. moduleauthor:: Adam Hupp
"""
# Distribution Packages
import unittest
import sys
import struct
from operator import mul
# Site-Packages
# numpy is required: it is imported unconditionally below
# (no fallback to the older Numeric package is implemented here)
from numpy import array, fromfile
import numpy as np
# True when the machine running this code stores integers big-endian.
platform_is_bigendian = sys.byteorder == 'big'
def freadnumpy(ifile, count, read_type, return_type=None, byteswap=False):
    """Read ``count`` items from ``ifile`` using ``numpy.fromfile``.

    Arguments:
        ifile -- file passed straight through to numpy.fromfile
        count -- number of items to read
        read_type -- type code of the items as stored in the file
        return_type -- type code of the returned array; defaults to
            read_type when None
        byteswap -- True if the data's byte order is the opposite of this
            platform's

    Returns a new array built from the items read, with dtype return_type.
    """
    # The data is big-endian exactly when "needs swapping" and "platform is
    # big-endian" disagree.
    order = '>' if byteswap ^ platform_is_bigendian else '<'
    raw = fromfile(ifile, dtype="%s%s" % (order, read_type), count=count)
    out_type = read_type if return_type is None else return_type
    return array(raw, dtype=out_type)
def freadstruct(ifile, count, read_type, return_type=None, byteswap=False):
    """Read ``count`` items from ``ifile`` using the struct module.

    Same signature as freadnumpy, but the items are unpacked with
    unpack_from_file using the struct format
    ``<byte order><count><read_type>``.

    Arguments:
        ifile -- file object to read from
        count -- number of items to read
        read_type -- struct type code of the items as stored in the file
        return_type -- type code of the returned array; defaults to
            read_type when None
        byteswap -- True if the data's byte order is the opposite of this
            platform's

    Returns an array built from the unpacked items with type return_type.
    """
    out_type = read_type if return_type is None else return_type
    order = '>' if byteswap ^ platform_is_bigendian else '<'
    values = unpack_from_file("%s%d%s" % (order, count, read_type), ifile)
    return array(values, out_type)
# fread is the reader RecordFile.aread calls.  It is bound to the numpy
# implementation, which is more than 2 x as fast as the struct one;
# freadstruct (previously named fread) is preserved until further notice.
fread = freadnumpy
def needs_byteswap(bigendian):
    """Tell whether data of the given endianness must be swapped here.

    Pass True for big-endian data and False for little-endian data; for
    example, call ``needs_byteswap(True)`` before reading big-endian data.
    The result is true when that byte order differs from the byte order of
    the current platform.
    """
    swap_required = bigendian != platform_is_bigendian
    return swap_required
def check_read(requested, result):
    """Check that a read from a file returned the expected amount of data.

    Arguments:
        requested -- number of bytes (or characters) that were asked for
        result -- the object returned by the read (anything with a length)

    Raises EOFError if nothing was read, or IOError if the read was
    incomplete.  A request for zero items is never an error.
    """
    received = len(result)
    if requested == 0 or received == requested:
        return
    if received == 0:
        # Test the length instead of comparing with '': a binary read
        # returns b'' at end of file, which never equals a text ''.
        raise EOFError()
    raise IOError("Incomplete read, requested %d recieved %d" %
                  (requested, received))
class RecordFile(object):
    """
    Treat an unformatted Fortran file as a sequence of records.

    Every record is assumed to be framed by a single 4-byte integer that
    holds the size of the record's data in bytes, once before the data and
    once after it.  ``record_start`` is the byte offset of the current
    record's leading size marker and ``record_size`` is the value read
    from that marker.

    TODO: error checking in aread, prevent reading past end of record
    """

    def tell(self):
        """Return the current byte position of the underlying file."""
        return self.infile.tell()

    def __init__(self, infile, bigendian=True):
        """
        Arguments:
            infile -- an open file-like object or something ``open`` accepts
                      (it is then opened in 'rb' mode).  Must be random
                      access.  Must be a real file if the RecordFile.aread
                      method is going to be used.
            bigendian -- boolean, True if file is bigendian

        Raises TypeError if infile is not file-like and cannot be opened.

        ***Assumes single 4 byte integer pad on either end
        """
        if hasattr(infile, 'seek') and \
                hasattr(infile, 'tell') and \
                hasattr(infile, 'read'):
            self.infile = infile
        else:
            try:
                self.infile = open(infile, 'rb')
            except Exception as e:
                raise TypeError("infile is of " + str(type(infile)) +
                                ": and can only be string or file; " + str(e))
        # Is this platform a different byteorder than the file?
        self.byteswap = needs_byteswap(bigendian)
        if bigendian:
            self.format_prefix = '>'
        else:
            self.format_prefix = '<'
        # Measure the file by seeking to its end, then start at record 0.
        self.infile.seek(0, 2)
        self.length = self.infile.tell()
        self._newrecord(0)

    def seek(self, offset):
        """Move the file position to the absolute byte ``offset``.

        Only the file position changes; the current-record bookkeeping is
        left untouched.

        Raises ValueError if offset is not smaller than the file length.
        """
        if offset >= self.length:
            raise ValueError(("File length = %i; record start " +
                              "requested %f") % (self.length, offset))
        # Performed as a move relative to the current position.
        self.infile.seek(offset - self.infile.tell(), 1)

    def _newrecord(self, offset):
        """Move to a new record starting at the given offset

        Reads the leading size marker, so the file position ends up 4 bytes
        past ``offset``, at the first byte of the record's data.
        """
        self.seek(offset)
        self.record_start = offset
        self.record_size = self.unpack("i")[0]

    def unpack(self, fmt):
        """Unpack a set of values, like the struct module

        ``fmt`` is a struct format without a byte-order prefix; the file's
        byte order is prepended.  Returns a tuple, with bytes values
        decoded to text.  The file stays inside the current record.
        """
        data = unpack_from_file(self.format_prefix + fmt, self.infile)
        data = tuple([d.decode() if isinstance(d, bytes) else d for d in data])
        return data

    def read(self, fmt):
        """Unpack a set of values, then advance to the next record.

        Returns the tuple of unpacked values.
        """
        result = unpack_from_file(self.format_prefix + fmt, self.infile)
        self.next()
        return result

    def next(self):
        """Move to the next record.
        Returns True if sucessful, False if no more records available.
        """
        # 4 bytes for record size, start and end
        offset = self.record_start + self.record_size + 8
        if offset < self.length:
            self._newrecord(offset)
            return True
        else:
            self.infile.seek(0, 2)  # EOF
            return False

    def previous(self):
        """Move to the previous record.
        Returns True if sucessful, False if no more records available.

        The arithmetic below works from the file position, which is
        expected to be either at end of file or just past the leading size
        marker of a record (where _newrecord leaves it).
        """
        # Just past the first record's size marker: nothing comes before it.
        if self.tell() == 4:
            return False
        # Step back onto the trailing size marker of the record to return
        # to: 4 bytes back from end of file, or 8 bytes back (this record's
        # leading marker plus the previous record's trailing marker).
        step = -4 if self.eof() else -8
        self.infile.seek(step, 1)
        # Read size of previous record
        prev_size = self.unpack("i")[0]
        # The position is now just past that trailing marker; subtract the
        # data size and both 4-byte markers to reach the record's start.
        offset = self.tell() - prev_size - 8
        if offset >= 0:
            self._newrecord(offset)
            return True
        else:
            # Zero-length relative seek: the position is left where it is.
            self.infile.seek(0, 1)
            return False

    def aread(self, type, count=-1):
        """Read an array from the current position of the file.

        Arguments:
            type -- a type code understood by numpy (e.g. 'f', 'i', 'S20')
            count -- number of elements to read.  If not specified (or -1),
                     defaults to all remaining whole elements in the
                     current record

        Returns a numpy array; byte-string elements are decoded to text.
        """
        if count == -1:
            # Bytes of data already consumed: the record's data begins
            # 4 bytes after record_start, behind the leading size marker.
            consumed = self.infile.tell() - (self.record_start + 4)
            remain = self.record_size - consumed
            # Floor division keeps count an integer; the numpy item size
            # is used because fread reads with a numpy dtype.
            count = remain // np.dtype(type).itemsize
        data = fread(self.infile, count,
                     type, type,
                     self.byteswap)
        # Only byte-string arrays hold values that need decoding.
        if data.dtype.kind == 'S':
            data = np.array([d.decode() for d in data])
        return data

    def eof(self):
        """Returns true if this RecordFile is at the end of the file
        """
        return self.infile.tell() == self.length

    def restart_record(self):
        """Move to beginning of record

        The size marker is re-read, so the position ends up at the first
        byte of the current record's data.
        """
        self._newrecord(self.record_start)
def unpack_from_file(fmt, filein):
    """Like struct.unpack, but reads from a file instead of a string

    Exactly ``struct.calcsize(fmt)`` bytes are read from ``filein`` and
    validated with check_read before being unpacked.  Returns a tuple in
    which every value that has a ``decode`` method has been decoded.
    """
    nbytes = struct.calcsize(fmt)
    raw = filein.read(nbytes)
    check_read(nbytes, raw)
    return tuple(
        item.decode() if hasattr(item, 'decode') else item
        for item in struct.unpack(fmt, raw))
def seek_to_record(rf, rid, fmt):
    """Advance ``rf`` to the first record whose leading values equal rid.

    Starting with the current record, the first struct.calcsize(fmt) bytes
    of each record are unpacked with ``fmt`` and compared with ``rid``.
    On a match the record is restarted so it can be read from its
    beginning.

    Raises ValueError if the records run out before a match is found.
    """
    while not (rid == rf.unpack(fmt)):
        if not rf.next():
            raise ValueError("Time %s not found" % str(rid))
    rf.restart_record()
def read_into(rf, dest, id_fmt, data_fmt='f'):
    """Read an array from a RecordFile into an existing array.

    I don't know how this will work if dest is anything but 2D.

    Arguments:
        rf -- a RecordFile instance
        dest -- an array, possibly a slice.
                There must be at least product(shape(dest)) elements
                left in the current record
        id_fmt -- struct format of the values that precede the data
        data_fmt -- type code of the data elements (default 'f')

    Returns the tuple unpacked with id_fmt.
    Raises EOFError if rf is already at the end of the file.
    """
    from functools import reduce
    if rf.eof():
        raise EOFError()
    record_id = rf.unpack(id_fmt)
    # The initial value 1 makes the product well defined for a
    # zero-dimensional dest, whose shape is the empty tuple.
    count = reduce(mul, dest.shape, 1)
    values = rf.aread(data_fmt, count)
    dest[...] = values.reshape(dest.shape)
    return record_id
def writeline(d, fmt, ForceBig=True):
    """Pack the values ``d`` as one record framed by its length.

    The record is the struct format ``fmt`` applied to ``d``, preceded and
    followed by an integer holding the size in bytes of the packed data.

    Arguments:
        d -- iterable of the values to pack; it is not modified
        fmt -- struct format of the values, without a byte-order prefix
        ForceBig -- pack big-endian even on a big-endian platform, where
                    native packing would otherwise be used.  On a
                    little-endian platform the output is big-endian
                    whatever this flag says.

    Returns the packed record.  If packing fails the value list is
    printed and the original exception is re-raised.
    """
    prefix = '>' if (sys.byteorder == 'little' or ForceBig) else ''
    # Measure the data with the same prefix used for packing: native
    # sizes and alignment padding can differ from the standard sizes
    # selected by '>', which would make the length markers wrong.
    rlen = struct.calcsize(prefix + fmt)
    values = [rlen]
    values.extend(d)
    values.append(rlen)
    try:
        return struct.pack(prefix + "i" + fmt + "i", *values)
    except Exception:
        print(values)
        raise
def OpenRecordFile(rf):
    """Return a RecordFile for ``rf``, positioned on its first record.

    All CAMx files use FortranFileUtil.RecordFiles as inputs.  An existing
    RecordFile is used as it is; anything else is handed to the RecordFile
    constructor.

    rf - str, unicode, file, RecordFile
    """
    if not isinstance(rf, RecordFile):
        rf = RecordFile(rf)
    rf._newrecord(0)
    return rf
def Int2Asc(mspec):
    """Turn a sequence of integers back into the text they encode.

    Some CAMx input files have text stored as integers, one character
    per integer.  Each integer is reduced to a character code by three
    rounds of "subtract 32, floor-divide by 256"; the characters are then
    joined into a single string.
    """
    return "".join(
        chr((((code - 32) // 256 - 32) // 256 - 32) // 256)
        for code in mspec)
def Asc2Int(spcname):
    """Encode text as a list of integers, one integer per character.

    Some CAMx output files need the text stored as integers.  Each
    character code goes through three rounds of "multiply by 256, add 32".
    """
    return [
        int((((((ord(char) * 256) + 32) * 256) + 32) * 256) + 32)
        for char in spcname
    ]
class TestFileUtils(unittest.TestCase):
    """Exercise RecordFile and the module helpers on a small temp file."""

    def setUp(self):
        from tempfile import TemporaryFile as tf
        self.tmpfile = tf(mode='w+b')
        # writing tempfile with Fortran unformatted binary format
        # (big-endian, each record framed by 4-byte size markers)
        # 1st record is 0-19 as floats
        # 2nd record is 0-19 as ints
        # 3rd record is a 20-character string
        write = self.tmpfile.write
        write(b'\x00\x00\x00P' +
              b'\x00\x00\x00\x00?\x80\x00\x00@\x00' +
              b'\x00\x00@@\x00\x00@\x80\x00\x00@\xa0' +
              b'\x00\x00@\xc0\x00\x00@\xe0\x00\x00A' +
              b'\x00\x00\x00A\x10\x00\x00A \x00\x00A0' +
              b'\x00\x00A@\x00\x00AP\x00\x00A`\x00\x00Ap' +
              b'\x00\x00A\x80\x00\x00A\x88\x00\x00A' +
              b'\x90\x00\x00A\x98\x00\x00' + b'\x00\x00\x00P')
        write(b'\x00\x00\x00P' + b'\x00\x00\x00\x00\x00\x00\x00\x01' +
              b'\x00\x00\x00\x02\x00\x00\x00\x03' +
              b'\x00\x00\x00\x04\x00\x00\x00\x05' +
              b'\x00\x00\x00\x06\x00\x00\x00\x07' +
              b'\x00\x00\x00\x08\x00\x00\x00\t' +
              b'\x00\x00\x00\n\x00\x00\x00\x0b\x00\x00\x00\x0c' +
              b'\x00\x00\x00\r\x00\x00\x00\x0e\x00\x00\x00\x0f' +
              b'\x00\x00\x00\x10\x00\x00\x00\x11' +
              b'\x00\x00\x00\x12\x00\x00\x00\x13' + b'\x00\x00\x00P')
        write(b'\x00\x00\x00\x14' +
              b'The quick brown fox ' + b'\x00\x00\x00\x14')
        self.tmprf = RecordFile(self.tmpfile)

    def tearDown(self):
        # Release the temporary file created in setUp.
        self.tmpfile.close()

    def testAdvance(self):
        # Walk forward through the three records, past the end, and back.
        self.tmprf._newrecord(0)
        self.assertEqual(self.tmprf.tell(), 4)
        self.tmprf.next()
        self.assertEqual(self.tmprf.tell(), 92)
        self.tmprf.next()
        self.assertEqual(self.tmprf.tell(), 180)
        self.assertFalse(self.tmprf.next())
        self.assertEqual(self.tmprf.tell(), 204)
        self.assertFalse(self.tmprf.next())
        self.tmprf.previous()
        self.assertEqual(self.tmprf.tell(), 180)
        self.tmprf.previous()
        self.assertEqual(self.tmprf.tell(), 92)
        self.tmprf.previous()
        self.assertEqual(self.tmprf.tell(), 4)
        self.assertFalse(self.tmprf.previous())
        self.assertEqual(self.tmprf.tell(), 4)

    def testFloat(self):
        from numpy import arange
        self.tmprf._newrecord(0)
        self.assertTrue((arange(20, dtype='f') ==
                         self.tmprf.aread('f', 20)).all())

    def testReadInto(self):
        from numpy import arange, zeros
        dest = zeros((4, 5), 'f')
        self.tmprf._newrecord(0)
        # Necessary because written to anticipate fortran swapping of axes
        read_into(self.tmprf, dest, '', 'f')
        self.assertTrue((arange(20, dtype='f').reshape((4, 5)) == dest).all())

    def testInt(self):
        from numpy import arange
        self.tmprf._newrecord(0)
        self.tmprf.next()
        self.assertTrue((arange(20, dtype='i') ==
                         self.tmprf.aread('i', 20)).all())

    def testSeek(self):
        # Each search starts from the record the previous search left
        # current.
        self.tmprf._newrecord(0)
        seek_to_record(self.tmprf, (0., 1., 2.), "fff")
        self.assertEqual(self.tmprf.tell(), 4)
        seek_to_record(self.tmprf, (0, 1, 2), "iii")
        self.assertEqual(self.tmprf.tell(), 92)
        seek_to_record(self.tmprf, ('The',), '3s')
        self.assertEqual(self.tmprf.tell(), 180)
        seek_to_record(self.tmprf, ('T', 'h', 'e',), '3c')
        self.assertEqual(self.tmprf.tell(), 180)

    def testStr(self):
        from numpy import array
        self.tmprf._newrecord(0)
        self.tmprf.next()
        self.tmprf.next()
        checkv = array(["The quick brown fox "])
        testv = self.tmprf.aread('S20', 1)
        self.assertTrue(np.any(checkv == testv))

    def runTest(self):
        pass
# Run the unit tests above when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()