import PyIDL as CORBA
import struct
import StringIO
class OutputBuffer(object):
""" Marshaling buffer """
def __init__(self, buf=None):
if buf is None:
buf = StringIO.StringIO()
self._buf = buf
self._pos = 0
self._endian = 1 # little endian
def _get_endian(self):
return self._endian
endian = property(fget=_get_endian)
def getvalue(self):
return self._buf.getvalue()
def close(self):
self._buf.close()
def write(self, str_):
self._buf.write(str_)
def _align(self, size):
while (self._pos % size) != 0:
self._buf.write(chr(0))
self._pos += 1
def _pack(self, fmt, value):
str_ = struct.pack(fmt, value)
self._buf.write(str_)
self._pos += len(str_)
def char__marshal(self, value):
self._pack('c', value)
def wchar__marshal(self, value):
raise NotImplementedError
def octet__marshal(self, value):
self._pack('B', value)
def short__marshal(self, value):
self._align(2)
self._pack('h', value)
def unsigned_short__marshal(self, value):
self._align(2)
self._pack('H', value)
def long__marshal(self, value):
self._align(4)
self._pack('l', value)
def unsigned_long__marshal(self, value):
self._align(4)
self._pack('L', value)
def long_long__marshal(self, value):
self._align(8)
self._pack('q', value)
def unsigned_long_long__marshal(self, value):
self._align(8)
self._pack('Q', value)
def float__marshal(self, value):
self._align(4)
self._pack('f', value)
def double__marshal(self, value):
self._align(8)
self._pack('d', value)
def long_double__marshal(self, value):
raise NotImplementedError
def boolean__marshal(self, value):
if value == True:
self._pack('B', 1)
else:
self._pack('B', 0)
def string__marshal(self, value):
self._align(4)
value += chr(0)
length = len(value)
self._pack('L', length)
self._pack(str(length) + 's', value)
def wstring__marshal(self, value):
raise NotImplementedError
class InputBuffer(object):
""" Demarshaling buffer """
def __init__(self, input=None, endian=None):
if input == None:
self._buf = StringIO.StringIO('')
if isinstance(input, str):
self._buf = StringIO.StringIO(input)
else:
self._buf = input
self._pos = 0
self._endian = endian
def _set_endian(self, value):
self._endian = value
def _get_endian(self):
return self._endian
endian = property(fset=_set_endian, fget=_get_endian)
def getvalue(self):
return self._buf.getvalue()
def close(self):
self._buf.close()
def read(self, size=-1):
return self._buf.read(size)
def _align(self, size):
while (self._pos % size) != 0:
dummy = self._buf.read(1)
self._pos += 1
def _unpack(self, fmt):
size = struct.calcsize(fmt)
chunk = self._buf.read(size)
if len(chunk) < size:
print "Not enough data!"
raise CORBA.SystemException('IDL:CORBA/INTERNAL:1.0', 8,
CORBA.CORBA_COMPLETED_MAYBE)
self._pos += size
return struct.unpack(fmt, chunk)[0]
def char__demarshal(self):
return self._unpack('c')
def wchar__demarshal(self):
raise NotImplementedError
def octet__demarshal(self):
return self._unpack('B')
def short__demarshal(self):
self._align(2)
return self._unpack('h')
def unsigned_short__demarshal(self):
self._align(2)
return self._unpack('H')
def long__demarshal(self):
self._align(4)
return self._unpack('l')
def unsigned_long__demarshal(self):
self._align(4)
return self._unpack('L')
def long_long__demarshal(self):
self._align(8)
return self._unpack('q')
def unsigned_long_long__demarshal(self):
self._align(8)
return self._unpack('Q')
def float__demarshal(self):
self._align(4)
return self._unpack('f')
def double__demarshal(self):
self._align(8)
return self._unpack('d')
def long_double__demarshal(self):
raise NotImplementedError
def boolean__demarshal(self):
return self._unpack('B') == 1
def string__demarshal(self):
self._align(4)
length = self._unpack('L')
str_ = self._unpack(str(length) + 's')
return str_[:len(str_)-1]
def wstring__demarshal(self):
raise NotImplementedError